Ticket #2157: test_win32conio.py

File test_win32conio.py, 8.6 KB (added by John Popplewell, 6 years ago)

twisted.internet.test.test_win32conio

Line 
1# Copyright (c) 2009 Twisted Matrix Laboratories.
2# See LICENSE for details.
3
4"""
5Test suite for asynchronous I/O support for Windows Console.
6
7For testing I use the low level WriteConsoleInput function that allows
8writing directly into the console input queue.
9"""
10
11from twisted.python.runtime import platform
12
13import os, sys
14
15
16if platform.isWindows():
17    import win32console
18    from twisted.internet import win32conio, _win32stdio as stdio
19else:
20    win32console = None
21
22
23from twisted.trial import unittest
24from twisted.python import filepath
25from twisted.internet import error, defer, protocol, reactor
26
27
def createKeyEvent(char, repeat=1):
    """
    Build a low level key-down input record carrying one character.

    @param char: the character stored in the record's C{Char} field.
    @param repeat: the value stored in the record's C{RepeatCount} field;
        defaults to 1.

    @return: the C{KEY_EVENT} input record, ready to be passed to
        C{WriteConsoleInput}.
    """
    record = win32console.PyINPUT_RECORDType(win32console.KEY_EVENT)
    record.KeyDown = True
    record.Char = char
    record.RepeatCount = repeat
    return record
39
if platform.isWindows():
    # Handle on the console input buffer ("CONIN$"); the tests call
    # WriteConsoleInput on it to queue synthetic key events.
    stdin = win32conio.GetStdHandle("CONIN$")
else:
    # Mirror the win32console import guard so the name is always bound,
    # even on platforms where every test case is skipped.
    stdin = None
42
43
class ConInTestCase(unittest.TestCase):
    """
    Test case for console stdin.

    Each test queues synthetic key events on the console input and checks
    what C{win32conio.Console.read} returns with echo switched off.
    """

    def setUp(self):
        self.console = win32conio.Console()
        self.console.setEcho(False)

    def tearDown(self):
        self.console.closeRead()
        self.console.closeWrite()

    def _typeKeys(self, data, repeat=1):
        """
        Queue one key event per character of C{data} on the console input.

        @param data: the characters to queue.
        @param repeat: the repeat count given to every key event.
        """
        records = [createKeyEvent(c, repeat) for c in data]
        stdin.WriteConsoleInput(records)

    def testRead(self):
        """
        A line terminated by a carriage return is read back ending in CRLF.
        """
        self._typeKeys(u"hello\r")

        result = self.console.read()
        self.assertEqual(result, "hello\r\n")

    def testRead2(self):
        """
        Test two consecutive reads.
        """
        for _ in range(2):
            self._typeKeys(u"hello\r")

            result = self.console.read()
            self.assertEqual(result, "hello\r\n")

    def testReadMultiple(self):
        """
        Test if repeated characters are handled correctly.
        """
        self._typeKeys(u"hello\r", 3)

        result = self.console.read()
        self.assertEqual(result, "hhheeellllllooo\r\n")

    def testReadWithDelete(self):
        """
        Test if deletion is handled correctly: backspaces erase exactly the
        characters typed before them.
        """
        self._typeKeys(u"hello" + u"\b" * 5 + u"world\r")

        result = self.console.read()
        self.assertEqual(result, "world\r\n")

    def testDeleteBoundary(self):
        """
        Test deletion with one more backspace than there are typed
        characters; the text typed afterwards must be unaffected.
        """
        self._typeKeys(u"h" + u"\b\b" + u"w\r")

        result = self.console.read()
        self.assertEqual(result, "w\r\n")

    def testDeleteFullBoundary(self):
        """
        Test deletion of a long line (500 characters) by an even larger
        number of backspaces (600).
        """
        self._typeKeys(u"h" * 500 + u"\b" * 600 + u"w\r")

        result = self.console.read()
        self.assertEqual(result, "w\r\n")

    # Disabled test kept for reference (reason not recorded).  Note that it
    # refers to self.stdin, which this class never sets.
#   def testReadWithBuffer(self):
#       data = u"hello\r"
#       records = [createKeyEvent(c) for c in data]
#       stdin.WriteConsoleInput(records)

#       result = self.stdin.read(3)
#       self.failUnlessEqual(result, "hel")

#       result = self.stdin.read(3)
#       self.failUnlessEqual(result, "lo\r")

    def testReadWouldBlock(self):
        """
        Reading an unterminated line returns an empty string.
        """
        self._typeKeys(u"hello")

        self.assertEqual('', self.console.read())

    # Disabled test kept for reference (reason not recorded).  Note that it
    # refers to self.stdin, which this class never sets.
#   def testReadWouldBlockBuffer(self):
#       data = u"hello"
#       records = [createKeyEvent(c) for c in data]
#       stdin.WriteConsoleInput(records)

#       self.failUnlessRaises(IOError, self.stdin.read, 3)

    def testIsatty(self):
        """
        The console reports itself as a tty.
        """
        self.assertTrue(self.console.isatty())

    def testBuffer(self):
        """
        An unterminated line is kept in the console's C{_inbuf} attribute
        after a read attempt.
        """
        self._typeKeys(u"hello")

        try:
            # This will put the data in the accumulation buffer
            self.console.read()
        except IOError:
            pass

        self.assertEqual(self.console._inbuf, "hello")

    def testFlush(self):
        """
        Flushing the input discards a queued line before it is read.
        """
        self._typeKeys(u"hello\r")

        self.console.flushIn()

        self.assertFalse(self.console.inbuffer)
        self.assertEqual('', self.console.read())

    def testFlushBuffer(self):
        """
        Flushing the input also empties the accumulation buffer filled by a
        previous read attempt.
        """
        self._typeKeys(u"hello")

        try:
            # This will put the data in the accumulation buffer
            self.console.read()
        except IOError:
            pass

        self.console.flushIn()

        self.assertFalse(self.console.inbuffer)
        self.assertFalse(self.console._inbuf)
        self.assertEqual('', self.console.read())
