KerryWu

KerryWu 查看完整档案

上海编辑福州大学  |  软件工程 编辑上海得帆  |  技术顾问 编辑 github.com/Kerry2019 编辑
编辑

保持饥饿

个人动态

KerryWu 赞了文章 · 11月23日

差点跪了!阿里3面真题:CAP和BASE理论了解么?可以结合实际案例说下不?

本文节选自我开源的 JavaGuide :https://github.com/Snailclimb/JavaGuide (Github标星92k+!一份涵盖大部分 Java 程序员所需要掌握的核心知识。准备 Java 面试,首选 JavaGuide!)

经历过技术面试的小伙伴想必对这个两个概念已经再熟悉不过了!

Guide哥当年参加面试的时候,不夸张地说,只要问到分布式相关的内容,面试官几乎是必定会问这两个分布式相关的理论。

并且,这两个理论也可以说是小伙伴们学习分布式相关内容的基础了!

因此,小伙伴们非常非常有必要将这理论搞懂,并且能够用自己的理解给别人讲出来。

这篇文章我会站在自己的角度对这两个概念进行解读!

个人能力有限。如果文章有任何需要改善和完善的地方,欢迎在评论区指出,共同进步!——爱你们的Guide哥

CAP理论

CAP 理论/定理起源于 2000年,由加州大学伯克利分校的Eric Brewer教授在分布式计算原理研讨会(PODC)上提出,因此 CAP定理又被称作 布鲁尔定理(Brewer’s theorem)

2年后,麻省理工学院的Seth Gilbert和Nancy Lynch 发表了布鲁尔猜想的证明,CAP理论正式成为分布式领域的定理。

简介

CAP 也就是 Consistency(一致性)Availability(可用性)Partition Tolerance(分区容错性) 这三个单词首字母组合。

CAP 理论的提出者布鲁尔在提出 CAP 猜想的时候,并没有详细定义 ConsistencyAvailabilityPartition Tolerance 三个单词的明确定义。

因此,对于 CAP 的民间解读有很多,一般比较被大家推荐的是下面 👇 这种版本的解。

在理论计算机科学中,CAP 定理(CAP theorem)指出对于一个分布式系统来说,当设计读写操作时,只能能同时满足以下三点中的两个:

  • 一致性(Consistence) : 所有节点访问同一份最新的数据副本
  • 可用性(Availability): 非故障的节点在合理的时间内返回合理的响应(不是错误或者超时的响应)。
  • 分区容错性(Partition tolerance) : 分布式系统出现网络分区的时候,仍然能够对外提供服务。

什么是网络分区?

分布式系统中,多个节点之前的网络本来是连通的,但是因为某些故障(比如部分节点网络出了问题)某些节点之间不连通了,整个网络就分成了几块区域,这就叫网络分区。

partition-tolerance

不是所谓的“3 选 2”

大部分人解释这一定律时,常常简单的表述为:“一致性、可用性、分区容忍性三者你只能同时达到其中两个,不可能同时达到”。实际上这是一个非常具有误导性质的说法,而且在 CAP 理论诞生 12 年之后,CAP 之父也在 2012 年重写了之前的论文。

当发生网络分区的时候,如果我们要继续服务,那么强一致性和可用性只能 2 选 1。也就是说当网络分区之后 P 是前提,决定了 P 之后才有 C 和 A 的选择。也就是说分区容错性(Partition tolerance)我们是必须要实现的。

简而言之就是:CAP 理论中分区容错性 P 是一定要满足的,在此基础上,只能满足可用性 A 或者一致性 C。

因此,分布式系统理论上不可能选择 CA 架构,只能选择 CP 或者 AP 架构。

为啥无同时保证 CA 呢?

举个例子:若系统出现“分区”,系统中的某个节点在进行写操作。为了保证 C, 必须要禁止其他节点的读写操作,这就和 A 发生冲突了。如果为了保证 A,其他节点的读写操作正常的话,那就和 C 发生冲突了。

选择的关键在于当前的业务场景,没有定论,比如对于需要确保强一致性的场景如银行一般会选择保证 CP 。

CAP 实际应用案例

我这里以注册中心来探讨一下 CAP 的实际应用。考虑到很多小伙伴不知道注册中心是干嘛的,这里简单以 Dubbo 为例说一说。

下图是 Dubbo 的架构图。注册中心 Registry 在其中扮演了什么角色呢?提供了什么服务呢?

注册中心负责服务地址的注册与查找,相当于目录服务,服务提供者和消费者只在启动时与注册中心交互,注册中心不转发请求,压力较小。

常见的可以作为注册中心的组件有:ZooKeeper、Eureka、Nacos...。

  1. ZooKeeper 保证的是 CP。 任何时刻对 ZooKeeper 的读请求都能得到一致性的结果,但是, ZooKeeper 不保证每次请求的可用性比如在 Leader 选举过程中或者半数以上的机器不可用的时候服务就是不可用的。
  2. Eureka 保证的则是 AP。 Eureka 在设计的时候就是优先保证 A (可用性)。在 Eureka 中不存在什么 Leader 节点,每个节点都是一样的、平等的。因此 Eureka 不会像 ZooKeeper 那样出现选举过程中或者半数以上的机器不可用的时候服务就是不可用的情况。 Eureka 保证即使大部分节点挂掉也不会影响正常提供服务,只要有一个节点是可用的就行了。只不过这个节点上的数据可能并不是最新的。
  3. Nacos 不仅支持 CP 也支持 AP。

总结

在进行分布式系统设计和开发时,我们不应该仅仅局限在 CAP 问题上,还要关注系统的扩展性、可用性等等

在系统发生“分区”的情况下,CAP 理论只能满足 CP 或者 AP。要注意的是,这里的前提是系统发生了“分区”

如果系统没有发生“分区”的话,节点间的网络连接通信正常的话,也就不存在 P 了。这个时候,我们就可以同时保证 C 和 A 了。

总结:如果系统发生“分区”,我们要考虑选择 CP 还是 AP。如果系统没有发生“分区”的话,我们要思考如何保证 CA 。

推荐阅读

  1. CAP 定理简化 (英文,有趣的案例)
  2. 神一样的 CAP 理论被应用在何方 (中文,列举了很多实际的例子)
  3. 请停止呼叫数据库 CP 或 AP (英文,带给你不一样的思考)

BASE 理论

BASE 理论起源于 2008 年, 由eBay的架构师Dan Pritchett在ACM上发表。

简介

BASEBasically Available(基本可用)Soft-state(软状态)Eventually Consistent(最终一致性) 三个短语的缩写。BASE 理论是对 CAP 中一致性 C 和可用性 A 权衡的结果,其来源于对大规模互联网系统分布式实践的总结,是基于 CAP 定理逐步演化而来的,它大大降低了我们对系统的要求。

BASE 理论的核心思想

即使无法做到强一致性,但每个应用都可以根据自身业务特点,采用适当的方式来使系统达到最终一致性。

也就是牺牲数据的一致性来满足系统的高可用性,系统中一部分数据不可用或者不一致时,仍需要保持系统整体“主要可用”。

BASE 理论本质上是对 CAP 的延伸和补充,更具体地说,是对 CAP 中 AP 方案的一个补充。

为什么这样说呢?

CAP 理论这节我们也说过了:

如果系统没有发生“分区”的话,节点间的网络连接通信正常的话,也就不存在 P 了。这个时候,我们就可以同时保证 C 和 A 了。因此,如果系统发生“分区”,我们要考虑选择 CP 还是 AP。如果系统没有发生“分区”的话,我们要思考如何保证 CA 。

因此,AP 方案只是在系统发生分区的时候放弃一致性,而不是永远放弃一致性。在分区故障恢复后,系统应该达到最终一致性。这一点其实就是 BASE 理论延伸的地方。

BASE 理论三要素

BASE理论三要素

1. 基本可用

基本可用是指分布式系统在出现不可预知故障的时候,允许损失部分可用性。但是,这绝不等价于系统不可用。

什么叫允许损失部分可用性呢?

  • 响应时间上的损失: 正常情况下,处理用户请求需要 0.5s 返回结果,但是由于系统出现故障,处理用户请求的时间变为 3 s。
  • 系统功能上的损失:正常情况下,用户可以使用系统的全部功能,但是由于系统访问量突然剧增,系统的部分非核心功能无法使用。

2. 软状态

软状态指允许系统中的数据存在中间状态(CAP 理论中的数据不一致),并认为该中间状态的存在不会影响系统的整体可用性,即允许系统在不同节点的数据副本之间进行数据同步的过程存在延时。

3. 最终一致性

最终一致性强调的是系统中所有的数据副本,在经过一段时间的同步后,最终能够达到一个一致的状态。因此,最终一致性的本质是需要系统保证最终数据能够达到一致,而不需要实时保证系统数据的强一致性。

分布式一致性的 3 种级别:

  1. 强一致性 :系统写入了什么,读出来的就是什么。
  2. 弱一致性 :不一定可以读取到最新写入的值,也不保证多少时间之后读取到的数据是最新的,只是会尽量保证某个时刻达到数据一致的状态。
  3. 最终一致性 :弱一致性的升级版。,系统会保证在一定时间内达到数据一致的状态,

业界比较推崇是最终一致性级别,但是某些对数据一致要求十分严格的场景比如银行转账还是要保证强一致性。

总结

ACID 是数据库事务完整性的理论,CAP 是分布式系统设计理论,BASE 是 CAP 理论中 AP 方案的延伸。

图解计算机基础+个人原创的 Java 面试手册PDF版。

微信搜“JavaGuide”回复“计算机基础”即可获取图解计算机基础+个人原创的 Java 面试手册。

查看原文

赞 8 收藏 4 评论 0

KerryWu 发布了文章 · 11月16日

elasticsearch的开发应用

平时项目开发中,经常会遇到模糊搜索的需求。通常当需要模糊搜索的数据库字段不大,我们可以简单通过 字段名 like '%搜索值%'实现,搜索效率不高,而且就算加索引也无法生效。对于数据库字段很大的,mysql还提供全文索引,开销也很大。

有没有一种专门做搜索的“数据库”呢?不仅可以实现高效的模糊搜索,而且还能像百度、谷歌这类搜索引擎一样,从我输入的一段文字中,自动识别关键词进行搜索。下面介绍的elasticsearch就是这方面的行家。

1. 简介

全文搜索属于最常见的需求,开源的 Elasticsearch是目前全文搜索引擎的首选。它可以快速地储存、搜索和分析海量数据。维基百科、Stack Overflow、Github 都采用它。Elasticsearch 的底层是开源库 Lucene。但是,你没法直接用 Lucene,必须自己写代码去调用它的接口。Elasticsearch 是 Lucene 的封装,提供了 REST API 的操作接口,开箱即用。

index 索引

elasticsearch 数据管理的顶层单位叫做 Index(索引)。它是单个数据库的同义词。每个 Index (即数据库)的名字必须是小写。下面的命令可以查看当前节点的所有 Index。

 curl -X GET 'http://localhost:9200/_cat/indices?v'
document 文档

Index 里面单条的记录称为 Document(文档)。许多条 Document 构成了一个 Index,Document 使用 JSON 格式表示。

同一个 Index 里面的 Document,不要求有相同的结构(scheme),但是最好保持相同,这样有利于提高搜索效率。

type (已移除)

type 是之前存在的概念,elasticsearch 7.x 就不在使用。Document 可以分组,比如weather这个 Index 里面,可以按城市分组(北京和上海),这种分组就叫做 Type,它是虚拟的逻辑分组,用来过滤 Document。

但是不同的分组中,Document 的数据结构应当尽量保持一致,否则会影响搜索效率,type的定位就很鸡肋。因此elasticsearch就直接做强制限制,在6.X 版本中,一个index下只能存在一个type;在 7.X 版本中,直接去除了 type 的概念,就是说 index 不再会有 type。从前的那些写法,可以直接用index下的_doc代替。

倒排索引

什么是倒排索引: 倒排索引也叫反向索引,通俗来讲正向索引是通过key找value,反向索引则是通过value找key。Elasticsearch 使用一种称为倒排索引的结构,它适用于快速的全文搜索。一个倒排索引由文档中所有不重复词的列表构成,对于其中每个词,有一个包含它的文档列表。倒排索引建立的是分词(Term)和文档(Document)之间的映射关系,在倒排索引中,数据是面向词(Term)而不是面向文档的。

假设我们有两个文档,每个文档的content域包含如下内容:
The quick brown fox jumped over the lazy dog
Quick brown foxes leap over lazy dogs in summer

为了创建倒排索引,我们首先将每个文档的content域拆分成单独的词(我们称它为词条或tokens),创建一个包含所有不重复词条的排序列表,然后列出每个词条出现在哪个文档。

现在,如果我们想搜索quick brown,我们只需要查找包含每个词条的文档。两个文档都匹配,但是第一个文档比第二个匹配度更高。如果我们使用仅计算匹配词条数量的简单相似性算法,那么,我们可以说,对于我们查询的相关性来讲,第一个文档比第二个文档更佳。

分词

Elasticsearch在倒排索引时会文本会使用设置的分析器,而输入的检索语句也会首先通过设置的相同分析器,然后在进行查询。

es内置很多分词器,但是对中文分词并不友好,例如使用standard分词器对一句中文话进行分词,会分成一个字一个字的。这时可以使用第三方的Analyzer插件,比如 ik、pinyin等,本文用的是ik。

2. 安装

本次除了安装elasticsearch以外,我们还会安装kibana(可视化展示elasticsearch数据的产品),以及给elasticsearch预装一个ik中文分词器的插件。因为elasticsearch选用的版本是 7.9.3 ,为了保证兼容,因此kibana和ik插件都使用该对应版本。

先查看下面容器化安装脚本:

# 启动 Elasticsearch
docker run -d \
 --name elasticsearch \
 --restart=on-failure:3 \
 -p 9200:9200 \
 -p 9300:9300 \
 -e "discovery.type=single-node" \
 -v /Volumes/elasticsearch/data/:/usr/share/elasticsearch/data/ \
 -v /Volumes/elasticsearch/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml \
 -v /Volumes/elasticsearch/plugins/:/usr/share/elasticsearch/plugins/ \
 elasticsearch:7.9.3

#启动 Kibana
docker run -d \
 --name kibana \
 --link elasticsearch:es \
 -p 5601:5601 \
 -e ELASTICSEARCH_URL=es:9200 \
 kibana:7.9.3
elasticsearch部分

单节点执行,暴露9200和9300端口,值得关注的是有三个挂载卷:

  1. /data: elasticsearch的数据库都在该目录,可以直接挂载该目录。
  2. /config/elasticsearch.yml: 主要是安装后要修改elasticsearch.yml文件,在文件末尾加上下列两行,以解决跨域问题。为了方便,就直接挂载该文件了。
http.cors.enabled: true
http.cors.allow-origin: "*"
  1. /plugins: 这是elasticsearch安装插件的目录,默认是空的,通常将插件解压后放入该目录,重启后即可使用。关于分词器的插件,下文再说。
kibana部分

因为kibana的环境变量中是要配置elasticsearch地址的,但是docker不同容器之间默认网络隔离。可以通过配置同一网络环境的方式解决,这里使用的方式,是通过--link 给elasticsearch设置了一个别名,相当于容器内部维护了一个DNS域名映射。

ik分词器插件

ik中文分词器插件的下载地址,最好找elasticsearch对应版本的插件。

通常来说是启动elasticsearch服务后再安装插件的,重启之后生效。我为了简单,先将插件zip下载到本地,elasticsearch-analysis-ik-7.9.3.zip 解压到 /Volumes/elasticsearch/plugins/analysis-ik/ 目录,再对 /Volumes/elasticsearch/plugins/ 目录做挂载,当elasticsearch 容器启动后,插件就已经生效了。

3. API示例

这里我们通过REST API,从零开始做一个完整的示例。我们通过创建一个articles的索引,使用ik中文分词器,创建完数据后,再做查询。本文不会介绍所有的API,但好在是 RESTFul 风格,很多用法都能自己推敲出来。

创建索引

(PUT) http://localhost:9200/articles

更新索引mapping

(POST) http://localhost:9200/articles/_mapping

{
    "properties": {
        "title": {
            "type": "text",
            "analyzer": "ik_max_word",
            "search_analyzer": "ik_max_word"
        },
        "content": {
            "type": "text",
            "analyzer": "ik_max_word",
            "search_analyzer": "ik_max_word"
        }
    }
}
查看索引mapping

(GET) http://localhost:9200/articles/_mapping

新建文档

(POST) http://localhost:9200/articles/_doc

{
    "title": "都江堰",
    "content": "一位年迈的老祖宗,没有成为挂在墙上的画像,没有成为写在书里的回忆..."
}
搜索文档

(GET) http://localhost:9200/articles/_search

{
    "query": {
        "match": {
            "content": "画像回忆"
        }
    }
}

4. spring开发

实际spring结合elasticsearch的开发还是比较复杂的,如果详细的讲这块内容,得单独讲几篇文章。本文的目的主要是简要介绍elasticsearch的应用,因此本章只是做一个很简单的demo,实际开发还得查阅相关文档。

