Sumkor

Sumkor 查看完整档案

广州编辑  |  填写毕业院校  |  填写所在公司/组织填写个人主网站
编辑

会写点代码

个人动态

Sumkor 关注了用户 · 3月30日

Ressmix @ressmix

SegmentFault上的文章不再更新,抄袭的太多。分布式系列(理论篇、进阶篇、实战篇)共八十篇文章已经全部在公众号和我的主站免费分享。源码篇(Kafka、Zookeeper、Spring Cloud Netflix、Hadoop、Seata)19年中已全部写完,暂不公开,后续可能出版。

关注 370

Sumkor 发布了文章 · 3月29日

阅读 JDK 8 源码:松弛队列 ConcurrentLinkedQueue

ConcurrentLinkedQueue 是一个由链表结构组成的无界非阻塞队列,是 JDK 中唯一一个并发安全的非阻塞队列,使用无锁算法来保证线程安全。为了减少 CAS 操作造成的资源争夺损耗,其链表结构被设计为“松弛”的(Slack)。本文对 ConcurrentLinkedQueue 的入队和出队过程进行图解,直观展示其内部结构。

1. 继承体系

java.util.concurrent.ConcurrentLinkedQueue

public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
        implements Queue<E>, java.io.Serializable

继承体系

2. 数据结构

ConcurrentLinkedQueue 的数据结构为链表。

2.1 链表节点

需要注意的是,item 为空表示无效节点,非空表示有效节点。
无效节点是需要从链表中清理掉的节点,ConcurrentLinkedQueue 队列中为什么要存储无效节点呢,继续往下看。

java.util.concurrent.ConcurrentLinkedQueue.Node

private static class Node<E> {
    volatile E item;       // 节点的数据
    volatile Node<E> next; // 下一个节点
    
    /**
     * Constructs a new node.  Uses relaxed write because item can
     * only be seen after publication via casNext.
     */
    Node(E item) {
        UNSAFE.putObject(this, itemOffset, item);
    }

    boolean casItem(E cmp, E val) {
        return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }

    // 相比 putObjectVolatile(),putOrderedObject() 不保证内存可见性,但是性能较高
    void lazySetNext(Node<E> val) {
        UNSAFE.putOrderedObject(this, nextOffset, val);
    }

    boolean casNext(Node<E> cmp, Node<E> val) {
        return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }
}    

2.2 head 和 tail 节点

队列中定义了 head 和 tail 节点。
由于采用了非阻塞算法(non-blocking algorithms),head 和 tail 节点并不严格指向链表的头尾节点,也就是每次入队出队操作并不会及时更新 head 和 tail 节点。
通过规定“不变式”和“可变式”来维护非阻塞算法的正确性。

不变式:并发对象需要一直保持的特性。
不变式是并发对象的各个方法之间必须遵守的“契约”,每个方法在调用前和调用后都必须保持不变式。
采用不变式,就可以隔离的分析每个方法,而不用考虑它们之间所有可能的交互。

基本不变式

在执行方法之前和之后,队列必须要保持的不变式(The fundamental invariants):

  • 当入队插入新节点之后,队列中有一个 next 域为 null 的节点(真正的尾节点)。
  • 从 head 开始遍历队列,可以访问所有 item 域不为 null 的节点(有效节点)。

head 的不变式和可变式

/**
 * A node from which the first live (non-deleted) node (if any)
 * can be reached in O(1) time.
 * Invariants:
 * - all live nodes are reachable from head via succ()
 * - head != null
 * - (tmp = head).next != tmp || tmp != head
 * Non-invariants:
 * - head.item may or may not be null.
 * - it is permitted for tail to lag behind head, that is, for tail
 *   to not be reachable from head!
 */
private transient volatile Node<E> head;

head 的不变式:

  • 所有的有效节点,都能从 head 通过调用 succ() 方法遍历可达。
  • head 不能为 null。
  • head 节点的 next 域不能引用到自身。

head 的可变式:

  • head 节点的 item 域可能为 null,也可能不为 null。
  • 允许 tail 滞后(lag behind)于 head,也就是说:从 head 开始遍历队列,不一定能到达 tail。

tail 的不变式和可变式

/**
 * A node from which the last node on list (that is, the unique
 * node with node.next == null) can be reached in O(1) time.
 * Invariants:
 * - the last node is always reachable from tail via succ()
 * - tail != null
 * Non-invariants:
 * - tail.item may or may not be null.
 * - it is permitted for tail to lag behind head, that is, for tail
 *   to not be reachable from head!
 * - tail.next may or may not be self-pointing to tail.
 */
private transient volatile Node<E> tail;

tail 的不变式:

  • 通过 tail 调用 succ() 方法,最后节点总是可达的。
  • tail 不能为 null。

tail 的可变式:

  • tail 节点的 item 域可能为 null,也可能不为 null。
  • 允许 tail 滞后于 head,也就是说:从 head 开始遍历队列,不一定能到达 tail。
  • tail 节点的 next 域可以引用到自身。

3. 构造函数

默认创建空节点,head 和 tail 都指向该节点。

/**
 * Creates a {@code ConcurrentLinkedQueue} that is initially empty.
 */
public ConcurrentLinkedQueue() {
    head = tail = new Node<E>(null);
}

/**
 * Creates a {@code ConcurrentLinkedQueue}
 * initially containing the elements of the given collection,
 * added in traversal order of the collection's iterator.
 *
 * @param c the collection of elements to initially contain
 * @throws NullPointerException if the specified collection or any
 *         of its elements are null
 */
public ConcurrentLinkedQueue(Collection<? extends E> c) {
    Node<E> h = null, t = null;
    for (E e : c) {
        checkNotNull(e);
        Node<E> newNode = new Node<E>(e);
        if (h == null)
            h = t = newNode;
        else {
            t.lazySetNext(newNode);
            t = newNode;
        }
    }
    if (h == null)
        h = t = new Node<E>(null);
    head = h;
    tail = t;
}

4. 入队

4.1 源码分析

因为是无界队列,add(e)方法不用抛出异常。不支持添加 null。

java.util.concurrent.ConcurrentLinkedQueue#add

/**
 * Inserts the specified element at the tail of this queue.
 * As the queue is unbounded, this method will never throw
 * {@link IllegalStateException} or return {@code false}.
 *
 * @return {@code true} (as specified by {@link Collection#add})
 * @throws NullPointerException if the specified element is null
 */
public boolean add(E e) {
    return offer(e);
}

入队的核心逻辑:

java.util.concurrent.ConcurrentLinkedQueue#offer

/**
 * Inserts the specified element at the tail of this queue.
 * As the queue is unbounded, this method will never return {@code false}.
 *
 * @return {@code true} (as specified by {@link Queue#offer})
 * @throws NullPointerException if the specified element is null
 */
public boolean offer(E e) {
    checkNotNull(e);
    final Node<E> newNode = new Node<E>(e);

    // 注意tail不一定是尾节点(甚至tail有可能存在于废弃的链上,后有解释),但是也不妨从tail节点开始遍历链表
    for (Node<E> t = tail, p = t;;) { // 初始时t和p都指向tail节点
        Node<E> q = p.next;
        if (q == null) { // 使用p.next是否为空来判断p是否是尾节点,比较准确
            // p is last node // 进入这里说明此时p是尾节点
            if (p.casNext(null, newNode)) { // 若节点p的下一个节点为null,则设置为newNode
                // Successful CAS is the linearization point
                // for e to become an element of this queue,
                // and for newNode to become "live".
                if (p != t) // hop two nodes at a time 
                // 不管p与t是否相同,都应该casTail。但是这里只在p与t不同时才casTail,导致tail节点不总是尾节点,目的是减少对tail的CAS
                    casTail(t, newNode);  // Failure is OK. // 将尾节点tail由t改为newNode,更新失败了也没关系,因为tail是不是尾节点不重要:)
                return true;
            }
            // Lost CAS race to another thread; re-read next // CAS失败,说明其他线程先一步操作使得p的下一个节点不为null,需重新获取尾节点
        }
        else if (p == q) // 如果p的next等于p,说明p已经出队了,需要重新设置p、t的值
            // We have fallen off list.  If tail is unchanged, it
            // will also be off-list, in which case we need to
            // jump to head, from which all live nodes are always
            // reachable.  Else the new tail is a better bet. 
            // 1. 若节点t不再是tail,说明其他线程加入过元素(修改过tail),则取最新tail作为t和p,从新的tail节点继续遍历链表
            // 2. 若节点t依旧是tail,说明从tail节点开始遍历链表已经不管用了,则把head作为p,从head节点从头遍历链表(注意这一步造成后续遍历中p!=t成立)
            p = (t != (t = tail)) ? t : head;
            // 这里没有更新tail,仍留在废链上
        else
            // Check for tail updates after two hops. 
            // 进入这里,说明p.next不为null,且p未出队,需要判断:
            // 1. 若p与t相等,则t留在原位,p=p.next一直往下遍历(注意这一步造成后续遍历中p!=t成立)。
            // 2. 若p与t不等,需进一步判断t与tail是否相等。若t不为tail,则取最新tail作为t和p;若t为tail,则p=p.next一直往下遍历。
            // 就是说从tail节点往后遍历链表的过程,需时刻关注tail是否发生变化
            p = (p != t && t != (t = tail)) ? t : q;  
    }
}

更新 tail 节点:

java.util.concurrent.ConcurrentLinkedQueue#casTail

private boolean casTail(Node<E> cmp, Node<E> val) {
    return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
}

入队的基本思想:

  1. 从 tail 节点开始遍历到尾节点,若定位到尾节点(p.next == null),则入队。
  2. 遍历过程中,如果遍历到无效节点(p.next == p),需要重新从有效节点(tail 或 head)开始遍历。
  3. 遍历过程中,时刻关注 tail 节点是否无效。若无效了需要重新从最新的 tail 开始遍历,否则继续遍历当前的下一个节点。

需要注意的点:

  1. 入队过程中没有频繁执行 casTail(出队过程不会执行 casTail),因此 tail 位置有滞后,不一定指向尾节点,甚至可能位于废弃的链上。
  2. 使用 p.next == null 来判断尾节点,比使用 tail 准确。
  3. 通过 tail 遍历节点可能会遍历到无效节点,但是从 head 遍历总能访问到有效节点。

4.2 入队过程图示

执行offer(e)入队,tail 并不总是指向尾节点,多个元素入队过程如下:

添加第一个元素(t 与 p 相等,不会更新 tail):

图片.png

添加第二个元素(t 与 p 不相等,更新 tail):

图片.png

添加第三个元素(t 与 p 相等,不会更新 tail):

图片.png

4.3 tail 位于废弃链

由于出队 poll() 逻辑并不会执行 casTail() 来维护 tail 所在位置,因此 tail 可能滞后于 head,甚至位于废弃链上,如下图所示:

图片.png

此时从 tail 往后遍历会访问到无效节点 p,该节点满足 p == p.next

如果想要继续访问到有效节点,需分两种情况:

  1. 从遍历开始至今,tail 的位置无变化,此时需要从 head 节点开始往下才能遍历到有效节点。
  2. 从遍历开始至今,tail 的位置发生了变化,说明其他线程更新了 tail 的位置,此时从新的 tail 开始往下遍历即可。

图片.png

5. 出队

java.util.concurrent.ConcurrentLinkedQueue#poll

public E poll() {
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) { // 初始时h和p都指向head节点,从head节点开始遍历链表
            E item = p.item;

            if (item != null && p.casItem(item, null)) { // p.item不为空,把p节点的数据域设为空,返回p节点的数据
                // Successful CAS is the linearization point
                // for item to be removed from this queue.
                if (p != h) // hop two nodes at a time
                    // 若p.next不为空,则把p.next设为头节点,把h和p出队;若p.next为空,则把p设为头节点,把h出队
                    updateHead(h, ((q = p.next) != null) ? q : p); 
                return item;
            }
            else if ((q = p.next) == null) { // 进入这里,说明p.item必然为空。若p.next也为空,说明队列中没有数据了,需要返回null
                updateHead(h, p); // 把头节点设为p,把h出队
                return null;
            }
            else if (p == q) // 如果p的next等于p,说明p已经出队了,重新从头节点开始遍历
                continue restartFromHead;
            else
                p = q; // p = p.next 继续遍历链表
        }
    }
}

更新 head 节点:

java.util.concurrent.ConcurrentLinkedQueue#updateHead

/**
 * Tries to CAS head to p. If successful, repoint old head to itself
 * as sentinel for succ(), below.
 */
final void updateHead(Node<E> h, Node<E> p) {
    if (h != p && casHead(h, p)) // 节点h和p不等,且当前头节点为h,则把头节点设为p
        h.lazySetNext(h); // 原头节点h的next指向自身,表示h出队
}

出队的基本思想:

  1. 从 head 节点开始遍历找出首个有效节点(p.item != null),返回该节点的数据(p.item)。
  2. 遍历过程中,如果遍历到尾节点(p.next == null),则返回空。
  3. 遍历过程中,如果遍历到无效节点(p.next == p),说明其他线程修改了 head,需要重新从有效节点(新的 head)开始遍历。

需要注意的是,并不是每次出队时都执行 updateHead() 更新 head 节点:

  1. 当 head 节点里有元素时,直接弹出 head 节点里的元素,而不会更新 head 节点。
  2. 只有当 head 节点里没有元素时,出队操作才会更新 head 节点。

采用这种方式同样是为了减少使用 CAS 更新 head 节点的消耗,从而提高出队效率。

5.1 出队过程图示

场景一:队列中具有两个节点,头节点为无效节点。由于 p != h,此时需要把头节点出队。

图片.png

场景二:队列中具有两个节点,头节点为有效节点。由于 p == h,此时不需要把头节点出队。

图片.png

6. 容量

不用定义初始容量,无须扩容,容量最大值为 Integer.MAX_VALUE。

获取队列的容量:从头开始遍历队列中的有效节点,并计数。注意是遍历过程是弱一致的。

java.util.concurrent.ConcurrentLinkedQueue#size

public int size() {
    int count = 0;
    for (Node<E> p = first(); p != null; p = succ(p)) // 从第一个有数据的节点开始,一直遍历链表
        if (p.item != null)
            // Collection.size() spec says to max out
            if (++count == Integer.MAX_VALUE) // 自增直到最大值
                break;
    return count;
}

java.util.concurrent.ConcurrentLinkedQueue#succ

/**
 * Returns the successor of p, or the head node if p.next has been
 * linked to self, which will only be true if traversing with a
 * stale pointer that is now off the list.
 */
final Node<E> succ(Node<E> p) {
    Node<E> next = p.next;
    return (p == next) ? head : next; // 如果p已经出队了,则重新从头节点开始,否则继续遍历下一个节点
}

7. IDEA 调试的问题

编写单元测试,对 ConcurrentLinkedQueue#offer 进行调试。

