netty实现聊天室,客户端发送的消息(使用writeandflush),无法发送,服务器的channelread0没有被调用

X1aoHei
  • 1
新手上路,请多包涵

问题描述

client接受到控制台一行数据,用writeandflush发送,服务器端channelread0没有触发。
输入:在控制台输入“123”然后按回车健。

问题出现的环境背景及自己尝试过哪些方法

输入:在控制台输入“123”然后按回车健。

相关代码

// 请把代码文本粘贴到下方(请勿用图片代替代码)

MyServer.java

package com.wss.netty.thirdexample;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * @ProjectName: netty_lecture
 * @Package: com.wss.netty.thirdexample
 * @ClassName: MyServer
 * @Author: wss
 * @Description: ${description}
 * @Date: 2019/9/8 19:04
 * @Version: 1.0
 */
public class MyServer {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup bossgroup = new NioEventLoopGroup();
        EventLoopGroup workergroup = new NioEventLoopGroup();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossgroup, workergroup).channel(NioServerSocketChannel.class)
                    .childHandler(new MyChatServerInitizlization());
            ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
            channelFuture.channel().closeFuture().sync();
        }finally{
            bossgroup.shutdownGracefully();
            workergroup.shutdownGracefully();
        }
    }
}

MyChatServerInitizlization.java

package com.wss.netty.thirdexample;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

/**
 * @ProjectName: netty_lecture
 * @Package: com.wss.netty.thirdexample
 * @ClassName: MyChatServerInitizlization
 * @Author: wss
 * @Description: ${description}
 * @Date: 2019/9/8 19:07
 * @Version: 1.0
 */
public class MyChatServerInitizlization extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();

        pipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()));
        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
        pipeline.addLast(new MyChatServerHandler());
    }
}

MyChatServerHandler.java

package com.wss.netty.thirdexample;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

/**
 * @ProjectName: netty_lecture
 * @Package: com.wss.netty.thirdexample
 * @ClassName: MyChatServerHandler
 * @Author: wss
 * @Description: ${description}
 * @Date: 2019/9/8 19:12
 * @Version: 1.0
 */
public class MyChatServerHandler extends SimpleChannelInboundHandler<String> {

    private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        System.out.println(123);
        Channel channel = channelHandlerContext.channel();
        System.out.println(123);
        System.out.println(s);
        channelGroup.forEach(ch -> {
            if(channel != ch){
                ch.writeAndFlush(ch.remoteAddress() + "发送的消息是" + s);
            }else{
                ch.writeAndFlush("【自己】:" + s);
            }
        });
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        channelGroup.writeAndFlush("【服务器】 - " + channel.remoteAddress() + "加入\n");
        channelGroup.add(channel);
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        //channelGroup.remove(channel);  //netty自动删除
        channelGroup.writeAndFlush("【服务器】 - " + channel.remoteAddress() + "断开连接\n");
        System.out.println(channelGroup.size());
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        System.out.println(channel.remoteAddress() + "上线");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        System.out.println(channel.remoteAddress() + "下线");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

MyClient.java

package com.wss.netty.thirdexample;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

/**
 * @ProjectName: netty_lecture
 * @Package: com.wss.netty.thirdexample
 * @ClassName: MyClient
 * @Author: wss
 * @Description: ${description}
 * @Date: 2019/9/8 20:18
 * @Version: 1.0
 */
public class MyClient {
    public static void main(String[] args) throws InterruptedException, IOException {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap  = new Bootstrap();
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new MyChatClientInitialization());
            Channel channel = bootstrap.connect("127.0.0.1", 8899).sync().channel();
            //不断读取控制台输入的程序
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
            System.out.println(123);
            for(;;){
                System.out.println(456);
                System.out.println(bufferedReader.readLine()+"1234");
                channel.writeAndFlush(bufferedReader.readLine() + "\r\n");
            }
        }finally{
            eventLoopGroup.shutdownGracefully();
        }
    }
}

MyChatClientInitialization.java

package com.wss.netty.thirdexample;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

import java.nio.channels.Channel;

/**
 * @ProjectName: netty_lecture
 * @Package: com.wss.netty.thirdexample
 * @ClassName: MyChatClientInitialization
 * @Author: wss
 * @Description: ${description}
 * @Date: 2019/9/8 20:22
 * @Version: 1.0
 */
public class MyChatClientInitialization extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();

        pipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()));
        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
        pipeline.addLast(new MyChatClientHandler());
    }
}

MyChatClientHandler.java

package com.wss.netty.thirdexample;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * @ProjectName: netty_lecture
 * @Package: com.wss.netty.thirdexample
 * @ClassName: MyChatClientHandler
 * @Author: wss
 * @Description: ${description}
 * @Date: 2019/9/8 20:24
 * @Version: 1.0
 */
public class MyChatClientHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        System.out.println(s);

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

你期待的结果是什么?实际看到的错误信息又是什么?

期待的结果是启动多个客户端,一个客户端发送消息,另一个客户端也能查看到,实现广播的效果。
在控制台输入“123”,然后回车,其他客户端能够查看到消息。
实际的结果是其他客户端没有反映,服务器端channelread0没有被调用。

回复
阅读 11.2k
2 个回答
努力绽放光芒
  • 1
新手上路,请多包涵

在for循环那儿也需要在channel.writeAndFlush里面的每条消息后面加上"n"

@Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        Channel channel = channelHandlerContext.channel();
        System.out.println(s);
        channelGroup.forEach(ch -> {
            if(channel != ch){
                ch.writeAndFlush(ch.remoteAddress() + "发送的消息是" + s+"\n");
            }else{
                ch.writeAndFlush("【自己】:" + s+"\n");
            }
        });
    }
mrjiaye
  • 2
新手上路,请多包涵

image.png

将自己的逻辑实现handler 放在解码器上面试一下

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
宣传栏