ttqtc

ttqtc 查看完整档案

填写现居城市  |  填写毕业院校  |  填写所在公司/组织填写个人主网站
编辑
_ | |__ _ _ __ _ | '_ \| | | |/ _` | | |_) | |_| | (_| | |_.__/ \__,_|\__, | |___/ 该用户太懒什么也没留下

个人动态

ttqtc 提出了问题 · 9月16日

json内的字符串形式的json如何反序列化

在一个json中,有个字符串形式的值,但这个值("{b:1}")本身也是个json,如何在反序列化的时候,将这个字符串形式的值(即a)转为json对象

原始json对象

{a:"{b:1}"}

希望反序列化成的json对象

{a:{b: 1}}

关注 5 回答 4

ttqtc 提出了问题 · 9月14日

引入的JAR包内逻辑有错误,该如何适配?

在我的工程中引入了一个依赖的jar包,但是后来发现这个包内有段逻辑写的有问题,在不修改这个外部依赖的情况下,如何在本工程内覆盖或者修复相应逻辑呢?

关注 3 回答 2

ttqtc 提出了问题 · 4月14日

如何将一批数据同时置为无效?

问题描述

数据库中现在有很大一批数据,每天00:00执行一次定时任务,将一部分数据的状态字段写为无效,但是定时任务需要运行一段时间(例如五分钟才能全部执行完),也就是在定时任务刚开始执行时的数据是准时(00:00)被修改了,但是在排在这个任务中后面的数据就没办法准时(00:00)被修改了(后面的数据修改时间可能是00:05)

有什么办法能让这个任务中的数据,同时在00:00时候被修改呢?

关注 5 回答 3

ttqtc 收藏了文章 · 1月16日

真的,Kafka 入门一篇文章就够了

image.png

初识 Kafka

什么是 Kafka

Kafka 是由 Linkedin 公司开发的,它是一个分布式的,支持多分区、多副本,基于 Zookeeper 的分布式消息流平台,它同时也是一款开源的基于发布订阅模式的消息引擎系统

Kafka 的基本术语

消息:Kafka 中的数据单元被称为消息,也被称为记录,可以把它看作数据库表中某一行的记录。

批次:为了提高效率, 消息会分批次写入 Kafka,批次就代指的是一组消息。

主题:消息的种类称为 主题(Topic),可以说一个主题代表了一类消息。相当于是对消息进行分类。主题就像是数据库中的表。

分区:主题可以被分为若干个分区(partition),同一个主题中的分区可以不在一个机器上,有可能会部署在多个机器上,由此来实现 kafka 的伸缩性,单一主题中的分区有序,但是无法保证主题中所有的分区有序

image.png

生产者: 向主题发布消息的客户端应用程序称为生产者(Producer),生产者用于持续不断的向某个主题发送消息。

消费者:订阅主题消息的客户端程序称为消费者(Consumer),消费者用于处理生产者产生的消息。

消费者群组:生产者与消费者的关系就如同餐厅中的厨师和顾客之间的关系一样,一个厨师对应多个顾客,也就是一个生产者对应多个消费者,消费者群组(Consumer Group)指的就是由一个或多个消费者组成的群体。

image.png

偏移量:偏移量(Consumer Offset)是一种元数据,它是一个不断递增的整数值,用来记录消费者发生重平衡时的位置,以便用来恢复数据。

broker: 一个独立的 Kafka 服务器就被称为 broker,broker 接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。

broker 集群:broker 是集群 的组成部分,broker 集群由一个或多个 broker 组成,每个集群都有一个 broker 同时充当了集群控制器的角色(自动从集群的活跃成员中选举出来)。

副本:Kafka 中消息的备份又叫做 副本(Replica),副本的数量是可以配置的,Kafka 定义了两类副本:领导者副本(Leader Replica) 和 追随者副本(Follower Replica),前者对外提供服务,后者只是被动跟随。

重平衡:Rebalance。消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。Rebalance 是 Kafka 消费者端实现高可用的重要手段。

Kafka 的特性(设计原则)

  • 高吞吐、低延迟:kakfa 最大的特点就是收发消息非常快,kafka 每秒可以处理几十万条消息,它的最低延迟只有几毫秒。
  • 高伸缩性: 每个主题(topic) 包含多个分区(partition),主题中的分区可以分布在不同的主机(broker)中。
  • 持久性、可靠性: Kafka 能够允许数据的持久化存储,消息被持久化到磁盘,并支持数据备份防止数据丢失,Kafka 底层的数据存储是基于 Zookeeper 存储的,Zookeeper 我们知道它的数据能够持久存储。
  • 容错性: 允许集群中的节点失败,某个节点宕机,Kafka 集群能够正常工作
  • 高并发: 支持数千个客户端同时读写

Kafka 的使用场景

  • 活动跟踪:Kafka 可以用来跟踪用户行为,比如我们经常回去淘宝购物,你打开淘宝的那一刻,你的登陆信息,登陆次数都会作为消息传输到 Kafka ,当你浏览购物的时候,你的浏览信息,你的搜索指数,你的购物爱好都会作为一个个消息传递给 Kafka ,这样就可以生成报告,可以做智能推荐,购买喜好等。
  • 传递消息:Kafka 另外一个基本用途是传递消息,应用程序向用户发送通知就是通过传递消息来实现的,这些应用组件可以生成消息,而不需要关心消息的格式,也不需要关心消息是如何发送的。
  • 度量指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
  • 日志记录:Kafka 的基本概念来源于提交日志,比如我们可以把数据库的更新发送到 Kafka 上,用来记录数据库的更新时间,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。
  • 流式处理:流式处理是有一个能够提供多种应用程序的领域。
  • 限流削峰:Kafka 多用于互联网领域某一时刻请求特别多的情况下,可以把请求写入Kafka 中,避免直接请求后端程序导致服务崩溃。

Kafka 的消息队列

Kafka 的消息队列一般分为两种模式:点对点模式和发布订阅模式

Kafka 是支持消费者群组的,也就是说 Kafka 中会有一个或者多个消费者,如果一个生产者生产的消息由一个消费者进行消费的话,那么这种模式就是点对点模式

image.png

如果一个生产者或者多个生产者产生的消息能够被多个消费者同时消费的情况,这样的消息队列成为发布订阅模式的消息队列

image.png

Kafka 系统架构

image.png

如上图所示,一个典型的 Kafka 集群中包含若干Producer(可以是web前端产生的Page View,或者是服务器日志,系统CPU、Memory等),若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干Consumer Group,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance。Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。

核心 API

Kafka 有四个核心API,它们分别是

  • Producer API,它允许应用程序向一个或多个 topics 上发送消息记录
  • Consumer API,允许应用程序订阅一个或多个 topics 并处理为其生成的记录流
  • Streams API,它允许应用程序作为流处理器,从一个或多个主题中消费输入流并为其生成输出流,有效的将输入流转换为输出流。
  • Connector API,它允许构建和运行将 Kafka 主题连接到现有应用程序或数据系统的可用生产者和消费者。例如,关系数据库的连接器可能会捕获对表的所有更改

image.png

Kafka 为何如此之快

Kafka 实现了零拷贝原理来快速移动数据,避免了内核之间的切换。Kafka 可以将数据记录分批发送,从生产者到文件系统(Kafka 主题日志)到消费者,可以端到端的查看这些批次的数据。

批处理能够进行更有效的数据压缩并减少 I/O 延迟,Kafka 采取顺序写入磁盘的方式,避免了随机磁盘寻址的浪费,更多关于磁盘寻址的了解,请参阅 程序员需要了解的硬核知识之磁盘

总结一下其实就是四个要点

  • 顺序读写
  • 零拷贝
  • 消息压缩
  • 分批发送

Kafka 安装和重要配置

Kafka 安装我在 Kafka 系列第一篇应该比较详细了,详情见带你涨姿势的认识一下kafka 这篇文章。

那我们还是主要来说一下 Kafka 中的重要参数配置吧,这些参数对 Kafka 来说是非常重要的。

broker 端配置

  • broker.id

每个 kafka broker 都有一个唯一的标识来表示,这个唯一的标识符即是 broker.id,它的默认值是 0。这个值在 kafka 集群中必须是唯一的,这个值可以任意设定,

  • port

如果使用配置样本来启动 kafka,它会监听 9092 端口。修改 port 配置参数可以把它设置成任意的端口。要注意,如果使用 1024 以下的端口,需要使用 root 权限启动 kakfa。

  • zookeeper.connect

用于保存 broker 元数据的 Zookeeper 地址是通过 zookeeper.connect 来指定的。比如我可以这么指定 localhost:2181 表示这个 Zookeeper 是运行在本地 2181 端口上的。我们也可以通过 比如我们可以通过 zk1:2181,zk2:2181,zk3:2181 来指定 zookeeper.connect 的多个参数值。该配置参数是用冒号分割的一组 hostname:port/path 列表,其含义如下

hostname 是 Zookeeper 服务器的机器名或者 ip 地址。

port 是 Zookeeper 客户端的端口号

/path 是可选择的 Zookeeper 路径,Kafka 路径是使用了 chroot 环境,如果不指定默认使用跟路径。

如果你有两套 Kafka 集群,假设分别叫它们 kafka1 和 kafka2,那么两套集群的zookeeper.connect参数可以这样指定:zk1:2181,zk2:2181,zk3:2181/kafka1zk1:2181,zk2:2181,zk3:2181/kafka2
  • log.dirs

Kafka 把所有的消息都保存到磁盘上,存放这些日志片段的目录是通过 log.dirs 来制定的,它是用一组逗号来分割的本地系统路径,log.dirs 是没有默认值的,你必须手动指定他的默认值。其实还有一个参数是 log.dir,如你所知,这个配置是没有 s 的,默认情况下只用配置 log.dirs 就好了,比如你可以通过 /home/kafka1,/home/kafka2,/home/kafka3 这样来配置这个参数的值。

  • num.recovery.threads.per.data.dir

对于如下3种情况,Kafka 会使用可配置的线程池来处理日志片段。

服务器正常启动,用于打开每个分区的日志片段;

服务器崩溃后重启,用于检查和截断每个分区的日志片段;

服务器正常关闭,用于关闭日志片段。

默认情况下,每个日志目录只使用一个线程。因为这些线程只是在服务器启动和关闭时会用到,所以完全可以设置大量的线程来达到井行操作的目的。特别是对于包含大量分区的服务器来说,一旦发生崩愤,在进行恢复时使用井行操作可能会省下数小时的时间。设置此参数时需要注意,所配置的数字对应的是 log.dirs 指定的单个日志目录。也就是说,如果 num.recovery.threads.per.data.dir 被设为 8,并且 log.dir 指定了 3 个路径,那么总共需要 24 个线程。

  • auto.create.topics.enable

默认情况下,kafka 会使用三种方式来自动创建主题,下面是三种情况:

当一个生产者开始往主题写入消息时

当一个消费者开始从主题读取消息时

当任意一个客户端向主题发送元数据请求时

auto.create.topics.enable参数我建议最好设置成 false,即不允许自动创建 Topic。在我们的线上环境里面有很多名字稀奇古怪的 Topic,我想大概都是因为该参数被设置成了 true 的缘故。

主题默认配置

Kafka 为新创建的主题提供了很多默认配置参数,下面就来一起认识一下这些参数

  • num.partitions

num.partitions 参数指定了新创建的主题需要包含多少个分区。如果启用了主题自动创建功能(该功能是默认启用的),主题分区的个数就是该参数指定的值。该参数的默认值是 1。要注意,我们可以增加主题分区的个数,但不能减少分区的个数。

  • default.replication.factor

这个参数比较简单,它表示 kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务default.replication.factor 的默认值为1,这个参数在你启用了主题自动创建功能后有效。

  • log.retention.ms

Kafka 通常根据时间来决定数据可以保留多久。默认使用 log.retention.hours 参数来配置时间,默认是 168 个小时,也就是一周。除此之外,还有两个参数 log.retention.minutes 和 log.retentiion.ms 。这三个参数作用是一样的,都是决定消息多久以后被删除,推荐使用 log.retention.ms。

  • log.retention.bytes

另一种保留消息的方式是判断消息是否过期。它的值通过参数 log.retention.bytes 来指定,作用在每一个分区上。也就是说,如果有一个包含 8 个分区的主题,并且 log.retention.bytes 被设置为 1GB,那么这个主题最多可以保留 8GB 数据。所以,当主题的分区个数增加时,整个主题可以保留的数据也随之增加。

  • log.segment.bytes

上述的日志都是作用在日志片段上,而不是作用在单个消息上。当消息到达 broker 时,它们被追加到分区的当前日志片段上,当日志片段大小到达 log.segment.bytes 指定上限(默认为 1GB)时,当前日志片段就会被关闭,一个新的日志片段被打开。如果一个日志片段被关闭,就开始等待过期。这个参数的值越小,就越会频繁的关闭和分配新文件,从而降低磁盘写入的整体效率。

  • log.segment.ms

上面提到日志片段经关闭后需等待过期,那么 log.segment.ms 这个参数就是指定日志多长时间被关闭的参数和,log.segment.ms 和 log.retention.bytes 也不存在互斥问题。日志片段会在大小或时间到达上限时被关闭,就看哪个条件先得到满足。

  • message.max.bytes

broker 通过设置 message.max.bytes 参数来限制单个消息的大小,默认是 1000 000, 也就是 1MB,如果生产者尝试发送的消息超过这个大小,不仅消息不会被接收,还会收到 broker 返回的错误消息。跟其他与字节相关的配置参数一样,该参数指的是压缩后的消息大小,也就是说,只要压缩后的消息小于 mesage.max.bytes,那么消息的实际大小可以大于这个值

这个值对性能有显著的影响。值越大,那么负责处理网络连接和请求的线程就需要花越多的时间来处理这些请求。它还会增加磁盘写入块的大小,从而影响 IO 吞吐量。

  • retention.ms

规定了该主题消息被保存的时常,默认是7天,即该主题只能保存7天的消息,一旦设置了这个值,它会覆盖掉 Broker 端的全局参数值。

  • retention.bytes

retention.bytes:规定了要为该 Topic 预留多大的磁盘空间。和全局参数作用相似,这个值通常在多租户的 Kafka 集群中会有用武之地。当前默认值是 -1,表示可以无限使用磁盘空间。

JVM 参数配置

JDK 版本一般推荐直接使用 JDK1.8,这个版本也是现在中国大部分程序员的首选版本。

说到 JVM 端设置,就绕不开这个话题,业界最推崇的一种设置方式就是直接将 JVM 堆大小设置为 6GB,这样会避免很多 Bug 出现。

JVM 端配置的另一个重要参数就是垃圾回收器的设置,也就是平时常说的 GC 设置。如果你依然在使用 Java 7,那么可以根据以下法则选择合适的垃圾回收器:

  • 如果 Broker 所在机器的 CPU 资源非常充裕,建议使用 CMS 收集器。启用方法是指定-XX:+UseCurrentMarkSweepGC
  • 否则,使用吞吐量收集器。开启方法是指定-XX:+UseParallelGC

当然了,如果你已经在使用 Java 8 了,那么就用默认的 G1 收集器就好了。在没有任何调优的情况下,G1 表现得要比 CMS 出色,主要体现在更少的 Full GC,需要调整的参数更少等,所以使用 G1 就好了。

一般 G1 的调整只需要这两个参数即可

  • MaxGCPauseMillis

该参数指定每次垃圾回收默认的停顿时间。该值不是固定的,G1可以根据需要使用更长的时间。它的默认值是 200ms,也就是说,每一轮垃圾回收大概需要200 ms 的时间。

  • InitiatingHeapOccupancyPercent

该参数指定了 G1 启动新一轮垃圾回收之前可以使用的堆内存百分比,默认值是45,这就表明G1在堆使用率到达45之前不会启用垃圾回收。这个百分比包括新生代和老年代。

Kafka Producer

在 Kafka 中,我们把产生消息的那一方称为生产者,比如我们经常回去淘宝购物,你打开淘宝的那一刻,你的登陆信息,登陆次数都会作为消息传输到 Kafka 后台,当你浏览购物的时候,你的浏览信息,你的搜索指数,你的购物爱好都会作为一个个消息传递给 Kafka 后台,然后淘宝会根据你的爱好做智能推荐,致使你的钱包从来都禁不住诱惑,那么这些生产者产生的消息是怎么传到 Kafka 应用程序的呢?发送过程是怎么样的呢?

尽管消息的产生非常简单,但是消息的发送过程还是比较复杂的,如图

我们从创建一个ProducerRecord 对象开始,ProducerRecord 是 Kafka 中的一个核心类,它代表了一组 Kafka 需要发送的 key/value 键值对,它由记录要发送到的主题名称(Topic Name),可选的分区号(Partition Number)以及可选的键值对构成。

在发送 ProducerRecord 时,我们需要将键值对对象由序列化器转换为字节数组,这样它们才能够在网络上传输。然后消息到达了分区器。

如果发送过程中指定了有效的分区号,那么在发送记录时将使用该分区。如果发送过程中未指定分区,则将使用key 的 hash 函数映射指定一个分区。如果发送的过程中既没有分区号也没有,则将以循环的方式分配一个分区。选好分区后,生产者就知道向哪个主题和分区发送数据了。

ProducerRecord 还有关联的时间戳,如果用户没有提供时间戳,那么生产者将会在记录中使用当前的时间作为时间戳。Kafka 最终使用的时间戳取决于 topic 主题配置的时间戳类型。

  • 如果将主题配置为使用 CreateTime,则生产者记录中的时间戳将由 broker 使用。
  • 如果将主题配置为使用LogAppendTime,则生产者记录中的时间戳在将消息添加到其日志中时,将由 broker 重写。

然后,这条消息被存放在一个记录批次里,这个批次里的所有消息会被发送到相同的主题和分区上。由一个独立的线程负责把它们发到 Kafka Broker 上。

Kafka Broker 在收到消息时会返回一个响应,如果写入成功,会返回一个 RecordMetaData 对象,它包含了主题和分区信息,以及记录在分区里的偏移量,上面两种的时间戳类型也会返回给用户。如果写入失败,会返回一个错误。生产者在收到错误之后会尝试重新发送消息,几次之后如果还是失败的话,就返回错误消息。

创建 Kafka 生产者

要向 Kafka 写入消息,首先需要创建一个生产者对象,并设置一些属性。Kafka 生产者有3个必选的属性

  • bootstrap.servers

该属性指定 broker 的地址清单,地址的格式为 host:port。清单里不需要包含所有的 broker 地址,生产者会从给定的 broker 里查找到其他的 broker 信息。不过建议至少要提供两个 broker 信息,一旦其中一个宕机,生产者仍然能够连接到集群上。

  • key.serializer

broker 需要接收到序列化之后的 key/value 值,所以生产者发送的消息需要经过序列化之后才传递给 Kafka Broker。生产者需要知道采用何种方式把 Java 对象转换为字节数组。key.serializer 必须被设置为一个实现了org.apache.kafka.common.serialization.Serializer 接口的类,生产者会使用这个类把键对象序列化为字节数组。这里拓展一下 Serializer 类

Serializer 是一个接口,它表示类将会采用何种方式序列化,它的作用是把对象转换为字节,实现了 Serializer 接口的类主要有 ByteArraySerializerStringSerializerIntegerSerializer ,其中 ByteArraySerialize 是 Kafka 默认使用的序列化器,其他的序列化器还有很多,你可以通过 这里 查看其他序列化器。要注意的一点:key.serializer 是必须要设置的,即使你打算只发送值的内容

  • value.serializer

与 key.serializer 一样,value.serializer 指定的类会将值序列化。

下面代码演示了如何创建一个 Kafka 生产者,这里只指定了必要的属性,其他使用默认的配置

private Properties properties = new Properties();
properties.put("bootstrap.servers","broker1:9092,broker2:9092");
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties = new KafkaProducer<String,String>(properties);

来解释一下这段代码

  • 首先创建了一个 Properties 对象
  • 使用 StringSerializer 序列化器序列化 key / value 键值对
  • 在这里我们创建了一个新的生产者对象,并为键值设置了恰当的类型,然后把 Properties 对象传递给他。

