[Twisted-Python] Factory question

Gabriel Rossetti mailing_lists at evotex.ch
Thu Feb 28 11:14:29 EST 2008


Drew Smathers wrote:
> On Thu, Feb 28, 2008 at 9:42 AM, Gabriel Rossetti
> <mailing_lists at evotex.ch> wrote:
>   
>> Drew Smathers wrote:
>>  > On Wed, Feb 27, 2008 at 3:32 AM, Gabriel Rossetti
>>  > <mailing_lists at evotex.ch> wrote:
>>  >
>>  >> Hello everyone,
>>  >>
>>  >>  I have a small question, I have a service which needs to sometimes send
>>  >>  data (without having received any prior to sending) and sometimes
>>  >>  receive data, which is better :
>>  >>
>>  >>  1) create a factory that inherits from ServerFactory and ClientFactory,
>>  >>  thus it can listen and send data
>>  >>
>>  >>  2) create a factory that inherits from ServerFactory only and uses a
>>  >>  single-use client (ClientCreator, as shown in the writing clients howto)
>>  >>  when it needs to send data
>>  >>
>>  >>
>>  >
>>  > I'm not sure if there's a single right way to do it, but I wouldn't
>>  > bother inheriting from both ClientFactory and ServerFactory.  I think
>>  > you're on the right track with 2, though.
>>  >
>>  >
>>  I had taken route 1 up until now (I'm thinking about switching...)
>>
>>  Maybe there's something I haven't quite gotten, when ClientA initially
>>  connects to the server, the factory creates an instance of the protocol,
>>  correct?
>>     
>
> Yes.
>
>   
Ok
>> Now ClientA sends some data to the server, which processes it
>>  and sends something back. After that, the TCP session ends, and the
>>  client disconnects, and the protocol instance dies. Is this correct or
>>  does it live on and get reused somehow?
>>     
>
> The protocol instance does not get reused.
>
>   
Ok, so every time data is exchanged over a new TCP/IP session, a new 
protocol instance is created. Any persistent state must therefore be 
stored in the factory, if I understand correctly.
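
To make that concrete, here is a minimal sketch of the relationship as I
understand it. It uses plain-Python stand-ins rather than Twisted itself:
StubFactory, StubProtocol, total_bytes and connections are hypothetical
names, and only the buildProtocol()/dataReceived() method names mirror
Twisted.

```python
class StubProtocol:
    """Stand-in for a protocol: one instance per connection."""

    def __init__(self, factory):
        self.factory = factory   # back-reference to the long-lived factory
        self.bytes_seen = 0      # per-connection state, gone after disconnect

    def dataReceived(self, data):
        self.bytes_seen += len(data)
        # State that must outlive this connection goes on the factory.
        self.factory.total_bytes += len(data)


class StubFactory:
    """Stand-in for a factory: lives as long as the listening port."""

    def __init__(self):
        self.total_bytes = 0
        self.connections = 0

    def buildProtocol(self, addr):
        # Called once per incoming connection; always a fresh protocol.
        self.connections += 1
        return StubProtocol(self)


factory = StubFactory()
first = factory.buildProtocol(("127.0.0.1", 57821))
first.dataReceived(b"<reg/>")
second = factory.buildProtocol(("127.0.0.1", 57823))
second.dataReceived(b"<header/>")
```

Each protocol only sees its own bytes, while the factory accumulates the
total across both connections.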
>>  I ask this because the clients initially send data to the server
>>  (registration), and the server will thereafter send data to the
>>  clients. This means the client has to connect to the server initially
>>  through a port using reactor.connectTCP() and listen on a port (which
>>  the server now knows, since the client registered itself) using
>>  reactor.listenTCP().
>>     
>
> This might be a bad idea - depending on the locality of your
> servers and clients. Why not just use the established connection?  
The idea is to have services register with a central server (the daemon) 
and then transmit messages to it, which it routes/relays to the correct 
service, sort of like a micro-kernel. So sometimes the services initiate 
the communication and sometimes the central server does. This makes the 
services both servers and clients. Imagine this :

service1 has some data that needs to be processed by service2 (which 
will in turn send it to another service), so it sends it to the central 
server, which sends it to service2. Service2 does whatever it has to do, 
and then sends it to the central server to route to service_n.

So the established connection is usually useless, except for sending 
some sort of ACK maybe.
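
Stripped of the networking, the routing idea boils down to a table from
message type to the service that registered it. A rough sketch, with all
names (Router, ConflictError, the message types and ports) made up for
illustration and not taken from my actual code:

```python
class ConflictError(Exception):
    """Raised when a message type already has an owner (hypothetical)."""


class Router:
    """Maps each message type to the (host, port) of one service."""

    def __init__(self):
        self.routes = {}

    def register(self, msg_types, host, port):
        # Refuse the whole registration if any type is already taken.
        taken = [mt for mt in msg_types if mt in self.routes]
        if taken:
            raise ConflictError(taken[0])
        for mt in msg_types:
            self.routes[mt] = (host, port)

    def route(self, msg_type):
        # Where the central server should relay a message of this type.
        return self.routes[msg_type]


router = Router()
router.register(["resize"], "127.0.0.1", 5001)
router.register(["thumbnail", "store"], "127.0.0.1", 5002)

try:
    router.register(["store"], "127.0.0.1", 5003)
    conflict = None
except ConflictError as err:
    conflict = str(err)
```

The central server would look up route(msg_type) for every header it
receives and open a connection to that address.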

> If
> the *client* is listening on a port then it isn't just a client - it's
> a server, or a peer in a clustered system.
>
>   
I guess they are a sort of peers in a clustered system; they may 
reside on the same machine as the central server or not.
>>  I think I have to use
>>  reactor.connectTCP() instead of  ClientCreator since the connection has
>>  to happen at the beginning and a transport needs to exist before I can
>>  send anything. Well...now that I think about it, I could have the
>>  factory register the client...... that would make me not have to inherit
>>  from the Client factory.... (I've now switched to solution 2, see last
>>  part of this email).
>>
>>     
>
> Ok.
>
>   
>>  > In most use cases you shouldn't have to create custom factories.
>>  I have to since I need a non-reconnecting client xmlstream, and the
>>  factory used with xmlstreams is a reconnecting client.
>>
>>     
>>  > Just write the protocol to support bidirectional comm and to create
>>  > the server:
>>  >
>>  > f = Factory()
>>  > f.protocol = YourBidirectionalProtocol
>>  >
>>  > Regarding the client, how you implement it depends on whether or not
>>  > the server is establishing the connection vs. reusing the existing
>>  > connection.
>>  What exactly do you mean by reusing an existing connection?
>>
>>     
>>  > If you're establishing the connection (like in a cluster app with
>>  > known peers), just use ClientCreator.  If you're reusing the
>>  > existing connection, then you might not have to do anything, unless
>>  > you have some state to set up, which could be done by overriding
>>  > connectionMade on your Protocol.
>>  >
>>  > Finally, take everything I've stated above with a grain of salt.
>>  >
>>  >
>>  Thanks, I find it fairly hard to get used to Twisted, I wanted to buy
>>  the book, but it was written in 2005 and I'm not sure if it's still
>>  valid with today's version.
>>
>>     
>
> The book is not up to date.
>
>   
>>  BTW, any idea why I'm getting this type of behavior (one server, 3
>>  distinct connections from clients) :
>>
>>     
>
> Without seeing your code, no.
>
>   
ok, it's a bit long.....

-------------------------"Central server" aka Daemon-------------------------

from twisted.internet import reactor
from twisted.internet.protocol import ServerFactory
from twisted.words.xish import xmlstream, xpath
from twisted.words.xish.xmlstream import XmlStreamFactoryMixin
# utils, constants, ServiceReg and ServiceMessageConflictError are
# project-local and not shown here

class MdfXmlStreamFactory(XmlStreamFactoryMixin):
    """
    The factory class used by the daemon and services to create
    protocol instances
    """
   
    def __init__(self, proto, *args, **kwargs):
        """
        Constructor
       
        @param proto: the protocol to use
        @type proto: a subclass of
            L{XmlStream<twisted.words.xish.xmlstream.XmlStream>}
        @param args: misc args
        @type args: C{tuple}
        @param kwargs: misc keyword args
        @type kwargs: C{dict}
        """
        XmlStreamFactoryMixin.__init__(self)
        self.args = args
        self.kwargs = kwargs
        self.protocol = proto

    def buildProtocol(self, addr):
        """
        Builds the protocol and attaches the bootstrap observers to it
       
        @param addr: The address (protocol, IP, port) of the connection
        @type addr:
            L{IPv4Address<twisted.internet.address._ServerFactoryIPv4Address>}
       
        @return: an instance of the built protocol
        """
        #self.resetDelay()
        xs = self.protocol(*self.args, **self.kwargs)
        xs.factory = self
        # stream connect event or xml start event???
        self.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT, xs.connected)
        for event, fn in self.bootstraps:
            xs.addObserver(event, fn)

        return xs

class MdfXmlStreamServerFactory(MdfXmlStreamFactory, ServerFactory):
    """
    The factory class used by the daemon to create
    protocol instances
    """
   
    # The registered services
    _services = {}
   
    def __init__(self, proto, *args, **kwargs):
        """
        Constructor
       
        @param proto: the protocol to use
        @type proto: a subclass of
            L{XmlStream<twisted.words.xish.xmlstream.XmlStream>}
        @param args: misc args
        @type args: C{tuple}
        @param kwargs: misc keyword args
        @type kwargs: C{dict}
        """
        MdfXmlStreamFactory.__init__(self, proto, *args, **kwargs)

class Daemon(xmlstream.XmlStream):
    """
    The daemon is the implementation of a microkernel type inter-service
    communication (ISC) routing daemon. Here is how it works :
   
        - Services announce their presence to the daemon by giving their
          name, version, ip, port and a list of message-types that they
          accept
        - The daemon listens for messages from the attached services; when
          one is received, it routes the message to the correct service
   
    @todo: add unique id generation/verification
    """
   
    # Holds the real method
    __dataReceived = xmlstream.XmlStream.dataReceived
   
    # The registered services
    #__services = {}
    cnt = 1
   
    def __init__(self, *args, **kwargs):
        """
        Constructor
       
        @param args: non-keyword args
        @type args: C{tuple}
        @param kwargs: keyword args
        @type kwargs: C{dict}
        """
        xmlstream.XmlStream.__init__(self)
        self.__routeTo = None
        self.__lastMsgType = None
        self.__lastMsgId = None
        self.inst = Daemon.cnt
        Daemon.cnt += 1
        print "daemon proto instance %d" % self.inst
       
   
    def connectionMade(self):
        xmlstream.XmlStream.connectionMade(self)
   
    def dataReceived(self, data):
        """
        Called every time data is received
       
        @param data: the data received
        @type data: C{object} (anything)
        """
        self.__dataReceived(data)
       
    def connectionLost(self, reason):
        """
        Called when the connection is shut down, restores the
        dataReceived method
       
        @param reason: the reason why the connection was lost
        @type reason: C{str}
        """
        self.__dataReceived = xmlstream.XmlStream.dataReceived
        self.__routeTo = None
        xmlstream.XmlStream.connectionLost(self, reason)
   
    def __onHeader(self, element):
        """
        Analyse a header and set the data's recipient
       
        @param element: the header element (XML)
        @type element: L{Element<twisted.words.xish.domish.Element>}
        """
        print "got header from %s:%s : %s" % (
            str(self.transport.getPeer().host),
            str(self.transport.getPeer().port),
            element.toXml())
       
        self.__lastMsgId = element.getAttribute("id")
        self.__lastMsgType = element.getAttribute("type")
        if(self.__lastMsgType != constants._REG_MSG_TYPE):
            # Route by the message type read from the header
            self.__routeTo = self.factory._services[self.__lastMsgType]
            # From now on, hand incoming data to the routing method
            self.__dataReceived = self.__routeDataReceived
   
    def __onReg(self, element):
        """
        Register a service
       
        @param element: the registration element (XML)
        @type element: L{Element<twisted.words.xish.domish.Element>}
        """
        print "got reg from %s:%s : %s" % (
            str(self.transport.getPeer().host),
            str(self.transport.getPeer().port),
            element.toXml())

        name = str(
            xpath.XPathQuery("/body/reg/name").queryForNodes(element)[0])
        version = str(
            xpath.XPathQuery("/body/reg/version").queryForNodes(element)[0])
        #address = str(
        #    xpath.XPathQuery("/body/reg/address").queryForNodes(element)[0])
        port = int(str(
            xpath.XPathQuery("/body/reg/port").queryForNodes(element)[0]))
        msgs = [ str(m) for m in
                 xpath.XPathQuery("/body/reg/message_type").queryForNodes(element) ]
       
        address = self.transport.getPeer().host
        #port = self.transport.getPeer().port
        serv = ServiceReg(name, version, msgs, address, port)
        self.__registerService(serv)
       
    def connected(self, xs):
        """
        Called when a client connects using an XML stream
       
        @param xs: the current xml stream
        @type xs: L{XmlStream<twisted.words.xish.xmlstream.XmlStream>}
        """
       
        print 'Connection from %s:%s!' % (
            str(self.transport.getPeer().host),
            str(self.transport.getPeer().port))
        xs.addObserver("/header", self.__onHeader)
        xs.addObserver("/body/reg", self.__onReg)
       
    def __routeDataReceived(self, data):
        """
        Pushes the messages to the correct service
       
        @param data: the data received
        @type data: C{object} (anything)
        """
        print "route '%s' to : %s" % (str(data), self.__routeTo)
       
        utils.sendMessage(self.__routeTo.ip, self.__routeTo.port, data)
        #self.send(data)
       
    def __registerService(self, service):
        """
        Register a service
       
        @param service: the service to register
        @type service: L{ServiceReg}
       
        @raise ServiceMessageConflictError: if another service already has a
                                            message registered that the
                                            current service is trying to
                                            register
       
        @todo: what is to be done with the exception once raised????
               Finish status message...
        """
       
        def foundConflict(self, msgTypes):
            """
            Check if there is a conflict with message types to be registered
            by this service
           
            @param service: the service messages to check for conflicts
            @type service: C{str}
           
            @return: the conflicting service type or None if no
                    conflict is found
            """
            for mt in msgTypes:
                if(self.factory._services.has_key(mt)):
                    return mt
            return None
       
        print "Registering service : ", str(service)
       
        try:
            #
            # Check if another service already registered a message type
            # that the current service is trying to register
            #
            conflict = foundConflict(self, service.acceptedMsgs)
            if(conflict != None):
                raise ServiceMessageConflictError(conflict)
           
            #
            # Register the message types and this service
            #
            for msgType in service.acceptedMsgs:
                self.factory._services[msgType] = service
               
        except ServiceMessageConflictError, reason:
            status = utils.createConfirmationMsgBody(
                constants._MSG_FAILURE_TYPE, self.__lastMsgId, str(reason))
        else:
            status = utils.createConfirmationMsgBody(
                constants._MSG_SUCCESS_TYPE, self.__lastMsgId)
           
        #
        # Send registration confirmation (succeeded or failed)
        #
        msgRoot = utils.createMessage(constants._REG_MSG_CONFIRM_TYPE,
                                      constants._CONF_MSG_ID,
                                      constants._DAEMON_SERVICE_NAME,
                                      constants._MSG_SPEC_VERSION,
                                      constants._STATUS_DATA_TYPE, status)
       
        print "Sending confirmation message to %s : %s" % (
            self.transport.getPeer().host, msgRoot.toXml())
       
        #self.send(msgRoot)
        utils.sendMessage(self.transport.getPeer().host, service.port,
                          msgRoot)
   
if(__name__ == "__main__"):

    reactor.listenTCP(4321, MdfXmlStreamServerFactory(Daemon))
    print "Listening for connections..."
    reactor.run()

----------------------------------------"Service"-----------------------------------------------------------------------------------------------------

class MdfXmlStreamClientFactory(MdfXmlStreamServerFactory):
    """
    The factory class used by the services to create
    protocol instances
   
    @attention: this class might disappear, I have to see if it's useful
                to keep it or not
    """
   
    __daemonAddrs = None
    __daemonPort = None
   
    def __init__(self, proto, *args, **kwargs):
        """
        Constructor
       
        @param proto: the protocol to use
        @type proto: a subclass of
            L{XmlStream<twisted.words.xish.xmlstream.XmlStream>}
        @param args: misc args
        @type args: C{tuple}
        @param kwargs: misc keyword args
        @type kwargs: C{dict}
        """
        MdfXmlStreamServerFactory.__init__(self, proto, *args, **kwargs)
        self._serviceInfo = ServiceReg(kwargs["name"], kwargs["version"],
                                       list(kwargs.get("messageTypes", [])))
        MdfXmlStreamClientFactory.__daemonAddrs = kwargs["address"]
        MdfXmlStreamClientFactory.__daemonPort = kwargs["port"]
       
    def register(self, port):
        """
        Register the service
       
        @param port: the service's port
        @type port: C{int}
        """
        self._serviceInfo.port = port
        msgBodyData = utils.createRegMsgBody(self._serviceInfo.name,
                                             self._serviceInfo.version,
                                             str(self._serviceInfo.port),
                                             self._serviceInfo.acceptedMsgs)
       
        msgRoot = utils.createMessage(constants._REG_MSG_TYPE,
                                      constants._REG_MSG_ID,
                                      self._serviceInfo.name,
                                      constants._MSG_SPEC_VERSION,
                                      constants._REG_DATA_TYPE, msgBodyData)
       
        utils.sendMessage(MdfXmlStreamClientFactory.__daemonAddrs,
                          MdfXmlStreamClientFactory.__daemonPort,
                          msgRoot)

class BaseService(xmlstream.XmlStream):
    """
    The service is the implementation of a microkernel type inter-service
    communication (ISC) endpoint. Here is how it works :
   
        - Services announce their presence to the daemon by giving their
          name, version, ip, port and a list of message-types that they
          accept
        - The daemon listens for messages from the attached services; when
          one is received, it routes the message to the correct service
         
    @todo: add unique id generation/verification
    """
   
    def __init__(self, *args, **kwargs):
        """
        Constructor
       
        @param args: non-keyword args
        @type args: C{tuple}
        @param kwargs: keyword args
        @type kwargs: C{dict}
        """
        xmlstream.XmlStream.__init__(self)
        self._msgSrc = None
        self._msgDest = None
        self._msgBodyData = None
        self._registered = False
   
    def _onHeader(self, element):
        """
        Analyse a header and save the source and destination
       
        @param element: the header element (XML)
        @type element: L{Element<twisted.words.xish.domish.Element>}
       
        @todo: add msg spec version verification
        @todo: add id verification???
        """
        print "got header from %s:%s : %s" % (
            str(self.transport.getPeer().host),
            str(self.transport.getPeer().port),
            element.toXml())
       
        header = xpath.XPathQuery("/header").queryForNodes(element)[0]
        self._msgSrc = header.getAttribute("source")
        self._msgDest = header.getAttribute("destination")
       
    def _onBody(self, element):
        """
        Get the body and act accordingly
       
        @param element: the body element (XML)
        @type element: L{Element<twisted.words.xish.domish.Element>}
       
        @todo: add data type verification
        @todo: call data action callbacks
        """
        self._msgBodyData = xpath.XPathQuery(
            "/body").queryForNodes(element)[0].toXml()
        print "_onbody : ", self._msgBodyData
       
    def _onConfirm(self, element):
        """
        Get the confirmation message and act accordingly
       
        @param element: the confirmation element (XML)
        @type element: L{Element<twisted.words.xish.domish.Element>}
       
        @todo: add data type verification
        @todo: call data action callbacks
        """
        status = str(
            xpath.XPathQuery("/body/conf/status").queryForNodes(element)[0])
        id = int(str(
            xpath.XPathQuery("/body/conf/id").queryForNodes(element)[0]))
        msg = str(
            xpath.XPathQuery("/body/conf/msg").queryForNodes(element)[0])
        if(id == constants._REG_MSG_ID):
            if(status == constants._MSG_SUCCESS_TYPE):
                self._registered = True
       
    def connected(self, xs):
        """
        Called when a client connects using an XML stream
       
        @param xs: the current xml stream
        @type xs: L{XmlStream<twisted.words.xish.xmlstream.XmlStream>}
       
        @todo: add data action callbacks
        """
        print 'Connection from %s:%s!' % (
            str(self.transport.getPeer().host),
            str(self.transport.getPeer().port))
        xs.addObserver("/header", self._onHeader)
        xs.addObserver("/body/conf", self._onConfirm)
        xs.addObserver("/body", self._onBody)
       

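def start(daemonAddrs, daemonPort, serviceRef, serviceName,
          serviceVersion, serviceMessageTypes):
    """
    Start the service and register it with the daemon
   
    @param daemonAddrs: the address the daemon listens on
    @type daemonAddrs: C{str}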
    @param daemonPort: the port the daemon listens on
    @type daemonPort: C{int}
    @param serviceRef: a reference to the service's class
    @type serviceRef: a subclass of L{BaseService<service.BaseService>}
    @param serviceName: the service's name
    @type serviceName: C{str}
    @param serviceVersion: the service's version
    @type serviceVersion: C{str}
    @param serviceMessageTypes: the list of messages the service registers
    @type serviceMessageTypes: C{str list}
    """
    f = MdfXmlStreamClientFactory(serviceRef, address=daemonAddrs,
                                  port=daemonPort, name=serviceName,
                                  version=serviceVersion,
                                  messageTypes=serviceMessageTypes)
    port = reactor.listenTCP(0, f).getHost().port
    f.register(port)
    print port
   
    reactor.run()

if(__name__ == "__main__"):

    start("localhost", 4321, BaseService, "service_base", "1.0", ["all"])
>>  Daemon listening for connections...
>>
>>  daemon proto instance 1
>>  Connection from 127.0.0.1:57821! <-- ok, Client1
>>  ....
>>  daemon proto instance 2
>>  Connection from 127.0.0.1:57821! <-- Client1 again????? why?
>>  Connection from 127.0.0.1:57823! <-- ok, Client2
>>  ....
>>  daemon proto instance 3
>>  Connection from 127.0.0.1:57821! <-- ok, Client1 again????? why?
>>  Connection from 127.0.0.1:57823! <-- ok, Client2 again????? why?
>>  Connection from 127.0.0.1:57824! <-- ok, Client3
>>
>>  Oh, and by the time I finished writing this email, I've switched to
>>  solution 2, but I still get the  behavior above.
>>
>>
>>
>>
>>     
>
>
>
>   
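A guess at the repeated "Connection from" lines, offered as a sketch
rather than a confirmed diagnosis: buildProtocol() in the listing above
calls self.addBootstrap() on the shared factory for every new connection,
so the bootstrap list grows by one entry per connection, and each new
stream also gets the connected() callbacks of all earlier protocol
instances. The model below uses plain-Python stand-ins (StubStream,
LeakyFactory, PerInstanceFactory and CONNECTED are hypothetical names, not
Twisted classes) to reproduce the 1, 2, 3 pattern and to show one possible
way around it, which is to register the observer on the new instance only.

```python
CONNECTED = "connected"  # stand-in for xmlstream.STREAM_CONNECTED_EVENT


class StubStream:
    """Stand-in for an XmlStream; it only tracks and fires observers."""

    def __init__(self, peer):
        self.peer = peer
        self.observers = []

    def addObserver(self, event, fn):
        self.observers.append((event, fn))

    def connected(self, xs):
        # Reports this instance's own peer, not the peer of `xs`.
        return "Connection from %s!" % self.peer

    def fire(self, event):
        return [fn(self) for ev, fn in self.observers if ev == event]


class LeakyFactory:
    """Mimics the buildProtocol() from the listing above."""

    def __init__(self):
        self.bootstraps = []

    def addBootstrap(self, event, fn):
        self.bootstraps.append((event, fn))

    def buildProtocol(self, peer):
        xs = StubStream(peer)
        # The factory-wide list grows by one entry per connection.
        self.addBootstrap(CONNECTED, xs.connected)
        for event, fn in self.bootstraps:
            xs.addObserver(event, fn)
        return xs


class PerInstanceFactory(LeakyFactory):
    """Same factory, but the observer is added to the new stream only."""

    def buildProtocol(self, peer):
        xs = StubStream(peer)
        for event, fn in self.bootstraps:
            xs.addObserver(event, fn)
        xs.addObserver(CONNECTED, xs.connected)
        return xs


def simulate(factory, peers):
    # One connection per peer; collect what each connection "prints".
    return [factory.buildProtocol(p).fire(CONNECTED) for p in peers]


peers = ["127.0.0.1:57821", "127.0.0.1:57823", "127.0.0.1:57824"]
leaky = simulate(LeakyFactory(), peers)
fixed = simulate(PerInstanceFactory(), peers)
```

With the leaky version the third connection reports all three peers; with
the per-instance version each connection reports only its own.
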
Thank you,
Gabriel



