Ticket #2157: win32conio.py

File win32conio.py, 7.7 KB (added by John Popplewell, 6 years ago)

twisted.internet.win32conio

Line 
1# -*- test-case-name: twisted.test.test_conio -*-
2
3# Copyright (c) 2009 Twisted Matrix Laboratories.
4# See LICENSE for details.
5
6"""
7This module implements POSIX replacements for stdin/stdout/stderr
8that support asynchronous read/write to a Windows console.
9
10Some details about Windows console:
11 - a process can be attached to only one console
12 - there can be only one input buffer
13 - there can be more than one output buffer
14
15Moreover, this module tries to offer a higher-level and convenient
16interface for termios commands.
17"""
18
19import os
20import errno
21import pywintypes
22import win32api
23import win32file
24import win32console
25
26
# Console input-mode flags that ConsoleImpl.enableRawMode() clears to enter
# raw mode and sets again to leave it (console echo plus line-at-a-time input).
_ENABLE_NORMAL_MODE = (
    win32console.ENABLE_ECHO_INPUT | win32console.ENABLE_LINE_INPUT
)
# Input-mode flag OR-ed into the console mode by ConsoleImpl.init(); it makes
# the console report screen-buffer size changes as input events.
_ENABLE_WINDOW_INPUT = win32console.ENABLE_WINDOW_INPUT

# Sharing and access arguments passed to win32file.CreateFile by GetStdHandle.
_share_mode = win32file.FILE_SHARE_READ | win32file.FILE_SHARE_WRITE
_access_mode = win32file.GENERIC_READ | win32file.GENERIC_WRITE
32
def GetStdHandle(name):
    """
    Open a console device by name and wrap it in a console buffer object.

    Get console handles even if they are redirected
    (usually pipes to/from a parent process).

    @param name: the device name handed to C{win32file.CreateFile}; this
        module passes C{"CONIN$"} and C{"CONOUT$"}.
    @return: a C{win32console.PyConsoleScreenBufferType} built from the
        opened handle.
    """
    handle = win32file.CreateFile(
        name, _access_mode, _share_mode, None,
        win32file.OPEN_EXISTING, 0, None)
    return win32console.PyConsoleScreenBufferType(handle)
40
41
def getWindowSize():
    """
    Return the size of the console screen buffer.

    @return: an C{(X, Y)} tuple taken from the C{"Size"} entry of the
        screen buffer info of a freshly opened C{CONOUT$} handle.
    """
    size = GetStdHandle("CONOUT$").GetConsoleScreenBufferInfo()["Size"]
    return (size.X, size.Y)
