Airflow 工作器配置

新手上路,请多包涵

我是气流的新手。我正在尝试通过参考这篇文章 https://stlong0521.github.io/20161023%20-%20Airflow.html 使用 Celery Executor 设置气流的分布式模式

在详细了解规范之前,我想确认 我已经在单独的实例上安装了 PostgreSQL

设置的详细说明如下:

Airflow 核心/服务器计算机

  • Python 3.5
    • 气流(AIRFLOW_HOME = ~/气流)
    • 芹菜
    • 心理医生2
  • 兔MQ

airflow.cfg 中的配置

 sql_alchemy_conn = postgresql+psycopg2://username:password@192.168.2.12:5432/airflow
executor = CeleryExecutor
broker_url = amqp://username:password@192.168.1.12:5672//
celery_result_backend = db+postgresql://username:password@192.168.2.12:5432/airflow

执行的测试:

 RabbitMQ is running
Can connect to PostgreSQL and have confirmed that Airflow has created tables
Can start and view the webserver (including custom dags)

Airflow 工作计算机

安装了以下内容:

  • Python 3.5 与
    • 气流(AIRFLOW_HOME = ~/气流)
    • 芹菜
  • 心理医生2

airflow.cfg 中所做的配置与服务器中的完全相同:

 sql_alchemy_conn = postgresql+psycopg2://username:password@192.168.2.12:5432/airflow
executor = CeleryExecutor
broker_url = amqp://username:password@192.168.1.12:5672//
celery_result_backend = db+postgresql://username:password@192.168.2.12:5432/airflow

在工作机器上运行的命令的输出:

运行气流花时:

 [2018-02-19 14:58:14,276] {__init__.py:57} INFO - Using executor CeleryExecutor
[2018-02-19 14:58:14,360] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python3.5/lib2to3/Grammar.txt
[2018-02-19 14:58:14,384] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python3.5/lib2to3/PatternGrammar.txt
[I 180219 14:58:15 command:139] Visit me at http://0.0.0.0:5555
[I 180219 14:58:15 command:144] Broker: amqp://username:password@192.168.1.12:5672//
[I 180219 14:58:15 command:147] Registered tasks:
    ['celery.accumulate',
     'celery.backend_cleanup',
     'celery.chain',
     'celery.chord',
     'celery.chord_unlock',
     'celery.chunks',
     'celery.group',
     'celery.map',
     'celery.starmap']
[I 180219 14:58:15 mixins:224] Connected to amqp://username:password@192.168.1.12:5672//

我在 Airflow Core 机器上 传递 dag,并且我已经将 dag 将处理的示例数据(Excel 表)复制到同一台核心机器。

我的工人日志 raise CalledProcessError(retcode, cmd) subprocess.CalledProcessError: Command 'airflow run dag_name_x task_name_xx 2018-02-19T10:15:41.657243 --local -sd /home/Distributedici/airflow/dags/sample_data_xx.py' returned non-zero exit status 1

现在我的查询是

  1. 我是否也应该将 dag 文件夹复制到工作计算机

  2. 现在,我还没有复制工作计算机上的 dag 文件夹,我看不到工作进程接任务。

请指出我在哪里犯了错误以及如何让工作进程接手该进程。

原文由 Soundar Raj 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 827
2 个回答

您的配置文件看起来没问题。正如您所怀疑的那样,所有工作人员确实需要 DAG 文件夹的副本。您可以使用诸如 git 之类的东西来使它们保持同步和最新。

原文由 Daniel Huang 发布,翻译遵循 CC BY-SA 3.0 许可协议

Airflow 的一些最大痛点在于部署和保持 DAG 文件和插件在您的 Airflow 调度程序、Airflow 网络服务器和 Celery 工作节点之间同步。

我们创建了一个名为 Astronomer Open 的开源项目,该项目将 Dockerized Airflow、Celery 和 PostgreSQL 与其他一些好东西一起自动化。该项目的动机是看到如此多的人遇到了相同的痛点而创建了非常相似的设置。

例如,这是 Airflow Dockerfile: https ://github.com/astronomer/astronomer/blob/master/docker/airflow/1.10.2/Dockerfile

和文档: https ://open.astronomer.io/

全面披露:这是我在工作中参与的一个项目——我们还提供在 Kubernetes 上运行的付费 企业版 (文档) 。也就是说,Open Edition 是完全免费的。

原文由 Taylor D. Edmiston 发布,翻译遵循 CC BY-SA 4.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题