java金融

java金融 查看完整档案

上海编辑  |  填写毕业院校  |  填写所在公司/组织 javajr.cn 编辑
编辑

欢迎关注公众号【java金融】,作者一线互联网工作,专注 Java 领域相关技术:java基础、 Java SE、Java 并发、JVM、分布式、中间件、微服务、Spring、mysql、oracle等技术。

个人动态

java金融 发布了文章 · 4月2日

阿里一面:Spring和SpringMvc父子容器你能说清楚吗

以前写了几篇关于SpringBoot的文章《面试高频题:springBoot自动装配的原理你能说出来吗》《保姆级教程,手把手教你实现一个SpringBoot的starter》,这几天突然有个读者问:能说一说Spring的父子容器吗?说实话这其实也是Spring八股文里面一个比较常见的问题。在我的印象里面Spring就是父容器,SpringMvc就是子容器,子容器可以访问父容器的内容,父容器不能访问子容器的东西。有点类似java里面的继承的味道,子类可以继承父类共有方法和变量,可以访问它们,父类不可以访问子类的方法和变量。在这里就会衍生出几个比较经典的问题:

  • 为什么需要父子容器?
  • 是否可以把所有类都通过Spring容器来管理?(SpringapplicationContext.xml中配置全局扫描)
  • 是否可以把我们所需的类都放入Spring-mvc子容器里面来管理(springmvcspring-servlet.xml中配置全局扫描)?
  • 同时通过两个容器同时来管理所有的类?如果能够把上面这四个问题可以说个所以然来,个人觉得Spring的父子容器应该问题不大了。我们可以看下官网提供的父子容器的图片上图中显示了2个WebApplicationContext实例,为了进行区分,分别称之为:Servlet WebApplicationContext(子容器)、Root WebApplicationContext(父容器)。
  • Servlet WebApplicationContext:这是对J2EE三层架构中的web层进行配置,如控制器(controller)、视图解析器(view resolvers)等相关的bean。通过spring mvc中提供的DispatchServlet来加载配置,通常情况下,配置文件的名称为spring-servlet.xml。
  • Root WebApplicationContext:这是对J2EE三层架构中的service层、dao层进行配置,如业务bean,数据源(DataSource)等。通常情况下,配置文件的名称为applicationContext.xml。在web应用中,其一般通过ContextLoaderListener来加载。

Spring的启动

要想很好的理解它们之间的关系,我们就有必要先弄清楚Spring的启动流程。要弄清楚这个启动流程我们就需要搭建一个SpringMvc项目,说句实话,用惯了SpringBooot开箱即用,突然在回过头来搭建一个SpringMvc项目还真有点不习惯,一大堆的配置文件。(虽然也可以用注解来实现)具体怎么搭建SpringMvc项目这个就不介绍了,搭建好项目我们运行起来可以看到控制台会输出如下日志:日志里面分别打印出了父容器和子容器分别的一个耗时。

如何验证是有两个容器?

我们只需要Controller与我们的Service中实现ApplicationContextAware接口,就可以得知对应的管理容器:在Service所属的父容器里面我们可以看到父容器对应的对象是XmlWebApplicationContext@3972Controller中对应的容器对象是XmlWebApplicationContext@4114由此可见它们是两个不同的容器。

源码分析

我们知道SpringServletContainerInitializerservlet 3.0 开始,Tomcat 启动时会自动加载实现了 ServletContainerInitializer
接口的类(需要在 META-INF/services 目录下新建配置文件)也称为 SPI(Service Provider Interface) 机制,SPI的应用还是挺广的比如我们的JDBC、还有Dubbo框架里面都有用到,如果还有不是很了解SPI机制的 可以去学习下。所以我们的入口就是SpringServletContainerInitializeronStartup方法,这也应该是web容器启动调用Spring相关的第一个方法。

初始化SpringIoc

如果实在找不到入口的话,我们可以 根据控制台打印的日志,然后拿着日志进行反向查找这应该总能找到开始加载父容器的地方。启动的时候控制台应该会打印出“Root WebApplicationContext: initialization started” 我们拿着这个日志就能定位到代码了

`public WebApplicationContext initWebApplicationContext(ServletContext servletContext) {`
 `if (servletContext.getAttribute(WebApplicationContext.ROOT_WEB_APPLICATION_CONTEXT_ATTRIBUTE) != null) {`
 `throw new IllegalStateException(`
 `"Cannot initialize context because there is already a root application context present - " +`
 `"check whether you have multiple ContextLoader* definitions in your web.xml!");`
 `}`
 `servletContext.log("Initializing Spring root WebApplicationContext");`
 `Log logger = LogFactory.getLog(ContextLoader.class);`
 `if (logger.isInfoEnabled()) {`
 `logger.info("Root WebApplicationContext: initialization started");`
 `}`
 `long startTime = System.currentTimeMillis();`
 `try {`
 `// Store context in local instance variable, to guarantee that`
 `// it is available on ServletContext shutdown.`
 `if (this.context == null) {`
 `// 通过反射去创建context` 
 `this.context = createWebApplicationContext(servletContext);`
 `}`
 `if (this.context instanceof ConfigurableWebApplicationContext) {`
 `ConfigurableWebApplicationContext cwac = (ConfigurableWebApplicationContext) this.context;`
 `if (!cwac.isActive()) {`
 `// The context has not yet been refreshed -> provide services such as`
 `// setting the parent context, setting the application context id, etc`
 `if (cwac.getParent() == null) {`
 `// The context instance was injected without an explicit parent ->`
 `// determine parent for root web application context, if any.`
 `ApplicationContext parent = loadParentContext(servletContext);`
 `cwac.setParent(parent);`
 `}`
 `// IOC容器初始化`
 `configureAndRefreshWebApplicationContext(cwac, servletContext);`
 `}`
 `}`
 `servletContext.setAttribute(WebApplicationContext.ROOT_WEB_APPLICATION_CONTEXT_ATTRIBUTE, this.context);`
 `ClassLoader ccl = Thread.currentThread().getContextClassLoader();`
 `if (ccl == ContextLoader.class.getClassLoader()) {`
 `currentContext = this.context;`
 `}`
 `else if (ccl != null) {`
 `currentContextPerThread.put(ccl, this.context);`
 `}`
 `if (logger.isInfoEnabled()) {`
 `long elapsedTime = System.currentTimeMillis() - startTime;`
 `logger.info("Root WebApplicationContext initialized in " + elapsedTime + " ms");`
 `}`
 `return this.context;`
 `}`
 `catch (RuntimeException | Error ex) {`
 `logger.error("Context initialization failed", ex);`
 `servletContext.setAttribute(WebApplicationContext.ROOT_WEB_APPLICATION_CONTEXT_ATTRIBUTE, ex);`
 `throw ex;`
 `}`
 `}`

这段代码就是创建父容器的地方。

初始化 Spring MVC

接着我们再来看看创建子容器的地方:在FrameworkServlet上述代码是不是会有个疑问我们怎么就会执行FrameworkServletinitServletBean方法。这是由于我们在web.xml 里面配置了DispatcherServlet,然后web容器就会去调用DispatcherServletinit方法,并且这个方法只会被执行一次。通过init方法就会去执行到initWebApplicationContext这个方法了,这就是web子容器的一个启动执行顺序。

`<servlet>`
 `<servlet-name>dispatcher</servlet-name>`
 `<servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>`
 `// 如果不配置这个load-on-startup 1 不会再项目启动的时候执行inti方法。而是首次访问再启动`
 `<load-on-startup>1</load-on-startup>`
 `</servlet>`

大概流程如下:从上述代码我们可以发现子容器是自己重新通过反射new了一个新的容器作为子容器, 并且设置自己的父容器为Spring 初始化创建的WebApplicationContext。然后就是去加载我们在web.xml 里面配置的Springmvc 的配置文件,然后通过创建的子容器去执行refresh方法,这个方法我相信很多人应该都比较清楚了。

问题解答

我们知道了Sping父容器以及SpingMvc子容器的一个启动过程,以及每个容器都分别干了什么事情现在再回过头来看看上述四个问题。

  • 为什么需要父子容器?父子容器的主要作用应该是划分框架边界。有点单一职责的味道。在J2EE三层架构中,在service层我们一般使用spring框架来管理, 而在web层则有多种选择,如spring mvc、struts等。因此,通常对于web层我们会使用单独的配置文件。例如在上面的案例中,一开始我们使用spring-servlet.xml来配置web层,使用applicationContext.xml来配置servicedao层。如果现在我们想把web层从spring mvc替换成struts,那么只需要将spring-servlet.xml替换成Struts的配置文件struts.xml即可,而applicationContext.xml不需要改变。
  • 是否可以把所有类都通过Spring父容器来管理?(Spring的applicationContext.xml中配置全局扫描)所有的类都通过父容器来管理的配置就是如下:
`<context:component-scan  use-default-filters="false"  base-package="cn.javajr">`
 `<context:include-filter type="annotation" expression="org.springframework.stereotype.Service" />`
 `<context:include-filter type="annotation" expression="org.springframework.stereotype.Component" />`
 `<context:include-filter type="annotation" expression="org.springframework.stereotype.Repository" />`
 `<context:include-filter type="annotation" expression="org.springframework.stereotype.Controller" />`
 `</context:component-scan>`

