前言
如果下面的函数库无法执行,出现类似:(前提是python3.7以上)
AttributeError: module ‘asyncio‘ has no attribute ‘run‘
请检查run是否可跳转,如果无法跳转,尝试安装asyncio版本号为最新:pip install asuncio==3.4.3
1. 基本概念
aioschedule 是一个基于 asyncio 的 Python 库,用于在异步应用程序中进行任务调度。
它提供了一种方便的方式来安排和执行异步任务,类似于传统的 schedule 库,但适用于异步编程。
先科普下schedule
aioschedule 和 schedule 都是用于任务调度的 Python 库,但它们在异步和同步
编程环境 | 执行方式 | 依赖环境 |
---|---|---|
schedule 适用于同步编程环境 | schedule 使用阻塞式的方式执行任务 | schedule 不依赖 asyncio 库 |
aioschedule 适用于异步编程环境 | aioschedule 使用非阻塞的异步方式执行任务 | aioschedule 基于 asyncio |
两者的代码相似:
import schedule
import time
def job():
print("Job executed!")
# 注册每隔5秒执行一次的任务
schedule.every(5).seconds.do(job)
while True:
schedule.run_pending()
time.sleep(1)
另一个:
import asyncio
import aioschedule
async def job():
print("Job executed!")
# 注册每隔5秒执行一次的任务
aioschedule.every(5).seconds.do(job)
async def main():
while True:
await aioschedule.run_pending()
await asyncio.sleep(1)
asyncio.run(main())
2. 基本API
aioschedule 提供了一些常用的 API 来实现任务调度。
常用的API如下:
every(interval)
: 用于指定任务执行的时间间隔。
aioschedule.every(5).seconds.do(job)
do(job_function, *args, **kwargs)
: 用于注册要执行的任务函数,可以传递参数给任务函数。
aioschedule.every(10).minutes.do(job, arg1, kwarg1='value')
to(target)
: 用于指定任务执行的终止时间,即任务不再执行的时间点。
aioschedule.every().day.at("14:30").do(job).to("15:00")
tag(tag)
: 为任务添加标签,可以通过标签取消任务。
aioschedule.every().hour.do(job).tag('hourly')
cancel(tag)
: 取消具有指定标签的所有任务。
aioschedule.cancel('hourly')
run_pending()
: 执行所有待处理的任务。
await aioschedule.run_pending()
clear(tag=None)
: 清除所有任务或特定标签的任务。
aioschedule.clear()
# 或
aioschedule.clear('daily')
3. Demo
示例代码如下:
import asyncio
import aioschedule
async def job():
print("Job executed!")
# 注册任务,每隔5秒执行一次
aioschedule.every(1).seconds.do(job)
async def main():
# 异步等待,保持主程序运行
while True:
await aioschedule.run_pending()
await asyncio.sleep(1)
# 运行主程序
asyncio.run(main())
截图如下:
另一个Demo:
import asyncio
import aioschedule
async def job(name, count):
print(f"Job {name} executed {count} times!")
# 注册每隔3秒执行一次的任务,并传递参数
aioschedule.every(3).seconds.do(job, 'TaskA', count=1)
# 注册每隔5秒执行一次的任务,并传递参数
aioschedule.every(5).seconds.do(job, 'TaskB', count=1)
# 注册每小时执行一次的任务,并传递参数
aioschedule.every().hour.do(job, 'TaskC', count=1)
async def main():
# 异步等待,保持主程序运行
for i in range(10):
# 执行所有待处理的任务
await aioschedule.run_pending()
# 异步等待1秒,防止事件循环阻塞
await asyncio.sleep(1)
# 运行主程序
asyncio.run(main())
截图如下:
实战中的Demo也大同小异: