Ticket #2157: win32_console_io.2.patch

File win32_console_io.2.patch, 32.6 KB (added by khorn, 6 years ago)

Unified diff of above patches against r27422

  • twisted/internet/_pollingfile.py

     
    33
    44Implements a simple polling interface for file descriptors that don't work with
    55select() - this is pretty much only useful on Windows.
    6 
    76"""
    87
     8import sys
    99from zope.interface import implements
    1010
    1111from twisted.internet.interfaces import IConsumer, IPushProducer
     
    1313MIN_TIMEOUT = 0.000000001
    1414MAX_TIMEOUT = 0.1
    1515
    16 class _PollableResource:
     16
     17class _PollableResource(object):
    1718    active = True
    1819
    1920    def activate(self):
     
    2223    def deactivate(self):
    2324        self.active = False
    2425
    25 class _PollingTimer:
     26
     27class _PollingTimer(object):
    2628    # Everything is private here because it is really an implementation detail.
    2729
    2830    def __init__(self, reactor):
     
    9193# If we ever (let's hope not) need the above functionality on UNIX, this could
    9294# be factored into a different module.
    9395
    94 import win32pipe
    9596import win32file
    9697import win32api
    9798import pywintypes
    9899
    99 class _PollableReadPipe(_PollableResource):
    100100
     101class _PollableReader(_PollableResource):
     102
    101103    implements(IPushProducer)
    102104
    103     def __init__(self, pipe, receivedCallback, lostCallback):
    104         # security attributes for pipes
    105         self.pipe = pipe
     105    def __init__(self, handle, receivedCallback, lostCallback):
     106        self.handle = handle
    106107        self.receivedCallback = receivedCallback
    107108        self.lostCallback = lostCallback
    108109
    109110    def checkWork(self):
    110         finished = 0
    111         fullDataRead = []
     111        raise NotImplementedError()
    112112
    113         while 1:
    114             try:
    115                 buffer, bytesToRead, result = win32pipe.PeekNamedPipe(self.pipe, 1)
    116                 # finished = (result == -1)
    117                 if not bytesToRead:
    118                     break
    119                 hr, data = win32file.ReadFile(self.pipe, bytesToRead, None)
    120                 fullDataRead.append(data)
    121             except win32api.error:
    122                 finished = 1
    123                 break
    124 
    125         dataBuf = ''.join(fullDataRead)
    126         if dataBuf:
    127             self.receivedCallback(dataBuf)
    128         if finished:
    129             self.cleanup()
    130         return len(dataBuf)
    131 
    132113    def cleanup(self):
    133114        self.deactivate()
    134115        self.lostCallback()
    135116
    136117    def close(self):
     118        # XXX why not cleanup?
    137119        try:
    138             win32api.CloseHandle(self.pipe)
     120            win32api.CloseHandle(self.handle)
    139121        except pywintypes.error:
    140122            # You can't close std handles...?
    141123            pass
     
    150132        self.activate()
    151133
    152134
    153 FULL_BUFFER_SIZE = 64 * 1024
    154135
    155 class _PollableWritePipe(_PollableResource):
     136class _PollableWriter(_PollableResource):
     137    FULL_BUFFER_SIZE = 64 * 1024
    156138
    157139    implements(IConsumer)
    158 
    159     def __init__(self, writePipe, lostCallback):
     140   
     141    def __init__(self, handle, lostCallback):
    160142        self.disconnecting = False
    161143        self.producer = None
    162144        self.producerPaused = 0
    163145        self.streamingProducer = 0
    164146        self.outQueue = []
    165         self.writePipe = writePipe
     147        self.handle = handle
    166148        self.lostCallback = lostCallback
    167         try:
    168             win32pipe.SetNamedPipeHandleState(writePipe,
    169                                               win32pipe.PIPE_NOWAIT,
    170                                               None,
    171                                               None)
    172         except pywintypes.error:
    173             # Maybe it's an invalid handle.  Who knows.
    174             pass
    175149
    176150    def close(self):
    177151        self.disconnecting = True
     
    219193    def writeConnectionLost(self):
    220194        self.deactivate()
    221195        try:
    222             win32api.CloseHandle(self.writePipe)
     196            win32api.CloseHandle(self.handle)
    223197        except pywintypes.error:
    224198            # OMG what
    225199            pass
     
    232206        if self.disconnecting:
    233207            return
    234208        self.outQueue.append(data)
    235         if sum(map(len, self.outQueue)) > FULL_BUFFER_SIZE:
     209        if sum(map(len, self.outQueue)) > self.FULL_BUFFER_SIZE:
    236210            self.bufferFull()
    237211
    238212    def checkWork(self):
     213        raise NotImplementedError()
     214
     215
     216
     217#
     218# Pipe support
     219#
     220import win32pipe
     221
     222
     223class _PollableReadPipe(_PollableReader):
     224    def __init__(self, pipe, receivedCallback, lostCallback):
     225        _PollableReader.__init__(self, pipe, receivedCallback, lostCallback)
     226        # security attributes for pipes
     227
     228    def checkWork(self):
     229        finished = 0
     230        fullDataRead = []
     231
     232        while 1:
     233            try:
     234                buffer, bytesToRead, result = win32pipe.PeekNamedPipe(self.handle, 1)
     235                # finished = (result == -1)
     236                if not bytesToRead:
     237                    break
     238                hr, data = win32file.ReadFile(self.handle, bytesToRead, None)
     239                fullDataRead.append(data)
     240            except win32api.error:
     241                finished = 1
     242                break
     243
     244        dataBuf = ''.join(fullDataRead)
     245        if dataBuf:
     246            self.receivedCallback(dataBuf)
     247        if finished:
     248            self.cleanup()
     249        return len(dataBuf)
     250
     251
     252class _PollableWritePipe(_PollableWriter):
     253    def __init__(self, writePipe, lostCallback):
     254        _PollableWriter.__init__(self, writePipe, lostCallback)
     255
     256        try:
     257            win32pipe.SetNamedPipeHandleState(writePipe,
     258                                              win32pipe.PIPE_NOWAIT,
     259                                              None,
     260                                              None)
     261        except pywintypes.error:
     262            # Maybe it's an invalid handle.  Who knows.
     263            pass
     264
     265    def checkWork(self):
    239266        numBytesWritten = 0
    240267        if not self.outQueue:
    241268            if self.disconnecting:
    242269                self.writeConnectionLost()
    243270                return 0
    244271            try:
    245                 win32file.WriteFile(self.writePipe, '', None)
     272                win32file.WriteFile(self.handle, '', None)
    246273            except pywintypes.error:
    247274                self.writeConnectionLost()
    248275                return numBytesWritten
     
    250277            data = self.outQueue.pop(0)
    251278            errCode = 0
    252279            try:
    253                 errCode, nBytesWritten = win32file.WriteFile(self.writePipe,
     280                errCode, nBytesWritten = win32file.WriteFile(self.handle,
    254281                                                             data, None)
    255282            except win32api.error:
    256283                self.writeConnectionLost()
     
    266293            if not resumed and self.disconnecting:
    267294                self.writeConnectionLost()
    268295        return numBytesWritten
    269 
    270 
  • twisted/internet/_win32stdio.py

     
    1 # -*- test-case-name: twisted.test.test_process.ProcessTestCase.testStdio -*-
     1# -*- test-case-name: twisted.test.test_process.ProcessTestCase.testStdio,twisted.test.test_conio.StdIOTestCase -*-
    22
     3import os
     4import errno
     5import msvcrt
     6
     7import pywintypes
    38import win32api
    4 import os, msvcrt
     9import win32console
    510
    611from zope.interface import implements
    712
    813from twisted.internet.interfaces import IHalfCloseableProtocol, ITransport, IAddress
    914from twisted.internet.interfaces import IConsumer, IPushProducer
     15from twisted.internet import main, abstract
    1016
    11 from twisted.internet import _pollingfile, main
     17import _pollingfile
    1218
     19
     20
     21# _pollingfile support
     22# XXX check me
     23class _PollableReadConsole(_pollingfile._PollableReader):
     24    def __init__(self, channel, receivedCallback, lostCallback):
     25        _pollingfile._PollableReader.__init__(self, channel.handle,
     26                                              receivedCallback, lostCallback)
     27        self.channel = channel
     28
     29    def checkWork(self):
     30        try:
     31            data = self.channel.read(2000) # 2000 = 80 x 25
     32        except IOError, ioe:
     33            assert ioe.args[0] == errno.EAGAIN
     34            return 0
     35        except win32console.error:
     36            # stdin or stdout closed?
     37            self.cleanup()
     38            return 0
     39
     40        self.receivedCallback(data)
     41        return len(data)
     42
     43
     44class _PollableWriteConsole(_pollingfile._PollableWriter):
     45    def __init__(self, channel, lostCallback):
     46        _pollingfile._PollableWriter.__init__(self, channel.handle, lostCallback)
     47
     48        self.channel = channel
     49
     50    def checkWork(self):
     51        numBytesWritten = 0
     52        if not self.outQueue:
     53            if self.disconnecting:
     54                self.writeConnectionLost()
     55                return 0
     56            try:
     57                self.channel.write('')
     58            except pywintypes.error:
     59                self.writeConnectionLost()
     60                return numBytesWritten
     61
     62        while self.outQueue:
     63            data = self.outQueue.pop(0)
     64            try:
     65                # XXX as far as I know,
     66                # nBytesWritten is always equal to len(data)
     67                nBytesWritten = self.channel.write(data)
     68            except win32console.error:
     69                self.writeConnectionLost()
     70                break
     71            else:
     72                numBytesWritten += nBytesWritten
     73                if len(data) > nBytesWritten:
     74                    self.outQueue.insert(0, data[nBytesWritten:])
     75                    break
     76        else:
     77            resumed = self.bufferEmpty()
     78            if not resumed and self.disconnecting:
     79                self.writeConnectionLost()
     80
     81        return numBytesWritten
     82
     83
     84
     85# support for win32eventreactor
     86# XXX check me
     87class ConsoleReader(abstract.FileDescriptor):
     88    def __init__(self, channel, receivedCallback, lostCallback, reactor=None):
     89        """win32eventreactor is assumed.
     90        """
     91       
     92        if not reactor:
     93            from twisted.internet import reactor
     94
     95        self.channel = channel
     96        self.receivedCallback = receivedCallback
     97        self.lostCallback = lostCallback
     98        self.reactor = reactor
     99
     100        self.reactor.addEvent(self.channel.handle, self, "doRead")
     101
     102
     103    def doRead(self):
     104        try:
     105            data = self.channel.read(2000) # 2000 = 80 x 25
     106        except IOError, ioe:
     107            assert ioe.args[0] == errno.EAGAIN
     108            return 0
     109        except win32console.error:
     110            # stdin or stdout closed?
     111            self.cleanup()
     112            return 0
     113
     114        self.receivedCallback(data)
     115        return len(data)
     116
     117    def cleanup(self):
     118        self.reactor.removeEvent(self.channel.handle)
     119        self.lostCallback()
     120
     121    def close(self):
     122        try:
     123            win32api.CloseHandle(self.channel.handle)
     124        except pywintypes.error:
     125            # You can't close std handles...?
     126            pass
     127
     128        self.cleanup()
     129
     130    def stopProducing(self):
     131        self.close()
     132
     133    def pauseProducing(self):
     134        self.reactor.removeEvent(self.channel.handle)
     135
     136    def resumeProducing(self):
     137        self.reactor.addEvent(self.channel.handle, self, "doRead")
     138
     139
    13140class Win32PipeAddress(object):
    14141    implements(IAddress)
    15142
     143class Win32ConsoleAddress(object):
     144    implements(IAddress)
     145
     146
    16147class StandardIO(_pollingfile._PollingTimer):
    17148
    18149    implements(ITransport,
     
    28159
    29160        Also, put it stdin/stdout/stderr into binary mode.
    30161        """
     162
    31163        from twisted.internet import reactor
    32 
     164        from twisted.internet.win32eventreactor import Win32Reactor
     165       
    33166        for stdfd in range(0, 1, 2):
    34167            msvcrt.setmode(stdfd, os.O_BINARY)
    35168
    36169        _pollingfile._PollingTimer.__init__(self, reactor)
    37170        self.proto = proto
     171       
     172        # Check for win32eventreactor
     173        win32Enabled = reactor.__class__ is Win32Reactor
    38174
    39         hstdin = win32api.GetStdHandle(win32api.STD_INPUT_HANDLE)
    40         hstdout = win32api.GetStdHandle(win32api.STD_OUTPUT_HANDLE)
     175        # Check if we are connected to a console.
     176        # If this is the case, connect to the console, else connect to
     177        # anonymous pipes.
     178        if os.isatty(0):
     179            import conio
     180           
     181            if win32Enabled:
     182                self.stdin = ConsoleReader(
     183                    conio.stdin, self.dataReceived, self.readConnectionLost
     184                    )
     185            else:
     186                self.stdin = _PollableReadConsole(
     187                    conio.stdin, self.dataReceived, self.readConnectionLost
     188                    )
     189        else:
     190            hstdin = win32api.GetStdHandle(win32api.STD_INPUT_HANDLE)
     191            self.stdin = _pollingfile._PollableReadPipe(
     192                hstdin, self.dataReceived, self.readConnectionLost
     193                )
    41194
    42         self.stdin = _pollingfile._PollableReadPipe(
    43             hstdin, self.dataReceived, self.readConnectionLost)
     195        if os.isatty(1):
     196            import conio
     197           
     198            self.stdout = _PollableWriteConsole(
     199                conio.stdout, self.writeConnectionLost
     200                )
     201        else:
     202            hstdout = win32api.GetStdHandle(win32api.STD_OUTPUT_HANDLE)
     203            self.stdout = _pollingfile._PollableWritePipe(
     204                hstdout, self.writeConnectionLost
     205                )
     206           
     207        if not (os.isatty(0) and win32Enabled):
     208            self._addPollableResource(self.stdin)
    44209
    45         self.stdout = _pollingfile._PollableWritePipe(
    46             hstdout, self.writeConnectionLost)
    47 
    48         self._addPollableResource(self.stdin)
    49210        self._addPollableResource(self.stdout)
    50211
    51212        self.proto.makeConnection(self)
     
    86247        self.stdout.close()
    87248
    88249    def getPeer(self):
    89         return Win32PipeAddress()
     250        if os.isatty(0) and os.isatty(1):
     251            return Win32ConsoleAddress()
     252        else:
     253            return Win32PipeAddress()
    90254
    91255    def getHost(self):
    92         return Win32PipeAddress()
     256        if os.isatty(0) and os.isatty(1):
     257            return Win32ConsoleAddress()
     258        else:
     259            return Win32PipeAddress()
    93260
     261
    94262    # IConsumer
    95263
    96264    def registerProducer(self, producer, streaming):
     
    113281
    114282    def resumeProducing(self):
    115283        self.stdin.resumeProducing()
    116 
  • twisted/internet/conio.py

     
     1# -*- test-case-name: twisted.test.test_conio -*-
     2"""This module implements POSIX replacements for stdin/stdout/stderr
     3that support asyncronous read/write to a Windows console.
     4
     5Some details about Windows console:
     6- a process can have attached only one console
     7- there can be only one input buffer
     8- there can be more then one output buffer
     9
     10Moreover this module tries to offer an higher level and convenient
     11interface for termios commands.
     12"""
     13
     14import errno
     15import pywintypes
     16import win32api
     17import win32console
     18
     19
     20
     21_ENABLE_NORMAL_MODE = win32console.ENABLE_ECHO_INPUT | win32console.ENABLE_LINE_INPUT
     22_ENABLE_WINDOW_INPUT = win32console.ENABLE_WINDOW_INPUT
     23
     24
     25class ConIn(object):
     26    """I implement a file like object that supports asyncronous reading
     27    from a console.
     28
     29    This class should be considered a singleton, don't instantiate new
     30    objects and instead use the global stdin object.
     31    """
     32   
     33    def __init__(self, handle):
     34        # handle should be std handle for STD_INPUT_HANDLE
     35        self.handle = handle
     36
     37        # The code page in use
     38        # I assume that this does not change
     39        self.cp = "cp%d" % win32console.GetConsoleCP()
     40
     41        # The temporary (accumulation) buffer used to store the data as
     42        # it arrives from the console
     43        self._buf = []
     44       
     45        # The buffer used to store data ready to be read
     46        self.buffer = ''
     47
     48        # Enable the receiving of input records when the console
     49        # window (buffer) is changed
     50        defaultMode = handle.GetConsoleMode()
     51        handle.SetConsoleMode(defaultMode | _ENABLE_WINDOW_INPUT)
     52
     53        # The callback to be called upon the receiving of a windows
     54        # change record
     55        self._windowChangeCallback = None
     56       
     57        # To optimize the code we use different functions for normal
     58        # and raw mode
     59        self.read = self._read
     60        self.readline = self._readline
     61
     62    #
     63    # termios interface
     64    #
     65    def enableRawMode(self, enabled=True):
     66        """Enable raw mode.
     67
     68        XXX check me
     69        """
     70
     71        # Flush buffer
     72        self._buf = []
     73        self.buffer = ''
     74
     75        # Flush the console buffer, too
     76        self.handle.FlushConsoleInputBuffer()
     77
     78        mode = self.handle.GetConsoleMode()
     79
     80        if enabled:
     81            self.read = self._read_raw
     82            self.readline = self._readline_raw
     83
     84            # Set mode on the console, too
     85            # XXX check me (this seems not to work)
     86            self.handle.SetConsoleMode(mode & ~_ENABLE_NORMAL_MODE)
     87        else:
     88            self.read = self._read
     89            self.readline = self._readline
     90
     91            # Set mode on the console, too
     92            self.handle.SetConsoleMode(mode | _ENABLE_NORMAL_MODE)
     93
     94    def setWindowChangeCallback(self, callback):
     95        """callback is called when the console window buffer is
     96        changed.
     97
     98        Note: WINDOW_BUFFER_SIZE_EVENT is only raised when changing
     99              the window *buffer* size from the console menu
     100        """
     101       
     102        self._windowChangeCallback = callback
     103
     104           
     105    #
     106    # File object interface
     107    #
     108    def close(self):
     109        win32api.CloseHandle(self.handle)
     110
     111    def flush(self):
     112        # Flush both internal buffer and system console buffer
     113        self.buffer = ''
     114        self._buf = []
     115
     116        self.handle.FlushConsoleInputBuffer()
     117
     118    def fileno(self):
     119        return self.handle
     120
     121    def isatty(self):
     122        return True
     123
     124    def next(self):
     125        raise NotImplementedError("Not yet implemented")
     126
     127    def _read(self, size=None):
     128        """Read size bytes from the console.
     129        An exception is raised when the operation would block.
     130
     131        XXX Just return the empty string instead of raising an exception?
     132        """
     133
     134        # This can fail if stdout has been closed
     135        info = stdout.handle.GetConsoleScreenBufferInfo()
     136        rowSize = info["MaximumWindowSize"].X
     137
     138        # Initialize the current cursor position
     139        if not self._buf:
     140            self.pos = info["CursorPosition"]
     141           
     142        while 1:
     143            n = self.handle.GetNumberOfConsoleInputEvents()
     144            if n == 0:
     145                break
     146               
     147            records = self.handle.ReadConsoleInput(n)
     148               
     149            # Process input
     150            for record in records:
     151                if record.EventType == win32console.WINDOW_BUFFER_SIZE_EVENT:
     152                    rowSize = record.Size.X
     153                    if self._windowChangeCallback:
     154                        self._windowChangeCallback()
     155                if record.EventType != win32console.KEY_EVENT \
     156                        or not record.KeyDown:
     157                    continue
     158
     159                char = record.Char
     160                n = record.RepeatCount
     161                if char == '\b':
     162                    pos = stdout.handle.GetConsoleScreenBufferInfo()["CursorPosition"]
     163                   
     164                    # Move the cursor
     165                    x = pos.X - n
     166                    if x >= 0:
     167                        pos.X = x
     168                    # XXX assuming |x| < rowSize (I'm lazy)
     169                    elif pos.Y > self.pos.Y:
     170                        pos.X = rowSize - 1
     171                        pos.Y -= 1
     172
     173                    stdout.handle.SetConsoleCursorPosition(pos)
     174                    stdout.handle.WriteConsoleOutputCharacter(' ' * n, pos)
     175                   
     176                    # Delete the characters from accumulation buffer
     177                    self._buf = self._buf[:-n]
     178                    continue
     179                elif char == '\0':
     180                    vCode = record.VirtualKeyCode
     181                    # XXX TODO handle keyboard navigation
     182                    continue
     183                elif char == '\r':
     184                    char = '\n' * n
     185
     186                    self._buf.append(char)
     187                    stdout.handle.WriteConsole(char) # do echo
     188                   
     189                    # We have some data ready to be read
     190                    self.buffer = ''.join(self._buf)
     191                    self._buf = []
     192                    self.pos = info["CursorPosition"]
     193
     194                    if size is None:
     195                        size = len(self.buffer)
     196                       
     197                    data = self.buffer[:size]
     198                    self.buffer = self.buffer[size:]
     199                    return data
     200
     201                char = char * n
     202                data = char.encode(self.cp)
     203                stdout.handle.WriteConsole(data) # do echo
     204               
     205                self._buf.append(data)
     206
     207        if self.buffer:
     208            data = self.buffer[:size]
     209            self.buffer = self.buffer[size:]
     210            return data
     211        else:
     212            raise IOError(errno.EAGAIN)
     213
     214    def _read_raw(self, size=None):
     215        """Read size bytes from the console, in raw mode.
     216
     217        XXX check me.
     218        """
     219
     220        while 1: # XXX is this loop really needed?
     221            n = self.handle.GetNumberOfConsoleInputEvents()
     222            if n == 0:
     223                break
     224               
     225            records = self.handle.ReadConsoleInput(n)
     226               
     227            # Process input
     228            for record in records:
     229                if record.EventType == win32console.WINDOW_BUFFER_SIZE_EVENT:
     230                    if self._windowChangeCallback:
     231                        self._windowChangeCallback()
     232                if record.EventType != win32console.KEY_EVENT \
     233                        or not record.KeyDown:
     234                    continue
     235
     236                char = record.Char
     237                n = record.RepeatCount
     238                if char == '\0':
     239                    vCode = record.VirtualKeyCode
     240                    # XXX TODO handle keyboard navigation
     241                    continue
     242                elif char == '\r':
     243                    char = '\n' * n
     244
     245                char = char * n
     246                data = char.encode(self.cp)
     247               
     248                self._buf.append(data)
     249
     250
     251        buffer = ''.join(self._buf)
     252        if buffer:
     253            if size is None:
     254                size = len(buffer)
     255
     256            data = buffer[:size]
     257            # Keep the remaining data in the accumulation buffer
     258            self._buf = [buffer[size:]]
     259            return data
     260        else:
     261            return ''
     262
     263    def _readline(self, size=None):
     264        # XXX check me
     265        return self._read(size)
     266
     267    def _readline_raw(self, size=None):
     268        raise NotImplementedError("Not yet implemented")
     269
     270   
     271   
     272class ConOut(object):
     273    """I implement a file like object that supports asyncronous writing
     274    to a console.
     275
     276    This class should be considered private, don't instantiate new
     277    objects and instead use the global stdout and stderr objects.
     278
     279    Note that there is no option to make WriteConsole non blocking,
     280    but is seems that this function does not block at all.
     281    When a blocking operation like text selection is in action, the
     282    process is halted.
     283    """
     284
     285    def __init__(self, handle):
     286        # handle should be std handle for STD_OUTOUT_HANDLE or STD_ERROR_HANDLE
     287        self.handle = handle
     288
     289       
     290    #
     291    # File object interface
     292    #
     293    def close(self):
     294        win32api.CloseHandle(self.handle)
     295
     296    def flush(self):
     297        # There is no buffering
     298        pass
     299
     300    def fileno(self):
     301        return self.handle
     302
     303    def isatty(self):
     304        return True
     305
     306    def write(self, s):
     307        """Write a string to the console.
     308        """
     309
     310        return self.handle.WriteConsole(s)
     311
     312    def writelines(self, seq):
     313        """Write a sequence of strings to the console.
     314        """
     315       
     316        s = ''.join(seq)
     317        return self.handle.WriteConsole(s)
     318
     319
     320
     321# The public interface of this module
     322# XXX TODO replace sys.stdin, sys.stdout and sys.stderr?
     323stdin = ConIn(win32console.GetStdHandle(win32console.STD_INPUT_HANDLE))
     324stdout = ConOut(win32console.GetStdHandle(win32console.STD_OUTPUT_HANDLE))
     325stderr = ConOut(win32console.GetStdHandle(win32console.STD_ERROR_HANDLE))
     326
     327
     328__all__ = [stdin, stdout, stderr]
  • twisted/test/test_conio.py

     
     1"""Test suite for asyncronous I/O support for Windows Console.
     2
     3For testing I use the low level WriteConsoleInput function that allows
     4to write directly in the console input queue.
     5"""
     6
     7import os, sys
     8import win32console
     9
     10from twisted.trial import unittest
     11from twisted.python import filepath
     12from twisted.internet import error, defer, protocol, reactor
     13
     14from twisted.internet import conio, _win32stdio as stdio
     15
     16
     17
     18def createKeyEvent(char, repeat=1):
     19    """Create a low level record structure with the given character.
     20    """
     21   
     22    evt = win32console.PyINPUT_RECORDType(win32console.KEY_EVENT)
     23    evt.KeyDown = True
     24    evt.Char = char
     25    evt.RepeatCount = repeat
     26
     27    return evt
     28
     29
     30stdin = win32console.GetStdHandle(win32console.STD_INPUT_HANDLE)
     31
     32
     33class ConInTestCase(unittest.TestCase):
     34    """Test case for console stdin.
     35    """
     36
     37    def tearDown(self):
     38        conio.stdin.flush()
     39
     40    def testRead(self):
     41        data = u"hello\r"
     42        records = [createKeyEvent(c) for c in data]
     43        stdin.WriteConsoleInput(records)
     44
     45        result = conio.stdin.read()
     46        self.failUnlessEqual(result, "hello\n")
     47
     48    def testRead2(self):
     49        """Test two consecutives read.
     50        """
     51
     52        def read():
     53            data = u"hello\r"
     54            records = [createKeyEvent(c) for c in data]
     55            stdin.WriteConsoleInput(records)
     56           
     57            result = conio.stdin.read()
     58            self.failUnlessEqual(result, "hello\n")
     59   
     60        read()
     61        read()
     62
     63    def testReadMultiple(self):
     64        """Test if repeated characters are handled correctly.
     65        """
     66
     67        data = u"hello\r"
     68        records = [createKeyEvent(c, 3) for c in data]
     69        stdin.WriteConsoleInput(records)
     70
     71        result = conio.stdin.read()
     72        self.failUnlessEqual(result, "hhheeellllllooo\n\n\n")
     73
     74    def testReadWithDelete(self):
     75        """Test if deletion is handled correctly.
     76        """
     77
     78        data = u"hello" + u"\b" * 5 + u"world\r"
     79        records = [createKeyEvent(c) for c in data]
     80        stdin.WriteConsoleInput(records)
     81
     82        result = conio.stdin.read()
     83        self.failUnlessEqual(result, "world\n")
     84
     85    def testDeleteBoundary(self):
     86        """Test if deletion is handled correctly.
     87        """
     88
     89        data = u"h" + "\b\b" + u"w\r"
     90        records = [createKeyEvent(c) for c in data]
     91        stdin.WriteConsoleInput(records)
     92
     93        result = conio.stdin.read()
     94        self.failUnlessEqual(result, "w\n")
     95
     96    def testDeleteFullBoundary(self):
     97        """Test if deletion is handled correctly.
     98        """
     99
     100        data = u"h" * 500 + "\b" * 600 + u"w\r"
     101        records = [createKeyEvent(c) for c in data]
     102        stdin.WriteConsoleInput(records)
     103
     104        result = conio.stdin.read()
     105        self.failUnlessEqual(result, "w\n")
     106
     107    def testReadWithBuffer(self):
     108        data = u"hello\r"
     109        records = [createKeyEvent(c) for c in data]
     110        stdin.WriteConsoleInput(records)
     111
     112        result = conio.stdin.read(3)
     113        self.failUnlessEqual(result, "hel")
     114
     115        result = conio.stdin.read(3)
     116        self.failUnlessEqual(result, "lo\n")
     117
     118    def testReadWouldBlock(self):
     119        data = u"hello"
     120        records = [createKeyEvent(c) for c in data]
     121        stdin.WriteConsoleInput(records)
     122
     123        self.failUnlessRaises(IOError, conio.stdin.read)
     124
     125    def testReadWouldBlockBuffer(self):
     126        data = u"hello"
     127        records = [createKeyEvent(c) for c in data]
     128        stdin.WriteConsoleInput(records)
     129
     130        self.failUnlessRaises(IOError, conio.stdin.read, 3)
     131
     132    def testIsatty(self):
     133        self.failUnless(conio.stdin.isatty())
     134
     135    def testBuffer(self):
     136        data = u"hello"
     137        records = [createKeyEvent(c) for c in data]
     138        stdin.WriteConsoleInput(records)
     139
     140        try:
     141            # This will put the data in the accumulation buffer
     142            conio.stdin.read()
     143        except IOError:
     144            pass
     145       
     146        self.failUnlessEqual(conio.stdin._buf, list("hello"))
     147
     148    def testFlush(self):
     149        data = u"hello\r"
     150        records = [createKeyEvent(c) for c in data]
     151        stdin.WriteConsoleInput(records)
     152
     153        result = conio.stdin.read(3)
     154        conio.stdin.flush()
     155       
     156        self.failIf(conio.stdin.buffer)
     157        self.failUnlessRaises(IOError, conio.stdin.read, 3)
     158
     159    def testFlushBuffer(self):
     160        data = u"hello"
     161        records = [createKeyEvent(c) for c in data]
     162        stdin.WriteConsoleInput(records)
     163
     164        try:
     165            # This will put the data in the accumulation buffer
     166            conio.stdin.read()
     167        except IOError:
     168            pass
     169
     170        conio.stdin.flush()
     171       
     172        self.failIf(conio.stdin.buffer)
     173        self.failIf(conio.stdin._buf)
     174        self.failUnlessRaises(IOError, conio.stdin.read, 3)
     175
     176
     177class ConInRawTestCase(unittest.TestCase):
     178    """Test case for console stdin in raw mode.
     179    """
     180
     181    def setUp(self):
     182        conio.stdin.enableRawMode()
     183
     184    def tearDown(self):
     185        conio.stdin.flush()
     186        conio.stdin.enableRawMode(False)
     187
     188    def testRead(self):
     189        data = u"hello"
     190        records = [createKeyEvent(c) for c in data]
     191        stdin.WriteConsoleInput(records)
     192
     193        result = conio.stdin.read()
     194        self.failUnlessEqual(result, "hello")
     195
     196   
     197    def testReadMultiple(self):
     198        data = u"hello"
     199        records = [createKeyEvent(c, 3) for c in data]
     200        stdin.WriteConsoleInput(records)
     201
     202        result = conio.stdin.read()
     203        self.failUnlessEqual(result, "hhheeellllllooo")
     204
     205       
     206    def testReadWithDelete(self):
     207        data = u"hello" + u'\b' * 5 + u"world"
     208        records = [createKeyEvent(c) for c in data]
     209        stdin.WriteConsoleInput(records)
     210
     211        result = conio.stdin.read()
     212        self.failUnlessEqual(result, "hello" + '\b' * 5 + "world")
     213
     214    def testReadWithBuffer(self):
     215        data = u"hello\r"
     216        records = [createKeyEvent(c) for c in data]
     217        stdin.WriteConsoleInput(records)
     218
     219        result = conio.stdin.read(3)
     220        self.failUnlessEqual(result, "hel")
     221
     222        result = conio.stdin.read(3)
     223        self.failUnlessEqual(result, "lo\n")
     224
     225    def testFlush(self):
     226        data = u"hello"
     227        records = [createKeyEvent(c) for c in data]
     228        stdin.WriteConsoleInput(records)
     229
     230        result = conio.stdin.read(3)
     231        conio.stdin.flush()
     232
     233        self.failIf(conio.stdin.buffer)
     234        self.failIf(conio.stdin.read())
     235
     236
     237class ConOutTestCase(unittest.TestCase):
     238    """Test case for console stdout.
     239    Not very much to test, yet.
     240    """
     241   
     242    def testWrite(self):
     243        data = "hello"
     244        n = conio.stdout.write(data)
     245       
     246        self.failUnlessEqual(n, 5)
     247
     248    def testWriteUnicode(self):
     249        data = u"hello"
     250        n = conio.stdout.write(data)
     251       
     252        self.failUnlessEqual(n, 5)
     253
     254    def testWritelines(self):
     255        data = ["hello", "world"]
     256        n = conio.stdout.writelines(data)
     257       
     258        self.failUnlessEqual(n, 10)
     259
     260    def testIsatty(self):
     261        self.failUnless(conio.stdout.isatty())
     262
     263
     264
     265class StdIOTestProtocol(protocol.Protocol):
     266    def __init__(self):
     267        self.onData = defer.Deferred()
     268
     269    def dataReceived(self, data):
     270        self.onData.callback(data)
     271
     272
     273class StdIOTestCase(unittest.TestCase):
     274    """Test twisted.internet.stdio support for consoles.
     275    """
     276 
     277    def setUp(self):
     278        p = StdIOTestProtocol()
     279        self.stdio = stdio.StandardIO(p)
     280        self.onData = p.onData
     281
     282    def tearDown(self):
     283        self.stdio._pause()
     284        try:
     285            self.stdio._stopPolling()
     286        except error.AlreadyCalled:
     287            pass
     288       
     289        conio.stdin.flush()
     290
     291    def testRead(self):
     292        def cb(result):
     293            self.failUnlessEqual(result, "hello\n")
     294
     295        data = u"hello\r"
     296        records = [createKeyEvent(c) for c in data]
     297        stdin.WriteConsoleInput(records)
     298
     299        return self.onData.addCallback(cb)