Ticket #2157: test_conio.2.py

File test_conio.2.py, 7.7 KB (added by synapsis, 8 years ago)

test suite for console support, version 2

Line 
"""Test suite for asynchronous I/O support for the Windows console.

For testing I use the low-level WriteConsoleInput function, which allows
writing directly to the console input queue.
"""
6
7import os, sys
8import win32console
9
10from twisted.trial import unittest
11from twisted.python import filepath
12from twisted.internet import error, defer, protocol, reactor
13
14from twisted.internet import conio, _win32stdio as stdio
15
16
17
def createKeyEvent(char, repeat=1):
    """Build a low level key-down input record carrying C{char}.

    @param char: the character stored in the record's C{Char} field.
    @param repeat: the value stored in the record's C{RepeatCount} field.
    @return: the populated C{KEY_EVENT} input record.
    """
    record = win32console.PyINPUT_RECORDType(win32console.KEY_EVENT)
    record.RepeatCount = repeat
    record.Char = char
    record.KeyDown = True
    return record
28
29
# Console standard input handle shared by all tests in this module; the tests
# call WriteConsoleInput on it to inject key events.
stdin = win32console.GetStdHandle(win32console.STD_INPUT_HANDLE)
31
32
class ConInTestCase(unittest.TestCase):
    """Test case for console stdin.

    Each test injects key events into the console input queue and then
    checks what C{conio.stdin} hands back.
    """

    def _feed(self, text, repeat=1):
        """Queue one key-down event per character of C{text}.

        @param text: the characters to write to the console input queue.
        @param repeat: the repeat count given to every key event.
        """
        stdin.WriteConsoleInput([createKeyEvent(c, repeat) for c in text])

    def tearDown(self):
        # Flush after every test; presumably this keeps leftover input from
        # leaking into the next test.
        conio.stdin.flush()

    def testRead(self):
        """Test that a line ended by a carriage return is read back with a
        trailing newline.
        """
        self._feed(u"hello\r")

        self.assertEqual(conio.stdin.read(), "hello\n")

    def testRead2(self):
        """Test two consecutive reads.
        """
        for _ in range(2):
            self._feed(u"hello\r")
            self.assertEqual(conio.stdin.read(), "hello\n")

    def testReadMultiple(self):
        """Test if repeated characters are handled correctly.
        """
        self._feed(u"hello\r", 3)

        self.assertEqual(conio.stdin.read(), "hhheeellllllooo\n\n\n")

    def testReadWithDelete(self):
        """Test if deletion is handled correctly.
        """
        self._feed(u"hello" + u"\b" * 5 + u"world\r")

        self.assertEqual(conio.stdin.read(), "world\n")

    def testDeleteBoundary(self):
        """Test deletion when there are more backspaces than characters.
        """
        self._feed(u"h" + u"\b\b" + u"w\r")

        self.assertEqual(conio.stdin.read(), "w\n")

    def testDeleteFullBoundary(self):
        """Test deletion with 500 characters followed by 600 backspaces.
        """
        self._feed(u"h" * 500 + u"\b" * 600 + u"w\r")

        self.assertEqual(conio.stdin.read(), "w\n")

    def testReadWithBuffer(self):
        """Test that size-limited reads return a queued line piece by piece.
        """
        self._feed(u"hello\r")

        self.assertEqual(conio.stdin.read(3), "hel")
        self.assertEqual(conio.stdin.read(3), "lo\n")

    def testReadWouldBlock(self):
        """Test that reading raises IOError when no carriage return has been
        queued.
        """
        self._feed(u"hello")

        self.assertRaises(IOError, conio.stdin.read)

    def testReadWouldBlockBuffer(self):
        """Test that a size-limited read raises IOError when no carriage
        return has been queued.
        """
        self._feed(u"hello")

        self.assertRaises(IOError, conio.stdin.read, 3)

    def testIsatty(self):
        """Test that console stdin reports itself as a tty.
        """
        self.assertTrue(conio.stdin.isatty())

    def testBuffer(self):
        """Test that characters of an unfinished line are kept in C{_buf}.
        """
        self._feed(u"hello")

        try:
            # This will put the data in the accumulation buffer
            conio.stdin.read()
        except IOError:
            # Only the side effect on _buf matters here, so the error is
            # deliberately ignored.
            pass

        self.assertEqual(conio.stdin._buf, list("hello"))

    def testFlush(self):
        """Test that flush discards the unread remainder of a line.
        """
        self._feed(u"hello\r")

        # Consume part of the line; the returned data is not needed.
        conio.stdin.read(3)
        conio.stdin.flush()

        self.assertFalse(conio.stdin.buffer)
        self.assertRaises(IOError, conio.stdin.read, 3)

    def testFlushBuffer(self):
        """Test that flush also empties the accumulation buffer.
        """
        self._feed(u"hello")

        try:
            # This will put the data in the accumulation buffer
            conio.stdin.read()
        except IOError:
            # Only the side effect on the buffers matters here.
            pass

        conio.stdin.flush()

        self.assertFalse(conio.stdin.buffer)
        self.assertFalse(conio.stdin._buf)
        self.assertRaises(IOError, conio.stdin.read, 3)