Kafka 消息发送

实例化生产者对象后,接下来就可以开始发送消息了,发送消息主要由下面几种方式

简单消息发送

Kafka 最简单的消息发送如下:

ProducerRecord<String,String> record =
                new ProducerRecord<String, String>("CustomerCountry","West","France");

producer.send(record);

代码中生产者(producer)的 send() 方法需要把 ProducerRecord 的对象作为参数进行发送,ProducerRecord 有很多构造函数,这个我们下面讨论,这里调用的是

public ProducerRecord(String topic, K key, V value) {}

这个构造函数,需要传递的是 topic主题,key 和 value。

把对应的参数传递完成后,生产者调用 send() 方法发送消息(ProducerRecord对象)。我们可以从生产者的架构图中看出,消息是先被写入分区中的缓冲区中,然后分批次发送给 Kafka Broker。

发送成功后,send() 方法会返回一个 Future(java.util.concurrent) 对象,Future 对象的类型是 RecordMetadata 类型,我们上面这段代码没有考虑返回值,所以没有生成对应的 Future 对象,所以没有办法知道消息是否发送成功。如果不是很重要的信息或者对结果不会产生影响的信息,可以使用这种方式进行发送。

我们可以忽略发送消息时可能发生的错误或者在服务器端可能发生的错误,但在消息发送之前,生产者还可能发生其他的异常。这些异常有可能是 SerializationException(序列化失败)BufferedExhaustedException 或 TimeoutException(说明缓冲区已满),又或是 InterruptedException(说明发送线程被中断)

同步发送消息

第二种消息发送机制如下所示

ProducerRecord<String,String> record =
                new ProducerRecord<String, String>("CustomerCountry","West","France");

try{
  RecordMetadata recordMetadata = producer.send(record).get();
}catch(Exception e){
  e.printStackTrace();
}

这种发送消息的方式较上面的发送方式有了改进,首先调用 send() 方法,然后再调用 get() 方法等待 Kafka 响应。如果服务器返回错误,get() 方法会抛出异常,如果没有发生错误,我们会得到 RecordMetadata 对象,可以用它来查看消息记录。

生产者(KafkaProducer)在发送的过程中会出现两类错误:其中一类是重试错误,这类错误可以通过重发消息来解决。比如连接的错误,可以通过再次建立连接来解决;无错误则可以通过重新为分区选举首领来解决。KafkaProducer 被配置为自动重试,如果多次重试后仍无法解决问题,则会抛出重试异常。另一类错误是无法通过重试来解决的,比如消息过大对于这类错误,KafkaProducer 不会进行重试,直接抛出异常。

异步发送消息

同步发送消息都有个问题,那就是同一时间只能有一个消息在发送,这会造成许多消息无法直接发送,造成消息滞后,无法发挥效益最大化。

比如消息在应用程序和 Kafka 集群之间一个来回需要 10ms。如果发送完每个消息后都等待响应的话,那么发送100个消息需要 1 秒,但是如果是异步方式的话,发送 100 条消息所需要的时间就会少很多很多。大多数时候,虽然Kafka 会返回 RecordMetadata 消息,但是我们并不需要等待响应。

为了在异步发送消息的同时能够对异常情况进行处理,生产者提供了回掉支持。下面是回调的一个例子

ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("CustomerCountry", "Huston", "America");
        producer.send(producerRecord,new DemoProducerCallBack());


class DemoProducerCallBack implements Callback {

  public void onCompletion(RecordMetadata metadata, Exception exception) {
    if(exception != null){
      exception.printStackTrace();;
    }
  }
}

首先实现回调需要定义一个实现了org.apache.kafka.clients.producer.Callback的类,这个接口只有一个 onCompletion方法。如果 kafka 返回一个错误,onCompletion 方法会抛出一个非空(non null)异常,这里我们只是简单的把它打印出来,如果是生产环境需要更详细的处理,然后在 send() 方法发送的时候传递一个 Callback 回调的对象。

生产者分区机制

Kafka 对于数据的读写是以分区为粒度的,分区可以分布在多个主机(Broker)中,这样每个节点能够实现独立的数据写入和读取,并且能够通过增加新的节点来增加 Kafka 集群的吞吐量,通过分区部署在多个 Broker 来实现负载均衡的效果。

上面我们介绍了生产者的发送方式有三种:不管结果如何直接发送发送并返回结果发送并回调。由于消息是存在主题(topic)的分区(partition)中的,所以当 Producer 生产者发送产生一条消息发给 topic 的时候,你如何判断这条消息会存在哪个分区中呢?

这其实就设计到 Kafka 的分区机制了。

分区策略

Kafka 的分区策略指的就是将生产者发送到哪个分区的算法。Kafka 为我们提供了默认的分区策略,同时它也支持你自定义分区策略。

如果要自定义分区策略的话,你需要显示配置生产者端的参数 Partitioner.class,我们可以看一下这个类它位于 org.apache.kafka.clients.producer 包下

public interface Partitioner extends Configurable, Closeable {
  
  public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

  public void close();
  
  default public void onNewBatch(String topic, Cluster cluster, int prevPartition) {}
}

Partitioner 类有三个方法,分别来解释一下

  • partition(): 这个类有几个参数: topic,表示需要传递的主题;key 表示消息中的键值;keyBytes表示分区中序列化过后的key,byte数组的形式传递;value 表示消息的 value 值;valueBytes 表示分区中序列化后的值数组;cluster表示当前集群的原数据。Kafka 给你这么多信息,就是希望让你能够充分地利用这些信息对消息进行分区,计算出它要被发送到哪个分区中。
  • close() : 继承了 Closeable 接口能够实现 close() 方法,在分区关闭时调用。
  • onNewBatch(): 表示通知分区程序用来创建新的批次

其中与分区策略息息相关的就是 partition() 方法了,分区策略有下面这几种

顺序轮询

顺序分配,消息是均匀的分配给每个 partition,即每个分区存储一次消息。就像下面这样

image.png

上图表示的就是轮询策略,轮训策略是 Kafka Producer 提供的默认策略,如果你不使用指定的轮训策略的话,Kafka 默认会使用顺序轮训策略的方式。

随机轮询

随机轮询简而言之就是随机的向 partition 中保存消息,如下图所示

实现随机分配的代码只需要两行,如下

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size());

先计算出该主题总的分区数,然后随机地返回一个小于它的正整数。

本质上看随机策略也是力求将数据均匀地打散到各个分区,但从实际表现来看,它要逊于轮询策略,所以如果追求数据的均匀分布,还是使用轮询策略比较好。事实上,随机策略是老版本生产者使用的分区策略,在新版本中已经改为轮询了。

按照 key 进行消息保存

这个策略也叫做 key-ordering 策略,Kafka 中每条消息都会有自己的key,一旦消息被定义了 Key,那么你就可以保证同一个 Key 的所有消息都进入到相同的分区里面,由于每个分区下的消息处理都是有顺序的,故这个策略被称为按消息键保序策略,如下图所示

实现这个策略的 partition 方法同样简单,只需要下面两行代码即可:

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return Math.abs(key.hashCode()) % partitions.size();

上面这几种分区策略都是比较基础的策略,除此之外,你还可以自定义分区策略。

生产者压缩机制

压缩一词简单来讲就是一种互换思想,它是一种经典的用 CPU 时间去换磁盘空间或者 I/O 传输量的思想,希望以较小的 CPU 开销带来更少的磁盘占用或更少的网络 I/O 传输。如果你还不了解的话我希望你先读完这篇文章 程序员需要了解的硬核知识之压缩算法,然后你就明白压缩是怎么回事了。

Kafka 压缩是什么

Kafka 的消息分为两层:消息集合 和 消息。一个消息集合中包含若干条日志项,而日志项才是真正封装消息的地方。Kafka 底层的消息日志由一系列消息集合日志项组成。Kafka 通常不会直接操作具体的一条条消息,它总是在消息集合这个层面上进行写入操作。

在 Kafka 中,压缩会发生在两个地方:Kafka Producer 和 Kafka Consumer,为什么启用压缩?说白了就是消息太大,需要变小一点 来使消息发的更快一些。

Kafka Producer 中使用 compression.type 来开启压缩

private Properties properties = new Properties();
properties.put("bootstrap.servers","192.168.1.9:9092");
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("compression.type", "gzip");

Producer<String,String> producer = new KafkaProducer<String, String>(properties);

ProducerRecord<String,String> record =
  new ProducerRecord<String, String>("CustomerCountry","Precision Products","France");

上面代码表明该 Producer 的压缩算法使用的是 GZIP

有压缩必有解压缩,Producer 使用压缩算法压缩消息后并发送给服务器后,由 Consumer 消费者进行解压缩,因为采用的何种压缩算法是随着 key、value 一起发送过去的,所以消费者知道采用何种压缩算法。

Kafka 重要参数配置

在上一篇文章 带你涨姿势的认识一下kafka中,我们主要介绍了一下 kafka 集群搭建的参数,本篇文章我们来介绍一下 Kafka 生产者重要的配置,生产者有很多可配置的参数,在文档里(http://kafka.apache.org/docum...)都有说明,我们介绍几个在内存使用、性能和可靠性方面对生产者影响比较大的参数进行说明

key.serializer

用于 key 键的序列化,它实现了 org.apache.kafka.common.serialization.Serializer 接口

value.serializer

用于 value 值的序列化,实现了 org.apache.kafka.common.serialization.Serializer 接口

acks

acks 参数指定了要有多少个分区副本接收消息,生产者才认为消息是写入成功的。此参数对消息丢失的影响较大

  • 如果 acks = 0,就表示生产者也不知道自己产生的消息是否被服务器接收了,它才知道它写成功了。如果发送的途中产生了错误,生产者也不知道,它也比较懵逼,因为没有返回任何消息。这就类似于 UDP 的运输层协议,只管发,服务器接受不接受它也不关心。
  • 如果 acks = 1,只要集群的 Leader 接收到消息,就会给生产者返回一条消息,告诉它写入成功。如果发送途中造成了网络异常或者 Leader 还没选举出来等其他情况导致消息写入失败,生产者会受到错误消息,这时候生产者往往会再次重发数据。因为消息的发送也分为 同步异步,Kafka 为了保证消息的高效传输会决定是同步发送还是异步发送。如果让客户端等待服务器的响应(通过调用 Future 中的 get() 方法),显然会增加延迟,如果客户端使用回调,就会解决这个问题。
  • 如果 acks = all,这种情况下是只有当所有参与复制的节点都收到消息时,生产者才会接收到一个来自服务器的消息。不过,它的延迟比 acks =1 时更高,因为我们要等待不只一个服务器节点接收消息。

buffer.memory

此参数用来设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。如果应用程序发送消息的速度超过发送到服务器的速度,会导致生产者空间不足。这个时候,send() 方法调用要么被阻塞,要么抛出异常,具体取决于 block.on.buffer.null 参数的设置。

compression.type

此参数来表示生产者启用何种压缩算法,默认情况下,消息发送时不会被压缩。该参数可以设置为 snappy、gzip 和 lz4,它指定了消息发送给 broker 之前使用哪一种压缩算法进行压缩。下面是各压缩算法的对比

retries

生产者从服务器收到的错误有可能是临时性的错误(比如分区找不到首领),在这种情况下,reteis 参数的值决定了生产者可以重发的消息次数,如果达到这个次数,生产者会放弃重试并返回错误。默认情况下,生产者在每次重试之间等待 100ms,这个等待参数可以通过 retry.backoff.ms 进行修改。

batch.size

当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。当批次被填满,批次里的所有消息会被发送出去。不过生产者井不一定都会等到批次被填满才发送,任意条数的消息都可能被发送。

client.id

此参数可以是任意的字符串,服务器会用它来识别消息的来源,一般配置在日志里

max.in.flight.requests.per.connection

此参数指定了生产者在收到服务器响应之前可以发送多少消息,它的值越高,就会占用越多的内存,不过也会提高吞吐量。把它设为1 可以保证消息是按照发送的顺序写入服务器。

timeout.ms、request.timeout.ms 和 metadata.fetch.timeout.ms

request.timeout.ms 指定了生产者在发送数据时等待服务器返回的响应时间,metadata.fetch.timeout.ms 指定了生产者在获取元数据(比如目标分区的首领是谁)时等待服务器返回响应的时间。如果等待时间超时,生产者要么重试发送数据,要么返回一个错误。timeout.ms 指定了 broker 等待同步副本返回消息确认的时间,与 asks 的配置相匹配----如果在指定时间内没有收到同步副本的确认,那么 broker 就会返回一个错误。

max.block.ms

此参数指定了在调用 send() 方法或使用 partitionFor() 方法获取元数据时生产者的阻塞时间当生产者的发送缓冲区已捕,或者没有可用的元数据时,这些方法就会阻塞。在阻塞时间达到 max.block.ms 时,生产者会抛出超时异常。

max.request.size

该参数用于控制生产者发送的请求大小。它可以指能发送的单个消息的最大值,也可以指单个请求里所有消息的总大小。

receive.buffer.bytes 和 send.buffer.bytes

Kafka 是基于 TCP 实现的,为了保证可靠的消息传输,这两个参数分别指定了 TCP Socket 接收和发送数据包的缓冲区的大小。如果它们被设置为 -1,就使用操作系统的默认值。如果生产者或消费者与 broker 处于不同的数据中心,那么可以适当增大这些值。

Kafka Consumer

应用程序使用 KafkaConsumer 从 Kafka 中订阅主题并接收来自这些主题的消息,然后再把他们保存起来。应用程序首先需要创建一个 KafkaConsumer 对象,订阅主题并开始接受消息,验证消息并保存结果。一段时间后,生产者往主题写入的速度超过了应用程序验证数据的速度,这时候该如何处理?如果只使用单个消费者的话,应用程序会跟不上消息生成的速度,就像多个生产者像相同的主题写入消息一样,这时候就需要多个消费者共同参与消费主题中的消息,对消息进行分流处理。

Kafka 消费者从属于消费者群组。一个群组中的消费者订阅的都是相同的主题,每个消费者接收主题一部分分区的消息。下面是一个 Kafka 分区消费示意图

image.png

上图中的主题 T1 有四个分区,分别是分区0、分区1、分区2、分区3,我们创建一个消费者群组1,消费者群组中只有一个消费者,它订阅主题T1,接收到 T1 中的全部消息。由于一个消费者处理四个生产者发送到分区的消息,压力有些大,需要帮手来帮忙分担任务,于是就演变为下图

image.png

这样一来,消费者的消费能力就大大提高了,但是在某些环境下比如用户产生消息特别多的时候,生产者产生的消息仍旧让消费者吃不消,那就继续增加消费者。

image.png

如上图所示,每个分区所产生的消息能够被每个消费者群组中的消费者消费,如果向消费者群组中增加更多的消费者,那么多余的消费者将会闲置,如下图所示

image.png

向群组中增加消费者是横向伸缩消费能力的主要方式。总而言之,我们可以通过增加消费组的消费者来进行水平扩展提升消费能力。这也是为什么建议创建主题时使用比较多的分区数,这样可以在消费负载高的情况下增加消费者来提升性能。另外,消费者的数量不应该比分区数多,因为多出来的消费者是空闲的,没有任何帮助。

Kafka 一个很重要的特性就是,只需写入一次消息,可以支持任意多的应用读取这个消息。换句话说,每个应用都可以读到全量的消息。为了使得每个应用都能读到全量消息,应用需要有不同的消费组。对于上面的例子,假如我们新增了一个新的消费组 G2,而这个消费组有两个消费者,那么就演变为下图这样

image.png

在这个场景中,消费组 G1 和消费组 G2 都能收到 T1 主题的全量消息,在逻辑意义上来说它们属于不同的应用。

总结起来就是如果应用需要读取全量消息,那么请为该应用设置一个消费组;如果该应用消费能力不足,那么可以考虑在这个消费组里增加消费者

消费者组和分区重平衡

消费者组是什么

消费者组(Consumer Group)是由一个或多个消费者实例(Consumer Instance)组成的群组,具有可扩展性和可容错性的一种机制。消费者组内的消费者共享一个消费者组ID,这个ID 也叫做 Group ID,组内的消费者共同对一个主题进行订阅和消费,同一个组中的消费者只能消费一个分区的消息,多余的消费者会闲置,派不上用场。

我们在上面提到了两种消费方式

  • 一个消费者群组消费一个主题中的消息,这种消费模式又称为点对点的消费方式,点对点的消费方式又被称为消息队列
  • 一个主题中的消息被多个消费者群组共同消费,这种消费模式又称为发布-订阅模式

消费者重平衡

我们从上面的消费者演变图中可以知道这么一个过程:最初是一个消费者订阅一个主题并消费其全部分区的消息,后来有一个消费者加入群组,随后又有更多的消费者加入群组,而新加入的消费者实例分摊了最初消费者的部分消息,这种把分区的所有权通过一个消费者转到其他消费者的行为称为重平衡,英文名也叫做 Rebalance 。如下图所示

image.png

重平衡非常重要,它为消费者群组带来了高可用性伸缩性,我们可以放心的添加消费者或移除消费者,不过在正常情况下我们并不希望发生这样的行为。在重平衡期间,消费者无法读取消息,造成整个消费者组在重平衡的期间都不可用。另外,当分区被重新分配给另一个消费者时,消息当前的读取状态会丢失,它有可能还需要去刷新缓存,在它重新恢复状态之前会拖慢应用程序。

消费者通过向组织协调者(Kafka Broker)发送心跳来维护自己是消费者组的一员并确认其拥有的分区。对于不同不的消费群体来说,其组织协调者可以是不同的。只要消费者定期发送心跳,就会认为消费者是存活的并处理其分区中的消息。当消费者检索记录或者提交它所消费的记录时就会发送心跳。

如果过了一段时间 Kafka 停止发送心跳了,会话(Session)就会过期,组织协调者就会认为这个 Consumer 已经死亡,就会触发一次重平衡。如果消费者宕机并且停止发送消息,组织协调者会等待几秒钟,确认它死亡了才会触发重平衡。在这段时间里,死亡的消费者将不处理任何消息。在清理消费者时,消费者将通知协调者它要离开群组,组织协调者会触发一次重平衡,尽量降低处理停顿。

重平衡是一把双刃剑,它为消费者群组带来高可用性和伸缩性的同时,还有有一些明显的缺点(bug),而这些 bug 到现在社区还无法修改。

重平衡的过程对消费者组有极大的影响。因为每次重平衡过程中都会导致万物静止,参考 JVM 中的垃圾回收机制,也就是 Stop The World ,STW,(引用自《深入理解 Java 虚拟机》中 p76 关于 Serial 收集器的描述):

更重要的是它在进行垃圾收集时,必须暂停其他所有的工作线程。直到它收集结束。Stop The World 这个名字听起来很帅,但这项工作实际上是由虚拟机在后台自动发起并完成的,在用户不可见的情况下把用户正常工作的线程全部停掉,这对很多应用来说都是难以接受的。

也就是说,在重平衡期间,消费者组中的消费者实例都会停止消费,等待重平衡的完成。而且重平衡这个过程很慢......

创建消费者

上面的理论说的有点多,下面就通过代码来讲解一下消费者是如何消费的

在读取消息之前,需要先创建一个 KafkaConsumer 对象。创建 KafkaConsumer 对象与创建 KafkaProducer 对象十分相似 --- 把需要传递给消费者的属性放在 properties 对象中,后面我们会着重讨论 Kafka 的一些配置,这里我们先简单的创建一下,使用3个属性就足矣,分别是 bootstrap.serverkey.deserializervalue.deserializer

这三个属性我们已经用过很多次了,如果你还不是很清楚的话,可以参考 带你涨姿势是认识一下Kafka Producer

还有一个属性是 group.id 这个属性不是必须的,它指定了 KafkaConsumer 是属于哪个消费者群组。创建不属于任何一个群组的消费者也是可以的

Properties properties = new Properties();
        properties.put("bootstrap.server","192.168.1.9:9092");     properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");   properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);

主题订阅

创建好消费者之后,下一步就开始订阅主题了。subscribe() 方法接受一个主题列表作为参数,使用起来比较简单

consumer.subscribe(Collections.singletonList("customerTopic"));

为了简单我们只订阅了一个主题 customerTopic,参数传入的是一个正则表达式,正则表达式可以匹配多个主题,如果有人创建了新的主题,并且主题的名字与正则表达式相匹配,那么会立即触发一次重平衡,消费者就可以读取新的主题。

要订阅所有与 test 相关的主题,可以这样做

consumer.subscribe("test.*");

轮询

我们知道,Kafka 是支持订阅/发布模式的,生产者发送数据给 Kafka Broker,那么消费者是如何知道生产者发送了数据呢?其实生产者产生的数据消费者是不知道的,KafkaConsumer 采用轮询的方式定期去 Kafka Broker 中进行数据的检索,如果有数据就用来消费,如果没有就再继续轮询等待,下面是轮询等待的具体实现

try {
  while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(100));
    for (ConsumerRecord<String, String> record : records) {
      int updateCount = 1;
      if (map.containsKey(record.value())) {
        updateCount = (int) map.get(record.value() + 1);
      }
      map.put(record.value(), updateCount);
    }
  }
}finally {
  consumer.close();
}
  • 这是一个无限循环。消费者实际上是一个长期运行的应用程序,它通过轮询的方式向 Kafka 请求数据。
  • 第三行代码非常重要,Kafka 必须定期循环请求数据,否则就会认为该 Consumer 已经挂了,会触发重平衡,它的分区会移交给群组中的其它消费者。传给 poll() 方法的是一个超市时间,用 java.time.Duration 类来表示,如果该参数被设置为 0 ,poll() 方法会立刻返回,否则就会在指定的毫秒数内一直等待 broker 返回数据。
  • poll() 方法会返回一个记录列表。每条记录都包含了记录所属主题的信息,记录所在分区的信息、记录在分区中的偏移量,以及记录的键值对。我们一般会遍历这个列表,逐条处理每条记录。
  • 在退出应用程序之前使用 close() 方法关闭消费者。网络连接和 socket 也会随之关闭,并立即触发一次重平衡,而不是等待群组协调器发现它不再发送心跳并认定它已经死亡。
