[Twisted-Python] mutexes in twisted.enterprise?

Andrew Bennetts andrew-twisted at puzzling.org
Thu Apr 18 20:14:48 MDT 2002


On Fri, Apr 19, 2002 at 11:32:50AM +1000, Andrew Bennetts wrote:
> .acquire().  I'd recommend that the network reading thread pushes the
> data onto a Queue.Queue that is being popped by a DB thread.  If this is
> all you are doing with the DB, then perhaps you don't even need
> twisted.enterprise, which is more designed for getting data than
> inserting it.
> 
> I actually have a module somewhere that basically does exactly that, and
> returns Deferreds that trigger when individual data items are inserted.
> I might see if I can find it, and if it is not too specific that I can
> post it.

And here is that module.

It has a few issues, it assumes you're using mx.ODBC.Windows to talk to
MS SQL Server using a DSN rather than an ODBC link, and it has the
beginnings of some code to ensure that packets don't get lost if the DB
goes down, but isn't being used yet.  It could do with general tidying
up and documentation.  On the other hand, it seems to work for the
simple uses I've put it to. 

Oh, I forgot to add this comment: call .insertPacket(None) to terminate
the thread when it is done.  That can be handy combined with the
daemonic flag you can pass to __init__, because you can use it to ensure
that your program doesn't exit until the DB done.

If there's sufficient interest, I'll look into fixing it up and checking
it in...

-Andrew.

----
# Twisted imports
from twisted.python import threadable, defer, failure
from twisted.internet import task

# Standard library imports
from Queue import Queue
from cStringIO import StringIO
import threading, traceback, time

# Twiddle the mx.ODBC.Windows module to, well, work.
import mx.ODBC.Windows
from mx.ODBC.Windows import DataError, IntegrityError, ProgrammingError, \
                            InternalError
# Required by DBAPI 2.0 -- it should already be there, but isn't :(
mx.ODBC.Windows.threadsafety = 2
# For some reason, the default connect function doesn't work with our DSNs
mx.ODBC.Windows.connect = mx.ODBC.Windows.DriverConnect

threadable.init(1)

class PacketInserter:
    """Class for inserting data from network packets into a DB
    
    It creates a seperate thread for processing SQL statements.  
    """
    dsn = ("driver={SQL Server};provider=sqloledb;server=db02;"
           "uid=sa;pwd=*****;database=Test;app=Twisted;")
    
    
    def __init__(self, daemonic=1):
        self.queue = Queue()
        self.recoveryQueue = Queue()
        self.dbThread = threading.Thread(target=self._processQueue,
                                         name='DB-Thread-')
        self.dbThread.setDaemon(daemonic)
        self.successCount = self.errorCount = 0
        self.active = 0
        self._db = None
        self.dbThread.start()

    def insertPacket(self, packet):
        """Insert a packet into the db.

        packet should be an instance of something with a .toSQL method.
        
        It returns a defer.Deferred that is called with the packet that was
        passed to this method.  The Deferred will be called after the packet
        has been inserted (or it will have its errback called).
        """
        d = defer.Deferred()
        self.queue.put((packet, d))
        return d

    def _getNext(self):
        if self.queue.empty():
            self.active = 0
        packet, deferred = self.queue.get()
        self.active = 1
        return packet, deferred
        
    def _getCursor(self):
        if self._db is not None:
            try:
                self._db.close()
            except:
                pass
            self._db = None

        cursor = None
        try:
            self._db = mx.ODBC.Windows.connect(self.dsn)
            cursor = self._db.cursor()
            cursor.execute('set implicit_transactions off;')
            print 'Connected to DB'
            #self._doRecovery()  FIXME
        except Exception, e:
            print 'Unable to create DB connection:', e
        return cursor
        
    def _doRecovery(self):
        print 'Retrying missed packets...'
        while not self.recoveryQueue.empty():
            # FIXME
            pass
        
    def _processQueue(self):
        cursor = self._getCursor()
        packet, deferred = self._getNext()
        start = time.time()
        while 1:
            if packet is None:
                print 'End of queue marker found, DB-Thread terminating'
                break

            if cursor is None:
                time.sleep(1)
                cursor = self._getCursor()
                continue

            try:
                cursor.execute(packet.toSQL())

            except (ProgrammingError, DataError, IntegrityError, 
                    InternalError), e:
                # InternalErrors include 
                # "COMMIT TRAN has no corresponding BEGIN TRAN" errors, which
                # should be a programming error...
                print 'Error inserting SQL: "%s"' % (packet.toSQL(),)
                traceback.print_exc()

                print 'Discarding statement'
                self.errorCount += 1
                task.schedule(deferred.errback, Failure(e))
                packet, deferred = self._getNext()

            except Exception, e:
                print 'Error inserting SQL: "%s"' % (packet.toSQL(),)
                traceback.print_exc()

                print 'Queuing statement'
                # FIXME  self.recoveryQueue.put(packet)
                print 'Reconnecting to DB'
                task.schedule(deferred.errback, Failure(e))
                cursor = self._getCursor()
            else:
                # If the last statement executed ok, then get the next one
                self.successCount += 1
                task.schedule(deferred.callback, packet)
                packet, deferred = self._getNext()

        end = time.time()
        print 'DB-Thread processed %d packets (and rejected %d) in %0.3f secs' \
              % (self.successCount, self.errorCount, (end - start),)
        print '%0.3f packets/sec' % ((end - start)/self.successCount,)

----





More information about the Twisted-Python mailing list