kute

kute 查看完整档案

填写现居城市  |  填写毕业院校  |  填写所在公司/组织填写个人主网站
编辑
_ | |__ _ _ __ _ | '_ \| | | |/ _` | | |_) | |_| | (_| | |_.__/ \__,_|\__, | |___/ 个人简介什么都没有

个人动态

kute 收藏了文章 · 2月19日

Redis 多线程网络模型全面揭秘

博客原文

Redis 多线程网络模型全面揭秘

导言

在目前的技术选型中,Redis 俨然已经成为了系统高性能缓存方案的事实标准,因此现在 Redis 也成为了后端开发的基本技能树之一,Redis 的底层原理也顺理成章地成为了必须学习的知识。

Redis 从本质上来讲是一个网络服务器,而对于一个网络服务器来说,网络模型是它的精华,搞懂了一个网络服务器的网络模型,你也就搞懂了它的本质。

本文通过层层递进的方式,介绍了 Redis 网络模型的版本变更历程,剖析了其从单线程进化到多线程的工作原理,此外,还一并分析并解答了 Redis 的网络模型的很多抉择背后的思考,帮助读者能更深刻地理解 Redis 网络模型的设计。

Redis 有多快?

根据官方的 benchmark,通常来说,在一台普通硬件配置的 Linux 机器上跑单个 Redis 实例,处理简单命令(时间复杂度 O(N) 或者 O(log(N))),QPS 可以达到 8w+,而如果使用 pipeline 批处理功能,则 QPS 至高能达到 100w。

仅从性能层面进行评判,Redis 完全可以被称之为高性能缓存方案。

Redis 为什么快?

Redis 的高性能得益于以下几个基础:

  • C 语言实现,虽然 C 对 Redis 的性能有助力,但语言并不是最核心因素。
  • 纯内存 I/O,相较于其他基于磁盘的 DB,Redis 的纯内存操作有着天然的性能优势。
  • I/O 多路复用,基于 epoll/select/kqueue 等 I/O 多路复用技术,实现高吞吐的网络 I/O。
  • 单线程模型,单线程无法利用多核,但是从另一个层面来说则避免了多线程频繁上下文切换,以及同步机制如锁带来的开销。

Redis 为何选择单线程?

Redis 的核心网络模型选择用单线程来实现,这在一开始就引起了很多人的不解,Redis 官方的对于此的回答是:

It's not very frequent that CPU becomes your bottleneck with Redis, as usually Redis is either memory or network bound. For instance, using pipelining Redis running on an average Linux system can deliver even 1 million requests per second, so if your application mainly uses O(N) or O(log(N)) commands, it is hardly going to use too much CPU.

核心意思就是,对于一个 DB 来说,CPU 通常不会是瓶颈,因为大多数请求不会是 CPU 密集型的,而是 I/O 密集型。具体到 Redis 的话,如果不考虑 RDB/AOF 等持久化方案,Redis 是完全的纯内存操作,执行速度是非常快的,因此这部分操作通常不会是性能瓶颈,Redis 真正的性能瓶颈在于网络 I/O,也就是客户端和服务端之间的网络传输延迟,因此 Redis 选择了单线程的 I/O 多路复用来实现它的核心网络模型。

上面是比较笼统的官方答案,实际上更加具体的选择单线程的原因可以归纳如下:

避免过多的上下文切换开销

多线程调度过程中必然需要在 CPU 之间切换线程上下文 context,而上下文的切换又涉及程序计数器、堆栈指针和程序状态字等一系列的寄存器置换、程序堆栈重置甚至是 CPU 高速缓存、TLB 快表的汰换,如果是进程内的多线程切换还好一些,因为单一进程内多线程共享进程地址空间,因此线程上下文比之进程上下文要小得多,如果是跨进程调度,则需要切换掉整个进程地址空间。

如果是单线程则可以规避进程内频繁的线程切换开销,因为程序始终运行在进程中单个线程内,没有多线程切换的场景。

避免同步机制的开销

如果 Redis 选择多线程模型,又因为 Redis 是一个数据库,那么势必涉及到底层数据同步的问题,则必然会引入某些同步机制,比如锁,而我们知道 Redis 不仅仅提供了简单的 key-value 数据结构,还有 list、set 和 hash 等等其他丰富的数据结构,而不同的数据结构对同步访问的加锁粒度又不尽相同,可能会导致在操作数据过程中带来很多加锁解锁的开销,增加程序复杂度的同时还会降低性能。

简单可维护

Redis 的作者 Salvatore Sanfilippo (别称 antirez) 对 Redis 的设计和代码有着近乎偏执的简洁性理念,你可以在阅读 Redis 的源码或者给 Redis 提交 PR 的之时感受到这份偏执。因此代码的简单可维护性必然是 Redis 早期的核心准则之一,而引入多线程必然会导致代码的复杂度上升和可维护性下降。

事实上,多线程编程也不是那么尽善尽美,首先多线程的引入会使得程序不再保持代码逻辑上的串行性,代码执行的顺序将变成不可预测的,稍不注意就会导致程序出现各种并发编程的问题;其次,多线程模式也使得程序调试更加复杂和麻烦。网络上有一幅很有意思的图片,生动形象地描述了并发编程面临的窘境。

你期望的多线程编程 VS 实际上的多线程编程:

你期望的多线程VS实际上的多线程

前面我们提到引入多线程必须的同步机制,如果 Redis 使用多线程模式,那么所有的底层数据结构都必须实现成线程安全的,这无疑又使得 Redis 的实现变得更加复杂。

总而言之,Redis 选择单线程可以说是多方博弈之后的一种权衡:在保证足够的性能表现之下,使用单线程保持代码的简单和可维护性。

Redis 真的是单线程?

在讨论这个问题之前,我们要先明确『单线程』这个概念的边界:它的覆盖范围是核心网络模型,抑或是整个 Redis?如果是前者,那么答案是肯定的,在 Redis 的 v6.0 版本正式引入多线程之前,其网络模型一直是单线程模式的;如果是后者,那么答案则是否定的,Redis 早在 v4.0 就已经引入了多线程。

因此,当我们讨论 Redis 的多线程之时,有必要对 Redis 的版本划出两个重要的节点:

  1. Redis v4.0(引入多线程处理异步任务)
  2. Redis v6.0(正式在网络模型中实现 I/O 多线程)

单线程事件循环

我们首先来剖析一下 Redis 的核心网络模型,从 Redis 的 v1.0 到 v6.0 版本之前,Redis 的核心网络模型一直是一个典型的单 Reactor 模型:利用 epoll/select/kqueue 等多路复用技术,在单线程的事件循环中不断去处理事件(客户端请求),最后回写响应数据到客户端:

这里有几个核心的概念需要学习:

  • client:客户端对象,Redis 是典型的 CS 架构(Client <---> Server),客户端通过 socket 与服务端建立网络通道然后发送请求命令,服务端执行请求的命令并回复。Redis 使用结构体 client 存储客户端的所有相关信息,包括但不限于封装的套接字连接 -- *conn当前选择的数据库指针 -- *db读入缓冲区 -- querybuf写出缓冲区 -- buf写出数据链表 -- reply等。
  • aeApiPoll:I/O 多路复用 API,是基于 epoll_wait/select/kevent 等系统调用的封装,监听等待读写事件触发,然后处理,它是事件循环(Event Loop)中的核心函数,是事件驱动得以运行的基础。
  • acceptTcpHandler:连接应答处理器,底层使用系统调用 accept 接受来自客户端的新连接,并为新连接注册绑定命令读取处理器,以备后续处理新的客户端 TCP 连接;除了这个处理器,还有对应的 acceptUnixHandler 负责处理 Unix Domain Socket 以及 acceptTLSHandler 负责处理 TLS 加密连接。
  • readQueryFromClient:命令读取处理器,解析并执行客户端的请求命令。
  • beforeSleep:事件循环中进入 aeApiPoll 等待事件到来之前会执行的函数,其中包含一些日常的任务,比如把 client->buf 或者 client->reply (后面会解释为什么这里需要两个缓冲区)中的响应写回到客户端,持久化 AOF 缓冲区的数据到磁盘等,相对应的还有一个 afterSleep 函数,在 aeApiPoll 之后执行。
  • sendReplyToClient:命令回复处理器,当一次事件循环之后写出缓冲区中还有数据残留,则这个处理器会被注册绑定到相应的连接上,等连接触发写就绪事件时,它会将写出缓冲区剩余的数据回写到客户端。

Redis 内部实现了一个高性能的事件库 --- AE,基于 epoll/select/kqueue/evport 四种事件驱动技术,实现 Linux/MacOS/FreeBSD/Solaris 多平台的高性能事件循环模型。Redis 的核心网络模型正式构筑在 AE 之上,包括 I/O 多路复用、各类处理器的注册绑定,都是基于此才得以运行。

至此,我们可以描绘出客户端向 Redis 发起请求命令的工作原理:

  1. Redis 服务器启动,开启主线程事件循环(Event Loop),注册 acceptTcpHandler 连接应答处理器到用户配置的监听端口对应的文件描述符,等待新连接到来;
  2. 客户端和服务端建立网络连接;
  3. acceptTcpHandler 被调用,主线程使用 AE 的 API 将 readQueryFromClient 命令读取处理器绑定到新连接对应的文件描述符上,并初始化一个 client 绑定这个客户端连接;
  4. 客户端发送请求命令,触发读就绪事件,主线程调用 readQueryFromClient 通过 socket 读取客户端发送过来的命令存入 client->querybuf 读入缓冲区;
  5. 接着调用 processInputBuffer,在其中使用 processInlineBuffer 或者 processMultibulkBuffer 根据 Redis 协议解析命令,最后调用 processCommand 执行命令;
  6. 根据请求命令的类型(SET, GET, DEL, EXEC 等),分配相应的命令执行器去执行,最后调用 addReply 函数族的一系列函数将响应数据写入到对应 client 的写出缓冲区:client->buf 或者 client->replyclient->buf 是首选的写出缓冲区,固定大小 16KB,一般来说可以缓冲足够多的响应数据,但是如果客户端在时间窗口内需要响应的数据非常大,那么则会自动切换到 client->reply 链表上去,使用链表理论上能够保存无限大的数据(受限于机器的物理内存),最后把 client 添加进一个 LIFO 队列 clients_pending_write
  7. 在事件循环(Event Loop)中,主线程执行 beforeSleep --> handleClientsWithPendingWrites,遍历 clients_pending_write 队列,调用 writeToClientclient 的写出缓冲区里的数据回写到客户端,如果写出缓冲区还有数据遗留,则注册 sendReplyToClient 命令回复处理器到该连接的写就绪事件,等待客户端可写时在事件循环中再继续回写残余的响应数据。

对于那些想利用多核优势提升性能的用户来说,Redis 官方给出的解决方案也非常简单粗暴:在同一个机器上多跑几个 Redis 实例。事实上,为了保证高可用,线上业务一般不太可能会是单机模式,更加常见的是利用 Redis 分布式集群多节点和数据分片负载均衡来提升性能和保证高可用。

多线程异步任务

以上便是 Redis 的核心网络模型,这个单线程网络模型一直到 Redis v6.0 才改造成多线程模式,但这并不意味着整个 Redis 一直都只是单线程。

Redis 在 v4.0 版本的时候就已经引入了的多线程来做一些异步操作,此举主要针对的是那些非常耗时的命令,通过将这些命令的执行进行异步化,避免阻塞单线程的事件循环。

我们知道 Redis 的 DEL 命令是用来删除掉一个或多个 key 储存的值,它是一个阻塞的命令,大多数情况下你要删除的 key 里存的值不会特别多,最多也就几十上百个对象,所以可以很快执行完,但是如果你要删的是一个超大的键值对,里面有几百万个对象,那么这条命令可能会阻塞至少好几秒,又因为事件循环是单线程的,所以会阻塞后面的其他事件,导致吞吐量下降。

Redis 的作者 antirez 为了解决这个问题进行了很多思考,一开始他想的办法是一种渐进式的方案:利用定时器和数据游标,每次只删除一小部分的数据,比如 1000 个对象,最终清除掉所有的数据,但是这种方案有个致命的缺陷,如果同时还有其他客户端往某个正在被渐进式删除的 key 里继续写入数据,而且删除的速度跟不上写入的数据,那么将会无止境地消耗内存,虽然后来通过一个巧妙的办法解决了,但是这种实现使 Redis 变得更加复杂,而多线程看起来似乎是一个水到渠成的解决方案:简单、易理解。于是,最终 antirez 选择引入多线程来实现这一类非阻塞的命令。更多 antirez 在这方面的思考可以阅读一下他发表的博客:Lazy Redis is better Redis

于是,在 Redis v4.0 之后增加了一些的非阻塞命令如 UNLINKFLUSHALL ASYNCFLUSHDB ASYNC

