老马啸西风

老马啸西风 查看完整档案

填写现居城市  |  填写毕业院校  |  填写所在公司/组织 houbb.github.io/ 编辑
编辑
_ | |__ _ _ __ _ | '_ \| | | |/ _` | | |_) | |_| | (_| | |_.__/ \__,_|\__, | |___/ 该用户太懒什么也没留下

个人动态

老马啸西风 发布了文章 · 10月21日

ReentrantLock 可重入锁这样学,面试没烦恼,下班走得早

ReentrantLock+可重入锁

为什么需要 ReentrantLock ?

既生 synchronized,何生 ReentrantLock

每一个接触过多线程的 java coder 肯定都知道 synchronized 关键字,那为什么还需要 ReentrantLock 呢?

其实这就是 ReentrantLock 与 synchronized 对比的优势问题:

(1)ReentrantLock 使用起来更来更加灵活。我们在需要控制的地方,可以灵活指定加锁或者解锁。

这可以让加锁的范围更小,记住老马的一句话,更小往往意味着更快

(2)ReentrantLock 提供了公平锁、非公平锁等多种方法特性,这些都是 synchronized 关键字无法提供的。

接下来,就让我们一起来学习一下 ReentrantLock 可重入锁吧。

可重入锁

ReentrantLock 使用

线程定义

创建一个可重入锁线程。

/**
 * @author 老马啸西风
 */
public class ReconnectThread extends Thread {

    /**
     * 声明可重入锁
     */
    private static final ReentrantLock reentrantLock = new ReentrantLock();


    /**
     * 用于标识当前线程
     */
    private String name;

    public ReconnectThread(String name) {
        this.name = name;
    }

    @Override
    public void run() {
        reentrantLock.lock();

        try {
            for (int i = 0; i < 5; i++) {
                System.out.println(name+" "+i+" times");
                Thread.sleep(1000);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            reentrantLock.unlock();
        }

    }
}

测试

  • Test
/**
 * @author 老马啸西风
 */
public static void main(String[] args) {
    Thread one = new ReconnectThread("one");
    Thread two = new ReconnectThread("two");
    one.start();
    two.start();
}
  • result

根据结果可知。两个必须要等待另外一个执行完成才能运行。

two 0 times
two 1 times
two 2 times
two 3 times
two 4 times
one 0 times
one 1 times
one 2 times
one 3 times
one 4 times

锁的释放和获取

锁是 java 并发编程中最重要的同步机制。

锁除了让临界区互斥执行外,还可以让释放锁的线程向获取同一个锁的线程发送消息。

实例

  • MonitorExample.java
/**
 * @author 老马啸西风
 */
class MonitorExample {
    int a = 0;

    public synchronized void writer() {  //1
        a++;                             //2
    }                                    //3

    public synchronized void reader() {  //4
        int i = a;                       //5
        //……
    }                                    //6
}

假设线程 A 执行 writer() 方法,随后线程 B 执行 reader() 方法。

根据 happens-before 规则,这个过程包含的 happens-before 关系可以分为两类:

  • 根据程序次序规则,1 happens before 2, 2 happens before 3; 4 happens before 5, 5 happens before 6。
  • 根据监视器锁规则,3 happens before 4。
  • 根据 happens before 的传递性,2 happens before 5。

因此,线程 A 在释放锁之前所有可见的共享变量,在线程 B 获取同一个锁之后,将立刻变得对 B 线程可见。

锁释放和获取的内存语义

当线程释放锁时,JMM 会把该线程对应的本地内存中的共享变量刷新到主内存中。

以上面的 MonitorExample 程序为例,A 线程释放锁后,共享数据的状态示意图如下:

  • 线程 A
本地内存 A: a = 1;

(写入到主内存)

主内存:a = 1;

当线程获取锁时,JMM 会把该线程对应的本地内存置为无效。

从而使得被监视器保护的临界区代码必须要从主内存中去读取共享变量。

下面是锁获取的状态过程:

在线程 A 写入主内存之后。

线程之间通信:线程 A 向 B 发送消息

  • 线程 B
主内存:a = 1;

(从主内存中读取)

本地内存 B: a = 1;

和 volatile 内存语义对比

对比锁释放-获取的内存语义与 volatile 写-读的内存语义,

可以看出:锁释放与 volatile 写有相同的内存语义;锁获取与 volatile 读有相同的内存语义

内存语义小结

下面对锁释放和锁获取的内存语义做个总结:

  • 线程 A 释放一个锁,实质上是线程A向接下来将要获取这个锁的某个线程发出了(线程A对共享变量所做修改的)消息。
  • 线程 B 获取一个锁,实质上是线程 B 接收了之前某个线程发出的(在释放这个锁之前对共享变量所做修改的)消息。
  • 线程 A 释放锁,随后线程 B 获取这个锁,这个过程实质上是线程 A 通过主内存向线程 B 发送消息。

锁内存语义的实现

本文将借助 ReentrantLock 的源代码,来分析锁内存语义的具体实现机制。

  • ReentrantLockExample.java
/**
 * @author 老马啸西风
 */
class ReentrantLockExample {

    int a = 0;

    ReentrantLock lock = new ReentrantLock();

    public void writer() {
        lock.lock();         //获取锁
        try {
            a++;
        } finally {
            lock.unlock();  //释放锁
        }
    }

    public void reader () {
        lock.lock();        //获取锁
        try {
            int i = a;
            //……
        } finally {
            lock.unlock();  //释放锁
        }
    }
}

在 ReentrantLock 中,调用 lock() 方法获取锁;调用 unlock() 方法释放锁。

源码实现

ReentrantLock 的实现依赖于 java 同步器框架 AbstractQueuedSynchronizer(本文简称之为AQS)。

AQS 使用一个整型的 volatile 变量(命名为state)来维护同步状态,马上我们会看到,这个 volatile 变量是 ReentrantLock 内存语义实现的关键。

/**
 * @author 老马啸西风
 */
public class ReentrantLock implements Lock, java.io.Serializable {

    /**
     * Base of synchronization control for this lock. Subclassed
     * into fair and nonfair versions below. Uses AQS state to
     * represent the number of holds on the lock.
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        //...
    }

    /**
     * Sync object for non-fair locks
     */
    static final class NonfairSync extends Sync {
        //...
    }

    /**
     * Sync object for fair locks
     */
    static final class FairSync extends Sync {
        //...        
    }
}

公平锁

lock()

使用公平锁时,加锁方法lock()的方法调用轨迹如下:

  1. ReentrantLock : lock()
  2. FairSync : lock()
  3. AbstractQueuedSynchronizer : acquire(int arg)
  4. ReentrantLock : tryAcquire(int acquires)

在第 4 步真正开始加锁,下面是该方法的源代码(JDK 1.8):

/**
 * Fair version of tryAcquire.  Don't grant access unless
 * recursive call or no waiters or is first.
 * @author 老马啸西风
 */
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

加锁方法首先读 volatile 变量 state。

unlock()

在使用公平锁时,解锁方法unlock()的方法调用轨迹如下:

  1. ReentrantLock : unlock()
  2. AbstractQueuedSynchronizer : release(int arg)
  3. Sync : tryRelease(int releases)

在第 3 步真正开始释放锁,下面是该方法的源代码:

/**
 * @author 老马啸西风
 */
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

在释放锁的最后写 volatile 变量 state。

公平锁在释放锁的最后写 volatile 变量 state;在获取锁时首先读这个 volatile 变量。

根据 volatile 的 happens-before 规则,释放锁的线程在写 volatile 变量之前可见的共享变量,在获取锁的线程读取同一个 volatile 变量后将立即变的对获取锁的线程可见。

非公平锁

非公平锁的释放和公平锁完全一样,所以这里仅仅分析非公平锁的获取。

lock()

使用非公平锁时,加锁方法lock()的方法调用轨迹如下:

  1. ReentrantLock : lock()
  2. NonfairSync : lock()
  3. AbstractQueuedSynchronizer : compareAndSetState(int expect, int update)

在第 3 步真正开始加锁,下面是该方法的源代码:

/**
 * Atomically sets synchronization state to the given updated
 * value if the current state value equals the expected value.
 * This operation has memory semantics of a {@code volatile} read
 * and write.
 *
 * @param expect the expected value
 * @param update the new value
 * @return {@code true} if successful. False return indicates that the actual
 *         value was not equal to the expected value.
 * @author 老马啸西风
 */
protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

该方法以原子操作的方式更新 state 变量,本文把 java 的 compareAndSet() 方法调用简称为CAS。

JDK文档对该方法的说明如下:如果当前状态值等于预期值,则以原子方式将同步状态设置为给定的更新值。此操作具有 volatile 读和写的内存语义

内存语义总结

现在对公平锁和非公平锁的内存语义做个总结:

  • 公平锁和非公平锁释放时,最后都要写一个 volatile 变量 state。
  • 公平锁获取时,首先会去读这个 volatile 变量。
  • 非公平锁获取时,首先会用 CAS 更新这个 volatile 变量,这个操作同时具有 volatile 读和 volatile 写的内存语义。

从本文对 ReentrantLock 的分析可以看出,锁释放-获取的内存语义的实现至少有下面两种方式:

  1. 利用 volatile 变量的写-读所具有的内存语义。
  2. 利用 CAS 所附带的 volatile 读和 volatile 写的内存语义。

小结

本文从介绍 ReentrantLock 使用案例开始,引出了锁的获取和释放的内存语义。

为了读者加深印象,对源码进行了简单的学习,下一节将对源码进行深入讲解。

秉着没有对比,就没有发现的原则,我们对比了 ReentrantLock 和 volatile 以及 synchronized 的差异性,便于读者正确地根据自己的场景选择合适的加锁策略。

希望本文对你有帮助,如果有其他想法的话,也可以评论区和大家分享哦。

各位极客的点赞收藏转发,是老马写作的最大动力!

深入学习

查看原文

赞 1 收藏 1 评论 0

老马啸西风 发布了文章 · 10月12日

java 手写并发框架(二)异步转同步框架封装锁策略

序言

上一节我们学习了异步查询转同步的 7 种实现方式,今天我们就来学习一下,如何对其进行封装,使其成为一个更加便于使用的工具。

思维导图如下:

异步转同步

拓展阅读

java 手写并发框架(1)异步查询转同步的 7 种实现方式

异步转同步的便利性

实现方式

  • 循环等待
  • wait & notify
  • 使用条件锁
  • 使用 CountDownLatch
  • 使用 CyclicBarrier
  • Future
  • Spring EventListener

上一节我们已经对上面的 7 种实现方式进行了详细的介绍,没有看过的同学可以去简单回顾一下。

但是这样个人觉得还是不够方便,懒惰是进步的阶梯。

更进一步简化

我们希望达到下面的效果:

@Sync
public String queryId() {
    System.out.println("开始查询");
    return id;
}

@SyncCallback(value = "queryId")
public void queryIdCallback() {
    System.out.println("回调函数执行");
    id = "123";
}

通过注解直接需要同步的方法,和回调的方法,代码中直接调用即可。

我们首先实现基于字节码增强的版本,后续将实现整合 spring, springboot 的版本。

锁的代码实现

锁的定义

我们将原来的实现抽象为加锁和解锁,为了便于拓展,接口定义如下:

package com.github.houbb.sync.api.api;

/**
 * @author binbin.hou
 * @since 0.0.1
 */
public interface ISyncLock {

    /**
     * 等待策略
     * @param context 上下文
     * @since 0.0.1
     */
    void lock(final ISyncLockContext context);

    /**
     * 解锁策略
     * @param context 上下文
     * @since 0.0.1
     */
    void unlock(final ISyncUnlockContext context);

}

其中上下文加锁和解锁做了区分,不过暂时内容是一样的。

主要是超时时间和单位:

package com.github.houbb.sync.api.api;

import java.util.concurrent.TimeUnit;

/**
 * @author binbin.hou
 * @since 0.0.1
 */
public interface ISyncLockContext {

    /**
     * 超时时间
     * @return 结果
     */
    long timeout();

    /**
     * 超时时间单位
     * @return 结果
     */
    TimeUnit timeUnit();

}

锁策略实现

我们本节主要实现下上一节中的几种锁实现。

目前我们选择其中的是个进行实现:

wait & notify

package com.github.houbb.sync.core.support.lock;

import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.sync.api.api.ISyncLock;
import com.github.houbb.sync.api.api.ISyncLockContext;
import com.github.houbb.sync.api.api.ISyncUnlockContext;
import com.github.houbb.sync.api.exception.SyncRuntimeException;

/**
 * 等待通知同步
 *
 * @author binbin.hou
 * @since 0.0.1
 */
public class WaitNotifyLock implements ISyncLock {

    private static final Log log = LogFactory.getLog(WaitNotifyLock.class);

    /**
     * 声明对象
     */
    private final Object lock = new Object();

    @Override
    public synchronized void lock(ISyncLockContext context) {
        synchronized (lock) {
            try {
                long timeoutMills = context.timeUnit().toMillis(context.timeout());
                log.info("进入等待,超时时间为:{}ms", timeoutMills);
                lock.wait(timeoutMills);
            } catch (InterruptedException e) {
                log.error("中断异常", e);
                throw new SyncRuntimeException(e);
            }
        }
    }

    @Override
    public void unlock(ISyncUnlockContext context) {
        synchronized (lock) {
            log.info("唤醒所有等待线程");
            lock.notifyAll();
        }
    }

}

加锁的部分比较简单,我们从上下文中获取超时时间和超时单位,直接和上一节内容类似,调用即可。

至于上下文中的信息是怎么来的,我们后续就会讲解。

条件锁实现

这个在有了上一节的基础之后也非常简单。

核心流程:

(1)创建锁

(2)获取锁的 condition

(3)执行加锁和解锁

package com.github.houbb.sync.core.support.lock;

import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.sync.api.api.ISyncLock;
import com.github.houbb.sync.api.api.ISyncLockContext;
import com.github.houbb.sync.api.api.ISyncUnlockContext;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 等待通知同步
 *
 * @author binbin.hou
 * @since 0.0.1
 */
public class LockConditionLock implements ISyncLock {

    private static final Log log = LogFactory.getLog(LockConditionLock.class);

    private final Lock lock = new ReentrantLock();

    private final Condition condition = lock.newCondition();

    @Override
    public synchronized void lock(ISyncLockContext context) {
        lock.lock();
        try{
            log.info("程序进入锁定状态");
            condition.await(context.timeout(), context.timeUnit());
        } catch (InterruptedException e) {
            log.error("程序锁定状态异常", e);
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void unlock(ISyncUnlockContext context) {
        lock.lock();
        try{
            log.info("解锁状态,唤醒所有等待线程。");
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

}

CountDownLatch 实现

package com.github.houbb.sync.core.support.lock;

import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.sync.api.api.ISyncLock;
import com.github.houbb.sync.api.api.ISyncLockContext;
import com.github.houbb.sync.api.api.ISyncUnlockContext;

import java.util.concurrent.CountDownLatch;

/**
 * 等待通知同步
 *
 * @author binbin.hou
 * @since 0.0.1
 */
public class CountDownLatchLock implements ISyncLock {

    private static final Log log = LogFactory.getLog(CountDownLatchLock.class);

    /**
     * 闭锁
     * 调用1次,后续方法即可通行。
     */
    private CountDownLatch countDownLatch = new CountDownLatch(1);

    @Override
    public synchronized void lock(ISyncLockContext context) {
        countDownLatch = new CountDownLatch(1);

        try {
            log.info("进入等待,超时时间为:{},超时单位:{}", context.timeout(),
                    context.timeUnit());
            boolean result = countDownLatch.await(context.timeout(), context.timeUnit());
            log.info("等待结果: {}", result);
        } catch (InterruptedException e) {
            log.error("锁中断异常", e);
        }
    }

    @Override
    public void unlock(ISyncUnlockContext context) {
        log.info("执行 unlock 操作");
        countDownLatch.countDown();
    }

}

注意:这里为了保证 countDownLatch 可以多次使用,我们在每一次加锁的时候,都会重新创建 CountDownLatch。

CyclicBarrierLock 锁实现

package com.github.houbb.sync.core.support.lock;

import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.sync.api.api.ISyncLock;
import com.github.houbb.sync.api.api.ISyncLockContext;
import com.github.houbb.sync.api.api.ISyncUnlockContext;
import com.github.houbb.sync.api.exception.SyncRuntimeException;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeoutException;

/**
 * @author binbin.hou
 * @since 0.0.1
 */
public class CyclicBarrierLock implements ISyncLock {

    private static final Log log = LogFactory.getLog(CyclicBarrierLock.class);

    private final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);

    @Override
    public synchronized void lock(ISyncLockContext context) {
        try {
            log.info("进入锁定状态, timeout:{}, timeunit: {}",
                    context.timeout(), context.timeUnit());
            cyclicBarrier.await(context.timeout(), context.timeUnit());

            log.info("重置 cyclicBarrier");
            cyclicBarrier.reset();
        } catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
            log.error("锁定时遇到异常", e);
            throw new SyncRuntimeException(e);
        }
    }

    @Override
    public void unlock(ISyncUnlockContext context) {
        try {
            log.info("解锁信息");
            cyclicBarrier.await(context.timeout(), context.timeUnit());
        } catch (InterruptedException | TimeoutException | BrokenBarrierException e) {
            log.error("解锁时遇到异常", e);
        }
    }

}

这里和 CountDownLatchLock 的实现非常类似,不过 CyclicBarrier 有一个好处,就是可以复用。

我们在每一次解锁之后,重置一下栅栏:

log.info("重置 cyclicBarrier");
cyclicBarrier.reset();

锁的工具类

为了简单的生成上述几种锁的实例,我们提供了一个简单的工具类方法:

package com.github.houbb.sync.core.support.lock;

import com.github.houbb.heaven.support.instance.impl.Instances;
import com.github.houbb.sync.api.api.ISyncLock;
import com.github.houbb.sync.api.constant.LockType;

import java.util.HashMap;
import java.util.Map;

/**
 * 锁策略
 * @author binbin.hou
 * @since 0.0.1
 */
public final class Locks {

    private Locks(){}

    /**
     * MAP 信息
     * @since 0.0.1
     */
    private static final Map<LockType, ISyncLock> MAP = new HashMap<>();

    static {
        MAP.put(LockType.WAIT_NOTIFY, waitNotify());
        MAP.put(LockType.COUNT_DOWN_LATCH, countDownLatch());
        MAP.put(LockType.CYCLIC_BARRIER, cyclicBarrier());
        MAP.put(LockType.LOCK_CONDITION, lockCondition());
    }

    /**
     * 获取锁实现
     * @param lockType 锁类型
     * @return 实现
     * @since 0.0.1
     */
    public static ISyncLock getLock(final LockType lockType) {
        return MAP.get(lockType);
    }

    /**
     * @since 0.0.1
     * @return 实现
     */
    private static ISyncLock waitNotify() {
        return Instances.singleton(WaitNotifyLock.class);
    }

    /**
     * @since 0.0.1
     * @return 实现
     */
    private static ISyncLock countDownLatch() {
        return Instances.singleton(CountDownLatchLock.class);
    }

    /**
     * @since 0.0.1
     * @return 实现
     */
    private static ISyncLock lockCondition() {
        return Instances.singleton(LockConditionLock.class);
    }

    /**
     * @since 0.0.1
     * @return 实现
     */
    private static ISyncLock cyclicBarrier() {
        return Instances.singleton(CyclicBarrierLock.class);
    }

}

上述的锁实现都是线程安全的,所以全部使用单例模式创建。

LockType 类是一个锁的枚举类,会在注解中使用。

小结

好了,到这里我们就把上一节中的常见的 4 种锁策略就封装完成了。

你可能好奇上下文的时间信息哪里来?这些锁又是如何被调用的?

我们将通过注解+字节码增强的方式来实现调用(就是 aop 的原理),由于篇幅原因,字节码篇幅较长,为了阅读体验,实现部分将放在下一节。

感兴趣的可以关注一下,便于实时接收最新内容。

觉得本文对你有帮助的话,欢迎点赞评论收藏转发一波。你的鼓励,是我最大的动力~

不知道你有哪些收获呢?或者有其他更多的想法,欢迎留言区和我一起讨论,期待与你的思考相遇。

文中如果链接失效,可以点击 {阅读原文}。

深入学习

查看原文

赞 0 收藏 0 评论 0

老马啸西风 发布了文章 · 10月11日

从零手写缓存框架(14)redis渐进式rehash详

redis 的 rehash 设计

本文思维导图如下:

"redis+渐进式+rehash

HashMap 的 rehash 回顾

读过 HashMap 源码的同学,应该都知道 map 在扩容的时候,有一个 rehash 的过程。

没有读过也没有关系,可以花时间阅读下 从零开始手写 redis(13) HashMap源码详解 简单了解下整个过程即可。

HashMap 的扩容简介

这里简单介绍下:

扩容(resize)就是重新计算容量,向HashMap对象里不停的添加元素,而HashMap对象内部的数组无法装载更多的元素时,对象就需要扩大数组的长度,以便能装入更多的元素。

当然Java里的数组是无法自动扩容的,方法是使用一个新的数组代替已有的容量小的数组,就像我们用一个小桶装水,如果想装更多的水,就得换大水桶。

redis 中的扩容设计

HashMap 的扩容需要对集合中大部分的元素进行重新计算,但是对于 redis 这种企业级应用,特别是单线程的应用,如果像传统的 rehash 一样把所有元素来一遍的话,估计要十几秒的时间。

十几秒对于常见的金融、电商等相对高并发的业务场景,是无法忍受的。

那么 redis 的 rehash 是如何实现的呢?

实际上 redis 的 rehash 动作并不是一次性、集中式地完成的, 而是分多次、渐进式地完成的

这里补充一点,不单单是扩容,缩容也是一样的道理,二者都需要进行 rehash。

只增不降就是对内存的浪费,浪费就是犯罪,特别是内存还这么贵。

ps: 这种思想和 key 淘汰有异曲同工之妙,一口吃不了一个大胖子,一次搞不定,那就 1024 次,慢慢来总能解决问题。

Redis 的渐进式 rehash

这部分直接选自经典入门书籍《Redis 设计与实现》

为什么要渐进式处理?

实际上 redis 内部有两个 hashtable,我们称之为 ht[0] 和 ht[1]。传统的 HashMap 中只有一个。

为了避免 rehash 对服务器性能造成影响, 服务器不是一次性将 ht[0] 里面的所有键值对全部 rehash 到 ht[1] , 而是分多次、渐进式地将 ht[0] 里面的键值对慢慢地 rehash 到 ht[1] 。

详细步骤

哈希表渐进式 rehash 的详细步骤:

(1)为 ht[1] 分配空间, 让字典同时持有 ht[0] 和 ht[1] 两个哈希表。

(2)在字典中维持一个索引计数器变量 rehashidx , 并将它的值设置为 0 , 表示 rehash 工作正式开始。

(3)在 rehash 进行期间, 每次对字典执行添加、删除、查找或者更新操作时, 程序除了执行指定的操作以外, 还会顺带将 ht[0] 哈希表在 rehashidx 索引上的所有键值对 rehash 到 ht[1] , 当 rehash 工作完成之后, 程序将 rehashidx 属性的值增1。

(4)随着字典操作的不断执行, 最终在某个时间点上, ht[0] 的所有键值对都会被 rehash 至 ht[1] , 这时程序将 rehashidx 属性的值设为 -1 , 表示 rehash 操作已完成。

渐进式 rehash 的好处在于它采取分而治之的方式, 将 rehash 键值对所需的计算工作均滩到对字典的每个添加、删除、查找和更新操作上, 从而避免了集中式 rehash 而带来的庞大计算量。

rehash 间的操作怎么兼容呢?

因为在进行渐进式 rehash 的过程中, 字典会同时使用 ht[0] 和 ht[1] 两个哈希表, 那这期间的操作如何保证正常进行呢?

(1)查询一个信息

这个类似于我们的数据库信息等迁移,先查询一个库,没有的话,再去查询另一个库。

ht[0] 中没找到,我们去 ht[1] 中查询即可。

(2)新数据怎么办?

这个和数据迁移一样的道理。

当我们有新旧的两个系统时,新来的用户等信息直接落在新系统即可,

这一措施保证了 ht[0] 包含的键值对数量会只减不增, 并随着 rehash 操作的执行而最终变成空表。

一图胜千言

我们来看图:

(1)准备 rehash

输入图片说明

(2)rehash index=0

输入图片说明

(3)rehash index=1

输入图片说明

(4)rehash index=2

输入图片说明

(5)rehash index=3

输入图片说明

(6)rehash 完成

输入图片说明

缩容扩容的思考

看完了上面的流程,不知道你对 rehash 是否有一个大概了思路呢?

下面让我们来一起思考下几个缩扩容的问题。

输入图片说明

什么时候扩容呢?

什么时候判断?

redis 在每次执行 put 操作的时候,就可以检查是否需要扩容。

其实也很好理解,put 插入元素的时候,判断是否需要扩容,然后开始扩容,是直接的一种思路。

留一个思考题:我们可以在其他的时候判断吗?

redis 判断是否需要扩容的源码

/* Expand the hash table if needed */
static int _dictExpandIfNeeded(dict *d)
{
    /* Incremental rehashing already in progress. Return. */
    // 如果正在进行渐进式扩容,则返回OK
    if (dictIsRehashing(d)) return DICT_OK;

    /* If the hash table is empty expand it to the initial size. */
    // 如果哈希表ht[0]的大小为0,则初始化字典
    if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);

    /* If we reached the 1:1 ratio, and we are allowed to resize the hash
     * table (global setting) or we should avoid it but the ratio between
     * elements/buckets is over the "safe" threshold, we resize doubling
     * the number of buckets. */
    /*
     * 如果哈希表ht[0]中保存的key个数与哈希表大小的比例已经达到1:1,即保存的节点数已经大于哈希表大小
     * 且redis服务当前允许执行rehash,或者保存的节点数与哈希表大小的比例超过了安全阈值(默认值为5)
     * 则将哈希表大小扩容为原来的两倍
     */
    if (d->ht[0].used >= d->ht[0].size &&
        (dict_can_resize ||
         d->ht[0].used/d->ht[0].size > dict_force_resize_ratio))
    {
        return dictExpand(d, d->ht[0].used*2);
    }
    return DICT_OK;
}

扩容的条件总结下来就是两句话:

(1)服务器目前没有在执行 BGSAVE/BGREWRITEAOF 命令, 并且哈希表的负载因子大于等于 1;

(2)服务器目前正在执行 BGSAVE/BGREWRITEAOF 命令, 并且哈希表的负载因子大于等于 5;

这里其实体现了作者的一种设计思想:如果负载因子超过5,说明信息已经很多了,管你在不在保存,都要执行扩容,优先保证服务可用性。如果没那么高,那就等持久化完成再做 rehash。

我们自己在实现的时候可以简化一下,比如只考虑情况2。

扩容到原来的多少?

知道了什么时候应该开始扩容,但是要扩容到多大也是值得思考的一个问题。

扩容的太小,会导致频繁扩容,浪费性能。

扩容的太大,会导致资源的浪费。

其实这个最好的方案是结合我们实际的业务,不过这部分对用户是透明的。

一般是扩容为原来的两倍。

为什么需要扩容?

我们在实现 ArrayList 的时候需要扩容,因为数据放不下了。

我们知道 HashMap 的底层是数组 + 链表(红黑树)的数据结构。

那么会存在放不下的情况吗?

个人理解实际上不会。因为链表可以一直加下去。

那为什么需要扩容呢?

实际上更多的是处于性能的考虑。我们使用 HashMap 就是为了提升性能,如果一直不扩容,可以理解为元素都 hash 到相同的 bucket 上,这时就退化成了一个链表。

这会导致查询等操作性能大大降低。

什么时候缩容呢?

何时判断

看了前面的扩容,我们比较直观地方式是在用户 remove 元素的时候执行是否需要缩容。

不过 redis 并不完全等同于传统的 HashMap,还有数据的淘汰和过期,这些是对用户透明的。

redis 采用的方式实际上是一个定时任务。

个人理解内存缩容很重要,但是没有那么紧急,我们可以 1min 扫描一次,这样可以节省机器资源。

实际工作中,一般 redis 的内存都是逐步上升的,或者稳定在一个范围内,很少去大批量删除数据。(除非数据搞错了,我就遇到过一次,数据同步错地方了)。

所以数据删除,一般几分钟内给用户一个反馈就行。

知其然,知其所以然。

我们懂得了这个道理也就懂得了为什么有时候删除 redis 的几百万 keys,内存也不是直接降下来的原因。

缩容的条件

/* If the percentage of used slots in the HT reaches HASHTABLE_MIN_FILL
 * we resize the hash table to save memory */
void tryResizeHashTables(int dbid) {
    if (htNeedsResize(server.db[dbid].dict))
        dictResize(server.db[dbid].dict);
    if (htNeedsResize(server.db[dbid].expires))
        dictResize(server.db[dbid].expires);
}

/* Hash table parameters */
#define HASHTABLE_MIN_FILL        10      /* Minimal hash table fill 10% */
int htNeedsResize(dict *dict) {
    long long size, used;

    size = dictSlots(dict);
    used = dictSize(dict);
    return (size > DICT_HT_INITIAL_SIZE &&
            (used*100/size < HASHTABLE_MIN_FILL));
}

/* Resize the table to the minimal size that contains all the elements,
 * but with the invariant of a USED/BUCKETS ratio near to <= 1 */
int dictResize(dict *d)
{
    int minimal;

    if (!dict_can_resize || dictIsRehashing(d)) return DICT_ERR;
    minimal = d->ht[0].used;
    if (minimal < DICT_HT_INITIAL_SIZE)
        minimal = DICT_HT_INITIAL_SIZE;
    return dictExpand(d, minimal);
}

和扩容类似,不过这里的缩容比例不是 5 倍,而是当哈希表保存的key数量与哈希表的大小的比例小于 10% 时需要缩容。

缩容到多少?

最简单的方式是直接变为原来的一半,不过这么做有时候也不是那么好用。

redis 是缩容后的大小为第一个大于等于当前key数量的2的n次方。

这个可能不太好理解,举几个数字就懂了:

keys数量缩容大小
34
44
58
916

主要保障以下3点:

(1)缩容之后,要大于等于 key 的数量

(2)尽可能的小,节约内存

(3)2 的倍数。

第三个看过 HashMap 源码讲解的小伙伴应该深有体会。

当然也不能太小,redis 限制的最小为 4。

实际上如果 redis 中只放 4 个 key,实在是杀鸡用牛刀,一般不会这么小。

我们在实现的时候,直接参考 jdk 好了,给个最小值限制 8。

为什么需要缩容?

最核心的目的就是为了节约内存,其实还有一个原因,叫 small means fast(小即是快——老马)。

渐进式 ReHash 实现的思考

好了,扩容和缩容就聊到这里,那么这个渐进式 rehash 到底怎么一个渐进法?

什么是渐进式

扩容前

不需要扩容时应该有至少需要初始化两个元素:

hashtable[0] = new HashTable(size);
hashIndex=-1;

hashtable[1] = null;

hashtable 中存储着当前的元素信息,hashIndex=-1 标识当前没有在进行扩容。

扩容准备

当需要扩容的时候,我们再去创建一个 hashtable[1],并且 size 是原来的 2倍。

hashtable[0] = new HashTable(size);

hashtable[1] = new HashTable(2 * size);

hashIndex=-1;

主要是为了节约内存,使用惰性初始化的方式创建 hashtable。

扩容时

调整 hashIndex=0...size,逐步去 rehash 到新的 hashtable[1]

新的插入全部放入到 hashtable[1]

扩容后

扩容后我们应该把 hashtable[0] 的值更新为 hashtable[1],并且释放掉 hashtable[1] 的资源。

并且设置 hashIndex=-1,标识已经 rehash 完成

hashtable[0] = hashtable[1];
hashIndex=-1;

hashtable[1] = null;

这样整体的实现思路就已经差不多了,光说不练假把式,我们下一节就来自己实现一个渐进式 rehash 的 HashMap。

至于现在,先让 rehash 的思路飞一会儿~

6

小结

本节我们对 redis rehash 的原理进行了讲解,其中也加入了不少自己的思考。

文章的结尾,也添加了简单的实现思路,当然实际实现还会有很多问题需要解决。

下一节我们将一起手写一个渐进式 rehash 的 HashMap,感兴趣的伙伴可以关注一波,即使获取最新动态~

开源地址:https://github.com/houbb/cache

觉得本文对你有帮助的话,欢迎点赞评论收藏关注一波。你的鼓励,是我最大的动力~

不知道你有哪些收获呢?或者有其他更多的想法,欢迎留言区和我一起讨论,期待与你的思考相遇。

深入学习

查看原文

赞 0 收藏 0 评论 0

老马啸西风 发布了文章 · 10月10日

从零开始手写缓存框架 redis(13)HashMap 源码原理详解

为什么学习 HashMap 源码?

作为一名 java 开发,基本上最常用的数据结构就是 HashMap 和 List,jdk 的 HashMap 设计还是非常值得深入学习的。

无论是在面试还是工作中,知道原理都对会我们有很大的帮助。

本篇的内容较长,建议先收藏,再细细品味。

不同于网上简单的源码分析,更多的是实现背后的设计思想。

涉及的内容比较广泛,从统计学中的泊松分布,到计算机基础的位运算,经典的红黑树、链表、数组等数据结构,也谈到了 Hash 函数的相关介绍,文末也引入了美团对于 HashMap 的源码分析,所以整体深度和广度都比较大。

思维导图如下:

思维导图

本文是两年前整理的,文中不免有疏漏过时的地方,欢迎大家提出宝贵的意见。

之所以这里拿出来,有以下几个目的:

(1)让读者理解 HashMap 的设计思想,知道 rehash 的过程。下一节我们将自己实现一个 HashMap

(2)为什么要自己实现 HashMap?

最近在手写 redis 框架,都说 redis 是一个特性更加强大的 Map,自然 HashMap 就是入门基础。Redis 高性能中一个过人之处的设计就是渐进式 rehash,和大家一起实现一个渐进式 rehash 的 map,更能体会和理解作者设计的巧妙。

想把常见的数据结构独立为一个开源工具,便于后期使用。比如这次手写 redis,循环链表,LRU map 等都是从零开始写的,不利于复用,还容易有 BUG。

好了,下面就让我们一起开始 HashMap 的源码之旅吧~

HashMap 源码

HashMap 是平时使用到非常多的一个集合类,感觉有必要深入学习一下。

首先尝试自己阅读一遍源码。

java 版本

$ java -version
java version "1.8.0_91"
Java(TM) SE Runtime Environment (build 1.8.0_91-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.91-b14, mixed mode)

数据结构

从结构实现来讲,HashMap是数组+链表+红黑树(JDK1.8增加了红黑树部分)实现的。

对于当前类的官方说明

基于哈希表实现的映射接口。这个实现提供了所有可选的映射操作,并允许空值和空键。(HashMap类大致相当于Hashtable,但它是非同步的,并且允许为空。)

这个类不保证映射的顺序;特别地,它不能保证顺序会随时间保持不变。

这个实现为基本操作(get和put)提供了恒定时间的性能,假设哈希函数将元素适当地分散在各个桶中。对集合视图的迭代需要与HashMap实例的“容量”(桶数)及其大小(键-值映射数)成比例的时间。因此,如果迭代性能很重要,那么不要将初始容量设置得太高(或者负载系数太低),这是非常重要的。

HashMap实例有两个影响其性能的参数: 初始容量和负载因子

容量是哈希表中的桶数,初始容量只是创建哈希表时的容量。负载因子是在哈希表的容量自动增加之前,哈希表被允许达到的最大容量的度量。当哈希表中的条目数量超过负载因子和当前容量的乘积时,哈希表就会被重新哈希(也就是说,重新构建内部数据结构),这样哈希表的桶数大约是原来的两倍。

一般来说,默认的负载因子(0.75)在时间和空间成本之间提供了很好的权衡。

较高的值减少了空间开销,但增加了查找成本(反映在HashMap类的大多数操作中,包括get和put)。在设置映射的初始容量时,应该考虑映射中的期望条目数及其负载因子,以最小化重哈希操作的数量。如果初始容量大于条目的最大数量除以负载因子,就不会发生重哈希操作。

如果要将许多映射存储在HashMap实例中,那么使用足够大的容量创建映射将使映射存储的效率更高,而不是让它根据需要执行自动重哈希以增长表。

注意,使用具有相同hashCode()的多个键确实可以降低任何散列表的性能。为了改善影响,当键具有可比性时,这个类可以使用键之间的比较顺序来帮助断开连接。

注意,这个实现不是同步的。如果多个线程并发地访问散列映射,并且至少有一个线程在结构上修改了映射,那么它必须在外部同步。(结构修改是添加或删除一个或多个映射的任何操作;仅更改与实例已经包含的键关联的值并不是结构修改。这通常是通过对自然封装映射的对象进行同步来完成的。

如果不存在这样的对象,则应该使用集合“包装” Collections.synchronizedMap 方法。这最好在创建时完成,以防止意外的对映射的非同步访问:

Map m = Collections.synchronizedMap(new HashMap(...));

这个类的所有“集合视图方法”返回的迭代器都是快速失败的:如果在创建迭代器之后的任何时候对映射进行结构上的修改,除了通过迭代器自己的remove方法,迭代器将抛出ConcurrentModificationException。因此,在并发修改的情况下,迭代器会快速而干净地失败,而不是在未来的不确定时间内冒着任意的、不确定的行为的风险。

注意,迭代器的快速故障行为不能得到保证,因为一般来说,在存在非同步并发修改的情况下,不可能做出任何硬性保证。快速失败迭代器以最佳的方式抛出ConcurrentModificationException。因此,编写依赖于此异常的程序来保证其正确性是错误的:迭代器的快速故障行为应该仅用于检测错误。

其他基础信息

  1. 这个类是Java集合框架的成员。
  2. @since 1.2
  3. java.util 包下

源码初探

接口

public class HashMap<K,V> extends AbstractMap<K,V>
    implements Map<K,V>, Cloneable, Serializable {}

当前类实现了三个接口,我们主要关心 Map 接口即可。

继承了一个抽象类 AbstractMap,这个暂时放在本节后面学习。

常量定义

默认初始化容量

/**
 * The default initial capacity - MUST be a power of two.
 */
static final int DEFAULT_INITIAL_CAPACITY = 1 << 4; // aka 16
  • 为什么不直接使用 16?

看了下 statckoverflow,感觉比较靠谱的解释是:

  1. 为了避免使用魔法数字,使得常量定义本身就具有自我解释的含义。
  2. 强调这个数必须是 2 的幂。
  • 为什么要是 2 的幂?

它是这样设计的,因为它允许使用快速位和操作将每个键的哈希代码包装到表的容量范围内,正如您在访问表的方法中看到的:

final Node<K,V> getNode(int hash, Object key) {
    Node<K,V>[] tab; Node<K,V> first, e; int n; K k;
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (first = tab[(n - 1) & hash]) != null) { /// <-- bitwise 'AND' here
        ...

最大容量

隐式指定较高值时使用的最大容量。

由任何带有参数的构造函数。

必须是2的幂且小于 1<<30。

/**
* The maximum capacity, used if a higher value is implicitly specified
* by either of the constructors with arguments.
* MUST be a power of two <= 1<<30.
*/
static final int MAXIMUM_CAPACITY = 1 << 30;
  • 为什么是 1 << 30?

当然了 interger 的最大容量为 2^31-1

除此之外,2**31是20亿,每个哈希条目需要一个对象作为条目本身,一个对象作为键,一个对象作为值。

在为应用程序中的其他内容分配空间之前,最小对象大小通常为24字节左右,因此这将是1440亿字节。

可以肯定地说,最大容量限制只是理论上的。

感觉实际内存也没这么大!

负载因子

当负载因子较大时,去给table数组扩容的可能性就会少,所以相对占用内存较少(空间上较少),但是每条entry链上的元素会相对较多,查询的时间也会增长(时间上较多)。

反之就是,负载因子较少的时候,给table数组扩容的可能性就高,那么内存空间占用就多,但是entry链上的元素就会相对较少,查出的时间也会减少。

所以才有了负载因子是时间和空间上的一种折中的说法。

所以设置负载因子的时候要考虑自己追求的是时间还是空间上的少。

/**
 * The load factor used when none specified in constructor.
 */
static final float DEFAULT_LOAD_FACTOR = 0.75f;
  • 为什么是 0.75,不是 0.8 或者 0.6

其实 hashmap 源码中有解释。

Because TreeNodes are about twice the size of regular nodes, we
use them only when bins contain enough nodes to warrant use
(see TREEIFY_THRESHOLD). And when they become too small (due to
removal or resizing) they are converted back to plain bins.  In
usages with well-distributed user hashCodes, tree bins are
rarely used.  Ideally, under random hashCodes, the frequency of
nodes in bins follows a Poisson distribution
(http://en.wikipedia.org/wiki/Poisson_distribution) with a
parameter of about 0.5 on average for the default resizing
threshold of 0.75, although with a large variance because of
resizing granularity. Ignoring variance, the expected
occurrences of list size k are (exp(-0.5) * pow(0.5, k) /
factorial(k)). The first values are:

0:    0.60653066
1:    0.30326533
2:    0.07581633
3:    0.01263606
4:    0.00157952
5:    0.00015795
6:    0.00001316
7:    0.00000094
8:    0.00000006
more: less than 1 in ten million

简单翻译一下就是在理想情况下,使用随机哈希码,节点出现的频率在hash桶中遵循泊松分布,同时给出了桶中元素个数和概率的对照表。

从上面的表中可以看到当桶中元素到达8个的时候,概率已经变得非常小,也就是说用0.75作为加载因子,每个碰撞位置的链表长度超过8个是几乎不可能的。

Poisson distribution —— 泊松分布

阈值

/**
 * The bin count threshold for using a tree rather than list for a
 * bin.  Bins are converted to trees when adding an element to a
 * bin with at least this many nodes. The value must be greater
 * than 2 and should be at least 8 to mesh with assumptions in
 * tree removal about conversion back to plain bins upon
 * shrinkage.
 */
static final int TREEIFY_THRESHOLD = 8;

/**
 * The bin count threshold for untreeifying a (split) bin during a
 * resize operation. Should be less than TREEIFY_THRESHOLD, and at
 * most 6 to mesh with shrinkage detection under removal.
 */
static final int UNTREEIFY_THRESHOLD = 6;

/**
 * The smallest table capacity for which bins may be treeified.
 * (Otherwise the table is resized if too many nodes in a bin.)
 * Should be at least 4 * TREEIFY_THRESHOLD to avoid conflicts
 * between resizing and treeification thresholds.
 */
static final int MIN_TREEIFY_CAPACITY = 64;

TREEIFY_THRESHOLD

使用红黑树而不是列表的bin count阈值。

当向具有至少这么多节点的bin中添加元素时,bin被转换为树。这个值必须大于2,并且应该至少为8,以便与树删除中关于收缩后转换回普通容器的假设相匹配。

UNTREEIFY_THRESHOLD

在调整大小操作期间取消(分割)存储库的存储计数阈值。

应小于TREEIFY_THRESHOLD,并最多6个网格与收缩检测下去除。

MIN_TREEIFY_CAPACITY

最小的表容量,可为容器进行树状排列。(否则,如果在一个bin中有太多节点,表将被调整大小。)

至少为 4 * TREEIFY_THRESHOLD,以避免调整大小和树化阈值之间的冲突。

Node

源码

  • Node.java

基础 hash 结点定义。

/**
 * Basic hash bin node, used for most entries.  (See below for
 * TreeNode subclass, and in LinkedHashMap for its Entry subclass.)
 */
static class Node<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    V value;
    Node<K,V> next;
    Node(int hash, K key, V value, Node<K,V> next) {
        this.hash = hash;
        this.key = key;
        this.value = value;
        this.next = next;
    }
    public final K getKey()        { return key; }
    public final V getValue()      { return value; }
    public final String toString() { return key + "=" + value; }
    public final int hashCode() {
        return Objects.hashCode(key) ^ Objects.hashCode(value);
    }
    public final V setValue(V newValue) {
        V oldValue = value;
        value = newValue;
        return oldValue;
    }
    public final boolean equals(Object o) {
        // 快速判断
        if (o == this)
            return true;

        // 类型判断    
        if (o instanceof Map.Entry) {
            Map.Entry<?,?> e = (Map.Entry<?,?>)o;
            if (Objects.equals(key, e.getKey()) &&
                Objects.equals(value, e.getValue()))
                return true;
        }
        return false;
    }
}

个人理解

四个核心元素:

final int hash; // hash 值
final K key;    // key
V value;    // value 值
Node<K,V> next; // 下一个元素结点

hash 值的算法

hash 算法如下。

直接 key/value 的异或(^)。

Objects.hashCode(key) ^ Objects.hashCode(value);

其中 hashCode() 方法如下:

public static int hashCode(Object o) {
    return o != null ? o.hashCode() : 0;
}

最后还是会调用对象本身的 hashCode() 算法。一般我们自己会定义。

静态工具类

hash

static final int hash(Object key) {
    int h;
    return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}

为什么这么设计?

  • jdk8 自带解释

计算key.hashCode(),并将(XORs)的高比特位分散到低比特位。

因为表使用的是power-of-two掩蔽,所以只在当前掩码上方以位为单位变化的哈希总是会发生冲突。

(已知的例子中有一组浮点键,它们在小表中保存连续的整数。)

因此,我们应用了一种转换,将高比特的影响向下传播。

比特传播的速度、效用和质量之间存在权衡。

因为许多常见的散列集已经合理分布(所以不要受益于传播),因为我们用树来处理大型的碰撞在垃圾箱,我们只是XOR一些改变以最便宜的方式来减少系统lossage,以及将最高位的影响,否则永远不会因为指数计算中使用的表。

  • 知乎的解释

这段代码叫扰动函数

HashMap扩容之前的数组初始大小才16。所以这个散列值是不能直接拿来用的。

用之前还要先做对数组的长度取模运算,得到的余数才能用来访问数组下标。

putVal 函数源码

final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
                   boolean evict) {
        Node<K,V>[] tab; Node<K,V> p; int n, i;
        if ((tab = table) == null || (n = tab.length) == 0)
            n = (tab = resize()).length;
        if ((p = tab[i = (n - 1) & hash]) == null)
            tab[i] = newNode(hash, key, value, null);
        //...    
}

其中这一句 tab[i = (n - 1) & hash])

这一步就是在寻找桶的过程,就是上图总数组,根据容量取如果容量是16 对hash值取低16位,那么下标范围就在容量大小范围内了。

这里也就解释了为什么 hashmap 的大小需要为 2 的正整数幂,因为这样(数组长度-1)正好相当于一个“低位掩码”。

比如大小 16,则 (16-1) = 15 = 00000000 00000000 00001111(二进制);

