[Twisted-Python] Factory question

Gabriel Rossetti mailing_lists at evotex.ch
Fri Feb 29 05:34:46 EST 2008


Gabriel Rossetti wrote:
> Drew Smathers wrote:
>> On Thu, Feb 28, 2008 at 9:42 AM, Gabriel Rossetti
>> <mailing_lists at evotex.ch> wrote:
>>  
>>> Drew Smathers wrote:
>>>  > On Wed, Feb 27, 2008 at 3:32 AM, Gabriel Rossetti
>>>  > <mailing_lists at evotex.ch> wrote:
>>>  >
>>>  >> Hello everyone,
>>>  >>
>>>  >>  I have a small question: I have a service which sometimes needs to
>>>  >>  send data (without having received any beforehand) and sometimes
>>>  >>  needs to receive data. Which is better:
>>>  >>
>>>  >>  1) create a factory that inherits from ServerFactory and
>>>  >>  ClientFactory, so it can both listen and send data
>>>  >>
>>>  >>  2) create a factory that inherits from ServerFactory only and uses a
>>>  >>  single-use client (ClientCreator, as shown in the writing clients
>>>  >>  howto) when it needs to send data
>>>  >>
>>>  >>
>>>  >
>>>  > I'm not sure if there's a single right way to do it, but I wouldn't
>>>  > bother inheriting from both ClientFactory and ServerFactory.  I think
>>>  > you're on the right track with 2, though.
>>>  >
>>>  >
>>>  I had taken route 1 up until now (I'm thinking about switching...)
>>>
>>>  Maybe there's something I haven't quite gotten: when ClientA initially
>>>  connects to the server, the factory creates an instance of the protocol,
>>>  correct?
>>>     
>>
>> Yes.
>>
>>   
> Ok
>>> Now ClientA sends some data to the server, which processes it
>>>  and sends something back. After that, the TCP session ends, the
>>>  client disconnects, and the protocol instance dies. Is this correct, or
>>>  does it live on and get reused somehow?
>>>     
>>
>> The protocol instance does not get reused.
>>
>>   
> Ok, so every time there is data exchanged (new tcp/ip session) then a 
> new protocol instance is created. Any persistence/state data must 
> therefore be stored in the factory if I understand correctly.
>>>  I ask this because the clients initially send data to the server
>>>  (registration), and the server will thereafter send data to the clients.
>>>  This means the client has to connect to the server initially through a
>>>  port using reactor.connectTCP() and then listen on a port (which the
>>>  server now knows, since the client registered itself) using
>>>  reactor.listenTCP().
>>>     
>>
>> This might be a bad idea - depending on the locality of your
>> servers and clients. Why not just use the established connection?
> The idea is to have services register on a central server then they 
> transmit messages to the daemon which routes/relays them to the 
> correct service, sort of like a micro-kernel. So, sometimes the 
> services initiate the communication process and sometimes they don't, 
> the central server does. This makes the services be servers and 
> clients. Imagine this :
>
> service1 has some data that needs to be processed by service2 (which 
> will in turn send it to another service). It sends it to the central 
> server, which sends it to service2. Service2 does whatever it has to 
> do, and then sends it to the central server to route to service_n.
>
> So the established connection is usually useless, except for sending 
> some sort of ACK maybe.
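
The routing described above could be modelled roughly as follows. This is only a sketch in plain Python, not the code used in the daemon: RoutingTable, MessageConflictError and the example hosts and ports are hypothetical names chosen for illustration. It maps each message type to the service that registered it and refuses a registration that would claim a type another service already owns.

```python
class MessageConflictError(Exception):
    """Raised when a message type is already claimed by another service."""


class RoutingTable(object):
    """Hypothetical registry: message type -> (service name, host, port)."""

    def __init__(self):
        self._routes = {}

    def register(self, name, host, port, message_types):
        # Check every type first so a conflicting registration changes nothing
        for message_type in message_types:
            if message_type in self._routes:
                raise MessageConflictError(message_type)
        for message_type in message_types:
            self._routes[message_type] = (name, host, port)

    def route(self, message_type):
        """Return (name, host, port) of the service accepting this type."""
        return self._routes[message_type]


table = RoutingTable()
table.register("service1", "127.0.0.1", 5001, ["raw"])
table.register("service2", "127.0.0.1", 5002, ["processed"])

# The central server would look up where to relay a "processed" message
target = table.route("processed")
```

Keeping the table in one object that outlives individual connections is the point here; where that object lives (for example on the factory) is a separate design choice.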
>
>> If
>> the *client* is listening on a port then it isn't just a client - it's
>> a server, or a peer in a clustered system.
>>
>>   
> I guess they are a sort of peers in a clustered system; they may 
> reside on the same machine as the central server or not.
>>>  I think I have to use
>>>  reactor.connectTCP() instead of ClientCreator since the connection has
>>>  to happen at the beginning and a transport needs to exist before I can
>>>  send anything. Well... now that I think about it, I could have the
>>>  factory register the client, which would mean I don't have to inherit
>>>  from ClientFactory (I've now switched to solution 2, see last
>>>  part of this email).
>>>
>>>     
>>
>> Ok.
>>
>>  
>>>> In most use cases you shouldn't have to create custom factories.
>>>>       
>>>  I have to since I need a non-reconnecting client xmlstream, and the
>>>  factory used with xmlstreams is a reconnecting client.
>>>
>>>    
>>>> Just
>>>>       
>>>  > write the protocol to support bidirectional comm and to create the
>>>  > server:
>>>  >
>>>  > f = Factory()
>>>  > f.protocol = YourBidirectionalProtocol
>>>  >
>>>  > Regarding the client, how you implement it depends on whether or not
>>>  > the server is establishing the connection vs. reusing the existing
>>>  > connection.
>>>  What exactly do you mean by reusing an existing connection?
>>>
>>>    
>>>> If you're establishing the connection (like in a cluster
>>>>       
>>>  > app with known peers), just use ClientCreator.  If you're reusing the
>>>  > existing connection, then you might not have to do anything, unless you
>>>  > have some state to set up, which could be done by overriding
>>>  > connectionMade on your Protocol.
>>>  >
>>>  > Finally, take everything I've stated above with a grain of salt.
>>>  >
>>>  >
>>>  Thanks, I find it fairly hard to get used to Twisted, I wanted to buy
>>>  the book, but it was written in 2005 and I'm not sure if it's still
>>>  valid with today's version.
>>>
>>>     
>>
>> The book is not up to date.
>>
>>  
>>>  BTW, any idea why I'm getting this type of behavior (one server, 3
>>>  distinct connections from clients) :
>>>
>>>     
>>
>> Without seeing your code, no.
>>
>>   
Ok, I figured out what is wrong. When the factories are created, I add a 
bootstrap for an event, so that a callback is called when that event is 
received. In the central server, when a client/service connects, the 
callback is called and XML event observers are added (I have now also 
added a method that removes them when the client disconnects). What 
happens is that when a second client connects, the event calls the 
callback for every instance of the protocol, not just the one for the 
current connection. I traced it down to the following code in   :

        self.addBootstrap(xmlstream.STREAM_START_EVENT, xs._connected)
        self.addBootstrap(xmlstream.STREAM_END_EVENT, xs._disconnected)
       
        for event, fn in self.bootstraps:
            xs.addObserver(event, fn)

Since the original XmlStreamFactory/XmlStreamFactoryMixin was designed 
to be an XML client and not a server, it has no notion of differentiating 
protocol instances (since there is only one).
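
The effect can be reproduced without Twisted. The sketch below is a simplified stand-in, not the real XmlStreamFactoryMixin: FakeStream, SharedBootstrapFactory, PerInstanceFactory and simulate are hypothetical names, and only the bootstrap list and the addObserver loop mirror the snippet above. It shows why appending a bound method of each new protocol to the factory-wide bootstrap list makes every later connection also fire the callbacks of earlier instances, and one possible way around it (attaching the per-connection callback directly to the new instance).

```python
class FakeStream(object):
    """Hypothetical stand-in for an XmlStream: just an observer registry."""

    def __init__(self, name, log):
        self.name = name
        self.log = log        # shared list recording which callbacks ran
        self.observers = []   # (event, callback) pairs for this instance

    def addObserver(self, event, fn):
        self.observers.append((event, fn))

    def dispatch(self, event):
        for observed_event, fn in self.observers:
            if observed_event == event:
                fn(self)

    def connected(self, xs):
        # Record (owner of the callback, stream that fired the event)
        self.log.append((self.name, xs.name))


class SharedBootstrapFactory(object):
    """Reproduces the problem: bootstraps accumulate on the factory."""

    def __init__(self, log):
        self.log = log
        self.bootstraps = []
        self.count = 0

    def addBootstrap(self, event, fn):
        self.bootstraps.append((event, fn))

    def buildProtocol(self, addr):
        self.count += 1
        xs = FakeStream("proto%d" % self.count, self.log)
        # A bound method of *this* instance goes into the shared list...
        self.addBootstrap("connected", xs.connected)
        # ...so each later instance also observes the earlier callbacks.
        for event, fn in self.bootstraps:
            xs.addObserver(event, fn)
        return xs


class PerInstanceFactory(SharedBootstrapFactory):
    """One possible fix: keep per-connection callbacks off the factory."""

    def buildProtocol(self, addr):
        self.count += 1
        xs = FakeStream("proto%d" % self.count, self.log)
        for event, fn in self.bootstraps:   # factory-wide observers only
            xs.addObserver(event, fn)
        xs.addObserver("connected", xs.connected)
        return xs


def simulate(factory_cls, connections=3):
    """Build one protocol per connection and fire its connect event."""
    log = []
    factory = factory_cls(log)
    for _ in range(connections):
        factory.buildProtocol(None).dispatch("connected")
    return log
```

With three connections, simulate(SharedBootstrapFactory) records 1 + 2 + 3 callbacks, the same pattern as the repeated "Connection from ..." lines in the daemon output, whereas simulate(PerInstanceFactory) records exactly one callback per connection.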

Gabriel
> ok, it's a bit long.....
>
> ------------------------- "Central server" aka Daemon -------------------------
>
> class MdfXmlStreamFactory(XmlStreamFactoryMixin):
>    """
>    The factory class used by the daemon and services to create
>    protocol instances
>    """
>
>    def __init__(self, proto, *args, **kwargs):
>        """
>        Constructor
>
>        @param proto: the protocol to use
>        @type proto: a subclass of L{XmlStream<twisted.words.xish.xmlstream.XmlStream>}
>        @param args: misc args
>        @type args: C{tuple}
>        @param kwargs: misc keyword args
>        @type kwargs: C{dict}
>        """
>        XmlStreamFactoryMixin.__init__(self)
>        self.args = args
>        self.kwargs = kwargs
>        self.protocol = proto
>
>    def buildProtocol(self, addr):
>        """
>        Builds the protocol and
>
>        @param addr: The address (protocol, IP, port) of the connection
>        @type addr: L{IPv4Address<twisted.internet.address._ServerFactoryIPv4Address>}
>
>        @return: an instance of the built protocol
>        """
>        #self.resetDelay()
>        xs = self.protocol(*self.args, **self.kwargs)
>        xs.factory = self
>        # stream connect event or xml start event???
>        self.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT, xs.connected)
>        for event, fn in self.bootstraps:
>            xs.addObserver(event, fn)
>
>        return xs
>
> class MdfXmlStreamServerFactory(MdfXmlStreamFactory, ServerFactory):
>    """
>    The factory class used by the daemon to create
>    protocol instances
>    """
>
>    # The registered services
>    _services = {}
>
>    def __init__(self, proto, *args, **kwargs):
>        """
>        Constructor
>
>        @param proto: the protocol to use
>        @type proto: a subclass of L{XmlStream<twisted.words.xish.xmlstream.XmlStream>}
>        @param args: misc args
>        @type args: C{tuple}
>        @param kwargs: misc keyword args
>        @type kwargs: C{dict}
>        """
>        MdfXmlStreamFactory.__init__(self, proto, *args, **kwargs)
>
> class Daemon(xmlstream.XmlStream):
>    """
>    The daemon is the implementation of a microkernel type inter-service
>    communication (ISC) routing daemon. Here is how it works :
>
>        - Services announce their presence to the daemon by giving their name,
>          version, ip, port and a list of message-types that they accept
>        - The daemon listens for messages from the attached services, when one
>          is received, it routes the message to the correct service
>
>    @todo: add unique id generation/verification
>    """
>
>    # Holds the real method
>    __dataReceived = xmlstream.XmlStream.dataReceived
>
>    # The registered services
>    #__services = {}
>    cnt = 1
>
>    def __init__(self, *args, **kwargs):
>        """
>        Constructor
>
>        @param args: non-keyword args
>        @type args: C{tuple}
>        @param kwargs: keyword args
>        @type kwargs: C{dict}
>        """
>        xmlstream.XmlStream.__init__(self)
>        self.__routeTo = None
>        self.__lastMsgType = None
>        self.__lastMsgId = None
>        self.inst = Daemon.cnt
>        Daemon.cnt += 1
>        print "daemon proto instance %d" % self.inst
>
>    def connectionMade(self):
>        xmlstream.XmlStream.connectionMade(self)
>
>    def dataReceived(self, data):
>        """
>        Called every time data is received
>
>        @param data: the data received
>        @type data: C{object} (anything)
>        """
>        self.__dataReceived(data)
>
>    def connectionLost(self, reason):
>        """
>        Called when the connection is shut down, restores the
>        dataReceived method
>
>        @param reason: the reason why the connection was lost
>        @type reason: C{str}
>        """
>        self.__dataReceived = xmlstream.XmlStream.dataReceived
>        self.__routeTo = None
>        xmlstream.XmlStream.connectionLost(self, reason)
>
>    def __onHeader(self, element):
>        """
>        Analyse a header and set the data's recipient
>
>        @param element: the header element (XML)
>        @type element: L{Element<twisted.words.xish.domish.Element>}
>        """
>        print "got header from %s:%s : %s" % (
>            str(self.transport.getPeer().host),
>            str(self.transport.getPeer().port), element.toXml())
>
>        self.__lastMsgId = element.getAttribute("id")
>        self.__lastMsgType = element.getAttribute("type")
>        if(self.__lastMsgType != constants._REG_MSG_TYPE):
>            # Route by the message type that was just parsed
>            self.__routeTo = self.factory._services[self.__lastMsgType]
>            self.__dataReceived = self.__routeDataReceived
>
>    def __onReg(self, element):
>        """
>        Register a service
>
>        @param element: the registration element (XML)
>        @type element: L{Element<twisted.words.xish.domish.Element>}
>        """
>        print "got reg from %s:%s : %s" % (
>            str(self.transport.getPeer().host),
>            str(self.transport.getPeer().port), element.toXml())
>
>        name = str(xpath.XPathQuery("/body/reg/name").queryForNodes(element)[0])
>        version = str(xpath.XPathQuery("/body/reg/version").queryForNodes(element)[0])
>        #address = str(xpath.XPathQuery("/body/reg/address").queryForNodes(element)[0])
>        port = int(str(xpath.XPathQuery("/body/reg/port").queryForNodes(element)[0]))
>        msgs = [ str(m) for m in xpath.XPathQuery("/body/reg/message_type").queryForNodes(element) ]
>
>        address = self.transport.getPeer().host
>        #port = self.transport.getPeer().port
>        serv = ServiceReg(name, version, msgs, address, port)
>        self.__registerService(serv)
>
>    def connected(self, xs):
>        """
>        Called when a client connects using an XML stream
>
>        @param xs: the current xml stream
>        @type xs: L{XmlStream<twisted.words.xish.xmlstream.XmlStream>}
>        """
>        print 'Connection from %s:%s!' % (
>            str(self.transport.getPeer().host),
>            str(self.transport.getPeer().port))
>        xs.addObserver("/header", self.__onHeader)
>        xs.addObserver("/body/reg", self.__onReg)
>
>    def __routeDataReceived(self, data):
>        """
>        Pushes the messages to the correct service
>
>        @param data: the data received
>        @type data: C{object} (anything)
>        """
>        print "route '%s' to : %s" % (str(data), self.__routeTo)
>
>        utils.sendMessage(self.__routeTo.ip, self.__routeTo.port, data)
>        #self.send(data)
>
>    def __registerService(self, service):
>        """
>        Register a service
>
>        @param service: the service to register
>        @type service: L{ServiceReg}
>
>        @raise ServiceMessageConflictError: if another service already has a
>                                            message registered that the current
>                                            service is trying to register
>
>        @todo: what is to be done with the exception once raised???? Finish status message...
>        """
>
>        def foundConflict(self, msgTypes):
>            """
>            Check if there is a conflict with message types to be registered
>            by this service
>
>            @param msgTypes: the service's message types to check for conflicts
>            @type msgTypes: C{list} of C{str}
>
>            @return: the conflicting message type or None if no
>                    conflict is found
>            """
>            for mt in msgTypes:
>                if(self.factory._services.has_key(mt)):
>                    return mt
>            return None
>
>        print "Registering service : ", str(service)
>
>        try:
>            #
>            # Check if another service already registered a message type that
>            # the current service is trying to register
>            #
>            conflict = foundConflict(self, service.acceptedMsgs)
>            if(conflict != None):
>                raise ServiceMessageConflictError(conflict)
>
>            #
>            # Register the message types and this service
>            #
>            for msgType in service.acceptedMsgs:
>                self.factory._services[msgType] = service
>
>        except ServiceMessageConflictError, reason:
>            status = utils.createConfirmationMsgBody(constants._MSG_FAILURE_TYPE,
>                                                     self.__lastMsgId, str(reason))
>        else:
>            status = utils.createConfirmationMsgBody(constants._MSG_SUCCESS_TYPE,
>                                                     self.__lastMsgId)
>
>        #
>        # Send registration confirmation (succeeded or failed)
>        #
>        msgRoot = utils.createMessage(constants._REG_MSG_CONFIRM_TYPE,
>                                      constants._CONF_MSG_ID,
>                                      constants._DAEMON_SERVICE_NAME,
>                                      constants._MSG_SPEC_VERSION,
>                                      constants._STATUS_DATA_TYPE, status)
>
>        print "Sending confirmation message to %s : %s" % (
>            self.transport.getPeer().host, msgRoot.toXml())
>
>        #self.send(msgRoot)
>        utils.sendMessage(self.transport.getPeer().host, service.port, msgRoot)
>
> if(__name__ == "__main__"):
>
>    reactor.listenTCP(4321, MdfXmlStreamServerFactory(Daemon))
>    print "Listening for connections..."
>    reactor.run()
>
> ---------------------------------------- "Service" ----------------------------------------
>
> class MdfXmlStreamClientFactory(MdfXmlStreamServerFactory):
>    """
>    The factory class used by the services to create
>    protocol instances
>
>    @attention: this class might disappear, I have to see if it's useful to keep it or not
>    """
>
>    __daemonAddrs = None
>    __daemonPort = None
>
>    def __init__(self, proto, *args, **kwargs):
>        """
>        Constructor
>
>        @param proto: the protocol to use
>        @type proto: a subclass of L{XmlStream<twisted.words.xish.xmlstream.XmlStream>}
>        @param args: misc args
>        @type args: C{tuple}
>        @param kwargs: misc keyword args
>        @type kwargs: C{dict}
>        """
>        MdfXmlStreamServerFactory.__init__(self, proto, *args, **kwargs)
>        self._serviceInfo = ServiceReg(kwargs["name"], kwargs["version"],
>                                       list(kwargs.get("messageTypes", [])))
>        MdfXmlStreamClientFactory.__daemonAddrs = kwargs["address"]
>        MdfXmlStreamClientFactory.__daemonPort = kwargs["port"]
>
>    def register(self, port):
>        """
>        Register the service
>
>        @param port: the service's port
>        @type port: C{int}
>        """
>        self._serviceInfo.port = port
>        msgBodyData = utils.createRegMsgBody(self._serviceInfo.name,
>                                             self._serviceInfo.version,
>                                             str(self._serviceInfo.port),
>                                             self._serviceInfo.acceptedMsgs)
>
>        msgRoot = utils.createMessage(constants._REG_MSG_TYPE,
>                                      constants._REG_MSG_ID,
>                                      self._serviceInfo.name,
>                                      constants._MSG_SPEC_VERSION,
>                                      constants._REG_DATA_TYPE, msgBodyData)
>
>        utils.sendMessage(MdfXmlStreamClientFactory.__daemonAddrs,
>                          MdfXmlStreamClientFactory.__daemonPort,
>                          msgRoot)
>
> class BaseService(xmlstream.XmlStream):
>    """
>    The service is the implementation of a microkernel type inter-service
>    communication (ISC) endpoint. Here is how it works :
>
>        - Services announce their presence to the daemon by giving their name,
>          version, ip, port and a list of message-types that they accept
>        - The daemon listens for messages from the attached services, when one
>          is received, it routes the message to the correct service
>
>    @todo: add unique id generation/verification
>    """
>
>    def __init__(self, *args, **kwargs):
>        """
>        Constructor
>
>        @param args: non-keyword args
>        @type args: C{tuple}
>        @param kwargs: keyword args
>        @type kwargs: C{dict}
>        """
>        xmlstream.XmlStream.__init__(self)
>        self._msgSrc = None
>        self._msgDest = None
>        self._msgBodyData = None
>        self._registered = False
>
>    def _onHeader(self, element):
>        """
>        Analyse a header and save the source and destination
>
>        @param element: the header element (XML)
>        @type element: L{Element<twisted.words.xish.domish.Element>}
>
>        @todo: add msg spec version verification
>        @todo: add id verification???
>        """
>        print "got header from %s:%s : %s" % (
>            str(self.transport.getPeer().host),
>            str(self.transport.getPeer().port), element.toXml())
>
>        self._msgSrc = xpath.XPathQuery("/header").queryForNodes(element)[0].getAttribute("source")
>        self._msgDest = xpath.XPathQuery("/header").queryForNodes(element)[0].getAttribute("destination")
>
>    def _onBody(self, element):
>        """
>        Get the body and act accordingly
>
>        @param element: the body element (XML)
>        @type element: L{Element<twisted.words.xish.domish.Element>}
>
>        @todo: add data type verification
>        @todo: call data action callbacks
>        """
>        self._msgBodyData = xpath.XPathQuery("/body").queryForNodes(element)[0].toXml()
>        print "_onbody : ", self._msgBodyData
>
>    def _onConfirm(self, element):
>        """
>        Get the confirmation message and act accordingly
>
>        @param element: the confirmation element (XML)
>        @type element: L{Element<twisted.words.xish.domish.Element>}
>
>        @todo: add data type verification
>        @todo: call data action callbacks
>        """
>        status = str(xpath.XPathQuery("/body/conf/status").queryForNodes(element)[0])
>        id = int(str(xpath.XPathQuery("/body/conf/id").queryForNodes(element)[0]))
>        msg = str(xpath.XPathQuery("/body/conf/msg").queryForNodes(element)[0])
>        if(id == constants._REG_MSG_ID):
>            if(status == constants._MSG_SUCCESS_TYPE):
>                self._registered = True
>
>    def connected(self, xs):
>        """
>        Called when a client connects using an XML stream
>
>        @param xs: the current xml stream
>        @type xs: L{XmlStream<twisted.words.xish.xmlstream.XmlStream>}
>
>        @todo: add data action callbacks
>        """
>        print 'Connection from %s:%s!' % (
>            str(self.transport.getPeer().host),
>            str(self.transport.getPeer().port))
>        xs.addObserver("/header", self._onHeader)
>        xs.addObserver("/body/conf", self._onConfirm)
>        xs.addObserver("/body", self._onBody)
>      
> def start(daemonAddrs, daemonPort, serviceRef, serviceName,
>           serviceVersion, serviceMessageTypes):
>    """
>    Start the service
>
>    @param daemonAddrs: the address the daemon listens on
>    @type daemonAddrs: C{str}
>    @param daemonPort: the port the daemon listens on
>    @type daemonPort: C{int}
>    @param serviceRef: a reference to the service's class
>    @type serviceRef: a subclass of L{BaseService<service.BaseService>}
>    @param serviceName: the service's name
>    @type serviceName: C{str}
>    @param serviceVersion: the service's version
>    @type serviceVersion: C{str}
>    @param serviceMessageTypes: the list of messages the service registers
>    @type serviceMessageTypes: C{str list}
>    """
>    f = MdfXmlStreamClientFactory(serviceRef, address=daemonAddrs,
>                                  port=daemonPort, name=serviceName,
>                                  version=serviceVersion,
>                                  messageTypes=serviceMessageTypes)
>    port = reactor.listenTCP(0, f).getHost().port
>    f.register(port)
>    print port
>
>    reactor.run()
>
> if(__name__ == "__main__"):
>
>    start("localhost", 4321, BaseService, "service_base", "1.0", ["all"])
>>>  Daemon listening for connections...
>>>
>>>  daemon proto instance 1
>>>  Connection from 127.0.0.1:57821! <-- ok, Client1
>>>  ....
>>>  daemon proto instance 2
>>>  Connection from 127.0.0.1:57821! <-- Client1 again????? why?
>>>  Connection from 127.0.0.1:57823! <-- ok, Client2
>>>  ....
>>>  daemon proto instance 3
>>>  Connection from 127.0.0.1:57821! <-- ok, Client1 again????? why?
>>>  Connection from 127.0.0.1:57823! <-- ok, Client2 again????? why?
>>>  Connection from 127.0.0.1:57824! <-- ok, Client3
>>>
>>>  Oh, and by the time I finished writing this email, I've switched to
>>>  solution 2, but I still get the  behavior above.
>>>
>>>
>>>
>>>  _______________________________________________
>>>  Twisted-Python mailing list
>>>  Twisted-Python at twistedmatrix.com
>>>  http://twistedmatrix.com/cgi-bin/mailman/listinfo/twisted-python
>>>
>>>     
>>
>>
>>
>>   
> Thank you,
> Gabriel




