root/trunk/twisted/conch/insults/helper.py

Revision 30752, 13.3 KB (checked in by exarkun, 15 months ago)

Rewrite the copyright headers to exclude date information.

Author: exarkun
Reviewer: glyph
Fixes: #4857

To avoid the need to perpetually update copyright dates in each file in Twisted,
remove the dates from most files and just leave them in the LICENSE file.

As a side effect, some files also have had a trailing newline added where it was
missing before.

Line 
1# -*- test-case-name: twisted.conch.test.test_helper -*-
2# Copyright (c) Twisted Matrix Laboratories.
3# See LICENSE for details.
4
5"""
6Partial in-memory terminal emulator
7
8@author: Jp Calderone
9"""
10
11import re, string
12
13from zope.interface import implements
14
15from twisted.internet import defer, protocol, reactor
16from twisted.python import log
17
18from twisted.conch.insults import insults
19
20FOREGROUND = 30
21BACKGROUND = 40
22BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE, N_COLORS = range(9)
23
24class CharacterAttribute:
25    """Represents the attributes of a single character.
26
27    Character set, intensity, underlinedness, blinkitude, video
28    reversal, as well as foreground and background colors made up a
29    character's attributes.
30    """
31    def __init__(self, charset=insults.G0,
32                 bold=False, underline=False,
33                 blink=False, reverseVideo=False,
34                 foreground=WHITE, background=BLACK,
35
36                 _subtracting=False):
37        self.charset = charset
38        self.bold = bold
39        self.underline = underline
40        self.blink = blink
41        self.reverseVideo = reverseVideo
42        self.foreground = foreground
43        self.background = background
44
45        self._subtracting = _subtracting
46
47    def __eq__(self, other):
48        return vars(self) == vars(other)
49
50    def __ne__(self, other):
51        return not self.__eq__(other)
52
53    def copy(self):
54        c = self.__class__()
55        c.__dict__.update(vars(self))
56        return c
57
58    def wantOne(self, **kw):
59        k, v = kw.popitem()
60        if getattr(self, k) != v:
61            attr = self.copy()
62            attr._subtracting = not v
63            setattr(attr, k, v)
64            return attr
65        else:
66            return self.copy()
67
68    def toVT102(self):
69        # Spit out a vt102 control sequence that will set up
70        # all the attributes set here.  Except charset.
71        attrs = []
72        if self._subtracting:
73            attrs.append(0)
74        if self.bold:
75            attrs.append(insults.BOLD)
76        if self.underline:
77            attrs.append(insults.UNDERLINE)
78        if self.blink:
79            attrs.append(insults.BLINK)
80        if self.reverseVideo:
81            attrs.append(insults.REVERSE_VIDEO)
82        if self.foreground != WHITE:
83            attrs.append(FOREGROUND + self.foreground)
84        if self.background != BLACK:
85            attrs.append(BACKGROUND + self.background)
86        if attrs:
87            return '\x1b[' + ';'.join(map(str, attrs)) + 'm'
88        return ''
89
90# XXX - need to support scroll regions and scroll history
91class TerminalBuffer(protocol.Protocol):
92    """
93    An in-memory terminal emulator.
94    """
95    implements(insults.ITerminalTransport)
96
97    for keyID in ('UP_ARROW', 'DOWN_ARROW', 'RIGHT_ARROW', 'LEFT_ARROW',
98                  'HOME', 'INSERT', 'DELETE', 'END', 'PGUP', 'PGDN',
99                  'F1', 'F2', 'F3', 'F4', 'F5', 'F6', 'F7', 'F8', 'F9',
100                  'F10', 'F11', 'F12'):
101        exec '%s = object()' % (keyID,)
102
103    TAB = '\t'
104    BACKSPACE = '\x7f'
105
106    width = 80
107    height = 24
108
109    fill = ' '
110    void = object()
111
112    def getCharacter(self, x, y):
113        return self.lines[y][x]
114
115    def connectionMade(self):
116        self.reset()
117
118    def write(self, bytes):
119        """
120        Add the given printable bytes to the terminal.
121
122        Line feeds in C{bytes} will be replaced with carriage return / line
123        feed pairs.
124        """
125        for b in bytes.replace('\n', '\r\n'):
126            self.insertAtCursor(b)
127
128    def _currentCharacterAttributes(self):
129        return CharacterAttribute(self.activeCharset, **self.graphicRendition)
130
131    def insertAtCursor(self, b):
132        """
133        Add one byte to the terminal at the cursor and make consequent state
134        updates.
135
136        If b is a carriage return, move the cursor to the beginning of the
137        current row.
138
139        If b is a line feed, move the cursor to the next row or scroll down if
140        the cursor is already in the last row.
141
142        Otherwise, if b is printable, put it at the cursor position (inserting
143        or overwriting as dictated by the current mode) and move the cursor.
144        """
145        if b == '\r':
146            self.x = 0
147        elif b == '\n':
148            self._scrollDown()
149        elif b in string.printable:
150            if self.x >= self.width:
151                self.nextLine()
152            ch = (b, self._currentCharacterAttributes())
153            if self.modes.get(insults.modes.IRM):
154                self.lines[self.y][self.x:self.x] = [ch]
155                self.lines[self.y].pop()
156            else:
157                self.lines[self.y][self.x] = ch
158            self.x += 1
159
160    def _emptyLine(self, width):
161        return [(self.void, self._currentCharacterAttributes()) for i in xrange(width)]
162
163    def _scrollDown(self):
164        self.y += 1
165        if self.y >= self.height:
166            self.y -= 1
167            del self.lines[0]
168            self.lines.append(self._emptyLine(self.width))
169
170    def _scrollUp(self):
171        self.y -= 1
172        if self.y < 0:
173            self.y = 0
174            del self.lines[-1]
175            self.lines.insert(0, self._emptyLine(self.width))
176
177    def cursorUp(self, n=1):
178        self.y = max(0, self.y - n)
179
180    def cursorDown(self, n=1):
181        self.y = min(self.height - 1, self.y + n)
182
183    def cursorBackward(self, n=1):
184        self.x = max(0, self.x - n)
185
186    def cursorForward(self, n=1):
187        self.x = min(self.width, self.x + n)
188
189    def cursorPosition(self, column, line):
190        self.x = column
191        self.y = line
192
193    def cursorHome(self):
194        self.x = self.home.x
195        self.y = self.home.y
196
197    def index(self):
198        self._scrollDown()
199
200    def reverseIndex(self):
201        self._scrollUp()
202
203    def nextLine(self):
204        """
205        Update the cursor position attributes and scroll down if appropriate.
206        """
207        self.x = 0
208        self._scrollDown()
209
210    def saveCursor(self):
211        self._savedCursor = (self.x, self.y)
212
213    def restoreCursor(self):
214        self.x, self.y = self._savedCursor
215        del self._savedCursor
216
217    def setModes(self, modes):
218        for m in modes:
219            self.modes[m] = True
220
221    def resetModes(self, modes):
222        for m in modes:
223            try:
224                del self.modes[m]
225            except KeyError:
226                pass
227
228
229    def setPrivateModes(self, modes):
230        """
231        Enable the given modes.
232
233        Track which modes have been enabled so that the implementations of
234        other L{insults.ITerminalTransport} methods can be properly implemented
235        to respect these settings.
236
237        @see: L{resetPrivateModes}
238        @see: L{insults.ITerminalTransport.setPrivateModes}
239        """
240        for m in modes:
241            self.privateModes[m] = True
242
243
244    def resetPrivateModes(self, modes):
245        """
246        Disable the given modes.
247
248        @see: L{setPrivateModes}
249        @see: L{insults.ITerminalTransport.resetPrivateModes}
250        """
251        for m in modes:
252            try:
253                del self.privateModes[m]
254            except KeyError:
255                pass
256
257
258    def applicationKeypadMode(self):
259        self.keypadMode = 'app'
260
261    def numericKeypadMode(self):
262        self.keypadMode = 'num'
263
264    def selectCharacterSet(self, charSet, which):
265        self.charsets[which] = charSet
266
267    def shiftIn(self):
268        self.activeCharset = insults.G0
269
270    def shiftOut(self):
271        self.activeCharset = insults.G1
272
273    def singleShift2(self):
274        oldActiveCharset = self.activeCharset
275        self.activeCharset = insults.G2
276        f = self.insertAtCursor
277        def insertAtCursor(b):
278            f(b)
279            del self.insertAtCursor
280            self.activeCharset = oldActiveCharset
281        self.insertAtCursor = insertAtCursor
282
283    def singleShift3(self):
284        oldActiveCharset = self.activeCharset
285        self.activeCharset = insults.G3
286        f = self.insertAtCursor
287        def insertAtCursor(b):
288            f(b)
289            del self.insertAtCursor
290            self.activeCharset = oldActiveCharset
291        self.insertAtCursor = insertAtCursor
292
293    def selectGraphicRendition(self, *attributes):
294        for a in attributes:
295            if a == insults.NORMAL:
296                self.graphicRendition = {
297                    'bold': False,
298                    'underline': False,
299                    'blink': False,
300                    'reverseVideo': False,
301                    'foreground': WHITE,
302                    'background': BLACK}
303            elif a == insults.BOLD:
304                self.graphicRendition['bold'] = True
305            elif a == insults.UNDERLINE:
306                self.graphicRendition['underline'] = True
307            elif a == insults.BLINK:
308                self.graphicRendition['blink'] = True
309            elif a == insults.REVERSE_VIDEO:
310                self.graphicRendition['reverseVideo'] = True
311            else:
312                try:
313                    v = int(a)
314                except ValueError:
315                    log.msg("Unknown graphic rendition attribute: " + repr(a))
316                else:
317                    if FOREGROUND <= v <= FOREGROUND + N_COLORS:
318                        self.graphicRendition['foreground'] = v - FOREGROUND
319                    elif BACKGROUND <= v <= BACKGROUND + N_COLORS:
320                        self.graphicRendition['background'] = v - BACKGROUND
321                    else:
322                        log.msg("Unknown graphic rendition attribute: " + repr(a))
323
324    def eraseLine(self):
325        self.lines[self.y] = self._emptyLine(self.width)
326
327    def eraseToLineEnd(self):
328        width = self.width - self.x
329        self.lines[self.y][self.x:] = self._emptyLine(width)
330
331    def eraseToLineBeginning(self):
332        self.lines[self.y][:self.x + 1] = self._emptyLine(self.x + 1)
333
334    def eraseDisplay(self):
335        self.lines = [self._emptyLine(self.width) for i in xrange(self.height)]
336
337    def eraseToDisplayEnd(self):
338        self.eraseToLineEnd()
339        height = self.height - self.y - 1
340        self.lines[self.y + 1:] = [self._emptyLine(self.width) for i in range(height)]
341
342    def eraseToDisplayBeginning(self):
343        self.eraseToLineBeginning()
344        self.lines[:self.y] = [self._emptyLine(self.width) for i in range(self.y)]
345
346    def deleteCharacter(self, n=1):
347        del self.lines[self.y][self.x:self.x+n]
348        self.lines[self.y].extend(self._emptyLine(min(self.width - self.x, n)))
349
350    def insertLine(self, n=1):
351        self.lines[self.y:self.y] = [self._emptyLine(self.width) for i in range(n)]
352        del self.lines[self.height:]
353
354    def deleteLine(self, n=1):
355        del self.lines[self.y:self.y+n]
356        self.lines.extend([self._emptyLine(self.width) for i in range(n)])
357
358    def reportCursorPosition(self):
359        return (self.x, self.y)
360
361    def reset(self):
362        self.home = insults.Vector(0, 0)
363        self.x = self.y = 0
364        self.modes = {}
365        self.privateModes = {}
366        self.setPrivateModes([insults.privateModes.AUTO_WRAP,
367                              insults.privateModes.CURSOR_MODE])
368        self.numericKeypad = 'app'
369        self.activeCharset = insults.G0
370        self.graphicRendition = {
371            'bold': False,
372            'underline': False,
373            'blink': False,
374            'reverseVideo': False,
375            'foreground': WHITE,
376            'background': BLACK}
377        self.charsets = {
378            insults.G0: insults.CS_US,
379            insults.G1: insults.CS_US,
380            insults.G2: insults.CS_ALTERNATE,
381            insults.G3: insults.CS_ALTERNATE_SPECIAL}
382        self.eraseDisplay()
383
384    def unhandledControlSequence(self, buf):
385        print 'Could not handle', repr(buf)
386
387    def __str__(self):
388        lines = []
389        for L in self.lines:
390            buf = []
391            length = 0
392            for (ch, attr) in L:
393                if ch is not self.void:
394                    buf.append(ch)
395                    length = len(buf)
396                else:
397                    buf.append(self.fill)
398            lines.append(''.join(buf[:length]))
399        return '\n'.join(lines)
400
401class ExpectationTimeout(Exception):
402    pass
403
404class ExpectableBuffer(TerminalBuffer):
405    _mark = 0
406
407    def connectionMade(self):
408        TerminalBuffer.connectionMade(self)
409        self._expecting = []
410
411    def write(self, bytes):
412        TerminalBuffer.write(self, bytes)
413        self._checkExpected()
414
415    def cursorHome(self):
416        TerminalBuffer.cursorHome(self)
417        self._mark = 0
418
419    def _timeoutExpected(self, d):
420        d.errback(ExpectationTimeout())
421        self._checkExpected()
422
423    def _checkExpected(self):
424        s = str(self)[self._mark:]
425        while self._expecting:
426            expr, timer, deferred = self._expecting[0]
427            if timer and not timer.active():
428                del self._expecting[0]
429                continue
430            for match in expr.finditer(s):
431                if timer:
432                    timer.cancel()
433                del self._expecting[0]
434                self._mark += match.end()
435                s = s[match.end():]
436                deferred.callback(match)
437                break
438            else:
439                return
440
441    def expect(self, expression, timeout=None, scheduler=reactor):
442        d = defer.Deferred()
443        timer = None
444        if timeout:
445            timer = scheduler.callLater(timeout, self._timeoutExpected, d)
446        self._expecting.append((re.compile(expression), timer, d))
447        self._checkExpected()
448        return d
449
450__all__ = ['CharacterAttribute', 'TerminalBuffer', 'ExpectableBuffer']
Note: See TracBrowser for help on using the browser.