@Test
public void test() {
    ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
    queue.offer("a");
    queue.offer("b");
    queue.offer("c");
    queue.offer("d");
}

阅读源码可知入队过程并不会修改 head 节点,但是从 IDEA 的 debug 结果看到 head 节点发生了变化!

图片.png

这是因为 IDEA 的 debug 过程会调用 ConcurrentLinkedQueue#toString 导致的。

ConcurrentLinkedQueue#toString 方法会创建迭代器,会调用到 ConcurrentLinkedQueue#first 方法,该方法会将首个有效节点作为头节点。

java.util.concurrent.ConcurrentLinkedQueue.Itr#Itr
java.util.concurrent.ConcurrentLinkedQueue.Itr#advance
java.util.concurrent.ConcurrentLinkedQueue#first

Node<E> first() { // 获取第一个具有非空元素的节点
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            boolean hasItem = (p.item != null);
            if (hasItem || (q = p.next) == null) {
                updateHead(h, p);
                return hasItem ? p : null;
            }
            else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}

关闭 IDEA debug 视图中自动调用 toString 方法:

图片.png

重新打断点调试,可以看到入队后 head 节点不变:

图片.png

8. 总结

  1. ConcurrentLinkedQueue 是非阻塞队列,采用 CAS 和自旋保证并发安全。
  2. ConcurrentLinkedQueue 的 tail 并不是严格指向尾节点,通过减少出队时对 tail 的 CAS 以提高效率。
  3. ConcurrentLinkedQueue 的 head 所指节点可能是空节点,也可能是数据节点,通过减少出队时对 head 的 CAS 以提高效率。
  4. 采用非阻塞算法,允许队列处于不一致状态(head/tail),通过保证不变式和可变式,来维护非阻塞算法的正确性。
  5. 由于是非阻塞队列,无法使用在线程池中。

作者:Sumkor
链接:https://segmentfault.com/a/11...

查看原文

赞 0 收藏 0 评论 0

Sumkor 发布了文章 · 3月29日

阅读 JDK 8 源码:传递队列 SynchronousQueue

SynchronousQueue 是一个由链表或栈结构组成的阻塞队列,采用无锁算法,并发安全。
每个线程的存入操作必须等待一个取出操作与之匹配(反之亦然),否则当前线程将阻塞在队列中等待匹配。
适用于传递性场景,即生产者线程处理的数据直接传递给消费者线程。

1. 继承体系

java.util.concurrent.SynchronousQueue

public class SynchronousQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable

继承体系

2. 数据结构

双重栈(Dual stack)或 双重队列(Dual Queue)。

定义了内部类 Transferer,作为栈和队列实现的公共 API。

// 传输器,即两个线程交换元素使用的东西。提供两种实现方式:队列、栈
private transient volatile Transferer<E> transferer;

3. 构造函数

如果是公平模式就使用队列,如果是非公平模式就使用栈,默认使用栈(非公平)。

/**
 * Creates a {@code SynchronousQueue} with nonfair access policy.
 */
public SynchronousQueue() {
    this(false);
}

/**
 * Creates a {@code SynchronousQueue} with the specified fairness policy.
 *
 * @param fair if true, waiting threads contend in FIFO order for
 *        access; otherwise the order is unspecified.
 */
public SynchronousQueue(boolean fair) {
    transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}

4. 属性

/** The number of CPUs, for spin control */
static final int NCPUS = Runtime.getRuntime().availableProcessors();

// 指定超时时间情况下,当前线程最大自旋次数。 
// CPU小于2,不需要自旋,否则自旋32次。 
static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32; 

// 未指定超时时间情况下,当前线程最大自旋次数。// 16倍
static final int maxUntimedSpins = maxTimedSpins * 16; 

// 指定超时时间情况下,若自旋结束后如果剩余时间大于该值,则阻塞相应时间
static final long spinForTimeoutThreshold = 1000L;

5. 容量

队列容量固定为 0。

/**
 * Always returns {@code true}.
 * A {@code SynchronousQueue} has no internal capacity.
 *
 * @return {@code true}
 */
public boolean isEmpty() {
    return true;
}

/**
 * Always returns zero.
 * A {@code SynchronousQueue} has no internal capacity.
 *
 * @return zero
 */
public int size() {
    return 0;
}

6. 存入取出操作

SynchronousQueue 中的传输器定义了公共方法 Transferer#transfer:

java.util.concurrent.SynchronousQueue.Transferer

/**
 * Shared internal API for dual stacks and queues.
 */
abstract static class Transferer<E> {
    /**
     * Performs a put or take.
     *
     * @param e if non-null, the item to be handed to a consumer;   // 传输的数据元素。非空表示需要向消费者传递数据,为空表示需要向生产者请求数据
     *          if null, requests that transfer return an item
     *          offered by producer.
     * @param timed if this operation should timeout                // 该操作是否会超时
     * @param nanos the timeout, in nanoseconds
     * @return if non-null, the item provided or received; if null, // 非空表示数据元素传递或接收成功,为空表示失败
     *         the operation failed due to timeout or interrupt --  // 失败的原因有两种:1.超时;2.中断,通过 Thread.interrupted 来检测中断
     *         the caller can distinguish which of these occurred
     *         by checking Thread.interrupted.
     */
    abstract E transfer(E e, boolean timed, long nanos);
}

SynchronousQueue 由于继承了 BlockingQueue,遵循方法约定:

     抛出异常    特殊值     阻塞     超时
插入  add(e)     offer(e)  put(e)   offer(e, time, unit)
移除  remove()   poll()    take()   poll(time, unit)
检查  element()  peek()    不可用   不可用

底层都是通过调用 Transferer#transferer 来实现。

add(e)                 transferer.transfer(e, true, 0)
offer(e)               transferer.transfer(e, true, 0)
put(e)                 transferer.transfer(e, false, 0)
offer(e, time, unit)   transferer.transfer(e, true, unit.toNanos(timeout))

remove()               transferer.transfer(null, true, 0)
poll()                 transferer.transfer(null, true, 0)
take()                 transferer.transfer(null, false, 0)
poll(time, unit)       transferer.transfer(null, true, unit.toNanos(timeout))

其中 put 和 take 都是设置为不超时,表示一直阻塞直到完成。

7. 栈实现

栈定义

双重栈(Dual stack),指的是栈中的节点具有两种模式。
TransferStack 进行了扩展,其节点具有三种模式:请求模式、数据模式、匹配模式。

java.util.concurrent.SynchronousQueue.TransferStack

/** Dual stack */
static final class TransferStack<E> extends Transferer<E> {

    /* Modes for SNodes, ORed together in node fields */
    /** Node represents an unfulfilled consumer */
    static final int REQUEST    = 0; // 请求模式,消费者请求数据
    /** Node represents an unfulfilled producer */
    static final int DATA       = 1; // 数据模式,生产者提供数据
    /** Node is fulfilling another unfulfilled DATA or REQUEST */
    static final int FULFILLING = 2; // 匹配模式,表示数据从正一个节点传递给另外的节点
    
    /** Returns true if m has fulfilling bit set. */
    static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
    
    /** The head (top) of the stack */
    volatile SNode head;
    
    boolean casHead(SNode h, SNode nh) {
        return h == head &&
            UNSAFE.compareAndSwapObject(this, headOffset, h, nh);
    }

    /**
     * Creates or resets fields of a node. Called only from transfer
     * where the node to push on stack is lazily created and
     * reused when possible to help reduce intervals between reads
     * and CASes of head and to avoid surges of garbage when CASes
     * to push nodes fail due to contention.
     */
    static SNode snode(SNode s, Object e, SNode next, int mode) {
        if (s == null) s = new SNode(e);
        s.mode = mode;
        s.next = next;
        return s;
    }
}

节点定义

  1. 使用 mode 标记该节点的模式。
  2. 当前节点匹配成功,则 match 设置为所匹配的节点。
  3. 当前节点取消匹配,则 match 设置为自身。
  4. 使用 waiter 存储操作该节点的线程,等待匹配时挂起该线程,匹配成功时需唤醒该线程。

java.util.concurrent.SynchronousQueue.TransferStack.SNode

/** Node class for TransferStacks. */
static final class SNode {
    volatile SNode next;        // next node in stack         // 栈中下一个节点
    volatile SNode match;       // the node matched to this   // 当前节点所匹配的节点
    volatile Thread waiter;     // to control park/unpark     // 等待着的线程
    Object item;                // data; or null for REQUESTs // 数据元素
    int mode;                   // 节点的模式:REQUEST、DATA、FULFILLING
    // Note: item and mode fields don't need to be volatile
    // since they are always written before, and read after,
    // other volatile/atomic operations.

    SNode(Object item) {
        this.item = item;
    }

    // 若下一个节点为cmp,将其替换为节点val
    boolean casNext(SNode cmp, SNode val) {
        return cmp == next &&
            UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    /**
     * Tries to match node s to this node, if so, waking up thread.
     * Fulfillers call tryMatch to identify their waiters.
     * Waiters block until they have been matched.
     *
     * @param s the node to match
     * @return true if successfully matched to s
     */
    // m.tryMatch(s),表示节点m尝试与节点s进行匹配。注意入参节点s由当前线程所持有,而节点m的线程是阻塞状态的(等待匹配)
    boolean tryMatch(SNode s) {
        if (match == null &&
            UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) { // 如果m还没有匹配者,就把s作为它的匹配者,设置m.match为s
            Thread w = waiter;  // 节点m的线程
            if (w != null) {    // waiters need at most one unpark
                waiter = null;
                LockSupport.unpark(w); // 唤醒m中的线程,两者匹配完毕
                // 补充:节点m的线程从TransferStack#awaitFulfill方法中被唤醒,检查得知被匹配成功了,返回匹配的节点
            }
            return true;
        }
        return match == s; // 可能其它线程先一步匹配了m,判断是否是s
    }
    
    /**
     * Tries to cancel a wait by matching node to itself.
     */
    // 将节点的match属性设为自身,表示取消
    void tryCancel() {
        UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
    }

    boolean isCancelled() {
        return match == this;
    }
}    

transfer

基本思想,在循环中尝试三种操作:

  1. 当前栈内为空,或者栈顶节点模式与当前节点模式一致,尝试着把当前节点入栈并且等待匹配。若匹配成功,返回该节点的数据。若取消匹配,则返回空。
  2. 如果栈顶节点模式与当前节点模式互补,尝试把当前节点入栈进行匹配,再把匹配的两个节点弹出栈。
  3. 如果栈顶节点的模式为匹配中,则协助匹配和弹出。

java.util.concurrent.SynchronousQueue.TransferStack#transfer

/**
 * Puts or takes an item.
 */
@SuppressWarnings("unchecked")
E transfer(E e, boolean timed, long nanos) {
    SNode s = null; // constructed/reused as needed
    int mode = (e == null) ? REQUEST : DATA; // 元素为空,为请求模式,否则为数据模式(注意,当前节点入栈前不会是匹配模式!)

    for (;;) { // 自旋CAS
        SNode h = head;
        
        // 模式相同
        if (h == null || h.mode == mode) {  // empty or same-mode // 栈顶没有元素,或者栈顶元素跟当前元素是一个模式的(请求模式或数据模式)
            if (timed && nanos <= 0) {      // can't wait // 已超时
                if (h != null && h.isCancelled()) // 头节点h不为空且已取消
                    casHead(h, h.next);     // pop cancelled node // 把h.next作为新的头节点 // 把已经取消的头节点弹出,并进入下一次循环
                else
                    return null;
            } else if (casHead(h, s = snode(s, e, h, mode))) { // 把节点s入栈,作为新的头节点,s.next指向h(因为是模式相同的,所以只能入栈)
                SNode m = awaitFulfill(s, timed, nanos); // 等待匹配,自旋阻塞当前线程
                if (m == s) {               // wait was cancelled // 等待被取消了,需要清除节点s
                    clean(s);               // 出栈
                    return null;
                }
                if ((h = head) != null && h.next == s) // 执行到这里,说明节点s已匹配成功
                    // 若栈顶有数据,头节点为h,下一个节点为s,需要将节点h和s出栈
                    // 因为是等待匹配,这里是等到了其他线程入栈新的节点,跟当前节点s匹配了
                    casHead(h, s.next);     // help s's fulfiller // 把s.next作为新的头节点
                return (E) ((mode == REQUEST) ? m.item : s.item); // 若当前为请求模式,返回匹配到的节点m的数据;否则返回节点s的数据
            }
        } 
        
        // 模式互补(执行到这里,说明栈顶元素跟当前元素的模式不同)
        else if (!isFulfilling(h.mode)) {   // try to fulfill // 这里判断栈顶模式不是FULFILLING(匹配中),说明是互补的
            if (h.isCancelled())            // already cancelled
                casHead(h, h.next);         // pop and retry // 弹出已取消的栈顶元素,并重试
            else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { // 把节点s入栈,并设为匹配模式(FULFILLING|mode的结果符合TransferStack#isFulfilling)
                for (;;) { // loop until matched or waiters disappear
                    SNode m = s.next;       // m is s's match
                    if (m == null) {        // all waiters are gone 
                        // 如果m为null,说明除了s节点外的节点都被其它线程先一步匹配掉了,就清空栈并跳出内部循环,到外部循环再重新入栈判断
                        casHead(s, null);   // pop fulfill node
                        s = null;           // use new node next time
                        break;              // restart main loop
                    }
                    SNode mn = m.next;
                    if (m.tryMatch(s)) {    // 如果m和s尝试匹配成功,就弹出栈顶的两个元素s和m
                        casHead(s, mn);     // pop both s and m
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    } else                  // lost match  // m和s匹配失败,说明m已经先一步被其它线程匹配了,需要清除
                        s.casNext(m, mn);   // help unlink // 若s的下一个节点为m,则将m换成mn
                }
            }
        } 
        
        // 模式互补(执行到这里,说明栈顶元素跟当前元素的模式不同,且栈顶是匹配模式,说明栈顶和栈顶下面的节点正在发生匹配,当前请求需要做协助工作)
        else {                              // help a fulfiller
            SNode m = h.next;               // m is h's match
            if (m == null)                  // waiter is gone // 节点m是h匹配的节点,但是m为空,说明m已经被其它线程先一步匹配了
                casHead(h, null);           // pop fulfilling node
            else {
                SNode mn = m.next;
                if (m.tryMatch(h))          // help match // 协助匹配,如果节点m和h匹配,则弹出h和m
                    casHead(h, mn);         // pop both h and m
                else                        // lost match
                    h.casNext(m, mn);       // help unlink // 如果节点m和h不匹配,则将h的下一个节点换为mn(即m已经被其他线程匹配了,需要清除它)
            }
        }
    }
}

注意:

