有什么支持读写超时的阻塞式IO的JavaAPI呢?

比如我想获得一个 InputStream, 支持类似下面的API:

public int read(int timeout, byte b[]) throws IOException ;

当超时到达时,返回 0 字节或者 SocketTimeoutException

如果有 DataInputStream 的超时版本就更好了。
我希望将下面的内容改造成一个支持 超时设置的版本:

public static CommandPacket readNextPacket(DataInputStream in) throws IOException {
    CommandPacket packet = new CommandPacket();
    packet.serialNo = in.readLong();
    packet.cmdType = in.readByte();

    // 用于校验
    packet.crc = in.readByte();
    packet.packetLen = in.readInt();
    packet.payload = new byte[packet.packetLen];
    in.readFully(packet.payload);
    packet.tail = new byte[PACKAGE_SPLIT.length];

    // 用于校验
    in.readFully(packet.tail);

    if (!Arrays.equals(PACKAGE_SPLIT, packet.tail)) {
        throw new RuntimeException("无效的包数据!包分隔符读取不正确!");
    }

    return packet;
}

改造成:

public static CommandPacket readNextPacket(int timeout, DataInputStream in) throws IOException {}

另外,直接在socket上设置超时 socket.setSoTimeout()不符合我的要求,我需要在socket创建以后,根据需要调整超时时间,而不是一个固定的超时时间。

想了一下,应该有下面几种解决办法:

  1. 借助InputStream类的available方法。非阻塞返回当前可用的字节数。

    public int available() throws IOException {
    }

    缺点是线程需要不断轮询available状态,cpu资源被大量浪费;结论不可行。

  2. 还是用回 socket.setSoTimeout() ,每次都重设超时时间。虽然时间控制不精确,但好像勉强也可以用。
  3. 另起一个单独的线程,监控超时时间,当超时时间到达时, 利用Thread.interrupt()方法中断IO阻塞线程,阻塞的IO操作应该会丢出一个 InterruptedIOException 异常。从而结束等待。这种方法应该是可行的。晚一点试一下。

    public void Thread.interrupt() {}
  4. 和方法3思路一致,只是把IO线程和监控线程都包装成一个任务(Runnable、或者Callable),丢到线程池中执行。这时候应该可以使用 Future.cancel()方法中断阻塞的IO操作。Future.cancel()底层 估计也是使用 Thread.interrupt()来完成的(暂时是猜测,因为要结束另一个线程的IO阻塞操作,似乎只能用 interrupt这种方式了, 就像:结束线程池的方法ExecutorService.shutdownNow方法一样)。晚点试一下。

    boolean Future.cancel(boolean mayInterruptIfRunning);
    
    List<Runnable> ExecutorService.shutdownNow();

补充:
实现时候意外发现 Future.get方法本身支持超时设置,那么就简化多了:

V Future.get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;

最后的实现:

public static CommandPacket readNextPacket(DataInputStream in, Integer timeoutSeconds)
        throws IOException, TimeoutException {

    if (timeoutSeconds == null || timeoutSeconds.intValue() <= 0) {
        // 无超时
        return readNextPacket(in);
    }

    Future<CommandPacket> future = VsqlThreadPool.getPool().submit(() -> {
        return readNextPacket(in);
    });

    try {

        return future.get(timeoutSeconds.intValue(), TimeUnit.SECONDS);

    } catch (ExecutionException e) {
        Throwable ex = e.getCause();
        if (ex instanceof IOException) {
            throw (IOException) ex;
        } else {
            throw new RuntimeException(ex);
        }
    } catch (InterruptedException e) {

        // IO操作已终止,因为操作它的线程已中断
        throw new InterruptedIOException("IO操作已终止,因为操作它的线程已中断。");

    } catch (TimeoutException e) {

        // 中断任务
        future.cancel(true);

        throw e;
    }
}
阅读 608
撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题