Airflow 使用 PythonOperator 的模板文件

新手上路,请多包涵

获取 BashOperatorSqlOperator 为其模板选择外部文件的方法在某种程度上有明确的记录,但看看 PythonOperator 我对我的测试从文档中了解是行不通的。我不确定 templates_extstemplates_dict 参数如何正确交互以获取文件。

在我创建的 dags 文件夹中: pyoptemplate.sqlpyoptemplate.t 以及 test_python_operator_template.py

pyoptemplate.sql:

 SELECT * FROM {{params.table}};

pyoptemplate.t:

 SELECT * FROM {{params.table}};

test_python_operator_template.py:

 # coding: utf-8
# vim:ai:si:et:sw=4 ts=4 tw=80
"""
# A Test of Templates in PythonOperator
"""

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

import pprint

pp = pprint.PrettyPrinter(indent=4)

def templated_function(ds, **kwargs):
    """This function will try to use templates loaded from external files"""
    pp.pprint(ds)
    pp.pprint(kwargs)

# Define the DAG
dag = DAG(dag_id='test_python_operator_template_dag',
          default_args={"owner": "lamblin",
                        "start_date": datetime.now()},
          template_searchpath=['/Users/daniellamblin/airflow/dags'],
          schedule_interval='@once')

# Define the single task in this controller example DAG
op = PythonOperator(task_id='test_python_operator_template',
                    provide_context=True,
                    python_callable=templated_function,
                    templates_dict={
                        'pyoptemplate': '',
                        'pyoptemplate.sql': '',
                        'sql': 'pyoptemplate',
                        'file1':'pyoptemplate.sql',
                        'file2':'pyoptemplate.t',
                        'table': '{{params.table}}'},
                    templates_exts=['.sql','.t'],
                    params={'condition_param': True,
                            'message': 'Hello World',
                            'table': 'TEMP_TABLE'},
                    dag=dag)

运行结果显示 table 被正确地模板化为字符串,但其他人没有提取任何文件进行模板化。

 dlamblin$ airflow test test_python_operator_template_dag test_python_operator_template 2017-01-18
[2017-01-18 23:58:06,698] {__init__.py:36} INFO - Using executor SequentialExecutor
[2017-01-18 23:58:07,342] {models.py:154} INFO - Filling up the DagBag from /Users/daniellamblin/airflow/dags
[2017-01-18 23:58:07,620] {models.py:1196} INFO -
--------------------------------------------------------------------------------
Starting attempt 1 of 1
--------------------------------------------------------------------------------

[2017-01-18 23:58:07,620] {models.py:1219} INFO - Executing <Task(PythonOperator): test_python_operator_template> on 2017-01-18 00:00:00
'2017-01-18'
{   u'END_DATE': '2017-01-18',
    u'conf': <module 'airflow.configuration' from '/Library/Python/2.7/site-packages/airflow/configuration.pyc'>,
    u'dag': <DAG: test_python_operator_template_dag>,
    u'dag_run': None,
    u'ds_nodash': u'20170118',
    u'end_date': '2017-01-18',
    u'execution_date': datetime.datetime(2017, 1, 18, 0, 0),
    u'latest_date': '2017-01-18',
    u'macros': <module 'airflow.macros' from '/Library/Python/2.7/site-packages/airflow/macros/__init__.pyc'>,
    u'params': {   'condition_param': True,
                   'message': 'Hello World',
                   'table': 'TEMP_TABLE'},
    u'run_id': None,
    u'tables': None,
    u'task': <Task(PythonOperator): test_python_operator_template>,
    u'task_instance': <TaskInstance: test_python_operator_template_dag.test_python_operator_template 2017-01-18 00:00:00 [running]>,
    u'task_instance_key_str': u'test_python_operator_template_dag__test_python_operator_template__20170118',
    'templates_dict': {   'file1': u'pyoptemplate.sql',
                          'file2': u'pyoptemplate.t',
                          'pyoptemplate': u'',
                          'pyoptemplate.sql': u'',
                          'sql': u'pyoptemplate',
                          'table': u'TEMP_TABLE'},
    u'test_mode': True,
    u'ti': <TaskInstance: test_python_operator_template_dag.test_python_operator_template 2017-01-18 00:00:00 [running]>,
    u'tomorrow_ds': '2017-01-19',
    u'tomorrow_ds_nodash': u'20170119',
    u'ts': '2017-01-18T00:00:00',
    u'ts_nodash': u'20170118T000000',
    u'yesterday_ds': '2017-01-17',
    u'yesterday_ds_nodash': u'20170117'}
[2017-01-18 23:58:07,634] {python_operator.py:67} INFO - Done. Returned value was: None

原文由 dlamblin 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 670
2 个回答

从 Airflow 1.8 开始,PythonOperator 在 __init__ 中替换其 template_ext 字段的方式不起作用。任务仅在 template_ext __class__ 。要创建一个获取 SQL 模板文件的 PythonOperator,您只需执行以下操作:

 class SQLTemplatedPythonOperator(PythonOperator):
    template_ext = ('.sql',)

然后在任务运行时从任务中访问 SQL:

 SQLTemplatedPythonOperator(
    templates_dict={'query': 'my_template.sql'},
    params={'my_var': 'my_value'},
    python_callable=my_func,
    provide_context=True,
)

def my_func(**context):
    context['templates_dict']['query']

原文由 Ardan 发布,翻译遵循 CC BY-SA 3.0 许可协议

最近遇到了同样的问题,终于解决了。 @Ardan 的解决方案是正确的,但只想重复一个更完整的答案,并提供一些关于 Airflow 如何为新手工作的细节。

当然你首先需要其中之一:

 from airflow.operators.python_operator import PythonOperator

class SQLTemplatedPythonOperator(PythonOperator):

    # somehow ('.sql',) doesn't work but tuple of two works...
    template_ext = ('.sql','.abcdefg')

假设你有一个像下面这样的 sql 模板文件:

 # stored at path: $AIRFLOW_HOME/sql/some.sql
select {{some_params}} from my_table;

首先确保将文件夹添加到 dag 参数中的搜索路径。

不要将 template_searchpath 传递给 args,然后将 args 传递给 DAG!!!!它不起作用。

 dag = DAG(
    dag_id= "some_name",
    default_args=args,
    schedule_interval="@once",
    template_searchpath='/Users/your_name/some_path/airflow_home/sql'
)

那么您的接线员电话将是

SQLTemplatedPythonOperator(
        templates_dict={'query': 'some.sql'},
        op_kwargs={"args_directly_passed_to_your_function": "some_value"},
        task_id='dummy',
        params={"some_params":"some_value"},
        python_callable=your_func,
        provide_context=True,
        dag=dag,
    )

你的功能将是:

 def your_func(args_directly_passed_to_your_function=None):
    query = context['templates_dict']['query']
    dome_some_thing(query)

一些解释:

  1. Airflow 使用上下文中的值来呈现您的模板。要手动将其添加到上下文中,您可以像上面那样使用 params 字段。

  2. PythonOperator 不再像@Ardan 提到的那样从 template_ext 字段中获取模板文件扩展名。源代码在 这里。它只需要从 self.class.template_ext 扩展。

  3. Airflow 循环遍历 template_dict 字段,如果 value.endswith(file_extension) == True,则呈现模板。

原文由 P. Xie 发布,翻译遵循 CC BY-SA 4.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题
logo
Stack Overflow 翻译
子站问答
访问
宣传栏