Airflow DAG 中的外部文件

新手上路,请多包涵

我正在尝试访问 Airflow 任务中的外部文件以读取一些 sql,但出现“找不到文件”。有人遇到过这个吗?

 from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

dag = DAG(
    'my_dat',
    start_date=datetime(2017, 1, 1),
    catchup=False,
    schedule_interval=timedelta(days=1)
)

def run_query():
    # read the query
    query = open('sql/queryfile.sql')
    # run the query
    execute(query)

tas = PythonOperator(
    task_id='run_query', dag=dag, python_callable=run_query)

日志状态如下:

 IOError: [Errno 2] No such file or directory: 'sql/queryfile.sql'

我知道我可以简单地将查询复制并粘贴到同一个文件中,这确实不是一个很好的解决方案。有多个查询并且文本非常大,用 Python 代码嵌入它会降低可读性。

原文由 Alessandro Mariani 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 538
2 个回答

这是一个使用 Variable 来简化操作的示例。

  • 首先在 Airflow UI -> Admin -> Variable 添加 变量,例如。 {key: 'sql_path', values: 'your_sql_script_folder'}

  • 然后在您的 DAG 中添加以下代码,以使用刚刚添加的 Airflow 中的变量。

DAG代码:

 import airflow
from airflow.models import Variable

tmpl_search_path = Variable.get("sql_path")

dag = airflow.DAG(
   'tutorial',
    schedule_interval="@daily",
    template_searchpath=tmpl_search_path,  # this
    default_args=default_args
)
  • 现在您可以在文件夹变量下使用 sql 脚本名称或路径

  • 您可以在 了解更多

原文由 zhongjiajie 发布,翻译遵循 CC BY-SA 4.0 许可协议

所有相对路径都参考了 AIRFLOW_HOME 环境变量。尝试:

  • 给出绝对路径
  • 将文件相对于 AIRFLOW_HOME 放置
  • 尝试在 python 可调用文件中记录 PWD ,然后决定要提供的路径(最佳选择)

原文由 Priyank Mehta 发布,翻译遵循 CC BY-SA 3.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题