青芒

青芒 查看完整档案

杭州编辑中南民族大学工商学院  |  计算机应用 编辑有赞科技有限公司  |  工程师 编辑填写个人主网站
编辑

java骚年

个人动态

青芒 关注了专栏 · 1月14日

一看就懂一学就会的前端技术

用最通俗的语言,最实际的例子介绍前端新技术

关注 601

青芒 赞了文章 · 2018-06-05

三种技术的融合

图片描述

搜索引擎技术,分析数据库技术,分布式计算引擎技术这三股力量正在快速地彼此融合。举例证如下

Hive

Hive一开始只是用sql的方式描述map/reduce的逻辑,是一个典型的分布式计算引擎。这是分布式计算引擎向OLAP方向靠拢的第一步。

Hive+Index

Hive推出不久就被发现,虽然用的SQL但是性能离数据库还差很远。很快就有人提出是不是要给Hive加上数据库一样的索引。这明显就是分布式计算引擎向分析数据库的方向靠拢。

Parquet

Parquet是一种列式文件,用于加速hive/impala这样的分布式计算引擎的查询速度。使用 parquet 加上了索引的 hive/impala/spark 这些已经很难说与 OLAP 数据库的差别是什么了。

Kylin/Presto

这些Hive的衍生物直接上来就是瞄着OLAP去的。各种sql on hadoop的方案。

Elasticsearch

另外一个方向的融合是搜索引擎技术快速地向OLAP融合。Elasticsearch公司更名为了Elastic,因为越来越多的人开始用Elasticsearch不是search,而是analytics,也就是跑SQL。
Elasticsearch底层的Lucene引入了DocValues之后,数据可以按列存储(和parquet一样),使得Elasticsearch几乎可以当成一个列式数据库来使用了。
另外Elasticsearch在Lucene的基础上大幅加强了Aggregation的功能,利用其冗长但是强大的aggregation dsl可以表达出比SQL还要复杂的聚合逻辑。
腾讯的Hermes数据库(http://data.qq.com/article?id=817)就是基于Lucene/Solr实现的分析型数据库

Crate.io

因为Elasticsearch性能实在太出众了,但是dsl接口不好使。有人拿Elasticsearch做为底层,上层封装了一个SQL接口,从何正式变成了一种数据库,叫 http://crate.io

Groonga

http://groonga.org/docs/characteristic.html
日本人写了一个搜索引擎,而这个搜索引擎同时还可以作为mysql可插拔的存储引擎使用,从而把mysql变成一种支持全文检索的列式数据库。

Spark on Elasticsearch/RDBMS

一个更加有趣的方向是Spark开始和OLAP数据库和Elasticsearch勾搭在一起。利用把Elasticsearch查询映射成Spark的RDD,可以把一条SQL的where部分放在Elasticsearch里分布式执行(所谓filter push down优化),然后把分布式的group by 和 projection 由Spark来完成。

融合

这三个技术各自有独自看重的内在实现方式
* 搜索引擎:重点是inverted index,索引的压缩存储和高效检索
* 分析数据库:重点是column oriented storage,利用列式存储快速地在查询时暴力扫描
* 分布式计算引擎:从一开始就是map reduce,关注的是分区和分布式执行

实际上三家是从不同的角度切入了同一个问题。不过这已经不是一招鲜的时代了。一个好的搜索引擎需要inverted index/column oriented storage/map reduce,三者都要。一个好的OLAP也是inverted index/column oriented storage/map reduce三个都要的。
目前从趋势上来看风头最火的是 Elasticsearch,最佳的组合是 Spark + Elasticsearch。
最科幻的未来组合是把Spark + Elasticsearch 做深度的整合,去掉 Elasticsearch 自己的分布式层,完全靠 Spark 做分布式计算。要是能再配备一个实时计算管道作为灵活的入库渠道和物化视图就更牛x了。

查看原文

赞 10 收藏 38 评论 1

青芒 发布了文章 · 2018-05-24

Dubbo剖析-集群容错

本篇主要对dubbo集群容错进行剖析,主要下面几个模块

  1. cluster容错方案
  2. Directory目录服务
  3. route 路由解析
  4. loadBalance 软负载均衡

一、调用链路


二、容错方案


集群模式的配置

<dubbo:service cluster="failsafe" /> 服务提供方
<dubbo:reference cluster="failsafe" /> 服务消费方

集群容错实现

接口类 com.alibaba.dubbo.rpc.cluster.Cluster
Cluster实现类

1.AvailableCluster
获取可用的调用。遍历所有Invokers判断Invoker.isAvalible,只要一个有为true直接调用返回,不管成不成功

2.BroadcastCluster
广播调用。遍历所有Invokers, 逐个调用每个调用catch住异常不影响其他invoker调用

3.FailbackCluster
失败自动恢复, 对于invoker调用失败, 后台记录失败请求,任务定时重发, 通常用于通知

//FailbackClusterInvoker
//记录失败的调用
private final ConcurrentMap<Invocation, AbstractClusterInvoker<?>> failed = new ConcurrentHashMap<Invocation, AbstractClusterInvoker<?>>();

protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        try {
            checkInvokers(invokers, invocation);
            Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
            return invoker.invoke(invocation);
        } catch (Throwable e) {
            //失败后调用 addFailed
            addFailed(invocation, this);
            return new RpcResult(); // ignore
        }
    }

private void addFailed(Invocation invocation, AbstractClusterInvoker<?> router) {
    if (retryFuture == null) {
        synchronized (this) {
            if (retryFuture == null) {
                retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {

                    public void run() {
                        // 收集统计信息
                        try {
                            retryFailed();
                        } catch (Throwable t) { // 防御性容错
                            logger.error("Unexpected error occur at collect statistic", t);
                        }
                    }
                }, RETRY_FAILED_PERIOD, RETRY_FAILED_PERIOD, TimeUnit.MILLISECONDS);
            }
        }
    }
    failed.put(invocation, router);
}

//失败的进行重试,重试成功后移除当前map
void retryFailed() {
        if (failed.size() == 0) {
            return;
        }
        for (Map.Entry<Invocation, AbstractClusterInvoker<?>> entry : new HashMap<Invocation, AbstractClusterInvoker<?>>(
                failed).entrySet()) {
            Invocation invocation = entry.getKey();
            Invoker<?> invoker = entry.getValue();
            try {
                invoker.invoke(invocation);
                failed.remove(invocation);
            } catch (Throwable e) {
                logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", e);
            }
        }
    }

4.FailfastCluster
快速失败,只发起一次调用,失败立即保错,通常用于非幂等性操作

5.FailoverCluster default
失败转移,当出现失败,重试其它服务器,通常用于读操作,但重试会带来更长延迟
(1) 目录服务directory.list(invocation) 列出方法的所有可调用服务
获取重试次数,默认重试两次

int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;

(2) 根据LoadBalance负载策略选择一个Invoker
(3) 执行invoker.invoke(invocation)调用
(4) 调用成功返回
调用失败小于重试次数,重新执行从3)步骤开始执行,调用次数大于等于重试次数抛出调用失败异常

6.FailsafeCluster
失败安全,出现异常时,直接忽略,通常用于写入审计日志等操作。

7.ForkingCluster
并行调用,只要一个成功即返回,通常用于实时性要求较高的操作,但需要浪费更多服务资源。

注:
还有 MergeableCluster 和 MockClusterWrapper策略,但是个人没有用过所以就不说了

三、Directory目录服务


1. StaticDirectory

静态目录服务, 它的所有Invoker通过构造函数传入, 服务消费方引用服务的时候, 服务对多注册中心的引用,将Invokers集合直接传入 StaticDirectory构造器

public StaticDirectory(URL url, List<Invoker<T>> invokers, List<Router> routers) {
    super(url == null && invokers != null && invokers.size() > 0 ? invokers.get(0).getUrl() : url, routers);
    if (invokers == null || invokers.size() == 0)
        throw new IllegalArgumentException("invokers == null");
    this.invokers = invokers;
}

StaticDirectory的list方法直接返回所有invoker集合

@Override
protected List<Invoker<T>> doList(Invocation invocation) throws RpcException {
    return invokers;
}

2. RegistryDirectory

注册目录服务, 它的Invoker集合是从注册中心获取的, 它实现了NotifyListener接口实现了回调接口notify(List<Url>)。

比如消费方要调用某远程服务,会向注册中心订阅这个服务的所有服务提供方,订阅时和服务提供方数据有变动时回调消费方的NotifyListener服务的notify方法NotifyListener.notify(List<Url>) 回调接口传入所有服务的提供方的url地址然后将urls转化为invokers, 也就是refer应用远程服务到此时引用某个远程服务的RegistryDirectory中有对这个远程服务调用的所有invokers。

RegistryDirectory.list(invocation)就是根据服务调用方法获取所有的远程服务引用的invoker执行对象

四、服务路由


dubbo路由功能貌似用的不多,目的主要是对已注册的服务进行过滤,比如只能调用某些配置的服务,或者禁用某些服务。

1. ConditionRouter条件路由

dubbo-admin 后台进行配置。

路由代码入口

