如何在Flask中创建Celery实例?

MrChi
  • 489

我在flask程序中使用了工厂函数和蓝本,把Flask实例的创建放在了manage.py中(看过Flask Web开发这本书的同学应该懂我)。

现在我想要创建一个Celery的实例(在app程序包中),Celery的实例创建时需要Flask实例的程序上下文来读取配置并在Task的调用函数里增加程序上下文,代码如下(摘自Flask官方文档):

def make_celery(app):
    celery_app = Celery(__name__)
    celery_app.config_from_object(app.config)
    TaskBase = celery_app.Task
    class ContextTask(TaskBase):
        abstract = True
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)
    celery_app.Task = ContextTask
    return celery_app

现在的问题是:

  1. Flask实例在manage.py中创建,不在app程序包中,没法在app包中导入Flask实例(强行导入就没有了大型程序结构的意义了);

  2. 定义Celery Task的时候,也需要Celery的实例对象,所以也不能在manage.py中创建Celery实例;

现在这两个实例相互制约,好头疼,请问各位大神都是怎么做的?


网上看到一个方法,在app程序包创建celery实例的时候,直接调用Flask实例创建函数来创建了一个flask实例,但是这样的话,Flask运行时的程序实例还是创建Celery时的程序实例吗?程序上下文会不会不一致?

先谢为敬!

回复
阅读 5.7k
4 个回答
✓ 已被采纳

这个问题提了有两年了,最近在重构一个Flask的项目时,又捡了起来。尝试了各种方法后,自己实现了一个不太python的方案。

参照 Flask 注册 Blueprint 的方式,延迟创建 Celery app 实例,同时在创建 Celery app 实例时,将任务函数修饰为 Celery task,并把 Celery task 绑定到 Flask app上(方便调用)。

参考 https://github.com/chiqj/flas...

专门一个flask实例worker是最简单的,就是提问最后说的独立。代价就是上下文不同,忘记你的g变量,参数只能传递过去。都用一个实例,要摆脱循环导入太麻烦了。

最近也在折腾这个,最后我采用的方法是:
1、在app同级建立一个celery_app,配置和创建一个celery对象,用于给flask提供celery的导入(task和app的初始化)。
2、在app的构建函数__init__.py中参考官网提供一共make_celery的函数重写celery的task。
3、在最外层入口manager.py中,执行make_celery来给celery的task增加上下文。
4、最后启动celery的时候需要启动manager.py中的celery对象(已经重写了task的celery)。
5、运行成功。详细见https://github.com/keejo125/f...不知道有没有讲清楚,如果理解有偏差,也请指正。

虾仁
  • 1
新手上路,请多包涵

一、版本说明:
celery==4.4.2

二、创建celery app 代码如下:

说明: create_router 要在 make_celery 之后导入,因为只有在 make_celery 后才会有 celery.current_app,否则后续在注册 task 时会没有 current_app,注册无效,调用时会阻塞请求。
# manage.py
# -*- coding: utf-8 -*-
"""
@Time: 2020-02-04
@Author: kang.song
"""
from flask_script import Manager
import os
import sys
from celery import Celery

sys.path.append(os.path.dirname(sys.path[0]))

from app.app import create_app


def make_celery(app):
    celery_app = Celery('genbu_job', broker=app.config['CELERY_BROKER'], include=["tasks.ci_tasks"])
    celery_app.conf.timezone = 'Asia/Shanghai'
    celery_app.conf.task_routes = {
        'ci_task.*': {'queue': 'ci_queue'},
        'cd_task.*': {'queue': 'cd_queue'},
        'sync_task.*': {'queue': 'sync_queue'}
    }
    TaskBase = celery_app.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    celery_app.Task = ContextTask
    return celery_app


app = create_app('production')
# celery
celery_app = make_celery(app)
# 路由
from app.urls import create_router
create_router(app)

# 插入脚本
manager = Manager(app)

if __name__ == '__main__':
    manager.run()

二、调用方法如下:

# view.py
from app.api.application.ci_task import ci_task_func
from utils.common import authorization, check_args, TransactionResource, generate_auth_token
from utils.response import success_response, error_response


class Index(TransactionResource):
    def get(self):
        add_together.delay(2, 2)
        return success_response()

三、项目结构如下:
image.png

四、任务内容如下:

说明:使用current_app来注册task
import logging

from celery import current_app


logger = logging.getLogger(__name__)


@current_app.task(name="ci_task.add_together")
def add_together(a, b):
    logger.info(a + b)
    return a + b

五、启动命令如下:
celery -A manage:celery_app worker --loglevel=info -Q ci_queue
image.png

六、总结:
利用celery.current_app避免从项目中直接导入celery引发循环导包。

宣传栏