Ticket #3682: test_proxy.py

File test_proxy.py, 52.2 KB (added by gthomas, 6 years ago)

copy of test_internet.py with test_dontPausePullConsumerOnWriteAlot added to TestProducer

Line 
1# Copyright (c) 2001-2009 Twisted Matrix Laboratories.
2# See LICENSE for details.
3
4"""
5Tests for lots of functionality provided by L{twisted.internet}.
6"""
7
8import os
9import sys
10import time
11
12from twisted.trial import unittest
13from twisted.internet import reactor, protocol, error, abstract, defer
14from twisted.internet import interfaces, base
15
16from twisted.test.time_helpers import Clock
17
18try:
19    from twisted.internet import ssl
20except ImportError:
21    ssl = None
22if ssl and not ssl.supported:
23    ssl = None
24
25from twisted.internet.defer import Deferred, maybeDeferred
26from twisted.python import util, runtime
27
28
29
class ThreePhaseEventTests(unittest.TestCase):
    """
    Exercise L{base._ThreePhaseEvent}, the private helper which backs the
    reactor's system event triggers.
    """
    def setUp(self):
        """
        Build the fixtures shared by the tests: a do-nothing trigger, a unique
        argument object, and a fresh event.
        """
        def noop(x):
            return None
        self.event = base._ThreePhaseEvent()
        self.arg = object()
        self.trigger = noop


    def _assertTriggerStored(self, phase):
        """
        Add C{self.trigger} with C{self.arg} to C{phase} and verify that the
        event attribute named after that phase holds exactly that one entry.
        """
        self.event.addTrigger(phase, self.trigger, self.arg)
        stored = getattr(self.event, phase)
        self.assertEqual(stored, [(self.trigger, (self.arg,), {})])


    def _assertTriggerExceptionLogged(self, phase, followingPhase):
        """
        Register a raising trigger followed by a recording trigger in
        C{phase}, plus a recording trigger in C{followingPhase}; fire the
        event and verify that both recording triggers ran and that exactly
        one error of the raised type was logged.
        """
        fired = []

        class DummyException(Exception):
            pass

        def explode():
            raise DummyException()

        self.event.addTrigger(phase, explode)
        self.event.addTrigger(phase, fired.append, phase)
        self.event.addTrigger(followingPhase, fired.append, followingPhase)
        self.event.fireEvent()
        self.assertEqual(fired, [phase, followingPhase])
        logged = self.flushLoggedErrors(DummyException)
        self.assertEqual(len(logged), 1)


    def test_addInvalidPhase(self):
        """
        Passing an unknown phase name to L{_ThreePhaseEvent.addTrigger}
        raises L{KeyError}.
        """
        self.assertRaises(
            KeyError, self.event.addTrigger, 'xxx', self.trigger, self.arg)


    def test_addBeforeTrigger(self):
        """
        L{_ThreePhaseEvent.addTrigger} called with the C{'before'} phase
        records the callable and its arguments in the before list.
        """
        self._assertTriggerStored('before')


    def test_addDuringTrigger(self):
        """
        L{_ThreePhaseEvent.addTrigger} called with the C{'during'} phase
        records the callable and its arguments in the during list.
        """
        self._assertTriggerStored('during')


    def test_addAfterTrigger(self):
        """
        L{_ThreePhaseEvent.addTrigger} called with the C{'after'} phase
        records the callable and its arguments in the after list.
        """
        self._assertTriggerStored('after')


    def test_removeTrigger(self):
        """
        The opaque handle returned by L{_ThreePhaseEvent.addTrigger} can be
        given to L{_ThreePhaseEvent.removeTrigger} to take the trigger out
        again.
        """
        added = self.event.addTrigger('before', self.trigger, self.arg)
        self.event.removeTrigger(added)
        self.assertEqual(self.event.before, [])


    def test_removeNonexistentTrigger(self):
        """
        L{_ThreePhaseEvent.removeTrigger} raises L{ValueError} for an object
        which never came from L{_ThreePhaseEvent.addTrigger}.
        """
        bogus = object()
        self.assertRaises(ValueError, self.event.removeTrigger, bogus)


    def test_removeRemovedTrigger(self):
        """
        Removing the same handle a second time with
        L{_ThreePhaseEvent.removeTrigger} raises L{ValueError}.
        """
        added = self.event.addTrigger('before', self.trigger, self.arg)
        self.event.removeTrigger(added)
        self.assertRaises(ValueError, self.event.removeTrigger, added)


    def test_removeAlmostValidTrigger(self):
        """
        L{_ThreePhaseEvent.removeTrigger} raises L{KeyError} when given
        something shaped like a trigger handle whose phase is not a known
        phase.
        """
        almostHandle = ('xxx', self.trigger, (self.arg,), {})
        self.assertRaises(KeyError, self.event.removeTrigger, almostHandle)


    def test_fireEvent(self):
        """
        L{_ThreePhaseEvent.fireEvent} runs the I{before} triggers, then the
        I{during} triggers, then the I{after} triggers, keeping registration
        order within each phase.
        """
        fired = []
        # Register deliberately out of phase order to show that the firing
        # order depends on the phase and not on when a trigger was added.
        registrations = (
            ('first', ('after', 'during', 'before')),
            ('second', ('before', 'during', 'after')))
        for ordinal, phases in registrations:
            for phase in phases:
                self.event.addTrigger(phase, fired.append, (ordinal, phase))

        self.assertEqual(fired, [])
        self.event.fireEvent()
        expected = [
            ('first', 'before'), ('second', 'before'),
            ('first', 'during'), ('second', 'during'),
            ('first', 'after'), ('second', 'after')]
        self.assertEqual(fired, expected)


    def test_asynchronousBefore(self):
        """
        When a I{before} trigger returns a L{Deferred},
        L{_ThreePhaseEvent.fireEvent} holds back the I{during} phase until
        that L{Deferred} fires.
        """
        fired = []
        gate = Deferred()
        def waitForGate():
            return gate
        self.event.addTrigger('before', waitForGate)
        self.event.addTrigger('during', fired.append, 'during')
        self.event.addTrigger('after', fired.append, 'after')

        self.assertEqual(fired, [])
        self.event.fireEvent()
        self.assertEqual(fired, [])
        gate.callback(None)
        self.assertEqual(fired, ['during', 'after'])


    def test_beforeTriggerException(self):
        """
        A synchronous exception from a before-phase trigger is logged and
        does not prevent the remaining triggers from running.
        """
        self._assertTriggerExceptionLogged('before', 'during')


    def test_duringTriggerException(self):
        """
        A synchronous exception from a during-phase trigger is logged and
        does not prevent the remaining triggers from running.
        """
        self._assertTriggerExceptionLogged('during', 'after')


    def test_synchronousRemoveAlreadyExecutedBefore(self):
        """
        A before-phase trigger which removes another before-phase trigger
        that has already run causes a L{DeprecationWarning} to be emitted.
        """
        fired = []

        def removeTrigger():
            self.event.removeTrigger(alreadyRun)

        alreadyRun = self.event.addTrigger(
            'before', fired.append, ('first', 'before'))
        self.event.addTrigger('before', removeTrigger)
        self.event.addTrigger('before', fired.append, ('second', 'before'))
        self.assertWarns(
            DeprecationWarning,
            "Removing already-fired system event triggers will raise an "
            "exception in a future version of Twisted.",
            __file__,
            self.event.fireEvent)
        self.assertEqual(fired, [('first', 'before'), ('second', 'before')])


    def test_synchronousRemovePendingBefore(self):
        """
        A before-phase trigger removed by an earlier before-phase trigger,
        before it has had a chance to run, is never run.
        """
        fired = []
        def removePending():
            return self.event.removeTrigger(pending)
        self.event.addTrigger('before', removePending)
        pending = self.event.addTrigger(
            'before', fired.append, ('first', 'before'))
        self.event.addTrigger('before', fired.append, ('second', 'before'))
        self.event.fireEvent()
        self.assertEqual(fired, [('second', 'before')])


    def test_synchronousBeforeRemovesDuring(self):
        """
        A during-phase trigger removed by a before-phase trigger is never
        run.
        """
        fired = []
        def removeDuring():
            return self.event.removeTrigger(duringHandle)
        self.event.addTrigger('before', removeDuring)
        duringHandle = self.event.addTrigger('during', fired.append, 'during')
        self.event.addTrigger('after', fired.append, 'after')
        self.event.fireEvent()
        self.assertEqual(fired, ['after'])


    def test_asynchronousBeforeRemovesDuring(self):
        """
        A during-phase trigger removed while the event is still waiting on a
        L{Deferred} returned by a before-phase trigger is never run.
        """
        fired = []
        gate = Deferred()
        def waitForGate():
            return gate
        self.event.addTrigger('before', waitForGate)
        duringHandle = self.event.addTrigger('during', fired.append, 'during')
        self.event.addTrigger('after', fired.append, 'after')
        self.event.fireEvent()
        self.event.removeTrigger(duringHandle)
        gate.callback(None)
        self.assertEqual(fired, ['after'])


    def test_synchronousBeforeRemovesConspicuouslySimilarDuring(self):
        """
        Removing a during-phase trigger from a before-phase trigger works
        without a warning even when an already-executed before-phase trigger
        differs from it only by phase; the during-phase trigger is not run.
        """
        fired = []
        def trigger():
            fired.append('trigger')
        def removeLookalike():
            return self.event.removeTrigger(lookalike)
        self.event.addTrigger('before', trigger)
        self.event.addTrigger('before', removeLookalike)
        lookalike = self.event.addTrigger('during', trigger)
        self.event.fireEvent()
        self.assertEqual(fired, ['trigger'])


    def test_synchronousRemovePendingDuring(self):
        """
        A during-phase trigger removed by an earlier during-phase trigger,
        before it has had a chance to run, is never run.
        """
        fired = []
        def removePending():
            return self.event.removeTrigger(pending)
        self.event.addTrigger('during', removePending)
        pending = self.event.addTrigger(
            'during', fired.append, ('first', 'during'))
        self.event.addTrigger('during', fired.append, ('second', 'during'))
        self.event.fireEvent()
        self.assertEqual(fired, [('second', 'during')])


    def test_triggersRunOnce(self):
        """
        Triggers run on the first L{_ThreePhaseEvent.fireEvent} call only; a
        second call does not run them again.
        """
        fired = []
        for phase in ('before', 'during', 'after'):
            self.event.addTrigger(phase, fired.append, phase)
        self.event.fireEvent()
        self.event.fireEvent()
        self.assertEqual(fired, ['before', 'during', 'after'])


    def test_finishedBeforeTriggersCleared(self):
        """
        By the time the first during-phase trigger executes, the temporary
        list L{_ThreePhaseEvent.finishedBefore} is empty and the state is
        back to C{'BASE'}.
        """
        fired = []
        def inspectDuring():
            fired.append('during')
            self.assertEqual(self.event.finishedBefore, [])
            self.assertEqual(self.event.state, 'BASE')
        self.event.addTrigger('before', fired.append, 'before')
        self.event.addTrigger('during', inspectDuring)
        self.event.fireEvent()
        self.assertEqual(fired, ['before', 'during'])