public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation)
            throws RpcException {
    if (invokers == null || invokers.size() == 0) {
        return invokers;
    }
    try {
        if (!matchWhen(url, invocation)) {
            return invokers;
        }
        List<Invoker<T>> result = new ArrayList<Invoker<T>>();
        if (thenCondition == null) {
            logger.warn("The current consumer in the service blacklist. consumer: " + NetUtils.getLocalHost() + ", service: " + url.getServiceKey());
            return result;
        }
    .............................

2. ScriptRouter脚本路由

按照dubbo脚本规则进行编写,程序识别

五、软负载均衡


1. RandomLoadBalance default

随机,按权重设置随机概率。权重default=100
在一个截面上碰撞的概率高,但调用量越大分布越均匀,而且按概率使用权重后也比较均匀,有利于动态调整提供者权重。

    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        int length = invokers.size(); // 总个数
        int totalWeight = 0; // 总权重
        boolean sameWeight = true; // 权重是否都一样
        for (int i = 0; i < length; i++) {
            int weight = getWeight(invokers.get(i), invocation);
            totalWeight += weight; // 累计总权重
            if (sameWeight && i > 0
                    && weight != getWeight(invokers.get(i - 1), invocation)) {
                sameWeight = false; // 计算所有权重是否一样
            }
        }
        if (totalWeight > 0 && !sameWeight) {
            // 如果权重不相同且权重大于0则按总权重数随机
            int offset = random.nextInt(totalWeight);
            // 并确定随机值落在哪个片断上
            for (int i = 0; i < length; i++) {
                offset -= getWeight(invokers.get(i), invocation);
                if (offset < 0) {
                    return invokers.get(i);
                }
            }
        }
        // 如果权重相同或权重为0则均等随机
        return invokers.get(random.nextInt(length));
    }
算法含义
如果所有的服务权重都一样,就采用总服务数进行随机。如果权重不一样,则按照权重出随机数,然后用随机数减去服务权重,结果为负数则使用当前循环的服务。其实也就是一个概率性问题 每个服务的概率就是 当前服务的权重/ 总服务权重

2. RoundRobinLoadBalance

轮循,按公约后的权重设置轮循比率。
存在慢的提供者累积请求的问题,比如:第二台机器很慢,但没挂,当请求调到第二台时就卡在那,久而久之,所有请求都卡在调到第二台上。

该负载算法维护着一个方法调用顺序计数

private final ConcurrentMap<String, AtomicPositiveInteger> sequences = new ConcurrentHashMap<String, AtomicPositiveInteger>();

以方法名作为key

轮循分为 普通轮询和加权轮询。权重一样时,采用取模运算普通轮询,反之加权轮询。

下面看下具体的实现
RoundRobinLoadBalance#doSelect

i.普通轮询

AtomicPositiveInteger sequence = sequences.get(key);
if (sequence == null) {
    sequences.putIfAbsent(key, new AtomicPositiveInteger());
    sequence = sequences.get(key);
}
//获取本次调用的服务器序号,并+1
int currentSequence = sequence.getAndIncrement();

//当前序号和服务总数取模
return invokers.get(currentSequence % length);

ii.加权轮询
下面贴下核心实现代码。注意几个变量

weightSum = 服务权重之和
invokerToWeightMap = 权重>0的 invoker map
int currentSequence = sequence.getAndIncrement();
if (maxWeight > 0 && minWeight < maxWeight) { // 权重不一样

    // mod < weightSum,下面for循环进行weight递减,weight大的服务被调用的概率大
    int mod = currentSequence % weightSum;
    for (int i = 0; i < maxWeight; i++) {
        for (Map.Entry<Invoker<T>, IntegerWrapper> each : invokerToWeightMap.entrySet()) {
            final Invoker<T> k = each.getKey();
            final IntegerWrapper v = each.getValue();
            if (mod == 0 && v.getValue() > 0) {
                return k;
            }
            if (v.getValue() > 0) {
                v.decrement();
                mod--;
            }
        }
    }
}

可以举个例子
两个服务 A 和 B,权重分别是1和2
那么 mod=[0,1,2],经过上面的逻辑,调用概率是 A B B A B B A B B ..... 显然B的概率更大一些

3. LeastActiveLoadBalance

最少活跃调用数优先,活跃数指调用前后计数差。使慢的提供者收到更少请求,因为越慢的提供者的调用前后计数差会越大。

每个服务有一个活跃计数器,我们假如有A,B两个提供者.计数均为0.当A提供者开始处理请求,该计数+1,此时A还没处理完,当处理完后则计数-1.而B请求接收到请求处理得很快.B处理完后A还没处理完,所以此时A,B的计数为1,0.那么当有新的请求来的时候,就会选择B提供者(B的活跃计数比A小).这就是文档说的,使慢的提供者收到更少请求。

int leastCount = 0; // 相同最小活跃数的个数
int[] leastIndexs = new int[length]; // 相同最小活跃数的下标

i.最小活跃服务个数=1, 该服务优先

if (leastCount == 1) {
    // 如果只有一个最小则直接返回
    return invokers.get(leastIndexs[0]);
}

ii.最小活跃服务个数>1, 最小活跃的服务按照权重随机

if (!sameWeight && totalWeight > 0) {
    // 如果权重不相同且权重大于0则按总权重数随机
    int offsetWeight = random.nextInt(totalWeight);
    // 并确定随机值落在哪个片断上
    for (int i = 0; i < leastCount; i++) {
        int leastIndex = leastIndexs[i];
        //权重越大,offsetWeight越快减成负数
        offsetWeight -= getWeight(invokers.get(leastIndex), invocation);
        if (offsetWeight <= 0)
            return invokers.get(leastIndex);
    }
}

iii. 最小活跃服务个数>1, 权重相同,服务个数随机

// 如果权重相同或权重为0则均等随机
return invokers.get(leastIndexs[random.nextInt(leastCount)]);

4. ConsistentHashLoadBalance

  • 一致性 Hash,相同参数的请求总是发到同一提供者。
  • 当某一台提供者挂时,原本发往该提供者的请求,基于虚拟节点,平摊到其它提供者,不会引起剧烈变动。
  • 算法参见:http://en.wikipedia.org/wiki/Consistent_hashing
  • 缺省只对第一个参数 Hash,如果要修改,请配置 <dubbo:parameter key="hash.arguments" value="0,1" />
  • 缺省用 160 份虚拟节点,如果要修改,请配置 <dubbo:parameter key="hash.nodes" value="320" />
配置样例
<dubbo:reference id="demoService" interface="com.youzan.dubbo.api.DemoService" loadbalance="consistenthash">
    <!--缺省只对第一个参数 Hash-->
    <dubbo:parameter key="hash.arguments" value="0,1" />
    <!--缺省用 160 份虚拟节点,-->
    <dubbo:parameter key="hash.nodes" value="160" />
</dubbo:reference>
算法解析

ConsistentHashLoadBalance为使用该算法的服务维护了一个selectors,

key=invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName()
eg: com.youzan.dubbo.api.DemoService.sayHello
#com.alibaba.dubbo.rpc.cluster.loadbalance.ConsistentHashLoadBalance

private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>();

@SuppressWarnings("unchecked")
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
    int identityHashCode = System.identityHashCode(invokers);

    //获取该服务的ConsistentHashSelector,并跟进本次调用获取对应invoker
    ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
    if (selector == null || selector.getIdentityHashCode() != identityHashCode) {
        selectors.put(key, new ConsistentHashSelector<T>(invokers, invocation.getMethodName(), identityHashCode));
        selector = (ConsistentHashSelector<T>) selectors.get(key);
    }
    return selector.select(invocation);
}

ConsistentHashSelector作为ConsistentHashLoadBalance的内部类, 就是具体的一致性hash实现。

  • ConsistentHashSelector内部元素
#com.alibaba.dubbo.rpc.cluster.loadbalance.ConsistentHashLoadBalance.ConsistentHashSelector

//该服务的所有hash节点
private final TreeMap<Long, Invoker<T>> virtualInvokers;
//虚拟节点数量
private final int replicaNumber;
//该服务的唯一hashcode,通过System.identityHashCode(invokers)获取
private final int identityHashCode;
  • 如何构建该服务的虚拟节点?
public ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
    // 创建TreeMap 来保存结点
    this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
    // 生成调用结点HashCode
    this.identityHashCode = System.identityHashCode(invokers);
    // 获取Url 
    //dubbo://192.168.0.4:20880/com.youzan.dubbo.api.DemoService?anyhost=true&application=consumer-of-helloworld-app&check=false&class=com.youzan.dubbo.provider.DemoServiceImpl&dubbo=2.5.4&generic=false&hash.arguments=0,1&hash.nodes=160&interface=com.youzan.dubbo.api.DemoService&loadbalance=consistenthash&methods=sayHello&pid=32710&side=consumer&timestamp=1527383363936
    URL url = invokers.get(0).getUrl();
    // 获取所配置的结点数,如没有设置则使用默认值160
    this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160);
    // 获取需要进行hash的参数数组索引,默认对第一个参数进行hash
    String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0"));
    argumentIndex = new int[index.length];
    for (int i = 0; i < index.length; i ++) {
        argumentIndex[i] = Integer.parseInt(index[i]);
    }
    // 创建虚拟结点
    // 对每个invoker生成replicaNumber个虚拟结点,并存放于TreeMap中
    for (Invoker<T> invoker : invokers) {
        for (int i = 0; i < replicaNumber / 4; i++) {
            // 根据md5算法为每4个结点生成一个消息摘要,摘要长为16字节128位。
            byte[] digest = md5(invoker.getUrl().toFullString() + i);
            // 随后将128位分为4部分,0-31,32-63,64-95,95-128,并生成4个32位数,存于long中,long的高32位都为0
            // 并作为虚拟结点的key。
            for (int h = 0; h < 4; h++) {
                long m = hash(digest, h);
                virtualInvokers.put(m, invoker);
            }
        }
    }
}

代码如果看的不是很懂,也不用去深究了(我就没看懂,瞻仰了网上大神的文章贴了帖注释),大家可以就粗略的认为,这段代码就是尽可能的构建出散列均匀的服务hash表。

  • 如何从virtualInvokers选取本次调用的invoker?
// 选择invoker
public Invoker<T> select(Invocation invocation) {
    // 根据调用参数来生成Key
    String key = toKey(invocation.getArguments());
    // 根据这个参数生成消息摘要
    byte[] digest = md5(key);
    //调用hash(digest, 0),将消息摘要转换为hashCode,这里仅取0-31位来生成HashCode
    //调用sekectForKey方法选择结点。
    Invoker<T> invoker = sekectForKey(hash(digest, 0));
    return invoker;
}

private String toKey(Object[] args) {
    StringBuilder buf = new StringBuilder();
    // 由于hash.arguments没有进行配置,因为只取方法的第1个参数作为key
    for (int i : argumentIndex) {
        if (i >= 0 && i < args.length) {
            buf.append(args[i]);
        }
    }
    return buf.toString();
}

//根据hashCode选择结点
private Invoker<T> sekectForKey(long hash) {
    Invoker<T> invoker;
    Long key = hash;
    // 若HashCode直接与某个虚拟结点的key一样,则直接返回该结点
    if (!virtualInvokers.containsKey(key)) {
        // 若不一致,找到一个比传入的key大的第一个结点。
        SortedMap<Long, Invoker<T>> tailMap = virtualInvokers.tailMap(key);
        // 若不存在,那么选择treeMap中第一个结点
        // 使用TreeMap的firstKey方法,来选择最小上界。
        if (tailMap.isEmpty()) {
            key = virtualInvokers.firstKey();
        } else {
           // 若存在则返回
            key = tailMap.firstKey();
        }
    }
    invoker = virtualInvokers.get(key);
    return invoker;
}
  • 一致性hash环是什么东东?和上面的算法什么关系?

ConsistentHashSelector.virtualInvokers这个东西就是我们的服务hash节点,单纯的从数据结构上的确看不到什么环状的存在,可以先示意下,当前的数据结构
selector结构

virtualInvokers

我们的服务节点只是一个普通的 map数据存储而已,如何形成环呢?其实所谓的环只是逻辑上的展现,ConsistentHashSelector.sekectForKey()方法里通过 TreeMap.tailMap()、TreeMap.tailMap().firstKey、TreeMap.tailMap().firstKey() 结合case实现了环状逻辑。下面我们画图说话。

第一步原始数据结构,我们按照hash从小到大排列

A,B,C表示我们提供的服务,改示意图假设服务节点散列均匀

第二步选择服务节点

i. 假设本地调用得到的key=2120, 代码逻辑(指ConsistentHashSelector.sekectForKey)走到tailMap.firstKey()

那么读取到 3986 A服务

ii.假设本地调用得到的key=9991, tailMap为空,逻辑走到 virtualInvokers.firstKey() 回到起点

读取到 1579 A服务

上述两部情况基本已经能够描述清楚节点的选择逻辑,至于hash直接命中,那么读取对应的服务即可,无需多讲。

最后环状形成
上面两部的介绍已经描述hash算法,那么我们所谓的环状是怎么一回事呢?其实也就是为了方便更好的理解这个逻辑,我们将线性的hash排列作为环状,然后hash的选择按照顺时针方向选择节点(等价于上面hash比较大小)
环状示意
节点选择算法与上面等价,本图主要用来示意,理想的hash环hash差距应该是等差,均匀的排列。

参考:
https://blog.csdn.net/column/details/learningdubbo.html?&page=1
https://blog.csdn.net/revivedsun/article/details/71022871
https://www.jianshu.com/p/53feb7f5f5d9

查看原文

赞 2 收藏 1 评论 0

青芒 赞了文章 · 2018-05-08

Dubbo SPI机制和IOC

SPI机制

SPI,即(service provider interface)机制,有很多组件的实现,如日志、数据库访问等都是采用这样的方式,一般通用组件为了提升可扩展性,基于接口编程,将操作接口形成标准规范,但是可以开放多种扩展实现,这种做法也符合开闭设计原则,使组件具有可插拨特性。不同的厂商或组织可以基于规范推出自己的实现,只需要在自己的jar包中通过配置文件和相应的实现类即可以实现扩展。甚至开发者自己也可以很方便对框架进行定制化实现。

JDK SPI介绍

JDK实现spi服务查找: ServiceLoader。
举个例子:
首先定义下示例接口

package com.example;

public interface Spi {

       booleanisSupport(String name);

       String sayHello();

}

ServiceLoader会遍历所有jar查找META-INF/services/com.example.Spi文件

A厂商提供实现

package com.a.example;

public class SpiAImpl implements Spi {

       publicboolean isSupport(String name) {

              return"SPIA".equalsIgnoreCase(name.trim()); 

}

public String syaHello() {

       return “hello 我是厂商A”;

}

}

在A厂商提供的jar包中的META-INF/services/com.example.Spi文件内容为:

com.a.example.SpiAImpl #厂商A的spi实现全路径类名

B厂商提供实现

package com.b.example;

public class SpiBImpl implements Spi {

       publicboolean isSupport(String name) {

              return"SPIB".equalsIgnoreCase(name.trim()); 

}

public String syaHello() {

       return “hello 我是厂商B”;

}

}

在B厂商提供的jar包中的META-INF/services/com.example.Spi文件内容为:

com.b.example.SpiBImpl #厂商B的spi实现全路径类名

ServiceLoader.load(Spi.class)读取厂商A、B提供jar包中的文件,ServiceLoader实现了Iterable接口可通过while for循环语句遍历出所有实现。

一个接口多种实现,就如策略模式一样提供了策略的实现,但是没有提供策略的选择, 使用方可以根据isSupport方法根据业务传入厂商名来选择具体的厂商。

public class SpiFactory {

       //读取配置获取所有实现

       privatestatic ServiceLoader spiLoader = ServiceLoader.load(Spi.class);

       //根据名字选取对应实现

       publicstatic Spi getSpi(String name) {

              for(Spi spi : spiLoader) {

                     if(spi.isSupport(name) ) {

                            returnspi;

                     }

              }

              returnnull;

}

}

Duddo SPI

