安排 Python 脚本每小时准确运行一次
- 2025-03-05 09:14:00
- admin 原创
- 83
问题描述:
在我询问之前,Cron Jobs 和 Task Scheduler将是我的最后选择,这个脚本将在 Windows 和 Linux 上使用,我更愿意使用编码方法来完成这项工作,而不是留给最终用户去完成。
是否有可用于安排任务的 Python 库?我需要每小时运行一次函数,但是,随着时间的推移,如果我每小时运行一次脚本并使用 .sleep,由于执行/运行脚本和/或函数固有的延迟,“每小时一次”将在与前一天不同的时间运行。
在不使用 Cron Job 或使用任务计划程序进行安排的情况下,安排函数在一天中的特定时间运行(多次)的最佳方法是什么?
或者如果这不可能的话,我也希望得到您的意见。
AP Scheduler 完全满足我的需要。
版本低于 3.0
import datetime
import time
from apscheduler.scheduler import Scheduler
# Start the scheduler
sched = Scheduler()
sched.daemonic = False
sched.start()
def job_function():
print("Hello World")
print(datetime.datetime.now())
time.sleep(20)
# Schedules job_function to be run once each minute
sched.add_cron_job(job_function, minute='0-59')
出去:
>Hello World
>2014-03-28 09:44:00.016.492
>Hello World
>2014-03-28 09:45:00.0.14110
版本 > 3.0
(摘自以下 Animesh Pandey 的回答)
from apscheduler.schedulers.blocking import BlockingScheduler
sched = BlockingScheduler()
@sched.scheduled_job('interval', seconds=10)
def timed_job():
print('This job is run every 10 seconds.')
@sched.scheduled_job('cron', day_of_week='mon-fri', hour=10)
def scheduled_job():
print('This job is run every weekday at 10am.')
sched.configure(options_from_ini_file)
sched.start()
解决方案 1:
也许这可以帮助:Advanced Python Scheduler
这是他们文档中的一小段代码:
from apscheduler.schedulers.blocking import BlockingScheduler
def some_job():
print "Decorated job"
scheduler = BlockingScheduler()
scheduler.add_job(some_job, 'interval', hours=1)
scheduler.start()
解决方案 2:
每过一小时 10 分钟运行某项任务。
from datetime import datetime, timedelta
while 1:
print 'Run something..'
dt = datetime.now() + timedelta(hours=1)
dt = dt.replace(minute=10)
while datetime.now() < dt:
time.sleep(1)
解决方案 3:
对于apscheduler
<3.0,请参见Unknown 的答案。
对于apscheduler
> 3.0
from apscheduler.schedulers.blocking import BlockingScheduler
sched = BlockingScheduler()
@sched.scheduled_job('interval', seconds=10)
def timed_job():
print('This job is run every 10 seconds.')
@sched.scheduled_job('cron', day_of_week='mon-fri', hour=10)
def scheduled_job():
print('This job is run every weekday at 10am.')
sched.configure(options_from_ini_file)
sched.start()
更新:
apscheduler
文 档。
这apscheduler-3.3.1
为Python 3.6.2
。
"""
Following configurations are set for the scheduler:
- a MongoDBJobStore named “mongo”
- an SQLAlchemyJobStore named “default” (using SQLite)
- a ThreadPoolExecutor named “default”, with a worker count of 20
- a ProcessPoolExecutor named “processpool”, with a worker count of 5
- UTC as the scheduler’s timezone
- coalescing turned off for new jobs by default
- a default maximum instance limit of 3 for new jobs
"""
from pytz import utc
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor
"""
Method 1:
"""
jobstores = {
'mongo': {'type': 'mongodb'},
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
'default': {'type': 'threadpool', 'max_workers': 20},
'processpool': ProcessPoolExecutor(max_workers=5)
}
job_defaults = {
'coalesce': False,
'max_instances': 3
}
"""
Method 2 (ini format):
"""
gconfig = {
'apscheduler.jobstores.mongo': {
'type': 'mongodb'
},
'apscheduler.jobstores.default': {
'type': 'sqlalchemy',
'url': 'sqlite:///jobs.sqlite'
},
'apscheduler.executors.default': {
'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
'max_workers': '20'
},
'apscheduler.executors.processpool': {
'type': 'processpool',
'max_workers': '5'
},
'apscheduler.job_defaults.coalesce': 'false',
'apscheduler.job_defaults.max_instances': '3',
'apscheduler.timezone': 'UTC',
}
sched_method1 = BlockingScheduler() # uses overrides from Method1
sched_method2 = BlockingScheduler() # uses same overrides from Method2 but in an ini format
@sched_method1.scheduled_job('interval', seconds=10)
def timed_job():
print('This job is run every 10 seconds.')
@sched_method2.scheduled_job('cron', day_of_week='mon-fri', hour=10)
def scheduled_job():
print('This job is run every weekday at 10am.')
sched_method1.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
sched_method1.start()
sched_method2.configure(gconfig=gconfig)
sched_method2.start()
解决方案 4:
我能建议的最简单的选项是使用时间表库。
在您的问题中,您说“我需要每小时运行一次函数”,执行此操作的代码非常简单:
import schedule
def thing_you_wanna_do():
...
...
return
schedule.every().hour.do(thing_you_wanna_do)
while True:
schedule.run_pending()
您还询问如何在一天中的某个时间做某事,以下是一些如何执行此操作的示例:
import schedule
def thing_you_wanna_do():
...
...
return
schedule.every().day.at("10:30").do(thing_you_wanna_do)
schedule.every().monday.do(thing_you_wanna_do)
schedule.every().wednesday.at("13:15").do(thing_you_wanna_do)
# If you would like some randomness / variation you could also do something like this
schedule.every(1).to(2).hours.do(thing_you_wanna_do)
while True:
schedule.run_pending()
所用代码的 90% 是 计划库的示例代码。祝您计划顺利!
解决方案 5:
每 15 分钟运行一次脚本。例如,您想要接收 15 分钟的股票价格报价,该报价每 15 分钟更新一次。
while True:
print("Update data:", datetime.now())
sleep = 15 - datetime.now().minute % 15
if sleep == 15:
run_strategy()
time.sleep(sleep * 60)
else:
time.sleep(sleep * 60)
解决方案 6:
Python 标准库确实为该任务提供了调度和线程。但这意味着您的调度程序脚本将一直运行,而不是将其执行留给操作系统,这可能是也可能不是您想要的。
解决方案 7:
#For scheduling task execution
import schedule
import time
def job():
print("I'm working...")
schedule.every(1).minutes.do(job)
#schedule.every().hour.do(job)
#schedule.every().day.at("10:30").do(job)
#schedule.every(5).to(10).minutes.do(job)
#schedule.every().monday.do(job)
#schedule.every().wednesday.at("13:15").do(job)
#schedule.every().minute.at(":17").do(job)
while True:
schedule.run_pending()
time.sleep(1)
解决方案 8:
也许Rocketry可以满足您的需求。它是一款功能强大的调度程序,使用起来非常简单,具有许多内置调度选项,并且易于扩展:
from rocketry import Rocketry
from rocketry.conds import daily, every, after_success
app = Rocketry()
@app.task(every("1 hour 30 minutes"))
def do_things():
...
@app.task(daily.between("12:00", "17:00"))
def do_daily_afternoon():
...
@app.task(daily & after_success(do_things))
def do_daily_after_task():
...
if __name__ == "__main__":
app.run()
但其内容远不止这些:
基于字符串的调度语法
逻辑语句(AND、OR、NOT)
大量内置调度选项
易于定制(自定义条件、参数等)
并行化(在单独的线程或进程上运行)
参数化(执行顺序和输入输出)
持久性:将日志放在你喜欢的任何地方
在运行时修改调度程序(即在其上构建 API)
链接:
免责声明:我是作者
解决方案 9:
sunshinekitty 发布的版本名为“Version < 3.0”,您可能需要指定 apscheduler 2.1.2。我安装的 2.7 版本中意外安装了版本 3,因此我执行了以下操作:
pip uninstall apscheduler
pip install apscheduler==2.1.2
之后一切正常。希望对您有所帮助。
解决方案 10:
时钟.py
from apscheduler.schedulers.blocking import BlockingScheduler
import pytz
sched = BlockingScheduler(timezone=pytz.timezone('Africa/Lagos'))
@sched.scheduled_job('cron', day_of_week='mon-sun', hour=22)
def scheduled_job():
print('This job is run every week at 10pm.')
#your job here
sched.start()
进程文件
clock: python clock.py
要求.txt
APScheduler==3.0.0
部署后,最后一步是扩展时钟进程。这是一个单例进程,这意味着您永远不需要扩展超过 1 个这样的进程。如果您运行两个,工作就会重复。
$ heroku ps:scale clock=1
来源:https ://devcenter.heroku.com/articles/clock-processes-python
解决方案 11:
可能你已经找到了解决方案@lukik,但如果你想删除一个计划,你应该使用:
job = scheduler.add_job(myfunc, 'interval', minutes=2)
job.remove()
或者
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.remove_job('my_job_id')
如果你需要使用明确的作业 ID
有关更多信息,请查看:https://apscheduler.readthedocs.io/en/stable/userguide.html#removing-jobs
解决方案 12:
我发现调度程序需要每秒运行一次程序。如果使用在线服务器,成本会很高。所以我有以下内容:
它每分钟运行一次,你可以重新计算等待时间(以秒为单位),将其更改为几小时或几天
import time
import datetime
Initiating = True
print(datetime.datetime.now())
while True:
if Initiating == True:
print("Initiate")
print( datetime.datetime.now())
time.sleep(60 - time.time() % 60+5)
Initiating = False
else:
time.sleep(60)
print("working")
print(datetime.datetime.now())
解决方案 13:
这种方法对我有用,我使用了relativedelta和datetime,并每小时进行一次模数布尔检查。从您启动它时起,它每小时运行一次。
import time
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
#Track next run outside loop and update the next run time within the loop
nxt_run=datetime.now()
#because while loops evaluate at microseconds we basically need to use a boolean evaluation to track when it should run next
while True:
cnow = datetime.now() #track the current time
time.sleep(1) #good to have so cpu doesn't spike
if (cnow.hour % 1 == 0 and cnow >= nxt_run):
print(f"start @{cnow}: next run @{nxt_run}")
nxt_run=cnow+relativedelta(hours=1) #add an hour to the next run
else:
print(f"next run @{nxt_run}")
解决方案 14:
一种选择是编写一个定期执行 Python 脚本的 C/C++ 包装器。您的最终用户将运行 C/C++ 可执行文件,该文件将在后台运行,并定期执行 Python 脚本。这可能不是最好的解决方案,如果您不了解 C/C++ 或想保持 100% Python 版本,则可能行不通。但这似乎是最用户友好的方法,因为人们习惯于单击可执行文件。所有这些都假设 Python 已安装在最终用户的计算机上。
另一个选择是使用 cron job/任务计划程序,但将其作为脚本放入安装程序中,这样最终用户就不必这样做。
扫码咨询,免费领取项目管理大礼包!