如何在 Pyspark 中按列连接/附加多个 Spark 数据帧?

新手上路,请多包涵

如何使用 Pyspark 数据帧做相当于 pd.concat([df1,df2],axis=‘columns’) 的 pandas?我用谷歌搜索,找不到好的解决方案。

 DF1
var1
     3
     4
     5

DF2
var2    var3
  23      31
  44      45
  52      53

Expected output dataframe
var1        var2    var3
     3        23      31
     4        44      45
     5        52      53

编辑以包括预期输出

原文由 GeorgeOfTheRF 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 394
2 个回答

以下是您想在 scala 中执行的操作的示例,我希望您可以将其转换为 pyspark

 val spark = SparkSession
    .builder()
    .master("local")
    .appName("ParquetAppendMode")
    .getOrCreate()
  import spark.implicits._

  val df1 = spark.sparkContext.parallelize(Seq(
    (1, "abc"),
    (2, "def"),
    (3, "hij")
  )).toDF("id", "name")

  val df2 = spark.sparkContext.parallelize(Seq(
    (19, "x"),
    (29, "y"),
    (39, "z")
  )).toDF("age", "address")

  val schema = StructType(df1.schema.fields ++ df2.schema.fields)

  val df1df2 = df1.rdd.zip(df2.rdd).map{
    case (rowLeft, rowRight) => Row.fromSeq(rowLeft.toSeq ++ rowRight.toSeq)}

  spark.createDataFrame(df1df2, schema).show()

这就是你只使用数据框的方式

import org.apache.spark.sql.functions._

val ddf1 = df1.withColumn("row_id", monotonically_increasing_id())
val ddf2 = df2.withColumn("row_id", monotonically_increasing_id())

val result = ddf1.join(ddf2, Seq("row_id")).drop("row_id")

result.show()

将新列添加为 row_id 并将两个数据框与键连接为 row_id

希望这可以帮助!

原文由 koiralo 发布,翻译遵循 CC BY-SA 3.0 许可协议

使用 pyspark 的等效接受答案将是

from pyspark.sql.types import StructType

spark = SparkSession.builder().master("local").getOrCreate()
df1 = spark.sparkContext.parallelize([(1, "a"),(2, "b"),(3, "c")]).toDF(["id", "name"])
df2 = spark.sparkContext.parallelize([(7, "x"),(8, "y"),(9, "z")]).toDF(["age", "address"])

schema = StructType(df1.schema.fields + df2.schema.fields)
df1df2 = df1.rdd.zip(df2.rdd).map(lambda x: x[0]+x[1])
spark.createDataFrame(df1df2, schema).show()

原文由 Devi 发布,翻译遵循 CC BY-SA 4.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题