collect_list 通过保留基于另一个变量的顺序

新手上路,请多包涵

我正在尝试在现有列集上使用 groupby 聚合在 Pyspark 中创建一个新的列表列。下面提供了一个示例输入数据框:

 ------------------------
id | date        | value
------------------------
1  |2014-01-03   | 10
1  |2014-01-04   | 5
1  |2014-01-05   | 15
1  |2014-01-06   | 20
2  |2014-02-10   | 100
2  |2014-03-11   | 500
2  |2014-04-15   | 1500

预期的输出是:

 id | value_list
------------------------
1  | [10, 5, 15, 20]
2  | [100, 500, 1500]

列表中的值按日期排序。

我尝试使用 collect_list 如下:

 from pyspark.sql import functions as F
ordered_df = input_df.orderBy(['id','date'],ascending = True)
grouped_df = ordered_df.groupby("id").agg(F.collect_list("value"))

但是即使我在聚合之前按日期对输入数据帧进行排序,collect_list 也不能保证顺序。

有人可以通过保留基于第二个(日期)变量的顺序来帮助进行聚合吗?

原文由 Ravi 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 1.6k
2 个回答

如果您将日期和值都收集为列表,则可以使用 and udf 根据日期对结果列进行排序,然后仅保留结果中的值。

 import operator
import pyspark.sql.functions as F

# create list column
grouped_df = input_df.groupby("id") \
               .agg(F.collect_list(F.struct("date", "value")) \
               .alias("list_col"))

# define udf
def sorter(l):
  res = sorted(l, key=operator.itemgetter(0))
  return [item[1] for item in res]

sort_udf = F.udf(sorter)

# test
grouped_df.select("id", sort_udf("list_col") \
  .alias("sorted_list")) \
  .show(truncate = False)
+---+----------------+
|id |sorted_list     |
+---+----------------+
|1  |[10, 5, 15, 20] |
|2  |[100, 500, 1500]|
+---+----------------+

原文由 mtoto 发布,翻译遵循 CC BY-SA 3.0 许可协议

from pyspark.sql import functions as F
from pyspark.sql import Window

w = Window.partitionBy('id').orderBy('date')

sorted_list_df = input_df.withColumn(
            'sorted_list', F.collect_list('value').over(w)
        )\
        .groupBy('id')\
        .agg(F.max('sorted_list').alias('sorted_list'))

Window 用户提供的示例通常不能真正解释发生了什么,所以让我为您剖析一下。

如您所知,将 collect_listgroupBy 一起使用将生成 无序 列表的值。这是因为根据数据的分区方式,Spark 会在找到组中的行后立即将值附加到列表中。然后,顺序取决于 Spark 如何计划您对执行程序的聚合。

Window 函数允许您控制这种情况,按特定值对行进行分组,以便您可以执行操作 over 每个结果组:

 w = Window.partitionBy('id').orderBy('date')

  • partitionBy - 你想要具有相同的行的组/分区 id
  • orderBy - 您希望组中的每一行按 date 排序

一旦定义了窗口的范围 - “具有相同 id 的行,按 date 排序”-,您可以使用它对其执行操作,在这种情况下,一个 collect_list

 F.collect_list('value').over(w)

此时,您创建了一个新列 sorted_list ,其中包含按日期排序的有序值列表,但每个 id 仍然有重复的行。要删除您想要的重复行 groupBy id 并保留 max 每个组的值:

 .groupBy('id')\
.agg(F.max('sorted_list').alias('sorted_list'))

原文由 TMichel 发布,翻译遵循 CC BY-SA 4.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题