迈莫coding

迈莫coding 查看完整档案

北京编辑  |  填写毕业院校公众号  |  迈莫coding 编辑填写个人主网站
编辑

校招面试/go/公众号同名文章,每天分享优质文章、大厂经验、大厂面经,助力面试,是每个程序员值得关注的平台。

个人动态

迈莫coding 发布了文章 · 4月13日

http到底有什么魔性,备受青睐

在这里插入图片描述

原文始发于:https://mp.weixin.qq.com/s/5Y8XD_T4F0U7SsDahc6nww

http是什么

http是超文本传输协议,也就是Hyper Text Transfer Protocol,是一个在计算机世界里专门在两点之间传输文字、图片、音频、视频等超文本数据的约定和规范。

先看一下http名字:超文本传输协议。顾名思义,它可以拆成三部分,分别为:超文本、传输、协议。那我们接下来去聊一聊每部分的含义。

在这里插入图片描述

首先http是一个协议,那么协议到底是什么?

其实"协议"并不仅仅局限在计算机世界里,在现实生活中"协议"也是无处不在的,比如租房时会签订一个租房合同、刚毕业的大学生会和公司签订一个三方协议、员工入职时会签订一个保密协议等等,协议渗透到生活的方方面面。

之所以出现"协议",也是对参与者的一种行为约定和规范。俗话说:"没有规矩,不成方圆",也体现出了协议的重要性。

http是一个用在计算机世界里的协议。它使用计算机能够理解的语言确立了一种计算机之间交流通信的规范,以及相关的各种控制和错误处理方式。

协议聊完了,那接下里看看http第二组成部分:传输

所谓的传输,就是把一大推东西从A点搬到B点,或者从B点搬到A点,即"A<==>B"。

就拿网络冲浪举例子吧,迈莫在手机上浏览公众号<<迈莫coding>>,发现一篇文章<<http到底是有什么魔性,备受青睐>>对自己帮助更大,随手点了个赞。

拿点赞这一操作来说,客户端需要将点赞人的ID、文章ID等必备数据给到服务器,服务器拿到这些数据去更新数据库,更新完成之后,将现有的点赞数通知客户端,客户端拿到服务器发送过来的数据后,将点赞数更新。

说到这里,你也差不多理解了http的第二层的含义了吧。

说完"协议"和"传输",现在我们终于到了http的第三部分:超文本

既然http是"传输协议",那么它传输的"超文本"到底是什么?它为什么是超文本而不是文本?超文本和文本区别是什么?接下来,我们慢慢聊。

先说文本是什么。文本表示http传输的是完整的、有意义的数据,可以被浏览器、服务器这些上层应用程序处理。不是tcp/udp这些底层协议里被切分的杂乱无章的二进制包。

超文本就是"超越了普通文本的文本",不再局限于简单的文字,而是文字、图片、音频和视频等的混合体,最关键的是“超链接”,能够从一个"超文本"跳跃到另一个"超文本",形成复杂的非线性、网状的机构关系。

到这里,我们详细解释了http是超文本传输协议,最后用一句话来概括:http是一个计算机世界里专门在两点之间传输文字、图片、音频和视频等超文本数据的约定和规范。

http特点

上述聊完了http的含义,接下来一起看看http为什么这么备受青睐,他到底有什么魔性,让互联网人无法自拔。

我用一张图来简单概括一下http的特点,然后再分开细聊。先有整体思想,然后细看每个特点。

在这里插入图片描述

- 简单快速

客户端访问服务端时,只需传送请求方法和路径。请求方法常用的有 GET、HEAD、POST 等等。每种方法规定了客户与服务器联系的类型不同。由于 HTTP 协议简单,使得 HTTP 服务器的程序规模小,因而通信速度很快。

- 灵活可扩展

在使用http数据传输过程中,请求主体body 也不再限于文本形式的 TXT 或HTML,而是能够传输图片、音频视频等任意数据。也可以在请求头header 中添加一些特性,比如请求方法、版本号、状态码段等。

- 可靠性传输

http是基于tcp/ip协议的,而由于tcp是可靠性传输协议,所以 HTTP 自然也就继承了这个特性,能够在请求方和应答方之间“可靠”地传输数据。

不过我们必须正确地理解“可靠”的含义,HTTP 并不能 100% 保证数据一定能够发送到
另一端,在网络繁忙、连接质量差等恶劣的环境下,也有可能收发失败。“可靠”只是向使用者提供了一个“承诺”,会在下层用多种手段“尽量”保证数据的完整送达。

- 无状态

HTTP 协议是无状态协议。无状态,是指协议对于事务处理没有记忆能力。无状态意味着如果后续处理需要前面的信息,则它必须重传,这样可能导致每次连接传送的数据量增大。另一方面,在服务器不需要先前信息时它的应答就较快。

我们可以再对比一下 UDP 协议,不过它是无连接也无状态的,顺序发包乱序收包,数据包发出去后就不管了,收到后也不会顺序整理。而 HTTP 是有连接无状态,顺序发包顺序收包,按照收发的顺序管理报文。

http 1.0/1.1/2.x 版本

http1.0 比较 http1.1

- 长连接

处理在一个 TCP 连接上可以传送多个 HTTP 请求和响应,减少了建立和关闭连接的消耗和延迟。在 HTTP1.1中 默认开启Connection:keep-alive ,一定程度上弥补了 HTTP1.0 每次请求都要创建连接的缺点。

- 请求流水线Pipelining

HTTP1.1 还允许客户端不用等待上一次请求结果返回,就可以发出下一次请求,但服务器端必须按照接收到客户端请求的先后顺序依次回送响应结果,以保证客户端能够区分出每次请求的响应内容,这样也显著地减少了整个下载过程所需要的时间。

http1.x 比较 http2.x

- 多路复用机制

http2.0 采用多路复用技术,做到同一个连接并发处理多个请求,而且并发请求的数量比HTTP1.1大了好几个数量级。HTTP1.1也可以多建立几个TCP连接,来支持处理更多并发的请求,但是创建TCP连接本身也是有开销的。

在这里插入图片描述

- 头部数据压缩

在HTTP1.1中,HTTP请求和响应都是由状态行、请求/响应头部、消息主体三部分组成。一般而言,消息主体都会经过gzip压缩,或者本身传输的就是压缩过后的二进制文件,但状态行和头部却没有经过任何压缩,直接以纯文本传输。随着Web功能越来越复杂,每个页面产生的请求数也越来越多,导致消耗在头部的流量越来越多,尤其是每次都要传输UserAgent、Cookie这类不会频繁变动的内容,完全是一种浪费。

而且并发请求的数量比HTTP1.1大了好几个数量级。HTTP1.1也可以多建立几个TCP连接,来支持处理更多并发的请求,但是创建TCP连接本身也是有开销的。

为此,将请求头部进行微压缩,可以让那些请求秩序一个来回搞定,大大节约了时间和资源的浪费。

- 采用二进制格式

比起文本协议,二进制解析起来更加高效,"线上"更紧凑,更重要的是错误更少。

- 服务器推送

服务端推送是一种在客户端请求之前发送数据的机制。网页使用了许多资源:HTML、样式表、脚本、图片等等。在HTTP1.1中这些资源每一个都必须明确地请求。这是一个很慢的过程。浏览器从获取HTML开始,然后在它解析和评估页面的时候,增量地获取更多的资源。因为服务器必须等待浏览器做每一个请求,网络经常是空闲的和未充分使用的。

为了改善延迟,HTTP2.0引入了server push,它允许服务端推送资源给浏览器,在浏览器明确地请求之前,免得客户端再次创建连接发送请求到服务器端获取。这样客户端可以直接从本地加载这些资源,不用再通过网络。

在这里插入图片描述

到这里,http的概念、特点、版本差异及其特点也就告一段落了,今天就到这里。

咱们下期再见~~~

文章也会持续更新,可以微信搜索「 迈莫coding 」第一时间阅读。每天分享优质文章、大厂经验、大厂面经,助力面试,是每个程序员值得关注的平台。
查看原文

赞 0 收藏 0 评论 0

迈莫coding 发布了文章 · 4月5日

实习四个多月,聊一聊自己的感受

前言

======

在鹅厂实习了也将近四个多月,也准备回学校写毕设论文,在实习这段时间,感觉自己过的无比充实。无论是从态度上还是技能上,都有不一样的提升。相信大家读到这篇文章,也非常想知道在鹅厂工作到底是什么样子的,鹅厂的福利有多爽。莫慌,我会以我的亲身经历告诉你鹅厂的福利,来鹅厂就对了鹅厂欢迎您的到来!!!

=====

在鹅厂工作的Q哥、Q妹,早餐和晚餐是瑞雪拿的,吃多少拿多少。因为我在腾讯北京总部,所以我以其说一下我的感受。公司有两层食堂,可以选择自己喜欢的,一层食堂在10点就不续餐了,如果来的迟的话,可以去B1层,B1层关门时间是10点半。要说食堂伙食,凭心而论,伙食超级好,种类繁多,味道独特,让人流连忘返。

我一般每天9点半左右到公司,去拿自己喜欢吃的蛋挞(来迟了就无了😭)。我的早餐是比较的固定的,基本上是小笼包+蛋挞+鸡蛋+混沌(豆浆)+小菜,荤素配搭,营养早餐。一日之计在于晨,也希望各位小伙伴好好吃早饭,对自己好点。就这样,早餐吃完了,也该泡杯枸杞干活了。

这就是我的早餐图,拍个不是很好,凑合看吧😂😂,片面体现出鹅厂的福利相当棒。民以食为天,鹅厂棒棒的。

=====

在出行方面,鹅厂提供了班车和摆渡车。基本上全天候车,保证员工的正常出行。我一般坐车到达西二旗后,坐摆渡车到达总部。我记得我第一次来总部时,那时候没有员工卡,也不知道班车位置,只能挤公交,印象非常深刻,人超级多,公交都挤不上,勉勉强强挤上去,下车后还需要走1公里路才可以到达目的地。鹅厂在出行方面人性化,从西二旗->总部,每隔10分钟就会巴士来回穿梭,满足员工的出行。员工再也不用挤公交了了🥱🥱。

工作

======

到了工作这一块,相信也是大家比较想看的,莫慌,听我细细道来。

工作的话,每个需求都会划分日期和人数,在规定时间内交工即可。外界传说996、007呀,听见就很可怕的,一想到秃头,头更大😂😂。在我没来之前,信以为真。但来实习的话,突然发现谣言一传传百里。谣言真可怕!!!

其实在我看来,每天把自己手头工作做完,工作期间保持高效率,尽量把时间投入到工作中。这样的话,工作完成之后,剩余时间自已可以规划,比如看看书、锻炼锻炼都可以的,提升一下自己。

这张图就是我的工作真实现照,工作的时候需要和各方人进行对接,距离近的话,可以直接拿上电脑跑过去,进行疑惑解答;如果离得比较远的话,就只能打电话对接了(员工拥有一个电话)。主要是因为空间太大了,只能通过打电话来进行探讨😣😣。空间大也有难度 😂😂。

这就是我的工作时的状态,每天忙忙碌碌,充实着每一天。

健身

======

我不是一个特别喜欢健身的人😂😂。但为了自己拥有一个强壮的身体,在半个月前,就每天晚上坚持(小声逼逼: 偶尔也会断)去健身房健健身,锻炼一下自己,但有时候到了健身房,一看到健身器材就打退堂鼓。幸亏有小哥哥小姐姐的陪伴,才能一天一天坚持下去(有时候会断😂😂)。

这是我第一天健身时给我舍友拍的,他们不信我能健身坚持,开始赌我可以坚持几天。事实上,他们是对的,只有小哥哥小姐姐去健身,我才会去😂😂。但发自内心,还是希望自己可以一直坚持下去,拥有一个强壮的身体。

