Ticket #83: twisted.diff

File twisted.diff, 20.5 KB (added by davep, 13 years ago)
  • ChangeLog

    RCS file: /cvs/Twisted/ChangeLog,v
    retrieving revision 1.416
    diff -u -r1.416 ChangeLog
     
     12003-07-14  Dave Peticolas  <dave@krondo.com>
     2
     3        * twisted/test/test_enterprise.py: more tests.
     4
     5        * twisted/enterprise/adbapi.py: obey cp_min & cp_max args.
     6        add doc strings.
     7
    182003-07-12  Christopher Armstrong  <radix@twistedmatrix.com>
    29
    310        * twisted/web/util.py: Add a new ChildRedirector that, when placed
  • twisted/enterprise/adbapi.py

    RCS file: /cvs/Twisted/twisted/enterprise/adbapi.py,v
    retrieving revision 1.50
    diff -u -r1.50 adbapi.py
     
    2525
    2626class Transaction:
    2727    """
    28     I am a lightweight wrapper for a database 'cursor' object.  I relay
     28    I am a lightweight wrapper for a DB-API 'cursor' object.  I relay
    2929    attribute access to the DB cursor.
    3030    """
    3131    _cursor = None
     
    4646class ConnectionPool(pb.Referenceable):
    4747    """I represent a pool of connections to a DB-API 2.0 compliant database.
    4848
    49     You can pass the noisy arg which determines whether informational
    50     log messages are generated during the pool's operation.
     49    You can pass cp_min, cp_max or both to set the minimum and maximum
     50    number of connections that will be opened by the pool. You can pass
     51    the noisy arg which determines whether informational log messages are
     52    generated during the pool's operation.
    5153    """
    52     noisy = 1
    5354
    54     # XXX - make the min and max attributes (and cp_min and cp_max
    55     # kwargs to __init__) actually do something?
    56     min = 3
    57     max = 5
     55    noisy = 1   # if true, generate informational log messages
     56    min = 3     # minimum number of connections in pool
     57    max = 5     # maximum number of connections in pool
    5858
    5959    def __init__(self, dbapiName, *connargs, **connkw):
    6060        """See ConnectionPool.__doc__
    6161        """
    6262        self.dbapiName = dbapiName
    63         if self.noisy:
    64             log.msg("Connecting to database: %s %s %s" %
    65                     (dbapiName, connargs, connkw))
    6663        self.dbapi = reflect.namedModule(dbapiName)
    6764
    6865        if getattr(self.dbapi, 'apilevel', None) != '2.0':
     
    7471        self.connargs = connargs
    7572        self.connkw = connkw
    7673
    77         import thread
    78         self.threadID = thread.get_ident
    79         self.connections = {}
    80 
    8174        if connkw.has_key('cp_min'):
    8275            self.min = connkw['cp_min']
    8376            del connkw['cp_min']
     
    9083            self.noisy = connkw['cp_noisy']
    9184            del connkw['cp_noisy']
    9285
     86        self.min = min(self.min, self.max)
     87        self.max = max(self.min, self.max)
     88
     89        import threading # threading is an optional module so import it here
     90        self.connsem = threading.BoundedSemaphore(self.max) # limit access
     91        self.freelock = threading.Lock() # protect free list
     92
     93        self.freelist = []     # connections which aren't being used
     94        self.connections = {}  # all connections
     95        for _ in range(self.min):
     96            if self.noisy:
     97                log.msg('adbapi connecting: %s %s%s' % (self.dbapiName,
     98                                                        self.connargs or '',
     99                                                        self.connkw or ''))
     100            conn = apply(self.dbapi.connect, self.connargs, self.connkw)
     101            self.freelist.append(conn)
     102            self.connections[conn] = conn
     103
    93104        from twisted.internet import reactor
    94105        self.shutdownID = reactor.addSystemEventTrigger('during', 'shutdown',
    95106                                                        self.finalClose)
     
    98109        """Interact with the database and return the result.
    99110
    100111        The 'interaction' is a callable object which will be executed in a
    101         pooled thread.  It will be passed an L{Transaction} object as an
    102         argument (whose interface is identical to that of the database cursor
    103         for your DB-API module of choice), and its results will be returned as
    104         a Deferred.  If running the method raises an exception, the transaction
    105         will be rolled back.  If the method returns a value, the transaction
    106         will be committed.
     112        thread using a pooled connection. It will be passed an L{Transaction}
     113        object as an argument (whose interface is identical to that of the
     114        database cursor for your DB-API module of choice), and its results
     115        will be returned as a Deferred. If running the method raises an
     116        exception, the transaction will be rolled back. If the method returns
     117        a value, the transaction will be committed.
    107118
    108119        @param interaction: a callable object whose first argument is
    109120            L{adbapi.Transaction}.
     
    117128        apply(self.interaction, (interaction,d.callback,d.errback,)+args, kw)
    118129        return d
    119130
    120     def __getstate__(self):
    121         return {'dbapiName': self.dbapiName,
    122                 'noisy': self.noisy,
    123                 'min': self.min,
    124                 'max': self.max,
    125                 'connargs': self.connargs,
    126                 'connkw': self.connkw}
     131    def runQuery(self, *args, **kw):
     132        """Execute an SQL query and return the result.
    127133
    128     def __setstate__(self, state):
    129         self.__dict__ = state
    130         apply(self.__init__, (self.dbapiName, )+self.connargs, self.connkw)
     134        A DB-API cursor will will be invoked with cursor.execute(*args, **kw).
     135        The exact nature of the arguments will depend on the specific flavor
     136        of DB-API being used, but the first argument in *args be an SQL
     137        statement. The result of a subsequent cursor.fetchall() will be
     138        fired to the Deferred which is returned. If either the 'execute' or
     139        'fetchall' methods raise an exception, the transaction will be rolled
     140        back and a Failure returned.
     141
     142        @param *args,**kw: arguments to be passed to a DB-API cursor's
     143        'execute' method.
     144
     145        @return: a Deferred which will fire the return value of a DB-API
     146        cursor's 'fetchall' method, or a Failure.
     147        """
     148
     149        d = defer.Deferred()
     150        apply(self.query, (d.callback, d.errback)+args, kw)
     151        return d
     152
     153    def runOperation(self, *args, **kw):
     154        """Execute an SQL query and return None.
     155
     156        A DB-API cursor will will be invoked with cursor.execute(*args, **kw).
     157        The exact nature of the arguments will depend on the specific flavor
     158        of DB-API being used, but the first argument in *args will be an SQL
     159        statement. This method will not attempt to fetch any results from the
     160        query and is thus suitable for INSERT, DELETE, and other SQL statements
     161        which do not return values. If the 'execute' method raises an exception,
     162        the transaction will be rolled back and a Failure returned.
     163
     164        @param *args,**kw: arguments to be passed to a DB-API cursor's
     165        'execute' method.
     166
     167        @return: a Deferred which will fire None or a Failure.
     168        """
     169
     170        d = defer.Deferred()
     171        apply(self.operation, (d.callback, d.errback)+args, kw)
     172        return d
     173
     174    def close(self):
     175        """Close all pool connections.
     176
     177        Connections will be closed even if they are in use!
     178        """
     179
     180        self.finalClose()
     181
     182    def finalClose(self):
     183        """This should only be called by self.close() or the shutdown trigger.
     184        """
     185
     186        self.freelist[:] = []
     187        for connection in self.connections.values():
     188            if self.noisy:
     189                log.msg('adbapi closing: %s %s%s' % (self.dbapiName,
     190                                                     self.connargs or '',
     191                                                     self.connkw or ''))
     192            connection.close()
     193        self.connections.clear()
    131194
    132195    def connect(self):
    133         """Should be run in thread, blocks.
     196        """Return a database connection when one becomes available. This method blocks and should be run in a thread.
    134197
    135         Don't call this method directly from non-threaded twisted code.
     198        Don't call this method directly from non-threaded twisted code. This
     199        method acquires the connection semaphore. You must call dbpool.release
     200        on the connection when you are finished.
     201
     202        @return: a database connection from the pool..
    136203        """
    137         tid = self.threadID()
    138         conn = self.connections.get(tid)
     204
     205        self.connsem.acquire() # wait for our turn
     206
     207        # see if there is a free connection available
     208        self.freelock.acquire()
     209        if self.freelist: conn = self.freelist.pop()
     210        else: conn = None
     211        self.freelock.release()
     212
     213        # otherwise make a new connection
    139214        if not conn:
    140             conn = apply(self.dbapi.connect, self.connargs, self.connkw)
    141             self.connections[tid] = conn
    142215            if self.noisy:
    143                 log.msg('adbapi connecting: %s %s%s' %
    144                     ( self.dbapiName, self.connargs or '', self.connkw or ''))
     216                log.msg('adbapi connecting: %s %s%s' % (self.dbapiName,
     217                                                        self.connargs or '',
     218                                                        self.connkw or ''))
     219            conn = apply(self.dbapi.connect, self.connargs, self.connkw)
     220            self.connections[conn] = conn
    145221        return conn
    146222
     223    def release(self, connection):
     224        """Release a connection back into the pool.
     225
     226        @param connection: connection to be released. Must have been created
     227        with self.connect.
     228        """
     229
     230        self.freelist.append(connection) # atomic, no need for lock
     231        self.connsem.release()
     232
     233    def _runInteraction(self, interaction, *args, **kw):
     234        trans = Transaction(self, self.connect())
     235        try:
     236            result = apply(interaction, (trans,)+args, kw)
     237            trans.close()
     238            trans._connection.commit()
     239            self.release(trans._connection)
     240            return result
     241        except:
     242            log.msg('Exception in SQL interaction. Rolling back.')
     243            log.deferr()
     244            trans._connection.rollback()
     245            self.release(trans._connection)
     246            raise
     247
    147248    def _runQuery(self, args, kw):
    148249        conn = self.connect()
    149250        curs = conn.cursor()
     
    152253            result = curs.fetchall()
    153254            curs.close()
    154255            conn.commit()
     256            self.release(conn)
    155257            return result
    156258        except:
     259            log.msg('Exception in SQL query. Rolling back.')
     260            log.deferr()
    157261            conn.rollback()
     262            self.release(conn)
    158263            raise
    159264
    160265    def _runOperation(self, args, kw):
    161266        conn = self.connect()
    162267        curs = conn.cursor()
    163 
    164268        try:
    165269            apply(curs.execute, args, kw)
    166             result = None
    167270            curs.close()
    168271            conn.commit()
     272            self.release(conn)
    169273        except:
    170             # XXX - failures aren't working here
     274            log.msg('Exception in SQL operation. Rolling back.')
     275            log.deferr()
    171276            conn.rollback()
     277            self.release(conn)
    172278            raise
    173         return result
     279
     280    def __getstate__(self):
     281        return {'dbapiName': self.dbapiName,
     282                'noisy': self.noisy,
     283                'min': self.min,
     284                'max': self.max,
     285                'connargs': self.connargs,
     286                'connkw': self.connkw}
     287
     288    def __setstate__(self, state):
     289        self.__dict__ = state
     290        apply(self.__init__, (self.dbapiName, )+self.connargs, self.connkw)
    174291
    175292    def query(self, callback, errback, *args, **kw):
    176293        # this will be deprecated ASAP
     
    191308              (self._runInteraction, interaction) + args, kw).addCallbacks(
    192309            callback, errback)
    193310
    194     def runOperation(self, *args, **kw):
    195         """Run a SQL statement and return a Deferred of result."""
    196         d = defer.Deferred()
    197         apply(self.operation, (d.callback,d.errback)+args, kw)
    198         return d
    199 
    200     def runQuery(self, *args, **kw):
    201         """Run a read-only query and return a Deferred."""
    202         d = defer.Deferred()
    203         apply(self.query, (d.callback, d.errback)+args, kw)
    204         return d
    205 
    206     def _runInteraction(self, interaction, *args, **kw):
    207         trans = Transaction(self, self.connect())
    208         try:
    209             result = apply(interaction, (trans,)+args, kw)
    210         except:
    211             log.msg('Exception in SQL interaction!  rolling back...')
    212             log.deferr()
    213             trans._connection.rollback()
    214             raise
    215         else:
    216             trans._cursor.close()
    217             trans._connection.commit()
    218             return result
    219 
    220     def close(self):
    221         from twisted.internet import reactor
    222         reactor.removeSystemEventTrigger(self.shutdownID)
    223         self.finalClose()
    224 
    225     def finalClose(self):
    226         for connection in self.connections.values():
    227             if self.noisy:
    228                 log.msg('adbapi closing: %s %s%s' % (self.dbapiName,
    229                                                      self.connargs or '',
    230                                                      self.connkw or ''))
    231             connection.close()
    232311
    233312class Augmentation:
    234313    '''A class which augments a database connector with some functionality.
     
    242321
    243322    def __init__(self, dbpool):
    244323        self.dbpool = dbpool
    245         #self.createSchema()
    246324
    247325    def __setstate__(self, state):
    248326        self.__dict__ = state
    249         #self.createSchema()
    250327
    251328    def operationDone(self, done):
    252329        """Example callback for database operation success.
  • twisted/test/test_enterprise.py

    RCS file: /cvs/Twisted/twisted/test/test_enterprise.py,v
    retrieving revision 1.16
    diff -u -r1.16 test_enterprise.py
     
    2222import os
    2323import random
    2424
    25 from twisted.trial.util import deferredResult
    2625from twisted.enterprise.row import RowObject
    2726from twisted.enterprise.reflector import *
    2827from twisted.enterprise.xmlreflector import XMLReflector
    2928from twisted.enterprise.sqlreflector import SQLReflector
    3029from twisted.enterprise.adbapi import ConnectionPool
    3130from twisted.enterprise import util
     31from twisted.internet import defer
     32from twisted.trial.util import deferredResult, deferredError
     33from twisted.python import log
    3234
    3335try: import gadfly
    3436except: gadfly = None
     
    8991)
    9092"""
    9193
     94simple_table_schema = """
     95CREATE TABLE simple (
     96  x integer
     97)
     98"""
     99
    92100def randomizeRow(row, nullsOK=1, trailingSpacesOK=1):
    93101    values = {}
    94102    for name, type in row.rowColumns:
     
    298306    DB_USER = 'twisted_test'
    299307    DB_PASS = 'twisted_test'
    300308
     309    can_rollback = 1
     310
    301311    reflectorClass = SQLReflector
    302312
    303313    def createReflector(self):
     
    305315        self.dbpool = self.makePool()
    306316        deferredResult(self.dbpool.runOperation(main_table_schema))
    307317        deferredResult(self.dbpool.runOperation(child_table_schema))
     318        deferredResult(self.dbpool.runOperation(simple_table_schema))
    308319        return self.reflectorClass(self.dbpool, [TestRow, ChildRow])
    309320
    310321    def destroyReflector(self):
    311322        deferredResult(self.dbpool.runOperation('DROP TABLE testTable'))
    312323        deferredResult(self.dbpool.runOperation('DROP TABLE childTable'))
     324        deferredResult(self.dbpool.runOperation('DROP TABLE simple'))
    313325        self.dbpool.close()
    314326        self.stopDB()
    315327
    316     def startDB(self): pass
    317     def stopDB(self): pass
     328    def testPool(self):
     329        # make sure failures are raised correctly
     330        deferredError(self.dbpool.runQuery("select * from NOTABLE"))
     331        deferredError(self.dbpool.runOperation("delete from * from NOTABLE"))
     332        deferredError(self.dbpool.runInteraction(self.bad_interaction))
     333        log.flushErrors()
     334
     335        # verify simple table is empty
     336        sql = "select count(1) from simple"
     337        row = deferredResult(self.dbpool.runQuery(sql))
     338        self.failUnless(int(row[0][0]) == 0, "Interaction not rolled back")
     339
     340        # add some rows to simple table (runOperation)
     341        for i in range(self.count):
     342            sql = "insert into simple(x) values(%d)" % i
     343            deferredResult(self.dbpool.runOperation(sql))
     344
     345        # make sure they were added (runQuery)
     346        sql = "select x from simple order by x";
     347        rows = deferredResult(self.dbpool.runQuery(sql))
     348        self.failUnless(len(rows) == self.count, "Wrong number of rows")
     349        for i in range(self.count):
     350            self.failUnless(len(rows[i]) == 1, "Wrong size row")
     351            self.failUnless(rows[i][0] == i, "Values not returned.")
     352
     353        # runInteraction
     354        deferredResult(self.dbpool.runInteraction(self.interaction))
     355
     356        # give the pool a workout
     357        ds = []
     358        for i in range(self.count):
     359            sql = "select x from simple where x = %d" % i
     360            ds.append(self.dbpool.runQuery(sql))
     361        dlist = defer.DeferredList(ds, fireOnOneErrback=1)
     362        result = deferredResult(dlist)
     363        for i in range(self.count):
     364            self.failUnless(result[i][1][0][0] == i, "Value not returned")
     365
     366        # now delete everything
     367        ds = []
     368        for i in range(self.count):
     369            sql = "delete from simple where x = %d" % i
     370            ds.append(self.dbpool.runOperation(sql))
     371        dlist = defer.DeferredList(ds, fireOnOneErrback=1)
     372        deferredResult(dlist)
     373
     374        # verify simple table is empty
     375        sql = "select count(1) from simple"
     376        row = deferredResult(self.dbpool.runQuery(sql))
     377        self.failUnless(int(row[0][0]) == 0, "Interaction not rolled back")
     378
     379    def interaction(self, transaction):
     380        transaction.execute("select x from simple order by x")
     381        for i in range(self.count):
     382            row = transaction.fetchone()
     383            self.failUnless(len(row) == 1, "Wrong size row")
     384            self.failUnless(row[0] == i, "Value not returned.")
     385        # should test this, but gadfly throws an exception instead
     386        #self.failUnless(transaction.fetchone() is None, "Too many rows")
     387
     388    def bad_interaction(self, transaction):
     389        if self.can_rollback:
     390            transaction.execute("insert into simple(x) values(0)")
    318391
     392        transaction.execute("select * from NOTABLE")
    319393
    320 class SinglePool(ConnectionPool):
    321     """A pool for just one connection at a time.
    322     Remove this when ConnectionPool is fixed.
    323     """
    324 
    325     def __init__(self, connection):
    326         self.connection = connection
    327 
    328     def connect(self):
    329         return self.connection
    330 
    331     def close(self):
    332         self.connection.close()
    333         del self.connection
     394    def startDB(self): pass
     395    def stopDB(self): pass
    334396
    335397
    336398class NoSlashSQLReflector(SQLReflector):
     
    346408    nullsOK = 0
    347409    DB_DIR = "./gadflyDB"
    348410    reflectorClass = NoSlashSQLReflector
     411    can_rollback = 0
    349412
    350413    def startDB(self):
    351414        if not os.path.exists(self.DB_DIR): os.mkdir(self.DB_DIR)
     
    359422        conn.close()
    360423
    361424    def makePool(self):
    362         return SinglePool(gadfly.gadfly(self.DB_NAME, self.DB_DIR))
     425        return ConnectionPool('gadfly', self.DB_NAME, self.DB_DIR, cp_max=1)
    363426
    364427
    365428class SQLiteTestCase(SQLReflectorTestCase, unittest.TestCase):
     
    375438        if os.path.exists(self.database): os.unlink(self.database)
    376439
    377440    def makePool(self):
    378         return SinglePool(sqlite.connect(database=self.database))
     441        return ConnectionPool('sqlite', database=self.database, cp_max=1)
    379442
    380443
    381444class PostgresTestCase(SQLReflectorTestCase, unittest.TestCase):
     
    384447
    385448    def makePool(self):
    386449        return ConnectionPool('pyPgSQL.PgSQL', database=self.DB_NAME,
    387                               user=self.DB_USER, password=self.DB_PASS)
     450                              user=self.DB_USER, password=self.DB_PASS,
     451                              cp_min=0)
    388452
    389453
    390454class MySQLTestCase(SQLReflectorTestCase, unittest.TestCase):
     
    392456    """
    393457
    394458    trailingSpacesOK = 0
     459    can_rollback = 0
    395460
    396461    def makePool(self):
    397462        return ConnectionPool('MySQLdb', db=self.DB_NAME,
     
    410475
    411476
    412477if gadfly is None: GadflyTestCase.skip = 1
     478elif not getattr(gadfly, 'connect', None): gadfly.connect = gadfly.gadfly
     479
    413480if sqlite is None: SQLiteTestCase.skip = 1
    414481
    415482if PgSQL is None: PostgresTestCase.skip = 1