芋道源码

芋道源码 查看完整档案

北京编辑浙江工业大学  |  计算机 编辑医疗  |  技术经理 编辑 github.com/YunaiV 编辑
编辑

弱水三千 只取一瓢

个人动态

芋道源码 关注了用户 · 2020-07-15

Linmi @linmi

Segmentfault Community Support | Notion Pro


一对一免费提供内容选题、写作指导 → 微信 linmib

关注 19

芋道源码 发布了文章 · 2020-07-15

从零单排,使用 Netty 构建 IM 聊天室~

1. 概述

《芋道 Spring Boot WebSocket 入门》文章中,我们使用 WebSocket 实现了一个简单的 IM 功能,支持身份认证、私聊消息、群聊消息。

然后就有胖友私信艿艿,希望使用纯 Netty 实现一个类似的功能。良心的艿艿,当然不会给她发红人卡,因此就有了本文。可能有胖友不知道 Netty 是什么,这里简单介绍下:

Netty 是一个 Java 开源框架。

Netty 提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

也就是说,Netty 是一个基于 NIO 的客户、服务器端编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户,服务端应用。

Netty 相当简化和流线化了网络应用的编程开发过程,例如,TCP 和 UDP 的 Socket 服务开发。

下面,我们来新建三个项目,如下图所示:

三个项目

另外,我们也会提供 Netty 常用功能的示例:

  • 心跳机制,实现服务端对客户端的存活检测。
  • 断线重连,实现客户端对服务端的重新连接。

不哔哔,直接开干。

友情提示:可能会胖友担心,没有 Netty 基础是不是无法阅读本文?!

艿艿的想法,看!就硬看,按照代码先自己能搭建一下哈~文末,艿艿会提供一波 Netty 基础入门的文章。

2. 构建 Netty 服务端与客户端

本文在提供完整代码示例,可见 https://github.com/YunaiV/Spr...lab-67 目录。

原创不易,给点个 Star 嘿,一起冲鸭!

本小节,我们先来使用 Netty 构建服务端与客户端的核心代码,让胖友对项目的代码有个初始的认知。

2.1 构建 Netty 服务端

创建 lab-67-netty-demo-server 项目,搭建 Netty 服务端。如下图所示:

项目结构

下面,我们只会暂时看看 server 包下的代码,避免信息量过大,击穿胖友的秃头。

2.1.1 NettyServer

创建 NettyServer 类,Netty 服务端。代码如下:

@Component
public class NettyServer {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Value("${netty.port}")
    private Integer port;

    @Autowired
    private NettyServerHandlerInitializer nettyServerHandlerInitializer;

    /**
     * boss 线程组,用于服务端接受客户端的连接
     */
    private EventLoopGroup bossGroup = new NioEventLoopGroup();
    /**
     * worker 线程组,用于服务端接受客户端的数据读写
     */
    private EventLoopGroup workerGroup = new NioEventLoopGroup();
    /**
     * Netty Server Channel
     */
    private Channel channel;

    /**
     * 启动 Netty Server
     */
    @PostConstruct
    public void start() throws InterruptedException {
        // <2.1> 创建 ServerBootstrap 对象,用于 Netty Server 启动
        ServerBootstrap bootstrap = new ServerBootstrap();
        // <2.2> 设置 ServerBootstrap 的各种属性
        bootstrap.group(bossGroup, workerGroup) // <2.2.1> 设置两个 EventLoopGroup 对象
                .channel(NioServerSocketChannel.class)  // <2.2.2> 指定 Channel 为服务端 NioServerSocketChannel
                .localAddress(new InetSocketAddress(port)) // <2.2.3> 设置 Netty Server 的端口
                .option(ChannelOption.SO_BACKLOG, 1024) // <2.2.4> 服务端 accept 队列的大小
                .childOption(ChannelOption.SO_KEEPALIVE, true) // <2.2.5> TCP Keepalive 机制,实现 TCP 层级的心跳保活功能
                .childOption(ChannelOption.TCP_NODELAY, true) // <2.2.6> 允许较小的数据包的发送,降低延迟
                .childHandler(nettyServerHandlerInitializer);
        // <2> 绑定端口,并同步等待成功,即启动服务端
        ChannelFuture future = bootstrap.bind().sync();
        if (future.isSuccess()) {
            channel = future.channel();
            logger.info("[start][Netty Server 启动在 {} 端口]", port);
        }
    }

    /**
     * 关闭 Netty Server
     */
    @PreDestroy
    public void shutdown() {
        // <3.1> 关闭 Netty Server
        if (channel != null) {
            channel.close();
        }
        // <3.2> 优雅关闭两个 EventLoopGroup 对象
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }

}

🔥 ① 在类上,添加 @Component 注解,把 NettyServer 的创建交给 Spring 管理。

  • port 属性,读取 application.yml 配置文件的 netty.port 配置项。
  • #start() 方法,添加 @PostConstruct 注解,启动 Netty 服务器。
  • #shutdown() 方法,添加 @PreDestroy 注解,关闭 Netty 服务器。

🔥 ② 我们来详细看看 #start() 方法的代码,如何实现 Netty Server 的启动。

<2.1> 处,创建 ServerBootstrap 类,Netty 提供的服务器的启动类,方便我们初始化 Server。

<2.2> 处,设置 ServerBootstrap 的各种属性。

友情提示:这里涉及较多 Netty 组件的知识,艿艿先以简单的语言描述,后续胖友在文末的 Netty 基础入门的文章,补充学噢。

<2.2.1> 处,调用 #group(EventLoopGroup parentGroup, EventLoopGroup childGroup) 方法,设置使用 bossGroupworkerGroup。其中:

  • bossGroup 属性:Boss 线程组,用于服务端接受客户端的连接
  • workerGroup 属性:Worker 线程组,用于服务端接受客户端的数据读写

Netty 采用的是多 Reactor 多线程的模型,服务端可以接受更多客户端的数据读写的能力。原因是:

  • 创建专门用于接受客户端连接bossGroup 线程组,避免因为已连接的客户端的数据读写频繁,影响新的客户端的连接。
  • 创建专门用于接收客户端读写workerGroup 线程组,多个线程进行客户端的数据读写,可以支持更多客户端。

课后习题:感兴趣的胖友,后续可以看看《【NIO 系列】——之 Reactor 模型》文章。

<2.2.2> 处,调用 #channel(Class<? extends C> channelClass) 方法,设置使用 NioServerSocketChannel 类,它是 Netty 定义的 NIO 服务端 TCP Socket 实现类。

<2.2.3> 处,调用 #localAddress(SocketAddress localAddress) 方法,设置服务端的端口

<2.2.4> 处,调用 option#(ChannelOption<T> option, T value) 方法,设置服务端接受客户端的连接队列大小。因为 TCP 建立连接是三次握手,所以第一次握手完成后,会添加到服务端的连接队列中。

课后习题:更多相关内容,后续可以看看《浅谈 TCP Socket 的 backlog 参数》文章。

<2.2.5> 处,调用 #childOption(ChannelOption<T> childOption, T value) 方法,TCP Keepalive 机制,实现 TCP 层级的心跳保活功能。

课后习题:更多相关内容,后续可以看看《TCP Keepalive 机制刨根问底》文章。

<2.2.6> 处,调用 #childOption(ChannelOption<T> childOption, T value) 方法,允许较小的数据包的发送,降低延迟。

课后习题:更多相关内容,后续可以看看《详解 Socket 编程 --- TCP_NODELAY 选项》文章。

<2.2.7> 处,调用 #childHandler(ChannelHandler childHandler) 方法,设置客户端连接上来的 Channel 的处理器为 NettyServerHandlerInitializer。稍后我们在「2.1.2 NettyServerHandlerInitializer」小节来看看。

<2.3> 处,调用 #bind() + #sync() 方法,绑定端口,并同步等待成功,即启动服务端。

🔥 ③ 我们来详细看看 #shutdown() 方法的代码,如何实现 Netty Server 的关闭。

<3.1> 处,调用 Channel 的 #close() 方法,关闭 Netty Server,这样客户端就不再能连接了。

<3.2> 处,调用 EventLoopGroup 的 #shutdownGracefully() 方法,优雅关闭 EventLoopGroup。例如说,它们里面的线程池。

2.1.2 NettyServerHandlerInitializer

在看 NettyServerHandlerInitializer 的代码之前,我们需要先了解下 Netty 的 ChannelHandler 组件,用来处理 Channel 的各种事件。这里的事件很广泛,比如可以是连接、数据读写、异常、数据转换等等。

ChannelHandler 有非常多的子类,其中有个非常特殊的 ChannelInitializer,它用于 Channel 创建时,实现自定义的初始化逻辑。这里我们创建的 NettyServerHandlerInitializer 类,就继承了 ChannelInitializer 抽象类,代码如下:

@Component
public class NettyServerHandlerInitializer extends ChannelInitializer<Channel> {

    /**
     * 心跳超时时间
     */
    private static final Integer READ_TIMEOUT_SECONDS = 3 * 60;

    @Autowired
    private MessageDispatcher messageDispatcher;
    @Autowired
    private NettyServerHandler nettyServerHandler;

    @Override
    protected void initChannel(Channel ch) {
        // <1> 获得 Channel 对应的 ChannelPipeline
        ChannelPipeline channelPipeline = ch.pipeline();
        // <2> 添加一堆 NettyServerHandler 到 ChannelPipeline 中
        channelPipeline
                // 空闲检测
                .addLast(new ReadTimeoutHandler(READ_TIMEOUT_SECONDS, TimeUnit.SECONDS))
                // 编码器
                .addLast(new InvocationEncoder())
                // 解码器
                .addLast(new InvocationDecoder())
                // 消息分发器
                .addLast(messageDispatcher)
                // 服务端处理器
                .addLast(nettyServerHandler)
        ;
    }

}

在每一个客户端与服务端建立完成连接时,服务端会创建一个 Channel 与之对应。此时,NettyServerHandlerInitializer 会进行执行 #initChannel(Channel c) 方法,进行自定义的初始化。

友情提示:创建的客户端的 Channel,不要和「2.1.1 NettyServer」小节的 NioServerSocketChannel 混淆,不是同一个哈。

#initChannel(Channel ch) 方法的 ch 参数,就是此时创建的客户端 Channel。

<1> 处,调用 Channel 的 #pipeline() 方法,获得客户端 Channel 对应的 ChannelPipeline。ChannelPipeline 由一系列的 ChannelHandler 组成,又或者说是 ChannelHandler 。这样, Channel 所有上所有的事件都会经过 ChannelPipeline,被其上的 ChannelHandler 所处理。

<2> 处,添加五个 ChannelHandler 到 ChannelPipeline 中,每一个的作用看其上的注释。具体的,我们会在后续的小节详细解释。

2.1.3 NettyServerHandler

创建 NettyServerHandler 类,继承 ChannelInboundHandlerAdapter 类,实现客户端 Channel 建立连接、断开连接、异常时的处理。代码如下:

@Component
@ChannelHandler.Sharable
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private NettyChannelManager channelManager;

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        // 从管理器中添加
        channelManager.add(ctx.channel());
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) {
        // 从管理器中移除
        channelManager.remove(ctx.channel());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        logger.error("[exceptionCaught][连接({}) 发生异常]", ctx.channel().id(), cause);
        // 断开连接
        ctx.channel().close();
    }

}

① 在类上添加 @ChannelHandler.Sharable 注解,标记这个 ChannelHandler 可以被多个 Channel 使用。

channelManager 属性,是我们实现的客户端 Channel 的管理器。

  • #channelActive(ChannelHandlerContext ctx) 方法,在客户端和服务端建立连接完成时,调用 NettyChannelManager 的 #add(Channel channel) 方法,添加到其中
  • #channelUnregistered(ChannelHandlerContext ctx) 方法,在客户端和服务端断开连接时,调用 NettyChannelManager 的 #add(Channel channel) 方法,从其中移除

具体的 NettyChannelManager 的源码,我们在「2.1.4 NettyChannelManager」 小节中来瞅瞅~

#exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 方法,在处理 Channel 的事件发生异常时,调用 Channel 的 #close() 方法,断开和客户端的连接。

2.1.4 NettyChannelManager

创建 NettyChannelManager 类,提供两种功能。

🔥 ① 客户端 Channel 的管理。代码如下:

@Component
public class NettyChannelManager {

    /**
     * {@link Channel#attr(AttributeKey)} 属性中,表示 Channel 对应的用户
     */
    private static final AttributeKey<String> CHANNEL_ATTR_KEY_USER = AttributeKey.newInstance("user");

    private Logger logger = LoggerFactory.getLogger(getClass());

    /**
     * Channel 映射
     */
    private ConcurrentMap<ChannelId, Channel> channels = new ConcurrentHashMap<>();
    /**
     * 用户与 Channel 的映射。
     *
     * 通过它,可以获取用户对应的 Channel。这样,我们可以向指定用户发送消息。
     */
    private ConcurrentMap<String, Channel> userChannels = new ConcurrentHashMap<>();

    /**
     * 添加 Channel 到 {@link #channels} 中
     *
     * @param channel Channel
     */
    public void add(Channel channel) {
        channels.put(channel.id(), channel);
        logger.info("[add][一个连接({})加入]", channel.id());
    }

    /**
     * 添加指定用户到 {@link #userChannels} 中
     *
     * @param channel Channel
     * @param user 用户
     */
    public void addUser(Channel channel, String user) {
        Channel existChannel = channels.get(channel.id());
        if (existChannel == null) {
            logger.error("[addUser][连接({}) 不存在]", channel.id());
            return;
        }
        // 设置属性
        channel.attr(CHANNEL_ATTR_KEY_USER).set(user);
        // 添加到 userChannels
        userChannels.put(user, channel);
    }

    /**
     * 将 Channel 从 {@link #channels} 和 {@link #userChannels} 中移除
     *
     * @param channel Channel
     */
    public void remove(Channel channel) {
        // 移除 channels
        channels.remove(channel.id());
        // 移除 userChannels
        if (channel.hasAttr(CHANNEL_ATTR_KEY_USER)) {
            userChannels.remove(channel.attr(CHANNEL_ATTR_KEY_USER).get());
        }
        logger.info("[remove][一个连接({})离开]", channel.id());
    }
}

🔥 ② 向客户端 Channel 发送消息。代码如下:

@Component
public class NettyChannelManager {

    /**
     * 向指定用户发送消息
     *
     * @param user 用户
     * @param invocation 消息体
     */
    public void send(String user, Invocation invocation) {
        // 获得用户对应的 Channel
        Channel channel = userChannels.get(user);
        if (channel == null) {
            logger.error("[send][连接不存在]");
            return;
        }
        if (!channel.isActive()) {
            logger.error("[send][连接({})未激活]", channel.id());
            return;
        }
        // 发送消息
        channel.writeAndFlush(invocation);
    }

    /**
     * 向所有用户发送消息
     *
     * @param invocation 消息体
     */
    public void sendAll(Invocation invocation) {
        for (Channel channel : channels.values()) {
            if (!channel.isActive()) {
                logger.error("[send][连接({})未激活]", channel.id());
                return;
            }
            // 发送消息
            channel.writeAndFlush(invocation);
        }
    }

}

2.1.5 引入依赖

创建 pom.xml 文件,引入 Netty 依赖。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>lab-67-netty-demo</artifactId>
        <groupId>cn.iocoder.springboot.labs</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>lab-67-netty-demo-server</artifactId>

    <properties>
        <!-- 依赖相关配置 -->
        <spring.boot.version>2.2.4.RELEASE</spring.boot.version>
        <!-- 插件相关配置 -->
        <maven.compiler.target>1.8</maven.compiler.target>
        <maven.compiler.source>1.8</maven.compiler.source>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-parent</artifactId>
                <version>${spring.boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <!-- Spring Boot 基础依赖 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <!-- Netty 依赖 -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.50.Final</version>
        </dependency>

        <!-- 引入 netty-demo-common 封装 -->
        <dependency>
            <groupId>cn.iocoder.springboot.labs</groupId>
            <artifactId>lab-67-netty-demo-common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
    </dependencies>

</project>

2.1.6 NettyServerApplication

创建 NettyServerApplication 类,Netty Server 启动类。代码如下:

@SpringBootApplication
public class NettyServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(NettyServerApplication.class, args);
    }

}

2.1.7 简单测试

执行 NettyServerApplication 类,启动 Netty Server 服务器。日志如下:

... // 省略其他日志

2020-06-21 00:16:38.801  INFO 41948 --- [           main] c.i.s.l.n.server.NettyServer             : [start][Netty Server 启动在 8888 端口]
2020-06-21 00:16:38.893  INFO 41948 --- [           main] c.i.s.l.n.NettyServerApplication         : Started NettyServerApplication in 0.96 seconds (JVM running for 1.4)

Netty Server 启动在 8888 端口。

2.2 构建 Netty 客户端

创建 lab-67-netty-demo-client 项目,搭建 Netty 客户端。如下图所示:

项目结构

下面,我们只会暂时看看 client 包下的代码,避免信息量过大,击穿胖友的秃头。

2.2.1 NettyClient

创建 NettyClient 类,Netty 客户端。代码如下:

@Component
public class NettyClient {

    /**
     * 重连频率,单位:秒
     */
    private static final Integer RECONNECT_SECONDS = 20;

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Value("${netty.server.host}")
    private String serverHost;
    @Value("${netty.server.port}")
    private Integer serverPort;

    @Autowired
    private NettyClientHandlerInitializer nettyClientHandlerInitializer;

    /**
     * 线程组,用于客户端对服务端的连接、数据读写
     */
    private EventLoopGroup eventGroup = new NioEventLoopGroup();
    /**
     * Netty Client Channel
     */
    private volatile Channel channel;

