在spark-shell中执行两段程序:
第一段sortBy:
val list1: List[(String, Int)] = List(("the", 12), ("they", 2), ("do", 4), ("wild", 1), ("and", 5), ("into", 4))
val listRDD1: RDD[(String, Int)] = sc.parallelize(list1)
val result1: RDD[(String, Int)] = listRDD1.sortBy(_._2, false)
result1.collect()
在webui中查看程序的DAG,产生了3个Stage:
其中出现了4个MapPartitionsRDD,3个在ShuffledRDD之前,一个在shuffledRDD之后。
查看sortBy的源码:
f sortBy[K](
f: (T) => K,
ascending: Boolean = true,
numPartitions: Int = this.partitions.size)
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] =
this.keyBy[K](f).sortByKey(ascending, numPartitions).values
其中的keyBy会在shuffle之前生成一个MapPartitionsRDD, values会在shuffled之后生成一个MapPartitionsRDD。剩下的两个MapPartitionsRDD应该是sortByKey生成的。
用程序验证这个猜想:
val list2: List[(Int, (String, Int))] = List((12, ("the", 12)), (2, ("they", 2)), (4, ("do", 4)), (1, ("wild", 1)), (5, ("and", 5)), (4, ("into", 4)))
val listRDD2: RDD[(Int, (String, Int))] = sc.parallelize(list2)
val result2: RDD[(Int, (String, Int))] = listRDD2.sortByKey(false)
result2.collect()
查看程序的DAG,也是三个Stage:
看DAG确实是生成了两个MapPartitionsRDD,但是这两个MapPartitionsRDD都是怎么生成的?而且为什么中间又出现了一个parallelize阶段?求大佬解答。
中间又出现了的parallelize阶段是不是 range partitioner 启动的呀