[Twisted-Python] deferred releasing too many tokens

Stephen Thorne stephen at thorne.id.au
Wed Sep 26 17:09:22 EDT 2012


This is a better way of using DeferredSemaphore:

    def queue_thread( self , data=None ):
        self.queued_data= data
        return self.semaphoreService.run( self._T_to_thread )

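run() handles acquisition and release for you: it acquires a token, calls
the function, and releases the token once the Deferred returned by that
function has fired. This avoids any code path that might result in a
double-release. Note that run() calls the function without passing the
semaphore, so _T_to_thread must not expect that argument, must return its
Deferred, and must no longer call release() itself.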


On Wed, Sep 26, 2012 at 1:07 PM, Jonathan Vanasco
<twisted-python at 2xlp.com> wrote:
> I'm in the process of rewriting a web spider (originally written in Twisted circa 2005), and keep running into an issue with deferreds releasing too many tokens.
>
> I've been playing around with this all day, and can't seem to shake this problem.
>
> I'm guessing that I designed the application logic wrong.
>
> Does the following code raise any warning signs for people?
>
> The general setup is this:
>
>         AnalyzeLink -
>                 - run-of-the-mill class that performs actual db operations and link fetching
>                 - doesn't really rely on twisted, aside from being coded to the specs of runInteraction ( accepts an adbapi txn, raises for a rollback, is generally happy for a commit )
>
>         AnalyzeLinksService-
>                 - relies on twisted
>                 - queries the database for a batch of items to update
>                 - each actionable item is wrapped into an '_AnalyzeLinksRequestWrapper' instance, all of which are tossed into a defer.DeferredList()
>
>         AnalyzeLinksRequestWrapper-
>                 - relies on twisted
>                 - pushes actual work into callbacks via threads.deferToThread
>                 - uses a defer.DeferredSemaphore provided by AnalyzeLinksService to acquire locks
>
>
> Some cleaned-up code is below:
>
> ------------------
>
> class AnalyzeLink(object):
>     def get_update_batch(self,txn):
>         # returns list of ids/data/etc to process
>
>     def action_for_data(self,txn,data):
>         # processes an entry
>
>
> class _AnalyzeLinksRequestWrapper(RequestWrapper):
>         dbConnectionPool = None
>     semaphoreService = None
>     semaphoreLock = None
>
>     def __init__( self , semaphoreService = None , dbConnectionPool = None ):
>         self.dbConnectionPool= dbConnectionPool
>         self.semaphoreService= semaphoreService
>
>     def queue_thread( self , data=None ):
>         self.queued_data= data
>         d = self.semaphoreService.acquire()\
>             .addCallback( self._T_to_thread )
>         return d
>
>     def _T_to_thread( self , deferredSemaphore ):
>         self.semaphoreLock= deferredSemaphore
>         t = threads.deferToThread( self._T_thread_begin )\
>             .addErrback( self._T_errors )\
>             .addCallback( self._T_thread_end )
>
>     def _T_thread_begin( self ):
>         log.debug("_AnalyzeLinksRequestWrapper._T_thread_begin" )
>         updater = AnalyzeLink()
>         self.dbConnectionPool.runInteraction( updater.action_for_data , self.queued_data )\
>             .addCallback( self._T_thread_end )\
>             .addErrback( self._T_errors )
>
>     def _T_thread_end( self , rval=None ):
>         self.semaphoreLock.release()
>
>     def _T_errors( self , x ):
>         self._T_thread_end()
>         raise x
>
>
> class _AnalyzeLinksService(ServiceScaffold):
>     SEMAPHORE_TOKENS = 25
>
>     def __init__( self ):
>         self.semaphoreService= defer.DeferredSemaphore( tokens=self.SEMAPHORE_TOKENS )
>
>     def action( self ):
>         updater= AnalyzeLink()
>         database.get_dbPool().runInteraction( updater.get_update_batch , queued_updates )\
>             .addCallback( self._action_2 )\
>             .addErrback( self._action_error )
>
>     def _action_2( self , queued_updates ):
>         if len( queued_updates ):
>             updates= []
>             for item in queued_updates:
>                 requestWrapper= _AnalyzeLinksRequestWrapper(\
>                     semaphoreService = self.semaphoreService ,
>                     dbConnectionPool = database.get_dbPool()
>                 )
>                 result= requestWrapper.queue_thread( data=item )
>                 updates.append(result)
>             finished= defer.DeferredList( updates )\
>                 .addCallback( self.deferred_list_finish )
>         else:
>             d= defer.Deferred()
>             self.deferred_list_finish( d )
>
>     def _action_error( self , raised ):
>         log.debug("%s._action_error" % self.__class__.__name__ )
>         self.set_processing_status( False )
>         if isinstance( raised.value , database.DbRollback ):
>             print "DB Rollback"
>             raise raised
>         elif isinstance( raised.value , database.DbRollbackOk ):
>             print "DB Rollback ok"
>         else:
>             raise raised
>
>
> AnalyzeLinksService= _AnalyzeLinksService()
>
>
> class AnalyzeLinksService_Service(internet.TimerService):
>     def __init__( self , dbConfigHash=None ):
>         internet.TimerService.__init__( self,
>             CHECK_PERIOD__IMPORT ,
>             AnalyzeLinksService.action
>         )
>
> _______________________________________________
> Twisted-Python mailing list
> Twisted-Python at twistedmatrix.com
> http://twistedmatrix.com/cgi-bin/mailman/listinfo/twisted-python



More information about the Twisted-Python mailing list