线程安全性

在同一个群组中,我们无法让一个线程运行多个消费者,也无法让多个线程安全的共享一个消费者。按照规则,一个消费者使用一个线程,如果一个消费者群组中多个消费者都想要运行的话,那么必须让每个消费者在自己的线程中运行,可以使用 Java 中的 ExecutorService 启动多个消费者进行进行处理。

消费者配置

到目前为止,我们学习了如何使用消费者 API,不过只介绍了几个最基本的属性,Kafka 文档列出了所有与消费者相关的配置说明。大部分参数都有合理的默认值,一般不需要修改它们,下面我们就来介绍一下这些参数。

  • fetch.min.bytes

该属性指定了消费者从服务器获取记录的最小字节数。broker 在收到消费者的数据请求时,如果可用的数据量小于 fetch.min.bytes 指定的大小,那么它会等到有足够的可用数据时才把它返回给消费者。这样可以降低消费者和 broker 的工作负载,因为它们在主题使用频率不是很高的时候就不用来回处理消息。如果没有很多可用数据,但消费者的 CPU 使用率很高,那么就需要把该属性的值设得比默认值大。如果消费者的数量比较多,把该属性的值调大可以降低 broker 的工作负载。

  • fetch.max.wait.ms

我们通过上面的 fetch.min.bytes 告诉 Kafka,等到有足够的数据时才会把它返回给消费者。而 fetch.max.wait.ms 则用于指定 broker 的等待时间,默认是 500 毫秒。如果没有足够的数据流入 kafka 的话,消费者获取的最小数据量要求就得不到满足,最终导致 500 毫秒的延迟。如果要降低潜在的延迟,就可以把参数值设置的小一些。如果 fetch.max.wait.ms 被设置为 100 毫秒的延迟,而 fetch.min.bytes 的值设置为 1MB,那么 Kafka 在收到消费者请求后,要么返回 1MB 的数据,要么在 100 ms 后返回所有可用的数据。就看哪个条件首先被满足。

  • max.partition.fetch.bytes

该属性指定了服务器从每个分区里返回给消费者的最大字节数。它的默认值时 1MB,也就是说,KafkaConsumer.poll() 方法从每个分区里返回的记录最多不超过 max.partition.fetch.bytes 指定的字节。如果一个主题有20个分区和5个消费者,那么每个消费者需要至少4 MB的可用内存来接收记录。在为消费者分配内存时,可以给它们多分配一些,因为如果群组里有消费者发生崩溃,剩下的消费者需要处理更多的分区。max.partition.fetch.bytes 的值必须比 broker 能够接收的最大消息的字节数(通过 max.message.size 属性配置大),否则消费者可能无法读取这些消息,导致消费者一直挂起重试。 在设置该属性时,另外一个考量的因素是消费者处理数据的时间。消费者需要频繁的调用 poll() 方法来避免会话过期和发生分区再平衡,如果单次调用poll() 返回的数据太多,消费者需要更多的时间进行处理,可能无法及时进行下一个轮询来避免会话过期。如果出现这种情况,可以把 max.partition.fetch.bytes 值改小,或者延长会话过期时间。

  • session.timeout.ms

这个属性指定了消费者在被认为死亡之前可以与服务器断开连接的时间,默认是 3s。如果消费者没有在 session.timeout.ms 指定的时间内发送心跳给群组协调器,就会被认定为死亡,协调器就会触发重平衡。把它的分区分配给消费者群组中的其它消费者,此属性与 heartbeat.interval.ms 紧密相关。heartbeat.interval.ms 指定了 poll() 方法向群组协调器发送心跳的频率,session.timeout.ms 则指定了消费者可以多久不发送心跳。所以,这两个属性一般需要同时修改,heartbeat.interval.ms 必须比 session.timeout.ms 小,一般是 session.timeout.ms 的三分之一。如果 session.timeout.ms 是 3s,那么 heartbeat.interval.ms 应该是 1s。把 session.timeout.ms 值设置的比默认值小,可以更快地检测和恢复崩愤的节点,不过长时间的轮询或垃圾收集可能导致非预期的重平衡。把该属性的值设置得大一些,可以减少意外的重平衡,不过检测节点崩溃需要更长的时间。

  • auto.offset.reset

该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下的该如何处理。它的默认值是 latest,意思指的是,在偏移量无效的情况下,消费者将从最新的记录开始读取数据。另一个值是 earliest,意思指的是在偏移量无效的情况下,消费者将从起始位置处开始读取分区的记录。

  • enable.auto.commit

我们稍后将介绍几种不同的提交偏移量的方式。该属性指定了消费者是否自动提交偏移量,默认值是 true,为了尽量避免出现重复数据和数据丢失,可以把它设置为 false,由自己控制何时提交偏移量。如果把它设置为 true,还可以通过 auto.commit.interval.ms 属性来控制提交的频率

  • partition.assignment.strategy

我们知道,分区会分配给群组中的消费者。PartitionAssignor 会根据给定的消费者和主题,决定哪些分区应该被分配给哪个消费者,Kafka 有两个默认的分配策略RangeRoundRobin

  • client.id

该属性可以是任意字符串,broker 用他来标识从客户端发送过来的消息,通常被用在日志、度量指标和配额中

  • max.poll.records

该属性用于控制单次调用 call() 方法能够返回的记录数量,可以帮你控制在轮询中需要处理的数据量。

  • receive.buffer.bytes 和 send.buffer.bytes

socket 在读写数据时用到的 TCP 缓冲区也可以设置大小。如果它们被设置为 -1,就使用操作系统默认值。如果生产者或消费者与 broker 处于不同的数据中心内,可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。

提交和偏移量的概念

特殊偏移

我们上面提到,消费者在每次调用poll() 方法进行定时轮询的时候,会返回由生产者写入 Kafka 但是还没有被消费者消费的记录,因此我们可以追踪到哪些记录是被群组里的哪个消费者读取的。消费者可以使用 Kafka 来追踪消息在分区中的位置(偏移量)

消费者会向一个叫做 _consumer_offset 的特殊主题中发送消息,这个主题会保存每次所发送消息中的分区偏移量,这个主题的主要作用就是消费者触发重平衡后记录偏移使用的,消费者每次向这个主题发送消息,正常情况下不触发重平衡,这个主题是不起作用的,当触发重平衡后,消费者停止工作,每个消费者可能会分到对应的分区,这个主题就是让消费者能够继续处理消息所设置的。

如果提交的偏移量小于客户端最后一次处理的偏移量,那么位于两个偏移量之间的消息就会被重复处理

image.png

如果提交的偏移量大于最后一次消费时的偏移量,那么处于两个偏移量中间的消息将会丢失

image.png

既然_consumer_offset 如此重要,那么它的提交方式是怎样的呢?下面我们就来说一下####提交方式

KafkaConsumer API 提供了多种方式来提交偏移量

自动提交

最简单的方式就是让消费者自动提交偏移量。如果 enable.auto.commit 被设置为true,那么每过 5s,消费者会自动把从 poll() 方法轮询到的最大偏移量提交上去。提交时间间隔由 auto.commit.interval.ms 控制,默认是 5s。与消费者里的其他东西一样,自动提交也是在轮询中进行的。消费者在每次轮询中会检查是否提交该偏移量了,如果是,那么就会提交从上一次轮询中返回的偏移量。

提交当前偏移量

auto.commit.offset 设置为 false,可以让应用程序决定何时提交偏移量。使用 commitSync() 提交偏移量。这个 API 会提交由 poll() 方法返回的最新偏移量,提交成功后马上返回,如果提交失败就抛出异常。

commitSync() 将会提交由 poll() 返回的最新偏移量,如果处理完所有记录后要确保调用了 commitSync(),否则还是会有丢失消息的风险,如果发生了在均衡,从最近一批消息到发生在均衡之间的所有消息都将被重复处理。

异步提交

异步提交 commitAsync() 与同步提交 commitSync() 最大的区别在于异步提交不会进行重试,同步提交会一致进行重试。

同步和异步组合提交

一般情况下,针对偶尔出现的提交失败,不进行重试不会有太大的问题,因为如果提交失败是因为临时问题导致的,那么后续的提交总会有成功的。但是如果在关闭消费者或再均衡前的最后一次提交,就要确保提交成功。

因此,在消费者关闭之前一般会组合使用commitAsync和commitSync提交偏移量

提交特定的偏移量

消费者API允许调用 commitSync() 和 commitAsync() 方法时传入希望提交的 partition 和 offset 的 map,即提交特定的偏移量。

下面为自己做个宣传,欢迎关注公众号 Java建设者,号主是Java技术栈,热爱技术,喜欢阅读,热衷于分享和总结,希望能把每一篇好文章分享给成长道路上的你。
关注公众号回复 002 领取为你特意准备的大礼包,你一定会喜欢并收藏的。

文章参考:

Kafka史上最详细原理总结

《Kafka 权威指南》

https://kafka.apache.org/

http://kafka.apache.org/docum...

https://www.tutorialkart.com/...

https://dzone.com/articles/wh...

《极客时间 - Kafka 核心技术与实战》

查看原文

ttqtc 收藏了文章 · 1月16日

不懂什么是锁?看看这篇你就明白了

Java 锁分类

Java 中的锁有很多,可以按照不同的功能、种类进行分类,下面是我对 Java 中一些常用锁的分类,包括一些基本的概述

image.png

  • 从线程是否需要对资源加锁可以分为 悲观锁乐观锁
  • 从资源已被锁定,线程是否阻塞可以分为 自旋锁
  • 从多个线程并发访问资源,也就是 Synchronized 可以分为 无锁偏向锁轻量级锁重量级锁
  • 从锁的公平性进行区分,可以分为公平锁非公平锁
  • 从根据锁是否重复获取可以分为 可重入锁不可重入锁
  • 从那个多个线程能否获取同一把锁分为 共享锁排他锁

下面我们依次对各个锁的分类进行详细阐述。

线程是否需要对资源加锁

Java 按照是否对资源加锁分为乐观锁悲观锁,乐观锁和悲观锁并不是一种真实存在的锁,而是一种设计思想,乐观锁和悲观锁对于理解 Java 多线程和数据库来说至关重要,下面就来探讨一下这两种实现方式的区别和优缺点

悲观锁

悲观锁是一种悲观思想,它总认为最坏的情况可能会出现,它认为数据很可能会被其他人所修改,所以悲观锁在持有数据的时候总会把资源 或者 数据 锁住,这样其他线程想要请求这个资源的时候就会阻塞,直到等到悲观锁把资源释放为止。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁。悲观锁的实现往往依靠数据库本身的锁功能实现。

Java 中的 SynchronizedReentrantLock 等独占锁(排他锁)也是一种悲观锁思想的实现,因为 Synchronzied 和 ReetrantLock 不管是否持有资源,它都会尝试去加锁,生怕自己心爱的宝贝被别人拿走。

乐观锁

乐观锁的思想与悲观锁的思想相反,它总认为资源和数据不会被别人所修改,所以读取不会上锁,但是乐观锁在进行写入操作的时候会判断当前数据是否被修改过(具体如何判断我们下面再说)。乐观锁的实现方案一般来说有两种: 版本号机制CAS实现 。乐观锁多适用于多度的应用类型,这样可以提高吞吐量。

在Java中java.util.concurrent.atomic包下面的原子变量类就是使用了乐观锁的一种实现方式 CAS 实现的。

两种锁的使用场景

上面介绍了两种锁的基本概念,并提到了两种锁的适用场景,一般来说,悲观锁不仅会对写操作加锁还会对读操作加锁,一个典型的悲观锁调用:

select * from student where name="cxuan" for update

这条 sql 语句从 Student 表中选取 name = "cxuan" 的记录并对其加锁,那么其他写操作再这个事务提交之前都不会对这条数据进行操作,起到了独占和排他的作用。

悲观锁因为对读写都加锁,所以它的性能比较低,对于现在互联网提倡的三高(高性能、高可用、高并发)来说,悲观锁的实现用的越来越少了,但是一般多读的情况下还是需要使用悲观锁的,因为虽然加锁的性能比较低,但是也阻止了像乐观锁一样,遇到写不一致的情况下一直重试的时间。

相对而言,乐观锁用于读多写少的情况,即很少发生冲突的场景,这样可以省去锁的开销,增加系统的吞吐量。

乐观锁的适用场景有很多,典型的比如说成本系统,柜员要对一笔金额做修改,为了保证数据的准确性和实效性,使用悲观锁锁住某个数据后,再遇到其他需要修改数据的操作,那么此操作就无法完成金额的修改,对产品来说是灾难性的一刻,使用乐观锁的版本号机制能够解决这个问题,我们下面说。

乐观锁的实现方式

乐观锁一般有两种实现方式:采用版本号机制CAS(Compare-and-Swap,即比较并替换)算法实现。

版本号机制

版本号机制是在数据表中加上一个 version 字段来实现的,表示数据被修改的次数,当执行写操作并且写入成功后,version = version + 1,当线程A要更新数据时,在读取数据的同时也会读取 version 值,在提交更新时,若刚才读取到的 version 值为当前数据库中的version值相等时才更新,否则重试更新操作,直到更新成功。

我们以上面的金融系统为例,来简述一下这个过程。

image.png

  • 成本系统中有一个数据表,表中有两个字段分别是 金额version,金额的属性是能够实时变化,而 version 表示的是金额每次发生变化的版本,一般的策略是,当金额发生改变时,version 采用递增的策略每次都在上一个版本号的基础上 + 1。
  • 在了解了基本情况和基本信息之后,我们来看一下这个过程:公司收到回款后,需要把这笔钱放在金库中,假如金库中存有100 元钱

    • 下面开启事务一:当男柜员执行回款写入操作前,他会先查看(读)一下金库中还有多少钱,此时读到金库中有 100 元,可以执行写操作,并把数据库中的钱更新为 120 元,提交事务,金库中的钱由 100 -> 120,version的版本号由 0 -> 1。
    • 开启事务二:女柜员收到给员工发工资的请求后,需要先执行读请求,查看金库中的钱还有多少,此时的版本号是多少,然后从金库中取出员工的工资进行发放,提交事务,成功后版本 + 1,此时版本由 1 -> 2。

上面两种情况是最乐观的情况,上面的两个事务都是顺序执行的,也就是事务一和事务二互不干扰,那么事务要并行执行会如何呢?

image.png

  • 事务一开启,男柜员先执行读操作,取出金额和版本号,执行写操作

    begin
    update 表 set 金额 = 120,version = version + 1 where 金额 = 100 and version = 0

    此时金额改为 120,版本号为1,事务还没有提交

    事务二开启,女柜员先执行读操作,取出金额和版本号,执行写操作

    begin
    update 表 set 金额 = 50,version = version + 1 where 金额 = 100 and version = 0

    此时金额改为 50,版本号变为 1,事务未提交

    现在提交事务一,金额改为 120,版本变为1,提交事务。理想情况下应该变为 金额 = 50,版本号 = 2,但是实际上事务二 的更新是建立在金额为 100 和 版本号为 0 的基础上的,所以事务二不会提交成功,应该重新读取金额和版本号,再次进行写操作。

    这样,就避免了女柜员 用基于 version = 0 的旧数据修改的结果覆盖男操作员操作结果的可能。

CAS 算法

省略代码,完整代码请参照 看完你就应该能明白的悲观锁和乐观锁

