神易风

神易风 查看完整档案

广州编辑  |  填写毕业院校  |  填写所在公司/组织填写个人主网站
编辑

<script>alert("hello world")</script>

个人动态

神易风 发布了文章 · 10月18日

Spring 配置文件字段注入到List、Map

今天给大家分享冷门但是有很实小知识,Spring 配置文件注入list、map、字节流。

list 注入

properties文件

user.id=3242,2323,1

使用spring el表达式

@Value("#{'${user.id}'.split(',')}")
private List<Integer> list;

yaml 文件

在yml配置文件配置数组方式

number:
  arrays: 
    - One
    - Two
    - Three
@Value("${number.arrays}")
private List list

虽然网上都说,这样可以注入,我亲身实践过了,肯定是不能的。会抛出 Caused by: java.lang.IllegalArgumentException: Could not resolve placeholder 'number.arrays' in value "${number.arrays}"异常。要想注入必须要使用@ConfigurationProperties

@ConfigurationProperties(prefix = "number")
public class AgentController {

    private List arrays;
    public List getArrays() {
        return arrays;
    }

    public void setArrays(List arrays) {
        this.arrays = arrays;
    }
    @GetMapping("/s")
    public List lists(){
        return arrays;
    }

不是想这么麻烦,可以像properties文件写法,使用el表达式即可

number:
  arrays: One,Two,Three
@Value("#{'${number.arrays}'.split(',')}")
private List arrays;

注入文件流

    @Value("classpath: application.yml")
    private Resource resource;
   
    //  占位符
    @Value("${file.name}")
    private Resource resource2;

    @GetMapping("/s")
    public String lists() throws IOException {
        return IOUtils.toString(resource.getInputStream(),"UTF-8");
    }

从类路径加载application.yml文件将文件注入到org.springframework.core.io.Resource ,可以使用getInputStream()方法获取流。比起使用类加载器获取路径再去加载文件的方式,优雅、简单不少。

Map Key Value 注入

properties

resource.code.mapper={x86:"hostIp"}

@Value("#{${resource.code.mapper}}")

private Map<String, String> mapper;

成功注入

yaml

在yaml文件中,使用@Value不能注入Map 实例的,要借助@ConfigurationProperties 才能实现。

@ConfigurationProperties(prefix = "blog")
public class AgentController {

    private Map website;

    public Map getWebsite() {
        return website;
    }

 public void setWebsite(Map website) {
        this.website = website;
    }

    @GetMapping("/s")
    public String lists() throws IOException {
        return JsonUtil.toJsonString(website);
    }

配置文件

blog:
  website:
    juejin: https://juejin.im
    jianshu: https://jianshu.com
    sifou: https://segmentfault.com/

可以看出@ConfigurationProperties注入功能远比@Value强,不仅能注入List、Map这些,还能注入对象属性,静态内部类属性,这个在Spring Boot Redis模块 org.springframework.boot.autoconfigure.data.redis.RedisProperties体现出来。

区别

区别@ConfigurationProperties@Value
类型各种复制类型属性Map、内部类只支持简单属性
spEl表达式不支持支持
JSR303数据校验支持不支持
功能一个列属性批量注入单属性注入
查看原文

赞 2 收藏 2 评论 0

神易风 发布了文章 · 7月30日

Spring事务传播行为

本文出处Spring 事务传播行为
转载请说明出处

在Spring @Transactional 声明式事务有一个项属性propagation事务传播行为,是一个Propagation 枚举类,有7种类型,对应不同使用场景。下面说下这些枚举代码含义,在结合代码加深理解,巩固学习。

Spring 属性说明
REQUIRED支持当前事务,如果不存在事务则创建一个新事务。这个propagation默认属性
SUPPORTS支持当前事务,如果不存在事务则按照无事务执行
MANDATORY支持当前事务,如果不存事务在则抛出异常
REQUIRES_NEW创建一个新的事务,如果存在当前事务,则将它挂起
NOT_SUPPORTED无事务执行,如果存在事务将它挂起
NEVER无事务执行,如果存在当前事务则抛出异常
NESTED如果当前事务存在,则在嵌套事务中执行,如果不存在事务就像REQUIRED效果

事务传播级别失效

先做一个小实验,写一个持久化的save方法,设置成禁用事务,用事务方法去调用它,看会发生什么效果。

    @Transactional(propagation = Propagation.NEVER)
    public void save(Users users){
        usersRepository.save(users);
    }

    @Transactional
    public void multSave(){
        Users users = getUsers();
        save(users);
    }

按照上面字段含义解析,save方法不支持事务,在当前事务运行会抛出异常。可是执行结果不是预料那样,没有抛出异常,数据写入到数据库了。并且我们可以在改一次save方法,直接抛出一个RuntimeTime异常,你会发现结果更加明显。

    @Transactional(propagation = Propagation.NEVER)
    public void save(Users users){
        usersRepository.save(users);
        throw new RuntimeException("4322");
    }

    @Transactional
    public void multSave(){
        Users users = getUsers();
        save(users);
    }

你会发现save 方法 数据回滚了。
amBce1.jpg
在做多几次这种实验你就会发现,在同一个类中,最外层事务注解属性会覆盖方法内所有事务注解,比如你的外层设置默认事务传播行为,无论调用那种行为都会按照默认属性,外层没有设置事务,调用方法有事务也不会生效的。这个不难理解,实现Spring 事务一个基于AOP的代理类,它根据目标方法注解来增强方法,这个是运行时动态执行的,方法内其他方法没有代理类的功能增强,就相当于普通方法效果了。千万不要在同一个类里面测试事务传播行为,否则你永远都都得不到结论的。

进入方法验证

默认事务

    @Transactional
    public void save(){
        Employee employee = new Employee();
        employee.setDepartmentId(2);
        employee.setName("Jorry");
        employee.setSalary(60000);
        repository.save(employee);
        throw new RuntimeException("33333");
    }

    @Transactional
    public void multSave(){
        Users users = getUsers();
        save(users);
        employeeService.save();

    }

执行结果: users、employee写入失败,employee的方法抛出异常触发回滚,也会导致外层方法回滚。在同一个事务中,只要触发回滚,事务内所有数据操作都会回滚的。

支持事务

    @Transactional(propagation = Propagation.SUPPORTS)
    public void save(){
        Elimployee employee = new Employee();
        employee.setDepartmentId(2);
        employee.setName("Jorry");
        employee.setSalary(60000);
        repository.save(employee);
    }

    @Transactional
    public void multSave(){
        Users users = getUsers();
        save(users);
        employeeService.save();
        throw new RuntimeException("3333");
    }

执行效果: Usres和Elimployee 都进行回滚了。SUPPORTS会加入调用者事务中,和调用者事务是同一个事务,如果在事务中出现异常全部回滚。即使在employeeService.save() 抛出异常,效果一样的。

开启新事务

    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void save(){
        Employee employee = new Employee();
        employee.setDepartmentId(2);
        employee.setName("Jorry");
        employee.setSalary(60000);
        repository.save(employee);
        throw new RuntimeException("3333");
    }

