入门小站

入门小站 查看完整档案

填写现居城市  |  填写毕业院校  |  填写所在公司/组织填写个人主网站
编辑

rumenz.com

个人动态

入门小站 发布了文章 · 2月9日

jstack处理Java中CPU100%的思路流程

模拟问题代码

构造一个死循环,造成CPU使用率100%。
> vim InfiniteLoop.java
public class InfiniteLoop {

    public static void main(String[] args) {
        Runnable target;
        Thread thread=new Thread(new Runnable() {
            @Override
            public void run() {
                long i=0;
                while (true){
                    i++;
                }
            }
        });
        thread.setName("rumenz");
        thread.start();
    }
}

> javac InfiniteLoop.java

运行问题代码

> java InfiniteLoop

发现系统CPU 100%

> top

6076 root      20   0 7096732  18972  10648 S 100.0  0.1   7:42.51 java InfiniteLoop
得到进程号是6076

根据top命令,发现PID为6076的Java进程占用CPU高达100%,出现故障。

找出具体的线程号

> top -Hp 6076

6096 root      20   0 7096732  18972  10648 R 99.7  0.1   9:09.92 java 
得到线程号是6096

将线程号转换成16进制

> printf "%x\n" 6096
17d0

万事具备,开始使用jstack打印堆栈信息


> jstack 6076 | grep 17d0 -A 30

"rumenz" #10 prio=5 os_prio=0 tid=0x00007fe0580f9000 nid=0x17d0 runnable [0x00007fe04431d000]
   java.lang.Thread.State: RUNNABLE
        at InfiniteLoop$1.run(InfiniteLoop.java:11)
        at java.lang.Thread.run(Thread.java:748)

"Service Thread" #9 daemon prio=9 os_prio=0 tid=0x00007fe0580e5800 nid=0x17ce runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C1 CompilerThread3" #8 daemon prio=9 os_prio=0 tid=0x00007fe0580c8000 nid=0x17cd waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread2" #7 daemon prio=9 os_prio=0 tid=0x00007fe0580c6000 nid=0x17cc waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread1" #6 daemon prio=9 os_prio=0 tid=0x00007fe0580c4000 nid=0x17cb waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread0" #5 daemon prio=9 os_prio=0 tid=0x00007fe0580c1000 nid=0x17ca waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Signal Dispatcher" #4 daemon prio=9 os_prio=0 tid=0x00007fe0580bf800 nid=0x17c9 runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Finalizer" #3 daemon prio=8 os_prio=0 tid=0x00007fe05808e800 nid=0x17c8 in Object.wait() [0x00007fe044b25000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        - waiting on <0x000000076d408ed8> (a java.lang.ref.ReferenceQueue$Lock)
        at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:144)
        - locked <0x000000076d408ed8> (a java.lang.ref.ReferenceQueue$Lock)
        at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:165)
        at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:216)
at InfiniteLoop$1.run(InfiniteLoop.java:11),提示出代码11行, 查看源码发现有一个死循环。

总结解决JAVA,CPU 100%的问题

  • top 查找CPU 100%的进程号pid
  • top -Hp pid找出进程pid下最占CPU的线程号tid
  • printf "%x\n" tidtid转换成十六进制 16tid
  • jstack pid | grep 16tid -A 30打印堆栈信息
  • 处理问题代码

关注微信公众号:【入门小站】,解锁更多知识点。

查看原文

赞 0 收藏 0 评论 0

入门小站 发布了文章 · 2月2日

CMS垃圾收集器停顿案例

CMS垃圾收集器从jdk1.6中开始应用,是一个老年代垃圾收集器,在JVM的发展过程中扮演了重要的历史作用,jdk1.7,jdk1.8中都可以开启使用。在jdk9中已经废弃掉了。

CMS垃圾收集器的重要缺点

由于老年代碎片问题,在YGC的时候会发生晋升失败(promotion failures),即使老年代有足够的空间,但是仍然可能导致分配失败,因为没有足够连续的空间,从而触发Concurrent mode Failure,会发生SWTFullGCFullGC相比于CMS这种并发模式的GC需要更长的停顿时间才能完成垃圾回收工作。这会导致严重的停顿服务不可用问题。concurrent mode failure,需要stop-the-wold 降级为GC-Serail Old)。

CMS为什么会产生碎片

CMS垃圾收集器在回收老年代时,采用的是标记清理(Mark-Sweep)算法,它在垃圾回收时并不会压缩堆,时间久了,导致老年代的碎片化问题越来越严重,直到发生单线程的Mark-Sweep Compact GCFullGC,会完全STW。如果堆比较大并且老年代占的空间比较大,STW的时间会持续几秒,十几秒,几十秒。对于应用程序来说就是长时间的停顿,这对于互联网应用的影响是很大的。

分析CMS日志

启动jvm的时候,增加参数-XX:+PrintGCDetails 和 -XX:+PrintGCTimeStamps可以打印出CMS GC的详细日志。-XX:+PrintHeapAtGC 在进行GC的前后打印出堆的信息,-Xloggc:../logs/gc.log 日志文件的输出路径。
-XX:+PrintGCDetails -XX:+PrintHeapAtGC -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps and -XX:+PrintGCApplicationStoppedTime -XX:PrintFLSStatistics=2
{Heap before GC invocations=7430 (full 24):
parnew generation total 134400K, used 121348K[0x53000000, 0x5c600000, 0x5c600000)
eden space 115200K, 99% used [0x53000000, 0x5a07e738, 0x5a080000)
from space 19200K, 32% used [0x5a080000, 0x5a682cc0, 0x5b340000)
to space 19200K, 0% used [0x5b340000, 0x5b340000, 0x5c600000)
concurrent mark-sweep generation total 2099200K, used 1694466K [0x5c600000, 0xdc800000, 0xdc800000)
concurrent-mark-sweep perm gen total 409600K, used 186942K [0xdc800000, 0xf5800000, 0xfbc00000)
10628.167: [GC Before GC:
Statistics for BinaryTreeDictionary:
------------------------------------
Total Free Space: 103224160
Max Chunk Size: 5486
Number of Blocks: 57345
Av. Block Size: 1800
Tree Height: 36 <---- High fragmentation
Statistics for IndexedFreeLists:
--------------------------------
Total Free Space: 371324
Max Chunk Size: 254
Number of Blocks: 8591 <---- High fragmentation
Av. Block Size: 43
free=103595484
frag=1.0000 <---- High fragmentation
Before GC:
Statistics for BinaryTreeDictionary:
------------------------------------
Total Free Space: 0
Max Chunk Size: 0
Number of Blocks: 0
Tree Height: 0
Statistics for IndexedFreeLists:
--------------------------------
Total Free Space: 0
Max Chunk Size: 0
Number of Blocks: 0
free=0 frag=0.0000
10628.168: [ParNew (promotion failed) Desired survivor size 9830400 bytes, new threshold 1 (max 1)
- age 1: 4770440 bytes, 4770440 total: 121348K->122157K(134400K), 0.4263254secs]
10628,594: [CMS10630.887: [CMS-concurrent-mark: 7.286/8.682 secs] [Times: user=14.81, sys=0.34, real=8.68 secs]
(concurrent mode failure):1698044K->625427K(2099200K), 17.1365396 secs]
1815815K->625427K(2233600K), [CMS Perm : 186942K->180711K(409600K)]
 
After GC:
Statistics for BinaryTreeDictionary:
------------------------------------
Total Free Space: 377269492
Max Chunk Size:
377269492
Number of Blocks: 1 <---- No fragmentation
Av. Block Size: 377269492
Tree Height: 1 <---- No fragmentation
Statistics for IndexedFreeLists:
--------------------------------
Total Free Space: 0
Max Chunk Size: 0
Number of Blocks: 0
free=377269492
frag=0.0000 <---- No fragmentation
After GC:
Statistics for BinaryTreeDictionary:
------------------------------------
Total Free Space: 0
Max Chunk Size: 0
Number of Blocks: 0
Tree Height: 0
Statistics for IndexedFreeLists:
--------------------------------
Total Free Space: 0
Max Chunk Size: 0
Number of Blocks: 0
free=0 frag=0.0000
, 17.5645589 secs] [Times: user=17.82 sys=0.06, real=17.57 secs]
Heap after GC invocations=7431 (full 25):
parnew generation total 134400K, used 0K [0x53000000, 0x5c600000, 0x5c600000)
eden space 115200K, 0% used [0x53000000, 0x53000000, 0x5a080000)
from space 19200K, 0% used [0x5b340000, 0x5b340000, 0x5c600000)
to space 19200K, 0% used [0x5a080000, 0x5a080000, 0x5b340000)
concurrent mark-sweep generation total 2099200K, used 625427K [0x5c600000, 0xdc800000, 0xdc800000)
concurrent-mark-sweep perm gen total 409600K, used 180711K [0xdc800000, 0xf5800000, 0xfbc00000)
}
Total time for which application threads were stopped: 17.5730653 seconds
由于碎片率非常高,从而导致promotion failure,然后发生concurrent mode failure,触发的FullGC总计花了17.1365396秒才完成。

操作系统内存不够,使用了swap,导致CMS长时间停顿

操作系统使用了swap,可能导致GC停顿时间更长,这些停顿可能是几秒,甚至几十秒级别。

系统配置了允许使用swap空间,操作系统可能把JVM进程的非活动内存页移到swap空间,从而释放内存给当前活动进程(可能是操作系统上其他进程,取决于系统调度)。Swapping由于需要访问磁盘,所以相比物理内存,它的速度慢的令人发指。所以,如果在GC的时候,系统正好需要执行Swapping

{Heap before GC invocations=132 (full 0):
par new generation total 2696384K, used 2696384K [0xfffffffc20010000, 0xfffffffce0010000, 0xfffffffce0010000)
eden space 2247040K, 100% used [0xfffffffc20010000, 0xfffffffca9270000, 0xfffffffca9270000)
from space 449344K, 100% used [0xfffffffca9270000, 0xfffffffcc4940000, 0xfffffffcc4940000)
to space 449344K, 0% used [0xfffffffcc4940000, 0xfffffffcc4940000, 0xfffffffce0010000)
concurrent mark-sweep generation total 9437184K, used 1860619K [0xfffffffce0010000, 0xffffffff20010000, 0xffffffff20010000)
concurrent-mark-sweep perm gen total 1310720K, used 511451K [0xffffffff20010000, 0xffffffff70010000, 0xffffffff70010000)
2020-07-17T03:58:06.601-0700: 51522.120: [GC Before GC: :2696384K->449344K(2696384K), 29.4779282 secs] 4557003K->2326821K(12133568K) ,29.4795222 secs] [Times: user=915.56, sys=6.35, real=29.48 secs]
最后一行[Times: user=915.56, sys=6.35, real=29.48 secs]中real就是YGC时应用真实的停顿时间。

