Ticket #4397: test_pgadbapi.3.py

File test_pgadbapi.3.py, 21.2 KB (added by Jan Urbański, 11 years ago)

psycopg2 changed the "issync()" method to the "async" property

Line 
1# Copyright (c) 2010 Twisted Matrix Laboratories.
2# See LICENSE for details.
3
4"""
5Tests for twisted.enterprise.pgadbapi.
6"""
7
8try:
9    import psycopg2
10    import psycopg2.extensions
11except ImportError:
12    psycopg2 = None
13
14
15from twisted.enterprise import pgadbapi
16
17from twisted.trial import unittest
18from twisted.internet import defer
19
20simple_table_schema = """
21CREATE TABLE simple (
22  x integer
23)
24"""
25
26DB_NAME = "twisted_test"
27DB_HOST = "localhost"
28DB_USER = "twisted_test"
29DB_PASS = "twisted_test"
30
31
32def getSkipForPsycopg2():
33    if not psycopg2:
34        return "psycopg2 not installed"
35    try:
36        psycopg2.extensions.POLL_OK
37    except AttributeError:
38        return ("psycopg2 does not have async support "
39                "(what? you're not running git master?)")
40    try:
41        psycopg2.connect(user=DB_USER, password=DB_PASS,
42                         host=DB_HOST, database=DB_NAME).close()
43    except psycopg2.Error:
44        return "cannot connect to test database"
45    return None
46
47
48_skip = getSkipForPsycopg2()
49
50
51class Psycopg2TestCase(unittest.TestCase):
52
53    skip = _skip
54
55
56class PollableThing(object):
57    """
58    A fake thing that provides a psycopg2 pollable interface.
59    """
60    def __init__(self):
61        self.NEXT_STATE = psycopg2.extensions.POLL_READ
62
63    def poll(self):
64        if self.NEXT_STATE is None:
65            raise Exception("no next state")
66        return self.NEXT_STATE
67
68    def fileno(self):
69        return 42
70
71
72class FakeReactor(object):
73    """
74    A reactor that just counts how many things were added and removed.
75    """
76    readersAdded = 0
77    writersAdded = 0
78    readersRemoved = 0
79    writersRemoved = 0
80
81    def reset(self):
82        self.readersAdded = self.writersAdded = 0
83        self.readersRemoved = self.writersRemoved = 0
84    def addReader(self, _):
85        self.readersAdded += 1
86    def addWriter(self, _):
87        self.writersAdded += 1
88    def removeReader(self, _):
89        self.readersRemoved += 1
90    def removeWriter(self, _):
91        self.writersRemoved += 1
92
93
94class FakeWrapper(pgadbapi._PollingMixin):
95    """
96    A mock subclass of L{pgadbapi._PollingMixin}.
97    """
98    reactor = FakeReactor()
99    prefix = "fake-wrapper"
100
101    def pollable(self):
102        return self._pollable
103
104
105class PGADBAPIPollingMixingTestCase(Psycopg2TestCase):
106
107    def test_empty(self):
108        """
109        The default L{pgadbapi._PollingMixin} implementation raises an
110        exception on pollable().
111        """
112        p = pgadbapi._PollingMixin()
113        self.assertRaises(NotImplementedError, p.pollable)
114
115    def check(self, r, *args):
116        self.assertEquals(args, (r.readersAdded, r.writersAdded,
117                                 r.readersRemoved, r.writersRemoved))
118
119    def test_polling(self):
120        """
121        L{pgadbapi._PollingMixin} adds and removes itself from the reactor
122        according to poll() results from the wrapped pollable.
123        """
124        p = FakeWrapper()
125        p._pollable = PollableThing()
126        p.reactor.reset()
127
128        # start off with reading
129        p._pollable.NEXT_STATE = psycopg2.extensions.POLL_READ
130        d = p.poll()
131        # after the initial poll we should get a Deferred and one reader added
132        self.check(p.reactor, 1, 0, 0, 0)
133
134        p._pollable.NEXT_STATE = psycopg2.extensions.POLL_WRITE
135        p.doRead()
136        # the reader should get removed and a writer should get added, because
137        # we made the next poll() return POLL_WRITE
138        self.check(p.reactor, 1, 1, 1, 0)
139
140        p._pollable.NEXT_STATE = psycopg2.extensions.POLL_READ
141        p.doWrite()
142        # the writer is removed, a reader is added
143        self.check(p.reactor, 2, 1, 1, 1)
144
145        p._pollable.NEXT_STATE = psycopg2.extensions.POLL_READ
146        p.doRead()
147        # the reader is removed, but then readded because we returned POLL_READ
148        self.check(p.reactor, 3, 1, 2, 1)
149
150        p._pollable.NEXT_STATE = psycopg2.extensions.POLL_OK
151        p.doRead()
152        # we're done, the reader should just get removed
153        self.check(p.reactor, 3, 1, 3, 1)
154
155        # and the Deferred should succeed
156        return d
157
158    def test_interface(self):
159        """
160        L{pgadbapi._PollingMixin} correctly provides the
161        L{interfaces.IReadWriteDescriptor} interface.
162        """
163        p = FakeWrapper()
164        p._pollable = PollableThing()
165
166        self.assertEquals(p.fileno(), 42)
167        self.assertEquals(p.logPrefix(), "fake-wrapper")
168
169        p._pollable = None
170        self.assertEquals(p.fileno(), -1)
171
172    def test_connectionLost(self):
173        """
174        Calling connectionLost() errbacks the C{Deferred} returned from poll()
175        but another connectionLost() is harmless.
176        """
177        p = FakeWrapper()
178        p._pollable = PollableThing()
179
180        d = p.poll()
181        p.connectionLost(RuntimeError("boom"))
182        d = self.assertFailure(d, RuntimeError)
183        p.connectionLost(RuntimeError("bam"))
184        return d
185
186    def test_errors(self):
187        """
188        Unexpected results from poll() make L{pgadbapi._PollingMixin} raise an
189        exception.
190        """
191        p = FakeWrapper()
192        p._pollable = PollableThing()
193
194        p._pollable.NEXT_STATE = "foo"
195        d = p.poll()
196        return self.assertFailure(d, pgadbapi.UnexpectedPollResult)
197
198
199class PGADBAPIConnectionTestCase(Psycopg2TestCase):
200
201    def test_simpleConnection(self):
202        """
203        Just connecting and disconnecting works.
204        """
205        conn = pgadbapi.Connection()
206        d = conn.connect(user=DB_USER, password=DB_PASS,
207                         host=DB_HOST, database=DB_NAME)
208        d.addCallback(lambda c: c.close())
209        return d
210
211    def test_connectionSetup(self):
212        """
213        The created connection should be asynchronous and in autocommit mode
214        and the C{Deferred} returned from connect() should fire with the
215        connection itself.
216        """
217        conn = pgadbapi.Connection()
218        d = conn.connect(user=DB_USER, password=DB_PASS,
219                         host=DB_HOST, database=DB_NAME)
220
221        def doChecks(c):
222            self.assertIdentical(c, conn)
223            self.assertTrue(c.async)
224            self.assertEquals(c.isolation_level, 0)
225            return c
226        d.addCallback(doChecks)
227        return d.addCallback(lambda c: c.close())
228
229    def test_multipleConnections(self):
230        """
231        Trying to connect twice raises an exception, but after closing you can
232        connect again.
233        """
234        conn = pgadbapi.Connection()
235        d = conn.connect(user=DB_USER, password=DB_PASS,
236                         host=DB_HOST, database=DB_NAME)
237
238        d.addCallback(lambda c: conn.connect(
239                user=DB_USER, password=DB_PASS,
240                host=DB_HOST, database=DB_NAME))
241        d = self.failUnlessFailure(d, pgadbapi.AlreadyConnected)
242
243        d.addCallback(lambda _: conn.close())
244        d.addCallback(lambda _: conn.connect(
245                user=DB_USER, password=DB_PASS,
246                host=DB_HOST, database=DB_NAME))
247        return d.addCallback(lambda c: c.close())
248
249    def test_errors(self):
250        """
251        Errors from psycopg2's poll() make connect() return failures. Errors on
252        creating the psycopg2 connection too. Unexpected results from poll()
253        also make connect() return a failure.
254        """
255        conn = pgadbapi.Connection()
256        class BadPollable(object):
257            def __init__(*args, **kwars):
258                pass
259            def poll(self):
260                raise RuntimeError("booga")
261            def close(self):
262                pass
263        conn.connectionFactory = BadPollable
264
265        d = conn.connect()
266        d = self.assertFailure(d, RuntimeError)
267        d.addCallback(lambda _: conn.close())
268
269        class BadThing(object):
270            def __init__(*args, **kwargs):
271                raise RuntimeError("wooga")
272            def close(self):
273                pass
274        conn.connectionFactory = BadThing
275
276        d.addCallback(lambda _: conn.connect())
277        d = self.assertFailure(d, RuntimeError)
278
279        class BrokenPollable(object):
280            def __init__(*args, **kwars):
281                pass
282            def poll(self):
283                return "tee hee hee"
284            def close(self):
285                pass
286        conn.connectionFactory = BrokenPollable
287
288        d.addCallback(lambda _: conn.connect())
289        return self.assertFailure(d, pgadbapi.UnexpectedPollResult)
290
291
292class _SimpleDBSetupMixin(object):
293
294    def setUp(self):
295        self.conn = pgadbapi.Connection()
296        d = self.conn.connect(user=DB_USER, password=DB_PASS,
297                              host=DB_HOST, database=DB_NAME)
298        d.addCallback(lambda c: c.cursor())
299        return d.addCallback(lambda c: c.execute(simple_table_schema))
300
301    def tearDown(self):
302        c = self.conn.cursor()
303        d = c.execute("drop table simple")
304        return d.addCallback(lambda _: self.conn.close())
305
306
307
308class PGADBAPIManualQueryTestCase(_SimpleDBSetupMixin, Psycopg2TestCase):
309
310    def test_simpleQuery(self):
311        """
312        A simple select works.
313        """
314        c = self.conn.cursor()
315        return c.execute("select * from simple")
316
317    def test_simpleCallproc(self):
318        """
319        A simple procedure call works.
320        """
321        c = self.conn.cursor()
322        return c.callproc("now")
323
324    def test_closeCursor(self):
325        """
326        Closing the cursor works.
327        """
328        c = self.conn.cursor()
329        d = c.execute("select 1")
330        return d.addCallback(lambda c: c.close())
331
332    def test_queryResults(self):
333        """
334        Query results are obtainable from the asynchronous cursor.
335        """
336        c = self.conn.cursor()
337        d = defer.Deferred()
338        d.addCallback(
339            lambda c: c.execute("insert into simple values (%s)", (1, )))
340        d.addCallback(
341            lambda c: c.execute("insert into simple values (%s)", (2, )))
342        d.addCallback(
343            lambda c: c.execute("insert into simple values (%s)", (3, )))
344        d.addCallback(
345            lambda c: c.execute("select * from simple"))
346        d.addCallback(
347            lambda c: self.assertEquals(c.fetchall(), [(1, ), (2, ), (3, )]))
348        d.callback(c)
349        return d
350
351    def test_multipleQueries(self):
352        """
353        Multiple calls to execute() without waiting for the previous one to
354        finish work and return correct results.
355        """
356        cursors = [self.conn.cursor() for _ in range(5)]
357        d = defer.gatherResults([c.execute("select %s", (i, ))
358                                 for i, c in enumerate(cursors)])
359        d.addCallback(
360            lambda cursors: self.assertEquals(
361                sorted(map(lambda c: c.fetchone()[0], cursors)),
362                [0, 1, 2, 3, 4]))
363        return d
364
365    def test_errors(self):
366        """
367        Errors from the database are reported as failures.
368        """
369        c = self.conn.cursor()
370        d = c.execute("select * from nonexistent")
371        return self.assertFailure(d, psycopg2.ProgrammingError)
372
373    def test_wrongCall(self):
374        """
375        Errors raised inside psycopg2 are reported as failures.
376        """
377        c = self.conn.cursor()
378        d = c.execute("select %s", "whoops")
379        return self.assertFailure(d, TypeError)
380
381    def test_manualTransactions(self):
382        """
383        Transactions can be constructed manually by issuing BEGIN and ROLLBACK
384        as appropriate, and should work.
385        """
386        c = self.conn.cursor()
387        d = defer.Deferred()
388        d.addCallback(
389            lambda c: c.execute("begin"))
390        d.addCallback(
391            lambda c: c.execute("insert into simple values (%s)", (1, )))
392        d.addCallback(
393            lambda c: c.execute("insert into simple values (%s)", (2, )))
394        d.addCallback(
395            lambda c: c.execute("select * from simple order by x"))
396        d.addCallback(
397            lambda c: self.assertEquals(c.fetchall(), [(1, ), (2, )]))
398        d.addCallback(
399            lambda _: c.execute("rollback"))
400        d.addCallback(
401            lambda c: c.execute("select * from simple"))
402        d.addCallback(
403            lambda c: self.assertEquals(c.fetchall(), []))
404        d.callback(c)
405        return d
406
407
408class NotRollingBackCursor(pgadbapi.Cursor):
409    """
410    A cursor that does not like rolling back.
411    """
412    def _doit(self, name, *args, **kwargs):
413        if name == "execute" and args == ("rollback", None):
414            raise RuntimeError("boom")
415        return pgadbapi.Cursor._doit(self, name, *args, **kwargs)
416
417
418class PGADBAPIQueryTestCase(_SimpleDBSetupMixin, Psycopg2TestCase):
419
420    def test_runQuery(self):
421        """
422        runQuery() works and returns the result.
423        """
424        d = self.conn.runQuery("select 1")
425        return d.addCallback(self.assertEquals, [(1, )])
426
427    def test_runOperation(self):
428        """
429        runOperation() works and executes the operation while returning None.
430        """
431        d = self.conn.runQuery("select count(*) from simple")
432        d.addCallback(self.assertEquals, [(0, )])
433
434        d.addCallback(lambda _: self.conn.runOperation(
435                "insert into simple values (%s)", (1, )))
436        d.addCallback(self.assertIdentical, None)
437
438        d.addCallback(lambda _: self.conn.runQuery(
439                    "select count(*) from simple"))
440        return d.addCallback(self.assertEquals, [(1, )])
441
442    def test_runSimpleInteraction(self):
443        """
444        Interactions are being run in a transaction, the parameters from
445        runInteraction are being passed to them and they are being committed
446        after they return. Their return value becomes the return value of the
447        Deferred from runInteraction.
448        """
449        def interaction(c, arg1, kwarg1):
450            self.assertEquals(arg1, "foo")
451            self.assertEquals(kwarg1, "bar")
452            d = c.execute("insert into simple values (1)")
453            d.addCallback(lambda c: c.execute("insert into simple values (2)"))
454            return d.addCallback(lambda _: "interaction done")
455
456        d = self.conn.runInteraction(interaction, "foo", kwarg1="bar")
457
458        d.addCallback(self.assertEquals, "interaction done")
459
460        d.addCallback(lambda _: self.conn.runQuery(
461                "select * from simple order by x"))
462        return d.addCallback(self.assertEquals, [(1, ), (2, )])
463
464    def test_runErrorInteraction(self):
465        """
466        Interactions that produce errors are rolled back and the correct error
467        is reported.
468        """
469        def interaction(c):
470            d = c.execute("insert into simple values (1)")
471            return d.addCallback(
472                lambda c: c.execute("select * from nope_not_here"))
473
474        d = self.conn.runInteraction(interaction)
475        d = self.assertFailure(d, psycopg2.ProgrammingError)
476
477        d.addCallback(lambda _: self.conn.runQuery(
478                "select count(*) from simple"))
479        return d.addCallback(self.assertEquals, [(0, )])
480
481    def test_errorOnRollback(self):
482        """
483        Interactions that produce errors and are unable to roll back return a
484        L{pgadbapi.RollbackFailed} failure that has references to the faulty
485        connection and the original failure that cause all that trouble.
486        """
487        def interaction(c):
488            d = c.execute("insert into simple values (1)")
489            return d.addCallback(
490                lambda c: c.execute("select * from nope_not_here"))
491
492        mp = self.patch(self.conn, 'cursorFactory', NotRollingBackCursor)
493
494        d = self.conn.runInteraction(interaction)
495        d.addCallback(lambda _: self.fail("No exception"))
496
497        def check_error(f):
498            f.trap(pgadbapi.RollbackFailed)
499            original = f.value.originalFailure
500            # original should reference the error that started all the mess
501            self.assertIsInstance(original.value,
502                                  psycopg2.ProgrammingError)
503            self.assertEquals(
504                str(f.value),
505                "<RollbackFailed, original error: %s>" % original)
506            # the error from the failed rollback should get logged
507            errors = self.flushLoggedErrors()
508            self.assertEquals(len(errors), 1)
509            self.assertEquals(errors[0].value.args[0], "boom")
510            # restore or we won't be able to clean up the mess
511            mp.restore()
512        d.addErrback(check_error)
513
514        # rollback for real, or tearDown won't be able to drop the table
515        return d.addCallback(lambda _: self.conn.runOperation("rollback"))
516
517
518class PGADBAPIConnectionPoolTestCase(Psycopg2TestCase):
519
520    def setUp(self):
521        self.pool = pgadbapi.ConnectionPool(
522            None, user=DB_USER, password=DB_PASS,
523            host=DB_HOST, database=DB_NAME)
524        return self.pool.start()
525
526    def tearDown(self):
527        return self.pool.close()
528
529    def test_basics(self):
530        """
531        Exactly 'min' connections are always created.
532        """
533        self.assertEquals(len(self.pool.connections), self.pool.min)
534
535    def test_simpleQuery(self):
536        """
537        The pool can run 'min' queries in parallel without making any of them
538        wait. The queries return correct values.
539        """
540        ds = [self.pool.runQuery("select 1") for _ in range(self.pool.min)]
541        self.assertEquals(len(self.pool._semaphore.waiting), 0)
542
543        d = defer.gatherResults(ds)
544        return d.addCallback(self.assertEquals, [[(1, )]] * self.pool.min)
545
546    def test_moreQueries(self):
547        """
548        The pool can handle more parallel queries than its size.
549        """
550        d = defer.gatherResults(
551            [self.pool.runQuery("select 1") for _ in range(self.pool.min * 5)])
552        return d.addCallback(self.assertEquals, [[(1, )]] * self.pool.min * 5)
553
554    def test_operation(self):
555        """
556        The pool's runOperation works.
557        """
558        d = self.pool.runOperation("create table x (i int)")
559        # give is a workout, 20 x the number of connections
560        d.addCallback(lambda _: defer.gatherResults(
561                [self.pool.runOperation("insert into x values (%s)", (i, ))
562                 for i in range(self.pool.min * 20)]))
563        d.addCallback(lambda _: self.pool.runQuery(
564                "select * from x order by i"))
565        d.addCallback(self.assertEquals, [(i, ) for i in
566                                         range(self.pool.min * 20)])
567        return d.addCallback(lambda _: self.pool.runOperation(
568                "drop table x"))
569
570    def test_interaction(self):
571        """
572        The pool's runInteraction works.
573        """
574        def interaction(c):
575            # cursors can only be declared in a transaction, so that's a good
576            # indication that we're in one
577            d = c.execute("declare x cursor for values (1), (2)")
578            d.addCallback(lambda c: c.execute("fetch 1 from x"))
579            d.addCallback(lambda c: self.assertEquals(c.fetchone()[0], 1))
580            d.addCallback(lambda _: c.execute("fetch 1 from x"))
581            d.addCallback(lambda c: self.assertEquals(c.fetchone()[0], 2))
582            return d
583
584        return defer.gatherResults([self.pool.runInteraction(interaction)
585                                    for _ in range(self.pool.min * 20)])
586
587
588class PGADBAPIConnectionPoolHotswappingTestCase(Psycopg2TestCase):
589
590    def test_errorsInInteractionHotswappingConnections(self):
591        """
592        After getting a RollbackFailed failure it is possible to remove the
593        offending connection from the pool, open a new one and put it in the
594        pool to replace the removed one.
595        """
596        pool = pgadbapi.ConnectionPool(
597            None, user=DB_USER, password=DB_PASS,
598            host=DB_HOST, database=DB_NAME, min=1)
599        self.assertEquals(pool.min, 1)
600        d = pool.start()
601
602        # poison the connection
603        c, = pool.connections
604        c.cursorFactory = NotRollingBackCursor
605
606        # run stuff that breaks
607        def brokenInteraction(c):
608            return c.execute("boom")
609        d.addCallback(lambda _: pool.runInteraction(brokenInteraction))
610        d.addCallback(lambda _: self.fail("No exception"))
611        def checkErrorAndHotswap(f):
612            f.trap(pgadbapi.RollbackFailed)
613            e = f.value
614            self.assertIdentical(e.connection.cursorFactory,
615                                 NotRollingBackCursor)
616            errors = self.flushLoggedErrors()
617            self.assertEquals(len(errors), 1)
618            self.assertEquals(errors[0].value.args[0], "boom")
619            pool.remove(e.connection)
620            e.connection.close()
621            c = pgadbapi.Connection()
622            self.assertNotIdentical(c.cursorFactory,
623                                    NotRollingBackCursor)
624            d = c.connect(user=DB_USER, password=DB_PASS,
625                          host=DB_HOST, database=DB_NAME)
626            return d.addCallback(lambda c: pool.add(c))
627        d.addErrback(checkErrorAndHotswap)
628
629        d.addCallback(lambda _: defer.gatherResults([
630                    pool.runQuery("select 1") for _ in range(3)]))
631        return d.addCallback(self.assertEquals, [[(1, )]] * 3)
632
633    def test_removeWhileBusy(self):
634        """
635        Removing a connection from the pool while it's running a query raises
636        an exception.
637        """
638        pool = pgadbapi.ConnectionPool(None, database="psycopg2_test", min=1)
639        d = pool.start()
640
641        def simple(c):
642            self.assertRaises(ValueError, pool.remove, c._connection)
643        return d.addCallback(lambda pool: pool.runInteraction(simple))