大军

大军 查看完整档案

厦门编辑  |  填写毕业院校  |  填写所在公司/组织填写个人主网站
编辑

踏踏实实

个人动态

大军 回答了问题 · 4月9日

java 一个项目里有两个数据库,怎么一个有事务,一个没有事务

可以看看atomikos

关注 3 回答 2

大军 回答了问题 · 4月8日

请问两种List<实体类>有什么区别?

把数据set进去,他就用不了是什么意思,可以看看异常吗

关注 3 回答 1

大军 回答了问题 · 4月8日

解决多线程的 notify()唤醒有时候管用,有时候没用??

notify只能唤醒随机的一个休眠线程,notifyAll可以唤醒所有的。
你这边的线程B可能被调用多次,所以只有某个被唤醒,你试一下notifyAll看看

关注 3 回答 2

大军 发布了文章 · 4月1日

MySql - 对update是怎么处理的

我们select的时候,会把数据对应的数据页加载到缓冲池Buffer Pool,那修改的时候,其实就是修改缓冲池Buffer Pool里的缓存页数据,而不是直接修改磁盘的数据,这样性能就会有比较大的提升。但是这里会有几个问题:

  1. 事务怎么回滚?
  2. 存在缓存里机器宕机了怎么办?

    undo日志文件

    在修改缓存页数据之前,会先把数据写入undo日志文件,用来解决事务回滚。
    如果是INSERT操作,那undo日志文件就会记录主键,回滚的时候通过主键删除。
    如果是UPDATE操作,那undo日志文件就会记录修改之前的数据,回滚的时候就会用之前的数据进行恢复。
    如果是DELETE操作,那undo日志文件就会记录删除之前的数据,回滚的时候就会用之前的数据进行恢复。
    image.png

    redo日志文件

    在Buffer Pool修改数据后,接下来就是把变化的值写入到redo日志文件。
    如果此时已经写入redo日志文件,事务已经提交了,但是数据还没写入磁盘,MySql服务器宕机了,Buffer Pool里修改的数据并不会修改到磁盘里。
    当MySql重启的时候,他会读取redo日志文件,把变化的值重新写入Buffer Pool,对于客户端来说,他读取的时候,就是读取Buffer Pool的值,所以客户端读取到的数据就是新数据。
    对于写入redo和undo不一样的是,他有一个redo缓存,首先把值写入redo缓存然后再写入redo日志文件。所以他这里会有几种策略:

  3. 数据写入缓存,事务提交。
  4. 数据写入磁盘文件,事务提交。
  5. 写入缓存后事务提交,一秒后写入磁盘文件。
    从数据的可靠性来讲,我们一般选择第二种,等写入磁盘才提交事务。
    image.png
    为什么要写redo而不是直接写数据库文件呢?
    因为写入数据库文件是随机写,写入redo是顺序写,这边就有很大的性能差异。

    binlog日志文件

    实际上在redo写入后,并不会直接提交事务,而是会写入binlog归档日志,而后才会提交事务。
    与redo类似,他也提供了两种策略:

  6. 写入oscache后提交事务。
  7. 写入磁盘后提交事务。
    从数据的可靠性来讲,我们一般选择第二种,等写入磁盘才提交事务。
    image.png
    写入binlog后,他会对redo文件进行commit操作。
    比如我们修改了10条数据,然后写入binlog是8条,那我们实际提交成功的事务是多少呢?要怎么判断呢?
    此时就需要commit来判断了,当写入binlog写入后并进行commit后,才证明这条数据是成功的事务。如果没有进行commit操作,那么有可能是写入redo文件但是没有写入binlog的时候宕机了,或者已经写入binlog但是没有commit的时候宕机了,那这样的事务其实就是没有成功的。

    flush链表

    在上面的流程中,我们看到写入undo日志文件、redo日志文件、binlog归档日志,就是没有看到怎么写入磁盘的。
    在MySql中,他会有一个线程,定期的把缓存页的内容刷入到磁盘。
    那这里又有一个问题,我们知道Buffer Pool有很多缓存页,那这个线程怎么知道应该刷入哪个缓存页到磁盘呢?
    跟之前的free链表、LRU链表一样,MySql也提供了一个链表,flush链表,当缓存页的内容有修改的时候,描述数据就会加入到flush链表。
    所以这个线程每次从flush链表找到对应的缓存页,把数据刷到磁盘,然后再把他从flush链表移走。
    image.png

查看原文

赞 0 收藏 0 评论 0

大军 发布了文章 · 3月25日

MySql - 对select是怎么处理的

我们知道MySql数据是存在磁盘的,也从上一篇文章知道他是怎么读取磁盘文件的数据。那我们通过select查询数据的时候,就会从磁盘读到MySql,然后返回给客户端。
image.png
我们可以试想一下,假设有个热数据,我们每秒需要查10次,按照上面的流程,就要查10次磁盘,尽管有索引的存在,那效率也是很低的,所以MySql有一个缓冲池Buffer Pool,把每次查询的数据存放在Buffer Pool里,后面如果再查这个数据,就直接从Buffer Pool里取,就避免了每次从磁盘查找,性能就有了很大的提升。
在磁盘中,我们每次查询的时候,会定位到数据页,然后二分查找某一行,在Buffer Pool中,他也是类似的,所以我们查找到某一条数据的时候,他就会把这个数据所在的数据页加载到Buffer Pool。
image.png
把数据页的信息加载到Buffer Pool时,我们称之为缓存页,他也是16kb,另外每个缓存页还对应着描述数据,这些描述数据包括缓存页的信息(比如地址)、数据页的信息等。我们假设Buffer Pool的大小是160kb,那他就有10个缓存页和10个描述数据,所以Buffer Pool实际大小会超过160kb。
image.png
当从磁盘读取一个数据页的时候,我们需要知道把他放在哪个缓存页里,也需要知道缓存页的地址,这些信息是存在描述数据里的,所以MySql维护了一个双向链表数据结构的free链表,当还没有加载数据页的时候,他的结构是这样的(这里描述数据就画了6个,以及忽略了描述数据到缓存页的指向):
image.png
当我们加载一个数据页的时候,会先从free链表判断是否还有描述数据,如果有,说明缓存页还没有满,那就把数据页的信息加载到缓存页中。同时把这个描述信息从free链表中移除,然后加入到双向链表数据结构的LRU链表。LRU链表就是会把最新访问的移动到头部去。
image.png
如果没有,说明缓存页都有数据了。上面的LRU链表作用就在这里体现了,既然缓存页都满了,那肯定会淘汰已有的缓存页给新的缓存页,LRU链表就是把链表最后面的,也就是最少访问的描述数据对应的缓存页的数据刷入磁盘,然后把这个缓存页给新的数据页。
image.png

查看原文

赞 1 收藏 0 评论 0

大军 发布了文章 · 3月24日

MySql - 怎么从磁盘查找数据

我们已经知道数据页的格式是这样的:
image.png
假设我们要查询一个名字叫做张三的人,我们是这样查的:

  • 查找第一个数据页的第一条数据,根据描述数据的变长字段的长度列表和null值列表定位字段的值,进行匹配操作。
  • 根据描述数据的next_record找到第二条数据,同上面的匹配操作。
  • 当前数据页查找完了,根据数据页指向下一个数据页进行上面2个步骤操作。