YGC时 vmstat命令输出

r b w swap free re mf pi po fr de sr s0 s1 s2 s3 in sy cs us sy id
0 0 0 77611960 94847600 55 266 0 0 0 0 0 0 0 0 0 3041 2644 2431 44 8 48
0 0 0 76968296 94828816 79 324 0 18 18 0 0 0 0 1 0 3009 3642 2519 59 13 28
1 0 0 77316456 94816000 389 2848 0 7 7 0 0 0 0 2 0 40062 78231 61451 42 6 53
2 0 0 77577552 94798520 115 591 0 13 13 0 0 13 12 1 0 4991 8104 5413 2 0 98
YGC总共花了29.48才完成,从上面看出系统在此期间使用了600多Mb的swap分区,这就意味着,在GC的时候,内存中的一些页被移到了swap空间,这个内存页不一定属于JVM进程,可能是其他操作系统上的其他进程。

操作系统上可用物理内容不足以运行系统上所有的进程,解决办法就是尽可能运行更少的进程,增加RAM从而提升系统的物理内存。在这个例子中,Old区有9G,但是只使用了1.8G(mark-sweep generation total 9437184K, used 1860619K)。我们可以适当的降低Old区的大小以及整个堆的大小,从而减少内存压力,最小化系统上的应用发生swapping的可能。

堆空间不足

如果应用程序需要的堆内存比我们设定的Xms大,也会导致频繁的GC,严重的情况会导致OOM。由于堆空间不足,对象分配失败,JVM就要调用GC尝试回收已经分配的空间,但是GC不能释放更多的内存空间,又导致下一次GC

应用运行时,频繁的FullGC会引起长时间停顿,在下面这个例子中,Perm空间(永久代)几乎是满的,并且在Perm区尝试分配内存也都失败了,从而触发FullGC:

永久代:这个区域会存储包括类定义、结构、字段、方法(数据及代码)以及常量在内的类相关数据。它可以通过(以下两个是非堆区配置参数)-XX:PermSize及-XX:MaxPermSize来进行调节。若永久代(Perm Gen)空间用完,会导致java.lang.OutOfMemoryError: PermGenspace的异常。而且从JDK8开始,永久代被元空间所取代。

166687.013: [Full GC [PSYoungGen:126501K->0K(922048K)] [PSOldGen: 2063794K->1598637K(2097152K)]2190295K->1598637K(3019200K) [PSPermGen: 165840K->164249K(166016K)],6.8204928 secs] [Times: user=6.80 sys=0.02, real=6.81 secs]
 
166699.015: [Full GC [PSYoungGen:125518K->0K(922048K)] [PSOldGen: 1763798K->1583621K(2097152K)]1889316K->1583621K(3019200K) [PSPermGen: 165868K->164849K(166016K)],4.8204928 secs] [Times: user=4.80 sys=0.02, real=4.81 secs]
如果老年代空间不足时,也会导致频繁的FullGC,解决方案就是扩大老年代永久代的空间。

程序中调用了System.gc

System.gc调用,应用中的一些类里,或者第三方模块中调用System.gc调用从而触发STW的FullGC,也可能会引起非常长时间的停顿。如下GC日志所示,Full GC后面的(System)表示它是由调用System.GC触发的FullGC,并且耗时5.75秒:
164638.058: [Full GC (System) [PSYoungGen: 22789K->0K(992448K)]
[PSOldGen: 1645508K->1666990K(2097152K)] 1668298K->1666990K(3089600K)
[PSPermGen: 164914K->164914K(166720K)], 5.7499132 secs] [Times: user=5.69, sys=0.06, real=5.75 secs]
如果你要关闭通过调用System.gc()触发FullGC,配置JVM参数 -XX:+DisableExplicitGC即可。

总结

  • promotion failed – concurrent mode failure
Minor GC后, Survivor空间容纳不了剩余对象,将要放入老年代,老年代有碎片或者不能容纳这些对象,就产生了concurrent mode failure, 然后进行stop-the-world的Serial Old收集器。

解决办法:-XX:UseCMSCompactAtFullCollection -XX:CMSFullGCBeforeCompaction=5 或者调大新生代或者Survivor空间

  • concurrent mode failure
CMS是和业务线程并发运行的,在执行CMS的过程中有业务对象需要在老年代直接分配,例如大对象,但是老年代没有足够的空间来分配,所以导致concurrent mode failure, 然后需要进行stop-the-world的Serial Old收集器。

解决办法:+XX:CMSInitiatingOccupancyFraction 触发CMS收集器的内存比例,调大老年带的空间。+XX:CMSMaxAbortablePrecleanTime,+XX:CMSFullGCsBeforeCompaction=5,5次Full GC 后压缩old generation一次

关注微信公众号:【入门小站】,解锁更多知识点。

查看原文

赞 0 收藏 0 评论 0

入门小站 发布了文章 · 1月31日

jconsole和jstack定位死锁问题

什么是死锁

死锁问题是多线程特有的问题,它可以被认为是线程间切换消耗系统性能的一种极端情况。 在死锁时,线程间相互等待资源,而又不释放自身的资源,导致无穷无尽的等待,其结果是系统任务永远无法执行完成。 死锁问题是在多线程开发中应该坚决避免和杜绝的问题.

死锁示例代码

package com.rumenz.learn.deadLock;

public class RumenzThread implements Runnable{
    int a,b;

    public RumenzThread(int a, int b) {
        this.a = a;
        this.b = b;
    }

    @Override
    public void run() {
       //Integer.valueOf(a) 包装成对象
        synchronized (Integer.valueOf(a)){
            try{
                //睡眠3秒,增加死锁的几率
                Thread.sleep(3000);

            }catch (Exception e){
                e.printStackTrace();
            }
            synchronized (Integer.valueOf(b)){
                System.out.println("a+b="+(a+b));
            }
        }

    }
}

package com.rumenz.learn.deadLock;

public class DeadLock {

    public static void main(String[] args) {
        new Thread(new RumenzThread(1, 2)).start();
        new Thread(new RumenzThread(2, 1)).start();

    }
}

运行程序使用jstack -l pid来定位死锁

先找到死锁程序的进程id

> jps
56993 Jps
56636 Launcher
57066 DeadLock  //这个就是死锁的进程

使用jstack -l 57066来定位死锁

> jstack -l 57066


Found one Java-level deadlock:
=============================
"Thread-1":
  waiting to lock monitor 0x00007fbe6d80de18 (object 0x000000076ab33988, a java.lang.Integer),
  which is held by "Thread-0"
"Thread-0":
  waiting to lock monitor 0x00007fbe6d8106a8 (object 0x000000076ab33998, a java.lang.Integer),
  which is held by "Thread-1"

Java stack information for the threads listed above:
===================================================
"Thread-1":
    at com.rumenz.learn.deadLock.RumenzThread.run(RumenzThread.java:27)
    - waiting to lock <0x000000076ab33988> (a java.lang.Integer)
    - locked <0x000000076ab33998> (a java.lang.Integer)
    at java.lang.Thread.run(Thread.java:748)
"Thread-0":
    at com.rumenz.learn.deadLock.RumenzThread.run(RumenzThread.java:27)
    - waiting to lock <0x000000076ab33998> (a java.lang.Integer)
    - locked <0x000000076ab33988> (a java.lang.Integer)
    at java.lang.Thread.run(Thread.java:748)

Found 1 deadlock. //发现一个死锁
RumenzThread.java:27 定位到大概的代码文件位置。

jconsole定位死锁问题

  • 找到死锁进程

image-20210131232115142

  • 链接,不安全的链接

    image-20210131232205381

  • 选择线程

    image-20210131232346154

  • 点击检测死锁

    image-20210131232434316

关注微信公众号:【入门小站】,解锁更多知识点。

查看原文

赞 0 收藏 0 评论 0

入门小站 发布了文章 · 1月30日

Java高并发之CountDownLatch源码分析

概述

4def1cc7c2f2f3b82772bfd72959a44b_1259x1186

CountDownLatch 允许一个或多个线程等待直到在其他线程中执行的一组操作完成的同步辅助。简单来说,就是 CountDownLatch 内部维护了一个计数器,每个线程完成自己的操作之后都会将计数器减一,然后会在计数器的值变为 0 之前一直阻塞,直到计数器的值变为 0.

使用方法

这个例子主要演示了,如何利用 CountDownLatch 去协调多个线程同时开始运行。这个时候的 CountDownLatch 中的计数器的现实含义是等待创建的线程个数,每个线程在开始任务之前都会调用 await() 方法阻塞,直到所有线程都创建好,每当一个线程创建好后,都会提交调用 countDown() 方法将计数器的值减一 (代表待创建的线程数减一)。
public static void main(String[] args) {
    Test countDownLatchTest=new Test();
    countDownLatchTest.runThread();
}
//计数器为10,代表有10个线程等待创建
CountDownLatch countDownLatch=new CountDownLatch(10);

/**
 * 创建一个线程
 * @return
 */
private Thread createThread(int i){
    Thread thread=new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                //在此等待,直到计数器变为0
                countDownLatch.await();
                System.out.println("thread"+Thread.currentThread().getName()+"准备完毕"+System.currentTimeMillis());
            }catch (InterruptedException e){
                e.printStackTrace();
            }

        }
    });
    thread.setName("thread-"+i);
    return  thread;
}

public void runThread(){
    ExecutorService executorService= Executors.newFixedThreadPool(10);

    try {
        for(int i=0;i<10;i++){
            Thread.sleep(100);
            executorService.submit(createThread(i));
            //一个线程创建好了,待创建的线程数减一
            countDownLatch.countDown();
        }
    }catch (InterruptedException e){
        e.printStackTrace();
    }

}
下面我们就以这个例子,来解释源码:

源码分析

image-20210130230529549

从锁的分类上来讲,CountDownLatch 其实是一个” 共享锁 “。还有一个需要注意的是 CountDownLath 是响应中断的,如果线程在对锁进行操作的期间发生了中断,会直接抛出 InterruptedException。

源码分析

计数器的本质是什么?

