root / trunk / twisted / scripts / trial.py

Revision 22278, 13.3 kB (checked in by therve, 1 year ago)

Merge set-compat-2761-3

Author: therve
Reviewers: exarkun, ralphm
Fixes #2761

Add twisted.python.compat.set and twisted.python.compat.frozenset, to be used
for keeping compatibility when the builtins are not available (i.e., Python 2.3).

Line 
1 # -*- test-case-name: twisted.trial.test.test_script -*-
2
3 # Copyright (c) 2001-2007 Twisted Matrix Laboratories.
4 # See LICENSE for details.
5
6
7 import sys, os, random, gc, time, warnings
8
9 from twisted.internet import defer
10 from twisted.application import app
11 from twisted.python import usage, reflect, failure
12 from twisted import plugin
13 from twisted.python.util import spewer
14 from twisted.python.compat import set
15 from twisted.trial import runner, itrial, reporter
16
17
# Maps every name accepted by --tbformat to the value that is stored in the
# 'tbformat' option.  The redundant spellings are kept for command-line
# compatibility for a while.
TBFORMAT_MAP = {
    'default': 'default', 'plain': 'default',
    'brief': 'brief', 'emacs': 'brief',
    'verbose': 'verbose', 'cgitb': 'verbose',
    }
28
29
30 def _parseLocalVariables(line):
31     """Accepts a single line in Emacs local variable declaration format and
32     returns a dict of all the variables {name: value}.
33     Raises ValueError if 'line' is in the wrong format.
34
35     See http://www.gnu.org/software/emacs/manual/html_node/File-Variables.html
36     """
37     paren = '-*-'
38     start = line.find(paren) + len(paren)
39     end = line.rfind(paren)
40     if start == -1 or end == -1:
41         raise ValueError("%r not a valid local variable declaration" % (line,))
42     items = line[start:end].split(';')
43     localVars = {}
44     for item in items:
45         if len(item.strip()) == 0:
46             continue
47         split = item.split(':')
48         if len(split) != 2:
49             raise ValueError("%r contains invalid declaration %r"
50                              % (line, item))
51         localVars[split[0].strip()] = split[1].strip()
52     return localVars
53
54
def loadLocalVariables(filename):
    """Accepts a filename and attempts to load the Emacs variable declarations
    from that file, simulating what Emacs does.

    Only the first two lines of the file are examined.  The variables of the
    first line that parses successfully are returned as a dict; if neither
    line parses, an empty dict is returned.  Errors from opening or reading
    the file are not caught here.

    See http://www.gnu.org/software/emacs/manual/html_node/File-Variables.html
    """
    f = open(filename, "r")
    try:
        lines = [f.readline(), f.readline()]
    finally:
        # Close the file even if reading fails.
        f.close()
    for line in lines:
        try:
            return _parseLocalVariables(line)
        except ValueError:
            # Not a declaration line; deliberately fall through to the next
            # candidate line.
            pass
    return {}
70
71
def getTestModules(filename):
    """Return the names listed in the 'test-case-name' local variable of
    'filename', split on commas, or an empty list if the file does not
    declare that variable.
    """
    declared = loadLocalVariables(filename)
    if 'test-case-name' not in declared:
        return []
    return declared['test-case-name'].split(',')
77
78
def isTestFile(filename):
    """Tell whether 'filename' is named like a unit test module: its base
    name starts with 'test_' and its extension is '.py'.  The file system is
    never consulted, so the file does not have to exist.
    """
    name = os.path.basename(filename)
    if not name.startswith('test_'):
        return False
    extension = os.path.splitext(name)[1]
    return extension == '.py'
86
87
def _zshReporterAction():
    """Return the long option names of all IReporter plugins as a single
    string of the form '(name1 name2 ...)'.
    """
    names = []
    for p in plugin.getPlugins(itrial.IReporter):
        names.append(p.longOpt)
    return "(%s)" % (" ".join(names),)
