root / trunk / twisted / trial / reporter.py

Revision 25279, 31.9 kB (checked in by exarkun, 8 months ago)

Merge flushwarnings-3487

Author: radix, itamar, exarkun
Reviewer: glyph
Fixes: #3487
Fixes: #3427
Fixes: #2820
Fixes: #3506

Introduce TestCase.flushWarnings, an API for interacting with the Python warnings
module in unit tests. By default, the default trial reporter will write warnings
emitted by tests to its output stream (no longer does the runner print them to
stdout). flushWarnings may be used to prevent this from happening in particular
tests, and its return value may be used to make assertions about what warnings have
been emitted. TestCase.assertWarns and TestCase.callDeprecated are both now
implemented in terms of this more flexible API.

The default reporter includes rudamentary support for duplicate suppression: each
warning will be reported only once per test, but if a warning is emitted by two or
more tests, it will be reported for each test (isolating tests from each other
further by making warning behavior the same regardless of what other tests have run).

There is also some support for failing tests based on warnings emitted. If the Python
warnings filters system is used to turn a warning into an exception, any test which
causes such a warning to be emitted will have an error attributed to it.

Line 
1 # -*- test-case-name: twisted.trial.test.test_reporter -*-
2 #
3 # Copyright (c) 2001-2008 Twisted Matrix Laboratories.
4 # See LICENSE for details.
5 #
6 # Maintainer: Jonathan Lange
7
8 """
9 Defines classes that handle the results of tests.
10 """
11
12 import sys, os
13 import time
14 import warnings
15
16 from twisted.python.compat import set
17 from twisted.python import reflect, log
18 from twisted.python.components import proxyForInterface
19 from twisted.python.failure import Failure
20 from twisted.python.util import untilConcludes
21 from twisted.trial import itrial, util
22
23 from zope.interface import implements
24
25 pyunit = __import__('unittest')
26
27
28 class BrokenTestCaseWarning(Warning):
29     """emitted as a warning when an exception occurs in one of
30     setUp, tearDown, setUpClass, or tearDownClass"""
31
32
33 class SafeStream(object):
34     """
35     Wraps a stream object so that all C{write} calls are wrapped in
36     L{untilConcludes}.
37     """
38
39     def __init__(self, original):
40         self.original = original
41
42     def __getattr__(self, name):
43         return getattr(self.original, name)
44
45     def write(self, *a, **kw):
46         return untilConcludes(self.original.write, *a, **kw)
47
48
49 class TestResult(pyunit.TestResult, object):
50     """
51     Accumulates the results of several L{twisted.trial.unittest.TestCase}s.
52
53     @ivar successes: count the number of successes achieved by the test run.
54     @type successes: C{int}
55     """
56     implements(itrial.IReporter)
57
58     def __init__(self):
59         super(TestResult, self).__init__()
60         self.skips = []
61         self.expectedFailures = []
62         self.unexpectedSuccesses = []
63         self.successes = 0
64         self._timings = []
65
66     def __repr__(self):
67         return ('<%s run=%d errors=%d failures=%d todos=%d dones=%d skips=%d>'
68                 % (reflect.qual(self.__class__), self.testsRun,
69                    len(self.errors), len(self.failures),
70                    len(self.expectedFailures), len(self.skips),
71                    len(self.unexpectedSuccesses)))
72
73     def _getTime(self):
74         return time.time()
75
76     def _getFailure(self, error):
77         """
78         Convert a C{sys.exc_info()}-style tuple to a L{Failure}, if necessary.
79         """
80         if isinstance(error, tuple):
81             return Failure(error[1], error[0], error[2])
82         return error
83
84     def startTest(self, test):
85         """
86         This must be called before the given test is commenced.
87
88         @type test: L{pyunit.TestCase}
89         """
90         super(TestResult, self).startTest(test)
91         self._testStarted = self._getTime()
92
93     def stopTest(self, test):
94         """
95         This must be called after the given test is completed.
96
97         @type test: L{pyunit.TestCase}
98         """
99         super(TestResult, self).stopTest(test)
100         self._lastTime = self._getTime() - self._testStarted
101
102     def addFailure(self, test, fail):
103         """
104         Report a failed assertion for the given test.
105
106         @type test: L{pyunit.TestCase}
107         @type fail: L{Failure} or L{tuple}
108         """
109         self.failures.append((test, self._getFailure(fail)))
110
111     def addError(self, test, error):
112         """
113         Report an error that occurred while running the given test.
114
115         @type test: L{pyunit.TestCase}
116         @type error: L{Failure} or L{tuple}
117         """
118         self.errors.append((test, self._getFailure(error)))
119
120     def addSkip(self, test, reason):
121         """
122         Report that the given test was skipped.
123
124         In Trial, tests can be 'skipped'. Tests are skipped mostly because there
125         is some platform or configuration issue that prevents them from being
126         run correctly.
127
128         @type test: L{pyunit.TestCase}
129         @type reason: L{str}
130         """
131         self.skips.append((test, reason))
132
133     def addUnexpectedSuccess(self, test, todo):
134         """Report that the given test succeeded against expectations.
135
136         In Trial, tests can be marked 'todo'. That is, they are expected to fail.
137         When a test that is expected to fail instead succeeds, it should call
138         this method to report the unexpected success.
139
140         @type test: L{pyunit.TestCase}
141         @type todo: L{unittest.Todo}
142         """
143         # XXX - 'todo' should just be a string
144         self.unexpectedSuccesses.append((test, todo))
145
146     def addExpectedFailure(self, test, error, todo):
147         """Report that the given test failed, and was expected to do so.
148
149         In Trial, tests can be marked 'todo'. That is, they are expected to fail.
150
151         @type test: L{pyunit.TestCase}
152         @type error: L{Failure}
153         @type todo: L{unittest.Todo}
154         """
155         # XXX - 'todo' should just be a string
156         self.expectedFailures.append((test, error, todo))
157
158     def addSuccess(self, test):
159         """Report that the given test succeeded.
160
161         @type test: L{pyunit.TestCase}
162         """
163         self.successes += 1
164
165     def upDownError(self, method, error, warn, printStatus):
166         warnings.warn("upDownError is deprecated in Twisted 8.0.",
167                       category=DeprecationWarning, stacklevel=3)
168
169     def cleanupErrors(self, errs):
170         """Report an error that occurred during the cleanup between tests.
171         """
172         warnings.warn("Cleanup errors are actual errors. Use addError. "
173                       "Deprecated in Twisted 8.0",
174                       category=DeprecationWarning, stacklevel=2)
175
176     def startSuite(self, name):
177         warnings.warn("startSuite deprecated in Twisted 8.0",
178                       category=DeprecationWarning, stacklevel=2)
179
180     def endSuite(self, name):
181         warnings.warn("endSuite deprecated in Twisted 8.0",
182                       category=DeprecationWarning, stacklevel=2)
183
184
185     def done(self):
186         """
187         The test suite has finished running.
188         """
189
190
191
192 class TestResultDecorator(proxyForInterface(itrial.IReporter,
193                                             "_originalReporter")):
194     """
195     Base class for TestResult decorators.
196
197     @ivar _originalReporter: The wrapped instance of reporter.
198     @type _originalReporter: A provider of L{itrial.IReporter}
199     """
200
201     implements(itrial.IReporter)
202
203
204
205 class UncleanWarningsReporterWrapper(TestResultDecorator):
206     """
207     A wrapper for a reporter that converts L{util.DirtyReactorError}s
208     to warnings.
209     """
210     implements(itrial.IReporter)
211
212     def addError(self, test, error):
213         """
214         If the error is a L{util.DirtyReactorError}, instead of
215         reporting it as a normal error, throw a warning.
216         """
217
218         if (isinstance(error, Failure)
219             and error.check(util.DirtyReactorAggregateError)):
220             warnings.warn(error.getErrorMessage())
221         else:
222             self._originalReporter.addError(test, error)
223
224
225
226 class _AdaptedReporter(TestResultDecorator):
227     """
228     TestResult decorator that makes sure that addError only gets tests that
229     have been adapted with a particular test adapter.
230     """
231
232     def __init__(self, original, testAdapter):
233         """
234         Construct an L{_AdaptedReporter}.
235
236         @param original: An {itrial.IReporter}.
237         @param testAdapter: A callable that returns an L{itrial.ITestCase}.
238         """
239         TestResultDecorator.__init__(self, original)
240         self.testAdapter = testAdapter
241
242
243     def addError(self, test, error):
244         """
245         See L{itrial.IReporter}.
246         """
247         test = self.testAdapter(test)
248         return self._originalReporter.addError(test, error)
249
250
251     def addExpectedFailure(self, test, failure, todo):
252         """
253         See L{itrial.IReporter}.
254         """
255         return self._originalReporter.addExpectedFailure(
256             self.testAdapter(test), failure, todo)
257
258
259     def addFailure(self, test, failure):
260         """
261         See L{itrial.IReporter}.
262         """
263         test = self.testAdapter(test)
264         return self._originalReporter.addFailure(test, failure)
265
266
267     def addSkip(self, test, skip):
268         """
269         See L{itrial.IReporter}.
270         """
271         test = self.testAdapter(test)
272         return self._originalReporter.addSkip(test, skip)
273
274
275     def addUnexpectedSuccess(self, test, todo):
276         """
277         See L{itrial.IReporter}.
278         """
279         test = self.testAdapter(test)
280         return self._originalReporter.addUnexpectedSuccess(test, todo)
281
282
283     def startTest(self, test):
284         """
285         See L{itrial.IReporter}.
286         """
287         return self._originalReporter.startTest(self.testAdapter(test))
288
289
290     def stopTest(self, test):
291         """
292         See L{itrial.IReporter}.
293         """
294         return self._originalReporter.stopTest(self.testAdapter(test))
295
296
297
298 class Reporter(TestResult):
299     """
300     A basic L{TestResult} with support for writing to a stream.
301
302     @ivar _startTime: The time when the first test was started. It defaults to
303         C{None}, which means that no test was actually launched.
304     @type _startTime: C{float} or C{NoneType}
305
306     @ivar _warningCache: A C{set} of tuples of warning message (file, line,
307         text, category) which have already been written to the output stream
308         during the currently executing test.  This is used to avoid writing
309         duplicates of the same warning to the output stream.
310     @type _warningCache: C{set}
311
312     @ivar _publisher: The log publisher which will be observed for warning
313         events.
314     @type _publisher: L{LogPublisher} (or another type sufficiently similar)
315     """
316
317     implements(itrial.IReporter)
318
319     _separator = '-' * 79
320     _doubleSeparator = '=' * 79
321
322     def __init__(self, stream=sys.stdout, tbformat='default', realtime=False,
323                  publisher=None):
324         super(Reporter, self).__init__()
325         self._stream = SafeStream(stream)
326         self.tbformat = tbformat
327         self.realtime = realtime
328         self._startTime = None
329         self._warningCache = set()
330
331         # Start observing log events so as to be able to report warnings.
332         self._publisher = publisher
333         if publisher is not None:
334             publisher.addObserver(self._observeWarnings)
335
336
337     def _observeWarnings(self, event):
338         """
339         Observe warning events and write them to C{self._stream}.
340
341         This method is a log observer which will be registered with
342         C{self._publisher.addObserver}.
343
344         @param event: A C{dict} from the logging system.  If it has a
345             C{'warning'} key, a logged warning will be extracted from it and
346             possibly written to C{self.stream}.
347         """
348         if 'warning' in event:
349             key = (event['filename'], event['lineno'],
350                    event['category'].split('.')[-1],
351                    str(event['warning']))
352             if key not in self._warningCache:
353                 self._warningCache.add(key)
354                 self._stream.write('%s:%s: %s: %s\n' % key)
355
356
357     def stream(self):
358         warnings.warn("stream is deprecated in Twisted 8.0.",
359                       category=DeprecationWarning, stacklevel=2)
360         return self._stream
361     stream = property(stream)
362
363
364     def separator(self):
365         warnings.warn("separator is deprecated in Twisted 8.0.",
366                       category=DeprecationWarning, stacklevel=2)
367         return self._separator
368     separator = property(separator)
369
370
371     def startTest(self, test):
372         """
373         Called when a test begins to run. Records the time when it was first
374         called and resets the warning cache.
375
376         @param test: L{ITestCase}
377         """
378         super(Reporter, self).startTest(test)
379         if self._startTime is None:
380             self._startTime = self._getTime()
381         self._warningCache = set()
382
383
384     def addFailure(self, test, fail):
385         """
386         Called when a test fails. If L{realtime} is set, then it prints the
387         error to the stream.
388
389         @param test: L{ITestCase} that failed.
390         @param fail: L{failure.Failure} containing the error.
391         """
392         super(Reporter, self).addFailure(test, fail)
393         if self.realtime:
394             fail = self.failures[-1][1] # guarantee it's a Failure
395             self._write(self._formatFailureTraceback(fail))
396
397
398     def addError(self, test, error):
399         """
400         Called when a test raises an error. If L{realtime} is set, then it
401         prints the error to the stream.
402
403         @param test: L{ITestCase} that raised the error.
404         @param error: L{failure.Failure} containing the error.
405         """
406         error = self._getFailure(error)
407         super(Reporter, self).addError(test, error)
408         if self.realtime:
409             error = self.errors[-1][1] # guarantee it's a Failure
410             self._write(self._formatFailureTraceback(error))
411
412
413     def write(self, format, *args):
414         warnings.warn("write is deprecated in Twisted 8.0.",
415                       category=DeprecationWarning, stacklevel=2)
416         self._write(format, *args)
417
418
419     def _write(self, format, *args):
420         """
421         Safely write to the reporter's stream.
422
423         @param format: A format string to write.
424         @param *args: The arguments for the format string.
425         """
426         s = str(format)
427         assert isinstance(s, type(''))
428         if args:
429             self._stream.write(s % args)
430         else:
431             self._stream.write(s)
432         untilConcludes(self._stream.flush)
433
434
435     def writeln(self, format, *args):
436         warnings.warn("writeln is deprecated in Twisted 8.0.",
437                       category=DeprecationWarning, stacklevel=2)
438         self._writeln(format, *args)
439
440
441     def _writeln(self, format, *args):
442         """
443         Safely write a line to the reporter's stream. Newline is appended to
444         the format string.
445
446         @param format: A format string to write.
447         @param *args: The arguments for the format string.
448         """
449         self._write(format, *args)
450         self._write('\n')
451
452
453     def upDownError(self, method, error, warn, printStatus):
454         super(Reporter, self).upDownError(method, error, warn, printStatus)
455         if warn:
456             tbStr = self._formatFailureTraceback(error)
457             log.msg(tbStr)
458             msg = ("caught exception in %s, your TestCase is broken\n\n%s"
459                    % (method, tbStr))
460             warnings.warn(msg, BrokenTestCaseWarning, stacklevel=2)
461
462
463     def cleanupErrors(self, errs):
464         super(Reporter, self).cleanupErrors(errs)
465         warnings.warn("%s\n%s" % ("REACTOR UNCLEAN! traceback(s) follow: ",
466                                   self._formatFailureTraceback(errs)),
467                       BrokenTestCaseWarning)
468
469
470     def _trimFrames(self, frames):
471         # when a method fails synchronously, the stack looks like this:
472         #  [0]: defer.maybeDeferred()
473         #  [1]: utils.runWithWarningsSuppressed()
474         #  [2:-2]: code in the test method which failed
475         #  [-1]: unittest.fail
476
477         # when a method fails inside a Deferred (i.e., when the test method
478         # returns a Deferred, and that Deferred's errback fires), the stack
479         # captured inside the resulting Failure looks like this:
480         #  [0]: defer.Deferred._runCallbacks
481         #  [1:-2]: code in the testmethod which failed
482         #  [-1]: unittest.fail
483
484         # as a result, we want to trim either [maybeDeferred,runWWS] or
485         # [Deferred._runCallbacks] from the front, and trim the
486         # [unittest.fail] from the end.
487
488         # There is also another case, when the test method is badly defined and
489         # contains extra arguments.
490
491         newFrames = list(frames)
492
493         if len(frames) < 2:
494             return newFrames
495
496         first = newFrames[0]
497         second = newFrames[1]
498         if (first[0] == "maybeDeferred"
499             and os.path.splitext(os.path.basename(first[1]))[0] == 'defer'
500             and second[0] == "runWithWarningsSuppressed"
501             and os.path.splitext(os.path.basename(second[1]))[0] == 'utils'):
502             newFrames = newFrames[2:]
503         elif (first[0] == "_runCallbacks"
504               and os.path.splitext(os.path.basename(first[1]))[0] == 'defer'):
505             newFrames = newFrames[1:]
506
507         if not newFrames:
508             # The method fails before getting called, probably an argument problem
509             return newFrames
510
511         last = newFrames[-1]
512         if (last[0].startswith('fail')
513             and os.path.splitext(os.path.basename(last[1]))[0] == 'unittest'):
514             newFrames = newFrames[:-1]
515
516         return newFrames
517
518
519     def _formatFailureTraceback(self, fail):
520         if isinstance(fail, str):
521             return fail.rstrip() + '\n'
522         fail.frames, frames = self._trimFrames(fail.frames), fail.frames
523         result = fail.getTraceback(detail=self.tbformat, elideFrameworkCode=True)
524         fail.frames = frames
525         return result
526
527
528     def _printResults(self, flavour, errors, formatter):
529         """
530         Print a group of errors to the stream.
531
532         @param flavour: A string indicating the kind of error (e.g. 'TODO').
533         @param errors: A list of errors, often L{failure.Failure}s, but
534             sometimes 'todo' errors.
535         @param formatter: A callable that knows how to format the errors.
536         """
537         for content in errors:
538             self._writeln(self._doubleSeparator)
539             self._writeln('%s: %s' % (flavour, content[0].id()))
540             self._writeln('')
541             self._write(formatter(*(content[1:])))
542
543
544     def _printExpectedFailure(self, error, todo):
545         return 'Reason: %r\n%s' % (todo.reason,
546                                    self._formatFailureTraceback(error))
547
548
549     def _printUnexpectedSuccess(self, todo):
550         ret = 'Reason: %r\n' % (todo.reason,)
551         if todo.errors:
552             ret += 'Expected errors: %s\n' % (', '.join(todo.errors),)
553         return ret
554
555
556     def printErrors(self):
557         """
558         Print all of the non-success results in full to the stream.
559         """
560         warnings.warn("printErrors is deprecated in Twisted 8.0.",
561                       category=DeprecationWarning, stacklevel=2)
562         self._printErrors()
563
564
565     def _printErrors(self):
566         """
567         Print all of the non-success results to the stream in full.
568         """
569         self._write('\n')
570         self._printResults('[SKIPPED]', self.skips, lambda x : '%s\n' % x)
571         self._printResults('[TODO]', self.expectedFailures,
572                            self._printExpectedFailure)
573         self._printResults('[FAIL]', self.failures,
574                            self._formatFailureTraceback)
575         self._printResults('[ERROR]', self.errors,
576                            self._formatFailureTraceback)
577         self._printResults('[SUCCESS!?!]', self.unexpectedSuccesses,
578                            self._printUnexpectedSuccess)
579
580
581     def _getSummary(self):
582         """
583         Return a formatted count of tests status results.
584         """
585         summaries = []
586         for stat in ("skips", "expectedFailures", "failures", "errors",
587                      "unexpectedSuccesses"):
588             num = len(getattr(self, stat))
589             if num:
590                 summaries.append('%s=%d' % (stat, num))
591         if self.successes:
592            summaries.append('successes=%d' % (self.successes,))
593         summary = (summaries and ' ('+', '.join(summaries)+')') or ''
594         return summary
595
596
597     def printSummary(self):
598         """
599         Print a line summarising the test results to the stream.
600         """
601         warnings.warn("printSummary is deprecated in Twisted 8.0.",
602                       category=DeprecationWarning, stacklevel=2)
603         self._printSummary()
604
605
606     def _printSummary(self):
607         """
608         Print a line summarising the test results to the stream.
609         """
610         summary = self._getSummary()
611         if self.wasSuccessful():
612             status = "PASSED"
613         else:
614             status = "FAILED"
615         self._write("%s%s\n", status, summary)
616
617
618     def done(self):
619         """
620         Summarize the result of the test run.
621
622         The summary includes a report of all of the errors, todos, skips and
623         so forth that occurred during the run. It also includes the number of
624         tests that were run and how long it took to run them (not including
625         load time).
626
627         Expects that L{_printErrors}, L{_writeln}, L{_write}, L{_printSummary}
628         and L{_separator} are all implemented.
629         """
630         if self._publisher is not None:
631             self._publisher.removeObserver(self._observeWarnings)
632         self._printErrors()
633         self._writeln(self._separator)
634         if self._startTime is not None:
635             self._writeln('Ran %d tests in %.3fs', self.testsRun,
636                           time.time() - self._startTime)
637         self._write('\n')
638         self._printSummary()
639
640
641
642 class MinimalReporter(Reporter):
643     """
644     A minimalist reporter that prints only a summary of the test result, in
645     the form of (timeTaken, #tests, #tests, #errors, #failures, #skips).
646     """
647
648     def _printErrors(self):
649         """
650         Don't print a detailed summary of errors. We only care about the
651         counts.
652         """
653
654
655     def _printSummary(self):
656         """
657         Print out a one-line summary of the form:
658         '%(runtime) %(number_of_tests) %(number_of_tests) %(num_errors)
659         %(num_failures) %(num_skips)'
660         """
661         numTests = self.testsRun
662         if self._startTime is not None:
663             timing = self._getTime() - self._startTime
664         else:
665             timing = 0
666         t = (timing, numTests, numTests,
667              len(self.errors), len(self.failures), len(self.skips))
668         self._writeln(' '.join(map(str, t)))
669
670
671
672 class TextReporter(Reporter):
673     """
674     Simple reporter that prints a single character for each test as it runs,
675     along with the standard Trial summary text.
676     """
677
678     def addSuccess(self, test):
679         super(TextReporter, self).addSuccess(test)
680         self._write('.')
681
682
683     def addError(self, *args):
684         super(TextReporter, self).addError(*args)
685         self._write('E')
686
687
688     def addFailure(self, *args):
689         super(TextReporter, self).addFailure(*args)
690         self._write('F')
691
692
693     def addSkip(self, *args):
694         super(TextReporter, self).addSkip(*args)
695         self._write('S')
696
697
698     def addExpectedFailure(self, *args):
699         super(TextReporter, self).addExpectedFailure(*args)
700         self._write('T')
701
702
703     def addUnexpectedSuccess(self, *args):
704         super(TextReporter, self).addUnexpectedSuccess(*args)
705         self._write('!')
706
707
708
709 class VerboseTextReporter(Reporter):
710     """
711     A verbose reporter that prints the name of each test as it is running.
712
713     Each line is printed with the name of the test, followed by the result of
714     that test.
715     """
716
717     # This is actually the bwverbose option
718
719     def startTest(self, tm):
720         self._write('%s ... ', tm.id())
721         super(VerboseTextReporter, self).startTest(tm)
722
723
724     def addSuccess(self, test):
725         super(VerboseTextReporter, self).addSuccess(test)
726         self._write('[OK]')
727
728
729     def addError(self, *args):
730         super(VerboseTextReporter, self).addError(*args)
731         self._write('[ERROR]')
732
733
734     def addFailure(self, *args):
735         super(VerboseTextReporter, self).addFailure(*args)
736         self._write('[FAILURE]')
737
738
739     def addSkip(self, *args):
740         super(VerboseTextReporter, self).addSkip(*args)
741         self._write('[SKIPPED]')
742
743
744     def addExpectedFailure(self, *args):
745         super(VerboseTextReporter, self).addExpectedFailure(*args)
746         self._write('[TODO]')
747
748
749     def addUnexpectedSuccess(self, *args):
750         super(VerboseTextReporter, self).addUnexpectedSuccess(*args)
751         self._write('[SUCCESS!?!]')
752
753
754     def stopTest(self, test):
755         super(VerboseTextReporter, self).stopTest(test)
756         self._write('\n')
757
758
759
760 class TimingTextReporter(VerboseTextReporter):
761     """
762     Prints out each test as it is running, followed by the time taken for each
763     test to run.
764     """
765
766     def stopTest(self, method):
767         """
768         Mark the test as stopped, and write the time it took to run the test
769         to the stream.
770         """
771         super(TimingTextReporter, self).stopTest(method)
772         self._write("(%.03f secs)\n" % self._lastTime)
773
774
775
776 class _AnsiColorizer(object):
777     """
778     A colorizer is an object that loosely wraps around a stream, allowing
779     callers to write text to the stream in a particular color.
780
781     Colorizer classes must implement C{supported()} and C{write(text, color)}.
782     """
783     _colors = dict(black=30, red=31, green=32, yellow=33,
784                    blue=34, magenta=35, cyan=36, white=37)
785
786     def __init__(self, stream):
787         self.stream = stream
788
789     def supported(cls, stream=sys.stdout):
790         """
791         A class method that returns True if the current platform supports
792         coloring terminal output using this method. Returns False otherwise.
793         """
794         if not stream.isatty():
795             return False # auto color only on TTYs
796         try:
797             import curses
798         except ImportError:
799             return False
800         else:
801             try:
802                 try:
803                     return curses.tigetnum("colors") > 2
804                 except curses.error:
805                     curses.setupterm()
806                     return curses.tigetnum("colors") > 2
807             except:
808                 # guess false in case of error
809                 return False
810     supported = classmethod(supported)
811
812     def write(self, text, color):
813         """
814         Write the given text to the stream in the given color.
815
816         @param text: Text to be written to the stream.
817
818         @param color: A string label for a color. e.g. 'red', 'white'.
819         """
820         color = self._colors[color]
821         self.stream.write('\x1b[%s;1m%s\x1b[0m' % (color, text))
822
823
824 class _Win32Colorizer(object):
825     """
826     See _AnsiColorizer docstring.
827     """
828     def __init__(self, stream):
829         from win32console import GetStdHandle, STD_OUTPUT_HANDLE, \
830              FOREGROUND_RED, FOREGROUND_BLUE, FOREGROUND_GREEN, \
831              FOREGROUND_INTENSITY
832         red, green, blue, bold = (FOREGROUND_RED, FOREGROUND_GREEN,
833                                   FOREGROUND_BLUE, FOREGROUND_INTENSITY)
834         self.stream = stream
835         self.screenBuffer = GetStdHandle(STD_OUTPUT_HANDLE)
836         self._colors = {
837             'normal': red | green | blue,
838             'red': red | bold,
839             'green': green | bold,
840             'blue': blue | bold,
841             'yellow': red | green | bold,
842             'magenta': red | blue | bold,
843             'cyan': green | blue | bold,
844             'white': red | green | blue | bold
845             }
846
847     def supported(cls, stream=sys.stdout):
848         try:
849             import win32console
850             screenBuffer = win32console.GetStdHandle(
851                 win32console.STD_OUTPUT_HANDLE)
852         except ImportError:
853             return False
854         import pywintypes
855         try:
856             screenBuffer.SetConsoleTextAttribute(
857                 win32console.FOREGROUND_RED |
858                 win32console.FOREGROUND_GREEN |
859                 win32console.FOREGROUND_BLUE)
860         except pywintypes.error:
861             return False
862         else:
863             return True
864     supported = classmethod(supported)
865
866     def write(self, text, color):
867         color = self._colors[color]
868         self.screenBuffer.SetConsoleTextAttribute(color)
869         self.stream.write(text)
870         self.screenBuffer.SetConsoleTextAttribute(self._colors['normal'])
871
872
873 class _NullColorizer(object):
874     """
875     See _AnsiColorizer docstring.
876     """
877     def __init__(self, stream):
878         self.stream = stream
879
880     def supported(cls, stream=sys.stdout):
881         return True
882     supported = classmethod(supported)
883
884     def write(self, text, color):
885         self.stream.write(text)
886
887
888
889 class TreeReporter(Reporter):
890     """
891     Print out the tests in the form a tree.
892
893     Tests are indented according to which class and module they belong.
894     Results are printed in ANSI color.
895     """
896
897     currentLine = ''
898     indent = '  '
899     columns = 79
900
901     FAILURE = 'red'
902     ERROR = 'red'
903     TODO = 'blue'
904     SKIP = 'blue'
905     TODONE = 'red'
906     SUCCESS = 'green'
907
908     def __init__(self, stream=sys.stdout, *args, **kwargs):
909         super(TreeReporter, self).__init__(stream, *args, **kwargs)
910         self._lastTest = []
911         for colorizer in [_Win32Colorizer, _AnsiColorizer, _NullColorizer]:
912             if colorizer.supported(stream):
913                 self._colorizer = colorizer(stream)
914                 break
915
916     def getDescription(self, test):
917         """
918         Return the name of the method which 'test' represents.  This is
919         what gets displayed in the leaves of the tree.
920
921         e.g. getDescription(TestCase('test_foo')) ==> test_foo
922         """
923         return test.id().split('.')[-1]
924
925     def addSuccess(self, test):
926         super(TreeReporter, self).addSuccess(test)
927         self.endLine('[OK]', self.SUCCESS)
928
929     def addError(self, *args):
930         super(TreeReporter, self).addError(*args)
931         self.endLine('[ERROR]', self.ERROR)
932
933     def addFailure(self, *args):
934         super(TreeReporter, self).addFailure(*args)
935         self.endLine('[FAIL]', self.FAILURE)
936
937     def addSkip(self, *args):
938         super(TreeReporter, self).addSkip(*args)
939         self.endLine('[SKIPPED]', self.SKIP)
940
941     def addExpectedFailure(self, *args):
942         super(TreeReporter, self).addExpectedFailure(*args)
943         self.endLine('[TODO]', self.TODO)
944
945     def addUnexpectedSuccess(self, *args):
946         super(TreeReporter, self).addUnexpectedSuccess(*args)
947         self.endLine('[SUCCESS!?!]', self.TODONE)
948
949     def _write(self, format, *args):
950         if args:
951             format = format % args
952         self.currentLine = format
953         super(TreeReporter, self)._write(self.currentLine)
954
955
956     def _getPreludeSegments(self, testID):
957         """
958         Return a list of all non-leaf segments to display in the tree.
959
960         Normally this is the module and class name.
961         """
962         segments = testID.split('.')[:-1]
963         if len(segments) == 0:
964             return segments
965         segments = [
966             seg for seg in '.'.join(segments[:-1]), segments[-1]
967             if len(seg) > 0]
968         return segments
969
970
971     def _testPrelude(self, testID):
972         """
973         Write the name of the test to the stream, indenting it appropriately.
974
975         If the test is the first test in a new 'branch' of the tree, also
976         write all of the parents in that branch.
977         """
978         segments = self._getPreludeSegments(testID)
979         indentLevel = 0
980         for seg in segments:
981             if indentLevel < len(self._lastTest):
982                 if seg != self._lastTest[indentLevel]:
983                     self._write('%s%s\n' % (self.indent * indentLevel, seg))
984             else:
985                 self._write('%s%s\n' % (self.indent * indentLevel, seg))
986             indentLevel += 1
987         self._lastTest = segments
988
989
990     def cleanupErrors(self, errs):
991         self._colorizer.write('    cleanup errors', self.ERROR)
992         self.endLine('[ERROR]', self.ERROR)
993         super(TreeReporter, self).cleanupErrors(errs)
994
995     def upDownError(self, method, error, warn, printStatus):
996         self._colorizer.write("  %s" % method, self.ERROR)
997         if printStatus:
998             self.endLine('[ERROR]', self.ERROR)
999         super(TreeReporter, self).upDownError(method, error, warn, printStatus)
1000
1001     def startTest(self, test):
1002         """
1003         Called when C{test} starts. Writes the tests name to the stream using
1004         a tree format.
1005         """
1006         self._testPrelude(test.id())
1007         self._write('%s%s ... ' % (self.indent * (len(self._lastTest)),
1008                                    self.getDescription(test)))
1009         super(TreeReporter, self).startTest(test)
1010
1011
1012     def endLine(self, message, color):
1013         """
1014         Print 'message' in the given color.
1015
1016         @param message: A string message, usually '[OK]' or something similar.
1017         @param color: A string color, 'red', 'green' and so forth.
1018         """
1019         spaces = ' ' * (self.columns - len(self.currentLine) - len(message))
1020         super(TreeReporter, self)._write(spaces)
1021         self._colorizer.write(message, color)
1022         super(TreeReporter, self)._write("\n")
1023
1024
1025     def _printSummary(self):
1026         """
1027         Print a line summarising the test results to the stream, and color the
1028         status result.
1029         """
1030         summary = self._getSummary()
1031         if self.wasSuccessful():
1032             status = "PASSED"
1033             color = self.SUCCESS
1034         else:
1035             status = "FAILED"
1036             color = self.FAILURE
1037         self._colorizer.write(status, color)
1038         self._write("%s\n", summary)
Note: See TracBrowser for help on using the browser.