<pre>Here is my code for Plug 'n Play detection of serial devices. It's not yet ready for showtime, but it works with my serial attached Wacom.<br><br>Best regards,<br>eulores<br><br><br># -*- coding: utf-8 -*-<br>
<br>import sys<br>if sys.platform == 'win32':<br> from twisted.internet import win32eventreactor<br> win32eventreactor.install()<br>from twisted.internet import reactor, protocol<br>from twisted.internet.task import deferLater, LoopingCall<br>
from twisted.internet.defer import Deferred, inlineCallbacks, returnValue
from twisted.internet.serialport import *
import re


def wait(seconds):
    """Return a Deferred that fires with None once `seconds` have elapsed.

    Meant to be yielded from an inlineCallbacks generator as a
    non-blocking sleep.
    """
    def _nothing():
        return None

    return deferLater(reactor, seconds, _nothing)

class StructuredPnP(object):
    """Parsed form of a serial Plug and Play identification string.

    Two encodings are recognised:
      * 7-bit: block delimited by '(' and ')', fields separated by a backslash;
      * 6-bit: block delimited by 0x08 and 0x09, fields separated by '<';
        every textual field is shifted back up by 0x20 after matching.

    The structured PnP field is available with these attributes:
        data:    string, required. Original unformatted string.
        other:   string, optional. Up to 16 characters found before the
                 opening delimiter.
        rev:     integer, required. Revision of the PnP standard.
                 100 is standard version 1.00
        eisa:    string, required
        product: string, required
        serial:  string, optional
        klass:   string, optional
        compat:  string, optional. Other older products with similar
                 functionality
        user:    string, optional. Free flowing field useful for the end-user

    Optional fields that are absent from the input are left as ''. If
    `data` matches neither encoding, every textual field is '' and `rev`
    is 0.
    """

    # Textual fields of the PnP block, in order of appearance.
    _TEXT_FIELDS = ("eisa", "product", "serial", "klass", "compat", "user")

    def __init__(self, data):
        """Parse `data` (the raw string received from the device)."""
        # test string for a 7-bit character string:
        #   data = '\\96,N,8,1(\x01$WAC0608\\\\PEN\\WAC0000\\WACOM UD\r\nUD-0608-R,V1.4-4\r\nF4)'
        # test string for a 7-bit character string:
        #   data = 'aaa(bbcccdddd\\eeeeeeee\\fff\\gggg\\hhhhii)'
        # test string for a 6-bit character string:
        #   data = 'AAA\x08BBCCCDDDD<EEEEEEEE<FFF<GGGG<HHHHII\x09'
        self.data = data
        for key in ("other",) + self._TEXT_FIELDS:
            setattr(self, key, '')
        self.rev = '\0\0'

        prologue = r'(?P<other>[^(]{,16}?)'
        # '@' is a placeholder for the field separator; it is substituted
        # below with the separator of each encoding.
        matrix = (r'(?P<rev>..)(?P<eisa>...)(?P<product>....)'
                  r'(?:@(?P<serial>[^@]{,8}))?'
                  r'(?:@(?P<klass>[^@]{,32}))?'
                  r'(?:@(?P<compat>[^@]{,40}))?'
                  r'(?:@(?P<user>.{,40}?))?'
                  r'(?:..)?')
        needle1 = prologue + '\\(' + matrix.replace('@', '\\\\') + '\\)'
        needle2 = prologue + '\\\x08' + matrix.replace('@', '\\\x3C') + '\\\x09'

        fields = {}
        mo = re.match(needle1, data, re.S)
        if mo:
            fields = mo.groupdict()
        else:
            mo = re.match(needle2, data, re.S)
            if mo:
                fields = mo.groupdict()
                # 6-bit encoding: shift the characters back up by 0x20.
                for k in self._TEXT_FIELDS:
                    v = fields[k]
                    # Optional groups that did not participate are None.
                    if v is not None:
                        fields[k] = ''.join([chr(ord(ch) + 0x20) for ch in v])
        for k, v in fields.items():
            # Keep the '' default for optional fields missing from the input.
            if v is not None:
                setattr(self, k, v)
        # Two revision characters, 6 significant bits each -> 12-bit integer.
        self.rev = ((ord(self.rev[0]) & 0x3f) << 6) + (ord(self.rev[1]) & 0x3f)

    def __str__(self):
        """Return the original, unparsed string."""
        return self.data

    def __repr__(self):
        """Return a multi-line dump: the raw string, then one line per field."""
        lines = ['<StructuredPnP object> %r' % self.data]
        for k in "other rev eisa product serial klass compat user".split():
            lines.append(' %-8s %r' % (k, getattr(self, k, False)))
        return '\n'.join(lines)