UNLINK 命令其实就是 DEL 的异步版本,它不会同步删除数据,而只是把 key 从 keyspace 中暂时移除掉,然后将任务添加到一个异步队列,最后由后台线程去删除,不过这里需要考虑一种情况是如果用 UNLINK 去删除一个很小的 key,用异步的方式去做反而开销更大,所以它会先计算一个开销的阀值,只有当这个值大于 64 才会使用异步的方式去删除 key,对于基本的数据类型如 List、Set、Hash 这些,阀值就是其中存储的对象数量。

Redis 多线程网络模型

前面提到 Redis 最初选择单线程网络模型的理由是:CPU 通常不会成为性能瓶颈,瓶颈往往是内存网络,因此单线程足够了。那么为什么现在 Redis 又要引入多线程呢?很简单,就是 Redis 的网络 I/O 瓶颈已经越来越明显了。

随着互联网的飞速发展,互联网业务系统所要处理的线上流量越来越大,Redis 的单线程模式会导致系统消耗很多 CPU 时间在网络 I/O 上从而降低吞吐量,要提升 Redis 的性能有两个方向:

  • 优化网络 I/O 模块
  • 提高机器内存读写的速度

后者依赖于硬件的发展,暂时无解。所以只能从前者下手,网络 I/O 的优化又可以分为两个方向:

  • 零拷贝技术或者 DPDK 技术
  • 利用多核优势

零拷贝技术有其局限性,无法完全适配 Redis 这一类复杂的网络 I/O 场景,更多网络 I/O 对 CPU 时间的消耗和 Linux 零拷贝技术,可以阅读我的另一篇文章:Linux I/O 原理和 Zero-copy 技术全面揭秘。而 DPDK 技术通过旁路网卡 I/O 绕过内核协议栈的方式又太过于复杂以及需要内核甚至是硬件的支持。

因此,利用多核优势成为了优化网络 I/O 性价比最高的方案。

6.0 版本之后,Redis 正式在核心网络模型中引入了多线程,也就是所谓的 I/O threading,至此 Redis 真正拥有了多线程模型。前一小节,我们了解了 Redis 在 6.0 版本之前的单线程事件循环模型,实际上就是一个非常经典的 Reactor 模型:

目前 Linux 平台上主流的高性能网络库/框架中,大都采用 Reactor 模式,比如 netty、libevent、libuv、POE(Perl)、Twisted(Python)等。

Reactor 模式本质上指的是使用 I/O 多路复用(I/O multiplexing) + 非阻塞 I/O(non-blocking I/O) 的模式。

更多关于 Reactor 模式的细节可以参考我之前的文章:Go netpoller 原生网络模型之源码全面揭秘,Reactor 网络模型那一小节,这里不再赘述。

Redis 的核心网络模型在 6.0 版本之前,一直是单 Reactor 模式:所有事件的处理都在单个线程内完成,虽然在 4.0 版本中引入了多线程,但是那个更像是针对特定场景(删除超大 key 值等)而打的补丁,并不能被视作核心网络模型的多线程。

通常来说,单 Reactor 模式,引入多线程之后会进化为 Multi-Reactors 模式,基本工作模式如下:

区别于单 Reactor 模式,这种模式不再是单线程的事件循环,而是有多个线程(Sub Reactors)各自维护一个独立的事件循环,由 Main Reactor 负责接收新连接并分发给 Sub Reactors 去独立处理,最后 Sub Reactors 回写响应给客户端。

Multiple Reactors 模式通常也可以等同于 Master-Workers 模式,比如 Nginx 和 Memcached 等就是采用这种多线程模型,虽然不同的项目实现细节略有区别,但总体来说模式是一致的。

设计思路

Redis 虽然也实现了多线程,但是却不是标准的 Multi-Reactors/Master-Workers 模式,这其中的缘由我们后面会分析,现在我们先看一下 Redis 多线程网络模型的总体设计:

  1. Redis 服务器启动,开启主线程事件循环(Event Loop),注册 acceptTcpHandler 连接应答处理器到用户配置的监听端口对应的文件描述符,等待新连接到来;
  2. 客户端和服务端建立网络连接;
  3. acceptTcpHandler 被调用,主线程使用 AE 的 API 将 readQueryFromClient 命令读取处理器绑定到新连接对应的文件描述符上,并初始化一个 client 绑定这个客户端连接;
  4. 客户端发送请求命令,触发读就绪事件,服务端主线程不会通过 socket 去读取客户端的请求命令,而是先将 client 放入一个 LIFO 队列 clients_pending_read
  5. 在事件循环(Event Loop)中,主线程执行 beforeSleep -->handleClientsWithPendingReadsUsingThreads,利用 Round-Robin 轮询负载均衡策略,把 clients_pending_read队列中的连接均匀地分配给 I/O 线程各自的本地 FIFO 任务队列 io_threads_list[id] 和主线程自己,I/O 线程通过 socket 读取客户端的请求命令,存入 client->querybuf 并解析第一个命令,但不执行命令,主线程忙轮询,等待所有 I/O 线程完成读取任务;
  6. 主线程和所有 I/O 线程都完成了读取任务,主线程结束忙轮询,遍历 clients_pending_read 队列,执行所有客户端连接的请求命令,先调用 processCommandAndResetClient 执行第一条已经解析好的命令,然后调用 processInputBuffer 解析并执行客户端连接的所有命令,在其中使用 processInlineBuffer 或者 processMultibulkBuffer 根据 Redis 协议解析命令,最后调用 processCommand 执行命令;
  7. 根据请求命令的类型(SET, GET, DEL, EXEC 等),分配相应的命令执行器去执行,最后调用 addReply 函数族的一系列函数将响应数据写入到对应 client 的写出缓冲区:client->buf 或者 client->replyclient->buf 是首选的写出缓冲区,固定大小 16KB,一般来说可以缓冲足够多的响应数据,但是如果客户端在时间窗口内需要响应的数据非常大,那么则会自动切换到 client->reply 链表上去,使用链表理论上能够保存无限大的数据(受限于机器的物理内存),最后把 client 添加进一个 LIFO 队列 clients_pending_write
  8. 在事件循环(Event Loop)中,主线程执行 beforeSleep --> handleClientsWithPendingWritesUsingThreads,利用 Round-Robin 轮询负载均衡策略,把 clients_pending_write 队列中的连接均匀地分配给 I/O 线程各自的本地 FIFO 任务队列 io_threads_list[id] 和主线程自己,I/O 线程通过调用 writeToClientclient 的写出缓冲区里的数据回写到客户端,主线程忙轮询,等待所有 I/O 线程完成写出任务;
  9. 主线程和所有 I/O 线程都完成了写出任务, 主线程结束忙轮询,遍历 clients_pending_write 队列,如果 client 的写出缓冲区还有数据遗留,则注册 sendReplyToClient 到该连接的写就绪事件,等待客户端可写时在事件循环中再继续回写残余的响应数据。

这里大部分逻辑和之前的单线程模型是一致的,变动的地方仅仅是把读取客户端请求命令和回写响应数据的逻辑异步化了,交给 I/O 线程去完成,这里需要特别注意的一点是:I/O 线程仅仅是读取和解析客户端命令而不会真正去执行命令,客户端命令的执行最终还是要在主线程上完成

源码剖析

以下所有代码基于目前最新的 Redis v6.0.10 版本。

多线程初始化

void initThreadedIO(void) {
    server.io_threads_active = 0; /* We start with threads not active. */

    // 如果用户只配置了一个 I/O 线程,则不会创建新线程(效率低),直接在主线程里处理 I/O。
    if (server.io_threads_num == 1) return;

    if (server.io_threads_num > IO_THREADS_MAX_NUM) {
        serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
                             "The maximum number is %d.", IO_THREADS_MAX_NUM);
        exit(1);
    }

    // 根据用户配置的 I/O 线程数,启动线程。
    for (int i = 0; i < server.io_threads_num; i++) {
        // 初始化 I/O 线程的本地任务队列。
        io_threads_list[i] = listCreate();
        if (i == 0) continue; // 线程 0 是主线程。

        // 初始化 I/O 线程并启动。
        pthread_t tid;
        // 每个 I/O 线程会分配一个本地锁,用来休眠和唤醒线程。
        pthread_mutex_init(&io_threads_mutex[i],NULL);
        // 每个 I/O 线程分配一个原子计数器,用来记录当前遗留的任务数量。
        io_threads_pending[i] = 0;
        // 主线程在启动 I/O 线程的时候会默认先锁住它,直到有 I/O 任务才唤醒它。
        pthread_mutex_lock(&io_threads_mutex[i]);
        // 启动线程,进入 I/O 线程的主逻辑函数 IOThreadMain。
        if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
            exit(1);
        }
        io_threads[i] = tid;
    }
}

initThreadedIO 会在 Redis 服务器启动时的初始化工作的末尾被调用,初始化 I/O 多线程并启动。

Redis 的多线程模式默认是关闭的,需要用户在 redis.conf 配置文件中开启:

io-threads 4
io-threads-do-reads yes

读取请求

当客户端发送请求命令之后,会触发 Redis 主线程的事件循环,命令处理器 readQueryFromClient 被回调,在以前的单线程模型下,这个方法会直接读取解析客户端命令并执行,但是多线程模式下,则会把 client 加入到 clients_pending_read 任务队列中去,后面主线程再分配到 I/O 线程去读取客户端请求命令:

void readQueryFromClient(connection *conn) {
    client *c = connGetPrivateData(conn);
    int nread, readlen;
    size_t qblen;

    // 检查是否开启了多线程,如果是则把 client 加入异步队列之后返回。
    if (postponeClientRead(c)) return;
    
    // 省略代码,下面的代码逻辑和单线程版本几乎是一样的。
    ... 
}

int postponeClientRead(client *c) {
    // 当多线程 I/O 模式开启、主线程没有在处理阻塞任务时,将 client 加入异步队列。
    if (server.io_threads_active &&
        server.io_threads_do_reads &&
        !ProcessingEventsWhileBlocked &&
        !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))
    {
        // 给 client 打上 CLIENT_PENDING_READ 标识,表示该 client 需要被多线程处理,
        // 后续在 I/O 线程中会在读取和解析完客户端命令之后判断该标识并放弃执行命令,让主线程去执行。
        c->flags |= CLIENT_PENDING_READ;
        listAddNodeHead(server.clients_pending_read,c);
        return 1;
    } else {
        return 0;
    }
}

接着主线程会在事件循环的 beforeSleep() 方法中,调用 handleClientsWithPendingReadsUsingThreads

int handleClientsWithPendingReadsUsingThreads(void) {
    if (!server.io_threads_active || !server.io_threads_do_reads) return 0;
    int processed = listLength(server.clients_pending_read);
    if (processed == 0) return 0;

    if (tio_debug) printf("%d TOTAL READ pending clients\n", processed);

    // 遍历待读取的 client 队列 clients_pending_read,
    // 通过 RR 轮询均匀地分配给 I/O 线程和主线程自己(编号 0)。
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_read,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    // 设置当前 I/O 操作为读取操作,给每个 I/O 线程的计数器设置分配的任务数量,
    // 让 I/O 线程可以开始工作:只读取和解析命令,不执行。
    io_threads_op = IO_THREADS_OP_READ;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        io_threads_pending[j] = count;
    }

    // 主线程自己也会去执行读取客户端请求命令的任务,以达到最大限度利用 CPU。
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        readQueryFromClient(c->conn);
    }
    listEmpty(io_threads_list[0]);

    // 忙轮询,累加所有 I/O 线程的原子任务计数器,直到所有计数器的遗留任务数量都是 0,
    // 表示所有任务都已经执行完成,结束轮询。
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += io_threads_pending[j];
        if (pending == 0) break;
    }
    if (tio_debug) printf("I/O READ All threads finshed\n");

    // 遍历待读取的 client 队列,清除 CLIENT_PENDING_READ 和 CLIENT_PENDING_COMMAND 标记,
    // 然后解析并执行所有 client 的命令。
    while(listLength(server.clients_pending_read)) {
        ln = listFirst(server.clients_pending_read);
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_READ;
        listDelNode(server.clients_pending_read,ln);

        if (c->flags & CLIENT_PENDING_COMMAND) {
            c->flags &= ~CLIENT_PENDING_COMMAND;
            // client 的第一条命令已经被解析好了,直接尝试执行。
            if (processCommandAndResetClient(c) == C_ERR) {
                /* If the client is no longer valid, we avoid
                 * processing the client later. So we just go
                 * to the next. */
                continue;
            }
        }
        processInputBuffer(c); // 继续解析并执行 client 命令。

        // 命令执行完成之后,如果 client 中有响应数据需要回写到客户端,则将 client 加入到待写出队列 clients_pending_write
        if (!(c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c))
            clientInstallWriteHandler(c);
    }

    /* Update processed count on server */
    server.stat_io_reads_processed += processed;

    return processed;
}