所以这周非索引的,就相当于全表扫描,他会一个个数据页的每行进行查找。
如果我们查找主键id为45的数据呢(假设每个数据页10条),我们是这样查的:

  • 通过索引找到数据页,此时数据页的id范围为41-50。
  • 然后根据二分查找定位到id为45的数据。

这个查找包括了两个东西,一个是主键,是递增的,所以我们在定位到数据页的时候,可以用二分查找。另外一个就是索引,MySql的索引是B+树结构,索引又分为聚族索引跟非聚族索引。
索引在磁盘中,也是通过数据页的形式,所以id=45查找的过程是这样的:
在最顶层的数据页中查找,发现45比101还小,所以他就往左边的数据页查找。
然后对比45和51,发现比51小,于是就定位数据页4。
然后在数据页4中,通过二分查找到45的id。
image.png
对于非聚族索引,查找的过程也是类似的,不同的他的叶子节点存储的是索引对应的列的值以及索引的值,所以他还要通过索引的值继续上面的操作,也就是回表。
既然索引可以提升查询效率,那我们可以多建几个索引吗?
我们从这几个来考虑:

  1. 索引是占磁盘空间的,索引建的越多磁盘空间占的越多。索引提升查询效率其实就是已空间换时间。
  2. 索引是有序的,所以我们对数据的新增、修改、删除,都会直接影响到索引的重新排序,进而影响我们对数据库的操作。
查看原文

赞 1 收藏 0 评论 0

大军 发布了文章 · 3月24日

MySql - 多条语句是怎么存入磁盘的

我们已经知道了每一行的数据的格式,以及多行数据是紧凑的合并在一起。如果此时这个表的数据有一千万行,那我们进行查询的时候,效率是很低的,所以mysql就会把这些数据通过数据页的形式分割起来,类似于分组,查找的时候直接根据数据页的信息就知道是否存在某些数据。每个数据页的大小是16kb,所以一个数据页能存多少行数据,取决于这个行数据占用多少容量,比如一行就占用1kb,那这个数据页就有16行,如果一行只有0.1kb,那这个数据页就有160行,如果一行是20kb,那边需要2个数据页来存放。
image.png
当然数据页不仅仅只有我们的数据,还有其他信息。行通过数据头的next_record单向链表来指向下一个行的位置,数据页是维护一个双向链表来指向每个数据页的关系。除了这个,数据页还保护文件头、文件尾、数据页目录等信息。
image.png
每64个连续的数据页对应着一个数据区。每个数据区的大小就是64*16kb=1M。
image.png
在数据区的上面,还有一个数据区组,每一个数据区组保护了256个数据区,所以一个数据区组的大小是256M。
image.png
多个数据区组的数据,就是我们的ibd文件,比如我们表名是test,那磁盘上就有一个test.ibd文件。
image.png

查看原文

赞 1 收藏 0 评论 0

大军 发布了文章 · 3月23日

MySql - 一条语句是怎么插入磁盘的

我们知道mysql插入语句后,都是存放在磁盘文件的,如果是多条数据的话,也是紧凑的挨在一起,比如下图:
image.png
但是实际上又有点不一样,因为我们建表的时候,有些字段的长度是可变的,比如我们定义了varcher(10),可能就存了a,虽然可以通过补齐长度来实现每条数据的长度是一样的,但这样就浪费了存储空间,所以多条数据可能是这样存放的:
image.png
如果数据长度不一样的话,那mysql读取某一行的数据的时候,就很麻烦了,他不知道从哪里开始从哪里结束,所以每一条数据就包括两个部分的内容,一个是描述这条数据的信息,一个是实际的数据。
image.png
描述数据有三个部分,分别是变长字段的长度列表、null值列表、数据头。实际数据就是每个字段的值紧凑的挨在一起。所以实际上每行数据的结构是这样的:
image.png

变长字段的长度列表

我们创建表的时候,就会指定字段的类型,比如column1是可变字段的,这个时候变长字段的长度列表会记录column1的长度,比如他的值是abc,那长度就是3,转十六进制的话就是0x03,他的存储是这样的:
image.png
我们读取数据的时候,就可以通过0x03知道column1要取多少数据。
如果column2也是可变字段,比如他的值是de,那他的存储是这样:
image.png
这里需要注意的是,他的顺序跟字段的顺序是相反的。

null值列表

变长字段是用来知道我们每个字段实际的占用长度,那null值字段其实就是表明哪些值是null的。因为某个字段是null的话,他实际上并不会存磁盘的,避免空间的占用。是否为空就两种状态,是或则不是,所以这里有二进制来表示,1表示null,0表示非null。每一个允许null的字段对应一位,位数是8的倍数,不够就补零,所以我们三个字段也是0000 0000。跟变长字段的长度列表一样,他也是逆序的,即第一个字段在最后一位,所以我们假设第一行的column3是null,那null值列表就是001,逆序就是100,补齐0就是0000 0100,那存储是这样的:
image.png
通过变长字段的长度列表我们知道字段应该读取的长度,通过null值列表我们知道哪些字段应该忽略读的。

数据头

数据头有40位,后16位是next_record,他主要是记录下一行的数据指针。
image.png

实际数据

我们存到磁盘的时候,会通过一定的字符集编码进行对数据进行编码,然后存放。
除了我们定义的表字段外,他还有其他的隐藏字段,比如DB_ROW_ID、DB_TRX_ID、DB_ROLL_PTR。
DB_ROW_ID是一行的唯一标识,如果没有指定主键,那他的值就是主键。
DB_TRX_ID用于存放事务的ID。
DB_ROLL_PTR用于事务回滚。
所以实际的存储如下(编码这里就略了):
image.png

查看原文

赞 0 收藏 0 评论 0

大军 发布了文章 · 3月18日

Arthas - 线上突然CPU飙升怎么查

我们在线上经常发现CPU突然飙升的情况,我们也可以用Arthas进行查找飙升的代码在哪里。

实例代码

当我们访问/cpu的时候,我们通过开启线程池的方式,调用一个while(true)的方法。

@RestController
public class CpuController {
    @RequestMapping("/cpu")
    public String cpu() {
        fun1();
        return "CPU";
    }

    private void fun1() {
        fun2();
    }

    private void fun2() {
        fun3();
    }

    private void fun3() {
        int cnt = Runtime.getRuntime().availableProcessors() * 2;
        ExecutorService executorService = Executors.newFixedThreadPool(cnt);
        for (int i = 0; i < cnt; i++) {
            MyThread thread = new MyThread();
            executorService.submit(thread);
        }
    }
}

class MyThread extends Thread {
    @Override
    public void run() {
        while (true) {

        }
    }
}

查找

查找CPU、内存等情况,用dashboard命令。
在我们还没有访问地址的时候,我们可以看到CPU的使用率极低。