下文使用的是spring-boot-starter-data-elasticsearch,还好目前4.2版本是兼容7.9.x版本elasticsearch的。因为都是属于 spring data jpa 体系的,所以dao层的语法还是容易理解的。另外一些复杂的操作,可以通过注入 ElasticsearchRestTemplate 来实现。

下列代码中的 /articles 接口,和上文中的 http://localhost:9200/articles/_search 一样,都是对文章正文的模糊搜索。

pom.xml
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
        </dependency>
application.yml
spring:
  data:
    elasticsearch:
      client:
        reactive:
          endpoints: localhost:9200
ArticlesEO.java
@Data
@Document(indexName = "articles")
public class ArticlesEO {
    @Id
    private String id;
    @Field(type = FieldType.Text,analyzer = "ik_max_word")
    private String title;
    @Field(type = FieldType.Text,analyzer = "ik_max_word")
    private String content;
}
ArticlesRepository.java
@Repository
public interface ArticlesRepository extends ElasticsearchRepository<ArticlesEO,String> {
    Page<ArticlesEO> findByContent(String content, Pageable pageable);
}
ArticleController.java
@RestController
@RequestMapping
public class ArticleController {
    private Pageable pageable = PageRequest.of(0,10);

    private final ArticlesRepository articlesRepository;
    private final ElasticsearchRestTemplate elasticsearchRestTemplate;

    public ArticleController(ArticlesRepository articlesRepository,ElasticsearchRestTemplate elasticsearchRestTemplate) {
        this.articlesRepository = articlesRepository;
        this.elasticsearchRestTemplate=elasticsearchRestTemplate;
    }

    /**
     * 查询 document
     * @param content
     * @return
     */
    @GetMapping("/articles")
    public Page<ArticlesEO> searchArticle(@RequestParam("content")String content){
        return articlesRepository.findByContent(content,pageable);
    }

}
查看原文

赞 0 收藏 0 评论 0

KerryWu 发布了文章 · 11月11日

Redis分布式锁的实现

很多新手将 分布式锁分布式事务 混淆,个人理解: 是用于解决多程序并发争夺某一共享资源;事务 是用于保障一系列操作执行的一致性。我前面有几篇文章讲解了分布式事务,关于2PC、TCC和异步确保方案的实现,这次打算把几种分布式锁的方案说一说。

1. 定义

在传统单体架构中,我们最常见的锁是jdk的锁。因为线程是操作系统能够运行调度的最小单位,在java多线程开发时,就难免涉及到不同线程竞争同一个进程下的资源。jdk库给我们提供了synchronized、Lock和并发包java.util.concurrent.* 等。但是它们都统一的限制,竞争资源的线程,都是运行在同一个Jvm进程下,在分布式架构中,不同Jvm进程是无法使用该锁的。

为了防止分布式系统中的多个进程之间相互干扰,我们需要一种分布式协调技术来对这些进程进行调度。而这个分布式协调技术的核心就是来实现这个分布式锁

举个经典“超卖”的例子,某个电商项目中抢购100件库存的商品,抢购接口的逻辑可简单分为:1、查询库存是否大于零;2、当库存大于零时,购买商品。当只剩1件库存时,A用户和B用户都同时执行了第一步,查询库存都为1件,然后都执行购买操作。当他们购买完成,发现库存是 -1 件了。我们可以在java代码中将“查询库存”和“减库存”的操作加锁,保障A用户和B用户的请求无法并发执行。但万一我们的接口服务是个集群服务,A用户和B用户的请求分别被负载均衡转发到不同的Jvm进程上,那还是解决不了问题。

2. 分布式锁对比

通过前面的例子可以知道,协调解决分布式锁的资源,肯定不能是Jvm进程级别的资源,而应该是某个可以共享的外部资源。

三种实现方式

常见分布式锁一般有三种实现方式:1. 数据库锁;2. 基于ZooKeeper的分布式锁;3. 基于Redis的分布式锁。

  1. 数据库锁:这种方式很容易被想到,把竞争的资源放到数据库中,利用数据库锁来实现资源竞争,可以参考之前的文章《数据库事务和锁》。例如:(1)悲观锁实现:查询库存商品的sql可以加上 "FOR UPDATE" 以实现排他锁,并且将“查询库存”和“减库存”打包成一个事务 COMMIT,在A用户查询和购买完成之前,B用户的请求都会被阻塞住。(2)乐观锁实现:在库存表中加上版本号字段来控制。或者更简单的实现是,当每次购买完成后发现库存小于零了,回滚事务即可。
  2. zookeeper的分布式锁:实现分布式锁,ZooKeeper是专业的。它类似于一个文件系统,通过多系统竞争文件系统上的文件资源,起到分布式锁的作用。具体的实现方式,请参考之前的文章《zookeeper的开发应用》
  3. redis的分布式锁:之前的文章讲过redis的开发应用和事务,一直没有讲过redis的分布式锁,这也是本文的核心内容。简单来说是通过setnx竞争键的值。

“数据库锁”是竞争表级资源或行级资源,“zookeeper锁”是竞争文件资源,“redis锁”是为了竞争键值资源。它们都是通过竞争程序外的共享资源,来实现分布式锁。

对比

不过在分布式锁的领域,还是zookeeper更专业。redis本质上也是数据库,所有其它两种方案都是“兼职”实现分布式锁的,效果上没有zookeeper好。

  1. 性能消耗小:当真的出现并发锁竞争时,数据库或redis的实现基本都是通过阻塞,或不断重试获取锁,有一定的性能消耗。而zookeeper锁是通过注册监听器,当某个程序释放锁是,下一个程序监听到消息再获取锁。
  2. 锁释放机制完善:如果是redis获取锁的那个客户端bug了或者挂了,那么只能等待超时时间之后才能释放锁;而zk的话,因为创建的是临时znode,只要客户端挂了,znode就没了,此时就自动释放锁。
  3. 集群的强一致性:众所周知,zookeeper是典型实现了 CP 事务的案例,集群中永远由Leader节点来处理事务请求。而redis其实是实现 AP 事务的,如果master节点故障了,发生主从切换,此时就会有可能出现锁丢失的问题。
锁的必要条件

另外为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下几个条件:

  1. 互斥性。在任意时刻,只有一个客户端能持有锁。
  2. 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。
  3. 解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。

3. Redis实现分布式锁

3.1. 加锁

正确的加锁
public class RedisTool {

    private static final String LOCK_SUCCESS = "OK";
    private static final String SET_IF_NOT_EXIST = "NX";
    private static final String SET_WITH_EXPIRE_TIME = "PX";

    /**
     * 尝试获取分布式锁
     * @param jedis Redis客户端
     * @param lockKey 锁
     * @param requestId 请求标识
     * @param expireTime 超期时间
     * @return 是否获取成功
     */
    public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {

        String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);

        if (LOCK_SUCCESS.equals(result)) {
            return true;
        }
        return false;
    }
}

以看到,我们加锁就一行代码:jedis.set(String key, String value, String nxxx, String expx, int time),这个set()方法一共有五个形参:

  1. key:我们使用key来当锁,因为key是唯一的。
  2. value:我们传的是requestId,很多童鞋可能不明白,有key作为锁不就够了吗,为什么还要用到value?原因就是我们在上面讲到可靠性时,分布式锁要满足第四个条件解铃还须系铃人,通过给value赋值为requestId,我们就知道这把锁是哪个请求加的了,在解锁的时候就可以有依据。requestId可以使用UUID.randomUUID().toString()方法生成。
  3. Nxxx:这个参数我们填的是NX,意思是SET IF NOT EXIST,即当key不存在时,我们进行set操作;若key已经存在,则不做任何操作;
  4. EXPX:这个参数我们传的是PX,意思是我们要给这个key加一个过期的设置,具体时间由第五个参数决定。
  5. time:与第四个参数相呼应,代表key的过期时间。

总的来说,执行上面的set()方法就只会导致两种结果:

  • 当前没有锁(key不存在),那么就进行加锁操作,并对锁设置个有效期,同时value表示加锁的客户端。
  • 已有锁存在,不做任何操作。
不推荐的加锁方式(不推荐!!!)

我看过很多博客中,都用下面的方式来加锁,即setnx和getset的配合,手动来维护键的过期时间。

public static boolean wrongGetLock2(Jedis jedis, String lockKey, int expireTime) {

    long expires = System.currentTimeMillis() + expireTime;
    String expiresStr = String.valueOf(expires);

    // 如果当前锁不存在,返回加锁成功
    if (jedis.setnx(lockKey, expiresStr) == 1) {
        return true;
    }

    // 如果锁存在,获取锁的过期时间
    String currentValueStr = jedis.get(lockKey);
    if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis()) {
        // 锁已过期,获取上一个锁的过期时间,并设置现在锁的过期时间
        String oldValueStr = jedis.getSet(lockKey, expiresStr);
        if (oldValueStr != null && oldValueStr.equals(currentValueStr)) {
            // 考虑多线程并发的情况,只有一个线程的设置值和当前值相同,它才有权利加锁
            return true;
        }
    }
    // 其他情况,一律返回加锁失败
    return false;
}

表面上来看,这段代码也是实现分布式锁的,而且代码逻辑和上面的差不多,但是有下面几个问题:

  1. 由于是客户端自己生成过期时间,所以需要强制要求分布式下每个客户端的时间必须同步。
  2. 当锁过期的时候,如果多个客户端同时执行jedis.getSet()方法,那么虽然最终只有一个客户端可以加锁,但是这个客户端的锁的过期时间可能被其他客户端覆盖。
  3. 锁不具备拥有者标识,即任何客户端都可以解锁。

网上的这类代码可能是基于早期jedis的版本,当时有很大的局限性。Redis 2.6.12以上版本为set指令增加了可选参数,像前面说的jedis.set(String key, String value, String nxxx, String expx, int time)的api,可以把 SETNXEXPIRE 打包在一起执行,并且把过期键的解锁交给redis服务器去管理。因此实际开发过程中,大家不要再用这种比较原始的方式加锁了。

3.2. 解锁

正确的加锁
public class RedisTool {

    private static final Long RELEASE_SUCCESS = 1L;

    /**
     * 释放分布式锁
     * @param jedis Redis客户端
     * @param lockKey 锁
     * @param requestId 请求标识
     * @return 是否释放成功
     */
    public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) {

        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));

        if (RELEASE_SUCCESS.equals(result)) {
            return true;
        }
        return false;
    }
}

首先获取锁对应的value值,检查是否与requestId相等,如果相等则删除锁(解锁)。那么为什么要使用Lua语言来实现呢?因为要确保上述操作是原子性的。在之前《Redis的线程模型和事务》文章中,我们通过事务的方式保证一系列操作指令的原子性,使用Lua脚本也同样可以实现类似的效果。

为什么要保证原子性呢?假如A请求在获取锁对应的value值验证requestId相等后,下达删除指令。但是由于网络等原因,删除的指令阻塞住了。而此时锁因为超时自动解锁了,并且B请求获取到了锁,重新加锁。这时候A请求到删除指令执行了,结果把B请求好不容易获取到的锁给删了。

3.3. lua

Redis命令的计算能力并不算很强大,使用Lua语言则可以在很大程度上弥补Redis的这个不足。在Redis中,执行Lua语言是原子性,也就是说Redis执行Lua的时候是不会被中断的,具备原子性,这个特性有助于Redis对并发数据一致性的支持。

Redis支持两种方法运行脚本,一种是直接输入一些Lua语言的程序代码,另一种是将Lua语言编写成文件。在实际应用中,一些简单的脚本可以采取第一种方式,对于有一定逻辑的一般采用第二种。而对于采用简单脚本的,Redis支持缓存脚本,只是它会使用SHA-1算法对脚本进行签名,然后把SHA-1标识返回,只要通过这个标识运行就可以了。

redis中执行lua

这里就简单介绍,直接输入一些Lua语言的程序代码的方式,可在redis-cli中执行下列:

eval lua-script key-num [key1 key2 key3 ....] [value1 value2 value3 ....]

--示例1 
eval "return 'Hello World'" 0
--示例2
eval "redis.call('set',KEYS[1],ARGV[1])" 1 lua-key lua-value
  • eval 代表执行Lua语言的命令。
  • lua-script 代表Lua语言脚本。
  • key-num 表示参数中有多少个key,需要注意的是Redis中key是从1开始的,如果没有key的参数,那么写0。
  • [key1 key2 key3…] 是key作为参数传递给Lua语言,也可以不填,但是需要和key-num的个数对应起来。
  • [value1 value2 value3 …] 这些参数传递给Lua语言,他们是可填可不填的。
lua中调用redis

在Lua语言中采用redis.call 执行操作:

redis.call(command,key[param1, param2…])

--示例1
eval "return redis.call('set','foo','bar')" 0
--示例2
eval "return redis.call('set',KEYS[1],'bar')" 1 foo
  • command 是命令,包括set、get、del等。
  • key 是被操作的键。
  • param1,param2… 代表给key的参数。

例如,实现一个getset的lua脚本
getset.lua

local key = KEYS[1]
local newValue = ARGV[1]
local oldValue = redis.call('get', key)
redis.call('set', key, newValue)
return oldValue

3.4. 局限性和改进

前面我们说过,在Redis集群中,分布式锁的实现存在一些局限性,当主从替换时难以保证一致性。

现象

在redis sentinel集群中,我们具有多台redis,他们之间有着主从的关系,例如一主二从。我们的set命令对应的数据写到主库,然后同步到从库。当我们申请一个锁的时候,对应就是一条命令 setnx mykey myvalue ,在redis sentinel集群中,这条命令先是落到了主库。假设这时主库down了,而这条数据还没来得及同步到从库,sentinel将从库中的一台选举为主库了。这时,我们的新主库中并没有mykey这条数据,若此时另外一个client执行 setnx mykey hisvalue , 也会成功,即也能得到锁。这就意味着,此时有两个client获得了锁。这不是我们希望看到的,虽然这个情况发生的记录很小,只会在主从failover的时候才会发生,大多数情况下、大多数系统都可以容忍,但不是所有的系统都能容忍这种瑕疵。

解决

为了解决故障转移情况下的缺陷,Antirez 发明了 Redlock 算法。使用redlock算法,需要多个redis实例,加锁的时候,它会向多半节点发送 setex mykey myvalue 命令,只要过半节点成功了,那么就算加锁成功了。这和zookeeper的实现方案非常类似,zookeeper集群的leader广播命令时,要求其中必须有过半的follower向leader反馈ACK才生效

在实际工作中使用的时候,我们可以选择已有的开源实现,python有redlock-py,java 中有 Redisson redlock

redlock确实解决了上面所说的“不靠谱的情况”。但是,它解决问题的同时,也带来了代价。你需要多个redis实例,你需要引入新的库 代码也得调整,性能上也会有损。所以,果然是不存在“完美的解决方案”,我们更需要的是能够根据实际的情况和条件把问题解决了就好。

4.“超卖”示例代码

我们模拟一个商品抢购的场景:某个商品有100件库存,200个人同时并行的去抢购,分别对比一下有锁和无锁的抢购情况。

4.1. 代码

下文是本次demo的代码,orm使用jpa,因此dao层和pojo的代码就没在文中写了。controller层只有一个接口,通过传参来选择是否使用锁。

表结构

商品表 product

CREATE TABLE `product` (
  `id` int NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  `amount` int DEFAULT NULL,
  PRIMARY KEY (`id`)
)

购买记录表 purchase_history

CREATE TABLE `purchase_history` (
  `id` int NOT NULL AUTO_INCREMENT,
  `product_name` varchar(255) DEFAULT NULL,
  `purchaser` varchar(255) DEFAULT NULL,
  `purchase_time` datetime DEFAULT NULL,
  `amount` int DEFAULT NULL,
  PRIMARY KEY (`id`)
) 
pom.xml
<dependencies>
    <!--web-->
 <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!--redis-->
 <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <!--lombok-->
 <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
    <!--jpa-->
 <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    <!--mysql-->
 <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
    </dependency>
    <!-- test-->
 <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
    </dependency>
</dependencies>
application.yml
server:
  port: 8001
spring:
  redis:
    host: localhost
    port: 6379
 datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/demo
    username: root
    password: password
ProductController.java
@RestController
@RequestMapping("/product")
public class ProductController {
    public final static String PRODUCT_APPLE="apple";
    private final BuyService buyService;
    
