Skip to content

基于 asyncio 协程的网络爬虫

作者:A. Jesse Jiryu Davis 和 Guido van Rossum

如果你喜欢这些书,你也许也会喜欢 Software Design by Example in PythonSoftware Design by Example in JavaScript

A. Jesse Jiryu Davis 是 MongoDB 纽约总部的高级工程师。他编写了异步 MongoDB Python 驱动 Motor,是 MongoDB C 驱动的主要开发者,也是 PyMongo 团队成员。他为 asyncio 和 Tornado 做出过贡献。个人博客:http://emptysqua.re

Guido van Rossum 是 Python 语言的创造者。Python 社区称他为 BDFL(终身仁慈独裁者),这个称号来自 Monty Python 的小品。Guido 的主页:http://www.python.org/~guido/

简介

传统计算机科学强调高效的算法,追求尽快完成计算。但许多网络程序并不是在计算上花费时间,而是需要高效地等待大量缓慢或偶发的网络事件。这类程序的挑战在于:如何高效地等待大量网络事件。现代的解决方案是异步 I/O(async)。

本章将介绍一个简单的网络爬虫。爬虫是典型的异步应用,因为它需要等待大量响应,但计算量很小。它能同时抓取的页面越多,完成得就越快。如果为每个请求分配一个线程,随着并发请求数的增加,线程资源会先耗尽,而不是 socket 资源。通过异步 I/O,可以避免线程的开销。

我们将分三步介绍示例。首先,展示一个异步事件循环,并用回调实现一个高效但难以扩展的爬虫。其次,展示 Python 协程如何兼具高效与可扩展性,并用生成器函数实现简单协程。最后,使用 Python 标准库 asyncio 的完整协程,并用异步队列协调它们。

任务描述

网络爬虫用于查找并下载网站上的所有页面,常用于归档或索引。它从根 URL 开始,抓取页面,解析其中的链接,将未见过的链接加入队列。当抓取到的页面没有新链接且队列为空时,爬虫结束。

我们可以通过并发抓取多个页面来加快进度。爬虫发现新链接后,会为每个新页面启动并发抓取操作。响应到达时解析内容,将新链接加入队列。并发数过高会影响性能,因此我们限制最大并发数,剩余链接留在队列中等待。

传统方法

如何让爬虫并发?传统做法是创建线程池,每个线程负责抓取一个页面。例如,抓取 xkcd.com 的页面:

python
def fetch(url):
	sock = socket.socket()
	sock.connect(('xkcd.com', 80))
	request = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(url)
	sock.send(request.encode('ascii'))
	response = b''
	chunk = sock.recv(4096)
	while chunk:
		response += chunk
		chunk = sock.recv(4096)

	# 页面已下载
	links = parse_links(response)
	q.add(links)

默认情况下,socket 操作是阻塞的:线程调用如 connect 或 recv 时会阻塞,直到操作完成。因此,要同时抓取多个页面,就需要多个线程。复杂应用会用线程池和连接池来复用线程和 socket。

但线程开销大,操作系统对线程数有限制。在 Jesse 的系统上,一个 Python 线程约占 50k 内存,启动上万线程会失败。如果要同时操作上万个 socket,线程数会先耗尽。线程的开销或系统限制成为瓶颈。

Dan Kegel 在著名的 “C10K 问题” 文章中指出多线程 I/O 并发的局限。他说:

现在是时候让 Web 服务器同时处理一万个客户端了,不是吗?毕竟,Web 现在很大。

Kegel 在 1999 年提出了 “C10K” 概念。如今一万个连接已不算多,但问题本质未变。用每连接一个线程的方式处理 C10K 仍然不现实。即使我们的玩具爬虫用线程也能工作,但对于大规模应用,线程数仍是瓶颈。如何突破?

异步

异步 I/O 框架用非阻塞 socket 在单线程上实现并发操作。在异步爬虫中,先将 socket 设为非阻塞:

python
sock = socket.socket()
sock.setblocking(False)
try:
	sock.connect(('xkcd.com', 80))
except BlockingIOError:
	pass

非阻塞 socket 的 connect 会抛出异常,即使正常工作。这是底层 C 函数的行为,用 errno 的 EINPROGRESS 表示连接已开始。

现在,爬虫需要知道何时连接建立,以便发送 HTTP 请求。可以用循环不断尝试:

python
request = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(url)
encoded = request.encode('ascii')

while True:
	try:
		sock.send(encoded)
		break  # 完成
	except OSError as e:
		pass

print('sent')

这种方法既浪费资源,也无法高效等待多个 socket 的事件。早期 BSD Unix 用 select 函数等待非阻塞 socket 的事件。现在有 poll、kqueue(BSD)、epoll(Linux)等高效替代方案。

Python 3.4 的 DefaultSelector 会选用系统上最优的 select 类函数。我们用非阻塞 socket 并注册到 selector:

python
from selectors import DefaultSelector, EVENT_WRITE

selector = DefaultSelector()

