nio使用线程处理每一个读写请求的写法问题

问题描述

在写java nio的demo时,希望按照React的模式实现,对每一个已经就绪的请求,都在一个新的线程里处理。具体的代码如下面所示。

当一个客户端写完数据之后,断开连接时,会报错:

java.nio.channels.ClosedChannelException

显然这种写法是不合适或者不对的。看到网上用的是 attach() 方法

selectionKey.attach(new Acceptor(selector,selectionKey));

那么:
问题1: 我采用的写法,是哪里导致客户端关闭时报错,错误原因是什么?
问题2: attach()怎么用,为什么这种写法可以避免关闭客户端报错的情况?

相关代码

使用单独线程,进行select操作。

new Thread(() -> {
            try {
                while (selector.select() > 0) {
                    for (SelectionKey selectionKey : selector.selectedKeys()) {
                        try {
                            selector.selectedKeys().remove(selectionKey);
                            if (selectionKey.isAcceptable()) {
                                SocketChannel socketChannel = serverSocketChannel.accept();
                                socketChannel.configureBlocking(false);
                                socketChannel.register(selector, SelectionKey.OP_READ);
                            }
                            if (selectionKey.isReadable()) {
                                doRead(selectionKey);
                                
                            }
                        } catch (IOException e) {
                            selectionKey.cancel();
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();

当遇到可读事件时,在一个新的线程里进行处理。

public void doRead(SelectionKey selectionKey) {
        new Thread(() -> {
            try {
                SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                String content = "";
                int c = 0;
                while ((c = socketChannel.read(byteBuffer)) > 0) {
                    byteBuffer.flip();
                    content += charset.decode(byteBuffer);
                }
                System.out.println(content);
                selectionKey.interestOps(SelectionKey.OP_READ);
                if (c == -1) {
                    selectionKey.cancel();
                    socketChannel.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();
    }
阅读 2k
1 个回答
selector.selectedKeys().remove(selectionKey);

这个写的是有问题的,应该用迭代器Iterator来删除。

Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext())
            {
                SelectionKey sk = iterator.next();
                iterator.remove();
                
                .....
             }
             

你这个

  while ((c = socketChannel.read(byteBuffer)) > 0) {
  byteBuffer.flip();
  content += charset.decode(byteBuffer);
   }
      

byteBuffer应该clear()吧,要不然只能读一次,while循环没有用。
还有下面这个:

selectionKey.interestOps(SelectionKey.OP_READ);
                if (c == -1) {
                    selectionKey.cancel();
                    socketChannel.close();
                }
       
        
  

为什么用这个函数selectionKey.interestOps()?你的读事件已经注册过了,没有必要重复注册。
至于客户端断开连接报错,我没看懂你得意思,你这里selectionKey.cancel();socketChannel.close();难道不是服务端断开的连接码?

你这种写法,一可读就开启一个新的线程,但是可能main线程的下一次select()又发现可读,为什么?因为只要你不能保证你的线程在main线程select()之前读完所有数据,下一次select()就又会发现,那么就又开了一个线程,很可能你有好几个线程一起读到来的数据,然后读完之后某个线程就调用了socketChannel.close(),就把socket给结束了,那这样别的跟这个线程一起读的其他线程必然有异常。

如果用attach我觉得应该是这么用,当然我写的话就不用attach干这件事:

Acceptor acceptor = (Acceptor)selectionKey.attach();
if(acceptor == null)
selectionKey.attach(acceptor = new Acceptor(selectionKey));
else continue;
....

然后Acceptor线程结束之前,selectionKey.attach(null);
这样保证只用一个线程来读,当然上面的continue可以不写,这里只是为了表示逻辑更容易理解。

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题