PySpark:在日期为字符串的范围内按日期字段过滤 DataFrame

新手上路,请多包涵

我的数据框包含一个日期字段,它以字符串格式显示,例如

'2015-07-02T11:22:21.050Z'

我需要按日期过滤 DataFrame 以仅获取上周的记录。因此,我尝试了一种映射方法,我使用 strptime 将字符串日期转换为日期时间对象:

 def map_to_datetime(row):
     format_string = '%Y-%m-%dT%H:%M:%S.%fZ'
     row.date = datetime.strptime(row.date, format_string)

df = df.map(map_to_datetime)

然后我会应用一个过滤器作为

df.filter(lambda row:
    row.date >= (datetime.today() - timedelta(days=7)))

我设法让映射工作,但过滤器失败了

类型错误:条件应为字符串或列

有没有办法以有效的方式使用过滤,或者我应该改变方法以及如何改变?

原文由 mar tin 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 574
2 个回答

火花 >= 1.5

您可以使用 INTERVAL

 from pyspark.sql.functions import expr, current_date

df_casted.where(col("dt") >= current_date() - expr("INTERVAL 7 days"))

火花 < 1.5

您可以在不使用 worker 端 Python 代码并切换到 RDD 的情况下解决这个问题。首先,由于您使用 ISO 8601 字符串,您的数据可以直接转换为日期或时间戳:

 from pyspark.sql.functions import col

df = sc.parallelize([
    ('2015-07-02T11:22:21.050Z', ),
    ('2016-03-20T21:00:00.000Z', )
]).toDF(("d_str", ))

df_casted = df.select("*",
    col("d_str").cast("date").alias("dt"),
    col("d_str").cast("timestamp").alias("ts"))

这将节省 JVM 和 Python 之间的一次往返。还有一些方法可以处理第二部分。仅限日期:

 from pyspark.sql.functions import current_date, datediff, unix_timestamp

df_casted.where(datediff(current_date(), col("dt")) < 7)

时间戳:

 def days(i: int) -> int:
    return 60 * 60 * 24 * i

df_casted.where(unix_timestamp() - col("ts").cast("long") < days(7))

您还可以查看 current_timestampdate_sub

注意:我会避免使用 DataFrame.map 。最好使用 DataFrame.rdd.map 代替。切换到 2.0+ 时,它将为您节省一些工作

原文由 zero323 发布,翻译遵循 CC BY-SA 4.0 许可协议

我想出了一种方法来解决我的问题,方法是使用带有字符串格式日期的 SparkSQL API。

这是一个例子:

 last_week = (datetime.today() - timedelta(days=7)).strftime(format='%Y-%m-%d')

new_df = df.where(df.date >= last_week)

原文由 mar tin 发布,翻译遵循 CC BY-SA 4.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题