[Twisted-Python] Re: How to receive a big stream data?

steven wang steven.zdwang at gmail.com
Fri Jul 20 00:50:19 EDT 2007


Thank you very much! :)

On 7/12/07, David Bolen <db3l.net at gmail.com> wrote:
>
> "steven wang" <steven.zdwang at gmail.com> writes:
>
> > But I want to receive binary data in my protocol.
>
> Even if you start with a non-binary header, you can switch to
> receiving binary information at any time by using the raw mode
> of most of the basic protocols.  And having some sort of ASCII header
> prior to the raw data is often a very simple way to handle things
> (an approach it has in common with a tremendous number of standard TCP-based
> protocols).
>
> Your original post had a fairly straightforward ASCII header that I
> think would probably be fine.  What you're probably missing is the
> concept of switching to a raw binary receive mode, which moves your
> protocol from getting data in its lineReceived method to having
> rawDataReceived called.
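>
> In its simplest form, the switch looks something like this (just a
> minimal sketch with a made-up protocol name; the fuller examples
> below flesh it out):
>
>     from twisted.protocols.basic import LineReceiver
>
>     class HeaderThenRaw(LineReceiver):
>         def lineReceived(self, line):
>             # The ASCII header arrives here first; assume it is just
>             # the payload length in bytes.
>             self.remain = int(line)
>             self.setRawMode()
>
>         def rawDataReceived(self, data):
>             # Everything after the header line lands here as raw bytes.
>             self.remain -= len(data)
>             if self.remain <= 0:
>                 self.transport.loseConnection()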
>
> For example, here's a slightly stripped pair of protocols (server and
> client) that I'm currently using as part of a bigger project.  Most of
> the communication is over a PB connection which the client uses to
> perform operations on the server, one of which is editing job
> information.  But jobs contain attached files (often very large
> audio/video files), so committing changes to a job also involves
> transmitting up any newly added files.  So after the client updates
> the server's meta data, it initiates a separate set of file transfers
> across a different port.
>
> In my case, the header for a file transfer includes a session key
> (which the protocol uses to reference the original PB-based job
> session the client was using) along with a file key used for storage
> (which uniquely references a specific file in the job).  The final
> element is the total file size.  That is, upon connecting, the client
> transmits a line such as:
>
>     <session_uuid> <file_uuid> #######
>
> where the two uuids are specific to the transfer underway (and help
> with security since a random client isn't going to know the right
> ids), and ####### is the overall file length.  After sending that
> line (i.e., right after its final newline), the client just blasts up
> the raw data.
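>
> Concretely, with made-up values, the first thing the client writes to
> the socket looks something like:
>
>     a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11 7c9e6679-7425-40de-944b-e07fc1f90ae7 1048576\r\n
>
> followed immediately by 1048576 bytes of raw file content.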
>
> The protocol is a simple LineReceiver-based protocol that receives
> that information as an initial ASCII line, after which it
> switches to raw mode to receive the data.  Although the data length
> could technically be inferred from when the client disconnects, having
> it up front ensures I can detect a transfer that gets interrupted.
>
> So on the server side you have:
>
>           - - - - - - - - - - - - - - - - - - - - - - - - -
>
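> # Probable imports for this stripped-down example (omitted from the
> # post itself; crc32 is assumed to come from zlib, and logger is
> # assumed to be a module-level logging object):
> import os
> import uuid
> import logging
> from datetime import datetime
> from zlib import crc32
> from twisted.protocols.basic import LineReceiver
> from twisted.internet.protocol import ServerFactory
>
> logger = logging.getLogger(__name__)
>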
> class FileIOProtocol(LineReceiver):
>
>     def __init__(self):
>         self.info = None
>         self.outfile = None
>         self.remain = 0
>         self.crc = 0
>
>     def lineReceived(self, line):
>         logger.debug('FileIOProtocol:lineReceived:%s', line)
>         try:
>             sess_key, file_key, self.size = line.split()
>             session_uuid = uuid.UUID(sess_key)
>             file_key = uuid.UUID(file_key)
>         except ValueError:
>             logger.debug('FileIOProtocol:lineReceived Invalid header')
>             self.transport.loseConnection()
>             return
>
>         self.job_session = self.factory.sessions.get(session_uuid)
>         if not self.job_session:
>             logger.debug('FileIOProtocol:lineReceived Invalid session')
>             self.transport.loseConnection()
>             return
>
>         if not self.job_session.active:
>             logger.debug('FileIOProtocol:lineReceived Stale session')
>             self.transport.loseConnection()
>             return
>
>         # [db3l] The original code validates the individual file uuid here
>         # resulting in self.job_file as job file object from the session
>
>         if not self.job_file:
>             logger.debug('FileIOProtocol:lineReceived Invalid file key')
>             self.transport.loseConnection()
>             return
>
>         # Create the upload directory if not already present
>         if not os.path.isdir(self.job_session.upload_dir):
>             os.makedirs(self.job_session.upload_dir)
>
>         self.outfilename = os.path.join(self.job_session.upload_dir,
>                                         self.job_file['uuid'].hex)
>
>         logger.debug('FileIOProtocol:lineReceived Receiving into %s',
>                      self.outfilename)
>         try:
>             self.outfile = open(self.outfilename, 'wb')
>         except Exception, value:
>             logger.debug('FileIOProtocol:lineReceived Unable to open file '
>                          '%s (%s)', self.outfilename, value)
>             self.transport.loseConnection()
>             return
>
>         self.remain = int(self.size)
>         logger.debug('FileIOProtocol:lineReceived Entering raw mode: %s %s',
>                      self.outfile, self.remain)
>         self.setRawMode()
>
>     def rawDataReceived(self, data):
>         self.remain -= len(data)
>         self.crc = crc32(data, self.crc)
>         self.outfile.write(data)
>
>     def connectionMade(self):
>         LineReceiver.connectionMade(self)
>         logger.debug('FileIOProtocol:connectionMade')
>
>     def connectionLost(self, reason):
>         LineReceiver.connectionLost(self, reason)
>         logger.debug('FileIOProtocol:connectionLost')
>         if self.outfile:
>             self.outfile.close()
>
>             if self.remain != 0:
>                 # Problem uploading - discard
>                 logger.debug('FileIOProtocol:connectionLost remain(%d)!=0',
>                              self.remain)
>
>                 os.remove(self.outfilename)
>             else:
>                 # Update job object with upload status
>                 self.job_file['uploaded'] = datetime.utcnow()
>                 self.job_file['size'] = self.size
>                 self.job_file['crc'] = self.crc
>
>
> class FileIOFactory(ServerFactory):
>     protocol = FileIOProtocol
>
>     def __init__(self, db, sessions, options):
>         self.db = db
>         self.options = options
>         self.sessions = sessions
>
>           - - - - - - - - - - - - - - - - - - - - - - - - -
>
> which is bound to an appropriate port on the server however you'd like.
> I use code like:
>
>     self.fileio = FileIOFactory(db, self.sessions, options)
>     reactor.listenTCP(self.options['file_port'], self.fileio)
>
>
> On the client side, I have an equivalent protocol that transmits up
> the file.  It runs beneath a GUI, so it keeps a reference to the GUI
> controller object, which may signal that the transfer needs to be
> cancelled mid-stream, and it updates the controller during the
> transfer so the controller can update an on-screen progress bar.
>
> It is also a LineReceiver-based protocol, and uses the Twisted
> FileSender object to do the raw data transfer (which is implemented as
> a straight producer, with the TCP socket being the consumer).  The
> connectionMade method is where it transmits the ASCII header and then
> initiates the raw data transfer.
>
>           - - - - - - - - - - - - - - - - - - - - - - - - -
>
>
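> # Probable imports for the client-side example, likewise omitted from
> # the post itself:
> import os
> from twisted.protocols.basic import LineReceiver, FileSender
> from twisted.internet.protocol import ClientFactory
>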
> class TransferCancelled(Exception):
>     """Exception for a user cancelling a transfer"""
>     pass
>
> class FileIOClient(LineReceiver):
>
>     def __init__(self, path, sess_key, file_key, controller):
>         self.path = path
>         self.sess_key = sess_key
>         self.file_key = file_key
>         self.controller = controller
>
>         self.infile = open(self.path, 'rb')
>         self.insize = os.stat(self.path).st_size
>
>         self.result = None
>         self.completed = False
>
>         self.controller.file_sent = 0
>         self.controller.file_size = self.insize
>
>     def _monitor(self, data):
>         self.controller.file_sent += len(data)
>         self.controller.total_sent += len(data)
>
>         # Check with controller to see if we've been cancelled and abort
>         # if so.
>         if self.controller.cancel:
>             print 'FileIOClient._monitor Cancelling'
>             # Need to unregister the producer with the transport or it will
>             # wait for it to finish before breaking the connection
>             self.transport.unregisterProducer()
>             self.transport.loseConnection()
>             # Indicate a user cancelled result
>             self.result = TransferCancelled('User cancelled transfer')
>
>         return data
>
>     def cbTransferCompleted(self, lastsent):
>         self.completed = True
>         self.transport.loseConnection()
>
>     def connectionMade(self):
>         self.transport.write('%s %s %s\r\n' % (str(self.sess_key),
>                                                str(self.file_key),
>                                                self.insize))
>         sender = FileSender()
>         sender.CHUNK_SIZE = 2 ** 16
>         d = sender.beginFileTransfer(self.infile, self.transport,
>                                      self._monitor)
>         d.addCallback(self.cbTransferCompleted)
>
>     def connectionLost(self, reason):
>         LineReceiver.connectionLost(self, reason)
>         print 'FileIOClient:connectionLost'
>         self.infile.close()
>         if self.completed:
>             self.controller.completed.callback(self.result)
>         else:
>             self.controller.completed.errback(reason)
>
> class FileIOClientFactory(ClientFactory):
>
>     protocol = FileIOClient
>
>     def __init__(self, path, sess_key, file_key, controller):
>         self.path = path
>         self.sess_key = sess_key
>         self.file_key = file_key
>         self.controller = controller
>
>     def clientConnectionFailed(self, connector, reason):
>         ClientFactory.clientConnectionFailed(self, connector, reason)
>         self.controller.completed.errback(reason)
>
>     def buildProtocol(self, addr):
>         print 'buildProtocol'
>         p = self.protocol(self.path, self.sess_key, self.file_key,
>                           self.controller)
>         p.factory = self
>         return p
>
>           - - - - - - - - - - - - - - - - - - - - - - - - -
>
>
> Within the presentation layer controller on the client, initiating a
> transfer is done with:
>
>     def _transmitOne(self, address, port, path, sess_key, file_key):
>         self.completed = defer.Deferred()
>         f = FileIOClientFactory(path, sess_key, file_key, self)
>         reactor.connectTCP(address, port, f)
>         return self.completed
>
> and the result is that self.completed fires (callback or errback) when
> the transfer is done (which the controller uses to then initiate the
> next transfer when there is a list of files to go up for a job).
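>
> Chaining a list of files is then just a matter of starting the next
> transfer from each completion.  A rough sketch (a hypothetical
> _transmitAll helper on the same controller, not the actual code,
> assuming twisted.internet's defer is available as in _transmitOne):
>
>     def _transmitAll(self, address, port, transfers):
>         # transfers is a list of (path, sess_key, file_key) tuples
>         if not transfers:
>             return defer.succeed(None)
>         path, sess_key, file_key = transfers[0]
>         d = self._transmitOne(address, port, path, sess_key, file_key)
>         d.addCallback(lambda _: self._transmitAll(address, port,
>                                                   transfers[1:]))
>         return d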
>
> While probably not exactly what you're trying to do, perhaps it'll
> point you in the right direction.
>
> -- David
>
>
> _______________________________________________
> Twisted-Python mailing list
> Twisted-Python at twistedmatrix.com
> http://twistedmatrix.com/cgi-bin/mailman/listinfo/twisted-python
>