刚才我们也提到了,CountDownLatch 中一个非常重要的东西就是计数器。那么我们首先需要分析的就是源码中哪个部分充当了计数器的角色。
我们通过构造方法来查看:
我们的代码CountDownLatch countDownLatch=new CountDownLatch(10);背后实际上是调用了下面这个方法:
public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}
而这个 Sync 的实例化又做了什么工作呢?
Sync(int count) {
    setState(count); //就是修改了AQS中的state值
}
现在已经解决了我们的第一个问题,实际上 AQS 中的 state 充当了计数器。

await 方法

  1. await 方法实际上是调用了 sync 的一个方法
public void await() throws InterruptedException {
     sync.acquireSharedInterruptibly(1);
}
  1. sync 的void acquireSharedInterruptibly(int arg)的实现如下
public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
        //如果线程中断了,则抛异常。
        //证明了之前所说的CountDownLatch是会响应中断的
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
}
  1. 如果没有中断,就会调用tryAcquireShared(arg)
    它的实现非常的简单,如果 state 为 0,就返回 1,否则返回 - 1
protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
}
  1. 如果 state 不为 0,就会返回 - 1,if 条件成立,就会调用doAcquireSharedInterruptibly(arg)
    这个方法的实现,稍微复杂一点,但这个方法也不陌生了,它的功能就是把该线程加入等待队列中并阻塞,但是在入队之后,不一定会立即 park 阻塞,它会判断自己是否是第二个节点,如果是就会再次尝试获取。
private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor(); //获取当前节点的前驱节点
                if (p == head) {//前一个节点是头节点
                    int r = tryAcquireShared(arg); //去看一看state是否为0,步骤3分析过
                    if (r >= 0) {
                    //如果state目前为0,就出队
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    //进入阻塞队列阻塞,如果发生中断,则抛异常
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
}
CountDownLatch 的 await 方法比其它几个锁的实现简单得多。不过需要注意的一点就是 CountDownLatch 是会响应中断的,这一点在源码中也有多处体现。

countDown 方法

  1. countDown 方法实际上是调用 sync 中的一个方法
public void countDown() {
        sync.releaseShared(1);
}
  1. boolean releaseShared(int arg)的具体实现如下:
public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
}
  1. tryReleaseShared(arg)方法的具体实现如下:
protected boolean tryReleaseShared(int releases) {
               // Decrement count; signal when transition to zero
       for (;;) {//自旋
               int c = getState();
               if (c == 0)//计数器已经都是0了,当然会释放失败咯
                 return false;
               int nextc = c-1;//释放后,计数器减一
               if (compareAndSetState(c, nextc))//CAS修改计数器
                 return nextc == 0;
       }
}
这个方法就是去尝试直接修改 state 的值。如果 state 的修改成功,且修改后的 state 值为0,就会返回 true。就会执行doReleaseShared();方法。
  1. doReleaseShared();的实现如下,它的作用就是state为0的时候,去唤醒等待队列中的线程。
此方法主要用于唤醒后继
private void doReleaseShared() {
    /*
     * 如果head需要通知下一个节点,调用unparkSuccessor
     * 如果不需要通知,需要在释放后把waitStatus改为PROPAGATE来继续传播
     * 此外,我们必须通过自旋来CAS以防止操作时有新节点加入
     * 另外,不同于其他unparkSuccessor的用途,我们需要知道CAS设置状态失败的情况,
     * 以便进行重新检查。
     */
    for (;;) {
        //唤醒操作由头结点开始,注意这里的头节点已经是上面新设置的头结点了
        //其实就是唤醒上面新获取到共享锁的节点的后继节点
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            //表示后继节点需要被唤醒
            if (ws == Node.SIGNAL) {
                //这里需要控制并发,因为入口有setHeadAndPropagate跟release两个,避免两次unpark
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;           
                //执行唤醒操作
                unparkSuccessor(h);
            }
            //如果后继节点暂时不需要唤醒,则把当前节点状态设置为PROPAGATE确保以后可以传递下去
            else if (ws == 0 &&
                    !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;               
        }
        //如果头结点没有发生变化,表示设置完成,退出循环
        //如果头结点发生变化,比如说其他线程获取到了锁,为了使自己的唤醒动作可以传递,必须进行重试
        if (h == head)                   
            break;
    }
}

关注微信公众号:【入门小站】,解锁更多知识点

查看原文

赞 0 收藏 0 评论 0

入门小站 发布了文章 · 1月28日

Java并发编程之CAS和AQS

什么是CAS

CAS(compare and swap),字面意思比较并交换,是解决多线程并行情况下使用锁造成性能损耗的一种机制.
public final boolean compareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
CAS有三个操作数,valueOffset内存值,expect期望值,update要更新的值。如果内存值(valueOffset)和期望值(expect)是一样的。那么处理器会将该位置的值更新为(update),否则不做任何操作。

CAS 有效地说明了“我认为位置valueOffset应该包含值expect,如果包含该值,则将update放到这个位置;否则,不要更改该位置,只告诉我这个位置现在的值即可。”在 Java 中,sun.misc.Unsafe类提供了硬件级别的原子操作来实现这个 CAS,java.util.concurrent包下的大量类都使用了这个Unsafe类的 CAS 操作

CAS的应用

java.util.concurrent.atomic包下的类大多数是使用CAS实现的,如AtomicInteger,AtomicBooleanAtomicLong等。
public class AtomicInteger extends Number implements java.io.Serializable {
    private static final long serialVersionUID = 6214790243416807050L;

    // setup to use Unsafe.compareAndSwapInt for updates
    private static final Unsafe unsafe = Unsafe.getUnsafe();

    private volatile int value;// 初始int大小
    // 省略了部分代码...

    // 带参数构造函数,可设置初始int大小
    public AtomicInteger(int initialValue) {
        value = initialValue;
    }
    // 不带参数构造函数,初始int大小为0
    public AtomicInteger() {
    }

    // 获取当前值
    public final int get() {
        return value;
    }

    // 设置值为 newValue
    public final void set(int newValue) {
        value = newValue;
    }

    //返回旧值,并设置新值为 newValue
    public final int getAndSet(int newValue) {
        /**
        * 这里使用for循环不断通过CAS操作来设置新值
        * CAS实现和加锁实现的关系有点类似乐观锁和悲观锁的关系
        * */
        for (;;) {
            int current = get();
            if (compareAndSet(current, newValue))
                return current;
        }
    }

    // 原子的设置新值为update, expect为期望的当前的值
    public final boolean compareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }

    // 获取当前值current,并设置新值为current+1
    public final int getAndIncrement() {
        for (;;) {
            int current = get();
            int next = current + 1;
            if (compareAndSet(current, next))
                return current;
        }
    }

    // 此处省略部分代码,余下的代码大致实现原理都是类似的
}
一般,在竞争不是特别激烈的时候,使用该包下的原子操作性能比使用synchronized关键字的方式高效的多。通过查看getAndSet()方法,可知如果资源竞争十分激烈的话,这个for循环可能会持续很久都不能成功跳出。在这种情况下,我们可能需要考虑如何降低对资源的竞争。在较多的场景下,我们可能会使用到这些原子类操作。一个典型应用就是计数,在多线程的情况下需要考虑线程安全问题。
//有线程安全问题
public class Counter {
    private int count;
    public Counter(){}
    public int getCount(){
        return count;
    }
    public void increase(){
        count++;
    }
}
//悲观锁,线程安全,缺点性能差
public class Counter {
    private int count;
    public Counter(){}
    public synchronized int getCount(){
        return count;
    }
    public synchronized void increase(){
        count++;
    }
}
这是悲观锁的实现,如果我们需要获取这个资源,那么我们就给它加锁,其他线程都无法访问该资源,直到我们操作完后释放对该资源的锁。我们知道,悲观锁的效率是不如乐观锁的,上面说了atomic包下的原子类的实现是乐观锁方式,因此其效率会比使用synchronized关键字更高一些,推荐使用这种方式。
//乐观锁,线程安全,性能好
public class Counter {
    private AtomicInteger count = new AtomicInteger();
    public Counter(){}
    public int getCount(){
        return count.get();
    }
    public void increase(){
        count.getAndIncrement();
    }
}

CAS的三大缺点

ABA问题

因为 CAS 需要在操作值的时候检查下值有没有发生变化,如果没有发生变化则更新,但是如果一个值原来是A,变成了B,又变成了A,那么使用 CAS 进行检查时会发现它的值没有发生变化,但是实际上却变化了。ABA 问题的解决思路就是使用版本号,在变量前面追加上版本号,每次变量更新的时候把版本号加一,那么A-B-A就会变成1A-2B-3A。

从 Java 1.5 开始 JDK 的atomic包里提供了一个类AtomicStampedReference来解决 ABA 问题。这个类的compareAndSet方法作用是首先检查当前引用是否等于预期引用,并且当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。

循环时间长开销大

CAS 自旋如果长时间不成功,会给 CPU 带来非常大的执行开销。如果 JVM 能支持处理器提供的pause指令那么效率会有一定的提升,pause指令有两个作用,一是它可以延迟流水线执行指令,使 CPU 不会消耗过多的执行资源,延迟的时间取决于具体实现的版本,在一些处理器上延迟时间是零;二是它可以避免在退出循环的时候因内存顺序冲突而引起 CPU 流水线被清空,从而提高 CPU 的执行效率。

只能保证一个共享变量的原子操作

当对一个共享变量执行操作时,我们可以使用循环 CAS 的方式来保证原子操作,但是对多个共享变量操作时,循环 CAS 就无法保证操作的原子性,这个时候就需要用锁,或者有一个取巧的办法,就是把多个共享变量合并成一个共享变量来操作。比如有两个共享变量i=2,j=a,合并一下ij=2a,然后用 CAS 来操作ij。从 Java 1.5 开始 JDK 提供了AtomicReference类来保证引用对象之间的原子性,我们可以把多个变量放在一个对象里来进行 CAS 操作。

什么是AQS

AQS(AbstractQueuedSychronizer),抽象队列同步器,是JDK下提供的一套给予FIFO等待队列的阻塞锁和相关同步器的一个同步框架。这个抽象类被设计为作为一些可用原子int值来表示状态的同步器的基类。如果我们看过类似CountDownLatch类的源码实现,会发现其内部有一个继承了AbstractQueuedSynchronizer的内部类Sync。可见CountDownLatch是基于 AQS 框架来实现的一个同步器,类似的同步器在 JUC 下还有不少,如Semaphore等。

AQS的应用

QS 管理一个关于状态信息的单一整数,该整数可以表现任何状态。比如,Semaphore用它来表现剩余的许可数,ReentrantLock用它来表现拥有它的线程已经请求了多少次锁;FutureTask用它来表现任务的状态等。