然后在SpringMvc的配置里面不配置扫描包路径。很显然这种方式是行不通的,这样会导致我们请求接口的时候产生404。因为在解析@ReqestMapping注解的过程中initHandlerMethods()函数只是对Spring MVC 容器中的bean进行处理的,并没有去查找父容器的bean, 因此不会对父容器中含有@RequestMapping注解的函数进行处理,更不会生成相应的handler。所以当请求过来时找不到处理的handler,导致404。

  • 是否可以把我们所需的类都放入Spring-mvc子容器里面来管理(springmvc的spring-servlet.xml中配置全局扫描)?这个是把包的扫描配置spring-servlet.xml中这个是可行的。为什么可行因为无非就是把所有的东西全部交给子容器来管理了,子容器执行了refresh方法,把在它的配置文件里面的东西全部加载管理起来来了。虽然可以这么做不过一般应该是不推荐这么去做的,一般人也不会这么干的。如果你的项目里有用到事物、或者aop记得也需要把这部分配置需要放到Spring-mvc子容器的配置文件来,不然一部分内容在子容器和一部分内容在父容器,可能就会导致你的事物或者AOP不生效。(这里不就有个经典的八股文吗?你有遇到事物不起作用的时候,其实这也是一种情况)
  • 同时通过两个容器同时来管理所有的类?这个问题应该是比较好回答了,肯定不会通过这种方式来的,先不说会不会引发其他问题,首先两个容器里面都放一份一样的对象,造成了内存浪费。再者的话子容器会覆盖父容器加载,本来可能父容器配置了事物生成的是代理对象,但是被子容器一覆盖,又成了原生对象。这就导致了你的事物不起作用了。在补充一个问题:SpringBoot 里面是否还有父子容器?我们下篇再见!

总结

  • 其实父子容器对于程序员来说是无感的,是一个并没有什么用的知识点,都是Spring帮我们处理了,但是我们还是需要知道有这么个东西,不然我们有可能遇到问题的时候可能不知道如何下手。比如为啥我这个事物不起作用了,我这个aop怎么也不行了,网上都是这么配置的。

结束

  • 由于自己才疏学浅,难免会有纰漏,假如你发现了错误的地方,还望留言给我指出来,我会对其加以修正。
  • 如果你觉得文章还不错,你的转发、分享、赞赏、点赞、留言就是对我最大的鼓励。
  • 感谢您的阅读,十分欢迎并感谢您的关注。

往期精选

*推荐👍 :Java高并发编程基础三大利器之CyclicBarrier*

推荐👍 :Java高并发编程基础三大利器之CountDownLatch

推荐👍 :Java高并发编程基础三大利器之Semaphore

推荐👍 :Java高并发编程基础之AQS

推荐👍 :可恶的爬虫直接把生产6台机器爬挂了!

查看原文

赞 0 收藏 0 评论 0

java金融 发布了文章 · 3月11日

Java高并发编程基础三大利器之CountDownLatch

# 引言
上一篇文章我们介绍了AQS的信号量Semaphore《Java高并发编程基础三大利器之Semaphore》,接下来应该轮到CountDownLatch了。

什么是CountDownLatch

CountDownLatch是通过一个计数器来实现的,计数器的初始值是线程的数量。每当一个线程执行完毕后,计数器的值就减1,当计数器的值为0时,表示所有线程都执行完毕,然后在闭锁上(调用await方法的线程)等待的线程就可以恢复工作了。

应用场景

CountDownLatch可以用来干什么呢?有什么应用场景?实际项目中有应用的场景吗?这应该才是大家比较关心的。我们先来看看官网提供的例子是如何进行应用的https://docs.oracle.com/javas...
官方提供了两个demo我直接把它转成了图片顺带推荐下这个代码转图片的网址https://www.dute.org/code-sna... 还挺好用的。

官网demo1

The first is a start signal that prevents any worker from proceeding until the driver is ready for them to proceed;
The second is a completion signal that allows the driver to wait until all workers have completed.
  • 第一个开始信号(startSignal)会阻止任何工人(worker )开始工作,在司机到来之前。说白了就是司机没来工人就不能干活。
  • 第二个是完成信号 (doneSignal),允许司机 Driver 等待,直到所有的工人完成.说白了就是司机要等到所有工人完工为止。

在这里插入图片描述

官网demo2

Another typical usage would be to divide a problem into N parts, describe each part with a Runnable that executes that portion and counts down on the latch, and queue all the Runnables to an Executor. When all sub-parts are complete, the coordinating thread will be able to pass through await.

另一种典型的用法就是把一个大任务拆分N个部分,让多个线程(Worker)执行,每个线程(Worker)执行完自己的部分计数器就减1,当所有子部分都完成后,Driver 才继续向下执行才继续执行。
就好比富士康手机加工的流水线一样,组装一步手机需要一条条的流水线来相互配合完成。一条条流水线(Worker),每条线都干自己的活。有的流水线是贴膜的,有的流水线是打螺丝的,有的流水线是质检的、有的流水线充电的、有的流水线贴膜的。等这些流水线都干完了就把一部手机组装完成了。

在这里插入图片描述
上面两个就是官方提供的demo,下面我再来两个我们平时开发中可以用到的栗子:

多个线程等待:模拟并发,让并发线程一起执行。

有时候我们写了接口想去压测下它,看看它的最大并发数大概是多少。当然我们可以使用Jmeter来进行压测,但是有时候我们不想去下载工具,其实就可以借助CountDownLatch来实现。

/**
 * @author: 公众号:java金融
 */
public class TestCountDownLatch1 {
     public static void main(String[] args) throws InterruptedException {
          CountDownLatch countDownLatch = new CountDownLatch(1);
          for (int i = 0; i < 5; i++) {
               new Thread(() -> {
                    try {
                         //所有请求都阻塞在这,等待
                         countDownLatch.await();
                         // 调用测试接口
                         System.out.println(Thread.currentThread().getName() + "开始执行……");
                    } catch (InterruptedException e) {
                         e.printStackTrace();
                    }
               }).start();
          }
          // 让请求都准备好
          Thread.sleep(2000);
          // 让所有请求统一请求
          countDownLatch.countDown();
     }
}

我们通过CountDownLatch.await(),让多个参与者线程启动后阻塞等待,然后在主线程 调用CountDownLatch.countdown() 将计数减为0,让所有线程一起往下执行;以此实现了多个线程在同一时刻并发执行,来模拟并发请求的目的。

单个线程等待:多个线程(任务)完成后,进行汇总合并

/**
 * @author: 公众号:java金融
 */
public class TestCountDownLatch1 {
     public static void main(String[] args) throws InterruptedException {
          int count = 3;
          CountDownLatch countDownLatch = new CountDownLatch(count);
          for (int i = 0; i < count; i++) {
               final int index = i;
               new Thread(() -> {
                    try {
                         Thread.sleep(1000 + ThreadLocalRandom.current().nextInt(1000));
                         System.out.println("finish" + index + Thread.currentThread().getName());
                    } catch (InterruptedException e) {
                         e.printStackTrace();
                    }finally{
                        countDownLatch.countDown();
                    }
               }).start();
          }
          countDownLatch.await();// 主线程在阻塞,当计数器==0,就唤醒主线程往下执行。
          System.out.println("主线程:在所有任务运行完成后,进行结果汇总");
     }
}

这种场景应该是用的最多了,比如我们打开一个电商的个人中心页面,我们需要调用,用户信息接口、用户订单接口、用户会员信息等接口,然后合并后一起给到前端,假设每个接口最长耗时为1s,如果我们同步调用的话最大耗时时间是3s,如果我们采用异步调用然后合并结果,所以最大的耗时时间是3s。每个接口调用返回数据后调用countDown方法,让计数器进行减1,当把计数器减为0时的这个线程会去唤醒主线程,让它继续往下走。

CountDownLatch 实现原理

CountDownLatch是通过AQSstate字段来实现的一个计数器,计数器的初始值(state的值)为new CountDownLatch设置的数量,每次调用countDown的时候,state的值会进行减1,最后某个线程将state值减为0时,会把调用了await()进行阻塞等待的线程进行唤醒。CountDownLatch重写了tryReleaseShared这个方法,只有当state这个字段被设置为0时,也就是tryReleaseShared返回true的情况就会执行doReleaseShared方法,把调用了await的线程进行唤醒。

  public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
 protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

CountDownLatch的其他源码就不进行分析了,
相信看了这两篇文章《Java高并发编程基础之AQS》《Java高并发编程基础三大利器之Semaphore》再来看这个还是比较轻松的。

总结

  • CountDownLatch不能重新初始化或者修改CountDownLatch内部计数器的值。
  • CountDownLatchSemaphore在使用AQS的方式上很相似,在同步状态中都是保存的是当前的计数值。
  • CountDownLatch的作用就是允许一个或多个线程等待其他线程完成操作,看起来有点类似join() 方法,但其提供了比 join() 更加灵活的API。
  • CountDownLatch可以手动控制在n个线程里调用ncountDown()方法使计数器进行减一操作,也可以在一个线程里调用n次执行减一操作。
  • join() 的实现原理是不停检查join线程是否存活,如果 join 线程存活则让当前线程永远等待。所以两者之间相对来说还是CountDownLatch使用起来较为灵活。

结束

  • 由于自己才疏学浅,难免会有纰漏,假如你发现了错误的地方,还望留言给我指出来,我会对其加以修正。
  • 如果你觉得文章还不错,你的转发、分享、赞赏、点赞、留言就是对我最大的鼓励。
  • 感谢您的阅读,十分欢迎并感谢您的关注。

站在巨人的肩膀上摘苹果:
https://javajr.cn/
https://zhuanlan.zhihu.com/p/...

