骑牛上青山

骑牛上青山 查看完整档案

填写现居城市  |  填写毕业院校  |  填写所在公司/组织填写个人主网站
编辑
_ | |__ _ _ __ _ | '_ \| | | |/ _` | | |_) | |_| | (_| | |_.__/ \__,_|\__, | |___/ 该用户太懒什么也没留下

个人动态

骑牛上青山 发布了文章 · 11月16日

分布式系统中的哈希算法

哈希

Hash也称散列、哈希,原理是把任意长度的字符串当作输入,然后通过Hash算法变成固定长度输出。Hash是一个映射的过程,因此是一定会产生冲突的,一般使用链地址法,开放寻址法等方法来解决hash冲突。

分布式下的哈希

在分布式的情景下,为了解决数据和请求的定向问题,我们也会常常使用到哈希算法。接下来,就会介绍几种常常在分布式环境下运用的hash算法。

普通哈希

哪怕是在分布式的环境下,我们也可以使用最简单的hash算法,通过设定好每台服务器对应的结果值,在请求或者数据进来时进行计算,将数据分别映射到相应的服务器上,由于计算规则是一致的,因此无论进行多少次的计算,数据的映射是不会进行变化的。

这种普通的哈希的方式优缺点分明,优点是实现简单,清晰明了。缺点是由于分布式系统中的节点充满了不确定性,可能会缩容或者扩容或者节点宕机,如果在这些情况下,意味着哈希的映射将会发生变化,同时之前的那些映射的数据需要进行迁移,以便之后能够正确的访问。而这种方式的哈希在这种情况下产生的数据迁移量将会是非常巨大的。

image

上图是一个普通的分布式系统的哈希映射关系,3台服务器分别接受哈希值为0,1,2的请求(一般为计算%2的值)。

image

于是当我们新增一台服务器之后,原本3台服务器变成了4台,哈希映射需要随之修改,大量的数据需要迁移。

一致性哈希

一致性哈希是为了解决之前说到的哈希造成的大量数据迁移的问题。

一致性哈希和普通哈希相比,同样是有一定的映射关系的,但是不同的是,我们会在开始创建一个哈希环,在环上分布着大量的节点值,一般的范围为0 ~ 2^32-1

image

之后我们会根据一定的规则,将服务器节点落在环上,如下图

image

之后的逻辑就比较简单了,当请求发往服务器后,经过计算找到其在环上的对应位置,然后检查该位置上是否有对应服务器节点,如果有就将请求转发过去,如果没有就沿着哈希环顺时针寻找,直到找到节点位置。

这样设计的好处是显而易见的,如果我们需要新增或者删除节点的时候,每次只会影响至多2个节点的数据,相比较之前的普通哈希,消耗显然是更少的。并且当某些服务器因故障突然宕机的时候,请求也可以顺延到下个节点进行处理。

节点分布不均问题

一致性哈希的特点决定了如果节点分布的不够均匀会导致其中部分节点压力过大,而部分节点有很多资源的空闲。如下图

image

图中的A,B节点显然是不均衡的,请求会更多地发往A节点而B节点只能获取A节点约1/3的请求。

于是一致性哈希往往会引入这么一个概念:虚拟节点。尽管我们的服务器分布不够均匀,但是我们可以认为的创建一些虚拟节点,并且创建相应的映射,帮助虚拟节点把请求转发到实际节点。

image

如图,我们可以创建对应的虚拟节点A',B',然后把发往B'的请求转发到B,A'的请求转发到A,这样就不会存在失衡的问题了。

哈希槽

哈希槽的典型是redis的分布式实现。

redis的分布式实现中,会在启动集群的时候确认所有的服务器数量,然后将数量为16384的哈希槽平均分配给所有的master服务器,然后所有的数据都会存放在指定的节点之中。

redis的哈希槽的实现和一致性哈希有相同之处,也有不同之处。最主要的原因是redis采用了不同的高可用策略。一致性哈希在服务器宕机时会把流量转到下一个服务器,但是redis不同,redis的集群模式会保证服务器节点拥有的主备模式。备份节点不会直接参与到哈希槽的分配中,但是当主节点宕机后,从节点会顶替主节点处理任务。

分布式哈希表

分布式哈希表(DHT)是一种分布式的哈希手段。和一致性哈希不同的是,DHT不需要中心节点来分配数据的流向。他有自己的一套机制保证无论数据刚开始走的是哪一个服务器,都可以找到自己需要前往的正确服务器。

Kademlia算法

Kademlia算法是一种典型的分布式的哈希表算法,多用于p2p网络的构建,由Petar MaymounkovDavid Mazieres共同创造。Kademlia论文源地址:Kademlia

分布式环境下的哈希表的难点在于以下几点:

  1. 分布式环境下每个服务器不可能掌握所有服务器的情况,因此如何保证你的请求能在没有中央节点定位的情况下找到对应的服务器是一大难点。
  2. 同样由于分布式环境的服务器的掌握信息有限,那么服务器的加入和退出如何能够被集群知晓也是一大难点。

那么我们来看Kademlia算法是如何解决这些问题的吧。

异或运算

Kademlia使用到了异或来进行距离的计算。我们先来看看异或的定义。

异或的运算法则为:0⊕0=0,1⊕0=1,0⊕1=1,1⊕1=0(同为0,异为1)

然后我们来看看为什么用异或来计算距离。

a⊕b = b⊕a // 异或符合交换律,a节点到b节点的距离和b节点到a结点的距离相同
a⊕b = 0    // 自己和自己的距离为0
a⊕b >= 0   // 两个节点之间的距离大于0
a⊕b + b⊕c >= a⊕c // a到b再到c的距离大于等于直接到c的距离

根据上述的一些异或的规则,我们可以发现异或和距离的一些特性可以说是绝配,真的很佩服算法的作者能够想到如此精巧的设计。

二叉树的构建

确定了用异或来计算距离后,那么具体集群是如何构建并存储信息使得可以查找到正确的信息呢?

Kademlia算法理论中每个集群的节点都会存储一部分节点的信息(不可能存储所有节点的信息,因为效率会低,并且无法保证实时性)。

所有的节点会构建成一棵独特的二叉树如下图:

image

首先把每个节点的id经过一定的哈希计算得到该节点的一串01字符串以表示其在树中的位置,从高位开始,1则往左子树走,0往右子树走,直到结束。可以看出图中的黑色节点的哈希值为0011

二叉树的拆分

Kademlia的二叉树中的每一个节点都可以根据自己的视角进行二叉树的拆分

拆分规则是从根节点开始依次把不包含自己的子树拆分出来,以此类推,最后只剩下自己。之前的二叉树拆分如之前的图。对于黑色节点来说,外部有拆分出了四个不包含自己的子树。

K-bucket机制

在二叉树拆分之后每一个拆分过后的子树实际上对应的就是一个一个bucket,每个bucket对于当前节点的距离是不同的范围,距离越远,高位不同,因此异或结果差距越大(距离越远):

K-bucket距离区间
0[2^0, 2^1)
1[2^1, 2^2)
2[2^2, 2^3)
3[2^3, 2^4)
4[2^4, 2^5)

所以实际上每一个节点再进行拆分后只需要在对应的每个bucket中存储一份该bucket的节点就可以遍历整个二叉树(集群)。当然为了容错,一般来说每个bucket的节点都会保留几个,而不仅仅是一个。

节点查询

大致了解了原理之后,我们回过头来看每次请求是如何定位节点的。

首先一个请求进入集群中的某个服务器。然后我们将请求带着的目的地服务器的id和当前服务器的id计算两者的距离。然后计算出了一个值,之后从服务器的bucket列表中寻找对应的bucket(即这个距离范围对应的bucket)。我们的目标服务器就可以锁定在了那个bucket的范围之内,之后,在bucket中寻找距离该节点最近的K个服务节点(此参数可以自行设定大小),将请求重定向到这几个节点。之后重复上述的步骤,如果该集群中真的有目标节点,那么就可以成功的返回。

基本的机制如此,当然在实际的环境中我们考虑到现实情况会对请求做超时处理,避免大量的节点间的查询造成不必要的负载。

节点变动

一个新节点是想加入网络,首先有一个前提条件:他需要有一个处于网络中的节点的信息,然后才能开启加入流程。

加入流程:

  1. 新节点A以之前就有的节点T为起点,将其加入自己的K-bucket中,并且生成一个自己的节点id
  2. 节点A项节点B进行请求,以自己的id为参数请求节点定位自己的位置
  3. 之后就是查询结点的流程了,每一个路经的节点都会找到自己节点中存储的距离节点最近的节点,然后A把这些节点放入自己的bucket中以完善自己的路由表。同时,这些路经节点也会把A节点放入自己的路由表中,以待后用。
  4. 等到大部分节点返回后,A的路由表建立完成,一些节点也已经将A节点加入自己的路由表。至此A节点加入网络成功。

算法的参数

算法中我们用到的一些参数其实是可以自己定义的:

  1. k-bucket中的k:定义了每一层bucket会存储k个节点信息
  2. 每一次请求会向s个节点发送信息
  3. id的长度也是可以自定义的,注意id的长度会决定二叉树的高度

总结

分布式系统中的哈希算法有很多种,实现不同,功能也不尽相同。对于一般的企业应用中,带有中心节点的哈希算法是更为理想的选择,因为意味着服务的可控可监测。而在类似于p2p和区块链的环境下,具备中心节点的分布式哈希算法是没法接受的,因为p2p和区块链的设计上是没有中心节点的,也不会有节点能够知道所有网络中的节点信息,因此无中心节点的哈希算法在此可以大放异彩。

查看原文

赞 0 收藏 0 评论 0

骑牛上青山 发布了文章 · 10月26日

一次zuul版本升级产生的问题排查记录

起因

事情的起因是由于早期的一些服务版本放到现在太低了,基本上都是SpringBoot1.5.x,因此准备统一对服务进行一次版本升级,升级到2.1.xSpringCloud`版本升级到Greenwich`。当然我们用的旧版本的zuul相关的都需要升级。

意外的Bug

