如何使用 Celery 和 Django 将任务路由到不同的队列

新手上路,请多包涵

我正在使用以下堆栈:

  • Python 3.6
  • Celery v4.2.1 (经纪人: RabbitMQ v3.6.0
  • 姜戈 v2.0.4

根据 Celery 的文档,在不同的队列上运行计划任务应该就像为 CELERY_ROUTES 上的任务定义相应的队列一样简单,尽管如此,所有任务似乎都在 Celery 的默认队列上执行。

这是 my_app/settings.py 上的配置:

 CELERY_BROKER_URL = "amqp://guest:guest@localhost:5672//"
CELERY_ROUTES = {
 'app1.tasks.*': {'queue': 'queue1'},
 'app2.tasks.*': {'queue': 'queue2'},
}
CELERY_BEAT_SCHEDULE = {
    'app1_test': {
        'task': 'app1.tasks.app1_test',
        'schedule': 15,
    },
    'app2_test': {
        'task': 'app2.tasks.app2_test',
        'schedule': 15,
    },

}

这些任务只是用于测试路由的简单脚本:

文件 app1/tasks.py

 from my_app.celery import app
import time

@app.task()
def app1_test():
    print('I am app1_test task!')
    time.sleep(10)

文件 app2/tasks.py

 from my_app.celery import app
import time

@app.task()
def app2_test():
    print('I am app2_test task!')
    time.sleep(10)

当我使用所有必需的队列运行 Celery 时:

 celery -A my_app worker -B -l info -Q celery,queue1,queue2

RabbitMQ 将显示只有默认队列“ celery ”正在运行任务:

 sudo rabbitmqctl list_queues
# Tasks executed by each queue:
#  - celery 2
#  - queue1 0
#  - queue2 0

有人知道如何解决这种意外行为吗?

问候,

原文由 Ander 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 962
2 个回答

我已经开始工作了,这里有几点需要注意:

根据 Celery 的 4.2.0 文档CELERY_ROUTES 应该是定义队列路由的变量,但它只适用于我使用 CELERY_TASK_ROUTES 代替。任务路由似乎独立于 Celery Beat,因此这仅适用于手动安排的任务:

 app1_test.delay()
app2_test.delay()

或者

app1_test.apply_async()
app2_test.apply_async()

为了让它与 Celery Beat 一起工作,我们只需要在 CELERY_BEAT_SCHEDULE 变量中明确定义队列。文件 my_app/settings.py 的最终设置如下:

 CELERY_BROKER_URL = "amqp://guest:guest@localhost:5672//"
CELERY_TASK_ROUTES = {
 'app1.tasks.*': {'queue': 'queue1'},
 'app2.tasks.*': {'queue': 'queue2'},
}
CELERY_BEAT_SCHEDULE = {
    'app1_test': {
        'task': 'app1.tasks.app1_test',
        'schedule': 15,
        'options': {'queue': 'queue1'}
    },
    'app2_test': {
        'task': 'app2.tasks.app2_test',
        'schedule': 15,
        'options': {'queue': 'queue2'}
    },

}

并运行 Celery 监听这两个队列:

 celery -A my_app worker -B -l INFO -Q queue1,queue2

在哪里

  • -A :项目或应用程序的名称。
  • -B :启动任务调度程序 Celery beat
  • -l :定义日志记录级别。
  • -Q : 定义了这个worker处理的队列。

我希望这可以为其他开发人员节省一些时间。

原文由 Ander 发布,翻译遵循 CC BY-SA 4.0 许可协议

queue 参数添加到装饰器可能对您有所帮助,

 @app.task(queue='queue1')
def app1_test():
    print('I am app1_test task!')
    time.sleep(10)

原文由 JPG 发布,翻译遵循 CC BY-SA 4.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题