Forezp

Forezp 查看完整档案

深圳编辑  |  填写毕业院校  |  填写所在公司/组织 blog.csdn.net/forezp 编辑
编辑
_ | |__ _ _ __ _ | '_ \| | | |/ _` | | |_) | |_| | (_| | |_.__/ \__,_|\__, | |___/ 该用户太懒什么也没留下

个人动态

Forezp 发布了文章 · 2019-08-25

RateLimiter 源码分析(Guava 和 Sentinel 实现)

作者javadoop,资深Java工程师。本文已获作者授权发布。
原文链接https://www.javadoop.com/post...

本文主要介绍关于流控的两部分内容。

第一部分介绍 Guava 中 RateLimiter 的源码,包括它的两种模式,目前网上大部分文章只分析简单的 SmoothBursty 模式,而没有分析带有预热的 SmoothWarmingUp。

第二部分介绍 Sentinel 中流控的实现,本文不要求读者了解 Sentinel,这部分内容和 Sentinel 耦合很低,所以读者不需要有阅读压力。

Sentinel 中流控设计是参考 Guava RateLimiter 的,所以阅读第二部分内容,需要有第一部分内容的背景。

Guava RateLimiter

RateLimiter 基于漏桶算法,但它参考了令牌桶算法,这里不讨论流控算法,请自行查找资料。

RateLimiter 使用介绍

RateLimiter 的接口非常简单,它有两个静态方法用来实例化,实例化以后,我们只需要关心 acquire 就行了,甚至都没有 release 操作。

// RateLimiter 接口列表:

// 实例化的两种方式:
public static RateLimiter create(double permitsPerSecond){}
public static RateLimiter create(double permitsPerSecond,long warmupPeriod,TimeUnit unit) {}

public double acquire() {}
public double acquire(int permits) {}

public boolean tryAcquire() {}
public boolean tryAcquire(int permits) {}
public boolean tryAcquire(long timeout, TimeUnit unit) {}
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {}

public final double getRate() {}
public final void setRate(double permitsPerSecond) {}

RateLimiter 的作用是用来限流的,我们知道 java 并发包中提供了 Semaphore,它也能够提供对资源使用进行控制,我们看一下下面的代码:

// Semaphore
Semaphore semaphore = new Semaphore(10);
for (int i = 0; i < 100; i++) {
    executor.submit(new Runnable() {
        @Override
        public void run() {
            semaphore.acquireUninterruptibly(1);
            try {
                doSomething();
            } finally {
                semaphore.release();
            }
        }
    });
}

Semaphore 用来控制同时访问某个资源的并发数量,如上面的代码,我们设置 100 个线程工作,但是我们能做到最多只有 10 个线程能同时到 doSomething() 方法中。它控制的是并发数量。

而 RateLimiter 是用来控制访问资源的速率(rate)的,它强调的是控制速率。比如控制每秒只能有 100 个请求通过,比如允许每秒发送 1MB 的数据。

它的构造方法指定一个 permitsPerSecond 参数,代表每秒钟产生多少个 permits,这就是我们的速率。

RateLimiter 允许预占未来的令牌,比如,每秒产生 5 个 permits,我们可以单次请求 100 个,这样,紧接着的下一个请求需要等待大概 20 秒才能获取到 permits。

SmoothRateLimiter 介绍

RateLimiter 目前只有一个子类,那就是抽象类 SmoothRateLimiter,SmoothRateLimiter 有两个实现类,也就是我们这边要介绍的两种模式,我们先简单介绍下 SmoothRateLimiter,然后后面分两个小节分别介绍它的两个实现类。

RateLimiter 作为抽象类,只有两个属性:

private final SleepingStopwatch stopwatch;

private volatile Object mutexDoNotUseDirectly;

stopwatch 非常重要,它用来“计时”,RateLimiter 把实例化的时间设置为 0 值,后续都是取相对时间,用微秒表示。

mutexDoNotUseDirectly 用来做锁,RateLimiter 依赖于 synchronized 来控制并发,所以我们之后可以看到,各个属性甚至都没有用 volatile 修饰。

然后我们来看 SmoothRateLimiter 的属性,分别代表什么意思。

// 当前还有多少 permits 没有被使用,被存下来的 permits 数量
double storedPermits;

// 最大允许缓存的 permits 数量,也就是 storedPermits 能达到的最大值
double maxPermits;

// 每隔多少时间产生一个 permit,
// 比如我们构造方法中设置每秒 5 个,也就是每隔 200ms 一个,这里单位是微秒,也就是 200,000
double stableIntervalMicros;

// 下一次可以获取 permits 的时间,这个时间是相对 RateLimiter 的构造时间的,是一个相对时间,理解为时间戳吧
private long nextFreeTicketMicros = 0L; 

其实,看到这几个属性,我们就可以大致猜一下它的内部实现了:

nextFreeTicketMicros 是一个很关键的属性。我们每次获取 permits 的时候,先拿 storedPermits 的值,如果够,storedPermits 减去相应的值就可以了,如果不够,那么还需要将 nextFreeTicketMicros 往前推,表示我预占了接下来多少时间的量了。那么下一个请求来的时候,如果还没到 nextFreeTicketMicros 这个时间点,需要 sleep 到这个点再返回,当然也要将这个值再往前推。

大家在这里可能会有疑惑,因为时间是一直往前走的,所以 storedPermits 的信息可能是不准确的,不过,只需要在关键的操作中同步一下,重新计算就好了。

SmoothBursty 分析

我们先从比较简单的 SmoothBursty 出发,来分析 RateLimiter 的源码,之后我们再分析 SmoothWarmingUp。

Bursty 是突发的意思,它说的不是下面这个意思:我们设置了 1k 每秒,而我们可以一次性获取 5k 的 permits,这个场景表达的不是突发,而是在说预先占有了接下来几秒产生的 permits。

突发说的是,RateLimiter 会缓存一定数量的 permits 在池中,这样对于突发请求,能及时得到满足。想象一下我们的某个接口,很久没有请求过来,突然同时来了好几个请求,如果我们没有缓存一些 permits 的话,很多线程就需要等待了。

SmoothBursty 默认缓存最多 1 秒钟的 permits,不可以修改。

RateLimiter 的静态构造方法:

public static RateLimiter create(double permitsPerSecond) {
    return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());
}

构造参数 permitsPerSecond 指定每秒钟可以产生多少个 permits。

static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
    RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
    rateLimiter.setRate(permitsPerSecond);
    return rateLimiter;
}

我们看到,这里实例化的是 SmoothBursty 的实例,它的构造方法很简单,而且它只有一个属性 maxBurstSeconds,这里就不贴代码了。

构造函数指定了 maxBurstSeconds 为 1.0,也就是说,最多会缓存 1 秒钟,也就是 (1.0 * permitsPerSecond) 这么多个 permits 到池中。

这个 1.0 秒,关系到 storedPermits 和 maxPermits:

0 <= storedPermits <= maxPermits = permitsPerSecond

我们继续往后看 setRate 方法:

public final void setRate(double permitsPerSecond) {
  checkArgument(
      permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive");
  synchronized (mutex()) {
    doSetRate(permitsPerSecond, stopwatch.readMicros());
  }
}

setRate 这个方法是一个 public 方法,它可以用来调整速率。我们这边继续跟的是初始化过程,但是大家提前知道这个方法是用来调整速率用的,对理解源码有很大的帮助。注意看,这里用了 synchronized 控制并发。

@Override
final void doSetRate(double permitsPerSecond, long nowMicros) {
    // 同步
    resync(nowMicros);
    // 计算属性 stableIntervalMicros
    double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
    this.stableIntervalMicros = stableIntervalMicros;
    doSetRate(permitsPerSecond, stableIntervalMicros);
}

resync 方法很简单,它用来调整 storedPermits 和 nextFreeTicketMicros。这就是我们说的,在关键的节点,需要先更新一下 storedPermits 到正确的值。

void resync(long nowMicros) {
  // 如果 nextFreeTicket 已经过掉了,想象一下很长时间都没有再次调用 limiter.acquire() 的场景
  // 需要将 nextFreeTicket 设置为当前时间,重新计算 storedPermits
  if (nowMicros > nextFreeTicketMicros) {
    double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
    storedPermits = min(maxPermits, storedPermits + newPermits);
    nextFreeTicketMicros = nowMicros;
  }
}
coolDownIntervalMicros() 这个方法大家先不用关注,可以看到,在 SmoothBursty 类中的实现是直接返回了 stableIntervalMicros 的值,也就是我们说的,每产生一个 permit 的时间长度。

当然了,细心的读者,可能会发现,此时的 stableIntervalMicros 其实没有设置,也就是说,上面发生了一次除以 0 值的操作,得到的 newPermits 其实是一个无穷大。而 maxPermits 此时还是 0 值,不过这里其实没有关系。

我们回到前面一个方法,resync 同步以后,会设置 stableIntervalMicros 为一个正确的值,然后进入下面的方法:

@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
  double oldMaxPermits = this.maxPermits;
  // 这里计算了,maxPermits 为 1 秒产生的 permits
  maxPermits = maxBurstSeconds * permitsPerSecond;
  if (oldMaxPermits == Double.POSITIVE_INFINITY) {
    // if we don't special-case this, we would get storedPermits == NaN, below
    storedPermits = maxPermits;
  } else {
    // 因为 storedPermits 的值域变化了,需要等比例缩放
    storedPermits =
        (oldMaxPermits == 0.0)
            ? 0.0 // initial state
            : storedPermits * maxPermits / oldMaxPermits;
  }
}

上面这个方法,我们要这么看,原来的 RateLimiter 是用某个 permitsPerSecond 值初始化的,现在我们要调整这个频率。对于 maxPermits 来说,是重新计算,而对于 storedPermits 来说,是做等比例的缩放。

到此,构造方法就完成了,我们得到了一个 RateLimiter 的实现类 SmoothBursty 的实例,可能上面的源码你还是会有一些疑惑,不过也没关系,继续往下看,可能你的很多疑惑就解开了。

接下来,我们来分析 acquire 方法:

@CanIgnoreReturnValue
public double acquire() {
  return acquire(1);
}

@CanIgnoreReturnValue
public double acquire(int permits) {
  // 预约,如果当前不能直接获取到 permits,需要等待
  // 返回值代表需要 sleep 多久
  long microsToWait = reserve(permits);
  // sleep
  stopwatch.sleepMicrosUninterruptibly(microsToWait);
  // 返回 sleep 的时长
  return 1.0 * microsToWait / SECONDS.toMicros(1L);
}

我们来看 reserve 方法:

final long reserve(int permits) {
  checkPermits(permits);
  synchronized (mutex()) {
    return reserveAndGetWaitLength(permits, stopwatch.readMicros());
  }
}

final long reserveAndGetWaitLength(int permits, long nowMicros) {
  // 返回 nextFreeTicketMicros
  long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
  // 计算时长
  return max(momentAvailable - nowMicros, 0);
}

继续往里看:

@Override
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
  // 这里做一次同步,更新 storedPermits 和 nextFreeTicketMicros (如果需要)
  resync(nowMicros);
  // 返回值就是 nextFreeTicketMicros,注意刚刚已经做了 resync 了,此时它是最新的正确的值
  long returnValue = nextFreeTicketMicros;
  // storedPermits 中可以使用多少个 permits
  double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
  // storedPermits 中不够的部分
  double freshPermits = requiredPermits - storedPermitsToSpend;
  // 为了这个不够的部分,需要等待多久时间
  long waitMicros =
      storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend) // 这部分固定返回 0
          + (long) (freshPermits * stableIntervalMicros);
  // 将 nextFreeTicketMicros 往前推
  this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
  // storedPermits 减去被拿走的部分
  this.storedPermits -= storedPermitsToSpend;
  return returnValue;
}

我们可以看到,获取 permits 的时候,其实是获取了两部分,一部分来自于存量 storedPermits,存量不够的话,另一部分来自于预占未来的 freshPermits。

这里提一个关键点吧,我们看到,返回值是 nextFreeTicketMicros 的旧值,因为只要到这个时间点,就说明当次 acquire 可以成功返回了,而不管 storedPermits 够不够。如果 storedPermits 不够,会将 nextFreeTicketMicros 往前推一定的时间,预占了一定的量。

到这里,acquire 方法就分析完了,大家看到这里,逆着往前看就是了。应该说,SmoothBursty 的源码还是非常简单的。

SmoothWarmingUp 分析

