[Twisted-Python] Help with trial test failure
Benjamin BERTRAND
beenje at gmail.com
Sun Aug 4 08:25:55 MDT 2013
Hi,
I'm trying to write a simple gateway to receive messages using a specific protocol and publish/store them using txredis.
I wrote a small example that seems to work.
But the small test I wrote fails:
$ trial gateway/test
gateway.test.test_example
GatewayServiceTestCase
test_messageReceived ... [ERROR]
===============================================================================
[ERROR]
Traceback (most recent call last):
Failure: twisted.internet.error.ConnectionDone: Connection was closed cleanly.
gateway.test.test_example.GatewayServiceTestCase.test_messageReceived
-------------------------------------------------------------------------------
Ran 1 tests in 0.007s
As I understand, the connection to the redis server is lost during the test.
I actually managed to get the test to pass by adding some inlineCallbacks decorator to my messageReceived and lineReceived methods.
But I don't really understand why that would be needed.
Could someone explain what is happening?
Both version of the code can be found here: https://gist.github.com/beenje/6150400
(revision 1 with the problem and revision 2 with the inlineCallbacks)
Below is the original version with the problem:
Thanks
Benjamin
example.py
----------------------------------------------------------------------------------
import json
import time
from twisted.internet import defer
from twisted.internet.protocol import ServerFactory
from twisted.protocols.basic import LineReceiver
from twisted.python import log
class BasicProtocol(LineReceiver):
def lineReceived(self, line):
self.messageReceived(line)
def messageReceived(self, message):
try:
self.factory.messageReceived(message)
except AttributeError:
pass
class BasicGatewayFactory(ServerFactory):
protocol = BasicProtocol
def __init__(self, service, channel):
self.service = service
self.channel = channel
def messageReceived(self, message):
self.service.publish(self.channel, message)
class RedisPublishService(object):
def __init__(self, factory):
"""
@param factory: redis client factory
"""
self.factory = factory
@defer.inlineCallbacks
def publish(self, channel, message):
log.msg("Publish message {} on {}".format(message, channel))
yield self.factory.client.publish(channel, message)
timestamp = int(time.time() * 1000)
# Include the timestamp in the value to allow
# duplicate message
value = json.dumps({"timestamp": timestamp, "message": message})
log.msg("Store message in {} sorted set with score {}".format(
channel, timestamp))
# Set the timestamp as score to easily fetch the values within a
# time period using zrangebyscore
yield self.factory.client.zadd(channel, timestamp, value)
if __name__ == '__main__':
import sys
from twisted.internet import reactor
from txredis.client import RedisClientFactory
log.startLogging(sys.stdout)
redis_factory = RedisClientFactory()
reactor.connectTCP('localhost', 6379, redis_factory)
redis_pub_service = RedisPublishService(redis_factory)
gw_factory = BasicGatewayFactory(redis_pub_service, "test")
reactor.listenTCP(8000, gw_factory)
reactor.run()
----------------------------------------------------------------------------------
test_example.py
----------------------------------------------------------------------------------
from twisted.internet import reactor, defer, protocol
from twisted.python import log
from twisted.test import proto_helpers
from twisted.trial.unittest import TestCase
from txredis.client import RedisSubscriber, RedisClientFactory
from txredis.testing import REDIS_HOST, REDIS_PORT
from gateway.example import BasicGatewayFactory, RedisPublishService
class GatewayServiceTestCase(TestCase):
@defer.inlineCallbacks
def setUp(self):
self.redis_factory = RedisClientFactory()
reactor.connectTCP(REDIS_HOST, REDIS_PORT, self.redis_factory)
yield self.redis_factory.deferred
self.redis_pub_service = RedisPublishService(self.redis_factory)
self.factory = BasicGatewayFactory(self.redis_pub_service, "test")
self.server = self.factory.buildProtocol(None)
self.transport = proto_helpers.StringTransportWithDisconnection()
self.transport.protocol = self.server
self.server.makeConnection(self.transport)
class MySubscriber(RedisSubscriber):
def __init__(self, *args, **kwargs):
RedisSubscriber.__init__(self, *args, **kwargs)
self.msg_channel = None
self.msg_message = None
self.msg_received = defer.Deferred()
def messageReceived(self, channel, message):
log.msg("Message received!")
self.msg_channel = channel
self.msg_message = message
self.msg_received.callback(None)
self.msg_received = defer.Deferred()
clientCreator = protocol.ClientCreator(reactor, MySubscriber)
self.subscriber = yield clientCreator.connectTCP(REDIS_HOST,
REDIS_PORT)
yield self.subscriber.subscribe("test")
def tearDown(self):
self.subscriber.transport.loseConnection()
self.redis_factory.continueTrying = 0
self.redis_factory.stopTrying()
if self.redis_factory.client:
self.redis_factory.client.setTimeout(None)
self.redis_factory.client.transport.loseConnection()
self.transport.loseConnection()
@defer.inlineCallbacks
def test_messageReceived(self):
cb = self.subscriber.msg_received
self.server.dataReceived('HELLO1\r\n')
yield cb
self.assertEqual(self.subscriber.msg_channel, "test")
self.assertEqual(self.subscriber.msg_message, "HELLO1")
More information about the Twisted-Python
mailing list