    /**
     * 启动 Netty Server
     */
    @PostConstruct
    public void start() throws InterruptedException {
        // <2.1> 创建 Bootstrap 对象,用于 Netty Client 启动
        Bootstrap bootstrap = new Bootstrap();
        // <2.2>
        bootstrap.group(eventGroup) // <2.2.1> 设置一个 EventLoopGroup 对象
                .channel(NioSocketChannel.class)  // <2.2.2> 指定 Channel 为客户端 NioSocketChannel
                .remoteAddress(serverHost, serverPort) // <2.2.3> 指定连接服务器的地址
                .option(ChannelOption.SO_KEEPALIVE, true) // <2.2.4> TCP Keepalive 机制,实现 TCP 层级的心跳保活功能
                .option(ChannelOption.TCP_NODELAY, true) //<2.2.5>  允许较小的数据包的发送,降低延迟
                .handler(nettyClientHandlerInitializer);
        // <2.3> 连接服务器,并异步等待成功,即启动客户端
        bootstrap.connect().addListener(new ChannelFutureListener() {

            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                // 连接失败
                if (!future.isSuccess()) {
                    logger.error("[start][Netty Client 连接服务器({}:{}) 失败]", serverHost, serverPort);
                    reconnect();
                    return;
                }
                // 连接成功
                channel = future.channel();
                logger.info("[start][Netty Client 连接服务器({}:{}) 成功]", serverHost, serverPort);
            }

        });
    }

    public void reconnect() {
        // ... 暂时省略代码。
    }

    /**
     * 关闭 Netty Server
     */
    @PreDestroy
    public void shutdown() {
        // <3.1> 关闭 Netty Client
        if (channel != null) {
            channel.close();
        }
        // <3.2> 优雅关闭一个 EventLoopGroup 对象
        eventGroup.shutdownGracefully();
    }

    /**
     * 发送消息
     *
     * @param invocation 消息体
     */
    public void send(Invocation invocation) {
        if (channel == null) {
            logger.error("[send][连接不存在]");
            return;
        }
        if (!channel.isActive()) {
            logger.error("[send][连接({})未激活]", channel.id());
            return;
        }
        // 发送消息
        channel.writeAndFlush(invocation);
    }

}
友情提示:整体代码,是和「2.1.1 NettyServer」对等,且基本是一致的。

🔥 ① 在类上,添加 @Component 注解,把 NettyClient 的创建交给 Spring 管理。

  • serverHostserverPort 属性,读取 application.yml 配置文件的 netty.server.hostnetty.server.port 配置项。
  • #start() 方法,添加 @PostConstruct 注解,启动 Netty 客户端。
  • #shutdown() 方法,添加 @PreDestroy 注解,关闭 Netty 客户端。

🔥 ② 我们来详细看看 #start() 方法的代码,如何实现 Netty Client 的启动,建立和服务器的连接。

<2.1> 处,创建 Bootstrap 类,Netty 提供的客户端的启动类,方便我们初始化 Client。

<2.2> 处,设置 Bootstrap 的各种属性。

<2.2.1> 处,调用 #group(EventLoopGroup group) 方法,设置使用 eventGroup 线程组,实现客户端对服务端的连接、数据读写。

<2.2.2> 处,调用 #channel(Class<? extends C> channelClass) 方法,设置使用 NioSocketChannel 类,它是 Netty 定义的 NIO 服务端 TCP Client 实现类。

<2.2.3> 处,调用 #remoteAddress(SocketAddress localAddress) 方法,设置连接服务端的地址

<2.2.4> 处,调用 #option(ChannelOption<T> childOption, T value) 方法,TCP Keepalive 机制,实现 TCP 层级的心跳保活功能。

<2.2.5> 处,调用 #childOption(ChannelOption<T> childOption, T value) 方法,允许较小的数据包的发送,降低延迟。

<2.2.7> 处,调用 #handler(ChannelHandler childHandler) 方法,设置自己 Channel 的处理器为 NettyClientHandlerInitializer。稍后我们在「2.2.2 NettyClientHandlerInitializer」小节来看看。

<2.3> 处,调用 #connect() 方法,连接服务器,并异步等待成功,即启动客户端。同时,添加回调监听器 ChannelFutureListener,在连接服务端失败的时候,调用 #reconnect() 方法,实现定时重连。😈 具体 #reconnect() 方法的代码,我们稍后在瞅瞅哈。

③ 我们来详细看看 #shutdown() 方法的代码,如何实现 Netty Client 的关闭。

<3.1> 处,调用 Channel 的 #close() 方法,关闭 Netty Client,这样客户端就断开和服务端的连接。

<3.2> 处,调用 EventLoopGroup 的 #shutdownGracefully() 方法,优雅关闭 EventLoopGroup。例如说,它们里面的线程池。

#send(Invocation invocation) 方法,实现向服务端发送消息。

因为 NettyClient 是客户端,所以无需像 NettyServer 一样使用「2.1.4 NettyChannelManager」维护 Channel 的集合。

2.2.2 NettyClientHandlerInitializer

创建的 NettyClientHandlerInitializer 类,就继承了 ChannelInitializer 抽象类,实现和服务端建立连接后,添加相应的 ChannelHandler 处理器。代码如下:

@Component
public class NettyClientHandlerInitializer extends ChannelInitializer<Channel> {

    /**
     * 心跳超时时间
     */
    private static final Integer READ_TIMEOUT_SECONDS = 60;

    @Autowired
    private MessageDispatcher messageDispatcher;

    @Autowired
    private NettyClientHandler nettyClientHandler;

    @Override
    protected void initChannel(Channel ch) {
        ch.pipeline()
                // 空闲检测
                .addLast(new IdleStateHandler(READ_TIMEOUT_SECONDS, 0, 0))
                .addLast(new ReadTimeoutHandler(3 * READ_TIMEOUT_SECONDS))
                // 编码器
                .addLast(new InvocationEncoder())
                // 解码器
                .addLast(new InvocationDecoder())
                // 消息分发器
                .addLast(messageDispatcher)
                // 客户端处理器
                .addLast(nettyClientHandler)
        ;
    }

}

「2.1.2 NettyServerHandlerInitializer」的代码基本一样,差别在于空闲检测额外增加 IdleStateHandler,客户端处理器换成了 NettyClientHandler

2.2.3 NettyClientHandler

创建 NettyClientHandler 类,实现客户端 Channel 断开连接、异常时的处理。代码如下:

@Component
@ChannelHandler.Sharable
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private NettyClient nettyClient;

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // 发起重连
        nettyClient.reconnect();
        // 继续触发事件
        super.channelInactive(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        logger.error("[exceptionCaught][连接({}) 发生异常]", ctx.channel().id(), cause);
        // 断开连接
        ctx.channel().close();
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object event) throws Exception {
        // 空闲时,向服务端发起一次心跳
        if (event instanceof IdleStateEvent) {
            logger.info("[userEventTriggered][发起一次心跳]");
            HeartbeatRequest heartbeatRequest = new HeartbeatRequest();
            ctx.writeAndFlush(new Invocation(HeartbeatRequest.TYPE, heartbeatRequest))
                    .addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
        } else {
            super.userEventTriggered(ctx, event);
        }
    }

}

① 在类上添加 @ChannelHandler.Sharable 注解,标记这个 ChannelHandler 可以被多个 Channel 使用。

#channelInactive(ChannelHandlerContext ctx) 方法,实现在和服务端断开连接时,调用 NettyClient 的 #reconnect() 方法,实现客户端定时和服务端重连

#exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 方法,在处理 Channel 的事件发生异常时,调用 Channel 的 #close() 方法,断开和客户端的连接。

#userEventTriggered(ChannelHandlerContext ctx, Object event) 方法,在客户端在空闲时,向服务端发送一次心跳,即心跳机制。这块的内容,我们稍后详细讲讲。

2.2.4 引入依赖

创建 pom.xml 文件,引入 Netty 依赖。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>lab-67-netty-demo</artifactId>
        <groupId>cn.iocoder.springboot.labs</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>lab-67-netty-demo-client</artifactId>

    <properties>
        <!-- 依赖相关配置 -->
        <spring.boot.version>2.2.4.RELEASE</spring.boot.version>
        <!-- 插件相关配置 -->
        <maven.compiler.target>1.8</maven.compiler.target>
        <maven.compiler.source>1.8</maven.compiler.source>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-parent</artifactId>
                <version>${spring.boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <!-- 实现对 Spring MVC 的自动化配置 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Netty 依赖 -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.50.Final</version>
        </dependency>

        <!-- 引入 netty-demo-common 封装 -->
        <dependency>
            <groupId>cn.iocoder.springboot.labs</groupId>
            <artifactId>lab-67-netty-demo-common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
    </dependencies>

</project>

2.2.5 NettyClientApplication

创建 NettyClientApplication 类,Netty Client 启动类。代码如下:

@SpringBootApplication
public class NettyClientApplication {

    public static void main(String[] args) {
        SpringApplication.run(NettyClientApplication.class, args);
    }

}

2.2.6 简单测试

执行 NettyClientApplication 类,启动 Netty Client 客户端。日志如下:

... // 省略其他日志

2020-06-21 09:06:12.205  INFO 44029 --- [ntLoopGroup-2-1] c.i.s.l.n.client.NettyClient             : [start][Netty Client 连接服务器(127.0.0.1:8888) 成功]

同时 Netty Server 服务端发现有一个客户端接入,打印如下日志:

2020-06-21 09:06:12.268  INFO 41948 --- [ntLoopGroup-3-1] c.i.s.l.n.server.NettyChannelManager     : [add][一个连接(db652822)加入]

2.3 小结

至此,我们已经构建 Netty 服务端和客户端完成。因为 Netty 提供的 API 非常便利,所以我们不会像直接使用 NIO 时,需要处理大量底层且细节的代码。

不过,如上的内容仅仅是本文的开胃菜,正片即将开始!美滋滋,继续往下看,奥利给!

3. 通信协议

「2. 构建 Netty 服务端与客户端」小节中,我们实现了客户端和服务端的连接功能。而本小节,我们要让它们两能够说上话,即进行数据的读写

在日常项目的开发中,前端和后端之间采用 HTTP 作为通信协议,使用文本内容进行交互,数据格式一般是 JSON。但是在 TCP 的世界里,我们需要自己基于二进制构建,构建客户端和服务端的通信协议。

我们以客户端向服务端发送消息来举个例子,假设客户端要发送一个登录请求,对应的类如下:

public class AuthRequest {

    /** 用户名 **/
    private String username;
    /** 密码 **/
    private String password;
    
}
  • 显然,我们无法将一个 Java 对象直接丢到 TCP Socket 当中,而是需要将其转换成 byte 字节数组,才能写入到 TCP Socket 中去。即,需要将消息对象通过序列化,转换成 byte 字节数组。
  • 同时,在服务端收到 byte 字节数组时,需要将其又转换成 Java 对象,即反序列化。不然,服务端对着一串 byte 字节处理个毛线?!
友情提示:服务端向客户端发消息,也是一样的过程哈!

序列化的工具非常多,例如说 Google 提供的 Protobuf,性能高效,且序列化出来的二进制数据较小。Netty 对 Protobuf 进行集成,提供了相应的编解码器。如下图所示:

Netty codeprotobuf/code 包

但是考虑到很多胖友对 Protobuf 并不了解,因为它实现序列化又增加胖友的额外学习成本。因此,艿艿仔细一个捉摸,还是采用 JSON 方式进行序列化。可能胖友会疑惑,JSON 不是将对象转换成字符串吗?嘿嘿,我们再把字符串转换成 byte 字节数组就可以啦~

下面,我们新建 lab-67-netty-demo-common 项目,并在 codec 包下,实现我们自定义的通信协议。如下图所示:

项目结构

3.1 Invocation

创建 Invocation 类,通信协议的消息体。代码如下:

/**
 * 通信协议的消息体
 */
public class Invocation {

    /**
     * 类型
     */
    private String type;
    /**
     * 消息,JSON 格式
     */
    private String message;

    // 空构造方法
    public Invocation() {
    }

    public Invocation(String type, String message) {
        this.type = type;
        this.message = message;
    }

    public Invocation(String type, Message message) {
        this.type = type;
        this.message = JSON.toJSONString(message);
    }
    
    // ... 省略 setter、getter、toString 方法
}

type 属性,类型,用于匹配对应的消息处理器。如果类比 HTTP 协议,type 属性相当于请求地址。

message 属性,消息内容,使用 JSON 格式。

另外,Message 是我们定义的消息接口。代码如下:

public interface Message {

    // ... 空,作为标记接口

}

3.2 粘包与拆包

在开始看 Invocation 的编解码处理器之前,我们先了解下粘包拆包的概念。

如果的内容,引用《Netty 解决粘包和拆包问题的四种方案》文章的内容,进行二次编辑。

3.2.1 产生原因

产生粘包和拆包问题的主要原因是,操作系统在发送 TCP 数据的时候,底层会有一个缓冲区,例如 1024 个字节大小。

  • 如果一次请求发送的数据量比较小,没达到缓冲区大小,TCP 则会将多个请求合并为同一个请求进行发送,这就形成了粘包问题。

    例如说,在《详解 Socket 编程 --- TCP_NODELAY 选项》文章中我们可以看到,在关闭 Nagle 算法时,请求不会等待满足缓冲区大小,而是尽快发出,降低延迟。
  • 如果一次请求发送的数据量比较大,超过了缓冲区大小,TCP 就会将其拆分为多次发送,这就是拆包,也就是将一个大的包拆分为多个小包进行发送。

如下图展示了粘包和拆包的一个示意图,演示了粘包和拆包的三种情况:

示例图

  • A 和 B 两个包都刚好满足 TCP 缓冲区的大小,或者说其等待时间已经达到 TCP 等待时长,从而还是使用两个独立的包进行发送。
  • A 和 B 两次请求间隔时间内较短,并且数据包较小,因而合并为同一个包发送给服务端。
  • B 包比较大,因而将其拆分为两个包 B_1 和 B_2 进行发送,而这里由于拆分后的 B_2 比较小,其又与 A 包合并在一起发送。

3.2.2 解决方案

对于粘包和拆包问题,常见的解决方案有三种:

🔥 ① 客户端在发送数据包的时候,每个包都固定长度。比如 1024 个字节大小,如果客户端发送的数据长度不足 1024 个字节,则通过补充空格的方式补全到指定长度。

这种方式,艿艿暂时没有找到采用这种方式的案例。

🔥 ② 客户端在每个包的末尾使用固定的分隔符。例如 \r\n,如果一个包被拆分了,则等待下一个包发送过来之后找到其中的 \r\n,然后对其拆分后的头部部分与前一个包的剩余部分进行合并,这样就得到了一个完整的包。

具体的案例,有 HTTP、WebSocket、Redis。

🔥 ③ 将消息分为头部和消息体,在头部中保存有当前整个消息的长度,只有在读取到足够长度的消息之后才算是读到了一个完整的消息。

友情提示:方案 ③ 是 ① 的升级版,动态长度

本文,艿艿将采用这种方式,在每次 Invocation 序列化成字节数组写入 TCP Socket 之前,先将字节数组的长度写到其中。如下图所示:

Invocation 序列化

3.3 InvocationEncoder

创建 InvocationEncoder 类,实现将 Invocation 序列化,并写入到 TCP Socket 中。代码如下:

public class InvocationEncoder extends MessageToByteEncoder<Invocation> {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    protected void encode(ChannelHandlerContext ctx, Invocation invocation, ByteBuf out) {
        // <2.1> 将 Invocation 转换成 byte[] 数组
        byte[] content = JSON.toJSONBytes(invocation);
        // <2.2> 写入 length
        out.writeInt(content.length);
        // <2.3> 写入内容
        out.writeBytes(content);
        logger.info("[encode][连接({}) 编码了一条消息({})]", ctx.channel().id(), invocation.toString());
    }

}

MessageToByteEncoder 是 Netty 定义的编码 ChannelHandler 抽象类,将泛型 <I> 消息转换成字节数组。

#encode(ChannelHandlerContext ctx, Invocation invocation, ByteBuf out) 方法,进行编码的逻辑。

<2.1> 处,调用 JSON 的 #toJSONBytes(Object object, SerializerFeature... features) 方法,将 Invocation 转换成 字节数组。

<2.2> 处,将字节数组的长度,写入到 TCP Socket 当中。这样,后续「3.4 InvocationDecoder」可以根据该长度,解析到消息,解决粘包和拆包的问题

友情提示:MessageToByteEncoder 会最终将 ByteBuf out 写到 TCP Socket 中。

<2.3> 处,将字节数组,写入到 TCP Socket 当中。

3.4 InvocationDecoder

创建 InvocationDecoder 类,实现从 TCP Socket 读取字节数组,反序列化成 Invocation。代码如下:

public class InvocationDecoder extends ByteToMessageDecoder {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        // <2.1> 标记当前读取位置
        in.markReaderIndex();
        // <2.2> 判断是否能够读取 length 长度
        if (in.readableBytes() <= 4) {
            return;
        }
        // <2.3> 读取长度
        int length = in.readInt();
        if (length < 0) {
            throw new CorruptedFrameException("negative length: " + length);
        }
        // <3.1> 如果 message 不够可读,则退回到原读取位置
        if (in.readableBytes() < length) {
            in.resetReaderIndex();
            return;
        }
        // <3.2> 读取内容
        byte[] content = new byte[length];
        in.readBytes(content);
        // <3.3> 解析成 Invocation
        Invocation invocation = JSON.parseObject(content, Invocation.class);
        out.add(invocation);
        logger.info("[decode][连接({}) 解析到一条消息({})]", ctx.channel().id(), invocation.toString());
    }

}

ByteToMessageDecoder 是 Netty 定义的解码 ChannelHandler 抽象类,在 TCP Socket 读取到新数据时,触发进行解码。

② 在 <2.1><2.2><2.3> 处,从 TCP Socket 中读取长度

③ 在 <3.1><3.2><3.3> 处,从 TCP Socket 中读取字节数组,并反序列化成 Invocation 对象。

