| 1 |
|
|---|
| 2 |
|
|---|
| 3 |
import select, sys |
|---|
| 4 |
|
|---|
| 5 |
import libevent |
|---|
| 6 |
|
|---|
| 7 |
from zope.interface import implements |
|---|
| 8 |
|
|---|
| 9 |
from twisted.internet import posixbase, main, error |
|---|
| 10 |
from twisted.python import log |
|---|
| 11 |
from twisted.internet.interfaces import IReactorFDSet |
|---|
| 12 |
|
|---|
| 13 |
|
|---|
| 14 |
reads = {} |
|---|
| 15 |
writes = {} |
|---|
| 16 |
selectables = {} |
|---|
| 17 |
|
|---|
| 18 |
class LibEventReactor(posixbase.PosixReactorBase): |
|---|
| 19 |
"""A reactor that uses libevent.""" |
|---|
| 20 |
implements(IReactorFDSet) |
|---|
| 21 |
|
|---|
| 22 |
def _add(self, xer, flags, mdict): |
|---|
| 23 |
"""Create the event for reader/writer. |
|---|
| 24 |
""" |
|---|
| 25 |
fd = xer.fileno() |
|---|
| 26 |
if fd not in mdict: |
|---|
| 27 |
event = libevent.createEvent(fd, flags, self._doReadOrWrite) |
|---|
| 28 |
mdict[fd] = event |
|---|
| 29 |
event.addToLoop() |
|---|
| 30 |
selectables[fd] = xer |
|---|
| 31 |
|
|---|
| 32 |
def addReader(self, reader): |
|---|
| 33 |
"""Add a FileDescriptor for notification of data available to read. |
|---|
| 34 |
""" |
|---|
| 35 |
self._add(reader, libevent.EV_READ|libevent.EV_PERSIST, reads) |
|---|
| 36 |
|
|---|
| 37 |
def addWriter(self, writer, writes=writes, selectables=selectables): |
|---|
| 38 |
"""Add a FileDescriptor for notification of data available to write. |
|---|
| 39 |
""" |
|---|
| 40 |
self._add(writer, libevent.EV_WRITE|libevent.EV_PERSIST, writes) |
|---|
| 41 |
|
|---|
| 42 |
def _remove(self, selectable, mdict, other): |
|---|
| 43 |
"""Remove an event if found. |
|---|
| 44 |
""" |
|---|
| 45 |
fd = selectable.fileno() |
|---|
| 46 |
if fd == -1: |
|---|
| 47 |
for fd, fdes in selectables.items(): |
|---|
| 48 |
if selectable is fdes: |
|---|
| 49 |
break |
|---|
| 50 |
else: |
|---|
| 51 |
return |
|---|
| 52 |
if fd in mdict: |
|---|
| 53 |
event = mdict.pop(fd) |
|---|
| 54 |
try: |
|---|
| 55 |
event.removeFromLoop() |
|---|
| 56 |
except: |
|---|
| 57 |
pass |
|---|
| 58 |
if fd not in other: |
|---|
| 59 |
del selectables[fd] |
|---|
| 60 |
|
|---|
| 61 |
def removeReader(self, reader, reads=reads, writes=writes): |
|---|
| 62 |
"""Remove a Selectable for notification of data available to read. |
|---|
| 63 |
""" |
|---|
| 64 |
return self._remove(reader, reads, writes) |
|---|
| 65 |
|
|---|
| 66 |
def removeWriter(self, writer, writes=writes, reads=reads): |
|---|
| 67 |
"""Remove a Selectable for notification of data available to write. |
|---|
| 68 |
""" |
|---|
| 69 |
return self._remove(writer, writes, reads) |
|---|
| 70 |
|
|---|
| 71 |
def removeAll(self, reads=reads, writes=writes, selectables=selectables): |
|---|
| 72 |
"""Remove all selectables, and return a list of them.""" |
|---|
| 73 |
if self.waker is not None: |
|---|
| 74 |
self.removeReader(self.waker) |
|---|
| 75 |
result = selectables.values() |
|---|
| 76 |
events = reads.copy() |
|---|
| 77 |
events.update(writes) |
|---|
| 78 |
|
|---|
| 79 |
reads.clear() |
|---|
| 80 |
writes.clear() |
|---|
| 81 |
selectables.clear() |
|---|
| 82 |
|
|---|
| 83 |
for event in events.values(): |
|---|
| 84 |
event.removeFromLoop() |
|---|
| 85 |
if self.waker is not None: |
|---|
| 86 |
self.addReader(self.waker) |
|---|
| 87 |
return result |
|---|
| 88 |
|
|---|
| 89 |
def _doReadOrWrite(self, fd, events, eventObj, selectables=selectables): |
|---|
| 90 |
try: |
|---|
| 91 |
selectable = selectables[fd] |
|---|
| 92 |
except KeyError: |
|---|
| 93 |
return |
|---|
| 94 |
why = None |
|---|
| 95 |
inRead = False |
|---|
| 96 |
try: |
|---|
| 97 |
if events & libevent.EV_READ: |
|---|
| 98 |
why = selectable.doRead() |
|---|
| 99 |
inRead = True |
|---|
| 100 |
if not why and events & libevent.EV_WRITE: |
|---|
| 101 |
why = selectable.doWrite() |
|---|
| 102 |
inRead = False |
|---|
| 103 |
if selectable.fileno() != fd: |
|---|
| 104 |
why = error.ConnectionFdescWentAway('Filedescriptor went away') |
|---|
| 105 |
inRead = False |
|---|
| 106 |
except: |
|---|
| 107 |
log.err() |
|---|
| 108 |
why = sys.exc_info()[1] |
|---|
| 109 |
if why: |
|---|
| 110 |
self._disconnectSelectable(selectable, why, inRead) |
|---|
| 111 |
|
|---|
| 112 |
def doIteration(self, timeout): |
|---|
| 113 |
libevent.loop(libevent.EVLOOP_NONBLOCK | libevent.EVLOOP_ONCE) |
|---|
| 114 |
|
|---|
| 115 |
def install(): |
|---|
| 116 |
"""Install the libevent reactor.""" |
|---|
| 117 |
p = LibEventReactor() |
|---|
| 118 |
main.installReactor(p) |
|---|
| 119 |
|
|---|
| 120 |
__all__ = ["LibEventReactor", "install"] |
|---|
| 121 |
|
|---|