如何在 Airflow 中运行 Spark 代码?

新手上路,请多包涵

地球人您好!我正在使用 Airflow 来安排和运行 Spark 任务。这次我发现的只是 Airflow 可以管理的 python DAG。

DAG 示例:

 spark_count_lines.py
import logging

from airflow import DAG
from airflow.operators import PythonOperator

from datetime import datetime

args = {
  'owner': 'airflow'
  , 'start_date': datetime(2016, 4, 17)
  , 'provide_context': True
}

dag = DAG(
  'spark_count_lines'
  , start_date = datetime(2016, 4, 17)
  , schedule_interval = '@hourly'
  , default_args = args
)

def run_spark(**kwargs):
  import pyspark
  sc = pyspark.SparkContext()
  df = sc.textFile('file:///opt/spark/current/examples/src/main/resources/people.txt')
  logging.info('Number of lines in people.txt = {0}'.format(df.count()))
  sc.stop()

t_main = PythonOperator(
  task_id = 'call_spark'
  , dag = dag
  , python_callable = run_spark
)

问题是我不擅长 Python 代码,并且有一些任务是用 Java 编写的。我的问题是如何在 python DAG 中运行 Spark Java jar?或者也许还有其他方法哟?我发现 spark 提交: http ://spark.apache.org/docs/latest/submitting-applications.html

但我不知道如何将所有内容连接在一起。也许有人以前用过它并且有工作示例。感谢您的时间!

原文由 Ruslan Lomov 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 905
2 个回答

您应该可以使用 BashOperator 。保持其余代码不变,导入所需的类和系统包:

 from airflow.operators.bash_operator import BashOperator

import os
import sys

设置所需路径:

 os.environ['SPARK_HOME'] = '/path/to/spark/root'
sys.path.append(os.path.join(os.environ['SPARK_HOME'], 'bin'))

并添加运算符:

 spark_task = BashOperator(
    task_id='spark_java',
    bash_command='spark-submit --class {{ params.class }} {{ params.jar }}',
    params={'class': 'MainClassName', 'jar': '/path/to/your.jar'},
    dag=dag
)

您可以轻松地扩展它以使用 Jinja 模板提供额外的参数。

您当然可以通过将 bash_command 替换为适合您的情况的模板来针对非 Spark 场景进行调整,例如:

 bash_command = 'java -jar {{ params.jar }}'

并调整 params

原文由 zero323 发布,翻译遵循 CC BY-SA 3.0 许可协议

Airflow 从 1.8 版(今天发布)开始,有

SparkSQLHook 代码 - https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/spark_sql_hook.py

SparkSubmitHook 代码 - https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/spark_submit_hook.py

请注意,这两个新的 Spark 运算符/挂钩在 1.8 版本的“contrib”分支中,因此没有(很好)记录。

因此,您可以使用 SparkSubmitOperator 提交您的 Java 代码以供 Spark 执行。

原文由 Tagar 发布,翻译遵循 CC BY-SA 3.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题