最终,添加 List<Object> out 中,交给后续的 ChannelHandler 进行处理。稍后,我们将在「4. 消息分发」小结中,会看到 MessageDispatcher 将 Invocation 分发到其对应的 MessageHandler 中,进行业务逻辑的执行。

3.5 引入依赖

创建 pom.xml 文件,引入 Netty、FastJSON 等等依赖。

  
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>lab-67-netty-demo</artifactId>
        <groupId>cn.iocoder.springboot.labs</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>lab-67-netty-demo-common</artifactId>

    <properties>
        <!-- 插件相关配置 -->
        <maven.compiler.target>1.8</maven.compiler.target>
        <maven.compiler.source>1.8</maven.compiler.source>
    </properties>

    <dependencies>
        <!-- Netty 依赖 -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.50.Final</version>
        </dependency>

        <!-- FastJSON 依赖 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.71</version>
        </dependency>

        <!-- 引入 Spring 相关依赖 -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-aop</artifactId>
            <version>5.2.5.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>5.2.5.RELEASE</version>
        </dependency>

        <!-- 引入 SLF4J 依赖 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.30</version>
        </dependency>
    </dependencies>

</project>

3.6 小结

至此,我们已经完成通信协议的定义、编解码的逻辑,是不是蛮有趣的?!

另外,我们在 NettyServerHandlerInitializer 和 NettyClientHandlerInitializer 的初始化代码中,将编解码器添加到其中。如下图所示:

编解码器的初始化

4. 消息分发

SpringMVC 中,DispatcherServlet 会根据请求地址、方法等,将请求分发到匹配的 Controller 的 Method 方法上。

lab-67-netty-demo-client 项目的 dispatcher 包中,我们创建了 MessageDispatcher 类,实现和 DispatcherServlet 类似的功能,将 Invocation 分发到其对应的 MessageHandler 中,进行业务逻辑的执行。

codedispatcher/code 包

下面,我们来看看具体的代码实现。

4.1 Message

创建 Message 接口,定义消息的标记接口。代码如下:

public interface Message {
}

下图,是我们涉及到的 Message 实现类。如下图所示:

Message 实现类

4.2 MessageHandler

创建 MessageHandler 接口,消息处理器接口。代码如下:

public interface MessageHandler<T extends Message> {

    /**
     * 执行处理消息
     *
     * @param channel 通道
     * @param message 消息
     */
    void execute(Channel channel, T message);

    /**
     * @return 消息类型,即每个 Message 实现类上的 TYPE 静态字段
     */
    String getType();

}
  • 定义了泛型 <T> ,需要是 Message 的实现类。
  • 定义的两个接口方法,胖友自己看下注释哈。

下图,是我们涉及到的 MessageHandler 实现类。如下图所示:

MessageHandler 实现类

4.3 MessageHandlerContainer

创建 MessageHandlerContainer 类,作为 MessageHandler 的容器。代码如下:

public class MessageHandlerContainer implements InitializingBean {

    private Logger logger = LoggerFactory.getLogger(getClass());

    /**
     * 消息类型与 MessageHandler 的映射
     */
    private final Map<String, MessageHandler> handlers = new HashMap<>();

    @Autowired
    private ApplicationContext applicationContext;

    @Override
    public void afterPropertiesSet() throws Exception {
        // 通过 ApplicationContext 获得所有 MessageHandler Bean
        applicationContext.getBeansOfType(MessageHandler.class).values() // 获得所有 MessageHandler Bean
                .forEach(messageHandler -> handlers.put(messageHandler.getType(), messageHandler)); // 添加到 handlers 中
        logger.info("[afterPropertiesSet][消息处理器数量:{}]", handlers.size());
    }

    /**
     * 获得类型对应的 MessageHandler
     *
     * @param type 类型
     * @return MessageHandler
     */
    MessageHandler getMessageHandler(String type) {
        MessageHandler handler = handlers.get(type);
        if (handler == null) {
            throw new IllegalArgumentException(String.format("类型(%s) 找不到匹配的 MessageHandler 处理器", type));
        }
        return handler;
    }

    /**
     * 获得 MessageHandler 处理的消息类
     *
     * @param handler 处理器
     * @return 消息类
     */
    static Class<? extends Message> getMessageClass(MessageHandler handler) {
        // 获得 Bean 对应的 Class 类名。因为有可能被 AOP 代理过。
        Class<?> targetClass = AopProxyUtils.ultimateTargetClass(handler);
        // 获得接口的 Type 数组
        Type[] interfaces = targetClass.getGenericInterfaces();
        Class<?> superclass = targetClass.getSuperclass();
        while ((Objects.isNull(interfaces) || 0 == interfaces.length) && Objects.nonNull(superclass)) { // 此处,是以父类的接口为准
            interfaces = superclass.getGenericInterfaces();
            superclass = targetClass.getSuperclass();
        }
        if (Objects.nonNull(interfaces)) {
            // 遍历 interfaces 数组
            for (Type type : interfaces) {
                // 要求 type 是泛型参数
                if (type instanceof ParameterizedType) {
                    ParameterizedType parameterizedType = (ParameterizedType) type;
                    // 要求是 MessageHandler 接口
                    if (Objects.equals(parameterizedType.getRawType(), MessageHandler.class)) {
                        Type[] actualTypeArguments = parameterizedType.getActualTypeArguments();
                        // 取首个元素
                        if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) {
                            return (Class<Message>) actualTypeArguments[0];
                        } else {
                            throw new IllegalStateException(String.format("类型(%s) 获得不到消息类型", handler));
                        }
                    }
                }
            }
        }
        throw new IllegalStateException(String.format("类型(%s) 获得不到消息类型", handler));
    }

}

① 实现 InitializingBean 接口,在 #afterPropertiesSet() 方法中,扫描所有 MessageHandler Bean ,添加到 MessageHandler 集合中。

② 在 #getMessageHandler(String type) 方法中,获得类型对应的 MessageHandler 对象。稍后,我们会在 MessageDispatcher 调用该方法。

③ 在 #getMessageClass(MessageHandler handler) 方法中,通过 MessageHandler 中,通过解析其类上的泛型,获得消息类型对应的 Class 类。这是参考 rocketmq-spring 项目的 DefaultRocketMQListenerContainer#getMessageType() 方法,进行略微修改。

友情提示:如果胖友对 Java 的泛型机制没有做过一点了解,可能略微有点硬核。可以先暂时跳过,知道意图即可。

4.4 MessageDispatcher

创建 MessageDispatcher 类,将 Invocation 分发到其对应的 MessageHandler 中,进行业务逻辑的执行。代码如下:

@ChannelHandler.Sharable
public class MessageDispatcher extends SimpleChannelInboundHandler<Invocation> {

    @Autowired
    private MessageHandlerContainer messageHandlerContainer;

    private final ExecutorService executor =  Executors.newFixedThreadPool(200);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Invocation invocation) {
        // <3.1> 获得 type 对应的 MessageHandler 处理器
        MessageHandler messageHandler = messageHandlerContainer.getMessageHandler(invocation.getType());
        // 获得  MessageHandler 处理器的消息类
        Class<? extends Message> messageClass = MessageHandlerContainer.getMessageClass(messageHandler);
        // <3.2> 解析消息
        Message message = JSON.parseObject(invocation.getMessage(), messageClass);
        // <3.3> 执行逻辑
        executor.submit(new Runnable() {

            @Override
            public void run() {
                // noinspection unchecked
                messageHandler.execute(ctx.channel(), message);
            }

        });
    }

}

① 在类上添加 @ChannelHandler.Sharable 注解,标记这个 ChannelHandler 可以被多个 Channel 使用。

SimpleChannelInboundHandler 是 Netty 定义的消息处理 ChannelHandler 抽象类,处理消息的类型是 <I> 泛型时。

#channelRead0(ChannelHandlerContext ctx, Invocation invocation) 方法,处理消息,进行分发。

消息分发

<3.1> 处,调用 MessageHandlerContainer 的 #getMessageHandler(String type) 方法,获得 Invocation 的 type 对应的 MessageHandler 处理器

然后,调用 MessageHandlerContainer 的 #getMessageClass(messageHandler) 方法,获得 MessageHandler 处理器的消息类

<3.2> 处,调用 JSON 的 ## parseObject(String text, Class<T> clazz) 方法,将 Invocation 的 message 解析成 MessageHandler 对应的消息对象

<3.3> 处,丢到线程池中,然后调用 MessageHandler 的 #execute(Channel channel, T message) 方法,执行业务逻辑

注意,为什么要丢到 executor 线程池中呢?我们先来了解下 EventGroup 的线程模型。

友情提示:在我们启动 Netty 服务端或者客户端时,都会设置其 EventGroup。

EventGroup 我们可以先简单理解成一个线程池,并且线程池的大小仅仅是 CPU 数量 * 2。每个 Channel 仅仅会被分配到其中的一个线程上,进行数据的读写。并且,多个 Channel 会共享一个线程,即使用同一个线程进行数据的读写。

那么胖友试着思考下,MessageHandler 的具体逻辑视线中,往往会涉及到 IO 处理,例如说进行数据库的读取。这样,就会导致一个 Channel 在执行 MessageHandler 的过程中,阻塞了共享当前线程的其它 Channel 的数据读取。

因此,我们在这里创建了 executor 线程池,进行 MessageHandler 的逻辑执行,避免阻塞 Channel 的数据读取。

可能会有胖友说,我们是不是能够把 EventGroup 的线程池设置大一点,例如说 200 呢?对于长连接的 Netty 服务端,往往会有 1000 ~ 100000 的 Netty 客户端连接上来,这样无论设置多大的线程池,都会出现阻塞数据读取的情况。

友情提示:executor 线程池,我们一般称之为业务线程池或者逻辑线程池,顾名思义,就是执行业务逻辑的。

这样的设计方式,目前 Dubbo 等等 RPC 框架,都采用这种方式。

后续,胖友可以认真阅读下《【NIO 系列】——之 Reactor 模型》文章,进一步理解。

4.5 NettyServerConfig

创建 NettyServerConfig 配置类,创建 MessageDispatcher 和 MessageHandlerContainer Bean。代码如下:

@Configuration
public class NettyServerConfig {

    @Bean
    public MessageDispatcher messageDispatcher() {
        return new MessageDispatcher();
    }

    @Bean
    public MessageHandlerContainer messageHandlerContainer() {
        return new MessageHandlerContainer();
    }

}

4.6 NettyClientConfig

友情提示:和「4.5 NettyServerConfig」小结一致。

创建 NettyClientConfig 配置类,创建 MessageDispatcher 和 MessageHandlerContainer Bean。代码如下:

@Configuration
public class NettyClientConfig {

    @Bean
    public MessageDispatcher messageDispatcher() {
        return new MessageDispatcher();
    }

    @Bean
    public MessageHandlerContainer messageHandlerContainer() {
        return new MessageHandlerContainer();
    }

}

4.7 小结

后续,我们将在如下小节,具体演示消息分发的使用:

5. 断开重连

Netty 客户端需要实现断开重连机制,解决各种情况下的断开情况。例如说:

  • Netty 客户端启动时,Netty 服务端处于挂掉,导致无法连接上。
  • 在运行过程中,Netty 服务端挂掉,导致连接被断开。
  • 任一一端网络抖动,导致连接异常断开。

具体的代码实现比较简单,只需要在两个地方增加重连机制。

  • Netty 客户端启动时,无法连接 Netty 服务端时,发起重连。
  • Netty 客户端运行时,和 Netty 断开连接时,发起重连。

考虑到重连会存在失败的情况,我们采用定时重连的方式,避免占用过多资源。

5.1 具体代码

① 在 NettyClient 中,提供 #reconnect() 方法,实现定时重连的逻辑。代码如下:

// NettyClient.java

public void reconnect() {
    eventGroup.schedule(new Runnable() {
        @Override
        public void run() {
            logger.info("[reconnect][开始重连]");
            try {
                start();
            } catch (InterruptedException e) {
                logger.error("[reconnect][重连失败]", e);
            }
        }
    }, RECONNECT_SECONDS, TimeUnit.SECONDS);
    logger.info("[reconnect][{} 秒后将发起重连]", RECONNECT_SECONDS);
}

通过调用 EventLoop 提供的 #schedule(Runnable command, long delay, TimeUnit unit) 方法,实现定时逻辑。而在内部的具体逻辑,调用 NettyClient 的 #start() 方法,发起连接 Netty 服务端。

又因为 NettyClient 在 #start() 方法在连接 Netty 服务端失败时,又会调用 #reconnect() 方法,从而再次发起定时重连。如此循环反复,知道 Netty 客户端连接上 Netty 服务端。如下图所示:

NettyClient 重连

② 在 NettyClientHandler 中,实现 #channelInactive(ChannelHandlerContext ctx) 方法,在发现和 Netty 服务端断开时,调用 Netty Client 的 #reconnect() 方法,发起重连。代码如下:

// NettyClientHandler.java

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    // 发起重连
    nettyClient.reconnect();
    // 继续触发事件
    super.channelInactive(ctx);
}

5.2 简单测试

① 启动 Netty Client,不要启动 Netty Server,控制台打印日志如下图:

重连失败

可以看到 Netty Client 在连接失败时,不断发起定时重连。

② 启动 Netty Server,控制台打印如下图:

重连成功

可以看到 Netty Client 成功重连上 Netty Server。

6. 心跳机制与空闲检测

在上文中,艿艿推荐胖友阅读《TCP Keepalive 机制刨根问底》文章,我们可以了解到 TCP 自带的空闲检测机制,默认是 2 小时。这样的检测机制,从系统资源层面上来说是可以接受的。

但是在业务层面,如果 2 小时才发现客户端与服务端的连接实际已经断开,会导致中间非常多的消息丢失,影响客户的使用体验。

因此,我们需要在业务层面,自己实现空闲检测,保证尽快发现客户端与服务端实际已经断开的情况。实现逻辑如下:

  • 服务端发现 180 秒未从客户端读取到消息,主动断开连接。
  • 客户端发现 180 秒未从服务端读取到消息,主动断开连接。

考虑到客户端和服务端之间并不是一直有消息的交互,所以我们需要增加心跳机制

  • 客户端每 60 秒向服务端发起一次心跳消息,保证服务端可以读取到消息。
  • 服务端在收到心跳消息时,回复客户端一条确认消息,保证客户端可以读取到消息。

友情提示:

  • 为什么是 180 秒?可以加大或者减小,看自己希望多快检测到连接异常。过短的时间,会导致心跳过于频繁,占用过多资源。
  • 为什么是 60 秒?三次机会,确认是否心跳超时。

虽然听起来有点复杂,但是实现起来并不复杂哈。

6.1 服务端的空闲检测

NettyServerHandlerInitializer 中,我们添加了一个 ReadTimeoutHandler 处理器,它在超过指定时间未从对端读取到数据,会抛出 ReadTimeoutException 异常。如下图所示:

ReadTimeoutHandler

通过这样的方式,实现服务端发现 180 秒未从客户端读取到消息,主动断开连接。

6.2 客户端的空闲检测

友情提示:和「6.1 服务端的空闲检测」一致。

NettyClientHandlerInitializer 中,我们添加了一个 ReadTimeoutHandler 处理器,它在超过指定时间未从对端读取到数据,会抛出 ReadTimeoutException 异常。如下图所示:

ReadTimeoutHandler

通过这样的方式,实现客户端发现 180 秒未从服务端读取到消息,主动断开连接。

6.3 心跳机制

Netty 提供了 IdleStateHandler 处理器,提供空闲检测的功能,在 Channel 的读或者写空闲时间太长时,将会触发一个 IdleStateEvent 事件。

这样,我们只需要在 NettyClientHandler 处理器中,在接收到 IdleStateEvent 事件时,客户端向客户端发送一次心跳消息。如下图所示:

客户端心跳

同时,我们在服务端项目中,创建了一个 HeartbeatRequestHandler 消息处理器,在收到客户端的心跳请求时,回复客户端一条确认消息。代码如下:

@Component
public class HeartbeatRequestHandler implements MessageHandler<HeartbeatRequest> {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public void execute(Channel channel, HeartbeatRequest message) {
        logger.info("[execute][收到连接({}) 的心跳请求]", channel.id());
        // 响应心跳
        HeartbeatResponse response = new HeartbeatResponse();
        channel.writeAndFlush(new Invocation(HeartbeatResponse.TYPE, response));
    }

    @Override
    public String getType() {
        return HeartbeatRequest.TYPE;
    }

}

6.4 简单测试

启动 Netty Server 服务端,再启动 Netty Client 客户端,耐心等待 60 秒后,可以看到心跳日志如下:

// ... 客户端
2020-06-22 08:24:47.275  INFO 57005 --- [ntLoopGroup-2-1] c.i.s.l.n.c.handler.NettyClientHandler   : [userEventTriggered][发起一次心跳]
2020-06-22 08:24:47.335  INFO 57005 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationEncoder        : [encode][连接(44223e18) 编码了一条消息(Invocation{type='HEARTBEAT_REQUEST', message='{}'})]
2020-06-22 08:24:47.408  INFO 57005 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder        : [decode][连接(44223e18) 解析到一条消息(Invocation{type='HEARTBEAT_RESPONSE', message='{}'})]
2020-06-22 08:24:47.409  INFO 57005 --- [pool-1-thread-1] c.i.s.l.n.m.h.HeartbeatResponseHandler   : [execute][收到连接(44223e18) 的心跳响应]

// ... 服务端
2020-06-22 08:24:47.388  INFO 56998 --- [ntLoopGroup-3-1] c.i.s.l.n.codec.InvocationDecoder        : [decode][连接(34778465) 解析到一条消息(Invocation{type='HEARTBEAT_REQUEST', message='{}'})]
2020-06-22 08:24:47.390  INFO 56998 --- [pool-1-thread-1] c.i.s.l.n.m.h.HeartbeatRequestHandler    : [execute][收到连接(34778465) 的心跳请求]
2020-06-22 08:24:47.399  INFO 56998 --- [ntLoopGroup-3-1] c.i.s.l.n.codec.InvocationEncoder        : [encode][连接(34778465) 编码了一条消息(Invocation{type='HEARTBEAT_RESPONSE', message='{}'})]

