Ticket #83: twisted.4.diff
| File twisted.4.diff, 24.4 KB (added by davep, 10 years ago) |
|---|
-
twisted/enterprise/adbapi.py
RCS file: /cvs/Twisted/twisted/enterprise/adbapi.py,v retrieving revision 1.50 diff -u -r1.50 adbapi.py
25 25 26 26 class Transaction: 27 27 """ 28 I am a lightweight wrapper for a database'cursor' object. I relay28 I am a lightweight wrapper for a DB-API 'cursor' object. I relay 29 29 attribute access to the DB cursor. 30 30 """ 31 31 _cursor = None … … 46 46 class ConnectionPool(pb.Referenceable): 47 47 """I represent a pool of connections to a DB-API 2.0 compliant database. 48 48 49 You can pass the noisy arg which determines whether informational 50 log messages are generated during the pool's operation. 49 You can pass cp_min, cp_max or both to set the minimum and maximum 50 number of connections that will be opened by the pool. You can pass 51 the noisy arg which determines whether informational log messages are 52 generated during the pool's operation. 51 53 """ 52 noisy = 153 54 54 # XXX - make the min and max attributes (and cp_min and cp_max 55 # kwargs to __init__) actually do something? 56 min = 3 57 max = 5 55 noisy = 1 # if true, generate informational log messages 56 min = 3 # minimum number of connections in pool 57 max = 5 # maximum number of connections in pool 58 58 59 59 def __init__(self, dbapiName, *connargs, **connkw): 60 60 """See ConnectionPool.__doc__ 61 61 """ 62 62 self.dbapiName = dbapiName 63 if self.noisy:64 log.msg("Connecting to database: %s %s %s" %65 (dbapiName, connargs, connkw))66 63 self.dbapi = reflect.namedModule(dbapiName) 67 64 68 65 if getattr(self.dbapi, 'apilevel', None) != '2.0': … … 74 71 self.connargs = connargs 75 72 self.connkw = connkw 76 73 77 import thread78 self.threadID = thread.get_ident79 self.connections = {}80 81 74 if connkw.has_key('cp_min'): 82 75 self.min = connkw['cp_min'] 83 76 del connkw['cp_min'] … … 90 83 self.noisy = connkw['cp_noisy'] 91 84 del connkw['cp_noisy'] 92 85 86 self.min = min(self.min, self.max) 87 self.max = max(self.min, self.max) 88 89 self.connections = {} # all connections, hashed on thread id 90 91 # these are optional so import them here 92 from twisted.python import threadpool 93 import thread 94 95 self.threadID = thread.get_ident 96 self.threadpool = threadpool.ThreadPool(self.min, self.max, 97 self.connect) 98 93 99 from twisted.internet import reactor 100 reactor.callWhenRunning(self.threadpool.start) 94 101 self.shutdownID = reactor.addSystemEventTrigger('during', 'shutdown', 95 102 self.finalClose) 96 103 … … 98 105 """Interact with the database and return the result. 99 106 100 107 The 'interaction' is a callable object which will be executed in a 101 pooled thread. It will be passed an L{Transaction} object as an102 argument (whose interface is identical to that of the database cursor103 for your DB-API module of choice), and its results will be returned as104 a Deferred. If running the method raises an exception, the transaction105 will be rolled back. If the method returns a value, the transaction106 will be committed.108 thread using a pooled connection. It will be passed an L{Transaction} 109 object as an argument (whose interface is identical to that of the 110 database cursor for your DB-API module of choice), and its results 111 will be returned as a Deferred. If running the method raises an 112 exception, the transaction will be rolled back. If the method returns 113 a value, the transaction will be committed. 107 114 108 115 @param interaction: a callable object whose first argument is 109 116 L{adbapi.Transaction}. … … 117 124 apply(self.interaction, (interaction,d.callback,d.errback,)+args, kw) 118 125 return d 119 126 120 def __getstate__(self): 121 return {'dbapiName': self.dbapiName, 122 'noisy': self.noisy, 123 'min': self.min, 124 'max': self.max, 125 'connargs': self.connargs, 126 'connkw': self.connkw} 127 def runQuery(self, *args, **kw): 128 """Execute an SQL query and return the result. 127 129 128 def __setstate__(self, state): 129 self.__dict__ = state 130 apply(self.__init__, (self.dbapiName, )+self.connargs, self.connkw) 130 A DB-API cursor will will be invoked with cursor.execute(*args, **kw). 131 The exact nature of the arguments will depend on the specific flavor 132 of DB-API being used, but the first argument in *args be an SQL 133 statement. The result of a subsequent cursor.fetchall() will be 134 fired to the Deferred which is returned. If either the 'execute' or 135 'fetchall' methods raise an exception, the transaction will be rolled 136 back and a Failure returned. 137 138 @param *args,**kw: arguments to be passed to a DB-API cursor's 139 'execute' method. 140 141 @return: a Deferred which will fire the return value of a DB-API 142 cursor's 'fetchall' method, or a Failure. 143 """ 144 145 d = defer.Deferred() 146 apply(self.query, (d.callback, d.errback)+args, kw) 147 return d 148 149 def runOperation(self, *args, **kw): 150 """Execute an SQL query and return None. 151 152 A DB-API cursor will will be invoked with cursor.execute(*args, **kw). 153 The exact nature of the arguments will depend on the specific flavor 154 of DB-API being used, but the first argument in *args will be an SQL 155 statement. This method will not attempt to fetch any results from the 156 query and is thus suitable for INSERT, DELETE, and other SQL statements 157 which do not return values. If the 'execute' method raises an exception, 158 the transaction will be rolled back and a Failure returned. 159 160 @param *args,**kw: arguments to be passed to a DB-API cursor's 161 'execute' method. 162 163 @return: a Deferred which will fire None or a Failure. 164 """ 165 166 d = defer.Deferred() 167 apply(self.operation, (d.callback, d.errback)+args, kw) 168 return d 169 170 def close(self): 171 """Close all pool connections and shutdown the pool. 172 173 Connections will be closed even if they are in use! 174 """ 175 176 from twisted.internet import reactor 177 reactor.removeSystemEventTrigger(self.shutdownID) 178 self.finalClose() 179 180 def finalClose(self): 181 """This should only be called by the shutdown trigger.""" 182 183 self.threadpool.stop() 184 for connection in self.connections.values(): 185 if self.noisy: 186 log.msg('adbapi closing: %s %s%s' % (self.dbapiName, 187 self.connargs or '', 188 self.connkw or '')) 189 connection.close() 190 self.connections.clear() 131 191 132 192 def connect(self): 133 """ Should be run in thread, blocks.193 """Return a database connection when one becomes available. This method blocks and should be run in a thread from the internal threadpool. 134 194 135 195 Don't call this method directly from non-threaded twisted code. 196 197 @return: a database connection from the pool. 136 198 """ 199 137 200 tid = self.threadID() 138 201 conn = self.connections.get(tid) 139 if not conn: 202 if conn is None: 203 if self.noisy: 204 log.msg('adbapi connecting: %s %s%s' % (self.dbapiName, 205 self.connargs or '', 206 self.connkw or '')) 140 207 conn = apply(self.dbapi.connect, self.connargs, self.connkw) 141 208 self.connections[tid] = conn 142 if self.noisy:143 log.msg('adbapi connecting: %s %s%s' %144 ( self.dbapiName, self.connargs or '', self.connkw or ''))145 209 return conn 146 210 211 def _runInteraction(self, interaction, *args, **kw): 212 trans = Transaction(self, self.connect()) 213 try: 214 result = apply(interaction, (trans,)+args, kw) 215 trans.close() 216 trans._connection.commit() 217 return result 218 except: 219 log.msg('Exception in SQL interaction. Rolling back.') 220 log.deferr() 221 trans._connection.rollback() 222 raise 223 147 224 def _runQuery(self, args, kw): 148 225 conn = self.connect() 149 226 curs = conn.cursor() … … 154 231 conn.commit() 155 232 return result 156 233 except: 234 log.msg('Exception in SQL query. Rolling back.') 235 log.deferr() 157 236 conn.rollback() 158 237 raise 159 238 160 239 def _runOperation(self, args, kw): 161 240 conn = self.connect() 162 241 curs = conn.cursor() 163 164 242 try: 165 243 apply(curs.execute, args, kw) 166 result = None167 244 curs.close() 168 245 conn.commit() 169 246 except: 170 # XXX - failures aren't working here 247 log.msg('Exception in SQL operation. Rolling back.') 248 log.deferr() 171 249 conn.rollback() 172 250 raise 173 return result 251 252 def __getstate__(self): 253 return {'dbapiName': self.dbapiName, 254 'noisy': self.noisy, 255 'min': self.min, 256 'max': self.max, 257 'connargs': self.connargs, 258 'connkw': self.connkw} 259 260 def __setstate__(self, state): 261 self.__dict__ = state 262 apply(self.__init__, (self.dbapiName, )+self.connargs, self.connkw) 263 264 def _deferToThread(self, f, *args, **kwargs): 265 """Internal function. 266 267 Call f in one of the connection pool's threads. 268 """ 269 270 d = defer.Deferred() 271 self.threadpool.callInThread(threads._putResultInDeferred, 272 d, f, args, kwargs) 273 return d 174 274 175 275 def query(self, callback, errback, *args, **kw): 176 276 # this will be deprecated ASAP 177 threads.deferToThread(self._runQuery, args, kw).addCallbacks(277 self._deferToThread(self._runQuery, args, kw).addCallbacks( 178 278 callback, errback) 179 279 180 280 def operation(self, callback, errback, *args, **kw): 181 281 # this will be deprecated ASAP 182 threads.deferToThread(self._runOperation, args, kw).addCallbacks(282 self._deferToThread(self._runOperation, args, kw).addCallbacks( 183 283 callback, errback) 184 284 185 285 def synchronousOperation(self, *args, **kw): … … 187 287 188 288 def interaction(self, interaction, callback, errback, *args, **kw): 189 289 # this will be deprecated ASAP 190 apply( threads.deferToThread,290 apply(self._deferToThread, 191 291 (self._runInteraction, interaction) + args, kw).addCallbacks( 192 292 callback, errback) 193 293 194 def runOperation(self, *args, **kw):195 """Run a SQL statement and return a Deferred of result."""196 d = defer.Deferred()197 apply(self.operation, (d.callback,d.errback)+args, kw)198 return d199 200 def runQuery(self, *args, **kw):201 """Run a read-only query and return a Deferred."""202 d = defer.Deferred()203 apply(self.query, (d.callback, d.errback)+args, kw)204 return d205 206 def _runInteraction(self, interaction, *args, **kw):207 trans = Transaction(self, self.connect())208 try:209 result = apply(interaction, (trans,)+args, kw)210 except:211 log.msg('Exception in SQL interaction! rolling back...')212 log.deferr()213 trans._connection.rollback()214 raise215 else:216 trans._cursor.close()217 trans._connection.commit()218 return result219 220 def close(self):221 from twisted.internet import reactor222 reactor.removeSystemEventTrigger(self.shutdownID)223 self.finalClose()224 225 def finalClose(self):226 for connection in self.connections.values():227 if self.noisy:228 log.msg('adbapi closing: %s %s%s' % (self.dbapiName,229 self.connargs or '',230 self.connkw or ''))231 connection.close()232 294 233 295 class Augmentation: 234 296 '''A class which augments a database connector with some functionality. … … 242 304 243 305 def __init__(self, dbpool): 244 306 self.dbpool = dbpool 245 #self.createSchema()246 307 247 308 def __setstate__(self, state): 248 309 self.__dict__ = state 249 #self.createSchema()250 310 251 311 def operationDone(self, done): 252 312 """Example callback for database operation success. -
twisted/internet/base.py
RCS file: /cvs/Twisted/twisted/internet/base.py,v retrieving revision 1.58 diff -u -r1.58 base.py
163 163 def __init__(self): 164 164 self._eventTriggers = {} 165 165 self._pendingTimedCalls = [] 166 self.running = 0 166 167 self.waker = None 167 168 self.resolver = None 168 169 self.usingThreads = 0 … … 349 350 "after": 2}[phase] 350 351 ].remove(item) 351 352 353 def callWhenRunning(self, callable, *args, **kw): 354 """See twisted.internet.interfaces.IReactorCore.callWhenRunning. 355 """ 356 if self.running: 357 callable(*args, **kw) 358 else: 359 self.addSystemEventTrigger('after', 'startup', 360 callable, *args, **kw) 352 361 353 362 # IReactorTime 354 363 -
twisted/internet/interfaces.py
RCS file: /cvs/Twisted/twisted/internet/interfaces.py,v retrieving revision 1.88 diff -u -r1.88 interfaces.py
402 402 403 403 @param args: the arguments to call it with. 404 404 405 @param kw: the ykeyword arguments to call it with.405 @param kw: the keyword arguments to call it with. 406 406 407 407 @returns: An L{IDelayedCall} object that can be used to cancel 408 408 the scheduled call, by calling its C{cancel()} method. … … 589 589 """Removes a trigger added with addSystemEventTrigger. 590 590 591 591 @param triggerID: a value returned from addSystemEventTrigger. 592 """ 593 594 def callWhenRunning(self, callable, *args, **kw): 595 """Call a function when the reactor is running. 596 597 If the reactor has not started, the callable will be scheduled 598 to run when it does start. Otherwise, the callable will be invoked 599 immediately. 600 601 @param callable: the callable object to call later. 602 603 @param args: the arguments to call it with. 604 605 @param kw: the keyword arguments to call it with. 606 607 @returns: None 592 608 """ 593 609 594 610 -
twisted/python/threadpool.py
RCS file: /cvs/Twisted/twisted/python/threadpool.py,v retrieving revision 1.20 diff -u -r1.20 threadpool.py
54 54 joined = 0 55 55 started = 0 56 56 workers = 0 57 58 def __init__(self, minthreads=5, maxthreads=20): 57 58 def __init__(self, minthreads=5, maxthreads=20, 59 init=None, *initargs, **initkw): 60 """Create a new threadpool. 61 62 @param minthreads: minimum number of threads in the pool 63 64 @param maxthreads: maximum number of threads in the pool 65 66 @param init: initialization function called from new threads 67 68 @param *initargs, **initkw: additional arguments to be passed to 'init' 69 """ 70 59 71 assert minthreads <= maxthreads, 'minimum is greater than maximum' 60 72 self.q = Queue.Queue(0) 61 73 self.min = minthreads 62 74 self.max = maxthreads 75 self.init = init 76 self.initargs = initargs 77 self.initkw = initkw 63 78 if runtime.platform.getType() != "java": 64 79 self.waiters = [] 65 80 else: 66 81 self.waiters = ThreadSafeList() 67 82 self.threads = [] 68 83 self.working = {} 69 84 70 85 def start(self): 71 86 """Start the threadpool. 72 87 """ 73 self.workers = self.min88 self.workers = min(max(self.min, self.q.qsize()), self.max) 74 89 self.joined = 0 75 90 self.started = 1 76 for i in range(self. min):91 for i in range(self.workers): 77 92 name = "PoolThread-%s-%s" % (id(self), i) 78 93 threading.Thread(target=self._worker, name=name).start() 79 94 … … 86 101 state['min'] = self.min 87 102 state['max'] = self.max 88 103 return state 89 104 90 105 def _startSomeWorkers(self): 91 106 if not self.waiters: 92 107 if self.workers < self.max: … … 124 139 self.callInThread(self._runWithCallback, callback, errback, func, args, kw) 125 140 126 141 def _worker(self): 142 if self.init: 143 self.init(*self.initargs, **self.initkw) 127 144 ct = threading.currentThread() 128 145 self.threads.append(ct) 129 146 130 147 while 1: 131 148 self.waiters.append(ct) 132 149 o = self.q.get() -
twisted/test/test_enterprise.py
RCS file: /cvs/Twisted/twisted/test/test_enterprise.py,v retrieving revision 1.16 diff -u -r1.16 test_enterprise.py
22 22 import os 23 23 import random 24 24 25 from twisted.trial.util import deferredResult26 25 from twisted.enterprise.row import RowObject 27 26 from twisted.enterprise.reflector import * 28 27 from twisted.enterprise.xmlreflector import XMLReflector 29 28 from twisted.enterprise.sqlreflector import SQLReflector 30 29 from twisted.enterprise.adbapi import ConnectionPool 31 30 from twisted.enterprise import util 31 from twisted.internet import defer 32 from twisted.trial.util import deferredResult, deferredError 33 from twisted.python import log 32 34 33 35 try: import gadfly 34 36 except: gadfly = None … … 89 91 ) 90 92 """ 91 93 94 simple_table_schema = """ 95 CREATE TABLE simple ( 96 x integer 97 ) 98 """ 99 92 100 def randomizeRow(row, nullsOK=1, trailingSpacesOK=1): 93 101 values = {} 94 102 for name, type in row.rowColumns: … … 298 306 DB_USER = 'twisted_test' 299 307 DB_PASS = 'twisted_test' 300 308 309 can_rollback = 1 310 301 311 reflectorClass = SQLReflector 302 312 303 313 def createReflector(self): 304 314 self.startDB() 305 315 self.dbpool = self.makePool() 316 self.dbpool.threadpool.start() # since the reactor never really starts 306 317 deferredResult(self.dbpool.runOperation(main_table_schema)) 307 318 deferredResult(self.dbpool.runOperation(child_table_schema)) 319 deferredResult(self.dbpool.runOperation(simple_table_schema)) 308 320 return self.reflectorClass(self.dbpool, [TestRow, ChildRow]) 309 321 310 322 def destroyReflector(self): 311 323 deferredResult(self.dbpool.runOperation('DROP TABLE testTable')) 312 324 deferredResult(self.dbpool.runOperation('DROP TABLE childTable')) 325 deferredResult(self.dbpool.runOperation('DROP TABLE simple')) 313 326 self.dbpool.close() 314 327 self.stopDB() 315 328 316 def startDB(self): pass 317 def stopDB(self): pass 329 def testPool(self): 330 # make sure failures are raised correctly 331 deferredError(self.dbpool.runQuery("select * from NOTABLE")) 332 deferredError(self.dbpool.runOperation("delete from * from NOTABLE")) 333 deferredError(self.dbpool.runInteraction(self.bad_interaction)) 334 log.flushErrors() 335 336 # verify simple table is empty 337 sql = "select count(1) from simple" 338 row = deferredResult(self.dbpool.runQuery(sql)) 339 self.failUnless(int(row[0][0]) == 0, "Interaction not rolled back") 340 341 # add some rows to simple table (runOperation) 342 for i in range(self.count): 343 sql = "insert into simple(x) values(%d)" % i 344 deferredResult(self.dbpool.runOperation(sql)) 345 346 # make sure they were added (runQuery) 347 sql = "select x from simple order by x"; 348 rows = deferredResult(self.dbpool.runQuery(sql)) 349 self.failUnless(len(rows) == self.count, "Wrong number of rows") 350 for i in range(self.count): 351 self.failUnless(len(rows[i]) == 1, "Wrong size row") 352 self.failUnless(rows[i][0] == i, "Values not returned.") 353 354 # runInteraction 355 deferredResult(self.dbpool.runInteraction(self.interaction)) 356 357 # give the pool a workout 358 ds = [] 359 for i in range(self.count): 360 sql = "select x from simple where x = %d" % i 361 ds.append(self.dbpool.runQuery(sql)) 362 dlist = defer.DeferredList(ds, fireOnOneErrback=1) 363 result = deferredResult(dlist) 364 for i in range(self.count): 365 self.failUnless(result[i][1][0][0] == i, "Value not returned") 366 367 # now delete everything 368 ds = [] 369 for i in range(self.count): 370 sql = "delete from simple where x = %d" % i 371 ds.append(self.dbpool.runOperation(sql)) 372 dlist = defer.DeferredList(ds, fireOnOneErrback=1) 373 deferredResult(dlist) 374 375 # verify simple table is empty 376 sql = "select count(1) from simple" 377 row = deferredResult(self.dbpool.runQuery(sql)) 378 self.failUnless(int(row[0][0]) == 0, "Interaction not rolled back") 379 380 def interaction(self, transaction): 381 transaction.execute("select x from simple order by x") 382 for i in range(self.count): 383 row = transaction.fetchone() 384 self.failUnless(len(row) == 1, "Wrong size row") 385 self.failUnless(row[0] == i, "Value not returned.") 386 # should test this, but gadfly throws an exception instead 387 #self.failUnless(transaction.fetchone() is None, "Too many rows") 388 389 def bad_interaction(self, transaction): 390 if self.can_rollback: 391 transaction.execute("insert into simple(x) values(0)") 318 392 393 transaction.execute("select * from NOTABLE") 319 394 320 class SinglePool(ConnectionPool): 321 """A pool for just one connection at a time. 322 Remove this when ConnectionPool is fixed. 323 """ 324 325 def __init__(self, connection): 326 self.connection = connection 327 328 def connect(self): 329 return self.connection 330 331 def close(self): 332 self.connection.close() 333 del self.connection 395 def startDB(self): pass 396 def stopDB(self): pass 334 397 335 398 336 399 class NoSlashSQLReflector(SQLReflector): … … 346 409 nullsOK = 0 347 410 DB_DIR = "./gadflyDB" 348 411 reflectorClass = NoSlashSQLReflector 412 can_rollback = 0 349 413 350 414 def startDB(self): 351 415 if not os.path.exists(self.DB_DIR): os.mkdir(self.DB_DIR) … … 359 423 conn.close() 360 424 361 425 def makePool(self): 362 return SinglePool(gadfly.gadfly(self.DB_NAME, self.DB_DIR))426 return ConnectionPool('gadfly', self.DB_NAME, self.DB_DIR, cp_max=1) 363 427 364 428 365 429 class SQLiteTestCase(SQLReflectorTestCase, unittest.TestCase): … … 375 439 if os.path.exists(self.database): os.unlink(self.database) 376 440 377 441 def makePool(self): 378 return SinglePool(sqlite.connect(database=self.database))442 return ConnectionPool('sqlite', database=self.database, cp_max=1) 379 443 380 444 381 445 class PostgresTestCase(SQLReflectorTestCase, unittest.TestCase): … … 384 448 385 449 def makePool(self): 386 450 return ConnectionPool('pyPgSQL.PgSQL', database=self.DB_NAME, 387 user=self.DB_USER, password=self.DB_PASS) 451 user=self.DB_USER, password=self.DB_PASS, 452 cp_min=0) 388 453 389 454 390 455 class MySQLTestCase(SQLReflectorTestCase, unittest.TestCase): … … 392 457 """ 393 458 394 459 trailingSpacesOK = 0 460 can_rollback = 0 395 461 396 462 def makePool(self): 397 463 return ConnectionPool('MySQLdb', db=self.DB_NAME, … … 410 476 411 477 412 478 if gadfly is None: GadflyTestCase.skip = 1 479 elif not getattr(gadfly, 'connect', None): gadfly.connect = gadfly.gadfly 480 413 481 if sqlite is None: SQLiteTestCase.skip = 1 414 482 415 483 if PgSQL is None: PostgresTestCase.skip = 1
