| 1 | |
|---|
| 2 | # System imports |
|---|
| 3 | import select, sys |
|---|
| 4 | |
|---|
| 5 | import libevent |
|---|
| 6 | |
|---|
| 7 | from zope.interface import implements |
|---|
| 8 | |
|---|
| 9 | from twisted.internet import posixbase, main, error |
|---|
| 10 | from twisted.python import log |
|---|
| 11 | from twisted.internet.interfaces import IReactorFDSet |
|---|
| 12 | |
|---|
# Module-level registries used by LibEventReactor, all keyed by file
# descriptor:
#   reads       -> libevent event object watching the descriptor for reading
#   writes      -> libevent event object watching the descriptor for writing
#   selectables -> the object whose doRead()/doWrite() is called back
# They must be three distinct dictionaries.
reads, writes, selectables = {}, {}, {}
|---|
| 17 | |
|---|
class LibEventReactor(posixbase.PosixReactorBase):
    """A reactor that uses libevent.

    Registrations are stored in the module-level ``reads``, ``writes`` and
    ``selectables`` dictionaries, all keyed by file descriptor.  ``reads``
    and ``writes`` hold the libevent event objects; ``selectables`` holds
    the object that is called back when an event fires.
    """
    implements(IReactorFDSet)

    def _add(self, xer, flags, mdict):
        """Create the event for reader/writer.

        ``xer`` is the selectable to register, ``flags`` are the libevent
        event flags for the new event and ``mdict`` is the registry
        (``reads`` or ``writes``) in which the event is recorded.

        Nothing happens if the descriptor is already present in ``mdict``.
        """
        fd = xer.fileno()
        if fd in mdict:
            return
        event = libevent.createEvent(fd, flags, self._doReadOrWrite)
        mdict[fd] = event
        event.addToLoop()
        selectables[fd] = xer

    def addReader(self, reader):
        """Add a FileDescriptor for notification of data available to read.
        """
        self._add(reader, libevent.EV_READ | libevent.EV_PERSIST, reads)

    def addWriter(self, writer, writes=writes, selectables=selectables):
        """Add a FileDescriptor for notification of data available to write.

        The ``selectables`` parameter is not used by this method; it is kept
        so the signature stays unchanged for existing callers.
        """
        self._add(writer, libevent.EV_WRITE | libevent.EV_PERSIST, writes)

    def _removeEvent(self, event):
        """Detach ``event`` from the libevent loop, ignoring failures.

        Removal is best effort: by the time this is called the event has
        already been dropped from the registries, so there is nothing left
        to roll back if libevent refuses.
        """
        try:
            event.removeFromLoop()
        except Exception:
            # Deliberately silent, but no longer swallows KeyboardInterrupt
            # or SystemExit the way a bare ``except:`` would.
            pass

    def _remove(self, selectable, mdict, other):
        """Remove an event if found.

        ``mdict`` is the registry to remove from and ``other`` is the
        opposite registry.  The entry in ``selectables`` is dropped only
        when the descriptor is not registered in ``other`` either.
        """
        fd = selectable.fileno()
        if fd == -1:
            # The selectable no longer reports a descriptor; recover the
            # one it was registered under by searching for the object.
            for fd, fdes in selectables.items():
                if selectable is fdes:
                    break
            else:
                return
        if fd in mdict:
            self._removeEvent(mdict.pop(fd))
            if fd not in other:
                selectables.pop(fd, None)

    def removeReader(self, reader, reads=reads, writes=writes):
        """Remove a Selectable for notification of data available to read.
        """
        return self._remove(reader, reads, writes)

    def removeWriter(self, writer, writes=writes, reads=reads):
        """Remove a Selectable for notification of data available to write.
        """
        return self._remove(writer, writes, reads)

    def removeAll(self, reads=reads, writes=writes, selectables=selectables):
        """Remove all selectables, and return a list of them.

        The waker (when there is one) is taken out before the snapshot is
        made and re-registered afterwards, so it is not part of the result.
        """
        if self.waker is not None:
            self.removeReader(self.waker)
        # Snapshot as a real list so the result survives the clear() below.
        result = list(selectables.values())
        # Both registries are keyed by descriptor, so merging them into one
        # dict would lose the read event of any descriptor that is also
        # registered for writing.  Collect the events themselves instead.
        events = list(reads.values()) + list(writes.values())

        reads.clear()
        writes.clear()
        selectables.clear()

        for event in events:
            # Best effort per event, so one failure cannot leave the other
            # events attached or prevent the waker from being re-added.
            self._removeEvent(event)
        if self.waker is not None:
            self.addReader(self.waker)
        return result

    def _doReadOrWrite(self, fd, events, eventObj, selectables=selectables):
        """Callback handed to libevent for every event created by _add.

        ``fd`` is the descriptor that fired, ``events`` is the bitmask of
        libevent flags describing what happened and ``eventObj`` is not
        used here.  Dispatches to the selectable's doRead()/doWrite() and
        disconnects it if either reports a reason or raises.
        """
        try:
            selectable = selectables[fd]
        except KeyError:
            # Nothing is registered for this descriptor any more.
            return
        why = None
        inRead = False
        try:
            if events & libevent.EV_READ:
                why = selectable.doRead()
                inRead = True
            if not why and events & libevent.EV_WRITE:
                # Only attempt the write if the read did not fail.
                why = selectable.doWrite()
                inRead = False
            if selectable.fileno() != fd:
                why = error.ConnectionFdescWentAway('Filedescriptor went away')
                inRead = False
        except:
            # Any exception raised by the selectable is logged and becomes
            # the reason for disconnecting it.
            why = sys.exc_info()[1]
            log.err()
        if why:
            self._disconnectSelectable(selectable, why, inRead)

    def doIteration(self, timeout):
        """Run one non-blocking pass of the libevent loop.

        NOTE(review): ``timeout`` is ignored and the loop is always invoked
        with EVLOOP_NONBLOCK, so this call never waits for events.
        Presumably the caller then spins while idle -- confirm, and
        consider honouring ``timeout``.
        """
        libevent.loop(libevent.EVLOOP_NONBLOCK | libevent.EVLOOP_ONCE)
|---|
| 114 | |
|---|
def install():
    """Install the libevent reactor.

    Builds a LibEventReactor and passes it to main.installReactor().
    """
    main.installReactor(LibEventReactor())
|---|
| 119 | |
|---|
# Names exported by a star-import of this module.
__all__ = [
    "LibEventReactor",
    "install",
]
|---|
| 121 | |
|---|