分析完了 SmoothBursty,我们再来分析 SmoothWarmingUp 会简单一些。我们说过,SmoothBursty 可以处理突发请求,因为它会缓存最多 1 秒的 permits,而待会我们会看到 SmoothWarmingUp 完全不同的设计。

SmoothWarmingUp 适用于资源需要预热的场景,比如我们的某个接口业务,需要使用到数据库连接,由于连接需要预热才能进入到最佳状态,如果我们的系统长时间处于低负载或零负载状态(当然,应用刚启动也是一样的),连接池中的连接慢慢释放掉了,此时我们认为连接池是冷的。

假设我们的业务在稳定状态下,正常可以提供最大 1000 QPS 的访问,但是如果连接池是冷的,我们就不能让 1000 个请求同时进来,因为这会拖垮我们的系统,我们应该有个预热升温的过程。

对应到 SmoothWarmingUp 中,如果系统处于低负载状态,storedPermits 会一直增加,当请求来的时候,我们要从 storedPermits 中取 permits,最关键的点在于,从 storedPermits 中取 permits 的操作是比较耗时的,因为没有预热。

回顾一下前面介绍的 SmoothBursty,它从 storedPermits 中获取 permits 是不需要等待时间的,而这边洽洽相反,从 storedPermits 获取需要更多的时间,这是最大的不同,先理解这一点,能帮助你更好地理解源码。

大家先有一些粗的概念,然后我们来看下面这个图:

这个图不容易看懂,X 轴代表 storedPermits 的数量,Y 轴代表获取一个 permits 需要的时间。

假设指定 permitsPerSecond 为 10,那么 stableInterval 为 100ms,而 coldInterval 是 3 倍,也就是 300ms(coldFactor,3 倍是写死的,用户不能修改)。也就是说,当达到 maxPermits 时,此时处于系统最冷的时候,获取一个 permit 需要 300ms,而如果 storedPermits 小于 thresholdPermits 的时候,只需要 100ms。

想象有一条垂直线 x=k,它与 X 轴的交点 k 代表当前 storedPermits 的数量:

  • 当系统在非常繁忙的时候,这条线停留在 x=0 处,此时 storedPermits 为 0
  • 当 limiter 没有被使用的时候,这条线慢慢往右移动,直到 x=maxPermits 处;
  • 如果 limiter 被重新使用,那么这条线又慢慢往左移动,直到 x=0 处;

当 storedPermits 处于 maxPermits 状态时,我们认为 limiter 中的 permits 是冷的,此时获取一个 permit 需要较多的时间,因为需要预热,有一个关键的分界点是 thresholdPermits。

预热时间是我们在构造的时候指定的,图中梯形的面积就是预热时间,因为预热完成后,我们能进入到一个稳定的速率中(stableInterval),下面我们来计算出 thresholdPermits 和 maxPermits 的值。

有一个关键点,从 thresholdPermits 到 0 的时间,是从 maxPermits 到 thresholdPermits 时间的一半,也就是梯形的面积是长方形面积的 2 倍,梯形的面积是 warmupPeriod。

之所以长方形的面积是 warmupPeriod/2,是因为 coldFactor 是硬编码的 3。

梯形面积为 warmupPeriod,即:

warmupPeriod = 2 * stableInterval * thresholdPermits

由此,我们得出 thresholdPermits 的值:

thresholdPermits = 0.5 * warmupPeriod / stableInterval

然后我们根据梯形面积的计算公式:

warmupPeriod = 0.5 * (stableInterval + coldInterval) * (maxPermits - thresholdPermits)

得出 maxPermits 为:

maxPermits = thresholdPermits + 2.0 * warmupPeriod / (stableInterval + coldInterval)

这样,我们就得到了 thresholdPermits 和 maxPermits 的值。

接下来,我们来看一下冷却时间间隔,它指的是 storedPermits 中每个 permit 的增长速度,也就是我们前面说的 x=k 这条垂直线往右的移动速度,为了达到从 0 到 maxPermits 花费 warmupPeriodMicros 的时间,我们将其定义为:

@Override
double coolDownIntervalMicros() {
    return warmupPeriodMicros / maxPermits;
}
贴一下代码,大家就知道了,在 resync 中用到的这个:

void resync(long nowMicros) {
  if (nowMicros > nextFreeTicketMicros) {
    // coolDownIntervalMicros 在这里使用
    double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
    storedPermits = min(maxPermits, storedPermits + newPermits);
    nextFreeTicketMicros = nowMicros;
  }
}

基于上面的分析,我们来看 SmoothWarmingUp 的其他源码。

首先,我们来看它的 doSetRate 方法,有了前面的介绍,这个方法的源码非常简单:

@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
    double oldMaxPermits = maxPermits;
    // coldFactor 是固定的 3
    double coldIntervalMicros = stableIntervalMicros * coldFactor;
    // 这个公式我们上面已经说了
    thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros;
    // 这个公式我们上面也已经说了
    maxPermits =
        thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros);
    // 计算那条斜线的斜率。数学知识,对边 / 临边
    slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits);
    if (oldMaxPermits == Double.POSITIVE_INFINITY) {
        // if we don't special-case this, we would get storedPermits == NaN, below
        storedPermits = 0.0;
    } else {
        storedPermits =
            (oldMaxPermits == 0.0)
                ? maxPermits // initial state is cold
                : storedPermits * maxPermits / oldMaxPermits;
    }
}

setRate 方法非常简单,接下来,我们要分析的是 storedPermitsToWaitTime 方法,我们回顾一下下面的代码:

这段代码是 acquire 方法的核心,waitMicros 由两部分组成,一部分是从 storedPermits 中获取花费的时间,一部分是等待 freshPermits 产生花费的时间。在 SmoothBursty 的实现中,从 storedPermits 中获取 permits 直接返回 0,不需要等待。

而在 SmoothWarmingUp 的实现中,由于需要预热,所以从 storedPermits 中取 permits 需要花费一定的时间,其实就是要计算下图中,阴影部分的面积。

@Override
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
  double availablePermitsAboveThreshold = storedPermits - thresholdPermits;
  long micros = 0;
  // 如果右边梯形部分有 permits,那么先从右边部分获取permits,计算梯形部分的阴影部分的面积
  if (availablePermitsAboveThreshold > 0.0) {
    // 从右边部分获取的 permits 数量
    double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);
    // 梯形面积公式:(上底+下底)*高/2
    double length =
        permitsToTime(availablePermitsAboveThreshold)
            + permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake);
    micros = (long) (permitsAboveThresholdToTake * length / 2.0);
    permitsToTake -= permitsAboveThresholdToTake;
  }
  // 加上 长方形部分的阴影面积
  micros += (long) (stableIntervalMicros * permitsToTake);
  return micros;
}

// 对于给定的 x 值,计算 y 值
private double permitsToTime(double permits) {
  return stableIntervalMicros + permits * slope;
}

到这里,SmoothWarmingUp 也已经说完了。

如果大家对于 Guava RateLimiter 还有什么疑惑,欢迎在留言区留言,对于 Sentinel 中的流控不感兴趣的读者,看到这里就可以结束了。

Sentinel 中的流控

Sentinel 是阿里开源的流控、熔断工具,这里不做过多的介绍,感兴趣的读者请自行了解。

在 Sentinel 的流控中,我们可以配置流控规则,主要是控制 QPS 和线程数,这里我们不讨论控制线程数,控制线程数的代码不在我们这里的讨论范围内,下面的介绍都是指控制 QPS。

RateLimiterController

RateLimiterController 非常简单,它通过使用 latestPassedTime 属性来记录最后一次通过的时间,然后根据规则中 QPS 的限制,计算当前请求是否可以通过。

举个非常简单的例子:设置 QPS 为 10,那么每 100 毫秒允许通过一个,通过计算当前时间是否已经过了上一个请求的通过时间 latestPassedTime 之后的 100 毫秒,来判断是否可以通过。假设才过了 50ms,那么需要当前线程再 sleep 50ms,然后才可以通过。如果同时有另一个请求呢?那需要 sleep 150ms 才行。

public class RateLimiterController implements TrafficShapingController {

    // 排队最大时长,默认 500ms
    private final int maxQueueingTimeMs;
    // QPS 设置的值
    private final double count;
        // 上一次请求通过的时间
    private final AtomicLong latestPassedTime = new AtomicLong(-1);

    public RateLimiterController(int timeOut, double count) {
        this.maxQueueingTimeMs = timeOut;
        this.count = count;
    }

    @Override
    public boolean canPass(Node node, int acquireCount) {
        return canPass(node, acquireCount, false);
    }

    // 通常 acquireCount 为 1,这里不用关心参数 prioritized
    @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        // Pass when acquire count is less or equal than 0.
        if (acquireCount <= 0) {
            return true;
        }
        // 
        if (count <= 0) {
            return false;
        }

        long currentTime = TimeUtil.currentTimeMillis();
        // 计算每 2 个请求之间的间隔,比如 QPS 限制为 10,那么间隔就是 100ms
        long costTime = Math.round(1.0 * (acquireCount) / count * 1000);

        // Expected pass time of this request.
        long expectedTime = costTime + latestPassedTime.get();

        // 可以通过,设置 latestPassedTime 然后就返回 true 了
        if (expectedTime <= currentTime) {
            // Contention may exist here, but it's okay.
            latestPassedTime.set(currentTime);
            return true;
        } else {
            // 不可以通过,需要等待
            long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
            // 等待时长大于最大值,返回 false
            if (waitTime > maxQueueingTimeMs) {
                return false;
            } else {
                // 将 latestPassedTime 往前推
                long oldTime = latestPassedTime.addAndGet(costTime);
                try {
                    // 需要 sleep 的时间
                    waitTime = oldTime - TimeUtil.currentTimeMillis();
                    if (waitTime > maxQueueingTimeMs) {
                        latestPassedTime.addAndGet(-costTime);
                        return false;
                    }
                    // in race condition waitTime may <= 0
                    if (waitTime > 0) {
                        Thread.sleep(waitTime);
                    }
                    return true;
                } catch (InterruptedException e) {
                }
            }
        }
        return false;
    }

}

这个策略还是非常好理解的,简单粗暴,快速失败。

WarmUpController

WarmUpController 用来防止突发流量迅速上升,导致系统负载严重过高,本来系统在稳定状态下能处理的,但是由于许多资源没有预热,导致这个时候处理不了了。比如,数据库需要建立连接、需要连接到远程服务等,这就是为什么我们需要预热。

啰嗦一句,这里不仅仅指系统刚刚启动需要预热,对于长时间处于低负载的系统,突发流量也需要重新预热。

Guava 的 SmoothWarmingUp 是用来控制获取令牌的速率的,和这里的控制 QPS 还是有一点区别,但是中心思想是一样的。我们在看完源码以后再讨论它们的区别。

为了帮助大家理解源码,我们这边先设定一个场景:QPS 设置为 100,预热时间设置为 10 秒。代码中使用 “【】” 代表根据这个场景计算出来的值。

public class WarmUpController implements TrafficShapingController {

    // 阈值
    protected double count;
    // 3
    private int coldFactor;
    // 转折点的令牌数,和 Guava 的 thresholdPermits 一个意思
    // [500]
    protected int warningToken = 0;
    // 最大的令牌数,和 Guava 的 maxPermits 一个意思
    // [1000]
    private int maxToken;
    // 斜线斜率
    // [1/25000]
    protected double slope;

    // 累积的令牌数,和 Guava 的 storedPermits 一个意思
    protected AtomicLong storedTokens = new AtomicLong(0);
    // 最后更新令牌的时间
    protected AtomicLong lastFilledTime = new AtomicLong(0);

    public WarmUpController(double count, int warmUpPeriodInSec, int coldFactor) {
        construct(count, warmUpPeriodInSec, coldFactor);
    }

    public WarmUpController(double count, int warmUpPeriodInSec) {
        construct(count, warmUpPeriodInSec, 3);
    }

    // 下面的构造方法,和 Guava 中是差不多的,只不过 thresholdPermits 和 maxPermits 都换了个名字
    private void construct(double count, int warmUpPeriodInSec, int coldFactor) {

        if (coldFactor <= 1) {
            throw new IllegalArgumentException("Cold factor should be larger than 1");
        }

        this.count = count;

        this.coldFactor = coldFactor;

        // warningToken 和 thresholdPermits 是一样的意思,计算结果其实是一样的
        // thresholdPermits = 0.5 * warmupPeriod / stableInterval.
        // 【warningToken = (10*100)/(3-1) = 500】
        warningToken = (int)(warmUpPeriodInSec * count) / (coldFactor - 1);

        // maxToken 和 maxPermits 是一样的意思,计算结果其实是一样的
        // maxPermits = thresholdPermits + 2*warmupPeriod/(stableInterval+coldInterval)
        // 【maxToken = 500 + (2*10*100)/(1.0+3) = 1000】
        maxToken = warningToken + (int)(2 * warmUpPeriodInSec * count / (1.0 + coldFactor));

        // 斜率计算
        // slope
        // slope = (coldIntervalMicros-stableIntervalMicros)/(maxPermits-thresholdPermits);
        // 【slope = (3-1.0) / 100 / (1000-500) = 1/25000】
        slope = (coldFactor - 1.0) / count / (maxToken - warningToken);

    }