345
346
class SystemEventTestCase(unittest.TestCase):
    """
    Tests for the global reactor's C{fireSystemEvent},
    C{addSystemEventTrigger}, and C{removeSystemEventTrigger} methods, which
    are part of the L{IReactorCore} interface.

    @ivar triggers: The handles of the triggers this test has added to the
        reactor and not yet removed through L{removeTrigger}.
    """
    def setUp(self):
        """
        Start each test with no remembered trigger handles.
        """
        self.triggers = []


    def tearDown(self):
        """
        Take every trigger still remembered in C{self.triggers} out of the
        reactor.
        """
        remaining = self.triggers
        while remaining:
            handle = remaining.pop()
            try:
                reactor.removeSystemEventTrigger(handle)
            except (ValueError, KeyError):
                # Best effort: the handle is already gone from the reactor.
                pass


    def addTrigger(self, event, phase, func):
        """
        Register a trigger with the reactor, remember its handle in
        C{self.triggers}, and return the handle.

        The three arguments are forwarded positionally, unchanged, to
        C{reactor.addSystemEventTrigger}.  Every caller in this class passes
        the phase name first and the event type second, regardless of the
        names of the parameters here.
        """
        handle = reactor.addSystemEventTrigger(event, phase, func)
        self.triggers.append(handle)
        return handle


    def removeTrigger(self, trigger):
        """
        Unregister the trigger identified by the handle C{trigger} from the
        reactor and forget it in C{self.triggers}.
        """
        reactor.removeSystemEventTrigger(trigger)
        self.triggers.remove(trigger)


    def _addSystemEventTriggerTest(self, phase):
        """
        Add a trigger for the C{'test'} event in C{phase} and verify that it
        is called only once the event has been fired.
        """
        fired = []
        self.addTrigger(phase, 'test', lambda: fired.append(None))
        self.assertEqual(fired, [])
        reactor.fireSystemEvent('test')
        self.assertEqual(fired, [None])


    def test_beforePhase(self):
        """
        A trigger added with L{IReactorCore.addSystemEventTrigger} for the
        C{'before'} phase is not called until the event is fired.
        """
        self._addSystemEventTriggerTest('before')


    def test_duringPhase(self):
        """
        A trigger added with L{IReactorCore.addSystemEventTrigger} for the
        C{'during'} phase is not called until the event is fired.
        """
        self._addSystemEventTriggerTest('during')


    def test_afterPhase(self):
        """
        A trigger added with L{IReactorCore.addSystemEventTrigger} for the
        C{'after'} phase is not called until the event is fired.
        """
        self._addSystemEventTriggerTest('after')


    def test_unknownPhase(self):
        """
        L{IReactorCore.addSystemEventTrigger} raises L{KeyError} for a phase
        which is not C{'before'}, C{'during'}, or C{'after'}.
        """
        def noop():
            return None
        self.assertRaises(KeyError, self.addTrigger, 'xxx', 'test', noop)


    def test_beforePreceedsDuring(self):
        """
        Triggers in the C{'before'} phase are called ahead of triggers in
        the C{'during'} phase.
        """
        order = []
        self.addTrigger('before', 'test', lambda: order.append('before'))
        self.addTrigger('during', 'test', lambda: order.append('during'))
        self.assertEqual(order, [])
        reactor.fireSystemEvent('test')
        self.assertEqual(order, ['before', 'during'])


    def test_duringPreceedsAfter(self):
        """
        Triggers in the C{'during'} phase are called ahead of triggers in
        the C{'after'} phase.
        """
        order = []
        self.addTrigger('during', 'test', lambda: order.append('during'))
        self.addTrigger('after', 'test', lambda: order.append('after'))
        self.assertEqual(order, [])
        reactor.fireSystemEvent('test')
        self.assertEqual(order, ['during', 'after'])


    def test_beforeReturnsDeferred(self):
        """
        When a C{'before'} trigger returns a L{Deferred}, the C{'during'}
        phase does not start until that L{Deferred} has been called back.
        """
        gate = Deferred()
        order = []
        self.addTrigger('before', 'test', lambda: gate)
        self.addTrigger('during', 'test', lambda: order.append('during'))
        self.assertEqual(order, [])
        reactor.fireSystemEvent('test')
        self.assertEqual(order, [])
        gate.callback(None)
        self.assertEqual(order, ['during'])


    def test_multipleBeforeReturnDeferred(self):
        """
        When several C{'before'} triggers return L{Deferred}s, the
        C{'during'} phase does not start until every one of them has been
        called back.
        """
        firstGate = Deferred()
        secondGate = Deferred()
        order = []
        self.addTrigger('before', 'test', lambda: firstGate)
        self.addTrigger('before', 'test', lambda: secondGate)
        self.addTrigger('during', 'test', lambda: order.append('during'))
        self.assertEqual(order, [])
        reactor.fireSystemEvent('test')
        self.assertEqual(order, [])
        firstGate.callback(None)
        self.assertEqual(order, [])
        secondGate.callback(None)
        self.assertEqual(order, ['during'])


    def test_subsequentBeforeTriggerFiresPriorBeforeDeferred(self):
        """
        A C{'before'} trigger may call back the L{Deferred} returned by an
        earlier C{'before'} trigger of the same event.  The remaining
        C{'before'} triggers still run, and any L{Deferred} they return is
        still waited on before the C{'during'} phase begins.
        """
        order = []
        earlyGate = Deferred()
        lateGate = Deferred()
        def returnEarlyGate():
            return earlyGate
        def openEarlyGate():
            earlyGate.callback(None)
        def recordAndReturnLateGate():
            order.append('before')
            return lateGate
        def recordDuring():
            order.append('during')
        self.addTrigger('before', 'test', returnEarlyGate)
        self.addTrigger('before', 'test', openEarlyGate)
        self.addTrigger('before', 'test', recordAndReturnLateGate)
        self.addTrigger('during', 'test', recordDuring)
        self.assertEqual(order, [])
        reactor.fireSystemEvent('test')
        self.assertEqual(order, ['before'])
        lateGate.callback(None)
        self.assertEqual(order, ['before', 'during'])


    def test_removeSystemEventTrigger(self):
        """
        Once removed with L{IReactorCore.removeSystemEventTrigger}, a
        trigger is not called when its event fires.
        """
        order = []
        self.addTrigger('before', 'test', lambda: order.append('first'))
        removed = self.addTrigger(
            'before', 'test', lambda: order.append('second'))
        self.removeTrigger(removed)
        self.assertEqual(order, [])
        reactor.fireSystemEvent('test')
        self.assertEqual(order, ['first'])


    def test_removeNonExistentSystemEventTrigger(self):
        """
        L{IReactorCore.removeSystemEventTrigger} raises L{TypeError},
        L{ValueError}, or L{KeyError} when given, respectively, C{None}, a
        handle which has already been removed, or a handle-like object
        carrying an unknown phase.
        """
        def noop():
            return None
        stale = self.addTrigger('during', 'test', noop)
        self.removeTrigger(stale)
        self.assertRaises(
            TypeError, reactor.removeSystemEventTrigger, None)
        self.assertRaises(
            ValueError, reactor.removeSystemEventTrigger, stale)
        # Same structure as a real handle, but with a bogus phase name.
        wrongPhase = (stale[0], ('xxx',) + stale[1][1:])
        self.assertRaises(
            KeyError, reactor.removeSystemEventTrigger, wrongPhase)


    def test_interactionBetweenDifferentEvents(self):
        """
        How L{IReactorCore.fireSystemEvent} treats one system event is not
        affected by L{Deferred}s still being waited on for a different
        system event.
        """
        order = []

        def makeBefore(label, result):
            def before():
                order.append(('before', label))
                return result
            return before

        def makeAfter(label):
            def after():
                order.append(('after', label))
            return after

        firstEvent = 'first-event'
        firstGate = Deferred()
        secondEvent = 'second-event'
        secondGate = Deferred()

        self.addTrigger('before', firstEvent, makeBefore('first', firstGate))
        self.addTrigger('after', firstEvent, makeAfter('first'))
        self.addTrigger(
            'before', secondEvent, makeBefore('second', secondGate))
        self.addTrigger('after', secondEvent, makeAfter('second'))

        expected = []
        self.assertEqual(order, expected)

        # firstEvent is now held before its 'during' phase by firstGate.
        reactor.fireSystemEvent(firstEvent)
        expected = expected + [('before', 'first')]
        self.assertEqual(order, expected)

        # secondEvent is now held before its 'during' phase by secondGate.
        reactor.fireSystemEvent(secondEvent)
        expected = expected + [('before', 'second')]
        self.assertEqual(order, expected)

        # Releasing firstGate completes firstEvent and leaves secondEvent
        # exactly where it was.
        firstGate.callback(None)
        expected = expected + [('after', 'first')]
        self.assertEqual(order, expected)

        # Releasing secondGate completes secondEvent.
        secondGate.callback(None)
        expected = expected + [('after', 'second')]
        self.assertEqual(order, expected)