如 JDK 的文档中所说,使用 AQS 来实现一个同步器需要覆盖实现如下几个方法,并且使用getStatesetStatecompareAndSetState这三个方法来操作状态。

boolean tryAcquire(int arg)
boolean tryRelease(int arg)
int tryAcquireShared(int arg)
boolean tryReleaseShared(int arg)
boolean isHeldExclusively()
以上方法不需要全部实现,根据获取的锁的种类可以选择实现不同的方法,支持独占(排他)获取锁的同步器应该实现tryAcquire、 tryRelease、isHeldExclusively;而支持共享获取的同步器应该实现tryAcquireShared、tryReleaseShared、isHeldExclusively。下面以CountDownLatch举例说明基于 AQS 实现同步器,CountDownLatch用同步状态持有当前计数,countDown方法调用 release从而导致计数器递减;当计数器为 0 时,解除所有线程的等待;await调用acquire,如果计数器为 0,acquire会立即返回,否则阻塞。通常用于某任务需要等待其他任务都完成后才能继续执行的情景
public class CountDownLatch {
    /**
     * 基于AQS的内部Sync
     * 使用AQS的state来表示计数count.
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            // 使用AQS的getState()方法设置状态
            setState(count);
        }

        int getCount() {
            // 使用AQS的getState()方法获取状态
            return getState();
        }

        // 覆盖在共享模式下尝试获取锁
        protected int tryAcquireShared(int acquires) {
            // 这里用状态state是否为0来表示是否成功,为0的时候可以获取到返回1,否则不可以返回-1
            return (getState() == 0) ? 1 : -1;
        }

        // 覆盖在共享模式下尝试释放锁
        protected boolean tryReleaseShared(int releases) {
            // 在for循环中Decrement count直至成功;
            // 当状态值即count为0的时候,返回false表示 signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

    private final Sync sync;

    // 使用给定计数值构造CountDownLatch
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

    // 让当前线程阻塞直到计数count变为0,或者线程被中断
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    // 阻塞当前线程,除非count变为0或者等待了timeout的时间。当count变为0时,返回true
    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    // count递减
    public void countDown() {
        sync.releaseShared(1);
    }

    // 获取当前count值
    public long getCount() {
        return sync.getCount();
    }

    public String toString() {
        return super.toString() + "[Count = " + sync.getCount() + "]";
    }
}

AQS 原理

AQS 的实现主要在于维护一个volatile int state(代表共享资源)和一个 FIFO 线程等待队列(多线程争用资源被阻塞时会进入此队列,此队列称之为CLH队列)。CLH 队列中的每个节点是对线程的一个封装,包含线程基本信息,状态,等待的资源类型等。

CLH结构如下

 *      +------+  prev +-----+       +-----+
 * head |      | <---- |     | <---- |     |  tail
 *      +------+       +-----+       +-----+

image-20210128222104496

简单源码分析

    public final void acquire(int arg) {
        // 首先尝试获取,不成功的话则将其加入到等待队列,再for循环获取
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    
    // 从clh中选一个线程获取占用资源
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                // 当节点的先驱是head的时候,就可以尝试获取占用资源了tryAcquire
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    // 如果获取到资源,则将当前节点设置为头节点head
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 如果获取失败的话,判断是否可以休息,可以的话就进入waiting状态,直到被unpark()
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
  
   private Node addWaiter(Node mode) {
        // 封装当前线程和模式为新的节点,并将其加入到队列中
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }  
    
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { 
                // tail为null,说明还没初始化,此时需进行初始化工作
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                // 否则的话,将当前线程节点作为tail节点加入到CLH中去
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

关注微信公众号:【入门小站】,解锁更多知识点。

查看原文

赞 0 收藏 0 评论 0

入门小站 发布了文章 · 1月26日

Java线程池ExecutorService中重要的方法

ExecutorService 介绍

image-20210126211305605

ExecutorServicejava线程池定义的一个接口,它在java.util.concurrent包中,在这个接口中定义了和后台任务执行相关的方法。

image-20210126212202708

Java API对ExecutorService接口实现有两个,所以这两个即是线程池的具体实现。
1. ThreadPoolExecutor
2. ScheduledThreadPoolExecutor
ExecutorService还继承了Executor接口。

image-20210126220009540

实线表示继承,需要表示实现

ExecutorService的创建

Java提供了一个工厂类Executors来创建各种线程池。
  • newCachedThreadPool 创建一个可缓存的线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,如果没有可以回收的,则新建线程。
  • newFixedThreadPool 创建一个定长的线程池,可控制线程最大并发数,超出的线程会在队列中等待。
  • newScheduledThreadPool 创建一个定长线程池,可以定时周期性执行任务。
  • newSingleThreadPool 创建一个单线程线程池,它只会用唯一的线程来执行任务,保证所有任务按照指定顺序来执行(FIFO,LIFO)
Executors是一个工厂类,它所有的方法返回的都是ThreadPoolExecutorScheduledThreadPoolExecutor这两个类的实例。

ExecutorService的使用

ExecutorService executorService = Executors.newFixedThreadPool(10);

executorService.execute(new Runnable() {
public void run() {
    System.out.println("入门小站");
}
});

executorService.shutdown();

ExecutorService的执行方法

  • execute(Runnable)
  • submit(Runnable)
  • submit(Callable)
  • invokeAny(...)
  • invokeAll(...)

execute(Runnable) 无法获取执行结果

接收一个Runnable实例,并且异步执行
ExecutorService executorService = Executors.newSingleThreadExecutor();

executorService.execute(new Runnable() {
public void run() {
    System.out.println("Asynchronous task");
}
});

executorService.shutdown();
没有办法获取执行结果。

submit(Runnable) 可以判断任务是否完成

submit(Runnable)execute(Runnable)多返回一个Future,可以用来判断提交的任务是否执行完成。
Future future = executorService.submit(new Runnable() {
public void run() {
    System.out.println("Asynchronous task");
}
});

future.get();  //returns null if the task has finished correctly.
如果任务完成,future.get()会返回null,future.get会阻塞。

submit(Callable)可以获取返回结果

submit(Callable)和submit(Runnable)类似,也会返回一个Future对象,但是除此之外,submit(Callable)接收的是一个Callable的实现,Callable接口中的call()方法有一个返回值,可以返回任务的执行结果,而Runnable接口中的run()方法是void的,没有返回值
Future future = executorService.submit(new Callable(){
public Object call() throws Exception {
    System.out.println("Asynchronous Callable");
    return "Callable Result";
}
});

System.out.println("future.get() = " + future.get());
如果任务完成,future.get会返回Callable执行返回的结果,同样future.get()会阻塞。

invokeAny(...)

invokeAny(...)方法接收的是一个Callable的集合,执行这个方法不会返回Future,但是会返回所有Callable任务中其中一个任务的执行结果。这个方法也无法保证返回的是哪个任务的执行结果,反正是其中的某一个
ExecutorService executorService = Executors.newSingleThreadExecutor();

Set<Callable<String>> callables = new HashSet<Callable<String>>();

callables.add(new Callable<String>() {
public String call() throws Exception {
    return "Task 1";
}
});
callables.add(new Callable<String>() {
public String call() throws Exception {
    return "Task 2";
}
});
callables.add(new Callable<String>() {
    public String call() throws Exception {
    return "Task 3";
}
});

String result = executorService.invokeAny(callables);
System.out.println("result = " + result);
executorService.shutdown();
每次执行都会返回一个结果,并且返回的结果是变化的,可能会返回“Task2”也可是“Task1”或者其它。

invokeAll(...)

invokeAll(...)与 invokeAny(...)类似也是接收一个Callable集合,但是前者执行之后会返回一个Future的List,其中对应着每个Callable任务执行后的Future对象。
ExecutorService executorService = Executors.newSingleThreadExecutor();

Set<Callable<String>> callables = new HashSet<Callable<String>>();

callables.add(new Callable<String>() {
public String call() throws Exception {
    return "Task 1";
}
});
callables.add(new Callable<String>() {
    public String call() throws Exception {
    return "Task 2";
}
});
callables.add(new Callable<String>() {
    public String call() throws Exception {
      return "Task 3";
   }
});

List<Future<String>> futures = executorService.invokeAll(callables);

for(Future<String> future : futures){
    System.out.println("future.get = " + future.get());
}

executorService.shutdown();

线程池ExecutorService的关闭

如果要关闭ExecutorService中执行的线程,我们可以调用ExecutorService.shutdown()方法。在调用shutdown()方法之后,ExecutorService不会立即关闭,但是它不再接收新的任务,直到当前所有线程执行完成才会关闭,所有在shutdown()执行之前提交的任务都会被执行。

如果我们想立即关闭ExecutorService,我们可以调用ExecutorService.shutdownNow()方法。这个动作将跳过所有正在执行的任务和被提交还没有执行的任务。但是它并不对正在执行的任务做任何保证,有可能它们都会停止,也有可能执行完成。

关注微信公众号:【入门小站】,解锁更多知识点

查看原文

赞 0 收藏 0 评论 0

入门小站 发布了文章 · 1月25日

Java线程池ThreadPoolExecutor源码分析

继承关系

image-20210125212621414

Executor接口

public interface Executor {
    void execute(Runnable command);
}

ExecutorService接口

public interface ExecutorService extends Executor {

    void shutdown();

    List<Runnable> shutdownNow();

    boolean isShutdown();

    boolean isTerminated();

    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

    <T> Future<T> submit(Callable<T> task);

    <T> Future<T> submit(Runnable task, T result);

    Future<?> submit(Runnable task);

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
                                    throws InterruptedException;

    
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
                    throws InterruptedException, ExecutionException;

   
    <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
                    throws InterruptedException, ExecutionException, TimeoutException;
}
ExecutorService接口继承Executor接口,并增加了submit、shutdown、invokeAll等等一系列方法。

AbstractExecutorService抽象类

public abstract class AbstractExecutorService implements ExecutorService {

    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos)
                              throws InterruptedException, ExecutionException, TimeoutException {...}

    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
                            throws InterruptedException, ExecutionException {... }

    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
                            throws InterruptedException, ExecutionException, TimeoutException {...}

    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
                                         throws InterruptedException {...}

    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
                                        throws InterruptedException {...}

}
AbstractExecutorService抽象类实现ExecutorService接口,并且提供了一些方法的默认实现,例如submit方法、invokeAny方法、invokeAll方法。

像execute方法、线程池的关闭方法(shutdown、shutdownNow等等)就没有提供默认的实现。

构造函数与线程池状态

public ThreadPoolExecutor(int corePoolSize,                             //核心线程数
                              int maximumPoolSize,                      //最大线程数
                              long keepAliveTime,                       //线程存活时间
                              TimeUnit unit,                            //keepAliveTime的单位
                              BlockingQueue<Runnable> workQueue,        //阻塞任务队列
                              ThreadFactory threadFactory,              //创建线程工厂
                              RejectedExecutionHandler handler)         //拒绝任务的接口处理器
     { 
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

线程池状态

//记录线程池状态和线程数量(总共32位,前三位表示线程池状态,后29位表示线程数量)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//线程数量统计位数29  Integer.SIZE=32 
private static final int COUNT_BITS = Integer.SIZE - 3;
//容量 000 11111111111111111111111111111
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

//运行中 111 00000000000000000000000000000
private static final int RUNNING    = -1 << COUNT_BITS;
//关闭 000 00000000000000000000000000000
private static final int SHUTDOWN   =  0 << COUNT_BITS;
//停止 001 00000000000000000000000000000
private static final int STOP       =  1 << COUNT_BITS;
//整理 010 00000000000000000000000000000
private static final int TIDYING    =  2 << COUNT_BITS;
//终止 011 00000000000000000000000000000
private static final int TERMINATED =  3 << COUNT_BITS;

//获取运行状态(获取前3位)
private static int runStateOf(int c)     { return c & ~CAPACITY; }
//获取线程个数(获取后29位)
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
int是4个字节,32位
RUNNING:接受新任务并且处理阻塞队列里的任务
SHUTDOWN:拒绝新任务但是处理阻塞队列里的任务
STOP:拒绝新任务并且抛弃阻塞队列里的任务同时会中断正在处理的任务
TIDYING:所有任务都执行完(包含阻塞队列里面任务),当前线程池活动线程为0,将要调用terminated方法
TERMINATED:终止状态。terminated方法调用完成以后的状态

线程池状态转换:
RUNNING -> SHUTDOWN:显式调用shutdown()方法, 或者隐式调用了finalize()方法
(RUNNING or SHUTDOWN) -> STOP:显式调用shutdownNow()方法
SHUTDOWN -> TIDYING:当线程池和任务队列都为空的时候
STOP -> TIDYING:当线程池为空的时候
TIDYING -> TERMINATED:当 terminated() hook 方法执行完成时候

submit方法和execute方法的区别

submit方法

  • 调用submit方法,传入Runnable或者Callable对象
  • 判断传入的对象是否为null,为null则抛出异常,不为null继续流程
  • 将传入的对象转换为RunnableFuture对象
  • 执行execute方法,传入RunnableFuture对象
  • 返回RunnableFuture对象
public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
}

execute方法

public void execute(Runnable command) {
        //传进来的线程为null,则抛出空指针异常
        if (command == null)
            throw new NullPointerException();
        //获取当前线程池的状态+线程个数变量
        int c = ctl.get();
        /**
        * 3个步骤
        */
        //1.判断当前线程池线程个数是否小于corePoolSize,小于则调用addWorker方法创建新线程运行,
        //且传进来的Runnable当做第一个任务执行。
        //如果调用addWorker方法返回false,则直接返回
        if (workerCountOf(c) < corePoolSize) {
            //添加一个core线程(核心线程)。此处参数的true,表示添加的线程是core容量下的线程
            if (addWorker(command, true))
                return;
            //刷新数据,乐观锁就是没有锁
            c = ctl.get();
        }
       /*  isRunning方法的定义:
               private static boolean isRunning(int c)
               {return c < SHUTDOWN;}
           2.SHUTDOWN值为0,即如果c小于0,表示在运行;offer用来判断任务是否成功入队*/
        if (isRunning(c) && workQueue.offer(command)) {
             //二次检查
            int recheck = ctl.get();
            //如果当前线程池状态不是RUNNING则从队列删除任务,并执行拒绝策略
            if (! isRunning(recheck) && remove(command))
                //执行拒绝策略
                reject(command);
            //否则如果当前线程池线程空,则添加一个线程
            else if (workerCountOf(recheck) == 0)
                //添加一个空线程进线程池,使用非core容量线程
                //仅有一种情况,会走这步,core线程数为0,max线程数>0,队列容量>0
                //创建一个非core容量的线程,线程池会将队列的command执行
                addWorker(null, false);
        }
        //线程池停止了或者队列已满,添加maximumPoolSize容量工作线程,如果失败,执行拒绝策略
        else if (!addWorker(command, false))
            reject(command);
    }