sock = socket.socket()
sock.setblocking(False)
try:
	sock.connect(('xkcd.com', 80))
except BlockingIOError:
	pass

def connected():
	selector.unregister(sock.fileno())
	print('connected!')

selector.register(sock.fileno(), EVENT_WRITE, connected)

忽略异常后,调用 selector.register,传入 socket 的文件描述符和等待的事件类型(如 EVENT_WRITE),以及事件发生时要执行的回调函数 connected。

事件循环处理 I/O 通知:

python
def loop():
	while True:
		events = selector.select()
		for event_key, event_mask in events:
			callback = event_key.data
			callback()

connected 回调存储在 event_key.data,事件发生时被调用。

与上面的循环不同,这里的 select 会阻塞,直到有 I/O 事件发生,然后循环执行等待这些事件的回调。未完成的操作会在后续循环中继续等待。

我们已经展示了如何启动操作并在操作就绪时执行回调。异步框架正是基于非阻塞 socket 和事件循环,在单线程上实现并发操作。

这里实现了 “并发”,但不是传统意义上的 “并行”。即可以重叠 I/O 操作,但不会利用多核并行计算。这个系统适合 I/O 密集型问题,而非 CPU 密集型。

事件循环能高效处理并发 I/O,因为不为每个连接分配线程资源。但需要澄清一个误区:异步并不一定比多线程快。在 Python 中,事件循环在处理少量活跃连接时反而比多线程慢一些。异步 I/O 适合大量慢速或偶发事件的应用。

回调式编程

用我们目前构建的简易异步框架,如何实现一个网络爬虫?即使是一个简单的 URL 抓取器,写起来也很痛苦。

我们先用全局集合记录待抓取和已抓取的 URL:

python
urls_todo = set(['/'])
seen_urls = set(['/'])

seen_urls 包含 urls_todo 和已完成的 URL。两者都以根 URL “/” 初始化。

抓取页面需要一系列回调。connected 回调在 socket 连接建立时触发,发送 GET 请求。但随后还要等待响应,因此又要注册另一个回调。如果回调触发时还没读完响应,就再次注册回调,如此往复。

我们将这些回调收集到一个 Fetcher 对象中。它需要 URL、socket 对象和用于累积响应字节的变量:

python
class Fetcher:
    def __init__(self, url):
        self.response = b''  # 字节数组
        self.url = url
        self.sock = None

我们从调用 Fetcher.fetch 开始:

python
# Fetcher 类方法
    def fetch(self):
        self.sock = socket.socket()
        self.sock.setblocking(False)
        try:
            self.sock.connect(('xkcd.com', 80))
        except BlockingIOError:
            pass
        selector.register(self.sock.fileno(),
                          EVENT_WRITE,
                          self.connected)

fetch 方法开始连接 socket,但注意它会在连接建立前返回,把控制权交还给事件循环。

connected 方法实现如下:

python
# Fetcher 类方法
    def connected(self, key, mask):
        print('connected!')
        selector.unregister(key.fd)
        request = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(self.url)
        self.sock.send(request.encode('ascii'))
        selector.register(key.fd,
                          EVENT_READ,
                          self.read_response)

该方法发送 GET 请求,然后注册下一个回调,等待响应。

最后的回调 read_response 处理服务器响应:

python
# Fetcher 类方法
    def read_response(self, key, mask):
        global stopped
        chunk = self.sock.recv(4096)
        if chunk:
            self.response += chunk
        else:
            selector.unregister(key.fd)
            links = self.parse_links()
            for link in links.difference(seen_urls):
                urls_todo.add(link)
                Fetcher(link).fetch()
            seen_urls.update(links)
            urls_todo.remove(self.url)
            if not urls_todo:
                stopped = True

每次 socket 可读时,回调被执行。读取 4k 数据块,若没读完则继续等待。全部读完后解析链接,启动新 Fetcher。

我们用全局变量 stopped 控制事件循环:

python
stopped = False

def loop():
    while not stopped:
        events = selector.select()
        for event_key, event_mask in events:
            callback = event_key.data
            callback()

所有页面下载完毕后,爬虫停止。

这种回调式异步编程的最大问题是“意大利面条代码”。每次 I/O 操作都要手动保存状态,代码难以维护。

协程

有没有办法既高效又优雅地写异步代码?答案是协程。用 Python 3.4 的 asyncio 和 aiohttp 包,抓取 URL 变得非常直接:

python
@asyncio.coroutine
    def fetch(self, url):
        response = yield from self.session.get(url)
        body = yield from response.read()

协程的内存开销远小于线程,Python 可以轻松启动数十万个协程。

协程本质上是可以暂停和恢复的子程序。与线程的抢占式调度不同,协程是协作式调度:它们主动让出控制权。

Python 3.4 的协程基于生成器、Future 类和 yield from 语句。Python 3.5 以后,协程成为语言原生特性。

下面我们将介绍生成器、Future、Task 及 yield from,并用它们实现异步爬虫。

Python 生成器原理

