Spark的sortBy函数为什么会生成4个MapPartitionsRDD?

在spark-shell中执行两段程序:
第一段sortBy:

val list1: List[(String, Int)] = List(("the", 12), ("they", 2), ("do", 4), ("wild", 1), ("and", 5), ("into", 4))
val listRDD1: RDD[(String, Int)] = sc.parallelize(list1)
val result1: RDD[(String, Int)] = listRDD1.sortBy(_._2, false)
result1.collect()

在webui中查看程序的DAG,产生了3个Stage:

clipboard.png

clipboard.png

clipboard.png

其中出现了4个MapPartitionsRDD,3个在ShuffledRDD之前,一个在shuffledRDD之后。
查看sortBy的源码:

f sortBy[K](
    f: (T) => K,
    ascending: Boolean = true,
    numPartitions: Int = this.partitions.size)
    (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] =
  this.keyBy[K](f).sortByKey(ascending, numPartitions).values

其中的keyBy会在shuffle之前生成一个MapPartitionsRDD, values会在shuffled之后生成一个MapPartitionsRDD。剩下的两个MapPartitionsRDD应该是sortByKey生成的。

用程序验证这个猜想:

val list2: List[(Int, (String, Int))] = List((12, ("the", 12)), (2, ("they", 2)), (4, ("do", 4)), (1, ("wild", 1)), (5, ("and", 5)), (4, ("into", 4)))
val listRDD2: RDD[(Int, (String, Int))] = sc.parallelize(list2)
val result2: RDD[(Int, (String, Int))] = listRDD2.sortByKey(false)
result2.collect()

查看程序的DAG,也是三个Stage:

clipboard.png

clipboard.png

clipboard.png

看DAG确实是生成了两个MapPartitionsRDD,但是这两个MapPartitionsRDD都是怎么生成的?而且为什么中间又出现了一个parallelize阶段?求大佬解答。

阅读 2.2k
1 个回答
新手上路,请多包涵

中间又出现了的parallelize阶段是不是 range partitioner 启动的呀

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进