老马啸西风

老马啸西风 查看完整档案

填写现居城市  |  填写毕业院校  |  填写所在公司/组织 houbb.github.io/ 编辑
编辑
_ | |__ _ _ __ _ | '_ \| | | |/ _` | | |_) | |_| | (_| | |_.__/ \__,_|\__, | |___/ 个人简介什么都没有

个人动态

老马啸西风 发布了文章 · 2020-12-12

技术架构的演进之路: 为什么需要微服务?

技术架构的演进之路

整体发展概览

服务架构一直处于演变之中,为了适合自己的业务,不断的去调整。

整体的发展历程如下:

输入图片说明

开发者视角

从一个 java 开发者,感受大概经历了下面几个历程:

第一阶段:单体架构

早期,大部分IT系统都是单体系统,例如传统的SSH架构,此时前后端也没有分离,UI组件也包含在了控制层:

输入图片说明

这个也就是老马刚毕业时候的架构,SSH 基本是面试必问。

不过现在这些都发生了一些变化,主流已经变成了 spring mvc + spring contaienr + mybatis。

只能说,spring,java 界永远的春天!

第二阶段:分布式架构

为了方便给系统扩容,以及增加系统的复用性,出现分布式系统。

另一方面,系统模块快速膨胀,为了降低系统内部的复杂度,于是对系统模块进行拆解,分不到不同的系统中,降低模块耦合,加快迭代速度。

ps: 其实主要是降低单个应用的复杂性,让每一个应用专注于一件事情。这样可维护成本大大降低,换言之,开发完后以后,可以让一个刚毕业的新人做运维。把开发者裁掉,降低成本。

主流主要是 SOA 和 MSA 两种体系。

SOA

早期的分布式系统是基于面向服务的架构SOA。

SOA是微服务的前身,主要是为了摆脱单体应用的问题,达到以下效果:

  1. 充分利用现有的基础设施;
  2. SOA体系结构依赖于消息传递(AMQP,MSMQ)和SOAP作为主要的远程访问协议。
  3. 快速响应业务变化;

架构图如下:

输入图片说明

异构系统,也可以通过消息中间件的协议转换进行相互调用。

这个我倒是接触过一段时间,当时业务系统是 C# 开发,我所在的后端服务使用的是 java 技术开发。当时的协议使用的是 webservice。

但是用起来感觉不是很顺畅,主要缺点如下:

(1)WebService中常用的SOAP通信协议,通常使用XML格式进行通信,数据冗余,协议过重

(2)服务治理很不完善。

后来逐渐演变为了现在的 MSA(Micro-Service Archeticture 微服务架构),从而实现了更加松耦合以及更加灵活的系统。

MSA

微服务是一种软件开发技术——面向服务的体系结构(SOA)体系结构样式的变体,它将应用程序构造为松散耦合服务的集合。

在微服务体系结构中,服务是细粒度的,协议是轻量级的

  • 微服务架构图示

微服务架构图示

优点

微服务架构有很多重要的优点。

首先,它解决了复杂性问题。它将单体应用分解为一组服务。虽然功能总量不变,但应用程序已被分解为可管理的模块或服务。

这些服务定义了明确的RPC或消息驱动的API边界。微服务架构强化了应用模块化的水平,而这通过单体代码库很难实现。

因此,微服务开发的速度要快很多,更容易理解和维护。

第三,微服务架构可以使每个微服务独立部署。

最后,微服务架构使得每个服务都可独立扩展。

现在这种架构模式已经成为主流,个人感受最深的就是如果你只负责单一服务,你可以把他理解的比较透彻,而且维护起来也没有那么复杂。如果有功能变更,只上线对应的应用即可。

缺点

微服务的一些想法在实践上是好的,但当整体实现时也会呈现出其复杂性。

  • 运维开销及成本增加
  • 必须有坚实的 DevOps 开发运维一体化技能
  • 隐式接口及接口匹配问题
  • 代码重复
  • 分布式系统的复杂性
  • 异步机制
  • 可测性的挑战

个人感觉微服务最大的问题在于系统的拆分之后,很难有一个人可以知道系统的全貌,所以定位问题变得非常复杂。

举个例子,如果交易发生一笔失败,你可能要从API网关=》业务系统=》交易核心=》支付核心=》风控系统问一遍才能知道原因,最后发现是对底层的系统返回了一个失败,这里涉及到多个系统的沟通成本,基本上半天的时间就没了。

SOA vs 微服务

SOA vs 微服务

SOA微服务架构
应用程序服务的可重用性的最大化专注于解耦
系统性的改变需要修改整体系统性的改变是创建一个新的服务
DevOps和持续交付正在变得流行,但还不是主流强烈关注DevOps和持续交付
专注于业务功能重用更重视“上下文边界”的概念
通信使用企业服务总线ESB对于通信而言,使用较少精细和简单的消息系统
支持多种消息协议使用轻量级协议,例如HTTP,REST或Thrift API
对部署到它的所有服务使用通用平台应用程序服务器不是真的被使用,通常使用云平台
容器(如Docker)的使用不太受欢迎容器在微服务方面效果很好
SOA服务共享数据存储每个微服务可以有一个独立的数据存储
共同的治理和标准轻松的治理,更加关注团队协作和选择自由

挑战

微服务的挑战可以概述如下:

  • API Gateway
  • 服务间调用
  • 服务发现
  • 服务容错
  • 服务部署
  • 数据调用

PatternsRelatedToMicroservices

不过幸运的是,很多成熟的中间件,已经为我们解决了这些问题。

第一代微服务框架

dubbo 的架构

Dubbo 的架构如下:

struct

dubbo 针对 rpc 这部分做的很好,单也仅此而已。

但是为什么还是会这么火爆呢?

很多架构的升级都会有历史包袱,除非你是一家新公司,全新的应用。

大部分的应用都是 spring 或者 springboot 的,所以现在大部分公司都是 springboot + dubbo 的技术选型方案,这样可以让架构的平滑的迁移。

如果你们公司是全新的技术选型,可以考虑 spring cloud。

spring cloud 架构

Spring Cloud 架构如下:

输入图片说明

你会发现 spring cloud 可以说是 java 技术栈中,比较完善的微服务框架。

当然,spring 再牛,负责的声明周期也只是 jvm 之内,应用的部署运维也是需要考虑的。

输入图片说明

每一项技术都有其优势和局限性,所以需要结合使用。

推荐阅读:

Microservice Architectures With Spring Cloud and Docker

目前 docker 虚拟化技术如日中天,结合 k8s 掌托。

我选称这盛世为,喝不起咖啡的打工人,在春天的货船上,996 搬砖!

下一代微服务:Service Mesh?

Service Mesh 也是目前比较火爆的技术,我们后续进行详解。

个人感悟

技术架构的演进和生物的进化是类似的,物竞天择,适者生存。

学习技术也不能只局限于现在这一刻,要学会去回顾技术的历史,知道为什么是这样?如果有能力,也可以引领技术的未来,为什么不是这样呢?

我觉得自己很幸运,最初接触的是单体应用,是 spring xml 配置的时代。

我觉得自己很不幸,框架层出不穷,技术日新月异,如果不持续学习,不出 5 年,就会被彻底淘汰。

为了不被那么快淘汰,本系列将从微服务的发展历史,理论知识,入门使用,应用实战,实现原理,重复造轮子等方面,逐渐理解微服务。

我是老马,期待与你,一起见证开发者的春天!

深入学习

查看原文

赞 0 收藏 0 评论 0

老马啸西风 发布了文章 · 2020-12-12

java 如何从零实现一个数据库差异对比工具?

对比数据的痛苦

不知道你是否也像我一样,在快乐编写代码的时候,必须进行一些数据库的数据对比工作。

诚然,一般的数据差异,比如是每一行的内容不同,市场上有比较成熟的 compare2 等对比工具。

但是如果是对比数据的每一列是否相同,这个就会变得比较麻烦。

对比

v1.0 纯人工对比

我们在做一些数据迁移等功能时,经常需要对比数据是否正确,最常见的方法就是人工一列一列的对比。

一开始老马也是这么和同事对的,对了几天之后感觉效率实在是低,而且还容易看花眼。

于是我就是琢磨,这个东西用程序对比应该会简单很多。

v2.0 半人工对比

说干就干,我花半天时间实现了一个基于 jsqlparser 可以解析类似于 insert into xxx (xx, xx, xx) values (xx, xx, xx); 的工具类。

然后对比 2 边的数据,这下对于一张表上百个字段的对比,一些变得快了许多,准确率也高了很多。

不要问我为什么会有上百个字段,这都是历史沉淀下来的瑰宝。。。

ps: insert into 语句是否通过数据库连接工具手工导出的。

后来又发现另一个问题:表太多,如果想换一个数据对比,我手工导出一遍又要花费数十分钟的时间,关键是重复且枯燥。

枯燥

既然重复,那么可以使用程序实现吗?

v3.0 对比基本自动化

于是我下班后熬夜实现了这个版本: java 程序实现了数据的导出持久化,然后进行修改前后的差异对比。

下面我分享一下自己的思路,以及核心源码,文末有下载福利。

希望对你工作和学习提供帮助。

整体理念

我希望这个工具是 MVP 的理念,由简单到复杂,后期逐渐丰富特性。

要有可拓展性,目前支持 mysql/oracle/sql server 等主流数据库,用户可以定制化开发。

尽可能少的依赖,使用原生的 jdbc,不需要引入 mybatis 等框架。

核心依赖

下面列举一下我用到的核心依赖:

fastjson 用于数据持久化为 json

mysql-connector-java 数据库连接驱动

jsqlparser 辅助工具,解析 sql 使用,非必须

实现思路

  1. 根据指定的 jdbc 连接信息,自动选择对应的 jdbc 实现。
  2. 执行对应的 sql,将结果解析为 map,进行 JSON 持久化
  3. 对持久化的 json 进行差异对比,展现出差异结果

有了这个思路,一切就会变得朴实无华。

当然在此之前,需要我们把代码实现出来,下面进入写BUG环节:

写BUG

jdbc 实现

核心接口

考虑到后期不同数据库实现,我们统一定义一个查询接口

/**
 * JDBC 访问层
 * @author 老马啸西风
 * @date 2017/8/1
 */
public interface JdbcMapper {

    /**
     * 执行查询语句
     * @param querySql
     * @return
     */
    ResultSet query(String querySql);

}

抽象实现

这里提供了基本的抽象实现。

子类只需要实现对应的连接获取信息即可。

public abstract class AbstractJdbcMapper implements JdbcMapper {

    protected JdbcVo jdbcVo;

    public AbstractJdbcMapper(JdbcVo jdbcVo) {
        this.jdbcVo = jdbcVo;
    }

    /**
     * 获取数据库连接
     * @return
     */
    protected abstract Connection getConnection();

    @Override
    public ResultSet query(String querySql) {
        ResultSet rs = null;
        Connection connection = getConnection();
        try {
            Statement stmt = null;
            stmt = connection.createStatement();
            rs = stmt.executeQuery(querySql);
        } catch (Exception e) {
            System.out.println("SQL: " + querySql);
            throw new ExportdbException(e);
        }
        return rs;
    }

}

JdbcVo 连接信息

这个对象主要是数据库连接信息对象:

public class JdbcVo {

    /**
     * 驱动类名称
     */
    private String driverClassName;

    /**
     * 数据库链接
     */
    private String url;

    /**
     * 用户名称
     */
    private String username;

    /**
     * 密码
     */
    private String password;

    //getter & setter
}

mysql 实现

此处以 mysql 为例:

import com.github.houbb.exportdb.dto.JdbcVo;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

/**
 * mysql 实现
 * @author 老马啸西风
 * @date 2017/8/1
 */
public class MySqlJdbcMapper extends AbstractJdbcMapper {

    public MySqlJdbcMapper(JdbcVo jdbcVo) {
        super(jdbcVo);
    }

    @Override
    protected Connection getConnection() {
        try {
            Class.forName(jdbcVo.getDriverClassName());
            return DriverManager.getConnection(jdbcVo.getUrl(),
                    jdbcVo.getUsername(),
                    jdbcVo.getPassword());
        } catch (ClassNotFoundException | SQLException e) {
            e.printStackTrace();
        }
        return null;
    }
}

这里主要是对连接的初始化,连接不同的数据库,都需要引入对应的数据源。

行数据导出实现

下面是导出的核心实现:

接口定义

public interface IExportdb {

    /**
     * 查询
     * @param context 上下文
     * @param sql sql
     * @return 结果
     * @since 0.0.1
     */
    QueryResultVo query(final ExportdbContext context, final String sql);

}

这里指定了需要执行的 sql。

context 中为了便于后期拓展,目前只有 JdbcMapper。

返回的就是 QueryResultVo,就是查询结果,定义如下:

public class QueryResultVo {
    /**
     * 表名称
     */
    private String tableName;

    /**
     * 数据库名称
     *
     * @since 0.0.2
     */
    private String databaseName;

    /**
     * 结果集合
     */
    private List<Map<String, Object>> resultMaps;

    /**
     * 执行的 sql
     */
    private String sql;

    //getter & setter
}

默认实现

默认的导出实现如下:

import com.github.houbb.exportdb.core.ExportdbContext;
import com.github.houbb.exportdb.core.IExportdb;
import com.github.houbb.exportdb.dal.JdbcMapper;
import com.github.houbb.exportdb.dto.QueryResultVo;
import com.github.houbb.exportdb.exception.ExportdbException;
import com.github.houbb.heaven.util.lang.StringUtil;
import net.sf.jsqlparser.JSQLParserException;
import net.sf.jsqlparser.parser.CCJSqlParserUtil;
import net.sf.jsqlparser.statement.Statement;
import net.sf.jsqlparser.statement.insert.Insert;
import net.sf.jsqlparser.statement.select.PlainSelect;
import net.sf.jsqlparser.statement.select.Select;

import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * @author binbin.hou
 * @since 0.0.1
 */
public class Exportdb implements IExportdb {

    @Override
    public QueryResultVo query(ExportdbContext context, String sql) {
        try {
            final JdbcMapper jdbcMapper = context.jdbcMapper();

            ResultSet resultSet = jdbcMapper.query(sql);
            List<Map<String, Object>> maps = new ArrayList<>();

            String tableName = null;
            while (resultSet.next()) {
                final ResultSetMetaData metaData = resultSet.getMetaData();
                // 设置表名称
                if(tableName == null) {
                    tableName = metaData.getTableName(1);
                }

                Map<String, Object> map = new LinkedHashMap<>();
                // 为空直接返回,大于1则报错
                // 列数的总数
                int columnCount = metaData.getColumnCount();

                for (int i = 1; i <= columnCount; i++) {
                    String columnName = metaData.getColumnName(i);
                    Object value = resultSet.getObject(columnName);

                    map.put(columnName, value);
                }

                maps.add(map);
            }

            if(StringUtil.isEmptyTrim(tableName)) {
                Statement statement = CCJSqlParserUtil.parse(sql);
                Select select = (Select)statement;
                PlainSelect plainSelect = (PlainSelect) select.getSelectBody();
                tableName = plainSelect.getFromItem().toString();
            }

            return QueryResultVo.newInstance().tableName(tableName)
                    .databaseName("")
                    .sql(sql)
                    .resultMaps(maps);
        } catch (SQLException | JSQLParserException throwables) {
            throw new ExportdbException(throwables);
        }
    }
}

其实实现非常简单,我们主要讲一下两点:

(1)表名称

mysql 经测试可以通过如下方式获取:

resultSet.getMetaData();
tableName = metaData.getTableName(1);

oracle 我在测试的时候,发现无法获取。所以是借助 sqlparser 解析我们的查询语句得到的。

暂时主要是支持查询,所以这里写的有些固定了,后续可以优化一下。

if(StringUtil.isEmptyTrim(tableName)) {
    Statement statement = CCJSqlParserUtil.parse(sql);
    Select select = (Select)statement;
    PlainSelect plainSelect = (PlainSelect) select.getSelectBody();
    tableName = plainSelect.getFromItem().toString();
}

(2)列信息

每一个查询,可能都对应多条记录。

我们看一下每一条记录的构建:

while (resultSet.next()) {
    final ResultSetMetaData metaData = resultSet.getMetaData();
    Map<String, Object> map = new LinkedHashMap<>();
    // 为空直接返回,大于1则报错
    // 列数的总数
    int columnCount = metaData.getColumnCount();
    for (int i = 1; i <= columnCount; i++) {
        String columnName = metaData.getColumnName(i);
        Object value = resultSet.getObject(columnName);
        map.put(columnName, value);
    }
    maps.add(map);
}

这个经常写 jdbc 的小伙伴也一定不陌生。

你说现在都用 mybatis 了,谁还写 jdbc 啊,这么 low。

那么,你自己手写一个 mybatis,这些也是必会的。

从零开始手写 mybatis(一)MVP 版本

差异对比

导出的使用

我们可以把一行数据导出,可以在修改前后分别导出。

如果是导出到不同的库,不同的表,那么就进行不同库表之间的导出。

导出结果之后,就需要进行对比了。

对比实现

接口定义

