方法一:使用 interval
参数 (每小时执行一次)
from apscheduler.schedulers.background import BackgroundScheduler
def cron_OvertimeData():
print('123')
if __name__ == '__main__':
scheduler = BackgroundScheduler()
scheduler.add_job(cron_OvertimeData, 'interval', hours=1) # 每小时执行一次
scheduler.start()
app.run(host='0.0.0.0', port=8080, debug=True)
方法二:使用 cron
参数,更精确控制 (每小时的0分执行)
from apscheduler.schedulers.background import BackgroundScheduler
def cron_OvertimeData():
print('123')
if __name__ == '__main__':
scheduler = BackgroundScheduler()
scheduler.add_job(cron_OvertimeData, 'cron', minute='0') # 每小时的0分执行
scheduler.start()
app.run(host='0.0.0.0', port=8080, debug=True)
方法三:结合 date
参数,设置起始时间和结束时间 (每小时执行一次,持续10小时)
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime, timedelta
def cron_OvertimeData():
print('123')
if __name__ == '__main__':
scheduler = BackgroundScheduler()
start_time = datetime.now()
end_time = start_time + timedelta(hours=10) #运行10小时
scheduler.add_job(cron_OvertimeData, 'interval', hours=1, start_date=start_time, end_date=end_time)
scheduler.start()
app.run(host='0.0.0.0', port=8080, debug=True)
重要提示: 这些代码片段都假设你已经安装了 APScheduler
库 (pip install apscheduler
),并且你的代码中已经存在 app
对象 (例如,Flask 应用)。 你需要根据你的实际应用环境进行调整。 特别是 app.run()
部分,需要根据你的Web框架进行修改。 如果你的应用不是基于Flask,你需要替换成你的框架的运行方法。 另外,确保你的程序能够持续运行,否则定时任务将无法正常执行。 在生产环境中,建议使用更健壮的进程管理工具来保证程序的稳定运行。
是的,APScheduler
提供了更丰富的功能,可以补充到之前的例子中,使定时任务更加强大和灵活。以下是一些补充用法:
1. 任务参数传递:
你可以向定时任务函数传递参数:
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime, timedelta
def my_task(param1, param2):
print(f"Task executed with parameters: {param1}, {param2}")
if __name__ == '__main__':
scheduler = BackgroundScheduler()
scheduler.add_job(my_task, 'interval', hours=1, args=['hello', 123])
scheduler.start()
# ... your app.run() ...
2. 任务ID和替换:
你可以为每个任务分配一个ID,方便管理和替换:
from apscheduler.schedulers.background import BackgroundScheduler
def my_task():
print("Task executed")
if __name__ == '__main__':
scheduler = BackgroundScheduler()
job = scheduler.add_job(my_task, 'interval', hours=1, id='my_unique_job_id')
# ... later, you can replace the job ...
scheduler.remove_job('my_unique_job_id')
new_job = scheduler.add_job(my_task, 'interval', hours=2, id='my_unique_job_id')
scheduler.start()
# ... your app.run() ...
3. 任务的暂停和恢复:
你可以暂停和恢复特定的任务:
from apscheduler.schedulers.background import BackgroundScheduler
def my_task():
print("Task executed")
if __name__ == '__main__':
scheduler = BackgroundScheduler()
job = scheduler.add_job(my_task, 'interval', hours=1, id='my_job')
# ... later, pause the job ...
job.pause()
# ... later, resume the job ...
job.resume()
scheduler.start()
# ... your app.run() ...
4. 错误处理:
使用 misfire_grace_time
参数设置任务错过执行时间的容忍时间,避免因为短暂的延迟而导致任务丢失:
from apscheduler.schedulers.background import BackgroundScheduler
def my_task():
print("Task executed")
if __name__ == '__main__':
scheduler = BackgroundScheduler()
scheduler.add_job(my_task, 'interval', hours=1, misfire_grace_time=60) # 允许延迟60秒
scheduler.start()
# ... your app.run() ...
5. 使用不同的调度器:
除了 BackgroundScheduler
,APScheduler
还提供了 BlockingScheduler
(阻塞调度器,适合简单的脚本) 和 AsyncIOScheduler
(异步调度器,适合异步环境)。 选择合适的调度器取决于你的应用架构。
6. 监听事件:
APScheduler
提供了事件监听机制,可以让你在任务执行前、执行后或出现错误时执行相应的操作:
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
def my_listener(event):
if event.exception:
print(f"Job {event.job_id} failed: {event.exception}")
else:
print(f"Job {event.job_id} executed successfully")
if __name__ == '__main__':
scheduler = BackgroundScheduler()
scheduler.add_job(my_task, 'interval', hours=1, id='my_job')
scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
scheduler.start()
# ... your app.run() ...
这些补充用法可以让你更精细地控制定时任务的执行,并处理各种异常情况。 选择哪些用法取决于你的具体需求。 记住查阅 APScheduler
的官方文档以获取更详细的信息和更多高级功能。
评论