[Twisted-Python] deferred releasing too many tokens

Jonathan Vanasco twisted-python at 2xlp.com
Wed Sep 26 16:07:15 EDT 2012


I'm in the process of rewriting a web spider (originally written in Twisted circa 2005), and I keep running into an issue with deferreds releasing too many tokens.

I've been playing around with this all day and can't seem to shake the problem.

I'm guessing that I designed the application logic wrong.

Does the following code raise any warning signs for people?

The general setup is this:

	AnalyzeLink -
		- run-of-the-mill class that performs actual db operations and link fetching
		- doesn't really rely on twisted, aside from being coded to the specs of runInteraction (accepts an adbapi txn, raises to trigger a rollback, otherwise returns normally so the transaction commits)

	AnalyzeLinksService -
		- relies on twisted
		- queries the database for a batch of items to update
		- each actionable item is wrapped into an '_AnalyzeLinksRequestWrapper' instance, all of which are tossed into a defer.DeferredList()

	_AnalyzeLinksRequestWrapper -
		- relies on twisted
		- pushes actual work into callbacks via threads.deferToThread 
		- uses a defer.DeferredSemaphore provided by AnalyzeLinksService to acquire locks


Some cleaned-up code is below:

------------------

class AnalyzeLink(object):
    def get_update_batch(self,txn):
        # returns list of ids/data/etc to process
        pass

    def action_for_data(self,txn,data):
        # processes an entry
        pass


class _AnalyzeLinksRequestWrapper(RequestWrapper):
    dbConnectionPool = None
    semaphoreService = None
    semaphoreLock = None

    def __init__( self , semaphoreService = None , dbConnectionPool = None ):
        self.dbConnectionPool= dbConnectionPool
        self.semaphoreService= semaphoreService

    def queue_thread( self , data=None ):
        self.queued_data= data
        d = self.semaphoreService.acquire()\
            .addCallback( self._T_to_thread )
        return d

    def _T_to_thread( self , deferredSemaphore ):
        self.semaphoreLock= deferredSemaphore
        t = threads.deferToThread( self._T_thread_begin )\
            .addErrback( self._T_errors )\
            .addCallback( self._T_thread_end )

    def _T_thread_begin( self ):
        log.debug("_AnalyzeLinksRequestWrapper._T_thread_begin" )
        updater = AnalyzeLink()
        self.dbConnectionPool.runInteraction( updater.action_for_data , self.queued_data )\
            .addCallback( self._T_thread_end )\
            .addErrback( self._T_errors )

    def _T_thread_end( self , rval=None ):
        self.semaphoreLock.release()

    def _T_errors( self , x ):
        self._T_thread_end()
        raise x


class _AnalyzeLinksService(ServiceScaffold):
    SEMAPHORE_TOKENS = 25
    
    def __init__( self ):
        self.semaphoreService= defer.DeferredSemaphore( tokens=self.SEMAPHORE_TOKENS )

    def action( self ):
        updater= AnalyzeLink()
        database.get_dbPool().runInteraction( updater.get_update_batch )\
            .addCallback( self._action_2 )\
            .addErrback( self._action_error )

    def _action_2( self , queued_updates ):
        if len( queued_updates ):
            updates= []
            for item in queued_updates:
                requestWrapper= _AnalyzeLinksRequestWrapper(\
                    semaphoreService = self.semaphoreService ,
                    dbConnectionPool = database.get_dbPool()
                )
                result= requestWrapper.queue_thread( data=item )
                updates.append(result)
            finished= defer.DeferredList( updates )\
                .addCallback( self.deferred_list_finish )
        else:
            d= defer.Deferred()
            self.deferred_list_finish( d )

    def _action_error( self , raised ):
        log.debug("%s._action_error" % self.__class__.__name__ )
        self.set_processing_status( False )
        if isinstance( raised.value , database.DbRollback ):
            print "DB Rollback"
            raise raised
        elif isinstance( raised.value , database.DbRollbackOk ):
            print "DB Rollback ok"
        else:
            raise raised
            
            
AnalyzeLinksService= _AnalyzeLinksService()


class AnalyzeLinksService_Service(internet.TimerService):
    def __init__( self , dbConfigHash=None ):
        internet.TimerService.__init__( self,
            CHECK_PERIOD__IMPORT ,
            AnalyzeLinksService.action
        )
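
One reading of the code above, offered as a guess rather than a diagnosis: on the success path each wrapper appears to call _T_thread_end twice for a single acquire(), once from the deferToThread chain (which fires as soon as _T_thread_begin returns) and once from the runInteraction chain created inside it. The sketch below replays only that token accounting with a stdlib-only stand-in. TokenModel and replay are hypothetical names, not Twisted classes, and the model does not queue waiters the way defer.DeferredSemaphore does.

```python
class TokenModel(object):
    """Hypothetical stand-in for a semaphore's token count (not Twisted code)."""

    def __init__(self, tokens):
        self.limit = tokens
        self.tokens = tokens

    def acquire(self):
        # The real semaphore would queue the caller; this model just refuses.
        if self.tokens <= 0:
            raise RuntimeError("no tokens left (model does not queue waiters)")
        self.tokens -= 1

    def release(self):
        # Releasing past the limit means more releases than acquires happened.
        if self.tokens >= self.limit:
            raise AssertionError("released too many times: too many tokens")
        self.tokens += 1


def replay(limit, wrappers, releases_per_wrapper):
    """Acquire once per wrapper, then release N times per wrapper."""
    sem = TokenModel(limit)
    for _ in range(wrappers):
        sem.acquire()
    try:
        for _ in range(wrappers):
            for _ in range(releases_per_wrapper):
                sem.release()
    except AssertionError:
        return "over-released"
    return "balanced"


if __name__ == "__main__":
    print(replay(25, 25, 1))  # one release per acquire
    print(replay(25, 25, 2))  # two releases per acquire, as suspected above
```

If that reading holds, releasing in exactly one place per acquire would keep the count balanced. DeferredSemaphore.run( f , *args ) may be worth a look here, since it pairs the acquire and the release around whatever Deferred f returns.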
            