90
91 class Options(usage.Options, app.ReactorSelectionMixin):
92     synopsis = """%s [options] [[file|package|module|TestCase|testmethod]...]
93     """ % (os.path.basename(sys.argv[0]),)
94
95     optFlags = [["help", "h"],
96                 ["rterrors", "e", "realtime errors, print out tracebacks as "
97                  "soon as they occur"],
98                 ["debug", "b", "Run tests in the Python debugger. Will load "
99                  "'.pdbrc' from current directory if it exists."],
100                 ["debug-stacktraces", "B", "Report Deferred creation and "
101                  "callback stack traces"],
102                 ["nopm", None, "don't automatically jump into debugger for "
103                  "postmorteming of exceptions"],
104                 ["dry-run", 'n', "do everything but run the tests"],
105                 ["force-gc", None, "Have Trial run gc.collect() before and "
106                  "after each test case."],
107                 ["profile", None, "Run tests under the Python profiler"],
108                 ["unclean-warnings", None,
109                  "Turn dirty reactor errors into warnings"],
110                 ["until-failure", "u", "Repeat test until it fails"],
111                 ["no-recurse", "N", "Don't recurse into packages"],
112                 ['help-reporters', None,
113                  "Help on available output plugins (reporters)"]
114                 ]
115
116     optParameters = [
117         ["logfile", "l", "test.log", "log file name"],
118         ["random", "z", None,
119          "Run tests in random order using the specified seed"],
120         ['temp-directory', None, '_trial_temp',
121          'Path to use as working directory for tests.'],
122         ['reporter', None, 'verbose',
123          'The reporter to use for this test run.  See --help-reporters for '
124          'more info.']]
125
126     zsh_actions = {"tbformat":"(plain emacs cgitb)",
127                    "reporter":_zshReporterAction}
128     zsh_actionDescr = {"logfile":"log file name",
129                        "random":"random seed"}
130     zsh_extras = ["*:file|module|package|TestCase|testMethod:_files -g '*.py'"]
131
132     fallbackReporter = reporter.TreeReporter
133     extra = None
134     tracer = None
135
136     def __init__(self):
137         self['tests'] = set()
138         usage.Options.__init__(self)
139
140     def opt_coverage(self):
141         """
142         Generate coverage information in the _trial_temp/coverage. Requires
143         Python 2.3.3.
144         """
145         coverdir = 'coverage'
146         print "Setting coverage directory to %s." % (coverdir,)
147         import trace
148
149         # begin monkey patch ---------------------------
150         #   Before Python 2.4, this function asserted that 'filename' had
151         #   to end with '.py'  This is wrong for at least two reasons:
152         #   1.  We might be wanting to find executable line nos in a script
153         #   2.  The implementation should use os.splitext
154         #   This monkey patch is the same function as in the stdlib (v2.3)
155         #   but with the assertion removed.
156         def find_executable_linenos(filename):
157             """Return dict where keys are line numbers in the line number
158             table.
159             """
160             #assert filename.endswith('.py') # YOU BASTARDS
161             try:
162                 prog = open(filename).read()
163                 prog = '\n'.join(prog.splitlines()) + '\n'
164             except IOError, err:
165                 sys.stderr.write("Not printing coverage data for %r: %s\n"
166                                  % (filename, err))
167                 sys.stderr.flush()
168                 return {}
169             code = compile(prog, filename, "exec")
170             strs = trace.find_strings(filename)
171             return trace.find_lines(code, strs)
172
173         trace.find_executable_linenos = find_executable_linenos
174         # end monkey patch ------------------------------
175
176         self.coverdir = os.path.abspath(os.path.join(self['temp-directory'], coverdir))
177         self.tracer = trace.Trace(count=1, trace=0)
178         sys.settrace(self.tracer.globaltrace)
179
180     def opt_testmodule(self, filename):
181         "Filename to grep for test cases (-*- test-case-name)"
182         # If the filename passed to this parameter looks like a test module
183         # we just add that to the test suite.
184         #
185         # If not, we inspect it for an Emacs buffer local variable called
186         # 'test-case-name'.  If that variable is declared, we try to add its
187         # value to the test suite as a module.
188         #
189         # This parameter allows automated processes (like Buildbot) to pass
190         # a list of files to Trial with the general expectation of "these files,
191         # whatever they are, will get tested"
192         if not os.path.isfile(filename):
193             sys.stderr.write("File %r doesn't exist\n" % (filename,))
194             return
195         filename = os.path.abspath(filename)
196         if isTestFile(filename):
197             self['tests'].add(filename)
198         else:
199             self['tests'].update(getTestModules(filename))
200
201     def opt_spew(self):
202         """Print an insanely verbose log of everything that happens.  Useful
203         when debugging freezes or locks in complex code."""
204         sys.settrace(spewer)
205
206
207     def opt_help_reporters(self):
208         synopsis = ("Trial's output can be customized using plugins called "
209                     "Reporters. You can\nselect any of the following "
210                     "reporters using --reporter=<foo>\n")
211         print synopsis
212         for p in plugin.getPlugins(itrial.IReporter):
213             print '   ', p.longOpt, '\t', p.description
214         print
215         sys.exit(0)
216
217     def opt_disablegc(self):
218         """Disable the garbage collector"""
219         gc.disable()
220
221     def opt_tbformat(self, opt):
222         """Specify the format to display tracebacks with. Valid formats are
223         'plain', 'emacs', and 'cgitb' which uses the nicely verbose stdlib
224         cgitb.text function"""
225         try:
226             self['tbformat'] = TBFORMAT_MAP[opt]
227         except KeyError:
228             raise usage.UsageError(
229                 "tbformat must be 'plain', 'emacs', or 'cgitb'.")
230
231     def opt_extra(self, arg):
232         """
233         Add an extra argument.  (This is a hack necessary for interfacing with
234         emacs's `gud'.)
235         """
236         if self.extra is None:
237             self.extra = []
238         self.extra.append(arg)
239     opt_x = opt_extra
240
241     def opt_recursionlimit(self, arg):
242         """see sys.setrecursionlimit()"""
243         try:
244             sys.setrecursionlimit(int(arg))
245         except (TypeError, ValueError):
246             raise usage.UsageError(
247                 "argument to recursionlimit must be an integer")
248
249     def opt_random(self, option):
250         try:
251             self['random'] = long(option)
252         except ValueError:
253             raise usage.UsageError(
254                 "Argument to --random must be a positive integer")
255         else:
256             if self['random'] < 0:
257                 raise usage.UsageError(
258                     "Argument to --random must be a positive integer")
259             elif self['random'] == 0:
260                 self['random'] = long(time.time() * 100)
261
262     def opt_without_module(self, option):
263         """
264         Fake the lack of the specified modules, separated with commas.
265         """
266         for module in option.split(","):
267             if module in sys.modules:
268                 warnings.warn("Module '%s' already imported, "
269                               "disabling anyway." % (module,),
270                               category=RuntimeWarning)
271             sys.modules[module] = None
272
273     def parseArgs(self, *args):
274         self['tests'].update(args)
275         if self.extra is not None:
276             self['tests'].update(self.extra)
277
278     def _loadReporterByName(self, name):
279         for p in plugin.getPlugins(itrial.IReporter):
280             qual = "%s.%s" % (p.module, p.klass)
281             if p.longOpt == name:
282                 return reflect.namedAny(qual)
283         raise usage.UsageError("Only pass names of Reporter plugins to "
284                                "--reporter. See --help-reporters for "
285                                "more info.")
286
287
288     def postOptions(self):
289
290         # Only load reporters now, as opposed to any earlier, to avoid letting
291         # application-defined plugins muck up reactor selecting by importing
292         # t.i.reactor and causing the default to be installed.
293         self['reporter'] = self._loadReporterByName(self['reporter'])
294
295         if 'tbformat' not in self:
296             self['tbformat'] = 'default'
297         if self['nopm']:
298             if not self['debug']:
299                 raise usage.UsageError("you must specify --debug when using "
300                                        "--nopm ")
301             failure.DO_POST_MORTEM = False
302
303
def _initialDebugSetup(config):
    """Turn on the debugging facilities requested in 'config': failure debug
    mode for --debug, and Deferred debugging for either --debug or
    --debug-stacktraces.
    """
    # do this part of debug setup first for easy debugging of import failures
    debugging = config['debug']
    if debugging:
        failure.startDebugMode()
    if debugging or config['debug-stacktraces']:
        defer.setDebugging(True)
