root / trunk / twisted / runner / procmon.py

Revision 26722, 8.5 kB (checked in by exarkun, 3 months ago)

Merge procmon-env-3691

Author: washort, exarkun
Reviewer: glyph, therve
Fixes: #3691

Add an env parameter to twisted.runner.procmon.ProcessMonitor's
addProcess method to allow applications to customize the environment
with which a process is run.

Line 
1 # Copyright (c) 2001-2009 Twisted Matrix Laboratories.
2 # See LICENSE for details.
3
4 """
5 ProcessMonitor: run processes, monitor progress, and restart when
6 they die.
7
8 The ProcessMonitor will not immediately restart a process that appears
9 to die instantly -- with each "instant" death (less than 1 second, by
10 default), it will wait approximately twice as long as before (up to one
11 hour) before restarting it. A successful run will reset the counter.
12
13 The primary interface is "addProcess" and "removeProcess". When the
14 service is active (that is, when the application it is attached to
15 is running), adding a process automatically starts it.
16
17 Each process has a name (a string). This string must uniquely identify
18 the process. In particular, attempting to add two processes with the
19 same name will result in a KeyError.
20
21 The arguments to addProcess are:
22   - name -- a string, uniquely specifying the process
23   - args -- a list of arguments. The first will be used to determine the
24           executable
25   - optionally, the uid and gid this process should be run as (by default,
26     it does not change uid/gid), and env, the process environment.
27
28 Note that args are passed to the system call, not to the shell. If running
29 the shell is desired, the common idiom is to use
30 .addProcess("name", ['/bin/sh', '-c', shell_script])
31
32 removeProcess takes just the name argument. If the process is running, it
33 is killed, and it will never be restarted.
34
35 The "restartAll" method restarts all processes. This is useful for
36 third-party management services that let a user restart servers because
37 of a change in outside circumstances -- for example, a newly installed
38 version of a library.
39
40 The following attributes on the monitor can be set to configure behaviour:
41   - threshold -- how long a process has to live before the death is considered
42                  instant (default 1, measured in seconds)
43   - killTime -- how long a process being killed has to get its affairs in
44                 order before it gets killed with an unmaskable signal
45                 (default 5, measured in seconds)
46   - consistencyDelay -- time between consistency checks
47                         (default 60, measured in seconds)
48 """
49
50 import os, time
51
52 from twisted.python import log
53 from twisted.internet import error, protocol, reactor
54 from twisted.application import service
55 from twisted.protocols import basic
56
class DummyTransport:
    """
    Minimal stand-in transport for the LineLogger instances.

    It exposes only a C{disconnecting} flag, which is always false.
    """
    disconnecting = 0


# One shared placeholder transport, connected to every LineLogger.
transport = DummyTransport()
62
class LineLogger(basic.LineReceiver):
    """
    Line receiver that sends every complete line it receives to the
    Twisted log, prefixed with C{[tag]}.
    """

    # Label shown in square brackets in front of each logged line.
    tag = None
    delimiter = '\n'

    def lineReceived(self, line):
        message = '[%s] %s' % (self.tag, line)
        log.msg(message)
70
class LoggingProtocol(protocol.ProcessProtocol):
    """
    Process protocol that logs a child's output line by line, tagged with
    the process name, and notifies its monitor when the child exits.
    """

    # Object whose connectionLost(name) is called from processEnded; the
    # ProcessMonitor assigns itself here before spawning the process.
    service = None
    # Process name: used as the log tag and reported back to the service.
    name = None
    # True while all output seen so far ended on a newline, i.e. no partial
    # line is waiting in the line buffer.
    empty = 1

    def connectionMade(self):
        self.output = LineLogger()
        self.output.tag = self.name
        self.output.makeConnection(transport)

    def outReceived(self, data):
        """
        Feed a chunk of child output to the line logger and remember whether
        it ended on a line boundary.
        """
        self.output.dataReceived(data)
        # An empty chunk would make data[-1] raise IndexError, and it does
        # not change whether a partial line is pending, so skip the update.
        if data:
            self.empty = data.endswith('\n')

    # stdout and stderr share a single line buffer, so partial lines from
    # the two streams may be joined together in the log.
    errReceived = outReceived

    def processEnded(self, reason):
        """
        Flush any trailing partial line, then tell the service this process
        has ended.
        """
        if not self.empty:
            self.output.dataReceived('\n')
        self.service.connectionLost(self.name)
