日拱一兵

日拱一兵 查看完整档案

填写现居城市  |  填写毕业院校  |  填写所在公司/组织 dayarch.top 编辑
编辑

欢迎关注,公众号「日拱一兵」,以读侦探小说思维趣味轻松学习Java技术

个人动态

日拱一兵 发布了文章 · 1月8日

10分钟搞定让你困惑的 Jenkins 环境变量

前言

Jenkins, DevOps 技术栈的核心之一,CI/CD 离不开编写 Pipeline 脚本,上手 Jenkins ,简单查一下文档,你就应该不会被 agent,stages,step 这类关键词弄懵,也能很快构建出 pipeline 的骨架

但是当向骨架中填充内容的时候,尤其如何利用环境变量(系统内置 | 自定义),多数人都会变得比较混乱,浪费很多时间,本文就帮助大家快速通关环境变量

准备

如果你想一边阅读本文,一边实践,但是没有 Jenkins 服务可用,又想快速尝试,可以应用 Docker 一个命令快速搭建 Jenkins 服务

docker container run --rm -p 8080:8080 -p 50000:50000 --name=jenkins -v $(pwd):/var/jenkins_home jenkins/jenkins

2021 年了,本地没有 Docker 说不过去了,过来瞧瞧 Docker 系列是否入得了你的法眼?

打开浏览器输入:localhost:8080

  1. 找到终端的临时密码登陆
  2. 安装推荐的依赖
  3. 创建新的 Pipeline 类型的 Item
  4. 点击左侧 Config,然后在页面底部 Pipeline 部分输入我们接下来写的脚本进行测试就好了

就是这么简单.....

认识 Jenkins 环境变量

Jenkins 环境变量就是通过 env 关键字暴露出来的全局变量,可以在 Jenkins 文件的任何位置使用

其实和你使用的编程语言中的全局变量没有实质差别

查看 Jenkins 系统内置环境变量

Jenkins 在系统内置了很多环境变量方便我们快速使用,查看起来有两种方式:

方式一:

直接在浏览器中访问 ${YOUR_JENKINS_HOST}/env-vars.html 页面就可以,比如 http://localhost:8080/env-vars.html ,每个变量的用途写的都很清楚

方式二

通过执行 printenv shell 命令来获取:

pipeline {
    agent any

    stages {
        stage("Env Variables") {
            steps {
                sh "printenv"
            }
        }
    }
}

直接 Save - Build, 在终端 log 中你会看到相应的环境变量,并且可以快速看到他们当前的值

通常这两种方式可以结合使用

读取环境变量

上面我们说了 env 是环境变量的关键字,但是读取 Jenkins 内置的这些环境变量,env 关键字是可有可无, 但不能没了底裤,都要使用 ${xxx} 包围起来。以 BUILD_NUMBER 这个内置环境变量举例来说明就是这样滴:

如果你在 Jenkins 文件中使用 shell 命令,使用这些内置环境变量甚至可以不用 {}, 来看一下:

pipeline {
    agent any

    stages {
        stage("Read Env Variables") {
            steps {
                echo "带 env 的读取方式:${env.BUILD_NUMBER}"
                echo "不带 env 的读取方式:${BUILD_NUMBER}"
                sh 'echo "shell 中读取方式 $BUILD_NUMBER"'
            }
        }
    }
}

可以看到结果是一样一样滴,不管有几种,记住第一种最稳妥

内置的环境变量虽好,但也不能完全满足我们自定义的 pipeline 的执行逻辑,所以我们也得知道如何定义以及使用自定义环境变量

自定义 Jenkins 环境变量

Jenkins pipeline 分声明式(Declarative)和 脚本式(imperative)写法,相应的环境变量定义方式也略有不同,归纳起来有三种方式:

还是看个实际例子吧:

pipeline {
    agent any

    environment {
        FOO = "bar"
    }

    stages {
        stage("Custom Env Variables") {
            environment {
                NAME = "RGYB"
            }

            steps {
                echo "FOO = ${env.FOO}"
                echo "NAME = ${env.NAME}"

                script {
                    env.SCRIPT_VARIABLE = "Thumb Up"
                }

                echo "SCRIPT_VARIABLE = ${env.SCRIPT_VARIABLE}"

                withEnv(["WITH_ENV_VAR=Come On"]) {
                    echo "WITH_ENV_VAR = ${env.WITH_ENV_VAR}"
                }
            }
        }
    }
}

来看运行结果:

注意:withEnv(["WITH_ENV_VAR=Come On"]) {}这里的 = 号两侧不能有空格,必须是 key=value 的形式

一个完整的 pipeline 通常会有很多个 stage,环境变量在不同的 stage 有不同的值是很常见的,知道如何设置以及读取环境变量后,我们还得知道如何重写环境变量

重写 Jenkins 环境变量

Jenkins 让人相对困惑最多的地方就是重写环境变量,但是只要记住下面这三条规则,就可以搞定一切了

  1. withEnv(["WITH_ENV_VAR=Come On"]) {} 内置函数的这种写法,可以重写任意环境变量
  2. 定义在 environment {} 的环境变量不能被脚本式定义的环境变量(env.key="value")重写
  3. 脚本式环境变量只能重写脚本式环境变量

这三点是硬规则,没涵盖在这 3 点规则之内的也就是被允许的了

三条规则就有点让人头大了,农夫选豆种,举例为证吧

pipeline {
    agent any

    environment {
        FOO = "你当像鸟飞往你的山"
        NAME = "Tan"
    }

    stages {
        stage("Env Variables") {
            environment {
                  // 会重写第 6 行 变量
                NAME = "RGYB" 
                  // 会重写系统内置的环境变量 BUILD_NUMBER
                BUILD_NUMBER = "10" 
            }

            steps {
                  // 应该打印出 "FOO = 你当像鸟飞往你的山"
                echo "FOO = ${env.FOO}" 
                  // 应该打印出 "NAME = RGYB"
                echo "NAME = ${env.NAME}" 
                  // 应该打印出 "BUILD_NUMBER = 10"
                echo "BUILD_NUMBER =  ${env.BUILD_NUMBER}" 

                script {
                      // 脚本式创建一个环境变量
                    env.SCRIPT_VARIABLE = "1" 
                }
            }
        }

        stage("Override Variables") {
            steps {
                script {
                      // 这里的 FOO 不会被重写,违背 Rule No.2
                    env.FOO = "Tara"
                      // SCRIPT_VARIABLE 变量会被重写,符合 Rule No.3
                    env.SCRIPT_VARIABLE = "2" 
                }

                  // FOO 在第 37 行重写失败,还会打印出 "FOO = 你当像鸟飞往你的山"
                echo "FOO = ${env.FOO}" 
                  // 会打印出 "SCRIPT_VARIABLE = 2"
                echo "SCRIPT_VARIABLE = ${env.SCRIPT_VARIABLE}" 

                  // FOO 会被重写,符合 Rule No.1
                withEnv(["FOO=Educated"]) { 
                      // 应该打印 "FOO = Educated"
                    echo "FOO = ${env.FOO}" 
                }

                  // 道理同上
                withEnv(["BUILD_NUMBER=15"]) {
                      // 应该打印出 "BUILD_NUMBER = 15"
                    echo "BUILD_NUMBER = ${env.BUILD_NUMBER}"
                }
            }
        }
    }
}

来验证一下结果吧

看到这,基本的设置应该就没有什么问题了,相信你也发现了,Jenkins 设置环境变量和编程语言的那种设置环境变量还是略有不同的,后者可以将变量赋值为对象,但 Jenkins 就不行,因为在 Jenkins 文件中,所有设置的值都会被当成 String, 难道没办法应用 Boolean 值吗?

Jenkins 中使用 Boolean 值

如果设置一个变量为 false ,Jenkins 就会将其转换为 "false", 如果想使用 Boolean 来做条件判断,必须要调用 toBoolean() 方法做转换

pipeline {
    agent any

    environment {
        IS_BOOLEAN = false
    }

    stages {
        stage("Env Variables") {
            steps {
                script {
                      // Hello 会被打印出来,因为非空字符串都会被认为是 Boolean.True
                    if (env.IS_BOOLEAN) {
                        echo "Hello"
                    }

                      // 真正的 Boolean 比较
                    if (env.IS_BOOLEAN.toBoolean() == false) {
                        echo "日拱一兵"
                    }
                  
                      // 真正的 Boolean 
                    if (!env.IS_BOOLEAN.toBoolean()) {
                        echo "RGYB"
                    }
                }
            }
        }
    }
}

来看运行结果:

如果你写过 Pipeline,你一定会知道,写 Pipeline 是离不开写 shell 的,有些时候,需要将 shell 的执行结果赋值给环境变量,Jenkins 也有方法支持

Shell 结果赋值给环境变量

实现这种方式很简单,只需要记住一个格式:sh(script: 'cmd', returnStdout:true)

pipeline {
    agent any

    environment {
          // 使用 trim() 去掉结果中的空格
        LS_RESULT = "${sh(script:'ls -lah', returnStdout: true).trim()}"
    }

    stages {
        stage("Env Variables") {
            steps {
                echo "LS_RESULT = ${env.LS_RESULT}"
            }
        }
    }
}

总结

关于 Jenkins 环境变量,了解这些基本上就满足绝大多数应用场景了,当再遇到环境变量问题时,可以回过来翻看一下了,有解决的困惑吗?

查看原文

赞 10 收藏 5 评论 0

日拱一兵 发布了文章 · 2020-12-24

Docker Container 就是一个进程,多新鲜啊?

大家对 Docker 都应该有了或多或少的认识了,相信大家都是从这两张图来粗旷的理解 Docker 及容器概念的

那我们如何更轻松的理解容器 Container 呢?说白了

Container 就是一个进程

比如我们 run 一个 mongo 的镜像 image

然后我们通过下面命令列举出正在运行的容器 (以下两个命令等同)

# 旧命令
docker ps
# 新命令
docker container ls

个人建议使用新命令

如果你对上述等同命令有所疑惑,或者好奇动图中的命令自动补全是怎么实现的,以及为什么建议使用新命令,请看 Docker 命令自动补全,在不熟悉命令之前,建议充分利用 TAB 键来查看每个命令的含义,然后结合实际使用场景,慢慢记忆,这样才根深蒂固

我们 run 下面命令:

# top      -- Display the running processes of a container (这是 TAB 补全给的命令提示说明)
docker container top mongo

从上图中可以看到,PID 为 2292,command 为 mongod。既然我们说 container 是一个进程,那我们就应该在 Host 中找得到,执行下面命令

ps aux | grep mongod

查看执行结果:

rgyb           49927   0.0  0.0  4277516    708 s000  S+    4:06PM   0:00.00 grep --color=auto --exclude-dir=.bzr --exclude-dir=CVS --exclude-dir=.git --exclude-dir=.hg --exclude-dir=.svn mongod

关于 mongod 只有我们刚刚执行的 grep mongod 的操作,并没有上面说的 container,这是为什么?

细心的朋友可能已经从动图中发现我是用 Mac 做的测试,Docker Platform 原生运行在 Linux 上(在 Linux 操作系统中就不会有上述问题,大家可以自行尝试)。我是用 Docker for Mac,其实使用的是在macOS上一个特殊的 xhyve VM中运行的小型(定制)Alpine Linux,所以如果想看到这个进程,我们需要进入到 Mac 的这个 Docker VM

执行下述命令:

docker run -it --rm --privileged --pid=host justincormack/nsenter1

(这里暂不展开说明,有兴趣的可以看看这个 justincormack/nsenter1 image 到底做了什么,Docker for Windows 也可以用这种方式进入 Docker VM)

到这,Container 是个进程算是证明完了,但是老gong,你证明这个有什么用呢?

程序员都对进程有基本了解,证明 Container 是个进程,只不过是将一个“新”东西剥开看看本质,并挂靠到你熟悉的内容上

进程就是它可以获取操作系统的哪些资源(网络/磁盘/文件等),当停止进程,也就会自动退出,释放相应资源。所以,接下来只要慢慢探索,一个 Container 中使用了哪些资源,是如何获取资源的。了解了这些,也就慢慢了解了 Docker

大家可以通过下面两个命令了解 Container 的更多详情

  • docker container inspect mongo 查看Container 的详细信息(JSON 的数据形式)
Usage:    docker container inspect [OPTIONS] CONTAINER [CONTAINER...]

Display detailed information on one or more containers

Options:
  -f, --format string   Format the output using the given Go template
  -s, --size            Display total file sizes
------------------------------
docker container inspect mongo

细节内容非常多,可以简单的看一看(一定有你一眼就能看明白的信息),暂不用深究

  • docker container stats mongo 查看资源是用情况(动态统计)
Usage:    docker container stats [OPTIONS] [CONTAINER...]

Display a live stream of container(s) resource usage statistics

Options:
  -a, --all             Show all containers (default shows just running)
      --format string   Pretty-print images using a Go template
      --no-stream       Disable streaming stats and only pull the first result
      --no-trunc        Do not truncate output
----------------------------      
docker container stats mongo

总结

这里没有上来就和大家死背命令,现在不会,将来也不会。而是通过实际目的,结合命令补全自行查看的方式逐步认识与了解。强烈建议大家安装命令补全,可以尽情使用 TAB,也可以在每个命令后面添加 --help 的方式随时查看使用方式

证明 Container 就是一个进程,这样贴近我们已有知识后,学习门槛至少矮了半截吧。最后做个游戏吧,你记住了本文的多少个命令?

灵魂追问

  1. 为什么资源动态统计 Mem LIMIT 是 1.941GB,这个是在哪里设置的?

日拱一兵 | 原创

查看原文

赞 1 收藏 1 评论 0

日拱一兵 发布了文章 · 2020-12-09

Docker 命令自动补全?要的

前言

不知道这个小伙伴有多久没用过 Docker 了, 突然对我说 Docker 命令怎么发生变化了

docker run ...
#变成了
docker container run ...

他说,本来对 Docker 命令就不熟悉,这下感觉更加混乱了。其实个人看来,这么变化还使得命令看着更加规整

当在命令行直接输入 docker 然后回车:

从图中可以看出,Docker 将命令结构化的划分了两大类,Management Commands 和 Commands,其实前者就是一级命令,后者就是子命令 (这是自 Docker 1.13 开始的改动),所以以后使用命令就是这样滴:

docker <Management Command> <Sub-Command <Opts/Args>>

这样以后我们使用命令只需要先关注 Management Commands 就可以了,那后续的子命令还是不知道怎么用,还要一点点查询嘛?

Docker 命令自动补全

为了解决这个问题,Docker 也提供了非常完善的命令自动补全功能,也就是把一切交给 Tab 键

Mac 安装Docker命令自动补全

逐条键入下面命令:

brew install bash-completion

sudo curl -L https://raw.githubusercontent.com/docker/compose/1.27.4/contrib/completion/bash/docker-compose -o /usr/local/etc/bash_completion.d/docker-compose

打开 ~/.bash_profile 文件,将下面内容粘贴进去:

if [ -f $(brew --prefix)/etc/bash_completion ]; then
 . $(brew --prefix)/etc/bash_completion
 fi

然后刷新使之生效

source ~/.bash_profile

我觉得 Zsh 更好,为什么?答案请看这篇:这篇 iTerm2 + Oh My Zsh 教程手把手让你成为这条街最靓的仔

Zsh安装Docker命令自动补全

如果没有安装 Oh-My-Zsh shell,第一步则是要先安装它,逐条键入下面命令:

mkdir -p ~/.zsh/completion

curl -L https://raw.githubusercontent.com/docker/compose/1.27.4/contrib/completion/zsh/_docker-compose > ~/.zsh/completion/_docker-compose

打开 ~/.zshrc 文件,将下面内容粘贴进去:

fpath=(~/.zsh/completion $fpath)
autoload -Uz compinit && compinit -i

比如我的 ~/.zshrc 文件内容:

搜索该文件插件位置,更新插件内容:

plugins=(... docker docker-compose
)

顺便说一下,强烈建议使用 git 插件

最后刷新一下使之生效:

source ~/.zshrc

总结

自动补全功能就可以疯狂利用你的 Tab 键,这比查阅文档要更加快捷,来看看效果:

日拱一兵 | 原创

查看原文

赞 6 收藏 5 评论 0

日拱一兵 发布了文章 · 2020-12-02

Windows 的这款工具,有时让我觉得 Mac 不是很香

上次写了个 cheat.sh 在手,天下我有,小伙伴们热情高涨,觉得这是一个没有杂质的好工具;也有小伙伴抱怨说对 Windows 用户不是特别友好 (其实用 curl API 是没啥问题的)。为了「雨露均沾」,今天就介绍一款对 Windows 超级 * N (N 是几,大家读完文章自己定) 友好的工具

我甚至有些嫉妒,因为 Windows 的这款工具需要我在 Mac 结合好几个工具才能达到与之相媲美的效果

今天的主角就是~~~~

PowerToys

PowerToys 就是这么一款有 Power 的工具,使用的前置要求很简单,Windows 10 操作系统,截至写本文时,PowerToys release 的版本为 v0.25.0,打开 release 页面,直接拉到页面底部,下载 exe 文件即可

安装没有什么顾虑,选定一个安装目录,无脑下一步,直至安装结束

image

安装完打开 PowerToys,在左侧边栏看到的就是 PowerToys 的全部功能

image

我们来逐一看看这些强大的工具

Color Picker

直译过来就是颜色提取器在 Mac 中我用 Sip),这应该是前端工程师的标配,后端工程师建博客以及画图,如果需要好的配色,都是很需要它滴

使用快捷键 Win + Shift + C 就可以滑动鼠标取色了,如果一个地方的颜色分布比较细腻,还可以通过滚动鼠标滚轮进行放大提取,支持两种颜色值(RGB | HEX)

FancyZones

程序员从前怼到后,干一个活同时开很多个应用都是很正常的现象,合理快捷的利用屏幕区域显得尤为关键。比如打开一个 Terminal, 它只需要在一个角落里让我们动态地看到 log 就行。按照以往,我们可能会选择用鼠标缩放某个应用到指定大小,然后用鼠标拖拽的合适的位置上,两个字「麻烦

FancyZones 就是一个非常有效的窗口管理器 (Mac 中我用 SizeUp), 你可以按照模版布局进行设置:

也可以自定义你喜欢的的布局:

然后就是按照你喜欢的方式拖拽应用到相应的窗口就可以了,还可以通过快捷键 Win+Left/Right Arrow 快速在多个区域之间移动你的应用 (如果快捷键有冲突,可以自行设置),FancyZones 还有很多高级设置,大家根据自己的喜好一次配置到位就好了

File Explorer Preview

Mac Finder 自带文件预览功能,但是却没有渲染文件并预览的功能,File Explorer Preview 就实现了这个完整的功能,使用这个功能的前提是把 Preview Pane(预览面板)打开,就是选中这个:

File Explorer Preview 目前支持三种文件类型的渲染和预览:

  • MarkDown
  • SVG
  • HTML

如果你打开 SVG 文件并编辑颜色区域,CTRL + S 保存,还可以实时预览出新的变化,爽歪歪

Image Resizer

处理图片大小,日常用 Mac 的Preview 或者找一些在线的 Resize 图片工具。但 Image Resizer 缺带来了不一样的赶脚

  • 单个图片 Resize
  • 批量多个图片 Resize
  • 自定义 Resize 大小
  • 批量生成 Resize 之后的图片文件名

直接一张图片自己体会吧

Keyboard Manager

直译键盘管理器,其实就是重定义快捷键的。当多个应用快捷键冲突的时候,在此集中管理那是一点都不混乱,不但可以 remap 全局的快捷键,还可以 remap 某个应用的快捷键

不知道打游戏的快捷键会不会被 remap,大家可以试试

PowerRename

如其名,这是一个重命名的功能,到底 Power 在哪里呢?还是看动图说明吧

配合下面这些 Options 的使用,真是强大而灵活,这就是 Power

文件重命名还动态支持一些变量,这个大家自行查看说明就好了

PowerToys Run

当我们查找一个应用,我们要找桌面上的图片,常用的还好一下可以定位到位置,万一不常用可使要多找上一会;如果要找个文件或文件夹更是麻烦。PowerToys Run 就为解决这个痛点而生 (Mac 下我用 Alfred),我知道有些插件比如 UTools 也有类似的功能,但是 PowerToys Run 和 Windows 系统融合得更加流畅

日常做个加减乘除,也不用再打开计算器了

这一切,只需要按一下快捷键 Alt+Space 即可

Shortcut Guide

这是一个显示使用 Windows 键的常见键盘快捷键,使用方式很简单,按住 Windows 键大概 1秒钟,就会显示出配合 Windows 键使用的快捷键了,甚至任务栏的应用都有快捷键,直接一键跳到相应的应用上,不用再 tab~tab 来回切换了 (Mac 下我要用 cheatsheet 和 PopClip 结合使用)

看看到上图右侧,一定有你不知道的快捷键功能,是不是方便了许多呢?

Video Conference Mute (Experimental)

这是一个 Pre-release 的功能,受疫情影响,目前还有很多小伙伴在家远程办公,视频会议成为了日常的沟通手段。在办公室喊一嗓子就可以解决的问题都要跑到线上来说明,难免会有一些多个任务并行的时刻,都开视频语音肯定会互相干扰,所以灵活的切换或静音就是一个非常给力的功能了,这个自行设置快捷键就可以了:

总结

Github上有伙伴说,PowerToys 工具的这些功能在未来甚至可能直接成为 Windows 的内置功能,这个到底能否成真不可知,至少现在我们也可以享受这些功能带来的便利,这个多合一的工具,偶尔让我觉得 Mac 也不是很香,有 Windows 10 系统的小伙伴们都可以实验起来了,相信这个工具会让你提升很多效率

在介绍 PowerToys 工具时,我也在文中附上了 Mac 的工具,这样的均沾您觉得还可以吗?

日拱一兵 | 原创

查看原文

赞 10 收藏 3 评论 4

日拱一兵 发布了文章 · 2020-11-30

cheat.sh在手,天下我有

前言

作为程序员需要了解的东西有很多,日常编码和写脚本脱离不开各式语言与 Linux 命令。为了记住一些杂乱的或不被经常使用的知识点,我们迫切需要一个“小抄”/备忘录,小抄内容多了自然繁杂,所以我们希望这个小抄要:

  1. 简洁:只包含你想要的内容,没有其他「花边」内容
  2. 快速:可以立即使用
  3. 全面:能基本包含你所有问题的答案
  4. 通用:它应该在任何地方、任何时间都可用,不需要任何准备
  5. 不唐突:它不应该让你从主要任务上分心(比如减少应用切换)
  6. 辅导:它应该帮助你学习这个科目(在答案基础上扩展知识)
  7. 不显眼:应该可以在完全不被注意的情况下使用(就好比划词翻译,鼠标轻点就有答案)

老gong,你是想介绍哆啦A梦吗?

<img data-original="https://cdn.jsdelivr.net/gh/FraserYu/img-host/blog-img20201115205225.png" style="zoom:25%;" />

非也,其实是 cheat.sh

cheat.sh 介绍

cheat.sh 就是一个可以满足上述愿望的小哆啦,目前在 Github 的形式是这样滴:

Commit 也非常活跃,就是这么一个哆啦

  • 它提供一个简单的 curl/浏览器接口方便我们查询
  • 目前覆盖 58 种编程语言,多种 DBMS以及超过 1000 个UNIX/Linux 常用命令
  • 提供对世界上最好的社区驱动的备忘单存储库的访问,与StackOverflow持平(绝对是高质量的内容)
  • 提供命令行客户端
  • 可以嵌套在代码编辑器中使用,比如 Intellij IDEA 和 VS-Code
  • 支持一个特殊的隐身模式,可以完全隐形的使用它 (感觉挺神秘的呢)

先来认识一下,打开命令行终端,使用 curl 命令输入:

curl cht.sh

如何使用 cheat.sh

先拿几个常用的 UNIX/Linux 命令练练手:

curl cht.sh/tar

瞧这整理的规范和简洁不?

curl cht.sh/tr

答案依旧整洁规范, 同时还高亮显示,友好的很啊

如果你不知道某个命令,还可以使用 ~Keyword 的形式来查询,比如你想查看如何建立快照

curl cht.sh/~snapshot

上面说过, cheat.sh 包含 1000 多个常用的 UNIX/Linux 命令,当需要的时候,按照语法 curl cht.sh/<you-cmd> 尽情查询吧

除了 Linux 命令,我们还说支持 58 种语言,当写代码时某个 API 不会用或需要完成某些操作,cheat.sh 依旧可以帮上忙,比如我总是记不住 Java Lambda 的 group 操作

curl cht.sh/java/lambda+group

