使用 APScheduler 来管理多个 Python 业务脚本

香蕉你个不呐呐
2024-11-19 / 0 评论 / 1 阅读 / 正在检测是否收录...

使用 APScheduler 来管理多个 Python 业务脚本的定时执行的详细示例和使用方法。 我们将使用一个简单的例子,然后逐步扩展到更复杂的情况。

1. 安装 APScheduler:

首先,你需要安装 APScheduler 库:

pip3 install apscheduler

2. 简单的例子 (单个任务):

这个例子展示如何每分钟运行一个简单的函数:

from apscheduler.schedulers.blocking import BlockingScheduler
import time

def my_job():
    print("I'm working...")
    time.sleep(5) # 模拟任务执行时间
    print("Job completed.")

scheduler = BlockingScheduler()
scheduler.add_job(my_job, 'interval', seconds=60) # 每分钟运行一次
scheduler.start()

运行这个脚本,你会看到每分钟都会打印 "I'm working..." 和 "Job completed."。 BlockingScheduler 会阻塞主线程直到你手动停止它 (例如,使用 Ctrl+C)。

3. 多个任务:

你可以添加多个任务到同一个调度器中:

from apscheduler.schedulers.blocking import BlockingScheduler
import time

def job1():
    print("Job 1 is running...")
    time.sleep(2)
    print("Job 1 completed.")

def job2():
    print("Job 2 is running...")
    time.sleep(3)
    print("Job 2 completed.")

scheduler = BlockingScheduler()
scheduler.add_job(job1, 'interval', seconds=10) # 每 10 秒运行一次
scheduler.add_job(job2, 'cron', day_of_week='mon-fri', hour=10, minute=30) # 工作日 10:30 运行

scheduler.start()

这个例子展示了如何使用不同的调度方式:interval (间隔时间) 和 cron (cron 表达式)。

4. 使用不同的调度方式:

APScheduler 支持多种调度方式:

  • interval: 以固定的时间间隔运行任务。 参数包括 seconds, minutes, hours, days, weeks
  • cron: 使用 cron 表达式来指定任务运行的时间。 这提供了非常灵活的调度能力。 例如,'cron', day_of_week='mon-fri', hour=8, minute=0 表示工作日早上 8:00 运行。
  • date: 在指定的时间点运行任务一次。
  • 其他: APScheduler 还支持其他调度方式,例如 trigger (自定义触发器)。

5. 错误处理:

在实际应用中,你的任务可能会失败。 你可以使用 try...except 块来处理异常:

from apscheduler.schedulers.blocking import BlockingScheduler
import time

def my_job():
    try:
        # 你的业务逻辑
        print("Job is running...")
        # ... 可能出错的代码 ...
        1 / 0  # 模拟一个错误
    except Exception as e:
        print(f"Job failed: {e}")

scheduler = BlockingScheduler()
scheduler.add_job(my_job, 'interval', seconds=10)
scheduler.start()

6. 从外部文件导入任务:

如果你的任务在不同的文件中,你可以从外部文件导入它们:

# main.py
from apscheduler.schedulers.blocking import BlockingScheduler
import my_jobs

scheduler = BlockingScheduler()
scheduler.add_job(my_jobs.job1, 'interval', seconds=10)
scheduler.add_job(my_jobs.job2, 'cron', day_of_week='mon-fri', hour=10, minute=30)
scheduler.start()


# my_jobs.py
def job1():
    print("Job 1 from external file")

def job2():
    print("Job 2 from external file")

7. 停止调度器:

你可以使用 scheduler.shutdown() 来优雅地停止调度器。

8. 更高级的用法:

APScheduler 还提供更高级的功能,例如:

  • 任务持久化: 将任务信息保存到数据库中,以便在调度器重启后恢复任务。
  • 作业存储: 使用不同的作业存储后端,例如数据库或文件系统。
  • 监听器: 添加监听器来监控任务的执行情况。

记住将这些例子中的占位符函数替换成你实际的业务脚本函数。 根据你的业务脚本的复杂性和需求,选择合适的调度方式和错误处理机制。 如果你的任务之间存在依赖关系,你可能需要考虑使用更高级的调度功能或其他工具。

0

评论

博主关闭了所有页面的评论