ThreadPoolExecutor.addWorker()

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get(); //获取运行状态和工作数量
            int rs = runStateOf(c); //获取当前线程池运行的状态

            // Check if queue empty only if necessary.
            //条件代表着以下几个场景,直接返回false说明当前工作线程创建失败
            //1.rs>SHUTDOWN 此时不再接收新任务,且所有的任务已经执行完毕
            //2.rs=SHUTDOWN 此时不再接收新任务,但是会执行队列中的任务
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                //先判断当前活动的线程数是否大于最大值,如果超过了就直接返回false说明线程创建失败
                //如果没有超过再根据core的值再进行以下判断
                //1. core为true,则判断当前活动的线程数是否大于corePoolSize 
                //2. core为false,则判断当前活动线程数是否大于maximumPoolSize
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //比较当前值是否和c相同,如果相同,则改为c+1,并且跳出大循环,直接执行Worker进行线程创建
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                //检查下当前线程池的状态是否已经发生改变
                //如果已经改变了,则进行外层retry大循环,否则只进行内层的循环
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //Worker的也是Runnable的实现类
            w = new Worker(firstTask);
            //因为不可以直接在Worker的构造方法中进行线程创建  
            //所以要把它的引用赋给t方便后面进行线程创建
            final Thread t = w.thread;
            if (t != null) {
                //上锁
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);//将创建的线程添加到workers容器中  
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

Worker方法

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable{
    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
        
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
}
Worker在ThreadPoolExecutor为一个内部类实现了Runnable接口。只有一个构造方法,在上面的addWorker()中final Thread t = w.thread;知道其实是获取了线程的对象,因为在构造方法中,线程的引用即是它自己。

因此在调用t.start()执行的是(Worker类中的方法):

/** Delegates main run loop to outer runWorker  */
public void run() {
    //这里执行的是ThreadPoolExecutor中的runWorker
    runWorker(this);
}

ThreadPoolExecutor.runWorker()

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;//获取Worker中的任务
        w.firstTask = null; //将Woeker中的任务置空
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            //如果当前任务为空  那么就从getTask中获得任务
            /**
             * 如果task不为空,执行完task后则将task置空
             * 继续进入循环,则从getTask中获取任务
             */
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    //任务执行前调用的方法
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        //任务结束后调用的方法
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
}
从上面可以简单理解,就是执行任务,只是执行任务需要进行处理,包括获得任务、任务开始前处理、任务执行、任务执行后处理。但是,关键代码还是里面所调用的一个方法getTask() 。beforeExecute(Thread t, Runnable r)afterExecute(Runnable r, Throwable t)并未在类中有处理业务的逻辑,即可以通过继承线程池的方式来重写这两个方法,这样就能够对任务的执行进行监控。

processWorkerExit

  • 从While循环体中可以知道,当线程运行时出现异常,那么都会退出循环,进入到processWorkerExit()
  • 从getTask()获得结果为null,则也会进到processWorkerExit()

getTask()

    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
        //死循环
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            //如果设置了allowCoreThreadTimeOut(true)
            //或者当前运行的任务数大于设置的核心线程数
            // timed = true
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
            /** ------------------------以上的操作跟之前类似----------------------- */
            /** ------------------------关键在于下面的代码------------------------- */
            /** ------------------------从阻塞队列中获取任务----------------------- */
            try {
                Runnable r = timed ?
                    //对于阻塞队列,poll(long timeout, TimeUnit unit) 将会在规定的时间内去任务
                    //如果没取到就返回null
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    //take会一直阻塞,等待任务的添加
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
线程池能够保证一直等待任务而不被销毁,其实就是进入了阻塞状态

ThreadPoolExecutor.processWorkerExit()

    /**
     * @param completedAbruptly
     */
private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) //如果突然被打断,工作线程数不会被减少
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        tryTerminate();

        int c = ctl.get();
        //判断运行状态是否在STOP之前
        if (runStateLessThan(c, STOP)) {
            
            if (!completedAbruptly) {//正常退出,也就是task == null
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            //新增一个工作线程,代替原来的工作线程
            addWorker(null, false);
        }
}

线程池关闭

可以通过调用线程池的shutdown或shutdownNow方法来关闭线程池。它们的原理是遍历线程池中的工作线程, 然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止。但是它们存在一定的区别, shutdownNow首先将线程池的状态设置成STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表,而 shutdown只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程。

只要调用了这两个关闭方法中的任意一个,isShutdown方法就会返回true。当所有的任务都已关闭后,才表示线程池关闭成功, 这时调用isTerminaed方法会返回true。至于应该调用哪一种方法来关闭线程池,应该由提交到线程池的任务特性决定, 通常调用shutdown方法来关闭线程池,如果任务不一定要执行完,则可以调用shutdownNow方法。

shutdown

当调用shutdown方法时,线程池将不会再接收新的任务,然后将先前放在队列中的任务执行完成。
public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //检查权限
            checkShutdownAccess();
            //CAS 更新线程池状态
            advanceRunState(SHUTDOWN);
            //中断所有空闲的线程
            interruptIdleWorkers();
            //关闭,此处是do nothing
            onShutdown();
        } finally {
            mainLock.unlock();
        }
        //尝试结束,上面代码已分析
        tryTerminate();
}

shutdownNow

立即停止所有的执行任务,并将队列中的任务返回
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);
        //中断所有线程
        interruptWorkers();
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

总结

  • 线程池优先使用corePoolSize的数量执行工作任务
  • 如果超过corePoolSize,队列入队
  • 超过队列,使用maximumPoolSize-corePoolSize的线程处理,这部分线程超时不干活就销毁掉。
  • 每个线程执行结束的时候,会判断当前的工作线程和任务数,如果任务数多,就会创建空线程从队列拿任务。
  • 线程池执行完成,不会自动销毁,需要手工shutdown,修改线程池状态,中断所有线程。

分配线程池大小的依据

从以下几个角度考虑
  • 任务的性质:CPU密集型任务、IO密集型任务和混合型任务。
  • 任务的优先级:高、中和低。
  • 任务的执行时间:长、中和短。
  • 任务的依赖性:是否依赖其他系统资源,如数据库连接。