195
196
class ConInRawTestCase(unittest.TestCase):
    """
    Test case for console stdin in raw mode.

    Each test queues synthetic key events on the console input and checks
    what C{win32conio.Console.read} returns once raw mode is enabled.
    """

    def setUp(self):
        self.console = win32conio.Console()
        self.console.setEcho(False)
        self.console.enableRawMode()

    def tearDown(self):
        self.console.closeRead()
        self.console.closeWrite()

    def _typeKeys(self, data, repeat=1):
        """
        Queue one key event per character of C{data} on the console input.

        @param data: the characters to queue.
        @param repeat: the repeat count given to every key event.
        """
        records = [createKeyEvent(c, repeat) for c in data]
        stdin.WriteConsoleInput(records)

    def testRead(self):
        """
        Typed characters are returned without waiting for a line terminator.
        """
        self._typeKeys(u"hello")

        result = self.console.read()
        self.assertEqual(result, "hello")

    def testReadMultiple(self):
        """
        A key event with a repeat count yields that many characters.
        """
        self._typeKeys(u"hello", 3)

        result = self.console.read()
        self.assertEqual(result, "hhheeellllllooo")

    def testReadWithDelete(self):
        """
        Backspaces are returned as data instead of erasing earlier input.
        """
        self._typeKeys(u"hello" + u'\b' * 5 + u"world")

        result = self.console.read()
        self.assertEqual(result, "hello" + '\b' * 5 + "world")

    # Disabled test kept for reference (reason not recorded).
#   def testReadWithBuffer(self):
#       data = u"hello\r"
#       records = [createKeyEvent(c) for c in data]
#       stdin.WriteConsoleInput(records)

#       result = self.console.read(3)
#       self.failUnlessEqual(result, "hel")

#       result = self.console.read(3)
#       self.failUnlessEqual(result, "lo\n")

    def testFlush(self):
        """
        Flushing the input discards queued characters before they are read.
        """
        self._typeKeys(u"hello")

        self.console.flushIn()

        self.assertFalse(self.console.inbuffer)
        self.assertFalse(self.console.read())
255
256
class ConOutTestCase(unittest.TestCase):
    """
    Test case for console stdout.
    Not very much to test, yet.
    """

    def setUp(self):
        self.console = win32conio.Console()
        self.console.setEcho(False)
        self.console.enableRawMode()

    def tearDown(self):
        self.console.closeRead()
        self.console.closeWrite()

    def testWrite(self):
        """
        Writing a five character string returns 5.
        """
        written = self.console.write("hello")

        self.assertEqual(written, 5)

    def testWriteUnicode(self):
        """
        Writing a five character unicode string returns 5.
        """
        written = self.console.write(u"hello")

        self.assertEqual(written, 5)

    def testWritelines(self):
        """
        Writing two five character strings with C{writelines} returns 10.
        """
        written = self.console.writelines(["hello", "world"])

        self.assertEqual(written, 10)

    def testIsatty(self):
        """
        The console reports itself as a tty.
        """
        self.assertTrue(self.console.isatty())
292
293
class StdIOTestProtocol(protocol.Protocol):
    """
    Protocol that hands the first chunk of received data to a Deferred.

    @ivar onData: a L{defer.Deferred} created in L{makeConnection} and fired
        with the first data passed to L{dataReceived}.
    """

    def makeConnection(self, transport):
        """
        Create the C{onData} Deferred.

        NOTE(review): the base class C{makeConnection} is not called, so
        C{transport} is not stored on the protocol.
        """
        self.onData = defer.Deferred()

    def dataReceived(self, data):
        """
        Fire C{onData} with the first chunk of data received.
        """
        # A Deferred can only be fired once; ignore any later chunk instead
        # of raising AlreadyCalledError.
        if not self.onData.called:
            self.onData.callback(data)
300
301
class StdIOTestCase(unittest.TestCase):
    """
    Test twisted.internet.stdio support for consoles.
    """

    def setUp(self):
        proto = StdIOTestProtocol()
        self.stdio = stdio.StandardIO(proto, True)
        self.stdio.setEcho(False)
        # The protocol's Deferred must already exist at this point, i.e.
        # StandardIO has to call makeConnection from its constructor.
        self.onData = proto.onData

    def tearDown(self):
        self.stdio._pause()
        try:
            self.stdio._stopPolling()
        except error.AlreadyCalled:
            # Presumably the polling call was already finished or stopped,
            # e.g. by loseConnection() in testRead -- TODO confirm.
            pass

    def testRead(self):
        """
        A line typed on the console is delivered to the protocol, ending in
        CRLF.
        """
        def checkLine(result):
            self.assertEqual(result, "hello\r\n")
            self.stdio.loseConnection()

        d = self.onData.addCallback(checkLine)
        records = [createKeyEvent(c) for c in u"hello\r"]
        stdin.WriteConsoleInput(records)
        # Returning the Deferred makes trial wait for the data to arrive.
        return d
330       
# Without win32console (i.e. not on Windows) none of the cases can run, so
# mark every one of them as skipped.
if win32console is None:
    for _testCase in (ConInTestCase, ConInRawTestCase,
                      ConOutTestCase, StdIOTestCase):
        _testCase.skip = "win32conio is only available under Windows."
    # Do not leave an extra module-level reference to a test case class.
    del _testCase