Dubbo 改进了 JDK 标准的 SPI 的以下问题:
  • JDK 标准的 SPI 会一次性实例化扩展点所有实现,如果有扩展实现初始化很耗时,但如果没用上也加载,会很浪费资源。
  • 如果扩展点加载失败,连扩展点的名称都拿不到了。
  • 增加了对扩展点 IoC 和 AOP 的支持,一个扩展点可以直接 setter 注入其它扩展点。

示例

在扩展类的jar包内,放置扩展点配置文件 META-INF/dubbo/接口全限定名,内容为:配置名=扩展实现类全限定名,多个实现类用换行符分隔。

以扩展 Dubbo 的协议为例,在协议的实现 jar 包内放置文本文件:META-INF/dubbo/com.alibaba.dubbo.rpc.Protocol,内容为:

xxx=com.alibaba.xxx.XxxProtocol

实现类内容:

package com.alibaba.xxx;

import com.alibaba.dubbo.rpc.Protocol;

public class XxxProtocol implemenets Protocol { 
    // ...
}

ExtensionLoad

dubbo扩展机制的实现核心类是ExtensionLoad,几乎所有扩展实现都在这个类里面。每个可扩展接口的扩展实现类和实现实例的都管理通过是ExtensionLoad进行,每个接口维护一个单例的ExtensionLoad,所有可扩展接口的实现都维护在ExtensionLoad中,如下所示:

/**
 * SPI 类和ExtensionLoader映射
 */
private static final ConcurrentMap<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS = new ConcurrentHashMap<Class<?>, ExtensionLoader<?>>();

在单例模式中,最典型的实现就是通过私有构造方法实现的:

private ExtensionLoader(Class<?> type) {
        this.type = type;
        objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
    }

在dubbo扩展点实现过程中,有几个重要的特性需要提前了解一下:

扩展点自动包装

自动包装扩展点的 Wrapper 类。ExtensionLoader 在加载扩展点时,如果加载到的扩展点有拷贝构造函数,则判定为扩展点 Wrapper 类。

Wrapper类内容:

package com.alibaba.xxx;

import com.alibaba.dubbo.rpc.Protocol;

public class XxxProtocolWrapper implemenets Protocol {
    Protocol impl;

    public XxxProtocol(Protocol protocol) { impl = protocol; }

    // 接口方法做一个操作后,再调用extension的方法
    public void refer() {
        //... 一些操作
        impl.refer();
        // ... 一些操作
    }

    // ...
}

Wrapper 类同样实现了扩展点接口,但是 Wrapper 不是扩展点的真正实现。它的用途主要是用于从 ExtensionLoader 返回扩展点时,包装在真正的扩展点实现外。即从 ExtensionLoader 中返回的实际上是 Wrapper 类的实例,Wrapper 持有了实际的扩展点实现类。这个是典型的装饰者模式,即真正的实现类是被包装在Wrapper之中,Wrapper类还做一些其它事情。

扩展点自动装配

加载扩展点时,自动注入依赖的扩展点。加载扩展点时,扩展点实现类的成员如果为其它扩展点类型,ExtensionLoader 在会自动注入依赖的扩展点。ExtensionLoader 通过扫描扩展点实现类的所有 setter 方法来判定其成员。即 ExtensionLoader 会执行扩展点的拼装操作。这个类似于Spring的IOC,后面会专门介绍。

扩展点自适应

在调用过程,自动选择一个扩展实现执行,一个扩展点只允许有一个自适应实现。dubbo通过@Adaptive注解标定自适应实现,这个注解可以在实现类上,也可以在方法上。比如ExtensionFactory的自适应实现就是通过在实现类AdaptiveExtensionFactory上加@Adaptive注解实现的:

@Adaptive
public class AdaptiveExtensionFactory implements ExtensionFactory {
   ...
}

如 Cluster就是通过在方法加@Adaptive实现的:

@SPI(FailoverCluster.NAME)
public interface Cluster {

    /**
     * Merge the directory invokers to a virtual invoker.
     * 
     * @param <T>
     * @param directory
     * @return cluster invoker
     * @throws RpcException
     */
    @Adaptive
    <T> Invoker<T> join(Directory<T> directory) throws RpcException;

}

这两种方式的自适应扩展类的实现方式也不同,在类上加注解是通过在实现上标识该类为自适应实现类,而在方法上加注解的,是通过动态代码生成自适应实现类。

扩展点自动激活

对于集合类扩展点,比如:Filter, InvokerListener, ExportListener, TelnetHandler, StatusChecker 等,可以同时加载多个实现,此时,可以用自动激活来简化配置

在ExtensionLoader中比较重要的公用方法就是这些:

  • getExtensionLoader
  • getAdaptiveExtension
  • getActivateExtension

下面就详细剖析一下ExtensionLoader的实现流程。

获取ExtensionLoader流程

每个可扩展接口对应的ExtensionLoader都是单例,唯一获取ExtensionLoader对象的入口就是ExtensionLoader::getExtensionLoader方法,如主要流程图:
图片描述

  • 首先通过ExtensionLoader::getExtensionLoader
public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
        if (type == null)
            throw new IllegalArgumentException("Extension type == null");
        if(!type.isInterface()) {
            throw new IllegalArgumentException("Extension type(" + type + ") is not interface!");
        }
        if(!withExtensionAnnotation(type)) {
            throw new IllegalArgumentException("Extension type(" + type + 
                    ") is not extension, because WITHOUT @" + SPI.class.getSimpleName() + " Annotation!");
        }
        
        ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
        if (loader == null) {
            EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
            loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
        }
        return loader;
    }

会校验尝试获取Loader的接口是否有@SPI注解,先在缓存中找,如果没有缓存,则调用私有构造方法:

private ExtensionLoader(Class<?> type) {
        this.type = type;
        objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
    }

这里有个重要的对象objectFactory,这个对象的作用就是自动装配依赖,也就是IOC,可以看出,除了ObjectFactory本身,所有扩展点都有ObjectFactory实例,这个也是通过SPI管理的,它是通过getAdaptiveExtension()方法获取,这就是后面要介绍自适应扩展实现,有关ObjectFactory的内容会在后面IOC中详细分析。

自适应扩展

我们从getAdaptiveExtension()方法切入,这个方法要完成的任务就是获取该扩展点的自适应实现实例,其流程如下图所示:
图片描述

主要完成以下工作:

  • 1.检查自适应缓存是否存在。
  • 2.如果缓存未命中,则开始自适应例构建过程。
  • 3.要构建自适应实例,先要有自适应的实现类,实现类有两种方式:一种通过配置文件,一种是通过是字节码的方式动态生成。

配置文件配置的自适应类通过在实现类上面加@Adaptive注解,如

.....
@Adaptive
public class AdaptiveExtensionFactory implements ExtensionFactory {
}

字节码生成的自适应实现类是在方法层面@Adaptive注解,如

@SPI("dubbo")
public interface Protocol {
    
    ....省略代码
    @Adaptive
    <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
    @Adaptive
    <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
    void destroy();

}
  • 4.优先加载配置文件,将自适应实现类缓存在cachedAdaptiveClass中,同时通过加载配置文件,也将激活实现缓存在cachedActivates之中,这个在后面的激活实现中有用到。将包装类实例缓存在cachedWrapperClasses。可以简单一窥加载配置文件的代码
private void loadFile(Map<String, Class<?>> extensionClasses, String dir) {
        String fileName = dir + type.getName();
        try {
            Enumeration<java.net.URL> urls;
            ClassLoader classLoader = findClassLoader();
            if (classLoader != null) {
                urls = classLoader.getResources(fileName);
            } else {
                urls = ClassLoader.getSystemResources(fileName);
            }
            if (urls != null) {
                while (urls.hasMoreElements()) {
                    java.net.URL url = urls.nextElement();
                    try {
                        BufferedReader reader = new BufferedReader(new InputStreamReader(url.openStream(), "utf-8"));
                        try {
                            String line = null;
                            while ((line = reader.readLine()) != null) {
                                final int ci = line.indexOf('#');
                                if (ci >= 0) line = line.substring(0, ci);
                                line = line.trim();
                                if (line.length() > 0) {
                                    try {
                                        String name = null;
                                        int i = line.indexOf('=');
                                        if (i > 0) {
                                            name = line.substring(0, i).trim();
                                            line = line.substring(i + 1).trim();
                                        }
                                        if (line.length() > 0) {
                                            Class<?> clazz = Class.forName(line, true, classLoader);
                                            //配置的实现必须实现该接口
                                            if (! type.isAssignableFrom(clazz)) {
                                                throw new IllegalStateException("Error when load extension class(interface: " +
                                                        type + ", class line: " + clazz.getName() + "), class " 
                                                        + clazz.getName() + "is not subtype of interface.");
                                            }
                                            if (clazz.isAnnotationPresent(Adaptive.class)) {
                                                //如果是自适应实现
                                                if(cachedAdaptiveClass == null) {
                                                    cachedAdaptiveClass = clazz;
                                                } else if (! cachedAdaptiveClass.equals(clazz)) {
                                                    //只允许有一个自适应实现类
                                                    throw new IllegalStateException("More than 1 adaptive class found: "
                                                            + cachedAdaptiveClass.getClass().getName()
                                                            + ", " + clazz.getClass().getName());
                                                }
                                            } else {
                                                //如果不是自适应类
                                                try {
                                                    //判断是不是包装类,即是否有接口的构造方法
                                                    clazz.getConstructor(type);
                                                    Set<Class<?>> wrappers = cachedWrapperClasses;
                                                    if (wrappers == null) {
                                                        cachedWrapperClasses = new ConcurrentHashSet<Class<?>>();
                                                        wrappers = cachedWrapperClasses;
                                                    }
                                                    wrappers.add(clazz);
                                                } catch (NoSuchMethodException e) {
                                                    //不是包装类
                                                    clazz.getConstructor();
                                                    if (name == null || name.length() == 0) {
                                                        //找到Extension注解
                                                        name = findAnnotationName(clazz);
                                                        if (name == null || name.length() == 0) {
                                                            //如果Extension注解没有默认名称,则根据类的名称关系判断
                                                            if (clazz.getSimpleName().length() > type.getSimpleName().length()
                                                                    && clazz.getSimpleName().endsWith(type.getSimpleName())) {
                                                                //如果实现类和接口有名称上关系,比如XXImpl则将后面的作为实现类标识
                                                                name = clazz.getSimpleName().substring(0, clazz.getSimpleName().length() - type.getSimpleName().length()).toLowerCase();
                                                            } else {
                                                                throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + url);
                                                            }
                                                        }
                                                    }
                                                    String[] names = NAME_SEPARATOR.split(name);
                                                    if (names != null && names.length > 0) {
                                                        Activate activate = clazz.getAnnotation(Activate.class);
                                                        if (activate != null) {
                                                            cachedActivates.put(names[0], activate);
                                                        }
                                                        for (String n : names) {
                                                            if (! cachedNames.containsKey(clazz)) {
                                                                cachedNames.put(clazz, n);
                                                            }
                                                            Class<?> c = extensionClasses.get(n);
                                                            if (c == null) {
                                                                extensionClasses.put(n, clazz);
                                                            } else if (c != clazz) {
                                                                throw new IllegalStateException("Duplicate extension " + type.getName() + " name " + n + " on " + c.getName() + " and " + clazz.getName());
                                                            }
                                                        }
                                                    }
                                                }
                                            }
                                        }
                                    } catch (Throwable t) {
                                        IllegalStateException e = new IllegalStateException("Failed to load extension class(interface: " + type + ", class line: " + line + ") in " + url + ", cause: " + t.getMessage(), t);
                                        exceptions.put(line, e);
                                    }
                                }
                            } // end of while read lines
                        } finally {
                            reader.close();
                        }
                    } catch (Throwable t) {
                        logger.error("Exception when load extension class(interface: " +
                                            type + ", class file: " + url + ") in " + url, t);
                    }
                } // end of while urls
            }
        } catch (Throwable t) {
            logger.error("Exception when load extension class(interface: " +
                    type + ", description file: " + fileName + ").", t);
        }
    }
  • 5.加载完配置文件后,检查缓存的自适应实现类,若没有,则通过字节码技术生成自适应类。调用createAdaptiveExtensionClass()方法,实际上是通过拼接class文本,然后通过compiler编译文本生成class,这里又是一个自适应的类AdaptiveCompiler。
private Class<?> getAdaptiveExtensionClass() {
        //先通过配置文化加载实现类,并且识别自适应实现类
        getExtensionClasses();
        if (cachedAdaptiveClass != null) {
            return cachedAdaptiveClass;
        }
        //如果没有通过Adaptive注解标识的自适应实现类,则通过字节码创建
        return cachedAdaptiveClass = createAdaptiveExtensionClass();
    }
