[Twisted-web] cooperate and keeping data integrity in response

John Aherne johnaherne at rocs.co.uk
Sat Jan 14 06:36:47 EST 2012


Sorry about the double post. Copy/paste failed me and I left out part
of the code, which is now included.


I have been looking at JP Calderone's example of using web.request with
JSON, which seems much like what I want.

One thing I am not clear about: if a lot of queries come in more or less
simultaneously, and I am using cooperate to allow other functions to run,
will I need to guard against the data in my list being overwritten by
subsequent requests?

The way I see it, the functions that read the data and store it in my list
are in danger of interfering with each other.

The response is being built cooperatively, bit by bit, to permit other
functions to run, so it could happen that the next request overwrites the
list where the database query result is being stored.

If this is a danger, then I need to prevent this, which seems to imply that
I will need to block each request and not service another request until the
previous one has completed.

Have I got that right, or am I way off target, or have I missed the obvious?

Ideally I would keep on servicing requests so that the server stays
responsive, while still keeping the data intact.
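
To make the scenario concrete, here is a minimal stdlib-only sketch of two responses being built in interleaved steps. It is not Twisted code: the round-robin loop is only a stand-in for cooperate, and build_response, run_interleaved and the sample rows are hypothetical names made up for illustration. In this sketch each call creates its own my_list, as read_result does, so the interleaving by itself does not mix the two results. The overwriting concern would apply to a list shared between requests, for example one stored at module level.

```python
# Stdlib-only sketch: a round-robin loop stands in for cooperate().
# build_response, run_interleaved and the sample rows are hypothetical.

def build_response(rows, out):
    """Build one response step by step, yielding after every row."""
    my_list = []              # created per call, so private to this request
    for row in rows:
        my_list.append(list(row))
        yield None            # hand control back, much as _produce does
    out.append(my_list)


def run_interleaved(tasks):
    """Advance each generator one step at a time until all are exhausted."""
    pending = list(tasks)
    while pending:
        for task in list(pending):
            try:
                next(task)
            except StopIteration:
                pending.remove(task)


results_a, results_b = [], []
run_interleaved([
    build_response([("A1",), ("A2",), ("A3",)], results_a),
    build_response([("B1",), ("B2",)], results_b),
])
```

After the run, results_a holds only the "A" rows and results_b only the "B" rows, even though the two generators were advanced alternately.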

The test code I am using is JP's with some minor variations shown below. I
hope it formats correctly.

Thanks for any help.

Regards

John Aherne

#Asynchronous JSON
#Today in #twisted.web the topic of generating large JSON responses
#in a Twisted Web server came up. The problem was that the data being
#serialized into JSON was so large that the JSON serialization process
#itself would block the web server, preventing other requests from being
#serviced.
#
#The first solution that came up was to split the web server into two
#pieces, so that the URLs which could have these JSON responses were
#served by a different process than was serving the rest. This is a
#pretty decent solution, and it also provides the benefit of using extra
#CPU cores if there are any available. In this case, it complicated
#things a little since it meant sharing a session across two processes.
#So we went looking for another approach.
#
#It turns out that the json module supports incremental serialization.
#When I saw the JSONEncoder.iterencode method, I thought it would be
#great used in combination with cooperate to create a producer. This
#would let an application serialize a large structure to JSON without
#multiple processes, threads, or unreasonably blocking the reactor.
#
#Here's the little bit of glue necessary to make things work:
import cgi
from json import JSONEncoder
from twisted.enterprise import adbapi
from twisted.internet.task import cooperate

#db = sqlite3.connect('c:\\sqlite\\test.db')
#cur = db.cursor()
dbpool = adbapi.ConnectionPool("pyodbc", "DSN=testsql", cp_reconnect=True)

class AsyncJSON(object):
    def __init__(self, value):
        self._value = value


    def beginProducing(self, consumer):
        #print 'value', self._value
        self._consumer = consumer
        self._iterable = JSONEncoder().iterencode(self._value)
        #print 'iterable', self._iterable
        self._consumer.registerProducer(self, True)
        self._task = cooperate(self._produce())
        d = self._task.whenDone()
        d.addBoth(self._unregister)
        return d


    def pauseProducing(self):
        self._task.pause()


    def resumeProducing(self):
        self._task.resume()


    def stopProducing(self):
        self._task.stop()


    def _produce(self):
        for chunk in self._iterable:
            #print 'chunk', chunk
            self._consumer.write(chunk)
            yield None


    def _unregister(self, passthrough):
        self._consumer.unregisterProducer()
        return passthrough



#By using the iterencode method, this avoids spending too much time
#generating json output at once. Instead, a little bit of the input
#will be serialized at a time, and each short resulting string is available
#from the iterator returned by iterencode.
#
#By using cooperate, the _produce generator will be iterated in a way that
#lets it cooperate with the reactor and other event sources/handlers.
#A few chunks of json data will be written to the consumer, then execution
#will switch away to something else, then come back and a few more will
#be written, and so on.
#
#And by using the producer/consumer interface, if the HTTP client which
#issued the request doesn't read the results as fast as they're being
#generated, the server will stop generating new output until the client
#catches up.
#
#Altogether, this provides a very cool, efficient way to generate JSON
#output.
#
#Here's an example to make it easier to see how one might use AsyncJSON
#in a resource:
#
from twisted.web.resource import Resource
from twisted.web.server import NOT_DONE_YET
from twisted.web.server import Site
from twisted.internet import reactor

def read_pcodes(pcode, request):
    """ Read postcode data and premise data for a single postcode """

    sql_mail = """select rcmplc01.substreet,rcmplc01.street,
                         rcmplc01.sublocality, rcmplc01.locality,
                         rcmplc01.town,
                         rcmplc01.postcode,
                         rcmplc02.data
                         from rcmplc01
                         left outer join rcmplc02
                         on rcmplc01.postcode = rcmplc02.postcode
                         where rcmplc01.postcode = ?
             """
    pcode = pcode.strip().upper()
    def run():
        return dbpool.runQuery(sql_mail,(pcode,))
    d = run()
    d.addCallback(read_result, request)
    d.addErrback(read_failure,request)
    return d

def read_failure(o, request):
    print 'failure', str(o)
    request.finish()


def read_result(res, request):
    """ read result for postcode lookup. Build return list"""
    #print 'res', res
    print 'len res', len(res)
    my_list = []
    for item in res:
        my_list.append([item[0], item[1], item[2], item[3], item[4], item[5]])
    d = AsyncJSON(my_list).beginProducing(request)
    d.addCallback(lambda ignored: request.finish())
    # Report serialization errors through the same handler as query errors.
    d.addErrback(read_failure, request)

class PostcodeFinder(Resource):
    """ Handle http POST requests  for postcode lookup"""
    def render_POST(self, request):
        print 'req', request.args
        if request.args.get('pcode', None):
            pcode = cgi.escape(request.args['pcode'][0]).strip().upper()
            if (pcode[-3:][0].isdigit() and pcode[-2:].isalpha() and
                len(pcode[:len(pcode)-3].strip()) < 5):
                postcode2 = '%s %s' % (pcode[:len(pcode)-3].strip(),pcode[-3:])
                d = read_pcodes(postcode2, request)
            else:
                return 'Not a VALID REQUEST'
        elif request.args.get('street', None):
            m25 = True if request.args.get('M25', None) else False
            d = read_street(
                cgi.escape(request.args['street'][0]).strip().upper(),
                m25, request)
        elif request.args.get('orgname', None):
            d = read_orgname(
                cgi.escape(request.args['orgname'][0]).strip().upper(),
                request)
        else:
            return 'Not a VALID REQUEST'
        return NOT_DONE_YET


root = Resource()
root.putChild("json", PostcodeFinder())
factory = Site(root)
reactor.listenTCP(8086, factory)
reactor.run()
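
For reference, the incremental serialization described in the comments above can be tried on its own, without Twisted. This is a minimal stdlib-only sketch; the sample rows are hypothetical and only shaped like the lists built in read_result.

```python
from json import JSONEncoder

# Hypothetical sample rows, shaped like the lists built in read_result.
rows = [
    ["", "1 HIGH STREET", "", "", "LONDON", "AB1 2CD"],
    ["", "2 HIGH STREET", "", "", "LONDON", "AB1 2CD"],
]

# iterencode returns an iterator of short strings instead of one big string.
chunks = list(JSONEncoder().iterencode(rows))

# Joining the chunks gives the complete JSON document.
body = "".join(chunks)
```

Each element of chunks is one of the short strings that _produce would hand to consumer.write, and joined together they form a complete JSON document.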