记住下面的标准格式,搜索的结果都是和 StackOverflow 一样的高质量

如果这个答案还不是你想要的,你就可以添加数字进行翻页获取其他结果

另外你觉得结果中的注释很碍眼的话,可以在每次查询的后面加上 \?Q ,就像这样:

curl cht.sh/java/lambda+group\?Q

当然每种语言都默认支持 :list 和 :help 两种查询方式,作为帮助指令,大家可以自行尝试了,比如 go 语言

curl cht.sh/go/:list
curl cht.sh/go/:help

相信到这里,你已经可以掌握 cheat.sh 的基本使用方式了

但是,这种 curl 方式总是显得不是很方便,比如空格要用 + 替代,日常工作语言比如只有 Java,每次都要输入 curl cht.sh/java/xxxxxxx 这样就会显得很麻烦, 为了解决这些问题,cheat.sh 很贴心,也提供了命令行客户端,大大简化了搜索操作

Cheat.sh 命令行客户端

安装

安装 CLI Client (Command Line Client)非常简单,只需要依次执行下面的命令即可:

# 注意你的环境变量 PATH 已经 import 了 ~/bin 下的内容
mkdir -p ~/bin/
curl https://cht.sh/:cht.sh > ~/bin/cht.sh
chmod +x ~/bin/cht.sh

如果要保证 shell 模式可用,还需要安装一个依赖 rlwrap, 下面两种安装方式都可以(我直接用brew安装的)

brew install rlwrap
# 或者
sudo apt install rlwrap

使用

有了 CLI Client 之后,来看一看搜索上的变化:

这个 CLI Client 还提供了一个更加便利的 shell 模式:

cht.sh --shell

如下图,每次直接按照语言搜索相关内容就可以了:

通常我们编程在一段时间内会用一种语言,我们可以进一步简化搜索过程,cd 到某个语言目录下:

如果进入 shell 模式,同时想一次性进入某个语言目录,也可以通过一条命令搞定:

cht.sh --shell java

隐身模式

相信很多小伙伴都配有划词工具,比如某个单词不会了,选中相应的单词,就会出来解释,cheat.sh 也有类似的模式,进入某个语言目录下之后,输入 stealth Q 就可以进入这个模式了:

用鼠标选中文本后,用起来的效果就是这样滴:

不过这里建议,搜索的单词不要超过 5 个

以上这些使用方式,默认都会调用它自己的服务,为了更快速的响应,我们可以搭建自己的服务,前提是要更改 CLI Client 的 server URL:

打开或新建 ~/.cht.sh/cht.sh.conf ,添加

CHTSH_URL=https://cht.sh            # URL of the cheat.sh server

然后就可以 run 自己的服务

git clone https://github.com/chubin/cheat.sh.git
cd cheat.sh
docker-compose up

最后访问服务: http://localhost:8002

集成主流编辑器

cheat.sh 同样和主流编辑器有很好的集成:

FeatureEmacsSublimeVimVSCodeIDEAQtCreator
Command queries
Queries from buffer
Toggle comments
Prev/next answer
Multiple answers
Warnings as queries
Queries history
Session id
Configurable server

Vim 的集成度是最高的,大家可以根据 cheat.sh-vim 自行配置

VSCode 和 IDEA 是大家高频使用的两个 IDE,和他们集成就很简单了,只需要安装相应的插件:

VSCode 插件

安装 vscode-snippet 就可以在 VSCode 中快速使用这个功能了

IDEA 插件

安装 idea-cheatsh-plugin 这个插件就可以在 Intellij IDEA 中使用这个功能了

总结

至于支持的 58 种语言都是什么,请大家自行参考 README 文档,关于 cheat.sh, 了解这些基本的使用就已经够了,还是那句话,好的工具是用来提高工作效率的,不要被工具过度捆绑
日拱一兵 | 原创

查看原文

赞 31 收藏 20 评论 1

日拱一兵 发布了文章 · 2020-11-05

妙用 Intellij IDEA 创建临时文件,Git 跟踪不到的那种

| 好看请赞,养成习惯

  • 你有一个思想,我有一个思想,我们交换后,一个人就有两个思想
  • If you can NOT explain it simply, you do NOT understand it well enough

现陆续将Demo代码和技术文章整理在一起 Github实践精选 ,方便大家阅读查看,本文同样收录在此,觉得不错,还请Star


多数人对于 Intellij IDEA 可能始于其「颜值」,终于其「才华」,外加各种插件 buff 的加成,coding 的节奏分分钟要暴走

抛开自己安装的插件,IDEA 其实也内秀的很,在 IntelliJ IDEA HTTP Client高级使用详解 中详细的介绍了开发小组内放弃 Postman 的理由,用过的小伙伴后台留言直呼爽。

但今天要介绍的是另外一个秀的有些含蓄的小功能,她那么显眼的站在你面前,你却选择忽视她

有时候我们需要在项目之外创建一些临时文件或一些实验性代码,创建在项目中可能一不小心 Git 误提交,不创建项目里又需要切换到其他应用上来回拷贝,对于专注 coding 的我们来说,总显得不够流畅

IDEA 其实早已为我们解决了这个痛点,借助 Scratch files 和 Scratch buffers 就可以解决

Scratch files | Scratch buffers

IDEA 会在项目平行目录中自动生成下面的目录结构,这就是今天主角的位置,你有正视过她们吗?

Scratch files 和 Scratch buffers 二者还是有很大差别的:

Scratch files

Scratch files 是一种功能完整的、可运行的和可调试的文件,支持语法高亮显示、代码补全和相应文件类型的所有其他特性 (说白了很像Jupyter Notebooks)

Scratch file 的功能,就可以满足我们在 Coding 中的各种想法,用于快速记录。创建好的这个文件并不存储在我们的项目目录中(避免了意外 git push 的尴尬),甚至在 IDEA 中切换到其他项目中也可以看到你刚刚创建的这个文件,进一步说白一点,这是凌驾在项目之上的一个全局功能,如下图,切换到了其他项目中,Scratch files 依旧存在

创建一个 Scratch file 很简单,使用快捷键 ⇧⌘N ,支持关键字搜索,直接创建相应类型的文件就可以,比如这里创建一个 java 文件

默认会创建名为 Scratch.java 的文件,并写好 main 函数,就像这样:

在这里和你正常在项目中写 java 代码没什么区别,你可以使用 git status 命令来验证,结果是 Git 完全不 track 的,放心的创建临时文件就好了

其实我很常用的是创建一个 scratch.sql 文件,存放一些日常 SQL 语句,同样的创建方式,搜索 sql 默认会创建一个 scratch.sql 的文件,文件创建之后要配置数据源

配置好后,就可以尽情的书写你日常用到的 SQL 了

光标放到 SQL 位置,使用快捷键 ⌘⏎ ,选择相应的 Session (会话) 就可以 run 这条 SQL 语句了

执行后,就看到你熟悉的画面了,因为这里的画面就是 IDEA 默认的数据库管理工具,这篇 IntelliJ IDEA的数据库管理工具实在太方便了 文章中有过详细说明

创建 Scratch files 可选择的类型非常多,总有一个适合你的一些需要

Scratch buffers

和 Scratch files 类似,只不过 Scratch buffers 就是一个简单的文本文件,没有任何编码辅助功能 (说白了,可以将它理解成一个记事本),创建 Scratch buffers 没有直接的快捷键,需要用通用快捷键 ⇧⌘A ,并输入关键字(比如 buffer):

敲击回车键,就会创建好一个名为 scratch.txt 的文本文件。反复创建 Scratch buffers,你会发现,最多只允许创建 5 个

因为这个操作不频繁,所以也就没有默认快捷键,如果你是个快捷键强迫症患者,那就在 KeyMap 处添加相应的快捷键就可以了

这里要说明一个注意事项

如果你在 buffer1.txt 文件中记录了一些内容,当你创建第 6 次 scratch buffer 文件时,buffer1.txt 的内容就会被清空

别看 Scratch files 和 Scratch buffers 不会被 Git 跟踪,但是同样可以通过鼠标右键查看 Local History 来查看过往所有改动

减少应用切换,尽量保持专注,可以借助预览模式(ctrl+opt+v) 和 快捷键 (cmd+e)切换最近常用文件,戴上耳机,快告诉我,时速多少迈?

总结

这是一个很小的功能,但是在日常开发中真的可以带来很大的帮助:

  • 跨越项目的访问
  • 不被 Git 所跟踪,防止误提交
  • 可以临时测试各种语言的代码

最后配合预览模式的使用,减少应用之间的切换,一切显得都很流畅

当然,保持 Code Clean,减少猜测和回忆时间,我们最好给我们创建的 Scratch files 和 Scratch buffers 更友好的文件名称

日拱一兵 | 原创

查看原文

赞 4 收藏 2 评论 0

日拱一兵 发布了文章 · 2020-08-26

10分钟搞定 Java 并发队列好吗?好的

| 好看请赞,养成习惯

  • 你有一个思想,我有一个思想,我们交换后,一个人就有两个思想
  • If you can NOT explain it simply, you do NOT understand it well enough

现陆续将Demo代码和技术文章整理在一起 Github实践精选 ,方便大家阅读查看,本文同样收录在此,觉得不错,还请Star


前言

如果按照用途与特性进行粗略的划分,JUC 包中包含的工具大体可以分为 6 类:

  1. 执行者与线程池
  2. 并发队列
  3. 同步工具
  4. 并发集合
  5. 原子变量

并发系列中,主要讲解了 执行者与线程池同步工具 , 在分析源码时,或多或少的提及到了「队列」,队列在 JUC 中也是多种多样存在,所以本文就以「远看」视角,帮助大家快速了解与区分这些看似「杂乱」的队列

并发队列

Java 并发队列按照实现方式来进行划分可以分为 2 种:

  1. 阻塞队列
  2. 非阻塞队列

如果你已经看完并发系列锁的实现,你已经能够知道他们实现的区别:

前者就是基于锁实现的,后者则是基于 CAS 非阻塞算法实现的

常见的队列有下面这几种:

瞬间懵逼?看到这个没有人性的图想直接走人? 客观先别急,一会就柳暗花明了

当下你也许有个问题:

为什么会有这么多种队列的存在

锁有应对各种情形的锁,队列也自然有应对各种情形的队列了, 是不是也有点单一职责原则的意思呢?

所以我们要了解这些队列到底是怎么设计的?以及用在了哪些地方?

先来看下图

如果你在 IDE 中打开以上非阻塞队列和阻塞队列,查看其实现方法,你就会发现,阻塞队列非阻塞队列额外支持两种操作

  1. 阻塞的插入

    当队列满时,队列会阻塞插入元素的线程,直到队列不满

  2. 阻塞的移除

    当队列为空时,获取元素的线程会阻塞,直到队列变为非空

综合说明入队/出队操作,看似杂乱的方法,用一个表格就能概括了

抛出异常

  • 当队列满时,此时如果再向队列中插入元素,会抛出 IllegalStateException (这很好理解)
  • 当队列空时,此时如果再从队列中获取元素,会抛出 NoSuchElementException (这也很好理解)

返回特殊值

  • 当向队列插入元素时,会返回元素是否插入成功,成功则返回 true
  • 当从队列移除元素时,如果没有则返回 null

一直阻塞

  • 当队列满时,如果生产者线程向队列 put 元素,队列会一直阻塞生产者线程,直到队列可用或者响应中断退出
  • 当队列为空时,如果消费者线程 从队列里面 take 元素,队列会阻塞消费者线程,直到队列不为空

关于阻塞,我们其实早在 并发编程之等待通知机制 就已经充分说明过了,你还记得下面这张图吗?原理其实是一样一样滴

超时退出

和锁一样,因为有阻塞,为了灵活使用,就一定支持超时退出,阻塞时间达到超时时间,就会直接返回

至于为啥插入和移除这么多种单词表示形式,我也不知道,为了方便记忆,只需要记住阻塞的方法形式即可:

单词 puttake 字母 t 首位相连,一个放,一个拿

到这里你应该对 Java 并发队列有了个初步的认识了,原来看似杂乱的方法貌似也有了规律。接下来就到了疯狂串知识点的时刻了,借助前序章节的知识,分分钟就理解全部队列了

ArrayBlockingQueue

之前也说过,JDK中的命名还是很讲究滴,一看这名字,底层就是数组实现了,是否有界,那就看在构造的时候是否需要指定 capacity 值了

填鸭式的说明也容易忘,这些都是哪看到的呢?在所有队列的 Java docs 的第一段,一句话就概括了该队列的主要特性,所以强烈建议大家自己在看源码时,简单瞄一眼 docs 开头,心中就有多半个数了

在讲 Java AQS队列同步器以及ReentrantLock的应用 时我们介绍了公平锁与非公平锁的概念,ArrayBlockingQueue 也有同样的概念,看它的构造方法,就有 ReentrantLock 来辅助实现

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

默认情况下,依旧是不保证线程公平访问队列(公平与否是指阻塞的线程能否按照阻塞的先后顺序访问队列,先阻塞线访问,后阻塞后访问)

到这我也要临时问一个说过多次的面试送分题了:

为什么默认采用非公平锁的方式?它较公平锁方式有什么好处,又可能带来哪些问题?

知道了以上内容,结合上面表格中的方法,ArrayBlockingQueue 就可以轻松过关了

和数组相对的自然是链表了

LinkedBlockingQueue

LinkedBlockingQueue 也算是一个有界阻塞队列 ,从下面的构造函数中你也可以看出,该队列的默认和最大长度为 Integer.MAX_VALUE ,这也就 docs 说 optionally-bounded 的原因了

public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

public LinkedBlockingQueue(int capacity) {
  if (capacity <= 0) throw new IllegalArgumentException();
  this.capacity = capacity;
  last = head = new Node<E>(null);
}
正如 Java 集合一样,链表形式的队列,其存取效率要比数组形式的队列高。但是在一些并发程序中,数组形式的队列由于具有一定的可预测性,因此可以在某些场景中获得更高的效率

看到 LinkedBlockingQueue 是不是也有些熟悉呢? 为什么要使用线程池? 就已经和它多次照面了

创建单个线程池

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

创建固定个数线程池

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

面试送分题又来了

使用 Executors 创建线程池很简单,为什么大厂严格要求禁用这种创建方式呢?

PriorityBlockingQueue

PriorityBlockingQueue 是一个支持优先级的无界的阻塞队列,默认情况下采用自然顺序升序排列,当然也有非默认情况自定义优先级,需要排序,那自然要用到 Comparator 来定义排序规则了

可以定义优先级,自然也就有相应的限制,以及使用的注意事项

  • 按照上图说明,队列中不允许存在 null 值,也不允许存在不能排序的元素
  • 对于排序值相同的元素,其序列是不保证的,但你可以继续自定义其他可以区分出来优先级的值,如果你有严格的优先级区分,建议有更完善的比较规则,就像 Java docs 这样

     class FIFOEntry<E extends Comparable<? super E>>
         implements Comparable<FIFOEntry<E>> {
       static final AtomicLong seq = new AtomicLong(0);
       final long seqNum;
       final E entry;
       public FIFOEntry(E entry) {
         seqNum = seq.getAndIncrement();
         this.entry = entry;
       }
       public E getEntry() { return entry; }
       public int compareTo(FIFOEntry<E> other) {
         int res = entry.compareTo(other.entry);
         if (res == 0 && other.entry != this.entry)
           res = (seqNum < other.seqNum ? -1 : 1);
         return res;
       }
     }
  • 队列容量是没有上限的,但是如果插入的元素超过负载,有可能会引起OutOfMemory异常(这是肯定的),这也是为什么我们通常所说,队列无界,心中有界
  • PriorityBlockingQueue 也有 put 方法,这是一个阻塞的方法,因为它是无界的,自然不会阻塞,所以就有了下面比较聪明的做法

    public void put(E e) {
        offer(e); // never need to block  请自行对照上面表格
    }
  • 可以给定初始容量,这个容量会按照一定的算法自动扩充

    // Default array capacity.
    private static final int DEFAULT_INITIAL_CAPACITY = 11;
    
    public PriorityBlockingQueue() {
        this(DEFAULT_INITIAL_CAPACITY, null);
    }

    这里默认的容量是 11,由于也是基于数组,那面试送分题又来了

    你通常是怎样定义容器/集合初始容量的?有哪些依据?

DelayQueue

DelayQueue 是一个支持延时获取元素的无界阻塞队列

  • 是否延时肯定是和某个时间(通常和当前时间) 进行比较
  • 比较过后还要进行排序,所以也是存在一定的优先级

看到这也许觉得这有点和 PriorityBlockingQueue 很像,没错,DelayQueue 的内部也是使用 PriorityQueue

上图绿色框线也告诉你,DelayQueue 队列的元素必须要实现 Depayed 接口:

所以从上图可以看出使用 DelayQueue 非常简单,只需要两步:

实现 getDelay() 方法,返回元素要延时多长时间
public long getDelay(TimeUnit unit) {
      // 最好采用纳秒形式,这样更精确
    return unit.convert(time - now(), NANOSECONDS);
}
实现 compareTo() 方法,比较元素顺序
public int compareTo(Delayed other) {
    if (other == this) // compare zero if same object
        return 0;
    if (other instanceof ScheduledFutureTask) {
        ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
        long diff = time - x.time;
        if (diff < 0)
            return -1;
        else if (diff > 0)
            return 1;
        else if (sequenceNumber < x.sequenceNumber)
            return -1;
        else
            return 1;
    }
    long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
    return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}

上面的代码哪来的呢?如果你打开 ScheduledThreadPoolExecutor 里的 ScheduledFutureTask,你就看到了 (ScheduledThreadPoolExecutor 内部就是应用 DelayQueue)

所以综合来说,下面两种情况非常适合使用 DelayQueue

  • 缓存系统的设计:用 DelayQueue 保存缓存元素的有效期,使用一个线程循环查询 DelayQueue,如果能从 DelayQueue 中获取元素,说明缓存有效期到了
  • 定时任务调度:用 DelayQueue 保存当天会执行的任务以及时间,如果能从 DelayQueue 中获取元素,任务就可以开始执行了。比如 TimerQueue 就是这样实现的

SynchronousQueue

这是一个不存储元素的阻塞队列,不存储元素还叫队列?

没错,SynchronousQueue 直译过来叫同步队列,如果在队列里面呆久了应该就算是“异步”了吧

所以使用它,每个put() 操作必须要等待一个 take() 操作,反之亦然,否则不能继续添加元素

实际中怎么用呢?假如你需要两个线程之间同步共享变量,如果不用 SynchronousQueue 你可能会选择用 CountDownLatch 来完成,就像这样:

ExecutorService executor = Executors.newFixedThreadPool(2);
AtomicInteger sharedState = new AtomicInteger();
CountDownLatch countDownLatch = new CountDownLatch(1);



Runnable producer = () -> {
    Integer producedElement = ThreadLocalRandom
      .current()
      .nextInt();
    sharedState.set(producedElement);
    countDownLatch.countDown();
};



