[Twisted-Python] deferred releasing too many tokens
Jonathan Vanasco
twisted-python at 2xlp.com
Wed Sep 26 14:07:15 MDT 2012
I'm in the process of rewriting a web spider (originally in twisted circa 2005) , and keep running into an issue with deferreds releasing too many tokens.
i've been playing around with this all day, and can't seem to shake this problem.
i'm guessing that i designed the application logic wrong.
Does the following code raise any warning signs for people ?
The general setup is this:
AnalyzeLink -
- run-of-the-mill class that performs actual db operations and link fetching
- doesn't really rely on twisted, aside from being coded to the specs of runInteraction ( accepts an adbapi txn, raises for a rollback, is generally happy for a commit )
AnalyzeLinksService-
- relies on twisted
- queries the database for a batch of items to update
- each actionable item is wrapped into an '_AnalyzeLinksRequestWrapper' instance, all of which are tossed into a defer.DeferredList()
AnalyzeLinksRequestWrapper-
- relies on twisted
- pushes actual work into callbacks via threads.deferToThread
- uses a defer.DeferredSemaphore provided by AnalyzeLinksService to acquire locks
some cleaned-up code is below :
------------------
class AnalyzeLink(object):
def get_update_batch(self,txn):
# returns list of ids/data/etc to process
def action_for_data(self,txn,data):
# processes an entry
class _AnalyzeLinksRequestWrapper(RequestWrapper):
dbConnectionPool = None
semaphoreService = None
semaphoreLock = None
def __init__( self , semaphoreService = None , dbConnectionPool = None ):
self.dbConnectionPool= dbConnectionPool
self.semaphoreService= semaphoreService
def queue_thread( self , data=None ):
self.queued_data= data
d = self.semaphoreService.acquire()\
.addCallback( self._T_to_thread )
return d
def _T_to_thread( self , deferredSemaphore ):
self.semaphoreLock= deferredSemaphore
t = threads.deferToThread( self._T_thread_begin )\
.addErrback( self._T_errors )\
.addCallback( self._T_thread_end )
def _T_thread_begin( self ):
log.debug("_AnalyzeLinksRequestWrapper._T_thread_begin" )
updater = AnalyzeLink()
self.dbConnectionPool.runInteraction( updater.action_for_data , self.queued_data )\
.addCallback( self._T_thread_end )\
.addErrback( self._T_errors )
def _T_thread_end( self , rval=None ):
self.semaphoreLock.release()
def _T_errors( self , x ):
self._T_thread_end()
raise x
class _AnalyzeLinksService(ServiceScaffold):
SEMAPHORE_TOKENS = 25
def __init__( self ):
self.semaphoreService= defer.DeferredSemaphore( tokens=self.SEMAPHORE_TOKENS )
def action( self ):
updater= AnalyzeLink()
database.get_dbPool().runInteraction( updater.get_update_batch , queued_updates )\
.addCallback( self._action_2 )\
.addErrback( self._action_error )
def _action_2( self , queued_updates ):
if len( queued_updates ):
updates= []
for item in queued_updates:
requestWrapper= _AnalyzeLinksRequestWrapper(\
semaphoreService = self.semaphoreService ,
dbConnectionPool = database.get_dbPool()
)
result= requestWrapper.queue_thread( data=item )
updates.append(result)
finished= defer.DeferredList( updates )\
.addCallback( self.deferred_list_finish )
else:
d= defer.Deferred()
self.deferred_list_finish( d )
def _action_error( self , raised ):
log.debug("%s._action_error" % self.__class__.__name__ )
self.set_processing_status( False )
if isinstance( raised.value , database.DbRollback ):
print "DB Rollback"
raise raised
elif isinstance( raised.value , database.DbRollbackOk ):
print "DB Rollback ok"
else:
raise raised
AnalyzeLinksService= _AnalyzeLinksService()
class AnalyzeLinksService_Service(internet.TimerService):
def __init__( self , dbConfigHash=None ):
internet.TimerService.__init__( self,
CHECK_PERIOD__IMPORT ,
AnalyzeLinksService.action
)
More information about the Twisted-Python
mailing list