  1. TransferStack#casHead 可以用于入栈,也可以用于出栈。
  2. mode = FULFILLING|mode 得到的结果满足 TransferStack#isFulfilling,说明是匹配模式。
  3. TransferStack#awaitFulfill 是被动等待匹配
  4. TransferStack.SNode#tryMatch 是主动进行匹配
  5. 节点从阻塞中被其他线程唤醒时,一般是位于栈的第二个节点,此时栈的头节点是新入栈的与之匹配的节点。

awaitFulfill

节点s自旋或阻塞,直到被其他节点匹配。

java.util.concurrent.SynchronousQueue.TransferStack#awaitFulfill

/**
 * Spins/blocks until node s is matched by a fulfill operation.
 *
 * @param s the waiting node
 * @param timed true if timed wait
 * @param nanos timeout value
 * @return matched node, or s if cancelled
 */
SNode awaitFulfill(SNode s, boolean timed, long nanos) { // 节点s自旋或阻塞,直到被其他节点匹配
    final long deadline = timed ? System.nanoTime() + nanos : 0L; // 到期时间
    Thread w = Thread.currentThread(); // 当前线程
    int spins = (shouldSpin(s) ?
                 (timed ? maxTimedSpins : maxUntimedSpins) : 0); // 自旋次数(自旋结束后再判断是否需要阻塞)
    for (;;) {
        if (w.isInterrupted()) // 当前线程中断了,尝试取消节点s
            s.tryCancel(); // 取消操作,设置 s.match = s
        SNode m = s.match; // 检查节点s是否匹配到了节点m(由其它线程的m匹配到当前线程的s,或者s已取消)
        if (m != null)
            return m; // 如果匹配到了,直接返回m
        if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) { // 已超时,尝试取消节点s
                s.tryCancel();
                continue;
            }
        }
        if (spins > 0)
            spins = shouldSpin(s) ? (spins-1) : 0; // 如果spins大于0,则一直自减直到为0,才会执行去elseif的逻辑
        else if (s.waiter == null)
            s.waiter = w; // establish waiter so can park next iter // 如果s的waiter为null,把当前线程注入进去,并进入下一次自旋
        else if (!timed)
            LockSupport.park(this); // 如果无设置超时,则阻塞等待被其它线程唤醒,唤醒后继续自旋并查看是否匹配到了元素
        else if (nanos > spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanos); // 如果设置超时且还有剩余时间,就阻塞相应时间
    }
}

在自旋过程中,执行以下逻辑:

  1. 检查当前线程是否已中断,是则取消当前节点。
  2. 检查当前线程是否已匹配成功,是则返回匹配的节点。
  3. 自旋结束,若允许超时,判断是否到达超时时间,未到达则阻塞剩余时间。
  4. 自旋结束,若不允许超时,则阻塞等待唤醒。

等待匹配的过程中,首先进行自旋再判断是否进入阻塞,是因为线程的阻塞和唤醒操作涉及到系统内核态与用户态之间的切换,比较耗时。

8. 队列实现

队列定义

双重队列(Dual Queue),指的是队列中的节点具有两种模式。
TransferStack 进行了扩展,队列中大部分节点具有两种模式:请求节点、数据节点。
队列的头节点不属于这两种模式,而是空节点(dummy node)。

java.util.concurrent.SynchronousQueue.TransferQueue

/** Dual Queue */
static final class TransferQueue<E> extends Transferer<E> { // 队列实现

    /** Head of queue */
    transient volatile QNode head; // 头节点
    /** Tail of queue */
    transient volatile QNode tail; // 尾节点
    /**
     * Reference to a cancelled node that might not yet have been
     * unlinked from queue because it was the last inserted node
     * when it was cancelled.
     */
    transient volatile QNode cleanMe;

    TransferQueue() {
        QNode h = new QNode(null, false); // initialize to dummy node. // 队列的头节点,是一个空节点
        head = h;
        tail = h;
    }

    /**
     * Tries to cas nh as new head; if successful, unlink
     * old head's next node to avoid garbage retention.
     */
    // CAS将头节点(TransferQueue#head属性的值)从节点h改为节点nh,再将h出队
    void advanceHead(QNode h, QNode nh) {
        if (h == head &&
            UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
            h.next = h; // forget old next // 将节点h的next指针设为自身,表示h出队,方便垃圾回收
    }

    /**
     * Tries to cas nt as new tail.
     */
    void advanceTail(QNode t, QNode nt) {
        if (tail == t)
            UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
    }
}    

节点定义

  1. 使用 isData 标记该节点的模式。
  2. 当前节点匹配成功,则 item 设置为所匹配的节点的数据。
  3. 当前节点取消匹配,则 item 设置为自身。
  4. 当前节点出队,则 next 指向自身。
  5. 使用 waiter 存储操作该节点的线程,等待匹配时挂起该线程,匹配成功时需唤醒该线程。

java.util.concurrent.SynchronousQueue.TransferQueue.QNode

/** Node class for TransferQueue. */
static final class QNode {
    volatile QNode next;          // next node in queue     // 队列的下一个节点
    volatile Object item;         // CAS'ed to or from null // 数据元素
    volatile Thread waiter;       // to control park/unpark // 等待着的线程
    final boolean isData;         // true表示为DATA类型,false表示为REQUEST类型

    QNode(Object item, boolean isData) {
        this.item = item;
        this.isData = isData;
    }

    boolean casNext(QNode cmp, QNode val) {
        return next == cmp &&
            UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    boolean casItem(Object cmp, Object val) {
        return item == cmp &&
            UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }

    /**
     * Tries to cancel by CAS'ing ref to this as item.
     */
    void tryCancel(Object cmp) {
        UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
    }

    boolean isCancelled() {
        return item == this;
    }

    /**
     * Returns true if this node is known to be off the queue 
     * because its next pointer has been forgotten due to
     * an advanceHead operation.
     */
    // 如果节点已经出队了,则返回 true。由 TransferQueue#advanceHead 操作将 next 指向自身。 
    boolean isOffList() {
        return next == this;
    }
}

transfer

基本思想,在循环中尝试两种操作:

  1. 如果队列为空,或者尾节点模式与当前节点模式相同,则入队(尾部)并等待匹配。
  2. 如果队列中有等待中的节点,且首个非空节点与当前节点的模式互补,则匹配并出队(头部)。
/**
 * Puts or takes an item.
 */
@SuppressWarnings("unchecked")
E transfer(E e, boolean timed, long nanos) {

    QNode s = null; // constructed/reused as needed
    boolean isData = (e != null);

    for (;;) {
        QNode t = tail;
        QNode h = head;
        if (t == null || h == null)         // saw uninitialized value
            continue;                       // spin

        if (h == t || t.isData == isData) { // empty or same-mode // 队列为空,或者队尾节点模式与当前节点模式相同,只能入队
            QNode tn = t.next;
            if (t != tail)                  // inconsistent read // 尾节点已过期,需重新获取尾节点
                continue;
            if (tn != null) {               // lagging tail // 尾节点已过期,需重新获取尾节点
                advanceTail(t, tn);
                continue;
            }
            if (timed && nanos <= 0)        // can't wait // 已超时
                return null;
            if (s == null)
                s = new QNode(e, isData);   // 当前节点设为s
            if (!t.casNext(null, s))        // failed to link in // 把当前节点s插入节点t之后,若失败则重试
                continue;

            advanceTail(t, s);              // swing tail and wait // 将节点s设为新的尾节点
            Object x = awaitFulfill(s, e, timed, nanos); // 节点s等待匹配
            if (x == s) {                   // wait was cancelled // 说明当前节点s已取消,需要出队
                clean(t, s);
                return null;
            }

            if (!s.isOffList()) {           // not already unlinked // 执行到这里,说明节点s匹配成功。如果s尚未出队,则进入以下逻辑
                advanceHead(t, s);          // unlink if head 
                // 已知t是s的上一个节点,这里判断如果t是头节点,则把t出队,把节点s作为新的头节点
                // 下面操作进一步把s设为空节点,即dummy node
                if (x != null)              // and forget fields
                    s.item = s;             // 把节点s的数据域设为自身,表示已取消,不再匹配
                s.waiter = null;
            }
            return (x != null) ? (E)x : e;  // REQUEST类型,返回匹配到的数据x,否则返回e

        } else {                            // complementary-mode // 互补
            QNode m = h.next;               // node to fulfill // 首个非空节点,将它与请求节点匹配
            if (t != tail || m == null || h != head)
                continue;                   // inconsistent read

            Object x = m.item;
            if (isData == (x != null) ||    // m already fulfilled // 并不是互补的,说明m已经被先一步匹配了
                x == m ||                   // m cancelled // m已经取消
                !m.casItem(x, e)) {         // lost CAS // 若CAS失败说明m数据元素不为x,即节点m已经被匹配了。若CAS成功则说明匹配成功。
                advanceHead(h, m);          // dequeue and retry // 把头节点出队,把节点m作为新的头节点(dummy node),继续重试
                continue;
            }

            advanceHead(h, m);              // successfully fulfilled // 匹配成功,把节点m作为新的头节点(dummy node)
            LockSupport.unpark(m.waiter);   // 唤醒m上的线程
            return (x != null) ? (E)x : e;
        }
    }
}

注意:

  1. 入队的时候,总是先获取最新的尾节点,说明 tail 是准确的(对比其他并发队列)。
  2. 出队的时候,总是先获取最新的头节点,说明 head 是准确的(对比其他并发队列)。
  3. TransferQueue#awaitFulfill 是被动等待匹配。
  4. m.casItem(x, e)是主动进行匹配,这里 m 为队首第一个非空节点,x 为 m.item,e 为入参数据元素。
  5. 根据 FIFO 规则,当阻塞等待中的节点被唤醒时,表明该节点要么已超时,要么已从队尾排到了队头且匹配成功。
  6. 由于头节点是空节点(dummy node),从队头进行匹配时,不是比较 head 节点,而是比较 head.next 节点。
  7. 从队头匹配成功时,当前节点并不会入队,而是把旧的头节点出队,把匹配的节点设为新的头节点(dummy node)。

awaitFulfill

节点s自旋或阻塞,直到被其他节点匹配。

java.util.concurrent.SynchronousQueue.TransferQueue#awaitFulfill

/**
 * Spins/blocks until node s is fulfilled.
 *
 * @param s the waiting node
 * @param e the comparison value for checking match
 * @param timed true if timed wait
 * @param nanos timeout value
 * @return matched item, or s if cancelled
 */
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) { // 节点s等待匹配,其数据元素为e
    /* Same idea as TransferStack.awaitFulfill */
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();
    int spins = ((head.next == s) ?
                 (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    for (;;) {
        if (w.isInterrupted())
            s.tryCancel(e);
        Object x = s.item; // 检查节点s是否匹配到了数据x,注意这里是跟栈结构不一样的地方!
        if (x != e)
            return x;
        if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                s.tryCancel(e);
                continue;
            }
        }
        if (spins > 0)
            --spins;
        else if (s.waiter == null)
            s.waiter = w;
        else if (!timed)
            LockSupport.park(this);
        else if (nanos > spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanos);
    }
}

除了检测是否被匹配成功和取消节点的方式,与 TransferStack#awaitFulfill 不同之外,整体逻辑是一样的。

9. 总结

