Saturday, April 2, 2011

Twisted: Asynchronous HTTP Request

Making a single HTTP request with Twisted is already documented. But unless you're already familiar with Twisted, my guess is that extending the example code to download a large number of web pages, with a limit on the number of simultaneous requests, is not easy. Below, you'll find example code for exactly that, followed by a walk-through that will hopefully help you understand the details.


from twisted.internet import reactor
import twisted.internet.defer
from twisted.internet.protocol import Protocol
from twisted.web.client import Agent
from twisted.web.http_headers import Headers

class PrinterClient(Protocol):
    def __init__(self, whenFinished):
        self.whenFinished = whenFinished

    def dataReceived(self, bytes):
        print '##### Received #####\n%s' % (bytes,)

    def connectionLost(self, reason):
        print 'Finished:', reason.getErrorMessage()
        self.whenFinished.callback(None)

def handleResponse(r):
    print "version=%s\ncode=%s\nphrase='%s'" % (r.version, r.code, r.phrase)
    for k, v in r.headers.getAllRawHeaders():
        print "%s: %s" % (k, '\n  '.join(v))
    whenFinished = twisted.internet.defer.Deferred()
    r.deliverBody(PrinterClient(whenFinished))
    return whenFinished

def handleError(reason):
    reason.printTraceback()
    reactor.stop()

def getPage(url):
    print "Requesting %s" % (url,)
    d = Agent(reactor).request('GET', url, Headers({'User-Agent': ['twisted']}), None)
    d.addCallbacks(handleResponse, handleError)
    return d

semaphore = twisted.internet.defer.DeferredSemaphore(2)
dl = list()
dl.append(semaphore.run(getPage, 'http://google.com'))
dl.append(semaphore.run(getPage, 'http://cnn.com'))
dl.append(semaphore.run(getPage, 'http://nytimes.com'))
dl = twisted.internet.defer.DeferredList(dl)
dl.addCallbacks(lambda x: reactor.stop(), handleError)

reactor.run()

getPage handles a single HTTP request from start to finish. Agent(reactor).request() creates an Agent and sends the HTTP request. request() returns a Deferred that fires once the response headers have been received. The addCallbacks line specifies that handleResponse is called when the headers are retrieved successfully and handleError is called if retrieving them fails.

handleResponse is given a Response object, which contains the HTTP headers and provides a method, deliverBody, for specifying a Protocol to handle delivery of the HTTP body. A Protocol is used for body delivery because the body may arrive in chunks and an error may occur in the middle of delivery (e.g. someone pulls your network plug). PrinterClient is a very simple Protocol which (1) prints received data, (2) logs the reason for termination (if it is not twisted.web.client.ResponseDone, there was an error), and (3) fires the whenFinished Deferred.

The trickiest part of this code is following the Deferred chain, which is essential to understanding how we limit the maximum number of outstanding requests. A key point to understand about Deferreds is that, if a callback returns a Deferred, the parent Deferred waits for the child Deferred to fire before handing a value to the next callback in the chain. See documentation on Chaining Deferreds. Because of this, each semaphore.run waits for the PrinterClient protocol to complete before releasing its token. The DeferredSemaphore is basically a Deferred-aware semaphore. Its only argument is the number of tokens it allows to be "checked out" simultaneously. When we make the nytimes.com semaphore.run call, the semaphore doesn't call getPage until one of the other requests has completed.

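The DeferredList is used to clean up after all requests have completed. Under normal circumstances, we just want to stop the reactor so our process will exit. If a request fails before its headers arrive, the handleError attached in getPage prints the traceback and stops the reactor. Note that a DeferredList fires its callback even when some of its Deferreds have failed, unless it is created with fireOnOneErrback=True, so the handleError passed on the DeferredList line is only a safeguard.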

Update 9/13/11: Minor code formatting change.