python twisted异步采集

2014年10月26日 发表评论 阅读评论

对于大量的数据采集除了多线程,就只有异步来实现了。本文是通过twisted框架来实现异步采集,原文来自:http://oubiwann.blogspot.com/2008/06/async-batching-with-twisted-walkthrough.html 。

Async Batching with Twisted: A Walkthrough

Example 1: Just a DefferedList

from twisted.internet import reactor
from twisted.web.client import getPage
from twisted.internet.defer import DeferredList
def listCallback(results):
  print results
def finish(ign):
  reactor.stop()
def test():
  d1 = getPage('http://www.google.com')
  d2 = getPage('http://yahoo.com')
  dl = DeferredList([d1, d2])
  dl.addCallback(listCallback)
  dl.addCallback(finish)
test()
reactor.run()

This is one of the simplest examples you’ll ever see for a deferred list in action. Get two deferreds (the getPage function returns a deferred) and use them to created a deferred list. Add callbacks to the list, garnish with a lemon.

Example 2: Simple Result Manipulation

from twisted.internet import reactor
from twisted.web.client import getPage
from twisted.internet.defer import DeferredList
def listCallback(results):
  for isSuccess, content in results:
    print "Successful? %s" % isSuccess
    print "Content Length: %s" % len(content)
def finish(ign):
  reactor.stop()
def test():
  d1 = getPage('http://www.google.com')
  d2 = getPage('http://yahoo.com')
  dl = DeferredList([d1, d2])
  dl.addCallback(listCallback)
  dl.addCallback(finish)
test()
reactor.run()

We make things a little more interesting in this example by doing some processing on the results. For this to make sense, just remember that a callback gets passed the result when the deferred action completes. If we look up the API documentation for DeferredList, we see that it returns a list of (success, result) tuples, where success is a Boolean and result is the result of a deferred that was put in the list (remember, we’ve got two layers of deferreds here!).

Example 3: Page Callbacks Too

from twisted.internet import reactor
from twisted.web.client import getPage
from twisted.internet.defer import DeferredList
def pageCallback(result):
  return len(result)
def listCallback(result):
  print result
def finish(ign):
  reactor.stop()
def test():
  d1 = getPage('http://www.google.com')
  d1.addCallback(pageCallback)
  d2 = getPage('http://yahoo.com')
  d2.addCallback(pageCallback)
  dl = DeferredList([d1, d2])
  dl.addCallback(listCallback)
  dl.addCallback(finish)
test()
reactor.run()

Here, we mix things up a little bit. Instead of doing processing on all the results at once (in the deferred list callback), we’re processing them when the page callbacks fire. Our processing here is just a simple example of getting the length of the getPage deferred result: the HTML content of the page at the given URL.

Example 4: Results with More Structure

from twisted.internet import reactor
from twisted.web.client import getPage
from twisted.internet.defer import DeferredList
def pageCallback(result):
  data = {
    'length': len(result),
    'content': result[:10],
    }
  return data
def listCallback(result):
  for isSuccess, data in result:
    if isSuccess:
      print "Call to server succeeded with data %s" % str(data)
def finish(ign):
  reactor.stop()
def test():
  d1 = getPage('http://www.google.com')
  d1.addCallback(pageCallback)
  d2 = getPage('http://yahoo.com')
  d2.addCallback(pageCallback)
  dl = DeferredList([d1, d2])
  dl.addCallback(listCallback)
  dl.addCallback(finish)
test()
reactor.run()

A follow-up to the last example, here we put the data in which we are interested into a dictionary. We don’t end up pulling any of the data out of the dictionary; we just stringify it and print it to stdout.

Example 5: Passing Values to Callbacks

from twisted.internet import reactor
from twisted.web.client import getPage
from twisted.internet.defer import DeferredList
def pageCallback(result, url):
  data = {
    'length': len(result),
    'content': result[:10],
    'url': url,
    }
  return data
def getPageData(url):
  d = getPage(url)
  d.addCallback(pageCallback, url)
  return d
def listCallback(result):
  for isSuccess, data in result:
    if isSuccess:
      print "Call to %s succeeded with data %s" % (data['url'], str(data))
def finish(ign):
  reactor.stop()
def test():
  d1 = getPageData('http://www.google.com')
  d2 = getPageData('http://yahoo.com')
  dl = DeferredList([d1, d2])
  dl.addCallback(listCallback)
  dl.addCallback(finish)
test()
reactor.run()

After all this playing, we start asking ourselves more serious questions, like: “I want to decide which values show up in my callbacks” or “Some information that is available here, isn’t available there. How do I get it there?” This is how 🙂 Just pass the parameters you want to your callback. They’ll be tacked on after the result (as you can see from the function signatures).

