如何从 PySpark 中的不同线程在一个 Sparkcontext 中运行多个作业?

新手上路,请多包涵

从 Spark documentation about Scheduling Within an Application 可以了解到:

在给定的 Spark 应用程序(SparkContext 实例)中,如果多个并行作业是从单独的线程提交的,则它们可以同时运行。在本节中,“作业”是指 Spark 操作(例如保存、收集)以及需要运行以评估该操作的任何任务。 Spark 的调度程序是完全线程安全的,并支持此用例以支持为多个请求(例如多个用户的查询)提供服务的应用程序。”

我在 Scala 和 Java 中找不到相同的示例代码。有人可以举例说明如何使用 PySpark 实现吗?

原文由 Meethu Mathew 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 612
1 个回答

我遇到了同样的问题,所以我创建了一个独立的小例子。我使用 python 的线程模块创建多个线程并同时提交多个 spark 作业。

请注意,默认情况下,spark 将以先进先出 (FIFO) 的方式运行作业:http: //spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application 。在下面的示例中,我将其更改为 FAIR 调度

# Prereqs:
# set
# spark.dynamicAllocation.enabled         true
# spark.shuffle.service.enabled           true
  spark.scheduler.mode                    FAIR
# in spark-defaults.conf

import threading
from pyspark import SparkContext, SparkConf

def task(sc, i):
  print sc.parallelize(range(i*10000)).count()

def run_multiple_jobs():
  conf = SparkConf().setMaster('local[*]').setAppName('appname')
  # Set scheduler to FAIR: http://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application
  conf.set('spark.scheduler.mode', 'FAIR')
  sc = SparkContext(conf=conf)
  for i in range(4):
    t = threading.Thread(target=task, args=(sc, i))
    t.start()
    print 'spark task', i, 'has started'

run_multiple_jobs()

输出:

 spark task 0 has started
spark task 1 has started
spark task 2 has started
spark task 3 has started
30000
0
10000
20000

原文由 sparknoob 发布,翻译遵循 CC BY-SA 3.0 许可协议

推荐问题