查看原文

赞 0 收藏 0 评论 0

java金融 发布了文章 · 3月4日

Java高并发编程基础三大利器之Semaphore

引言

最近可以进行个税申报了,还没有申报的同学可以赶紧去试试哦。不过我反正是从上午到下午一直都没有成功的进行申报,一进行申报
就返回“当前访问人数过多,请稍后再试”。为什么有些人就能够申报成功,有些人就直接返回失败。这很明显申报处理资源是有限的,
只能等别人处理完了在来处理你的,你如果运气好可能重试几次就轮到你了,如果运气不好可能重试一天也可能轮不到你。
我反正已经是放弃了,等到夜深人静的时候再来试试。作为一个程序员我们肯定知道这是个税申请app的限流操作,如果还有不懂什么
是限流操作的可以参考下这个文章《高并发系统三大利器之限流》
比如个税申报系统每台机器只最多分别只能处理1000个请求,再多的请求就会把机器打挂。如果是多余的请求就把这些请求拒绝掉。直接给你返回一句温馨提示:“当前访问人数过多,请稍后再试”,如果要实现这个功能大家想想可以通过哪些方法算法来实现。

共享锁、独占锁

学习semaphore之前我们必须要先了解下什么是共享锁。在上一篇文章《Java高并发编程基础之AQS》我们介绍了公平锁于非公平锁的区别。

  • 共享锁:它是允许多个线程同时获取锁,并发的访问共享资源
  • 独占锁:也有人把它叫做“独享锁”,它是是独占的,排他的,只能被一个线程可持有,

当独占锁已经被某个线程持有时,其他线程只能等待它被释放后,才能去争锁,并且同一时刻只有一个线程能争锁成功。

什么是Semaphore

在《Java并发编程艺术》(微信搜【java金融】回复电子书可以免费获取PDF版本)这一书中是这么说的:

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。很多年以来,我都觉得从字面上很难理解Semaphore所表达的含义,只能把它比作是控制流量的红绿灯,比如XX马路要限制流量,只允许同时有一百辆车在这条路上行使,其他的都必须在路口等待,所以前一百辆车会看到绿灯,可以开进这条马路,后面的车会看到红灯,不能驶入XX马路,但是如果前一百辆中有五辆车已经离开了XX马路,那么后面就允许有5辆车驶入马路,这个例子里说的车就是线程,驶入马路就表示线程在执行,离开马路就表示线程执行完成,看见红灯就表示线程被阻塞,不能执行。
  • Semaphore机制是提供给线程抢占式获取许可,所以他可以实现公平或者非公平,类似于ReentrantLock

说了这么多我们来个实际的例子看一看,比如我们去停车场停车,停车场总共只有5个车位,但是现在有8辆汽车来停车,剩下的3辆汽车要么等其他汽车开走后进行停车,或者去找别的停车位?

/**
 * @author: 公众号【Java金融】
 */
public class SemaphoreTest {
    public static void main(String[] args) throws InterruptedException {
         // 初始化五个车位
        Semaphore semaphore = new Semaphore(5);
        // 等所有车子
        final CountDownLatch latch = new CountDownLatch(8);
        for (int i = 0; i < 8; i++) {
            int finalI = i;
            if (i == 5) {
                Thread.sleep(1000);
                new Thread(() -> {
                    stopCarNotWait(semaphore, finalI);
                    latch.countDown();
                }).start();
                continue;
            }
            new Thread(() -> {
                stopCarWait(semaphore, finalI);
                latch.countDown();
            }).start();
        }
        latch.await();
        log("总共还剩:" + semaphore.availablePermits() + "个车位");
    }

    private static void stopCarWait(Semaphore semaphore, int finalI) {
        String format = String.format("车牌号%d", finalI);
        try {
            semaphore.acquire(1);
            log(format + "找到车位了,去停车了");
            Thread.sleep(10000);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            semaphore.release(1);
            log(format + "开走了");
        }
    }

    private static void stopCarNotWait(Semaphore semaphore, int finalI) {
         String format = String.format("车牌号%d", finalI);
        try {
            if (semaphore.tryAcquire()) {
                log(format + "找到车位了,去停车了");
                Thread.sleep(10000);
                log(format + "开走了");
                semaphore.release();
            } else {
                log(format + "没有停车位了,不在这里等了去其他地方停车去了");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    public static void log(String content) {
        // 格式化
        DateTimeFormatter fmTime = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        // 当前时间
        LocalDateTime now = LocalDateTime.now();
        System.out.println(now.format(fmTime) + "  "+content);
    }
}
2021-03-01 18:54:57  车牌号0找到车位了,去停车了
2021-03-01 18:54:57  车牌号3找到车位了,去停车了
2021-03-01 18:54:57  车牌号2找到车位了,去停车了
2021-03-01 18:54:57  车牌号1找到车位了,去停车了
2021-03-01 18:54:57  车牌号4找到车位了,去停车了
2021-03-01 18:54:58  车牌号5没有停车位了,不在这里等了去其他地方停车去了
2021-03-01 18:55:07  车牌号7找到车位了,去停车了
2021-03-01 18:55:07  车牌号6找到车位了,去停车了
2021-03-01 18:55:07  车牌号2开走了
2021-03-01 18:55:07  车牌号0开走了
2021-03-01 18:55:07  车牌号3开走了
2021-03-01 18:55:07  车牌号4开走了
2021-03-01 18:55:07  车牌号1开走了
2021-03-01 18:55:17  车牌号7开走了
2021-03-01 18:55:17  车牌号6开走了
2021-03-01 18:55:17  总共还剩:5个车位

从输出结果我们可以看到车牌号5这辆车看见没有车位了,就不在这个地方傻傻的等了,而是去其他地方了,但是车牌号6车牌号7分别需要等到车库开出两辆车空出两个车位后才停进去。这就体现了Semaphoreacquire 方法如果没有获取到凭证它就会阻塞,而tryAcquire方法如果没有获取到凭证不会阻塞的。

semaphore在dubbo中的应用

Dubbo中可以给Provider配置线程池大小来控制系统提供服务的最大并行度,默认是200

<dubbo:provider  threads="200"/>

比如我现在这个订单系统有三个接口,分别为创单、取消订单、修改订单。这三个接口加起来的并发是200但是创单接口是核心接口,我想让它多分点线程来执行
让它可以有最大150个线程,取消订单和修改订单分别最大25个线程执行就可以了。dubbo提供了executes这一属性来实现这个功能

<dubbo:service interface="cn.javajr.service.CreateOrderService" executes="150"/>
<dubbo:service interface="cn.javajr.service.CancelOrderService" executes="25"/>
<dubbo:service interface="cn.javajr.service.EditOrderService" executes="25"/>

我们可以看看dubbo内部是如何来executes的,具体实现是在ExecuteLimitFilter这个类我们可以

 public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        URL url = invoker.getUrl();
        String methodName = invocation.getMethodName();
        Semaphore executesLimit = null;
        boolean acquireResult = false;
        int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0);
        if (max > 0) {
            RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName());
            // 如果当前使用的线程数量已经大于等于设置的阈值,那么直接抛出异常
//            if (count.getActive() >= max) {
// throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service // using threads greater than <dubbo:service executes=\"" + max + "\" /> limited.");
            /**
             * http://manzhizhen.iteye.com/blog/2386408
             * use semaphore for concurrency control (to limit thread number)
             */
             
            executesLimit = count.getSemaphore(max);
            if(executesLimit != null && !(acquireResult = executesLimit.tryAcquire())) {
                throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max + "\" /> limited.");
            }
        }
        long begin = System.currentTimeMillis();
        boolean isSuccess = true;
        // 计数器+1
        RpcStatus.beginCount(url, methodName);
        try {
            Result result = invoker.invoke(invocation);
            return result;
        } catch (Throwable t) {
            isSuccess = false;
            if (t instanceof RuntimeException) {
                throw (RuntimeException) t;
            } else {
                throw new RpcException("unexpected exception when ExecuteLimitFilter", t);
            }
        } finally {
           // 计数器-1
            RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess);
            if(acquireResult) {
                executesLimit.release();
            }
        }
    }

从上述代码我们也可以看出早期这个是没有采用Semaphore来实现的,而是直接采用被注释的 if (count.getActive() >= max) 这个来来实现的,由于这个count.getActive() >= max 和这个计数加1不是原子性的,所以会有问题,具体bug号可以看https://github.com/apache/dub...Semaphore来修复非原子性问题。具体更详细的分析可以参见代码的链接。不过现在最新版本(2.7.9)我看是采用采用自旋加上和CAS来实现的。

Semaphore

上面就是对Semaphore一个简单的使用以及dubbo中用到的例子,说句实话Semaphore在工作中用的还是比较少的,不过面试又有可能会被问到,所以还是有必要来一起学习一下它。我们前面《Java高并发编程基础之AQS》通过ReentrantLock 一起学习了下AQS,其实Semaphore同样也是通过AQS来是实现的,我们可以一起来对照下独占锁的方法,基本上都是有方法一一相对应的。
在这里插入图片描述
这里有两点稍微需要注意的地方:

  • 在独占锁模式中,我们只有在获取了独占锁的节点释放锁时,才会唤醒后继节点,因为独占锁只能被一个线程持有,如果它还没有被释放,就没有必要去唤醒它的后继节点。
  • 在共享锁模式下,当一个节点获取到了共享锁,我们在获取成功后就可以唤醒后继节点了,而不需要等到该节点释放锁的时候,这是因为共享锁可以被多个线程同时持有,一个锁获取到了,则后继的节点都可以直接来获取。因此,在共享锁模式下,在获取锁和释放锁结束时,都会唤醒后继节点。

