Ticket #2157: win32conio.2.py

File win32conio.2.py, 7.7 KB (added by John Popplewell, 6 years ago)

Fluffed it. Fixed version.

Line 
1# -*- test-case-name: twisted.test.test_conio -*-
2
3# Copyright (c) 2009 Twisted Matrix Laboratories.
4# See LICENSE for details.
5
6"""
7This module implements POSIX replacements for stdin/stdout/stderr
8that support asynchronous read/write to a Windows console.
9
10Some details about Windows console:
11 - a process can be attached to only one console
12 - there can be only one input buffer
13 - there can be more than one output buffer
14
15Moreover, this module tries to offer a higher-level and more convenient
16interface for termios commands.
17"""
18
19import os
20import errno
21import pywintypes
22import win32api
23import win32file
24import win32console
25
26
# Console input-mode flags that together make up the "normal" mode that
# enableRawMode() switches off: line input plus input echo.
_ENABLE_NORMAL_MODE = (
    win32console.ENABLE_ECHO_INPUT
    | win32console.ENABLE_LINE_INPUT
)
# Console input-mode flag that ConsoleImpl.init() adds to the current mode.
_ENABLE_WINDOW_INPUT = win32console.ENABLE_WINDOW_INPUT

# Sharing flags and access rights passed to CreateFile by GetStdHandle().
_share_mode = (
    win32file.FILE_SHARE_READ
    | win32file.FILE_SHARE_WRITE
)
_access_mode = (
    win32file.GENERIC_READ
    | win32file.GENERIC_WRITE
)
32
def GetStdHandle(name):
    """
    Open the console device C{name} and wrap it in a console buffer object.

    Opening the device by name reaches the console even if the standard
    handles of the process are redirected (usually pipes to/from a parent
    process).

    @param name: name of the console device to open; this module uses
        C{"CONIN$"} and C{"CONOUT$"}.
    @return: a C{win32console.PyConsoleScreenBufferType} built from the
        handle returned by C{win32file.CreateFile}.
    """
    rawHandle = win32file.CreateFile(
        name,
        _access_mode,
        _share_mode,
        None,
        win32file.OPEN_EXISTING,
        0,
        None,
    )
    return win32console.PyConsoleScreenBufferType(rawHandle)
40
41
def getWindowSize():
    """
    Return the C{"Size"} entry of the console screen buffer info for
    C{CONOUT$} as an C{(X, Y)} tuple.
    """
    bufferInfo = GetStdHandle("CONOUT$").GetConsoleScreenBufferInfo()
    bufferSize = bufferInfo["Size"]
    return bufferSize.X, bufferSize.Y