    @Transactional
    public void multSave(){
        Users users = getUsers();
        save(users);
        try {
            employeeService.save();
        }catch (RuntimeException e){

        }

执行结果: empoyee回滚,users写入数据库成功。要处理employeeService.save()异常,不然异常会导致multSave出现异常事务回滚了。加入一个对照,在把employeeService.save()改成默认属性,发现users,employee 都会回滚了,还会抛出一个异常

org.springframework.transaction.UnexpectedRollbackException: Transaction silently rolled back because it has been marked as rollback-only

Spring官方文档说到如果内部事务(外部调用者不知道)将事务无提示地标记为仅回滚,则外部调用者仍会调用commit。外部调用者需要接收一个,UnexpectedRollbackException以清楚地指示已执行回滚。

强制不使用事务

    @Transactional(propagation = Propagation.NEVER)
    public void save(){
        Employee employee = new Employee();
        employee.setDepartmentId(2);
        employee.setName("Jorry");
        employee.setSalary(60000);
        repository.save(employee);
    }

    @Transactional
    public void multSave(){
        Users users = getUsers();
        save(users);
        employeeService.save();

    }

执行结果: 两个表都写入失败,employeeService.save() 抛出异常

org.springframework.transaction.IllegalTransactionStateException: Existing transaction found for transaction marked with propagation 'never'

不支持事务

    @Transactional(propagation = Propagation.NOT_SUPPORTED)
    public void save(){
        Employee employee = new Employee();
        employee.setDepartmentId(2);
        employee.setName("Jorry");
        employee.setSalary(60000);
        repository.save(employee);
        throw new RuntimeException("33333");
    }

    @Transactional
    public void multSave(){
        Users users = getUsers();
        save(users);
        employeeService.save();
    }

执行结果:employee成功写入数据库,employee 将事务挂了,用无事务方法执行,users受到异常抛出,触发事务回滚。如果你把employeeService.save()处理一下,两种表操作都写入成功了。

强制事务

    @Transactional(propagation = Propagation.MANDATORY)
    public void save(){
        Employee employee = new Employee();
        employee.setDepartmentId(2);
        employee.setName("Jorry");
        employee.setSalary(60000);
        repository.save(employee);
    }

    public void multSave(){
        Users users = getUsers();
        save(users);
        employeeService.save();
    }

执行结果: users 成功写入数据库,employee写入失败并且抛出异常

org.springframework.transaction.IllegalTransactionStateException: No existing transaction found for transaction marked with propagation 'mandatory'

嵌套事务

    @Transactional
    public void multSave(){
        Users users = getUsers();
        save(users);
        employeeService.save();
    }

    @Transactional(propagation = Propagation.NESTED)
    public void save(){
        Employee employee = new Employee();
        employee.setDepartmentId(2);
        employee.setName("Jorry");
        employee.setSalary(60000);
        repository.save(employee);
    }

执行结果: employeeService.save()抛出一个异常,好像JPA的实现不支持嵌套执行,这个看不到效果了。

org.springframework.transaction.NestedTransactionNotSupportedException: JpaDialect does not support savepoints - check your JPA provider's capabilities
查看原文

赞 2 收藏 1 评论 2

神易风 赞了文章 · 7月23日

必看!java后端,亮剑诛仙(最全知识点)

原创:小姐姐味道(微信公众号ID:xjjdog),欢迎分享,转载请保留出处。

你可能有所感悟。零散的资料读了很多,但是很难有提升。到处是干货,但是并没什么用,简单来说就是缺乏系统化。另外,噪音太多,雷同的框架一大把,我不至于全都要去学了吧。

这里,我大体根据基础、Java基础、Java进阶给分了下类,挑的也都是最常用最重要的工具。

这篇文章耗费了我大量的精力,你要是觉得好,请不要吝啬你的赞。如果你认同,可以关注我的微信公众号xjjdog,里面讲的就是这些内容。我会尝试更加系统化。

最新的内容会在github持续更新,添加新的精选相关文章。地址:

https://github.com/sayhiai/javaok

基础知识

数据结构

基本的数据结构是非常重要的,无论接触什么编程语言,这些基本数据结构都是首先要掌握的。具体的实现,就体现在java的集合类中。这些数据结构,就是这些复杂工具的具体原始形态,要烂记于心。

培训机构一般没有时间普及基础知识,通过算法和数据结构,“通常”能够一眼看出是否是经过培训。

常用算法

算法是某些大厂的门槛。毫无疑问,某些参加过ACM的应届生,能够秒杀大多数工作多年的码农。算法能够培养逻辑思维能力和动手能力,在刚参加工作的前几年,是非常大的加分项。但随着工作年限的增加,它的比重在能力体系中的比重,会慢慢降低。

算法的学习方式就是通过不断的练习与重复。不精此道的同学,永远不要试图解决一个没见过的问题。一些问题的最优解,可能耗费了某个博士毕生的精力,你需要的就是理解记忆以及举一反三。最快的进阶途径就是刷leetcode。

对于普通研发,排序算法和时间复杂度是必须要掌握的,也是工作和面试中最常用的。时间充裕,也可涉猎动态规划、背包等较高阶的算法知识,就是下图的左列。

书籍

《算法导论》
《编程之美》
《数学之美》

数据库基础 MySQL

MySQL是应用最广的关系型数据库。除了了解基本的使用和建模,一些稍底层的知识也是必要的。

MySQL有存储引擎的区别。InnoDB和MyISAM是最常用的,优缺点应该明晓。ACID是关系型数据库的基本属性,需要了解背后的事务隔离级别。脏读、幻读问题的产生原因也要了解。

为了加快查询速度,索引是数据库中非常重要的一个结构,B+树是最常用的索引结构。因字符集的问题,乱码问题也是经常被提及的。

专业的DBA通常能帮你解决一些规范和性能问题,但并不总是有DBA,很多事情需要后端自己动手。

书籍

《MySQL技术内幕——InnoDB存储引擎》
《高性能MySQL》
《高可用MySQL》

网络基础

网络通信是互联网时代最有魅力的一个特点,可以说我们的工作和生活,每时每刻都在和它打交道。

连接的三次握手和四次挥手,至今还有很多人非常模糊。造成的后果就是对网络连接处于的状态不慎了解,程序在性能和健壮性上大打折扣。

HTTP是使用最广泛的协议,通常都会要求对其有较深入的了解。对于Java来说,熟悉Netty开发是入门网络开发的捷径。

爬虫是网络开发中另外一个极具魅力的点,但建议使用python而不是java去做。

书籍

《HTTP权威指南》
《TCP/IP详解 卷一》

操作系统 Linux

科班出身的都学过《计算机组成机构》这门课,这非常重要,但很枯燥。结合Linux理解会直观的多。鉴于目前大多数服务器环境都是Linux,提前接触能够相辅相成。

需要搞清楚CPU、内存、网络、I/O设备之间的交互和速度差别。对于计算密集型应用,就需要关注程序执行的效率;对于I/O密集型,要关注进程(线程)之间的切换以及I/O设备的优化以及调度。这部分知识是开发一些高性能高可靠中间件的前提,无法绕过。

对于Linux,首先应该掌握的就是日常运维,包括常用命令的使用和软件安装配置。正则也是必须要掌握的一个知识点。

脚本编程对后端来说是一个非常大的加分项。它不仅能增加开发效率,也能在一些突发问题上使你游刃有余。

书籍

《UNIX环境高级编程(第3版)》
《鸟哥的Linux私房菜》
《Linux内核设计与实现》
《Linux命令行大全》

相关文章

《Linux上,最常用的一批命令解析(10年精选)》

Java基础

JVM

Java程序员的最爱和噩梦。以oracle版本为准,各个jvm版本之间有差别。JVM的知识包含两方面。一个是存储级别的,一个是执行级别的。

以存储为例,又分为堆内的和堆外的两种,各有千秋。垃圾回收器就是针对堆内内存设计的,目前最常用的有CMS和G1。JVM有非常丰富的配置参数来控制这个过程。在字节码层面,会有锁升级以及内存屏障一类的知识,并通过JIT编译来增加执行速度。

JVM还有一个内存模型JMM,用来协调多线程的并发访问。JVM的spec非常庞大,但面试经常提及。

另外,jdk还提供了一系列工具来窥探这些信息。包含jstat,jmap,jstack,jvisualvm等,都是最常用的。

书籍

《深入理解Java虚拟机》

JDK

现在,终于到了java程序员的核心了:JDK,一套依据jvm规范实现的一套API。我们平常的工作,就是组合这些API,来控制程序的行为。

jdk的代码非常庞大,内容也非常繁杂。最重要的大体包括:集合、多线程、NIO、反射、文件操作、Lambda语法等。这部分内容加上下面的SSM,基本上就是大多数小伙伴玩耍的地方。

假如说数据结构和算法是理论,这里就是支撑理论的实现。Java玩的好不好,就是说这里。

书籍

《Effective Java 中文版》
《数据结构与算法分析:Java语言描述》

SSM

你可能会用SSM开发项目,觉得编程无非就这些东西。设计模式烂记于心,IOC、AOP手到擒来。这里集中了大部分同行,有些可能到此为止就Ok了,因为有些同学接下来的重点是项目管理,而不是技术。

SSM最擅长的是Web开发。目前的表现形式逐渐多样化,随着前后端分离的盛行,Restful这种有着明确语义的模式逐渐流行。

书籍

《Head First 设计模式》
《Spring揭秘》
《SpringBoot揭秘》
《MyBatis技术内幕》
《深入剖析Tomcat》

其实跟着文档走一遍就行了,很多书籍就是翻译而已。

并发编程

现在的服务器都是多核的了,并发编程也来越多。java有多种创建多线程的方式,不过目前使用线程池的多一些。线程池的基础就是AQS,基于AQS,又有很多的工具类扩展。

java同时有很多加锁和线程同步的方式,锁有乐观锁/悲观锁之分,又有公平锁/非公平锁之分,写一段死锁代码还是有点难度的。

有两个问题被考察的频率非常高,一个是ABA,一个是伪共享。并发编程一般和网络编程配对,提供对某个问题的一系列解决方案。

这是java中一块难啃的骨头。

书籍

《Java核心技术系列:Java多线程编程核心技术》
《Java性能权威指南》
《Java并发编程实战》

性能优化 & 故障排查

有人认为这应该是SRE的范畴,但通常最熟悉业务的却是开发,技术并没有什么明显的界限。掌握这些内容,会让你在芸芸大众中脱颖而出。

从操作系统的内核优化到数据库的索引和事务优化,这部分的技能是建立在牢固的基础之上的。也就是操作系统的基础。

操作系统的每个组件都有可能出现问题,对于一个java后端来说,要能够非常容易的定位到这些问题。比如常见的内存溢出问题。

书籍

《性能之巅:洞悉系统、企业与云计算》
《高性能Linux服务器构建实战》

Java进阶

下面有些知识点,界限是非常模糊的。它们你中有我,我中有你,可以说是一个整体。

Redis

缓存可以说是计算机系统中应用最广泛的技术了。对于分布式缓存来说,最常用的就是Redis了。由于其数据结构丰富,被应用的场景越来越多。

基本的5种数据类型都知道,但你要说出其他几种,给人的印象就不一样了。Redis有主从和Cluster两种集群模式,高可用配置也不相同。

Redis几乎能适应除搜索外的所有互联网业务,对于其使用来说,一些规范限制是非常有必要的。一般速度越快的系统,越容易被长尾操作给拖死。所以,对于info命令的内容,也应有了解。

有三个点要尤其注意:分布式锁、限流,以及和源数据的同步问题。

书籍

《Redis实战 》
《Redis开发与运维》
《Redis设计与实现》

相关文章

《这可能是最中肯的Redis规范了》

Kafka

MQ是分布式系统中非常重要的组件,目前使用最广泛的就是Kafka。除了用在大数据场景中,Kafka也能够在业务系统中使用。

Kafka的速度非常快,根据ACK的级别配置,可靠性会增加,但速度会减缓。对于消息系统来说,监控报警是非常重要的一环,能够提前预知系统的问题。Kafka的集群自身就是高可用的,依赖Zookeeper组件,了解一些基本概念,包括ISR,能够更加详细的了解这个过程。

书籍

《Kafka入门与实践》
《Kafka技术内幕》

相关文章

《Kafka基础知识索引》

分库分表 ShardingJDBC

随着数据的增长,MySQL本身出现了瓶颈。分库分表是针对关系型数据库的一套解决方案,把它改造成分布式数据库。

根据切分层次,最像回事的是在代理层和驱动层进行切入。ShardingJDBC就是在驱动层的一个组件。

组件本身只是一个问题。在真正的切分之前,会有垂直拆分和水平拆分之分。我们的线上业务也要不停机的进行拆分和切换,一个全量和增量同步工具都是需要的。

有条件经历这个过程的,都是一笔宝贵的财富。它不仅在技术上,而且在流程上都有诸多挑战。你会体验到技术、流程、管理,是不分家的。

相关文章

《“分库分表" ?选型和流程要慎重,否则会失控》

微服务 & 中间件

目前最火的微服务架构就是SpringCloud。这对熟悉SSM开发的同学来说, 是非常容易上手的。微服务有注册中心、RPC、负载均衡、熔断限流、网关等关键组件,有些组件有很多不同的替代品。

微服务拆分后又引申出一些列问题,需要一些其他中间件支持。比如监控报警、ELKB、配置中心、调度中心、调用链等。虽然没有微服务也需要它们,但明显组合起来,效果会好的多。
各种A/B测试,金丝雀,灰度等,基本是终极目标之一。
微服务是一个复杂的整体,同时融合了技术和流程管理方面的内容。

书籍

《可伸缩服务架构:框架与中间件》
《Spring Cloud与Docker微服务架构实战》
《架构修炼之道》

分布式

当服务器数量增加,一些服务,包括上面提到的微服务,都需要进行协调和交互。这就是分布式系统。

分布式的理论基础有CAP、BASE等。针对一致性,有特别多的算法,其中Raft作为易懂的新贵,使用越来越广泛。

这部分侧重于理论,一旦开始进入实践,写出来的都是些大家伙。这里有一篇文章,虽然不是很全,聊表心意吧。

相关文章

《也浅谈下分布式存储要点》

书籍

《NoSQL精粹》
《ZooKeeper:分布式过程协同技术详解》
《从Paxos到Zookeeper分布式一致性原理与实践》

支撑技术

基本运维

我倾向于基础架构和运维不分家,因为它们有太多重合和相似的地方。基本运维和架构配合起来,典型的特点就是平台化+规范化。

这里是检验综合素质的地方,有广度也有深度。

相关文章

《这么多监控组件,总有一款适合你》

书籍

《奔跑吧Ansible》
《Docker——容器与容器云》
《Kubernetes权威指南》
《Jenkins权威指南》
《深入理解Nginx》

安全

安全无小事,建筑工地和系统安全一样的道理。熟悉一些常用的攻击和加密解密算法是必要的。

就像是你给家里的门上锁:能够阻挡大部分心怀不轨的人,但无法阻挡无所顾忌的暴徒。

End

你可能发现并没有自己关注的组件。这不奇怪,比如个人喜欢的的ES,就找不到一个合适的位置。这里只是最主要的一点内容,就已显繁杂,一个大杂烩并不见得好。

值得提醒的是,这些知识,是众多发展路线上的一个分支。可能有的朋友,目前只在其中的一个点上面奋斗,缺乏所谓的广度;也可能有的朋友,有着全栈的标签,却做着SSM的工作。不同的公司需要的技术水平不尽相同。一个专注ERP业务的公司,会在项目管理上多些文章;一个专做IM的团队,可能对网络开发滚瓜烂熟。

再次强调。此技术要点为个人整理,为了修复认知上的偏差,我会维护一个github项目,实时跟进分类和增加新的相关文章(欢迎提交PR)。如果你有什么想法,请尽快反馈给我,非常感谢。

作者简介:小姐姐味道 (xjjdog),一个不允许程序员走弯路的公众号。聚焦基础架构和Linux。十年架构,日百亿流量,与你探讨高并发世界,给你不一样的味道。我的个人微信xjjdog0,欢迎添加好友,​进一步交流。​
近期热门文章​

《必看!java后端,亮剑诛仙》
后端技术索引,中肯火爆

《Linux上,最常用的一批命令解析(10年精选)》
CSDN发布首日,1k赞。点赞率1/8。

《这次要是讲不明白Spring Cloud核心组件,那我就白编这故事了》
用故事讲解核心组件,包你满意

《Linux生产环境上,最常用的一套“Sed“技巧》
最常用系列Sed篇,简单易懂。Vim篇更加易懂。

查看原文

赞 13 收藏 10 评论 0

神易风 发布了文章 · 7月22日

MySQL排名函数实现

本文出处MySQL排名函数实现
转载请说明出处

现在有个需求对所有学生分数进行排名,并且列出名次。刚看到这个需求,我有点懵逼,完全没有思路😂,为什么难一点需求,我就不会做呢😔 去网上查询资料,把所有实现都列出来,全部都要学会。

数据库准备

创建一个分数表s_score

CREATE TABLE `s_score`  (
  `id` int NOT NULL AUTO_INCREMENT,
  `score` int NOT NULL DEFAULT 0,
  `name` varchar(20) CHARACTER SET utf8mb4 NULL,
  PRIMARY KEY (`id`)
);

插入数据

INSERT INTO `s_score` (`name`, `score`) VALUES
('张三', 80),
('小明', 90),
('小红', 60),
('李四', 70),
('赵武', 80),
('梁晨', 87),
('小绿', 69),
('威廉', 69),
('大卫', 91),
('王五', 96),
('赵六', 96),
('小五', 80),
('小龙', 88);

普通实现

在MySQL8.0推出Rank排名函数RANK,完全支持这种需求,但是必须MySQL8.0 以上版本才支持这个特性。8.0以下的版本有什么方法实现呢,使用用户变量,记录名次。
用户变量:以"@"开始,形式为"@var_name",以区分用户变量及列名。它可以是任何随机的,复合的标量表达式,只要其中没有列指定。下面写一个小例子,展示如何使用用户变量

select @a:=1 a,@b:=@a+1 b

执行结果

ab
12

:= 是赋值的意思,与编程语言赋值有点区别。下面开始展示使用简单SQL实现RANK排名函数效果

用户变量简单实现名次显示

SELECT name,score, @rank:=@rank+1 `rank` from s_score s,(select @rank:=0) q ORDER BY score desc
namescorerank
赵六961
王五962
大卫913
小明904
小龙885
梁晨876
小五807
张三808
赵武809
李四7010
威廉6911
小绿6912
小红6013

并排名次展示

现在还有一个问题,出现分数相同,并列排名,名次应该相同。我们使用一个temp变量来记录前一个分数值,判断前面分数是否与当前相等,相等直接返回上一个排名情况,否则排名+1。

select name,score,case when @temp_score=score then @rank when @temp_score:=score then @rank:=@rank+1 END 
   `rank` from s_score s,(select @rank:=0,@temp_score:=NULL) q ORDER BY score desc
namescorerank
赵六961
王五961
大卫912
小明903
小龙884
梁晨875
小五806
张三806
赵武806
李四707
威廉698
小绿698
小红609

并排名次跳过

如果出现并列排名,下一个名次将自动跳过,比如出现两个并列第一,91应该变成第三名了,名次和人数相对应。

SELECT name,score,rank from (
SELECT name ,score,@rank :=IF( @temp_score = score, @rank, 
@rank_incr ) `rank`,@rank_incr := @rank_incr + 1,
    @temp_score := score FROM s_score s,(SELECT@rank := 
    0,@temp_rank := NULL,@rank_incr := 1 ) q ORDER BY score 
    DESC) a
namescorerank
赵六961
王五961
大卫913
小明904
小龙885
梁晨876
小五807
张三807
赵武807
李四7010
威廉6911
小绿6911
小红6013

使用SQL窗口函数

窗口函数的基本语法如下:

select 排序函数/聚合函数 over (<partition by ...> 分区字段 order by 排序字段)

注意over 后面有一个空格的,这个语法有点蛋疼,我自己试了十几次才书写成功。
根据维基百科解释:窗口函数允许在当前记录之前和之后访问记录中的数据。窗口函数定义一或一列窗口,其中当前行周围具有给定的长度,并跨窗口中的数据集执行计算。可以这样理解,窗口就是数据集合,函数就是计算数据方法。

partiton by是可选的。如果不使用partition by,那么就是将整张表作为一个集合,最后使用排序函数得到的就是每一条记录根据排序列的排序编号。
排序函数主要有rank()、dense_rank、row_number,他们主要区别:

  • rank(): 对同一个字段排序,出现相同时,会并列排名,并且会出现排名间隙。
  • dense_rank() : 对同一个字段排序,出现相同时,会出现并列排名,排名连续的
  • row_number(): 对同一个字段排序,排名是联系的,即使出现相同,不会并列排名次
    select name,score, RANK() over (ORDER BY score DESC) `rank`,ROW_NUMBER() over (order by score DESC) `row`,
    DENSE_RANK()over (ORDER BY score DESC) `dense` from s_score
    
namescorerankrowdense
赵六96111
王五96121
大卫91332
小明90443
小龙88554
梁晨87665
赵武80776
小五80786
张三80796
李四7010107
小绿6911118
威廉6911128
小红6013139

以上就是排序名次全部实现方式了,还有其他实现方式,麻烦在评论里补充一下。

参考文档

https://cloud.tencent.com/developer/article/1562954
https://www.jianshu.com/p/bb1b72a1623e

查看原文

赞 10 收藏 5 评论 8

神易风 发布了文章 · 6月16日

AbstractQueuedSynchronizer原理解析

本文出处AbstractQueuedSynchronizer原理解析
转载请说明

AbstractQueuedSynchronizer简称AQS是Java大部分Lock、Semaphore、CountDownLatch等公共依赖框架,实现依赖于先进先出(FIFO)等待队列的阻塞锁。读懂它的代码原理有利我们去理解Java Lock衍生类原理,帮组我们开发自定义Lock。

主要原理

线程队列.png
由上图所示,在队列内的元素都为执行线程,在队列头部head就是获取到独占锁的执行线程,其他线程都在队列中排队沉睡中,想要获取锁的线程会从tail中加入队列。当head释放锁了,会将next线程唤醒,去获取锁,成功获取后再将head指向next线程。这里主要简单说一下基本原理,具体怎么操作我们一起进入代码讲解吧。

Node内部类属性

    static final class Node {
        /** 标记当前节点共享锁标记 */
        static final Node SHARED = new Node();
        /** 标记当前节点独占锁标记 */
        static final Node EXCLUSIVE = null;

        /** 中断或者超时退出锁竞争   */
        static final int CANCELLED =  1;
        /**  锁即将被释放,这时需要唤醒下一个获取线程同时将head节点指向下一个节点 */
        static final int SIGNAL    = -1;
        /** 同步队列condition     */
        static final int CONDITION = -2;
        //共享锁
        static final int PROPAGATE = -3;

        //线程等待状态 分别对应上面4种状态
        volatile int waitStatus;
        //前置结点引用
        volatile Node prev;
        //后置结点引用
        volatile Node next;

        volatile Thread thread;
        //下一个需要唤醒结点
        Node nextWaiter;

        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        Node() {}

        /** Constructor used by addWaiter. */
        Node(Node nextWaiter) {
            this.nextWaiter = nextWaiter;
            THREAD.set(this, Thread.currentThread());
        }

        /** Constructor used by addConditionWaiter. */
        Node(int waitStatus) {
            WAITSTATUS.set(this, waitStatus);
            THREAD.set(this, Thread.currentThread());
        }

AbstractQueuedSynchronizer 内部属性


    /**
     * 队列头
     */
    private transient volatile Node head;

    /**
     * 队列尾
     */
    private transient volatile Node tail;

    /**
     * 同步状态,根据设置这个值来获取锁,默认线程都是从0开始
     */
    private volatile int state;

    protected final int getState() {
        return state;
    }

    protected final void setState(int newState) {
        state = newState;
    }

    protected final boolean compareAndSetState(int expect, int update) {
        return STATE.compareAndSet(this, expect, update);
    }

进入ReentrantLock,获取锁的时候,调用AQS那个方法

    public void lock() {
        sync.acquire(1);
    }

Sync是ReentrantLock内部类,继承自AbstractQueuedSynchronizer,用于实现公平锁和非公平锁。

acquire

    public final void acquire(int arg) {
         //尝试去获取锁
        if (!tryAcquire(arg) && 
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))  //获取锁失败先初始化队列再排队
            selfInterrupt(); //设置当前线程中断
    }

主要流程先获取锁,如果失败了进入队列中排队。

acquireQueued

    final boolean acquireQueued(final Node node, int arg) {
        boolean interrupted = false;
        try {
            for (;;) { //自旋 
                final Node p = node.predecessor(); //前置结点
                if (p == head && tryAcquire(arg)) {//前置节点为head,下一个即将获得锁 尝试去获取锁
                    setHead(node);  //获取成功了,将head 指向node
                    p.next = null; // help GC  
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node))  //判断前置节点waitStatus 是不是 SIGNAL,如果不是不会挂起线程
                    interrupted |= parkAndCheckInterrupt(); //挂起线程,当线程被唤醒将返回线程中断状态,并且清理中断
            }
        } catch (Throwable t) {
            cancelAcquire(node); //唤醒线程,移除出堵塞队列
            if (interrupted)
                selfInterrupt();
            throw t;
        }
    }

acquireQueued 方法要么获取锁成功跳出循环,要么出现异常进入异常处理逻辑。

shouldParkAfterFailedAcquire

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL) //线程准备被唤醒
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) { //大于0都是CANCELLED 状态  线程要退出锁竞争 
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node; //删除 ws = CANCELLED的 队列节点
        } else { // 0 或者PROPAGATE都改成 
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
        }
        return false;
    }

这个方法就两个作用,比较waitStatus或者设置,将node结点向前移动。
当p=head 相等,有可能是head还是空,队列还没有初始化成功,这个时候才一次去获取锁。还有就是node就head后置结点,或者node重入尝试获取锁,获取锁成功了,重新设置头结点。