获取凭证

我们同样还是通过非公平锁的模式来老获取凭证
我们可以看下acquire的核心方法

  public final void acquireSharedInterruptibly(int arg)
           throws InterruptedException {
       if (Thread.interrupted())
           throw new InterruptedException();
       if (tryAcquireShared(arg) < 0)
           doAcquireSharedInterruptibly(arg);
   }
    protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
   }
    
    // 主要看下这个方法,这个方法返回的值也就是tryAcquireShared返回的值,因为tryAcquireShared->nonfairTryAcquireShared
    final int nonfairTryAcquireShared(int acquires) {
          //自旋
          for (;;) {
               //Semaphore用AQS的state变量的值代表可用许可数
               int available = getState();
               //可用许可数减去本次需要获取的许可数即为剩余许可数
               int remaining = available - acquires;
               //如果剩余许可数小于0或者CAS将当前可用许可数设置为剩余许可数成功,则返回成功许可数
               if (remaining < 0 ||
                   compareAndSetState(available, remaining))
                   return remaining;
           }
  • tryAcquireShared 获取返回许可书小于0时说明获取许可失败需要进入doAcquireSharedInterruptibly这个方法去休眠。
  • tryAcquireShared 获取返回许可书小于0时说明获取许可成功直接结束。

doAcquireSharedInterruptibly

 private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        // 独占锁的acquireQueued调用的是addWaiter(Node.EXCLUSIVE),
        // 而共享锁调用的是addWaiter(Node.SHARED),表明了该节点处于共享模式
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

这个方法是不是跟我们上篇文章讲的AQS的独占锁的acquireQueued很像,不过独占锁它是直接调用了用了setHead(node)方法,而共享锁调用的是setHeadAndPropagate(node, r)
这个方法除了调用setHead 里面还调用了doReleaseShared(唤醒后继节点)

    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

其他的方法基本上是和ReentrantLock来实现的独占锁差不多,我相信大家对源码分析感兴趣的应该也不多,其他更多细节问题还是需要自己亲自动手去看源码的。

总结

  • 当信号量Semaphore初始化设置许可证为1 时,它也可以当作互斥锁使用。其中0、1就相当于它的状态,当=1时表示其他线程可以获取,当=0时,排他,即其他线程必须要等待。
  • SemaphoreJUC包中的一个很简单的工具类,用来实现多线程下对于资源的同一时刻的访问线程数限制
  • Semaphore中存在一个【许可】的概念,即访问资源之前,先要获得许可,如果当前许可数量为0,那么线程阻塞,直到获得许可
  • Semaphore内部使用AQS实现,由抽象内部类Sync继承了AQS。因为Semaphore天生就是共享的场景,所以其内部实际上类似于共享锁的实现
  • 共享锁的调用框架和独占锁很相似,它们最大的不同在于获取锁的逻辑——共享锁可以被多个线程同时持有,而独占锁同一时刻只能被一个线程持有。
  • 由于共享锁同一时刻可以被多个线程持有,因此当头节点获取到共享锁时,可以立即唤醒后继节点来争锁,而不必等到释放锁的时候。因此,共享锁触发唤醒后继节点的行为可能有两处,一处在当前节点成功获得共享锁后,一处在当前节点释放共享锁后。
  • 采用semaphore来进行限流的话会产生突刺现象
指在一定时间内的一小段时间内就用完了所有资源,后大部分时间中无资源可用。
比如在限流方法中的计算器算法,设置1s内的最大请求数为100,在前100ms已经永远了100个请求,则后面900ms将无法处理请求,这就是突刺现象

结束

  • 由于自己才疏学浅,难免会有纰漏,假如你发现了错误的地方,还望留言给我指出来,我会对其加以修正。
  • 如果你觉得文章还不错,你的转发、分享、赞赏、点赞、留言就是对我最大的鼓励。
  • 感谢您的阅读,十分欢迎并感谢您的关注。

站在巨人的肩膀上摘苹果:
https://segmentfault.com/a/11...

查看原文

赞 0 收藏 0 评论 0

java金融 关注了用户 · 3月1日

taowen @taowen

Go开发者们请加入我们,滴滴出行平台技术部 taowen@didichuxing.com

关注 1071

java金融 发布了文章 · 2月25日

java高并发编程基础之AQS

引言

曾经有一道比较比较经典的面试题“你能够说说java的并发包下面有哪些常见的类?”大多数人应该都可以说出
CountDownLatch、CyclicBarrier、Sempahore多线程并发三大利器。这三大利器都是通过AbstractQueuedSynchronizer抽象类(下面简写AQS)来实现的,所以学习三大利器之前我们有必要先来学习下AQS

AQS是一种提供了原子式管理同步状态、阻塞和唤醒线程功能以及队列模型的简单框架

AQS结构

说到同步我们如何来保证同步?大家第一印象肯定是加锁了,说到锁的话大家肯定首先会想到的是Synchronized。
Synchronized大家应该基本上都会使用,加锁和释放锁都是jvm 来帮我们实现的,我们只需要简单的加个 Synchronized关键字就可以了。
用起来超级方便。但是有没有一种情况我们设置一个锁的超时时间Synchronized就有点实现不了,这时候我们就可以用ReentrantLock来实现,ReentrantLock是通过aqs来实现的,今天我们就通过ReentrantLock来学习一下aqs。

CAS && 公平锁和非公平锁

AQS里面用到了大量的CAS学习AQS之前我们还是有必要简单的先了解下CAS、公平锁和非公平锁。

CAS
  • CAS 全称是 compare and swap,是一种用于在多线程环境下实现同步功能的机制。CAS 操作包含三个操作数 -- 内存位置、预期数值和新值。CAS 的实现逻辑是将内存位置处的数值与预期数值想比较,若相等,则将内存位置处的值替换为新值。若不相等,则不做任何操作,这个操作是个原子性操作,java里面的AtomicInteger等类都是通过cas来实现的。
公平锁和非公平锁
  • 公平锁:多个线程按照申请锁的顺序去获得锁,线程会直接进入队列去排队,队列中第一个才能获得到锁。

优点:等待锁的线程不会饿死,每个线程都可以获取到锁。
缺点:整体吞吐效率相对非公平锁要低,等待队列中除第一个线程以外的所有线程都会阻塞,CPU唤醒阻塞线程的开销比非公平锁大。

  • 非公平锁:多个线程去获取锁的时候,会直接去尝试获取,获取不到,再去进入等待队列,如果能获取到,就直接获取到锁。

优点:可以减少CPU唤醒线程的开销,整体的吞吐效率会高点,CPU也不必取唤醒所有线程,会减少唤起线程的数量。
缺点:处于等待队列中的线程可能会饿死,或者等很久才会获得锁。
文字有点拗口,我们来个实际的例子说明下。比如我们去食堂就餐的时候都要排队,大家都按照先来后到的顺序排队打饭,这就是公平锁。如果等到你准备拿盘子打饭的时候
直接蹦出了一个五大三粗的胖子插队到你前面,你看打不赢他只能忍气吞声让他插队,等胖子打完饭了又来个小个子也来插你队,这时候你没法忍了,直接大吼一声让他滚,这个
小个子只能屁颠屁颠到队尾去排队了这就是非公平锁。
我们先来看看AQS有哪些属性

// 头结点
private transient volatile Node head;

// 阻塞的尾节点,每个新的节点进来,都插入到最后,也就形成了一个链表
private transient volatile Node tail;

// 这个是最重要的,代表当前锁的状态,0代表没有被占用,大于 0 代表有线程持有当前锁
// 这个值可以大于 1,是因为锁可以重入,每次重入都加上 1
private volatile int state;

// 代表当前持有独占锁的线程,举个最重要的使用例子,因为锁可以重入
// reentrantLock.lock()可以嵌套调用多次,所以每次用这个来判断当前线程是否已经拥有了锁
// if (currentThread == getExclusiveOwnerThread()) {state++}
private transient Thread exclusiveOwnerThread; //继承自AbstractOwnableSynchronizer

下面我们来写一个demo分析下lock 加锁和释放锁的过程

   final void lock() {
            // 上来先试试直接把状态置位1,如果此时没人获取锁就直接
            if (compareAndSetState(0, 1))
                 // 争抢成功则修改获得锁状态的线程
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

cas尝试失败,说明已经有人再持有锁,所以进入acquire方法

 public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

tryAcquire方法,看名字大概能猜出什么意思,就是试一试。
tryAcquire实际上是调用了父类Sync的nonfairTryAcquire方法

  final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
             // 获取下当前锁的状态
            int c = getState();
            // 这个if 逻辑跟前面一进来就获取锁的逻辑一样都是通过cas尝试获取下锁
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 进入这个判断说明 锁重入了 状态需要进行+1
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                 // 如果锁的重入次数大于int的最大值,直接就抛出异常了,正常情况应该不存在这种情况,不过jdk还是严谨的
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            // 返回false 说明尝试获取锁失败了,失败了就要进行acquireQueued方法了
            return false;
        }

tryAcquire方法如果获取锁失败了,那么肯定就要排队等待获取锁。排队的线程需要待在哪里等待获取锁?这个就跟我们线程池执行任务一样,线程池把任务都封装成一个work,然后当线程处理任务不过来的时候,就把任务放到队列里面。AQS同样也是类似的,把排队等待获取锁的线程封装成一个NODE。然后再把NODE放入到一个队列里面。队列如下所示,不过需要注意一点head是不存NODE的。
在这里插入图片描述

接下来我们继续分析源码,看下获取锁失败是如何被加入队列的。
就要执行acquireQueued方法,执行acquireQueued方法之前需要先执行addWaiter方法

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            // cas 加入队列队尾
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 尾结点不为空 || cas 加入尾结点失败
        enq(node);
        return node;
    }