Runnable consumer = () -> {
    try {
        countDownLatch.await();
        Integer consumedElement = sharedState.get();
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
};

这点小事就用计数器来实现,显然很不合适,用 SynchronousQueue 改造一下,感觉瞬间就不一样了

ExecutorService executor = Executors.newFixedThreadPool(2);
SynchronousQueue<Integer> queue = new SynchronousQueue<>();

Runnable producer = () -> {
    Integer producedElement = ThreadLocalRandom
      .current()
      .nextInt();
    try {
        queue.put(producedElement);
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
};

Runnable consumer = () -> {
    try {
        Integer consumedElement = queue.take();
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
};

其实 Executors.newCachedThreadPool() 方法里面使用的就是 SynchronousQueue

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
看到前面 LinkedBlockingQueue 用在 newSingleThreadExecutornewFixedThreadPool 上,而newCachedThreadPool 却用 SynchronousQueue,这是为什么呢?

因为单线程池和固定线程池中,线程数量是有限的,因此提交的任务需要在LinkedBlockingQueue队列中等待空余的线程;

而缓存线程池中,线程数量几乎无限(上限为Integer.MAX_VALUE),因此提交的任务只需要在SynchronousQueue 队列中同步移交给空余线程即可, 所以有时也会说 SynchronousQueue 的吞吐量要高于 LinkedBlockingQueueArrayBlockingQueue

LinkedTransferQueue

简单来说,TransferQueue提供了一个场所,生产者线程使用 transfer 方法传入一些对象并阻塞,直至这些对象被消费者线程全部取出。

你有没有觉得,刚刚介绍的 SynchronousQueue 是否很像一个容量为 0 的 TransferQueue

但 LinkedTransferQueue 相比其他阻塞队列多了三个方法

  • transfer(E e)

    如果当前有消费者正在等待消费元素,transfer 方法就可以直接将生产者传入的元素立刻 transfer (传输) 给消费者;如果没有消费者等待消费元素,那么 transfer 方法会把元素放到队列的 tail(尾部)

    节点,一直阻塞,直到该元素被消费者消费才返回

  • tryTransfer(E e)

    tryTransfer,很显然是一种尝试,如果没有消费者等待消费元素,则马上返回 false ,程序不会阻塞

  • tryTransfer(E e, long timeout, TimeUnit unit)

    带有超时限制,尝试将生产者传入的元素 transfer 给消费者,如果超时时间到,还没有消费者消费元素,则返回 false

你瞧,所有阻塞的方法都是一个套路:

  1. 阻塞方式
  2. 带有 try 的非阻塞方式
  3. 带有 try 和超时时间的非阻塞方式

看到这你也许感觉 LinkedTransferQueue 没啥特点,其实它和其他阻塞队列的差别还挺大的:

BlockingQueue 是如果队列满了,线程才会阻塞;但是 TransferQueue 是如果没有消费元素,则会阻塞 (transfer 方法)

这也就应了 Doug Lea 说的那句话:

LinkedTransferQueue is actually a superset of ConcurrentLinkedQueue, SynchronousQueue (in “fair” mode), and unbounded
LinkedBlockingQueues. And it’s made better by allowing you to mix and
match those features as well as take advantage of higher-performance i
mplementation techniques.

简单翻译:

LinkedTransferQueueConcurrentLinkedQueue, SynchronousQueue (在公平模式下), 无界的LinkedBlockingQueues等的超集; 允许你混合使用阻塞队列的多种特性

所以,在合适的场景中,请尽量使用LinkedTransferQueue

上面都看的是单向队列 FIFO,接下来我们看看双向队列

LinkedBlockingDeque

LinkedBlockingDeque 是一个由链表结构组成的双向阻塞队列,凡是后缀为 Deque 的都是双向队列意思,后缀的发音为deck——/dek/, 刚接触它时我以为是这个冰激凌的发音

所谓双向队列值得就是可以从队列的两端插入和移除元素。所以:

双向队列因为多了一个操作队列的入口,在多线程同时入队是,也就会减少一半的竞争

队列有头,有尾,因此它又比其他阻塞队列多了几个特殊的方法

  • addFirst
  • addLast
  • xxxxFirst
  • xxxxLast
  • ... ...

这么一看,双向阻塞队列确实很高效,

那双向阻塞队列应用在什么地方了呢?

不知道你是否听过 “工作窃取”模式,看似不太厚道的一种方法,实则是高效利用线程的好办法。下一篇文章,我们就来看看 ForkJoinPool 是如何应用 “工作窃取”模式的

总结

到这关于 Java 队列(其实主要介绍了阻塞队列)就快速的区分完了,将看似杂乱的方法做了分类整理,方便快速理解其用途,同时也说明了这些队列的实际用途。相信你带着更高的视角来阅读源码会更加轻松,最后也希望大家认真看两个队列的源码实现,在遇到队列的问题,脑海中的画面分分钟就可以搞定了

参考

  1. Java 并发编程的艺术
  2. Java 并发编程之美
  3. https://zhuanlan.zhihu.com/p/...

日拱一兵 | 原创

查看原文

赞 26 收藏 18 评论 0

日拱一兵 发布了文章 · 2020-08-12

“既生 ExecutorService, 何生 CompletionService?”

前言

我会手动创建线程,为什么要使用线程池? 中详细的介绍了 ExecutorService,可以将整块任务拆分做简单的并行处理;

不会用Java Future,我怀疑你泡茶没我快 中又详细的介绍了 Future 的使用,填补了 Runnable 不能获取线程执行结果的空缺

将二者结合起来使用看似要一招吃天下了(Java有并发,并发之大,一口吃不下), but ~~ 是我太天真

ExecutorService VS CompletionService

假设我们有 4 个任务(A, B, C, D)用来执行复杂的计算,每个任务的执行时间随着输入参数的不同而不同,如果将任务提交到 ExecutorService, 相信你已经可以“信手拈来”

ExecutorService executorService = Executors.newFixedThreadPool(4);
List<Future> futures = new ArrayList<Future<Integer>>();
futures.add(executorService.submit(A));
futures.add(executorService.submit(B));
futures.add(executorService.submit(C));
futures.add(executorService.submit(D));

// 遍历 Future list,通过 get() 方法获取每个 future 结果
for (Future future:futures) {
    Integer result = future.get();
    // 其他业务逻辑
}

先直入主题,用 CompletionService 实现同样的场景

ExecutorService executorService = Executors.newFixedThreadPool(4);

// ExecutorCompletionService 是 CompletionService 唯一实现类
CompletionService executorCompletionService= new ExecutorCompletionService<>(executorService );

List<Future> futures = new ArrayList<Future<Integer>>();
futures.add(executorCompletionService.submit(A));
futures.add(executorCompletionService.submit(B));
futures.add(executorCompletionService.submit(C));
futures.add(executorCompletionService.submit(D));

// 遍历 Future list,通过 get() 方法获取每个 future 结果
for (int i=0; i<futures.size(); i++) {
    Integer result = executorCompletionService.take().get();
    // 其他业务逻辑
}

两种方式在代码实现上几乎一毛一样,我们曾经说过 JDK 中不会重复造轮子,如果要造一个新轮子,必定是原有的轮子在某些场景的使用上有致命缺陷

既然新轮子出来了,二者到底有啥不同呢? 在 搞定 CompletableFuture,并发异步编程和编写串行程序还有什么区别? 文中,我们提到了 Future get() 方法的致命缺陷:

如果 Future 结果没有完成,调用 get() 方法,程序会阻塞在那里,直至获取返回结果

先来看第一种实现方式,假设任务 A 由于参数原因,执行时间相对任务 B,C,D 都要长很多,但是按照程序的执行顺序,程序在 get() 任务 A 的执行结果会阻塞在那里,导致任务 B,C,D 的后续任务没办法执行。又因为每个任务执行时间是不固定的,所以无论怎样调整将任务放到 List 的顺序,都不合适,这就是致命弊端

新轮子自然要解决这个问题,它的设计理念就是哪个任务先执行完成,get() 方法就会获取到相应的任务结果,这么做的好处是什么呢?来看个图你就瞬间理解了

两张图一对比,执行时长高下立判了,在当今高并发的时代,这点时间差,在吞吐量上起到的效果可能不是一点半点了

那 CompletionService 是怎么做到获取最先执行完的任务结果的呢?

远看CompletionService 轮廓

如果你使用过消息队列,你应该秒懂我要说什么了,CompletionService 实现原理很简单

就是一个将异步任务的生产和任务完成结果的消费解耦的服务

用人话解释一下上面的抽象概念我只能再画一张图了

说白了,哪个任务执行的完,就直接将执行结果放到队列中,这样消费者拿到的结果自然就是最早拿到的那个了

从上图中看到,有任务,有结果队列,那 CompletionService 自然也要围绕着几个关键字做文章了

  • 既然是异步任务,那自然可能用到 Runnable 或 Callable
  • 既然能获取到结果,自然也会用到 Future 了

带着这些线索,我们走进 CompletionService 源码看一看

近看 CompletionService 源码

CompletionService 是一个接口,它简单的只有 5 个方法:

Future<V> submit(Callable<V> task);
Future<V> submit(Runnable task, V result);
Future<V> take() throws InterruptedException;
Future<V> poll();
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;

关于 2 个 submit 方法, 我在 不会用Java Future,我怀疑你泡茶没我快 文章中做了非常详细的分析以及案例使用说明,这里不再过多赘述

另外 3 个方法都是从阻塞队列中获取并移除阻塞队列第一个元素,只不过他们的功能略有不同

  • Take: 如果队列为空,那么调用 take() 方法的线程会被阻塞
  • Poll: 如果队列为空,那么调用 poll() 方法的线程会返回 null
  • Poll-timeout: 以超时的方式获取并移除阻塞队列中的第一个元素,如果超时时间到,队列还是空,那么该方法会返回 null

所以说,按大类划分上面5个方法,其实就是两个功能

  • 提交异步任务 (submit)
  • 从队列中拿取并移除第一个元素 (take/poll)

CompletionService 只是接口,ExecutorCompletionService 是该接口的唯一实现类

ExecutorCompletionService 源码分析

先来看一下类结构, 实现类里面并没有多少内容

<fancybox></fancybox>

ExecutorCompletionService 有两种构造函数:

private final Executor executor;
private final AbstractExecutorService aes;
private final BlockingQueue<Future<V>> completionQueue;

public ExecutorCompletionService(Executor executor) {
    if (executor == null)
        throw new NullPointerException();
    this.executor = executor;
    this.aes = (executor instanceof AbstractExecutorService) ?
        (AbstractExecutorService) executor : null;
    this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}

public ExecutorCompletionService(Executor executor,
                                 BlockingQueue<Future<V>> completionQueue) {
    if (executor == null || completionQueue == null)
        throw new NullPointerException();
    this.executor = executor;
    this.aes = (executor instanceof AbstractExecutorService) ?
        (AbstractExecutorService) executor : null;
    this.completionQueue = completionQueue;
}

两个构造函数都需要传入一个 Executor 线程池,因为是处理异步任务的,我们是不被允许手动创建线程的,所以这里要使用线程池也就很好理解了

另外一个参数是 BlockingQueue,如果不传该参数,就会默认队列为 LinkedBlockingQueue,任务执行结果就是加入到这个阻塞队列中的

所以要彻底理解 ExecutorCompletionService ,我们只需要知道一个问题的答案就可以了:

它是如何将异步任务结果放到这个阻塞队列中的?

想知道这个问题的答案,那只需要看它提交任务之后都做了些什么?

public Future<V> submit(Callable<V> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<V> f = newTaskFor(task);
    executor.execute(new QueueingFuture(f));
    return f;
}

我们前面也分析过,execute 是提交 Runnable 类型的任务,本身得不到返回值,但又可以将执行结果放到阻塞队列里面,所以肯定是在 QueueingFuture 里面做了文章

从上图中看一看出,QueueingFuture 实现的接口非常多,所以说也就具备了相应的接口能力。

重中之重是,它继承了 FutureTask ,FutureTask 重写了 Runnable 的 run() 方法 (方法细节分析可以查看FutureTask源码分析 ) 文中详细说明了,无论是set() 正常结果,还是setException() 结果,都会调用 finishCompletion() 方法:

private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }

      // 重点 重点 重点
    done();

    callable = null;        // to reduce footprint
}

上述方法会执行 done() 方法,而 QueueingFuture 恰巧重写了 FutureTask 的 done() 方法:

方法实现很简单,就是将 task 放到阻塞队列中

protected void done() { 
  completionQueue.add(task); 
}

执行到此的 task 已经是前序步骤 set 过结果的 task,所以就可以通过消费阻塞队列获取相应的结果了

相信到这里,CompletionService 在你面前应该没什么秘密可言了

CompletionService 的主要用途

在 JDK docs 上明确给了两个例子来说明 CompletionService 的用途:

假设你有一组针对某个问题的solvers,每个都返回一个类型为Result的值,并且想要并发地运行它们,处理每个返回一个非空值的结果,在某些方法使用(Result r)

其实就是文中开头的使用方式

 void solve(Executor e,
            Collection<Callable<Result>> solvers)
     throws InterruptedException, ExecutionException {
     CompletionService<Result> ecs
         = new ExecutorCompletionService<Result>(e);
     for (Callable<Result> s : solvers)
         ecs.submit(s);
     int n = solvers.size();
     for (int i = 0; i < n; ++i) {
         Result r = ecs.take().get();
         if (r != null)
             use(r);
     }
 }
假设你想使用任务集的第一个非空结果,忽略任何遇到异常的任务,并在第一个任务准备好时取消所有其他任务
void solve(Executor e,
            Collection<Callable<Result>> solvers)
     throws InterruptedException {
     CompletionService<Result> ecs
         = new ExecutorCompletionService<Result>(e);
     int n = solvers.size();
     List<Future<Result>> futures
         = new ArrayList<Future<Result>>(n);
     Result result = null;
     try {
         for (Callable<Result> s : solvers)
             futures.add(ecs.submit(s));
         for (int i = 0; i < n; ++i) {
             try {
                 Result r = ecs.take().get();
                 if (r != null) {
                     result = r;
                     break;
                 }
             } catch (ExecutionException ignore) {}
         }
     }
     finally {
         for (Future<Result> f : futures)
               // 注意这里的参数给的是 true,详解同样在前序 Future 源码分析文章中
             f.cancel(true);
     }

     if (result != null)
         use(result);
 }

这两种方式都是非常经典的 CompletionService 使用 范式 ,请大家仔细品味每一行代码的用意

范式没有说明 Executor 的使用,使用 ExecutorCompletionService,需要自己创建线程池,看上去虽然有些麻烦,但好处是你可以让多个 ExecutorCompletionService 的线程池隔离,这种隔离性能避免几个特别耗时的任务拖垮整个应用的风险 (这也是我们反复说过多次的,不要所有业务共用一个线程池

总结

CompletionService 的应用场景还是非常多的,比如

  • Dubbo 中的 Forking Cluster
  • 多仓库文件/镜像下载(从最近的服务中心下载后终止其他下载过程)
  • 多服务调用(天气预报服务,最先获取到的结果)

CompletionService 不但能满足获取最快结果,还能起到一定 "load balancer" 作用,获取可用服务的结果,使用也非常简单, 只需要遵循范式即可

并发系列 讲了这么多,分析源码的过程也碰到各种队列,接下来我们就看看那些让人眼花缭乱的队列

灵魂追问

  1. 通常处理结果还会用异步方式进行处理,如果采用这种方式,有哪些注意事项?
  2. 如果是你,你会选择使用无界队列吗?为什么?

日拱一兵 | 原创

查看原文

赞 18 收藏 15 评论 4

日拱一兵 发布了文章 · 2020-08-05

hashCode竟然不是根据对象内存地址生成的?还对内存泄漏与偏向锁有影响?

日拱一兵 | 原创

起因

起因是群里的一位童鞋突然问了这么问题:

如果重写 equals 不重写 hashcode 会有什么影响?

这个问题从上午10:45 开始陆续讨论,到下午15:39 接近尾声 (忽略这形同虚设的马赛克)

这是一个好问题,更是一个高频基础面试题,我还曾经专门写过一篇文章 Java equals 和 hashCode 的这几个问题可以说明白吗, 主要说明了以下内容

<img data-original="https://rgyb.sunluomeng.top/20200728204424.png" style="zoom:50%;" />

随着讨论的进行,问题慢慢集中在内存溢出和内存泄漏的问题上

内存溢出 VS 内存泄漏

这两个词在中文解释上有些相似,至少给我的第一感觉,他们的差别是这样的(有人和我一样吗?)

内存溢出:Out of Memory (OOM) ,这个大家都很熟悉了,理解起来也很简单,就是内存不够用了(啤酒【对象】太多,杯子【内存】装不下了)

那啥是内存泄漏呢?

内存泄漏:Memory Leak

特意查了一下 Leak 的字典含义,解释1的直白翻译是【通常是由于错误失误,从一个开口进入或逃脱】

所以程序中的内存泄漏我的理解更多是:由于程序的编写错误暴漏出一些 开口,导致一些对象进入这写开口,最终导致相关问题,进一步说白了,程序有漏洞,不当的调用就会出问题

所以接下来我们主要来看看 Java 内存泄漏,以及问题的起因 hashCode 和内存泄漏到底有哪些关系

内存泄漏

咱也是一个有身份证的人,不能总讲大白话,相对官方的内存泄漏解释是这样滴:

内存泄漏说明的是这样一种情况:堆中存在一些不再使用的对象,但垃圾收集器无法将它们从内存中删除(垃圾收集器定期删除未引用的对象,但从不收集仍在引用的对象),因此对它们进行了不必要的维护

这句话略显抽象,一张图你就能明白

如果有用的、但垃圾收集器又不能删除的对象增多,就像下图这样,那么就会逐渐导致内存溢出(OOM)了

所以也可以总结为,OOM 的原因之一可能是内存泄漏导致的

内存泄漏会带来哪些问题

内存泄漏,会导致真正可用内存变少,在没达到 OOM 的这个过程中,就会出现奇奇怪怪的问题

  1. 当应用程序长时间连续运行时,性能会严重下降,毕竟可用内存变小
  2. 自发的和奇怪的应用程序崩溃
  3. 应用程序偶尔会耗尽连接对象(这个经常听说吧)
  4. 最终的结果是 OOM

所以也可以反过来推理,如果发生上述问题,有可能程序的某些地方发生了内存泄漏

那常见的哪些情形可能会引起内存泄漏呢?又有哪些解决办法呢?

会引起内存泄漏的常见情形与相应解决办法

静态成员变量的乱用

直接来看一个例子

@Slf4j
public class StaticTest {
    public static List<Double> list = new ArrayList<>();

    public void populateList() {
        for (int i = 0; i < 10000000; i++) {
            list.add(Math.random());
        }
    }

    public static void main(String[] args) {
        new StaticTest().populateList();
    }
}

populateList() 是一个 public 方法,可能被各种调用,导致 list 无限增大

解决办法

解决办法很简单,针对这种情形(也就是通常所说的长周期对象引用短周期对象),就是将 list 放到方法内部,方法栈帧执行完自动就会被回收了

public void populateList() {
   List<Double> list = new ArrayList<>();
   for (int i = 0; i < 10000000; i++) {
      list.add(Math.random());
   }
}

有童鞋可能有疑问:

看 Spring 源码时有好多是 static 修饰的成员变量,难道它们也会导致内存泄漏?

不是的,如果你仔细看逻辑,它们都是是在容器初始化的过程中一次性加载的,所以不会像 populateList 随着调用次数的增加,无限撑大 List

未关闭的流

在学习流的时候老师就在耳边反复说:

一定要关闭流... 闭流... ... ... ...

因为每当我们建立一个新的连接或打开一个流时(比如数据库连接、输入流和会话对象),JVM都会为这些资源分配内存,如果不关闭,这就是占用空间"有用"的对象, GC 就不会回收他们,当请求很大,来个请求就新建一个流,最终都还没关闭,结果可想而知

解决办法

流的解决办法很简单,其实主要遵循相应范式就可以避免此类问题

  1. 通过 try/catch/finally范式在 finally 关掉流
  2. 如果你用的 Java 7+ 的版本,也可以用 try-with-resources, 这样代码在编译后会自动帮你关闭流
  3. 也可以使用 Lombok 的 @Cleanup 注解, 就像下面这样
@Cleanup InputStream jobJarInputStream = new URL(jobJarUrl).openStream();
@Cleanup OutputStream jobJarOutputStream = new FileOutputStream(jobJarFile);
IOUtils.copy(jobJarInputStream, jobJarOutputStream);

不正确的 equals 和 hashCode 实现

又回到了这两个函数上,有很大一部分程序员不会主动重写 equals 和 hashCode,尤其是用 Lombok @Data 注解(该注解默认会帮助重写这两个函数)后,更会忽视这两个方法实现,一不小心的使就可能引起内存泄漏

来看个非常简单的例子:

public class MemLeakTest {

   public static void main(String[] args) throws InterruptedException {
      Map<Person, String> map = new HashMap<>();
      Person p1 = new Person("zhangsan", 1);
      Person p2 = new Person("zhangsan", 1);
      Person p3 = new Person("zhangsan", 1);

      map.put(p1, "zhangsan");
      map.put(p2, "zhangsan");
      map.put(p3, "zhangsan");

      System.out.println(map.entrySet().size()); // 运行结果:3
   }
}  

@Getter
@Setter
class Person {
    private String name;
    private Integer id;

    public Person(String name, Integer id){
        this.name = name;
        this.id = id;
    }
}

Person 类没有重写 hashCode 方法,那 Map 的 put 方法就会调用 Object 默认的 hashCode 方法

public V put(K key, V value) {
    return putVal(hash(key), key, value, false, true);
}

static final int hash(Object key) {
  int h;
  return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}

p1, p2, p3 在【业务】属性上是完全相同的三个对象,由于「对象地址」的不同导致生成的 hashCode 不一样,最终都被放到 Map 中,这就会导致业务重复对象占用空间,所以这也是内存泄漏的一种

解决办法

解决办法很简单,在 Person 上加一个 Lombok 的 @Data 注解自动帮你重写 hashCode 方法,或手动在 IDE 中 generate,再次运行,结果就为 1了,符合业务需求

那重写了 hashCode 确实可以避免重复对象的加入,那这就完事大吉了吗, 再来看个例子

public static void main(String[] args) throws InterruptedException {
  // 注意: HashSet 的底层也是 Map 结构 
  Set<Person> set = new HashSet<Person>();

   Person p1 = new Person("zhangsan", 1);
   Person p2 = new Person("lisi", 2);
   Person p3 = new Person("wanger", 3);

   set.add(p1);
   set.add(p2);
   set.add(p3);
   
   System.out.println(set.size()); // 运行结果:3
   p3.setName("wangermao");
   set.remove(p3);
   System.out.println(set.size()); // 运行结果:3
   set.add(p3);
   System.out.println(set.size()); // 运行结果:4
}

从运行结果中来看,很显然 set.remove(p3) 没有删除成功,因为 p3.setName("wangermao") 后,重新计算 p3 的 hashCode 会发生变化,所以 remove 的时候会找不到相应的 Node,这就又给了增加相同对象的“机会”,导致业务中无用的对象被引用着,所以可以说这也是内存泄漏的一种。运行结果来看:

所以诸如此类操作,最好是先 remove,然后更改属性,最后再重新 add 进去

看到这,你应该发现了,要解决 hashCode 相关的问题,你要充分了解集合的特性,更要留意类是否重写了该方法以及它们的实现方式,避免出现内存泄漏情况

ThreadLocal

群消息中的最后,小姐姐 留下【ThreadLocal】几个字,深藏功与名的离开了,一看就是高手

ThreadLocal 是面试多线程的高频考点,它的好处是可以快速方便的做到线程隔离,但大家也都知道他是一把双刃剑,因为使用不好就有可能导致内存泄漏了

实际工作中我们都是使用线程池来管理线程 「具体请参考 我会手动创建线程,为什么要使用线程池」,这种方式可以让线程得到反复利用(故意不让 GC 回收),

现在,如果任何类创建了一个ThreadLocal变量,但没有显式地删除它,那么即使在web应用程序停止之后,该对象的副本仍将保留在工作线程中,从而阻止了该对象被垃圾收集,所以乱用也会导致内存泄漏

解决办法

解决办法依旧很简单,依旧是遵循标准

  1. 调用 ThreadLocal 的 remove() 方法,移除当前线程变量值
  2. 也可以将它看作一种 resource,使用 try/finally 范式,万一在运行过程中出现异常,还可以在 finally 中 remove 掉
try {
    threadLocal.set(System.nanoTime());
    // business code
}
finally {
    threadLocal.remove();
}

我觉得小姐姐一定是高手

总的来说,引起内存泄漏的原因非常多,比如还有引用外部类的内部类等问题,这里不再展开说明,只是说明了几种非常常见的可能引发内存泄漏问题的几种场景

内存泄漏问题不易察觉,所以有时需要借助工具来帮忙

JVisualJVM

JVisualJvm 【可视化JVM】,可分析JDK1.6及其以上版本的JVM运行时JVM参数系统参数堆栈CPU使用等信息。可分析本地应用及远程应用,在JDK1.6以上版本中自带。工具的使用暂不展开说明, 想快速使用此工具,只需要在 IDE 中安装个 VisualVM Launcher 插件

然后在进行基本的配置

然后在IDE的右上角或当前类鼠标右键就可以点击运行查看了

运行起 VisualJVM 就是这样子了

不要走,还没结束,在总结这篇文章的时候,我还发现了「新大陆」

HashCode 真是根据对象内存地址生成的?

脑海中的印象不知道为何,很根深蒂固的接受了Object hashCode 是根据对象内存地址生成的,这次刚好想探求一下 hashCode 的本质,还着实打破了我的固有印象 (以 JDK1.8 为例)

OpenJDK 定义 hashCode 的方法在下面两个文件中

  • src/share/vm/prims/jvm.h
  • src/share/vm/prims/jvm.cpp

逐步看下去,最终会来到 get_next_hash 这个方法中,方便大家查看我先把方法截图至此:

总的来说有 6 种生成 hashCode 的方式:

  • 0: A randomly generated number
  • 1: A function of memory address of the object
  • 2: A hardcoded 1 (used for sensitivity testing.)
  • 3: A sequence.
  • 4: The memory address of the object, cast to int
  • 5(else): Thread state combined with xorshift1

那在 JDK1.8 种用的哪一种呢?

可以看到在 JDK1.8 中生成 hashCode 的方式是 5, 也就是走程序的 else 路径,即使用 Xorshift,并不是之前认为的对象内存地址「1」,以为老版本是采用对象内存地址的方式,所以继续查看其他版本

从图中可以看出,JDK1.62JDK1.73 版本生成 hashCode 的方式「1」随机数的形式,和我们原本认为的并不一样,别的版本没有继续查询,至于「流传下来」说是对象内存地址生成的 hashCode 我也木有再深入研究,有了解的同学还请留言赐教

那么问题来了:

假设用的 JDK1.6或 JDK1.7,它们生成 hashCode 的方式是随机生成的,那一个对象多次调用hashCode是会有不同的hashCode 呢?(排除服务重启的情况)

显然应该不会的,因为如果每次都变化, 存储到集合中的对象那就很容易丢失了,那问题又来了:

它们存在哪了?

hash 值是存在对象头中的,我们还知道对象头中还可能存储线程ID,所以他们在某些情形中还会存在冲突

对象头中 hashCode 和 偏向锁的冲突

jvm 启动时,可以使用 -XX:+UseBiasedLocking=true 开启偏向锁,(关于偏向锁,轻量级锁,重量级锁大家查阅 synchronized 相关文档就可以),这里引 OpenJDK Wiki4 里面的图片加以文字说明整个冲突过程

所以,调用 Object 的 hashCode() 方法或者 System.identityHashCode() 方法会让对象不能使用偏向锁。到这里你也就应该知道了,如果你还想使用偏向锁,那最好重写 hashCode() 方法,避免使偏向锁失效

总结

为了解决群的这个问题,发现新大陆的同时也差点让我掉入【追问无底洞】,不过通过本文你应该了解内存溢出和内存泄漏的差别,以及他们的解决方案,另外 hashCode5 生成方式还着实让人有些惊讶,如果你知道「hashCode的生成是根据对象内存地址生成的来源,还请留言赐教」。除此之外,小小的 hashCode 还有可能让偏向锁失效,所有的这些细节问题都有可能是导致程序崩溃的坑,所以勿以「恶」小而为之,毋以「善」小而不为,良好的编程习惯能避免很多问题

当然想要更好的理解内存泄漏,当然是要更好的理解 GC 机制,而想要更好的理解 GC,当然是更好的理解 JVM,咱们后续慢慢分析吧

灵魂追问

  1. 为了清除 ThreadLocal 线程变量值,不用 ThreadLocal.remove() 方法,而是用 ThreadLocal.set(null) 会达到同样的效果吗?
  2. 你曾经遇到哪些不易察觉的内存泄漏问题呢?

参考


  1. https://wiki.openjdk.java.net...
  2. http://hg.openjdk.java.net/jd...
  3. http://hg.openjdk.java.net/jd...
  4. https://en.wikipedia.org/wiki...
  5. https://srvaroa.github.io/jvm...
查看原文

赞 3 收藏 2 评论 0

日拱一兵 发布了文章 · 2020-07-30

一款功能简约到可怜的SQL 客户端(蝇量级)

原创 | 日拱一兵

  • 你有一个思想,我有一个思想,我们交换后,一个人就有两个思想
  • If you can NOT explain it simply, you do NOT understand it well enough

无意间看到这个SQL客户端,瞬间被它简洁的页面吸引了, 启动画面可能是它最复杂的呈现了,爱没?

SQLECTRON

按照官网 (https://sqlectron.github.io/, 看 URL 发现,SQLECTRON官网都是用 Github Pages 搭建的) 的说明:

一个简单的轻量级SQL客户端桌面/终端,具有跨数据库和跨平台的支持

看到这你应该放心了,无论你使用的是 LinuxMac 还是 Windows,都可以试一试。那它支持哪些数据库呢?一会到安装界面你就会发现了

这里我用 MAC 演示一下整个使用过程

安装与使用

写本文时的版本为 v1.30.0, 直接下载安装包——>拖拽, 一步安装完成

添加 Server

填写相关信息,从Database Type 中你应该已经看到了,目前支持的数据库类型有:

  • MySQL
  • PostgreSQL
  • Microsoft SQL Server
  • SQLite
  • Cassandra

测试连接 ——> Save 即可

选择相应的 Server, 然后 Connect,执行个 SQL 试一试

获取执行结果后,可以快速粘贴为 JSON 或 CSV 格式,当然也可以导出相应格式文件,非常方便

日常 explain 个 SQL, 画风都不一样了

说它很轻量级,我们和DataGrip 来做个比较(这么比真是没有人性,DataGrip 的功能有多少怎么不说呢) 如果不是重度客户端依赖的同学,SQLECTRON 还是满足基本要求的

由于客户端提供的功能并不复杂,所以快捷键 (https://github.com/sqlectron/...,大家可以自行查阅

如果你更喜欢终端形式,SQLECTRON 还有一个 SQLECTRON-TERM (https://github.com/sqlectron/... 客户端支持,就像这样,浓浓的 BIOS 风

只需一条命令安装即可(前提是安装 Node)

npm install -g sqlectron-term

Bu~~~t

先别盲目追逐,这是一个用 Javascript 语言实现的,并且在 github 上的星标并不多

因为 Owner 出于兴趣维护这个项目,但是现在兴趣没了

总结

如果你只是做日常的基本 SQL 执行,那么 SQLECTRON 完全可以满足你的需求,你不用再找 DataGrip 或 Navicat 的注册码,同时也不会让电脑发热太多发生卡顿

如果你有兴趣看一看,并且想尝试维护这个项目,这又是一个很好的锻炼机会

原创 | 日拱一兵

查看原文

赞 2 收藏 2 评论 0

日拱一兵 发布了文章 · 2020-07-30

一款功能简约到可怜的SQL 客户端

  • 你有一个思想,我有一个思想,我们交换后,一个人就有两个思想
  • If you can NOT explain it simply, you do NOT understand it well enough

现陆续将Demo代码和技术文章整理在一起 Github实践精选 ,方便大家阅读查看,本文同样收录在此,觉得不错,还请Star🌟

无意间看到这个SQL客户端,瞬间被它简洁的页面吸引了, 启动画面可能是它最复杂的呈现了,爱没?

SQLECTRON

按照官网 (https://sqlectron.github.io/, 看 URL 发现,SQLECTRON官网都是用 Github Pages 搭建的) 的说明:

一个简单的轻量级SQL客户端桌面/终端,具有跨数据库和跨平台的支持

看到这你应该放心了,无论你使用的是 LinuxMac 还是 Windows,都可以试一试。那它支持哪些数据库呢?一会到安装界面你就会发现了

这里我用 MAC 演示一下整个使用过程

安装与使用

写本文时的版本为 v1.30.0, 直接下载安装包——>拖拽, 一步安装完成

添加 Server

填写相关信息,从Database Type 中你应该已经看到了,目前支持的数据库类型有:

  • MySQL
  • PostgreSQL
  • Microsoft SQL Server
  • SQLite
  • Cassandra

测试连接 ——> Save 即可

选择相应的 Server, 然后 Connect,执行个 SQL 试一试

获取执行结果后,可以快速粘贴为 JSON 或 CSV 格式,当然也可以导出相应格式文件,非常方便

日常 explain 个 SQL, 画风都不一样了

说它很轻量级,我们和DataGrip 来做个比较(这么比真是没有人性,DataGrip 的功能有多少怎么不说呢) 如果不是重度客户端依赖的同学,SQLECTRON 还是满足基本要求的

由于客户端提供的功能并不复杂,所以快捷键 (https://github.com/sqlectron/...,大家可以自行查阅

如果你更喜欢终端形式,SQLECTRON 还有一个 SQLECTRON-TERM (https://github.com/sqlectron/... 客户端支持,就像这样,浓浓的 BIOS 风

只需一条命令安装即可(前提是安装 Node)

npm install -g sqlectron-term

Bu~~~t

先别盲目追逐,这是一个用 Javascript 语言实现的,并且在 github 上的星标并不多

因为 Owner 出于兴趣维护这个项目,但是现在兴趣没了

总结

如果你只是做日常的基本 SQL 执行,那么 SQLECTRON 完全可以满足你的需求,你不用再找 DataGrip 或 Navicat 的注册码,同时也不会让电脑发热太多发生卡顿

如果你有兴趣看一看,并且想尝试维护这个项目,这又是一个很好的锻炼机会

日拱一兵 | 原创

查看原文

赞 8 收藏 5 评论 2

日拱一兵 发布了文章 · 2020-07-21

搞定 CompletableFuture,并发异步编程和编写串行程序还有什么区别?你们要的多图长文

  • 你有一个思想,我有一个思想,我们交换后,一个人就有两个思想
  • If you can NOT explain it simply, you do NOT understand it well enough

前言

上一篇文章 不会用Java Future,我怀疑你泡茶没我快 全面分析了 Future,通过它我们可以获取线程的执行结果,它虽然解决了 Runnable 的 “三无” 短板,但是它自身还是有短板:

不能手动完成计算

假设你使用 Future 运行子线程调用远程 API 来获取某款产品的最新价格,服务器由于洪灾宕机了,此时如果你想手动结束计算,而是想返回上次缓存中的价格,这是 Future 做不到的

调用 get() 方法会阻塞程序

Future 不会通知你它的完成,它提供了一个get()方法,程序调用该方法会阻塞直到结果可用为止,没有办法利用回调函数附加到Future,并在Future的结果可用时自动调用它

不能链式执行

烧水泡茶中,通过构造函数传参做到多个任务的链式执行,万一有更多的任务,或是任务链的执行顺序有变,对原有程序的影响都是非常大的

整合多个 Future 执行结果方式笨重

假设有多个 Future 并行执行,需要在这些任务全部执行完成之后做后续操作,Future 本身是做不到的,需要借助工具类 Executors 的方法

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
没有异常处理

Future 同样没有提供很好的异常处理方案

上一篇文章看 Future 觉得是发现了新天地,这么一说有感觉回到了解放前

对于 Java 后端的同学,在 Java1.8 之前想实现异步编程,还想避开上述这些烦恼,ReactiveX 应该是一个常见解决方案(做Android 的应该会有了解)。如果熟悉前端同学, ES6 Promise(男朋友的承诺)也解决了异步编程的烦恼

天下语言都在彼此借鉴相应优点,Java 作为老牌劲旅自然也要解决上述问题。又是那个男人,并发大师 Doug Lea 忧天下程序员之忧,解天下程序员之困扰,在 Java1.8 版本(Lambda 横空出世)中,新增了一个并发工具类 CompletableFuture,它的出现,让人在泡茶过程中,品尝到了不一样的味道......

几个重要 Lambda 函数

CompletableFuture 在 Java1.8 的版本中出现,自然也得搭上 Lambda 的顺风车,为了更好的理解 CompletableFuture,这里我需要先介绍一下几个 Lambda 函数,我们只需要关注它们的以下几点就可以:

  • 参数接受形式
  • 返回值形式
  • 函数名称

Runnable

Runnable 我们已经说过无数次了,无参数,无返回值

@FunctionalInterface
public interface Runnable {
    public abstract void run();
}

Function

Function<T, R> 接受一个参数,并且有返回值

@FunctionalInterface
public interface Function<T, R> {
    R apply(T t);
}

Consumer

Consumer<T> 接受一个参数,没有返回值

@FunctionalInterface
public interface Consumer<T> {   
    void accept(T t);
}

Supplier

Supplier<T> 没有参数,有一个返回值

@FunctionalInterface
public interface Supplier<T> {
    T get();
}

BiConsumer

BiConsumer<T, U> 接受两个参数(Bi, 英文单词词根,代表两个的意思),没有返回值

@FunctionalInterface
public interface BiConsumer<T, U> {
    void accept(T t, U u);

好了,我们做个小汇总

有些同学可能有疑问,为什么要关注这几个函数式接口,因为 CompletableFuture 的函数命名以及其作用都是和这几个函数式接口高度相关的,一会你就会发现了

前戏做足,终于可以进入正题了 CompletableFuture

CompletableFuture

类结构

老规矩,先从类结构看起:

实现了 Future 接口

实现了 Future 接口,那就具有 Future 接口的相关特性,请脑补 Future 那少的可怜的 5 个方法,这里不再赘述,具体请查看 不会用Java Future,我怀疑你泡茶没我快

实现了 CompletionStage 接口

CompletionStage 这个接口还是挺陌生的,中文直译过来是【竣工阶段】,如果将烧水泡茶比喻成一项大工程,他们的竣工阶段体现是不一样的

  1. 单看线程1 或单看线程 2 就是一种串行关系,做完一步之后做下一步
  2. 一起看线程1 和 线程 2,它们彼此就是并行关系,两个线程做的事彼此独立互补干扰
  3. 泡茶就是线程1 和 线程 2 的汇总/组合,也就是线程 1 和 线程 2 都完成之后才能到这个阶段(当然也存在线程1 或 线程 2 任意一个线程竣工就可以开启下一阶段的场景)

所以,CompletionStage 接口的作用就做了这点事,所有函数都用于描述任务的时序关系,总结起来就是这个样子:

CompletableFuture 既然实现了两个接口,自然也就会实现相应的方法充分利用其接口特性,我们走进它的方法来看一看

CompletableFuture 大约有50种不同处理串行,并行,组合以及处理错误的方法。小弟屏幕不争气,方法之多,一个屏幕装不下,看到这么多方法,是不是瞬间要直接 收藏——>吃灰 2连走人?别担心,我们按照相应的命名和作用进行分类,分分钟搞定50多种方法

串行关系

then 直译【然后】,也就是表示下一步,所以通常是一种串行关系体现, then 后面的单词(比如 run /apply/accept)就是上面说的函数式接口中的抽象方法名称了,它的作用和那几个函数式接口的作用是一样一样滴

CompletableFuture<Void> thenRun(Runnable action)
CompletableFuture<Void> thenRunAsync(Runnable action)
CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)
  
<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
  
CompletableFuture<Void> thenAccept(Consumer<? super T> action) 
CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)
  
<U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)  
<U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn)
<U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor)

聚合 And 关系

combine... with...both...and... 都是要求两者都满足,也就是 and 的关系了

<U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
<U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
<U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)

<U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
<U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
<U> CompletableFuture<Void> thenAcceptBothAsync( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor)
  
CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action)
CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action)
CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor)