class PnPProtocol(protocol.Protocol):
    """Drive the serial PnP enumeration handshake and collect the reply.

    See Plug and Play External COM device Specification, rev 1.00 from
    Microsoft & Hayes, 1994-1995.

    The Deferred handed to the constructor is fired exactly once with a
    StructuredPnP built from whatever was received (possibly nothing).
    """

    def __init__(self, deferred):
        """Store `deferred` and arm timer T5 with the overall 1.4 s timeout."""
        self.deferred = deferred
        self.data = ''
        self.timeout = 1.4
        self.T5 = reactor.callLater(self.timeout, self.deliverPnP)

    def _restartTimer(self, seconds):
        """Re-arm T5, unless it has already fired or been cancelled.

        DelayedCall.reset() raises on a call that is no longer pending, which
        can happen when data arrives after the result has been delivered.
        """
        if self.T5.active():
            self.T5.reset(seconds)

    def deliverPnP(self):
        """Close the port and fire the Deferred with the parsed reply."""
        # transport is still None if the port was never opened.
        if self.transport is not None:
            self.transport.loseConnection()
        if self.deferred is not None:
            # Clear the reference first so the Deferred cannot fire twice.
            d, self.deferred = self.deferred, None
            d.callback(StructuredPnP(self.data))

    def dataReceived(self, data):
        """Accumulate `data`; deliver at once when 256 characters are reached."""
        self._restartTimer(self.timeout)
        self.data += data
        if len(self.data) >= 256:
            self._restartTimer(0)

    @inlineCallbacks
    def connectionMade(self):
        """Toggle DTR/RTS to request the PnP string from the device.

        The step numbers in the comments are the ones used by the original
        debug prints (presumably section numbers of the specification).
        The `while` loop runs at most once; it only exists so that `break`
        can leave the sequence early.
        """
        while True:
            # step 2.1.1
            self.transport.setDTR(1)
            self.transport.setRTS(0)
            yield wait(0.2)
            if not self.transport.getDSR():
                break
            # step 2.1.3 part A
            self.transport.setDTR(0)
            yield wait(0.2)
            # step 2.1.3 part B
            self.transport.setDTR(1)
            yield wait(0.2)
            # step 2.1.4
            self.transport.setRTS(1)
            # timer T5 is now used for per-character timeout
            self.timeout = 0.2
            yield wait(0.2)
            if self.data:
                break
            # step 2.1.5
            self.transport.setDTR(0)
            self.transport.setRTS(0)
            yield wait(0.2)
            # step 2.1.6
            self.transport.setDTR(1)
            self.transport.setRTS(1)
            yield wait(0.2)
            break
        if not self.data:
            # Nothing answered: deliver the (empty) result right away.
            self._restartTimer(0)
        returnValue(None)

    def connectionLost(self, reason='connectionDone'):
        """Report the loss of the serial connection on stdout."""
        print("Connection lost: %s" % (reason,))


def pnpString(port=0):
    """Probe serial port `port` for a PnP device.

    Returns a Deferred that fires with a StructuredPnP. If the port cannot
    be opened, it fires immediately with the result of parsing ''.
    """
    d = Deferred()
    # Not named `protocol`: that would shadow the imported twisted module.
    proto = PnPProtocol(d)
    try:
        # (an earlier variant of this call also passed timeout=0)
        SerialPort(proto, port, reactor, baudrate=1200, bytesize=SEVENBITS,
                   parity=PARITY_NONE, stopbits=STOPBITS_ONE,
                   xonxoff=0, rtscts=0)
    except serial.SerialException:
        # The protocol never got a transport: stop its timer and detach the
        # Deferred so deliverPnP cannot run later and fire it a second time.
        if proto.T5.active():
            proto.T5.cancel()
        proto.deferred = None
        d.callback(StructuredPnP(''))
    return d


@inlineCallbacks
def imRunning():
    """Demo entry point: probe port 3, print the result, stop the reactor."""
    result = yield pnpString(3)
    print("PnP string: %r" % (result,))
    reactor.stop()
<br>if __name__ == "__main__":<br> reactor.callWhenRunning(imRunning)<br> reactor.run()</pre><br><br>On Tue, Sep 14, 2010 at 11:24 AM, Markus Hubig <<a href="mailto:mhubig@imko.de">mhubig@imko.de</a>> wrote:<br>
> Hi @all!<br>> I'm trying to write a Python library module for a special<br>> serial communication protocol called IMPBUS. To use the serial<br>> interface for sending and receiving packets, for now I'm<br>
> sub-classing pyserial. My code looks like this:<br>><br>> But the problem is that I can't use select with pyserial on Windows,<br>> because it doesn't provide the fileno() method. So after some googling<br>
> I found twisted.internet.serialport "A select()able serial device, acting<br>> as a transport." <br>> I've never used twisted before so I'm a little overwhelmed by how I can<br>> replace pyserial with twisted in the code above ... maybe someone can<br>
> point me in the right direction. It seems I need a "Protocol" and a<br>> "receiver" ... <br>> - Markus<br>> --<br><br>