    @Override
    public boolean canPass(Node node, int acquireCount) {
        return canPass(node, acquireCount, false);
    }

    @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {

        // Sentinel 的 QPS 统计使用的是滑动窗口

        // 当前时间窗口的 QPS 
        long passQps = (long) node.passQps();

        // 这里是上一个时间窗口的 QPS,这里的一个窗口跨度是1分钟
        long previousQps = (long) node.previousPassQps();

        // 同步。设置 storedTokens 和 lastFilledTime 到正确的值
        syncToken(previousQps);

        long restToken = storedTokens.get();
        // 令牌数超过 warningToken,进入梯形区域
        if (restToken >= warningToken) {

            // 这里简单说一句,因为当前的令牌数超过了 warningToken 这个阈值,系统处于需要预热的阶段
            // 通过计算当前获取一个令牌所需时间,计算其倒数即是当前系统的最大 QPS 容量

            long aboveToken = restToken - warningToken;

            // 这里计算警戒 QPS 值,就是当前状态下能达到的最高 QPS。
            // (aboveToken * slope + 1.0 / count) 其实就是在当前状态下获取一个令牌所需要的时间
            double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
            // 如果不会超过,那么通过,否则不通过
            if (passQps + acquireCount <= warningQps) {
                return true;
            }
        } else {
            // count 是最高能达到的 QPS
            if (passQps + acquireCount <= count) {
                return true;
            }
        }

        return false;
    }

    protected void syncToken(long passQps) {
        // 下面几行代码,说明在第一次进入新的 1 秒钟的时候,做同步
        // 题外话:Sentinel 默认地,1 秒钟分为 2 个时间窗口,分别 500ms
        long currentTime = TimeUtil.currentTimeMillis();
        currentTime = currentTime - currentTime % 1000;
        long oldLastFillTime = lastFilledTime.get();
        if (currentTime <= oldLastFillTime) {
            return;
        }

        // 令牌数量的旧值
        long oldValue = storedTokens.get();
        // 计算新的令牌数量,往下看
        long newValue = coolDownTokens(currentTime, passQps);

        if (storedTokens.compareAndSet(oldValue, newValue)) {
            // 令牌数量上,减去上一分钟的 QPS,然后设置新值
            long currentValue = storedTokens.addAndGet(0 - passQps);
            if (currentValue < 0) {
                storedTokens.set(0L);
            }
            lastFilledTime.set(currentTime);
        }

    }

    // 更新令牌数
    private long coolDownTokens(long currentTime, long passQps) {
        long oldValue = storedTokens.get();
        long newValue = oldValue;

        // 当前令牌数小于 warningToken,添加令牌
        if (oldValue < warningToken) {
            newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
        } else if (oldValue > warningToken) {
            // 当前令牌数量处于梯形阶段,
            // 如果当前通过的 QPS 大于 count/coldFactor,说明系统消耗令牌的速度,大于冷却速度
            //    那么不需要添加令牌,否则需要添加令牌
            if (passQps < (int)count / coldFactor) {
                newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
            }
        }
        return Math.min(newValue, maxToken);
    }

}

coolDownTokens 这个方法用来计算新的 token 数量,其实我也没有完全理解作者的设计:

  • 第一、对于令牌的增加,在 Guava 中,使用 warmupPeriodMicros / maxPermits 作为增长率,因为它实现的是 storedPermits 从 0 到 maxPermits 花费的时间为 warmupPeriod。而这里是以每秒 count 个作为增长率,为什么?
  • 第二、else if 分支中的决定我没有理解,为什么用 passQps 和 count / coldFactor 进行对比来决定是否继续添加令牌?
  • 我自己的理解是,count/coldFactor 就是指冷却速度,那么就是说得通的。欢迎大家一起探讨。

最后,我们再简单说说 Guava 的 SmoothWarmingUp 和 Sentinel 的 WarmupController 的区别。

Guava 在于控制获取令牌的速率,它关心的是,获取 permits 需要多少时间,包括从 storedPermits 中获取,以及获取 freshPermits,以此推进 nextFreeTicketMicros 到未来的某个时间点。

而 Sentinel 在于控制 QPS,它用令牌数来标识当前系统处于什么状态,根据时间推进一直增加令牌,根据通过的 QPS 一直减少令牌。如果 QPS 持续下降,根据推演,可以发现 storedTokens 越来越多,然后越过 warningTokens 这个阈值,之后只有当 QPS 下降到 count/3 以后,令牌才会继续往上增长,一直到 maxTokens。

storedTokens 是以 “count 每秒”的增长率增长的,减少是以 前一分钟的 QPS 来减少的。其实这里我也有个疑问,为什么增加令牌的时候考虑了时间,而减少的时候却不考虑时间因素,提了 issue,似乎没人搭理。

WarmUpRateLimiterController

注意,这个类继承自刚刚介绍的 WarmUpController,它的流控效果定义为排队等待。它的代码其实就是前面介绍的 RateLimiterController 加上 WarmUpController。


public class WarmUpRateLimiterController extends WarmUpController {

    private final int timeoutInMs;
    private final AtomicLong latestPassedTime = new AtomicLong(-1);

    public WarmUpRateLimiterController(double count, int warmUpPeriodSec, int timeOutMs, int coldFactor) {
        super(count, warmUpPeriodSec, coldFactor);
        this.timeoutInMs = timeOutMs;
    }

    @Override
    public boolean canPass(Node node, int acquireCount) {
        return canPass(node, acquireCount, false);
    }

    @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        long previousQps = (long) node.previousPassQps();
        syncToken(previousQps);

        long currentTime = TimeUtil.currentTimeMillis();

        long restToken = storedTokens.get();
        long costTime = 0;
        long expectedTime = 0;

        // 和 RateLimiterController 比较,区别主要就是这块代码

        if (restToken >= warningToken) {
            long aboveToken = restToken - warningToken;

            // current interval = restToken*slope+1/count
            double warmingQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
            costTime = Math.round(1.0 * (acquireCount) / warmingQps * 1000);
        } else {
            costTime = Math.round(1.0 * (acquireCount) / count * 1000);
        }
        expectedTime = costTime + latestPassedTime.get();

        if (expectedTime <= currentTime) {
            latestPassedTime.set(currentTime);
            return true;
        } else {
            long waitTime = costTime + latestPassedTime.get() - currentTime;
            if (waitTime > timeoutInMs) {
                return false;
            } else {
                long oldTime = latestPassedTime.addAndGet(costTime);
                try {
                    waitTime = oldTime - TimeUtil.currentTimeMillis();
                    if (waitTime > timeoutInMs) {
                        latestPassedTime.addAndGet(-costTime);
                        return false;
                    }
                    if (waitTime > 0) {
                        Thread.sleep(waitTime);
                    }
                    return true;
                } catch (InterruptedException e) {
                }
            }
        }
        return false;
    }
}

这个代码很简单,就是 RateLimiter 中的代码,然后加入了预热的内容。

在 RateLimiter 中,单个请求的 costTime 是固定的,就是 1/count,比如设置 100 qps,那么 costTime 就是 10ms。

但是这边,加入了 WarmUp 的内容,就是说,通过令牌数量,来判断当前系统的 QPS 应该是多少,如果当前令牌数超过 warningTokens,那么系统的 QPS 容量已经低于我们预设的 QPS,相应的,costTime 就会延长。

小结

有段时间没写文章了,写得不好之处,欢迎指正。

关注作者公众号:

查看原文

赞 0 收藏 0 评论 0

Forezp 发布了文章 · 2019-08-25

日志排查问题困难?分布式日志链路跟踪来帮你

作者:朱乐陶,软件架构师,具备多年Java开发及架构设计经验,擅长微服务领域
作者博客:https://blog.csdn.net/zlt2000

背景

开发排查系统问题用得最多的手段就是查看系统日志,在分布式环境中一般使用ELK来统一收集日志,但是在并发大时使用日志定位问题还是比较麻烦,由于大量的其他用户/其他线程的日志也一起输出穿行其中导致很难筛选出指定请求的全部相关日志,以及下游线程/服务对应的日志。

 

解决思路

每个请求都使用一个唯一标识来追踪全部的链路显示在日志中,并且不修改原有的打印方式(代码无入侵)
使用Logback的MDC机制日志模板中加入traceId标识,取值方式为%X{traceId}

MDC(Mapped Diagnostic Context,映射调试上下文)是 log4j 和 logback 提供的一种方便在多线程条件下记录日志的功能。MDC 可以看成是一个与当前线程绑定的Map,可以往其中添加键值对。MDC 中包含的内容可以被同一线程中执行的代码所访问。当前线程的子线程会继承其父线程中的 MDC 的内容。当需要记录日志时,只需要从 MDC 中获取所需的信息即可。MDC 的内容则由程序在适当的时候保存进去。对于一个 Web 应用来说,通常是在请求被处理的最开始保存这些数据。

方案实现

由于MDC内部使用的是ThreadLocal所以只有本线程才有效,子线程和下游的服务MDC里的值会丢失;所以方案主要的难点是解决值的传递问题,主要包括以几下部分:

  • API网关中的MDC数据如何传递给下游服务
  • 服务如何接收数据,并且调用其他远程服务时如何继续传递
  • 异步的情况下(线程池)如何传给子线程

修改日志模板

logback配置文件模板格式添加标识%X{traceId}

网关添加过滤器

生成traceId并通过header传递给下游服务

@Component
public class TraceFilter extends ZuulFilter {
    @Autowired
    private TraceProperties traceProperties;

    @Override
    public String filterType() {
        return FilterConstants.PRE_TYPE;
    }

    @Override
    public int filterOrder() {
        return FORM_BODY_WRAPPER_FILTER_ORDER - 1;
    }

    @Override
    public boolean shouldFilter() {
        //根据配置控制是否开启过滤器
        return traceProperties.getEnable();
    }

    @Override
    public Object run() {
        //链路追踪id
        String traceId = IdUtil.fastSimpleUUID();
        MDC.put(CommonConstant.LOG_TRACE_ID, traceId);
        RequestContext ctx = RequestContext.getCurrentContext();
        ctx.addZuulRequestHeader(CommonConstant.TRACE_ID_HEADER, traceId);
        return null;
    }
}

下游服务增加spring拦截器

接收并保存traceId的值
拦截器

public class TraceInterceptor implements HandlerInterceptor {
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
        String traceId = request.getHeader(CommonConstant.TRACE_ID_HEADER);
        if (StrUtil.isNotEmpty(traceId)) {
            MDC.put(CommonConstant.LOG_TRACE_ID, traceId);
        }
        return true;
    }
}

注册拦截器

public class DefaultWebMvcConfig extends WebMvcConfigurationSupport {
  @Override
  protected void addInterceptors(InterceptorRegistry registry) {
    //日志链路追踪拦截器
    registry.addInterceptor(new TraceInterceptor()).addPathPatterns("/**");

    super.addInterceptors(registry);
  }
}

下游服务增加feign拦截器

继续把当前服务的traceId值传递给下游服务

public class FeignInterceptorConfig {
    @Bean
    public RequestInterceptor requestInterceptor() {
        RequestInterceptor requestInterceptor = template -> {
            //传递日志traceId
            String traceId = MDC.get(CommonConstant.LOG_TRACE_ID);
            if (StrUtil.isNotEmpty(traceId)) {
                template.header(CommonConstant.TRACE_ID_HEADER, traceId);
            }
        };
        return requestInterceptor;
    }
}

解决父子线程传递问题

主要针对业务会使用线程池(异步、并行处理),并且spring自己也有@Async注解来使用线程池,要解决这个问题需要以下两个步骤

重写logback的LogbackMDCAdapter

由于logback的MDC实现内部使用的是ThreadLocal不能传递子线程,所以需要重写替换为阿里的TransmittableThreadLocal