45
46
class ConsoleImpl(object):
    """
    Non-blocking, file-like access to the Windows console.

    Reads never wait for input: they process whatever console input events
    are currently pending and return what is available (possibly an empty
    string).

    @ivar in_handle: console input buffer, opened from C{CONIN$}.
    @ivar out_handle: console screen buffer, opened from C{CONOUT$}.
    @ivar cp: name of the console code page, e.g. C{"cp437"}.
    @ivar in_closed: set by L{closeRead}.
    @ivar out_closed: set by L{closeWrite}.
    @ivar echo: whether typed characters are echoed in normal mode.
    @ivar inbuffer: list of complete lines ready to be returned by the
        normal-mode C{read}.
    @ivar read: bound to L{_read} or L{_read_raw} depending on the mode.
    @ivar readline: bound to L{_readline} or L{_readline_raw} depending on
        the mode.
    """
    def __init__(self):
        self.in_handle = GetStdHandle("CONIN$")
        self.out_handle = GetStdHandle("CONOUT$")

        # The code page in use
        # I assume that this does not change
        self.cp = "cp%d" % win32console.GetConsoleCP()

        self.init()

    def init(self):
        """
        (Re)set the object to its initial state: nothing closed, empty
        buffers, echo enabled, normal (non-raw) read methods, no window
        change callback.  C{ENABLE_WINDOW_INPUT} is added to the console
        input mode.

        @return: C{self}.
        """
        self.in_closed = False
        self.out_closed = False
        # Characters typed so far that have not been handed out yet
        self._inbuf = ""
        self.inbuffer = []
        self.echo = True
        defaultMode = self.in_handle.GetConsoleMode()
        self.in_handle.SetConsoleMode(defaultMode | _ENABLE_WINDOW_INPUT)
        self._windowChangeCallback = None
        self.read = self._read
        self.readline = self._readline
        return self

    #
    # termios interface
    #
    def isatty(self):
        """
        Always C{True}.
        """
        return True

    def flushIn(self):
        """
        Discard all pending input.
        """
        # Flush both internal buffer and system console buffer
        self._inbuf = ""
        self.inbuffer = []
        self.in_handle.FlushConsoleInputBuffer()

    def setEcho(self, enabled):
        """
        Enable or disable echoing of typed characters in normal mode.
        """
        self.echo = enabled

    def enableRawMode(self, enabled=True):
        """
        Enable (or, with C{enabled} false, disable) raw mode.

        Pending input is flushed first.  C{read} and C{readline} are
        rebound to the raw or normal implementations, and the echo and
        line input flags of the console input mode are cleared or set
        accordingly.
        """
        self.flushIn()
        mode = self.in_handle.GetConsoleMode()

        if enabled:
            self.read = self._read_raw
            self.readline = self._readline_raw

            # Set mode on the console, too
            # XXX check me (this seems not to work)
            self.in_handle.SetConsoleMode(mode & ~_ENABLE_NORMAL_MODE)
        else:
            self.read = self._read
            self.readline = self._readline

            # Set mode on the console, too
            self.in_handle.SetConsoleMode(mode | _ENABLE_NORMAL_MODE)

    def setWindowChangeCallback(self, callback):
        """
        callback is called (without arguments) when the console window
        buffer is changed.

        Note: WINDOW_BUFFER_SIZE_EVENT is only raised when changing
        the window *buffer* size from the console menu
        """
        self._windowChangeCallback = callback

    #
    # Channel interface
    #
    def closeRead(self):
        """
        Flush pending input and mark the input side as closed.
        """
        self.flushIn()
        self.in_closed = True

    def closeWrite(self):
        """
        Mark the output side as closed.
        """
        self.out_closed = True

    def isWriteClosed(self):
        """
        Return whether L{closeWrite} has been called.
        """
        return self.out_closed

    def write(self, s):
        """
        Write a string to the console.

        @return: the result of C{WriteConsole}.
        """
        return self.out_handle.WriteConsole(s)

    def writelines(self, seq):
        """
        Write a sequence of strings to the console, as a single write.

        @return: the result of C{WriteConsole}.
        """
        s = ''.join(seq)
        return self.out_handle.WriteConsole(s)

    def _read(self):
        """
        Normal-mode read: process all pending console input events and
        return one complete line, if any.

        @return: the oldest complete line (terminated by C{os.linesep}), or
            C{''} when no complete line is available yet.
        @raise pywintypes.error: if the output side has been closed.
        """
        # NOTE(review): this tests out_closed, not in_closed; in_closed is
        # never consulted by the read methods -- confirm this is intended.
        if self.out_closed:
            raise pywintypes.error(6, "The handle is invalid.")
        info = self.out_handle.GetConsoleScreenBufferInfo()
        self.rowSize = info["MaximumWindowSize"].X

        # Initialize the current cursor position
        if not self._inbuf:
            self.pos = info["CursorPosition"]

        # Keep draining until the console reports no more pending events
        while 1:
            n = self.in_handle.GetNumberOfConsoleInputEvents()
            if n == 0:
                break

            # Process input
            for record in self.in_handle.ReadConsoleInput(n):
                if record.EventType == win32console.WINDOW_BUFFER_SIZE_EVENT:
                    self.rowSize = record.Size.X
                    if self._windowChangeCallback:
                        self._windowChangeCallback()
                # Only key presses (not releases) produce characters
                if record.EventType != win32console.KEY_EVENT or \
                        not record.KeyDown:
                    continue
                for i in range(record.RepeatCount):
                    self._handleReadChar(record.Char)

        if self.inbuffer:
            return self.inbuffer.pop(0)
        else:
            return ''

    def _handleReadChar(self, char):
        """
        Handle one typed character in normal mode: do the line editing
        (backspace), echo the character if echo is enabled, and move a
        completed line to C{self.inbuffer} on carriage return.

        Relies on C{self.pos} and C{self.rowSize} having been set by
        L{_read}.
        """
        info = self.out_handle.GetConsoleScreenBufferInfo()
        if char == '\b':
            if self.echo:
                pos = info["CursorPosition"]
                # Move the cursor, handle line wrapping
                if pos.X > self.pos.X or (pos.X > 0 and \
                        len(self._inbuf) + self.pos.X > self.rowSize):
                    pos.X -= 1
                elif len(self._inbuf) + self.pos.X >= self.rowSize:
                    # At column 0 of a wrapped line: go to the end of the
                    # previous row
                    pos.X = self.rowSize - 1
                    pos.Y -= 1

                self.out_handle.SetConsoleCursorPosition(pos)
                self.out_handle.WriteConsoleOutputCharacter(' ', pos)

            # Delete the characters from accumulation buffer
            self._inbuf = self._inbuf[:-1]
            return
        if char == '\0':
            # XXX TODO handle keyboard navigation
            return
        if char == '\r':
            self._inbuf += os.linesep
            if self.echo:
                self.out_handle.WriteConsole(os.linesep) # do echo

            # We have some data ready to be read
            self.inbuffer.append(self._inbuf)
            self._inbuf = ""
            # NOTE(review): info was fetched before the newline echo above,
            # so this is the pre-echo cursor position -- confirm intended.
            self.pos = info["CursorPosition"]
            return

        data = char.encode(self.cp)
        if self.echo:
            self.out_handle.WriteConsole(data) # do echo
        self._inbuf += data

    def _read_raw(self):
        """
        Raw-mode read: return the characters typed since the previous call,
        without echo or line editing.

        Only the events pending at the time of the call are processed.

        @return: the newly typed characters encoded with the console code
            page, or C{''} if there are none.
        """
        n = self.in_handle.GetNumberOfConsoleInputEvents()
        if n == 0:
            return ''
        # Process input
        for record in self.in_handle.ReadConsoleInput(n):
            if record.EventType == win32console.WINDOW_BUFFER_SIZE_EVENT:
                if self._windowChangeCallback:
                    self._windowChangeCallback()
            # Only key presses (not releases) produce characters
            if record.EventType != win32console.KEY_EVENT or not record.KeyDown:
                continue

            char = record.Char
            if char == '\0':
                # XXX TODO handle keyboard navigation
                # (record.VirtualKeyCode identifies the key)
                continue

            # Encode once per record; the key may be auto-repeated
            self._inbuf += char.encode(self.cp) * record.RepeatCount

        # Hand the accumulated data out exactly once: without the reset
        # every later read would return the already delivered characters
        # again.
        data = self._inbuf
        self._inbuf = ""
        return data

    def _readline(self):
        """
        Normal-mode readline; not implemented.
        """
        raise NotImplementedError("Not yet implemented")

    def _readline_raw(self):
        """
        Raw-mode readline; not implemented.
        """
        raise NotImplementedError("Not yet implemented")
248
249
# Module-wide console object, created when the module is imported.
console = ConsoleImpl()
# Calling Console() re-initializes the shared console object and returns it.
Console = console.init

# __all__ has to list names as strings; listing the object itself makes
# "from <module> import *" fail with a TypeError.
__all__ = ["Console"]
254