[Twisted-Python] A resizable cooperator class for queuing and dispatching jobs

Dave Peticolas dave at krondo.com
Wed Dec 9 22:59:30 EST 2009

Terry Jones wrote:
> I just wrote a fun class that lets you
>    - submit jobs to be dispatched to a queue
>    - manage how many tasks are in progress at once
>    - dynamically adjust that number
>    - shut down cleanly, including
>    - recovering jobs that were queued but hadn't been dispatched
> This uses a combination of a DeferredQueue, a task.Cooperator, and the
> DeferredPool I posted on Monday. For now I named it ResizableDispatchQueue
> (not a great name, suggestions welcome). You can pick it up from
> http://pastebin.com/f7dc9320e
> I can think of lots of uses. Here's a simple example.
> You want to write a server with a web interface that allows people to enter
> their phone number so you can send them an SMS. You anticipate lots of
> people will use the service. But sending SMS messages is quite slow, and
> the company that you ship those jobs off to is concerned that you'll
> overrun their service (or maybe they have an API limit, etc). So you need
> to queue up jobs locally and send them off at a certain rate. You'd like to
> be able to adjust that rate up or down. You also want to be able to shut
> your service down cleanly (i.e., not in the middle of a task), and when you
> restart it you want to be able to re-queue the jobs that were queued last
> time but which hadn't gone out.
> For example, suppose your function that sends the SMS is called sendSMS and
> that it takes a (number, message) tuple arg. Then:
>     dispatcher = ResizableDispatchQueue(sendSMS)
>     # Tell it to send at most 5 things at once.
>     dispatcher.start(5)      # Same as dispatcher.width = 5
>     # Later... send off some SMS messages.
>     dispatcher.put((2127399921, 'Hello...'))
>     dispatcher.put((5052929919, 'Test...'))
>     # Later, bump up to 10 simultaneous jobs.
>     dispatcher.width = 10
>     # Oops, turns out we're sending too fast, turn it down a little.
>     dispatcher.narrow(3)
>     # Get a copy of the list of pending jobs.
>     jobs = dispatcher.pending()
>     # Arrange to increase the number of jobs in an hour's time.
>     reactor.callLater(3600, dispatcher.setWidth, 20)
>     # Time to shutdown. Wait for any tasks underway to complete, and save
>     # the list of jobs not yet dispatched.
>     def saveJobs(jobs):
>         pickle.dump(jobs, ...)
>     d = dispatcher.stop()
>     d.addCallback(saveJobs)
> On restart you just unpickle the old job list and pass its items to
> dispatcher.put().
> I have a small test suite that's a bit weird (it schedules various things
> and tests how long the overall job takes and what's still pending when stop
> is called). It could be much better, but it does at least illustrate that
> the code seems to work. Let me know if you want it.

This is really nifty. I know I could use this.

> There's also the issue about what to do when the dispatch function hits an
> error.  An option could be added to re-queue the job, but it's perhaps
> better to let the dispatch function do that along with whatever else it
> needs.

One reason to have a separate error handler is to support generic
error-handling strategies, like 're-try N times and then send an
email here', etc. Though maybe you could do that with decorators
on the dispatch function. It does mean the dispatch function needs
to know about the task queue, though.

> As usual, I'd be happy to hear comments and suggestions. I'll probably
> adjust this so the DeferredQueue uses a priority queue.

Having written something like this, though not as general or as elegant,
several times, I've found that pause() and resume() is a very useful
API. That's not the same as setting the width to 0 and then back again,
as pause() and resume() don't require you to know or remember the
current width of the queue.


More information about the Twisted-Python mailing list