给自己定个小目标:健身养成习惯,奥利给!!!

感悟

======

在鹅厂实习的这段时间,眼界和格局让自己有了不一样的提升,也见识到了学霸的恐怖性;技能上有了不一样的感悟,知其然而所以然;态度上,无论大小事物,都应付出100%努力,认真对待。生活上,也让可怜的娃见识到了所谓的明星,也拿到了他们的签名照。满足了自己心中小小的愿望。

我在鹅厂欢迎大家的到来!!!

文章也会持续更新,可以微信搜索「 迈莫coding 」第一时间阅读。每天分享优质文章、大厂经验、大厂面经,助力面试,是每个程序员值得关注的平台。
查看原文

赞 0 收藏 0 评论 0

迈莫coding 发布了文章 · 4月4日

elasticsearch入门篇

原文先发于:https://mp.weixin.qq.com/s/0cTaqiYSJ1Pr9jBEVYfKHw

一、elasticsearch背后有趣的故事

许多年前,一个刚结婚的名叫 Shay Banon 的失业开发者,跟着他的妻子去了伦敦,他的妻子在那里学习厨师。 在寻找一个赚钱的工作的时候,为了给他的妻子做一个食谱搜索引擎,他开始使用 Lucene 的一个早期版本。直接使用 Lucene 是很难的,因此 Shay 开始做一个抽象层,Java 开发者使用它可以很简单的给他们的程序添加搜索功能。 他发布了他的第一个开源项目 Compass。后来 Shay 获得了一份工作,主要是高性能,分布式环境下的内存数据网格。这个对于高性能,实时,分布式搜索引擎的需求尤为突出, 他决定重写 Compass,把它变为一个独立的服务并取名 Elasticsearch。

第一个公开版本在2010年2月发布,从此以后,Elasticsearch 已经成为了 Github 上最活跃的项目之一,他拥有超过300名 contributors(目前736名 contributors )。 一家公司已经开始围绕 Elasticsearch 提供商业服务,并开发新的特性,但是,Elasticsearch 将永远开源并对所有人可用。

据说,Shay 的妻子还在等着她的食谱搜索引擎…

二、elasticsearch简介

Elasticsearch 是一个开源的搜索引擎,建立在一个全文搜索引擎库 Apache Lucene™ 基础之上。 Lucene 可以说是当下最先进、高性能、全功能的搜索引擎库--无论是开源还是私有。但是 Lucene 仅仅只是一个库。为了充分发挥其功能,你需要使用 Java 并将 Lucene 直接集成到应用程序中。 更糟糕的是,您可能需要获得信息检索学位才能了解其工作原理。Lucene 非常 复杂。Elasticsearch 也是使用 Java 编写的,它的内部使用 Lucene 做索引与搜索,但是它的目的是使全文检索变得简单, 通过隐藏 Lucene 的复杂性,取而代之的提供一套简单一致的 RESTful API。然而,Elasticsearch 不仅仅是 Lucene,并且也不仅仅只是一个全文搜索引擎。 它可以被下面这样准确的形容:

  1. 一个分布式的实时文档存储,每个字段 可以被索引与搜索
  2. 一个分布式实时分析搜索引擎
  3. 能胜任上百个服务节点的扩展,并支持 PB 级别的结构化或者非结构化数据

2.1、elasticsearch功能

  • 分布式的搜索引擎

es可作为一个分布式的搜索引擎,例如百度,淘宝的商品搜索,一般web系统的站内搜索,es都是不错的技术选型。

  • 数据分析引擎

es在搜索的基础上提供了丰富的API支持个性化的搜索和数据分析功能,比如电商网站,我们可以查询最近几天的热销商品等。

  • 对海量数据进行近实时的处理

es是一个分布式的搜索引擎,es通过集群和内部分片可以将海量数据分散到多台服务器上进行存储和检索,大大提高了其可伸缩性和容灾能力。
所谓近实时是一个相对的概念,一般的如果相应速度能达到秒级别,我们就称为其实近实时的。es的近实时包括两个方面:其一写入的数据在1s后就可以进行检索。其二其检索和分析响应速度可以达到秒级别。

2.2、elasticsearch的特点

  • 分布式

es是一个分布式的搜索引擎,可以很好的进行数据的容灾迁移,动态扩容,负载均衡等分布式特性。

  • 海量数据

es能处理PB级别的数据,因为es是一个分布式的架构,支持动态扩容,所以对于海量数据的处理和存储都不再是问题。

三、elasticsearch的几个基础概念

es中数据的基础概念

  • index

索引(index)类似于关系型数据库里的“数据库”——它是我们存储和索引关联数据的地方。

提示:

事实上,我们的数据被存储和索引在分片(shards)中,索引只是一个把一个或多个分片分组在一起的逻辑空间。然而,这只是一些内部细节——我们的程序完全不用关心分 片。对于我们的程序而言,文档存储在索引(index)中。
剩下的细节由Elasticsearch关心 既可。

  • type

type的概念类似于MySql中表的概念。

在应用中,我们使用对象表示一些“事物”,例如一个用户、一篇博客、一个评论,或者一封邮 件。每个对象都属于一个类(class),这个类定义了属性或与对象关联的数据。 user 类的对象 可能包含姓名、性别、年龄和Email地址。 在关系型数据库中,我们经常将相同类的对象存储在一个表里,因为它们有着相同的结构。 同理,在Elasticsearch中,我们使用相同类型(type)的文档表示相同的“事物”,因为他们的数 据结构也是相同的。 每个类型(type)都有自己的映射(mapping)或者结构定义,就像传统数据库表中的列一样。所 有类型下的文档被存储在同一个索引下,但是类型的映射(mapping)会告诉Elasticsearch不同 的文档如何被索引。 我们将会在《映射》章节探讨如何定义和管理映射,但是现在我们将依 赖Elasticsearch去自动处理数据结构。

  • document

document是es的基本索引单元,document类似于MySql中的一行记录。document的数据是json格式。

  • id

在MySql中我们使用主键表示一条记录的唯一性,在es中id就是这种概念。在es中id同样可以是自产生的,es自动生成的ID具备以下特点:自动生成的是 url安全的,base64编码,GUID,保证分布式下ID不冲突(全局唯一ID)。当然也可以我们自己来指定。

2,es在分布式下的几个概念

  • Cluster(集群):

相信熟悉分布式的小伙伴对这个Cluster都不会陌生,Cluster表示es的一个集群,所谓集群就是有好多es组合成的一个分布式下的es集群。

  • node(节点):

node就是es集群(Cluster)中的一个es节点就称为node。简单来说可以理解成一个es实例就是该集群中的一个节点。

3,es存储策略上的两个概念

  • shard(分片)和 replica:

为了将数据添加到Elasticsearch,我们需要索引(index)——一个存储关联数据的地方。实际 上,索引只是一个用来指向一个或多个分片(shards)的“逻辑命名空间(logical namespace)”. 一个分片(shard)是一个最小级别“工作单元(worker unit)”,它只是保存了索引中所有数据的一 部分。

道分片就是一个Lucene实例,并且它本身就是一个完整的搜索引擎。我们的文档存储在分片中,并且在分片中被索引,但是我们的应用程序不会直接与它们通信,取而代之的是,直接与索引通信。 分片是Elasticsearch在集群中分发数据的关键。把分片想象成数据的容器。文档存储在分片中,然后分片分配到你集群中的节点上。当你的集群扩容或缩小,Elasticsearch将会自动在你的节点间迁移分片,以使集群保持平衡。 分片可以是主分片(primary shard)或者是复制分片(replica shard)。

你索引中的每个文档属于一个单独的主分片,所以主分片的数量决定了索引最多能存储多少数据。 理论上主分片能存储的数据大小是没有限制的,限制取决于你实际的使用情况。分片的最大容量完全取决于你的使用状况:硬件存储的大小、文档的大小和复杂度、如何索引 和查询你的文档,以及你期望的响应时间。

复制分片只是主分片的一个副本,它可以防止硬件故障导致的数据丢失,同时可以提供读请 求,比如搜索或者从别的shard取回文档。 当索引创建完成的时候,主分片的数量就固定了,但是复制分片的数量可以随时调整。 默认情况下,一个索引被分配5个主分片,一个主分片默认只有一个复制分片。

重点:

shard分为两种:

  1. primary shard --- 主分片

    1. replica shard --- 复制分片(或者称为备份分片或者副本分片)

需要注意的是,在业界有一个约定俗称的东西,单说一个单词shard一般指的是primary shard,而单说一个单词replica就是指的replica shard。

另外一个需要注意的是replica shard是相对于索引而言的,如果说当前index有一个复制分片,那么相对于主分片来说就是每一个主分片都有一个复制分片,即如果有5个主分片就有5个复制分片,并且主分片和复制分片之间是一一对应的关系。

很重要的一点:primary shard不能和replica shard在同一个节点上。重要的事情说三遍:

**primary shard不能和replica shard在同一个节点上
primary shard不能和replica shard在同一个节点上
primary shard不能和replica shard在同一个节点上**

所以es最小的高可用配置为两台服务器。

四、elasticsearch的安装和开发工具

  • 本人安装的是elasticsearch-6.6.2版本
  • 开发工具:kibana-6.6.2(注意kibana的版本一定要和elasticsearch的版本一致)

另外本地还配置了另一个开发工具:elasticsearch-head

安装方式,大家去百度一下,有很多很详细的安装步骤,在这里就不在赘述了。

简单贴一张图关于如何在kibana中执行curl

在这里插入图片描述

四、集群健康状态

Elasticsearch 的集群监控信息中包含了许多的统计数据,其中最为重要的一项就是集群健康,它在 status 字段中展示为 green 、 yellow 或者 red。

在kibana中执行:GET /_cat/health?v

1 epoch      timestamp cluster        status node.total node.data shards pri relo init unassign pending_tasks max_task_wait_time active_shards_percent
2 1568794410 08:13:30  my-application yellow          1         1     47  47    0    0       40             0                  -                 54.0%

其中我们可以看到当前我本地的集群健康状态是yellow ,但这里问题来了,集群的健康状况是如何进行判断的呢?

  • green(很健康)
    所有的主分片和副本分片都正常运行。
  • yellow(亚健康)
    所有的主分片都正常运行,但不是所有的副本分片都正常运行。
  • red(不健康)
    有主分片没能正常运行。

注意:

我本地只配置了一个单节点的elasticsearch,因为primary shard和replica shard是不能分配到一个节点上的所以,在我本地的elasticsearch中是不存在replica shard的,所以健康状况为yellow。

文章也会持续更新,可以微信搜索「 迈莫coding 」第一时间阅读。每天分享优质文章、大厂经验、大厂面经,助力面试,是每个程序员值得关注的平台。
原文地址:https://www.cnblogs.com/hello...
查看原文

赞 0 收藏 0 评论 0

迈莫coding 发布了文章 · 3月19日

你真的知道怎么实现一个延迟队列吗?

目录

  • 前言
  • 延迟队列定义
  • 应用场景
  • 实现方案

    • Redis zset
    • TimeWheel
    • 时间轮结构
    • 时间轮运行逻辑
  • 总结
原文地址:https://mp.weixin.qq.com/s/jL8_23pjYWV74rsjoWNPWg

前言

延迟队列是我们日常开发中,较为频繁接触的一种技术方案。顾名思义,延迟队列就是具有延迟功能的消息队列。比如往该队列里投递了一个延时为60s的信息,那么60s后就能收到该信息。自己在网上搜索资料整理,学习一下,为此进行了一次总结并且把知识分享出来。

延迟队列定义

首先,大家对队列数据结构一定不陌生了,它是一种先进先出的数据结构。普通队列中的元素是有序的,它们遵循着先进先出的规则,也就是说,先入队的任务优先被执行,最后入队的任务最后执行。

