[Twisted-Python] Factory question
Gabriel Rossetti
mailing_lists at evotex.ch
Thu Feb 28 11:14:29 EST 2008
Drew Smathers wrote:
> On Thu, Feb 28, 2008 at 9:42 AM, Gabriel Rossetti
> <mailing_lists at evotex.ch> wrote:
>
>> Drew Smathers wrote:
>> > On Wed, Feb 27, 2008 at 3:32 AM, Gabriel Rossetti
>> > <mailing_lists at evotex.ch> wrote:
>> >
>> >> Hello everyone,
>> >>
>> >> I have a small question, I have a service which needs to sometimes send
>> >> data (without having received any prior to sending) and sometimes
>> >> receive data, which is better :
>> >>
>> >> 1) create a factory that inherits from ServerFactory and ClientFactory,
>> >> thus it can listen and send data
>> >>
>> >> 2) create a factory that inherits from ServerFactory only and uses a
>> >> single-use client (ClientCreator, as shown in the writing clients howto)
>> >> when it needs to send data
>> >>
>> >>
>> >
>> > I'm not sure of there's a single right way to do it, but I wouldn't
>> > bother inheriting from both ClientFactory and ServerFactory. I think
>> > you're on the write track with 2, though.
>> >
>> >
>> I had taken route 1 up until now (I'm thinking about switching...)
>>
>> Maybe there's something I haven't quite gotten, when ClientA initially
>> connects to the server, the factory creates an instance of the protocol,
>> correct?
>>
>
> Yes.
>
>
Ok
>> Now ClientA sends some data to the server, which processes it
>> and sends something back. After that, the TCP session ends, and the
>> client disconnects, and the protocols instance dies. Is this correct or
>> does it live on and get reused somehow?
>>
>
> The protocol instance does not get reused.
>
>
Ok, so every time there is data exchanged (new tcp/ip session) then a
new protocol instance is created. Any persistence/state data must
therefore be stored in the factory if I understand correctly.
>> I ask this because since
>> initially the clients send data to the server (registration), the server
>> will there after send data to the clients. This makes the client have to
>> connect to the server initially though a port using reactor.connectTCP()
>> and listen to a port (that the server now knows since the client
>> registered itself) using reactor.listenTCP().
>>
>
> This is might be a bad idea - depending on the locality of your
> servers and clients. Why not just use the established connection?
The idea is to have services register on a central server then they
transmit messages to the daemon which routes/relays them to the correct
service, sort of like a micro-kernel. So, sometimes the services
initiate the communication process and sometimes they don't, the central
server does. This makes the services be servers and clients. Imagine this :
service1 has some data that needs to be processed by service2 (which
will in turn send it to another service), is sends it to the central
server, which sends it to service2. Service2 does whatever it has to do,
and then sends it to the central server to route to service_n.
So the established connection is usually useless, except for sending
some sort of ACK maybe.
> If
> the *client* is listening on a port then it isn't just a client - it's
> a server, or a peer in a clustered system.
>
>
I guess it's/they a sort of peer(s) in a clustered system, it/they may
reside on the same machine as the central server or not.
>> I think I have to use
>> reactor.connectTCP() instead of ClientCreator since the connection has
>> to happen at the beginning and a transport needs to exist before I can
>> send anything. Well...now that I think about it, I could have the
>> factory register the client...... that would make me not have to inherit
>> from the Client factory.... (I've now switched to solution 2, see last
>> part of this email).
>>
>>
>
> Ok.
>
>
>>> In most use cases you shouldn't have to create custom factories.
>>>
>> I have to since I need a non-reconnecting client xmlstream, and the
>> factory used with xmlstreams is a reconnecting client.
>>
>>
>>> Just
>>>
>> > write the protocol to support bidirectional comm and to create the
>> > server:
>> >
>> > f = Factory()
>> > f.protocol = YourBidirectionalProtocol
>> >
>> > Regarding the client, how you implement it depends on whether or not
>> > the server is establishing the connection vs. reusing the existing
>> > connection.
>> What exactly do you mean by reusing an existing connection?
>>
>>
>>> If you're establishing the connection (like in a cluster
>>>
>> > app with known peers), just use ClientCreator. If you're reusing the
>> > existing connection, then you might not have to anything, unless you
>> > have some state to set up which could be done by overriding
>> > connectionMade on your Protocol.
>> >
>> > Finally, take everything I've stated above with a grain of salt.
>> >
>> >
>> Thanks, I find it fairly hard to get used to Twisted, I wanted to buy
>> the book, but it was written in 2005 and I'm not sure if it's still
>> valid with today's version.
>>
>>
>
> The book it is not up to date.
>
>
>> BTW, any idea why I'm getting this type of behavior (one server, 3
>> distinct connections from clients) :
>>
>>
>
> Without seeing your code, no.
>
>
ok, it's a bit long.....
-------------------------"Central server" aka
Daemon---------------------------------------------------------------------------
class MdfXmlStreamFactory(XmlStreamFactoryMixin):
"""
The factory class used by the daemon and services to create
protocol instances
"""
def __init__(self, proto, *args, **kwargs):
"""
Constructor
@param proto: the protocol to use
@type proto: a subclass of
L{XmlStream<twisted.words.xish.xmlstream.XmlStream>}
@param args: misc args
@type args: C{tuple}
@param kwargs: misc keyword args
@type kwargs: C{dict}
"""
XmlStreamFactoryMixin.__init__(self)
self.args = args
self.kwargs = kwargs
self.protocol = proto
def buildProtocol(self, addr):
"""
Builds the protocol and
@param addr: The address (protocol, IP, port) of the connection
@type addr:
L{IPv4Address<twisted.internet.address._ServerFactoryIPv4Address>}
@return: an instance of the built protocol
"""
#self.resetDelay()
xs = self.protocol(*self.args, **self.kwargs)
xs.factory = self
self.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT,
xs.connected) # stream connect event or xml start event???
for event, fn in self.bootstraps:
xs.addObserver(event, fn)
return xs
class MdfXmlStreamServerFactory(MdfXmlStreamFactory, ServerFactory):
"""
The factory class used by the daemon to create
protocol instances
"""
# The registered services
_services = {}
def __init__(self, proto, *args, **kwargs):
"""
Constructor
@param proto: the protocol to use
@type proto: a subclass of
L{XmlStream<twisted.words.xish.xmlstream.XmlStream>}
@param args: misc args
@type args: C{tuple}
@param kwargs: misc keyword args
@type kwargs: C{dict}
"""
MdfXmlStreamFactory.__init__(self, proto, *args, **kwargs)
class Daemon(xmlstream.XmlStream):
"""
The daemon is the implementation of a microkernel type inter-service
communication (ISC) routing daemon. Here is how it works :
- Services announce their presence to the daemon by giving their
name,
version, ip, port and a list of message-types that they accept
- The daemon listens for messages from the attached services,
when one
is received, it routes the message to the correct service
@todo: add unique id generation/verification
"""
# Holds the real method
__dataReceived = xmlstream.XmlStream.dataReceived
# The registered services
#__services = {}
cnt = 1
def __init__(self, *args, **kwargs):
"""
Constructor
@param args: non-keyword args
@type args: C{tuple}
@param kwargs: keyword args
@type kwargs: C{dict}
"""
xmlstream.XmlStream.__init__(self)
self.__routeTo = None
self.__lastMsgType = None
self.__lastMsgId = None
self.inst = Daemon.cnt
Daemon.cnt += 1
print "daemon proto instance %d" % self.inst
def connectionMade(self):
xmlstream.XmlStream.connectionMade(self)
def dataReceived(self, data):
"""
Called everytime data is received
@param data: the data received
@type data: C{object} (anything)
"""
self.__dataReceived(data)
def connectionLost(self, reason):
"""
Called when the connection is shut down, restores the
dataReceived method
@param reason: the reason why the connection was lost
@type reason: C{str}
"""
self.__dataReceived = xmlstream.XmlStream.dataReceived
self.__routeTo = None
xmlstream.XmlStream.connectionLost(self, reason)
def __onHeader(self, element):
"""
Analyse a header and set the data's recipiant
@param element: the header element (XML)
@type element: L{Element<twisted.words.xish.domish.Element>}
"""
print "got header from %s:%s : %s" %
(str(self.transport.getPeer().host), str(self.transport.getPeer().port),
element.toXml())
self.__lastMsgId = element.getAttribute("id")
self.__lastMsgType = element.getAttribute("type")
if(self.__lastMsgType != constants._REG_MSG_TYPE):
self.__routeTo = self.factory._services[type]
self.__dataReceived = __routeDataReceived
def __onReg(self, element):
"""
Register a service
@param element: the registeration element (XML)
@type element: L{Element<twisted.words.xish.domish.Element>}
"""
print "got reg from %s:%s : %s" %
(str(self.transport.getPeer().host), str(self.transport.getPeer().port),
element.toXml())
name =
str(xpath.XPathQuery("/body/reg/name").queryForNodes(element)[0])
version =
str(xpath.XPathQuery("/body/reg/version").queryForNodes(element)[0])
#address =
str(xpath.XPathQuery("/body/reg/address").queryForNodes(element)[0])
port =
int(str(xpath.XPathQuery("/body/reg/port").queryForNodes(element)[0]))
msgs = [ str(m) for m in
xpath.XPathQuery("/body/reg/message_type").queryForNodes(element) ]
address = self.transport.getPeer().host
#port = self.transport.getPeer().port
serv = ServiceReg(name, version, msgs, address, port)
self.__registerService(serv)
def connected(self, xs):
"""
Called when a client connects using an XML stream
@param xs: the current xml stream
@type xs: L{XmlStream<twisted.words.xish.xmlstream.XmlStream>}
"""
print 'Connection from %s:%s!' %
(str(self.transport.getPeer().host), str(self.transport.getPeer().port))
xs.addObserver("/header", self.__onHeader)
xs.addObserver("/body/reg", self.__onReg)
def __routeDataReceived(self, data):
"""
Pushes the messages to the correct service
@param data: the data received
@type data: C{object} (anything)
"""
print "route '%s' to : %s" % (str(data), self.__routeTo)
utils.sendMessage(self.__routeTo.ip, self.__routeTo.port, data)
#self.send(data)
def __registerService(self, service):
"""
Register a service
@param service: the service to register
@type service: L{ServiceReg}
@raise ServiceMessageConflictError: if another service already
has a
message registered that the
current
service is trying to register
@todo: what is to be done with the exception once raised????
Finish status message...
"""
def foundConflict(self, msgTypes):
"""
Check if there is a conflict with message types to be registered
by this service
@param service: the service messages to check for conflicts
@type service: C{str}
@return: the conflicting service type or None if no
conflict is found
"""
for mt in msgTypes:
if(self.factory._services.has_key(mt)):
return mt
return None
print "Registering service : ", str(service)
try:
#
# Check if another service already registered a message type
that
# the current service is trying to register
#
conflict = foundConflict(self, service.acceptedMsgs)
if(conflict != None):
raise ServiceMessageConflictError(conflict)
#
# Regrister the message types and this service
#
for msgType in service.acceptedMsgs:
self.factory._services[msgType] = service
except ServiceMessageConflictError, reason:
status =
utils.createConfirmationMsgBody(constants._MSG_FAILURE_TYPE,
self.__lastMsgId,
str(reason))
else:
status =
utils.createConfirmationMsgBody(constants._MSG_SUCCESS_TYPE,
self.__lastMsgId)
#
# Send registeration confirmation (succeeded or failed)
#
msgRoot = utils.createMessage(constants._REG_MSG_CONFIRM_TYPE,
constants._CONF_MSG_ID,
constants._DAEMON_SERVICE_NAME,
constants._MSG_SPEC_VERSION,
constants._STATUS_DATA_TYPE, status)
print "Sending confirmation message to %s : %s" %
(self.transport.getPeer().host, msgRoot.toXml())
#self.send(msgRoot)
utils.sendMessage(self.transport.getPeer().host, service.port,
msgRoot)
if(__name__ == "__main__"):
reactor.listenTCP(4321, MdfXmlStreamServerFactory(Daemon))
print "Listening for connections..."
reactor.run()
----------------------------------------"Service"-----------------------------------------------------------------------------------------------------
class MdfXmlStreamClientFactory(MdfXmlStreamServerFactory):
"""
The factory class used by the services to create
protocol instances
@attention: this class might dissapear, I have to see if it's useful
to keep it or not
"""
__daemonAddrs = None
__daemonPort = None
def __init__(self, proto, *args, **kwargs):
"""
Constructor
@param proto: the protocol to use
@type proto: a subclass of
L{XmlStream<twisted.words.xish.xmlstream.XmlStream>}
@param args: misc args
@type args: C{tuple}
@param kwargs: misc keyword args
@type kwargs: C{dict}
"""
MdfXmlStreamServerFactory.__init__(self, proto, *args, **kwargs)
self._serviceInfo = ServiceReg(kwargs["name"],
kwargs["version"], list(kwargs.get("messageTypes", [])))
MdfXmlStreamClientFactory.__daemonAddrs = kwargs["address"]
MdfXmlStreamClientFactory.__daemonPort = kwargs["port"]
def register(self, port):
"""
Register the service
@param port: the service's port
@type port: C{int}
"""
self._serviceInfo.port = port
msgBodyData = utils.createRegMsgBody(self._serviceInfo.name,
self._serviceInfo.version,
str(self._serviceInfo.port),
self._serviceInfo.acceptedMsgs)
msgRoot = utils.createMessage(constants._REG_MSG_TYPE,
constants._REG_MSG_ID,
self._serviceInfo.name,
constants._MSG_SPEC_VERSION,
constants._REG_DATA_TYPE, msgBodyData)
utils.sendMessage(MdfXmlStreamClientFactory.__daemonAddrs,
MdfXmlStreamClientFactory.__daemonPort,
msgRoot)
class BaseService(xmlstream.XmlStream):
"""
The service is the implementation of a microkernel type inter-service
communication (ISC) endpoint. Here is how it works :
- Services announce their presence to the daemon by giving their
name,
version, ip, port and a list of message-types that they accept
- The daemon listens for messages from the attached services,
when one
is received, it routes the message to the correct service
@todo: add unique id generation/verification
"""
def __init__(self, *args, **kwargs):
"""
Constructor
@param args: non-keyword args
@type args: C{tuple}
@param kwargs: keyword args
@type kwargs: C{dict}
"""
xmlstream.XmlStream.__init__(self)
self._msgSrc = None
self._msgDest = None
self._msgBodyData = None
self._registered = False
def _onHeader(self, element):
"""
Analyse a header and save the source and destination
@param element: the header element (XML)
@type element: L{Element<twisted.words.xish.domish.Element>}
@todo: add msg spec version verification
@todo: add id verification???
"""
print "got header from %s:%s : %s" %
(str(self.transport.getPeer().host), str(self.transport.getPeer().port),
element.toXml())
self._msgSrc =
xpath.XPathQuery("/header").queryForNodes(element)[0].getAttribute("source")
self._msgDest =
xpath.XPathQuery("/header").queryForNodes(element)[0].getAttribute("destination")
def _onBody(self, element):
"""
Get the body act accordingly
@param element: the body element (XML)
@type element: L{Element<twisted.words.xish.domish.Element>}
@todo: add data type verification
@todo: call data action callbacks
"""
self._msgBodyData =
xpath.XPathQuery("/body").queryForNodes(element)[0].toXml()
print "_onbody : ", self._msgBodyData
def _onConfirm(self, element):
"""
Get the confirmation message and act accordingly
@param element: the confirmation element (XML)
@type element: L{Element<twisted.words.xish.domish.Element>}
@todo: add data type verification
@todo: call data action callbacks
"""
status =
str(xpath.XPathQuery("/body/conf/status").queryForNodes(element)[0])
id =
int(str(xpath.XPathQuery("/body/conf/id").queryForNodes(element)[0]))
msg =
str(xpath.XPathQuery("/body/conf/msg").queryForNodes(element)[0])
if(id == constants._REG_MSG_ID):
if(status == constants._MSG_SUCCESS_TYPE):
self._registered = True
def connected(self, xs):
"""
Called when a client connects using an XML stream
@param xs: the current xml stream
@type xs: L{XmlStream<twisted.words.xish.xmlstream.XmlStream>}
@todo: add data action callbacks
"""
print 'Connection from %s:%s!' %
(str(self.transport.getPeer().host), str(self.transport.getPeer().port))
xs.addObserver("/header", self._onHeader)
xs.addObserver("/body/conf", self._onConfirm)
xs.addObserver("/body", self._onBody)
def start(daemonAddrs, daemonPort, serviceRef, serviceName,
serviceVersion, serviceMessageTypes):
"""
Start the daemon
@param daemonPort: the port the daemon listens on
@type daemonPort: C{int}
@param serviceRef: a reference to the service's class
@type serviceRef: a subclass of L{BaseService<service.BaseService>}
@param serviceName: the service's name
@type serviceName: C{str}
@param serviceVersion: the service's version
@type serviceVersion: C{str}
@param serviceMessageTypes: the list of messages the service registers
@type serviceMessageTypes: C{str list}
"""
f = MdfXmlStreamClientFactory(serviceRef, address=daemonAddrs,
port=daemonPort, name=serviceName, version=serviceVersion,
messageTypes=serviceMessageTypes)
port = reactor.listenTCP(0, f).getHost().port
f.register(port)
print port
reactor.run()
if(__name__ == "__main__"):
start("localhost", 4321, BaseService, "service_base", "1.0", ["all"])
>> Daemon listening for connections...
>>
>> daemon proto instance 1
>> Connection from 127.0.0.1:57821! <-- ok, Client1
>> ....
>> daemon proto instance 2
>> Connection from 127.0.0.1:57821! <-- Client1 again????? why?
>> Connection from 127.0.0.1:57823! <-- ok, Client2
>> ....
>> daemon proto instance 3
>> Connection from 127.0.0.1:57821! <-- ok, Client1 again????? why?
>> Connection from 127.0.0.1:57823! <-- ok, Client2 again????? why?
>> Connection from 127.0.0.1:57824! <-- ok, Client3
>>
>> Oh, and by the time I finished writing this email, I've switched to
>> solution 2, but I still get the behavior above.
>>
>>
>>
>> _______________________________________________
>> Twisted-Python mailing list
>> Twisted-Python at twistedmatrix.com
>> http://twistedmatrix.com/cgi-bin/mailman/listinfo/twisted-python
>>
>>
>
>
>
>
Thank you,
Gabriel
More information about the Twisted-Python
mailing list