enq

接下来再看看enq方法

// 通过自旋和CAS一定要当前node加入队尾
private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            // 尾结点为空说明队列还是空的,还没有被初始化,所以初始化头结点,可以看到头结点的node 是没有绑定线程的也就是不存数据的
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

通过addWaiter方法已经把获取锁的线程通过封装成一个NODE加入对列。上述方法的一个执行流程图如下:
在这里插入图片描述
,接下来就是继续执行acquireQueued方法

acquireQueued

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                 // 通过自旋去获取锁 前驱节点==head的时候去尝试获取锁,这个方法在前面已经分析过了。
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
               // 进入这个if说明node的前驱节点不等于head 或者尝试获取锁失败了
               // 判断是否需要挂起当前线程
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
               // 异常情况进入cancelAcquire,在jdk11的时候这个源码直接是catch (Throwable e){ cancelAcquire(node);} 简单明了
            if (failed)
                cancelAcquire(node);
        }
    }

setHead

这个方法每当有一个node获取到锁了,就把当前node节点设置为头节点,可以简单的看做当前节点获取到锁了就把当前节点”移除“(变为头结点)队列。

shouldParkAfterFailedAcquire

说到这个方法我们就要先看下NODE可能会有哪些状态在源码里面我们可以看到总共会有四种状态

  • CANCELLED:值为1,在同步队列中等待的线程等待超时或被中断,需要从同步队列中取消该Node的结点,其结点的waitStatus为CANCELLED,即结束状态,进入该状态后的结点将不会再变化。
  • SIGNAL:值为-1,被标识为该等待唤醒状态的后继结点,当其前继结点的线程释放了同步锁或被取消,将会通知该后继结点的线程执行。说白了,就是处于唤醒状态,只要前继结点释放锁,就会通知标识为SIGNAL状态的后继结点的线程执行。
  • CONDITION:值为-2,与Condition相关,该标识的结点处于等待队列中,结点的线程等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
  • PROPAGATE:值为-3,与共享模式相关,在共享模式中,该状态标识结点的线程处于可运行状态。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        // 前驱节点状态 如果这个状态为-1 则返回true,把当前线程挂起
        if (ws == Node.SIGNAL)
            return true;
        // 大于0,说明状态为CANCELLED 
        if (ws > 0) {
            do {
               // 删除被取消的node(让被取消的node成为一个没有引用的node等着下次GC被回收)
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // 进入这里只能是 0,-2,-3。NODE节点初始化的时候waitStatus默认值是0,所以只有这里才有修改waitStatus的地方
            // 通过cas 把前驱节点的状态设置为-1,然后返回false ,外面调用这个方法的是个循环,又会调用一次这个方法
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

parkAndCheckInterrupt

挂起当前线程,并且阻塞

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this); // 挂起当前线程,阻塞
    return Thread.interrupted();
}

在这里插入图片描述

解锁

