netty服务端EventExecutorGroup业务线程组中多个不同的线程重复处理同一个数据

最近我在使用java的netty框架(4.1.6)来编写服务端的程序的时候遇到了一个麻烦的问题,这个问题一直困扰了我很久

我这边netty服务端在接受到客户端连接之后,会接受客户端传来的数据,并将数据在 EventExecutorGroup提供的线程中进行耗时业务处理,但是问题来了,当客户端向我发送一个数据时,比如发送的数据为A,有的时候EventExecutorGroup中会有多个线程(2~3)在不同的时间内接受这个A数据,并且进行处理,这使得我的数据库中总是会插入相同的数据

请问各位大神我该如何解决这个问题呢?

引导代码

public class ChiconyServerBootstrap 
{    
    private int port;
    private Conf MyConf;
    public static String Path;
    
    public ChiconyServerBootstrap(String Path)
    {        
        try
        {
            //自定义配置文件类
            MyConf = new Conf(Path);    
            //得到监听端口
            port = MyConf.getPort();
        }
        catch(Exception e)
        {
            e.printStackTrace();
        }
    }
    
    public void start()
    {
        ServerBootstrap serverbootstrap = new ServerBootstrap();
        NioEventLoopGroup connect_group = new NioEventLoopGroup();
        NioEventLoopGroup io_group = new NioEventLoopGroup();
        final EventExecutorGroup work_group = new DefaultEventExecutorGroup(10);
        
        try
        {
            serverbootstrap.group(connect_group, io_group);
            serverbootstrap.channel(NioServerSocketChannel.class);
            serverbootstrap.childHandler(new ChannelInitializer<SocketChannel>()
            {
                @Override
                public void initChannel(SocketChannel ch) throws Exception
                {
                    ch.pipeline().addLast(work_group, new WorkHandler());
                }
            });
            
            serverbootstrap.childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 60);
            serverbootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);        
            erbootstrap.bind(new InetSocketAddress(port)).sync();
            future.channel().closeFuture().sync();
        }
        catch(Exception e)
        {
            e.printStackTrace();
        }
        finally
        {
            connect_group.shutdownGracefully();
            io_group.shutdownGracefully();
            work_group.shutdownGracefully();
        }
    }
    
    public static void main(String[] args)
    {
        ChiconyServerBootstrap server;
        //得到配置文件
        Path = "./Conf/config.properties";
        server = new ChiconyServerBootstrap(Path);
        server.start();
    }
}

业务线程的handler

public class WorkHandler extends ChannelInboundHandlerAdapter 
{
    //一下代码中省略了相关的参数,log为一个自定义的日志类
    
    //远程主机的IP
    private String HostIP;
    //远程主句的端口
    private int HostPort;
    //保存主机返回的数据
    private byte[] HostRetData;
    //定义一个参数配置类
    private Conf myConf;
    //保存客户端请求数据
    private byte[] PosRequestData;
    //一个buffer来保存POS传递到channelHandler的数据
    private ByteBuffer PosSendData_Buffer;
    //保存接受的全部的数据的长度
    private int PosSendData_number;
    //定义一个参数用来保存数据库的表明
    private String TableName;
    
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception
    {        
        //得到参数配置类 
        myConf = new Conf(ChiconyServerBootstrap.Path);
        HostIP = myConf.getHostIP();
        HostPort = myConf.getHostPort();
        TableName = myConf.getTableName();
        RequestAH = myConf.getRequestAH();
        
        PosSendData_Buffer = ByteBuffer.allocate(2048);
        PosSendData_number = 0;
    }
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
    {
        byte[] tmp = (byte[])msg;
        
        //重新整合已经取得的数据,防止channelRead多次调用
        PosSendData_number += tmp.length;
        for(int i = 0; i < tmp.length; i++)
        {
            PosSendData_Buffer.put(tmp[i]);
        }
        
        ReferenceCountUtil.release(msg);
    }
    
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception
    {                
        //将相关的数据进行业务处理,并且发向三方服务器之后将返回数据使用HostRetData保存
        
        //将返回的数据返回给客户端
        if(HostRetData != null)
        {
            //将数据传输到下一个ChannelOutboundHandler的缓存中
            ByteBuf encoded = ctx.alloc().buffer(HostRetData.length);
            encoded.writeBytes(HostRetData);
            ctx.writeAndFlush(encoded);
            ctx.close();
        }
        else
        {
            ctx.close();
        }
        
        //对返回的数据进行处理并且进行数据库插入操作,插入使用的原生的JDBC,数据库使用mysql
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
    {    
        cause.printStackTrace(); 
    }
}
阅读 8k
2 个回答
新手上路,请多包涵

解决了吗?

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题