这里的核心工作是:

  • 遍历待读取的 client 队列 clients_pending_read,通过 RR 策略把所有任务分配给 I/O 线程和主线程去读取和解析客户端命令。
  • 忙轮询等待所有 I/O 线程完成任务。
  • 最后再遍历 clients_pending_read,执行所有 client 的命令。

写回响应

完成命令的读取、解析以及执行之后,客户端命令的响应数据已经存入 client->buf 或者 client->reply 中了,接下来就需要把响应数据回写到客户端了,还是在 beforeSleep 中, 主线程调用 handleClientsWithPendingWritesUsingThreads

int handleClientsWithPendingWritesUsingThreads(void) {
    int processed = listLength(server.clients_pending_write);
    if (processed == 0) return 0; /* Return ASAP if there are no clients. */

    // 如果用户设置的 I/O 线程数等于 1 或者当前 clients_pending_write 队列中待写出的 client
    // 数量不足 I/O 线程数的两倍,则不用多线程的逻辑,让所有 I/O 线程进入休眠,
    // 直接在主线程把所有 client 的相应数据回写到客户端。
    if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
        return handleClientsWithPendingWrites();
    }

    // 唤醒正在休眠的 I/O 线程(如果有的话)。
    if (!server.io_threads_active) startThreadedIO();

    if (tio_debug) printf("%d TOTAL WRITE pending clients\n", processed);

    // 遍历待写出的 client 队列 clients_pending_write,
    // 通过 RR 轮询均匀地分配给 I/O 线程和主线程自己(编号 0)。
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_write,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;

        /* Remove clients from the list of pending writes since
         * they are going to be closed ASAP. */
        if (c->flags & CLIENT_CLOSE_ASAP) {
            listDelNode(server.clients_pending_write, ln);
            continue;
        }

        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    // 设置当前 I/O 操作为写出操作,给每个 I/O 线程的计数器设置分配的任务数量,
    // 让 I/O 线程可以开始工作,把写出缓冲区(client->buf 或 c->reply)中的响应数据回写到客户端。
    io_threads_op = IO_THREADS_OP_WRITE;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        io_threads_pending[j] = count;
    }

    // 主线程自己也会去执行读取客户端请求命令的任务,以达到最大限度利用 CPU。
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        writeToClient(c,0);
    }
    listEmpty(io_threads_list[0]);

    // 忙轮询,累加所有 I/O 线程的原子任务计数器,直到所有计数器的遗留任务数量都是 0。
    // 表示所有任务都已经执行完成,结束轮询。
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += io_threads_pending[j];
        if (pending == 0) break;
    }
    if (tio_debug) printf("I/O WRITE All threads finshed\n");

    // 最后再遍历一次 clients_pending_write 队列,检查是否还有 client 的写出缓冲区中有残留数据,
    // 如果有,那就为 client 注册一个命令回复器 sendReplyToClient,等待客户端写就绪再继续把数据回写。
    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);

        // 检查 client 的写出缓冲区是否还有遗留数据。
        if (clientHasPendingReplies(c) &&
                connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
        {
            freeClientAsync(c);
        }
    }
    listEmpty(server.clients_pending_write);

    /* Update processed count on server */
    server.stat_io_writes_processed += processed;

    return processed;
}

这里的核心工作是:

  • 检查当前任务负载,如果当前的任务数量不足以用多线程模式处理的话,则休眠 I/O 线程并且直接同步将响应数据回写到客户端。
  • 唤醒正在休眠的 I/O 线程(如果有的话)。
  • 遍历待写出的 client 队列 clients_pending_write,通过 RR 策略把所有任务分配给 I/O 线程和主线程去将响应数据写回到客户端。
  • 忙轮询等待所有 I/O 线程完成任务。
  • 最后再遍历 clients_pending_write,为那些还残留有响应数据的 client 注册命令回复处理器 sendReplyToClient,等待客户端可写之后在事件循环中继续回写残余的响应数据。

I/O 线程主逻辑

void *IOThreadMain(void *myid) {
    /* The ID is the thread number (from 0 to server.iothreads_num-1), and is
     * used by the thread to just manipulate a single sub-array of clients. */
    long id = (unsigned long)myid;
    char thdname[16];

    snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);
    redis_set_thread_title(thdname);
    // 设置 I/O 线程的 CPU 亲和性,尽可能将 I/O 线程(以及主线程,不在这里设置)绑定到用户配置的
    // CPU 列表上。
    redisSetCpuAffinity(server.server_cpulist);
    makeThreadKillable();

    while(1) {
        // 忙轮询,100w 次循环,等待主线程分配 I/O 任务。
        for (int j = 0; j < 1000000; j++) {
            if (io_threads_pending[id] != 0) break;
        }

        // 如果 100w 次忙轮询之后如果还是没有任务分配给它,则通过尝试加锁进入休眠,
        // 等待主线程分配任务之后调用 startThreadedIO 解锁,唤醒 I/O 线程去执行。
        if (io_threads_pending[id] == 0) {
            pthread_mutex_lock(&io_threads_mutex[id]);
            pthread_mutex_unlock(&io_threads_mutex[id]);
            continue;
        }

        serverAssert(io_threads_pending[id] != 0);

        if (tio_debug) printf("[%ld] %d to handle\n", id, (int)listLength(io_threads_list[id]));


        // 注意:主线程分配任务给 I/O 线程之时,
        // 会把任务加入每个线程的本地任务队列 io_threads_list[id],
        // 但是当 I/O 线程开始执行任务之后,主线程就不会再去访问这些任务队列,避免数据竞争。
        listIter li;
        listNode *ln;
        listRewind(io_threads_list[id],&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            // 如果当前是写出操作,则把 client 的写出缓冲区中的数据回写到客户端。
            if (io_threads_op == IO_THREADS_OP_WRITE) {
                writeToClient(c,0);
              // 如果当前是读取操作,则socket 读取客户端的请求命令并解析第一条命令。
            } else if (io_threads_op == IO_THREADS_OP_READ) {
                readQueryFromClient(c->conn);
            } else {
                serverPanic("io_threads_op value is unknown");
            }
        }
        listEmpty(io_threads_list[id]);
        // 所有任务执行完之后把自己的计数器置 0,主线程通过累加所有 I/O 线程的计数器
        // 判断是否所有 I/O 线程都已经完成工作。
        io_threads_pending[id] = 0;

        if (tio_debug) printf("[%ld] Done\n", id);
    }
}

I/O 线程启动之后,会先进入忙轮询,判断原子计数器中的任务数量,如果是非 0 则表示主线程已经给它分配了任务,开始执行任务,否则就一直忙轮询一百万次等待,忙轮询结束之后再查看计数器,如果还是 0,则尝试加本地锁,因为主线程在启动 I/O 线程之时就已经提前锁住了所有 I/O 线程的本地锁,因此 I/O 线程会进行休眠,等待主线程唤醒。

主线程会在每次事件循环中尝试调用 startThreadedIO 唤醒 I/O 线程去执行任务,如果接收到客户端请求命令,则 I/O 线程会被唤醒开始工作,根据主线程设置的 io_threads_op 标识去执行命令读取和解析或者回写响应数据的任务,I/O 线程在收到主线程通知之后,会遍历自己的本地任务队列 io_threads_list[id],取出一个个 client 执行任务:

  • 如果当前是写出操作,则调用 writeToClient,通过 socket 把 client->buf 或者 client->reply 里的响应数据回写到客户端。
  • 如果当前是读取操作,则调用 readQueryFromClient,通过 socket 读取客户端命令,存入 client->querybuf,然后调用 processInputBuffer 去解析命令,这里最终只会解析到第一条命令,然后就结束,不会去执行命令。
  • 在全部任务执行完之后把自己的原子计数器置 0,以告知主线程自己已经完成了工作。
void processInputBuffer(client *c) {
// 省略代码
...

    while(c->qb_pos < sdslen(c->querybuf)) {
        /* Return if clients are paused. */
        if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;

        /* Immediately abort if the client is in the middle of something. */
        if (c->flags & CLIENT_BLOCKED) break;

        /* Don't process more buffers from clients that have already pending
         * commands to execute in c->argv. */
        if (c->flags & CLIENT_PENDING_COMMAND) break;
        /* Multibulk processing could see a <= 0 length. */
        if (c->argc == 0) {
            resetClient(c);
        } else {
            // 判断 client 是否具有 CLIENT_PENDING_READ 标识,如果是处于多线程 I/O 的模式下,
            // 那么此前已经在 readQueryFromClient -> postponeClientRead 中为 client 打上该标识,
            // 则立刻跳出循环结束,此时第一条命令已经解析完成,但是不执行命令。
            if (c->flags & CLIENT_PENDING_READ) {
                c->flags |= CLIENT_PENDING_COMMAND;
                break;
            }

            // 执行客户端命令
            if (processCommandAndResetClient(c) == C_ERR) {
                /* If the client is no longer valid, we avoid exiting this
                 * loop and trimming the client buffer later. So we return
                 * ASAP in that case. */
                return;
            }
        }
    }

...
}

这里需要额外关注 I/O 线程初次启动时会设置当前线程的 CPU 亲和性,也就是绑定当前线程到用户配置的 CPU 上,在启动 Redis 服务器主线程的时候同样会设置 CPU 亲和性,Redis 的核心网络模型引入多线程之后,加上之前的多线程异步任务、多进程(BGSAVE、AOF、BIO、Sentinel 脚本任务等),Redis 现如今的系统并发度已经很大了,而 Redis 本身又是一个对吞吐量和延迟极度敏感的系统,所以用户需要 Redis 对 CPU 资源有更细粒度的控制,这里主要考虑的是两方面:CPU 高速缓存和 NUMA 架构。

首先是 CPU 高速缓存(这里讨论的是 L1 Cache 和 L2 Cache 都集成在 CPU 中的硬件架构),这里想象一种场景:Redis 主进程正在 CPU-1 上运行,给客户端提供数据服务,此时 Redis 启动了子进程进行数据持久化(BGSAVE 或者 AOF),系统调度之后子进程抢占了主进程的 CPU-1,主进程被调度到 CPU-2 上去运行,导致之前 CPU-1 的高速缓存里的相关指令和数据被汰换掉,CPU-2 需要重新加载指令和数据到自己的本地高速缓存里,浪费 CPU 资源,降低性能。

因此,Redis 通过设置 CPU 亲和性,可以将主进程/线程和子进程/线程绑定到不同的核隔离开来,使之互不干扰,能有效地提升系统性能。

其次是基于 NUMA 架构的考虑,在 NUMA 体系下,内存控制器芯片被集成到处理器内部,形成 CPU 本地内存,访问本地内存只需通过内存通道而无需经过系统总线,访问时延大大降低,而多个处理器之间通过 QPI 数据链路互联,跨 NUMA 节点的内存访问开销远大于本地内存的访问:

因此,Redis 通过设置 CPU 亲和性,让主进程/线程尽可能在固定的 NUMA 节点上的 CPU 上运行,更多地使用本地内存而不需要跨节点访问数据,同样也能大大地提升性能。

关于 NUMA 相关知识请读者自行查阅,篇幅所限这里就不再展开,以后有时间我再单独写一篇文章介绍。

最后还有一点,阅读过源码的读者可能会有疑问,Redis 的多线程模式下,似乎并没有对数据进行锁保护,事实上 Redis 的多线程模型是全程无锁(Lock-free)的,这是通过原子操作+交错访问来实现的,主线程和 I/O 线程之间共享的变量有三个:io_threads_pending 计数器、io_threads_op I/O 标识符和 io_threads_list 线程本地任务队列。

io_threads_pending 是原子变量,不需要加锁保护,io_threads_opio_threads_list 这两个变量则是通过控制主线程和 I/O 线程交错访问来规避共享数据竞争问题:I/O 线程启动之后会通过忙轮询和锁休眠等待主线程的信号,在这之前它不会去访问自己的本地任务队列 io_threads_list[id],而主线程会在分配完所有任务到各个 I/O 线程的本地队列之后才去唤醒 I/O 线程开始工作,并且主线程之后在 I/O 线程运行期间只会访问自己的本地任务队列 io_threads_list[0] 而不会再去访问 I/O 线程的本地队列,这也就保证了主线程永远会在 I/O 线程之前访问 io_threads_list 并且之后不再访问,保证了交错访问。io_threads_op 同理,主线程会在唤醒 I/O 线程之前先设置好 io_threads_op 的值,并且在 I/O 线程运行期间不会再去访问这个变量。

性能提升

Redis 将核心网络模型改造成多线程模式追求的当然是最终性能上的提升,所以最终还是要以 benchmark 数据见真章:

测试数据表明,Redis 在使用多线程模式之后性能大幅提升,达到了一倍。更详细的性能压测数据可以参阅这篇文章:Benchmarking the experimental Redis Multi-Threaded I/O