对于导出结果的处理,你可以根据自己的实际情况自行选择。

比如导出为 csv/json/insert 等,对比差异也可以按照自己的需求定制。

public interface IQueryResultHandler {

    /**
     * 结果处理类
     * @param queryResultVo 查询结果
     */
    void handler(final QueryResultVo queryResultVo);

}

持久化

此处介绍一种比较简单实用的方式:json 持久化。

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.github.houbb.exportdb.dto.QueryResultVo;
import com.github.houbb.exportdb.support.result.IQueryResultHandler;
import com.github.houbb.heaven.util.io.FileUtil;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * @author 老马啸西风
 * @since 0.0.1
 */
public class FileJsonQueryResultHandler implements IQueryResultHandler {

    /**
     * 默认的文件输出路径
     *
     * 根据操作系统,自动设置
     * @since 0.0.1
     */
    private final String dir;

    public FileJsonQueryResultHandler(String dir) {
        this.dir = dir;
    }

    public FileJsonQueryResultHandler() {
        this("D:\\exportdb\\");
    }

    /**
     * 结果处理类
     *
     * @param queryResultVo 查询结果
     */
    @Override
    public void handler(final QueryResultVo queryResultVo) {
        String path = dir+queryResultVo.tableName()+".edb";
        System.out.println("文件路径: " + path);

        List<Map<String, Object>> list = queryResultVo.resultMaps();
        List<String> lines = new ArrayList<>(list.size()+1);

        lines.add("-- "+queryResultVo.sql());
        for(Map<String, Object> map : list) {
            lines.add(JSON.toJSONString(map, SerializerFeature.WriteMapNullValue));
        }

        FileUtil.write(path, lines);
    }

}

我们将行数据持久化到文件中,注意这里指定了 JSON.toJSONString(map, SerializerFeature.WriteMapNullValue)

这样可以让 null 字段也输出,更加方便对比。

文件差异对比实现

上面我们假设将文件输出到 2 个文件,下面指定文件路径就可以进行对比了:

/**
 * 差异对比
 * @param oldPath 原始路径
 * @param newPath 新的路径
 */
public static void differ(final String oldPath, final String newPath) {
    List<String> oldLines = FileUtil.readAllLines(oldPath);
    List<String> newLines = FileUtil.readAllLines(newPath);
    System.out.println(FileUtil.getFileName(oldPath)+" 对比开始---------------");
    for(int i = 0; i < oldLines.size(); i++) {
        String oldL = oldLines.get(i);
        String newL = newLines.get(i);
        if(oldL.startsWith("--")) {
            continue;
        }
        System.out.println("第 " + (i+1) +" 行对比: ");
        differMaps(oldL, newL);
    }
    System.out.println(FileUtil.getFileName(oldPath)+" 对比结束---------------");
    System.out.println();
}

private static void differMaps(final String oldMap, final String newMap) {
    Map<String, Object> om = JSON.parseObject(oldMap);
    Map<String, Object> nm = JSON.parseObject(newMap);
    for(Map.Entry<String, Object> entry : om.entrySet()) {
        String key = entry.getKey();
        Object oldV = om.get(key);
        Object newV = nm.get(key);
        // 跳过 null 的对比
        if(oldV == null && newV == null) {
            continue;
        }
        if(!ObjectUtil.isEquals(oldV, newV)) {
            System.out.println("差异列:" + key +", 旧值:" + oldV + ", 新值:" + newV);
        }
    }
}

这里将差异内容,直接 console 控台输出。

文件夹

当然,我们也可以对比两个文件夹下的内容。

实现如下:

public static void differDir(final String oldDir, final String newDir) {
    File[] oldFiles = new File(oldDir).listFiles();

    for(File file : oldFiles) {
        String fileName = file.getName();
        String aop = file.getAbsolutePath();
        String anp = newDir+fileName;
        differ(aop, anp);
    }
}

引导类

便利性

上面我们把核心实现都搞定了,但是用户使用起来还是不够方便。因为配置等不够优雅。

所以我们引入引导类,帮助用户快速使用:

/**
 * @author 老马啸西风
 * @since 0.0.1
 */
public class ExportdbBs {

    private ExportdbBs(){}

    /**
     * 导出实现
     * @since 0.0.1
     */
    private final IExportdb exportdb = new Exportdb();

    /**
     * 驱动类名称
     */
    private String driverName = DriverNameConstant.MYSQL;

    /**
     * 数据库链接
     */
    private String url = "jdbc:mysql://localhost:3306/test";

    /**
     * 用户名称
     */
    private String username = "root";

    /**
     * 密码
     */
    private String password = "123456";


    public static ExportdbBs newInstance() {
        return new ExportdbBs();
    }

    public ExportdbBs driverName(String driverName) {
        this.driverName = driverName;
        return this;
    }

    public ExportdbBs url(String url) {
        this.url = url;
        return this;
    }

    public ExportdbBs username(String username) {
        this.username = username;
        return this;
    }

    public ExportdbBs password(String password) {
        this.password = password;
        return this;
    }

    /**
     * 查询
     * @param sql sql
     * @return 结果
     * @since 0.0.1
     */
    public QueryResultVo query(final String sql) {
        //1. 构建 vo
        JdbcVo jdbcVo = new JdbcVo(driverName, url, username, password);

        //2. 获取 mapper
        final JdbcMapper jdbcMapper = getJdbcMapper(jdbcVo);

        //3. 构建上下文
        final ExportdbContext context = ExportdbContext.newInstance().jdbcMapper(jdbcMapper);
        return this.exportdb.query(context, sql);
    }

    /**
     * 查询并且处理
     * @param queryResultHandler 查询结果处理器
     * @param sql sql
     * @since 0.0.1
     */
    public void queryAndHandle(final IQueryResultHandler queryResultHandler,
                               final String sql, final String... otherSqls) {
        QueryResultVo queryResultVo = this.query(sql);
        queryResultHandler.handler(queryResultVo);

        // 同理处理其他的 sql
        for(String os : otherSqls) {
            QueryResultVo vo = this.query(os);
            queryResultHandler.handler(vo);
        }
    }

    /**
     * 查询并且处理
     * @param queryResultHandler 查询结果处理器
     * @param sqlList sql 列表
     * @since 0.0.2
     */
    public void queryAndHandle(final IQueryResultHandler queryResultHandler,
                               List<String> sqlList) {
        // 同理处理其他的 sql
        for(String sql : sqlList) {
            System.out.println("开始执行:" + sql);
            QueryResultVo vo = this.query(sql);
            queryResultHandler.handler(vo);
        }
    }

    private JdbcMapper getJdbcMapper(JdbcVo jdbcVo) {
        if(DriverNameConstant.MYSQL.equalsIgnoreCase(driverName)) {
            return new MySqlJdbcMapper(jdbcVo);
        }
        if(DriverNameConstant.ORACLE.equalsIgnoreCase(driverName)) {
            return new OracleJdbcMapper(jdbcVo);
        }
        if(DriverNameConstant.SQL_SERVER.equalsIgnoreCase(driverName)) {
            return new SqlServerJdbcMapper(jdbcVo);
        }

        throw new UnsupportedOperationException();
    }

}

这里为用户提供了 mysql 最基本的配置,以及常用的查询处理方法。

测试

下面我们来看一下测试的效果:

直接查询

QueryResultVo resultVo = ExportdbBs.newInstance().query("select * from user;");
System.out.println(resultVo);

查询并处理

final String sql = "select * from user;";
final IQueryResultHandler handler = new FileJsonQueryResultHandler();
ExportdbBs.newInstance().queryAndHandle(handler, sql);

两次导出可以指定文件路径,比如分别是:

D:\exportdb\old\D:\exportdb\new\

针对两次结果对比

final String oldP = "D:\\exportdb\\old\\";
final String newP = "D:\\exportdb\\new\\";

CompareUtil.differDir(oldP, newP);

差异结果就会被输出到控台。

结果

一切顺利,不过革命尚未成功,同学仍需加班呀~~~

不足之处

这是一个 v0.0.1 版本,还有很多不足。

比如:

  • 导出为 csv
  • 导出为 insert/update 语句
  • 导出的文件名称自定义策略
  • 可以指定多个 sql 是否生成在同一个文件中
  • 导出路径根据操作系统,自动变更
  • 更加便于使用,比如页面指定数据源+sql,页面显示对应差异结果。

不过也基本可用,符合我们最初的设想。

小结

不知道你平时又是如何对比数据的呢?

如果你需要这个工具,可以关注【老马啸西风】,后台回复【对比】即可。

希望本文对你有帮助,如果有其他想法的话,也可以评论区和大家分享哦。

各位极客的点赞收藏转发,是老马持续写作的最大动力!

公众号-无二维码.png

查看原文

赞 0 收藏 0 评论 0

老马啸西风 发布了文章 · 2020-11-28

java 如何实现 binary search 二分查找法?

思维导图

顺序查找

如果让你在一堆书架上找到自己想要的书,你会怎么找呢?

实际上最简单最粗暴的方式就是一本一本的看过去。

这个用计算机实现就对应着顺序查找。

概念

顺序查找适合于存储结构为顺序存储或链接存储的线性表。

基本思想:顺序查找也称为线形查找,属于无序查找算法。从数据结构线形表的一端开始,顺序扫描,依次将扫描到的结点关键字与给定值k相比较,若相等则表示查找成功;若扫描结束仍没有找到关键字等于k的结点,表示查找失败。

复杂度分析:

 
查找成功时的平均查找长度为:(假设每个数据元素的概率相等) ASL = 1/n(1+2+3+…+n) = (n+1)/2 ;

当查找不成功时,需要n+1次比较,时间复杂度为O(n);

所以,顺序查找的时间复杂度为O(n)。

复杂度

java 代码实现

以 java 代码为例:

/*
** 顺序查询算法
** @param arr 数组信息
** @param target 目标值
** @param arrLength 数组长度
*/
int foreachSearch(int arr[], int target, int arrLength) {
    int i;
    for(i = 0; i < arrLength; i++) {
        if(target == arr[i]) {
            return i;
        }
    }
    return -1;
}

java 改进版本

我们这个实现版本主要是为了弥补大部分网上实现的不足,很多实现就是一个 int 类型,适用范围不够广泛。

接口定义

为了后续的拓展性,我们定义查询接口及抽象实现。

package com.github.houbb.search.api;

import java.util.List;

/**
 * @author 老马啸西风
 * @since 0.0.1
 */
public interface ISearch<T> {

    /**
     * 执行元素的查询
     * @param list 列表
     * @param key 目标对象
     * @return 结果对应的下标
     * @since 0.0.1
     */
    int search(List<? extends Comparable<? super T>> list, T key);

}

抽象实现:

package com.github.houbb.search.core;

import com.github.houbb.heaven.util.common.ArgUtil;
import com.github.houbb.heaven.util.util.CollectionUtil;
import com.github.houbb.search.api.ISearch;
import com.github.houbb.search.constant.SearchConst;

import java.util.List;

/**
 * 抽象查询类
 * @author 老马啸西风
 * @since 0.0.1
 */
public abstract class AbstractSearch<T> implements ISearch<T> {

    @Override
    public int search(List<? extends Comparable<? super T>> list, T key) {
        ArgUtil.notNull(key, "key");

        if(CollectionUtil.isEmpty(list)) {
            return SearchConst.NOT_FOUND;
        }

        return this.doSearch(list, key);
    }

    /**
     * 执行查询
     * @param list 列表
     * @param key key
     * @return 结果
     * @since 0.0.1
     */
    protected abstract int doSearch(List<? extends Comparable<? super T>> list, T key);

}

遍历实现

实现和前面基础版本类似。

package com.github.houbb.search.core;

import com.github.houbb.search.constant.SearchConst;

import java.util.List;

/**
 * 遍历查询法
 * @author 老马啸西风
 * @since 0.0.1
 */
public class ForeachSearch<T> extends AbstractSearch<T> {

    @Override
    @SuppressWarnings("all")
    protected int doSearch(List<? extends Comparable<? super T>> list, T key) {
        for(int i = 0; i < list.size(); i++) {
            Comparable comparable = list.get(i);
            if(comparable.compareTo(key) == 0) {
                return i;
            }
        }

        return SearchConst.NOT_FOUND;
    }

}

这个实现的适用范围故意被我们缩小为可比较类型了,实际上可以更加广泛一些,理论上只要能通过 equals() 比较的对象都可以。

其实主要是为了兼容我们下面要讲的二分查找法,也是我们本文的重点。

二分查找法

遍历查询非常的简单粗暴,不过性能也是比较差的。

如果要查找的数据已经排序过了,比如我们通过查看联系人时,一般可以通过姓名快速找到大概的位置,而不是从头到尾的看一遍。

这个通过计算机实现就是二分查找法。

概念

二分搜索(英语:binary search),也称折半搜索(英语:half-interval search)、对数搜索(英语:logarithmic search),是一种在有序数组中查找某一特定元素的搜索算法。

搜索过程从数组的中间元素开始,如果中间元素正好是要查找的元素,则搜索过程结束;

如果某一特定元素大于或者小于中间元素,则在数组大于或小于中间元素的那一半中查找,而且跟开始一样从中间元素开始比较。

如果在某一步骤数组为空,则代表找不到。这种搜索算法每一次比较都使搜索范围缩小一半。

复杂度概览

分类搜索算法
数据结构数组
最坏时间复杂度O(log(n))
最优时间复杂度O(1)
平均时间复杂度O(log(n))
空间复杂度迭代: O(1); 递归: O(log(n))

二分查找

步骤

(1)首先确定整个查找区间的中间位置 mid = ( left + right )/ 2

(2)用待查关键字值与中间位置的关键字值进行比较;

若相等,则查找成功

若大于,则在后(右)半个区域继续进行折半查找

若小于,则在前(左)半个区域继续进行折半查找

(3)对确定的缩小区域再按折半公式,重复上述步骤。

最后,得到结果:要么查找成功, 要么查找失败。

折半查找的存储结构采用一维数组存放。

注意:这里有一个前提,数组必须是有序的。如果元素无序怎么办呢?当然是先执行排序,然后再通过折半查找了。

排序算法不是本文重点,可以参考:

7 天时间,我整理并实现了这 9 种最经典的排序算法

java 代码实现

我们先来看一下网上最常见的两种实现。

递归实现

public static int binarySearch(int[] arr, int start, int end, int hkey){
    if (start > end)
        return -1;

    int mid = start + (end - start)/2;    //防止溢位
    if (arr[mid] > hkey)
        return binarySearch(arr, start, mid - 1, hkey);
    if (arr[mid] < hkey)
        return binarySearch(arr, mid + 1, end, hkey);
    return mid;  

}

递归实现的版本非常的简洁优雅。

循环实现

public static int binarySearch(int[] arr, int start, int end, int hkey){
    int result = -1;

    while (start <= end){
        int mid = start + (end - start)/2;    //防止溢位
        if (arr[mid] > hkey)
            end = mid - 1;
        else if (arr[mid] < hkey)
            start = mid + 1;
        else {
            result = mid ;  
            break;
        }
    }

    return result;
}

循环实现相对麻烦一些,不过性能一般会比递归好一点。

java 改良版本

上面的方法非常简洁,问题也同样明显。

如果我想比较查找 String、Long 等其他常见类型怎么办呢?

我们可以稍微改良一下上面的实现:

package com.github.houbb.search.core;

import com.github.houbb.search.constant.SearchConst;

import java.util.List;

/**
 * 折半
 * @author 老马啸西风
 * @since 0.0.1
 */
public class BinarySearch<T> extends AbstractSearch<T> {

    @Override
    @SuppressWarnings("all")
    protected int doSearch(List<? extends Comparable<? super T>> list, T key) {
        int low = 0;
        int high = list.size()-1;

        while (low <= high) {
            int mid = (low+high) / 2;

            Comparable comparable = list.get(mid);
            if(comparable.compareTo(key) == 0) {
                return mid;
            } else if(comparable.compareTo(key) < 0) {
                // 小于指定元素
                low = mid;
            } else {
                // 大于指定元素
                high = mid;
            }
        }

        return SearchConst.NOT_FOUND;
    }

}

这里我们将比较对象扩展为 Comparable 对象,然后通过 compareTo 方法进行比较。

开源工具

当然一般的查找算法到这里就结束了,但是老马却不这么认为。

知其然,知其所以然,所以我们要学习算法原理。

君子性非异也, 善假于物也,所以我们要学会使用工具。

算法应该被封装为简单可用的工具,便于使用。于是老马把上面的两种查询算法开源到了 maven 中央仓库,便于后期使用可拓展。

maven 引入

<dependency>
    <groupId>com.github.houbb</groupId>
    <artifactId>search</artifactId>
    <version>0.0.1</version>
</dependency>

使用

遍历查询

final List<String> list = Arrays.asList("1", "2", "3", "4", "5");
Assert.assertEquals(3, SearchHelper.foreach(list, "4"));

折半查询

final List<String> list = Arrays.asList("1", "2", "3", "4", "5");
Assert.assertEquals(3, SearchHelper.binary(list, "4"));

小结

