[Twisted-web] Using Twisted to process a pool of messages

Phil Mayers p.mayers at imperial.ac.uk
Sun Oct 16 11:08:26 EDT 2011


On 10/16/2011 03:34 PM, AmirBehzad Eslami wrote:

> Now I'm asked to use Twisted to make my python script asynchronous.
> Well, I was a PHP programmer. I'm wondering how this application could
> become asynchronous.
>
> Let me explain.
> If the first call to the above URL fails to receive content for some reason,
> what will happen? Does asynchronous mean that Python will invoke
> a new call to the same URL again? Or will it make a new URL call?

None of the above.

Asynchronous code is not magical. It won't do things you don't tell it to.

In Twisted terms, normally you will call some API which will return a 
"deferred", which is just a placeholder for the result. You would attach 
a callback, which is called on success, and an errback, which is called 
on failure.

If the URL fetch fails, your errback will be invoked. On the other hand, 
if the URL fetch succeeds but returns no data, your callback will be 
called with an empty result.

Either way - the code you write decides what happens next.

> How can I handle "last_sms_id" during these asynchronous calls?

It sounds to me like you need to work your way through the Twisted 
tutorial. It's absolutely essential to understand a) how the reactor 
model that Twisted uses schedules your code and b) how Deferreds are 
used to give you results "asynchronously".

Very briefly, your code might look something like this:

import sys

import simplejson  # or the standard library's json module

from twisted.python import log
from twisted.internet import task
from twisted.web import client
from twisted.internet import reactor

class Worker:
   def __init__(self):
     self.last_sms_id = None
     self.host = 'http://your.server'

   def start(self):
     self.tsk = task.LoopingCall(self.fetch)
     self.tsk.start(5)

   def fetch(self):
     # We are called every 5 seconds
     # Fetch our URL and return a deferred
     # Attach success/failure callbacks

     if self.last_sms_id is None:
       # first time we've run?
       url = '%s/script' % (self.host,)
     else:
       url = '%s/script?last_sms_id=%s' % (self.host, self.last_sms_id,)

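     # getPage is a module-level function in twisted.web.client
     # (later Twisted releases deprecate it in favour of Agent)
     d = client.getPage(url)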

     d.addCallback(self.success)

     # because we attach the errback *after* the callback,
     # any errors/exceptions in the callback will go through
     # to the errback, where we can log them...
     d.addErrback(self.failure)

     # If we return a "deferred" from a LoopingCall task
     # the next call won't happen until the deferred has
     # completed; this is probably good
     return d

   def success(self, data):
     # N.B. any errors in this function will result in our errback
     # being called, so we can ignore them here if we want...

     messages = simplejson.loads(data)
     for msg in messages:
         # NOTE: this function should NOT block
         # or you'll block the reactor...
         do_something(msg)
         # assumes each decoded message is a dict with an "sms_id" key
         self.last_sms_id = msg['sms_id']

   def failure(self, f):
     # called if either the page fetch failed, or the
     # callback had an error (invalid/empty data)
     # Just log it; returning None marks the error as handled,
     # so the LoopingCall keeps running
     log.msg("url fetch failed")
     log.err(f)

def setup():
   w = Worker()
   w.start()

def main():
   log.startLogging(sys.stderr)
   reactor.callWhenRunning(setup)
   reactor.run()

if __name__ == '__main__':
   main()


...obviously your "do_something" call will need to handle the messages; 
it might for example use "getPage" to issue an HTTP POST to an 
SMS-sending service, or put them into a queue which another 
task.LoopingCall empties at a pre-defined rate.

Hope this helps clear things up a bit.


