Ticket #83: twisted.diff
| File twisted.diff, 20.5 KB (added by davep, 10 years ago) |
|---|
-
ChangeLog
RCS file: /cvs/Twisted/ChangeLog,v retrieving revision 1.416 diff -u -r1.416 ChangeLog
1 2003-07-14 Dave Peticolas <dave@krondo.com> 2 3 * twisted/test/test_enterprise.py: more tests. 4 5 * twisted/enterprise/adbapi.py: obey cp_min & cp_max args. 6 add doc strings. 7 1 8 2003-07-12 Christopher Armstrong <radix@twistedmatrix.com> 2 9 3 10 * twisted/web/util.py: Add a new ChildRedirector that, when placed -
twisted/enterprise/adbapi.py
RCS file: /cvs/Twisted/twisted/enterprise/adbapi.py,v retrieving revision 1.50 diff -u -r1.50 adbapi.py
25 25 26 26 class Transaction: 27 27 """ 28 I am a lightweight wrapper for a database'cursor' object. I relay28 I am a lightweight wrapper for a DB-API 'cursor' object. I relay 29 29 attribute access to the DB cursor. 30 30 """ 31 31 _cursor = None … … 46 46 class ConnectionPool(pb.Referenceable): 47 47 """I represent a pool of connections to a DB-API 2.0 compliant database. 48 48 49 You can pass the noisy arg which determines whether informational 50 log messages are generated during the pool's operation. 49 You can pass cp_min, cp_max or both to set the minimum and maximum 50 number of connections that will be opened by the pool. You can pass 51 the noisy arg which determines whether informational log messages are 52 generated during the pool's operation. 51 53 """ 52 noisy = 153 54 54 # XXX - make the min and max attributes (and cp_min and cp_max 55 # kwargs to __init__) actually do something? 56 min = 3 57 max = 5 55 noisy = 1 # if true, generate informational log messages 56 min = 3 # minimum number of connections in pool 57 max = 5 # maximum number of connections in pool 58 58 59 59 def __init__(self, dbapiName, *connargs, **connkw): 60 60 """See ConnectionPool.__doc__ 61 61 """ 62 62 self.dbapiName = dbapiName 63 if self.noisy:64 log.msg("Connecting to database: %s %s %s" %65 (dbapiName, connargs, connkw))66 63 self.dbapi = reflect.namedModule(dbapiName) 67 64 68 65 if getattr(self.dbapi, 'apilevel', None) != '2.0': … … 74 71 self.connargs = connargs 75 72 self.connkw = connkw 76 73 77 import thread78 self.threadID = thread.get_ident79 self.connections = {}80 81 74 if connkw.has_key('cp_min'): 82 75 self.min = connkw['cp_min'] 83 76 del connkw['cp_min'] … … 90 83 self.noisy = connkw['cp_noisy'] 91 84 del connkw['cp_noisy'] 92 85 86 self.min = min(self.min, self.max) 87 self.max = max(self.min, self.max) 88 89 import threading # threading is an optional module so import it here 90 self.connsem = threading.BoundedSemaphore(self.max) # limit access 91 self.freelock = threading.Lock() # protect free list 92 93 self.freelist = [] # connections which aren't being used 94 self.connections = {} # all connections 95 for _ in range(self.min): 96 if self.noisy: 97 log.msg('adbapi connecting: %s %s%s' % (self.dbapiName, 98 self.connargs or '', 99 self.connkw or '')) 100 conn = apply(self.dbapi.connect, self.connargs, self.connkw) 101 self.freelist.append(conn) 102 self.connections[conn] = conn 103 93 104 from twisted.internet import reactor 94 105 self.shutdownID = reactor.addSystemEventTrigger('during', 'shutdown', 95 106 self.finalClose) … … 98 109 """Interact with the database and return the result. 99 110 100 111 The 'interaction' is a callable object which will be executed in a 101 pooled thread. It will be passed an L{Transaction} object as an102 argument (whose interface is identical to that of the database cursor103 for your DB-API module of choice), and its results will be returned as104 a Deferred. If running the method raises an exception, the transaction105 will be rolled back. If the method returns a value, the transaction106 will be committed.112 thread using a pooled connection. It will be passed an L{Transaction} 113 object as an argument (whose interface is identical to that of the 114 database cursor for your DB-API module of choice), and its results 115 will be returned as a Deferred. If running the method raises an 116 exception, the transaction will be rolled back. If the method returns 117 a value, the transaction will be committed. 107 118 108 119 @param interaction: a callable object whose first argument is 109 120 L{adbapi.Transaction}. … … 117 128 apply(self.interaction, (interaction,d.callback,d.errback,)+args, kw) 118 129 return d 119 130 120 def __getstate__(self): 121 return {'dbapiName': self.dbapiName, 122 'noisy': self.noisy, 123 'min': self.min, 124 'max': self.max, 125 'connargs': self.connargs, 126 'connkw': self.connkw} 131 def runQuery(self, *args, **kw): 132 """Execute an SQL query and return the result. 127 133 128 def __setstate__(self, state): 129 self.__dict__ = state 130 apply(self.__init__, (self.dbapiName, )+self.connargs, self.connkw) 134 A DB-API cursor will will be invoked with cursor.execute(*args, **kw). 135 The exact nature of the arguments will depend on the specific flavor 136 of DB-API being used, but the first argument in *args be an SQL 137 statement. The result of a subsequent cursor.fetchall() will be 138 fired to the Deferred which is returned. If either the 'execute' or 139 'fetchall' methods raise an exception, the transaction will be rolled 140 back and a Failure returned. 141 142 @param *args,**kw: arguments to be passed to a DB-API cursor's 143 'execute' method. 144 145 @return: a Deferred which will fire the return value of a DB-API 146 cursor's 'fetchall' method, or a Failure. 147 """ 148 149 d = defer.Deferred() 150 apply(self.query, (d.callback, d.errback)+args, kw) 151 return d 152 153 def runOperation(self, *args, **kw): 154 """Execute an SQL query and return None. 155 156 A DB-API cursor will will be invoked with cursor.execute(*args, **kw). 157 The exact nature of the arguments will depend on the specific flavor 158 of DB-API being used, but the first argument in *args will be an SQL 159 statement. This method will not attempt to fetch any results from the 160 query and is thus suitable for INSERT, DELETE, and other SQL statements 161 which do not return values. If the 'execute' method raises an exception, 162 the transaction will be rolled back and a Failure returned. 163 164 @param *args,**kw: arguments to be passed to a DB-API cursor's 165 'execute' method. 166 167 @return: a Deferred which will fire None or a Failure. 168 """ 169 170 d = defer.Deferred() 171 apply(self.operation, (d.callback, d.errback)+args, kw) 172 return d 173 174 def close(self): 175 """Close all pool connections. 176 177 Connections will be closed even if they are in use! 178 """ 179 180 self.finalClose() 181 182 def finalClose(self): 183 """This should only be called by self.close() or the shutdown trigger. 184 """ 185 186 self.freelist[:] = [] 187 for connection in self.connections.values(): 188 if self.noisy: 189 log.msg('adbapi closing: %s %s%s' % (self.dbapiName, 190 self.connargs or '', 191 self.connkw or '')) 192 connection.close() 193 self.connections.clear() 131 194 132 195 def connect(self): 133 """ Should be run in thread, blocks.196 """Return a database connection when one becomes available. This method blocks and should be run in a thread. 134 197 135 Don't call this method directly from non-threaded twisted code. 198 Don't call this method directly from non-threaded twisted code. This 199 method acquires the connection semaphore. You must call dbpool.release 200 on the connection when you are finished. 201 202 @return: a database connection from the pool.. 136 203 """ 137 tid = self.threadID() 138 conn = self.connections.get(tid) 204 205 self.connsem.acquire() # wait for our turn 206 207 # see if there is a free connection available 208 self.freelock.acquire() 209 if self.freelist: conn = self.freelist.pop() 210 else: conn = None 211 self.freelock.release() 212 213 # otherwise make a new connection 139 214 if not conn: 140 conn = apply(self.dbapi.connect, self.connargs, self.connkw)141 self.connections[tid] = conn142 215 if self.noisy: 143 log.msg('adbapi connecting: %s %s%s' % 144 ( self.dbapiName, self.connargs or '', self.connkw or '')) 216 log.msg('adbapi connecting: %s %s%s' % (self.dbapiName, 217 self.connargs or '', 218 self.connkw or '')) 219 conn = apply(self.dbapi.connect, self.connargs, self.connkw) 220 self.connections[conn] = conn 145 221 return conn 146 222 223 def release(self, connection): 224 """Release a connection back into the pool. 225 226 @param connection: connection to be released. Must have been created 227 with self.connect. 228 """ 229 230 self.freelist.append(connection) # atomic, no need for lock 231 self.connsem.release() 232 233 def _runInteraction(self, interaction, *args, **kw): 234 trans = Transaction(self, self.connect()) 235 try: 236 result = apply(interaction, (trans,)+args, kw) 237 trans.close() 238 trans._connection.commit() 239 self.release(trans._connection) 240 return result 241 except: 242 log.msg('Exception in SQL interaction. Rolling back.') 243 log.deferr() 244 trans._connection.rollback() 245 self.release(trans._connection) 246 raise 247 147 248 def _runQuery(self, args, kw): 148 249 conn = self.connect() 149 250 curs = conn.cursor() … … 152 253 result = curs.fetchall() 153 254 curs.close() 154 255 conn.commit() 256 self.release(conn) 155 257 return result 156 258 except: 259 log.msg('Exception in SQL query. Rolling back.') 260 log.deferr() 157 261 conn.rollback() 262 self.release(conn) 158 263 raise 159 264 160 265 def _runOperation(self, args, kw): 161 266 conn = self.connect() 162 267 curs = conn.cursor() 163 164 268 try: 165 269 apply(curs.execute, args, kw) 166 result = None167 270 curs.close() 168 271 conn.commit() 272 self.release(conn) 169 273 except: 170 # XXX - failures aren't working here 274 log.msg('Exception in SQL operation. Rolling back.') 275 log.deferr() 171 276 conn.rollback() 277 self.release(conn) 172 278 raise 173 return result 279 280 def __getstate__(self): 281 return {'dbapiName': self.dbapiName, 282 'noisy': self.noisy, 283 'min': self.min, 284 'max': self.max, 285 'connargs': self.connargs, 286 'connkw': self.connkw} 287 288 def __setstate__(self, state): 289 self.__dict__ = state 290 apply(self.__init__, (self.dbapiName, )+self.connargs, self.connkw) 174 291 175 292 def query(self, callback, errback, *args, **kw): 176 293 # this will be deprecated ASAP … … 191 308 (self._runInteraction, interaction) + args, kw).addCallbacks( 192 309 callback, errback) 193 310 194 def runOperation(self, *args, **kw):195 """Run a SQL statement and return a Deferred of result."""196 d = defer.Deferred()197 apply(self.operation, (d.callback,d.errback)+args, kw)198 return d199 200 def runQuery(self, *args, **kw):201 """Run a read-only query and return a Deferred."""202 d = defer.Deferred()203 apply(self.query, (d.callback, d.errback)+args, kw)204 return d205 206 def _runInteraction(self, interaction, *args, **kw):207 trans = Transaction(self, self.connect())208 try:209 result = apply(interaction, (trans,)+args, kw)210 except:211 log.msg('Exception in SQL interaction! rolling back...')212 log.deferr()213 trans._connection.rollback()214 raise215 else:216 trans._cursor.close()217 trans._connection.commit()218 return result219 220 def close(self):221 from twisted.internet import reactor222 reactor.removeSystemEventTrigger(self.shutdownID)223 self.finalClose()224 225 def finalClose(self):226 for connection in self.connections.values():227 if self.noisy:228 log.msg('adbapi closing: %s %s%s' % (self.dbapiName,229 self.connargs or '',230 self.connkw or ''))231 connection.close()232 311 233 312 class Augmentation: 234 313 '''A class which augments a database connector with some functionality. … … 242 321 243 322 def __init__(self, dbpool): 244 323 self.dbpool = dbpool 245 #self.createSchema()246 324 247 325 def __setstate__(self, state): 248 326 self.__dict__ = state 249 #self.createSchema()250 327 251 328 def operationDone(self, done): 252 329 """Example callback for database operation success. -
twisted/test/test_enterprise.py
RCS file: /cvs/Twisted/twisted/test/test_enterprise.py,v retrieving revision 1.16 diff -u -r1.16 test_enterprise.py
22 22 import os 23 23 import random 24 24 25 from twisted.trial.util import deferredResult26 25 from twisted.enterprise.row import RowObject 27 26 from twisted.enterprise.reflector import * 28 27 from twisted.enterprise.xmlreflector import XMLReflector 29 28 from twisted.enterprise.sqlreflector import SQLReflector 30 29 from twisted.enterprise.adbapi import ConnectionPool 31 30 from twisted.enterprise import util 31 from twisted.internet import defer 32 from twisted.trial.util import deferredResult, deferredError 33 from twisted.python import log 32 34 33 35 try: import gadfly 34 36 except: gadfly = None … … 89 91 ) 90 92 """ 91 93 94 simple_table_schema = """ 95 CREATE TABLE simple ( 96 x integer 97 ) 98 """ 99 92 100 def randomizeRow(row, nullsOK=1, trailingSpacesOK=1): 93 101 values = {} 94 102 for name, type in row.rowColumns: … … 298 306 DB_USER = 'twisted_test' 299 307 DB_PASS = 'twisted_test' 300 308 309 can_rollback = 1 310 301 311 reflectorClass = SQLReflector 302 312 303 313 def createReflector(self): … … 305 315 self.dbpool = self.makePool() 306 316 deferredResult(self.dbpool.runOperation(main_table_schema)) 307 317 deferredResult(self.dbpool.runOperation(child_table_schema)) 318 deferredResult(self.dbpool.runOperation(simple_table_schema)) 308 319 return self.reflectorClass(self.dbpool, [TestRow, ChildRow]) 309 320 310 321 def destroyReflector(self): 311 322 deferredResult(self.dbpool.runOperation('DROP TABLE testTable')) 312 323 deferredResult(self.dbpool.runOperation('DROP TABLE childTable')) 324 deferredResult(self.dbpool.runOperation('DROP TABLE simple')) 313 325 self.dbpool.close() 314 326 self.stopDB() 315 327 316 def startDB(self): pass 317 def stopDB(self): pass 328 def testPool(self): 329 # make sure failures are raised correctly 330 deferredError(self.dbpool.runQuery("select * from NOTABLE")) 331 deferredError(self.dbpool.runOperation("delete from * from NOTABLE")) 332 deferredError(self.dbpool.runInteraction(self.bad_interaction)) 333 log.flushErrors() 334 335 # verify simple table is empty 336 sql = "select count(1) from simple" 337 row = deferredResult(self.dbpool.runQuery(sql)) 338 self.failUnless(int(row[0][0]) == 0, "Interaction not rolled back") 339 340 # add some rows to simple table (runOperation) 341 for i in range(self.count): 342 sql = "insert into simple(x) values(%d)" % i 343 deferredResult(self.dbpool.runOperation(sql)) 344 345 # make sure they were added (runQuery) 346 sql = "select x from simple order by x"; 347 rows = deferredResult(self.dbpool.runQuery(sql)) 348 self.failUnless(len(rows) == self.count, "Wrong number of rows") 349 for i in range(self.count): 350 self.failUnless(len(rows[i]) == 1, "Wrong size row") 351 self.failUnless(rows[i][0] == i, "Values not returned.") 352 353 # runInteraction 354 deferredResult(self.dbpool.runInteraction(self.interaction)) 355 356 # give the pool a workout 357 ds = [] 358 for i in range(self.count): 359 sql = "select x from simple where x = %d" % i 360 ds.append(self.dbpool.runQuery(sql)) 361 dlist = defer.DeferredList(ds, fireOnOneErrback=1) 362 result = deferredResult(dlist) 363 for i in range(self.count): 364 self.failUnless(result[i][1][0][0] == i, "Value not returned") 365 366 # now delete everything 367 ds = [] 368 for i in range(self.count): 369 sql = "delete from simple where x = %d" % i 370 ds.append(self.dbpool.runOperation(sql)) 371 dlist = defer.DeferredList(ds, fireOnOneErrback=1) 372 deferredResult(dlist) 373 374 # verify simple table is empty 375 sql = "select count(1) from simple" 376 row = deferredResult(self.dbpool.runQuery(sql)) 377 self.failUnless(int(row[0][0]) == 0, "Interaction not rolled back") 378 379 def interaction(self, transaction): 380 transaction.execute("select x from simple order by x") 381 for i in range(self.count): 382 row = transaction.fetchone() 383 self.failUnless(len(row) == 1, "Wrong size row") 384 self.failUnless(row[0] == i, "Value not returned.") 385 # should test this, but gadfly throws an exception instead 386 #self.failUnless(transaction.fetchone() is None, "Too many rows") 387 388 def bad_interaction(self, transaction): 389 if self.can_rollback: 390 transaction.execute("insert into simple(x) values(0)") 318 391 392 transaction.execute("select * from NOTABLE") 319 393 320 class SinglePool(ConnectionPool): 321 """A pool for just one connection at a time. 322 Remove this when ConnectionPool is fixed. 323 """ 324 325 def __init__(self, connection): 326 self.connection = connection 327 328 def connect(self): 329 return self.connection 330 331 def close(self): 332 self.connection.close() 333 del self.connection 394 def startDB(self): pass 395 def stopDB(self): pass 334 396 335 397 336 398 class NoSlashSQLReflector(SQLReflector): … … 346 408 nullsOK = 0 347 409 DB_DIR = "./gadflyDB" 348 410 reflectorClass = NoSlashSQLReflector 411 can_rollback = 0 349 412 350 413 def startDB(self): 351 414 if not os.path.exists(self.DB_DIR): os.mkdir(self.DB_DIR) … … 359 422 conn.close() 360 423 361 424 def makePool(self): 362 return SinglePool(gadfly.gadfly(self.DB_NAME, self.DB_DIR))425 return ConnectionPool('gadfly', self.DB_NAME, self.DB_DIR, cp_max=1) 363 426 364 427 365 428 class SQLiteTestCase(SQLReflectorTestCase, unittest.TestCase): … … 375 438 if os.path.exists(self.database): os.unlink(self.database) 376 439 377 440 def makePool(self): 378 return SinglePool(sqlite.connect(database=self.database))441 return ConnectionPool('sqlite', database=self.database, cp_max=1) 379 442 380 443 381 444 class PostgresTestCase(SQLReflectorTestCase, unittest.TestCase): … … 384 447 385 448 def makePool(self): 386 449 return ConnectionPool('pyPgSQL.PgSQL', database=self.DB_NAME, 387 user=self.DB_USER, password=self.DB_PASS) 450 user=self.DB_USER, password=self.DB_PASS, 451 cp_min=0) 388 452 389 453 390 454 class MySQLTestCase(SQLReflectorTestCase, unittest.TestCase): … … 392 456 """ 393 457 394 458 trailingSpacesOK = 0 459 can_rollback = 0 395 460 396 461 def makePool(self): 397 462 return ConnectionPool('MySQLdb', db=self.DB_NAME, … … 410 475 411 476 412 477 if gadfly is None: GadflyTestCase.skip = 1 478 elif not getattr(gadfly, 'connect', None): gadfly.connect = gadfly.gadfly 479 413 480 if sqlite is None: SQLiteTestCase.skip = 1 414 481 415 482 if PgSQL is None: PostgresTestCase.skip = 1
