NIO配合多线程陷入无限循环

下面的服务器运行起来,当客户端多个请求过来的时候一直陷入死循环,selector.select();一直有值,不知道哪里出了问题,求大佬解释一下。

package com.wangjun.io.nio;

public class TimeServer {
    
    public static void main(String[] args) {
        int port = 8080;
        
        MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);
        new Thread(timeServer, "NIO_MultiplexerTimeServer-001").start();
    }

}
package com.wangjun.io.nio;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.Iterator;
import java.util.Set;

public class MultiplexerTimeServer implements Runnable {

    private Selector selector;
    private ServerSocketChannel servChannel;
    private volatile boolean stop;
    
    private final long SLEEP_TIME = 10000L;

    /**
     * 初始化多路复用器,绑定监听端口
     * 
     * @param port
     */
    public MultiplexerTimeServer(int port) {
        try {
            selector = Selector.open();
            servChannel = ServerSocketChannel.open();
            servChannel.configureBlocking(false);
            servChannel.socket().bind(new InetSocketAddress(port), 1024);//???1024这个参数什么作用
            servChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("The nio time server start in port:" + port);
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    public void stop() {
        this.stop = true;
    }

    @Override
    public void run() {
        while (!stop) {
            try {
                selector.select();
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectedKeys.iterator();
                SelectionKey key = null;
                System.out.println("key size=" + selectedKeys.size());
                while (it.hasNext()) {
                    System.out.println("it has next");
                    key = it.next();
                    it.remove();
                    try {
                        new Thread(new MyThread(key)).start();
                    } catch (Exception e) {
                        e.printStackTrace();
                        if (key != null) {
                            key.cancel();
                            if (key.channel() != null) {
                                key.channel().close();
                            }
                        }
                    }
                }
            } catch (Throwable t) {
                t.printStackTrace();
            }
        }
        //多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所有不需要重复释放资源
        if (selector != null) {
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void doWrite(SocketChannel channel, String response) throws IOException {
        if(response != null && response.trim().length() > 0) {
            byte[] bytes = response.getBytes();
            ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
            writeBuffer.put(bytes);
            writeBuffer.flip();
            channel.write(writeBuffer);
        }
    }
    
    class MyThread implements Runnable {
        
        private SelectionKey key;
        
        public MyThread(SelectionKey key) {
            this.key = key;
        }
        @Override
        public void run() {
            long startTime = System.currentTimeMillis();
            try {
                if (key.isValid()) {
                    // 处理新接入的请求消息
                    if (key.isAcceptable()) {
                        System.out.println("收到 accept 请求");
                        ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                        SocketChannel sc = ssc.accept();
                        sc.configureBlocking(false);
                        sc.register(selector, SelectionKey.OP_READ);
                    }
                    if (key.isReadable()) {
                        System.out.println("收到 read 请求,休眠" + SLEEP_TIME/1000 + "s");
                        try {
                            Thread.sleep(SLEEP_TIME);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        SocketChannel sc = (SocketChannel) key.channel();
                        ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                        int readBytes = sc.read(readBuffer);
                        if (readBytes > 0) {
                            readBuffer.flip();
                            byte[] bytes = new byte[readBuffer.remaining()];
                            readBuffer.get(bytes);
                            String body = new String(bytes, "UTF-8");
                            System.out.println("时间服务器收到的命令是:" + body);
                            String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)
                                    ? new Date(System.currentTimeMillis()).toString()
                                    : "BAD ORDER";
                            doWrite(sc, currentTime);
                        } else if (readBytes < 0) {
                            System.out.println("对端链路关闭");
                            // 对端链路关闭
                            key.cancel();
                            sc.close();
                        } else {
                            System.out.println("收到0字节");
                            // 读到0字节,忽略
                        }
                    }
                }else {
                    System.out.println("key is not valid");
                }
            } catch (ClosedChannelException e) {
                e.printStackTrace();
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                System.out.println("执行线程耗费:" + (System.currentTimeMillis() - startTime)/1000 + "s");
            }
        }
    }
}

客户端:

package com.wangjun.io.nio;

public class TimeClient {
    
    public static void main(String[] args) throws InterruptedException {
        int port = 8080;
        
        long start = System.currentTimeMillis();
        Thread t1 = new Thread(new TimeClientHandle("127.0.0.1", port), "TimeClient-001");
        Thread t2 = new Thread(new TimeClientHandle("127.0.0.1", port), "TimeClient-002");
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println("执行2个线程耗费:" + (System.currentTimeMillis() - start));
    }

}
package com.wangjun.io.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class TimeClientHandle implements Runnable {
    
    private String host;
    private int port;
    private Selector selector;
    private SocketChannel socketChannel;
    private volatile boolean stop;
    
    public TimeClientHandle(String host, int port) {
        this.host = host;
        this.port = port;
        try {
            selector = Selector.open();
            socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    @Override
    public void run() {
        try {
            doConnect();
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
        while(!stop) {
            try {
                selector.select();
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectedKeys.iterator();
                SelectionKey key = null;
                while (it.hasNext()) {
                    key = it.next();
                    it.remove();
                    try {
                        handleInput(key);
                    } catch (Exception e) {
                        if (key != null) {
                            key.cancel();
                            if (key.channel() != null) {
                                key.channel().close();
                            }
                        }
                    }
                }
            } catch (Throwable t) {
                t.printStackTrace();
            }
        }
        //多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所有不需要重复释放资源
        if (selector != null) {
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    
    private void handleInput(SelectionKey key) throws IOException {
        if (key.isValid()) {
            // 判断连接是否成功
            SocketChannel sc = (SocketChannel) key.channel();
            if(key.isConnectable()) {
                if(sc.finishConnect()) {
                    sc.register(selector, SelectionKey.OP_READ);
                    doWrite(sc);
                }else {
                    System.exit(1);
                }
            }

            if (key.isReadable()) {
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                int readBytes = sc.read(readBuffer);
                if (readBytes > 0) {
                    readBuffer.flip();
                    byte[] bytes = new byte[readBuffer.remaining()];
                    readBuffer.get(bytes);
                    String body = new String(bytes, "UTF-8");
                    System.out.println("现在的时间是:" + body);
                    this.stop = true;
                } else if (readBytes < 0) {
                    // 对端链路关闭
                    key.cancel();
                    sc.close();
                } else {
                    // 读到0字节,忽略
                }
            }
        }
    }
    
    private void doConnect() throws IOException {
        //如果直连成功,则注册到多路复用器上,发送请求消息,读应答
        if(socketChannel.connect(new InetSocketAddress(host, port))) {
            socketChannel.register(selector, SelectionKey.OP_READ);
            doWrite(socketChannel);
        }else {
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
        }
    }
    
    private void doWrite(SocketChannel sc) throws IOException {
        byte[] req = "QUERY TIME ORDER".getBytes();
        ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
        writeBuffer.put(req);
        writeBuffer.flip();
        sc.write(writeBuffer);
        if(!writeBuffer.hasRemaining()) {
            System.out.println("send order to server succeed!");
        }
    }
}

服务端和客户端都起来后,服务端的打印输出:

......
it has next
收到 read 请求,休眠10s
收到 read 请求,休眠10s
it has next
收到 read 请求,休眠10s
key size=2
it has next
it has next
收到 read 请求,休眠10s
收到 read 请求,休眠10s
key size=2
it has next
......

一直在打开线程,不断循环,不知道是哪里处理问题,在服务端命名已经remove掉了key。

阅读 2.3k
1 个回答

你的key没有cancel,而且如果有异常,new Thread(new MyThread(key)).start();这行代码下面的异常也不会被捕获,导致调用不到cancel

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题