我们网关使用的是zuul,使用的是spring-cloud-netflix封装的包,此次版本升级同步升级了相关的包。但是意外的情况发生了,在测试环境上我们发现上传文件会出现异常。具体表现是这样的:当上传的文件超出一定大小后,在经过zuul网关并向其他服务转发的时候,之前上传的包就不见了。这个情况十分奇怪,因此马上开始排查。

Bug的排查

出现这样的问题,第一反应是测试是不是根本没有上传包所以当然包没法转发到下一层,当然这种想法很快被否定了。好吧,那就认真的排查吧。

首先先去追踪了一下路由以及出现的具体日志,将问题定位到zuul服务,排除了上游nginx和下游业务服务出现问题的可能。但是zuul服务没有任何异常日志出现,所以非常困扰。检查过后发现文件确实有通过zuul,但是之后凭空消失没有留下一点痕迹。

明明当初考虑上传文件的问题给zuul分配了两个g的内存,怎么上传500m的文件就出问题了呢?不对!此时我灵光一闪,会不会和垃圾回收机制有关。我们的文件是非常大的,这样的大文件生成的大对象是会保存在java的堆上的,并且由于垃圾回收的机制,这样的对象不会经历年轻代,会直接分配到老年代,会不会是由于我们内存参数设置不合理导致老年代太小而放不下呢?想到做到,我们通过调整jvm参数,保证了老年代至少有一个G的空间,并且同步检测了java的堆内存的状态。然而让人失望的是居然没有奏效。不过此时事情和开始不同,我们有了线索。在刚才的堆的内存监控中发现了一些异常,随即合理怀疑是堆中内存不够导致了oom。随后加大内存尝试并且再次运行,发现居然上传成功了。果然是老年代内存不足导致的oom,不过虽然上传成功,但是老年代中的内存居然被占用了1.6G左右,明明是500M的文件,为什么会占用了这么大的内存呢?

虽然找到了原因,但是增加内存显然不是解决问题的方法,因此,我们在启动参数上新增了-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/data准备查看oom的具体分析日志。

OOM信息

查看堆栈信息可以发现,溢出是发生在byte数组的拷贝上,我们迅速定位代码,可以找到如下的代码:

    public InputStream getRequestEntity() {
        if (requestEntity == null) {
            return null;
        }
        
        if (!retryable) {
            return requestEntity;
        }

        try {
            if (!(requestEntity instanceof ResettableServletInputStreamWrapper)) {
                requestEntity = new ResettableServletInputStreamWrapper(
                        StreamUtils.copyToByteArray(requestEntity));
            }
            requestEntity.reset();
        }
        finally {
            return requestEntity;
        }
    }

这段代码源自RibbonCommandContext是在zuul中进行请求转发的时候调用到的,具体的OOM是发生在调用StreamUtils.copyToByteArray(requestEntity));的时候。继续进入方法查找源头。最终经过排查找到了溢出的源头。ribbon转发中的用到了ByteArrayOutputStream的拷贝,代码如下:

    public synchronized void write(byte b[], int off, int len) {
        if ((off < 0) || (off > b.length) || (len < 0) ||
            ((off + len) - b.length > 0)) {
            throw new IndexOutOfBoundsException();
        }
        ensureCapacity(count + len);
        System.arraycopy(b, off, buf, count, len);
        count += len;
    }

可以看到这边有一个ensureCapacity,查看源码:

    private void ensureCapacity(int minCapacity) {
        // overflow-conscious code
        if (minCapacity - buf.length > 0)
            grow(minCapacity);
    }

    private void grow(int minCapacity) {
        // overflow-conscious code
        int oldCapacity = buf.length;
        int newCapacity = oldCapacity << 1;
        if (newCapacity - minCapacity < 0)
            newCapacity = minCapacity;
        if (newCapacity - MAX_ARRAY_SIZE > 0)
            newCapacity = hugeCapacity(minCapacity);
        buf = Arrays.copyOf(buf, newCapacity);
    }

可以看到ensureCapacity做了一件事,就是当流拷贝的时候byte数组的大小不够了,那就调用grow进行扩容,而grow的扩容和ArrayList不同,他的扩容是每一次将数组扩大两倍。

至此溢出的原因就很清楚了,500m文件占用1.6g是因为刚好触发扩容,导致用了多一倍的空间来容纳拷贝的文件,再加上源文件,所以占用了文件的3倍空间。

解决方案

至于解决方案,调整内存占用或者是老年代的占比显然不是合理的解决方案。我们再回头查看源代码,可以看到这个部分

    if (!retryable) {
        return requestEntity;
    }

如果设置的不重试的话,那么body中的信息就不会被保存。所以,我们决定临时先去除上传文件涉及到的服务的重试,之后再修改上传机制,在以后的上传文件时绕过zuul。

追根溯源

虽然找到的原因,并且也有了解决方案,但是我们仍然不知道为什么旧版本是ok的,因此本着追根究底的态度,找到了旧版的zuul的源码。

新版的ribbon代码集成spring-cloud-netflix-ribbon,而旧版的ribbon的代码集成在spring-cloud-netflix-core中,所以稍稍花费点时间才找到对应的代码,检查不同,发现旧版的getRequestEntity没有任何的处理,直接返回了requestEntity

    public InputStream getRequestEntity() {
        return requestEntity;
    }

而在之后的版本中马上就加上了拷贝机制。于是我们去github上找到了当初的那个commit

之后我们顺着commit中给出的信息找到了最初的issue

查看过issue之后发现这原来是旧版的一个bug,这个bug会导致旧版的post请求在retry的时候有body丢失的情况,因此在新版本中进行了修复,当请求为post的时候会对于body进行缓存以便于重试。

至此,我们原原本本的复原了这个bug的全貌以及形成的历史和原因。并且找到适当的解决方案。最后提一句:真的不要用zuul来上传大文件,真的会很糟糕!

查看原文

赞 0 收藏 0 评论 0

骑牛上青山 收藏了文章 · 6月22日

面试官:你回去等通知吧!

这是why技术的第37篇原创文章

老规矩,先聊聊生活,上面这张图片是我周一拍的。

周一晚上下班后发现公司楼下推着三轮车卖花的阿姨又开始买花了。整个路口只有她一个人在做生意,整条路上也没有几个人,大家都低着头匆匆走着,繁花中带着点忧伤。

于是,我去买了一把白玫瑰。

上周日把《霍乱时期的爱情》看完了,就刚好当道具拍了上面的照片。总体来说我不喜欢这种纵情声色的故事,更不喜欢那个看起来冠冕堂皇的理由∶“我一生有622个情人,但是我只爱过你”。虽然它真的是穷极了爱情的所有可能性,但是它不够真实。

相比之下我觉得钱钟书先生写的《围城》∶“我说的让她三分,不是三分流水七分尘的三分,而是天下明月只有三分的三分。”这样打打闹闹的爱情更加真实。

再看杨绛先生的《我们仨》,书的最后她说∶“世间好物不坚牢,彩云易散琉璃脆”。这才是爱情,这才是真实的生活。

好了,说回文章。

对不起,我错了。

前面发的这两篇文章:

《面试官:你说你熟悉jvm?那你讲一下并发的可达性分析》

《面试官:G1回收器怎么知道你是什么时候的垃圾?》

里面有一些没有说清楚的地方,又有很多读者来问,所以我觉得需要补充说明一下

更重要的是,经过高手指点,其中还有一些描述错误的地方,我也需要进行勘误

如果真的是面试题,可能面试官就会对我说:好了,我们今天就先到这里。你回去等通知吧。

如果你没看过我刚刚说的两篇文章,我建议你不要看这篇,因为一看就得看三篇,如果里面的衍生知识点你还想彻底弄明白,一个下午就过去了......(当然,你看了后收获肯定还是有的。)

如果你看了我之前的两篇文章,我求求你一定看看这篇,补充、更正一下答案,等面试官真的问起细节来,也不怕......

好了,在阅读本文之前,我假设你已经读过我前面说的两篇优质、幽默、有料的文章了。

并发的可达性分析-勘误

之前发布了这篇文章《面试官:你说你熟悉jvm?那你讲一下并发的可达性分析》,对于文中这一部分内容中的动图,有很多朋友给我说看不懂:

我把这个动图拿出来:

首先,需要说明的是,我现在也看不懂这个动图了。(画错了就是画错了,还强行找个理由)。

接下来,忘记这个动图,我们重新分析一波原始快照方案(以下简称SATB,Snapshot At The Beginning)。

首先,我们看初始标记阶段(即根节点枚举)完成后,刚刚进入并发标记阶段,GC 线程开始扫描时的对象图:

在上面这张图里,当GC Roots确定后,对象图就已经确定了。SATB扫描的时候基于已经确定的对象图(快照版的对象图)扫描,也就是说扫描过程中上面的快照图的引用关系是不会发生变化的,但是真实的对象图是会发生变化的。

举个例子:就类似于你在操场上拍了一张照片,你数照片里面的人数,照片是不会发生变化,人数一直都是这么多,但是真实的操场上的人是在时刻变化的。

所以,在对象图确定的一刻,正常扫描完成后,对象图变成了下面这样:

好了,面前的铺垫完成了。

我们这里需要演示的是“对象消失”情况。

首先,我们先确定一下上面展示的对象图,在并发标记阶段必然有一个时刻的对象图是这样的:

我们基于这个时刻的这个对象图去讨论“对象消失”的问题。

还得记得"对象消失"必须同时满足的两个条件吗?(这两个条件是摘抄自《深入理解Java虚拟机(第3版)》P.89)

条件一:赋值器插入了一条或者多条从黑色对象到白色对象的新引用。

条件二:赋值器删除了全部从灰色对象到该白色对象的直接或间接引用。

我们再仔细的读一遍第二个条件,你会发现,它说的是**“该白色对象”。这个“该白色对象”指的是条件一里面的白色对象。**

所以,我们有理由相信:条件一和条件二是有先后顺序的,即必须是赋值器插入了一条或者多条从黑色对象到白色对象的新引用,然后赋值器又删除了全部从灰色对象到该白色对象的直接或间接引用。在这样的情况下,才会出现“对象消失”的情况。

经过高人指点,我们还可以进行反证法,如下:

我们假设灰色对象到白色对象的引用先删除了,即先触发了条件二。那么对应的这个时刻真实的对象图将变成下面的样子:

(注意我这里强调的是真实的对象图,而不是快照的对象图。再次重申:快照的对象图在扫描开始的时候就确定了,扫描过程中是不会变化的。)

那么,白色对象9是处于游离态的,从根节点没有任何引用链相连,用图论的话来说就是从 GC Root 到对象9不可达,则证明此对象是不可能再被使用的。因此用户线程不可能把黑色对象5指向游离态的白色对象9,你写不出这样的代码来。

如果说上面的图你一眼没看出来,那么请看下面这图,是不是恍然大悟:

黑色对象5不能指向白色对象9,那么第一条规则就满足不了了。

所以,综上我们可以得出:条件一和条件二是有先后顺序的。

那么我们根据条件一继续做图如下:

条件一是赋值器插入了一条或者多条从黑色对象到白色对象的新引用。

在上面这个图的场景中,就是 GC 线程在工作的同时,赋值器插入了一条黑色对象5到白色对象9之间的新引用。(用红色线条以示区分)

在这个时刻,由于灰色对象6指向白色对象9,所以黑色对象5可以指向白色对象9,想一想我们前面的证明,只要有引用链,黑色对象就可以到达白色对象。

这个时候仅仅满足了条件一,对象还没消失。

接下来就是条件二的图,STAB破坏的就是条件二

条件二是赋值器删除了全部从灰色对象到该白色对象的直接或间接引用

在上面这个图的场景中,就是赋值器删除了灰色对象6到白色对象9的直接引用。

这个时候白色对象9就是“消失的对象”了,因为黑色的对象5是不会被再次扫描的。

需要注意的是,赋值器可以理解为用户线程,由于在并发标记阶段,用户线程和 GC 线程在同时运行,所以需要出现上面的图,还有一个前置条件就是:

用户线程删除对象6到对象9之间的引用,要先于 GC 线程扫描到对象6,把对象6变成灰色的操作。因为只有这样,GC 线程处理到对象6的时候,才有对应的写屏障记录。

如果在 GC 线程已经扫描过对象6,即对象6已经是黑色的情况下(这个时候对象9,不是黑色就是灰色,不可能是白色),用户线程再去删除对象6到对象9之间的引用,GC 线程是不需要处理的,因为对象9已经是非白了,它在本轮中必定会活下来。

这里我引用R大的描述:

https://hllvm-group.iteye.com/group/topic/44381?page=2

因为删除操作会触发 pre-write barrier,把每次引用关系变化时旧的引用值记下来,只有这样,等 GC 线程到达某一个对象时,这个对象的所有引用类型字段的变化全都有记录在案,就不会漏掉任何在快照图里活的对象。当然,很可能有对象在快照中是活的,但随着并发 GC 的进行它可能本来已经死了,但 SATB 还是会让它活过这次 GC,变成了浮动垃圾。

SATB 在写屏障里,把旧的引用所指向的对象都变成非白的(已经黑灰就不用管,还是白的就变成灰的)。

这样做的实际效果是:如果一个灰对象的字段原本指向一个白对象,但在concurrent marker能扫描到这个字段之前,这个字段被赋上了别的值(例如说null),那么这个字段跟白对象之间的关联就被切断了。SATB write barrier保证在这种切断发生之前就把字段原本引用的对象变灰,从而杜绝了上述条件二的发生。

其中:“把旧的引用所指向的对象都变成非白的。”在我们这个场景下含义如下:

旧的引用指的是:灰色对象6到白色对象9之间的引用。

所指向的对象指的是:白色对象9。

都变成非白的:指的是白色对象9变成了灰色。

所以,在两个条件顺序触发、对象图扫描完成后会变成下面的样子:

并发扫描结束之后,再以灰色对象9为根(把它作为根,自然会变成黑色),重新扫描一次,所以最终的对象图变成了这样:

有的小伙伴就会问了:如果在标记过程中,用户线程并没有把对象5指向对象9的操作,仅仅是发生了删除对象6到对象9之间引用的操作,那么这个对象图是什么样子呢?

就是下面这个样子,你应该可以想象出来:

对象9还是黑色,只是它变成了浮动垃圾,逃过了本次回收而已。并不影响程序运行。

接下来,让上面的图动起来,并且我把图片之间的切换顺序放慢。你再自己细品品:

所以,上面的全部描述,才是一次我认为正确的,展示SATB方案是如何解决“对象消失”问题的过程。

之前《面试官:你说你熟悉jvm?那你讲一下并发的可达性分析》中对于这一部分的描述过于简单,且存在错误,给大家道歉,并特以此文进行修正。

你是什么时候的垃圾-勘误

在《G1回收器:我怎么知道你是什么时候的垃圾?》这篇文章中有一句描述是这样的:

“GC Roots 能直接关联到的对象:就是一个 Region 已经使用过的部分,所以在 bottom 与 top 之间。”这句话是错误的。

实际上,通过文章后面的描述你也能发现。GC Roots 能直接关联到的对象集合应该“小于” Region 已经使用过的部分,对象图递归完之后,所有对象总和,才等于Region已经使用过的部分。

通过文章中后半部分的这个图片也可以直观的发现, bottom 到 top 之间是一个 Region 已经使用的部分。但是这一部分中,只有 bottom 到 NextTAMS 之间的对象才是 GC Roots 能直接关联到的对象,这部分对象并不是一个 Region 已经使用过的部分。

你是什么时候的垃圾-补充说明

关于《G1回收器:我怎么知道你是什么时候的垃圾?》这篇文章,还有两个需要补充说明的地方。

有的读者问说:文章中没有讨论回收的内容,每次清理不会真正回收,那是不是多轮标记后才发生一次回收呢?

一。

首先,文章中确实没有讨论回收相关的内容。我在前面部分也写了,把G1回收切分为两大部分:

1.Global Concurrent Marking:全局并发标记。

2.Evacuation Pauses:该阶段是负责把一部分Region里的活对象拷贝到空Region里面去,然后回收原本的Region空间。

只要清楚了全局并发标记阶段,就可以解答文中抛出的这个问题:

所以我只说明了全局并发标记阶段。

如果想要了解回收阶段的事,可以去看看R大的回答,强烈建议你看完本文,点个赞后,打开下面的链接,反复阅读几遍:

https://hllvm-group.iteye.com/group/topic/44381

其次,“每次清理不会真正回收,那是不是多轮标记后才发生一次回收呢?”

这句话,可能是我在文章强调了清理阶段不拷贝任何对象,再加上没有描述回收阶段,导致读者有点懵了吧。

一次全局并发标记完成后,紧接着一次回收的过程。

只是G1收集器之所以能建立可预测的停顿时间模型(-XX:MaxGCPauseMillis指定,默认值为200毫秒),是因为它将 Region 作为单次回收的最小单元,即每次收集到的内存空间都是 Region 大小的整数倍,这样就可以有计划地避免在整个Java堆中进行全区域的垃圾回收。

更具体一点的做法就是每个 Region 里面堆积的垃圾都有一个“价值”(价值即回收所获得的空间大小以及回收所需要的时间的经验值)。而这些“价值”,是维护在一个优先级列表中的,G1收集器都是知道的。

所以回收阶段会优先处理回收价值最大的那些 Region。因此,一次回收的过程并不会回收所有的 Region。

二。

这里也就解释了读者提出的另外一个问题:如果每次标记完都会回收整理,那为什么红框所在的区间与上一次标记之后相同,好像没有被整理一样,整理之后不是应该不留下内存空隙吗?

我觉得一个合理的解释,就是我上面说的:这个 Region 的价值不够,所以它本次没有被回收。随着时间的推移,它里面堆积的垃圾越来越多,“价值”就越来越高,总是会被回收的。

还有读者问:看了并发标记的过程,有个疑问 prevBitmap 的作用是什么? 因为感觉每次都是从头开始扫描,没看到它的作用。

这个问题,可以从这张图片入手解答:

这个 E 是 Remark 阶段,可以看到,在这个阶段,其实 PrevBitmap 是派上用场了。

前面刚刚说了,这个 Region 由于“价值”不够,它逃过了上次垃圾回收,所以待到下次垃圾回收的时候,就是 prevBitmap 的用武之地了,它里面记录的地址对应的区间就不需要再次标记了,因为这些地址对应的对象就已经是垃圾了。

我们可以假设 E 代表的是第 n 轮回收的过程的Remark阶段。那么 PrevBitmap 就是第 n-1 轮的标记结果。

之前的文章说了:一个 previous Bitmap 记录的是上一轮 Concurrent Marking 后的对象标记状态,因为上一轮已经完成(上一轮就是第n-1轮),所以这个bitmap的信息可以直接使用。

可以直接使用的意思就是前面说的:它里面记录的地址对应的区间就不需要再次标记了,因为这些地址对应的对象就已经是垃圾了。

到 F 图里面,可以看到,当前的 F 图是清理阶段已经完成的状态了:

判断标准有二:

1.和 E 图相比PrevBitmap 和 NextBitmap 已经交换了位置。

2.PrevBitmap 里面对应的地址的空间已经被标记为浅灰色了。

这个时候已经完成标记,PrevBitmap 又变成了第n-1次标记的结果。

你是什么垃圾-怼人

因为之前的文章已经发布了,所以我需要修改一下对应的内容。提醒后面的读者,如果看到了文章,需要注意这些地方描述的有问题。

但是我在查找我文章的过程中发现了一些让我很郁闷的事情,之前的文章,大都被剽窃了,我也见怪不怪,有时间就顺手举报一下了。

最过分的是下面这个:

这是一个百家号账号,一字不差的抄我文章,还自己标注为“原创”?

我去写了个评论:

他还不敢把评论放出来。

还有下面这个,你可长点心吧。你配的这张图片,我倒是想在家拍,但是我拍不出来呀:

这样的情况还有很多。说到底,就还是版权意识的问题。

版权问题,我之前在《订阅号做了77天,我挣了487.52元》这篇文章里面聊过:

我的号不会传播任何盗版资源,以前如此,现在如此,以后也会如此。

不做恶,就是最大的善。与君共勉。

