spark如何处理两个RDD的关联问题

spark开发新手遇到如下问题,请大神赐教
问题描述

数据处理过程中计算得到两个RDD:rdd1和rdd2,

1.rdd1显示数据集

(R1,3)
(R2,5)
(R3,5)
(R4,5)
(R5,3)

2.rdd2显示数据集

((R1,R3),2)
((R2,R3),3)
((R2,R5),3)
((R1,R2),1)
((R1,R4),3)
((R3,R4),4)
((R4,R5),1)
((R3,R5),2)
((R2,R4),3)

要求计算 :
((Rx,Ry),α) 其中α = 0.5*(|Rx| + |Ry|) ,|Rx||Ry|为rdd1中RxRy的对应值
比如对rdd2中的第一条记录:
((R1,R3),2) ===> ((R1,R3),(0.5*(3+5))) = ((R1,R3),4)

阅读 9.8k
3 个回答

可以将val rdd1map = rdd1.collectAsMap,然后在rdd2的map中,0.5*(rdd1map.get(k.apply(0)).getOrElse(0) + rdd1map.get(k.apply(1)).getOrElse(0))
大体是这个思路,我也在学习过程中,不能给予完善的解答,抱歉。

实现rdd关联的话,需要先对rdd进行keyby,然后进行join操作

你看下 能否达到你对要求


val spark = SparkSession

  .builder
  .appName(this.getClass.getSimpleName)
  .config("spark.default.parallelism", "3")
  .master("local[3]")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .getOrCreate()

val sql = spark.sqlContext
import sql.implicits._

val t1 = spark.sparkContext.makeRDD(List(("R1", 3),
  ("R2", 5),
  ("R3", 5),
  ("R4", 5),
  ("R5", 3))).toDF("name", "v_1")

val t2 = spark.sparkContext.makeRDD(List((("R1", "R3"), 2),
  (("R2", "R3"), 3),
  (("R2", "R5"), 3),
  (("R1", "R2"), 1),
  (("R1", "R4"), 3),
  (("R3", "R4"), 4),
  (("R4", "R5"), 1),
  (("R3", "R5"), 2),
  (("R2", "R4"), 3))).map(row => (row._1._1, row._1._2, row._2)).toDF("name1", "name2", "v_2")

t2.join(t1, t1("name").<=>(t2("name1")))
  .withColumnRenamed("v_1", "name1_v")
  .drop("name")
  .join(t1, t1("name").<=>(t2("name2")))
  .withColumnRenamed("v_1", "name2_v")
  .drop("name")
  //((Rx,Ry),α) 其中α = 0.5*(|Rx| + |Ry|) ,|Rx|和|Ry|为rdd1中Rx和Ry的对应值
  .selectExpr("name1","name2","(name1_v+name2_v)*0.5")
  .show()
  
  |name1|name2|((name1_v + name2_v) * 0.5)|
R2 R3 5.0
R1 R3 4.0
R1 R2 4.0
R3 R4 5.0
R2 R4 5.0
R1 R4 4.0
R3 R5 4.0
R2 R5 4.0
R4 R5 4.0
撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
宣传栏