cancelAcquire

    private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null) 
            return;

        node.thread = null;

        Node pred = node.prev;
        while (pred.waitStatus > 0)  //跳过状态大于0线程
            node.prev = pred = pred.prev;

        Node predNext = pred.next;

        //将当前结点设置成CANCELLED状态,退出锁竞争
        node.waitStatus = Node.CANCELLED;

        //如果node是tail,直接将前置结点设置成tail ,
        if (node == tail && compareAndSetTail(node, pred)) {
            pred.compareAndSetNext(predNext, null); //将tail.next = null 
        } else {
            int ws;
             // 向判断前置结点不能是head,在判断状态是SIGNAL,再判断node后继结点状态是否合法
            // 所以条件都成立,将前后两个节结点关联起来,相当于直接删除自己。
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    pred.compareAndSetNext(predNext, next);//前后结合
            } else {
                unparkSuccessor(node); //唤醒后置结点
            }

            node.next = node; // help GC
        }
    }

这个方法主要任务就寻找状态合法前置结点,设置线程状态CANCELLED,退出线程锁竞争,找到后置线程,将前后指引关联起来,从队列中删除自己。但是有可能前置节点head,这时就需要唤醒后置节点,删除状态为CANCELLED节点,获取锁

unparkSuccessor

看下怎么唤醒线程的

    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0) //因为要唤醒下一个线程,应该要清除SIGNAL 状态
            node.compareAndSetWaitStatus(ws, 0);

        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node p = tail; p != node && p != null; p = p.prev) //从后往前遍历,找到最靠近结点
                if (p.waitStatus <= 0)
                    s = p;
        }
        if (s != null)
            LockSupport.unpark(s.thread); //唤醒线程
    }

addWaiter

    private Node addWaiter(Node mode) {
        Node node = new Node(mode); 

        for (;;) {
            Node oldTail = tail;
            if (oldTail != null) {//tail 已经存在,将线程加入tail
                node.setPrevRelaxed(oldTail);
                if (compareAndSetTail(oldTail, node)) {
                    oldTail.next = node;
                    return node;
                }
            } else {
                initializeSyncQueue(); //初始化队列head,tail 属性
            }
        }
    }

根据给定的模式去创建排队节点,mode只要分成两个模式Node.EXCLUSIVE排他锁和Node.SHARED共享锁。
tzIwuD.png
结合上面三个方法分析,对获取锁失败有了些简单了解。线程使用tryAcquire去获取锁,如果获取失败了,进入排队方法,此时先判断队列tail存在,已经存在直接尾插法插入,否则先初始化队列tail和head。这里知道队列初始化不是由先获取到锁去初始化的,而是由竞争失败的线程去创建队列,这样做是为了性能吧。acquire会根据前置节点为head的情况下不断去获取锁,设置一个不间断的锁竞争趋势。然后将当前线程挂起,当线程被唤醒,第一个要去做的就是去获取锁成功,跳出acquire循环。返回线程中断状态,只有当线程中断状态存在,再次调用线程中断。
本篇开头线程队列图片在有些情况下,并不是图片显示情况那样的。在队列刚开始初始化的时候,head节点并不是获取锁的节点,head只是随机创建Node对象,和tail都是同一个对象。当有线程从tail.next关联,进入队列,也简介和head建立起关系了,只要head最靠近的结点获取到锁后,将head指向自己才会往开篇队列图片方向走。

分析tryAcquire

tryAcquire是获取锁唯一实现,主要有子类实现。主要因为AQS是支持独占锁和共享锁、是否支持重入,更适合由子类去实现获取锁逻辑。进入ReentranLock了解公平锁和非公平锁如何实现tryAcquire,这两种情况一起分析下。

非公平锁

    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

    abstract static class Sync extends AbstractQueuedSynchronizer {
        @ReservedStackAccess
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {//state 第一次竞争锁
                if (compareAndSetState(0, acquires)) { //交换比较state成功,算是获取锁成功了
                    setExclusiveOwnerThread(current);//设置属性变量
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) { //锁重入,只对state += acquires 处理
                int nextc = c + acquires;
                if (nextc < 0) // int 越界
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
}

非常简单,使用CAS(交换比较)最先设置成功线程,就算获取成功,在判断获取锁线程是否是占有锁线程,支持重入锁。

公平锁

    static final class FairSync extends Sync {
        @ReservedStackAccess
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

主要区别多了一个hasQueuedPredecessors判断,进入方法分析下

    public final boolean hasQueuedPredecessors() {
        Node h, s;
        if ((h = head) != null) {//队列已经初始化
            //大于0就是CANCELLED ,s会被移除出队列,下一个节点不可以了,获取下下个节点
            if ((s = h.next) == null || s.waitStatus > 0) { 
                s = null; // traverse in case of concurrent cancellation
                for (Node p = tail; p != h && p != null; p = p.prev) { //从尾链一直向前遍历
                    if (p.waitStatus <= 0) //waitStatus 状态正常
                        s = p;
                }
            }
            if (s != null && s.thread != Thread.currentThread()) //获取锁的线程只要不是下一个即将去获取锁的线程
                return true;
        }
        return false;
    }

先判断head节点是否初始化了,如果没有直接返回false。已经存在直接获取head节点后继节点,只有判断节点线程和当前线程不相等就返回true。这是为什么呢? 想一下锁开始释放的时候,唤醒下一个节点去获取锁,这时候有一个线程也去获取锁,有可能抢占了队列中排队节点获取到锁,相当于插队这是"不公平"的。
公共和不公平主要区别就是在获取锁的时候先判断队列中是否有排队线程,如果存在,直接获取失败,入列排队,强制保证排队最长队列最先获取到锁的原则。公平锁会比非公平锁多一点点性能消耗,但是影响不是很大的,在平常开发上使用公平锁也是一个不错选择,毕竟大家都是选择追求公平的😝。

释放锁

ReentranLock.unlock()的代码

    public void unlock() {
        sync.release(1);
    } 

release

    public final boolean release(int arg) {
        if (tryRelease(arg)) { //这个需要子类去实现
            Node h = head;
            if (h != null &&stat h.waitStatus != 0) //状态不能为0 
                unparkSuccessor(h); //唤醒下一个队列线程
            return true;
        }
        return false;
    }

h.waitStatus = 0 就是状态还是默认状态,我们知道head状态是由shouldParkAfterFailedAcquire修改的,这时候线程还在自旋获取锁,不需要唤醒。

tryRelease

ReentranLock的tryRelease 是公平锁和非公平锁的释放锁的实现。

        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread()) //释放锁线程必须是拥有者线程,不然直接抛出异常
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) { //state 必须等于0了,才算真正释放锁,对应了锁重入 state计算
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

每次调用tryRelease(),只要不是非法线程都可以让state - releases,又不会唤醒线程,只要state=0了,才真正释放锁,设置占有线程为null。唤醒队列中等待线程。
一个ReentranLock获取锁释放锁流程就已经走完了,但是AbstractQueuedSynchronizer中还有很多方法,本着解析这类来的,把其他功能也分析一下吧。

lockInterruptibly

lockInterruptibly在获取锁的时候或者入列排队等待,线程可以被中断强制退出锁竞争。这个是ReentrantLock才有的功能,synchronized关键字并不支持获取锁中断。
ReentrantLock 代码

    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

acquireInterruptibly

看一下acquireInterruptibly是怎么样的

    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted()) //已经已经中断了,并且清理中断状态
            throw new InterruptedException();
        if (!tryAcquire(arg)) //获取锁失败
            doAcquireInterruptibly(arg); //失败 入列挂起等待
    }

看下入队线程怎么支持中断的

doAcquireInterruptibly

    private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE); //加入队列当前结点 
        try {
            for (;;) { //自旋
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    return;
                }
                // 先检查前置结点状态,移除无效线程
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt()) //挂起线程  返回线程中断状态
                    throw new InterruptedException();
            }
        } catch (Throwable t) {
            cancelAcquire(node);
            throw t;
        }
    }

获取锁流程和lock方法基本上一样,在获取锁之前会判断线程中断,并且去处理它。在入队的线程,线程已经被挂起是不能处理中断的,只要当线程被唤醒了,直接抛出异常,退出锁竞争。

获取锁超时

ReentrantLock 有个方法可以设置在指定时间内去获取锁,避免等待获取锁的时候,线程被一直堵塞下去。通过设置超时时间,在锁获取失败了可以让调用者自己去处理。直接进入代码讲解

    public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }

看下 AQS怎么实现

    public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) || //获取失败了 进入下面方法
            doAcquireNanos(arg, nanosTimeout);
    }

doAcquireNanos

    private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout; //超时时间戳
        final Node node = addWaiter(Node.EXCLUSIVE); //入队
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    return true;
                }
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L) { //已经超时
                    cancelAcquire(node); // 退出队列
                    return false;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) //大于1000 ns 线程才需要被挂起 不然时间太短了,还没有挂起就已经结束了
                    LockSupport.parkNanos(this, nanosTimeout); //挂起线程,指定时间会比唤醒
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } catch (Throwable t) {
            cancelAcquire(node);
            throw t;
        }
    }

根据百度上描述纳秒,计算机执行一个执行一道指令(如将两数相加)约需2至4纳秒,太小时间间隔就没有意义了。这个方法比正常方法多了超时判断和超时唤醒线程,其他都一样。

ConditionObject

ConditionObject是AbstractQueuedSynchronizer内部类,实现Condition接口主要功能堵塞线程和唤醒堵塞,有点类似wait、notfiy、notifyAll,一般都是用在同步队列的生产者和消费者的线程堵塞唤醒。先解析Condition 接口每个方法含义,在具体分析方法实现。

Condition

public interface Condition {

    // 使当前线程等待,直到收到信号或者中断
    void await() throws InterruptedException;

   //使当前线程等待,直到收到信号
    void awaitUninterruptibly();

    // 使当前线程等待,直到收到信号或者超过指定时间或者发出中断
    long awaitNanos(long nanosTimeout) throws InterruptedException;

    // 同上
    boolean await(long time, TimeUnit unit) throws InterruptedException;

   //当前线程等待,直到收到信息或者中断或者超过指定时间
    boolean awaitUntil(Date deadline) throws InterruptedException;

    // 唤醒单个等待线程
    void signal();

    //唤醒全部等待线程
    void signalAll();

在解析实现类之前,要先知道ConditionObject内部属性

    public class ConditionObject implements Condition, java.io.Serializable {
        /** First node of condition queue. */
        private transient Node firstWaiter;
        /** Last node of condition queue. */
        private transient Node lastWaiter;

        public ConditionObject() { }

内部属性就两个,头结点、尾结点,下面开始分析实现方法。

await

        public final void await() throws InterruptedException {
            if (Thread.interrupted())  //返回线程中断状态,并且清理中断信号
                throw new InterruptedException();
            Node node = addConditionWaiter();  //线程进入等待队列中
            int savedState = fullyRelease(node);  //执行释放锁功能,返回state 值
            int interruptMode = 0;
            //  //node 是否在队列中  
            while (!isOnSyncQueue(node)) { //不在队列 挂起线程 
                LockSupport.park(this);  
                //当线程被唤醒 清理中断状态    重新加入等待队列中
                //当线程被唤醒,判断是否存在中断信号,并且返回给interruptMode 
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            //在队中自旋获取锁    
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters(); //清除线程状态不为CONDITION 
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }
  1. 先判断线程是否存在中断,中断则直接抛出异常。
  2. 将Node加入有ConditionObject 维护的单向链表,这个列表主要用在队列唤醒,和获取锁的队列并不冲突。
  3. 释放锁,刚开始有点想不明白这里的,这个要结合同步队列来理解。当线程被挂起的时候必须要释放锁,不然变成死锁了。生产者获取不到锁,不能插入数据,就无法唤醒消费者。
  4. while 循环处理,只要node已经加入线程队列参与锁的竞争了,才会退出循环,或者先将当前线程挂起。当线程被唤醒了,判断线程是否因为中断而被唤醒,如果是就直接跳出while 循环。
  5. 因为已经加入队列,现在可以去获取锁 。
  6. 此时node结点已经获取到锁了waitStatus 已经产生变化了,需要清除单向链表中状态不合法结点包括它自己。
  7. 当interruptMode 不等于0,则说明有中断需要处理,需要调用者自己去处理。

addConditionWaiter

        private Node addConditionWaiter() {
            if (!isHeldExclusively()) //不是当前锁拥有者
                throw new IllegalMonitorStateException();
            Node t = lastWaiter;
            // 如果lastWaiter 不是CONDITION状态了,移除出队列,CONDITION是同步队列专属状态
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();  //将状态不等于CONDITION 移除nextWaiter 队列
                t = lastWaiter;
            }

            Node node = new Node(Node.CONDITION); //创建结点,设置waitStatus 为CONDITION

            if (t == null) //头节点还没初始化
                firstWaiter = node;
            else
                t.nextWaiter = node; //node不是头就是尾结点
            lastWaiter = node; 
            return node;
        }

nextWaiter队列.png
在ConditioinObject中会维护一个nextWaiter连起来的单向condition链表,这个链表主要特性就是,所以Node.waitStatus 必须是Node.CONDITION,会将链表头部结点、尾部结点放入lastWaiter、firstWaiter结点中。

unlinkCancelledWaiters

        private void unlinkCancelledWaiters() {
            Node t = firstWaiter;
            Node trail = null;
            while (t != null) { //从头开始遍历
                Node next = t.nextWaiter;
                if (t.waitStatus != Node.CONDITION) {
                    t.nextWaiter = null; //切断 t 和后面结点关联,下面方便删除
                    if (trail == null)  //将头节点删除,下一个结点就是头结点
                        firstWaiter = next;
                    else 
                        trail.nextWaiter = next; 
                    if (next == null)
                        lastWaiter = trail;
                }
                else
                    trail = t;
                t = next;
            }
        }

遍历整个condition链表,将Node.waitStatus != Node.CONDITION删除。

isOnSyncQueue

    final boolean isOnSyncQueue(Node node) {
        if (node.waitStatus == Node.CONDITION || node.prev == null) //node 刚释放锁 还没进一次进入队列
            return false;
        if (node.next != null) //前置结点不为空,肯定在队列中
            return true;
        return findNodeFromTail(node); //从tail 向前查找node 找到返回true
    }

checkInterruptWhileWaiting

        private int checkInterruptWhileWaiting(Node node) {
            return Thread.interrupted() ?
                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;
        }

检查线程中断,并且清理它。没有中断直接返回0,不会执行transferAfterCancelledWait。中断了就会执行执行transferAfterCancelledWait并且返回一个中断结构。

  • REINTERRUPT 在等待退出时中断退出
  • THROW_IE 抛出一个InterruptedException 异常退出

transferAfterCancelledWait

    final boolean transferAfterCancelledWait(Node node) {
        if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) { //waitStatus CONDITION => 0 设置成功
            enq(node); //加入队列
            return true;
        }

        while (!isOnSyncQueue(node)) //node 不在队列中
            Thread.yield(); //线程让出执行权 等待node进入队列后 跳出自旋
        return false;
    }

根据之前获取锁的代码,加入队列结点waitStatus 都是默认值0,只要aitStatus CONDITION => 0 设置成功才能加入队列,参与锁竞争。设置失败就会自旋判断node 是否进入队列中,必须进入队列中才会退出这个方法。

reportInterruptAfterWait

        private void reportInterruptAfterWait(int interruptMode)
            throws InterruptedException {
            if (interruptMode == THROW_IE) // 重新抛出中断异常,交给调用者处理
                throw new InterruptedException();xin
            else if (interruptMode == REINTERRUPT)  //退出时中断退出
                selfInterrupt(); //调用线程中断
        }

根据不同中断情况,做出不同处理。

signal

        public final void signal() {
            if (!isHeldExclusively()) //必须是独占锁线程执行
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null) //头结点不为空,说明condition队列存在,可以去唤醒
                doSignal(first); //唤醒线程
        }

看下唤醒线程如何实现的

doSignal

        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)  //下一个结点为空,链表已经到头了
                    lastWaiter = null;
                first.nextWaiter = null;
                //这个方法就是主要唤醒逻辑
            } while (!transferForSignal(first) && 
                     (first = firstWaiter) != null);
        } 

transferForSignal

    final boolean transferForSignal(Node node) {
        if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))  //加入队列 waitStatus=0
            return false;
        //加入队列竞争锁
        // p 是 node 前置结点
        Node p = enq(node); 
        int ws = p.waitStatus;
        // ws 大于0  p会退出队列,可以唤醒后置结点
        // p 设置状态失败,p结点可以已经不存在了,这时候应该唤醒线程去获取锁了。
        if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
            LockSupport.unpark(node.thread); //唤醒node 的 线程
        return true; //唤醒成功返回true 
    }

