[Twisted-Python] More Questions about Custom Reactors

Phil Mayers p.mayers at imperial.ac.uk
Sun Jan 21 19:32:16 EST 2007


Andrew Francis wrote:
> Hello Colleagues :
> 
> I am trying to write a reactor for Stackless Python. I
> posted previously here and in the Stackless mailing
> list but I am still stuck.  My problems are in the
> following areas :

Perhaps you could explain what you're actually trying to achieve, rather 
than the solution you have decided on? And indeed, why you want to use 
stackless at all - Twisted is predicated around the assumption of a 
stackful i.e. single-threaded Python.

Since you're using stackless, I assume you want to use the 
microthreading capabilities to make async code look synchronous?

Alternatively you might be wanting long-running jobs to be interleaved 
with other code transparently but without using real threads?

If you're proposing using queues to communicate between a Twisted thread 
running the reactor and a stackless thread running your micro-threads, I 
don't see why you need to modify Twisted at all. You can just run the 
stackless microthreads from one "real" thread (started via 
callInThread), pass jobs from Twisted to that "real" thread (and from 
there, probably, to a stackless uthread), and pass the replies back to 
the reactor thread with callFromThread.

So, can you not just do this?:

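import collections
import random

from twisted.internet import defer, reactor

# deque lives in collections, not threading; append() and popleft()
# are safe to call from different threads
stacklessJobs = collections.deque()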
stacklessReplies = {}

def queueToStackless(job):
     """returns a deferred which is fired with the job result"""
     while True:
         # generate an unused ID (randint() needs bounds, so use
         # 32 random bits instead)
         i = random.getrandbits(32)
         if i not in stacklessReplies:
             d = stacklessReplies[i] = defer.Deferred()
             break
     stacklessJobs.append((job, i))
     return d

def replyFromStackless(idx, result):
     """Called in the reactor thread by the stackless thread"""
     d = stacklessReplies.pop(idx)
     d.callback(result)

def workfunc(job, idx):
     """Called via some kind of stackless uthread scheduler"""

     # do some stuff
     for thing in job:
         pass
     result = 'foo'

     # this is the ONLY THREAD SAFE reactor call...
     reactor.callFromThread(replyFromStackless, idx, result)

def emptyQueue(sched):
     """a stackless uthread, runs forever, empties the job pool"""
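     while True:
         # replace with a stackless equivalent?
         try:
             # popleft() takes the oldest job first (FIFO)
             job, i = stacklessJobs.popleft()
         except IndexError:
             # nothing queued yet; a real uthread should yield to
             # the scheduler here rather than spin
             continue
         sched.add(workfunc, job, i)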

def stacklessWork():
     """The stackless scheduler, runs in the 2nd thread"""
     sched = stacklessUthreadScheduler()
     sched.add(emptyQueue, sched)
     while True:
         sched.doThing()

# On startup, begin the stackless scheduler
reactor.callInThread(stacklessWork)
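
The stacklessUthreadScheduler above is a made-up name; I don't know what 
your real scheduler looks like. As a rough sketch of the add()/doThing() 
interface I'm assuming, here is a stand-in built on plain Python 
generators rather than real Stackless tasklets. GeneratorScheduler, 
worker and demo are all hypothetical names, for illustration only:

```python
from collections import deque


class GeneratorScheduler:
    """Hypothetical stand-in for stacklessUthreadScheduler.

    Each "uthread" is a generator function; every yield is the point
    where it hands control back to the scheduler.
    """

    def __init__(self):
        self.runnable = deque()

    def add(self, func, *args):
        # Calling a generator function only creates the generator;
        # nothing runs until doThing() resumes it.
        self.runnable.append(func(*args))

    def doThing(self):
        """Run one uthread up to its next yield; return False when idle."""
        if not self.runnable:
            return False
        task = self.runnable.popleft()
        try:
            next(task)
        except StopIteration:
            return True  # finished, so do not requeue it
        self.runnable.append(task)
        return True


def worker(name, steps, log):
    """A toy uthread: records each step, then yields to the others."""
    for n in range(steps):
        log.append((name, n))
        yield


def demo():
    """Run two workers round-robin and return the order they ran in."""
    log = []
    sched = GeneratorScheduler()
    sched.add(worker, "a", 2, log)
    sched.add(worker, "b", 2, log)
    while sched.doThing():
        pass
    return log
```

With something like this, stacklessWork() would keep calling doThing() 
in its loop, and emptyQueue() would need a yield wherever it finds the 
job queue empty so the other uthreads get a turn.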



> 
> 1. How does the custom reactor drive Twisted?
> 2. How does self.interleave() work?
> 3. How to use deferreds if Twisted is running in a
> separate thread. Is callFromThread() used?
> 4. Some custom reactor implementations inherit and
> some reactors simply install say
> threadedselectorreactor? Which technique is
> recommended?
>  
> For now, I am operating under the assumption that
> Stackless Python and Twisted ought to be executed in
> separate threads. This is because if Twisted blocks
> then Stackless blocks. Since I am taking this
> approach, I am assuming I will have to use
> threadedselectorreactor.
> 
> I want my Stackless Python application to do two
> things :
> 
> 1. Process HTTP based SOAP server requests
> 2. From the Stackless thread, make client.getPage()
> calls and use a deferred to process the result.
> 
> To promote decoupling, I want Twisted and Stackless to
> communicate via queues. So my code looks as follows -
> 
> 
> def processResponse(self, httpServer):
>      reply = self.myQueue.get()
>      httpServer.write(reply.toxml().encode("utf-8"))  
>             
>      httpServer.finish()
>      return
> 
> example 2.
> 
> postRequest = client.getPage(self.address, method='POST',
>                              headers=self.headers, postdata=self.body)
> postRequest.addCallback(self.__handleResponse__).addErrback(self.__handleError__)
>     
> 
> def __handleResponse__(self, pageData):
>          self.queue.put(pageData)
> 
> I have looked at the reactors included in
> twisted.internet. I also looked at blockingdemo.py. I
> find blockingdemo.py confusing (i.e., where is poll()
> called? It does not seem to do anything.)
> 
> Question about interleave():
> 
> """
> Taken from threadedselectorreactor  
> 
> In order for Twisted to do its work in the main thread
> (the thread that interleave is called from), a waker
> function is necessary.  The waker function will be
> called from a "background" thread with one argument:
> func. The waker function's purpose is to call func()
> from the main thread. Many GUI toolkits ship with
> appropriate waker functions.
> """
> 
> Using wxReactor as a basis, if the following is my
> custom Stackless reactor's run method
> 
> def run(self):
>       self.startRunning(...)
>       self.interleave(some_waker_function)
>       self.stacklessApplication.Loop()    
> 
> 1. How does this drive Twisted (in wxReactor there
> seems to be a second loop that sleeps every
> millisecond)?
> 2. What should the "some_waker_function" be doing? Is
> the interleave() method always necessary?
> 3. What is the background thread? Twisted? A
> workerThread in threadedselectorreactor?
> 4. To use client.getPage() and callbacks from
> Stackless to a Twisted reactor running in a thread,
> must I use callFromThread? Will the Stackless thread
> block?
> 
> Once again, any advice would be appreciated. I will
> happily summarise on the Stackless Wiki. Sorry for
> the long message.
> 
> Thank you,
> Andrew 