[Twisted-Python] A resizable cooperator class for queuing and dispatching jobs

Terry Jones terry at jon.es
Tue Dec 8 23:09:35 EST 2009


I just wrote a fun class that lets you

   - submit jobs to be dispatched to a queue
   - manage how many tasks are in progress at once
   - dynamically adjust that number
   - shut down cleanly, including recovering jobs that were queued but
     hadn't been dispatched

This uses a combination of a DeferredQueue, a task.Cooperator, and the
DeferredPool I posted on Monday. For now I named it ResizableDispatchQueue
(not a great name, suggestions welcome). You can pick it up from
http://pastebin.com/f7dc9320e
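The pastebin code isn't reproduced here. To make the core idea concrete without Twisted, here is a rough sketch that swaps Twisted's DeferredQueue, Cooperator and DeferredPool for plain asyncio tasks. ResizableQueueSketch, in_progress() and demo() are hypothetical names that only loosely mirror the put/width/pending/stop interface described in this post. It is an illustration of the idea, not the pastebin implementation.

```python
import asyncio
from collections import deque


class ResizableQueueSketch:
    """Hypothetical asyncio analogue: at most `width` jobs run concurrently."""

    def __init__(self, func):
        self._func = func          # async function called with one job
        self._pending = deque()    # queued but not yet dispatched
        self._running = set()      # tasks currently in progress
        self._width = 0            # nothing is dispatched until width > 0
        self._stopped = False

    @property
    def width(self):
        return self._width

    @width.setter
    def width(self, n):
        self._width = n
        self._dispatch()           # widening may let queued jobs start now

    def put(self, job):
        self._pending.append(job)
        self._dispatch()

    def pending(self):
        return list(self._pending)  # a copy, e.g. for pickling at shutdown

    def in_progress(self):
        return len(self._running)

    def _dispatch(self):
        # Start queued jobs until the width limit is reached. Narrowing never
        # cancels running jobs; it only holds back new ones.
        while (not self._stopped and self._pending
               and len(self._running) < self._width):
            task = asyncio.create_task(self._func(self._pending.popleft()))
            self._running.add(task)
            task.add_done_callback(self._finished)

    def _finished(self, task):
        self._running.discard(task)
        self._dispatch()           # a freed slot lets the next job start

    async def stop(self):
        # Stop dispatching, wait for in-progress jobs, return undispatched ones.
        self._stopped = True
        if self._running:
            await asyncio.gather(*self._running, return_exceptions=True)
        return self.pending()


async def demo():
    done = []

    async def send_sms(job):
        await asyncio.sleep(0.01)     # stand-in for a slow SMS gateway call
        done.append(job)

    q = ResizableQueueSketch(send_sms)
    q.width = 1
    for job in range(5):
        q.put(job)
    print(q.in_progress(), q.pending())   # one running, four waiting
    q.width = 3                            # widening starts two more at once
    print(q.in_progress(), q.pending())
    leftover = await q.stop()              # waits for the three running jobs
    return sorted(done), leftover


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The design choice worth noting is that a job only starts when something frees or adds capacity (a put, a width change, or a job finishing), so there is no polling loop, and stop() never interrupts a job that has already been dispatched.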

I can think of lots of uses. Here's a simple example.

You want to write a server with a web interface that allows people to enter
their phone number so you can send them an SMS. You anticipate lots of
people will use the service. But sending SMS messages is quite slow, and
the company that you ship those jobs off to is concerned that you'll
overrun their service (or maybe they have an API limit, etc.). So you need
to queue jobs locally and send them off at a controlled pace, with only a
limited number in flight at any one time. You'd like to be able to turn that
limit up or down. You also want to be able to shut
your service down cleanly (i.e., not in the middle of a task), and when you
restart it you want to be able to re-queue the jobs that were queued last
time but which hadn't gone out.

For example, suppose your function that sends the SMS is called sendSMS and
that it takes a single (number, message) tuple argument. Then:

    import pickle

    from twisted.internet import reactor

    # ResizableDispatchQueue is the class from the pastebin link above.
    dispatcher = ResizableDispatchQueue(sendSMS)
    # Tell it to send at most 5 things at once.
    dispatcher.start(5)      # Same as dispatcher.width = 5

    # Later... send off some SMS messages. Phone numbers are kept as
    # strings so leading zeros or a '+' prefix survive.
    dispatcher.put(('2127399921', 'Hello...'))
    dispatcher.put(('5052929919', 'Test...'))

    # Later, bump up to 10 simultaneous jobs.
    dispatcher.width = 10

    # Oops, turns out we're sending too fast, turn it down a little.
    dispatcher.narrow(3)

    # Get a copy of the list of pending jobs.
    jobs = dispatcher.pending()

    # Arrange to raise the width to 20 in an hour's time.
    reactor.callLater(3600, dispatcher.setWidth, 20)

    # Time to shut down. Wait for any tasks underway to complete, and save
    # the list of jobs not yet dispatched.

    def saveJobs(jobs):
        pickle.dump(jobs, ...)   # ... = a file opened for binary writing

    d = dispatcher.stop()
    d.addCallback(saveJobs)


On restart you just unpickle the old job list and pass its items to
dispatcher.put().
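A minimal sketch of that save/restart round trip, assuming the jobs are picklable tuples and that you pick the file path yourself. save_jobs, requeue_saved_jobs and the path are hypothetical; in the real setup `put` would be dispatcher.put.

```python
import os
import pickle
import tempfile


def save_jobs(jobs, path):
    """Pickle the list of undispatched jobs (path is a hypothetical choice)."""
    with open(path, "wb") as f:
        pickle.dump(jobs, f)


def requeue_saved_jobs(path, put):
    """On restart: unpickle the saved jobs and hand each one to `put`."""
    if not os.path.exists(path):   # first run: nothing was saved
        return 0
    with open(path, "rb") as f:
        jobs = pickle.load(f)
    for job in jobs:
        put(job)
    return len(jobs)


if __name__ == "__main__":
    path = os.path.join(tempfile.mkdtemp(), "pending-jobs.pickle")
    save_jobs([('2127399921', 'Hello...'), ('5052929919', 'Test...')], path)
    requeued = []                  # stands in for dispatcher.put
    print(requeue_saved_jobs(path, requeued.append), requeued)
```

Returning 0 when no file exists covers the very first start-up. Whether to delete the file after re-queuing is a judgment call: deleting it avoids re-queuing the same jobs twice, but loses them if the process dies before they are saved again.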

I have a small test suite that's a bit weird (it schedules various things
and tests how long the overall job takes and what's still pending when stop
is called). It could be much better, but it does at least illustrate that
the code seems to work. Let me know if you want it.

There's also the question of what to do when the dispatch function hits an
error. An option could be added to re-queue the job, but it's perhaps
better to let the dispatch function do that, along with whatever else it
needs to do.
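One way the dispatch function could re-queue its own failures is sketched below. make_requeuing_dispatch, max_attempts and flaky_send are hypothetical. In the Twisted version the dispatch function would presumably return a Deferred, so the same logic would live in an errback; this synchronous sketch uses try/except to keep it self-contained.

```python
def make_requeuing_dispatch(send, put, max_attempts=3):
    """Wrap `send` so a failing job is put back on the queue (hypothetical helper).

    Returns (dispatch, failed): `dispatch` is what you would hand to the queue,
    `failed` collects jobs that still failed after `max_attempts` tries.
    Jobs must be hashable, e.g. (number, message) tuples.
    """
    attempts = {}   # job -> failed attempts so far
    failed = []

    def dispatch(job):
        try:
            result = send(job)
        except Exception:
            attempts[job] = attempts.get(job, 0) + 1
            if attempts[job] < max_attempts:
                put(job)            # re-queue for another try later
            else:
                del attempts[job]
                failed.append(job)  # give up; leave it for a human to inspect
            return None
        attempts.pop(job, None)     # success: forget any earlier failures
        return result

    return dispatch, failed


if __name__ == "__main__":
    calls = []

    def flaky_send(job):
        # Hypothetical sender that fails on its first two calls.
        calls.append(job)
        if len(calls) <= 2:
            raise IOError("SMS gateway busy")
        return "sent"

    queue = []                      # a plain list standing in for the real queue
    dispatch, failed = make_requeuing_dispatch(flaky_send, queue.append)
    queue.append(('2127399921', 'Hello...'))
    while queue:
        print(dispatch(queue.pop(0)))
```

Capping the retries keeps a permanently failing job from cycling through the queue forever.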

As usual, I'd be happy to hear comments and suggestions. I'll probably
adjust this so the DeferredQueue uses a priority queue.
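As a rough sketch of the ordering a priority queue would give (this is not DeferredQueue's API, and how it would be wired into the class is left open), heapq plus a sequence counter yields priority order while keeping FIFO order among jobs of equal priority. PriorityJobQueue and its priority convention (lower number dispatched sooner) are hypothetical.

```python
import heapq
import itertools


class PriorityJobQueue:
    """Hypothetical priority-ordered job store; lower number = dispatched sooner."""

    def __init__(self):
        self._heap = []
        self._seq = itertools.count()   # tie-breaker: FIFO within one priority

    def put(self, job, priority=0):
        heapq.heappush(self._heap, (priority, next(self._seq), job))

    def get(self):
        _priority, _seq, job = heapq.heappop(self._heap)
        return job

    def pending(self):
        # Copy of the queued jobs in the order they would be dispatched.
        return [job for _priority, _seq, job in sorted(self._heap)]

    def __len__(self):
        return len(self._heap)


if __name__ == "__main__":
    q = PriorityJobQueue()
    q.put(('5052929919', 'Test...'), priority=5)
    q.put(('2127399921', 'Hello...'), priority=1)
    print(q.pending())
```

Because the sequence numbers are unique, tuple comparison never reaches the job itself, so jobs don't need to be comparable. pending() returns jobs in dispatch order, which is what you would want to pickle at shutdown.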

Terry