175
176
class ConInRawTestCase(unittest.TestCase):
    """Test case for console stdin in raw mode.

    Raw mode is enabled before each test and disabled again afterwards.
    """

    def _feed(self, text, repeat=1):
        """Queue one key-down event per character of C{text}.

        @param text: the characters to write to the console input queue.
        @param repeat: the repeat count given to every key event.
        """
        stdin.WriteConsoleInput([createKeyEvent(c, repeat) for c in text])

    def setUp(self):
        conio.stdin.enableRawMode()

    def tearDown(self):
        # Flush first, then switch raw mode back off for the other test cases.
        conio.stdin.flush()
        conio.stdin.enableRawMode(False)

    def testRead(self):
        """Test that queued characters are read without a carriage return.
        """
        self._feed(u"hello")

        self.assertEqual(conio.stdin.read(), "hello")

    def testReadMultiple(self):
        """Test if repeated characters are handled correctly.
        """
        self._feed(u"hello", 3)

        self.assertEqual(conio.stdin.read(), "hhheeellllllooo")

    def testReadWithDelete(self):
        """Test that backspaces are returned as data instead of deleting.
        """
        self._feed(u"hello" + u"\b" * 5 + u"world")

        self.assertEqual(conio.stdin.read(), "hello" + "\b" * 5 + "world")

    def testReadWithBuffer(self):
        """Test that size-limited reads return the queued data piece by piece.
        """
        self._feed(u"hello\r")

        self.assertEqual(conio.stdin.read(3), "hel")
        self.assertEqual(conio.stdin.read(3), "lo\n")

    def testFlush(self):
        """Test that flush discards the unread data.
        """
        self._feed(u"hello")

        # Consume part of the input; the returned data is not needed.
        conio.stdin.read(3)
        conio.stdin.flush()

        self.assertFalse(conio.stdin.buffer)
        self.assertFalse(conio.stdin.read())
235
236
class ConOutTestCase(unittest.TestCase):
    """Test case for console stdout.
    Not very much to test, yet.
    """

    def testWrite(self):
        """Writing a byte string reports five characters written.
        """
        written = conio.stdout.write("hello")
        self.assertEqual(written, 5)

    def testWriteUnicode(self):
        """Writing a unicode string reports five characters written.
        """
        written = conio.stdout.write(u"hello")
        self.assertEqual(written, 5)

    def testWritelines(self):
        """Writing two five-character lines reports ten characters written.
        """
        lines = ["hello", "world"]
        self.assertEqual(conio.stdout.writelines(lines), 10)

    def testIsatty(self):
        """Console stdout reports itself as a tty.
        """
        self.assertTrue(conio.stdout.isatty())
262
263
264
class StdIOTestProtocol(protocol.Protocol):
    """Protocol that passes received data to its C{onData} Deferred.
    """

    def __init__(self):
        # Fired from dataReceived with the data that arrived.
        self.onData = defer.Deferred()

    def dataReceived(self, data):
        """Fire C{onData} with C{data}.
        """
        notifier = self.onData
        notifier.callback(data)
271
272
class StdIOTestCase(unittest.TestCase):
    """Test twisted.internet.stdio support for consoles.
    """

    def setUp(self):
        """Create a StandardIO around a fresh L{StdIOTestProtocol}.
        """
        proto = StdIOTestProtocol()
        self.stdio = stdio.StandardIO(proto)
        self.onData = proto.onData

    def tearDown(self):
        """Pause the StandardIO, stop its polling and flush console stdin.
        """
        self.stdio._pause()
        try:
            self.stdio._stopPolling()
        except error.AlreadyCalled:
            # Ignored on purpose; presumably the polling call is already
            # finished or cancelled by now.
            pass

        conio.stdin.flush()

    def testRead(self):
        """A line queued on the console reaches the protocol as C{"hello\\n"}.
        """
        stdin.WriteConsoleInput([createKeyEvent(ch) for ch in u"hello\r"])

        def checkLine(received):
            self.assertEqual(received, "hello\n")

        return self.onData.addCallback(checkLine)