[Twisted-Python] IPushProducer problem

Gabriel Rossetti gabriel.rossetti at arimaz.com
Wed Jan 14 07:48:13 MST 2009


Gabriel Rossetti wrote:
> Jean-Paul Calderone wrote:
>> On Tue, 13 Jan 2009 16:34:29 +0100, Gabriel Rossetti 
>> <gabriel.rossetti at arimaz.com> wrote:
>>> Hello everyone,
>>>
>>> I implemented a push-produce a while back and I though it works, but 
>>> it didn't. When the msgs where spaced out, it worked, but if several 
>>> msgs were sent one after the other, then things go bad (the msgs get 
>>> mixed up)...
>>>
>>> I found this thread : 
>>> http://www.twistedmatrix.com/pipermail/twisted-python 
>>> /2007-March/014983.html
>>>
>>> and I think I ave the same problem, my code looks a bit like his :
>>>
>>>    def pauseProducing(self):
>>>        """
>>>        Pause the producer
>>>        """
>>>        self.__paused = True
>>>
>>>    def resumeProducing(self):
>>>        """
>>>        Resume the producer
>>>        """
>>>        self.__paused = False
>>>
>>>        while(not self.__paused and self.__startLimit < self.__total):
>>>            data = self.__buf[self.__startLimit:self.__endLimit]
>>>            self.transport.write(data)
>>>            self.__startLimit = self.__endLimit
>>>            self.__endLimit += self.__burstSize
>>>            if(not self.__stream):
>>>                break
>>>
>>>        if(not self.__paused or self.__startLimit >= self.__total):
>>>            self.stopProducing()
>>>
>>>    def stopProducing(self):
>>>        """
>>>        Stop the producer
>>>        """
>>>        self.__paused = False
>>>        self.__startLimit = 0
>>>        self.__buf = None
>>>        self.__total = None
>>>        self.__endLimit = self.__burstSize
>>>
>>> and when I send a msg I use this :
>>>
>>> def sendMessage(self, msg):
>>>        if domish.IElement.providedBy(msg):
>>>            msg = msg.toXml()
>>>              if isinstance(msg, unicode):
>>>            msg = msg.encode('utf-8')
>>>                      self.__buf = msg
>>>        self.__total = len(msg)
>>>        if(self.__stream):
>>>            self.resumeProducing()
>>>
>>> I think what happens is that the code is re-entered, just like 
>>> Jean-Paul suggested in the other thread. The thing is I didn't 
>>> understand the solution, could somebody please explain it to me or 
>>> suggest something else?
>>
>> It's extremely difficult to say without seeing a complete example and 
>> without
>> knowing exactly how the code is failing.  However, one thing I do see 
>> is that
>> it is not safe to call your `sendMessage` a second time before all of 
>> the
>> data from the first call has been written.  Since you are using 
>> `__buf´ to
>> record the message to send, if you try to send another message, the 
>> first
>> buffer will be clobbered.  Could this be the problem you're having?
>>
>> Perhaps consider appending to `__buf´ instead of overwriting it.  
>> There are
>> probably lots of other areas where this code could improve too, but not
>> corrupting the internal state of the producer is a good start. :)
>>
>> Jean-Paul
>>
> Thanks for your answer Jean-Paul, yes, not currupting the internal 
> state of my producer is a good start :-). I re-wrote it like this and 
> it seems to work correctly now :
>
>    def pauseProducing(self):
>        """
>        Pause the producer
>        """
>        self.__paused = True
>
>    def resumeProducing(self):
>        """
>        Resume the producer
>        """
>        self.__paused = False
>
>        while(not self.__paused and self.__buf):
>            self.transport.write(self.__buf[:self.__burstSize])
>            self.__buf = self.__buf[self.__burstSize:]
>            if(not self.__stream):
>                break
>
>        if(not self.__paused or not self.__buf):
>            self.stopProducing()
>
>    def stopProducing(self):
>        """
>        Stop the producer
>        """
>        self.__paused = False
>
>    def sendMessage(self, msg):
>        if domish.IElement.providedBy(msg):
>            msg = msg.toXml()
>              if isinstance(msg, unicode):
>            msg = msg.encode('utf-8')
>                      self.__buf += msg
>        if(self.__stream and not self.__paused):
>            self.resumeProducing()
>
>
> Thanks again for pointing that out :-),
> Gabriel
>
Just a quick question, should I do the producer 
registration/unregisteration once (e.g. in 
connectionMade()/connectionLost()) or have the registration be done in 
my sendMessage() method and the unregistration in the stopProducing() 
method? I noticed that if I set streaming to False, it reacts like a 
PullProducer and calls resumeProducing() in the registration, which is 
what made me wonder if I should move the registration to sendMessage().

Thank you,
Gabriel




More information about the Twisted-Python mailing list