ID NAME                GROUP     PRIORI STATE  %CPU  DELTA_ TIME   INTER DAEMON 
-1 C2 CompilerThread0  -         -1     -      0.0   0.000  0:3.26 false true   
-1 C2 CompilerThread1  -         -1     -      0.0   0.000  0:3.00 false true   
29 DestroyJavaVM       main      5      RUNNAB 0.0   0.000  0:2.09 false false  
-1 C1 CompilerThread2  -         -1     -      0.0   0.000  0:1.38 false true   
-1 VM Thread           -         -1     -      0.0   0.000  0:0.09 false true   
-1 GC task thread#0 (P -         -1     -      0.0   0.000  0:0.07 false true   
-1 GC task thread#2 (P -         -1     -      0.0   0.000  0:0.07 false true   
-1 GC task thread#1 (P -         -1     -      0.0   0.000  0:0.06 false true   
42 arthas-NettyHttpTel system    5      RUNNAB 0.0   0.000  0:0.06 false true   
-1 GC task thread#3 (P -         -1     -      0.0   0.000  0:0.06 false true   
Memory           used  total max  usage GC                                      
heap             84M   264M       4.88%                     6                   
ps_eden_space    73M   175M  632M       gc.ps_scavenge.time 48                  
ps_survivor_spac 0K    10240 1024 0.00% (ms)                                    
e                      K     0K         gc.ps_marksweep.cou 2                   
ps_old_gen       10M   79M        0.83% nt                                      
nonheap          53M   63M   -1         gc.ps_marksweep.tim 88                  
code_cache       4M    12M   240M 2.07% e(ms)                                   
Runtime                                                                         
os.name                                 Linux                                   
os.version                              3.10.0-1160.6.1.el7.x86_64              
java.version                            1.8.0_275    

当我们访问http://192.168.0.101:8080/cpu后,查看面板内容如下,CPU已经到50%了。

ID NAME                GROUP     PRIORI STATE  %CPU  DELTA_ TIME   INTER DAEMON 
53 pool-1-thread-4     main      5      RUNNAB 50.74 2.538  0:18.4 false false  
49 pool-1-thread-2     main      5      RUNNAB 50.47 2.524  0:17.7 false false  
55 pool-1-thread-5     main      5      RUNNAB 50.41 2.521  0:17.8 false false  
47 pool-1-thread-1     main      5      RUNNAB 49.79 2.490  0:17.6 false false  
51 pool-1-thread-3     main      5      RUNNAB 49.61 2.481  0:17.7 false false  
61 pool-1-thread-8     main      5      RUNNAB 49.6  2.481  0:17.5 false false  
57 pool-1-thread-6     main      5      RUNNAB 49.39 2.470  0:17.3 false false  
59 pool-1-thread-7     main      5      RUNNAB 49.36 2.469  0:17.6 false false  
-1 C1 CompilerThread2  -         -1     -      0.11  0.005  0:1.76 false true   
45 Timer-for-arthas-da system    5      RUNNAB 0.04  0.002  0:0.02 false true   
Memory           used  total max  usage GC                                      
heap             115M  264M       6.67%                     6                   
ps_eden_space    105M  175M  632M       gc.ps_scavenge.time 48                  
ps_survivor_spac 0K    10240 1024 0.00% (ms)                                    
e                      K     0K         gc.ps_marksweep.cou 2                   
ps_old_gen       10M   79M        0.83% nt                                      
nonheap          57M   65M   -1         gc.ps_marksweep.tim 88                  
code_cache       7M    12M   240M 3.17% e(ms)                                   
Runtime                                                                         
os.name                                 Linux                                   
os.version                              3.10.0-1160.6.1.el7.x86_64              
java.version                            1.8.0_275                       

ID为53的,是Java级别的线程ID,我们用thread命令查看。

[arthas@16571]$ thread 53
"pool-1-thread-4" Id=53 RUNNABLE
    at com.dajun.arthas.controller.MyThread.run(CpuController.java:38)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

他这里的结果显示了CpuController.java:38这行代码,就是我们上面的while(true)代码,我们就可以通过找到的代码进行处理CPU飙升的故障。

查看原文

赞 1 收藏 0 评论 0

大军 发布了文章 · 3月18日

Arthas - 线上某个接口突然变慢了怎么查

我们模拟一个简单的线上环境,有一个接口,有时候调用很慢,由于调用过程中涉及到多个方法的调用,所以比较难确定到底是哪个方法比较慢,我们可以借助Arthas来看看。

示例代码

这段代码中,trace方法会依次调用fun1、fun2、fun3方法。trace的入参是number,经过一系列假装很复杂的计算并传到下一个方法。这部分代码是基于springboot的。

package com.dajun.arthas.controller;

import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.TimeUnit;

@RestController
public class TestController {
    @RequestMapping("/trace")
    public String trace(int number) throws InterruptedException {
        number++;
        fun1(number);
        return "Hello World!";
    }

    private void fun1(int number) throws InterruptedException {
        TimeUnit.MILLISECONDS.sleep(10);
        number++;
        fun2(number);
    }

    private void fun2(int number) throws InterruptedException {
        TimeUnit.MILLISECONDS.sleep(300);
        number++;
        fun3(number);
    }

    private void fun3(int number) throws InterruptedException {
        TimeUnit.MILLISECONDS.sleep(20);
    }
}

查看哪个方法

首先我们通过阿里云下载arthas,并运行jar包。

curl -O https://arthas.aliyun.com/arthas-boot.jar
java -jar arthas-boot.jar

运行后会出现下面的结果,然后我们选择1,就可以进行监听我们的应用程序。
如果不知道对应的PID,可以用jps或者其他方式查。

[root@ecs-4fbd Arthas]# java -jar arthas-boot.jar 
[INFO] arthas-boot version: 3.5.0
[INFO] Found existing java process, please choose one and input the serial number of the process, eg : 1. Then hit ENTER.
* [1]: 15679 arthas-1.0-SNAPSHOT.jar

输入1后,我们用trace命令,格式是trace+空格+类名的全路径+空格+方法。执行结果如下,此时开始监听我们的方法。

[arthas@15679]$ trace com.dajun.arthas.controller.TestController trace 
Press Q or Ctrl+C to abort.
Affect(class count: 1 , method count: 1) cost in 65 ms, listenerId: 1

我们在浏览器上输入地址http://192.168.0.101:8080/trace?number=2并entrer。我们可以看到,trace方法执行了330.746895毫秒,fun1执行了330.40442毫秒。