聚合 Or 关系

Either...or... 表示两者中的一个,自然也就是 Or 的体现了

<U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn)
<U> CompletableFuture<U> applyToEitherAsync(、CompletionStage<? extends T> other, Function<? super T, U> fn)
<U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor)

CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)

CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action)
CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action)
CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor executor)

异常处理

CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)
CompletableFuture<T> exceptionallyAsync(Function<Throwable, ? extends T> fn)
CompletableFuture<T> exceptionallyAsync(Function<Throwable, ? extends T> fn, Executor executor)
        
CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)
        
       
<U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
<U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)
<U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)

这个异常处理看着还挺吓人的,拿传统的 try/catch/finally 做个对比也就瞬间秒懂了

whenComplete 和 handle 的区别如果你看接受的参数函数式接口名称你也就能看出差别了,前者使用Comsumer, 自然也就不会有返回值;后者使用 Function,自然也就会有返回值

这里并没有全部列举,不过相信很多同学已经发现了规律:

CompletableFuture 提供的所有回调方法都有两个异步(Async)变体,都像这样
// thenApply() 的变体
<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
另外,方法的名称也都与前戏中说的函数式接口完全匹配,按照这中规律分类之后,这 50 多个方法看起来是不是很轻松了呢?

基本方法已经罗列的差不多了,接下来我们通过一些例子来实际演示一下:

案例演示

创建一个 CompletableFuture 对象

创建一个 CompletableFuture 对象并没有什么稀奇的,依旧是通过构造函数构建

CompletableFuture<String> completableFuture = new CompletableFuture<String>();

这是最简单的 CompletableFuture 对象创建方式,由于它实现了 Future 接口,所以自然就可以通过 get() 方法获取结果

String result = completableFuture.get();

文章开头已经说过,get()方法在任务结束之前将一直处在阻塞状态,由于上面创建的 Future 没有返回,所以在这里调用 get() 将会永久性的堵塞

这时就需要我们调用 complete() 方法手动的结束一个 Future

completableFuture.complete("Future's Result Here Manually");

这时,所有等待这个 Future 的 client 都会返回手动结束的指定结果

runAsync

使用 runAsync 进行异步计算

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    System.out.println("运行在一个单独的线程当中");
});

future.get();

由于使用的是 Runnable 函数式表达式,自然也不会获取到结果

supplyAsync

使用 runAsync 是没有返回结果的,我们想获取异步计算的返回结果需要使用 supplyAsync() 方法

        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
            log.info("运行在一个单独的线程当中");
            return "我有返回值";
        });

        log.info(future.get());

由于使用的是 Supplier 函数式表达式,自然可以获得返回结果

我们已经多次说过,get() 方法在Future 计算完成之前会一直处在 blocking 状态下,对于真正的异步处理,我们希望的是可以通过传入回调函数,在Future 结束时自动调用该回调函数,这样,我们就不用等待结果

CompletableFuture<String> comboText = CompletableFuture.supplyAsync(() -> {
          //可以注释掉做快速返回 start
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
            log.info("👍");
          //可以注释掉做快速返回 end
            return "赞";
        })
                .thenApply(first -> {
                    log.info("在看");
                    return first + ", 在看";
                })
                .thenApply(second -> second + ", 转发");

        log.info("三连有没有?");
        log.info(comboText.get());

对 thenApply 的调用并没有阻塞程序打印log,也就是前面说的通过回调通知机制, 这里你看到 thenApply 使用的是supplyAsync所用的线程,如果将supplyAsync 做快速返回,我们再来看一下运行结果:

thenApply 此时使用的是主线程,所以:

串行的后续操作并不一定会和前序操作使用同一个线程

thenAccept

如果你不想从回调函数中返回任何结果,那可以使用 thenAccept

        final CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(
                // 模拟远端API调用,这里只返回了一个构造的对象
                () -> Product.builder().id(12345L).name("颈椎/腰椎治疗仪").build())
                .thenAccept(product -> {
                    log.info("获取到远程API产品名称 " + product.getName());
                });
        voidCompletableFuture.get();

thenRun

thenAccept 可以从回调函数中获取前序执行的结果,但thenRun 却不可以,因为它的回调函数式表达式定义中没有任何参数

CompletableFuture.supplyAsync(() -> {
    //前序操作
}).thenRun(() -> {
    //串行的后需操作,无参数也无返回值
});

我们前面同样说过了,每个提供回调方法的函数都有两个异步(Async)变体,异步就是另外起一个线程

        CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {
            log.info("前序操作");
            return "前需操作结果";
        }).thenApplyAsync(result -> {
            log.info("后续操作");
            return "后续操作结果";
        });

到这里,相信你串行的操作你已经非常熟练了

thenCompose

日常的任务中,通常定义的方法都会返回 CompletableFuture 类型,这样会给后续操作留有更多的余地,假如有这样的业务(X呗是不是都有这样的业务呢?):

//获取用户信息详情
    CompletableFuture<User> getUsersDetail(String userId) {
        return CompletableFuture.supplyAsync(() -> User.builder().id(12345L).name("日拱一兵").build());
    }

    //获取用户信用评级
    CompletableFuture<Double> getCreditRating(User user) {
        return CompletableFuture.supplyAsync(() -> CreditRating.builder().rating(7.5).build().getRating());
    }

这时,如果我们还是使用 thenApply() 方法来描述串行关系,返回的结果就会发生 CompletableFuture 的嵌套

        CompletableFuture<CompletableFuture<Double>> result = completableFutureCompose.getUsersDetail(12345L)
                .thenApply(user -> completableFutureCompose.getCreditRating(user));

显然这不是我们想要的,如果想“拍平” 返回结果,thenCompose 方法就派上用场了

CompletableFuture<Double> result = completableFutureCompose.getUsersDetail(12345L)
                .thenCompose(user -> completableFutureCompose.getCreditRating(user));

这个和 Lambda 的map 和 flatMap 的道理是一样一样滴

thenCombine

如果要聚合两个独立 Future 的结果,那么 thenCombine 就会派上用场了

        CompletableFuture<Double> weightFuture = CompletableFuture.supplyAsync(() -> 65.0);
        CompletableFuture<Double> heightFuture = CompletableFuture.supplyAsync(() -> 183.8);
        
        CompletableFuture<Double> combinedFuture = weightFuture
                .thenCombine(heightFuture, (weight, height) -> {
                    Double heightInMeter = height/100;
                    return weight/(heightInMeter*heightInMeter);
                });

        log.info("身体BMI指标 - " + combinedFuture.get());

当然这里多数时处理两个 Future 的关系,如果超过两个Future,如何处理他们的一些聚合关系呢?

allOf | anyOf

相信你看到方法的签名,你已经明白他的用处了,这里就不再介绍了

static CompletableFuture<Void>     allOf(CompletableFuture<?>... cfs)
static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

接下来就是异常的处理了

exceptionally

        Integer age = -1;

        CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> {
            if( age < 0 ) {
                throw new IllegalArgumentException("何方神圣?");
            }
            if(age > 18) {
                return "大家都是成年人";
            } else {
                return "未成年禁止入内";
            }
        }).thenApply((str) -> {
            log.info("游戏开始");
            return str;
        }).exceptionally(ex -> {
            log.info("必有蹊跷,来者" + ex.getMessage());
            return "Unknown!";
        });

        log.info(maturityFuture.get());

exceptionally 就相当于 catch,出现异常,将会跳过 thenApply 的后续操作,直接捕获异常,进行一场处理

handle

用多线程,良好的习惯是使用 try/finally 范式,handle 就可以起到 finally 的作用,对上述程序做一个小小的更改, handle 接受两个参数,一个是正常返回值,一个是异常

注意:handle的写法也算是范式的一种
        Integer age = -1;

        CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> {
            if( age < 0 ) {
                throw new IllegalArgumentException("何方神圣?");
            }
            if(age > 18) {
                return "大家都是成年人";
            } else {
                return "未成年禁止入内";
            }
        }).thenApply((str) -> {
            log.info("游戏开始");
            return str;
        }).handle((res, ex) -> {
            if(ex != null) {
                log.info("必有蹊跷,来者" + ex.getMessage());
                return "Unknown!";
            }
            return res;
        });

        log.info(maturityFuture.get());

到这里,关于 CompletableFuture 的基本使用你已经了解的差不多了,不知道你是否注意,我们前面说的带有 Sync 的方法是单独起一个线程来执行,但是我们并没有创建线程,这是怎么实现的呢?

细心的朋友如果仔细看每个变种函数的第三个方法也许会发现里面都有一个 Executor 类型的参数,用于指定线程池,因为实际业务中我们是严谨手动创建线程的,这在 我会手动创建线程,为什么要使用线程池?文章中明确说明过;如果没有指定线程池,那自然就会有一个默认的线程池,也就是 ForkJoinPool

private static final Executor ASYNC_POOL = USE_COMMON_POOL ?
    ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

ForkJoinPool 的线程数默认是 CPU 的核心数。但是,在前序文章中明确说明过:

不要所有业务共用一个线程池,因为,一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能

总结

CompletableFuture 的方法并没有全部介绍完全,也没必要全部介绍,相信大家按照这个思路来理解 CompletableFuture 也不会有什么大问题了,剩下的就交给实践/时间以及自己的体会了

后记

你以为 JDK1.8 CompletableFuture 已经很完美了是不是,但追去完美的道路上永无止境,Java 9 对CompletableFuture 又做了部分升级和改造

  1. 添加了新的工厂方法
  2. 支持延迟和超时处理

    orTimeout()
    completeOnTimeout()
  3. 改进了对子类的支持

详情可以查看: Java 9 CompletableFuture API Improvements. 怎样快速的切换不同 Java 版本来尝鲜?SDKMAN 统一灵活管理多版本Java 这篇文章的方法送给你

最后咱们再泡一壶茶,感受一下新变化吧

灵魂追问

  1. 听说 ForkJoinPool 线程池效率更高,为什么呢?
  2. 如果批量处理异步程序,有什么可用的方案吗?

参考

  1. Java 并发编程实战
  2. Java 并发编程的艺术
  3. Java 并发编程之美
  4. https://www.baeldung.com/java...
  5. https://www.callicoder.com/ja...

日拱一兵 | 原创

查看原文

赞 15 收藏 13 评论 0

日拱一兵 发布了文章 · 2020-07-10

不会用Java Future,我怀疑你泡茶没我快, 又是超长图文!!

mountains-1412683_1280.png

  • 你有一个思想,我有一个思想,我们交换后,一个人就有两个思想
  • If you can NOT explain it simply, you do NOT understand it well enough

前言

创建线程有几种方式?这个问题的答案应该是可以脱口而出的吧

  • 继承 Thread 类
  • 实现 Runnable 接口

但这两种方式创建的线程是属于”三wu产品“:

  • 没有参数
  • 没有返回值
  • 没办法抛出异常
class MyThread implements Runnable{
   @Override
   public void run() {
      log.info("my thread");
   }
}

Runnable 接口是 JDK1.0 的核心产物

 /**
 * @since   JDK1.0
 */
@FunctionalInterface
public interface Runnable {
    public abstract void run();
}

用着 “三wu产品” 总是有一些弊端,其中没办法拿到返回值是最让人不能忍的,于是 Callable 就诞生了

Callable

又是 Doug Lea 大师,又是 Java 1.5 这个神奇的版本

 /**
 * @see Executor
 * @since 1.5
 * @author Doug Lea
 * @param <V> the result type of method {@code call}
 */
@FunctionalInterface
public interface Callable<V> {
    
    V call() throws Exception;
}

Callable 是一个泛型接口,里面只有一个 call() 方法,该方法可以返回泛型值 V ,使用起来就像这样:

Callable<String> callable = () -> {
    // Perform some computation
    Thread.sleep(2000);
    return "Return some result";
};

二者都是函数式接口,里面都仅有一个方法,使用上又是如此相似,除了有无返回值,Runnable 与 Callable 就点差别吗?

Runnable VS Callable

两个接口都是用于多线程执行任务的,但他们还是有很明显的差别的

执行机制

先从执行机制上来看,Runnable 你太清楚了,它既可以用在 Thread 类中,也可以用在 ExecutorService 类中配合线程池的使用;Bu~~~~t, Callable 只能在 ExecutorService 中使用,你翻遍 Thread 类,也找不到Callable 的身影

异常处理

Runnable 接口中的 run 方法签名上没有 throws ,自然也就没办法向上传播受检异常;而 Callable 的 call() 方法签名却有 throws,所以它可以处理受检异常;

所以归纳起来看主要有这几处不同点:

整体差别虽然不大,但是这点差别,却具有重大意义

返回值和处理异常很好理解,另外,在实际工作中,我们通常要使用线程池来管理线程(原因已经在 为什么要使用线程池? 中明确说明),所以我们就来看看 ExecutorService 中是如何使用二者的

ExecutorService

先来看一下 ExecutorService 类图

我将上图标记的方法单独放在此处

void execute(Runnable command);

<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);

可以看到,使用ExecutorService 的 execute() 方法依旧得不到返回值,而 submit() 方法清一色的返回 Future 类型的返回值

细心的朋友可能已经发现, submit() 方法已经在 CountDownLatch 和 CyclicBarrier 傻傻的分不清楚? 文章中多次使用了,只不过我们没有获取其返回值罢了,那么

  • Future 到底是什么呢?
  • 怎么通过它获取返回值呢?

我们带着这些疑问一点点来看

Future

Future 又是一个接口,里面只有五个方法:

从方法名称上相信你已经能看出这些方法的作用

// 取消任务
boolean cancel(boolean mayInterruptIfRunning);

// 获取任务执行结果
V get() throws InterruptedException, ExecutionException;

// 获取任务执行结果,带有超时时间限制
V get(long timeout, TimeUnit unit) throws InterruptedException,                             ExecutionException,  TimeoutException;

// 判断任务是否已经取消
boolean isCancelled();

// 判断任务是否已经结束
boolean isDone();

铺垫了这么多,看到这你也许有些乱了,咱们赶紧看一个例子,演示一下几个方法的作用

@Slf4j
public class FutureAndCallableExample {

