Ticket #5744: pingbars.py

File pingbars.py, 3.1 KB (added by Richard Wall, 9 years ago)

Prints a bar chart to the console showing the round trip times reported by a ping subprocess.

Line 
1#!/usr/bin/env python
2# Copyright (c) Twisted Matrix Laboratories.
3# See LICENSE for details.
4
5"""
6Print a simple bar chart of ping round trip times eg
7
8 python pingbars.py www.example.com
9"""
10
11import os, re, sys
12
13from twisted.internet.defer import Deferred
14from twisted.internet.endpoints import connectProtocol, ProcessEndpoint
15from twisted.internet.error import ConnectionDone, ConnectionLost
16from twisted.protocols.basic import LineOnlyReceiver
17from twisted.python import log, usage
18
19
20
21class PingRttPrinter(LineOnlyReceiver):
22    delimiter = '\n'
23    pingLine = re.compile(
24        r'^\d+ bytes from [^:]+: '
25        r'icmp_seq=\d+ ttl=\d+ time=(?P<rtt>[\d.]+) ms$')
26
27    _offset = None
28
29    def __init__(self):
30        self.finished = Deferred()
31        self._consoleWidth = int(os.environ.get('COLUMNS', '80'))
32
33
34    def connectionMade(self):
35        self._pid = self.transport.pid
36        log.msg(format='process started. pid: %(pid)s', pid=self._pid)
37
38
39    def lineReceived(self, line):
40        m = self.pingLine.match(line)
41        if m:
42            rtt = float(m.group('rtt'))
43            if self._offset is None:
44                self._offset = max(0, rtt - (self._consoleWidth / 2))
45
46            label = ('%.1f' % (rtt,)).rjust(6)
47            barWidth = int(rtt - self._offset)
48            bar = '.' * min(max(0, barWidth), self._consoleWidth)
49
50            if barWidth > self._consoleWidth:
51                bar = bar[:-1] + '>'
52            elif barWidth < 0:
53                bar = '<' + bar[1:]
54
55            sys.stdout.write(label + ' ' + bar + '\n')
56            sys.stdout.flush()
57        else:
58            log.msg(format='unrecognised line: %(line)r', line=line)
59
60
61    def connectionLost(self, reason):
62        sys.stdout.write('\n')
63        sys.stdout.flush()
64        try:
65            reason.trap(ConnectionDone, ConnectionLost)
66        except:
67            self.finished.errback()
68        else:
69            log.msg(
70                format='process exited. pid: %(pid)s, reason: %(reason)r',
71                pid=self._pid, reason=reason.value)
72            self.finished.callback(reason.value)
73
74
75
76def startPing(reactor, options):
77    d = connectProtocol(
78        ProcessEndpoint(
79            reactor,
80            '/usr/bin/ping',
81            args=('/usr/bin/ping', '-n', options['host'])),
82        PingRttPrinter())
83
84    @d.addCallback
85    def waitForProcess(proto):
86        return proto.finished
87
88    return d
89
90
91
92class Options(usage.Options):
93    synopsis = "%s [OPTIONS] HOST" % (os.path.basename(sys.argv[0]),)
94    optFlags = [
95        ["verbose", "v", "Enable verbose logging to stderr."],
96    ]
97
98    def parseArgs(self, host):
99        self['host'] = host
100
101
102
103def main(reactor):
104    options = Options()
105
106    try:
107        options.parseOptions()
108    except usage.UsageError as errortext:
109        sys.stderr.write(str(options) + '\n')
110        sys.stderr.write('ERROR: %s\n' % (errortext,))
111        raise SystemExit(1)
112
113    if options['verbose']:
114        log.startLogging(sys.stderr)
115
116    return startPing(reactor, options)
117
118
119
120if __name__ == '__main__':
121    from twisted.internet.task import react
122    react(main)