[Twisted-Python] using stdio with Twisted

Neurophyre listbox at evernex.com
Tue Mar 20 14:31:19 MDT 2007


On Mar 20, 2007, at 12:43 PM, Jean-Paul Calderone wrote:

>
> Well, it is also a protocol implementation for something like  
> VT102.  A lot
> of the tests bypass the byte-level stuff because that's not what's  
> interesting
> for what they're testing.  In your actual code, ServerProtocol is  
> what is
> responsible for calling keystrokeReceived with all of those  
> interesting
> values.

Sorry, but I don't understand how to make it do that.  I have my own  
user interface 'protocol' which at this point is getting  
keystrokeReceived events and echoing them to the screen, but it's  
only getting a keyID and the modifier field is always None.  Also, if  
I enter a control-key, the control character is echoed (for example  
ctrl-J produces a line feed) but the modifier is not set.

I've grepped your code and found only one place, in widgets.py, where  
it's used that isn't a test case -- and that's just doing a  
comparison on a received keystroke, I think.

Here's my relevant code, for reference:


class DOCUIProtocol(recvline.RecvLine):
    """
    A 'protocol' which handles the user interface.

    Keystrokes delivered to keystrokeReceived() are echoed back to the
    terminal, and write()/writeLine() encode their text with
    Constants.CHARSET_OUTPUT before handing it to self.terminal.
    """

    def connectionMade(self):
        """
        Prepare the UI state, announce readiness and connect to the BBS.

        Reached through runWithProtocol() in main.
        """
        # process individual keystrokes, hopefully
        # http://svn.twistedmatrix.com/cvs/sandbox/exarkun/invective/trunk/invective/tui.py?rev=19759&view=auto
        super(DOCUIProtocol, self).connectionMade()
        self.terminal.resetPrivateModes([privateModes.CURSOR_MODE])

        # set up quasi-singleton state variables
        self.__buff = ClientUITidbits.KeystrokeBuffer()
        self.__clientUIState = ClientUITidbits.UIState()
        self.__clientUIState.setState(Constants.UISTATE_DISCONNECTED)

        # make a quasi-singleton instance of ourself so others can use
        # write() and writeLine()
        dummy = ClientUITidbits.StdIO()
        dummy.stdioInstance = self

        # write() returns nothing, so there is no result worth keeping here
        self.write(u'done.\n')
        self.connectBBS()

    def keystrokeReceived(self, keyID, modifier):
        """
        Echo the received keystroke; stop the reactor on the right arrow.

        keyID is compared against ServerProtocol.RIGHT_ARROW below, so it is
        presumably not always a plain string (special keys appear to arrive
        as constants -- confirm against the insults API documentation).
        """
        # see http://twistedmatrix.com/documents/current/api/twisted.conch.insults.insults.TerminalProtocol.html
        # TODO fix for control keys
        # %-formatting rather than "+": concatenating a non-string keyID
        # raises TypeError, which would abort before the check below runs.
        self.writeLine("Got key: %s %s" % (keyID, modifier))

        # ?
        if keyID == ServerProtocol.RIGHT_ARROW:
            self.writeLine("killed")
            reactor.stop()

        #self.__buff.append(keyID)
        #ClientParseInput.KeystrokeDispatcher.parse()

    # TODO delete if we continue using insults
    def dataReceived(self, data):
        """Append raw input to the keystroke buffer and run the dispatcher."""
        self.__buff.append(data)
        ClientParseInput.KeystrokeDispatcher.parse()

    def write(self, data):
        """write data to stdout, encoded in an output charset"""
        # TODO delete for conch
        # self.transport.write(data.encode(Constants.CHARSET_OUTPUT))
        self.terminal.write(data.encode(Constants.CHARSET_OUTPUT))

    def writeLine(self, data):
        """write data plus a newline, encoded in an output charset"""
        # TODO find out why this isn't working right
        # TODO delete for conch
        # self.transport.write(data.encode(Constants.CHARSET_OUTPUT + '\n'))
        self.terminal.write(data.encode(Constants.CHARSET_OUTPUT) + '\n')

    def connectBBS(self):
        """
        Initiate the TCP connection to the BBS server.

        The reactor is not started here (the reactor.run() call below is
        commented out).
        """
        # the factory writes diagnostic messages to the stdio instance for us
        reactor.connectTCP('vapor.evernex.net', 9023,
                           ClientProtocol.BBSClientProtocolFactory())
        # reactor.run()


class CommandLineUserInterface(DOCUIProtocol):
    """
    User interface protocol that follows the size of the terminal window.

    See http://svn.twistedmatrix.com/cvs/sandbox/exarkun/invective/trunk/invective/tui.py?rev=19759&view=auto
    """

    def connectionMade(self):
        """Install the SIGWINCH handler, record the window size, chain up."""
        signal(SIGWINCH, self.windowChanged)
        # Only the first two entries of the size tuple are stored.
        self.width, self.height = self.getWindowSize()[:2]
        super(CommandLineUserInterface, self).connectionMade()

    def connectionLost(self, reason):
        """Stop the reactor when the connection is lost; reason is unused."""
        reactor.stop()

    # XXX Should be part of runWithProtocol
    def getWindowSize(self):
        """
        Ask the terminal on file descriptor 0 for its size via TIOCGWINSZ.

        The ioctl result is unpacked as four unsigned shorts (struct winsize
        order: rows, columns, x pixels, y pixels) and returned with each pair
        swapped: (columns, rows, y pixels, x pixels).
        """
        # The 8-byte string is only a placeholder buffer for the ioctl result.
        rows, cols, xpixels, ypixels = unpack(
            '4H', ioctl(0, TIOCGWINSZ, '12345678'))
        return cols, rows, ypixels, xpixels

    def windowChanged(self, signum, frame):
        """SIGWINCH handler: re-read the size and pass it to terminalSize()."""
        self.terminalSize(*self.getWindowSize()[:2])


if __name__ == "__main__":
    import sys

    # Banner: client version line and copyright notice joined by " - ".
    splashLine = (Constants.VERSION_CLIENT_LINE + u' - ' +
                  Constants.VERSION_COPYRIGHT)
    sys.stdout.write(splashLine.encode(Constants.CHARSET_OUTPUT) + '\n')

    # No trailing newline: the u'done.\n' written from connectionMade()
    # completes this line.
    sys.stdout.write('Connecting user interface: ')
    # sys.stdout is normally buffered, so push the prompt out before
    # runWithProtocol takes over the terminal (presumably it writes to the
    # descriptor directly, bypassing this buffer -- confirm).
    sys.stdout.flush()
    runWithProtocol(CommandLineUserInterface)





More information about the Twisted-Python mailing list