....省略代码
private Class<?> createAdaptiveExtensionClass() {
        String code = createAdaptiveExtensionClassCode();
        ClassLoader classLoader = findClassLoader();
        com.alibaba.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
        return compiler.compile(code, classLoader);
    }
@Adaptive
public class AdaptiveCompiler implements Compiler {

    private static volatile String DEFAULT_COMPILER;

    public static void setDefaultCompiler(String compiler) {
        DEFAULT_COMPILER = compiler;
    }

    public Class<?> compile(String code, ClassLoader classLoader) {
        Compiler compiler;
        ExtensionLoader<Compiler> loader = ExtensionLoader.getExtensionLoader(Compiler.class);
        String name = DEFAULT_COMPILER; // copy reference
        if (name != null && name.length() > 0) {
            compiler = loader.getExtension(name);
        } else {
            compiler = loader.getDefaultExtension();
        }
        return compiler.compile(code, classLoader);
    }

}

下面是通过字节码动态生成的Protocol接口的自适应扩展Protocol$Adpative:

package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class Protocol$Adpative implements com.alibaba.dubbo.rpc.Protocol {
public void destroy() {throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
public int getDefaultPort() {
throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.Invoker {
if (arg0 == null) 
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null) 
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
if(extName == null) 
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");

com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}

public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws java.lang.Class {

if (arg1 == null) throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg1;

String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );

if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");

com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);

return extension.refer(arg0, arg1);
}

public void destroyServer() {
throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroyServer() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
}

自动激活扩展

在dubbo中,某些组件可以同时有多个实现同时加载时,就可以通过@Activate注解自动激活,常见的自动激活扩展,如过滤器Filter,有顺序要求,提供了三个排序属性,before、after和order。还有一些过滤条件,主要是通过分组和key,如Provider和consumer的过滤逻辑可能就不一样,Activate的源码如下:

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface Activate {
    /**
     * Group过滤条件。
     */
    String[] group() default {};

    
    String[] value() default {};

    /**
     * 排序信息,可以不提供。
     */
    String[] before() default {};

    /**
     * 排序信息,可以不提供。
     */
    String[] after() default {};

    /**
     * 排序信息,可以不提供。
     */
    int order() default 0;
}

在ExtensionLoader中,有三个重载获取激活扩展实现的方法:

  • public List<T> getActivateExtension(URL url, String[] values, String group)
  • public List<T> getActivateExtension(URL url, String[] values)
  • public List<T> getActivateExtension(URL url, String key, String group)

后面两个也是通过调用第一个重载方法实现,下面来分析一下它的源码:

public List<T> getActivateExtension(URL url, String[] values, String group) {
        List<T> exts = new ArrayList<T>();
        List<String> names = values == null ? new ArrayList<String>(0) : Arrays.asList(values);
        if (! names.contains(Constants.REMOVE_VALUE_PREFIX + Constants.DEFAULT_KEY)) {
            getExtensionClasses();
            for (Map.Entry<String, Activate> entry : cachedActivates.entrySet()) {
                String name = entry.getKey();
                Activate activate = entry.getValue();
                if (isMatchGroup(group, activate.group())) {
                    T ext = getExtension(name);
                    if (! names.contains(name)
                            && ! names.contains(Constants.REMOVE_VALUE_PREFIX + name) 
                            && isActive(activate, url)) {
                        exts.add(ext);
                    }
                }
            }
            Collections.sort(exts, ActivateComparator.COMPARATOR);
        }
        List<T> usrs = new ArrayList<T>();
        for (int i = 0; i < names.size(); i ++) {
            String name = names.get(i);
            if (! name.startsWith(Constants.REMOVE_VALUE_PREFIX)
                    && ! names.contains(Constants.REMOVE_VALUE_PREFIX + name)) {
                if (Constants.DEFAULT_KEY.equals(name)) {
                    if (usrs.size() > 0) {
                        exts.addAll(0, usrs);
                        usrs.clear();
                    }
                } else {
                    T ext = getExtension(name);
                    usrs.add(ext);
                }
            }
        }
        if (usrs.size() > 0) {
            exts.addAll(usrs);
        }
        return exts;
    }

这个方法所做的工作无非就是在之前加载配置时缓存的cachedActivates中过滤查询符合条件的自动激动实例,并根据@Activate注解中配置的排序规则排序。

IOC注入

在创建自适应实例时,都会调用ExtensionLoader的injectExtension方法:

private T createAdaptiveExtension() {
        try {
            //传入自适应实例注入到ExtensionLoader
            return injectExtension((T) getAdaptiveExtensionClass().newInstance());
        } catch (Exception e) {
            throw new IllegalStateException("Can not create adaptive extenstion " + type + ", cause: " + e.getMessage(), e);
        }
    }
    
private T injectExtension(T instance) {
        try {
            //必须要有对象工厂
            if (objectFactory != null) {
                for (Method method : instance.getClass().getMethods()) {
                    if (method.getName().startsWith("set")
                            && method.getParameterTypes().length == 1
                            && Modifier.isPublic(method.getModifiers())) {
                        Class<?> pt = method.getParameterTypes()[0];
                        try {
                            String property = method.getName().length() > 3 ? method.getName().substring(3, 4).toLowerCase() + method.getName().substring(4) : "";
                            Object object = objectFactory.getExtension(pt, property);
                            if (object != null) {
                                method.invoke(instance, object);
                            }
                        } catch (Exception e) {
                            logger.error("fail to inject via method " + method.getName()
                                    + " of interface " + type.getName() + ": " + e.getMessage(), e);
                        }
                    }
                }
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
        return instance;
    }    

然后我们看到了ExtensionFactory对象,dubbo中的IOC实例是通过ExtensionFactory实现的,其实就是检测扩展实现类有没有通过set方法设置的属性,如果有,就通过ExtensionFactory加载而设置。
ExtensionFactory的类实现体系:
图片描述

在构造ExtensionLoader对象时,有个对象extensionFactory是必须要创建的,可以看到它就是用自适应实例,而ExtensionFatocry的自适应实例便是AdaptiveExtensionFactory,通过下面它的源码,我们可以发现,它维护了其他非自适应扩展实例,其实也就两个SpiExtensionFactory和SpringExtensionFactory。尝试用这两个实例去加载,加载到便返回。

@Adaptive
public class AdaptiveExtensionFactory implements ExtensionFactory {
    
    private final List<ExtensionFactory> factories;
    
    public AdaptiveExtensionFactory() {
        ExtensionLoader<ExtensionFactory> loader = ExtensionLoader.getExtensionLoader(ExtensionFactory.class);
        List<ExtensionFactory> list = new ArrayList<ExtensionFactory>();
        for (String name : loader.getSupportedExtensions()) {
            list.add(loader.getExtension(name));
        }
        factories = Collections.unmodifiableList(list);
    }

    public <T> T getExtension(Class<T> type, String name) {
        for (ExtensionFactory factory : factories) {
            T extension = factory.getExtension(type, name);
            if (extension != null) {
                return extension;
            }
        }
        return null;
    }

}

ExtensionFatocry 可以理解为对象工厂,只不过这里的对应就是Dubbo中的扩展Extension,AdaptiveExtensionFactory可以理解为通用扩展实现获取的入口,至于具体的获取方式分为两种,如果一种是通过Dubbo 自己的SPI方式加载到的扩展,同时还支持复用Srping 的方式,可以看看这两种实现的代码便可知:

public class SpiExtensionFactory implements ExtensionFactory {

    public <T> T getExtension(Class<T> type, String name) {
        if (type.isInterface() && type.isAnnotationPresent(SPI.class)) {
            ExtensionLoader<T> loader = ExtensionLoader.getExtensionLoader(type);
            if (loader.getSupportedExtensions().size() > 0) {
                return loader.getAdaptiveExtension();
            }
        }
        return null;
    }

}

public class SpringExtensionFactory implements ExtensionFactory {
    
    private static final Set<ApplicationContext> contexts = new ConcurrentHashSet<ApplicationContext>();
    
    public static void addApplicationContext(ApplicationContext context) {
        contexts.add(context);
    }

    public static void removeApplicationContext(ApplicationContext context) {
        contexts.remove(context);
    }

    @SuppressWarnings("unchecked")
    public <T> T getExtension(Class<T> type, String name) {
        for (ApplicationContext context : contexts) {
            if (context.containsBean(name)) {
                Object bean = context.getBean(name);
                if (type.isInstance(bean)) {
                    return (T) bean;
                }
            }
        }
        return null;
    }

}

总结

作为贯穿整个Dubbo设计始终的思想,SPI在整个框架中随处可见,本文围绕ExtensionLoader扩展点机制,通过一些dubbo组件扩展示例,分析了其核心源码和流程。希望可以对于理解Dubbo的扩展点乃至dubbo源码解析过程中有所帮助,最后总结几点:

  • 对于每个扩展点,只维护一个ExtensionLoad,具体扩展实现类和实例,都是通过相应的ExtensionLoader获取的。
  • 针对每个扩展实现的实例都是单例的,所以在扩展实现时应保证线程安全。
  • 自适应实现只能有一个,自适应实现类获取有两种方式,一种是通过配置文件,这种就是针对@Adaptive注解在类级别的时,而@Adaptive注解在方法级别时,自适应实现类就需要通过字符码动态生成。
  • 自动激活扩展实现可以有多个,一般情况下,采用自动激动方式扩展的一般都会有多个,正因为有多个,自动激动扩展实现可能有顺序性,且可以分组。

参考

http://dubbo.apache.org/books...
https://blog.csdn.net/jdluoji...

查看原文

赞 5 收藏 4 评论 1

青芒 发布了文章 · 2018-03-29

Hystrix指标窗口实现原理

一、引子

Hystrix是一个熔断中间件,能够实现fast-fail并走备用方案。Hystrix基于滑动窗口判定服务失败占比选择性熔断。滑动窗口的实现方案有很多种,指标计数也有很多种实现常见的就是AtomicInteger进行原子增减维护计数,具体的方案就不探讨了。

Hystrix是基于Rxjava去实现的,那么如何利用RxJava实现指标的汇聚和滑动窗口实现呢?当然本篇不是作为教程去介绍RxJava的使用姿势,本篇文章主要解说Hystrix是什么一个思路完成这项功能。

二、指标数据上传

看HystrixCommand执行的主入口

public Observable<R> toObservable() {
    final AbstractCommand<R> _cmd = this;

    final Action0 terminateCommandCleanup = new Action0() {

        @Override
        public void call() {
            if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.TERMINAL)) {
                handleCommandEnd(false); //user code never ran
            } else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.TERMINAL)) {
                handleCommandEnd(true); //user code did run
            }
        }
    };

    //mark the command as CANCELLED and store the latency (in addition to standard cleanup)
    final Action0 unsubscribeCommandCleanup = new Action0() {
        @Override
        public void call() {
            if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.UNSUBSCRIBED)) {
                .......省略干扰代码...........
                handleCommandEnd(false); //user code never ran
            } else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.UNSUBSCRIBED)) {
                .......省略干扰代码...........
                handleCommandEnd(true); //user code did run
            }
        }
    };

   .......省略干扰代码...........

    return Observable.defer(new Func0<Observable<R>>() {

    .......省略干扰代码...........

            return afterCache
                    .doOnTerminate(terminateCommandCleanup) 
                    .doOnUnsubscribe(unsubscribeCommandCleanup) 
                    .doOnCompleted(fireOnCompletedHook);
        }
});

我们的主入口Observable当doOnTerminate doOnUnsubscribe 的时候触发 handleCommandEnd 方法,从字面意思就是当command执行结束处理一些事情。

private void handleCommandEnd(boolean commandExecutionStarted) {
    ........省略干扰代码..........
    executionResult = executionResult.markUserThreadCompletion((int) userThreadLatency);
    if (executionResultAtTimeOfCancellation == null) {
        metrics.markCommandDone(executionResult, commandKey, threadPoolKey, commandExecutionStarted);
    } else {
        metrics.markCommandDone(executionResultAtTimeOfCancellation, commandKey, threadPoolKey, commandExecutionStarted);
    }
    ........省略干扰代码..........
}

注意看 metrics.markCommandDone,调用了HystrixCommandMetrics的markCommandDone方法,把一个executionResult传入了进来。ExecutionResult 这是个什么鬼呢?
我们截取部分代码浏览下