希望本文对你有帮助,如果有其他想法的话,也可以评论区和大家分享哦。

各位极客的点赞收藏转发,是老马持续写作的最大动力!

下一节我们将讲解一下二叉查询树,感兴趣的小伙伴可以关注一波,精彩内容,不容错过。

image

查看原文

赞 0 收藏 0 评论 0

老马啸西风 发布了文章 · 2020-11-23

7 天时间,我整理并实现了这 9 种最经典的排序算法

排序算法

回顾

我们前面已经介绍了 3 种最常见的排序算法:

java 实现冒泡排序讲解

QuickSort 快速排序到底快在哪里?

SelectionSort 选择排序算法详解(java 实现)

然而天下排序千千万,今天老马就和大家一起把最常见的几种都学习一遍。

堆排序

堆排序(英语:Heapsort)是指利用堆这种数据结构所设计的一种排序算法。

堆是一个近似完全二叉树的结构,并同时满足堆的性质:即子节点的键值或索引总是小于(或者大于)它的父节点。

基础知识 JCIP-11-二叉堆

最大堆

若以升序排序说明,把数组转换成最大堆(Max-Heap Heap),这是一种满足最大堆性质(Max-Heap Property)的二叉树:对于除了根之外的每个节点i, A[parent(i)] ≥ A[i]

堆是具有以下性质的完全二叉树:每个结点的值都大于或等于其左右孩子结点的值,称为大顶堆;或者每个结点的值都小于或等于其左右孩子结点的值,称为小顶堆。如下图:

重复从最大堆取出数值最大的结点(把根结点和最后一个结点交换,把交换后的最后一个结点移出堆),并让残余的堆维持最大堆性质。

输入图片说明

同时,我们对堆中的结点按层进行编号,将这种逻辑结构映射到数组中就是下面这个样子:

输入图片说明

堆节点的访问

通常堆是通过一维数组来实现的。

在数组起始位置为0的情形中:

则父节点和子节点的位置关系如下:

(01) 索引为i的左孩子的索引是 (2*i+1);

(02) 索引为i的左孩子的索引是 (2*i+2);

(03) 索引为i的父结点的索引是 floor((i-1)/2);

堆节点的访问

堆的操作

在堆的数据结构中,堆中的最大值总是位于根节点(在优先队列中使用堆的话堆中的最小值位于根节点)。

堆中定义以下几种操作:

最大堆调整(Max Heapify):将堆的末端子节点作调整,使得子节点永远小于父节点

创建最大堆(Build Max Heap):将堆中的所有数据重新排序

堆排序(HeapSort):移除位在第一个数据的根节点,并做最大堆调整的递归运算

堆排序算法图解

这个图解来自 图解排序算法(三)之堆排序,画的非常漂亮。

基本思想

将待排序序列构造成一个大顶堆,此时,整个序列的最大值就是堆顶的根节点。

将其与末尾元素进行交换,此时末尾就为最大值。

然后将剩余n-1个元素重新构造成一个堆,这样会得到n个元素的次小值。如此反复执行,便能得到一个有序序列了。

步骤

步骤一 构造初始堆

将给定无序序列构造成一个大顶堆(一般升序采用大顶堆,降序采用小顶堆)。

a. 假设给定无序序列结构如下

输入图片说明

b. 此时我们从最后一个非叶子结点开始(叶子结点自然不用调整,第一个非叶子结点 arr.length/2-1=5/2-1=1,也就是下面的6结点),从左至右,从下至上进行调整。

输入图片说明

c. 找到第二个非叶节点4,由于[4,9,8]中9元素最大,4和9交换。

输入图片说明

d. 这时,交换导致了子根[4,5,6]结构混乱,继续调整,[4,5,6]中6最大,交换4和6。

输入图片说明

此时,我们就将一个无序的序列构造成了一个大顶堆。

步骤二 调整堆

将堆顶元素与末尾元素进行交换,使末尾元素最大。

然后继续调整堆,再将堆顶元素与末尾元素交换,得到第二大元素。如此反复进行交换、重建、交换。

a. 将堆顶元素9和末尾元素4进行交换

输入图片说明

b. 重新调整结构,使其继续满足堆定义

输入图片说明

c. 再将堆顶元素8与末尾元素5进行交换,得到第二大元素8.

输入图片说明

后续过程,继续进行调整,交换,如此反复进行,最终使得整个序列有序

输入图片说明

简单总结

再简单总结下堆排序的基本思路:

a. 将无需序列构建成一个堆,根据升序降序需求选择大顶堆或小顶堆;

b. 将堆顶元素与末尾元素交换,将最大元素"沉"到数组末端;

c. 重新调整结构,使其满足堆定义,然后继续交换堆顶元素与当前末尾元素,反复执行调整+交换步骤,直到整个序列有序。

java 实现

说明

为了和前面的逻辑保持一致,我们暂时依然使用 list 去实现这个堆排序。

实现

package com.github.houbb.sort.core.api;

import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.sort.core.util.InnerSortUtil;

import java.util.List;

/**
 * 堆排序
 *
 * @author binbin.hou
 * @since 0.0.4
 */
public class HeapSort extends AbstractSort {

    private static final Log log = LogFactory.getLog(HeapSort.class);

    @Override
    @SuppressWarnings("all")
    protected void doSort(List<?> original) {
        final int maxIndex = original.size() - 1;

        /*
         *  第一步:将数组堆化
         *  beginIndex = 第一个非叶子节点。
         *  从第一个非叶子节点开始即可。无需从最后一个叶子节点开始。
         *  叶子节点可以看作已符合堆要求的节点,根节点就是它自己且自己以下值为最大。
         */
        int beginIndex = original.size() / 2 - 1;
        for (int i = beginIndex; i >= 0; i--) {
            maxHeapify(original, i, maxIndex);
        }

        /*
         * 第二步:对堆化数据排序
         * 每次都是移出最顶层的根节点A[0],与最尾部节点位置调换,同时遍历长度 - 1。
         * 然后从新整理被换到根节点的末尾元素,使其符合堆的特性。
         * 直至未排序的堆长度为 0。
         */
        for (int i = maxIndex; i > 0; i--) {
            InnerSortUtil.swap(original, 0, i);
            maxHeapify(original, 0, i - 1);
        }
    }

    /**
     * 调整索引为 index 处的数据,使其符合堆的特性。
     *
     * @param list  列表
     * @param index 需要堆化处理的数据的索引
     * @param len   未排序的堆(数组)的长度
     * @since 0.0.4
     */
    @SuppressWarnings("all")
    private void maxHeapify(final List list, int index, int len) {
        int li = (index * 2) + 1; // 左子节点索引
        int ri = li + 1;           // 右子节点索引
        int cMax = li;             // 子节点值最大索引,默认左子节点。

        // 左子节点索引超出计算范围,直接返回。
        if (li > len) {
            return;
        }

        // 先判断左右子节点,哪个较大。
        if (ri <= len && InnerSortUtil.gt(list, ri, li)) {
            cMax = ri;
        }

        if (InnerSortUtil.gt(list, cMax, index)) {
            InnerSortUtil.swap(list, cMax, index);      // 如果父节点被子节点调换,
            maxHeapify(list, cMax, len);  // 则需要继续判断换下后的父节点是否符合堆的特性。
        }
    }

}

插入排序

插入排序(英语:Insertion Sort)是一种简单直观的排序算法。

它的工作原理是通过构建有序序列,对于未排序数据,在已排序序列中从后向前扫描,找到相应位置并插入。

插入排序在实现上,通常采用in-place排序(即只需用到 O(1) 的额外空间的排序),因而在从后向前扫描过程中,需要反复把已排序元素逐步向后挪位,为最新元素提供插入空间。

算法步骤

一般来说,插入排序都采用in-place在数组上实现。具体算法描述如下:

  1. 从第一个元素开始,该元素可以认为已经被排序
  2. 取出下一个元素,在已经排序的元素序列中从后向前扫描
  3. 如果该元素(已排序)大于新元素,将该元素移到下一位置
  4. 重复步骤3,直到找到已排序的元素小于或者等于新元素的位置
  5. 将新元素插入到该位置后
  6. 重复步骤2~5

排序

java 实现

java 实现

package com.github.houbb.sort.core.api;

import com.github.houbb.heaven.annotation.ThreadSafe;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.sort.core.util.InnerSortUtil;

import java.util.List;

/**
 * 冒泡排序
 * @author binbin.hou
 * @since 0.0.5
 */
@ThreadSafe
public class InsertSort extends AbstractSort {

    private static final Log log = LogFactory.getLog(InsertSort.class);

    @Override
    @SuppressWarnings("all")
    public void doSort(List original) {
        for(int i = 1; i < original.size(); i++) {
            Comparable current = (Comparable) original.get(i);

            int j = i-1;
            // 从后向前遍历,把大于当前元素的信息全部向后移动。
            while (j >= 0 && InnerSortUtil.gt(original, j, current)) {
                // 元素向后移动一位
                original.set(j+1, original.get(j));
                j--;
            }

            // 将元素插入到对应的位置
            original.set(j+1, current);
        }
    }

}

希尔排序(Shellsort)

也称递减增量排序算法,是插入排序的一种更高效的改进版本。

希尔排序是非稳定排序算法。

希尔排序是基于插入排序的以下两点性质而提出改进方法的:

  1. 插入排序在对几乎已经排好序的数据操作时,效率高,即可以达到线性排序的效率
  2. 但插入排序一般来说是低效的,因为插入排序每次只能将数据移动一位

算法实现

希尔排序通过将比较的全部元素分为几个区域来提升插入排序的性能。

这样可以让一个元素可以一次性地朝最终位置前进一大步。然后算法再取越来越小的步长进行排序,算法的最后一步就是普通的插入排序,但是到了这步,需排序的数据几乎是已排好的了(此时插入排序较快)。

假设有一个很小的数据在一个已按升序排好序的数组的末端。如果用复杂度为O(n^2)的排序(冒泡排序或插入排序),可能会进行n次的比较和交换才能将该数据移至正确位置。

而希尔排序会用较大的步长移动数据,所以小数据只需进行少数比较和交换即可到正确位置

一个更好理解的希尔排序实现:将数组列在一个表中并对列排序(用插入排序)。重复这过程,不过每次用更长的列来进行。最后整个表就只有一列了。

将数组转换至表是为了更好地理解这算法,算法本身仅仅对原数组进行排序(通过增加索引的步长,例如是用i += step_size而不是i++)。

例子

例如,假设有这样一组数[ 13 14 94 33 82 25 59 94 65 23 45 27 73 25 39 10 ],如果我们以步长为5开始进行排序,我们可以通过将这列表放在有5列的表中来更好地描述算法,这样他们就应该看起来是这样:

13 14 94 33 82
25 59 94 65 23
45 27 73 25 39
10

然后我们对每列进行排序:

10 14 73 25 23
13 27 94 33 39
25 59 94 65 82
45

将上述四行数字,依序接在一起时我们得到:[ 10 14 73 25 23 13 27 94 33 39 25 59 94 65 82 45 ]。

这时10已经移至正确位置了,然后再以3为步长进行排序:

10 14 73
25 23 13
27 94 33
39 25 59
94 65 82
45

排序之后变为:

10 14 13
25 23 33
27 25 59
39 65 73
45 94 82
94

最后以1步长进行排序(此时就是简单的插入排序了)。

步长序列如何选择?

Donald Shell 最初建议步长选择为 n/2 并且对步长取半直到步长达到1。

虽然这样取可以比 O(n^2) 类的算法(插入排序)更好,但这样仍然有减少平均时间和最差时间的余地。

已知的最好步长序列是由 Sedgewick 提出的(1, 5, 19, 41, 109,...),

另一个在大数组中表现优异的步长序列是(斐波那契数列除去0和1将剩余的数以黄金分割比的两倍的幂进行运算得到的数列):

(1, 9, 34, 182, 836, 4025, 19001, 90358, 428481, 2034035, 9651787, 45806244, 217378076, 1031612713,…)

java 代码实现

实现

这里为了简单,我们步长直接选择列表长度的一半,并且依次折半。

import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.sort.core.util.InnerSortUtil;

import java.util.List;

/**
 * 希尔排序
 *
 * @author binbin.hou
 * @since 0.0.6
 */
public class ShellSort extends AbstractSort {

    private static final Log log = LogFactory.getLog(ShellSort.class);

    @Override
    @SuppressWarnings("all")
    protected void doSort(List<?> original) {
        // 步长从大到小
        for(int step = original.size()/2; step > 0; step /= 2) {
            // 从第 step 个元素,逐个执行插入排序
            for(int i = step; i < original.size(); i++) {
                int j = i;

                while ((j-step >= 0) && InnerSortUtil.lt(original, j, j-step)) {
                    // 执行交换
                    InnerSortUtil.swap(original, j, j-step);

                    if(log.isDebugEnabled()) {
                        log.debug("step: {}, j: {}, j-step: {}, list: {}",
                                step, j, j-step, original);
                    }

                    // 更新步长
                    j -= step;
                }
            }
        }
    }

}

整体实现也不难,大家可以回顾下 插入排序

这里为了便于大家理解,我们特意添加了日志。

归并排序(英语:Merge sort,或mergesort)

是创建在归并操作上的一种有效的排序算法,效率为 O(nlogn)(大O符号)。1945年由约翰·冯·诺伊曼首次提出。

该算法是采用分治法(Divide and Conquer)的一个非常典型的应用,且各层分治递归可以同时进行。

概述

采用分治法:

分割:递归地把当前序列平均分割成两半。

集成:在保持元素顺序的同时将上一步得到的子序列集成到一起(归并)。

java 实现递归法

递归法(Top-down)

  1. 申请空间,使其大小为两个已经排序序列之和,该空间用来存放合并后的序列
  2. 设定两个指针,最初位置分别为两个已经排序序列的起始位置
  3. 比较两个指针所指向的元素,选择相对小的元素放入到合并空间,并移动指针到下一位置
  4. 重复步骤3直到某一指针到达序列尾
  5. 将另一序列剩下的所有元素直接复制到合并序列尾

java 实现

实际上代码实现也不难,不过递归多多少少让人看起来不太习惯。

我们后面会结合测试日志,再进行讲解。

package com.github.houbb.sort.core.api;

import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;

import java.util.ArrayList;
import java.util.List;

/**
 * 归并排序-递归实现
 *
 * @author binbin.hou
 * @since 0.0.7
 */
public class MergeRecursiveSort extends AbstractSort {

    private static final Log log = LogFactory.getLog(MergeRecursiveSort.class);

    @Override
    @SuppressWarnings("all")
    protected void doSort(List<?> original) {
        // 存放归并的结果
        // 直接将数组填满,避免 set 出现越界
        List<?> resultList = new ArrayList<>(original);
        sortRecursive(original, resultList, 0, original.size()-1);
    }

    /**
     * 递归排序
     * @param originalList 原始列表
     * @param resultList 存放结果的列表
     * @param startIx 开始
     * @param endIx 结果
     * @since 0.0.7
     */
    @SuppressWarnings("all")
    private void sortRecursive(List originalList,
                               List resultList,
                               int startIx,
                               int endIx) {
        // 循环结束
        if(startIx >= endIx) {
            return;
        }

        // 找到中间位置,将列表一分为二
        int midIx = (startIx+endIx) / 2;
        int leftStart = startIx;
        int leftEnd = midIx;
        int rightStart = midIx+1;
        int rightEnd = endIx;

        if(log.isDebugEnabled()) {
            log.debug("拆分:ls: {}, le: {}, rs: {}, re: {}",
                    leftStart, leftEnd, rightStart, rightEnd);
        }

        // 递归调用
        sortRecursive(originalList, resultList, leftStart, leftEnd);
        sortRecursive(originalList, resultList, rightStart, rightEnd);

        if(log.isDebugEnabled()) {
            log.debug("操作:ls: {}, le: {}, rs: {}, re: {}",
                    leftStart, leftEnd, rightStart, rightEnd);
        }

        // 这里需要通过 k 记录一下开始的位置
        int k = startIx;
        while (leftStart <= leftEnd && rightStart <= rightEnd) {
            //相对小的元素放入到合并空间,并移动指针到下一位置
            Comparable left = (Comparable) originalList.get(leftStart);
            Comparable right = (Comparable) originalList.get(rightStart);

            // 左边较小,则放入合并空间
            if(left.compareTo(right) < 0) {
                resultList.set(k++, left);
                leftStart++;
            } else {
                resultList.set(k++, right);
                rightStart++;
            }
        }

        // 如果列表比较结束,将剩下的元素,全部放入到队列中。
        while (leftStart <= leftEnd) {
            resultList.set(k++, originalList.get(leftStart++));
        }
        while (rightStart <= rightEnd) {
            resultList.set(k++, originalList.get(rightStart++));
        }

        // 将结果统一拷贝到原始集合中
        for(int i = startIx; i <= endIx; i++) {
            originalList.set(i, resultList.get(i));
        }
    }

}

java 迭代实现

相信很多小伙伴都知道迭代可以使得代码变得简洁,但是会让调试和理解变得复杂。

我们来一起学习一下迭代的实现方式。

