[Twisted-Python] Help with trial test failure

Benjamin BERTRAND beenje at gmail.com
Sun Aug 4 08:25:55 MDT 2013


Hi,

I'm trying to write a simple gateway to receive messages using a specific protocol and publish/store them using txredis.
I wrote a small example that seems to work.
But the small test I wrote fails:

$ trial gateway/test
gateway.test.test_example
  GatewayServiceTestCase
    test_messageReceived ...                                            [ERROR]

===============================================================================
[ERROR]
Traceback (most recent call last):
Failure: twisted.internet.error.ConnectionDone: Connection was closed cleanly.

gateway.test.test_example.GatewayServiceTestCase.test_messageReceived
-------------------------------------------------------------------------------
Ran 1 tests in 0.007s


As I understand, the connection to the redis server is lost during the test.
I actually managed to get the test to pass by adding some inlineCallbacks decorator to my messageReceived and lineReceived methods.
But I don't really understand why that would be needed.
Could someone explain what is happening?

Both version of the code can be found here: https://gist.github.com/beenje/6150400
(revision 1 with the problem and revision 2 with the inlineCallbacks)

Below is the original version with the problem:

Thanks

Benjamin

example.py
----------------------------------------------------------------------------------
import json
import time
from twisted.internet import defer
from twisted.internet.protocol import ServerFactory
from twisted.protocols.basic import LineReceiver
from twisted.python import log


class BasicProtocol(LineReceiver):

    def lineReceived(self, line):
        self.messageReceived(line)

    def messageReceived(self, message):
        try:
            self.factory.messageReceived(message)
        except AttributeError:
            pass


class BasicGatewayFactory(ServerFactory):

    protocol = BasicProtocol

    def __init__(self, service, channel):
        self.service = service
        self.channel = channel

    def messageReceived(self, message):
        self.service.publish(self.channel, message)


class RedisPublishService(object):

    def __init__(self, factory):
        """
        @param factory: redis client factory
        """
        self.factory = factory

    @defer.inlineCallbacks
    def publish(self, channel, message):
        log.msg("Publish message {} on {}".format(message, channel))
        yield self.factory.client.publish(channel, message)
        timestamp = int(time.time() * 1000)
        # Include the timestamp in the value to allow
        # duplicate message
        value = json.dumps({"timestamp": timestamp, "message": message})
        log.msg("Store message in {} sorted set with score {}".format(
                channel, timestamp))
        # Set the timestamp as score to easily fetch the values within a
        # time period using zrangebyscore
        yield self.factory.client.zadd(channel, timestamp, value)


if __name__ == '__main__':
    import sys
    from twisted.internet import reactor
    from txredis.client import RedisClientFactory
    log.startLogging(sys.stdout)
    redis_factory = RedisClientFactory()
    reactor.connectTCP('localhost', 6379, redis_factory)
    redis_pub_service = RedisPublishService(redis_factory)
    gw_factory = BasicGatewayFactory(redis_pub_service, "test")
    reactor.listenTCP(8000, gw_factory)
    reactor.run()
----------------------------------------------------------------------------------

test_example.py
----------------------------------------------------------------------------------
from twisted.internet import reactor, defer, protocol
from twisted.python import log
from twisted.test import proto_helpers
from twisted.trial.unittest import TestCase
from txredis.client import RedisSubscriber, RedisClientFactory
from txredis.testing import REDIS_HOST, REDIS_PORT
from gateway.example import BasicGatewayFactory, RedisPublishService


class GatewayServiceTestCase(TestCase):

    @defer.inlineCallbacks
    def setUp(self):
        self.redis_factory = RedisClientFactory()
        reactor.connectTCP(REDIS_HOST, REDIS_PORT, self.redis_factory)
        yield self.redis_factory.deferred
        self.redis_pub_service = RedisPublishService(self.redis_factory)
        self.factory = BasicGatewayFactory(self.redis_pub_service, "test")
        self.server = self.factory.buildProtocol(None)
        self.transport = proto_helpers.StringTransportWithDisconnection()
        self.transport.protocol = self.server
        self.server.makeConnection(self.transport)

        class MySubscriber(RedisSubscriber):
            def __init__(self, *args, **kwargs):
                RedisSubscriber.__init__(self, *args, **kwargs)
                self.msg_channel = None
                self.msg_message = None
                self.msg_received = defer.Deferred()

            def messageReceived(self, channel, message):
                log.msg("Message received!")
                self.msg_channel = channel
                self.msg_message = message
                self.msg_received.callback(None)
                self.msg_received = defer.Deferred()

        clientCreator = protocol.ClientCreator(reactor, MySubscriber)
        self.subscriber = yield clientCreator.connectTCP(REDIS_HOST,
                                                         REDIS_PORT)
        yield self.subscriber.subscribe("test")

    def tearDown(self):
        self.subscriber.transport.loseConnection()
        self.redis_factory.continueTrying = 0
        self.redis_factory.stopTrying()
        if self.redis_factory.client:
            self.redis_factory.client.setTimeout(None)
            self.redis_factory.client.transport.loseConnection()
        self.transport.loseConnection()

    @defer.inlineCallbacks
    def test_messageReceived(self):
        cb = self.subscriber.msg_received
        self.server.dataReceived('HELLO1\r\n')
        yield cb
        self.assertEqual(self.subscriber.msg_channel, "test")
        self.assertEqual(self.subscriber.msg_message, "HELLO1")




More information about the Twisted-Python mailing list