    public ProductController(BuyService buyService){
        this.buyService=buyService;
    }
    /**
 * 购买商品
 * @param lock 是否有锁 Y/N
 */ @GetMapping("/buy")
    public void buy(@RequestParam(value = "lock",required = false) String lock)throws Exception{
       if("Y".equals(lock)){
           buyService.buyProductWithLock(PRODUCT_APPLE);
       }else {
           buyService.buyProduct(PRODUCT_APPLE);
       }
    }
}
BuyService.java
@Service
public class BuyService {
    private final ProductDao productDao;
    private final PurchaseHistoryDao purchaseHistoryDao;
    private final LockService lockService;
    public BuyService(ProductDao productDao, PurchaseHistoryDao purchaseHistoryDao, LockService lockService) {
        this.productDao = productDao;
        this.purchaseHistoryDao = purchaseHistoryDao;
        this.lockService = lockService;
    }
    /**
 * 购买:无锁
 * @param productName
 */
 public void buyProduct(String productName) {
        Product product = productDao.findOneByName(productName);
        if (product.getAmount() > 0) {
            //库存减1
            product.setAmount(product.getAmount() - 1);
            productDao.save(product);
            //记录日志
            PurchaseHistory purchaseHistory = new PurchaseHistory();
            purchaseHistory.setProductName(productName);
            purchaseHistory.setAmount(1);
            purchaseHistoryDao.save(purchaseHistory);
        }
    }
    /**
 * 购买:有锁
 * @param productName
 */
 public void buyProductWithLock(String productName) throws Exception{
        String uuid = UUID.randomUUID().toString();
        //加锁
        while (true) {
            if (lockService.lock(productName, uuid)) {
                break;
            }
            Thread.sleep(100);
        }
        Product product = productDao.findOneByName(productName);
        if (product.getAmount() > 0) {
            //库存减1
            product.setAmount(product.getAmount() - 1);
            productDao.save(product);
            //记录日志
            PurchaseHistory purchaseHistory = new PurchaseHistory();
            purchaseHistory.setProductName(productName);
            purchaseHistory.setAmount(1);
            purchaseHistoryDao.save(purchaseHistory);
        }
        lockService.unlock(productName, uuid);
    }
}
LockService.java
@Service
public class LockService {
    private final StringRedisTemplate stringRedisTemplate;
    public LockService(StringRedisTemplate stringRedisTemplate) {
        this.stringRedisTemplate = stringRedisTemplate;
    }
    /**
 * 加锁
 * @param lockKey
 * @param requestId
 * @return
 */
 public boolean lock(String lockKey, String requestId) {
        return stringRedisTemplate
 .opsForValue()
                .setIfAbsent(lockKey, requestId, Duration.ofSeconds(3));
    }
    /**
 * 解锁
 * @param lockKey
 * @param requestId
 */
 public void unlock(String lockKey, String requestId) {
        DefaultRedisScript<Long> script = new DefaultRedisScript<>();
        script.setResultType(Long.class);
        script.setScriptText("if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end");
        stringRedisTemplate.execute(script, Collections.singletonList(lockKey), requestId);
    }
}

4.2.测试和分析

测试用例中,商品的初始库存是100份,模拟200个用户购买,按道理来说最终只能购买100份。

我们想要模拟测试“有锁”和“无锁”下的区别,就必须得创建“高并行”的条件,这里要记住是“高并行”,不是“高并发”。因为这里的购买接口,一次请求响应还是很快的,要想模拟出多个用户同时调用接口的情况,本地应该用多线程来模拟。

关于模拟高并行的环境,我做过很多尝试。第一次是写Junit测试用例,起200个线程来调用接口,结果发现线程一多服务就挂了,虽然没弄清楚缘由,但Junit应该不是这么用的。第二次是使用postman来做并发的接口测试,但发现postman是假的并发测试,还是由单线程轮流调用200遍接口,并没有实现多线程。最终还是安装了JMeter,达到了我的期望。

当我们分别起200个线程调用无锁和有锁接口时,测试结果如下:

对比无锁有锁
剩余库存680
购买记录200100

可以看到,无锁的情况下是会有“超卖”问题的,我们再来看看无锁的购买代码。

/**
 * 购买:无锁
 * @param productName
 */
 public synchronized void buyProduct(String productName) {
        Product product = productDao.findOneByName(productName);
        if (product.getAmount() > 0) {
            //库存减1
 product.setAmount(product.getAmount() - 1);
            productDao.save(product);
            //记录日志
 PurchaseHistory purchaseHistory = new PurchaseHistory();
            purchaseHistory.setProductName(productName);
            purchaseHistory.setAmount(1);
            purchaseHistoryDao.save(purchaseHistory);
        }
    }

当多个请求同时调用productDao.findOneByName(productName);,查出来的结果一样,都认为还有库存,都去购买,超卖的问题就出现了。解决这个问题,如果是单进程,通过synchronized之类的锁就解决了。如果是分布式多节点,可以考虑本文的方式,使用redis的分布式锁实现。

查看原文

赞 0 收藏 0 评论 0

KerryWu 发布了文章 · 11月3日

zookeeper的开发应用

1. zookeeper

1.1. 基础

ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,它包含一个简单的原语集,分布式应用程序可以基于它实现同步服务,配置维护和命名服务等。很多中间件,比如kafka、hadoop、hbase,都用到了 Zookeeper来构建集群。Zookeeper是hadoop的一个子项目,其发展历程无需赘述。在分布式应用中,由于工程师不能很好地使用锁机制,以及基于消息的协调机制不适合在某些应用中使用,因此需要有一种可靠的、可扩展的、分布式的、可配置的协调机制来统一系统的状态。Zookeeper的目的就在于此。

数据结构

理解ZooKeeper的一种方法是将其看做一个具有高可用性的文件系统。但这个文件系统中没有文件和目录,而是统一使用节点(node)的概念,称为znode。znode既可以保存数据(如同文件),也可以保存其他znode(如同目录)。所有的znode构成一个层次化的数据结构,。

  • Persistent Nodes: 永久有效地节点,除非client显式的删除,否则一直存在
  • Ephemeral Nodes: 临时节点,仅在创建该节点client保持连接期间有效,一旦连接丢失,zookeeper会自动删除该节点
  • Sequence Nodes: 顺序节点,client申请创建该节点时,zk会自动在节点路径末尾添加递增序号,这种类型是实现分布式锁,分布式queue等特殊功能的关键
集群

生产环境的zookeeper服务,一般都是以集群的方式搭建环境。在zookeeper集群中,不同节点一般有下面几种角色:

  1. Leader:事务请求的唯一调度者和处理者。保证集群事务处理的顺序性。集群内部各服务器的调度者。
  2. Follower:处理客户端非事务请求,转发事务请求给Leader。参与事务请求Proposal的投票。参与Leader选举投票。
  3. Observer:处理非事务请求,将事务请求交给Leader处理。

这里面提到了“事务请求”和“非事务请求”,这里说明一下。 事务请求可以理解成数据库中包含commit操作的请求,例如:新增、修改和删除。而非事务请求则对应那些查询的请求。

可见在zookeeper集群中,真正决策的只有一个Leader节点,所有的事务请求到达其他节点后,都还是会被转发到Leader节点来处理。这种模式,保障了zookeeper在命令决策端的原子性。

Leader选举算法采用了Paxos协议,当多数Server写成功,则任务数据写成功如果有3个Server,则两个写成功即可;如果有4或5个Server,则三个写成功即可。Server数目一般为奇数(3、5、7)如果有3个Server,则最多允许1个Server挂掉;如果有4个Server,则同样最多允许1个Server挂掉由此,我们看出3台服务器和4台服务器的的容灾能力是一样的,所以为了节省服务器资源,一般我们采用奇数个数,作为服务器部署个数。

原子广播

zookeeper的核心是原子广播,这个机制保证了各个server之间的同步。实现这个机制的协议叫做Zab协议。Zab协议有两种模式,它们分别是恢复模式广播模式

当服务启动或者在领导者崩溃后,Zab就进入了恢复模式,当领导者被选举出来,且大多数server的完成了和leader的状态同步以后,恢复模式就结束了。状态同步保证了leader和server具有相同的系统状态。一旦leader已经和多数的follower进行了状态同步后,他就可以开始广播消息了,即进入广播状态。

数据一致性与paxos算法

在一个分布式数据库系统中,如果各节点的初始状态一致,每个节点都执行相同的操作序列,那么他们最后能得到一个一致的状态。

Paxos算法通过投票来对写操作进行全局编号,同一时刻,只有一个写操作被批准,同时并发的写操作要去争取选票,只有获得过半数选票的写操作才会被批准(所以永远只会有一个写操作得到批准),其他的写操作竞争失败只好再发起一轮投票,就这样,在日复一日年复一年的投票中,所有写操作都被严格编号排序。

编号严格递增,当一个节点接受了一个编号为100的写操作,之后又接受到编号为99的写操作(因为网络延迟等很多不可预见原因),它马上能意识到自己数据不一致了,自动停止对外服务并重启同步过程。任何一个节点挂掉都不会影响整个集群的数据一致性(总2n+1台,除非挂掉大于n台)。

新手不要混淆,在这里的投票选举是针对多个客户端有并发写操作时,争夺该唯一写操作权的选举。和前面说的zookeeper集群中,投票选举master是不同的概念。虽然它们的选举协议,都是遵循paxos算法。

脑裂和过半选举

脑裂:对于一个集群,想要提高这个集群的可用性,通常会采用多机房部署,比如现在有一个由6台节点所组成的一个集群,部署在了两个机房。正常情况下,此集群只会有一个Leader,那么如果机房之间的网络断了之后,每个机房都选举自己的Leader,这就相当于原本一个集群,被分成了两个集群,出现了两个“大脑”,这就是脑裂。如果过了一会,断了的网络突然联通了,那么此时就会出现问题了,两个集群刚刚都对外提供服务了,数据该怎么合并,数据冲突怎么解决等等问题。
过半选举:在领导者选举的过程中,如果某台zkServer获得了超过半数的选票,则此zkServer就可以成为Leader了。

因为zookeeper的过半选举,因此zookeeper不存在“脑裂”的情况。例如还是6个节点分布在两个机房,只有当某个节点获得4个节点以上的选票,才能升级为Leader,因此不会出现两个Leader的情况。

1.2. 安装使用

docker 安装

这里直接使用zookeeper官方镜像来安装。

docker run -d \
 --name zookeeper \
 --restart=on-failure:3 \
 -p 2181:2181 \
 -v /Volumes/zookeeper/data/:/data/ \
 zookeeper
启动

执行 docker exec -it 编号 /bin/bash ,进入容器内部。

然后在 /bin 目录,执行 ./zkCli.sh ,运行启动脚本。

zooInspector客户端

zooInspector是zookeeper图形化的客户端工具,可用来查看内部数据情况。

可下载 ZooInspector.zip,解压后在build目录下获取 zookeeper-dev-ZooInspector.jar。通过 java -jar zookeeper-dev-ZooInspector.jar,即可启动 ZooInspector 图形化客户端。

2. CAP对比

2.1. CAP理论

CAP理论告诉我们,一个分布式系统不可能同时满足以下三种:

  • 一致性(C:Consistency)
  • 可用性(A:Available)
  • 分区容错性(P:Partition Tolerance)

这三个基本需求,最多只能同时满足其中的两项,因为P是必须的,因此往往选择就在CP或者AP中。

一致性(C:Consistency)

在分布式环境中,一致性是指数据在多个副本之间是否能够保持数据一致的特性。在一致性的需求下,当一个系统在数据一致的状态下执行更新操作后,应该保证系统的数据仍然处于一致的状态。例如一个将数据副本分布在不同分布式节点上的系统来说,如果对第一个节点的数据进行了更新操作并且更新成功后,其他节点上的数据也应该得到更新,并且所有用户都可以读取到其最新的值,那么这样的系统就被认为具有强一致性(或严格的一致性,最终一致性)。

可用性(A:Available)

可用性是指系统提供的服务必须一直处于可用的状态,对于用户的每一个操作请求总是能够在有限的时间内返回结果。“有效的时间内”是指,对于用户的一个操作请求,系统必须能够在指定的时间(即响应时间)内返回对应的处理结果,如果超过了这个时间范围,那么系统就被认为是不可用的。

分区容错性(P:Partition Tolerance)

分区容错性约束了一个分布式系统需要具有如下特性:分布式系统在遇到任何网络分区故障的时候,仍然需要能够保证对外提供满足一致性和可用性的服务,除非是整个网络环境都发生了故障。

网络分区是指在分布式系统中,不同的节点分布在不同的子网络(机房或异地网络等)中,由于一些特殊的原因导致这些子网络之间出现网络不连通的状况,但各个子网络的内部网络是正常的,从而导致整个系统的网络环境被切分成了若干个孤立的区域。需要注意的是,组成一个分布式系统的每个节点的加入与退出都可以看作是一个特殊的网络分区。

2.2. zookeeper和eureka对比

eureka保证ap

eureka优先保证可用性。在Eureka平台中,如果某台服务器宕机,Eureka不会有类似于ZooKeeper的选举leader的过程;客户端请求会自动切换 到新的Eureka节点;当宕机的服务器重新恢复后,Eureka会再次将其纳入到服务器集群管理之中;而对于它来说,所有要做的无非是同步一些新的服务 注册信息而已。

Eureka各个节点都是平等的,几个节点挂掉不会影响正常节点的工作,剩余的节点依然可以提供注册和查询服务。而Eureka的客户端在向某个Eureka注册或时如果发现连接失败,则会自动切换至其它节点,只要有一台Eureka还在,就能保证注册服务可用(保证可用性),只不过查到的信息可能不是最新的(不保证强一致性)。

zookeeper保证cp

作为一个分布式协同服务,zookeeper是优先保证一致性的。

进行leader选举时集群都是不可用。在使用ZooKeeper获取服务列表时,当master节点因为网络故障与其他节点失去联系时,剩余节点会重新进行leader选举。问题在于,选举leader的时间太长,30 ~ 120s, 且选举期间整个zk集群都是不可用的,这就导致在选举期间注册服务瘫痪,虽然服务能够最终恢复,但是漫长的选举时间导致的注册长期不可用是不能容忍的。所以说,ZooKeeper不能保证服务可用性。

3. 应用场景

3.1. 数据发布与订阅(配置中心)

发布与订阅模型,即所谓的配置中心,顾名思义就是发布者将数据发布到ZK节点上,供订阅者动态获取数据,实现配置信息的集中式管理和动态更新。例如全局的配置信息,服务式服务框架的服务地址列表等就非常适合使用。

  • 应用中用到的一些配置信息放到ZK上进行集中管理。这类场景通常是这样:应用在启动的时候会主动来获取一次配置,同时,在节点上注册一个Watcher,这样一来,以后每次配置有更新的时候,都会实时通知到订阅的客户端,从来达到获取最新配置信息的目的。
  • 分布式搜索服务中,索引的元信息和服务器集群机器的节点状态存放在ZK的一些指定节点,供各个客户端订阅使用。
  • 分布式日志收集系统。这个系统的核心工作是收集分布在不同机器的日志。收集器通常是按照应用来分配收集任务单元,因此需要在ZK上创建一个以应用名作为path的节点P,并将这个应用的所有机器ip,以子节点的形式注册到节点P上,这样一来就能够实现机器变动的时候,能够实时通知到收集器调整任务分配。
  • 系统中有些信息需要动态获取,并且还会存在人工手动去修改这个信息的发问。通常是暴露出接口,例如JMX接口,来获取一些运行时的信息。引入ZK之后,就不用自己实现一套方案了,只要将这些信息存放到指定的ZK节点上即可。

注意:在上面提到的应用场景中,有个默认前提是:数据量很小,但是数据更新可能会比较快的场景。

3.2. 负载均衡

这里说的负载均衡是指软负载均衡。在分布式环境中,为了保证高可用性,通常同一个应用或同一个服务的提供方都会部署多份,达到对等服务。而消费者就须要在这些对等的服务器中选择一个来执行相关的业务逻辑,其中比较典型的是消息中间件中的生产者,消费者负载均衡。
消息中间件中发布者和订阅者的负载均衡,linkedin开源的KafkaMQ和阿里开源的metaq都是通过zookeeper来做到生产者、消费者的负载均衡。这里以metaq为例如讲下:

生产者负载均衡

metaq发送消息的时候,生产者在发送消息的时候必须选择一台broker上的一个分区来发送消息,因此metaq在运行过程中,会把所有broker和对应的分区信息全部注册到ZK指定节点上,默认的策略是一个依次轮询的过程,生产者在通过ZK获取分区列表之后,会按照brokerId和partition的顺序排列组织成一个有序的分区列表,发送的时候按照从头到尾循环往复的方式选择一个分区来发送消息。

消费负载均衡

在消费过程中,一个消费者会消费一个或多个分区中的消息,但是一个分区只会由一个消费者来消费。MetaQ的消费策略是:

  • 每个分区针对同一个group只挂载一个消费者。
  • 如果同一个group的消费者数目大于分区数目,则多出来的消费者将不参与消费。
  • 如果同一个group的消费者数目小于分区数目,则有部分消费者需要额外承担消费任务。

在某个消费者故障或者重启等情况下,其他消费者会感知到这一变化(通过 zookeeper watch消费者列表),然后重新进行负载均衡,保证所有的分区都有消费者进行消费。

3.3. 命名服务

zookeeper的命名服务有两个应用方向,一个是提供类似JNDI的功能,利用zookeepeer的树型分层结构,可以把系统中各种服务的名称、地址以及目录信息存放在zookeeper,需要的时候去zookeeper中读取。

另一个,是利用zookeeper顺序节点的特性,制作分布式的ID生成器,写过数据库应用的朋友都知道,我们在往数据库表中插入记录时,通常需要为该记录创建唯一的ID,在单机环境中我们可以利用数据库的主键自增功能。但在分布式环境则无法使用,有一种方式可以使用UUID,但是它的缺陷是没有规律,很难理解。利用zookeeper顺序节点的特性,我们可以生成有顺序的,容易理解的,同时支持分布式环境的序列号。

3.4. 分布式通知/协调