迭代法(Bottom-up)

原理如下(假设序列共有 n 个元素):

  1. 将序列每相邻两个数字进行归并操作,形成 ceil(n/2) 个序列,排序后每个序列包含两/一个元素
  2. 若此时序列数不是1个则将上述序列再次归并,形成 ceil(n/4) 个序列,每个序列包含四/三个元素
  3. 重复步骤2,直到所有元素排序完毕,即序列数为1

迭代实现

相对递归,这个代码就要显得复杂很多。

不过这种迭代的方式性能更好,实现如下。

package com.github.houbb.sort.core.api;

import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.sort.core.util.InnerSortUtil;

import java.util.ArrayList;
import java.util.List;

/**
 * 归并排序-迭代实现
 *
 * @author binbin.hou
 * @since 0.0.7
 */
public class MergeSort extends AbstractSort {

    private static final Log log = LogFactory.getLog(MergeSort.class);

    @Override
    protected void doSort(List<?> original) {
        // 存放归并的结果
        // 直接将数组填满,避免 set 出现越界
        List<?> resultList = new ArrayList<>(original);

        //起始,子序列长度为1。对长度为1的序列进行两两合并
        int k = 1;
        final int length = original.size();
        while (k < length) {
            mergePass(original, resultList, k, length);//将原先无序的数据两两归并入归并数组
            k = 2 * k;//子序列长度加倍
            mergePass(resultList, original, k, length);//将归并数组中已经两两归并的有序序列再归并回数组 original
            k = 2 * k;//子序列长度加倍
        }
    }

    /**
     * 负责将数组中的相邻的有k个元素的字序列进行归并
     *
     * @param original 原始列表
     * @param results 结果列表
     * @param s  子序列长度
     * @param len 长度
     * @since 0.0.7
     */
    @SuppressWarnings("all")
    private static void mergePass(List original, List results, int s, int len) {
        int i = 0;

        // 写成(i + 2 * k - 1 < len),就会把(i+2*k-1)当做一个整体看待
        // 从前往后,将2个长度为k的子序列合并为1个。
        // 对于序列{3, 4, 2, 5, 7, 0, 9, 8, 1, 6},当k=8的时候,因为i>(len-2*k+1),所以根本没有进入while循环
        while (i < len - 2 * s + 1) {
            merge(original, results, i, i + s - 1, i + 2 * s - 1);//两两归并
            i = i + 2 * s;
        }

        // 将那些“落单的”长度不足两两merge的部分和前面merge起来。
        // (连接起来之前也是要进行排序的,因此有了下面的merge操作)
        if (i < len - s + 1) {
            merge(original, results, i, i + s - 1, len - 1);//归并最后两个序列
        } else {
            for (int j = i; j < len; j++) {//若最后只剩下单个子序列
                results.set(j, original.get(j));
            }
        }
    }

    /**
     * 将两个有序数组合并成一个有序数组
     * @param original 原始
     * @param result 结果
     * @param low 开始
     * @param mid 中间
     * @param high 结束
     * @since 0.0.7
     */
    @SuppressWarnings("all")
    private static void merge(List original, List result, int low, int mid, int high) {
        int j, k, l;

        // 将记录由小到大地放进temp数组
        for (j = mid + 1, k = low; low <= mid && j <= high; k++) {
            if (InnerSortUtil.lt(original, low, j)) {
                result.set(k, original.get(low++));
            } else {
                result.set(k, original.get(j++));
            }
        }

        //接下来两循环是为了将剩余的(比另一边多出来的个数)放到temp数组中
        if (low <= mid) {
            for (l = 0; l <= mid - low; l++) {
                result.set(k + l, original.get(low + l));
            }
        }
        if (j <= high) {
            for (l = 0; l <= high - j; l++) {
                result.set(k + l, original.get(j + l));
            }
        }
    }

}

counting sort 计数排序

计数排序(Counting sort)是一种稳定的线性时间排序算法。

该算法于1954年由 Harold H. Seward 提出。

通过计数将时间复杂度降到了 O(N)

基础版

算法步骤

  1. 找出原数组中元素值最大的,记为max。
  2. 创建一个新数组count,其长度是max加1,其元素默认值都为0。
  3. 遍历原数组中的元素,以原数组中的元素作为count数组的索引,以原数组中的元素出现次数作为count数组的元素值
  4. 创建结果数组result,起始索引index。
  5. 遍历count数组,找出其中元素值大于0的元素,将其对应的索引作为元素值填充到result数组中去,每处理一次,count中的该元素值减1,直到该元素值不大于0,依次处理count中剩下的元素。
  6. 返回结果数组 result。

java 实现

package com.github.houbb.sort.core.api;

import com.github.houbb.heaven.annotation.ThreadSafe;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.sort.core.util.InnerSortUtil;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * 计数排序
 *
 * @author binbin.hou
 * @since 0.0.8
 */
@ThreadSafe
public class CountingSortBasic extends AbstractSort {

    private static final Log log = LogFactory.getLog(CountingSortBasic.class);

    @Override
    @SuppressWarnings("all")
    public void doSort(List original) {
        //1. 获取最大的元素
        int max = Integer.MIN_VALUE;
        for (Object object : original) {
            Integer integer = (Integer) object;
            max = Math.max(max, integer);
        }

        //2. 构建 count 列表
        int[] counts = new int[max + 1];
        //3.遍历原数组中的元素,以原数组中的元素作为count数组的索引,以原数组中的元素出现次数作为count数组的元素值。
        for (Object object : original) {
            Integer integer = (Integer) object;
            counts[integer]++;
        }

        //4. 结果构建
        int index = 0;
        // 遍历计数数组,将计数数组的索引填充到结果数组中
        for (int i = 0; i < counts.length; i++) {
            while (counts[i] > 0) {
                // i 实际上就是元素的值
                // 从左到右遍历,元素自然也就排序好了。
                // 相同的元素会出现多次,所以才需要循环。
                original.set(index++, i);
                counts[i]--;

                if(log.isDebugEnabled()) {
                    log.debug("结果数组:{}", original);
                }
            }
        }
    }

}

改良版

空间浪费

实际上我们创建一个比最大元素还要大1的数组,只是为了放下所有的元素而已。

但是它有一个缺陷,那就是存在空间浪费的问题。

比如一组数据{101,109,102,110},其中最大值为110,按照基础版的思路,我们需要创建一个长度为111的计数数组,但是我们可以发现,它前面的[0,100]的空间完全浪费了,那怎样优化呢?

将数组长度定为 max-min+1,即不仅要找出最大值,还要找出最小值,根据两者的差来确定计数数组的长度。

改良版本实现

import com.github.houbb.heaven.annotation.ThreadSafe;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;

import java.util.Arrays;
import java.util.List;

/**
 * 计数排序
 *
 * @author binbin.hou
 * @since 0.0.8
 */
@ThreadSafe
public class CountingSort extends AbstractSort {

    private static final Log log = LogFactory.getLog(CountingSort.class);

    @Override
    @SuppressWarnings("all")
    public void doSort(List original) {
        //1. 获取最大、最小的元素
        int max = Integer.MIN_VALUE;
        int min = Integer.MAX_VALUE;
        for (Object object : original) {
            Integer integer = (Integer) object;
            max = Math.max(max, integer);
            min = Math.min(min, integer);
        }

        //2. 构建 count 列表
        int[] counts = new int[max-min + 1];
        //3.遍历原数组中的元素,以原数组中的元素作为count数组的索引,以原数组中的元素出现次数作为count数组的元素值。
        for (Object object : original) {
            Integer integer = (Integer) object;
            // 元素要减去最小值,再作为新索引
            counts[integer-min]++;
        }

        if(log.isDebugEnabled()) {
            log.debug("counts.length: {}", counts.length);
        }
        //4. 结果构建
        int index = 0;
        // 遍历计数数组,将计数数组的索引填充到结果数组中
        for (int i = 0; i < counts.length; i++) {
            while (counts[i] > 0) {
                // i 实际上就是元素的值
                // 从左到右遍历,元素自然也就排序好了。
                // 相同的元素会出现多次,所以才需要循环。
                // 这里将减去的最小值统一加上
                original.set(index++, i+min);
                counts[i]--;

                if(log.isDebugEnabled()) {
                    log.debug("结果数组:{}", original);
                }
            }
        }
    }

}

自己的思考

算法的本质

这个算法的本质是什么呢?

个人理解只需要保证两点:

(1)每一个元素,都有自己的一个元素位置

(2)相同的元素,次数会增加。

算法的巧妙之处在于直接利用数值本身所谓索引,直接跳过了排序比较;利用技数,解决了重复数值的问题。

算法的不足

这个算法的巧妙之处,同时也是对应的限制:那就是只能直接比较数字。如果是字符串呢?

一点想法

我最初的想法就是可以使用类似于 HashMap 的数据结构。这样可以解决元素过滤,次数统计的问题。

但是无法解决排序问题。

当然了,如果使用 TreeMap 就太赖皮了,因为本身就是利用了树进行排序。

TreeMap 版本

我们这里使用 TreeMap 主要有下面的目的:

(1)让排序不局限于数字。

(2)大幅度降低内存的浪费,不多一个元素,也不少一个元素。

思想实际上依然是技术排序的思想。

package com.github.houbb.sort.core.api;

import com.github.houbb.heaven.annotation.ThreadSafe;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;

import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * 计数排序-TreeMap
 *
 * @author binbin.hou
 * @since 0.0.8
 */
@ThreadSafe
public class CountingSortTreeMap extends AbstractSort {

    private static final Log log = LogFactory.getLog(CountingSortTreeMap.class);

    @Override
    @SuppressWarnings("all")
    public void doSort(List original) {
        TreeMap<Comparable, Integer> countMap = new TreeMap<>();

        // 初始化次数
        for (Object object : original) {
            Comparable comparable = (Comparable) object;

            Integer count = countMap.get(comparable);
            if(count == null) {
                count = 0;
            }
            count++;
            countMap.put(comparable, count);
        }

        //4. 结果构建
        int index = 0;
        // 遍历计数数组,将计数数组的索引填充到结果数组中
        for (Map.Entry<Comparable, Integer> entry : countMap.entrySet()) {
            int count = entry.getValue();
            Comparable key = entry.getKey();
            while (count > 0) {
                // i 实际上就是元素的值
                // 从左到右遍历,元素自然也就排序好了。
                // 相同的元素会出现多次,所以才需要循环。
                original.set(index++, key);
                count--;
            }
        }
    }

}

桶排序(Bucket sort)

或所谓的箱排序,是一个排序算法,工作的原理是将数组分到有限数量的桶里。

每个桶再个别排序(有可能再使用别的排序算法或是以递归方式继续使用桶排序进行排序)。

桶排序是鸽巢排序的一种归纳结果。当要被排序的数组内的数值是均匀分配的时候,桶排序使用线性时间 O(n)

桶排序是计数排序的扩展版本,计数排序可以看成每个桶只存储相同元素,而桶排序每个桶存储一定范围的元素,通过映射函数,将待排序数组中的元素映射到各个对应的桶中,对每个桶中的元素进行排序,最后将非空桶中的元素逐个放入原序列中。

桶排序需要尽量保证元素分散均匀,否则当所有数据集中在同一个桶中时,桶排序失效。

算法流程

桶排序以下列程序进行:

  1. 设置一个定量的数组当作空桶子。
  2. 寻访序列,并且把项目一个一个放到对应的桶子去。
  3. 对每个不是空的桶子进行排序。
  4. 从不是空的桶子里把项目再放回原来的序列中。

算法

java 实现

实现

package com.github.houbb.sort.core.api;

import com.github.houbb.heaven.annotation.ThreadSafe;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * 桶排序
 *
 * @author binbin.hou
 * @since 0.0.9
 */
@ThreadSafe
public class BucketSort extends AbstractSort {

    private static final Log log = LogFactory.getLog(BucketSort.class);

    @Override
    @SuppressWarnings("all")
    public void doSort(List original) {
        final int step = 10;

        // 计算最小值
        int min = Integer.MAX_VALUE;
        int max = Integer.MIN_VALUE;
        for(Object object : original) {
            Integer integer = (Integer) object;
            min = Math.min(min, integer);
            max = Math.max(max, integer);
        }

        //2. 计算桶的个数
        int bucketNum = (max-min) / step + 1;;
        //2.1 初始化临时列表
        List<List<Integer>> tempList = new ArrayList<>(bucketNum);
        for(int i = 0; i < bucketNum; i++) {
            tempList.add(new ArrayList<Integer>());
        }

        //3. 将元素放入桶中
        // 这里有一个限制:要求元素必须一个左边的桶元素,要小于右边的桶。
        // 这就限制了只能是数字类的比较,不然没有范围的概念
        for(Object o : original) {
            Integer integer = (Integer) o;
            int index = (integer-min) / step;

            tempList.get(index).add(integer);
        }

        // 4. 针对单个桶进行排序
        // 可以选择任意你喜欢的算法
        for(int i = 0; i < bucketNum; i++) {
            Collections.sort(tempList.get(i));
        }

        //5. 设置结果
        int index = 0;
        for(int i = 0; i < bucketNum; i++) {
            List<Integer> integers = tempList.get(i);

            for(Integer val : integers) {
                original.set(index++, val);
            }
        }
    }

}

开源地址

为了便于大家学习,上面的排序已经开源,开源地址:

https://github.com/houbb/sort

欢迎大家 fork/star,鼓励一下作者~~

小结

希望本文对你有帮助,如果有其他想法的话,也可以评论区和大家分享哦。

各位极客的点赞收藏转发,是老马持续写作的最大动力!

image

查看原文

赞 0 收藏 0 评论 0

老马啸西风 发布了文章 · 2020-11-02

java 实现跳表(skiplist)及论文解读

Skiplist.png

什么是跳跃表

跳表由William Pugh发明。

他在论文 《Skip lists: a probabilistic alternative to balanced trees》中详细介绍了跳表的数据结构和插入删除等操作。

跳表是一种可以用来代替平衡树的数据结构,跳表使用概率平衡而不是严格执行的平衡,因此,与等效树的等效算法相比,跳表中插入和删除的算法要简单得多,并且速度要快得多。

Skip_list.svg.png

为什么需要?

性能比较好。

实现相对于红黑树比较简单。

占用更少的内存。

论文解读

为了学习第一手的资料,我们先学习一下论文,然后再结合网上的文章,实现一个 java 版本的 skip-list。

William Pugh

二叉树可用于表示抽象数据类型,例如字典和有序列表。

当元素以随机顺序插入时,它们可以很好地工作。某些操作序列(例如按顺序插入元素)产生了生成的数据结构,这些数据结构的性能非常差。

如果可以随机排列要插入的项目列表,则对于任何输入序列,树都将很好地工作。在大多数情况下,必须在线回答查询,因此随机排列输入是不切实际的。

平衡树算法会在执行操作时重新排列树,以保持一定的平衡条件并确保良好的性能。

skiplist是平衡树的一种概率替代方案

通过咨询随机数生成器来平衡skiplist。尽管skiplist在最坏情况下的性能很差,但是没有任何输入序列会始终产生最坏情况的性能(就像枢轴元素随机选择时的快速排序一样)。

skiplist数据结构不太可能会严重失衡(例如,对于超过250个元素的字典,搜索所花费的时间超过预期时间的3倍的机会少于百万分之一)。类似于通过随机插入构建的搜索树,但不需要插入即可是随机的。

概率性地平衡数据结构比显式地保持平衡容易。

ps: 大部分程序员可以手写 skip-list,但是手写一个红黑树就要复杂的多。

对于许多应用程序,skiplist比树更自然地表示,这也导致算法更简单

skiplist算法的简单性使其更易于实现,并且在平衡树和自调整树算法上提供了显着的恒定因子速度改进。

skiplist也非常节省空间。它们可以轻松配置为每个元素平均需要 4/3 个指针(甚至更少),并且不需要将平衡或优先级信息与每个节点一起存储。

算法核心思想

对于一个linked list来说,如果要查找某个元素,我们可能需要遍历整个链表。

如果list是有序的,并且每两个结点都有一个指针指向它之后两位的结点(Figure 1b),那么我们可以通过查找不超过 ⌈n/2⌉+1 个结点来完成查找。

如果每四个结点都有一个指针指向其之后四位的结点,那么只需要检查最多 ⌈n/4⌉+2 个结点(Figure 1c)。

如果所有的第(2^i)个结点都有一个指针指向其2^i之后的结点(Figure 1d),那么最大需要被检查的结点个数为 ⌈log_n2⌉,代价仅仅是将需要的指针数量加倍。

这种数据结构的查询效率很高,但是对它的插入和删除几乎是不可实现的(impractical)。

接下来看下论文中的一张图:

skip-list

因为这样的数据结构是基于链表的,并且额外的指针会跳过中间结点,所以作者称之为跳表(Skip Lists)

结构

从图中可以看到, 跳跃表主要由以下部分构成:

表头(head):负责维护跳跃表的节点指针。

跳跃表节点:保存着元素值,以及多个层。