7. 认证逻辑

友情提示:从本小节开始,我们就具体看看业务逻辑的处理示例。

认证的过程,如下图所示:

认证流程

7.1 AuthRequest

创建 AuthRequest 类,定义用户认证请求。代码如下:

public class AuthRequest implements Message {

    public static final String TYPE = "AUTH_REQUEST";

    /**
     * 认证 Token
     */
    private String accessToken;
    
    // ... 省略 setter、getter、toString 方法
}

这里我们使用 accessToken 认证令牌进行认证。

因为一般情况下,我们使用 HTTP 进行登录系统,然后使用登录后的身份标识(例如说 accessToken 认证令牌),将客户端和当前用户进行认证绑定。

7.2 AuthResponse

创建 AuthResponse 类,定义用户认证响应。代码如下:

public class AuthResponse implements Message {

    public static final String TYPE = "AUTH_RESPONSE";

    /**
     * 响应状态码
     */
    private Integer code;
    /**
     * 响应提示
     */
    private String message;
    
    // ... 省略 setter、getter、toString 方法
}

7.3 AuthRequestHandler

服务端...

创建 AuthRequestHandler 类,为服务端处理客户端的认证请求。代码如下:

@Component
public class AuthRequestHandler implements MessageHandler<AuthRequest> {

    @Autowired
    private NettyChannelManager nettyChannelManager;

    @Override
    public void execute(Channel channel, AuthRequest authRequest) {
        // <1> 如果未传递 accessToken
        if (StringUtils.isEmpty(authRequest.getAccessToken())) {
            AuthResponse authResponse = new AuthResponse().setCode(1).setMessage("认证 accessToken 未传入");
            channel.writeAndFlush(new Invocation(AuthResponse.TYPE, authResponse));
            return;
        }

        // <2> ... 此处应有一段

        // <3> 将用户和 Channel 绑定
        // 考虑到代码简化,我们先直接使用 accessToken 作为 User
        nettyChannelManager.addUser(channel, authRequest.getAccessToken());

        // <4> 响应认证成功
        AuthResponse authResponse = new AuthResponse().setCode(0);
        channel.writeAndFlush(new Invocation(AuthResponse.TYPE, authResponse));
    }

    @Override
    public String getType() {
        return AuthRequest.TYPE;
    }

}

代码比较简单,胖友看看 <1><2><3><4> 上的注释。

7.4 AuthResponseHandler

客户端...

创建 AuthResponseHandler 类,为客户端处理服务端的认证响应。代码如下:

@Component
public class AuthResponseHandler implements MessageHandler<AuthResponse> {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public void execute(Channel channel, AuthResponse message) {
        logger.info("[execute][认证结果:{}]", message);
    }

    @Override
    public String getType() {
        return AuthResponse.TYPE;
    }

}

打印个认证结果,方便调试。

7.5 TestController

客户端...

创建 TestController 类,提供 /test/mock 接口,模拟客户端向服务端发送请求。代码如下:

@RestController
@RequestMapping("/test")
public class TestController {

    @Autowired
    private NettyClient nettyClient;

    @PostMapping("/mock")
    public String mock(String type, String message) {
        // 创建 Invocation 对象
        Invocation invocation = new Invocation(type, message);
        // 发送消息
        nettyClient.send(invocation);
        return "success";
    }

}

7.6 简单测试

启动 Netty Server 服务端,再启动 Netty Client 客户端,然后使用 Postman 模拟一次认证请求。如下图所示:

Postman 模拟认证请求

同时,可以看到认证成功的日志如下:

// 客户端...
2020-06-22 08:41:12.364  INFO 57583 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationEncoder        : [encode][连接(9e086597) 编码了一条消息(Invocation{type='AUTH_REQUEST', message='{"accessToken": "yunai"}'})]
2020-06-22 08:41:12.390  INFO 57583 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder        : [decode][连接(9e086597) 解析到一条消息(Invocation{type='AUTH_RESPONSE', message='{"code":0}'})]
2020-06-22 08:41:12.392  INFO 57583 --- [pool-1-thread-1] c.i.s.l.n.m.auth.AuthResponseHandler     : [execute][认证结果:AuthResponse{code=0, message='null'}]

// 服务端...
2020-06-22 08:41:12.374  INFO 56998 --- [ntLoopGroup-3-2] c.i.s.l.n.codec.InvocationDecoder        : [decode][连接(791f122b) 解析到一条消息(Invocation{type='AUTH_REQUEST', message='{"accessToken": "yunai"}'})]
2020-06-22 08:41:12.379  INFO 56998 --- [ntLoopGroup-3-2] c.i.s.l.n.codec.InvocationEncoder        : [encode][连接(791f122b) 编码了一条消息(Invocation{type='AUTH_RESPONSE', message='{"code":0}'})]

8. 单聊逻辑

私聊的过程,如下图所示:

私聊流程

服务端负责将客户端 A 发送的私聊消息,转发给客户端 B。

8.1 ChatSendToOneRequest

创建 ChatSendToOneRequest 类,发送给指定人的私聊消息的请求。代码如下:

public class ChatSendToOneRequest implements Message {

    public static final String TYPE = "CHAT_SEND_TO_ONE_REQUEST";

    /**
     * 发送给的用户
     */
    private String toUser;
    /**
     * 消息编号
     */
    private String msgId;
    /**
     * 内容
     */
    private String content;
    
    // ... 省略 setter、getter、toString 方法
}

8.2 ChatSendResponse

创建 ChatSendResponse 类,聊天发送消息结果的响应。代码如下:

public class ChatSendResponse implements Message {

    public static final String TYPE = "CHAT_SEND_RESPONSE";

    /**
     * 消息编号
     */
    private String msgId;
    /**
     * 响应状态码
     */
    private Integer code;
    /**
     * 响应提示
     */
    private String message;
    
    // ... 省略 setter、getter、toString 方法
}

8.3 ChatRedirectToUserRequest

创建 ChatRedirectToUserRequest 类, 转发消息给一个用户的请求。代码如下:

public class ChatRedirectToUserRequest implements Message {

    public static final String TYPE = "CHAT_REDIRECT_TO_USER_REQUEST";

    /**
     * 消息编号
     */
    private String msgId;
    /**
     * 内容
     */
    private String content;
    
    // ... 省略 setter、getter、toString 方法
}
友情提示:写完之后,艿艿突然发现少了一个 fromUser 字段,表示来自谁的消息。

8.4 ChatSendToOneHandler

服务端...

创建 ChatSendToOneHandler 类,为服务端处理客户端的私聊请求。代码如下:

@Component
public class ChatSendToOneHandler implements MessageHandler<ChatSendToOneRequest> {

    @Autowired
    private NettyChannelManager nettyChannelManager;

    @Override
    public void execute(Channel channel, ChatSendToOneRequest message) {
        // <1> 这里,假装直接成功
        ChatSendResponse sendResponse = new ChatSendResponse().setMsgId(message.getMsgId()).setCode(0);
        channel.writeAndFlush(new Invocation(ChatSendResponse.TYPE, sendResponse));

        // <2> 创建转发的消息,发送给指定用户
        ChatRedirectToUserRequest sendToUserRequest = new ChatRedirectToUserRequest().setMsgId(message.getMsgId())
                .setContent(message.getContent());
        nettyChannelManager.send(message.getToUser(), new Invocation(ChatRedirectToUserRequest.TYPE, sendToUserRequest));
    }

    @Override
    public String getType() {
        return ChatSendToOneRequest.TYPE;
    }

}

代码比较简单,胖友看看 <1><2> 上的注释。

8.5 ChatSendResponseHandler

客户端...

创建 ChatSendResponseHandler 类,为客户端处理服务端的聊天响应。代码如下:

@Component
public class ChatSendResponseHandler implements MessageHandler<ChatSendResponse> {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public void execute(Channel channel, ChatSendResponse message) {
        logger.info("[execute][发送结果:{}]", message);
    }

    @Override
    public String getType() {
        return ChatSendResponse.TYPE;
    }

}

打印个聊天发送结果,方便调试。

8.6 ChatRedirectToUserRequestHandler

客户端

创建 ChatRedirectToUserRequestHandler 类,为客户端处理服务端的转发消息的请求。代码如下:

@Component
public class ChatRedirectToUserRequestHandler implements MessageHandler<ChatRedirectToUserRequest> {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public void execute(Channel channel, ChatRedirectToUserRequest message) {
        logger.info("[execute][收到消息:{}]", message);
    }

    @Override
    public String getType() {
        return ChatRedirectToUserRequest.TYPE;
    }

}

打印个聊天接收消息,方便调试。

8.7 简单测试

① 启动 Netty Server 服务端。

② 启动 Netty Client 客户端 A。然后使用 Postman 模拟一次认证请求(用户为 yunai)。如下图所示:

Postman 模拟认证请求

③ 启动 Netty Client 客户端 B。注意,需要设置 --server.port 端口为 8081,避免冲突。如下图所示:

IDEA 设置

然后使用 Postman 模拟一次认证请求(用户为 tutou)。如下图所示:

Postman 模拟认证请求

④ 最后使用 Postman 模拟一次 yunai 芋艿给 tutou 土豆发送一次私聊消息。如下图所示:

Postman 模拟私聊请求

同时,可以看到客户端 A 向客户端 B 发送私聊消息的日志如下:

// 客户端 A...(芋艿)
2020-06-22 08:48:09.505  INFO 57583 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationEncoder        : [decode][连接(9e086597) 编码了一条消息(Invocation{type='CHAT_SEND_TO_ONE_REQUEST', message='{toUser: "tudou", msgId: "1", content: "你猜"}'})]
2020-06-22 08:48:09.510  INFO 57583 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder        : [decode][连接(9e086597) 解析到一条消息(Invocation{type='CHAT_SEND_RESPONSE', message='{"code":0,"msgId":"1"}'})]
2020-06-22 08:48:09.511  INFO 57583 --- [ool-1-thread-69] c.i.s.l.n.m.c.ChatSendResponseHandler    : [execute][发送结果:ChatSendResponse{msgId='1', code=0, message='null'}]
2020-06-22 08:48:35.148  INFO 57583 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationEncoder        : [decode][连接(9e086597) 编码了一条消息(Invocation{type='CHAT_SEND_TO_ONE_REQUEST', message='{toUser: "tutou", msgId: "1", content: "你猜"}'})]
2020-06-22 08:48:35.150  INFO 57583 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder        : [decode][连接(9e086597) 解析到一条消息(Invocation{type='CHAT_SEND_RESPONSE', message='{"code":0,"msgId":"1"}'})]
2020-06-22 08:48:35.150  INFO 57583 --- [ool-1-thread-70] c.i.s.l.n.m.c.ChatSendResponseHandler    : [execute][发送结果:ChatSendResponse{msgId='1', code=0, message='null'}]

// 服务端 ...
2020-06-22 08:48:35.149  INFO 56998 --- [ntLoopGroup-3-2] c.i.s.l.n.codec.InvocationDecoder        : [decode][连接(791f122b) 解析到一条消息(Invocation{type='CHAT_SEND_TO_ONE_REQUEST', message='{toUser: "tutou", msgId: "1", content: "你猜"}'})]
2020-06-22 08:48:35.149  INFO 56998 --- [ntLoopGroup-3-2] c.i.s.l.n.codec.InvocationEncoder        : [decode][连接(791f122b) 编码了一条消息(Invocation{type='CHAT_SEND_RESPONSE', message='{"code":0,"msgId":"1"}'})]
2020-06-22 08:48:35.149  INFO 56998 --- [ntLoopGroup-3-3] c.i.s.l.n.codec.InvocationEncoder        : [decode][连接(79cb3a1e) 编码了一条消息(Invocation{type='CHAT_REDIRECT_TO_USER_REQUEST', message='{"content":"你猜","msgId":"1"}'})]

// 客户端 B...(秃头)
2020-06-22 08:48:18.277  INFO 59613 --- [ntLoopGroup-2-1] c.i.s.l.n.c.handler.NettyClientHandler   : [userEventTriggered][发起一次心跳]
2020-06-22 08:48:18.278  INFO 59613 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationEncoder        : [encode][连接(24fbc3e8) 编码了一条消息(Invocation{type='HEARTBEAT_REQUEST', message='{}'})]
2020-06-22 08:48:18.280  INFO 59613 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder        : [decode][连接(24fbc3e8) 解析到一条消息(Invocation{type='HEARTBEAT_RESPONSE', message='{}'})]
2020-06-22 08:48:18.281  INFO 59613 --- [pool-1-thread-4] c.i.s.l.n.m.h.HeartbeatResponseHandler   : [execute][收到连接(24fbc3e8) 的心跳响应]
2020-06-22 08:48:35.150  INFO 59613 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder        : [decode][连接(24fbc3e8) 解析到一条消息(Invocation{type='CHAT_REDIRECT_TO_USER_REQUEST', message='{"content":"你猜","msgId":"1"}'})]
2020-06-22 08:48:35.151  INFO 59613 --- [pool-1-thread-5] l.n.m.c.ChatRedirectToUserRequestHandler : [execute][收到消息:ChatRedirectToUserRequest{msgId='1', content='你猜'}]

9. 群聊逻辑

群聊的过程,如下图所示:

群聊流程

服务端负责将客户端 A 发送的群聊消息,转发给客户端 A、B、C。

友情提示:考虑到逻辑简洁,艿艿提供的本小节的示例,并不是一个一个群,而是所有人在一个大的群聊中哈~

9.1 ChatSendToAllRequest

创建 ChatSendToOneRequest 类,发送给所有人的群聊消息的请求。代码如下:

public class ChatSendToAllRequest implements Message {

    public static final String TYPE = "CHAT_SEND_TO_ALL_REQUEST";

    /**
     * 消息编号
     */
    private String msgId;
    /**
     * 内容
     */
    private String content;
    
    // ... 省略 setter、getter、toString 方法
}
友情提示:如果是正经的群聊,会有一个 groupId 字段,表示群编号。

9.2 ChatSendResponse

「8.2 ChatSendResponse」小节一致。

9.3 ChatRedirectToUserRequest

「8.3 ChatRedirectToUserRequest」小节一致。

9.4 ChatSendToAllHandler

服务端...

创建 ChatSendToAllHandler 类,为服务端处理客户端的群聊请求。代码如下:

@Component
public class ChatSendToAllHandler implements MessageHandler<ChatSendToAllRequest> {

    @Autowired
    private NettyChannelManager nettyChannelManager;

    @Override
    public void execute(Channel channel, ChatSendToAllRequest message) {
        // <1> 这里,假装直接成功
        ChatSendResponse sendResponse = new ChatSendResponse().setMsgId(message.getMsgId()).setCode(0);
        channel.writeAndFlush(new Invocation(ChatSendResponse.TYPE, sendResponse));

        // <2> 创建转发的消息,并广播发送
        ChatRedirectToUserRequest sendToUserRequest = new ChatRedirectToUserRequest().setMsgId(message.getMsgId())
                .setContent(message.getContent());
        nettyChannelManager.sendAll(new Invocation(ChatRedirectToUserRequest.TYPE, sendToUserRequest));
    }

    @Override
    public String getType() {
        return ChatSendToAllRequest.TYPE;
    }

}

代码比较简单,胖友看看 <1><2> 上的注释。

9.5 ChatSendResponseHandler

「8.5 ChatSendResponseHandler」小节一致。

9.6 ChatRedirectToUserRequestHandler

「8.6 ChatRedirectToUserRequestHandler」小节一致。

9.7 简单测试

① 启动 Netty Server 服务端。

② 启动 Netty Client 客户端 A。然后使用 Postman 模拟一次认证请求(用户为 yunai)。如下图所示:

Postman 模拟认证请求

③ 启动 Netty Client 客户端 B。注意,需要设置 --server.port 端口为 8081,避免冲突。

IDEA 设置

④ 启动 Netty Client 客户端 C。注意,需要设置 --server.port 端口为 8082,避免冲突。

IDEA 设置

⑤ 最后使用 Postman 模拟一次发送群聊消息。如下图所示:

Postman 模拟群聊请求

同时,可以看到客户端 A 群发给所有客户端的日志如下:

// 客户端 A...
2020-06-22 08:55:44.898  INFO 57583 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationEncoder        : [decode][连接(9e086597) 编码了一条消息(Invocation{type='CHAT_SEND_TO_ALL_REQUEST', message='{msgId: "2", content: "广播消息"}'})]
2020-06-22 08:55:44.901  INFO 57583 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder        : [decode][连接(9e086597) 解析到一条消息(Invocation{type='CHAT_SEND_RESPONSE', message='{"code":0,"msgId":"2"}'})]
2020-06-22 08:55:44.901  INFO 57583 --- [ol-1-thread-148] c.i.s.l.n.m.c.ChatSendResponseHandler    : [execute][发送结果:ChatSendResponse{msgId='2', code=0, message='null'}]
2020-06-22 08:55:44.901  INFO 57583 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder        : [decode][连接(9e086597) 解析到一条消息(Invocation{type='CHAT_REDIRECT_TO_USER_REQUEST', message='{"content":"广播消息","msgId":"2"}'})]
2020-06-22 08:55:44.903  INFO 57583 --- [ol-1-thread-149] l.n.m.c.ChatRedirectToUserRequestHandler : [execute][收到消息:ChatRedirectToUserRequest{msgId='2', content='广播消息'}]

