[Twisted-Python] patch implementing "fetchmany" from enterprise.adbapi

Clark C. Evans cce at clarkevans.com
Tue Mar 4 16:44:35 EST 2003


On Tue, Mar 04, 2003 at 12:43:59PM -0500, Itamar Shtull-Trauring wrote:
| This sounds like a good idea, and the iterator deferred thing does as
| well. But, as is the fetchmany() support will only work in 2.2, I think?
| This is a problem, we can't have core functionality run only in 2.2. 
|
| Adding the deferred/iterator thing is not an issue (assuming the codee
| is good, no chance to look at it yet), since that adds additional
| functionality for 2.2 users. The problem is adding generally useful code
| that doesn't run in 2.1.

I've made three updates to the patches:

  0) Added isError to Deferrable as in some cases this
     attribute isn't added before __del__ is invoked. 

  1) I added addFinishCallback to the MultiDeferred so that
     it is possible to be notified when iteration is finished.
     I didn't know what to call the finish_callback function...

  2) I added threads.StopIteration so that the whole mechanism
     is useable from 2.1 as well.  Besides the exception which
     marks the stop of the iteration loop, the whole iterator
     concept is more of a convention (with a wee bit of build-in
     sugar).  Since this code doesn't use the sugar, and uses
     a mock exception when the built-in exception isn't available,
     it should work well with both 2.1 and 2.2

Here are the new patches...  public domain as usual

--- defer.py.orig	Mon Mar  3 21:59:25 2003
+++ defer.py	Tue Mar  4 16:02:37 2003
@@ -94,6 +94,7 @@
     called = 0
     default = 0
     paused = 0
+    isError = 0
 
     def __init__(self):
         self.callbacks = []
@@ -291,6 +292,41 @@
             log.msg("Unhandled error in Deferred:")
             log.err(self.result)
 
+class MultiDeferred(Deferred):
+    """I am a deferred that can be called (visited) more than once.
+
+       This is a verision of Deferred where the callback can be
+       invoked as many times as necessary, but where the errback
+       can only be activated once.  This is accomplished by cloning
+       the current deferred object and carrying out the callbacks
+       on the clone.  When the callback is called, we call this
+       a 'visit'.
+
+       This Deferred adds a 'finish' callback which can be invoked
+       once all visits are completed.  Finish callbacks can be added
+       via 'addFinishCallback' and invoked with 'finish_callback'
+    """
+    def __init__(self):
+        Deferred.__init__(self)
+        self._finish = Deferred()
+    def _startRunCallbacks(self, result, isError):
+        if isError: 
+            Deferred._startRunCallbacks(result,1)
+            return
+        visit = Deferred()
+        visit.default = self.default
+        for x in self.callbacks:
+            visit.callbacks.append(x)
+        visit._startRunCallbacks(result,0)
+    def addFinishCallback(self, callback, *args, **kw):
+        """Add a callback to be executed when finished visiting"""
+        return self._finish.addCallbacks(callback, callbackArgs=args,
+                                         callbackKeywords=kw)
+    def finish_callback(self,result):
+        """Run all finish callbacks which have been added to
+           this deferred."""
+        self.called = 1
+        self._finish.callback(result)
 
 class DeferredList(Deferred):
     """I combine a group of deferreds into one callback.



--- threads.py.orig	Mon Mar  3 21:49:07 2003
+++ threads.py	Tue Mar  4 15:35:58 2003
@@ -60,5 +60,43 @@
     """
     reactor.callInThread(_runMultiple, tupleList)
 
