[Twisted-Python] possible error in twisted app
Jonathan Vanasco
twisted-python at 2xlp.com
Fri Jan 17 19:22:52 MST 2014
a portion of my twisted app is having some problems. i think i figured out the issue, but if i'm right, i'll be a bit lost.
this portion of the app is essentially a web scraper. it grabs a batch of X urls from a data broker, and then updates a database with data about each URL (which comes from an oEmbed endpoint, a third-party data provider, or scraping the page if needed)
there's a lot of code that would be messy to follow, so i'll just explain it as best as possible, and provide some highlights.
the underlying logic is basically this:
- the reactor starts an UpdateLinksService, which checks for new batches every 30 seconds
- the UpdateLinksService has an internal marker to check whether it's still processing the last batch, or whether it's safe to process a new one
- to process the urls, the UpdateLinksService runs them in a request wrapper, which is supposed to be run through a defer.DeferredSemaphore() service
- when i'm done with the batch, i clear out the internal marker via a `deferred_list_finish` method.
looking at some aggressive debugging output, it looks like my work to process a url is happening /after/ i call deferred_list_finish.
in other words, i've somehow structured this so that i'm instantly finished.
i *thought* i was running out of memory because i had some phantom deferreds running around. now i'm starting to think that i'm just stacking the queue faster than i work on it.
i've tried changing things around and using different return values, but then started getting "exceptions.AssertionError:" because of "assert not isinstance(result, Deferred)" (twisted/internet/defer.py, line 381, in callback)
the following is a rough composite of what is going on. if anyone sees an obvious fix, i'd be greatly appreciative.
thanks!
=================
from twisted.internet import defer, threads

class UpdateLinksService():
    def process_urls(self, urls):
        # one Deferred per url, collected so we know when the batch is done
        updates = []
        for url in urls:
            # dbPool is the database pool, defined elsewhere in the app
            wrapper = RequestWrapper( self.semaphoreService, dbPool )
            d = wrapper.queue_url(url)
            updates.append(d)
        self.d_list = defer.DeferredList( updates )\
            .addCallback( self.deferred_list_finish )

class RequestWrapper():
    def __init__(self, semaphore_service, dbPool):
        self.semaphoreService = semaphore_service
        self.dbPool = dbPool

    def queue_url( self, url ):
        self.url = url
        # semaphoreService is the defer.DeferredSemaphore()
        d = self.semaphoreService.run( self._to_thread )
        return d

    def _to_thread( self ):
        d = threads.deferToThread( self._thread_begin )
        return d

    def _thread_begin(self):
        # runs inside the thread started by deferToThread
        worker = UrlWorker()
        d = self.dbPool.runInteraction( worker.process_url , self.url )

class UrlWorker():
    def process_url(self, txn, url):
        # blocking stuff
        return True  # or False
The reason why I have _to_thread + _thread_begin as 2 functions, and UrlWorker separate, is for code re-use.
The RequestWrapper functions are mostly all in a base class; i just subclass RequestWrapper and override _thread_begin and an error callback (not shown)
UrlWorker's various methods are used throughout my twisted daemon.