// 服务端...
2020-06-22 08:55:44.898  INFO 56998 --- [ntLoopGroup-3-2] c.i.s.l.n.codec.InvocationDecoder        : [decode][连接(791f122b) 解析到一条消息(Invocation{type='CHAT_SEND_TO_ALL_REQUEST', message='{msgId: "2", content: "广播消息"}'})]
2020-06-22 08:55:44.901  INFO 56998 --- [ntLoopGroup-3-2] c.i.s.l.n.codec.InvocationEncoder        : [decode][连接(791f122b) 编码了一条消息(Invocation{type='CHAT_SEND_RESPONSE', message='{"code":0,"msgId":"2"}'})]
2020-06-22 08:55:44.901  INFO 56998 --- [ntLoopGroup-3-2] c.i.s.l.n.codec.InvocationEncoder        : [decode][连接(791f122b) 编码了一条消息(Invocation{type='CHAT_REDIRECT_TO_USER_REQUEST', message='{"content":"广播消息","msgId":"2"}'})]
2020-06-22 08:55:44.901  INFO 56998 --- [ntLoopGroup-3-3] c.i.s.l.n.codec.InvocationEncoder        : [decode][连接(79cb3a1e) 编码了一条消息(Invocation{type='CHAT_REDIRECT_TO_USER_REQUEST', message='{"content":"广播消息","msgId":"2"}'})]
2020-06-22 08:55:44.901  INFO 56998 --- [ntLoopGroup-3-4] c.i.s.l.n.codec.InvocationEncoder        : [decode][连接(9dc03826) 编码了一条消息(Invocation{type='CHAT_REDIRECT_TO_USER_REQUEST', message='{"content":"广播消息","msgId":"2"}'})]

// 客户端 B...
2020-06-22 08:55:44.902  INFO 59613 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder        : [decode][连接(24fbc3e8) 解析到一条消息(Invocation{type='CHAT_REDIRECT_TO_USER_REQUEST', message='{"content":"广播消息","msgId":"2"}'})]
2020-06-22 08:55:44.902  INFO 59613 --- [ool-1-thread-83] l.n.m.c.ChatRedirectToUserRequestHandler : [execute][收到消息:ChatRedirectToUserRequest{msgId='2', content='广播消息'}]

// 客户端 C...
2020-06-22 08:55:44.901  INFO 61597 --- [ntLoopGroup-2-1] c.i.s.l.n.codec.InvocationDecoder        : [decode][连接(9128c71c) 解析到一条消息(Invocation{type='CHAT_REDIRECT_TO_USER_REQUEST', message='{"content":"广播消息","msgId":"2"}'})]
2020-06-22 08:55:44.903  INFO 61597 --- [ool-1-thread-16] l.n.m.c.ChatRedirectToUserRequestHandler : [execute][收到消息:ChatRedirectToUserRequest{msgId='2', content='广播消息'}]

666. 彩蛋

至此,我们已经通过 Netty 实现了一个简单的 IM 功能,是不是收获蛮大的,嘿嘿。

下面,良心的艿艿,再来推荐一波文章,嘿嘿。

等后续,艿艿会在 https://github.com/YunaiV/one... 开源项目中,实现一个相对完整的客服功能,哈哈哈~

查看原文

赞 24 收藏 14 评论 3

芋道源码 回答了问题 · 2020-07-09

for循环数据量很大,很卡

避免一次性提交大事务。

对方如果有批量接口,每次提交 100 条,然后更新本地 db。
对方如果没有批量接口,操纵 1 条,更新 db 一次。

关注 7 回答 6

芋道源码 发布了文章 · 2020-06-16

国产最强 Spring Cloud 全家桶 Spring Cloud Alibaba!

大家好,我是艿艿,一个让你秃头的小胖子~请不要轻易关注我,发危!

今天我们来一起瞅瞅,Spring Cloud Alibaba 究竟是个什么东东?

1. Spring Cloud 体系

相信很多胖友都看过如下一张图:演进图

  • SpringBean(对象) 为中心,提供 IOC、AOP 等功能。
  • Spring BootApplication(应用) 为中心,提供自动配置监控等功能。
  • Spring CloudService(服务) 为中心,提供服务的注册与发现、服务的调用与负载均衡等功能。

先来一起看看 Spring Cloud 官方对自己的简短介绍:

FROM https://github.com/spring-cloud

Tools for building common patterns in distributed systems with Spring

  • 基于 Spring 构建分布式系统的工具集,简称“Spring 全家桶”。

再来一起看看 Spring Cloud 官方对功能点的介绍:

FROM https://spring.io/projects/sp...

功能列表:

  • 【配置中心】Distributed/versioned configuration
  • 【注册中心】Service registration and discovery
  • 【API 网关】Routing
  • 【服务调用】Service-to-service calls
  • 【负载均衡】Load balancing
  • 【服务容错】Circuit Breakers
  • 【分布式消息】Distributed messaging

------------ 分隔线 -------------

如下功能,从 Spring Cloud 迁移到 Spring Integration 中:

  • 【全局锁】Global locks
  • 【领导选举与集群状态】Leadership election and cluster state

虽然 Spring Cloud 提供了非常强大的功能,但是它并不提供所有的实现,而是通过 Spring Cloud Common 子项目,定义了统一的抽象 API。如下图所示:Spring Cloud Common 图

而后,不同厂商结合其自身的中间件,提供自己的 Spring Cloud 套件,例如说:

  • Netflix 结合自己的 Eureka、Ribbon、Hystrix 等开源中间件,实现了 spring-cloud-netflix

    Spring Cloud Netflix 是目前国内使用最为流行的 Spring Cloud 套件。不过随着 Netflix 在开源的调整,逐步会慢慢没落。
  • Kubernetes 结合自己的 apiserver、configmap 等功能,实现了 spring-cloud-kubernetes
  • Alibaba 结合自己的 Nacos、Dubbo、Sentinel 等开源中间件,实现了 spring-cloud-alibaba

    目测 Spring Cloud Alibaba 在国内会火?!

当然,Spring Cloud 官方还是提供了一些功能的具体实现的,嘿嘿。例如说:

下面,我们把 Spring Cloud 官方、Netflix、Alibaba 三者整理成如下表格:

Spring Cloud 技术栈整理

本文在提供完整代码示例,可见 https://github.com/YunaiV/Spr...

原创不易,给点个 Star 嘿,一起冲鸭!

2. Spring Cloud Alibaba 套件

Spring Cloud Alibaba 套件,阿里开源组件、阿里云商业组件整合进 Spring Cloud 体系当中,同时对 Spring Cloud Gateway、OpenFeign、Ribbon 等等进行集成。整体如下图所示:

Spring Cloud Alibaba 全景图

我们可以将 Spring Cloud Alibaba 套件中的阿里开源组件阿里云商业组件整理成如下对照表:

开源 V.S. 商业

主要功能如下:

  • 服务注册与发现:适配 Spring Cloud 服务注册与发现标准,默认集成了 Ribbon 的支持。
  • 分布式配置管理:支持分布式系统中的外部化配置,配置更改时自动刷新。
  • 服务限流降级:默认支持 WebServlet、WebFlux, OpenFeign、RestTemplate、Spring Cloud Gateway, Zuul, Dubbo 和 RocketMQ 限流降级功能的接入,可以在运行时通过控制台实时修改限流降级规则,还支持查看限流降级 Metrics 监控。
  • 消息驱动能力:基于 Spring Cloud Stream 为微服务应用构建消息驱动能力。
  • 分布式事务:使用 @GlobalTransactional 注解, 高效并且对业务零侵入地解决分布式事务问题。

商业化独有功能如下:

  • 分布式任务调度:提供秒级、精准、高可靠、高可用的定时(基于 Cron 表达式)任务调度服务。同时提供分布式的任务执行模型,如网格任务。网格任务支持海量子任务均匀分配到所有 Worker(schedulerx-client)上执行。

    曾经,阿里 SchedulerX 也是一个开源项目,现在已经木有消息了...嘿嘿,希望未来能够重启。
  • 阿里云对象存储:阿里云提供的海量、安全、低成本、高可靠的云存储服务。支持在任何应用、任何时间、任何地点存储和访问任意类型的数据。
  • 阿里云短信服务:覆盖全球的短信服务,友好、高效、智能的互联化通讯能力,帮助企业迅速搭建客户触达通道。

    友情提示:艿艿还是比较推荐使用云片来发送短信,客服妹子基本是秒回,特别是审核短信模板的时候。

2.1 阿里开源组件

2.1.1 Nacos

Nacos:一个更易于构建云原生应用的动态服务发现、配置管理和服务管理平台。

2.1.2 Dubbo

Dubbo:Apache Dubbo™ 是一款高性能 Java RPC 框架。

2.1.3 Sentinel

Sentinel:把流量作为切入点,从流量控制、熔断降级、系统负载保护等多个维度保护服务的稳定性。

2.1.4 RocketMQ

RocketMQ:一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。

2.1.5 Seata

Seata:阿里巴巴开源产品,一个易于使用的高性能微服务分布式事务解决方案。

2.2 阿里云商业组件

2.2.1 Alibaba Cloud ACM

Alibaba Cloud ACM:一款在分布式架构环境中对应用配置进行集中管理和推送的应用配置中心产品。

2.2.2 Alibaba Cloud OSS

Alibaba Cloud OSS: 阿里云对象存储服务(Object Storage Service,简称 OSS),是阿里云提供的海量、安全、低成本、高可靠的云存储服务。您可以在任何应用、任何时间、任何地点存储和访问任意类型的数据。

2.2.3 Alibaba Cloud SchedulerX

Alibaba Cloud SchedulerX: 阿里中间件团队开发的一款分布式任务调度产品,提供秒级、精准、高可靠、高可用的定时(基于 Cron 表达式)任务调度服务。

2.2.4 Alibaba Cloud SMS

Alibaba Cloud SMS: 覆盖全球的短信服务,友好、高效、智能的互联化通讯能力,帮助企业迅速搭建客户触达通道。

2.3 集成其它 Spring Cloud 组件

Spring Cloud Alibaba 在融入 Spring Cloud 体系之后,可以方便和其它 Spring Cloud 组件进行混合使用,强强联合。这也可以弥补,Alibaba 暂时缺失的开源组件。

友情提示:如下艿艿只补充 Spring Cloud Alibaba 缺失的组件。

2.3.1 API 网关

2.3.2 链路追踪

2.3.3 服务调用 + 负载均衡

3. 选择 Spring Cloud 还是 Dubbo?

相信很多胖友在做微服务架构的技术选型的时候,基本会面临到的一个问题,特别是这个问题发生在 Dubbo 停止维护的时期?知乎上还有一个非常热门的讨论帖子[《Spring Cloud 和 Dubbo 各自的优缺点是什么?
》](https://www.zhihu.com/questio...

3.1 选择 Spring Cloud 体系?

先假设,选择 Spring Cloud 体系,那么我们就要思考选择哪一个 Spring Cloud 套件。目前最流行的,可能是 Netflix 提供的 Spring Cloud Netflix 套件。

但是世界总是这么有趣,Netflix 开始减少在开源领域的投入,逐步开始停止维护其组件:

因此,Spring Cloud Netflix 进入维护模式,可见《Spring Cloud Greenwich.RC1 available now》新闻。如下图所示:

Spring Cloud Netflix 维护模式

那么,Spring Cloud Netflix 可能不是一个很好的选择。但是,Spring Cloud 又暂时没有其它非常流行的套件,暂时可靠的貌似只有 Alibaba 提供的 Spring Cloud Alibaba 套件!在 Wanted: who's using Spring Cloud Alibaba 中,可以看到目前在使用 Spring Cloud Alibaba 的公司和团队。

另外,听说 Spring Cloud Alibaba 会成为 Spring Cloud 第二代标准实现,但是艿艿暂时没找到这块的资料和新闻。

3.2 选择 Spring Cloud Alibaba 还是 Dubbo?

事实上,Dubbo 已经从一个 RPC 框架,慢慢演变成了 Dubbo 生态,如下图所示:

FROM http://dubbo.apache.org/zh-cn...

Dubbo 生态

非常丰富,我们在 Spring Cloud Alibaba 看到的组件,在 Dubbo 生态中都提供了对应的集成。所以吧,这样看下来,无论选择 Spring Cloud Alibaba 还是 Dubbo 体系,都是非常不错的选择。

如果胖友选择 Dubbo 体系,推荐阅读如下文章:

目前,艿艿自己团队选择的是 Spring Boot + Dubbo 作为整体架构,正在考虑慢慢迁移到 Spring Cloud Alibaba 上。主要原因是,我们面向的是 B 端业务,需要考虑私有化部署,而 Spring Cloud 相比 Dubbo 更易实现中间件的透明迁移

例如说,Spring Cloud Stream RocketMQ 迁移到 Spring Cloud Stream Kafka 或 Spring Cloud Stream RabbitMQ 只需要修改配置文件即可。感兴趣的胖友,可以对比学习如下三篇文章:

旁白君:毕竟私有化部署时,对方可能会对使用什么基础组件会有要求,他就不喜欢 RocketMQ 而要 RabbitMQ 消息队列。

很多时候,这是 JPA 相比 MyBatis 的优势。

666. 彩蛋

未来,艿艿会把自己开源电商项目 https://github.com/YunaiV/one...,迁移到 Spring Cloud Alibaba 架构,胖友可以 Star 一波。

另外,在 https://github.com/YunaiV/Spr... 项目中,艿艿会不断更新更多 Spring Cloud 和 Spring Cloud Alibaba 入门与实战的文章,让你不断秃头!

考虑到部分胖友对微服务暂时没有总体的认知,推荐阅读如下几篇文章:

参考文章如下:

查看原文

赞 6 收藏 4 评论 0

芋道源码 发布了文章 · 2020-06-09

Spring Boot 定时任务的技术选型对比

大家好,我是艿艿,一个日常在地铁撸码的小胖子~

今天我们来一起瞅瞅,在 Spring Boot 应用中,有哪些定时任务的技术选型~

1. 概述

在产品的色彩斑斓的黑的需求中,有存在一类需求,是需要去定时执行的,此时就需要使用到定时任务。例如说,每分钟扫描超时支付的订单,每小时清理一次日志文件,每天统计前一天的数据并生成报表,每个月月初的工资单的推送,每年一次的生日提醒等等。

其中,艿艿最喜欢“每个月月初的工资单的推送”,你呢?

在 JDK 中,内置了两个类,可以实现定时任务的功能:

  • java.util.Timer :可以通过创建 java.util.TimerTask 调度任务,在同一个线程中串行执行,相互影响。也就是说,对于同一个 Timer 里的多个 TimerTask 任务,如果一个 TimerTask 任务在执行中,其它 TimerTask 即使到达执行的时间,也只能排队等待。因为 Timer 是串行的,同时存在 坑坑 ,所以后来 JDK 又推出了 ScheduledExecutorService ,Timer 也基本不再使用。
  • java.util.concurrent.ScheduledExecutorService :在 JDK 1.5 新增,基于线程池设计的定时任务类,每个调度任务都会被分配到线程池中并发执行,互不影响。这样,ScheduledExecutorService 就解决了 Timer 串行的问题。

在日常开发中,我们很少直接使用 Timer 或 ScheduledExecutorService 来实现定时任务的需求。主要有几点原因:

  • 它们仅支持按照指定频率,不直接支持指定时间的定时调度,需要我们结合 Calendar 自行计算,才能实现复杂时间的调度。例如说,每天、每周五、2019-11-11 等等。
  • 它们是进程级别,而我们为了实现定时任务的高可用,需要部署多个进程。此时需要等多考虑,多个进程下,同一个任务在相同时刻,不能重复执行。
  • 项目可能存在定时任务较多,需要统一的管理,此时不得不进行二次封装。

所以,一般情况下,我们会选择专业的调度任务中间件

关于“任务”的叫法,也有叫“作业”的。在英文上,有 Task 也有 Job 。本质是一样的,本文两种都会用。

然后,一般来说是调度任务,定时执行。所以胖友会在本文,或者其它文章中,会看到“调度”或“定时”的字眼儿。

在 Spring 体系中,内置了两种定时任务的解决方案:

  • 第一种,Spring FrameworkSpring Task 模块,提供了轻量级的定时任务的实现。
  • 第二种,Spring Boot 2.0 版本,整合了 Quartz 作业调度框架,提供了功能强大的定时任务的实现。

    注:Spring Framework 已经内置了 Quartz 的整合。Spring Boot 1.X 版本未提供 Quartz 的自动化配置,而 2.X 版本提供了支持。

在 Java 生态中,还有非常多优秀的开源的调度任务中间件:

目前国内采用 Elastic-Job 和 XXL-JOB 为主。从艿艿了解到的情况,使用 XXL-JOB 的团队会更多一些,主要是上手较为容易,运维功能更为完善。

本文在提供完整代码示例,可见 https://github.com/YunaiV/Spr...lab-28 目录。

原创不易,给点个 Star 嘿,一起冲鸭!

2. 快速入门 Spring Task

示例代码对应仓库:lab-28-task-demo

考虑到实际场景下,我们很少使用 Spring Task ,所以本小节会写的比较简洁。如果对 Spring Task 比较感兴趣的胖友,可以自己去阅读 《Spring Framework Documentation —— Task Execution and Scheduling》 文档,里面有 Spring Task 相关的详细文档。

在本小节,我们会使用 Spring Task 功能,实现一个每 2 秒打印一行执行日志的定时任务。

2.1 引入依赖

pom.xml 文件中,引入相关依赖。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>lab-28-task-demo</artifactId>

    <dependencies>
        <!-- 实现对 Spring MVC 的自动化配置 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>

</project>

因为 Spring Task 是 Spring Framework 的模块,所以在我们引入 spring-boot-starter-web 依赖后,无需特别引入它。

同时,考虑到我们希望让项目启动时,不自动结束 JVM 进程,所以我们引入了 spring-boot-starter-web 依赖。

2.2 ScheduleConfiguration

cn.iocoder.springboot.lab28.task.config 包路径下,创建 ScheduleConfiguration 类,配置 Spring Task 。代码如下:

// ScheduleConfiguration.java

@Configuration
@EnableScheduling
public class ScheduleConfiguration {
}
  • 在类上,添加 @EnableScheduling 注解,启动 Spring Task 的定时任务调度的功能。

2.3 DemoJob

cn.iocoder.springboot.lab28.task.job 包路径下,创建 DemoJob 类,示例定时任务类。代码如下:

// DemoJob.java

@Component
public class DemoJob {

    private Logger logger = LoggerFactory.getLogger(getClass());

    private final AtomicInteger counts = new AtomicInteger();

    @Scheduled(fixedRate = 2000)
    public void execute() {
        logger.info("[execute][定时第 ({}) 次执行]", counts.incrementAndGet());
    }

}
  • 在类上,添加 @Component 注解,创建 DemoJob Bean 对象。
  • 创建 #execute() 方法,实现打印日志。同时,在该方法上,添加 @Scheduled 注解,设置每 2 秒执行该方法。

虽然说,@Scheduled 注解,可以添加在一个类上的多个方法上,但是艿艿的个人习惯上,还是一个 Job 类,一个定时任务。😈

2.4 Application

创建 Application.java 类,配置 @SpringBootApplication 注解即可。代码如下:

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}

