[Twisted-Python] Re: How to receive a big stream data?

David Bolen db3l.net at gmail.com
Wed Jul 11 18:39:12 MDT 2007


"steven wang" <steven.zdwang at gmail.com> writes:

> But I want to receive binary data in my protocol.

Even if you start with a non-binary header, you can switch to
receiving binary information at any time by using the raw mode
of most of the basic protocols.  And having some sort of ASCII header
prior to the raw data is often a very simple way to handle things
(something in common with a tremendous number of standard TCP-based
protocols).

Your original post had a fairly straightforward ASCII header that I
think would probably be fine.  What you're probably missing is the
concept of switching to a raw binary receive mode which then switches
your protocol from getting data in its lineReceived method to having
rawDataReceived called.

For example, here's a slightly stripped pair of protocols (server and
client) that I'm currently using as part of a bigger project.  Most of
the communication is over a PB connection which the client uses to
perform operations on the server, one of which is editing job
information.  But jobs contain attached files (often very large
audio/video files), so committing changes to a job also involves
transmitting up any newly added files.  So after the client updates
the server's metadata, it initiates a separate set of file transfers
across a different port.

In my case, the header for a file transfer includes a session key
(which the protocol uses to reference the original PB-based job
session the client was using) along with a file key used for storage
(which uniquely references a specific file in the job).  The final
element is the total file size.  That is, upon connecting, the client
transmits a line such as:

    <session_uuid> <file_uuid> ########

where the two uuids are specific to the transfer underway (and help
with security since a random client isn't going to know the right
ids), and ######## is the overall file length.  After sending that
line (i.e., right after its final newline), the client just blasts up
the raw data.
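
As a rough illustration of that header format (not the code from my
project), the line can be built and parsed with nothing but the standard
library.  The names build_header and parse_header are made up for this
sketch, and it assumes the header is plain ASCII terminated by CRLF:

```python
import uuid

def build_header(session_key, file_key, size):
    """Return the header line as ASCII bytes, terminated by CRLF."""
    return ("%s %s %d\r\n" % (session_key, file_key, size)).encode("ascii")

def parse_header(line):
    """Parse a header line (line ending already stripped).

    Returns (session_uuid, file_uuid, size).  Raises ValueError if the
    field count is wrong or a uuid or the size cannot be parsed.
    """
    if isinstance(line, bytes):
        line = line.decode("ascii")
    sess_key, file_key, size = line.split()
    size = int(size)
    if size < 0:
        raise ValueError("negative size")
    return uuid.UUID(sess_key), uuid.UUID(file_key), size

# Round trip: what the client sends is what the server recovers
session_key = uuid.uuid4()
file_key = uuid.uuid4()
header = build_header(session_key, file_key, 1048576)
parsed = parse_header(header.rstrip(b"\r\n"))
```

Funnelling every malformed header into a single ValueError keeps the
"drop the connection" decision in one place on the receiving side.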

The protocol is a simple LineReceiver-based protocol that receives
that information as an initial ASCII line, after which it
switches to raw mode to receive the data.  Although the data length
could technically be inferred from when the client disconnects, having
it up front ensures I can detect a transfer that gets interrupted.
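
The bookkeeping behind that check is small.  Here is a minimal
standalone sketch, assuming the announced size is already known; the
TransferTracker class is hypothetical and only mirrors the idea of
counting down the remaining bytes while accumulating a CRC:

```python
import zlib

class TransferTracker:
    """Count down an announced byte total and keep a running CRC-32."""

    def __init__(self, expected_size):
        self.remain = expected_size
        self.crc = 0

    def feed(self, data):
        # Treat surplus data as an error rather than letting the
        # remaining count go negative
        if len(data) > self.remain:
            raise ValueError("received more data than announced")
        self.remain -= len(data)
        self.crc = zlib.crc32(data, self.crc)

    @property
    def complete(self):
        # True only once every announced byte has arrived
        return self.remain == 0

payload = b"x" * 100000
tracker = TransferTracker(len(payload))
for start in range(0, len(payload), 65536):
    tracker.feed(payload[start:start + 65536])

# A connection that dropped after the first chunk would look like this
partial = TransferTracker(len(payload))
partial.feed(payload[:65536])
```

When the connection closes, a tracker that is not complete means the
transfer was cut short and the partial file should be discarded.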

So on the server side you have:

          - - - - - - - - - - - - - - - - - - - - - - - - -

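import logging
import os
import uuid
from datetime import datetime
from zlib import crc32

from twisted.internet.protocol import ServerFactory
from twisted.protocols.basic import LineReceiver

# Assumption: the original logger setup isn't shown; any Logger will do
logger = logging.getLogger(__name__)

class FileIOProtocol(LineReceiver):

    def __init__(self):
        self.info = None
        self.job_session = None
        self.job_file = None   # set by the file uuid validation below
        self.outfile = None
        self.size = 0
        self.remain = 0
        self.crc = 0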

    def lineReceived(self, line):
        logger.debug('FileIOProtocol:lineReceived:%s', line)
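        try:
            # A malformed header (wrong field count, bad uuid or bad size)
            # raises ValueError at one of these steps
            sess_key, file_key, size = line.decode('ascii').split()
            session_uuid = uuid.UUID(sess_key)
            file_key = uuid.UUID(file_key)
            self.size = int(size)
        except ValueError:
            logger.debug('FileIOProtocol:lineReceived Invalid header')
            self.transport.loseConnection()
            return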

        self.job_session = self.factory.sessions.get(session_uuid)
        if not self.job_session:
            logger.debug('FileIOProtocol:lineReceived Invalid session')
            self.transport.loseConnection()
            return

        if not self.job_session.active:
            logger.debug('FileIOProtocol:lineReceived Stale session')
            self.transport.loseConnection()
            return

        # [db3l] The original code validates the individual file uuid here
        # resulting in self.job_file as job file object from the session
        
        if not self.job_file:
            logger.debug('FileIOProtocol:lineReceived Invalid file key')
            self.transport.loseConnection()
            return
            
        # Create the upload directory if not already present
        if not os.path.isdir(self.job_session.upload_dir):
            os.makedirs(self.job_session.upload_dir)

        self.outfilename = os.path.join(self.job_session.upload_dir,
                                        self.job_file['uuid'].hex)

        logger.debug('FileIOProtocol:lineReceived Receiving into %s',
                     self.outfilename)
        try:
            self.outfile = open(self.outfilename,'wb')
        except Exception as value:
            logger.debug('FileIOProtocol:lineReceived Unable to open file %s '
                         '(%s)', self.outfilename, value)
            self.transport.loseConnection()
            return

        self.remain = int(self.size)
        logger.debug('FileIOProtocol:lineReceived Entering raw mode: %s %s',
                     self.outfile, self.remain)
        self.setRawMode()

    def rawDataReceived(self, data):
        self.remain -= len(data)
        self.crc = crc32(data, self.crc)
        self.outfile.write(data)

    def connectionMade(self):
        LineReceiver.connectionMade(self)
        logger.debug('FileIOProtocol:connectionMade')

    def connectionLost(self, reason):
        LineReceiver.connectionLost(self, reason)
        logger.debug('FileIOProtocol:connectionLost')
        if self.outfile:
            self.outfile.close()

            if self.remain != 0:
                # Problem uploading - discard
                logger.debug('FileIOProtocol:connectionLost remain(%d)!=0',
                             self.remain)
                
                os.remove(self.outfilename)
            else:
                # Update job object with upload status
                self.job_file['uploaded'] = datetime.utcnow()
                self.job_file['size'] = self.size
                self.job_file['crc'] = self.crc


class FileIOFactory(ServerFactory):
    protocol = FileIOProtocol

    def __init__(self, db, sessions, options):
        self.db = db
        self.options = options
        self.sessions = sessions

          - - - - - - - - - - - - - - - - - - - - - - - - -

which is bound to an appropriate port on the server however you'd like.
I use code like:

    self.fileio = FileIOFactory(db, self.sessions, options)
    reactor.listenTCP(self.options['file_port'], self.fileio)


On the client side, I have an equivalent protocol that transmits up
the file.  It's run beneath a GUI, so it keeps a reference to the GUI
controller object, which might indicate that it needs to cancel a
transfer mid-stream, and it updates the controller during the transfer
so the GUI can update a progress bar on screen.

It is also a LineReceiver based protocol, and uses the Twisted
FileSender object to do the raw data transfer (which is implemented as
a straight producer with the TCP socket being the consumer).  The
connectionMade method is where it transmits the ASCII header and then
initiates the raw data transfer.
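
To show the shape of that chunk-and-monitor flow without pulling in
Twisted, here is a deliberately simplified, blocking sketch.  It is not
how FileSender is implemented (FileSender sends a chunk each time the
consumer asks for more data); pump_file and the list-based stand-ins are
made up for illustration:

```python
import io

def pump_file(infile, write, monitor, chunk_size=2 ** 16):
    """Read infile in chunks, pass each through monitor, hand it to write.

    Returns the number of bytes written.
    """
    sent = 0
    while True:
        chunk = infile.read(chunk_size)
        if not chunk:
            return sent
        chunk = monitor(chunk)   # hook may inspect or replace the chunk
        write(chunk)
        sent += len(chunk)

progress = []   # stands in for the GUI controller's counters
received = []   # stands in for the transport

def monitor(data):
    # Record progress and pass the data through unchanged
    progress.append(len(data))
    return data

source = io.BytesIO(b"a" * 150000)
total = pump_file(source, received.append, monitor)
```

The monitor hook sees every chunk just before it is written, which is
the natural place for progress updates and for a cancellation check.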

          - - - - - - - - - - - - - - - - - - - - - - - - -


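import os

from twisted.internet.protocol import ClientFactory
from twisted.protocols.basic import FileSender, LineReceiver

class TransferCancelled(Exception):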
    """Exception for a user cancelling a transfer"""
    pass

class FileIOClient(LineReceiver):

    def __init__(self, path, sess_key, file_key, controller):
        self.path = path
        self.sess_key = sess_key
        self.file_key = file_key
        self.controller = controller

        self.infile = open(self.path, 'rb')
        self.insize = os.stat(self.path).st_size

        self.result = None
        self.completed = False

        self.controller.file_sent = 0
        self.controller.file_size = self.insize

    def _monitor(self, data):
        self.controller.file_sent += len(data)
        self.controller.total_sent += len(data)

        # Check with controller to see if we've been cancelled and abort
        # if so.
        if self.controller.cancel:
            print('FileIOClient._monitor Cancelling')
            # Need to unregister the producer with the transport or it will
            # wait for it to finish before breaking the connection
            self.transport.unregisterProducer()
            self.transport.loseConnection()
            # Indicate a user cancelled result
            self.result = TransferCancelled('User cancelled transfer')

        return data

    def cbTransferCompleted(self, lastsent):
        self.completed = True
        self.transport.loseConnection()

    def connectionMade(self):
        header = '%s %s %s\r\n' % (self.sess_key, self.file_key, self.insize)
        # encode() keeps this working where the transport requires bytes
        self.transport.write(header.encode('ascii'))
        sender = FileSender()
        sender.CHUNK_SIZE = 2 ** 16
        d = sender.beginFileTransfer(self.infile, self.transport,
                                     self._monitor)
        d.addCallback(self.cbTransferCompleted)

    def connectionLost(self, reason):
        LineReceiver.connectionLost(self, reason)
        print('FileIOClient:connectionLost')
        self.infile.close()
        if self.completed:
            self.controller.completed.callback(self.result)
        else:
            self.controller.completed.errback(reason)

class FileIOClientFactory(ClientFactory):

    protocol = FileIOClient

    def __init__(self, path, sess_key, file_key, controller):
        self.path = path
        self.sess_key = sess_key
        self.file_key = file_key
        self.controller = controller
        
    def clientConnectionFailed(self, connector, reason):
        ClientFactory.clientConnectionFailed(self, connector, reason)
        self.controller.completed.errback(reason)

    def buildProtocol(self, addr):
        print('buildProtocol')
        p = self.protocol(self.path, self.sess_key, self.file_key,
                          self.controller)
        p.factory = self
        return p

          - - - - - - - - - - - - - - - - - - - - - - - - -


Within the presentation layer controller on the client, initiating a
transfer is done with:

    def _transmitOne(self, address, port, path, sess_key, file_key):
        self.completed = defer.Deferred()
        f = FileIOClientFactory(path, sess_key, file_key, self)
        reactor.connectTCP(address, port, f)
        return self.completed

and the result is that self.completed fires (callback or errback) when
the transfer is done (which the controller uses to then initiate the
next transfer when there is a list of files to go up for a job).

While probably not exactly what you're trying to do, perhaps it'll
point you in the right direction.

-- David




