1 利用future.loop取消事件循环
run_forever
不会停止,那么loop.run_until_complete
是怎样停止的。
asyncio/base_envents.py
def run_until_complete(self, future):
"""运行直到Future完成。
如果参数是一个协程,它会被包装成一个Task。
警告:不要多次调用run_until_complete(),否则会包装成不同的Task,这可能会导致问题。
返回Future的结果,或者引发它的异常。
"""
self._check_closed() # 检查事件循环是否已关闭
new_task = not futures.isfuture(future) # 检查future是否是一个新任务
future = tasks.ensure_future(future, loop=self) # 确保将future包装成一个任务
if new_task:
# 如果future是一个新任务,那么不需要记录“销毁挂起任务”的消息
future._log_destroy_pending = False
future.add_done_callback(_run_until_complete_cb) # 添加一个回调函数,用于在Future完成时执行
try:
self.run_forever() # 运行事件循环直到完成
except:
if new_task and future.done() and not future.cancelled():
# 如果协程引发了BaseException,消耗异常以避免记录警告,调用者无法访问本地任务。
future.exception()
raise
finally:
future.remove_done_callback(_run_until_complete_cb) # 移除回调函数
if not future.done():
raise RuntimeError('事件循环在Future完成之前停止。')
return future.result() # 返回Future的结果
其中在future结束后回调_run_until_complete_cb
def _run_until_complete_cb(fut):
exc = fut._exception # 获取Future对象的异常信息
if (isinstance(exc, BaseException)
and not isinstance(exc, Exception)):
# 如果异常是BaseException的子类但不是Exception的子类,
# 这通常表示一个不应该停止事件循环的情况(例如,SystemExit)。
# 在这种情况下,不需要停止事件循环。
return
fut._loop.stop() # 停止事件循环
可以看到run_until_complete
的主要逻辑也是run_forever
,通过对Future添加回调函数,实现在执行完成后停止的效果。实际上loop是future的一个属性,这样的话在任何一个future或者task当中, loop都可以被停止掉。(同时,future也注册到loop中形成了一个环状,容易引起循环引用)
2 取消future/task
import asyncio
# 定义一个异步函数 get_html,用于模拟下载网页
async def get_html(sleep_times):
print("waiting")
await asyncio.sleep(sleep_times)
print("done after {}s".format(sleep_times))
if __name__ == "__main__":
# 创建三个不同的协程任务,分别休眠 2 秒、3 秒和 3 秒
task1 = get_html(2)
task2 = get_html(3)
task3 = get_html(3)
# 将任务添加到任务列表中
tasks = [task1, task2, task3]
# 获取事件循环对象
loop = asyncio.get_event_loop()
try:
# 启动协程任务的执行,等待所有任务完成
loop.run_until_complete(asyncio.wait(tasks))
except KeyboardInterrupt as e:
# 处理用户中断程序的情况
all_tasks = asyncio.Task.all_tasks()
for task in all_tasks:
# 尝试取消每个任务
print("cancel task")
print(task.cancel())
# 停止事件循环
loop.stop()
# 保持事件循环处于运行状态,等待任务完成,没有这一句会抛异常
loop.run_forever()
finally:
# 关闭事件循环
loop.close()
这段代码演示了如果用户中断程序,如何取消任务并在取消后关闭事件循环。这里使用 asyncio.Task.all_tasks
方法获得了全部的task,那么这是如何实现的。
asyncio\tasks.py
@classmethod
def all_tasks(cls, loop=None):
"""
返回一个事件循环中的所有任务的集合。
默认情况下,返回当前事件循环中的所有任务。
"""
if loop is None:
loop = events.get_event_loop()
return {t for t in cls._all_tasks if t._loop is loop}
3 携程嵌套
以python 官方文档的例子描述嵌套协程的执行流程
import asyncio
async def compute(x, y):
print("Compute {} + {}...".format(x, y))
await asyncio.sleep(2.0)
return x + y
async def print_sum(x, y):
result = await compute(x, y)
print("{} + {} = {}".format(x, y, result))
loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()
对应的时序图如下:
这里有两个注意的点:
- Task和compute之间建立的一个通道,跳过print_sum(委托方)直接从 compute返回给Task;
- 通过StopIteration获得rerurn的值。