flask celery work报错KeyError: 'app.tasks.send_async_email'

南方仔
  • 2
新手上路,请多包涵

这个问题小弟看了好多天了,在网上搜索了各种答案,尝试了很多次依旧没解决,还请劳烦各位大佬帮忙看看。

依赖包:
celery==4.4.0
Flask==1.1.1

项目结构如下:
image.png

route.py 中提交异步任务
tasks.py 中存放异步函数
image.png

结果报错如下:

## celery -A app:celery worker -B -E --loglevel=INFO 
 
 -------------- celery@felixdeMacBook-Pro.local v4.4.0 (cliffs)
--- ***** ----- 
-- ******* ---- Darwin-19.3.0-x86_64-i386-64bit 2020-02-23 18:31:40
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         app:0x105bf0518
- ** ---------- .> transport:   redis://:**@localhost:6379/0
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: ON
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery
                

[tasks]


[2020-02-23 18:31:41,293: INFO/Beat] beat: Starting...
[2020-02-23 18:31:41,294: INFO/MainProcess] Connected to redis://:**@localhost:6379/0
[2020-02-23 18:31:41,323: INFO/MainProcess] mingle: searching for neighbors
[2020-02-23 18:31:42,377: INFO/MainProcess] mingle: all alone
[2020-02-23 18:31:42,431: INFO/MainProcess] celery@felixdeMacBook-Pro.local ready.
[2020-02-23 18:32:53,110: ERROR/MainProcess] Received unregistered task of type 'app.tasks.send_async_email'.
The message has been ignored and discarded.

Did you remember to import the module containing this task?
Or maybe you're using relative imports?

Please see
http://docs.celeryq.org/en/latest/internals/protocol.html
for more information.

The full contents of the message body was:
b'[[10, 20], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (83b)
Traceback (most recent call last):
  File "/Users/felixweek/venvs/py3_venv/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 559, in on_task_received
    strategy = strategies[type_]
KeyError: 'app.tasks.send_async_email'
回复
阅读 2.1k
1 个回答
南方仔
  • 2
新手上路,请多包涵
✓ 已被采纳

问题解决了,我使用的工厂模式,要在create_app前导入执行函数,添加这一句
celery.conf['imports'] = ['app.tasks', ]

CELERY_BROKER_URL = "redis://:*****@localhost:6379/0"  
celery = Celery(__name_, broker=CELERY_BROKER_URL)  
# 在此处导入执行函数
celery.conf['imports'] = ['app.tasks', ]  
  
  
def create_app(env=None):  
    app = Flask(__name__)  
    app.config.from_object(config\[env\])  
  
    register_logging(app)  
    register_sentry(app)  
    register_routes(app)  
  
    register_command(app)  
  
    register_celery(app)  
  
    return app
撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
你知道吗?

宣传栏