当 PythonOperator 出现错误“Negsignal.SIGKILL”时,Airflow DAG 失败

新手上路,请多包涵

我在 Cloud Composer v1.16.16 上运行 Airflowv1.10.15。

我的 DAG 看起来像这样:

 from datetime import datetime, timedelta

# imports
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from scripts import workday_extract, workday_config_large

default_args = {
    'owner': 'xxxx',
    'depends_on_past': False,
    'start_date': datetime(2021, 9, 14),
    'email_on_failure': True,
    'email': ['xxxx'],
    'retries': 1,
    'retry_delay': timedelta(minutes=2),
    'catchup': False
}

# Define the DAG with parameters
dag = DAG(
    dag_id='xxxx_v1',
    schedule_interval='0 20 * * *',
    default_args=default_args,
    catchup=False,
    max_active_runs=1,
    concurrency=1
)

def wd_to_bq(key, val, **kwargs):
    logger.info("workday to BQ ingestion")
    workday_extract.fetch_wd_load_bq(key, val)

start_load = DummyOperator(task_id='start', dag=dag)

end_load = DummyOperator(task_id='end', dag=dag)

for key, val in workday_config_large.endpoint_tbl_mapping.items():
    # Task 1: Process the unmatched records from the view
    workday_to_bq = PythonOperator(
        dag=dag,
        task_id=f'{key}',
        execution_timeout=timedelta(minutes=60),
        provide_context=True,
        python_callable=wd_to_bq,
        op_kwargs={'key': key, 'val': val}
    )
    start_load >> workday_to_bq >> end_load

任务失败并出现错误 - 任务退出并返回代码 Negsignal.SIGKILL 。 python 脚本在我的本地机器上运行良好,并在 15 分钟内完成。有多个端点可以从中提取报告。但是,耗时最长(约 15 分钟)的那个会因此错误而失败,而其他的会成功。

我尝试了很多选项,但似乎都不起作用。有人可以帮忙吗?

原文由 saurabh saraff 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 597
1 个回答
撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进