我正在尝试使用 ExternalTaskSensor,但它卡在了另一个 DAG 的任务上,该任务已经成功完成。
在这里,第一个 DAG“a”完成了它的任务,然后应该触发通过 ExternalTaskSensor 的第二个 DAG“b”。相反,它会卡在查找 a.first_task 上。
第一个 DAG:
import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
dag = DAG(
dag_id='a',
default_args={'owner': 'airflow', 'start_date': datetime.datetime.now()},
schedule_interval=None
)
def do_first_task():
print('First task is done')
PythonOperator(
task_id='first_task',
python_callable=do_first_task,
dag=dag)
第二个 DAG:
import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.sensors import ExternalTaskSensor
dag = DAG(
dag_id='b',
default_args={'owner': 'airflow', 'start_date': datetime.datetime.now()},
schedule_interval=None
)
def do_second_task():
print('Second task is done')
ExternalTaskSensor(
task_id='wait_for_the_first_task_to_be_completed',
external_dag_id='a',
external_task_id='first_task',
dag=dag) >> \
PythonOperator(
task_id='second_task',
python_callable=do_second_task,
dag=dag)
我在这里错过了什么?
原文由 Aleksei Solovev 发布,翻译遵循 CC BY-SA 4.0 许可协议
ExternalTaskSensor
假设您依赖于具有相同执行日期的 dag 运行中的任务。这意味着在您的情况下,
a
和b
需要按相同的时间表运行(例如每天上午 9:00 或 w/e)。否则,您需要在实例化
execution_date_fn
时使用execution_delta
或 ---ExternalTaskSensor
。这是操作员本身内部的文档,以帮助进一步澄清: