最近我在使用java的netty框架(4.1.6)来编写服务端的程序的时候遇到了一个麻烦的问题,这个问题一直困扰了我很久
我这边netty服务端在接受到客户端连接之后,会接受客户端传来的数据,并将数据在 EventExecutorGroup提供的线程中进行耗时业务处理,但是问题来了,当客户端向我发送一个数据时,比如发送的数据为A,有的时候EventExecutorGroup中会有多个线程(2~3)在不同的时间内接受这个A数据,并且进行处理,这使得我的数据库中总是会插入相同的数据
请问各位大神我该如何解决这个问题呢?
引导代码
public class ChiconyServerBootstrap
{
private int port;
private Conf MyConf;
public static String Path;
public ChiconyServerBootstrap(String Path)
{
try
{
//自定义配置文件类
MyConf = new Conf(Path);
//得到监听端口
port = MyConf.getPort();
}
catch(Exception e)
{
e.printStackTrace();
}
}
public void start()
{
ServerBootstrap serverbootstrap = new ServerBootstrap();
NioEventLoopGroup connect_group = new NioEventLoopGroup();
NioEventLoopGroup io_group = new NioEventLoopGroup();
final EventExecutorGroup work_group = new DefaultEventExecutorGroup(10);
try
{
serverbootstrap.group(connect_group, io_group);
serverbootstrap.channel(NioServerSocketChannel.class);
serverbootstrap.childHandler(new ChannelInitializer<SocketChannel>()
{
@Override
public void initChannel(SocketChannel ch) throws Exception
{
ch.pipeline().addLast(work_group, new WorkHandler());
}
});
serverbootstrap.childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 60);
serverbootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
erbootstrap.bind(new InetSocketAddress(port)).sync();
future.channel().closeFuture().sync();
}
catch(Exception e)
{
e.printStackTrace();
}
finally
{
connect_group.shutdownGracefully();
io_group.shutdownGracefully();
work_group.shutdownGracefully();
}
}
public static void main(String[] args)
{
ChiconyServerBootstrap server;
//得到配置文件
Path = "./Conf/config.properties";
server = new ChiconyServerBootstrap(Path);
server.start();
}
}
业务线程的handler
public class WorkHandler extends ChannelInboundHandlerAdapter
{
//一下代码中省略了相关的参数,log为一个自定义的日志类
//远程主机的IP
private String HostIP;
//远程主句的端口
private int HostPort;
//保存主机返回的数据
private byte[] HostRetData;
//定义一个参数配置类
private Conf myConf;
//保存客户端请求数据
private byte[] PosRequestData;
//一个buffer来保存POS传递到channelHandler的数据
private ByteBuffer PosSendData_Buffer;
//保存接受的全部的数据的长度
private int PosSendData_number;
//定义一个参数用来保存数据库的表明
private String TableName;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception
{
//得到参数配置类
myConf = new Conf(ChiconyServerBootstrap.Path);
HostIP = myConf.getHostIP();
HostPort = myConf.getHostPort();
TableName = myConf.getTableName();
RequestAH = myConf.getRequestAH();
PosSendData_Buffer = ByteBuffer.allocate(2048);
PosSendData_number = 0;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
{
byte[] tmp = (byte[])msg;
//重新整合已经取得的数据,防止channelRead多次调用
PosSendData_number += tmp.length;
for(int i = 0; i < tmp.length; i++)
{
PosSendData_Buffer.put(tmp[i]);
}
ReferenceCountUtil.release(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception
{
//将相关的数据进行业务处理,并且发向三方服务器之后将返回数据使用HostRetData保存
//将返回的数据返回给客户端
if(HostRetData != null)
{
//将数据传输到下一个ChannelOutboundHandler的缓存中
ByteBuf encoded = ctx.alloc().buffer(HostRetData.length);
encoded.writeBytes(HostRetData);
ctx.writeAndFlush(encoded);
ctx.close();
}
else
{
ctx.close();
}
//对返回的数据进行处理并且进行数据库插入操作,插入使用的原生的JDBC,数据库使用mysql
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
{
cause.printStackTrace();
}
}
解决了吗?