648
649
650
651class TimeTestCase(unittest.TestCase):
652    """
653    Tests for the IReactorTime part of the reactor.
654    """
655
656
657    def test_seconds(self):
658        """
659        L{twisted.internet.reactor.seconds} should return something
660        like a number.
661
662        1. This test specifically does not assert any relation to the
663           "system time" as returned by L{time.time} or
664           L{twisted.python.runtime.seconds}, because at some point we
665           may find a better option for scheduling calls than
666           wallclock-time.
667        2. This test *also* does not assert anything about the type of
668           the result, because operations may not return ints or
669           floats: For example, datetime-datetime == timedelta(0).
670        """
671        now = reactor.seconds()
672        self.assertEquals(now-now+now, now)
673
674
675    def test_callLaterUsesReactorSecondsInDelayedCall(self):
676        """
677        L{reactor.callLater} should use the reactor's seconds factory
678        to produce the time at which the DelayedCall will be called.
679        """
680        oseconds = reactor.seconds
681        reactor.seconds = lambda: 100
682        try:
683            call = reactor.callLater(5, lambda: None)
684            self.assertEquals(call.getTime(), 105)
685        finally:
686            reactor.seconds = oseconds
687
688
689    def test_callLaterUsesReactorSecondsAsDelayedCallSecondsFactory(self):
690        """
691        L{reactor.callLater} should propagate its own seconds factory
692        to the DelayedCall to use as its own seconds factory.
693        """
694        oseconds = reactor.seconds
695        reactor.seconds = lambda: 100
696        try:
697            call = reactor.callLater(5, lambda: None)
698            self.assertEquals(call.seconds(), 100)
699        finally:
700            reactor.seconds = oseconds
701
702
703    def test_callLater(self):
704        """
705        Test that a DelayedCall really calls the function it is
706        supposed to call.
707        """
708        d = Deferred()
709        reactor.callLater(0, d.callback, None)
710        d.addCallback(self.assertEqual, None)
711        return d
712
713
714    def test_cancelDelayedCall(self):
715        """
716        Test that when a DelayedCall is cancelled it does not run.
717        """
718        called = []
719        def function():
720            called.append(None)
721        call = reactor.callLater(0, function)
722        call.cancel()
723
724        # Schedule a call in two "iterations" to check to make sure that the
725        # above call never ran.
726        d = Deferred()
727        def check():
728            try:
729                self.assertEqual(called, [])
730            except:
731                d.errback()
732            else:
733                d.callback(None)
734        reactor.callLater(0, reactor.callLater, 0, check)
735        return d
736
737
738    def test_cancelCancelledDelayedCall(self):
739        """
740        Test that cancelling a DelayedCall which has already been cancelled
741        raises the appropriate exception.
742        """
743        call = reactor.callLater(0, lambda: None)
744        call.cancel()
745        self.assertRaises(error.AlreadyCancelled, call.cancel)
746
747
748    def test_cancelCalledDelayedCallSynchronous(self):
749        """
750        Test that cancelling a DelayedCall in the DelayedCall's function as
751        that function is being invoked by the DelayedCall raises the
752        appropriate exception.
753        """
754        d = Deferred()
755        def later():
756            try:
757                self.assertRaises(error.AlreadyCalled, call.cancel)
758            except:
759                d.errback()
760            else:
761                d.callback(None)
762        call = reactor.callLater(0, later)
763        return d
764
765
766    def test_cancelCalledDelayedCallAsynchronous(self):
767        """
768        Test that cancelling a DelayedCall after it has run its function
769        raises the appropriate exception.
770        """
771        d = Deferred()
772        def check():
773            try:
774                self.assertRaises(error.AlreadyCalled, call.cancel)
775            except:
776                d.errback()
777            else:
778                d.callback(None)
779        def later():
780            reactor.callLater(0, check)
781        call = reactor.callLater(0, later)
782        return d
783
784
785    def testCallLaterDelayAndReset(self):
786        """
787        Test that the reactor handles DelayedCalls which have been
788        reset or delayed.
789        """
790        clock = Clock()
791        clock.install()
792        try:
793            callbackTimes = [None, None]
794
795            def resetCallback():
796                callbackTimes[0] = clock()
797
798            def delayCallback():
799                callbackTimes[1] = clock()
800
801            ireset = reactor.callLater(2, resetCallback)
802            idelay = reactor.callLater(3, delayCallback)
803
804            clock.pump(reactor, [0, 1])
805
806            self.assertIdentical(callbackTimes[0], None)
807            self.assertIdentical(callbackTimes[1], None)
808
809            ireset.reset(2) # (now)1 + 2 = 3
810            idelay.delay(3) # (orig)3 + 3 = 6
811
812            clock.pump(reactor, [0, 1])
813
814            self.assertIdentical(callbackTimes[0], None)
815            self.assertIdentical(callbackTimes[1], None)
816
817            clock.pump(reactor, [0, 1])
818
819            self.assertEquals(callbackTimes[0], 3)
820            self.assertEquals(callbackTimes[1], None)
821
822            clock.pump(reactor, [0, 3])
823            self.assertEquals(callbackTimes[1], 6)
824        finally:
825            clock.uninstall()
826
827
828    def testCallLaterTime(self):
829        d = reactor.callLater(10, lambda: None)
830        try:
831            self.failUnless(d.getTime() - (time.time() + 10) < 1)
832        finally:
833            d.cancel()
834
835    def testCallInNextIteration(self):
836        calls = []
837        def f1():
838            calls.append('f1')
839            reactor.callLater(0.0, f2)
840        def f2():
841            calls.append('f2')
842            reactor.callLater(0.0, f3)
843        def f3():
844            calls.append('f3')
845
846        reactor.callLater(0, f1)
847        self.assertEquals(calls, [])
848        reactor.iterate()
849        self.assertEquals(calls, ['f1'])
850        reactor.iterate()
851        self.assertEquals(calls, ['f1', 'f2'])
852        reactor.iterate()
853        self.assertEquals(calls, ['f1', 'f2', 'f3'])
854
855    def testCallLaterOrder(self):
856        l = []
857        l2 = []
858        def f(x):
859            l.append(x)
860        def f2(x):
861            l2.append(x)
862        def done():
863            self.assertEquals(l, range(20))
864        def done2():
865            self.assertEquals(l2, range(10))
866
867        for n in range(10):
868            reactor.callLater(0, f, n)
869        for n in range(10):
870            reactor.callLater(0, f, n+10)
871            reactor.callLater(0.1, f2, n)
872
873        reactor.callLater(0, done)
874        reactor.callLater(0.1, done2)
875        d = Deferred()
876        reactor.callLater(0.2, d.callback, None)
877        return d
878
879    testCallLaterOrder.todo = "See bug 1396"
880    testCallLaterOrder.skip = "Trial bug, todo doesn't work! See bug 1397"
881    def testCallLaterOrder2(self):
882        # This time destroy the clock resolution so that it fails reliably
883        # even on systems that don't have a crappy clock resolution.
884
885        def seconds():
886            return int(time.time())
887
888        base_original = base.seconds
889        runtime_original = runtime.seconds
890        base.seconds = seconds
891        runtime.seconds = seconds
892
893        def cleanup(x):
894            runtime.seconds = runtime_original
895            base.seconds = base_original
896            return x
897        return maybeDeferred(self.testCallLaterOrder).addBoth(cleanup)
898
899    testCallLaterOrder2.todo = "See bug 1396"
900    testCallLaterOrder2.skip = "Trial bug, todo doesn't work! See bug 1397"
901
902    def testDelayedCallStringification(self):
903        # Mostly just make sure str() isn't going to raise anything for
904        # DelayedCalls within reason.
905        dc = reactor.callLater(0, lambda x, y: None, 'x', y=10)
906        str(dc)
907        dc.reset(5)
908        str(dc)
909        dc.cancel()
910        str(dc)
911
912        dc = reactor.callLater(0, lambda: None, x=[({'hello': u'world'}, 10j), reactor], *range(10))
913        str(dc)
914        dc.cancel()
915        str(dc)
916
917        def calledBack(ignored):
918            str(dc)
919        d = Deferred().addCallback(calledBack)
920        dc = reactor.callLater(0, d.callback, None)
921        str(dc)
922        return d
923
924
925    def testDelayedCallSecondsOverride(self):
926        """
927        Test that the C{seconds} argument to DelayedCall gets used instead of
928        the default timing function, if it is not None.
929        """
930        def seconds():
931            return 10
932        dc = base.DelayedCall(5, lambda: None, (), {}, lambda dc: None,
933                              lambda dc: None, seconds)
934        self.assertEquals(dc.getTime(), 5)
935        dc.reset(3)
936        self.assertEquals(dc.getTime(), 13)
937
938
class CallFromThreadTests(unittest.TestCase):
    """
    Tests for scheduling calls on the reactor with C{callFromThread}.
    """
    def testWakeUp(self):
        """
        A C{callFromThread} issued from another thread wakes up the reactor.
        """
        woken = Deferred()

        def wake():
            time.sleep(0.1)
            # callFromThread will call wakeUp for us
            reactor.callFromThread(woken.callback, None)

        reactor.callInThread(wake)
        return woken

    if interfaces.IReactorThreads(reactor, None) is None:
        testWakeUp.skip = "Nothing to wake up for without thread support"

    def _stopCallFromThreadCallback(self):
        """
        Record that the delayed call scheduled by
        L{_callFromThreadCallback} has run.
        """
        self.stopped = True

    def _callFromThreadCallback(self, d):
        """
        Schedule another C{callFromThread} call plus a zero-delay
        C{callLater} from within a C{callFromThread} callback.
        """
        reactor.callFromThread(self._callFromThreadCallback2, d)
        reactor.callLater(0, self._stopCallFromThreadCallback)

    def _callFromThreadCallback2(self, d):
        """
        Fire C{d}: successfully if C{self.stopped} has been set by now, with
        the assertion failure otherwise.
        """
        try:
            self.assert_(self.stopped)
        except:
            # Send the error to the deferred
            d.errback()
        else:
            d.callback(None)

    def testCallFromThreadStops(self):
        """
        Ensure that callFromThread from inside a callFromThread
        callback doesn't sit in an infinite loop and lets other
        things happen too.
        """
        self.stopped = False
        result = defer.Deferred()
        reactor.callFromThread(self._callFromThreadCallback, result)
        return result