92
93
class ProcessMonitor(service.Service):
    """
    Service that runs a set of named processes, restarts them when they
    exit (backing off on processes that die instantly), and stops them when
    the service stops.

    Configuration attributes, all in seconds: C{threshold}, C{killTime} and
    C{consistencyDelay} (see the module docstring).
    """

    threshold = 1
    active = 0
    killTime = 5
    consistency = None
    consistencyDelay = 60

    # Private marker meaning "addProcess was called without an env
    # argument".  It is replaced by a fresh empty dict on every call, so no
    # single mutable default dict is shared between processes, while an
    # explicit env=None is still passed through unchanged.
    _noEnvGiven = object()

    def __init__(self):
        # name -> (args, uid, gid, env) for every monitored process.
        self.processes = {}
        # name -> LoggingProtocol for every process currently running.
        self.protocols = {}
        # name -> current restart delay in seconds (grows on instant deaths).
        self.delay = {}
        # name -> time.time() at which the process was last spawned.
        self.timeStarted = {}
        # name -> pending DelayedCall that will force-kill a stopping process.
        self.murder = {}

    def __getstate__(self):
        """
        Return picklable state: the running flag and the pending consistency
        check are dropped, and all per-run bookkeeping is reset to empty.
        """
        dct = service.Service.__getstate__(self)
        for k in ('active', 'consistency'):
            dct.pop(k, None)
        dct['protocols'] = {}
        dct['delay'] = {}
        dct['timeStarted'] = {}
        dct['murder'] = {}
        return dct

    def _checkConsistency(self):
        """
        Restart any running process whose transport can no longer be
        signalled, then schedule the next check C{consistencyDelay} seconds
        from now.
        """
        # Iterate over a snapshot: the loop deletes entries and startProcess
        # adds new ones, which is unsafe while iterating a live dict view.
        for name, proto in list(self.protocols.items()):
            proc = proto.transport
            try:
                # Signal 0 is used as a liveness probe (POSIX kill(pid, 0)).
                proc.signalProcess(0)
            except (OSError, error.ProcessExitedAlready):
                log.msg("Lost process %r somehow, restarting." % name)
                del self.protocols[name]
                self.startProcess(name)
        self.consistency = reactor.callLater(self.consistencyDelay,
                                             self._checkConsistency)

    def addProcess(self, name, args, uid=None, gid=None, env=_noEnvGiven):
        """
        Add a new process to launch, monitor, and restart when necessary.

        @param name: A label for this process.  This value must be unique
            across all processes added to this monitor.

        @param args: The argv sequence for the process to launch.
        @param uid: The user ID to use to run the process.  If C{None}, the
            current UID is used.
        @type uid: C{int}
        @param gid: The group ID to use to run the process.  If C{None}, the
            current GID is used.
        @type gid: C{int}
        @param env: The environment to give to the launched process.  See
            L{IReactorProcess.spawnProcess}'s C{env} parameter.  If omitted,
            a new empty dictionary is used.
        @type env: C{dict}

        @raise KeyError: If a process with this name was already added.
        """
        if name in self.processes:
            raise KeyError("remove %s first" % name)
        if env is self._noEnvGiven:
            env = {}
        self.processes[name] = args, uid, gid, env
        if self.active:
            self.startProcess(name)

    def removeProcess(self, name):
        """
        Stop monitoring the process C{name}, stopping it if it is running.

        @raise KeyError: If no process with this name was added.
        """
        del self.processes[name]
        self.stopProcess(name)

    def startService(self):
        """
        Start the service: schedule every configured process to be started
        and begin the periodic consistency checks.
        """
        service.Service.startService(self)
        self.active = 1
        for name in list(self.processes):
            reactor.callLater(0, self._startIfWanted, name)
        self.consistency = reactor.callLater(self.consistencyDelay,
                                             self._checkConsistency)

    def stopService(self):
        """
        Stop the service: stop every running process and cancel the
        periodic consistency check.
        """
        service.Service.stopService(self)
        self.active = 0
        for name in list(self.processes):
            self.stopProcess(name)
        # consistency is None if startService never ran (or after
        # unpickling); guard so stopping cannot raise AttributeError.
        if self.consistency is not None and self.consistency.active():
            self.consistency.cancel()
        self.consistency = None

    def connectionLost(self, name):
        """
        Handle the exit of process C{name}; called from
        L{LoggingProtocol.processEnded}.

        Cancel any pending forced kill and forget the protocol.  A process
        that lived less than C{threshold} seconds has its restart delay grown
        to C{1 + 2 * previous} seconds (capped at 3600); otherwise the delay
        is reset to 0.  If the service is active and the process is still
        configured, a restart is scheduled after that delay.
        """
        murder = self.murder.pop(name, None)
        # If the process ignored TERM, the forced kill has already fired, and
        # cancelling a DelayedCall that already ran raises; check first.
        if murder is not None and murder.active():
            murder.cancel()
        self.protocols.pop(name, None)
        if time.time() - self.timeStarted[name] < self.threshold:
            delay = self.delay[name] = min(1 + 2 * self.delay.get(name, 0),
                                           3600)
        else:
            delay = self.delay[name] = 0
        if self.active and name in self.processes:
            reactor.callLater(delay, self._startIfWanted, name)

    def _startIfWanted(self, name):
        """
        Start C{name} from a delayed call, unless the service was stopped or
        the process was removed while the call was pending.
        """
        if self.active and name in self.processes:
            self.startProcess(name)

    def startProcess(self, name):
        """
        Spawn the configured process C{name} unless it is already running.

        @raise KeyError: If no process with this name was added.
        """
        if name in self.protocols:
            return
        # Look the configuration up before registering a protocol, so an
        # unknown name raises without leaving a half-initialised entry (one
        # with no transport) in self.protocols.
        args, uid, gid, env = self.processes[name]
        p = self.protocols[name] = LoggingProtocol()
        p.service = self
        p.name = name
        self.timeStarted[name] = time.time()
        reactor.spawnProcess(p, args[0], args, uid=uid, gid=gid, env=env)

    def _forceStopProcess(self, proc):
        """
        Send KILL to C{proc}, ignoring a process that has already exited.
        """
        try:
            proc.signalProcess('KILL')
        except error.ProcessExitedAlready:
            pass

    def stopProcess(self, name):
        """
        Send TERM to the running process C{name}, and schedule a KILL
        C{killTime} seconds later.  Does nothing if C{name} is not running.
        """
        if name not in self.protocols:
            return
        proc = self.protocols.pop(name).transport
        try:
            proc.signalProcess('TERM')
        except error.ProcessExitedAlready:
            pass
        else:
            self.murder[name] = reactor.callLater(
                self.killTime, self._forceStopProcess, proc)

    def restartAll(self):
        """
        Stop every running process.  While the service is active, each one
        is restarted by connectionLost once it exits.
        """
        for name in list(self.processes):
            self.stopProcess(name)

    def __repr__(self):
        entries = []
        for name, proc in self.processes.items():
            # Index rather than unpack all four fields, so entries with a
            # different tuple length still render.
            args, uid, gid = proc[0], proc[1], proc[2]
            uidgid = ''
            if uid is not None:
                uidgid = str(uid)
            if gid is not None:
                uidgid += ':' + str(gid)
            if uidgid:
                uidgid = '(' + uidgid + ')'
            entries.append('%r%s: %r' % (name, uidgid, args))
        return '<%s %s>' % (self.__class__.__name__, ' '.join(entries))
237
def main():
    """
    Demonstration entry point.

    Monitor four shell commands, send SIGTERM to the 'baz' child after 30
    seconds, call restartAll after 60 seconds, and stop the monitor before
    the reactor shuts down.
    """
    from signal import SIGTERM

    monitor = ProcessMonitor()
    commands = [
        ('foo', ['/bin/sh', '-c', 'sleep 2;echo hello']),
        ('qux', ['/bin/sh', '-c', 'sleep 2;printf pilim']),
        ('bar', ['/bin/sh', '-c', 'echo goodbye']),
        ('baz', ['/bin/sh', '-c',
                 'echo welcome;while :;do echo blah;sleep 5;done']),
    ]
    for label, argv in commands:
        monitor.addProcess(label, argv)

    def terminateBaz(mon=monitor):
        # Look up the 'baz' child that is running at the moment this fires.
        os.kill(mon.protocols['baz'].transport.pid, SIGTERM)

    reactor.callLater(30, terminateBaz)
    reactor.callLater(60, monitor.restartAll)
    monitor.startService()
    reactor.addSystemEventTrigger('before', 'shutdown', monitor.stopService)
    reactor.run()


if __name__ == '__main__':
    main()
Note: See TracBrowser for help on using the browser.