    10100101 11000100 00100101
&    00000000 00000000 00001111
-------------------------------
    00000000 00000000 00000101    //高位全部归零,只保留末四位

但是问题是,散列值分布再松散,要是只取最后几位的话,碰撞也会很严重。

扰动函数的价值如下:

扰动函数的价值

右位移16位,正好是32bit的一半,自己的高半区和低半区做异或,就是为了混合原始哈希码的高位和低位,以此来加大低位的随机性。

而且混合后的低位掺杂了高位的部分特征,这样高位的信息也被变相保留下来。

优化哈希的原理介绍

comparable class

  • comparableClassFor()

获取对象 x 的类,如果这个类实现了 class C implements Comparable<C> 接口。

ps: 这个方法很有借鉴意义,可以做简单的拓展。我们可以获取任意接口泛型中的类型。

static Class<?> comparableClassFor(Object x) {
    if (x instanceof Comparable) {
        Class<?> c; Type[] ts, as; Type t; ParameterizedType p;
        if ((c = x.getClass()) == String.class) // bypass checks
            return c;
        if ((ts = c.getGenericInterfaces()) != null) {
            for (int i = 0; i < ts.length; ++i) {
                if (((t = ts[i]) instanceof ParameterizedType) &&
                    ((p = (ParameterizedType)t).getRawType() ==
                     Comparable.class) &&
                    (as = p.getActualTypeArguments()) != null &&
                    as.length == 1 && as[0] == c) // type arg is c
                    return c;
            }
        }
    }
    return null;
}

compareComparables()

获取两个可比较对象的比较结果。

@SuppressWarnings({"rawtypes","unchecked"}) // for cast to Comparable
static int compareComparables(Class<?> kc, Object k, Object x) {
    return (x == null || x.getClass() != kc ? 0 :
            ((Comparable)k).compareTo(x));
}

tableSizeFor

获取 2 的幂

static final int tableSizeFor(int cap) {
    int n = cap - 1;
    n |= n >>> 1;
    n |= n >>> 2;
    n |= n >>> 4;
    n |= n >>> 8;
    n |= n >>> 16;
    return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}
  • 被调用处
public HashMap(int initialCapacity, float loadFactor) {
    // check...
    this.loadFactor = loadFactor;
    this.threshold = tableSizeFor(initialCapacity);
}
  • 感想

emmm....为什么要这么写?性能吗?

简单分析

当在实例化HashMap实例时,如果给定了initialCapacity,由于HashMap的capacity都是2的幂,因此这个方法用于找到大于等于initialCapacity的最小的2的幂(initialCapacity如果就是2的幂,则返回的还是这个数)。

  • 为什么要 -1

int n = cap - 1;

首先,为什么要对cap做减1操作。int n = cap - 1;
这是为了防止,cap已经是2的幂。如果cap已经是2的幂, 又没有执行这个减1操作,则执行完后面的几条无符号右移操作之后,返回的capacity将是这个cap的2倍。如果不懂,要看完后面的几个无符号右移之后再回来看看。

下面看看这几个无符号右移操作:

如果n这时为0了(经过了cap-1之后),则经过后面的几次无符号右移依然是0,最后返回的capacity是1(最后有个n+1的操作)。

这里只讨论n不等于0的情况。

  • 第一次位运算

n |= n >>> 1;

由于n不等于0,则n的二进制表示中总会有一bit为1,这时考虑最高位的1。

通过无符号右移1位,则将最高位的1右移了1位,再做或操作,使得n的二进制表示中与最高位的1紧邻的右边一位也为1,如000011xxxxxx。

其他依次类推

实例

比如 initialCapacity = 10;

表达式                       二进制
------------------------------------------------------    

initialCapacity = 10;
int n = 9;                  0000 1001
------------------------------------------------------    


n |= n >>> 1;               0000 1001
                            0000 0100   (右移1位) 或运算
                          = 0000 1101
------------------------------------------------------    

n |= n >>> 2;               0000 1101
                            0000 0011   (右移2位) 或运算
                          = 0000 1111
------------------------------------------------------    

n |= n >>> 4;               0000 1111
                            0000 0000   (右移4位) 或运算
                          = 0000 1111
------------------------------------------------------  

n |= n >>> 8;               0000 1111
                            0000 0000   (右移8位) 或运算
                          = 0000 1111
------------------------------------------------------  

n |= n >>> 16;              0000 1111
                            0000 0000   (右移16位) 或运算
                          = 0000 1111
------------------------------------------------------  

n = n+1;                    0001 0000    结果:2^4 = 16;      

put() 解释

下面的内容出自美团博客 Java 8系列之重新认识HashMap

由于写的非常好,此处就直接复制过来了。

流程图解

HashMap的put方法执行过程可以通过下图来理解,自己有兴趣可以去对比源码更清楚地研究学习。

输入图片说明

①.判断键值对数组table[i]是否为空或为null,否则执行resize()进行扩容;

②.根据键值key计算hash值得到插入的数组索引i,如果table[i]==null,直接新建节点添加,转向⑥,如果table[i]不为空,转向③;

③.判断table[i]的首个元素是否和key一样,如果相同直接覆盖value,否则转向④,这里的相同指的是hashCode以及equals;

④.判断table[i] 是否为treeNode,即table[i] 是否是红黑树,如果是红黑树,则直接在树中插入键值对,否则转向⑤;

⑤.遍历table[i],判断链表长度是否大于8,大于8的话把链表转换为红黑树,在红黑树中执行插入操作,否则进行链表的插入操作;遍历过程中若发现key已经存在直接覆盖value即可;

⑥.插入成功后,判断实际存在的键值对数量size是否超多了最大容量threshold,如果超过,进行扩容。

方法源码

public V put(K key, V value) {
    return putVal(hash(key), key, value, false, true);
}
/**
 * Implements Map.put and related methods
 *
 * @param hash hash for key
 * @param key the key
 * @param value the value to put
 * @param onlyIfAbsent if true, don't change existing value
 * @param evict if false, the table is in creation mode.
 * @return previous value, or null if none
 */
final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
               boolean evict) {
    Node<K,V>[] tab; Node<K,V> p; int n, i;
    if ((tab = table) == null || (n = tab.length) == 0)
        n = (tab = resize()).length;
    if ((p = tab[i = (n - 1) & hash]) == null)
        tab[i] = newNode(hash, key, value, null);
    else {
        Node<K,V> e; K k;
        if (p.hash == hash &&
            ((k = p.key) == key || (key != null && key.equals(k))))
            e = p;
        else if (p instanceof TreeNode)
            e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
        else {
            for (int binCount = 0; ; ++binCount) {
                if ((e = p.next) == null) {
                    p.next = newNode(hash, key, value, null);
                    if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
                        treeifyBin(tab, hash);
                    break;
                }
                if (e.hash == hash &&
                    ((k = e.key) == key || (key != null && key.equals(k))))
                    break;
                p = e;
            }
        }
        if (e != null) { // existing mapping for key
            V oldValue = e.value;
            if (!onlyIfAbsent || oldValue == null)
                e.value = value;
            afterNodeAccess(e);
            return oldValue;
        }
    }
    ++modCount;
    if (++size > threshold)
        resize();
    afterNodeInsertion(evict);
    return null;
}

扩容机制

简介

扩容(resize)就是重新计算容量,向HashMap对象里不停的添加元素,而HashMap对象内部的数组无法装载更多的元素时,对象就需要扩大数组的长度,以便能装入更多的元素。

当然Java里的数组是无法自动扩容的,方法是使用一个新的数组代替已有的容量小的数组,就像我们用一个小桶装水,如果想装更多的水,就得换大水桶。

JDK7 源码

我们分析下resize()的源码,鉴于JDK1.8融入了红黑树,较复杂,为了便于理解我们仍然使用JDK1.7的代码,好理解一些,本质上区别不大,具体区别后文再说。

void resize(int newCapacity) {   //传入新的容量
    Entry[] oldTable = table;    //引用扩容前的Entry数组
    int oldCapacity = oldTable.length;         
    if (oldCapacity == MAXIMUM_CAPACITY) {  //扩容前的数组大小如果已经达到最大(2^30)了
        threshold = Integer.MAX_VALUE; //修改阈值为int的最大值(2^31-1),这样以后就不会扩容了
        return;
    }

    Entry[] newTable = new Entry[newCapacity];  //初始化一个新的Entry数组
    transfer(newTable);                         //!!将数据转移到新的Entry数组里
    table = newTable;                           //HashMap的table属性引用新的Entry数组
    threshold = (int)(newCapacity * loadFactor);//修改阈值
}

这里就是使用一个容量更大的数组来代替已有的容量小的数组,transfer() 方法将原有Entry数组的元素拷贝到新的Entry数组里。

void transfer(Entry[] newTable) {
    Entry[] src = table;                   //src引用了旧的Entry数组
    int newCapacity = newTable.length;
    for (int j = 0; j < src.length; j++) { //遍历旧的Entry数组
        Entry<K,V> e = src[j];             //取得旧Entry数组的每个元素
        if (e != null) {
            src[j] = null;//释放旧Entry数组的对象引用(for循环后,旧的Entry数组不再引用任何对象)
            do {
                Entry<K,V> next = e.next;
                int i = indexFor(e.hash, newCapacity); //!!重新计算每个元素在数组中的位置
                e.next = newTable[i]; //标记[1]
                newTable[i] = e;      //将元素放在数组上
                e = next;             //访问下一个Entry链上的元素
            } while (e != null);
        }
    }
}

newTable[i]的引用赋给了e.next,也就是使用了单链表的头插入方式,同一位置上新元素总会被放在链表的头部位置;

这样先放在一个索引上的元素终会被放到Entry链的尾部(如果发生了hash冲突的话),这一点和Jdk1.8有区别,下文详解。

在旧数组中同一条Entry链上的元素,通过重新计算索引位置后,有可能被放到了新数组的不同位置上。

案例

下面举个例子说明下扩容过程。假设了我们的hash算法就是简单的用key mod 一下表的大小(也就是数组的长度)。

其中的哈希桶数组table的size=2, 所以key = 3、7、5,put顺序依次为 5、7、3。

在mod 2以后都冲突在table[1]这里了。

这里假设负载因子 loadFactor=1,即当键值对的实际大小size 大于 table的实际大小时进行扩容。

接下来的三个步骤是哈希桶数组 resize成4,然后所有的Node重新rehash的过程。

输入图片说明

Jdk8 优化

经过观测可以发现,我们使用的是2次幂的扩展(指长度扩为原来2倍),所以,元素的位置要么是在原位置,要么是在原位置再移动2次幂的位置。

看下图可以明白这句话的意思,n为table的长度,图(a)表示扩容前的key1和key2两种key确定索引位置的示例,

图(b)表示扩容后key1和key2两种key确定索引位置的示例,其中hash1是key1对应的哈希与高位运算结果。

位运算

元素在重新计算hash之后,因为n变为2倍,那么n-1的mask范围在高位多1bit(红色),因此新的index就会发生这样的变化:

index

因此,我们在扩充HashMap的时候,不需要像JDK1.7的实现那样重新计算hash,只需要看看原来的hash值新增的那个bit是1还是0就好了,是0的话索引没变,是1的话索引变成“原索引+oldCap”,可以看看下图为16扩充为32的resize示意图:

rehash

这个设计确实非常的巧妙,既省去了重新计算hash值的时间,而且同时,由于新增的1bit是0还是1可以认为是随机的,因此resize的过程,均匀的把之前的冲突的节点分散到新的bucket了。

这一块就是JDK1.8新增的优化点。

有一点注意区别,JDK1.7中rehash的时候,旧链表迁移新链表的时候,如果在新表的数组索引位置相同,则链表元素会倒置,但是从上图可以看出,JDK1.8不会倒置。

JDK8 源码

有兴趣的同学可以研究下JDK1.8的resize源码,写的很赞:

/**
 * Initializes or doubles table size.  If null, allocates in
 * accord with initial capacity target held in field threshold.
 * Otherwise, because we are using power-of-two expansion, the
 * elements from each bin must either stay at same index, or move
 * with a power of two offset in the new table.
 *
 * @return the table
 */
final Node<K,V>[] resize() {
    Node<K,V>[] oldTab = table;
    int oldCap = (oldTab == null) ? 0 : oldTab.length;
    int oldThr = threshold;
    int newCap, newThr = 0;
    if (oldCap > 0) {
        if (oldCap >= MAXIMUM_CAPACITY) {
            threshold = Integer.MAX_VALUE;
            return oldTab;
        }
        else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY &&
                 oldCap >= DEFAULT_INITIAL_CAPACITY)
            newThr = oldThr << 1; // double threshold
    }
    else if (oldThr > 0) // initial capacity was placed in threshold
        newCap = oldThr;
    else {               // zero initial threshold signifies using defaults
        newCap = DEFAULT_INITIAL_CAPACITY;
        newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY);
    }
    if (newThr == 0) {
        float ft = (float)newCap * loadFactor;
        newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ?
                  (int)ft : Integer.MAX_VALUE);
    }
    threshold = newThr;
    @SuppressWarnings({"rawtypes","unchecked"})
        Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap];
    table = newTab;
    if (oldTab != null) {
        for (int j = 0; j < oldCap; ++j) {
            Node<K,V> e;
            if ((e = oldTab[j]) != null) {
                oldTab[j] = null;
                if (e.next == null)
                    newTab[e.hash & (newCap - 1)] = e;
                else if (e instanceof TreeNode)
                    ((TreeNode<K,V>)e).split(this, newTab, j, oldCap);
                else { // preserve order
                    Node<K,V> loHead = null, loTail = null;
                    Node<K,V> hiHead = null, hiTail = null;
                    Node<K,V> next;
                    do {
                        next = e.next;
                        if ((e.hash & oldCap) == 0) {
                            if (loTail == null)
                                loHead = e;
                            else
                                loTail.next = e;
                            loTail = e;
                        }
                        else {
                            if (hiTail == null)
                                hiHead = e;
                            else
                                hiTail.next = e;
                            hiTail = e;
                        }
                    } while ((e = next) != null);
                    if (loTail != null) {
                        loTail.next = null;
                        newTab[j] = loHead;
                    }
                    if (hiTail != null) {
                        hiTail.next = null;
                        newTab[j + oldCap] = hiHead;
                    }
                }
            }
        }
    }
    return newTab;
}

小结

如果你已经通读全文,那么你已经非常厉害了。

其实第一遍没有彻底理解也没有关系,知道 HashMap 有一个 reHash 的过程就行,类似于 ArrayList 的 resize。

下一节我们将一起学习下自己手写实现一个渐进式 rehash 的 HashMap,感兴趣的可以关注一下,便于实时接收最新内容。

觉得本文对你有帮助的话,欢迎点赞评论收藏转发一波。你的鼓励,是我最大的动力~

不知道你有哪些收获呢?或者有其他更多的想法,欢迎留言区和我一起讨论,期待与你的思考相遇。

查看原文

赞 0 收藏 0 评论 0

老马啸西风 发布了文章 · 10月9日

java 手写并发框架(一)异步查询转同步的 7 种实现方式

序言

本节将学习一下如何实现异步查询转同步的方式,共计介绍了 7 种常见的实现方式。

思维导图如下:

思维导图

异步转同步

业务需求

有些接口查询反馈结果是异步返回的,无法立刻获取查询结果。

比如业务开发中我们调用其他系统,但是结果的返回确实通知的。

或者 rpc 实现中,client 调用 server 端,结果也是异步返回的,那么如何同步获取调用结果呢?

  • 正常处理逻辑

触发异步操作,然后传递一个唯一标识。

等到异步结果返回,根据传入的唯一标识,匹配此次结果。

  • 如何转换为同步

正常的应用场景很多,但是有时候不想做数据存储,只是想简单获取调用结果。

即想达到同步操作的结果,怎么办呢?

思路

  1. 发起异步操作
  2. 在异步结果返回之前,一直等待(可以设置超时)
  3. 结果返回之后,异步操作结果统一返回

常见的实现方式

  • 循环等待
  • wait & notify
  • 使用条件锁
  • 使用 CountDownLatch
  • 使用 CyclicBarrier
  • Future
  • Spring EventListener

下面我们一起来学习下这几种实现方式。

循环等待

说明

循环等待是最简单的一种实现思路。

我们调用对方一个请求,在没有结果之前一直循环查询即可。

这个结果可以在内存中,也可以放在 redis 缓存或者 mysql 等数据库中。

代码实现

定义抽象父类

为了便于后面的其他几种实现方式统一,我们首先定义一个抽象父类。

/**
 * 抽象查询父类
 * @author binbin.hou
 * @since 1.0.0
 */
public abstract class AbstractQuery {

    private static final Log log = LogFactory.getLog(AbstractQuery.class);

    protected String result;

    public void asyncToSync() {
        startQuery();
        new Thread(new Runnable() {
            public void run() {
                remoteCall();
            }
        }).start();
        endQuery();
    }

    protected void startQuery() {
        log.info("开始查询...");
    }

    /**
     * 远程调用
     */
    protected void remoteCall() {
        try {
            log.info("远程调用开始");
            TimeUnit.SECONDS.sleep(5);
            result = "success";
            log.info("远程调用结束");
        } catch (InterruptedException e) {
            log.error("远程调用失败", e);
        }
    }

    /**
     * 查询结束
     */
    protected void endQuery() {
        log.info("完成查询,结果为:" + result);
    }

}

代码实现

实现还是非常简单的,在没有结果之前一直循环。

TimeUnit.MILLISECONDS.sleep(10); 这里循环等待的小睡一会儿是比较重要的,避免 cpu 飙升,也可以降低为 1ms,根据自己的业务调整即可。

/**
 * 循环等待
 * @author binbin.hou
 * @since 1.0.0
 */
public class LoopQuery extends AbstractQuery {

    private static final Log log = LogFactory.getLog(LoopQuery.class);