979
980
981
class ReactorCoreTestCase(unittest.TestCase):
    """
    Test core functionalities of the reactor.
    """

    def test_run(self):
        """
        Test that reactor.crash terminates reactor.run
        """
        for i in xrange(3):
            reactor.callLater(0.01, reactor.crash)
            reactor.run()


    def test_iterate(self):
        """
        Test that reactor.iterate(0) doesn't block
        """
        start = time.time()
        # twisted timers are distinct from the underlying event loop's
        # timers, so this fail-safe probably won't keep a failure from
        # hanging the test
        t = reactor.callLater(10, reactor.crash)
        try:
            reactor.iterate(0) # shouldn't block
            elapsed = time.time() - start
            self.failUnless(elapsed < 8)
        finally:
            # Cancel even when the assertion (or iterate itself) fails;
            # otherwise the pending reactor.crash would fire during some
            # later, unrelated test.
            t.cancel()
1010
1011
1012
class DelayedTestCase(unittest.TestCase):
    """
    Tests for C{reactor.getDelayedCalls} and for the C{active} method of
    delayed calls.

    @ivar timers: Maps an integer tag to the delayed call scheduled under
        that tag and not yet run or cancelled by the test.
    @ivar counter: The tag which the next timer will be given.
    @ivar deferred: Fired by L{done} once the last timer has run.
    """
    def setUp(self):
        self.finished = 0
        self.counter = 0
        self.timers = {}
        self.deferred = defer.Deferred()
        # ick. Sometimes there are magic timers already running:
        # popsicle.Freezer.tick . Kill off all such timers now so they won't
        # interfere with the test. Of course, this kind of requires that
        # getDelayedCalls already works, so certain failure modes won't be
        # noticed.
        if hasattr(reactor, "getDelayedCalls"):
            for pending in reactor.getDelayedCalls():
                pending.cancel()
            reactor.iterate() # flush timers

    def tearDown(self):
        for leftover in self.timers.values():
            leftover.cancel()

    def checkTimers(self):
        """
        Fail unless every timer this test is tracking is reported by
        C{reactor.getDelayedCalls}.
        """
        ours = self.timers.values()
        reported = list(reactor.getDelayedCalls())

        # There should be at least the calls we put in.  There may be other
        # calls that are none of our business and that we should ignore,
        # though.
        missing = [dc for dc in ours if dc not in reported]
        if missing:
            self.finished = 1
        self.failIf(missing, "Should have been missing no calls, instead was missing " + repr(missing))

    def callback(self, tag):
        """
        Timer callback: stop tracking the timer C{tag} and re-check the rest.
        """
        del self.timers[tag]
        self.checkTimers()

    def addCallback(self, tag):
        """
        Timer callback which, after the usual bookkeeping, schedules one
        more timer.
        """
        self.callback(tag)
        self.addTimer(15, self.callback)

    def done(self, tag):
        """
        Timer callback for the final timer: fires C{self.deferred}.
        """
        self.finished = 1
        self.callback(tag)
        self.deferred.callback(None)

    def addTimer(self, when, callback):
        """
        Schedule C{callback} to be called with a fresh tag after C{when}
        hundredths of a second, and track the resulting delayed call.
        """
        tag = self.counter
        self.timers[tag] = reactor.callLater(when * 0.01, callback, tag)
        self.counter += 1
        self.checkTimers()

    def testGetDelayedCalls(self):
        if not hasattr(reactor, "getDelayedCalls"):
            return
        # This is not a race because we don't do anything which might call
        # the reactor until we have all the timers set up. If we did, this
        # test might fail on slow systems.
        self.checkTimers()
        self.addTimer(35, self.done)
        self.addTimer(20, self.callback)
        self.addTimer(30, self.callback)
        # The next timer added is the one cancelled by hand below.
        cancelledTag = self.counter
        self.addTimer(29, self.callback)
        self.addTimer(25, self.addCallback)
        self.addTimer(26, self.callback)

        self.timers[cancelledTag].cancel()
        del self.timers[cancelledTag]
        self.checkTimers()

        def finalCheck(ignored):
            return self.checkTimers()
        self.deferred.addCallback(finalCheck)
        return self.deferred

    def testActive(self):
        """
        C{active} is true for a pending call and false once it has run.
        """
        dcall = reactor.callLater(0, lambda: None)
        self.assertEquals(dcall.active(), 1)
        reactor.iterate()
        self.assertEquals(dcall.active(), 0)
