并行运行气流任务/dags

新手上路,请多包涵

我正在使用气流来编排一些 python 脚本。我有一个“主”dag,从中运行了几个 subdags。我的主要 dag 应该根据以下概述运行:

在此处输入图像描述

我已经通过使用以下几行设法在我的主 dag 中找到了这个结构:

 etl_internal_sub_dag1 >> etl_internal_sub_dag2 >> etl_internal_sub_dag3
etl_internal_sub_dag3 >> etl_adzuna_sub_dag
etl_internal_sub_dag3 >> etl_adwords_sub_dag
etl_internal_sub_dag3 >> etl_facebook_sub_dag
etl_internal_sub_dag3 >> etl_pagespeed_sub_dag

etl_adzuna_sub_dag >> etl_combine_sub_dag
etl_adwords_sub_dag >> etl_combine_sub_dag
etl_facebook_sub_dag >> etl_combine_sub_dag
etl_pagespeed_sub_dag >> etl_combine_sub_dag

我想要气流做的是首先运行 etl_internal_sub_dag1 然后 etl_internal_sub_dag2 然后 etl_internal_sub_dag3 。 When the etl_internal_sub_dag3 is finished I want etl_adzuna_sub_dag , etl_adwords_sub_dag , etl_facebook_sub_dag , and etl_pagespeed_sub_dag to run in parallel.最后,当这最后四个脚本完成后,我希望 etl_combine_sub_dag 运行。

However, when I run the main dag, etl_adzuna_sub_dag , etl_adwords_sub_dag , etl_facebook_sub_dag , and etl_pagespeed_sub_dag are run one by one and not in parallel .

Question: How do I make sure that the scripts etl_adzuna_sub_dag , etl_adwords_sub_dag , etl_facebook_sub_dag , and etl_pagespeed_sub_dag are run in parallel?

编辑: 我的 default_argsDAG 看起来像这样:

 default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': start_date,
    'end_date': end_date,
    'email': ['myname@gmail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}

DAG_NAME = 'main_dag'

dag = DAG(DAG_NAME, default_args=default_args, catchup = False)

原文由 Mr. President 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 732
2 个回答

您将需要使用 LocalExecutor

检查您的配置( airflow.cfg ),您可能正在使用 SequentialExectuor 串行执行任务。

Airflow 使用后端数据库来存储元数据。检查您的 airflow.cfg 文件并查找 executor 关键字。默认情况下,Airflow 使用 SequentialExecutor 无论如何都会按顺序执行任务。因此,要允许 Airflow 并行运行任务,您需要在 Postges 或 MySQL 中创建一个数据库并在 airflow.cfgsql_alchemy_conn 参数)中对其进行配置,然后将执行程序更改为 LocalExecutorairflow.cfg 然后运行 airflow initdb

请注意,要使用 LocalExecutor ,您需要使用 Postgres 或 MySQL 而不是 SQLite 作为后端数据库。

更多信息: https ://airflow.incubator.apache.org/howto/initialize-database.html

如果你想真正体验一下 Airflow,你应该考虑设置一个真实的数据库后端并切换到 LocalExecutor。由于 Airflow 被构建为使用强大的 SqlAlchemy 库与其元数据进行交互,因此您应该能够使用任何受支持的数据库后端作为 SqlAlchemy 后端。我们推荐使用 MySQL 或 Postgres。

原文由 kaxil 发布,翻译遵循 CC BY-SA 4.0 许可协议

并行运行任务的一种简单解决方案是将它们放在 [ ] 括号中。例如: task_start >> [task_get_users, task_get_posts, task_get_comments, task_get_todos]

有关更多信息,您可以阅读 来自 towardsdatascience 的这篇文章

原文由 Luca 发布,翻译遵循 CC BY-SA 4.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题