加锁成功了,那锁用完了就应该释放锁了,释放锁重点看下unparkSuccessor这个方法就好了

 private void unparkSuccessor(Node node) {
          // 头结点状态
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next;
        // s==null head的successor节点获取锁成功后,执行了head.next=null的操作后,解锁线程读取了head.next,因此s==null
        // head的successor节点被取消(cancelAcquire)时,执行了如下操作:successor.waitStatus=1 ; successor.next = successor;
        if (s == null || s.waitStatus > 0) {
            s = null;
            // 从尾节点开始往前找,找到最前面的非取消的节点 这里没有break 哦
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
             // 唤醒线程 ,唤醒的线程会从acquireQueued去获取锁
            LockSupport.unpark(s.thread);
    }

释放锁代码比较简单,基本都写在代码注释里面了,流程如下:
在这里插入图片描述
这段代码里面有一个比较经典的面试题:
如果头结点的下一个节点为空或者头结点的下一个节点的状态为取消的时候为什么要从后往前找,找到最前面非取消的节点?

  • node.prev = pred; compareAndSetTail(pred, node) 这两个地方可以看作Tail入队的原子操作,但是此时pred.next = node;还没执行,如果这个时候执行了unparkSuccessor方法,就没办法从前往后找了,所以需要从后往前找。
  • 在产生CANCELLED状态节点的时候,先断开的是Next指针,Prev指针并未断开,因此也是必须要从后往前遍历才能够遍历完全部的Node

总结

  • reentrantLock的获取锁和释放锁基本就讲完了,里面还涉及多比较多的细节,感兴趣的同学可以对着源码一行一行去debug试试。
  • 适当的了解aqs才能更好的学习CountDownLatch、CyclicBarrier、Sempahore,因为这三个利器都是基于aqs来实现的。

结束

  • 由于自己才疏学浅,难免会有纰漏,假如你发现了错误的地方,还望留言给我指出来,我会对其加以修正。
  • 如果你觉得文章还不错,你的转发、分享、赞赏、点赞、留言就是对我最大的鼓励。
  • 感谢您的阅读,十分欢迎并感谢您的关注。

站在巨人的肩膀上摘苹果:
https://tech.meituan.com/2019...
https://javadoop.com/post/Abs...
https://www.cnblogs.com/yanlo...

查看原文

赞 0 收藏 0 评论 0

java金融 关注了专栏 · 2月5日

taowen

I write code

关注 275

java金融 发布了文章 · 1月19日

可恶的爬虫直接把生产6台机器爬挂了!

引言

  • 正在午睡,突然收到线上疯狂报警的邮件,查看这个邮件发现这个报警的应用最近半个月都没有发布,应该不至于会有报警,但是还是打开邮件通过监控发现是由于某个接口某个接口流量暴增,CPU暴涨。为了先解决问题只能先暂时扩容机器了,把机器扩容了一倍,问题得到暂时的解决。最后复盘为什么流量暴增?由于最近新上线了一个商品列表查询接口,主要用来查询商品信息,展示给到用户。业务逻辑也比较简单,直接调用底层一个soa接口,然后把数据进行整合过滤,排序推荐啥的,然后吐给前端。这个接口平时流量都很平稳。线上只部署了6台机器,面对这骤增的流量,只能进行疯狂的扩容来解决这个问题。扩容机器后一下问题得到暂时的解决。后来经过请求分析原来大批的请求都是无效的,都是爬虫过来爬取信息的。这个接口当时上线的时候是裸着上的也没有考虑到会有爬虫过来。

解决办法

  • 既然是爬虫那就只能通过反爬来解决了。自己写一套反爬虫系统,根据用户的习惯,请求特征啥的,浏览器cookie、同一个请求频率、用户ID、以及用户注册时间等来实现一个反爬系统。
  • 直接接入公司现有的反爬系统,需要按照它提供的文档来提供指定的格式请求日志让它来分析。

既然能够直接用现成的,又何必自己重新造轮子呢。最后决定还是采用接入反爬系统的爬虫组件。爬虫系统提供了两种方案如下:

方案1:

  • 爬虫系统提供批量获取黑名单IP的接口(getBlackIpList)和移除黑名单IP接口(removeBlackIp)。

业务项目启动的时候,调用getBlackIpList接口把所有IP黑名单全部存入到本地的一个容器里面(Map、List),中间会有一个定时任务去调用getBlackIpList接口全量拉取黑名单(黑名单会实时更新,可能新增,也可能减少)来更新这个容器。

  • 每次来一个请求先经过这个本地的黑名单IP池子,IP是否在这个池子里面,如果在这个池子直接返回爬虫错误码,然后让前端弹出一个复杂的图形验证码,如果用户输入验证码成功(爬虫基本不会去输入验证码),然后把IP从本地容器移除,同时发起一个异步请求调用移除黑名单IP接口(removeBlackIp),以防下次批量拉取黑单的时候又拉入进来了。然后在发送一个activemq消息告诉其他机器这个IP是被误杀的黑名单,其他机器接受到了这个消息也就会把自己容器里面这个IP移除掉。(其实同步通知其他机器也可以通过把这个IP存入redis里面,如果在命中容器里面是黑名单的时候,再去redis里面判断这个ip是否存在redis里面,如果存在则说明这个ip是被误杀的,应该是正常请求,下次通过定时任务批量拉取黑名单的时候,拉取完之后把这个redis里面的数据全部删除,或者让它自然过期。

这种方案:性能较好,基本都是操作本地内存。但是实现有点麻烦,要维护一份IP黑名单放在业务系统中。
在这里插入图片描述

方案2:

  • 爬虫系统提供单个判断IP是否黑名单接口checkIpIsBlack(但是接口耗时有点长5s)和移除黑名单IP接口(removeBlackIp)。每一个请求过来都去调用爬虫系统提供的接口(判断IP是否在黑名单里面)这里有一个网络请求会有点耗时。如果爬虫系统返回是黑名单,就返回一个特殊的错误码给到前端,然后前端弹出一个图形验证码,如果输入的验证码正确,则调用爬虫系统提供的移除IP黑名单接口,把IP移除。

这种方案:对于业务系统使用起来比较简单,直接调用接口就好,没有业务逻辑,但是这个接口耗时是没法忍受的,严重影响用户的体验
最终综合考虑下来最后决定采用方案1.毕竟系统对响应时间是有要求的尽量不要增加不必要的耗时。

方案1 实现

方案1伪代码实现 我们上文《看了CopyOnWriteArrayList后自己实现了一个CopyOnWriteHashMap》有提到过对于读多写少的线程安全的容器我们可以选择CopyOnWrite容器。

static CopyOnWriteArraySet blackIpCopyOnWriteArraySet = null;
    /**
     * 初始化
     */
    @PostConstruct
    public void init() {
        // 调用反爬系统接口 拉取批量黑名单
        List<String> blackIpList = getBlackIpList();
        // 初始化
        blackIpCopyOnWriteArraySet = new CopyOnWriteArraySet(blackIpList);
    }

    /**
     * 判断IP 是否黑名单
     * @param ip
     * @return
     */
    public boolean checkIpIsBlack(String ip) {
      boolean checkIpIsBlack =  blackIpCopyOnWriteArraySet.contains(ip);
       if (!checkIpIsBlack ) 
               return false;
       // 不在redis白名单里面
       if (!RedisUtils.exist(String.format("whiteIp_%", ip)){
               return false;
        } 
       return  true;
    }

上线后经过一段时间让爬虫系统消费我们的请求日志,经过一定模型特征的训练,效果还是很明显的。由于大部分都是爬虫很多请求直接就被拦截了,所以线上的机器可以直接缩容掉一部分了又回到了6台。但是好景不长,突然发现GC次数频繁告警不断。为了暂时解决问题,赶紧把生产机器进行重启(生产出问题之后,除了重启和回退还有什么解决办法吗),并且保留了一台机器把它拉出集群,重启之后发现过又是一样的还是没啥效果。通过dump线上的一台机器,通过MemoryAnalyzer分析发现一个大对象就是我们存放IP的大对象,存放了大量的的IP数量。这个IP存放的黑名单是放在一个全局的静态CopyOnWriteArraySet,所以每次gc 它都不会被回收掉。只能临时把线上的机器配置都进行升级,由原来的8核16g直接变为16核32g,新机器上线后效果很显著。
为啥测试环境没有复现?
测试环境本来就没有什么其他请求,都是内网IP,几个黑名单IP还是开发手动构造的。

解决方案

业务系统不再维护IP黑名单池子了,由于黑名单来自反爬系统,爬虫黑名单的数量不确定。所以最后决定采取方案2和方案1结合优化。

  • 1.项目启动的时候把所有的IP黑名单全部初始化到一个全局的布隆过滤器
  • 2.一个请求过来先经过布隆过滤器,判断是否在布隆过滤器里面,如果在的话我们再去看看是否在redis白名单里面(误杀用户需要进行洗白)我们再去请求反爬系统判断IP是否是黑名单接口,如果接口返回是IP黑名单直接返回错误码给到前端,如果不是直接放行(布隆过滤器有一定的误判,但是误判率是非常小的,所以即使被误判了,最后再去实际请求接口,这样的话就不会存在真正的误判真实用户)。如果不存在布隆器直接放行。
  • 3.如果是被误杀的用户,用户进行了IP洗白,布隆过滤器的数据是不支持删除(布谷鸟布隆器可以删除(可能误删)),把用户进行正确洗白后的IP存入redis里面。(或者一个本地全局容器,mq消息同步其他机器)

下面我们先来了解下什么是布隆过滤器把。

什么是布隆过滤器
布隆过滤器(英语:Bloom Filter)是1970年由布隆提出的。它实际上是一个很长的二进制向量和一系列随机映射函数。布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都远远超过一般的算法,缺点是有一定的误识别率和删除困难。

上述出自百度百科。
说白了布隆过滤器主要用来判断一个元素是否在一个集合中,它可以使用一个位数组简洁的表示一个数组。它的空间效率和查询时间远远超过一般的算法,不过它存在一定的误判的概率,适用于容忍误判的场景。如果布隆过滤器判断元素存在于一个集合中,那么大概率是存在在集合中,如果它判断元素不存在一个集合中,那么一定不存在于集合中。

实现原理

 布隆过滤器的原理是,当一个元素被加入集合时,通过 K 个散列函数将这个元素映射成一个位数组(Bit array)中的 K 个点,把它们置为 1 。检索时,只要看看这些点是不是都是1就知道元素是否在集合中;如果这些点有任何一个 0,则被检元素一定不在;如果都是1,则被检元素很可能在(之所以说“可能”是误差的存在)。底层是采用一个bit数组和几个哈希函数来实现。
在这里插入图片描述
在这里插入图片描述
下面我们以一个 bloom filter 插入"java" 和"PHP"为例,每次插入一个元素都进行了三次hash函数
java第一次hash函数得到下标是2,所以把数组下标是2给置为1
java第二次Hash函数得到下标是3,所以把数组下标是3给置为1
java第三次Hash函数得到下标是5,所以把数组下标是5给置为1
PHP 第一次Hash函数得到下标是5,所以把数组下标是5给置为1
...
查找的时候,当我们去查找C++的时候发现第三次hash位置为0,所以C++一定是不在不隆过滤器里面。但是我们去查找“java”这个元素三次hash出来对应的点都是1。只能说这个元素是可能存在集合里面。

  • 布隆过滤器添加元素
  1. 将要添加的元素给k个哈希函数
  2. 得到对应于位数组上的k个位置
  3. 将这k个位置设为1
  • 布隆过滤器查询元素
  1. 将要查询的元素给k个哈希函数
  2. 得到对应于位数组上的k个位置
  3. 如果k个位置有一个为0,则肯定不在集合中
  4. 如果k个位置全部为1,则可能在集合中

使用BloomFilter

引入pom

 <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>23.0</version>
  </dependency> 
    public static int count = 1000000;
    private static BloomFilter<String> bf = BloomFilter.create(Funnels.stringFunnel(Charset.forName("utf-8")), count,0.009);
    public static void main(String[] args) {
        int missCount = 0;
        for (int i = 0; i < count; i++) {
            bf.put(i+"");
        }
        for (int i = count; i < count+1000000; i++) {
            boolean b = bf.mightContain(i +"");
            if (b) {
                missCount++;
            }
        }
        System.out.println(new BigDecimal(missCount).divide(new BigDecimal(count)));
    }

解决问题

布隆过滤器介绍完了,我们再回到上述的问题,我们把上述问题通过伪代码来实现下;

   /**
     * 初始化
     */
    @PostConstruct
    public void init() {
        // 这个可以通过配置中心来读取
        double fpp = 0.001;
        // 调用反爬系统接口 拉取批量黑名单
        List<String> blackIpList = getBlackIpList();
        // 初始化 不隆过滤器
        blackIpBloomFilter = BloomFilter.create(Funnels.stringFunnel(Charset.forName("utf-8")), blackIpList.size(), fpp);
        for (String ip: blackIpList) {
            blackIpBloomFilter.put(ip);
        }
    }
    /**
     * 判断是否是爬虫
     */
    public boolean checkIpIsBlack(String ip) {
        boolean contain = blackIpBloomFilter.mightContain(ip);
        if (!contain) {
            return false;
        }
         // 不在redis白名单里面
       if (!RedisUtils.exist(String.format("whiteIp_%", ip)){
               return false;
        } 
        // 调用反爬系统接口 判断IP是否在黑名单里面
    }

总结

上述只是列举了通过IP来反爬虫,这种反爬的话只能应对比较低级的爬虫,如果稍微高级一点的爬虫也可以通过代理IP来继续爬你的网站,这样的话成本可能就会加大了一点。爬虫虽然好,但是还是不要乱爬,“爬虫爬的好,牢饭吃到饱

结束

  • 由于自己才疏学浅,难免会有纰漏,假如你发现了错误的地方,还望留言给我指出来,我会对其加以修正。
  • 如果你觉得文章还不错,你的转发、分享、赞赏、点赞、留言就是对我最大的鼓励。
  • 感谢您的阅读,十分欢迎并感谢您的关注。
查看原文

赞 0 收藏 0 评论 0

java金融 发布了文章 · 2020-12-31

看了CopyOnWriteArrayList后自己实现了一个CopyOnWriteHashMap

在这里插入图片描述

引言

面试官: 小伙子你有点眼熟啊,是不是去年来这面试过啊。<br/>
二胖: 啊,没有啊我这是第一次来这。<br/>
面试官: 行,那我们开始今天的面试吧,刚开始我们先来点简单的吧,java里面的容器你知道哪些啊,跟我说一说吧。<br/>
二胖: 好的,java里面常见容器有ArrayList(线程非安全)、HashMap(线程非安全)、HashSet(线程非安全),ConcurrentHashMap(线程安全)。<br/>
面试官:ArrayList 既然线程非安全那有没有线程安全的ArrayList列?<br/>
二胖: 这个。。。 好像问到知识盲点了。<br/>
面试官: 那我们今天的面试就先到这了,我待会还有一个会,后续如有通知人事会联系你的。<br/>
以上故事纯属虚构如有雷同请以本文为主。

什么是COW

在java里面说到集合容器我们一般首先会想到的是HashMapArrayListHasHSet这几个容器也是平时开发中用的最多的。
这几个都是非线程安全的,如果我们有特定业务需要使用线程的安全容器列,

  • HashMap可以用ConcurrentHashMap代替。
  • ArrayList 可以使用Collections.synchronizedList()方法(list 每个方法都用synchronized修饰) 或者使用Vector(现在基本也不用了,每个方法都用synchronized修饰)

或者使用CopyOnWriteArrayList 替代。

  • HasHSet 可以使用 Collections.synchronizedSet 或者使用CopyOnWriteArraySet来代替。(CopyOnWriteArraySet为什么不叫CopyOnWriteHashSet因为CopyOnWriteArraySet底层是采用CopyOnWriteArrayList来实现的)

我们可以看到CopyOnWriteArrayList在线程安全的容器里面多次出现。
首先我们来看看什么是CopyOnWriteCopy-On-Write简称COW,是一种用于程序设计中的优化策略。

CopyOnWrite容器即写时复制的容器。通俗的理解是当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy,复制出一个新的容器,然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。这样做的好处是我们可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器。

为什么要引入COW

防止ConcurrentModificationException异常

在java里面我们如果采用不正确的循环姿势去遍历List时候,如果一边遍历一边修改抛出java.util.ConcurrentModificationException错误的。
如果对ArrayList循环遍历不是很熟悉的可以建议看下这篇文章《ArrayList的删除姿势你都掌握了吗》

        List<String> list = new ArrayList<>();
        list.add("张三");
        list.add("java金融");
        list.add("javajr.cn");
        Iterator<String> iterator = list.iterator();
        while(iterator.hasNext()){
            String content = iterator.next();
            if("张三".equals(content)) {
                list.remove(content);
            }

        }

上面这个栗子是会发生java.util.ConcurrentModificationException异常的,如果把ArrayList改为CopyOnWriteArrayList 是不会发生生异常的。

线程安全的容器

我们再看下面一个栗子一个线程往List里面添加数据,一个线程循环list读数据。

    List<String> list = new ArrayList<>();
        list.add("张三");
        list.add("java金融");
        list.add("javajr.cn");
        Thread t = new Thread(new Runnable() {
            int count = 0;
            @Override
            public void run() {
                while (true) {
                    list.add(count++ + "");
                }
            }
        });
        t.start();
        Thread.sleep(10000);
        for (String s : list) {
            System.out.println(s);
        }

我们运行上述代码也会发生ConcurrentModificationException异常,如果把ArrayList换成了CopyOnWriteArrayList就一切正常。

CopyOnWriteArrayList的实现

通过上面两个栗子我们可以发现CopyOnWriteArrayList是线程安全的,下面我们就来一起看看CopyOnWriteArrayList是如何实现线程安全的。

public class CopyOnWriteArrayList<E>
    implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
    private static final long serialVersionUID = 8673264195747942595L;

    /** The lock protecting all mutators */
    final transient ReentrantLock lock = new ReentrantLock();

    /** The array, accessed only via getArray/setArray. */
    private transient volatile Object[] array;

从源码中我们可以知道CopyOnWriteArrayList这和ArrayList底层实现都是通过一个Object的数组来实现的,只不过 CopyOnWriteArrayList的数组是通过volatile来修饰的,为什么需要volatile修饰建议可以看看《Java的synchronized 能防止指令重排序吗?》
还有新增了ReentrantLock

add方法:

    public boolean add(E e) {
        // 先获取锁
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] elements = getArray();
            int len = elements.length;
            // 复制一个新的数组
            Object[] newElements = Arrays.copyOf(elements, len + 1);
            newElements[len] = e;
            // 把新数组的值 赋给原数组
            setArray(newElements);
            return true;
        } finally {
            // 释放锁
            lock.unlock();
        }
    }

上述源码我们可以发现比较简单,有几个点需要稍微注意下

  • 增加数据的时候是通过ReentrantLock加锁操作来(在jdk11的时候采用了synchronized来替换ReentrantLock)保证多线程写的时候只有一个线程进行数组的复制,否则的话内存中会有多份被复制的数据,导致数据错乱。
  • 数组是通过volatile 修饰的,根据 volatilehappens-before 规则,写线程对数组引用的修改是可以立即对读线程是可见的。
  • 通过写时复制来保证读写实在两个不同的数据容器中进行操作。

自己实现一个COW容器

再Java并发包里提供了两个使用CopyOnWrite机制实现的并发容器,它们是CopyOnWriteArrayListCopyOnWriteArraySet,但是并没有CopyOnWriteHashMap我们可以按照他的思路自己来实现一个CopyOnWriteHashMap

public class CopyOnWriteHashMap<K, V> implements Map<K, V>, Cloneable {

    final transient ReentrantLock lock = new ReentrantLock();

    private volatile Map<K, V> map;


    public CopyOnWriteHashMap() {
        map = new HashMap<>();
    }

    @Override
    public V put(K key, V value) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Map<K, V> newMap = new HashMap<K, V>(map);
            V val = newMap.put(key, value);
            map = newMap;
            return val;
        } finally {
            lock.unlock();
        }
    }

    @Override
    public V get(Object key) {
        return map.get(key);
    }
    @Override
    public V remove(Object key) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Map<K, V> newMap = new HashMap<K, V>(map);

            if (!newMap.containsKey(key)) {
                return null;
            }
            V v = newMap.get(key);
            newMap.remove(key);
            map = newMap;
            return v;
        }finally {
            lock.unlock();
        }
    }