这里唤醒线程的条件,是前置结点是否可用,只要当前置结点准备退出队列,或者已经被删除了。node被唤醒获取锁。
剩下几个方法,我不打算在写了,内容太多了。有兴趣自己去了解,内容都是差不多重复的。

总结

本篇通过获取锁释放锁原理去解析AbstractQueuedSynchronizer内部源码原理,也分析了公平锁和非公平锁的区别。讲解一些衍生功能的锁,如超时锁,中断锁,算是比较全部解析了AbstractQueuedSynchronizer作为Java ReentrantLock这类同位锁的框架支持,也简单分析了ConditionObject 实现线程协调signal、await方法。这篇文章大部分内容都是我自己思考想出来,如果有那些地方说错,请出来大家一起讨论。

查看原文

赞 0 收藏 0 评论 0

神易风 赞了文章 · 6月10日

JVM_类加载机制详解

Class 文件的装载流程 (类加载过程)

加载 -> 连接 (验证 -> 准备 -> 解析) -> 初始化 -> 使用 -> 卸载

加载

加载阶段,jvm 会通过类名获取到此类的字节码文件(.class 文件),
然后将该文件中的数据结构转存到内存里(转化为运行时方法区内的数据结构),
最后在堆中生成一个代表该类的 Class 对象,用于后期使用者创建对象或者调用相关方法。

验证

验证阶段用于保证 Class 文件符合 jvm 规范,如果验证失败会抛出 error。

准备

在该阶段虚拟机会给类对象的静态成员变量配置内存空间,并赋初始值。

解析

将类/接口/字段/方法中的号引用替换为直接引用。

初始化

虚拟机会调用类对象的初始化方法来进行类变量的赋值。

Class 文件被装载的条件

必须要有类去主动使用该 Class。
方式有:
使用 new 关键字、反射、克隆、反序列化;
调用类的静态方法;
调用一个类的子类的时候会初始化其父类;
包含 main() 方法的类。

被动使用则不会去装载 Class。
方式有:
调用了其父类的静态方法。

总结:

jvm 秉持了实用主义理念,对于没有用到的 Class 不会进行装载。
但是在 java 代码的启动环节会加载一些使用到的类。

加载器种类

启动类加载器(Bootstrap ClassLoader):

在 jdk8 中用来加载 jvm 自身需要的类,c++ 实现,用来加载 rt.jar。
在 jdk9 之后的 jdk 中,Bootstrap ClassLoader 主要用来加载 java.base 中的核心系统类。

扩展类加载器(ExtClassLoader):

jdk8 中用来加载 ${JAVA_HOME}/lib/ext 目录下的类。
在 jdk9 中已经被移除。

模块加载器(PlatformClassLoader):

jdk9 之后用来代替 ExtClassLoader 的加载器,用来加载 jdk 中的非核心模块类。

应用程序类加载器(AppClassLoader):

用来加载一般的应用类。

自定义加载器:

使用者自己定义的,一般继承 java.lang.ClassLoader 的类。

双亲委派机制

任意一个 ClassLoader 在尝试加载一个类的时候,都会先尝试调用其父类的相关方法去加载类,如果其父类不能加载该类,则交由子类去完成。

这样的好处:对于任意使用者自定义的 ClassLoader,都会先去尝试让 jvm 的 Bootstrap ClassLoader 去尝试加载(自定义的 ClassLoader 都继承了它们)。那么就能保证 jvm 的类会被优先加载,限制了使用者对 jvm 系统的影响。

源码

源码探究使用 jdk11,与 jdk8 中的有些许不同。

ClassLoader

ClassLoader 是类加载器的顶级父类,其核心的方法主要是 loadClass(...) 方法:

// ClassLoader.class
protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException{
    // 加锁,保证线程安全
    synchronized (getClassLoadingLock(name)) {
        // 先去找一次 class 是否已经被加载了,如果已经被加载了就不用重复加载了
        // 此方法的核心逻辑由 c++ 实现
        Class<?> c = findLoadedClass(name);
        // 没有被加载的情况
        if (c == null) {
            long t0 = System.nanoTime(); // 记录时间
            try {
                // 此处体现双亲委派机制
                // 如果该加载器存在父加载器,就会先去调用父加载器的相关方法
                // 如果没有父加载器,就去调用 Bootstrap 加载器
                if (parent != null) {
                    c = parent.loadClass(name, false);
                } else {
                    // 调用 BootstrapClassLoader,此方法的核心逻辑是 c++ 实现的
                    c = findBootstrapClassOrNull(name);
                }
            } catch (ClassNotFoundException e) {

            }

            // 如果依旧加载不到,那么就说明父加载器仍然加载不到信息
            // 那么就需要指定的加载器自己去加载了
            if (c == null) {

                long t1 = System.nanoTime();
                
                // 该加载器加载类文件的核心逻辑
                // 该方法在 ClassLoader 中是留空的,需要子类按照自身的逻辑去实现
                c = findClass(name);

                // 此处做一些信息记录,和主逻辑无关
                PerfCounter.getParentDelegationTime().addTime(t1 - t0);
                PerfCounter.getFindClassTime().addElapsedTimeFrom(t1);
                PerfCounter.getFindClasses().increment();
            }
        }
        
        if (resolve) {
            // 解析 class,也是留空的,需要子类去实现
            resolveClass(c);
        }
        return c;
    }
}

BuiltinClassLoader

BuiltinClassLoader 是 jdk9 中代替 URLClassLoader 的加载器,是 PlatformClassLoader 与 AppClassLoader 的父类。其继承了 SecureClassLoader,其核心的方法主要是 loadClassOrNull(...) 方法:

// BuiltinClassLoader.class

// step 1
@Override
protected Class<?> loadClass(String cn, boolean resolve) throws ClassNotFoundException{
    // 复写了 loadClass(...) 方法,但是核心是调用 loadClassOrNull(...)
    Class<?> c = loadClassOrNull(cn, resolve);
    if (c == null)
        throw new ClassNotFoundException(cn);
    return c;
}

// step 2
protected Class<?> loadClassOrNull(String cn, boolean resolve) {
    // 加锁,保证线程安全
    synchronized (getClassLoadingLock(cn)) {
        // 先去找一次 class 是否已经被加载了,此方法是 ClassLoader 中的
        Class<?> c = findLoadedClass(cn);

        if (c == null) {

            // 这里会需要去先加载模块信息
            LoadedModule loadedModule = findLoadedModule(cn);
            if (loadedModule != null) {
                BuiltinClassLoader loader = loadedModule.loader();
                if (loader == this) {
                    if (VM.isModuleSystemInited()) {
                        c = findClassInModuleOrNull(loadedModule, cn);
                    }
                } else {
                    c = loader.loadClassOrNull(cn);
                }
            } else {

                // 先调用父加载器的相关方法去加载一次
                if (parent != null) {
                    c = parent.loadClassOrNull(cn);
                }

                // 如果没加载到,则用当前加载器去加载
                if (c == null && hasClassPath() && VM.isModuleSystemInited(){
                    // 此方法内会调用到 defineClass(...) 方法去加载类文件
                    c = findClassOnClassPathOrNull(cn);
                }
            }

        }

        // 解析 class
        if (resolve && c != null)
            resolveClass(c);

        return c;
    }
}

该加载器中还有一个加载 class 字节码的方法:

// BuiltinClassLoader.class
private Class<?> defineClass(String cn, Resource res) throws IOException{
    
    URL url = res.getCodeSourceURL();

    // 先解析这个 class 的路径
    int pos = cn.lastIndexOf('.');
    if (pos != -1) {
        String pn = cn.substring(0, pos);
        Manifest man = res.getManifest();
        defineOrCheckPackage(pn, man, url);
    }

    // 这里会将 class 读取出来成一个 byte[] 字符串,并通过 jvm 的相关方法去加载
    ByteBuffer bb = res.getByteBuffer();
    if (bb != null) {
        CodeSigner[] signers = res.getCodeSigners();
        CodeSource cs = new CodeSource(url, signers);
        // 该方法最后会调用 ClassLoader 内的 native 方法
        return defineClass(cn, bb, cs);
    } else {
        byte[] b = res.getBytes();
        CodeSigner[] signers = res.getCodeSigners();
        CodeSource cs = new CodeSource(url, signers);
        // 该方法最后会调用 ClassLoader 内的 native 方法
        return defineClass(cn, b, 0, b.length, cs);
    }
}

BootClassLoader

BootClassLoader 是 ClassLoaders 的一个静态内部类,虽然它从代码实现上是 BuiltinClassLoader 的子类,但是从功能上说它是 PlatformClassLoader 的 parent 类:

// ClassLoader.class
private static class BootClassLoader extends BuiltinClassLoader {
    BootClassLoader(URLClassPath bcp) {
        super(null, null, bcp);
    }

    // 复写了 BuiltinClassLoader 中的 loadClassOrNull(...) 方法
    @Override
    protected Class<?> loadClassOrNull(String cn) {
        return JLA.findBootstrapClassOrNull(this, cn);
    }
};

PlatformClassLoader

PlatformClassLoader 也是 ClassLoaders 的一个静态内部类,从功能上说它是 BootClassLoader 的子类,同时也是 AppClassLoader 的 parent 类。PlatformClassLoader 主要用来加载一些 module:

// ClassLoader.class
private static class PlatformClassLoader extends BuiltinClassLoader {
    static {
        if (!ClassLoader.registerAsParallelCapable())
            throw new InternalError();
    }

    // 此处会将 BootClassLoader 作为 parent 参数传入进去
    PlatformClassLoader(BootClassLoader parent) {
        super("platform", parent, null);
    }

    // 加载 module
    private Package definePackage(String pn, Module module) {
        return JLA.definePackage(this, pn, module);
    }
}

AppClassLoader

AppClassLoader 的核心方法是 loadClass(...),最终会调用到 BuiltinClassLoader.loadClassOrNull(...) 方法,而此方法内部又会调用到 PlatformClassLoader.loadClass(...) 方法;然后实际上 PlatformClassLoader 内部又会去调用 BootClassLoader 的 loadClassOrNull(...) 方法。这种方式下就完成类加载器的双亲委派机制:

// ClassLoader.class
private static class AppClassLoader extends BuiltinClassLoader {
    static {
        if (!ClassLoader.registerAsParallelCapable())
            throw new InternalError();
    }

    final URLClassPath ucp;

    // 此处会将 PlatformClassLoader 作为 parent 参数传入进去
    AppClassLoader(PlatformClassLoader parent, URLClassPath ucp) {
        super("app", parent, ucp);
        this.ucp = ucp;
    }

    @Override
    protected Class<?> loadClass(String cn, boolean resolve) throws ClassNotFoundException{
        SecurityManager sm = System.getSecurityManager();
        if (sm != null) {
            int i = cn.lastIndexOf('.');
            if (i != -1) {
                sm.checkPackageAccess(cn.substring(0, i));
            }
        }
        // 实际上是调用了 BuiltinClassLoader.loadClassOrNull(...) 方法
        return super.loadClass(cn, resolve);
    }

    @Override
    protected PermissionCollection getPermissions(CodeSource cs) {
        PermissionCollection perms = super.getPermissions(cs);
        perms.add(new RuntimePermission("exitVM"));
        return perms;
    }


    void appendToClassPathForInstrumentation(String path) {
        ucp.addFile(path);
    }


    private Package definePackage(String pn, Module module) {
        return JLA.definePackage(this, pn, module);
    }


    protected Package defineOrCheckPackage(String pn, Manifest man, URL url) {
        return super.defineOrCheckPackage(pn, man, url);
    }
}
查看原文

赞 2 收藏 0 评论 0

神易风 发布了文章 · 6月8日

线程池ThreadPoolExecutor 了解

本文章出处 线程池ThreadPoolExecutor 了解
转载请说明

常用线程池类型

Java通过Executors静态方法创建4种不同类型线程池。

  • newSingleThreadExecutor 创建单例的线程池,保证执行任务顺序,超出线程任务将会在任务中等待,所有的任务都按照FIFO队列顺序执行。
  • newFixedThreadPool 创建一个固定大小的线程组,指定工作线程数量,当任务超过指定工作数量时,在队列中排队等待执行。
  • newCachedThreadPool 创建一个可以缓存线程池,这个线程池活动线程是0,最大线程Integer.MAX,当不断有新的任务添加到线程池中,池内线程数量不够时,可以立刻创建新的线程执行任务。当空闲的线程超过60s就被系统回收掉。
  • newScheduleThreadPool 创建一个定长的线程池,而且支持定时的以及周期性的任务执行,类似于Timer。
  • newWorkStealingPool 会创建一个含有足够多线程的线程池,来维持相应的并行级别,它会通过工作窃取的方式,使得多核的 CPU 不会闲置,总会有活着的线程让 CPU 去运行。

像newSingleThreadExecutor、newFixedThreadPool、newCachedThreadPool都时内部封装ThreadPoolExecutor生成线程池的,下面具体分析ThreadPoolExecutor这个类。

ThreadPoolExecutor 构造函数

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
  • corePoolSize 线程核心线程数,不会被回收的线程。
  • maximumPoolSize 线程池能够申请最大线程数量
  • workQueue 同步性队列转载执行的任务
  • keepAliveTime 当线程数大于核心时,这是多余的空闲线程在终止之前等待新任务的最大时间。
  • threadFactory 线程工厂
  • handler 当任务数量超过队列容量时,需要处理这种情况,饱和策略,主要有4种处理策略

    • AbortPolicy:直接抛出异常,这是默认策略;
    • CallerRunsPolicy:使用调用者所在的线程来执行任务;
    • DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
    • DiscardPolicy:直接丢弃任务;

线程池疑问

创建线程池基本核心构造参数我们已经知道了,但是我们还有很多问题没有搞明白的。怎么知道线程池内每个线程运行状态,是在工作中还是空闲呢?是不是有一个专门线程去标记空闲线程活动时间?线程是如何实现共用线程。 带着这些问题去阅读代码。

线程池内线程状态

以下内容都是来自ThreadPoolExecutor代码注释。
线程池内的线程状态都是有一个AtomicInteger ctl保持的,是一个原子整数,包装了两个领域含义。
ctl内部结构.png

  • workerCount 有效的线程数 ,线程总数2 ^ 29 -1 ,线程启动数量不包括线程停止的数量,而该值可能是
    与活动线程的实际数量暂时不同。例如当ThreadFactory创建线程失败时,线程正在执行退出,统计线程数量依然包括退出的线程。
  • runState 线程状态

    • RUNNING 正在接受新的任务并且处理队列中的任务
    • SHUTDOWN 不接受新的任务,但是能处理任务
    • STOP 不能接受新的任务,不能处理队列中的任务,但是可以中断正在执行的任务。
    • TIDYING 所有的任务终止,workerCount为0 ,线程全部过渡到TIDYING状态,即将运行terminated() 钩子方法
    • TERMINATEDterminated() 钩子方法执行完成

这些状态都有一个转换顺序

  • RUNNING -> SHUTDOWN 执行shutdown()
  • (RUNNING or SHUTDOWN) -> STOP 执行shutdownNow()
  • SHUTDOWN -> TIDYING 当任务队列和线程池都是空
  • STOP -> TIDYING 线程池都是空
  • TIDYING -> TERMINATED 当 terminated()钩子方法执行完
    这些状态具体代码实现
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

execute 方法解析

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * 处理3个步骤
         * 1. 如果正在运行的线程数量小于核心线程数,直接创建一个新的线程去执行任务
         * 调用addWorker 方法自动检查 线程状态和数量,避免在不能添加线程时添加线程出现错误警报
         *
         * 2. 如果任务可以成功进入队列,我们仍然需要双重检查是否添加一个线程
         *   因为存在上次检查时有线程死亡或者当我们进入方法时线程池正在关闭
         *   因此,我们重新检查状态,如果停止,则回滚排队,如果没有,则启动新线程。
         *
         * 3. 添加任务失败,则尝试创建一个线程,如果失败了,使用拒绝策略
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) { //当前线程数量小于核心线程数
            if (addWorker(command, true)) // 创建线程
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) { //线程池状态RUNNING 并且 任务添加成功
            int recheck = ctl.get(); // 第二重检查
            if (! isRunning(recheck) && remove(command)) //判断线程池状态  删除任务修改状态
                reject(command);
            else if (workerCountOf(recheck) == 0)  //线程池数量为0
                addWorker(null, false);
        }
        else if (!addWorker(command, false)) //线程池状态不为RUNNING 或者 队列已满,用于开启非核心线程拉取任务
            reject(command);
    }
 

