Ticket #4397: pgadbapi.py

File pgadbapi.py, 17.9 KB (added by Jan Urbański, 12 years ago)

the PGADBAPI module

Line 
1# -*- test-case-name: twisted.test.test_pgadbapi -*-
2# Copyright (c) 2010 Twisted Matrix Laboratories.
3# See LICENSE for details.
4
5"""
6A Twisted wrapper for the asynchronous features of the PostgreSQL psycopg2
7driver.
8"""
9
10import psycopg2
11from psycopg2 import extensions
12from zope.interface import implements
13
14from twisted.internet import interfaces, reactor, defer
15from twisted.python import log
16
17
18class UnexpectedPollResult(Exception):
19    """
20    Polling returned an unexpected result.
21    """
22
23
24class _PollingMixin(object):
25    """
26    An object that wraps something pollable. It can take care of waiting for
27    the wrapped pollable to reach the OK state and adapts the pollable's
28    interface to U{interfaces.IReadWriteDescriptor}. It will forward all
29    attribute access that is has not been wrapped to the underlying
30    pollable. Useful as a mixin for classes that wrap a psycopg2 pollable
31    object.
32
33    @type reactor: A U{interfaces.IReactorFDSet} provider.
34    @ivar reactor: The reactor that the class will use to wait for the wrapped
35        pollable to reach the OK state.
36
37    @type prefix: C{str}
38    @ivar prefix: Prefix used during log formatting to indicate context.
39    """
40
41    implements(interfaces.IReadWriteDescriptor)
42
43    reactor = None
44    prefix = "pollable"
45    _pollingD = None
46
47    def pollable(self):
48        """
49        Return the pollable object. Subclasses should override this.
50
51        @return: A psycopg2 pollable.
52        """
53        raise NotImplementedError()
54
55    def poll(self):
56        """
57        Start polling the wrapped pollable.
58
59        @rtype: C{Deferred}
60        @return: A Deferred that will fire with an instance of this class when
61            the pollable reaches the OK state.
62        """
63        if not self._pollingD:
64            self._pollingD = defer.Deferred()
65        ret = self._pollingD
66
67        try:
68            self._pollingState = self.pollable().poll()
69        except:
70            d,  self._pollingD = self._pollingD, None
71            d.errback()
72            return ret
73
74        if self._pollingState == psycopg2.extensions.POLL_OK:
75            d, self._pollingD = self._pollingD, None
76            d.callback(self)
77        elif self._pollingState == psycopg2.extensions.POLL_WRITE:
78            self.reactor.addWriter(self)
79        elif self._pollingState == psycopg2.extensions.POLL_READ:
80            self.reactor.addReader(self)
81        else:
82            d,  self._pollingD = self._pollingD, None
83            d.errback(UnexpectedPollResult())
84
85        return ret
86
87    def doRead(self):
88        self.reactor.removeReader(self)
89        self.poll()
90
91    def doWrite(self):
92        self.reactor.removeWriter(self)
93        self.poll()
94
95    def logPrefix(self):
96        return self.prefix
97
98    def fileno(self):
99        if self.pollable():
100            return self.pollable().fileno()
101        else:
102            return -1
103
104    def connectionLost(self, reason):
105        if self._pollingD:
106            d,  self._pollingD = self._pollingD, None
107            d.errback(reason)
108
109    # forward all other access to the underlying connection
110    def __getattr__(self, name):
111        return getattr(self.pollable(), name)
112
113
114class Cursor(_PollingMixin):
115    """
116    A wrapper for a psycopg2 asynchronous cursor.
117
118    The wrapper will forward almost everything to the wrapped cursor, so the
119    usual DB-API interface can be used, but will take care of preventing
120    concurrent execution of asynchronous queries, which the PostgreSQL C
121    library does not support and will return Deferreds for some operations.
122    """
123
124    def __init__(self, cursor, connection):
125        self.reactor = connection.reactor
126        self.prefix = "cursor"
127
128        self._connection = connection
129        self._cursor = cursor
130
131    def pollable(self):
132        return self._cursor
133
134    def execute(self, query, params=None):
135        """
136        A regular DB-API execute, but returns a Deferred.
137
138        @rtype: C{Deferred}
139        @return: A C{Deferred} that will fire with the results of the
140            execute().
141        """
142        return self._connection.lock.run(
143            self._doit, 'execute', query, params)
144
145    def callproc(self, procname, params=None):
146        """
147        A regular DB-API callproc, but returns a Deferred.
148
149        @rtype: C{Deferred}
150        @return: A C{Deferred} that will fire with the results of the
151            callproc().
152        """
153        return self._connection.lock.run(
154            self._doit, 'callproc', procname, params)
155
156    def _doit(self, name, *args, **kwargs):
157        try:
158            getattr(self._cursor, name)(*args, **kwargs)
159        except:
160            return defer.fail()
161
162        return self.poll()
163
164    def close(self):
165        _cursor, self._cursor = self._cursor, None
166        return _cursor.close()
167
168    def fileno(self):
169        if self._cursor and self._connection._connection:
170            return self._cursor.fileno()
171        else:
172            return -1
173
174
175class AlreadyConnected(Exception):
176    """
177    The database connection is already open.
178    """
179
180
181class RollbackFailed(Exception):
182    """
183    Rolling back the transaction failed, the connection might be in an unusable
184    state.
185
186    @type connection: L{Connection}
187    @ivar connection: The connection that failed to roll back its transaction.
188
189    @type originalFailure: L{failure.Failure}
190    @ivar originalFailure: The failure that caused the connection to try to roll back
191        the transaction.
192    """
193
194    def __init__(self, connection, originalFailure):
195        self.connection = connection
196        self.originalFailure = originalFailure
197
198    def __str__(self):
199        return "<RollbackFailed, original error: %s>" % self.originalFailure
200
201
202class Connection(_PollingMixin):
203    """
204    A wrapper for a psycopg2 asynchronous connection.
205
206    The wrapper forwards almost everything to the wrapped connection, but
207    provides additional methods for compatibility with C{adbapi.Connection}.
208
209    @type connectionFactory: Any callable.
210    @ivar connectionFactory: The factory used to produce connections.
211
212    @type cursorFactory: Any callable.
213    @ivar cursorFactory: The factory used to produce cursors.
214    """
215
216    connectionFactory = psycopg2.connect
217    cursorFactory = Cursor
218
219    def __init__(self, reactor=None):
220        if not reactor:
221            from twisted.internet import reactor
222        self.reactor = reactor
223        self.prefix = "connection"
224
225        # this lock will be used to prevent concurrent query execution
226        self.lock = defer.DeferredLock()
227        self._connection = None
228
229    def pollable(self):
230        return self._connection
231
232    def connect(self, *args, **kwargs):
233        """
234        Connect to the database.
235
236        Positional arguments will be passed to the psycop2.connect()
237        method. Use them to pass database names, usernames, passwords, etc.
238
239        @rtype: C{Deferred}
240        @returns: A Deferred that will fire when the connection is open.
241        """
242        if self._connection:
243            return defer.fail(AlreadyConnected())
244
245        kwargs['async'] = True
246        try:
247            self._connection = self.connectionFactory(*args, **kwargs)
248        except:
249            return defer.fail()
250
251        return self.poll()
252
253    def close(self):
254        """
255        Close the connection and disconnect from the database.
256        """
257        _connection, self._connection = self._connection, None
258        _connection.close()
259
260    def cursor(self):
261        """
262        Create an asynchronous cursor.
263        """
264        return self.cursorFactory(self._connection.cursor(), self)
265
266    def runQuery(self, *args, **kwargs):
267        """
268        Execute an SQL query and return the result.
269
270        An asynchronous cursor will be created and its execute() method will
271        be invoked with the provided *args and **kwargs. After the query
272        completes the cursor's fetchall() method will be called and the
273        returned Deferred will fire with the result.
274
275        The connection is always in autocommit mode, so the query will be run
276        in a one-off transaction. In case of errors a Failure will be returned.
277
278        @rtype: C{Deferred}
279        @return: A Deferred that will fire with the return value of the
280            cursor's fetchall() method.
281        """
282        c = self.cursor()
283        d = c.execute(*args, **kwargs)
284        return d.addCallback(lambda c: c.fetchall())
285
286    def runOperation(self, *args, **kwargs):
287        """
288        Execute an SQL query and return the result.
289
290        Identical to runQuery, but the cursor's fetchall() method will not be
291        called and instead None will be returned. It is intended for statements
292        that do not normally return values, like INSERT or DELETE.
293
294        @rtype: C{Deferred}
295        @return: A Deferred that will fire None.
296        """
297        c = self.cursor()
298        d = c.execute(*args, **kwargs)
299        return d.addCallback(lambda _: None)
300
301    def runInteraction(self, interaction, *args, **kwargs):
302        """
303        Run commands in a transaction and return the result.
304
305        The 'interaction' is a callable that will be passed a
306        C{pgadbapi.Cursor} object. Before calling 'interaction' a new
307        transaction will be started, so the callable can assume to be running
308        all its commands in a transaction. If 'interaction' returns a
309        C{Deferred} processing will wait for it to fire before proceeding.
310
311        After 'interaction' finishes work the transaction will be automatically
312        committed. If it raises an exception or returns a C{Failure} the
313        connection will be rolled back instead.
314
315        If committing the transaction fails it will be rolled back instead and
316        the C{Failure} obtained trying to commit will be returned.
317
318        If rolling back the transaction fails the C{Failure} obtained from the
319        rollback attempt will be logged and a C{RollbackFailed} failure will be
320        returned. The returned failure will contain references to the original
321        C{Failure} that caused the transaction to be rolled back and to the
322        C{Connection} in which that happend, so the user can take a decision
323        whether she still wants to be using it or just close it, because an
324        open transaction might have been left open in the database.
325
326        @type interaction: Any callable
327        @param interaction: A callable whose first argument is a
328            L{pgadbapi.Cursor}.
329
330        @rtype: C{Deferred}
331        @return: A Deferred that will file with the return value of
332            'interaction'.
333        """
334        c = self.cursor()
335        d = c.execute("begin")
336        d.addCallback(interaction, *args, **kwargs)
337
338        def commitAndPassthrough(ret, cursor):
339            e = cursor.execute("commit")
340            return e.addCallback(lambda _: ret)
341        def rollbackAndPassthrough(f, cursor):
342            # maybeDeferred in case cursor.execute raises a synchronous
343            # exception
344            e = defer.maybeDeferred(cursor.execute, "rollback")
345            def just_panic(rf):
346                log.err(rf)
347                return defer.fail(RollbackFailed(self, f))
348            # if rollback failed panic
349            e.addErrback(just_panic)
350            # reraise the original failure afterwards
351            return e.addCallback(lambda _: f)
352        d.addCallback(commitAndPassthrough, c)
353        d.addErrback(rollbackAndPassthrough, c)
354
355        return d
356
357class ConnectionPool(object):
358    """
359    A poor man's pool of L{pgadbapi.Connection} instances.
360
361    @type min: C{int}
362    @ivar min: The amount of connections that will be open at start. The pool
363        never opens or closes connections on its own.
364
365    @type connectionFactory: Any callable.
366    @ivar connectionFactory: The factory used to produce connections.
367    """
368
369    min = 3
370    connectionFactory = Connection
371    reactor = None
372
373    def __init__(self, _ignored, *connargs, **connkw):
374        """
375        Create a new connection pool.
376
377        Any positional or keyword arguments other than the first one and a
378        'min' keyword argument are passed to the L{Connection} when
379        connecting. Use these arguments to pass database names, usernames,
380        passwords, etc.
381
382        @type _ignored: Any object.
383        @param _ignored: Ignored, for L{adbapi.ConnectionPool} compatibility.
384        """
385        if not self.reactor:
386            from twisted.internet import reactor
387            self.reactor = reactor
388        # for adbapi compatibility, min can be passed in kwargs
389        if 'min' in connkw:
390            self.min = connkw.pop('min')
391        self.connargs = connargs
392        self.connkw = connkw
393        self.connections = set(
394            [self.connectionFactory(self.reactor) for _ in range(self.min)])
395
396        # to avoid checking out more connections than there are pooled in total
397        self._semaphore = defer.DeferredSemaphore(self.min)
398
399    def start(self):
400        """
401        Start the connection pool.
402
403        This will create as many connections as the pool's 'min' variable says.
404
405        @rtype: C{Deferred}
406        @return: A C{Deferred} that fires when all connection have succeeded.
407        """
408        d = defer.gatherResults([c.connect(*self.connargs, **self.connkw)
409                                 for c in self.connections])
410        return d.addCallback(lambda _: self)
411
412    def close(self):
413        """
414        Stop the pool.
415
416        Disconnect all connections.
417        """
418        for c in self.connections:
419            c.close()
420
421    def remove(self, connection):
422        """
423        Remove a connection from the pool.
424
425        Provided to be able to remove broken connections from the pool. The
426        caller should make sure the removed connection does not have queries
427        pending.
428
429        @type connection: An object produced by the pool's connection factory.
430        @param connection: The connection to be removed.
431        """
432        if not self.connections:
433            raise ValueError("Connection still in use")
434        self.connections.remove(connection)
435        self._semaphore.limit -= 1
436        self._semaphore.acquire()  # bleargh...
437
438    def add(self, connection):
439        """
440        Add a connection to the pool.
441
442        Provided to be able to extend the pool with new connections.
443
444        @type connection: An object compatible with those produce by the pool's
445            connection factory.
446        @param connection: The connection to be added.
447        """
448        self.connections.add(connection)
449        self._semaphore.limit += 1
450        self._semaphore.release() # uuuugh...
451
452    def _putBackAndPassthrough(self, result, connection):
453        self.connections.add(connection)
454        return result
455
456    def runQuery(self, *args, **kwargs):
457        """
458        Execute an SQL query and return the result.
459
460        An asynchronous cursor will be created from a randomly chosen pooled
461        connection and its execute() method will be invoked with the provided
462        *args and **kwargs. After the query completes the cursor's fetchall()
463        method will be called and the returned Deferred will fire with the
464        result.
465
466        The connection is always in autocommit mode, so the query will be run
467        in a one-off transaction. In case of errors a Failure will be returned.
468
469        @rtype: C{Deferred}
470        @return: A Deferred that will fire with the return value of the
471            cursor's fetchall() method.
472        """
473        return self._semaphore.run(self._runQuery, *args, **kwargs)
474
475    def _runQuery(self, *args, **kwargs):
476        c = self.connections.pop()
477        d = c.runQuery(*args, **kwargs)
478        return d.addBoth(self._putBackAndPassthrough, c)
479
480    def runOperation(self, *args, **kwargs):
481        """
482        Execute an SQL query and return the result.
483
484        Identical to runQuery, but the cursor's fetchall() method will not be
485        called and instead None will be returned. It is intended for statements
486        that do not normally return values, like INSERT or DELETE.
487
488        @rtype: C{Deferred}
489        @return: A Deferred that will fire None.
490        """
491        return self._semaphore.run(self._runOperation, *args, **kwargs)
492
493    def _runOperation(self, *args, **kwargs):
494        c = self.connections.pop()
495        d = c.runOperation(*args, **kwargs)
496        return d.addBoth(self._putBackAndPassthrough, c)
497
498    def runInteraction(self, interaction, *args, **kwargs):
499        """
500        Run commands in a transaction and return the result.
501
502        The 'interaction' is a callable that will be passed a
503        C{pgadbapi.Cursor} object. Before calling 'interaction' a new
504        transaction will be started, so the callable can assume to be running
505        all its commands in a transaction. If 'interaction' returns a
506        C{Deferred} processing will wait for it to fire before proceeding.
507
508        After 'interaction' finishes work the transaction will be automatically
509        committed. If it raises an exception or returns a C{Failure} the
510        connection will be rolled back instead.
511
512        If committing the transaction fails it will be rolled back instead and
513        the C{Failure} obtained trying to commit will be returned.
514
515        If rolling back the transaction fails the C{Failure} obtained from the
516        rollback attempt will be logged and a C{RollbackFailed} failure will be
517        returned. The returned failure will contain references to the original
518        C{Failure} that caused the transaction to be rolled back and to the
519        C{Connection} in which that happend, so the user can take a decision
520        whether she still wants to be using it or just close it, because an
521        open transaction might have been left open in the database.
522
523        @type interaction: Any callable
524        @param interaction: A callable whose first argument is a
525            L{pgadbapi.Cursor}.
526
527        @rtype: C{Deferred}
528        @return: A Deferred that will file with the return value of
529            'interaction'.
530        """
531        return self._semaphore.run(
532            self._runInteraction, interaction, *args, **kwargs)
533
534    def _runInteraction(self, interaction, *args, **kwargs):
535        c = self.connections.pop()
536        d = c.runInteraction(interaction, *args, **kwargs)
537        return d.addBoth(self._putBackAndPassthrough, c)