延迟队列与普通队列的最大区别在于其延迟的属性上,普通队列是先入队列的任务优先被执行,而延迟队列在入队时会指定一个延迟时间,表示希望经过该指定时间后处理。比如你在某个购物app上下单,而你却没支付,待支付界面上就会提醒你15分钟后自动取消,这就是延迟队列的典型范例。

应用场景

在我们现实生活中,延迟队列的使用是比较多的,比如说以下几个场景:

  • 订单超时未支付,该订单会自动取消
  • 用户外卖下单后,距离超时时间还有15分钟时,系统会提醒外卖小哥及时送餐,以免超时
  • 商品收货后未作出评价,系统默认7天后给出5星好评

......

延迟队列的使用场景是无处不在的。在以上场景中,如果不使用延迟队列,则需要业务方每秒轮训数据库,比较现在时间是否符合设定的时间,每个业务方都需要一样的重复逻辑。因此,我们可以将其抽象提取出来,作为公共组件,为此,今天的主角-延迟队列至此诞生了。

有了延迟队列,每个业务方只需要把任务添加到延迟队列中,并设置延迟时间即可,到了指定时间,任务就会被自动触发,调用对应的逻辑方法进行处理。

延迟队列为我们提供了解决大量需要延迟执行的任务提供了一个合理的解决方案。接下来,我们一起来看看延迟队列究竟是如何实现的。

实现方案

Redis zset

我们把客户端需要延迟执行的消息称为一个延迟任务,那么我们就可以使用 Redis ZSet 数据结构进行存储,延迟任务的ID作为key值,value值就是整个任务详情,score值为该延迟执行的消息延迟时间。

那么我们可以通过以下几个步骤使用Redis的ZSet数据结构来实现一个延迟队列:

  • 使用 ZADD key score value 语法进行入队操作,把延迟任务的ID作为key值,整个任务详情作为value值,该任务需要延时的时间作为score。
  • 启动一个线程(每隔一秒执行)通过 ZRANGEBYSCORE KEY -inf +inf limit 0 1 WITHSCORES 方法查询ZSet中的任务是否可执行。其中会有两种情况:
  • 如果查询出来的分数小于当前时间戳,说明这个任务已经可以执行了,则去异步执行
  • 如果查询出来的分数大于当前时间戳,说明该队列中没有需要啊执行的任务,则休眠一秒后再次轮训

从实现步骤来看,通过Redis zset 实现延迟队列是一种容易理解并实现相对简单的实现方式。并且我们可以依赖Redis 自身的持久化来实现持久化,使用Redis集群来支持高并发和高可用,是一种不错的延迟队列的实现方案。

TimeWheel

TimeWheel时间轮算法,也是一种实现延迟队列的方案之一。其应用场景丰富,在 Netty、Akka、Quartz、ZooKeeper 、Kafka等组件中都存在时间轮的踪影。

时间轮结构

在这里插入图片描述

如上面所示,时间轮是一个存储延迟任务的环形队列,底层采用数组实现,数组中的每个元素可以存放一个延迟任务列表(HashedWheelBucket),HashedWheelBucket是一个环形的双向链表(图中红色处),链表中的每一项表示的都是延迟任务项,其中封装链真正的延迟任务。

时间轮是由多个时间格组成的,每个时间格表示当前时间的基本跨度。并且时间格个数是固定的。

时间轮还有一个表盘指针,用来表示时间轮当前所指时间,随着时间的迁移,不断处理时间格中对应的延迟任务。

时间轮运行逻辑

时间轮在启动时候会记录当前启动的时间赋值给startTime。时间轮在添加延迟任务时首先会计算出一个延迟时间,比如一个任务的延迟时间为30s,那么会将当前时间+30s-时间轮启动时间,计算出一个时间戳(延迟时间)。然后将延迟任务加入到对应时间格的链表中,等待执行。

然后需要计算出几个参数值:

  1. 延迟任务总共延迟的次数:将每个任务的延迟时间/时间格计算出tick需要跳动的次数
  2. 计算时间轮round次数:根据计算的需要走的(总次数-当前tick数量)/时间格个数,比如我们现在需要添加一个延时为24秒的延迟任务,如果当前tick为0,那么轮数=(24-0)/20=1,那么指针每运行一圈就会将round取出来减一,所以需要转动到第二轮之后才可以将轮数round减为0之后才会运行
  3. 计算出该任务需要放置到时间轮(wheel)的槽位,然后加入到槽位链表最后

将timeouts中的数据放置到时间轮wheel中之后,计算出当前时针走到的槽位的位置,并取出槽位中的链表数据,将deadline和当前的时间做对比,运行过期的数xx据。

使用时间轮实现的延迟队列,能够支持大量任务的高效触发。在Kafka的时间轮训算法的实现方案中,引入了DelayQueue,使用DelayQueue来推送时间轮滚动,而延迟任务的添加与删除操作都放在时间轮中,这样的设计大幅度提升了整个延迟队列的执行效率。

总结

延迟队列在我们日常开发中应用非常广泛,在本文中分别介绍了使用Redis zset和TimeWheel时间轮两种方式实现延迟队列。从实现过程中,可以发现使用Redis zset实现延迟队列理解起来最为简单,能够快速落地,但Redis毕竟是基于内存的,虽然有持久化机制,但还是有数据丢失的可能性。而使用TimeWheel时间轮算法,是一个非常巧妙的方案,但同时也是最为难理解的方案。到这里,文章也基本结束了,希望本文对你们实现延迟队列提供一点思路。

文章也会持续更新,可以微信搜索「 迈莫coding 」第一时间阅读。每天分享优质文章、大厂经验、大厂面经,助力面试,是每个程序员值得关注的平台。
查看原文

赞 0 收藏 0 评论 0

迈莫coding 收藏了文章 · 3月18日

kafka核心原理的秘密,藏在这16张图里

文章首发公众号:码哥字节(ID:MageByte)

Kafka 是一个优秀的分布式消息中间件,许多系统中都会使用到 Kafka 来做消息通信。对分布式消息系统的了解和使用几乎成为一个后台开发人员必备的技能。今天码哥字节就从常见的 Kafka 面试题入手,和大家聊聊 Kafka 的那些事儿。

思维导图

讲一讲分布式消息中间件

问题

  • 什么是分布式消息中间件?
  • 消息中间件的作用是什么?
  • 消息中间件的使用场景是什么?
  • 消息中间件选型?

消息队列

分布式消息是一种通信机制,和 RPC、HTTP、RMI 等不一样,消息中间件采用分布式中间代理的方式进行通信。如图所示,采用了消息中间件之后,上游业务系统发送消息,先存储在消息中间件,然后由消息中间件将消息分发到对应的业务模块应用(分布式生产者 - 消费者模式)。这种异步的方式,减少了服务之间的耦合程度。

架构

定义消息中间件:

  • 利用高效可靠的消息传递机制进行平台无关的数据交流
  • 基于数据通信,来进行分布式系统的集成
  • 通过提供消息传递和消息排队模型,可以在分布式环境下扩展进程间的通信

在系统架构中引用额外的组件,必然提高系统的架构复杂度和运维的难度,那么在系统中使用分布式消息中间件有什么优势呢?消息中间件在系统中起的作用又是什么呢?

  • 解耦
  • 冗余(存储)
  • 扩展性
  • 削峰
  • 可恢复性
  • 顺序保证
  • 缓冲
  • 异步通信

面试时,面试官经常会关心面试者对开源组件的选型能力,这既可以考验面试者知识的广度,也可以考验面试者对某类系统的知识的认识深度,而且也可以看出面试者对系统整体把握和系统架构设计的能力。开源分布式消息系统有很多,不同的消息系统的特性也不一样,选择怎样的消息系统,不仅需要对各消息系统有一定的了解,也需要对自身系统需求有清晰的认识。

下面是常见的几种分布式消息系统的对比:

选择

答案关键字

  • 什么是分布式消息中间件?通信,队列,分布式,生产消费者模式。
  • 消息中间件的作用是什么? 解耦、峰值处理、异步通信、缓冲。
  • 消息中间件的使用场景是什么? 异步通信,消息存储处理。
  • 消息中间件选型?语言,协议、HA、数据可靠性、性能、事务、生态、简易、推拉模式。

Kafka 基本概念和架构

问题

  • 简单讲下 Kafka 的架构?
  • Kafka 是推模式还是拉模式,推拉的区别是什么?
  • Kafka 如何广播消息?
  • Kafka 的消息是否是有序的?
  • Kafka 是否支持读写分离?
  • Kafka 如何保证数据高可用?
  • Kafka 中 zookeeper 的作用?
  • 是否支持事务?
  • 分区数是否可以减少?

Kafka 架构中的一般概念:

架构

  • Producer:生产者,也就是发送消息的一方。生产者负责创建消息,然后将其发送到 Kafka。
  • Consumer:消费者,也就是接受消息的一方。消费者连接到 Kafka 上并接收消息,进而进行相应的业务逻辑处理。
  • Consumer Group:一个消费者组可以包含一个或多个消费者。使用多分区 + 多消费者方式可以极大提高数据下游的处理速度,同一消费组中的消费者不会重复消费消息,同样的,不同消费组中的消费者消息消息时互不影响。Kafka 就是通过消费组的方式来实现消息 P2P 模式和广播模式。
  • Broker:服务代理节点。Broker 是 Kafka 的服务节点,即 Kafka 的服务器。
  • Topic:Kafka 中的消息以 Topic 为单位进行划分,生产者将消息发送到特定的 Topic,而消费者负责订阅 Topic 的消息并进行消费。
  • Partition:Topic 是一个逻辑的概念,它可以细分为多个分区,每个分区只属于单个主题。同一个主题下不同分区包含的消息是不同的,分区在存储层面可以看作一个可追加的日志(Log)文件,消息在被追加到分区日志文件的时候都会分配一个特定的偏移量(offset)。
  • Offset:offset 是消息在分区中的唯一标识,Kafka 通过它来保证消息在分区内的顺序性,不过 offset 并不跨越分区,也就是说,Kafka 保证的是分区有序性而不是主题有序性。
  • Replication:副本,是 Kafka 保证数据高可用的方式,Kafka 同一 Partition 的数据可以在多 Broker 上存在多个副本,通常只有主副本对外提供读写服务,当主副本所在 broker 崩溃或发生网络一场,Kafka 会在 Controller 的管理下会重新选择新的 Leader 副本对外提供读写服务。
  • Record: 实际写入 Kafka 中并可以被读取的消息记录。每个 record 包含了 key、value 和 timestamp。

Kafka Topic Partitions Layout

主题

Kafka 将 Topic 进行分区,分区可以并发读写。

Kafka Consumer Offset

consumer offset

zookeeper

zookeeper

  • Broker 注册:Broker 是分布式部署并且之间相互独立,Zookeeper 用来管理注册到集群的所有 Broker 节点。
  • Topic 注册: 在 Kafka 中,同一个 Topic 的消息会被分成多个分区并将其分布在多个 Broker 上,这些分区信息及与 Broker 的对应关系也都是由 Zookeeper 在维护
  • 生产者负载均衡:由于同一个 Topic 消息会被分区并将其分布在多个 Broker 上,因此,生产者需要将消息合理地发送到这些分布式的 Broker 上。
  • 消费者负载均衡:与生产者类似,Kafka 中的消费者同样需要进行负载均衡来实现多个消费者合理地从对应的 Broker 服务器上接收消息,每个消费者分组包含若干消费者,每条消息都只会发送给分组中的一个消费者,不同的消费者分组消费自己特定的 Topic 下面的消息,互不干扰。