下一步我们进入addWorker创建线程的核心方法

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry: //retry标记,第一次看到 😓
        for (int c = ctl.get();;) {
            // Check if queue empty only if necessary.
            if (runStateAtLeast(c, SHUTDOWN) //至少SHUTDOWN
                && (runStateAtLeast(c, STOP) // 至少STOP  都是不合法
                    || firstTask != null
                    || workQueue.isEmpty()))
                return false;

            for (;;) { //状态合法
                if (workerCountOf(c)
                    >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK)) //大于核心线程或者最大线程都不需要创建线程,和掩码相与防止最大线程数超过2 ^ 29 - 1 细节啊
                    return false;
                if (compareAndIncrementWorkerCount(c))  // ctl 自增成功,跳出整个循环
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateAtLeast(c, SHUTDOWN)) //状态至少SHUTDOWN 重新进入循环 
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask); //创建线程
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    //在加锁期间重新检查线程池状态
                    int c = ctl.get();

                    if (isRunning(c) ||
                        (runStateLessThan(c, STOP) && firstTask == null)) {
                        if (t.isAlive()) // 刚创建线程已经开始执行任务,这是有问题
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) { 
                    t.start(); //启动任务
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

addWorker() 主要流程检查线程池状态是否合法,创建新的线程,加入workers中,调用start()执行任务。我们去了解下Worker 类

    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        // TODO: switch to AbstractQueuedLongSynchronizer and move
        // completedTasks into the lock word.

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker. */
        public void run() {
            runWorker(this);
        }
}

Worker其实就是Runnable包装类,但是增加了任务中断功能,他的主要任务就是维护中断状态,继承AQS可以简化获取和释放围绕每个任务执行的锁定,防止旨在唤醒等待任务的工作线程的中断。
了解Worker怎么执行任务的进入runWorker()

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask; //取出任务
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) { //如果当前worker没有任务,从队列中获取任务,直到队列为空
                w.lock();
                //处理线程中断机制 
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task); //前置处理,类似拦截器机制,需要子类去实现
                    try {
                        task.run(); //调用任务方法
                        afterExecute(task, null); //后置处理
                    } catch (Throwable ex) {
                        afterExecute(task, ex); //异常处理
                        throw ex;
                    }
                } finally {
                    task = null;
                    w.completedTasks++;  //执行任务数量+ 1
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly); //线程生命周期走完,执行回收工作
        }
    }

结合Worker构造函数,Worker在初始化就自己给自己上锁了,避免线程在任务还没有开始的情况下就被中断了 。启动线程执行runWorker方法,取出任务,释放锁,如果Worker中的任务为空,从队列中拉取任务。处理线程中断,主要依据第一线程状态已经至少STOP状态,然后清除中断状态,在判断线程没有中断信号了,再发送中断信号。按照作者注释的意思就是当线程池已经在停止过程中,线程应该中断,但是必须双重检查防止关闭过程中竞争发送中继信号。调用run方法执行任务。为什么要上锁执行任务,主要是执行任务过程,必须要获取锁才能中断线程的,但是Worker本身不支持重入锁的,只有在任务开始关闭过程才能中断。
在这里我们终于看到线程共用方式了,通过线程不断从队列中获取任务,然后再进行调用run方法执行任务,当线程退出获取队列循环,线程生命周期就结束了。

geTask()

    private Runnable getTask() {
        boolean timedOut = false; //上一次拉取是否超时

        for (;;) {
            int c = ctl.get();

            //检查线程池状态是SHUTDOWN  不接受新的任务
            // 任务队列为空
            if (runStateAtLeast(c, SHUTDOWN)
                && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
                decrementWorkerCount(); //核心线程数workerCount -1
                return null;
            }

            int wc = workerCountOf(c); 

            // allowCoreThreadTimeOut  空闲情况下是否回收核心线程数 默认是false
           // 当前线程数大于 核心线程数
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            // wc 大于最大线程数 ,先处理线程数量
           // 线程在存活的时间内没有获取到任务,则需要回收掉,上一个循环的,线程数-1
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) { //wc 不要为0,任务队列为空的情况
                if (compareAndDecrementWorkerCount(c)) //线程-1成功没有其他线程竞争,没有新增任务
                    return null;
                continue;
            }

            try {
                Runnable r = timed ? 
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : //超时会返回空
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true; 
            } catch (InterruptedException retry) { //中断等待获取任务,放弃执行任务
                timedOut = false;
            }
        }
    }

这里我们知道空闲时间是怎么回收线程的,通过同步性队列poll() + 超时时间知道一个线程在这个时间内没有任务执行,线程池处于空闲状态的,返回null给调用方法,跳出while循环,结束整个线程的生命周期。

进入processWorkerExit()

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) //如果没有执行到任务,核心线程-1
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w); //移除当前worker ,线程会被回收掉
        } finally {
            mainLock.unlock();
        }

        tryTerminate(); //判断线程池内状态,是否对线程池发出关闭信号

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) { //线程池在RUNNABLE或者SHUTDOWN状态,线程池任然可以执行任务或者接受任务
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty()) //线程池内线程已经被回收完了并且任务还没有执行完
                    min = 1;
                if (workerCountOf(c) >= min) //线程池内线程数量大于核心线程池,不需要新建线程去处理
                    return; // replacement not needed
            }
            addWorker(null, false); //创建新的线程处理任务
        }
    }

进入 tryTerminate()

在线程池SHUTDOWN状态线程为0和任务队列为空的情况,或者STOP状态核心队列为空情况,线程池状向TIDYING转移,传播关闭池信号。

    final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            if (isRunning(c) || //RUNNING 状态不需要处理
                runStateAtLeast(c, TIDYING) || //已经进入TIDYING,也不做处理 
                (runStateLessThan(c, STOP) && ! workQueue.isEmpty())) //任务队列不为空,不满足条件
                return;
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE); 尝试去中断一个worker 
                return;
            }
        
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock(); //加锁修改线程池状态
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { // 进入TIDYING状态
                    try {
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));  //执行完terminated() 进入TERMINATED状态 
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

shutdown()

再去了解下线程池终止方法

   public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(SHUTDOWN); //修改线程池状态为SHUTDOWN
            interruptIdleWorkers(); //中断线程
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }

进入interruptIdleWorkers() 怎么中断线程

    private void interruptIdleWorkers() {
        interruptIdleWorkers(false);
    }

    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock(); //加锁主要是workers 是一个不安全集合
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) { //没有中断和 能够获取到锁,说明此线程池没有在执行任务,Worker 是不支持重入的
                    try {
                        t.interrupt(); 
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

处理方法挺简单的,修改线程池状态不要接收新的任务,将works中空闲线程取出发出中断信号。

shutdownNow

    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP);
            interruptWorkers();
            tasks = drainQueue();  //删除队列中的任务,返回给tasks
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

shutdownNow 会将队列中还没有来得及处理任务全部删除掉,直接调用tryTerminate()终止线程池生命周期。

总结

现在我们知道线程池内部机制是如何创建线程,共用线程,空闲回收,线程池的生命周期。调用execute()提交任务,如果当前线程池数量小于核心线程数,调用addWorker()创建一个新的线程池去执行任务,否则直接加入到队列中。在addWorker()启动一个线程去不断从队列拉取任务,直到一个队列存活时间没有任务执行或者队列为空,线程才会被回收掉。设置线程池时注意参数设置主要一些细节,核心线程数根据任务情况进行设置,大部分情况下都是核心数在处理任务,只有当任务队列超出容量大小的时候,才会创建新的任务去执行任务。所以在设置最大线程数时,注意设置队列容量大小,如果是Integer.MAX,线程数量永远不会超过核心线程数。只有当任务超出队列容量+线程最大值的情况才会触发饱和策略,根据任务需求选择合适处理方法。

查看原文

赞 0 收藏 0 评论 0

神易风 赞了文章 · 5月27日

常见 TCP 拥塞避免算法浏览(上)

BBR, the new kid on the TCP block | APNIC Blog 这篇文章挺好的,下面很多内容也基于这篇文章而来。

拥塞避免用于避免因为发送者发送数据过快导致链路上因为拥塞而出现丢包。

TCP 连接建立后先经过 Slow Start 阶段,每收到一个 ACK,CWND 翻倍,数据发送率以指数形式增长,等出现丢包,或达到 ssthresh,或到达接收方 RWND 限制后进入 Congestion Avoidance 阶段。下面这个图挺好的,描述了好几个过程,找不到出处了,只是列一下图吧。

一些基础东西可以看:TCP congestion control - Wikipedia

BDP

BDP 是 Bandwidth and Delay Product. 就是带宽 (单位 bps) 和延迟 (单位 s) 的乘积,单位是 bit,也是 Source 和 Destination 之间允许处在 Flying 状态的最大数据量。Flying 也叫 Inflights,就是发送了但还未收到的 Ack 的数据。


来自[3]。

实际发送速率乘以延迟得到的值越接近 BDP 说明算法的效率越高。

Reno

Reno 这种叫做 ACK-Pacing,基于 Ack 来确认网络状况。如果能持续收到 ACK,表示网络能正常承载当前发送速率。

Reno maintains an estimate of the time to send a packet and receive the corresponding ACK (the “round trip time,” or RTT), and while the ACK stream is showing that no packets are being lost in transit, then Reno will increase the sending rate by one additional segment each RTT interval.

Reno 下,每收到一个 ACK,CWND 加一,等出现丢包之后发送者将发送速率减半。理想状况下,Reno 能走出如下曲线:


来自[1]

Reno 有如下假设:

  • 丢包一定因为网络出现拥塞,但实际可能网络本身不好,可能有固有的丢包概率,所以假设并不严谨;
  • 网络拥塞一定是因为网络上某个 buffer overflow 了;
  • 网络的 RTT 和带宽稳定不容易变化;
  • 将速率减半以后,网络上的 buffer 一定能够从 overflow 变为 drain。也即对网络上 Buffer 大小也有假设;

从上面假设能看出,Reno 下受链路上 Buffer 大小影响很大。当 Buffer 较小的时候,链路上实际处在发送状态的数据量还未达到 BDP (Bandwidth delay product) 时候可能就会出现丢包,导致 Reno 立刻减半发送速率,从而无法高效的利用网络带宽。如果 Buffer 很大,超过 BDP,可能会进入 “Buffer Bloat” 状态,即延迟畸高,因为即使 Reno 速度降为一半依然不能保证使链路 Buffer 清空,或者说可能大部分时间链路上 Buffer 都处在非空状态,且每次 Reno 因为丢包而降速时,会做数据重传,导致之前发送的可能还依旧在链路上排队的数据空占资源没起到作用最终白白丢掉,于是让整条链路上持续存在固有延迟。

Reno 的问题:

  • 上面提到了,受链路 Buffer 影响很大;
  • 对高带宽网络,比如带宽是 10Gbps,即使假设延迟非常低只有 30ms,每个 RTT 下 CWND 增加 1500 个八进制数,得好几个小时才能真的利用起这个带宽量,并且要求这几个小时内数据都不能有丢包,不然 Reno 会降速;
  • Reno 每收到一个 Ack 就开始扩大 CWND,对链路上一起共享链路的其它 RTT 较大的连接不友好。比如一个连接 RTT 很低,它的 CWND 会比别的共享链路的连接大,不公平的占用更多带宽;

BIC

BIC 叫 Binary Increase Congestion Control,是在 Reno 基础上做改进,将 CWND 扩大过程分成三个阶段,第一阶段是在遇到丢包后将 CWND 降为 β × Wmax,(一般 β 是 0.5,Wmax 是丢包时 CWND)但记住之前 Wmax 最大值,之后以一个较快速度增加 CWND,在接近 Wmax 后 CWND 增加量是一个 Wmax 和 CWND 之间的二次函数,称为 Binary Increase,逐步去接近 Wmax,到达 Wmax 后又转换为二次曲线去探测下一个极限。具体内容可以看 Wiki,在这里:BIC TCP - Wikipedia

大概是这么个图,好处就是丢包后能快速恢复,并在稳定期尽力保持更长时间,并还能支持探测更高带宽值。


来自[3]

Cubic

Cubic 在 BIC 的基础上,一个是通过三次函数去平滑 CWND 增长曲线,让它在接近上一个 CWND 最大值时保持的时间更长一些,再有就是让 CWND 的增长和 RTT 长短无关,即不是每次 ACK 后就去增大 CWND,而是让 CWND 增长的三次函数跟时间相关,不管 RTT 多大,一定时间后 CWND 一定增长到某个值,从而让网络更公平,RTT 小的连接不能挤占 RTT 大的连接的资源。

平滑后的 CWND 和时间组成的曲线如下,可以看到三次曲线保留了 BIC 的优点,即在丢包后初期,CWND 能快速增长,减小丢包带来的影响;并且在接近上一个 CWND 最大值时 CWND 增长速度又会非常慢,要经历很长时间才能超越 Wmax,从而能尽力保持稳定,稳定在 Wmax 上;最后在超越 Wmax 后初期也是增长的非常慢,直到后来才快速的指数形式增长,用于探测下一个 Wmax。


来自[3]

Cubic 最终出来的曲线如下。黄色是 CWND,蓝色线是网络上队列拥塞包数。蓝色箭头表示 queue fill,绿色箭头表示 queue drain。看到有两个浅绿色的线,高的是开始丢包,低的是队列开始排队。因为这里画的是说网络上 bandwidth 是固定的,buffer 也是固定的,所以 Cubic 下没有超越 Last Wmax 继续探测下一个 Wmax 的曲线,都是还未到达第一次的 Wmax 时候就因为丢包而被迫降低了 CWND。第一次个黄色尖峰能那么高是因为当时网络上 buffer 是空的,后来每次丢包后 Buffer 还没来得及完全清空 CWND 又涨上来导致数据排队了,所以后来的黄色尖峰都没有第一个高。


来自[3]

Cubic 的优势:

  • 因为跟 RTT 无关所以更公平;
  • 更适合 BDP 大的网络。Reno 不适合是因为它依赖 RTT,BDP 大的时候 RTT 如果很高,会很久才能将传输效率提上去;

Cubic 缺点:

  • 当 Bandwidth 变化时候,Cubic 需要很长时间才能从稳定点进入探测下一个 Wmax 的阶段;
  • 更易导致 Bufferbloat。Reno 下如果链路上 Buffer 很大出现拥塞后 RTT 也会很长,很久才会收到 ACK,才会增加 CWND;但 Cubic 的 CWND 增长跟 RTT 无关,到时间就增长,从而更容易加剧链路负担。Bufferbloat 可以看下节。

综合来看 Cubic 适合 BDP 大的高性能网络,性能意思是带宽或者说发送速率,发送速率足够大 Cubic 把 Buffer 占满后才能快速清空,才能有较低的延迟。

一个挺好的讲 Cubic 的 PPT:Cubic,还有论文。Linux 2.6 时候用的 CUBIC。

Bufferbloat

Bufferbloat 在 Bufferbloat - Wikipedia 讲的挺好了。简单说就是随着内存越来越便宜,链路上有些设备的 Buffer 倾向于配置的特别大,而流行的 TCP Congestion Avoidance 算法又是基于丢包的。数据在队列排队说明链路已经出现拥塞,本来应该立即反馈给发送端让发送端减小发送速度的,但因为 Buffer 很大,数据都在排队,发送端根本不知道自己发出去的数据已经开始排队,还在以某个速度 (Reno)甚至更高速度(Cubic,到时间就增加 CWND) 发送。等真的出现丢包时候,发送端依然不知道出现了丢包,还会快速发消息,直到丢的这个包被接收端感知到,回复 ACK 后发送端才终于知道要降低发送速度。而重传的包放在链路上还得等之前的数据包都送达接收端后才能被处理。如果接收端的 Buffer 不足够大,很多数据送达接收端后都会丢弃,得等重传的包到达(Head Of Line 问题)后才能送给上游,大大增加延迟。

Bufferbloat 一方面是会导致网络上超长的延迟,再有就是导致网络传输不稳定,有时候延迟很小,有的时候延迟又很大等。

可以参考:Bufferbloat: Dark Buffers in the Internet - ACM Queue

PRR

在 CUBIC 之上又有个优化,叫做 Proportional Rate Reduction (PRR),用以让 CUBIC 这种算法在遇到丢包时候能更快的恢复到当前 CWND 正常值,而不过分的降低到过低的水平。

参考:draft-mathis-tcpm-proportional-rate-reduction-01 - Proportional Rate Reduction for TCP。不进一步记录了。Linux 3.X 的某个版本引入的,配合 CUBIC 一起工作。

链路上的队列模型

为了进一步优化 Cubic,可以先看看链路上队列的模型。

  • 当链路上数据较少时,所有数据都在发送链路上进行发送,没有排队的数据。这种情况下延迟最低;
  • 当传输的数据更多时候,数据开始排队,延迟开始增大;
  • 当队列满的时候进入第三个状态,队列满了出现丢包;


来自[1]

最优状态是 State 1 和 State 2 之间,即没有出现排队导致延迟升高,又能完全占满链路带宽发送数据,又高效延迟又低。
而基于丢包的 Congestion Avoidance 策略都是保持在 State 2 的状态。而 Cubic 是让 CWND 尽可能保持在上一个 Wmax 的状态,也即 State 2 末尾和即将切换到 State 3 的状态。所以 Cubic 相当于尽可能去占用链路资源,使劲发数据把下游链路占满但牺牲了延迟。

于是可以看到,为了保持在 State 1 和 State 2 状态,我们可以监控每个数据包的 RTT,先尽力增大 CWND 提高发送率如果发现发送速率提高后 RTT 没有升高则可以继续提高发送速率,直到对 RTT 有影响时就减速,说明从 State 1 切换到了 State 2。

我们希望让 CWND 尽力保持在下面图中标记为 optimum operating point的点上,即 RTT 又小,链路上带宽又被占满。在这个图上,RTprop 叫做 round-trip propagation time,BtlBw 叫做 bottleneck bandwidth。横轴是 inflights 数据量,纵轴有两个一个是 RTT 一个是送达速度。看到图中间有个很淡很淡的黄色的线把图分成了上下两部分,上半部分是 Inflights 数据量和 Round trip time 的关系。下半部分是 Inflights 和 Delivery Rate 的关系。

这里有四个东西需要说一下 CWND、发送速度、inflights,发送者带宽。inflights 就是发送者发到链路中还未收到 ACK 的数据量,发送速度是发送者发送数据的速度,发送者带宽是发送者能达到的最大发送速度,CWND 是根据拥塞算法得到的当前允许的 inflights 最大数据量,它也影响着发送速度。比如即使有足够大的带宽,甚至有足够多的数据要发,但 CWND 不够,于是只能降低发送速度。这么几个东西会纠缠在一起,有很多文章在描述拥塞算法的时候为了方便可能会将这几个东西中的某几个混淆在一起描述,看的时候需要尽力心里有数。

回到图蓝色的线表示 CWND 受制于 RTprop,绿色的线表示受制于 BltBw。灰色区域表示不可能出现的区域,至少会受到这两个限制中的一个影响。白色区域正常来说也不会出现,只是说理论上有可能出现,比如在链路上还有其它发送者一起在发送数据相互干扰等。灰色区域是无论如何不会出现。

先看上半部分,Inflights 和 Round-Trip Time 的关系,一开始 Inflights 数据量小,RTT 受制于链路固有的 RTprop 时间影响,即使链路上 Buffer 是空的 RTT 也至少是 RTprop。后来随着 Inflights 增大到达 BDP 之后,RTT 开始逐步增大,斜率是 1/BtlBw,因为 BDP 点右侧蓝色虚线就是 Buffer 大小,每个蓝点对应的纵轴点就是消耗完这么大 Buffer 需要的时间。消耗 Buffer 的时间 / Buffer 大小 就是 1/BltBw,因为消耗 Buffer 的时间乘以 BltBw 就是 Buffer 大小。感觉不好描述,反正就这么理解一下吧。主要是看到不可能到灰色部分,因为发送速率不可能比 BltBw 大。等到 Inflights 把 Buffer 占满后到达红色区域开始丢包。

下半部分,Inflights 比较小,随着 Inflights 增大 Delivery Rate 越大,因为此时还未达到带宽上限,发的数据越多到达率也就越高。等 Inflights 超过 BDP 后,Delivery Rate 受制于 BtlBw 大小不会继续增大,到达速度达到上限,Buffer 开始积累。当 Buffer 达到最大限度后,进入红色区域开始丢包。

之前基于丢包的拥塞算法实际上就是和链路 Buffer 一起配合控制 Inflights 数据量在 BDP 那条线后面与 BDP 线的距离。以前内存很贵,可能只比 BDP 大一点,基于丢包的算法延迟就不会很高,可能比最优 RTT 高一点。但后来 Buffer 越来越便宜,链路上 Buffer 就倾向于做的很大,此时基于丢包的拥塞算法就容易导致实际占用的 Buffer 很大,就容易出现 BufferBloat。

为了达到最优点,发送速率必须达到 BltBw,从而占满链路带宽,最高效的利用带宽;再有需要控制 Inflight 数据量不能超过链路的 BDP = BtlBw * RTprop,从而有最优的延迟。发送速率可以超过 BltBw,可以有队列暂时产生,但数据发送总量不能超 BDP,从而让平均下来延迟还是能在最优点附近。


来自[4]

TCP Vegas

Vegas 就如上面说的理想中算法一样,它会监控 RTT,在尝试增加发送速率时如果发现丢包或者 RTT 增加就降低发送速率,认为网络中出现拥塞。但它有这些问题:

  • CWND 增长是线性的,跟 Reno 一样需要很久才能很好的利用网络传输速率;
  • 最致命的是它不能很好的跟基于丢包的 Congestion Avoidance 共存,因为这些算法是让队列处在 State 2 和 3 之间,即尽可能让队列排队,也相当于尽可能的增大延迟,但 Vegas 是尽可能不排队,一发现排队就立即降低发送速率,所以 Vegas 和其它基于丢包算法共存时会逐步被挤出去;

参考文献在下一篇统一提供。

其他分享内容:

更多内容欢迎关注 LeanCloud 官方微信号:「LeanCloud 通讯」
查看原文

赞 11 收藏 5 评论 0

神易风 发布了文章 · 5月10日

HashMap源码解析

本文件出处https://shenyifengtk.github.io/2020/05/10/HashMap源码解析
转载请说明出处

以后面试官问你HashMap原理,不能再答数组+链表的结构了,JDK1.8 添加了红黑树结构,知识要与时俱进啊。受限于文章篇幅,我会从HashMap 初始化,put、get方面解析底层原理。

内部设定

常量设定

    /**
     * HashMap 默认初始化长度
     */
    static final int DEFAULT_INITIAL_CAPACITY = 1 << 4; // aka 16

    /**
     * HashMap 允许容器数组最大长度
     */
    static final int MAXIMUM_CAPACITY = 1 << 30;

    /**
     * 负载因子,容器数量/容器最大长度 大于等于,数组进行扩容
     */
    static final float DEFAULT_LOAD_FACTOR = 0.75f;

    /**
     *  链表的长度超过阈值,在添加元素的时候链表转化成红黑树
     */
    static final int TREEIFY_THRESHOLD = 8;

    /**
     * 在扩容的时候,红黑树数量小于阈值转换成链表
     */
    static final int UNTREEIFY_THRESHOLD = 6;

    /**
     * 最小table 容器数量支持链表树化
     */
    static final int MIN_TREEIFY_CAPACITY = 64;

内部元素结构

 static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        V value;
        Node<K,V> next;

        Node(int hash, K key, V value, Node<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.value = value;
            this.next = next;
        }

        public final K getKey()        { return key; }
        public final V getValue()      { return value; }
        public final String toString() { return key + "=" + value; }

        public final int hashCode() {
            return Objects.hashCode(key) ^ Objects.hashCode(value);
        }

        public final V setValue(V newValue) {
            V oldValue = value;
            value = newValue;
            return oldValue;
        }

        public final boolean equals(Object o) {
            if (o == this)
                return true;
            if (o instanceof Map.Entry) {
                Map.Entry<?,?> e = (Map.Entry<?,?>)o;
                if (Objects.equals(key, e.getKey()) &&
                    Objects.equals(value, e.getValue()))
                    return true;
            }
            return false;
        }
    }

