| 1 | """Test suite for asyncronous I/O support for Windows Console. |
|---|
| 2 | |
|---|
| 3 | For testing I use the low level WriteConsoleInput function that allows |
|---|
| 4 | to write directly in the console input queue. |
|---|
| 5 | """ |
|---|
| 6 | |
|---|
| 7 | import os, sys |
|---|
| 8 | import win32console |
|---|
| 9 | |
|---|
| 10 | from twisted.trial import unittest |
|---|
| 11 | from twisted.python import filepath |
|---|
| 12 | from twisted.internet import error, defer, protocol, reactor |
|---|
| 13 | |
|---|
| 14 | from twisted.internet import conio, _win32stdio as stdio |
|---|
| 15 | |
|---|
| 16 | |
|---|
| 17 | |
|---|
| 18 | def createKeyEvent(char, repeat=1): |
|---|
| 19 | """Create a low level record structure with the given character. |
|---|
| 20 | """ |
|---|
| 21 | |
|---|
| 22 | evt = win32console.PyINPUT_RECORDType(win32console.KEY_EVENT) |
|---|
| 23 | evt.KeyDown = True |
|---|
| 24 | evt.Char = char |
|---|
| 25 | evt.RepeatCount = repeat |
|---|
| 26 | |
|---|
| 27 | return evt |
|---|
| 28 | |
|---|
| 29 | |
|---|
| 30 | stdin = win32console.GetStdHandle(win32console.STD_INPUT_HANDLE) |
|---|
| 31 | |
|---|
| 32 | |
|---|
| 33 | class ConInTestCase(unittest.TestCase): |
|---|
| 34 | """Test case for console stdin. |
|---|
| 35 | """ |
|---|
| 36 | |
|---|
| 37 | def tearDown(self): |
|---|
| 38 | conio.stdin.flush() |
|---|
| 39 | |
|---|
| 40 | def testRead(self): |
|---|
| 41 | data = u"hello\r" |
|---|
| 42 | records = [createKeyEvent(c) for c in data] |
|---|
| 43 | stdin.WriteConsoleInput(records) |
|---|
| 44 | |
|---|
| 45 | result = conio.stdin.read() |
|---|
| 46 | self.failUnlessEqual(result, "hello\n") |
|---|
| 47 | |
|---|
| 48 | def testRead2(self): |
|---|
| 49 | """Test two consecutives read. |
|---|
| 50 | """ |
|---|
| 51 | |
|---|
| 52 | def read(): |
|---|
| 53 | data = u"hello\r" |
|---|
| 54 | records = [createKeyEvent(c) for c in data] |
|---|
| 55 | stdin.WriteConsoleInput(records) |
|---|
| 56 | |
|---|
| 57 | result = conio.stdin.read() |
|---|
| 58 | self.failUnlessEqual(result, "hello\n") |
|---|
| 59 | |
|---|
| 60 | read() |
|---|
| 61 | read() |
|---|
| 62 | |
|---|
| 63 | def testReadMultiple(self): |
|---|
| 64 | """Test if repeated characters are handled correctly. |
|---|
| 65 | """ |
|---|
| 66 | |
|---|
| 67 | data = u"hello\r" |
|---|
| 68 | records = [createKeyEvent(c, 3) for c in data] |
|---|
| 69 | stdin.WriteConsoleInput(records) |
|---|
| 70 | |
|---|
| 71 | result = conio.stdin.read() |
|---|
| 72 | self.failUnlessEqual(result, "hhheeellllllooo\n\n\n") |
|---|
| 73 | |
|---|
| 74 | def testReadWithDelete(self): |
|---|
| 75 | """Test if deletion is handled correctly. |
|---|
| 76 | """ |
|---|
| 77 | |
|---|
| 78 | data = u"hello" + u"\b" * 5 + u"world\r" |
|---|
| 79 | records = [createKeyEvent(c) for c in data] |
|---|
| 80 | stdin.WriteConsoleInput(records) |
|---|
| 81 | |
|---|
| 82 | result = conio.stdin.read() |
|---|
| 83 | self.failUnlessEqual(result, "world\n") |
|---|
| 84 | |
|---|
| 85 | def testDeleteBoundary(self): |
|---|
| 86 | """Test if deletion is handled correctly. |
|---|
| 87 | """ |
|---|
| 88 | |
|---|
| 89 | data = u"h" + "\b\b" + u"w\r" |
|---|
| 90 | records = [createKeyEvent(c) for c in data] |
|---|
| 91 | stdin.WriteConsoleInput(records) |
|---|
| 92 | |
|---|
| 93 | result = conio.stdin.read() |
|---|
| 94 | self.failUnlessEqual(result, "w\n") |
|---|
| 95 | |
|---|
| 96 | def testDeleteFullBoundary(self): |
|---|
| 97 | """Test if deletion is handled correctly. |
|---|
| 98 | """ |
|---|
| 99 | |
|---|
| 100 | data = u"h" * 500 + "\b" * 600 + u"w\r" |
|---|
| 101 | records = [createKeyEvent(c) for c in data] |
|---|
| 102 | stdin.WriteConsoleInput(records) |
|---|
| 103 | |
|---|
| 104 | result = conio.stdin.read() |
|---|
| 105 | self.failUnlessEqual(result, "w\n") |
|---|
| 106 | |
|---|
| 107 | def testReadWithBuffer(self): |
|---|
| 108 | data = u"hello\r" |
|---|
| 109 | records = [createKeyEvent(c) for c in data] |
|---|
| 110 | stdin.WriteConsoleInput(records) |
|---|
| 111 | |
|---|
| 112 | result = conio.stdin.read(3) |
|---|
| 113 | self.failUnlessEqual(result, "hel") |
|---|
| 114 | |
|---|
| 115 | result = conio.stdin.read(3) |
|---|
| 116 | self.failUnlessEqual(result, "lo\n") |
|---|
| 117 | |
|---|
| 118 | def testReadWouldBlock(self): |
|---|
| 119 | data = u"hello" |
|---|
| 120 | records = [createKeyEvent(c) for c in data] |
|---|
| 121 | stdin.WriteConsoleInput(records) |
|---|
| 122 | |
|---|
| 123 | self.failUnlessRaises(IOError, conio.stdin.read) |
|---|
| 124 | |
|---|
| 125 | def testReadWouldBlockBuffer(self): |
|---|
| 126 | data = u"hello" |
|---|
| 127 | records = [createKeyEvent(c) for c in data] |
|---|
| 128 | stdin.WriteConsoleInput(records) |
|---|
| 129 | |
|---|
| 130 | self.failUnlessRaises(IOError, conio.stdin.read, 3) |
|---|
| 131 | |
|---|
| 132 | def testIsatty(self): |
|---|
| 133 | self.failUnless(conio.stdin.isatty()) |
|---|
| 134 | |
|---|
| 135 | def testBuffer(self): |
|---|
| 136 | data = u"hello" |
|---|
| 137 | records = [createKeyEvent(c) for c in data] |
|---|
| 138 | stdin.WriteConsoleInput(records) |
|---|
| 139 | |
|---|
| 140 | try: |
|---|
| 141 | # This will put the data in the accumulation buffer |
|---|
| 142 | conio.stdin.read() |
|---|
| 143 | except IOError: |
|---|
| 144 | pass |
|---|
| 145 | |
|---|
| 146 | self.failUnlessEqual(conio.stdin._buf, list("hello")) |
|---|
| 147 | |
|---|
| 148 | def testFlush(self): |
|---|
| 149 | data = u"hello\r" |
|---|
| 150 | records = [createKeyEvent(c) for c in data] |
|---|
| 151 | stdin.WriteConsoleInput(records) |
|---|
| 152 | |
|---|
| 153 | result = conio.stdin.read(3) |
|---|
| 154 | conio.stdin.flush() |
|---|
| 155 | |
|---|
| 156 | self.failIf(conio.stdin.buffer) |
|---|
| 157 | self.failUnlessRaises(IOError, conio.stdin.read, 3) |
|---|
| 158 | |
|---|
| 159 | def testFlushBuffer(self): |
|---|
| 160 | data = u"hello" |
|---|
| 161 | records = [createKeyEvent(c) for c in data] |
|---|
| 162 | stdin.WriteConsoleInput(records) |
|---|
| 163 | |
|---|
| 164 | try: |
|---|
| 165 | # This will put the data in the accumulation buffer |
|---|
| 166 | conio.stdin.read() |
|---|
| 167 | except IOError: |
|---|
| 168 | pass |
|---|
| 169 | |
|---|
| 170 | conio.stdin.flush() |
|---|
| 171 | |
|---|
| 172 | self.failIf(conio.stdin.buffer) |
|---|
| 173 | self.failIf(conio.stdin._buf) |
|---|
| 174 | self.failUnlessRaises(IOError, conio.stdin.read, 3) |
|---|
| 175 | |
|---|
| 176 | |
|---|
| 177 | class ConInRawTestCase(unittest.TestCase): |
|---|
| 178 | """Test case for console stdin in raw mode. |
|---|
| 179 | """ |
|---|
| 180 | |
|---|
| 181 | def setUp(self): |
|---|
| 182 | conio.stdin.enableRawMode() |
|---|
| 183 | |
|---|
| 184 | def tearDown(self): |
|---|
| 185 | conio.stdin.flush() |
|---|
| 186 | conio.stdin.enableRawMode(False) |
|---|
| 187 | |
|---|
| 188 | def testRead(self): |
|---|
| 189 | data = u"hello" |
|---|
| 190 | records = [createKeyEvent(c) for c in data] |
|---|
| 191 | stdin.WriteConsoleInput(records) |
|---|
| 192 | |
|---|
| 193 | result = conio.stdin.read() |
|---|
| 194 | self.failUnlessEqual(result, "hello") |
|---|
| 195 | |
|---|
| 196 | |
|---|
| 197 | def testReadMultiple(self): |
|---|
| 198 | data = u"hello" |
|---|
| 199 | records = [createKeyEvent(c, 3) for c in data] |
|---|
| 200 | stdin.WriteConsoleInput(records) |
|---|
| 201 | |
|---|
| 202 | result = conio.stdin.read() |
|---|
| 203 | self.failUnlessEqual(result, "hhheeellllllooo") |
|---|
| 204 | |
|---|
| 205 | |
|---|
| 206 | def testReadWithDelete(self): |
|---|
| 207 | data = u"hello" + u'\b' * 5 + u"world" |
|---|
| 208 | records = [createKeyEvent(c) for c in data] |
|---|
| 209 | stdin.WriteConsoleInput(records) |
|---|
| 210 | |
|---|
| 211 | result = conio.stdin.read() |
|---|
| 212 | self.failUnlessEqual(result, "hello" + '\b' * 5 + "world") |
|---|
| 213 | |
|---|
| 214 | def testReadWithBuffer(self): |
|---|
| 215 | data = u"hello\r" |
|---|
| 216 | records = [createKeyEvent(c) for c in data] |
|---|
| 217 | stdin.WriteConsoleInput(records) |
|---|
| 218 | |
|---|
| 219 | result = conio.stdin.read(3) |
|---|
| 220 | self.failUnlessEqual(result, "hel") |
|---|
| 221 | |
|---|
| 222 | result = conio.stdin.read(3) |
|---|
| 223 | self.failUnlessEqual(result, "lo\n") |
|---|
| 224 | |
|---|
| 225 | def testFlush(self): |
|---|
| 226 | data = u"hello" |
|---|
| 227 | records = [createKeyEvent(c) for c in data] |
|---|
| 228 | stdin.WriteConsoleInput(records) |
|---|
| 229 | |
|---|
| 230 | result = conio.stdin.read(3) |
|---|
| 231 | conio.stdin.flush() |
|---|
| 232 | |
|---|
| 233 | self.failIf(conio.stdin.buffer) |
|---|
| 234 | self.failIf(conio.stdin.read()) |
|---|
| 235 | |
|---|
| 236 | |
|---|
| 237 | class ConOutTestCase(unittest.TestCase): |
|---|
| 238 | """Test case for console stdout. |
|---|
| 239 | Not very much to test, yet. |
|---|
| 240 | """ |
|---|
| 241 | |
|---|
| 242 | def testWrite(self): |
|---|
| 243 | data = "hello" |
|---|
| 244 | n = conio.stdout.write(data) |
|---|
| 245 | |
|---|
| 246 | self.failUnlessEqual(n, 5) |
|---|
| 247 | |
|---|
| 248 | def testWriteUnicode(self): |
|---|
| 249 | data = u"hello" |
|---|
| 250 | n = conio.stdout.write(data) |
|---|
| 251 | |
|---|
| 252 | self.failUnlessEqual(n, 5) |
|---|
| 253 | |
|---|
| 254 | def testWritelines(self): |
|---|
| 255 | data = ["hello", "world"] |
|---|
| 256 | n = conio.stdout.writelines(data) |
|---|
| 257 | |
|---|
| 258 | self.failUnlessEqual(n, 10) |
|---|
| 259 | |
|---|
| 260 | def testIsatty(self): |
|---|
| 261 | self.failUnless(conio.stdout.isatty()) |
|---|
| 262 | |
|---|
| 263 | |
|---|
| 264 | |
|---|
| 265 | class StdIOTestProtocol(protocol.Protocol): |
|---|
| 266 | def __init__(self): |
|---|
| 267 | self.onData = defer.Deferred() |
|---|
| 268 | |
|---|
| 269 | def dataReceived(self, data): |
|---|
| 270 | self.onData.callback(data) |
|---|
| 271 | |
|---|
| 272 | |
|---|
| 273 | class StdIOTestCase(unittest.TestCase): |
|---|
| 274 | """Test twisted.internet.stdio support for consoles. |
|---|
| 275 | """ |
|---|
| 276 | |
|---|
| 277 | def setUp(self): |
|---|
| 278 | p = StdIOTestProtocol() |
|---|
| 279 | self.stdio = stdio.StandardIO(p) |
|---|
| 280 | self.onData = p.onData |
|---|
| 281 | |
|---|
| 282 | def tearDown(self): |
|---|
| 283 | self.stdio._pause() |
|---|
| 284 | try: |
|---|
| 285 | self.stdio._stopPolling() |
|---|
| 286 | except error.AlreadyCalled: |
|---|
| 287 | pass |
|---|
| 288 | |
|---|
| 289 | conio.stdin.flush() |
|---|
| 290 | |
|---|
| 291 | def testRead(self): |
|---|
| 292 | def cb(result): |
|---|
| 293 | self.failUnlessEqual(result, "hello\n") |
|---|
| 294 | |
|---|
| 295 | data = u"hello\r" |
|---|
| 296 | records = [createKeyEvent(c) for c in data] |
|---|
| 297 | stdin.WriteConsoleInput(records) |
|---|
| 298 | |
|---|
| 299 | return self.onData.addCallback(cb) |
|---|