所以我在此郑重声明,如果未经许可转载我的文章,必须标明原文地址,且保留文末公众号二维码,否则我一定见一个举报一个。

我先举报你涉黄,引起工作人员的注意,再举报你抄袭,让工作人员惩罚你。

气死我了。

最后说一句(求关注)

通过这件事我也再次感觉到了,看网上的野生文章(比如我的),要持有谨慎、怀疑、学习的态度。

才疏学浅,难免会有纰漏,如果你发现了错误的地方,还请你留言给我指出来,我对其加以修改。(我每篇技术文章都有这句话,我是认真的说的。)

感谢您的阅读,我坚持原创,十分欢迎并感谢您的关注。

我是why技术,一个不是大佬,但是喜欢分享,又暖又有料的四川好男人。

以上。

欢迎关注公众号【why技术】,坚持输出原创。分享技术、品味生活,愿你我共同进步。

查看原文

骑牛上青山 收藏了文章 · 6月22日

面试官:你说你熟悉jvm?那你讲一下并发的可达性分析

这是why技术的第35篇原创文章

上面这张图是我还是北漂的时候,在鼓楼附近的胡同里面拍的。

那天刚刚下完雨,路过这个地方的时候,一瞬间就被这五颜六色的门板和自行车给吸引了,于是拍下了这张图片。看到这张图片的时候我就很开心,多鲜活、多舒服的画面呀。

以后的文章里面我的第一张配图都用自己随时拍下的照片吧。分享生活、分享技术,哈哈。

好了,说回文章。

这次的文章我们聊聊jvm。jvm可以说是面试必备技能了。简历上写了,多问几句。简历上没写,也得提上几句。

我们先从一个简单的热身题入手,引出本文想要分享的内容。

当面试扯到jvm这一部分的时候,面试官大概率会问你jvm怎么判断哪些对象应该回收呢?

这种经典的面试题当然难不住你。

你会脱口而出引用计数算法可达性分析算法

然后你就停下来了吗?难道你不知道你回答了一句话之后,面试官肯定会接着问你能详细说明一下吗?所以,不要停。主动点,面试的时候主动点。你要抓住面试官把话语权交给你的宝贵机会,接着说啊,你得支棱起来

因为引用计数法的算法是这样的:在对象中添加一个引用计数器,每当一个地方引用它时,计数器就加一;当引用失效时,计数器值就减一;任何时刻计数器为零的对象就是不可能再被使用的。

但是这样的算法有个问题,是什么呢?

不经意间来一波自问自答。让面试官听的一愣一愣的。

就是不能解决循环依赖的问题。

并拿着自己准备的纸和笔快速的画出下面这样的图:

Object 1和Object 2其实都可以被回收,但是它们之间还有相互引用,所以它们各自的计数器为1,则还是不会被回收。

所以,Java虚拟机没有采用引用计数法。它采用的是可达性分析算法。

可达性分析算法的思路就是通过一系列的“GC Roots”,也就是根对象作为起始节点集合,从根节点开始,根据引用关系向下搜索,搜索过程所走过的路径称为引用链,如果某个对象到GC Roots间没有任何引用链相连。

用图论的话来说就是从GC Roots到这个对象不可达时,则证明此对象是不可能再被使用的。所以此对象就是可以被回收的对象。

说这句话的时候再次,快速的纸上画出下面的图:

好了,到这里就可以把话语权交给面试官了。因为到这里,他接下来可以问的点有很多,你不知道他会问什么,比如:

你刚刚谈到了根节点,那你知道哪些对象可以作为根对象吗?

你刚刚谈到了引用,那你知道java里面有哪几种引用吗?

你刚刚谈到了可达性分析算法,那如果在该算法中被判定不可达对象,是不是一定会被回收呢?

谈谈你熟悉的垃圾回收器和他们的工作过程?

.......

上面的这些问题都太常规了,任何一份面经里面都会有这样的几个问题。

而本文要解决的是下面这个稍微不那么常见,但是你答题的过程中一定会提到的点“并发标记”、“浮动垃圾”。

CMS和G1都是有一个并发标记的过程,并发标记要解决什么问题?带来了什么问题?怎么解决这些问题呢?

并发标记要解决什么问题?

刚刚我们谈到的可达性分析算法是需要一个理论上的前提的:该算法的全过程都需要基于一个能保障一致性的快照中才能够分析,这意味着必须全程冻结用户线程的运行。

为了不冻结用户线程的运行,那我们就需要让垃圾回收线程和用户线程同时运行。

所有我们来个反证法,先假设不并发标记,即只有垃圾回收线程在运行的流程是怎样的:

第一步是需要找到根节点,也就是我们常说的根节点枚举。

而在这个过程中,由于GC Roots是远远少于整个java堆中的全部对象的,而且在OopMap此类优化技巧的加持下,它带来的停顿时间是非常短暂且相对固定的,可以理解为不会随着堆里面的对象的增加而增加。大概就是下面这个图的意思:

但是我们做完根节点枚举,只是做完了第一步。接下来,我们需要从GC Roots往下继续遍历对象图,进行"标记"过程。而这一步的停顿时间必然是随着java堆中的对象增加而增加的。大概就是下面这个图的意思:

这个逻辑不复杂:堆约大,存储的对象越多,对象图结构越复杂,要标记更多对象,所以产生的停顿时间也自然就长了。

所有,经过上面的分析,我们知道了,根节点的枚举阶段是不太耗时的,也不会随着java堆里面存储的对象增加而增加耗时。而"标记"过程的耗时是会随着java堆里面存储的对象增加而增加的。

"标记"阶段是所有使用可达性分析算法的垃圾回收器都有的阶段。因此我们可以知道,如果能够削减"标记"过程这部分的停顿时间,那么收益将是可观的。

所以并发标记要解决什么问题呢?

就是要消减这一部分的停顿时间。那就是让垃圾回收器和用户线程同时运行,并发工作。也就是我们说的并发标记的阶段。

并发标记带来了什么问题?

在说带来什么问题之前,我们必须得先搞清楚一个问题:

为什么遍历对象图的时候必须在一个能保障一致性的快照中

为了说明这个问题,我们就要引入"三色标记"大法了。注意:"三色标记"也是jvm的一个考点哦。

什么是"三色标记"?《深入理解Java虚拟机(第三版)》中是这样描述的:

在遍历对象图的过程中,把访问都的对象按照"是否访问过"这个条件标记成以下三种颜色:

白色:表示对象尚未被垃圾回收器访问过。显然,在可达性分析刚刚开始的阶段,所有的对象都是白色的,若在分析结束的阶段,仍然是白色的对象,即代表不可达。

黑色:表示对象已经被垃圾回收器访问过,且这个对象的所有引用都已经扫描过。黑色的对象代表已经扫描过,它是安全存活的,如果有其它的对象引用指向了黑色对象,无须重新扫描一遍。黑色对象不可能直接(不经过灰色对象)指向某个白色对象。

灰色:表示对象已经被垃圾回收器访问过,但这个对象至少存在一个引用还没有被扫描过

读完上面描述,再品一品下面的图:

可以看到,灰色对象是黑色对象与白色对象之间的中间态。当标记过程结束后,只会有黑色和白色的对象,而白色的对象就是需要被回收的对象。

在可达性分析的扫描过程中,如果只有垃圾回收线程在工作,那肯定不会有任何问题。

但是垃圾回收器和用户线程同时运行呢?这个时候就有点意思了。

垃圾回收器在对象图上面标记颜色,而同时用户线程在修改引用关系,引用关系修改了,那么对象图就变化了,这样就有可能出现两种后果:

一种是把原本消亡的对象错误的标记为存活,这不是好事,但是其实是可以容忍的,只不过产生了一点逃过本次回收的浮动垃圾而已,下次清理就可以。

一种是把原本存活的对象错误的标记为已消亡,这就是非常严重的后果了,一个程序还需要使用的对象被回收了,那程序肯定会因此发生错误。

当面试官问你:为什么会产生浮动垃圾的时候,你就可以用上面的话来回答。

但是大概率情况下面试官应该更加关心第二种情况。

他可能会问:你刚刚说的第二种情况,"把原本存活的对象错误的标记为已消亡"能具体的说明一下吗?怎么消亡的?垃圾回收器是怎么解决这个问题的?

所以接下来,我们主要分析一下并发标记的过程中"对象消失"的问题。具体"对象"是怎么没了的。

这里借助《深入理解Java虚拟机(第三版)》的示例,但是第三版的示例的描述写的不是特别容易理解,我就尽我所能的描述的清楚一些,下面会结合动图,分析标记的三种情况:

正常标记

我们先看一下一次正常的标记过程:

首先是初始状态,很简单,只有GC Roots是黑色的。同时需要注意下面的图片的箭头方向,代表的是有向的,比如其中的一条引用链是:
根节点->5->6->7->8->11->10

在扫描的过程中,变化是这样的:

内心OS:为了做下面的这些动图、为了把动图里面的每张图截的大小一个像素都不差,鬼知道我做的多辛苦,做瞎我的钛合金狗眼。

你看上面的动图,灰色对象始终是介于黑色和白色之间的。当扫描顺利完成后,对象图就变成了这个样子:

此时,黑色对象是存活的对象,白色对象是消亡了,可以回收的对象。

记住,上面演示的是一切都是那么美好的正常情况。

对象消失的情况一

接下来,我们看看对象消失的情况:

如果用户线程在标记的时候,修改了引用关系,就会出现下面的情况:

当扫描完成后,对象图就变成了这个样子:

这时,我们和之前分析的正常扫描结束的对象图对比,就能清楚的看到,扫描完成后,原本还在被对象5引用的对象9,由于是白色对象,所以根据三色标记原则,对象9会被当成垃圾回收。

这样就出现了对象消失的情况。

对象消息的情况二

下面再给各位看看另外一种"对象消失"的现象:

上面演示的是用户线程切断引用后重新被黑色对象引用的对象就是原来引用链的一部分。

对象7和对象10本来就是原引用链(根节点->5->6->7->8->11->10)的一部分。修改后的引用链变成了(根节点->5->6->7->10)。