Node对象只拥有三个属性,key,value,next,结构非常简单,操作也简单。

内部属性变量

   /**
     * 内部数组,上面所说table 指的就是这个
     */
    transient Node<K,V>[] table;

    /**
     * entrySet() 返回容器
     */
    transient Set<Map.Entry<K,V>> entrySet;

    /**
     * 键值对的数量,也是HashMap size
     */
    transient int size;

    /**
     * HashMap 结构修改次数
     */
    transient int modCount;

    /**
     * 扩容table 阈值
     */
    int threshold;

    /**
     *  扩容因子
     */
    final float loadFactor;
  

解析代码

构造函数

public HashMap(int initialCapacity, float loadFactor) {
        if (initialCapacity < 0)
            throw new IllegalArgumentException("Illegal initial capacity: " +
                                               initialCapacity);
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        if (loadFactor <= 0 || Float.isNaN(loadFactor))
            throw new IllegalArgumentException("Illegal load factor: " +
                                               loadFactor);
        this.loadFactor = loadFactor;
        this.threshold = tableSizeFor(initialCapacity);
    }
    /**
     * Constructs an empty <tt>HashMap</tt> with the specified initial
     * capacity and the default load factor (0.75).
     *
     */
    public HashMap(int initialCapacity) {
        this(initialCapacity, DEFAULT_LOAD_FACTOR);
    }

    /**
     * Constructs an empty <tt>HashMap</tt> with the default initial capacity
     * (16) and the default load factor (0.75).
     */
    public HashMap() {
        this.loadFactor = DEFAULT_LOAD_FACTOR; // all other fields defaulted
    }

这段代码很简单,主要是判断两个输入参数是否合法,将容器长度重新计算扩容阈值。
看下tableSizeFor怎么计算阈值

/**
     * Returns a power of two size for the given target capacity.
     */
    static final int tableSizeFor(int cap) {
        int n = cap - 1;
        n |= n >>> 1;
        n |= n >>> 2;
        n |= n >>> 4;
        n |= n >>> 8;
        n |= n >>> 16;
        return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
    }

这个方法其实通过输入数,返回一个最接近大于等于他的2次方数。
通过将cap 高位1不断右移,返回cap最近2次方的数。如果有同学看不懂这个算法,可以参考这位老哥写的https://blog.csdn.net/fan2012huan/article/details/51097331,解析得非常易懂。
上面的构造函数,并没有初始化Node 数组,我进入put方法看下怎么样初始化的!

添加元素

    public V put(K key, V value) {
        return putVal(hash(key), key, value, false, true);
    }

    /**
     * @param hash hash for key
     * @param key the key
     * @param value the value to put
     * @param onlyIfAbsent if true, don't change existing value
     * @param evict if false, the table is in creation mode.
     * @return previous value, or null if none
     */
    final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
                   boolean evict) {
       //p 如果存在就是链头
        Node<K,V>[] tab; Node<K,V> p; int n, i;
         //数组未初始化
        if ((tab = table) == null || (n = tab.length) == 0)
            n = (tab = resize()).length;
         //根据hash 值 计算数组下标位置,这个位置下没有元素
        if ((p = tab[i = (n - 1) & hash]) == null)
            tab[i] = newNode(hash, key, value, null); //插入元素
        else {//hash 值冲突
            Node<K,V> e; K k;
            if (p.hash == hash &&
                ((k = p.key) == key || (key != null && key.equals(k)))) //key 相等
                e = p;
            else if (p instanceof TreeNode) //红黑树
                e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
            else { 
                for (int binCount = 0; ; ++binCount) { //遍历链表,插入数据
                    if ((e = p.next) == null) { //下一个元素为空,说明已经遍历到最后一个元素 ,插入 终止循环
                        p.next = newNode(hash, key, value, null);
                        if (binCount >= TREEIFY_THRESHOLD - 1) // 到达转化成树链的阈值
                            treeifyBin(tab, hash); //将链表转换成树
                        break;
                    }
                    if (e.hash == hash &&
                        ((k = e.key) == key || (key != null && key.equals(k)))) //出现key 相等
                        break;
                    p = e;
                }
            }
            if (e != null) { // existing mapping for key
                V oldValue = e.value;
                if (!onlyIfAbsent || oldValue == null)
                    e.value = value;
                afterNodeAccess(e); //空方法 ,LinkedHashMap 实现
                return oldValue;
            }
        }
        ++modCount;
        if (++size > threshold) //超出阈值 
            resize();
        afterNodeInsertion(evict);
        return null;
    }

整体流程: 先判断数组为空,没有则使用resize()方法初始化数组,使用hash 值& 数组长度计算下标,下标值为空,直接插入。处理hash冲突情况,先判断链头key是否相等,判断数组Node是否树链,则使用putTreeVal方法插入树链中,否则遍历链表在结尾插入。最后判断数组元素是否超过阈值,使用reszie()扩容。这个流程基本和我们平常写业务代码差不多。

计算hash方法

static final int hash(Object key) {
        int h;
        return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
    }

假设没有这个hash算法,对象hashCode直接&数组长度(n-1),当n比较小的时候,hash虽然是32位整数,但是只要低位是有效的,hash冲突概率比较大,会导致链表比较长。使用hash方法将高16位影响到低16位,将低16位的二进制打散了,减少了hash冲突概率。
resize扩容,初始化数组方法

    final Node<K,V>[] resize() {
        Node<K,V>[] oldTab = table;
        int oldCap = (oldTab == null) ? 0 : oldTab.length;
        int oldThr = threshold;
        int newCap, newThr = 0;
        if (oldCap > 0) {
            if (oldCap >= MAXIMUM_CAPACITY) { //大于最大数组长度,不会进行扩容操作
                threshold = Integer.MAX_VALUE;
                return oldTab;
            }
            else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY &&
                     oldCap >= DEFAULT_INITIAL_CAPACITY)
                newThr = oldThr << 1; // double threshold
        }
        else if (oldThr > 0) // 使用构造函数cap初始化数组长度
            newCap = oldThr;
        else {               // zero initial threshold signifies using defaults
            newCap = DEFAULT_INITIAL_CAPACITY;
            newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY);
        }
        if (newThr == 0) {
            float ft = (float)newCap * loadFactor;
            newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ?
                      (int)ft : Integer.MAX_VALUE);
        }
        threshold = newThr;
        @SuppressWarnings({"rawtypes","unchecked"})
        Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap]; //创建新数组
        table = newTab;
        if (oldTab != null) {
            for (int j = 0; j < oldCap; ++j) {
                Node<K,V> e;
                if ((e = oldTab[j]) != null) {
                    oldTab[j] = null; //删除数组元素
                    if (e.next == null)  //没有链表的元素,直接移动插入
                        newTab[e.hash & (newCap - 1)] = e;
                    else if (e instanceof TreeNode)
                        ((TreeNode<K,V>)e).split(this, newTab, j, oldCap); //切割树链
                    else { //将链条分成两个链条,一个是原数组下标,另一个则下标 x 2移动下标
                        Node<K,V> loHead = null, loTail = null;
                        Node<K,V> hiHead = null, hiTail = null;
                        Node<K,V> next;
                        do {
                            next = e.next;
                            if ((e.hash & oldCap) == 0) {
                                if (loTail == null)
                                    loHead = e;
                                else
                                    loTail.next = e;
                                loTail = e;
                            }
                            else {
                                if (hiTail == null)
                                    hiHead = e;
                                else
                                    hiTail.next = e;
                                hiTail = e;
                            }
                        } while ((e = next) != null); //将两种链条的链头元素插入到数组中
                        if (loTail != null) {
                            loTail.next = null;
                            newTab[j] = loHead;
                        }
                        if (hiTail != null) {
                            hiTail.next = null;
                            newTab[j + oldCap] = hiHead;
                        }
                    }
                }
            }
        }
        return newTab;
    }

