Saturday, April 2, 2011

Twisted: Asynchronous HTTP Request

Note that how to make an HTTP request with Twisted is already documented. But, unless you're already familiar with Twisted, my guess is that extending the example code to downloading a large number of web pages with a limit on the number of simultaneous requests is not easy. Below, you'll find example code for exactly that. Below the code is a walk-through that will hopefully help you understand the details.


from pprint import pformat

from twisted.internet import reactor
import twisted.internet.defer
from twisted.internet.protocol import Protocol
from twisted.web.client import Agent
from twisted.web.http_headers import Headers

class PrinterClient(Protocol):
    def __init__(self, whenFinished):
        self.whenFinished = whenFinished

    def dataReceived(self, bytes):
        print '##### Received #####\n%s' % (bytes,)

    def connectionLost(self, reason):
        print 'Finished:', reason.getErrorMessage()
        self.whenFinished.callback(None)

def handleResponse(r):
    print "version=%s\ncode=%s\nphrase='%s'" % (r.version, r.code, r.phrase)
    for k, v in r.headers.getAllRawHeaders():
        print "%s: %s" % (k, '\n  '.join(v))
    whenFinished = twisted.internet.defer.Deferred()
    r.deliverBody(PrinterClient(whenFinished))
    return whenFinished

def handleError(reason):
    reason.printTraceback()
    reactor.stop()

def getPage(url):
    print "Requesting %s" % (url,)
    d = Agent(reactor).request('GET', url, Headers({'User-Agent': ['twisted']}), None)
    d.addCallbacks(handleResponse, handleError)
    return d

semaphore = twisted.internet.defer.DeferredSemaphore(2)
dl = list()
dl.append(semaphore.run(getPage, 'http://google.com'))
dl.append(semaphore.run(getPage, 'http://cnn.com'))
dl.append(semaphore.run(getPage, 'http://nytimes.com'))
dl = twisted.internet.defer.DeferredList(dl)
dl.addCallbacks(lambda x: reactor.stop(), handleError)

reactor.run()

getPage handles an entire single HTTP request. Agent(reactor).request() creates an Agent and sends the HTTP request. request() returns a deferred which is fired when the headers are retrieved. The addCallbacks line specifies that handleResponse is called upon successful header retrieval and handleError is called if there is an error in retrieving the headers.

handleResponse is given a Response object which contains the HTTP header and includes a method, deliverBody, to specify a Protocol to handle delivery of the HTTP body. A Protocol is used for body delivery because it may come in chunks and an error may occur in the middle of delivery (e.g. someone pulls your network plug). PrinterClient is a very simple Protocol which (1) prints received data, (2) logs the reason for termination (if not twisted.web.client.ResponseDone, there was an error), and (3) fires a deferred whenFinished.

The trickiest part of this code is following the Deferred chain, which is essential to understanding how we limit the maximum number of outstanding requests. A key point to understand about Deferreds is that, if a callback returns a Deferred, the parent Deferred waits for the child Deferred to fire before handing a value to the next Deferred in the chain. See documentation on Chaining Deferreds. Because of this, each semaphore.run waits for the PrinterClient protocol to complete before releasing its semaphore. The DeferredSemaphore is basically a Deferred-aware semaphore. It's only argument is the number of tokens it allows to be "checked-out" simultaneously. When we make the nytimes.com semaphore.run call, the semaphore doesn't call getPage until one of the other requests has completed.

The DeferredList is used to clean-up after all requests have completed. Under normal circumstances, we just want to stop the reactor so our process will exit. But, if there is an error, we want to see what happened, hence we use handleError in that case.

Update 9/13/11: Minor code formatting change.

Wednesday, March 9, 2011

Twisted: Beware: Returning a Value from dataReceived

We just lost approximately 10-man-hours to undocumented behavior of Twisted. If you return a True truth value from your dataReceived function (after it is called by the reactor), the reactor will destroy your protocol, and close the corresponding connection. Fortunately, this behavior is recognized as a bug and a deprecation warning will be likely be issued with this behavior in the 11.0 release. But, since many of us are stuck with Twisted 10.2 or earlier for months, if not years, to come, it's good to be aware of this issue.

Friday, February 25, 2011

If you Misspell "protocol", this is what you get

Traceback (most recent call last):
  File "/usr/lib/python2.5/site-packages/twisted/python/log.py", line 51, in callWithLogger
    return callWithContext({"system": lp}, func, *args, **kw)
  File "/usr/lib/python2.5/site-packages/twisted/python/log.py", line 36, in callWithContext
    return context.call({ILogContext: newCtx}, func, *args, **kw)
  File "/usr/lib/python2.5/site-packages/twisted/python/context.py", line 59, in callWithContext
    return self.currentContext().callWithContext(ctx, func, *args, **kw)
  File "/usr/lib/python2.5/site-packages/twisted/python/context.py", line 37, in callWithContext
    return func(*args,**kw)