TransmittableThreadLocal 是Alibaba开源的、用于解决 “在使用线程池等会缓存线程的组件情况下传递ThreadLocal” 问题的 InheritableThreadLocal 扩展。若希望 TransmittableThreadLocal 在线程池与主线程间传递,需配合 TtlRunnable 和 TtlCallable 使用。

TtlMDCAdapter类

package org.slf4j;

import com.alibaba.ttl.TransmittableThreadLocal;
import org.slf4j.spi.MDCAdapter;

public class TtlMDCAdapter implements MDCAdapter {
    /**
     * 此处是关键
     */
    private final ThreadLocal<Map<String, String>> copyOnInheritThreadLocal = new TransmittableThreadLocal<>();

    private static TtlMDCAdapter mtcMDCAdapter;

    static {
        mtcMDCAdapter = new TtlMDCAdapter();
        MDC.mdcAdapter = mtcMDCAdapter;
    }

    public static MDCAdapter getInstance() {
        return mtcMDCAdapter;
    }
其他代码与ch.qos.logback.classic.util.LogbackMDCAdapter一样,只需改为调用copyOnInheritThreadLocal变量

TtlMDCAdapterInitializer类用于程序启动时加载自己的mdcAdapter实现

public class TtlMDCAdapterInitializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {
    @Override
    public void initialize(ConfigurableApplicationContext applicationContext) {
        //加载TtlMDCAdapter实例
        TtlMDCAdapter.getInstance();
    }
}

扩展线程池实现

增加TtlRunnable和TtlCallable扩展实现TTL

public class CustomThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
    @Override
    public void execute(Runnable runnable) {
        Runnable ttlRunnable = TtlRunnable.get(runnable);
        super.execute(ttlRunnable);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        Callable ttlCallable = TtlCallable.get(task);
        return super.submit(ttlCallable);
    }

    @Override
    public Future<?> submit(Runnable task) {
        Runnable ttlRunnable = TtlRunnable.get(task);
        return super.submit(ttlRunnable);
    }

    @Override
    public ListenableFuture<?> submitListenable(Runnable task) {
        Runnable ttlRunnable = TtlRunnable.get(task);
        return super.submitListenable(ttlRunnable);
    }

    @Override
    public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
        Callable ttlCallable = TtlCallable.get(task);
        return super.submitListenable(ttlCallable);
    }
}

场景测试

测试代码如下

api网关打印的日志

网关生成traceId值为13d9800c8c7944c78a06ce28c36de670

请求跳转到文件服务时打印的日志

显示的traceId与网关相同,这里特意模拟发生异常的场景

ELK聚合日志通过traceId查询整条链路日志

当系统出现异常时,可直接通过该异常日志的traceId​的值,在日志中心中询该请求的所有日志信息

源码下载

附上我的开源微服务框架(包含本文中的代码),欢迎 star 关注

https://gitee.com/zlt2000/mic...

<div>

<p align="center">
    <img data-original="https://www.fangzhipeng.com/img/avatar.jpg" width="258" height="258"/>
    <br>
    扫一扫,支持下作者吧
</p>
<p align="center" style="margin-top: 15px; font-size: 11px;color: #cc0000;">
    <strong>(转载本站文章请注明作者和出处 <a href="https://www.fangzhipeng.com">方志朋的博客</a>)</strong>
</p>

</div>

查看原文

赞 1 收藏 1 评论 0

Forezp 发布了文章 · 2019-02-21

spring cloud config将配置存储在数据库中

转载请标明出处:
https://blog.csdn.net/forezp/...
本文出自方志朋的博客

Spring Cloud Config Server最常见是将配置文件放在本地或者远程Git仓库,放在本地是将将所有的配置文件统一写在Config Server工程目录下,如果需要修改配置,需要重启config server;放在Git仓库,是将配置统一放在Git仓库,可以利用Git仓库的版本控制。本文将介绍使用另外一种方式存放配置信息,即将配置存放在Mysql中。

整个流程:Config Sever暴露Http API接口,Config Client 通过调用Config Sever的Http API接口来读取配置Config Server的配置信息,Config Server从数据中读取具体的应用的配置。流程图如下:

61.png

案例实战

在本案例中需要由2个工程,分为config-server和config-client,其中config-server工程需要连接Mysql数据库,读取配置;config-client则在启动的时候从config-server工程读取。本案例Spring Cloud版本为Greenwich.RELEASE,Spring Boot版本为2.1.0.RELEASE。

工程描述
config-server端口8769,从数据库中读取配置
config-client端口8083,从config-server读取配置

搭建config-server工程

创建工程config-server,在工程的pom文件引入config-server的起步依赖,mysql的连接器,jdbc的起步依赖,代码如下:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-config-server</artifactId>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>

在工程的配置文件application.yml下做以下的配置:

spring:
  profiles:
     active: jdbc
  application:
     name: config-jdbc-server
  datasource:
     url: jdbc:mysql://127.0.0.1:3306/config-jdbc?useUnicode=true&characterEncoding=utf8&characterSetResults=utf8&serverTimezone=GMT%2B8
     username: root
     password: 123456
     driver-class-name: com.mysql.jdbc.Driver
  cloud:
     config:
       label: master
       server:
         jdbc: true
server:
  port: 8769
spring.cloud.config.server.jdbc.sql: SELECT key1, value1 from config_properties where APPLICATION=? and PROFILE=? and LABEL=?

其中,spring.profiles.active为spring读取的配置文件名,从数据库中读取,必须为jdbc。spring.datasource配置了数据库相关的信息,spring.cloud.config.label读取的配置的分支,这个需要在数据库中数据对应。spring.cloud.config.server.jdbc.sql为查询数据库的sql语句,该语句的字段必须与数据库的表字段一致。

在程序的启动文件ConfigServerApplication加上@EnableConfigServer注解,开启ConfigServer的功能,代码如下:

@SpringBootApplication
@EnableConfigServer
public class ConfigServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ConfigServerApplication.class, args);
    }
}

初始化数据库

由于Config-server需要从数据库中读取,所以读者需要先安装MySQL数据库,安装成功后,创建config-jdbc数据库,数据库编码为utf-8,然后在config-jdbc数据库下,执行以下的数据库脚本:

CREATE TABLE `config_properties` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `key1` varchar(50) COLLATE utf8_bin NOT NULL,
  `value1` varchar(500) COLLATE utf8_bin DEFAULT NULL,
  `application` varchar(50) COLLATE utf8_bin NOT NULL,
  `profile` varchar(50) COLLATE utf8_bin NOT NULL,
  `label` varchar(50) COLLATE utf8_bin DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 COLLATE=utf8_bin

其中key1字段为配置的key,value1字段为配置的值,application字段对应于应用名,profile对应于环境,label对应于读取的分支,一般为master。

插入数据config-client 的2条数据,包括server.port和foo两个配置,具体数据库脚本如下:


insert into `config_properties` (`id`, `key1`, `value1`, `application`, `profile`, `label`) values('1','server.port','8083','config-client','dev','master');
insert into `config_properties` (`id`, `key1`, `value1`, `application`, `profile`, `label`) values('2','foo','bar-jdbc','config-client','dev','master');

搭建config-client

在 config-client工程的pom文件,引入web和config的起步依赖,代码如下:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-config</artifactId>
</dependency>

在程序的启动配置文件 bootstrap.yml做程序的相关配置,一定要是bootstrap.yml,不可以是application.yml,bootstrap.yml的读取优先级更高,配置如下:

spring:
  application:
    name: config-client
  cloud:
    config:
      uri: http://localhost:8769
      fail-fast: true
  profiles:
    active: dev

其中spring.cloud.config.uri配置的config-server的地址,spring.cloud.config.fail-fast配置的是读取配置失败后,执行快速失败。spring.profiles.active配置的是spring读取配置文件的环境。

在程序的启动文件ConfigClientApplication,写一个RestAPI,读取配置文件的foo配置,返回给浏览器,代码如下:

@SpringBootApplication
@RestController
public class ConfigClientApplication {

    public static void main(String[] args) {
        SpringApplication.run(ConfigClientApplication.class, args);
    }

    @Value("${foo}")
    String foo;
    @RequestMapping(value = "/foo")
    public String hi(){
        return foo;
    }
}

依次启动2个工程,其中config-client的启动端口为8083,这个是在数据库中的,可见config-client从 config-server中读取了配置。在浏览器上访问http://localhost:8083/foo,浏览器显示bar-jdbc,这个是在数据库中的,可见config-client从 config-server中读取了配置。

参考资料

https://cloud.spring.io/sprin...

源码下载

https://github.com/forezp/Spr...

<div>

<p align="center">
    <img data-original="https://www.fangzhipeng.com/img/avatar.jpg" width="258" height="258"/>
    <br>
    扫一扫,支持下作者吧
</p>
<p align="center" style="margin-top: 15px; font-size: 11px;color: #cc0000;">
    <strong>(转载本站文章请注明作者和出处 <a href="https://www.fangzhipeng.com">方志朋的博客</a>)</strong>
</p>

</div>

查看原文

赞 1 收藏 1 评论 0

Forezp 发布了文章 · 2019-02-14

Spring Cloud Consul 之Greenwich版本全攻略

转载请标明出处:
http://blog.csdn.net/forezp/a...
本文出自方志朋的博客

什么是Consul

Consul是HashiCorp公司推出的开源软件,使用GO语言编写,提供了分布式系统的服务注册和发现、配置等功能,这些功能中的每一个都可以根据需要单独使用,也可以一起使用以构建全方位的服务网格。Consul不仅具有服务治理的功能,而且使用分布式一致协议RAFT算法实现,有多数据中心的高可用方案,并且很容易和Spring Cloud等微服务框架集成,使用起来非常的简单,具有简单、易用、可插排等特点。使用简而言之,Consul提供了一种完整的服务网格解决方案 。

Consul具有以下的特点和功能

  • 服务发现:Consul的客户端可以向Consul注册服务,例如api服务或者mysql服务,其他客户端可以使用Consul来发现服务的提供者。Consul支持使用DNS或HTTP来注册和发现服务。
  • 运行时健康检查:Consul客户端可以提供任意数量的运行状况检查机制,这些检查机制可以是给定服务(“是Web服务器返回200 OK”)或本地节点(“内存利用率低于90%”)相关联。这些信息可以用来监控群集的运行状况,服务发现组件可以使用这些监控信息来路由流量,可以使流量远离不健康的服务。
  • KV存储:应用程序可以将Consul的键/值存储用于任何需求,包括动态配置,功能标记,协调,领导者选举等。它采用HTTP API使其易于使用。
  • 安全服务通信:Consul可以为服务生成和分发TLS证书,以建立相互的TLS连接。
  • 多数据中心:Consul支持多个数据中心。这意味着Consul的用户不必担心构建额外的抽象层以扩展到多个区域。

Consul原理

每个提供服务的节点都运行了Consul的代理,运行代理不需要服务发现和获取配置的KV键值对,代理只负责监控检查。代理节点可以和一个或者多个Consul server通讯。 Consul服务器是存储和复制数据的地方。服务器本身选出了领导者。虽然Consul可以在一台服务器上运行,但建议使用3到5,以避免导致数据丢失的故障情况。建议为每个数据中心使用一组Consul服务器。
如果你的组件需要发现服务,可以查询任何Consul Server或任何Consul客户端,Consul客户端会自动将查询转发给Consul Server。
需要发现其他服务或节点的基础架构组件可以查询任何Consul服务器或任何Consul代理。代理会自动将查询转发给服务器。每个数据中心都运行Consul服务器集群。发生跨数据中心服务发现或配置请求时,本地Consul服务器会将请求转发到远程数据中心并返回结果。

