使用谓词从 pyarrow.parquet.ParquetDataset 中过滤行

新手上路,请多包涵

我有一个 parquet 数据集存储在 s3 上,我想查询数据集中的特定行。我能够使用 petastorm 来做到这一点,但现在我只想使用 pyarrow 来做到这一点。

这是我的尝试:

 import pyarrow.parquet as pq
import s3fs

fs = s3fs.S3FileSystem()

dataset = pq.ParquetDataset(
    'analytics.xxx',
    filesystem=fs,
    validate_schema=False,
    filters=[('event_name', '=', 'SomeEvent')]
)

df = dataset.read_pandas().to_pandas()

但这会返回一个 pandas DataFrame,就好像过滤器不起作用一样,即我有各种值为 event_name 的行。有什么我想念的或我误解的东西吗?我可以在获取 pandas DataFrame 后进行过滤,但我会使用比需要更多的内存空间。

原文由 kluu 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 1.1k
2 个回答

对于来自 Google 的任何人,您现在可以在读取 Parquet 文件时过滤 PyArrow 中的行。无论您是通过 pandas 还是 pyarrow.parquet 阅读它。

文档 中:

filters (List[Tuple] or List[List[Tuple]] or None (default)) – 不匹配过滤谓词的行将从扫描数据中移除。嵌入在嵌套目录结构中的分区键将被利用来避免加载不包含匹配行的文件。如果 use_legacy_dataset 为 True,则过滤器只能引用分区键,并且仅支持 hive 样式的目录结构。将 use_legacy_dataset 设置为 False 时,还支持文件级过滤和不同的分区方案。

谓词以析取范式 (DNF) 表示,例如 [[(‘x’, ‘=’, 0), …], …]。 DNF 允许单列谓词的任意布尔逻辑组合。最里面的元组每个都描述一个列谓词。内部谓词列表被解释为连词 (AND),形成更具选择性的多列谓词。最后,最外层的列表将这些过滤器组合为析取 (OR)。

谓词也可以作为 List[Tuple] 传递。这种形式被解释为一个单一的连词。要在谓词中表达 OR,必须使用(首选)List[List[Tuple]] 表示法。

原文由 Niklas B 发布,翻译遵循 CC BY-SA 4.0 许可协议

注意:我在 这篇文章 中将其扩展为 Python 和 Parquet 的综合指南

镶木地板格式分区

为了使用过滤器,您需要使用分区以 Parquet 格式存储数据。与 CSV 相比,使用 Parquet 从许多列和分区中加载一些 Parquet 列和分区可以显着提高 I/O 性能。 Parquet 可以根据一个或多个字段的值对文件进行分区,并为嵌套值的唯一组合创建一个目录树,或者为一个分区列创建一组目录。 PySpark Parquet 文档 解释了 Parquet 的工作原理。

性别和国家的分区看起来像 这样

 path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...

如果您需要进一步分区数据,还有行组分区,但大多数工具仅支持指定行组大小,您必须自己执行 key-->row group 查找,这很丑陋(很高兴在另一个问题)。

用 Pandas 编写分区

您需要使用 Parquet 对数据进行分区,然后您可以使用过滤器加载它。对于大型数据集,您可以使用 PyArrow、pandas 或 DaskPySpark 将数据写入分区。

例如,要在 pandas 中编写分区:

 df.to_parquet(
    path='analytics.xxx',
    engine='pyarrow',
    compression='snappy',
    columns=['col1', 'col5'],
    partition_cols=['event_name', 'event_category']
)

这将文件布局如下:

 analytics.xxx/event_name=SomeEvent/event_category=SomeCategory/part-0001.c000.snappy.parquet
analytics.xxx/event_name=SomeEvent/event_category=OtherCategory/part-0001.c000.snappy.parquet
analytics.xxx/event_name=OtherEvent/event_category=SomeCategory/part-0001.c000.snappy.parquet
analytics.xxx/event_name=OtherEvent/event_category=OtherCategory/part-0001.c000.snappy.parquet

在 PyArrow 中加载 Parquet 分区

要使用分区列按一个属性获取事件,请将元组过滤器放入列表中:

 import pyarrow.parquet as pq
import s3fs

fs = s3fs.S3FileSystem()

dataset = pq.ParquetDataset(
    's3://analytics.xxx',
    filesystem=fs,
    validate_schema=False,
    filters=[('event_name', '=', 'SomeEvent')]
)
df = dataset.to_table(
    columns=['col1', 'col5']
).to_pandas()

使用逻辑与过滤

要使用 AND 获取具有两个或更多属性的事件,您只需创建一个过滤器元组列表:

 import pyarrow.parquet as pq
import s3fs

fs = s3fs.S3FileSystem()

dataset = pq.ParquetDataset(
    's3://analytics.xxx',
    filesystem=fs,
    validate_schema=False,
    filters=[
        ('event_name',     '=', 'SomeEvent'),
        ('event_category', '=', 'SomeCategory')
    ]
)
df = dataset.to_table(
    columns=['col1', 'col5']
).to_pandas()

使用逻辑 OR 过滤

要使用 OR 获取两个事件,您需要将过滤器元组嵌套在它们自己的列表中:

 import pyarrow.parquet as pq
import s3fs

fs = s3fs.S3FileSystem()

dataset = pq.ParquetDataset(
    's3://analytics.xxx',
    filesystem=fs,
    validate_schema=False,
    filters=[
        [('event_name', '=', 'SomeEvent')],
        [('event_name', '=', 'OtherEvent')]
    ]
)
df = dataset.to_table(
    columns=['col1', 'col5']
).to_pandas()

使用 AWS Data Wrangler 加载 Parquet 分区

正如提到的另一个答案,无论数据位于何处(本地或云中),将数据过滤加载到某些分区中的某些列的最简单方法是使用 awswrangler 模块。如果您使用的是 S3,请查看 awswrangler.s3.read_parquet()awswrangler.s3.to_parquet() 的文档。过滤的工作方式与上面的示例相同。

 import awswrangler as wr

df = wr.s3.read_parquet(
    path="analytics.xxx",
    columns=["event_name"],
    filters=[('event_name', '=', 'SomeEvent')]
)

加载 Parquet 分区 pyarrow.parquet.read_table()

如果您使用的是 PyArrow,您还可以使用 pyarrow.parquet.read_table()

 import pyarrow.parquet as pq

fp = pq.read_table(
    source='analytics.xxx',
    use_threads=True,
    columns=['some_event', 'some_category'],
    filters=[('event_name', '=', 'SomeEvent')]
)
df = fp.to_pandas()

使用 PySpark 加载 Parquet 分区

最后,在 PySpark 中,您可以使用 pyspark.sql.DataFrameReader.read_parquet()

 import pyspark.sql.functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]") \
                    .appName('Stack Overflow Example Parquet Column Load') \
                    .getOrCreate()

# I automagically employ Parquet structure to load the selected columns and partitions
df = spark.read.parquet('s3://analytics.xxx') \
          .select('event_name', 'event_category') \
          .filter(F.col('event_name') == 'SomeEvent')

希望这可以帮助您使用 Parquet :)

原文由 rjurney 发布,翻译遵循 CC BY-SA 4.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题