---  ---
  File "/usr/lib/python2.5/site-packages/twisted/internet/selectreactor.py", line 146, in _doReadOrWrite
    why = getattr(selectable, method)()
  File "/usr/lib/python2.5/site-packages/twisted/internet/tcp.py", line 563, in doConnect
    self._connectDone()
  File "/usr/lib/python2.5/site-packages/twisted/internet/tcp.py", line 566, in _connectDone
    self.protocol = self.connector.buildProtocol(self.getPeer())
  File "/usr/lib/python2.5/site-packages/twisted/internet/base.py", line 930, in buildProtocol
    return self.factory.buildProtocol(addr)
  File "/usr/lib/python2.5/site-packages/twisted/internet/protocol.py", line 98, in buildProtocol
    p = self.protocol()
This is worth remembering. Note that nothing here refers to the corresponding factory or the code where the error was made. I think this is one reason Twisted can be frustrating.

You can see this error with an simple example:

from twisted.internet import protocol
from twisted.internet import reactor
class MyProtocol(protocol.Protocol):
    pass
class MyFactory(protocol.ReconnectingClientFactory):
    protcol = MyProtocol
reactor.connectTCP('google.com', 80, MyFactory())
reactor.run()
It's certainly convenient to be able to set the protocol so simply, but it's disappointing that the error isn't caught at the source. I wonder why the factories don't have an __init__ method that checks for a valid protocol field?

Wednesday, January 26, 2011

Using exceptions for goto

Goto is a shunned construct in modern programming languages, but occasionally there is a case where it makes sense, such as breaking-out from a set of nested for statements when a solution is found. But, modern programming languages contain a better construct for such cases---exceptions. For example, say we are trying to find an item from each of three sets which jointly satisfy some criterion. A naive implementation might look like:

foundMatch = False
for item1 in set1:
    for item2 in set2:
        for item3 in set3:
            if satisfiesCriterion(item1, item2, item3):
                foundMatch = True
                break
        if foundMatch:
            break
    if foundMatch:
        break
But, this code can be simplified by introducing an exception:
try:
    for item1 in set1:
        for item2 in set2:
            for item3 in set3:
                if satisfiesCriterion(item1, item2, item3):
                    raise FoundMatch()
except FoundMatch:
    pass

Thursday, January 20, 2011

Twisted Documentation

There is currently much discussion on the twisted mailing list about improving twisted documentation. I'm one of many who think the documentation could be improved. I found a major problem to be a lack of introduction to the twisted mental model---the fact that it uses cooperative timesharing and blocking calls to handle events.

Victor Norman suggested Dave Peticolas' Twisted Introduction. Reading the first article which explains the Twisted "mental model" felt like a breath of fresh air. I disagree with his use of asynchronous, which implies parallel, non-blocking, etc. But, starting with the mental model is definitely the right approach. Now, if only this documentation could be integrated with the main documentation...

P.S. Dave Peticolas---I've heard that name before. Sure enough, he worked on GnuCash, my accounting program of choice.

Wednesday, January 12, 2011

Twisted: callWhenRunning, callFromThread or callLater?

When I first learned of reactor.callWhenRunning, I apparently didn't read the documentation and/or source code sufficiently carefully. I correctly understood that it was the function to use when you wanted to queue a function to be called immediately after reactor start. My mistake was to believe that it queued the function if the reactor had already been started. In fact, if the reactor is in the "running" state, it simply calls the specified function. I wonder if part of the reason for this design is how it handles the not-running case. If the reactor is not running, callWhenRunning adds a startup trigger for the specified function. Such a trigger cannot be used to queue-up a task/call.

I learned (the hard way) of the need for callFromThread when trying to run a web server and twisted reactor in separate threads of the same process ("don't try this at home"). Jean-Paul's answer to my question about reactor.wakeUp provides the reason for this requirement. The reactor must make blocking calls (e.g. select()) for certain functionality (e.g. networking). The wakeUp trips the blocking call by, e.g., "writ[ing] a byte to a pipe the reactor is select()ing (etc) on". In my case, I found that an attempt by the web server code to write to the network might be ignored indefinitely unless the call was wrapped with callFromThread. What does callFromThread do? It adds the function to the threadCallQueue and "wakes up" the reactor. Unlike callWhenRunning the specified function call isn't made until after callFromThread returns, so it can be used to queue-up a function for running when the reactor (re-)gains control.

If you read the callFromThread documentation, you'll find that callLater is the recommended way (with delay=0) to queue a function for calling in the next mainLoop iteration. Like callFromThread, callLater uses a queue(s) to manage the calls. Two queues are kept: one for calls which haven't waited long enough (_newTimedCalls), and one for calls which have waited long enough, but haven't been called yet (_pendingTimedCalls). The _pendingTimedCalls are called during the next mainLoop iteration.