[Twisted-Python] patch implementing "fetchmany" from enterprise.adbapi

Clark C. Evans cce at clarkevans.com
Tue Mar 4 02:32:28 EST 2003

Please find following a patch to add the ability for fetchmany()
to be used from within the enterprise adbapi.py interface.  This
uses the patches previously posted (and included following) which
enable an iterator to be deferred via a thread.  

Basically, one can use runQueryChunked instead of runQuery 
and it will call your callback once for each "fetchmany"
return, which, depending on the database driver can be
about 10-50 rows at a time.  This is essential for incremental
handling of large query results.

The patch also tests for "threadsaftey", for some reason
mxODBC has an apilevel of "2.0" yet they are missing this
attribute (they call it threadlevel).   I'm not fond of
this part of the patch, but it is included since this
is what I tested with (I don't have permissions to change

The remainder of the message is hereby public domain.



--- adbapi.py.orig	Fri Feb 28 16:51:08 2003
+++ adbapi.py	Tue Mar  4 02:01:36 2003
@@ -52,7 +52,9 @@
             log.msg("Connecting to database: %s %s %s" % (dbapiName, connargs, connkw))
         self.dbapi = reflect.namedModule(dbapiName)
         assert self.dbapi.apilevel == '2.0', 'DB API module not DB API 2.0 compliant.'
-        assert self.dbapi.threadsafety > 0, 'DB API module not sufficiently thread-safe.'
+        if hasattr(self.dbapi,"threadsaftey"):
+           test = self.dbapi.threadsaftey > 0 
+           assert test, 'DB API module not sufficiently thread-safe.'
         self.connargs = connargs
         self.connkw = connkw
         import thread
@@ -99,6 +101,21 @@
         return result
+    def _runQueryChunked(self, args, kw):
+        conn = self.connect()
+        curs = conn.cursor()
+        apply(curs.execute, args, kw)
+        class fetchChunk:
+            def __init__(self,curs):
+                self.curs = curs
+            def next(self):
+                ret = self.curs.fetchmany()
+                if not ret:
+                    self.curs.close()
+                    raise StopIteration
+                return ret
+        return fetchChunk(curs)
     def _runOperation(self, args, kw):
         """This is used for non-query operations that don't want "fetch*" to be called
@@ -121,6 +138,10 @@
         threads.deferToThread(self._runQuery, args, kw).addCallbacks(
             callback, errback)
+    def queryChunked(self, callback, errback, *args, **kw):
+        threads.deferIterationToThread(self._runQueryChunked, args, kw
+                                      ).addCallbacks(callback, errback)
     def operation(self, callback, errback, *args, **kw):
         threads.deferToThread(self._runOperation, args, kw).addCallbacks(
             callback, errback)
@@ -229,6 +250,11 @@
         apply(self.dbpool.query, (d.callback, d.errback)+args, kw)
         return d
+    def runQueryChunked(self, *args, **kw):
+        d = defer.MultiCallDeferred()
+        apply(self.dbpool.queryChunked, (d.callback, d.errback)+args, kw)
+        return d
     def runOperation(self, *args, **kw):
         d = defer.Deferred()
         apply(self.dbpool.operation, (d.callback,d.errback)+args, kw)

----- Forwarded message from "Clark C. Evans" <cce at clarkevans.com> -----
From: "Clark C. Evans" <cce at clarkevans.com>
To: twisted-python at twistedmatrix.com
Subject: [Twisted-Python] patch implementing deferIterationToThread
Date: Tue, 4 Mar 2003 05:46:48 +0000

Please find a patch to internet/defer.py and iternet/threads.py
which enables an "iterator" to be deferred to a thread.  This is
useful since often you have a long running process (a database
query for example) which you'd like to return results every
once and a while rather than delivering all of the output 
at the end of the process.  This patch adds to the given
files (it doesn't change any code).

The remainder of the message is hereby public domain.

# code to append to defer.py
class MultiCallDeferred(Deferred):
    """This is a verision of Deferred which can be invoked more
       than once.  This is accomplished by cloning the current
       deferred object and carrying out the callbacks on the clone.
    def _startRunCallbacks(self, result, isError):
        clone = Deferred()
        clone.default = self.default
        for x in self.callbacks:
        clone._startRunCallbacks(result, isError)

# code to be added to threads.py
def _putIterationInDeferred(deferred, f, args, kwargs):
    """Send the results of an iteration to a deferred.
       The function called should return an object
       with a next() operator.  This is the ideal mechanism
       to defer a generator.
    from twisted.internet import reactor
        itr = apply(f, args, kwargs)
        while 1:
            reactor.callFromThread(deferred.callback, itr.next())
    except StopIteration: pass
        f = failure.Failure()
        reactor.callFromThread(deferred.errback, f)
def deferIterationToThread(f, *args, **kwargs):
    """Run the results of an iterator in a thread."""
    d = defer.MultiCallDeferred()
    reactor.callInThread(_putIterationInDeferred, d, f, args, kwargs)
    return d

# usage using iterators (or generators for simpler syntax)
from twisted.internet.threads import deferIterationToThread
from twisted.internet import reactor
class producer:
    def __init__(self):
        self.val = 9
    def next(self):
        val = self.val
        if val < 1: raise StopIteration
        self.val -= 1
        return val
def bldr(): return producer()
def printResult(x): print x
d = deferIterationToThread(bldr)
try: # if you have generators
    from __future__ import generators
    def gene(start=99):
        while(start > 90):
            yield start
            start -= 1
    d = deferIterationToThread(gene)
except: pass

Twisted-Python mailing list
Twisted-Python at twistedmatrix.com

----- End forwarded message -----

More information about the Twisted-Python mailing list