答案关键字

  • 简单讲下 Kafka 的架构?

    Producer、Consumer、Consumer Group、Topic、Partition
  • Kafka 是推模式还是拉模式,推拉的区别是什么?

    Kafka Producer 向 Broker 发送消息使用 Push 模式,Consumer 消费采用的 Pull 模式。拉取模式,让 consumer 自己管理 offset,可以提供读取性能
  • Kafka 如何广播消息?

    Consumer group
  • Kafka 的消息是否是有序的?

    Topic 级别无序,Partition 有序
  • Kafka 是否支持读写分离?

    不支持,只有 Leader 对外提供读写服务
  • Kafka 如何保证数据高可用?

    副本,ack,HW
  • Kafka 中 zookeeper 的作用?

    集群管理,元数据管理
  • 是否支持事务?

    0.11 后支持事务,可以实现”exactly once“
  • 分区数是否可以减少?

    不可以,会丢失数据

Kafka 使用

问题

  • Kafka 有哪些命令行工具?你用过哪些?
  • Kafka Producer 的执行过程?
  • Kafka Producer 有哪些常见配置?
  • 如何让 Kafka 的消息有序?
  • Producer 如何保证数据发送不丢失?
  • 如何提升 Producer 的性能?
  • 如果同一 group 下 consumer 的数量大于 part 的数量,kafka 如何处理?
  • Kafka Consumer 是否是线程安全的?
  • 讲一下你使用 Kafka Consumer 消费消息时的线程模型,为何如此设计?
  • Kafka Consumer 的常见配置?
  • Consumer 什么时候会被踢出集群?
  • 当有 Consumer 加入或退出时,Kafka 会作何反应?
  • 什么是 Rebalance,何时会发生 Rebalance?

命令行工具

Kafka 的命令行工具在 Kafka 包的/bin目录下,主要包括服务和集群管理脚本,配置脚本,信息查看脚本,Topic 脚本,客户端脚本等。

  • kafka-configs.sh: 配置管理脚本
  • kafka-console-consumer.sh: kafka 消费者控制台
  • kafka-console-producer.sh: kafka 生产者控制台
  • kafka-consumer-groups.sh: kafka 消费者组相关信息
  • kafka-delete-records.sh: 删除低水位的日志文件
  • kafka-log-dirs.sh:kafka 消息日志目录信息
  • kafka-mirror-maker.sh: 不同数据中心 kafka 集群复制工具
  • kafka-preferred-replica-election.sh: 触发 preferred replica 选举
  • kafka-producer-perf-test.sh:kafka 生产者性能测试脚本
  • kafka-reassign-partitions.sh: 分区重分配脚本
  • kafka-replica-verification.sh: 复制进度验证脚本
  • kafka-server-start.sh: 启动 kafka 服务
  • kafka-server-stop.sh: 停止 kafka 服务
  • kafka-topics.sh:topic 管理脚本
  • kafka-verifiable-consumer.sh: 可检验的 kafka 消费者
  • kafka-verifiable-producer.sh: 可检验的 kafka 生产者
  • zookeeper-server-start.sh: 启动 zk 服务
  • zookeeper-server-stop.sh: 停止 zk 服务
  • zookeeper-shell.sh:zk 客户端

我们通常可以使用kafka-console-consumer.shkafka-console-producer.sh脚本来测试 Kafka 生产和消费,kafka-consumer-groups.sh可以查看和管理集群中的 Topic,kafka-topics.sh通常用于查看 Kafka 的消费组情况。

Kafka Producer

Kafka producer 的正常生产逻辑包含以下几个步骤:

  1. 配置生产者客户端参数常见生产者实例。
  2. 构建待发送的消息。
  3. 发送消息。
  4. 关闭生产者实例。

Producer 发送消息的过程如下图所示,需要经过拦截器序列化器分区器,最终由累加器批量发送至 Broker。

producer

Kafka Producer 需要以下必要参数:

  • bootstrap.server: 指定 Kafka 的 Broker 的地址
  • key.serializer: key 序列化器
  • value.serializer: value 序列化器

常见参数:

  • batch.num.messages

    默认值:200,每次批量消息的数量,只对 asyc 起作用。
  • request.required.acks

    默认值:0,0 表示 producer 毋须等待 leader 的确认,1 代表需要 leader 确认写入它的本地 log 并立即确认,-1 代表所有的备份都完成后确认。 只对 async 模式起作用,这个参数的调整是数据不丢失和发送效率的 tradeoff,如果对数据丢失不敏感而在乎效率的场景可以考虑设置为 0,这样可以大大提高 producer 发送数据的效率。
  • request.timeout.ms

    默认值:10000,确认超时时间。
  • partitioner.class

    默认值:kafka.producer.DefaultPartitioner,必须实现 kafka.producer.Partitioner,根据 Key 提供一个分区策略。_有时候我们需要相同类型的消息必须顺序处理,这样我们就必须自定义分配策略,从而将相同类型的数据分配到同一个分区中。_
  • producer.type

    默认值:sync,指定消息发送是同步还是异步。异步 asyc 成批发送用 kafka.producer.AyncProducer, 同步 sync 用 kafka.producer.SyncProducer。同步和异步发送也会影响消息生产的效率。
  • compression.topic

    默认值:none,消息压缩,默认不压缩。其余压缩方式还有,"gzip"、"snappy"和"lz4"。对消息的压缩可以极大地减少网络传输量、降低网络 IO,从而提高整体性能。
  • compressed.topics

    默认值:null,在设置了压缩的情况下,可以指定特定的 topic 压缩,未指定则全部压缩。
  • message.send.max.retries

    默认值:3,消息发送最大尝试次数。
  • retry.backoff.ms

    默认值:300,每次尝试增加的额外的间隔时间。
  • topic.metadata.refresh.interval.ms

    默认值:600000,定期的获取元数据的时间。当分区丢失,leader 不可用时 producer 也会主动获取元数据,如果为 0,则每次发送完消息就获取元数据,不推荐。如果为负值,则只有在失败的情况下获取元数据。
  • queue.buffering.max.ms

    默认值:5000,在 producer queue 的缓存的数据最大时间,仅仅 for asyc。
  • queue.buffering.max.message

    默认值:10000,producer 缓存的消息的最大数量,仅仅 for asyc。
  • queue.enqueue.timeout.ms

    默认值:-1,0 当 queue 满时丢掉,负值是 queue 满时 block, 正值是 queue 满时 block 相应的时间,仅仅 for asyc。

Kafka Consumer

Kafka 有消费组的概念,每个消费者只能消费所分配到的分区的消息,每一个分区只能被一个消费组中的一个消费者所消费,所以同一个消费组中消费者的数量如果超过了分区的数量,将会出现有些消费者分配不到消费的分区。消费组与消费者关系如下图所示:

consumer group

Kafka Consumer Client 消费消息通常包含以下步骤:

  1. 配置客户端,创建消费者
  2. 订阅主题
  3. 拉去消息并消费
  4. 提交消费位移
  5. 关闭消费者实例

过程

因为 Kafka 的 Consumer 客户端是线程不安全的,为了保证线程安全,并提升消费性能,可以在 Consumer 端采用类似 Reactor 的线程模型来消费数据。

消费模型

Kafka consumer 参数

  • bootstrap.servers: 连接 broker 地址,host:port 格式。
  • group.id: 消费者隶属的消费组。
  • key.deserializer: 与生产者的key.serializer对应,key 的反序列化方式。
  • value.deserializer: 与生产者的value.serializer对应,value 的反序列化方式。
  • session.timeout.ms: coordinator 检测失败的时间。默认 10s 该参数是 Consumer Group 主动检测 (组内成员 comsummer) 崩溃的时间间隔,类似于心跳过期时间。
  • auto.offset.reset: 该属性指定了消费者在读取一个没有偏移量后者偏移量无效(消费者长时间失效当前的偏移量已经过时并且被删除了)的分区的情况下,应该作何处理,默认值是 latest,也就是从最新记录读取数据(消费者启动之后生成的记录),另一个值是 earliest,意思是在偏移量无效的情况下,消费者从起始位置开始读取数据。
  • enable.auto.commit: 否自动提交位移,如果为false,则需要在程序中手动提交位移。对于精确到一次的语义,最好手动提交位移
  • fetch.max.bytes: 单次拉取数据的最大字节数量
  • max.poll.records: 单次 poll 调用返回的最大消息数,如果处理逻辑很轻量,可以适当提高该值。 但是max.poll.records条数据需要在在 session.timeout.ms 这个时间内处理完 。默认值为 500
  • request.timeout.ms: 一次请求响应的最长等待时间。如果在超时时间内未得到响应,kafka 要么重发这条消息,要么超过重试次数的情况下直接置为失败。

Kafka Rebalance

rebalance 本质上是一种协议,规定了一个 consumer group 下的所有 consumer 如何达成一致来分配订阅 topic 的每个分区。比如某个 group 下有 20 个 consumer,它订阅了一个具有 100 个分区的 topic。正常情况下,Kafka 平均会为每个 consumer 分配 5 个分区。这个分配的过程就叫 rebalance。

什么时候 rebalance?

这也是经常被提及的一个问题。rebalance 的触发条件有三种:

  • 组成员发生变更(新 consumer 加入组、已有 consumer 主动离开组或已有 consumer 崩溃了——这两者的区别后面会谈到)
  • 订阅主题数发生变更
  • 订阅主题的分区数发生变更

如何进行组内分区分配?

Kafka 默认提供了两种分配策略:Range 和 Round-Robin。当然 Kafka 采用了可插拔式的分配策略,你可以创建自己的分配器以实现不同的分配策略。

答案关键字

  • Kafka 有哪些命令行工具?你用过哪些?/bin目录,管理 kafka 集群、管理 topic、生产和消费 kafka
  • Kafka Producer 的执行过程?拦截器,序列化器,分区器和累加器
  • Kafka Producer 有哪些常见配置?broker 配置,ack 配置,网络和发送参数,压缩参数,ack 参数
  • 如何让 Kafka 的消息有序?Kafka 在 Topic 级别本身是无序的,只有 partition 上才有序,所以为了保证处理顺序,可以自定义分区器,将需顺序处理的数据发送到同一个 partition
  • Producer 如何保证数据发送不丢失?ack 机制,重试机制
  • 如何提升 Producer 的性能?批量,异步,压缩
  • 如果同一 group 下 consumer 的数量大于 part 的数量,kafka 如何处理?多余的 Part 将处于无用状态,不消费数据
  • Kafka Consumer 是否是线程安全的?不安全,单线程消费,多线程处理
  • 讲一下你使用 Kafka Consumer 消费消息时的线程模型,为何如此设计?拉取和处理分离
  • Kafka Consumer 的常见配置?broker, 网络和拉取参数,心跳参数
  • Consumer 什么时候会被踢出集群?奔溃,网络异常,处理时间过长提交位移超时
  • 当有 Consumer 加入或退出时,Kafka 会作何反应?进行 Rebalance
  • 什么是 Rebalance,何时会发生 Rebalance?topic 变化,consumer 变化

高可用和性能

问题

  • Kafka 如何保证高可用?
  • Kafka 的交付语义?
  • Replic 的作用?
  • 什么事 AR,ISR?
  • Leader 和 Flower 是什么?
  • Kafka 中的 HW、LEO、LSO、LW 等分别代表什么?
  • Kafka 为保证优越的性能做了哪些处理?

分区与副本

分区副本

在分布式数据系统中,通常使用分区来提高系统的处理能力,通过副本来保证数据的高可用性。多分区意味着并发处理的能力,这多个副本中,只有一个是 leader,而其他的都是 follower 副本。仅有 leader 副本可以对外提供服务。 多个 follower 副本通常存放在和 leader 副本不同的 broker 中。通过这样的机制实现了高可用,当某台机器挂掉后,其他 follower 副本也能迅速”转正“,开始对外提供服务。