当扫描完成后,对象图就变成了这个样子:

由于黑色对象不会重新扫描,这将导致扫描结束后对象10和对象11都会回收了。他们都是被修改之前的原来的引用链的一部分。

所以,回到最开始的疑问:并发标记带来了什么问题?

经过我们上面三种情况(一种正常情况,两种"对象丢失"的情况)的动图分析,和扫描完成后的最终对象图进行分析对比,我们知道了,并发标记除了会产生浮动垃圾,还会出现"对象消失"的问题。

怎么解决"对象消失"问题呢?

有一个大佬叫Wilson,他在1994年在理论上证明了,当且仅当以下两个条件同时满足时,会产生"对象消失"的问题,原来应该是黑色的对象被误标为了白色:

条件一:赋值器插入了一条或者多条从黑色对象到白色对象的新引用。

条件二:赋值器删除了全部从灰色对象到该白色对象的直接或间接引用。

你在结合我们上面出现过的图捋一捋上面的这两个条件,是不是当且仅当的关系:

黑色对象5到白色对象9之间的引用是新建的,对应条件一。

黑色对象6到白色对象9之间的引用被删除了,对应条件二。

由于两个条件之间是当且仅当的关系。所以,我们要解决并发标记时对象消失的问题,只需要破坏两个条件中的任意一个就行。

于是产生了两种解决方案:增量更新(Incremental Update)和原始快照(Snapshot At The Beginning,SATB)。

在HotSpot虚拟机中,CMS是基于增量更新来做并发标记的,G1则采用的是原始快照的方式。

什么是增量更新呢?

增量更新要破坏的是第一个条件(赋值器插入了一条或者多条从黑色对象到白色对象的新引用),当黑色对象插入新的指向白色对象的引用关系时,就将这个新插入的引用记录下来,等并发扫描结束之后,再将这些记录过的引用关系中的黑色对象为根,重新扫描一次。

可以简化的理解为:黑色对象一旦插入了指向白色对象的引用之后,它就变回了灰色对象。

下面的图就是一次并发扫描结束之后,记录了黑色对象5新指向了白色对象9:

这样对象9又被扫描成为了黑色。也就不会被回收,所以不会出现对象消失的情况。

什么是原始快照呢?

原始快照要破坏的是第二个条件(赋值器删除了全部从灰色对象到该白色对象的直接或间接引用),当灰色对象要删除指向白色对象的引用关系时,就将这个要删除的引用记录下来,在并发扫描结束之后,再将这些记录过的引用关系中的灰色对象为根,重新扫描一次。

这个可以简化理解为:无论引用关系删除与否,都会按照刚刚开始扫描那一刻的对象图快照开进行搜索。

需要注意的是,上面的介绍中无论是对引用关系记录的插入还是删除,虚拟机的记录操作都是通过写屏障实现的。写屏障也是一个重要的知识点,但是不是本文重点,就不进行详细介绍了。

只是补充两点:

1.这里的写屏障和我们常说的为了解决并发乱序执行问题的"内存屏障"不是一码事,需要区分开来。

2.写屏障可以看作虚拟机层面对"引用类型字段赋值"这个动作的AOP切面,在引用对象赋值时会产生一个环形通知,供程序执行额外的动作,也就是说赋值的前后都在写屏障的覆盖范畴内。在赋值前的部分的写屏障叫做写前屏障(Pre-Write Barrier),在赋值后的则叫作写后屏障(Post-Write Barrier)。

所以,经过简单的推导我们可以知道:

增量更新用的是写后屏障(Post-Write Barrier),记录了所有新增的引用关系。

原始快照用的是写前屏障(Pre-Write Barrier),将所有即将被删除的引用关系的旧引用记录下来。

最后说一句(求关注)

最近有很多读者在找我修改简历、咨询工作的相关事情了,我就知道马上又要开始春招了。

其实我也不是很有资格给你们修改简历,也不是一个技术很牛逼的人,只是把我知道的分享出来了而已,不仅能让我巩固知识,还是倒逼我进行知识输入,在此之外还能对你有一点点帮助,那就是我文章的全部价值所在。

另外如果你正在经历春招或者社招,有兴趣的可以阅读一下我之前的这篇文章,看看是否有一点点帮助:

《面试了15位来自985/211高校的2020届研究生之后的思考》

才疏学浅,难免会有纰漏,如果你发现了错误的地方,还请你留言给我指出来,我对其加以修改。

如果你觉得文章还不错,你的转发、分享、赞赏、点赞、留言就是对我最大的鼓励。

感谢您的阅读,我坚持原创,十分欢迎并感谢您的关注。

以上。

查看原文

骑牛上青山 回答了问题 · 6月19日

解决Thread.sleep结束时会刷新工作内存吗

你这样的写法我们可以看到,在read线程内部先暂停了1s,然后在获取flag进行比较,但是这样的话会出现一个问题,write线程已经完全操作完了并且把flag修改了,此时你再获取flag那就已经是最新的了,所以会是true!所谓的线程之间不可见是因为线程内部保有了自己的副本,但是read线程在获取flag并且创建自己的副本的时候已经是最新的了,所以建议这样修改

public static boolean flag = false;

    public void start() {

        Thread read = new Thread(() -> {
            while (!flag) {
                
            }
        });
        read.start();

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        Thread write = new Thread(() -> flag = true);
        write.start();
    }

关注 3 回答 2

骑牛上青山 发布了文章 · 6月1日

JAVA concurrency -- ThreadLocal 源码详解

概述

在并发编程中,为了控制数据的正确性,我们往往需要使用锁来来保证代码块的执行隔离性。但是在很多时候锁的开销太大了,而在某些情况下,我们的局部变量是线程私有的,每个线程都会有自己的独自的变量,这个时候我们可以不对这部分数据进行加锁操作。于是ThredLocal应运而生。

ThredLocal顾名思义,是线程持有的本地变量,存放在ThredLocal中的变量不会同步到其他线程以及主线程,所有线程对于其他的线程变量都是不可见的。那么我们来看下它是如何实现的吧。

实现原理

ThredLocal在内部实现了一个静态类ThreadLocalMap来对于变量进行存储,并且在Thread类的内部使用到了这两个成员变量

    ThreadLocal.ThreadLocalMap threadLocals = null;
    ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;

来调用ThreadLocalMap存储当前线程的内部变量。

ThreadLocalMap的实现

ThreadLocalMap是键值对结构的map,但是他没有直接使用HashMap,而是自己实现了一个。

Entry

EntryThreadLocalMap中定义的map节点,他以ThreadLocal弱引用为key,以Object为value的K-V形式的节点。使用弱引用是为了可以及时释放内存避免内存泄漏。

    static class Entry extends WeakReference<ThreadLocal<?>> {
        Object value;

        Entry(ThreadLocal<?> k, Object v) {
            super(k);
            value = v;
        }
    }

这里和HashMap不一样的地方在于两者解决hash冲突的方式的不同,HashMap采用的是链地址法,遇到冲突的时候将冲突的数据放入同一链表之中,等到链表到了一定程度再将链表转化为红黑树。而ThreadLocalMap实现采用的是开放寻址法,它内部没有使用链表结构,因此Entry内部没有next或者是prev指针。ThreadLocalMap的开放寻址法是怎么实现的,请看接下来的源码。

成员变量

    // map默认的初始化大小
    private static final int INITIAL_CAPACITY = 16;

    // 存储map节点数据的数组
    private Entry[] table;

    // map大小
    private int size = 0;

    // 临界值,达到这个值的时候需要扩容
    private int threshold;

    // 当临界值达到2/3的时候扩容
    private void setThreshold(int len) {
        threshold = len * 2 / 3;
    }

这里的数组大小始终是2的幂次,原因和HashMap一样,是为了在计算hash偏移的时候减少碰撞。

构造函数

    ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
        // 初始化table
        table = new Entry[INITIAL_CAPACITY];
        // 计算第一个值的hash值
        int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
        // 创建新的节点
        table[i] = new Entry(firstKey, firstValue);
        size = 1;
        setThreshold(INITIAL_CAPACITY);
    }

set方法

    private void set(ThreadLocal<?> key, Object value) {

        // 获取ThreadLocal的hash值偏移量
        Entry[] tab = table;
        int len = tab.length;
        int i = key.threadLocalHashCode & (len-1);

        // 遍历数组直到节点为空
        for (Entry e = tab[i];
                e != null;
                e = tab[i = nextIndex(i, len)]) {
            ThreadLocal<?> k = e.get();

            // 如果节点key相等,即找到了我们想要的节点,
            // 将值赋予节点
            if (k == key) {
                e.value = value;
                return;
            }

            // 如果节点的key为空,说明弱引用已经把key回收了,那么需要做一波清理
            if (k == null) {
                replaceStaleEntry(key, value, i);
                return;
            }
        }

        // 如果没有找到对应的节点说明该key不存在,创建新节点
        tab[i] = new Entry(key, value);
        int sz = ++size;
        // 进行清理,如果清理结果没能清理掉任何的旧节点,
        // 并且数组大小超出了临界值,就进行rehash操作扩容
        if (!cleanSomeSlots(i, sz) && sz >= threshold)
            rehash();
    }

看到这段代码,开放寻址法的实现原理可以说是非常清楚了。首先计算节点的hash值,找到对应的位置,查看该位置是否为空,如果是空则插入,如果不为空,则顺延至下个节点,直到找到空的位置插入。那么我们的查询逻辑也呼之欲出:计算节点的hash值,找到对应的位置,查看该节点是否是我们想要找的节点,如果不是,则继续往下顺序寻找。

get方法

    private Entry getEntry(ThreadLocal<?> key) {
        // 计算hash值
        int i = key.threadLocalHashCode & (table.length - 1);
        // 获取该hash值对应的数组节点
        Entry e = table[i];
        if (e != null && e.get() == key)
            // 如果节点不为空并且key一致,说明是我们找的节点,直接返回
            return e;
        else
            // 否则继续往后寻找
            return getEntryAfterMiss(key, i, e);
    }
    
    private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
        Entry[] tab = table;
        int len = tab.length;

        // 如果节点不为空就一直找下去
        while (e != null) {
            ThreadLocal<?> k = e.get();
            // key相同则说明找到,返回该节点
            if (k == key)
                return e;
            // key为空进行一次清理
            if (k == null)
                expungeStaleEntry(i);
            else
                i = nextIndex(i, len);
            e = tab[i];
        }
        return null;
    }

