[Twisted-Python] Selectable SerialPort Windows/Linux

eulores eulores at gmail.com
Tue Sep 14 09:03:01 EDT 2010


Here is my code for Plug 'n Play detection of serial devices. It's not
yet ready for showtime, but it works with my serial-attached Wacom.

Best regards,
eulores


# -*- coding: utf-8 -*-

import sys
if sys.platform == 'win32':
    from twisted.internet import win32eventreactor
    win32eventreactor.install()
from twisted.internet import reactor, protocol
from twisted.internet.task import deferLater, LoopingCall
from twisted.internet.defer import Deferred, inlineCallbacks, returnValue
from twisted.internet.serialport import *
import re

def wait(seconds):
    """Return the Deferred that deferLater gives for a no-op call
    scheduled on the reactor ``seconds`` seconds from now."""
    def _nothing():
        return None
    return deferLater(reactor, seconds, _nothing)

class StructuredPnP(object):
    ''' Parsed form of a serial Plug 'n Play identification string.

        The structured PnP field is available with these fields:
            data:    string, required. Original unformatted string.
            other:   string, optional
            rev:     integer, required. Revision of the PnP standard.
                     100 is standard version 1.00
            eisa:    string, required
            product: string, required
            serial:  string, optional
            klass:   string, optional
            compat:  string, optional. Other older products with
                     similar functionality
            user:    string, optional. Free flowing field useful for
                     the end-user

        Optional fields that are absent from the string are set to ''.
        When the string cannot be parsed at all, every text field is ''
        and rev is 0.
    '''
    def __init__(self, data):
        # Sample identification strings for manual testing.
        # string sent by a serial Wacom tablet:
        #data = '\\96,N,8,1(\x01$WAC0608\\\\PEN\\WAC0000\\WACOM UD\r\nUD-0608-R,V1.4-4\r\nF4)'
        # test string for a 7-bit character string
        #data = 'aaa(bbcccdddd\\eeeeeeee\\fff\\gggg\\hhhhii)'
        # test string for a 6-bit character string
        #data = 'AAA\x08BBCCCDDDD<EEEEEEEE<FFF<GGGG<HHHHII\x09'
        self.data = data
        # Defaults used when nothing can be parsed out of data.
        for key in "other eisa product serial klass compat user".split():
            setattr(self, key, '')
        self.rev = '\0\0'
        # Up to 16 characters may precede the opening delimiter.
        prologue = r'(?P<other>[^(]{,16}?)'
        # '@' is a placeholder for the field separator; it is substituted
        # below for each of the two encodings.  The trailing (?:..)? skips
        # the optional two-character tail that precedes the closing
        # delimiter.
        matrix = (r'(?P<rev>..)(?P<eisa>...)(?P<product>....)'
                  r'(?:@(?P<serial>[^@]{,8}))?'
                  r'(?:@(?P<klass>[^@]{,32}))?'
                  r'(?:@(?P<compat>[^@]{,40}))?'
                  r'(?:@(?P<user>.{,40}?))?'
                  r'(?:..)?')
        # 7-bit form: '(' ... ')' with backslash as field separator.
        needle1 = prologue + '\\(' + matrix.replace('@','\\\\') + '\\)'
        # 6-bit form: 0x08 ... 0x09 with '<' (0x3C) as field separator.
        needle2 = prologue + '\\\x08' + matrix.replace('@','\\\x3C') + '\\\x09'
        dct = dict()
        mo = re.match(needle1, data, re.S)
        if mo:
            # groupdict('') makes absent optional groups '' instead of None,
            # so the defaults set above are not overwritten with None.
            dct = mo.groupdict('')
        else:
            mo = re.match(needle2, data, re.S)
            if mo:
                dct = mo.groupdict('')
                # 6-bit characters are shifted up by 0x20 to obtain the
                # text; 'other' and 'rev' are left as received.
                for k in "eisa product serial klass compat user".split():
                    dct[k] = ''.join([chr(ord(ch)+0x20) for ch in dct[k]])
        for k,v in dct.items():
            setattr(self, k, v)
        # The revision is carried as two characters of six significant bits
        # each, most significant first.
        self.rev = ((ord(self.rev[0])&0x3f)<<6) + (ord(self.rev[1])&0x3f)
    def __str__(self):
        # The plain string form is the original, unparsed data.
        return self.data
    def __repr__(self):
        # One header line followed by one line per parsed field.
        l = ['<StructuredPnP object> %r' % self.data]
        for k in "other rev eisa product serial klass compat user".split():
            l.append('  %-8s %r' % (k, getattr(self,k,False)))
        return '\n'.join(l)