层:保存着指向其他元素的指针。高层的指针越过的元素数量大于等于低层的指针,为了提高查找的效率,程序总是从高层先开始访问,然后随着元素值范围的缩小,慢慢降低层次。

表尾:全部由 NULL 组成,表示跳跃表的末尾。

skip-list 算法过程

本节提供了在字典或符号表中搜索,插入和删除元素的算法。

搜索操作将返回与所需的键或失败的键关联的值的内容(如果键不存在)。

插入操作将指定的键与新值相关联(如果尚未存在,则插入该键)。

Delete操作删除指定的密钥。

易于支持其他操作,例如“查找最小键”或“查找下一个键”。每个元素由一个节点表示,插入节点时,其级别是随机选择的,而不考虑元素的数量在数据结构中。

级别i节点具有i个前向指针,索引从1到i。

我们不需要在节点中存储节点的级别。级别以某个适当的常量MaxLevel进行上限。

list 的级别是列表中当前的最大级别(如果列表为空,则为1)。

列表的标题具有从一级到MaxLevel的前向指针。

标头的前向指针在高于列表的当前最大级别的级别上指向NIL

初始化

分配元素NIL并为其提供比任何合法密钥更大的密钥。

所有skiplist的所有级别均以NIL终止。

初始化一个新列表,以使列表的级别等于1,并且列表标题的所有前向指针都指向NIL

搜索算法

我们通过遍历不超过包含要搜索元素的节点的前向指针来搜索元素(图2)。

如果在当前的前向指针级别上无法再取得任何进展,则搜索将向下移动到下一个级别。

当我们无法在1级进行更多处理时,我们必须紧靠在包含所需元素的节点之前(如果它在列表中)

skiplist-02.PN

插入和删除算法

要插入或删除节点,我们只需进行搜索和拼接,如图3所示。

skiplist-03

图4给出了插入和删除的算法。

保持向量更新,以便在搜索完成时(并且我们准备执行拼接),update [i]包含一个指向级别i或更高级别的最右边节点的指针,该指针位于插图位置的左侧 /删除。

如果插入生成的节点的级别大于列表的先前最大级别,则我们将更新列表的最大级别,并初始化更新向量的适当部分。

每次删除后,我们检查是否删除了列表的最大元素,如果删除了,请减小列表的最大级别。

skiplist-04

选择一个随机级别

最初,我们讨论了概率分布,其中一半的具有i指针的节点也具有i + 1指针。

为了摆脱魔术常数,我们说具有i指针的节点的一小部分也具有i + 1指针。 (对于我们最初的讨论,p = 1/2)。

通过与图5中等效的算法随机生成级别。

生成级别时不参考列表中元素的数量。

skiplist-05

我们从什么级别开始搜索?定义L(n)

在用p = 1/2生成的16个元素的skiplist中,我们可能会遇到9个1级元素,3个2级元素,3个3级元素和1个14级元素(这是不太可能的,但是可以发生)。

我们应该如何处理呢?

如果我们使用标准算法并从14级开始搜索,我们将做很多无用的工作。

我们应该从哪里开始搜索?

我们的分析表明,理想情况下,我们将在期望 1/p 个节点的级别L处开始搜索。

当 L = log_(1/p)n 时,会发生这种情况。

由于我们将经常引用此公式,因此我们将使用 L(n) 表示 log_(1/p)n。

对于决定如何处理列表中异常大的元素的情况,有许多解决方案。

别担心,要乐观些。

只需从列表中存在的最高级别开始搜索即可。

正如我们将在分析中看到的那样,n个元素列表中的最大级别明显大于 L(n) 的概率非常小。

从列表中的最高级别开始搜索不会给预期搜索时间增加一个很小的常数。

这是本文描述的算法中使用的方法

使用少于给定的数量。

尽管一个元素可能包含14个指针的空间,但我们不需要全部使用14个指针。

我们可以选择仅使用 L(n) 级。

有很多方法可以实现此目的,但是它们都使算法复杂化并且不能显着提高性能,因此不建议使用此方法。

修复随机性(dice)

如果我们生成的随机级别比列表中的当前最大级别大一倍以上,则只需将列表中当前的最大级别再加上一个作为新节点的级别即可。

在实践中,从直观上看,此更改似乎效果很好。

但是,由于节点的级别不再是完全随机的,这完全破坏了我们分析结果算法的能力。

程序员可能应该随意实现它,而纯粹主义者则应避免这样做。

确定MaxLevel

由于我们可以安全地将级别限制为 L(n),因此我们应该选择 MaxLevel = L(n)(其中N是skiplist中元素数量的上限)。

如果 p = 1/2,则使用 MaxLevel = 16适用于最多包含216个元素的数据结构。

ps: maxLevel 可以通过元素个数+P的值推导出来。

针对 P,作者的建议使用 p = 1/4。后面的算法分析部分有详细介绍,篇幅较长,感兴趣的同学可以在 java 实现之后阅读到。

Java 实现版本

加深印象

我们无论看理论觉得自己会了,然而常常是眼高手低。

最好的方式就是自己写一遍,这样印象才能深刻。

节点定义

我们可以认为跳表就是一个加强版本的链表。

所有的链表都需要一个节点 Node,我们来定义一下:

/**
 * 元素节点
 * @param <E> 元素泛型
 * @author 老马啸西风
 */
private static class SkipListNode<E> {
    /**
     * key 信息
     * <p>
     * 这个是什么?index 吗?
     *
     * @since 0.0.4
     */
    int key;
    /**
     * 存放的元素
     */
    E value;
    /**
     * 向前的指针
     * <p>
     * 跳表是多层的,这个向前的指针,最多和层数一样。
     *
     * @since 0.0.4
     */
    SkipListNode<E>[] forwards;

    @SuppressWarnings("all")
    public SkipListNode(int key, E value, int maxLevel) {
        this.key = key;
        this.value = value;
        this.forwards = new SkipListNode[maxLevel];
    }
    @Override
    public String toString() {
        return "SkipListNode{" +
                "key=" + key +
                ", value=" + value +
                ", forwards=" + Arrays.toString(forwards) +
                '}';
    }
}

事实证明,链表中使用 array 比使用 List 可以让代码变得简洁一些。

至少在阅读起来更加直,第一遍就是用 list 实现的,后来不全部重写了。

对比如下:

newNode.forwards[i] = updates[i].forwards[i];   //数组

newNode.getForwards().get(i).set(i, updates.get(i).getForwards(i)); //列表

查询实现

查询的思想很简单:我们从最高层开始从左向右找(最上面一层可以最快定位到我们想找的元素大概位置),如果 next 元素大于指定的元素,就往下一层开始找。

任何一层,找到就直接返回对应的值。

找到最下面一层,还没有值,则说明元素不存在。

/**
 * 执行查询
 * @param searchKey 查找的 key
 * @return 结果
 * @since 0.0.4
 * @author 老马啸西风
 */
public E search(final int searchKey) {
    // 从左边最上层开始向右
    SkipListNode<E> c = this.head;
    // 从已有的最上层开始遍历
    for(int i = currentLevel-1; i >= 0; i--) {
        while (c.forwards[i].key < searchKey) {
            // 当前节点在这一层直接向前
            c = c.forwards[i];
        }
        // 判断下一个元素是否满足条件
        if(c.forwards[i].key == searchKey) {
            return c.forwards[i].value;
        }
    }
    // 查询失败,元素不存在。
    return null;
}

ps: 网上的很多实现都是错误的。大部分都没有理解到 skiplist 查询的精髓。

插入

若key不存在,则插入该key与对应的value;若key存在,则更新value。

如果待插入的结点的层数高于跳表的当前层数currentLevel,则更新currentLevel。

选择待插入结点的层数randomLevel:

randomLevel只依赖于跳表的最高层数和概率值p。

算法在后面的代码中。

另一种实现方法为,如果生成的randomLevel大于当前跳表的层数currentLevel,那么将randomLevel设置为currentLevel+1,这样方便以后的查找,在工程上是可以接受的,但同时也破坏了算法的随机性。

/**
 * 插入元素
 *
 *
 * @param searchKey 查询的 key
 * @param newValue 元素
 * @since 0.0.4
 * @author 老马啸西风
 */
@SuppressWarnings("all")
public void insert(int searchKey, E newValue) {
    SkipListNode<E>[] updates = new SkipListNode[maxLevel];
    SkipListNode<E> curNode = this.head;
    for (int i = currentLevel - 1; i >= 0; i--) {
        while (curNode.forwards[i].key < searchKey) {
            curNode = curNode.forwards[i];
        }
        // curNode.key < searchKey <= curNode.forward[i].key
        updates[i] = curNode;
    }
    // 获取第一个元素
    curNode = curNode.forwards[0];
    if (curNode.key == searchKey) {
        // 更新对应的值
        curNode.value = newValue;
    } else {
        // 插入新元素
        int randomLevel = getRandomLevel();
        // 如果层级高于当前层级,则更新 currentLevel
        if (this.currentLevel < randomLevel) {
            for (int i = currentLevel; i < randomLevel; i++) {
                updates[i] = this.head;
            }
            currentLevel = randomLevel;
        }
        // 构建新增的元素节点
        //head==>new  L-1
        //head==>pre==>new L-0
        SkipListNode<E> newNode = new SkipListNode<>(searchKey, newValue, randomLevel);
        for (int i = 0; i < randomLevel; i++) {
            newNode.forwards[i] = updates[i].forwards[i];
            updates[i].forwards[i] = newNode;
        }
    }
}

其中 getRandomLevel 是一个随机生成 level 的方法。

/**
 * 获取随机的级别
 * @return 级别
 * @since 0.0.4
 */
private int getRandomLevel() {
    int lvl = 1;
    //Math.random() 返回一个介于 [0,1) 之间的数字
    while (lvl < this.maxLevel && Math.random() < this.p) {
        lvl++;
    }
    return lvl;
}

个人感觉 skiplist 非常巧妙的一点就是利用随机达到了和平衡树类似的平衡效果。

不过也正因为随机,每次的链表生成的都不同。

删除

删除特定的key与对应的value。

如果待删除的结点为跳表中层数最高的结点,那么删除之后,要更新currentLevel。

/**
 * 删除一个元素
 * @param searchKey 查询的 key
 * @since 0.0.4
* @author 老马啸西风
 */
@SuppressWarnings("all")
public void delete(int searchKey) {
    SkipListNode<E>[] updates = new SkipListNode[maxLevel];
    SkipListNode<E> curNode = this.head;
    for (int i = currentLevel - 1; i >= 0; i--) {
        while (curNode.forwards[i].key < searchKey) {
            curNode = curNode.forwards[i];
        }
        // curNode.key < searchKey <= curNode.forward[i].key
        // 设置每一层对应的元素信息
        updates[i] = curNode;
    }
    // 最下面一层的第一个指向的元素
    curNode = curNode.forwards[0];
    if (curNode.key == searchKey) {
        for (int i = 0; i < currentLevel; i++) {
            if (updates[i].forwards[i] != curNode) {
                break;
            }
            updates[i].forwards[i] = curNode.forwards[i];
        }
        // 移除无用的层级
        while (currentLevel > 0 && this.head.forwards[currentLevel-1] ==  this.NIL) {
            currentLevel--;
        }
    }
}

输出跳表

为了便于测试,我们实现一个输出跳表的方法。

/**
 * 打印 list
 * @since 0.0.4
 */
public void printList() {
    for (int i = currentLevel - 1; i >= 0; i--) {
        SkipListNode<E> curNode = this.head.forwards[i];
        System.out.print("HEAD->");
        while (curNode != NIL) {
            String line = String.format("(%s,%s)->", curNode.key, curNode.value);
            System.out.print(line);
            curNode = curNode.forwards[i];
        }
        System.out.println("NIL");
    }
}

测试

public static void main(String[] args) {
    SkipList<String> list = new SkipList<>();
    list.insert(3, "耳朵听声音");
    list.insert(7, "镰刀来割草");
    list.insert(6, "口哨嘟嘟响");
    list.insert(4, "红旗迎风飘");
    list.insert(2, "小鸭水上漂");
    list.insert(9, "勺子能吃饭");
    list.insert(1, "铅笔细又长");
    list.insert(5, "秤钩来买菜");
    list.insert(8, "麻花扭一扭");
    list.printList();
    System.out.println("---------------");
    list.delete(3);
    list.delete(4);
    list.printList();
    System.out.println(list.search(8));
}

日志如下:

HEAD->(5,秤钩来买菜)->(6,口哨嘟嘟响)->NIL
HEAD->(1,铅笔细又长)->(2,小鸭水上漂)->(3,耳朵听声音)->(4,红旗迎风飘)->(5,秤钩来买菜)->(6,口哨嘟嘟响)->(7,镰刀来割草)->(8,麻花扭一扭)->(9,勺子能吃饭)->NIL
---------------
HEAD->(5,秤钩来买菜)->(6,口哨嘟嘟响)->NIL
HEAD->(1,铅笔细又长)->(2,小鸭水上漂)->(5,秤钩来买菜)->(6,口哨嘟嘟响)->(7,镰刀来割草)->(8,麻花扭一扭)->(9,勺子能吃饭)->NIL
麻花扭一扭

小结

SkipList 是非常巧妙的一个数据结构,到目前为止,我还是不能手写红黑树,不过写跳表相对会轻松很多。给论文作者点赞!

下一节让我们一起 jdk 中的 ConcurrentSkipListSet 数据结构,感受下 java 官方实现的魅力。

希望本文对你有帮助,如果有其他想法的话,也可以评论区和大家分享哦。

各位极客的点赞收藏转发,是老马持续写作的最大动力!

深度学习

查看原文

赞 0 收藏 0 评论 0

老马啸西风 发布了文章 · 2020-10-31

CopyOnWriteArrayList 使用入门及源码详解

CopyOnWriteArrayList.png

CopyOnWriteArrayList

官方定义

CopyOnWriteArrayList是ArrayList的线程安全变体,其中通过创建底层数组的新副本来实现所有可变操作(添加,设置等)。

这通常成本太高,但是当遍历操作大大超过突变时,它可能比替代方法更有效,并且当您不能或不想同步遍历但需要排除并发线程之间的干扰时非常有用。

“快照”样式迭代器方法在创建迭代器时使用对数组状态的引用。

这个数组在迭代器的生命周期中永远不会改变,所以干扰是不可能的,并且保证迭代器不会抛出ConcurrentModificationException。自迭代器创建以来,迭代器不会反映列表的添加,删除或更改。不支持对迭代器本身进行元素更改操作(删除,设置和添加)。这些方法抛出UnsupportedOperationException。

允许所有元素,包括null。

内存一致性效果:与其他并发集合一样,在将对象放入CopyOnWriteArrayList之前,线程中的操作发生在从另一个线程中的CopyOnWriteArrayList访问或删除该元素之后的操作之前。

CopyOnWriteArrayList-view.png

使用例子

网上这种代码大同小异。

ArrayList 版本

下面来看一个列子:两个线程一个线程循环读取,一个线程修改list的值。

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CopyOnWriteArrayListDemo {
    /**
     * 读线程
     */
    private static class ReadTask implements Runnable {
        List<String> list;

        public ReadTask(List<String> list) {
            this.list = list;
        }

        public void run() {
            for (String str : list) {
                System.out.println(str);
            }
        }
    }
    /**
     * 写线程
     */
    private static class WriteTask implements Runnable {
        List<String> list;
        int index;

        public WriteTask(List<String> list, int index) {
            this.list = list;
            this.index = index;
        }

        public void run() {
            list.remove(index);
            list.add(index, "write_" + index);
        }
    }

    public void run() {
        final int NUM = 10;
        List<String> list = new ArrayList<String>();
        for (int i = 0; i < NUM; i++) {
            list.add("main_" + i);
        }
        ExecutorService executorService = Executors.newFixedThreadPool(NUM);
        for (int i = 0; i < NUM; i++) {
            executorService.execute(new ReadTask(list));
            executorService.execute(new WriteTask(list, i));
        }
        executorService.shutdown();
    }

    public static void main(String[] args) {
        new CopyOnWriteArrayListDemo().run();
    }
}

这个运行结果会报错。

因为我们在读取的时候,对列表进行了修改。

CopyOnWriteArrayList 版本

直接列表创建替换即可:

List<String> list = new CopyOnWriteArrayList<String>();

则运行结果正常。

CopyOnWriteArrayList 优缺点

优点

  1. 保证多线程的并发读写的线程安全

缺点

内存消耗

有数组拷贝自然有内存问题。如果实际应用数据比较多,而且比较大的情况下,占用内存会比较大,这个可以用ConcurrentHashMap来代替。

  • 如何避免

内存占用问题。因为CopyOnWrite的写时复制机制,所以在进行写操作的时候,内存里会同时驻扎两个对象的内存,旧的对象和新写入的对象(注意:在复制的时候只是复制容器里的引用,只是在写的时候会创建新对象添加到新容器里,而旧容器的对象还在使用,所以有两份对象内存)。如果这些对象占用的内存比较大,比如说200M左右,那么再写入100M数据进去,内存就会占用300M,那么这个时候很有可能造成频繁的Yong GC和Full GC。之前我们系统中使用了一个服务由于每晚使用CopyOnWrite机制更新大对象,造成了每晚15秒的Full GC,应用响应时间也随之变长。

