如何使用 AirFlow 运行 python 文件的文件夹?

新手上路,请多包涵

我在 python 文件的文件夹中有一系列 Python 任务:file1.py、file2.py、…

我阅读了 Airflow 文档,但看不到如何在 DAG 中指定 python 文件的文件夹和文件名?

我想执行那些 python 文件(不是通过 Python Operator 执行的 Python 函数)。

任务 1:执行 file1.py(带有一些导入包)

任务 2:执行 file2.py(使用其他一些导入包)

这会很有帮助。感谢和问候

原文由 tensor 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 647
2 个回答

您可以使用 BashOperator 将 python 文件作为任务执行

    from airflow import DAG
    from airflow.operators import BashOperator,PythonOperator
    from datetime import datetime, timedelta

    seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
                                      datetime.min.time())

    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': seven_days_ago,
        'email': ['airflow@airflow.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
      )

    dag = DAG('simple', default_args=default_args)
t1 = BashOperator(
    task_id='testairflow',
    bash_command='python /home/airflow/airflow/dags/scripts/file1.py',
    dag=dag)

原文由 liferacer 发布,翻译遵循 CC BY-SA 3.0 许可协议

我知道您是在问您“想要执行那些 python 文件(而不是通过 Python 运算符执行 Python 函数)”。但我认为这可能比您更有效地使用 Airflow。我还看到以前写的答案中存在混淆,所以这是您想要的方式,以及我建议执行任务的方式:

假设:

 dags/
    my_dag_for_task_1_and_2.py
    tasks/
         file1.py
         file2.py

您要求避免 PythonOperator

 #  my_dag_for_task_1_and_2.py
import datetime as dt
from airflow import DAG
from airflow.operators import BashOperator

with DAG(
    'my_dag_for_task_1_and_2',
    default_args={
        'owner': 'me',
        'start_date': datetime(…),
        …,
    },
    schedule_interval='8 * * * *',
) as dag:
    task_1 = BashOperator(
        task_id='task_1',
        bash_command='/path/to/python /path/to/dags/tasks/file1.py',
    )
    task_2 = BashOperator(
        task_id='task_2',
        bash_command='/path/to/python /path/to/dags/tasks/file2.py',
    )
    task_1 >> task_2

您没有从头开始为 Airflow 编写 Python,而是使用 PythonOperator

 #  my_dag_for_task_1_and_2.py
import datetime as dt
from airflow import DAG
from airflow.operators import PythonOperator
import tasks.file1
import tasks.file2

with DAG(
    'my_dag_for_task_1_and_2',
    default_args={
        'owner': 'me',
        'start_date': datetime(…),
        …,
    },
    schedule_interval='8 * * * *',
) as dag:
    task_1 = PythonOperator(
        task_id='task_1',
        python_callable=file1.function_in_file1,
    )
    task_2 = PythonOperator(
        task_id='task_2',
        python_callable=file2.function_in_file2,  # maybe main?
    )
    task_1 >> task_2

原文由 dlamblin 发布,翻译遵循 CC BY-SA 3.0 许可协议

推荐问题