带有结果集的 java.util.stream

新手上路,请多包涵

我的数据量很大(大约 1 亿条记录)的表很少。所以我无法将此数据存储在内存中,但我想使用 java.util.stream 类流式传输此 结果集,并将此流传递给另一个类。我阅读了 Stream.ofStream.Builder 运算符,但它们是内存中的缓冲流。那么有什么办法可以解决这个问题呢?提前致谢。

更新 #1

好的,我用谷歌搜索并找到 了 jooq 库。我不确定,但看起来它可能适用于我的测试用例。总而言之,我几乎没有包含大量数据的表。我想流式传输我的结果集并将此流传输到另一种方法。是这样的:

 // why return Stream<String>? Because my result set has String type
private Stream<Record> writeTableToStream(DataSource dataSource, String table) {

    Stream<Record> record = null;
    try (Connection connection = dataSource.getConnection()) {
        String sql = "select * from " + table;

        try (PreparedStatement pSt = connection.prepareStatement(sql)) {
            connection.setAutoCommit(false);
            pSt.setFetchSize(5000);
            ResultSet resultSet = pSt.executeQuery();
            //
            record = DSL.using(connection)
                    .fetch(resultSet).stream();
        }
    } catch (SQLException sqlEx) {
        logger.error(sqlEx);
    }

    return record;
}

可以请别人建议,我的方法正确吗?谢谢。

更新 #2

我在 jooq 上做了一些实验,现在可以说上面的决定不适合我。此代码 record = DSL.using(connection).fetch(resultSet).stream(); 花费太多时间

原文由 Iurii 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 311
1 个回答

您必须了解的第一件事是像这样的代码

try (Connection connection = dataSource.getConnection()) {
    …
    try (PreparedStatement pSt = connection.prepareStatement(sql)) {
        …
        return stream;
    }
}

在您离开 try 块时不起作用,资源已关闭,而 Stream 的处理甚至还没有开始。

资源管理结构“try with resources”适用于方法内块范围内使用的资源,但您正在创建返回资源的工厂方法。因此,您必须确保返回流的关闭将关闭资源,并且调用者负责关闭 Stream


此外,您需要一个函数,它从 ResultSet 的一行中生成一个项目。假设,你有一个像

Record createRecord(ResultSet rs) {
    …
}

你可以创建一个 Stream<Record> 基本上就像

Stream<Record> stream = StreamSupport.stream(new Spliterators.AbstractSpliterator<Record>(
    Long.MAX_VALUE,Spliterator.ORDERED) {
        @Override
        public boolean tryAdvance(Consumer<? super Record> action) {
            if(!resultSet.next()) return false;
            action.accept(createRecord(resultSet));
            return true;
        }
    }, false);

但是要正确地做到这一点,您必须合并异常处理和资源关闭。您可以使用 Stream.onClose 注册将在 Stream 关闭时执行的操作,但它必须是 Runnable 无法抛出异常。同样 tryAdvance 方法不允许抛出已检查的异常。而且由于我们不能简单地嵌套 try(…) 块在这里,在 close 抛出抑制异常的程序逻辑,当已经有一个挂起的异常时,不是免费的。

为了在这里帮助我们,我们引入了一种新类型,它可以包装可能抛出已检查异常的关闭操作,并将它们包装在未检查异常中。通过实施 AutoCloseable 本身,它可以利用 try(…) 构造安全地链接关闭操作:

 interface UncheckedCloseable extends Runnable, AutoCloseable {
    default void run() {
        try { close(); } catch(Exception ex) { throw new RuntimeException(ex); }
    }
    static UncheckedCloseable wrap(AutoCloseable c) {
        return c::close;
    }
    default UncheckedCloseable nest(AutoCloseable c) {
        return ()->{ try(UncheckedCloseable c1=this) { c.close(); } };
    }
}

这样,整个操作就变成了:

 private Stream<Record> tableAsStream(DataSource dataSource, String table)
    throws SQLException {

    UncheckedCloseable close=null;
    try {
        Connection connection = dataSource.getConnection();
        close=UncheckedCloseable.wrap(connection);
        String sql = "select * from " + table;
        PreparedStatement pSt = connection.prepareStatement(sql);
        close=close.nest(pSt);
        connection.setAutoCommit(false);
        pSt.setFetchSize(5000);
        ResultSet resultSet = pSt.executeQuery();
        close=close.nest(resultSet);
        return StreamSupport.stream(new Spliterators.AbstractSpliterator<Record>(
            Long.MAX_VALUE,Spliterator.ORDERED) {
            @Override
            public boolean tryAdvance(Consumer<? super Record> action) {
                try {
                    if(!resultSet.next()) return false;
                    action.accept(createRecord(resultSet));
                    return true;
                } catch(SQLException ex) {
                    throw new RuntimeException(ex);
                }
            }
        }, false).onClose(close);
    } catch(SQLException sqlEx) {
        if(close!=null)
            try { close.close(); } catch(Exception ex) { sqlEx.addSuppressed(ex); }
        throw sqlEx;
    }
}

此方法包装了所有资源的必要关闭操作, ConnectionStatementResultSet 在上述实用程序类的一个实例中。如果在初始化过程中发生异常,则立即执行关闭操作并将异常传递给调用者。如果流构造成功,则通过 onClose 注册关闭操作。

因此,调用者必须确保正确关闭,例如

try(Stream<Record> s=tableAsStream(dataSource, table)) {
    // stream operation
}

请注意,通过 RuntimeException 交付的 SQLException 已添加到 tryAdvance 方法中。因此,您现在可以 throws SQLException 添加到 createRecord 方法。

原文由 Holger 发布,翻译遵循 CC BY-SA 3.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题