   public static void main(String[] args) throws InterruptedException, ExecutionException {
      ExecutorService executorService = Executors.newSingleThreadExecutor();

      // 使用 Callable ,可以获取返回值
      Callable<String> callable = () -> {
         log.info("进入 Callable 的 call 方法");
         // 模拟子线程任务,在此睡眠 2s,
         // 小细节:由于 call 方法会抛出 Exception,这里不用像使用 Runnable 的run 方法那样 try/catch 了
         Thread.sleep(5000);
         return "Hello from Callable";
      };

      log.info("提交 Callable 到线程池");
      Future<String> future = executorService.submit(callable);

      log.info("主线程继续执行");

      log.info("主线程等待获取 Future 结果");
      // Future.get() blocks until the result is available
      String result = future.get();
      log.info("主线程获取到 Future 结果: {}", result);

      executorService.shutdown();
   }
}

程序运行结果如下:

如果你运行上述示例代码,主线程调用 future.get() 方法会阻塞自己,直到子任务完成。我们也可以使用 Future 方法提供的 isDone 方法,它可以用来检查 task 是否已经完成了,我们将上面程序做点小修改:

// 如果子线程没有结束,则睡眠 1s 重新检查
while(!future.isDone()) {
   System.out.println("Task is still not done...");
   Thread.sleep(1000);
}

来看运行结果:

如果子程序运行时间过长,或者其他原因,我们想 cancel 子程序的运行,则我们可以使用 Future 提供的 cancel 方法,继续对程序做一些修改

while(!future.isDone()) {
   System.out.println("子线程任务还没有结束...");
   Thread.sleep(1000);

   double elapsedTimeInSec = (System.nanoTime() - startTime)/1000000000.0;

      // 如果程序运行时间大于 1s,则取消子线程的运行
   if(elapsedTimeInSec > 1) {
      future.cancel(true);
   }
}

来看运行结果:

为什么调用 cancel 方法程序会出现 CancellationException 呢? 是因为调用 get() 方法时,明确说明了:

调用 get() 方法时,如果计算结果被取消了,则抛出 CancellationException (具体原因,你会在下面的源码分析中看到)

有异常不处理是非常不专业的,所以我们需要进一步修改程序,以更友好的方式处理异常

// 通过 isCancelled 方法判断程序是否被取消,如果被取消,则打印日志,如果没被取消,则正常调用 get() 方法
if (!future.isCancelled()){
   log.info("子线程任务已完成");
   String result = future.get();
   log.info("主线程获取到 Future 结果: {}", result);
}else {
   log.warn("子线程任务被取消");
}

查看程序运行结果:

相信到这里你已经对 Future 的几个方法有了基本的使用印象,但 Future 是接口,其实使用 ExecutorService.submit() 方法返回的一直都是 Future 的实现类 FutureTask

接下来我们就进入这个核心实现类一探究竟

FutureTask

同样先来看类结构

public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}

很神奇的一个接口,FutureTask 实现了 RunnableFuture 接口,而 RunnableFuture 接口又分别实现了 RunnableFuture 接口,所以可以推断出 FutureTask 具有这两种接口的特性:

  • Runnable 特性,所以可以用在 ExecutorService 中配合线程池使用
  • Future 特性,所以可以从中获取到执行结果

FutureTask源码分析

如果你完整的看过 AQS 相关分析的文章,你也许会发现,阅读 Java 并发工具类源码,我们无非就是要关注以下这三点:

- 状态 (代码逻辑的主要控制)
- 队列 (等待排队队列)
- CAS (安全的set 值)


脑海中牢记这三点,咱们开始看 FutureTask 源码,看一下它是如何围绕这三点实现相应的逻辑的

文章开头已经提到,实现 Runnable 接口形式创建的线程并不能获取到返回值,而实现 Callable 的才可以,所以 FutureTask 想要获取返回值,必定是和 Callable 有联系的,这个推断一点都没错,从构造方法中就可以看出来:

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}

即便在 FutureTask 构造方法中传入的是 Runnable 形式的线程,该构造方法也会通过 Executors.callable 工厂方法将其转换为 Callable 类型:

public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

但是 FutureTask 实现的是 Runnable 接口,也就是只能重写 run() 方法,run() 方法又没有返回值,那问题来了:

  • FutureTask 是怎样在 run() 方法中获取返回值的?
  • 它将返回值放到哪里了?
  • get() 方法又是怎样拿到这个返回值的呢?

我们来看一下 run() 方法(关键代码都已标记注释)

public void run() {
      // 如果状态不是 NEW,说明任务已经执行过或者已经被取消,直接返回
      // 如果状态是 NEW,则尝试把执行线程保存在 runnerOffset(runner字段),如果赋值失败,则直接返回
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
          // 获取构造函数传入的 Callable 值
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                  // 正常调用 Callable 的 call 方法就可以获取到返回值
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                  // 保存 call 方法抛出的异常
                setException(ex);
            }
            if (ran)
                  // 保存 call 方法的执行结果
                set(result);
        }
    } finally {        
        runner = null;       
        int s = state;
          // 如果任务被中断,则执行中断处理
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

run() 方法没有返回值,至于 run() 方法是如何将 call() 方法的返回结果和异常都保存起来的呢?其实非常简单, 就是通过 set(result) 保存正常程序运行结果,或通过 setException(ex) 保存程序异常信息

/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes

// 保存异常结果
protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }
}

// 保存正常结果
protected void set(V v) {
  if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
    outcome = v;
    UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
    finishCompletion();
  }
}

setExceptionset 方法非常相似,都是将异常或者结果保存在 Object 类型的 outcome 变量中,outcome 是成员变量,就要考虑线程安全,所以他们要通过 CAS方式设置 outcome 变量的值,既然是在 CAS 成功后 更改 outcome 的值,这也就是 outcome 没有被 volatile 修饰的原因所在。

保存正常结果值(set方法)与保存异常结果值(setException方法)两个方法代码逻辑,唯一的不同就是 CAS 传入的 state 不同。我们上面提到,state 多数用于控制代码逻辑,FutureTask 也是这样,所以要搞清代码逻辑,我们需要先对 state 的状态变化有所了解

 /*
 *
 * Possible state transitions:
 * NEW -> COMPLETING -> NORMAL  //执行过程顺利完成
 * NEW -> COMPLETING -> EXCEPTIONAL //执行过程出现异常
 * NEW -> CANCELLED // 执行过程中被取消
 * NEW -> INTERRUPTING -> INTERRUPTED //执行过程中,线程被中断
 */
private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

7种状态,千万别慌,整个状态流转其实只有四种线路

FutureTask 对象被创建出来,state 的状态就是 NEW 状态,从上面的构造函数中你应该已经发现了,四个最终状态 NORMAL ,EXCEPTIONAL , CANCELLED , INTERRUPTED 也都很好理解,两个中间状态稍稍有点让人困惑:

  • COMPLETING: outcome 正在被set 值的时候
  • INTERRUPTING:通过 cancel(true) 方法正在中断线程的时候

总的来说,这两个中间状态都表示一种瞬时状态,我们将几种状态图形化展示一下:

我们知道了 run() 方法是如何保存结果的,以及知道了将正常结果/异常结果保存到了 outcome 变量里,那就需要看一下 FutureTask 是如何通过 get() 方法获取结果的:

public V get() throws InterruptedException, ExecutionException {
    int s = state;
      // 如果 state 还没到 set outcome 结果的时候,则调用 awaitDone() 方法阻塞自己
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
      // 返回结果
    return report(s);
}

awaitDone 方法是 FutureTask 最核心的一个方法

// get 方法支持超时限制,如果没有传入超时时间,则接受的参数是 false 和 0L
// 有等待就会有队列排队或者可响应中断,从方法签名上看有 InterruptedException,说明该方法这是可以被中断的
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
      // 计算等待截止时间
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
          // 如果当前线程被中断,如果是,则在等待对立中删除该节点,并抛出 InterruptedException
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
          // 状态大于 COMPLETING 说明已经达到某个最终状态(正常结束/异常结束/取消)
          // 把 thread 只为空,并返回结果
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
          // 如果是COMPLETING 状态(中间状态),表示任务已结束,但 outcome 赋值还没结束,这时主动让出执行权,让其他线程优先执行(只是发出这个信号,至于是否别的线程执行一定会执行可是不一定的)
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
          // 等待节点为空
        else if (q == null)
              // 将当前线程构造节点
            q = new WaitNode();
          // 如果还没有入队列,则把当前节点加入waiters首节点并替换原来waiters
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
          // 如果设置超时时间
        else if (timed) {
            nanos = deadline - System.nanoTime();
              // 时间到,则不再等待结果
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
              // 阻塞等待特定时间
            LockSupport.parkNanos(this, nanos);
        }
        else
              // 挂起当前线程,知道被其他线程唤醒
            LockSupport.park(this);
    }
}

总的来说,进入这个方法,通常会经历三轮循环

  1. 第一轮for循环,执行的逻辑是 q == null, 这时候会新建一个节点 q, 第一轮循环结束。
  2. 第二轮for循环,执行的逻辑是 !queue,这个时候会把第一轮循环中生成的节点的 next 指针指向waiters,然后CAS的把节点q 替换waiters, 也就是把新生成的节点添加到waiters 中的首节点。如果替换成功,queued=true。第二轮循环结束。
  3. 第三轮for循环,进行阻塞等待。要么阻塞特定时间,要么一直阻塞知道被其他线程唤醒。

对于第二轮循环,大家可能稍稍有点迷糊,我们前面说过,有阻塞,就会排队,有排队自然就有队列,FutureTask 内部同样维护了一个队列

/** Treiber stack of waiting threads */
private volatile WaitNode waiters;

说是等待队列,其实就是一个 Treiber 类型 stack,既然是 stack, 那就像手枪的弹夹一样(脑补一下子弹放入弹夹的情形),后进先出,所以刚刚说的第二轮循环,会把新生成的节点添加到 waiters stack 的首节点

如果程序运行正常,通常调用 get() 方法,会将当前线程挂起,那谁来唤醒呢?自然是 run() 方法运行完会唤醒,设置返回结果(set方法)/异常的方法(setException方法) 两个方法中都会调用 finishCompletion 方法,该方法就会唤醒等待队列中的线程

private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                      // 唤醒等待队列中的线程
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }

    done();

    callable = null;        // to reduce footprint
}

将一个任务的状态设置成终止态只有三种方法:

  • set
  • setException
  • cancel

前两种方法已经分析完,接下来我们就看一下 cancel 方法

查看 Future cancel(),该方法注释上明确说明三种 cancel 操作一定失败的情形

  1. 任务已经执行完成了
  2. 任务已经被取消过了
  3. 任务因为某种原因不能被取消

其它情况下,cancel操作将返回true。值得注意的是,cancel操作返回 true 并不代表任务真的就是被取消, 这取决于发动cancel状态时,任务所处的状态

  • 如果发起cancel时任务还没有开始运行,则随后任务就不会被执行;
  • 如果发起cancel时任务已经在运行了,则这时就需要看 mayInterruptIfRunning 参数了:

    • 如果mayInterruptIfRunning 为true, 则当前在执行的任务会被中断
    • 如果mayInterruptIfRunning 为false, 则可以允许正在执行的任务继续运行,直到它执行完

有了这些铺垫,看一下 cancel 代码的逻辑就秒懂了

public boolean cancel(boolean mayInterruptIfRunning) {
  
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
          // 需要中断任务执行线程
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                  // 中断线程
                if (t != null)
                    t.interrupt();
            } finally { // final state
                  // 修改为最终状态 INTERRUPTED
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
          // 唤醒等待中的线程
        finishCompletion();
    }
    return true;
}

核心方法终于分析完了,到这咱们喝口茶休息一下吧

我是想说,使用 FutureTask 来演练烧水泡茶经典程序

如上图:

  • 洗水壶 1 分钟
  • 烧开水 15 分钟
  • 洗茶壶 1 分钟
  • 洗茶杯 1 分钟
  • 拿茶叶 2 分钟

最终泡茶

让我心算一下,如果串行总共需要 20 分钟,但很显然在烧开水期间,我们可以洗茶壶/洗茶杯/拿茶叶

这样总共需要 16 分钟,节约了 4分钟时间,烧水泡茶尚且如此,在现在高并发的时代,4分钟可以做的事太多了,学会使用 Future 优化程序是必然(其实优化程序就是寻找关键路径,关键路径找到了,非关键路径的任务通常就可以和关键路径的内容并行执行了

@Slf4j
public class MakeTeaExample {

   public static void main(String[] args) throws ExecutionException, InterruptedException {
      ExecutorService executorService = Executors.newFixedThreadPool(2);

      // 创建线程1的FutureTask
      FutureTask<String> ft1 = new FutureTask<String>(new T1Task());
      // 创建线程2的FutureTask
      FutureTask<String> ft2 = new FutureTask<String>(new T2Task());

      executorService.submit(ft1);
      executorService.submit(ft2);

      log.info(ft1.get() + ft2.get());
      log.info("开始泡茶");

      executorService.shutdown();
   }

   static class T1Task implements Callable<String> {

      @Override
      public String call() throws Exception {
         log.info("T1:洗水壶...");
         TimeUnit.SECONDS.sleep(1);

         log.info("T1:烧开水...");
         TimeUnit.SECONDS.sleep(15);

         return "T1:开水已备好";
      }
   }

   static class T2Task implements Callable<String> {
      @Override
      public String call() throws Exception {
         log.info("T2:洗茶壶...");
         TimeUnit.SECONDS.sleep(1);

         log.info("T2:洗茶杯...");
         TimeUnit.SECONDS.sleep(2);

         log.info("T2:拿茶叶...");
         TimeUnit.SECONDS.sleep(1);
         return "T2:福鼎白茶拿到了";
      }
   }
}

上面的程序是主线程等待两个 FutureTask 的执行结果,线程1 烧开水时间更长,线程1希望在水烧开的那一刹那就可以拿到茶叶直接泡茶,怎么半呢?

那只需要在线程 1 的FutureTask 中获取 线程 2 FutureTask 的返回结果就可以了,我们稍稍修改一下程序:

@Slf4j
public class MakeTeaExample1 {

   public static void main(String[] args) throws ExecutionException, InterruptedException {
      ExecutorService executorService = Executors.newFixedThreadPool(2);

      // 创建线程2的FutureTask
      FutureTask<String> ft2 = new FutureTask<String>(new T2Task());
      // 创建线程1的FutureTask
      FutureTask<String> ft1 = new FutureTask<String>(new T1Task(ft2));
      
      executorService.submit(ft1);
      executorService.submit(ft2);

      executorService.shutdown();
   }

   static class T1Task implements Callable<String> {

      private FutureTask<String> ft2;
      public T1Task(FutureTask<String> ft2) {
         this.ft2 = ft2;
      }

      @Override
      public String call() throws Exception {
         log.info("T1:洗水壶...");
         TimeUnit.SECONDS.sleep(1);

         log.info("T1:烧开水...");
         TimeUnit.SECONDS.sleep(15);

         String t2Result = ft2.get();
         log.info("T1 拿到T2的 {}, 开始泡茶", t2Result);
         return "T1: 上茶!!!";
      }
   }

   static class T2Task implements Callable<String> {
      @Override
      public String call() throws Exception {
         log.info("T2:洗茶壶...");
         TimeUnit.SECONDS.sleep(1);

         log.info("T2:洗茶杯...");
         TimeUnit.SECONDS.sleep(2);

         log.info("T2:拿茶叶...");
         TimeUnit.SECONDS.sleep(1);
         return "福鼎白茶";
      }
   }
}

来看程序运行结果:

知道这个变化后我们再回头看 ExecutorService 的三个 submit 方法:

<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> Future<T> submit(Callable<T> task);

第一种方法,逐层代码查看到这里:

你会发现,和我们改造烧水泡茶的程序思维是相似的,可以传进去一个 result,result 相当于主线程和子线程之间的桥梁,通过它主子线程可以共享数据

第二个方法参数是 Runnable 类型参数,即便调用 get() 方法也是返回 null,所以仅是可以用来断言任务已经结束了,类似 Thread.join()

第三个方法参数是 Callable 类型参数,通过get() 方法可以明确获取 call() 方法的返回值

到这里,关于 Future 的整块讲解就结束了,还是需要简单消化一下的

总结

如果熟悉 Javascript 的朋友,Future 的特性和 Javascript 的 Promise 是类似的,私下开玩笑通常将其比喻成男朋友的承诺

回归到Java,我们从 JDK 的演变历史,谈及 Callable 的诞生,它弥补了 Runnable 没有返回值的空缺,通过简单的 demo 了解 Callable 与 Future 的使用。 FutureTask 又是 Future接口的核心实现类,通过阅读源码了解了整个实现逻辑,最后结合FutureTask 和线程池演示烧水泡茶程序,相信到这里,你已经可以轻松获取线程结果了

烧水泡茶是非常简单的,如果更复杂业务逻辑,以这种方式使用 Future 必定会带来很大的会乱(程序结束没办法主动通知,Future 的链接和整合都需要手动操作)为了解决这个短板,没错,又是那个男人 Doug Lea, CompletableFuture 工具类在 Java1.8 的版本出现了,搭配 Lambda 的使用,让我们编写异步程序也像写串行代码那样简单,纵享丝滑

接下来我们就了解一下 CompletableFuture 的使用

灵魂追问

  1. 你在日常开发工作中是怎样将整块任务做到分工与协作的呢?有什么基本准则吗?
  2. 如何批量的执行异步任务呢?

参考

  1. Java 并发编程实战
  2. Java 并发编程的艺术
  3. Java 并发编程之美

日拱一兵 | 原创

查看原文

赞 11 收藏 7 评论 0

日拱一兵 发布了文章 · 2020-07-01

CountDownLatch和CyclicBarrier 傻傻的分不清?超长精美图文又来了

crescent-4875339_1280.jpg

日拱一兵 | 原创

  • 你有一个思想,我有一个思想,我们交换后,一个人就有两个思想
  • If you can NOT explain it simply, you do NOT understand it well enough

前言

并发编程的三大核心是分工同步互斥。在日常开发中,经常会碰到需要在主线程中开启多个子线程去并行的执行任务,并且主线程需要等待所有子线程执行完毕再进行汇总的场景,这就涉及到分工与同步的内容了

在讲 有序性可见性,Happens-before来搞定 时,提到过 join() 规则,使用 join() 就可以简单的实现上述场景:

@Slf4j
public class JoinExample {

    public static void main(String[] args) throws InterruptedException {
        Thread thread1 = new Thread(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                log.info("Thread-1 执行完毕");
            }
        }, "Thread-1");

        Thread thread2 = new Thread(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                log.info("Thread-2 执行完毕");
            }
        }, "Thread-2");

        thread1.start();
        thread2.start();

        thread1.join();
        thread2.join();

        log.info("主线程执行完毕");
    }
}

运行结果:

整个过程可以这么理解

我们来查看 join() 的实现源码:

其实现原理是不停的检查 join 线程是否存活,如果 join 线程存活,则 wait(0) 永远的等下去,直至 join 线程终止后,线程的 this.notifyAll() 方法会被调用(该方法是在 JVM 中实现的,JDK 中并不会看到源码),退出循环恢复主线程执行。很显然这种循环检查的方式比较低效

除此之外,使用 join() 缺少很多灵活性,比如实际项目中很少让自己单独创建线程(原因在 我会手动创建线程,为什么要使用线程池? 中说过)而是使用 Executor, 这进一步减少了 join() 的使用场景,所以 join() 的使用在多数是停留在 demo 演示上

那如何实现文中开头提到的场景呢?

CountDownLatch

CountDownLatch, 直译过来【数量向下门闩】,那肯定里面有计数器的存在了。我们将上述程序用 CountDownLatch 实现一下,先让大家有个直观印象

@Slf4j
public class CountDownLatchExample {

    private static CountDownLatch countDownLatch = new CountDownLatch(2);

    public static void main(String[] args) throws InterruptedException {
        // 这里不推荐这样创建线程池,最好通过 ThreadPoolExecutor 手动创建线程池
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        executorService.submit(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                log.info("Thread-1 执行完毕");
                //计数器减1
                countDownLatch.countDown();
            }
        });

        executorService.submit(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                log.info("Thread-2 执行完毕");
                //计数器减1
                countDownLatch.countDown();
            }
        });

        log.info("主线程等待子线程执行完毕");
        log.info("计数器值为:" + countDownLatch.getCount());
        countDownLatch.await();
        log.info("计数器值为:" + countDownLatch.getCount());
        log.info("主线程执行完毕");
        executorService.shutdown();
    }
}

运行结果如下:

结合上述示例的运行结果,相信你也能猜出 CountDownLatch 的实现原理了:

  1. 初始化计数器数值,比如为2
  2. 子线程执行完则调用 countDownLatch.countDown() 方法将计数器数值减1
  3. 主线程调用 await() 方法阻塞自己,直至计数器数值为0(即子线程全部执行结束)
不知道你是否注意,countDownLatch.countDown(); 这行代码可以写在子线程执行的任意位置,不像 join() 要完全等待子线程执行完,这也是 CountDownLatch 灵活性的一种体现

上述的例子还是过于简单,Oracle 官网 CountDownLatch 说明 有两个非常经典的使用场景,示例很简单,强烈建议查看相关示例代码,打开使用思路。我将两个示例代码以图片的形式展示在此处:

官网示例1

  • 第一个是开始信号 startSignal,阻止任何工人 Worker 继续工作,直到司机 Driver 准备好让他们继续工作
  • 第二个是完成信号 doneSignal,允许司机 Driver 等待,直到所有的工人 Worker 完成。

官网示例2

另一种典型的用法是将一个问题分成 N 个部分 (比如将一个大的 list 拆分成多分,每个 Worker 干一部分),Worker 执行完自己所处理的部分后,计数器减1,当所有子部分完成后,Driver 才继续向下执行

结合官网示例,相信你已经可以结合你自己的业务场景解,通过 CountDownLatch 解决一些串行瓶颈来提高运行效率了,会用还远远不够,咱得知道 CountDownLatch 的实现原理

源码分析

CountDownLatch 是 AQS 实现中的最后一个内容,有了前序文章的知识铺垫:

当你看到 CountDownLatch 的源码内容,你会高兴的笑起来,内容真是太少了

展开类结构全部内容就这点东西

既然 CountDownLatch 是基于 AQS 实现的,那肯定也离不开对同步状态变量 state 的操作,我们在初始化的时候就将计数器的值赋值给了state

另外,它可以多个线程同时获取,那一定是基于共享式获取同步变量的用法了,所以它需要通过重写下面两个方法控制同步状态变量 state :

  • tryAcquireShared()
  • tryReleaseShared()

CountDownLatch 暴露给使用者的只有 await()countDown() 两个方法,前者是阻塞自己,因为只有获取同步状态才会才会出现阻塞的情况,那自然是在 await() 的方法内部会用到 tryAcquireShared();有获取就要有释放,那后者 countDown() 方法内部自然是要用到 tryReleaseShared() 方法了

PS:如果你对上面这个很自然的推断理解有困难,强烈建议你看一下前序文章的铺垫,以防止知识断层带来的困扰

await()

先来看 await() 方法, 从方法签名上看,该方法会抛出 InterruptedException, 所以它是可以响应中断的,这个我们在 Java多线程中断机制 中明确说明过

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

其内部调用了同步器提供的模版方法 acquireSharedInterruptibly

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
      // 如果监测到中断标识为true,会重置标识,然后抛出 InterruptedException
    if (Thread.interrupted())
        throw new InterruptedException();
      // 调用重写的 tryAcquireShared 方法,该方法结果如果大于零则直接返回,程序继续向下执行,如果小于零,则会阻塞自己
    if (tryAcquireShared(arg) < 0)
          // state不等于0,则尝试阻塞自己
        doAcquireSharedInterruptibly(arg);
}

重写的 tryAcquireShared 方法非常简单, 就是判断同步状态变量 state 的值是否为 0, 如果为零 (子线程已经全部执行完毕)则返回1, 否则返回 -1

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

如果子线程没有全部执行完毕,则会通过 doAcquireSharedInterruptibly 方法阻塞自己,这个方法在 Java AQS共享式获取同步状态及Semaphore的应用分析 中已经仔细分析过了,这里就不再赘述了

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                  // 再次尝试获取同步装阿嚏,如果大于0,说明子线程全部执行完毕,直接返回
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
              // 阻塞自己
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

await() 方法的实现就是这么简单,接下来看看 countDown() 的实现原理

countDown()

public void countDown() {
    sync.releaseShared(1);
}

同样是调用同步器提供的模版方法 releaseShared

public final boolean releaseShared(int arg) {
      // 调用自己重写的同步器方法
    if (tryReleaseShared(arg)) {
          // 唤醒调用 await() 被阻塞的线程
        doReleaseShared();
        return true;
    }
    return false;
}

重写的 tryReleaseShared 同样很简单

protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
          // 如果当前状态值为0,则直接返回 (1)
        if (c == 0)
            return false;
          // 使用 CAS 让计数器的值减1 (2)
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