1096
# Source code of a stand-alone script which Resolve.testChildResolve writes
# to a temporary file and runs in a child process.  "%(reactor)s" is replaced
# with the module name of the reactor to install.  Once its reactor is
# running the script resolves 'localhost' and prints "done <result>"; a
# 3 second timer prints "failed" instead.  Either way the reactor is stopped.
resolve_helper = """
import %(reactor)s
%(reactor)s.install()
from twisted.internet import reactor

class Foo:
    def __init__(self):
        reactor.callWhenRunning(self.start)
        self.timer = reactor.callLater(3, self.failed)
    def start(self):
        reactor.resolve('localhost').addBoth(self.done)
    def done(self, res):
        print 'done', res
        reactor.stop()
    def failed(self):
        print 'failed'
        self.timer = None
        reactor.stop()
f = Foo()
reactor.run()
"""
1118
class ChildResolveProtocol(protocol.ProcessProtocol):
    """
    Process protocol which collects everything delivered to C{outReceived}
    and C{errReceived} and reports it when the process ends.

    @ivar onCompletion: A L{Deferred} which is fired with a
        C{(reason, output, error)} tuple from C{processEnded}, after which
        the attribute is set to C{None}.
    @ivar output: C{list} of the chunks passed to C{outReceived}.
    @ivar error: C{list} of the chunks passed to C{errReceived}.
    """
    def __init__(self, onCompletion):
        self.onCompletion = onCompletion

    def connectionMade(self):
        self.output, self.error = [], []

    def outReceived(self, out):
        self.output.append(out)

    def errReceived(self, err):
        self.error.append(err)

    def processEnded(self, reason):
        report = (reason, self.output, self.error)
        self.onCompletion.callback(report)
        self.onCompletion = None