45
46
class ConsoleImpl(object):
    """
    Polling, file-like access to the Windows console.

    The object opens C{CONIN$} and C{CONOUT$} directly and exposes:

      - a termios-like interface (C{isatty}, C{flushIn}, C{setEcho},
        C{enableRawMode}, C{setWindowChangeCallback});
      - a channel interface (C{closeRead}, C{closeWrite}, C{isWriteClosed},
        C{write}, C{writelines}, C{read}, C{readline}).

    C{read} and C{readline} are instance attributes that are rebound to the
    cooked (C{_read}, C{_readline}) or raw (C{_read_raw}, C{_readline_raw})
    implementations by C{init} and C{enableRawMode}.
    """
    def __init__(self):
        self.in_handle = GetStdHandle("CONIN$")
        self.out_handle = GetStdHandle("CONOUT$")
        self.err_handle = GetStdHandle("CONOUT$")

        # The code page in use
        # I assume that this does not change
        self.cp = "cp%d" % win32console.GetConsoleCP()

        self.init()

    def init(self):
        """
        (Re)set the object to its initial state: nothing closed, empty
        buffers, echo on, cooked-mode readers, no window change callback.
        The console input mode gets the window-input flag added.

        @return: C{self}.
        """
        self.in_closed = False
        self.out_closed = False
        self.err_closed = False
        # Characters of the line being typed (cooked mode) or the bytes
        # collected since the last read (raw mode).
        self._inbuf = ""
        # Completed lines waiting to be returned by _read.
        self.inbuffer = []
        self.echo = True
        defaultMode = self.in_handle.GetConsoleMode()
        self.in_handle.SetConsoleMode(defaultMode | _ENABLE_WINDOW_INPUT)
        self._windowChangeCallback = None
        self.read = self._read
        self.readline = self._readline
        return self

    #
    # termios interface
    #
    def isatty(self):
        """
        Always C{True}: this object talks to the console devices directly.
        """
        return True

    def flushIn(self):
        """
        Discard all pending input.
        """
        # Flush both internal buffer and system console buffer
        self._inbuf = ""
        self.inbuffer = []
        self.in_handle.FlushConsoleInputBuffer()

    def setEcho(self, enabled):
        """
        Turn the echo done by the cooked-mode reader on or off.
        """
        self.echo = enabled

    def enableRawMode(self, enabled=True):
        """
        Enable (or, with C{enabled} false, disable) raw mode.

        Pending input is flushed, C{read}/C{readline} are rebound to the
        matching implementations and the echo/line-input flags are cleared
        from (or added to) the console input mode.
        """
        self.flushIn()
        mode = self.in_handle.GetConsoleMode()

        if enabled:
            self.read = self._read_raw
            self.readline = self._readline_raw

            # Set mode on the console, too
            # XXX check me (this seems not to work)
            self.in_handle.SetConsoleMode(mode & ~_ENABLE_NORMAL_MODE)
        else:
            self.read = self._read
            self.readline = self._readline

            # Set mode on the console, too
            self.in_handle.SetConsoleMode(mode | _ENABLE_NORMAL_MODE)

    def setWindowChangeCallback(self, callback):
        """
        callback is called (without arguments) when the console window
        buffer is changed.

        Note: WINDOW_BUFFER_SIZE_EVENT is only raised when changing
        the window *buffer* size from the console menu
        """
        self._windowChangeCallback = callback

    #
    # Channel interface
    #
    def closeRead(self):
        """
        Discard pending input and mark the input side as closed.
        """
        self.flushIn()
        self.in_closed = True

    def closeWrite(self):
        """
        Mark both the output and the error side as closed.
        """
        self.out_closed = True
        self.err_closed = True

    def isWriteClosed(self):
        """
        Tell whether C{closeWrite} has been called.
        """
        return self.out_closed

    def write(self, s):
        """
        Write a string to the console.

        @return: whatever C{WriteConsole} returns.
        """
        return self.out_handle.WriteConsole(s)

    def writelines(self, seq):
        """
        Write a sequence of strings to the console.

        @return: whatever C{WriteConsole} returns.
        """
        s = ''.join(seq)
        return self.out_handle.WriteConsole(s)

    def _read(self):
        """
        Cooked-mode read: drain the pending console input events, doing the
        line editing and echo ourselves.

        @return: the oldest completed line (terminated by C{os.linesep}),
            or C{''} when no complete line is available yet.
        """
        info = self.out_handle.GetConsoleScreenBufferInfo()
        self.rowSize = info["MaximumWindowSize"].X

        # Initialize the current cursor position
        if not self._inbuf:
            self.pos = info["CursorPosition"]

        while True:
            n = self.in_handle.GetNumberOfConsoleInputEvents()
            if n == 0:
                break

            # Process input
            for record in self.in_handle.ReadConsoleInput(n):
                if record.EventType == win32console.WINDOW_BUFFER_SIZE_EVENT:
                    self.rowSize = record.Size.X
                    if self._windowChangeCallback:
                        self._windowChangeCallback()
                if record.EventType != win32console.KEY_EVENT or \
                        not record.KeyDown:
                    continue
                for i in range(record.RepeatCount):
                    self._handleReadChar(record.Char)

        if self.inbuffer:
            return self.inbuffer.pop(0)
        else:
            return ''

    def _handleReadChar(self, char):
        """
        Apply one typed character to the line being edited.

        Backspace removes the last buffered character, C{'\\0'} is ignored,
        carriage return completes the line and queues it in C{inbuffer};
        anything else is encoded with the console code page and appended.
        When echo is on the console is updated accordingly.
        """
        if char == '\b':
            if self.echo:
                info = self.out_handle.GetConsoleScreenBufferInfo()
                pos = info["CursorPosition"]
                # Move the cursor, handle line wrapping
                if pos.X > self.pos.X or (pos.X > 0 and \
                        len(self._inbuf)+self.pos.X > self.rowSize):
                    pos.X -= 1
                elif len(self._inbuf)+self.pos.X >= self.rowSize:
                    pos.X = self.rowSize - 1
                    pos.Y -= 1

                self.out_handle.SetConsoleCursorPosition(pos)
                self.out_handle.WriteConsoleOutputCharacter(' ', pos)

            # Delete the characters from accumulation buffer
            self._inbuf = self._inbuf[:-1]
            return
        if char == '\0':
            # XXX TODO handle keyboard navigation
            return
        if char == '\r':
            self._inbuf += os.linesep
            if self.echo:
                self.out_handle.WriteConsole(os.linesep)  # do echo

            # We have some data ready to be read
            self.inbuffer.append(self._inbuf)
            self._inbuf = ""
            # Query the cursor only now, after the echo has moved it, so
            # that the recorded start of the next line is not stale when
            # more keys are handled in the same batch.
            info = self.out_handle.GetConsoleScreenBufferInfo()
            self.pos = info["CursorPosition"]
            return

        data = char.encode(self.cp)
        if self.echo:
            self.out_handle.WriteConsole(data)  # do echo
        self._inbuf += data

    def _read_raw(self):
        """
        Raw-mode read: no echo and no line editing.

        @return: the characters typed since the previous call, encoded
            with the console code page, or C{''} when there are none.
        """
        n = self.in_handle.GetNumberOfConsoleInputEvents()
        if n == 0:
            return ''
        # Process input
        for record in self.in_handle.ReadConsoleInput(n):
            if record.EventType == win32console.WINDOW_BUFFER_SIZE_EVENT:
                if self._windowChangeCallback:
                    self._windowChangeCallback()
            if record.EventType != win32console.KEY_EVENT or \
                    not record.KeyDown:
                continue

            char = record.Char
            if char == '\0':
                # XXX TODO handle keyboard navigation
                continue

            self._inbuf += char.encode(self.cp) * record.RepeatCount

        # Hand the collected data over exactly once: without the reset
        # every later call would return the already delivered input again.
        data = self._inbuf
        self._inbuf = ""
        return data

    def _readline(self):
        """
        Cooked-mode readline; not implemented.
        """
        raise NotImplementedError("Not yet implemented")

    def _readline_raw(self):
        """
        Raw-mode readline; not implemented.
        """
        raise NotImplementedError("Not yet implemented")
249
250
# Module-level instance; constructing it opens the console handles.
console = ConsoleImpl()
# Calling Console() resets that instance (see ConsoleImpl.init) and returns it.
Console = console.init

# __all__ must list names as strings; listing the object itself makes
# "from ... import *" fail with a TypeError.
__all__ = ["Console"]
255