如何将巨大的熊猫数据框保存到 hdfs?

新手上路,请多包涵

我正在使用 pandas 和 spark 数据框。数据帧总是非常大(> 20 GB),标准的 spark 函数不足以满足这些大小。目前我正在将我的熊猫数据框转换为这样的火花数据框:

 dataframe = spark.createDataFrame(pandas_dataframe)

我进行这种转换是因为使用 spark 将数据帧写入 hdfs 非常容易:

 dataframe.write.parquet(output_uri, mode="overwrite", compression="snappy")

但是对于大于 2 GB 的数据帧,转换失败。如果我将 spark 数据框转换为 pandas,我可以使用 pyarrow:

 // temporary write spark dataframe to hdfs
dataframe.write.parquet(path, mode="overwrite", compression="snappy")

// open hdfs connection using pyarrow (pa)
hdfs = pa.hdfs.connect("default", 0)
// read parquet (pyarrow.parquet (pq))
parquet = pq.ParquetDataset(path_hdfs, filesystem=hdfs)
table = parquet.read(nthreads=4)
// transform table to pandas
pandas = table.to_pandas(nthreads=4)

// delete temp files
hdfs.delete(path, recursive=True)

这是从 spark 到 pandas 的快速转换,它也适用于大于 2 GB 的数据帧。我还找不到相反的方法。意思是有一个 pandas 数据框,我在 pyarrow 的帮助下将其转换为 spark。问题是我真的找不到如何将 pandas 数据帧写入 hdfs。

我的熊猫版本:0.19.0

原文由 Mulgard 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 1.1k
2 个回答

意思是有一个 pandas 数据框,我在 pyarrow 的帮助下将其转换为 spark。

pyarrow.Table.fromPandas 是您正在寻找的功能:

>  Table.from_pandas(type cls, df, bool timestamps_to_ms=False, Schema schema=None, bool preserve_index=True)
>
> Convert pandas.DataFrame to an Arrow Table
>
> ```

import pyarrow as pa

pdf = … # type: pandas.core.frame.DataFrame adf = pa.Table.from_pandas(pdf) # type: pyarrow.lib.Table


结果可以直接写入 Parquet / HDFS 而无需通过 Spark 传递数据:

import pyarrow.parquet as pq

fs = pa.hdfs.connect()

with fs.open(path, “wb”) as fw pq.write_table(adf, fw) “`

也可以看看

火花笔记

此外,由于 Spark 2.3(当前主控)Arrow 在 createDataFrameSPARK-20791 - 使用 Apache Arrow 从 Pandas.DataFrame 改进 Spark createDataFrame )中得到直接支持。它 使用 SparkContext.defaultParallelism 计算块数, 因此您可以轻松控制单个批次的大小。

最后 defaultParallelism 可用于控制使用标准生成的分区数 _convert_from_pandas 有效地将切片的大小减小到更易于管理的程度。

不幸的是,这些不太可能解决您 当前的内存问题。两者都依赖于 parallelize ,因此将所有数据存储在驱动程序节点的内存中。切换到 Arrow 或调整配置只能加速进程或解决块大小限制。

实际上,只要您使用本地 Pandas DataFrame 作为输入,我看不出有任何理由在这里切换到 Spark。这种情况下最严重的瓶颈是驱动程序的网络 I/O,而分发数据无法解决这个问题。

原文由 zero323 发布,翻译遵循 CC BY-SA 3.0 许可协议

另一种方法是将您的 pandas 数据帧转换为 spark 数据帧(使用 pyspark)并使用 save 命令将其保存到 hdfs。例子

    df = pd.read_csv("data/as/foo.csv")
    df[['Col1', 'Col2']] = df[['Col2', 'Col2']].astype(str)
    sc = SparkContext(conf=conf)
    sqlCtx = SQLContext(sc)
    sdf = sqlCtx.createDataFrame(df)

这里 astype 将列的类型从 object --- 更改为 string 。这可以避免引发异常,因为 spark 无法弄清楚熊猫类型 object 。但要确保这些列确实是字符串类型。

现在将您的 df 保存在 hdfs 中:

     sdf.write.csv('mycsv.csv')

原文由 lego king 发布,翻译遵循 CC BY-SA 4.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题