上述我们实现了一个简单的CopyOnWriteHashMap,只实现了add、remove、get方法其他剩余的方法可以自行去实现,涉及到只要数据变化的就要加锁,读无需加锁。

应用场景

CopyOnWrite并发容器适用于读多写少的并发场景,比如黑白名单、国家城市等基础数据缓存、系统配置等。这些基本都是只要想项目启动的时候初始化一次,变更频率非常的低。如果这种读多写少的场景采用 Vector,Collections包装的这些方式是不合理的,因为尽管多个读线程从同一个数据容器中读取数据,但是读线程对数据容器的数据并不会发生发生修改,所以并不需要读也加锁。

CopyOnWrite缺点

CopyOnWriteArrayList虽然是一个线程安全版的ArrayList,但其每次修改数据时都会复制一份数据出来,所以CopyOnWriteArrayList只适用读多写少或无锁读场景。我们如果在实际业务中使用CopyOnWriteArrayList,一定是因为这个场景适合而非是为了炫技。

内存占用问题

因为CopyOnWrite的写时复制机制每次进行写操作的时候都会有两个数组对象的内存,如果这个数组对象占用的内存较大的话,如果频繁的进行写入就会造成频繁的Yong GC和Full GC。

数据一致性问题

CopyOnWrite容器只能保证数据的最终一致性,不能保证数据的实时一致性。读操作的线程可能不会立即读取到新修改的数据,因为修改操作发生在副本上。但最终修改操作会完成并更新容器所以这是最终一致性。

CopyOnWriteArrayList和Collections.synchronizedList()

简单的测试了下CopyOnWriteArrayList 和 Collections.synchronizedList()的读和写发现:

  • 在高并发的写时CopyOnWriteArray比同步Collections.synchronizedList慢百倍
  • 在高并发的读性能时CopyOnWriteArray比同步Collections.synchronizedList快几十倍。
  • 高并发写时,CopyOnWriteArrayList为何这么慢呢?因为其每次add时,都用Arrays.copyOf创建新数组,频繁add时内存申请释放性能消耗大。
  • 高并发读的时候CopyOnWriteArray无锁,Collections.synchronizedList有锁所以读的效率比较低下。

总结

选择CopyOnWriteArrayList的时候一定是读远大于写。如果读写都差不多的话建议选择Collections.synchronizedList。

结束

  • 由于自己才疏学浅,难免会有纰漏,假如你发现了错误的地方,还望留言给我指出来,我会对其加以修正。
  • 如果你觉得文章还不错,你的转发、分享、赞赏、点赞、留言就是对我最大的鼓励。
  • 感谢您的阅读,十分欢迎并感谢您的关注。

在这里插入图片描述
巨人肩膀摘苹果
http://ifeve.com/java-copy-on...

查看原文

赞 8 收藏 6 评论 0

java金融 发布了文章 · 2020-12-30

肝了一个月,整理了这些java思维导图(干货十足)!

很多人都在问应该怎么样学习java的知识点,java有哪些知识点?最近准备面试了,java知识点太多了又不知道如何开始复习?java的知识点太多太多,学完了又忘了。所以我们可以为每个知识点都整理成一份思维导图。需要的时候只要找出这个思维导图对着学习、巩固。所以我整理了近100多份思维导图,内容包含了jvm、java基础、java核心基础、中间件、spring、redis、tomcat、数据结构、大数据、算法等,链接我放文末了,没有任何套路。直接下载! 给大家白嫖。最近学弟也凭着这份思维导图最终进入了互联网大厂。

java核心知识

在这里插入图片描述

javaSE知识

在这里插入图片描述

springmvc 框架

在这里插入图片描述

JVM调优【面试必考】

在这里插入图片描述

netty

在这里插入图片描述

数据结构

在这里插入图片描述

zookper

在这里插入图片描述
所有思维导图大概100多张。我就不一一的展示了。因为自动压缩的原因可能看不太清。有需要的同学直接下面的lanzou网盘中下载。
希望大家点赞收藏支持一下~~

下载地址 https://wws.lanzous.com/iYmW5...
点击下载

查看原文

赞 1 收藏 1 评论 0

java金融 发布了文章 · 2020-12-23

5种SpringMvc的异步处理方式你都了解吗?

引言

说到异步大家肯定首先会先想到同步。我们先来看看什么是同步?
所谓同步,就是发出一个功能调用时,在没有得到结果之前,该调用就不返回或继续执行后续操作。
简单来说,同步就是必须一件一件事做,等前一件做完了才能做下一件事。
异步:异步就相反,调用在发出之后,这个调用就直接返回了,不需要等结果。

