How should I use EventExecutorGroup to run time-consuming operations in Netty? An example would be appreciated if possible.

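I ran into a problem while writing a Netty server. After some research I learned that time-consuming tasks in Netty should run in an EventExecutorGroup. I am combining a ChannelInboundHandlerAdapter with an EventExecutorGroup, that is, I register the ChannelInboundHandler with the EventExecutorGroup so that the group runs the time-consuming work, which I mostly do in the handler's channelReadComplete. I still get errors, though, and would like to know how to use EventExecutorGroup correctly for time-consuming tasks.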

1 Answer

Please refer to this answer of mine (link).

Here is some supplementary code. It is not complete, but it should serve as a reference.

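import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

// The same chanHandler instance is added to every channel's pipeline, so it must be marked @Sharable.
@ChannelHandler.Sharable
public class ChanHandler extends ChannelInboundHandlerAdapter {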

    private static final Logger logger = LoggerFactory.getLogger(ChanHandler.class);

    @Resource
    private BusinessService businessService;
    @Resource
    private ChannelManager channelManager;


    /**
     * Called when a connection is established.
     * @param ctx
     */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) {

        //...
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) {

        //...
    }

    @Override
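    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        try {
            // StringDecoder earlier in the pipeline has already turned the frame into a String.
            String requestStr = (String)msg;
            // ctx.executor() is the executor this handler was registered with (taken from
            // logicGroup in the bootstrap code), so the slow business logic stays off the I/O thread.
            if(ctx.executor().inEventLoop()){
                businessLogic(ctx,requestStr);
            }else {
                ctx.executor().execute(() -> businessLogic(ctx,requestStr));
            }

        }finally {
            // No-op for a String; only reference-counted messages are actually released.
            ReferenceCountUtil.release(msg);
        }
    }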

    /**
     * Handles the business logic.
     * @param ctx
     * @param requestStr
     */
    private void businessLogic(ChannelHandlerContext ctx,String requestStr){
        Response res = null;
        try {
            JSONObject request = JSON.parseObject(requestStr);
            switch (request.getString("action")){
                case "dev_login":
                    res = businessService.dev_login(request,ctx);
                    break;
                case "ping":
                    res = new Response("pong",null);
                    break;
                case "msg":
                    res = businessService.processing_msg(request,ctx);
                    break;
                case "quit":
                    // No response is built for "quit"; res stays null here.
                    String key = ConnUtils.getKey(ctx);
                    channelManager.removeConnection(key);
                    break;
            }
        }catch (Exception e){
            res = new Response("error",400,"Unparseable input","");
        }finally {
            IOUtil.writeAndFlush(ctx,res);
        }
    }
    
    // ...
}
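
// Server bootstrap. bossGroup accepts connections, workerGroup handles socket I/O.
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();

// Separate 16-thread pool for business logic, so slow work does not block the I/O threads.
EventExecutorGroup logicGroup = new DefaultEventExecutorGroup(16);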

try {

    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.group(bossGroup,workerGroup);
    bootstrap.channel(NioServerSocketChannel.class);
    bootstrap.handler(new LoggingHandler(LogLevel.INFO));
    bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel socketChannel) {
            ChannelPipeline pipeline = socketChannel.pipeline();
            pipeline.addLast(new LoggingHandler(LogLevel.INFO));

            ByteBuf byteBuf = Unpooled.copiedBuffer(Const.DELIMITER.getBytes());
            pipeline.addLast(new DelimiterBasedFrameDecoder(1024,byteBuf));

            pipeline.addLast(new IdleStateHandler(readWaitSeconds, 0, 0));

            pipeline.addLast(new StringDecoder());
            pipeline.addLast(new StringEncoder());

            // Run chanHandler's callbacks on logicGroup instead of the channel's I/O event loop.
            pipeline.addLast(logicGroup,chanHandler);
        }
    });

    bootstrap.option(ChannelOption.SO_BACKLOG, 128);
    bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);

    ChannelFuture future = bootstrap.bind(port).sync();

    future.channel().closeFuture().sync();

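} catch (InterruptedException e) {
    // Restore the interrupt flag and log the exception with its stack trace.
    Thread.currentThread().interrupt();
    logger.error("Server error", e);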
} finally {
    bossGroup.shutdownGracefully();
    workerGroup.shutdownGracefully();
    logicGroup.shutdownGracefully();
}
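
To make it clearer what registering the handler with logicGroup gives you: as far as I understand it, a DefaultEventExecutorGroup is a set of single-threaded executors, and each channel is pinned to one of them. Slow work therefore stays off the I/O threads, while messages from one connection are still processed in order. Below is a rough analogy using only the JDK. It is not Netty's implementation; `PinnedExecutorSketch`, `PinnedGroup` and `run` are names made up for this illustration, and the 5 ms sleep stands in for the slow business logic.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PinnedExecutorSketch {

    /** Hypothetical stand-in for an executor group: N single-threaded executors. */
    static final class PinnedGroup {
        private final ExecutorService[] executors;

        PinnedGroup(int size) {
            executors = new ExecutorService[size];
            for (int i = 0; i < size; i++) {
                executors[i] = Executors.newSingleThreadExecutor();
            }
        }

        /** The same channel id always maps to the same executor, so its tasks run in order. */
        void execute(int channelId, Runnable task) {
            executors[Math.floorMod(channelId, executors.length)].execute(task);
        }

        /** Stops accepting tasks and waits for the queued ones to finish. */
        void shutdownAndWait() {
            for (ExecutorService e : executors) {
                e.shutdown();
            }
            try {
                for (ExecutorService e : executors) {
                    if (!e.awaitTermination(10, TimeUnit.SECONDS)) {
                        throw new IllegalStateException("tasks did not finish in time");
                    }
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(ex);
            }
        }
    }

    /** Submits slow "business logic" for every message of every channel and returns the completion log. */
    public static List<String> run(int channels, int messagesPerChannel) {
        List<String> log = new CopyOnWriteArrayList<>();
        PinnedGroup group = new PinnedGroup(4);
        for (int m = 0; m < messagesPerChannel; m++) {
            for (int c = 0; c < channels; c++) {
                final int channel = c;
                final int msg = m;
                group.execute(channel, () -> {
                    try {
                        Thread.sleep(5); // simulated slow work
                    } catch (InterruptedException ex) {
                        Thread.currentThread().interrupt();
                    }
                    log.add(channel + ":" + msg);
                });
            }
        }
        group.shutdownAndWait();
        return log;
    }

    public static void main(String[] args) {
        // Entries of different channels may interleave, but each channel's messages stay in order.
        System.out.println(run(3, 4));
    }
}
```

One consequence of this pinning is that a task which blocks for a long time delays every other channel that shares the same executor, so the group size (16 in the code above) should be chosen with the expected blocking time in mind.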