具有批处理功能的 Java 8 Stream

新手上路,请多包涵

我有一个包含项目列表的大文件。

我想创建一批项目,使用该批次发出 HTTP 请求(所有项目都需要作为 HTTP 请求中的参数)。我可以用 for 循环很容易地做到这一点,但作为 Java 8 爱好者,我想尝试用 Java 8 的 Stream 框架编写它(并获得延迟处理的好处)。

例子:

 List<String> batch = new ArrayList<>(BATCH_SIZE);
for (int i = 0; i < data.size(); i++) {
  batch.add(data.get(i));
  if (batch.size() == BATCH_SIZE) process(batch);
}

if (batch.size() > 0) process(batch);

我想做一些很长的事情 lazyFileStream.group(500).map(processBatch).collect(toList())

最好的方法是什么?

原文由 Andy Dang 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 611
2 个回答

笔记! 此解决方案在运行 forEach 之前读取整个文件。

您可以使用 jOOλ 来做到这一点,这是一个为单线程、顺序流用例扩展 Java 8 流的库:

 Seq.seq(lazyFileStream)              // Seq<String>
   .zipWithIndex()                   // Seq<Tuple2<String, Long>>
   .groupBy(tuple -> tuple.v2 / 500) // Map<Long, List<String>>
   .forEach((index, batch) -> {
       process(batch);
   });

在幕后, zipWithIndex() 只是:

 static <T> Seq<Tuple2<T, Long>> zipWithIndex(Stream<T> stream) {
    final Iterator<T> it = stream.iterator();

    class ZipWithIndex implements Iterator<Tuple2<T, Long>> {
        long index;

        @Override
        public boolean hasNext() {
            return it.hasNext();
        }

        @Override
        public Tuple2<T, Long> next() {
            return tuple(it.next(), index++);
        }
    }

    return seq(new ZipWithIndex());
}

… 而 groupBy() 是 API 的便利:

 default <K> Map<K, List<T>> groupBy(Function<? super T, ? extends K> classifier) {
    return collect(Collectors.groupingBy(classifier));
}

(免责声明:我为 jOOλ 背后的公司工作)

原文由 Lukas Eder 发布,翻译遵循 CC BY-SA 4.0 许可协议

为了完整起见,这里有一个 Guava 解决方案。

 Iterators.partition(stream.iterator(), batchSize).forEachRemaining(this::process);

在这个问题中,集合是可用的,所以不需要流,它可以写成,

 Iterables.partition(data, batchSize).forEach(this::process);

原文由 Ben Manes 发布,翻译遵循 CC BY-SA 3.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题