性质不同的任务可以用不同规模的线程池分开处理。CPU密集型任务应配置尽可能小的线程,如配置cpu个数 +1个线程的线程池。 由于IO密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程,如2*cpu个数 。混合型的任务,如果可以拆分, 将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量 将高于串行执行的吞吐量。如果这两个任务执行时间相差太大,则没必要进行分解。可以通过 Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU个数。 优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高的任务先执行。

执行时间不同的任务可以交给不同规模的线程池来处理,或者可以使用优先级队列,让执行时间短的任务先执行。

依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,等待的时间越长,则CPU空闲时间就越长,那么线程数应该设置得越大, 这样才能更好地利用CPU。

使用有界队列

有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点儿,比如几千。有一次,我们系统里后台任务线程池的队列和线程池全满了, 不断抛出抛弃任务的异常,通过排查发现是数据库出现了问题,导致执行SQL变得非常缓慢, 因为后台任务线程池里的任务全是需要向数据库查询和插入数据的,所以导致线程池里的工作线程全部阻塞,任务积压在线程池里。 如果当时我们设置成无界队列,那么线程池的队列就会越来越多,有可能会撑满内存,导致整个系统不可用,而不只是后台任务出现问题。 当然,我们的系统所有的任务是用单独的服务器部署的,我们使用不同规模的线程池完成不同类型的任务,但是出现这样问题时也会影响到其他任务。

线程池监控

如果在系统中大量使用线程池,则有必要对线程池进行监控,方便在出现问题时,可以根据线程池的使用状况快速定位问题。 可以通过线程池提供的参数进行监控,在监控线程池的时候可以使用以下属性。
  • taskCount:线程池需要执行的任务数量。
  • completedTaskCount:线程池在运行过程中已完成的任务数量,小于或等于taskCount。
  • largestPoolSize:线程池里曾经创建过的最大线程数量。通过这个数据可以知道线程池是否曾经满过。如该数值等于线程池的最大大小, 则表示线程池曾经满过。
  • getPoolSize:线程池的线程数量。如果线程池不销毁的话,线程池里的线程不会自动销毁,所以这个大小只增不减。
  • getActiveCount:获取活动的线程数。
  • 通过扩展线程池进行监控。可以通过继承线程池来自定义线程池,重写线程池的beforeExecute、afterExecute和terminated方法, 也可以在任务执行前、执行后和线程池关闭前执行一些代码来进行监控。例如,监控任务的平均执行时间、最大执行时间和最小执行时间等。 这几个方法在线程池里是空方法。

关注微信公众号:【入门小站】,解锁更多知识点

查看原文

赞 1 收藏 1 评论 0

入门小站 发布了文章 · 1月24日

Java并发线程之线程池

初始化线程池后,把任务丢进去,等待调度就可以了,使用起来比较方便。
JAVA中Thread是线程类,不建议直接使用Thread执行任务,在并发数量比较多的情况下,每个线程都是执行一个很短的时间就任务结束了,这样频繁创建线程会大大降低系统的效率,因为频繁的创建和销毁线程需要时间。而线程池可以复用,就是执行完一个任务,并不销毁,而是可以继续执行其它任务。

Thread的弊端

  • 每次new Thread() 创建对象,性能差。
  • 线程缺乏统一管理,可能无限制创建线程,相互竞争,有可能占用过多系统资源导致死机或OOM。
  • 不能多执行,定期执行,线程中断

线程池的优点

  • 重用存在的线程,减少对象创建,消亡的开销,性能佳,降低资源消耗。
  • 可以控制最大并发线程数,提高系统资源利用率,同时避免过多资源竞争,避免阻塞,提高响应速度。
  • 提供定时执行,定期执行,单线程,并发数控制等功能,以提高线程的可管理性。
阿里发布的 Java 开发手册中强制线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

Executors利用工厂模式向我们提供了4种线程池实现方式,但是并不推荐使用,原因是使用Executors创建线程池不会传入相关参数而使用默认值所以我们常常忽略了那些重要的参数(线程池大小、缓冲队列的类型等),而且默认使用的参数会导致资源浪费,不可取。

ThreadPoolExecutor介绍

构造函数和参数

image-20210124110243910

java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类。
public class ThreadPoolExecutor extends AbstractExecutorService {
    /** 构造函数 1 */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {}
                              
    /** 构造函数 2 */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {}
                              
    /** 构造函数 3 */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {}
                              
    /** 构造函数 4 */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {}
}
ThreadPoolExecutor类中提供了四个构造方法,在构造函数4中,参数最多,通过观察其他3个构造函数,发现前面三个构造器都是调用的第四个构造器进行的初始化工作。

img

构造器中各个参数的含义

corePoolSize 核心线程池的大小,在创建了线程池后,默认情况下,线程池中没有任何的线程池,而是等任务过来了再去创建线程执行任务。除非调用了预创建线程的方法,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。当线程池中的线程数量到达corePoolSize后,就会把到达的任务放到缓存队列里面。
  • prestartCoreThread() : 预创建一个核心线程,使其闲置等待工作。
  • prestartAllCoreThreads() : 启动所有核心线程,导致它们空闲地等待工作。
maxnumPoolSize 线程池中最大的线程数,是一个非常重要的参数,它表示在线程池中最多能创建多少线程。

keepAliveTime 表示线程在没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,即当线程池中的线程数大于corePoolSize,如果一个线程的空闲时间达到keepAliveTime,则会终止直到线程池中的线程数量不大于corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中线程数不大于corePoolSize时,keepAliveTime参数也会启作用,直到线程池中的线程数为0。

unit 参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性。

  • TimeUnit.DAYS : 以 天 为单位 ;
  • TimeUnit.HOURS : 以 小时 为单位 ;
  • TimeUnit.MINUTES : 以 分钟 为单位 ;
  • TimeUnit.SECONDS : 以 秒 为单位 ;
  • TimeUnit.MILLISECONDS : 以 毫秒 为单位 ;
  • TimeUnit.MICROSECONDS : 以 微秒 为单位 ;
  • TimeUnit.NANOSECONDS : 以 纳秒 为单位 ;
workQueue一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般有以下几种选择。
  • ArrayBlockingQueue:基于数组的先进先出队列,创建时必须指定大小。
  • LinkedBlockingQueue:基于链表的先进先出队列,若果创建时没有指定此队列的大小,则默认为Integer.MAX_VALUE
  • SynchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是直接新建一个线程来执行新的任务。
threadFactory线程工厂,主要用来创建线程。线程池最重要的一项工作,就是在满足某些条件情况下创建线程。在ThreadPoolExecutor线程池中,创建线程的操作时交给ThreadFactoty来完成。使用线程池,就必须要指定threadFactory。如果我们的构造器中没有指定使用ThreadFactory,这个时候ThreadPoolExecutor就会使用默认的ThreadFactory:DefaultThreadFactory

handler 在ThreadPoolExecutor线程池中还有一个重要的接口:RejectedExecutionHandler。当提交给线程池的某一个新任务无法直接被线程池中“核心线程”直接处理,又无法加入等待队列,也无法创建新的线程执行;又或者线程池已经调用shutdown()方法停止了工作;又或者线程池不是处于正常的工作状态;这时候ThreadPoolExecutor线程池会拒绝处理这个任务,触发创建ThreadPoolExecutor线程池时定义的RejectedExecutionHandler接口的实现,表示当拒绝处理任务时的策略,有以下四种取值,四种值都为其静态内部类:

  • ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常
  • ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
  • ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行新提交的任务。

ThreadPoolExecutor执行execute方法分下面4种情况

  • 如果当前运行的线程少于corePoolSize,则创建新的线程来执行任务(执行这一步骤需要获取全局锁)
  • 如果运行的线程等于或者多于corePoolSize,则将任务加入到BlockingQueue
  • 如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(执行这一步骤需要获取全局锁)
  • 如果创建新线程将当前运行的线程超出maxnumPoolSize,任务被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。

关注微信公众号:【入门小站】,解锁更多知识点

查看原文

赞 0 收藏 0 评论 0

入门小站 发布了文章 · 1月23日

Java高并发BlockingQueue重要的实现类二

DelayQueue

DelayQueue是一个无界阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部是延迟期满后保存时间最长的Delayed元素。
存放到DelayDeque的元素必须继承Delayed接口。Delayed接口使对象成为延迟对象,它使存放在DelayQueue类中的对象具有了激活日期,该接口强制执行下列两个方法:
  • CompareTo(Delayed o):Delayed接口继承了Comparable接口,因此有了这个方法
  • getDelay(TimeUnit unit):这个方法返回到激活日期的剩余时间,时间单位由单位参数指定

DelayQueue使用场景

  • 关闭空闲链接。服务器中,有很多客户端链接,空闲一段时间后需要关闭。
  • 缓存超过了缓存时间,就需要从缓存中移除。

DelayQueue超时订单处理案例

package com.rumenz.learn.delayqueue;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

//DelayQueue里面的元素必须实现Delayed

public class Item<T> implements Delayed {

    private Long expireTime;
    private T data;

    public Item(Long expireTime, T data) {
        this.expireTime = expireTime+System.currentTimeMillis();
        this.data = data;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        long d = unit.convert(this.expireTime - System.currentTimeMillis(),unit);
        return d;
    }

    @Override
    public int compareTo(Delayed o) {
        long d=getDelay(TimeUnit.MILLISECONDS)-o.getDelay(TimeUnit.MILLISECONDS);
        if(d==0){
            return 0;
        }
        return d>0?1:-1;
    }

    public Long getExpireTime() {
        return expireTime;
    }

    public void setExpireTime(Long expireTime) {
        this.expireTime = expireTime;
    }

    public T getData() {
        return data;
    }

    public void setData(T data) {
        this.data = data;
    }
}


// 订单实体类
package com.rumenz.learn.delayqueue;
public class OrderItem {
    private Double orderAmount;
    private String orderNo;
    //0未支付 1支付了
    private Integer orderStatus;

    public OrderItem(Double orderAmount, String orderNo, Integer orderStatus) {
        this.orderAmount = orderAmount;
        this.orderNo = orderNo;
        this.orderStatus = orderStatus;
    }

    public Double getOrderAmount() {
        return orderAmount;
    }

    public void setOrderAmount(Double orderAmount) {
        this.orderAmount = orderAmount;
    }

    public String getOrderNo() {
        return orderNo;
    }

    public void setOrderNo(String orderNo) {
        this.orderNo = orderNo;
    }

    public Integer getOrderStatus() {
        return orderStatus;
    }

    public void setOrderStatus(Integer orderStatus) {
        this.orderStatus = orderStatus;
    }
}

//

package com.rumenz.learn.delayqueue;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.*;