  1. SynchronousQueue 是一个阻塞队列,采用 CAS 和自旋来保证线程安全。
  2. SynchronousQueue 具有两种实现方式:双重栈(非公平模式)和双重队列(公平模式),默认使用双重栈。
  3. 栈实现的节点具有三种模式:数据节点、请求节点、匹配节点。队列实现的节点具有两种模式:数据节点、请求节点。
  4. 不管是哪种实现,都需要定义节点的匹配状态、取消状态、匹配方式。
  5. 存取操作先跟队列/栈中已存在的节点进行对比,匹配则出队/出栈,不匹配则入队/入栈。栈实现还会协助匹配。
  6. 入队/入栈之后,先自旋一定次数后再使用 LockSupport 来阻塞线程,等待匹配。
  7. SynchronousQueue 不直接存储数据元素(SynchronousQueue#size 固定为 0),内部的链表或栈结构用于暂存阻塞的线程。
  8. 适用于传递性场景,即生产者线程处理的数据直接传递给消费者线程。但是,当消费者消费速度赶不上生产速度,队列会阻塞严重。

作者:Sumkor
链接:https://segmentfault.com/a/11...

查看原文

赞 0 收藏 0 评论 0

Sumkor 赞了文章 · 3月24日

快速上手Grape:为什么每个人都应该有一套自己的脚手架?

作者:Run,总架构师
工作经验 5 年,当前就职行业知名技术公司,担任总架构师职位,擅长前后端技术栈,Ruby、Python、JavaScript 等语言。

内容目录

针对 Grape 框架的改进

深层 expose

你也许知道,expose 可以嵌套。如果传递的是个不带参的块,则会执行嵌套渲染:

class Entities::Article < Grape::Entity
 expose :user do
 expose :name { |article| article.user.name }
 expose :age { |article| article.user.age }
 end
end

如上,会渲染一个如下的数据结构:

{
 "user": {
 "name": "Jim",
 "age": 18
 }
}

如果此时传入 deep: true,则嵌套层绑定的 instance 就不一样了。下面的例子与上面的例子展示了同样的效果:

class Entities::Article < Grape::Entity
 expose :user, deep: true do
 expose :name
 expose :age
 end
end

接口返回值声明

status 用于声明接口的返回值(successfailentity 是特殊情况下的别名)。它有如下几种基本的调用形式:

status 200 do
 expose :article, using: Entities::Article
end
​
status 200, '返回文章数据' do
 expose :article, using: Entities::Article
end
​
status 400, '请求参数有误' do
 expose :code, desc: '错误码'
 expose :message, desc: '错误消息'
end
​
success '请求成功' do
 expose :article, using: Entities::Article
end
​
fail '请求失败' do
 expose :code, desc: '错误码'
 expose :message, desc: '错误消息'
end
​
entity do
 expose :article, using: Entities::Article
end

上述声明主要起两个作用:

  1. 在接口逻辑中调用 present 方法不用显示地指定 Entity 类型,它是自动解析的。

    以前,你必须这样调用:

    present :article, article, with: Entities::Article

    现在,只用这样:

    present :article, article

    因为在 status 声明中它已经知道如何渲染 article 实体了。

  2. status 声明可对应生成文档。

参数、返回值一体化

Grape::Entity 新加一个 to_params 方法,使得你可以在参数声明中重复利用:

params do
 requires :article, type: Hash do
 optional :all, using: Entities::Article.to_params
 end
end

它比 Grape::Entity.documentation 更好用,做了如下改进:

  1. type 可以写成字符串:

      expose :foo, documentation: { type: 'string' }
  2. 可混用额外参数,如同时指定 param_type 参数:

      expose :foo, documentation: { param_type: 'body' }
  3. 合理处理了 is_array

     expose :bar, documentation: { type: String, is_array: true }

针对测试的改进

现在可以用编程 style 的方式测试接口的返回值了,不需要再测试诸如 JSON 字符串、XML 文本之类的。如果像下面这样实现接口:

present :article, article

就可以像下面这样测试:

get '/article/1'
assert_equal 200, last_response.status
assert_equal articles(:one), presents(:article)

注意,这个功能不是实现在框架中的,需要克隆脚手架项目:

git clone https://github.com/run27017/grape-app-demo.git

新手如何上手

如果你是完全的新手,建议先从熟悉 Grape 框架开始。我建议您阅读我仓库下的文档。在您熟悉了 Grape 框架以后,再阅读以上我对 Grape 框架改进的部分。关于整个框架的设计理念,可阅读后文。

项目上手就从我提供的脚手架项目开始,所有功能都集成进来了:

git clone https://github.com/run27017/grape-app-demo.git

理念

面向文档的开发

当今环境下,有许多的开发范式供后端开发者选择,例如_测试驱动开发_、_行为驱动开发_、_敏捷软件开发_等等。与之相对的,我提出了一个新的想法,我将其称为_面向文档的开发_。

写 API 项目的同时是要准备文档的。我不知道大家是如何准备文档的,但往往都逃不出一个怪圈:同一个接口,我的实现要写一份,我的文档也要同时写一份。我常常在想,为什么我在写接口的同时不能将文档同步地完成呢?换个角度想,接口文档的契约精神为何不能自动地成为实现的一部分?如果,我能发明一个 DSL,在编写文档的同时就能够制约接口的行为,那不正是我想要的吗?

说干就干!

我发现 Grape 框架就已经提供了类似的 DSL 了。例如你在制定参数时可以像这样:

params do
 requires :user, type: Hash do
 requires :name, type: String, desc: '用户的姓名'
 requires :age, type: Integer, desc: '用户的年龄'
 end
end

上面的代码就可以对参数进行制约,限制参数为 nameage 两个字段,并分别限制它们的类型为 StringInteger. 与此同时,一个名为 grape-swagger 的库可以将 params 的宏定义渲染成 Swagger 文档的一部分。完美,文档和实现在这里结合了。

另外,Grape 框架提供了 desc 宏,它是一个纯文档的声明供第三方库读取,不会对接口行为产生任何影响。

desc '创建新用户' do
 tags 'users'
 entity Entities::User
end

但是,毕竟 Grape 框架不是完全的面向文档的开发框架,它有很多重要的使命,所以它和文档的无缝衔接也就仅限于此了。你能看到,params 宏是个完美结合的范例,desc 宏很可惜只与文档渲染有关,然后就别无其他了。

鉴于 Grape 框架是个开源框架,修改它以添加几个小零件还是很简易的一件事。我用了几天的时间添加了一个 status 宏,可以用它来声明返回值:

status 200 do
  expose :user, deep: true do
    expose :id, documentation: { type: Integer, desc: '用户的 id' }
    expose :name, documentation: { type: String, desc: '用户的姓名' }
    expose :age, documentation: { type: Integer, desc: '用户的年龄' }
  end
end

上述声明主要起两个作用:

  1. 在接口逻辑中调用 present 方法不用显示地指定 Entity 类型,它是自动解析的。

    以前,你必须这样调用:

    present :user, user, with: Entities::User

    现在,只用这样:

    present :user, user

    因为在 status 声明中它已经知道如何渲染 user 实体了。

  2. grape-swagger 库可以解析 status 宏生成文档。

一切还只是冰山一角。

你真的不需要 Controller 测试吗?

有关接口的单元测试,有两个观点争论不休:接口测试应该是针对 Integration 测试还是 Controller 测试?Integration 测试像是一个黑匣子,开发者调用接口,然后针对接口返回的视图进行测试。而 Controller 测试也会一样地调用接口,但会测到内部的状态。

通过下面的两个案例直观地感受一下 Controller 测试和 Integration 测试在 Rails 框架中的不同写法。

在早期的 Rails 版本中,是有 Controller 测试的:

class ArticlesControllerTest < ActionController::TestCase
  test "should get index" do
    get :index
    assert_response :success
    assert_equal users(:one, :two, :three), assigns(:articles)
  end
end

Rails 5 以后,更推荐 Integration 测试:

class ArticlesControllerTest < ActionDispatch::IntegrationTest
 test "should get index" do
 get articles_url
 assert_response :success
 end
end

注意到 Integration 测试中是没有对应的 assert_equal 语句的,因为它比较难写。如果视图返回的是 JSON 数据,可以尝试用下面的等效写法:

assert_equal users(:one, :two, :three).as_json, JSON.parse(last_response.body)

但是测试视图的代码及其依赖视图的变化,也会经常性的失效,上面只是尝试性的一种写法而已。

我不想在这里探讨 Controller 测试和 Integration 测试究竟孰优孰劣的问题,尽管你可能已经从字里行间察觉到我的倾向性。关于这个话题能够从 2012 年讨论到 2020 年,不信你可以看这个帖子。我可不想让情况变得更糟。很多次我在想,那些反对 Controller 测试的人可能仅仅把 Controller 的实例变量看成是其内部状态而已,而没有意识到它也是 Controller 与 Template 之间的契约。这不怪他们,因为用传递实例变量的方式定义契约确实不够优雅。

好在我做了一些简单的工作,让其在 Grape 框架内可以更优雅地测试接口的返回。你只需要在逻辑接口中使用 present 方法指定渲染数据:

present :users, users

然后就可以在测试用例中结合特殊的 presents 方法测试它:

assert_equal users(:one, :two, :three), presents(:users)

assigns 很像,但是它更舒服不是么?

写在最后

这就是我对 Grape 框架的改造过程,已经开始并将持续下去。我的改造理念无外乎两个想法:更好的文档结合和更好的测试。而事实上,只需要一点点工作,确实就可以起作用了。

如果你也想用到我所改造的 Grape 框架,直接克隆我的脚手架就好了:

git clone https://github.com/run27017/g...

脚手架中使用的是我 Fork 后的框架集,它们是:

点击它们的链接看一看吧,也许你也能成为开源世界的贡献者呢。

推荐

如果想了解学习更多可以看我最新录制的课程

课程链接:https://ke.sifou.com/course/1...

设计这门课的初衷
  • 当前 Web 开发的主流模式:前后端分离
  • Web API 开发,没有必要五花八门,应采用最佳实践
  • 将自己多年的实践经验总结并分享
这门课在讲什么?
  • 遵循标准:最小惊讶原则
  • 接口的完备性:输入和输出、鉴权、错误处理等
  • 与框架整合
为什么每个人都应该有一套自己的脚手架?
  1. 框架只是通用模版的提供者,并没有安排具体的解决方案
  2. 框架并没有细化到最佳实践
查看原文

赞 7 收藏 3 评论 0

Sumkor 发布了文章 · 3月19日

阅读 JDK 8 源码:WeakHashMap 和 Reference、ReferenceQueue

WeakHashMap 是一种特殊的 HashMap,它的 key 为 WeakReference 弱引用,并且内置了一个 ReferenceQueue 用于存储被回收的弱引用。阅读 WeakHashMap 源码之前,需要先理解 Reference 和 ReferenceQueue 的机制。理解其基本原理之后,可以使用 HashMap 达到跟 WeakHashMap 一样的效果,文末提供了示例。

1. Reference

public abstract class Reference<T>
extends Object

Reference 是引用对象的抽象基类。此类定义了常用于所有引用对象的操作。因为引用对象是通过与垃圾回收器的密切合作来实现的,所以不能直接为此类创建子类。

Reference 继承结构

继承结构如下:

继承结构

Reference 与 GC 的交互

在 Reference 实例所管理的对象被垃圾回收器检测为不可达时,垃圾回收器会把该 Reference 添加到 pending-Reference 列表中,这是一个非常轻量级的操作。同时,Reference 实例会默认启动一个 ReferenceHandler 守护线程,不停地尝试从 pending-Reference 列表中取得被回收的 Reference 实例。当获取成功时,ReferenceHandler 线程会把该 Reference 存入 ReferenceQueue 队列。

Reference 的构造函数

Reference 是一个抽象类,需要由子类来调用其构造方法。
入参 referent 是需要被 GC 特殊对待的对象 ,入参 queue 是 ReferenceQueue 引用队列实例,默认为 ReferenceQueue.NULL。

Reference(T referent) {
    this(referent, null);
}

Reference(T referent, ReferenceQueue<? super T> queue) {
    this.referent = referent;
    this.queue = (queue == null) ? ReferenceQueue.NULL : queue;
}

Reference 的属性

private T referent;  // Reference实例管理的对象,会被GC特殊对待

volatile ReferenceQueue<? super T> queue; // Reference实例管理的对象被回收后,Reference实例会被添加到这个队列中

/* When active:   NULL
 *     pending:   this
 *    Enqueued:   队列中的下一个引用(如果是最后一个,则为this)
 *    Inactive:   this
 */
@SuppressWarnings("rawtypes")
Reference next; // ReferenceQueue队列的下一个引用

/* When active:   由垃圾回收器管理的已发现的引用列表的下一个元素(如果是最后一个,则为this)
 *     pending:   pending-Reference列表中的下一个元素(如果是最后一个,则为null)
 *   otherwise:   NULL
 */
transient private Reference<T> discovered;  // pending-Reference列表的指针,由JVM使用

/* 用于控制垃圾回收器操作的锁,垃圾回收器开始一轮垃圾回收前要获取此锁。
 * 所有占用这个锁的代码必须尽快完成,不能生成新对象,也不能调用用户代码。
 */
static private class Lock { }
private static Lock lock = new Lock();


/* pending-Reference列表,存储等待进入ReferenceQueue队列的引用。
 * 垃圾回收器向该列表添加元素,而Reference-handler线程向该列表移除元素。
 * 操作pending-Reference列表需要使用lock对象。
 * pending-Reference列表使用discovered指针来访问元素。
 */
private static Reference<Object> pending = null; // pending-Reference列表的头结点

Reference 的状态

Reference 实例具有四种状态:

  • Active:新创建的 Reference 实例为 Active 状态。当垃圾回收器检测到 Reference 中管理的对象为不可达时,如果该 Reference 实例注册了队列,则进入 Pending 状态,否则进入 Inactive 状态。
  • Pending:在 pending-Reference 列表中的元素,等待 Reference-handler 线程将其存入 ReferenceQueue 队列。未注册的实例不会到达这个状态。
  • Enqueued:在 ReferenceQueue 队列中的元素。当实例从 ReferenceQueue 队列中删除时,进入 Inactive 状态。未注册的实例不会到达这个状态。
  • Inactive:一旦实例变为 Inactive (非活动)状态,它的状态将不再更改。

其状态图转换如下:

状态转移图

Reference 类是通过其中的 queue 属性和 next 属性来记录这些状态:

  • Active:queue = ReferenceQueue实例 或 ReferenceQueue.NULL; next = null
  • Pending:queue = ReferenceQueue实例; next = this
  • Enqueued:queue = ReferenceQueue.ENQUEUED; next = 队列的下一个节点或 this
  • Inactive:queue = ReferenceQueue.NULL; next = this.

ReferenceHandler 线程

Reference 类中定义了静态代码块,用于启动 ReferenceHandler 守护线程,设置优先级最高。

static {
    ThreadGroup tg = Thread.currentThread().getThreadGroup();
    for (ThreadGroup tgn = tg;
         tgn != null;
         tg = tgn, tgn = tg.getParent());
    Thread handler = new ReferenceHandler(tg, "Reference Handler");
    /* If there were a special system-only priority greater than
     * MAX_PRIORITY, it would be used here
     */
    handler.setPriority(Thread.MAX_PRIORITY);
    handler.setDaemon(true);
    handler.start(); // 启动ReferenceHandler守护线程,优先级最高

    // provide access in SharedSecrets // 在SharedSecrets中提供访问权限
    SharedSecrets.setJavaLangRefAccess(new JavaLangRefAccess() {
        @Override
        public boolean tryHandlePendingReference() {
            return tryHandlePending(false);
        }
    });
}

ReferenceHandler 线程启动之后,在死循环中执行 tryHandlePending 操作。
tryHandlePending 代码流程:

  1. 从 pending 属性取得 GC 回收时存入 pending-Reference 列表中对象。
  2. 将 pending 指向 pending-Reference 列表的下一个节点。
  3. 如果 ReferenceQueue 不为空,则把被回收的对象入队。

java.lang.ref.Reference.ReferenceHandler

private static class ReferenceHandler extends Thread {
    public void run() {
        while (true) {
            tryHandlePending(true);
        }
    }
}

java.lang.ref.Reference#tryHandlePending

static boolean tryHandlePending(boolean waitForNotify) {
    Reference<Object> r;
    Cleaner c;
    try {
        synchronized (lock) {
            if (pending != null) {
                r = pending; // GC 回收的时候,会把对象赋值给 pending,这里取的该对象
                c = r instanceof Cleaner ? (Cleaner) r : null;
                // 从pending链中删除r
                pending = r.discovered;
                r.discovered = null;
            } else {
                if (waitForNotify) {
                    lock.wait(); // 取不到则休眠
                }
                // retry if waited
                return waitForNotify;
            }
        }
    } catch (OutOfMemoryError x) {
        Thread.yield(); // 让出线程的CPU时间,这样希望能删除一些活动引用,使用GC回收一些空间
        // retry
        return true;
    } catch (InterruptedException x) {
        // retry
        return true;
    }

    // Fast path for cleaners
    if (c != null) {
        c.clean();
        return true;
    }

    ReferenceQueue<? super Object> q = r.queue;
    if (q != ReferenceQueue.NULL) q.enqueue(r); // 若引用队列不为空,将对象放入ReferenceQueue队列中
    return true;
}

2. ReferenceQueue

ReferenceQueue 的属性

static ReferenceQueue<Object> NULL = new Null<>();
static ReferenceQueue<Object> ENQUEUED = new Null<>();

private static class Null<S> extends ReferenceQueue<S> {
    boolean enqueue(Reference<? extends S> r) {
        return false;
    }
}

入队

使用头插法,将引用 r 存入 r.queue 队列中。

java.lang.ref.ReferenceQueue#enqueue

boolean enqueue(Reference<? extends T> r) { /* Called only by Reference class */
    synchronized (lock) {
        // Check that since getting the lock this reference hasn't already been
        // enqueued (and even then removed)
        ReferenceQueue<?> queue = r.queue;
        if ((queue == NULL) || (queue == ENQUEUED)) { // 校验引用r是否已经存入ReferenceQueue之中,或者已经从ReferenceQueue中移除
            return false;
        }
        assert queue == this;
        r.queue = ENQUEUED; // 表示引用r已经存入ReferenceQueue之中
        r.next = (head == null) ? r : head; // 头插法。r.next = 队列的下一个节点
        head = r;
        queueLength++;
        if (r instanceof FinalReference) {
            sun.misc.VM.addFinalRefCount(1);
        }
        lock.notifyAll();
        return true;
    }
}

出队

将头节点出队,设置新的头节点,并将引用 r 的下一个节点指向自身。

java.lang.ref.ReferenceQueue#reallyPoll

@SuppressWarnings("unchecked")
private Reference<? extends T> reallyPoll() {       /* Must hold lock */
    Reference<? extends T> r = head;
    if (r != null) {
        head = (r.next == r) ?
            null :
            r.next; // Unchecked due to the next field having a raw type in Reference // 头节点出队
        r.queue = NULL; // 表示引用r已经从ReferenceQueue中移除
        r.next = r;     // 自连接,方便回收
        queueLength--;
        if (r instanceof FinalReference) {
            sun.misc.VM.addFinalRefCount(-1);
        }
        return r;
    }
    return null;
}

3. WeakReference

在 JDK 1.2 版之前,Java 里面的引用是很传统的定义:如果 reference 类型的数据中存储的数值代表的是另外一块内存的起始地址,就称该 reference 数据是代表某块内存、某个对象的引用。在 JDK 1.2 版之后,Java 对引用的概念进行了扩充,将引用分为强引用(Strongly Reference)、软引用(Soft Reference)、弱引用(Weak Reference)和虚引用(Phantom Reference)4 种,这 4 种引用强度依次逐渐减弱。

  • 强引用:无论任何情况下,只要强引用关系还存在,垃圾收集器就永远不会回收掉被引用的对象。
  • 软引用:在系统将要发生内存溢出异常前,会把这些对象列进回收范围之中进行第二次回收,如果这次回收还没有足够的内存,才会抛出内存溢出异常
  • 弱引用:当垃圾收集器开始工作,无论当前内存是否足够,都会回收掉只被弱引用关联的对象
  • 虚引用:一个对象是否有虚引用的存在,完全不会对其生存时间构成影响,也无法通过虚引用来取得一个对象实例。为一个对象设置虚引用关联的唯一目的只是为了能在这个对象被收集器回收时收到一个系统通知。

WeakReference 的定义

java.lang.ref.WeakReference

public class WeakReference<T> extends Reference<T> {

    /**
     * Creates a new weak reference that refers to the given object.  The new
     * reference is not registered with any queue.
     *
     * @param referent object the new weak reference will refer to
     */
    public WeakReference(T referent) {
        super(referent);
    }

    /**
     * Creates a new weak reference that refers to the given object and is
     * registered with the given queue.
     *
     * @param referent object the new weak reference will refer to
     * @param q the queue with which the reference is to be registered,
     *          or <tt>null</tt> if registration is not required
     */
    public WeakReference(T referent, ReferenceQueue<? super T> q) {
        super(referent, q);
    }

}    

WeakReference 的使用

/**
 * 弱引用:当垃圾收集器开始工作,无论当前内存是否足够,都会回收掉只被弱引用关联的对象
 * <p>
 * -verbose:gc -Xms20M -Xmx20M -Xmn10M -XX:SurvivorRatio=8
 * 堆大小为20m,其中新生代大小为10m,按照1:8比例分配,Eden区大小设置为8m
 * 此时存入8m大小的变量,直接存入老年代(tenured generation)
 */
@Test
public void weakReference() {
    byte[] allocation01 = new byte[1024 * 1024 * 8];
    WeakReference<byte[]> weakReference = new WeakReference<byte[]>(allocation01);

    System.out.println("weakReference.get() = " + weakReference.get());// [B@154ebadd
    allocation01 = null;// 解除强引用,只留下弱引用
    System.out.println("weakReference.get() = " + weakReference.get());// [B@154ebadd

    System.gc();
    System.out.println("weakReference.get() = " + weakReference.get());// null
}

执行结果:

weakReference.get() = [B@f2a0b8e
[GC (System.gc())  15209K->9574K(19456K), 0.0209182 secs]
[Full GC (System.gc())  9574K->1323K(19456K), 0.0239549 secs]
weakReference.get() = null

WeakReference 和 ReferenceQueue 配合使用

@Test
public void referenceQueue() throws InterruptedException {
    // 创建一个引用队列
    ReferenceQueue referenceQueue = new ReferenceQueue();

    /**
     * 创建弱引用,此时 Reference 状态为 Active,
     */
    WeakReference weakReference = new WeakReference(new Object(), referenceQueue);
    System.out.println(weakReference);// java.lang.ref.WeakReference@f2a0b8e
    System.out.println(weakReference.get());// java.lang.Object@593634ad

    /**
     * 当 GC 执行后,由于自定了引用队列,Reference 的状态由 Pending 变为 Enqueued
     */
    System.gc();

    System.out.println(weakReference);// java.lang.ref.WeakReference@f2a0b8e
    System.out.println(weakReference.get());// null

    /**
     * 从队列里面取出该元素,Reference 状态为 Inactive
     */
    Reference reference = referenceQueue.remove();
    System.out.println(reference);// java.lang.ref.WeakReference@f2a0b8e
    System.out.println(reference.get());// null

    reference = referenceQueue.poll();
    System.out.println(reference);// null
}

for 循环中的引用对象

来看一下特殊的例子。

/**
 * -verbose:gc -Xms20M -Xmx20M -Xmn10M -XX:SurvivorRatio=8
 */
@Test
public void weakReferenceInLoop01() throws InterruptedException {
    ReferenceQueue referenceQueue = new ReferenceQueue();
    for (int i = 0; i < 5; i++) {
        byte[] bytes = new byte[1024 * 1024 * 8]; // 虽然是个强引用,但是下一次循环后就变为不可达!
        WeakReference<byte[]> weakReference = new WeakReference<byte[]>(bytes, referenceQueue);
    }
    Reference remove = referenceQueue.remove(1000); // 这里没有得到通知!
    System.out.println("remove = " + remove); // null
}

执行结果:

[GC (Allocation Failure)  15241K->9594K(19456K), 0.0025132 secs]
[GC (Allocation Failure)  9594K->9610K(19456K), 0.0010068 secs]
[Full GC (Allocation Failure)  9610K->1328K(19456K), 0.0151334 secs]
[GC (Allocation Failure)  9520K->9520K(19456K), 0.0003440 secs]
[Full GC (Ergonomics)  9520K->1328K(19456K), 0.0055399 secs]
[GC (Allocation Failure)  9520K->9520K(19456K), 0.0002247 secs]
[Full GC (Ergonomics)  9520K->963K(19456K), 0.0068916 secs]
[GC (Allocation Failure)  9156K->9156K(19456K), 0.0003452 secs]
[Full GC (Ergonomics)  9156K->1024K(19456K), 0.0024345 secs]
remove = null

上面的例子有两个注意要点:

  1. for 循环中的 bytes,虽然是个强引用,但是下一次循环后就变为不可达,因此弱引用会被回收。
  2. 弱引用被回收,但是 ReferenceQueue 并没有得到通知。

即使在 java.lang.ref.Reference#tryHandlePending 中打断点,也没有进入。

为什么 ReferenceQueue 没有得到通知呢,来看下一个例子。

/**
 * -verbose:gc -Xms20M -Xmx20M -Xmn10M -XX:SurvivorRatio=8
 */
@Test
public void weakReferenceInLoop02() throws InterruptedException {
    ReferenceQueue referenceQueue = new ReferenceQueue();
    WeakReference[] weakReferences = new WeakReference[10];
    for (int i = 0; i < 5; i++) {
        byte[] bytes = new byte[1024 * 1024 * 8];
        WeakReference<byte[]> weakReference = new WeakReference<byte[]>(bytes, referenceQueue);
        weakReferences[i] = weakReference;
    }
    Reference remove = referenceQueue.remove(1000); // 这里得到通知了!
    System.out.println("remove = " + remove);
}

执行结果:

[GC (Allocation Failure)  15196K->9597K(19456K), 0.0033007 secs]
[GC (Allocation Failure)  9597K->9629K(19456K), 0.0101304 secs]
[Full GC (Allocation Failure)  9629K->1328K(19456K), 0.0097783 secs]
[GC (Allocation Failure)  9520K->9552K(19456K), 0.0076307 secs]
[Full GC (Ergonomics)  9552K->1536K(19456K), 0.0299183 secs]
[GC (Allocation Failure)  9728K->9760K(19456K), 0.0082721 secs]
[Full GC (Ergonomics)  9760K->1328K(19456K), 0.0051265 secs]
[GC (Allocation Failure)  9520K->9552K(19456K), 0.0004645 secs]
[Full GC (Ergonomics)  9552K->1328K(19456K), 0.0044950 secs]
remove = java.lang.ref.WeakReference@f2a0b8e

这个例子中,for 循环中的 WeakReference 通过赋值给数组,暴露给 for 循环之外了。
如果在 java.lang.ref.Reference#tryHandlePending 中打断点,发现是可以进入断点位置的。

猜测是 JVM 的一种优化手段,当 WeakReference 对象本身有关联到强引用时,GC 回收弱引用的时候才会将其存入 pending-Reference 列表,以便后续由 Reference-handler 线程将其存入 ReferenceQueue 队列。

4. WeakHashMap

public class WeakHashMap<K,V>
extends AbstractMap<K,V>
implements Map<K,V>

WeakHashMap 是以弱键实现的基于哈希表的 Map。在 WeakHashMap 中,当某个 key 不再正常使用时,将自动移除该 Entry。
支持 null 值和 null 键。该类具有与 HashMap 类相似的性能特征,并具有相同的初始容量(16)和加载因子(0.75)。
像大多数 collection 类一样,该类是不同步的。可以使用 Collections.synchronizedMap 方法来构造同步的 WeakHashMap。

WeakHashMap 的数据结构

WeakHashMap 的存储结构只有(数组 + 链表)。
WeakHashMap 由于使用了弱引用,在 GC 时会回收没有强引用的 key,因此不会存储过多的元素,无需转换成树结构。

数组结构定义如下:

/**
 * The table, resized as necessary. Length MUST Always be a power of two.
 */
Entry<K,V>[] table;

WeakHashMap 中的内部类 Entry 继承了 WeakReference,将 key 交给 WeakReference 管理。

java.util.WeakHashMap.Entry

private static class Entry<K,V> extends WeakReference<Object> implements Map.Entry<K,V> {
    V value;
    final int hash;
    Entry<K,V> next;

    Entry(Object key, V value,
          ReferenceQueue<Object> queue,
          int hash, Entry<K,V> next) {
        super(key, queue);
        this.value = value;
        this.hash  = hash;
        this.next  = next;
    }
}    

定义了属性 ReferenceQueue

private final ReferenceQueue<Object> queue = new ReferenceQueue<>();

往 WeakHashMap 中添加元素时,使用new Entry<>(k, value, queue, h, e)创建 Entry 条目,传入 ReferenceQueue,具体见 WeakHashMap#put 方法。

put

java.util.WeakHashMap#put

public V put(K key, V value) {
    Object k = maskNull(key);
    int h = hash(k);
    Entry<K,V>[] tab = getTable(); // 获取当前table数组(获取之前会清除废弃的元素)
    int i = indexFor(h, tab.length);

    for (Entry<K,V> e = tab[i]; e != null; e = e.next) { // 遍历链表
        if (h == e.hash && eq(k, e.get())) {
            V oldValue = e.value;
            if (value != oldValue)
                e.value = value; // 替换旧值
            return oldValue;
        }
    }

    modCount++;
    Entry<K,V> e = tab[i];
    tab[i] = new Entry<>(k, value, queue, h, e); // 头插法
    if (++size >= threshold)
        resize(tab.length * 2); // 扩容
    return null;
}

流程跟 HashMap#put 相比简化了不少,关注不同的地方:

  1. WeakHashMap 的构造函数中就创建了数组,而 HashMap 第一次 put 操作才初始化数组。
  2. getTable() 获取当前数组 table 时,会先清除 Key 已被回收的 Entry,再返回 table。
  3. 由于桶中没有树结构,定位到桶之后只需要遍历链表即可。
  4. 链表使用头插法加入新元素。

expungeStaleEntries

expungeStaleEntries() 方法用于清除废弃元素,getTable()size()resize()方法都会调用该方法。
基本上 WeakHashMap 的每个方法都会调用 getTable(),可知 expungeStaleEntries() 是 WeakHashMap 中核心的逻辑。

getTable

java.util.WeakHashMap#expungeStaleEntries

private void expungeStaleEntries() {
    for (Object x; (x = queue.poll()) != null; ) { // 被回收的对象,会由GC存入引用队列中
        synchronized (queue) {
            @SuppressWarnings("unchecked")
                Entry<K,V> e = (Entry<K,V>) x;
            int i = indexFor(e.hash, table.length); // 被回收的节点所在桶的位置

            Entry<K,V> prev = table[i]; // 被回收的节点所在桶的头节点
            Entry<K,V> p = prev;
            while (p != null) { // 遍历链表,删除节点e
                Entry<K,V> next = p.next;
                if (p == e) {
                    if (prev == e)
                        table[i] = next;
                    else
                        prev.next = next; // 节点e的前一个节点,指向节点e的下一个节点,即在链表上解开节点e
                    // Must not null out e.next;
                    // stale entries may be in use by a HashIterator
                    e.value = null; // Help GC
                    size--; // 容量减少
                    break;
                }
                prev = p;
                p = next;
            }
        }
    }
}

代码流程:

  1. 从 ReferenceQueue 中拉取已被 GC 回收的弱引用节点
  2. 定位该节点在数组中所在桶的位置
  3. 遍历桶上的链表,删除该节点

resize

当满足 size >= threshold时,调用扩容方法。
需要注意的是,执行扩容之前,会清除 WeakHashMap 中废弃的 Entry,导致 size 变小而达不到扩容阈值。
此时 WeakHashMap 采取先扩容,再将新的 size 与旧的 threshold 进行对比。
若满足size >= threshold / 2说明扩容成功,更新阈值;否则放弃扩容,把元素迁移回旧数组。
先扩容又放弃扩容,反复迁移数组是一个比较低效的操作。

void resize(int newCapacity) {
    Entry<K,V>[] oldTable = getTable(); // 扩容之前获取旧数组,该操作会触发清除废弃元素,导致 size 变小而达不到扩容阈值
    int oldCapacity = oldTable.length;
    if (oldCapacity == MAXIMUM_CAPACITY) {
        threshold = Integer.MAX_VALUE;
        return;
    }

    Entry<K,V>[] newTable = newTable(newCapacity); // 扩容
    transfer(oldTable, newTable); // 迁移元素至新数组
    table = newTable;

    /*
     * If ignoring null elements and processing ref queue caused massive
     * shrinkage, then restore old table.  This should be rare, but avoids
     * unbounded expansion of garbage-filled tables.
     */
    if (size >= threshold / 2) { // 这里放宽了扩容阈值,为原来的一半,目的是避免反复迁移数组
        threshold = (int)(newCapacity * loadFactor); // 扩容完成,更新阈值
    } else { // 放弃扩容
        expungeStaleEntries(); // 清除废弃的元素
        transfer(newTable, oldTable); // 迁移元素至旧数组
        table = oldTable;
    }
}

WeakHashMap 使用案例

往 WeakHashMap 中存入 10000 个 key 为 1M 大小的元素,循环结束之后查得 WeakHashMap 大小远不到 10000。

@Test
public void weakHashMap() throws InterruptedException {
    WeakHashMap<Object, Object> map = new WeakHashMap<>();
    Object value = new Object();
    for (int i = 0; i < 10000; i++) {
        byte[] bytes = new byte[1024 * 1024]; // 1M
        map.put(bytes, value);
    }
    System.out.println("map.size->" + map.size()); // 609
}

总结一下,WeakHashMap 使用了弱引用作为 key,当 GC 回收时,清除弱引用并将 Entry 存入 WeakHashMap 中内置的 ReferenceQueue 中。
后续 WeakHashMap 的每一步操作都会从 ReferenceQueue 中拉取得到 Entry,并从 WeakHashMap 中删除该 Entry。
根据这一特性,WeakHashMap 很适合作为缓存使用。

理解了 WeakHashMap 的原理,使用 HashMap 同样可以做到类似的效果。

@Test
public void hashMap() throws InterruptedException {
    ReferenceQueue referenceQueue = new ReferenceQueue(); // 定义引用队列
    Object value = new Object();
    HashMap<Object, Object> map = new HashMap<>();
    for (int i = 0; i < 10000; i++) {
        byte[] bytes = new byte[1024 * 1024]; // 1M
        WeakReference<byte[]> weakReference = new WeakReference<byte[]>(bytes, referenceQueue);
        map.put(weakReference, value); // 使用WeakReference作为Key
    }
    System.out.println("map.size->" + map.size()); // 10000

    Thread thread = new Thread(() -> {
        try {
            int cnt = 0;
            WeakReference<byte[]> k;
            while ((k = (WeakReference) referenceQueue.remove(1000)) != null) {
                 map.remove(k); // 从HashMap中移除Entry,达到与WeakHashMap一样的效果
            }
        } catch (InterruptedException e) {
            // nothing
        }
    });
    thread.setDaemon(true);
    thread.start();
    thread.join();// 主线程需要等待,直到当前线程thread消亡

    System.out.println("map.size->" + map.size()); // 远不足 10000
}

5. 参考


作者:Sumkor
链接:https://segmentfault.com/a/11...

查看原文

赞 0 收藏 0 评论 0

Sumkor 关注了专栏 · 3月4日

Keep Coding

记录与分享

关注 184

Sumkor 发布了文章 · 3月2日

阅读 JDK 8 源码:ConcurrentHashMap 扩容总结 (发现源码的BUG!)

这段时间阅读了 JDK 8 的 ConcurrentHashMap 源码,其中扩容的过程涉及技术点繁多,很有必要自己动手对扩容原理进行梳理总结。本文中,我对关键代码编写了单元测试,并且找来了图例,方便理解。
编写文章过程,我竟然发现了 JDK 8 版本扩容时对 sizeCtl 的判断有 BUG,具体在第 3.4 节中说明。

1. 数据结构

使用数组+链表+红黑树来实现,利用 CAS + synchronized 来保证并发更新的安全。

ConcurrentHashMap

1.1 对比 Java7

与 Java7 相比,Java8 中的 ConcurrentHashMap 舍弃了 Segment 结构,将分段锁改为粒度更小的“桶”锁。得益于 JVM 对锁的优化(锁膨胀、锁粗化、锁消除),ConcurrentHashMap 的锁技术从 ReentrantLock 改为内置锁 synchronized,并引入了 CAS 乐观锁。

1.2 对比 HashMap

红黑树结构略有不同。
HashMap 的红黑树中的节点叫做 TreeNode,TreeNode 不仅仅有属性,还维护着红黑树的结构,比如说查找,新增等等;
ConcurrentHashMap 中红黑树被拆分成两块,TreeNode 仅仅维护的属性和查找功能,新增了 TreeBin,来维护红黑树结构,并负责根节点的加锁和解锁;新增 ForwardingNode (转移)节点,扩容的时候会使用到,通过使用该节点,来保证扩容时的线程安全。

Node

Node 是 ConcurrentHashMap 存储结构的基本单元,用于存储数据。
与 HashMap 中的 Node 一样实现了 Map.Entry 接口,不同的是部分属性加了 volatile 修饰。

static class Node<K,V> implements Map.Entry<K,V> {
    //链表结构的属性定义
    final int hash;
    final K key;
    volatile V val;
    volatile Node<K,V> next;

    Node(int hash, K key, V val, Node<K,V> next) {
        this.hash = hash;
        this.key = key;
        this.val = val;
        this.next = next;
    }

TreeNode

TreeNode 继承与 Node,它是红黑树的节点,同时保留了链表的特性。
当链表的节点数大于 8 时会转换成红黑树的结构,就是通过 TreeNode 作为存储结构代替 Node 来转换成黑红树的。

static final class TreeNode<K,V> extends Node<K,V> {
    //树形结构的属性定义
    TreeNode<K,V> parent;  // red-black tree links
    TreeNode<K,V> left;
    TreeNode<K,V> right;
    TreeNode<K,V> prev;    // needed to unlink next upon deletion
    boolean red; //标志红黑树的红节点
    TreeNode(int hash, K key, V val, Node<K,V> next,
             TreeNode<K,V> parent) {
        super(hash, key, val, next);
        this.parent = parent;
    }
}

TreeBin

当链表转为红黑树后,数组中保存的引用为 TreeBin,TreeBin 内部不保存 key/value,他保存了 TreeNode 的 list 以及红黑树 root。
TreeBin 从字面含义中可以理解为存储树形结构的容器,而树形结构就是指 TreeNode,所以 TreeBin 就是封装 TreeNode 的容器,它提供转换黑红树的一些条件和锁的控制。

static final class TreeBin<K,V> extends Node<K,V> {
    //指向TreeNode列表和根节点
    TreeNode<K,V> root;
    volatile TreeNode<K,V> first;
    volatile Thread waiter;
    volatile int lockState;
    // 读写锁状态
    static final int WRITER = 1; // 获取写锁的状态
    static final int WAITER = 2; // 等待写锁的状态
    static final int READER = 4; // 增加数据时读锁的状态
}    

ForwardingNode

当进行扩容时,要把链表迁移到新的哈希表,在做这个操作时,会在把数组中的头节点替换为 ForwardingNode 对象。ForwardingNode 中不保存 key 和 value,只保存了扩容后哈希表(nextTable)的引用。此时查找相应 node 时,需要去 nextTable 中查找。

static final class ForwardingNode<K,V> extends Node<K,V> {
    final Node<K,V>[] nextTable;
    ForwardingNode(Node<K,V>[] tab) {
        super(MOVED, null, null, null);
        this.nextTable = tab;
    }

2. 索引计算

与 HashMap 不同的地方是,将 hash 值与 HASH_BITS 作按位与操作,保证 hash 值为正数。
负数在 ConcurrentHashMap 中有特殊的含义(代表 TreeBin、ForwardingNode 等)。

/**
 * 哈希桶数组索引位置的计算
 * 
 * @author Sumkor https://segmentfault.com/blog/sumkor
 * @since 2021/3/2
 */
@Test
public void hash() {
    int capacity = 16;// 默认值,见 java.util.concurrent.ConcurrentHashMap.DEFAULT_CAPACITY
    Object key = 17;

    int h = key.hashCode();// 第一步取hashCode值
    h = h ^ (h >>> 16);// 第二步高位参与运算
    h = h & 0x7fffffff;// 第三步与HASH_BITS相与,主要作用是使hash值为正数
    /**
     * @see ConcurrentHashMap#spread(int)
     *
     * 在 ConcurrentHashMap 之中,hash值为负数有特殊的含义:
     * -1 表示 ForwardingNode 节点 {@link ConcurrentHashMap#MOVED}
     * -2 表示 TreeBin 节点 {@link ConcurrentHashMap#TREEBIN}
     */

    h = h & (capacity - 1);// 第四步取模运算
    /**
     * @see ConcurrentHashMap#putVal(java.lang.Object, java.lang.Object, boolean)
     */

    System.out.println("h = " + h);
}

关于索引计算公式的推导见我写的文章 HashMap中的取模和扩容公式推导

3. 扩容

3.1 扩容的时机

  • put() 添加元素完毕后,通过 addCount() 检查元素总量 size 是否超过阈值 sizeCtrl。
  • putAll() 添加大量元素之前,通过 tryPresize() 检查是否需要扩容。
  • treeifyBin() 桶中元素由链表转成树结构之前,如果数组容量小于 64(MIN_TREEIFY_CAPACITY),放弃转换红黑树,通过 tryPresize() 检查是否需要扩容。
  • put()computeIfAbsent()computeIfPresent() 等方法操作 HashMap 元素时,发现元素节点类型为 ForwardingNode,则通过 helpTransfer() 检查当前线程是否加入扩容。

3.2 扩容控制 sizeCtl

sizeCtl 是 ConcurrentHashMap 中的一个重要的变量。

/**
 * Table initialization and resizing control.  When negative, the
 * table is being initialized or resized: -1 for initialization,
 * else -(1 + the number of active resizing threads).  Otherwise,
 * when table is null, holds the initial table size to use upon
 * creation, or 0 for default. After initialization, holds the
 * next element count value upon which to resize the table.
 */
private transient volatile int sizeCtl;

sizeCtl 用于数组初始化与扩容控制:

初始化前:
    = 0  //未指定初始容量
    > 0  //由指定的初始容量计算而来,再找最近的2的幂次方。比如传入6,计算公式为6+6/2+1=10,最近的2的幂次方为16,所以sizeCtl就为16。具体见 tableSizeFor 函数。
    
初始化中:
    = -1 //table正在初始化
    = -N //N是int类型,分为两部分,高15位是指定容量标识,低16位表示并行扩容线程数+1,具体见 resizeStamp 函数。
    
初始化后:
    = n - (n >>> 2) = table.length * 0.75  //扩容阈值,为table容量大小的0.75倍

3.3 扩容检查

addCount()tryPresize()helpTransfer() 都包含了相似的扩容检查逻辑。
这里以 addCount() 为例作分析。

3.3.1 addCount() 源码

扩容检查流程:

  • 计算元素总量 size,若 CAS 冲突严重则放弃扩容。
  • 若 size 计算成功,有新元素加入,且检测到元素总量大于阈值 size > sizeCtl。

    • 如果检查到当前已有线程在进行扩容。

      • 扩容已经接近完成或足够多的线程参与到扩容中了,当前线程直接返回。
      • 当前线程参与扩容。
    • 如果没有其他线程在进行扩容,则修改 sizeCtl 标识,进行扩容。
// 参数 x 表示键值对个数的变化值,如果为正,表示新增了元素,如果为负,表示删除了元素
private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
    // 如果 counterCells 为空,则直接尝试通过 CAS 将 x 累加到 baseCount 中
    if ((as = counterCells) != null ||
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        // counterCells 非空
        // 或 counterCells 为空,但 CAS baseCount 失败都会来到这里
        CounterCell a; long v; int m;
        boolean uncontended = true;
        // 如果当前线程探针哈希到的数组元素非空,则尝试将 x 累加到对应数组元素
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended =
              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
            // counterCells 为空,或其长度小于1
            // 或当前线程探针哈希到的数组元素为空
            // 或当前线程探针哈希到的数组元素非空,但 CAS 数组元素失败
            // 都会调用 fullAddCount 方法来完成 x 的写入  
            fullAddCount(x, uncontended);
            // 如果调用过 fullAddCount,则当前线程一定不会协助扩容
            return;
        }
        if (check <= 1)
            return;
        s = sumCount(); // s表示加入新元素后的size大小,即元素总量
    }
    if (check >= 0) { // check值为桶上节点数量,有新元素加入成功才检查是否要扩容
        Node<K,V>[] tab, nt; int n, sc;
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) { // size大于sizeCtl阈值,及其他边界条件符合,则扩容
            int rs = resizeStamp(n); // 高16位置0,第16位为1,低15位存放当前容量n扩容标识,用于表示是对n的扩容。
            if (sc < 0) { // sizeCtl<0表示已经有线程在进行扩容工作
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0) // 条件1校验容量n扩容标识,条件2和3校验sc的边界(这里有BUG!),条件4和5校验扩容逻辑是否完成
                    break; // 跳出循环,表示当前线程无需参与扩容
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) // 当前线程参与扩容,sizeCtl加1
                    transfer(tab, nt);
            }
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2)) // 没有其他线程在进行扩容,则修改sizeCtl的值,将其高15位存放容量n扩容标识,低16位存放并行扩容线程数+1
                transfer(tab, null);
            s = sumCount();
        }
    }
}