针对内存占用问题,可以通过压缩容器中的元素的方法来减少大对象的内存消耗,比如,如果元素全是10进制的数字,可以考虑把它压缩成36进制或64进制。或者不使用CopyOnWrite容器,而使用其他的并发容器,如ConcurrentHashMap。

数据一致性

CopyOnWrite容器只能保证数据的最终一致性,不能保证数据的实时一致性。所以如果你希望写入的的数据,马上能读到,请不要使用CopyOnWrite容器

使用场景

CopyOnWrite并发容器用于读多写少的并发场景。

比如白名单,黑名单,商品类目的访问和更新场景,假如我们有一个搜索网站,用户在这个网站的搜索框中,输入关键字搜索内容,但是某些关键字不允许被搜索。这些不能被搜索的关键字会被放在一个黑名单当中,黑名单每天晚上更新一次。当用户搜索时,会检查当前关键字在不在黑名单当中,如果在,则提示不能搜索。

实现代码如下:

/**
 * 黑名单服务
 */
public class BlackListServiceImpl {
    private static CopyOnWriteMap<String, Boolean> blackListMap = new CopyOnWriteMap<String, Boolean>(
            1000);
    public static boolean isBlackList(String id) {
        return blackListMap.get(id) == null ? false : true;
    }
    public static void addBlackList(String id) {
        blackListMap.put(id, Boolean.TRUE);
    }
    /**
     * 批量添加黑名单
     *
     * @param ids
     */
    public static void addBlackList(Map<String,Boolean> ids) {
        blackListMap.putAll(ids);
    }
}

代码很简单,但是使用CopyOnWriteMap需要注意两件事情:

  1. 减少扩容开销。根据实际需要,初始化CopyOnWriteMap的大小,避免写时CopyOnWriteMap扩容的开销。
  2. 使用批量添加。因为每次添加,容器每次都会进行复制,所以减少添加次数,可以减少容器的复制次数。如使用上面代码里的addBlackList方法。

为什么没有并发列表?

问:JDK 5在java.util.concurrent里引入了ConcurrentHashMap,在需要支持高并发的场景,我们可以使用它代替HashMap。

但是为什么没有ArrayList的并发实现呢?

难道在多线程场景下我们只有Vector这一种线程安全的数组实现可以选择么?为什么在java.util.concurrent 没有一个类可以代替Vector呢?

  • 别人的理解

ConcurrentHashMap的出现更多的在于保证并发,从它采用了锁分段技术和弱一致性的Map迭代器去避免并发瓶颈可知。(jdk7 及其以前)

而ArrayList中很多操作很难避免锁整表,就如contains()、随机取get()等,进行查询搜索时都是要整张表操作的,那多线程时数据的实时一致性就只能通过锁来保证,这就限制了并发。

  • 个人的理解

这里说的并不确切。

如果没有数组的长度变化,那么可以通过下标进行分段,不同的范围进行锁。但是这种有个问题,如果数组出现删除,增加就会不行。

说到底,还是性能和安全的平衡。

  • 比较中肯的回答

在java.util.concurrent包中没有加入并发的ArrayList实现的主要原因是:很难去开发一个通用并且没有并发瓶颈的线程安全的List

像ConcurrentHashMap这样的类的真正价值(The real point/value of classes)并不是它们保证了线程安全。而在于它们在保证线程安全的同时不存在并发瓶颈。

举个例子,ConcurrentHashMap采用了锁分段技术和弱一致性的Map迭代器去规避并发瓶颈。

所以问题在于,像“Array List”这样的数据结构,你不知道如何去规避并发的瓶颈。拿contains() 这样一个操作来说,当你进行搜索的时候如何避免锁住整个list?

另一方面,Queue 和Deque (基于Linked List)有并发的实现是因为他们的接口相比List的接口有更多的限制,这些限制使得实现并发成为可能。

CopyOnWriteArrayList是一个有趣的例子,它规避了只读操作(如get/contains)并发的瓶颈,但是它为了做到这点,在修改操作中做了很多工作和修改可见性规则。

此外,修改操作还会锁住整个List,因此这也是一个并发瓶颈。

所以从理论上来说,CopyOnWriteArrayList并不算是一个通用的并发List。

源码解读

类定义

public class CopyOnWriteArrayList<E>
    implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
    private static final long serialVersionUID = 8673264195747942595L;
    }

实现了最基本的 List 接口。

属性

我们看到前几次反复提及的 ReentrantLock 可重入锁。

array 比较好理解,以前的 List 也是通过数组实现的。

/** The lock protecting all mutators */
final transient ReentrantLock lock = new ReentrantLock();

/** The array, accessed only via getArray/setArray. */
private transient volatile Object[] array;
/**
 * Gets the array.  Non-private so as to also be accessible
 * from CopyOnWriteArraySet class.
 */
final Object[] getArray() {
    return array;
}
/**
 * Sets the array.
 */
final void setArray(Object[] a) {
    array = a;
}

构造器

/**
 * Creates an empty list.
 */
public CopyOnWriteArrayList() {
    setArray(new Object[0]);
}

/**
 * Creates a list containing the elements of the specified
 * collection, in the order they are returned by the collection's
 * iterator.
 *
 * @param c the collection of initially held elements
 * @throws NullPointerException if the specified collection is null
 */
public CopyOnWriteArrayList(Collection<? extends E> c) {
    Object[] elements;
    if (c.getClass() == CopyOnWriteArrayList.class)
        elements = ((CopyOnWriteArrayList<?>)c).getArray();
    else {
        elements = c.toArray();
        // c.toArray might (incorrectly) not return Object[] (see 6260652)
        if (elements.getClass() != Object[].class)
            elements = Arrays.copyOf(elements, elements.length, Object[].class);
    }
    setArray(elements);
}

/**
 * Creates a list holding a copy of the given array.
 *
 * @param toCopyIn the array (a copy of this array is used as the
 *        internal array)
 * @throws NullPointerException if the specified array is null
 */
public CopyOnWriteArrayList(E[] toCopyIn) {
    setArray(Arrays.copyOf(toCopyIn, toCopyIn.length, Object[].class));
}

这几种构造器都是统一调用的 setArray 方法:

/**
 * Sets the array.
 */
final void setArray(Object[] a) {
    array = a;
}

这个方法非常简单,就是初始化对应的数组信息。

核心方法

我大概看了下,很多方法和以前大同小异,我们来重点关注下几个修改元素值的方法:

set

方法通过 ReentrantLock 可重入锁控制加锁和解锁。

这里最巧妙的地方在于,首先会判断指定 index 的值是否和预期值相同。

按理说相同,是可以不进行更新的,这样性能更好;不过 jdk 中还是会进行一次设置。

如果值不同,则会对原来的 array 进行拷贝,然后更新,最后重新设置。

这样做的好处就是写是不阻塞读的,缺点就是比较浪费内存,拷贝数组也是要花时间的。

/**
 * Replaces the element at the specified position in this list with the
 * specified element.
 *
 * @throws IndexOutOfBoundsException {@inheritDoc}
 */
public E set(int index, E element) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        E oldValue = get(elements, index);
        if (oldValue != element) {
            int len = elements.length;
            Object[] newElements = Arrays.copyOf(elements, len);
            newElements[index] = element;
            setArray(newElements);
        } else {
            // 不是完全禁止操作; 确保可变的写语义
            // Not quite a no-op; ensures volatile write semantics
            setArray(elements);
        }
        return oldValue;
    } finally {
        lock.unlock();
    }
}

ps: 这里的所有变更操作是互斥的。

add

/**
 * Appends the specified element to the end of this list.
 *
 * @param e element to be appended to this list
 * @return {@code true} (as specified by {@link Collection#add})
 */
public boolean add(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        newElements[len] = e;
        setArray(newElements);
        return true;
    } finally {
        lock.unlock();
    }
}

也是通过 ReentrantLock 进行加锁。

这里比起更新更加简单直接,因为是添加元素,所以新数组的长度直接+1。

jdk 中数组的这种复制都是使用的 Arrays.copy 方法,这个以前实测,性能还是不错的。

add(int, E)

/**
 * Inserts the specified element at the specified position in this
 * list. Shifts the element currently at that position (if any) and
 * any subsequent elements to the right (adds one to their indices).
 *
 * @throws IndexOutOfBoundsException {@inheritDoc}
 */
public void add(int index, E element) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;

        // 越界校验
        if (index > len || index < 0)
            throw new IndexOutOfBoundsException("Index: "+index+
                                                ", Size: "+len);
        Object[] newElements;
        int numMoved = len - index;

        // 如果是放在数组的最后,其实就等价于上面的 add 方法了。
        if (numMoved == 0)
            newElements = Arrays.copyOf(elements, len + 1);
        else {
            // 如果元素不是在最后,就从两边开始复制即可:

            //0...index-1
            //index+1..len

            newElements = new Object[len + 1];
            System.arraycopy(elements, 0, newElements, 0, index);
            System.arraycopy(elements, index, newElements, index + 1,
                             numMoved);
        }
        
        // 统一设置 index 位置的元素信息
        newElements[index] = element;
        setArray(newElements);
    } finally {
        lock.unlock();
    }
}

remove 删除元素

/**
 * Removes the element at the specified position in this list.
 * Shifts any subsequent elements to the left (subtracts one from their
 * indices).  Returns the element that was removed from the list.
 *
 * @throws IndexOutOfBoundsException {@inheritDoc}
 */
public E remove(int index) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        E oldValue = get(elements, index);
        int numMoved = len - index - 1;

        // 如果删除最后一个元素,直接从 0..len-1 进行拷贝即可。
        if (numMoved == 0)
            setArray(Arrays.copyOf(elements, len - 1));
        else {
            // 新的数组比原来长度-1
            Object[] newElements = new Object[len - 1];

            //0...index-1 拷贝
            //indx+1...len-1 拷贝
            System.arraycopy(elements, 0, newElements, 0, index);
            System.arraycopy(elements, index + 1, newElements, index,
                             numMoved);
            setArray(newElements);
        }
        return oldValue;
    } finally {
        lock.unlock();
    }
}

删除元素实际上和添加元素的流程是类似的。

不过很奇怪,没有做越界判断?

迭代器

说明

COWList 的迭代器和常规的 ArrayList 迭代器还是有差异的,我们以前可能会被问过,一边遍历一边删除如何实现?

答案可能就是 Iterator。

但是 COW 的 Iterator 恰恰是不能支持变更的,个人理解是为了保证并发只在上面提及的几个变更中控制。

实现

迭代器定义

static final class COWIterator<E> implements ListIterator<E> {
        /** Snapshot of the array */
        private final Object[] snapshot;
        /** Index of element to be returned by subsequent call to next.  */
        private int cursor;

        private COWIterator(Object[] elements, int initialCursor) {
            cursor = initialCursor;
            snapshot = elements;
        }
}

基础方法

这里提供了一些基础的最常用的方法。

public boolean hasNext() {
    return cursor < snapshot.length;
}
public boolean hasPrevious() {
    return cursor > 0;
}
@SuppressWarnings("unchecked")
public E next() {
    if (! hasNext())
        throw new NoSuchElementException();
    return (E) snapshot[cursor++];
}
@SuppressWarnings("unchecked")
public E previous() {
    if (! hasPrevious())
        throw new NoSuchElementException();
    return (E) snapshot[--cursor];
}
public int nextIndex() {
    return cursor;
}
public int previousIndex() {
    return cursor-1;
}

不支持的操作

public void remove() {
    throw new UnsupportedOperationException();
}
public void set(E e) {
    throw new UnsupportedOperationException();
}
public void add(E e) {
    throw new UnsupportedOperationException();
}

@Override
public void forEachRemaining(Consumer<? super E> action) {
    Objects.requireNonNull(action);
    Object[] elements = snapshot;
    final int size = elements.length;
    for (int i = cursor; i < size; i++) {
        @SuppressWarnings("unchecked") E e = (E) elements[i];
        action.accept(e);
    }
    cursor = size;
}

小结

COW 这种思想是非常有优秀的,在写少读多的场景,可以通过空间换取时间。

希望本文对你有帮助,如果有其他想法的话,也可以评论区和大家分享哦。

各位极客的点赞收藏转发,是老马持续写作的最大动力!

深入学习

查看原文

赞 0 收藏 0 评论 0

老马啸西风 发布了文章 · 2020-10-29

面试经典 ConcurrentHashMap 源码你读过吗?

HashMap 的线程安全性

HashMap 是线程不安全的。

为了使用线程安全的数据结构,多线程条件下,可使用 Collections.synchronizedMap 方法构造出一个同步Map,或者直接使用线程安全的 ConcurrentHashMap

HashMap 的线程安全性

Java 7基于分段锁的ConcurrentHashMap

注:本章的代码均基于JDK 1.7.0_67

数据结构

Java 7中的ConcurrentHashMap的底层数据结构仍然是数组和链表。

与HashMap不同的是,ConcurrentHashMap最外层不是一个大的数组,而是一个Segment的数组。

每个Segment包含一个与HashMap数据结构差不多的链表数组。整体数据结构如下图所示。

jdk7-concurrent-hashmap

寻址方式

在读写某个Key时,先取该Key的哈希值。

并将哈希值的高N位对Segment个数取模从而得到该Key应该属于哪个Segment,接着如同操作HashMap一样操作这个Segment。

为了保证不同的值均匀分布到不同的Segment,需要通过如下方法计算哈希值。

private int hash(Object k) {
  int h = hashSeed;
  if ((0 != h) && (k instanceof String)) {
    return sun.misc.Hashing.stringHash32((String) k);
  }
  h ^= k.hashCode();
  h += (h <<  15) ^ 0xffffcd7d;
  h ^= (h >>> 10);
  h += (h <<   3);
  h ^= (h >>>  6);
  h += (h <<   2) + (h << 14);
  return h ^ (h >>> 16);
}

同样为了提高取模运算效率,通过如下计算,ssize即为大于concurrencyLevel的最小的2的N次方,同时segmentMask为2^N-1。

这一点跟上文中计算数组长度的方法一致。

对于某一个Key的哈希值,只需要向右移segmentShift位以取高sshift位,再与segmentMask取与操作即可得到它在Segment数组上的索引。

int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
  ++sshift;
  ssize <<= 1;
}
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];

同步方式

Segment继承自ReentrantLock,所以我们可以很方便的对每一个Segment上锁。

对于读操作,获取Key所在的Segment时,需要保证可见性(请参考如何保证多线程条件下的可见性)。

具体实现上可以使用volatile关键字,也可使用锁。但使用锁开销太大,而使用volatile时每次写操作都会让所有CPU内缓存无效,也有一定开销。

ConcurrentHashMap使用如下方法保证可见性,取得最新的Segment。

Segment<K,V> s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)

获取Segment中的HashEntry时也使用了类似方法

HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
  (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE)

对于写操作,并不要求同时获取所有Segment的锁,因为那样相当于锁住了整个Map。

它会先获取该Key-Value对所在的Segment的锁,获取成功后就可以像操作一个普通的HashMap一样操作该Segment,并保证该Segment的安全性。

同时由于其它Segment的锁并未被获取,因此理论上可支持concurrencyLevel(等于Segment的个数)个线程安全的并发读写。

获取锁时,并不直接使用lock来获取,因为该方法获取锁失败时会挂起。

事实上,它使用了自旋锁,如果tryLock获取锁失败,说明锁被其它线程占用,此时通过循环再次以tryLock的方式申请锁。

如果在循环过程中该Key所对应的链表头被修改,则重置retry次数。如果retry次数超过一定值,则使用lock方法申请锁。

这里使用自旋锁是因为自旋锁的效率比较高,但是它消耗CPU资源比较多,因此在自旋次数超过阈值时切换为互斥锁。

size操作

put、remove和get操作只需要关心一个Segment,而size操作需要遍历所有的Segment才能算出整个Map的大小。

一个简单的方案是,先锁住所有Segment,计算完后再解锁。

但这样做,在做size操作时,不仅无法对Map进行写操作,同时也无法进行读操作,不利于对Map的并行操作。

为更好支持并发操作,ConcurrentHashMap会在不上锁的前提逐个Segment计算3次size,如果某相邻两次计算获取的所有Segment的更新次数(每个Segment都与HashMap一样通过modCount跟踪自己的修改次数,Segment每修改一次其modCount加一)相等,说明这两次计算过程中无更新操作,则这两次计算出的总size相等,可直接作为最终结果返回。

如果这三次计算过程中Map有更新,则对所有Segment加锁重新计算Size。

该计算方法代码如下