以下是美图技术团队实测的新旧 Redis 版本性能对比图,仅供参考:

模型缺陷

首先第一个就是我前面提到过的,Redis 的多线程网络模型实际上并不是一个标准的 Multi-Reactors/Master-Workers 模型,和其他主流的开源网络服务器的模式有所区别,最大的不同就是在标准的 Multi-Reactors/Master-Workers 模式下,Sub Reactors/Workers 会完成 网络读 -> 数据解析 -> 命令执行 -> 网络写 整套流程,Main Reactor/Master 只负责分派任务,而在 Redis 的多线程方案中,I/O 线程任务仅仅是通过 socket 读取客户端请求命令并解析,却没有真正去执行命令,所有客户端命令最后还需要回到主线程去执行,因此对多核的利用率并不算高,而且每次主线程都必须在分配完任务之后忙轮询等待所有 I/O 线程完成任务之后才能继续执行其他逻辑。

Redis 之所以如此设计它的多线程网络模型,我认为主要的原因是为了保持兼容性,因为以前 Redis 是单线程的,所有的客户端命令都是在单线程的事件循环里执行的,也因此 Redis 里所有的数据结构都是非线程安全的,现在引入多线程,如果按照标准的 Multi-Reactors/Master-Workers 模式来实现,则所有内置的数据结构都必须重构成线程安全的,这个工作量无疑是巨大且麻烦的。

所以,在我看来,Redis 目前的多线程方案更像是一个折中的选择:既保持了原系统的兼容性,又能利用多核提升 I/O 性能。

其次,目前 Redis 的多线程模型中,主线程和 I/O 线程的通信过于简单粗暴:忙轮询和锁,因为通过自旋忙轮询进行等待,导致 Redis 在启动的时候以及运行期间偶尔会有短暂的 CPU 空转引起的高占用率,而且这个通信机制的最终实现看起来非常不直观和不简洁,希望后面 Redis 能对目前的方案加以改进。

总结

Redis 作为缓存系统的事实标准,它的底层原理值得开发者去深入学习,Redis 自 2009 年发布第一版之后,其单线程网络模型的选择在社区中从未停止过讨论,多年来一直有呼声希望 Redis 能引入多线程从而利用多核优势,但是作者 antirez 是一个追求大道至简的开发者,对 Redis 加入任何新功能都异常谨慎,所以在 Redis 初版发布的十年后才最终将 Redis 的核心网络模型改造成多线程模式,这期间甚至诞生了一些 Redis 多线程的替代项目。虽然 antirez 一直在推迟多线程的方案,但却从未停止思考多线程的可行性,Redis 多线程网络模型的改造不是一朝一夕的事情,这其中牵扯到项目的方方面面,所以我们可以看到 Redis 的最终方案也并不完美,没有采用主流的多线程模式设计。

让我们来回顾一下 Redis 多线程网络模型的设计方案:

  • 使用 I/O 线程实现网络 I/O 多线程化,I/O 线程只负责网络 I/O 和命令解析,不执行客户端命令。
  • 利用原子操作+交错访问实现无锁的多线程模型。
  • 通过设置 CPU 亲和性,隔离主进程和其他子进程,让多线程网络模型能发挥最大的性能。

通读本文之后,相信读者们应该能够了解到一个优秀的网络系统的实现所涉及到的计算机领域的各种技术:设计模式、网络 I/O、并发编程、操作系统底层,甚至是计算机硬件。另外还需要对项目迭代和重构的谨慎,对技术方案的深入思考,绝不仅仅是写好代码这一个难点。

参考&延伸阅读

查看原文

kute 关注了用户 · 2月19日

panjf2000 @panjf2000

程序猿。

关注 46

kute 回答了问题 · 1月7日

解决java8数据结构优雅的转换方式

本来想不劳而获的,算了自己写把

int total = map.values().iterator().next().size();
        List<Map<String, Object>> list = IntStream.range(0, total)
                .boxed()
                .parallel()
                .map(i -> map.entrySet()
                        .stream()
                        .reduce(new HashMap<>(total), (accMap, entry) -> {
                            accMap.put(entry.getKey(), entry.getValue().get(i));
                            return accMap;
                        }, (BinaryOperator<Map<String, Object>>) (accMap1, accMap2) -> {
                            accMap1.putAll(accMap2);
                            return accMap1;
                        }))
                .collect(Collectors.toList());

关注 2 回答 2

kute 提出了问题 · 1月7日

解决java8数据结构优雅的转换方式

如题,已知如下结构:

map的大小不确定,但是 map中的v(也就是list)的大小都是一致的。

Map<String, List<Object>> map = new HashMap<>(ImmutableMap.of(
                "a", Lists.newArrayList(1, 2, 3),
                "b", Lists.newArrayList(4, 5, 6)
        ));

要转换成如下结构:

List<Map<String, Object>> list = Lists.newArrayList(
                Maps.newHashMap(ImmutableMap.of(
                        "a", 1,
                        "b", 4
                )),
                Maps.newHashMap(ImmutableMap.of(
                        "a", 2,
                        "b", 5
                )),
                Maps.newHashMap(ImmutableMap.of(
                        "a", 3,
                        "b", 6
                ))
        );

怎么转换比较方便呢?

关注 2 回答 2

kute 提出了问题 · 2020-12-24

两个数据量很大的集合求差集的高效方法

如题,两个集合如 List<String>,每个集合的数据量可能在 50-100w之间,如何 高效的计算出 list-1 diff list-2 的结果,耗时 以及内存占用 尽可能优

可以使用任何一切手段,如 调用脚本等

关注 7 回答 9

kute 回答了问题 · 2020-06-15

解决Freeswitch 如何在主叫挂断后检测到 挂断状态 或者事件?

关注 1 回答 1

kute 提出了问题 · 2020-06-15

解决Freeswitch 如何在主叫挂断后检测到 挂断状态 或者事件?

请教个问题

Freeswitch内呼叫命令

originate {execute_on_answer='lua my_script.lua',hangup_after_bridge=true,origination_caller_id_number=12345}sofia/gateway/gw43/54321 &echo

这样执行后,如果被叫 挂断了,脚本内 session:ready()还是true,如何检测下 被叫挂断呢,能让脚本感知到?

关注 1 回答 1

kute 收藏了文章 · 2020-06-03

面试必备!就凭借着这份Java 高频面试题,我拿下了阿里,字节的offer!

List

1. 为什么 arraylist 不安全?

我们查看源码发现 arraylist 的 CRUD 操作,并没有涉及到锁之类的东西。底层是数组,初始大小为 10。插入时会判断数组容量是否足够,不够的话会进行扩容。所谓扩容就是新建一个新的数组,然后将老的数据里面的元素复制到新的数组里面(所以增加较慢)。

2. CopyOnWriteArrayList 有什么特点?

它是 List 接口的一个实现类,在 java.util.concurrent(简称 JUC,后面我全部改成 juc,大家注意下)。

内部持有一个 ReentrantLock lock = new ReentrantLock(); 对于增删改操作都是先加锁再释放锁,线程安全。并且锁只有一把,而读操作不需要获得锁,支持并发。

读写分离,写时复制出一个新的数组,完成插入、修改或者移除操作后将新数组赋值给 array。

3. CopyOnWriteArrayList 与 Vector 的选择?

Vector 是增删改查方法都加了 synchronized,保证同步,但是每个方法执行的时候都要去获得锁,性能就会大大下降,而 CopyOnWriteArrayList 只是在增删改上加锁,但是读不加锁,在读方面的性能就好于 Vector,CopyOnWriteArrayList 支持读多写少的并发情况。

Vector 和 CopyOnWriteArrayList 都是 List 接口的一个实现类。

4. CopyOnWriteArrayList 适用于什么情况?

我们看源码不难发现他每次增加一个元素都要进行一次拷贝,此时严重影响了增删改的性能,其中和 arraylist 差了好几百倍。

所以对于读多写少的操作 CopyOnWriteArrayList 更加适合,而且线程安全。

DriverManager 这个类就使用到了CopyOnWriteArrayList。

5. LinkedList  和 ArrayList  对比?

 LinkedList<Integer> lists = new LinkedList<>();
 
 lists.addFirst(1);
 lists.push(2);
 lists.addLast(3);
 lists.add(4);
 lists.addFirst(5);
 
 lists.forEach(System.out::println);
// 5 2 1 3 4

addFirst 和 addLast 方法很清楚。push 方法默认是 addFirst 实现。add 方法默认是 addLast 实现。所以总结一下就是 add 和 last,push 和 first。

其实我们要明白一下,链表相对于数组来说,链表的添加和删除速度很快,是顺序添加删除很快,因为一个 linkedList 会保存第一个节点和最后一个节点,时间复杂度为O(1),但是你要指定位置添加 add(int index, E element) ,那么此时他会先遍历,然后找到改位置的节点,将你的节点添加到他前面,此时时间复杂度最大值为 O(n)。

数组呢?我们知道 ArrayList 底层实现就是数组,数组优点就是由于内存地址是顺序的,属于一块整的,此时遍历起来很快,添加删除的话,他会复制数组,当数组长度特别大时所消耗的时间会很长。这是一张图,大家可以看一下:

6. Arrays.asList() 方法返回的数组是不可变得吗?


List<Integer> integers = Arrays.asList(1, 2, 3, 4, 5);
integers.set(2, 5); // 这个操作可以
//integers.add(6);  这个会抛出异常
integers.forEach(System.out::println); // 1 2 5 4 5

1. 很显然我们是可以修改 list集合的 可以使用set方法
2. 但是当我们尝试去使用add() 方法时,会抛出 java.lang.UnsupportedOperationException 的异常,
不支持操作的异常
3.当我们使用 java9+时  可以使用 List.of()方法 ,他就是彻彻底底的不可修改的

7. 怎么将一个不安全数组换成安全数组?

1. 使用 Collections这个工具类
List<Integer> integers1 = Collections.synchronizedList(integers);

2. java5+ 变成 CopyOnWriteArrayList
CopyOnWriteArrayList<Integer> integers2 = (CopyOnWriteArrayList<Integer>) integers;

3. java9+ ,使用 List.of() 变成只读对象

8. Collections 工具类?

 1. 创建一个安全的空集合,防止NullPointerException异常
 List<String> list = Collections.<String>emptyList();
 
 2. 拷贝集合
 Collections.addAll(list, 2,3, 4, 5, 6);
 
 3. 构建一个安全的集合
 List<Integer> safeList = Collections.synchronizedList(list);
 
 4. 二分查找
 Collections.binarySearch(list, 2);

 5.翻转数组
 Collections.reverse(list);

Set

1. HashSet、TreeSet 和 LinkedHashSet 三种类型什么时候使用它们?

如你的需求是要一个能快速访问的 Set,那么就要用 HashSet,HashSet 底层是 HashMap 实现的,其中的元素没有按顺序排列。

如果你要一个可排序 Set,那么你应该用 TreeSet,TreeSet 的底层实现是 TreeMap。

如果你要记录下插入时的顺序时,你应该使用 LinedHashSet。

Set 集合中不能包含重复的元素,每个元素必须是唯一的,你只要将元素加入 set 中,重复的元素会自动移除。所以可以去重,很多情况下都需要使用(但是去重方式不同)。

LinkedHashSet 正好介于 HashSet 和 TreeSet 之间,它也是一个基于 HashMap 和双向链表的集合,但它同时维护了一个双链表来记录插入的顺序,基本方法的复杂度为 O(1)。

三者都是线程不安全的,需要使用 Collections.synchronizedSet(new HashSet(…));。

2. HashSet 和 LinkedHashSet 判定元素重复的原则是相同的?

会先去执行 hashCode() 方法,判断是否重复。如果 hashCode() 返回值相同,就会去判断 equals 方法。如果 equals() 方法还是相同,那么就认为重复。

3. TreeSet 判断元素重复原则?

TreeSet 的元素必须是实现了 java.lang.Comparable<T> 接口,所以他是根据此个接口的方法 compareTo 方法进行判断重复的,当返回值一样的时认定重复。

4. 怎么实现一个线程安全的 hashset?

我们看源码会发现他里面有一个 HashMap(用 transient 关键字标记的成员变量不参与序列化过程,因为 HashMap 已经实现 Serializable)。

5. CopyOnWriteArraySet 的实现?

1 public CopyOnWriteArraySet() {
2     al = new CopyOnWriteArrayList<E>();
3 }

很显然翻源码我们发现他实现了 CopyOnWriteArrayList()。

Map

1. Hashtable 特点?

Hashtable 和 ConcurrentHashMap 以及 ConcurrentSkipListMap 以及 TreeMap 不允许 key 和 value 值为空,但是 HashMap 可以 key 和 value 值都可以为空。

Hashtable 的方法都加了 Synchronized 关键字修饰,所以线程安全。

它是数组+链表的实现。

2. ConcurrentHashMap 问题?

