[Twisted-Python] possible error in twisted app

Jonathan Vanasco twisted-python at 2xlp.com
Fri Jan 17 19:22:52 MST 2014


A portion of my twisted app is having some problems. I think I figured out the issue -- but if I'm right, I'll be a bit lost.

This portion of the app is essentially a web scraper. It grabs a batch of X URLs from a data broker, then updates a database with data about each URL (which comes from an oEmbed endpoint, a third-party data provider, or scraping the page if needed).

There's a lot of code that would be messy to follow, so I'll explain it as best I can and provide some highlights.

The underlying logic is basically this (a rough sketch follows the list):

	the reactor starts an UpdateLinksService, which checks for new batches every 30 seconds
	the UpdateLinksService has an internal marker to track whether it's still processing the last batch or whether it's safe to start a new one

	to process the URLs, the UpdateLinksService runs them through a request wrapper, which is supposed to be gated by a defer.DeferredSemaphore()

	when the batch is done, I clear the internal marker via a `deferred_list_finish` method.
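
To make that loop concrete, here's a stripped-down sketch of how it's wired up. This is simplified; get_new_batch and process_one are placeholders for the data-broker call and the RequestWrapper work, not my real code, and process_urls mirrors the composite further down:

from twisted.internet import defer, task


class UpdateLinksService(object):

    def __init__(self, concurrency=5):
        self.semaphoreService = defer.DeferredSemaphore(concurrency)
        self.processing = False                     # the internal marker
        self.loop = task.LoopingCall(self.check_for_batch)

    def start(self):
        self.loop.start(30.0)                       # poll every 30 seconds

    def check_for_batch(self):
        if self.processing:
            return                                  # last batch still in flight
        urls = self.get_new_batch()                 # placeholder for the data-broker call
        if not urls:
            return
        self.processing = True
        self.process_urls(urls)

    def process_urls(self, urls):
        deferreds = [self.semaphoreService.run(self.process_one, url) for url in urls]
        self.d_list = defer.DeferredList(deferreds)
        self.d_list.addCallback(self.deferred_list_finish)

    def process_one(self, url):
        pass                                        # real code hands the url to a RequestWrapper

    def deferred_list_finish(self, results):
        self.processing = False                     # safe to pick up the next batch

    def get_new_batch(self):
        return []                                   # placeholder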

Looking at some aggressive debugging output, it looks like my work to process a URL is happening /after/ I call deferred_list_finish.

In other words, I've somehow structured this so that I'm instantly finished.

I *thought* I was running out of memory because I had some phantom Deferreds running around. Now I'm starting to think I'm just stacking the queue faster than I can work through it.

I've tried changing things around and using different return values, but then I started getting "exceptions.AssertionError" from "assert not isinstance(result, Deferred)" (twisted/internet/defer.py, line 381, in callback).
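
If I'm reading it right, that's the assertion you hit when a Deferred gets fired with another Deferred as its result, e.g.:

from twisted.internet import defer

outer = defer.Deferred()
inner = defer.Deferred()

# Deferred.callback() refuses a Deferred as a result; this line raises
# exceptions.AssertionError from "assert not isinstance(result, Deferred)"
outer.callback(inner)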

The following is a rough composite of what is going on. If anyone sees an obvious fix, I'd greatly appreciate it.

Thanks!

=================

from twisted.internet import defer, threads


class UpdateLinksService():

    def process_urls(self, urls):
        updates = []
        for url in urls:
            # one semaphore-gated deferred per url
            wrapper = RequestWrapper(self.semaphoreService, self.dbPool)
            d = wrapper.queue_url(url)
            updates.append(d)
        self.d_list = defer.DeferredList(updates)
        self.d_list.addCallback(self.deferred_list_finish)


class RequestWrapper():
    # base class; subclasses override _thread_begin and an error callback

    def __init__(self, semaphore_service, dbPool):
        self.semaphoreService = semaphore_service
        self.dbPool = dbPool

    def queue_url(self, url):
        self.url = url
        d = self.semaphoreService.run(self._to_thread)
        return d

    def _to_thread(self):
        d = threads.deferToThread(self._thread_begin)
        return d

    def _thread_begin(self):
        worker = UrlWorker()
        d = self.dbPool.runInteraction(worker.process_url, self.url)


class UrlWorker():

    def process_url(self, txn, url):
        # blocking stuff
        return True  # or False, depending on the blocking work
		
		
The reason I have _to_thread + _thread_begin as two functions, and UrlWorker as a separate class, is code re-use.

The RequestWrapper methods mostly live in a base class; I just subclass RequestWrapper and override _thread_begin and an error callback (not shown).

UrlWorker's various methods are used throughout my twisted daemon.
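
For context, a subclass looks roughly like this. OembedRequestWrapper and _on_error are made-up names just to show the shape of the override; the real subclasses are messier:

class OembedRequestWrapper(RequestWrapper):
    # only the thread entry point and the error callback are overridden;
    # queue_url / _to_thread come from the base class above

    def _thread_begin(self):
        worker = UrlWorker()
        d = self.dbPool.runInteraction(worker.process_url, self.url)

    def _on_error(self, failure):
        # subclass-specific error handling (the "error callback" mentioned above)
        return failure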