CAS 即 compare and swap(比较与交换),是一种有名的无锁算法。即不使用锁的情况下实现多线程之间的变量同步,也就是在没有线程被阻塞的情况下实现变量的同步,所以也叫非阻塞同步(Non-blocking Synchronization

Java 从 JDK1.5 开始支持,java.util.concurrent 包里提供了很多面向并发编程的类,也提供了 CAS 算法的支持,一些以 Atomic 为开头的一些原子类都使用 CAS 作为其实现方式。使用这些类在多核 CPU 的机器上会有比较好的性能。

如果要把证它们的原子性,必须进行加锁,使用 Synchronzied 或者 ReentrantLock,我们前面介绍它们是悲观锁的实现,我们现在讨论的是乐观锁,那么用哪种方式保证它们的原子性呢?请继续往下看

CAS 中涉及三个要素:

  • 需要读写的内存值 V
  • 进行比较的值 A
  • 拟写入的新值 B

当且仅当预期值A和内存值V相同时,将内存值V修改为B,否则什么都不做。

我们以 java.util.concurrent 中的 AtomicInteger 为例,看一下在不用锁的情况下是如何保证线程安全的

public class AtomicCounter {

    private AtomicInteger integer = new AtomicInteger();

    public AtomicInteger getInteger() {
        return integer;
    }

    public void setInteger(AtomicInteger integer) {
        this.integer = integer;
    }

    public void increment(){
        integer.incrementAndGet();
    }

    public void decrement(){
        integer.decrementAndGet();
    }

}

public class AtomicProducer extends Thread{

    private AtomicCounter atomicCounter;

    public AtomicProducer(AtomicCounter atomicCounter){
        this.atomicCounter = atomicCounter;
    }

    @Override
    public void run() {
        for(int j = 0; j < AtomicTest.LOOP; j++) {
            System.out.println("producer : " + atomicCounter.getInteger());
            atomicCounter.increment();
        }
    }
}

public class AtomicConsumer extends Thread{

    private AtomicCounter atomicCounter;

    public AtomicConsumer(AtomicCounter atomicCounter){
        this.atomicCounter = atomicCounter;
    }

    @Override
    public void run() {
        for(int j = 0; j < AtomicTest.LOOP; j++) {
            System.out.println("consumer : " + atomicCounter.getInteger());
            atomicCounter.decrement();
        }
    }
}

public class AtomicTest {

    final static int LOOP = 10000;

    public static void main(String[] args) throws InterruptedException {

        AtomicCounter counter = new AtomicCounter();
        AtomicProducer producer = new AtomicProducer(counter);
        AtomicConsumer consumer = new AtomicConsumer(counter);

        producer.start();
        consumer.start();

        producer.join();
        consumer.join();

        System.out.println(counter.getInteger());

    }
}

经测试可得,不管循环多少次最后的结果都是0,也就是多线程并行的情况下,使用 AtomicInteger 可以保证线程安全性。 incrementAndGet 和 decrementAndGet 都是原子性操作。

乐观锁的缺点

任何事情都是有利也有弊,软件行业没有完美的解决方案只有最优的解决方案,所以乐观锁也有它的弱点和缺陷:

ABA 问题

ABA 问题说的是,如果一个变量第一次读取的值是 A,准备好需要对 A 进行写操作的时候,发现值还是 A,那么这种情况下,能认为 A 的值没有被改变过吗?可以是由 A -> B -> A 的这种情况,但是 AtomicInteger 却不会这么认为,它只相信它看到的,它看到的是什么就是什么。

JDK 1.5 以后的 AtomicStampedReference 类就提供了此种能力,其中的 compareAndSet 方法就是首先检查当前引用是否等于预期引用,并且当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。

也可以采用CAS的一个变种DCAS来解决这个问题。
DCAS,是对于每一个V增加一个引用的表示修改次数的标记符。对于每个V,如果引用修改了一次,这个计数器就加1。然后再这个变量需要update的时候,就同时检查变量的值和计数器的值。

循环开销大

我们知道乐观锁在进行写操作的时候会判断是否能够写入成功,如果写入不成功将触发等待 -> 重试机制,这种情况是一个自旋锁,简单来说就是适用于短期内获取不到,进行等待重试的锁,它不适用于长期获取不到锁的情况,另外,自旋循环对于性能开销比较大。

CAS与synchronized的使用情景

简单的来说 CAS 适用于写比较少的情况下(多读场景,冲突一般较少),synchronized 适用于写比较多的情况下(多写场景,冲突一般较多)

  • 对于资源竞争较少(线程冲突较轻)的情况,使用 Synchronized 同步锁进行线程阻塞和唤醒切换以及用户态内核态间的切换操作额外浪费消耗 cpu 资源;而 CAS 基于硬件实现,不需要进入内核,不需要切换线程,操作自旋几率较少,因此可以获得更高的性能。
  • 对于资源竞争严重(线程冲突严重)的情况,CAS 自旋的概率会比较大,从而浪费更多的 CPU 资源,效率低于 synchronized。

资源已被锁定,线程是否阻塞

自旋锁的提出背景

由于在多处理器环境中某些资源的有限性,有时需要互斥访问(mutual exclusion),这时候就需要引入锁的概念,只有获取了锁的线程才能够对资源进行访问,由于多线程的核心是CPU的时间分片,所以同一时刻只能有一个线程获取到锁。那么就面临一个问题,那么没有获取到锁的线程应该怎么办?

通常有两种处理方式:一种是没有获取到锁的线程就一直循环等待判断该资源是否已经释放锁,这种锁叫做自旋锁,它不用将线程阻塞起来(NON-BLOCKING);还有一种处理方式就是把自己阻塞起来,等待重新调度请求,这种叫做互斥锁

什么是自旋锁

自旋锁的定义:当一个线程尝试去获取某一把锁的时候,如果这个锁此时已经被别人获取(占用),那么此线程就无法获取到这把锁,该线程将会等待,间隔一段时间后会再次尝试获取。这种采用循环加锁 -> 等待的机制被称为自旋锁(spinlock)

image.png

自旋锁的原理

自旋锁的原理比较简单,如果持有锁的线程能在短时间内释放锁资源,那么那些等待竞争锁的线程就不需要做内核态和用户态之间的切换进入阻塞状态,它们只需要等一等(自旋),等到持有锁的线程释放锁之后即可获取,这样就避免了用户进程和内核切换的消耗。

因为自旋锁避免了操作系统进程调度和线程切换,所以自旋锁通常适用在时间比较短的情况下。由于这个原因,操作系统的内核经常使用自旋锁。但是,如果长时间上锁的话,自旋锁会非常耗费性能,它阻止了其他线程的运行和调度。线程持有锁的时间越长,则持有该锁的线程将被 OS(Operating System) 调度程序中断的风险越大。如果发生中断情况,那么其他线程将保持旋转状态(反复尝试获取锁),而持有该锁的线程并不打算释放锁,这样导致的是结果是无限期推迟,直到持有锁的线程可以完成并释放它为止。

解决上面这种情况一个很好的方式是给自旋锁设定一个自旋时间,等时间一到立即释放自旋锁。自旋锁的目的是占着CPU资源不进行释放,等到获取锁立即进行处理。但是如何去选择自旋时间呢?如果自旋执行时间太长,会有大量的线程处于自旋状态占用 CPU 资源,进而会影响整体系统的性能。因此自旋的周期选的额外重要!JDK在1.6 引入了适应性自旋锁,适应性自旋锁意味着自旋时间不是固定的了,而是由前一次在同一个锁上的自旋时间以及锁拥有的状态来决定,基本认为一个线程上下文切换的时间是最佳的一个时间。

自旋锁的优缺点

自旋锁尽可能的减少线程的阻塞,这对于锁的竞争不激烈,且占用锁时间非常短的代码块来说性能能大幅度的提升,因为自旋的消耗会小于线程阻塞挂起再唤醒的操作的消耗,这些操作会导致线程发生两次上下文切换!

但是如果锁的竞争激烈,或者持有锁的线程需要长时间占用锁执行同步块,这时候就不适合使用自旋锁了,因为自旋锁在获取锁前一直都是占用 cpu 做无用功,占着 XX 不 XX,同时有大量线程在竞争一个锁,会导致获取锁的时间很长,线程自旋的消耗大于线程阻塞挂起操作的消耗,其它需要 cpu 的线程又不能获取到 cpu,造成 cpu 的浪费。所以这种情况下我们要关闭自旋锁。

自旋锁的实现

下面我们用Java 代码来实现一个简单的自旋锁

public class SpinLockTest {

    private AtomicBoolean available = new AtomicBoolean(false);

    public void lock(){

        // 循环检测尝试获取锁
        while (!tryLock()){
            // doSomething...
        }

    }

    public boolean tryLock(){
        // 尝试获取锁,成功返回true,失败返回false
        return available.compareAndSet(false,true);
    }

    public void unLock(){
        if(!available.compareAndSet(true,false)){
            throw new RuntimeException("释放锁失败");
        }
    }

}

这种简单的自旋锁有一个问题:无法保证多线程竞争的公平性。对于上面的 SpinlockTest,当多个线程想要获取锁时,谁最先将available设为false谁就能最先获得锁,这可能会造成某些线程一直都未获取到锁造成线程饥饿。就像我们下课后蜂拥的跑向食堂,下班后蜂拥地挤向地铁,通常我们会采取排队的方式解决这样的问题,类似地,我们把这种锁叫排队自旋锁(QueuedSpinlock)。计算机科学家们使用了各种方式来实现排队自旋锁,如TicketLock,MCSLock,CLHLock。接下来我们分别对这几种锁做个大致的介绍。

TicketLock

在计算机科学领域中,TicketLock 是一种同步机制或锁定算法,它是一种自旋锁,它使用ticket 来控制线程执行顺序。

就像票据队列管理系统一样。面包店或者服务机构(例如银行)都会使用这种方式来为每个先到达的顾客记录其到达的顺序,而不用每次都进行排队。通常,这种地点都会有一个分配器(叫号器,挂号器等等都行),先到的人需要在这个机器上取出自己现在排队的号码,这个号码是按照自增的顺序进行的,旁边还会有一个标牌显示的是正在服务的标志,这通常是代表目前正在服务的队列号,当前的号码完成服务后,标志牌会显示下一个号码可以去服务了。

像上面系统一样,TicketLock 是基于先进先出(FIFO) 队列的机制。它增加了锁的公平性,其设计原则如下:TicketLock 中有两个 int 类型的数值,开始都是0,第一个值是队列ticket(队列票据), 第二个值是 出队(票据)。队列票据是线程在队列中的位置,而出队票据是现在持有锁的票证的队列位置。可能有点模糊不清,简单来说,就是队列票据是你取票号的位置,出队票据是你距离叫号的位置。现在应该明白一些了吧。

当叫号叫到你的时候,不能有相同的号码同时办业务,必须只有一个人可以去办,办完后,叫号机叫到下一个人,这就叫做原子性。你在办业务的时候不能被其他人所干扰,而且不可能会有两个持有相同号码的人去同时办业务。然后,下一个人看自己的号是否和叫到的号码保持一致,如果一致的话,那么就轮到你去办业务,否则只能继续等待。上面这个流程的关键点在于,每个办业务的人在办完业务之后,他必须丢弃自己的号码,叫号机才能继续叫到下面的人,如果这个人没有丢弃这个号码,那么其他人只能继续等待。下面来实现一下这个票据排队方案

public class TicketLock {

    // 队列票据(当前排队号码)
    private AtomicInteger queueNum = new AtomicInteger();

    // 出队票据(当前需等待号码)
    private AtomicInteger dueueNum = new AtomicInteger();

    // 获取锁:如果获取成功,返回当前线程的排队号
    public int lock(){
        int currentTicketNum = dueueNum.incrementAndGet();
        while (currentTicketNum != queueNum.get()){
            // doSomething...
        }
        return currentTicketNum;
    }

    // 释放锁:传入当前排队的号码
    public void unLock(int ticketNum){
        queueNum.compareAndSet(ticketNum,ticketNum + 1);
    }

}

每次叫号机在叫号的时候,都会判断自己是不是被叫的号,并且每个人在办完业务的时候,叫号机根据在当前号码的基础上 + 1,让队列继续往前走。

但是上面这个设计是有问题的,因为获得自己的号码之后,是可以对号码进行更改的,这就造成系统紊乱,锁不能及时释放。这时候就需要有一个能确保每个人按会着自己号码排队办业务的角色,在得知这一点之后,我们重新设计一下这个逻辑

public class TicketLock2 {

    // 队列票据(当前排队号码)
    private AtomicInteger queueNum = new AtomicInteger();

    // 出队票据(当前需等待号码)
    private AtomicInteger dueueNum = new AtomicInteger();

    private ThreadLocal<Integer> ticketLocal = new ThreadLocal<>();

    public void lock(){
        int currentTicketNum = dueueNum.incrementAndGet();

        // 获取锁的时候,将当前线程的排队号保存起来
        ticketLocal.set(currentTicketNum);
        while (currentTicketNum != queueNum.get()){
            // doSomething...
        }
    }

    // 释放锁:从排队缓冲池中取
    public void unLock(){
        Integer currentTicket = ticketLocal.get();
        queueNum.compareAndSet(currentTicket,currentTicket + 1);
    }

}

这次就不再需要返回值,办业务的时候,要将当前的这一个号码缓存起来,在办完业务后,需要释放缓存的这条票据。

缺点

TicketLock 虽然解决了公平性的问题,但是多处理器系统上,每个进程/线程占用的处理器都在读写同一个变量queueNum ,每次读写操作都必须在多个处理器缓存之间进行缓存同步,这会导致繁重的系统总线和内存的流量,大大降低系统整体的性能。

为了解决这个问题,MCSLock 和 CLHLock 应运而生。

CLHLock

上面说到TicketLock 是基于队列的,那么 CLHLock 就是基于链表设计的,CLH的发明人是:Craig,Landin and Hagersten,用它们各自的字母开头命名。CLH 是一种基于链表的可扩展,高性能,公平的自旋锁,申请线程只能在本地变量上自旋,它会不断轮询前驱的状态,如果发现前驱释放了锁就结束自旋。

public class CLHLock {

    public static class CLHNode{
        private volatile boolean isLocked = true;
    }

    // 尾部节点
    private volatile CLHNode tail;
    private static final ThreadLocal<CLHNode> LOCAL = new ThreadLocal<>();
    private static final AtomicReferenceFieldUpdater<CLHLock,CLHNode> UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(CLHLock.class,CLHNode.class,"tail");


    public void lock(){
        // 新建节点并将节点与当前线程保存起来
        CLHNode node = new CLHNode();
        LOCAL.set(node);

        // 将新建的节点设置为尾部节点,并返回旧的节点(原子操作),这里旧的节点实际上就是当前节点的前驱节点
        CLHNode preNode = UPDATER.getAndSet(this,node);
        if(preNode != null){
            // 前驱节点不为null表示当锁被其他线程占用,通过不断轮询判断前驱节点的锁标志位等待前驱节点释放锁
            while (preNode.isLocked){

            }
            preNode = null;
            LOCAL.set(node);
        }
        // 如果不存在前驱节点,表示该锁没有被其他线程占用,则当前线程获得锁
    }

    public void unlock() {
        // 获取当前线程对应的节点
        CLHNode node = LOCAL.get();
        // 如果tail节点等于node,则将tail节点更新为null,同时将node的lock状态职位false,表示当前线程释放了锁
        if (!UPDATER.compareAndSet(this, node, null)) {
            node.isLocked = false;
        }
        node = null;
    }
}

MCSLock

MCS Spinlock 是一种基于链表的可扩展、高性能、公平的自旋锁,申请线程只在本地变量上自旋,直接前驱负责通知其结束自旋,从而极大地减少了不必要的处理器缓存同步的次数,降低了总线和内存的开销。MCS 来自于其发明人名字的首字母: John Mellor-Crummey 和 Michael Scott。

public class MCSLock {

    public static class MCSNode {
        volatile MCSNode next;
        volatile boolean isLocked = true;
    }

    private static final ThreadLocal<MCSNode> NODE = new ThreadLocal<>();

    // 队列
    @SuppressWarnings("unused")
    private volatile MCSNode queue;

    private static final AtomicReferenceFieldUpdater<MCSLock,MCSNode> UPDATE =
            AtomicReferenceFieldUpdater.newUpdater(MCSLock.class,MCSNode.class,"queue");


    public void lock(){
        // 创建节点并保存到ThreadLocal中
        MCSNode currentNode = new MCSNode();
        NODE.set(currentNode);

        // 将queue设置为当前节点,并且返回之前的节点
        MCSNode preNode = UPDATE.getAndSet(this, currentNode);
        if (preNode != null) {
            // 如果之前节点不为null,表示锁已经被其他线程持有
            preNode.next = currentNode;
            // 循环判断,直到当前节点的锁标志位为false
            while (currentNode.isLocked) {
            }
        }
    }

    public void unlock() {
        MCSNode currentNode = NODE.get();
        // next为null表示没有正在等待获取锁的线程
        if (currentNode.next == null) {
            // 更新状态并设置queue为null
            if (UPDATE.compareAndSet(this, currentNode, null)) {
                // 如果成功了,表示queue==currentNode,即当前节点后面没有节点了
                return;
            } else {
                // 如果不成功,表示queue!=currentNode,即当前节点后面多了一个节点,表示有线程在等待
                // 如果当前节点的后续节点为null,则需要等待其不为null(参考加锁方法)
                while (currentNode.next == null) {
                }
            }
        } else {
            // 如果不为null,表示有线程在等待获取锁,此时将等待线程对应的节点锁状态更新为false,同时将当前线程的后继节点设为null
            currentNode.next.isLocked = false;
            currentNode.next = null;
        }
    }
}

CLHLock 和 MCSLock

  • 都是基于链表,不同的是CLHLock是基于隐式链表,没有真正的后续节点属性,MCSLock是显示链表,有一个指向后续节点的属性。
  • 将获取锁的线程状态借助节点(node)保存,每个线程都有一份独立的节点,这样就解决了TicketLock多处理器缓存同步的问题。

多个线程并发访问资源

锁状态的分类

Java 语言专门针对 synchronized 关键字设置了四种状态,它们分别是:无锁、偏向锁、轻量级锁和重量级锁,但是在了解这些锁之前还需要先了解一下 Java 对象头和 Monitor。

Java 对象头

我们知道 synchronized 是悲观锁,在操作同步之前需要给资源加锁,这把锁就是对象头里面的,而Java 对象头又是什么呢?我们以 Hotspot 虚拟机为例,Hopspot 对象头主要包括两部分数据:Mark Word(标记字段)class Pointer(类型指针)

Mark Word:默认存储对象的HashCode,分代年龄和锁标志位信息。这些信息都是与对象自身定义无关的数据,所以Mark Word被设计成一个非固定的数据结构以便在极小的空间内存存储尽量多的数据。它会根据对象的状态复用自己的存储空间,也就是说在运行期间Mark Word里存储的数据会随着锁标志位的变化而变化。

class Point:对象指向它的类元数据的指针,虚拟机通过这个指针来确定这个对象是哪个类的实例。

在32位虚拟机和64位虚拟机的 Mark Word 所占用的字节大小不一样,32位虚拟机的 Mark Word 和 class Pointer 分别占用 32bits 的字节,而 64位虚拟机的 Mark Word 和 class Pointer 占用了64bits 的字节,下面我们以 32位虚拟机为例,来看一下其 Mark Word 的字节具体是如何分配的

image.png

image.png

用中文翻译过来就是

image.png

  • 无状态也就是无锁的时候,对象头开辟 25bit 的空间用来存储对象的 hashcode ,4bit 用于存放分代年龄,1bit 用来存放是否偏向锁的标识位,2bit 用来存放锁标识位为01
  • 偏向锁 中划分更细,还是开辟25bit 的空间,其中23bit 用来存放线程ID,2bit 用来存放 epoch,4bit 存放分代年龄,1bit 存放是否偏向锁标识, 0表示无锁,1表示偏向锁,锁的标识位还是01
  • 轻量级锁中直接开辟 30bit 的空间存放指向栈中锁记录的指针,2bit 存放锁的标志位,其标志位为00
  • 重量级锁中和轻量级锁一样,30bit 的空间用来存放指向重量级锁的指针,2bit 存放锁的标识位,为11
  • GC标记开辟30bit 的内存空间却没有占用,2bit 空间存放锁标志位为11。

其中无锁和偏向锁的锁标志位都是01,只是在前面的1bit区分了这是无锁状态还是偏向锁状态。

关于为什么这么分配的内存,我们可以从 OpenJDK 中的markOop.hpp类中的枚举窥出端倪

image.png
来解释一下

  • age_bits 就是我们说的分代回收的标识,占用4字节
  • lock_bits 是锁的标志位,占用2个字节
  • biased_lock_bits 是是否偏向锁的标识,占用1个字节
  • max_hash_bits 是针对无锁计算的hashcode 占用字节数量,如果是32位虚拟机,就是 32 - 4 - 2 -1 = 25 byte,如果是64 位虚拟机,64 - 4 - 2 - 1 = 57 byte,但是会有 25 字节未使用,所以64位的 hashcode 占用 31 byte
  • hash_bits 是针对 64 位虚拟机来说,如果最大字节数大于 31,则取31,否则取真实的字节数
  • cms_bits 我觉得应该是不是64位虚拟机就占用 0 byte,是64位就占用 1byte
  • epoch_bits 就是 epoch 所占用的字节大小,2字节。

Synchronized锁

synchronized用的锁记录是存在Java对象头里的。

JVM基于进入和退出 Monitor 对象来实现方法同步和代码块同步。代码块同步是使用 monitorenter 和 monitorexit 指令实现的,monitorenter 指令是在编译后插入到同步代码块的开始位置,而 monitorexit 是插入到方法结束处和异常处。任何对象都有一个 monitor 与之关联,当且一个 monitor 被持有后,它将处于锁定状态。

根据虚拟机规范的要求,在执行 monitorenter 指令时,首先要去尝试获取对象的锁,如果这个对象没被锁定,或者当前线程已经拥有了那个对象的锁,把锁的计数器加1,相应地,在执行 monitorexit 指令时会将锁计数器减1,当计数器被减到0时,锁就释放了。如果获取对象锁失败了,那当前线程就要阻塞等待,直到对象锁被另一个线程释放为止。

Monitor

Synchronized是通过对象内部的一个叫做监视器锁(monitor)来实现的,监视器锁本质又是依赖于底层的操作系统的 Mutex Lock(互斥锁)来实现的。而操作系统实现线程之间的切换需要从用户态转换到核心态,这个成本非常高,状态之间的转换需要相对比较长的时间,这就是为什么 Synchronized 效率低的原因。因此,这种依赖于操作系统 Mutex Lock 所实现的锁我们称之为重量级锁

Java SE 1.6为了减少获得锁和释放锁带来的性能消耗,引入了偏向锁轻量级锁:锁一共有4种状态,级别从低到高依次是:无锁状态、偏向锁状态、轻量级锁状态和重量级锁状态。锁可以升级但不能降级。

所以锁的状态总共有四种:无锁状态、偏向锁、轻量级锁和重量级锁。随着锁的竞争,锁可以从偏向锁升级到轻量级锁,再升级的重量级锁(但是锁的升级是单向的,也就是说只能从低到高升级,不会出现锁的降级)。JDK 1.6中默认是开启偏向锁和轻量级锁的,我们也可以通过-XX:-UseBiasedLocking=false来禁用偏向锁。

锁的分类及其解释

先来个大体的流程图来感受一下这个过程,然后下面我们再分开来说

image.png

无锁

无锁状态,无锁即没有对资源进行锁定,所有的线程都可以对同一个资源进行访问,但是只有一个线程能够成功修改资源。

image.png

无锁的特点就是在循环内进行修改操作,线程会不断的尝试修改共享资源,直到能够成功修改资源并退出,在此过程中没有出现冲突的发生,这很像我们在之前文章中介绍的 CAS 实现,CAS 的原理和应用就是无锁的实现。无锁无法全面代替有锁,但无锁在某些场合下的性能是非常高的。

偏向锁

HotSpot 的作者经过研究发现,大多数情况下,锁不仅不存在多线程竞争,还存在锁由同一线程多次获得的情况,偏向锁就是在这种情况下出现的,它的出现是为了解决只有在一个线程执行同步时提高性能。

image.png

可以从对象头的分配中看到,偏向锁要比无锁多了线程IDepoch,下面我们就来描述一下偏向锁的获取过程

偏向锁获取过程

  1. 首先线程访问同步代码块,会通过检查对象头 Mark Word 的锁标志位判断目前锁的状态,如果是 01,说明就是无锁或者偏向锁,然后再根据是否偏向锁 的标示判断是无锁还是偏向锁,如果是无锁情况下,执行下一步
  2. 线程使用 CAS 操作来尝试对对象加锁,如果使用 CAS 替换 ThreadID 成功,就说明是第一次上锁,那么当前线程就会获得对象的偏向锁,此时会在对象头的 Mark Word 中记录当前线程 ID 和获取锁的时间 epoch 等信息,然后执行同步代码块。
全局安全点(Safe Point):全局安全点的理解会涉及到 C 语言底层的一些知识,这里简单理解 SafePoint 是 Java 代码中的一个线程可能暂停执行的位置。

等到下一次线程在进入和退出同步代码块时就不需要进行 CAS 操作进行加锁和解锁,只需要简单判断一下对象头的 Mark Word 中是否存储着指向当前线程的线程ID,判断的标志当然是根据锁的标志位来判断的。如果用流程图来表示的话就是下面这样
image.png

关闭偏向锁

偏向锁在Java 6 和Java 7 里是默认启用的。由于偏向锁是为了在只有一个线程执行同步块时提高性能,如果你确定应用程序里所有的锁通常情况下处于竞争状态,可以通过JVM参数关闭偏向锁:-XX:-UseBiasedLocking=false,那么程序默认会进入轻量级锁状态。

关于 epoch

偏向锁的对象头中有一个被称为 epoch 的值,它作为偏差有效性的时间戳。

轻量级锁

轻量级锁是指当前锁是偏向锁的时候,资源被另外的线程所访问,那么偏向锁就会升级为轻量级锁,其他线程会通过自旋的形式尝试获取锁,不会阻塞,从而提高性能,下面是详细的获取过程。

轻量级锁加锁过程

  1. 紧接着上一步,如果 CAS 操作替换 ThreadID 没有获取成功,执行下一步
  2. 如果使用 CAS 操作替换 ThreadID 失败(这时候就切换到另外一个线程的角度)说明该资源已被同步访问过,这时候就会执行锁的撤销操作,撤销偏向锁,然后等原持有偏向锁的线程到达全局安全点(SafePoint)时,会暂停原持有偏向锁的线程,然后会检查原持有偏向锁的状态,如果已经退出同步,就会唤醒持有偏向锁的线程,执行下一步
  3. 检查对象头中的 Mark Word 记录的是否是当前线程 ID,如果是,执行同步代码,如果不是,执行偏向锁获取流程 的第2步。

如果用流程表示的话就是下面这样(已经包含偏向锁的获取)

image.png

重量级锁

重量级锁的获取流程比较复杂,小伙伴们做好准备,其实多看几遍也没那么麻烦,呵呵。

重量级锁的获取流程

  1. 接着上面偏向锁的获取过程,由偏向锁升级为轻量级锁,执行下一步
  2. 会在原持有偏向锁的线程的栈中分配锁记录,将对象头中的 Mark Word 拷贝到原持有偏向锁线程的记录中,然后原持有偏向锁的线程获得轻量级锁,然后唤醒原持有偏向锁的线程,从安全点处继续执行,执行完毕后,执行下一步,当前线程执行第4步
  3. 执行完毕后,开始轻量级解锁操作,解锁需要判断两个条件

    • 判断对象头中的 Mark Word 中锁记录指针是否指向当前栈中记录的指针

image.png

  • 拷贝在当前线程锁记录的 Mark Word 信息是否与对象头中的 Mark Word 一致。

如果上面两个判断条件都符合的话,就进行锁释放,如果其中一个条件不符合,就会释放锁,并唤起等待的线程,进行新一轮的锁竞争。

  1. 在当前线程的栈中分配锁记录,拷贝对象头中的 MarkWord 到当前线程的锁记录中,执行 CAS 加锁操作,会把对象头 Mark Word 中锁记录指针指向当前线程锁记录,如果成功,获取轻量级锁,执行同步代码,然后执行第3步,如果不成功,执行下一步
  2. 当前线程没有使用 CAS 成功获取锁,就会自旋一会儿,再次尝试获取,如果在多次自旋到达上限后还没有获取到锁,那么轻量级锁就会升级为 重量级锁

image.png

如果用流程图表示是这样的
image.png

锁的公平性与非公平性

我们知道,在并发环境中,多个线程需要对同一资源进行访问,同一时刻只能有一个线程能够获取到锁并进行资源访问,那么剩下的这些线程怎么办呢?这就好比食堂排队打饭的模型,最先到达食堂的人拥有最先买饭的权利,那么剩下的人就需要在第一个人后面排队,这是理想的情况,即每个人都能够买上饭。那么现实情况是,在你排队的过程中,就有个别不老实的人想走捷径,插队打饭,如果插队的这个人后面没有人制止他这种行为,他就能够顺利买上饭,如果有人制止,他就也得去队伍后面排队。

对于正常排队的人来说,没有人插队,每个人都在等待排队打饭的机会,那么这种方式对每个人来说都是公平的,先来后到嘛。这种锁也叫做公平锁。

image.png

那么假如插队的这个人成功买上饭并且在买饭的过程不管有没有人制止他,他的这种行为对正常排队的人来说都是不公平的,这在锁的世界中也叫做非公平锁。

image.png

image.png

那么我们根据上面的描述可以得出下面的结论

公平锁表示线程获取锁的顺序是按照线程加锁的顺序来分配的,即先来先得的FIFO先进先出顺序。而非公平锁就是一种获取锁的抢占机制,是随机获得锁的,和公平锁不一样的就是先来的不一定先得到锁,这个方式可能造成某些线程一直拿不到锁,结果也就是不公平的了。

锁公平性的实现

在 Java 中,我们一般通过 ReetrantLock 来实现锁的公平性

我们分别通过两个例子来讲解一下锁的公平性和非公平性

锁的公平性

public class MyFairLock extends Thread{

    private ReentrantLock lock = new ReentrantLock(true);
    public void fairLock(){
        try {
            lock.lock();
            System.out.println(Thread.currentThread().getName()  + "正在持有锁");
        }finally {
            System.out.println(Thread.currentThread().getName()  + "释放了锁");
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        MyFairLock myFairLock = new MyFairLock();
        Runnable runnable = () -> {
            System.out.println(Thread.currentThread().getName() + "启动");
            myFairLock.fairLock();
        };
        Thread[] thread = new Thread[10];
        for(int i = 0;i < 10;i++){
            thread[i] = new Thread(runnable);
        }
        for(int i = 0;i < 10;i++){
            thread[i].start();
        }
    }
}

我们创建了一个 ReetrantLock,并给构造函数传了一个 true,我们可以查看 ReetrantLock 的构造函数

public ReentrantLock(boolean fair) {
  sync = fair ? new FairSync() : new NonfairSync();
}

根据 JavaDoc 的注释可知,如果是 true 的话,那么就会创建一个 ReentrantLock 的公平锁,然后并创建一个 FairSync ,FairSync 其实是一个 Sync 的内部类,它的主要作用是同步对象以获取公平锁。

image.png

而 Sync 是 ReentrantLock 中的内部类,Sync 继承 AbstractQueuedSynchronizer 类,AbstractQueuedSynchronizer 就是我们常说的 AQS ,它是 JUC(java.util.concurrent) 中最重要的一个类,通过它来实现独占锁和共享锁。

abstract static class Sync extends AbstractQueuedSynchronizer {...}

也就是说,我们把 fair 参数设置为 true 之后,就可以实现一个公平锁了,是这样吗?我们回到示例代码,我们可以执行一下这段代码,它的输出是顺序获取的(碍于篇幅的原因,这里就暂不贴出了),也就是说我们创建了一个公平锁

锁的非公平性

与公平性相对的就是非公平性,我们通过设置 fair 参数为 true,便实现了一个公平锁,与之相对的,我们把 fair 参数设置为 false,是不是就是非公平锁了?用事实证明一下

private ReentrantLock lock = new ReentrantLock(false);

其他代码不变,我们执行一下看看输出(部分输出)

Thread-1启动
Thread-4启动
Thread-1正在持有锁
Thread-1释放了锁
Thread-5启动
Thread-6启动
Thread-3启动
Thread-7启动
Thread-2启动

可以看到,线程的启动并没有按顺序获取,可以看出非公平锁对锁的获取是乱序的,即有一个抢占锁的过程。也就是说,我们把 fair 参数设置为 false 便实现了一个非公平锁。

ReentrantLock 基本概述

ReentrantLock 是一把可重入锁,也是一把互斥锁,它具有与 synchronized 相同的方法和监视器锁的语义,但是它比 synchronized 有更多可扩展的功能。

ReentrantLock 的可重入性是指它可以由上次成功锁定但还未解锁的线程拥有。当只有一个线程尝试加锁时,该线程调用 lock() 方法会立刻返回成功并直接获取锁。如果当前线程已经拥有这把锁,这个方法会立刻返回。可以使用 isHeldByCurrentThreadgetHoldCount 进行检查。

这个类的构造函数接受可选择的 fairness 参数,当 fairness 设置为 true 时,在多线程争夺尝试加锁时,锁倾向于对等待时间最长的线程访问,这也是公平性的一种体现。否则,锁不能保证每个线程的访问顺序,也就是非公平锁。与使用默认设置的程序相比,使用许多线程访问的公平锁的程序可能会显示较低的总体吞吐量(即较慢;通常要慢得多)。但是获取锁并保证线程不会饥饿的次数比较小。无论如何请注意:锁的公平性不能保证线程调度的公平性。因此,使用公平锁的多线程之一可能会连续多次获得它,而其他活动线程没有进行且当前未持有该锁。这也是互斥性 的一种体现。

也要注意的 tryLock() 方法不支持公平性。如果锁是可以获取的,那么即使其他线程等待,它仍然能够返回成功。

推荐使用下面的代码来进行加锁和解锁

class MyFairLock {
  private final ReentrantLock lock = new ReentrantLock();

  public void m() {
    lock.lock();  
    try {
      // ... 
    } finally {
      lock.unlock()
    }
  }
}

ReentrantLock 锁通过同一线程最多支持2147483647个递归锁。 尝试超过此限制会导致锁定方法引发错误。

ReentrantLock 如何实现锁公平性

我们在上面的简述中提到,ReentrantLock 是可以实现锁的公平性的,那么原理是什么呢?下面我们通过其源码来了解一下 ReentrantLock 是如何实现锁的公平性的

跟踪其源码发现,调用 Lock.lock() 方法其实是调用了 sync 的内部的方法

abstract void lock();

而 sync 是最基础的同步控制 Lock 的类,它有公平锁和非公平锁的实现。它继承 AbstractQueuedSynchronizer 即 使用 AQS 状态代表锁持有的数量。

lock 是抽象方法是需要被子类实现的,而继承了 AQS 的类主要有

image.png

我们可以看到,所有实现了 AQS 的类都位于 JUC 包下,主要有五类:ReentrantLockReentrantReadWriteLockSemaphoreCountDownLatchThreadPoolExecutor,其中 ReentrantLock、ReentrantReadWriteLock、Semaphore 都可以实现公平锁和非公平锁。

下面是公平锁 FairSync 的继承关系

image.png

非公平锁的NonFairSync 的继承关系

image.png

由继承图可以看到,两个类的继承关系都是相同的,我们从源码发现,公平锁和非公平锁的实现就是下面这段代码的区别(下一篇文章我们会从原理角度分析一下公平锁和非公平锁的实现)

image.png

通过上图中的源代码对比,我们可以明显的看出公平锁与非公平锁的lock()方法唯一的区别就在于公平锁在获取同步状态时多了一个限制条件:hasQueuedPredecessors()

hasQueuedPredecessors() 也是 AQS 中的方法,它主要是用来 查询是否有任何线程在等待获取锁的时间比当前线程长,也就是说每个等待线程都是在一个队列中,此方法就是判断队列中在当前线程获取锁时,是否有等待锁时间比自己还长的队列,如果当前线程之前有排队的线程,返回 true,如果当前线程位于队列的开头或队列为空,返回 false。

综上,公平锁就是通过同步队列来实现多个线程按照申请锁的顺序来获取锁,从而实现公平的特性。非公平锁加锁时不考虑排队等待问题,直接尝试获取锁,所以存在后申请却先获得锁的情况。

根据锁是否可重入进行区分

可重入锁

可重入锁又称为递归锁,是指在同一个线程在外层方法获取锁的时候,再进入该线程的内层方法会自动获取锁(前提锁对象得是同一个对象或者class),不会因为之前已经获取过还没释放而阻塞。Java 中 ReentrantLocksynchronized 都是可重入锁,可重入锁的一个优点是在一定程度上可以避免死锁。

我们先来看一段代码来说明一下 synchronized 的可重入性

private synchronized void doSomething(){
  System.out.println("doSomething...");
  doSomethingElse();
}

private synchronized void doSomethingElse(){
  System.out.println("doSomethingElse...");
}

在上面这段代码中,我们对 doSomething()doSomethingElse() 分别使用了 synchronized 进行锁定,doSomething() 方法中调用了 doSomethingElse() 方法,因为 synchronized 是可重入锁,所以同一个线程在调用 doSomething() 方法时,也能够进入 doSomethingElse() 方法中。

不可重入锁

如果 synchronized 是不可重入锁的话,那么在调用 doSomethingElse() 方法的时候,必须把 doSomething() 的锁丢掉,实际上该对象锁已被当前线程所持有,且无法释放。所以此时会出现死锁。

也就是说,不可重入锁会造成死锁

多个线程能够共享同一把锁

独占锁和共享锁

独占多和共享锁一般对应 JDK 源码的 ReentrantLock 和 ReentrantReadWriteLock 源码来介绍独占锁和共享锁。

独占锁又叫做排他锁,是指锁在同一时刻只能被一个线程拥有,其他线程想要访问资源,就会被阻塞。JDK 中 synchronized和 JUC 中 Lock 的实现类就是互斥锁。

共享锁指的是锁能够被多个线程所拥有,如果某个线程对资源加上共享锁后,则其他线程只能对资源再加共享锁,不能加排它锁。获得共享锁的线程只能读数据,不能修改数据

image.png

我们看到 ReentrantReadWriteLock 有两把锁:ReadLockWriteLock,也就是一个读锁一个写锁,合在一起叫做读写锁。再进一步观察可以发现 ReadLock 和 WriteLock 是靠内部类 Sync 实现的锁。Sync 是继承于 AQS 子类的,AQS 是并发的根本,这种结构在CountDownLatch、ReentrantLock、Semaphore里面也都存在。

在 ReentrantReadWriteLock 里面,读锁和写锁的锁主体都是 Sync,但读锁和写锁的加锁方式不一样。读锁是共享锁,写锁是独享锁。读锁的共享锁可保证并发读非常高效,而读写、写读、写写的过程互斥,因为读锁和写锁是分离的。所以ReentrantReadWriteLock的并发性相比一般的互斥锁有了很大提升。

查看原文

ttqtc 收藏了文章 · 1月16日

Kafka 的这些原理你知道吗

如果只是为了开发 Kafka 应用程序,或者只是在生产环境使用 Kafka,那么了解 Kafka 的内部工作原理不是必须的。不过,了解 Kafka 的内部工作原理有助于理解 Kafka 的行为,也利用快速诊断问题。下面我们来探讨一下这三个问题

  • Kafka 是如何进行复制的
  • Kafka 是如何处理来自生产者和消费者的请求的
  • Kafka 的存储细节是怎样的

如果感兴趣的话,就请花费你一些时间,耐心看完这篇文章。

集群成员间的关系

我们知道,Kafka 是运行在 ZooKeeper 之上的,因为 ZooKeeper 是以集群形式出现的,所以 Kafka 也可以以集群形式出现。这也就涉及到多个生产者和多个消费者如何协调的问题,这个维护集群间的关系也是由 ZooKeeper 来完成的。如果你看过我之前的文章(真的,关于 Kafka 入门看这一篇就够了),你应该会知道,Kafka 集群间会有多个 主机(broker),每个 broker 都会有一个 broker.id,每个 broker.id 都有一个唯一的标识符用来区分,这个标识符可以在配置文件里手动指定,也可以自动生成。

Kafka 可以通过 broker.id.generation.enable 和 reserved.broker.max.id 来配合生成新的 broker.id。

broker.id.generation.enable参数是用来配置是否开启自动生成 broker.id 的功能,默认情况下为true,即开启此功能。自动生成的broker.id有一个默认值,默认值为1000,也就是说默认情况下自动生成的 broker.id 从1001开始。

Kafka 在启动时会在 ZooKeeper 中 /brokers/ids 路径下注册一个与当前 broker 的 id 相同的临时节点。Kafka 的健康状态检查就依赖于此节点。当有 broker 加入集群或者退出集群时,这些组件就会获得通知。

  • 如果你要启动另外一个具有相同 ID 的 broker,那么就会得到一个错误 —— 新的 broker 会试着进行注册,但不会成功,因为 ZooKeeper 里面已经有一个相同 ID 的 broker。
  • 在 broker 停机、出现分区或者长时间垃圾回收停顿时,broker 会从 ZooKeeper 上断开连接,此时 broker 在启动时创建的临时节点会从 ZooKeeper 中移除。监听 broker 列表的 Kafka 组件会被告知该 broker 已移除。
  • 在关闭 broker 时,它对应的节点也会消失,不过它的 ID 会继续存在其他数据结构中,例如主题的副本列表中,副本列表复制我们下面再说。在完全关闭一个 broker 之后,如果使用相同的 ID 启动另一个全新的 broker,它会立刻加入集群,并拥有一个与旧 broker 相同的分区和主题。

Broker Controller 的作用

我们之前在讲 Kafka Rebalance 重平衡的时候,提过一个群组协调器,负责协调群组间的关系,那么 broker 之间也有一个控制器组件(Controller),它是 Kafka 的核心组件。它的主要作用是在 ZooKeeper 的帮助下管理和协调整个 Kafka 集群,集群中的每个 broker 都可以称为 controller,但是在 Kafka 集群启动后,只有一个 broker 会成为 Controller 。既然 Kafka 集群是依赖于 ZooKeeper 集群的,所以有必要先介绍一下 ZooKeeper 是什么,可以参考作者的这一篇文章(ZooKeeper不仅仅是注册中心,你还知道有哪些?)详细了解,在这里就简单提一下 znode 节点的问题。

ZooKeeper 的数据是保存在节点上的,每个节点也被称为znode,znode 节点是一种树形的文件结构,它很像 Linux 操作系统的文件路径,ZooKeeper 的根节点是 /

image.png

znode 根据数据的持久化方式可分为临时节点和持久性节点。持久性节点不会因为 ZooKeeper 状态的变化而消失,但是临时节点会随着 ZooKeeper 的重启而自动消失。

znode 节点有一个 Watcher 机制:当数据发生变化的时候, ZooKeeper 会产生一个 Watcher 事件,并且会发送到客户端。Watcher 监听机制是 Zookeeper 中非常重要的特性,我们基于 Zookeeper 上创建的节点,可以对这些节点绑定监听事件,比如可以监听节点数据变更、节点删除、子节点状态变更等事件,通过这个事件机制,可以基于 ZooKeeper 实现分布式锁、集群管理等功能。

控制器的选举

Kafka 当前选举控制器的规则是:Kafka 集群中第一个启动的 broker 通过在 ZooKeeper 里创建一个临时节点 /controller 让自己成为 controller 控制器。其他 broker 在启动时也会尝试创建这个节点,但是由于这个节点已存在,所以后面想要创建 /controller 节点时就会收到一个 节点已存在 的异常。然后其他 broker 会在这个控制器上注册一个 ZooKeeper 的 watch 对象,/controller 节点发生变化时,其他 broker 就会收到节点变更通知。这种方式可以确保只有一个控制器存在。那么只有单独的节点一定是有个问题的,那就是单点问题

image.png

如果控制器关闭或者与 ZooKeeper 断开链接,ZooKeeper 上的临时节点就会消失。集群中的其他节点收到 watch 对象发送控制器下线的消息后,其他 broker 节点都会尝试让自己去成为新的控制器。其他节点的创建规则和第一个节点的创建原则一致,都是第一个在 ZooKeeper 里成功创建控制器节点的 broker 会成为新的控制器,那么其他节点就会收到节点已存在的异常,然后在新的控制器节点上再次创建 watch 对象进行监听。

image.png

控制器的作用

那么说了这么多,控制是什么呢?控制器的作用是什么呢?或者说控制器的这么一个组件被设计用来干什么?别着急,接下来我们就要说一说。

Kafka 被设计为一种模拟状态机的多线程控制器,它可以作用有下面这几点

  • 控制器相当于部门(集群)中的部门经理(broker controller),用于管理部门中的部门成员(broker)
  • 控制器是所有 broker 的一个监视器,用于监控 broker 的上线和下线
  • 在 broker 宕机后,控制器能够选举新的分区 Leader
  • 控制器能够和 broker 新选取的 Leader 发送消息

再细分一下可以具体分为如下 5 点

  • 主题管理 : Kafka Controller 可以帮助我们完成对 Kafka 主题创建、删除和增加分区的操作,简而言之就是对分区拥有最高行使权。

换句话说,当我们执行kafka-topics 脚本时,大部分的后台工作都是控制器来完成的。

  • 分区重分配: 分区重分配主要是指,kafka-reassign-partitions 脚本提供的对已有主题分区进行细粒度的分配功能。这部分功能也是控制器实现的。
  • Prefered 领导者选举 : Preferred 领导者选举主要是 Kafka 为了避免部分 Broker 负载过重而提供的一种换 Leader 的方案。
  • 集群成员管理: 主要管理 新增 broker、broker 关闭、broker 宕机
  • 数据服务: 控制器的最后一大类工作,就是向其他 broker 提供数据服务。控制器上保存了最全的集群元数据信息,其他所有 broker 会定期接收控制器发来的元数据更新请求,从而更新其内存中的缓存数据。这些数据我们会在下面讨论

当控制器发现一个 broker 离开集群(通过观察相关 ZooKeeper 路径),控制器会收到消息:这个 broker 所管理的那些分区需要一个新的 Leader。控制器会依次遍历每个分区,确定谁能够作为新的 Leader,然后向所有包含新 Leader 或现有 Follower 的分区发送消息,该请求消息包含谁是新的 Leader 以及谁是 Follower 的信息。随后,新的 Leader 开始处理来自生产者和消费者的请求,Follower 用于从新的 Leader 那里进行复制。

这就很像外包公司的一个部门,这个部门就是专门出差的,每个人在不同的地方办公,但是中央总部有一个部门经理,现在部门经理突然离职了。公司不打算外聘人员,决定从部门内部选一个能力强的人当领导,然后当上领导的人需要向自己的组员发送消息,这条消息就是任命消息和明确他管理了哪些人,大家都知道了,然后再各自给部门干活。

当控制器发现一个 broker 加入集群时,它会使用 broker ID 来检查新加入的 broker 是否包含现有分区的副本。如果有控制器就会把消息发送给新加入的 broker 和 现有的 broker。

上面这块关于分区复制的内容我们接下来会说到。

broker controller 数据存储

上面我们介绍到 broker controller 会提供数据服务,用于保存大量的 Kafka 集群数据。如下图

image.png

可以对上面保存信息归类,主要分为三类

  • broker 上的所有信息,包括 broker 中的所有分区,broker 所有分区副本,当前都有哪些运行中的 broker,哪些正在关闭中的 broker 。
  • 所有主题信息,包括具体的分区信息,比如领导者副本是谁,ISR 集合中有哪些副本等。
  • 所有涉及运维任务的分区。包括当前正在进行 Preferred 领导者选举以及分区重分配的分区列表。

Kafka 是离不开 ZooKeeper的,所以这些数据信息在 ZooKeeper 中也保存了一份。每当控制器初始化时,它都会从 ZooKeeper 上读取对应的元数据并填充到自己的缓存中。

broker controller 故障转移

我们在前面说过,第一个在 ZooKeeper 中的 /brokers/ids下创建节点的 broker 作为 broker controller,也就是说 broker controller 只有一个,那么必然会存在单点失效问题。kafka 为考虑到这种情况提供了故障转移功能,也就是 Fail Over。如下图

image.png

最一开始,broker1 会抢先注册成功成为 controller,然后由于网络抖动或者其他原因致使 broker1 掉线,ZooKeeper 通过 Watch 机制觉察到 broker1 的掉线,之后所有存活的 brokers 开始竞争成为 controller,这时 broker3 抢先注册成功,此时 ZooKeeper 存储的 controller 信息由 broker1 -> broker3,之后,broker3 会从 ZooKeeper 中读取元数据信息,并初始化到自己的缓存中。

注意:ZooKeeper 中存储的不是缓存信息,broker 中存储的才是缓存信息。

broker controller 存在的问题

在 Kafka 0.11 版本之前,控制器的设计是相当繁琐的。我们上面提到过一句话:Kafka controller 被设计为一种模拟状态机的多线程控制器,这种设计其实是存在一些问题的

  • controller 状态的更改由不同的监听器并罚执行,因此需要进行很复杂的同步,并且容易出错而且难以调试。
  • 状态传播不同步,broker 可能在时间不确定的情况下出现多种状态,这会导致不必要的额外的数据丢失
  • controller 控制器还会为主题删除创建额外的 I/O 线程,导致性能损耗
  • controller 的多线程设计还会访问共享数据,我们知道,多线程访问共享数据是线程同步最麻烦的地方,为了保护数据安全性,控制器不得不在代码中大量使用ReentrantLock 同步机制,这就进一步拖慢了整个控制器的处理速度。

broker controller 内部设计原理

在 Kafka 0.11 之后,Kafka controller 采用了新的设计,把多线程的方案改成了单线程加事件队列的方案。如下图所示

image.png

主要所做的改变有下面这几点

第一个改进是增加了一个 Event Executor Thread,事件执行线程,从图中可以看出,不管是 Event Queue 事件队列还是 Controller context 控制器上下文都会交给事件执行线程进行处理。将原来执行的操作全部建模成一个个独立的事件,发送到专属的事件队列中,供此线程消费。

第二个改进是将之前同步的 ZooKeeper 全部改为异步操作。ZooKeeper API 提供了两种读写的方式:同步和异步。之前控制器操作 ZooKeeper 都是采用的同步方式,这次把同步方式改为异步,据测试,效率提升了10倍。

第三个改进是根据优先级处理请求,之前的设计是 broker 会公平性的处理所有 controller 发送的请求。什么意思呢?公平性难道还不好吗?在某些情况下是的,比如 broker 在排队处理 produce 请求,这时候 controller 发出了一个 StopReplica 的请求,你会怎么办?还在继续处理 produce 请求吗?这个 produce 请求还有用吗?此时最合理的处理顺序应该是,赋予 StopReplica 请求更高的优先级,使它能够得到抢占式的处理。

副本机制

复制功能是 Kafka 架构的核心功能,在 Kafka 文档里面 Kafka 把自己描述为 一个分布式的、可分区的、可复制的提交日志服务。复制之所以这么关键,是因为消息的持久存储非常重要,这能够保证在主节点宕机后依旧能够保证 Kafka 高可用。副本机制也可以称为备份机制(Replication),通常指分布式系统在多台网络交互的机器上保存有相同的数据备份/拷贝。

Kafka 使用主题来组织数据,每个主题又被分为若干个分区,分区会部署在一到多个 broker 上,每个分区都会有多个副本,所以副本也会被保存在 broker 上,每个 broker 可能会保存成千上万个副本。下图是一个副本复制示意图

image.png

如上图所示,为了简单我只画出了两个 broker ,每个 broker 指保存了一个 Topic 的消息,在 broker1 中分区0 是Leader,它负责进行分区的复制工作,把 broker1 中的分区0复制一个副本到 broker2 的主题 A 的分区0。同理,主题 A 的分区1也是一样的道理。

副本类型分为两种:一种是 Leader(领导者) 副本,一种是Follower(跟随者)副本。

Leader 副本

Kafka 在创建分区的时候都要选举一个副本,这个选举出来的副本就是 Leader 领导者副本。

Follower 副本

除了 Leader 副本以外的副本统称为 Follower 副本,Follower 不对外提供服务。下面是 Leader 副本的工作方式

image.png

这幅图需要注意以下几点

  • Kafka 中,Follower 副本也就是追随者副本是不对外提供服务的。这就是说,任何一个追随者副本都不能响应消费者和生产者的请求。所有的请求都是由领导者副本来处理。或者说,所有的请求都必须发送到 Leader 副本所在的 broker 中,Follower 副本只是用做数据拉取,采用异步拉取的方式,并写入到自己的提交日志中,从而实现与 Leader 的同步
  • 当 Leader 副本所在的 broker 宕机后,Kafka 依托于 ZooKeeper 提供的监控功能能够实时感知到,并开启新一轮的选举,从追随者副本中选一个作为 Leader。如果宕机的 broker 重启完成后,该分区的副本会作为 Follower 重新加入。

首领的另一个任务是搞清楚哪个跟随者的状态与自己是一致的。跟随者为了保证与领导者的状态一致,在有新消息到达之前先尝试从领导者那里复制消息。为了与领导者保持一致,跟随者向领导者发起获取数据的请求,这种请求与消费者为了读取消息而发送的信息是一样的。

跟随者向领导者发送消息的过程是这样的,先请求消息1,然后再接收到消息1,在时候到请求1之后,发送请求2,在收到领导者给发送给跟随者之前,跟随者是不会继续发送消息的。这个过程如下

image.png

跟随者副本在收到响应消息前,是不会继续发送消息,这一点很重要。通过查看每个跟随者请求的最新偏移量,首领就会知道每个跟随者复制的进度。如果跟随者在10s 内没有请求任何消息,或者虽然跟随者已经发送请求,但是在10s 内没有收到消息,就会被认为是不同步的。如果一个副本没有与领导者同步,那么在领导者掉线后,这个副本将不会称为领导者,因为这个副本的消息不是全部的。

与之相反的,如果跟随者同步的消息和领导者副本的消息一致,那么这个跟随者副本又被称为同步的副本。也就是说,如果领导者掉线,那么只有同步的副本能够称为领导者。

关于副本机制我们说了这么多,那么副本机制的好处是什么呢?

  • 能够立刻看到写入的消息,就是你使用生产者 API 成功向分区写入消息后,马上使用消费者就能读取刚才写入的消息
  • 能够实现消息的幂等性,啥意思呢?就是对于生产者产生的消息,在消费者进行消费的时候,它每次都会看到消息存在,并不会存在消息不存在的情况

同步复制和异步复制

我在学习副本机制的时候,有个疑问,既然领导者副本和跟随者副本是发送 - 等待机制的,这是一种同步的复制方式,那么为什么说跟随者副本同步领导者副本的时候是一种异步操作呢?

我认为是这样的,跟随者副本在同步领导者副本后会把消息保存在本地 log 中,这个时候跟随者会给领导者副本一个响应消息,告诉领导者自己已经保存成功了,同步复制的领导者会等待所有的跟随者副本都写入成功后,再返回给 producer 写入成功的消息。而异步复制是领导者副本不需要关心跟随者副本是否写入成功,只要领导者副本自己把消息保存到本地 log ,就会返回给 producer 写入成功的消息。下面是同步复制和异步复制的过程

同步复制

  • producer 通知 ZooKeeper 识别领导者
  • producer 向领导者写入消息
  • 领导者收到消息后会把消息写入到本地 log
  • 跟随者会从领导者那里拉取消息
  • 跟随者向本地写入 log
  • 跟随者向领导者发送写入成功的消息
  • 领导者会收到所有的跟随者发送的消息
  • 领导者向 producer 发送写入成功的消息

异步复制

和同步复制的区别在于,领导者在写入本地log之后,直接向客户端发送写入成功消息,不需要等待所有跟随者复制完成。

ISR

Kafka动态维护了一个同步状态的副本的集合(a set of In-Sync Replicas),简称ISR,ISR 也是一个很重要的概念,我们之前说过,追随者副本不提供服务,只是定期的异步拉取领导者副本的数据而已,拉取这个操作就相当于是复制,ctrl-c + ctrl-v大家肯定用的熟。那么是不是说 ISR 集合中的副本消息的数量都会与领导者副本消息数量一样呢?那也不一定,判断的依据是 broker 中参数 replica.lag.time.max.ms 的值,这个参数的含义就是跟随者副本能够落后领导者副本最长的时间间隔。

replica.lag.time.max.ms 参数默认的时间是 10秒,如果跟随者副本落后领导者副本的时间不超过 10秒,那么 Kafka 就认为领导者和跟随者是同步的。即使此时跟随者副本中存储的消息要小于领导者副本。如果跟随者副本要落后于领导者副本 10秒以上的话,跟随者副本就会从 ISR 被剔除。倘若该副本后面慢慢地追上了领导者的进度,那么它是能够重新被加回 ISR 的。这也表明,ISR 是一个动态调整的集合,而非静态不变的。

Unclean 领导者选举

既然 ISR 是可以动态调整的,那么必然会出现 ISR 集合中为空的情况,由于领导者副本是一定出现在 ISR 集合中的,那么 ISR 集合为空必然说明领导者副本也挂了,所以此时 Kafka 需要重新选举一个新的领导者,那么该如何选举呢?现在你需要转变一下思路,我们上面说 ISR 集合中一定是与领导者同步的副本,那么不再 ISR 集合中的副本一定是不与领导者同步的副本了,也就是不再 ISR 列表中的跟随者副本会丢失一些消息。如果你开启 broker 端参数 unclean.leader.election.enable的话,下一个领导者就会在这些非同步的副本中选举。这种选举也叫做Unclean 领导者选举

如果你接触过分布式项目的话你一定知道 CAP 理论,那么这种 Unclean 领导者选举其实是牺牲了数据一致性,保证了 Kafka 的高可用性。

你可以根据你的实际业务场景决定是否开启 Unclean 领导者选举,一般不建议开启这个参数,因为数据的一致性要比可用性重要的多。

Kafka 请求处理流程

broker 的大部分工作是处理客户端、分区副本和控制器发送给分区领导者的请求。这种请求一般都是请求/响应式的,我猜测你接触最早的请求/响应的方式应该就是 HTTP 请求了。事实上,HTTP 请求可以是同步可以是异步的。一般正常的 HTTP 请求都是同步的,同步方式最大的一个特点是提交请求->等待服务器处理->处理完毕返回 这个期间客户端浏览器不能做任何事。而异步方式最大的特点是 请求通过事件触发->服务器处理(这是浏览器仍然可以作其他事情)-> 处理完毕

那么我也可以说同步请求就是顺序处理的,而异步请求的执行方式则不确定,因为异步需要创建多个执行线程,而每个线程的执行顺序不同。

这里需要注意一点,我们只是使用 HTTP 请求来举例子,而 Kafka 采用的是 TCP 基于 Socket 的方式进行通讯

那么这两种方式有什么缺点呢?

我相信聪明的你应该能马上想到,同步的方式最大的缺点就是吞吐量太差,资源利用率极低,由于只能顺序处理请求,因此,每个请求都必须等待前一个请求处理完毕才能得到处理。这种方式只适用于请求发送非常不频繁的系统

异步的方式的缺点就是为每个请求都创建线程的做法开销极大,在某些场景下甚至会压垮整个服务。

响应式模型

说了这么半天,Kafka 采用同步还是异步的呢?都不是,Kafka 采用的是一种 响应式(Reactor)模型,那么什么是响应式模型呢?简单的说,Reactor 模式是事件驱动架构的一种实现方式,特别适合应用于处理多个客户端并发向服务器端发送请求的场景,如下图所示

image.png

Kafka 的 broker 端有个 SocketServer组件,类似于处理器,SocketServer 是基于 TCP 的 Socket 连接的,它用于接受客户端请求,所有的请求消息都包含一个消息头,消息头中都包含如下信息

  • Request type (也就是 API Key)
  • Request version(broker 可以处理不同版本的客户端请求,并根据客户版本做出不同的响应)
  • Correlation ID --- 一个具有唯一性的数字,用于标示请求消息,同时也会出现在响应消息和错误日志中(用于诊断问题)
  • Client ID --- 用于标示发送请求的客户端

broker 会在它所监听的每一个端口上运行一个 Acceptor 线程,这个线程会创建一个连接,并把它交给 Processor(网络线程池), Processor 的数量可以使用 num.network.threads 进行配置,其默认值是3,表示每台 broker 启动时会创建3个线程,专门处理客户端发送的请求。

Acceptor 线程会采用轮询的方式将入栈请求公平的发送至网络线程池中,因此,在实际使用过程中,这些线程通常具有相同的机率被分配到待处理请求队列中,然后从响应队列获取响应消息,把它们发送给客户端。Processor 网络线程池中的请求 - 响应的处理还是比较复杂的,下面是网络线程池中的处理流程图

image.png

Processor 网络线程池接收到客户和其他 broker 发送来的消息后,网络线程池会把消息放到请求队列中,注意这个是共享请求队列,因为网络线程池是多线程机制的,所以请求队列的消息是多线程共享的区域,然后由 IO 线程池进行处理,根据消息的种类判断做何处理,比如 PRODUCE 请求,就会将消息写入到 log 日志中,如果是FETCH请求,则从磁盘或者页缓存中读取消息。也就是说,IO线程池是真正做判断,处理请求的一个组件。在IO 线程池处理完毕后,就会判断是放入响应队列中还是 Purgatory 中,Purgatory 是什么我们下面再说,现在先说一下响应队列,响应队列是每个线程所独有的,因为响应式模型中不会关心请求发往何处,因此把响应回传的事情就交给每个线程了,所以也就不必共享了。

注意:IO 线程池可以通过 broker 端参数 num.io.threads 来配置,默认的线程数是8,表示每台 broker 启动后自动创建 8 个IO 处理线程。

请求类型

下面是几种常见的请求类型

生产请求

我在 真的,关于 Kafka 入门看这一篇就够了 文章中提到过 acks 这个配置项的含义

简单来讲就是不同的配置对写入成功的界定是不同的,如果 acks = 1,那么只要领导者收到消息就表示写入成功,如果acks = 0,表示只要领导者发送消息就表示写入成功,根本不用考虑返回值的影响。如果 acks = all,就表示领导者需要收到所有副本的消息后才表示写入成功。

在消息被写入分区的首领后,如果 acks 配置的值是 all,那么这些请求会被保存在 炼狱(Purgatory)的缓冲区中,直到领导者副本发现跟随者副本都复制了消息,响应才会发送给客户端。

获取请求

broker 获取请求的方式与处理生产请求的方式类似,客户端发送请求,向 broker 请求主题分区中特定偏移量的消息,如果偏移量存在,Kafka 会采用 零复制 技术向客户端发送消息,Kafka 会直接把消息从文件中发送到网络通道中,而不需要经过任何的缓冲区,从而获得更好的性能。

客户端可以设置获取请求数据的上限和下限,上限指的是客户端为接受足够消息分配的内存空间,这个限制比较重要,如果上限太大的话,很有可能直接耗尽客户端内存。下限可以理解为攒足了数据包再发送的意思,这就相当于项目经理给程序员分配了 10 个bug,程序员每次改一个 bug 就会向项目经理汇报一下,有的时候改好了有的时候可能还没改好,这样就增加了沟通成本和时间成本,所以下限值得就是程序员你改完10个 bug 再向我汇报!!!如下图所示

image.png

如图你可以看到,在拉取消息 ---> 消息 之间是有一个等待消息积累这么一个过程的,这个消息积累你可以把它想象成超时时间,不过超时会跑出异常,消息积累超时后会响应回执。延迟时间可以通过 replica.lag.time.max.ms 来配置,它指定了副本在复制消息时可被允许的最大延迟时间。

元数据请求

生产请求和响应请求都必须发送给领导者副本,如果 broker 收到一个针对某个特定分区的请求,而该请求的首领在另外一个 broker 中,那么发送请求的客户端会收到非分区首领的错误响应;如果针对某个分区的请求被发送到不含有领导者的 broker 上,也会出现同样的错误。Kafka 客户端需要把请求和响应发送到正确的 broker 上。这不是废话么?我怎么知道要往哪发送?

事实上,客户端会使用一种 元数据请求 ,这种请求会包含客户端感兴趣的主题列表,服务端的响应消息指明了主题的分区,领导者副本和跟随者副本。元数据请求可以发送给任意一个 broker,因为所有的 broker 都会缓存这些信息。

一般情况下,客户端会把这些信息缓存,并直接向目标 broker 发送生产请求和相应请求,这些缓存需要隔一段时间就进行刷新,使用metadata.max.age.ms 参数来配置,从而知道元数据是否发生了变更。比如,新的 broker 加入后,会触发重平衡,部分副本会移动到新的 broker 上。这时候,如果客户端收到 不是首领的错误,客户端在发送请求之前刷新元数据缓存。

Kafka 重平衡流程

我在 真的,关于 Kafka 入门看这一篇就够了 中关于消费者描述的时候大致说了一下消费者组和重平衡之间的关系,实际上,归纳为一点就是让组内所有的消费者实例就消费哪些主题分区达成一致。

我们知道,一个消费者组中是要有一个群组协调者(Coordinator)的,而重平衡的流程就是由 Coordinator 的帮助下来完成的。

这里需要先声明一下重平衡发生的条件

  • 消费者订阅的任何主题发生变化
  • 消费者数量发生变化
  • 分区数量发生变化
  • 如果你订阅了一个还尚未创建的主题,那么重平衡在该主题创建时发生。如果你订阅的主题发生删除那么也会发生重平衡
  • 消费者被群组协调器认为是 DEAD 状态,这可能是由于消费者崩溃或者长时间处于运行状态下发生的,这意味着在配置合理时间的范围内,消费者没有向群组协调器发送任何心跳,这也会导致重平衡的发生。

在了解重平衡之前,你需要知道这两个角色

群组协调器(Coordinator):群组协调器是一个能够从消费者群组中收到所有消费者发送心跳消息的 broker。在最早期的版本中,元数据信息是保存在 ZooKeeper 中的,但是目前元数据信息存储到了 broker 中。每个消费者组都应该和群组中的群组协调器同步。当所有的决策要在应用程序节点中进行时,群组协调器可以满足 JoinGroup 请求并提供有关消费者组的元数据信息,例如分配和偏移量。群组协调器还有权知道所有消费者的心跳,消费者群组中还有一个角色就是领导者,注意把它和领导者副本和 kafka controller 进行区分。领导者是群组中负责决策的角色,所以如果领导者掉线了,群组协调器有权把所有消费者踢出组。因此,消费者群组的一个很重要的行为是选举领导者,并与协调器读取和写入有关分配和分区的元数据信息。

消费者领导者: 每个消费者群组中都有一个领导者。如果消费者停止发送心跳了,协调者会触发重平衡。

在了解重平衡之前,你需要知道状态机是什么

Kafka 设计了一套消费者组状态机(State Machine) ,来帮助协调者完成整个重平衡流程。消费者状态机主要有五种状态它们分别是 Empty、Dead、PreparingRebalance、CompletingRebalance 和 Stable

image.png
了解了这些状态的含义之后,下面我们用几条路径来表示一下消费者状态的轮转

消费者组一开始处于 Empty 状态,当重平衡开启后,它会被置于 PreparingRebalance 状态等待新消费者的加入,一旦有新的消费者加入后,消费者群组就会处于 CompletingRebalance 状态等待分配,只要有新的消费者加入群组或者离开,就会触发重平衡,消费者的状态处于 PreparingRebalance 状态。等待分配机制指定好后完成分配,那么它的流程图是这样的

image.png

在上图的基础上,当消费者群组都到达 Stable 状态后,一旦有新的消费者加入/离开/心跳过期,那么触发重平衡,消费者群组的状态重新处于 PreparingRebalance 状态。那么它的流程图是这样的。

image.png

在上图的基础上,消费者群组处于 PreparingRebalance 状态后,很不幸,没人玩儿了,所有消费者都离开了,这时候还可能会保留有消费者消费的位移数据,一旦位移数据过期或者被刷新,那么消费者群组就处于 Dead 状态了。它的流程图是这样的

image.png

在上图的基础上,我们分析了消费者的重平衡,在 PreparingRebalance或者 CompletingRebalance 或者 Stable 任意一种状态下发生位移主题分区 Leader 发生变更,群组会直接处于 Dead 状态,它的所有路径如下

image.png

这里面需要注意两点:

一般出现 Required xx expired offsets in xxx milliseconds 就表明Kafka 很可能就把该组的位移数据删除了

只有 Empty 状态下的组,才会执行过期位移删除的操作。

重平衡流程

上面我们了解到了消费者群组状态的转化过程,下面我们真正开始介绍 Rebalance 的过程。重平衡过程可以从两个方面去看:消费者端和协调者端,首先我们先看一下消费者端

从消费者看重平衡

从消费者看重平衡有两个步骤:分别是 消费者加入组等待领导者分配方案。这两个步骤后分别对应的请求是 JoinGroupSyncGroup

新的消费者加入群组时,这个消费者会向协调器发送 JoinGroup 请求。在该请求中,每个消费者成员都需要将自己消费的 topic 进行提交,我们上面描述群组协调器中说过,这么做的目的就是为了让协调器收集足够的元数据信息,来选取消费者组的领导者。通常情况下,第一个发送 JoinGroup 请求的消费者会自动称为领导者。领导者的任务是收集所有成员的订阅信息,然后根据这些信息,制定具体的分区消费分配方案。如图

image.png

在所有的消费者都加入进来并把元数据信息提交给领导者后,领导者做出分配方案并发送 SyncGroup 请求给协调者,协调者负责下发群组中的消费策略。下图描述了 SyncGroup 请求的过程

image.png

当所有成员都成功接收到分配方案后,消费者组进入到 Stable 状态,即开始正常的消费工作。

从协调者来看重平衡

从协调者角度来看重平衡主要有下面这几种触发条件,

  • 新成员加入组
  • 组成员主动离开
  • 组成员崩溃离开
  • 组成员提交位移

我们分别来描述一下,先从新成员加入组开始

新成员加入组

我们讨论的场景消费者集群状态处于Stable 等待分配的过程,这时候如果有新的成员加入组的话,重平衡的过程

image.png

从这个角度来看,协调者的过程和消费者类似,只是刚刚从消费者的角度去看,现在从领导者的角度去看

组成员离开

组成员离开消费者群组指的是消费者实例调用 close() 方法主动通知协调者它要退出。这里又会有一个新的请求出现 LeaveGroup()请求 。如下图所示

image.png

组成员崩溃

组成员崩溃是指消费者实例出现严重故障,宕机或者一段时间未响应,协调者接收不到消费者的心跳,就会被认为是组成员崩溃,崩溃离组是被动的,协调者通常需要等待一段时间才能感知到,这段时间一般是由消费者端参数 session.timeout.ms 控制的。如下图所示

image.png

重平衡时提交位移

这个过程我们就不再用图形来表示了,大致描述一下就是 消费者发送 JoinGroup 请求后,群组中的消费者必须在指定的时间范围内提交各自的位移,然后再开启正常的 JoinGroup/SyncGroup 请求发送。

文章参考:

《Kafka 权威指南》

https://blog.csdn.net/u013256...

https://learning.oreilly.com/...

https://www.cnblogs.com/kevin...

https://www.cnblogs.com/huxi2...

《极客时间-Kafka核心技术与实战》

https://cwiki.apache.org/conf...

https://cwiki.apache.org/conf...

kafka 分区和副本以及kafaka 执行流程,以及消息的高可用

Http中的同步请求和异步请求

Reactor模式详解

https://kafka.apache.org/docu...

https://www.linkedin.com/puls...

https://cwiki.apache.org/conf...

查看原文

ttqtc 收藏了文章 · 1月16日

🔥 精美图文带你掌握 JVM 内存布局

前言

JVM 系列:

本JVM系列属于本人学习过程当中总结的一些知识点,目的是想让读者更快地掌握JVM相关的知识要点,难免会有所侧重,若想要更加系统更加详细的学习JVM知识,还是需要去阅读专业的书籍和文档。

本文主题内容:

  • JVM 内存区域概览
  • 堆区的空间分配是怎么样?堆溢出的演示
  • 创建一个新对象内存是怎么分配的?
  • 方法区 到 Metaspace 元空间
  • 栈帧是什么?栈帧里有什么?怎么理解?
  • 本地方法栈
  • 程序计数器
  • Code Cache 是什么?
注:请 区分 JVM内存结构(内存布局) 和 JMM(Java内存模型)这两个不同的概念!

概览

内存是非常重要的系统资源,是硬盘和CPU的中间仓库及桥梁,承载着操作系统和应用程序的实时运行。JVM 内存布局规定了 Java 在运行过程中内存申请、分配、管理的策略 ,保证了 JVM 的高效稳定运行。

image.png

上图描述了当前比较经典的JVM内存布局。(堆区画小了2333,按理来说应该是最大的区域)

如果按照线程是否共享来分类的话,如下图所示:

image.png

PS:线程是否共享这点,实际上理解了每块区域的实际用处之后,就很自然而然的就记住了。不需要死记硬背。

下面让我们来了解下各个区域。

一、Heap (堆区)

1.1 堆区的介绍

我们先来说堆。堆是 OOM故障最主要的发生区域。它是内存区域中最大的一块区域,被所有线程共享,存储着几乎所有的实例对象、数组。所有的对象实例以及数组都要在堆上分配,但是随着JIT编译器的发展与逃逸分析技术逐渐成熟,栈上分配、标量替换优化技术将会导致一些微妙的变化发生,所有的对象都分配在堆上也渐渐变得不是那么“绝对”了

延伸知识点:JIT编译优化中的一部分内容 - 逃逸分析

推荐阅读:深入理解Java中的逃逸分析

Java堆是垃圾收集器管理的主要区域,因此很多时候也被称做“GC堆”。从内存回收的角度来看,由于现在收集器基本都采用分代收集算法,所以Java堆中还可以细分为:新生代和老年代。再细致一点的有Eden空间、From Survivor空间、To Survivor空间等。从内存分配的角度来看,线程共享的Java堆中可能划分出多个线程私有的分配缓冲区(Thread Local Allocation Buffer,TLAB)。不过无论如何划分,都与存放内容无关,无论哪个区域,存储的都仍然是对象实例,进一步划分的目的是为了更好地回收内存,或者更快地分配内存。

1.2 堆区的调整

根据Java虚拟机规范的规定,Java堆可以处于物理上不连续的内存空间中,只要逻辑上是连续的即可,就像我们的磁盘空间一样。在实现时,既可以实现成固定大小的,也可以在运行时动态地调整。

如何调整呢?

通过设置如下参数,可以设定堆区的初始值和最大值,比如 -Xms256M -Xmx 1024M,其中 -X这个字母代表它是JVM运行时参数,msmemory start的简称,中文意思就是内存初始值,mxmemory max的简称,意思就是最大内存。

值得注意的是,在通常情况下,服务器在运行过程中,堆空间不断地扩容与回缩,会形成不必要的系统压力 所以在线上生产环境中 JVM的XmsXmx会设置成同样大小,避免在GC 后调整堆大小时带来的额外压力。

1.3 堆的默认空间分配

另外,再强调一下堆空间内存分配的大体情况。

这里可能就会有人来问了,你从哪里知道的呢?如果我想配置这个比例,要怎么修改呢?

我先来告诉你怎么看虚拟机的默认配置。命令行上执行如下命令,就可以查看当前JDK版本所有默认的JVM参数。

java -XX:+PrintFlagsFinal -version

输出

对应的输出应该有几百行,我们这里去看和堆内存分配相关的两个参数

>java -XX:+PrintFlagsFinal -version
[Global flags]
    ...
    uintx InitialSurvivorRatio                      = 8
    uintx NewRatio                                  = 2
    ...
java version "1.8.0_131"
Java(TM) SE Runtime Environment (build 1.8.0_131-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.131-b11, mixed mode)

参数解释

参数作用
-XX:InitialSurvivorRatio新生代Eden/Survivor空间的初始比例
-XX:NewRatioOld区/Young区的内存比例

因为新生代是由Eden + S0 + S1组成的,所以按照上述默认比例,如果eden区内存大小是40M,那么两个survivor区就是5M,整个young区就是50M,然后可以算出Old区内存大小是100M,堆区总大小就是150M。

1.4 堆溢出 演示

/**
 * VM Args:-Xms10m -Xmx10m -XX:+HeapDumpOnOutOfMemoryError
 * @author Richard_Yi
 */
public class HeapOOMTest {

    public static final int _1MB = 1024 * 1024;

    public static void main(String[] args) {
        List<byte[]> byteList = new ArrayList<>(10);
        for (int i = 0; i < 10; i++) {
            byte[] bytes = new byte[2 * _1MB];
            byteList.add(bytes);
        }
    }
}

输出

java.lang.OutOfMemoryError: Java heap space
Dumping heap to java_pid32372.hprof ...
Heap dump file created [7774077 bytes in 0.009 secs]
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
    at jvm.HeapOOMTest.main(HeapOOMTest.java:18)

-XX:+HeapDumpOnOutOfMemoryError 可以让JVM在遇到OOM异常时,输出堆内信息,特别是对相隔数月才出现的OOM异常尤为重要。

创建一个新对象 内存分配流程

看完上面对堆的介绍,我们趁热打铁再学习一下JVM创建一个新对象的内存分配流程。

image.png

绝大部分对象在Eden区生成,当Eden区装填满的时候,会触发Young Garbage Collection,即YGC。垃圾回收的时候,在Eden区实现清除策略,没有被引用的对象则直接回收。依然存活的对象会被移送到Survivor区。Survivor区分为so和s1两块内存空间。每次YGC的时候,它们将存活的对象复制到未使用的那块空间,然后将当前正在使用的空间完全清除,交换两块空间的使用状态。如果YGC要移送的对象大于Survivor区容量的上限,则直接移交给老年代。一个对象也不可能永远呆在新生代,就像人到了18岁就会成年一样,在JVM中-XX:MaxTenuringThreshold参数就是来配置一个对象从新生代晋升到老年代的阈值。默认值是15, 可以在Survivor区交换14次之后,晋升至老年代。

上述涉及到一部分垃圾回收的名词,不熟悉的读者可以查阅资料或者看下本系列的垃圾回收章节。

二、Metaspace 元空间

在 HotSpot JVM 中,永久代( ≈ 方法区)中用于存放类和方法的元数据以及常量池,比如ClassMethod。每当一个类初次被加载的时候,它的元数据都会放到永久代中。

永久代是有大小限制的,因此如果加载的类太多,很有可能导致永久代内存溢出,即万恶的 java.lang.OutOfMemoryError: PermGen,为此我们不得不对虚拟机做调优。

那么,Java 8 中 PermGen 为什么被移出 HotSpot JVM 了?(详见:JEP 122: Remove the Permanent Generation):

  1. 由于 PermGen 内存经常会溢出,引发恼人的 java.lang.OutOfMemoryError: PermGen,因此 JVM 的开发者希望这一块内存可以更灵活地被管理,不要再经常出现这样的 OOM
  2. 移除 PermGen 可以促进 HotSpot JVM 与 JRockit VM 的融合,因为 JRockit 没有永久代。

根据上面的各种原因,PermGen 最终被移除,方法区移至 Metaspace,字符串常量池移至堆区

准确来说,Perm 区中的字符串常量池被移到了堆内存中是在Java7 之后,Java 8 时,PermGen 被元空间代替,其他内容比如类元信息、字段、静态属性、方法、常量等都移动到元空间区。比如java/lang/Object 类元信息、静态属性System.out、整形常量 100000等。

元空间的本质和永久代类似,都是对JVM规范中方法区的实现。不过元空间与永久代之间最大的区别在于:元空间并不在虚拟机中,而是使用本地内存。因此,默认情况下,元空间的大小仅受本地内存限制。(和后面提到的直接内存一样,都是使用本地内存)

In JDK 8, classes metadata is now stored in the native heap and this space is called Metaspace.

对应的JVM调参:

参数作用
-XX:MetaspaceSize分配给Metaspace(以字节计)的初始大小
-XX:MaxMetaspaceSize分配给Metaspace 的最大值,超过此值就会触发Full GC,此值默认没有限制,但应取决于系统内存的大小。JVM会动态地改变此值。
-XX:MinMetaspaceFreeRatio在GC之后,最小的Metaspace剩余空间容量的百分比,减少为分配空间所导致的垃圾收集
-XX:MaxMetaspaceFreeRatio在GC之后,最大的Metaspace剩余空间容量的百分比,减少为释放空间所导致的垃圾收集

延伸阅读:关于Metaspace比较好的两篇文章。

  1. Metaspace in Java 8
  2. http://lovestblog.cn/blog/201...

三、 Java 虚拟机栈

对于每一个线程,JVM 都会在线程被创建的时候,创建一个单独的栈。也就是说虚拟机栈的生命周期和线程是一致,并且是线程私有的。除了Native方法以外,Java方法都是通过Java 虚拟机栈来实现调用和执行过程的(需要程序技术器、堆、元空间内数据的配合)。所以Java虚拟机栈是虚拟机执行引擎的核心之一。而Java虚拟机栈中出栈入栈的元素就称为「栈帧」。

栈帧(Stack Frame)是用于支持虚拟机进行方法调用和方法执行的数据结构。栈帧存储了方法的局部变量表、操作数栈、动态连接和方法返回地址等信息。每一个方法从调用至执行完成的过程,都对应着一个栈帧在虚拟机栈里从入栈到出栈的过程。

栈对应线程,栈帧对应方法

在活动线程中, 只有位于栈顶的帧才是有效的, 称为当前栈帧。正在执行的方法称为当前方法。在执行引擎运行时, 所有指令都只能针对当前栈帧进行操作。而StackOverflowError 表示请求的栈溢出, 导致内存耗尽, 通常出现在递归方法中。

虚拟机栈通过pop和push的方式,对每个方法对应的活动栈帧进行运算处理,方法正常执行结束,肯定会跳转到另一个栈帧上。在执行的过程中,如果出现了异常,会进行异常回溯,返回地址通过异常处理表确定。

可以看出栈帧在整个JVM 体系中的地位颇高。下面也具体介绍一下栈帧中的存储信息。

1. 局部变量表

局部变量表就是存放方法参数和方法内部定义的局部变量的区域

局部变量表所需的内存空间在编译期间完成分配,当进入一个方法时,这个方法需要在帧中分配多大的局部变量空间是完全确定的,在方法运行期间不会改变局部变量表的大小

这里直接上代码,更好理解。

public int test(int a, int b) {
    Object obj = new Object();
    return a + b;
}

如果局部变量是Java的8种基本基本数据类型,则存在局部变量表中,如果是引用类型。如new出来的String,局部变量表中存的是引用,而实例在堆中。

2. 操作栈

操作数栈(Operand Stack)看名字可以知道是一个栈结构。Java虚拟机的解释执行引擎称为“基于栈的执行引擎”,其中所指的“栈”就是操作数栈。当JVM为方法创建栈帧的时候,在栈帧中为方法创建一个操作数栈,保证方法内指令可以完成工作。

还是用实操理解一下。

/**
 * @author Richard_yyf
 */
public class OperandStackTest {

    public int sum(int a, int b) {
        return a + b;
    }
}

编译生成.class文件之后,再反汇编查看汇编指令

> javac OperandStackTest.java
> javap -v OperandStackTest.class > 1.txt
  public int sum(int, int);
    descriptor: (II)I
    flags: ACC_PUBLIC
    Code:
      stack=2, locals=3, args_size=3 // 最大栈深度为2 局部变量个数为3
         0: iload_1 // 局部变量1 压栈
         1: iload_2 // 局部变量2 压栈
         2: iadd    // 栈顶两个元素相加,计算结果压栈
         3: ireturn
      LineNumberTable:
        line 10: 0

3. 动态连接

每个栈帧中包含一个在常量池中对当前方法的引用, 目的是支持方法调用过程的动态连接

4. 方法返回地址

方法执行时有两种退出情况:

  • 正常退出,即正常执行到任何方法的返回字节码指令,如 RETURNIRETURNARETURN
  • 异常退出

无论何种退出情况,都将返回至方法当前调用的位置。方法退出的过程相当于弹出当前栈帧,退出可能有三种方式:

  • 返回值压入上层调用栈帧
  • 异常信息抛给能够处理的栈帧
  • PC 计数器指向方法调用后的下一条指令
延伸阅读:JVM机器指令集图解

四、本地方法栈

本地方法栈(Native Method Stack)与虚拟机栈所发挥的作用是非常相似的,它们之间的区别不过是虚拟机栈为虚拟机执行Java方法(也就是字节码)服务,而本地方法栈则为虚拟机使用到的Native方法服务。在虚拟机规范中对本地方法栈中方法使用的语言、使用方式与数据结构并没有强制规定,因此具体的虚拟机可以自由实现它。甚至有的虚拟机(譬如Sun HotSpot虚拟机)直接就把本地方法栈和虚拟机栈合二为一。与虚拟机栈一样,本地方法栈区域也会抛出StackOverflowError和OutOfMemoryError异常

五、程序计数器

程序计数器(Program Counter Register)是一块较小的内存空间。是线程私有的。它可以看作是当前线程所执行的字节码的行号指示器。什么意思呢?

白话版本:因为代码是在线程中运行的,线程有可能被挂起。即CPU一会执行线程A,线程A还没有执行完被挂起了,接着执行线程B,最后又来执行线程A了,CPU得知道执行线程A的哪一部分指令,线程计数器会告诉CPU。

由于Java虚拟机的多线程是通过线程轮流切换并分配处理器执行时间的方式来实现的,CPU 只有把数据装载到寄存器才能够运行。寄存器存储指令相关的现场信息,由于CPU 时间片轮限制,众多线程在并发执行过程中,任何一个确定的时刻,一个处理器或者多核处理器中的一个内核,只会执行某个线程中的一条指令

因此,为了线程切换后能恢复到正确的执行位置,每条线程都需要有一个独立的程序计数器,各条线程之间计数器互不影响,独立存储。每个线程在创建后,都会产生自己的程序计数器和栈帧,程序计数器用来存放执行指令的偏移量和行号指示器等,线程执行或恢复都要依赖程序计数器。此区域也不会发生内存溢出异常。

六、直接内存

直接内存(Direct Memory)并不是虚拟机运行时数据区的一部分,也不是Java虚拟机规范中定义的内存区域。但是这部分内存也被频繁地使用,而且也可能导致OutOfMemoryError异常出现,所以我们放到这里一起讲解。

在JDK 1.4中新加入了NIO(New Input/Output)类,引入了一种基于通道(Channel)与缓冲区(Buffer)的I/O方式,它可以使用Native函数库直接分配堆外内存,然后通过一个存储在Java堆中的DirectByteBuffer对象作为这块内存的引用进行操作。这样能在一些场景中显著提高性能,因为避免了在Java堆和Native堆中来回复制数据

显然,本机直接内存的分配不会受到Java堆大小的限制,但是,既然是内存,肯定还是会受到本机总内存(包括RAM以及SWAP区或者分页文件)大小以及处理器寻址空间的限制。如果内存区域总和大于物理内存的限制,也会出现OOM。

Code Cache

简而言之, JVM代码缓存是JVM将其字节码存储为本机代码的区域 。我们将可执行本机代码的每个块称为 nmethod 。该 nmethod可能是一个完整的或内联Java方法。

实时(JIT)编译器是代码缓存区域的最大消费者。这就是为什么一些开发人员将此内存称为JIT代码缓存的原因。

这部分代码所占用的内存空间成为CodeCache区域。一般情况下我们是不会关心这部分区域的且大部分开发人员对这块区域也不熟悉。如果这块区域OOM了,在日志里面就会看到 java.lang.OutOfMemoryError code cache

诊断选项

选项默认值描述
PrintCodeCachefalse是否在JVM退出前打印CodeCache的使用情况
PrintCodeCacheOnCompilationfalse是否在每个方法被JIT编译后打印CodeCache区域的使用情况
延伸阅读 Introduction to JVM Code Cache

参考

  1. 《深入理解Java虚拟机》 - 周志明
  2. 《码出高效》
  3. Metaspace in Java 8
  4. JVM机器指令集图解
  5. Introduction to JVM Code Cache
如果本文有帮助到你,希望能点个赞,这是对我的最大动力。
查看原文

ttqtc 收藏了文章 · 2018-09-13

30分钟理解GraphQL核心概念

写在前面

在上一篇文章RPC vs REST vs GraphQL中,对于这三者的优缺点进行了比较宏观的对比,而且我们也会发现,一般比较简单的项目其实并不需要GraphQL,但是我们仍然需要对新的技术有一定的了解和掌握,在新技术普及时才不会措手不及。

这篇文章主要介绍一些我接触GraphQL的这段时间,觉得需要了解的比较核心的概念,比较适合一下人群:

  • 听说过GraphQL的读者,想深入了解一下
  • 想系统地学习GraphQL的读者
  • 正在调研GraphQL技术的读者

这些概念并不局限于服务端或者是客户端,如果你熟悉这些概念,在接触任意使用GraphQL作为技术背景的库或者框架时,都可以通过文档很快的上手。

如果你已经GraphQL应用于了实际项目中,那么这篇文章可能不适合你,因为其中并没有包含一些实践中的总结和经验,关于实践的东西我会在之后再单另写一篇文章总结。

什么是GraphQL

介绍GraphQL是什么的文章网上一搜一大把,篇幅有长有短,但是从最核心上讲,它是一种查询语言,再进一步说,是一种API查询语言。

这里可能有的人就会说,什么?API还能查?API不是用来调用的吗?是的,这正是GraphQL的强大之处,引用官方文档的一句话:

ask exactly what you want.

我们在使用REST接口时,接口返回的数据格式、数据类型都是后端预先定义好的,如果返回的数据格式并不是调用者所期望的,作为前端的我们可以通过以下两种方式来解决问题:

  • 和后端沟通,改接口(更改数据源)
  • 自己做一些适配工作(处理数据源)

一般如果是个人项目,改后端接口这种事情可以随意搞,但是如果是公司项目,改后端接口往往是一件比较敏感的事情,尤其是对于三端(web、andriod、ios)公用同一套后端接口的情况。大部分情况下,均是按第二种方式来解决问题的。

因此如果接口的返回值,可以通过某种手段,从静态变为动态,即调用者来声明接口返回什么数据,很大程度上可以进一步解耦前后端的关联。

在GraphQL中,我们通过预先定义一张Schema和声明一些Type来达到上面提及的效果,我们需要知道:

  • 对于数据模型的抽象是通过Type来描述的
  • 对于接口获取数据的逻辑是通过Schema来描述的

这么说可能比较抽象,我们一个一个来说明。

Type

对于数据模型的抽象是通过Type来描述的,每一个Type有若干Field组成,每个Field又分别指向某个Type。

GraphQL的Type简单可以分为两种,一种叫做Scalar Type(标量类型),另一种叫做Object Type(对象类型)

Scalar Type

GraphQL中的内建的标量包含,StringIntFloatBooleanEnum,对于熟悉编程语言的人来说,这些都应该很好理解。

值得注意的是,GraphQL中可以通过Scalar声明一个新的标量,比如:

  • prisma(一个使用GraphQL来抽象数据库操作的库)中,还有DateTimeID这两个标量分别代表日期格式和主键
  • 在使用GraphQL实现文件上传接口时,需要声明一个Upload标量来代表要上传的文件

总之,我们只需要记住,标量是GraphQL类型系统中最小的颗粒,关于它在GraphQL解析查询结果时,我们还会再提及它。

Object Type

仅有标量是不够的抽象一些复杂的数据模型的,这时候我们需要使用对象类型,举个例子(先忽略语法,仅从字面上看):

type Article {
  id: ID
  text: String
  isPublished: Boolean
}

上面的代码,就声明了一个Article类型,它有3个Field,分别是ID类型的id,String类型的text和Boolean类型的isPublished。

对于对象类型的Field的声明,我们一般使用标量,但是我们也可以使用另外一个对象类型,比如如果我们再声明一个新的User类型,如下:

type User {
  id: ID
  name: String
}

这时我们就可以稍微的更改一下关于Article类型的声明代码,如下:

type Article {
  id: ID
  text: String
  isPublished: Boolean
  author: User
}

Article新增的author的Field是User类型, 代表这篇文章的作者。

总之,我们通过对象模型来构建GraphQL中关于一个数据模型的形状,同时还可以声明各个模型之间的内在关联(一对多、一对一或多对多)。

Type Modifier

关于类型,还有一个较重要的概念,即类型修饰符,当前的类型修饰符有两种,分别是ListRequired ,它们的语法分别为[Type]Type!, 同时这两者可以互相组合,比如[Type]!或者[Type!]或者[Type!]!(请仔细看这里!的位置),它们的含义分别为:

  • 列表本身为必填项,但其内部元素可以为空
  • 列表本身可以为空,但是其内部元素为必填
  • 列表本身和内部元素均为必填

我们进一步来更改上面的例子,假如我们又声明了一个新的Comment类型,如下:

type Comment {
  id: ID!
  desc: String,
  author: User!
}

你会发现这里的ID有一个!,它代表这个Field是必填的,再来更新Article对象,如下:

type Article {
  id: ID!
  text: String
  isPublished: Boolean
  author: User!
  comments: [Comment!]
}

我们这里的作出的更改如下:

  • id字段改为必填
  • author字段改为必填
  • 新增了comments字段,它的类型是一个元素为Comment类型的List类型

最终的Article类型,就是GraphQL中关于文章这个数据模型,一个比较简单的类型声明。

Schema

现在我们开始介绍Schema,我们之前简单描述了它的作用,即它是用来描述对于接口获取数据逻辑的,但这样描述仍然是有些抽象的,我们其实不妨把它当做REST架构中每个独立资源的uri来理解它,只不过在GraphQL中,我们用Query来描述资源的获取方式。因此,我们可以将Schema理解为多个Query组成的一张表。

这里又涉及一个新的概念Query,GraphQL中使用Query来抽象数据的查询逻辑,当前标准下,有三种查询类型,分别是query(查询)mutation(更改)subscription(订阅)

Note: 为了方便区分,Query特指GraphQL中的查询(包含三种类型),query指GraphQL中的查询类型(仅指查询类型)

Query

上面所提及的3中基本查询类型是作为Root Query(根查询)存在的,对于传统的CRUD项目,我们只需要前两种类型就足够了,第三种是针对当前日趋流行的real-time应用提出的。

我们按照字面意思来理解它们就好,如下:

  • query(查询):当获取数据时,应当选取Query类型
  • mutation(更改):当尝试修改数据时,应当使用mutation类型
  • subscription(订阅):当希望数据更改时,可以进行消息推送,使用subscription类型

仍然以一个例子来说明。

首先,我们分别以REST和GraphQL的角度,以Article为数据模型,编写一系列CRUD的接口,如下:

Rest 接口

GET /api/v1/articles/
GET /api/v1/article/:id/
POST /api/v1/article/
DELETE /api/v1/article/:id/
PATCH /api/v1/article/:id/

GraphQL Query

query {
  articles(): [Article!]!
  article(id: Int): Article!
}

mutation {
  createArticle(): Article!
  updateArticle(id: Int): Article!
  deleteArticle(id: Int): Article!
}

对比我们较熟悉的REST的接口我们可以发现,GraphQL中是按根查询的类型来划分Query职能的,同时还会明确的声明每个Query所返回的数据类型,这里的关于类型的语法和上一章节中是一样的。需要注意的是,我们所声明的任何Query都必须是Root Query的子集,这和GraphQL内部的运行机制有关。

例子中我们仅仅声明了Query类型和Mutation类型,如果我们的应用中对于评论列表有real-time的需求的话,在REST中,我们可能会直接通过长连接或者通过提供一些带验证的获取长连接url的接口,比如:

POST /api/v1/messages/

之后长连接会将新的数据推送给我们,在GraphQL中,我们则会以更加声明式的方式进行声明,如下

subscription {
  updatedArticle() {
    mutation
    node {
        comments: [Comment!]!
    }
  }
}

我们不必纠结于这里的语法,因为这篇文章的目的不是让你在30分钟内学会GraphQL的语法,而是理解的它的一些核心概念,比如这里,我们就声明了一个订阅Query,这个Query会在有新的Article被创建或者更新时,推送新的数据对象。当然,在实际运行中,其内部实现仍然是建立于长连接之上的,但是我们能够以更加声明式的方式来进行声明它。

Resolver

如果我们仅仅在Schema中声明了若干Query,那么我们只进行了一半的工作,因为我们并没有提供相关Query所返回数据的逻辑。为了能够使GraphQL正常工作,我们还需要再了解一个核心概念,Resolver(解析函数)

GraphQL中,我们会有这样一个约定,Query和与之对应的Resolver是同名的,这样在GraphQL才能把它们对应起来,举个例子,比如关于articles(): [Article!]!这个Query, 它的Resolver的名字必然叫做articles

在介绍Resolver之前,是时候从整体上了解下GraphQL的内部工作机制了,假设现在我们要对使用我们已经声明的articles的Query,我们可能会写以下查询语句(同样暂时忽略语法):

Query {
  articles {
       id
       author {
           name
       }
       comments {
      id
      desc
      author
    }
  }
}

GraphQL在解析这段查询语句时会按如下步骤(简略版):

  • 首先进行第一层解析,当前QueryRoot Query类型是query,同时需要它的名字是articles
  • 之后会尝试使用articlesResolver获取解析数据,第一层解析完毕
  • 之后对第一层解析的返回值,进行第二层解析,当前articles还包含三个子Query,分别是idauthorcomments

    • id在Author类型中为标量类型,解析结束
    • author在Author类型中为对象类型User,尝试使用UserResolver获取数据,当前field解析完毕
    • 之后对第二层解析的返回值,进行第三层解析,当前author还包含一个Query, name,由于它是标量类型,解析结束
    • comments同上...

我们可以发现,GraphQL大体的解析流程就是遇到一个Query之后,尝试使用它的Resolver取值,之后再对返回值进行解析,这个过程是递归的,直到所解析Field的类型是Scalar Type(标量类型)为止。解析的整个过程我们可以把它想象成一个很长的Resolver Chain(解析链)。

这里对于GraphQL的解析过程只是很简单的概括,其内部运行机制远比这个复杂,当然这些对于使用者是黑盒的,我们只需要大概了解它的过程即可。

Resolver本身的声明在各个语言中是不一样的,因为它代表数据获取的具体逻辑。它的函数签名(以js为例子)如下:

function(parent, args, ctx, info) {
    ...
}

其中的参数的意义如下:

  • parent: 当前上一个Resolver的返回值
  • args: 传入某个Query中的函数(比如上面例子中article(id: Int)中的id
  • ctx: 在Resolver解析链中不断传递的中间变量(类似中间件架构中的context)
  • info: 当前Query的AST对象

值得注意的是,Resolver内部实现对于GraphQL完全是黑盒状态。这意味着Resolver如何返回数据、返回什么样的数据、从哪返回数据,完全取决于Resolver本身,基于这一点,在实际中,很多人往往把GraphQL作为一个中间层来使用,数据的获取通过Resolver来封装,内部数据获取的实现可能基于RPC、REST、WS、SQL等多种不同的方式。同时,基于这一点,当你在对一些未使用GraphQL的系统进行迁移时(比如REST),可以很好的进行增量式迁移。

总结

大概就这么多,首先感谢你耐心的读到这里,虽然题目是30分钟熟悉GraphQL核心概念,但是可能已经超时了,不过我相信你对GraphQL中的核心概念已经比较熟悉了。但是它本身所涉及的东西远远比这个丰富,同时它还处于飞速的发展中。

最后我尝试根据这段时间的学习GraphQL的经验,提供一些进一步学习和了解GraphQL的方向和建议,仅供参考:

想进一步了解GraphQL本身

我建议再仔细去官网,读一下官方文档,如果有兴趣的话,看看GraphQL的spec也是极好的。这篇文章虽然介绍了核心概念,但是其他一些概念没有涉及,比如Union、Interface、Fragment等等,这些概念均是基于核心概念之上的,在了解核心概念后,应当会很容易理解。

偏向服务端

偏向服务端方向的话,除了需要进一步了解GraphQL在某个语言的具体生态外,还需要了解一些关于缓存、上传文件等特定方向的东西。如果是想做系统迁移,还需要对特定的框架做一些调研,比如graphene-django。

如果是想使用GraphQL本身做系统开发,这里推荐了解一个叫做prisma的框架,它本身是在GraphQL的基础上构建的,并且与一些GraphQL的生态框架兼容性也较好,在各大编程语言也均有适配,它本身可以当做一个ORM来使用,也可以当做一个与数据库交互的中间层来使用。

偏向客户端

偏向客户端方向的话,需要进一步了解关于graphql-client的相关知识,我这段时间了解的是apollo,一个开源的grapql-client框架,并且与各个主流前端技术栈如Angular、React等均有适配版本,使用感觉良好。

同时,还需要了解一些额外的查询概念,比如分页查询中涉及的Connection、Edge等。

大概就这么多,如有错误,还望指正。

欢迎关注公众号 全栈101,只谈技术,不谈人生
clipboard.png
查看原文

ttqtc 提出了问题 · 2018-08-17

java实体类转换有没有更优雅的方式?

常规的转换方式:

a.setId(b.getId());
a.setUserName(b.getName());

当类中的成员变量非常多,几十个,这种方式就会变得非常糟糕,且易出错,不易排查,
有没有更好的方式来解决这个问题呢?

关注 7 回答 7

ttqtc 赞了回答 · 2018-08-17

解决java中基本类型能否作为引用传递

1、return(作者要求不使用这种方法,舍弃)
2、楼上的方法
3、a作为全局变量,func2中,this.a = 3;

关注 3 回答 2

认证与成就

  • 获得 0 次点赞
  • 获得 6 枚徽章 获得 0 枚金徽章, 获得 1 枚银徽章, 获得 5 枚铜徽章

擅长技能
编辑

(゚∀゚ )
暂时没有

开源项目 & 著作
编辑

(゚∀゚ )
暂时没有

注册于 2017-07-24
个人主页被 74 人浏览