使用 Netty 编写的 TCP 程序，分为 client、server、cli 三个部分。希望 cli 与 server 建立连接后发送一段数据，收到回复后立即退出；但现在 cli 可以正常收到回复，cli 进程却无法正常退出。
cli启动程序
/**
 * Connects to {@code host}:{@code port} (both declared outside the visible code), installs a
 * logging + protobuf varint32 codec pipeline terminated by {@code handler}, and blocks the
 * calling thread until the channel is closed. The worker event loop group is always shut
 * down afterwards, whether the connection succeeded, failed, or the wait was interrupted.
 *
 * <p>The shutdown itself is asynchronous: this method returns after requesting it, and the
 * outcome is reported on standard output by a listener.
 *
 * @param handler terminal inbound handler appended to the channel pipeline
 */
public void run (SimpleChannelInboundHandler handler) {
    EventLoopGroup worker = new NioEventLoopGroup(1);
    try {
        Bootstrap b = new Bootstrap();
        b.group(worker)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline()
                                .addLast("logger", new LoggingHandler(LogLevel.DEBUG))
                                .addLast(new ProtobufVarint32FrameDecoder())
                                .addLast(new ProtobufDecoder(Messages.msg.getDefaultInstance()))
                                .addLast(new ProtobufVarint32LengthFieldPrepender())
                                .addLast(new ProtobufEncoder())
                                .addLast(handler);
                    }
                });
        ChannelFuture future = b.connect(host, port).sync();
        System.out.println("connect complete");
        // Block here until the handler (or the peer) closes the channel.
        future.channel().closeFuture().sync();
        System.out.println("channel closed");
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers further up the stack can still observe it.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    } finally {
        System.out.println("working shutdown");
        // No further work is expected on this one-shot client, so skip the default quiet
        // period and bound the wait; otherwise the non-daemon I/O thread lingers and delays
        // JVM exit.
        final Future<?> f = worker.shutdownGracefully(0, 5, java.util.concurrent.TimeUnit.SECONDS);
        f.addListener(future -> {
            // Report the real outcome; the listener always receives the future it was
            // registered on, so an identity comparison tells nothing.
            if (future.isSuccess()) {
                System.out.println("closed");
            } else {
                System.out.println("cannot close");
            }
        });
    }
}
clihandler
/**
 * Inbound handler for the command-line client.
 *
 * <p>When the channel becomes active it sends an AUTH message, then sends the configured
 * commands in CMD messages of at most {@link #BATCH_NUM} commands each. Every inbound
 * message is printed and counted; once the count equals {@code commands.size()} the channel
 * is closed.
 *
 * <p>All callbacks and listeners here run on the channel's event loop, so nothing in this
 * class may block on a future ({@code sync()}/{@code await()}).
 */
public class CliInBoundHandler extends SimpleChannelInboundHandler<Messages.msg> {
    private final static InternalLogger logger = InternalLoggerFactory.getInstance("cli");

    /** Maximum number of commands packed into a single CMD message. */
    private static final int BATCH_NUM = 3;

    /** Commands to send to the server once the channel is active. */
    protected List<String> commands;

    /** Number of inbound messages processed so far. */
    private int counter = 0;

    /**
     * @param commands commands to send after authentication
     */
    public CliInBoundHandler(List<String> commands) {
        this.commands = commands;
    }

    /**
     * Prints the response carried by {@code msg}, counts it, and closes the channel once as
     * many messages as there are commands have been received.
     *
     * @param ctx handler context of the channel the message arrived on
     * @param msg decoded protobuf message
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Messages.msg msg) throws Exception {
        logger.info("receive a msg from {} \n {}",ctx.channel().remoteAddress(),msg);
        if (msg.getType() != Messages.msg.Type.RESPONSE) {
            // NOTE(review): a non-RESPONSE message is reported but still printed and counted
            // below — confirm this is intended.
            System.err.println("invalid response");
        }
        System.out.println(msg.getResponse().getResponse());
        counter++;
        // NOTE(review): commands are sent in batches of BATCH_NUM, yet the threshold is the
        // number of commands; this only terminates if the server answers once per command
        // — verify against the server.
        if (counter == commands.size()) {
            System.out.println("try to close ctx");
            ctx.close().addListener(closeResult -> {
                // Check the actual outcome of the close operation.
                if (closeResult.isSuccess()) {
                    System.out.println("channel closed in ctx");
                } else {
                    System.out.println("cannot close in ctx");
                }
            });
        }
    }

    /**
     * Sends the AUTH message and, once that write has succeeded, the command batches.
     *
     * <p>The auth write is chained with a listener instead of {@code sync()}: this method
     * runs on the event loop, where blocking on a not-yet-completed future is rejected by
     * Netty with a {@code BlockingOperationException}.
     *
     * @param ctx handler context of the newly active channel
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // auth
        Messages.Auth auth = Messages.Auth.newBuilder().setAuth(Messages.Auth.Type.cli).build();
        Messages.msg authMsg = Messages.msg.newBuilder()
                .setType(Messages.msg.Type.AUTH)
                .setAuth(auth)
                .build();
        ctx.writeAndFlush(authMsg).addListener(authResult -> {
            if (authResult.isSuccess()) {
                sendCommands(ctx);
            } else {
                // Surface the write failure through the pipeline's exception handling, as
                // the exception thrown by the former blocking wait would have been.
                ctx.pipeline().fireExceptionCaught(authResult.cause());
            }
        });
        super.channelActive(ctx);
    }

    /** Writes the commands to the channel in CMD messages of at most BATCH_NUM entries. */
    private void sendCommands(ChannelHandlerContext ctx) {
        for (int from = 0; from < commands.size(); from += BATCH_NUM) {
            int to = Math.min(from + BATCH_NUM, commands.size());
            Messages.Cmd batch = Messages.Cmd.newBuilder()
                    .addAllCmds(commands.subList(from, to))
                    .build();
            Messages.msg cmdMsg = Messages.msg.newBuilder()
                    .setType(Messages.msg.Type.CMD)
                    .setCmd(batch)
                    .build();
            ctx.writeAndFlush(cmdMsg);
        }
    }
}
这是 cli 运行的截图，可以看到所有的 debug 信息都正常打印出来了，但不知道还有哪个线程没有正常停止。尝试用 jconsole 查看过，但没有看出是哪个线程；而且打开 jconsole 后过一会儿程序就莫名其妙地停止了，而不打开 jconsole 时程序却一直无法停止。