运行 Application 类,启动示例项目。输出日志精简如下:

# 初始化一个 ThreadPoolTaskScheduler 任务调度器
2019-11-30 18:02:58.415  INFO 83730 --- [           main] o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService 'taskScheduler'

# 每 2 秒,执行一次 DemoJob 的任务
2019-11-30 18:02:58.449  INFO 83730 --- [ pikaqiu-demo-1] c.i.springboot.lab28.task.job.DemoJob    : [execute][定时第 (1) 次执行]
2019-11-30 18:03:00.438  INFO 83730 --- [ pikaqiu-demo-1] c.i.springboot.lab28.task.job.DemoJob    : [execute][定时第 (2) 次执行]
2019-11-30 18:03:02.442  INFO 83730 --- [ pikaqiu-demo-2] c.i.springboot.lab28.task.job.DemoJob    : [execute][定时第 (3) 次执行]
  • 通过日志,我们可以看到,初始化一个 ThreadPoolTaskScheduler 任务调度器。之后,每 2 秒,执行一次 DemoJob 的任务。

至此,我们已经完成了 Spring Task 调度任务功能的入门。实际上,Spring Task 还提供了异步任务 ,这个我们在其它文章中,详细讲解。

下面「2.5 @Scheduled」「2.6 应用配置文件」两个小节,是补充知识,建议看看。

2.5 @Scheduled

@Scheduled 注解,设置定时任务的执行计划。

常用属性如下:

  • cron 属性:Spring Cron 表达式。例如说,"0 0 12 * * ?" 表示每天中午执行一次,"11 11 11 11 11 ?" 表示 11 月 11 号 11 点 11 分 11 秒执行一次(哈哈哈)。更多示例和讲解,可以看看 《Spring Cron 表达式》 文章。注意,以调用完成时刻为开始计时时间。
  • fixedDelay 属性:固定执行间隔,单位:毫秒。注意,以调用完成时刻为开始计时时间。
  • fixedRate 属性:固定执行间隔,单位:毫秒。注意,以调用开始时刻为开始计时时间。
  • 这三个属性,有点雷同,可以看看 《@Scheduled 定时任务的fixedRate、fixedDelay、cron 的区别》 ,一定要分清楚差异。

不常用属性如下:

  • initialDelay 属性:初始化的定时任务执行延迟,单位:毫秒。
  • zone 属性:解析 Spring Cron 表达式的所属的时区。默认情况下,使用服务器的本地时区。
  • initialDelayString 属性:initialDelay 的字符串形式。
  • fixedDelayString 属性:fixedDelay 的字符串形式。
  • fixedRateString 属性:fixedRate 的字符串形式。

2.6 应用配置文件

application.yml 中,添加 Spring Task 定时任务的配置,如下:

spring:
  task:
    # Spring Task 调度任务的配置,对应 TaskSchedulingProperties 配置类
    scheduling:
      thread-name-prefix: pikaqiu-demo- # 线程池的线程名的前缀。默认为 scheduling- ,建议根据自己应用来设置
      pool:
        size: 10 # 线程池大小。默认为 1 ,根据自己应用来设置
      shutdown:
        await-termination: true # 应用关闭时,是否等待定时任务执行完成。默认为 false ,建议设置为 true
        await-termination-period: 60 # 等待任务完成的最大时长,单位为秒。默认为 0 ,根据自己应用来设置
  • spring.task.scheduling 配置项,Spring Task 调度任务的配置,对应 TaskSchedulingProperties 配置类。
  • Spring Boot TaskSchedulingAutoConfiguration 自动化配置类,实现 Spring Task 的自动配置,创建 ThreadPoolTaskScheduler 基于线程池的任务调度器。本质上,ThreadPoolTaskScheduler 是基于 ScheduledExecutorService 的封装,增强在调度时间上的功能。

注意spring.task.scheduling.shutdown 配置项,是为了实现 Spring Task 定时任务的优雅关闭。我们想象一下,如果定时任务在执行的过程中,如果应用开始关闭,把定时任务需要使用到的 Spring Bean 进行销毁,例如说数据库连接池,那么此时定时任务还在执行中,一旦需要访问数据库,可能会导致报错。

  • 所以,通过配置 await-termination = true ,实现应用关闭时,等待定时任务执行完成。这样,应用在关闭的时,Spring 会优先等待 ThreadPoolTaskScheduler 执行完任务之后,再开始 Spring Bean 的销毁。
  • 同时,又考虑到我们不可能无限等待定时任务全部执行结束,因此可以配置 await-termination-period = 60 ,等待任务完成的最大时长,单位为秒。具体设置多少的等待时长,可以根据自己应用的需要。

3. 快速入门 Quartz 单机

示例代码对应仓库:lab-28-task-quartz-memory

在艿艿最早开始实习的时候,公司使用 Quartz 作为任务调度中间件。考虑到我们要实现定时任务的高可用,需要部署多个 JVM 进程。比较舒服的是,Quartz 自带了集群方案。它通过将作业信息存储到关系数据库中,并使用关系数据库的行锁来实现执行作业的竞争,从而保证多个进程下,同一个任务在相同时刻,不能重复执行。

可能很多胖友对 Quartz 还不是很了解,我们先来看一段简介:

FROM https://www.oschina.net/p/quartz

Quartz 是一个开源的作业调度框架,它完全由 Java 写成,并设计用于 J2SE 和 J2EE 应用中。它提供了巨大的灵活性而不牺牲简单性。你能够用它来为执行一个作业而创建简单的或复杂的调度。

它有很多特征,如:数据库支持,集群,插件,EJB 作业预构建,JavaMail 及其它,支持 cron-like 表达式等等。

在 Quartz 体系结构中,有三个组件非常重要:

  • Scheduler :调度器
  • Trigger :触发器
  • Job :任务

不了解的胖友,可以直接看看 《Quartz 入门详解》 文章。这里,艿艿就不重复赘述。

FROM https://medium.com/@ChamithKo...

Quartz 整体架构图

Quartz 分成单机模式和集群模式。

  • 本小节,我们先来学习下 Quartz 的单机模式,入门比较快。
  • 下一下「5. 再次入门 Quartz 集群」 ,我们再来学习下 Quartz 的集群模式。在生产环境下,一定一定一定要使用 Quartz 的集群模式,保证定时任务的高可用。

😈 下面,让我们开始遨游~

3.1 引入依赖

pom.xml 文件中,引入相关依赖。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>lab-28-task-quartz-memory</artifactId>

    <dependencies>
        <!-- 实现对 Spring MVC 的自动化配置 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- 实现对 Quartz 的自动化配置 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-quartz</artifactId>
        </dependency>
    </dependencies>

</project>

具体每个依赖的作用,胖友自己认真看下艿艿添加的所有注释噢。

3.2 示例 Job

cn.iocoder.springboot.lab28.task.config.job 包路径下,我们来创建示例 Job 。

创建 DemoJob01 类,示例定时任务 01 类。代码如下:

// DemoJob01.java

public class DemoJob01 extends QuartzJobBean {

    private Logger logger = LoggerFactory.getLogger(getClass());

    private final AtomicInteger counts = new AtomicInteger();

    @Autowired
    private DemoService demoService;

    @Override
    protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
        logger.info("[executeInternal][定时第 ({}) 次执行, demoService 为 ({})]", counts.incrementAndGet(),
                demoService);
    }

}
  • 继承 QuartzJobBean 抽象类,实现 #executeInternal(JobExecutionContext context) 方法,执行自定义的定时任务的逻辑。
  • QuartzJobBean 实现了 org.quartz.Job 接口,提供了 Quartz 每次创建 Job 执行定时逻辑时,将该 Job Bean 的依赖属性注入。例如说,DemoJob01 需要 @Autowired 注入的 demoService 属性。核心代码如下:

    // QuartzJobBean.java
    
    public final void execute(JobExecutionContext context) throws JobExecutionException {
        try {
            // 将当前对象,包装成 BeanWrapper 对象
            BeanWrapper bw = PropertyAccessorFactory.forBeanPropertyAccess(this);
            // 设置属性到 bw 中
            MutablePropertyValues pvs = new MutablePropertyValues();
            pvs.addPropertyValues(context.getScheduler().getContext());
            pvs.addPropertyValues(context.getMergedJobDataMap());
            bw.setPropertyValues(pvs, true);
        } catch (SchedulerException ex) {
            throw new JobExecutionException(ex);
        }
    
        // 执行提供给子类实现的抽象方法
        this.executeInternal(context);
    }
    
    protected abstract void executeInternal(JobExecutionContext context) throws JobExecutionException;
    • 这样一看,是不是清晰很多。不要惧怕中间件的源码,好奇哪个类或者方法,就点进去看看。反正,又不花钱。
  • counts 属性,计数器。用于我们后面我们展示,每次 DemoJob01 都会被 Quartz 创建出一个新的 Job 对象,执行任务。这个很重要,也要非常小心。

创建 DemoJob02 类,示例定时任务 02 类。代码如下:

// DemoJob02.java

public class DemoJob02 extends QuartzJobBean {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
        logger.info("[executeInternal][我开始的执行了]");
    }

}
  • 比较简单,为了后面演示案例之用。

3.3 ScheduleConfiguration

cn.iocoder.springboot.lab28.task.config 包路径下,创建 ScheduleConfiguration 类,配置上述的两个示例 Job 。代码如下:

// ScheduleConfiguration.java

@Configuration
public class ScheduleConfiguration {

    public static class DemoJob01Configuration {

        @Bean
        public JobDetail demoJob01() {
            return JobBuilder.newJob(DemoJob01.class)
                    .withIdentity("demoJob01") // 名字为 demoJob01
                    .storeDurably() // 没有 Trigger 关联的时候任务是否被保留。因为创建 JobDetail 时,还没 Trigger 指向它,所以需要设置为 true ,表示保留。
                    .build();
        }

        @Bean
        public Trigger demoJob01Trigger() {
            // 简单的调度计划的构造器
            SimpleScheduleBuilder scheduleBuilder = SimpleScheduleBuilder.simpleSchedule()
                    .withIntervalInSeconds(5) // 频率。
                    .repeatForever(); // 次数。
            // Trigger 构造器
            return TriggerBuilder.newTrigger()
                    .forJob(demoJob01()) // 对应 Job 为 demoJob01
                    .withIdentity("demoJob01Trigger") // 名字为 demoJob01Trigger
                    .withSchedule(scheduleBuilder) // 对应 Schedule 为 scheduleBuilder
                    .build();
        }

    }

    public static class DemoJob02Configuration {

        @Bean
        public JobDetail demoJob02() {
            return JobBuilder.newJob(DemoJob02.class)
                    .withIdentity("demoJob02") // 名字为 demoJob02
                    .storeDurably() // 没有 Trigger 关联的时候任务是否被保留。因为创建 JobDetail 时,还没 Trigger 指向它,所以需要设置为 true ,表示保留。
                    .build();
        }

        @Bean
        public Trigger demoJob02Trigger() {
            // 基于 Quartz Cron 表达式的调度计划的构造器
            CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule("0/10 * * * * ? *");
            // Trigger 构造器
            return TriggerBuilder.newTrigger()
                    .forJob(demoJob02()) // 对应 Job 为 demoJob02
                    .withIdentity("demoJob02Trigger") // 名字为 demoJob02Trigger
                    .withSchedule(scheduleBuilder) // 对应 Schedule 为 scheduleBuilder
                    .build();
        }

    }

}
  • 内部创建了 DemoJob01Configuration 和 DemoJob02Configuration 两个配置类,分别配置 DemoJob01 和 DemoJob02 两个 Quartz Job 。
  • ========== DemoJob01Configuration ==========
  • #demoJob01() 方法,创建 DemoJob01 的 JobDetail Bean 对象。
  • #demoJob01Trigger() 方法,创建 DemoJob01 的 Trigger Bean 对象。其中,我们使用 SimpleScheduleBuilder 简单的调度计划的构造器,创建了每 5 秒执行一次,无限重复的调度计划。
  • ========== DemoJob2Configuration ==========
  • #demoJob2() 方法,创建 DemoJob02 的 JobDetail Bean 对象。
  • #demoJob02Trigger() 方法,创建 DemoJob02 的 Trigger Bean 对象。其中,我们使用 CronScheduleBuilder 基于 Quartz Cron 表达式的调度计划的构造器,创建了每 10 秒执行一次的调度计划。这里,推荐一个 Quartz/Cron/Crontab 表达式在线生成工具 ,方便帮我们生成 Quartz Cron 表达式,并计算出最近 5 次运行时间。

😈 因为 JobDetail 和 Trigger 一般是成双成对出现,所以艿艿习惯配置成一个 Configuration 配置类。

3.4 Application

创建 Application.java 类,配置 @SpringBootApplication 注解即可。代码如下:

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}

运行 Application 类,启动示例项目。输出日志精简如下:

# 创建了 Quartz QuartzScheduler 并启动
2019-11-30 23:40:05.123  INFO 92812 --- [           main] org.quartz.impl.StdSchedulerFactory      : Using default implementation for ThreadExecutor
2019-11-30 23:40:05.130  INFO 92812 --- [           main] org.quartz.core.SchedulerSignalerImpl    : Initialized Scheduler Signaller of type: class org.quartz.core.SchedulerSignalerImpl
2019-11-30 23:40:05.130  INFO 92812 --- [           main] org.quartz.core.QuartzScheduler          : Quartz Scheduler v.2.3.2 created.
2019-11-30 23:40:05.131  INFO 92812 --- [           main] org.quartz.simpl.RAMJobStore             : RAMJobStore initialized.
2019-11-30 23:40:05.132  INFO 92812 --- [           main] org.quartz.core.QuartzScheduler          : Scheduler meta-data: Quartz Scheduler (v2.3.2) 'quartzScheduler' with instanceId 'NON_CLUSTERED'
  Scheduler class: 'org.quartz.core.QuartzScheduler' - running locally.
  NOT STARTED.
  Currently in standby mode.
  Number of jobs executed: 0
  Using thread pool 'org.quartz.simpl.SimpleThreadPool' - with 10 threads.
  Using job-store 'org.quartz.simpl.RAMJobStore' - which does not support persistence. and is not clustered.

2019-11-30 23:40:05.132  INFO 92812 --- [           main] org.quartz.impl.StdSchedulerFactory      : Quartz scheduler 'quartzScheduler' initialized from an externally provided properties instance.
2019-11-30 23:40:05.132  INFO 92812 --- [           main] org.quartz.impl.StdSchedulerFactory      : Quartz scheduler version: 2.3.2
2019-11-30 23:40:05.132  INFO 92812 --- [           main] org.quartz.core.QuartzScheduler          : JobFactory set to: org.springframework.scheduling.quartz.SpringBeanJobFactory@203dd56b
2019-11-30 23:40:05.158  INFO 92812 --- [           main] o.s.s.quartz.SchedulerFactoryBean        : Starting Quartz Scheduler now
2019-11-30 23:40:05.158  INFO 92812 --- [           main] org.quartz.core.QuartzScheduler          : Scheduler quartzScheduler_$_NON_CLUSTERED started.

# DemoJob01
2019-11-30 23:40:05.164  INFO 92812 --- [eduler_Worker-1] c.i.springboot.lab28.task.job.DemoJob01  : [executeInternal][定时第 (1) 次执行, demoService 为 (cn.iocoder.springboot.lab28.task.service.DemoService@23d75d74)]
2019-11-30 23:40:09.866  INFO 92812 --- [eduler_Worker-2] c.i.springboot.lab28.task.job.DemoJob01  : [executeInternal][定时第 (1) 次执行, demoService 为 (cn.iocoder.springboot.lab28.task.service.DemoService@23d75d74)]
2019-11-30 23:40:14.865  INFO 92812 --- [eduler_Worker-4] c.i.springboot.lab28.task.job.DemoJob01  : [executeInternal][定时第 (1) 次执行, demoService 为 (cn.iocoder.springboot.lab28.task.service.DemoService@23d75d74)]

# DemoJob02
2019-11-30 23:40:10.004  INFO 92812 --- [eduler_Worker-3] c.i.springboot.lab28.task.job.DemoJob02  : [executeInternal][我开始的执行了]
2019-11-30 23:40:20.001  INFO 92812 --- [eduler_Worker-6] c.i.springboot.lab28.task.job.DemoJob02  : [executeInternal][我开始的执行了]
2019-11-30 23:40:30.002  INFO 92812 --- [eduler_Worker-9] c.i.springboot.lab28.task.job.DemoJob02  : [executeInternal][我开始的执行了]
  • 项目启动时,会创建了 Quartz QuartzScheduler 并启动。
  • 考虑到阅读日志方便,艿艿这里把 DemoJob01 和 DemoJob02 的日志分开来了。
  • 对于 DemoJob01 ,每 5 秒左右执行一次。同时我们可以看到,demoService 成功注入,而 counts 每次都是 1 ,说明每次 DemoJob01 都是新创建的。
  • 对于 DemoJob02 ,每 10 秒执行一次。