public class DelayQueueExample {
    //3个线程 1个线程下单 1个线程支付  1个线程关闭超时订单  订单支付超时时间为10s
    public static void main(String[] args) {

        ExecutorService executorService = Executors.newFixedThreadPool(3);
        DelayQueue<Item<OrderItem>> delayeds = new DelayQueue<>();
        ConcurrentMap<String, OrderItem> map = new ConcurrentHashMap<>();

        //下单线程
        executorService.execute(()->{
            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

            Integer orderNo=100;
            while (true){
                try{
                    Thread.sleep(3000);
                    Integer amount = new Random().nextInt(1000);
                    OrderItem orderItem=new OrderItem(amount.doubleValue(), String.valueOf(orderNo), 0);
                    Item<OrderItem> item=new Item<>(10*1000L,orderItem);
                    Date date=new Date();
                    date.setTime(item.getExpireTime());

                    System.out.println("=======================下单==========================");

                    System.out.println("生成订单时间:"+simpleDateFormat.format(new Date()));
                    System.out.println("订单编号:"+orderNo);
                    System.out.println("订单金额:"+orderItem.getOrderAmount());
                    System.out.println("支付过期时间:"+simpleDateFormat.format(date));
                    System.out.println("========================下单=========================");


                    map.put(String.valueOf(orderNo),orderItem);
                    orderNo++;
                    delayeds.offer(item);


                }catch (Exception e){
                    e.printStackTrace();
                }

            }

        });
        //支付线程
        executorService.execute(()->{
            while (true){
                try {
                    //随机等待 再支付
                    Thread.sleep(new Random().nextInt(15)*1000);
                    String orderNo="";
                    Iterator<Map.Entry<String, OrderItem>> iterator = map.entrySet().iterator();
                    if(iterator.hasNext()){
                        OrderItem orderItem = iterator.next().getValue();
                        orderItem.setOrderStatus(1);
                        orderNo=orderItem.getOrderNo();
                        System.out.println("-----------------------支付订单-----------------------");
                        System.out.println("订单支付"+orderNo);
                        System.out.println("支付金额"+orderItem.getOrderAmount());
                        System.out.println("-----------------------支付订单-----------------------");
                    }
                    map.remove(orderNo);

                }catch (Exception e){
                    e.printStackTrace();
                }
            }

        });
        //关系过期的订单
        executorService.execute(()->{
            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            while (true){
                try{
                    Item<OrderItem> item = delayeds.take();
                    OrderItem data = item.getData();
                    Date date=new Date();
                    date.setTime(item.getExpireTime());
                    if(data.getOrderStatus()==0){
                        System.out.println("########################过期订单########################");
                        System.out.println("订单编号:"+data.getOrderNo());
                        System.out.println("订单金额:"+data.getOrderAmount());
                        System.out.println("订单到期支付时间:"+simpleDateFormat.format(date));
                        System.out.println("########################过期订单########################");
                    }


                    map.remove(data.getOrderNo());
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        });
        executorService.shutdown();
    }
}

SynchronousQueue

它是一个特殊的队列交做同步队列,特点是当一个线程往队列里写一个元素,写入操作不会理解返回,需要等待另外一个线程来将这个元素拿走。同理,当一个读线程做读操作的时候,同样需要一个相匹配写线程的写操作。这里的Synchronous指的就是读写线程需要同步,一个读线程匹配一个写线程,同理一个写线程匹配一个读线程。

不像ArrayBlockingQueueLinkedBlockingDeque之类的阻塞队列依赖AQS实现并发操作,SynchronousQueue直接使用CAS实现线程的安全访问。

较少使用到 SynchronousQueue 这个类,不过它在线程池的实现类 ScheduledThreadPoolExecutor 中得到了应用。

public class SynchronousQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    //内部栈
    static final class TransferStack<E> extends Transferer<E> {}
    //内部队列
    static final class TransferQueue<E> extends Transferer<E> {}
    public SynchronousQueue() {this(false);}
    public SynchronousQueue(boolean fair) {
        transferer = fair ? 
                 new TransferQueue<E>() : new TransferStack<E>();
    }
}

SynchronousQueue代码演示

package com.rumenz.learn.synchronousqueue;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class SynchronousQueueExample {
    public static void main(String[] args) {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.execute(()->{
          try {
              System.out.println(Thread.currentThread().getName()+"put 1");
              queue.put("1");
              System.out.println(Thread.currentThread().getName()+"put 2");
              queue.put("2");
              System.out.println(Thread.currentThread().getName()+"put 3");
              queue.put("3");
              System.out.println(Thread.currentThread().getName()+"put 4");
              queue.put("4");

          }catch (Exception e){
              e.printStackTrace();
          }

        });
        executorService.execute(()->{
            try{
              TimeUnit.SECONDS.sleep(1);
              System.out.println("获取数据:"+queue.take());

              TimeUnit.SECONDS.sleep(1);
              System.out.println("获取数据:"+queue.take());

              TimeUnit.SECONDS.sleep(1);
              System.out.println("获取数据:"+queue.take());


              TimeUnit.SECONDS.sleep(1);
              System.out.println("获取数据:"+queue.take());

            }catch (Exception e){
                e.printStackTrace();
            }

        });
        executorService.shutdown();
    }

}

关注微信公众号:【入门小站】,关注更多知识点

查看原文

赞 0 收藏 0 评论 0

入门小站 发布了文章 · 1月22日

Java高并发BlockingQueue重要的实现类

ArrayBlockingQueue

有界的阻塞队列,内部是一个数组,有边界的意思是:容量是有限的,必须进行初始化,指定它的容量大小,以先进先出的方式存储数据,最新插入的在对尾,最先移除的对象在头部。
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
    /** 队列元素 */
    final Object[] items;

    /** 下一次读取操作的位置, poll, peek or remove */
    int takeIndex;

    /** 下一次写入操作的位置, offer, or add */
    int putIndex;

    /** 元素数量 */
    int count;
    
    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     * 它采用一个 ReentrantLock 和相应的两个 Condition 来实现。
     */

    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;
    
    /** 指定大小 */
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }
    
    /** 
     * 指定容量大小与指定访问策略 
     * @param fair 指定独占锁是公平锁还是非公平锁。非公平锁的吞吐量比较高,公平锁可以保证每次都是等待最久的线程获取到锁;
     */
    public ArrayBlockingQueue(int capacity, boolean fair) {}
    
    /** 
     * 指定容量大小、指定访问策略与最初包含给定集合中的元素 
     * @param c 将此集合中的元素在构造方法期间就先添加到队列中 
     */
    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {}
}
  • ArrayBlockingQueue在生产者放入数据和消费者获取数据,都是共用一个锁对象,由此也意味着两者无法真正并行运行。按照实现原理来分析,ArrayBlockingQueue完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。然而事实上并没有如此,因为ArrayBlockingQueue的数据写入已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其在性能上完全占不到任何便宜。
  • 通过构造函数得知,参数fair控制对象内部是否采用公平锁,默认采用非公平锁。
  • items、takeIndex、putIndex、count等属性并没有使用volatile修饰,这是因为访问这些变量(通过方法获取)使用都在锁内,并不存在可见性问题,如size()
  • 另外有个独占锁lock用来对出入对操作加锁,这导致同时只有一个线程可以访问入队出队。

Put源码分析

/** 进行入队操作 */
public void put(E e) throws InterruptedException {
        //e为null,则抛出NullPointerException异常
        checkNotNull(e);
        //获取独占锁
        final ReentrantLock lock = this.lock;
        /**
         * lockInterruptibly()
         * 获取锁定,除非当前线程为interrupted
         * 如果锁没有被另一个线程占用并且立即返回,则将锁定计数设置为1。
         * 如果当前线程已经保存此锁,则保持计数将递增1,该方法立即返回。
         * 如果锁被另一个线程保持,则当前线程将被禁用以进行线程调度,并且处于休眠状态
         * 
         */
        lock.lockInterruptibly();
        try {
            //空队列
            while (count == items.length)
                //进行条件等待处理
                notFull.await();
            //入队操作
            enqueue(e);
        } finally {
            //释放锁
            lock.unlock();
        }
    }
    
    /** 真正的入队 */
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        //获取当前元素
        final Object[] items = this.items;
        //按下一个插入索引进行元素添加
        items[putIndex] = x;
        // 计算下一个元素应该存放的下标,可以理解为循环队列
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        //唤起消费者
        notEmpty.signal();
}
这里由于在操作共享变量前加了锁,所以不存在内存不可见问题,加锁后获取的共享变量都是从主内存中获取的,而不是在CPU缓存或者寄存器里面的值,释放锁后修改的共享变量值会刷新到主内存。

另外这个队列使用循环数组实现,所以在计算下一个元素存放下标时候有些特殊。另外insert后调用notEmpty.signal();是为了激活调用notEmpty.await();阻塞后放入notEmpty条件队列的线程。

Take源码分析

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        //这里有些特殊
        if (itrs != null)
            //保持队列中的元素和迭代器的元素一致
            itrs.elementDequeued();
        notFull.signal();
        return x;
}
Take操作和Put操作很类似
//该类的迭代器,所有的迭代器共享数据,队列改变会影响所有的迭代器

transient Itrs itrs = null; //其存放了目前所创建的所有迭代器。

/**
* 迭代器和它们的队列之间的共享数据,允许队列元素被删除时更新迭代器的修改。
*/
class Itrs {
        void elementDequeued() {
            // assert lock.getHoldCount() == 1;
            if (count == 0)
                //队列中数量为0的时候,队列就是空的,会将所有迭代器进行清理并移除
                queueIsEmpty();
            //takeIndex的下标是0,意味着队列从尾中取完了,又回到头部获取
            else if (takeIndex == 0)
                takeIndexWrapped();
        }
        
        /**
         * 当队列为空的时候做的事情
         * 1. 通知所有迭代器队列已经为空
         * 2. 清空所有的弱引用,并且将迭代器置空
         */
        void queueIsEmpty() {}
        
        /**
         * 将takeIndex包装成0
         * 并且通知所有的迭代器,并且删除已经过期的任何对象(个人理解是置空对象)
         * 也直接的说就是在Blocking队列进行出队的时候,进行迭代器中的数据同步,保持队列中的元素和迭代器的元素是一致的。
         */
        void takeIndexWrapped() {}
}

Itrs迭代器创建的时机

