Airflow ExternalTaskSensor 卡住了

新手上路,请多包涵

我正在尝试使用 ExternalTaskSensor,但它卡在了另一个 DAG 的任务上,该任务已经成功完成。

在这里,第一个 DAG“a”完成了它的任务,然后应该触发通过 ExternalTaskSensor 的第二个 DAG“b”。相反,它会卡在查找 a.first_task 上。

第一个 DAG:

 import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG(
    dag_id='a',
    default_args={'owner': 'airflow', 'start_date': datetime.datetime.now()},
    schedule_interval=None
)

def do_first_task():
    print('First task is done')

PythonOperator(
    task_id='first_task',
    python_callable=do_first_task,
    dag=dag)

第二个 DAG:

 import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.sensors import ExternalTaskSensor

dag = DAG(
    dag_id='b',
    default_args={'owner': 'airflow', 'start_date': datetime.datetime.now()},
    schedule_interval=None
)

def do_second_task():
    print('Second task is done')

ExternalTaskSensor(
    task_id='wait_for_the_first_task_to_be_completed',
    external_dag_id='a',
    external_task_id='first_task',
    dag=dag) >> \
PythonOperator(
    task_id='second_task',
    python_callable=do_second_task,
    dag=dag)

我在这里错过了什么?

原文由 Aleksei Solovev 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 1.5k
2 个回答

ExternalTaskSensor 假设您依赖于具有相同执行日期的 dag 运行中的任务。

这意味着在您的情况下, ab 需要按相同的时间表运行(例如每天上午 9:00 或 w/e)。

否则,您需要在实例化 execution_date_fn 时使用 execution_delta 或 --- ExternalTaskSensor

这是操作员本身内部的文档,以帮助进一步澄清:

 :param execution_delta: time difference with the previous execution to
    look at, the default is the same execution_date as the current task.
    For yesterday, use [positive!] datetime.timedelta(days=1). Either
    execution_delta or execution_date_fn can be passed to
    ExternalTaskSensor, but not both.

:type execution_delta: datetime.timedelta

:param execution_date_fn: function that receives the current execution date
    and returns the desired execution date to query. Either execution_delta
    or execution_date_fn can be passed to ExternalTaskSensor, but not both.

:type execution_date_fn: callable

原文由 jhnclvr 发布,翻译遵循 CC BY-SA 3.0 许可协议

为了澄清我在这里看到的以及其他相关问题,dags 不一定要按照已接受的答案中所述的相同时间表运行。 dags 也不需要具有相同的 start_date 。如果您创建的 ExternalTaskSensor 任务没有 execution_deltaexecution_date_fn ,则两个 dag 需要具有相同的 _执行日期_。碰巧的是,如果两个 dag 具有相同的计划,则每个间隔中的计划运行将具有相同的执行日期。我不确定手动触发的计划 dag 运行的执行日期是什么时候。

For this example to work, dag b ’s ExternalTaskSensor task needs an execution_delta or execution_date_fn parameter. If using an execution_delta parameter, it should be such that b ’s execution date - execution_delta = a ’s execution date.如果使用 execution_date_fn ,那么该函数应该返回 a 的执行日期。

如果您使用的是 TriggerDagRunOperator ,然后使用 ExternalTaskSensor 来检测该 dag 何时完成,您可以执行一些操作,例如将主 dag 的执行日期传递给触发日期 TriggerDagRunOperatorexecution_date 参数,如 execution_date='{{ execution_date }}' 。然后两个 dag 的执行日期将相同,并且您不需要每个 dag 的计划都相同,或者使用 execution_deltaexecution_date_fn 传感器参数。

以上是在 Airflow 1.10.9 上编写和测试的

原文由 tomcm 发布,翻译遵循 CC BY-SA 4.0 许可协议

推荐问题
logo
Stack Overflow 翻译
子站问答
访问
宣传栏