Hi, I am getting the following error after a few hours of successful running.

Traceback (most recent call last):
  File "/usr/lib/python2.6/threading.py", line 484, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.6/dist-packages/twisted/python/threadpool.py", line 210, in _worker
    result = context.call(ctx, function, *args, **kwargs)
  File "/usr/lib/python2.6/dist-packages/twisted/python/context.py", line 59, in callWithContext
    return self.currentContext().callWithContext(ctx, func, *args, **kw)
  File "/usr/lib/python2.6/dist-packages/twisted/python/context.py", line 37, in callWithContext
    return func(*args,**kw)
--- <exception caught here> ---
  File "/usr/lib/python2.6/dist-packages/twisted/enterprise/adbapi.py", line 436, in _runInteraction
    conn.rollback()
  File "/usr/lib/python2.6/dist-packages/twisted/enterprise/adbapi.py", line 52, in rollback
    self._connection.rollback()
_mysql_exceptions.OperationalError: (2006, 'MySQL server has gone away')

My code is something like this...

from twisted.internet import reactor, defer,threads
from twisted.enterprise import adbapi

dbpool = adbapi.ConnectionPool("MySQLdb", '192.168.1.102','test', 'test', 'test')

class Scanner:
  def _execQuery(self,txn):
    sql="SELECT tool_id,tool_name FROM tool_master"
    txn.execute(sql)
    result = txn.fetchall()
    return result

  def objCursor(self):
    return dbpool.runInteraction(self._execQuery)

  def printResult(self,result):
    print "resssssssssssssssssss",result
    reactor.callLater(3,self.deferExecute)

  def deferExecute(self):
    self.objCursor().addCallback(self.printResult)

Scanner()

class MyApp(object):
  reactor.callInThread(Scanner().deferExecute)
  reactor.run()

MyApp()

Can anyone tell me why I am getting this error?

Thanks.

A: 

"can anyone tell me why I am getting this error.."

Because you're doing it wrong.

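runInteraction already runs the supplied function in a thread from the pool, passing it a cursor-like transaction object as its argument. You shouldn't be calling reactor.callInThread(Scanner().deferExecute): that invokes dbpool.runInteraction from a non-reactor thread, and Twisted APIs are generally not safe to call from other threads.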

  • It's better to use a twisted.internet.task.LoopingCall. If the function it calls returns a Deferred, it waits for that call to complete before firing the next one.
  • You're just running a query in your example, so you could use ConnectionPool.runQuery instead of ConnectionPool.runInteraction.
  • Use errback functions to report exceptions.

Attempting to correct for your badly formatted code, I think you've got this:

from twisted.internet import reactor, defer,threads
from twisted.enterprise import adbapi

dbpool = adbapi.ConnectionPool("MySQLdb", '192.168.1.102','test', 'test', 'test')

class Scanner:
  def _execQuery(self,txn):
    sql="SELECT tool_id,tool_name FROM tool_master"
    txn.execute(sql)
    result = txn.fetchall()
    return result

  def objCursor(self):
    return dbpool.runInteraction(self._execQuery)

  def printResult(self,result):
    print "resssssssssssssssssss",result
    reactor.callLater(3,self.deferExecute)

  def deferExecute(self):
    self.objCursor().addCallback(self.printResult)

Scanner()

class MyApp(object):
  reactor.callInThread(Scanner().deferExecute)
  reactor.run()

MyApp()

You probably need something like the following instead. If you're planning on writing a Twisted application, it will be easy to modify this Scanner class to inherit from twisted.application.service.Service.

from twisted.internet import reactor, defer, task
from twisted.enterprise import adbapi

class Scanner(object):
  def __init__(self,dbpool=None):
    self.dbpool = dbpool
    self.loopCall = task.LoopingCall(self.myQuery)
  def start(self):
    print "Started scanner"
    self.loopCall.start(3)
  def stop(self):
    print "Stopping scanner"
    self.loopCall.stop()
  def myQuery(self):
    def interact(txn):
      sql="SELECT tool_id,tool_name FROM tool_master"
      txn.execute(sql)
      return txn.fetchall()
    d = self.dbpool.runInteraction(interact)
    d.addCallbacks(self.printResult,self.printError)
    # Return the Deferred so LoopingCall waits for this query to finish
    return d
  def printResult(self,result):
    print "Got Result: %r" % result
  def printError(self,error):
    print "Got Error: %r" % error
    error.printTraceback()

if __name__ == '__main__':
  from twisted.internet import reactor
  dbpool = adbapi.ConnectionPool("MySQLdb", '192.168.1.102','test', 'test', 'test')
  s = Scanner(dbpool)
  reactor.callWhenRunning(s.start)
  reactor.addSystemEventTrigger('before','shutdown',s.stop)
  reactor.run()
MattH
Thanks MattH, your explanation cleared up a lot of my confusion. But I don't have just one query to run; I have to run two or three queries where the output of one is the input to the next. That's why I used the runInteraction() method. I have a question about runInteraction(): can I write a method so that its cursor ("txn" in the case above) can be used in another method? I ask because I have to interact with the database many times.
jitendra
@jitendra: `runInteraction` does work with a database within a single transaction, in a thread. This means you have to be careful about what the function you pass does, because it runs outside the reactor thread and must be thread-safe. Long-lived transactions are generally a bad thing, so you don't want to be hanging on to the "txn". The ConnectionPool is there to manage database connection resources; unless you discover otherwise, assume that it offers reasonable performance. It varies between databases, but generally creating cursors is cheaper than creating connections.
MattH
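The case raised above, where one query's output is the input to the next, can be handled inside a single interaction function instead of passing the cursor around. The following is only a minimal sketch: it uses the standard-library sqlite3 module as a stand-in for MySQLdb so that it runs on its own (sqlite3's placeholder is ? where MySQLdb uses %s), and the `setting` column and the sample rows are assumptions made for illustration. With adbapi, the same `interact` function would be passed to runInteraction, which supplies `txn`.

```python
import sqlite3


def interact(txn):
    """Run dependent queries on one cursor, inside one transaction."""
    # First query: fetch the tools.
    txn.execute("SELECT tool_id, tool_name FROM tool_master ORDER BY tool_id")
    tools = txn.fetchall()

    settings = {}
    for tool_id, _tool_name in tools:
        # Second query: its input is the output of the first one.
        txn.execute(
            "SELECT setting FROM tool_asset_settings WHERE tool_id = ?",
            (tool_id,),
        )
        settings[tool_id] = [row[0] for row in txn.fetchall()]
    return settings


# Stand-in database with hypothetical sample rows.
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE tool_master (tool_id INTEGER, tool_name TEXT)")
cur.execute("CREATE TABLE tool_asset_settings (tool_id INTEGER, setting TEXT)")
cur.executemany(
    "INSERT INTO tool_master VALUES (?, ?)", [(1, "nmap"), (2, "ping")]
)
cur.executemany(
    "INSERT INTO tool_asset_settings VALUES (?, ?)",
    [(1, "fast"), (1, "quiet"), (2, "slow")],
)

result = interact(conn.cursor())
print(result)
```

Everything that needs the cursor happens inside `interact`, so nothing has to hold on to "txn" after the function returns.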
Suppose I get res (res={1:[2,3],2:[3,4]}) and pass this value to another method. Based on these values I then have to interact with the database, like this:

def printResult(self,res):
  other_class_obj.generate(res)

class other_class:
  def generate(self,res={}):
    for id in res.iterkeys():
      for tool in res[id]:
        sql_tool_setting="select * from tool_asset_settings where tool_id =%s" %(tool,)
        # here you have to run this query and other queries also

Should I then use the runInteraction method?
jitendra
@jitendra: Why are you using another class? If you're only using the result of a single select to select more things, the chances are that it can be consolidated into a single SQL query. You use transactions in this case when you are concerned that the data that you're querying may change between selects. Either consolidate your queries into one or execute all of the queries in the single `interact` function defined. Also, you should use the database API parameterization instead of doing it yourself with string formatting.
MattH
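Both suggestions in the comment above can be sketched together: a single JOIN replaces the select-then-select loop, and the values are passed as parameters instead of being formatted into the SQL string. This is an illustration only. It uses sqlite3 as a stand-in for MySQLdb (whose placeholder is %s rather than ?), and the `setting` column, the `fetch_settings` helper, and the sample rows are assumptions rather than part of the original schema.

```python
import sqlite3


def fetch_settings(txn, asset_ids):
    """Fetch (asset_id, tool_id, setting) rows for the given assets in one query."""
    asset_ids = list(asset_ids)
    if not asset_ids:
        return []
    # Only the placeholders are built into the SQL text; the values
    # themselves are handed to the driver as parameters.
    placeholders = ", ".join("?" for _ in asset_ids)
    sql = (
        "SELECT a.asset_id, a.tool_id, s.setting "
        "FROM tool_in_assets AS a "
        "JOIN tool_asset_settings AS s ON s.tool_id = a.tool_id "
        "WHERE a.asset_id IN (%s) "
        "ORDER BY a.asset_id, a.tool_id" % placeholders
    )
    txn.execute(sql, asset_ids)
    return txn.fetchall()


# Stand-in database with hypothetical sample rows.
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE tool_in_assets (asset_id INTEGER, tool_id INTEGER)")
cur.execute("CREATE TABLE tool_asset_settings (tool_id INTEGER, setting TEXT)")
cur.executemany(
    "INSERT INTO tool_in_assets VALUES (?, ?)",
    [(1, 2), (1, 3), (2, 3), (9, 4)],
)
cur.executemany(
    "INSERT INTO tool_asset_settings VALUES (?, ?)",
    [(2, "a"), (3, "b"), (4, "c")],
)

rows = fetch_settings(cur, [1, 2])
print(rows)
```

Letting the driver bind the values avoids quoting mistakes and SQL injection, and one round trip replaces a query per tool.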
jitendra
The twisted reactor is single threaded. The runInteraction thread will only live roughly as long as the supplied function lives. So there will be one or occasionally two threads. How about an 'up-vote' and 'accepted answer'?
MattH
In one function I use deferToThread(self.exectool, tool) from the main reactor, passing tool (obtained via an id from the db) into the exectool method:

def exectool(self,tool):
  exec tool
  return

def getResult(self,tool):
  return deferToThread(self.exectool, tool)
jitendra
Code in comments doesn't show up very well. If you have a new question, I suggest that you start a new question and make a little more effort formatting your question so that the code is readable.
MattH
Alright MattH. Actually, this depended on the code above; that's why I posted it in this thread.
jitendra
A: 

After all the suggestions and help from Matt, I have the following code, which is running successfully. :)

#!/usr/bin/python

# Using the "dbmodule" from the previous example, create a ConnectionPool

import itertools

from twisted.enterprise import adbapi
from twisted.internet import reactor, defer, task, threads
from twisted.internet.threads import deferToThread
from twisted.python.threadpool import ThreadPool

from tools.printTime import *
from tools.getVersion import *
from sh_log import *

concurrent = 30
finished=itertools.count(1)
reactor.suggestThreadPoolSize(concurrent)

# Creating Global Instance variables
path="tools"
lo=Log()

class ToolsBuilder:

    def build(self,txn,tool,asset_id):
        if tool:
            print "\n"
            try:
                sql="select tool_filename from tool_master where tool_id = %s" %(tool,)
                sql_asset="select asset_url from asset_master where asset_id = %s" %(asset_id,)
                txn.execute(sql_asset)
                asset_url = txn.fetchall()
                log_date=lo.log_date()
                txn.execute(sql)
                result = txn.fetchall()

                log='\n'+log_date+"::"+str(result[0][0])+ " tool object is created......\n"
                lo.wfile(log)
                temp=(path +'/' + str(result[0][0]))

                if result:
                    if temp:
                        f=open(temp).read()
                        obj_tool=compile(f, 'a_filename', 'exec')
                        return obj_tool
            except:
                lo.wfile("Error in creating executable tool object......")

tb=ToolsBuilder()

class ToolsVectorGenerator:

    def generate(self,txn,res_set={}):
        v1=[]
        for asset_id in res_set.iterkeys():
            try:
                obj_tools=[]
                if asset_id:
                    print "asset_id..............................",asset_id
                    log_date=lo.log_date()
                    log=log_date+"::"+" \nVector generation for the asset number............:"+str(asset_id)
                    lo.wfile(log)
                    vector=[]
                    tools_arr=[]
                    obj_tools=[]
                    for tool in res_set[asset_id]:
                        if tool:
                            print "tool..............",tool
                            temp_tool=tb.build(txn,tool,asset_id)
                            print "temp_tool..........",temp_tool

                            #fetch data of tool setting.....
                            sql_tool_setting="select * from tool_asset_settings where tool_id =%s" %(tool,)
                            txn.execute(sql_tool_setting)
                            result_tool_setting = txn.fetchall()
                            tool_id=result_tool_setting[0][1]
                            t_id=int(tool_id)
                            tool_id_arr=[]
                            tool_id_arr.append(t_id)
                            tool_id_arr.append(result_tool_setting)
                            tool_id_arr.append(temp_tool)
                            tools_arr.append(tool_id_arr)

                    #fetch data from asset master
                    sql_asset="select asset_name from asset_master where asset_id=%s" %(asset_id,)
                    txn.execute(sql_asset)
                    result_asset = txn.fetchall()
                    vector.append(result_asset)
                    vector.append(tools_arr)
            except:
                lo.wfile("\nError in getting asset,please check your database or network connection......")
            tvm.executeVector(vector)

tvg=ToolsVectorGenerator()

class Tool:

    def exectool(self,tool):
        exec tool
        return

    def getResult(self,tool):
        return deferToThread(self.exectool, tool)

to=Tool()

class StateMachine:

    def setPriority(self,txn,tup):
        temp=[]
        arr=[]
        for li in tup:
            sql2="select tool_dependency from tool_asset_settings where tool_id =%s" %(li[1],)
            txn.execute(sql2)
            result12 = txn.fetchall()
            arr=[]
            if result12[0][0]!=None:
                tup12=result12[0][0]
                arr=(li[0],tup12)
                # print "arr.........",arr
                if arr in tup:
                    print "This element is already exist......."
                else:
                    temp.append(arr)
        temp.extend(tup)
        return tuple(temp)

st=StateMachine()

class ToolsVectorExecutionManager(object):

    def executeVector(self,toolsvector):
        print "toolsvector================>",toolsvector
        if toolsvector:
            for tools in toolsvector[1]:
                if tools[2] != None:
                    to.getResult(tools[2])

tvm=ToolsVectorExecutionManager()

class ToolsToExecuteAnalyzer:

    def __init__(self,dbpool=None):
        self.dbpool = dbpool
        self.loopCall = task.LoopingCall(self.myQuery)

    def start(self):
        print "Started scanner"
        self.loopCall.start(3)

    def stop(self):
        print "Stopping scanner"
        self.loopCall.stop()

    def myQuery(self):

        def interact(txn):
            sql="SELECT tool_asset_id,tool_execute_id FROM tool_to_execute where status='0'"
            txn.execute(sql)
            result=txn.fetchall()
            if result:
                tool_asset_id=tuple([int(e[0]) for e in result])
                tool_execute_id=tuple([int(e[1]) for e in result])
                if len(tool_asset_id)>1:
                    sql1="SELECT asset_id,tool_id FROM tool_in_assets WHERE tool_asset_id IN %s"%(tool_asset_id,)
                else:
                    sql1="SELECT asset_id,tool_id FROM tool_in_assets WHERE tool_asset_id = (%s)"%(tool_asset_id)
                txn.execute(sql1)
                tup = txn.fetchall()

                # dependency check for the selected tool
                asset_tool=st.setPriority(txn,tup)

                log_date=lo.log_date()
                log=log_date+"::priority have been set for the tools......\n"
                lo.wfile(log)

                # creating group of asset with their tools
                res={}
                for element in asset_tool:
                    if element[0] in res:
                        res[element[0]].append(int(element[1]))
                    else:
                        res[int(element[0])] = [int(element[1])]

                # Record deletion from tool_to_execute table
                if res!=None and res.keys()!=[]:
                    for asset_id in res.iterkeys():
                        if len(tool_execute_id)>1:
                            sql_del="delete from tool_to_execute where tool_execute_id in %s " %(tool_execute_id,)
                        else:
                            sql_del="delete from tool_to_execute where tool_execute_id = %s" %(tool_execute_id)
                    txn.execute(sql_del)

                # New Addition of vector
                tvg.generate(txn,res)

                return res

        d = self.dbpool.runInteraction(interact)
        d.addCallbacks(self.printResult,self.printError)

    def printResult(self,res):
        print "In printResult after generate...."

    def printError(self,error):
        print "Got Error: %r" % error
        error.printTraceback()

ToolsToExecuteAnalyzer()

if __name__ == '__main__':
    from twisted.internet import reactor
    dbpool = adbapi.ConnectionPool("MySQLdb", 'localhost', 'test', 'test','test')
    s = ToolsToExecuteAnalyzer(dbpool)
    reactor.callWhenRunning(s.start)
    reactor.addSystemEventTrigger('before','shutdown',s.stop)
    reactor.run()

This is my whole code. I just wanted to know how many threads are running: is a new thread started for each tool? Anyway, thanks Matt for your help. :)

jitendra
You might want to check out the [StackOverflow Markup Guide](http://stackoverflow.com/editing-help) to make your posts more readable for other users.
Chacha102