地球人您好!我正在使用 Airflow 来安排和运行 Spark 任务。这次我发现的只是 Airflow 可以管理的 python DAG。
DAG 示例:
spark_count_lines.py
import logging
from airflow import DAG
from airflow.operators import PythonOperator
from datetime import datetime
args = {
'owner': 'airflow'
, 'start_date': datetime(2016, 4, 17)
, 'provide_context': True
}
dag = DAG(
'spark_count_lines'
, start_date = datetime(2016, 4, 17)
, schedule_interval = '@hourly'
, default_args = args
)
def run_spark(**kwargs):
import pyspark
sc = pyspark.SparkContext()
df = sc.textFile('file:///opt/spark/current/examples/src/main/resources/people.txt')
logging.info('Number of lines in people.txt = {0}'.format(df.count()))
sc.stop()
t_main = PythonOperator(
task_id = 'call_spark'
, dag = dag
, python_callable = run_spark
)
问题是我不擅长 Python 代码,并且有一些任务是用 Java 编写的。我的问题是如何在 python DAG 中运行 Spark Java jar?或者也许还有其他方法哟?我发现 spark 提交: http ://spark.apache.org/docs/latest/submitting-applications.html
但我不知道如何将所有内容连接在一起。也许有人以前用过它并且有工作示例。感谢您的时间!
原文由 Ruslan Lomov 发布,翻译遵循 CC BY-SA 4.0 许可协议
您应该可以使用
BashOperator
。保持其余代码不变,导入所需的类和系统包:设置所需路径:
并添加运算符:
您可以轻松地扩展它以使用 Jinja 模板提供额外的参数。
您当然可以通过将
bash_command
替换为适合您的情况的模板来针对非 Spark 场景进行调整,例如:并调整
params
。