In this example, we needed to create our own deferred-returning function, one that wraps the getPage function so that we can also pass the URL on to the callback.

Example 6: Adding Some Error Checking

from twisted.internet import reactor
from twisted.web.client import getPage
from twisted.internet.defer import DeferredList
urls = [
  'http://yahoo.com',
  'http://www.google.com',
  'http://www.google.com/MicrosoftRules.html',
  'http://bogusdomain.com',
  ]
def pageCallback(result, url):
  data = {
    'length': len(result),
    'content': result[:10],
    'url': url,
    }
  return data
def pageErrback(error, url):
  return {
    'msg': error.getErrorMessage(),
    'err': error,
    'url': url,
    }
def getPageData(url):
  d = getPage(url, timeout=5)
  d.addCallback(pageCallback, url)
  d.addErrback(pageErrback, url)
  return d
def listCallback(result):
  for ignore, data in result:
    if data.has_key('err'):
      print "Call to %s failed with data %s" % (data['url'], str(data))
    else:
      print "Call to %s succeeded with data %s" % (data['url'], str(data))
def finish(ign):
  reactor.stop()
def test():
  deferreds = []
  for url in urls:
    d = getPageData(url)
    deferreds.append(d)
  dl = DeferredList(deferreds, consumeErrors=1)
  dl.addCallback(listCallback)
  dl.addCallback(finish)
test()
reactor.run()

As we get closer to building real applications, we start getting concerned about things like catching/anticipating errors. We haven’t added any errbacks to the deferred list, but we have added one to our page callback. We’ve added more URLs and put them in a list to ease the pains of duplicate code. As you can see, two of the URLs should return errors: one a 404, and the other should be a domain not resolving (we’ll see this as a timeout).

Example 7: Batching with DeferredSemaphore

from twisted.internet import reactor
from twisted.web.client import getPage
from twisted.internet import defer
maxRun = 1
urls = [
  'http://twistedmatrix.com',
  'http://twistedsoftwarefoundation.org',
  'http://yahoo.com',
  'http://www.google.com',
  ]
def listCallback(results):
  for isSuccess, result in results:
    print len(result)
def finish(ign):
  reactor.stop()
def test():
  deferreds = []
  sem = defer.DeferredSemaphore(maxRun)
  for url in urls:
    d = sem.run(getPage, url)
    deferreds.append(d)
  dl = defer.DeferredList(deferreds)
  dl.addCallback(listCallback)
  dl.addCallback(finish)
test()
reactor.run()

These last two examples are for more advanced use cases. As soon as the reactor starts, deferreds that are ready, start “firing” — their “jobs” start running. What if we’ve got 500 deferreds in a list? Well, they all start processing. As you can imagine, this is an easy way to run an accidental DoS against a friendly service. Not cool.

For situations like this, what we want is a way to run only so many deferreds at a time. This is a great use for the deferred semaphore. When I repeated runs of the example above, the content lengths of the four pages returned after about 2.5 seconds. With the example rewritten to use just the deferred list (no deferred semaphore), the content lengths were returned after about 1.2 seconds. The extra time is due to the fact that I (for the sake of the example) forced only one deferred to run at a time, obviously not what you’re going to want to do for a highly concurrent task 😉

Note that without changing the code and only setting maxRun to 4, the timings for getting the the content lengths is about the same, averaging for me 1.3 seconds (there’s a little more overhead involved when using the deferred semaphore).

One last subtle note (in anticipation of the next example): the for loop creates all the deferreds at once; the deferred semaphore simply limits how many get run at a time.

Example 8: Throttling with Cooperator

from twisted.internet import reactor
from twisted.web.client import getPage
from twisted.internet import defer, task
maxRun = 2
urls = [
  'http://twistedmatrix.com',
  'http://twistedsoftwarefoundation.org',
  'http://yahoo.com',
  'http://www.google.com',
  ]
def pageCallback(result):
  print len(result)
  return result
def doWork():
  for url in urls:
    d = getPage(url)
    d.addCallback(pageCallback)
    yield d
def finish(ign):
  reactor.stop()
def test():
  deferreds = []
  coop = task.Cooperator()
  work = doWork()
  for i in xrange(maxRun):
    d = coop.coiterate(work)
    deferreds.append(d)
  dl = defer.DeferredList(deferreds)
  dl.addCallback(finish)
test()
reactor.run()

虽然目前还没到研究twisted框架的水平,不过这里先记录下,以备以后用时再回味。




本站的发展离不开您的资助,金额随意,欢迎来赏!

You can donate through PayPal.
My paypal id: itybku@139.com
Paypal page: https://www.paypal.me/361way

  1. 本文目前尚无任何评论.
  1. 本文目前尚无任何 trackbacks 和 pingbacks.