| 1 | |
|---|
| 2 | import Queue |
|---|
| 3 | |
|---|
| 4 | from twisted.internet import reactor |
|---|
| 5 | from twisted.internet import defer |
|---|
| 6 | from twisted.python import failure |
|---|
| 7 | |
|---|
| 8 | class ThreadWrapper(object): |
|---|
| 9 | """Wrap an object which presents an asynchronous interface (via Deferreds). |
|---|
| 10 | |
|---|
| 11 | The wrapped object will present the same interface, but all methods will |
|---|
| 12 | return results, rather than Deferreds. |
|---|
| 13 | |
|---|
| 14 | This also makes it safe to call non-threadsafe methods from threads |
|---|
| 15 | other than the IO thread. |
|---|
| 16 | |
|---|
| 17 | This is only useful for wrapping objects accessed by threads other than |
|---|
| 18 | the IO thread. |
|---|
| 19 | """ |
|---|
| 20 | def __init__(self, wrappee): |
|---|
| 21 | self.wrappee = wrappee |
|---|
| 22 | |
|---|
| 23 | def __getattribute__(self, name): |
|---|
| 24 | wrappee = super(ThreadWrapper, self).__getattribute__('wrappee') |
|---|
| 25 | original = getattr(wrappee, name) |
|---|
| 26 | if callable(original): |
|---|
| 27 | return CallableWrapper(original) |
|---|
| 28 | return original |
|---|
| 29 | |
|---|
| 30 | class CallableWrapper(object): |
|---|
| 31 | def __init__(self, original): |
|---|
| 32 | self.original = original |
|---|
| 33 | self.queue = Queue.Queue() |
|---|
| 34 | |
|---|
| 35 | def __call__(__self, *__a, **__kw): |
|---|
| 36 | reactor.callFromThread(__self.__callFromThread, __a, __kw) |
|---|
| 37 | result = __self.queue.get() |
|---|
| 38 | if isinstance(result, failure.Failure): |
|---|
| 39 | result.raiseException() |
|---|
| 40 | return result |
|---|
| 41 | |
|---|
| 42 | def __callFromThread(self, a, kw): |
|---|
| 43 | result = defer.maybeDeferred(self.original, *a, **kw) |
|---|
| 44 | result.addBoth(self.queue.put) |
|---|
| 45 | |
|---|
| 46 | class Asynchronous(object): |
|---|
| 47 | def syncResult(self, v): |
|---|
| 48 | return v |
|---|
| 49 | |
|---|
| 50 | def asyncResult(self, n, v): |
|---|
| 51 | d = defer.Deferred() |
|---|
| 52 | reactor.callLater(n, d.callback, v) |
|---|
| 53 | return d |
|---|
| 54 | |
|---|
| 55 | def syncException(self): |
|---|
| 56 | 1/0 |
|---|
| 57 | |
|---|
| 58 | def asyncException(self, n): |
|---|
| 59 | def fail(): |
|---|
| 60 | try: |
|---|
| 61 | 1/0 |
|---|
| 62 | except: |
|---|
| 63 | d.errback() |
|---|
| 64 | d = defer.Deferred() |
|---|
| 65 | reactor.callLater(n, fail) |
|---|
| 66 | return d |
|---|
| 67 | |
|---|
| 68 | def threadedFunction(): |
|---|
| 69 | tr = ThreadWrapper(reactor) |
|---|
| 70 | sync = ThreadWrapper(Asynchronous()) |
|---|
| 71 | |
|---|
| 72 | assert sync.syncResult("foo") == "foo" |
|---|
| 73 | assert sync.asyncResult(0.5, "bar") == "bar" |
|---|
| 74 | |
|---|
| 75 | try: |
|---|
| 76 | sync.syncException() |
|---|
| 77 | except ZeroDivisionError: |
|---|
| 78 | pass |
|---|
| 79 | else: |
|---|
| 80 | assert False, "ZeroDivisionError not raised" |
|---|
| 81 | |
|---|
| 82 | try: |
|---|
| 83 | sync.asyncException(0.5) |
|---|
| 84 | except ZeroDivisionError: |
|---|
| 85 | pass |
|---|
| 86 | else: |
|---|
| 87 | assert False, "ZeroDivisionError not raised" |
|---|
| 88 | |
|---|
| 89 | print '4 tests passed' |
|---|
| 90 | tr.stop() |
|---|
| 91 | |
|---|
| 92 | def test(): |
|---|
| 93 | reactor.callInThread(threadedFunction) |
|---|
| 94 | reactor.run() |
|---|
| 95 | |
|---|
| 96 | test() |
|---|
| 97 | |
|---|