public int size() {
  final Segment<K,V>[] segments = this.segments;
  int size;
  boolean overflow; // true if size overflows 32 bits
  long sum;         // sum of modCounts
  long last = 0L;   // previous sum
  int retries = -1; // first iteration isn't retry
  try {
    for (;;) {
      if (retries++ == RETRIES_BEFORE_LOCK) {
        for (int j = 0; j < segments.length; ++j)
          ensureSegment(j).lock(); // force creation
      }
      sum = 0L;
      size = 0;
      overflow = false;
      for (int j = 0; j < segments.length; ++j) {
        Segment<K,V> seg = segmentAt(segments, j);
        if (seg != null) {
          sum += seg.modCount;
          int c = seg.count;
          if (c < 0 || (size += c) < 0)
            overflow = true;
        }
      }
      if (sum == last)
        break;
      last = sum;
    }
  } finally {
    if (retries > RETRIES_BEFORE_LOCK) {
      for (int j = 0; j < segments.length; ++j)
        segmentAt(segments, j).unlock();
    }
  }
  return overflow ? Integer.MAX_VALUE : size;
}

不同之处

ConcurrentHashMap与HashMap相比,有以下不同点

  1. ConcurrentHashMap线程安全,而HashMap非线程安全
  2. HashMap允许Key和Value为null,而ConcurrentHashMap不允许
  3. HashMap不允许通过Iterator遍历的同时通过HashMap修改,而ConcurrentHashMap允许该行为,并且该更新对后续的遍历可见

Java 8 基于 CAS 的 ConcurrentHashMap

注:本章的代码均基于JDK 1.8.0_91

数据结构

Java 7为实现并行访问,引入了Segment这一结构,实现了分段锁,理论上最大并发度与Segment个数相等。

Java 8为进一步提高并发性,摒弃了分段锁的方案,而是直接使用一个大的数组。

同时为了提高哈希碰撞下的寻址性能,Java 8在链表长度超过一定阈值(8)时将链表(寻址时间复杂度为O(N))转换为 红黑树(寻址时间复杂度为O(log(N)))。

其数据结构如下图所示:

jdk8-concurrent-hashmap

寻址方式

Java 8的ConcurrentHashMap同样是通过Key的哈希值与数组长度取模确定该Key在数组中的索引。

同样为了避免不太好的Key的hashCode设计,它通过如下方法计算得到Key的最终哈希值。

不同的是,Java 8的ConcurrentHashMap作者认为引入红黑树后,即使哈希冲突比较严重,寻址效率也足够高,所以作者并未在哈希值的计算上做过多设计,只是将Key的hashCode值与其高16位作异或并保证最高位为0(从而保证最终结果为正整数)。

static final int spread(int h) {
  return (h ^ (h >>> 16)) & HASH_BITS;
}

同步方式

对于put操作,如果Key对应的数组元素为null,则通过CAS操作将其设置为当前值。

如果Key对应的数组元素(也即链表表头或者树的根元素)不为null,则对该元素使用synchronized关键字申请锁,然后进行操作。

如果该put操作使得当前链表长度超过一定阈值,则将该链表转换为树,从而提高寻址效率。

对于读操作,由于数组被volatile关键字修饰,因此不用担心数组的可见性问题。

同时每个元素是一个Node实例(Java 7中每个元素是一个HashEntry),它的Key值和hash值都由final修饰,不可变更,无须关心它们被修改后的可见性问题。

而其Value及对下一个元素的引用由volatile修饰,可见性也有保障。

static class Node<K,V> implements Map.Entry<K,V> {
  final int hash;
  final K key;
  volatile V val;
  volatile Node<K,V> next;
}

对于Key对应的数组元素的可见性,由 Unsafe.getObjectVolatile() 保证。

static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
  return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

size 操作

put方法和remove方法都会通过addCount方法维护Map的size。

size方法通过sumCount获取由addCount方法维护的Map的size。

public int size() {
    long n = sumCount();
    return ((n < 0L) ? 0 :
            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
            (int)n);
}

小结

HashMap 和 List 是我最喜欢的两种数据结构,简单实用。

ConcurrentHashMap 为了性能提升,都在不同的 jdk 版本中提升自己,何况你我呢?

希望本文对你有帮助,如果有其他想法的话,也可以评论区和大家分享哦。

各位极客的点赞收藏转发,是老马持续写作的最大动力!

深入学习

查看原文

赞 0 收藏 0 评论 0

老马啸西风 发布了文章 · 2020-10-28

并发类 AtomicInteger 使用入门及源码详解

AtomicInterger 介绍

可以原子性更新的 Integer 值,当然这个类并不能完全替代 Integer 对象。

AtomicInterger

使用

使用起来还是很方便的。

比如说我们定义一个计数器,使用 AtomicInteger 可以同时兼顾性能与并发安全。

import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author binbin.hou
 * @since 1.0.0
 */
public class Counter {

    private AtomicInteger c = new AtomicInteger(0);

    /**
     * 递增
     */
    public void increment() {
        c.getAndIncrement();
    }

    /**
     * 获取值
     * @return 值
     */
    public int value() {
        return c.get();
    }

    public static void main(String[] args) throws InterruptedException {
        final Counter counter = new Counter();

        //1000 threads
        for(int i = 0; i < 100 ; i++) {
            new Thread(new Runnable() {
                public void run() {
                    counter.increment();
                }
            }).start();
        }
        Thread.sleep(1000);
        System.out.println("Final number (should be 100): " + counter.value());
    }

}

日志输出:

Final number (should be 100): 100

AtomicInteger 源码

可恶!这个类使用起来竟然这么方便。

那么李大狗是如何实现的呢?

AtomicInteger 源码

类定义

public class AtomicInteger extends Number implements java.io.Serializable {
    private static final long serialVersionUID = 6214790243416807050L;
}

继承自 Number 类,实现了序列化接口。

内部属性

这里初始化了 unsafe 变量,用于后面使用 CAS 做变量更新。

// setup to use Unsafe.compareAndSwapInt for updates
private static final Unsafe unsafe = Unsafe.getUnsafe();

// 值的偏移量
private static final long valueOffset;

static {
    try {
        valueOffset = unsafe.objectFieldOffset
            (AtomicInteger.class.getDeclaredField("value"));
    } catch (Exception ex) { throw new Error(ex); }
}

// 定义的变量值
private volatile int value;
  • objectFieldOffset 方法

这是一个 native 方法,换言之就是直接调用的操作系统,获取到 value 变量的内存偏移量信息。

public native long objectFieldOffset(Field var1);

构造器

平淡无奇的构造器,用来初始化 value。

当然也可以不指定,不指定的时候默认值是什么呢?

我想各位读者肯定都清楚,不清楚的可以留言区忏悔一下。

/**
 * Creates a new AtomicInteger with the given initial value.
 *
 * @param initialValue the initial value
 */
public AtomicInteger(int initialValue) {
    value = initialValue;
}

/**
 * Creates a new AtomicInteger with initial value {@code 0}.
 */
public AtomicInteger() {
}

基本的方法

/**
 * Gets the current value.
 *
 * @return the current value
 */
public final int get() {
    return value;
}

/**
 * Sets to the given value.
 *
 * @param newValue the new value
 */
public final void set(int newValue) {
    value = newValue;
}

/**
 * Returns the String representation of the current value.
 * @return the String representation of the current value
 */
public String toString() {
    return Integer.toString(get());
}
/**
 * Returns the value of this {@code AtomicInteger} as an {@code int}.
 */
public int intValue() {
    return get();
}
/**
 * Returns the value of this {@code AtomicInteger} as a {@code long}
 * after a widening primitive conversion.
 * @jls 5.1.2 Widening Primitive Conversions
 */
public long longValue() {
    return (long)get();
}
/**
 * Returns the value of this {@code AtomicInteger} as a {@code float}
 * after a widening primitive conversion.
 * @jls 5.1.2 Widening Primitive Conversions
 */
public float floatValue() {
    return (float)get();
}
/**
 * Returns the value of this {@code AtomicInteger} as a {@code double}
 * after a widening primitive conversion.
 * @jls 5.1.2 Widening Primitive Conversions
 */
public double doubleValue() {
    return (double)get();
}

这两个方法和普通类中的 getter/setter等并没有区别,此处不做过多解释。

基于 unsafe 的方法

为什么 AtomicInteger 能保持原子性呢?

我们一起来看一下是如何基于 Unsafe 实现原子性的?

lazySet 惰性设置

/**
 * Eventually sets to the given value.
 *
 * @param newValue the new value
 * @since 1.6
 */
public final void lazySet(int newValue) {
    unsafe.putOrderedInt(this, valueOffset, newValue);
}

最终会把值设置为给定的值。这是什么意思?我直接懵了。

其实这个是相对的,我们前面说过,volatile 修饰的变量,修改后可以保证线程间的可见性。但是这个方法,修改后并不保证线程间的可见性

这和以前在网上看到的可不一样,不是说好的 AtomicXXX 都是基于 volatile+cas 实现的吗?这里为什么要反其道而行之呢?

其实是为了性能,lazySet 有自己的应用场景。

高级程序员都知道 volatile 可以保证变量在线程间的可见性,但是这里再问一句,不使用 volatile 修饰就无法保证可见性了吗?

事实上,这里完全可以不用 volatile 变量来修饰这些共享状态,

  1. 因为访问共享状态之前先要获得锁, Lock.lock()方法能够获得锁,而获得锁的操作和volatile变量的读操作一样,会强制使CPU缓存失效,强制从内存读取变量。
  2. Lock.unlock()方法释放锁时,和写volatile变量一样,会强制刷新CPU写缓冲区,把缓存数据写到主内存

底层也是通过加内存屏障实现的。

而lazySet()优化原理,就是在不需要让共享变量的修改立刻让其他线程可见的时候,以设置普通变量的方式来修改共享状态,可以减少不必要的内存屏障,从而提高程序执行的效率。

这个讨论可以参考 stackoverflow 的问题 AtomicInteger lazySet vs. set

原子性设置值

/**
 * Atomically sets to the given value and returns the old value.
 *
 * @param newValue the new value
 * @return the previous value
 */
public final int getAndSet(int newValue) {
    return unsafe.getAndSetInt(this, valueOffset, newValue);
}

这个方法实现如下:

public final int getAndSetInt(Object var1, long var2, int var4) {
    int var5;
    do {
        var5 = this.getIntVolatile(var1, var2);
    } while(!this.compareAndSwapInt(var1, var2, var5, var4));
    return var5;
}

实际上就是我们常说的 volatile + CAS 实现。

compareAndSet 比较并且设置

jdk 将 CAS 这个方法暴露给了开发者,不过做了一层封装,让 unsafe 类对使用者不可见。

compareAndSwapInt 这个方法是一个 native 方法,此处不做深入。其他的方法很多都大同小异,所以我们不再赘述。

ps: 很烦,native 方法直接看源码就会变得很麻烦,以后有时间研究下 openJdk 之类的。

/**
 * Atomically sets the value to the given updated value
 * if the current value {@code ==} the expected value.
 *
 * @param expect the expected value
 * @param update the new value
 * @return {@code true} if successful. False return indicates that
 * the actual value was not equal to the expected value.
 */
