Ticket #83: twisted.3.diff
| File twisted.3.diff, 20.6 KB (added by davep, 10 years ago) |
|---|
-
twisted/enterprise/adbapi.py
RCS file: /cvs/Twisted/twisted/enterprise/adbapi.py,v retrieving revision 1.50 diff -u -r1.50 adbapi.py
25 25 26 26 class Transaction: 27 27 """ 28 I am a lightweight wrapper for a database'cursor' object. I relay28 I am a lightweight wrapper for a DB-API 'cursor' object. I relay 29 29 attribute access to the DB cursor. 30 30 """ 31 31 _cursor = None … … 46 46 class ConnectionPool(pb.Referenceable): 47 47 """I represent a pool of connections to a DB-API 2.0 compliant database. 48 48 49 You can pass the noisy arg which determines whether informational 50 log messages are generated during the pool's operation. 49 You can pass cp_min, cp_max or both to set the minimum and maximum 50 number of connections that will be opened by the pool. You can pass 51 the noisy arg which determines whether informational log messages are 52 generated during the pool's operation. 51 53 """ 52 noisy = 153 54 54 # XXX - make the min and max attributes (and cp_min and cp_max55 # kwargs to __init__) actually do something?56 m in = 357 max = 555 noisy = 1 # if true, generate informational log messages 56 min = 3 # minimum number of connections in pool 57 max = 5 # maximum number of connections in pool 58 started = 0 # 1 if threadpool has been started 58 59 59 60 def __init__(self, dbapiName, *connargs, **connkw): 60 61 """See ConnectionPool.__doc__ 61 62 """ 62 63 self.dbapiName = dbapiName 63 if self.noisy:64 log.msg("Connecting to database: %s %s %s" %65 (dbapiName, connargs, connkw))66 64 self.dbapi = reflect.namedModule(dbapiName) 67 65 68 66 if getattr(self.dbapi, 'apilevel', None) != '2.0': … … 74 72 self.connargs = connargs 75 73 self.connkw = connkw 76 74 77 import thread78 self.threadID = thread.get_ident79 self.connections = {}80 81 75 if connkw.has_key('cp_min'): 82 76 self.min = connkw['cp_min'] 83 77 del connkw['cp_min'] … … 90 84 self.noisy = connkw['cp_noisy'] 91 85 del connkw['cp_noisy'] 92 86 87 self.min = min(self.min, self.max) 88 self.max = max(self.min, self.max) 89 90 self.connections = {} # all connections, hashed on thread id 91 92 # these are optional so import them here 93 from twisted.python import threadpool 94 import thread 95 96 self.threadID = thread.get_ident 97 self.threadpool = threadpool.ThreadPool(self.min, self.max) 98 99 # TODO: start up min connections 100 93 101 from twisted.internet import reactor 102 reactor.callLater(0, self.threadpool.start) 94 103 self.shutdownID = reactor.addSystemEventTrigger('during', 'shutdown', 95 104 self.finalClose) 96 105 … … 98 107 """Interact with the database and return the result. 99 108 100 109 The 'interaction' is a callable object which will be executed in a 101 pooled thread. It will be passed an L{Transaction} object as an102 argument (whose interface is identical to that of the database cursor103 for your DB-API module of choice), and its results will be returned as104 a Deferred. If running the method raises an exception, the transaction105 will be rolled back. If the method returns a value, the transaction106 will be committed.110 thread using a pooled connection. It will be passed an L{Transaction} 111 object as an argument (whose interface is identical to that of the 112 database cursor for your DB-API module of choice), and its results 113 will be returned as a Deferred. If running the method raises an 114 exception, the transaction will be rolled back. If the method returns 115 a value, the transaction will be committed. 107 116 108 117 @param interaction: a callable object whose first argument is 109 118 L{adbapi.Transaction}. … … 117 126 apply(self.interaction, (interaction,d.callback,d.errback,)+args, kw) 118 127 return d 119 128 120 def __getstate__(self): 121 return {'dbapiName': self.dbapiName, 122 'noisy': self.noisy, 123 'min': self.min, 124 'max': self.max, 125 'connargs': self.connargs, 126 'connkw': self.connkw} 129 def runQuery(self, *args, **kw): 130 """Execute an SQL query and return the result. 127 131 128 def __setstate__(self, state): 129 self.__dict__ = state 130 apply(self.__init__, (self.dbapiName, )+self.connargs, self.connkw) 132 A DB-API cursor will will be invoked with cursor.execute(*args, **kw). 133 The exact nature of the arguments will depend on the specific flavor 134 of DB-API being used, but the first argument in *args be an SQL 135 statement. The result of a subsequent cursor.fetchall() will be 136 fired to the Deferred which is returned. If either the 'execute' or 137 'fetchall' methods raise an exception, the transaction will be rolled 138 back and a Failure returned. 139 140 @param *args,**kw: arguments to be passed to a DB-API cursor's 141 'execute' method. 142 143 @return: a Deferred which will fire the return value of a DB-API 144 cursor's 'fetchall' method, or a Failure. 145 """ 146 147 d = defer.Deferred() 148 apply(self.query, (d.callback, d.errback)+args, kw) 149 return d 150 151 def runOperation(self, *args, **kw): 152 """Execute an SQL query and return None. 153 154 A DB-API cursor will will be invoked with cursor.execute(*args, **kw). 155 The exact nature of the arguments will depend on the specific flavor 156 of DB-API being used, but the first argument in *args will be an SQL 157 statement. This method will not attempt to fetch any results from the 158 query and is thus suitable for INSERT, DELETE, and other SQL statements 159 which do not return values. If the 'execute' method raises an exception, 160 the transaction will be rolled back and a Failure returned. 161 162 @param *args,**kw: arguments to be passed to a DB-API cursor's 163 'execute' method. 164 165 @return: a Deferred which will fire None or a Failure. 166 """ 167 168 d = defer.Deferred() 169 apply(self.operation, (d.callback, d.errback)+args, kw) 170 return d 171 172 def close(self): 173 """Close all pool connections and shutdown the pool. 174 175 Connections will be closed even if they are in use! 176 """ 177 178 from twisted.internet import reactor 179 reactor.removeSystemEventTrigger(self.shutdownID) 180 self.finalClose() 181 182 def finalClose(self): 183 """This should only be called by the shutdown trigger.""" 184 185 self.threadpool.stop() 186 for connection in self.connections.values(): 187 if self.noisy: 188 log.msg('adbapi closing: %s %s%s' % (self.dbapiName, 189 self.connargs or '', 190 self.connkw or '')) 191 connection.close() 192 self.connections.clear() 131 193 132 194 def connect(self): 133 """ Should be run in thread, blocks.195 """Return a database connection when one becomes available. This method blocks and should be run in a thread from the internal threadpool. 134 196 135 197 Don't call this method directly from non-threaded twisted code. 198 199 @return: a database connection from the pool. 136 200 """ 201 137 202 tid = self.threadID() 138 203 conn = self.connections.get(tid) 139 if not conn: 204 if conn is None: 205 if self.noisy: 206 log.msg('adbapi connecting: %s %s%s' % (self.dbapiName, 207 self.connargs or '', 208 self.connkw or '')) 140 209 conn = apply(self.dbapi.connect, self.connargs, self.connkw) 141 210 self.connections[tid] = conn 142 if self.noisy:143 log.msg('adbapi connecting: %s %s%s' %144 ( self.dbapiName, self.connargs or '', self.connkw or ''))145 211 return conn 146 212 213 def _runInteraction(self, interaction, *args, **kw): 214 trans = Transaction(self, self.connect()) 215 try: 216 result = apply(interaction, (trans,)+args, kw) 217 trans.close() 218 trans._connection.commit() 219 return result 220 except: 221 log.msg('Exception in SQL interaction. Rolling back.') 222 log.deferr() 223 trans._connection.rollback() 224 raise 225 147 226 def _runQuery(self, args, kw): 148 227 conn = self.connect() 149 228 curs = conn.cursor() … … 154 233 conn.commit() 155 234 return result 156 235 except: 236 log.msg('Exception in SQL query. Rolling back.') 237 log.deferr() 157 238 conn.rollback() 158 239 raise 159 240 160 241 def _runOperation(self, args, kw): 161 242 conn = self.connect() 162 243 curs = conn.cursor() 163 164 244 try: 165 245 apply(curs.execute, args, kw) 166 result = None167 246 curs.close() 168 247 conn.commit() 169 248 except: 170 # XXX - failures aren't working here 249 log.msg('Exception in SQL operation. Rolling back.') 250 log.deferr() 171 251 conn.rollback() 172 252 raise 173 return result 253 254 def __getstate__(self): 255 return {'dbapiName': self.dbapiName, 256 'noisy': self.noisy, 257 'min': self.min, 258 'max': self.max, 259 'connargs': self.connargs, 260 'connkw': self.connkw} 261 262 def __setstate__(self, state): 263 self.__dict__ = state 264 apply(self.__init__, (self.dbapiName, )+self.connargs, self.connkw) 265 266 def _deferToThread(self, f, *args, **kwargs): 267 """Internal function. 268 269 Call f in one of the connection pool's threads. 270 """ 271 272 d = defer.Deferred() 273 self.threadpool.callInThread(threads._putResultInDeferred, 274 d, f, args, kwargs) 275 return d 174 276 175 277 def query(self, callback, errback, *args, **kw): 176 278 # this will be deprecated ASAP 177 threads.deferToThread(self._runQuery, args, kw).addCallbacks(279 self._deferToThread(self._runQuery, args, kw).addCallbacks( 178 280 callback, errback) 179 281 180 282 def operation(self, callback, errback, *args, **kw): 181 283 # this will be deprecated ASAP 182 threads.deferToThread(self._runOperation, args, kw).addCallbacks(284 self._deferToThread(self._runOperation, args, kw).addCallbacks( 183 285 callback, errback) 184 286 185 287 def synchronousOperation(self, *args, **kw): … … 187 289 188 290 def interaction(self, interaction, callback, errback, *args, **kw): 189 291 # this will be deprecated ASAP 190 apply( threads.deferToThread,292 apply(self._deferToThread, 191 293 (self._runInteraction, interaction) + args, kw).addCallbacks( 192 294 callback, errback) 193 295 194 def runOperation(self, *args, **kw):195 """Run a SQL statement and return a Deferred of result."""196 d = defer.Deferred()197 apply(self.operation, (d.callback,d.errback)+args, kw)198 return d199 200 def runQuery(self, *args, **kw):201 """Run a read-only query and return a Deferred."""202 d = defer.Deferred()203 apply(self.query, (d.callback, d.errback)+args, kw)204 return d205 206 def _runInteraction(self, interaction, *args, **kw):207 trans = Transaction(self, self.connect())208 try:209 result = apply(interaction, (trans,)+args, kw)210 except:211 log.msg('Exception in SQL interaction! rolling back...')212 log.deferr()213 trans._connection.rollback()214 raise215 else:216 trans._cursor.close()217 trans._connection.commit()218 return result219 220 def close(self):221 from twisted.internet import reactor222 reactor.removeSystemEventTrigger(self.shutdownID)223 self.finalClose()224 225 def finalClose(self):226 for connection in self.connections.values():227 if self.noisy:228 log.msg('adbapi closing: %s %s%s' % (self.dbapiName,229 self.connargs or '',230 self.connkw or ''))231 connection.close()232 296 233 297 class Augmentation: 234 298 '''A class which augments a database connector with some functionality. … … 242 306 243 307 def __init__(self, dbpool): 244 308 self.dbpool = dbpool 245 #self.createSchema()246 309 247 310 def __setstate__(self, state): 248 311 self.__dict__ = state 249 #self.createSchema()250 312 251 313 def operationDone(self, done): 252 314 """Example callback for database operation success. -
twisted/python/threadpool.py
RCS file: /cvs/Twisted/twisted/python/threadpool.py,v retrieving revision 1.20 diff -u -r1.20 threadpool.py
70 70 def start(self): 71 71 """Start the threadpool. 72 72 """ 73 self.workers = self.min73 self.workers = min(max(self.min, self.q.qsize()), self.max) 74 74 self.joined = 0 75 75 self.started = 1 76 for i in range(self. min):76 for i in range(self.workers): 77 77 name = "PoolThread-%s-%s" % (id(self), i) 78 78 threading.Thread(target=self._worker, name=name).start() 79 79 … … 107 107 self.q.put(o) 108 108 if self.started and not self.waiters: 109 109 self._startSomeWorkers() 110 110 111 111 def _runWithCallback(self, callback, errback, func, args, kwargs): 112 112 try: 113 113 result = apply(func, args, kwargs) -
twisted/test/test_enterprise.py
RCS file: /cvs/Twisted/twisted/test/test_enterprise.py,v retrieving revision 1.16 diff -u -r1.16 test_enterprise.py
22 22 import os 23 23 import random 24 24 25 from twisted.trial.util import deferredResult26 25 from twisted.enterprise.row import RowObject 27 26 from twisted.enterprise.reflector import * 28 27 from twisted.enterprise.xmlreflector import XMLReflector 29 28 from twisted.enterprise.sqlreflector import SQLReflector 30 29 from twisted.enterprise.adbapi import ConnectionPool 31 30 from twisted.enterprise import util 31 from twisted.internet import defer 32 from twisted.trial.util import deferredResult, deferredError 33 from twisted.python import log 32 34 33 35 try: import gadfly 34 36 except: gadfly = None … … 89 91 ) 90 92 """ 91 93 94 simple_table_schema = """ 95 CREATE TABLE simple ( 96 x integer 97 ) 98 """ 99 92 100 def randomizeRow(row, nullsOK=1, trailingSpacesOK=1): 93 101 values = {} 94 102 for name, type in row.rowColumns: … … 298 306 DB_USER = 'twisted_test' 299 307 DB_PASS = 'twisted_test' 300 308 309 can_rollback = 1 310 301 311 reflectorClass = SQLReflector 302 312 303 313 def createReflector(self): … … 305 315 self.dbpool = self.makePool() 306 316 deferredResult(self.dbpool.runOperation(main_table_schema)) 307 317 deferredResult(self.dbpool.runOperation(child_table_schema)) 318 deferredResult(self.dbpool.runOperation(simple_table_schema)) 308 319 return self.reflectorClass(self.dbpool, [TestRow, ChildRow]) 309 320 310 321 def destroyReflector(self): 311 322 deferredResult(self.dbpool.runOperation('DROP TABLE testTable')) 312 323 deferredResult(self.dbpool.runOperation('DROP TABLE childTable')) 324 deferredResult(self.dbpool.runOperation('DROP TABLE simple')) 313 325 self.dbpool.close() 314 326 self.stopDB() 315 327 316 def startDB(self): pass 317 def stopDB(self): pass 328 def testPool(self): 329 # make sure failures are raised correctly 330 deferredError(self.dbpool.runQuery("select * from NOTABLE")) 331 deferredError(self.dbpool.runOperation("delete from * from NOTABLE")) 332 deferredError(self.dbpool.runInteraction(self.bad_interaction)) 333 log.flushErrors() 334 335 # verify simple table is empty 336 sql = "select count(1) from simple" 337 row = deferredResult(self.dbpool.runQuery(sql)) 338 self.failUnless(int(row[0][0]) == 0, "Interaction not rolled back") 339 340 # add some rows to simple table (runOperation) 341 for i in range(self.count): 342 sql = "insert into simple(x) values(%d)" % i 343 deferredResult(self.dbpool.runOperation(sql)) 344 345 # make sure they were added (runQuery) 346 sql = "select x from simple order by x"; 347 rows = deferredResult(self.dbpool.runQuery(sql)) 348 self.failUnless(len(rows) == self.count, "Wrong number of rows") 349 for i in range(self.count): 350 self.failUnless(len(rows[i]) == 1, "Wrong size row") 351 self.failUnless(rows[i][0] == i, "Values not returned.") 352 353 # runInteraction 354 deferredResult(self.dbpool.runInteraction(self.interaction)) 355 356 # give the pool a workout 357 ds = [] 358 for i in range(self.count): 359 sql = "select x from simple where x = %d" % i 360 ds.append(self.dbpool.runQuery(sql)) 361 dlist = defer.DeferredList(ds, fireOnOneErrback=1) 362 result = deferredResult(dlist) 363 for i in range(self.count): 364 self.failUnless(result[i][1][0][0] == i, "Value not returned") 365 366 # now delete everything 367 ds = [] 368 for i in range(self.count): 369 sql = "delete from simple where x = %d" % i 370 ds.append(self.dbpool.runOperation(sql)) 371 dlist = defer.DeferredList(ds, fireOnOneErrback=1) 372 deferredResult(dlist) 373 374 # verify simple table is empty 375 sql = "select count(1) from simple" 376 row = deferredResult(self.dbpool.runQuery(sql)) 377 self.failUnless(int(row[0][0]) == 0, "Interaction not rolled back") 378 379 def interaction(self, transaction): 380 transaction.execute("select x from simple order by x") 381 for i in range(self.count): 382 row = transaction.fetchone() 383 self.failUnless(len(row) == 1, "Wrong size row") 384 self.failUnless(row[0] == i, "Value not returned.") 385 # should test this, but gadfly throws an exception instead 386 #self.failUnless(transaction.fetchone() is None, "Too many rows") 387 388 def bad_interaction(self, transaction): 389 if self.can_rollback: 390 transaction.execute("insert into simple(x) values(0)") 318 391 392 transaction.execute("select * from NOTABLE") 319 393 320 class SinglePool(ConnectionPool): 321 """A pool for just one connection at a time. 322 Remove this when ConnectionPool is fixed. 323 """ 324 325 def __init__(self, connection): 326 self.connection = connection 327 328 def connect(self): 329 return self.connection 330 331 def close(self): 332 self.connection.close() 333 del self.connection 394 def startDB(self): pass 395 def stopDB(self): pass 334 396 335 397 336 398 class NoSlashSQLReflector(SQLReflector): … … 346 408 nullsOK = 0 347 409 DB_DIR = "./gadflyDB" 348 410 reflectorClass = NoSlashSQLReflector 411 can_rollback = 0 349 412 350 413 def startDB(self): 351 414 if not os.path.exists(self.DB_DIR): os.mkdir(self.DB_DIR) … … 359 422 conn.close() 360 423 361 424 def makePool(self): 362 return SinglePool(gadfly.gadfly(self.DB_NAME, self.DB_DIR))425 return ConnectionPool('gadfly', self.DB_NAME, self.DB_DIR, cp_max=1) 363 426 364 427 365 428 class SQLiteTestCase(SQLReflectorTestCase, unittest.TestCase): … … 375 438 if os.path.exists(self.database): os.unlink(self.database) 376 439 377 440 def makePool(self): 378 return SinglePool(sqlite.connect(database=self.database))441 return ConnectionPool('sqlite', database=self.database, cp_max=1) 379 442 380 443 381 444 class PostgresTestCase(SQLReflectorTestCase, unittest.TestCase): … … 384 447 385 448 def makePool(self): 386 449 return ConnectionPool('pyPgSQL.PgSQL', database=self.DB_NAME, 387 user=self.DB_USER, password=self.DB_PASS) 450 user=self.DB_USER, password=self.DB_PASS, 451 cp_min=0) 388 452 389 453 390 454 class MySQLTestCase(SQLReflectorTestCase, unittest.TestCase): … … 392 456 """ 393 457 394 458 trailingSpacesOK = 0 459 can_rollback = 0 395 460 396 461 def makePool(self): 397 462 return ConnectionPool('MySQLdb', db=self.DB_NAME, … … 410 475 411 476 412 477 if gadfly is None: GadflyTestCase.skip = 1 478 elif not getattr(gadfly, 'connect', None): gadfly.connect = gadfly.gadfly 479 413 480 if sqlite is None: SQLiteTestCase.skip = 1 414 481 415 482 if PgSQL is None: PostgresTestCase.skip = 1