上述扩容检查流程,有两个关键的技术点:

  • ConcurrentHashMap 如何计算元素总量 size ?
  • resizeStamp 函数如何生成 sizeCtrl 以控制扩容过程?

此外,这里利用 sc == rs + 1 || sc == rs + MAX_RESIZERS 来校验 sizeCtl 的边界存在 BUG!

我们一个一个来看。

3.3.2 计算元素总量 size

ConcurrentHashMap 依靠 baseCount 和 counterCells 来计算元素总量 size,定义如下:

private transient volatile long baseCount;

private transient volatile CounterCell[] counterCells;

@sun.misc.Contended static final class CounterCell {
    volatile long value;
    CounterCell(long x) { value = x; }
}

addCount() 中分为以下三种情况来处理 size:

baseCount CAS 成功。

执行 U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x) 成功,
得到 size = baseCount + x。

baseCount CAS 失败,counterCells CAS 成功。

执行 U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x) 失败,
执行 U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x) 成功,
通过 size = sumCount() 来计算容量。

// 累加 baseCount 和与所有 counterCells 数组的非空元素的和
final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a;
    long sum = baseCount;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

baseCount CAS 失败,counterCells CAS 失败。

执行 U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x) 失败,
执行 U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x) 失败,
通过 fullAddCount() 将 x 的写入 baseCount 或 counterCells,不会计算 size。