1136
1137
class Resolve(unittest.TestCase):
    """
    Tests for C{reactor.resolve} as run by a freshly started reactor in a
    child process.
    """
    def testChildResolve(self):
        """
        Spawn a child process running the C{resolve_helper} script and fail
        unless its standard output is exactly C{'done 127.0.0.1\\n'}.

        @return: A L{Deferred} which fires once the child has exited and its
            output has been checked.
        """
        # I've seen problems with reactor.run under gtk2reactor. Spawn a
        # child which just does reactor.resolve after the reactor has
        # started, fail if it does not complete in a timely fashion.
        helperPath = os.path.abspath(self.mktemp())

        # The child installs the reactor named by the module of the reactor
        # in use here.
        reactorName = reactor.__module__

        helperFile = open(helperPath, 'w')
        try:
            helperFile.write(resolve_helper % {'reactor': reactorName})
        finally:
            # Close the file even if writing fails, so the handle cannot
            # leak.
            helperFile.close()

        env = os.environ.copy()
        env['PYTHONPATH'] = os.pathsep.join(sys.path)

        helperDeferred = Deferred()
        helperProto = ChildResolveProtocol(helperDeferred)

        reactor.spawnProcess(helperProto, sys.executable, ("python", "-u", helperPath), env)

        def cbFinished(result):
            # Unpack explicitly instead of using a tuple parameter in the
            # signature, which later Python versions no longer accept.
            reason, output, error = result
            # If the output is "done 127.0.0.1\n" we don't really care what
            # else happened.
            output = ''.join(output)
            if output != 'done 127.0.0.1\n':
                self.fail((
                    "The child process failed to produce the desired results:\n"
                    "   Reason for termination was: %r\n"
                    "   Output stream was: %r\n"
                    "   Error stream was: %r\n") % (reason.getErrorMessage(), output, ''.join(error)))

        helperDeferred.addCallback(cbFinished)
        return helperDeferred

if not interfaces.IReactorProcess(reactor, None):
    Resolve.skip = "cannot run test: reactor doesn't support IReactorProcess"
1176
class Counter:
    """
    Minimal counter: every call to L{add} increases C{index} by one.

    @ivar index: The number of times L{add} has been called; starts at 0.
    """
    index = 0

    def add(self):
        self.index += 1
1182
1183
class Order:
    """
    Checks that L{a}, L{b} and L{c} are called in exactly that order.

    @ivar stage: How many of the three methods have been called, in order,
        so far.
    """

    stage = 0

    def _advance(self, expected):
        """
        Move from stage C{expected} to the next stage.

        @raise RuntimeError: If the current stage is not C{expected}.
        """
        if self.stage != expected:
            raise RuntimeError
        self.stage = expected + 1

    def a(self):
        self._advance(0)

    def b(self):
        self._advance(1)

    def c(self):
        self._advance(2)
1199
1200
class CallFromThreadTestCase(unittest.TestCase):
    """Task scheduling from threads tests."""

    if interfaces.IReactorThreads(reactor, None) is None:
        skip = "Nothing to test without thread support"

    def schedule(self, *args, **kwargs):
        """Override in subclasses."""
        reactor.callFromThread(*args, **kwargs)

    def testScheduling(self):
        """
        One hundred scheduled calls have all run after one hundred reactor
        iterations.
        """
        total = 100
        counter = Counter()
        for _ in range(total):
            self.schedule(counter.add)
        for _ in range(total):
            reactor.iterate()
        self.assertEquals(counter.index, total)

    def testCorrectOrder(self):
        """
        Scheduled calls are run in the order in which they were scheduled.
        """
        order = Order()
        for step in (order.a, order.b, order.c):
            self.schedule(step)
        for _ in range(3):
            reactor.iterate()
        self.assertEquals(order.stage, 3)

    def testNotRunAtOnce(self):
        """
        A scheduled call is run by the next reactor iteration, not by the
        act of scheduling it.
        """
        counter = Counter()
        self.schedule(counter.add)
        # scheduled tasks should not be run at once:
        self.assertEquals(counter.index, 0)
        reactor.iterate()
        self.assertEquals(counter.index, 1)