replaceStaleEntry

    // 这个方法的作用是在set操作的时候进行清理
    private void replaceStaleEntry(ThreadLocal<?> key, Object value,
                                       int staleSlot) {
        Entry[] tab = table;
        int len = tab.length;
        Entry e;
        
        // slotToExpunge是之后开始清理的节点位置
        int slotToExpunge = staleSlot;
        // 往前寻找找到第一个为空的节点记录下位置
        for (int i = prevIndex(staleSlot, len);
                (e = tab[i]) != null;
                i = prevIndex(i, len))
            if (e.get() == null)
                slotToExpunge = i;

        // 从staleSlot开始向后遍历直到节点为空
        for (int i = nextIndex(staleSlot, len);
                (e = tab[i]) != null;
                i = nextIndex(i, len)) {
            ThreadLocal<?> k = e.get();

            if (k == key) {
                // 如果节点的key一致,替换value的值
                e.value = value;

                // 将当前节点和staleSlot上的节点互换位置(将后方的值放到前方来,之前的值等待回收)
                tab[i] = tab[staleSlot];
                tab[staleSlot] = e;

                // 如果slotToExpunge和staleSlot相等,说明前面没有需要清理的节点
                // 则从当前节点开始进行清理
                if (slotToExpunge == staleSlot)
                    slotToExpunge = i;
                // 进行节点清理
                cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
                return;
            }

            // 如果key为空并且slotToExpunge和staleSlot相等
            // 把slotToExpunge赋值为当前节点
            if (k == null && slotToExpunge == staleSlot)
                slotToExpunge = i;
        }

        // 如果没法找到key相等的节点,
        // 则清空当前节点的value并生成新的节点
        tab[staleSlot].value = null;
        tab[staleSlot] = new Entry(key, value);

        // 如果slotToExpunge和staleSlot不相等则需要进行清理(因为前方发现空的节点)
        if (slotToExpunge != staleSlot)
            cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
    }

expungeStaleEntry

    // 对节点进行清理
    private int expungeStaleEntry(int staleSlot) {
        Entry[] tab = table;
        int len = tab.length;

        // 释放当节点
        tab[staleSlot].value = null;
        tab[staleSlot] = null;
        size--;

        Entry e;
        int i;
        // 循环寻找到第一个空节点
        for (i = nextIndex(staleSlot, len);
                (e = tab[i]) != null;
                i = nextIndex(i, len)) {
            ThreadLocal<?> k = e.get();
            // key为空进行节点释放
            if (k == null) {
                e.value = null;
                tab[i] = null;
                size--;
            } else {
                // 如果key不为空,找到对应的节点应该在的位置
                int h = k.threadLocalHashCode & (len - 1);
                if (h != i) {
                    // 如果和当前节点位置不同,
                    // 则清理节点并且循环找到后面的非空节点移到前面来
                    tab[i] = null;

                    while (tab[h] != null)
                        h = nextIndex(h, len);
                    tab[h] = e;
                }
            }
        }
        return i;
    }

cleanSomeSlots

    // 该方法用于清理空节点
    private boolean cleanSomeSlots(int i, int n) {
        // 标记是否有节点被清除
        boolean removed = false;
        Entry[] tab = table;
        int len = tab.length;
        do {
            i = nextIndex(i, len);
            Entry e = tab[i];
            // 如果有节点为空并且key为空
            // 该节点需要被清除
            if (e != null && e.get() == null) {
                // 重置n的值并且标记removed为true
                n = len;
                removed = true;
                // 清理该节点
                i = expungeStaleEntry(i);
            }
        } while ( (n >>>= 1) != 0);
        return removed;
    }

rehash

    // 当数组的元素到达临界值,进行扩容
    private void rehash() {
        // 先对所有的节点进行清理
        expungeStaleEntries();

        // 然后判断临界值是否进行扩容
        // 此处由于先做过一次清理,这里的数字可能会和之前的临界值判断有缩小
        // 所以此处临界值判断为threshold - threshold / 4
        // 即1/2的size时进行扩容
        if (size >= threshold - threshold / 4)
            resize();
    }

    private void resize() {
        // 获取旧数组,开辟新数组
        // 新数组大小为旧数组的2倍
        Entry[] oldTab = table;
        int oldLen = oldTab.length;
        int newLen = oldLen * 2;
        Entry[] newTab = new Entry[newLen];
        int count = 0;

        // 遍历旧数组
        for (int j = 0; j < oldLen; ++j) {
            Entry e = oldTab[j];
            if (e != null) {
                // 如果节点不为空,判断key是否为空
                // 如果key为空,将节点置空帮助gc
                // 如果key不为空将旧数组的节点放入新数组
                // 放入方式和set实现一致,只是由于是刚创建的新数组
                // 不会有需要清理的数据,所以不需要额外清理
                ThreadLocal<?> k = e.get();
                if (k == null) {
                    e.value = null;
                } else {
                    int h = k.threadLocalHashCode & (newLen - 1);
                    while (newTab[h] != null)
                        h = nextIndex(h, newLen);
                    newTab[h] = e;
                    count++;
                }
            }
        }

        setThreshold(newLen);
        size = count;
        table = newTab;
    }

expungeStaleEntries

    // 清理所有节点
    private void expungeStaleEntries() {
        Entry[] tab = table;
        int len = tab.length;
        // 循环清理
        for (int j = 0; j < len; j++) {
            Entry e = tab[j];
            if (e != null && e.get() == null)
                expungeStaleEntry(j);
        }
    }

关于Map的清理

ThreadLocalMap实现采用的是开放寻址法,它的实现本身应该是比较简洁的,但是为了便于GC,内部节点采用了弱引用作为key,一旦数组中节点的强引用被设置为了null,节点的key就会被gc自动回收。这样导致了ThreadLocalMap的实现变得异常的复杂。为了防止内存泄漏,在get和set方法的时候不得不进行额外的清理。

Q 为什么需要清理?

A 不清理的话key被回收,但是value依旧会存在,并且难以被回收导致内存泄漏。

Q 为什么清理的时候会涉及到节点的移动?

A 因为在开放寻址法中,可能会有相同hash值的节点连续排在一起,当其中的一个或多个节点被回收后会造成同hash值的节点中间存在null节点,而我们get节点的时候会在碰到空节点的时候停止寻找,所以如果不进行一定的清理移动会导致部分节点永远不会被查询到。

ThreadLocal的实现

hashcode的实现

讲完了ThreadLocalMap的实现原理,我们可以深深的体会到ThreadLocalhashcode是多么的重要,如果hash值不能够以合理的方式生成,导致数据的分布不均匀,ThreadLocalMap的效率将会非常的低下。

hashcode的实现:

    private final int threadLocalHashCode = nextHashCode();

    private static AtomicInteger nextHashCode =
        new AtomicInteger();

    private static final int HASH_INCREMENT = 0x61c88647;

    private static int nextHashCode() {
        return nextHashCode.getAndAdd(HASH_INCREMENT);
    }

ThreadLocalhashcode实现代码很简短:每一个新的ThreadLocal的hash值都是在nextHashCode的基础上增加0x61c88647。实现很简单,但是很让人迷惑。这个莫名其妙的魔数0x61c88647是什么?

0x61c88647是有斐波那契构造而成的黄金比例数字,经过实验测试,这个数字生成的hashcode可以很大程度的保证hash值能够在数组中均匀的分布。

get

    public T get() {
        // 获取当前线程
        Thread t = Thread.currentThread();
        // 获取当前线程的变量map
        ThreadLocalMap map = getMap(t);
        if (map != null) {
            // 找到值返回
            ThreadLocalMap.Entry e = map.getEntry(this);
            if (e != null) {
                @SuppressWarnings("unchecked")
                T result = (T)e.value;
                return result;
            }
        }
        // 如果找不到返回默认值
        return setInitialValue();
    }

set

    public void set(T value) {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        // 如果map不为空,加入数据
        if (map != null)
            map.set(this, value);
        else
            // 否则新建map并放入第一个和数据
            createMap(t, value);
    }

总结

Thredlocal这个类可能对于很多人来说是一个常常会用到的类,但是未必所有人都会去关注他的内部实现,但是他的源码是比较值得去阅读的,一来它的实现代码相对其他的常用类很短,只有几百行;二来它的实现很经典,经典的开放寻址法,经典的弱引用方便GC,可以说是很好的学习材料。

这里我虽然对于整个Thredlocal的源码进行了完整的注释解释,但是它最值得细细品味的还是它的设计理念以及设计思路,这会对我们写出优秀的代码有着重要的作用。

查看原文

赞 0 收藏 0 评论 0

骑牛上青山 回答了问题 · 5月29日

有关数据结构单链表整表创建的表尾插入法的一个问题

因为你的p节点是新增的节点,所以必然是尾巴节点,而尾插入为了保证插入始终是在尾巴节点因此在插入后必须保证r也同样是尾巴节点,所以当r->next = p执行之后,实际上的链表插入已经完成了,但是由于插入了新的节点,之前是尾节点的r现在不是尾节点了,那该怎么办呢?没事,p是新插入的尾节点啊,那就把p直接复制给r不就好了吗,于是r变成了尾节点,可以进行下一次的插入了

关注 4 回答 3

骑牛上青山 回答了问题 · 4月27日

如何每隔5秒/30秒/1分钟/3分钟/5分钟/10分钟/30分钟执行一段代码

一般的话是定时任务,或者可以用延时队列来做。延时队列算法可以用时间轮算法,或者直接用第三方的redis或者消息队列来实现延时队列

关注 7 回答 6

骑牛上青山 发布了文章 · 4月23日

JAVA concurrency -- CyclicBarrier 与 CountDownLatch 源码详解

概述

CountDownLatchCyclicBarrier有着相似之处,并且也常常有人将他们拿出来进行比较,这次,笔者试着从源码的角度分别解析这两个类,并且从源码的角度出发,看看两个类的不同之处。