为什么 follower 副本不提供读服务?

这个问题本质上是对性能和一致性的取舍。试想一下,如果 follower 副本也对外提供服务那会怎么样呢?首先,性能是肯定会有所提升的。但同时,会出现一系列问题。类似数据库事务中的幻读,脏读。 比如你现在写入一条数据到 kafka 主题 a,消费者 b 从主题 a 消费数据,却发现消费不到,因为消费者 b 去读取的那个分区副本中,最新消息还没写入。而这个时候,另一个消费者 c 却可以消费到最新那条数据,因为它消费了 leader 副本。Kafka 通过 WH 和 Offset 的管理来决定 Consumer 可以消费哪些数据,已经当前写入的数据。

watermark

只有 Leader 可以对外提供读服务,那如何选举 Leader

kafka 会将与 leader 副本保持同步的副本放到 ISR 副本集合中。当然,leader 副本是一直存在于 ISR 副本集合中的,在某些特殊情况下,ISR 副本中甚至只有 leader 一个副本。 当 leader 挂掉时,kakfa 通过 zookeeper 感知到这一情况,在 ISR 副本中选取新的副本成为 leader,对外提供服务。 但这样还有一个问题,前面提到过,有可能 ISR 副本集合中,只有 leader,当 leader 副本挂掉后,ISR 集合就为空,这时候怎么办呢?这时候如果设置 unclean.leader.election.enable 参数为 true,那么 kafka 会在非同步,也就是不在 ISR 副本集合中的副本中,选取出副本成为 leader。

副本的存在就会出现副本同步问题

Kafka 在所有分配的副本 (AR) 中维护一个可用的副本列表 (ISR),Producer 向 Broker 发送消息时会根据ack配置来确定需要等待几个副本已经同步了消息才相应成功,Broker 内部会ReplicaManager服务来管理 flower 与 leader 之间的数据同步。

sync

性能优化

  • partition 并发
  • 顺序读写磁盘
  • page cache:按页读写
  • 预读:Kafka 会将将要消费的消息提前读入内存
  • 高性能序列化(二进制)
  • 内存映射
  • 无锁 offset 管理:提高并发能力
  • Java NIO 模型
  • 批量:批量读写
  • 压缩:消息压缩,存储压缩,减小网络和 IO 开销

Partition 并发

一方面,由于不同 Partition 可位于不同机器,因此可以充分利用集群优势,实现机器间的并行处理。另一方面,由于 Partition 在物理上对应一个文件夹,即使多个 Partition 位于同一个节点,也可通过配置让同一节点上的不同 Partition 置于不同的 disk drive 上,从而实现磁盘间的并行处理,充分发挥多磁盘的优势。

顺序读写

Kafka 每一个 partition 目录下的文件被平均切割成大小相等(默认一个文件是 500 兆,可以手动去设置)的数据文件,
每一个数据文件都被称为一个段(segment file), 每个 segment 都采用 append 的方式追加数据。

追加数据

答案关键字

  • Kafka 如何保证高可用?

    通过副本来保证数据的高可用,producer ack、重试、自动 Leader 选举,Consumer 自平衡
  • Kafka 的交付语义?

    交付语义一般有at least onceat most onceexactly once。kafka 通过 ack 的配置来实现前两种。
  • Replic 的作用?

    实现数据的高可用
  • 什么是 AR,ISR?

    AR:Assigned Replicas。AR 是主题被创建后,分区创建时被分配的副本集合,副本个 数由副本因子决定。
    ISR:In-Sync Replicas。Kafka 中特别重要的概念,指代的是 AR 中那些与 Leader 保 持同步的副本集合。在 AR 中的副本可能不在 ISR 中,但 Leader 副本天然就包含在 ISR 中。关于 ISR,还有一个常见的面试题目是如何判断副本是否应该属于 ISR。目前的判断 依据是:Follower 副本的 LEO 落后 Leader LEO 的时间,是否超过了 Broker 端参数 replica.lag.time.max.ms 值。如果超过了,副本就会被从 ISR 中移除。
  • Leader 和 Flower 是什么?
  • Kafka 中的 HW 代表什么?

    高水位值 (High watermark)。这是控制消费者可读取消息范围的重要字段。一 个普通消费者只能“看到”Leader 副本上介于 Log Start Offset 和 HW(不含)之间的 所有消息。水位以上的消息是对消费者不可见的。
  • Kafka 为保证优越的性能做了哪些处理?

    partition 并发、顺序读写磁盘、page cache 压缩、高性能序列化(二进制)、内存映射 无锁 offset 管理、Java NIO 模型

本文并没有深入 Kafka 的实现细节和源码分析,但 Kafka 确实是一个 优秀的开源系统,很多优雅的架构设计和源码设计都值得我们学习,十分建议感兴趣的同学更加深入的去了解一下这个开源系统,对于自身架构设计能力,编码能力,性能优化都会有很大的帮助。

推荐阅读

以下几篇文章阅读量与读者反馈都很好,推荐大家阅读:

MageByte

查看原文

迈莫coding 发布了文章 · 3月2日

go语言十分钟入门教程

在这里插入图片描述

导语|这是一篇go基本语法快速入门文章,学习该文章时,默认读者已安装成功Golang环境,若环境未安装成功,可自行百度。
原文地址:https://mp.weixin.qq.com/s/zvVzP0juPb4xk-GSuTNlOA

目录

  • 环境安装
  • 输出语句
  • Go语言关键字
  • 类型

    • 数据类型
    • 变量定义

      • var关键字定义
      • 简短模式
      • 多变量赋值
    • 常量
    • iota关键字
    • 运算符
  • 函数
  • 条件语句和循环语句

    • 条件语句
    • 循环语句
  • 数据

    • 数组
    • 字符串
    • 切片

      • 初始化slice
      • 示例
    • map字典
    • 结构体struct
  • 接口

    • 语法
    • 示例
  • 总结

环境安装

