芹菜:找不到模块

新手上路,请多包涵

我正在使用开放式语义搜索 (OSS),我想使用 Flower 工具 监控其进程。 Celery 需要的 worker 应该按照 OSS 在 其网站 上的说明提供

工作人员将执行排队文件的分析和索引等任务。 workers 由 etl/tasks.py 实现,并将在启动时由服务 opensemanticsearch 自动启动。

此 tasks.py 文件如下所示:

 #!/usr/bin/python3
# -*- coding: utf-8 -*-

#
# Queue tasks for batch processing and parallel processing
#

# Queue handler
from celery import Celery

# ETL connectors
from etl import ETL
from etl_delete import Delete
from etl_file import Connector_File
from etl_web import Connector_Web
from etl_rss import Connector_RSS

verbose = True
quiet = False

app = Celery('etl.tasks')
app.conf.CELERYD_MAX_TASKS_PER_CHILD = 1

etl_delete = Delete()
etl_web = Connector_Web()
etl_rss = Connector_RSS()

#
# Delete document with URI from index
#

@app.task(name='etl.delete')
def delete(uri):
    etl_delete.delete(uri=uri)

#
# Index a file
#

@app.task(name='etl.index_file')
def index_file(filename, wait=0, config=None):

    if wait:
        time.sleep(wait)

    etl_file = Connector_File()

    if config:
        etl_file.config = config

    etl_file.index(filename=filename)

#
# Index file directory
#

@app.task(name='etl.index_filedirectory')
def index_filedirectory(filename):

    from etl_filedirectory import Connector_Filedirectory

    connector_filedirectory = Connector_Filedirectory()

    result = connector_filedirectory.index(filename)

    return result

#
# Index a webpage
#
@app.task(name='etl.index_web')
def index_web(uri, wait=0, downloaded_file=False, downloaded_headers=[]):

    if wait:
        time.sleep(wait)

    result = etl_web.index(uri, downloaded_file=downloaded_file, downloaded_headers=downloaded_headers)

    return result

#
# Index full website
#

@app.task(name='etl.index_web_crawl')
def index_web_crawl(uri, crawler_type="PATH"):

    import etl_web_crawl

    result = etl_web_crawl.index(uri, crawler_type)

    return result

#
# Index webpages from sitemap
#

@app.task(name='etl.index_sitemap')
def index_sitemap(uri):

    from etl_sitemap import Connector_Sitemap

    connector_sitemap = Connector_Sitemap()

    result = connector_sitemap.index(uri)

    return result

#
# Index RSS Feed
#

@app.task(name='etl.index_rss')
def index_rss(uri):

    result = etl_rss.index(uri)

    return result

#
# Enrich with / run plugins
#

@app.task(name='etl.enrich')
def enrich(plugins, uri, wait=0):

    if wait:
        time.sleep(wait)

    etl = ETL()
    etl.read_configfile('/etc/opensemanticsearch/etl')
    etl.read_configfile('/etc/opensemanticsearch/enhancer-rdf')

    etl.config['plugins'] = plugins.split(',')

    filename = uri

    # if exist delete protocoll prefix file://
    if filename.startswith("file://"):
        filename = filename.replace("file://", '', 1)

    parameters = etl.config.copy()

    parameters['id'] = uri
    parameters['filename'] = filename

    parameters, data = etl.process (parameters=parameters, data={})

    return data

#
# Read command line arguments and start
#

#if running (not imported to use its functions), run main function
if __name__ == "__main__":

    from optparse import OptionParser

    parser = OptionParser("etl-tasks [options]")
    parser.add_option("-q", "--quiet", dest="quiet", action="store_true", default=False, help="Don\'t print status (filenames) while indexing")
    parser.add_option("-v", "--verbose", dest="verbose", action="store_true", default=False, help="Print debug messages")

    (options, args) = parser.parse_args()

    if options.verbose == False or options.verbose==True:
        verbose = options.verbose
        etl_delete.verbose = options.verbose
        etl_web.verbose = options.verbose
        etl_rss.verbose = options.verbose

    if options.quiet == False or options.quiet==True:
        quiet = options.quiet

    app.worker_main()

我阅读了多篇关于 Celery 的教程,根据我的理解,这一行应该可以完成这项工作

celery -A etl.tasks flower

但事实并非如此。结果是语句

错误:无法加载芹菜应用程序。未找到模块 etl。

同为

celery -A etl.tasks worker --loglevel=debug

所以芹菜本身似乎是造成麻烦的原因,而不是花。我也试过例如 celery -A etl.index_filedirectory worker –loglevel=debug 但结果相同。

我错过了什么?我是否必须以某种方式告诉 Celery 在哪里可以找到 etl.tasks?在线研究并没有真正显示出类似的情况,大多数“找不到模块”错误似乎都是在导入内容时发生的。所以这可能是一个愚蠢的问题,但我无法在任何地方找到解决方案。我希望你们能帮助我。不幸的是,我要到星期一才能回复,提前抱歉。

原文由 starsnpixel 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 320
1 个回答

我遇到了同样的问题,我按如下方式安装和配置了我的队列,并且它有效。

安装 RabbitMQ

苹果系统

brew install rabbitmq
sudo vim ~/.bash_profile

bash_profile 添加以下行:

PATH=$PATH:/usr/local/sbin

然后更新 bash_profile

sudo source ~/.bash_profile

Linux

sudo apt-get install rabbitmq-server

配置 RabbitMQ

启动队列:

sudo rabbitmq-server

在另一个终端中,配置队列:

 sudo rabbitmqctl add_user myuser mypassword
sudo rabbitmqctl add_vhost myvhost
sudo rabbitmqctl set_user_tags myuser mytag
sudo rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"

启动芹菜

我建议进入包含 task.py 的文件夹并使用以下命令:

celery -A task worker -l info -Q celery --concurrency 5

原文由 sashaboulouds 发布,翻译遵循 CC BY-SA 4.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题