浏览器同步

浏览器发起一个request然后会一直待一个响应response,在这期间里面它是阻塞的。比如早期我们在我们在逛电商平台的时候买东西我们打开一个商品的页面,大致流程是不是可能是这样,每次打开一个页面都是由一个线程从头到尾来处理,这个请求需要进行数据库的访问需要把商品价格库存啥的返回页面,还需要去调用第三方接口,比如优惠券接口等我们只有等到这些都处理完成后这个线程才会把结果响应给浏览器,在这等结果期间这个线程只能一直在干等着啥事情也不能干。这样的话是不是会有有一定的性能问题。大致的流程如下:
在这里插入图片描述

浏览器异步

为了解决上面同步阻塞的问题,再Servlet3.0发布后,提供了一个新特性:异步处理请求。比如我们还是进入商品详情页面,这时候这个前端发起一个请求,然后会有一个线程来执行这个请求,这个请求需要去数据库查询库存、调用第三方接口查询优惠券等。这时候这个线程就不用干等着呢。它的任务到这就完成了,又可以执行下一个任务了。等查询数据库和第三方接口查询优惠券有结果了,这时候会有一个新的线程来把处理结果返回给前端。这样的话线程的工作量是不超级饱和,需要不停的干活,连休息的机会都不给了。

在这里插入图片描述

  • 这个异步是纯后端的异步,对前端是无感的,异步也并不会带来响应时间上的优化,原来该执行多久照样还是需要执行多久。但是我们的请求线程(Tomcat 线程)为异步servlet之后,我们可以立即返回,依赖于业务的任务用业务线程来执行,也就是说,Tomcat的线程可以立即回收,默认情况下,Tomcat的核心线程是10,最大线程数是200,我们能及时回收线程,也就意味着我们能处理更多的请求,能够增加我们的吞吐量,这也是异步Servlet的主要作用。

下面我们就来看看Spring mvc 的几种异步方式吧
https://docs.spring.io/spring...
在这里插入图片描述
在这个之前我们还是先简单的回顾下Servlet 3.1的异步:

  • 客户端(浏览器、app)发送一个请求
  • Servlet容器分配一个线程来处理容器中的一个servlet
  • servlet调用request.startAsync()开启异步模式,保存AsyncContext, 然后返回。
  • 这个servlet请求线程以及所有的过滤器都可以结束,但其响应(response)会等待异步线程处理结束后再返回。
  • 其他线程使用保存的AsyncContext来完成响应
  • 客户端收到响应

在这里插入图片描述

Callable

 /**  公众号:java金融
     * 使用Callable
     * @return
     */
    @GetMapping("callable")
    public Callable<String> callable() {
        System.out.println(LocalDateTime.now().toString() + "--->主线程开始");
        Callable<String> callable = () -> {
            String result = "return callable";
            // 执行业务耗时 5s
            Thread.sleep(5000);
            System.out.println(LocalDateTime.now().toString() + "--->子任务线程("+Thread.currentThread().getName()+")");
            return result;
        };
        System.out.println(LocalDateTime.now().toString() + "--->主线程结束");
        return callable;
    }
       public static String doBusiness() {
        // 执行业务耗时 10s
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return UUID.randomUUID().toString();
    }
  • 控制器先返回一个Callable对象
  • Spring MVC开始进行异步处理,并把该Callable对象提交给另一个独立线程的执行器TaskExecutor处理
  • DispatcherServlet和所有过滤器都退出Servlet容器线程,但此时方法的响应对象仍未返回
  • Callable对象最终产生一个返回结果,此时Spring MVC会重新把请求分派回Servlet容器,恢复处理
  • DispatcherServlet再次被调用,恢复对Callable异步处理所返回结果的处理

上面就是Callable的一个执行流程,下面我们来简单的分析下源码,看看是怎么实现的:
我们知道SpringMvc是可以返回json格式数据、或者返回视图页面(html、jsp)等,SpringMvc是怎么实现这个的呢?最主要的一个核心类就是org.springframework.web.method.support.HandlerMethodReturnValueHandler 我们来看看这个类,这个类就是一个接口,总共就两个方法;

boolean supportsReturnType(MethodParameter returnType);
void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType,ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception;

上面这个我们的请求是返回Callable<String> 这样一个结果的,我们会根据这个返回的类型去找所有实现了HandlerMethodReturnValueHandler 这个接口的实现类,最终我们会根据返回类型通过supportsReturnType这个实现的方法找到一个对应的HandlerMethodReturnValueHandler 实现类,我们根据返回类型是Callable然后就找到了实现类CallableMethodReturnValueHandler。在这里插入图片描述
开启异步线程的话也就是在handleReturnValue这个方法里面了,感兴趣的大家可以动手去debug下还是比较好调试的。

CompletableFuture 和ListenableFuture

   @GetMapping("completableFuture")
    public CompletableFuture<String> completableFuture() {
        // 线程池一般不会放在这里,会使用static声明,这只是演示
        ExecutorService executor = Executors.newCachedThreadPool();
        System.out.println(LocalDateTime.now().toString() + "--->主线程开始");
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(IndexController::doBusiness, executor);
        System.out.println(LocalDateTime.now().toString() + "--->主线程结束");
        return completableFuture;
    }

    @GetMapping("listenableFuture")
    public ListenableFuture<String> listenableFuture() {
        // 线程池一般不会放在这里,会使用static声明,这只是演示
        ExecutorService executor = Executors.newCachedThreadPool();
        System.out.println(LocalDateTime.now().toString() + "--->主线程开始");
        ListenableFutureTask<String> listenableFuture = new ListenableFutureTask<>(()->   doBusiness());
        executor.execute(listenableFuture);
        System.out.println(LocalDateTime.now().toString() + "--->主线程结束");
        return listenableFuture;
    }

注:这种方式记得不要使用内置的不要使用内置的 ForkJoinPool线程池,需要自己创建线程池否则会有性能问题

WebAsyncTask

 @GetMapping("asynctask")
    public WebAsyncTask asyncTask() {
        SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor();
        System.out.println(LocalDateTime.now().toString() + "--->主线程开始");
        WebAsyncTask<String> task = new WebAsyncTask(1000L, executor, ()-> doBusiness());
        task.onCompletion(()->{
            System.out.println(LocalDateTime.now().toString() + "--->调用完成");
        });
        task.onTimeout(()->{
            System.out.println("onTimeout");
            return "onTimeout";
        });
        System.out.println(LocalDateTime.now().toString() + "--->主线程结束");
        return task;
    }

DeferredResult

    @GetMapping("deferredResult")
    public DeferredResult<String> deferredResult() {
        System.out.println(LocalDateTime.now().toString() + "--->主线程("+Thread.currentThread().getName()+")开始");
        DeferredResult<String> deferredResult = new DeferredResult<>();
        CompletableFuture.supplyAsync(()-> doBusiness(), Executors.newFixedThreadPool(5)).whenCompleteAsync((result, throwable)->{
            if (throwable!=null) {
                deferredResult.setErrorResult(throwable.getMessage());
            }else {
                deferredResult.setResult(result);
            }
        });
        // 异步请求超时时调用
        deferredResult.onTimeout(()->{
            System.out.println(LocalDateTime.now().toString() + "--->onTimeout");
        });
        // 异步请求完成后调用
        deferredResult.onCompletion(()->{
            System.out.println(LocalDateTime.now().toString() + "--->onCompletion");
        });
        System.out.println(LocalDateTime.now().toString() + "--->主线程("+Thread.currentThread().getName()+")结束");
        return deferredResult;
    }
  • 上面这几种异步方式都是会等到业务doBusiness执行完之后(10s)才会把response给到前端,执行请求的主线程会立即结束,响应结果会交给另外的线程来返回给前端。
  • 这种异步跟下面的这个所谓的假异步是不同的,这种情况是由主线程执行完成之后立马返回值(主线程)给前端,不会等个5s在返回给前端。
    @GetMapping("call")
    public String call() {
       new Thread(new Runnable() {
           @Override
           public void run() {
               try {
                   Thread.sleep(5000);
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
           }
       }).start();
        return "这是个假异步";
    }

这几种异步方式都跟返回Callable 差不多,都有对应的HandlerMethodReturnValueHandler 实现类,无非就是丰富了自己一些特殊的api、比如超时设置啥的,以及线程池的创建是谁来创建,执行流程基本都是一样的。

总结

  • 了解spring mvc 的异步编程,对我们后续学习响应式编程、rxjava、webflux等都是有好处的。
  • 异步编程可以帮我们高效的利用系统资源。

结束

  • 由于自己才疏学浅,难免会有纰漏,假如你发现了错误的地方,还望留言给我指出来,我会对其加以修正。
  • 如果你觉得文章还不错,你的转发、分享、赞赏、点赞、留言就是对我最大的鼓励。
  • 感谢您的阅读,十分欢迎并感谢您的关注。

在这里插入图片描述

站在巨人的肩膀上摘苹果:
https://blog.csdn.net/f641385...

查看原文

赞 0 收藏 0 评论 0

认证与成就

  • 获得 19 次点赞
  • 获得 1 枚徽章 获得 0 枚金徽章, 获得 0 枚银徽章, 获得 1 枚铜徽章

擅长技能
编辑

开源项目 & 著作
编辑

(゚∀゚ )
暂时没有

注册于 2020-05-20
个人主页被 2.1k 人浏览