我在 Cloud Composer v1.16.16 上运行 Airflowv1.10.15。
我的 DAG 看起来像这样:
from datetime import datetime, timedelta
# imports
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from scripts import workday_extract, workday_config_large
default_args = {
'owner': 'xxxx',
'depends_on_past': False,
'start_date': datetime(2021, 9, 14),
'email_on_failure': True,
'email': ['xxxx'],
'retries': 1,
'retry_delay': timedelta(minutes=2),
'catchup': False
}
# Define the DAG with parameters
dag = DAG(
dag_id='xxxx_v1',
schedule_interval='0 20 * * *',
default_args=default_args,
catchup=False,
max_active_runs=1,
concurrency=1
)
def wd_to_bq(key, val, **kwargs):
logger.info("workday to BQ ingestion")
workday_extract.fetch_wd_load_bq(key, val)
start_load = DummyOperator(task_id='start', dag=dag)
end_load = DummyOperator(task_id='end', dag=dag)
for key, val in workday_config_large.endpoint_tbl_mapping.items():
# Task 1: Process the unmatched records from the view
workday_to_bq = PythonOperator(
dag=dag,
task_id=f'{key}',
execution_timeout=timedelta(minutes=60),
provide_context=True,
python_callable=wd_to_bq,
op_kwargs={'key': key, 'val': val}
)
start_load >> workday_to_bq >> end_load
任务失败并出现错误 - 任务退出并返回代码 Negsignal.SIGKILL 。 python 脚本在我的本地机器上运行良好,并在 15 分钟内完成。有多个端点可以从中提取报告。但是,耗时最长(约 15 分钟)的那个会因此错误而失败,而其他的会成功。
我尝试了很多选项,但似乎都不起作用。有人可以帮忙吗?
原文由 saurabh saraff 发布,翻译遵循 CC BY-SA 4.0 许可协议
我通过增加内存大小解决了这个问题
https://github.com/apache/airflow/issues/10435
应该在运行任务时检查充当工作者的 pod 的内存大小