安装地址:[https://www.cnblogs.com/aaron...
](https://www.cnblogs.com/aaron...

输出语句

无论学那一门语言,首先先学该语言的输出语句。俗话说得好,输出"Hello, World!",代表你入门成功!!!

package main

import "fmt"

func main() {
  fmt.Println("Hello, World!")
}

接下来,一起学习go的基本语法,十分钟解决完战斗,走起!!!

Go语言关键字

首先先认识一下Go语言中关键字,心里有个印象,让初学者有个眼熟就行。记不住没关系,我会在下面语法反复提到。在这里之所以提出来,就是让你们看一下,看的看的就记住了。
在这里插入图片描述

类型

数据类型

在 Go 编程语言中,数据类型用于声明函数和变量。

数据类型的出现是为了把数据分成所需内存大小不同的数据,编程的时候需要用大数据的时候才需要申请大内存,就可以充分利用内存。

Go 语言按类别有以下几种数据类型:
在这里插入图片描述

变量定义

在数学概念中,变量表示没有固定值且可改变的数。但从计算机系统实现角度来看,变量是一段或多段用来存储数据的内存。

作为静态类型语言,go变量总是有固定的数据类型,类型决定了变量内存的长度和存储格式。我们只能修改变量值,无法改变类型。

var关键字定义

关键字var用于定义变量,和C不同,类型被放在变量后面。若显式提供初始值,可省略变量类型,由编译器推断。

var X int // 自动初始化为零
var y = false // 自动推断为bool的类型

可一次性定义多个变量,类型可相同也可不相同。

var x,y int
var a,b = 100, "abc"

简短模式

变量定义时,除var关键字外,还可使用更加简短的变量定义和初始化语法。

package main

import "fmt"

func main() {
  x := 10 // 使用 := 进行定义并初始化
  fmt.Println(x) // 输出语句 10
}

使用简短模式的一些限制:

  • 定义变量,同时显式初始化。
  • 不能提供数据类型。
  • 只能用在函数内部,不能用在全局变量中。

多变量赋值

进行多变量赋值操作时,首先计算出等号右边值,然后再依次完成赋值操作。

package main

import "fmt"

func main(){
  x, y := 10, 20
  x, y = y+3, x+2  // 先计算等号右边值,然后再对x、y变量赋值
  fmt.Println(x, y) // 输出语句  结果为:23 12
}

常量

常量表示运行时恒定不可改变的值,通常是一些字面量。使用常量就可用一个易于阅读理解的标识符号来代替"魔法数字",也使得在调整常量值时,无须修改所有引用代码。

常量值必须是编译期可确定的字符、字符串、数字或布尔值。可指定常量类型,或由编译器通过初始化推断。

在go语言中,使用关键字const来定义常量。

const x, y int = 10, 20
const a,b = "迈莫coding", "欢迎小伙伴"

示例:

package main

import "fmt"

const (
   a, b string = "迈莫coding", "欢迎小伙伴"
)

func main() {
   fmt.Println(a,b) // 迈莫coding 欢迎小伙伴
}

iota关键字

Go中没有明确意思上的enum(枚举)定义,不过可以借用iota标识符实现一组自增常量值来实现枚举类型。

const (
  a = iota // 0
  b        // 1
  c        // 2
)

变量a、b、c的值分别为0、1、2,原因是因为使用iota进行自增时,后续自增值按照序递增。通俗点是每新增一行,iota值加一。

若在中途中断iota自增,则必须显示恢复,如下所示:

const (
  a = iota // 0
  b        // 1
  c = 100  // 100
  d        // 100 (与上一行常量值表达式一致)
  e = iota // 4 (恢复iota自增,计数包括c、d)
  f        // 5
)

运算符

运算符使用方式和其他语言基本一样,在这里就不一一介绍了。

package main
import "fmt"
func main() {
   var a int = 21
   var b int = 10
   var c int
   c = a + b
   fmt.Println(c) // 31
   c = a - b
   fmt.Println(c) // 11
   c = a / b
   fmt.Println(c) // 2
   c = a % b
   fmt.Println(c) // 1
   a++
   fmt.Println(a) // 22
   a=21   // 为了方便测试,a 这里重新赋值为 21
   a--
   fmt.Println(a) // 20
}

函数

函数就是将复杂的算法过程分解为若干较小任务,进行拆分,易于维护。函数被设计成相对独立,通过接收输入参数完成一段算法指令,输出或存储相关结果。因此,函数还是代码复用和测试的基本单元。

关键字func用于定义函数。

package main

import "fmt"

// 定义 Write函数 返回值有两个,一个为name,一个age为
func Write() (name string, age int) {
   return "迈莫coding", 1
}

// 定义 Read函数
func Read(name string, age int) {
   fmt.Println(name, " 已经 ", age, " 岁了")
}

func main() {
   Read(Write()) // 迈莫coding  已经  1  岁了
}

条件语句和循环语句

条件语句

条件语句需要开发者通过指定一个或多个条件,并通过测试条件是否为 true 来决定是否执行指定语句,并在条件为 false 的情况在执行另外的语句。

下图展示了程序语言中条件语句的结构:

在这里插入图片描述

package main

import "fmt"

func main() {
  x := 3
  
  if x > 5 {
    fmt.Println("a")
  } else if x < 5 && x > 0 {
    fmt.Println("b")
  } else {
    fmt.Println("c")
  }
}

循环语句

在不少实际问题中有许多具有规律性的重复操作,因此在程序中就需要重复执行某些语句。

以下为大多编程语言循环程序的流程图:

在这里插入图片描述

package main

import "fmt"

func main() {
  for i := 0; i < 5; i++ {
    if i == 4 {
      continue
    } else if i == 5 {
      break
    }     
    fmt.Println(i)
  }
}

数据

数组

Go 语言提供了数组类型的数据结构。

数组是具有相同唯一类型的一组已编号且长度固定的数据项序列,这种类型可以是任意的原始类型例如整型、字符串或者自定义类型。

在这里插入图片描述

package main

import "fmt"

func main() {
  var arr1 [4]int // 元素自动初始化为零
  fmt.Println(arr1) // [0 0 0 0]
  
  arr2 := [4]int{1,2} // 其他未初始化的元素为零
  fmt.Println(arr2) // [1 2 0 0]
  
  arr3 := [4]int{5, 3:10} // 可指定索引位置初始化
  fmt.Println(arr3) // [5 0 0 10]
  
  arr4 := [...]int{1,2,3} // 编译器按照初始化值数量确定数组长度
  fmt.Println(arr4) // [1 2 3]
  
  t := len(arr4) // 内置函数len(数组名称)表示数组的长度
  fmt.Println(t) // 3
}

字符串

字符串默认值不是nil,而是""。

package main

import "fmt"

func main() {
  var str string
  str = "迈莫coding欢迎小伙伴"
  fmt.Println(str)
}

切片

切片(slice)本身不是动态数组或动态指针。只是它内部采用数组存储数据,当数组长度达到数组容量时,会进行动态扩容。

大白话就是切片功能和Java中的List集合类似,动态添加数据。不像数组(array)长度是固定的,需要事先知道数据长度。

初始化slice

x := make([]int, 1) // 通过make关键字进行slice初始化

示例

package main

import "fmt"

func main() {
    // 方式一
    a := make([]int,5) // 初始化长度为5的slice,默认值为零
    for i := 0; i <5; i++ {
       a = append(a, i)
    }
    a = append(a, 6)
    fmt.Println(a) // [0 0 0 0 0 0 1 2 3 4 6] 

    // 方式二    
    var a []int
    for i := 0; i < 5; i++ {
       a = append(a, i)
    }
    fmt.Println(a) // [0 1 2 3 4]
}

map字典

map字典也是使用频率比较高的数据结构。将其作为语言内置类型,从运行时层面进行优化,可获得更高效类型。

作为无序键值对集合,字典key值必须是支持相等运算符的数据类型,比如数字、字符串、指针、数组、结构体,以及对应接口类型。

map字典功能和Java中的map集合功能类似。

字典是应用类型,使用make函数或初始化表达语句来创建。

package main

import "fmt"

func main() {
   // 定义 变量strMap
   var strMap map[int]string
   // 进行初始化
   strMap = make(map[int]string)
   
   // 给map 赋值
   for i := 0; i < 5; i++ {
      strMap[i]  = "迈莫coding"
   }
   
   // 打印出map值
   for k, v := range strMap{
      fmt.Println(k, ":", v)
   }
  
  // 打印出map 长度
  fmt.Println(len(strMap))   
}

结构体struct

结构体(struct)将多个不同类型命名字段(field)序列打包成一个复合类型。

字段名必须唯一,可用"_"补位,支持使用自身指针类型成员。字段属性为基本数据类型。

学过Java就可以进行类比,结构体struct可以类比为Java中的类,结构体struct中字段属性可以类比为Java中类成员变量,结构体struct的方法可以类比为Java中类成员方法。

结构体(struct)语法如下:

type user struct {
  name string // 字段name 其数据类型为string
  age int // 字段age 其数据类型为int 
}

示例:

package main

import "fmt"

type user struct {
   name string
   age  int
}

// 结构体user Read方法
func (u *user) Read() string {
   return fmt.Sprintf("%s 已经 %d 岁了", u.name, u.age)
}

func main() {
   // 初始化
   u := &user{
      name: "迈莫coding",
      age:  1,
   }
   fmt.Println(u.name, "已经", u.age, "岁了")
   // 调用结构体user的 Read方法
   fmt.Println(u.Read()) // 迈莫coding 已经 1 岁了
}

接口

接口代表一个调用契约,是多个方法声明的集合。

接口解除了类型依赖,有助于减少用户可视方法,屏蔽内部结构和实现细节。在Go语言中,只要目标类型方法集内包含接口声明的全部方法,就被视为实现了该接口,无须做显示声明。当然,目标类型可实现多个接口。

大白话,接口是多个方法声明的集合,若一个struct类实现接口中所有方法,即表示该类实现了指定接口。

语法

type user interface{
}

示例

package main

import "fmt"

// 定义接口 包含公共方法
type user interface{
  talking()
}

// 定义一个struct类
type memo struct{
}

// 实现接口user中方法talking
func (m *memo) talking() {
  fmt.Println("迈莫coding欢迎您...")
}

func main() {
  mm := memo{}
  mm.talking()
}

总结

文章介绍了Go语言的基本语法,适合零小白查看,使其快速上手Go语言项目开发,但文章毕竟是快速入门,有许多没讲过的知识点,需读者自行学习,也可关注我,和我一起学习Go语言。

文章也会持续更新,可以微信搜索「 迈莫coding 」第一时间阅读。每天分享优质文章、大厂经验、大厂面经,助力面试,是每个程序员值得关注的平台。
查看原文

赞 0 收藏 0 评论 0

迈莫coding 发布了文章 · 2月26日

深度解析Golang sync.Once源码

目录

  • 什么是sync.Once
  • 如何使用sync.Once
  • 源码分析
文章始发于公众号【迈莫coding】https://mp.weixin.qq.com/s/b89PmljELaPaVuLw-YIQKg

什么是sync.Once

Once 可以用来执行且仅仅执行一次动作,常常用于单例对象的初始化场景。

Once 常常用来初始化单例资源,或者并发访问只需初始化一次的共享资源,或者在测试的时候初始化一次测试资源。

sync.Once 只暴露了一个方法 Do,你可以多次调用 Do 方法,但是只有第一次调用 Do 方法时 f 参数才会执行,这里的 f 是一个无参数无返回值的函数。

如何使用sync.Once

就拿我负责的一个项目来说,因为项目的配置是挂在第三方平台上,所以在项目启动时需要获取资源配置,因为需要一个方法来保证配置仅此只获取一次,因此,我们考虑使用 sync.Once 来获取资源。这样的话,可以防止在其他地方调用获取资源方法,该方法仅此执行一次。

下面我简单写个Demo来演示一个sync.Once如何使用

package main
import (
   "fmt"
   "sync"
)
var once sync.Once
var con string
func main() {
   once.Do(func() {
      con = "hello Test once.Do"
   })
   fmt.Println(con)
}

代码说明:

代码的话比较简单,就是通过调用Do方法,采用闭包方式,将字符串("hello Test once.Do")赋值给con,进而打印出值,这就是 sync.Once 的使用,比较容易上手。

但我们用一个方法或者框架时,如果不对其了如指掌,总有点不太靠谱,感觉心里不踏实。为此,我们来聊一聊 sync.Once 的源码实现,让他无处可遁。

源码分析

接下来分析 sync.Do 究竟是如何实现的,它存储在包sync下 once.go 文件中,源代码如下:

// sync/once.go

type Once struct {
   done uint32 // 初始值为0表示还未执行过,1表示已经执行过
   m    Mutex 
}
func (o *Once) Do(f func()) {
   // 判断done是否为0,若为0,表示未执行过,调用doSlow()方法初始化
   if atomic.LoadUint32(&o.done) == 0 {
      // Outlined slow-path to allow inlining of the fast-path.
      o.doSlow(f)
   }
}

// 加载资源
func (o *Once) doSlow(f func()) {
   o.m.Lock()
   defer o.m.Unlock()
   // 采用双重检测机制 加锁判断done是否为零
   if o.done == 0 {
      // 执行完f()函数后,将done值设置为1
      defer atomic.StoreUint32(&o.done, 1)
      // 执行传入的f()函数
      f()
   }
}

接下来会分为两大部分进行分析,第一部分为 Once 的结构体组成结构,第二部分为 Do 函数的实现原理,我会在代码上加上注释,保证用心阅读完都有收获。

结构体

type Once struct {
   done uint32 // 初始值为0表示还未执行过,1表示已经执行过
   m    Mutex 
}

首先定义一个struct结构体 Once ,里面存储两个成员变量,分别为 donem

done成员变量

  • 1表示资源未初始化,需要进一步初始化
  • 0表示资源已初始化,无需初始化,直接返回即可

m成员变量

  • 为了防止多个goroutine调用 doSlow() 初始化资源时,造成资源多次初始化,因此采用 Mutex 锁机制来保证有且仅初始化一次

Do

func (o *Once) Do(f func()) {
   // 判断done是否为0,若为0,表示未执行过,调用doSlow()方法初始化
   if atomic.LoadUint32(&o.done) == 0 {
      // Outlined slow-path to allow inlining of the fast-path.
      o.doSlow(f)
   }
}

// 加载资源
func (o *Once) doSlow(f func()) {
   o.m.Lock()
   defer o.m.Unlock()
   // 采用双重检测机制 加锁判断done是否为零
   if o.done == 0 {
      // 执行完f()函数后,将done值设置为1
      defer atomic.StoreUint32(&o.done, 1)
      // 执行传入的f()函数
      f()
   }

调用 Do 函数时,首先判断done值是否为0,若为1,表示传入的匿名函数 f() 已执行过,无需再次执行;若为0,表示传入的匿名函数 f() 还未执行过,则调用 doSlow() 函数进行初始化。

在 doSlow() 函数中,若并发的goroutine进入该函数中,为了保证仅有一个goroutine执行 f() 匿名函数。为此,需要加互斥锁保证只有一个goroutine进行初始化,同时采用了双检查的机制(double-checking),再次判断 o.done 是否为 0,如果为 0,则是第一次执行,执行完毕后,就将 o.done 设置为 1,然后释放锁。

即使此时有多个 goroutine 同时进入了 doSlow 方法,因为双检查的机制,后续的 goroutine 会看到 o.done 的值为 1,也不会再次执行 f。

这样既保证了并发的 goroutine 会等待 f 完成,而且还不会多次执行 f。

文章也会持续更新,可以微信搜索「 迈莫coding 」第一时间阅读。每天分享优质文章、大厂经验、大厂面经,助力面试,是每个程序员值得关注的平台。
查看原文

赞 0 收藏 0 评论 0

迈莫coding 发布了文章 · 2月26日

深度解析go context实现原理及其源码

目录

  • Context 基本使用方法
  • Context 使用场景
  • valueCtx

    • 使用示例
    • 结构体
    • WithValue
  • cancleCtx

    • 使用示例
    • 结构体
    • WitCancel
  • WithTimeout
  • WithDeadline

    • 使用示例
    • WithDeadline
  • 总结

Context 基本使用方法

首先,我们来看一下 Context 接口包含哪些方法,这些方法都是干什么用的。

包 context 定义了 Context 接口,Context 的具体实现包括 4 个方法,分别是Deadline、Done、Err 和 Value,如下所示:

type Context interface { 
  Deadline() (deadline time.Time, ok bool)
  Done() <-chan struct{} Err()
  error 
  Value(key interface{}) interface{}
}

Deadline 方法会返回这个 Context 被取消的截止日期。如果没有设置截止日期,ok 的值是 false。后续每次调用这个对象的 Deadline 方法时,都会返回和第一次调用相同的结果。

Done 方法返回一个 Channel 对象。在 Context 被取消时,此 Channel 会被 close,如果没被取消,可能会返回 nil。后续的 Done 调用总是返回相同的结果。当 Done 被 close 的时候,你可以通过 ctx.Err 获取错误信息。Done 这个方法名其实起得并不好,因为名字太过笼统,不能明确反映 Done 被 close 的原因,因为 cancel、timeout、deadline 都可能导致 Done 被 close,不过,目前还没有一个更合适的方法名称。

关于 Done 方法,你必须要记住的知识点就是:如果 Done 没有被 close,Err 方法返回 nil;如果 Done 被 close,Err 方法会返回 Done 被 close 的原因。

Context使用场景

  • 上下文信息传递 (request-scoped),比如处理 http 请求、在请求处理链路上传递信息
  • 控制子 goroutine 的运行
  • 超时控制的方法调用
  • 可以取消的方法调用

valueCtx

valueCtx 是基于 parent Context 生成一个新的 Context,保存了一个key-value键值对。它主要用来传递上下文信息。

使用示例

ctx := context.Background()
ctx = context.WithValue(ctx, "key1", "0001")
ctx = context.WithValue(ctx, "key2", "0001")
ctx = context.WithValue(ctx, "key3", "0001")
ctx = context.WithValue(ctx, "key4", "0004")
fmt.Println(ctx.Value("key1")) // 0001

查找过程如图所示:

在这里插入图片描述

结构体

type valueCtx struct {
   Context  // parent Context
   key, val interface{}  // key-value
}

func (c *valueCtx) Value(key interface{}) interface{} {
   // 若key值 等于 当前valueCtx存储的key值 
   // 则取出其value并返回
   if c.key == key {
      return c.val
   }
   // 否则递归调用valueCtx中Value方法,获取其parent Context中存储的key-value
   return c.Context.Value(key)
}

通过观察 valueCtx 结构体,它利用一个 Context 变量表示其父节点的 context ,这样 valueCtx 也继承了父节点的所有信息;并且它持有一个 key-value 键值对,说明它还可以携带额外的信息。它还覆盖了 Value 方法,优先从自己的存储中检查这个 key,不存在的话会从 parent 中继续检查。

WithValue

WithValue 就是向 context 中添加键值对:

func WithValue(parent Context, key, val interface{}) Context {
   if parent == nil {
      panic("cannot create context from nil parent")
   }
   if key == nil {
      panic("nil key")
   }
   if !reflectlite.TypeOf(key).Comparable() {
      panic("key is not comparable")
   }
   return &valueCtx{parent, key, val}
}

通过代码可以看出,向 context 中添加键值对并不是在原 context 基础上添加的,而是新建一个 valueCtx 子节点,将原 context 作为父节点。以此类推,就会形成一个 context 链。在查找过程中,如果当前 valueCtx 不存在key值,还会向 parent Context 去查找,如果 parent 还是 valueCtx 的话,还是遵循相同的原则:valueCtx 会嵌入 parent,所以还是会查找 parent 的 Value 方法的。

在这里插入图片描述

cancleCtx

在我们开发过程中,我们常常会遇到一些场景,需要主动取消长时间的任务或者中止任务,这个时候就可以使用cancelCtx。通过调用cancel函数就可中止goroutine,进而去释放所占用的资源。

需要注意的是,不是只有中途中止任务时才调用cancel函数,只要任务执行完毕后,就需要调用 cancel,这样,这个 Context 才能释放它的资源(通知它的 children 处理 cancel,从它的 parent 中把自己移除,甚至释放相关的 goroutine)。

使用示例

func main() {
  // gen 在单独的 goroutine 中生成整数 然后将它们发送到返回的管道
  gen := func(ctx context.Context) <-chan int {
     dst := make(chan int)
     n := 1
     go func() {
        for {
           select {
           case <-ctx.Done():
              return // returning not to leak the goroutine
           case dst <- n:
              n++
           }
        }
     }()
     return dst
  }
  ctx, cancel := context.WithCancel(context.Background())
  // 代码完毕后调用cancel函数释放goroutine所占用的资源
  defer cancel() // cancel when we are finished consuming integers
  // 遍历循环获取管道中的值
  for n := range gen(ctx) {
     fmt.Println(n)
     if n == 5 {
        break
     }
  }
}

创建一个 gen函数,在gen函数中创建一个goroutine,专门用来生成整数,然后将他们发送到返回的管道。通过 context.WithCancel 创建可取消的 context ,最后遍历循环获取管道中值,当n的值为5时,退出循环,结束进程。最后调用cancel函数释放goroutine所占用的资源。

结构体

type cancelCtx struct {
    Context
    mu       sync.Mutex            
    done     chan struct{}         
    children map[canceler]struct{} 
    err      error                 
}

cancelCtx和valueCtx类似,结构体中都有一个Context作为其父节点;变量done表示关闭信号传递;变量children表示当前节点所拥有的子节点,err用于存储错误信息表示任务结束的原因。

接下来,看看cancelCtx实现的方法:

type canceler interface {
    cancel(removeFromParent bool, err error)
    Done() <-chan struct{}
}

func (c *cancelCtx) Done() <-chan struct{} {
   c.mu.Lock()
   if c.done == nil {
      c.done = make(chan struct{})
   }
   d := c.done
   c.mu.Unlock()
   return d
}

func (c *cancelCtx) cancel(removeFromParent bool, err error) {
   if err == nil {
      panic("context: internal error: missing cancel error")
   }
   c.mu.Lock()
   if c.err != nil {
      c.mu.Unlock()
      return // already canceled
   }
   c.err = err
   // 设置一个关闭的channel或者将done channel关闭,用以发送关闭信号
   if c.done == nil {
      c.done = closedchan
   } else {
      close(c.done)
   }
   // 遍历循环将字节点context取消
   for child := range c.children {
      // NOTE: acquiring the child's lock while holding parent's lock.
      child.cancel(false, err)
   }
   c.children = nil
   c.mu.Unlock()
   if removeFromParent {
      // 将当前context节点从父节点上移除
      removeChild(c.Context, c)
   }
}

cancelCtx结构体实现Done和cancel方法,Done方法实现了将done初始化。cancel方法用于将当前节点从父节点上移除以及移除当前节点下的 所有子节点。

cancelCtx 被取消时,它的 Err 字段就是下面这个 Canceled 错误:

var Canceled = errors.New("context canceled")

WithCancel

WithCancel函数用来创建一个可取消的context,即cancelCtx类型的context。

WithCancel函数返回值有两个,一个为parent 的副本Context,另一个为触发取消操作的CancelFunc。

func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
   if parent == nil {
      panic("cannot create context from nil parent")
   }
   c := newCancelCtx(parent)
   propagateCancel(parent, &c) // 把c朝上传播
   return &c, func() { c.cancel(true, Canceled) }
}

// newCancelCtx returns an initialized cancelCtx.
func newCancelCtx(parent Context) cancelCtx {
   // 将parent作为父节点context生成一个新的子节点
   return cancelCtx{Context: parent}
}

func propagateCancel(parent Context, child canceler) {
   done := parent.Done()
   if done == nil {
      return // parent is never canceled
   }
   
   select {
   case <-done:
      // parent is already canceled
      child.cancel(false, parent.Err())
      return
   default:
   }
   
   // 获取最近的类型为cancelCtx的祖先节点
   if p, ok := parentCancelCtx(parent); ok {
      p.mu.Lock()
      if p.err != nil {
         // parent has already been canceled
         child.cancel(false, p.err)
      } else {
         if p.children == nil {
            p.children = make(map[canceler]struct{})
         }
         // 将当前子节点加入最近cancelCtx祖先节点的children中
         p.children[child] = struct{}{}
      }
      p.mu.Unlock()
   } else {
      atomic.AddInt32(&goroutines, +1)
      go func() {
         select {
         case <-parent.Done():
            child.cancel(false, parent.Err())
         case <-child.Done():
         }
      }()
   }
}

调用 WithCancel函数时,首先会调用 newCancelCtx函数创建一个以parent作为父节点的context。然后调用propagateCancel函数,用来建立当前context节点与parent节点之间的关系。

在propagateCancel函数中,如果parent节点为nil,说明parent以上的路径没有可取消的cancelCtx,则不需要处理。

否则通过parentCancelCtx函数过去当前节点最近的类型为cancelCtx的祖先节点,首先需要判断该祖先节点是否被取消,若已被取消就取消当前节点;否则将当前节点加入祖先节点的children列表中。

否则的话,则需要新起一个 goroutine,由它来监听 parent 的 Done 是否已关闭。一旦parent.Done()返回的channel关闭,即context链中某个祖先节点context被取消,则将当前context也取消。

WithTimeout

WithTimeout 其实是和 WithDeadline 一样,只不过一个参数是超时时间,一个参数是截止时间。超时时间加上当前时间,其实就是截止时间,因此,WithTimeout 的实现是:

func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) { 
  // 当前时间+timeout就是deadline
  return WithDeadline(parent, time.Now().Add(timeout))
}