CountDownLatch

CountDownLatch从字面上来看是一个计数工具类,实际上这个类是用来进行多线程计数的JAVA方法。

CountDownLatch内部的实现主要是依靠AQS的共享模式。当一个线程把CountDownLatch初始化了一个count之后,其他的线程调用await就会阻塞住,直到其他的线程一个一个调用countDown方法进行release操作,把count的值减到0,即把同步锁释放掉,await才会进行下去。

Sync

内部主要还是实现了一个继承自AQS的同步器SyncSync源码如下:

    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        // 构造方法,参数是count的数值
        Sync(int count) {
            // 内部使用state来存储count
            setState(count);
        }

        // 获取count的值
        int getCount() {
            return getState();
        }

        // 尝试获取分享模式同步器
        protected int tryAcquireShared(int acquires) {
            // 判断state的值,如果为0则获取成功,否则获取失败
            // 继承自AQS,根据AQS中的注释我们可以知道如果返回结果
            // 大于0则说明获取成功,如果小于0则说明获取失败
            // 此处不会返回0,因为没有意义
            return (getState() == 0) ? 1 : -1;
        }

        // 释放同步器
        protected boolean tryReleaseShared(int releases) {
            // 自选操作
            for (;;) {
                // 获取state
                int c = getState();
                // 如果state为0,直接返回false
                if (c == 0)
                    return false;
                // 计算state-1的结果
                int nextc = c-1;
                // CAS操作将这个值同步到state上
                if (compareAndSetState(c, nextc))
                    // 如果同步成功,则判断是否此时state为0
                    return nextc == 0;
            }
        }
    }

Sync是继承自AQS的同步器,这段代码中值得拿出来讨论的有以下几点:

  1. 为什么用state来存储count的数值?

    因为state和count其实上是一个概念,当state为0的时候说明资源是空闲的,当count为0时,说明所有的CountDownLatch线程都已经完成,所以两者虽然说不是同样的意义,但是在代码实现层面的表现是完全一致的,因此可以将count记录在state中。

  2. 为什么tryAcquireShared不会返回0?

    首先需要解释下tryAcquireShared在AQS中可能的返回值:负数说明是不可以获取共享锁,0说明是可以获取共享锁,但是当前线程获取后已经把所有的共享锁资源占完了,接下来的线程将不会再有多余资源可以获取了,正数则说明了你可以获取共享锁,并且之后还有余量可以给其他线程提供共享锁。然后我们回过来看CountDownLatch内部的tryAcquireShared,我们在实现上完全不关注后续线程,后续的资源占用状况,我只要当前状态,那么这个0的返回值实际上是没有必要的。

  3. 为什么tryReleaseShared中的参数不被使用到?

    根据这个类的实现方式,我们可以知道tryReleaseShared的参数一定是1,因为线程的完成一定是一个一个倒数完成的。实际上我们去看countDown方法内部调用到了sync.releaseShared方法的时候可以发现他写死了参数为1,所以实际上tryReleaseShared中的参数不被使用到的原因是因为参数值固定为1.

构造函数和方法

    // 构造方法
    public CountDownLatch(int count) {
        // count必须大于0
        if (count < 0) throw new IllegalArgumentException("count < 0");
        // 初始化Sync
        this.sync = new Sync(count);
    }


    // 等待获取锁(可被打断)
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    // 等待获取锁(延迟)
    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    // 计数器降低(释放同步器)
    // 每次调用减少1
    public void countDown() {
        sync.releaseShared(1);
    }

    // 获取count
    public long getCount() {
        return sync.getCount();
    }

    // toString
    public String toString() {
        return super.toString() + "[Count = " + sync.getCount() + "]";
    }

CyclicBarrier

CyclicBarrier从字面上看是循环栅栏,在JAVA中的作用是让所有的线程完成后进行等待,直到所有的线程全部完成,再进行接下来的操作。

CyclicBarrier并没有直接继承AQS实现同步,而是借助了可重入锁ReentrantLock以及Condition来完成自己的内部逻辑。

成员变量

    // 锁
    private final ReentrantLock lock = new ReentrantLock();

    // 条件
    private final Condition trip = lock.newCondition();

    // 线程数
    private final int parties;

    // 执行完所有线程后执行的Runnable方法,可以为空
    private final Runnable barrierCommand;

    // 分组
    private Generation generation = new Generation();

    // 未完成的线程数
    private int count;

    private static class Generation {
        boolean broken = false;
    }

我们可以看到成员变量中有一个很陌生的类Generation,这个是CyclicBarrier内部声明的一个static类,作用是帮助区分线程的分组分代,使得CyclicBarrier可以被复用,如果这个简单的解释不能够让你很好地理解的话可以看接下来的源码解析,通过实现来理解它的用途。

构造函数

    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    

    public CyclicBarrier(int parties) {
        this(parties, null);
    }

很常规的构造函数,只是简单的初始化成员变量,没有特别的地方。

核心方法

    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe);
        }
    }

    public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }

awaitCyclicBarrier的核心方法,就是靠着这个方法来实现线程的统一规划的,其中调用的是内部实现的doWait,我们来看下代码:

    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        // 常规的加锁操作,至于为什么要用本地变量操作,
        // 可以去看下我写的另一篇ArrayBlockingQueue的相关文章
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 获取Generation类
            final Generation g = generation;

            // 查看generation是否是broken,如果是broken的,
            // 那说明之前可能因为某些线程中断或者是一些意外状态导致没有办法
            // 完成所有线程到达终点(tripped)的目标而只能报错
            if (g.broken)
                throw new BrokenBarrierException();

            // 如果线程被外部中断需要报错,并且在内部需要将
            // generation的broken置为true来让其他线程能够感知到中断
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }

            // 将线程未完成数减1
            int index = --count;
            // 如果此时剩余线程数为0,则说明所有的线程均已完成,即到达tripped状态
            if (index == 0) {
                boolean ranAction = false;
                try {
                    // 如果有预设完成后执行的方法,则执行
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    // 此时由于这一个轮回的线程已经全部完成,
                    // 所以调用nextGeneration方法开启一个新的轮回
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // 如果此时还有其他的线程未完成,则当前线程开启自旋模式
            for (;;) {
                try {
                    if (!timed)
                        // 如果timed为false,trip则阻塞住直到被唤醒
                        trip.await();
                    else if (nanos > 0L)
                        // 如果timed为true,则调用awaitNanos设定时间
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        Thread.currentThread().interrupt();
                    }
                }

                // 查看generation是否是broken,如果是broken的抛出异常
                if (g.broken)
                    throw new BrokenBarrierException();

                // 如果g != generation意味着generation
                // 已经被赋予了一个新的对象,这说明要么是所有线程已经完成任务开启下一个轮回,
                // 要么是已经失败了,然后开启的下一个轮回,无论是哪一种情况,都return
                if (g != generation)
                    return index;

                // 如果已经超时,则强制打断
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

看完这段核心代码之后我们回头再来反思Generation的意义,我们已经可以大致的给出使用Generation的理由了:

不同于CountDownLatch的实现,CyclicBarrier采取了更加复杂的方式,原因便是因为内部涉及到了多线程之间的干预与通信,CountDownLatch不关心线程的实现与进程,他只是一个计数器,而CyclicBarrier则需要知道线程是否正常的完结,是否被中断,如果用其他的方式代价会比较高,因此,CyclicBarrier的作者通过静态内部类的方式将整个分代的状态共享于多个线程之间,保证每个线程能够获取到栅栏的状态以及能够将自身的状态更好的反馈回去。同时,这种方式便于重置,也使得CyclicBarrier可以高效的重用。至于为什么broken没有用volatile修饰,因为类的方法内部全部都上了锁,所以不会出现数据不同步的问题。

总结

CountDownLatchCyclicBarrier从使用上来说可能会有一些相似之处,但是在我们看完源码之后我们会发现两者可以说是天差地别,实现原理,实现方式,应用场景均不相同,总结下来有以下几点:

  1. CountDownLatch实现直接依托于AQSCyclicBarrier则是借助了ReentrantLock以及Condition
  2. CountDownLatch是作为计数器存在的,因此采取了讨巧的设计,源码结构清晰并且简单,同样功能也较为简单;CyclicBarrier则为了实现多线程的掌控,采用了比较复杂的设计,在代码实现上也显得比较弯弯绕绕。
  3. 由于CyclicBarrier采用的实现方式,相比一次性的CountDownLatchCyclicBarrier可以多次重复使用
  4. 计数方式的不同:CountDownLatch采用累加计数, CyclicBarrier则使用倒数计数
查看原文

赞 0 收藏 0 评论 0

骑牛上青山 发布了文章 · 4月16日

JAVA concurrency -- 阻塞队列ArrayBlockingQueue源码详解

概述

ArrayBlockingQueue顾名思义,使用数组实现的阻塞队列。今天我们就来详细讲述下他的代码实现

阻塞队列

什么是阻塞队列?

阻塞队列是一种特殊的队列,使用场景为并发环境下。在某种情况下(当线程无法获取锁的时候)线程会被挂起并且在队列中等待,如果条件具备(锁被释放)那么就会唤醒挂起的线程。

通俗点来讲的话,阻塞队列类似于理发店的等待区,当没有理发师空闲的时候,客人会在等待区等待,一旦有了空闲,就会有人自动递补。

类的继承关系

ArrayBlockingQueue继承关系.png

ArrayBlockingQueue继承了抽象队列,并且实现了阻塞队列,因此它具备队列的所有基本特性。

基本实现原理

ArrayBlockingQueue的实现是基于ReentrantLock以及AQS内部实现的锁机制以及Condition机制。
ArrayBlockingQueue内部声明了两个Condition变量,一个叫notEmpty,一个叫notFull,当有数据加入队列时尝试唤醒notEmpty,当有数据移除队列时则唤醒notFull,从而实现一个类似于生产者消费者模型的机制。

源码分析

类成员变量

    // 队列的存储对象数组
    final Object[] items;

    // 下一个取出的序号
    int takeIndex;

    // 下一个放入队列的序号
    int putIndex;

    // 队列中的元素数目
    int count;

    // 锁以及用来控制队列的两个条件变量
    final ReentrantLock lock;

    private final Condition notEmpty;

    private final Condition notFull;

    transient Itrs itrs = null;

构造函数

    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }
    
    // 通用的构造函数,以容量和是否公平锁为参数,余下两个构造函数均调用此函数
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        // 调用构造函数
        this(capacity, fair);

        // 为阻塞队列初始化数据(此操作需要上锁)
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int i = 0;
            try {
                // 将集合中的数据存放到数组中并且进行判空操作
                for (E e : c) {
                    checkNotNull(e);
                    items[i++] = e;
                }
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
            // 修改count和putIndex的值
            count = i;
            putIndex = (i == capacity) ? 0 : i;
        } finally {
            lock.unlock();
        }
    }

