[Twisted-Python] Looping

Rob Hoadley hoadley at gmail.com
Tue Jan 6 01:09:21 EST 2009


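So one problem is understanding the reactor. It's only meant to be
run once. Some of your code looks like it's trying to start it more
than once. You want to start it exactly once, schedule your repeated
work inside it, and stop it only when the whole program is done.
Restarting it after reactor.stop() isn't supported, which is likely
where weird things like the hang and the ReactorNotRunning traceback
in your Case 2 come from. I'd start here and make sure you understand
the reactor: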

http://twistedmatrix.com/projects/core/documentation/howto/reactor-basics.html

As for your problem, you should make your URL-grabbing code a Twisted
service, so that you can reach your goal of running it as a daemon and
have a programmatic way to add new URLs to grab. You'd also want to
learn about writing Twisted services. As a previous poster
recommended, you have two straightforward ways to call things
repeatedly: task.LoopingCall and reactor.callLater. reactor.callLater
is the one I like to use. The example I show you below runs every 10
seconds.

The way to daemonize things is to run twistd on a tac file, so there
are a number of things you'll need to cover. You may want to work
slowly through the Twisted tutorial:

http://twistedmatrix.com/projects/core/documentation/howto/tutorial/index.html

As a teaser: in your service's __init__ you'd create self.urls (your
GROUPS from above), as complex or as simple an object as you want. You
may want to carry around more info, and you may or may not want it to
persist. In the example below the data doesn't persist, since it's
popped off a deque. It's up to you to figure out how you want it to
work.
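The "popped off a deque" behaviour is the drain step at the top of getPages. A stdlib-only sketch of it, with a hypothetical helper name `drain`: once an item is popped it's gone from the shared deque, and anything appended while a batch is being worked on waits for the next cycle. popleft() is O(1) on a deque, whereas list.pop(0) has to shift every remaining element.

```python
from collections import deque


def drain(shared):
    """Move every item out of `shared` into a new local deque (hypothetical helper)."""
    local = deque()
    while shared:
        # popleft() is constant time on a deque
        local.append(shared.popleft())
    return local


urls = deque(['http://example.com/a', 'http://example.com/b'])
batch = drain(urls)                    # this cycle's work
urls.append('http://example.com/c')    # arrives mid-cycle, handled next time
```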

Here are some modules you'd probably want to use, and a semi-working example:

from twisted.internet import reactor
from twisted.python import log
from twisted.web import client
from twisted.application import service
from collections import deque

class URLGrabService(service.Service):

    def __init__(self, urls=()):
        # self.urls holds the url objects to grab; I like deque for this
        self.urls = deque(urls)
        self.call = None

    def startService(self):
        # twistd calls this once the Application object is set up;
        # don't call it directly
        service.Service.startService(self)
        self.call = reactor.callLater(10, self.getPages)

    def stopService(self):
        # cancel the pending reactor.callLater if it hasn't fired yet
        if self.call and self.call.active():
            self.call.cancel()
        self.call = None
        return service.Service.stopService(self)

    def getPages(self):
        if self.urls:
            # move everything out of self.urls into a localQueue, so urls
            # re-added by _readdUrl wait for the next cycle
            localQueue = deque()
            while self.urls:
                localQueue.append(self.urls.popleft())

            while localQueue:
                # popleft() on a deque is much faster than list.pop(0)
                urlObj = localQueue.popleft()
                log.msg('sending %s' % urlObj.url)
                # I just create a bunch of deferreds; they run concurrently
                self._getPage(urlObj)

        self.call = reactor.callLater(10, self.getPages)

    def _getPage(self, urlObj):
        d = client.getPage(urlObj.url)
        d.addCallback(self._checkWebStatus, urlObj)
        d.addErrback(self._readdUrl, urlObj)
        return d

    def _checkWebStatus(self, page, urlObj):
        # do the work you'd do on a successful grab
        pass

    def _readdUrl(self, failure, urlObj):
        # errbacks get the failure first; handle errors here... I generally
        # retry them on the next cycle and set urlObj.retry += 1
        log.err(failure)
        self.urls.append(urlObj)

Then you'd want a tac file to pull it all together.  Hope this helps.

-rob




On Mon, Jan 5, 2009 at 8:01 PM, Robert Hancock <hancock.robert at gmail.com> wrote:
> I understand how the deferreds can create concurrency, but I still
> can't figure out how to get the process to loop correctly.
>
> I changed the code to use multiple deferreds, but I feel I'm missing a
> basic architectural design on how to get the process to run in a loop.
>  Any help would be greatly appreciated.
>
> Case 1
> -------------
> If I execute it with:
>   (RSSFeedStorage(GROUPS)).refreshAllFeeds()
>    reactor.run()
>
> It executes perfectly, one time. Or, from a loop in a shell script, it
> also runs fine multiple times.
>
> I would like to perform the same function every 30 seconds from within
> the script, but when I try:
>    while True:
>        (RSSFeedStorage(GROUPS)).refreshAllFeeds()
>        reactor.run()
>        time.sleep(5)
>
> Case 2
> ------------
> It executes twice and hangs, and when you hit Ctrl-C the traceback is:
>
>  File "main_rss.py", line 94, in <module>
>    reactor.run()
>  File "/usr/local/python26/lib/python2.6/site-packages/Twisted-8.2.0-py2.6-linux-x86_64.egg/twisted/internet/base.py", line 1128, in run
>    self.mainLoop()
>  File "/usr/local/python26/lib/python2.6/site-packages/Twisted-8.2.0-py2.6-linux-x86_64.egg/twisted/internet/base.py", line 1137, in mainLoop
>    self.runUntilCurrent()
> --- <exception caught here> ---
>  File "/usr/local/python26/lib/python2.6/site-packages/Twisted-8.2.0-py2.6-linux-x86_64.egg/twisted/internet/base.py", line 729, in runUntilCurrent
>    f(*a, **kw)
>  File "/usr/local/python26/lib/python2.6/site-packages/Twisted-8.2.0-py2.6-linux-x86_64.egg/twisted/internet/base.py", line 527, in stop
>    "Can't stop reactor that isn't running.")
> twisted.internet.error.ReactorNotRunning: Can't stop reactor that isn't running.
>
> Case 3
> -------------
> I tried the task.LoopingCall() example from the manual and wrote:
>    rf = RSSFeedStorage(GROUPS)
>    l = task.LoopingCall(rf.refreshAllFeeds())
>    l.start(10)
> [commented out reactor.stop in stop_working()]
>
> and it raises the following exception, I assume because the function
> returns a deferred list.
>
> exceptions.AttributeError: DeferredList instance has no __call__ method
>
>
>
> ==== Code ======
>
> import os
> import time
> import xml.sax.saxutils
>
> from twisted.internet import reactor, defer, protocol, task
> from twisted.web import client
>
> GROUPS = { 'yahoo' : 'http://finance.yahoo.com/rss/usmarkets',
>           'bbc':
> 'http://newsrss.bbc.co.uk/rss/newsonline_world_edition/front_page/rss.xml',
>           'npr': 'http://www.npr.org/rss/rss.php?id=1006',
>           'nytimes':
> 'http://www.nytimes.com/services/xml/rss/nyt/WorldBusiness.xml'
>           }
> RSYNC_DIR = '/local/apps/rdretrvl/rdsync/rssfeed'
> escape_dict = {'/': '_'}
>
>
> class RssFeed(object):
>
>    def __init__(self, groupName, feedUrl):
>        self.refreshRate = 10
>        self.title = ''
>        self.groupName = groupName
>        self.feedUrl = feedUrl
>        self.articles = []
>        self.articlesById = {}
>        self.lastRefreshTime = 0
>        self.refreshing = None
>
>    def refreshIfNeeded(self):
>        timeSinceRefersh = time.time() - self.lastRefreshTime
>        if timeSinceRefersh > self.refreshRate:
>            if not self.refreshing:
>                self.refreshing = client.getPage(self.feedUrl).addCallback(
>                    self._gotFeedData).addErrback(self._getDataFailed)
>            d = defer.Deferred()
>            self.refreshing.chainDeferred(d)
>            return d
>        else:
>            return defer.succeed(None)
>
>    def _gotFeedData(self, data):
>        print "Loaded feed data from %s" % self.feedUrl
>        self.refreshing = None
>        self.lastRefreshTime = time.time()
>        # write to rsync directory
>        name = xml.sax.saxutils.escape(self.feedUrl, escape_dict)
>        try:
>            fnout = os.path.join(RSYNC_DIR, name)
>            fout = open(fnout, 'w')
>        except IOError, e:
>            print e
>            #log.error('Opening %s: %s' % (fnout, e))
>            return
>
>        try:
>            fout.write(data)
>            #log.info('Wrote %s.' % fnout)
>            print 'Wrote %s.' % fnout
>        except IOError, e:
>            print e
>            #log.error('Wrting to %s: %s' % (fnout, e))
>        finally:
>            fout.close()
>
>    def _getDataFailed(self, failure):
>        print 'Failed to load RSS data from %s: %s' % (self.feedUrl,
> failure.getErrorMessage())
>        self.refreshing = None
>        return failure
>
> class RSSFeedStorage(object):
>
>    def __init__(self, feeds):
>        self.feeds = {}
>        for groupName, url in feeds.iteritems():
>            self.feeds[groupName] = RssFeed(groupName, url)
>
>    def refreshAllFeeds(self):
>        refreshes = [feed.refreshIfNeeded() for feed in self.feeds.values()]
>        return defer.DeferredList(refreshes).addCallback(self.stop_working).addErrback(self.handleError)
>
>    def stop_working(self, data):
>        reactor.stop()
>        print 'Reactor stopped'
>
>    def handleError(self, failure):
>        print failure
>
> if __name__ == "__main__":
>
>          [execution code goes here]
>
> On Mon, Jan 5, 2009 at 11:53 AM, Drew Smathers <drew.smathers at gmail.com> wrote:
>> On Sun, Jan 4, 2009 at 11:23 PM, Robert Hancock
>> <hancock.robert at gmail.com> wrote:
>>>> How many urls are in url_tuples?
>>> 4 - 32
>>>> Is there a reason why you're using just one deferred?
>>> What is the advantage of using more?
>>>
>>
>> Concurrency.
>>
>> -Drew
>>
>> _______________________________________________
>> Twisted-Python mailing list
>> Twisted-Python at twistedmatrix.com
>> http://twistedmatrix.com/cgi-bin/mailman/listinfo/twisted-python
>>
>