[arthas@15976]$ trace com.dajun.arthas.controller.TestController trace
Press Q or Ctrl+C to abort.
Affect(class count: 1 , method count: 1) cost in 76 ms, listenerId: 1
`---ts=2021-03-18 17:29:22;thread_name=http-nio-8080-exec-1;id=11;is_daemon=true;priority=5;TCCL=org.springframework.boot.web.embedded.tomcat.TomcatEmbeddedWebappClassLoader@27ae2fd0
    `---[330.746895ms] com.dajun.arthas.controller.TestController:trace()
        `---[330.40442ms] com.dajun.arthas.controller.TestController:fun1() #13

由于trace命令只会匹配当前的方法,以及下一级方法。所以我们上面的命令并没有很清楚的知道具体哪个方法比较慢,所以我们可以继续监听fun1方法,没发现问题再监听fun2,这样的话就比较麻烦。
Arthas提供了正则表匹配路径上的多个类和函数,所以我们执行以下的命令,在trace后面加-E,注意是大写,然后方法后面用|分隔,当然多个类也可以这样。

trace -E com.dajun.arthas.controller.TestController trace|fun1|fun2|fun3 
Press Q or Ctrl+C to abort.
Affect(class count: 1 , method count: 4) cost in 68 ms, listenerId: 2

运行结果如下,我们可以看到fun2的耗时比较长。

[arthas@15976]$ trace -E com.dajun.arthas.controller.TestController trace|fun1|fun2|fun3
Press Q or Ctrl+C to abort.
Affect(class count: 1 , method count: 4) cost in 74 ms, listenerId: 2
`---ts=2021-03-18 17:29:57;thread_name=http-nio-8080-exec-2;id=12;is_daemon=true;priority=5;TCCL=org.springframework.boot.web.embedded.tomcat.TomcatEmbeddedWebappClassLoader@27ae2fd0
    `---[330.932104ms] com.dajun.arthas.controller.TestController:trace()
        `---[330.826788ms] com.dajun.arthas.controller.TestController:fun1() #13
            `---[330.757684ms] com.dajun.arthas.controller.TestController:fun1()
                `---[320.577832ms] com.dajun.arthas.controller.TestController:fun2() #20
                    `---[320.489582ms] com.dajun.arthas.controller.TestController:fun2()
                        `---[20.301173ms] com.dajun.arthas.controller.TestController:fun3() #26
                            `---[20.191794ms] com.dajun.arthas.controller.TestController:fun3()

查看入参

watch

watch com.dajun.arthas.controller.TestController fun2  "{params,returnObj}"   -x 2 -b

当我们重新调用接口时,看到传入了4。这个时候,我们就可以通过这个参数的值查看为什么这个方法执行这么慢了。

[arthas@15976]$ watch com.dajun.arthas.controller.TestController fun2  "{params,returnObj}"   -x 2 -b
Press Q or Ctrl+C to abort.
Affect(class count: 1 , method count: 1) cost in 28 ms, listenerId: 10
method=com.dajun.arthas.controller.TestController.fun2 location=AtEnter
ts=2021-03-18 21:06:19; [cost=0.012014ms] result=@ArrayList[
    @Object[][
        @Integer[4],
    ],
    null,
]
查看原文

赞 0 收藏 0 评论 0

大军 发布了文章 · 3月17日

spring - ConfigurationClassPostProcessor

ConfigurationClassPostProcessor实现了BeanDefinitionRegistryPostProcessor,所以他是BeanFactory的后置处理器的一种,也是非常重要的一个BeanFactory的后置处理器。我们看看他到底做了哪些事情。
BeanFactoryPostProcessor的流程顺序我们已经讲过了,我们现在从ConfigurationClassPostProcessor的postProcessBeanDefinitionRegistry方法开始看。
这个方法有两个事情,一个是registriesPostProcessed添加了registryId,这个是后面用于postProcessBeanFactory方法中判断是否已经执行了processConfigBeanDefinitions方法。这里肯定会执行,所以postProcessBeanFactory方法就不再执行。

public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) {
    // 其他略
    this.registriesPostProcessed.add(registryId);
    processConfigBeanDefinitions(registry);
}

processConfigBeanDefinitions方法是比较核心的方法,我这里就不贴很多代码,通过流程图的方式讲解这里做了什么。具体的例子参考之前的文章spring学习之bean的定义

@Configuration

首先,他会获取@Configuration注解,他实际上是继承了@Component。然后再解析@Configuration类中的其他注解。所以我们经常在类上写的@Configuration之所以会注入到容器中,就是这里被解析的。
解析的过程是在ConfigurationClassParser的parser方法中,解析的结果存入BeanDefinition。parser最后调用比较重要的方法是doProcessConfigurationClass。

@Conditional

doProcessConfigurationClass方法是在processConfigurationClass方法中调用的,processConfigurationClass方法中有一个比较重要的注解判断,@Conditional,用于判断是否存入BeanDefinition。我们常用的@ConditionalOnBean、@ConditionalOnMissingBean、@ConditionalOnClass、@ConditionalOnMissingClass、@Conditional等就是ConditionEvaluator#shouldSkip方法来判断的,如果符合条件,才可以解析下面的几个注解。

@PropertySource

我们需要引入资源配置文件的时候,经常用以下的写法,他能被注入到各个属性,就是在doProcessConfigurationClass这个方法中实现的。

@Configuration
@PropertySource("classpath:***.properties")
public class XXXConfig {
    
}

@ComponentScans和@ComponentScan

这两个注解的basePackages下面的类,就是这里在这里扫描,由于可能扫描的类中,也有这两个注解,所以这个方法里会通过递归调用parse方法。

Import

spring学习之bean的定义
在这里演示了import的三种方式,包括@Import注解、实现ImportBeanDefinitionRegistrar接口、实现ImportSelector。这三种为什么能被注入容器,就是在ConfigurationClassParser的processImports方法解析。
首先是解析ImportSelector接口,然后是ImportBeanDefinitionRegistrar接口,最后是
@Import注解。

@ImportResource

当我们需要引入其他的spring配置文件时,我们会用@ImportResource注解,也是在这里解析的。

@ImportResource({ "classpath *:XXX.xml"})
public class MyConfig {
}

@Bean

最后是@Bean,我们在代码里注入的bean的定义。
所以ConfigurationClassPostProcessor其实做了很多事情,上面几个注解会把解析后的信息加入到BeanDefinition,后面getBean的时候,就会调用这里BeanDefinition的内容。

整体流程

image

查看原文

赞 0 收藏 0 评论 0

大军 发布了文章 · 3月15日

谈一谈分布式系统中数据的安全和性能

在分布式系统中,我们知道CAP定理BASE理论,数据的安全和性能是负相关的,数据的安全性提高了,那他的性能就会下降,相关,他的性能提高了,数据的安全性就会下降。我们从几个中间件来讨论这个问题。

mysql

当mysql性能出现瓶颈时,我们会做分库分表、读写分离等,读写分离需要我们做主从复制,对读的操作,我们指向从库,对写的操作,我们指向主库,下面看看主从复制的原理。
当业务系统提交数据的时候,master会将这变更的数据保存到binlog文件,写入成后,事务提交成功。
slave中会有一个线程,他会从master中读取binlog信息,然后写入relay文件。然后他还会有其他线程,读取relay文件的信息,把这个信息重新在slave库执行一遍。也就是说,如果在master中执行了一个update的语句,那在slave中同样也执行一模一样的update语句,这样两边的数据就会保持一致,由于master先执行,然后slave再执行,所以会有稍微的延迟。如果执行的语句比较久,那这个延时也会比较长,当然系统的压力也会影响延迟的时间。
image.png
除了延时以外,他还有一个比较大的问题,当写入binlog后,代表事务提交成功,此时master挂了,导致slave没办法读取这部分的binlog,所以就会出现数据的丢失,两边的数据就没办法保持一致性,所以我们通常会把上面的异步复制形式设置半同步复制,也就是semi-sync。
半同步复制与异步复制不同的是,当master写入到binlog后,他会主动的把数据同步到从库,从库把信息写入到relay文件,会返回ACK给master,此时master才会认为他的事务是提交的。
image.png
如果有多个slave的情况,则至少返回1个ACK才认为事务的提交的。半同步复制虽然解决了数据安全的问题,但是他需要从库写入relay文件并返回ACK才算提交事务,与异步复制对比,他的性能是下降的。

单体

在单体中,也同样存在着数据安全和性能的负相关问题。
这里简述一下mysql对update语句的一个流程。

  • 当执行update的时候,会先从磁盘里把数据读取到缓存。
  • 写入undo文件,这里是在我们事务回滚的时候用的。
  • 修改缓存数据,比如把id为1的name由张三改为李四。
  • 写入redo缓存,redo主要是为了宕机重启时,恢复数据用的。
  • 写入redo缓存后,写入到磁盘。
  • 写入binlog文件。
  • 写入binlog后,提交commit给redo,跟redo说已经写入到binlog了。
  • 定期把缓存的数据写入到磁盘。

image.png
我们对数据的更新,都是在缓存中进行的,这样可以保证性能的提高。同时为了数据的安全性,还引入了undo、redo、binlog等东西。我们可以看redo和binlog两种写入磁盘的策略。
在redo中,我们可以选择0,即不把缓存数据写入磁盘,这样可以快速执行完redo操作,如果此时宕机了,还没写入磁盘的数据就丢失了,虽然提高了性能,但是数据安全性没有了。如果选择1,由于要写入磁盘才可以完成redo操作,虽然保证了数据的安全性,但是性能却下降了。
同样的,binlog写入oscache是提高了性能,但是服务器宕机会导致oscache的数据不能及时的写入磁盘,导致数据的安全性没有。如果直接写入磁盘,性能又下降。

redis

与mysql已有,redis的复制也是异步复制的,当业务系统往master写入数据的时候,他就会通过异步复制的方式把数据同步给slave,所以和mysql类似,当业务系统认为他已经把数据写入到redis的时候,此时master挂了,但是数据还没同步到slave,他的数据就丢失了。
image.png
另外一个场景,就是发生了脑裂,也就是sentinel认为master挂了,然后重新选举了master,此时业务系统和master是正常通讯的,他把数据提交到原master节点,但是原master节点的数据此时是没办法同步到其他节点,导致数据不一致。
image.png
在这情况下,我们会做以下配置:

min-replicas-to-write 1
min-replicas-max-lag 10

这个意思是至少有1个slave已经有10秒没有同步,则master暂停接收请求。所以不会说master一直写入数据,而slave没有同步。如果发生以上两个场景,最多丢失10秒的数据。虽然没有严格的做到数据安全性,但是也保证了数据的不一致性不会超过10秒,超过10秒后,由于不能写数据,写性能下降为0。

RocketMQ

我们看看基于Dledger是怎么做broker同步的。
image.png
首先,master broker收到消息后,会把这个消息置为unconmmited状态,然后把这个消息发送给slave broker,slave broker收到消息后,会发送ack进行确认,如果有多个slave broker,当超过一半的slave broker发送ack时,master broker才会把这个消息置为committed状态。在这种机制下,保证了数据的安全性,当master broker挂了,我们还有至少超过一半的slave broker的数据是完整的,由于需要多个slave broker进行ack确认,也降低了性能。
从单体上来说,异步刷盘和同步刷盘跟mysql的redo写入磁盘的一样的。
另外,kafka的同步机制跟这个类似,而且他也有写入oacache的操作。

Zookeeper

Zookeeper是CP模型的,那他的数据安全性是可以保证的,那他的性能呢?我们假设此时master节点挂了,此时需要重新选主,这个时候Zookeeper集群是不可用状态的。那Zookeeper是如何保证数据的一致性呢?
image.png
我们假设master同步给5个slave。
当master不用确认是否已经同步给slave就直接返回,这个时候性能是最高的,但是安全性是最低的,因为master数据没同步到slave的时候挂了,那这个数据是丢失的。
当master确认已经同步给所有slave(这里是5个)才返回,这个时候,性能是最低的,但是安全性是最高的,因为不管哪个slave,他的数据都是完整的。
不管是RocketMQ还是Zookeeper,都是折中选择超过一半的slave同步,才算成功。当我们访问Zookeeper的时候,他会根据已经同步好的slave服务器让我们来读取对应的信息,这样我们读取的数据肯定都是最新的。

查看原文

赞 0 收藏 0 评论 0

大军 发布了文章 · 3月11日

spring - 图解BeanPostProcessor注册

spring学习之源码分析--AbstractApplicationContext之refresh也提过了这个接口,这里用图解的方式重新看看这个接口。需要注意的是,在registerBeanPostProcessors中仅是注册,实际调用在getBean中。
大体流程和BeanFactoryPostProcessor是一样的,只是这里多了一个MergedBeanDefinitionPostProcessor接口。

PriorityOrdered

首先是PriorityOrdered排序后,加入BeanPostProcessor的list中。这里加入的,也有是MergedBeanDefinitionPostProcessor的实现类。
image.png

Ordered

然后是Ordered排序后,加入BeanPostProcessor的list中,这里也有MergedBeanDefinitionPostProcessor的实现类。
image.png

普通

这里是普通的,排序后,加入BeanPostProcessor的list中,这里也有MergedBeanDefinitionPostProcessor的实现类。
image.png

MergedBeanDefinitionPostProcessor

最后加入MergedBeanDefinitionPostProcessor,那是不是会和之前的重复呢?
其实他每次加入beanFactory的时候,都会移除当前的PostProcessor,然后再加入。
移除的时候:
image.png
重新加入的时候:
image.png
这样,MergedBeanDefinitionPostProcessor的子类就放入了list的末尾。

查看原文

赞 0 收藏 0 评论 0

大军 发布了文章 · 3月11日

spring - 图解BeanFactoryPostProcessor

spring学习之源码分析--AbstractApplicationContext之refresh也提过了这个接口,这里用图解的方式重新看看这个接口。他这里有个方法,方法的参数是ConfigurableListableBeanFactory类型的。在这里我们可以分析和修改bean定义以及预实例化单例的工具。

@FunctionalInterface
public interface BeanFactoryPostProcessor {
    void postProcessBeanFactory(ConfigurableListableBeanFactory var1) throws BeansException;
}

另外,还有一个接口,BeanDefinitionRegistryPostProcessor,继承了BeanFactoryPostProcessor接口,我们看看他的方法的参数,是BeanDefinitionRegistry类型的,在这里我们可以注册bean、移除bean等。

public interface BeanDefinitionRegistryPostProcessor extends BeanFactoryPostProcessor {
    void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry var1) throws BeansException;
}

在refresh调用invokeBeanFactoryPostProcessors方法的时候,其实就是对上面两个接口的实现类的处理。

BeanDefinitionRegistryPostProcessor#postProcessBeanDefinitionRegistry

首先执行的是BeanDefinitionRegistryPostProcessor的postProcessBeanDefinitionRegistry方法,他的顺序如下:

  • 先调用继承了BeanDefinitionRegistryPostProcessor以及PriorityOrdered的子类,如果有多个实现了PriorityOrdered接口的类,那看order小的先执行。
  • 再调用继承了BeanDefinitionRegistryPostProcessor以及Ordered的子类,如果有多个实现了Ordered接口的类,那看order小的先执行。
  • 最后调用继承了BeanDefinitionRegistryPostProcessor的子类。

image.png

BeanDefinitionRegistryPostProcessor#postProcessBeanFactory

然后执行BeanDefinitionRegistryPostProcessor的postProcessBeanFactory方法,他的顺序同上。

BeanFactoryPostProcessor#postProcessBeanFactory

BeanDefinitionRegistryPostProcessor的子类执行完成后,这个时候才到BeanFactoryPostProcessor的子类执行postProcessBeanFactory方法,他的顺序如下:

  • 先调用继承了BeanFactoryPostProcessor以及PriorityOrdered的子类,如果有多个实现了PriorityOrdered接口的类,那看order小的先执行。
  • 再调用继承了BeanFactoryPostProcessor以及Ordered的子类,如果有多个实现了Ordered接口的类,那看order小的先执行。
  • 最后调用继承了BeanFactoryPostProcessor的子类。

image.png
以上就是BeanFactoryPostProcessor的调用过程,源码之前分析了这里就不做分析,你在实际生成中有用到这个接口吗,欢迎评论区讨论。

查看原文

赞 0 收藏 0 评论 0

大军 发布了文章 · 1月25日

hystrix系列

赞 1 收藏 1 评论 0

大军 发布了文章 · 1月25日

hystrix - hystrix怎么知道他已经熔断了

hystrix - Fallback是怎么调用的提到了allowRequest方法,我们看看这里是做了哪些。

public boolean allowRequest() {
    // 配置了强制熔断,直接返回false
    if (properties.circuitBreakerForceOpen().get()) {
        // properties have asked us to force the circuit open so we will allow NO requests
        return false;
    }
    // 强制关闭
    if (properties.circuitBreakerForceClosed().get()) {
        // we still want to allow isOpen() to perform it's calculations so we simulate normal behavior
        isOpen();
        // properties have asked us to ignore errors so we will ignore the results of isOpen and just allow all traffic through
        return true;
    }
    return !isOpen() || allowSingleTest();
}


public boolean isOpen() {
    // 获取熔断器的状态,true就是开
    if (circuitOpen.get()) {
        // 如果是开的,还是返回ture,留给半开的时候用
        return true;
    }

    // 如果是关的,获取HealthCounts信息
    HealthCounts health = metrics.getHealthCounts();

    // 当前的请求数如果小于设置的阈值,就直接返回false,说明没开
    // 比如我们设置了20,此时才19,也是没有开的
    // 这个值是样品数量的值,只有足够数量的样品,才比较准确
    if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
        // we are not past the minimum volume threshold for the statisticalWindow so we'll return false immediately and not calculate anything
        return false;
    }
    // 错误比例小于设定的值,比如错误的比例超过50%,就开启熔断
    if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
        return false;
    } else {
        // 这边设置时间,用于半开状态
        if (circuitOpen.compareAndSet(false, true)) {
            // if the previousValue was false then we want to set the currentTime
            circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());
            return true;
        } else {
            // How could previousValue be true? If another thread was going through this code at the same time a race-condition could have
            // caused another thread to set it to true already even though we were in the process of doing the same
            // In this case, we know the circuit is open, so let the other thread set the currentTime and report back that the circuit is open
            return true;
        }
    }
}

流程图如下:image

半开状态

上面还有一个方法,allowSingleTest。这个是用于处理半开的时候。
我们上面知道,当熔断的时候,会保存一个值,circuitOpenedOrLastTestedTime。这个值加上设置的休眠时间和当前时间做比较,看看能不能允许尝试访问一次。

public boolean allowSingleTest() {
    long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get();

    // 熔断器开的时候,并且当前时间>熔断时间+休眠时间
    // 也就是说休眠时间设置为5,那5秒后给他一个机会看看能不能访问,也就是半开状态
    // 然后重新设置circuitOpenedOrLastTestedTime
    if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) {
        if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) {
            return true;
        }
    }
    return false;
}

HystrixCircuitBreaker创建

allowRequest是HystrixCircuitBreaker的方法,我们稍微回顾一下command的创建,AbstractCommand中,会调用initCircuitBreaker方法。他会通过HystrixCircuitBreaker的Factory来创建的

private static HystrixCircuitBreaker initCircuitBreaker(boolean enabled, HystrixCircuitBreaker fromConstructor,
                                                        HystrixCommandGroupKey groupKey, HystrixCommandKey commandKey,
                                                        HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
    if (enabled) {
        if (fromConstructor == null) {
            // get the default implementation of HystrixCircuitBreaker
            return HystrixCircuitBreaker.Factory.getInstance(commandKey, groupKey, properties, metrics);
        } else {
            return fromConstructor;
        }
    } else {
        return new NoOpCircuitBreaker();
    }
}

在工厂中,他其实会通过缓存来获取的,也就是说每个CommandKey都有对应的HystrixCircuitBreaker。如果缓存没有,则创建一个放入缓存。

public static HystrixCircuitBreaker getInstance(HystrixCommandKey key, HystrixCommandGroupKey group, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
    // 从circuitBreakersByCommand缓存里取,key是CommandKey
    HystrixCircuitBreaker previouslyCached = circuitBreakersByCommand.get(key.name());
    if (previouslyCached != null) {
        return previouslyCached;
    }

    // 缓存没有,创建一个放入缓存中
    HystrixCircuitBreaker cbForCommand = circuitBreakersByCommand.putIfAbsent(key.name(), new HystrixCircuitBreakerImpl(key, group, properties, metrics));
    if (cbForCommand == null) {
        // this means the putIfAbsent step just created a new one so let's retrieve and return it
        return circuitBreakersByCommand.get(key.name());
    } else {
        // this means a race occurred and while attempting to 'put' another one got there before
        // and we instead retrieved it and will now return it
        return cbForCommand;
    }
}
查看原文

赞 0 收藏 0 评论 0

大军 发布了文章 · 1月25日

hystrix - Fallback是怎么调用的

调用Fallback有以下几种情况:

  • 熔断器开启
  • 信号量拒绝
  • 线程池拒绝
  • 执行方法失败

hystrix - @EnableCircuitBreaker那些事我们知道最终会调用HystrixCommand的execute方法,他这个方法就会调用queue方法。

public R execute() {
    try {
        return queue().get();
    } catch (Exception e) {
        throw Exceptions.sneakyThrow(decomposeException(e));
    }
}

queue方法如下,这里是拿到一个Future对象。重点还是toObservable方法。

 public Future<R> queue() {
    // 其他略
    final Future<R> delegate = toObservable().toBlocking().toFuture();
    // 其他略    
}

这里会定义Observable,他会判断是否要从缓存取值,如果没有,afterCache就取applyHystrixSemantics,所以后面就会调用applyHystrixSemantics方法。

public Observable<R> toObservable() {
    //其他略
    final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
                return Observable.never();
            }
            return applyHystrixSemantics(_cmd);
        }
    };
    //其他略
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
             //其他略
            final boolean requestCacheEnabled = isRequestCachingEnabled();
            final String cacheKey = getCacheKey();

            /* try from cache first */
            if (requestCacheEnabled) {
                HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                if (fromCache != null) {
                    isResponseFromCache = true;
                    return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                }
            }

            Observable<R> hystrixObservable =
                    Observable.defer(applyHystrixSemantics)
                            .map(wrapWithAllOnNextHooks);

            Observable<R> afterCache;

            // put in cache
            if (requestCacheEnabled && cacheKey != null) {
                // wrap it for caching
                HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
                HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
                if (fromCache != null) {
                    // another thread beat us so we'll use the cached value instead
                    toCache.unsubscribe();
                    isResponseFromCache = true;
                    return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                } else {
                    // we just created an ObservableCommand so we cast and return it
                    afterCache = toCache.toObservable();
                }
            } else {
                afterCache = hystrixObservable;
            }

            return afterCache
                    .doOnTerminate(terminateCommandCleanup)     // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
                    .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
                    .doOnCompleted(fireOnCompletedHook);
        }
    });
}

熔断判断

这里是比较重要的代码,首先判断是否开启熔断,如果开启了,就调用handleShortCircuitViaFallback方法,如果没有开启熔断,他还会去判断是否能请求到信号量,请求不到就调用handleSemaphoreRejectionViaFallback方法。如果都正常,就调用executeCommandAndObserve方法。

private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
    //其他略
    if (circuitBreaker.allowRequest()) {
        //其他略

        if (executionSemaphore.tryAcquire()) {
            try {
                /* used to track userThreadExecutionTime */
                executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                return executeCommandAndObserve(_cmd)
                        .doOnError(markExceptionThrown)
                        .doOnTerminate(singleSemaphoreRelease)
                        .doOnUnsubscribe(singleSemaphoreRelease);
            } catch (RuntimeException e) {
                return Observable.error(e);
            }
        } else {
            return handleSemaphoreRejectionViaFallback();
        }
    } else {
        // 
        return handleShortCircuitViaFallback();
    }
}

熔断开启

我们先看看熔断开启后的调用,会通过getFallbackObservable方法获取fallbackExecutionChain,getFallbackObservable主要的作用就是调用getFallback方法。

private Observable<R> handleShortCircuitViaFallback() {
    // 其他略
    return getFallbackOrThrowException(this, HystrixEventType.SHORT_CIRCUITED, FailureType.SHORTCIRCUIT,
                "short-circuited", shortCircuitException);
    // 其他略
}

private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException) {
    // 其他略
    fallbackExecutionChain = getFallbackObservable();
    // 其他略            

    return fallbackExecutionChain
            .doOnEach(setRequestContext)
            .lift(new FallbackHookApplication(_cmd))
            .lift(new DeprecatedOnFallbackHookApplication(_cmd))
            .doOnNext(markFallbackEmit)
            .doOnCompleted(markFallbackCompleted)
            .onErrorResumeNext(handleFallbackError)
            .doOnTerminate(singleSemaphoreRelease)
            .doOnUnsubscribe(singleSemaphoreRelease);
    
    }
    // 其他略
}

@Override
final protected Observable<R> getFallbackObservable() {
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            try {
                return Observable.just(getFallback());
            } catch (Throwable ex) {
                return Observable.error(ex);
            }
        }
    });
}

getFallback是调用fallback方法的入口,MethodExecutionAction会通过发射,调用我们配置的方法。

@Override
protected Object getFallback() {
    final CommandAction commandAction = getFallbackAction();
    if (commandAction != null) {
        try {
            return process(new Action() {
                @Override
                Object execute() {
                    MetaHolder metaHolder = commandAction.getMetaHolder();
                    Object[] args = createArgsForFallback(metaHolder, getExecutionException());
                    return commandAction.executeWithArgs(metaHolder.getFallbackExecutionType(), args);
                }
            });
        } catch (Throwable e) {
            LOGGER.error(FallbackErrorMessageBuilder.create()
                    .append(commandAction, e).build());
            throw new FallbackInvocationException(unwrapCause(e));
        }
    } else {
        return super.getFallback();
    }
}

信号量隔离

我们看了上面熔断开启的代码,这边就比较简单,就是跟上面一样,调用getFallbackOrThrowException方法。

private Observable<R> handleSemaphoreRejectionViaFallback() {
    // 其他略
    return getFallbackOrThrowException(this, HystrixEventType.SEMAPHORE_REJECTED, FailureType.REJECTED_SEMAPHORE_EXECUTION,
            "could not acquire a semaphore for execution", semaphoreRejectionException);
}

线程池隔离

如果熔断没开,信号量又可以获取到,他就会调用executeCommandAndObserve方法。这个方法,handleFallback定义了几个异常,比如线程池异常处理,时间超时处理,请求异常处理,以及其他异常处理。然后调用executeCommandWithSpecifiedIsolation方法。

private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
    //其他略
    final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
        @Override
        public Observable<R> call(Throwable t) {
            Exception e = getExceptionFromThrowable(t);
            executionResult = executionResult.setExecutionException(e);
            if (e instanceof RejectedExecutionException) {
                return handleThreadPoolRejectionViaFallback(e);
            } else if (t instanceof HystrixTimeoutException) {
                return handleTimeoutViaFallback();
            } else if (t instanceof HystrixBadRequestException) {
                return handleBadRequestByEmittingError(e);
            } else {
                /*
                 * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
                 */
                if (e instanceof HystrixBadRequestException) {
                    eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                    return Observable.error(e);
                }

                return handleFailureViaFallback(e);
            }
        }
    };
 
    //其他略
    Observable<R> execution;
    if (properties.executionTimeoutEnabled().get()) {
        execution = executeCommandWithSpecifiedIsolation(_cmd)
                .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
    } else {
        execution = executeCommandWithSpecifiedIsolation(_cmd);
    }

    return execution.doOnNext(markEmits)
            .doOnCompleted(markOnCompleted)
            .onErrorResumeNext(handleFallback)
            .doOnEach(setRequestContext);
}

这个方法主要是两件事情,一个是subscribeOn方法,这里是线程池隔离用的,另外一个就是正常情况下的调用,调用的是getUserExecutionObservable方法,这个方法在线程池后面讲。

private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
    if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
        // mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                // 其他略
                return getUserExecutionObservable(_cmd);
                // 其他略    
            }
        })// 其他略    
        }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
            @Override
            public Boolean call() {
                return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
            }
        }));
    } 
    // 其他略
}

创建相关类

public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
    // 动态配置线程池信息
    touchConfig();
    // 创建一个HystrixContextScheduler对象,这里又会创建一个ThreadPoolScheduler
    return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
}

这里创建一个HystrixContextScheduler对象,ThreadPoolScheduler对象用于创建ThreadPoolWorker,并赋值给HystrixContextSchedulerWorker

public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
    // 动态配置线程池信息
    touchConfig();
    // 创建一个HystrixContextScheduler对象,这里又会创建一个ThreadPoolScheduler
    return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
}

public Worker createWorker() {
    return new HystrixContextSchedulerWorker(actualScheduler.createWorker());
}

线程池判断的地方在HystrixContextSchedulerWorker的schedule方法,不够就抛RejectedExecutionException异常,异常的捕获上面已经讲了。

public Subscription schedule(Action0 action) {
    if (threadPool != null) {
        // 判断线程池线程的地方
        if (!threadPool.isQueueSpaceAvailable()) {
            throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
        }
    }
    return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action));
}

正常调用

如果上面都正常,就会调用getUserExecutionObservable方法。这个方法最后会调用run方法。

private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) {
    //其他略
    userObservable = getExecutionObservable();
    //其他略
}

@Override
final protected Observable<R> getExecutionObservable() {
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            try {
                return Observable.just(run());
            } catch (Throwable ex) {
                return Observable.error(ex);
            }
        }
    })//其他略
}

在这里,同样通过反射,调用到我们的方法。

protected Object run() throws Exception {
    LOGGER.debug("execute command: {}", getCommandKey().name());
    return process(new Action() {
        @Override
        Object execute() {
            return getCommandAction().execute(getExecutionType());
        }
    });
}

总结

上面讲了几个fallback调用的方法,以及正常的流程,流程图如下
image

查看原文

赞 0 收藏 0 评论 0

大军 发布了文章 · 1月25日

hystrix - @EnableCircuitBreaker那些事

@EnableCircuitBreaker

我们使用hystrix的时候,都会在application上使用@EnableCircuitBreaker注解,我们看看这个主键到底做了什么事情。
他import了EnableCircuitBreakerImportSelector,看到ImportSelector我们应该会联想到,他后面可能有某些bean会被注入到IOC容器中。

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(EnableCircuitBreakerImportSelector.class)
public @interface EnableCircuitBreaker {

}

我们跟着看EnableCircuitBreakerImportSelector,这里会有isEnabled方法,他的实现类SpringFactoryImportSelector我这里就不说了。他的意思是,如果返回true的时候,会从spring.factories中读取org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker对应的HystrixCircuitBreakerConfiguration,最后这个HystrixCircuitBreakerConfiguration就会被实例化。这个类被加载后,他会紧接着加载HystrixCommandAspect,一看到Aspect,就知道AOP吧。我们看看他用这个干吗。

public class EnableCircuitBreakerImportSelector
        extends SpringFactoryImportSelector<EnableCircuitBreaker> {
    @Override
    protected boolean isEnabled() {
        return getEnvironment().getProperty("spring.cloud.circuit.breaker.enabled",
                Boolean.class, Boolean.TRUE);
    }
}

HystrixCommandAspect

他这里定义了一个@Pointcut,我们平常用hystrix的时候,就是会加HystrixCommand吧,现在知道为什么要加这个注解了吧,他会通过AOP拦截代理相应方法。

@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")
public void hystrixCommandAnnotationPointcut() {
}

HystrixCommand注解有多个配置的参数,比如:

  • groupKey:命令分组键,可以根据这个分组,统计他的各个指标,默认类名。
  • commandKey:命令键,默认方法名。
  • threadPoolKey:线程池名,如果没有设置,默认为groupKey。
  • fallbackMethod:服务降级调用的方法。
  • threadPoolProperties:配置线程池用的。

当然还有其他,就不一一列了。
下面我们看看方法,这里主要是通过我们配置的信息,生成一个GenericCommand对象,通过这个对象调用execute方法。

@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
    Method method = getMethodFromTarget(joinPoint);
    Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);
    if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
        throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " +
                "annotations at the same time");
    }
    // 根据注解获取CommandMetaHolderFactory还是CollapserMetaHolderFactory
    MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
    // 通过配置信息生成元数据
    MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
    // 创建GenericCommand,会把元数据的信息赋值到GenericCommand的属性中。
    // 除此,线程池、commandActions也是这里
    HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
    ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
            metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();

    Object result;
    try {
        if (!metaHolder.isObservable()) {
            // 这里会把GenericCommand强制转为HystrixExecutable,然后调用execute方法
            result = CommandExecutor.execute(invokable, executionType, metaHolder);
        } else {
            result = executeObservable(invokable, executionType, metaHolder);
        }
    } catch (HystrixBadRequestException e) {
        throw e.getCause();
    } catch (HystrixRuntimeException e) {
        throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
    }
    return result;
}

把GenericCommand转为HystrixExecutable对象。

public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {
    // 其他略
    return castToExecutable(invokable, executionType).execute();
    // 其他略    
}

private static HystrixExecutable castToExecutable(HystrixInvokable invokable, ExecutionType executionType) {
    if (invokable instanceof HystrixExecutable) {
        return (HystrixExecutable) invokable;
    }
    // 其他略    
}

总结

@EnableCircuitBreaker主要是为了加载HystrixCircuitBreakerConfiguration以及HystrixCommandAspect,通过AOP代理我们的方法,熔断、限流,就是在这里开始的。
image

查看原文

赞 0 收藏 0 评论 0

大军 发布了文章 · 1月13日

gateway系列

赞 1 收藏 1 评论 0

大军 发布了文章 · 1月13日

gateway - 调用

RoutePredicateHandlerMapping#getHandlerInternal

webflux的入口是DispatcherHandler,他这里会调用HandlerMapping的getHandler方法。AbstractHandlerMapping实现了HandlerMapping接口,他有个抽象方法getHandlerInternal需要子类实现。
RoutePredicateHandlerMapping继承了AbstractHandlerMapping,所以我们的重点就是他的getHandlerInternal方法。这里面就是用来处理是否有对应的Route。
这个方法,其实主要调用lookupRoute方法。

protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
    // don't handle requests on management port if set and different than server port
    if (this.managementPortType == DIFFERENT && this.managementPort != null
            && exchange.getRequest().getURI().getPort() == this.managementPort) {
        return Mono.empty();
    }
    exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName());

    return lookupRoute(exchange)
            // .log("route-predicate-handler-mapping", Level.FINER) //name this
            .flatMap((Function<Route, Mono<?>>) r -> {
                exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
                if (logger.isDebugEnabled()) {
                    logger.debug(
                            "Mapping [" + getExchangeDesc(exchange) + "] to " + r);
                }

                exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
                return Mono.just(webHandler);
            }).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
                exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
                if (logger.isTraceEnabled()) {
                    logger.trace("No RouteDefinition found for ["
                            + getExchangeDesc(exchange) + "]");
                }
            })));
}

RoutePredicateHandlerMapping#lookupRoute

我们可以看到他有个r.getPredicate().apply这样的代码,他这个apply最终会调用每个Predicate的test方法,返回false或者true。

protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
    return this.routeLocator.getRoutes()
            // individually filter routes so that filterWhen error delaying is not a
            // problem
            .concatMap(route -> Mono.just(route).filterWhen(r -> {
                // add the current route we are testing
                exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
                return r.getPredicate().apply(exchange);
            })
    // 其他略        
}

DefaultAsyncPredicate#apply

在这里,会把每个Predicate的test方法调用过去。

public Publisher<Boolean> apply(T t) {
    return Mono.just(delegate.test(t));
}

FilteringWebHandler#handle

我们已经从上面的方法中拿到了一个Route(没有就不继续了),那就开始调用Filter。
DispatcherHandler在调用完handlerMappings后,会调用他的invokeHandler方法。

private Mono<HandlerResult> invokeHandler(ServerWebExchange exchange, Object handler) {
    if (this.handlerAdapters != null) {
        for (HandlerAdapter handlerAdapter : this.handlerAdapters) {
            if (handlerAdapter.supports(handler)) {
                return handlerAdapter.handle(exchange, handler);
            }
        }
    }
    return Mono.error(new IllegalStateException("No HandlerAdapter: " + handler));
}

然后会调用webHandler.handle方法。

public Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler) {
    WebHandler webHandler = (WebHandler) handler;
    Mono<Void> mono = webHandler.handle(exchange);
    return mono.then(Mono.empty());
}

我们的FilteringWebHandler就是一个WebHandler,我们看看他的handle方法。
他会先获取对应Route的Filters,再和通用的Filters合并排序,最后开始Filters的调用链。

public Mono<Void> handle(ServerWebExchange exchange) {
    // 获取Route
    Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
    // 取出Filters
    List<GatewayFilter> gatewayFilters = route.getFilters();
    // 加入通用的Filters
    List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);
    combined.addAll(gatewayFilters);
    // TODO: needed or cached?
    // 排序
    AnnotationAwareOrderComparator.sort(combined);

    if (logger.isDebugEnabled()) {
        logger.debug("Sorted gatewayFilterFactories: " + combined);
    }
    // 调用链
    return new DefaultGatewayFilterChain(combined).filter(exchange);
}

整体流程

这边忽略了DispatcherHandler部分
image

查看原文

赞 0 收藏 0 评论 0