class PnPProtocol(protocol.Protocol):
    """ Probe a serial device for its Plug and Play identification string.

    See Plug and Play External COM device Specification, rev 1.00
    from Microsoft & Hayes, 1994-1995.

    The deferred passed to the constructor is fired at most once, with a
    StructuredPnP built from whatever data was received before the T5
    timer expired.
    """
    def __init__(self, deferred):
        self.deferred = deferred
        self.data = ''
        # Overall time allowed before giving up; shortened to a
        # per-character timeout later in connectionMade.
        self.timeout = 1.4
        self.T5 = reactor.callLater(self.timeout, self.deliverPnP)
    def _restartT5(self, delay):
        # Re-arm the T5 timer only while it is still pending: resetting a
        # delayed call that has already fired or been cancelled raises.
        if self.T5.active():
            self.T5.reset(delay)
    def deliverPnP(self):
        # Close the port and hand the collected data to the waiting caller.
        self.transport.loseConnection()
        if self.deferred is not None:
            # Clear the reference first so the deferred is fired only once.
            d, self.deferred = self.deferred, None
            d.callback(StructuredPnP(self.data))
    def dataReceived(self, data):
        # Every chunk received pushes the deadline back.
        self._restartT5(self.timeout)
        self.data += data
        if len(self.data)>=256:
            # Enough data collected: deliver on the next reactor iteration.
            self._restartT5(0)
    @inlineCallbacks
    def connectionMade(self):
        # The numbered comments are the labels of the original debug
        # prints (presumably section numbers of the specification).
        # The loop body runs at most once; 'break' is used to leave the
        # sequence early.
        while 1:
            # 2.1.1
            self.transport.setDTR(1)
            self.transport.setRTS(0)
            yield wait(0.2)
            if not self.transport.getDSR():
                break
            # 2.1.3 part A
            self.transport.setDTR(0)
            yield wait(0.2)
            # 2.1.3 part B
            self.transport.setDTR(1)
            yield wait(0.2)
            # 2.1.4
            self.transport.setRTS(1)
            # timer T5 is now used for per-character timeout
            self.timeout = 0.2
            yield wait(0.2)
            if self.data:
                break
            # 2.1.5
            self.transport.setDTR(0)
            self.transport.setRTS(0)
            yield wait(0.2)
            # 2.1.6
            self.transport.setDTR(1)
            self.transport.setRTS(1)
            yield wait(0.2)
            break
        if not self.data:
            # Nothing was received: deliver the (empty) result right away.
            self._restartT5(0)
    def connectionLost(self, reason='connectionDone'):
        # Same output as the Python 2 statement
        # 'print "Connection lost:", reason', in a form that also parses
        # on Python 3.
        print("Connection lost: %s" % (reason,))

def pnpString(port=0):
    """Probe serial port ``port`` for a Plug 'n Play identification string.

    Returns a Deferred that fires with a StructuredPnP. If the port cannot
    be opened (serial.SerialException), the Deferred fires immediately with
    an empty StructuredPnP.
    """
    d = Deferred()
    # Named 'proto' so the imported 'protocol' module is not shadowed.
    proto = PnPProtocol(d)
    try:
        # The author also left a disabled variant of this call that
        # additionally passed timeout=0.
        SerialPort(proto, port, reactor, baudrate=1200,
                   bytesize=SEVENBITS, parity=PARITY_NONE,
                   stopbits=STOPBITS_ONE, xonxoff=0, rtscts=0)
    except serial.SerialException:
        # The protocol armed its T5 timer in __init__; without a transport
        # that timer would later fail in deliverPnP, so disarm it and
        # detach the deferred before answering here.
        if proto.T5.active():
            proto.T5.cancel()
        proto.deferred = None
        d.callback(StructuredPnP(''))
    return d

@inlineCallbacks
def imRunning():
    """Probe serial port 3, print the resulting PnP string, then stop the
    reactor."""
    try:
        pnp = yield pnpString(3)
        # Single-argument print call: same output on Python 2 and 3.
        print("PnP string: %r" % (pnp,))
    finally:
        # Stop the reactor even when the probe fails, so the script does
        # not hang.
        reactor.stop()

if __name__ == "__main__":
    # Schedule the probe to start once the reactor is running, then enter
    # the reactor loop; imRunning stops the reactor when it is done.
    reactor.callWhenRunning(imRunning)
    reactor.run()



On Tue, Sep 14, 2010 at 11:24 AM, Markus Hubig <mhubig at imko.de> wrote:
> Hi @all!
> I'm trying to write a python library module for a special
> serial communication protocol called IMPBUS. To use the serial
> interface for sending and receiving packets as for now I'm
> sub-classing pyserial. My code looks like this:
>
> But the problem is that I can't use select with pyserial on Windows,
> because it doesn't provide the fileno() method. So after some googling
> I found twisted.internet.serialport "A select()able serial device, acting
> as a transport."
> I never used twisted before so I'm a little overwhelmed by how I can
> replace pyserial with twisted in the code above ... maybe someone can
> point me to the right direction. It seems I need a "Protocol" and a
> "receiver" ...
> - Markus
> --
-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://twistedmatrix.com/pipermail/twisted-python/attachments/20100914/147f1bbd/attachment.htm 


More information about the Twisted-Python mailing list