下面「3.5 应用配置文件」两个小节,是补充知识,建议看看。

3.5 应用配置文件

application.yml 中,添加 Quartz 的配置,如下:

spring:
  # Quartz 的配置,对应 QuartzProperties 配置类
  quartz:
    job-store-type: memory # Job 存储器类型。默认为 memory 表示内存,可选 jdbc 使用数据库。
    auto-startup: true # Quartz 是否自动启动
    startup-delay: 0 # 延迟 N 秒启动
    wait-for-jobs-to-complete-on-shutdown: true # 应用关闭时,是否等待定时任务执行完成。默认为 false ,建议设置为 true
    overwrite-existing-jobs: false # 是否覆盖已有 Job 的配置
    properties: # 添加 Quartz Scheduler 附加属性,更多可以看 http://www.quartz-scheduler.org/documentation/2.4.0-SNAPSHOT/configuration.html 文档
      org:
        quartz:
          threadPool:
            threadCount: 25 # 线程池大小。默认为 10 。
            threadPriority: 5 # 线程优先级
            class: org.quartz.simpl.SimpleThreadPool # 线程池类型
#    jdbc: # 这里暂时不说明,使用 JDBC 的 JobStore 的时候,才需要配置

注意spring.quartz.wait-for-jobs-to-complete-on-shutdown 配置项,是为了实现 Quartz 的优雅关闭,建议开启。关于这块,和我们在 Spring Task 的「2.6 应用配置文件」 提到的是一致的。

4. 再次入门 Quartz 集群

示例代码对应仓库:lab-28-task-quartz-memory

实际场景下,我们必然需要考虑定时任务的高可用,所以基本上,肯定使用 Quartz 的集群方案。因此本小节,我们使用 Quartz 的 JDBC 存储器 JobStoreTX ,并是使用 MySQL 作为数据库。

如下是 Quartz 两种存储器的对比:

FROM https://blog.csdn.net/Evankak...
类型优点缺点
RAMJobStore不要外部数据库,配置容易,运行速度快因为调度程序信息是存储在被分配给 JVM 的内存里面,所以,当应用程序停止运行时,所有调度信息将被丢失。另外因为存储到JVM内存里面,所以可以存储多少个 Job 和 Trigger 将会受到限制
JDBC 作业存储支持集群,因为所有的任务信息都会保存到数据库中,可以控制事物,还有就是如果应用服务器关闭或者重启,任务信息都不会丢失,并且可以恢复因服务器关闭或者重启而导致执行失败的任务运行速度的快慢取决与连接数据库的快慢
艿艿:实际上,有方案可以实现兼具这两种方式的优点,我们在 「666. 彩蛋」 中来说。

另外,本小节提供的示例和 「3. 快速入门 Quartz 单机」 基本一致。😈 下面,让我们开始遨游~

4.1 引入依赖

pom.xml 文件中,引入相关依赖。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.10.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>lab-28-task-quartz-jdbc</artifactId>

    <dependencies>
        <!-- 实现对数据库连接池的自动化配置 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
        <dependency> <!-- 本示例,我们使用 MySQL -->
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.48</version>
        </dependency>

        <!-- 实现对 Spring MVC 的自动化配置 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- 实现对 Quartz 的自动化配置 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-quartz</artifactId>
        </dependency>

        <!-- 方便等会写单元测试 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

</project>
  • 「3.1 引入依赖」 基本一致,只是额外引入 spring-boot-starter-test 依赖,等会会写两个单元测试方法。

4.2 示例 Job

cn.iocoder.springboot.lab28.task.config.job 包路径下,创建 DemoJob01DemoJob02 类。代码如下:

// DemoJob01.java

@DisallowConcurrentExecution
public class DemoJob01 extends QuartzJobBean {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private DemoService demoService;

    @Override
    protected void executeInternal(JobExecutionContext context) {
        logger.info("[executeInternal][我开始的执行了, demoService 为 ({})]", demoService);
    }

}

// DemoJob02.java

@DisallowConcurrentExecution
public class DemoJob02 extends QuartzJobBean {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    protected void executeInternal(JobExecutionContext context) {
        logger.info("[executeInternal][我开始的执行了]");
    }

}

注意,不是以 Quartz Job 为维度,保证在多个 JVM 进程中,有且仅有一个节点在执行,而是以 JobDetail 为维度。虽然说,绝大多数情况下,我们会保证一个 Job 和 JobDetail 是一一对应。😈 所以,搞不清楚这个概念的胖友,最好搞清楚这个概念。实在有点懵逼,保证一个 Job 和 JobDetail 是一一对应就对了。

而 JobDetail 的唯一标识JobKey ,使用 name + group 两个属性。一般情况下,我们只需要设置 name 即可,而 Quartz 会默认 group = DEFAULT

不过这里还有一点要补充,也是需要注意的,在 Quartz 中,相同 Scheduler 名字的节点,形成一个 Quartz 集群。在下文中,我们可以通过 spring.quartz.scheduler-name 配置项,设置 Scheduler 的名字。

【重要】为什么要说这个呢?因为我们要完善一下上面的说法:通过在 Job 实现类上添加 @DisallowConcurrentExecution 注解,实现在相同 Quartz Scheduler 集群中,相同 JobKey 的 JobDetail ,保证在多个 JVM 进程中,有且仅有一个节点在执行。

4.3 应用配置文件

application.yml 中,添加 Quartz 的配置,如下:

spring:
  datasource:
    user:
      url: jdbc:mysql://127.0.0.1:3306/lab-28-quartz-jdbc-user?useSSL=false&useUnicode=true&characterEncoding=UTF-8
      driver-class-name: com.mysql.jdbc.Driver
      username: root
      password:
    quartz:
      url: jdbc:mysql://127.0.0.1:3306/lab-28-quartz-jdbc-quartz?useSSL=false&useUnicode=true&characterEncoding=UTF-8
      driver-class-name: com.mysql.jdbc.Driver
      username: root
      password:

  # Quartz 的配置,对应 QuartzProperties 配置类
  quartz:
    scheduler-name: clusteredScheduler # Scheduler 名字。默认为 schedulerName
    job-store-type: jdbc # Job 存储器类型。默认为 memory 表示内存,可选 jdbc 使用数据库。
    auto-startup: true # Quartz 是否自动启动
    startup-delay: 0 # 延迟 N 秒启动
    wait-for-jobs-to-complete-on-shutdown: true # 应用关闭时,是否等待定时任务执行完成。默认为 false ,建议设置为 true
    overwrite-existing-jobs: false # 是否覆盖已有 Job 的配置
    properties: # 添加 Quartz Scheduler 附加属性,更多可以看 http://www.quartz-scheduler.org/documentation/2.4.0-SNAPSHOT/configuration.html 文档
      org:
        quartz:
          # JobStore 相关配置
          jobStore:
            # 数据源名称
            dataSource: quartzDataSource # 使用的数据源
            class: org.quartz.impl.jdbcjobstore.JobStoreTX # JobStore 实现类
            driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
            tablePrefix: QRTZ_ # Quartz 表前缀
            isClustered: true # 是集群模式
            clusterCheckinInterval: 1000
            useProperties: false
          # 线程池相关配置
          threadPool:
            threadCount: 25 # 线程池大小。默认为 10 。
            threadPriority: 5 # 线程优先级
            class: org.quartz.simpl.SimpleThreadPool # 线程池类型
    jdbc: # 使用 JDBC 的 JobStore 的时候,JDBC 的配置
      initialize-schema: never # 是否自动使用 SQL 初始化 Quartz 表结构。这里设置成 never ,我们手动创建表结构。
  • 配置项比较多,我们主要对比 「3.5 应用配置文件」 来看看。
  • spring.datasource 配置项下,用于创建多个数据源的配置。

    • user 配置,连接 lab-28-quartz-jdbc-user 库。目的是,为了模拟我们一般项目,使用到的业务数据库。
    • quartz 配置,连接 lab-28-quartz-jdbc-quartz 库。目的是,Quartz 会使用单独的数据库。😈 如果我们有多个项目需要使用到 Quartz 数据库的话,可以统一使用一个,但是要注意配置 spring.quartz.scheduler-name 设置不同的 Scheduler 名字,形成不同的 Quartz 集群。
  • spring.quartz 配置项下,额外增加了一些配置项,我们逐个来看看。

    • scheduler-name 配置,Scheduler 名字。这个我们在上文解释了很多次了,如果还不明白,请拍死自己。
    • job-store-type 配置,设置了使用 "jdbc" 的 Job 存储器。
    • properties.org.quartz.jobStore 配置,增加了 JobStore 相关配置。重点是,通过 dataSource 配置项,设置了使用名字为 "quartzDataSource" 的 DataSource 为数据源。😈 在 「4.4 DataSourceConfiguration」 中,我们会使用 spring.datasource.quartz 配置,来创建该数据源。
    • jdbc 配置项,虽然名字叫这个,主要是为了设置使用 SQL 初始化 Quartz 表结构。这里,我们设置 initialize-schema = never ,我们手动创建表结构。

咳咳咳,配置项确实有点多。如果暂时搞不明白的胖友,可以先简单把 spring.datasource 数据源,修改成自己的即可。

4.4 初始化 Quartz 表结构

Quartz Download 地址,下载对应版本的发布包。解压后,我们可以在 src/org/quartz/impl/jdbcjobstore/ 目录,看到各种数据库的 Quartz 表结构的初始化脚本。这里,因为我们使用 MySQL ,所以使用 tables_mysql_innodb.sql 脚本。

在数据库中执行该脚本,完成初始化 Quartz 表结构。如下图所示:Quartz 表结构

关于每个 Quartz 表结构的说明,可以看看 《Quartz 框架(二)——JobStore 数据库表字段详解》 文章。😈 实际上,也可以不看,哈哈哈哈。

我们会发现,每个表都有一个 SCHED_NAME 字段,Quartz Scheduler 名字。这样,实现每个 Quartz 集群,数据层面的拆分。

4.5 DataSourceConfiguration

cn.iocoder.springboot.lab28.task.config 包路径下,创建 DataSourceConfiguration 类,配置数据源。代码如下:

// DataSourceConfiguration.java

@Configuration
public class DataSourceConfiguration {

    /**
     * 创建 user 数据源的配置对象
     */
    @Primary
    @Bean(name = "userDataSourceProperties")
    @ConfigurationProperties(prefix = "spring.datasource.user") // 读取 spring.datasource.user 配置到 DataSourceProperties 对象
    public DataSourceProperties userDataSourceProperties() {
        return new DataSourceProperties();
    }

    /**
     * 创建 user 数据源
     */
    @Primary
    @Bean(name = "userDataSource")
    @ConfigurationProperties(prefix = "spring.datasource.user.hikari") // 读取 spring.datasource.user 配置到 HikariDataSource 对象
    public DataSource userDataSource() {
        // 获得 DataSourceProperties 对象
        DataSourceProperties properties =  this.userDataSourceProperties();
        // 创建 HikariDataSource 对象
        return createHikariDataSource(properties);
    }

    /**
     * 创建 quartz 数据源的配置对象
     */
    @Bean(name = "quartzDataSourceProperties")
    @ConfigurationProperties(prefix = "spring.datasource.quartz") // 读取 spring.datasource.quartz 配置到 DataSourceProperties 对象
    public DataSourceProperties quartzDataSourceProperties() {
        return new DataSourceProperties();
    }

    /**
     * 创建 quartz 数据源
     */
    @Bean(name = "quartzDataSource")
    @ConfigurationProperties(prefix = "spring.datasource.quartz.hikari")
    @QuartzDataSource
    public DataSource quartzDataSource() {
        // 获得 DataSourceProperties 对象
        DataSourceProperties properties =  this.quartzDataSourceProperties();
        // 创建 HikariDataSource 对象
        return createHikariDataSource(properties);
    }

    private static HikariDataSource createHikariDataSource(DataSourceProperties properties) {
        // 创建 HikariDataSource 对象
        HikariDataSource dataSource = properties.initializeDataSourceBuilder().type(HikariDataSource.class).build();
        // 设置线程池名
        if (StringUtils.hasText(properties.getName())) {
            dataSource.setPoolName(properties.getName());
        }
        return dataSource;
    }

}
  • 基于 spring.datasource.user 配置项,创建了名字为 "userDataSource" 的 DataSource Bean 。并且,在其上我们添加了 @Primay 注解,表示其是数据源。
  • 基于 spring.datasource.quartz 配置项,创建了名字为 "quartzDataSource" 的 DataSource Bean 。并且,在其上我们添加了 @QuartzDataSource 注解,表示其是 Quartz 的数据源。😈 注意,一定要配置啊,这里艿艿卡了好久!!!!

4.6 定时任务配置

完成上述的工作之后,我们需要配置 Quartz 的定时任务。目前,有两种方式:

4.6.1 Bean 自动设置

cn.iocoder.springboot.lab28.task.config 包路径下,创建 ScheduleConfiguration 类,配置上述的两个示例 Job 。代码如下:

// ScheduleConfiguration.java

@Configuration
public class ScheduleConfiguration {

    public static class DemoJob01Configuration {

        @Bean
        public JobDetail demoJob01() {
            return JobBuilder.newJob(DemoJob01.class)
                    .withIdentity("demoJob01") // 名字为 demoJob01
                    .storeDurably() // 没有 Trigger 关联的时候任务是否被保留。因为创建 JobDetail 时,还没 Trigger 指向它,所以需要设置为 true ,表示保留。
                    .build();
        }

        @Bean
        public Trigger demoJob01Trigger() {
            // 简单的调度计划的构造器
            SimpleScheduleBuilder scheduleBuilder = SimpleScheduleBuilder.simpleSchedule()
                    .withIntervalInSeconds(5) // 频率。
                    .repeatForever(); // 次数。
            // Trigger 构造器
            return TriggerBuilder.newTrigger()
                    .forJob(demoJob01()) // 对应 Job 为 demoJob01
                    .withIdentity("demoJob01Trigger") // 名字为 demoJob01Trigger
                    .withSchedule(scheduleBuilder) // 对应 Schedule 为 scheduleBuilder
                    .build();
        }

    }

    public static class DemoJob02Configuration {

        @Bean
        public JobDetail demoJob02() {
            return JobBuilder.newJob(DemoJob02.class)
                    .withIdentity("demoJob02") // 名字为 demoJob02
                    .storeDurably() // 没有 Trigger 关联的时候任务是否被保留。因为创建 JobDetail 时,还没 Trigger 指向它,所以需要设置为 true ,表示保留。
                    .build();
        }

        @Bean
        public Trigger demoJob02Trigger() {
            // 简单的调度计划的构造器
            CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule("0/10 * * * * ? *");
            // Trigger 构造器
            return TriggerBuilder.newTrigger()
                    .forJob(demoJob02()) // 对应 Job 为 demoJob02
                    .withIdentity("demoJob02Trigger") // 名字为 demoJob02Trigger
                    .withSchedule(scheduleBuilder) // 对应 Schedule 为 scheduleBuilder
                    .build();
        }

    }

}

在 Quartz 调度器启动的时候,会根据该配置,自动调用如下方法:

  • Scheduler#addJob(JobDetail jobDetail, boolean replace) 方法,将 JobDetail 持久化到数据库。
  • Scheduler#scheduleJob(Trigger trigger) 方法,将 Trigger 持久化到数据库。

4.6.2 Scheduler 手动设置

一般情况下,艿艿推荐使用 Scheduler 手动设置。

创建 QuartzSchedulerTest 类,创建分别添加 DemoJob01 和 DemoJob02 的 Quartz 定时任务配置。代码如下:

// QuartzSchedulerTest.java

@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class QuartzSchedulerTest {

    @Autowired
    private Scheduler scheduler;

    @Test
    public void addDemoJob01Config() throws SchedulerException {
        // 创建 JobDetail
        JobDetail jobDetail = JobBuilder.newJob(DemoJob01.class)
                .withIdentity("demoJob01") // 名字为 demoJob01
                .storeDurably() // 没有 Trigger 关联的时候任务是否被保留。因为创建 JobDetail 时,还没 Trigger 指向它,所以需要设置为 true ,表示保留。
                .build();
        // 创建 Trigger
        SimpleScheduleBuilder scheduleBuilder = SimpleScheduleBuilder.simpleSchedule()
                .withIntervalInSeconds(5) // 频率。
                .repeatForever(); // 次数。
        Trigger trigger = TriggerBuilder.newTrigger()
                .forJob(jobDetail) // 对应 Job 为 demoJob01
                .withIdentity("demoJob01Trigger") // 名字为 demoJob01Trigger
                .withSchedule(scheduleBuilder) // 对应 Schedule 为 scheduleBuilder
                .build();
        // 添加调度任务
        scheduler.scheduleJob(jobDetail, trigger);
//        scheduler.scheduleJob(jobDetail, Sets.newSet(trigger), true);
    }

    @Test
    public void addDemoJob02Config() throws SchedulerException {
        // 创建 JobDetail
        JobDetail jobDetail = JobBuilder.newJob(DemoJob02.class)
                .withIdentity("demoJob02") // 名字为 demoJob02
                .storeDurably() // 没有 Trigger 关联的时候任务是否被保留。因为创建 JobDetail 时,还没 Trigger 指向它,所以需要设置为 true ,表示保留。
                .build();
        // 创建 Trigger
        CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule("0/10 * * * * ? *");
        Trigger trigger = TriggerBuilder.newTrigger()
                .forJob(jobDetail) // 对应 Job 为 demoJob01
                .withIdentity("demoJob02Trigger") // 名字为 demoJob01Trigger
                .withSchedule(scheduleBuilder) // 对应 Schedule 为 scheduleBuilder
                .build();
        // 添加调度任务
        scheduler.scheduleJob(jobDetail, trigger);
//        scheduler.scheduleJob(jobDetail, Sets.newSet(trigger), true);
    }

}
  • 创建 JobDetail 和 Trigger 的代码,其实和 「4.6.1 Bean 自动设置」 是一致的。
  • 在每个单元测试方法的最后,调用 Scheduler#scheduleJob(JobDetail jobDetail, Trigger trigger) 方法,将 JobDetail 和 Trigger 持久化到数据库。
  • 如果想要覆盖数据库中的 Quartz 定时任务的配置,可以调用 Scheduler#scheduleJob(JobDetail jobDetail, Set<? extends Trigger> triggersForJob, boolean replace) 方法,传入 replace = true 进行覆盖配置。