public final boolean compareAndSet(int expect, int update) {
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

weakCompareAndSet

这个方法我觉得也很有趣,弱比较?拿泥搜来。

/**
 * Atomically sets the value to the given updated value
 * if the current value {@code ==} the expected value.
 *
 * <p><a href="package-summary.html#weakCompareAndSet">May fail
 * spuriously and does not provide ordering guarantees</a>, so is
 * only rarely an appropriate alternative to {@code compareAndSet}.
 *
 * @param expect the expected value
 * @param update the new value
 * @return {@code true} if successful
 */
public final boolean weakCompareAndSet(int expect, int update) {
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

我们发现这两个方法在 jdk1.8 中实际上是没有差异的。

底层调用的都是同一个方法:

public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

那区别是什么呢?

于是我就去查了一下,JDK1.9以前,两者底层实现是一样的,并没有严格区分。

JDK 1.9提供了Variable Handles的API,主要是用来取代java.util.concurrent.atomic包以及sun.misc.Unsafe类的功能。

Variable Handles需要依赖jvm的增强及编译器的协助,即需要依赖java语言规范及jvm规范的升级。

VarHandle中compareAndSet和compareAndSet的定义如下:

(1)compareAndSet(Object... args)

Atomically sets the value of a variable to the newValue with the memory semantics of set(java.lang.Object...) if the variable's current value, referred to as the witness value, == the expectedValue, as accessed with the memory semantics of getAcquire(java.lang.Object...).

(2)weakCompareAndSet(Object... args)

Possibly atomically sets the value of a variable to the newValue with the memory semantics of setVolatile(java.lang.Object...) if the variable's current value, referred to as the witness value, == the expectedValue, as accessed with the memory semantics of getVolatile(java.lang.Object...).

weakCompareAndSet的描述多了一个单词Possibly,可能的。

weakCompareAndSet有可能不是原子性的去更新值,这取决于虚拟机的实现。

@HotSpotIntrinsicCandidate 标注的方法,在HotSpot中都有一套高效的实现,该高效实现基于CPU指令,运行时,HotSpot维护的高效实现会替代JDK的源码实现,从而获得更高的效率。

也就是说HotSpot可能会手动实现这个方法。

@PolymorphicSignature
@HotSpotIntrinsicCandidate
public final native boolean compareAndSet(Object... var1);

@PolymorphicSignature
@HotSpotIntrinsicCandidate
public final native boolean weakCompareAndSet(Object... var1);

其实这个方法和上面的 lazySet 有异曲同工之妙。

小结

我们对 AtomicInteger 源码进行了初步的分析,底层也确实是依赖 volatile+CAS 实现。

不过发现了两个有趣的实现:weakCompareAndSet 和 lazySet。

看起来反其道而行之,实际上都是出于更高的性能考虑。

文中很多方法都是 native 实现,这让我们读起来不够尽兴,说到底这个世界上本没有高级语言,只有C语言,和对C语言的封装

希望本文对你有帮助,如果有其他想法的话,也可以评论区和大家分享哦。

各位极客的点赞收藏转发,是老马写作的最大动力!

深入学习

查看原文

赞 0 收藏 0 评论 0

老马啸西风 发布了文章 · 2020-10-21

ReentrantLock 可重入锁这样学,面试没烦恼,下班走得早

ReentrantLock+可重入锁

为什么需要 ReentrantLock ?

既生 synchronized,何生 ReentrantLock

每一个接触过多线程的 java coder 肯定都知道 synchronized 关键字,那为什么还需要 ReentrantLock 呢?

其实这就是 ReentrantLock 与 synchronized 对比的优势问题:

(1)ReentrantLock 使用起来更来更加灵活。我们在需要控制的地方,可以灵活指定加锁或者解锁。

这可以让加锁的范围更小,记住老马的一句话,更小往往意味着更快

(2)ReentrantLock 提供了公平锁、非公平锁等多种方法特性,这些都是 synchronized 关键字无法提供的。

接下来,就让我们一起来学习一下 ReentrantLock 可重入锁吧。

可重入锁

ReentrantLock 使用

线程定义

创建一个可重入锁线程。

/**
 * @author 老马啸西风
 */
public class ReconnectThread extends Thread {

    /**
     * 声明可重入锁
     */
    private static final ReentrantLock reentrantLock = new ReentrantLock();


    /**
     * 用于标识当前线程
     */
    private String name;

    public ReconnectThread(String name) {
        this.name = name;
    }

    @Override
    public void run() {
        reentrantLock.lock();

        try {
            for (int i = 0; i < 5; i++) {
                System.out.println(name+" "+i+" times");
                Thread.sleep(1000);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            reentrantLock.unlock();
        }

    }
}

测试

  • Test
/**
 * @author 老马啸西风
 */
public static void main(String[] args) {
    Thread one = new ReconnectThread("one");
    Thread two = new ReconnectThread("two");
    one.start();
    two.start();
}
  • result

根据结果可知。两个必须要等待另外一个执行完成才能运行。

two 0 times
two 1 times
two 2 times
two 3 times
two 4 times
one 0 times
one 1 times
one 2 times
one 3 times
one 4 times

锁的释放和获取

锁是 java 并发编程中最重要的同步机制。

锁除了让临界区互斥执行外,还可以让释放锁的线程向获取同一个锁的线程发送消息。

实例

  • MonitorExample.java
/**
 * @author 老马啸西风
 */
class MonitorExample {
    int a = 0;

    public synchronized void writer() {  //1
        a++;                             //2
    }                                    //3

    public synchronized void reader() {  //4
        int i = a;                       //5
        //……
    }                                    //6
}

假设线程 A 执行 writer() 方法,随后线程 B 执行 reader() 方法。

根据 happens-before 规则,这个过程包含的 happens-before 关系可以分为两类:

  • 根据程序次序规则,1 happens before 2, 2 happens before 3; 4 happens before 5, 5 happens before 6。
  • 根据监视器锁规则,3 happens before 4。
  • 根据 happens before 的传递性,2 happens before 5。

因此,线程 A 在释放锁之前所有可见的共享变量,在线程 B 获取同一个锁之后,将立刻变得对 B 线程可见。

锁释放和获取的内存语义

当线程释放锁时,JMM 会把该线程对应的本地内存中的共享变量刷新到主内存中。

以上面的 MonitorExample 程序为例,A 线程释放锁后,共享数据的状态示意图如下:

  • 线程 A
本地内存 A: a = 1;

(写入到主内存)

主内存:a = 1;

当线程获取锁时,JMM 会把该线程对应的本地内存置为无效。

从而使得被监视器保护的临界区代码必须要从主内存中去读取共享变量。

下面是锁获取的状态过程:

在线程 A 写入主内存之后。

线程之间通信:线程 A 向 B 发送消息

  • 线程 B
主内存:a = 1;

(从主内存中读取)

本地内存 B: a = 1;

和 volatile 内存语义对比

对比锁释放-获取的内存语义与 volatile 写-读的内存语义,

可以看出:锁释放与 volatile 写有相同的内存语义;锁获取与 volatile 读有相同的内存语义

内存语义小结

下面对锁释放和锁获取的内存语义做个总结:

  • 线程 A 释放一个锁,实质上是线程A向接下来将要获取这个锁的某个线程发出了(线程A对共享变量所做修改的)消息。
  • 线程 B 获取一个锁,实质上是线程 B 接收了之前某个线程发出的(在释放这个锁之前对共享变量所做修改的)消息。
  • 线程 A 释放锁,随后线程 B 获取这个锁,这个过程实质上是线程 A 通过主内存向线程 B 发送消息。

锁内存语义的实现

本文将借助 ReentrantLock 的源代码,来分析锁内存语义的具体实现机制。

  • ReentrantLockExample.java
/**
 * @author 老马啸西风
 */
class ReentrantLockExample {

    int a = 0;

    ReentrantLock lock = new ReentrantLock();

    public void writer() {
        lock.lock();         //获取锁
        try {
            a++;
        } finally {
            lock.unlock();  //释放锁
        }
    }

    public void reader () {
        lock.lock();        //获取锁
        try {
            int i = a;
            //……
        } finally {
            lock.unlock();  //释放锁
        }
    }
}

在 ReentrantLock 中,调用 lock() 方法获取锁;调用 unlock() 方法释放锁。

源码实现

ReentrantLock 的实现依赖于 java 同步器框架 AbstractQueuedSynchronizer(本文简称之为AQS)。

AQS 使用一个整型的 volatile 变量(命名为state)来维护同步状态,马上我们会看到,这个 volatile 变量是 ReentrantLock 内存语义实现的关键。

/**
 * @author 老马啸西风
 */
public class ReentrantLock implements Lock, java.io.Serializable {

    /**
     * Base of synchronization control for this lock. Subclassed
     * into fair and nonfair versions below. Uses AQS state to
     * represent the number of holds on the lock.
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        //...
    }

    /**
     * Sync object for non-fair locks
     */
    static final class NonfairSync extends Sync {
        //...
    }

    /**
     * Sync object for fair locks
     */
    static final class FairSync extends Sync {
        //...        
    }
}

公平锁

lock()

使用公平锁时,加锁方法lock()的方法调用轨迹如下:

  1. ReentrantLock : lock()
  2. FairSync : lock()
  3. AbstractQueuedSynchronizer : acquire(int arg)
  4. ReentrantLock : tryAcquire(int acquires)

在第 4 步真正开始加锁,下面是该方法的源代码(JDK 1.8):

/**
 * Fair version of tryAcquire.  Don't grant access unless
 * recursive call or no waiters or is first.
 * @author 老马啸西风
 */
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

加锁方法首先读 volatile 变量 state。

unlock()

在使用公平锁时,解锁方法unlock()的方法调用轨迹如下:

  1. ReentrantLock : unlock()
  2. AbstractQueuedSynchronizer : release(int arg)
  3. Sync : tryRelease(int releases)

在第 3 步真正开始释放锁,下面是该方法的源代码:

/**
 * @author 老马啸西风
 */
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

在释放锁的最后写 volatile 变量 state。

公平锁在释放锁的最后写 volatile 变量 state;在获取锁时首先读这个 volatile 变量。

根据 volatile 的 happens-before 规则,释放锁的线程在写 volatile 变量之前可见的共享变量,在获取锁的线程读取同一个 volatile 变量后将立即变的对获取锁的线程可见。

非公平锁

非公平锁的释放和公平锁完全一样,所以这里仅仅分析非公平锁的获取。

lock()

使用非公平锁时,加锁方法lock()的方法调用轨迹如下:

  1. ReentrantLock : lock()
  2. NonfairSync : lock()
  3. AbstractQueuedSynchronizer : compareAndSetState(int expect, int update)

在第 3 步真正开始加锁,下面是该方法的源代码:

/**
 * Atomically sets synchronization state to the given updated
 * value if the current state value equals the expected value.
 * This operation has memory semantics of a {@code volatile} read
 * and write.
 *
 * @param expect the expected value
 * @param update the new value
 * @return {@code true} if successful. False return indicates that the actual
 *         value was not equal to the expected value.
 * @author 老马啸西风
 */
protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

该方法以原子操作的方式更新 state 变量,本文把 java 的 compareAndSet() 方法调用简称为CAS。

JDK文档对该方法的说明如下:如果当前状态值等于预期值,则以原子方式将同步状态设置为给定的更新值。此操作具有 volatile 读和写的内存语义

内存语义总结

现在对公平锁和非公平锁的内存语义做个总结:

  • 公平锁和非公平锁释放时,最后都要写一个 volatile 变量 state。
  • 公平锁获取时,首先会去读这个 volatile 变量。
  • 非公平锁获取时,首先会用 CAS 更新这个 volatile 变量,这个操作同时具有 volatile 读和 volatile 写的内存语义。

从本文对 ReentrantLock 的分析可以看出,锁释放-获取的内存语义的实现至少有下面两种方式:

  1. 利用 volatile 变量的写-读所具有的内存语义。
  2. 利用 CAS 所附带的 volatile 读和 volatile 写的内存语义。

小结

本文从介绍 ReentrantLock 使用案例开始,引出了锁的获取和释放的内存语义。

为了读者加深印象,对源码进行了简单的学习,下一节将对源码进行深入讲解。

秉着没有对比,就没有发现的原则,我们对比了 ReentrantLock 和 volatile 以及 synchronized 的差异性,便于读者正确地根据自己的场景选择合适的加锁策略。

希望本文对你有帮助,如果有其他想法的话,也可以评论区和大家分享哦。

各位极客的点赞收藏转发,是老马写作的最大动力!

深入学习

查看原文

赞 1 收藏 1 评论 0

老马啸西风 发布了文章 · 2020-10-12

java 手写并发框架(二)异步转同步框架封装锁策略

序言

上一节我们学习了异步查询转同步的 7 种实现方式,今天我们就来学习一下,如何对其进行封装,使其成为一个更加便于使用的工具。

思维导图如下:

异步转同步

拓展阅读

java 手写并发框架(1)异步查询转同步的 7 种实现方式

异步转同步的便利性

实现方式

  • 循环等待
  • wait & notify
  • 使用条件锁
  • 使用 CountDownLatch
  • 使用 CyclicBarrier
  • Future
  • Spring EventListener

上一节我们已经对上面的 7 种实现方式进行了详细的介绍,没有看过的同学可以去简单回顾一下。

但是这样个人觉得还是不够方便,懒惰是进步的阶梯。

更进一步简化

我们希望达到下面的效果:

@Sync
public String queryId() {
    System.out.println("开始查询");
    return id;
}

@SyncCallback(value = "queryId")
public void queryIdCallback() {
    System.out.println("回调函数执行");
    id = "123";
}

通过注解直接需要同步的方法,和回调的方法,代码中直接调用即可。

我们首先实现基于字节码增强的版本,后续将实现整合 spring, springboot 的版本。

锁的代码实现

锁的定义

我们将原来的实现抽象为加锁和解锁,为了便于拓展,接口定义如下:

package com.github.houbb.sync.api.api;

/**
 * @author binbin.hou
 * @since 0.0.1
 */
public interface ISyncLock {

    /**
     * 等待策略
     * @param context 上下文
     * @since 0.0.1
     */
    void lock(final ISyncLockContext context);

    /**
     * 解锁策略
     * @param context 上下文
     * @since 0.0.1
     */
    void unlock(final ISyncUnlockContext context);

}

其中上下文加锁和解锁做了区分,不过暂时内容是一样的。

主要是超时时间和单位:

package com.github.houbb.sync.api.api;

import java.util.concurrent.TimeUnit;

/**
 * @author binbin.hou
 * @since 0.0.1
 */
public interface ISyncLockContext {

    /**
     * 超时时间
     * @return 结果
     */
    long timeout();

    /**
     * 超时时间单位
     * @return 结果
     */
    TimeUnit timeUnit();

}

锁策略实现

我们本节主要实现下上一节中的几种锁实现。

目前我们选择其中的是个进行实现:

wait & notify

package com.github.houbb.sync.core.support.lock;

import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.sync.api.api.ISyncLock;
import com.github.houbb.sync.api.api.ISyncLockContext;
import com.github.houbb.sync.api.api.ISyncUnlockContext;
import com.github.houbb.sync.api.exception.SyncRuntimeException;

/**
 * 等待通知同步
 *
 * @author binbin.hou
 * @since 0.0.1
 */
public class WaitNotifyLock implements ISyncLock {

    private static final Log log = LogFactory.getLog(WaitNotifyLock.class);

    /**
     * 声明对象
     */
    private final Object lock = new Object();

    @Override
    public synchronized void lock(ISyncLockContext context) {
        synchronized (lock) {
            try {
                long timeoutMills = context.timeUnit().toMillis(context.timeout());
                log.info("进入等待,超时时间为:{}ms", timeoutMills);
                lock.wait(timeoutMills);
            } catch (InterruptedException e) {
                log.error("中断异常", e);
                throw new SyncRuntimeException(e);
            }
        }
    }

    @Override
    public void unlock(ISyncUnlockContext context) {
        synchronized (lock) {
            log.info("唤醒所有等待线程");
            lock.notifyAll();
        }
    }

}

加锁的部分比较简单,我们从上下文中获取超时时间和超时单位,直接和上一节内容类似,调用即可。

至于上下文中的信息是怎么来的,我们后续就会讲解。

条件锁实现

这个在有了上一节的基础之后也非常简单。

核心流程:

(1)创建锁

(2)获取锁的 condition

(3)执行加锁和解锁

package com.github.houbb.sync.core.support.lock;

import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.sync.api.api.ISyncLock;
import com.github.houbb.sync.api.api.ISyncLockContext;
import com.github.houbb.sync.api.api.ISyncUnlockContext;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 等待通知同步
 *
 * @author binbin.hou
 * @since 0.0.1
 */
public class LockConditionLock implements ISyncLock {

    private static final Log log = LogFactory.getLog(LockConditionLock.class);

    private final Lock lock = new ReentrantLock();

    private final Condition condition = lock.newCondition();

    @Override
    public synchronized void lock(ISyncLockContext context) {
        lock.lock();
        try{
            log.info("程序进入锁定状态");
            condition.await(context.timeout(), context.timeUnit());
        } catch (InterruptedException e) {
            log.error("程序锁定状态异常", e);
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void unlock(ISyncUnlockContext context) {
        lock.lock();
        try{
            log.info("解锁状态,唤醒所有等待线程。");
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

}

CountDownLatch 实现

package com.github.houbb.sync.core.support.lock;

import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.sync.api.api.ISyncLock;
import com.github.houbb.sync.api.api.ISyncLockContext;
import com.github.houbb.sync.api.api.ISyncUnlockContext;

import java.util.concurrent.CountDownLatch;

/**
 * 等待通知同步
 *
 * @author binbin.hou
 * @since 0.0.1
 */
public class CountDownLatchLock implements ISyncLock {

    private static final Log log = LogFactory.getLog(CountDownLatchLock.class);

    /**
     * 闭锁
     * 调用1次,后续方法即可通行。
     */
    private CountDownLatch countDownLatch = new CountDownLatch(1);

    @Override
    public synchronized void lock(ISyncLockContext context) {
        countDownLatch = new CountDownLatch(1);

        try {
            log.info("进入等待,超时时间为:{},超时单位:{}", context.timeout(),
                    context.timeUnit());
            boolean result = countDownLatch.await(context.timeout(), context.timeUnit());
            log.info("等待结果: {}", result);
        } catch (InterruptedException e) {
            log.error("锁中断异常", e);
        }
    }

    @Override
    public void unlock(ISyncUnlockContext context) {
        log.info("执行 unlock 操作");
        countDownLatch.countDown();
    }

}

注意:这里为了保证 countDownLatch 可以多次使用,我们在每一次加锁的时候,都会重新创建 CountDownLatch。

CyclicBarrierLock 锁实现

package com.github.houbb.sync.core.support.lock;

import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.sync.api.api.ISyncLock;
import com.github.houbb.sync.api.api.ISyncLockContext;
import com.github.houbb.sync.api.api.ISyncUnlockContext;
import com.github.houbb.sync.api.exception.SyncRuntimeException;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeoutException;

/**
 * @author binbin.hou
 * @since 0.0.1
 */
public class CyclicBarrierLock implements ISyncLock {

    private static final Log log = LogFactory.getLog(CyclicBarrierLock.class);

    private final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);

    @Override
    public synchronized void lock(ISyncLockContext context) {
        try {
            log.info("进入锁定状态, timeout:{}, timeunit: {}",
                    context.timeout(), context.timeUnit());
            cyclicBarrier.await(context.timeout(), context.timeUnit());

            log.info("重置 cyclicBarrier");
            cyclicBarrier.reset();
        } catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
            log.error("锁定时遇到异常", e);
            throw new SyncRuntimeException(e);
        }
    }

    @Override
    public void unlock(ISyncUnlockContext context) {
        try {
            log.info("解锁信息");
            cyclicBarrier.await(context.timeout(), context.timeUnit());
        } catch (InterruptedException | TimeoutException | BrokenBarrierException e) {
            log.error("解锁时遇到异常", e);
        }
    }

}

这里和 CountDownLatchLock 的实现非常类似,不过 CyclicBarrier 有一个好处,就是可以复用。

我们在每一次解锁之后,重置一下栅栏:

log.info("重置 cyclicBarrier");
cyclicBarrier.reset();

锁的工具类

为了简单的生成上述几种锁的实例,我们提供了一个简单的工具类方法:

package com.github.houbb.sync.core.support.lock;

import com.github.houbb.heaven.support.instance.impl.Instances;
import com.github.houbb.sync.api.api.ISyncLock;
import com.github.houbb.sync.api.constant.LockType;

import java.util.HashMap;
import java.util.Map;

/**
 * 锁策略
 * @author binbin.hou
 * @since 0.0.1
 */
public final class Locks {

    private Locks(){}

    /**
     * MAP 信息
     * @since 0.0.1
     */
    private static final Map<LockType, ISyncLock> MAP = new HashMap<>();

    static {
        MAP.put(LockType.WAIT_NOTIFY, waitNotify());
        MAP.put(LockType.COUNT_DOWN_LATCH, countDownLatch());
        MAP.put(LockType.CYCLIC_BARRIER, cyclicBarrier());
        MAP.put(LockType.LOCK_CONDITION, lockCondition());
    }

    /**
     * 获取锁实现
     * @param lockType 锁类型
     * @return 实现
     * @since 0.0.1
     */
    public static ISyncLock getLock(final LockType lockType) {
        return MAP.get(lockType);
    }

    /**
     * @since 0.0.1
     * @return 实现
     */
    private static ISyncLock waitNotify() {
        return Instances.singleton(WaitNotifyLock.class);
    }

    /**
     * @since 0.0.1
     * @return 实现
     */
    private static ISyncLock countDownLatch() {
        return Instances.singleton(CountDownLatchLock.class);
    }

    /**
     * @since 0.0.1
     * @return 实现
     */
    private static ISyncLock lockCondition() {
        return Instances.singleton(LockConditionLock.class);
    }

    /**
     * @since 0.0.1
     * @return 实现
     */
    private static ISyncLock cyclicBarrier() {
        return Instances.singleton(CyclicBarrierLock.class);
    }

}

上述的锁实现都是线程安全的,所以全部使用单例模式创建。

LockType 类是一个锁的枚举类,会在注解中使用。

小结

好了,到这里我们就把上一节中的常见的 4 种锁策略就封装完成了。

你可能好奇上下文的时间信息哪里来?这些锁又是如何被调用的?

我们将通过注解+字节码增强的方式来实现调用(就是 aop 的原理),由于篇幅原因,字节码篇幅较长,为了阅读体验,实现部分将放在下一节。

感兴趣的可以关注一下,便于实时接收最新内容。

觉得本文对你有帮助的话,欢迎点赞评论收藏转发一波。你的鼓励,是我最大的动力~

不知道你有哪些收获呢?或者有其他更多的想法,欢迎留言区和我一起讨论,期待与你的思考相遇。

文中如果链接失效,可以点击 {阅读原文}。

深入学习

查看原文

赞 0 收藏 0 评论 0

认证与成就

  • 获得 37 次点赞
  • 获得 2 枚徽章 获得 0 枚金徽章, 获得 0 枚银徽章, 获得 2 枚铜徽章

擅长技能
编辑

(゚∀゚ )
暂时没有

开源项目 & 著作
编辑

(゚∀゚ )
暂时没有

注册于 2018-07-06
个人主页被 2.9k 人浏览