取消 segments 字段,直接采用 transient volatile HashEntry<K,V>[] table 保存数据。

采用 table 数组元素作为锁,从而实现了对每一行数据进行加锁,进一步减少并发冲突的概率。

把 Table 数组+单向链表的数据结构变成为 Table 数组 + 单向链表 + 红黑树的结构。

当链表长度超过 8 以后,单向链表变成了红黑数;在哈希表扩容时,如果发现链表长度小于 6,则会由红黑树重新退化为链表。

对于其他详细我不吹,看懂的么几个,他比 HashMap 还要难。

对于线程安全环境下介意使用 ConcurrentHashMap 而不去使用 Hashtable。

3. 为什么不去使用 Hashtable 而去使用 ConcurrentHashMap?

HashTable 容器使用 synchronized 来保证线程安全,但在线程竞争激烈的情况下 HashTable 的效率非常低下。因为当一个线程访问 HashTable 的同步方法时,其他线程访问 HashTable 的同步方法时,可能会进入阻塞或轮询状态。如线程 1 使用 put 进行添加元素,线程 2 不但不能使用 put 方法添加元素,并且也不能使用 get 方法来获取元素,所以竞争越激烈效率越低。

4. ConcurrentSkipListMap 与 TreeMap 的选择?

ConcurrentSkipListMap 提供了一种线程安全的并发访问的排序映射表。内部是 SkipList(跳表)结构实现,利用底层的插入、删除的 CAS 原子性操作,通过死循环不断获取最新的结点指针来保证不会出现竞态条件。在理论上能够在 O(log(n)) 时间内完成查找、插入、删除操作。调用 ConcurrentSkipListMap 的 size 时,由于多个线程可以同时对映射表进行操作,所以映射表需要遍历整个链表才能返回元素个数,这个操作是个 O(log(n)) 的操作。

在 JDK1.8 中,ConcurrentHashMap 的性能和存储空间要优于 ConcurrentSkipListMap,但是 ConcurrentSkipListMap 有一个功能:它会按照键的自然顺序进行排序。

故需要对键值排序,则我们可以使用 TreeMap,在并发场景下可以使用 ConcurrentSkipListMap。

所以我们并不会去纠结 ConcurrentSkipListMap 和 ConcurrentHashMap 两者的选择。

5. LinkedHashMap 的使用?

主要是为了解决读取的有序性。基于 HashMap 实现的。

Queue

1. 队列是什么?

我们都知道队列 (Queue) 是一种先进先出 (FIFO) 的数据结构,Java 中定义了 java.util.Queue 接口用来表示队列。Java 中的 Queue 与 List、Set 属于同一个级别接口,它们都是实现了 Collection 接口。

注意:HashMap 没有实现 Collection 接口。

2. Deque 是什么?

它是一个双端队列。我们用到的 linkedlist 就是实现了 deque 的接口。支持在两端插入和移除元素。

3. 常见的几种队列实现?

LinkedList 是链表结构,队列呢也是一个列表结构,继承关系上 LinkedList 实现了 Queue,所以对于 Queue 来说,添加是 offer(obj),删除是 poll(),获取队头(不删除)是 peek() 。

 1public static void main(String[] args) {
 2    Queue<Integer> queue = new LinkedList<>();
 3
 4    queue.offer(1);
 5    queue.offer(2);
 6    queue.offer(3);
 7
 8    System.out.println(queue.poll());
 9    System.out.println(queue.poll());
10    System.out.println(queue.poll());
11}
12// 1, 2 , 3 

PriorityQueue 维护了一个有序列表,插入或者移除对象会进行 Heapfy 操作,默认情况下可以称之为小顶堆。当然,我们也可以给它指定一个实现了 java.util.Comparator 接口的排序类来指定元素排列的顺序。PriorityQueue 是一个无界队列,当你设置初始化大小还是不设置都不影响他继续添加元素。

ConcurrentLinkedQueue 是基于链接节点的并且线程安全的队列。因为它在队列的尾部添加元素并从头部删除它们,所以只要不需要知道队列的大小 ConcurrentLinkedQueue 对公共集合的共享访问就可以工作得很好。收集关于队列大小的信息会很慢,需要遍历队列。

4. ArrayBlockingQueue 与 LinkedBlockingQueue 的区别,哪个性能好呢?

ArrayBlockingQueue 是有界队列。LinkedBlockingQueue 看构造方法区分,默认构造方法最大值是 2^31-1。但是当 take 和 put 操作时,ArrayBlockingQueue 速度要快于 LinkedBlockingQueue。

ArrayBlockingQueue 中的锁是没有分离的,即生产和消费用的是同一个锁。LinkedBlockingQueue 中的锁是分离的,即生产用的是 putLock,消费是 takeLock;ArrayBlockingQueue 基于数组,在生产和消费的时候,是直接将枚举对象插入或移除的,不会产生或销毁任何额外的对象实例;LinkedBlockingQueue 基于链表,在生产和消费的时候,需要把枚举对象转换为 Node 进行插入或移除,会生成一个额外的 Node 对象,这在长时间内需要高效并发地处理大批量数据的系统中,其对于 GC 的影响还是存在一定的区别。

LinkedBlockingQueue 的消耗是 ArrayBlockingQueue 消耗的 10 倍左右,即 LinkedBlockingQueue 消耗在 1500 毫秒左右,而 ArrayBlockingQueue 只需 150 毫秒左右。

按照实现原理来分析,ArrayBlockingQueue 完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。Doug Lea 之所以没这样去做,也许是因为 ArrayBlockingQueue 的数据写入和获取操作已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其在性能上完全占不到任何便宜。

在使用 LinkedBlockingQueue 时,若用默认大小且当生产速度大于消费速度时候,有可能会内存溢出。

在使用 ArrayBlockingQueue 和 LinkedBlockingQueue 分别对 1000000 个简单字符做入队操作时,我们测试的是 ArrayBlockingQueue 会比 LinkedBlockingQueue 性能好 , 好差不多 50% 起步。

5. BlockingQueue 的问题以及 ConcurrentLinkedQueue 的问题?

BlockingQueue 可以是限定容量的。

BlockingQueue 实现主要用于生产者-使用者队列,但它另外还支持 collection 接口。

BlockingQueue 实现是线程安全的。

BlockingQueue 是阻塞队列(看你使用的方法),ConcurrentLinkedQueue 是非阻塞队列。

LinkedBlockingQueue 是一个线程安全的阻塞队列,基于链表实现,一般用于生产者与消费者模型的开发中。采用锁机制来实现多线程同步,提供了一个构造方法用来指定队列的大小,如果不指定大小,队列采用默认大小(Integer.MAX_VALUE,即整型最大值)。

ConcurrentLinkedQueue 是一个线程安全的非阻塞队列,基于链表实现。java 并没有提供构造方法来指定队列的大小,因此它是无界的。为了提高并发量,它通过使用更细的锁机制,使得在多线程环境中只对部分数据进行锁定,从而提高运行效率。他并没有阻塞方法,take 和 put 方法,注意这一点。

6. 简要概述 BlockingQueue 常用的七个实现类?

ArrayBlockingQueue 构造函数必须传入指定大小,所以他是一个有界队列。

LinkedBlockingQueue 分为两种情况,第一种构造函数指定大小,他是一个有界队列,第二种情况不指定大小他可以称之为无界队列,队列最大值为 Integer.MAX_VALUE。

PriorityBlockingQueue(还有一个双向的 LinkedBlockingDeque)他是一个无界队列,不管你使用什么构造函数。一个内部由优先级堆支持的、基于时间的调度队列。队列中存放 Delayed 元素,只有在延迟期满后才能从队列中提取元素。当一个元素的 getDelay() 方法返回值小于等于 0 时才能从队列中 poll 中元素,否则 poll() 方法会返回 null。 

SynchronousQueue 这个队列类似于 Golang的channel,也就是 chan,跟无缓冲区的 chan 很相似。比如 take 和 put 操作就跟 chan 一模一样。但是区别在于他的 poll 和 offer 操作可以设置等待时间。

DelayQueue 延迟队列提供了在指定时间才能获取队列元素的功能,队列头元素是最接近过期的元素。没有过期元素的话,使用 poll() 方法会返回 null 值,超时判定是通过 getDelay(TimeUnit.NANOSECONDS) 方法的返回值小于等于 0 来判断。延时队列不能存放空元素。

添加的元素必须实现 java.util.concurrent.Delayed 接口:

 1@Test
 2public void testLinkedList() throws InterruptedException {
 3
 4    DelayQueue<Person> queue = new DelayQueue<>();
 5
 6    queue.add(new Person());
 7
 8    System.out.println("queue.poll() = " + queue.poll(200,TimeUnit.MILLISECONDS));
 9}
10
11
12static class Person implements Delayed {
13
14    @Override
15    public long getDelay(TimeUnit unit) {
16        // 这个对象的过期时间
17        return 100L;
18    }
19
20    @Override
21    public int compareTo(Delayed o) {
22        //比较
23        return o.hashCode() - this.hashCode();
24    }
25}
26
27输出 : 
28queue.poll() = null

LinkedTransferQueue 是 JDK1.7 加入的无界队列,亮点就是无锁实现的,性能高。Doug Lea 说这个是最有用的 BlockingQueue 了,性能最好的一个。Doug Lea 说从功能角度来讲,LinkedTransferQueue 实际上是 ConcurrentLinkedQueue、SynchronousQueue(公平模式)和 LinkedBlockingQueue 的超集。他的 transfer 方法表示生产必须等到消费者消费才会停止阻塞。生产者会一直阻塞直到所添加到队列的元素被某一个消费者所消费(不仅仅是添加到队列里就完事)。同时我们知道上面那些 BlockingQueue 使用了大量的 condition 和 lock,这样子效率很低,而 LinkedTransferQueue 则是无锁队列。他的核心方法其实就是 xfer() 方法,基本所有方法都是围绕着这个进行的,一般就是 SYNC、ASYNC、NOW 来区分状态量。像 put、offer、add 都是 ASYNC,所以不会阻塞。下面几个状态对应的变量。

1private static final int NOW   = 0; // for untimed poll, tryTransfer(不阻塞)
2private static final int ASYNC = 1; // for offer, put, add(不阻塞)
3private static final int SYNC  = 2; // for transfer, take(阻塞)
4private static final int TIMED = 3; // for timed poll, tryTransfer (waiting)

7. (小顶堆) 优先队列 PriorityQueue 的实现?

小顶堆是什么:任意一个非叶子节点的权值都不大于其左右子节点的权值。

PriorityQueue 是非线程安全的,PriorityBlockingQueue 是线程安全的。

两者都使用了堆,算法原理相同。

PriorityQueue 的逻辑结构是一棵完全二叉树,就是因为完全二叉树的特点,他实际存储确实可以为一个数组的,所以他的存储结构其实是一个数组。

首先 java 中的 PriorityQueue 是优先队列,使用的是小顶堆实现,因此结果不一定是完全升序。

8. 自己实现一个大顶堆?

 1/**
 2 * 构建一个 大顶堆
 3 *
 4 * @param tree
 5 * @param n
 6 */
 7static void build_heap(int[] tree, int n) {
 8
 9    // 最后一个节点
10    int last_node = n - 1;
11
12    // 开始遍历的位置是 : 最后一个堆的堆顶 , (以最小堆为单位)
13    int parent = (last_node - 1) / 2;
14
15    // 递减向上遍历
16    for (int i = parent; i >= 0; i--) {
17        heapify(tree, n, i);
18    }
19}
20
21
22/**
23 * 递归操作
24 * @param tree 代表一棵树
25 * @param n    代表多少个节点
26 * @param i    对哪个节点进行 heapify
27 */
28static void heapify(int[] tree, int n, int i) {
29
30    // 如果当前值 大于 n 直接返回了 ,一般不会出现这种问题 .....
31    if (i >= n) {
32        return;
33    }
34
35    // 子节点
36    int c1 = 2 * i + 1;
37    int c2 = 2 * i + 2;
38
39    // 假设最大的节点 为 i (父节点)
40    int max = i;
41
42    // 如果大于  赋值给 max
43    if (c1 < n && tree[c1] > tree[max]) {
44        max = c1;
45    }
46
47    // 如果大于  赋值给 max
48    if (c2 < n && tree[c2] > tree[max]) {
49        max = c2;
50    }
51
52    // 如果i所在的就是最大值我们没必要去做交换
53    if (max != i) {
54
55        // 交换最大值 和 父节点 的位置
56        swap(tree, max, i);
57
58        // 交换完以后 , 此时的max其实就是 i原来的数 ,就是最小的数字 ,所以需要递归遍历
59        heapify(tree, n, max);
60    }
61
62}
63
64// 交换操作
65static void swap(int[] tree, int max, int i) {
66    int temp = tree[max];
67    tree[max] = tree[i];
68    tree[i] = temp;
69}

Stack