ZooKeeper中特有watcher注册与异步通知机制,能够很好的实现分布式环境下不同系统之间的通知与协调,实现对数据变更的实时处理。使用方法通常是不同系统都对ZK上同一个znode进行注册,监听znode的变化(包括znode本身内容及子节点的),其中一个系统update了znode,那么另一个系统能够收到通知,并作出相应处理

  • 另一种心跳检测机制:检测系统和被检测系统之间并不直接关联起来,而是通过zk上某个节点关联,大大减少系统耦合。
  • 另一种系统调度模式:某系统有控制台和推送系统两部分组成,控制台的职责是控制推送系统进行相应的推送工作。管理人员在控制台作的一些操作,实际上是修改了ZK上某些节点的状态,而ZK就把这些变化通知给他们注册Watcher的客户端,即推送系统,于是,作出相应的推送任务。
  • 另一种工作汇报模式:一些类似于任务分发系统,子任务启动后,到zk来注册一个临时节点,并且定时将自己的进度进行汇报(将进度写回这个临时节点),这样任务管理者就能够实时知道任务进度。

总之,使用zookeeper来进行分布式通知和协调能够大大降低系统之间的耦合。

3.5. 集群管理与Master选举

集群机器监控

这通常用于那种对集群中机器状态,机器在线率有较高要求的场景,能够快速对集群中机器变化作出响应。这样的场景中,往往有一个监控系统,实时检测集群机器是否存活。过去的做法通常是:监控系统通过某种手段(比如ping)定时检测每个机器,或者每个机器自己定时向监控系统汇报“我还活着”。 这种做法可行,但是存在两个比较明显的问题:

  1. 集群中机器有变动的时候,牵连修改的东西比较多。
  2. 有一定的延时。

利用ZooKeeper有两个特性,就可以实时另一种集群机器存活性监控系统:

  1. 客户端在节点 x 上注册一个Watcher,那么如果 x?的子节点变化了,会通知该客户端。
  2. 创建EPHEMERAL类型的节点,一旦客户端和服务器的会话结束或过期,那么该节点就会消失。

例如,监控系统在 /clusterServers 节点上注册一个Watcher,以后每动态加机器,那么就往 /clusterServers 下创建一个 EPHEMERAL类型的节点:/clusterServers/{hostname}. 这样,监控系统就能够实时知道机器的增减情况,至于后续处理就是监控系统的业务了。

Master选举

Master选举则是zookeeper中最为经典的应用场景了。
在分布式环境中,相同的业务应用分布在不同的机器上,有些业务逻辑(例如一些耗时的计算,网络I/O处理),往往只需要让整个集群中的某一台机器进行执行,其余机器可以共享这个结果,这样可以大大减少重复劳动,提高性能,于是这个master选举便是这种场景下的碰到的主要问题。

利用ZooKeeper的强一致性,能够保证在分布式高并发情况下节点创建的全局唯一性,即:同时有多个客户端请求创建 /currentMaster 节点,最终一定只有一个客户端请求能够创建成功。利用这个特性,就能很轻易的在分布式环境中进行集群选取了。

另外,这种场景演化一下,就是动态Master选举。这就要用到?EPHEMERAL_SEQUENTIAL类型节点的特性了。

上文中提到,所有客户端创建请求,最终只有一个能够创建成功。在这里稍微变化下,就是允许所有请求都能够创建成功,但是得有个创建顺序,于是所有的请求最终在ZK上创建结果的一种可能情况是这样: /currentMaster/{sessionId}-1 ,?/currentMaster/{sessionId}-2 ,?/currentMaster/{sessionId}-3 ….. 每次选取序列号最小的那个机器作为Master,如果这个机器挂了,由于他创建的节点会马上小时,那么之后最小的那个机器就是Master了。

  • 在搜索系统中,如果集群中每个机器都生成一份全量索引,不仅耗时,而且不能保证彼此之间索引数据一致。因此让集群中的Master来进行全量索引的生成,然后同步到集群中其它机器。另外,Master选举的容灾措施是,可以随时进行手动指定master,就是说应用在zk在无法获取master信息时,可以通过比如http方式,向一个地方获取master。
  • 在Hbase中,也是使用ZooKeeper来实现动态HMaster的选举。在Hbase实现中,会在ZK上存储一些ROOT表的地址和HMaster的地址,HRegionServer也会把自己以临时节点(Ephemeral)的方式注册到Zookeeper中,使得HMaster可以随时感知到各个HRegionServer的存活状态,同时,一旦HMaster出现问题,会重新选举出一个HMaster来运行,从而避免了HMaster的单点问题

3.6. 分布式锁

保持独占

所谓保持独占,就是所有试图来获取这个锁的客户端,最终只有一个可以成功获得这把锁。通常的做法是把zk上的一个znode看作是一把锁,通过create znode的方式来实现。所有客户端都去创建 /distribute_lock 节点,最终成功创建的那个客户端也即拥有了这把锁。

控制时序

首先,Zookeeper的每一个节点,都是一个天然的顺序发号器。在每一个节点下面创建子节点时,只要选择的创建类型是有序(EPHEMERAL_SEQUENTIAL 临时有序或者PERSISTENT_SEQUENTIAL 永久有序)类型,那么,新的子节点后面,会加上一个次序编号。这个次序编号,是上一个生成的次序编号加一

比如,创建一个用于发号的节点“/test/lock”,然后以他为父亲节点,可以在这个父节点下面创建相同前缀的子节点,假定相同的前缀为“/test/lock/seq-”,在创建子节点时,同时指明是有序类型。如果是第一个创建的子节点,那么生成的子节点为/test/lock/seq-0000000000,下一个节点则为/test/lock/seq-0000000001,依次类推,等等。

其次,Zookeeper节点的递增性,可以规定节点编号最小的那个获得锁。

一个zookeeper分布式锁,首先需要创建一个父节点,尽量是持久节点(PERSISTENT类型),然后每个要获得锁的线程都会在这个节点下创建个临时顺序节点,由于序号的递增性,可以规定排号最小的那个获得锁。所以,每个线程在尝试占用锁之前,首先判断自己是排号是不是当前最小,如果是,则获取锁。

第三,Zookeeper的节点监听机制,可以保障占有锁的方式有序而且高效。

每个线程抢占锁之前,先抢号创建自己的ZNode。同样,释放锁的时候,就需要删除抢号的Znode。抢号成功后,如果不是排号最小的节点,就处于等待通知的状态。等谁的通知呢?不需要其他人,只需要等前一个Znode 的通知就可以了。当前一个Znode 删除的时候,就是轮到了自己占有锁的时候。第一个通知第二个、第二个通知第三个,击鼓传花似的依次向后。

Zookeeper这种首尾相接,后面监听前面的方式,可以避免羊群效应。所谓羊群效应就是每个节点挂掉,所有节点都去监听,然后做出反映,这样会给服务器带来巨大压力,所以有了临时顺序节点,当一个节点挂掉,只有它后面的那一个节点才做出反映。

3.7. 分布式队列

队列方面,简单地讲有两种。

一种是常规的先进先出队列,另一种是要等到队列成员聚齐之后的才统一按序执行。对于第一种先进先出队列,和分布式锁服务中的控制时序场景基本原理一致,这里不再赘述。

第二种队列其实是在FIFO队列的基础上作了一个增强。通常可以在 /queue 这个znode下预先建立一个/queue/num 节点,并且赋值为n(或者直接给/queue赋值n),表示队列大小,之后每次有队列成员加入后,就判断下是否已经到达队列大小,决定是否可以开始执行了。这种用法的典型场景是,分布式环境中,一个大任务Task A,需要在很多子任务完成(或条件就绪)情况下才能进行。这个时候,凡是其中一个子任务完成(就绪),那么就去 /taskList 下建立自己的临时时序节点(CreateMode.EPHEMERAL_SEQUENTIAL),当 /taskList 发现自己下面的子节点满足指定个数,就可以进行下一步按序进行处理了。

4. 示例代码

4.1. curator

Curator是Netflix公司开源的一套Zookeeper客户端框架。了解过Zookeeper原生API都会清楚其复杂度。Curator帮助我们在其基础上进行封装、实现一些开发细节,包括接连重连、反复注册Watcher和NodeExistsException等。目前已经作为Apache的顶级项目出现,是最流行的Zookeeper客户端之一。从编码风格上来讲,它提供了基于Fluent的编程风格支持。

除此之外,Curator还提供了Zookeeper的各种应用场景:Recipe、共享锁服务、Master选举机制和分布式计数器等。

现在先让我们看看Curator的几种锁方案:

  • InterProcessMutex:分布式可重入排它锁
  • InterProcessSemaphoreMutex:分布式排它锁
  • InterProcessReadWriteLock:分布式读写锁
  • InterProcessMultiLock:将多个锁作为单个实体管理的容器

接下来看下面一个项目的示例,项目中分别体现了 选举分布式锁 的例子。

pom.xml

        <!--curator-framework-->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>4.2.0</version>
        </dependency>
        <!--curator-recipes-->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>4.2.0</version>
        </dependency>

CuratorConfig.java

@ConfigurationProperties(prefix = "custom.zookeeper")
@Data
@Configuration
public class CuratorConfig {
    private String connectString;
    private int baseSleepTimeMs;
    private int maxRetries;
    private int sessionTimeoutMs;
    private int connectionTimeoutMs;
    
    /**
     * 注册 CuratorFramework
     * @return
     */
    @Bean
    public CuratorFramework curatorFramework() {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);
        return CuratorFrameworkFactory
                .builder()
                .connectString(connectString)
                .sessionTimeoutMs(sessionTimeoutMs)
                .connectionTimeoutMs(connectionTimeoutMs)
                .retryPolicy(retryPolicy)
                .build();
    }
}

CuratorStart.java

@Component
public class CuratorStart implements ApplicationRunner {
    private final CuratorFramework curatorFramework;

    public CuratorStart(CuratorFramework curatorFramework) {
        this.curatorFramework = curatorFramework;
    }

    @Override
    public void run(ApplicationArguments applicationArguments){
        curatorFramework.start();
    }
}

4.2. 示例(选举)

TimerService.java

@Service
@Slf4j
public class TimerService {
    private static final String ZK_NODE_TASK_TIMER="/task-timer";
    DateTimeFormatter dateTimeFormatter= DateTimeFormatter.ofPattern("HH:mm:ss");

    @Value("${server.port}")
    private String serverPort;

    private final CuratorFramework curatorFramework;


    public TimerService(CuratorFramework curatorFramework){
        this.curatorFramework=curatorFramework;
    }

    @Scheduled(cron = "*/5 * * * * *")
    public void task(){
        LeaderLatch leaderLatch=new LeaderLatch(curatorFramework,ZK_NODE_TASK_TIMER);
        try {
            leaderLatch.start();
            Thread.sleep(2000);
            if (leaderLatch.hasLeadership()){
                log.warn("* <主服务> 是 "+serverPort+",当前时间为"+ LocalDateTime.now().format(dateTimeFormatter));
            }else {
                log.warn("副服务是 "+serverPort+",当前时间为"+ LocalDateTime.now().format(dateTimeFormatter));
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            try {
                leaderLatch.close();
            }catch (Exception e){
                e.printStackTrace();
            }

        }
    }
}

这里通过 @Scheduled 实现了定时任务,每5秒钟执行一次。这是我们经常头疼的问题,spring的定时是基于jvm进程内的现场池执行的,如果扩展节点,多个spring进程同时执行的话,就会重复执行定时任务。

那么这里我们分别指定port为 8001、8002、8003,分别运行3个程序。当它们每隔5秒同时触发方法执行时,就会在zookeeper中模拟一个选举,最终只有一个程序作为<主服务>被执行。

4.3. 示例(分布式锁)

EmployeeController.java

@Slf4j
@RestController
public class EmployeeController {
    DateTimeFormatter dateTimeFormatter= DateTimeFormatter.ofPattern("HH:mm:ss");

    private final CuratorFramework curatorFramework;

    public EmployeeController(CuratorFramework curatorFramework) {
        this.curatorFramework = curatorFramework;
    }

    /**
     * InterProcessMutex通过在zookeeper的某路径节点下创建临时序列节点来实现分布式锁,即每个线程(跨进程的线程)获取同一把锁前,都需要在同样的路径下创建一个节点,节点名字由uuid + 递增序列组成。而通过对比自身的序列数是否在所有子节点的第一位,来判断是否成功获取到了锁。当获取锁失败时,它会添加watcher来监听前一个节点的变动情况,然后进行等待状态。直到watcher的事件生效将自己唤醒,或者超时时间异常返回。
     *
     * @param key
     * @return
     */
    @GetMapping("/demo/{key}")
    public String save(@PathVariable("key") String key) {
        // 获取锁
        InterProcessSemaphoreMutex balanceLock = new InterProcessSemaphoreMutex(curatorFramework, "/zktest" + key);
        try {
            // 执行加锁操作
            balanceLock.acquire();
            log.warn("lock《, key=" + key+",当前时间为"+ LocalDateTime.now().format(dateTimeFormatter));

            Thread.sleep(10000);

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                // 释放锁资源
                balanceLock.release();
                log.warn("unlock》, key=" + key+",当前时间为"+ LocalDateTime.now().format(dateTimeFormatter));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        return key;
    }
}

当我们模拟不同的请求竞争同一个key时,每次加锁,线程10秒钟后,再解锁,最后其他的请求才可以再加锁,重复之前的操作。在同一个程序内,我可以通过jdk的线程锁来实现类似的功能,但如果我们想要实现在不同的程序中都可以如此加锁,就只能通过分布式锁来实现。

查看原文

赞 0 收藏 0 评论 0

KerryWu 发布了文章 · 10月25日

Spring Aop 动态代理

1.代理模式

代理是设计模式的一种,代理类为委托类提供消息预处理,消息转发,事后消息处理等功能。Java中的代理分为三种角色: 代理类、委托类、接口

为了保持行为的一致性,代理类和委托类通常会实现相同的接口,所以在访问者看来两者没有丝毫的区别。通过代理类这中间一层,能有效控制对委托类对象的直接访问,也可以很好地隐藏和保护委托类对象,同时也为实施不同控制策略预留了空间,从而在设计上获得了更大的灵活性。Java 动态代理机制以巧妙的方式近乎完美地实践了代理模式的设计理念。

Java中的代理按照代理类生成时机不同又分为静态代理动态代理

  • 静态代理:静态代理的特点是, 为每一个业务增强都提供一个代理类, 由代理类来创建代理对象. 下面我们通过静态代理来实现对转账业务进行身份验证.
  • 动态代理:静态代理会为每一个业务增强都提供一个代理类, 由代理类来创建代理对象, 而动态代理并不存在代理类, 代理对象直接由代理生成工具动态生成.

1.2.静态代理

Java中的静态代理要求代理类(ProxySubject)和委托类(RealSubject)都实现同一个接口(Subject)。如下示例:

接口 Subject.java
public interface Subject {
    
    public void sayHello();
    
}
委托类 RealSubject.java
public class RealSubject implements Subject {

    @Override
    public void sayHello() {
        System.out.println("hello!");
    }
    
}
代理类 ProxySubject.java
class ProxySubject implements Subject {
    private Subject subject;

    public ProxySubject(Subject subject) {
        this.subject = subject;
    }
    
    @Override
    public void sayHello() {
        System.out.println("Before say hello...");
        subject.sayHello();
        System.out.println("After say hello...");
    }
    
}
测试方法 main
    public static void main(String[] args) {
        Subject subject = new RealSubject();
        ProxySubject proxySubject = new ProxySubject(subject);
        proxySubject.sayHello();
    }

按照示例中,代理模式实现的功能,可以理解成一个简易版的Spring AOP 实现,那我们就拿代理模式和Spring AOP做对比。

代理模式的组成包括:接口、委托类和代理类。我们在Spring中使用AOP,通常针对的“切面”,也就是委托类会有很多。接口和委托类是业务代码,必不可少,但代理类这是为了代理模式而创建的。如果每个委托类对应代理类的逻辑都不一样还好,可 如果多个委托类复用同一个代理类方法,就显得很冗余了

1.2. jdk动态代理

为了解决这类问题,jdk有提供动态代理的实现,即提供可复用的代理类。动态代理就是要生成一个包装类对象,由于代理的对象是动态的,所以叫动态代理。

JDK动态代理是使用 java.lang.reflect 包下的代理类来实现. JDK动态代理动态代理必须要有接口.

由于我们需要增强,这个增强是需要留给开发人员开发代码的,因此代理类不能直接包含被代理对象,而是一个InvocationHandler,该InvocationHandler包含被代理对象,并负责分发请求给被代理对象,分发前后均可以做增强。从原理可以看出,JDK动态代理是“对象”的代理。

上面的代码实现,可以修改成下面这种方式,同样能实现功能。

接口 Subject.java
public interface Subject {
    
    public void sayHello();
    
}
委托类 RealSubject.java
public class RealSubject implements Subject {

    @Override
    public void sayHello() {
        System.out.println("hello!");
    }
    
}
代理类 InvocationHandlerImpl.java
public class InvocationHandlerImpl implements InvocationHandler {

    private Object object;