代码 (1) 判断当前同步状态值,如果为0 则直接返回 false;否则执行代码 (2),使用 CAS 将计数器减1,如果 CAS 失败,则循环重试,最终返回 nextc == 0 的结果值,如果该值返回 true,说明最后一个线程已调用 countDown() 方法,然后就要唤醒调用 await() 方法被阻塞的线程,同样由于分析过 AQS 的模版方法 doReleaseShared 整个释放同步状态以及唤醒的过程,所以这里同样不再赘述了

仔细看CountDownLatch重写的 tryReleaseShared 方法,有一点需要和大家说明:

代码 (1) if (c == 0) 看似没什么用处,其实用处大大滴,如果没有这个判断,当计数器值已经为零了,其他线程再调用 countDown 方法会将计数器值变为负值

现在就差 await(long timeout, TimeUnit unit) 方法没介绍了

await(long timeout, TimeUnit unit)

public boolean await(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

该方法签名同样抛出 InterruptedException,意思可响应中断。它其实就是 await() 更完善的一个版本,简单来说就是

主线程设定等待超时时间,如果该时间内子线程没有执行完毕,主线程也会直接返回

我们将上面的例子稍稍修改一下你就会明白(主线程超时时间设置为 2 秒,而子线程要 sleep 5 秒)

@Slf4j
public class CountDownLatchTimeoutExample {

   private static CountDownLatch countDownLatch = new CountDownLatch(2);

   public static void main(String[] args) throws InterruptedException {
      // 这里不推荐这样创建线程池,最好通过 ThreadPoolExecutor 手动创建线程池
      ExecutorService executorService = Executors.newFixedThreadPool(2);

      executorService.submit(() -> {
         try {
            Thread.sleep(5000);
         } catch (InterruptedException e) {
            e.printStackTrace();
         } finally {
            log.info("Thread-1 执行完毕");
            //计数器减1
            countDownLatch.countDown();
         }
      });

      executorService.submit(() -> {
         try {
            Thread.sleep(5000);
         } catch (InterruptedException e) {
            e.printStackTrace();
         } finally {
            log.info("Thread-2 执行完毕");
            //计数器减1
            countDownLatch.countDown();
         }
      });

      log.info("主线程等待子线程执行完毕");
      log.info("计数器值为:" + countDownLatch.getCount());
      countDownLatch.await(2, TimeUnit.SECONDS);
      log.info("计数器值为:" + countDownLatch.getCount());
      log.info("主线程执行完毕");
      executorService.shutdown();
   }
}

运行结果如下:

形象化的展示上述示例的运行过程

小结

CountDownLatch 的实现原理就是这么简单,了解了整个实现过程后,你也许发现了使用 CountDownLatch 的一个问题:

计数器减 1 操作是一次性的,也就是说当计数器减到 0, 再有线程调用 await() 方法,该线程会直接通过,不会再起到等待其他线程执行结果起到同步的作用了

为了解决这个问题,贴心的 Doug Lea 大师早已给我们准备好相应策略 CyclicBarrier

本来想将 CyclicBarrier 的内容放到下一个章节,但是 CountDownLatch 的内容着实有些少,不够解渴,另外有对比才有伤害,所以内容没结束,咱得继续看 CyclicBarrier

CyclicBarrier

上面简单说了一下 CyclicBarrier 被创造出来的理由,这里先看一下它的字面解释:

概念总是有些抽象,我们将上面的例子用 CyclicBarrier 再做个改动,先让大家有个直观的使用概念

@Slf4j
public class CyclicBarrierExample {

   // 创建 CyclicBarrier 实例,计数器的值设置为2
   private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2);

   public static void main(String[] args) {
      ExecutorService executorService = Executors.newFixedThreadPool(2);
      int breakCount = 0;

         // 将线程提交到线程池
      executorService.submit(() -> {
         try {
            log.info(Thread.currentThread() + "第一回合");
            Thread.sleep(1000);
            cyclicBarrier.await();

            log.info(Thread.currentThread() + "第二回合");
            Thread.sleep(2000);
            cyclicBarrier.await();

            log.info(Thread.currentThread() + "第三回合");
         } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
         } 
      });

      executorService.submit(() -> {
         try {
            log.info(Thread.currentThread() + "第一回合");
            Thread.sleep(2000);
            cyclicBarrier.await();

            log.info(Thread.currentThread() + "第二回合");
            Thread.sleep(1000);
            cyclicBarrier.await();

            log.info(Thread.currentThread() + "第三回合");
         } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
         }
      });

      executorService.shutdown();
   }

}

运行结果:

结合程序代码与运行结果,我们可以看出,子线程执行完第一回合后(执行回合所需时间不同),都会调用 await() 方法,等所有线程都到达屏障点后,会突破屏障继而执行第二回合,同样的道理最终到达第三回合

形象化的展示上述示例的运行过程

看到这里,你应该明白 CyclicBarrier 的基本用法,但随之你内心也应该有了一些疑问:

  1. 怎么判断所有线程都到达屏障点的?
  2. 突破某一屏障后,又是怎么重置 CyclicBarrier 计数器,等待线程再一次突破屏障呢?

带着这些问题我们来看一看源码

源码分析

同样先打开 CyclicBarrier 的类结构,展开类全部内容,其实也没多少内容

从类结构中看到有:

  1. await() 方法,猜测应该和 CountDownLatch 是类似的,都是获取同步状态,阻塞自己
  2. ReentrantLock,CyclicBarrier 内部竟然也用到了我们之前讲过的 ReentrantLock,猜测这个锁一定保护 CyclicBarrier 的某个变量,那肯定也是基于 AQS 相关知识了
  3. Condition,存在条件,猜测会有等待/通知机制的运用

我们继续带着这些猜测,结合上面的实例代码一点点来验证

// 创建 CyclicBarrier 实例,计数器的值设置为2
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2);

查看构造函数 (这里的英文注释舍不得删掉,因为说的太清楚了,我来结合注释来说明一下):

private final int parties;
private int count;

public CyclicBarrier(int parties) {
    this(parties, null);
}

    /**
     * Creates a new {@code CyclicBarrier} that will trip when the
     * given number of parties (threads) are waiting upon it, and which
     * will execute the given barrier action when the barrier is tripped,
     * performed by the last thread entering the barrier.
     *
     * @param parties the number of threads that must invoke {@link #await}
     *        before the barrier is tripped
     * @param barrierAction the command to execute when the barrier is
     *        tripped, or {@code null} if there is no action
     * @throws IllegalArgumentException if {@code parties} is less than 1
     */
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

根据注释说明,parties 代表冲破屏障之前要触发的线程总数,count 本身又是计数器,那问题来了

直接就用 count 不就可以了嘛?为啥同样用于初始化计数器,要维护两个变量呢?

从 parties 和 count 的变量声明中,你也能看出一些门道,前者有 final 修饰,初始化后就不可以改变了,因为 CyclicBarrier 的设计目的是可以循环利用的,所以始终用 parties 来记录线程总数,当 count 计数器变为 0 后,如果没有 parties 的值赋给它,怎么进行重新复用再次计数呢,所以这里维护两个变量很有必要

接下来就看看 await() 到底是怎么实现的

// 从方法签名上可以看出,该方法同样可以被中断,另外还有一个 BrokenBarrierException 异常,我们一会看
public int await() throws InterruptedException, BrokenBarrierException {
    try {
          // 调用内部 dowait 方法, 第一个参数为 false,表示不设置超时时间,第二个参数也就没了意义
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}

接下来看看 dowait(false, 0L) 做了哪些事情 (这个方法内容有点多,别担心,逻辑并不复杂,请看关键代码注释)

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    final ReentrantLock lock = this.lock;
    // 还记得之前说过的 Lock 标准范式吗? JDK 内部都是这么使用的,你一定也要遵循范式
    lock.lock();
    try {
        final Generation g = generation;

          // broken 是静态内部类 Generation唯一的一个成员变量,用于记录当前屏障是否被打破,如果打破,则抛出 BrokenBarrierException 异常
          // 这里感觉挺困惑的,我们要【冲破】屏障,这里【打破】屏障却抛出异常,注意我这里的用词
        if (g.broken)
            throw new BrokenBarrierException();

          // 如果线程被中断,则会通过 breakBarrier 方法将 broken 设置为true,也就是说,如果有线程收到中断通知,直接就打破屏障,停止 CyclicBarrier, 并唤醒所有线程
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }
      
          // ************************************
          // 因为 breakBarrier 方法在这里会被调用多次,为了便于大家理解,我直接将 breakBarrier 代码插入到这里
          private void breakBarrier() {
          // 将打破屏障标识 设置为 true
          generation.broken = true;
          // 重置计数器
          count = parties;
          // 唤醒所有等待的线程
          trip.signalAll();
            }
          // ************************************

                // 每当一个线程调用 await 方法,计数器 count 就会减1
        int index = --count;
          // 当 count 值减到 0 时,说明这是最后一个调用 await() 的子线程,则会突破屏障
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                  // 获取构造函数中的 barrierCommand,如果有值,则运行该方法
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                  // 激活其他因调用 await 方法而被阻塞的线程,并重置 CyclicBarrier
                nextGeneration();
              
                // ************************************
                // 为了便于大家理解,我直接将 nextGeneration 实现插入到这里
                private void nextGeneration() {
                    // signal completion of last generation
                    trip.signalAll();
                    // set up next generation
                    count = parties;
                    generation = new Generation();
                }
                // ************************************
              
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

          // index 不等于0, 说明当前不是最后一个线程调用 await 方法
        // loop until tripped, broken, interrupted, or timed out
        for (;;) {
            try {
                  // 没有设置超时时间
                if (!timed)
                      // 进入条件等待
                    trip.await();
                else if (nanos > 0L)
                      // 否则,判断超时时间,这个我们在 AQS 中有说明过,包括为什么最后超时阈值 spinForTimeoutThreshold 不再比较的原因,大家会看就好
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                  // 条件等待被中断,则判断是否有其他线程已经使屏障破坏。若没有则进行屏障破坏处理,并抛出异常;否则再次中断当前线程

                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    Thread.currentThread().interrupt();
                }
            }

            if (g.broken)
                throw new BrokenBarrierException();

              // 如果新一轮回环结束,会通过 nextGeneration 方法新建 generation 对象
            if (g != generation)
                return index;

            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}

doWait 就是 CyclicBarrier 的核心逻辑, 可以看出,该方法入口使用了 ReentrantLock,这也就是为什么 Generation broken 变量没有被声明为 volatile 类型保持可见性,因为对其的更改都是在锁的内部,同样在锁的内部对计数器 count 做更新,也保证了原子性

doWait 方法中,是通过 nextGeneration 方法来重新初始化/重置 CyclicBarrier 状态的,该类中还有一个 reset() 方法,也是重置 CyclicBarrier 状态的

public void reset() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        breakBarrier();   // break the current generation
        nextGeneration(); // start a new generation
    } finally {
        lock.unlock();
    }
}

但 reset() 方法并没有在 CyclicBarrier 内部被调用,显然是给 CyclicBarrier 使用者来调用的,那问题来了

什么时候调用 reset() 方法呢

正常情况下,CyclicBarrier 是会被自动重置状态的,从 reset 的方法实现中可以看出调用了 breakBarrier

方法,也就是说,调用 reset 会使当前处在等待中的线程最终抛出 BrokenBarrierException 并立即被唤醒,所以说 reset() 只会在你想打破屏障时才会使用

上述示例,我们构建 CyclicBarrier 对象时,并没有传递 barrierCommand 对象, 我们修改示例传入一个 barrierCommand 对象,看看会有什么结果:

// 创建 CyclicBarrier 实例,计数器的值设置为2
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> {
   log.info("全部运行结束");
});

运行结果:

从运行结果中来看,每次冲破屏障后都会执行 CyclicBarrier 初始化 barrierCommand 的方法, 这与我们对 doWait() 方法的分析完全吻合,从上面的运行结果中可以看出,最后一个线程是运行 barrierCommand run() 方法的线程,我们再来形象化的展示一下整个过程

从上图可以看出,barrierAction 与每次突破屏障是串行化的执行过程,假如 barrierAction 是很耗时的汇总操作,那这就是可以优化的点了,我们继续修改代码

// 创建单线程线程池
private static Executor executor = Executors.newSingleThreadExecutor();

// 创建 CyclicBarrier 实例,计数器的值设置为2
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> {
   executor.execute(() -> gather());
});

private static void gather() {
   try {
      Thread.sleep(2000);
   } catch (InterruptedException e) {
      e.printStackTrace();
   }
   log.info("全部运行结束");
}

我们这里将 CyclicBarrier 的回调函数 barrierAction使用单线程的线程池,这样最后一个冲破屏障的线程就不用等待 barrierAction 的执行,直接分配个线程池里的线程异步执行,进一步提升效率

运行结果如下:

我们再形象化的看一下整个过程:

这里使用了单一线程池,增加了并行操作,提高了程序运行效率,那问题来了:

如果 barrierAction 非常非常耗时,冲破屏障的任务就可能堆积在单一线程池的等待队列中,就存在 OOM 的风险,那怎么办呢?

这是就要需要一定的限流策略或者使用线程池的拒绝的略等

那把单一线程池换成非单一的固定线程池不就可以了嘛?比如 fixed(5)

乍一看确实能缓解单线程池可能引起的任务堆积问题,上面代码我们看到的 gather() 方法,假如该方法内部没有使用锁或者说存在竟态条件,那 CyclicBarrier 的回调函数 barrierAction 使用多线程必定引起结果的不准确

所以在实际使用中还要结合具体的业务场景不断优化代码,使之更加健壮

总结

本文讲解了 CountDownLatch 和 CyclicBarrier 的经典使用场景以及实现原理,以及在使用过程中可能会遇到的问题,比如将大的 list 拆分作业就可以用到前者,读取多个 Excel 的sheet 页,最后进行结果汇总就可以用到后者 (文中完整示例代码已上传)

最后,再形象化的比喻一下

  • CountDownLatch 主要用来解决一个线程等待多个线程的场景,可以类比旅游团团长要等待所有游客到齐才能去下一个景点
  • 而 CyclicBarrier 是一组线程之间的相互等待,可以类比几个驴友之间的不离不弃,共同到达某个地方,再继续出发,这样反复

灵魂追问

  1. 怎样拿到 CyclicBarrier 的汇总结果呢?
  2. 线程池中的 Future 特性你有使用过吗?

接下来,咱们就聊聊那些可以使用的 Future 特性

参考

  1. Java 并发编程实战
  2. Java 并发编程的艺术
  3. Java 并发编程之美
  4. When to reset CyclicBarrier in java multithreading

日拱一兵 | 原创

查看原文

赞 16 收藏 12 评论 0

日拱一兵 收藏了文章 · 2020-06-28

Java多线程进阶(十一)—— J.U.C之locks框架:StampedLock

11.jpeg

本文首发于一世流云的专栏:https://segmentfault.com/blog...

一、StampedLock类简介

StampedLock类,在JDK1.8时引入,是对读写锁ReentrantReadWriteLock的增强,该类提供了一些功能,优化了读锁、写锁的访问,同时使读写锁之间可以互相转换,更细粒度控制并发。

首先明确下,该类的设计初衷是作为一个内部工具类,用于辅助开发其它线程安全组件,用得好,该类可以提升系统性能,用不好,容易产生死锁和其它莫名其妙的问题。

1.1 StampedLock的引入

先来看下,为什么有了ReentrantReadWriteLock,还要引入StampedLock?

ReentrantReadWriteLock使得多个读线程同时持有读锁(只要写锁未被占用),而写锁是独占的。

但是,读写锁如果使用不当,很容易产生“饥饿”问题:

比如在读线程非常多,写线程很少的情况下,很容易导致写线程“饥饿”,虽然使用“公平”策略可以一定程度上缓解这个问题,但是“公平”策略是以牺牲系统吞吐量为代价的。(在ReentrantLock类的介绍章节中,介绍过这种情况)

1.2 StampedLock的特点

StampedLock的主要特点概括一下,有以下几点:

  1. 所有获取锁的方法,都返回一个邮戳(Stamp),Stamp为0表示获取失败,其余都表示成功;
  2. 所有释放锁的方法,都需要一个邮戳(Stamp),这个Stamp必须是和成功获取锁时得到的Stamp一致;
  3. StampedLock是不可重入的;(如果一个线程已经持有了写锁,再去获取写锁的话就会造成死锁)
  4. StampedLock有三种访问模式:
    ①Reading(读模式):功能和ReentrantReadWriteLock的读锁类似
    ②Writing(写模式):功能和ReentrantReadWriteLock的写锁类似
    ③Optimistic reading(乐观读模式):这是一种优化的读模式。
  5. StampedLock支持读锁和写锁的相互转换
    我们知道RRW中,当线程获取到写锁后,可以降级为读锁,但是读锁是不能直接升级为写锁的。
    StampedLock提供了读锁和写锁相互转换的功能,使得该类支持更多的应用场景。
  6. 无论写锁还是读锁,都不支持Conditon等待
我们知道,在ReentrantReadWriteLock中,当读锁被使用时,如果有线程尝试获取写锁,该写线程会阻塞。
但是,在Optimistic reading中,即使读线程获取到了读锁,写线程尝试获取写锁也不会阻塞,这相当于对读模式的优化,但是可能会导致数据不一致的问题。所以,当使用Optimistic reading获取到读锁时,必须对获取结果进行校验。

二、StampedLock使用示例

先来看一个Oracle官方的例子:

class Point {
    private double x, y;
    private final StampedLock sl = new StampedLock();

    void move(double deltaX, double deltaY) {
        long stamp = sl.writeLock();    //涉及对共享资源的修改,使用写锁-独占操作
        try {
            x += deltaX;
            y += deltaY;
        } finally {
            sl.unlockWrite(stamp);
        }
    }

    /**
     * 使用乐观读锁访问共享资源
     * 注意:乐观读锁在保证数据一致性上需要拷贝一份要操作的变量到方法栈,并且在操作数据时候可能其他写线程已经修改了数据,
     * 而我们操作的是方法栈里面的数据,也就是一个快照,所以最多返回的不是最新的数据,但是一致性还是得到保障的。
     *
     * @return
     */
    double distanceFromOrigin() {
        long stamp = sl.tryOptimisticRead();    // 使用乐观读锁
        double currentX = x, currentY = y;      // 拷贝共享资源到本地方法栈中
        if (!sl.validate(stamp)) {              // 如果有写锁被占用,可能造成数据不一致,所以要切换到普通读锁模式
            stamp = sl.readLock();             
            try {
                currentX = x;
                currentY = y;
            } finally {
                sl.unlockRead(stamp);
            }
        }
        return Math.sqrt(currentX * currentX + currentY * currentY);
    }

    void moveIfAtOrigin(double newX, double newY) { // upgrade
        // Could instead start with optimistic, not read mode
        long stamp = sl.readLock();
        try {
            while (x == 0.0 && y == 0.0) {
                long ws = sl.tryConvertToWriteLock(stamp);  //读锁转换为写锁
                if (ws != 0L) {
                    stamp = ws;
                    x = newX;
                    y = newY;
                    break;
                } else {
                    sl.unlockRead(stamp);
                    stamp = sl.writeLock();
                }
            }
        } finally {
            sl.unlock(stamp);
        }
    }
}

可以看到,上述示例最特殊的其实是distanceFromOrigin方法,这个方法中使用了“Optimistic reading”乐观读锁,使得读写可以并发执行,但是“Optimistic reading”的使用必须遵循以下模式:

long stamp = lock.tryOptimisticRead();  // 非阻塞获取版本信息
copyVaraibale2ThreadMemory();           // 拷贝变量到线程本地堆栈
if(!lock.validate(stamp)){              // 校验
    long stamp = lock.readLock();       // 获取读锁
    try {
        copyVaraibale2ThreadMemory();   // 拷贝变量到线程本地堆栈
     } finally {
       lock.unlock(stamp);              // 释放悲观锁
    }

}
useThreadMemoryVarables();              // 使用线程本地堆栈里面的数据进行操作

三、StampedLock原理

3.1 StampedLock的内部常量

StampedLock虽然不像其它锁一样定义了内部类来实现AQS框架,但是StampedLock的基本实现思路还是利用CLH队列进行线程的管理,通过同步状态值来表示锁的状态和类型。

StampedLock内部定义了很多常量,定义这些常量的根本目的还是和ReentrantReadWriteLock一样,对同步状态值按位切分,以通过位运算对State进行操作:

对于StampedLock来说,写锁被占用的标志是第8位为1,读锁使用0-7位,正常情况下读锁数目为1-126,超过126时,使用一个名为readerOverflow的int整型保存超出数。

clipboard.png

部分常量的比特位表示如下:
clipboard.png

另外,StampedLock相比ReentrantReadWriteLock,对多核CPU进行了优化,可以看到,当CPU核数超过1时,会有一些自旋操作:
clipboard.png

3.2 示例分析

假设现在有三个线程:ThreadA、ThreadB、ThreadC、ThreadD。操作如下:
//ThreadA调用writeLock, 获取写锁
//ThreadB调用readLock, 获取读锁
//ThreadC调用readLock, 获取读锁
//ThreadD调用writeLock, 获取写锁
//ThreadE调用readLock, 获取读锁

1. StampedLock对象的创建

StampedLock的构造器很简单,构造时设置下同步状态值:
clipboard.png

另外,StamedLock提供了三类视图:
clipboard.png

这些视图其实是对StamedLock方法的封装,便于习惯了ReentrantReadWriteLock的用户使用:
例如,ReadLockView其实相当于ReentrantReadWriteLock.readLock()返回的读锁;
clipboard.png

2. ThreadA调用writeLock获取写锁

来看下writeLock方法:
clipboard.png

StampedLock中大量运用了位运算,这里(s = state) & ABITS == 0L 表示读锁和写锁都未被使用,这里写锁可以立即获取成功,然后CAS操作更新同步状态值State。

操作完成后,等待队列的结构如下:
clipboard.png

注意:StampedLock中,等待队列的结点要比AQS中简单些,仅仅三种状态。
0:初始状态
-1:等待中
1:取消

另外,结点的定义中有个cowait字段,该字段指向一个栈,用于保存读线程,这个后续会讲到。
clipboard.png

3. ThreadB调用readLock获取读锁

来看下readLock方法:
由于ThreadA此时持有写锁,所以ThreadB获取读锁失败,将调用acquireRead方法,加入等待队列:
clipboard.png

acquireRead方法非常复杂,用到了大量自旋操作:

/**
 * 尝试自旋的获取读锁, 获取不到则加入等待队列, 并阻塞线程
 *
 * @param interruptible true 表示检测中断, 如果线程被中断过, 则最终返回INTERRUPTED
 * @param deadline      如果非0, 则表示限时获取
 * @return 非0表示获取成功, INTERRUPTED表示中途被中断过
 */