1236
1237
class MyProtocol(protocol.Protocol):
    """
    Protocol with no behavior of its own; it is the C{protocol} class of
    L{MyFactory}.
    """
1240
class MyFactory(protocol.Factory):
    """
    Factory whose C{protocol} class is L{MyProtocol}.
    """

    protocol = MyProtocol
1245
1246
class ProtocolTestCase(unittest.TestCase):
    """
    Tests for building protocols from a factory.
    """

    def testFactory(self):
        """
        C{buildProtocol} returns an instance of the factory's C{protocol}
        class whose C{factory} attribute equals the factory.
        """
        factory = MyFactory()
        # Named ``built`` so the imported ``protocol`` module is not
        # shadowed inside this method.
        built = factory.buildProtocol(None)
        self.assertEquals(built.factory, factory)
        self.assert_(isinstance(built, factory.protocol))
1254
1255
class DummyProducer(object):
    """
    Producer which does nothing but remember which of its methods were
    called, so tests can check what a consumer did to it.

    @type events: C{list} of C{str}
    @ivar events: The producer/consumer related events which have happened to
    this producer.  Strings in this list may be C{'resume'}, C{'stop'}, or
    C{'pause'}.  Elements are added as they occur.
    """

    def __init__(self):
        self.events = []


    def _record(self, event):
        """
        Append C{event} to L{events}.
        """
        self.events.append(event)


    def resumeProducing(self):
        self._record('resume')


    def stopProducing(self):
        self._record('stop')


    def pauseProducing(self):
        self._record('pause')
1281
1282
1283
class SillyDescriptor(abstract.FileDescriptor):
    """
    A file descriptor with a tiny write buffer which never touches the
    reactor.

    Useful for testing FileDescriptor's IConsumer interface, since
    the data buffer fills as soon as at least four characters are
    written to it, and gets emptied in a single doWrite() cycle.
    """
    bufferSize = 3
    connected = True

    def writeSomeData(self, data):
        """
        Pretend that all of C{data} was written.

        @return: The length of C{data}.
        """
        written = len(data)
        return written


    def startWriting(self):
        """
        Do nothing: bypass the reactor.
        """
    # Stopping is the same no-op as starting.
    stopWriting = startWriting
1307
1308
1309
class ReentrantProducer(DummyProducer):
    """
    A L{DummyProducer} whose C{resumeProducing} additionally calls a method
    of the consumer with which it is registered.

    @ivar consumer: The consumer with which this producer has been or will
    be registered.

    @ivar methodName: The name of the method to call on the consumer inside
    C{resumeProducing}.

    @ivar methodArgs: The arguments to pass to the consumer method invoked in
    C{resumeProducing}.
    """
    def __init__(self, consumer, methodName, *methodArgs):
        super(ReentrantProducer, self).__init__()
        self.consumer = consumer
        self.methodName = methodName
        self.methodArgs = methodArgs


    def resumeProducing(self):
        # Record the event first, then call back into the consumer.
        super(ReentrantProducer, self).resumeProducing()
        reenter = getattr(self.consumer, self.methodName)
        reenter(*self.methodArgs)
