我对现有的工作流程做了一个小改动,它破坏了气流。这是代码:
dag_name = platform + "_" + report['table']
dag = DAG(
dag_name,
catchup=True,
default_args=default_args,
schedule_interval=report['schedule']
)
with dag:
trigger_report = PythonOperator(
task_id=dag.dag_id + '_trigger_report',
python_callable=trigger_report,
provide_context=True,
op_kwargs={
'report_name': report['report'],
'amazonmws_conn_id': default_args['amazonmws_conn_id']
},
dag=dag
)
这是我收到的错误:
airflow.exceptions.AirflowException: python_callable param must be callable
原文由 Ashley O 发布,翻译遵循 CC BY-SA 4.0 许可协议
好像你正在传递
trigger_report
本身作为python_callable
。这是故意的吗?它已经有价值了吗?
(可能,否则你会得到一个
NameError: name 'trigger_report' is not defined
)