private long acquireRead(boolean interruptible, long deadline) {
    WNode node = null, p;   // node指向入队结点, p指向入队前的队尾结点

    /**
     * 自旋入队操作
     * 如果写锁未被占用, 则立即尝试获取读锁, 获取成功则返回.
     * 如果写锁被占用, 则将当前读线程包装成结点, 并插入等待队列(如果队尾是写结点,直接链接到队尾;否则,链接到队尾读结点的栈中)
     */
    for (int spins = -1; ; ) {
        WNode h;
        if ((h = whead) == (p = wtail)) {   // 如果队列为空或只有头结点, 则会立即尝试获取读锁
            for (long m, s, ns; ; ) {
                if ((m = (s = state) & ABITS) < RFULL ?     // 判断写锁是否被占用
                    U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :  //写锁未占用,且读锁数量未超限, 则更新同步状态
                    (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L))        //写锁未占用,但读锁数量超限, 超出部分放到readerOverflow字段中
                    return ns;          // 获取成功后, 直接返回
                else if (m >= WBIT) {   // 写锁被占用,以随机方式探测是否要退出自旋
                    if (spins > 0) {
                        if (LockSupport.nextSecondarySeed() >= 0)
                            --spins;
                    } else {
                        if (spins == 0) {
                            WNode nh = whead, np = wtail;
                            if ((nh == h && np == p) || (h = nh) != (p = np))
                                break;
                        }
                        spins = SPINS;
                    }
                }
            }
        }
        if (p == null) {                            // p == null表示队列为空, 则初始化队列(构造头结点)
            WNode hd = new WNode(WMODE, null);
            if (U.compareAndSwapObject(this, WHEAD, null, hd))
                wtail = hd;
        } else if (node == null) {                  // 将当前线程包装成读结点
            node = new WNode(RMODE, p);
        } else if (h == p || p.mode != RMODE) {     // 如果队列只有一个头结点, 或队尾结点不是读结点, 则直接将结点链接到队尾, 链接完成后退出自旋
            if (node.prev != p)
                node.prev = p;
            else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
                p.next = node;
                break;
            }
        }
        // 队列不为空, 且队尾是读结点, 则将添加当前结点链接到队尾结点的cowait链中(实际上构成一个栈, p是栈顶指针 )
        else if (!U.compareAndSwapObject(p, WCOWAIT, node.cowait = p.cowait, node)) {    // CAS操作队尾结点p的cowait字段,实际上就是头插法插入结点
            node.cowait = null;
        } else {
            for (; ; ) {
                WNode pp, c;
                Thread w;
                // 尝试唤醒头结点的cowait中的第一个元素, 假如是读锁会通过循环释放cowait链
                if ((h = whead) != null && (c = h.cowait) != null &&
                    U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                    (w = c.thread) != null) // help release
                    U.unpark(w);
                if (h == (pp = p.prev) || h == p || pp == null) {
                    long m, s, ns;
                    do {
                        if ((m = (s = state) & ABITS) < RFULL ?
                            U.compareAndSwapLong(this, STATE, s,
                                ns = s + RUNIT) :
                            (m < WBIT &&
                                (ns = tryIncReaderOverflow(s)) != 0L))
                            return ns;
                    } while (m < WBIT);
                }
                if (whead == h && p.prev == pp) {
                    long time;
                    if (pp == null || h == p || p.status > 0) {
                        node = null; // throw away
                        break;
                    }
                    if (deadline == 0L)
                        time = 0L;
                    else if ((time = deadline - System.nanoTime()) <= 0L)
                        return cancelWaiter(node, p, false);
                    Thread wt = Thread.currentThread();
                    U.putObject(wt, PARKBLOCKER, this);
                    node.thread = wt;
                    if ((h != pp || (state & ABITS) == WBIT) && whead == h && p.prev == pp) {
                        // 写锁被占用, 且当前结点不是队首结点, 则阻塞当前线程
                        U.park(false, time);
                    }
                    node.thread = null;
                    U.putObject(wt, PARKBLOCKER, null);
                    if (interruptible && Thread.interrupted())
                        return cancelWaiter(node, p, true);
                }
            }
        }
    }

    for (int spins = -1; ; ) {
        WNode h, np, pp;
        int ps;
        if ((h = whead) == p) {     // 如果当前线程是队首结点, 则尝试获取读锁
            if (spins < 0)
                spins = HEAD_SPINS;
            else if (spins < MAX_HEAD_SPINS)
                spins <<= 1;
            for (int k = spins; ; ) { // spin at head
                long m, s, ns;
                if ((m = (s = state) & ABITS) < RFULL ?     // 判断写锁是否被占用
                    U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :  //写锁未占用,且读锁数量未超限, 则更新同步状态
                    (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) {      //写锁未占用,但读锁数量超限, 超出部分放到readerOverflow字段中
                    // 获取读锁成功, 释放cowait链中的所有读结点
                    WNode c;
                    Thread w;

                    // 释放头结点, 当前队首结点成为新的头结点
                    whead = node;
                    node.prev = null;

                    // 从栈顶开始(node.cowait指向的结点), 依次唤醒所有读结点, 最终node.cowait==null, node成为新的头结点
                    while ((c = node.cowait) != null) {
                        if (U.compareAndSwapObject(node, WCOWAIT, c, c.cowait) && (w = c.thread) != null)
                            U.unpark(w);
                    }
                    return ns;
                } else if (m >= WBIT &&
                    LockSupport.nextSecondarySeed() >= 0 && --k <= 0)
                    break;
            }
        } else if (h != null) {     // 如果头结点存在cowait链, 则唤醒链中所有读线程
            WNode c;
            Thread w;
            while ((c = h.cowait) != null) {
                if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                    (w = c.thread) != null)
                    U.unpark(w);
            }
        }
        if (whead == h) {
            if ((np = node.prev) != p) {
                if (np != null)
                    (p = np).next = node;   // stale
            } else if ((ps = p.status) == 0)        // 将前驱结点的等待状态置为WAITING, 表示之后将唤醒当前结点
                U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
            else if (ps == CANCELLED) {
                if ((pp = p.prev) != null) {
                    node.prev = pp;
                    pp.next = node;
                }
            } else {        // 阻塞当前读线程
                long time;
                if (deadline == 0L)
                    time = 0L;
                else if ((time = deadline - System.nanoTime()) <= 0L)   //限时等待超时, 取消等待
                    return cancelWaiter(node, node, false);

                Thread wt = Thread.currentThread();
                U.putObject(wt, PARKBLOCKER, this);
                node.thread = wt;
                if (p.status < 0 && (p != h || (state & ABITS) == WBIT) && whead == h && node.prev == p) {
                    // 如果前驱的等待状态为WAITING, 且写锁被占用, 则阻塞当前调用线程
                    U.park(false, time);
                }
                node.thread = null;
                U.putObject(wt, PARKBLOCKER, null);
                if (interruptible && Thread.interrupted())
                    return cancelWaiter(node, node, true);
            }
        }
    }
}

我们来分析下这个方法。
该方法会首先自旋的尝试获取读锁,获取成功后,就直接返回;否则,会将当前线程包装成一个读结点,插入到等待队列。
由于,目前等待队列还是空,所以ThreadB会初始化队列,然后将自身包装成一个读结点,插入队尾,然后在下面这个地方跳出自旋:
clipboard.png

此时,等待队列的结构如下:
clipboard.png

跳出自旋后,ThreadB会继续向下执行,进入下一个自旋,在下一个自旋中,依然会再次尝试获取读锁,如果这次再获取不到,就会将前驱的等待状态置为WAITING, 表示我(当前线程)要去睡了(阻塞),到时记得叫醒我:
clipboard.png

clipboard.png

最终, ThreadB进入阻塞状态:
clipboard.png

最终,等待队列的结构如下:

clipboard.png

4. ThreadC调用readLock获取读锁

这个过程和ThreadB获取读锁一样,区别在于ThreadC被包装成结点加入等待队列后,是链接到ThreadB结点的栈指针中的。调用完下面这段代码后,ThreadC会链接到以Thread B为栈顶指针的栈中:
clipboard.png

clipboard.png

注意:读结点的cowait字段其实构成了一个栈,入栈的过程其实是个“头插法”插入单链表的过程。比如,再来个ThreadX读结点,则cowait链表结构为:ThreadB - > ThreadX -> ThreadC。最终唤醒读结点时,将从栈顶开始。

然后会在下一次自旋中,阻塞当前读线程:
clipboard.png

最终,等待队列的结构如下:
clipboard.png

可以看到,此时ThreadC结点并没有把它的前驱的等待状态置为-1,因为ThreadC是链接到栈中的,当写锁释放的时候,会从栈底元素开始,唤醒栈中所有读结点。

5. ThreadD调用writeLock获取写锁

ThreadD调用writeLock方法获取写锁失败后(ThreadA依然占用着写锁),会调用acquireWrite方法,该方法整体逻辑和acquireRead差不多,首先自旋的尝试获取写锁,获取成功后,就直接返回;否则,会将当前线程包装成一个写结点,插入到等待队列。

clipboard.png

acquireWrite源码:

/**
 * 尝试自旋的获取写锁, 获取不到则阻塞线程
 *
 * @param interruptible true 表示检测中断, 如果线程被中断过, 则最终返回INTERRUPTED
 * @param deadline      如果非0, 则表示限时获取
 * @return 非0表示获取成功, INTERRUPTED表示中途被中断过
 */
private long acquireWrite(boolean interruptible, long deadline) {
    WNode node = null, p;

    /**
     * 自旋入队操作
     * 如果没有任何锁被占用, 则立即尝试获取写锁, 获取成功则返回.
     * 如果存在锁被使用, 则将当前线程包装成独占结点, 并插入等待队列尾部
     */
    for (int spins = -1; ; ) {
        long m, s, ns;
        if ((m = (s = state) & ABITS) == 0L) {      // 没有任何锁被占用
            if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT))    // 尝试立即获取写锁
                return ns;                                                 // 获取成功直接返回
        } else if (spins < 0)
            spins = (m == WBIT && wtail == whead) ? SPINS : 0;
        else if (spins > 0) {
            if (LockSupport.nextSecondarySeed() >= 0)
                --spins;
        } else if ((p = wtail) == null) {       // 队列为空, 则初始化队列, 构造队列的头结点
            WNode hd = new WNode(WMODE, null);
            if (U.compareAndSwapObject(this, WHEAD, null, hd))
                wtail = hd;
        } else if (node == null)               // 将当前线程包装成写结点
            node = new WNode(WMODE, p);
        else if (node.prev != p)
            node.prev = p;
        else if (U.compareAndSwapObject(this, WTAIL, p, node)) {    // 链接结点至队尾
            p.next = node;
            break;
        }
    }

    for (int spins = -1; ; ) {
        WNode h, np, pp;
        int ps;
        if ((h = whead) == p) {     // 如果当前结点是队首结点, 则立即尝试获取写锁
            if (spins < 0)
                spins = HEAD_SPINS;
            else if (spins < MAX_HEAD_SPINS)
                spins <<= 1;
            for (int k = spins; ; ) { // spin at head
                long s, ns;
                if (((s = state) & ABITS) == 0L) {      // 写锁未被占用
                    if (U.compareAndSwapLong(this, STATE, s,
                        ns = s + WBIT)) {               // CAS修改State: 占用写锁
                        // 将队首结点从队列移除
                        whead = node;
                        node.prev = null;
                        return ns;
                    }
                } else if (LockSupport.nextSecondarySeed() >= 0 &&
                    --k <= 0)
                    break;
            }
        } else if (h != null) {  // 唤醒头结点的栈中的所有读线程
            WNode c;
            Thread w;
            while ((c = h.cowait) != null) {
                if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) && (w = c.thread) != null)
                    U.unpark(w);
            }
        }
        if (whead == h) {
            if ((np = node.prev) != p) {
                if (np != null)
                    (p = np).next = node;   // stale
            } else if ((ps = p.status) == 0)        // 将当前结点的前驱置为WAITING, 表示当前结点会进入阻塞, 前驱将来需要唤醒我
                U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
            else if (ps == CANCELLED) {
                if ((pp = p.prev) != null) {
                    node.prev = pp;
                    pp.next = node;
                }
            } else {        // 阻塞当前调用线程
                long time;  // 0 argument to park means no timeout
                if (deadline == 0L)
                    time = 0L;
                else if ((time = deadline - System.nanoTime()) <= 0L)
                    return cancelWaiter(node, node, false);
                Thread wt = Thread.currentThread();
                U.putObject(wt, PARKBLOCKER, this);
                node.thread = wt;
                if (p.status < 0 && (p != h || (state & ABITS) != 0L) && whead == h && node.prev == p)
                    U.park(false, time);    // emulate LockSupport.park
                node.thread = null;
                U.putObject(wt, PARKBLOCKER, null);
                if (interruptible && Thread.interrupted())
                    return cancelWaiter(node, node, true);
            }
        }
    }
}

acquireWrite中的下面这个自旋操作,用于将线程包装成写结点,插入队尾:
clipboard.png

插入完成后,队列结构如下:
clipboard.png

然后,进入下一个自旋,并在下一个自旋中阻塞ThreadD,最终队列结构如下:
clipboard.png

6. ThreadE调用readLock获取读锁

同样,由于写锁被ThreadA占用着,所以最终会调用acquireRead方法,在该方法的第一个自旋中,会将ThreadE加入等待队列:
clipboard.png

注意,由于队尾结点是写结点,所以当前读结点会直接链接到队尾;如果队尾是读结点,则会链接到队尾读结点的cowait链中。

然后进入第二个自旋,阻塞ThreadE,最终队列结构如下:
clipboard.png

7. ThreadA调用unlockWrite释放写锁

通过CAS操作,修改State成功后,会调用release方法唤醒等待队列的队首结点:
clipboard.png

release方法非常简单,先将头结点的等待状态置为0,表示即将唤醒后继结点,然后立即唤醒队首结点:
clipboard.png

此时,等待队列的结构如下:
clipboard.png

8. ThreadB被唤醒后继续向下执行

ThreadB被唤醒后,会从原阻塞处继续向下执行,然后开始下一次自旋:
clipboard.png

第二次自旋时,ThreadB发现写锁未被占用,则成功获取到读锁,然后从栈顶(ThreadB的cowait指针指向的结点)开始唤醒栈中所有线程,
最后返回:
clipboard.png

最终,等待队列的结构如下:
clipboard.png

9. ThreadC被唤醒后继续向下执行

ThreadC被唤醒后,继续执行,并进入下一次自旋,下一次自旋时,会成功获取到读锁。
clipboard.png

注意,此时ThreadB和ThreadC已经拿到了读锁,ThreadD(写线程)和ThreadE(读线程)依然阻塞中,原来ThreadC对应的结点是个孤立结点,会被GC回收。

最终,等待队列的结构如下:
clipboard.png

10. ThreadB和ThreadC释放读锁

ThreadB和ThreadC调用unlockRead方法释放读锁,CAS操作State将读锁数量减1:
clipboard.png

注意,当读锁的数量变为0时才会调用release方法,唤醒队首结点:
clipboard.png

队首结点(ThreadD写结点被唤醒),最终等待队列的结构如下:
clipboard.png

11. ThreadD被唤醒后继续向下执行

ThreadD会从原阻塞处继续向下执行,并在下一次自旋中获取到写锁,然后返回:
clipboard.png

最终,等待队列的结构如下:
clipboard.png

12. ThreadD调用unlockWrite释放写锁

ThreadD释放写锁的过程和步骤7完全相同,会调用unlockWrite唤醒队首结点(ThreadE)。

clipboard.png

ThreadE被唤醒后会从原阻塞处继续向下执行,但由于ThreadE是个读结点,所以同时会唤醒cowait栈中的所有读结点,过程和步骤8完全一样。最终,等待队列的结构如下:
clipboard.png

至此,全部执行完成。

四、StampedLock类/方法声明

参考Oracle官方文档:https://docs.oracle.com/javas...
类声明:
clipboard.png

方法声明:
clipboard.png

五、StampedLock总结

StampedLock的等待队列与RRW的CLH队列相比,有以下特点:

  1. 当入队一个线程时,如果队尾是读结点,不会直接链接到队尾,而是链接到该读结点的cowait链中,cowait链本质是一个栈;
  2. 当入队一个线程时,如果队尾是写结点,则直接链接到队尾;
  3. 唤醒线程的规则和AQS类似,都是首先唤醒队首结点。区别是StampedLock中,当唤醒的结点是读结点时,会唤醒该读结点的cowait链中的所有读结点(顺序和入栈顺序相反,也就是后进先出)。

另外,StampedLock使用时要特别小心,避免锁重入的操作,在使用乐观读锁时也需要遵循相应的调用模板,防止出现数据不一致的问题。

查看原文

日拱一兵 关注了用户 · 2020-06-26

芒果果 @wangying_5ea4fb9de961c

一路走走看看,顺便留下点什么。

关注 43

日拱一兵 赞了文章 · 2020-06-26

思否有约丨于超:在自己的舒适区边缘疯狂试探

于超.png

本期对话嘉宾:@于超(日拱一兵
访谈编辑:芒果果

「拧巴」是大多数年轻人的生活状态,既想跳出工作、学业、家庭的限制,又无法摆脱这些东西带给自己的安全感。于是他们开始在不断的自我纠结中矛盾的生活,无法调节自己的人开始反抗全世界,找到平衡的人则在其中找到了进步的动力。

在外企担任 team lead 职位的于超也是个「矛盾体」,面对变化常常带着既期待又害怕的情绪。就像他明明喜欢极限运动,却一致没胆量尝试,总在自己的舒适区边缘疯狂试探。用他自己的话来说就是“有时候内心有一种说不出来的感觉,就是想逼自己尝试点新玩意。”

更可怕的是,于超还是个典型的处女座,他做事一向有条理有规划,总觉得自己不够好,每天都会提前半小时到公司,至今还在坚持每天至少学 30 分钟英语。看到如此自律的生活,让小编不禁甩动起了咸鱼的尾巴。


360° 矛盾体,一个不威武的内蒙汉子、渴望经济学的程序员、想去摆摊儿的精英

Q:尝试用两种方式介绍自己

第一种:来自内蒙古不懂蒙语,不吃羊肉,酒精严重过敏,也不威武雄壮,更不会套马的汉子。

第二种:处女座,这个是不是能说明一切了? (感觉人生无比艰难)

Q:什么时候开始接触编程的,契机是什么?

一切都是天意,本科被调配到计算机专业,大一才知道那个现已植入脑海的 hello world,从此开始了编程之旅。

Q:大一被调剂到计算机专业其实是个意外,那你最初的梦想是什么呢?

当初的爱好不是很明确,也没有十分准确的梦想,不过内心渴望的是经济学相关的东西。

Q:刚开始接触计算机有没有觉得自己不合适,想过放弃吗?

非常强烈的不适应,我算是上大学才碰电脑,想过放弃,比如去开个小吃店等天马行空的想法,现在都有摆个臭豆腐摊位的冲动, 哈哈。


渴望新鲜感,却又不喜欢未知的恐惧,每件事都要做好规划有条不紊的推进。

Q:分享一下你的工作流,有什么个人的特别的工作习惯么?

那就说每天吧,因为整体工作流程都差不多一样,我每天会提前半个小时左右到公司,查看邮件,写当天 todo list 以及查阅昨天 todo list 安排整体计划,然后就是stand up meeting 和满满一天的工作,下班回来写博客,逢周三四六日,会跟着keep 做45分钟徒手训练。英语能满足日常工作,但是还不够好,所以每天也有至少30分钟的学习英语时间。

于超工作台.png

Q:每天提前到公司安排计划,这是后天养成的习惯还是性格使然?

我觉得二者都有,一般有约的事情我都会提前到,晚到我会觉得不好意思,去公司上班也算是和公司的约定吧。另外早去一会不用匆忙,步伐不紧不慢,节奏会比较稳,我不喜欢慌乱,喜欢有条理有步骤地安排接下来要发生的事情,减少点对未知的恐惧。

Q:坚持keep和学英语多久了?

keep 前后大概快3年,中途腰受伤休息了一阵子,最新的坚持周期是从春节到现在,疫情期间没胖,反而还瘦了一些。

英语一直有在学,毕竟程序猿要看官方文档的。

Q:英语对工作来说很重要么?有什么学英语的好方法?

我在一个外企工作,所以英语非常非常重要,关于学习英语我也尝试过几种方法,后来觉得选定一个方法就是坚持吧,人都有遗忘曲线,多学多练,忘了再学再练,(感觉是废话),另外 B 站 很多 up 主提供的资料非常丰富,总是不缺少学习资料,缺少行动吧。


通过输出倒逼自己输入才能不断成长

Q:掌握的技术栈?目前主要使用的编程语言是?

主要Java / Spring 技术栈,主要语言是 Java 和 Node.js。

Q:工作中最常使用的几个工具是什么?好用的插件推荐?

主要是用 IDEA, VSCode, DataGrip 这三个工具, IDEA 中好用的插件太多了,我还特意在咱们思否上发表过一篇文章【我在 IDEA 中必有得插件和配置

Q:最近有没有尝试新的编程语言?一般通过什么方式和渠道提升自己的能力?

最近没学习什么新语言,主要在搞 AWS 各种服务。自我提升,比如看官博/论坛,逛同性社区交友网站github,另外我个人也写博客和维护公众号,通过写作输出倒逼自己持续输入,不断追问自己为什么,反而对问题理解的更加深入,我信奉的两句话之一是:If you can NOT explain it simply, you do NOT understand it well enough。

Q:疫情期间工作有什么影响?是怎么应对的?

由于疫情要远程在家办公,沟通与协作有一定的成本,解决方式就是大家彼此都更主动一点,有问题赶紧抛出来,其实大家有一定主观能动性就会好很多。

Q:目前为止最满意的开发项目是什么?

说满意还真没有,我是那个可怕星座的人,哈哈,因为项目中总是有各种问题或者没办法改变的东西。不过我挺享受现在的项目,所有项目都在陆续搬到 AWS 上,慢慢探索上面200多个服务,总能发现惊喜,另外也将好多服务serverless 化,算是在接触新东西,所以很满意。

Q:工作之后有哪个瞬间让你觉得很有成就感?又有哪个瞬间让你“怀疑人生”?

这个瞬间有很多,比如获得读者朋友的肯定和鼓励,比如参加公司内部竞赛和团队一起拿第一,关键是拿到bonus。我认为,每天都需要有成就感,即使改掉一个bug也是成就,程序员需要各种小的成就来自我驱动。

怀疑人生的太多了,甚至会觉得自己不适合做程序猿。有时候会一叶障目,这时候建议先停止集中思考,容易掉进死胡同,面对生活的窘境,扇自己两巴掌告诉自己现实 (开个玩笑)。

Q:如果可以重新选择是否还会选择这个职业?

我觉得可能不会,我可能会选择当大夫,哪有那么多如果呢,既然干了程序员,那就好好干,也算是干一行爱一行吧。


动静皆宜,内心既平和又不服输

Q:与思否的故事?

2019 年 6 月 17 日加入 SF,相见恨晚。我是陆续在写东西,然后通过 openwrite 就加入了 SF,S F的界面风格很简约,不会有“密集恐惧症”,另外我觉得 SF 是非常注重用户反馈,“接地气”的社区,这里的氛围非常棒。

Q:对社区有哪些建议和意见?

就是不忘初心吧,见到过一些社区慢慢偏离了轨道。

Q:生活中有什么爱好?

疫情之前,每周至少1~2场羽毛球,偶尔拿出落灰的尤克里里和空灵鼓简单摆弄一会,没有艺术细菌是硬伤。

Q:看到您有跳伞的照片,是喜欢极限运动么?

我非常喜欢运动,也喜欢极限运动项目,但我自己没胆量尝试极限运动,我非常恐高。有时候内心有一种说不出来的感觉,就是想逼自己尝试点新玩意。

Q:有什么释放工作压力的方式?

动则打羽毛球,出一身汗的感觉非常舒服;静则去海边听海浪的声音,放空自己,总之是找人少的地方,我不喜欢嘈杂的环境,总体来说我喜欢安静自然的地方。

Q:可以用几个词或一句话评价一下自己么?

不服输,内心渴望美好,不够果断,胆子小,在舒适区边缘疯狂试探。

对编程初学者和怀抱梦想的年轻人有什么建议?

纸上得来终觉浅,千万别眼高手低,多多实践练习,工作的前几年真的非常重要,学再多内容都不过分,充分利用自己的时间。另外有梦想也要敢于尝试,心有多大,舞台就有多大,有机会还是要出去转转,多感受不同文化带来的观念冲击,才能有更透彻的思考。


小编有话说:

跟于超的交流中,我一直有种被追着走的感觉,从开始向他发出访谈邀约后,他就和我商定了回复的时间,毕竟工作也很忙,需要安排时间进行。

这期间我一次都没有催过他,但还是准时得到了他的回复,这在我进行的各种类型的线上采访中可以算是个例了。这么准时的访谈对象可能是每个记者都想拥有的吧~

深入交流后,这种感受就更深了,但这并不妨碍他做些计划之外的事情,也算是一种生活情趣吧。但对这种每天提前到公司做一天规划,每天坚持锻炼身体,不间断学习英语的毅力,我向来只有一种回复:我可做不到。

不过看到于超井井有条又不乏刺激的生活后,竟然让我有了做个 todo list 的冲动。如果你也有同感,那也不妨试试,生活嘛,还是要有些改变。


思否有约.png

欢迎有兴趣参与访谈的小伙伴踊跃报名,《思否有约》将把你与编程有关的故事记录下来。报名邮箱:mango@sifou.com

查看原文

赞 3 收藏 0 评论 0

日拱一兵 发布了文章 · 2020-06-24

搞定ReentrantReadWriteLock 几道小小数学题就够了

前言

有了以上两篇文章的铺垫,来理解本文要介绍的既有独占式,又有共享式获取同步状态的 ReadWriteLock,就非常轻松了

ReadWriteLock

ReadWriteLock 直译过来为【读写锁】。现实中,读多写少的业务场景是非常普遍的,比如应用缓存

一个线程将数据写入缓存,其他线程可以直接读取缓存中的数据,提高数据查询效率

之前提到的互斥锁都是排他锁,也就是说同一时刻只允许一个线程进行访问,当面对可共享读的业务场景,互斥锁显然是比较低效的一种处理方式。为了提高效率,读写锁模型就诞生了

效率提升是一方面,但并发编程更重要的是在保证准确性的前提下提高效率

一个写线程改变了缓存中的值,其他读线程一定是可以 “感知” 到的,否则可能导致查询到的值不准确

所以关于读写锁模型就了下面这 3 条规定:

  1. 允许多个线程同时读共享变量
  2. 只允许一个线程写共享变量
  3. 如果写线程正在执行写操作,此时则禁止其他读线程读共享变量

ReadWriteLock 是一个接口,其内部只有两个方法:

public interface ReadWriteLock {
    // 返回用于读的锁
    Lock readLock();

    // 返回用于写的锁
    Lock writeLock();
}

所以要了解整个读/写锁的整个应用过程,需要从它的实现类 ReentrantReadWriteLock 说起

ReentrantReadWriteLock 类结构

直接对比ReentrantReadWriteLock 与 ReentrantLock的类结构

他们又很相似吧,根据类名称以及类结构,按照咱们前序文章的分析,你也就能看出 ReentrantReadWriteLock 的基本特性:

其中黄颜色标记的的 锁降级 是看不出来的, 这里先有个印象,下面会单独说明

另外,不知道你是否还记得,Java AQS队列同步器以及ReentrantLock的应用 说过,Lock 和 AQS 同步器是一种组合形式的存在,既然这里是读/写两种锁,他们的组合模式也就分成了两种:

  1. 读锁与自定义同步器的聚合
  2. 写锁与自定义同步器的聚合
    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }

