Ticket #4397: test_pgadbapi.py

File test_pgadbapi.py, 21.1 KB (added by Jan Urbański, 12 years ago)

tests for PGADBAPI

Line 
1# Copyright (c) 2010 Twisted Matrix Laboratories.
2# See LICENSE for details.
3
4"""
5Tests for twisted.enterprise.pgadbapi.
6"""
7
8try:
9    import psycopg2
10    import psycopg2.extensions
11except ImportError:
12    psycopg2 = None
13
14if psycopg2:
15    try:
16        psycopg2.extensions.POLL_OK
17    except AttributeError:
18        psycopg2.extensions.POLL_OK = None
19
20from twisted.enterprise import pgadbapi
21
22from twisted.trial import unittest
23from twisted.internet import defer
24
25simple_table_schema = """
26CREATE TABLE simple (
27  x integer
28)
29"""
30
31# DB_NAME = "twisted_test"
32# DB_HOST = "localhost"
33# DB_USER = "twisted_test"
34# DB_PASS = "twisted_test"
35DB_NAME = "psycopg2_test"
36DB_HOST = "localhost"
37DB_USER = "wulczer"
38DB_PASS = ""
39
40_skip = None
41if psycopg2 is None:
42    _skip = "psycopg2 not installed"
43elif psycopg2.extensions.POLL_OK is None:
44    _skip = ("psycopg2 does not have async support "
45             "(what? you're not running git master?)")
46
47
48class Psycopg2TestCase(unittest.TestCase):
49
50    skip = _skip
51
52
53class PollableThing(object):
54    """
55    A fake thing that provides a psycopg2 pollable interface.
56    """
57    def __init__(self):
58        self.NEXT_STATE = psycopg2.extensions.POLL_READ
59
60    def poll(self):
61        if self.NEXT_STATE is None:
62            raise Exception("no next state")
63        return self.NEXT_STATE
64
65    def fileno(self):
66        return 42
67
68
69class FakeReactor(object):
70    """
71    A reactor that just counts how many things were added and removed.
72    """
73    readersAdded = 0
74    writersAdded = 0
75    readersRemoved = 0
76    writersRemoved = 0
77
78    def reset(self):
79        self.readersAdded = self.writersAdded = 0
80        self.readersRemoved = self.writersRemoved = 0
81    def addReader(self, _):
82        self.readersAdded += 1
83    def addWriter(self, _):
84        self.writersAdded += 1
85    def removeReader(self, _):
86        self.readersRemoved += 1
87    def removeWriter(self, _):
88        self.writersRemoved += 1
89
90
91class FakeWrapper(pgadbapi._PollingMixin):
92    """
93    A mock subclass of L{pgadbapi._PollingMixin}.
94    """
95    reactor = FakeReactor()
96    prefix = "fake-wrapper"
97
98    def pollable(self):
99        return self._pollable
100
101
102class PGADBAPIPollingMixingTestCase(Psycopg2TestCase):
103
104    def test_empty(self):
105        """
106        The default L{pgadbapi._PollingMixin} implementation raises an
107        exception on pollable().
108        """
109        p = pgadbapi._PollingMixin()
110        self.assertRaises(NotImplementedError, p.pollable)
111
112    def check(self, r, *args):
113        self.assertEquals(args, (r.readersAdded, r.writersAdded,
114                                 r.readersRemoved, r.writersRemoved))
115
116    def test_polling(self):
117        """
118        L{pgadbapi._PollingMixin} adds and removes itself from the reactor
119        according to poll() results from the wrapped pollable.
120        """
121        p = FakeWrapper()
122        p._pollable = PollableThing()
123        p.reactor.reset()
124
125        # start off with reading
126        p._pollable.NEXT_STATE = psycopg2.extensions.POLL_READ
127        d = p.poll()
128        # after the initial poll we should get a Deferred and one reader added
129        self.check(p.reactor, 1, 0, 0, 0)
130
131        p._pollable.NEXT_STATE = psycopg2.extensions.POLL_WRITE
132        p.doRead()
133        # the reader should get removed and a writer should get added, because
134        # we made the next poll() return POLL_WRITE
135        self.check(p.reactor, 1, 1, 1, 0)
136
137        p._pollable.NEXT_STATE = psycopg2.extensions.POLL_READ
138        p.doWrite()
139        # the writer is removed, a reader is added
140        self.check(p.reactor, 2, 1, 1, 1)
141
142        p._pollable.NEXT_STATE = psycopg2.extensions.POLL_READ
143        p.doRead()
144        # the reader is removed, but then readded because we returned POLL_READ
145        self.check(p.reactor, 3, 1, 2, 1)
146
147        p._pollable.NEXT_STATE = psycopg2.extensions.POLL_OK
148        p.doRead()
149        # we're done, the reader should just get removed
150        self.check(p.reactor, 3, 1, 3, 1)
151
152        # and the Deferred should succeed
153        return d
154
155    def test_interface(self):
156        """
157        L{pgadbapi._PollingMixin} correctly provides the
158        L{interfaces.IReadWriteDescriptor} interface.
159        """
160        p = FakeWrapper()
161        p._pollable = PollableThing()
162
163        self.assertEquals(p.fileno(), 42)
164        self.assertEquals(p.logPrefix(), "fake-wrapper")
165
166        p._pollable = None
167        self.assertEquals(p.fileno(), -1)
168
169    def test_connectionLost(self):
170        """
171        Calling connectionLost() errbacks the C{Deferred} returned from poll()
172        but another connectionLost() is harmless.
173        """
174        p = FakeWrapper()
175        p._pollable = PollableThing()
176
177        d = p.poll()
178        p.connectionLost(RuntimeError("boom"))
179        d = self.assertFailure(d, RuntimeError)
180        p.connectionLost(RuntimeError("bam"))
181        return d
182
183    def test_errors(self):
184        """
185        Unexpected results from poll() make L{pgadbapi._PollingMixin} raise an
186        exception.
187        """
188        p = FakeWrapper()
189        p._pollable = PollableThing()
190
191        p._pollable.NEXT_STATE = "foo"
192        d = p.poll()
193        return self.assertFailure(d, pgadbapi.UnexpectedPollResult)
194
195
196class PGADBAPIConnectionTestCase(Psycopg2TestCase):
197
198    def test_simpleConnection(self):
199        """
200        Just connecting and disconnecting works.
201        """
202        conn = pgadbapi.Connection()
203        d = conn.connect(user=DB_USER, password=DB_PASS,
204                         host=DB_HOST, database=DB_NAME)
205        d.addCallback(lambda c: c.close())
206        return d
207
208    def test_connectionSetup(self):
209        """
210        The created connection should be asynchronous and in autocommit mode
211        and the C{Deferred} returned from connect() should fire with the
212        connection itself.
213        """
214        conn = pgadbapi.Connection()
215        d = conn.connect(user=DB_USER, password=DB_PASS,
216                         host=DB_HOST, database=DB_NAME)
217
218        def doChecks(c):
219            self.assertIdentical(c, conn)
220            self.assertFalse(c.issync())
221            self.assertEquals(c.isolation_level, 0)
222            return c
223        d.addCallback(doChecks)
224        return d.addCallback(lambda c: c.close())
225
226    def test_multipleConnections(self):
227        """
228        Trying to connect twice raises an exception, but after closing you can
229        connect again.
230        """
231        conn = pgadbapi.Connection()
232        d = conn.connect(user=DB_USER, password=DB_PASS,
233                         host=DB_HOST, database=DB_NAME)
234
235        d.addCallback(lambda c: conn.connect(
236                user=DB_USER, password=DB_PASS,
237                host=DB_HOST, database=DB_NAME))
238        d = self.failUnlessFailure(d, pgadbapi.AlreadyConnected)
239
240        d.addCallback(lambda _: conn.close())
241        d.addCallback(lambda _: conn.connect(
242                user=DB_USER, password=DB_PASS,
243                host=DB_HOST, database=DB_NAME))
244        return d.addCallback(lambda c: c.close())
245
246    def test_errors(self):
247        """
248        Errors from psycopg2's poll() make connect() return failures. Errors on
249        creating the psycopg2 connection too. Unexpected results from poll()
250        also make connect() return a failure.
251        """
252        conn = pgadbapi.Connection()
253        class BadPollable(object):
254            def __init__(*args, **kwars):
255                pass
256            def poll(self):
257                raise RuntimeError("booga")
258            def close(self):
259                pass
260        conn.connectionFactory = BadPollable
261
262        d = conn.connect()
263        d = self.assertFailure(d, RuntimeError)
264        d.addCallback(lambda _: conn.close())
265
266        class BadThing(object):
267            def __init__(*args, **kwargs):
268                raise RuntimeError("wooga")
269            def close(self):
270                pass
271        conn.connectionFactory = BadThing
272
273        d.addCallback(lambda _: conn.connect())
274        d = self.assertFailure(d, RuntimeError)
275
276        class BrokenPollable(object):
277            def __init__(*args, **kwars):
278                pass
279            def poll(self):
280                return "tee hee hee"
281            def close(self):
282                pass
283        conn.connectionFactory = BrokenPollable
284
285        d.addCallback(lambda _: conn.connect())
286        return self.assertFailure(d, pgadbapi.UnexpectedPollResult)
287
288
289class _SimpleDBSetupMixin(object):
290
291    def setUp(self):
292        self.conn = pgadbapi.Connection()
293        d = self.conn.connect(user=DB_USER, password=DB_PASS,
294                              host=DB_HOST, database=DB_NAME)
295        d.addCallback(lambda c: c.cursor())
296        return d.addCallback(lambda c: c.execute(simple_table_schema))
297
298    def tearDown(self):
299        c = self.conn.cursor()
300        d = c.execute("drop table simple")
301        return d.addCallback(lambda _: self.conn.close())
302
303
304
305class PGADBAPIManualQueryTestCase(_SimpleDBSetupMixin, Psycopg2TestCase):
306
307    def test_simpleQuery(self):
308        """
309        A simple select works.
310        """
311        c = self.conn.cursor()
312        return c.execute("select * from simple")
313
314    def test_simpleCallproc(self):
315        """
316        A simple procedure call works.
317        """
318        c = self.conn.cursor()
319        return c.callproc("now")
320
321    def test_closeCursor(self):
322        """
323        Closing the cursor works.
324        """
325        c = self.conn.cursor()
326        d = c.execute("select 1")
327        return d.addCallback(lambda c: c.close())
328
329    def test_queryResults(self):
330        """
331        Query results are obtainable from the asynchronous cursor.
332        """
333        c = self.conn.cursor()
334        d = defer.Deferred()
335        d.addCallback(
336            lambda c: c.execute("insert into simple values (%s)", (1, )))
337        d.addCallback(
338            lambda c: c.execute("insert into simple values (%s)", (2, )))
339        d.addCallback(
340            lambda c: c.execute("insert into simple values (%s)", (3, )))
341        d.addCallback(
342            lambda c: c.execute("select * from simple"))
343        d.addCallback(
344            lambda c: self.assertEquals(c.fetchall(), [(1, ), (2, ), (3, )]))
345        d.callback(c)
346        return d
347
348    def test_multipleQueries(self):
349        """
350        Multiple calls to execute() without waiting for the previous one to
351        finish work and return correct results.
352        """
353        cursors = [self.conn.cursor() for _ in range(5)]
354        d = defer.gatherResults([c.execute("select %s", (i, ))
355                                 for i, c in enumerate(cursors)])
356        d.addCallback(
357            lambda cursors: self.assertEquals(
358                sorted(map(lambda c: c.fetchone()[0], cursors)),
359                [0, 1, 2, 3, 4]))
360        return d
361
362    def test_errors(self):
363        """
364        Errors from the database are reported as failures.
365        """
366        c = self.conn.cursor()
367        d = c.execute("select * from nonexistent")
368        return self.assertFailure(d, psycopg2.ProgrammingError)
369
370    def test_wrongCall(self):
371        """
372        Errors raised inside psycopg2 are reported as failures.
373        """
374        c = self.conn.cursor()
375        d = c.execute("select %s", "whoops")
376        return self.assertFailure(d, TypeError)
377
378    def test_manualTransactions(self):
379        """
380        Transactions can be constructed manually by issuing BEGIN and ROLLBACK
381        as appropriate, and should work.
382        """
383        c = self.conn.cursor()
384        d = defer.Deferred()
385        d.addCallback(
386            lambda c: c.execute("begin"))
387        d.addCallback(
388            lambda c: c.execute("insert into simple values (%s)", (1, )))
389        d.addCallback(
390            lambda c: c.execute("insert into simple values (%s)", (2, )))
391        d.addCallback(
392            lambda c: c.execute("select * from simple order by x"))
393        d.addCallback(
394            lambda c: self.assertEquals(c.fetchall(), [(1, ), (2, )]))
395        d.addCallback(
396            lambda _: c.execute("rollback"))
397        d.addCallback(
398            lambda c: c.execute("select * from simple"))
399        d.addCallback(
400            lambda c: self.assertEquals(c.fetchall(), []))
401        d.callback(c)
402        return d
403
404
405class NotRollingBackCursor(pgadbapi.Cursor):
406    """
407    A cursor that does not like rolling back.
408    """
409    def _doit(self, name, *args, **kwargs):
410        if name == "execute" and args == ("rollback", None):
411            raise RuntimeError("boom")
412        return pgadbapi.Cursor._doit(self, name, *args, **kwargs)
413
414
415class PGADBAPIQueryTestCase(_SimpleDBSetupMixin, Psycopg2TestCase):
416
417    def test_runQuery(self):
418        """
419        runQuery() works and returns the result.
420        """
421        d = self.conn.runQuery("select 1")
422        return d.addCallback(self.assertEquals, [(1, )])
423
424    def test_runOperation(self):
425        """
426        runOperation() works and executes the operation while returning None.
427        """
428        d = self.conn.runQuery("select count(*) from simple")
429        d.addCallback(self.assertEquals, [(0, )])
430
431        d.addCallback(lambda _: self.conn.runOperation(
432                "insert into simple values (%s)", (1, )))
433        d.addCallback(self.assertIdentical, None)
434
435        d.addCallback(lambda _: self.conn.runQuery(
436                    "select count(*) from simple"))
437        return d.addCallback(self.assertEquals, [(1, )])
438
439    def test_runSimpleInteraction(self):
440        """
441        Interactions are being run in a transaction, the parameters from
442        runInteraction are being passed to them and they are being committed
443        after they return. Their return value becomes the return value of the
444        Deferred from runInteraction.
445        """
446        def interaction(c, arg1, kwarg1):
447            self.assertEquals(arg1, "foo")
448            self.assertEquals(kwarg1, "bar")
449            d = c.execute("insert into simple values (1)")
450            d.addCallback(lambda c: c.execute("insert into simple values (2)"))
451            return d.addCallback(lambda _: "interaction done")
452
453        d = self.conn.runInteraction(interaction, "foo", kwarg1="bar")
454
455        d.addCallback(self.assertEquals, "interaction done")
456
457        d.addCallback(lambda _: self.conn.runQuery(
458                "select * from simple order by x"))
459        return d.addCallback(self.assertEquals, [(1, ), (2, )])
460
461    def test_runErrorInteraction(self):
462        """
463        Interactions that produce errors are rolled back and the correct error
464        is reported.
465        """
466        def interaction(c):
467            d = c.execute("insert into simple values (1)")
468            return d.addCallback(
469                lambda c: c.execute("select * from nope_not_here"))
470
471        d = self.conn.runInteraction(interaction)
472        d = self.assertFailure(d, psycopg2.ProgrammingError)
473
474        d.addCallback(lambda _: self.conn.runQuery(
475                "select count(*) from simple"))
476        return d.addCallback(self.assertEquals, [(0, )])
477
478    def test_errorOnRollback(self):
479        """
480        Interactions that produce errors and are unable to roll back return a
481        L{pgadbapi.RollbackFailed} failure that has references to the faulty
482        connection and the original failure that cause all that trouble.
483        """
484        def interaction(c):
485            d = c.execute("insert into simple values (1)")
486            return d.addCallback(
487                lambda c: c.execute("select * from nope_not_here"))
488
489        mp = self.patch(self.conn, 'cursorFactory', NotRollingBackCursor)
490
491        d = self.conn.runInteraction(interaction)
492        d.addCallback(lambda _: self.fail("No exception"))
493
494        def check_error(f):
495            f.trap(pgadbapi.RollbackFailed)
496            original = f.value.originalFailure
497            # original should reference the error that started all the mess
498            self.assertIsInstance(original.value,
499                                  psycopg2.ProgrammingError)
500            self.assertEquals(
501                str(f.value),
502                "<RollbackFailed, original error: %s>" % original)
503            # the error from the failed rollback should get logged
504            errors = self.flushLoggedErrors()
505            self.assertEquals(len(errors), 1)
506            self.assertEquals(errors[0].value.args[0], "boom")
507            # restore or we won't be able to clean up the mess
508            mp.restore()
509        d.addErrback(check_error)
510
511        # rollback for real, or tearDown won't be able to drop the table
512        return d.addCallback(lambda _: self.conn.runOperation("rollback"))
513
514
515class PGADBAPIConnectionPoolTestCase(Psycopg2TestCase):
516
517    def setUp(self):
518        self.pool = pgadbapi.ConnectionPool(
519            None, user=DB_USER, password=DB_PASS,
520            host=DB_HOST, database=DB_NAME)
521        return self.pool.start()
522
523    def tearDown(self):
524        return self.pool.close()
525
526    def test_basics(self):
527        """
528        Exactly 'min' connections are always created.
529        """
530        self.assertEquals(len(self.pool.connections), self.pool.min)
531
532    def test_simpleQuery(self):
533        """
534        The pool can run 'min' queries in parallel without making any of them
535        wait. The queries return correct values.
536        """
537        ds = [self.pool.runQuery("select 1") for _ in range(self.pool.min)]
538        self.assertEquals(len(self.pool._semaphore.waiting), 0)
539
540        d = defer.gatherResults(ds)
541        return d.addCallback(self.assertEquals, [[(1, )]] * self.pool.min)
542
543    def test_moreQueries(self):
544        """
545        The pool can handle more parallel queries than its size.
546        """
547        d = defer.gatherResults(
548            [self.pool.runQuery("select 1") for _ in range(self.pool.min * 5)])
549        return d.addCallback(self.assertEquals, [[(1, )]] * self.pool.min * 5)
550
551    def test_operation(self):
552        """
553        The pool's runOperation works.
554        """
555        d = self.pool.runOperation("create table x (i int)")
556        # give is a workout, 20 x the number of connections
557        d.addCallback(lambda _: defer.gatherResults(
558                [self.pool.runOperation("insert into x values (%s)", (i, ))
559                 for i in range(self.pool.min * 20)]))
560        d.addCallback(lambda _: self.pool.runQuery(
561                "select * from x order by i"))
562        d.addCallback(self.assertEquals, [(i, ) for i in
563                                         range(self.pool.min * 20)])
564        return d.addCallback(lambda _: self.pool.runOperation(
565                "drop table x"))
566
567    def test_interaction(self):
568        """
569        The pool's runInteraction works.
570        """
571        def interaction(c):
572            # cursors can only be declared in a transaction, so that's a good
573            # indication that we're in one
574            d = c.execute("declare x cursor for values (1), (2)")
575            d.addCallback(lambda c: c.execute("fetch 1 from x"))
576            d.addCallback(lambda c: self.assertEquals(c.fetchone()[0], 1))
577            d.addCallback(lambda _: c.execute("fetch 1 from x"))
578            d.addCallback(lambda c: self.assertEquals(c.fetchone()[0], 2))
579            return d
580
581        return defer.gatherResults([self.pool.runInteraction(interaction)
582                                    for _ in range(self.pool.min * 20)])
583
584
585class PGADBAPIConnectionPoolHotswappingTestCase(Psycopg2TestCase):
586
587    def test_errorsInInteractionHotswappingConnections(self):
588        """
589        After getting a RollbackFailed failure it is possible to remove the
590        offending connection from the pool, open a new one and put it in the
591        pool to replace the removed one.
592        """
593        pool = pgadbapi.ConnectionPool(
594            None, user=DB_USER, password=DB_PASS,
595            host=DB_HOST, database=DB_NAME, min=1)
596        self.assertEquals(pool.min, 1)
597        d = pool.start()
598
599        # poison the connection
600        c, = pool.connections
601        c.cursorFactory = NotRollingBackCursor
602
603        # run stuff that breaks
604        def brokenInteraction(c):
605            return c.execute("boom")
606        d.addCallback(lambda _: pool.runInteraction(brokenInteraction))
607        d.addCallback(lambda _: self.fail("No exception"))
608        def checkErrorAndHotswap(f):
609            f.trap(pgadbapi.RollbackFailed)
610            e = f.value
611            self.assertIdentical(e.connection.cursorFactory,
612                                 NotRollingBackCursor)
613            errors = self.flushLoggedErrors()
614            self.assertEquals(len(errors), 1)
615            self.assertEquals(errors[0].value.args[0], "boom")
616            pool.remove(e.connection)
617            e.connection.close()
618            c = pgadbapi.Connection()
619            self.assertNotIdentical(c.cursorFactory,
620                                    NotRollingBackCursor)
621            d = c.connect(user=DB_USER, password=DB_PASS,
622                          host=DB_HOST, database=DB_NAME)
623            return d.addCallback(lambda c: pool.add(c))
624        d.addErrback(checkErrorAndHotswap)
625
626        d.addCallback(lambda _: defer.gatherResults([
627                    pool.runQuery("select 1") for _ in range(3)]))
628        return d.addCallback(self.assertEquals, [[(1, )]] * 3)
629
630    def test_removeWhileBusy(self):
631        """
632        Removing a connection from the pool while it's running a query raises
633        an exception.
634        """
635        pool = pgadbapi.ConnectionPool(None, database="psycopg2_test", min=1)
636        d = pool.start()
637
638        def simple(c):
639            self.assertRaises(ValueError, pool.remove, c._connection)
640        return d.addCallback(lambda pool: pool.runInteraction(simple))