Spark 数据框将多行转换为列

新手上路,请多包涵

我是 spark 的新手,我想在 源数据帧 下方进行 转换(从 JSON 文件加载):

 +--+-----+-----+
|A |count|major|
+--+-----+-----+
| a|    1|   m1|
| a|    1|   m2|
| a|    2|   m3|
| a|    3|   m4|
| b|    4|   m1|
| b|    1|   m2|
| b|    2|   m3|
| c|    3|   m1|
| c|    4|   m3|
| c|    5|   m4|
| d|    6|   m1|
| d|    1|   m2|
| d|    2|   m3|
| d|    3|   m4|
| d|    4|   m5|
| e|    4|   m1|
| e|    5|   m2|
| e|    1|   m3|
| e|    1|   m4|
| e|    1|   m5|
+--+-----+-----+

进入 以下 结果数据框

 +--+--+--+--+--+--+
|A |m1|m2|m3|m4|m5|
+--+--+--+--+--+--+
| a| 1| 1| 2| 3| 0|
| b| 4| 2| 1| 0| 0|
| c| 3| 0| 4| 5| 0|
| d| 6| 1| 2| 3| 4|
| e| 4| 5| 1| 1| 1|
+--+--+--+--+--+--+

这是 转换规则

  1. 结果数据框由 A + (n major columns) 组成,其中 major 列名称由以下内容指定:
    sorted(src_df.map(lambda x: x[2]).distinct().collect())

  1. 结果数据框包含 m 行,其中 A 列的值由:
    sorted(src_df.map(lambda x: x[0]).distinct().collect())

  1. 结果数据帧中每个主要列的值是源数据帧中对应的 A 和主要列的值(例如,源数据帧中第 1 行中的计数映射到 box 其中 Aa 和列 m1

  2. Amajor 在源数据框中的组合没有重复(请将其视为SQL中两列的主键)

原文由 resec 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 637
2 个回答

让我们从示例数据开始:

 df = sqlContext.createDataFrame([
    ("a", 1, "m1"), ("a", 1, "m2"), ("a", 2, "m3"),
    ("a", 3, "m4"), ("b", 4, "m1"), ("b", 1, "m2"),
    ("b", 2, "m3"), ("c", 3, "m1"), ("c", 4, "m3"),
    ("c", 5, "m4"), ("d", 6, "m1"), ("d", 1, "m2"),
    ("d", 2, "m3"), ("d", 3, "m4"), ("d", 4, "m5"),
    ("e", 4, "m1"), ("e", 5, "m2"), ("e", 1, "m3"),
    ("e", 1, "m4"), ("e", 1, "m5")],
    ("a", "cnt", "major"))

请注意,我已将 count 更改为 cnt 。 Count 是大多数 SQL 方言中的保留关键字,它不是列名的好选择。

至少有两种方法可以重塑这些数据:

  • 通过 DataFrame 聚合
  from pyspark.sql.functions import col, when, max

  majors = sorted(df.select("major")
      .distinct()
      .map(lambda row: row[0])
      .collect())

  cols = [when(col("major") == m, col("cnt")).otherwise(None).alias(m)
      for m in  majors]
  maxs = [max(col(m)).alias(m) for m in majors]

  reshaped1 = (df
      .select(col("a"), *cols)
      .groupBy("a")
      .agg(*maxs)
      .na.fill(0))

  reshaped1.show()

  ## +---+---+---+---+---+---+
  ## |  a| m1| m2| m3| m4| m5|
  ## +---+---+---+---+---+---+
  ## |  a|  1|  1|  2|  3|  0|
  ## |  b|  4|  1|  2|  0|  0|
  ## |  c|  3|  0|  4|  5|  0|
  ## |  d|  6|  1|  2|  3|  4|
  ## |  e|  4|  5|  1|  1|  1|
  ## +---+---+---+---+---+---+

  • groupBy 超过RDD
   from pyspark.sql import Row

  grouped = (df
      .map(lambda row: (row.a, (row.major, row.cnt)))
      .groupByKey())

  def make_row(kv):
      k, vs = kv
      tmp = dict(list(vs) + [("a", k)])
      return Row(**{k: tmp.get(k, 0) for k in ["a"] + majors})

  reshaped2 = sqlContext.createDataFrame(grouped.map(make_row))

  reshaped2.show()

  ## +---+---+---+---+---+---+
  ## |  a| m1| m2| m3| m4| m5|
  ## +---+---+---+---+---+---+
  ## |  a|  1|  1|  2|  3|  0|
  ## |  e|  4|  5|  1|  1|  1|
  ## |  c|  3|  0|  4|  5|  0|
  ## |  b|  4|  1|  2|  0|  0|
  ## |  d|  6|  1|  2|  3|  4|
  ## +---+---+---+---+---+---+

原文由 zero323 发布,翻译遵循 CC BY-SA 3.0 许可协议

使用 zero323 的数据框,

 df = sqlContext.createDataFrame([
("a", 1, "m1"), ("a", 1, "m2"), ("a", 2, "m3"),
("a", 3, "m4"), ("b", 4, "m1"), ("b", 1, "m2"),
("b", 2, "m3"), ("c", 3, "m1"), ("c", 4, "m3"),
("c", 5, "m4"), ("d", 6, "m1"), ("d", 1, "m2"),
("d", 2, "m3"), ("d", 3, "m4"), ("d", 4, "m5"),
("e", 4, "m1"), ("e", 5, "m2"), ("e", 1, "m3"),
("e", 1, "m4"), ("e", 1, "m5")],
("a", "cnt", "major"))

你也可以使用

reshaped_df = df.groupby('a').pivot('major').max('cnt').fillna(0)

原文由 TrentWoodbury 发布,翻译遵循 CC BY-SA 3.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题