1335
1336
1337
1338class TestProducer(unittest.TestCase):
1339    """
1340    Test abstract.FileDescriptor's consumer interface.
1341    """
1342    def test_doubleProducer(self):
1343        """
1344        Verify that registering a non-streaming producer invokes its
1345        resumeProducing() method and that you can only register one producer
1346        at a time.
1347        """
1348        fd = abstract.FileDescriptor()
1349        fd.connected = 1
1350        dp = DummyProducer()
1351        fd.registerProducer(dp, 0)
1352        self.assertEquals(dp.events, ['resume'])
1353        self.assertRaises(RuntimeError, fd.registerProducer, DummyProducer(), 0)
1354
1355
1356    def test_unconnectedFileDescriptor(self):
1357        """
1358        Verify that registering a producer when the connection has already
1359        been closed invokes its stopProducing() method.
1360        """
1361        fd = abstract.FileDescriptor()
1362        fd.disconnected = 1
1363        dp = DummyProducer()
1364        fd.registerProducer(dp, 0)
1365        self.assertEquals(dp.events, ['stop'])
1366
1367
1368    def _dontPausePullConsumerTest(self, methodName):
1369        descriptor = SillyDescriptor()
1370        producer = DummyProducer()
1371        descriptor.registerProducer(producer, streaming=False)
1372        self.assertEqual(producer.events, ['resume'])
1373        del producer.events[:]
1374
1375        # Fill up the descriptor's write buffer so we can observe whether or
1376        # not it pauses its producer in that case.
1377        getattr(descriptor, methodName)('1234')
1378
1379        self.assertEqual(producer.events, [])
1380
1381
1382    def test_dontPausePullConsumerOnWrite(self):
1383        """
1384        Verify that FileDescriptor does not call producer.pauseProducing() on a
1385        non-streaming pull producer in response to a L{IConsumer.write} call
1386        which results in a full write buffer. Issue #2286.
1387        """
1388        return self._dontPausePullConsumerTest('write')
1389
1390
1391    def test_dontPausePullConsumerOnWriteSequence(self):
1392        """
1393        Like L{test_dontPausePullConsumerOnWrite}, but for a call to
1394        C{writeSequence} rather than L{IConsumer.write}.
1395
1396        C{writeSequence} is not part of L{IConsumer}, but
1397        L{abstract.FileDescriptor} has supported consumery behavior in response
1398        to calls to L{writeSequence} forever.
1399        """
1400        return self._dontPausePullConsumerTest('writeSequence')
1401
1402
1403    def test_dontPausePullConsumerOnWriteAlot(self):
1404        """
1405        Like L{test_dontPausePullConsumerOnWrite}, but for a call to
1406        C{writeSequence} rather than L{IConsumer.write}.
1407
1408        C{writeSequence} is not part of L{IConsumer}, but
1409        L{abstract.FileDescriptor} has supported consumery behavior in response
1410        to calls to L{writeSequence} forever.
1411
1412        Should be run with
1413        bash -c 'ulimit -d 1024 -v 102400 ; PYTHONPATH=../..:$PYTHONPATH python ../../bin/trial test_proxy.TestProducer'
1414        """
1415        descriptor = SillyDescriptor()
1416        producer = DummyProducer()
1417        descriptor.registerProducer(producer, streaming=False)
1418        self.assertEqual(producer.events, ['resume'])
1419        del producer.events[:]
1420
1421        def doWrite(self):
1422            """
1423            Called when data can be written.
1424
1425            A result that is true (which will be a negative number or an
1426            exception instance) indicates that the connection was lost. A false
1427            result implies the connection is still there; a result of 0
1428            indicates no write was done, and a result of None indicates that a
1429            write was done.
1430            """
1431            if len(self.dataBuffer) - self.offset < self.SEND_LIMIT:
1432                # If there is currently less than SEND_LIMIT bytes left to send
1433                # in the string, extend it with the array data.
1434                self.dataBuffer = buffer(self.dataBuffer, self.offset) + "".join(self._tempDataBuffer)
1435                self.offset = 0
1436                self._tempDataBuffer = []
1437                self._tempDataLen = 0
1438
1439            # Send as much data as you can.
1440            if self.offset:
1441                l = self.writeSomeData(buffer(self.dataBuffer, self.offset))
1442            else:
1443                l = self.writeSomeData(self.dataBuffer)
1444
1445            # There is no writeSomeData implementation in Twisted which returns
1446            # 0, but the documentation for writeSomeData used to claim negative
1447            # integers meant connection lost.  Keep supporting this here,
1448            # although it may be worth deprecating and removing at some point.
1449            if l < 0 or isinstance(l, Exception):
1450                return l
1451            if l == 0 and self.dataBuffer:
1452                result = 0
1453            else:
1454                result = None
1455            self.offset += l
1456            # If there is nothing left to send,
1457            if self.offset == len(self.dataBuffer) and not self._tempDataLen:
1458                self.dataBuffer = ""
1459                self.offset = 0
1460                # stop writing.
1461                self.stopWriting()
1462                # If I've got a producer who is supposed to supply me with data,
1463                if self.producer is not None and ((not self.streamingProducer)
1464                                                  or self.producerPaused):
1465                    # tell them to supply some more.
1466                    self.producerPaused = 0
1467                    self.producer.resumeProducing()
1468                elif self.disconnecting:
1469                    # But if I was previously asked to let the connection die, do
1470                    # so.
1471                    return self._postLoseConnection()
1472                elif self._writeDisconnecting:
1473                    # I was previously asked to to half-close the connection.
1474                    result = self._closeWriteConnection()
1475                    self._writeDisconnected = True
1476                    return result
1477            return result
1478
1479        print
1480        # Fill up the descriptor's write buffer so we can observe whether or
1481        # not it pauses its producer in that case.
1482        try:
1483            for _ in xrange(0, 1000*15):
1484                descriptor.write('1234'*1000)
1485        except MemoryError:
1486            del descriptor
1487            raise
1488        print 'B1', len(descriptor.dataBuffer), descriptor._tempDataLen
1489
1490        #self.assertEqual(producer.events, [])
1491
1492        descriptor.doWrite()
1493        print 'B2', len(descriptor.dataBuffer), descriptor._tempDataLen
1494
1495        # Fill up the descriptor's write buffer so we can observe whether or
1496        # not it pauses its producer in that case.
1497        try:
1498            for _ in xrange(0, 1000*15):
1499                descriptor.write('1234'*1000)
1500        except MemoryError:
1501            del descriptor
1502            raise
1503        print 'A1', len(descriptor.dataBuffer), descriptor._tempDataLen
1504        doWrite(descriptor)
1505        print 'A2', len(descriptor.dataBuffer), descriptor._tempDataLen
1506
1507
1508    def _reentrantStreamingProducerTest(self, methodName):
1509        descriptor = SillyDescriptor()
1510        producer = ReentrantProducer(descriptor, methodName, 'spam')
1511        descriptor.registerProducer(producer, streaming=True)
1512
1513        # Start things off by filling up the descriptor's buffer so it will
1514        # pause its producer.
1515        getattr(descriptor, methodName)('spam')
1516
1517        # Sanity check - make sure that worked.
1518        self.assertEqual(producer.events, ['pause'])
1519        del producer.events[:]
1520
1521        # After one call to doWrite, the buffer has been emptied so the
1522        # FileDescriptor should resume its producer.  That will result in an
1523        # immediate call to FileDescriptor.write which will again fill the
1524        # buffer and result in the producer being paused.
1525        descriptor.doWrite()
1526        self.assertEqual(producer.events, ['resume', 'pause'])
1527        del producer.events[:]
1528
1529        # After a second call to doWrite, the exact same thing should have
1530        # happened.  Prior to the bugfix for which this test was written,
1531        # FileDescriptor would have incorrectly believed its producer was
1532        # already resumed (it was paused) and so not resume it again.
1533        descriptor.doWrite()
1534        self.assertEqual(producer.events, ['resume', 'pause'])
1535
1536
1537    def test_reentrantStreamingProducerUsingWrite(self):
1538        """
1539        Verify that FileDescriptor tracks producer's paused state correctly.
1540        Issue #811, fixed in revision r12857.
1541        """
1542        return self._reentrantStreamingProducerTest('write')
1543
1544
1545    def test_reentrantStreamingProducerUsingWriteSequence(self):
1546        """
1547        Like L{test_reentrantStreamingProducerUsingWrite}, but for calls to
1548        C{writeSequence}.
1549
1550        C{writeSequence} is B{not} part of L{IConsumer}, however
1551        C{abstract.FileDescriptor} has supported consumery behavior in response
1552        to calls to C{writeSequence} forever.
1553        """
1554        return self._reentrantStreamingProducerTest('writeSequence')
1555
1556
1557
class PortStringification(unittest.TestCase):
    """
    Tests that the string form of a listening port contains its port number.
    """
    def _checkPortInString(self, p):
        """
        Assert that the number of the port on which C{p} is listening
        appears in C{str(p)}, then stop C{p}.

        @return: Whatever C{p.stopListening()} returns.
        """
        portNo = p.getHost().port
        self.assertNotEqual(str(p).find(str(portNo)), -1,
                            "%d not found in %s" % (portNo, p))
        return p.stopListening()

    if interfaces.IReactorTCP(reactor, None) is not None:
        def testTCP(self):
            p = reactor.listenTCP(0, protocol.ServerFactory())
            return self._checkPortInString(p)

    if interfaces.IReactorUDP(reactor, None) is not None:
        def testUDP(self):
            p = reactor.listenUDP(0, protocol.DatagramProtocol())
            return self._checkPortInString(p)

    if interfaces.IReactorSSL(reactor, None) is not None and ssl:
        def testSSL(self, ssl=ssl):
            # The same file serves as both arguments to the context factory.
            pem = util.sibpath(__file__, 'server.pem')
            contextFactory = ssl.DefaultOpenSSLContextFactory(pem, pem)
            p = reactor.listenSSL(0, protocol.ServerFactory(), contextFactory)
            return self._checkPortInString(p)