310
311
def _getSuite(config):
    """Load the tests named in config['tests'] with the configured loader,
    recursing unless 'no-recurse' is set, and return the result.
    """
    return _getLoader(config).loadByNames(config['tests'],
                                          not config['no-recurse'])
316
317
318 def _getLoader(config):
319     loader = runner.TestLoader()
320     if config['random']:
321         randomer = random.Random()
322         randomer.seed(config['random'])
323         loader.sorter = lambda x : randomer.random()
324         print 'Running tests shuffled with seed %d\n' % config['random']
325     if not config['until-failure']:
326         loader.suiteFactory = runner.DestructiveTestSuite
327     return loader
328
329
def _makeRunner(config):
    """Build the runner.TrialRunner described by 'config'.

    The mode is DRY_RUN when 'dry-run' is set (even if 'debug' is set too),
    DEBUG when only 'debug' is set, and None otherwise.
    """
    if config['dry-run']:
        mode = runner.TrialRunner.DRY_RUN
    elif config['debug']:
        mode = runner.TrialRunner.DEBUG
    else:
        mode = None
    return runner.TrialRunner(
        config['reporter'],
        mode=mode,
        profile=config['profile'],
        logfile=config['logfile'],
        tracebackFormat=config['tbformat'],
        realTimeErrors=config['rterrors'],
        uncleanWarnings=config['unclean-warnings'],
        workingDirectory=config['temp-directory'],
        forceGarbageCollection=config['force-gc'])
345
346
347 def run():
348     if len(sys.argv) == 1:
349         sys.argv.append("--help")
350     config = Options()
351     try:
352         config.parseOptions()
353     except usage.error, ue:
354         raise SystemExit, "%s: %s" % (sys.argv[0], ue)
355     _initialDebugSetup(config)
356     trialRunner = _makeRunner(config)
357     suite = _getSuite(config)
358     if config['until-failure']:
359         test_result = trialRunner.runUntilFailure(suite)
360     else:
361         test_result = trialRunner.run(suite)
362     if config.tracer:
363         sys.settrace(None)
364         results = config.tracer.results()
365         results.write_results(show_missing=1, summary=False,
366                               coverdir=config.coverdir)
367     sys.exit(not test_result.wasSuccessful())
368
Note: See TracBrowser for help on using the browser.