普通 Python 函数调用子程序时,子程序会一直运行到返回或抛出异常,然后控制权回到调用者。

生成器函数用 yield 暂停执行,返回一个值,等待下次被唤醒。

python
def gen_fn():
    result = yield 1
    print('result of yield: {}'.format(result))
    result2 = yield 2
    print('result of 2nd yield: {}'.format(result2))
    return 'done'

调用生成器函数不会立即执行,而是返回一个生成器对象。每个生成器有自己的堆栈帧和局部变量。

send 方法可以恢复生成器的执行,直到下一个 yield

构建基于生成器的协程

生成器可以暂停和恢复,有返回值,非常适合用来实现协程。我们用 Future 表示协程等待的结果:

python
class Future:
    def __init__(self):
        self.result = None
        self._callbacks = []
    def add_done_callback(self, fn):
        self._callbacks.append(fn)
    def set_result(self, result):
        self.result = result
        for fn in self._callbacks:
            fn(self)

Fetcher 的 fetch 方法可以这样写:

python
def fetch(self):
    sock = socket.socket()
    sock.setblocking(False)
    try:
        sock.connect(('xkcd.com', 80))
    except BlockingIOError:
        pass
    f = Future()
    def on_connected():
        f.set_result(None)
    selector.register(sock.fileno(), EVENT_WRITE, on_connected)
    yield f
    selector.unregister(sock.fileno())
    print('connected!')

我们还需要一个 Task 类来驱动协程:

python
class Task:
    def __init__(self, coro):
        self.coro = coro
        f = Future()
        f.set_result(None)
        self.step(f)
    def step(self, future):
        try:
            next_future = self.coro.send(future.result)
        except StopIteration:
            return
        next_future.add_done_callback(self.step)

这样,fetch 协程 yield 一个 Future,Task 驱动它,Future 完成后继续执行。

yield from 拆分协程

yield from 可以让一个协程委托给另一个协程,实现逻辑复用。例如:

python
def read(sock):
    f = Future()
    def on_readable():
        f.set_result(sock.recv(4096))
    selector.register(sock.fileno(), EVENT_READ, on_readable)
    chunk = yield f
    selector.unregister(sock.fileno())
    return chunk

def read_all(sock):
    response = []
    chunk = yield from read(sock)
    while chunk:
        response.append(chunk)
        chunk = yield from read(sock)
    return b''.join(response)

这样,fetch 协程可以直接调用 yield from read_all(sock),代码结构和同步写法类似。

完善 Future 类后,可以用 yield from 等待 Future,调用者无需关心被等待对象的类型。

用 asyncio 协程实现爬虫

我们用 asyncio 协程和队列实现爬虫。爬虫抓取首页,解析链接,加入队列。多个 worker 并发抓取,队列为空时退出。

python
loop = asyncio.get_event_loop()
crawler = Crawler('http://xkcd.com', max_redirect=10)
loop.run_until_complete(crawler.crawl())

Crawler 类用队列、集合和 aiohttp 的 ClientSession 管理状态。crawl 协程启动 worker,等待队列完成:

python
@asyncio.coroutine
def crawl(self):
    workers = [asyncio.Task(self.work()) for _ in range(self.max_tasks)]
    yield from self.q.join()
    for w in workers:
        w.cancel()

worker 协程不断从队列取 URL,抓取页面,解析新链接,加入队列,最后调用 task_done

python
@asyncio.coroutine
def work(self):
    while True:
        url, max_redirect = yield from self.q.get()
        yield from self.fetch(url, max_redirect)
        self.q.task_done()

fetch 协程处理重定向和新链接:

python
@asyncio.coroutine
def fetch(self, url, max_redirect):
    response = yield from self.session.get(url, allow_redirects=False)
    try:
        if is_redirect(response):
            if max_redirect > 0:
                next_url = response.headers['location']
                if next_url in self.seen_urls:
                    return
                self.seen_urls.add(next_url)
                self.q.put_nowait((next_url, max_redirect - 1))
        else:
            links = yield from self.parse_links(response)
            for link in links.difference(self.seen_urls):
                self.q.put_nowait((link, self.max_redirect))
            self.seen_urls.update(links)
    finally:
        yield from response.release()

队列的 join、put_nowait、task_done 等方法协调 worker 和主协程。

结论

现代程序越来越多是 I/O 密集型而非 CPU 密集型。Python 线程既不能并行计算,又容易出现竞态。异步是更合适的模式,但回调式异步代码容易混乱。协程则优雅得多,结构清晰,异常处理和堆栈追踪都很自然。

协程代码和同步写法极为相似,只需在 I/O 处加上 yield fromawait。你可以用多线程的经典模式来协调协程,无需重新发明轮子。

Python 3.5 以后,协程成为语言原生特性,async defawait 语法更简洁。无论底层实现如何变化,核心思想不变:Task、Future、事件循环仍然是 asyncio 的基石。

理解了 asyncio 协程的原理后,你可以放心使用其高层接口,写出高效、优雅的现代异步程序。