背景
最近有一个定时任务的需求,而当前的系统环境是Flask + Celery + Redis
,而celery刚好是可以配置定时任务的,于是参考官网研究了一下。
主要实现思路如下:
- 程序中定义任务
celery
配置celerybeat_schedule
- 启动
celery beat
来执行定时任务
官网描述的比较简单,在demo开发测试也遇到一些问题,记录如下。
定时任务配置
celery
定时任务的配置如下:
1 | from celery.schedules import crontab |
主要有两个:
- CELERYBEAT_SCHEDULE:这里列出了每个定时任务的具体内容。
task
:任务的函数schedule
:定时配置,可以通过crontab
方式。args
:传给任务的参数。
- CELERY_TIMEZONE:时间的时区。支持
pytz
中的所有时区,中国为:Asia/Shanghai
。
单文件模式的celery定时任务
单文件模式的flask+celery
由于不存在相互引用,所以比较简单。
创建任务:
1
2
3
def scheduled_task(*args, **kwargs):
print(time.strftime('%Y.%m.%d %H:%M:%S', time.localtime(time.time())))更新配置:
1
2
3
4
5
6
7
8
9
10
11
12app.config.update(
CELERY_BROKER_URL = "redis://localhost:6379/0",
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0',
CELERYBEAT_SCHEDULE = {
'add-every-minute': {
'task': 'app.scheduled_task',
'schedule': crontab(minute='*'),
'args': (16, 16)
}
},
CELERY_TIMEZONE='Asia/Shanghai'
)启动
celery
,并开启beat
进行定时任务1
celery worker -A app.celery -B --loglevel=info
工厂模式的celery定时任务
项目架构如下:
1 | ├── app |
其中celery
的配置在放在celery_app
文件夹中的__init__.py
中进行。
创建任务:
在
flask
的scheduled
模块中定义定时任务:1
2
3
4
5
6
7
8
9# -*- coding: utf-8 -*-
# app/scheduled/tasks.py
from celery_app import celery
import time
def scheduled_task(*args, **kwargs):
print(time.strftime('%Y.%m.%d %H:%M:%S', time.localtime(time.time())))更新配置:
在celery的配置中更新,这里不仅需要配置定时任务部分,还必须在
celery
初始化时把任务对应的文件加入到include
中。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22# -*- coding: utf-8 -*-
# /celery_app/__init__.py
from celery import Celery
from celery.schedules import crontab
celery = Celery('celery_app',
broker='redis://localhost:6379/1',
backend='redis://localhost:6379/1',
include=['celery_app.tasks','app.main.tasks', 'app.scheduled.tasks'],
)
celery.conf.update(
CELERY_BROKER_URL = 'redis://localhost:6379/1',
CELERY_RESULT_BACKEND = 'redis://localhost:6379/1',
CELERYBEAT_SCHEDULE = {
'add-every-minute': {
'task': 'app.scheduled.tasks.scheduled_task',
'schedule': crontab(minute='*'),
'args': (16, 16)
}
},
CELERY_TIMEZONE = 'Asia/Shanghai'
)启动
celery
,并开启beat
进行定时任务1
celery worker -A manager.celery -B --loglevel=info
坑与建议
在配置
CELERYBEAT_SCHEDULE
时,schedule
参数建议使用crontab
的方式配置,但需要注意的是这里的crontab
是从celery.schedules
中引用的,而不是直接import contab
,否则会有如下报错:1
2
3
4raceback (most recent call last):
File "app.py", line 19, in <module>
'schedule': crontab(minute='*'),
TypeError: 'module' object is not callable在工厂模式中,
celery
配置部分,不仅在include
中需要加入任务的文件,在CELERYBEAT_SCHEDULE
的task
里,也需要指定具体的任务函数名而不是函数所在的文件名,否则会提示:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16[2019-05-19 12:24:00,380: ERROR/MainProcess] Received unregistered task of type u'app.scheduled.tasks'.
The message has been ignored and discarded.
Did you remember to import the module containing this task?
Or maybe you're using relative imports?
Please see
http://docs.celeryq.org/en/latest/internals/protocol.html
for more information.
The full contents of the message body was:
'[[16, 16], {}, {"chord": null, "callbacks": null, "errbacks": null, "chain": null}]' (83b)
Traceback (most recent call last):
File "/home/coding/.local/lib/python2.7/site-packages/celery/worker/consumer/consumer.py", line 558, in on_task_received
strategy = strategies[type_]
KeyError: u'app.scheduled.tasks'启动
celery
时,如果程序中指定了borker
(比如文中的redis
),那么需要使用-B
参数直接启用celery beat
。在官网的例子中,启动执行定时任务可以在启动celery之后
1
celery worker -A app.celery --loglevel=info
在启动
beat
进行定时任务:1
celery beat
但是,这种方式启动后我们发现,
1
2
3
4
5
6
7
8
9
10celery beat v4.2.1 (windowlicker) is starting.
__ - ... __ - _
LocalTime -> 2019-05-19 15:41:25
Configuration ->
. broker -> amqp://guest:**@localhost:5672//
. loader -> celery.loaders.default.Loader
. scheduler -> celery.beat.PersistentScheduler
. db -> celerybeat-schedule
. logfile -> [stderr]@%WARNING
. maxinterval -> 5.00 minutes (300s)broker变成了默认配置,而非我们在程序中配置的redis。所以导致看起来完全没有报错,但实际无法启动定时任务。
所以建议还是通过增加
-B
参数,一次性启动celery
和celery beat
。