Ticket #2157: win32_console_io.2.patch
| File win32_console_io.2.patch, 32.6 KB (added by khorn, 4 years ago) |
|---|
-
twisted/internet/_pollingfile.py
3 3 4 4 Implements a simple polling interface for file descriptors that don't work with 5 5 select() - this is pretty much only useful on Windows. 6 7 6 """ 8 7 8 import sys 9 9 from zope.interface import implements 10 10 11 11 from twisted.internet.interfaces import IConsumer, IPushProducer … … 13 13 MIN_TIMEOUT = 0.000000001 14 14 MAX_TIMEOUT = 0.1 15 15 16 class _PollableResource: 16 17 class _PollableResource(object): 17 18 active = True 18 19 19 20 def activate(self): … … 22 23 def deactivate(self): 23 24 self.active = False 24 25 25 class _PollingTimer: 26 27 class _PollingTimer(object): 26 28 # Everything is private here because it is really an implementation detail. 27 29 28 30 def __init__(self, reactor): … … 91 93 # If we ever (let's hope not) need the above functionality on UNIX, this could 92 94 # be factored into a different module. 93 95 94 import win32pipe95 96 import win32file 96 97 import win32api 97 98 import pywintypes 98 99 99 class _PollableReadPipe(_PollableResource):100 100 101 class _PollableReader(_PollableResource): 102 101 103 implements(IPushProducer) 102 104 103 def __init__(self, pipe, receivedCallback, lostCallback): 104 # security attributes for pipes 105 self.pipe = pipe 105 def __init__(self, handle, receivedCallback, lostCallback): 106 self.handle = handle 106 107 self.receivedCallback = receivedCallback 107 108 self.lostCallback = lostCallback 108 109 109 110 def checkWork(self): 110 finished = 0 111 fullDataRead = [] 111 raise NotImplementedError() 112 112 113 while 1:114 try:115 buffer, bytesToRead, result = win32pipe.PeekNamedPipe(self.pipe, 1)116 # finished = (result == -1)117 if not bytesToRead:118 break119 hr, data = win32file.ReadFile(self.pipe, bytesToRead, None)120 fullDataRead.append(data)121 except win32api.error:122 finished = 1123 break124 125 dataBuf = ''.join(fullDataRead)126 if dataBuf:127 self.receivedCallback(dataBuf)128 if finished:129 self.cleanup()130 return len(dataBuf)131 132 113 def cleanup(self): 133 114 self.deactivate() 134 115 self.lostCallback() 135 116 136 117 def close(self): 118 # XXX why not cleanup? 137 119 try: 138 win32api.CloseHandle(self. pipe)120 win32api.CloseHandle(self.handle) 139 121 except pywintypes.error: 140 122 # You can't close std handles...? 141 123 pass … … 150 132 self.activate() 151 133 152 134 153 FULL_BUFFER_SIZE = 64 * 1024154 135 155 class _PollableWritePipe(_PollableResource): 136 class _PollableWriter(_PollableResource): 137 FULL_BUFFER_SIZE = 64 * 1024 156 138 157 139 implements(IConsumer) 158 159 def __init__(self, writePipe, lostCallback):140 141 def __init__(self, handle, lostCallback): 160 142 self.disconnecting = False 161 143 self.producer = None 162 144 self.producerPaused = 0 163 145 self.streamingProducer = 0 164 146 self.outQueue = [] 165 self. writePipe = writePipe147 self.handle = handle 166 148 self.lostCallback = lostCallback 167 try:168 win32pipe.SetNamedPipeHandleState(writePipe,169 win32pipe.PIPE_NOWAIT,170 None,171 None)172 except pywintypes.error:173 # Maybe it's an invalid handle. Who knows.174 pass175 149 176 150 def close(self): 177 151 self.disconnecting = True … … 219 193 def writeConnectionLost(self): 220 194 self.deactivate() 221 195 try: 222 win32api.CloseHandle(self. writePipe)196 win32api.CloseHandle(self.handle) 223 197 except pywintypes.error: 224 198 # OMG what 225 199 pass … … 232 206 if self.disconnecting: 233 207 return 234 208 self.outQueue.append(data) 235 if sum(map(len, self.outQueue)) > FULL_BUFFER_SIZE:209 if sum(map(len, self.outQueue)) > self.FULL_BUFFER_SIZE: 236 210 self.bufferFull() 237 211 238 212 def checkWork(self): 213 raise NotImplementedError() 214 215 216 217 # 218 # Pipe support 219 # 220 import win32pipe 221 222 223 class _PollableReadPipe(_PollableReader): 224 def __init__(self, pipe, receivedCallback, lostCallback): 225 _PollableReader.__init__(self, pipe, receivedCallback, lostCallback) 226 # security attributes for pipes 227 228 def checkWork(self): 229 finished = 0 230 fullDataRead = [] 231 232 while 1: 233 try: 234 buffer, bytesToRead, result = win32pipe.PeekNamedPipe(self.handle, 1) 235 # finished = (result == -1) 236 if not bytesToRead: 237 break 238 hr, data = win32file.ReadFile(self.handle, bytesToRead, None) 239 fullDataRead.append(data) 240 except win32api.error: 241 finished = 1 242 break 243 244 dataBuf = ''.join(fullDataRead) 245 if dataBuf: 246 self.receivedCallback(dataBuf) 247 if finished: 248 self.cleanup() 249 return len(dataBuf) 250 251 252 class _PollableWritePipe(_PollableWriter): 253 def __init__(self, writePipe, lostCallback): 254 _PollableWriter.__init__(self, writePipe, lostCallback) 255 256 try: 257 win32pipe.SetNamedPipeHandleState(writePipe, 258 win32pipe.PIPE_NOWAIT, 259 None, 260 None) 261 except pywintypes.error: 262 # Maybe it's an invalid handle. Who knows. 263 pass 264 265 def checkWork(self): 239 266 numBytesWritten = 0 240 267 if not self.outQueue: 241 268 if self.disconnecting: 242 269 self.writeConnectionLost() 243 270 return 0 244 271 try: 245 win32file.WriteFile(self. writePipe, '', None)272 win32file.WriteFile(self.handle, '', None) 246 273 except pywintypes.error: 247 274 self.writeConnectionLost() 248 275 return numBytesWritten … … 250 277 data = self.outQueue.pop(0) 251 278 errCode = 0 252 279 try: 253 errCode, nBytesWritten = win32file.WriteFile(self. writePipe,280 errCode, nBytesWritten = win32file.WriteFile(self.handle, 254 281 data, None) 255 282 except win32api.error: 256 283 self.writeConnectionLost() … … 266 293 if not resumed and self.disconnecting: 267 294 self.writeConnectionLost() 268 295 return numBytesWritten 269 270 -
twisted/internet/_win32stdio.py
1 # -*- test-case-name: twisted.test.test_process.ProcessTestCase.testStdio -*-1 # -*- test-case-name: twisted.test.test_process.ProcessTestCase.testStdio,twisted.test.test_conio.StdIOTestCase -*- 2 2 3 import os 4 import errno 5 import msvcrt 6 7 import pywintypes 3 8 import win32api 4 import os, msvcrt9 import win32console 5 10 6 11 from zope.interface import implements 7 12 8 13 from twisted.internet.interfaces import IHalfCloseableProtocol, ITransport, IAddress 9 14 from twisted.internet.interfaces import IConsumer, IPushProducer 15 from twisted.internet import main, abstract 10 16 11 from twisted.internet import _pollingfile, main 17 import _pollingfile 12 18 19 20 21 # _pollingfile support 22 # XXX check me 23 class _PollableReadConsole(_pollingfile._PollableReader): 24 def __init__(self, channel, receivedCallback, lostCallback): 25 _pollingfile._PollableReader.__init__(self, channel.handle, 26 receivedCallback, lostCallback) 27 self.channel = channel 28 29 def checkWork(self): 30 try: 31 data = self.channel.read(2000) # 2000 = 80 x 25 32 except IOError, ioe: 33 assert ioe.args[0] == errno.EAGAIN 34 return 0 35 except win32console.error: 36 # stdin or stdout closed? 37 self.cleanup() 38 return 0 39 40 self.receivedCallback(data) 41 return len(data) 42 43 44 class _PollableWriteConsole(_pollingfile._PollableWriter): 45 def __init__(self, channel, lostCallback): 46 _pollingfile._PollableWriter.__init__(self, channel.handle, lostCallback) 47 48 self.channel = channel 49 50 def checkWork(self): 51 numBytesWritten = 0 52 if not self.outQueue: 53 if self.disconnecting: 54 self.writeConnectionLost() 55 return 0 56 try: 57 self.channel.write('') 58 except pywintypes.error: 59 self.writeConnectionLost() 60 return numBytesWritten 61 62 while self.outQueue: 63 data = self.outQueue.pop(0) 64 try: 65 # XXX as far as I know, 66 # nBytesWritten is always equal to len(data) 67 nBytesWritten = self.channel.write(data) 68 except win32console.error: 69 self.writeConnectionLost() 70 break 71 else: 72 numBytesWritten += nBytesWritten 73 if len(data) > nBytesWritten: 74 self.outQueue.insert(0, data[nBytesWritten:]) 75 break 76 else: 77 resumed = self.bufferEmpty() 78 if not resumed and self.disconnecting: 79 self.writeConnectionLost() 80 81 return numBytesWritten 82 83 84 85 # support for win32eventreactor 86 # XXX check me 87 class ConsoleReader(abstract.FileDescriptor): 88 def __init__(self, channel, receivedCallback, lostCallback, reactor=None): 89 """win32eventreactor is assumed. 90 """ 91 92 if not reactor: 93 from twisted.internet import reactor 94 95 self.channel = channel 96 self.receivedCallback = receivedCallback 97 self.lostCallback = lostCallback 98 self.reactor = reactor 99 100 self.reactor.addEvent(self.channel.handle, self, "doRead") 101 102 103 def doRead(self): 104 try: 105 data = self.channel.read(2000) # 2000 = 80 x 25 106 except IOError, ioe: 107 assert ioe.args[0] == errno.EAGAIN 108 return 0 109 except win32console.error: 110 # stdin or stdout closed? 111 self.cleanup() 112 return 0 113 114 self.receivedCallback(data) 115 return len(data) 116 117 def cleanup(self): 118 self.reactor.removeEvent(self.channel.handle) 119 self.lostCallback() 120 121 def close(self): 122 try: 123 win32api.CloseHandle(self.channel.handle) 124 except pywintypes.error: 125 # You can't close std handles...? 126 pass 127 128 self.cleanup() 129 130 def stopProducing(self): 131 self.close() 132 133 def pauseProducing(self): 134 self.reactor.removeEvent(self.channel.handle) 135 136 def resumeProducing(self): 137 self.reactor.addEvent(self.channel.handle, self, "doRead") 138 139 13 140 class Win32PipeAddress(object): 14 141 implements(IAddress) 15 142 143 class Win32ConsoleAddress(object): 144 implements(IAddress) 145 146 16 147 class StandardIO(_pollingfile._PollingTimer): 17 148 18 149 implements(ITransport, … … 28 159 29 160 Also, put it stdin/stdout/stderr into binary mode. 30 161 """ 162 31 163 from twisted.internet import reactor 32 164 from twisted.internet.win32eventreactor import Win32Reactor 165 33 166 for stdfd in range(0, 1, 2): 34 167 msvcrt.setmode(stdfd, os.O_BINARY) 35 168 36 169 _pollingfile._PollingTimer.__init__(self, reactor) 37 170 self.proto = proto 171 172 # Check for win32eventreactor 173 win32Enabled = reactor.__class__ is Win32Reactor 38 174 39 hstdin = win32api.GetStdHandle(win32api.STD_INPUT_HANDLE) 40 hstdout = win32api.GetStdHandle(win32api.STD_OUTPUT_HANDLE) 175 # Check if we are connected to a console. 176 # If this is the case, connect to the console, else connect to 177 # anonymous pipes. 178 if os.isatty(0): 179 import conio 180 181 if win32Enabled: 182 self.stdin = ConsoleReader( 183 conio.stdin, self.dataReceived, self.readConnectionLost 184 ) 185 else: 186 self.stdin = _PollableReadConsole( 187 conio.stdin, self.dataReceived, self.readConnectionLost 188 ) 189 else: 190 hstdin = win32api.GetStdHandle(win32api.STD_INPUT_HANDLE) 191 self.stdin = _pollingfile._PollableReadPipe( 192 hstdin, self.dataReceived, self.readConnectionLost 193 ) 41 194 42 self.stdin = _pollingfile._PollableReadPipe( 43 hstdin, self.dataReceived, self.readConnectionLost) 195 if os.isatty(1): 196 import conio 197 198 self.stdout = _PollableWriteConsole( 199 conio.stdout, self.writeConnectionLost 200 ) 201 else: 202 hstdout = win32api.GetStdHandle(win32api.STD_OUTPUT_HANDLE) 203 self.stdout = _pollingfile._PollableWritePipe( 204 hstdout, self.writeConnectionLost 205 ) 206 207 if not (os.isatty(0) and win32Enabled): 208 self._addPollableResource(self.stdin) 44 209 45 self.stdout = _pollingfile._PollableWritePipe(46 hstdout, self.writeConnectionLost)47 48 self._addPollableResource(self.stdin)49 210 self._addPollableResource(self.stdout) 50 211 51 212 self.proto.makeConnection(self) … … 86 247 self.stdout.close() 87 248 88 249 def getPeer(self): 89 return Win32PipeAddress() 250 if os.isatty(0) and os.isatty(1): 251 return Win32ConsoleAddress() 252 else: 253 return Win32PipeAddress() 90 254 91 255 def getHost(self): 92 return Win32PipeAddress() 256 if os.isatty(0) and os.isatty(1): 257 return Win32ConsoleAddress() 258 else: 259 return Win32PipeAddress() 93 260 261 94 262 # IConsumer 95 263 96 264 def registerProducer(self, producer, streaming): … … 113 281 114 282 def resumeProducing(self): 115 283 self.stdin.resumeProducing() 116 -
twisted/internet/conio.py
1 # -*- test-case-name: twisted.test.test_conio -*- 2 """This module implements POSIX replacements for stdin/stdout/stderr 3 that support asyncronous read/write to a Windows console. 4 5 Some details about Windows console: 6 - a process can have attached only one console 7 - there can be only one input buffer 8 - there can be more then one output buffer 9 10 Moreover this module tries to offer an higher level and convenient 11 interface for termios commands. 12 """ 13 14 import errno 15 import pywintypes 16 import win32api 17 import win32console 18 19 20 21 _ENABLE_NORMAL_MODE = win32console.ENABLE_ECHO_INPUT | win32console.ENABLE_LINE_INPUT 22 _ENABLE_WINDOW_INPUT = win32console.ENABLE_WINDOW_INPUT 23 24 25 class ConIn(object): 26 """I implement a file like object that supports asyncronous reading 27 from a console. 28 29 This class should be considered a singleton, don't instantiate new 30 objects and instead use the global stdin object. 31 """ 32 33 def __init__(self, handle): 34 # handle should be std handle for STD_INPUT_HANDLE 35 self.handle = handle 36 37 # The code page in use 38 # I assume that this does not change 39 self.cp = "cp%d" % win32console.GetConsoleCP() 40 41 # The temporary (accumulation) buffer used to store the data as 42 # it arrives from the console 43 self._buf = [] 44 45 # The buffer used to store data ready to be read 46 self.buffer = '' 47 48 # Enable the receiving of input records when the console 49 # window (buffer) is changed 50 defaultMode = handle.GetConsoleMode() 51 handle.SetConsoleMode(defaultMode | _ENABLE_WINDOW_INPUT) 52 53 # The callback to be called upon the receiving of a windows 54 # change record 55 self._windowChangeCallback = None 56 57 # To optimize the code we use different functions for normal 58 # and raw mode 59 self.read = self._read 60 self.readline = self._readline 61 62 # 63 # termios interface 64 # 65 def enableRawMode(self, enabled=True): 66 """Enable raw mode. 67 68 XXX check me 69 """ 70 71 # Flush buffer 72 self._buf = [] 73 self.buffer = '' 74 75 # Flush the console buffer, too 76 self.handle.FlushConsoleInputBuffer() 77 78 mode = self.handle.GetConsoleMode() 79 80 if enabled: 81 self.read = self._read_raw 82 self.readline = self._readline_raw 83 84 # Set mode on the console, too 85 # XXX check me (this seems not to work) 86 self.handle.SetConsoleMode(mode & ~_ENABLE_NORMAL_MODE) 87 else: 88 self.read = self._read 89 self.readline = self._readline 90 91 # Set mode on the console, too 92 self.handle.SetConsoleMode(mode | _ENABLE_NORMAL_MODE) 93 94 def setWindowChangeCallback(self, callback): 95 """callback is called when the console window buffer is 96 changed. 97 98 Note: WINDOW_BUFFER_SIZE_EVENT is only raised when changing 99 the window *buffer* size from the console menu 100 """ 101 102 self._windowChangeCallback = callback 103 104 105 # 106 # File object interface 107 # 108 def close(self): 109 win32api.CloseHandle(self.handle) 110 111 def flush(self): 112 # Flush both internal buffer and system console buffer 113 self.buffer = '' 114 self._buf = [] 115 116 self.handle.FlushConsoleInputBuffer() 117 118 def fileno(self): 119 return self.handle 120 121 def isatty(self): 122 return True 123 124 def next(self): 125 raise NotImplementedError("Not yet implemented") 126 127 def _read(self, size=None): 128 """Read size bytes from the console. 129 An exception is raised when the operation would block. 130 131 XXX Just return the empty string instead of raising an exception? 132 """ 133 134 # This can fail if stdout has been closed 135 info = stdout.handle.GetConsoleScreenBufferInfo() 136 rowSize = info["MaximumWindowSize"].X 137 138 # Initialize the current cursor position 139 if not self._buf: 140 self.pos = info["CursorPosition"] 141 142 while 1: 143 n = self.handle.GetNumberOfConsoleInputEvents() 144 if n == 0: 145 break 146 147 records = self.handle.ReadConsoleInput(n) 148 149 # Process input 150 for record in records: 151 if record.EventType == win32console.WINDOW_BUFFER_SIZE_EVENT: 152 rowSize = record.Size.X 153 if self._windowChangeCallback: 154 self._windowChangeCallback() 155 if record.EventType != win32console.KEY_EVENT \ 156 or not record.KeyDown: 157 continue 158 159 char = record.Char 160 n = record.RepeatCount 161 if char == '\b': 162 pos = stdout.handle.GetConsoleScreenBufferInfo()["CursorPosition"] 163 164 # Move the cursor 165 x = pos.X - n 166 if x >= 0: 167 pos.X = x 168 # XXX assuming |x| < rowSize (I'm lazy) 169 elif pos.Y > self.pos.Y: 170 pos.X = rowSize - 1 171 pos.Y -= 1 172 173 stdout.handle.SetConsoleCursorPosition(pos) 174 stdout.handle.WriteConsoleOutputCharacter(' ' * n, pos) 175 176 # Delete the characters from accumulation buffer 177 self._buf = self._buf[:-n] 178 continue 179 elif char == '\0': 180 vCode = record.VirtualKeyCode 181 # XXX TODO handle keyboard navigation 182 continue 183 elif char == '\r': 184 char = '\n' * n 185 186 self._buf.append(char) 187 stdout.handle.WriteConsole(char) # do echo 188 189 # We have some data ready to be read 190 self.buffer = ''.join(self._buf) 191 self._buf = [] 192 self.pos = info["CursorPosition"] 193 194 if size is None: 195 size = len(self.buffer) 196 197 data = self.buffer[:size] 198 self.buffer = self.buffer[size:] 199 return data 200 201 char = char * n 202 data = char.encode(self.cp) 203 stdout.handle.WriteConsole(data) # do echo 204 205 self._buf.append(data) 206 207 if self.buffer: 208 data = self.buffer[:size] 209 self.buffer = self.buffer[size:] 210 return data 211 else: 212 raise IOError(errno.EAGAIN) 213 214 def _read_raw(self, size=None): 215 """Read size bytes from the console, in raw mode. 216 217 XXX check me. 218 """ 219 220 while 1: # XXX is this loop really needed? 221 n = self.handle.GetNumberOfConsoleInputEvents() 222 if n == 0: 223 break 224 225 records = self.handle.ReadConsoleInput(n) 226 227 # Process input 228 for record in records: 229 if record.EventType == win32console.WINDOW_BUFFER_SIZE_EVENT: 230 if self._windowChangeCallback: 231 self._windowChangeCallback() 232 if record.EventType != win32console.KEY_EVENT \ 233 or not record.KeyDown: 234 continue 235 236 char = record.Char 237 n = record.RepeatCount 238 if char == '\0': 239 vCode = record.VirtualKeyCode 240 # XXX TODO handle keyboard navigation 241 continue 242 elif char == '\r': 243 char = '\n' * n 244 245 char = char * n 246 data = char.encode(self.cp) 247 248 self._buf.append(data) 249 250 251 buffer = ''.join(self._buf) 252 if buffer: 253 if size is None: 254 size = len(buffer) 255 256 data = buffer[:size] 257 # Keep the remaining data in the accumulation buffer 258 self._buf = [buffer[size:]] 259 return data 260 else: 261 return '' 262 263 def _readline(self, size=None): 264 # XXX check me 265 return self._read(size) 266 267 def _readline_raw(self, size=None): 268 raise NotImplementedError("Not yet implemented") 269 270 271 272 class ConOut(object): 273 """I implement a file like object that supports asyncronous writing 274 to a console. 275 276 This class should be considered private, don't instantiate new 277 objects and instead use the global stdout and stderr objects. 278 279 Note that there is no option to make WriteConsole non blocking, 280 but is seems that this function does not block at all. 281 When a blocking operation like text selection is in action, the 282 process is halted. 283 """ 284 285 def __init__(self, handle): 286 # handle should be std handle for STD_OUTOUT_HANDLE or STD_ERROR_HANDLE 287 self.handle = handle 288 289 290 # 291 # File object interface 292 # 293 def close(self): 294 win32api.CloseHandle(self.handle) 295 296 def flush(self): 297 # There is no buffering 298 pass 299 300 def fileno(self): 301 return self.handle 302 303 def isatty(self): 304 return True 305 306 def write(self, s): 307 """Write a string to the console. 308 """ 309 310 return self.handle.WriteConsole(s) 311 312 def writelines(self, seq): 313 """Write a sequence of strings to the console. 314 """ 315 316 s = ''.join(seq) 317 return self.handle.WriteConsole(s) 318 319 320 321 # The public interface of this module 322 # XXX TODO replace sys.stdin, sys.stdout and sys.stderr? 323 stdin = ConIn(win32console.GetStdHandle(win32console.STD_INPUT_HANDLE)) 324 stdout = ConOut(win32console.GetStdHandle(win32console.STD_OUTPUT_HANDLE)) 325 stderr = ConOut(win32console.GetStdHandle(win32console.STD_ERROR_HANDLE)) 326 327 328 __all__ = [stdin, stdout, stderr] -
twisted/test/test_conio.py
1 """Test suite for asyncronous I/O support for Windows Console. 2 3 For testing I use the low level WriteConsoleInput function that allows 4 to write directly in the console input queue. 5 """ 6 7 import os, sys 8 import win32console 9 10 from twisted.trial import unittest 11 from twisted.python import filepath 12 from twisted.internet import error, defer, protocol, reactor 13 14 from twisted.internet import conio, _win32stdio as stdio 15 16 17 18 def createKeyEvent(char, repeat=1): 19 """Create a low level record structure with the given character. 20 """ 21 22 evt = win32console.PyINPUT_RECORDType(win32console.KEY_EVENT) 23 evt.KeyDown = True 24 evt.Char = char 25 evt.RepeatCount = repeat 26 27 return evt 28 29 30 stdin = win32console.GetStdHandle(win32console.STD_INPUT_HANDLE) 31 32 33 class ConInTestCase(unittest.TestCase): 34 """Test case for console stdin. 35 """ 36 37 def tearDown(self): 38 conio.stdin.flush() 39 40 def testRead(self): 41 data = u"hello\r" 42 records = [createKeyEvent(c) for c in data] 43 stdin.WriteConsoleInput(records) 44 45 result = conio.stdin.read() 46 self.failUnlessEqual(result, "hello\n") 47 48 def testRead2(self): 49 """Test two consecutives read. 50 """ 51 52 def read(): 53 data = u"hello\r" 54 records = [createKeyEvent(c) for c in data] 55 stdin.WriteConsoleInput(records) 56 57 result = conio.stdin.read() 58 self.failUnlessEqual(result, "hello\n") 59 60 read() 61 read() 62 63 def testReadMultiple(self): 64 """Test if repeated characters are handled correctly. 65 """ 66 67 data = u"hello\r" 68 records = [createKeyEvent(c, 3) for c in data] 69 stdin.WriteConsoleInput(records) 70 71 result = conio.stdin.read() 72 self.failUnlessEqual(result, "hhheeellllllooo\n\n\n") 73 74 def testReadWithDelete(self): 75 """Test if deletion is handled correctly. 76 """ 77 78 data = u"hello" + u"\b" * 5 + u"world\r" 79 records = [createKeyEvent(c) for c in data] 80 stdin.WriteConsoleInput(records) 81 82 result = conio.stdin.read() 83 self.failUnlessEqual(result, "world\n") 84 85 def testDeleteBoundary(self): 86 """Test if deletion is handled correctly. 87 """ 88 89 data = u"h" + "\b\b" + u"w\r" 90 records = [createKeyEvent(c) for c in data] 91 stdin.WriteConsoleInput(records) 92 93 result = conio.stdin.read() 94 self.failUnlessEqual(result, "w\n") 95 96 def testDeleteFullBoundary(self): 97 """Test if deletion is handled correctly. 98 """ 99 100 data = u"h" * 500 + "\b" * 600 + u"w\r" 101 records = [createKeyEvent(c) for c in data] 102 stdin.WriteConsoleInput(records) 103 104 result = conio.stdin.read() 105 self.failUnlessEqual(result, "w\n") 106 107 def testReadWithBuffer(self): 108 data = u"hello\r" 109 records = [createKeyEvent(c) for c in data] 110 stdin.WriteConsoleInput(records) 111 112 result = conio.stdin.read(3) 113 self.failUnlessEqual(result, "hel") 114 115 result = conio.stdin.read(3) 116 self.failUnlessEqual(result, "lo\n") 117 118 def testReadWouldBlock(self): 119 data = u"hello" 120 records = [createKeyEvent(c) for c in data] 121 stdin.WriteConsoleInput(records) 122 123 self.failUnlessRaises(IOError, conio.stdin.read) 124 125 def testReadWouldBlockBuffer(self): 126 data = u"hello" 127 records = [createKeyEvent(c) for c in data] 128 stdin.WriteConsoleInput(records) 129 130 self.failUnlessRaises(IOError, conio.stdin.read, 3) 131 132 def testIsatty(self): 133 self.failUnless(conio.stdin.isatty()) 134 135 def testBuffer(self): 136 data = u"hello" 137 records = [createKeyEvent(c) for c in data] 138 stdin.WriteConsoleInput(records) 139 140 try: 141 # This will put the data in the accumulation buffer 142 conio.stdin.read() 143 except IOError: 144 pass 145 146 self.failUnlessEqual(conio.stdin._buf, list("hello")) 147 148 def testFlush(self): 149 data = u"hello\r" 150 records = [createKeyEvent(c) for c in data] 151 stdin.WriteConsoleInput(records) 152 153 result = conio.stdin.read(3) 154 conio.stdin.flush() 155 156 self.failIf(conio.stdin.buffer) 157 self.failUnlessRaises(IOError, conio.stdin.read, 3) 158 159 def testFlushBuffer(self): 160 data = u"hello" 161 records = [createKeyEvent(c) for c in data] 162 stdin.WriteConsoleInput(records) 163 164 try: 165 # This will put the data in the accumulation buffer 166 conio.stdin.read() 167 except IOError: 168 pass 169 170 conio.stdin.flush() 171 172 self.failIf(conio.stdin.buffer) 173 self.failIf(conio.stdin._buf) 174 self.failUnlessRaises(IOError, conio.stdin.read, 3) 175 176 177 class ConInRawTestCase(unittest.TestCase): 178 """Test case for console stdin in raw mode. 179 """ 180 181 def setUp(self): 182 conio.stdin.enableRawMode() 183 184 def tearDown(self): 185 conio.stdin.flush() 186 conio.stdin.enableRawMode(False) 187 188 def testRead(self): 189 data = u"hello" 190 records = [createKeyEvent(c) for c in data] 191 stdin.WriteConsoleInput(records) 192 193 result = conio.stdin.read() 194 self.failUnlessEqual(result, "hello") 195 196 197 def testReadMultiple(self): 198 data = u"hello" 199 records = [createKeyEvent(c, 3) for c in data] 200 stdin.WriteConsoleInput(records) 201 202 result = conio.stdin.read() 203 self.failUnlessEqual(result, "hhheeellllllooo") 204 205 206 def testReadWithDelete(self): 207 data = u"hello" + u'\b' * 5 + u"world" 208 records = [createKeyEvent(c) for c in data] 209 stdin.WriteConsoleInput(records) 210 211 result = conio.stdin.read() 212 self.failUnlessEqual(result, "hello" + '\b' * 5 + "world") 213 214 def testReadWithBuffer(self): 215 data = u"hello\r" 216 records = [createKeyEvent(c) for c in data] 217 stdin.WriteConsoleInput(records) 218 219 result = conio.stdin.read(3) 220 self.failUnlessEqual(result, "hel") 221 222 result = conio.stdin.read(3) 223 self.failUnlessEqual(result, "lo\n") 224 225 def testFlush(self): 226 data = u"hello" 227 records = [createKeyEvent(c) for c in data] 228 stdin.WriteConsoleInput(records) 229 230 result = conio.stdin.read(3) 231 conio.stdin.flush() 232 233 self.failIf(conio.stdin.buffer) 234 self.failIf(conio.stdin.read()) 235 236 237 class ConOutTestCase(unittest.TestCase): 238 """Test case for console stdout. 239 Not very much to test, yet. 240 """ 241 242 def testWrite(self): 243 data = "hello" 244 n = conio.stdout.write(data) 245 246 self.failUnlessEqual(n, 5) 247 248 def testWriteUnicode(self): 249 data = u"hello" 250 n = conio.stdout.write(data) 251 252 self.failUnlessEqual(n, 5) 253 254 def testWritelines(self): 255 data = ["hello", "world"] 256 n = conio.stdout.writelines(data) 257 258 self.failUnlessEqual(n, 10) 259 260 def testIsatty(self): 261 self.failUnless(conio.stdout.isatty()) 262 263 264 265 class StdIOTestProtocol(protocol.Protocol): 266 def __init__(self): 267 self.onData = defer.Deferred() 268 269 def dataReceived(self, data): 270 self.onData.callback(data) 271 272 273 class StdIOTestCase(unittest.TestCase): 274 """Test twisted.internet.stdio support for consoles. 275 """ 276 277 def setUp(self): 278 p = StdIOTestProtocol() 279 self.stdio = stdio.StandardIO(p) 280 self.onData = p.onData 281 282 def tearDown(self): 283 self.stdio._pause() 284 try: 285 self.stdio._stopPolling() 286 except error.AlreadyCalled: 287 pass 288 289 conio.stdin.flush() 290 291 def testRead(self): 292 def cb(result): 293 self.failUnlessEqual(result, "hello\n") 294 295 data = u"hello\r" 296 records = [createKeyEvent(c) for c in data] 297 stdin.WriteConsoleInput(records) 298 299 return self.onData.addCallback(cb)
