[Twisted-Python] deferred releasing too many tokens
Stephen Thorne
stephen at thorne.id.au
Wed Sep 26 17:09:22 EDT 2012
This is a better way of using DeferredSemaphore:
def queue_thread( self , data=None ):
    """Process *data* in the database while holding one semaphore token.

    DeferredSemaphore.run() acquires a token, calls the given function,
    and releases the token exactly once when the function's Deferred
    fires, whether it succeeded or failed.  Therefore nothing else may
    call release() on this semaphore for this item.  The manual releases
    in _T_thread_end/_T_errors must go away, or the token is still
    released twice.

    runInteraction already executes the interaction in the connection
    pool's worker threads and returns a Deferred.  So no deferToThread
    layer is needed, and Twisted APIs (including runInteraction) are
    then called from the reactor thread as they should be.

    Returns the Deferred from run().  It fires with the interaction's
    result (or failure) only after the work has finished, so a
    DeferredList over these Deferreds really waits for every item.
    """
    # Kept for any code that still reads the item off the wrapper.
    self.queued_data = data
    updater = AnalyzeLink()
    # Pass the callable plus its arguments; run() invokes
    # runInteraction(updater.action_for_data, data) once a token is free.
    return self.semaphoreService.run(
        self.dbConnectionPool.runInteraction, updater.action_for_data, data )
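DeferredSemaphore.run() handles acquisition and release for you. It acquires a token, calls your function, and releases the token exactly once when the Deferred your function returns has fired, whether it succeeded or failed. This avoids any code path that might result in a double release, provided that nothing else calls release() manually. Your function must also return a Deferred that fires only when its work is actually finished (e.g. the Deferred from runInteraction).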
On Wed, Sep 26, 2012 at 1:07 PM, Jonathan Vanasco
<twisted-python at 2xlp.com> wrote:
> I'm in the process of rewriting a web spider (originally written in Twisted circa 2005), and I keep running into an issue with deferreds releasing too many tokens.
>
> I've been playing around with this all day, and can't seem to shake this problem.
>
> I'm guessing that I designed the application logic wrong.
>
> Does the following code raise any warning signs for people ?
>
> The general setup is this:
>
> AnalyzeLink -
> - run-of-the-mill class that performs actual db operations and link fetching
> - doesn't really rely on twisted, aside from being coded to the specs of runInteraction ( accepts an adbapi txn, raises for a rollback, is generally happy for a commit )
>
> AnalyzeLinksService-
> - relies on twisted
> - queries the database for a batch of items to update
> - each actionable item is wrapped into an '_AnalyzeLinksRequestWrapper' instance, all of which are tossed into a defer.DeferredList()
>
> AnalyzeLinksRequestWrapper-
> - relies on twisted
> - pushes actual work into callbacks via threads.deferToThread
> - uses a defer.DeferredSemaphore provided by AnalyzeLinksService to acquire locks
>
>
> some cleaned-up code is below :
>
> ------------------
>
> # Plain (non-Twisted) worker class. Each method takes an adbapi
> # transaction as its first argument, so it can be passed directly to
> # ConnectionPool.runInteraction.
> # NOTE(review): the method bodies (and all indentation) were elided in
> # the posted code.
> class AnalyzeLink(object):
> def get_update_batch(self,txn):
> # returns list of ids/data/etc to process
>
> def action_for_data(self,txn,data):
> # processes an entry
>
>
> # Handles one queued item. It takes a token from the shared
> # DeferredSemaphore, runs the DB work for the item, then releases the token.
> # NOTE(review): as posted, each item releases its token TWICE (see the
> # "release path" notes on _T_to_thread and _T_thread_begin). That is what
> # over-releases the semaphore. DeferredSemaphore.run() around a single
> # runInteraction call needs no manual release() at all.
> class _AnalyzeLinksRequestWrapper(RequestWrapper):
> dbConnectionPool = None
> semaphoreService = None
> semaphoreLock = None
>
> def __init__( self , semaphoreService = None , dbConnectionPool = None ):
> self.dbConnectionPool= dbConnectionPool
> self.semaphoreService= semaphoreService
>
> # Stores the item and schedules _T_to_thread once a token is acquired.
> def queue_thread( self , data=None ):
> self.queued_data= data
> d = self.semaphoreService.acquire()\
> .addCallback( self._T_to_thread )
> return d
>
> # Called once the token is acquired; the acquire() Deferred fires with
> # the semaphore itself, which is stored for the later release().
> # NOTE(review): `t` is not returned, so the Deferred from queue_thread()
> # fires as soon as the token is acquired, not when the work is done.
> # The DeferredList built in _action_2 therefore does not wait for the work.
> # Release path #1: when _T_thread_begin returns (or raises), this chain
> # calls _T_thread_end (or _T_errors), which releases the token.
> def _T_to_thread( self , deferredSemaphore ):
> self.semaphoreLock= deferredSemaphore
> t = threads.deferToThread( self._T_thread_begin )\
> .addErrback( self._T_errors )\
> .addCallback( self._T_thread_end )
>
> # Runs in a worker thread (via deferToThread above).
> # NOTE(review): runInteraction and addCallback are called from that worker
> # thread. Twisted APIs should be called from the reactor thread, and
> # runInteraction already runs the interaction in the pool's own threads,
> # so the deferToThread layer is unnecessary.
> # Release path #2: this chain calls _T_thread_end (or _T_errors, which
> # also calls it) when the interaction finishes. That is a second release
> # of the same token.
> def _T_thread_begin( self ):
> log.debug("_AnalyzeLinksRequestWrapper._T_thread_begin" )
> updater = AnalyzeLink()
> self.dbConnectionPool.runInteraction( updater.action_for_data , self.queued_data )\
> .addCallback( self._T_thread_end )\
> .addErrback( self._T_errors )
>
> # Returns the token acquired in queue_thread to the semaphore.
> def _T_thread_end( self , rval=None ):
> self.semaphoreLock.release()
>
> # Errback: releases the token, then re-raises the failure.
> # NOTE(review): the usual Twisted idiom is `return x`, which keeps the
> # Failure propagating down the errback chain.
> def _T_errors( self , x ):
> self._T_thread_end()
> raise x
>
> # Periodic job: fetches a batch of items from the DB and hands each one to
> # an _AnalyzeLinksRequestWrapper. The shared DeferredSemaphore is meant to
> # cap concurrent work at SEMAPHORE_TOKENS items.
> class _AnalyzeLinksService(ServiceScaffold):
> SEMAPHORE_TOKENS = 25
>
> def __init__( self ):
> self.semaphoreService= defer.DeferredSemaphore( tokens=self.SEMAPHORE_TOKENS )
>
> # Entry point called by the TimerService below.
> # NOTE(review): `queued_updates` is not defined in this method, and
> # get_update_batch(self, txn) takes no extra argument. As posted this call
> # looks like it raises NameError/TypeError; probably the argument should
> # simply be dropped.
> # NOTE(review): the Deferred is not returned. Returning it would let the
> # TimerService wait for one run to finish before starting the next.
> def action( self ):
> updater= AnalyzeLink()
> database.get_dbPool().runInteraction( updater.get_update_batch , queued_updates )\
> .addCallback( self._action_2 )\
> .addErrback( self._action_error )
>
> # Creates one wrapper per item and collects the per-item Deferreds in a
> # DeferredList that calls deferred_list_finish when they have all fired.
> # NOTE(review): the empty branch passes a brand-new, never-fired Deferred
> # to deferred_list_finish. The non-empty branch delivers the DeferredList's
> # result list instead. Confirm deferred_list_finish handles both.
> def _action_2( self , queued_updates ):
> if len( queued_updates ):
> updates= []
> for item in queued_updates:
> requestWrapper= _AnalyzeLinksRequestWrapper(\
> semaphoreService = self.semaphoreService ,
> dbConnectionPool = database.get_dbPool()
> )
> result= requestWrapper.queue_thread( data=item )
> updates.append(result)
> finished= defer.DeferredList( updates )\
> .addCallback( self.deferred_list_finish )
> else:
> d= defer.Deferred()
> self.deferred_list_finish( d )
>
> # Errback for action(): clears the processing flag, swallows DbRollbackOk,
> # and re-raises everything else (DbRollback after printing a message).
> # NOTE(review): `print "..."` is Python 2 statement syntax.
> def _action_error( self , raised ):
> log.debug("%s._action_error" % self.__class__.__name__ )
> self.set_processing_status( False )
> if isinstance( raised.value , database.DbRollback ):
> print "DB Rollback"
> raise raised
> elif isinstance( raised.value , database.DbRollbackOk ):
> print "DB Rollback ok"
> else:
> raise raised
>
>
> # Module-level instance; its bound action() method is what the
> # TimerService subclass below schedules.
> AnalyzeLinksService= _AnalyzeLinksService()
>
>
> # TimerService that calls AnalyzeLinksService.action every
> # CHECK_PERIOD__IMPORT seconds (CHECK_PERIOD__IMPORT is defined elsewhere).
> # NOTE(review): dbConfigHash is accepted but never used.
> class AnalyzeLinksService_Service(internet.TimerService):
> def __init__( self , dbConfigHash=None ):
> internet.TimerService.__init__( self,
> CHECK_PERIOD__IMPORT ,
> AnalyzeLinksService.action
> )
>
> _______________________________________________
> Twisted-Python mailing list
> Twisted-Python at twistedmatrix.com
> http://twistedmatrix.com/cgi-bin/mailman/listinfo/twisted-python
More information about the Twisted-Python mailing list