public class ExecutionResult {
    private final EventCounts eventCounts;
    private final Exception failedExecutionException;
    private final Exception executionException;
    private final long startTimestamp;
    private final int executionLatency; //time spent in run() method
    private final int userThreadLatency; //time elapsed between caller thread submitting request and response being visible to it
    private final boolean executionOccurred;
    private final boolean isExecutedInThread;
    private final HystrixCollapserKey collapserKey;

    private static final HystrixEventType[] ALL_EVENT_TYPES = HystrixEventType.values();
    private static final int NUM_EVENT_TYPES = ALL_EVENT_TYPES.length;
    private static final BitSet EXCEPTION_PRODUCING_EVENTS = new BitSet(NUM_EVENT_TYPES);
    private static final BitSet TERMINAL_EVENTS = new BitSet(NUM_EVENT_TYPES);

以大家聪慧的头脑应该能够猜测到这个类就是当前HystrixCommand的 执行结果记录,只不过这个结果不仅仅是结果,也包含了各种状态以及出现的异常。它的身影在Hystrix执行原理里讲的各Observable里出现,跟着HystrixCommand整个生命周期。

回到上面讲,当时command执行完毕后,调用了HystrixCommandMetrics的markCommandDone方法

void markCommandDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey, boolean executionStarted) {
    HystrixThreadEventStream.getInstance().executionDone(executionResult, commandKey, threadPoolKey);
    if (executionStarted) {
        concurrentExecutionCount.decrementAndGet();
    }
}

最终调用量HystrixThreadEventStream. executionDone方法的HystrixThreadEventStream是ThreadLocal方式,和当前线程绑定

//HystrixThreadEventStream.threadLocalStreams
private static final ThreadLocal<HystrixThreadEventStream> threadLocalStreams = new ThreadLocal<HystrixThreadEventStream>() {
    @Override
    protected HystrixThreadEventStream initialValue() {
        return new HystrixThreadEventStream(Thread.currentThread());
    }
};

executionDone代码如下

public void executionDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey) {
    HystrixCommandCompletion event = HystrixCommandCompletion.from(executionResult, commandKey, threadPoolKey);
    writeOnlyCommandCompletionSubject.onNext(event);
}

这里根据 executionResult, threadpoolkey,comandKey,生成 了一个HystrixCommandCompletion然后通过writeOnlyCommandCompletionSubject写入,writeOnlyCommandCompletionSubject整个东西,我们等会再看。现在思考下HystrixCommandCompletion是什么?HystrixCommandCompletion包含了 ExecutionResultHystrixRequestContext,它是一种HystrixEvent,标识着command执行完成的一个事件,该事件是当前这个点HystrixCommand的请求信息,执行结果,状态等数据的载体。

从上面类图可以看到不仅仅HystrixCommandCompletion一种还有其它的Event,这里就不一一介绍了。

writeOnlyCommandCompletionSubject onNext的时候会触发 writeCommandCompletionsToShardedStreams执行里面的call()方法。

  private static final Action1<HystrixCommandCompletion> writeCommandCompletionsToShardedStreams = new Action1<HystrixCommandCompletion>() {
    @Override
    public void call(HystrixCommandCompletion commandCompletion) {
        HystrixCommandCompletionStream commandStream = HystrixCommandCompletionStream.getInstance(commandCompletion.getCommandKey());
        commandStream.write(commandCompletion);

        if (commandCompletion.isExecutedInThread() || commandCompletion.isResponseThreadPoolRejected()) {
            HystrixThreadPoolCompletionStream threadPoolStream = HystrixThreadPoolCompletionStream.getInstance(commandCompletion.getThreadPoolKey());
            threadPoolStream.write(commandCompletion);
        }
    }
};

这个方法的意思是,会把HystrixCommandCompletion 通过HystrixCommandCompletionStream 写入,如果当前command使用的是线程池隔离策略的话 会通过 HystrixThreadPoolCompletionStream 再写一遍。HystrixCommandCompletionStream HystrixThreadPoolCompletionStream 他们两个概念类似,我们拿着前者解释,这个是个什么东西。
HystrixCommandCompletionStream 以commandKey为key,维护在内存中,调用它的write的方法实则是调用内部属性 writeOnlySubject的方法,writeOnlySubject是一个Subject(RxJava的东西),通过SerializedSubject保证其写入的顺序性,调用其share()方法获得一个Observable也就是readOnlyStream,让外界能够读这个Subject的数据。总结下Subject是连接两个Observable之间的桥梁,它有两个泛型元素标识着进出数据类型,全部都是HystrixCommandCompletion类型

HystrixCommandCompletionStream(final HystrixCommandKey commandKey) {
        this.commandKey = commandKey;

        this.writeOnlySubject = new SerializedSubject<HystrixCommandCompletion, HystrixCommandCompletion>(PublishSubject.<HystrixCommandCompletion>create());
        this.readOnlyStream = writeOnlySubject.share();
    }

我们从源头开始梳理,明白了这个HystrixCommandCompletion数据流是如何写入的(其它类型的的思路一致,就不一一解释了),那它是如何被搜集起来呢?

三、指标数据搜集

追溯至AbstractCommand初始化

protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
        HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
        HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
        HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {

    ........省略代码........
    this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties);
    ........省略代码........
}

初始化command指标

HystrixCommandMetrics(final HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixThreadPoolKey threadPoolKey, HystrixCommandProperties properties, HystrixEventNotifier eventNotifier) {
    super(null);
    this.key = key;
    this.group = commandGroup;
    this.threadPoolKey = threadPoolKey;
    this.properties = properties;

    healthCountsStream = HealthCountsStream.getInstance(key, properties);
    rollingCommandEventCounterStream = RollingCommandEventCounterStream.getInstance(key, properties);
    cumulativeCommandEventCounterStream = CumulativeCommandEventCounterStream.getInstance(key, properties);

    rollingCommandLatencyDistributionStream = RollingCommandLatencyDistributionStream.getInstance(key, properties);
    rollingCommandUserLatencyDistributionStream = RollingCommandUserLatencyDistributionStream.getInstance(key, properties);
    rollingCommandMaxConcurrencyStream = RollingCommandMaxConcurrencyStream.getInstance(key, properties);
}

有很多各种 XXXStream.getInstance(),这些Stream就是针对各类用途进行指标搜集,统计的具体实现,下面可以看下他们的UML类图

Hystrix几个别Stream类图(并非所有子类)

BucketedCounterStream实现了基本的桶计数器,BucketedCumulativeCounterStream基于父类实现了累计计数,BucketedRollingCounterStream基于父类实现了滑动窗口计数。两者的子类就是对特定指标的具体实现。

接下来分两块累计计数和滑动窗口计数,挑选其对应的CumulativeCommandEventCounterStream和HealthCountsStream进行详细说明。

3.1、BucketedCounterStream 基本桶的实现

数据采集示意图

protected BucketedCounterStream(final HystrixEventStream<Event> inputEventStream, final int numBuckets, final int bucketSizeInMs,
                                    final Func2<Bucket, Event, Bucket> appendRawEventToBucket) {
    this.numBuckets = numBuckets;
    this.reduceBucketToSummary = new Func1<Observable<Event>, Observable<Bucket>>() {
        @Override
        public Observable<Bucket> call(Observable<Event> eventBucket) {
            return eventBucket.reduce(getEmptyBucketSummary(), appendRawEventToBucket);
        }
    };

    final List<Bucket> emptyEventCountsToStart = new ArrayList<Bucket>();
    for (int i = 0; i < numBuckets; i++) {
        emptyEventCountsToStart.add(getEmptyBucketSummary());
    }

    this.bucketedStream = Observable.defer(new Func0<Observable<Bucket>>() {
        @Override
        public Observable<Bucket> call() {
            return inputEventStream
                    .observe()
                    .window(bucketSizeInMs, TimeUnit.MILLISECONDS)
                    .flatMap(reduceBucketToSummary)                
                    .startWith(emptyEventCountsToStart);   
        }
    });
}

这里父类的构造方法主要成三个部分分别是
I. reduceBucketToSummary 每个桶如何计算聚合的数据

appendRawEventToBucket的实现由其子类决定,不过大同小异,我们自行拔下代码看下HealthCountsStream, 可以看到他用的是HystrixCommandMetrics.appendEventToBucket

public static final Func2<long[], HystrixCommandCompletion, long[]> appendEventToBucket = new Func2<long[], HystrixCommandCompletion, long[]>() {
        @Override
        public long[] call(long[] initialCountArray, HystrixCommandCompletion execution) {
            ExecutionResult.EventCounts eventCounts = execution.getEventCounts();
            for (HystrixEventType eventType: ALL_EVENT_TYPES) {
                switch (eventType) {
                    case EXCEPTION_THROWN: break; //this is just a sum of other anyway - don't do the work here
                    default:
                        initialCountArray[eventType.ordinal()] += eventCounts.getCount(eventType);
                        break;
                }
            }
            return initialCountArray;
        }
    };
}

这个方法就是将一个桶时长内的数据进行累计计数相加。initialCountArray可以看出一个桶内前面的n个数据流的计算结果,数组的下标就是HystrixEventType 枚举里事件的下标值。

II. emptyEventCountsToStart 第一个桶的定义,装逼点叫创世桶

III. window窗口的定义,这里第一个参数就是每个桶的时长,第二个参数时间的单位。利用RxJava的window帮我们做聚合数据。

.window(bucketSizeInMs, TimeUnit.MILLISECONDS)

Bucket 时长如何计算
每个桶的时长如何得出的?这个也是基于我们的配置得出,拿HealthCountsStream举例子。
metrics.rollingStats.timeInMilliseconds 滑动窗口时长 默认10000ms
metrics.healthSnapshot.intervalInMilliseconds 检测健康状态的时间片,默认500ms 在这里对应一个bucket的时长

滑动窗口内桶的个数 = 滑动窗口时长 / bucket时长

而 CumulativeCommandEventCounterStream
metrics.rollingStats.timeInMilliseconds 滑动窗口时长 默认10000ms
metrics.rollingStats.numBuckets 滑动窗口要切的桶个数

bucket时长 = 滑动窗口时长 / 桶个数

不同职能的 XXXStream对应的算法和对应的配置也不一样,不过都一个套路,就不一一去展示了。

inputEventStream
inputEventStream 可以认为是窗口采集的数据流,这个数据流由其子类去传递,大致看了下

//HealthCountsStream
private HealthCountsStream(final HystrixCommandKey commandKey, final int numBuckets, final int bucketSizeInMs,
                               Func2<long[], HystrixCommandCompletion, long[]> reduceCommandCompletion) {
    super(HystrixCommandCompletionStream.getInstance(commandKey), numBuckets, bucketSizeInMs, reduceCommandCompletion, healthCheckAccumulator);
}

//RollingThreadPoolEventCounterStream
private RollingThreadPoolEventCounterStream(HystrixThreadPoolKey threadPoolKey, int numCounterBuckets, int counterBucketSizeInMs,
                                                Func2<long[], HystrixCommandCompletion, long[]> reduceCommandCompletion,
                                                Func2<long[], long[], long[]> reduceBucket) {
    super(HystrixThreadPoolCompletionStream.getInstance(threadPoolKey), numCounterBuckets, counterBucketSizeInMs, reduceCommandCompletion, reduceBucket);
}

我们发现这个 inputEventStream,其实就是 HystrixCommandCompletionStream、HystrixThreadPoolCompletionStream或者其它的,我们挑其中HystrixCommandCompletionStream看下,这个就是上面第二部分指标数据上传里讲的写数据那个stream,inputEventStream.observe()也就是 HystrixCommandCompletionStream的 readOnlyStreamSubject的只读Observable。(这里如果没明白可以回到第二点看下结尾的部分)

3.2、累计计数器之CumulativeCommandEventCounterStream

先看下累计计数器的父类BucketedCumulativeCounterStream

protected BucketedCumulativeCounterStream(HystrixEventStream<Event> stream, int numBuckets, int bucketSizeInMs,
                                              Func2<Bucket, Event, Bucket> reduceCommandCompletion,
                                              Func2<Output, Bucket, Output> reduceBucket) {
    super(stream, numBuckets, bucketSizeInMs, reduceCommandCompletion);

    this.sourceStream = bucketedStream
            .scan(getEmptyOutputValue(), reduceBucket)
            .skip(numBuckets)
            ........省略代码........
            
}

bucketedStream就是3.1里的数据汇聚后的一个一个桶流,这里执行了scan方法,scan方法的意思就是会将当前窗口内已经提交的数据流进行按照顺序进行遍历并执行指定的function逻辑,scan里有两个参数第一个参数表示上一次执行function的结果,第二个参数就是每次遍历要执行的function,scan完毕后skip numBuckets 个bucket,可以认为丢弃掉已经计算过的bucket。