+# support iterators for 2.1
+try: 
+   StopIteration = StopIteration  
+except:
+   class StopIteration(Exception): pass
+
+def _putIterationInDeferred(deferred, f, args, kwargs):
+    """Send the results of an iteration to a deferred.
+       The function called should return an object
+       with a next() operator.
+    """
+    from twisted.internet import reactor
+    try:
+        itr = apply(f, args, kwargs)
+        nCount = 0
+        while 1:
+            reactor.callFromThread(deferred.callback, itr.next())
+            nCount += 1
+    except StopIteration: 
+        reactor.callFromThread(deferred.finish_callback, nCount)
+    except:
+        f = failure.Failure()
+        reactor.callFromThread(deferred.errback, f)
+
+def deferIterationToThread(f, *args, **kwargs):
+    """Run the results of an iterator in a thread.
+
+       This returns a MultiDeferred object, which is a deferred 
+       having an added method, addFinishCallback to provide
+       a callback once the iteration has finished.
+
+       The function passed, when arguments applied, should
+       return an object with a next() method raising 
+       StopIteration when there isn't any more content.       
+    """
+    deferred = defer.MultiDeferred()
+    reactor.callInThread(_putIterationInDeferred, deferred, f, args, kwargs)
+    return deferred
 
 __all__ = ["deferToThread", "callMultipleInThread"]



--- adbapi.py.orig	Tue Mar  4 03:40:13 2003
+++ adbapi.py	Tue Mar  4 15:40:53 2003
@@ -99,6 +101,23 @@
         curs.close()
         return result
 
+    def _runQueryChunked(self, args, kw):
+        conn = self.connect()
+        curs = conn.cursor()
+        apply(curs.execute, args, kw)
+        class chunkIterator:
+            def __init__(self,curs):
+                self.curs = curs
+            def __iter__(self): 
+                return self
+            def next(self):
+                ret = self.curs.fetchmany()
+                if not ret:
+                    self.curs.close()
+                    raise threads.StopIteration
+                return ret
+        return chunkIterator(curs)
+
     def _runOperation(self, args, kw):
         """This is used for non-query operations that don't want "fetch*" to be called
         """
@@ -121,6 +140,15 @@
         threads.deferToThread(self._runQuery, args, kw).addCallbacks(
             callback, errback)
 
+    def queryChunked(self, *args, **kw):
+        """ Sets up a deferred execution query that returns
+            one or more result chunks.
+      
+            This method returns a MultiDeferred, which is notified when
+            the query has finished via its FinishCallback.
+        """
+        return threads.deferIterationToThread(self._runQueryChunked, args, kw)
+
     def operation(self, callback, errback, *args, **kw):
         threads.deferToThread(self._runOperation, args, kw).addCallbacks(
             callback, errback)


#
# usage of the new iterator deferred using 2.1
# (this works under 2.2 and should work under 2.1
#  but it has not been tested there)
#
from twisted.internet.threads import StopIteration
from twisted.internet.threads import deferIterationToThread
from twisted.internet import reactor
class producer:
    def __init__(self):
        self.val = 9
    def next(self):
        val = self.val
        if val < 1: raise StopIteration
        self.val -= 1
        return val
def bldr(): return producer()
def printResult(x): print x
def printDone(x): print "done", x
d = deferIterationToThread(bldr)
d.addCallback(printResult)
d.addFinishCallback(printDone)
reactor.run()

#
# usage of the new iterator deferred using 2.2
#
from __future__ import generators
from twisted.internet import threads
from twisted.internet import reactor
def printResult(x): print x
def printDone(x): print "done", x
def gene(start=99):
    while(start > 90):
        yield start
        start -= 1
d = threads.deferIterationToThread(gene)
d.addFinishCallback(printDone)
d.addCallback(printResult)
reactor.run()

#
# usage of the Chunked adbapi patch
#
from twisted.enterprise import adbapi
pool = adbapi.ConnectionPool("mx.ODBC.EasySoft","SomeDSN")
def good(lst):
    for itm in lst:
        print itm[0]
def done(cnt): print "done, blocks = ", cnt
d = pool.queryChunked("SELECT <query>")
d.addCallback(good)
d.addFinishCallback(done)
from twisted.internet import reactor
reactor.run()





More information about the Twisted-Python mailing list