3.3.3 扩容过程 sizeCtrl 的计算

在扩容过程中,sizeCtrl 值为负数,其高 15 位是指定容量标识,低 16 位表示并行扩容线程数 + 1。

这个数值有什么意义?

高 15 位是指定容量标识。即存储扩容之前数组的大小 table.length,用于标识是对该大小的扩容。
低 16 位表示并行扩容线程数 + 1。用于记录当前参与扩容的线程数量,用于控制参与扩容的线程数。

最大的可参与扩容的线程数:65535

/**
 * The maximum number of threads that can help resize.
 * Must fit in 32 - RESIZE_STAMP_BITS bits.
 */
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1; // 65535

为什么是并行扩容线程数 + 1?

sizeCtrl 的低 16 位记录并行扩容线程数,为什么还要 “+1” 呢?

我的理解是,在扩容过程中 sizeCtrl 为负数,而 -1 值具有特殊的含义,代表数组 table 正在初始化,是一个重要的标志位。
如 initTable() 方法中通过 U.compareAndSwapInt(this, SIZECTL, sc, -1) 判断 table 是否正在初始化。
为了让出 “-1” 这个标志位,因此在二进制符号位为负的情况下,低 16 位还需要再 + 1。

如何得到这个数值?

resizeStamp() 函数和 addCount()tryPresize()helpTransfer() 扩容检查逻辑都参与对 sizeCtrl 负数值的计算。

