celery之后台定时任务

背景

最近有一个定时任务的需求,而当前的系统环境是Flask + Celery + Redis,而celery刚好是可以配置定时任务的,于是参考官网研究了一下。

主要实现思路如下:

  • 程序中定义任务
  • celery配置celerybeat_schedule
  • 启动celery beat来执行定时任务

官网描述的比较简单,在demo开发测试也遇到一些问题,记录如下。

定时任务配置

celery定时任务的配置如下:

1
2
3
4
5
6
7
8
9
10
from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
'add-every-minute': {
'task': 'app.scheduled_task',
'schedule': crontab(minute='*'),
'args': (16, 16)
}
},
CELERY_TIMEZONE='Asia/Shanghai'

主要有两个:

  • CELERYBEAT_SCHEDULE:这里列出了每个定时任务的具体内容。
    • task:任务的函数
    • schedule:定时配置,可以通过crontab方式。
    • args:传给任务的参数。
  • CELERY_TIMEZONE:时间的时区。支持pytz中的所有时区,中国为:Asia/Shanghai

单文件模式的celery定时任务

单文件模式的flask+celery由于不存在相互引用,所以比较简单。

  • 创建任务:

    1
    2
    3
    @celery.task
    def scheduled_task(*args, **kwargs):
    print(time.strftime('%Y.%m.%d %H:%M:%S', time.localtime(time.time())))
  • 更新配置:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    app.config.update(
    CELERY_BROKER_URL = "redis://localhost:6379/0",
    CELERY_RESULT_BACKEND = 'redis://localhost:6379/0',
    CELERYBEAT_SCHEDULE = {
    'add-every-minute': {
    'task': 'app.scheduled_task',
    'schedule': crontab(minute='*'),
    'args': (16, 16)
    }
    },
    CELERY_TIMEZONE='Asia/Shanghai'
    )
  • 启动celery,并开启beat进行定时任务

    1
    celery worker -A app.celery -B --loglevel=info

工厂模式的celery定时任务

项目架构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
├── app
│ ├── __init__.py
│ ├── main
│ │ ├── __init__.py
│ │ ├── tasks.py
│ │ ├── views.py
│ └── scheduled
│ ├── __init__.py
│ ├── tasks.py
├── celery_app
│ ├── __init__.py
│ ├── tasks.py
├── config.py
├── manager.py
└── README.md

其中celery的配置在放在celery_app文件夹中的__init__.py中进行。

  • 创建任务:

    flaskscheduled模块中定义定时任务:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    # -*- coding: utf-8 -*-
    # app/scheduled/tasks.py
    from celery_app import celery
    import time


    @celery.task
    def scheduled_task(*args, **kwargs):
    print(time.strftime('%Y.%m.%d %H:%M:%S', time.localtime(time.time())))
  • 更新配置:

    在celery的配置中更新,这里不仅需要配置定时任务部分,还必须在celery初始化时把任务对应的文件加入到include中。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    # -*- coding: utf-8 -*-
    # /celery_app/__init__.py
    from celery import Celery
    from celery.schedules import crontab

    celery = Celery('celery_app',
    broker='redis://localhost:6379/1',
    backend='redis://localhost:6379/1',
    include=['celery_app.tasks','app.main.tasks', 'app.scheduled.tasks'],
    )
    celery.conf.update(
    CELERY_BROKER_URL = 'redis://localhost:6379/1',
    CELERY_RESULT_BACKEND = 'redis://localhost:6379/1',
    CELERYBEAT_SCHEDULE = {
    'add-every-minute': {
    'task': 'app.scheduled.tasks.scheduled_task',
    'schedule': crontab(minute='*'),
    'args': (16, 16)
    }
    },
    CELERY_TIMEZONE = 'Asia/Shanghai'
    )
  • 启动celery,并开启beat进行定时任务

    1
    celery worker -A manager.celery -B --loglevel=info

坑与建议

  • 在配置CELERYBEAT_SCHEDULE时,schedule参数建议使用crontab的方式配置,但需要注意的是这里的crontab是从celery.schedules中引用的,而不是直接import contab,否则会有如下报错:

    1
    2
    3
    4
    raceback (most recent call last):
    File "app.py", line 19, in <module>
    'schedule': crontab(minute='*'),
    TypeError: 'module' object is not callable
  • 在工厂模式中,celery配置部分,不仅在include中需要加入任务的文件,在CELERYBEAT_SCHEDULEtask里,也需要指定具体的任务函数名而不是函数所在的文件名,否则会提示:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    [2019-05-19 12:24:00,380: ERROR/MainProcess] Received unregistered task of type u'app.scheduled.tasks'.
    The message has been ignored and discarded.

    Did you remember to import the module containing this task?
    Or maybe you're using relative imports?

    Please see
    http://docs.celeryq.org/en/latest/internals/protocol.html
    for more information.

    The full contents of the message body was:
    '[[16, 16], {}, {"chord": null, "callbacks": null, "errbacks": null, "chain": null}]' (83b)
    Traceback (most recent call last):
    File "/home/coding/.local/lib/python2.7/site-packages/celery/worker/consumer/consumer.py", line 558, in on_task_received
    strategy = strategies[type_]
    KeyError: u'app.scheduled.tasks'
  • 启动celery时,如果程序中指定了borker(比如文中的redis),那么需要使用-B参数直接启用celery beat

    在官网的例子中,启动执行定时任务可以在启动celery之后

    1
    celery worker -A app.celery --loglevel=info

    在启动beat进行定时任务:

    1
    celery beat

    但是,这种方式启动后我们发现,

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    celery beat v4.2.1 (windowlicker) is starting.
    __ - ... __ - _
    LocalTime -> 2019-05-19 15:41:25
    Configuration ->
    . broker -> amqp://guest:**@localhost:5672//
    . loader -> celery.loaders.default.Loader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> celerybeat-schedule
    . logfile -> [stderr]@%WARNING
    . maxinterval -> 5.00 minutes (300s)

    broker变成了默认配置,而非我们在程序中配置的redis。所以导致看起来完全没有报错,但实际无法启动定时任务。

    所以建议还是通过增加-B参数,一次性启动celerycelery beat