我正在使用气流来编排一些 python 脚本。我有一个“主”dag,从中运行了几个 subdags。我的主要 dag 应该根据以下概述运行:
我已经通过使用以下几行设法在我的主 dag 中找到了这个结构:
etl_internal_sub_dag1 >> etl_internal_sub_dag2 >> etl_internal_sub_dag3
etl_internal_sub_dag3 >> etl_adzuna_sub_dag
etl_internal_sub_dag3 >> etl_adwords_sub_dag
etl_internal_sub_dag3 >> etl_facebook_sub_dag
etl_internal_sub_dag3 >> etl_pagespeed_sub_dag
etl_adzuna_sub_dag >> etl_combine_sub_dag
etl_adwords_sub_dag >> etl_combine_sub_dag
etl_facebook_sub_dag >> etl_combine_sub_dag
etl_pagespeed_sub_dag >> etl_combine_sub_dag
我想要气流做的是首先运行 etl_internal_sub_dag1
然后 etl_internal_sub_dag2
然后 etl_internal_sub_dag3
。 When the etl_internal_sub_dag3
is finished I want etl_adzuna_sub_dag
, etl_adwords_sub_dag
, etl_facebook_sub_dag
, and etl_pagespeed_sub_dag
to run in parallel.最后,当这最后四个脚本完成后,我希望 etl_combine_sub_dag
运行。
However, when I run the main dag, etl_adzuna_sub_dag
, etl_adwords_sub_dag
, etl_facebook_sub_dag
, and etl_pagespeed_sub_dag
are run one by one and not in parallel .
Question: How do I make sure that the scripts etl_adzuna_sub_dag
, etl_adwords_sub_dag
, etl_facebook_sub_dag
, and etl_pagespeed_sub_dag
are run in parallel?
编辑: 我的 default_args
和 DAG
看起来像这样:
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': start_date,
'end_date': end_date,
'email': ['myname@gmail.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 0,
'retry_delay': timedelta(minutes=5),
}
DAG_NAME = 'main_dag'
dag = DAG(DAG_NAME, default_args=default_args, catchup = False)
原文由 Mr. President 发布,翻译遵循 CC BY-SA 4.0 许可协议
您将需要使用
LocalExecutor
。检查您的配置(
airflow.cfg
),您可能正在使用SequentialExectuor
串行执行任务。Airflow 使用后端数据库来存储元数据。检查您的
airflow.cfg
文件并查找executor
关键字。默认情况下,Airflow 使用SequentialExecutor
无论如何都会按顺序执行任务。因此,要允许 Airflow 并行运行任务,您需要在 Postges 或 MySQL 中创建一个数据库并在airflow.cfg
(sql_alchemy_conn
参数)中对其进行配置,然后将执行程序更改为LocalExecutor
在airflow.cfg
然后运行airflow initdb
。请注意,要使用
LocalExecutor
,您需要使用 Postgres 或 MySQL 而不是 SQLite 作为后端数据库。更多信息: https ://airflow.incubator.apache.org/howto/initialize-database.html