static final int resizeStamp(int n) {
    return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}

总结一下计算过程:

/**
 * 扩容时 sizeCtl = -N
 * N是int类型,分为两部分,高15位是扩容前的容量标识,低16位表示并行扩容线程数+1
 *
 * @author Sumkor https://segmentfault.com/blog/sumkor
 * @since 2021/3/2
 */
@Test
public void sizeCtl() {
    /**
     * 插入元素的时候,检查 sizeCtl 看是否需要扩容,此时
     * {@link ConcurrentHashMap#putVal} 操作调用了 {@link ConcurrentHashMap#addCount}
     * 若这里检查到 size > sizeCtl阈值,则进行扩容。
     */

    /**
     * 首先关注其中的 {@link ConcurrentHashMap#resizeStamp} 方法
     */
    int n = 8;
    /**
     * 设置容量为 8,二进制表示如下:
     * 0000 0000 0000 0000 0000 0000 0000 1000
     */

    n = Integer.numberOfLeadingZeros(n);
    /**
     * Integer.numberOfLeadingZeros(n) 用于计算 n 转换成二进制后前面有几个 0。
     * 已知 ConcurrentHashMap 的容量必定是 2 的幂次方,所以不同的容量 n 前面 0 的个数必然不同,
     * 这里相当于用 0 的个数来记录 n 的值。
     *
     * Integer.numberOfLeadingZeros(8)=28,二进制表示如下:
     * 0000 0000 0000 0000 0000 0000 0001 1100
     */

    int rs = n | (1 << (RESIZE_STAMP_BITS - 1));
    /**
     * (1 << (RESIZE_STAMP_BITS - 1)即是 1<<15,表示为二进制即是高 16 位为 0,低 16 位为 1:
     * 0000 0000 0000 0000 1000 0000 0000 0000
     *
     * 再与 n 作或运算,得到二进制如下:
     * 0000 0000 0000 0000 1000 0000 0001 1100
     */
    System.out.println("rs = " + Integer.toBinaryString(rs));
    /**
     * 结论:resizeStamp()的返回值:高16位置0,第16位为1,低15位存放当前容量n扩容标识,用于表示是对n的扩容。
     */

    int sizeCtl = (rs << RESIZE_STAMP_SHIFT) + 2;
    /**
     * rs << 16,左移 16 后最高位为 1,所以成了一个负数。计算得到 sizeCtl 二进制如下:
     * 1000 0000 0001 1100 0000 0000 0000 0010
     */
    System.out.println("sizeCtl = " + Integer.toBinaryString(sizeCtl));
    /**
     * 那么在扩容时 sizeCtl 值的意义便如下所示:
     * 高15位:容量n扩容标识
     * 低16位:并行扩容线程数+1
     */
}

3.4 扩容检查的 BUG

明白了 sizeCtl 的含义和计算过程之后,回过头来看扩容检查的代码,还是以 addCount() 为例:

private final void addCount(long x, int check) {
    // ...省略
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n);
            if (sc < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
            s = sumCount();
        }
    }
}

回顾一下,这里 rs = resizeStamp(n) 的高 16 位为 0,第 16 位为 1,低 15 位存放当前容量 n 扩容标识。
简单来说,rs 得到的值是正数,而扩容过程 sc < 0 是负数,那么 sc == rs + 1 || sc == rs + MAX_RESIZERS 是不可能成立的。这样导致的后果是无法控制执行扩容方法 transfer() 的线程数。不过影响并不严重, transfer() 方法本身是线程安全的,只是有可能会加剧该方法的资源竞争。

正确的写法应该是 sc == (rs << RESIZE_STAMP_SHIFT) + 1 || sc == (rs << RESIZE_STAMP_SHIFT) + MAX_RESIZERS
sc == (rs << RESIZE_STAMP_SHIFT) + 1 表示扩容已经结束。
sc == (rs << RESIZE_STAMP_SHIFT) + MAX_RESIZERS 表示参与扩容的线程已经达到最大值。

在 Oracle Java Bug Database 上可以看到在 2018 年有人提了这个 Bug :Bug ID:JDK-8214427 : probable bug in logic of ConcurrentHashMap.addCount()

Bug ID:JDK-8214427

可以看到该 BUG 已经在 JDK 12 版本中解决(也就是 JDK 8 中没有解决,说好的维护至 2030 年呢)。
话说回来,国内各种博客网站、公众号研究 ConcurrentHashMap 的文章多如牛毛,几乎没有看到人提及这个 BUG,也是奇怪。

看一下解决之后的写法,对 sc 边界值的判断跟我上述的想法是一致的。

private final void addCount(long x, int check) {
    //...
    if (check >= 0) {
        Node<K,V>[] tab, nt;
        int n, sc;
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n) << RESIZE_STAMP_SHIFT;
            if (sc < 0) {
                if (sc == rs + MAX_RESIZERS || sc == rs + 1 ||
                        (nt = nextTable) == null || transferIndex <= 0)
                    break;
                if (U.compareAndSetInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            } else if (U.compareAndSetInt(this, SIZECTL, sc, rs + 2))
                transfer(tab, null);
            s = sumCount();
        }
    }
}

3.5 扩容流程

上述流程只是扩容之前的准备,扩容的核心逻辑在 transfer() 方法中。
阅读过程中需要时刻关注 ConcurrentHashMap 是如何做到扩容的并发安全的。

3.5.1 transfer() 源码

看源码之前,提前梳理一下扩容过程:

  1. 创建 nextTable,新容量是旧容量的 2 倍。
  2. 将原 table 的所有桶逆序分配给多个线程,每个线程每次最小分配 16 个桶,防止资源竞争导致的效率下降。指定范围的桶可能分配给多个线程同时处理。
  3. 扩容时遇到空的桶,采用 CAS 设置为 ForwardingNode 节点,表示该桶扩容完成。
  4. 扩容时遇到 ForwardingNode 节点,表示该桶已扩容过了,直接跳过。
  5. 单个桶内元素的迁移是加锁的,将旧 table 的 i 位置上所有元素拆分成高低两部分,并迁移到 nextTable 上,低位索引是 i,高位索引是 i + n,其中 n 为扩容前的容量。
  6. 最后将旧 table 的 i 位置设置为 ForwardingNode 节点。
  7. 所有桶扩容完毕,将 table 指向 nextTable,设置 sizeCtl 为新容量 0.75 倍
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) // 每核处理的桶的数目,最小为16
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    if (nextTab == null) {            // initiating
        try {
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; // 构建nextTable,其容量为原来容量的两倍
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        transferIndex = n; // 迁移总进度,值范围为[0,n],表示从table的第n-1位开始处理直到第0位。
    }
    int nextn = nextTab.length;
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); // 扩容时的特殊节点,hash固定为-1,标明此节点正在进行迁移。扩容期间的元素查找要调用其find方法在nextTable中查找元素
    boolean advance = true; // 当前线程是否需要继续寻找下一个可处理的节点
    boolean finishing = false; // to ensure sweep before committing nextTab // 所有桶是否都已迁移完成
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        while (advance) { // 此循环的作用是 1.确定当前线程要迁移的桶的范围;2.通过更新i的值确定当前范围内下一个要处理的节点
            int nextIndex, nextBound;
            if (--i >= bound || finishing) // 每次循环都检查结束条件:i自减没有超过下界,finishing标识为true时,跳出while循环
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) { // 迁移总进度<=0,表示所有桶都已迁移完成
                i = -1;
                advance = false;
            }
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) { // CAS执行transferIndex=transferIndex-stride,即transferIndex减去已分配出去的桶,得到边界,这里为下界
                bound = nextBound; // 当前线程需要处理的桶下标的下界
                i = nextIndex - 1; // 当前线程需要处理的桶下标
                advance = false;
            }
        }
        if (i < 0 || i >= n || i + n >= nextn) { // 当前线程自己的活已经做完或所有线程的活都已做完
            int sc;
            if (finishing) { // 已经完成所有节点复制了。所有线程已干完活,最后才走这里
                nextTable = null;
                table = nextTab; // table指向nextTable
                sizeCtl = (n << 1) - (n >>> 1); // 设置sizeCtl为新容量0.75倍
                return;
            }
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { // 当前线程已结束扩容,sizeCtl-1表示参与扩容线程数-1
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) // 相等时说明没有线程在参与扩容了,置finishing=advance=true,为保险让i=n再检查一次
                    return;
                finishing = advance = true;
                i = n; // recheck before commit
            }
        }
        else if ((f = tabAt(tab, i)) == null) // 遍历到i位置为null,则放入ForwardingNode节点,标志该桶扩容完成。
            advance = casTabAt(tab, i, null, fwd);
        else if ((fh = f.hash) == MOVED) // f.hash == -1 表示遍历到了ForwardingNode节点,意味着该节点已经处理过了
            advance = true; // already processed
        else {
            synchronized (f) { // 桶内元素迁移需要加锁
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn;
                    if (fh >= 0) { // 链表节点。非链表节点hash值小于0
                        int runBit = fh & n; // 根据 hash&n 的结果,将所有结点分为两部分
                        Node<K,V> lastRun = f;
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n; // 遍历链表的每个节点,依次计算 hash&n
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln); // hash&n为0,索引位置不变,作低位链表
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn); // hash&n不为0,索引变成“原索引+oldCap”,作高位链表
                        }
                        setTabAt(nextTab, i, ln); // 低位链表放在i处
                        setTabAt(nextTab, i + n, hn); // 高位链表放在i+n处
                        setTabAt(tab, i, fwd); // 在原table的i位置设置ForwardingNode节点,以提示该桶扩容完成
                        advance = true;
                    }
                    else if (f instanceof TreeBin) {
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                            (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                }
            }
        }
    }
}

总结:

  1. 每个线程想增/删元素时,如果访问的桶是 ForwardingNode 节点,则表明当前正处于扩容状态,协助一起扩容完成后再完成相应的数据更改操作。
  2. 一个旧桶内的数据迁移完成,但不是所有桶都迁移完成时,查询数据委托给 ForwardingNode 结点查询 nextTable 完成。
  3. 迁移过程中 sizeCtl 用于记录参与扩容线程的数量,全部迁移完成后 sizeCtl 更新为新 table 容量的 0.75 倍。

3.5.2 链表迁移图示

在 ConcurrentHashMap 中,对于数组的桶上的链表结构,扩容时需要拆分成两条新的链表。

普通链表迁移

迁移过程中,通过 ph & n,即 e.hash & oldCap 计算新数组的索引位置。这部分的思想与 HashMap 是一样的。

e.hash & oldCap 公式的推导见我的文章 《HashMap中的取模和扩容公式推导》

3.5.3 单元测试,理解 lastRun

不同的是,ConcurrentHashMap 采用 lastRun 节点来辅助拆分两条新链表,而 HashMap 采用首尾指针来拆分两条新链表,见我的文章 HashMap 扩容过程图解

计算 lastRun 节点图示:

lastRun 节点

计算 lastRun 节点和拆分链表的逻辑,单元测试:

/**
 * 扩容时,链表迁移算法
 *
 * @author Sumkor https://segmentfault.com/blog/sumkor
 * @since 2021/3/2
 */
@Test
public void transferLink() {
    int oldCap = 1;
    int newCap = 2;
    Node[] oldTable = new Node[oldCap];
    Node[] newTable = new Node[newCap];
    // A -> B -> C
    Node firstLinkNode03 = new Node(new Integer(3).hashCode(), 3, "C", null);
    Node firstLinkNode02 = new Node(new Integer(2).hashCode(), 2, "B", firstLinkNode03);
    Node firstLinkNode01 = new Node(new Integer(1).hashCode(), 1, "A", firstLinkNode02);
    oldTable[0] = firstLinkNode01;
    printTable(oldTable);

    // 赋值
    int i = 0;
    int n = oldCap;
    Node f = firstLinkNode01;
    int fh = firstLinkNode01.hash;

    /**
     * 单个桶元素扩容
     * @see ConcurrentHashMap#transfer(java.util.concurrent.ConcurrentHashMap.Node[], java.util.concurrent.ConcurrentHashMap.Node[])
     */
    Node ln, hn;
    if (fh >= 0) { // 链表节点。非链表节点hash值小于0
        int runBit = fh & n; // 根据 hash&n 的结果,将所有结点分为两部分
        Node lastRun = f;
        for (Node p = f.next; p != null; p = p.next) { // 遍历原链表得到lastRun,该节点作为新链表的起始节点(新链表采用头插法)
            int b = p.hash & n; // 遍历链表的每个节点,依次计算 hash&n
            if (b != runBit) {
                runBit = b;
                lastRun = p;
            }
        }
        if (runBit == 0) { // 判断lastRun节点是属于高位还是地位
            ln = lastRun;
            hn = null;
        } else {
            hn = lastRun;
            ln = null;
        }
        System.out.println("lastRun = " + lastRun.getValue());
        for (Node p = f; p != lastRun; p = p.next) {
            int ph = p.hash;
            Object pk = p.key;
            Object pv = p.val;
            if ((ph & n) == 0)
                ln = new Node(ph, pk, pv, ln); // hash&n为0,索引位置不变,作低位链表。这里采用头插法
            else
                hn = new Node(ph, pk, pv, hn); // hash&n不为0,索引变成“原索引+oldCap”,作高位链表
        }
        newTable[i] = ln;
        newTable[i + n] = hn;
        printTable(newTable);
    }
}

执行结果:

1=A -> 2=B -> 3=C ->
--------------------------
lastRun = C
2=B ->
1=A -> 3=C ->
--------------------------

使用更多的节点来测试,结果如下:

1=A -> 2=B -> 3=C -> 4=D -> 5=E -> 6=F -> 8=H -> 10=J ->
--------------------------
lastRun = F
4=D -> 2=B -> 6=F -> 8=H -> 10=J ->
5=E -> 3=C -> 1=A ->
--------------------------

关注链表迁移前后的顺序

ConcurrentHashMap 中的链表迁移之后,LastRun 节点及之后的节点的顺序与旧链表相同,其余节点都是倒序的。这是由于 ConcurrentHashMap 迁移桶上链表的时候,加了锁,因此迁移前后顺序不一致没有问题。

