| 1 |
|
|---|
| 2 |
|
|---|
| 3 |
|
|---|
| 4 |
|
|---|
| 5 |
|
|---|
| 6 |
|
|---|
| 7 |
|
|---|
| 8 |
""" flow.wrap |
|---|
| 9 |
|
|---|
| 10 |
This module provides the wrap() function in the flow module and |
|---|
| 11 |
the private classes used for its implementation. |
|---|
| 12 |
""" |
|---|
| 13 |
|
|---|
| 14 |
from base import * |
|---|
| 15 |
from twisted.python.failure import Failure |
|---|
| 16 |
from twisted.internet.defer import Deferred |
|---|
| 17 |
|
|---|
| 18 |
class _String(Stage): |
|---|
| 19 |
""" Wrapper for a string object; don't create directly use flow.wrap |
|---|
| 20 |
|
|---|
| 21 |
This is probably the simplest stage of all. It is a |
|---|
| 22 |
constant list of one item. See wrap for an example. |
|---|
| 23 |
|
|---|
| 24 |
""" |
|---|
| 25 |
def __init__(self, str): |
|---|
| 26 |
Stage.__init__(self) |
|---|
| 27 |
self.results.append(str) |
|---|
| 28 |
self.stop = True |
|---|
| 29 |
def _yield(self): |
|---|
| 30 |
pass |
|---|
| 31 |
|
|---|
| 32 |
class _List(Stage): |
|---|
| 33 |
""" Wrapper for lists and tuple objects; don't create directly |
|---|
| 34 |
|
|---|
| 35 |
A simple stage, which admits the usage of instructions, |
|---|
| 36 |
such as Cooperate() within the list. This would be |
|---|
| 37 |
much simpler without logic to handle instructions. |
|---|
| 38 |
|
|---|
| 39 |
""" |
|---|
| 40 |
def __init__(self, seq): |
|---|
| 41 |
Stage.__init__(self) |
|---|
| 42 |
self._seq = list(seq) |
|---|
| 43 |
def _yield(self): |
|---|
| 44 |
seq = self._seq |
|---|
| 45 |
while seq: |
|---|
| 46 |
result = seq.pop(0) |
|---|
| 47 |
if isinstance(result, Instruction): |
|---|
| 48 |
return result |
|---|
| 49 |
self.results.append(result) |
|---|
| 50 |
self.stop = True |
|---|
| 51 |
|
|---|
| 52 |
class _DeferredInstruction(CallLater): |
|---|
| 53 |
def __init__(self, deferred): |
|---|
| 54 |
self.deferred = deferred |
|---|
| 55 |
def callLater(self, callable): |
|---|
| 56 |
self.deferred.addBoth(callable) |
|---|
| 57 |
|
|---|
| 58 |
class _Iterable(Stage): |
|---|
| 59 |
""" Wrapper for iterable objects, pass in a next() function |
|---|
| 60 |
|
|---|
| 61 |
This wraps functions (or bound methods). Execution starts with |
|---|
| 62 |
the initial function. If the return value is a Stage, then |
|---|
| 63 |
control passes on to that stage for the next round of execution. |
|---|
| 64 |
If the return value is Cooperate, then the chain of Stages is |
|---|
| 65 |
put on hold, and this return value travels all the way up the |
|---|
| 66 |
call stack so that the underlying mechanism can sleep, or |
|---|
| 67 |
perform other tasks, etc. All other non-Instruction return |
|---|
| 68 |
values, Failure objects included, are passed back to the |
|---|
| 69 |
previous stage via self.result |
|---|
| 70 |
|
|---|
| 71 |
All exceptions signal the end of the Stage. StopIteration |
|---|
| 72 |
means to stop without providing a result, while all other |
|---|
| 73 |
exceptions provide a Failure self.result followed by stoppage. |
|---|
| 74 |
|
|---|
| 75 |
""" |
|---|
| 76 |
def __init__(self, iterable, *trap): |
|---|
| 77 |
Stage.__init__(self, *trap) |
|---|
| 78 |
self._iterable = iter(iterable) |
|---|
| 79 |
self._next = None |
|---|
| 80 |
|
|---|
| 81 |
def _yield(self): |
|---|
| 82 |
""" executed during a yield statement """ |
|---|
| 83 |
if self.results or self.stop or self.failure: |
|---|
| 84 |
return |
|---|
| 85 |
while True: |
|---|
| 86 |
next = self._next |
|---|
| 87 |
if next: |
|---|
| 88 |
instruction = next._yield() |
|---|
| 89 |
if instruction: |
|---|
| 90 |
return instruction |
|---|
| 91 |
self._next = None |
|---|
| 92 |
try: |
|---|
| 93 |
result = self._iterable.next() |
|---|
| 94 |
if isinstance(result, Instruction): |
|---|
| 95 |
if isinstance(result, Stage): |
|---|
| 96 |
self._next = result |
|---|
| 97 |
continue |
|---|
| 98 |
return result |
|---|
| 99 |
if isinstance(result, Deferred): |
|---|
| 100 |
if result.called: |
|---|
| 101 |
continue |
|---|
| 102 |
return _DeferredInstruction(result) |
|---|
| 103 |
self.results.append(result) |
|---|
| 104 |
except StopIteration: |
|---|
| 105 |
self.stop = True |
|---|
| 106 |
except Failure, fail: |
|---|
| 107 |
self.failure = fail |
|---|
| 108 |
except: |
|---|
| 109 |
self.failure = Failure() |
|---|
| 110 |
return |
|---|
| 111 |
|
|---|
| 112 |
class _Deferred(Stage): |
|---|
| 113 |
""" Wraps a Deferred object into a stage; create with flow.wrap |
|---|
| 114 |
|
|---|
| 115 |
This stage provides a callback 'catch' for errback and |
|---|
| 116 |
callbacks. If not called, then this returns an Instruction |
|---|
| 117 |
which will let the reactor execute other operations, such |
|---|
| 118 |
as the producer for this deferred. |
|---|
| 119 |
|
|---|
| 120 |
""" |
|---|
| 121 |
def __init__(self, deferred, *trap): |
|---|
| 122 |
Stage.__init__(self, *trap) |
|---|
| 123 |
self._called = False |
|---|
| 124 |
deferred.addCallbacks(self._callback, self._errback) |
|---|
| 125 |
self._cooperate = _DeferredInstruction(deferred) |
|---|
| 126 |
|
|---|
| 127 |
def _callback(self, res): |
|---|
| 128 |
self._called = True |
|---|
| 129 |
self.results = [res] |
|---|
| 130 |
|
|---|
| 131 |
def _errback(self, fail): |
|---|
| 132 |
self._called = True |
|---|
| 133 |
self.failure = fail |
|---|
| 134 |
|
|---|
| 135 |
def _yield(self): |
|---|
| 136 |
if self.results or self.stop or self.failure: |
|---|
| 137 |
return |
|---|
| 138 |
if not self._called: |
|---|
| 139 |
return self._cooperate |
|---|
| 140 |
if self._called: |
|---|
| 141 |
self.stop = True |
|---|
| 142 |
return |
|---|
| 143 |
|
|---|
| 144 |
def wrap(obj, *trap): |
|---|
| 145 |
""" Wraps various objects for use within a flow |
|---|
| 146 |
|
|---|
| 147 |
The following example illustrates many different |
|---|
| 148 |
ways in which regular objects can be wrapped by |
|---|
| 149 |
the flow module to behave in a cooperative manner. |
|---|
| 150 |
|
|---|
| 151 |
# required imports |
|---|
| 152 |
from __future__ import generators |
|---|
| 153 |
from twisted.flow import flow |
|---|
| 154 |
from twisted.internet import reactor, defer |
|---|
| 155 |
|
|---|
| 156 |
# save this function, it is used everwhere |
|---|
| 157 |
def printFlow(source): |
|---|
| 158 |
def printer(source): |
|---|
| 159 |
source = flow.wrap(source) |
|---|
| 160 |
while True: |
|---|
| 161 |
yield source |
|---|
| 162 |
print source.next() |
|---|
| 163 |
d = flow.Deferred(printer(source)) |
|---|
| 164 |
d.addCallback(lambda _: reactor.stop()) |
|---|
| 165 |
reactor.run() |
|---|
| 166 |
|
|---|
| 167 |
source = "string" |
|---|
| 168 |
printFlow(source) |
|---|
| 169 |
|
|---|
| 170 |
source = ["one",flow.Cooperate(1),"two"] |
|---|
| 171 |
printFlow(source) |
|---|
| 172 |
|
|---|
| 173 |
def source(): |
|---|
| 174 |
yield "aeye" |
|---|
| 175 |
yield flow.Cooperate() |
|---|
| 176 |
yield "capin" |
|---|
| 177 |
printFlow(source) |
|---|
| 178 |
|
|---|
| 179 |
source = Deferred() |
|---|
| 180 |
reactor.callLater(1, lambda: source.callback("howdy")) |
|---|
| 181 |
printFlow(source) |
|---|
| 182 |
|
|---|
| 183 |
""" |
|---|
| 184 |
if isinstance(obj, Stage): |
|---|
| 185 |
if trap: |
|---|
| 186 |
|
|---|
| 187 |
trap = list(trap) |
|---|
| 188 |
for ex in obj._trap: |
|---|
| 189 |
if ex not in trap: |
|---|
| 190 |
trap.append(ex) |
|---|
| 191 |
obj._trap = tuple(trap) |
|---|
| 192 |
return obj |
|---|
| 193 |
|
|---|
| 194 |
if callable(obj): |
|---|
| 195 |
obj = obj() |
|---|
| 196 |
|
|---|
| 197 |
typ = type(obj) |
|---|
| 198 |
|
|---|
| 199 |
if typ is type([]) or typ is type(tuple()): |
|---|
| 200 |
return _List(obj) |
|---|
| 201 |
|
|---|
| 202 |
if typ is type(''): |
|---|
| 203 |
return _String(obj) |
|---|
| 204 |
|
|---|
| 205 |
if isinstance(obj, Deferred): |
|---|
| 206 |
return _Deferred(obj, *trap) |
|---|
| 207 |
|
|---|
| 208 |
try: |
|---|
| 209 |
return _Iterable(obj, *trap) |
|---|
| 210 |
except TypeError: |
|---|
| 211 |
pass |
|---|
| 212 |
|
|---|
| 213 |
raise ValueError, "A wrapper is not available for %r" % (obj,) |
|---|
| 214 |
|
|---|