scan里的function是如何实现呢?它也是实现累计计数的关键,由子类实现,本小节也就是CumulativeCommandEventCounterStream来实现

CumulativeCommandEventCounterStream newStream = new CumulativeCommandEventCounterStream(commandKey, numBuckets, bucketSizeInMs,HystrixCommandMetrics.appendEventToBucket, HystrixCommandMetrics.bucketAggregator);

发现调用的是 HystrixCommandMetrics.bucketAggregator,我们看下其函数体

public static final Func2<long[], long[], long[]> bucketAggregator = new Func2<long[], long[], long[]>() {
    @Override
    public long[] call(long[] cumulativeEvents, long[] bucketEventCounts) {
        for (HystrixEventType eventType: ALL_EVENT_TYPES) {
            switch (eventType) {
                case EXCEPTION_THROWN:
                    for (HystrixEventType exceptionEventType: HystrixEventType.EXCEPTION_PRODUCING_EVENT_TYPES) {
                        cumulativeEvents[eventType.ordinal()] += bucketEventCounts[exceptionEventType.ordinal()];
                    }
                    break;
                default:
                    cumulativeEvents[eventType.ordinal()] += bucketEventCounts[eventType.ordinal()];
                    break;
            }
        }
        return cumulativeEvents;
    }
};

call() 方法有两个参数第一个参数指的之前的计算结果,第二个参数指的当前桶内的计数,方法体不难理解,就是对各个时间的count计数累加。

如此,一个command的计数就实现了,其它累计计数也雷同。

3.3、滑动窗口之HealthCountsStream

直接父类代码

protected BucketedRollingCounterStream(HystrixEventStream<Event> stream, final int numBuckets, int bucketSizeInMs,
                                           final Func2<Bucket, Event, Bucket> appendRawEventToBucket,
                                           final Func2<Output, Bucket, Output> reduceBucket) {
    super(stream, numBuckets, bucketSizeInMs, appendRawEventToBucket);
    Func1<Observable<Bucket>, Observable<Output>> reduceWindowToSummary = new Func1<Observable<Bucket>, Observable<Output>>() {
        @Override
        public Observable<Output> call(Observable<Bucket> window) {
            return window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets);
        }
    };
    this.sourceStream = bucketedStream      
            .window(numBuckets, 1)          
            .flatMap(reduceWindowToSummary) 
            ........省略代码........
}

依然像累计计数器一样对父级的桶流数据进行操作,这里用的是window(),第一个参数表示桶的个数,第二个参数表示一次移动的个数。这里numBuckets就是我们的滑动窗口桶个数

滑动窗口

第一排我们可以认为是移动前的滑动窗口的数据,在执行完 flatMap里的function之后,滑动窗口向前移动一个桶位,那么 23 5 2 0 这个桶就被丢弃了,然后新进了最新的桶 45 6 2 0
那么每次滑动窗口内的数据是如何被处理呢?就是flatMap里的function做的,reduceWindowToSummary 最终被具体的子类stream实现,我们就研究下HealthCountsStream

private static final Func2<HystrixCommandMetrics.HealthCounts, long[], HystrixCommandMetrics.HealthCounts> healthCheckAccumulator = new Func2<HystrixCommandMetrics.HealthCounts, long[], HystrixCommandMetrics.HealthCounts>() {
    @Override
    public HystrixCommandMetrics.HealthCounts call(HystrixCommandMetrics.HealthCounts healthCounts, long[] bucketEventCounts) {
        return healthCounts.plus(bucketEventCounts);
    }
};

//HystrixCommandMetrics.HealthCounts#plus
public HealthCounts plus(long[] eventTypeCounts) {
    long updatedTotalCount = totalCount;
    long updatedErrorCount = errorCount;

    long successCount = eventTypeCounts[HystrixEventType.SUCCESS.ordinal()];
    long failureCount = eventTypeCounts[HystrixEventType.FAILURE.ordinal()];
    long timeoutCount = eventTypeCounts[HystrixEventType.TIMEOUT.ordinal()];
    long threadPoolRejectedCount = eventTypeCounts[HystrixEventType.THREAD_POOL_REJECTED.ordinal()];
    long semaphoreRejectedCount = eventTypeCounts[HystrixEventType.SEMAPHORE_REJECTED.ordinal()];

    updatedTotalCount += (successCount + failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount);
    updatedErrorCount += (failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount);
    return new HealthCounts(updatedTotalCount, updatedErrorCount);
}

方法的实现也显而易见,统计了当前滑动窗口内成功数、失败数、线程拒绝数,超时数.....

该stream的职责就是探测服务的可用性,也是Hystrix熔断器是否生效依赖的数据源。

四、回顾

Hystrix的滑动窗口设计相对于其它可能稍微偏难理解些,其主要原因还是因为我们对RxJava的了解不够,不过这不重要,只要耐心的多看几遍就没有什么问题。

本篇主要从指标数据上报到指标数据收集来逐步解开Hystrix指标搜集的神秘面纱。最后借用一大牛的图汇总下本篇的内容

参考文档
官方文档-How it works
官方文档-configuration
Hystrix 1.5 滑动窗口实现原理总结


系列文章推荐
Hystrix常用功能介绍
Hystrix执行原理
Hystrix熔断器执行机制
Hystrix超时实现机制

查看原文

赞 2 收藏 3 评论 0

青芒 发布了文章 · 2018-03-22

Hystrix执行原理

前奏

Hystrix的常规使用姿势

    @Test
    public void test_run(){
        String s = new CommandHelloWorld("Bob").execute();
        System.out.println(s);
    }

我们的command在new的时候发生了什么?execute()是如何执行的?execute执行失败或者超时如何fallback?

一、PREPARE 初始化

当我们new XXCommand()的时候,大部分的工作都是在 AbstractCommand完成

protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
        HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
        HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
        HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {

    this.commandGroup = initGroupKey(group);
    this.commandKey = initCommandKey(key, getClass());
    this.properties = initCommandProperties(this.commandKey, propertiesStrategy, commandPropertiesDefaults);
    this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get());
    this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties);
    this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics);
    this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);

    //Strategies from plugins
    this.eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
    this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
    HystrixMetricsPublisherFactory.createOrRetrievePublisherForCommand(this.commandKey, this.commandGroup, this.metrics, this.circuitBreaker, this.properties);
    this.executionHook = initExecutionHook(executionHook);

    this.requestCache = HystrixRequestCache.getInstance(this.commandKey, this.concurrencyStrategy);
    this.currentRequestLog = initRequestLog(this.properties.requestLogEnabled().get(), this.concurrencyStrategy);

    /* fallback semaphore override if applicable */
    this.fallbackSemaphoreOverride = fallbackSemaphore;

    /* execution semaphore override if applicable */
    this.executionSemaphoreOverride = executionSemaphore;
}

可以很清晰的看到,这里面在进行command配置装载、线程池配置装载及线程池的创建、指标搜集器、熔断器的初始化等等。

//HystrixCommandMetrics
ConcurrentHashMap<String, HystrixCommandMetrics> metrics = new ConcurrentHashMap<String, HystrixCommandMetrics>();

//HystrixThreadPoolDefault
final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>();

//com.netflix.hystrix.HystrixCircuitBreaker.Factory
private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>();

除HystrixCommand每次都需要重新建立,其它基本都以commandKey维护着配置,熔断器,指标的单例而线程池则以threadkey进场存储。

我们可以了了解下Hystrix的线程池如何管理
创建线程调用 HystrixThreadPool.Factory.getInstance

static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) {
    // get the key to use instead of using the object itself so that if people forget to implement equals/hashcode things will still work
    String key = threadPoolKey.name();

    // this should find it for all but the first time
    HystrixThreadPool previouslyCached = threadPools.get(key);
    if (previouslyCached != null) {
        return previouslyCached;
    }

    // if we get here this is the first time so we need to initialize
    synchronized (HystrixThreadPool.class) {
        if (!threadPools.containsKey(key)) {
            threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
        }
    }
    return threadPools.get(key);
}

从缓存中以threadPoolKey获取线程池,获取不到则 调用new HystrixThreadPoolDefault新建

public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) {
    this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);
    HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
    this.queueSize = properties.maxQueueSize().get();

    this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,
            concurrencyStrategy.getThreadPool(threadPoolKey, properties),
            properties);
    this.threadPool = this.metrics.getThreadPool();
    this.queue = this.threadPool.getQueue();

    /* strategy: HystrixMetricsPublisherThreadPool */
    HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties);
}

注意

this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,concurrencyStrategy.getThreadPool(threadPoolKey, properties),properties);

其中 concurrencyStrategy.getThreadPool,HystrixConcurrencyStrategy就是hystrix的线程创建策略者

真正的创建线程执行
HystrixConcurrencyStrategy#getThreadPool

public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
    .....各种配置,此处代码省略......

    if (allowMaximumSizeToDivergeFromCoreSize) {
        final int dynamicMaximumSize = threadPoolProperties.maximumSize().get();
        if (dynamicCoreSize > dynamicMaximumSize) {
            logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " +
                    dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ".  Maximum size will be set to " +
                    dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
            return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
        } else {
            return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
        }
    } else {
        return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
    }
}

这里调用java JUC原生的 ThreadPoolExecutor创建线程

二、Observable 大串烧

Hystrix的执行利用RxJava,组合了很多的Observable,形成一个Observable,和传统的调用链相比更加简洁。

三、各色Observable显神通

3.1.command 状态位

  1. toObservable 第一个observable,在下一个chain之前,会更改HystrixCommand状态位 OBSERVABLE_CHAIN_CREATED
  2. toObservable doOnTerminate,探测到terminate时,会将HystrixCommand更改为 TERMINAL
  3. executeCommandWithSpecifiedIsolation在开始执行的时候会更改HystrixCommand更改为 USER_CODE_EXECUTED
  4. toObservable doOnUnsubscribe,探测到terminate时,会将HystrixCommand更改为 UNSUBSCRIBED
3.2.executeCommandWithSpecifiedIsolation

分配执行线程,维护线程状态

private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
    if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
        // mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                .....省略干扰代码.....
                if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                    return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
                }

                if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
                        // the command timed out in the wrapping thread so we will return immediately
                        // and not increment any of the counters below or other such logic
                        return Observable.error(new RuntimeException("timed out before executing run()"));
                    }
                
                if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {

                    try {
                       .....省略干扰代码.....

                        return getUserExecutionObservable(_cmd);
                    } catch (Throwable ex) {
                        return Observable.error(ex);
                    }
                } else {
                    //command has already been unsubscribed, so return immediately
                    return Observable.error(new RuntimeException("unsubscribed before executing run()"));
                }
            }
        }).doOnTerminate(new Action0() {
            @Override
            public void call() {
                if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) {
                    handleThreadEnd(_cmd);
                }
                if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) {
                    //if it was never started and received terminal, then no need to clean up (I don't think this is possible)
                }
                //if it was unsubscribed, then other cleanup handled it
            }
        }).doOnUnsubscribe(new Action0() {
            @Override
            public void call() {
                if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) {
                    handleThreadEnd(_cmd);
                }
                if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) {
                    //if it was never started and was cancelled, then no need to clean up
                }
                //if it was terminal, then other cleanup handled it
            }
        }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
            @Override
            public Boolean call() {
                return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
            }
        }));
    } else {
        .....省略干扰代码.....
    }
}

具体逻辑
1.判断隔离策略,如果是Semaphore 信号量则在当前线程上执行,否则进入线程分配逻辑
2.更改HystrixCommand的状态 USER_CODE_EXECUTED
3.判断HystrixCommand超时状态,如果已经超时则抛出异常
4.更改当前command的线程执行状态为 STARTED
5.调用 getUserExecutionObservable 执行具体逻辑
6.doOnTerminate 当Observale执行完毕后(HystrixCommand可能失败也可能执行成功),此时的线程状态可能有两种分别是 STARTEDNOT_USING_THREAD , 然后更改线程状态为 TERMINAL
7.doOnUnsubscribe 当Observable被取消订阅,更改线程状态为 TERMINAL
8.subscribeOn 指定scheduler,这里Hystrix实现了自己的scheduler,在scheduler的worker指定线程池,在配置线程之前会重新加载线程池配置(这里是Rxjava的东西,暂时大家可以粗略的认为这里就是指定线程池,然后把要执行的任务扔到这个线程池里)

@Override
public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
    touchConfig();
    return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
}