而 HashMap 中的链表迁移算法,使用了高低位的首尾指针,迁移前后节点的顺序都是一致的,可以避免在并发情况下链表出现环的问题。测试结果如下:

1=A -> 2=B -> 3=C -> 4=D -> 5=E -> 6=F -> 8=H -> 10=J ->
--------------------------
2=B -> 4=D -> 6=F -> 8=H -> 10=J ->
1=A -> 3=C -> 5=E ->
--------------------------

3.5.4 红黑树迁移图示

红黑树的迁移算法与 HashMap 中的是一样的,利用了 TreeNode 的链表特性,采用了高低位的首尾指针来拆分两条新链表。

红黑树迁移

4. 参考


作者:Sumkor
链接:https://segmentfault.com/a/11...

查看原文

赞 0 收藏 0 评论 0

Sumkor 发布了文章 · 3月1日

阅读 JDK 8 源码:HashMap 扩容总结及图解

在 Java8 中,HashMap 由数组+链表+红黑树组成的。扩容时,数组容量翻倍,数组中每一个桶中的多个节点(链结构或树结构)都需要 rehash 迁移到新的数组中去。
本文通过阅读 HashMap 的 resize 方法了解其扩容原理,对桶节点的迁移算法进行单元测试,画图以方便理解。

1. 扩容的时机

  1. HashMap 中 put 入第一个元素,初始化数组 table。
  2. HashMap 中的元素数量大于阈值 threshold。
threshold = capacity * load factor。
当 size > threshold 时,触发 rehash。

2. 扩容的源码

HashMap 中的 resize 方法主要包含两部分逻辑:

  1. 初始化数组 table,并设置阈值。
  2. 数组容量翻倍,将元素迁移到新数组。
/**
 * Initializes or doubles table size.  If null, allocates in
 * accord with initial capacity target held in field threshold.
 * Otherwise, because we are using power-of-two expansion, the
 * elements from each bin must either stay at same index, or move
 * with a power of two offset in the new table.
 *
 * @return the table
 */
final Node<K,V>[] resize() {
    Node<K,V>[] oldTab = table;
    int oldCap = (oldTab == null) ? 0 : oldTab.length;
    int oldThr = threshold;
    int newCap, newThr = 0;
    if (oldCap > 0) { // 第一次进来,table为null,oldCap为0,不会进入这里
        if (oldCap >= MAXIMUM_CAPACITY) { // 扩容前的数组大小如果已经达到最大(2^30)了
            threshold = Integer.MAX_VALUE; // 取整型最大值(2^31-1),这样以后就不会扩容了
            return oldTab;
        }
        else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY && // oldCap翻倍得到newCap
                 oldCap >= DEFAULT_INITIAL_CAPACITY)
            newThr = oldThr << 1; // double threshold
    }
    else if (oldThr > 0) // initial capacity was placed in threshold // 第一次进来,如果手动设置了初始容量initialCapacity,这里为true,则将threshold作为初始容量
        newCap = oldThr;
    else {               // zero initial threshold signifies using defaults // 如果没有手动设置initialCapacity,则设为默认值16
        newCap = DEFAULT_INITIAL_CAPACITY;
        newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY);
    }
    if (newThr == 0) { // 第一次进来,这里必为true,重新计算 threshold = capacity * Load factor
        float ft = (float)newCap * loadFactor;
        newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ?
                  (int)ft : Integer.MAX_VALUE);
    }
    threshold = newThr;
    @SuppressWarnings({"rawtypes","unchecked"})
        Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap];
    table = newTab;
    if (oldTab != null) { // 对oldTab中所有元素进行rehash。由于每次扩容是2次幂的扩展(指数组长度/桶数量扩为原来2倍),所以,元素的位置要么是在原位置,要么是在原位置再移动2次幂的位置
        for (int j = 0; j < oldCap; ++j) {
            Node<K,V> e;
            if ((e = oldTab[j]) != null) { // 数组j位置的元素不为空,需要该位置上的所有元素进行rehash
                oldTab[j] = null;
                if (e.next == null) // 桶中只有一个元素,则直接rehash
                    newTab[e.hash & (newCap - 1)] = e;
                else if (e instanceof TreeNode) // 桶中是树结构
                    ((TreeNode<K,V>)e).split(this, newTab, j, oldCap);
                else { // preserve order // 桶中是链表结构(JDK1.7中旧链表迁移新链表的时候,用的是头插法,如果在新表的数组索引位置相同,则链表元素会倒置;但是JDK1.8不会倒置,用的是双指针)
                    Node<K,V> loHead = null, loTail = null; // low位链表,其桶位置不变,head和tail分别代表首尾指针
                    Node<K,V> hiHead = null, hiTail = null; // high位链表,其桶位于追加后的新数组中
                    Node<K,V> next;
                    do {
                        next = e.next;
                        if ((e.hash & oldCap) == 0) { // 是0的话索引没变,是1的话索引变成“原索引+oldCap”
                            if (loTail == null)
                                loHead = e; // 总是指向头结点
                            else
                                loTail.next = e; // 该操作有可能会改变原链表结构
                            loTail = e; // 总是指向下一个节点,直到尾节点
                        }
                        else {
                            if (hiTail == null)
                                hiHead = e;
                            else
                                hiTail.next = e;
                            hiTail = e;
                        }
                    } while ((e = next) != null);
                    if (loTail != null) {
                        loTail.next = null;
                        newTab[j] = loHead; // 原索引
                    }
                    if (hiTail != null) {
                        hiTail.next = null;
                        newTab[j + oldCap] = hiHead; // 原索引+oldCap
                    }
                }
            }
        }
    }
    return newTab;
}

在 Java8 中,HashMap 中的桶可能是链表结构,也可能是树结构。
从网上找来一张图,直观展示 HashMap 结构:
HashMap

如果是链结构

将旧链表拆分成两条新的链表,通过 e.hash & oldCap 来计算新链表在扩容后的数组中的新下标。
当 e.hash & oldCap = 0,则节点在新数组中的索引值与旧索引值相同。
当 e.hash & oldCap = 1,则节点在新数组中的索引值为旧索引值+旧数组容量。
e.hash & oldCap 公式的推导见上一篇文章 《HashMap中的取模和扩容公式推导》

如果是树结构

HashMap 对树结构的定义如下:

static final class TreeNode<K,V> extends LinkedHashMap.Entry<K,V> {
    TreeNode<K,V> parent;  // red-black tree links
    TreeNode<K,V> left;
    TreeNode<K,V> right;
    TreeNode<K,V> prev;    // needed to unlink next upon deletion
    boolean red;
    TreeNode(int hash, K key, V val, Node<K,V> next) {
        super(hash, key, val, next);
    }
}    

需要明确的是:TreeNode 既是一个红黑树结构,也是一个双链表结构。

判断节点 e instanceof TreeNode 为 true,则调用 HashMap.TreeNode#split 方法对树进行拆分,而拆分主要用的是 TreeNode 的链表属性
拆分代码如下:

final void split(HashMap<K,V> map, Node<K,V>[] tab, int index, int bit) {
    TreeNode<K,V> b = this;
    // Relink into lo and hi lists, preserving order
    TreeNode<K,V> loHead = null, loTail = null;
    TreeNode<K,V> hiHead = null, hiTail = null;
    int lc = 0, hc = 0; // 用于决定红黑树是否要转回链表
    for (TreeNode<K,V> e = b, next; e != null; e = next) { // 对节点e进行遍历(首先明确:TreeNode既是一个红黑树结构,也是一个双链表结构)
        next = (TreeNode<K,V>)e.next;
        e.next = null; // 把e的下一个节点赋值给next后,断开e与e.next节点
        if ((e.hash & bit) == 0) { // 原索引
            if ((e.prev = loTail) == null)
                loHead = e;
            else
                loTail.next = e;
            loTail = e;
            ++lc;
        }
        else { // 原索引 + oldCap
            if ((e.prev = hiTail) == null)
                hiHead = e;
            else
                hiTail.next = e;
            hiTail = e;
            ++hc;
        }
    }

    if (loHead != null) {
        if (lc <= UNTREEIFY_THRESHOLD)
            tab[index] = loHead.untreeify(map); // 转为链结构
        else {
            tab[index] = loHead;
            if (hiHead != null) // (else is already treeified)
                loHead.treeify(tab); // 转换成树结构
        }
    }
    if (hiHead != null) {
        if (hc <= UNTREEIFY_THRESHOLD)
            tab[index + bit] = hiHead.untreeify(map);
        else {
            tab[index + bit] = hiHead;
            if (loHead != null)
                hiHead.treeify(tab);
        }
    }
}

3. 链表迁移算法

Java8 中如何迁移桶中的链表呢?
这里构建一个链表 A->B->C 来进行调试,应用 HashMap 中的扩容算法,扩容的时候,会将该链表拆分成两个新的链表。

单元测试代码如下:

/**
 * 旧链表数据迁移至新链表
 * 本例中,桶的数量由1扩容为2.
 * 
 * @author Sumkor https://segmentfault.com/blog/sumkor
 * @since 2021/2/28
 */
@Test
public void resizeLink() {
    int oldCap = 1;
    int newCap = 2;
    Node[] oldTable = new Node[oldCap];
    Node[] newTable = new Node[newCap];

    // A -> B -> C
    Node firstLinkNode03 = new Node(new Integer(3).hashCode(), 3, "C", null);
    Node firstLinkNode02 = new Node(new Integer(2).hashCode(), 2, "B", firstLinkNode03);
    Node firstLinkNode01 = new Node(new Integer(1).hashCode(), 1, "A", firstLinkNode02);
    oldTable[0] = firstLinkNode01;
    // print
    System.out.println("--------------------------");
    System.out.println("原链表:");
    printTable(oldTable);
    System.out.println("--------------------------");

    /**
     * HashMap中resize迁移算法
     * @see HashMap#resize()
     */
    for (int j = 0; j < oldCap; ++j) {
        Node loHead = null, loTail = null; // low位链表,其桶位置不变,head和tail分别代表首尾指针
        Node hiHead = null, hiTail = null; // high位链表,其桶位于追加后的新数组中
        Node e = oldTable[j];// 将要处理的元素
        Node next;
        do {
            next = e.next;
            if ((e.hash & oldCap) == 0) { // 是0的话索引没变,是1的话索引变成“原索引+oldCap”
                if (loTail == null)
                    loHead = e; // 总是指向头结点
                else
                    loTail.next = e; // 把loTail.next指向e。
                loTail = e;
            } else {
                if (hiTail == null)
                    hiHead = e;
                else
                    hiTail.next = e; // 把hiTail.next指向e。若hiTail.next原先并不指向e,该操作会改变oldTable[j]上的旧链表结构
                hiTail = e; // 把hiTail指向e所指向的节点
            }
        } while ((e = next) != null);
        if (loTail != null) {
            loTail.next = null; // 这一步是必须的,loTail.next有可能还其他节点,需要设为null
            newTable[j] = loHead; // 原索引
        }
        if (hiTail != null) {
            hiTail.next = null;
            newTable[j + oldCap] = hiHead; // 原索引+oldCap
        }
    }
    System.out.println("新链表:");
    printTable(newTable);
    System.out.println("--------------------------");
}

其中,采用 HashMap 中的 Node 结构作为链表节点:

/**
 * HashMap 中的 Node 结构
 */
static class Node<K, V> implements Map.Entry<K, V> {
    final int hash;
    final K key;
    V value;
    Node<K, V> next;

    Node(int hash, K key, V value, Node<K, V> next) {
        this.hash = hash;
        this.key = key;
        this.value = value;
        this.next = next;
    }

    public final K getKey() {
        return key;
    }

    public final V getValue() {
        return value;
    }

    public final String toString() {
        return key + "=" + value;
    }

    public final int hashCode() {
        return Objects.hashCode(key) ^ Objects.hashCode(value);
    }

    public final V setValue(V newValue) {
        V oldValue = value;
        value = newValue;
        return oldValue;
    }

    public final boolean equals(Object o) {
        if (o == this)
            return true;
        if (o instanceof Map.Entry) {
            Map.Entry<?, ?> e = (Map.Entry<?, ?>) o;
            if (Objects.equals(key, e.getKey()) &&
                    Objects.equals(value, e.getValue()))
                return true;
        }
        return false;
    }
}

对数组 table 上的链表结构进行打印:

/**
 * HashMap 中的 Node 结构,打印
 */
private void printTable(Node[] table) {
    for (int i = 0; i < table.length; i++) {
        Node tmpNode = table[i];// 用于打印,不改变table的结构
        while (tmpNode != null) {
            System.out.print(tmpNode + " -> ");
            tmpNode = tmpNode.next;
        }
        System.out.println();
    }
}

执行结果

--------------------------
原链表:
1=A -> 2=B -> 3=C -> 
--------------------------
新链表:
2=B -> 
1=A -> 3=C -> 
--------------------------

注意到,迁移之后,节点 C 依旧排在节点 A 之后,而不是反过来。
在 Java8 中,HashMap 插入元素使用尾插法,扩容时使用了首尾指针保证了链表元素顺序不会倒置,从而解决了 Java7 扩容时产生的环问题。

执行过程图示

第一、二次循环之后,高低位链表指针如下:

step 01-02

第三次循环,由于 hiTail != null,因此执行 hiTail.next = e,注意此时 B 依旧指向 C。

step 03

接着执行 hiTail = e,把 hiTail 指向 e 所在节点。
最后执行 loTail.next = nullhiTail.next = null,把尾指针都指向 null。

step 04


作者:Sumkor
链接:https://segmentfault.com/a/11...

查看原文

赞 0 收藏 0 评论 0

Sumkor 收藏了问题 · 3月1日

第三方登录token不怕被别人拿走冒充登录吗?

比如登录的时候浏览器开发者工具看到登录成功换取的token,有这个token了,如果别人知道了,不就可以拿着这个token冒充我去做登录后的其他任何操作吗?即使有效期2小时,2小时内不是也能做很多操作吗?这种安全如何保障呢?

Sumkor 关注了问题 · 3月1日

第三方登录token不怕被别人拿走冒充登录吗?

比如登录的时候浏览器开发者工具看到登录成功换取的token,有这个token了,如果别人知道了,不就可以拿着这个token冒充我去做登录后的其他任何操作吗?即使有效期2小时,2小时内不是也能做很多操作吗?这种安全如何保障呢?

关注 6 回答 5

认证与成就

  • 获得 15 次点赞
  • 获得 1 枚徽章 获得 0 枚金徽章, 获得 0 枚银徽章, 获得 1 枚铜徽章

擅长技能
编辑

开源项目 & 著作
编辑

(゚∀゚ )
暂时没有

注册于 2018-03-19
个人主页被 978 人浏览