    public InvocationHandlerImpl(Object object) {
        this.object = object;
    }

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        System.out.println("Before say hello...");
        Object returnValue = method.invoke(subject, args);
        System.out.println("After say hello...");
        return returnValue;
    }
}
测试方法 main
    public static void main(String[] args) {
        Subject realSubject = new RealSubject();
        InvocationHandler handler = new InvocationHandlerImpl(realSubject);
        ClassLoader loader = realSubject.getClass().getClassLoader();
        Class[] interfaces = realSubject.getClass().getInterfaces();
        Subject subject = (Subject) Proxy.newProxyInstance(loader, interfaces, handler);
        subject.sayHello();
    }

1.3. cglib动态代理

JDK动态代理必须要有接口, 但如果要代理一个没有接口的类该怎么办呢? 这时我们可以使用CGLIB动态代理. CGLIB动态代理的原理是生成目标类的子类, 这个子类对象就是代理对象, 代理对象是被增强过的.

注意: 不管有没有接口都可以使用CGLIB动态代理, 而不是只有在无接口的情况下才能使用.

委托类 RealSubject.java
public class RealSubject {
    public void sayHello() {
        System.out.println("hello!");
    }
}
代理类 MethodInterceptorImpl.java
public class MethodInterceptorImpl implements MethodInterceptor {

    @Override
    public Object intercept(Object obj, Method method, Object[] args, MethodProxy methodProxy) throws Throwable {
        System.out.println("Before say hello...");
        Object returnValue= methodProxy.invokeSuper(obj, args);
        System.out.println("After say hello...");
        return returnValue;
    }

}
测试方法 main
    public static void main(String[] args) {
        RealSubject target = new RealSubject();
        RealSubject proxy = (RealSubject) Enhancer.create(target.getClass(),
                new MethodInterceptorImpl());
        proxy.sayHello();
    }

2. Spring AOP实现原理

关于Spring AOP的概念我就不多说了,大家都知道是基于动态代理实现的,就是上面我们说的这些,那么具体是怎么实现的呢?

在代理模式中有三种核心的角色:委托类、代理类、接口,而gclib动态代理中“接口”是非必须的,因此我们关注Spring AOP中 委托类代理类的实现。

委托类

回顾一下Aop的实现代码:需要在实现类上加上@Aspect的注解,还需要通过@Pointcut注解来申明“切点”,即委托类和委托方法的路径。

有了这些信息就足够获取委托类了。这里充分用到Java反射,先找到包含@Aspect注解的类,然后找到该类下的@Pointcut注解,读取所定义的委托类和委托方法路径,就完全能拿到委托类对象。

代理类

因为我们使用的是动态代理,这里的代理类可以被替换成代理方法。同样,我们在@Aspect注解的类中,用@Around、@Before、@After修饰的方法,就是我们想要的代理方法。

总结

我们可以通过BeanFactoryPostProcessor的实现类,完成对所有BeanDefinition的扫描,找出我们定义的所有的切面类,然后循环里面的方法,找到切点、以及所有的通知方法,然后根据注解判断通知类型(也就是前置,后置还是环绕),最后解析切点的内容,扫描出所有的目标类。这样就获取了委托类代理方法

现在委托类代理方法 都有了,我们知道在动态代理模式中,最终的目的是将委托类的方法执行,替换成代理类的方法执行。但是在Spring中,我们是感知不到代理类的,我们在代码中还是调用原委托类的方法,那么Spring框架是如何神不知鬼不觉地将委托类替换成代理类的呢?

这就涉及到我们之前有关Ioc文章的内容了,在Bean的生命周期中,Bean在初始化前后会执行BeanPostProcessor的方法。可以把它理解成一个增强方法,可以将原始的Bean经过“增强”处理后加载到Ioc容器中。这就是一个天然的代理方法,原始的Bean就是委托类,在此处实现代理方法生成代理类,再将代理类加载进Ioc容器。

3. jdk动态代理和cglib对比

动态代理cglibjdk
是否提供子类代理
是否提供接口代理
区别必须依赖于CGLib的类库,但是它需要类来实现任何接口代理的是指定的类生成一个子类,覆盖其中的方法实现InvocationHandler,使用Proxy.newProxyInstance产生代理对象,被代理的对象必须要实现接口
Cglib和jdk动态代理的区别?

1、Jdk动态代理:利用拦截器(必须实现InvocationHandler)加上反射机制生成一个代理接口的匿名类,在调用具体方法前调用InvokeHandler来处理
2、 Cglib动态代理:利用ASM框架,对代理对象类生成的class文件加载进来,通过修改其字节码生成子类来处理

什么时候用cglib什么时候用jdk动态代理?

1、目标对象生成了接口 默认用JDK动态代理
2、如果目标对象使用了接口,可以强制使用cglib
3、如果目标对象没有实现接口,必须采用cglib库,Spring会自动在JDK动态代理和cglib之间转换

JDK动态代理和cglib字节码生成的区别?

1、JDK动态代理只能对实现了接口的类生成代理,而不能针对类
2、Cglib是针对类实现代理,主要是对指定的类生成一个子类,覆盖其中的方法,并覆盖其中方法的增强,但是因为采用的是继承,所以该类或方法最好不要生成final,对于final类或方法,是无法继承的

Cglib比JDK快?

1、cglib底层是ASM字节码生成框架,但是字节码技术生成代理类,在JDL1.6之前比使用java反射的效率要高
2、在jdk6之后逐步对JDK动态代理进行了优化,在调用次数比较少时效率高于cglib代理效率
3、只有在大量调用的时候cglib的效率高,但是在1.8的时候JDK的效率已高于cglib
4、Cglib不能对声明final的方法进行代理,因为cglib是动态生成代理对象,final关键字修饰的类不可变只能被引用不能被修改

Spring如何选择是用JDK还是cglib?

1、当bean实现接口时,会用JDK代理模式
2、当bean没有实现接口,用cglib实现
3、可以强制使用cglib(在spring配置中加入<aop:aspectj-autoproxy proxyt-target-class=”true”/>)

查看原文

赞 0 收藏 0 评论 0

KerryWu 发布了文章 · 10月11日

Redis的线程模型和事务

1. 前言

我原本只是想学习Redis的事务,但后来发现,Redis和传统关系型数据库的事务在ACID的表现上差异很大。而要想详细了解其中的缘由,就离不开Redis独特的单线程模型,因此本文将二者联系在一起讲解。

下面先会补充一些知识储备,包括解答几个常犯错的问题,分析Redis的线程模型,为后面的章节打好基础。随后再讲解Redis的事务实现,和关系型数据库的事务做对比,以及会附上springboot中实现事务的代码。

2. 常见问题

2.1. 高并发不等于高并行

我们最多听到的就是并发,但实际上很多时候并不严谨,有些情况应该被定义为并行

  • 并发,是指在一个时间段内有多个进程在执行。只不过在人的角度看,因为这个计算机角度的时间实在是太短暂了,人根本就感受不到是多个进程,看起来像是同时进行,这种是并发。
  • 并行,指的是在同一时刻有多个进程在同时执行。

一个是时间段内发生的,一个是某一时刻发生的,如果是在只有单核CPU的情况下,是无法实现并行的,因为同一时刻只能有一个进程被调度执行,如果此时同时要执行其他进程则必须上下文切换,这种只能称之为并发,而如果是多个CPU的情况下,就可以同时调度多个进程,这种就可以称之为并行。

2.2. 什么时候该用多线程

我们首先要明确,多线程不一定比单线程快,因为多线程还涉及到CPU上下文切换的消耗,和频繁创建、销毁线程的消耗 。那么多线程是为了优化什么而使用的呢?我所了解的有两点:

1.充分利用多核CPU的资源,实现并行

因为多核cpu每一个核心都可以独立执行一个线程,所以多核cpu可以真正实现多线程的并行。
但这点优化算不上什么,一台服务器上一般部署了很多的应用,哪有那么多空闲的CPU核心空闲着。

2.应对CPU的“阻塞”

我认为这才是主要原因。“阻塞”包括网络io、磁盘io等这类io的阻塞,还包括一些执行很慢的逻辑操作等。例如:某个接口的方法中,按照执行顺序分成A、B、C三个独立的部分。

如果每个部分执行的都很慢(如:查询数据库视图,将数据导出excel文件),都要10秒。那么方法执行完成,单线程要用30秒,多线程分别执行只需要10秒。优化了20秒,线程创建和CPU上下文切换的影响,和20秒比起来不算什么。

如果每个部分执行的都很快,都只需要10毫秒。按照上面的计算方式,理论上优化了20毫秒,可线程创建和CPU上下文切换的影响,可是要大于20毫秒的。

因此总体来说,多线程开发对于程序的优化,主要体现在应对导致CPU“阻塞”的点。

3. 线程模型

Redis服务端通过单进程单线程,处理所有客户端的请求。

Redis官方数据是说支持100000+ 的QPS(峰值时间的每秒请求),很难相信这是靠单线程来支撑的。因此我们要探究一下,Redis的线程模型为啥能支持它执行这么快?

3.1. 性能瓶颈

官方表示,Redis是基于内存操作,CPU不是Redis的性能瓶颈,Redis的性能瓶颈是机器的内存和网络带宽。

看到这句话,我有个疑惑,为啥 “Redis是基于内存操作,CPU不是Redis的性能瓶颈”

这就联系到第二章中“2.多线程不一定快”的知识点了-- 在多线程开发对于程序的优化,主要体现在应对导致CPU“阻塞”的点。普通数据库的瓶颈在于磁盘io,可Redis是基于内存操作,没有磁盘io的瓶颈,而且基于Reactor模型,也没有网络io的阻塞。没有多线程的必要,CPU也就不是Redis的性能瓶颈。

另外Redis是将所有的数据全部放在内存中的,所有说使用单线程去操作执行效率就是最高的,多线程在执行过程中需要进行 CPU 的上下文切换,这个是耗时操作。对于内存系统来说,如果没有上下文切换效率就是最高的,多次读写都是在一个 CPU 上的,在内存情况下,这个就是最佳方案。

我们可以理解成,因为Redis作为内存数据库,又有个很好的线程模型,并不存在io阻塞和CPU等性能瓶颈。再往后可以提升Redis空间的,就在于机器的内存和网络带宽了。

3.2. 线程模型

我之前的很多篇文章都提到了Reactor线程模型,像Tomcat、Netty等,都使用了Reactor线程模型来实现IO多路复用,这次再加上Redis。还记得之前有介绍Reactor模型有三种:单线程Reactor模型,多线程Reactor模型,主从Reactor模型。

通常来说,主从Reactor模型是最健壮的,Tomcat和Netty都是使用这种,但是 Redis是使用单线程Reactor模型

image

上图描述了Redis工作的线程模型,模拟了服务端处理客户端命令的过程:

  1. 文件事件处理器使用 I/O 多路复用(multiplexing)程序来同时监听多个套接字,即将套接字的fd注册到epoll上,当被监听的套接字准备好执行连接应答(accept)、读取(read)、写入(write)、关闭(close)等操作时,与操作相对应的文件事件就会产生。
  2. 尽管多个文件事件可能会并发地出现,但I/O多路复用程序总是会将所有产生事件的套接字都推到一个队列里面,然后通过这个队列,以有序(sequentially)、同步(synchronously)、每次一个套接字的方式向文件事件分派器传送套接字。
  3. 此时文件事件处理器就会调用套接字之前关联好的事件处理器来处理这些事件。文件事件处理器以单线程方式运行,这就是之前一直提到的Redis线程模型中,效率很高的那个单线程。

值得注意的是,在执行命令阶段,由于Redis是单线程来处理命令的,所有每一条到达服务端的命令不会立刻执行,所有的命令都会进入一个队列中,然后逐个被执行。并且多个客户端发送的命令的执行顺序是不确定的。但是可以确定的是,不会有两条命令被同时执行,不会产生并行问题,这也是后面我们讨论Redis事务的基础

3.3. 分析

为什么不怕Reactor单线程模型的弊端?

我们回顾之前的文章,Reactor单线程模型的最大缺点在于:Acceptor和Handlers都共用一个线程,只要某个环节发生阻塞,就会阻塞所有。整个尤其是Handlers是执行业务方法的,最容易发生阻塞,像Tomcat就默认使用200容量大线程池来执行。那Redis为什么就不怕呢?

原因就在于Redis作为内存数据库,它的Handlers是可预知的,不会出现像Tomcat那样的自定义业务方法。不过也建议不要在Reids中执行要占用大量时间的命令。

总结:Redis单线程效率高的原因
  • 纯内存访问:数据存放在内存中,内存的响应时间大约是100纳秒,这是Redis每秒万亿级别访问的重要基础。
  • 非阻塞I/O:Redis采用epoll做为I/O多路复用技术的实现,再加上Redis自身的事件处理模型将epoll中的连接,读写,关闭都转换为了时间,不在I/O上浪费过多的时间。
  • 单线程避免了线程切换和竞态产生的消耗。

4. 事务

前面说过,由于Redis单线程的特性,所有的命令都是进入一个队列中,依次执行。因此不会有两条命令被同时执行,不会产生并行问题。这点和传统关系型数据库不一样,没有并行问题,也就没有像表锁、行锁这类锁竞争的问题了。

4.1. 概念

那么Redis的事务是为了处理什么情况?

假设,客户端A提交的命令有A1、A2和A3 这三条,客户端B提交的命令有B1、B2和B3,在进入服务端队列后的顺序实际上很大部分是随机。假设是:A1、B1、B3、A3、B2、A2,可客户端A期望自己提交的是按照顺序一起执行的,它就可以使用事务实现:B2、A1、A2、A3、B1、B3,客户端B的命令执行顺序还是随机的,但是客户端A的命令执行顺序就保证了。

Redis 事务的本质是一组命令的集合。事务支持一次执行多个命令,一个事务中所有命令都会被序列化。在事务执行过程,会按照顺序串行化执行队列中的命令,其他客户端提交的命令请求不会插入到事务执行命令序列中。

总结说:redis事务就是一次性、顺序性、排他性的执行一个队列中的一系列命令。  

Redis事务相关命令
  • watch key1 key2 ... : 监视一或多个key,如果在事务执行之前,被监视的key被其他命令改动,则事务被打断 ( 类似乐观锁 )
  • multi : 标记一个事务块的开始( queued )
  • exec : 执行所有事务块的命令 ( 一旦执行exec后,之前加的监控锁都会被取消掉 ) 
  • discard : 取消事务,放弃事务块中的所有命令
  • unwatch : 取消watch对所有key的监控
事务执行过程

multi命令可以将执行该命令的客户端从非事务状态切换至事务状态,执行后,后续的普通命令(非multi、watch、exec、discard的命令)都会被放在一个事务队列中,然后向客户端返回QUEUED回复。

事务队列是一个以先进先出(FIFO)的方式保存入队的命令,较先入队的命令会被放到数组的前面,而较后入队的命令则会被放到数组的后面。

当一个处于事务状态的客户端向服务器发送exec命令时,这个exec命令将立即被服务器执行。服务器会遍历这个客户端的事务队列,执行队列中保存的所有的命令,最后将执行命令所得的结果返回给客户端。

当一个处于事务状态的客户端向服务器发送discard命令时,表示事务取消,客户端从事务状态切换回非事务状态,对应的事务队列清空。

watch

watch命令可被用作乐观锁。它可以在exec命令执行前,监视任意数量的数据库键,并在exec命令执行时,检查监视的键是否至少有一个已经被其他客户端修改过了,如果修改过了,服务器将拒绝执行事务,并向客户端返回代表事务执行失败的空回复。而unwatch命令用于取消对所有键的监视。

要注意,watch是监视键被其他客户端修改过,即其他的会话连接中。如果你在同一个会话下自己watch自己改,是不生效的。

4.2. ACID分析

在传统关系型数据库中,事务都是遵循ACID四个特性的,那么Redis的事务遵循吗?

原子性(Atomicity)
原子性是指事务包含的所有操作要么全部成功,要么全部失败回滚。

Redis 开始事务 multi 命令后,Redis 会为这个事务生成一个队列,每次操作的命令都会按照顺序插入到这个队列中。这个队列里面的命令不会被马上执行,直到 exec 命令提交事务,所有队列里面的命令会被一次性,并且排他的进行执行。

但是呢,当事务队列里面的命令执行报错时,会有两种情况:(1)一种错误类似于Java中的CheckedException,Redis执行器会检测出来,如果某个命令出现了这种错误,会自动取消事务,这是符合原子性的;(2)另一种错误类似于Java中的RuntimeExcpetion,Redis执行器检测不出来,当执行报错了已经来不及了,错误命令后续的命令依然会执行完毕,并不会回滚,因此不符合原子性。

一致性(Consistency)
一致性是指事务必须使数据库从一个一致性状态变换到另一个一致性状态,也就是说一个事务执行之前和执行之后都必须处于一致性状态。

因为达不成原子性,其实严格上来讲,也就达不成一致性。

隔离性(Isolation)
隔离性是当多个用户并发访问数据库时,比如操作同一张表时,数据库为每一个用户开启的事务,不能被其他事务的操作所干扰,多个并发事务之间要相互隔离。

回顾前面的基础,Redis 因为是单线程依次执行队列中的命令的,没有并发的操作,所以在隔离性上有天生的隔离机制。,当 Redis 执行事务时,Redis 的服务端保证在执行事务期间不会对事务进行中断,所以,Redis 事务总是以串行的方式运行,事务也具备隔离性。

持久性(Durability)
持久性是指一个事务一旦被提交了,那么对数据库中的数据的改变就是永久性的,即便是在数据库系统遇到故障的情况下也不会丢失提交事务的操作。

Redis 是否具备持久化,这个取决于 Redis 的持久化模式:

  • 纯内存运行,不具备持久化,服务一旦停机,所有数据将丢失。
  • RDB 模式,取决于 RDB 策略,只有在满足策略才会执行 Bgsave,异步执行并不能保证 Redis 具备持久化。
  • AOF 模式,只有将 appendfsync 设置为 always,程序才会在执行命令同步保存到磁盘,这个模式下,Redis 具备持久化。(将 appendfsync 设置为 always,只是在理论上持久化可行,但一般不会这么操作)

简单总结:

  • Redis 具备了一定的原子性,但不支持回滚。
  • Redis 不具备 ACID 中一致性的概念。(或者说 Redis 在设计时就无视这点)
  • Redis 具备隔离性。
  • Redis 通过一定策略可以保证持久性。

当然,我们也不应该拿传统关系型数据库事务的ACID特性去要求Redis,Redis设计更多的是追求简单与高性能,不会受制于传统 ACID 的束缚。

4.3. 代码

这里结合springboot代码做示例,加深我们对Redis事务的应用开发。在springboot中构建Redis客户端,一般通过spring-boot-starter-data-redis来实现。

jedis 和 lettuce

Lettuce和Jedis的都是连接Redis Server的客户端程序。Jedis在实现上是直连redis server,多线程环境下非线程安全,除非使用连接池,为每个Jedis实例增加物理连接。Lettuce基于Netty的连接实例(StatefulRedisConnection),可以在多个线程间并发访问,且线程安全,满足多线程环境下的并发访问,同时它是可伸缩的设计,一个连接实例不够的情况也可以按需增加连接实例。

可见Lettuce是要优于Jedis的,在spring-boot-starter-data-redis早期版本都是使用Jedis连接的,但到了2.x版本,Jedis就直接被替换成Lettuce。

下面直接看代码吧。

pom

pom文件主要是引入了spring-boot-starter-data-redis

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
controller

controller中定义了两个接口:

  • 接口1 watch:watch键A,在事务中修改键A和B的值,在阻塞3秒后,提交事务。
  • 接口2 change:修改键A。
@RestController
public class DemoController {
    public final static String STR_KEY_A="key_a";
    public final static String STR_KEY_B="key_b";

    private final StringRedisTemplate stringRedisTemplate;

    public DemoController(StringRedisTemplate stringRedisTemplate) {
        this.stringRedisTemplate = stringRedisTemplate;
    }

    @GetMapping("/watch")
    public void watch(){
        stringRedisTemplate.setEnableTransactionSupport(true);
        stringRedisTemplate.watch(STR_KEY_A);
        stringRedisTemplate.multi();
        try {
            stringRedisTemplate.opsForValue().set(STR_KEY_A, "watch_a");
            stringRedisTemplate.opsForValue().set(STR_KEY_B, "watch_b");
            Thread.sleep(3000);
        }catch (Exception e){
            e.printStackTrace();
            stringRedisTemplate.discard();
        }
        stringRedisTemplate.exec();
        stringRedisTemplate.unwatch();
    }

    @GetMapping("/change")
    public void change(){
        stringRedisTemplate.opsForValue().set(STR_KEY_A,"change_a");
    }

}
测试用例

我们写一个测试用例,大致逻辑是:先调用接口1,0.5秒后(为了保证接口1先于接口2执行,因为线程实际执行顺序不一定按照业务代码顺序来),再调用接口2,并且在两个接口的线程中,都会将键A和B的值打印出来。

因为接口1的事务是延迟3秒提交的,因此执行顺序是:

接口1 watch 键A ->接口1 multi开始事务 -> 接口2 修改键A -> 接口1 提交事务

结果也符合我们预想的,因为在接口1 watch的键值,被接口2修改了,所以接口1 的事务执行失败了,最终输出的日志是:

2020-10-11 23:32:14.133  Thread2执行结果:
key_a:change_a
key_b:null
2020-10-11 23:32:16.692  Thread1执行结果:
key_a:change_a
key_b:null
@RunWith(SpringRunner.class)
@SpringBootTest
@AutoConfigureMockMvc
public class DemoControllerTest {
    private final Logger logger = LoggerFactory.getLogger(DemoControllerTest.class);

    @Autowired
    private MockMvc mockMvc;
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    
    @Test
    public void transactionTest() throws InterruptedException{
        /**
         * 清空数据,删除 A、B 键
         */
        stringRedisTemplate.delete(DemoController.STR_KEY_A);
        stringRedisTemplate.delete(DemoController.STR_KEY_B);
        /**
         * 线程1:watch A 键
         * 事务:修改A、B 键值,阻塞10秒后exec、unwatch
         * 输出:A、B键值
         */
        Thread thread1 = new Thread(() -> {
            try {
                mockMvc.perform(MockMvcRequestBuilders.get("/watch"));
                logger.info(new StringBuffer(Thread.currentThread().getName()).append("执行结果:\n")
                        .append(DemoController.STR_KEY_A).append(":").append(stringRedisTemplate.opsForValue().get(DemoController.STR_KEY_A))
                        .append("\n").append(DemoController.STR_KEY_B).append(":").append(stringRedisTemplate.opsForValue().get(DemoController.STR_KEY_B))
                        .toString());
            } catch (Exception e) {
                logger.error("/watch",e);
            }
        });
        thread1.setName("Thread1");
        /**
         * 线程2:修改 A 键
         * 事务:无事务,无阻塞
         * 输出:A、B 键值
         */
        Thread thread2 = new Thread(() -> {
            try {
                mockMvc.perform(MockMvcRequestBuilders.get("/change"));
                logger.info(new StringBuffer(Thread.currentThread().getName()).append("执行结果:\n")
                        .append(DemoController.STR_KEY_A).append(":").append(stringRedisTemplate.opsForValue().get(DemoController.STR_KEY_A))
                        .append("\n").append(DemoController.STR_KEY_B).append(":").append(stringRedisTemplate.opsForValue().get(DemoController.STR_KEY_B))
                        .toString());
            } catch (Exception e) {
                logger.error("/change",e);
            }
        });
        thread2.setName("Thread2");
        /**
         * 线程1 比 线程2 先执行
         */
        thread1.start();
        Thread.sleep(500);
        thread2.start();
        /**
         * 主线程,等待 线程1、线程2 执行完成
         */
        thread1.join();
        thread2.join();
    }
}
查看原文

赞 3 收藏 1 评论 0

KerryWu 发布了文章 · 10月7日

RabbitMQ(2)- 死信队列、延迟队列、优先队列

1. 前言

《RabbitMQ(1)-基础开发应用》中,我们已经介绍了RabbitMQ的基础开发应用。本文基于这些基础再做一些扩展,延伸出一些高级的用法,如:死信队列、延迟队列和优先队列。不过还是以死信队列为主,因为延迟队列是死信队列的衍生概念,而且优先队列也比较简单,所以先还是在代码层面上,把死信队列搞透。

1.1. 创建队列、交换机

我们在使用RabbitMQ之前,需要先创建好相关的队列和交换机,并且设置一些绑定关系。因为几篇文章都是结合springboot来开发,下面就结合springboot介绍几种创建方式:

  1. 直接访问 RabbitMQ Management 管理页面,在页面上创建;或者使用 RabbitMQ其他的客户端来创建管理。
  2. 在springboot上基于消费端开发时,@RabbitListener 注解的 bindings属性,可以简单实现类似功能。
@RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "direct.queue.d",
                    durable = "true"),
            exchange = @Exchange(value = "direct.exchange.a",
                    durable = "true",
                    type = ExchangeTypes.DIRECT,
                    ignoreDeclarationExceptions = "true"),
            key = "direct.routingKey.a"
    )
    )
  1. 在配置类下定义@Bean,即向Ioc容器中注册Queue、Exchange、Binding的实例。
package pers.kerry.exercise.rabbitmq.rabbitmqproducera.config;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @description:
 * @date: 2020/7/12 11:26 下午
 * @author: kerry
 */
@Configuration
public class RabbitConfig {
   
    public static final String NORMAL_EXCHANGE_A="demo.direct.exchange.a";
    public static final String NORMAL_ROUTING_KEY_A="demo.direct.routingKey.a";
    public static final String NORMAL_QUEUE_A="demo.direct.queue.a";


    /**
     * NORMAL 交换机
     * @return
     */
    @Bean
    public Exchange ExchangeA(){
        return ExchangeBuilder
                .directExchange(NORMAL_EXCHANGE_A)
                .durable(true)
                .build();
    }

    /**
     * NORMAL 队列
     * @return
     */
    @Bean
    public Queue QueueA(){
        return QueueBuilder
                .durable(NORMAL_QUEUE_A)
                .build();
    }

    /**
     * 绑定 NORMAL队列 和 NORMAL交换机
     * @return
     */
    @Bean
    public Binding normalBinding(){
        return new Binding(NORMAL_QUEUE_A,
                Binding.DestinationType.QUEUE,
                NORMAL_EXCHANGE_A,
                NORMAL_ROUTING_KEY_A,
                null);
    }
}

我个人推荐第三种,而且建议是在生产者端定义,消费者应该更关注消费的逻辑。但是如果用代码来创建,有一个很大的缺点,就是不能删除和修改,至少我目前还没找到办法。

因此要结合第一种和第三种来使用,当然都用第一种也是可以的。只是开发人员,更希望队列、交换机的创建、绑定的逻辑,都体现在代码里面,通过代码可以更好的阅读架构设计。

2. 死信队列

2.1. 简介

死信队列这个名字听起来很特别,但它解决的是日常开发中最常见的问题:不能正常消费的消息,该如何处理。我们在第一篇文章中有使用到手动Ack,对于需要nack并且无需重回队列的消息,期望有统一的异常处理;包括有些消息是有时效性的,如果支付订单一般都最大支付时常,超时后就应该取消订单;等等。

死信队列就是应对这些情况的,它出现的条件如下:

  1. 消息被否定确认,使用basicNackbasicReject ,并且此时requeue 属性被设置为false。
  2. 消息在队列的存活时间超过设置的TTL时间。
  3. 消息队列的消息数量已经超过最大队列长度。

死信队列的架构如下:

生产者 --> 消息 --> 业务交换机 --> 业务队列 --> 消息变成死信 --> 死信交换机 -->死信队列 --> 消费者

2.2. 配置

如何配置死信队列呢?其实很简单,大概可以分为以下步骤:

  1. 配置业务队列,绑定到业务交换机上
  2. 为业务队列配置死信交换机和路由key
  3. 为死信交换机配置死信队列

注意,并不是直接声明一个公共的死信队列,然后所以死信消息就自己跑到死信队列里去了。而是为每个需要使用死信的业务队列配置一个死信交换机,这里同一个项目的死信交换机可以共用一个,然后为每个业务队列分配一个单独的路由key。

有了死信交换机和路由key后,接下来,就像配置业务队列一样,配置死信队列,然后绑定在死信交换机上。也就是说,死信队列并不是什么特殊的队列,只不过是绑定在死信交换机上的队列。死信交换机也不是什么特殊的交换机,只不过是用来接受死信的交换机,所以可以为任何类型【Direct、Fanout、Topic】。一般来说,会为每个业务队列分配一个独有的路由key,并对应的配置一个死信队列进行监听,也就是说,一般会为每个重要的业务队列配置一个死信队列。

2.3. 代码(生产者)

按照简介中死信队列的架构,我们在配置文件中定义了NORMAL的业务队列和业务交换机,以及DLX的死信队列和死信交换机,并在业务队列中设置了死信交换机。

RabbitConfig.java

package pers.kerry.exercise.rabbitmq.rabbitmqproducera.config;


import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @description:
 * @date: 2020/7/12 11:26 下午
 * @author: kerry
 */
@Configuration
public class RabbitConfig {
    /**
     * DLX,定义参数
     */
    public static final String X_DEAD_LETTER_EXCHANGE="x-dead-letter-exchange";
    public static final String X_DEAD_LETTER_ROUTING_KEY="x-dead-letter-routing-key";
    public static final String X_MESSAGE_TTL="x-message-ttl";
    public static final String X_MAX_LENGTH="x-max-length";
    /**
     * DLX,命名
     */
    public static final String DEAD_LETTER_EXCHANGE_A="demo.direct.dlx.exchange.a";
    public static final String DEAD_LETTER_ROUTING_KEY_A="demo.direct.dlx.routingKey.a";
    public static final String DEAD_LETTER_QUEUE_A="demo.direct.dlx.queue.a";
    /*
     * NORMAL,命名
     */
    public static final String NORMAL_EXCHANGE_A="demo.direct.exchange.a";
    public static final String NORMAL_ROUTING_KEY_A="demo.direct.routingKey.a";
    public static final String NORMAL_QUEUE_A="demo.direct.queue.a";


    @Bean("jsonRabbitTemplate")
    public RabbitTemplate jsonRabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        return rabbitTemplate;
    }

    @Bean("defaultRabbitTemplate")
    public RabbitTemplate defaultRabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory);
        return rabbitTemplate;
    }

    /**
     * DLX 交换机
     * @return
     */
    @Bean
    public Exchange dlxExchangeA(){
        return ExchangeBuilder
                .directExchange(DEAD_LETTER_EXCHANGE_A)
                .durable(true)
                .build();
    }

    /**
     * DLX 队列
     * @return
     */
    @Bean
    public Queue dlxQueueA(){
        return QueueBuilder
                .durable(DEAD_LETTER_QUEUE_A)
                .build();
    }

    /**
     * NORMAL 交换机
     * @return
     */
    @Bean
    public Exchange ExchangeA(){
        return ExchangeBuilder
                .directExchange(NORMAL_EXCHANGE_A)
                .durable(true)
                .build();
    }

    /**
     * NORMAL 队列
     * @return
     */
    @Bean
    public Queue QueueA(){
        return QueueBuilder
                .durable(NORMAL_QUEUE_A)
                //设置 死信交换机
                .withArgument(X_DEAD_LETTER_EXCHANGE,DEAD_LETTER_EXCHANGE_A)
                .withArgument(X_DEAD_LETTER_ROUTING_KEY,DEAD_LETTER_ROUTING_KEY_A)
                //设置 队列所有消息 存活时间8秒
                .withArgument(X_MESSAGE_TTL,8000)
                //设置 队列最大长度 10条
                .withArgument(X_MAX_LENGTH,10)
                .build();
    }

    /**
     * 绑定 DLX队列 和 DLX交换机
     * @return
     */
    @Bean
    public Binding dlxBinding(){
        return new Binding(DEAD_LETTER_QUEUE_A,
                Binding.DestinationType.QUEUE,DEAD_LETTER_EXCHANGE_A,
                DEAD_LETTER_ROUTING_KEY_A,
                null);
    }

    /**
     * 绑定 NORMAL队列 和 NORMAL交换机
     * @return
     */
    @Bean
    public Binding normalBinding(){
        return new Binding(NORMAL_QUEUE_A,
                Binding.DestinationType.QUEUE,
                NORMAL_EXCHANGE_A,
                NORMAL_ROUTING_KEY_A,
                null);
    }

}

ProducerService.java

@Slf4j
@Service
public class ProducerService {

    public void sendText(String data, MessageProperties messageProperties) {
        /**
         * 对单个消息 设置TTL
         */
        //messageProperties.setExpiration(String.valueOf(3000));
        Message message = defaultRabbitTemplate
                .getMessageConverter()
                .toMessage(data, messageProperties);

       defaultRabbitTemplate.convertAndSend(RabbitConfig.NORMAL_EXCHANGE_A,
                RabbitConfig.NORMAL_ROUTING_KEY_A,
                message);
    }
}

2.4. 代码(消费者)

消费者的逻辑比较简单,主要是分别监听业务队列和死信队列,这里将两个队列的消息输出日志。这里模拟的是消费者nack消息,并且不退回队列的情况。

MessageListener.java

/**
 * @description:
 * @date: 2020/7/12 11:07 下午
 * @author: kerry
 */
@Component
@Slf4j
public class MessageListener {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private ObjectMapper objectMapper;

    /**
     * @param message
     * @param channel
     * 监听 业务队列
     *
     * @throws Exception
     */
    @RabbitListener(queues = RabbitConfig.NORMAL_QUEUE_A)
    @RabbitHandler
    public void onMessage(Message message, Channel channel) throws Exception {
        String contentType = message.getMessageProperties().getContentType();
        String bodyText = null;
        switch (contentType) {
            //字符串
            case MessageProperties.CONTENT_TYPE_TEXT_PLAIN:
                bodyText = (String) rabbitTemplate.getMessageConverter().fromMessage(message);
                break;
            //json对象
            case MessageProperties.CONTENT_TYPE_JSON:
                User user = objectMapper.readValue(message.getBody(), User.class);
                bodyText = user.toString();
                break;
        }
        log.info("业务队列-拒绝消息: " + bodyText);
        channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
         /**
         * 延迟队列,被消费的消息需要重回队列
         */
        //channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
    }