这里有一点疑问,这里明明是构造函数,是类初始化的地方,照理来说不会产生竞争,为什么要进行加锁操作呢?此处原本有一句原版的注释 Lock only for visibility, not mutual exclusion 锁是为了可见性而不是互斥。这句话怎么理解呢?我们仔细观察代码,发现当我们把集合中的数据全部插入队列中之后,我们会修改相应的count以及putIndex的数值,但是如果我们没有加锁,那么在集合插入完成前count以及putIndex没有完成初始化操作的时候如果有其他线程进行了插入等操作的话,会造成数据同步问题从而使得数据不准确,因此这里的锁是必要的。

队列操作

基础队列操作enqueue和dequeue

    // 队列的插入操作
    private void enqueue(E x) {
        // 本地声明一个item数组的引用
        final Object[] items = this.items;
        // 将元素放入数组中
        items[putIndex] = x;
        // 如果此时已经到了数组的末尾了,将putIndex重置为0
        if (++putIndex == items.length)
            putIndex = 0;
        // 元素数目加1
        count++;
        // 发出通知告诉所有取数据的线程可以取数据
        notEmpty.signal();
    }

    // 队列的移除操作
    private E dequeue() {
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        // 找到要移除的数据置空
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        // 如果此时已经到了数组的末尾了,将takeIndex重置为0
        if (++takeIndex == items.length)
            takeIndex = 0;
        // 元素数目减1
        count--;
        // 迭代器操作,这个之后再说
        if (itrs != null)
            itrs.elementDequeued();
        // 发出通知告知插入线程可以工作
        notFull.signal();
        return x;
    }

这两个方法是队列操作的基本方法,基本上就是常规的数组数据插入移除,只是有一点很让人困惑 final Object[] items = this.items; 这段代码实现将类成员对象在本地创建了一个引用,然后在本地使用引用进行操作,为什么要多此一举呢?除此之外,代码中大量用到了这种手法,例如: final ReentrantLock lock = this.lock; 这又是为了什么呢?对此笔者猜测可能是和优化相关,因为jdk7中的实现与之不同,是使用的类变量直接操作。在进行了资料查阅后,笔者找到了一个相对靠谱的解释:

这是ArrayBlockingQueue的作者Doug Lea的习惯,他认为这种书写习惯是对机器更加友好的书写

当然也有一些大神有一些其他的解释:

final本身是不可变的,但是由于反射以及序列化操作的存在,final的不可变性就变得捉摸不定,除此之外一些编译器层面上在final上优化的不够好,导致会在使用到数据的时候反复重载导致缓存失效

希望大家可以自己认真思考下,然后尝试下,得到自己的结论。

阻塞队列的插入操作

    public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 如果阻塞队列已满,那么插入失败
            if (count == items.length)
                return false;
            else {
                // 否则插入成功
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

    public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }

阻塞队列插入操作大致就以上几种,这几种的区别在代码中也体现得比较清楚了:

  1. offer返回的是布尔值,插入成功返回true否则(队列已满)返回false
  2. put没有返回值,假如队列是满的,他会一直阻塞直到队列为空的时候执行插入操作
  3. add实际上调用的就是offer,只是他在加入失败后会抛出异常

阻塞队列的移除操作

    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : dequeue();
        } finally {
            lock.unlock();
        }
    }

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return itemAt(takeIndex);
        } finally {
            lock.unlock();
        }
    }
  1. poll执行成功会返回队列元素,如果队列为空则直接返回null
  2. take执行成功会返回队列元素,但是如果队列为空他不会返回而是等待有数据插入,然后取出
  3. peek则是直接获取队列元素,并且执行后不会将元素从队列中删除

迭代器实现

由于迭代器和内部队列共享数据,再加上阻塞队列的特性,导致为了实现迭代器功能,需要新增一些很复杂的代码实现。

内部声明了两个类来实现迭代器,一个是Itr继承Iterator<E>,一个则是Itrs

Itrs

Itrs是用来管理迭代器的。由于阻塞队列内部可能会有多个迭代器在同时工作,在迭代器内部发生删除或者是一些不常见的操作时可能会产生一些问题,比如他们会丢失自己的数据之类的。所以Itrs内部会维护一个变量用于记录循环的圈数,并且在删除操作removeAt的时候会通知所有的迭代器。

    class Itrs {
        // 创建一个Node类作为单向链表(节点是弱引用)来管理迭代器
        private class Node extends WeakReference<Itr> {
            Node next;

            Node(Itr iterator, Node next) {
                super(iterator);
                this.next = next;
            }
        }

        // 循环圈数
        int cycles = 0;

        // 链表头
        private Node head;

        // 清理相关的变量
        private Node sweeper = null;

        private static final int SHORT_SWEEP_PROBES = 4;
        private static final int LONG_SWEEP_PROBES = 16;

        Itrs(Itr initial) {
            register(initial);
        }

        // 清理无效的迭代器(如果sweeper为空,则从头开始,否则从sweeper记录的节点开始)
        void doSomeSweeping(boolean tryHarder) {
            
        }

        // 新增加一个迭代器
        void register(Itr itr) {
            head = new Node(itr, head);
        }

        // 当takeIndex为0时调用此方法
        void takeIndexWrapped() {
            // cycle数+1,内部实现通知所有迭代器并进行清理(链表遍历)
        }

        // 有移除操作的时候调用此方法,并通知所有迭代器进行清理
        void removedAt(int removedIndex) {
            // 简单的链表遍历,内部调用Itr的removedAt方法
        }

        // 当发现队列为空的时候调用此方法,清理迭代器内的弱引用
        void queueIsEmpty() {
            
        }

        // 有元素被取时是调用
        void elementDequeued() {
            // 如果数组为空调用queueIsEmpty进行清理
            if (count == 0)
                queueIsEmpty();
            // 如果takeIndex为0,调用takeIndexWrapped,来进行循环+1操作
            else if (takeIndex == 0)
                takeIndexWrapped();
        }
    }

Itr

Itrs是管理迭代器的,Itr则是迭代器的具体实现

    private class Itr implements Iterator<E> {
        // 游标,用于寻找下一个元素
        private int cursor;

        // 下一个元素
        private E nextItem;

        // 下一个元素的下标
        private int nextIndex;

        // 上一个元素
        private E lastItem;

        // 上一个元素的下标
        private int lastRet;

        // 上一个take的下标
        private int prevTakeIndex;

        // 上一个循环
        private int prevCycles;

        // 标记为空
        private static final int NONE = -1;

        // 删除标记
        private static final int REMOVED = -2;

        // DETACH标记专用于prevTakeIndex
        private static final int DETACHED = -3;

        Itr() {
            // 这是构造函数,内部实现主要是初始化为主,
            // 并且在Itrs不为空的时候进行一波清理操作
        }

        boolean isDetached() {
            return prevTakeIndex < 0;
        }

        private int incCursor(int index) {
            // 游标+1,并重新计算值(判断是否走完一个循环,是否等于putIndex)
            if (++index == items.length)
                index = 0;
            if (index == putIndex)
                index = NONE;
            return index;
        }

        // 判断给的删除数是否是有效值
        private boolean invalidated(int index, int prevTakeIndex,
                                    long dequeues, int length) {
            
        }

        // 计算在迭代器的上一次操作后所有的删除(出队)操作
        private void incorporateDequeues() {
            // 主要方法为通过当前圈数和之前的圈数以及偏移量计算
            // 真实的删除数,并且和prevTakeIndex以及index的偏移量进行比较
        }

        // 进行detach操作并进行清理
        private void detach() {
            
        }

        // 判断是否有下一个节点
        public boolean hasNext() {
            
        }

        // 没有下一个节点(没有detach的节点将会被执行detach操作)
        private void noNext() {
            
        }

        // 找到下个节点
        public E next() {
            // 实现不复杂,主要是需要判断节点是否是detach模式
        }

        // 删除节点
        public void remove() {
            
        }

        // 当队列为空或者后续很难找到下个节点的时候通知迭代器
        void shutdown() {
            
        }

        // 辅助计算游标和prevTakeIndex之间的距离
        private int distance(int index, int prevTakeIndex, int length) {
            
        }

        // 删除节点
        boolean removedAt(int removedIndex) {
            
        }

        // 当takeIndex归0时调用
        boolean takeIndexWrapped() {
            
        }
    }

总结

ArrayBlockingQueue的实现可以说是比较的简单清晰,主要是利用了ReentrantLock内部的Condition,通过设置两个条件来巧妙地完成阻塞队列的实现,只要能够理解这两个条件的工作原理,源码的理解就没有太大的难度。ArrayBlockingQueue较难理解的反而是它内部的迭代器,由于阻塞队列的特性,他的迭代器可能会有丢失当前数据的风险,因此,作者创作的时候加入了许多复杂的方法来保证可靠性,但是在这里由于篇幅限制,以及迭代器在阻塞队列中的地位和重要性并不高,所以简单讲述,如果有兴趣可以自己找一份源码阅读。

查看原文

赞 0 收藏 0 评论 0

认证与成就

  • 获得 39 次点赞
  • 获得 25 枚徽章 获得 1 枚金徽章, 获得 6 枚银徽章, 获得 18 枚铜徽章

擅长技能
编辑

(゚∀゚ )
暂时没有

开源项目 & 著作
编辑

(゚∀゚ )
暂时没有

注册于 2015-07-13
个人主页被 735 人浏览