[Twisted-Python] patch implementing "fetchmany" from enterprise.adbapi
Clark C. Evans
cce at clarkevans.com
Tue Mar 4 02:32:28 EST 2003
Please find following a patch to add the ability for fetchmany()
to be used from within the enterprise adbapi.py interface. This
uses the patches previously posted (and included following) which
enable an iterator to be deferred via a thread.
Basically, one can use runQueryChunked instead of runQuery
and it will call your callback once for each "fetchmany"
return, which, depending on the database driver can be
about 10-50 rows at a time. This is essential for incremental
handling of large query results.
The patch also makes the "threadsafety" check conditional: for some
reason mxODBC reports an apilevel of "2.0" yet lacks this
attribute (they call it threadlevel). I'm not fond of
this part of the patch, but it is included since this
is what I tested with (I don't have permission to change
mxODBC).
The remainder of the message is hereby public domain.
Best,
Clark
--- adbapi.py.orig Fri Feb 28 16:51:08 2003
+++ adbapi.py Tue Mar 4 02:01:36 2003
@@ -52,7 +52,9 @@
log.msg("Connecting to database: %s %s %s" % (dbapiName, connargs, connkw))
self.dbapi = reflect.namedModule(dbapiName)
assert self.dbapi.apilevel == '2.0', 'DB API module not DB API 2.0 compliant.'
- assert self.dbapi.threadsafety > 0, 'DB API module not sufficiently thread-safe.'
+        # Some drivers (mxODBC, per the note above the patch) report
+        # apilevel "2.0" but expose no "threadsafety" attribute, so the
+        # thread-safety requirement is enforced only when it is present.
+        if hasattr(self.dbapi, "threadsafety"):
+            test = self.dbapi.threadsafety > 0
+            assert test, 'DB API module not sufficiently thread-safe.'
self.connargs = connargs
self.connkw = connkw
import thread
@@ -99,6 +101,21 @@
curs.close()
return result
+    def _runQueryChunked(self, args, kw):
+        """Execute a query and return an object that yields its rows in chunks.
+
+        args and kw are passed to the cursor's execute().  The returned
+        object has a next() method; each call returns the result of one
+        cursor.fetchmany().  Once fetchmany() returns nothing the cursor
+        is closed and StopIteration is raised.
+        """
+        conn = self.connect()
+        curs = conn.cursor()
+        try:
+            curs.execute(*args, **kw)
+        except:
+            # Nothing else holds the cursor yet: release it before
+            # letting the error propagate to the caller.
+            curs.close()
+            raise
+        class fetchChunk:
+            def __init__(self, curs):
+                self.curs = curs
+            def __iter__(self):
+                return self
+            def next(self):
+                try:
+                    ret = self.curs.fetchmany()
+                except:
+                    # Do not leave the cursor open when a fetch fails.
+                    self.curs.close()
+                    raise
+                if not ret:
+                    self.curs.close()
+                    raise StopIteration
+                return ret
+        return fetchChunk(curs)
+
def _runOperation(self, args, kw):
"""This is used for non-query operations that don't want "fetch*" to be called
"""
@@ -121,6 +138,10 @@
threads.deferToThread(self._runQuery, args, kw).addCallbacks(
callback, errback)
+    def queryChunked(self, callback, errback, *args, **kw):
+        """Run a chunked query via threads.deferIterationToThread.
+
+        callback and errback are attached to the deferred that
+        deferIterationToThread returns; the remaining arguments are
+        handed on to _runQueryChunked.
+        """
+        chunks = threads.deferIterationToThread(self._runQueryChunked, args, kw)
+        chunks.addCallbacks(callback, errback)
+
def operation(self, callback, errback, *args, **kw):
threads.deferToThread(self._runOperation, args, kw).addCallbacks(
callback, errback)
@@ -229,6 +250,11 @@
apply(self.dbpool.query, (d.callback, d.errback)+args, kw)
return d
+    def runQueryChunked(self, *args, **kw):
+        """Start a chunked query and return a MultiCallDeferred for it.
+
+        The deferred's callback and errback are passed to
+        dbpool.queryChunked ahead of the caller's own arguments.
+        """
+        results = defer.MultiCallDeferred()
+        self.dbpool.queryChunked(results.callback, results.errback, *args, **kw)
+        return results
+
def runOperation(self, *args, **kw):
d = defer.Deferred()
apply(self.dbpool.operation, (d.callback,d.errback)+args, kw)
----- Forwarded message from "Clark C. Evans" <cce at clarkevans.com> -----
From: "Clark C. Evans" <cce at clarkevans.com>
To: twisted-python at twistedmatrix.com
Subject: [Twisted-Python] patch implementing deferIterationToThread
Date: Tue, 4 Mar 2003 05:46:48 +0000
Please find a patch to internet/defer.py and internet/threads.py
which enables an "iterator" to be deferred to a thread. This is
useful since often you have a long-running process (a database
query, for example) which you'd like to return results every
once in a while rather than delivering all of the output
at the end of the process. This patch only adds to the given
files (it doesn't change any existing code).
The remainder of the message is hereby public domain.
#
# code to append to defer.py
#
class MultiCallDeferred(Deferred):
    """A Deferred whose callback chain can be fired more than once.

    Each firing is carried out on a freshly created Deferred that is
    given this object's ``default`` attribute and a copy of the
    callbacks registered on it so far.
    """

    def _startRunCallbacks(self, result, isError):
        # Build a single-use Deferred with the same chain and fire that
        # instead; the base-class implementation is not invoked on self.
        oneShot = Deferred()
        oneShot.default = self.default
        oneShot.callbacks.extend(self.callbacks)
        oneShot._startRunCallbacks(result, isError)
#
# code to be added to threads.py
#
def _putIterationInDeferred(deferred, f, args, kwargs):
    """Feed every value produced by an iteration to a deferred.

    f is called with args and kwargs and must return an object with a
    next() method (a generator is the simplest way to get one).  Each
    value next() returns is passed to deferred.callback through
    reactor.callFromThread.  StopIteration ends the loop quietly; any
    other exception is wrapped in a Failure and sent to
    deferred.errback.
    """
    from twisted.internet import reactor
    try:
        source = f(*args, **kwargs)
        while 1:
            reactor.callFromThread(deferred.callback, source.next())
    except StopIteration:
        # Normal end of the iteration: nothing more to deliver.
        return
    except:
        reactor.callFromThread(deferred.errback, failure.Failure())
def deferIterationToThread(f, *args, **kwargs):
    """Run an iteration in a thread and return a MultiCallDeferred.

    f(*args, **kwargs) must return an object with a next() method.
    _putIterationInDeferred is scheduled with reactor.callInThread and
    fires the returned deferred once per value produced.
    """
    # Import locally, as _putIterationInDeferred does, so this function
    # does not rely on a module-level ``reactor`` name being present.
    from twisted.internet import reactor
    d = defer.MultiCallDeferred()
    reactor.callInThread(_putIterationInDeferred, d, f, args, kwargs)
    return d
#
# usage using iterators (or generators for simpler syntax)
#
# A __future__ import is only legal at the very top of a module and is
# resolved at compile time, so it cannot be guarded with try/except.
from __future__ import generators

from twisted.internet.threads import deferIterationToThread
from twisted.internet import reactor


class producer:
    """Hand-written iterator that counts down from 9 to 1."""

    def __init__(self):
        self.val = 9

    def next(self):
        val = self.val
        if val < 1:
            raise StopIteration
        self.val -= 1
        return val


def bldr():
    """Factory passed to deferIterationToThread; returns a new producer."""
    return producer()


def printResult(x):
    """Callback that prints each value it is given."""
    print(x)


# Iterator flavour: the callback is added to the returned deferred.
d = deferIterationToThread(bldr)
d.addCallback(printResult)


def gene(start=99):
    """Generator flavour: yields start, start - 1, ... down to 91."""
    while start > 90:
        yield start
        start -= 1


d = deferIterationToThread(gene)
d.addCallback(printResult)

reactor.run()
_______________________________________________
Twisted-Python mailing list
Twisted-Python at twistedmatrix.com
http://twistedmatrix.com/cgi-bin/mailman/listinfo/twisted-python
----- End forwarded message -----
More information about the Twisted-Python
mailing list