术语

  • Agent agent是一直运行在Consul集群中每个成员上的守护进程。通过运行 consul agent 来启动。agent可以运行在client或者server模式。指定节点作为client或者server是非常简单的,除非有其他agent实例。所有的agent都能运行DNS或者HTTP接口,并负责运行时检查和保持服务同步。
  • Client 一个Client是一个转发所有RPC到server的代理。这个client是相对无状态的。client唯一执行的后台活动是加入LAN gossip池。这有一个最低的资源开销并且仅消耗少量的网络带宽。
  • Server 一个server是一个有一组扩展功能的代理,这些功能包括参与Raft选举,维护集群状态,响应RPC查询,与其他数据中心交互WAN gossip和转发查询给leader或者远程数据中心。
  • DataCenter 虽然数据中心的定义是显而易见的,但是有一些细微的细节必须考虑。例如,在EC2中,多个可用区域被认为组成一个数据中心?我们定义数据中心为一个私有的,低延迟和高带宽的一个网络环境。这不包括访问公共网络,但是对于我们而言,同一个EC2中的多个可用区域可以被认为是一个数据中心的一部分。
  • Consensus 在我们的文档中,我们使用Consensus来表明就leader选举和事务的顺序达成一致。由于这些事务都被应用到有限状态机上,Consensus暗示复制状态机的一致性。
  • Gossip Consul建立在Serf的基础之上,它提供了一个用于多播目的的完整的gossip协议。Serf提供成员关系,故障检测和事件广播。更多的信息在gossip文档中描述。这足以知道gossip使用基于UDP的随机的点到点通信。
  • LAN Gossip 它包含所有位于同一个局域网或者数据中心的所有节点。
  • WAN Gossip 它只包含Server。这些server主要分布在不同的数据中心并且通常通过因特网或者广域网通信。
  • RPC 远程过程调用。这是一个允许client请求server的请求/响应机制。

image

让我们分解这张图并描述每个部分。首先,我们能看到有两个数据中心,标记为“1”和“2”。Consul对多数据中心有一流的支持并且希望这是一个常见的情况。

在每个数据中心,client和server是混合的。一般建议有3-5台server。这是基于有故障情况下的可用性和性能之间的权衡结果,因为越多的机器加入达成共识越慢。然而,并不限制client的数量,它们可以很容易的扩展到数千或者数万台。

同一个数据中心的所有节点都必须加入gossip协议。这意味着gossip协议包含一个给定数据中心的所有节点。这服务于几个目的:第一,不需要在client上配置server地址。发现都是自动完成的。第二,检测节点故障的工作不是放在server上,而是分布式的。这是的故障检测相比心跳机制有更高的可扩展性。第三:它用来作为一个消息层来通知事件,比如leader选举发生时。

每个数据中心的server都是Raft节点集合的一部分。这意味着它们一起工作并选出一个leader,一个有额外工作的server。leader负责处理所有的查询和事务。作为一致性协议的一部分,事务也必须被复制到所有其他的节点。因为这一要求,当一个非leader得server收到一个RPC请求时,它将请求转发给集群leader。

server节点也作为WAN gossip Pool的一部分。这个Pool不同于LAN Pool,因为它是为了优化互联网更高的延迟,并且它只包含其他Consul server节点。这个Pool的目的是为了允许数据中心能够以low-touch的方式发现彼此。这使得一个新的数据中心可以很容易的加入现存的WAN gossip。因为server都运行在这个pool中,它也支持跨数据中心请求。当一个server收到来自另一个数据中心的请求时,它随即转发给正确数据中想一个server。该server再转发给本地leader。

这使得数据中心之间只有一个很低的耦合,但是由于故障检测,连接缓存和复用,跨数据中心的请求都是相对快速和可靠的。

Consul 服务注册发现流程

Consul在业界最广泛的用途就是作为服务注册中心,同Eureka类型,consul作为服务注册中心,它的注册和发现过程如下图:
31.png

在上面的流程图上有三个角色,分别为服务注册中心、服务提供者、服务消费者。

  • 服务提供者Provider启动的时候,会向Consul发送一个请求,将自己的host、ip、应用名、健康检查等元数据信息发送给Consul
  • Consul 接收到 Provider 的注册后,定期向 Provider 发送健康检查的请求,检验Provider是否健康
  • 服务消费者Consumer会从注册中心Consul中获取服务注册列表,当服务消费者消费服务时,根据应用名从服务注册列表获取到具体服务的实例(1个或者多个),从而完成服务的调用。

Consul VS Eureka

Eureka是一种服务发现工具。 该体系结构主要是客户端/服务器,每个数据中心有一组Eureka服务器,通常每个可用区域一个。 通常,Eureka的客户使用嵌入式SDK来注册和发现服务。 对于非本地集成的客户端,使用Ribbon等边车通过Eureka透明地发现服务。

Eureka使用尽力而为的复制提供弱一致的服务视图。 当客户端向服务器注册时,该服务器将尝试复制到其他服务器但不提供保证。 服务注册的生存时间很短(TTL),要求客户端对服务器进行心跳检测。 不健康的服务或节点将停止心跳,导致它们超时并从注册表中删除。 发现请求可以路由到任何服务,由于尽力复制,这些服务可以提供过时或丢失的数据。 这种简化的模型允许轻松的集群管理和高可扩展性。

Consul提供了一系列超级功能,包括更丰富的运行状况检查,键/值存储和多数据中心感知。 Consul需要每个数据中心中的一组服务器,以及每个客户端上的代理,类似于使用像Ribbon这样的边车。 Consul代理允许大多数应用程序不知道Consul,通过配置文件执行服务注册以及通过DNS或负载平衡器sidecars进行发现。

Consul提供强大的一致性保证,因为服务器使用Raft协议复制状态。 Consul支持丰富的运行状况检查,包括TCP,HTTP,Nagios / Sensu兼容脚本或基于的Eureka的TTL。 客户端节点参与基于gossip的健康检查,该检查分发健康检查的工作,而不像集中式心跳,这成为可扩展性挑战。 发现请求被路由到当选的Consul领导者,这使他们默认情况下非常一致。 允许过时读取的客户端允许任何服务器处理其请求,从而允许像Eureka一样的线性可伸缩性。

Consul的强烈一致性意味着它可以用作领导者选举和集群协调的锁定服务。 Eureka不提供类似的保证,并且通常需要为需要执行协调或具有更强一致性需求的服务运行ZooKeeper。

Consul提供了支持面向服务的体系结构所需的功能工具包。 这包括服务发现,还包括丰富的运行状况检查,锁定,键/值,多数据中心联合,事件系统和ACL。 Consul和consul-template和envconsul等工具生态系统都试图最大限度地减少集成所需的应用程序更改,以避免需要通过SDK进行本机集成。 Eureka是更大的Netflix OSS套件的一部分,该套件期望应用程序相对同质且紧密集成。 因此,Eureka只解决了有限的一部分问题,期望其他工具如ZooKeeper可以同时使用。

Eureka Server端采用的是P2P的复制模式,但是它不保证复制操作一定能成功,因此它提供的是一个最终一致性的服务实例视图;Client端在Server端的注册信息有一个带期限的租约,一旦Server端在指定期间没有收到Client端发送的心跳,则Server端会认定为Client端注册的服务是不健康的,定时任务将会将其从注册表中删除。Consul与Eureka不同,Consul采用Raft算法,可以提供强一致性的保证,Consul的agent相当于Netflix Ribbon + Netflix Eureka Client,而且对应用来说相对透明,同时相对于Eureka这种集中式的心跳检测机制,Consul的agent可以参与到基于goosip协议的健康检查,分散了server端的心跳检测压力。除此之外,Consul为多数据中心提供了开箱即用的原生支持等。

Consul下载和安装

Consul采用Go语言编写,支持Linux、Mac、Windows等各大操作系统,本文使用windows操作系统,下载地址:https://www.consul.io/downloa...,下完成后解压到计算机目录下,解压成功后,只有一个可执行的consul.exe可执行文件。打开cmd终端,切换到目录,执行以下命令:

consul --version

终端显示如下:

Consul v1.4.2
Protocol 2 spoken by default, understands 2 to 3 (agent will automatically use p
rotocol >2 when speaking to compatible agents)

证明consul下载成功了,并可执行。

consul的一些常见的执行命令如下:

命令解释示例
agent运行一个consul agentconsul agent -dev
join将agent加入到consul集群consul join IP
members列出consul cluster集群中的membersconsul members
leave将节点移除所在集群consul leave

更多命令请查看官方网站:https://www.consul.io/docs/co...

开发模式启动:

consul agent -dev 

启动成功,在浏览器上访问:http://localhost:8500,显示的界面如下:

21.png

spring cloud consul

该项目通过自动配置并绑定到Spring环境和其他Spring编程模型成语,为Spring Boot应用程序提供Consul集成。通过几个简单的注释,您可以快速启用和配置应用程序中的常见模式,并使用基于Consul的组件构建大型分布式系统。提供的模式包括服务发现,控制总线和配置。智能路由(Zuul)和客户端负载平衡(Ribbon),断路器(Hystrix)通过与Spring Cloud Netflix的集成提供。

使用spring cloud consul来服务注册与发现

本小节以案例的形式来讲解如何使用Spring Cloud Consul来进行服务注册和发现的,并且使用Feign来消费服务。再讲解之前,已经启动consul的agent,并且在浏览器上http://localhost:8500能够显示正确的页面。本案例一共有2个工程,分别如下:

工程名端口描述
consul-provider8763服务提供者
consul-consumer8765服务消费者

其中,服务提供者和服务消费者分别向consul注册,注册完成后,服务消费者通过FeignClient来消费服务提供者的服务。

服务提供者consul-provider

创建一个工程consul-provider,在工程的pom文件引入以下依赖,包括consul-discovery的起步依赖,该依赖是spring cloud consul用来向consul 注册和发现服务的依赖,采用REST API的方式进行通讯。另外加上web的起步依赖,用于对外提供REST API。代码如下:

 
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-consul-discovery</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
 

在工程的配置文件application.yml做下以下配置:

server:
  port: 8763
spring:
  application:
    name: consul-provider
  cloud:
    consul:
      host: localhost
      port: 8500
      discovery:
        serviceName: consul-provider

上面的配置,指定了程序的启动端口为8763,应用名为consul-provider,consul注册中心的地址为localhost:8500

在程序员的启动类ConsulProviderApplication加上@EnableDiscoveryClient注解,开启服务发现的功能。

@SpringBootApplication
@EnableDiscoveryClient
public class ConsulProviderApplication {

    public static void main(String[] args) {
        SpringApplication.run(ConsulProviderApplication.class, args);
    }

}

写一个RESTAPI,该API为一个GET请求,返回当前程序的启动端口,代码如下。


@RestController
public class HiController {

    @Value("${server.port}")
    String port;
    @GetMapping("/hi")
    public String home(@RequestParam String name) {
        return "hi "+name+",i am from port:" +port;
    }

}

启动工程,在浏览器上访问http://localhost:8500,页面显示如下:

微信截图_20190131175501.png

从上图可知,consul-provider服务已经成功注册到consul上面去了。

服务消费者consul-provider

服务消费者的搭建过程同服务提供者,在pom文件中引入的依赖同服务提供者,在配置文件application.yml配置同服务提供者,不同的点在端口为8765,服务名为consul-consumer。

写一个FeignClient,该FeignClient调用consul-provider的REST API,代码如下:


@FeignClient(value = "consul-provider")
public interface EurekaClientFeign {

 
    @GetMapping(value = "/hi")
    String sayHiFromClientEureka(@RequestParam(value = "name") String name);
}

Service层代码如下:

@Service
public class HiService {

    @Autowired
    EurekaClientFeign eurekaClientFeign;
 
   
    public String sayHi(String name){
        return  eurekaClientFeign.sayHiFromClientEureka(name);
    }
}

对外提供一个REST API,该API调用了consul-provider的服务,代码如下:

@RestController
public class HiController {
    @Autowired
    HiService hiService;

    @GetMapping("/hi")
    public String sayHi(@RequestParam( defaultValue = "forezp",required = false)String name){
        return hiService.sayHi(name);
    }
}

在浏览器上访问http://localhost:8765/hi,浏览器响应如下:

hi forezp,i am from port:8763

这说明consul-consumer已经成功调用了consul-provider的服务。这说明consul-provider的服务已经注册到了consul的注册中心上面去了。consul-consumer能够获取注册中心的注册列表来获来消费服务。

使用Spring Cloud Consul Config来做服务配置中心

Consul不仅能用来服务注册和发现,Consul而且支持Key/Value键值对的存储,可以用来做配置中心。Spring Cloud 提供了Spring Cloud Consul Config依赖去和Consul相集成,用来做配置中心。
现在以案例的形式来讲解如何使用Consul作为配置中心,本案例在上一个案例的consul-provider基础上进行改造。首先在工程的pom文件加上consul-config的起步依赖,代码如下:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-consul-config</artifactId>
</dependency>

然后在配置文件application.yml加上以下的以下的配置,配置如下:

spring:
  profiles:
    active: dev 

上面的配置指定了SpringBoot启动时的读取的profiles为dev。
然后再工程的启动配置文件bootstrap.yml文件中配置以下的配置:

spring:
  application:
    name: consul-provider
  cloud:
    consul:
      host: localhost
      port: 8500
      discovery:
        serviceName: consul-provider
      config:
        enabled: true
        format: yaml           
        prefix: config     
        profile-separator: ':'    
        data-key: data           

