Opened 10 years ago

Closed 8 years ago

#1042 enhancement closed (fixed)

Add blockingCallFromThread to twisted.internet.threads

Reported by: spiv
Owned by:
Priority: highest
Milestone:
Component: core
Keywords:
Cc: exarkun, spiv, jknight, therve
Branch:
Author:
Launchpad Bug:

Description


Change History (14)

comment:1 Changed 10 years ago by spiv

I recently found a use for this at work; it's general enough that it really
should live in the core.

Implementation here:
http://twistedmatrix.com/pipermail/twisted-python/2003-September/005725.html

It should have tests and documentation added, of course.

comment:2 Changed 10 years ago by exarkun

cf. sandbox/exarkun/threadwrapper.py

comment:3 Changed 10 years ago by spiv

There's a minor bug in your ThreadWrapper: ThreadWrapper(foo).bar.baz will fail
if bar happens to be callable.

It's also a little bit too magical for my tastes; it works nicely in the case
that attribute access is fast and only function calls need special treatment,
but in this crazy modern world of properties and the like this won't always
work, and "mostly, except for these things" solutions worry me.

So I like the idea, but I think the simpler and more predictable
blockingCallFromThread should be available even if we add ThreadWrapper.  And
blockingCallFromThread should definitely borrow ThreadWrapper's Deferred handling.

comment:4 Changed 10 years ago by spiv

Here's a version of blockingCallFromThread that:
  - actually calls the func (oops!)
  - accepts *args and **kwargs.

import threading, sys
from twisted.internet import reactor

def blockingCallFromThread(func, *args, **kwargs):
    e = threading.Event()
    l = []
    def wrapped_func():
        try:
            l.append(func(*args, **kwargs))
        except:
            l.append(sys.exc_info())
            l.append(0)
        else:
            l.append(1)
        e.set()
    reactor.callFromThread(wrapped_func)
    e.wait()
    result, ok = l
    if ok:
        return result
    else:
        # Whee!  Cross-thread exceptions!
        raise result[0], result[1], result[2]
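
For readers on Python 3, where the three-argument raise above is no longer
valid syntax, the same Event-based idea can be sketched roughly as follows.
MiniLoop is a hypothetical stand-in for the reactor thread so the example runs
without Twisted; it is not part of Twisted's API, and
blocking_call_from_thread is an illustrative name, not the function that was
eventually merged.

```python
import queue
import threading


class MiniLoop:
    """Hypothetical stand-in for the reactor: runs queued callables on one thread."""

    def __init__(self):
        self._calls = queue.Queue()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        while True:
            func = self._calls.get()
            if func is None:  # sentinel posted by stop()
                break
            func()

    def callFromThread(self, func):
        # Named after reactor.callFromThread; only queues the call.
        self._calls.put(func)

    def stop(self):
        self._calls.put(None)
        self._thread.join()


def blocking_call_from_thread(loop, func, *args, **kwargs):
    """Run func on the loop thread and block the caller until it finishes."""
    done = threading.Event()
    outcome = []

    def wrapped():
        try:
            outcome.append((True, func(*args, **kwargs)))
        except BaseException as exc:  # mirrors the bare except in the original
            outcome.append((False, exc))
        finally:
            done.set()

    loop.callFromThread(wrapped)
    done.wait()
    ok, value = outcome[0]
    if ok:
        return value
    # In Python 3 the traceback travels with the exception object.
    raise value
```

As with the original, calling this from the loop thread itself would deadlock,
because the queued call can never run while that thread is blocked in wait().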

comment:5 Changed 10 years ago by jknight

I (re-re-)implemented this for twisted.web2.wsgi.

import Queue
from twisted.python import failure
from twisted.internet import defer

def callInReactor(__f, *__a, **__kw):
    from twisted.internet import reactor
    queue = Queue.Queue()
    reactor.callFromThread(__callFromThread, queue, __f, __a, __kw)
    result = queue.get()
    if isinstance(result, failure.Failure):
        result.raiseException()
    return result

def __callFromThread(queue, f, a, kw):
    result = defer.maybeDeferred(f, *a, **kw)
    result.addBoth(queue.put)

comment:6 Changed 9 years ago by spiv

It looks like Chandler has something similar, too:
http://svn.osafoundation.org/zanshin/trunk/src/zanshin/util.py

comment:7 Changed 9 years ago by spiv

Updated version of my blockingCallFromThread that's shorter and handles Deferreds:

import threading, sys
from twisted.internet import defer, reactor
from twisted.python.failure import Failure

def blockingCallFromThread(func, *args, **kwargs):
    e = threading.Event()
    l = []
    def _got_result(result):
        l.append(result)
        e.set()
        return None
    def wrapped_func():
        d = defer.maybeDeferred(func, *args, **kwargs)
        d.addBoth(_got_result)
    reactor.callFromThread(wrapped_func)
    e.wait()
    result = l[0]
    if isinstance(result, Failure):
        # Whee!  Cross-thread exceptions!
        result.raiseException()
    else:
        return result

comment:8 Changed 8 years ago by therve

  • Cc therve added
  • Keywords review added
  • Owner spiv deleted
  • Priority changed from normal to highest

Ready for review in the blockingcallfromthread-1042 branch.

comment:9 Changed 8 years ago by exarkun

  • Owner set to exarkun
  • Status changed from new to assigned

comment:10 Changed 8 years ago by exarkun

  • Keywords review removed
  • Owner changed from exarkun to therve
  • Status changed from assigned to new
  • test_blockingCallFromThread docstring could use some help. Try to describe the behavior of blockingCallFromThread that the test method is actually exercising and verifying.
  • test_blockingCallFromThread's cb1 shouldn't wait with a timeout of 1. A loaded machine may be incapable of getting through the test this quickly. Since wait can't be interrupted except by a timeout, the timeout should be used, but it should be trial's default timeout value. It'd be best if this could be done in some way like deferToThread(waiter.wait, self.perMethodTimeout), but I don't know if the value is available via any public attribute.
  • In test_blockingCallFromThread, reactorFunc might as well return defer.succeed("foo"). This will deterministically exercise the already-fired Deferred codepath through blockingCallFromThread, rather than jumping between that case and the not-already-fired case depending on how timing goes. It'd be good to also have a test which deterministically exercises the not-already-fired case.
  • Same comment for test_errorBlockingCallFromThread, wrt already-fired vs not-already-fired.
  • blockingCallFromThread's docstring should document its parameters, return value, and raised exceptions. It's probably also worth using some more explicit language than "synchronously" to explain that it will not return until a result is available.
  • In the example in the howto, how about doing something that's really async? E.g., getPage().
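
The already-fired and not-already-fired cases mentioned in the review can be
made deterministic by controlling when the result is delivered relative to the
loop thread's queue. A rough sketch using only the standard library follows.
Here concurrent.futures.Future stands in for Deferred and a single-worker
ThreadPoolExecutor stands in for the reactor thread; both substitutions, and
every function name below, are assumptions made for illustration and are not
the tests that were actually written for this ticket.

```python
import queue
from concurrent.futures import Future, ThreadPoolExecutor

# Single worker thread standing in for the reactor thread (an assumption so
# the sketch runs without Twisted); submit() plays the role of callFromThread.
loop = ThreadPoolExecutor(max_workers=1)


def blocking_call(func, *args, **kwargs):
    """Call func on the loop thread and block until its Future has a result."""
    results = queue.Queue()

    def in_loop():
        try:
            future = func(*args, **kwargs)
        except Exception as exc:
            # Turn a synchronous failure into a failed Future, loosely
            # analogous to what maybeDeferred does for Deferreds.
            future = Future()
            future.set_exception(exc)
        # The callback runs immediately if the Future is already done,
        # otherwise when it completes; both paths end in the same queue.
        future.add_done_callback(results.put)

    loop.submit(in_loop)
    return results.get().result()  # re-raises the stored exception, if any


def already_fired():
    future = Future()
    future.set_result("foo")  # done before any callback is attached
    return future


def not_yet_fired():
    future = Future()
    # Queued behind the in_loop() call that is currently running, so it can
    # only execute after the done-callback has been attached.
    loop.submit(future.set_result, "bar")
    return future


def already_failed():
    future = Future()
    future.set_exception(ValueError("boom"))
    return future
```

Because the stand-in loop has a single worker and processes submissions in
order, not_yet_fired() never races with the callback registration, which is
the property the review asks the real tests to have.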

comment:11 Changed 8 years ago by therve

  • Keywords review added
  • Owner therve deleted

Applied review comments, thanks!

comment:12 Changed 8 years ago by exarkun

  • Keywords review removed
  • Owner set to therve

I tweaked the howto a little bit. The rest looks good. Merge if you're happy with the howto changes.

comment:13 Changed 8 years ago by therve

  • Resolution set to fixed
  • Status changed from new to closed

(In [20509]) Merge blockingcallfromthread-1042

Author: therve
Reviewer: exarkun
Fixes #1042

Add a new blockingCallFromThread function to twisted.internet.threads, allowing
a thread to call an asynchronous reactor function synchronously. Add a bit of
documentation to explain it, and replace the existing method used in web2.wsgi.

comment:14 Changed 4 years ago by <automation>

  • Owner therve deleted