Introduction to Event Driven Programming Using Twisted

Jean-Paul Calderone
exarkun@twistedmatrix.com
http://as.ynchrono.us/

Time for questions at the end of each section

Try for question time at the very end

Event Driven Programming

Contrasts

vs Single-Tasking Approaches

    from requests import get
    twistedmatrix = get("http://twistedmatrix.com/")
    google = get("http://google.com/")
    print twistedmatrix, google

Network Diagram

one thread, call blocking function, get result as return value

Limitation - resources sit idle while the single task is blocked

(local CPU, network connection, remote CPU)

Event-driven systems aren't limited to a single task at a time

vs Multi-Threaded ("pthreads") Approaches

    from threading import Thread
    from requests import get

    def gettm():
        global twistedmatrix
        twistedmatrix = get("http://twistedmatrix.com/")

    def getgoog():
        global google
        google = get("http://google.com/")

    tmThread = Thread(target=gettm, args=())
    googThread = Thread(target=getgoog, args=())

    tmThread.start()
    googThread.start()

    tmThread.join()
    googThread.join()

    print twistedmatrix, google

multiple threads, each call blocking function, get result as return value

event-driven systems don't need threads (but may use them)


Network Diagram

Better resource utilization

Event Driven Example: Twisted

    from twisted.web.client import getPage
    from twisted.internet.defer import gatherResults
    from twisted.internet import reactor

    def reportResults(results):
        twistedmatrix, google = results
        print twistedmatrix, google
        reactor.stop()  # both pages are in; let reactor.run() return

    d1 = getPage("http://twistedmatrix.com/")
    d2 = getPage("http://google.com/")
    gatherResults([d1, d2]).addCallback(reportResults)

    reactor.run()

Uses Twisted for event-driven approach

How it works will be explained

Twisted Network Diagram

Network Diagram

Single threaded, good resource utilization

Event Driven Example: PyGame

    while True:
        for event in pygame.event.get():
            if event.type == pygame.QUIT or (
                    event.type == pygame.KEYDOWN and
                    event.key == pygame.K_q):
                self.stop()
            elif event.type == pygame.KEYDOWN:
                self.controller.keyDown(event.key)
            elif event.type == pygame.KEYUP:
                self.controller.keyUp(event.key)
            elif event.type == pygame.MOUSEMOTION:
                if pygame.event.get_grab():
                    self.controller.mouseMotion(
                        event.pos, event.rel, event.buttons)
            elif event.type == pygame.MOUSEBUTTONUP:
                # A click toggles input grab and cursor visibility
                grabbed = pygame.event.get_grab()
                pygame.event.set_grab(not grabbed)
                visible = pygame.mouse.set_visible(True)  # returns previous state
                pygame.mouse.set_visible(not visible)

Saw what other approaches look like

What does event driven approach look like?

Call a function to get all events that happened

Dispatch them yourself

was it this, was it that, etc

Somewhat primitive/low-level
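The if/elif chain in the PyGame loop can also be expressed as a dispatch table from event type to handler. It keeps the same "get all events, dispatch them yourself" structure, but each case becomes one table entry. A minimal sketch; `Event`, `Controller` and the string event types are hypothetical stand-ins for pygame's objects, so it runs without pygame:

```python
from collections import namedtuple

# Hypothetical stand-ins for pygame's event objects and constants.
Event = namedtuple("Event", ["type", "key"])
KEYDOWN, KEYUP, QUIT = "KEYDOWN", "KEYUP", "QUIT"


class Controller:
    """Hypothetical controller that records calls instead of moving sprites."""

    def __init__(self):
        self.log = []
        self.running = True

    def keyDown(self, event):
        self.log.append(("down", event.key))

    def keyUp(self, event):
        self.log.append(("up", event.key))

    def stop(self, event):
        self.running = False


def dispatch(events, controller):
    # One table entry per event type replaces one elif branch each.
    handlers = {
        KEYDOWN: controller.keyDown,
        KEYUP: controller.keyUp,
        QUIT: controller.stop,
    }
    for event in events:
        handler = handlers.get(event.type)
        if handler is not None:  # ignore event types with no handler
            handler(event)


controller = Controller()
dispatch([Event(KEYDOWN, "a"), Event(KEYUP, "a"), Event(QUIT, None)], controller)
print(controller.log, controller.running)  # [('down', 'a'), ('up', 'a')] False
```

The loop is still yours to write; the table only tidies the "was it this, was it that" part.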

Event Driven Example: JavaScript/Browser

    var handleBodyKeyDown = function(event) {
      switch (event.keyCode) {
        case 37: // left arrow
          prevSlide();
          break;
        case 39: // right arrow
        case 32: // space
          nextSlide();
          break;
      }
    };

    document.addEventListener(
        'keydown', handleBodyKeyDown, false);

Loop is someone else's problem

Connect function to event source

Execution falls off the end

Someone else calls the function when it is time
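Stripped to its core, the browser model is a registry: code connects a function to an event name, execution falls off the end, and the loop calls the function later. A minimal Python sketch of that idea; `EventTarget`, `addEventListener` and `fire` are hypothetical names chosen to echo the JavaScript API, and the loop that would call `fire` is simulated with a plain `for`:

```python
class EventTarget:
    """Hypothetical toy version of the browser's listener registry."""

    def __init__(self):
        self._listeners = {}

    def addEventListener(self, name, function):
        # Connect a function to an event source; nothing is called yet.
        self._listeners.setdefault(name, []).append(function)

    def fire(self, name, event):
        # The loop (someone else's code) calls this when the event happens.
        for function in self._listeners.get(name, []):
            function(event)


slides = []

def handleKeyDown(keyCode):
    if keyCode == 37:            # left arrow
        slides.append("prev")
    elif keyCode in (39, 32):    # right arrow, space
        slides.append("next")

document = EventTarget()
document.addEventListener("keydown", handleKeyDown)

# Simulate the browser delivering three key presses.
for keyCode in (39, 32, 37):
    document.fire("keydown", keyCode)
print(slides)  # ['next', 'next', 'prev']
```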

Event Driven Example: Gtk

Data:

    <signal name="response" handler="on_loginDialog_response" />

Code:

    xml = glade.XML(gladefile)  # don't rebind the name of the glade module
    xml.signal_autoconnect(self)

    def on_loginDialog_response(self, widget, response):
        handlers = {gtk.RESPONSE_NONE: self.windowClosed,
                    gtk.RESPONSE_DELETE_EVENT: self.windowClosed,
                    gtk.RESPONSE_OK: self.doLogin,
                    gtk.RESPONSE_CANCEL: self.cancelled}
        handlers[response]()

    gtk.main()

Glade file defines callback association

Python code loads definition, connects "signals" - event source

Loop is somewhere in Gtk

Someone calls the function when it is time

What Event-Driven Programming Isn't

Not Parallel Computation

    while True:
        for event in pygame.event.get():
            if event.type != pygame.KEYDOWN:
                continue  # only key events have a .key attribute
            if event.key == pygame.K_a:
                print 'Ackermann result:', ackermann(4, 2)
            elif event.key == pygame.K_f:
                print 'Some fibonacci:', fibonacci(15)

Single thread only does a single thing at once

Event-Driven approach can help manage multiple threads or processes

Not Guaranteed Race Condition Free

    while True:
        for event in pygame.event.get():
            if event.type == pygame.KEYDOWN:
                start = time.time()
            elif event.type == pygame.KEYUP:
                end = time.time()
                print 'Key down for', end - start, 'seconds'
                start = end = None

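Press a, press b, release a, release b - oops: b's KEYDOWN overwrites start, so a's time is wrong, and b's KEYUP computes end - None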

Multiple things are happening

Shared mutable data can still get corrupted by unexpected ordering

But no pre-emptive context switching means critical sections are easier
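One fix for the key-timing bug is to keep a start time per key instead of one shared variable. A sketch that replays the problematic sequence with synthetic `(eventType, key, timestamp)` tuples rather than pygame events, so the ordering is explicit; `keyDurations` is a hypothetical helper name:

```python
def keyDurations(events):
    """Return (key, seconds held) pairs for a sequence of
    (eventType, key, timestamp) tuples."""
    starts = {}  # one start time per key, not one shared variable
    durations = []
    for eventType, key, timestamp in events:
        if eventType == "KEYDOWN":
            starts[key] = timestamp
        elif eventType == "KEYUP" and key in starts:
            durations.append((key, timestamp - starts.pop(key)))
    return durations


# Press a, press b, release a, release b: the sequence that broke the original.
events = [("KEYDOWN", "a", 0.0), ("KEYDOWN", "b", 1.0),
          ("KEYUP", "a", 2.0), ("KEYUP", "b", 4.0)]
print(keyDurations(events))  # [('a', 2.0), ('b', 3.0)]
```

No lock is involved: nothing pre-empts the handler mid-update, so the fix is only about not sharing state between keys.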

Inside The Reactor

Standard I/O Echo "Server"

    import sys

    def main():
        while True:
            data = sys.stdin.read(1)
            if not data:
                break
            sys.stdout.write(data)

Understanding how an event loop (reactor) works makes understanding the programming model easier

I'll build up an event loop one feature at a time

...

Byte-echoing application

Supports one "client"

Application logic tied to stdio

.read(1) == "" means EOF, exit from loop

Read one byte at a time because that's the easiest thing to deal with

Standard I/O Echo "Server" Unit Test

    def test():
        from StringIO import StringIO
        sys.stdin = StringIO("foo\nbar\n")
        sys.stdout = StringIO()
        try:
            main()
            assert sys.stdout.getvalue() == "foo\nbar\n"
        finally:
            sys.stdin = sys.__stdin__
            sys.stdout = sys.__stdout__

Requires monkey patching

Mixes tests for I/O with the application logic

Factor Application Apart from Input

    import sys

    def echo(data):
        sys.stdout.write(data)

    def main():
        while True:
            data = sys.stdin.read(1)
            if not data:
                break
            echo(data)

Application code freed of doing reads

Still does writes

Improved Unit Test

    def test():
        from StringIO import StringIO
        sys.stdout = StringIO()
        try:
            echo("foo\n")
            echo("bar\n")
            assert sys.stdout.getvalue() == "foo\nbar\n"
        finally:
            sys.stdout = sys.__stdout__

Still monkey-patches stdout

Mixes in less I/O

Supplying test data is easier

Factor Application Apart from Output

    import sys

    def echo(stdout, data):
        stdout.write(data)

    def main():
        while True:
            data = sys.stdin.read(1)
            if not data:
                break
            echo(sys.stdout, data)

Application logic divorced from sys.stdin, sys.stdout

Pleasant Unit Test

    def test():
        from StringIO import StringIO
        stdout = StringIO()
        echo(stdout, "foo\n")
        echo(stdout, "bar\n")
        assert stdout.getvalue() == "foo\nbar\n"

Zero monkey-patching

Only testing application logic, not I/O implementation

Stateful Protocols

    import sys

    class Echoer:
        def makeConnection(self, stdout):
            self.stdout = stdout

        def dataReceived(self, data):
            self.stdout.write(data)

    def main():
        echo = Echoer()
        echo.makeConnection(sys.stdout)

        while True:
            data = sys.stdin.read(1)
            if not data:
                break
            echo.dataReceived(data)

Most protocols are stateful

Standard Python approach, use an instance

Simple mainloop change to call dataReceived

Unit Tests, Still

    def test():
        from StringIO import StringIO
        output = StringIO()
        echo = Echoer()
        echo.makeConnection(output)
        echo.dataReceived("foo\n")
        echo.dataReceived("bar\n")
        assert output.getvalue() == "foo\nbar\n"

Still looks okay, just using the new API

Stateful Line Echo Server

    class LineProtocol:
        def makeConnection(self, stdout):
            self.stdout = stdout
            self.buffer = ""

        def dataReceived(self, data):
            self.buffer += data
            lines = self.buffer.split("\n")
            self.buffer = lines.pop()
            for line in lines:
                self.lineReceived(line)

        def lineReceived(self, line):
            pass

    class Echoer(LineProtocol):
        def lineReceived(self, line):
            self.stdout.write(line + "\n")

Change to echo lines instead of bytes

Line parsing buffer is more state

Another reason instance is useful

Classes allow line parsing separate from echo logic

(Still Unit Tested)

    def test():
        liner = LineProtocol()
        received = []
        liner.lineReceived = received.append
        liner.makeConnection(None)
        liner.dataReceived("fo")
        liner.dataReceived("o\n")
        liner.dataReceived("bar\n")
        assert received == ["foo", "bar"]

        from StringIO import StringIO
        output = StringIO()
        echo = Echoer()
        echo.makeConnection(output)
        echo.lineReceived("foo")
        echo.lineReceived("bar")
        assert output.getvalue() == "foo\nbar\n"

Unit tests for each class

Typical advantages of classes, separation of concerns

Easily separated

Only exercise the relevant logic in each

More Protocol Abstraction

    class LineProtocol:
        delimiter = "\n"

        def makeConnection(self, stdout):
            self.stdout = stdout
            self.buffer = ""

        def dataReceived(self, data):
            self.buffer += data
            lines = self.buffer.split(self.delimiter)
            self.buffer = lines.pop()
            for line in lines:
                self.lineReceived(line)

        def sendLine(self, line):
            if self.delimiter in line:
                raise ValueError()
            self.stdout.write(line + self.delimiter)

        def lineReceived(self, line):
            pass

Already saw protocol handle parsing bytes

Protocol can handle generating bytes too

Keep byte mashing logic out of application code

More Unit Tests

    def test():
        from StringIO import StringIO
        output = StringIO()
        liner = LineProtocol()
        liner.makeConnection(output)

        try:
            liner.sendLine("foo\nbar")
        except ValueError:
            pass
        else:
            assert False, "ValueError not raised"

        liner.sendLine("foobar")
        assert output.getvalue() == "foobar\n"

Unit test byte generation code too

Protocol method makes good place for consistency checks

Which can also be tested

Use Twisted's Protocol base class

    from twisted.internet.protocol import Protocol

    class LineProtocol(Protocol):
        delimiter = "\n"
        buffer = ""

        def dataReceived(self, data):
            self.buffer += data
            lines = self.buffer.split(self.delimiter)
            self.buffer = lines.pop()
            for line in lines:
                self.lineReceived(line)

        def sendLine(self, line):
            self.transport.write(line + self.delimiter)

        def lineReceived(self, line):
            pass

Twisted provided Protocol looks similar to this

"stdout" changed to "transport" - indicates transport indifference

Subclass it; the inherited makeConnection sets self.transport, then calls the connectionMade hook
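The Protocol contract described above can be imitated in a few lines, together with an in-memory transport, which is what lets the earlier unit tests run without real I/O. This is a simplified stand-in, not Twisted's source: `FakeTransport` is a hypothetical name (Twisted ships a comparable `StringTransport` for its own tests), and the sketch uses `str` where Twisted on Python 3 uses `bytes`:

```python
class Protocol:
    """Simplified imitation of the Protocol contract described above."""
    transport = None

    def makeConnection(self, transport):
        self.transport = transport  # store the transport first...
        self.connectionMade()       # ...then tell the subclass

    def connectionMade(self):
        pass

    def dataReceived(self, data):
        pass


class FakeTransport:
    """Hypothetical in-memory transport that collects written data."""

    def __init__(self):
        self.written = []

    def write(self, data):
        self.written.append(data)

    def value(self):
        return "".join(self.written)


class LineProtocol(Protocol):
    delimiter = "\n"
    buffer = ""

    def dataReceived(self, data):
        self.buffer += data
        lines = self.buffer.split(self.delimiter)
        self.buffer = lines.pop()  # keep any incomplete trailing line
        for line in lines:
            self.lineReceived(line)

    def sendLine(self, line):
        self.transport.write(line + self.delimiter)

    def lineReceived(self, line):
        pass


class Echoer(LineProtocol):
    def connectionMade(self):
        self.sendLine("hello")  # self.transport is already set here

    def lineReceived(self, line):
        self.sendLine(line)


transport = FakeTransport()
echo = Echoer()
echo.makeConnection(transport)
echo.dataReceived("fo")
echo.dataReceived("o\nbar\n")
print(repr(transport.value()))  # 'hello\nfoo\nbar\n'
```

Because connectionMade runs only after the transport is stored, a subclass can write a greeting from it safely.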

Or Twisted's line parsing protocol

    from twisted.protocols.basic import LineOnlyReceiver

    class Echoer(LineOnlyReceiver):
        delimiter = "\n"

        def lineReceived(self, line):
            self.sendLine(line)

And there's a line parser too

Does all the parsing

Provides the sendLine method

Another benefit of classes - libraries, code sharing, reuse

All that's left is application logic

Why Blocking Is Bad

Protocol is pretty well factored at this point

About to talk about improving the mainloop

First, look at what problems blocking might cause

Why we generally want to avoid it

A Blocking Protocol

    import time, random
    from twisted.protocols.basic import LineOnlyReceiver

    class Echoer(LineOnlyReceiver):
        delimiter = "\n"

        def lineReceived(self, line):
            time.sleep(random.random())
            self.sendLine(line)

Same echo protocol, but with time.sleep

time.sleep blocks

Echoer.lineReceived won't finish until sleep and sendLine are done

Consequences

    def main():
        echo = Echoer()
        echo.makeConnection(sys.stdout)

        while True:
            data = sys.stdin.read(1)
            if not data:
                break
            echo.dataReceived(data)
                # -> Echoer.lineReceived
                #     -> time.sleep

Here's our event loop

Marked up with call stack

Loop calls dataReceived

dataReceived calls lineReceived

lineReceived calls sleep

... nothing returns until sleep is done

Loop doesn't iterate until sleep is done

Further input not handled until sleep is done

Reactor isn't reacting until sleep is done

Makes no difference for single-connection application

Keep in mind for what comes next though

Protocol Conclusion

    from twisted.protocols.basic import LineOnlyReceiver

    class Echoer(LineOnlyReceiver):
        delimiter = "\n"

        def lineReceived(self, line):
            self.sendLine(line)

Application logic separate from byte parsing logic

Application logic separate from byte generation logic

Byte parsing/generation logic re-usable

Application logic separate from actual I/O - easily tested, transport agnostic (TCP, SSL, stdio)

Next: start incrementally improving the main loop

The Event Loop

Slight Main Loop Tweak

    from twisted.internet.protocol import FileWrapper

    def main():
        echo = Echoer()
        echo.makeConnection(FileWrapper(sys.stdout))

        while True:
            data = sys.stdin.read(1)
            if not data:
                break
            echo.dataReceived(data)

Before real improvements, use FileWrapper to make sys.stdout look like a transport

Simple wrapper - adds "disconnecting" attribute, nothing interesting

Just compatibility with LineOnlyReceiver/Echoer

Parameterize The Application

    def loop(protocol):
        protocol.makeConnection(FileWrapper(sys.stdout))

        while True:
            data = sys.stdin.read(1)
            if not data:
                break
            protocol.dataReceived(data)

    def main():
        echoer = Echoer()
        loop(echoer)

Take the hard-coded protocol out of the loop

Parameter for caller to supply value

Now the loop runs any application (over stdio)

Multiple Event Sources

Significant missing feature

Handle more than one connection

How to add that without threads

Respond To First Event That Happens

    from socket import socket

    twistedmatrix = socket()
    twistedmatrix.setblocking(False)

    google = socket()
    google.setblocking(False)

    twistedmatrix.connect_ex(('www.twistedmatrix.com', 80))
    google.connect_ex(('www.google.com', 80))

    events = ???????????????????????
    if len(events) == 2:
        print 'Tied'
    elif events[0] == twistedmatrix:
        print 'Connected to www.twistedmatrix.com first'
    elif events[0] == google:
        print 'Connected to www.google.com first'

Some ugly code for setting up two TCP connections

Avoid blocking

connect_ex is the "do not raise an exception" version of connect

connect_ex starts the attempt, returns before it is done

Sockets in mysterious intermediate state for a while

How to find out when they are connected?

And which one connected first?

select

    from select import select
    ...
    events = select([], [twistedmatrix, google], [])[1]
    ...

Very old, simple way - select: ask the platform.

read list, write list, exceptional-condition list (pay no attention to it)

returns readable list, writeable list, exceptional-condition list (pay no attention to it)

waits for socket in write list to become writeable

esoteric: writeable also means "connected"

returns as soon as any of them are

first wins!

blocks
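A runnable version of the select step, under one assumption: to stay off the network it uses a local `socket.socketpair()` instead of connections to the two web servers. A connected socket with room in its send buffer is writeable straight away, and only the end that has received data is readable:

```python
import select
import socket

# Assumption: a connected pair of local sockets stands in for the two
# TCP connections, so no network access is needed.
left, right = socket.socketpair()

# Arguments: read list, write list, exceptional-condition list, timeout.
writable = select.select([], [left, right], [], 1.0)[1]
print(len(writable))  # 2: both sockets have room in their send buffers

# Nothing has been sent, so with a zero timeout no socket is readable.
idle = select.select([left, right], [], [], 0)[0]
print(idle)  # []

left.sendall(b"x")
ready = select.select([left, right], [], [], 1.0)[0]
print(ready == [right])  # True: only the receiving end has data

left.close()
right.close()
```

The fourth argument is the timeout: `0` means "just check", `None` (or leaving it out) means block until something is ready.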

poll

    from select import POLLOUT, poll
    p = poll()
    ...
    p.register(twistedmatrix, POLLOUT)
    p.register(google, POLLOUT)
    events = p.poll()
    ...

Slightly better way - poll

Same idea as select - different API

Somewhat better big-O

Works the same, p.poll() blocks until a socket is writeable

epoll

    from select import EPOLLOUT, epoll
    p = epoll()
    ...
    p.register(twistedmatrix, EPOLLOUT)
    p.register(google, EPOLLOUT)
    events = p.poll()
    ...

epoll - Linux specific

Same idea - looks pretty similar to poll

Somewhat better big-O

Works the same, p.poll() blocks until a socket is writeable

kqueue

    from select import (
        KQ_FILTER_WRITE as WRITE, KQ_EV_ADD as ADD, kqueue, kevent)
    kq = kqueue()
    ...
    kq.control([kevent(twistedmatrix, WRITE, ADD)], 0)
    kq.control([kevent(google, WRITE, ADD)], 0)
    events = kq.control([], 2)
    ...

kqueue - BSD (includes OS X) specific

Same idea - but looks a little different

About same big-O as epoll

Works the same, kq.control blocks until a socket is writeable

returns at most 2 events

MsgWaitForMultipleObjects

    from win32file import FD_CONNECT, WSAEventSelect
    from win32event import (
        QS_ALLINPUT, WAIT_OBJECT_0, CreateEvent, MsgWaitForMultipleObjects)
    handles = []
    ...
    event = CreateEvent(None, 0, 0, None)
    WSAEventSelect(twistedmatrix, event, FD_CONNECT)
    handles.append(event)
    event = CreateEvent(None, 0, 0, None)
    WSAEventSelect(google, event, FD_CONNECT)
    handles.append(event)
    index = MsgWaitForMultipleObjects(handles, 0, -1, QS_ALLINPUT)
    events = [handles[index - WAIT_OBJECT_0]]
    ...

MsgWaitForMultipleObjects - Windows specific

Same idea, looks different again

Big-O about the same as select

MsgWaitForMultipleObjects blocks until a "handle" is signaled (here, by a socket's connect event)

esoteric: doesn't consider "connect" and "writeable" the same thing

Obtuse way of getting results out

Only returns one event - no ties allowed

GetQueuedCompletionStatus

    # Some Cython to bind the necessary Win32 APIs
    # (declarations of myOVERLAPPED, raise_error and the CPython
    # C-API functions used below are elided)

    ctypedef size_t HANDLE
    ctypedef unsigned long DWORD
    ctypedef size_t ULONG_PTR
    ctypedef int BOOL

    cdef extern from "windows.h":
        ctypedef struct OVERLAPPED:
            pass
        HANDLE CreateIoCompletionPort(HANDLE fileHandle, HANDLE existing, ULONG_PTR key, DWORD numThreads)
        BOOL GetQueuedCompletionStatus(HANDLE port, DWORD *bytes, ULONG_PTR *key, OVERLAPPED **ov, DWORD timeout)
        enum:
            INVALID_HANDLE_VALUE

    cdef class CompletionPort:
        cdef HANDLE port
        def __init__(self):
            cdef HANDLE res
            res = CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0)
            if not res:
                raise_error(0, 'CreateIoCompletionPort')
            self.port = res

        def addHandle(self, HANDLE handle, size_t key=0):
            cdef HANDLE res
            res = CreateIoCompletionPort(handle, self.port, key, 0)
            if not res:
                raise_error(0, 'CreateIoCompletionPort')

        def getEvent(self, long timeout):
            cdef PyThreadState *_save
            cdef unsigned long bytes, rc
            cdef size_t key
            cdef myOVERLAPPED *ov

            # Release the GIL while blocked in the kernel
            _save = PyEval_SaveThread()
            rc = GetQueuedCompletionStatus(self.port, &bytes, &key, <OVERLAPPED **>&ov, timeout)
            PyEval_RestoreThread(_save)

            if not rc:
                rc = GetLastError()
            else:
                rc = 0

            obj = None
            if ov:
                if ov.obj:
                    obj = <object>ov.obj
                    Py_DECREF(obj) # we are stealing a reference here
                PyMem_Free(ov)

            return (rc, bytes, key, obj)

    # Some Python, using the class defined by that Cython
    # (timeout, WAIT_TIMEOUT, KEY_WAKEUP and KEY_NORMAL are defined elsewhere)
    port = CompletionPort()
    ...
    port.addHandle(twistedmatrix.fileno())
    port.addHandle(google.fileno())

    events = []
    rc, bytes, key, evt = port.getEvent(timeout)
    while 1:
        if rc == WAIT_TIMEOUT:
            break
        if key != KEY_WAKEUP:
            assert key == KEY_NORMAL
            events.append(evt.owner)
        rc, bytes, key, evt = port.getEvent(0)
    ...

I/O Completion Ports - Windows specific

Ahem

Skip the details of this one

It works the same, honest.

Big-O maybe about the same as epoll/kqueue

... Select!

    import select, sys
    from twisted.internet.protocol import FileWrapper

    def loop(protocol):
        protocol.makeConnection(FileWrapper(sys.stdout))

        connections = {sys.stdin: protocol}
        while True:
            readers = connections.keys()
            readable, _, _ = select.select(readers, [], [])
            for transport in readable:
                protocol = connections[transport]
                data = transport.read(1)
                if not data:
                    return
                protocol.dataReceived(data)

Anyway, stick to select

All the same, select fits best on slides

Notice protocol code has no idea what reactor is using

Different reactors can use different mechanisms, protocol is unaffected

connections dictionary

keys are things to pass to select()

values are protocol instances to pass data to

block on select, wait for there to be data to read

Read it and pass to protocol

Still abort the loop when EOF is reached - fine for one protocol

Two Protocols

    import select, socket, sys

    def acceptOneClient():
        port = socket.socket()
        port.bind(('', 12345))
        port.listen(1)
        client, address = port.accept()
        return client.makefile('rw', 0)

    def loop(stdio, tcp):
        stdio.makeConnection(FileWrapper(sys.stdout))

        clientFile = acceptOneClient()
        tcp.makeConnection(FileWrapper(clientFile))

        connections = {sys.stdin: stdio, clientFile: tcp}
        while True:
            readers = connections.keys()
            readable, _, _ = select.select(readers, [], [])
            for transport in readable:
                protocol = connections[transport]
                data = transport.read(1)
                if not data:
                    return
                protocol.dataReceived(data)

Two connections

second item in connections dict

select does the rest

defect: loop aborts on first lost connection

Graceful Disconnection

    connections = {sys.stdin: stdio, clientFile: tcp}
    while True:
        readers = connections.keys()
        readable, _, _ = select.select(readers, [], [])
        for transport in readable:
            protocol = connections[transport]
            data = transport.read(1)
            if not data:
                del connections[transport]
                protocol.connectionLost("connection done")
                continue
            protocol.dataReceived(data)

Handle disconnect differently

lost connection callback

remove item from dict

Multi-connection support: this is it.

Real reactor does fundamentally the same thing
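The same multi-connection loop can be written against Python's standard `selectors` module, whose `DefaultSelector` picks epoll, kqueue, poll or select for the platform; the protocols never learn which one is underneath. A sketch under stated assumptions: local socket pairs play the part of client connections, `recv(4096)` replaces `read(1)`, and `Recorder` is a hypothetical protocol that only logs what happens:

```python
import selectors
import socket


class Recorder:
    """Hypothetical protocol: remembers data and disconnection."""

    def __init__(self, name, log):
        self.name, self.log = name, log

    def dataReceived(self, data):
        self.log.append((self.name, data))

    def connectionLost(self, reason):
        self.log.append((self.name, reason))


def loop(connections):
    """connections maps a socket to the protocol that receives its data."""
    selector = selectors.DefaultSelector()
    for sock, protocol in connections.items():
        selector.register(sock, selectors.EVENT_READ, protocol)

    while selector.get_map():  # run until every connection is gone
        for key, _ in selector.select():
            sock, protocol = key.fileobj, key.data
            data = sock.recv(4096)
            if not data:  # EOF: drop this connection, keep serving the rest
                selector.unregister(sock)
                sock.close()
                protocol.connectionLost("connection done")
                continue
            protocol.dataReceived(data)
    selector.close()


log = []
serverA, clientA = socket.socketpair()
serverB, clientB = socket.socketpair()
clientA.sendall(b"hello")
clientA.close()  # A sends something, then disconnects
clientB.close()  # B disconnects without sending anything
loop({serverA: Recorder("A", log), serverB: Recorder("B", log)})
print(log)
```

The order of entries between A and B may vary from run to run; within one connection, data always arrives before its connectionLost.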

Timed Events

    calls = []

    def callLater(delay, function):
        calls.append((time.time() + delay, function))
        calls.sort()

Simplest approach

Globals bad, etc; good for slides.

Add time-to-call-function and function to list

Keep sorted by time, for ease

First item is next thing needed to do
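Re-sorting the whole list on every call works, but a heap from the standard `heapq` module keeps the earliest call at index 0 more cheaply. A sketch; `Scheduler`, the injectable clock and the counter that breaks ties (so two functions due at the same instant are never compared with each other) are additions not on the slide:

```python
import heapq
import itertools
import time


class Scheduler:
    """Hypothetical timed-call queue: the earliest call is always calls[0]."""

    def __init__(self, clock=time.time):
        self.clock = clock
        self.calls = []
        self._counter = itertools.count()  # tie-breaker for equal times

    def callLater(self, delay, function):
        when = self.clock() + delay
        heapq.heappush(self.calls, (when, next(self._counter), function))

    def timeout(self):
        """Seconds until the next call is due, or None if nothing is pending."""
        if not self.calls:
            return None
        return max(0, self.calls[0][0] - self.clock())

    def runDue(self):
        """Call, in time order, every function whose time has come."""
        while self.calls and self.calls[0][0] <= self.clock():
            _, _, function = heapq.heappop(self.calls)
            function()


# A fake clock makes the behavior deterministic.
now = [100.0]
scheduler = Scheduler(clock=lambda: now[0])
ran = []
scheduler.callLater(5, lambda: ran.append("five"))
scheduler.callLater(1, lambda: ran.append("one"))
print(scheduler.timeout())  # 1.0
now[0] = 103.0
scheduler.runDue()
print(ran, scheduler.timeout())  # ['one'] 2.0
```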

Timeout Support in select()

    from select import select
    ...
    timeout = 1.0
    events = select([], [twistedmatrix, google], [], timeout)[1]
    if events == []:
        print 'One second passed with no completed connection'
    else:
        ...

select and all the other APIs above accept a timeout

If no result before timeout elapses, return no results

Timed Events Support In Event Loop

    while True:
        # Block in select() no longer than the time until the next call
        if calls:
            timeout = max(0, calls[0][0] - time.time())
        else:
            timeout = None

        readers = connections.keys()
        readable, _, _ = select.select(readers, [], [], timeout)
        for transport in readable:
            protocol = connections[transport]
            data = transport.read(1)
            if not data:
                del connections[transport]
                protocol.connectionLost("connection done")
                continue
            protocol.dataReceived(data)

        # Run every timed call whose time has come
        while calls and calls[0][0] <= time.time():
            when, function = calls.pop(0)
            function()

Use time to next function as timeout

After I/O, check to see if any functions should be called

Call them

That's a reactor.
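Assembled into one object, the pieces so far make a toy reactor: readers registered with their protocols, a timed-call list, and a loop that waits in select for at most the time until the next call. A sketch under stated assumptions: `ToyReactor`, `addReader` and `Echo` are hypothetical names, connections are sockets read with `recv(4096)` rather than files read one byte at a time, a `stop` method is added so the loop can end, and a socket pair stands in for a real client:

```python
import select
import socket
import time


class ToyReactor:
    """Hypothetical toy reactor: select() for I/O plus a timed-call list."""

    def __init__(self):
        self.connections = {}  # socket -> protocol
        self.calls = []        # (time, function) pairs, kept sorted by time
        self.running = False

    def addReader(self, sock, protocol):
        self.connections[sock] = protocol

    def callLater(self, delay, function):
        self.calls.append((time.time() + delay, function))
        self.calls.sort(key=lambda call: call[0])  # never compares functions

    def stop(self):
        self.running = False

    def run(self):
        self.running = True
        while self.running:
            # Wait no longer than the time until the next timed call.
            timeout = None
            if self.calls:
                timeout = max(0, self.calls[0][0] - time.time())

            readers = list(self.connections)
            if readers:
                readable = select.select(readers, [], [], timeout)[0]
            elif timeout is None:
                break  # nothing to read and nothing scheduled
            else:
                time.sleep(timeout)  # select() with three empty lists fails on Windows
                readable = []

            for sock in readable:
                protocol = self.connections[sock]
                data = sock.recv(4096)
                if not data:
                    del self.connections[sock]
                    protocol.connectionLost("connection done")
                    continue
                protocol.dataReceived(data)

            # Then run every timed call that is now due, earliest first.
            while self.calls and self.calls[0][0] <= time.time():
                _, function = self.calls.pop(0)
                function()


class Echo:
    """Hypothetical protocol that writes everything back to its socket."""

    def __init__(self, sock):
        self.sock = sock

    def dataReceived(self, data):
        self.sock.sendall(data)

    def connectionLost(self, reason):
        pass


toy = ToyReactor()
server, client = socket.socketpair()
toy.addReader(server, Echo(server))
client.sendall(b"ping")
toy.callLater(0.2, toy.stop)  # without this, run() would wait forever
toy.run()

client.settimeout(1.0)
echoed = client.recv(4096)
print(echoed)  # b'ping'
server.close()
client.close()
```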

The Reactor

    from twisted.internet import reactor
    from twisted.internet.protocol import ServerFactory
    from twisted.internet.stdio import StandardIO

    class EchoFactory(ServerFactory):
        def buildProtocol(self, clientAddress):
            # A fresh Echoer for every accepted TCP connection
            return Echoer()

    def main():
        echoer = Echoer()
        StandardIO(echoer)

        factory = EchoFactory()
        reactor.listenTCP(12345, factory)

        reactor.run()

Replace custom loop with twisted.internet.reactor

reactor doing the same thing as our custom event loop

ServerFactory lets the server support multiple client connections

buildProtocol is called for each accepted connection to get a protocol instance to use
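The factory's job is small: produce a fresh protocol for each accepted connection, so per-connection state is never shared, while anything that should be shared lives on the factory. A stdlib-only sketch of that pattern; `Counter` and `CountingFactory` are hypothetical, and the `buildProtocol` body imitates the shape of Twisted's default `Factory.buildProtocol` (instantiate `self.protocol`, set its `factory` attribute) in simplified form. The addresses are made up:

```python
class Counter:
    """Hypothetical protocol with per-connection state."""

    def __init__(self):
        self.received = 0
        self.factory = None

    def dataReceived(self, data):
        self.received += len(data)         # this connection only
        self.factory.total += len(data)    # shared across connections


class CountingFactory:
    protocol = Counter

    def __init__(self):
        self.total = 0

    def buildProtocol(self, addr):
        # One new protocol instance per accepted connection.
        p = self.protocol()
        p.factory = self
        return p


factory = CountingFactory()
first = factory.buildProtocol(("127.0.0.1", 50000))
second = factory.buildProtocol(("127.0.0.1", 50001))
first.dataReceived(b"abc")
second.dataReceived(b"de")
print(first.received, second.received, factory.total)  # 3 2 5
```

That is also why main() hands listenTCP a factory but hands StandardIO a single Echoer: there is only ever one stdio connection.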

Other Transports

    from twisted.internet.ssl import DefaultOpenSSLContextFactory

    reactor.listenTCP(12345, factory)

    reactor.listenUNIX('echo', factory)

    ctxFactory = DefaultOpenSSLContextFactory(
        'server.key', 'server.pem')
    reactor.listenSSL(12346, factory, ctxFactory)

Twisted supports other kinds of sockets

UNIX sockets: sockets as files on filesystem

SSL sockets: really the SSL protocol layered on a TCP socket

Just more of the same; only boring details are different

Organizing Callbacks

Saw some examples of other approaches

They have some drawbacks

Sample Application - Internet Time Client

C: HEAD / HTTP/1.1
C:
S: HTTP/1.1 200 OK
S: Date: Mon, 08 Mar 2010 19:03:25 GMT
S: Expires: -1
S: Cache-Control: private, max-age=0
S: Content-Type: text/html; charset=ISO-8859-1
S: Set-Cookie: PREF=ID=c7d1998c3da1fc4b:TM=1268075005:LM=1268075005:S=entEyiRrSKVBAXmu; expires=Wed, 07-Mar-2012 19:03:25 GMT; path=/; domain=.google.com
S: Set-Cookie: NID=32=joVRaBMdFUJasbS9FWjsBMSBYCwR6C5SfCa5xUP2hCFWzo-CeMCnhPmSthgijMzjovXtzm2ReVuywjhlHMOGoNE8PX1mDuZvJehSTZAdMD82A5GqHESPU-W8X1_ZhK3K; expires=Tue, 07-Sep-2010 19:03:25 GMT; path=/; domain=.google.com; HttpOnly
S: Server: gws
S: X-XSS-Protection: 0
S: Transfer-Encoding: chunked
S:
S: 0

Take advantage of web servers to learn the time

Issue a request, extract the value of the "Date" header
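The extraction step can be tested on its own, with no network, by feeding it response lines like the ones in the transcript above. A sketch; `parseDateHeader` is a hypothetical helper, and `email.utils.parsedate_to_datetime` is the standard-library parser for this date format:

```python
from email.utils import parsedate_to_datetime


def parseDateHeader(lines):
    """Return the Date header from HTTP response lines as a datetime,
    or None if there is no Date header."""
    for line in lines:
        name, sep, value = line.partition(":")
        if sep and name.strip().lower() == "date":
            return parsedate_to_datetime(value.strip())
    return None


response = [
    "HTTP/1.1 200 OK",
    "Date: Mon, 08 Mar 2010 19:03:25 GMT",
    "Server: gws",
]
parsed = parseDateHeader(response)
print(parsed)  # 2010-03-08 19:03:25+00:00
```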

First Try

    from twisted.internet import reactor
    from twisted.internet.protocol import ClientCreator
    from twisted.protocols.basic import LineOnlyReceiver

    class DateReader(LineOnlyReceiver):
        def connectionMade(self):
            self.transport.write('HEAD / HTTP/1.0\r\n\r\n')

        def lineReceived(self, line):
            if line.lower().startswith('date:'):
                ignored, date = line.split(':', 1)
                print 'Server time is', date.strip()

    def main():
        client = ClientCreator(reactor, DateReader)
        client.connectTCP('google.com', 80)
        reactor.run()

connectTCP - create a TCP client - layer on top of the reactor

LineOnlyReceiver makes a come-back

As soon as the "Date:" line is seen, print it out

This code is not re-usable (only useful for printing date)

Separate Protocol From Application

    class DateReader(LineOnlyReceiver):
        def connectionMade(self):
            self.transport.write('HEAD / HTTP/1.0\r\n\r\n')

        def lineReceived(self, line):
            if line.lower().startswith('date:'):
                ignored, date = line.split(':', 1)
                self.dateHeaderReceived(date)

        def dateHeaderReceived(self, date):
            pass

    class DatePrinter(DateReader):
        def dateHeaderReceived(self, date):
            print 'Server time is', date.strip()

Factor application logic into separate class

Protocol part can be re-used elsewhere

Not very convenient to do so, though

Imagine wanting to save dates to a database - can't re-use DatePrinter

For Example, Insert Time Into Database

    class DateSaver(DateReader):
        def __init__(self, cursor, domain):
            self.cursor = cursor
            self.domain = domain

        def dateHeaderReceived(self, date):
            self.cursor.put(self.domain, date)

    def saveDate(cursor, domain):
        client = ClientCreator(
            reactor, DateSaver, cursor, domain)
        client.connectTCP(domain, 80)

    def main():
        cursor = DatabaseCursor(...)
        saveDate(cursor, 'google.com')
        reactor.run()

Extra arguments to ClientCreator - passed to DateSaver.__init__

Somewhat verbose - i.e., requires an entire new class definition.

Mostly needs repeating for each use case - rewrite DateSaver and everything below it

Less Boilerplate?

    class DateHelper(DateReader):
        def __init__(self, dateHeaderReceived):
            self.dateHeaderReceived = dateHeaderReceived

    def fetchDate(domain, callback):
        client = ClientCreator(reactor, DateHelper, callback)
        client.connectTCP(domain, 80)

    def saveDate(cursor, domain):
        def putDate(date):
            cursor.put(domain, date)
        fetchDate(domain, putDate)

    def main():
        cursor = DatabaseCursor(...)
        saveDate(cursor, 'google.com')
        reactor.run()

Change code to pass callback function in to DateHelper

DateHelper and fetchDate are re-usable, given any date-handling function

Have to make sure to pass callback function through all layers of the application

Nested functions hard to test

How do you know when it's done?

e.g., to fetch a second date after the first

How do you know if it fails?

e.g., to retry

Getting Two Dates (i.e., a non-trivial use case)

    def saveDates(cursor, domainA, domainB):
        def putDate(date):
            cursor.put(domainB, date)

        def putDateAndGetAnother(date):
            cursor.put(domainA, date)
            fetchDate(domainB, putDate)

        fetchDate(domainA, putDateAndGetAnother)

    def main():
        cursor = DatabaseCursor(...)
        saveDates(cursor, 'google.com', 'twistedmatrix.com')
        reactor.run()

Get two dates

Hard to pass extra arguments to cursor.put - resort to more nested functions, closures

Some duplicated code, because storing the first date is coupled to the logic that starts the second request

Passing around functions doesn't work well

What about error handling?

Overall, code is awkward and missing features

Error Handling DateHelper

    class DateHelper(DateReader):
        def __init__(self, dateHeaderReceived, dateHeaderError):
            self._onSuccess = dateHeaderReceived
            self._onError = dateHeaderError
            self._succeeded = False

        def dateHeaderReceived(self, date):
            self._succeeded = True
            self._onSuccess(date)

        def connectionLost(self, reason):
            if not self._succeeded:
                self._onError(reason)

    def fetchDate(domain, callback, errback):
        client = ClientCreator(reactor, DateHelper, callback, errback)
        client.connectTCP(domain, 80)

Expand application interface to support second callback

New callback called when there is an error getting the date

Adapted Application Code

    def saveDates(cursor, domainA, domainB):
        def errorA(reason):
            print 'Failed to retrieve date from', domainA

        def errorB(reason):
            print 'Failed to retrieve date from', domainB

        def putDate(date):
            cursor.put(domainB, date)

        def putDateAndGetAnother(date):
            cursor.put(domainA, date)

            fetchDate(domainB, putDate, errorB)

        fetchDate(domainA, putDateAndGetAnother, errorA)

Add error callback to all callers

Error callback has no way to know where error came from

.. so define two, errorA knows it was for domainA, etc

What about ClientCreator.connectTCP? What if DNS fails? What if the server refuses the connection?

Best case, define four error callbacks

Proliferation of callback/error callback arguments in APIs is a mess

Easy to forget to accept them or supply them (esp. if they are optional)

APIs can end up inconsistent, error callbacks forgotten, sometimes optional, etc

Don't Repeat Yourself

class Deferred:
    def __init__(self):
        self.called = False
        self.callbacks = []

    def callback(self, result):
        self.called = True
        self.result = result
        self._runCallbacks()

    def addCallback(self, function, *args):
        self.callbacks.append((function, args))
        if self.called:
            self._runCallbacks()

    def _runCallbacks(self):
        while self.callbacks:
            function, args = self.callbacks.pop(0)
            self.result = function(self.result, *args)

Public interface: callback, addCallback

addCallback adds a function to call when there is a result

callback supplies the result

callback then addCallback, or addCallback then callback: either order gives the same result

*args for passing additional arguments to the callback, beyond the ultimate result

_runCallbacks calls each one, in order, with the current result, which changes

That's the substance of it

No use of the reactor

Nothing to automatically make blocking code non-blocking

Just organization for callbacks
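One way to check the "either order" claim is to run the slide's Deferred both ways. The class below is copied from the slide. The `record` callback and the `log` list are hypothetical helpers, added only so the order of calls is visible.

```python
class Deferred:
    """The minimal Deferred from the slide: a result plus a list of callbacks."""

    def __init__(self):
        self.called = False
        self.callbacks = []

    def callback(self, result):
        self.called = True
        self.result = result
        self._runCallbacks()

    def addCallback(self, function, *args):
        self.callbacks.append((function, args))
        if self.called:
            self._runCallbacks()

    def _runCallbacks(self):
        # Each callback receives the current result; its return value
        # becomes the result seen by the next callback.
        while self.callbacks:
            function, args = self.callbacks.pop(0)
            self.result = function(self.result, *args)


log = []

def record(result, label):
    # Hypothetical callback: note what arrived, then pass on a new result.
    log.append((label, result))
    return result + 1

# Order 1: callbacks first, result arrives later.
d1 = Deferred()
d1.addCallback(record, "first")
d1.addCallback(record, "second")
d1.callback(10)

# Order 2: result first, callbacks added afterwards.
d2 = Deferred()
d2.callback(10)
d2.addCallback(record, "first")
d2.addCallback(record, "second")
```

Both Deferreds end up with the result 12, and both call `record` with 10 and then 11. The extra `label` argument travels through `*args`.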

Deferred Based DateHelper

class DateHelper(DateReader):
    def __init__(self, deferredResult):
        self.deferredResult = deferredResult
        self._succeeded = False

    def dateHeaderReceived(self, date):
        self._succeeded = True
        self.deferredResult.callback(date)

    def connectionLost(self, reason):
        if not self._succeeded:
            self.deferredResult.errback(reason)

def fetchDate(hostname):
    result = Deferred()
    client = ClientCreator(reactor, DateHelper, result)
    client.connectTCP(hostname, 80)
    return result

Very similar to previous version

Differences:

.. Accepts one Deferred instead of two callback functions

.. Calls Deferred callback method on success

.. Calls Deferred errback method on error

All the rest is the same

New fetchDate creates a Deferred, passes it to the protocol, and returns that same Deferred

The protocol keeps a reference to it, so it can invoke the callback or errback method

Calling code also has it ... (next slide)

Using Deferred

def putDate(date, domain, cursor):
    cursor.put(domain, date)

def saveDates(cursor, domainA, domainB):
    result = fetchDate(domainA)
    result.addCallback(putDate, domainA, cursor)

    def saveNextDate(ignored):
        result = fetchDate(domainB)
        result.addCallback(putDate, domainB, cursor)

    result.addCallback(saveNextDate)

def main():
    cursor = DatabaseCursor(...)
    saveDates(cursor, 'google.com', 'twistedmatrix.com')
    reactor.run()

Less duplicate code - putDate used twice

Code that gets the 2nd date is no longer mixed with the code that saves the 1st date

How about error handling?

Handling Errors - Python Function Calls

try:
    result = f()
except SomeException, e:
    pass  # Handle exception
else:
    pass  # Use result

Python function (or any expression) signals results in two different ways

evaluates to a value or ...

raises an exception

Deferreds are similar

Handling Errors - Deferreds

dateDeferred = fetchDate('google.com')

def useDate(date):
    pass  # Use result

def requestFailed(reason):
    pass  # Handle exception

dateDeferred.addCallback(useDate)
dateDeferred.addErrback(requestFailed)

As mentioned earlier, ad-hoc callback schemes don't handle errors

Deferred does - errbacks

If DateHelper calls the errback method, the requestFailed errback will be called

Failures

from twisted.python.failure import Failure

try:
    1 / 0
except:
    f = Failure()

f.check(ZeroDivisionError) == ZeroDivisionError
f.check(RuntimeError) == None

f.trap(ZeroDivisionError) == ZeroDivisionError
f.trap(RuntimeError)         # Raises ZeroDivisionError

print f.getTraceback()
print f.getErrorMessage()

print f.value.message

Not talking about them much today

A Failure instance is what gets passed to every errback

Represents exception and traceback

Convenience to avoid having to pass around sys.exc_info()-style tuples everywhere

Needed to carry information from point of exception to point of handling

Unlike in synchronous programs, there is no call stack connecting those two locations
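The core idea can be sketched without Twisted. Create an object inside the `except` block that grabs `sys.exc_info()`, then hand that object to whatever code handles the error later. `MiniFailure` is a hypothetical toy, not Twisted's `Failure`. It only mimics the `check`, `trap` and `getErrorMessage` behaviour from the Failures slide.

```python
import sys

class MiniFailure:
    """Hypothetical toy stand-in for twisted.python.failure.Failure."""

    def __init__(self):
        # Must be created inside an except block, like Failure().
        self.type, self.value, self.tb = sys.exc_info()

    def check(self, *errorTypes):
        # Return the first matching type, or None.
        for errorType in errorTypes:
            if issubclass(self.type, errorType):
                return errorType
        return None

    def trap(self, *errorTypes):
        # Like check(), but re-raise the original exception if nothing matches.
        result = self.check(*errorTypes)
        if result is None:
            raise self.value
        return result

    def getErrorMessage(self):
        return str(self.value)


def divide(a, b):
    try:
        return a / b
    except:
        # Capture the error now; some later handler decides what to do with it.
        return MiniFailure()

f = divide(1, 0)
```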

Failure Example

def logAllErrors(reason, domain):
    print 'Problem fetching date from', domain
    print reason.getErrorMessage()

domain = 'google.com'
dateDeferred = fetchDate(domain)
dateDeferred.addErrback(logAllErrors, domain)

Failure reason gives access to what went wrong

Just like the exception variable in an except suite

Simple Implementation - addCallback, addErrback

from twisted.python.failure import Failure

passthrough = (lambda result: result, ())

class Deferred:
    # __init__ and callback as before (errback not shown)

    def addCallback(self, function, *args):
        success = (function, args)
        self.callbacks.append((success, passthrough))
        if self.called:
            self._runCallbacks()

    def addErrback(self, function, *args):
        failure = (function, args)
        self.callbacks.append((passthrough, failure))
        if self.called:
            self._runCallbacks()

    def _runCallbacks(self):
        while self.callbacks:
            item = self.callbacks.pop(0)
            # item is (success, failure): pick the slot matching the result
            if isinstance(self.result, Failure):
                function, args = item[1]
            else:
                function, args = item[0]
            self.result = function(self.result, *args)

Just keep an extra function alongside the callback

At _runCallbacks time, pick the right kind to run

Discard the other completely

Fill in the holes with passthrough, which preserves the result exactly
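Below is a self-contained, runnable version of this two-slot design. It rests on a few assumptions: `FakeFailure` is a hypothetical stand-in for Twisted's `Failure`, `errback` simply stores a failure as the result, and there is no chaining yet. It attaches one callback and one errback to each Deferred. Each one is skipped when the result is of the other kind.

```python
class FakeFailure:
    """Hypothetical stand-in for Twisted's Failure: wraps an exception."""
    def __init__(self, value):
        self.value = value

def _passthrough(result):
    return result

passthrough = (_passthrough, ())

class Deferred:
    def __init__(self):
        self.called = False
        self.callbacks = []

    def callback(self, result):
        self.called = True
        self.result = result
        self._runCallbacks()

    def errback(self, failure):
        # A failure is just a special kind of result.
        self.callback(failure)

    def addCallback(self, function, *args):
        self.callbacks.append(((function, args), passthrough))
        if self.called:
            self._runCallbacks()

    def addErrback(self, function, *args):
        self.callbacks.append((passthrough, (function, args)))
        if self.called:
            self._runCallbacks()

    def _runCallbacks(self):
        while self.callbacks:
            success, failure = self.callbacks.pop(0)
            # Pick one slot based on the current result; discard the other.
            if isinstance(self.result, FakeFailure):
                function, args = failure
            else:
                function, args = success
            self.result = function(self.result, *args)


def addOne(result):
    return result + 1

def recover(failure):
    # Handling the error: returning a normal value switches back to callbacks.
    return "recovered from %s" % (type(failure.value).__name__,)

ok = Deferred()
ok.addCallback(addOne)
ok.addErrback(recover)    # skipped: the result is not a failure
ok.callback(1)

bad = Deferred()
bad.addCallback(addOne)   # skipped: the result is a failure
bad.addErrback(recover)
bad.errback(FakeFailure(ZeroDivisionError()))
```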

Noticing Completion

def saveDates(cursor, domainA, domainB):
    result = fetchDate(domainA)
    result.addCallback(putDate, domainA, cursor)

    def saveNextDate(ignored):
        result = fetchDate(domainB)
        result.addCallback(putDate, domainB, cursor)

    result.addCallback(saveNextDate)

Same saveDates implementation, again, using Deferred

How do we know when both dates have been saved?

Deferred carries two pieces of information

What the result (success or failure) is

When that result becomes available; in other words, when the async operation associated with the Deferred is complete

Need to return the Deferred to the caller, so the caller can attach more callbacks and learn when the operation is complete

Return The Deferred

def saveDates(cursor, domainA, domainB):
    resultA = fetchDate(domainA)
    resultA.addCallback(putDate, domainA, cursor)

    def saveNextDate(ignored):
        resultB = fetchDate(domainB)
        resultB.addCallback(putDate, domainB, cursor)

    resultA.addCallback(saveNextDate)

    return resultA

Easy change

This version has a bug

resultA indicates completion of fetchDate(domainA)

does not indicate completion of fetchDate(domainB)

Need to make resultA wait for resultB

New feature for Deferred

Deferred "Chaining"

def saveDates(cursor, domainA, domainB):
    resultA = fetchDate(domainA)
    resultA.addCallback(putDate, domainA, cursor)

    def saveNextDate(ignored):
        resultB = fetchDate(domainB)
        resultB.addCallback(putDate, domainB, cursor)
        return resultB

    resultA.addCallback(saveNextDate)

    return resultA

Exposed resultA by returning it

Should expose resultB by returning it, too

symmetry is nice

What will this do?

Original _runCallbacks Implementation

def _runCallbacks(self):
    while self.callbacks:
        function, args = self.callbacks.pop(0)
        self.result = function(self.result, *args)

errback code omitted for simplicity

When resultB is returned, it becomes self.result

Next callback will be called with resultB - a Deferred

Bad! Don't want a Deferred, want result of a Deferred

Better _runCallbacks Implementation

class Deferred:
    ...

    def _continue(self, result):
        self.result = result
        self._runCallbacks()

    def _runCallbacks(self):
        while self.callbacks:
            function, args = self.callbacks.pop(0)
            self.result = function(self.result, *args)
            if isinstance(self.result, Deferred):
                self.result.addCallback(self._continue)
                break

_runCallbacks checks for Deferreds

Adds _continue as a callback and temporarily abandons callback handling

Makes chaining of unrelated APIs easier

Simple implementation change

Can be challenging to wrap head around, regardless

Life Story of the saveDates Deferred

def saveDates(cursor, domainA, domainB):
    resultA = fetchDate(domainA)
    resultA.addCallback(putDate, domainA, cursor)

    def saveNextDate(ignored):
        resultB = fetchDate(domainB)
        resultB.addCallback(putDate, domainB, cursor)
        return resultB

    resultA.addCallback(saveNextDate)

    return resultA

def stop(ignored):
    reactor.stop()

def main():
    done = saveDates(cursor, domainA, domainB)
    done.addCallback(stop)
    reactor.run()

resultA.callbacks = [putDate, saveNextDate]

resultA.callbacks = [putDate, saveNextDate, stop]

resultA gets a result; putDate and saveNextDate run, resultA.callbacks = [stop]

resultB.callbacks = [putDate, resultA._continue]

resultB gets a result; putDate runs, then _continue resumes resultA

stop runs
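The same life story can be replayed without a network or a reactor. This sketch uses the chaining version of `_runCallbacks`. `fetchDate`, `FakeCursor` and the `pending` dictionary are hypothetical stand-ins, so the Deferreds can be fired by hand in the order the real responses would arrive. Like the slide's version, this toy does not guard against `addCallback` being called while it is paused; Twisted's real Deferred tracks that state.

```python
class Deferred:
    """Slide's Deferred plus the chaining change (errbacks omitted)."""

    def __init__(self):
        self.called = False
        self.callbacks = []

    def callback(self, result):
        self.called = True
        self.result = result
        self._runCallbacks()

    def addCallback(self, function, *args):
        self.callbacks.append((function, args))
        if self.called:
            self._runCallbacks()

    def _continue(self, result):
        self.result = result
        self._runCallbacks()

    def _runCallbacks(self):
        while self.callbacks:
            function, args = self.callbacks.pop(0)
            self.result = function(self.result, *args)
            if isinstance(self.result, Deferred):
                # Pause until the inner Deferred has a result of its own.
                self.result.addCallback(self._continue)
                break


class FakeCursor:
    """Hypothetical stand-in for the database cursor."""
    def __init__(self):
        self.rows = {}

    def put(self, domain, date):
        self.rows[domain] = date

pending = {}   # hypothetical: domain -> Deferred still waiting for its date
events = []

def fetchDate(domain):
    # Hypothetical stand-in: the Deferred is fired by hand below.
    d = Deferred()
    pending[domain] = d
    return d

def putDate(date, domain, cursor):
    events.append("putDate " + domain)
    cursor.put(domain, date)

def saveDates(cursor, domainA, domainB):
    resultA = fetchDate(domainA)
    resultA.addCallback(putDate, domainA, cursor)

    def saveNextDate(ignored):
        events.append("saveNextDate")
        resultB = fetchDate(domainB)
        resultB.addCallback(putDate, domainB, cursor)
        return resultB

    resultA.addCallback(saveNextDate)
    return resultA

def stop(ignored):
    events.append("stop")

cursor = FakeCursor()
done = saveDates(cursor, "google.com", "twistedmatrix.com")
done.addCallback(stop)
pending["google.com"].callback("date A")         # done pauses; only stop is left
pending["twistedmatrix.com"].callback("date B")  # _continue resumes done
```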

Use Case: Retries

def fetchFailed(reason, hostname, tries):
    if tries > 0:
        return fetchDate(hostname, tries)
    return reason

def fetchDate(hostname, tries=3):
    result = Deferred()
    client = ClientCreator(reactor, DateHelper, result)
    client.connectTCP(hostname, 80)
    result.addErrback(fetchFailed, hostname, tries - 1)
    return result

fetchDate only changes slightly

The fetchFailed errback retries transparently to the caller of fetchDate

Even though it inserts new async operations
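The retry logic can be run as a sketch with a toy Deferred that supports both errbacks and chaining. When an errback returns a Deferred, the outer Deferred waits on it. `fetchOnce` is a hypothetical flaky operation that replaces the ClientCreator/DateHelper code: it fails twice, then succeeds, and delivers its result immediately. Tracing the `tries` values shows that, with the default `tries=3`, `fetchDate` makes at most three attempts in total.

```python
class FakeFailure:
    """Hypothetical stand-in for twisted.python.failure.Failure."""
    def __init__(self, value):
        self.value = value

def passthrough(result, *ignored):
    return result

class Deferred:
    """Toy Deferred: callback/errback pairs plus chaining (not Twisted's code)."""

    def __init__(self):
        self.called = False
        self.callbacks = []

    def callback(self, result):
        self.called = True
        self.result = result
        self._runCallbacks()

    def errback(self, failure):
        self.callback(failure)

    def addCallbacks(self, callback, errback, *args):
        self.callbacks.append((callback, errback, args))
        if self.called:
            self._runCallbacks()

    def addErrback(self, errback, *args):
        self.addCallbacks(passthrough, errback, *args)

    def addBoth(self, function, *args):
        self.addCallbacks(function, function, *args)

    def _continue(self, result):
        self.result = result
        self._runCallbacks()

    def _runCallbacks(self):
        while self.callbacks:
            callback, errback, args = self.callbacks.pop(0)
            if isinstance(self.result, FakeFailure):
                self.result = errback(self.result, *args)
            else:
                self.result = callback(self.result, *args)
            if isinstance(self.result, Deferred):
                # Chaining: wait for the inner Deferred, success or failure.
                self.result.addBoth(self._continue)
                break


attempts = []

def fetchOnce(hostname):
    # Hypothetical flaky operation: fails on the first two attempts.
    attempts.append(hostname)
    d = Deferred()
    if len(attempts) <= 2:
        d.errback(FakeFailure(RuntimeError("Connection refused")))
    else:
        d.callback("Tue, 24 Jan 2012 14:56:12 GMT")
    return d

def fetchFailed(reason, hostname, tries):
    if tries > 0:
        return fetchDate(hostname, tries)   # a new Deferred: chaining kicks in
    return reason                           # out of retries: stay failed

def fetchDate(hostname, tries=3):
    result = fetchOnce(hostname)
    result.addErrback(fetchFailed, hostname, tries - 1)
    return result

d = fetchDate("example.com")
```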

Visual Explanation

Deferred States - Initial

Visual Deferred

Create a new Deferred

No result or callbacks

Deferred States - addCallback

Visual Deferred

addCallback adds a callback and a "blank" or "passthrough" errback

Deferred States - addErrback

Visual Deferred

Subsequent addErrback adds a blank callback and an errback

Building an ordered list

addErrback was called second, so its errback is added second, after the callback

Deferred States - addCallbacks

Visual Deferred

addCallbacks puts a callback and an errback at the same level

Deferred States - addBoth

Visual Deferred

addBoth is a shortcut: the same function is added as both the callback and the errback, at the same level

Deferred States - callback

Visual Deferred
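The diagrams can also be checked in code. This toy Deferred stores one (callback, errback) pair per level and builds `addCallback`, `addErrback` and `addBoth` on top of `addCallbacks`. It is a sketch, not Twisted's implementation, and `FakeFailure` is a hypothetical stand-in for `Failure`. As in Twisted, a callback that returns a failure switches the chain over to the errback side.

```python
class FakeFailure:
    """Hypothetical stand-in for Twisted's Failure."""
    def __init__(self, value):
        self.value = value

def passthrough(result):
    return result

class Deferred:
    def __init__(self):
        self.called = False
        self.callbacks = []   # one (callback, errback) pair per level

    def addCallbacks(self, callback, errback):
        self.callbacks.append((callback, errback))
        if self.called:
            self._runCallbacks()

    def addCallback(self, callback):
        self.addCallbacks(callback, passthrough)

    def addErrback(self, errback):
        self.addCallbacks(passthrough, errback)

    def addBoth(self, function):
        self.addCallbacks(function, function)

    def callback(self, result):
        self.called = True
        self.result = result
        self._runCallbacks()

    def errback(self, failure):
        self.callback(failure)

    def _runCallbacks(self):
        while self.callbacks:
            callback, errback = self.callbacks.pop(0)
            if isinstance(self.result, FakeFailure):
                self.result = errback(self.result)
            else:
                self.result = callback(self.result)


def build(trace):
    d = Deferred()

    def cb1(result):
        trace.append("cb1")
        return result

    def eb2(failure):
        trace.append("eb2")
        return failure                    # not handled: stay on the errback side

    def cb3(result):
        trace.append("cb3")
        return FakeFailure(ValueError())  # switch to the errback side

    def eb3(failure):
        trace.append("eb3")
        return "recovered"                # handled: back to the callback side

    def both4(result):
        trace.append("both4")
        return result

    d.addCallback(cb1)          # level 1: (cb1, passthrough)
    d.addErrback(eb2)           # level 2: (passthrough, eb2)
    d.addCallbacks(cb3, eb3)    # level 3: (cb3, eb3), side by side
    d.addBoth(both4)            # level 4: (both4, both4)
    return d

okTrace = []
ok = build(okTrace)
ok.callback("date")

badTrace = []
bad = build(badTrace)
bad.errback(FakeFailure(RuntimeError("no date")))
```

`eb3` never sees the failure returned by `cb3`. The two sit at the same level, so only one of them runs, and that failure goes on to `both4` instead.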

Deferred / Synchronous Comparisons: Use A Result

  • Synchronous control flow

    result = foo()
    result = result + 1

  • Deferred control flow

    d = foo()
    def addOne(result):
        return result + 1
    d.addCallback(addOne)
    

Deferred / Synchronous Comparisons: Handle An Error

  • Synchronous control flow

    try:
        result = foo()
    except Exception, e:
        print e
        result = None

  • Deferred control flow

    d = foo()
    def squashError(reason):
        print reason.getErrorMessage()
        return None
    d.addErrback(squashError)
    

Deferred / Synchronous Comparisons: Cleanup

  • Synchronous control flow

    try:
        result = foo()
    finally:
        cleanup()

  • Deferred control flow

    d = foo()
    def asyncCleanup(passthrough):
        cleanup()
        return passthrough
    d.addBoth(asyncCleanup)
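To see the three pairs line up, here is a sketch in which one small pipeline does all three steps in both styles. The toy Deferred is an assumption, not Twisted's code. It does copy one important Twisted behaviour: an exception raised inside a callback becomes a failure result for the next errback. `syncInvert`, `asyncInvert` and `FakeFailure` are hypothetical names.

```python
class FakeFailure:
    """Hypothetical stand-in for Twisted's Failure."""
    def __init__(self, value):
        self.value = value

    def getErrorMessage(self):
        return str(self.value)

def passthrough(result):
    return result

class Deferred:
    def __init__(self):
        self.called = False
        self.callbacks = []

    def addCallbacks(self, callback, errback):
        self.callbacks.append((callback, errback))
        if self.called:
            self._runCallbacks()

    def addCallback(self, callback):
        self.addCallbacks(callback, passthrough)

    def addErrback(self, errback):
        self.addCallbacks(passthrough, errback)

    def addBoth(self, function):
        self.addCallbacks(function, function)

    def callback(self, result):
        self.called = True
        self.result = result
        self._runCallbacks()

    def _runCallbacks(self):
        while self.callbacks:
            callback, errback = self.callbacks.pop(0)
            function = errback if isinstance(self.result, FakeFailure) else callback
            try:
                self.result = function(self.result)
            except Exception as e:
                # An exception in a callback becomes a failure result.
                self.result = FakeFailure(e)


def syncInvert(value, log):
    try:
        result = 1.0 / value
    except Exception as e:
        log.append("error: %s" % (e,))
        result = None
    finally:
        log.append("cleanup")
    return result

def asyncInvert(value, log):
    d = Deferred()

    def invert(result):
        return 1.0 / result                 # may raise ZeroDivisionError

    def squashError(reason):
        log.append("error: %s" % (reason.getErrorMessage(),))
        return None

    def asyncCleanup(passthrough):
        log.append("cleanup")
        return passthrough

    d.addCallback(invert)                   # use a result
    d.addErrback(squashError)               # handle an error
    d.addBoth(asyncCleanup)                 # cleanup, like finally
    d.callback(value)                       # the "asynchronous" result arrives
    return d.result
```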
    

Inline Callbacks

Alternate API For Deferreds

from twisted.internet import reactor
from twisted.python.log import err
from twisted.internet.defer import inlineCallbacks

from fetchdate import fetchDate

@inlineCallbacks
def fetchDates(hosts):
    for host in hosts:
        try:
            print 'Fetching date on', host
            date = yield fetchDate(host)
        except:
            err(None, "Failed to fetch date from %s" % (host,))
        else:
            print 'Date on', host, 'is', date
            print

def main():
    d = fetchDates(["google.com", "yahoo.com", "localhost"])
    reactor.run()

Results

Fetching date on google.com
Date on google.com is  Tue, 24 Jan 2012 14:56:12 GMT

Fetching date on yahoo.com
Date on yahoo.com is  Tue, 24 Jan 2012 14:56:12 GMT

Fetching date on localhost
Failed to fetch date from localhost
Traceback (most recent call last):
Failure: twisted.internet.error.ConnectionRefusedError:
    Connection was refused by other side:
        111: Connection refused.
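How can `yield` wait for a Deferred? The following is a rough sketch of the mechanism, built on a toy Deferred. Each time the yielded Deferred fires, the decorator resumes the generator with `send()` on success or `throw()` on failure. `inlineCallbacksSketch`, the hand-fired `pending` dictionary and `FakeFailure` are all hypothetical, and Twisted's real `inlineCallbacks` handles many more cases.

```python
class FakeFailure:
    """Hypothetical stand-in for Twisted's Failure: wraps an exception."""
    def __init__(self, value):
        self.value = value

class Deferred:
    """Toy two-slot Deferred (no chaining), enough to drive the sketch."""

    def __init__(self):
        self.called = False
        self.callbacks = []

    def callback(self, result):
        self.called = True
        self.result = result
        self._runCallbacks()

    def errback(self, failure):
        self.callback(failure)

    def addCallbacks(self, callback, errback):
        self.callbacks.append((callback, errback))
        if self.called:
            self._runCallbacks()

    def _runCallbacks(self):
        while self.callbacks:
            callback, errback = self.callbacks.pop(0)
            if isinstance(self.result, FakeFailure):
                self.result = errback(self.result)
            else:
                self.result = callback(self.result)


def inlineCallbacksSketch(generatorFunction):
    """Toy illustration of the inlineCallbacks idea; not Twisted's implementation."""
    def wrapper(*args, **kwargs):
        generator = generatorFunction(*args, **kwargs)
        finished = Deferred()

        def step(value, isFailure):
            try:
                if isFailure:
                    # Re-raise the error at the suspended "yield".
                    yielded = generator.throw(value.value)
                else:
                    yielded = generator.send(value)
            except StopIteration as e:
                # Python 3 generators may "return x"; that value ends up here.
                finished.callback(getattr(e, "value", None))
                return
            except Exception as e:
                finished.errback(FakeFailure(e))
                return
            if isinstance(yielded, Deferred):
                # This is the addCallback/addErrback that "yield" hides.
                yielded.addCallbacks(lambda result: step(result, False),
                                     lambda failure: step(failure, True))
            else:
                step(yielded, False)   # non-Deferred values are sent straight back

        step(None, False)
        return finished
    return wrapper


pending = {}
report = []

def fetchDate(host):
    # Hypothetical: the Deferred is fired by hand to simulate the network.
    d = Deferred()
    pending[host] = d
    return d

@inlineCallbacksSketch
def fetchDates(hosts):
    for host in hosts:
        try:
            date = yield fetchDate(host)
        except Exception as e:
            report.append("Failed to fetch date from %s: %s" % (host, e))
        else:
            report.append("Date on %s is %s" % (host, date))

done = fetchDates(["google.com", "localhost"])
pending["google.com"].callback("Tue, 24 Jan 2012 14:56:12 GMT")
pending["localhost"].errback(FakeFailure(RuntimeError("Connection refused")))
```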

Yield Any Deferred

@inlineCallbacks
def fetchDates(hosts):
    for host in hosts:
        d = fetchDate(host)
        try:
            print 'Fetching date on', host
            date = yield d
        except:
            err(None, "Failed to fetch date from %s" % (host,))
        else:
            print 'Date on', host, 'is', date
            print

def main():
    d = fetchDates(["google.com", "yahoo.com", "localhost"])
    reactor.run()

Really, Any Deferred

@inlineCallbacks
def fetchDates(hosts):
    print 'Fetching dates from', hosts
    print

    fetches = [fetchDate(host) for host in hosts]

    dates = yield DeferredList(fetches, consumeErrors=True)

    for host, (success, date) in zip(hosts, dates):
        if success:
            print 'Date on', host, 'is', date
        else:
            print "Failed to fetch date from %s" % (host,)
            date.printTraceback()
        print

def main():
    d = fetchDates(["google.com", "yahoo.com", "localhost"])
    reactor.run()

Results, Concurrent

Fetching dates from ['google.com', 'yahoo.com', 'localhost']
Date on google.com is  Tue, 24 Jan 2012 15:17:28 GMT

Date on yahoo.com is  Tue, 24 Jan 2012 15:17:28 GMT

Failed to fetch date from localhost
Traceback (most recent call last):
Failure: twisted.internet.error.ConnectionRefusedError:
    Connection was refused by other side:
        111: Connection refused.
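The idea behind `DeferredList` can be sketched in a few lines. Attach a callback and an errback to every Deferred, count how many are still outstanding, and fire a new Deferred once the count reaches zero. `gatherAll` is a hypothetical toy, not Twisted's implementation. It collects `(success, result)` pairs in input order, the same shape the loop over `dates` unpacks.

```python
class FakeFailure:
    """Hypothetical stand-in for Twisted's Failure."""
    def __init__(self, value):
        self.value = value

class Deferred:
    """Toy two-slot Deferred, just enough for the sketch."""

    def __init__(self):
        self.called = False
        self.callbacks = []

    def callback(self, result):
        self.called = True
        self.result = result
        self._runCallbacks()

    def errback(self, failure):
        self.callback(failure)

    def addCallbacks(self, callback, errback):
        self.callbacks.append((callback, errback))
        if self.called:
            self._runCallbacks()

    def _runCallbacks(self):
        while self.callbacks:
            callback, errback = self.callbacks.pop(0)
            if isinstance(self.result, FakeFailure):
                self.result = errback(self.result)
            else:
                self.result = callback(self.result)


def gatherAll(deferreds):
    """Toy DeferredList(..., consumeErrors=True): fire with [(success, result), ...]."""
    results = [None] * len(deferreds)
    remaining = [len(deferreds)]   # a list so the nested function can update it
    done = Deferred()

    def record(result, index, success):
        results[index] = (success, result)   # input order, not arrival order
        remaining[0] -= 1
        if remaining[0] == 0:
            done.callback(results)
        if not success:
            return None    # like consumeErrors=True: the error stops here
        return result

    if not deferreds:
        done.callback(results)
    for index, d in enumerate(deferreds):
        d.addCallbacks(lambda result, i=index: record(result, i, True),
                       lambda failure, i=index: record(failure, i, False))
    return done


a, b, c = Deferred(), Deferred(), Deferred()
everything = gatherAll([a, b, c])
c.errback(FakeFailure(RuntimeError("Connection refused")))
a.callback("Tue, 24 Jan 2012 15:17:28 GMT")
calledBeforeB = everything.called    # b is still outstanding here
b.callback("Tue, 24 Jan 2012 15:17:28 GMT")
```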

Limitations

Obscures Context Switching (inlineCallbacks)

@inlineCallbacks
def fetchDates(hosts):
    for host in hosts:
        try:
            print 'Fetching date on', host
            date = yield fetchDate(host)
        except:
            err(None, "Failed to fetch date from %s" % (host,))
        else:
            print 'Date on', host, 'is', date
            print

Obscures Context Switching (addCallback)

def fetchDates(hosts):
    host = hosts.pop(0)
    d = fetchDate(host)
    d.addCallback(reportDate, host)
    d.addErrback(err, "Failed to fetch date from %s" % (host,))
    if hosts:
        d.addCallback(lambda ignored: fetchDates(hosts))
    return d

def reportDate(date, host):
    print 'Date on', host, 'is', date
    print

Debugger Integration

Very little of it.

(Pdb) next
> inlinecb_2.py(15)fetchDates()
-> print 'Fetching date on', host
(Pdb) next
Fetching date on google.com
> inlinecb_2.py(16)fetchDates()
-> date = yield d
(Pdb) break 20
Breakpoint 1 at inlinecb_2.py:20
(Pdb) c
> inlinecb_2.py(20)fetchDates()
-> print 'Date on', host, 'is', date
(Pdb) p date
' Tue, 24 Jan 2012 15:39:37 GMT'
(Pdb)

Useless Stack

(Pdb) bt
  inlinecb_2.py(29)<module>()
  inlinecb_2.py(26)main()
  twisted/internet/base.py(1162)run()
  twisted/internet/base.py(1174)mainLoop()
  twisted/internet/selectreactor.py(140)doSelect()
  twisted/python/log.py(84)callWithLogger()
  twisted/python/log.py(69)callWithContext()
  twisted/python/context.py(118)callWithContext()
  twisted/python/context.py(81)callWithContext()
  twisted/internet/selectreactor.py(146)_doReadOrWrite()
  twisted/internet/tcp.py(460)doRead()
  twisted/protocols/basic.py(455)dataReceived()
  web_date_12.py(46)lineReceived()
  twisted/internet/defer.py(361)callback()
  twisted/internet/defer.py(455)_startRunCallbacks()
  twisted/internet/defer.py(542)_runCallbacks()
  twisted/internet/defer.py(1076)gotResult()
  twisted/internet/defer.py(1020)_inlineCallbacks()
> inlinecb_2.py(20)fetchDates()
-> print 'Date on', host, 'is', date
(Pdb)

Other Generalizations

  • Complicated implementation (high bugs/line)
  • Bumps into Python runtime limitations (frames and tracebacks)
  • Some performance overhead
  • Requires Python 2.5 (maybe nobody cares anymore)

Specific Gotchas

Tracebacks (1)

Error / errback ordering

def a():
    1 / 0

@inlineCallbacks
def f(d):
    yield d

# Version 1: errback first, error second
def error():
    d = Deferred()
    f(d).addErrback(err)
    try:
        a()
    except:
        d.errback()

# Version 2: error first, errback second
def error():
    d = Deferred()
    try:
        a()
    except:
        d.errback()
    f(d).addErrback(err)

Errback first, error second

Traceback (most recent call last):
  File "twisted/internet/defer.py", line 388, in errback
    self._startRunCallbacks(fail)
  File "twisted/internet/defer.py", line 455, in _startRunCallbacks
    self._runCallbacks()
  File "twisted/internet/defer.py", line 542, in _runCallbacks
    current.result = callback(current.result, *args, **kw)
  File "twisted/internet/defer.py", line 1076, in gotResult
    _inlineCallbacks(r, g, deferred)
--- <exception caught here> ---
  File "twisted/internet/defer.py", line 1018, in _inlineCallbacks
    result = result.throwExceptionIntoGenerator(g)
  File "twisted/python/failure.py", line 350, in throwExceptionIntoGenerator
    return g.throw(self.type, self.value, self.tb)
  File "inlinecb_3.py", line 9, in f
    yield d
  File "inlinecb_3.py", line 15, in error
    a()
  File "inlinecb_3.py", line 5, in a
    1 / 0
exceptions.ZeroDivisionError:
    integer division or modulo by zero

Error first, errback second

Unhandled Error
Traceback (most recent call last):
  File "inlinecb_3.py", line 37, in <module>
    error()
--- <exception caught here> ---
  File "inlinecb_3.py", line 24, in error
    a()
  File "inlinecb_3.py", line 5, in a
    1 / 0
exceptions.ZeroDivisionError:
    integer division or modulo by zero

Either way, with an ancient version of Twisted (2.5.0)

Traceback (most recent call last):
  File "inlinecb_3.py", line 35, in <module>
    error()
  File "inlinecb_3.py", line 32, in error
    f(d).addErrback(log.err)
  File "twisted-2.5.0/twisted/internet/defer.py", line 813, in unwindGenerator
    return _inlineCallbacks(None, f(*args, **kwargs), Deferred())
--- <exception caught here> ---
  File "twisted-2.5.0/twisted/internet/defer.py", line 724, in _inlineCallbacks
    result = g.throw(result.type, result.value, result.tb)
  File "inlinecb_3.py", line 9, in f
    yield d
exceptions.ZeroDivisionError:
    integer division or modulo by zero

Misused returnValue

Simple inlineCallbacks usage

def asyncOperation():
    return succeed(1)

@inlineCallbacks
def thunk():
    value = yield asyncOperation()
    if value:
        returnValue(value)

@inlineCallbacks
def main():
    print "Result:", (yield thunk())

Output:

Result: 1

Then some refactoring

def asyncOperation():
    return succeed(1)

def thunk2(value):
    if value:
        returnValue(value) # !!!

@inlineCallbacks
def thunk():
    value = yield asyncOperation()
    returnValue((yield thunk2(value)) + 1)

@inlineCallbacks
def main():
    print "Result:", (yield thunk())

Output (the intended result was 2):

Result: 1

Enable Warnings

$ python -W::DeprecationWarning: inlinecb_4b.py
inline-callbacks/inlinecb_4b.py:8: DeprecationWarning:
    returnValue() in 'thunk2' causing 'thunk' to exit:
    returnValue should only be invoked by functions decorated
    with inlineCallbacks
  returnValue(value)
Result: 1

Still wrong! Pretty good pointer to where the bug is, though.
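The mechanism can be reproduced in plain Python. The sketch assumes that `returnValue` works by raising a special exception that the driver catches. `_Return`, `returnValue` and the synchronous `run` driver are hypothetical stand-ins for Twisted's machinery. The exception raised in `thunk2` travels up through `thunk`'s frame, so `thunk` exits early with 1 instead of 2.

```python
class _Return(Exception):
    """Hypothetical stand-in for the exception returnValue() raises."""
    def __init__(self, value):
        Exception.__init__(self, value)
        self.value = value

def returnValue(value):
    raise _Return(value)

def run(generatorFunction):
    """Toy synchronous driver: sends each yielded value straight back."""
    generator = generatorFunction()
    value = None
    while True:
        try:
            value = generator.send(value)
        except _Return as r:
            return r.value
        except StopIteration:
            return None

def thunk2(value):
    if value:
        returnValue(value)   # raises inside whichever generator called thunk2

def thunk():
    value = yield 1          # pretend asyncOperation() produced 1
    returnValue((yield thunk2(value)) + 1)

def thunk2Fixed(value):
    if value:
        return value         # a plain return; only thunk uses returnValue

def thunkFixed():
    value = yield 1
    returnValue((yield thunk2Fixed(value)) + 1)
```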

Captured returnValue

def asyncOperation():
    return succeed(1)

@inlineCallbacks
def thunk():
    try:
        value = yield asyncOperation()
        if value:
            returnValue(value)
    except:
        log.err(None, "Some asyncOperation error")

@inlineCallbacks
def main():
    print "Result:", (yield thunk())

Prevents Correct Return

Some asyncOperation error
Traceback (most recent call last):
  File "twisted/internet/defer.py", line 1020, in _inlineCallbacks
    result = g.send(result)
  File "inlinecb_4c.py", line 18, in main
    print "Result:", (yield thunk())
  File "twisted/internet/defer.py", line 1141, in unwindGenerator
    return _inlineCallbacks(None, f(*args, **kwargs), Deferred())
  File "twisted/internet/defer.py", line 1020, in _inlineCallbacks
    result = g.send(result)
--- <exception caught here> ---
  File "inlinecb_4c.py", line 12, in thunk
    returnValue(value)
  File "twisted/internet/defer.py", line 997, in returnValue
    raise _DefGen_Return(val)
twisted.internet.defer._DefGen_Return:
Result: None
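A toy model explains this one too: a bare `except:` catches every exception, including the one `returnValue` raises. The sketch assumes, as a simplification, that `returnValue` raises a special exception caught by the driver. `_Return`, `returnValue` and `run` are hypothetical stand-ins, not Twisted's code. Moving `returnValue` into an `else:` clause keeps it outside the `try`.

```python
class _Return(Exception):
    """Hypothetical stand-in for the exception returnValue() raises."""
    def __init__(self, value):
        Exception.__init__(self, value)
        self.value = value

def returnValue(value):
    raise _Return(value)

def run(generatorFunction):
    """Toy synchronous driver: sends each yielded value straight back."""
    generator = generatorFunction()
    value = None
    while True:
        try:
            value = generator.send(value)
        except _Return as r:
            return r.value
        except StopIteration:
            return None

errors = []

def thunk():
    try:
        value = yield 1              # pretend asyncOperation() produced 1
        if value:
            returnValue(value)       # swallowed by the bare except below
    except:
        errors.append("Some asyncOperation error")

def thunkFixed():
    try:
        value = yield 1
    except:
        errors.append("Some asyncOperation error")
    else:
        if value:
            returnValue(value)       # outside the try: reaches the driver
```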

Tracebacks (2)

try/finally

@inlineCallbacks
def f():
    try:
        yield g()
    finally:
        yield succeed(1)

@inlineCallbacks
def g():
    # Force it to still be a generator
    if False:
        yield

    # Make it fail
    1 / 0

def error():
    f().addErrback(err)

Truncated traceback

Traceback (most recent call last):
  File "inlinecb_5.py", line 24, in <module>
    error()
  File "inlinecb_5.py", line 21, in error
    f().addErrback(log.err)
  File "twisted/internet/defer.py", line 1187, in unwindGenerator
    return _inlineCallbacks(None, gen, Deferred())
--- <exception caught here> ---
  File "twisted/internet/defer.py", line 1045, in _inlineCallbacks
    result = g.send(result)
  File "inlinecb_5.py", line 7, in f
    yield g()
exceptions.ZeroDivisionError:
    integer division or modulo by zero

The last frame, where the error actually happened (the 1 / 0 in g), is lost.

Tracebacks (3)

traceback.print_exc()

import traceback

def a():
    1 / 0

@inlineCallbacks
def b():
    try:
        result = yield maybeDeferred(a)
    except:
        traceback.print_exc()

Missing Bottom Frames

Traceback (most recent call last):
  File "inlinecb_6a.py", line 11, in b
    result = yield defer.maybeDeferred(a)
ZeroDivisionError: integer division or modulo by zero

Failure.printTraceback

from twisted.python.failure import Failure

def a():
    1 / 0

@inlineCallbacks
def b():
    try:
        result = yield maybeDeferred(a)
    except:
        f = Failure()
        f.printTraceback()

More Information (Some Useful)

Traceback (most recent call last):
  File "inlinecb_6b.py", line 17, in <module>
    b()
  File "twisted/internet/defer.py", line 1187, in unwindGenerator
    return _inlineCallbacks(None, gen, Deferred())
  File "twisted/internet/defer.py", line 1045, in _inlineCallbacks
    result = g.send(result)
  File "inlinecb_6b.py", line 12, in b
    result = yield defer.maybeDeferred(a)
--- <exception caught here> ---
  File "twisted/internet/defer.py", line 134, in maybeDeferred
    result = f(*args, **kw)
  File "inlinecb_6b.py", line 7, in a
    1 / 0
exceptions.ZeroDivisionError:
    integer division or modulo by zero

Questions?

Corotwine

Coroutines And Twisted

  • Coroutine-based interface to some Twisted APIs
  • Including Deferreds
  • https://launchpad.net/corotwine

Echo Server, Corotwine Style

from corotwine.protocol import gListenTCP

def echo(transport):
    while 1:
        try:
            transport.write(transport.read())
        except ConnectionClosed:
            return

gListenTCP(12345, echo)

reactor.run()

Looks even more like blocking code than inlineCallbacks

Context switches are even better hidden

They're still there, though

Supports concurrent clients, single threaded

Uses third-party greenlet module

Corotwine Deferred Support

from previous_example import fetchDate

from corotwine.defer import blockOn, deferredGreenlet

def putDate(date, domain, cursor):
    cursor.put(domain, date)

@deferredGreenlet
def saveDates(cursor, domainA, domainB):
    putDate(blockOn(fetchDate(domainA)), domainA, cursor)
    putDate(blockOn(fetchDate(domainB)), domainB, cursor)

blockOn suspends execution of the coroutine until the Deferred fires

Corotwine deliberately makes each context switch explicit, via blockOn

Questions?

Conclusions

Event-Driven Programming

  • Simpler than multithreading
  • More performant than single-tasking
  • Just a different way to organize program flow

No pre-emptive context switching

Critical sections are easily recognized, i.e. a function body

Locks are rarely necessary

Event-Driven Programming

  • Simpler than multithreading
  • More performant than single-tasking and multi-threading
  • Just a different way to organize program flow

Allows concurrent use of physical resources, cpu, net, etc

Can wait for many operations to complete simultaneously, not just one at a time

Avoids platform-level thread switching overhead; uses less memory than threads

Nothing magical

Just functions that get called to handle some input

Event Loop / Main Loop / Reactor

  • Just a loop

Just a loop

Get inputs, handle them, maybe generate outputs
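Here is a loop of that kind in miniature. `ToyLoop` is a hypothetical sketch: a queue of events and a table of handlers, where handlers may post new events (outputs). Unlike a real reactor, it exits as soon as the queue is empty instead of waiting for more input.

```python
from collections import deque

class ToyLoop:
    """Hypothetical minimal event loop: just a loop over queued events."""

    def __init__(self):
        self.queue = deque()
        self.handlers = {}

    def on(self, kind, handler):
        self.handlers[kind] = handler

    def post(self, kind, data=None):
        # Inputs (network reads, timers, ...) would arrive here.
        self.queue.append((kind, data))

    def run(self):
        while self.queue:
            kind, data = self.queue.popleft()   # get an input
            self.handlers[kind](data)           # handle it; may post outputs


loop = ToyLoop()
seen = []

def lineReceived(line):
    seen.append(line)
    loop.post("output", line.upper())           # maybe generate an output

def output(text):
    seen.append("sent " + text)

loop.on("line", lineReceived)
loop.on("output", output)
for line in ["hello", "world"]:
    loop.post("line", line)
loop.run()
```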

Deferreds

  • Barely more than a list of functions to call
  • Uniform callback interface
  • Not magic

Simple implementation - almost fits on one slide

Just iterates over the list calling functions from it

Underlying event source can be anything

Caller just gets a Deferred and uses it normally

Event source can change, application code unaffected

Standardizes success/failure handling for all APIs

Just a list of functions and a for loop

Not directly supported by the reactor

Inline Callbacks

  • Alternative API for working with Deferreds
  • Usage makes code look more traditional (tradition is overrated)
  • Actual execution is still event-driven

Still operating on Deferreds

Just hides addCallback and addErrback behind "yield"

inlineCallbacks decorator affects code inside decorated function

Code outside decorated function still just sees a Deferred

Code looks like blocking code, with extra yield keywords here and there

Making non-blocking code look blocking might sound good

Fundamentally, obscures behavior, hurts readability

Corotwine, Stackless

  • Another alternate API for working with Deferreds
  • Also alternate API for operating on connections
  • Like inlineCallbacks, but more so

All inlineCallbacks comments still apply, but more so.

Questions?