这里只是提醒大家,模式没有变,不要被读/写两种锁迷惑

基本示例

说了这么多,如果你忘了前序知识,整体理解感觉应该是有断档的,所以先来看个示例(模拟使用缓存)让大家对 ReentrantReadWriteLock 有个直观的使用印象

public class ReentrantReadWriteLockCache {

    // 定义一个非线程安全的 HashMap 用于缓存对象
    static Map<String, Object> map = new HashMap<String, Object>();
    // 创建读写锁对象
    static ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    // 构建读锁
    static Lock rl = readWriteLock.readLock();
    // 构建写锁
    static Lock wl = readWriteLock.writeLock();

    public static final Object get(String key) {
        rl.lock();
        try{
            return map.get(key);
        }finally {
            rl.unlock();
        }
    }

    public static final Object put(String key, Object value){
        wl.lock();
        try{
            return map.put(key, value);
        }finally {
            wl.unlock();
        }
    }
}

你瞧,使用就是这么简单。但是你知道的,AQS 的核心是锁的实现,即控制同步状态 state 的值,ReentrantReadWriteLock 也是应用AQS的 state 来控制同步状态的,那么问题来了:

一个 int 类型的 state 怎么既控制读的同步状态,又可以控制写的同步状态呢?

显然需要一点设计了

读写状态设计

如果要在一个 int 类型变量上维护多个状态,那肯定就需要拆分了。我们知道 int 类型数据占32位,所以我们就有机会按位切割使用state了。我们将其切割成两部分:

  1. 高16位表示读
  2. 低16位表示写

所以,要想准确的计算读/写各自的状态值,肯定就要应用位运算了,下面代码是 JDK1.8,ReentrantReadWriteLock 自定义同步器 Sync 的位操作

abstract static class Sync extends AbstractQueuedSynchronizer {
       

        static final int SHARED_SHIFT   = 16;
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;


        static int sharedCount(int c) { 
          return c >>> SHARED_SHIFT; 
        }

        static int exclusiveCount(int c) { 
          return c & EXCLUSIVE_MASK; 
        }
}

乍一看真是有些复杂的可怕,别慌,咱们通过几道小小数学题就可以搞定整个位运算过程

整个 ReentrantReadWriteLock 中 读/写状态的计算就是反复应用这几道数学题,所以,在阅读下面内容之前,希望你搞懂这简单的运算

基础铺垫足够了,我们进入源码分析吧

源码分析

写锁分析

由于写锁是排他的,所以肯定是要重写 AQS 中 tryAcquire 方法

        protected final boolean tryAcquire(int acquires) {        
            Thread current = Thread.currentThread();
              // 获取 state 整体的值
            int c = getState();
            // 获取写状态的值
            int w = exclusiveCount(c);
            if (c != 0) {
                // w=0: 根据推理二,整体状态不等于零,写状态等于零,所以,读状态大于0,即存在读锁
                  // 或者当前线程不是已获取写锁的线程
                  // 二者之一条件成真,则获取写状态失败
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // 根据推理一第 1 条,更新写状态值
                setState(c + acquires);
                return true;
            }
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        }

上述代码 第 19 行 writerShouldBlock 也并没有什么神秘的,只不过是公平/非公平获取锁方式的判断(是否有前驱节点来判断)

你瞧,写锁获取方式就是这么简单

读锁分析

由于读锁是共享式的,所以肯定是要重写 AQS 中 tryAcquireShared 方法

        protected final int tryAcquireShared(int unused) {
            Thread current = Thread.currentThread();
            int c = getState();
              // 写状态不等于0,并且锁的持有者不是当前线程,根据约定 3,则获取读锁失败
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
              // 获取读状态值
            int r = sharedCount(c);
              // 这个地方有点不一样,我们单独说明
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
              // 如果获取读锁失败则进入自旋获取
            return fullTryAcquireShared(current);
        }

readerShouldBlockwriterShouldBlock 在公平锁的实现上都是判断是否有前驱节点,但是在非公平锁的实现上,前者是这样的:

final boolean readerShouldBlock() {
    return apparentlyFirstQueuedIsExclusive();
}

final boolean apparentlyFirstQueuedIsExclusive() {
  Node h, s;
  return (h = head) != null &&
    // 等待队列头节点的下一个节点
    (s = h.next)  != null &&
    // 如果是排他式的节点
    !s.isShared()         &&
    s.thread != null;
}

简单来说,如果请求读锁的当前线程发现同步队列的 head 节点的下一个节点为排他式节点,那么就说明有一个线程在等待获取写锁(争抢写锁失败,被放入到同步队列中),那么请求读锁的线程就要阻塞,毕竟读多写少,如果还没有这点判断机制,写锁可能会发生【饥饿】

上述条件都满足了,也就会进入 tryAcquireShared 代码的第 14 行到第 25 行,这段代码主要是为了记录线程持有锁的次数。读锁是共享式的,还想记录每个线程持有读锁的次数,就要用到 ThreadLocal 了,因为这不影响同步状态 state 的值,所以就不分析了, 只把关系放在这吧

到这里读锁的获取也就结束了,比写锁稍稍复杂那么一丢丢,接下来就说明一下那个可能让你迷惑的锁升级/降级问题吧

读写锁的升级与降级

个人理解:读锁是可以被多线程共享的,写锁是单线程独占的,也就是说写锁的并发限制比读锁高,所以

在真正了解读写锁的升级与降级之前,我们需要完善一下本文开头 ReentrantReadWriteLock 的例子

    public static final Object get(String key) {
        Object obj = null;
        rl.lock();
        try{
      // 获取缓存中的值
            obj = map.get(key);
        }finally {
            rl.unlock();
        }
        // 缓存中值不为空,直接返回
        if (obj!= null) {
            return obj;
        }
        
    // 缓存中值为空,则通过写锁查询DB,并将其写入到缓存中
        wl.lock();
        try{
      // 再次尝试获取缓存中的值
            obj = map.get(key);
      // 再次获取缓存中值还是为空
            if (obj == null) {
        // 查询DB
                obj = getDataFromDB(key); // 伪代码:getDataFromDB
        // 将其放入到缓存中
                map.put(key, obj);
            }
        }finally {
            wl.unlock();
        }
        return obj;
    }

有童鞋可能会有疑问

在写锁里面,为什么代码第19行还要再次获取缓存中的值呢?不是多此一举吗?

其实这里再次尝试获取缓存中的值是很有必要的,因为可能存在多个线程同时执行 get 方法,并且参数 key 也是相同的,执行到代码第 16 行 wl.lock() ,比如这样:

线程 A,B,C 同时执行到临界区 wl.lock(), 只有线程 A 获取写锁成功,线程B,C只能阻塞,直到线程A 释放写锁。这时,当线程B 或者 C 再次进入临界区时,线程 A 已经将值更新到缓存中了,所以线程B,C没必要再查询一次DB,而是再次尝试查询缓存中的值

既然再次获取缓存很有必要,我能否在读锁里直接判断,如果缓存中没有值,那就再次获取写锁来查询DB不就可以了嘛,就像这样:

    public static final Object getLockUpgrade(String key) {
        Object obj = null;
        rl.lock();
        try{
            obj = map.get(key);
            if (obj == null){
                wl.lock();
                try{
                    obj = map.get(key);
                    if (obj == null) {
                        obj = getDataFromDB(key); // 伪代码:getDataFromDB
                        map.put(key, obj);
                    }
                }finally {
                    wl.unlock();
                }
            }
        }finally {
            rl.unlock();
        }

        return obj;
    }

这还真是不可以的,因为获取一个写入锁需要先释放所有的读取锁,如果有两个读取锁试图获取写入锁,且都不释放读取锁时,就会发生死锁,所以在这里,锁的升级是不被允许的

读写锁的升级是不可以的,那么锁的降级是可以的嘛?这个是 Oracle 官网关于锁降级的示例 ,我将代码粘贴在此处,大家有兴趣可以点进去连接看更多内容

 class CachedData {
   Object data;
   volatile boolean cacheValid;
   final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

   void processCachedData() {
     rwl.readLock().lock();
     if (!cacheValid) {
        // 必须在获取写锁之前释放读锁,因为锁的升级是不被允许的
        rwl.readLock().unlock();
        rwl.writeLock().lock();
        try {
          // 再次检查,原因可能是其他线程已经更新过缓存
          if (!cacheValid) {
            data = ...
            cacheValid = true;
          }
                    //在释放写锁前,降级为读锁
          rwl.readLock().lock();
        } finally {
          //释放写锁,此时持有读锁
          rwl.writeLock().unlock(); 
        }
     }

     try {
       use(data);
     } finally {
       rwl.readLock().unlock();
     }
   }
 }

代码中声明了一个 volatile 类型的 cacheValid 变量,保证其可见性。

  1. 首先获取读锁,如果cache不可用,则释放读锁
  2. 然后获取写锁
  3. 在更改数据之前,再检查一次cacheValid的值,然后修改数据,将cacheValid置为true
  4. 然后在释放写锁前获取读锁 此时
  5. cache中数据可用,处理cache中数据,最后释放读锁

这个过程就是一个完整的锁降级的过程,目的是保证数据可见性,听起来很有道理的样子,那么问题来了:

上述代码为什么在释放写锁之前要获取读锁呢?

如果当前的线程A在修改完cache中的数据后,没有获取读锁而是直接释放了写锁;假设此时另一个线程B 获取了写锁并修改了数据,那么线程A无法感知到数据已被修改,但线程A还应用了缓存数据,所以就可能出现数据错误

如果遵循锁降级的步骤,线程A 在释放写锁之前获取读锁,那么线程B在获取写锁时将被阻塞,直到线程A完成数据处理过程,释放读锁,从而保证数据的可见性


那问题又来了:

使用写锁一定要降级吗?

如果你理解了上面的问题,相信这个问题已经有了答案。假如线程A修改完数据之后, 经过耗时操作后想要再使用数据时,希望使用的是自己修改后的数据,而不是其他线程修改后的数据,这样的话确实是需要锁降级;如果只是希望最后使用数据的时候,拿到的是最新的数据,而不一定是自己刚修改过的数据,那么先释放写锁,再获取读锁,然后使用数据也无妨

在这里我要额外说明一下你可能存在的误解:

  • 如果已经释放了读锁再获取写锁不叫锁的升级
  • 如果已经释放了写锁在获取读锁也不叫锁的降级

相信你到这里也理解了锁的升级与降级过程,以及他们被允许或被禁止的原因了

总结

本文主要说明了 ReentrantReadWriteLock 是如何应用 state 做位拆分实现读/写两种同步状态的,另外也通过源码分析了读/写锁获取同步状态的过程,最后又了解了读写锁的升级/降级机制,相信到这里你对读写锁已经有了一定的理解。如果你对文中的哪些地方觉得理解有些困难,强烈建议你回看本文开头的两篇文章,那里铺垫了非常多的内容。接下来我们就看看在应用AQS的最后一个并发工具类 CountDownLatch 吧

灵魂追问

  1. 读锁也没修改数据,还允许共享式获取,那还有必要设置读锁吗?
  2. 在分布式环境中,你是如何保证缓存数据一致性的呢?
  3. 当你打开看ReentrantReadWriteLock源码时,你会发现,WriteLock 中可以使用 Condition,但是ReadLock 使用Condition却会抛出UnsupportedOperationException,这是为什么呢?
// WriteLock
public Condition newCondition() {
    return sync.newCondition();
}

// ReadLock
public Condition newCondition() {
    throw new UnsupportedOperationException();
}

参考

  1. Java 并发实战
  2. Java 并发编程的艺术
  3. https://www.jianshu.com/p/586...

日拱一兵 | 原创

查看原文

赞 7 收藏 6 评论 1

日拱一兵 关注了用户 · 2020-06-17

AWS_AI开发者社区 @aws_aidevcommunity

AWS_AI 开发者社区是专注于人工智能领域,开发者交流与互动的平台。在这里,你可以分享和获取一切有关人工智能的相关技术和前沿知识,也可以与同行或爱好者们交流探讨,共同成长。
进入 AWS 开发人员中心:https://amazonaws-china.com/c...,深入了解 AWS 并构建您的首个 Web 应用程序!

关注 4266

日拱一兵 发布了文章 · 2020-06-17

精美图文讲解Java AQS 共享式获取同步状态以及Semaphore的应用

| 好看请赞,养成习惯

  • 你有一个思想,我有一个思想,我们交换后,一个人就有两个思想
  • If you can NOT explain it simply, you do NOT understand it well enough

看到本期内容这么少,是不是心动了呢?

前言

上一篇万字长文 Java AQS队列同步器以及ReentrantLock的应用 为我们读 JUC 源码以及其设计思想做了足够多的铺垫,接下来的内容我将重点说明差异化,如果有些童鞋不是能很好的理解文中的一些内容,强烈建议回看上一篇文章,搞懂基础内容,接下来的阅读真会轻松加愉快


AQS 中我们介绍了独占式获取同步状态的多种情形:

  • 独占式获取锁
  • 可响应中断的独占式获取锁
  • 有超时限制的独占式获取锁

AQS 提供的模版方法里面还差共享式获取同步状态没有介绍,所以我们今天来揭开这个看似神秘的面纱

AQS 中的共享式获取同步状态

独占式是你中没我,我中没你的的一种互斥形式,共享式显然就不是这样了,所以他们的唯一区别就是:

同一时刻能否有多个线程同时获取到同步状态

简单来说,就是这样滴:

我们知道同步状态 state 是维护在 AQS 中的,抛开可重入锁的概念,我在上篇文章中也提到了,独占式和共享式控制同步状态 state 的区别仅仅是这样:

所以说想了解 AQS 的 xxxShared 的模版方法,只需要知道它是怎么控制 state 的就好了

AQS共享式获取同步状态源码分析

为了帮助大家更好的回忆内容,我将上一篇文章的两个关键内容粘贴在此处,帮助大家快速回忆,关于共享式,大家只需要关注【骚紫色】就可以了

自定义同步器需要重写的方法

AQS 提供的模版方法

故事就从这里说起吧 (你会发现和独占式惊人的相似),关键代码都加了注释

    public final void acquireShared(int arg) {
          // 同样调用自定义同步器需要重写的方法,非阻塞式的尝试获取同步状态,如果结果小于零,则获取同步状态失败
        if (tryAcquireShared(arg) < 0)
              // 调用 AQS 提供的模版方法,进入等待队列
            doAcquireShared(arg);
    }

进入 doAcquireShared 方法:

    private void doAcquireShared(int arg) {
          // 创建共享节点「SHARED」,加到等待队列中
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
              // 进入“自旋”,这里并不是纯粹意义上的死循环,在独占式已经说明过
            for (;;) {
                  // 同样尝试获取当前节点的前驱节点
                final Node p = node.predecessor();
                  // 如果前驱节点为头节点,尝试再次获取同步状态
                if (p == head) {
                      // 在此以非阻塞式获取同步状态
                    int r = tryAcquireShared(arg);
                      // 如果返回结果大于等于零,才能跳出外层循环返回
                    if (r >= 0) {
                          // 这里是和独占式的区别
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

上面代码第 18 行我们提到和独占式获取同步状态的区别,贴心的给大家一个更直观的对比:

差别只在这里,所以我们就来看看 setHeadAndPropagate(node, r) 到底干了什么,我之前说过 JDK 源码中的方法命名绝大多数还是非常直观的,该方法直译过来就是 【设置头并且传播/繁衍】。独占式只是设置了头,共享式除了设置头还多了一个传播,你的疑问应该已经来了:

啥是传播,为什么会有传播这个设置呢?

想了解这个问题,你需要先知道非阻塞共享式获取同步状态返回值的含义:

这里说的传播其实说的是 propagate > 0 的情况,道理也很简单,当前线程获取同步状态成功了,还有剩余的同步状态可用于其他线程获取,那就要通知在等待队列的线程,让他们尝试获取剩余的同步状态

如果要让等待队列中的线程获取到通知,需要线程调用 release 方法实现的。接下来,我们走近 setHeadAndPropagate 一探究竟,验证一下

  // 入参,node: 当前节点
    // 入参,propagate:获取同步状态的结果值,即上面方法中的变量 r
    private void setHeadAndPropagate(Node node, int propagate) {
            // 记录旧的头部节点,用于下面的check
        Node h = head; 
            // 将当前节点设置为头节点
        setHead(node);
        
            // 通过 propagate 的值和 waitStatus 的值来判断是否可以调用 doReleaseShared 方法
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
              // 如果后继节点为空或者后继节点为共享类型,则进行唤醒后继节点
                    // 这里后继节点为空意思是只剩下当前头节点了,另外这里的 s == null 也是判断空指针的标准写法
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

上面方法的大方向作用我们了解了,但是代码中何时调用 doReleaseShared 的判断逻辑还是挺让人费解的,为什么会有这么一大堆的判断,我们来逐个分析一下:

这里的空判断有点让人头大,我们先挑出来说明一下:

排除了其他判断条件的干扰,接下来我们就专注分析 propagate 和 waitStatus 两个判断条件就可以了,这里再将 waitStatus 的几种状态展示在这里,帮助大家理解,【骚粉色】是我们一会要用到的:

propagate > 0

上面已经说过了,如果成立,直接短路后续判断,然后根据 doReleaseShared 的判断条件进行释放

propagate > 0 不成立, h.waitStatus < 0 成立 (注意这里的h是旧的头节点)

什么时候 h.waitStatus < 0 呢?抛开 CONDITION 的使用,只剩下 SIGNAL 和 PROPAGATE,想知道这个答案,需要提前看一下 doReleaseShared() 方法了:

    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                      // CAS 将头节点的状态设置为0                
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    // 设置成功后才能跳出循环唤醒头节点的下一个节点
                      unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         // 将头节点状态CAS设置成 PROPAGATE 状态
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

doReleaseShared() 方法中可以看出:

  • 如果让 h.waitStatus < 0 成立,只能将其设置成 PROPAGATE = -3 的情况,设置成功的前提是 h 头节点 expected 的状态是 0;
  • 如果 h.waitStatus = 0,是上述代码第 8 行 CAS 设置成功,然后唤醒等待中的线程

所以猜测,当前线程执行到 h.waitStatus < 0 的判断前,有另外一个线程刚好执行了 doReleaseShared() 方法,将 waitStatus 又设置成PROPAGATE = -3

这个理解有点绕,我们还是来画个图理解一下吧:

可能有同学还是不太能理解这么写的道理,我们一直说 propagate <> = 0 的情况,propagate = 0 代表的是当时/当时/当时 尝试获取同步状态没成功,但是之后可能又有共享状态被释放了,所以上面的逻辑是以防这种万一,你懂的,严谨的并发就是要防止一切万一,现在结合这个情景再来理解上面的判断你是否豁然开朗了呢?

继续向下看,

前序条件不成立,(h = head) == null || h.waitStatus < 0 注意这里的h是新的头节点)

有了上面铺垫,这个就直接画个图就更好理解啦,其实就是没有那么巧有另外一个线程掺合了

相信到这里你应该理解共享式获取同步状态的全部过程了吧,至于非阻塞共享式获取同步状态带有超时时间获取同步状态,结合本文讲的 setHeadAndPropagate 逻辑和独占式获取同步状态的实现过程过程来看,真是一毛一样,这里就不再累述了,赶紧打开你的 IDE 去验证一下吧

我们分析了AQS 的模版方法,还一直没说 tryAcquireShared(arg) 这个方法是如何被重写的,想要了解这个,我们就来看一看共享式获取同步状态的经典应用 Semaphore

Semaphore 的应用及源码分析

Semaphore 概念

Semaphore 中文多翻译为 【信号量】,我还特意查了一下剑桥辞典的英文解释:

其实就是信号标志(two flags),比如红绿灯,每个交通灯产生两种不同行为

  • Flag1-红灯:停车
  • Flag2-绿灯:行车

在 Semaphore 里面,什么时候是红灯,什么时候是绿灯,其实就是靠 tryAcquireShared(arg) 的结果来表示的

  • 获取不到共享状态,即为红灯
  • 获取到共享状态,即为绿灯

所以我们走近 Semaphore ,来看看它到底是怎么应用 AQS 的,又是怎样重写 tryAcquireShared(arg) 方法的

Semaphore 源码分析

先看一下类结构

看到这里你是否有点跌眼镜,和 ReentrantLock 相似的可怕吧,如果你有些陌生,再次强烈建议你回看上一篇文章 Java AQS队列同步器以及ReentrantLock的应用 ,这里直接提速对比看公平和非公平两种重写的 tryAcquireShared(arg) 方法,没有意外,公平与否,就是判断是否有前驱节点

方法内部只是计算 state 的剩余值,那 state 的初始值是多少怎么设置呢?当然也就是构造方法了:

        public Semaphore(int permits) {
          // 默认仍是非公平的同步器,至于为什么默认是非公平的,在上一篇文章中也特意说明过
        sync = new NonfairSync(permits);
    }
    
    NonfairSync(int permits) {
            super(permits);
    }

super 方法,就会将初始值给到 AQS 中的 state

也许你发现了,当我们把 permits 设置为1 的时候,不就是 ReentrantLock 的互斥锁了嘛,说的一点也没错,我们用 Semaphore 也能实现基本互斥锁的效果


static int count;
//初始化信号量
static final Semaphore s 
    = new Semaphore(1);
//用信号量保证互斥    
static void addOne() {
  s.acquire();
  try {
    count+=1;
  } finally {
    s.release();
  }
}

But(英文听力中的重点),Semaphore 肯定不是为这种特例存在的,它是共享式获取同步状态的一种实现。如果使用信号量,我们通常会将 permits 设置成大于1的值,不知道你是否还记得我曾在 为什么要使用线程池? 一文中说到的池化概念,在同一时刻,允许多个线程使用连接池,每个连接被释放之前,不允许其他线程使用。所以说 Semaphore 可以允许多个线程访问一个临界区,最终很好的做到一个限流/限流/限流 的作用

虽然 Semaphore 能很好的提供限流作用,说实话,Semaphore 的限流作用比较单一,我在实际工作中使用 Semaphore 并不是很多,如果真的要用高性能限流器,Guava RateLimiter 是一个非常不错的选择,我们后面会做分析,有兴趣的可以提前了解一下

关于 Semaphore 源码,就这么三下五除二的结束了

总结

不知你有没有感觉到,我们的节奏明显加快了,好多原来分散的点在被疯狂的串联起来,如果按照这个方式来阅读 JUC 源码,相信你也不会一头扎进去迷失方向,然后沮丧的退出 JUC 吧,然后面试背诵答案,然后忘记,然后再背诵?

跟上节奏,关于共享式获取同步状态,Semaphore 只不过是非常经典的应用,ReadWriteLock 和 CountDownLatch 日常应用还是非常广泛的,我们接下来就陆续聊聊它们吧

灵魂追问

  1. Semaphore 的 permits 设置成1 “等同于” 简单的互斥锁实现,那它和 ReentrantLock 的区别还是挺大的,都有哪些区别呢?
  2. 你在项目中是如何使用 Semaphore 的呢?

参考

  1. Java 并发实战
  2. Java 并发编程的艺术
  3. https://blog.csdn.net/anlian5...

日拱一兵 | 原创

查看原文

赞 6 收藏 5 评论 0