栈结构属于一种先进者后出,类似于一个瓶子,先进去的会压到栈低(push 操作),出去的时候只有一个出口就是栈顶,返回栈顶元素,这个操作称为 pop。

Stack 类继承自 Vector,所有方法都加入了 sync 修饰,使得效率很低,线程安全。

 1@Test
 2public void testStack() {
 3
 4    Stack<Integer> stack = new Stack<>();
 5
 6    // push 添加
 7    stack.push(1);
 8
 9    stack.push(2);
10
11    // pop 返回栈顶元素 , 并移除
12    System.out.println("stack.pop() = " + stack.pop());
13
14    System.out.println("stack.pop() = " + stack.pop());
15
16}
17
18输出 : 
192 , 1 

这就是本文的全部内容了。如果觉得写的不错,请记得收藏加转发。还想获取更多面试资料的小伙伴们,记得关注我公众号:前程有光

查看原文

kute 赞了文章 · 2020-04-30

1.5W 字搞懂 Spring Cloud,太牛了!

作者:FrancisQ
juejin.im/post/5de2553e5188256e885f4fa3

概述

首先我给大家看一张图,如果大家对这张图有些地方不太理解的话,我希望你们看完我这篇文章会恍然大悟。

什么是 Spring cloud

构建分布式系统不需要复杂和容易出错。Spring Cloud 为最常见的分布式系统模式提供了一种简单且易于接受的编程模型,帮助开发人员构建有弹性的、可靠的、协调的应用程序。Spring Cloud 构建于 Spring Boot 之上,使得开发者很容易入手并快速应用于生产中。

官方果然官方,介绍都这么有板有眼的。

我所理解的Spring Cloud就是微服务系统架构的一站式解决方案,在平时我们构建微服务的过程中需要做如服务发现注册、配置中心、消息总线、负载均衡、断路器、数据监控等操作,而 Spring Cloud 为我们提供了一套简易的编程模型,使我们能在 Spring Boot 的基础上轻松地实现微服务项目的构建。

Spring Cloud 的版本

当然这个只是个题外话。

Spring Cloud 的版本号并不是我们通常见的数字版本号,而是一些很奇怪的单词。这些单词均为英国伦敦地铁站的站名。同时根据字母表的顺序来对应版本时间顺序,比如:最早 的 Release 版本 Angel,第二个 Release 版本 Brixton(英国地名),然后是 Camden、 Dalston、Edgware、Finchley、Greenwich、Hoxton。

关于版本可以看这篇:Spring Cloud 多版本怎么选择?

Spring Cloud 的服务发现框架——Eureka

===============================

Eureka是基于REST(代表性状态转移)的服务,主要在AWS云中用于定位服务,以实现负载均衡和中间层服务器的故障转移。我们称此服务为Eureka服务器。Eureka还带有一个基于Java的客户端组件Eureka Client,它使与服务的交互变得更加容易。客户端还具有一个内置的负载平衡器,可以执行基本的循环负载平衡。在Netflix,更复杂的负载均衡器将Eureka包装起来,以基于流量,资源使用,错误条件等多种因素提供加权负载均衡,以提供出色的弹性。

总的来说,Eureka 就是一个服务发现框架。何为服务,何又为发现呢?

举一个生活中的例子,就比如我们平时租房子找中介的事情。

在没有中介的时候我们需要一个一个去寻找是否有房屋要出租的房东,这显然会非常的费力,一你找凭一个人的能力是找不到很多房源供你选择,再者你也懒得这么找下去(找了这么久,没有合适的只能将就)。这里的我们就相当于微服务中的Consumer,而那些房东就相当于微服务中的Provider。消费者Consumer需要调用提供者Provider提供的一些服务,就像我们现在需要租他们的房子一样。

但是如果只是租客和房东之间进行寻找的话,他们的效率是很低的,房东找不到租客赚不到钱,租客找不到房东住不了房。所以,后来房东肯定就想到了广播自己的房源信息(比如在街边贴贴小广告),这样对于房东来说已经完成他的任务(将房源公布出去),但是有两个问题就出现了。

第一、其他不是租客的都能收到这种租房消息,这在现实世界没什么,但是在计算机的世界中就会出现资源消耗的问题了。第二、租客这样还是很难找到你,试想一下我需要租房,我还需要东一个西一个地去找街边小广告,麻不麻烦?

那怎么办呢?我们当然不会那么傻乎乎的,第一时间就是去找中介呀,它为我们提供了统一房源的地方,我们消费者只需要跑到它那里去找就行了。而对于房东来说,他们也只需要把房源在中介那里发布就行了。

推荐阅读:Spring Cloud Eureka 常用配置详解!

那么现在,我们的模式就是这样的了。

但是,这个时候还会出现一些问题。

  1. 房东注册之后如果不想卖房子了怎么办?我们是不是需要让房东定期续约?如果房东不进行续约是不是要将他们从中介那里的注册列表中移除。
  2. 租客是不是也要进行注册呢?不然合同乙方怎么来呢?
  3. 中介可不可以做连锁店呢?如果这一个店因为某些不可抗力因素而无法使用,那么我们是否可以换一个连锁店呢?

针对上面的问题我们来重新构建一下上面的模式图

好了,举完这个例子我们就可以来看关于 Eureka 的一些基础概念了,你会发现这东西理解起来怎么这么简单。

服务发现:其实就是一个“中介”,整个过程中有三个角色:服务提供者(出租房子的)、服务消费者(租客)、服务中介(房屋中介)。

服务提供者:就是提供一些自己能够执行的一些服务给外界。

服务消费者:就是需要使用一些服务的“用户”。

服务中介:其实就是服务提供者和服务消费者之间的“桥梁”,服务提供者可以把自己注册到服务中介那里,而服务消费者如需要消费一些服务(使用一些功能)就可以在服务中介中寻找注册在服务中介的服务提供者。

服务注册 Register:

官方解释:当 Eureka 客户端向[Eureka] Server注册时,它提供自身的元数据,比如IP地址、端口,运行状况指示符URL,主页等。

结合中介理解:房东 (提供者[Eureka] Client Provider)在中介 (服务器[Eureka] Server) 那里登记房屋的信息,比如面积,价格,地段等等(元数据metaData)。

服务续约 Renew:

官方解释:Eureka 客户会每隔30秒(默认情况下)发送一次心跳来续约。通过续约来告知[Eureka] Server该 Eureka 客户仍然存在,没有出现问题。正常情况下,如果[Eureka] Server在90秒没有收到 Eureka 客户的续约,它会将实例从其注册表中删除。

结合中介理解:房东 (提供者[Eureka] Client Provider) 定期告诉中介 (服务器[Eureka] Server) 我的房子还租(续约) ,中介 (服务器[Eureka] Server) 收到之后继续保留房屋的信息。

获取注册列表信息 Fetch Registries:

官方解释:Eureka 客户端从服务器获取注册表信息,并将其缓存在本地。客户端会使用该信息查找其他服务,从而进行远程调用。该注册列表信息定期(每30秒钟)更新一次。每次返回注册列表信息可能与 Eureka 客户端的缓存信息不同, Eureka 客户端自动处理。如果由于某种原因导致注册列表信息不能及时匹配,Eureka 客户端则会重新获取整个注册表信息。

Eureka 服务器缓存注册列表信息,整个注册表以及每个应用程序的信息进行了压缩,压缩内容和没有压缩的内容完全相同。Eureka 客户端和 Eureka 服务器可以使用JSON / XML格式进行通讯。在默认的情况下 Eureka 客户端使用压缩JSON格式来获取注册列表的信息。

结合中介理解:租客(消费者[Eureka] Client Consumer) 去中介 (服务器[Eureka] Server) 那里获取所有的房屋信息列表 (客户端列表[Eureka] Client List) ,而且租客为了获取最新的信息会定期向中介 (服务器[Eureka] Server) 那里获取并更新本地列表。

服务下线 Cancel:

官方解释:Eureka客户端在程序关闭时向Eureka服务器发送取消请求。发送请求后,该客户端实例信息将从服务器的实例注册表中删除。该下线请求不会自动完成,它需要调用以下内容:DiscoveryManager.getInstance().shutdownComponent();

结合中介理解:房东 (提供者[Eureka] Client Provider) 告诉中介  (服务器[Eureka] Server) 我的房子不租了,中介之后就将注册的房屋信息从列表中剔除。

服务剔除 Eviction:

官方解释:在默认的情况下,当Eureka客户端连续90秒(3个续约周期)没有向Eureka服务器发送服务续约,即心跳,Eureka服务器会将该服务实例从服务注册列表删除,即服务剔除。

结合中介理解:房东(提供者[Eureka] Client Provider) 会定期联系 中介  (服务器[Eureka] Server) 告诉他我的房子还租(续约),如果中介  (服务器[Eureka] Server) 长时间没收到提供者的信息,那么中介会将他的房屋信息给下架(服务剔除)。

下面就是Netflix官方给出的 Eureka 架构图,你会发现和我们前面画的中介图别无二致。

当然,可以充当服务发现的组件有很多:Zookeeper,Consul, Eureka 等。

更多关于 Eureka 的知识(自我保护,初始注册策略等等)可以自己去官网查看,或者查看我的另一篇文章 [深入理解 Eureka]

负载均衡之 Ribbon

什么是 RestTemplate?

不是讲Ribbon么?怎么扯到了RestTemplate了?你先别急,听我慢慢道来。更多的关于设计,原理知识点的问题。

我就说一句!RestTemplate是Spring提供的一个访问Http服务的客户端类,怎么说呢?就是微服务之间的调用是使用的RestTemplate。比如这个时候我们 消费者B 需要调用 提供者A 所提供的服务我们就需要这么写。如我下面的伪代码

@Autowired
private RestTemplate restTemplate;
// 这里是提供者A的ip地址,但是如果使用了 Eureka 那么就应该是提供者A的名称
private static final String SERVICE_PROVIDER_A = "http://localhost:8081";

@PostMapping("/judge")
public boolean judge(@RequestBody Request request) {
    String url = SERVICE_PROVIDER_A + "/service1";
    return restTemplate.postForObject(url, request, Boolean.class);
}

如果你对源码感兴趣的话,你会发现上面我们所讲的 Eureka 框架中的注册、续约等,底层都是使用的RestTemplate。

为什么需要 Ribbon?

Ribbon 是Netflix公司的一个开源的负载均衡 项目,是一个客户端/进程内负载均衡器,运行在消费者端。

我们再举个例子,比如我们设计了一个秒杀系统,但是为了整个系统的高可用,我们需要将这个系统做一个集群,而这个时候我们消费者就可以拥有多个秒杀系统的调用途径了,如下图。

如果这个时候我们没有进行一些均衡操作,如果我们对秒杀系统1进行大量的调用,而另外两个基本不请求,就会导致秒杀系统1崩溃,而另外两个就变成了傀儡,那么我们为什么还要做集群,我们高可用体现的意义又在哪呢?

推荐阅读:分布式与集群的区别到底是什么?

所以Ribbon出现了,注意我们上面加粗的几个字——运行在消费者端。指的是,Ribbon是运行在消费者端的负载均衡器,如下图。

其工作原理就是Consumer端获取到了所有的服务列表之后,在其内部使用负载均衡算法,进行对多个系统的调用。

Nginx 和 Ribbon 的对比

提到负载均衡就不得不提到大名鼎鼎的Nignx了,而和Ribbon不同的是,它是一种集中式的负载均衡器。

何为集中式呢?简单理解就是将所有请求都集中起来,然后再进行负载均衡。如下图。

我们可以看到Nginx是接收了所有的请求进行负载均衡的,而对于Ribbon来说它是在消费者端进行的负载均衡。如下图。

请注意Request的位置,在Nginx中请求是先进入负载均衡器,而在Ribbon中是先在客户端进行负载均衡才进行请求的。

Ribbon 的几种负载均衡算法

负载均衡,不管Nginx还是Ribbon都需要其算法的支持,如果我没记错的话Nginx使用的是 轮询和加权轮询算法。而在Ribbon中有更多的负载均衡调度算法,其默认是使用的RoundRobinRule轮询策略。

  • RoundRobinRule:轮询策略。Ribbon默认采用的策略。若经过一轮轮询没有找到可用的provider,其最多轮询 10 轮。若最终还没有找到,则返回 null。
  • RandomRule: 随机策略,从所有可用的 provider 中随机选择一个。
  • RetryRule: 重试策略。先按照 RoundRobinRule 策略获取 provider,若获取失败,则在指定的时限内重试。默认的时限为 500 毫秒。

还有很多,这里不一一举例子了,你最需要知道的是默认轮询算法,并且可以更换默认的负载均衡算法,只需要在配置文件中做出修改就行。

providerName:
  ribbon:
    NFLoadBalancerRuleClassName: com.netflix.loadbalancer.RandomRule

当然,在Ribbon中你还可以自定义负载均衡算法,你只需要实现IRule接口,然后修改配置文件或者自定义Java Config类。