    /**
     * @param message
     * @param channel
     * 监听 死信队列
     *
     * @throws Exception
     */
    @RabbitListener(queues = RabbitConfig.DEAD_LETTER_QUEUE_A)
    @RabbitHandler
    public void onMessageDLX(Message message, Channel channel) throws Exception {
        String contentType = message.getMessageProperties().getContentType();
        String bodyText = null;
        switch (contentType) {
            //字符串
            case MessageProperties.CONTENT_TYPE_TEXT_PLAIN:
                bodyText = (String) rabbitTemplate.getMessageConverter().fromMessage(message);
                break;
            //json对象
            case MessageProperties.CONTENT_TYPE_JSON:
                User user = objectMapper.readValue(message.getBody(), User.class);
                bodyText = user.toString();
                break;
        }
        log.info("死信队列-接收消息: " + bodyText);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
 
    }

}

2.5. 分析

开头说过,导致消息转为死信队列的方式有3种,下面就从代码中分析这3种情况。

我们回过头来看看消费者这边,定义业务队列的方法:

    /**
     * NORMAL 队列
     * @return
     */
    @Bean
    public Queue QueueA(){
        return QueueBuilder
                .durable(NORMAL_QUEUE_A)
                //设置 死信交换机
                .withArgument(X_DEAD_LETTER_EXCHANGE,DEAD_LETTER_EXCHANGE_A)
                .withArgument(X_DEAD_LETTER_ROUTING_KEY,DEAD_LETTER_ROUTING_KEY_A)
                //设置 队列所有消息 存活时间8秒
                .withArgument(X_MESSAGE_TTL,8000)
                //设置 队列最大长度 10条
                .withArgument(X_MAX_LENGTH,10)
                .build();
    }
1. nack拒绝消息,requeue=false

在死信队列的架构中,只要在业务队列中设置了死信交换机x-dead-letter-exchange。消费者代码中,我们在业务队列的消费过程中nack消息,并且requeue=false即可。x-dead-letter-routing-key可以不设置,如果不设置,默认取消息原来的路由键。
代码如下:

channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);

channel.basicNack方法的参数如下:

  1. deliveryTag:该消息的index。
  2. multiple:是否批量.true:将一次性拒绝所有小于deliveryTag的消息。
  3. requeue:被拒绝的是否重新入队列。
2. 消息超时TTL

TTL(Time-To-Live)指消息的存活时间,我们有两种方式设置消息的TTL:

  1. 设置队列的TTL,即参数x-message-ttl,设置后,进入该队列的所有消息的TTL都为对应的值。
  2. 设置单个消息的TTL,在生产者代码中,给消息的属性中设置过期时间。如:messageProperties.setExpiration(String.valueOf(3000));,就是设置消息的TTL为3秒。

不过要注意的是,如果已经设置了nack的死信逻辑,TTL的死信就不生效了。道理也很简单,因为nack消息和requeue=false一起用,代表消息被消费了,并且消息不会重回队列,直接被丢弃或进入死信队列,又怎么会在队列中超时了呢。

3. 超过队列长度

可以设置队列长度,例如最大接收消息的数量。当消息在队列中已经达到最大数量,那么后面再来的消息,就会被直接丢进死信队列。
我们也是中定义业务队列的代码中,有通过x-max-length参数,设置业务队列的长度。

3. 延迟队列

在我还不知道延迟队列之前,我就觉得消息中间件应该具备这样的功能。在消息发布到队列后,我期望每个消息延迟指定时间后,再被消费者获取到。例如:在支付模块中,当用户生成订单,再到支付完成订单,是有一段时间的。而我们一般会给这个订单设置超时时间,如果超过了这段时间,订单就应该被取消,无法再支付。那么将订单作为消息,就可以利用延迟队列来实现取消订单的逻辑。

RabbitMQ并不直接支持延迟队列的功能,而是作为一个概念,你可以利用死信队列来实现“延迟队列”。利用TTL超时时间的死信方式,来实现延迟队列。

回顾一下上段中TTL的方式,我们在业务队列中除了定义死信交换机x-dead-letter-exchange,还可以定义队列的生存时间x-message-ttl,或者设置消息的过期时间。而如果我们不消费这个业务队列中的消息,那么消息在到达TTL后,就会自动转到死信队列中。如果我们只消费死信队列中的消息,忽略掉业务队列这个“中转站”,就相当于消息在被发布后,经过指定时间延迟,在死信队列中被消费,这就形成了一个“延迟队列”。

因为延迟队列就是死信队列的一种实现,所以代码层面上可以直接参考上段中TTL的部分。

4. 优先队列

优先队列,顾名思义,拥有高优先级的队列具有高的优先权,优先级高的消息具备优先被消费的权力。因此在优先队列中有两个逻辑点:队列是优先队列消息有优先级。可参考死信队列章节中,TTL部分的代码,下面对代码改动地方做一下说明:

  1. 设置队列最大优先级,即消息可使用的最大优先级数字,可通过x-max-priority参数来设置。
  2. 设置消息的优先级,在生产者代码中,在消息的属性中设置优先级,优先级越大,越先被消费。如:messageProperties.setPriority(3),即设置该消息的优先级为3。

优先队列的使用场景是在:消息有分优先级的需求,并且并发量较大。要求并发量大,是因为如果所有消息在发布之后,马上就被消费了,那么分优先级的必要性就不大了。

查看原文

赞 3 收藏 3 评论 0

KerryWu 发布了文章 · 9月27日

数据库事务的实现原理

1. 前言

都知道数据库事务有ACID特性(原子性、一致性、隔离型、持久性),本文简单聊一下它们的实现原理。

2. 日志文件

2.1. redo log

redo log叫做重做日志,是用来实现事务的持久性。该日志文件由两部分组成:重做日志缓冲(redo log buffer)以及重做日志文件(redo log),前者是在内存中,后者在磁盘中。

当事务提交之后会把所有修改信息都会存到该日志中。假设有个表叫做tb1(id,username) 现在要插入数据(3,ceshi)

image.png

start transaction; 
select balance from bank where name="zhangsan"; 
// 生成 重做日志 balance=600 
update bank set balance = balance - 400; 
// 生成 重做日志 amount=400 
update finance set amount = amount + 400; 
commit;

image.png

redo log 有什么作用?

mysql 为了提升性能不会把每次的修改都实时同步到磁盘,而是会先存到Boffer Pool(缓冲池)里头,把这个当作缓存来用。然后使用后台线程去做缓冲池和磁盘之间的同步

那么问题来了,如果还没来的同步的时候宕机或断电了怎么办?还没来得及执行上面图中红色的操作。这样会导致丢部分已提交事务的修改信息!

所以引入了redo log来记录已成功提交事务的修改信息,并且会把redo log持久化到磁盘,系统重启之后在读取redo log恢复最新数据。

总结:redo log是用来恢复数据的 用于保障,已提交事务的持久化特性。

2.2. undo log

undo log 叫做回滚日志,用于记录数据被修改前的信息。他正好跟前面所说的重做日志所记录的相反,重做日志记录数据被修改后的信息。undo log主要记录的是数据的逻辑变化,为了在发生错误时回滚之前的操作,需要将之前的操作都记录下来,然后在发生错误时才可以回滚。

还用上面那两张表

image.png

每次写入数据或者修改数据之前都会把修改前的信息记录到 undo log。

undo log 有什么作用?

undo log 记录事务修改之前版本的数据信息,因此假如由于系统错误或者rollback操作而回滚的话可以根据undo log的信息来进行回滚到没被修改前的状态。

总结:undo log是用来回滚数据的用于保障 未提交事务的原子性。

2.3. archive log

archive log(归档日志)是Oracle数据库中的概念,它其实是redo log的衍生物。

redo log file是LGWR进程从Oracle实例中的redo log buffer写入的,是循环利用的。就是说一个redo log file写满后,才写下一个。归档日志是当数据库运行在归档模式下时,一个redo log file写满后,由ARCn进程将重做日志的内容备份到归档日志文件下,然后这个redo log file才能被下一次使用。

不管数据库是否是归档模式,重做日志是肯定要写的。而只有数据库在归档模式下,重做日志才会备份,形成归档日志。

redo log是循环利用的,但是归档日志不是,它不断接收从redo log中写入的日志备份。因此到一定时间后,会导致数据库存储不够,影响数据库使用。我们一般都会执行一个定时脚本,在规定时间周期后,删掉保留周期前的归档日志文件。

3. 锁和MVCC基础

3.1. 锁

具体关于锁的知识,可以翻看前面的文章《数据库事务和锁》。简单来分,锁分为下面共享锁排他锁

  • 共享锁(shared lock),又叫做"读锁"

读锁是可以共享的,或者说多个读请求可以共享一把锁读数据,不会造成阻塞。

  • 排他锁(exclusive lock),又叫做"写锁"

写锁会排斥其他所有获取锁的请求,一直阻塞,直到写入完成释放锁。

因此通过读写锁,只有读读可以并行,但是写读写写都不能做到并行
事务的隔离性就是根据读写锁来实现的,这个后面再说。

3.2. MVCC基础

MVCC (MultiVersion Concurrency Control) 叫做多版本并发控制。

InnoDB的 MVCC ,是通过在每行记录的后面保存两个隐藏的列来实现的。这两个列,
一个保存了行的创建时间,一个保存了行的过期时间,
当然存储的并不是实际的时间值,而是系统版本号。

以上片段摘自《高性能Mysql》这本书对MVCC的定义。他的主要实现思想是通过数据多版本来做到读写分离。从而实现不加锁读进而做到读写并行

MVCC在mysql中的实现依赖的是undo log与read view

  • undo log :undo log 中记录某行数据的多个版本的数据。
  • read view :用来判断当前版本数据的可见性。

4. 事务的实现原理

前面讲的重做日志,回滚日志以及锁技术就是实现事务的基础。

  • 原子性:使用 undo log ,从而达到回滚。
  • 持久性:使用 redo log,从而达到故障后恢复。
  • 隔离性:使用锁以及MVCC,运用的优化思想有读写分离,读读并行,读写并行。
  • 一致性:一致性是通过原子性,持久性,隔离性来实现的。通过回滚,以及恢复,和在并发环境下的隔离做到一致性。

原子性,持久性,隔离性折腾半天的目的也是为了保障数据的一致性。总之,ACID只是个概念,事务最终目的是要保障数据的可靠性,一致性。

查看原文

赞 0 收藏 0 评论 0

KerryWu 发布了文章 · 9月22日

NIO优化原理和Tomcat线程模型

1、I/O阻塞

书上说BIO、NIO等都属于I/O模型,但是I/O模型这个范围有点含糊,我为此走了不少弯路。我们日常开发过程中涉及到NIO模型应用,如Tomcat、Netty中等线程模型,可以直接将其视为网络I/O模型。本文还是在基础篇章中介绍几种I/O模型方式,后面就默认只讲解网络I/O模型了。

1.1、I/O分类

BIO、NIO、AIO等都属于I/O模型,所以它们优化的都是系统I/O的性能,因此首先,我们要清楚常见的I/O有哪些分类:

I/O种类场景java中到应用
内存I/O从内存中读取数据,将数据写入内存线程从内存中将数据读取到工作空间,将值在工作空间完成更改后,将值由工作空间刷新到内存中(jmm)
磁盘I/O读取磁盘文件,写文件到磁盘线程从内存中将数据读取到工作空间,将值在工作空间完成更改后,将值由工作空间刷新到内存中(jmm)
网络I/O网络数据的读写和传输tcp/udp的抽象api即socket 通信 (java.net)

1.2、I/O过程和性能

I/O(Input/Output)即数据的输入/输出,为什么大家很关心I/O的性能呢?因为I/O存在的范围很广,在高并发的场景下,这部分性能会被无限放大。而且与业务无关,是可以有统一解决方案的。

所有的系统I/O都分为两个阶段:等待就绪和数据操作。举例来说,读函数,分为等待系统可读和真正的读;同理,写函数分为等待网卡可以写和真正的写:

  1. 等待就绪:等待数据就绪,一般是将数据加载到内核缓存区。无论是从磁盘、网络读取数据,程序能处理的都是进入内核态之后的数据,在这之前,cpu会阻塞住,等待数据进入内核态。
  2. 数据操作:数据就绪后,一般是将内核缓存中的数据加载到用户缓存区

需要说明的是等待就绪的阻塞是不使用CPU的,是在“空等”;而真正的读写操作的阻塞是使用CPU的,真正在”干活”,而且这个过程非常快,属于memory copy,带宽通常在1GB/s级别以上,可以理解为基本不耗时。这就出现一个奇怪的现象 -- 不使用CPU的“等待就绪”,却比实际使用CPU的“数据操作”,占用CPU时间更多

传统阻塞I/O模型,即在读写数据过程中会发生阻塞现象。当用户线程发出I/O请求之后,内核会去查看数据是否就绪,如果没有就绪就会等待数据就绪,而用户线程就会处于阻塞状态,用户线程交出CPU。当数据就绪之后,内核会将数据拷贝到用户线程,并返回结果给用户线程,用户线程才会解除block状态。

明确的是,让当前工作线程阻塞,等待数据就绪,是很浪费线程资源的事情,上述三种I/O都有一定的优化方案:

  • 磁盘I/O:现代电脑中都有一个DMA(Direct Memory Access 直接内存访问) 的外设组件,可以将I/O数据直接传送到主存储器中并且传输不需要CPU的参与,以此将CPU解放出来去完成其他的事情。
  • 网络I/O:NIO、AIO等I/O模型,通过向事件选择器注册I/O事件,基于就绪的事情来驱动执行I/O操作,避免的等待过程。
  • 内存I/O:内存部分没涉及到太多阻塞,优化点在于减少用户态和内核态之间的数据拷贝。nio中的零拷贝就有mmap和sendfile等实现方案。

1.3、网络I/O阻塞

这里仔细的讲讲网络I/O模型中的阻塞,即socket的阻塞。在计算机通信领域,socket 被翻译为“套接字”,它是计算机之间进行通信的一种约定或一种方式,是在tcp/ip协议上,抽象出来的一层网络通讯协议。

同上面I/O的过程一样,网络I/O也同样分成两个部分:

  1. 等待网络数据到达网卡,读取到内核缓冲区。
  2. 从内核缓冲区复制数据到用户态空间。

每个 socket 被创建后,都会分配两个缓冲区,输入缓冲区和输出缓冲区:

  • 输入缓冲区:当使用 read()/recv() 读取数据时,(1)首先会检查缓冲区,如果缓冲区中有数据,那么就读取,否则函数会被阻塞,直到网络上有数据到来。(2)如果要读取的数据长度小于缓冲区中的数据长度,那么就不能一次性将缓冲区中的所有数据读出,剩余数据将不断积压,直到有 read()/recv() 函数再次读取。(3)直到读取到数据后 read()/recv() 函数才会返回,否则就一直被阻塞。
  • 输出缓冲区:当使用 write()/send() 发送数据时,(1)首先会检查缓冲区,如果缓冲区的可用空间长度小于要发送的数据,那么 write()/send() 会被阻塞(暂停执行),直到缓冲区中的数据被发送到目标机器,腾出足够的空间,才唤醒 write()/send() 函数继续写入数据。(2) 如果TCP协议正在向网络发送数据,那么输出缓冲区会被锁定,不允许写入,write()/send() 也会被阻塞,直到数据发送完毕缓冲区解锁,write()/send() 才会被唤醒。(3)如果要写入的数据大于缓冲区的最大长度,那么将分批写入。(4)直到所有数据被写入缓冲区 write()/send() 才能返回。

由此可见在网络I/O中,会有很多的因素导致数据的读取和写入过程出现阻塞,创建socket连接也一样。socket.accept()、socket.read()、socket.write()这类函数都是同步阻塞的,当一个连接在处理I/O的时候,系统是阻塞的,该线程当前的cpu时间片就浪费了。

2、阻塞优化

2.1、BIO、NIO、AIO

BIO、NIO、AIO对比

以socket.read()为例子:

  • 传统的BIO里面socket.read(),如果TCP RecvBuffer里没有数据,函数会一直阻塞,直到收到数据,返回读到的数据。
  • 对于NIO,如果TCP RecvBuffer有数据,就把数据从网卡读到内存,并且返回给用户;反之则直接返回0,永远不会阻塞。
  • 最新的AIO(Async I/O)里面会更进一步:不但等待就绪是非阻塞的,就连数据从网卡到内存的过程也是异步的。

换句话说,BIO里用户最关心“我要读”,NIO里用户最关心”我可以读了”,在AIO模型里用户更需要关注的是“读完了”。

NIO

NIO的优化体现在两个方面:

  1. 网络I/O模式的优化,通过非阻塞的模式,提高了CPU的使用性能。
  2. 内存I/O的优化,零拷贝等方式,让数据在内核态和用户态之前的传输消耗降低了。

NIO一个重要的特点是:socket主要的读、写、注册和接收函数,在等待就绪阶段都是非阻塞的,真正的I/O操作是同步阻塞的(消耗CPU但性能非常高)

NIO的主要事件有几个:读就绪、写就绪、有新连接到来。

我们首先需要注册当这几个事件到来的时候所对应的处理器。然后在合适的时机告诉事件选择器:我对这个事件感兴趣。对于写操作,就是写不出去的时候对写事件感兴趣;对于读操作,就是完成连接和系统没有办法承载新读入的数据的时;对于accept,一般是服务器刚启动的时候;而对于connect,一般是connect失败需要重连或者直接异步调用connect的时候。