WithDeadline

WithDeadline 会返回一个 parent 的副本,并且设置了一个不晚于参数 d 的截止时间,类型为 timerCtx(或者是 cancelCtx)。

如果它的截止时间晚于 parent 的截止时间,那么就以 parent 的截止时间为准,并返回一个类型为 cancelCtx 的 Context,因为 parent 的截止时间到了,就会取消这个 cancelCtx。

如果当前时间已经超过了截止时间,就直接返回一个已经被 cancel 的 timerCtx。否则就会启动一个定时器,到截止时间取消这个 timerCtx。

综合起来,timerCtx 的 Done 被 Close 掉,主要是由下面的某个事件触发的:

  • 截止时间到了
  • cancel 函数被调用
  • parent 的 Done 被 close

使用示例

func main() {
  d := time.Now().Add(time.Second * 3)
  ctx, cancel := context.WithDeadline(context.Background(), d)
  defer cancel()
  select {
  case <-time.After(3 * time.Second):
     fmt.Println("overslept")
  case <-ctx.Done():
     fmt.Println(ctx.Err())
  }
}

WithDeadline

func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
   if parent == nil {
      panic("cannot create context from nil parent")
   }
   // 如果parent的截止时间更早,直接返回一个cancelCtx即可
   if cur, ok := parent.Deadline(); ok && cur.Before(d) {
      return WithCancel(parent)
   }
   c := &timerCtx{
      cancelCtx: newCancelCtx(parent),
      deadline:  d,
   }
   // 建立新建context与可取消context祖先节点的取消关联关系
   propagateCancel(parent, c)
   dur := time.Until(d)
   if dur <= 0 { //当前时间已经超过了截止时间,直接cancel
      c.cancel(true, DeadlineExceeded) 
      return c, func() { c.cancel(false, Canceled) }
   }
   c.mu.Lock()
   defer c.mu.Unlock()
   if c.err == nil {
      // 设置一个定时器,到截止时间后取消
      c.timer = time.AfterFunc(dur, func() {
         c.cancel(true, DeadlineExceeded)
      })
   }
   return c, func() { c.cancel(true, Canceled) }
}

调用 WithDeadline函数,首先判断parent的截止时间是否早于当前timerCtx,若为true的话,直接返回一个cancelCtx即可。否则需要调用propagateCancel函数建议新建context与可取消context祖先节点的取消关联关系,建立关联关系之后,若当前时间已经超过截止时间后,直接cancel。否则的话,需设置一个定时器,到截止时间后取消。

总结

context主要用于父子任务之间的同步取消信号,本质上是一种协程调度的方式。另外在使用context时有两点值得注意:上游任务仅仅使用context通知下游任务不再需要,但不会直接干涉和中断下游任务的执行,由下游任务自行决定后续的处理操作,也就是说context的取消操作是无侵入的;context是线程安全的,因为context本身是不可变的(immutable),因此可以放心地在多个协程中传递使用。

到这里,Context 的源码已解读完毕,希望对您有收获,咱们下期再见。

文章也会持续更新,可以微信搜索「 迈莫coding 」第一时间阅读。每天分享优质文章、大厂经验、大厂面经,助力面试,是每个程序员值得关注的平台。
查看原文

赞 0 收藏 0 评论 0

迈莫coding 发布了文章 · 2月23日

深度解析sync WaitGroup源码及其实现原理

目录

  • WaitGroup介绍
  • WaitGroup的实现

    • Add
    • Done
    • Wait

WaitGroup介绍

waitGroup ,也是在go语言并发中比较常用的语法,所以在这里我们一起剖析 waitGroup 的使用方式及其源码解读。

