Opened 12 years ago

Closed 10 years ago

#1042 enhancement closed fixed (fixed)

Add blockingCallFromThread to twisted.internet.threads

Reported by: spiv Owned by:
Priority: highest Milestone:
Component: core Keywords:
Cc: Jean-Paul Calderone, spiv, jknight, therve Branch:
Author:

Description


Change History (14)

comment:1 Changed 12 years ago by spiv

I recently found a use for this at work, it's general enough it really should
live in the core.

Implementation here:
http://twistedmatrix.com/pipermail/twisted-python/2003-September/005725.html

It should have tests and documentation added, of course.

comment:2 Changed 12 years ago by Jean-Paul Calderone

cf sandbox/exarkun/threadwrapper.py

comment:3 Changed 12 years ago by spiv

There's a minor bug in your ThreadWrapper: ThreadWrapper(foo).bar.baz will fail
if bar happens to be callable.

It's also a little bit too magical for my tastes; it works nicely in the case
that attribute access is fast and only function calls need to special treatment,
but in this crazy modern world of properties and the like this won't always
work, and "mostly, except for these things" solutions worry me.

So I like the idea, but I think the simpler and more predictable
blockingCallFromThread should be available even if we add ThreadWrapper.  And
blockingCallFromThread should definitely borrow ThreadWrapper's Deferred handling.

comment:4 Changed 12 years ago by spiv

Here's a version of blockingCallFromThread that:
  - actually calls the func (oops!)
  - accepts *args and **kwargs.

import threading, sys
from twisted.internet import reactor

def blockingCallFromThread(func, *args, **kwargs):
    e = threading.Event()
    l = []
    def wrapped_func():
        try:
            l.append(func(*args, **kwargs))
        except:
            l.append(sys.exc_info())
            l.append(0)
        else:
            l.append(1)
        e.set()
    reactor.callFromThread(wrapped_func)
    e.wait()
    result, ok = l
    if ok:
        return result
    else:
        # Whee!  Cross-thread exceptions!
        raise result[0], result[1], result[2]

comment:5 Changed 12 years ago by jknight

I (re-re-)implemented this for twisted.web2.wsgi.

import Queue
from twisted.python import failure
from twisted.internet import defer

def callInReactor(__f, *__a, **__kw):
    from twisted.internet import reactor
    queue = Queue.Queue()
    reactor.callFromThread(__callFromThread, queue, __f, __a, __kw)
    result = queue.get()
    if isinstance(result, failure.Failure):
        result.raiseException()
    return result

def __callFromThread(queue, f, a, kw):
    result = defer.maybeDeferred(f, *a, **kw)
    result.addBoth(queue.put)

comment:6 Changed 12 years ago by spiv

It looks like Chandler has something similar, too:
http://svn.osafoundation.org/zanshin/trunk/src/zanshin/util.py

comment:7 Changed 12 years ago by spiv

Updated version of my blockingCallFromThread that's shorter and handles Deferreds:

import threading, sys
from twisted.internet import defer, reactor
from twisted.python.failure import Failure

def blockingCallFromThread(func, *args, **kwargs):
    e = threading.Event()
    l = []
    def _got_result(result):
        l.append(result)
        e.set()
        return None
    def wrapped_func():
        d = defer.maybeDeferred(func, *args, **kwargs)
        d.addBoth(_got_result)
    reactor.callFromThread(wrapped_func)
    e.wait()
    result = l[0]
    if isinstance(result, Failure):
        # Whee!  Cross-thread exceptions!
        result.raiseException()
    else:
        return result

comment:8 Changed 10 years ago by therve

Cc: therve added
Keywords: review added
Owner: spiv deleted
Priority: normalhighest

Ready to review in blockingcallfromthread-1042.

comment:9 Changed 10 years ago by Jean-Paul Calderone

Owner: set to Jean-Paul Calderone
Status: newassigned

comment:10 Changed 10 years ago by Jean-Paul Calderone

Keywords: review removed
Owner: changed from Jean-Paul Calderone to therve
Status: assignednew
  • test_blockingCallFromThread docstring could use some help. Try to describe the behavior of blockingCallFromThread that the test method is actually exercising and verifying.
  • test_blockingCallFromThread's cb1 shouldn't wait with a timeout of 1. A loaded machine may be incapable of getting through the test this quickly. Since wait can't be interrupted except by a timeout, the timeout should be used, but it should be trial's default timeout value. It'd be best if this could be done in some way like deferToThread(waiter.wait, self.perMethodTimeout, but I don't know if the value is available via any public attribute.
  • in test_blockingCallFromThread, reactorFunc might as well return defer.succeed("foo"). This will deterministically exercise the already-fired Deferred codepath through blockingCallFrom thread, rather than jumping between that case and the not-already-fired case depending on how timing goes. It'd be good to also have a test which deterministically exercises the not-already-fired case, too.
  • Same comment for test_errorBlockingCallFromThread, wrt already-fired vs not-already-fired.
  • blockingCallFromThread's docstring should document its parameters, return value, and raised exceptions. It's probably also worth using some more explicit language than "synchronously" to explain that it will not return until a result is available.
  • In the example in the howto, how about doing something that's really async? eg, getPage()

comment:11 Changed 10 years ago by therve

Keywords: review added
Owner: therve deleted

Applied review comments, thanks!

comment:12 Changed 10 years ago by Jean-Paul Calderone

Keywords: review removed
Owner: set to therve

I tweaked the howto a little bit. The rest looks good. Merge if you're happy with the howto changes.

comment:13 Changed 10 years ago by therve

Resolution: fixed
Status: newclosed

(In [20509]) Merge blockingcallfromthread-1042

Author: therve Reviewer: exarkun Fixes #1042

Add a new blockingCallFromThread function to twisted.internet.threads, allowing a thread to call an asynchronous reactor function synchronously. Add a bit of documentation to explain it, and replace the existing method used in web2.wsgi.

comment:14 Changed 6 years ago by <automation>

Owner: therve deleted
Note: See TracTickets for help on using tickets.