其次,用一个死循环选择就绪的事件,会执行系统调用(Linux 2.6之前是select、poll,2.6之后是epoll,Windows是IOCP),还会阻塞的等待新事件的到来。新事件到来的时候,会在selector上注册标记位,标示可读、可写或者有连接到来。

2.2、Reactor模式

Reactor模式称之为响应器模式,通常用于NIO非阻塞IO的网络通信框架中。Reactor设计模式用于处理由一个或多个客户端并发传递给应用程序的的服务请求,可以理解成,Reactor模式是用来实现网络NIO的方式

Reactor是一种事件驱动机制,是处理并发I/O常见的一种模式,用于同步I/O,其中心思想是将所有要处理的I/O事件注册到一个中心I/O多路复用器上,同时主线程阻塞在多路复用器上,一旦有I/O事件到来或是准备就绪,多路复用器将返回并将相应I/O事件分发到对应的处理器中。

Reactor模式主要分为下面三个部分:

  1. 事件接收器Acceptor:主要负责接收请求连接,接收请求后,会将建立的连接注册到分离器中。
  2. 事件分离器Reactor:依赖于循环监听多路复用器Selector,是阻塞的,一旦监听到事件,就会将事件分发到事件处理器。(例如:监听读事件,等到内核态数据就绪后,将事件分发到Handler,Handler将数据读到用户态再做处理)
  3. 事件处理器Handler:事件处理器主要完成相关的事件处理,比如读写I/O操作。

2.3、三种Reactor模式

单线程Reactor模式

一个线程:

  • 单线程:建立连接(Acceptor)、监听accept、read、write事件(Reactor)、处理事件(Handler)都只用一个单线程。
多线程Reactor模式

一个线程 + 一个线程池:

  • 单线程:建立连接(Acceptor)和 监听accept、read、write事件(Reactor),复用一个线程。
  • 工作线程池:处理事件(Handler),由一个工作线程池来执行业务逻辑,包括数据就绪后,用户态的数据读写。
主从Reactor模式

三个线程池:

  • 主线程池:建立连接(Acceptor),并且将accept事件注册到从线程池。
  • 从线程池:监听accept、read、write事件(Reactor),包括等待数据就绪时,内核态的数据I读写。
  • 工作线程池:处理事件(Handler),由一个工作线程池来执行业务逻辑,包括数据就绪后,用户态的数据读写。

3、Tomcat线程模型

3.1、Api网络请求过程

我们先补一下基础知识,讲解后端接口的响应过程。一个http连接里,完整的网络处理过程一般分为accept、read、decode、process、encode、send这几步:

  1. accept:接收客户端的连接请求,创建socket连接(tcp三次握手,创建连接)。
  2. read:从socket读取数据,包括等待读就绪,和实际读数据。
  3. decode:解码,因为网络上的数据都是以byte的形式进行传输的,要想获取真正的请求,必定需要解码。
  4. process:业务处理,即服务端程序的业务逻辑实现。
  5. encode:编码,同理,因为网络上的数据都是以byte的形式进行传输的,也就是socket只接收byte,所以必定需要编码。
  6. send:往网络socket写回数据,包括实际写数据,和等待写就绪。

3.2、各个线程模型

在tomcat的各个版本中,所支持的线程模型也发生了一步步演变。一方面,直接将默认线程模型,从BIO变成了NIO。另一方面,在后续几个版本中,加入了对AIO和APR线程模型的支持,这里要注意,仅仅是支持,而非默认线程模型。

  • BIO:阻塞式IO,tomcat7之前默认,采用传统的java IO进行操作,该模式下每个请求都会创建一个线程,适用于并发量小的场景。
  • NIO:同步非阻塞,比传统BIO能更好的支持大并发,tomcat 8.0 后默认采用该模式。
  • AIO:异步非阻塞 (NIO2),tomcat8.0后支持。多用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分调用OS参与并发操作,编程比较复杂。
  • APR:tomcat 以JNI形式调用http服务器的核心动态链接库来处理文件读取或网络传输操作,需要编译安装APR库(也就是说IO操作的部分直接调用native代码实现)。

各个线程模型中,NIO是作为目前最实用的线程模型,因此也是目前Tomcat默认的线程模型,因此本文对此着重讲解。

3.3、BIO和NIO

BIO模型

在BIO模型中,主要参与的角色有:AcceptorHandler工作线程池。对应于前文中Api的请求过程,它们的分工如下:

  • Acceptor:Accepter线程专门负责建立网络连接(accept)。新连接创建后,交给Handler工作线程池处理请求。
  • Handlers:针对每个请求的连接,Handler工作线程池都会分配一个线程,执行后面的所有步骤(read、decode、process、encode、send)。

前文的知识点有铺垫,readsend是面向网络I/O的,在等待读写就绪过程中,其实是CPU阻塞的。因此Handler工作线程池中的每个线程,都会因为I/O阻塞而“空等待”,造成浪费。

NIO模型

tomcat的NIO模型,相比较于BIO模型,多了个Poller角色:AcceptorPollerHandler工作线程池。这三个角色是不是很熟悉,如果将Poller换成Reactor,是不是就是Reactor模型。没错,tomcat的nio模型,的确就是基于主从Reactor模型,只不过将Reactor换了个名字而已。

  • Acceptor:Accepter线程专门负责建立网络连接(accept)。新连接创建后,不是直接使用Worker线程处理请求,而是先将请求发送给Poller缓冲队列。
  • Poller:在Poller中,维护了一个Selector对象,当Poller从缓冲队列中取出连接后,注册到该Selector中,阻塞等待读写就绪(read等待就绪、send等待就绪)。
  • Handlers:遍历Selector,找出其中就绪的IO操作,并交给Worker线程处理(read内存读、decode、process、encode、send内存写)。
对比
  • BIO模型中,一个线程对应一个请求连接的完整过程,因此tomcat服务能处理的最大连接数,和最大线程数一致。
  • NIO模型中,在一个请求连接中,对应的一个工作线程,只处理I/O读写就绪后的非阻塞过程。因此tomcat服务能处理的最大连接数,要远大于最大线程数量。

3.4、参数设置

针对于tomcat的nio模型,可以做一些参数设置。因为springboot是内嵌tomcat的,这些参数设置同样可以在properties配置文件中定义:

  • 最大线程数(server.tomcat.threads.max):工作线程池的最大线程数,默认200。注意不是越大越好,如果线程数过大,那么CPU会花费大量的时间用于线程的切换,整体效率会降低。
  • 最小线程数(server.tomcat.threads.min-spare):工作线程池的最小线程数,默认10。
  • 最大等待数(server.tomcat.accept-count):当调用HTTP请求数达到tomcat的最大线程数时,还有新的HTTP请求到来,这时tomcat会将该请求放在等待队列中,这个acceptCount就是指能够接受的最大等待数,默认100。如果等待队列也被放满了,这个时候再来新的请求就会被tomcat拒绝。
  • 最大连接数(server.tomcat.max-connections):在同一时间,tomcat能够接受的最大连接数,默认8192。

4、常见问题

1、tomcat运行后,出现 nio-8080-exec-前缀的线程作用是什么?

是工作线程池中的线程。你们可以观察某个springboot运行项目的线程模型,由于基本都是基于nio模型的tomcat应用,因此都包括这些线程:

  • 1个名称中包含Accepter的线程。
  • 2个名称中包含Poller的线程。
  • 10个工作线程,名称从 nio-8080-exec-1 到 nio-8080-exec-10。如果并发交高,默认最多有200个线程,名称到 nio-8080-exec-200。
2、tomcat中nio模型中,存在poller单线程读取多个请求线程的数据,会不会出现线程安全问题?因为通过会使用ThreadLocal存储请求用户身份信息。

不会。因为poller只是处理等待读就绪的环节,一旦读就绪事件触发后,真正的读取数据和处理业务逻辑,都是由工作线程池中的某个线程跟到底,可以放心大胆使用ThreadLocal。

3、为什么我自己对比测试nio和bio,性能提升不大?

nio线程模型优化的是线程利用率,为了在高并发场景下,基于有限的线程资源,处理更多的请求连接。

例如:tomcat使用默认最大线程数200,但你的并发请求数量连200都不到,就算是BIO模型,线程池中200个线程都没利用完。这时候你用NIO还是BIO,区别不大,甚至BIO模型处理还更快一些。但如果你的并发请求数到了2000、20000,BIO模型就会出现性能瓶颈了,超过200的请求都会阻塞住,而NIO模型就能大展身手。

查看原文

赞 0 收藏 0 评论 0

KerryWu 发布了文章 · 9月11日

gitflow工作流

1. 工作流程

项目开始(master、develop)
  1. 开发组长,基于master主干创建develop分支。master和develop就作为仓库的两个主干,并且将它们的加权限限制,只有管理员可修改。
开发阶段(master、develop、feature)
  1. 基于develop创建多个feature分支(如:feature/login),对应功能模块的开发人员,基于该分支开发新功能。
  2. 开发人员,测试新功能完成以后,在git上发起pull request把代码合并到到develop分支上。
  3. 开发组长,在确认代码没有问题后,通过该pull request 的合并请求。当所有的功能都开发完了,所有的feature分支都合并到develop上。
测试阶段-开始测试(master、develop、release)
  1. 开发组长,基于develop分支创建一个release预发布分支(如:release/1.0.0),并设置release分支只有管理员有权限修改。
  2. 基于release/1.0.0分支的代码进行测试。
测试阶段-测试中(master、develop、release、bugfix)
  1. 当测试发现bug时,基于当前release/1.0.0分支创建bugfix分支(如:bugfix/问题编号),开发人员基于该bugfix分支进行bug修复。
  2. 开发人员在bug修复后,向release/1.0.0分支提交 pull request 申请。
  3. 开发组长在确认bug修复完成后,通过该pull request 的合并请求。所有bug修复完成后,当前release版本下,所有bugfix分支都合并到release/1.0.0上。
测试阶段-测试完毕(master、develop、tag
  1. 开发组长发起pull request,把release/1.0.0代码合并到到master分支上。
  2. 基于master分支创建一个里程碑版本(tag)名为1.0.0-Release,并且在github上发布一个Release,可以将当前master代码以及相关包存档。
  3. 原则上只需保留master、develop两个分支,和1.0.0-Release里程碑版本(tag)。删除完成使命的其他分支:多个feature分支、多个bugfix分支和release/1.0.0。
线上bug-master(master、develop、hotfix)
  1. 线上版本出现bug,可以基于最新版本(master)进行修复,可以基于master创建hotfix分支(如:hotfix/问题编号),开发人员基于该hotfix分支进行bug修复。
  2. 开发人员在bug修复后,向master分支提交 pull request 申请。
  3. 开发组长在确认bug修复完成后,通过该pull request 的合并请求。基于master分支创建一个里程碑修复版本(tag),假如当前版本为1.2.0-Release,则修复版本为1.2.1-Release。
线上bug-历史版本(master、develop、tag、hotfix)
  1. 用户基于某个里程碑版本tag(如:1.0.0-Release)提出bug,创建一个issue。如果master版本已经发布到1.0.0以后了(如:1.2.0-Release),但是该bug修复只能基于历史的1.0.0修复,就需要基于该里程碑版本(tag)1.0.0-Release,创建一个release/1.0.1分支,和hotfix分支(如:hotfix/问题编号),开发人员基于该分支修复bug。
  2. 开发人员在bug修复后,向release/1.0.1分支提交 pull request 申请。
  3. 开发组长在确认bug修复完成后,通过该pull request 的合并请求。基于release/1.0.1分支创建一个里程碑修复版本(tag),名为1.0.1-Release。

image

示例代码(release/*)

# 开始 release

git checkout -b release/0.0.1 develop

# 完成 release

git checkout master

git merge --no-ff release/0.0.1

git push

git checkout develop

git merge --no-ff release/0.0.1

git push

git branch -d release/0.0.1

git push origin --delete release/0.0.1 

# 合并master/devlop分支之后,打上tag 

git tag -a v0.1.0 master

git push --tags

2. 分支说明

master
  • master 分支为主分支,用于部署生产环境,需要确保master分支的稳定性。
  • 此分支属于只读分支,只能从 release或hotfix 分支合并过来,任何时候都不能在此分支修改代码。
  • 所有向master分支的推送,都要打上tag标签记录,方便追溯。
  • 此分支只能前进,不能有回退操作。
develop
  • develop 为开发环境主干分支,基于 master 分支检出。
  • 此分支为只读分支,只能从master、release、feature分支合并过来,任何时候都不能在此分支修改代码。
  • 此分支只能由开发人员提交 pull request(需要 code review),或者由管理员 merge release 分支。
  • 在一个 release 分支没有创建出来时,develop 分支不能合并不包含 release 功能范围的 feature 分支。develop 分支在特殊情况下可以回退版本。
release/*
  • release 分支为预上线分支,基于 develop 或 master 分支检出。用于准备发布新阶段版本或者修复线上bug版本。
  • 此分支用于上线前bug测试,文档生成和其他面向发布任务。
  • 此分支属于只读分支,只能由 master 分支或者 develop 分支检出,或者从 bugfix 分支、hotfix 分支合并过来,任何时候都不能在此分支修改代码。
  • 此分支属于临时分支,在发布提测阶段,会以 release 分支代码为基准提测。当 release 分支测试验证通过后,最终会先被合并到 master 分支(发布新版本或者修复线上bug,要打tag标签),再被合并到 develop 分支(使其与 master 分支保持一致),最后删除此分支。
  • 命名:release/<版本号>(例:release/1.0.0)
feature/*
  • feature 分支为功能开发分支,由开发人员基于 develop 分支创建 feature/<功能模块> 分支。
  • 此分支用于新功能开发,一个 feature 分支最大粒度只能到模块。
  • 此分支为临时分支,最终会被合并到 develop 分支(新增功能),或者删除(放弃功能)。
  • 此分支通常仅存在于开发人员本地存储库中,而不存在与远程origin。
  • 一个新功能开发完成后,且在开发集成环境自测通过、单元测试通过、Sona扫描通过后,才能向 develop 分支提交 pull request (需要 code review)。
bugfix/*
  • 预上线 bug 修复分支,基于 release 分支检出。
  • 此分支用于上线前bug修复。
  • 此分支属于临时分支,当提测阶段中存在 bug 需要修复,由开发人员基于 release 分支创建 bugfix/<bug名字> 分支,然后在 bugfix/<bug名字> 分支进行修复 bug 。 bug 修复完成后,并且在开发集成环境自测通过、单元测试通过、Sona扫描通过后,再向 release 分支提交 pull request 申请。bug修复完成 release 分支测试通过之后可删除此分支。
hotfix/*
  • 生产环境 bug 修复分支,基于 master 分支检出。
  • 属于临时分支,当生产环境出现 bug ,管理员基于 tag 创建 hotfix/<bug名字> 分支、 release/<版本号> 分支,由开发人员在hotfix分支修复bug,修复完成后,并且在开发集成环境自测通过、单元测试通过、Sona扫描通过后,再向 release 分支提交 pull request 申请。bug修复完成上线之后可删除此分支。
总结
  • 只有 master 和 develop 两个分支是主干分支,一直存在的。其他分支都是功能性分支,在完成历史使命后会,可以被删除。
  • master、develop 和 release/* 三种分支是被锁住的只读分支,只有管理员有权限修改。只能合并,不能直接提交,其他人想要合并只能提交 pull request。

3. 版本号

在前文很多地方都涉及到版本号,例如:release/* 分支、提交tags和发布Release版本等,版本号的命名也需要有一定的规范。

版本号(公开)

对外正式发布的版本号,一般只需要通过三位数字的版本号:主版本号.次版本号.修订号

  • 主版本号:做了一些不兼容的API修改,可以理解为一个大的产品更新。
  • 次版本号:新增了一些功能,可以理解为合并了一个feature。
  • 修订号:修复了一些bug,可以理解为合并了一个hotfix。
版本号(测试)

假如我们想要发布 1.0.1 版本,在开发团队内部,可能会提交多次的测试版本,交付给测试人员测试。这时候需要基于 1.0.1 版本号后面,加上一些其他的版本号。

  • alpha:内测版,一般不向外部发布,bug会比较多,功能也不全,一般只有测试人员使用。
  • beta:公测版,该版本仍然存在很多bug,但比alpha版本稳定一些。这个阶段版本还会不断增加新功能。
  • RC:发行候选版本,基本不再加入新的功能,主要修复bug。是最终发布成正式版的前一个版本,将bug修改完就可以发布成正式版了。
  • Release:正式发布版,官方推荐使用的版本。在正式发布的时候,可以不加Release,即 1.0.1 等同于 1.0.1-Release

可能基于这些版本号还有更细致的划分,这个可以根据实际项目情况调整。例如:v1.0.0-beta.1、v1.0.0-beta.2,或v1.0.0-200910-beta等。

查看原文

赞 1 收藏 0 评论 0

认证与成就

  • 获得 100 次点赞
  • 获得 2 枚徽章 获得 0 枚金徽章, 获得 0 枚银徽章, 获得 2 枚铜徽章

擅长技能
编辑

开源项目 & 著作
编辑

注册于 2018-07-24
个人主页被 2.6k 人浏览