// allow us to change things via fast-properties by setting it each time
private void touchConfig() {
    final int dynamicCoreSize = properties.coreSize().get();
    final int configuredMaximumSize = properties.maximumSize().get();
    int dynamicMaximumSize = properties.actualMaximumSize();
    final boolean allowSizesToDiverge = properties.getAllowMaximumSizeToDivergeFromCoreSize().get();
    boolean maxTooLow = false;

    if (allowSizesToDiverge && configuredMaximumSize < dynamicCoreSize) {
        //if user sets maximum < core (or defaults get us there), we need to maintain invariant of core <= maximum
        dynamicMaximumSize = dynamicCoreSize;
        maxTooLow = true;
    }

    // In JDK 6, setCorePoolSize and setMaximumPoolSize will execute a lock operation. Avoid them if the pool size is not changed.
    if (threadPool.getCorePoolSize() != dynamicCoreSize || (allowSizesToDiverge && threadPool.getMaximumPoolSize() != dynamicMaximumSize)) {
        if (maxTooLow) {
            logger.error("Hystrix ThreadPool configuration for : " + metrics.getThreadPoolKey().name() + " is trying to set coreSize = " +
                    dynamicCoreSize + " and maximumSize = " + configuredMaximumSize + ".  Maximum size will be set to " +
                    dynamicMaximumSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
        }
        threadPool.setCorePoolSize(dynamicCoreSize);
        threadPool.setMaximumPoolSize(dynamicMaximumSize);
    }

    threadPool.setKeepAliveTime(properties.keepAliveTimeMinutes().get(), TimeUnit.MINUTES);
}

touchConfig 执行具体的线程池参数调整。

从上面的过程也能发现,该observable也是维护线程状态的地方,线程的状态变更见下图

3.3.getUserExecutionObservable

执行具体业务逻辑

private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) {
    Observable<R> userObservable;

    try {
        userObservable = getExecutionObservable();
    } catch (Throwable ex) {
        // the run() method is a user provided implementation so can throw instead of using Observable.onError
        // so we catch it here and turn it into Observable.error
        userObservable = Observable.error(ex);
    }

    return userObservable
            .lift(new ExecutionHookApplication(_cmd))
            .lift(new DeprecatedOnRunHookApplication(_cmd));
}

userObservable = getExecutionObservable(); 由HystrixCommand自己实现

//HystrixCommand
final protected Observable<R> getExecutionObservable() {
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            try {
                return Observable.just(run());
            } catch (Throwable ex) {
                return Observable.error(ex);
            }
        }
    }).doOnSubscribe(new Action0() {
        @Override
        public void call() {
            // Save thread on which we get subscribed so that we can interrupt it later if needed
            executionThread.set(Thread.currentThread());
        }
    });
}

这里看到 run()应该就明白了,就是我们自己的业务代码 CommandHelloWorld去实现的。

3.4.getFallbackOrThrowException

当executeCommandWithSpecifiedIsolation探测到异常时触发该Observable。getFallbackOrThrowException里具体fallback执行看
executeCommandAndObserve。

private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
    .....省略干扰代码.....
    final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
        .....省略干扰代码.....
    };

    .....省略干扰代码.....

    Observable<R> execution;
    if (properties.executionTimeoutEnabled().get()) {
        execution = executeCommandWithSpecifiedIsolation(_cmd)
                .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
    } else {
        execution = executeCommandWithSpecifiedIsolation(_cmd);
    }

    return execution.doOnNext(markEmits)
            .doOnCompleted(markOnCompleted)
            .onErrorResumeNext(handleFallback)
            .doOnEach(setRequestContext);
}

doErrorResumeNext 会触发下一个 handleFallback。

private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException) {
    ....省略干扰代码....

    if (isUnrecoverable(originalException)) {
        ....省略干扰代码....
    } else {
        ....省略干扰代码....

        if (properties.fallbackEnabled().get()) {
        
            ....省略干扰代码....

            Observable<R> fallbackExecutionChain;

            // acquire a permit
            if (fallbackSemaphore.tryAcquire()) {
                try {
                    if (isFallbackUserDefined()) {
                        executionHook.onFallbackStart(this);
                        fallbackExecutionChain = getFallbackObservable();
                    } else {
                        //same logic as above without the hook invocation
                        fallbackExecutionChain = getFallbackObservable();
                    }
                } catch (Throwable ex) {
                    //If hook or user-fallback throws, then use that as the result of the fallback lookup
                    fallbackExecutionChain = Observable.error(ex);
                }

                return fallbackExecutionChain
                        .doOnEach(setRequestContext)
                        .lift(new FallbackHookApplication(_cmd))
                        .lift(new DeprecatedOnFallbackHookApplication(_cmd))
                        .doOnNext(markFallbackEmit)
                        .doOnCompleted(markFallbackCompleted)
                        .onErrorResumeNext(handleFallbackError)
                        .doOnTerminate(singleSemaphoreRelease)
                        .doOnUnsubscribe(singleSemaphoreRelease);
            } else {
               return handleFallbackRejectionByEmittingError();
            }
        } else {
            return handleFallbackDisabledByEmittingError(originalException, failureType, message);
        }
    }
}

这里优先几个步骤
1.判断异常是否是能走fallback处理,不能则抛出HystrixRuntimeException
2.判断配置是否开启允许fallback,开启,则进入 getFallbackObservable(),而该方法具体有HystrixCommand实现,调用的则是用户的Command的fallback方法,如果调用方没有覆盖该方法,则会执行HystrixCommand的fallback方法,抛出未定义fallback方法的异常

protected R getFallback() {
    throw new UnsupportedOperationException("No fallback available.");
 }

@Override
final protected Observable<R> getFallbackObservable() {
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            try {
               //调用方 fallback逻辑
                return Observable.just(getFallback());
            } catch (Throwable ex) {
                return Observable.error(ex);
            }
        }
    });
}

后续系列文章,欢迎参阅
Hystrix熔断器执行机制
Hystrix超时实现机制

查看原文

赞 2 收藏 3 评论 0

青芒 发布了文章 · 2018-03-22

Hystrix熔断器执行机制

本篇假设大家对Hystrix的执行过程及源码有一定的了解,这里介绍Hystrix的熔断器执行机制。

1.Hystrix 熔断器类结构


HystrixCircuitBreaker作为接口定义,具体的实现有NoOpCircuitBreakerHystrixCircuitBreakerImpl,其中NoOpCircuitBreaker只是个空壳没有具体的实现,相当于不熔断。HystrixCircuitBreakerImpl是主要的熔断逻辑实现。

2.Hystrix 熔断器状态

熔断器有三个状态 CLOSED OPENHALF_OPEN 熔断器默认关闭状态,当触发熔断后状态变更为 OPEN,在等待到指定的时间,Hystrix会放请求检测服务是否开启,这期间熔断器会变为HALF_OPEN 半开启状态,熔断探测服务可用则继续变更为 CLOSED关闭熔断器。
状态变更示意图

3.代码视角

ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>();

Hystrix为每个commandKey都维护了一个熔断器,保持着对应的熔断器,所以当new XXXHystrixCommand()的时候依然能够保持着原来熔断器的状态。

3.1 如何判定开启熔断
protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, final HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
    this.properties = properties;
    this.metrics = metrics;

    //On a timer, this will set the circuit between OPEN/CLOSED as command executions occur
    Subscription s = subscribeToStream();
    activeSubscription.set(s);
}

private Subscription subscribeToStream() {
    /*
     * This stream will recalculate the OPEN/CLOSED status on every onNext from the health stream
     */
    return metrics.getHealthCountsStream()
            .observe()
            .subscribe(new Subscriber<HealthCounts>() {
                
                 //.....................省略干扰代码......................
                @Override
                public void onNext(HealthCounts hc) {
                    // check if we are past the statisticalWindowVolumeThreshold
                    if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
                        
                    } else {
                        if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
                            
                        } else {
                            
                            if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
                                circuitOpened.set(System.currentTimeMillis());
                            }
                        }
                    }
                }
            });
}

这里面HystrixBreaker启动的时候会订阅HystrixCommandMetricsHealthCountsStream,每当HealthCountsStream搜集到数据,都会触发上面的 onNext方法,然后该方法做下面几个判断
1.当前请求量是否达到设定水位(请求量太小不做阀值控制)
2.当前的请求错误量是否达到阀值,达到后会将熔断器状态置为 OPEN, circuitOpened设置为当前时间戳表示开启的时间。

3.2 attemptExecution

先看下HystrixCommand 的执行Observable
com.netflix.hystrix.AbstractCommand#applyHystrixSemantics

private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
        // mark that we're starting execution on the ExecutionHook
        // if this hook throws an exception, then a fast-fail occurs with no fallback.  No state is left inconsistent
        executionHook.onStart(_cmd);

        /* determine if we're allowed to execute */
        if (circuitBreaker.attemptExecution()) {
··········省略代码··········

这里,每次HystrixCommand执行都会调用 circuitBreaker.attemptExecution()

public boolean attemptExecution() {
            if (properties.circuitBreakerForceOpen().get()) {
                return false;
            }
            if (properties.circuitBreakerForceClosed().get()) {
                return true;
            }
            if (circuitOpened.get() == -1) {
                return true;
            } else {
                if (isAfterSleepWindow()) {
                    if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
                        //only the first request after sleep window should execute
                        return true;
                    } else {
                        return false;
                    }
                } else {
                    return false;
                }
            }
        }

这里代码判断逻辑
1.判断是否强制开启熔断器,是则return false,command不能执行
2.判断是否强制关闭熔断器,是则return true, command可执行
3.判断熔断器是否开启 circuitOpened.get() == -1表示没有开启,则return true,command可执行。
4.到这步证明已经开启了熔断器,那么判断是否可尝试请求,如果可以同时会把熔断器的状态改为HALF_OPEN

3.3 markSuccess&markNonSuccess

com.netflix.hystrix.AbstractCommand#executeCommandAndObserve

private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
    ......省略干扰代码.......

    final Action1<R> markEmits = new Action1<R>() {
        @Override
        public void call(R r) {
            if (shouldOutputOnNextEvents()) {
                executionResult = executionResult.addEvent(HystrixEventType.EMIT);
                eventNotifier.markEvent(HystrixEventType.EMIT, commandKey);
            }
            if (commandIsScalar()) {
                ......省略干扰代码.......
                circuitBreaker.markSuccess();
            }
        }
    };

    final Action0 markOnCompleted = new Action0() {
        @Override
        public void call() {
            if (!commandIsScalar()) {
                ......省略干扰代码.......
                circuitBreaker.markSuccess();
            }
        }
    };

    final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
        @Override
        public Observable<R> call(Throwable t) {
            circuitBreaker.markNonSuccess();
            ......省略干扰代码.......
        }
    };

    ......省略干扰代码.......

    return execution.doOnNext(markEmits)
            .doOnCompleted(markOnCompleted)
            .onErrorResumeNext(handleFallback)
            .doOnEach(setRequestContext);
}

此处表示HystrixCommand执行的过程中对应的熔断器状态变更,上面代码不难看出,当error的时候会触发circuitBreaker.markNonSuccess();,执行成功或者执行完成触发 circuitBreaker.markSuccess();

markNonSuccess

@Override
public void markNonSuccess() {
    if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
        //This thread wins the race to re-open the circuit - it resets the start time for the sleep window
        circuitOpened.set(System.currentTimeMillis());
    }
}

如果能执行到markNonSuccess,说明此时熔断器是关闭状态,或者尝试放流阶段。关闭状态的话不做处理(未触发熔断),尝试放流时,发现依然执行失败,这里讲熔断器状态重新置为开启状态,并把circuitOpened设置为当前的时间戳。

markSuccess

@Override
public void markSuccess() {
    if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
        //This thread wins the race to close the circuit - it resets the stream to start it over from 0
        metrics.resetStream();
        Subscription previousSubscription = activeSubscription.get();
        if (previousSubscription != null) {
            previousSubscription.unsubscribe();
        }
        Subscription newSubscription = subscribeToStream();
        activeSubscription.set(newSubscription);
        circuitOpened.set(-1L);
    }
}

能走到markSuccess说明熔断器此时关闭或者放流阶段,尝试放流阶段则讲熔断器关闭,设置circuitOpened=-1,并重置指标统计。

4.结束了

到这里熔断器的介绍就结束了,回顾下主要有熔断器如何开启、如何关闭、几个状态的变更。一个完整的熔断器就此呈现在大家的面前。

查看原文

赞 3 收藏 4 评论 0

青芒 发布了文章 · 2018-03-21

Hystrix超时实现机制

