[Twisted-Python] Re: Hanging Deferreds in PB Paging code

Brian Granger ellisonbg.net at gmail.com
Fri Jan 19 19:48:23 EST 2007


Ed,

Thanks for the code snippet.  I had thought about trying to implement
my own, less complicated solution for this issue but hadn't done
anything yet.  I will have a look at your code to see if I can adapt
it.  I am not dealing with files, but most of the ideas are the same.

Also, the task queue stuff looks interesting.  It is actually pretty
similar to what we have implemented in IPython:

http://ipython.scipy.org/moin/Parallel_Computing

We have abstracted things at a slightly lower level to allow various
types of higher level constructs (such as task farming) to be built
easily.  We haven't build the task farming interface fully yet, but
that is on our list of todos soon.  It might be nice to use some of
your work when we implement the task farming interface, but
unfortunately, IPython is BSD, so your GPL code won't help us much.

Brian

On 1/19/07, Ed Suominen <general at eepatents.com> wrote:
> From: "Brian Granger" <ellisonbg.net at gmail.com>
> > We are using PB as an initial protocol for some IPython related
> > stuff. Overall, PB is working well, but we need to be able to send
> > larger things around so we have been trying to implement things using
> > the pb.util.Pager stuff.  I have spent a fair amount of time
> > understanding how the Paging works.
>
> I looked at the PB paging stuff and decided to write my own simple
> solution for it, borrowing the original code as needed. At the server,
> just pass one of these to your PB client:
>
> > class Receiver(pb.Referenceable):
> >     """
> >     Give a PB client a remote reference to an instance of me and it will
> >     have a way to upload a file in chunks of whatever size it likes.
> >     """
> >     def __init__(self, destPath):
> >         self.file = open(destPath, 'w')
> >
> >     def remote_chunk(self, data):
> >         self.file.write(data)
> >         return True
> >
> >     def remote_done(self, ok):
> >         self.file.close()
>
> At the client side, use the source path for the file and the reference
> to the Receiver object to construct an Uploader object:
>
> > from twisted.internet import defer, interfaces
> > from twisted.python.filepath import FilePath
> >
> > class Uploader(object):
> >     """
> >     I upload the file at the specified I{sourcePath} to the I{referenced} PB
> >     referenceable object, both of which are supplied to my constructor.
> >
> >     @ivar d: A deferred that fires when the file has been completely uploaded.
> >
> >     """
> >     implements(interfaces.IConsumer)
> >
> >     def __init__(self, sourcePath, referenced):
> >         self.ref = referenced
> >         # The source file
> >         fp = FilePath(sourcePath)
> >         if not fp.isfile():
> >             raise OSError("'%s' is not a valid file path" % result)
> >         self.basename = fp.basename()
> >         self.bytes = [0, fp.getsize()]
> >         self.fh = fp.open()
> >         # Pull producer for reading the file locally
> >         self.producer = FileSender()
> >         # Set up the deferred that fires when the upload is done
> >         d = self.d = defer.Deferred()
> >         d.addCallback(lambda ok: self.ref.callRemote('done', ok))
> >         # Start the transfer at the file producer
> >         self.producer.beginProducing(self.fh, self)
> >
> >     def cancel(self):
> >         if not self.d.called:
> >             self.d.callback(False)
> >         self.producer.stopProducing()
> >
> >     #--- IConsumer API --------------------------------------------------------
> >
> >     def registerProducer(self, producer, streaming):
> >         self.producer = producer
> >         if not streaming:
> >             self.producer.resumeProducing()
> >
> >     def unregisterProducer(self):
> >         if not self.d.called:
> >             self.d.callback(True)
> >
> >     def write(self, data):
> >         """
> >         Writes the supplied chunk of file I{data}, no larger than
> >         L{FileSender.CHUNK_SIZE} bytes, to the server using the referenceable
> >         I've been given.
> >         """
> >         def oops(failure):
> >             failure.printTraceback()
> >             self.cancel()
> >
> >         def update(sent):
> >             sent = self.bytes[0] + sent
> >             self.p.setValue(sent)
> >             self.bytes[0] = sent
> >
> >         d = self.ref.callRemote('chunk', data)
> >         d.addCallback(lambda _: update(len(data)))
> >         d.addCallback(lambda _: self.producer.resumeProducing())
> >         d.addErrback(oops)
>
> I excerpted this from code that queues up the remote calls via my
> asynchronous task queue, and I'm not sure whether it will work well
> without it. The taskqueue subpackage is part of
> Twisted-Goodies, see http://foss.eepatents.com/Twisted-Goodies.
>
> Anyhow, given how much it borrows from Twisted's own code, I hereby
> license this code snippet for use under the same MIT-like license that
> Twisted itself uses.
>
> Best regards, Ed
>
>
> _______________________________________________
> Twisted-Python mailing list
> Twisted-Python at twistedmatrix.com
> http://twistedmatrix.com/cgi-bin/mailman/listinfo/twisted-python
>




More information about the Twisted-Python mailing list