4.7 Application

创建 Application.java 类,配置 @SpringBootApplication 注解即可。代码如下:

// Application.java

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}
  • 运行 Application 类,启动示例项目。具体的执行日志,和 「3.4 Application」 基本一致,艿艿这里就不重复罗列了。

如果胖友想要测试集群下的运行情况,可以再创建 创建 Application02.java 类,配置 @SpringBootApplication 注解即可。代码如下:

// Application02.java

@SpringBootApplication
public class Application02 {

    public static void main(String[] args) {
        // 设置 Tomcat 随机端口
        System.setProperty("server.port", "0");

        // 启动 Spring Boot 应用
        SpringApplication.run(Application.class, args);
    }

}
  • 运行 Application02 类,再次启动一个示例项目。然后,观察输出的日志,可以看到启动的两个示例项目,都会有 DemoJob01 和 DemoJob02 的执行日志。

5. 快速入门 XXL-JOB

示例代码对应仓库:lab-28-task-xxl-job

虽然说,Quartz 的功能,已经能够满足我们对定时任务的诉求,但是距离生产可用、好用,还是有一定的距离。在艿艿最早开始实习的时候,因为Quartz 只提供了任务调度的功能,不提供管理任务的管理与监控控制台,需要自己去做二次封装。当时,因为社区中找不到合适的实现这块功能的开源项目,所以我们就自己进行了简单的封装,满足我们的管理与监控的需求。

不过现在呢,开源社区中已经有了很多优秀的调度任务中间件。其中,比较有代表性的就是 XXL-JOB 。其对自己的定义如下:

XXL-JOB 是一个轻量级分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。

对于 XXL-JOB 的入门,艿艿已经在 《芋道 XXL-JOB 极简入门》 中编写,胖友先跳转到该文章阅读。重点是,要先搭建一个 XXL-JOB 调度中心。😈 因为,本文我们是来在 Spring Boot 项目中,实现一个 XXL-JOB 执行器。

5.1 引入依赖

pom.xml 文件中,引入相关依赖。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>lab-28-task-xxl-job</artifactId>

    <dependencies>
        <!-- 实现对 Spring MVC 的自动化配置 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- XXL-JOB 相关依赖 -->
        <dependency>
            <groupId>com.xuxueli</groupId>
            <artifactId>xxl-job-core</artifactId>
            <version>2.1.1</version>
        </dependency>
    </dependencies>

</project>

具体每个依赖的作用,胖友自己认真看下艿艿添加的所有注释噢。比较可惜的是,目前 XXL-JOB 官方并未提供 Spring Boot Starter 包,略微有点尴尬。不过,社区已经有人在提交 Pull Request 了,详细可见 https://github.com/xuxueli/xx...

5.2 应用配置文件

application.yml 中,添加 Quartz 的配置,如下:

server:
  port: 9090 #指定一个端口,避免和 XXL-JOB 调度中心的端口冲突。仅仅测试之用

# xxl-job
xxl:
  job:
    admin:
      addresses: http://127.0.0.1:8080/xxl-job-admin # 调度中心部署跟地址 [选填]:如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调";为空则关闭自动注册;
    executor:
      appname: lab-28-executor # 执行器 AppName [选填]:执行器心跳注册分组依据;为空则关闭自动注册
      ip: # 执行器IP [选填]:默认为空表示自动获取IP,多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯实用;地址信息用于 "执行器注册" 和 "调度中心请求并触发任务";
      port: 6666 # ### 执行器端口号 [选填]:小于等于0则自动获取;默认端口为9999,单机部署多个执行器时,注意要配置不同执行器端口;
      logpath: /Users/yunai/logs/xxl-job/lab-28-executor # 执行器运行日志文件存储磁盘路径 [选填] :需要对该路径拥有读写权限;为空则使用默认路径;
      logretentiondays: 30 # 执行器日志文件保存天数 [选填] : 过期日志自动清理, 限制值大于等于3时生效; 否则, 如-1, 关闭自动清理功能;
    accessToken: yudaoyuanma # 执行器通讯TOKEN [选填]:非空时启用;
  • 具体每个参数的作用,胖友自己看下详细的注释哈。

5.3 XxlJobConfiguration

cn.iocoder.springboot.lab28.task.config 包路径下,创建 DataSourceConfiguration 类,配置 XXL-JOB 执行器。代码如下:

// XxlJobConfiguration.java

@Configuration
public class XxlJobConfiguration {

    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;
    @Value("${xxl.job.executor.appname}")
    private String appName;
    @Value("${xxl.job.executor.ip}")
    private String ip;
    @Value("${xxl.job.executor.port}")
    private int port;
    @Value("${xxl.job.accessToken}")
    private String accessToken;
    @Value("${xxl.job.executor.logpath}")
    private String logPath;
    @Value("${xxl.job.executor.logretentiondays}")
    private int logRetentionDays;

    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        // 创建 XxlJobSpringExecutor 执行器
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppName(appName);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
        // 返回
        return xxlJobSpringExecutor;
    }

}
  • #xxlJobExecutor() 方法,创建了 Spring 容器下的 XXL-JOB 执行器 Bean 对象。要注意,方法上添加了的 @Bean 注解,配置了启动和销毁方法。

5.4 DemoJob

cn.iocoder.springboot.lab28.task.job 包路径下,创建 DemoJob 类,示例定时任务类。代码如下:

// DemoJob.java

@Component
@JobHandler("demoJob")
public class DemoJob extends IJobHandler {

    private Logger logger = LoggerFactory.getLogger(getClass());

    private final AtomicInteger counts = new AtomicInteger();

    @Override
    public ReturnT<String> execute(String param) throws Exception {
        // 打印日志
        logger.info("[execute][定时第 ({}) 次执行]", counts.incrementAndGet());
        // 返回执行成功
        return ReturnT.SUCCESS;
    }

}
  • 继承 XXL-JOB IJobHandler 抽象类,通过实现 #execute(String param) 方法,从而实现定时任务的逻辑。
  • 在方法上,添加 @JobHandler 注解,设置 JobHandler 的名字。后续,我们在调度中心的控制台中,新增任务时,需要使用到这个名字。

#execute(String param) 方法的返回结果,为 ReturnT 类型。当返回值符合 “ReturnT.code == ReturnT.SUCCESS_CODE” 时表示任务执行成功,否则表示任务执行失败,而且可以通过 “ReturnT.msg” 回调错误信息给调度中心;从而,在任务逻辑中可以方便的控制任务执行结果。

#execute(String param) 方法的方法参数,为调度中心的控制台中,新增任务时,配置的“任务参数”。一般情况下,不会使用到。

5.5 Application

创建 Application.java 类,配置 @SpringBootApplication 注解即可。代码如下:

// Application.java

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}

运行 Application 类,启动示例项目。输出日志精简如下:

# XXL-JOB 启动日志
2019-11-29 00:58:42.429  INFO 46957 --- [           main] c.xxl.job.core.executor.XxlJobExecutor   : >>>>>>>>>>> xxl-job register jobhandler success, name:demoJob, jobHandler:cn.iocoder.springboot.lab28.task.job.DemoJob@3af9aa66
2019-11-29 00:58:42.451  INFO 46957 --- [           main] c.x.r.r.provider.XxlRpcProviderFactory   : >>>>>>>>>>> xxl-rpc, provider factory add service success. serviceKey = com.xxl.job.core.biz.ExecutorBiz, serviceBean = class com.xxl.job.core.biz.impl.ExecutorBizImpl
2019-11-29 00:58:42.454  INFO 46957 --- [           main] c.x.r.r.provider.XxlRpcProviderFactory   : >>>>>>>>>>> xxl-rpc, provider factory add service success. serviceKey = com.xxl.job.core.biz.ExecutorBiz, serviceBean = class com.xxl.job.core.biz.impl.ExecutorBizImpl
2019-11-29 00:58:42.565  INFO 46957 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
2019-11-29 00:58:42.629  INFO 46957 --- [       Thread-7] com.xxl.rpc.remoting.net.Server          : >>>>>>>>>>> xxl-rpc remoting server start success, nettype = com.xxl.rpc.remoting.net.impl.netty_http.server.NettyHttpServer, port = 6666

此时,因为我们并未在 XXL-JOB 调度中心进行相关的配置,所以 DemoJob 并不会执行。下面,让我们在 XXL-JOB 调度中心进行相应的配置。

5.6 新增执行器

浏览器打开 http://127.0.0.1:8080/xxl-job... 地址,即「执行器管理」菜单。如下图:执行器管理

点击「新增执行器」按钮,弹出「新增执行器」界面。如下图:新增执行器

填写完 "lab-28-executor" 执行器的信息,点击「保存」按钮,进行保存。耐心等待一会,执行器会自动注册上来。如下图:执行器管理

  • 执行器列表中显示在线的执行器列表, 可通过 "OnLine 机器" 查看对应执行器的集群机器。

相同执行器,有且仅需配置一次即可。

5.7 新建任务

浏览器打开 http://127.0.0.1:8080/xxl-job... 地址,即「任务管理」菜单。如下图:任务管理

点击最右边的「新增」按钮,弹出「新增」界面。如下图:新增

填写完 "demoJob" 任务的信息,点击「保存」按钮,进行保存。如下图:任务管理

点击 "demoJob" 任务的「操作」按钮,选择「启动」,确认后,该 "demoJob" 任务的状态就变成了 RUNNING 。如下图:任务管理

此时,我们打开执行器的 IDE 界面,可以看到 DemoJob 已经在每分钟执行一次了。日志如下:

2019-11-29 01:30:00.161  INFO 48374 --- [      Thread-18] c.i.springboot.lab28.task.job.DemoJob    : [execute][定时第 (1) 次执行]
2019-11-29 01:31:00.012  INFO 48374 --- [      Thread-18] c.i.springboot.lab28.task.job.DemoJob    : [execute][定时第 (2) 次执行]
2019-11-29 01:32:00.009  INFO 48374 --- [      Thread-18] c.i.springboot.lab28.task.job.DemoJob    : [execute][定时第 (3) 次执行]
2019-11-29 01:33:00.010  INFO 48374 --- [      Thread-18] c.i.springboot.lab28.task.job.DemoJob    : [execute][定时第 (4) 次执行]
2019-11-29 01:34:00.005  INFO 48374 --- [      Thread-18] c.i.springboot.lab28.task.job.DemoJob    : [execute][定时第 (5) 次执行]

并且,我们在调度中心的界面上,点击 "demoJob" 任务的「操作」按钮,选择「查询日志」,可以看到相应的调度日志。如下图:查询日志

至此,我们已经完成了 XXL-JOB 执行器的入门。

6. 快速入门 Elastic-Job

可能很多胖友不了解 Elastic-Job 这个中间件。我们看一段其官方文档的介绍:

Elastic-Job 是一个分布式调度解决方案,由两个相互独立的子项目 Elastic-Job-Lite 和 Elastic-Job-Cloud 组成。

Elastic-Job-Lite 定位为轻量级无中心化解决方案,使用 jar 包的形式提供分布式任务的协调服务。

Elastic-Job 基本是国内开源最好的调度任务中间件的几个中间件,可能没有之一,嘿嘿。目前处于有点“断更”的状态,具体可见 https://github.com/elasticjob...

所以关于这块的示例,艿艿暂时先不提供。如果对 Elastic-Job 源码感兴趣的胖友,可以看看艿艿写的如下两个系列:

666. 彩蛋

① 如何选择?

可能胖友希望了解下不同调度中间件的对比。表格如下:

特性quartzelastic-job-litexxl-jobLTS
依赖MySQL、jdkjdk、zookeepermysql、jdkjdk、zookeeper、maven
高可用多节点部署,通过竞争数据库锁来保证只有一个节点执行任务通过zookeeper的注册与发现,可以动态的添加服务器基于竞争数据库锁保证只有一个节点执行任务,支持水平扩容。可以手动增加定时任务,启动和暂停任务,有监控集群部署,可以动态的添加服务器。可以手动增加定时任务,启动和暂停任务。有监控
任务分片×
管理界面×
难易程度简单简单简单略复杂
高级功能-弹性扩容,多种作业模式,失效转移,运行状态收集,多线程处理数据,幂等性,容错处理,spring命名空间支持弹性扩容,分片广播,故障转移,Rolling实时日志,GLUE(支持在线编辑代码,免发布),任务进度监控,任务依赖,数据加密,邮件报警,运行报表,国际化支持spring,spring boot,业务日志记录器,SPI扩展支持,故障转移,节点监控,多样化任务执行结果支持,FailStore容错,动态扩容。
版本更新半年没更新2年没更新最近有更新1年没更新

也推荐看看如下文章:

目前的状况,如果真的不知道怎么选择,可以先尝试下 XXL-JOB

② 中心化 V.S 去中心化

下面,让我们一起来简单聊聊分布式调度中间件的实现方式的分类。一个分布式的调度中间件,会存在两种角色:

  • 调度器:负责调度任务,下发给执行器。
  • 执行器:负责接收任务,执行具体任务。

那么,如果从调度系统的角度来看,可以分成两类:

  • 中心化: 调度中心和执行器分离,调度中心统一调度,通知某个执行器处理任务。
  • 去中心化:调度中心和执行器一体化,自己调度自己执行处理任务。

如此可知 XXL-Job 属于中心化的任务调度平台。目前采用这种方案的还有:

  • 链家的 kob
  • 美团的 Crane(暂未开源)

去中心化的任务调度平台,目前有:

艿艿:如果胖友想要更加的理解,可以看看艿艿朋友写的 《中心化 V.S 去中心化调度设计》

③ 任务竞争 V.S 任务预分配

那么,如果从任务分配的角度来看,可以分成两类:

  • 任务竞争:调度器会通过竞争任务,下发任务给执行器。
  • 任务预分配:调度器预先分配任务给不同的执行器,无需进行竞争。

如此可知 XXL-Job 属于任务竞争的任务调度平台。目前采用这种方案的还有:

  • 链家的 kob
  • 美团的 Crane(暂未开源)
  • Quartz 基于数据库的集群方案

任务预分配的任务调度平台,目前有:

一般来说,基于任务预分配的任务调度平台,都会选择使用 Zookeeper 来协调分配任务到不同的节点上。同时,任务调度平台必须是去中心化的方案,每个节点即是调度器又是执行器。这样,任务在预分配在每个节点之后,后续就自己调度给自己执行。

相比较而言,随着节点越来越多,基于任务竞争的方案会因为任务竞争,导致存在性能下滑的问题。而基于任务预分配的方案,则不会存在这个问题。并且,基于任务预分配的方案,性能会优于基于任务竞争的方案。

这里在推荐一篇 Elastic Job 开发者张亮的文章 《详解当当网的分布式作业框架 elastic-job》 ,灰常给力!

④ Quartz 是个优秀的调度内核

绝大多数情况下,我们不会直接使用 Quartz 作为我们的调度中间件的选择。但是,基本所有的分布式调度中间件,都将 Quartz 作为调度内核,因为 Quartz 在单纯任务调度本身提供了很强的功能。

不过呢,随着一个分布式调度中间件的逐步完善,又会逐步考虑抛弃 Quartz 作为调度内核,转而自研。例如说 XXL-JOB 在 2.1.0 RELEASE 的版本中,就已经更换成自研的调度模块。其替换的理由如下:

XXL-JOB 最终选择自研调度组件(早期调度组件基于 Quartz);

  • 一方面,是为了精简系统降低冗余依赖。
  • 另一方面,是为了提供系统的可控度与稳定性。

在 Elastic-Job 3.X 的开发计划中,也有一项计划,就是自研调度组件,替换掉 Quartz 。

推荐阅读

查看原文

赞 22 收藏 16 评论 5

芋道源码 关注了用户 · 2020-05-18

小傅哥 @fuzhengwei

CodeGuide | 程序员编码指南 - 原创文章、案例源码、资料书籍、简历模版等下载。
链接:https://github.com/fuzhengwei... - 希望给我点个Star⭐!

作者小傅哥多年从事一线互联网 Java 开发的学习历程技术汇总,旨在为大家提供一个清晰详细的学习教程,侧重点更倾向编写Java核心内容。如果我的文章能为您提供帮助,请给予支持(关注、点赞、分享)!

关注 1232

芋道源码 分享了头条 · 2019-09-13

在单机版的Springboot+Shiro的基础上,这次实现共享Session。

赞 0 收藏 0 评论 0

芋道源码 分享了头条 · 2019-09-13

Optional 使用Lambda表达式 针对Lambda表达式设计 Stream 最后

赞 0 收藏 0 评论 0

芋道源码 分享了头条 · 2019-09-13

1、先来个自我介绍2、人到中年的焦虑3、萌生回到厦门的念头4、面试过程:笔试5、面试过程:HR面6、面试过程:技术面7、总结

赞 0 收藏 0 评论 1

芋道源码 分享了头条 · 2019-09-13

Tomcat性能调优JVM性能调优一、内存调优二、垃圾回收策略调优

赞 0 收藏 0 评论 0

认证与成就

  • 获得 61 次点赞
  • 获得 6 枚徽章 获得 0 枚金徽章, 获得 2 枚银徽章, 获得 4 枚铜徽章

擅长技能
编辑

开源项目 & 著作
编辑

(゚∀゚ )
暂时没有

注册于 2017-10-22
个人主页被 1.6k 人浏览