    @Override
    protected void endQuery() {
        try {
            while (StringUtil.isEmpty(result)) {
                //循环等待一下
                TimeUnit.MILLISECONDS.sleep(10);
            }

            //获取结果

            log.info("完成查询,结果为:" + result);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

测试

LoopQuery loopQuery = new LoopQuery();
loopQuery.asyncToSync();
  • 日志
[INFO] [2020-10-08 09:50:43.330] [main] [c.g.h.s.t.d.AbstractQuery.startQuery] - 开始查询...
[INFO] [2020-10-08 09:50:43.331] [Thread-0] [c.g.h.s.t.d.AbstractQuery.remoteCall] - 远程调用开始
[INFO] [2020-10-08 09:50:48.334] [Thread-0] [c.g.h.s.t.d.AbstractQuery.remoteCall] - 远程调用结束
[INFO] [2020-10-08 09:50:48.343] [main] [c.g.h.s.t.d.LoopQuery.endQuery] - 完成查询,结果为:success

这里可以看到远程调用是 Thread-0 线程执行的,远程调用的耗时为 5S。

超时特性

为什么需要超时时间

上面的实现存在一个问题,那就是循环等待没有超时时间。

我们的一个网络请求,可能存在失败,也可能对方收到请求之后没有正确处理。

所以如果我们一直等待,可能永远也没有结果,或者很久之后才有结果。这在业务上是不可忍受的,所以需要添加一个超时时间。

代码实现

/**
 * 循环等待-包含超时时间
 * @author binbin.hou
 * @since 1.0.0
 */
public class LoopTimeoutQuery extends AbstractQuery {

    private static final Log log = LogFactory.getLog(LoopTimeoutQuery.class);

    /**
     * 超时时间
     */
    private long timeoutMills = 3000;

    public LoopTimeoutQuery() {
    }

    public LoopTimeoutQuery(long timeoutMills) {
        this.timeoutMills = timeoutMills;
    }

    @Override
    protected void endQuery() {
        try {
            final long endTimeMills = System.currentTimeMillis() + timeoutMills;

            while (StringUtil.isEmpty(result)) {
                // 超时判断
                if(System.currentTimeMillis() >= endTimeMills) {
                    throw new RuntimeException("请求超时");
                }

                //循环等待一下
                TimeUnit.MILLISECONDS.sleep(10);
            }

            //获取结果

            log.info("完成查询,结果为:" + result);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

测试

LoopTimeoutQuery loopQuery = new LoopTimeoutQuery();
loopQuery.asyncToSync();

日志如下:

[INFO] [2020-10-08 10:04:58.091] [main] [c.g.h.s.t.d.AbstractQuery.startQuery] - 开始查询...
[INFO] [2020-10-08 10:04:58.092] [Thread-0] [c.g.h.s.t.d.AbstractQuery.remoteCall] - 远程调用开始
Exception in thread "main" java.lang.RuntimeException: 请求超时
    at com.github.houbb.sync.test.demo.LoopTimeoutQuery.endQuery(LoopTimeoutQuery.java:38)
    at com.github.houbb.sync.test.demo.AbstractQuery.asyncToSync(AbstractQuery.java:26)
    at com.github.houbb.sync.test.demo.LoopTimeoutQuery.main(LoopTimeoutQuery.java:55)
[INFO] [2020-10-08 10:05:03.097] [Thread-0] [c.g.h.s.t.d.AbstractQuery.remoteCall] - 远程调用结束

超时时间是可以设定的,平时开发中可以根据自己的响应时间设置。

如果请求超时,考虑对应的兜底方案。

基于 wait() & notifyAll()

简介

实际上 loop 循环还是比较消耗性能的,对于这种等待特性, jdk 实际上为我们封装了多种特性。

比如最常见的 wait() 进入等待,notifyAll() 唤醒等待的组合方式。

这个同时也是阻塞队列的实现思想,阻塞队列我们就不介绍了,我们来看一下 wait+notify 的实现方式。

java 实现

package com.github.houbb.sync.test.demo;

import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;

/**
 * wait+notify 实现
 * @author binbin.hou
 * @since 1.0.0
 */
public class WaitNotifyQuery extends AbstractQuery {

    private static final Log log = LogFactory.getLog(WaitNotifyQuery.class);

    /**
     * 声明对象
     */
    private final Object lock = new Object();

    @Override
    protected void remoteCall() {
        super.remoteCall();
        synchronized (lock) {
            log.info("远程线程执行完成,唤醒所有等待。");
            lock.notifyAll();
        }
    }

    @Override
    protected void endQuery() {
        try {
            // 等待 10s
            synchronized (lock) {
                log.info("主线程进入等待");
                lock.wait(10 * 1000);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        super.endQuery();
    }

    public static void main(String[] args) {
        WaitNotifyQuery query = new WaitNotifyQuery();
        query.asyncToSync();
    }

}

注意:编程时需要使用 synchronized 保证锁的持有者线程安全,不然会报错。

测试

日志如下:

[INFO] [2020-10-08 11:05:50.769] [main] [c.g.h.s.t.d.AbstractQuery.startQuery] - 开始查询...
[INFO] [2020-10-08 11:05:50.770] [main] [c.g.h.s.t.d.WaitNotifyQuery.endQuery] - 主线程进入等待
[INFO] [2020-10-08 11:05:50.770] [Thread-0] [c.g.h.s.t.d.AbstractQuery.remoteCall] - 远程调用开始
[INFO] [2020-10-08 11:05:55.772] [Thread-0] [c.g.h.s.t.d.AbstractQuery.remoteCall] - 远程调用结束
[INFO] [2020-10-08 11:05:55.773] [Thread-0] [c.g.h.s.t.d.WaitNotifyQuery.remoteCall] - 远程线程执行完成,唤醒所有等待。
[INFO] [2020-10-08 11:05:55.773] [main] [c.g.h.s.t.d.AbstractQuery.endQuery] - 完成查询,结果为:success

基于条件锁的实现

条件锁简介

如果你想编写一个含有多个条件谓词的并发对象,或者你想获得比条件队列的可见性之外更多的控制权,那么显式的Lock和Condition的实现类提供了一个比内部锁和条件队列更加灵活的选择。

如同Lock提供了比内部加锁要丰富得多的特征集一样,Condition也提供了比内部条件队列要丰富得多的特征集:

每个锁可以有多个等待集(因await挂起的线程的集合)、可中断/不可中断的条件等待、基于时限的等待以及公平/非公平队列之间的选择.

Condition 介绍

注意事项:

wait、notify和notifyAll在Condition对象中的对等体是await、signal和signalAll.

但是,Condition继承与Object,这意味着它也有wait和notify方法.

一定要确保使用了正确的版本–await和signal!

java 实现

为了演示简单,我们直接选择可重入锁即可。

一个Condition和一个单独的Lock相关联,就像条件队列和单独的内部锁相关联一样;

调用与Condition相关联的Lock的Lock.newCondition方法,可以创建一个Condition.

package com.github.houbb.sync.test.demo;

import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 条件锁实现
 * @author binbin.hou
 * @since 1.0.0
 */
public class LockConditionQuery extends AbstractQuery {

    private static final Log log = LogFactory.getLog(LockConditionQuery.class);

    private final Lock lock = new ReentrantLock();

    private final Condition condition = lock.newCondition();

    @Override
    protected void remoteCall() {
        lock.lock();
        try{
            super.remoteCall();

            log.info("远程线程执行完成,唤醒所有等待线程。");
            condition.signalAll();
        } finally {
            lock.unlock();
        }

    }

    @Override
    protected void endQuery() {
        lock.lock();
        try{
            // 等待
            log.info("主线程进入等待");

            condition.await();

            super.endQuery();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        LockConditionQuery query = new LockConditionQuery();
        query.asyncToSync();
    }

}

实现也比较简单,我们在方法进入,调用 lock.lock() 加锁,finally 中调用 lock.unlock() 释放锁。

condition.await(); 进入等待;condition.signalAll(); 唤醒所有等待线程。

测试日志

[INFO] [2020-10-08 12:33:40.985] [main] [c.g.h.s.t.d.AbstractQuery.startQuery] - 开始查询...
[INFO] [2020-10-08 12:33:40.986] [main] [c.g.h.s.t.d.LockConditionQuery.endQuery] - 主线程进入等待
[INFO] [2020-10-08 12:33:40.987] [Thread-0] [c.g.h.s.t.d.AbstractQuery.remoteCall] - 远程调用开始
[INFO] [2020-10-08 12:33:45.990] [Thread-0] [c.g.h.s.t.d.AbstractQuery.remoteCall] - 远程调用结束
[INFO] [2020-10-08 12:33:45.991] [Thread-0] [c.g.h.s.t.d.LockConditionQuery.remoteCall] - 远程线程执行完成,唤醒所有等待线程。
[INFO] [2020-10-08 12:33:45.993] [main] [c.g.h.s.t.d.AbstractQuery.endQuery] - 完成查询,结果为:success

CountDownLatch 闭锁实现

CountDownLatch/Future/CyclicBarrier 这三个都是 jdk 为我们提供的同步工具类,我们此处只做简单介绍。

详情参见:

JCIP-19-同步工具类。闭锁/栅栏/信号量/阻塞队列/FutureTask

CountDownLatch 简介

闭锁是一种同步工具类,可以延迟线程的进度直到其达到终止状态。

闭锁的作用相当于一扇门:在闭锁到达结束状态之前,这扇门一直是关闭的,并且没有任何线程能通过,当到达结束状态时,这扇门会打开并允许所有的线程通过。

当闭锁到达结束状态后,将不会再改变状态,因此这扇门将永远保持打开状态。

闭锁可以用来确保某些活动直到其它活动都完成后才继续执行。

java 代码实现

import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * CountDownLatch 实现
 * @author binbin.hou
 * @since 1.0.0
 */
public class CountDownLatchQuery extends AbstractQuery {

    private static final Log log = LogFactory.getLog(CountDownLatchQuery.class);

    /**
     * 闭锁
     * 调用1次,后续方法即可通行。
     */
    private final CountDownLatch countDownLatch = new CountDownLatch(1);

    @Override
    protected void remoteCall() {
        super.remoteCall();

        // 调用一次闭锁
        countDownLatch.countDown();
    }

    @Override
    protected void endQuery() {
        try {
//            countDownLatch.await();
            countDownLatch.await(10, TimeUnit.SECONDS);

            log.info("完成查询,结果为:" + result);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        CountDownLatchQuery loopQuery = new CountDownLatchQuery();
        loopQuery.asyncToSync();
    }

}

我们在返回结果之前调用 countDownLatch.await(10, TimeUnit.SECONDS); 进行等待,这里可以指定超时时间。

remoteCall() 远程完成后,执行一下 countDownLatch.countDown();,进而可以让程序继续执行下去。

测试

代码

CountDownLatchQuery loopQuery = new CountDownLatchQuery();
loopQuery.asyncToSync();

日志

[INFO] [2020-10-08 10:24:03.348] [main] [c.g.h.s.t.d.AbstractQuery.startQuery] - 开始查询...
[INFO] [2020-10-08 10:24:03.350] [Thread-0] [c.g.h.s.t.d.AbstractQuery.remoteCall] - 远程调用开始
[INFO] [2020-10-08 10:24:08.353] [Thread-0] [c.g.h.s.t.d.AbstractQuery.remoteCall] - 远程调用结束
[INFO] [2020-10-08 10:24:08.354] [main] [c.g.h.s.t.d.CountDownLatchQuery.endQuery] - 完成查询,结果为:success

jdk 提供的闭锁功能还是非常的方便的。

CyclicBarrier 栅栏

简介

栅栏(Barrier)类似于闭锁,它能阻塞一组线程直到某个事件发生[CPJ 4.4.3]。闭锁是一次性对象,一旦进入最终状态,就不能被重置了。

栅栏与闭锁的关键区别在于,所有线程必须同时达到栅栏位置,才能继续执行。闭锁用于等待事件,而栅栏用于等待其他线程。

java 实现

package com.github.houbb.sync.test.demo;

import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
 * CyclicBarrier 实现
 * @author binbin.hou
 * @since 1.0.0
 */
public class CyclicBarrierQuery extends AbstractQuery {

    private static final Log log = LogFactory.getLog(CyclicBarrierQuery.class);

    private CyclicBarrier cyclicBarrier = new CyclicBarrier(2);

    @Override
    protected void remoteCall() {
        super.remoteCall();

        try {
            cyclicBarrier.await();
            log.info("远程调用进入等待");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    }

    @Override
    protected void endQuery() {
        try {
            cyclicBarrier.await();
            log.info("主线程进入等待");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }

        super.endQuery();
    }

}

测试

代码

public static void main(String[] args) {
    CyclicBarrierQuery cyclicBarrierQuery = new CyclicBarrierQuery();
    cyclicBarrierQuery.asyncToSync();
}

日志

[INFO] [2020-10-08 10:39:00.890] [main] [c.g.h.s.t.d.AbstractQuery.startQuery] - 开始查询...
[INFO] [2020-10-08 10:39:00.892] [Thread-0] [c.g.h.s.t.d.AbstractQuery.remoteCall] - 远程调用开始
[INFO] [2020-10-08 10:39:05.894] [Thread-0] [c.g.h.s.t.d.AbstractQuery.remoteCall] - 远程调用结束
[INFO] [2020-10-08 10:39:05.895] [Thread-0] [c.g.h.s.t.d.CyclicBarrierQuery.remoteCall] - 远程调用进入等待
[INFO] [2020-10-08 10:39:05.895] [main] [c.g.h.s.t.d.CyclicBarrierQuery.endQuery] - 主线程进入等待
[INFO] [2020-10-08 10:39:05.896] [main] [c.g.h.s.t.d.AbstractQuery.endQuery] - 完成查询,结果为:success

可以看出远程线程 Thread-0 执行完之后就进入等待,此时主线程调用,然后也进入等待。

等主线程 endQuery 等待时,就满足了两个线程同时等待,然后执行就结束了。

基于 Future 实现

Future 简介

Future模式可以这样来描述:我有一个任务,提交给了Future,Future替我完成这个任务。期间我自己可以去做任何想做的事情。一段时间之后,我就便可以从Future那儿取出结果。就相当于下了一张订货单,一段时间后可以拿着提订单来提货,这期间可以干别的任何事情。其中Future 接口就是订货单,真正处理订单的是Executor类,它根据Future接口的要求来生产产品。

Future接口提供方法来检测任务是否被执行完,等待任务执行完获得结果,也可以设置任务执行的超时时间。这个设置超时的方法就是实现Java程序执行超时的关键。

详细介绍:

JCIP-26-Executor Future FutureTask

java 代码实现

采用 Future 返回和以前的实现差异较大,我们直接覆写以前的方法即可。

import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;

import java.util.concurrent.*;

/**
 * Future 实现
 * @author binbin.hou
 * @since 1.0.0
 */
public class FutureQuery extends AbstractQuery {

    private static final Log log = LogFactory.getLog(FutureQuery.class);

    private final ExecutorService executorService = Executors.newSingleThreadExecutor();

    @Override
    public void asyncToSync() {
        //1. 开始调用
        super.startQuery();

        //2. 远程调用
        Future<String> stringFuture = remoteCallFuture();

        //3. 完成结果
        try {
            String result = stringFuture.get(10, TimeUnit.SECONDS);
            log.info("调用结果:{}", result);
        } catch (InterruptedException | TimeoutException | ExecutionException e) {
            e.printStackTrace();
        }
    }

    /**
     * 远程调用
     * @return Future 信息
     */
    private Future<String> remoteCallFuture() {
        FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() {
            @Override
            public String call() throws Exception {
                log.info("开始异步调用");
                TimeUnit.SECONDS.sleep(5);
                log.info("完成异步调用");
                return "success";
            }
        });

        executorService.submit(futureTask);
        // 关闭线程池
        executorService.shutdown();
        return futureTask;
    }

    public static void main(String[] args) {
        FutureQuery query = new FutureQuery();
        query.asyncToSync();
    }

}

远程调用执行时,是一个 FutureTask,然后提交到线程池去执行。

获取结果的时候,stringFuture.get(10, TimeUnit.SECONDS) 可以指定获取的超时时间。

日志

测试日志如下:

[INFO] [2020-10-08 12:52:05.175] [main] [c.g.h.s.t.d.AbstractQuery.startQuery] - 开始查询...
[INFO] [2020-10-08 12:52:05.177] [pool-1-thread-1] [c.g.h.s.t.d.FutureQuery.call] - 开始异步调用
[INFO] [2020-10-08 12:52:10.181] [pool-1-thread-1] [c.g.h.s.t.d.FutureQuery.call] - 完成异步调用
[INFO] [2020-10-08 12:52:10.185] [main] [c.g.h.s.t.d.FutureQuery.asyncToSync] - 调用结果:success

Spring EventListener

spring 事件监听器模式

对于一件事情完成的结果调用,使用观察者模式是非常适合的。

spring 为我们提供了比较强大的监听机制,此处演示下结合 spring 使用的例子。

ps: 这个例子是2年前的自己写的例子了,此处为了整个系列的完整性,直接搬过来作为补充。

代码实现

  • BookingCreatedEvent.java

定义一个传输属性的对象。

public class BookingCreatedEvent extends ApplicationEvent {

    private static final long serialVersionUID = -1387078212317348344L;

    private String info;

    public BookingCreatedEvent(Object source) {
        super(source);
    }

    public BookingCreatedEvent(Object source, String info) {
        super(source);
        this.info = info;
    }

    public String getInfo() {
        return info;
    }
}
  • BookingService.java

说明:当 this.context.publishEvent(bookingCreatedEvent); 触发时,
会被 @EventListener 指定的方法监听到。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;

import java.util.concurrent.TimeUnit;

@Service
public class BookingService {

    @Autowired
    private ApplicationContext context;

    private volatile BookingCreatedEvent bookingCreatedEvent;

    /**
     * 异步转同步查询
     * @param info
     * @return
     */
    public String asyncQuery(final String info) {
        query(info);

        new Thread(new Runnable() {
            @Override
            public void run() {
                remoteCallback(info);
            }
        }).start();

        while(bookingCreatedEvent == null) {
            //.. 空循环
            // 短暂等待。
            try {
                TimeUnit.MILLISECONDS.sleep(1);
            } catch (InterruptedException e) {
                //...
            }
            //2. 使用两个单独的 event...

        }

        final String result = bookingCreatedEvent.getInfo();
        bookingCreatedEvent = null;
        return result;
    }

    @EventListener
    public void onApplicationEvent(BookingCreatedEvent bookingCreatedEvent) {
        System.out.println("监听到远程的信息: " + bookingCreatedEvent.getInfo());
        this.bookingCreatedEvent = bookingCreatedEvent;
        System.out.println("监听到远程消息后: " + this.bookingCreatedEvent.getInfo());
    }

    /**
     * 执行查询
     * @param info
     */
    public void query(final String info) {
        System.out.println("开始查询: " + info);
    }

    /**
     * 远程回调
     * @param info
     */
    public void remoteCallback(final String info) {
        System.out.println("远程回调开始: " + info);

        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 重发结果事件
        String result = info + "-result";
        BookingCreatedEvent bookingCreatedEvent = new BookingCreatedEvent(this, result);
        //触发event
        this.context.publishEvent(bookingCreatedEvent);
    }
}
  • 测试方法
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = SpringConfig.class)
public class BookServiceTest {

    @Autowired
    private BookingService bookingService;

    @Test
    public void asyncQueryTest() {
        bookingService.asyncQuery("1234");
    }

}
  • 日志
2018-08-10 18:27:05.958  INFO  [main] com.github.houbb.spring.lean.core.ioc.event.BookingService:84 - 开始查询:1234
2018-08-10 18:27:05.959  INFO  [Thread-2] com.github.houbb.spring.lean.core.ioc.event.BookingService:93 - 远程回调开始:1234
接收到信息: 1234-result
2018-08-10 18:27:07.964  INFO  [Thread-2] com.github.houbb.spring.lean.core.ioc.event.BookingService:73 - 监听到远程的信息: 1234-result
2018-08-10 18:27:07.964  INFO  [Thread-2] com.github.houbb.spring.lean.core.ioc.event.BookingService:75 - 监听到远程消息后: 1234-result
2018-08-10 18:27:07.964  INFO  [Thread-2] com.github.houbb.spring.lean.core.ioc.event.BookingService:106 - 已经触发event
2018-08-10 18:27:07.964  INFO  [main] com.github.houbb.spring.lean.core.ioc.event.BookingService:67 - 查询结果: 1234-result
2018-08-10 18:27:07.968  INFO  [Thread-1] org.springframework.context.support.GenericApplicationContext:993 - Closing org.springframework.context.support.GenericApplicationContext@5cee5251: startup date [Fri Aug 10 18:27:05 CST 2018]; root of context hierarchy

小结

本文共计介绍了 7 种异步转同步的方式,实际上思想都是一样的。

在异步执行完成前等待,执行完成后唤醒等待即可。

当然我写本文除了总结以上几种方式以外,还想为后续写一个异步转同步的工具提供基础。

下一节我们将一起学习下如何将这个功能封装为一个同步转换框架,感兴趣的可以关注一下,便于实时接收最新内容。

觉得本文对你有帮助的话,欢迎点赞评论收藏转发一波。你的鼓励,是我最大的动力~

不知道你有哪些收获呢?或者有其他更多的想法,欢迎留言区和我一起讨论,期待与你的思考相遇。

代码地址

为了便于学习,文中的所有例子都已经开源:

实现 1-6:sync

loop

countdownlatch

spring-event-listener

查看原文

赞 0 收藏 0 评论 0

老马啸西风 发布了文章 · 10月8日

从零开始手写缓存框架(12)redis expire 过期的随机特性详解及实现

前言

java从零手写实现redis(一)如何实现固定大小的缓存?

java从零手写实现redis(二)redis expire 过期原理

java从零手写实现redis(三)内存数据如何重启不丢失?

java从零手写实现redis(四)添加监听器

java从零手写实现redis(五)过期策略的另一种实现思路

java从零手写实现redis(六)AOF 持久化原理详解及实现

java从零开始手写redis(七)LRU 缓存淘汰策略详解

java从零开始手写redis(八)朴素 LRU 淘汰算法性能优化

第二节中我们已经初步实现了类似 redis 中的 expire 过期功能,不过存在一个问题没有解决,那就是遍历的时候不是随机返回的,会导致每次遍历从头开始,可能导致很多 Keys 处于“饥饿”状态。

可以回顾:

java从零手写实现redis(二)redis expire 过期原理

java从零手写实现redis(五)过期策略的另一种实现思路

本节我们一起来实现一个过期的随机性版本,更近一步领会一下 redis 的巧妙之处。

以前的实现回顾

开始新的旅程之前,我们先回顾一下原来的实现。

expire 实现原理

其实过期的实思路也比较简单:我们可以开启一个定时任务,比如 1 秒钟做一次轮训,将过期的信息清空。

过期信息的存储

/**
 * 过期 map
 *
 * 空间换时间
 * @since 0.0.3
 */
private final Map<K, Long> expireMap = new HashMap<>();

@Override
public void expire(K key, long expireAt) {
    expireMap.put(key, expireAt);
}

我们定义一个 map,key 是对应的要过期的信息,value 存储的是过期时间。

轮询清理

我们固定 100ms 清理一次,每次最多清理 100 个。

/**
 * 单次清空的数量限制
 * @since 0.0.3
 */
private static final int LIMIT = 100;

/**
 * 缓存实现
 * @since 0.0.3
 */
private final ICache<K,V> cache;
/**
 * 线程执行类
 * @since 0.0.3
 */
private static final ScheduledExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor();
public CacheExpire(ICache<K, V> cache) {
    this.cache = cache;
    this.init();
}
/**
 * 初始化任务
 * @since 0.0.3
 */
private void init() {
    EXECUTOR_SERVICE.scheduleAtFixedRate(new ExpireThread(), 100, 100, TimeUnit.MILLISECONDS);
}

这里定义了一个单线程,用于执行清空任务。

清空任务

这个非常简单,遍历过期数据,判断对应的时间,如果已经到期了,则执行清空操作。

为了避免单次执行时间过长,最多只处理 100 条。

/**
 * 定时执行任务
 * @since 0.0.3
 */
private class ExpireThread implements Runnable {
    @Override
    public void run() {
        //1.判断是否为空
        if(MapUtil.isEmpty(expireMap)) {
            return;
        }
        //2. 获取 key 进行处理
        int count = 0;
        for(Map.Entry<K, Long> entry : expireMap.entrySet()) {
            if(count >= LIMIT) {
                return;
            }
            expireKey(entry);
            count++;
        }
    }
}

/**
 * 执行过期操作
 * @param entry 明细
 * @since 0.0.3
 */
private void expireKey(Map.Entry<K, Long> entry) {
    final K key = entry.getKey();
    final Long expireAt = entry.getValue();
    // 删除的逻辑处理
    long currentTime = System.currentTimeMillis();
    if(currentTime >= expireAt) {
        expireMap.remove(key);
        // 再移除缓存,后续可以通过惰性删除做补偿
        cache.remove(key);
    }
}

redis 的定时任务

流程

想知道我们的流程就什么问题,和 redis 的定时清理任务流程对比一下就知道了。

Redis内部维护一个定时任务,默认每秒运行10次(通过配置hz控制)。

定时任务中删除过期键逻辑采用了自适应算法,根据键的过期比例、使用快慢两种速率模式回收键,流程如下所示。

输入图片说明

流程说明

1)定时任务在每个数据库空间随机检查20个键,当发现过期时删除对应的键。

2)如果超过检查数25%的键过期,循环执行回收逻辑直到不足25%或运行超时为止,慢模式下超时时间为25毫秒。

3)如果之前回收键逻辑超时,则在Redis触发内部事件之前再次以快模式运行回收过期键任务,快模式下超时时间为1毫秒且2秒内只能运行1次。

4)快慢两种模式内部删除逻辑相同,只是执行的超时时间不同。

ps: 这里的快慢模式设计的也比较巧妙,根据过期信息的比例,调整对应的任务超时时间。

这里的随机也非常重要,可以比较客观的清理掉过期信息,而不是从头遍历,导致后面的数据无法被访问。

我们接下来主要实现下随机抽取这个特性。

直接通过 Map#keys 转集合

实现思路

保持原来的 expireMap 不变,直接对 keys 转换为 collection,然后随机获取。

这个也是网上最多的一种答案。

java 代码实现

基本属性

public class CacheExpireRandom<K,V> implements ICacheExpire<K,V> {

    private static final Log log = LogFactory.getLog(CacheExpireRandom.class);

    /**
     * 单次清空的数量限制
     * @since 0.0.16
     */
    private static final int COUNT_LIMIT = 100;

    /**
     * 过期 map
     *
     * 空间换时间
     * @since 0.0.16
     */
    private final Map<K, Long> expireMap = new HashMap<>();

    /**
     * 缓存实现
     * @since 0.0.16
     */
    private final ICache<K,V> cache;

    /**
     * 是否启用快模式
     * @since 0.0.16
     */
    private volatile boolean fastMode = false;

    /**
     * 线程执行类
     * @since 0.0.16
     */
    private static final ScheduledExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor();

    public CacheExpireRandom(ICache<K, V> cache) {
        this.cache = cache;
        this.init();
    }

    /**
     * 初始化任务
     * @since 0.0.16
     */
    private void init() {
        EXECUTOR_SERVICE.scheduleAtFixedRate(new ExpireThreadRandom(), 10, 10, TimeUnit.SECONDS);
    }

}

定时任务

这里我们和 redis 保持一致,支持 fastMode。

实际上 fastMode 和慢模式的逻辑是完全一样的,只是超时的时间不同。

这里的超时时间我根据个人理解做了一点调整,整体流程不变。

/**
 * 定时执行任务
 * @since 0.0.16
 */
private class ExpireThreadRandom implements Runnable {
    @Override
    public void run() {
        //1.判断是否为空
        if(MapUtil.isEmpty(expireMap)) {
            log.info("expireMap 信息为空,直接跳过本次处理。");
            return;
        }
        //2. 是否启用快模式
        if(fastMode) {
            expireKeys(10L);
        }
        //3. 缓慢模式
        expireKeys(100L);
    }
}

过期信息核心实现

我们执行过期的时候,首先会记录超时时间,用于超出时直接中断执行。

默认恢复 fastMode=false,当执行超时的时候设置 fastMode=true。

/**
 * 过期信息
 * @param timeoutMills 超时时间
 * @since 0.0.16
 */
private void expireKeys(final long timeoutMills) {
    // 设置超时时间 100ms
    final long timeLimit = System.currentTimeMillis() + timeoutMills;
    // 恢复 fastMode
    this.fastMode = false;
    //2. 获取 key 进行处理
    int count = 0;
    while (true) {
        //2.1 返回判断
        if(count >= COUNT_LIMIT) {
            log.info("过期淘汰次数已经达到最大次数: {},完成本次执行。", COUNT_LIMIT);
            return;
        }
        if(System.currentTimeMillis() >= timeLimit) {
            this.fastMode = true;
            log.info("过期淘汰已经达到限制时间,中断本次执行,设置 fastMode=true;");
            return;
        }
        //2.2 随机过期
        K key = getRandomKey();
        Long expireAt = expireMap.get(key);
        boolean expireFlag = expireKey(key, expireAt);
        log.debug("key: {} 过期执行结果 {}", key, expireFlag);
        //2.3 信息更新
        count++;
    }
}

随机获取过期 key

/**
 * 随机获取一个 key 信息
 * @return 随机返回的 keys
 * @since 0.0.16
 */
private K getRandomKey() {
    Random random = ThreadLocalRandom.current();
    Set<K> keySet = expireMap.keySet();
    List<K> list = new ArrayList<>(keySet);
    int randomIndex = random.nextInt(list.size());
    return list.get(randomIndex);
}

这个就是网上最常见的实现方法,直接所有 keys 转换为 list,然后通过 random 获取一个元素。

性能改进

方法的缺陷

getRandomKey() 方法为了获取一个随机的信息,代价还是太大了。

如果 keys 的数量非常大,那么我们要创建一个 list,这个本身就是非常耗时的,而且空间复杂度直接翻倍。

所以不太清楚为什么晚上最多的是这一种解法。

优化思路-避免空间浪费

最简单的思路是我们应该避免 list 的创建。

我们所要的只是一个基于 size 的随机值而已,我们可以遍历获取:

private K getRandomKey2() {
    Random random = ThreadLocalRandom.current();
    int randomIndex = random.nextInt(expireMap.size());
    // 遍历 keys
    Iterator<K> iterator = expireMap.keySet().iterator();
    int count = 0;
    while (iterator.hasNext()) {
        K key = iterator.next();
        if(count == randomIndex) {
            return key;
        }
        count++;
    }
    // 正常逻辑不会到这里
    throw new CacheRuntimeException("对应信息不存在");
}

优化思路-批量操作

上述的方法避免了 list 的创建,同时也符合随机的条件。

但是从头遍历到随机的 size 数值,这也是一个比较慢的过程(O(N) 时间复杂度)。

如果我们取 100 次,悲观的话就是 100 * O(N)。

我们可以运用批量的思想,比如一次取 100 个,降低时间复杂度:

/**
 * 批量获取多个 key 信息
 * @param sizeLimit 大小限制
 * @return 随机返回的 keys
 * @since 0.0.16
 */
private Set<K> getRandomKeyBatch(final int sizeLimit) {
    Random random = ThreadLocalRandom.current();
    int randomIndex = random.nextInt(expireMap.size());
    // 遍历 keys
    Iterator<K> iterator = expireMap.keySet().iterator();
    int count = 0;
    Set<K> keySet = new HashSet<>();
    while (iterator.hasNext()) {
        // 判断列表大小
        if(keySet.size() >= sizeLimit) {
            return keySet;
        }
        K key = iterator.next();
        // index 向后的位置,全部放进来。
        if(count >= randomIndex) {
            keySet.add(key);
        }
        count++;
    }
    // 正常逻辑不会到这里
    throw new CacheRuntimeException("对应信息不存在");
}

我们传入一个列表的大小限制,可以一次获取多个。

优化思路-O(1) 的时间复杂度

一开始想到随机,我的第一想法是同时冗余一个 list 存放 keys,然后可以随机返回 key,解决问题。

但是对于 list 的更新,确实 O(N) 的,空间复杂度多出了 list 这一部分,感觉不太值当。

如果使用前面的 map 存储双向链表节点也可以解决,但是相对比较麻烦,前面也都实现过,此处就不赘述了。

其实这里的随机还是有些不足

(1)比如随机如果数据重复了怎么处理?

当然目前的解法就是直接 count,一般数据量较大时这种概率比较低,而且有惰性删除兜底,所以无伤大雅。

(2)随机到的信息很大可能过期时间没到

这里最好采用我们原来的基于过期时间的 map 分类方式,这样可以保证获取到的信息过期时间在我们的掌握之中。

当然各种方法各有利弊,看我们如何根据实际情况取舍。

小结

到这里,一个类似于 redis 的 expire 过期功能,算是基本实现了。

对于 redis 过期的实现,到这里也基本告一段落了。当然,还有很多优化的地方,希望你在评论区写下自己的方法。

开源地址:https://github.com/houbb/cache

觉得本文对你有帮助的话,欢迎点赞评论收藏关注一波。你的鼓励,是我最大的动力~

不知道你有哪些收获呢?或者有其他更多的想法,欢迎留言区和我一起讨论,期待与你的思考相遇。

原文地址

Cache Travel-09-从零手写 cache 之 redis expire 过期实现原理

参考资料

JAVA 随机选出MAP中的键

Selecting random key and value sets from a Map in Java

查看原文

赞 3 收藏 3 评论 0

老马啸西风 发布了文章 · 10月7日

java 从零开始手写 redis(11)clock时钟淘汰算法详解及实现

前言

java从零手写实现redis(一)如何实现固定大小的缓存?

java从零手写实现redis(三)redis expire 过期原理

java从零手写实现redis(三)内存数据如何重启不丢失?

java从零手写实现redis(四)添加监听器

java从零手写实现redis(五)过期策略的另一种实现思路

java从零手写实现redis(六)AOF 持久化原理详解及实现

java从零开始手写 redis(七)LRU 缓存淘汰策略详解

前面我们实现了 FIFO/LRU/LFU 等常见的淘汰策略,不过在操作系统中,实际上使用的是时钟页面置换算法。

LRU 的性能确实很好,不过比较消耗内存,而且实现比较麻烦。

时钟页面置换算法就是一种近似 LRU 的算法实现,可以看作是对 FIFO 算法的改进。

Clock 页面置换算法

为什么需要 clock 算法?

LRU算法的性能接近于OPT,但是实现起来比较困难,且开销大;FIFO算法实现简单,但性能差。

所以操作系统的设计者尝试了很多算法,试图用比较小的开销接近LRU的性能,这类算法都是CLOCK算法的变体。

由于该算法循环地检查各页面的情况,故称为CLOCK算法,又称为最近未用(Not Recently Used, NRU)算法。

基本思路

需要用到页表项的访问位(access bit),当一个页面被装入内存时,把该位初始化为0,然后如果这个页被访问(读/写)时,硬件把它置为1.

把各个页面组织成环形链表(类似钟表面),把指针指向最老的页面(最先进来);

当发生一个缺页中断,考察指针所指向的最老的页面,若它的访问为为0,则立即淘汰。若访问为1,则把该位置为0,然后指针往下移动一格。如此下去,直到找到被淘汰的页面,然后把指针移动到它的下一格。

个人疑惑

(1)如果找了一圈发现元素都是 1 怎么办?

是不是直接默认取第一个元素,这样认为就是回到了朴素的 FIFO 机制。

(2)访问的性能问题

这里的遍历可以认为是一个循环链表:

每一个节点内容:

K key;
boolean accessFlag;

朴素的 FIFO 非常简单,直接往队列中扔元素就行,然后淘汰最老的一个元素。

这个如果真的使用链表作为数据结构,那么查找,更新时间复杂度就是 O(N),显然性能一般。

能想到的方案就是 HashMap 中存储 key+双向链表节点。

和性能改进版本的 LRU 对比,就是每次更新不做节点的移除调整,而只是更新对应的标志位。

简单的CLOCK算法

是通过给每一个访问的页面关联一个附加位(reference bit),有些地方也叫做使用位(use bit)。

他的主要思想是:当某一页装入主存时,将use bit初始化为0;如果该页之后又被访问到,使用位也还是标记成1。

对于页面置换算法,候选的帧集合可以看成是一个循环缓冲区,并且有一个指针和缓冲区相关联。遇到页面替换时,指针指向缓冲区的下一帧。

如果这页进入主存后发现没有空余的帧(frame),即所有页面的使用位均为1,那么这时候从指针开始循环一个缓冲区,将之前的使用位都清0,并且留在最初的位置上,换出该桢对应的页。

ps: 这里发现没有空余的帧,会将所有使用位都清零。

例子

以下面这个页面置换过程为例,访问的页面依次是:1,2,3,4,1,2,5,1,2,3,4,5。

主存有4个空闲的帧,每个页面对应的结构为(页面号,使用位)。

最开始页面号1进入主存,主存里面有空闲的帧,将其使用位记成1,由于主存中之前没有页面1,所以会发生缺页中断。

同理随后的页面2,3,4进入主存,将其使用位记成1,发生缺页中断。

当之后的页面1,2进入主存时,由于页面1,2已经在主存中,不做处理。

当之后的页面5进入主存时,主存内没有空余的帧,这时候随着指针循环移动整个缓冲区,将之前页面的使用位全部清0,即这时候页面1,2,3,4对应的使用位全部为0,指针回到最初的位置,将页面1替换出去,页面5换入主存,同时使用位标记成1。

以此类推,可知CLOCK共发生10次缺页中断。

输入图片说明

Gclock(Generalized clock page replacement algorithm)

算法思想

该算法是Clock的变种。

相对于Clock标志位采用的是二进制0和1表示,Gclock的标志位采用的是一个整数,意味着理论上可以一直增加到无穷大。

工作原理

(1)当待缓存对象在缓存中时,把其标记位的值加1。同时,指针指向该对象的下一个对象。

(2)若不在缓存中时,检查指针指向对象的标记位。如果是0,则用待缓存对象替换该对象;否则,把标记位的值减1,指针指向下一个对象。如此直到淘汰一个对象为止。由于标记位的值允许大于1,所以指针可能循环多遍才淘汰一个对象。

ps: 这个有点类似于简化版本的 LFU,统计了对应的出现次数。

WSclock(Working set clock page replacement algorithm)

算法思想

该算法同样是clock的变种,可能是实际运用最广泛的算法。

它采用clock的原理,是ws算法的增强版。

算法数据结构为循环链表,每个缓存对象保存了"最近使用的时间"rt和"是否引用"的R标志位,使用一个周期计时器t。age表示为当前时间和rt的差值

工作原理

(1)当待缓存对象存在缓存中时,更新rt为当前时间。同时,指针指向该对象的下一个对象。

(2)若不存在于缓存中时,如果缓存没满,则更新指针指向位置的rt为当前时间,R为1。同时,指针指向下一个对象。如果满了,则需要淘汰一个对象。检查指针指向的对象,

  • R为1,说明对象在working set中,则重置R为0,指针指向下一个对象。
  • R为0。如果age大于t,说明对象不在working set中,则替换该对象,并置R为1,rt为当前时间。如果age不大于t,则继续寻找淘汰对象。如果回到指针开始的位置,还未寻找到淘汰对象,则淘汰遇到的第一个R为0的对象。

二次机会法(或者enhanced clock)

改进型的CLOCK算法

思路:减少修改页的缺页处理开销

修改Clock算法,使它允许脏页总是在一次时钟头扫描中保留下来,同时使用脏位(dity bit,也叫写位)和使用位来指导置换

算法流程

在之前的CLOCK算法上面除了使用位(used bit),还增加了一个修改位(modified bit),有些地方也叫做dirty bit。

现在每一页有两个状态,分别是(使用位,修改位),可分为以下四种情况考虑:

(0,0):最近没有使用使用也没有修改,最佳状态!

(0,1):修改过但最近没有使用,将会被写

(1,0):使用过但没有被修改,下一轮将再次被用

(1,1):使用过也修改过,下一轮页面置换最后的选择

例子

以下面这个页面置换过程为例:

访问的页面依次是:0,1,3,6,2,4,5,2,5,0,3,1,2,5,4,1,0,其中红色数字表示将要修改的页面,即他们的modified bit将被设置成1,在下图中这些页面用斜体表示,使用位和修改位如下图所示。下面的"Fault ?"表示缺页时查找空闲frame的次数。

输入图片说明

替换顺序

  1. 从指针当前的位置开始寻找主存中满足(使用位,修改位)为(0,0)的页面;
  2. 如果第1步没有找到满足条件的,接着寻找状态为(0,1)页面;
  3. 如果依然没有找到,指针回到最初的位置,将集合中所有页面的使用位设置成0。重复第1步,并且如果有必要,重复第2步,这样一定可以找到将要替换的页面。

java 实现 clock 算法

说明

本文主要实现一个简单版本的 clock 算法,并对常规的实现加上一定的性能优化。(全网可能是独家的,或者说第一个这么实现的)

优化主要是基于性能的考虑,类似于前面对于 LRU 的性能优化,将查询操作从 O(N) 优化到 O(1)。

实现思路

我们定义一个符合当前业务场景的循环链表(这个后期也可以独立出去,有时间单独写一个数据结构项目,便于复用)

定义包含 accessFlag 的节点。

我们使用双向链表,而不是单向链表,这样删除的性能是最好的。

使用 map 保存 key 的信息,避免循环整个链表判断 key 是否存在,用空间换取时间。

好了,接下来就是快乐的编码阶段了。

代码实现

节点定义

/**
 * 循环链表节点
 * @author binbin.hou
 * @since 0.0.15
 * @param <K> key
 * @param <V> value
 */
public class CircleListNode<K,V> {

    /**
     * 键
     * @since 0.0.15
     */
    private K key;

    /**
     * 值
     * @since 0.0.15
     */
    private V value = null;

    /**
     * 是否被访问过
     * @since 0.0.15
     */
    private boolean accessFlag = false;

    /**
     * 后一个节点
     * @since 0.0.15
     */
    private CircleListNode<K, V> pre;

    /**
     * 后一个节点
     * @since 0.0.15
     */
    private CircleListNode<K, V> next;

    //getter & setter
}

这里很简单的几个元素:key, value, accessFlag(是否访问过的标识),然后就是 next, pre 用户实现双向链表。

双向链表实现

基本属性

为了和原来的 Lru 双向链表保持一致,我们实现原来的额接口。

public class LruMapCircleList<K,V> implements ILruMap<K,V> {

    private static final Log log = LogFactory.getLog(LruMapCircleList.class);

    /**
     * 头结点
     * @since 0.0.15
     */
    private CircleListNode<K,V> head;

    /**
     * 映射 map
     * @since 0.0.15
     */
    private Map<K, CircleListNode<K,V>> indexMap;

    public LruMapCircleList() {
        // 双向循环链表
        this.head = new CircleListNode<>(null);
        this.head.next(this.head);
        this.head.pre(this.head);

        indexMap = new HashMap<>();
    }

}

初始化 Head 节点,indexMap 用户保存 key 和双向节点之间的关系。

删除元素

/**
 * 移除元素
 *
 * 1. 是否存在,不存在则忽略
 * 2. 存在则移除,从链表+map中移除
 *
 * head==>1==>2==>head
 *
 * 删除 2 之后:
 * head==>1==>head
 * @param key 元素
 * @since 0.0.15
 */
@Override
public void removeKey(final K key) {
    CircleListNode<K,V> node = indexMap.get(key);
    if(ObjectUtil.isNull(node)) {
        log.warn("对应的删除信息不存在:{}", key);
        return;
    }
    CircleListNode<K,V> pre = node.pre();
    CircleListNode<K,V> next = node.next();
    //1-->(x2)-->3  直接移除2
    pre.next(next);
    next.pre(pre);
    indexMap.remove(key);
    log.debug("Key: {} 从循环链表中移除", key);
}

节点的删除不难,直接从循环链表中移除节点即可,同时移除 indexMap 中的信息。

更新

此处对于 put/get 用的是同一个方法,实际上如果想实现增强版本的 clock 算法,二者还是区分开比较好,不过个人感觉原理差不多,此处就不再实现了,估计这就是淘汰算法的最后一个小节。

/**
 * 放入元素
 *
 * 类似于 FIFO,直接放在队列的最后
 * 
 * head==>1==>head
 * 加入元素:
 *
 * head==>1==>2==>head
 *
 * (1)如果元素不存在,则直接插入。
 * 默认 accessFlag = 0;
 * (2)如果已经存在,则更新 accessFlag=1;
 *
 * @param key 元素
 * @since 0.0.15
 */
@Override
public void updateKey(final K key) {
    CircleListNode<K,V> node = indexMap.get(key);
    // 存在
    if(ObjectUtil.isNotNull(node)) {
        node.accessFlag(true);
        log.debug("节点已存在,设置节点访问标识为 true, key: {}", key);
    } else {
        // 不存在,则插入到最后
        node = new CircleListNode<>(key);
        CircleListNode<K,V> tail = head.pre();
        tail.next(node);
        node.pre(tail);
        node.next(head);
        head.pre(node);
        // 放入 indexMap 中,便于快速定位
        indexMap.put(key, node);
        log.debug("节点不存在,新增节点到链表中:{}", key);
    }
}

这里主要就是区分下节点是否已经存在。

(1)已存在,直接获取节点,更新 accessFlag=true;

(2)不存在:插入新的节点,accessFlag = false

淘汰数据

/**
 * 删除最老的元素
 *
 * (1)从 head.next 开始遍历,如果元素 accessFlag = 0,则直接移除
 * (2)如果 accessFlag=1,则设置其值为0,循环下一个节点。
 *
 * @return 结果
 * @since 0.0.15
 */
@Override
public ICacheEntry<K, V> removeEldest() {
    //fast-fail
    if(isEmpty()) {
        log.error("当前列表为空,无法进行删除");
        throw new CacheRuntimeException("不可删除头结点!");
    }
    // 从最老的元素开始,此处直接从 head.next 开始,后续可以考虑优化记录这个 key
    CircleListNode<K,V> node = this.head;
    while (node.next() != this.head) {
        // 下一个元素
        node = node.next();
        if(!node.accessFlag()) {
            // 未访问,直接淘汰
            K key = node.key();
            this.removeKey(key);
            return CacheEntry.of(key, node.value());
        } else {
            // 设置当前 accessFlag = 0,继续下一个
            node.accessFlag(false);
        }
    }
    // 如果循环一遍都没找到,直接取第一个元素即可。
    CircleListNode<K,V> firstNode = this.head.next();
    return CacheEntry.of(firstNode.key(), firstNode.value());
}

直接遍历节点,遇到 accessFlag=0 的直接淘汰即可。

如果 accessFlag=1,则设置其值为0,然后继续下一个。(这里有点免死金牌只能用一次的感觉)

循环一遍都没有找到,实际上直接取 head.next 即可,降级为 FIFO。当然因为我们已经更新 accessFlag=0 了,实际上继续循环也可以。

  • 实现的不足之处

这里有一个待改进点:我们不见得每次都从开始循环。这样实际上缺点比较明显,导致越先入队的元素第二次一定被淘汰,其他未被访问的元素可能会一直存在,可以用一个元素记住这个位置。(上一次被淘汰的节点的 next 节点),感觉这样才更加符合 clock 算法的思想。

还有一种方法就是不把访问过的 accessFlag 置为0,循环一圈都找不到元素直接降级为 FIFO,不过这个在大部分元素被访问之后,性能会变差。所以还是建议标记一下上次循环的位置。

调用

我们在 cache 满的时候,调用下当前循环链表即可:

import com.github.houbb.cache.api.ICache;
import com.github.houbb.cache.api.ICacheEntry;
import com.github.houbb.cache.api.ICacheEvictContext;
import com.github.houbb.cache.core.model.CacheEntry;
import com.github.houbb.cache.core.support.struct.lru.ILruMap;
import com.github.houbb.cache.core.support.struct.lru.impl.LruMapCircleList;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;

/**
 * 淘汰策略-clock 算法
 *
 * @author binbin.hou
 * @since 0.0.15
 */
public class CacheEvictClock<K,V> extends AbstractCacheEvict<K,V> {

    private static final Log log = LogFactory.getLog(CacheEvictClock.class);

    /**
     * 循环链表
     * @since 0.0.15
     */
    private final ILruMap<K,V> circleList;

    public CacheEvictClock() {
        this.circleList = new LruMapCircleList<>();
    }

    @Override
    protected ICacheEntry<K, V> doEvict(ICacheEvictContext<K, V> context) {
        ICacheEntry<K, V> result = null;
        final ICache<K,V> cache = context.cache();
        // 超过限制,移除队尾的元素
        if(cache.size() >= context.size()) {
            ICacheEntry<K,V>  evictEntry = circleList.removeEldest();;
            // 执行缓存移除操作
            final K evictKey = evictEntry.key();
            V evictValue = cache.remove(evictKey);

            log.debug("基于 clock 算法淘汰 key:{}, value: {}", evictKey, evictValue);
            result = new CacheEntry<>(evictKey, evictValue);
        }

        return result;
    }


    /**
     * 更新信息
     * @param key 元素
     * @since 0.0.15
     */
    @Override
    public void updateKey(final K key) {
        this.circleList.updateKey(key);
    }

    /**
     * 移除元素
     *
     * @param key 元素
     * @since 0.0.15
     */
    @Override
    public void removeKey(final K key) {
        this.circleList.removeKey(key);
    }

}

其实调用的地方没什么难度,就是直接调用下方法即可。

测试

好的,代码写完我们来简单的验证一下。

测试代码

ICache<String, String> cache = CacheBs.<String,String>newInstance()
        .size(3)
        .evict(CacheEvicts.<String, String>clock())
        .build();
cache.put("A", "hello");
cache.put("B", "world");
cache.put("C", "FIFO");
// 访问一次A
cache.get("A");
cache.put("D", "LRU");
Assert.assertEquals(3, cache.size());
System.out.println(cache.keySet());

日志

[DEBUG] [2020-10-07 11:32:55.396] [main] [c.g.h.c.c.s.s.l.i.LruMapCircleList.updateKey] - 节点不存在,新增节点到链表中:A
[DEBUG] [2020-10-07 11:32:55.398] [main] [c.g.h.c.c.s.s.l.i.LruMapCircleList.updateKey] - 节点不存在,新增节点到链表中:B
[DEBUG] [2020-10-07 11:32:55.401] [main] [c.g.h.c.c.s.s.l.i.LruMapCircleList.updateKey] - 节点不存在,新增节点到链表中:C
[DEBUG] [2020-10-07 11:32:55.403] [main] [c.g.h.c.c.s.s.l.i.LruMapCircleList.updateKey] - 节点已存在,设置节点访问标识为 true, key: A
[DEBUG] [2020-10-07 11:32:55.404] [main] [c.g.h.c.c.s.s.l.i.LruMapCircleList.removeKey] - Key: B 从循环链表中移除
[DEBUG] [2020-10-07 11:32:55.406] [main] [c.g.h.c.c.s.e.CacheEvictClock.doEvict] - 基于 clock 算法淘汰 key:B, value: world
[DEBUG] [2020-10-07 11:32:55.410] [main] [c.g.h.c.c.s.l.r.CacheRemoveListener.listen] - Remove key: B, value: world, type: evict
[DEBUG] [2020-10-07 11:32:55.411] [main] [c.g.h.c.c.s.s.l.i.LruMapCircleList.updateKey] - 节点不存在,新增节点到链表中:D
[D, A, C]

符合我们的预期。

LRU、FIFO与Clock的比较

LRU和FIFO本质都是先进先出的思路,但LRU是针对页面的最近访问时间来进行排序,所以需要在每一次页面访问的时候动态的调整各个页面之间的先后顺序(每一个页面的最近访问时间变了);而FIFO针对页面进入内存的时间来进行排序,这个时间是固定不变的,所以页面之间的先后顺序是固定不变的。

如果程序局部性,则LRU会很好。如果内存中所有页面都没有被访问过会退化为FIFO(如页面进入内存后没有被访问,最近访问时间与进入内存的时间相同)。

LRU算法性能较好,但系统开销较大;FIFO算法的系统的开销较小,但可能发生Belady现象。

因此,择衷的办法就是Clock算法,在每一次页面访问时,它不必去动态调整页面在链表中的顺序,而仅仅是做一个标记,等待发生缺页中断的时候,再把它移动到链表的末尾。

对于内存当中未被访问的页面,Clock算法的表现与LRU一样好,而对于那些曾经访问过的页面,它不能像LRU那样记住它们的准确访问顺序。

置换算法补充

常见的置换算法,我们基本已经讲述了一遍了。

不过算法的变种,不同场景的算法也比较多,这里补充没有详解的算法,此处就不做对应的实现了。

目的为了完善整个淘汰算法的认知体系。

最佳置换算法(OPT)

最佳(Optimal, OPT)置换算法所选择的被淘汰页面将是以后永不使用的,或者是在最长时间内不再被访问的页面,这样可以保证获得最低的缺页率。

但由于人们目前无法预知进程在内存下的若千页面中哪个是未来最长时间内不再被访问的,因而该算法无法实现。

最佳置换算法可以用来评价其他算法。假定系统为某进程分配了三个物理块,并考虑有以下页面号引用串:

7, 0, 1, 2, 0, 3, 0, 4, 2, 3, 0, 3, 2, 1, 2, 0, 1, 7, 0, 1

进程运行时,先将7, 0, 1三个页面依次装入内存。

进程要访问页面2时,产生缺页中断,根据最佳置换算法,选择第18次访问才需调入的页面7予以淘汰。

然后,访问页面0时,因为已在内存中所以不必产生缺页中断。访问页面3时又会根据最佳置换算法将页面1淘汰……依此类推,如图3-26所示。

从图中可以看出釆用最佳置换算法时的情况。

可以看到,发生缺页中断的次数为9,页面置换的次数为6。

输入图片说明

当然这个是理论算法,实际是无法实现的,因为我们无法预知后面的数据会被如何使用。

页面缓冲算法(PBA:Page Buffering Algorithm) 

虽然LRU和Clock置换算法都比FIFO算法好,但它们都需要一定的硬件支持,并需付出较多的开销,而且,置换一个已修改的页比置换未修改页的开销要大。

而页面缓冲算法(PBA)则既可改善分页系统的性能,又可采用一种较简单的置换策略。

VAX/VMS操作系统便是使用页面缓冲算法。它采用了前述的可变分配和局部置换方式,置换算法采用的是FIFO。

该算法规定将一个被淘汰的页放入两个链表中的一个,即如果页面未被修改,就将它直接放入空闲链表中;否则,便放入已修改页面的链表中。须注意的是,这时页面在内存中并不做物理上的移动,而只是将页表中的表项移到上述两个链表之一中。

空闲页面链表,实际上是一个空闲物理块链表,其中的每个物理块都是空闲的,因此,可在其中装入程序或数据。当需要读入一个页面时,便可利用空闲物理块链表中的第一个物理块来装入该页。当有一个未被修改的页要换出时,实际上并不将它换出内存,而是把该未被修改的页所在的物理块挂在自由页链表的末尾。

类似地,在置换一个已修改的页面时,也将其所在的物理块挂在修改页面链表的末尾。利用这种方式可使已被修改的页面和未被修改的页面都仍然保留在内存中。当该进程以后再次访问这些页面时,只需花费较小的开销,使这些页面又返回到该进程的驻留集中。当被修改的页面数目达到一定值时,例如64个页面,再将它们一起写回到磁盘上,从而显著地减少了磁盘I/O的操作次数。

一个较简单的页面缓冲算法已在MACH操作系统中实现了,只是它没有区分已修改页面和未修改页面。

置换算法对比

算法注释
最优算法不可实现,但可作为基准
NRU(最近未使用)算法LRU的粗糙的近似
FIFO算法可能抛弃重要(常使用)页面
第二次机会算法比FIFO有大的改善
时钟算法现实的
LRU(最近最少使用)算法很优秀,但难以实现
NFU(最不经常使用)算法LRU的近似
老化算法非常近似LRU
工作集算法实现起来开销很大
工作集时钟算法好的有效算法

小结

clock 算法算是一种权衡,在实际的实践应用中,操作系统选择的就是这种算法。

个人理解clock 的好处就是不用频繁地每次访问都去更新元素的位置,只需要淘汰的时候进行一次更新即可,我们在 LRU 中虽然使用双向链表优化,时间复杂度为 O(1),但是还是比较浪费的。

缓存的淘汰算法到这里基本是告一段落了,感谢你的支持,愿你有所收获。

开源地址:https://github.com/houbb/cache

觉得本文对你有帮助的话,欢迎点赞评论收藏关注一波。你的鼓励,是我最大的动力~

不知道你有哪些收获呢?或者有其他更多的想法,欢迎留言区和我一起讨论,期待与你的思考相遇。

深入学习

查看原文

赞 0 收藏 0 评论 0

老马啸西风 发布了文章 · 10月6日

java 从零开始手写 redis(十)缓存淘汰算法 LFU 最少使用频次

前言

java从零手写实现redis(一)如何实现固定大小的缓存?

java从零手写实现redis(三)redis expire 过期原理

java从零手写实现redis(三)内存数据如何重启不丢失?

java从零手写实现redis(四)添加监听器

java从零手写实现redis(五)过期策略的另一种实现思路

java从零手写实现redis(六)AOF 持久化原理详解及实现

java从零手写实现redis(七)LRU 缓存淘汰策略详解

从零开始手写 redis(八)朴素 LRU 淘汰算法性能优化

本节一起来学习下另一个常用的缓存淘汰算法,LFU 最少使用频次算法。

LFU 基础知识

概念

LFU(Least Frequently Used)即最近最不常用.看名字就知道是个基于访问频次的一种算法。

LRU是基于时间的,会将时间上最不常访问的数据给淘汰,在算法表现上是放到列表的顶部;LFU为将频率上最不常访问的数据淘汰.

既然是基于频率的,就需要有存储每个数据访问的次数.

从存储空间上,较LRU会多出一些持有计数的空间.

核心思想

如果一个数据在最近一段时间内使用次数很少,那么在将来一段时间内被使用的可能性也很小。

实现思路

O(N) 的删除

为了能够淘汰最少使用的数据,个人第一直觉就是直接一个 HashMap<String, Interger>, String 对应 key 信息,Integer 对应次数。

每次访问到就去+1,设置和读取的时间复杂度都是 O(1);不过删除就比较麻烦了,需要全部遍历对比,时间复杂度为 O(n);

O(logn) 的删除

另外还有一种实现思路就是利用小顶堆+hashmap,小顶堆插入、删除操作都能达到O(logn)时间复杂度,因此效率相比第一种实现方法更加高效。比如 TreeMap。

O(1) 的删除

是否能够更进一步优化呢?

其实 O(1) 的算法是有的,参见这篇 paper:

An O(1) algorithm for implementing the LFU cache eviction scheme

简单说下个人的想法:

我们要想实现 O(1) 的操作,肯定离不开 Hash 的操作,我们 O(N) 的删除中就实现了 O(1) 的 put/get。

但是删除性能比较差,因为需要寻找次数最少的比较耗时。

private Map<K, Node> map; // key和数据的映射
private Map<Integer, LinkedHashSet<Node>> freqMap; // 数据频率和对应数据组成的链表

class Node {
    K key;
    V value;
    int frequency = 1;
}

我们使用双 Hash 基本上就可以解决这个问题了。

map 中存放 key 和节点之间的映射关系。put/get 肯定都是 O(1) 的。

key 映射的 node 中,有对应的频率 frequency 信息;相同的频率都会通过 freqMap 进行关联,可以快速通过频率获取对应的链表。

删除也变得非常简单了,基本可以确定需要删除的最低频次是1,如果不是最多从 1...n 开始循环,最小 freq 选择链表的第一个元素开始删除即可。

至于链表本身的优先级,那么可以根据 FIFO,或者其他你喜欢的方式。

paper 的核心内容介绍

他山之石,可以攻玉。

我们在实现代码之前,先来读一读这篇 O(1) 的 paper。

介绍

本文的结构如下。

对LFU用例的描述,它可以证明优于其他缓存逐出算法

LFU缓存实现应支持的字典操作。 这些是确定策略运行时复杂度的操作

当前最著名的LFU算法及其运行时复杂度的描述

提出的LFU算法的说明; 每个操作的运行时复杂度为O(1)

LFU的用途

考虑用于HTTP协议的缓存网络代理应用程序。

该代理通常位于Internet与用户或一组用户之间。

它确保所有用户都能够访问Internet,并实现所有可共享资源的共享,以实现最佳的网络利用率和响应速度。

这样的缓存代理应该尝试在可支配的有限数量的存储或内存中最大化其可以缓存的数据量。

通常,在将静态资源(例如图像,CSS样式表和javascript代码)替换为较新版本之前,可以很容易地将它们缓存很长时间。

这些静态资源或程序员所谓的“资产”几乎包含在每个页面中,因此缓存它们是最有益的,因为几乎每个请求都将需要它们。

此外,由于要求网络代理每秒处理数千个请求,因此应将这样做所需的开销保持在最低水平。

为此,它应该仅驱逐那些不经常使用的资源。

因此,应该将经常使用的资源保持在不那么频繁使用的资源上,因为前者已经证明自己在一段时间内是有用的。

当然,有一个说法与之相反,它说将来可能不需要大量使用的资源,但是我们发现在大多数情况下情况并非如此。

例如,频繁使用页面的静态资源始终由该页面的每个用户请求。

因此,当内存不足时,这些缓存代理可以使用LFU缓存替换策略来驱逐其缓存中使用最少的项目。

LRU在这里也可能是适用的策略,但是当请求模式使得所有请求的项目都没有进入缓存并且以循环方式请求这些项目时,LRU将会失败。

ps: 数据的循环请求,会导致 LRU 刚好不适应这个场景。

在使用LRU的情况下,项目将不断进入和离开缓存,而没有用户请求访问缓存。

但是,在相同条件下,LFU算法的性能会更好,大多数缓存项会导致缓存命中。

LFU算法的病理行为并非没有可能。

我们不是在这里提出LFU的案例,而是试图证明如果LFU是适用的策略,那么比以前发布的方法有更好的实现方法。

LFU缓存支持的字典操作

当我们谈到缓存逐出算法时,我们主要需要对缓存数据进行3种不同的操作。

  1. 在缓存中设置(或插入)项目
  2. 检索(或查找)缓存中的项目; 同时增加其使用计数(对于LFU)
  3. 从缓存中逐出(或删除)最少使用(或作为逐出算法的策略)

LFU算法的当前最著名的复杂性

在撰写本文时,针对LFU缓存逐出策略的上述每个操作的最著名的运行时如下:

插入:O(log n)

查找:O(log n)

删除:O(log n)

这些复杂度值直接从二项式堆实现和标准无冲突哈希表中获得。

使用最小堆数据结构和哈希图可以轻松有效地实施LFU缓存策略。

最小堆是基于(项目的)使用计数创建的,并且通过元素的键为哈希表建立索引。

无冲突哈希表上的所有操作的顺序均为O(1),因此LFU缓存的运行时间由最小堆上的操作的运行时间控制。

将元素插入高速缓存时,它将以1的使用计数进入,由于插入最小堆的开销为O(log n),因此将其插入LFU高速缓存需要O(log n)时间。

在查找元素时,可以通过哈希函数找到该元素,该哈希函数将键哈希到实际元素。同时,使用计数(最大堆中的计数)加1,这导致最小堆的重组,并且元素从根移开。

由于元素在任何阶段都可以向下移动至log(n)电平,因此此操作也需要时间O(log n)。

当选择一个元素将其逐出并最终从堆中删除时,它可能导致堆数据结构的重大重组。

使用计数最少的元素位于最小堆的根。

删除最小堆的根包括将根节点替换为堆中的最后一个叶节点,并将该节点起泡到正确的位置。

此操作的运行时复杂度也为O(log n)。

提出的LFU算法

对于可以在LFU缓存上执行的每个字典操作(插入,查找和删除),提出的LFU算法的运行时复杂度为O(1)。

这是通过维护2个链接列表来实现的。一个用于访问频率,另一个用于具有相同访问频率的所有元素。

哈希表用于按键访问元素(为清楚起见,下图中未显示)。

双链表用于将代表一组具有相同访问频率的节点的节点链接在一起(在下图中显示为矩形块)。

我们将此双重链接列表称为频率列表。具有相同访问频率的这组节点实际上是此类节点的双向链接列表(在下图中显示为圆形节点)。

我们将此双向链接列表(在特定频率本地)称为节点列表。

节点列表中的每个节点都有一个指向其父节点的指针。

频率列表(为清楚起见,未在图中显示)。因此,节点x和您将有一个指向节点1的指针,节点z和a将有一个指向节点2的指针,依此类推...

输入图片说明

下面的伪代码显示了如何初始化LFU缓存。

用于按键定位元素的哈希表由按键变量表示。

为了简化实现,我们使用SET代替链表来存储具有相同访问频率的元素。

变量项是标准的SET数据结构,其中包含具有相同访问频率的此类元素的键。

它的插入,查找和删除运行时复杂度为O(1)。

输入图片说明

伪代码

后面的都是一些伪代码了,我们条国内。

理解其最核心的思想就行了,下面我们上真代码。

感受

这个 O(1) 的算法最核心的地方实际上不多,放在 leetcode 应该算是一个中等难度的题目。

不过很奇怪,这篇论文是在 2010 年提出的,估计以前都以为 O(logn) 是极限了?

java 代码实现

基本属性

public class CacheEvictLfu<K,V> extends AbstractCacheEvict<K,V> {

    private static final Log log = LogFactory.getLog(CacheEvictLfu.class);

    /**
     * key 映射信息
     * @since 0.0.14
     */
    private final Map<K, FreqNode<K,V>> keyMap;

    /**
     * 频率 map
     * @since 0.0.14
     */
    private final Map<Integer, LinkedHashSet<FreqNode<K,V>>> freqMap;

    /**
     *
     * 最小频率
     * @since 0.0.14
     */
    private int minFreq;

    public CacheEvictLfu() {
        this.keyMap = new HashMap<>();
        this.freqMap = new HashMap<>();
        this.minFreq = 1;
    }

}

节点定义

  • FreqNode.java
public class FreqNode<K,V> {

    /**
     * 键
     * @since 0.0.14
     */
    private K key;

    /**
     * 值
     * @since 0.0.14
     */
    private V value = null;

    /**
     * 频率
     * @since 0.0.14
     */
    private int frequency = 1;

    public FreqNode(K key) {
        this.key = key;
    }

    //fluent getter & setter
    // toString() equals() hashCode()
}

移除元素

/**
 * 移除元素
 *
 * 1. 从 freqMap 中移除
 * 2. 从 keyMap 中移除
 * 3. 更新 minFreq 信息
 *
 * @param key 元素
 * @since 0.0.14
 */
@Override
public void removeKey(final K key) {
    FreqNode<K,V> freqNode = this.keyMap.remove(key);
    //1. 根据 key 获取频率
    int freq = freqNode.frequency();
    LinkedHashSet<FreqNode<K,V>> set = this.freqMap.get(freq);
    //2. 移除频率中对应的节点
    set.remove(freqNode);
    log.debug("freq={} 移除元素节点:{}", freq, freqNode);
    //3. 更新 minFreq
    if(CollectionUtil.isEmpty(set) && minFreq == freq) {
        minFreq--;
        log.debug("minFreq 降低为:{}", minFreq);
    }
}

更新元素

/**
 * 更新元素,更新 minFreq 信息
 * @param key 元素
 * @since 0.0.14
 */
@Override
public void updateKey(final K key) {
    FreqNode<K,V> freqNode = keyMap.get(key);
    //1. 已经存在
    if(ObjectUtil.isNotNull(freqNode)) {
        //1.1 移除原始的节点信息
        int frequency = freqNode.frequency();
        LinkedHashSet<FreqNode<K,V>> oldSet = freqMap.get(frequency);
        oldSet.remove(freqNode);
        //1.2 更新最小数据频率
        if (minFreq == frequency && oldSet.isEmpty()) {
            minFreq++;
            log.debug("minFreq 增加为:{}", minFreq);
        }
        //1.3 更新频率信息
        frequency++;
        freqNode.frequency(frequency);
        //1.4 放入新的集合
        this.addToFreqMap(frequency, freqNode);
    } else {
        //2. 不存在
        //2.1 构建新的元素
        FreqNode<K,V> newNode = new FreqNode<>(key);
        //2.2 固定放入到频率为1的列表中
        this.addToFreqMap(1, newNode);
        //2.3 更新 minFreq 信息
        this.minFreq = 1;
        //2.4 添加到 keyMap
        this.keyMap.put(key, newNode);
    }
}

/**
 * 加入到频率 MAP
 * @param frequency 频率
 * @param freqNode 节点
 */
private void addToFreqMap(final int frequency, FreqNode<K,V> freqNode) {
    LinkedHashSet<FreqNode<K,V>> set = freqMap.get(frequency);
    if (set == null) {
        set = new LinkedHashSet<>();
    }
    set.add(freqNode);
    freqMap.put(frequency, set);
    log.debug("freq={} 添加元素节点:{}", frequency, freqNode);
}

数据淘汰

@Override
protected ICacheEntry<K, V> doEvict(ICacheEvictContext<K, V> context) {
    ICacheEntry<K, V> result = null;
    final ICache<K,V> cache = context.cache();
    // 超过限制,移除频次最低的元素
    if(cache.size() >= context.size()) {
        FreqNode<K,V> evictNode = this.getMinFreqNode();
        K evictKey = evictNode.key();
        V evictValue = cache.remove(evictKey);
        log.debug("淘汰最小频率信息, key: {}, value: {}, freq: {}",
                evictKey, evictValue, evictNode.frequency());
        result = new CacheEntry<>(evictKey, evictValue);
    }
    return result;
}

/**
 * 获取最小频率的节点
 *
 * @return 结果
 * @since 0.0.14
 */
private FreqNode<K, V> getMinFreqNode() {
    LinkedHashSet<FreqNode<K,V>> set = freqMap.get(minFreq);
    if(CollectionUtil.isNotEmpty(set)) {
        return set.iterator().next();
    }
    throw new CacheRuntimeException("未发现最小频率的 Key");
}

测试

代码

ICache<String, String> cache = CacheBs.<String,String>newInstance()
        .size(3)
        .evict(CacheEvicts.<String, String>lfu())
        .build();
cache.put("A", "hello");
cache.put("B", "world");
cache.put("C", "FIFO");
// 访问一次A
cache.get("A");
cache.put("D", "LRU");

Assert.assertEquals(3, cache.size());
System.out.println(cache.keySet());

日志

[DEBUG] [2020-10-03 21:23:43.722] [main] [c.g.h.c.c.s.e.CacheEvictLfu.addToFreqMap] - freq=1 添加元素节点:FreqNode{key=A, value=null, frequency=1}
[DEBUG] [2020-10-03 21:23:43.723] [main] [c.g.h.c.c.s.e.CacheEvictLfu.addToFreqMap] - freq=1 添加元素节点:FreqNode{key=B, value=null, frequency=1}
[DEBUG] [2020-10-03 21:23:43.725] [main] [c.g.h.c.c.s.e.CacheEvictLfu.addToFreqMap] - freq=1 添加元素节点:FreqNode{key=C, value=null, frequency=1}
[DEBUG] [2020-10-03 21:23:43.727] [main] [c.g.h.c.c.s.e.CacheEvictLfu.addToFreqMap] - freq=2 添加元素节点:FreqNode{key=A, value=null, frequency=2}
[DEBUG] [2020-10-03 21:23:43.728] [main] [c.g.h.c.c.s.e.CacheEvictLfu.doEvict] - 淘汰最小频率信息, key: B, value: world, freq: 1
[DEBUG] [2020-10-03 21:23:43.731] [main] [c.g.h.c.c.s.l.r.CacheRemoveListener.listen] - Remove key: B, value: world, type: evict
[DEBUG] [2020-10-03 21:23:43.732] [main] [c.g.h.c.c.s.e.CacheEvictLfu.addToFreqMap] - freq=1 添加元素节点:FreqNode{key=D, value=null, frequency=1}
[D, A, C]

LFU vs LRU

区别

LFU是基于访问频次的模式,而LRU是基于访问时间的模式。

优势

在数据访问符合正态分布时,相比于LRU算法,LFU算法的缓存命中率会高一些。

劣势

  • LFU的复杂度要比LRU更高一些。
  • 需要维护数据的访问频次,每次访问都需要更新。
  • 早期的数据相比于后期的数据更容易被缓存下来,导致后期的数据很难被缓存。
  • 新加入缓存的数据很容易被剔除,像是缓存的末端发生“抖动”。

小结

不过实际实践中,LFU 的应用场景实际并没有那么广泛。

因为真实的数据都是有倾斜的,热点数据才是常态,所以 LRU 的性能一般情况下优于 LFU。

开源地址:https://github.com/houbb/cache

觉得本文对你有帮助的话,欢迎点赞评论收藏关注一波,你的鼓励,是我最大的动力~

目前我们实现了性能比较优异的 LRU 和 LFU 算法,但是操作系统实际采用的却不是这两种算法,我们下一节将一起学习下操作系统青睐的 clock 淘汰算法。

不知道你有哪些收获呢?或者有其他更多的想法,欢迎留言区和我一起讨论,期待与你的思考相遇。

深入学习

查看原文

赞 0 收藏 0 评论 0

老马啸西风 发布了文章 · 10月5日

java 从零开始手写 redis(九)LRU 缓存淘汰算法如何避免缓存污染

前言

java从零手写实现redis(一)如何实现固定大小的缓存?

java从零手写实现redis(三)redis expire 过期原理

java从零手写实现redis(三)内存数据如何重启不丢失?

java从零手写实现redis(四)添加监听器

java从零手写实现redis(五)过期策略的另一种实现思路

java从零手写实现redis(六)AOF 持久化原理详解及实现

java从零手写实现redis(七)LRU 缓存淘汰策略详解

从零开始手写 redis(八)朴素 LRU 淘汰算法性能优化

前两节我们分别实现了 LRU 算法,并且进行了性能优化。

本节作为 LRU 算法的最后一节,主要解决一下缓存污染的问题。

LRU 基础知识

是什么

LRU算法全称是最近最少使用算法(Least Recently Use),广泛的应用于缓存机制中。

当缓存使用的空间达到上限后,就需要从已有的数据中淘汰一部分以维持缓存的可用性,而淘汰数据的选择就是通过LRU算法完成的。

LRU算法的基本思想是基于局部性原理的时间局部性:

如果一个信息项正在被访问,那么在近期它很可能还会被再次访问。

拓展阅读

Apache Commons LRUMAP 源码详解

Redis 当做 LRU MAP 使用

java 从零开始手写 redis(七)redis LRU 驱除策略详解及实现

朴素 LRU 算法的不足

当存在热点数据时,LRU的效率很好,但偶发性的、周期性的批量操作会导致LRU命中率急剧下降,缓存污染情况比较严重。

扩展算法

1. LRU-K

LRU-K中的K代表最近使用的次数,因此LRU可以认为是LRU-1。

LRU-K的主要目的是为了解决LRU算法“缓存污染”的问题,其核心思想是将“最近使用过1次”的判断标准扩展为“最近使用过K次”。

相比LRU,LRU-K需要多维护一个队列,用于记录所有缓存数据被访问的历史。只有当数据的访问次数达到K次的时候,才将数据放入缓存。

当需要淘汰数据时,LRU-K会淘汰第K次访问时间距当前时间最大的数据。

数据第一次被访问时,加入到历史访问列表,如果数据在访问历史列表中没有达到K次访问,则按照一定的规则(FIFO,LRU)淘汰;

当访问历史队列中的数据访问次数达到K次后,将数据索引从历史队列中删除,将数据移到缓存队列中,并缓存数据,缓存队列重新按照时间排序;

缓存数据队列中被再次访问后,重新排序,需要淘汰数据时,淘汰缓存队列中排在末尾的数据,即“淘汰倒数K次访问离现在最久的数据”。

LRU-K具有LRU的优点,同时还能避免LRU的缺点,实际应用中LRU-2是综合最优的选择。

由于LRU-K还需要记录那些被访问过、但还没有放入缓存的对象,因此内存消耗会比LRU要多。

2. two queue

Two queues(以下使用2Q代替)算法类似于LRU-2,不同点在于2Q将LRU-2算法中的访问历史队列(注意这不是缓存数据的)改为一个FIFO缓存队列,即:2Q算法有两个缓存队列,一个是FIFO队列,一个是LRU队列。

当数据第一次访问时,2Q算法将数据缓存在FIFO队列里面,当数据第二次被访问时,则将数据从FIFO队列移到LRU队列里面,两个队列各自按照自己的方法淘汰数据。

新访问的数据插入到FIFO队列中,如果数据在FIFO队列中一直没有被再次访问,则最终按照FIFO规则淘汰;

如果数据在FIFO队列中再次被访问到,则将数据移到LRU队列头部,如果数据在LRU队列中再次被访问,则将数据移动LRU队列头部,LRU队列淘汰末尾的数据。

3. Multi Queue(MQ)

MQ算法根据访问频率将数据划分为多个队列,不同的队列具有不同的访问优先级,其核心思想是:优先缓存访问次数多的数据

详细的算法结构图如下,Q0,Q1....Qk代表不同的优先级队列,Q-history代表从缓存中淘汰数据,但记录了数据的索引和引用次数的队列:

新插入的数据放入Q0,每个队列按照LRU进行管理,当数据的访问次数达到一定次数,需要提升优先级时,将数据从当前队列中删除,加入到高一级队列的头部;为了防止高优先级数据永远不会被淘汰,当数据在指定的时间里没有被访问时,需要降低优先级,将数据从当前队列删除,加入到低一级的队列头部;需要淘汰数据时,从最低一级队列开始按照LRU淘汰,每个队列淘汰数据时,将数据从缓存中删除,将数据索引加入Q-history头部。

如果数据在Q-history中被重新访问,则重新计算其优先级,移到目标队列头部。

Q-history按照LRU淘汰数据的索引。

MQ需要维护多个队列,且需要维护每个数据的访问时间,复杂度比LRU高。

LRU算法对比

对比点对比
命中率LRU-2 > MQ(2) > 2Q > LRU
复杂度LRU-2 > MQ(2) > 2Q > LRU
代价LRU-2 > MQ(2) > 2Q > LRU

个人理解

实际上上面的几个算法,思想上大同小异。

核心目的:解决批量操作导致热点数据失效,缓存被污染的问题。

实现方式:增加一个队列,用来保存只访问一次的数据,然后根据次数不同,放入到 LRU 中。

只访问一次的队列,可以是 FIFO 队列,可以是 LRU,我们来实现一下 2Q 和 LRU-2 两种实现。

2Q

实现思路

实际上就是我们以前的 FIFO + LRU 二者的结合。

代码实现

基本属性

public class CacheEvictLru2Q<K,V> extends AbstractCacheEvict<K,V> {

    private static final Log log = LogFactory.getLog(CacheEvictLru2Q.class);

    /**
     * 队列大小限制
     *
     * 降低 O(n) 的消耗,避免耗时过长。
     * @since 0.0.13
     */
    private static final int LIMIT_QUEUE_SIZE = 1024;

    /**
     * 第一次访问的队列
     * @since 0.0.13
     */
    private Queue<K> firstQueue;

    /**
     * 头结点
     * @since 0.0.13
     */
    private DoubleListNode<K,V> head;

    /**
     * 尾巴结点
     * @since 0.0.13
     */
    private DoubleListNode<K,V> tail;

    /**
     * map 信息
     *
     * key: 元素信息
     * value: 元素在 list 中对应的节点信息
     * @since 0.0.13
     */
    private Map<K, DoubleListNode<K,V>> lruIndexMap;

    public CacheEvictLru2Q() {
        this.firstQueue = new LinkedList<>();
        this.lruIndexMap = new HashMap<>();
        this.head = new DoubleListNode<>();
        this.tail = new DoubleListNode<>();

        this.head.next(this.tail);
        this.tail.pre(this.head);
    }

}

数据淘汰

数据淘汰的逻辑:

当缓存大小,已经达到最大限制时执行:

(1)优先淘汰 firstQueue 中的数据

(2)如果 firstQueue 中数据为空,则淘汰 lruMap 中的数据信息。

这里有一个假设:我们认为被多次访问的数据,重要性高于被只访问了一次的数据。

@Override
protected ICacheEntry<K, V> doEvict(ICacheEvictContext<K, V> context) {
    ICacheEntry<K, V> result = null;
    final ICache<K,V> cache = context.cache();
    // 超过限制,移除队尾的元素
    if(cache.size() >= context.size()) {
        K evictKey = null;
        //1. firstQueue 不为空,优先移除队列中元素
        if(!firstQueue.isEmpty()) {
            evictKey = firstQueue.remove();
        } else {
            // 获取尾巴节点的前一个元素
            DoubleListNode<K,V> tailPre = this.tail.pre();
            if(tailPre == this.head) {
                log.error("当前列表为空,无法进行删除");
                throw new CacheRuntimeException("不可删除头结点!");
            }
            evictKey = tailPre.key();
        }
        // 执行移除操作
        V evictValue = cache.remove(evictKey);
        result = new CacheEntry<>(evictKey, evictValue);
    }
    return result;
}

数据删除

当数据被删除时调用:

这个逻辑和以前类似,只是多了一个 FIFO 队列的移除。

/**
 * 移除元素
 *
 * 1. 获取 map 中的元素
 * 2. 不存在直接返回,存在执行以下步骤:
 * 2.1 删除双向链表中的元素
 * 2.2 删除 map 中的元素
 *
 * @param key 元素
 * @since 0.0.13
 */
@Override
public void removeKey(final K key) {
    DoubleListNode<K,V> node = lruIndexMap.get(key);
    //1. LRU 删除逻辑
    if(ObjectUtil.isNotNull(node)) {
        // A<->B<->C
        // 删除 B,需要变成: A<->C
        DoubleListNode<K,V> pre = node.pre();
        DoubleListNode<K,V> next = node.next();
        pre.next(next);
        next.pre(pre);
        // 删除 map 中对应信息
        this.lruIndexMap.remove(node.key());
    } else {
        //2. FIFO 删除逻辑(O(n) 时间复杂度)
        firstQueue.remove(key);
    }
}

数据的更新

当数据被访问时,提升数据的优先级。

(1)如果在 lruMap 中,则首先移除,然后放入到头部

(2)如果不在 lruMap 中,但是在 FIFO 队列,则从 FIFO 队列中移除,添加到 LRU map 中。

(3)如果都不在,直接加入到 FIFO 队列中即可。

/**
 * 放入元素
 * 1. 如果 lruIndexMap 已经存在,则处理 lru 队列,先删除,再插入。
 * 2. 如果 firstQueue 中已经存在,则处理 first 队列,先删除 firstQueue,然后插入 Lru。
 * 1 和 2 是不同的场景,但是代码实际上是一样的,删除逻辑中做了二种场景的兼容。
 *
 * 3. 如果不在1、2中,说明是新元素,直接插入到 firstQueue 的开始即可。
 *
 * @param key 元素
 * @since 0.0.13
 */
@Override
public void updateKey(final K key) {
    //1.1 是否在 LRU MAP 中
    //1.2 是否在 firstQueue 中
    DoubleListNode<K,V> node = lruIndexMap.get(key);
    if(ObjectUtil.isNotNull(node)
        || firstQueue.contains(key)) {
        //1.3 删除信息
        this.removeKey(key);
        //1.4 加入到 LRU 中
        this.addToLruMapHead(key);
        return;
    }
    //2. 直接加入到 firstQueue 队尾
    //        if(firstQueue.size() >= LIMIT_QUEUE_SIZE) {
//            // 避免第一次访问的列表一直增长,移除队头的元素
//            firstQueue.remove();
//        }
    firstQueue.add(key);
}

这里我想到了一个优化点,限制 firstQueue 的一直增长,因为遍历的时间复杂度为 O(n),所以限制最大的大小为 1024。

如果超过了,则把 FIFO 中的元素先移除掉。

不过只移除 FIFO,不移除 cache,会导致二者的活跃程度不一致;

如果同时移除,但是 cache 的大小还没有满足,可能会导致超出用户的预期,这个可以作为一个优化点,暂时注释掉。

测试

代码

ICache<String, String> cache = CacheBs.<String,String>newInstance()
        .size(3)
        .evict(CacheEvicts.<String, String>lru2Q())
        .build();

cache.put("A", "hello");
cache.put("B", "world");
cache.put("C", "FIFO");

// 访问一次A
cache.get("A");
cache.put("D", "LRU");

Assert.assertEquals(3, cache.size());
System.out.println(cache.keySet());

效果

[DEBUG] [2020-10-03 13:15:50.670] [main] [c.g.h.c.c.s.l.r.CacheRemoveListener.listen] - Remove key: B, value: world, type: evict
[D, A, C]

LRU-2 实现

说明

FIFO 中的缺点还是比较明显的,需要 O(n) 的时间复杂度做遍历。

而且命中率和 LRU-2 比起来还是会差一点。

准备工作

这里 LRU map 出现了多次,我们为了方便,将 LRU map 简单的封装为一个数据结构。

我们使用双向链表+HashMap 实现一个简单版本的。

节点

node 节点和以前一致:

public class DoubleListNode<K,V> {

    /**
     * 键
     * @since 0.0.12
     */
    private K key;

    /**
     * 值
     * @since 0.0.12
     */
    private V value;

    /**
     * 前一个节点
     * @since 0.0.12
     */
    private DoubleListNode<K,V> pre;

    /**
     * 后一个节点
     * @since 0.0.12
     */
    private DoubleListNode<K,V> next;

    //fluent getter & setter
}

接口

我们根据自己的需要,暂时定义 3 个最重要的方法。

/**
 * LRU map 接口
 * @author binbin.hou
 * @since 0.0.13
 */
public interface ILruMap<K,V> {

    /**
     * 移除最老的元素
     * @return 移除的明细
     * @since 0.0.13
     */
    ICacheEntry<K, V> removeEldest();

    /**
     * 更新 key 的信息
     * @param key key
     * @since 0.0.13
     */
    void updateKey(final K key);

    /**
     * 移除对应的 key 信息
     * @param key key
     * @since 0.0.13
     */
    void removeKey(final K key);

    /**
     * 是否为空
     * @return 是否
     * @since 0.0.13
     */
    boolean isEmpty();

    /**
     * 是否包含元素
     * @param key 元素
     * @return 结果
     * @since 0.0.13
     */
    boolean contains(final K key);
}

实现

我们基于 DoubleLinkedList + HashMap 实现。

就是把上一节中的实现整理一下即可。

import com.github.houbb.cache.api.ICacheEntry;
import com.github.houbb.cache.core.exception.CacheRuntimeException;
import com.github.houbb.cache.core.model.CacheEntry;
import com.github.houbb.cache.core.model.DoubleListNode;
import com.github.houbb.cache.core.support.struct.lru.ILruMap;
import com.github.houbb.heaven.util.lang.ObjectUtil;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * 基于双向列表的实现
 * @author binbin.hou
 * @since 0.0.13
 */
public class LruMapDoubleList<K,V> implements ILruMap<K,V> {

    private static final Log log = LogFactory.getLog(LruMapDoubleList.class);

    /**
     * 头结点
     * @since 0.0.13
     */
    private DoubleListNode<K,V> head;

    /**
     * 尾巴结点
     * @since 0.0.13
     */
    private DoubleListNode<K,V> tail;

    /**
     * map 信息
     *
     * key: 元素信息
     * value: 元素在 list 中对应的节点信息
     * @since 0.0.13
     */
    private Map<K, DoubleListNode<K,V>> indexMap;

    public LruMapDoubleList() {
        this.indexMap = new HashMap<>();
        this.head = new DoubleListNode<>();
        this.tail = new DoubleListNode<>();

        this.head.next(this.tail);
        this.tail.pre(this.head);
    }

    @Override
    public ICacheEntry<K, V> removeEldest() {
        // 获取尾巴节点的前一个元素
        DoubleListNode<K,V> tailPre = this.tail.pre();
        if(tailPre == this.head) {
            log.error("当前列表为空,无法进行删除");
            throw new CacheRuntimeException("不可删除头结点!");
        }

        K evictKey = tailPre.key();
        V evictValue = tailPre.value();

        return CacheEntry.of(evictKey, evictValue);
    }

    /**
     * 放入元素
     *
     * (1)删除已经存在的
     * (2)新元素放到元素头部
     *
     * @param key 元素
     * @since 0.0.12
     */
    @Override
    public void updateKey(final K key) {
        //1. 执行删除
        this.removeKey(key);

        //2. 新元素插入到头部
        //head<->next
        //变成:head<->new<->next
        DoubleListNode<K,V> newNode = new DoubleListNode<>();
        newNode.key(key);

        DoubleListNode<K,V> next = this.head.next();
        this.head.next(newNode);
        newNode.pre(this.head);
        next.pre(newNode);
        newNode.next(next);

        //2.2 插入到 map 中
        indexMap.put(key, newNode);
    }

    /**
     * 移除元素
     *
     * 1. 获取 map 中的元素
     * 2. 不存在直接返回,存在执行以下步骤:
     * 2.1 删除双向链表中的元素
     * 2.2 删除 map 中的元素
     *
     * @param key 元素
     * @since 0.0.13
     */
    @Override
    public void removeKey(final K key) {
        DoubleListNode<K,V> node = indexMap.get(key);

        if(ObjectUtil.isNull(node)) {
            return;
        }

        // 删除 list node
        // A<->B<->C
        // 删除 B,需要变成: A<->C
        DoubleListNode<K,V> pre = node.pre();
        DoubleListNode<K,V> next = node.next();

        pre.next(next);
        next.pre(pre);

        // 删除 map 中对应信息
        this.indexMap.remove(key);
    }

    @Override
    public boolean isEmpty() {
        return indexMap.isEmpty();
    }

    @Override
    public boolean contains(K key) {
        return indexMap.containsKey(key);
    }
}

实现思路

LRU 的实现保持不变。我们直接将 FIFO 替换为 LRU map 即可。

为了便于理解,我们将 FIFO 对应为 firstLruMap,用来存放用户只访问了一次的元素。

将原来的 LRU 中存入访问了 2 次及其以上的元素。

其他逻辑和 2Q 保持一致。

实现

基本属性

定义两个 LRU,用来分别存储访问的信息

public class CacheEvictLru2<K,V> extends AbstractCacheEvict<K,V> {

    private static final Log log = LogFactory.getLog(CacheEvictLru2.class);

    /**
     * 第一次访问的 lru
     * @since 0.0.13
     */
    private final ILruMap<K,V> firstLruMap;

    /**
     * 2次及其以上的 lru
     * @since 0.0.13
     */
    private final ILruMap<K,V> moreLruMap;

    public CacheEvictLru2() {
        this.firstLruMap = new LruMapDoubleList<>();
        this.moreLruMap = new LruMapDoubleList<>();
    }

}

淘汰实现

和 lru 2Q 模式类似,这里我们优先淘汰 firstLruMap 中的数据信息。

@Override
protected ICacheEntry<K, V> doEvict(ICacheEvictContext<K, V> context) {
    ICacheEntry<K, V> result = null;
    final ICache<K,V> cache = context.cache();
    // 超过限制,移除队尾的元素
    if(cache.size() >= context.size()) {
        ICacheEntry<K,V>  evictEntry = null;
        //1. firstLruMap 不为空,优先移除队列中元素
        if(!firstLruMap.isEmpty()) {
            evictEntry = firstLruMap.removeEldest();
            log.debug("从 firstLruMap 中淘汰数据:{}", evictEntry);
        } else {
            //2. 否则从 moreLruMap 中淘汰数据
            evictEntry = moreLruMap.removeEldest();
            log.debug("从 moreLruMap 中淘汰数据:{}", evictEntry);
        }
        // 执行缓存移除操作
        final K evictKey = evictEntry.key();
        V evictValue = cache.remove(evictKey);
        result = new CacheEntry<>(evictKey, evictValue);
    }
    return result;
}

删除

/**
 * 移除元素
 *
 * 1. 多次 lru 中存在,删除
 * 2. 初次 lru 中存在,删除
 *
 * @param key 元素
 * @since 0.0.13
 */
@Override
public void removeKey(final K key) {
    //1. 多次LRU 删除逻辑
    if(moreLruMap.contains(key)) {
        moreLruMap.removeKey(key);
        log.debug("key: {} 从 moreLruMap 中移除", key);
    } else {
        firstLruMap.removeKey(key);
        log.debug("key: {} 从 firstLruMap 中移除", key);
    }
}

更新

/**
 * 更新信息
 * 1. 如果 moreLruMap 已经存在,则处理 more 队列,先删除,再插入。
 * 2. 如果 firstLruMap 中已经存在,则处理 first 队列,先删除 firstLruMap,然后插入 Lru。
 * 1 和 2 是不同的场景,但是代码实际上是一样的,删除逻辑中做了二种场景的兼容。
 *
 * 3. 如果不在1、2中,说明是新元素,直接插入到 firstLruMap 的开始即可。
 *
 * @param key 元素
 * @since 0.0.13
 */
@Override
public void updateKey(final K key) {
    //1. 元素已经在多次访问,或者第一次访问的 lru 中
    if(moreLruMap.contains(key)
        || firstLruMap.contains(key)) {
        //1.1 删除信息
        this.removeKey(key);
        //1.2 加入到多次 LRU 中
        moreLruMap.updateKey(key);
        log.debug("key: {} 多次访问,加入到 moreLruMap 中", key);
    } else {
        // 2. 加入到第一次访问 LRU 中
        firstLruMap.updateKey(key);
        log.debug("key: {} 为第一次访问,加入到 firstLruMap 中", key);
    }
}

实际上使用 LRU-2 的代码逻辑反而变得清晰了一些,主要是因为我们把 lruMap 作为独立的数据结构抽离了出去。

测试

代码

ICache<String, String> cache = CacheBs.<String,String>newInstance()
        .size(3)
        .evict(CacheEvicts.<String, String>lru2Q())
        .build();
cache.put("A", "hello");
cache.put("B", "world");
cache.put("C", "FIFO");
// 访问一次A
cache.get("A");
cache.put("D", "LRU");
Assert.assertEquals(3, cache.size());
System.out.println(cache.keySet());

日志

为了便于定位分析,源代码实现的时候,加了一点日志。

[DEBUG] [2020-10-03 14:39:04.966] [main] [c.g.h.c.c.s.e.CacheEvictLru2.updateKey] - key: A 为第一次访问,加入到 firstLruMap 中
[DEBUG] [2020-10-03 14:39:04.967] [main] [c.g.h.c.c.s.e.CacheEvictLru2.updateKey] - key: B 为第一次访问,加入到 firstLruMap 中
[DEBUG] [2020-10-03 14:39:04.968] [main] [c.g.h.c.c.s.e.CacheEvictLru2.updateKey] - key: C 为第一次访问,加入到 firstLruMap 中
[DEBUG] [2020-10-03 14:39:04.970] [main] [c.g.h.c.c.s.e.CacheEvictLru2.removeKey] - key: A 从 firstLruMap 中移除
[DEBUG] [2020-10-03 14:39:04.970] [main] [c.g.h.c.c.s.e.CacheEvictLru2.updateKey] - key: A 多次访问,加入到 moreLruMap 中
[DEBUG] [2020-10-03 14:39:04.972] [main] [c.g.h.c.c.s.e.CacheEvictLru2.doEvict] - 从 firstLruMap 中淘汰数据:EvictEntry{key=B, value=null}
[DEBUG] [2020-10-03 14:39:04.974] [main] [c.g.h.c.c.s.l.r.CacheRemoveListener.listen] - Remove key: B, value: world, type: evict
[DEBUG] [2020-10-03 14:39:04.974] [main] [c.g.h.c.c.s.e.CacheEvictLru2.updateKey] - key: D 为第一次访问,加入到 firstLruMap 中
[D, A, C]

小结

对于 LRU 算法的改进我们主要做了两点:

(1)性能的改进,从 O(N) 优化到 O(1)

(2)批量操作的改进,避免缓存污染

其实除了 LRU,我们还有其他的淘汰策略。

我们需要考虑下面的问题:

A 数据被访问了 10 次,B 数据被访问了 2 次。那么二者谁是热点数据呢?

如果你认为肯定 A 是热点数据,这里实际上是另一种淘汰算法,基于 LFU 的淘汰算法,认为访问次数越多,就越是热点数据

我们下一节共同学习下 LFU 淘汰算法的实现。

开源地址:https://github.com/houbb/cache

觉得本文对你有帮助的话,欢迎点赞评论收藏关注一波,你的鼓励,是我最大的动力~

目前我们通过两次优化,解决了性能问题,和批量导致的缓存污染问题。

不知道你有哪些收获呢?或者有其他更多的想法,欢迎留言区和我一起讨论,期待与你的思考相遇。

深入学习

查看原文

赞 0 收藏 0 评论 0

老马啸西风 发布了文章 · 10月4日

从零开始手写 redis(八)朴素 LRU 淘汰算法性能优化

前言

java从零手写实现redis(一)如何实现固定大小的缓存?

java从零手写实现redis(三)redis expire 过期原理

java从零手写实现redis(三)内存数据如何重启不丢失?

java从零手写实现redis(四)添加监听器

java从零手写实现redis(五)过期策略的另一种实现思路

java从零手写实现redis(六)AOF 持久化原理详解及实现

我们前面简单实现了 redis 的几个特性,java从零手写实现redis(一)如何实现固定大小的缓存? 中实现了先进先出的驱除策略。

但是实际工作实践中,一般推荐使用 LRU/LFU 的驱除策略。

LRU 基础知识

是什么

LRU算法全称是最近最少使用算法(Least Recently Use),广泛的应用于缓存机制中。

当缓存使用的空间达到上限后,就需要从已有的数据中淘汰一部分以维持缓存的可用性,而淘汰数据的选择就是通过LRU算法完成的。

LRU算法的基本思想是基于局部性原理的时间局部性:

如果一个信息项正在被访问,那么在近期它很可能还会被再次访问。

拓展阅读

Apache Commons LRUMAP 源码详解

Redis 当做 LRU MAP 使用

java 从零开始手写 redis(七)redis LRU 驱除策略详解及实现

简单的实现思路

基于数组

方案:为每一个数据附加一个额外的属性——时间戳,当每一次访问数据时,更新该数据的时间戳至当前时间。

当数据空间已满后,则扫描整个数组,淘汰时间戳最小的数据。

不足:维护时间戳需要耗费额外的空间,淘汰数据时需要扫描整个数组。

这个时间复杂度太差,空间复杂度也不好。

基于长度有限的双向链表

方案:访问一个数据时,当数据不在链表中,则将数据插入至链表头部,如果在链表中,则将该数据移至链表头部。当数据空间已满后,则淘汰链表最末尾的数据。

不足:插入数据或取数据时,需要扫描整个链表。

这个就是我们上一节实现的方式,缺点还是很明显,每次确认元素是否存在,都要消耗 O(n) 的时间复杂度去查询。

基于双向链表和哈希表

方案:为了改进上面需要扫描链表的缺陷,配合哈希表,将数据和链表中的节点形成映射,将插入操作和读取操作的时间复杂度从O(N)降至O(1)

缺点:这个使我们上一节提到的优化思路,不过还是有缺点的,那就是空间复杂度翻倍。

数据结构的选择

(1)基于数组的实现

这里不建议选择 array 或者 ArrayList,因为读取的时间复杂度为 O(1),但是更新相对是比较慢的,虽然 jdk 使用的是 System.arrayCopy。

(2)基于链表的实现

如果我们选择链表,HashMap 中还是不能简单的存储 key, 和对应的下标。

因为链表的遍历,实际上还是 O(n) 的,双向链表理论上可以优化一半,但是这并不是我们想要的 O(1) 效果。

(3)基于双向列表

双向链表我们保持不变。

Map 中 key 对应的值我们放双向链表的节点信息。

那实现方式就变成了实现一个双向链表。

代码实现

  • 节点定义
/**
 * 双向链表节点
 * @author binbin.hou
 * @since 0.0.12
 * @param <K> key
 * @param <V> value
 */
public class DoubleListNode<K,V> {

    /**
     * 键
     * @since 0.0.12
     */
    private K key;

    /**
     * 值
     * @since 0.0.12
     */
    private V value;

    /**
     * 前一个节点
     * @since 0.0.12
     */
    private DoubleListNode<K,V> pre;

    /**
     * 后一个节点
     * @since 0.0.12
     */
    private DoubleListNode<K,V> next;

    //fluent get & set
}
  • 核心代码实现

我们保持和原来的接口不变,实现如下:

public class CacheEvictLruDoubleListMap<K,V> extends AbstractCacheEvict<K,V> {

    private static final Log log = LogFactory.getLog(CacheEvictLruDoubleListMap.class);


    /**
     * 头结点
     * @since 0.0.12
     */
    private DoubleListNode<K,V> head;

    /**
     * 尾巴结点
     * @since 0.0.12
     */
    private DoubleListNode<K,V> tail;

    /**
     * map 信息
     *
     * key: 元素信息
     * value: 元素在 list 中对应的节点信息
     * @since 0.0.12
     */
    private Map<K, DoubleListNode<K,V>> indexMap;

    public CacheEvictLruDoubleListMap() {
        this.indexMap = new HashMap<>();
        this.head = new DoubleListNode<>();
        this.tail = new DoubleListNode<>();

        this.head.next(this.tail);
        this.tail.pre(this.head);
    }

    @Override
    protected ICacheEntry<K, V> doEvict(ICacheEvictContext<K, V> context) {
        ICacheEntry<K, V> result = null;
        final ICache<K,V> cache = context.cache();
        // 超过限制,移除队尾的元素
        if(cache.size() >= context.size()) {
            // 获取尾巴节点的前一个元素
            DoubleListNode<K,V> tailPre = this.tail.pre();
            if(tailPre == this.head) {
                log.error("当前列表为空,无法进行删除");
                throw new CacheRuntimeException("不可删除头结点!");
            }

            K evictKey = tailPre.key();
            V evictValue = cache.remove(evictKey);
            result = new CacheEntry<>(evictKey, evictValue);
        }

        return result;
    }


    /**
     * 放入元素
     *
     * (1)删除已经存在的
     * (2)新元素放到元素头部
     *
     * @param key 元素
     * @since 0.0.12
     */
    @Override
    public void update(final K key) {
        //1. 执行删除
        this.remove(key);

        //2. 新元素插入到头部
        //head<->next
        //变成:head<->new<->next
        DoubleListNode<K,V> newNode = new DoubleListNode<>();
        newNode.key(key);

        DoubleListNode<K,V> next = this.head.next();
        this.head.next(newNode);
        newNode.pre(this.head);
        next.pre(newNode);
        newNode.next(next);

        //2.2 插入到 map 中
        indexMap.put(key, newNode);
    }

    /**
     * 移除元素
     *
     * 1. 获取 map 中的元素
     * 2. 不存在直接返回,存在执行以下步骤:
     * 2.1 删除双向链表中的元素
     * 2.2 删除 map 中的元素
     *
     * @param key 元素
     * @since 0.0.12
     */
    @Override
    public void remove(final K key) {
        DoubleListNode<K,V> node = indexMap.get(key);

        if(ObjectUtil.isNull(node)) {
            return;
        }

        // 删除 list node
        // A<->B<->C
        // 删除 B,需要变成: A<->C
        DoubleListNode<K,V> pre = node.pre();
        DoubleListNode<K,V> next = node.next();

        pre.next(next);
        next.pre(pre);

        // 删除 map 中对应信息
        this.indexMap.remove(key);
    }

}

实现起来不难,就是一个简易版本的双向列表。

只是获取节点的时候,借助了一下 map,让时间复杂度降低为 O(1)。

测试

我们验证一下自己的实现:

ICache<String, String> cache = CacheBs.<String,String>newInstance()
        .size(3)
        .evict(CacheEvicts.<String, String>lruDoubleListMap())
        .build();
cache.put("A", "hello");
cache.put("B", "world");
cache.put("C", "FIFO");

// 访问一次A
cache.get("A");
cache.put("D", "LRU");

Assert.assertEquals(3, cache.size());
System.out.println(cache.keySet());
  • 日志
[DEBUG] [2020-10-03 09:37:41.007] [main] [c.g.h.c.c.s.l.r.CacheRemoveListener.listen] - Remove key: B, value: world, type: evict
[D, A, C]

因为我们访问过一次 A,所以 B 已经变成最少被访问的元素。

基于 LinkedHashMap 实现

实际上,LinkedHashMap 本身就是对于 list 和 hashMap 的一种结合的数据结构,我们可以直接使用 jdk 中 LinkedHashMap 去实现。

直接实现

public class LRUCache extends LinkedHashMap {

    private int capacity;

    public LRUCache(int capacity) {
        // 注意这里将LinkedHashMap的accessOrder设为true
        super(16, 0.75f, true);
        this.capacity = capacity;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry eldest) {
        return super.size() >= capacity;
    }
}

默认LinkedHashMap并不会淘汰数据,所以我们重写了它的removeEldestEntry()方法,当数据数量达到预设上限后,淘汰数据,accessOrder设为true意为按照访问的顺序排序。

整个实现的代码量并不大,主要都是应用LinkedHashMap的特性。

简单改造

我们对这个方法简单改造下,让其适应我们定义的接口。

ICache<String, String> cache = CacheBs.<String,String>newInstance()
        .size(3)
        .evict(CacheEvicts.<String, String>lruLinkedHashMap())
        .build();
cache.put("A", "hello");
cache.put("B", "world");
cache.put("C", "FIFO");
// 访问一次A
cache.get("A");
cache.put("D", "LRU");

Assert.assertEquals(3, cache.size());
System.out.println(cache.keySet());

测试

  • 代码
ICache<String, String> cache = CacheBs.<String,String>newInstance()
        .size(3)
        .evict(CacheEvicts.<String, String>lruLinkedHashMap())
        .build();
cache.put("A", "hello");
cache.put("B", "world");
cache.put("C", "FIFO");
// 访问一次A
cache.get("A");
cache.put("D", "LRU");

Assert.assertEquals(3, cache.size());
System.out.println(cache.keySet());
  • 日志
[DEBUG] [2020-10-03 10:20:57.842] [main] [c.g.h.c.c.s.l.r.CacheRemoveListener.listen] - Remove key: B, value: world, type: evict
[D, A, C]

小结

上一节中提到的数组 O(n) 遍历的问题,本节已经基本解决了。

但其实这种算法依然存在一定的问题,比如当偶发性的批量操作时,会导致热点数据被非热点数据挤出缓存,下一节我们一起学习如何进一步改进 LRU 算法。

文中主要讲述了思路,实现部分因为篇幅限制,没有全部贴出来。

开源地址:https://github.com/houbb/cache

觉得本文对你有帮助的话,欢迎点赞评论收藏关注一波~

你的鼓励,是我最大的动力~

深入学习

查看原文

赞 0 收藏 0 评论 0

认证与成就

  • 获得 31 次点赞
  • 获得 2 枚徽章 获得 0 枚金徽章, 获得 0 枚银徽章, 获得 2 枚铜徽章

擅长技能
编辑

(゚∀゚ )
暂时没有

开源项目 & 著作
编辑

(゚∀゚ )
暂时没有

注册于 2018-07-06
个人主页被 1.1k 人浏览