root/sandbox/exarkun/threadwrapper.py

Revision 12477, 2.5 KB (checked in by exarkun, 6 years ago)

DOC STRING

Line 
1
2import Queue
3
4from twisted.internet import reactor
5from twisted.internet import defer
6from twisted.python import failure
7
class ThreadWrapper(object):
    """Blocking facade over an object with a Deferred-based interface.

    Every attribute lookup on the wrapper is forwarded to the wrapped
    object.  Callable attributes are handed back inside a CallableWrapper,
    so calling them yields a plain result instead of a Deferred; any other
    attribute is returned untouched.

    Intended for code running in threads other than the reactor (IO)
    thread, where it also lets non-threadsafe methods be invoked safely.
    """

    def __init__(self, wrappee):
        self.wrappee = wrappee

    def __getattribute__(self, name):
        # Reach our own state through the base implementation: a plain
        # ``self.wrappee`` would re-enter this method and be forwarded to
        # the wrapped object like any other attribute.
        target = super(ThreadWrapper, self).__getattribute__('wrappee')
        attribute = getattr(target, name)
        return CallableWrapper(attribute) if callable(attribute) else attribute
29
class CallableWrapper(object):
    """Blocking proxy that runs a callable in the reactor thread.

    Calling an instance schedules ``original`` with reactor.callFromThread,
    blocks the calling thread until a result is delivered, and then either
    returns that result or re-raises the failure it produced.  If
    ``original`` returns a Deferred, the value the Deferred fires with is
    what gets delivered.

    Each call uses its own private queue, so several threads may call the
    same wrapper concurrently without receiving each other's results.

    Intended to be called from threads other than the reactor thread; the
    caller blocks until the reactor has delivered a result.
    """

    def __init__(self, original):
        self.original = original

    def __call__(__self, *__a, **__kw):
        # The double-underscore parameter names are name-mangled by the
        # class, so keyword arguments such as ``self``, ``a`` or ``kw`` are
        # collected into **__kw and reach the wrapped callable.
        results = Queue.Queue()
        reactor.callFromThread(__self.__callFromThread, results, __a, __kw)
        result = results.get()
        if isinstance(result, failure.Failure):
            result.raiseException()
        return result

    def __callFromThread(self, results, a, kw):
        # maybeDeferred turns a plain return value or a raised exception
        # into a Deferred, so addBoth always delivers either the value or
        # a Failure to the waiting caller's queue.
        d = defer.maybeDeferred(self.original, *a, **kw)
        d.addBoth(results.put)
45
class Asynchronous(object):
    """Test fixture with immediate and Deferred-based methods.

    Offers one succeeding and one failing method in each flavour so that
    a wrapper can be exercised against all four outcomes.
    """

    def syncResult(self, v):
        """Return ``v`` immediately."""
        return v

    def asyncResult(self, n, v):
        """Return a Deferred that is called back with ``v`` after ``n`` seconds."""
        deferred = defer.Deferred()
        reactor.callLater(n, deferred.callback, v)
        return deferred

    def syncException(self):
        """Raise ZeroDivisionError immediately."""
        1/0

    def asyncException(self, n):
        """Return a Deferred that errbacks with ZeroDivisionError after ``n`` seconds."""
        deferred = defer.Deferred()

        def fail():
            try:
                1/0
            except:
                # errback() with no argument wraps the exception currently
                # being handled, so it must be called inside this handler.
                deferred.errback()

        reactor.callLater(n, fail)
        return deferred
67
def threadedFunction():
    """Exercise ThreadWrapper from a non-reactor thread, then stop the reactor.

    Checks that synchronous and Deferred-returning methods of an
    Asynchronous instance both yield plain results through the wrapper, and
    that their exceptions are raised in this thread.

    The reactor is stopped in a ``finally`` clause: if a check fails, the
    AssertionError still propagates, but reactor.run() no longer blocks
    forever waiting for a stop() that is never issued.
    """
    tr = ThreadWrapper(reactor)
    sync = ThreadWrapper(Asynchronous())

    try:
        assert sync.syncResult("foo") == "foo"
        assert sync.asyncResult(0.5, "bar") == "bar"

        try:
            sync.syncException()
        except ZeroDivisionError:
            pass
        else:
            assert False, "ZeroDivisionError not raised"

        try:
            sync.asyncException(0.5)
        except ZeroDivisionError:
            pass
        else:
            assert False, "ZeroDivisionError not raised"

        # Parenthesised form prints the same text under Python 2.
        print('4 tests passed')
    finally:
        # Goes through the wrapper so stop() runs in the reactor thread.
        tr.stop()
91
def test():
    """Run threadedFunction in a reactor-managed thread and start the reactor.

    The work is queued first; reactor.run() then blocks until
    threadedFunction stops the reactor.
    """
    worker = threadedFunction
    reactor.callInThread(worker)
    reactor.run()


# Executed at module level: loading this file runs the self-test.
test()
97   
Note: See TracBrowser for help on using the browser.