什么是 Open Feign

有了 Eureka,RestTemplate,Ribbon我们就可以愉快地进行服务间的调用了,但是使用RestTemplate还是不方便,我们每次都要进行这样的调用。

@Autowired
private RestTemplate restTemplate;
// 这里是提供者A的ip地址,但是如果使用了 Eureka 那么就应该是提供者A的名称
private static final String SERVICE_PROVIDER_A = "http://localhost:8081";

@PostMapping("/judge")
public boolean judge(@RequestBody Request request) {
    String url = SERVICE_PROVIDER_A + "/service1";
    // 是不是太麻烦了???每次都要 url、请求、返回类型的
    return restTemplate.postForObject(url, request, Boolean.class);
}

这样每次都调用RestRemplate的API是否太麻烦,我能不能像调用原来代码一样进行各个服务间的调用呢?

聪明的小朋友肯定想到了,那就用映射呀,就像域名和IP地址的映射。我们可以将被调用的服务代码映射到消费者端,这样我们就可以“无缝开发”啦。

OpenFeign 也是运行在消费者端的,使用 Ribbon 进行负载均衡,所以 OpenFeign 直接内置了 Ribbon。

在导入了Open Feign之后我们就可以进行愉快编写  Consumer端代码了。

// 使用 @FeignClient 注解来指定提供者的名字
@FeignClient(value = "eureka-client-provider")
public interface TestClient {
    // 这里一定要注意需要使用的是提供者那端的请求相对路径,这里就相当于映射了
    @RequestMapping(value = "/provider/xxx",
    method = RequestMethod.POST)
    CommonResponse<List<Plan>> getPlans(@RequestBody planGetRequest request);
}

然后我们在Controller就可以像原来调用Service层代码一样调用它了。

@RestController
public class TestController {
    // 这里就相当于原来自动注入的 Service
    @Autowired
    private TestClient testClient;
    // controller 调用 service 层代码
    @RequestMapping(value = "/test", method = RequestMethod.POST)
    public CommonResponse<List<Plan>> get(@RequestBody planGetRequest request) {
        return testClient.getPlans(request);
    }
}

必不可少的 Hystrix

什么是 Hystrix之熔断和降级

在分布式环境中,不可避免地会有许多服务依赖项中的某些失败。Hystrix是一个库,可通过添加等待时间容限和容错逻辑来帮助您控制这些分布式服务之间的交互。Hystrix通过隔离服务之间的访问点,停止服务之间的级联故障并提供后备选项来实现此目的,所有这些都可以提高系统的整体弹性。

总体来说[Hystrix]就是一个能进行熔断和降级的库,通过使用它能提高整个系统的弹性。

那么什么是 熔断和降级 呢?再举个例子,此时我们整个微服务系统是这样的。服务A调用了服务B,服务B再调用了服务C,但是因为某些原因,服务C顶不住了,这个时候大量请求会在服务C阻塞。

服务C阻塞了还好,毕竟只是一个系统崩溃了。但是请注意这个时候因为服务C不能返回响应,那么服务B调用服务C的的请求就会阻塞,同理服务B阻塞了,那么服务A也会阻塞崩溃。

请注意,为什么阻塞会崩溃。因为这些请求会消耗占用系统的线程、IO 等资源,消耗完你这个系统服务器不就崩了么。

这就叫服务雪崩。妈耶,上面两个熔断和降级你都没给我解释清楚,你现在又给我扯什么服务雪崩?

别急,听我慢慢道来。更多的关于设计,原理知识点的问题,可以在码匠笔记后台回复实践获取。

不听我也得讲下去!

所谓熔断就是服务雪崩的一种有效解决方案。当指定时间窗内的请求失败率达到设定阈值时,系统将通过断路器直接将此请求链路断开。

也就是我们上面服务B调用服务C在指定时间窗内,调用的失败率到达了一定的值,那么[Hystrix]则会自动将 服务B与C 之间的请求都断了,以免导致服务雪崩现象。

其实这里所讲的熔断就是指的[Hystrix]中的断路器模式,你可以使用简单的@[Hystrix]Command注解来标注某个方法,这样[Hystrix]就会使用断路器来“包装”这个方法,每当调用时间超过指定时间时(默认为1000ms),断路器将会中断对这个方法的调用。

当然你可以对这个注解的很多属性进行设置,比如设置超时时间,像这样。