HystrixCommand在执行的本篇假设大家都具备了如何使用hystrix的能力,如果还没接触过的朋友可参阅
Hystrix介绍
常规使用姿势

HystrixCommand在执行的过程中如何探测超时,本篇主要对此进行介绍说明。

1.主入口:executeCommandAndObserve

#com.netflix.hystrix.AbstractCommand#executeCommandAndObserve
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
        ···省略部分代码···
        Observable<R> execution;

        //判断是否开启超时监测
        if (properties.executionTimeoutEnabled().get()) {
            execution = executeCommandWithSpecifiedIsolation(_cmd)
                    .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
        } else {
            execution = executeCommandWithSpecifiedIsolation(_cmd);
        }

        return execution.doOnNext(markEmits)
                .doOnCompleted(markOnCompleted)
                .onErrorResumeNext(handleFallback)
                .doOnEach(setRequestContext);
    }
executeCommandWithSpecifiedIsolation(_cmd) .lift(new HystrixObservableTimeoutOperator<R>(_cmd));

可以简单的认为lift 里面的对前面的Observable包含,类似装饰者,后面的parent就是指上层的Observable。其中 HystrixObservableTimeoutOperator 就是关键的部分。

2.关键点: HystrixObservableTimeoutOperator

先看下HystrixObservableTimeoutOperator.call(),TimerListener的实现

TimerListener listener = new TimerListener() {

                @Override
                public void tick() {
                   
                    if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
                        // 标记事件,可以认为是开的hook,这里暂忽略
                        originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);

                        //取消原Obserable的订阅
                        s.unsubscribe();

                        final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() {

                            @Override
                            public void run() {
                                child.onError(new HystrixTimeoutException());
                            }
                        });
                        timeoutRunnable.run();
                    }
                }

                //获取配置的超时时间配置
                @Override
                public int getIntervalTimeInMilliseconds() {
                    return originalCommand.properties.executionTimeoutInMilliseconds().get();
                }
            };

这段代码的意思就是,给当前command的超时状态置为超时,如果设置成功就抛出HystrixTimeoutException异常,紧接着被command的 doOnErron接收走 fallback逻辑

fallback
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
        final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();

        .................................

        final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
            @Override
            public Observable<R> call(Throwable t) {
                circuitBreaker.markNonSuccess();
                Exception e = getExceptionFromThrowable(t);
                executionResult = executionResult.setExecutionException(e);
                if (e instanceof RejectedExecutionException) {
                    return handleThreadPoolRejectionViaFallback(e);
                } else if (t instanceof HystrixTimeoutException) {
                    //此处catch到超时异常
                    return handleTimeoutViaFallback();
                } else if (t instanceof HystrixBadRequestException) {
                    return handleBadRequestByEmittingError(e);
                } else {
                    /*
                     * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
                     */
                    if (e instanceof HystrixBadRequestException) {
                        eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                        return Observable.error(e);
                    }

                    return handleFailureViaFallback(e);
                }
            }
        };

        .................................

        return execution.doOnNext(markEmits)
                .doOnCompleted(markOnCompleted)
                .onErrorResumeNext(handleFallback)
                .doOnEach(setRequestContext);
    }

同时s.unsubscribe()通知正在执行的线程,终止任务。如何终止呢?

executeCommandWithSpecifiedIsolation.subscribeOn()

subscribeOne的参数就是HystrixContextScheduler, Rxjava里 scheduler具体干活的是 worker,我们先看下Hystrix自定义scheduler的结构示意图

那么我们直奔主题,直接看 ThreadPoolWorker

//ThreadPoolWorker.schedule
@Override
public Subscription schedule(final Action0 action) {
    if (subscription.isUnsubscribed()) {
        return Subscriptions.unsubscribed();
    }

    ScheduledAction sa = new ScheduledAction(action);

    subscription.add(sa);
    sa.addParent(subscription);

    ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
    FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
    sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));

    return sa;
}

1.开始的时候判断observable是否被订阅
2.被订阅后,将任务 submit到线程池
3.FutureCompleterWithConfigurableInterrupt scheduler在执行的时候,增加了observable的中断探测

private static class FutureCompleterWithConfigurableInterrupt implements Subscription {
    private final FutureTask<?> f;
    private final Func0<Boolean> shouldInterruptThread;
    private final ThreadPoolExecutor executor;

    private FutureCompleterWithConfigurableInterrupt(FutureTask<?> f, Func0<Boolean> shouldInterruptThread, ThreadPoolExecutor executor) {
        this.f = f;
        this.shouldInterruptThread = shouldInterruptThread;
        this.executor = executor;
    }

    @Override
    public void unsubscribe() {
        executor.remove(f);
        if (shouldInterruptThread.call()) {
            f.cancel(true);
        } else {
            f.cancel(false);
        }
    }

    .....省略代码.......
}

当observable 取消订阅时,就会把当前任务移除,并中断任务

到这里只是讲说了超时后的处理,如何认定执行超时呢?

3.匠心之巧

这里有个很巧妙的设计,再探HystrixObservableTimeoutOperator

final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);

#com.netflix.hystrix.util.HystrixTimer#addTimerListener
public Reference<TimerListener> addTimerListener(final TimerListener listener) {
        startThreadIfNeeded();
        // add the listener

        Runnable r = new Runnable() {

            @Override
            public void run() {
                try {
                    listener.tick();
                } catch (Exception e) {
                    logger.error("Failed while ticking TimerListener", e);
                }
            }
        };

        ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r, listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS);
        return new TimerReference(listener, f);
    }

利用了ScheduledThreadPoolExecutor,延迟执行,延迟时间就是我们设定的超时时间,我们再看下

#HystrixObservableTimeoutOperator
Subscriber<R> parent = new Subscriber<R>() {

                @Override
                public void onCompleted() {
                    if (isNotTimedOut()) {
                        // stop timer and pass notification through
                        tl.clear();
                        child.onCompleted();
                    }
                }

                @Override
                public void onError(Throwable e) {
                    if (isNotTimedOut()) {
                        // stop timer and pass notification through
                        tl.clear();
                        child.onError(e);
                    }
                }

                .....  .....  .....  .....  .....  .....  .....  .....  .....

                private boolean isNotTimedOut() {
                    // if already marked COMPLETED (by onNext) or succeeds in setting to COMPLETED
                    return originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED ||
                            originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED);
                }

            };

这里parent就是指上层的obserable,这里可以抽象的认为是我们的HystrixCommand执行线程, 当command执行线程执行完成的时候或异常的时候,会执行 tl.clear(), 也就是Future.cancel()会中断 TimerListener 的ScheduledFuture 线程,迫使超时机制失效。

// tl.clear()
private static class TimerReference extends SoftReference<TimerListener> {
        private final ScheduledFuture<?> f;
        ....        ....        ....        ....        ....
        @Override
        public void clear() {
            super.clear();
            // stop this ScheduledFuture from any further executions
            f.cancel(false);
        }
    }

4.回归文字

HystrixCommand里有个 TimedOutStatus 超时状态
TimedOutStatus
现在可以认为有两个线程,一个是hystrixCommand任务执行线程,一个是等着给hystrixCommand判定超时的线程,现在两个线程看谁能先把hystrixCommand的状态置换,只要任何一个线程对hystrixCommand打上标就意味着超时判定结束。

查看原文

赞 1 收藏 2 评论 0

青芒 关注了用户 · 2018-03-16

AI乔治 @aipaojiao

在此我向大家推荐一个我的架构学习交流群。
交流学习群号: 976203838

里面会分享一些资深架构师录制的视频录像:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化、分布式架构等这些成为架构师必备的知识体系。

大家也可以关注我的公众号:《Java烂猪皮》 每天准时会分享Java,架构,微服务,面试等经典案例文章。

关注 88

青芒 赞了文章 · 2018-03-16

“大话架构”阿里架构师分享的Java程序员需要突破的技术要点

一、源码分析

源码分析是一种临界知识,掌握了这种临界知识,能不变应万变,源码分析对于很多人来说很枯燥,生涩难懂。

源码阅读,我觉得最核心有三点:技术基础+强烈的求知欲+耐心。

我认为是阅读源码的最核心驱动力。我见到绝大多数程序员,对学习的态度,基本上就是这几个层次(很偏激哦):

  • 1、只关注项目本身,不懂就baidu一下。
  • 2、除了做好项目,还会阅读和项目有关的技术书籍,看wikipedia。
  • 3、除了阅读和项目相关的书外,还会阅读IT行业的书,比如学Java时,还会去了解函数语言,如LISP。
  • 4、找一些开源项目看看,大量试用第三方框架,还会写写demo。
  • 5、阅读基础框架、J2EE规范、Debug服务器内核。

大多数程序都是第1种,到第5种不光需要浓厚的兴趣,还需要勇气:我能读懂吗?其实,你能够读懂的

耐心,真的很重要。因为你极少看到阅读源码的指导性文章或书籍,也没有人要求或建议你读。你读的过程中经常会卡住,而一卡主可能就陷进了迷宫。这时,你需要做的,可能是暂时中断一下,再从外围看看它:如API结构、框架的设计图。

下图是我总结出目前最应该学习的源码知识点:

clipboard.png

二、分布式架构

分布式系统是一个古老而宽泛的话题,而近几年因为 “大数据” 概念的兴起,又焕发出了新的青春与活力。除此之外,分布式系统也是一门理论模型与工程技法并重的学科内容。相比于机器学习这样的研究方向,学习分布式系统的同学往往会感觉:“入门容易,深入难”。的确,学习分布式系统几乎不需要太多数学知识。

分布式系统是一个复杂且宽泛的研究领域,学习一两门在线课程,看一两本书可能都是不能完全覆盖其所有内容的。

总的来说,分布式系统要做的任务就是把多台机器有机的组合、连接起来,让其协同完成一件任务,可以是计算任务,也可以是存储任务。如果一定要给近些年的分布式系统研究做一个分类的话,我个人认为大概可以包括三大部分:

  • 分布式存储系统
  • 分布式计算系统
  • 分布式管理系统

下图是我总结近几年目前分布式最主流的技术:

clipboard.png

三、微服务

当前微服务很热,大家都号称在使用微服务架构,但究竟什么是微服务架构?微服务架构是不是发展趋势?对于这些问题,我们都缺乏清楚的认识。

为解决单体架构下的各种问题,微服务架构应运而生。与其构建一个臃肿庞大、难以驯服的怪兽,还不如及早将服务拆分。微服务的核心思想便是服务拆分与解耦,降低复杂性。微服务强调将功能合理拆解,尽可能保证每个服务的功能单一,按照单一责任原则(Single Responsibility Principle)明确角色。 将各个服务做轻,从而做到灵活、可复用,亦可根据各个服务自身资源需求,单独布署,单独作横向扩展。

下图是我总结出微服务需要学习的知识点:

clipboard.png

四、性能优化

不管是应付前端面试还是改进产品体验,性能优化都是躲不开的话题。

优化的目的是让用户有“快”的感受,那如何让用户感受到快呢?

加载速度真的很快,用户打开输入网址按下回车立即看到了页面
加载速度并没有变快,但用户感觉你的网站很快
性能优化取决于多个因素,包括垃圾收集、虚拟机和底层操作系统(OS)设置。有多个工具可供开发人员进行分析和优化时使用,你可以通过阅读 Java Tools for Source Code Optimization and Analysis 来学习和使用它们。

必须要明白的是,没有两个应用程序可以使用相同的优化方式,也没有完美的优化 java 应用程序的参考路径。使用最佳实践并且坚持采用适当的方式处理性能优化。想要达到真正最高的性能优化,你作为一个 Java 开发人员,需要对 Java 虚拟机(JVM)和底层操作系统有正确的理解。

以上五大知识体系是我从业多年总结出来的经验,都是当前最主流的技术。想学习这些技术的朋友可以加群:478030634。群里会分享这些技术知识点供大家学习免费下载

下图是我总结性能优化应该学习理解的几大知识体系:

clipboard.png

五、Java工程化

工欲善其事,必先利其器,不管是小白,还是资深开发,都需要先选择好的工具。提升开发效率何团队协作效率。让自己有更多时间来思考。

“大话架构”阿里架构师分享的Java程序员需要突破的技术要点

clipboard.png

查看原文

赞 69 收藏 186 评论 4

认证与成就

  • 获得 39 次点赞
  • 获得 2 枚徽章 获得 0 枚金徽章, 获得 0 枚银徽章, 获得 2 枚铜徽章

擅长技能
编辑

(゚∀゚ )
暂时没有

开源项目 & 著作
编辑

(゚∀゚ )
暂时没有

注册于 2016-05-02
个人主页被 1.6k 人浏览