关于spring.cloud.consul.config的配置项描述如下:

  • enabled 设置config是否启用,默认为true
  • format 设置配置的值的格式,可以yaml和properties
  • prefix 设置配的基本目录,比如config
  • defaultContext 设置默认的配置,被所有的应用读取,本例子没用的
  • profileSeparator profiles配置分隔符,默认为‘,’
  • date-key为应用配置的key名字,值为整个应用配置的字符串。

网页上访问consul的KV存储的管理界面,即http://localhost:8500/ui/dc1/kv,创建一条记录,

key值为:config/consul-provider:dev/data
value值如下:

foo:
  bar: bar1
server:
  port: 8081

51.png

在consul-provider工程新建一个API,该API返回从consul 配置中心读取foo.bar的值,代码如下:

@RestController
public class FooBarController {
    
    @Value("${foo.bar}")
    String fooBar;

    @GetMapping("/foo")
    public String getFooBar() {
        return fooBar;
    }
}

启动工程,可以看到程序的启动端口为8081,即是consul的配置中心配置的server.port端口。
工程启动完成后,在浏览器上访问http://localhost:8081/foo,页面显示bar1。由此可知,应用consul-provider已经成功从consul的配置中心读取了配置foo.bar的配置。

动态刷新配置

当使用spring cloud config作为配置中心的时候,可以使用spring cloud config bus支持动态刷新配置。Spring Cloud Comsul Config默认就支持动态刷新,只需要在需要动态刷新的类上加上@RefreshScope注解即可,修改代码如下:

@RestController
@RefreshScope
public class FooBarController {

    @Value("${foo.bar}")
    String fooBar;

    @GetMapping("/foo")
    public String getFooBar() {
        return fooBar;
    }
}

启动consul-provider工程,在浏览器上访问http://localhost:8081/foo,页面显示bar1。然后
在网页上访问consul的KV存储的管理界面,即http://localhost:8500/ui/dc1/kv,修改config/consul-provider:dev/data的值,修改后的值如下:

foo:
  bar: bar2
server: 
  port: 8081

此时不重新启动consul-provider,在浏览器上访问http://localhost:8081/foo,页面显示bar2。可见foo.bar的最新配置在应用不重启的情况下已经生效。

注意事项

  • consul支持的KV存储的Value值不能超过512KB
  • Consul的dev模式,所有数据都存储在内存中,重启Consul的时候会导致所有数据丢失,在正式的环境中,Consul的数据会持久化,数据不会丢失。

参考资料

https://www.consul.io/intro/i...

https://www.consul.io/docs/in...

https://www.consul.io/intro/v...

http://www.ityouknow.com/spri...

https://springcloud.cc/spring...

https://www.cnblogs.com/lsf90...

https://blog.csdn.net/longgeq...

更多阅读

史上最简单的 SpringCloud 教程汇总

SpringBoot教程汇总

Java面试题系列汇总

查看原文

赞 6 收藏 6 评论 0

Forezp 发布了文章 · 2019-02-12

Spring Cloud Sleuth 之Greenwich版本全攻略

微服务架构是一个分布式架构,微服务系统按业务划分服务单元,一个微服务系统往往有很多个服务单元。由于服务单元数量众多,业务的复杂性较高,如果出现了错误和异常,很难去定位。主要体现在一个请求可能需要调用很多个服务,而内部服务的调用复杂性决定了问题难以定位。所以在微服务架构中,必须实现分布式链路追踪,去跟进一个请求到底有哪些服务参与,参与的顺序又是怎样的,从而达到每个请求的步骤清晰可见,出了问题能够快速定位的目的。

image

在微服务系统中,一个来自用户的请求先到达前端A(如前端界面),然后通过远程调用,到达系统的中间件B、C(如负载均衡、网关等),最后到达后端服务D、E,后端经过一系列的业务逻辑计算,最后将数据返回给用户。对于这样一个请求,经历了这么多个服务,怎么样将它的请求过程用数据记录下来呢?这就需要用到服务链路追踪。

Spring Cloud Sleuth

Spring Cloud Sleuth 为服务之间调用提供链路追踪。通过 Sleuth 可以很清楚的了解到一个服务请求经过了哪些服务,每个服务处理花费了多长。从而让我们可以很方便的理清各微服务间的调用关系。此外 Sleuth 可以帮助我们:

  • 耗时分析: 通过 Sleuth 可以很方便的了解到每个采样请求的耗时,从而分析出哪些服务调用比较耗时;
  • 可视化错误: 对于程序未捕捉的异常,可以通过集成 Zipkin 服务界面上看到;
  • 链路优化: 对于调用比较频繁的服务,可以针对这些服务实施一些优化措施。

Google开源了Dapper链路追踪组件,并在2010年发表了论文《Dapper, a Large-Scale Distributed Systems Tracing Infrastructure》,这篇论文是业内实现链路追踪的标杆和理论基础,具有很高的参考价值。

image

Spring Cloud Sleuth采用了Google的开源项目Dapper的专业术语。

  • Span:基本工作单元,发送一个远程调度任务就会产生一个Span,Span是用一个64位ID唯一标识的,Trace是用另一个64位ID唯一标识的。Span还包含了其他的信息,例如摘要、时间戳事件、Span的ID以及进程ID。
  • Trace:由一系列Span组成的,呈树状结构。请求一个微服务系统的API接口,这个API接口需要调用多个微服务单元,调用每个微服务单元都会产生一个新的Span,所有由这个请求产生的Span组成了这个Trace。
  • Annotation:用于记录一个事件,一些核心注解用于定义一个请求的开始和结束,这些注解如下。

    • cs-Client Sent:客户端发送一个请求,这个注解描述了Span的开始。
    • sr-Server Received:服务端获得请求并准备开始处理它,如果将其sr减去cs时间戳,便可得到网络传输的时间。
    • ss-Server Sent:服务端发送响应,该注解表明请求处理的完成(当请求返回客户端),用ss的时间戳减去sr时间戳,便可以得到服务器请求的时间。
    • cr-Client Received:客户端接收响应,此时Span结束,如果cr的时间戳减去cs时间戳,便可以得到整个请求所消耗的时间。

Spring Cloud Sleuth 也为我们提供了一套完整的链路解决方案,Spring Cloud Sleuth 可以结合 Zipkin,将信息发送到 Zipkin,利用 Zipkin 的存储来存储链路信息,利用 Zipkin UI 来展示数据。

Zipkin

Zipkin是一种分布式链路追踪系统。 它有助于收集解决微服务架构中的延迟问题所需的时序数据。 它管理这些数据的收集和查找。 Zipkin的设计基于Google Dapper论文。

跟踪器存在于应用程序中,记录请求调用的时间和元数据。跟踪器使用库,它们的使用对用户是无感知的。例如,Web服务器会在收到请求时和发送响应时会记录相应的时间和一些元数据。一次完整链路请求所收集的数据被称为Span。

我们可以使用它来收集各个服务器上请求链路的跟踪数据,并通过它提供的 REST API 接口来辅助我们查询跟踪数据以实现对分布式系统的监控程序,从而及时地发现系统中出现的延迟升高问题并找出系统性能瓶颈的根源。除了面向开发的 API 接口之外,它也提供了方便的 UI 组件来帮助我们直观的搜索跟踪信息和分析请求链路明细,比如:可以查询某段时间内各用户请求的处理时间等。
Zipkin 提供了可插拔数据存储方式:In-Memory、MySql、Cassandra 以及 Elasticsearch。接下来的测试为方便直接采用 In-Memory 方式进行存储,生产推荐 Elasticsearch.

image

上图展示了 Zipkin 的基础架构,它主要由 4 个核心组件构成:

  • Collector:收集器组件,它主要用于处理从外部系统发送过来的跟踪信息,将这些信息转换为 Zipkin 内部处理的 Span 格式,以支持后续的存储、分析、展示等功能。
  • Storage:存储组件,它主要对处理收集器接收到的跟踪信息,默认会将这些信息存储在内存中,我们也可以修改此存储策略,通过使用其他存储组件将跟踪信息存储到数据库中。
  • RESTful API:API 组件,它主要用来提供外部访问接口。比如给客户端展示跟踪信息,或是外接系统访问以实现监控等。
  • Web UI:UI 组件,基于 API 组件实现的上层应用。通过 UI 组件用户可以方便而有直观地查询和分析跟踪信息。

案例实战

在本案例一共有三个应用,分别为注册中心,eureka-server、eureka-client、eureka-client-feign,三个应用的基本信息如下:

应用名端口作用
eureka-server8761注册中心
eureka-client8763服务提供者
eureka-client-feign8765服务消费者

其中eureka-server 应用为注册中心,其他两个应用向它注册。eureka-client为服务提供者,提供了一个RESTAPI,eureka-client-feign为服务消费者,通过Feign Client向服务提供者消费服务。

在之前的文章已经讲述了如何如何搭建服务注册中心,在这里就省略这一部分内容。服务提供者提供一个REST接口,服务消费者通过FeignClient消费服务。

服务提供者

eureka-client服务提供者,对外提供一个RESTAPI,并向服务注册中心注册,这部分内容,不再讲述,见源码。需要在工程的pom文件加上sleuth的起步依赖和zipkin的起步依赖,代码如下:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-sleuth</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-zipkin</artifactId>
</dependency>

在工程的配置文件application.yml需要做以下的配置:

spring:
  sleuth:
    web:
      client:
        enabled: true
    sampler:
      probability: 1.0 # 将采样比例设置为 1.0,也就是全部都需要。默认是 0.1
  zipkin:
    base-url: http://localhost:9411/ # 指定了 Zipkin 服务器的地址

其中spring.sleuth.web.client.enable为true设置的是web开启sleuth功能;spring.sleuth.sampler.probability可以设置为小数,最大值为1.0,当设置为1.0时就是链路数据100%收集到zipkin-server,当设置为0.1时,即10%概率收集链路数据;spring.zipkin.base-url设置zipkin-server的地址。

对外提供一个Api,代码如下:


@RestController
public class HiController {

    @Value("${server.port}")
    String port;
    @GetMapping("/hi")
    public String home(@RequestParam String name) {
        return "hi "+name+",i am from port:" +port;
    }

}

服务消费者

服务消费者通过FeignClient消费服务提供者提供的服务。同服务提供者一样,需要在工程的pom文件加上sleuth的起步依赖和zipkin的起步依赖,另外也需要在配置文件application.yml做相关的配置,具体同服务提供者。

服务消费者通过feignClient进行服务消费,feignclient代码如下:


@FeignClient(value = "eureka-client",configuration = FeignConfig.class)
public interface EurekaClientFeign {

    @GetMapping(value = "/hi")
    String sayHiFromClientEureka(@RequestParam(value = "name") String name);
}

servcie层代码如下:

@Service
public class HiService {

    @Autowired
    EurekaClientFeign eurekaClientFeign;

 
    public String sayHi(String name){
        return  eurekaClientFeign.sayHiFromClientEureka(name);
    }
}

controller代码如下:

@RestController
public class HiController {
    @Autowired
    HiService hiService;