整体流程还是比较简单的,先确定数组长度,threshold属性,创建新数组移动copy元素,删除原数组元素。这个方法基本就是初始化table数组,扩容数组,处理链条或树链的内部细节。
if ((e.hash & oldCap) == 0) { 这个写法,看了好久才明白什么意思啊。我们都知道数组长度永远都是2的次方,二进制表现就是最高位是1 其他都是0, 比如默认长度16 ,二进制就是10000。 计算数组长度算法e.hash & (oldCap- 1), 长度-1 二机制的数11111,先数组扩容长度 * 2,计算下标结果下标前面不变,最高位是0还是1,是0保存下标不变,是1表示下标需要x 2.。觉得讲得不清楚,可以看下图
image
下面了解拆分树链的split

/**     
         * @param map the map
         * @param tab the table for recording bin heads
         * @param index the index of the table being split
         * @param bit the bit of hash to split on
         */
        final void split(HashMap<K,V> map, Node<K,V>[] tab, int index, int bit) {
            TreeNode<K,V> b = this;
            // 按hash 计算结果区分两个树链,统计树链的长度
            TreeNode<K,V> loHead = null, loTail = null;
            TreeNode<K,V> hiHead = null, hiTail = null;
            int lc = 0, hc = 0;
            for (TreeNode<K,V> e = b, next; e != null; e = next) {
                next = (TreeNode<K,V>)e.next;
                e.next = null;
                if ((e.hash & bit) == 0) {
                    if ((e.prev = loTail) == null)
                        loHead = e;
                    else
                        loTail.next = e;
                    loTail = e;
                    ++lc;
                }
                else {
                    if ((e.prev = hiTail) == null)
                        hiHead = e;
                    else
                        hiTail.next = e;
                    hiTail = e;
                    ++hc;
                }
            }

            if (loHead != null) {
                if (lc <= UNTREEIFY_THRESHOLD)
                    tab[index] = loHead.untreeify(map); //拆分树链转化成链条结构
                else {
                    tab[index] = loHead;
                    if (hiHead != null) // (else is already treeified)
                        loHead.treeify(tab);
                }
            }
            if (hiHead != null) {
                if (hc <= UNTREEIFY_THRESHOLD)
                    tab[index + bit] = hiHead.untreeify(map);
                else {
                    tab[index + bit] = hiHead;
                    if (loHead != null)
                        hiHead.treeify(tab);
                }
            }
        }

首先要知道TreeNode间接继承Node,拥有单向链条的属性。理解上面流程就很简单了,先遍历树链,按照hash & 数组长度方式,分成移动下标和不移动两个类型树链。再判断链条数量是否大于UNTREEIFY_THRESHOLD(6) ,满足使用untreeify方法拆分成链条。如果树链头不为空,则说明红黑树结构依然存在,使用treeify方法将链表转换成树链。
要理解链表怎么转化成树链,必须深入treeify方法

        final void treeify(Node<K,V>[] tab) {
            TreeNode<K,V> root = null;
            //从this 链头开始遍历
            for (TreeNode<K,V> x = this, next; x != null; x = next) {
                next = (TreeNode<K,V>)x.next;
                x.left = x.right = null;
                if (root == null) { //root 根元素
                    x.parent = null;
                    x.red = false;
                    root = x;
                }
                else {
                    K k = x.key;
                    int h = x.hash;
                    Class<?> kc = null;
                    for (TreeNode<K,V> p = root;;) {
                        int dir, ph;
                        K pk = p.key;
                        if ((ph = p.hash) > h)
                            dir = -1;
                        else if (ph < h)
                            dir = 1;
                        else if ((kc == null &&
                                  (kc = comparableClassFor(k)) == null) ||
                                 (dir = compareComparables(kc, k, pk)) == 0)
                            dir = tieBreakOrder(k, pk);

                        TreeNode<K,V> xp = p;
                        if ((p = (dir <= 0) ? p.left : p.right) == null) { //如果子节点黑红都是null,直接插入指向,否则向下递归
                            x.parent = xp;
                            if (dir <= 0)
                                xp.left = x;
                            else
                                xp.right = x;
                            root = balanceInsertion(root, x); //调整节点位置,着色,左旋或者右旋
                            break;
                        }
                    }
                }
            }
            moveRootToFront(tab, root); //判断root是否根节点,如果不是插入根节点位置
        }

看到这里,大家是不是跟我一样,觉得代码写得还是比较简单的,没什么难度啊。下面代码就不是这样了,没有一点黑红树基础的同学,麻烦先补下课先,不然你很难理解代码什么意思。

红黑树的性质
  1. 节点是红色或黑色。
  2. 根节点是黑色。
  3. 所有叶子都是黑色(叶子是NIL节点)
  4. 每个红色节点的两个孩子节点必须是黑色. (从叶子到根的所有路径上不能有两个连续的红色节点)
  5. 从任意节点到其叶子节点的所有路径都包含相同数目的黑色节点。

红黑树.png
在红黑树插入或删除情况,上面的特征可能会被打破。所以要使用balanceInsertion方法平衡树链结构。我们在分析代码的时候,一定要把上面5个性质带上有助于理解。

static <K,V> TreeNode<K,V> balanceInsertion(TreeNode<K,V> root,
                                                    TreeNode<K,V> x) {
            x.red = true; //默认x 就是红色 
            for (TreeNode<K,V> xp, xpp, xppl, xppr;;) {
                //x 的父节点xp为空,则说明x 就是根节点 , 根据1属性,节点变成黑色就可以了
                if ((xp = x.parent) == null) {
                    x.red = false;
                    return x;
                }
                //父节点黑色,插入红色,以上条件都满足,终止循环
                else if (!xp.red || (xpp = xp.parent) == null)
                    return root;
               
                if (xp == (xppl = xpp.left)) {
                    //见图1-1 ,不满足性质4、性质5,改变着色,向上递归
                    if ((xppr = xpp.right) != null && xppr.red) {
                        xppr.red = false;
                        xp.red = false;
                        xpp.red = true;
                        x = xpp;
                    }
                    else {
                        //见图1-2 不满足 性质4,xp左移 
                        if (x == xp.right) {
                            root = rotateLeft(root, x = xp);
                            xpp = (xp = x.parent) == null ? null : xp.parent;
                        }
                        // 图1-3情况 不满足性质4,xpp 右移 调整着色
                        if (xp != null) {
                            xp.red = false;
                            if (xpp != null) {
                                xpp.red = true;
                                root = rotateRight(root, xpp);
                            }
                        }
                    }
                }
                else { //位置相反,流程一样,位置互换即可
                    if (xppl != null && xppl.red) {
                        xppl.red = false;
                        xp.red = false;
                        xpp.red = true;
                        x = xpp;
                    }
                    else {
                        if (x == xp.left) {
                            root = rotateRight(root, x = xp);
                            xpp = (xp = x.parent) == null ? null : xp.parent;
                        }
                        if (xp != null) {
                            xp.red = false;
                            if (xpp != null) {
                                xpp.red = true;
                                root = rotateLeft(root, xpp);
                            }
                        }
                    }
                }
            }
        }

上面的代码配合这三个图片,看出1-3 处理完后,刚好是图1-1 处理条件,向上递归,直到终止循环。这个就是插入后,调整红黑树平衡整体流程。这些代码,我当时看了一整天也明白什么意思,后面结合红黑树知识,自己画图对应代码流程,理解起来就很简单了。
图1-1
图1-2
图1-3
有了这些理解在去看putTreeVal方法就很简单了

        /**
         * Tree version of putVal.
         */
        final TreeNode<K,V> putTreeVal(HashMap<K,V> map, Node<K,V>[] tab,
                                       int h, K k, V v) {
            Class<?> kc = null;
            boolean searched = false;
            TreeNode<K,V> root = (parent != null) ? root() : this;
            for (TreeNode<K,V> p = root;;) {
                int dir, ph; K pk;
                if ((ph = p.hash) > h)
                    dir = -1;
                else if (ph < h)
                    dir = 1;
                else if ((pk = p.key) == k || (k != null && k.equals(pk)))
                    return p;
               // 获取key 类型,判断是是compare接口 比较大小
                else if ((kc == null &&
                          (kc = comparableClassFor(k)) == null) ||
                         (dir = compareComparables(kc, k, pk)) == 0) {
                    if (!searched) {
                        TreeNode<K,V> q, ch;
                        searched = true;
                        if (((ch = p.left) != null &&
                             (q = ch.find(h, k, kc)) != null) ||
                            ((ch = p.right) != null &&
                             (q = ch.find(h, k, kc)) != null))
                            return q;
                    }
                    dir = tieBreakOrder(k, pk); //使用类名,原生hash比较大小
                }

                TreeNode<K,V> xp = p;
                if ((p = (dir <= 0) ? p.left : p.right) == null) {
                    Node<K,V> xpn = xp.next;
                    TreeNode<K,V> x = map.newTreeNode(h, k, v, xpn);
                    if (dir <= 0)
                        xp.left = x;
                    else
                        xp.right = x;
                    xp.next = x;
                    x.parent = x.prev = xp;
                    if (xpn != null)
                        ((TreeNode<K,V>)xpn).prev = x;
                    moveRootToFront(tab, balanceInsertion(root, x));
                    return null;
                }
            }
        }

遍历树链,比较hash 大小,找到子节点插入位置,使用balanceInsertion方法平衡红黑树。

获取value get()方法分析

    public V get(Object key) {
        Node<K,V> e;
        return (e = getNode(hash(key), key)) == null ? null : e.value;
    }

    final Node<K,V> getNode(int hash, Object key) {
        Node<K,V>[] tab; Node<K,V> first, e; int n; K k;
        if ((tab = table) != null && (n = tab.length) > 0 && //空数组不指望了
            (first = tab[(n - 1) & hash]) != null) { //计算数组下标位置
            if (first.hash == hash && // 获取到value 
                ((k = first.key) == key || (key != null && key.equals(k))))
                return first;
            if ((e = first.next) != null) {
                if (first instanceof TreeNode) //使用红黑树查找
                    return ((TreeNode<K,V>)first).getTreeNode(hash, key);
                do {
                    if (e.hash == hash &&
                        ((k = e.key) == key || (key != null && key.equals(k))))
                        return e;
                } while ((e = e.next) != null); //先查找,在遍历
            }
        }
        return null;
    }

看下红黑树查找

        final TreeNode<K,V> getTreeNode(int h, Object k) {
            return ((parent != null) ? root() : this).find(h, k, null);
        }

        final TreeNode<K,V> root() {
            for (TreeNode<K,V> r = this, p;;) {
                if ((p = r.parent) == null)
                    return r;
                r = p;
            }
        }

        final TreeNode<K,V> find(int h, Object k, Class<?> kc) {
            TreeNode<K,V> p = this;
            do {
                int ph, dir; K pk;
                TreeNode<K,V> pl = p.left, pr = p.right, q;
                if ((ph = p.hash) > h) //大于往左边
                    p = pl;
                else if (ph < h) //小于往右边
                    p = pr;
                else if ((pk = p.key) == k || (k != null && k.equals(pk))) //找到了
                    return p;
                else if (pl == null) //遍历到最底下一个节点为止,这个p节点肯定没有字节点
                    p = pr;
                else if (pr == null)
                    p = pl;
                else if ((kc != null ||
                          (kc = comparableClassFor(k)) != null) &&
                         (dir = compareComparables(kc, k, pk)) != 0)
                    p = (dir < 0) ? pl : pr;
                else if ((q = pr.find(h, k, kc)) != null)
                    return q;
                else
                    p = pl;
            } while (p != null);
            return null;
        }

删除节点

    public V remove(Object key) {
        Node<K,V> e;
        return (e = removeNode(hash(key), key, null, false, true)) == null ?
            null : e.value;
    }
    final Node<K,V> removeNode(int hash, Object key, Object value,
                               boolean matchValue, boolean movable) {
        Node<K,V>[] tab; Node<K,V> p; int n, index;
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (p = tab[index = (n - 1) & hash]) != null) {
            Node<K,V> node = null, e; K k; V v;
            if (p.hash == hash &&
                ((k = p.key) == key || (key != null && key.equals(k))))  //根据key 找到value
                node = p;
            else if ((e = p.next) != null) {
                if (p instanceof TreeNode)
                    node = ((TreeNode<K,V>)p).getTreeNode(hash, key); //使用红黑树寻找
                else {
                    do { //链表遍历查找
                        if (e.hash == hash &&
                            ((k = e.key) == key ||
                             (key != null && key.equals(k)))) { 
                            node = e;
                            break;
                        }
                        p = e;
                    } while ((e = e.next) != null); 
                }
            }
            if (node != null && (!matchValue || (v = node.value) == value ||
                                 (value != null && value.equals(v)))) {
                if (node instanceof TreeNode)
                    ((TreeNode<K,V>)node).removeTreeNode(this, tab, movable); //在红黑树中删除,这个有点复杂
                else if (node == p) //如果p刚好是链头,指引删除
                    tab[index] = node.next;
                else //p 刚好表示前一位,node 要被删除,p指向node 下一个节点,即可删除。
                    p.next = node.next;
                ++modCount;
                --size;
                afterNodeRemoval(node);
                return node;
            }
        }
        return null;
    }

数组和链表删除还是比较简单的,下标指引变更即可。红黑树元素删除就相当复杂了,比插入数据要麻烦等多了

        final void removeTreeNode(HashMap<K,V> map, Node<K,V>[] tab,
                                  boolean movable) {
            int n;
            if (tab == null || (n = tab.length) == 0)
                return;
            int index = (n - 1) & hash;
            TreeNode<K,V> first = (TreeNode<K,V>)tab[index], root = first, rl;
            TreeNode<K,V> succ = (TreeNode<K,V>)next, pred = prev;
            if (pred == null) //前置节点为空,将链头指向succ,即子节点
                tab[index] = first = succ;
            else  //将前置节点指向next,相当于删除this
                pred.next = succ;
            if (succ != null) //删除
                succ.prev = pred;
            if (first == null)
                return;
            if (root.parent != null)
                root = root.root();
            if (root == null //根据红黑树特性,最短路径不能小于最长路径1/2,树链长度不足 2 +4 = 6,拆分
                || (movable
                    && (root.right == null
                        || (rl = root.left) == null
                        || rl.left == null))) {
                tab[index] = first.untreeify(map);  // too small
                return;
            }
            TreeNode<K,V> p = this, pl = left, pr = right, replacement;
            if (pl != null && pr != null) {
                TreeNode<K,V> s = pr, sl;
                while ((sl = s.left) != null) // 找到右孩子的左子树找最小值
                    s = sl;
                boolean c = s.red; s.red = p.red; p.red = c; // 交换颜色
                TreeNode<K,V> sr = s.right;
                TreeNode<K,V> pp = p.parent;
                if (s == pr) { // s 是  p直系右子节点 ,交换位置
                    p.parent = s;
                    s.right = p;
                }
                else {
                    TreeNode<K,V> sp = s.parent;
                    if ((p.parent = sp) != null) { //p 移动到s 位置
                        if (s == sp.left)
                            sp.left = p;
                        else
                            sp.right = p;
                    }
                    if ((s.right = pr) != null) //s 移动到位置
                        pr.parent = s;
                }
                p.left = null;
                if ((p.right = sr) != null)   //p 右节点 和s 关联
                    sr.parent = p;
                if ((s.left = pl) != null)  //s 和p子左边节点pl关联起来
                    pl.parent = s;
                if ((s.parent = pp) == null) // s 父节点 指向 p 父节点  s 已经完全替换p 
                    root = s;
                else if (p == pp.left) 
                    pp.left = s;
                else
                    pp.right = s;
                if (sr != null)
                    replacement = sr;
                else
                    replacement = p;
            }
            else if (pl != null) //只有左节点情况
                replacement = pl;
            else if (pr != null) //只有右节点情况
                replacement = pr;
            else
                replacement = p;
            if (replacement != p) {
                TreeNode<K,V> pp = replacement.parent = p.parent;
                if (pp == null)
                    (root = replacement).red = false;
                else if (p == pp.left)
                    pp.left = replacement;
                else
                    pp.right = replacement;
                p.left = p.right = p.parent = null;
            }
            //调整元素在红黑树中着色,保持平衡
            TreeNode<K,V> r = p.red ? root : balanceDeletion(root, replacement);

            if (replacement == p) {  // 删除元素p
                TreeNode<K,V> pp = p.parent;
                p.parent = null;
                if (pp != null) {
                    if (p == pp.left)
                        pp.left = null;
                    else if (p == pp.right)
                        pp.right = null;
                }
            }
            if (movable)
                moveRootToFront(tab, r);
        }

简单说一下,这个方法代码是思路,首先处理最简单的链表删除元素,将链表中对象的引用改成旁边的元素。
红黑树删除元素则比较复杂点,考虑的情况比较多。比如删除节点在底下,没有子节点,还要考虑元素的着手,如果是红色则可以直接删除,黑色需要着色。如果拥有一个左节点、右节点或者两个都要,情况就更加复杂了。这个方法大部分代码都是在处理左右节点不为空的情况,将删除节点位置和右节点下最左边的子节点交换位置,并且交换着色。为什么要这么做呢?主要是因为这个节点的hash值和删除节点值最为接近,红黑树的特性小于在左边,大于在右边,大于删除值的最小值,就整个红黑树最接近删除的值。在调整红黑树着色,任务就算完成了。

心得

整体来说HashMap的代码并不是很复杂,底层业务代码,其实就跟我们平常写的crud代码。但是我在看代码的时候,有一些地方好难看懂啊,主要是涉及到红黑树的知识点。很多程序员(包括我)平常都会抱怨数据结构没什么用,在工作中用不上,其实我们掌握不够熟练,在工作上不知道使用。

参考文献

红黑树特性
hash算法解析

查看原文

赞 0 收藏 0 评论 0

神易风 赞了文章 · 4月23日

