Ticket #83: twisted.3.diff

File twisted.3.diff, 20.6 KB (added by davep, 11 years ago)
  • twisted/enterprise/adbapi.py

    RCS file: /cvs/Twisted/twisted/enterprise/adbapi.py,v
    retrieving revision 1.50
    diff -u -r1.50 adbapi.py
     
    2525 
    2626class Transaction: 
    2727    """ 
    28     I am a lightweight wrapper for a database 'cursor' object.  I relay 
     28    I am a lightweight wrapper for a DB-API 'cursor' object.  I relay 
    2929    attribute access to the DB cursor. 
    3030    """ 
    3131    _cursor = None 
     
    4646class ConnectionPool(pb.Referenceable): 
    4747    """I represent a pool of connections to a DB-API 2.0 compliant database. 
    4848 
    49     You can pass the noisy arg which determines whether informational 
    50     log messages are generated during the pool's operation. 
     49    You can pass cp_min, cp_max or both to set the minimum and maximum 
     50    number of connections that will be opened by the pool. You can pass 
     51    the noisy arg which determines whether informational log messages are 
     52    generated during the pool's operation. 
    5153    """ 
    52     noisy = 1 
    5354 
    54     # XXX - make the min and max attributes (and cp_min and cp_max 
    55     # kwargs to __init__) actually do something? 
    56     min = 3 
    57     max = 5 
     55    noisy = 1   # if true, generate informational log messages 
     56    min = 3     # minimum number of connections in pool 
     57    max = 5     # maximum number of connections in pool 
     58    started = 0 # 1 if threadpool has been started 
    5859 
    5960    def __init__(self, dbapiName, *connargs, **connkw): 
    6061        """See ConnectionPool.__doc__ 
    6162        """ 
    6263        self.dbapiName = dbapiName 
    63         if self.noisy: 
    64             log.msg("Connecting to database: %s %s %s" % 
    65                     (dbapiName, connargs, connkw)) 
    6664        self.dbapi = reflect.namedModule(dbapiName) 
    6765 
    6866        if getattr(self.dbapi, 'apilevel', None) != '2.0': 
     
    7472        self.connargs = connargs 
    7573        self.connkw = connkw 
    7674 
    77         import thread 
    78         self.threadID = thread.get_ident 
    79         self.connections = {} 
    80  
    8175        if connkw.has_key('cp_min'): 
    8276            self.min = connkw['cp_min'] 
    8377            del connkw['cp_min'] 
     
    9084            self.noisy = connkw['cp_noisy'] 
    9185            del connkw['cp_noisy'] 
    9286 
     87        self.min = min(self.min, self.max) 
     88        self.max = max(self.min, self.max) 
     89 
     90        self.connections = {}  # all connections, hashed on thread id 
     91 
     92        # these are optional so import them here 
     93        from twisted.python import threadpool 
     94        import thread 
     95 
     96        self.threadID = thread.get_ident 
     97        self.threadpool = threadpool.ThreadPool(self.min, self.max) 
     98 
     99        # TODO: start up min connections 
     100 
    93101        from twisted.internet import reactor 
     102        reactor.callLater(0, self.threadpool.start) 
    94103        self.shutdownID = reactor.addSystemEventTrigger('during', 'shutdown', 
    95104                                                        self.finalClose) 
    96105 
     
    98107        """Interact with the database and return the result. 
    99108 
    100109        The 'interaction' is a callable object which will be executed in a 
    101         pooled thread.  It will be passed an L{Transaction} object as an 
    102         argument (whose interface is identical to that of the database cursor 
    103         for your DB-API module of choice), and its results will be returned as 
    104         a Deferred.  If running the method raises an exception, the transaction 
    105         will be rolled back.  If the method returns a value, the transaction 
    106         will be committed. 
     110        thread using a pooled connection. It will be passed an L{Transaction} 
     111        object as an argument (whose interface is identical to that of the 
     112        database cursor for your DB-API module of choice), and its results 
     113        will be returned as a Deferred. If running the method raises an 
     114        exception, the transaction will be rolled back. If the method returns 
     115        a value, the transaction will be committed. 
    107116 
    108117        @param interaction: a callable object whose first argument is 
    109118            L{adbapi.Transaction}. 
     
    117126        apply(self.interaction, (interaction,d.callback,d.errback,)+args, kw) 
    118127        return d 
    119128 
    120     def __getstate__(self): 
    121         return {'dbapiName': self.dbapiName, 
    122                 'noisy': self.noisy, 
    123                 'min': self.min, 
    124                 'max': self.max, 
    125                 'connargs': self.connargs, 
    126                 'connkw': self.connkw} 
     129    def runQuery(self, *args, **kw): 
     130        """Execute an SQL query and return the result. 
    127131 
    128     def __setstate__(self, state): 
    129         self.__dict__ = state 
    130         apply(self.__init__, (self.dbapiName, )+self.connargs, self.connkw) 
     132        A DB-API cursor will will be invoked with cursor.execute(*args, **kw). 
     133        The exact nature of the arguments will depend on the specific flavor 
     134        of DB-API being used, but the first argument in *args be an SQL 
     135        statement. The result of a subsequent cursor.fetchall() will be 
     136        fired to the Deferred which is returned. If either the 'execute' or 
     137        'fetchall' methods raise an exception, the transaction will be rolled 
     138        back and a Failure returned. 
     139 
     140        @param *args,**kw: arguments to be passed to a DB-API cursor's 
     141        'execute' method. 
     142 
     143        @return: a Deferred which will fire the return value of a DB-API 
     144        cursor's 'fetchall' method, or a Failure. 
     145        """ 
     146 
     147        d = defer.Deferred() 
     148        apply(self.query, (d.callback, d.errback)+args, kw) 
     149        return d 
     150 
     151    def runOperation(self, *args, **kw): 
     152        """Execute an SQL query and return None. 
     153 
     154        A DB-API cursor will will be invoked with cursor.execute(*args, **kw). 
     155        The exact nature of the arguments will depend on the specific flavor 
     156        of DB-API being used, but the first argument in *args will be an SQL 
     157        statement. This method will not attempt to fetch any results from the 
     158        query and is thus suitable for INSERT, DELETE, and other SQL statements 
     159        which do not return values. If the 'execute' method raises an exception, 
     160        the transaction will be rolled back and a Failure returned. 
     161 
     162        @param *args,**kw: arguments to be passed to a DB-API cursor's 
     163        'execute' method. 
     164 
     165        @return: a Deferred which will fire None or a Failure. 
     166        """ 
     167 
     168        d = defer.Deferred() 
     169        apply(self.operation, (d.callback, d.errback)+args, kw) 
     170        return d 
     171 
     172    def close(self): 
     173        """Close all pool connections and shutdown the pool. 
     174 
     175        Connections will be closed even if they are in use! 
     176        """ 
     177 
     178        from twisted.internet import reactor 
     179        reactor.removeSystemEventTrigger(self.shutdownID) 
     180        self.finalClose() 
     181 
     182    def finalClose(self): 
     183        """This should only be called by the shutdown trigger.""" 
     184 
     185        self.threadpool.stop() 
     186        for connection in self.connections.values(): 
     187            if self.noisy: 
     188                log.msg('adbapi closing: %s %s%s' % (self.dbapiName, 
     189                                                     self.connargs or '', 
     190                                                     self.connkw or '')) 
     191            connection.close() 
     192        self.connections.clear() 
    131193 
    132194    def connect(self): 
    133         """Should be run in thread, blocks. 
     195        """Return a database connection when one becomes available. This method blocks and should be run in a thread from the internal threadpool. 
    134196 
    135197        Don't call this method directly from non-threaded twisted code. 
     198 
     199        @return: a database connection from the pool. 
    136200        """ 
     201 
    137202        tid = self.threadID() 
    138203        conn = self.connections.get(tid) 
    139         if not conn: 
     204        if conn is None: 
     205            if self.noisy: 
     206                log.msg('adbapi connecting: %s %s%s' % (self.dbapiName, 
     207                                                        self.connargs or '', 
     208                                                        self.connkw or '')) 
    140209            conn = apply(self.dbapi.connect, self.connargs, self.connkw) 
    141210            self.connections[tid] = conn 
    142             if self.noisy: 
    143                 log.msg('adbapi connecting: %s %s%s' % 
    144                     ( self.dbapiName, self.connargs or '', self.connkw or '')) 
    145211        return conn 
    146212 
     213    def _runInteraction(self, interaction, *args, **kw): 
     214        trans = Transaction(self, self.connect()) 
     215        try: 
     216            result = apply(interaction, (trans,)+args, kw) 
     217            trans.close() 
     218            trans._connection.commit() 
     219            return result 
     220        except: 
     221            log.msg('Exception in SQL interaction. Rolling back.') 
     222            log.deferr() 
     223            trans._connection.rollback() 
     224            raise 
     225 
    147226    def _runQuery(self, args, kw): 
    148227        conn = self.connect() 
    149228        curs = conn.cursor() 
     
    154233            conn.commit() 
    155234            return result 
    156235        except: 
     236            log.msg('Exception in SQL query. Rolling back.') 
     237            log.deferr() 
    157238            conn.rollback() 
    158239            raise 
    159240 
    160241    def _runOperation(self, args, kw): 
    161242        conn = self.connect() 
    162243        curs = conn.cursor() 
    163  
    164244        try: 
    165245            apply(curs.execute, args, kw) 
    166             result = None 
    167246            curs.close() 
    168247            conn.commit() 
    169248        except: 
    170             # XXX - failures aren't working here 
     249            log.msg('Exception in SQL operation. Rolling back.') 
     250            log.deferr() 
    171251            conn.rollback() 
    172252            raise 
    173         return result 
     253 
     254    def __getstate__(self): 
     255        return {'dbapiName': self.dbapiName, 
     256                'noisy': self.noisy, 
     257                'min': self.min, 
     258                'max': self.max, 
     259                'connargs': self.connargs, 
     260                'connkw': self.connkw} 
     261 
     262    def __setstate__(self, state): 
     263        self.__dict__ = state 
     264        apply(self.__init__, (self.dbapiName, )+self.connargs, self.connkw) 
     265 
     266    def _deferToThread(self, f, *args, **kwargs): 
     267        """Internal function. 
     268 
     269        Call f in one of the connection pool's threads. 
     270        """ 
     271 
     272        d = defer.Deferred() 
     273        self.threadpool.callInThread(threads._putResultInDeferred, 
     274                                     d, f, args, kwargs) 
     275        return d 
    174276 
    175277    def query(self, callback, errback, *args, **kw): 
    176278        # this will be deprecated ASAP 
    177         threads.deferToThread(self._runQuery, args, kw).addCallbacks( 
     279        self._deferToThread(self._runQuery, args, kw).addCallbacks( 
    178280            callback, errback) 
    179281 
    180282    def operation(self, callback, errback, *args, **kw): 
    181283        # this will be deprecated ASAP 
    182         threads.deferToThread(self._runOperation, args, kw).addCallbacks( 
     284        self._deferToThread(self._runOperation, args, kw).addCallbacks( 
    183285            callback, errback) 
    184286 
    185287    def synchronousOperation(self, *args, **kw): 
     
    187289 
    188290    def interaction(self, interaction, callback, errback, *args, **kw): 
    189291        # this will be deprecated ASAP 
    190         apply(threads.deferToThread, 
     292        apply(self._deferToThread, 
    191293              (self._runInteraction, interaction) + args, kw).addCallbacks( 
    192294            callback, errback) 
    193295 
    194     def runOperation(self, *args, **kw): 
    195         """Run a SQL statement and return a Deferred of result.""" 
    196         d = defer.Deferred() 
    197         apply(self.operation, (d.callback,d.errback)+args, kw) 
    198         return d 
    199  
    200     def runQuery(self, *args, **kw): 
    201         """Run a read-only query and return a Deferred.""" 
    202         d = defer.Deferred() 
    203         apply(self.query, (d.callback, d.errback)+args, kw) 
    204         return d 
    205  
    206     def _runInteraction(self, interaction, *args, **kw): 
    207         trans = Transaction(self, self.connect()) 
    208         try: 
    209             result = apply(interaction, (trans,)+args, kw) 
    210         except: 
    211             log.msg('Exception in SQL interaction!  rolling back...') 
    212             log.deferr() 
    213             trans._connection.rollback() 
    214             raise 
    215         else: 
    216             trans._cursor.close() 
    217             trans._connection.commit() 
    218             return result 
    219  
    220     def close(self): 
    221         from twisted.internet import reactor 
    222         reactor.removeSystemEventTrigger(self.shutdownID) 
    223         self.finalClose() 
    224  
    225     def finalClose(self): 
    226         for connection in self.connections.values(): 
    227             if self.noisy: 
    228                 log.msg('adbapi closing: %s %s%s' % (self.dbapiName, 
    229                                                      self.connargs or '', 
    230                                                      self.connkw or '')) 
    231             connection.close() 
    232296 
    233297class Augmentation: 
    234298    '''A class which augments a database connector with some functionality. 
     
    242306 
    243307    def __init__(self, dbpool): 
    244308        self.dbpool = dbpool 
    245         #self.createSchema() 
    246309 
    247310    def __setstate__(self, state): 
    248311        self.__dict__ = state 
    249         #self.createSchema() 
    250312 
    251313    def operationDone(self, done): 
    252314        """Example callback for database operation success. 
  • twisted/python/threadpool.py

    RCS file: /cvs/Twisted/twisted/python/threadpool.py,v
    retrieving revision 1.20
    diff -u -r1.20 threadpool.py
     
    7070    def start(self): 
    7171        """Start the threadpool. 
    7272        """ 
    73         self.workers = self.min 
     73        self.workers = min(max(self.min, self.q.qsize()), self.max) 
    7474        self.joined = 0 
    7575        self.started = 1 
    76         for i in range(self.min): 
     76        for i in range(self.workers): 
    7777            name = "PoolThread-%s-%s" % (id(self), i) 
    7878            threading.Thread(target=self._worker, name=name).start() 
    7979 
     
    107107        self.q.put(o) 
    108108        if self.started and not self.waiters: 
    109109            self._startSomeWorkers() 
    110      
     110 
    111111    def _runWithCallback(self, callback, errback, func, args, kwargs): 
    112112        try: 
    113113            result = apply(func, args, kwargs) 
  • twisted/test/test_enterprise.py

    RCS file: /cvs/Twisted/twisted/test/test_enterprise.py,v
    retrieving revision 1.16
    diff -u -r1.16 test_enterprise.py
     
    2222import os 
    2323import random 
    2424 
    25 from twisted.trial.util import deferredResult 
    2625from twisted.enterprise.row import RowObject 
    2726from twisted.enterprise.reflector import * 
    2827from twisted.enterprise.xmlreflector import XMLReflector 
    2928from twisted.enterprise.sqlreflector import SQLReflector 
    3029from twisted.enterprise.adbapi import ConnectionPool 
    3130from twisted.enterprise import util 
     31from twisted.internet import defer 
     32from twisted.trial.util import deferredResult, deferredError 
     33from twisted.python import log 
    3234 
    3335try: import gadfly 
    3436except: gadfly = None 
     
    8991) 
    9092""" 
    9193 
     94simple_table_schema = """ 
     95CREATE TABLE simple ( 
     96  x integer 
     97) 
     98""" 
     99 
    92100def randomizeRow(row, nullsOK=1, trailingSpacesOK=1): 
    93101    values = {} 
    94102    for name, type in row.rowColumns: 
     
    298306    DB_USER = 'twisted_test' 
    299307    DB_PASS = 'twisted_test' 
    300308 
     309    can_rollback = 1 
     310 
    301311    reflectorClass = SQLReflector 
    302312 
    303313    def createReflector(self): 
     
    305315        self.dbpool = self.makePool() 
    306316        deferredResult(self.dbpool.runOperation(main_table_schema)) 
    307317        deferredResult(self.dbpool.runOperation(child_table_schema)) 
     318        deferredResult(self.dbpool.runOperation(simple_table_schema)) 
    308319        return self.reflectorClass(self.dbpool, [TestRow, ChildRow]) 
    309320 
    310321    def destroyReflector(self): 
    311322        deferredResult(self.dbpool.runOperation('DROP TABLE testTable')) 
    312323        deferredResult(self.dbpool.runOperation('DROP TABLE childTable')) 
     324        deferredResult(self.dbpool.runOperation('DROP TABLE simple')) 
    313325        self.dbpool.close() 
    314326        self.stopDB() 
    315327 
    316     def startDB(self): pass 
    317     def stopDB(self): pass 
     328    def testPool(self): 
     329        # make sure failures are raised correctly 
     330        deferredError(self.dbpool.runQuery("select * from NOTABLE")) 
     331        deferredError(self.dbpool.runOperation("delete from * from NOTABLE")) 
     332        deferredError(self.dbpool.runInteraction(self.bad_interaction)) 
     333        log.flushErrors() 
     334 
     335        # verify simple table is empty 
     336        sql = "select count(1) from simple" 
     337        row = deferredResult(self.dbpool.runQuery(sql)) 
     338        self.failUnless(int(row[0][0]) == 0, "Interaction not rolled back") 
     339 
     340        # add some rows to simple table (runOperation) 
     341        for i in range(self.count): 
     342            sql = "insert into simple(x) values(%d)" % i 
     343            deferredResult(self.dbpool.runOperation(sql)) 
     344 
     345        # make sure they were added (runQuery) 
     346        sql = "select x from simple order by x"; 
     347        rows = deferredResult(self.dbpool.runQuery(sql)) 
     348        self.failUnless(len(rows) == self.count, "Wrong number of rows") 
     349        for i in range(self.count): 
     350            self.failUnless(len(rows[i]) == 1, "Wrong size row") 
     351            self.failUnless(rows[i][0] == i, "Values not returned.") 
     352 
     353        # runInteraction 
     354        deferredResult(self.dbpool.runInteraction(self.interaction)) 
     355 
     356        # give the pool a workout 
     357        ds = [] 
     358        for i in range(self.count): 
     359            sql = "select x from simple where x = %d" % i 
     360            ds.append(self.dbpool.runQuery(sql)) 
     361        dlist = defer.DeferredList(ds, fireOnOneErrback=1) 
     362        result = deferredResult(dlist) 
     363        for i in range(self.count): 
     364            self.failUnless(result[i][1][0][0] == i, "Value not returned") 
     365 
     366        # now delete everything 
     367        ds = [] 
     368        for i in range(self.count): 
     369            sql = "delete from simple where x = %d" % i 
     370            ds.append(self.dbpool.runOperation(sql)) 
     371        dlist = defer.DeferredList(ds, fireOnOneErrback=1) 
     372        deferredResult(dlist) 
     373 
     374        # verify simple table is empty 
     375        sql = "select count(1) from simple" 
     376        row = deferredResult(self.dbpool.runQuery(sql)) 
     377        self.failUnless(int(row[0][0]) == 0, "Interaction not rolled back") 
     378 
     379    def interaction(self, transaction): 
     380        transaction.execute("select x from simple order by x") 
     381        for i in range(self.count): 
     382            row = transaction.fetchone() 
     383            self.failUnless(len(row) == 1, "Wrong size row") 
     384            self.failUnless(row[0] == i, "Value not returned.") 
     385        # should test this, but gadfly throws an exception instead 
     386        #self.failUnless(transaction.fetchone() is None, "Too many rows") 
     387 
     388    def bad_interaction(self, transaction): 
     389        if self.can_rollback: 
     390            transaction.execute("insert into simple(x) values(0)") 
    318391 
     392        transaction.execute("select * from NOTABLE") 
    319393 
    320 class SinglePool(ConnectionPool): 
    321     """A pool for just one connection at a time. 
    322     Remove this when ConnectionPool is fixed. 
    323     """ 
    324  
    325     def __init__(self, connection): 
    326         self.connection = connection 
    327  
    328     def connect(self): 
    329         return self.connection 
    330  
    331     def close(self): 
    332         self.connection.close() 
    333         del self.connection 
     394    def startDB(self): pass 
     395    def stopDB(self): pass 
    334396 
    335397 
    336398class NoSlashSQLReflector(SQLReflector): 
     
    346408    nullsOK = 0 
    347409    DB_DIR = "./gadflyDB" 
    348410    reflectorClass = NoSlashSQLReflector 
     411    can_rollback = 0 
    349412 
    350413    def startDB(self): 
    351414        if not os.path.exists(self.DB_DIR): os.mkdir(self.DB_DIR) 
     
    359422        conn.close() 
    360423 
    361424    def makePool(self): 
    362         return SinglePool(gadfly.gadfly(self.DB_NAME, self.DB_DIR)) 
     425        return ConnectionPool('gadfly', self.DB_NAME, self.DB_DIR, cp_max=1) 
    363426 
    364427 
    365428class SQLiteTestCase(SQLReflectorTestCase, unittest.TestCase): 
     
    375438        if os.path.exists(self.database): os.unlink(self.database) 
    376439 
    377440    def makePool(self): 
    378         return SinglePool(sqlite.connect(database=self.database)) 
     441        return ConnectionPool('sqlite', database=self.database, cp_max=1) 
    379442 
    380443 
    381444class PostgresTestCase(SQLReflectorTestCase, unittest.TestCase): 
     
    384447 
    385448    def makePool(self): 
    386449        return ConnectionPool('pyPgSQL.PgSQL', database=self.DB_NAME, 
    387                               user=self.DB_USER, password=self.DB_PASS) 
     450                              user=self.DB_USER, password=self.DB_PASS, 
     451                              cp_min=0) 
    388452 
    389453 
    390454class MySQLTestCase(SQLReflectorTestCase, unittest.TestCase): 
     
    392456    """ 
    393457 
    394458    trailingSpacesOK = 0 
     459    can_rollback = 0 
    395460 
    396461    def makePool(self): 
    397462        return ConnectionPool('MySQLdb', db=self.DB_NAME, 
     
    410475 
    411476 
    412477if gadfly is None: GadflyTestCase.skip = 1 
     478elif not getattr(gadfly, 'connect', None): gadfly.connect = gadfly.gadfly 
     479 
    413480if sqlite is None: SQLiteTestCase.skip = 1 
    414481 
    415482if PgSQL is None: PostgresTestCase.skip = 1