    @GetMapping("/hi")
    public String sayHi(@RequestParam( defaultValue = "forezp",required = false)String name){
        return hiService.sayHi(name);
    }

上面的代码对外暴露一个API,通过FeignClient的方式调用eureka-client的服务。

zipkin-server

在Spring Cloud D版本,zipkin-server通过引入依赖的方式构建工程,自从E版本之后,这一方式改变了,采用官方的jar形式启动,所以需要通过下载官方的jar来启动,也通过以下命令一键启动:

curl -sSL https://zipkin.io/quickstart.sh | bash -s
java -jar zipkin.jar

上面的第一行命令会从zipkin官网下载官方的jar包。
如果是window系统,建议使用gitbash执行上面的命令。

如果用 Docker 的话,使用以下命令:


docker run -d -p 9411:9411 openzipkin/zipkin

通过java -jar zipkin.jar的方式启动之后,在浏览器上访问lcoalhost:9411,显示的界面如下:

1.png

链路数据验证

依次启动eureka-server,eureka-client,eureka-client-feign的三个应用,等所有应用启动完成后,在浏览器上访问http://localhost:8765/hi(如果报错,是服务与发现需要一定的时间,耐心等待几十秒),访问成功后,再次在浏览器上访问zipkin-server的页面,显示如下:

2.png

从上图可以看出每次请求所消耗的时间,以及一些span的信息。

3.png

从上图可以看出具体的服务依赖关系,eureka-feign-client依赖了eureka-client。

使用rabbitmq进行链路数据收集

在上面的案例中使用的http请求的方式将链路数据发送给zipkin-server,其实还可以使用rabbitmq的方式进行服务的消费。使用rabbitmq需要安装rabbitmq程序,下载地址http://www.rabbitmq.com/

下载完成后,需要eureka-client和eureka-client-feign的起步依赖加上rabbitmq的依赖,依赖如下:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

在配置文件上需要配置rabbitmq的配置,配置信息如下:

spring:
  rabbitmq:
    host: localhost
    username: guest
    password: guest
    port: 5672

另外需要把spring.zipkin.base-url去掉。

在上面2个工程中,rabbitmq通过发送链路数据,那么zipkin-server是怎么样知道rabbitmq的地址呢,怎么监听收到的链路数据呢?这需要在程序启动的时候,通过环境变量的形式到环境中,然后zikin-server从环境变量中读取。
可配置的属性如下:

属性环境变量描述
zipkin.collector.rabbitmq.addressesRABBIT_ADDRESSES用逗号分隔的 RabbitMQ 地址列表,例如localhost:5672,localhost:5673
zipkin.collector.rabbitmq.passwordRABBIT_PASSWORD连接到 RabbitMQ 时使用的密码,默认为 guest
zipkin.collector.rabbitmq.usernameRABBIT_USER连接到 RabbitMQ 时使用的用户名,默认为guest
zipkin.collector.rabbitmq.virtual-hostRABBIT_VIRTUAL_HOST使用的 RabbitMQ virtual host,默认为 /
zipkin.collector.rabbitmq.use-sslRABBIT_USE_SSL设置为true则用 SSL 的方式与 RabbitMQ 建立链接
zipkin.collector.rabbitmq.concurrencyRABBIT_CONCURRENCY并发消费者数量,默认为1
zipkin.collector.rabbitmq.connection-timeoutRABBIT_CONNECTION_TIMEOUT建立连接时的超时时间,默认为 60000毫秒,即 1 分钟
zipkin.collector.rabbitmq.queueRABBIT_QUEUE从中获取 span 信息的队列,默认为 zipkin

比如,通过以下命令启动:

RABBIT_ADDRESSES=localhost java -jar zipkin.jar

上面的命令等同于一下的命令:

java -jar zipkin.jar --zipkin.collector.rabbitmq.addressed=localhost

用上面的2条命令中的任何一种方式重新启动zipkin-server程序,并重新启动eureka-client、eureka-server、eureka-client-feign,动完成后在浏览器上访问http://localhost:8765/hi,再访问http://localhost:9411/zipkin/,就可以看到通过Http方式发送链路数据一样的接口。

自定义Tag

在页面上可以查看每个请求的traceId,每个trace又包含若干的span,每个span又包含了很多的tag,自定义tag可以通过Tracer这个类来自定义。


@Autowired
Tracer tracer;

 @GetMapping("/hi")
    public String home(@RequestParam String name) {
        tracer.currentSpan().tag("name","forezp");
        return "hi "+name+",i am from port:" +port;
    }

将链路数据存储在Mysql数据库中

上面的例子是将链路数据存在内存中,只要zipkin-server重启之后,之前的链路数据全部查找不到了,zipkin是支持将链路数据存储在mysql、cassandra、elasticsearch中的。
现在讲解如何将链路数据存储在Mysql数据库中。
首先需要初始化zikin存储在Mysql的数据的scheme,可以在这里查看https://github.com/openzipkin...,具体如下:

CREATE TABLE IF NOT EXISTS zipkin_spans (
  `trace_id_high` BIGINT NOT NULL DEFAULT 0 COMMENT 'If non zero, this means the trace uses 128 bit traceIds instead of 64 bit',
  `trace_id` BIGINT NOT NULL,
  `id` BIGINT NOT NULL,
  `name` VARCHAR(255) NOT NULL,
  `parent_id` BIGINT,
  `debug` BIT(1),
  `start_ts` BIGINT COMMENT 'Span.timestamp(): epoch micros used for endTs query and to implement TTL',
  `duration` BIGINT COMMENT 'Span.duration(): micros used for minDuration and maxDuration query'
) ENGINE=InnoDB ROW_FORMAT=COMPRESSED CHARACTER SET=utf8 COLLATE utf8_general_ci;

ALTER TABLE zipkin_spans ADD UNIQUE KEY(`trace_id_high`, `trace_id`, `id`) COMMENT 'ignore insert on duplicate';
ALTER TABLE zipkin_spans ADD INDEX(`trace_id_high`, `trace_id`, `id`) COMMENT 'for joining with zipkin_annotations';
ALTER TABLE zipkin_spans ADD INDEX(`trace_id_high`, `trace_id`) COMMENT 'for getTracesByIds';
ALTER TABLE zipkin_spans ADD INDEX(`name`) COMMENT 'for getTraces and getSpanNames';
ALTER TABLE zipkin_spans ADD INDEX(`start_ts`) COMMENT 'for getTraces ordering and range';

CREATE TABLE IF NOT EXISTS zipkin_annotations (
  `trace_id_high` BIGINT NOT NULL DEFAULT 0 COMMENT 'If non zero, this means the trace uses 128 bit traceIds instead of 64 bit',
  `trace_id` BIGINT NOT NULL COMMENT 'coincides with zipkin_spans.trace_id',
  `span_id` BIGINT NOT NULL COMMENT 'coincides with zipkin_spans.id',
  `a_key` VARCHAR(255) NOT NULL COMMENT 'BinaryAnnotation.key or Annotation.value if type == -1',
  `a_value` BLOB COMMENT 'BinaryAnnotation.value(), which must be smaller than 64KB',
  `a_type` INT NOT NULL COMMENT 'BinaryAnnotation.type() or -1 if Annotation',
  `a_timestamp` BIGINT COMMENT 'Used to implement TTL; Annotation.timestamp or zipkin_spans.timestamp',
  `endpoint_ipv4` INT COMMENT 'Null when Binary/Annotation.endpoint is null',
  `endpoint_ipv6` BINARY(16) COMMENT 'Null when Binary/Annotation.endpoint is null, or no IPv6 address',
  `endpoint_port` SMALLINT COMMENT 'Null when Binary/Annotation.endpoint is null',
  `endpoint_service_name` VARCHAR(255) COMMENT 'Null when Binary/Annotation.endpoint is null'
) ENGINE=InnoDB ROW_FORMAT=COMPRESSED CHARACTER SET=utf8 COLLATE utf8_general_ci;

ALTER TABLE zipkin_annotations ADD UNIQUE KEY(`trace_id_high`, `trace_id`, `span_id`, `a_key`, `a_timestamp`) COMMENT 'Ignore insert on duplicate';
ALTER TABLE zipkin_annotations ADD INDEX(`trace_id_high`, `trace_id`, `span_id`) COMMENT 'for joining with zipkin_spans';
ALTER TABLE zipkin_annotations ADD INDEX(`trace_id_high`, `trace_id`) COMMENT 'for getTraces/ByIds';
ALTER TABLE zipkin_annotations ADD INDEX(`endpoint_service_name`) COMMENT 'for getTraces and getServiceNames';
ALTER TABLE zipkin_annotations ADD INDEX(`a_type`) COMMENT 'for getTraces and autocomplete values';
ALTER TABLE zipkin_annotations ADD INDEX(`a_key`) COMMENT 'for getTraces and autocomplete values';
ALTER TABLE zipkin_annotations ADD INDEX(`trace_id`, `span_id`, `a_key`) COMMENT 'for dependencies job';

CREATE TABLE IF NOT EXISTS zipkin_dependencies (
  `day` DATE NOT NULL,
  `parent` VARCHAR(255) NOT NULL,
  `child` VARCHAR(255) NOT NULL,
  `call_count` BIGINT,
  `error_count` BIGINT
) ENGINE=InnoDB ROW_FORMAT=COMPRESSED CHARACTER SET=utf8 COLLATE utf8_general_ci;

ALTER TABLE zipkin_dependencies ADD UNIQUE KEY(`day`, `parent`, `child`);

在数据库中初始化上面的脚本之后,需要做的就是zipkin-server如何连接数据库。zipkin如何连数据库同连接rabbitmq一样。zipkin连接数据库的属性所对应的环境变量如下:

属性环境变量描述
zipkin.torage.typeSTORAGE_TYPE默认的为mem,即为内存,其他可支持的为cassandra、cassandra3、elasticsearch、mysql
zipkin.torage.mysql.hostMYSQL_HOST数据库的host,默认localhost
zipkin.torage.mysql.portMYSQL_TCP_PORT数据库的端口,默认3306
zipkin.torage.mysql.usernameMYSQL_USER连接数据库的用户名,默认为空
zipkin.torage.mysql.passwordMYSQL_PASS连接数据库的密码,默认为空
zipkin.torage.mysql.dbMYSQL_DBzipkin使用的数据库名,默认是zipkin
zipkin.torage.mysql.max-activeMYSQL_MAX_CONNECTIONS最大连接数,默认是10
STORAGE_TYPE=mysql MYSQL_HOST=localhost MYSQL_TCP_PORT=3306 MYSQL_USER=root MYSQL_PASS=123456 MYSQL_DB=zipkin java -jar zipkin.jar

等同于以下的命令

java -jar zipkin.jar --zipkin.torage.type=mysql --zipkin.torage.mysql.host=localhost --zipkin.torage.mysql.port=3306 --zipkin.torage.mysql.username=root --zipkin.torage.mysql.password=123456

使用上面的命令启动zipkin.jar工程,然后再浏览数上访问http://localhost:8765/hi,再访问http://localhost:9411/zipkin/,可以看到链路数据。这时去数据库查看数据,也是可以看到存储在数据库的链路数据,如下:

微信截图_20190129154520.png

这时重启应用zipkin.jar,再次在浏览器上访问http://localhost:9411/zipkin/,仍然可以得到之前的结果,证明链路数据存储在数据库中,而不是内存中。

将链路数据存在在Elasticsearch中

zipkin-server支持将链路数据存储在ElasticSearch中。读者需要自行安装ElasticSearch和Kibana,下载地址为https://www. elastic.co/products/elasticsearch。安装完成后启动,其中ElasticSearch的默认端口号为9200,Kibana的默认端口号为5601。

同理,zipkin连接elasticsearch也是从环境变量中读取的,elasticsearch相关的环境变量和对应的属性如下:

属性环境变量描述
zipkin.torage.elasticsearch.hostsES_HOSTSES_HOSTS,默认为空
zipkin.torage.elasticsearch.pipelineES_PIPELINEES_PIPELINE,默认为空
zipkin.torage.elasticsearch.max-requestsES_MAX_REQUESTSES_MAX_REQUESTS,默认为64
zipkin.torage.elasticsearch.timeoutES_TIMEOUTES_TIMEOUT,默认为10s
zipkin.torage.elasticsearch.indexES_INDEXES_INDEX,默认是zipkin
zipkin.torage.elasticsearch.date-separatorES_DATE_SEPARATORES_DATE_SEPARATOR,默认为“-”
zipkin.torage.elasticsearch.index-shardsES_INDEX_SHARDSES_INDEX_SHARDS,默认是5
zipkin.torage.elasticsearch.index-replicasES_INDEX_REPLICASES_INDEX_REPLICAS,默认是1
zipkin.torage.elasticsearch.usernameES_USERNAMEES的用户名,默认为空
zipkin.torage.elasticsearch.passwordES_PASSWORDES的密码,默认是为空

采用以下命令启动zipkin-server:


STORAGE_TYPE=elasticsearch ES_HOSTS=http://localhost:9200 ES_INDEX=zipkin java -jar zipkin.jar
java -jar zipkin.jar --STORAGE_TYPE=elasticsearch --ES_HOSTS=http://localhost:9200 --ES_INDEX=zipkin 
java -jar zipkin.jar --STORAGE_TYPE=elasticsearch --ES_HOSTS=http://localhost:9200 --ES_INDEX=zipkin 
java -jar zipkin.jar --zipkin.torage.type=elasticsearch --zipkin.torage.elasticsearch.hosts=http://localhost:9200 --zipkin.torage.elasticsearch.index=zipkin 

启动完成后,然后在浏览数上访问http://localhost:8765/hi,再访问http://localhost:9411/zipkin/,可以看到链路数据。这时链路数据存储在ElasticSearch。

在zipkin上展示链路数据

链路数据存储在ElasticSearch中,ElasticSearch可以和Kibana结合,将链路数据展示在Kibana上。安装完成Kibana后启动,Kibana默认会向本地端口为9200的ElasticSearch读取数据。Kibana默认的端口为5601,访问Kibana的主页http://localhost:5601,其界面如下图所示。

图片1.png

在上图的界面中,单击“Management”按钮,然后单击“Add New”,添加一个index。我们将在上节ElasticSearch中写入链路数据的index配置为“zipkin”,那么在界面填写为“zipkin-*”,单击“Create”按钮,界面如下图所示:

图片2.png

创建完成index后,单击“Discover”,就可以在界面上展示链路数据了,展示界面如下图所示。

图片3.png

参考资料

https://zipkin.io/

https://github.com/spring-clo...

https://cloud.spring.io/sprin...

https://github.com/openzipkin...

https://github.com/openzipkin...

https://windmt.com/2018/04/24...

https://segmentfault.com/a/11...

elatstic 版本为2.6.x,下载地址:https://www.elastic.co/downlo...

http://www.cnblogs.com/JreeyQ...

image

查看原文

赞 8 收藏 8 评论 0

Forezp 分享了头条 · 2019-01-17

『浅入浅出』MySQL 和 InnoDB

赞 0 收藏 1 评论 0

Forezp 分享了头条 · 2019-01-14

Redis 为什么这么快?

赞 0 收藏 1 评论 0

Forezp 发布了文章 · 2019-01-08

Spring Boot Admin 2.1.0 全攻略

转载请标明出处:
https://www.fangzhipeng.com
本文出自方志朋的博客

Spring Boot Admin简介

Spring Boot Admin是一个开源社区项目,用于管理和监控SpringBoot应用程序。 应用程序作为Spring Boot Admin Client向为Spring Boot Admin Server注册(通过HTTP)或使用SpringCloud注册中心(例如Eureka,Consul)发现。 UI是的AngularJs应用程序,展示Spring Boot Admin Client的Actuator端点上的一些监控。常见的功能或者监控如下:

  • 显示健康状况
  • 显示详细信息,例如

    • JVM和内存指标
    • micrometer.io指标
    • 数据源指标
    • 缓存指标
  • 显示构建信息编号
  • 关注并下载日志文件
  • 查看jvm系统和环境属性
  • 查看Spring Boot配置属性
  • 支持Spring Cloud的postable / env-和/ refresh-endpoint
  • 轻松的日志级管理
  • 与JMX-beans交互
  • 查看线程转储
  • 查看http跟踪
  • 查看auditevents
  • 查看http-endpoints
  • 查看计划任务
  • 查看和删除活动会话(使用spring-session)
  • 查看Flyway / Liquibase数据库迁移
  • 下载heapdump
  • 状态变更通知(通过电子邮件,Slack,Hipchat,......)
  • 状态更改的事件日志(非持久性)

快速开始

创建Spring Boot Admin Server

本文的所有工程的Spring Boot版本为2.1.0 、Spring Cloud版本为Finchley.SR2。案例采用Maven多module形式,父pom文件引入以下的依赖(完整的依赖见源码):


    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.0.RELEASE</version>
        <relativePath/>
    </parent>
    
    
     <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>


    <spring-cloud.version>Finchley.SR2</spring-cloud.version>

在工程admin-server引入admin-server的起来依赖和web的起步依赖,代码如下:

<dependency>
    <groupId>de.codecentric</groupId>
    <artifactId>spring-boot-admin-starter-server</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

然后在工程的启动类AdminServerApplication加上@EnableAdminServer注解,开启AdminServer的功能,代码如下:


@SpringBootApplication
@EnableAdminServer
public class AdminServerApplication {

    public static void main(String[] args) {
        SpringApplication.run( AdminServerApplication.class, args );
    }

}

在工程的配置文件application.yml中配置程序名和程序的端口,代码如下:

spring:
  application:
    name: admin-server
server:
  port: 8769

这样Admin Server就创建好了。

创建Spring Boot Admin Client

在admin-client工程的pom文件引入admin-client的起步依赖和web的起步依赖,代码如下:


        <dependency>
            <groupId>de.codecentric</groupId>
            <artifactId>spring-boot-admin-starter-client</artifactId>
            <version>2.1.0</version>
        </dependency>
       
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

在工程的配置文件application.yml中配置应用名和端口信息,以及向admin-server注册的地址为http://localhost:8769,最后暴露自己的actuator的所有端口信息,具体配置如下:

spring:
  application:
    name: admin-client
  boot:
    admin:
      client:
        url: http://localhost:8769
server:
  port: 8768

management:
  endpoints:
    web:
      exposure:
        include: '*'
  endpoint:
    health:
      show-details: ALWAYS

在工程的启动文件如下:


@SpringBootApplication
public class AdminClientApplication {

    public static void main(String[] args) {
        SpringApplication.run( AdminClientApplication.class, args );
    }

一次启动两个工程,在浏览器上输入localhost:8769 ,浏览器显示的界面如下:

21.png

查看wallboard:

22.png

点击wallboard,可以查看admin-client具体的信息,比如内存状态信息:

23.png

也可以查看spring bean的情况:

24.png

更多监控信息,自己体验。

Spring boot Admin结合SC注册中心使用

同上一个案例一样,本案例也是使用的是Spring Boot版本为2.1.0 、Spring Cloud版本为Finchley.SR2。案例采用Maven多module形式,父pom文件引入以下的依赖(完整的依赖见源码),此处省略。

搭建注册中心

注册中心使用Eureka、使用Consul也是可以的,在eureka-server工程中的pom文件中引入:

 <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
</dependency>

配置eureka-server的端口信息,以及defaultZone和防止自注册。最后系统暴露eureka-server的actuator的所有端口。

spring:
  application:
    name: eureka-server
server:
  port: 8761
eureka:
  client:
    service-url:
      defaultZone: http://localhost:8761/eureka
    register-with-eureka: false
    fetch-registry: false
management:
  endpoints:
    web:
      exposure:
        include: "*"
  endpoint:
    health:
      show-details: ALWAYS

在工程的启动文件EurekaServerApplication加上@EnableEurekaServer注解开启Eureka Server.


@SpringBootApplication
@EnableEurekaServer
public class EurekaServerApplication {

    public static void main(String[] args) {
        SpringApplication.run( EurekaServerApplication.class, args );
    }
}

eureka-server搭建完毕。

搭建admin-server

在admin-server工程的pom文件引入admin-server的起步依赖、web的起步依赖、eureka-client的起步依赖,如下:

<dependency>
    <groupId>de.codecentric</groupId>
    <artifactId>spring-boot-admin-starter-server</artifactId>
    <version>2.1.0</version>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
        
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>

然后配置admin-server,应用名、端口信息。并向注册中心注册,注册地址为http://localhost:8761,最后将actuator的所有端口暴露出来,配置如下:


spring:
  application:
    name: admin-server
server:
  port: 8769
eureka:
  client:
    registryFetchIntervalSeconds: 5
    service-url:
      defaultZone: ${EUREKA_SERVICE_URL:http://localhost:8761}/eureka/
  instance:
    leaseRenewalIntervalInSeconds: 10
    health-check-url-path: /actuator/health

management:
  endpoints:
    web:
      exposure:
        include: "*"
  endpoint:
    health:
      show-details: ALWAYS

在工程的启动类AdminServerApplication加上@EnableAdminServer注解,开启admin server的功能,加上@EnableDiscoveryClient注解开启eurke client的功能。


@SpringBootApplication
@EnableAdminServer
@EnableDiscoveryClient
public class AdminServerApplication {

    public static void main(String[] args) {
        SpringApplication.run( AdminServerApplication.class, args );
    }

}

搭建admin-client

在admin-client的pom文件引入以下的依赖,由于2.1.0采用webflux,引入webflux的起步依赖,引入eureka-client的起步依赖,并引用actuator的起步依赖如下:


 <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>

在工程的配置文件配置应用名、端口、向注册中心注册的地址,以及暴露actuator的所有端口。


spring:
  application:
    name: admin-client
eureka:
  instance:
    leaseRenewalIntervalInSeconds: 10
    health-check-url-path: /actuator/health

  client:
    registryFetchIntervalSeconds: 5
    service-url:
      defaultZone: ${EUREKA_SERVICE_URL:http://localhost:8761}/eureka/
management:
  endpoints:
    web:
      exposure:
        include: "*"
  endpoint:
    health:
      show-details: ALWAYS
server:
  port: 8762

在启动类加上@EnableDiscoveryClie注解,开启DiscoveryClient的功能。

@SpringBootApplication
@EnableDiscoveryClient
public class AdminClientApplication {

    public static void main(String[] args) {
        SpringApplication.run( AdminClientApplication.class, args );
    }
}

一次启动三个工程,在浏览器上访问localhost:8769,浏览器会显示和上一小节一样的界面。

31.png

集成spring security

在2.1.0版本中去掉了hystrix dashboard,登录界面默认集成到了spring security模块,只要加上spring security就集成了登录模块。

只需要改变下admin-server工程,需要在admin-server工程的pom文件引入以下的依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-security</artifactId>
</dependency>

在admin-server工的配置文件application.yml中配置spring security的用户名和密码,这时需要在服务注册时带上metadata-map的信息,如下:

spring:
  security:
    user:
      name: "admin"
      password: "admin"
      
eureka:
  instance:
    metadata-map:
      user.name: ${spring.security.user.name}
      user.password: ${spring.security.user.password}

写一个配置类SecuritySecureConfig继承WebSecurityConfigurerAdapter,配置如下:


@Configuration
public class SecuritySecureConfig extends WebSecurityConfigurerAdapter {

    private final String adminContextPath;

    public SecuritySecureConfig(AdminServerProperties adminServerProperties) {
        this.adminContextPath = adminServerProperties.getContextPath();
    }

    @Override
    protected void configure(HttpSecurity http) throws Exception {
        // @formatter:off
        SavedRequestAwareAuthenticationSuccessHandler successHandler = new SavedRequestAwareAuthenticationSuccessHandler();
        successHandler.setTargetUrlParameter( "redirectTo" );

        http.authorizeRequests()
                .antMatchers( adminContextPath + "/assets/**" ).permitAll()
                .antMatchers( adminContextPath + "/login" ).permitAll()
                .anyRequest().authenticated()
                .and()
                .formLogin().loginPage( adminContextPath + "/login" ).successHandler( successHandler ).and()
                .logout().logoutUrl( adminContextPath + "/logout" ).and()
                .httpBasic().and()
                .csrf().disable();
        // @formatter:on
    }
}

重启启动工程,在浏览器上访问:http://localhost:8769/,会被重定向到登录界面,登录的用户名和密码为配置文件中配置的,分别为admin和admin,界面显示如下:

32.png

集成邮箱报警功能

在spring boot admin中,也可以集成邮箱报警功能,比如服务不健康了、下线了,都可以给指定邮箱发送邮件。集成非常简单,只需要改造下admin-server即可:

在admin-server工程Pom文件,加上mail的起步依赖,代码如下:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-mail</artifactId>
</dependency>

在配置文件application.yml文件中,需要配置邮件相关的配置,如下:

spring.mail.host: smtp.163.com
spring.mail.username: miles02
spring.mail.password:
spring.boot.admin.notify.mail.to: 124746406@qq.com

做完以上配置后,当我们已注册的客户端的状态从 UP 变为 OFFLINE 或其他状态,服务端就会自动将电子邮件发送到上面配置的地址。

源码下载

快速开始: https://github.com/forezp/Spr...

和spring cloud结合:https://github.com/forezp/Spr...

参考资料

http://codecentric.github.io/...

https://github.com/codecentri...

更多阅读

史上最简单的 SpringCloud 教程汇总

SpringBoot教程汇总

Java面试题系列汇总
<div>

<p align="center">
    <img data-original="https://www.fangzhipeng.com/img/avatar.jpg" width="258" height="258"/>
    <br>
    扫一扫,支持下作者吧
</p>
<p align="center" style="margin-top: 15px; font-size: 11px;color: #cc0000;">
    <strong>(转载本站文章请注明作者和出处 <a href="https://www.fangzhipeng.com">方志朋的博客</a>)</strong>
</p>

</div>

查看原文

赞 22 收藏 16 评论 2

认证与成就

  • 获得 125 次点赞
  • 获得 1 枚徽章 获得 0 枚金徽章, 获得 0 枚银徽章, 获得 1 枚铜徽章

擅长技能
编辑

开源项目 & 著作
编辑

(゚∀゚ )
暂时没有

注册于 2017-04-08
个人主页被 1.1k 人浏览