3. asyncio 的简单应用
3.1 helloworld
# helloworld-asyncio
import asyncio
import time
async def get_html(url):
print("start get url")
await asyncio.sleep(2) #
print("end get url")
if __name__ == "__main__":
start_time = time.time()
loop = asyncio.get_event_loop()
loop.run_until_complete(get_html("xxx"))
print(time.time()-start_time)
首先,考虑一个基本的协程get_html
,它负责从一个URL获取HTML内容。为了模拟实际的网络请求过程,我们需要在其中添加一个延迟操作,如休眠两秒。虽然协程提供了强大的功能,但如果没有事件循环的支持,它本身是无法运行的。asyncio
为我们提供了这样的事件循环。使用asyncio.get_event_loop()
,我们可以轻松地获取默认的事件循环。在上面的代码中,run_until_complete
方法是阻塞的,确保了协程在退出前会完全执行。
但注意,传统的同步time.sleep()
方法在这里是不可用的,因为它会阻塞整个事件循环。而我们在协程中要使用的是asyncio
库提供的异步sleep方法。此处的不可用不是指报错,而是会使操作顺序化,完全消除asyncio
带来的并发优势。
# 模拟并发场景
import asyncio
import time
async def get_html(url):
print("start get url")
await asyncio.sleep(2)
print("end get url")
if __name__ == "__main__":
start_time = time.time()
loop = asyncio.get_event_loop()
tasks = [get_html("xxx") for i in range(10)] # 生成协程对象列表
loop.run_until_complete(asyncio.wait(tasks))
print(time.time()-start_time)
我们希望并发地获取10次数据。一种简单的方法是,使用一个循环生成10个协程对象,并将它们添加到事件循环中。`get_html`是一个异步函数。为了驱动这10个协程,我们可以使用`asyncio.wait()`方法,它接受一个协程对象的可迭代集合。
当运行此代码时,你会看到10次"start get url"几乎同时打印,然后约2秒后,10次"end get url"也几乎同时打印。总的执行时间应该接近2秒,这正是并发的威力。
# 模拟并发场景
import asyncio
import time
async def get_html(url):
print("start get url")
time.sleep(2) # 使用阻塞的time
print("end get url")
if __name__ == "__main__":
start_time = time.time()
loop = asyncio.get_event_loop()
tasks = [get_html("xxx") for i in range(10)] # 生成协程对象列表
loop.run_until_complete(asyncio.wait(tasks))
print(time.time()-start_time)
而将`get_html` 函数中将 `await asyncio.sleep(2)` 替换为 `time.sleep(2)`,程序的总执行时间会从约2秒增加到20秒。因为`time.sleep(2)`是一个同步操作,它会阻塞当前的线程2秒。在`asyncio`的事件循环中,当有一个协程执行到这个操作时,整个事件循环会被阻塞2秒。相对的,在`asyncio.sleep(2)`方法会立即返回一个`future`对象,不会阻塞事件循环。这也解释了在`tornado`框架中使用`pymysql`等数据库驱动无法达到并发的效果的原因,在协程编码使用数据库驱动或网络驱动的时候,一定要使用异步库去完成我们的功能。
3.2 获取协程返回值
方法1
# 获取协程的返回值-task
import asyncio
import time
async def get_html(url):
print("start get url")
await asyncio.sleep(2) #
return "print result"
if __name__ == "__main__":
start_time = time.time()
loop = asyncio.get_event_loop()
task = loop.create_task(get_html("xxx"))
loop.run_until_complete(task)
print(task.result())
方法2 future方法,与线程库future的接口几乎一致。
# 获取协程的返回值-future
import asyncio
import time
async def get_html(url):
print("start get url")
await asyncio.sleep(2) #
return "end get url"
if __name__ == "__main__":
start_time = time.time()
loop = asyncio.get_event_loop()
get_future = asyncio.ensure_future(get_html("xxx"))
loop.run_until_complete(get_future)
print(get_future.result())
这里有一个问题 loop是如何传递到future中的。
# asyncio/task.py
if loop is None:
loop = events.get_event_loop()
task = loop.create_task(coro_or_future)
在`asyncio/task.py`文件中阅读`ensure_future()`方法源码可以发现 当没有事件循环时,会调用`get_event_loop()`方法获取事件循环,而一个线程中只有唯一的一个事件循环,所以两代码为同一个loop。然后`ensure_future()`方法会调用`create_task()`方法,将传递进来的携程包装成task。
3.3 wait与gather
# 使用asyncio
import asyncio
import time
async def get_html(url):
print("start get url")
await asyncio.sleep(2)
print("end get url")
if __name__ == "__main__":
start_time = time.time()
loop = asyncio.get_event_loop()
tasks = [get_html("xxx") for i in range(10)]
# wait
# loop.run_until_complete(asyncio.wait(tasks))
# gather
loop.run_until_complete(asyncio.gather(*tasks))
print(time.time()-start_time)
其中 wait()是一个协程,接收一组`Future`对象或协程,并在满足等待条件后返回。
@coroutine
def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
等待条件有以下3种,默认为ALL_COMPLETED
FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
gather是wait更高一层的功能抽象,它还可以用于分组和分组取消等操作。
#wait 和 gather
import asyncio
import time
async def get_html(url):
print("start get url")
await asyncio.sleep(2)
print("end get url")
if __name__ == "__main__":
start_time = time.time()
loop = asyncio.get_event_loop()
# gather和wait的区别
# gather更加high-level
group1 = [get_html("xxx") for i in range(2)]
group2 = [get_html("xxxx") for i in range(2)]
group1 = asyncio.gather(*group1)
group2 = asyncio.gather(*group2)
# 取消分组
# group2.cancel()
loop.run_until_complete(asyncio.gather(group1, group2))
print(time.time() - start_time)