Ticket #83: twisted.2.diff
| File twisted.2.diff, 19.5 KB (added by davep, 10 years ago) |
|---|
-
twisted/enterprise/adbapi.py
RCS file: /cvs/Twisted/twisted/enterprise/adbapi.py,v retrieving revision 1.50 diff -u -r1.50 adbapi.py
25 25 26 26 class Transaction: 27 27 """ 28 I am a lightweight wrapper for a database'cursor' object. I relay28 I am a lightweight wrapper for a DB-API 'cursor' object. I relay 29 29 attribute access to the DB cursor. 30 30 """ 31 31 _cursor = None … … 46 46 class ConnectionPool(pb.Referenceable): 47 47 """I represent a pool of connections to a DB-API 2.0 compliant database. 48 48 49 You can pass the noisy arg which determines whether informational 50 log messages are generated during the pool's operation. 49 You can pass cp_min, cp_max or both to set the minimum and maximum 50 number of connections that will be opened by the pool. You can pass 51 the noisy arg which determines whether informational log messages are 52 generated during the pool's operation. 51 53 """ 52 noisy = 153 54 54 # XXX - make the min and max attributes (and cp_min and cp_max 55 # kwargs to __init__) actually do something? 56 min = 3 57 max = 5 55 noisy = 1 # if true, generate informational log messages 56 min = 3 # minimum number of connections in pool 57 max = 5 # maximum number of connections in pool 58 58 59 59 def __init__(self, dbapiName, *connargs, **connkw): 60 60 """See ConnectionPool.__doc__ 61 61 """ 62 62 self.dbapiName = dbapiName 63 if self.noisy:64 log.msg("Connecting to database: %s %s %s" %65 (dbapiName, connargs, connkw))66 63 self.dbapi = reflect.namedModule(dbapiName) 67 64 68 65 if getattr(self.dbapi, 'apilevel', None) != '2.0': … … 74 71 self.connargs = connargs 75 72 self.connkw = connkw 76 73 77 import thread78 self.threadID = thread.get_ident79 self.connections = {}80 81 74 if connkw.has_key('cp_min'): 82 75 self.min = connkw['cp_min'] 83 76 del connkw['cp_min'] … … 90 83 self.noisy = connkw['cp_noisy'] 91 84 del connkw['cp_noisy'] 92 85 86 self.min = min(self.min, self.max) 87 self.max = max(self.min, self.max) 88 89 self.connections = {} # all connections, hashed on thread id 90 91 # these are optional so import them here 92 from twisted.python import threadpool 93 import thread 94 95 self.threadID = thread.get_ident 96 self.threadpool = threadpool.ThreadPool(self.min, self.max) 97 self.threadpool.start() 98 99 # TODO: start up min connections 100 93 101 from twisted.internet import reactor 94 102 self.shutdownID = reactor.addSystemEventTrigger('during', 'shutdown', 95 103 self.finalClose) … … 98 106 """Interact with the database and return the result. 99 107 100 108 The 'interaction' is a callable object which will be executed in a 101 pooled thread. It will be passed an L{Transaction} object as an102 argument (whose interface is identical to that of the database cursor103 for your DB-API module of choice), and its results will be returned as104 a Deferred. If running the method raises an exception, the transaction105 will be rolled back. If the method returns a value, the transaction106 will be committed.109 thread using a pooled connection. It will be passed an L{Transaction} 110 object as an argument (whose interface is identical to that of the 111 database cursor for your DB-API module of choice), and its results 112 will be returned as a Deferred. If running the method raises an 113 exception, the transaction will be rolled back. If the method returns 114 a value, the transaction will be committed. 107 115 108 116 @param interaction: a callable object whose first argument is 109 117 L{adbapi.Transaction}. … … 117 125 apply(self.interaction, (interaction,d.callback,d.errback,)+args, kw) 118 126 return d 119 127 120 def __getstate__(self): 121 return {'dbapiName': self.dbapiName, 122 'noisy': self.noisy, 123 'min': self.min, 124 'max': self.max, 125 'connargs': self.connargs, 126 'connkw': self.connkw} 128 def runQuery(self, *args, **kw): 129 """Execute an SQL query and return the result. 127 130 128 def __setstate__(self, state): 129 self.__dict__ = state 130 apply(self.__init__, (self.dbapiName, )+self.connargs, self.connkw) 131 A DB-API cursor will will be invoked with cursor.execute(*args, **kw). 132 The exact nature of the arguments will depend on the specific flavor 133 of DB-API being used, but the first argument in *args be an SQL 134 statement. The result of a subsequent cursor.fetchall() will be 135 fired to the Deferred which is returned. If either the 'execute' or 136 'fetchall' methods raise an exception, the transaction will be rolled 137 back and a Failure returned. 138 139 @param *args,**kw: arguments to be passed to a DB-API cursor's 140 'execute' method. 141 142 @return: a Deferred which will fire the return value of a DB-API 143 cursor's 'fetchall' method, or a Failure. 144 """ 145 146 d = defer.Deferred() 147 apply(self.query, (d.callback, d.errback)+args, kw) 148 return d 149 150 def runOperation(self, *args, **kw): 151 """Execute an SQL query and return None. 152 153 A DB-API cursor will will be invoked with cursor.execute(*args, **kw). 154 The exact nature of the arguments will depend on the specific flavor 155 of DB-API being used, but the first argument in *args will be an SQL 156 statement. This method will not attempt to fetch any results from the 157 query and is thus suitable for INSERT, DELETE, and other SQL statements 158 which do not return values. If the 'execute' method raises an exception, 159 the transaction will be rolled back and a Failure returned. 160 161 @param *args,**kw: arguments to be passed to a DB-API cursor's 162 'execute' method. 163 164 @return: a Deferred which will fire None or a Failure. 165 """ 166 167 d = defer.Deferred() 168 apply(self.operation, (d.callback, d.errback)+args, kw) 169 return d 170 171 def close(self): 172 """Close all pool connections and shutdown the pool. 173 174 Connections will be closed even if they are in use! 175 """ 176 177 from twisted.internet import reactor 178 reactor.removeSystemEventTrigger(self.shutdownID) 179 self.finalClose() 180 181 def finalClose(self): 182 """This should only be called by the shutdown trigger.""" 183 184 self.threadpool.stop() 185 for connection in self.connections.values(): 186 if self.noisy: 187 log.msg('adbapi closing: %s %s%s' % (self.dbapiName, 188 self.connargs or '', 189 self.connkw or '')) 190 connection.close() 191 self.connections.clear() 131 192 132 193 def connect(self): 133 """ Should be run in thread, blocks.194 """Return a database connection when one becomes available. This method blocks and should be run in a thread from the internal threadpool. 134 195 135 196 Don't call this method directly from non-threaded twisted code. 197 198 @return: a database connection from the pool. 136 199 """ 200 137 201 tid = self.threadID() 138 202 conn = self.connections.get(tid) 139 if not conn: 203 if conn is None: 204 if self.noisy: 205 log.msg('adbapi connecting: %s %s%s' % (self.dbapiName, 206 self.connargs or '', 207 self.connkw or '')) 140 208 conn = apply(self.dbapi.connect, self.connargs, self.connkw) 141 209 self.connections[tid] = conn 142 if self.noisy:143 log.msg('adbapi connecting: %s %s%s' %144 ( self.dbapiName, self.connargs or '', self.connkw or ''))145 210 return conn 146 211 212 def _runInteraction(self, interaction, *args, **kw): 213 trans = Transaction(self, self.connect()) 214 try: 215 result = apply(interaction, (trans,)+args, kw) 216 trans.close() 217 trans._connection.commit() 218 return result 219 except: 220 log.msg('Exception in SQL interaction. Rolling back.') 221 log.deferr() 222 trans._connection.rollback() 223 raise 224 147 225 def _runQuery(self, args, kw): 148 226 conn = self.connect() 149 227 curs = conn.cursor() … … 154 232 conn.commit() 155 233 return result 156 234 except: 235 log.msg('Exception in SQL query. Rolling back.') 236 log.deferr() 157 237 conn.rollback() 158 238 raise 159 239 160 240 def _runOperation(self, args, kw): 161 241 conn = self.connect() 162 242 curs = conn.cursor() 163 164 243 try: 165 244 apply(curs.execute, args, kw) 166 result = None167 245 curs.close() 168 246 conn.commit() 169 247 except: 170 # XXX - failures aren't working here 248 log.msg('Exception in SQL operation. Rolling back.') 249 log.deferr() 171 250 conn.rollback() 172 251 raise 173 return result 252 253 def __getstate__(self): 254 return {'dbapiName': self.dbapiName, 255 'noisy': self.noisy, 256 'min': self.min, 257 'max': self.max, 258 'connargs': self.connargs, 259 'connkw': self.connkw} 260 261 def __setstate__(self, state): 262 self.__dict__ = state 263 apply(self.__init__, (self.dbapiName, )+self.connargs, self.connkw) 264 265 def deferToThread(self, f, *args, **kwargs): 266 """Internal function. 267 268 Call f in one of the connection pool's threads. 269 """ 270 271 d = defer.Deferred() 272 self.threadpool.callInThread(threads._putResultInDeferred, 273 d, f, args, kwargs) 274 return d 174 275 175 276 def query(self, callback, errback, *args, **kw): 176 277 # this will be deprecated ASAP 177 threads.deferToThread(self._runQuery, args, kw).addCallbacks(278 self.deferToThread(self._runQuery, args, kw).addCallbacks( 178 279 callback, errback) 179 280 180 281 def operation(self, callback, errback, *args, **kw): 181 282 # this will be deprecated ASAP 182 threads.deferToThread(self._runOperation, args, kw).addCallbacks(283 self.deferToThread(self._runOperation, args, kw).addCallbacks( 183 284 callback, errback) 184 285 185 286 def synchronousOperation(self, *args, **kw): … … 187 288 188 289 def interaction(self, interaction, callback, errback, *args, **kw): 189 290 # this will be deprecated ASAP 190 apply( threads.deferToThread,291 apply(self.deferToThread, 191 292 (self._runInteraction, interaction) + args, kw).addCallbacks( 192 293 callback, errback) 193 294 194 def runOperation(self, *args, **kw):195 """Run a SQL statement and return a Deferred of result."""196 d = defer.Deferred()197 apply(self.operation, (d.callback,d.errback)+args, kw)198 return d199 200 def runQuery(self, *args, **kw):201 """Run a read-only query and return a Deferred."""202 d = defer.Deferred()203 apply(self.query, (d.callback, d.errback)+args, kw)204 return d205 206 def _runInteraction(self, interaction, *args, **kw):207 trans = Transaction(self, self.connect())208 try:209 result = apply(interaction, (trans,)+args, kw)210 except:211 log.msg('Exception in SQL interaction! rolling back...')212 log.deferr()213 trans._connection.rollback()214 raise215 else:216 trans._cursor.close()217 trans._connection.commit()218 return result219 220 def close(self):221 from twisted.internet import reactor222 reactor.removeSystemEventTrigger(self.shutdownID)223 self.finalClose()224 225 def finalClose(self):226 for connection in self.connections.values():227 if self.noisy:228 log.msg('adbapi closing: %s %s%s' % (self.dbapiName,229 self.connargs or '',230 self.connkw or ''))231 connection.close()232 295 233 296 class Augmentation: 234 297 '''A class which augments a database connector with some functionality. … … 242 305 243 306 def __init__(self, dbpool): 244 307 self.dbpool = dbpool 245 #self.createSchema()246 308 247 309 def __setstate__(self, state): 248 310 self.__dict__ = state 249 #self.createSchema()250 311 251 312 def operationDone(self, done): 252 313 """Example callback for database operation success. -
twisted/test/test_enterprise.py
RCS file: /cvs/Twisted/twisted/test/test_enterprise.py,v retrieving revision 1.16 diff -u -r1.16 test_enterprise.py
22 22 import os 23 23 import random 24 24 25 from twisted.trial.util import deferredResult26 25 from twisted.enterprise.row import RowObject 27 26 from twisted.enterprise.reflector import * 28 27 from twisted.enterprise.xmlreflector import XMLReflector 29 28 from twisted.enterprise.sqlreflector import SQLReflector 30 29 from twisted.enterprise.adbapi import ConnectionPool 31 30 from twisted.enterprise import util 31 from twisted.internet import defer 32 from twisted.trial.util import deferredResult, deferredError 33 from twisted.python import log 32 34 33 35 try: import gadfly 34 36 except: gadfly = None … … 89 91 ) 90 92 """ 91 93 94 simple_table_schema = """ 95 CREATE TABLE simple ( 96 x integer 97 ) 98 """ 99 92 100 def randomizeRow(row, nullsOK=1, trailingSpacesOK=1): 93 101 values = {} 94 102 for name, type in row.rowColumns: … … 298 306 DB_USER = 'twisted_test' 299 307 DB_PASS = 'twisted_test' 300 308 309 can_rollback = 1 310 301 311 reflectorClass = SQLReflector 302 312 303 313 def createReflector(self): … … 305 315 self.dbpool = self.makePool() 306 316 deferredResult(self.dbpool.runOperation(main_table_schema)) 307 317 deferredResult(self.dbpool.runOperation(child_table_schema)) 318 deferredResult(self.dbpool.runOperation(simple_table_schema)) 308 319 return self.reflectorClass(self.dbpool, [TestRow, ChildRow]) 309 320 310 321 def destroyReflector(self): 311 322 deferredResult(self.dbpool.runOperation('DROP TABLE testTable')) 312 323 deferredResult(self.dbpool.runOperation('DROP TABLE childTable')) 324 deferredResult(self.dbpool.runOperation('DROP TABLE simple')) 313 325 self.dbpool.close() 314 326 self.stopDB() 315 327 316 def startDB(self): pass 317 def stopDB(self): pass 328 def testPool(self): 329 # make sure failures are raised correctly 330 deferredError(self.dbpool.runQuery("select * from NOTABLE")) 331 deferredError(self.dbpool.runOperation("delete from * from NOTABLE")) 332 deferredError(self.dbpool.runInteraction(self.bad_interaction)) 333 log.flushErrors() 334 335 # verify simple table is empty 336 sql = "select count(1) from simple" 337 row = deferredResult(self.dbpool.runQuery(sql)) 338 self.failUnless(int(row[0][0]) == 0, "Interaction not rolled back") 339 340 # add some rows to simple table (runOperation) 341 for i in range(self.count): 342 sql = "insert into simple(x) values(%d)" % i 343 deferredResult(self.dbpool.runOperation(sql)) 344 345 # make sure they were added (runQuery) 346 sql = "select x from simple order by x"; 347 rows = deferredResult(self.dbpool.runQuery(sql)) 348 self.failUnless(len(rows) == self.count, "Wrong number of rows") 349 for i in range(self.count): 350 self.failUnless(len(rows[i]) == 1, "Wrong size row") 351 self.failUnless(rows[i][0] == i, "Values not returned.") 352 353 # runInteraction 354 deferredResult(self.dbpool.runInteraction(self.interaction)) 355 356 # give the pool a workout 357 ds = [] 358 for i in range(self.count): 359 sql = "select x from simple where x = %d" % i 360 ds.append(self.dbpool.runQuery(sql)) 361 dlist = defer.DeferredList(ds, fireOnOneErrback=1) 362 result = deferredResult(dlist) 363 for i in range(self.count): 364 self.failUnless(result[i][1][0][0] == i, "Value not returned") 365 366 # now delete everything 367 ds = [] 368 for i in range(self.count): 369 sql = "delete from simple where x = %d" % i 370 ds.append(self.dbpool.runOperation(sql)) 371 dlist = defer.DeferredList(ds, fireOnOneErrback=1) 372 deferredResult(dlist) 373 374 # verify simple table is empty 375 sql = "select count(1) from simple" 376 row = deferredResult(self.dbpool.runQuery(sql)) 377 self.failUnless(int(row[0][0]) == 0, "Interaction not rolled back") 378 379 def interaction(self, transaction): 380 transaction.execute("select x from simple order by x") 381 for i in range(self.count): 382 row = transaction.fetchone() 383 self.failUnless(len(row) == 1, "Wrong size row") 384 self.failUnless(row[0] == i, "Value not returned.") 385 # should test this, but gadfly throws an exception instead 386 #self.failUnless(transaction.fetchone() is None, "Too many rows") 387 388 def bad_interaction(self, transaction): 389 if self.can_rollback: 390 transaction.execute("insert into simple(x) values(0)") 318 391 392 transaction.execute("select * from NOTABLE") 319 393 320 class SinglePool(ConnectionPool): 321 """A pool for just one connection at a time. 322 Remove this when ConnectionPool is fixed. 323 """ 324 325 def __init__(self, connection): 326 self.connection = connection 327 328 def connect(self): 329 return self.connection 330 331 def close(self): 332 self.connection.close() 333 del self.connection 394 def startDB(self): pass 395 def stopDB(self): pass 334 396 335 397 336 398 class NoSlashSQLReflector(SQLReflector): … … 346 408 nullsOK = 0 347 409 DB_DIR = "./gadflyDB" 348 410 reflectorClass = NoSlashSQLReflector 411 can_rollback = 0 349 412 350 413 def startDB(self): 351 414 if not os.path.exists(self.DB_DIR): os.mkdir(self.DB_DIR) … … 359 422 conn.close() 360 423 361 424 def makePool(self): 362 return SinglePool(gadfly.gadfly(self.DB_NAME, self.DB_DIR))425 return ConnectionPool('gadfly', self.DB_NAME, self.DB_DIR, cp_max=1) 363 426 364 427 365 428 class SQLiteTestCase(SQLReflectorTestCase, unittest.TestCase): … … 375 438 if os.path.exists(self.database): os.unlink(self.database) 376 439 377 440 def makePool(self): 378 return SinglePool(sqlite.connect(database=self.database))441 return ConnectionPool('sqlite', database=self.database, cp_max=1) 379 442 380 443 381 444 class PostgresTestCase(SQLReflectorTestCase, unittest.TestCase): … … 384 447 385 448 def makePool(self): 386 449 return ConnectionPool('pyPgSQL.PgSQL', database=self.DB_NAME, 387 user=self.DB_USER, password=self.DB_PASS) 450 user=self.DB_USER, password=self.DB_PASS, 451 cp_min=0) 388 452 389 453 390 454 class MySQLTestCase(SQLReflectorTestCase, unittest.TestCase): … … 392 456 """ 393 457 394 458 trailingSpacesOK = 0 459 can_rollback = 0 395 460 396 461 def makePool(self): 397 462 return ConnectionPool('MySQLdb', db=self.DB_NAME, … … 410 475 411 476 412 477 if gadfly is None: GadflyTestCase.skip = 1 478 elif not getattr(gadfly, 'connect', None): gadfly.connect = gadfly.gadfly 479 413 480 if sqlite is None: SQLiteTestCase.skip = 1 414 481 415 482 if PgSQL is None: PostgresTestCase.skip = 1
