[Twisted-Python] Running CPU bound function concurrently in twisted

Chengi Liu chengi.liu.86 at gmail.com
Mon Jun 24 17:26:33 MDT 2019


Thanks Moshe & Meejah & Gelin for the suggestions and advice. This is super
helpful.

I think I am able to move forward with this.
Let me just summarize.
My use case is: fetch the data, and then assemble the data. Fetching
data is network bound; assembling data is CPU bound.

Can you guys confirm if what I am doing makes sense?

import treq
from twisted.internet import defer, reactor, threads

def compute_heavy_function(x):
    # CPU-bound work; run in the reactor's thread pool via deferToThread
    return x*x

@defer.inlineCallbacks
def network_get(x):
    # treq is non-blocking and returns Deferreds, so no thread is needed
    response = yield treq.get('http://localhost:8081')
    content = yield response.content()
    defer.returnValue(content)

@defer.inlineCallbacks
def twisted_do_your_magic():
    nets, cpus = [], []
    for i in range(10):
        t = defer.ensureDeferred(network_get(i))
        nets.append(t)
        d = threads.deferToThread(compute_heavy_function, i)
        cpus.append(d)

    cpu_res = yield defer.gatherResults(cpus)
    network_res = yield defer.gatherResults(nets)
    defer.returnValue({'cpu': cpu_res, 'network': network_res})

if __name__ == '__main__':
    twisted_do_your_magic()
    reactor.callLater(2, reactor.stop)
    reactor.run()


I ran it locally and it seems to be running fine. But I just want to make sure
that I got the concept of what to deferToThread & what to "ensureDeferred".
From SO, I got the impression that network-based IO benefits from
`deferToThread`, but from the video tutorial I got the impression that
ensureDeferred followed by gatherResults is the right way to go?



Moshe, one last question.
I was trying to follow the tutorial in the video lecture,
but I wasn't able to make it run on Python 3.


Say I have an async function:

import json

import treq
from twisted.internet import defer, reactor

async def foo():
    resp = await treq.get("http://localhost:1234/foo")
    content = await resp.content()
    return json.loads(content.decode("utf-8"))

async def func():
    d1 = defer.ensureDeferred(foo())
    d2 = defer.ensureDeferred(foo())
    res = await defer.gatherResults([d1, d2])
    return res

if __name__ == '__main__':
    x = func()
    reactor.callLater(2, reactor.stop)
    reactor.run()

In this case, I get an error (from x = func() in the main code block):
 RuntimeWarning: coroutine 'func' was never awaited
How do I fix this?

Again, thanks for all the help, support and advice in getting me started
with twisted.



On Mon, Jun 24, 2019 at 3:49 PM meejah <meejah at meejah.ca> wrote:

>
> As a clarification to the above, parallelization of Python code across
> cores is not unique to Twisted; all Python code has this same
> limitation.
>
> To use multiple cores with Python code, you need multiple Python
> processes (as has been pointed out). One way to achieve this is to have
> the multiple processes talking to each other (using some kind of RPC
> protocol).
>
> Another way is to simply spawn some number of subprocesses (and Twisted
> has good support for running subprocesses). So, for example, if you
> write a CLI tool that can be told to run "part of your problem" then
> your parent Twisted process can simply spawn some number of those with
> appropriate arguments to split up the problem (e.g. give each process 1
> / num_cores of the problem). This will incur some startup penalty as
> each process starts up (especially if you're using PyPy, which you
> should be if you care about speed) but is way simpler.
>
> Obviously, an RPC-style communication system avoids the startup penalty
> (but can be more complex).
>
> --
> meejah