WaitGroup 也是sync 包下一份子,用来解决任务编排的一个并发原语。它主要解决了并发-等待问题:比如现在有三个goroutine,分别为goroutineAgoroutineBgoroutineC,而goroutineA需要等待goroutineBgoroutineC这一组goroutine全部执行完毕后,才可以执行后续业务逻辑。此时就可以使用 WaitGroup 轻松解决。

在这个场景中,goroutineA为主goroutine,goroutineBgoroutineC为子goroutine。goroutineA则需要在检查点(checkout point) 等待goroutineBgoroutineC全部执行完毕,如果在执行任务的goroutine还没全部完成,那么goroutineA就会阻塞在检查点,直到所有goroutine都完成后才能继续执行。

代码实现:

package main

import (
  "fmt"
  "sync"
)

func goroutineB(wg *sync.WaitGroup) {
  defer wg.Done()
  fmt.Println("goroutineB Execute")
  time.Sleep(time.Second)
}

func goroutineC(wg *sync.WaitGroup) {
  defer wg.Done()
  fmt.Println("goroutineC Execute")
  time.Sleep(time.Second)
}

func main() {
  var wg sync.WaitGroup
  wg.Add(2)
  go goroutineB(&wg)
  go goroutineC(&wg)
  wg.Wait()
  fmt.Println("goroutineB and goroutineC finished...")
}

运行结果:

goroutineC Execute
goroutineB Execute
goroutineB and goroutineC finished...

上述就是WaitGroup 的简单操作,它的语法也是比较简单,提供了三个方法,如下所示:

func (wg *WaitGroup) Add(delta int)
func (wg *WaitGroup) Done()
func (wg *WaitGroup) Wait()
  • Add:用来设置WaitGroup的计数值(子goroutine的数量)
  • Done:用来将WaitGroup的计数值减1,起始就是调用Add(-1)
  • Wait:调用这个方法的goroutine会一直阻塞,直到WaitGroup的技术值变为0

接下来,我们进行剖析 WaitGroup 的源码实现,让其无处可遁,它源码比较少,除去注释,也就几十行,对新手来说也是一种不错的选择。

WaitGroup的实现

首先,我们看看 WaitGroup 的数据结构,它包括了一个noCopy 的辅助字段,一个具有复合意义的state1字段。

  • noCopy 的辅助字段:主要就是辅助 vet 工具检查是否通过 copy 赋值这个 WaitGroup 实例。我会在后面和你详细分析这个字段
  • state1:具有复合意义的字段,包含WaitGroup计数值,阻塞在检查点的主gooutine和信号量
type WaitGroup struct {
    // 避免复制使用的一个技巧,可以告诉vet工具违反了复制使用的规则
    noCopy noCopy
    // 64bit(8bytes)的值分成两段,高32bit是计数值,低32bit是waiter的计数
    // 另外32bit是用作信号量的
    // 因为64bit值的原子操作需要64bit对齐,但是32bit编译器不支持,所以数组中的元素在不同的架构中不一样,具体处理看下面的方法
    // 总之,会找到对齐的那64bit作为state,其余的32bit做信号量
    state1 [3]uint32
}


// 得到state的地址和信号量的地址
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
    if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
        // 如果地址是64bit对齐的,数组前两个元素做state,后一个元素做信号量
        return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
    } else {
        // 如果地址是32bit对齐的,数组后两个元素用来做state,它可以用来做64bit的原子操作,第一个元素32bit用来做信号量
        return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
    }
}    

因为对 64 位整数的原子操作要求整数的地址是 64 位对齐的,所以针对 64 位和 32 位环境的 state 字段的组成是不一样的。

在 64 位环境下,state1 的第一个元素是 waiter 数,第二个元素是 WaitGroup 的计数值,第三个元素是信号量。

在这里插入图片描述

在 32 位环境下,如果 state1 不是 64 位对齐的地址,那么 state1 的第一个元素是信号量,后两个元素分别是 waiter 数和计数值。

在这里插入图片描述

接下里,我们一一看 Add 方法、 Done 方法、 Wait 方法的实现原理。

Add

Add方法实现思路:

Add方法主要操作的state1字段中计数值部分。当Add方法被调用时,首先会将delta参数值左移32位(计数值在高32位),然后内部通过原子操作将这个值加到计数值上。需要注意的是,delta的取值范围可正可负,因为调用Done()方法时,内部通过Add(-1)方法实现的。

代码实现如下:

func (wg *WaitGroup) Add(delta int) {
  // statep表示wait数和计数值
  // 低32位表示wait数,高32位表示计数值
   statep, semap := wg.state()
   // uint64(delta)<<32 将delta左移32位
    // 因为高32位表示计数值,所以将delta左移32,增加到技术上
   state := atomic.AddUint64(statep, uint64(delta)<<32)
   // 当前计数值
   v := int32(state >> 32)
   // 阻塞在检查点的wait数
   w := uint32(state)
   if v > 0 || w == 0 {
      return
   }
   
   // 如果计数值v为0并且waiter的数量w不为0,那么state的值就是waiter的数量
    // 将waiter的数量设置为0,因为计数值v也是0,所以它们俩的组合*statep直接设置为0即可。此时需要并唤醒所有的waiter
   *statep = 0
   for ; w != 0; w-- {
      runtime_Semrelease(semap, false, 0)
   }
}

Done

内部就是调用Add(-1)方法,这里就不细讲了。

// Done方法实际就是计数器减1
func (wg *WaitGroup) Done() { 
  wg.Add(-1)
}

Wait

wait实现思路:

不断检查state值。如果其中的计数值为零,则说明所有的子goroutine已全部执行完毕,调用者不必等待,直接返回。如果计数值大于零,说明此时还有任务没有完成,那么调用者变成等待者,需要加入wait队列,并且阻塞自己。

代码实现如下:

func (wg *WaitGroup) Wait() {
   // statep表示wait数和计数值
   // 低32位表示wait数,高32位表示计数值
   statep, semap := wg.state()
   for {
      state := atomic.LoadUint64(statep)
      // 将state右移32位,表示当前计数值
      v := int32(state >> 32)
      // w表示waiter等待值
      w := uint32(state)
      if v == 0 {
         // 如果当前计数值为零,表示当前子goroutine已全部执行完毕,则直接返回
         return
      }
      // 否则使用原子操作将state值加一。
      if atomic.CompareAndSwapUint64(statep, state, state+1) {
         // 阻塞休眠等待
         runtime_Semacquire(semap)
         // 被唤醒,不再阻塞,返回
         return
      }
   }
}

到此,waitGroup的基本使用和实现原理已介绍完毕了,相信大家已有不一样的收获,咱们下期见。

文章也会持续更新,可以微信搜索「 迈莫coding 」第一时间阅读。每天分享优质文章、大厂经验、大厂面经,助力面试,是每个程序员值得关注的平台。

在这里插入图片描述

查看原文

赞 0 收藏 0 评论 0

迈莫coding 发布了文章 · 2月20日

腾讯一面问我SQL语句中where条件为什么写上1=1

在项目编写中,经常会在代码中使用到“where 1=1”,这是为什么呢?

目录

  • where后面加"1=1"还是不加
  • 不用where 1=1 在多条件查询的困惑
  • 使用where 1=1 的好处
  • 使用where 1=1 的坏处

where后面加"1=1"还是不加

比如现在有一个场景,DB数据库中有一张博客表(blog),想把blog表中的所有记录查询出来,那么可以有两种方式操作。一种写法是where关键词什么也不加,另一种写法是where关键词后面加"1=1",写法如下:

  • where关键词什么也不加
select * from blog;
  • where关键词后面加 "1=1"
select * from blog where 1 = 1;

这两种SQL语句查询所得到的结果完全没有区别。那为什么要在where关键字后面添加"1=1"呢?

我们知道1=1表示true,即永真。如果使用不恰当会造T0级错误。例如在编写SQL语句时进行where条件查询时配合or运算符会得到意向不到的结果,结果会让你哆嗦。不信,往下看:

例如,当我们要删除博客ID称为“202102111501”的记录,我们可以这样写:

delete from blog where blogId = "202102111501"

如果这个时候如果在where语句后面加上 or 1=1会是什么后果?

delete from blog where blogId = "202102111501" or 1 = 1

本来只要博客ID称为“202102111501”的记录,结果因为添加了or 1=1的永真条件,会导致整张表里的记录都被删除了。那你可就闯祸了。

不用where 1=1 在多条件查询的困扰

举个例子,如果你想查看当前博客中某条评论记录时,那么按照平时的查询语句的 动态构造,代码大体如下:

String sql="select * from blog where";
if ( condition 1) {
  sql = sql + " blogID = 202102111501";
}
if (condition 2) {
  sql = sql + " and commentID = 150101";
}

如果上述的两个if判断语句均为true时,那么最终的动态SQL语句为:

select * from table_name where blogID = 202102111501 and commentID = 150101;

可以看出来这是一条完整的正确的SQL查询语句,能够正确执行。

如果上述的两个if判断语句均为false时,那么最终的动态SQL语句为:

select * from table_name where;

此时我们看看这条生成的SQL语句,由于where关键字后面需要使用条件,但是这条语句根本不存在,所以该语句就是一条错误语句,不能被执行,不仅报错,同时还查不到任何数据。

使用where 1=1 的好处

如果我们在where条件后加上1=1,看看它的真面目:

String sql="select * from blog where 1=1";
if ( condition 1) {
  sql = sql + " and blogID = 202102111501";
}
if (condition 2) {
  sql = sql + " and commentID = 150101";
}

当condition 1和condition 2都为真时,上面被执行的SQL代码为:

select * from blog where 1=1 and blogID = "202102111501" and commentID = 150101;

可以看出来这是一条完整的正确的SQL查询语句,能够正确执行。

如果上述的两个if判断语句均为false时,那么最终的动态SQL语句为:

select * from table_name where 1=1;

现在,我们来看这条语句,由于where 1=1 是为True的语句,因此,该条语句语法正确,能够被正确执行,它的作用相当于:sql="select * from table",即返回表中所有数据。

当在where关键字后面添加1=1时,如果此时查询时不选择任何字段时,那么必将返回表中的所有数据。如果是按照某个字段进行单条查询时,那么就会此时的条件进行查询。

说到这里,不知道您是否已明白,其实,where 1=1的应用,不是什么高级的应用,也不是所谓的智能化的构造,仅仅只是为了满足多条件查询页面中不确定的各种因素而采用的一种构造一条正确能运行的动态SQL语句的一种方法。

使用where 1=1 的坏处

我们在写SQL时,加上了1=1后虽然可以保证语法不会出错!

select * from table_name where 1=1;

但是因为table中根本就没有名称为1的字段,该SQL其实等效于select * from table,这个SQL语句很明显是全表扫描,需要大量的IO操作,数据量越大越慢。

所以在查询时,where1=1的后面需要增加其它条件,并且给这些条件建立适当的索引,效率就会大大提高。

文章也会持续更新,可以微信搜索「 迈莫coding 」第一时间阅读。每天分享优质文章、大厂经验、大厂面经,助力面试,是每个程序员值得关注的平台。
查看原文

赞 3 收藏 0 评论 0

认证与成就

  • 获得 4 次点赞
  • 获得 1 枚徽章 获得 0 枚金徽章, 获得 0 枚银徽章, 获得 1 枚铜徽章

擅长技能
编辑

开源项目 & 著作
编辑

  • torm

    对象-关系映射(Object-Relational Mapping,简称ORM),面向对象的开发方法是当今企业级应用开发环境中的主流开发方法,关系数据库是企业级应用环境中永久存放数据的主流数据存储系统。

  • 博客系统

    适合学生搭建的个人博客

注册于 2月2日
个人主页被 933 人浏览