//从这里知道,在ArrayBlockingQueue对象中调用此方法,才会生成这个对象
//那么就可以理解为,只要并未调用此方法,则ArrayBlockingQueue对象中的Itrs对象则为空
public Iterator<E> iterator() {
        return new Itr();
    }
    
    private class Itr implements Iterator<E> {
        Itr() {
            //这里就是生产它的地方
            //count等于0的时候,创建的这个迭代器是个无用的迭代器,可以直接移除,进入detach模式。
            //否则就把当前队列的读取位置给迭代器当做下一个元素,cursor存储下个元素的位置。
            if (count == 0) {
                // assert itrs == null;
                cursor = NONE;
                nextIndex = NONE;
                prevTakeIndex = DETACHED;
            } else {
                final int takeIndex = ArrayBlockingQueue.this.takeIndex;
                prevTakeIndex = takeIndex;
                nextItem = itemAt(nextIndex = takeIndex);
                cursor = incCursor(takeIndex);
                if (itrs == null) {
                    itrs = new Itrs(this);
                } else {
                    itrs.register(this); // in this order
                    itrs.doSomeSweeping(false);
                }
                prevCycles = itrs.cycles;
                // assert takeIndex >= 0;
                // assert prevTakeIndex == takeIndex;
                // assert nextIndex >= 0;
                // assert nextItem != null;
                }
        }
}

代码演示

package com.rumenz.task;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @className: BlockingQuqueExample
 * @description: TODO 类描述
 * @author: mac
 * @date: 2021/1/20
 **/
public class BlockingQueueExample {

    private static volatile   Boolean flag=false;

    public static void main(String[] args) {



        BlockingQueue blockingQueue=new ArrayBlockingQueue(1024);
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        executorService.execute(()->{
             try{
                 blockingQueue.put(1);
                 Thread.sleep(2000);
                 blockingQueue.put(3);
                 flag=true;
             }catch (Exception e){
                 e.printStackTrace();
             }
        });

        executorService.execute(()->{
            try {

                while (!flag){
                    Integer i = (Integer) blockingQueue.take();
                    System.out.println(i);
                }

            }catch (Exception e){
                e.printStackTrace();
            }

        });

        executorService.shutdown();
    }
}

LinkedBlockingQueue

基于链表的阻塞队列,通ArrayBlockingQueue类似,其内部也维护这一个数据缓冲队列(该队列由一个链表构成),当生产者往队列放入一个数据时,队列会从生产者手上获取数据,并缓存在队列的内部,而生产者立即返回,只有当队列缓冲区到达最大值容量时(LinkedBlockingQueue可以通过构造函数指定该值),才会阻塞队列,直到消费者从队列中消费掉一份数据,生产者会被唤醒,反之对于消费者这端的处理也基于同样的原理。

LinkedBlockingQueue之所以能够高效的处理并发数据,还因为其对于生产者和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行的操作队列中的数据,以调高整个队列的并发能力。

如果构造一个LinkedBlockingQueue对象,而没有指定容量大小,LinkedBlockingQueue会默认一个类似无限大小的容量Integer.MAX_VALUE,这样的话,如果生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已经被消耗殆尽了。

LinkedBlockingQueue是一个使用链表完成队列操作的阻塞队列。链表是单向链表,而不是双向链表。

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
    //队列的容量,指定大小或为默认值Integer.MAX_VALUE
    private final int capacity;
    
    //元素的数量
    private final AtomicInteger count = new AtomicInteger();
    
    //队列头节点,始终满足head.item==null
    transient Node<E> head;
    
    //队列的尾节点,始终满足last.next==null
    private transient Node<E> last;
    
    /** Lock held by take, poll, etc */
    //出队的锁:take, poll, peek 等读操作的方法需要获取到这个锁
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    //当队列为空时,保存执行出队的线程:如果读操作的时候队列是空的,那么等待 notEmpty 条件
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    //入队的锁:put, offer 等写操作的方法需要获取到这个锁
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    //当队列满时,保存执行入队的线程:如果写操作的时候队列是满的,那么等待 notFull 条件
    private final Condition notFull = putLock.newCondition();
    
    //传说中的无界队列
    public LinkedBlockingQueue() {}
    //传说中的有界队列
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }
    //传说中的无界队列
    public LinkedBlockingQueue(Collection<? extends E> c){}
    
    /**
     * 链表节点类
     */
    static class Node<E> {
        E item;

        /**
         * One of:
         * - 真正的继任者节点
         * - 这个节点,意味着继任者是head.next
         * - 空,意味着没有后继者(这是最后一个节点)
         */
        Node<E> next;

        Node(E x) { item = x; }
    }
}
通过其构造函数,得知其可以当做无界队列也可以当做有界队列来使用。

这里用了两把锁分别是takeLockputLock,而Condition分别是notEmptynotFull,它们是这样搭配的。

  • 如果需要获取(take)一个元素,需要获取takeLock锁,但是获取了锁还不够,如果队列此时为空,还需要队列不为空(notEmpty)这个条件(Condition)。
  • 如果要插入(put)一个元素,需要获取putLock锁,但是获取了锁还不够,如果队列此时已满,还是需要队列不满(notFull)的这个条件(Condition)。
从上面的构造函数中可以看到,这里会初始化一个空的头结点,那么第一个元素入队的时候,队列中就会有两个元素。读取元素时,也是获取头结点后面的一个元素。count的计数值不包含这个头结点。

Put源码分析

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {    
    /**
     * 将指定元素插入到此队列的尾部,如有必要,则等待空间变得可用。
     */
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // 如果你纠结这里为什么是 -1,可以看看 offer 方法。这就是个标识成功、失败的标志而已。
        int c = -1;
        //包装成node节点
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        //获取锁定
        putLock.lockInterruptibly();
        try {
            /** 如果队列满,等待 notFull 的条件满足。 */
            while (count.get() == capacity) {
                notFull.await();
            }
            //入队
            enqueue(node);
            //原子性自增
            c = count.getAndIncrement();
            // 如果这个元素入队后,还有至少一个槽可以使用,调用 notFull.signal() 唤醒等待线程。
            // 哪些线程会等待在 notFull 这个 Condition 上呢?
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
        //解锁
            putLock.unlock();
        }
        // 如果 c == 0,那么代表队列在这个元素入队前是空的(不包括head空节点),
        // 那么所有的读线程都在等待 notEmpty 这个条件,等待唤醒,这里做一次唤醒操作
        if (c == 0)
            signalNotEmpty();
    }
    
    /** 链接节点在队列末尾 */
    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        // 入队的代码非常简单,就是将 last 属性指向这个新元素,并且让原队尾的 next 指向这个元素
        //last.next = node;
        //last = node;
        // 这里入队没有并发问题,因为只有获取到 putLock 独占锁以后,才可以进行此操作
        last = last.next = node;
    }
    
    /**
     * 等待PUT信号
     * 仅在 take/poll 中调用
     * 也就是说:元素入队后,如果需要,则会调用这个方法唤醒读线程来读
     */
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();//唤醒
        } finally {
            putLock.unlock();
        }
    }
}

Take源码分析

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {   
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        //首先,需要获取到 takeLock 才能进行出队操作
        takeLock.lockInterruptibly();
        try {
            // 如果队列为空,等待 notEmpty 这个条件满足再继续执行
            while (count.get() == 0) {
                notEmpty.await();
            }
            //// 出队
            x = dequeue();
            //count 进行原子减 1
            c = count.getAndDecrement();
            // 如果这次出队后,队列中至少还有一个元素,那么调用 notEmpty.signal() 唤醒其他的读线程
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }
    
    /**
     * 出队
     */
    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }
    
    /**
     * Signals a waiting put. Called only from take/poll.
     */
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }
}

ArrayBlockingQueue对比

ArrayBlockingQueue是共享锁,粒度大,入队与出队的时候只能有1个被执行,不允许并行执行。LinkedBlockingQueue是独占锁,入队与出队是可以并行进行的。当然这里说的是读和写进行并行,两者的读读与写写是不能并行的。总结就是LinkedBlockingQueue可以并发读写。

ArrayBlockingQueue和LinkedBlockingQueue间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的Node对象。这在长时间内需要高效并发地处理大批量数据的系统中,其对于GC的影响还是存在一定的区别。

LinkedBlockingQueue实现一个线程添加文件对象,四个线程读取文件对象

package concurrent;
import java.io.File;
import java.io.FileFilter;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class TestBlockingQueue {
  static long randomTime() {
    return (long) (Math.random() * 1000);
  }

  public static void main(String[] args) {
    // 能容纳100个文件
    final BlockingQueue<File> queue = new LinkedBlockingQueue<File>(100);
    // 线程池
    final ExecutorService exec = Executors.newFixedThreadPool(5);
    final File root = new File("F:\\JavaLib");
    // 完成标志
    final File exitFile = new File("");
    // 读个数
    final AtomicInteger rc = new AtomicInteger();
    // 写个数
    final AtomicInteger wc = new AtomicInteger();
    // 读线程
    Runnable read = new Runnable() {
      public void run() {
        scanFile(root);
        scanFile(exitFile);
      }

      public void scanFile(File file) {
        if (file.isDirectory()) {
          File[] files = file.listFiles(new FileFilter() {
            public boolean accept(File pathname) {
              return pathname.isDirectory()
                  || pathname.getPath().endsWith(".java");
            }
          });
          for (File one : files)
            scanFile(one);
        } else {
          try {
            int index = rc.incrementAndGet();
            System.out.println("Read0: " + index + " "
                + file.getPath());
            queue.put(file);
          } catch (InterruptedException e) {
          }
        }
      }
    };
    exec.submit(read);
    // 四个写线程
    for (int index = 0; index < 4; index++) {
      // write thread
      final int NO = index;
      Runnable write = new Runnable() {
        String threadName = "Write" + NO;
        public void run() {
          while (true) {
            try {
              Thread.sleep(randomTime());
              int index = wc.incrementAndGet();
              File file = queue.take();
              // 队列已经无对象
              if (file == exitFile) {
                // 再次添加"标志",以让其他线程正常退出
                queue.put(exitFile);
                break;
              }
              System.out.println(threadName + ": " + index + " "
                  + file.getPath());
            } catch (InterruptedException e) {
            }
          }
        }
      };
      exec.submit(write);
    }
    exec.shutdown();
  }
}

关注微信公众号:【入门小站】,解锁更多知识点。

查看原文

赞 0 收藏 0 评论 0

认证与成就

  • 获得 14 次点赞
  • 获得 1 枚徽章 获得 0 枚金徽章, 获得 0 枚银徽章, 获得 1 枚铜徽章

擅长技能
编辑

(゚∀゚ )
暂时没有

开源项目 & 著作
编辑

(゚∀゚ )
暂时没有

注册于 2018-07-05
个人主页被 2.9k 人浏览