红黑树详细分析,看了都说好

红黑树简介

红黑树是一种自平衡的二叉查找树,是一种高效的查找树。它是由 Rudolf Bayer 于1972年发明,在当时被称为对称二叉 B 树(symmetric binary B-trees)。后来,在1978年被 Leo J. Guibas 和 Robert Sedgewick 修改为如今的红黑树。红黑树具有良好的效率,它可在 O(logN) 时间内完成查找、增加、删除等操作。因此,红黑树在业界应用很广泛,比如 Java 中的 TreeMap,JDK 1.8 中的 HashMap、C++ STL 中的 map 均是基于红黑树结构实现的。考虑到红黑树是一种被广泛应用的数据结构,所以我们很有必要去弄懂它。

红黑树的性质

学过二叉查找树的同学都知道,普通的二叉查找树在极端情况下可退化成链表,此时的增删查效率都会比较低下。为了避免这种情况,就出现了一些自平衡的查找树,比如 AVL,红黑树等。这些自平衡的查找树通过定义一些性质,将任意节点的左右子树高度差控制在规定范围内,以达到平衡状态。以红黑树为例,红黑树通过如下的性质定义实现自平衡:

  1. 节点是红色或黑色。
  2. 根是黑色。
  3. 所有叶子都是黑色(叶子是NIL节点)。
  4. 每个红色节点必须有两个黑色的子节点。(从每个叶子到根的所有路径上不能有两个连续的红色节点。)
  5. 从任一节点到其每个叶子的所有简单路径都包含相同数目的黑色节点(简称黑高)。

有了上面的几个性质作为限制,即可避免二叉查找树退化成单链表的情况。但是,仅仅避免这种情况还不够,这里还要考虑某个节点到其每个叶子节点路径长度的问题。如果某些路径长度过长,那么,在对这些路径上的及诶单进行增删查操作时,效率也会大大降低。这个时候性质4和性质5用途就凸显了,有了这两个性质作为约束,即可保证任意节点到其每个叶子节点路径最长不会超过最短路径的2倍。原因如下:

当某条路径最短时,这条路径必然都是由黑色节点构成。当某条路径长度最长时,这条路径必然是由红色和黑色节点相间构成(性质4限定了不能出现两个连续的红色节点)。而性质5又限定了从任一节点到其每个叶子节点的所有路径必须包含相同数量的黑色节点。此时,在路径最长的情况下,路径上红色节点数量 = 黑色节点数量。该路径长度为两倍黑色节点数量,也就是最短路径长度的2倍。举例说明一下,请看下图:

上图画出了从根节点 M 出发的到其叶子节点的最长和最短路径。这里偷懒只画出了两条最长路径,实际上最长路径有4条,分别为:

M -> Q -> O -> N

M -> Q -> O -> p

M -> Q -> Y -> X

M -> Q -> Y -> Z

长度为4,最短路径为 M -> E,长度为2。最长路径的长度正好为最短路径长度的2倍。

前面说了关于红黑树的一些性质,这里还需要补充一些其他方面的东西。在红黑树简介一节中说到红黑树被发明出来的时候并不叫红黑树,而是叫做对称二叉 B 树,从名字中可发现红黑树和 B 树(这里指的是2-3树)或许有一定的关联,事实也正是如此。如果对红黑树的性质稍加修改,就能让红黑树和B树形成一一对应的关系。关于红黑树和 B 树关系的细节这里不展开说明了,有兴趣的同学可以参考《算法》第4版,那本书上讲的很透彻。

红黑树操作

红黑树的基本操作和其他树形结构一样,一般都包括查找、插入、删除等操作。前面说到,红黑树是一种自平衡的二叉查找树,既然是二叉查找树的一种,那么查找过程和二叉查找树一样,比较简单,这里不再赘述。相对于查找操作,红黑树的插入和删除操作就要复杂的多。尤其是删除操作,要处理的情况比较多,不过大家如果静下心来去看,会发现其实也没想的那么难。好了,废话就说到这,接下来步入正题吧。

旋转操作

在分析插入和删除操作前,这里需要插个队,先说明一下旋转操作,这个操作在后续操作中都会用得到。旋转操作分为左旋和右旋,左旋是将某个节点旋转为其右孩子的左孩子,而右旋是节点旋转为其左孩子的右孩子。这话听起来有点绕,所以还是请看下图:

上图包含了左旋和右旋的示意图,这里以右旋为例进行说明,右旋节点 M 的步骤如下:

  1. 将节点 M 的左孩子引用指向节点 E 的右孩子
  2. 将节点 E 的右孩子引用指向节点 M,完成旋转

上面分析了右旋操作,左旋操作与此类似,大家有兴趣自己画图试试吧,这里不再赘述了。旋转操作本身并不复杂,这里先分析到这吧。

插入

红黑树的插入过程和二叉查找树插入过程基本类似,不同的地方在于,红黑树插入新节点后,需要进行调整,以满足红黑树的性质。性质1规定红黑树节点的颜色要么是红色要么是黑色,那么在插入新节点时,这个节点应该是红色还是黑色呢?答案是红色,原因也不难理解。如果插入的节点是黑色,那么这个节点所在路径比其他路径多出一个黑色节点,这个调整起来会比较麻烦(参考红黑树的删除操作,就知道为啥多一个或少一个黑色节点时,调整起来这么麻烦了)。如果插入的节点是红色,此时所有路径上的黑色节点数量不变,仅可能会出现两个连续的红色节点的情况。这种情况下,通过变色和旋转进行调整即可,比之前的简单多了。

接下来,将分析插入红色节点后红黑树的情况。这里假设要插入的节点为 N,N 的父节点为 P,祖父节点为 G,叔叔节点为 U。插入红色节点后,会出现5种情况,分别如下:

情况一:

插入的新节点 N 是红黑树的根节点,这种情况下,我们把节点 N 的颜色由红色变为黑色,性质2(根是黑色)被满足。同时 N 被染成黑色后,红黑树所有路径上的黑色节点数量增加一个,性质5(从任一节点到其每个叶子的所有简单路径都包含相同数目的黑色节点)仍然被满足。

情况二:

N 的父节点是黑色,这种情况下,性质4(每个红色节点必须有两个黑色的子节点)和性质5没有受到影响,不需要调整。

情况三:

N 的父节点是红色(节点 P 为红色,其父节点必然为黑色),叔叔节点 U 也是红色。由于 P 和 N 均为红色,所有性质4被打破,此时需要进行调整。这种情况下,先将 P 和 U 的颜色染成黑色,再将 G 的颜色染成红色。此时经过 G 的路径上的黑色节点数量不变,性质5仍然满足。但需要注意的是 G 被染成红色后,可能会和它的父节点形成连续的红色节点,此时需要递归向上调整。

情况四:

N 的父节点为红色,叔叔节点为黑色。节点 N 是 P 的右孩子,且节点 P 是 G 的左孩子。此时先对节点 P 进行左旋,调整 N 与 P 的位置。接下来按照情况五进行处理,以恢复性质4。

这里需要特别说明一下,上图中的节点 N 并非是新插入的节点。当 P 为红色时,P 有两个孩子节点,且孩子节点均为黑色,这样从 G 出发到各叶子节点路径上的黑色节点数量才能保持一致。既然 P 已经有两个孩子了,所以 N 不是新插入的节点。情况四是由以 N 为根节点的子树中插入了新节点,经过调整后,导致 N 被变为红色,进而导致了情况四的出现。考虑下面这种情况(PR 节点就是上图的 N 节点):

如上图,插入节点 N 并按情况三处理。此时 PR 被染成了红色,与 P 节点形成了连续的红色节点,这个时候就需按情况四再次进行调整。

情况五:

N 的父节点为红色,叔叔节点为黑色。N 是 P 的左孩子,且节点 P 是 G 的左孩子。此时对 G 进行右旋,调整 P 和 G 的位置,并互换颜色。经过这样的调整后,性质4被恢复,同时也未破坏性质5。

插入总结

上面五种情况中,情况一和情况二比较简单,情况三、四、五稍复杂。但如果细心观察,会发现这三种情况的区别在于叔叔节点的颜色,如果叔叔节点为红色,直接变色即可。如果叔叔节点为黑色,则需要选选择,再交换颜色。当把这三种情况的图画在一起就区别就比较容易观察了,如下图:

删除

相较于插入操作,红黑树的删除操作则要更为复杂一些。删除操作首先要确定待删除节点有几个孩子,如果有两个孩子,不能直接删除该节点。而是要先找到该节点的前驱(该节点左子树中最大的节点)或者后继(该节点右子树中最小的节点),然后将前驱或者后继的值复制到要删除的节点中,最后再将前驱或后继删除。由于前驱和后继至多只有一个孩子节点,这样我们就把原来要删除的节点有两个孩子的问题转化为只有一个孩子节点的问题,问题被简化了一些。我们并不关心最终被删除的节点是否是我们开始想要删除的那个节点,只要节点里的值最终被删除就行了,至于树结构如何变化,这个并不重要。

红黑树删除操作的复杂度在于删除节点的颜色,当删除的节点是红色时,直接拿其孩子节点补空位即可。因为删除红色节点,性质5(从任一节点到其每个叶子的所有简单路径都包含相同数目的黑色节点)仍能够被满足。当删除的节点是黑色时,那么所有经过该节点的路径上的黑节点数量少了一个,破坏了性质5。如果该节点的孩子为红色,直接拿孩子节点替换被删除的节点,并将孩子节点染成黑色,即可恢复性质5。但如果孩子节点为黑色,处理起来就要复杂的多。分为6种情况,下面会展开说明。

在展开说明之前,我们先做一些假设,方便说明。这里假设最终被删除的节点为X(至多只有一个孩子节点),其孩子节点为NX的兄弟节点为SS的左节点为 SL,右节点为 SR。接下来讨论是建立在节点 X 被删除,节点 N 替换X的基础上进行的。这里说明把被删除的节点X特地拎出来说一下的原因是防止大家误以为节点N会被删除,不然后面就会看不明白。

在上面的基础上,接下来就可以展开讨论了。红黑树删除有6种情况,分别是:

情况一:

N 是新的根。在这种情形下,我们就做完了。我们从所有路径去除了一个黑色节点,而新根是黑色的,所以性质都保持着。

上面是维基百科中关于红黑树删除的情况一说明,由于没有配图,看的有点晕。经过思考,我觉得可能会是下面这种情形:

要删除的节点 X 是根节点,且左右孩子节点均为空节点,此时将节点 X 用空节点替换完成删除操作。

可能还有其他情形,大家如果知道,烦请告知。

情况二:

S 为红色,其他节点为黑色。这种情况下可以对 N 的父节点进行左旋操作,然后互换 P 与 S 颜色。但这并未结束,经过节点 P 和 N 的路径删除前有3个黑色节点(P -> X -> N),现在只剩两个了(P -> N)。比未经过 N 的路径少一个黑色节点,性质5仍不满足,还需要继续调整。不过此时可以按照情况四、五、六进行调整。

情况三:

N 的父节点,兄弟节点 S 和 S 的孩子节点均为黑色。这种情况下可以简单的把 S 染成红色,所有经过 S 的路径比之前少了一个黑色节点,这样经过 N 的路径和经过 S 的路径黑色节点数量一致了。但经过 P 的路径比不经过 P 的路径少一个黑色节点,此时需要从情况一开始对 P 进行平衡处理。

情况四:

N 的父节点是红色,S 和 S 孩子为黑色。这种情况比较简单,我们只需交换 P 和 S 颜色即可。这样所有通过 N 的路径上增加了一个黑色节点,所有通过 S 的节点的路径必然也通过 P 节点,由于 P 与 S 只是互换颜色,并不影响这些路径。

这里需要特别说明一下,上图中的节点 N 并非是新插入的节点。当 P 为红色时,P 有两个孩子节点,且孩子节点均为黑色,这样从 G 出发到各叶子节点路径上的黑色节点数量才能保持一致。既然 P 已经有两个孩子了,所以 N 不是新插入的节点。情况四是由以 N 为根节点的子树中插入了新节点,经过调整后,导致 N 被变为红色,进而导致了情况四的出现。考虑下面这种情况(PR 节点就是上图的 N 节点):

情况五:

S 为黑色,S 的左孩子为红色,右孩子为黑色。N 的父节点颜色可红可黑,且 N 是 P 左孩子。这种情况下对 S 进行右旋操作,并互换 S 和 SL 的颜色。此时,所有路径上的黑色数量仍然相等,N 兄弟节点的由 S 变为了 SL,而 SL 的右孩子变为红色。接下来我们到情况六继续分析。

情况六:

S 为黑色,S 的右孩子为红色。N 的父节点颜色可红可黑,且 N 是其父节点左孩子。这种情况下,我们对 P 进行左旋操作,并互换 P 和 S 的颜色,并将 SR 变为黑色。因为 P 变为黑色,所以经过 N 的路径多了一个黑色节点,经过 N 的路径上的黑色节点与删除前的数量一致。对于不经过 N 的路径,则有以下两种情况:

  1. 该路径经过 N 新的兄弟节点 SL ,那它之前必然经过 S 和 P。而 S 和 P 现在只是交换颜色,对于经过 SL 的路径不影响。
  2. 该路径经过 N 新的叔叔节点 SR,那它之前必然经过 P、 S 和 SR,而现在它只经过 S 和 SR。在对 P 进行左旋,并与 S 换色后,经过 SR 的路径少了一个黑色节点,性质5被打破。另外,由于 S 的颜色可红可黑,如果 S 是红色的话,会与 SR 形成连续的红色节点,打破性质4(每个红色节点必须有两个黑色的子节点)。此时仅需将 SR 由红色变为黑色即可同时恢复性质4和性质5(从任一节点到其每个叶子的所有简单路径都包含相同数目的黑色节点。)。

删除总结

红黑树删除的情况比较多,大家刚开始看的时候可能会比较晕。可能会产生这样的疑问,为啥红黑树会有这种删除情况,为啥又会有另一种情况,它们之间有什么联系和区别?和大家一样,我刚开始看的时候也有这样的困惑,直到我把所有情况对应的图形画在一起时,拨云见日,一切都明了了。此时天空中出现了4个字,原来如此、原来如此、原来如此。所以,请看图吧:

总结

红黑树是一种重要的二叉树,应用广泛,但在很多数据结构相关的书本中出现的次数并不多。很多书中要么不说,要么就一笔带过,并不会进行详细的分析,这可能是因为红黑树比较复杂的缘故。我在学习红黑树的时候也找了很多资料,但总体感觉讲的都不太好。尤其是在我学习删除操作的时候,很多资料是实在人看不下去,看的我很痛苦。直到我看到维基百科上关于红黑树的分析时,很是欣喜。这篇文章分析的很有条理,言简意赅,比很多资料好了太多。本文对红黑树的分析也主要参考了维基百科中的红黑树分析,并对维基百科中容易让人产生疑问和误解的地方进行了说明。同时维基百科中文版红黑树文中的图片较为模糊,这里我重新进行了绘制。需要说明的是,维基百科中文版无法打开了,文中关于维基百科的链接都是英文版的。另外在给大家推荐一个数据结构可视化的网站,里面包含常见数据结构可视化过程,地址为:t.cn/RZFgryr

另外,由于红黑树本身比较复杂,实现也较为复杂。在写这篇文章之前,我曾尝试过用 Java 语言实现红黑树的增删操作,最终只写出了新增节点操作,删除没做出来。而且自己写的新增逻辑实在在太繁琐,写的不好看,没法拿出来 show。所以最后把 Java 中的 TreeMap 增删相关源码拷出来,按照自己的需求把源码修改了一下,也勉强算是实现了红黑树吧。代码放到了 github 上,传送门 -> RBTree.java

最后,如果你也在学习红黑树,希望这篇文章能够帮助到你。另外,由于红黑树本身比较复杂,加之本人水平有限,难免会出一些错误。如果有错,还望大家指出来,我们共同讨论。

参考

本文在知识共享许可协议 4.0 下发布,转载请注明出处
作者:coolblog
为了获得更好的分类阅读体验,
请移步至本人的个人博客:http://www.coolblog.xyz

cc
本作品采用知识共享署名-非商业性使用-禁止演绎 4.0 国际许可协议进行许可。

查看原文

赞 104 收藏 430 评论 41

认证与成就

  • 获得 24 次点赞
  • 获得 2 枚徽章 获得 0 枚金徽章, 获得 1 枚银徽章, 获得 1 枚铜徽章

擅长技能
编辑

(゚∀゚ )
暂时没有

开源项目 & 著作
编辑

(゚∀゚ )
暂时没有

注册于 2018-04-23
个人主页被 974 人浏览