@HystrixCommand(
    commandProperties = {@HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds",value = "1200")}
)
public List<Xxx> getXxxx() {
    // ...省略代码逻辑
}

但是,我查阅了一些博客,发现他们都将熔断和降级的概念混淆了,以我的理解,降级是为了更好的用户体验,当一个方法调用异常时,通过执行另一种代码逻辑来给用户友好的回复。这也就对应着[Hystrix]的后备处理模式。你可以通过设置fallbackMethod来给一个方法设置备用的代码逻辑。比如这个时候有一个热点新闻出现了,我们会推荐给用户查看详情,然后用户会通过id去查询新闻的详情,但是因为这条新闻太火了(比如最近什么*易对吧),大量用户同时访问可能会导致系统崩溃,那么我们就进行服务降级,一些请求会做一些降级处理比如当前人数太多请稍后查看等等。

// 指定了后备方法调用
@HystrixCommand(fallbackMethod = "getHystrixNews")
@GetMapping("/get/news")
public News getNews(@PathVariable("id") int id) {
    // 调用新闻系统的获取新闻api 代码逻辑省略
}
//
public News getHystrixNews(@PathVariable("id") int id) {
    // 做服务降级
    // 返回当前人数太多,请稍后查看
}

什么是Hystrix之其他

我在阅读 《Spring微服务实战》这本书的时候还接触到了一个舱壁模式的概念。在不使用舱壁模式的情况下,服务A调用服务B,这种调用默认的是使用同一批线程来执行的,而在一个服务出现性能问题的时候,就会出现所有线程被刷爆并等待处理工作,同时阻塞新请求,最终导致程序崩溃。而舱壁模式会将远程资源调用隔离在他们自己的线程池中,以便可以控制单个表现不佳的服务,而不会使该程序崩溃。

具体其原理我推荐大家自己去了解一下,本篇文章中对舱壁模式不做过多解释。当然还有[Hystrix]仪表盘,它是用来实时监控**[Hystrix]的各项指标信息的,这里我将这个问题也抛出去,希望有不了解的可以自己去搜索一下。

微服务网关——Zuul

ZUUL 是从设备和 web 站点到 Netflix 流应用后端的所有请求的前门。作为边界服务应用,ZUUL 是为了实现动态路由、监视、弹性和安全性而构建的。它还具有根据情况将请求路由到多个 Amazon Auto Scaling Groups(亚马逊自动缩放组,亚马逊的一种云计算方式) 的能力

在上面我们学习了 Eureka 之后我们知道了服务提供者是消费者通过[Eureka] Server进行访问的,即[Eureka] Server是服务提供者的统一入口。那么整个应用中存在那么多消费者需要用户进行调用,这个时候用户该怎样访问这些消费者工程呢?当然可以像之前那样直接访问这些工程。但这种方式没有统一的消费者工程调用入口,不便于访问与管理,而 Zuul 就是这样的一个对于消费者的统一入口。

如果学过前端的肯定都知道 Router 吧,比如 Flutter 中的路由,Vue,React中的路由,用了 Zuul 你会发现在路由功能方面和前端配置路由基本是一个理,我偶尔撸撸 Flutter。关注微信公众号:Java技术栈,在后台回复:cloud,可以获取 Spring Cloud 系列教程。

大家对网关应该很熟吧,简单来讲网关是系统唯一对外的入口,介于客户端与服务器端之间,用于对请求进行鉴权、限流、路由、监控等功能。

没错,网关有的功能,Zuul基本都有。而Zuul中最关键的就是路由和过滤器了,在官方文档中Zuul的标题就是

Router and Filter : Zuul

Zuul 的路由功能

简单配置

本来想给你们复制一些代码,但是想了想,因为各个代码配置比较零散,看起来也比较零散,我决定还是给你们画个图来解释吧。

比如这个时候我们已经向[Eureka] Server注册了两个Consumer、三个Provicer,这个时候我们再加个Zuul网关应该变成这样子了。

emmm,信息量有点大,我来解释一下。关于前面的知识我就不解释了 。

首先,Zuul需要向 Eureka 进行注册,注册有啥好处呢?

你傻呀,Consumer都向[Eureka] Server进行注册了,我网关是不是只要注册就能拿到所有Consumer的信息了?

拿到信息有什么好处呢?

我拿到信息我是不是可以获取所有的Consumer的元数据(名称,ip,端口)?

拿到这些元数据有什么好处呢?拿到了我们是不是直接可以做路由映射?比如原来用户调用Consumer1的接口localhost:8001/studentInfo/update这个请求,我们是不是可以这样进行调用了呢?localhost:9000/consumer1/studentInfo/update呢?你这样是不是恍然大悟了?

这里的url为了让更多人看懂所以没有使用 restful 风格。

上面的你理解了,那么就能理解关于Zuul最基本的配置了,看下面。

server:
  port: 9000
eureka:
  client:
    service-url:
      # 这里只要注册 Eureka 就行了
      defaultZone: http://localhost:9997/eureka

然后在启动类上加入@EnableZuulProxy注解就行了。没错,就是那么简单。

统一前缀

这个很简单,就是我们可以在前面加一个统一的前缀,比如我们刚刚调用的是localhost:9000/consumer1/studentInfo/update,这个时候我们在yaml配置文件中添加如下。

zuul:
  prefix: /zuul

这样我们就需要通过localhost:9000/zuul/consumer1/studentInfo/update来进行访问了。

路由策略配置

你会发现前面的访问方式(直接使用服务名),需要将微服务名称暴露给用户,会存在安全性问题。所以,可以自定义路径来替代微服务名称,即自定义路由策略。

zuul:
  routes:
    consumer1: /FrancisQ1/**
    consumer2: /FrancisQ2/**

这个时候你就可以使用localhost:9000/zuul/FrancisQ1/studentInfo/update进行访问了。

服务名屏蔽

这个时候你别以为你好了,你可以试试,在你配置完路由策略之后使用微服务名称还是可以访问的,这个时候你需要将服务名屏蔽。

zuul:
 ignore-services: "*"

路径屏蔽

Zuul还可以指定屏蔽掉的路径 URI,即只要用户请求中包含指定的 URI 路径,那么该请求将无法访问到指定的服务。通过该方式可以限制用户的权限。

zuul:
  ignore-patterns: **/auto/**

这样关于 auto 的请求我们就可以过滤掉了。

** 代表匹配多级任意路径

*代表匹配一级任意路径

敏感请求头屏蔽

默认情况下,像 Cookie、Set-Cookie 等敏感请求头信息会被 zuul 屏蔽掉,我们可以将这些默认屏蔽去掉,当然,也可以添加要屏蔽的请求头。

Zuul 的过滤功能

如果说,路由功能是Zuul的基操的话,那么过滤器就是Zuul的利器了。毕竟所有请求都经过网关(Zuul),那么我们可以进行各种过滤,这样我们就能实现限流,灰度发布,权限控制等等。

简单实现一个请求时间日志打印

要实现自己定义的Filter我们只需要继承ZuulFilter然后将这个过滤器类以@Component注解加入 Spring 容器中就行了。

在给你们看代码之前我先给你们解释一下关于过滤器的一些注意点。

过滤器类型:Pre、Routing、Post。前置Pre就是在请求之前进行过滤,Routing路由过滤器就是我们上面所讲的路由策略,而Post后置过滤器就是在Response之前进行过滤的过滤器。你可以观察上图结合着理解,并且下面我会给出相应的注释。

// 加入Spring容器
@Component
public class PreRequestFilter extends ZuulFilter {
    // 返回过滤器类型 这里是前置过滤器
    @Override
    public String filterType() {
        return FilterConstants.PRE_TYPE;
    }
    // 指定过滤顺序 越小越先执行,这里第一个执行
    // 当然不是只真正第一个 在Zuul内置中有其他过滤器会先执行
    // 那是写死的 比如 SERVLET_DETECTION_FILTER_ORDER = -3
    @Override
    public int filterOrder() {
        return 0;
    }
    // 什么时候该进行过滤
    // 这里我们可以进行一些判断,这样我们就可以过滤掉一些不符合规定的请求等等
    @Override
    public boolean shouldFilter() {
        return true;
    }
    // 如果过滤器允许通过则怎么进行处理
    @Override
    public Object run() throws ZuulException {
        // 这里我设置了全局的RequestContext并记录了请求开始时间
        RequestContext ctx = RequestContext.getCurrentContext();
        ctx.set("startTime", System.currentTimeMillis());
        return null;
    }
}

// lombok的日志
@Slf4j
// 加入 Spring 容器
@Component
public class AccessLogFilter extends ZuulFilter {
    // 指定该过滤器的过滤类型
    // 此时是后置过滤器
    @Override
    public String filterType() {
        return FilterConstants.POST_TYPE;
    }
    // SEND_RESPONSE_FILTER_ORDER 是最后一个过滤器
    // 我们此过滤器在它之前执行
    @Override
    public int filterOrder() {
        return FilterConstants.SEND_RESPONSE_FILTER_ORDER - 1;
    }
    @Override
    public boolean shouldFilter() {
        return true;
    }
    // 过滤时执行的策略
    @Override
    public Object run() throws ZuulException {
        RequestContext context = RequestContext.getCurrentContext();
        HttpServletRequest request = context.getRequest();
        // 从RequestContext获取原先的开始时间 并通过它计算整个时间间隔
        Long startTime = (Long) context.get("startTime");
        // 这里我可以获取HttpServletRequest来获取URI并且打印出来
        String uri = request.getRequestURI();
        long duration = System.currentTimeMillis() - startTime;
        log.info("uri: " + uri + ", duration: " + duration / 100 + "ms");
        return null;
    }
}

上面就简单实现了请求时间日志打印功能,你有没有感受到Zuul过滤功能的强大了呢?Spring Cloud Gateway VS Zuul 比较,怎么选?

没有?好的、那我们再来。

令牌桶限流

当然不仅仅是令牌桶限流方式,Zuul只要是限流的活它都能干,这里我只是简单举个例子。

我先来解释一下什么是令牌桶限流吧。

首先我们会有个桶,如果里面没有满那么就会以一定固定的速率会往里面放令牌,一个请求过来首先要从桶中获取令牌,如果没有获取到,那么这个请求就拒绝,如果获取到那么就放行。很简单吧,啊哈哈、

下面我们就通过Zuul的前置过滤器来实现一下令牌桶限流。

@Component
@Slf4j
public class RouteFilter extends ZuulFilter {
    // 定义一个令牌桶,每秒产生2个令牌,即每秒最多处理2个请求
    private static final RateLimiter RATE_LIMITER = RateLimiter.create(2);
    @Override
    public String filterType() {
        return FilterConstants.PRE_TYPE;
    }

    @Override
    public int filterOrder() {
        return -5;
    }

    @Override
    public Object run() throws ZuulException {
        log.info("放行");
        return null;
    }

    @Override
    public boolean shouldFilter() {
        RequestContext context = RequestContext.getCurrentContext();
        if(!RATE_LIMITER.tryAcquire()) {
            log.warn("访问量超载");
            // 指定当前请求未通过过滤
            context.setSendZuulResponse(false);
            // 向客户端返回响应码429,请求数量过多
            context.setResponseStatusCode(429);
            return false;
        }
        return true;
    }
}

这样我们就能将请求数量控制在一秒两个,有没有觉得很酷?

关于 Zuul  的其他

Zuul的过滤器的功能肯定不止上面我所实现的两种,它还可以实现权限校验,包括我上面提到的灰度发布等等。

当然,Zuul作为网关肯定也存在单点问题,如果我们要保证Zuul的高可用,我们就需要进行Zuul的集群配置,这个时候可以借助额外的一些负载均衡器比如Nginx。

Spring Cloud配置管理——Config

为什么要使用进行配置管理?

当我们的微服务系统开始慢慢地庞大起来,那么多Consumer、Provider、[Eureka] Server、Zuul系统都会持有自己的配置,这个时候我们在项目运行的时候可能需要更改某些应用的配置,如果我们不进行配置的统一管理,我们只能去每个应用下一个一个寻找配置文件然后修改配置文件再重启应用。

首先对于分布式系统而言我们就不应该去每个应用下去分别修改配置文件,再者对于重启应用来说,服务无法访问所以直接抛弃了可用性,这是我们更不愿见到的。

那么有没有一种方法既能对配置文件统一地进行管理,又能在项目运行时动态修改配置文件呢?

那就是我今天所要介绍的Spring Cloud Config。

能进行配置管理的框架不止Spring Cloud Config一种,大家可以根据需求自己选择(disconf,阿波罗等等)。而且对于Config来说有些地方实现的不是那么尽人意。

Config 是什么

Spring Cloud Config为分布式系统中的外部化配置提供服务器和客户端支持。使用Config服务器,可以在中心位置管理所有环境中应用程序的外部属性。关注微信公众号:Java技术栈,在后台回复:cloud,可以获取 Spring Cloud 系列教程。

简单来说,Spring Cloud Config就是能将各个 应用/系统/模块 的配置文件存放到统一的地方然后进行管理(Git 或者 SVN)。

你想一下,我们的应用是不是只有启动的时候才会进行配置文件的加载,那么我们的Spring Cloud Config就暴露出一个接口给启动应用来获取它所想要的配置文件,应用获取到配置文件然后再进行它的初始化工作。就如下图。

当然这里你肯定还会有一个疑问,如果我在应用运行时去更改远程配置仓库(Git)中的对应配置文件,那么依赖于这个配置文件的已启动的应用会不会进行其相应配置的更改呢?

答案是不会的。

什么?那怎么进行动态修改配置文件呢?这不是出现了配置漂移吗?你个渣男,你又骗我!

别急嘛,你可以使用Webhooks,这是  github提供的功能,它能确保远程库的配置文件更新后客户端中的配置信息也得到更新。

噢噢,这还差不多。我去查查怎么用。

慢着,听我说完,Webhooks虽然能解决,但是你了解一下会发现它根本不适合用于生产环境,所以基本不会使用它的。

而一般我们会使用Bus消息总线 +Spring Cloud Config进行配置的动态刷新。

引出 Spring Cloud Bus

用于将服务和服务实例与分布式消息系统链接在一起的事件总线。在集群中传播状态更改很有用(例如配置更改事件)。关注微信公众号:Java技术栈,在后台回复:cloud,可以获取 Spring Cloud 系列教程。

你可以简单理解为Spring Cloud Bus的作用就是管理和广播分布式系统中的消息,也就是消息引擎系统中的广播模式。当然作为消息总线的Spring Cloud Bus可以做很多事而不仅仅是客户端的配置刷新功能。

而拥有了Spring Cloud Bus之后,我们只需要创建一个简单的请求,并且加上@ResfreshScope注解就能进行配置的动态修改了,下面我画了张图供你理解。

总结

这篇文章中我带大家初步了解了Spring Cloud的各个组件,他们有

  • Eureka 服务发现框架
  • Ribbon 进程内负载均衡器
  • Open Feign 服务调用映射
  • Hystrix 服务降级熔断器
  • Zuul 微服务网关
  • Config 微服务统一配置中心
  • Bus 消息总线

如果你能这个时候能看懂下面那张图,也就说明了你已经对Spring Cloud微服务有了一定的架构认识。

推荐去我的博客阅读更多:

1.Java JVM、集合、多线程、新特性系列教程

2.Spring MVC、Spring Boot、Spring Cloud 系列教程

3.Maven、Git、Eclipse、Intellij IDEA 系列工具教程

4.Java、后端、架构、阿里巴巴等大厂最新面试题

觉得不错,别忘了点赞+转发哦!

查看原文

赞 15 收藏 11 评论 1

kute 赞了文章 · 2020-04-30

MyBatis 如何实现流式查询

基本概念

流式查询指的是查询成功后不是返回一个集合而是返回一个迭代器,应用每次从迭代器取一条查询结果。流式查询的好处是能够降低内存使用。

如果没有流式查询,我们想要从数据库取 1000 万条记录而又没有足够的内存时,就不得不分页查询,而分页查询效率取决于表设计,如果设计的不好,就无法执行高效的分页查询。因此流式查询是一个数据库访问框架必须具备的功能。

流式查询的过程当中,数据库连接是保持打开状态的,因此要注意的是:执行一个流式查询后,数据库访问框架就不负责关闭数据库连接了,需要应用在取完数据后自己关闭。

MyBatis 流式查询接口

MyBatis 提供了一个叫 org.apache.ibatis.cursor.Cursor 的接口类用于流式查询,这个接口继承了 java.io.Closeablejava.lang.Iterable 接口,由此可知:

  1. Cursor 是可关闭的。实际上当关闭 Cursor 时,也一并将数据库连接关闭了;
  2. Cursor 是可遍历的。

除此之外,Cursor 还提供了三个方法:

  1. isOpen():用于在取数据之前判断 Cursor 对象是否是打开状态。只有当打开时 Cursor 才能取数据;
  2. isConsumed():用于判断查询结果是否全部取完;
  3. getCurrentIndex():返回已经获取了多少条数据。

因为 Cursor 实现了迭代器接口,因此在实际使用当中,从 Cursor 取数据非常简单:

try(Cursor cursor = mapper.querySomeData()) {
    cursor.forEach(rowObject -> {
        // ...
    });
}

使用 try-resource 方式可以令 Cursor 自动关闭。

但构建 Cursor 的过程不简单

我们举个实际例子。下面是一个 Mapper 类:

@Mapper
public interface FooMapper {
    @Select("select * from foo limit #{limit}")
    Cursor<Foo> scan(@Param("limit") int limit);
}

方法 scan() 是一个非常简单的查询。我们在定义这个方时,指定返回值为 Cursor 类型,MyBatis 就明白这个查询方法是一个流式查询。

然后我们再写一个 SpringMVC Controller 方法来调用 Mapper(无关的代码已经省略):

@GetMapping("foo/scan/0/{limit}")
public void scanFoo0(@PathVariable("limit") int limit) throws Exception {
    try (Cursor<Foo> cursor = fooMapper.scan(limit)) {  // 1
        cursor.forEach(foo -> {});                      // 2
    }
}

假设 fooMapper 是 @Autowired 进来的。注释 1 处是获取 Cursor 对象并保证它能最后关闭;2 处则是从 cursor 中取数据。

上面的代码看上去没什么问题,但是执行 scanFoo0(int) 时会报错:

java.lang.IllegalStateException: A Cursor is already closed.

这是因为我们前面说了在取数据的过程中需要保持数据库连接,而 Mapper 方法通常在执行完后连接就关闭了,因此 Cusor 也一并关闭了。

所以,解决这个问题的思路不复杂,保持数据库连接打开即可。我们至少有三种方案可选。

方案一:SqlSessionFactory

我们可以用 SqlSessionFactory 来手工打开数据库连接,将 Controller 方法修改如下:

@GetMapping("foo/scan/1/{limit}")
public void scanFoo1(@PathVariable("limit") int limit) throws Exception {
    try (
        SqlSession sqlSession = sqlSessionFactory.openSession();  // 1
        Cursor<Foo> cursor = 
              sqlSession.getMapper(FooMapper.class).scan(limit)   // 2
    ) {
        cursor.forEach(foo -> { });
    }
}

上面的代码中,1 处我们开启了一个 SqlSession (实际上也代表了一个数据库连接),并保证它最后能关闭;2 处我们使用 SqlSession 来获得 Mapper 对象。这样才能保证得到的 Cursor 对象是打开状态的。

方案二:TransactionTemplate

在 Spring 中,我们可以用 TransactionTemplate 来执行一个数据库事务,这个过程中数据库连接同样是打开的。代码如下:

@GetMapping("foo/scan/2/{limit}")
public void scanFoo2(@PathVariable("limit") int limit) throws Exception {
    TransactionTemplate transactionTemplate = 
            new TransactionTemplate(transactionManager);  // 1

    transactionTemplate.execute(status -> {               // 2
        try (Cursor<Foo> cursor = fooMapper.scan(limit)) {
            cursor.forEach(foo -> { });
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    });
}

上面的代码中,1 处我们创建了一个 TransactionTemplate 对象(此处 transactionManager 是怎么来的不用多解释,本文假设读者对 Spring 数据库事务的使用比较熟悉了),2 处执行数据库事务,而数据库事务的内容则是调用 Mapper 对象的流式查询。注意这里的 Mapper 对象无需通过 SqlSession 创建。

方案三:@Transactional 注解

这个本质上和方案二一样,代码如下:

@GetMapping("foo/scan/3/{limit}")
@Transactional
public void scanFoo3(@PathVariable("limit") int limit) throws Exception {
    try (Cursor<Foo> cursor = fooMapper.scan(limit)) {
        cursor.forEach(foo -> { });
    }
}

它仅仅是在原来方法上面加了个 @Transactional 注解。这个方案看上去最简洁,但请注意 Spring 框架当中注解使用的坑:只在外部调用时生效。在当前类中调用这个方法,依旧会报错。

以上是三种实现 MyBatis 流式查询的方法。

查看原文

赞 17 收藏 12 评论 6

认证与成就

  • 获得 23 次点赞
  • 获得 59 枚徽章 获得 1 枚金徽章, 获得 16 枚银徽章, 获得 42 枚铜徽章

擅长技能
编辑

开源项目 & 著作
编辑

注册于 2015-01-12
个人主页被 1.3k 人浏览