会炼钢的小白龙

会炼钢的小白龙 查看完整档案

北京编辑太原理工大学  |  冶金工程 编辑TalkingData  |  数据工程师 编辑 baixianlong.top 编辑
编辑

阳光下 海滩边 一张长椅 一台电脑 三五行代码

个人动态

会炼钢的小白龙 赞了文章 · 2019-08-16

使用 Spring Boot 2.0 + WebFlux 实现 RESTful API

概述

什么是 Spring WebFlux, 它是一种异步的, 非阻塞的, 支持背压(Back pressure)机制的Web 开发框架. 要深入了解 Spring WebFlux, 首先要了知道 Reactive Stream, 和命令式编程相较而言, 只是另一种编程姿势.

滚床单的姿势有很多种, 目的都一样.

WebFlux 支持两种编程风(姿)格(势)

  • 使用@Controller这种基于注解的姿势, 与Sring MVC的姿势相同
  • 基于Java 8 Lambda的函数式编程风格
注意: 上面只是两种编程的姿势, 和"普通话和重庆话都是中国话"是一个道理. 我们公司也有外地的, 对他我说普通话, 对本地同事说重庆话. 这叫多态

创建项目

本文的源码在最下面

通过 http://start.spring.io 创建项目骨架.

clipboard.png

如果是手工配置, 需要添加Spring的里程碑(Milestone)仓库:

<repositories>
    <repository>
        <id>spring-snapshots</id>
        <name>Spring Snapshots</name>
        <url>https://repo.spring.io/snapshot</url>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
    <repository>
        <id>spring-milestones</id>
        <name>Spring Milestones</name>
        <url>https://repo.spring.io/milestone</url>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
    </repository>
</repositories>

<pluginRepositories>
    <pluginRepository>
        <id>spring-snapshots</id>
        <name>Spring Snapshots</name>
        <url>https://repo.spring.io/snapshot</url>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </pluginRepository>
    <pluginRepository>
        <id>spring-milestones</id>
        <name>Spring Milestones</name>
        <url>https://repo.spring.io/milestone</url>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
    </pluginRepository>
</pluginRepositories>

测试

列举所有用户

clipboard.png

创建用户

clipboard.png

获取单个用户

clipboard.png

修改

clipboard.png

删除

clipboard.png

源码

注解姿势
函数姿势

查看原文

赞 8 收藏 12 评论 2

会炼钢的小白龙 发布了文章 · 2019-04-13

Java基础篇——JVM之GC原理(干货满满)

原创不易,如需转载,请注明出处https://www.cnblogs.com/baixianlong/p/10697554.html,多多支持哈!

一、什么是GC?

GC是垃圾收集的意思,内存处理是编程人员容易出现问题的地方,忘记或者错误的内存回收会导致程序或系统的不稳定甚至崩溃,Java提供的GC功能可以自动监测对象是否超过作用域从而达到自动回收内存的目的,Java语言没有提供释放已分配内存的显示操作方法。Java程序员不用担心内存管理,因为垃圾收集器会自动进行管理。要请求垃圾收集,可以调用下面的方法之一:System.gc() 或Runtime.getRuntime().gc()。

二、哪些内存需要回收?

哪些内存需要回收是垃圾回收机制第一个要考虑的问题,所谓“要回收的垃圾”无非就是那些不可能再被任何途径使用的对象。那么如何找到这些对象?

  • 引用计数法:这种算法不能解决对象之间相互引用的情况,所以这种方法不靠谱
  • <font color=red>可达性分析法:这个算法的基本思想是通过一系列称为“GC Roots”的对象作为起始点,从这些节点向下搜索,搜索所走过的路径称为引用链,当一个对象到GC Roots没有任何引用链(即GC Roots到对象不可达)时,则证明此对象是不可用的。</font>

那么问题又来了,如何选取GCRoots对象呢?在Java语言中,可以作为GCRoots的对象包括下面几种:

  1. 虚拟机栈(栈帧中的局部变量区,也叫做局部变量表)中引用的对象。
  2. 方法区中的类静态属性引用的对象。
  3. 方法区中常量引用的对象。
  4. 本地方法栈中JNI(Native方法)引用的对象。

下面给出一个GCRoots的例子,如下图,为GCRoots的引用链,obj8、obj9、obj10都没有到GCRoots对象的引用链,所以会进行回收。

可达.png

三、四种引用状以及基于可达性分析的内存回收原理

引用.png

对于可达性分析算法而言,未到达的对象并非是“非死不可”的,若要宣判一个对象死亡,至少需要经历两次标记阶段。

  1. 如果对象在进行可达性分析后发现没有与GCRoots相连的引用链,则该对象被第一次标记并进行一次筛选,筛选条件为是否有必要执行该对象的finalize方法,若对象没有覆盖finalize方法或者该finalize方法是否已经被虚拟机执行过了,则均视作不必要执行该对象的finalize方法,即该对象将会被回收。反之,若对象覆盖了finalize方法并且该finalize方法并没有被执行过,那么,这个对象会被放置在一个叫F-Queue的队列中,之后会由虚拟机自动建立的、优先级低的Finalizer线程去执行,而虚拟机不必要等待该线程执行结束,即虚拟机只负责建立线程,其他的事情交给此线程去处理。
  2. 对F-Queue中对象进行第二次标记,如果对象在finalize方法中拯救了自己,即关联上了GCRoots引用链,如把this关键字赋值给其他变量,那么在第二次标记的时候该对象将从“即将回收”的集合中移除,如果对象还是没有拯救自己,那就会被回收。如下代码演示了一个对象如何在finalize方法中拯救了自己,然而,它只能拯救自己一次,第二次就被回收了。具体代码如下:

    public class GC {

    public static GC SAVE_HOOK = null; 
    
    public static void main(String[] args) throws InterruptedException {
        // 新建对象,因为SAVE_HOOK指向这个对象,对象此时的状态是(reachable,unfinalized)
        SAVE_HOOK = new GC(); 
        //将SAVE_HOOK设置成null,此时刚才创建的对象就不可达了,因为没有句柄再指向它了,对象此时状态是(unreachable,unfinalized)
        SAVE_HOOK = null; 
        //强制系统执行垃圾回收,系统发现刚才创建的对象处于unreachable状态,并检测到这个对象的类覆盖了finalize方法,因此把这个对象放入F-Queue队列,由低优先级线程执行它的finalize方法,此时对象的状态变成(unreachable, finalizable)或者是(finalizer-reachable,finalizable)
        System.gc(); 
        // sleep,目的是给低优先级线程从F-Queue队列取出对象并执行其finalize方法提供机会。在执行完对象的finalize方法中的super.finalize()时,对象的状态变成(unreachable,finalized)状态,但接下来在finalize方法中又执行了SAVE_HOOK = this;这句话,又有句柄指向这个对象了,对象又可达了。因此对象的状态又变成了(reachable, finalized)状态。
        Thread.sleep(500);
        // 这里楼主说对象处于(reachable,finalized)状态应该是合理的。对象的finalized方法被执行了,因此是finalized状态。又因为在finalize方法是执行了SAVE_HOOK=this这句话,本来是unreachable的对象,又变成reachable了。
        if (null != SAVE_HOOK) { //此时对象应该处于(reachable, finalized)状态 
            // 这句话会输出,注意对象由unreachable,经过finalize复活了。
            System.out.println("Yes , I am still alive"); 
        } else { 
            System.out.println("No , I am dead"); 
        } 
        // 再一次将SAVE_HOOK放空,此时刚才复活的对象,状态变成(unreachable,finalized)
        SAVE_HOOK = null; 
        // 再一次强制系统回收垃圾,此时系统发现对象不可达,虽然覆盖了finalize方法,但已经执行过了,因此直接回收。
        System.gc(); 
        // 为系统回收垃圾提供机会
        Thread.sleep(500); 
        if (null != SAVE_HOOK) { 
            // 这句话不会输出,因为对象已经彻底消失了。
            System.out.println("Yes , I am still alive"); 
        } else { 
            System.out.println("No , I am dead"); 
        } 
    } 
    
    @Override 
    protected void finalize() throws Throwable { 
        super.finalize(); 
        System.out.println("execute method finalize()"); 
       // 这句话让对象的状态由unreachable变成reachable,就是对象复活
        SAVE_HOOK = this; 
    } 

    }

运行结果如下:

    leesf
    null
    finalize method executed!
    leesf
    yes, i am still alive :)
    no, i am dead : (

  由结果可知,该对象拯救了自己一次,第二次没有拯救成功,因为对象的finalize方法最多被虚拟机调用一次。此外,从结果我们可以得知,一个堆对象的this(放在局部变量表中的第一项)引用会永远存在,在方法体内可以将this引用赋值给其他变量,这样堆中对象就可以被其他变量所引用,即不会被回收。

四、方法区的垃圾回收

1、方法区的垃圾回收主要回收两部分内容:

  • 废弃常量
  • 无用的类

2、既然进行垃圾回收,就需要判断哪些是废弃常量,哪些是无用的类?

  • 如何判断废弃常量呢?以字面量回收为例,如果一个字符串“abc”已经进入常量池,但是当前系统没有任何一个String对象引用了叫做“abc”的字面量,那么,如果发生垃圾回收并且有必要时,“abc”就会被系统移出常量池。常量池中的其他类(接口)、方法、字段的符号引用也与此类似。
  • 如何判断无用的类呢?需要满足以下三个条件:

    1. 该类的所有实例都已经被回收,即Java堆中不存在该类的任何实例。
    2. 加载该类的ClassLoader已经被回收。
    3. 该类对应的java.lang.Class对象没有在任何地方被引用,无法在任何地方通过反射访问该类的方法。

五、垃圾收集算法(垃圾回收器都是基于这些算法来实现)

1、标记-清除(Mark-Sweep)算法

  这是最基础的算法,标记-清除算法就如同它的名字样,分为“标记”和“清除”两个阶段:首先标记出所有需要回收的对象,标记完成后统一回收所有被标记的对象。这种算法的不足主要体现在效率和空间,从效率的角度讲,标记和清除两个过程的效率都不高;从空间的角度讲,标记清除后会产生大量不连续的内存碎片, 内存碎片太多可能会导致以后程序运行过程中在需要分配较大对象时,无法找到足够的连续内存而不得不提前触发一次垃圾收集动作。标记-清除算法执行过程如图:

mark-sweep.png

2、复制(Copying)算法

  复制算法是为了解决效率问题而出现的,它将可用的内存分为两块,每次只用其中一块,当这一块内存用完了,就将还存活着的对象复制到另外一块上面,然后再把已经使用过的内存空间一次性清理掉。这样每次只需要对整个半区进行内存回收,内存分配时也不需要考虑内存碎片等复杂情况,只需要移动指针,按照顺序分配即可。复制算法的执行过程如图:

copying.png

  不过这种算法有个缺点,内存缩小为了原来的一半,这样代价太高了。现在的商用虚拟机都采用这种算法来回收新生代,不过研究表明1:1的比例非常不科学,因此新生代的内存被划分为一块较大的Eden空间和两块较小的Survivor空间,每次使用Eden和其中一块Survivor。每次回收时,将Eden和Survivor中还存活着的对象一次性复制到另外一块Survivor空间上,最后清理掉Eden和刚才用过的Survivor空间。HotSpot虚拟机默认Eden区和Survivor区的比例为8:1,意思是每次新生代中可用内存空间为整个新生代容量的90%。当然,我们没有办法保证每次回收都只有不多于10%的对象存活,当Survivor空间不够用时,需要依赖老年代进行分配担保(Handle Promotion)。

3、标记-整理(Mark-Compact)算法

  复制算法在对象存活率较高的场景下要进行大量的复制操作,效率很低。万一对象100%存活,那么需要有额外的空间进行分配担保。老年代都是不易被回收的对象,对象存活率高,因此一般不能直接选用复制算法。根据老年代的特点,有人提出了另外一种标记-整理算法,过程与标记-清除算法一样,不过不是直接对可回收对象进行清理,而是让所有存活对象都向一端移动,然后直接清理掉边界以外的内存。标记-整理算法的工作过程如图:

mark-sweep.png

六、垃圾收集器

  垃圾收集器就是上面讲的理论知识的具体实现了。不同虚拟机所提供的垃圾收集器可能会有很大差别,我们使用的是HotSpot,HotSpot这个虚拟机所包含的所有收集器如图:

垃圾回收器总览.png

  上图展示了7种作用于不同分代的收集器,如果两个收集器之间存在连线,那说明它们可以搭配使用。虚拟机所处的区域说明它是属于新生代收集器还是老年代收集器。多说一句,我们必须明确一个观点:没有最好的垃圾收集器,更加没有万能的收集器,只能选择对具体应用最合适的收集器。这也是HotSpot为什么要实现这么多收集器的原因。OK,下面一个一个看一下收集器。

Serial收集器

  最基本、发展历史最久的收集器,这个收集器是一个采用复制算法的单线程的收集器,单线程一方面意味着它只会使用一个CPU或一条线程去完成垃圾收集工作,另一方面也意味着它进行垃圾收集时必须暂停其他线程的所有工作,直到它收集结束为止。后者意味着,在用户不可见的情况下要把用户正常工作的线程全部停掉,这对很多应用是难以接受的。不过实际上到目前为止,Serial收集器依然是虚拟机运行在Client模式下的默认新生代收集器,因为它简单而高效。用户桌面应用场景中,分配给虚拟机管理的内存一般来说不会很大,收集几十兆甚至一两百兆的新生代停顿时间在几十毫秒最多一百毫秒,只要不是频繁发生,这点停顿是完全可以接受的。Serial收集器运行过程如下图所示:

Serial收集器.png

  说明:1. 需要STW(Stop The World),停顿时间长。2. 简单高效,对于单个CPU环境而言,Serial收集器由于没有线程交互开销,可以获取最高的单线程收集效率。

ParNew收集器

  ParNew收集器其实就是Serial收集器的多线程版本,除了使用多条线程进行垃圾收集外,其余行为和Serial收集器完全一样,包括使用的也是复制算法。ParNew收集器除了多线程以外和Serial收集器并没有太多创新的地方,但是它却是Server模式下的虚拟机首选的新生代收集器,其中有一个很重要的和性能无关的原因是,除了Serial收集器外,目前只有它能与CMS收集器配合工作(看图)。CMS收集器是一款几乎可以认为有划时代意义的垃圾收集器,因为它第一次实现了让垃圾收集线程与用户线程基本上同时工作。ParNew收集器在单CPU的环境中绝对不会有比Serial收集器更好的效果,甚至由于线程交互的开销,该收集器在两个CPU的环境中都不能百分之百保证可以超越Serial收集器。当然,随着可用CPU数量的增加,它对于GC时系统资源的有效利用还是很有好处的。它默认开启的收集线程数与CPU数量相同,在CPU数量非常多的情况下,可以使用-XX:ParallelGCThreads参数来限制垃圾收集的线程数。ParNew收集器运行过程如下图所示:

ParNew收集器.png

Parallel Scavenge收集器

  Parallel Scavenge收集器也是一个新生代收集器,也是用复制算法的收集器,也是并行的多线程收集器,但是它的特点是它的关注点和其他收集器不同。介绍这个收集器主要还是介绍吞吐量的概念。CMS等收集器的关注点是尽可能缩短垃圾收集时用户线程的停顿时间,而Parallel Scavenge收集器的目标则是打到一个可控制的吞吐量。所谓吞吐量的意思就是CPU用于运行用户代码时间与CPU总消耗时间的比值,即吞吐量=运行用户代码时间/(运行用户代码时间+垃圾收集时间),虚拟机总运行100分钟,垃圾收集1分钟,那吞吐量就是99%。另外,<font color=red>Parallel Scavenge收集器是虚拟机运行在Server模式下的默认垃圾收集器</font>。

  停顿时间短适合需要与用户交互的程序,良好的响应速度能提升用户体验;高吞吐量则可以高效率利用CPU时间,尽快完成运算任务,主要适合在后台运算而不需要太多交互的任务。

  虚拟机提供了-XX:MaxGCPauseMillis和-XX:GCTimeRatio两个参数来精确控制最大垃圾收集停顿时间和吞吐量大小。不过不要以为前者越小越好,GC停顿时间的缩短是以牺牲吞吐量和新生代空间换取的。由于与吞吐量关系密切,Parallel Scavenge收集器也被称为“吞吐量优先收集器”。<font color=red>Parallel Scavenge收集器有一个-XX:+UseAdaptiveSizePolicy参数,这是一个开关参数,这个参数打开之后,就不需要手动指定新生代大小、Eden区和Survivor参数等细节参数了,虚拟机会根据当前系统的运行情况以及性能监控信息,动态调整这些参数以提供最合适的停顿时间或者最大的吞吐量。</font>如果对于垃圾收集器运作原理不太了解,以至于在优化比较困难的时候,使用Parallel Scavenge收集器配合自适应调节策略,把内存管理的调优任务交给虚拟机去完成将是一个不错的选择。

Serial Old收集器

Serial收集器的老年代版本,同样是一个单线程收集器,使用“标记-整理算法”,这个收集器的主要意义也是在于给Client模式下的虚拟机使用。

Parallel Old收集器

  Parallel Scavenge收集器的老年代版本,使用多线程和“标记-整理”算法。这个收集器在JDK 1.6之后的出现,“吞吐量优先收集器”终于有了比较名副其实的应用组合,在注重吞吐量以及CPU资源敏感的场合,都可以优先考虑Parallel Scavenge收集器+Parallel Old收集器的组合。运行过程如下图所示:

Parallel Old收集器.png

CMS收集器

<font color=red>CMS(Conrrurent Mark Sweep)收集器是以获取最短回收停顿时间为目标的收集器</font>。使用标记 - 清除算法,收集过程分为如下四步:

  1. 初始标记,标记GCRoots能直接关联到的对象,时间很短。
  2. 并发标记,进行GCRoots Tracing(可达性分析)过程,时间很长。
  3. 重新标记,修正并发标记期间因用户程序继续运作而导致标记产生变动的那一部分对象的标记记录,时间较长。
  4. 并发清除,回收内存空间,时间很长。

其中,并发标记与并发清除两个阶段耗时最长,但是可以与用户线程并发执行。运行过程如下图所示:

CMS收集器.png

说明:

  1. 对CPU资源非常敏感,可能会导致应用程序变慢,吞吐率下降。
  2. 无法处理浮动垃圾,因为在并发清理阶段用户线程还在运行,自然就会产生新的垃圾,而在此次收集中无法收集他们,只能留到下次收集,这部分垃圾为浮动垃圾,同时,由于用户线程并发执行,所以需要预留一部分老年代空间提供并发收集时程序运行使用。
  3. 由于采用的标记 - 清除算法,会产生大量的内存碎片,不利于大对象的分配,可能会提前触发一次Full GC。虚拟机提供了-XX:+UseCMSCompactAtFullCollection参数来进行碎片的合并整理过程,这样会使得停顿时间变长,虚拟机还提供了一个参数配置,-XX:+CMSFullGCsBeforeCompaction,用于设置执行多少次不压缩的Full GC后,接着来一次带压缩的GC。

G1收集器

  G1算法将堆划分为若干个区域(Region),它仍然属于分代收集器。不过,这些区域的一部分包含新生代,新生代的垃圾收集依然采用暂停所有应用线程的方式,将存活对象拷贝到老年代或者Survivor空间。老年代也分成很多区域,G1收集器通过将对象从一个区域复制到另外一个区域,完成了清理工作。这就意味着,在正常的处理过程中,G1完成了堆的压缩(至少是部分堆的压缩),这样也就不会有cms内存碎片问题的存在了。

  在G1中,还有一种特殊的区域,叫Humongous区域。 如果一个对象占用的空间超过了分区容量50%以上,G1收集器就认为这是一个巨型对象。这些巨型对象,默认直接会被分配在年老代,但是如果它是一个短期存在的巨型对象,就会对垃圾收集器造成负面影响。为了解决这个问题,G1划分了一个Humongous区,它用来专门存放巨型对象。如果一个H区装不下一个巨型对象,那么G1会寻找连续的H分区来存储。为了能找到连续的H区,有时候不得不启动Full GC。

G1.png

G1主要有以下特点:

  1. 并行和并发。使用多个CPU来缩短Stop The World停顿时间,与用户线程并发执行。
  2. 分代收集。独立管理整个堆,但是能够采用不同的方式去处理新创建对象和已经存活了一段时间、熬过多次GC的旧对象,以获取更好的收集效果。
  3. 空间整合。基于标记 - 整理算法,无内存碎片产生。
  4. 可预测的停顿。能简历可预测的停顿时间模型,能让使用者明确指定在一个长度为M毫秒的时间片段内,消耗在垃圾收集上的时间不得超过N毫秒。

  在G1之前的垃圾收集器,收集的范围都是整个新生代或者老年代,而G1不再是这样。使用G1收集器时,<font color=red>Java堆的内存布局与其他收集器有很大差别,它将整个Java堆划分为多个大小相等的独立区域(Region),虽然还保留有新生代和老年代的概念,但新生代和老年代不再是物理隔离的了,它们都是一部分(可以不连续)Region的集合</font>。

七、CMS和G1对比(过去 vs 未来)

CMS垃圾回收器

 CMS堆内存结构划分:

cms-内存结构.png

  • 新生代:eden space + 2个survivor
  • 老年代:old space
  • 持久代:1.8之前的perm space
  • 元空间:1.8之后的metaspace

  <font color="red">注意:这些space必须是地址连续的空间</font>

CMS中垃圾回收模式

  • 对象分配

    1. 优先在Eden区分配

        在JVM内存模型一文中, 我们大致了解了VM年轻代堆内存可以划分为一块Eden区和两块Survivor区. 在大多数情况下, 对象在新生代Eden区中分配, 当Eden区没有足够空间分配时, VM发起一次Minor GC, 将Eden区和其中一块Survivor区内尚存活的对象放入另一块Survivor区域, 如果在Minor GC期间发现新生代存活对象无法放入空闲的Survivor区, 则会通过空间分配担保机制使对象提前进入老年代(空间分配担保见下).
      
    2. 大对象直接进入老年代

        Serial和ParNew两款收集器提供了-XX:PretenureSizeThreshold的参数, 令大于该值的大对象直接在老年代分配, 这样做的目的是避免在Eden区和Survivor区之间产生大量的内存复制(大对象一般指 需要大量连续内存的Java对象, 如很长的字符串和数组), 因此大对象容易导致还有不少空闲内存就提前触发GC以获取足够的连续空间.
      
        然而取历次晋升的对象的平均大小也是有一定风险的, 如果某次Minor GC存活后的对象突增,远远高于平均值的话,依然可能导致担保失败(Handle Promotion Failure, 老年代也无法存放这些对象了), 此时就只好在失败后重新发起一次Full GC(让老年代腾出更多空间).
      
    3. 空间分配担保

      
        在执行Minor GC前, VM会首先检查老年代是否有足够的空间存放新生代尚存活对象, 由于新生代使用复制收集算法, 为了提升内存利用率, 只使用了其中一个Survivor作为轮换备份, 因此当出现大量对象在Minor GC后仍然存活的情况时, 就需要老年代进行分配担保, 让Survivor无法容纳的对象直接进入老年代, 但前提是老年代需要有足够的空间容纳这些存活对象. 但存活对象的大小在实际完成GC前是无法明确知道的, 因此Minor GC前, VM会先首先检查老年代连续空间是否大于新生代对象总大小或历次晋升的平均大小, 如果条件成立, 则进行Minor GC, 否则进行Full GC(让老年代腾出更多空间).
      
  • 对象晋升

    1. 年龄阈值

        VM为每个对象定义了一个对象年龄(Age)计数器, 对象在Eden出生如果经第一次Minor GC后仍然存活, 且能被Survivor容纳的话, 将被移动到Survivor空间中, 并将年龄设为1. 以后对象在Survivor区中每熬过一次Minor GC年龄就+1. 当增加到一定程度(-XX:MaxTenuringThreshold, 默认15), 将会晋升到老年代.
      
    2. 提前晋升: 动态年龄判定

        然而VM并不总是要求对象的年龄必须达到MaxTenuringThreshold才能晋升老年代: 如果在Survivor空间中相同年龄所有对象大小的总和大于Survivor空间的一半, 年龄大于或等于该年龄的对象就可以直接进入老年代, 而无须等到晋升年龄.
      

G1垃圾回收器

G1堆内存结构划分(它将整个Java堆划分为多个大小相等的独立区域Region)

G1-内存结构.png

G1中提供了三种垃圾回收模式:young gc、mixed gc 和 full gc

  • Young GC

    发生在年轻代的GC算法,一般对象(除了巨型对象)都是在eden region中分配内存,当所有eden region被耗尽无法申请内存时,就会触发一次young gc,这种触发机制和之前的young gc差不多,执行完一次young gc,活跃对象会被拷贝到survivor region或者晋升到old region中,空闲的region会被放入空闲列表中,等待下次被使用。

  • Mixed GC

    当越来越多的对象晋升到老年代old region时,为了避免堆内存被耗尽,虚拟机会触发一个混合的垃圾收集器,即mixed gc,该算法并不是一个old gc,除了回收整个young region,还会回收一部分的old region,这里需要注意:是一部分老年代,而不是全部老年代,可以选择哪些old region进行收集,从而可以对垃圾回收的耗时时间进行控制。

  • Full GC

    如果对象内存分配速度过快,mixed gc来不及回收,导致老年代被填满,就会触发一次full gc,G1的full gc算法就是单线程执行的serial old gc,会导致异常长时间的暂停时间,需要进行不断的调优,尽可能的避免full gc.

八、各种垃圾收集器的选用

  • 首先查看你使用的垃圾回收器是什么?

    java -XX:+PrintCommandLineFlags -version
    
  • 根据自身系统需求选择最合适的垃圾回收器(没有最好的,只有最是适合的)

    各种收集器配置.png

九、总结

  • 到此GC的内存就差不多了,其中不免有些错误的地方,或者理解有偏颇的地方欢迎大家提出来!
  • 关于GC更细粒度的调优,没敢妄言,今后有了实战事例在补上!!!

个人博客地址:

csdn:https://blog.csdn.net/tiantuo6513

cnblogs:https://www.cnblogs.com/baixianlong

segmentfault:https://segmentfault.com/u/baixianlong

github:https://github.com/xianlongbai

本文参考:

<font color="gray">https://www.cnblogs.com/xiaox...;/font>

<font color="gray">https://zhuanlan.zhihu.com/p/...;/font>

查看原文

赞 0 收藏 0 评论 0

会炼钢的小白龙 发布了文章 · 2019-04-08

SpringBoot中异步请求和异步调用(看这一篇就够了)

<font color=gray> 原创不易,如需转载,请注明出处https://www.cnblogs.com/baixianlong/p/10661591.html,否则将追究法律责任!!! </font>

一、SpringBoot中异步请求的使用

1、异步请求与同步请求

同步请求
异步请求

特点:

  • 可以先释放容器分配给请求的线程与相关资源,减轻系统负担,释放了容器所分配线程的请求,其响应将被延后,可以在耗时处理完成(例如长时间的运算)时再对客户端进行响应。<font color=red>一句话:增加了服务器对客户端请求的吞吐量</font>(实际生产上我们用的比较少,如果并发请求量很大的情况下,我们会通过nginx把请求负载到集群服务的各个节点上来分摊请求压力,当然还可以通过消息队列来做请求的缓冲)。

2、异步请求的实现

方式一:Servlet方式实现异步请求


    @RequestMapping(value = "/email/servletReq", method = GET)
    public void servletReq (HttpServletRequest request, HttpServletResponse response) {
        AsyncContext asyncContext = request.startAsync();
        //设置监听器:可设置其开始、完成、异常、超时等事件的回调处理
        asyncContext.addListener(new AsyncListener() {
            @Override
            public void onTimeout(AsyncEvent event) throws IOException {
                System.out.println("超时了...");
                //做一些超时后的相关操作...
            }
            @Override
            public void onStartAsync(AsyncEvent event) throws IOException {
                System.out.println("线程开始");
            }
            @Override
            public void onError(AsyncEvent event) throws IOException {
                System.out.println("发生错误:"+event.getThrowable());
            }
            @Override
            public void onComplete(AsyncEvent event) throws IOException {
                System.out.println("执行完成");
                //这里可以做一些清理资源的操作...
            }
        });
        //设置超时时间
        asyncContext.setTimeout(20000);
        asyncContext.start(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(10000);
                    System.out.println("内部线程:" + Thread.currentThread().getName());
                    asyncContext.getResponse().setCharacterEncoding("utf-8");
                    asyncContext.getResponse().setContentType("text/html;charset=UTF-8");
                    asyncContext.getResponse().getWriter().println("这是异步的请求返回");
                } catch (Exception e) {
                    System.out.println("异常:"+e);
                }
                //异步请求完成通知
                //此时整个请求才完成
                asyncContext.complete();
            }
        });
        //此时之类 request的线程连接已经释放了
        System.out.println("主线程:" + Thread.currentThread().getName());
    }

方式二:使用很简单,直接返回的参数包裹一层callable即可,可以继承WebMvcConfigurerAdapter类来设置默认线程池和超时处理

    @RequestMapping(value = "/email/callableReq", method = GET)
    @ResponseBody
    public Callable<String> callableReq () {
        System.out.println("外部线程:" + Thread.currentThread().getName());

        return new Callable<String>() {

            @Override
            public String call() throws Exception {
                Thread.sleep(10000);
                System.out.println("内部线程:" + Thread.currentThread().getName());
                return "callable!";
            }
        };
    }

    @Configuration
    public class RequestAsyncPoolConfig extends WebMvcConfigurerAdapter {
    
    @Resource
    private ThreadPoolTaskExecutor myThreadPoolTaskExecutor;

    @Override
    public void configureAsyncSupport(final AsyncSupportConfigurer configurer) {
        //处理 callable超时
        configurer.setDefaultTimeout(60*1000);
        configurer.setTaskExecutor(myThreadPoolTaskExecutor);
        configurer.registerCallableInterceptors(timeoutCallableProcessingInterceptor());
    }

    @Bean
    public TimeoutCallableProcessingInterceptor timeoutCallableProcessingInterceptor() {
        return new TimeoutCallableProcessingInterceptor();
    }

}

方式三:和方式二差不多,在Callable外包一层,给WebAsyncTask设置一个超时回调,即可实现超时处理


    @RequestMapping(value = "/email/webAsyncReq", method = GET)
    @ResponseBody
    public WebAsyncTask<String> webAsyncReq () {
        System.out.println("外部线程:" + Thread.currentThread().getName());
        Callable<String> result = () -> {
            System.out.println("内部线程开始:" + Thread.currentThread().getName());
            try {
                TimeUnit.SECONDS.sleep(4);
            } catch (Exception e) {
                // TODO: handle exception
            }
            logger.info("副线程返回");
            System.out.println("内部线程返回:" + Thread.currentThread().getName());
            return "success";
        };
        WebAsyncTask<String> wat = new WebAsyncTask<String>(3000L, result);
        wat.onTimeout(new Callable<String>() {

            @Override
            public String call() throws Exception {
                // TODO Auto-generated method stub
                return "超时";
            }
        });
        return wat;
    }

方式四:DeferredResult可以处理一些相对复杂一些的业务逻辑,最主要还是可以在另一个线程里面进行业务处理及返回,即可在两个完全不相干的线程间的通信。

@RequestMapping(value = "/email/deferredResultReq", method = GET)
    @ResponseBody
    public DeferredResult<String> deferredResultReq () {
        System.out.println("外部线程:" + Thread.currentThread().getName());
        //设置超时时间
        DeferredResult<String> result = new DeferredResult<String>(60*1000L);
        //处理超时事件 采用委托机制
        result.onTimeout(new Runnable() {

            @Override
            public void run() {
                System.out.println("DeferredResult超时");
                result.setResult("超时了!");
            }
        });
        result.onCompletion(new Runnable() {

            @Override
            public void run() {
                //完成后
                System.out.println("调用完成");
            }
        });
        myThreadPoolTaskExecutor.execute(new Runnable() {

            @Override
            public void run() {
                //处理业务逻辑
                System.out.println("内部线程:" + Thread.currentThread().getName());
                //返回结果
                result.setResult("DeferredResult!!");
            }
        });
       return result;
    }

二、SpringBoot中异步调用的使用

1、介绍

异步请求的处理。除了异步请求,一般上我们用的比较多的应该是异步调用。通常在开发过程中,会遇到一个方法是和实际业务无关的,没有紧密性的。比如记录日志信息等业务。这个时候正常就是启一个新线程去做一些业务处理,让主线程异步的执行其他业务。

2、使用方式(基于spring下)

  • 需要在启动类加入@EnableAsync使异步调用@Async注解生效
  • 在需要异步执行的方法上加入此注解即可@Async("threadPool"),threadPool为自定义线程池
  • 代码略。。。就俩标签,自己试一把就可以了

3、注意事项

  • 在默认情况下,未设置TaskExecutor时,默认是使用SimpleAsyncTaskExecutor这个线程池,但此线程不是真正意义上的线程池,因为线程不重用,每次调用都会创建一个新的线程。可通过控制台日志输出可以看出,每次输出线程名都是递增的。所以最好我们来自定义一个线程池。
  • 调用的异步方法,不能为同一个类的方法(包括同一个类的内部类),简单来说,因为Spring在启动扫描时会为其创建一个代理类,而同类调用时,还是调用本身的代理类的,所以和平常调用是一样的。其他的注解如@Cache等也是一样的道理,说白了,就是Spring的代理机制造成的。所以在开发中,最好把异步服务单独抽出一个类来管理。下面会重点讲述。。

4、什么情况下会导致@Async异步方法会失效?

  1. <font color=red>调用同一个类下注有@Async异步方法</font>:在spring中像@Async和@Transactional、cache等注解本质使用的是动态代理,其实Spring容器在初始化的时候Spring容器会将含有AOP注解的类对象“替换”为代理对象(简单这么理解),那么注解失效的原因就很明显了,就是因为调用方法的是对象本身而不是代理对象,因为没有经过Spring容器,那么解决方法也会沿着这个思路来解决。
  2. <font color=red>调用的是静态(static )方法</font>
  3. <font color=red>调用(private)私有化方法</font>

5、解决4中问题1的方式(其它2,3两个问题自己注意下就可以了)

  1. <font color=red>将要异步执行的方法单独抽取成一个类</font>,原理就是当你把执行异步的方法单独抽取成一个类的时候,这个类肯定是被Spring管理的,其他Spring组件需要调用的时候肯定会注入进去,这时候实际上注入进去的就是代理类了。
  2. 其实我们的注入对象都是从Spring容器中给当前Spring组件进行成员变量的赋值,由于某些类使用了AOP注解,那么实际上在Spring容器中实际存在的是它的代理对象。那么我们就可以<font color=red>通过上下文获取自己的代理对象调用异步方法</font>。

    @Controller
    @RequestMapping("/app")
    public class EmailController {
    
        //获取ApplicationContext对象方式有多种,这种最简单,其它的大家自行了解一下
        @Autowired
        private ApplicationContext applicationContext;
        
        @RequestMapping(value = "/email/asyncCall", method = GET)
        @ResponseBody
        public Map<String, Object> asyncCall () {
            Map<String, Object> resMap = new HashMap<String, Object>();
            try{
                //这样调用同类下的异步方法是不起作用的
                  //this.testAsyncTask();
                //通过上下文获取自己的代理对象调用异步方法
                EmailController emailController = (EmailController)applicationContext.getBean(EmailController.class);
                emailController.testAsyncTask();
                resMap.put("code",200);
            }catch (Exception e) {
                resMap.put("code",400);
                logger.error("error!",e);
            }
            return resMap;
        }
    
        //注意一定是public,且是非static方法
        @Async
        public void testAsyncTask() throws InterruptedException {
            Thread.sleep(10000);
            System.out.println("异步任务执行完成!");
        }
    
    }
  3. <font color=red>开启cglib代理,手动获取Spring代理类</font>,从而调用同类下的异步方法。

    • 首先,在启动类上加上<font color=red>@EnableAspectJAutoProxy(exposeProxy = true)</font>注解。
    • 代码实现,如下:

      @Service
      @Transactional(value = "transactionManager", readOnly = false, propagation = Propagation.REQUIRED, rollbackFor = Throwable.class)
      public class EmailService {

      @Autowired
      private ApplicationContext applicationContext;
      
      @Async
      public void testSyncTask() throws InterruptedException {
          Thread.sleep(10000);
          System.out.println("异步任务执行完成!");
      }
      
      
      public void asyncCallTwo() throws InterruptedException {
          //this.testSyncTask();

      // EmailService emailService = (EmailService)applicationContext.getBean(EmailService.class);
      // emailService.testSyncTask();

          boolean isAop = AopUtils.isAopProxy(EmailController.class);//是否是代理对象;
          boolean isCglib = AopUtils.isCglibProxy(EmailController.class);  //是否是CGLIB方式的代理对象;
          boolean isJdk = AopUtils.isJdkDynamicProxy(EmailController.class);  //是否是JDK动态代理方式的代理对象;
          //以下才是重点!!!
          EmailService emailService = (EmailService)applicationContext.getBean(EmailService.class);
          EmailService proxy = (EmailService) AopContext.currentProxy();
          System.out.println(emailService == proxy ? true : false);
          proxy.testSyncTask();
          System.out.println("end!!!");
      }

      }

三、异步请求与异步调用的区别

  • 两者的使用场景不同,异步请求用来解决并发请求对服务器造成的压力,从而提高对请求的吞吐量;而异步调用是用来做一些非主线流程且不需要实时计算和响应的任务,比如同步日志到kafka中做日志分析等。
  • 异步请求是会一直等待response相应的,需要返回结果给客户端的;而异步调用我们往往会马上返回给客户端响应,完成这次整个的请求,至于异步调用的任务后台自己慢慢跑就行,客户端不会关心。

四、总结

  • 异步请求和异步调用的使用到这里基本就差不多了,有问题还希望大家多多指出。
  • 这边文章提到了动态代理,而spring中Aop的实现原理就是动态代理,后续会对动态代理做详细解读,还望多多支持哈~

个人博客地址:

csdn:https://blog.csdn.net/tiantuo6513
cnblogs:https://www.cnblogs.com/baixianlong
segmentfault:https://segmentfault.com/u/baixianlong
github:https://github.com/xianlongbai
查看原文

赞 1 收藏 1 评论 0

会炼钢的小白龙 发布了文章 · 2019-04-08

SpringBoot中并发定时任务的实现、动态定时任务的实现(看这一篇就够了)

原创不易,如需转载,请注明出处https://www.cnblogs.com/baixianlong/p/10659045.html,否则将追究法律责任!!!

一、在JAVA开发领域,目前可以通过以下几种方式进行定时任务

1、单机部署模式

  • Timer:jdk中自带的一个定时调度类,可以简单的实现按某一频度进行任务执行。提供的功能比较单一,无法实现复杂的调度任务。
  • ScheduledExecutorService:也是jdk自带的一个基于线程池设计的定时任务类。其每个调度任务都会分配到线程池中的一个线程执行,所以其任务是并发执行的,互不影响。
  • Spring Task:Spring提供的一个任务调度工具,支持注解和配置文件形式,支持Cron表达式,使用简单但功能强大。
  • Quartz:一款功能强大的任务调度器,可以实现较为复杂的调度功能,如每月一号执行、每天凌晨执行、每周五执行等等,还支持分布式调度,就是配置稍显复杂。

2、分布式集群模式(不多介绍,简单提一下)

问题:

I、如何解决定时任务的多次执行?
II、如何解决任务的单点问题,实现任务的故障转移?

问题I的简单思考:

1、固定执行定时任务的机器(可以有效避免多次执行的情况 ,缺点就是单点故障问题)。
2、借助Redis的过期机制和分布式锁。
3、借助mysql的锁机制等。

成熟的解决方案:

1、Quartz:可以去看看这篇文章[Quartz分布式]( https://www.cnblogs.com/jiafuwei/p/6145280.html)。
2、elastic-job:(https://github.com/elasticjob/elastic-job-lite)当当开发的弹性分布式任务调度系统,采用zookeeper实现分布式协调,实现任务高可用以及分片。
3、xxl-job:(https://github.com/xuxueli/xxl-job)是大众点评员发布的分布式任务调度平台,是一个轻量级分布式任务调度框架。
4、saturn:(https://github.com/vipshop/Saturn) 是唯品会提供一个分布式、容错和高可用的作业调度服务框架。

二、SpringTask实现定时任务(这里是基于springboot)

1、简单的定时任务实现

使用方式:

使用@EnableScheduling注解开启对定时任务的支持。
使用@Scheduled 注解即可,基于corn、fixedRate、fixedDelay等一些定时策略来实现定时任务。

使用缺点:

1、多个定时任务使用的是同一个调度线程,所以任务是阻塞执行的,执行效率不高。
2、其次如果出现任务阻塞,导致一些场景的定时计算没有实际意义,比如每天12点的一个计算任务被阻塞到1点去执行,会导致结果并非我们想要的。

使用优点:

1、配置简单
2、适用于单个后台线程执行周期任务,并且保证顺序一致执行的场景    

源码分析:

//默认使用的调度器
 if(this.taskScheduler == null) {  
    this.localExecutor = Executors.newSingleThreadScheduledExecutor();
    this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
}
//可以看到SingleThreadScheduledExecutor指定的核心线程为1,说白了就是单线程执行
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
    return new DelegatedScheduledExecutorService
        (new ScheduledThreadPoolExecutor(1));
}
//利用了DelayedWorkQueue延时队列作为任务的存放队列,这样便可以实现任务延迟执行或者定时执行
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

  

2、实现并发的定时任务

使用方式:

  • 方式一:由1中我们知道之所以定时任务是阻塞执行,是配置的线程池决定的,那就好办了,换一个不就行了!直接上代码:

    @Configuration
    public class ScheduledConfig implements SchedulingConfigurer {
    
        @Autowired
        private TaskScheduler myThreadPoolTaskScheduler;
    
        @Override
        public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {
            //简单粗暴的方式直接指定
            //scheduledTaskRegistrar.setScheduler(Executors.newScheduledThreadPool(5));
            //也可以自定义的线程池,方便线程的使用与维护,这里不多说了
            scheduledTaskRegistrar.setTaskScheduler(myThreadPoolTaskScheduler);
        }
    }
    
    @Bean(name = "myThreadPoolTaskScheduler")
    public TaskScheduler getMyThreadPoolTaskScheduler() {
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.setPoolSize(10);
        taskScheduler.setThreadNamePrefix("Haina-Scheduled-");
        taskScheduler.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //调度器shutdown被调用时等待当前被调度的任务完成
        taskScheduler.setWaitForTasksToCompleteOnShutdown(true);
        //等待时长
        taskScheduler.setAwaitTerminationSeconds(60);
        return taskScheduler;
    }        
  • 方式二:方式一的本质改变了任务调度器默认使用的线程池,接下来这种是不改变调度器的默认线程池,而是把当前任务交给一个异步线程池去执行

    • 首先使用@EnableAsync 启用异步任务
    • 然后在定时任务的方法加上@Async即可,默认使用的线程池为SimpleAsyncTaskExecutor(该线程池默认来一个任务创建一个线程,就会不断创建大量线程,极有可能压爆服务器内存。当然它有自己的限流机制,这里就不多说了,有兴趣的自己翻翻源码~)
    • 项目中为了更好的控制线程的使用,我们可以自定义我们自己的线程池,使用方式@Async("myThreadPool")
废话太多,直接上代码:

    @Scheduled(fixedRate = 1000*10,initialDelay = 1000*20)
    @Async("myThreadPoolTaskExecutor")
    //@Async
    public void scheduledTest02(){
        System.out.println(Thread.currentThread().getName()+"--->xxxxx--->"+Thread.currentThread().getId());
    }

    //自定义线程池
    @Bean(name = "myThreadPoolTaskExecutor")
    public TaskExecutor  getMyThreadPoolTaskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(20);
        taskExecutor.setMaxPoolSize(200);
        taskExecutor.setQueueCapacity(25);
        taskExecutor.setKeepAliveSeconds(200);
        taskExecutor.setThreadNamePrefix("Haina-ThreadPool-");
        // 线程池对拒绝任务(无线程可用)的处理策略,目前只支持AbortPolicy、CallerRunsPolicy;默认为后者
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //调度器shutdown被调用时等待当前被调度的任务完成
        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        //等待时长
        taskExecutor.setAwaitTerminationSeconds(60);
        taskExecutor.initialize();
        return taskExecutor;
    }
  • 线程池的使用心得(后续有专门文章来探讨)

    • java中提供了ThreadPoolExecutor和ScheduledThreadPoolExecutor,对应与spring中的ThreadPoolTaskExecutor和ThreadPoolTaskScheduler,但是在原有的基础上增加了新的特性,在spring环境下更容易使用和控制。
    • 使用自定义的线程池能够避免一些默认线程池造成的内存溢出、阻塞等等问题,更贴合自己的服务特性
    • 使用自定义的线程池便于对项目中线程的管理、维护以及监控。
    • 即便在非spring环境下也不要使用java默认提供的那几种线程池,坑很多,阿里代码规约不说了吗,得相信大厂!!!

三、动态定时任务的实现

问题:

  • 使用@Scheduled注解来完成设置定时任务,但是有时候我们往往需要对周期性的时间的设置会做一些改变,或者要动态的启停一个定时任务,那么这个时候使用此注解就不太方便了,原因在于这个注解中配置的cron表达式必须是常量,那么当我们修改定时参数的时候,就需要停止服务,重新部署。

解决办法:

  • 方式一:实现SchedulingConfigurer接口,重写configureTasks方法,重新制定Trigger,核心方法就是addTriggerTask(Runnable task, Trigger trigger) ,不过需要注意的是,此种方式修改了配置值后,需要在下一次调度结束后,才会更新调度器,并不会在修改配置值时实时更新,实时更新需要在修改配置值时额外增加相关逻辑处理。

    @Configuration
    public class ScheduledConfig implements SchedulingConfigurer {
    
    @Autowired
    private TaskScheduler myThreadPoolTaskScheduler;
    
    @Override
    public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {
        //scheduledTaskRegistrar.setScheduler(Executors.newScheduledThreadPool(5));
        scheduledTaskRegistrar.setTaskScheduler(myThreadPoolTaskScheduler);
        //可以实现动态调整定时任务的执行频率
        scheduledTaskRegistrar.addTriggerTask(
                //1.添加任务内容(Runnable)
                () -> System.out.println("cccccccccccccccc--->" + Thread.currentThread().getId()),
                //2.设置执行周期(Trigger)
                triggerContext -> {
                    //2.1 从数据库动态获取执行周期
                    String cron = "0/2 * * * * ? ";
                    //2.2 合法性校验.
    //                    if (StringUtils.isEmpty(cron)) {
    //                        // Omitted Code ..
    //                    }
                        //2.3 返回执行周期(Date)
                        return new CronTrigger(cron).nextExecutionTime(triggerContext);
                    }
            );
    }
    }
  • 方式二:使用threadPoolTaskScheduler类可实现动态添加删除功能,当然也可实现执行频率的调整

    首先,我们要认识下这个调度类,它其实是对java中ScheduledThreadPoolExecutor的一个封装改进后的产物,主要改进有以下几点:
        1、提供默认配置,因为是ScheduledThreadPoolExecutor,所以只有poolSize这一个默认参数。
        2、支持自定义任务,通过传入Trigger参数。
        3、对任务出错处理进行优化,如果是重复性的任务,不抛出异常,通过日志记录下来,不影响下次运行,如果是只执行一次的任务,将异常往上抛。
    顺便说下ThreadPoolTaskExecutor相对于ThreadPoolExecutor的改进点:
        1、提供默认配置,原生的ThreadPoolExecutor的除了ThreadFactory和RejectedExecutionHandler其他没有默认配置
        2、实现AsyncListenableTaskExecutor接口,支持对FutureTask添加success和fail的回调,任务成功或失败的时候回执行对应回调方法。
        3、因为是spring的工具类,所以抛出的RejectedExecutionException也会被转换为spring框架的TaskRejectedException异常(这个无所谓)
        4、提供默认ThreadFactory实现,直接通过参数重载配置

    扯了这么多,还是直接上代码:

    @Component
    public class DynamicTimedTask {
    
        private static final Logger logger = LoggerFactory.getLogger(DynamicTimedTask.class);
    
        //利用创建好的调度类统一管理
        //@Autowired
        //@Qualifier("myThreadPoolTaskScheduler")
        //private ThreadPoolTaskScheduler myThreadPoolTaskScheduler;
    
    
        //接受任务的返回结果
        private ScheduledFuture<?> future;
    
        @Autowired
        private ThreadPoolTaskScheduler threadPoolTaskScheduler;
        
        //实例化一个线程池任务调度类,可以使用自定义的ThreadPoolTaskScheduler
        @Bean
        public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
            ThreadPoolTaskScheduler executor = new ThreadPoolTaskScheduler();
            return new ThreadPoolTaskScheduler();
        }
    
    
        /**
         * 启动定时任务
         * @return
         */
        public boolean startCron() {
            boolean flag = false;
            //从数据库动态获取执行周期
            String cron = "0/2 * * * * ? ";
            future = threadPoolTaskScheduler.schedule(new CheckModelFile(),cron);
            if (future!=null){
                flag = true;
                logger.info("定时check训练模型文件,任务启动成功!!!");
            }else {
                logger.info("定时check训练模型文件,任务启动失败!!!");
            }
            return flag;
        }
    
        /**
         * 停止定时任务
         * @return
         */
        public boolean stopCron() {
            boolean flag = false;
            if (future != null) {
                boolean cancel = future.cancel(true);
                if (cancel){
                    flag = true;
                    logger.info("定时check训练模型文件,任务停止成功!!!");
                }else {
                    logger.info("定时check训练模型文件,任务停止失败!!!");
                }
            }else {
                flag = true;
                logger.info("定时check训练模型文件,任务已经停止!!!");
            }
            return flag;
        }
    
    
        class CheckModelFile implements Runnable{
    
            @Override
            public void run() {
                //编写你自己的业务逻辑  
                System.out.print("模型文件检查完毕!!!")
            }
        }

    }

四、总结

  • 到此基于springtask下的定时任务的简单使用算是差不多了,其中不免有些错误的地方,或者理解有偏颇的地方欢迎大家提出来!
  • 基于分布式集群下的定时任务使用,后续有时间再继续!!!

个人博客地址:

csdn:https://blog.csdn.net/tiantuo6513
cnblogs:https://www.cnblogs.com/baixianlong
segmentfault:https://segmentfault.com/u/baixianlong
github:https://github.com/xianlongbai
查看原文

赞 2 收藏 0 评论 0

会炼钢的小白龙 发布了文章 · 2018-03-23

sqoop将mysql数据导入hbase、hive的血与泪

一、 需求:(将以下这张表数据导入mysql)

clipboard.png

由此,编写如下sqoop导入命令

   sqoop import -D sqoop.hbase.add.row.key=true --connect jdbc:mysql://192.168.1.9/spider --username root --password root --table test_goods --hbase-create-table --hbase-table t_goods  --column-family cf --hbase-row-key id -m 1

一切看着都很正常,接下来开始执行命令,报如下错误:
1、 Error during import: No primary key could be found for table *
报错原因就是指定的mysql表名不是大写,所以mysql表名必须大写
2、 Could not insert row with null value for row-key column
报错原因是没有指定mysql的列名,所以必须指定列名,并且hbase-row-key id 中的id,必须在–columns中显示。 --columns ID,GOODS_NAME, GOODS_PRICE
3、 Error parsing arguments for import Unrecognized argument
报错原因是在指定mysql的列名时,用逗号隔开的时候我多加了空格,所以在
Columns后显示的列名只能用逗号隔开,不要带空格

将以上三个问题排除后:我的最新导入命令变为如下:

sqoop import -D sqoop.hbase.add.row.key=true --connect jdbc:mysql://192.168.1.9:3306/spider --username root --password root --table TEST_GOODS --columns ID,GOODS_NAME,GOODS_PRICE --hbase-create-table --hbase-table t_goods --column-family cf --hbase-row-key ID --where "ID >= 5" -m 1

注意:这里有个小问题:记得将id>=5引起来

查看hbase,数据已经成功导入

clipboard.png

最后我将命令写入一个xxx文件,通过sqoop –options-file xxx 执行导入命令

错误写法如下:

import
-D sqoop.hbase.add.row.key=true 
--connect jdbc:mysql://192.168.1.9:3306/spider 
--username root 
--password root 
--table TEST_GOODS 
--columns ID,GOODS_NAME,GOODS_PRICE 
--hbase-create-table 
--hbase-table test_goods 
--column-family cf 
--hbase-row-key ID 
--where "ID >= 5"
-m 1

错误原因:参数的名称和参数的值没有进行回车换行

正确写法:

import 
-D 
sqoop.hbase.add.row.key=true 
--connect 
jdbc:mysql://192.168.1.9:3306/spider 
--username 
root 
--password 
root 
--table 
TEST_GOODS 
--columns 
ID,GOODS_NAME,GOODS_PRICE 
--hbase-create-table 
--hbase-table 
tt_goods 
--column-family 
cf 
--hbase-row-key 
ID 
--where 
ID>=5 
-m 
1

注:参数含义解释

-D sqoop.hbase.add.row.key=true 是否将rowkey相关字段写入列族中,默认为false,默认情况下你将在列族中看不到任何row key中的字段。注意,该参数必须放在import之后。
--connect 数据库连接字符串
--username –password  mysql数据库的用户名密码
--table Test_Goods表名,注意大写
--hbase-create-table  如果hbase中该表不存在则创建
--hbase-table   对应的hbase表名
--hbase-row-key   hbase表中的rowkey,注意格式
--column-family   hbase表的列族
--where    导入是mysql表的where条件,写法和sql中一样
--split-by CREATE_TIME   默认情况下sqoop使用4个并发执行任务,需要制订split的列,如果不想使用并发,可以用参数 --m 1

到此,bug解决完成!!!

二、知识拓展,定时增量导入

1、Sqoop增量导入

sqoop import -D sqoop.hbase.add.row.key=true --connect jdbc:mysql://192.168.1.9:3306/spider --username root --password root --table TEST_GOODS --columns ID,GOODS_NAME,GOODS_PRICE --hbase-create-table --hbase-table t_goods --column-family cf --hbase-row-key ID --incremental lastmodified --check-column U_DATE --last-value '2017-06-27' --split-by U_DATE

--incremental lastmodified 增量导入支持两种模式 append 递增的列;lastmodified时间戳。
--check-column 增量导入时参考的列
--last-value 最小值,这个例子中表示导入2017-06-27到今天的值

2、Sqoop job:

sqoop job --create testjob01 --import --connect jdbc:mysql://192.168.1.9:3306/spider --username root --password root --table TEST_GOODS    --columns ID,GOODS_NAME,GOODS_PRICE --hbase-create-table --hbase-table t_goods --column-family cf --hbase-row-key ID -m 1

设置定时执行以上sqoop job
使用linux定时器:crontab -e
例如每天执行

0 0 * * * /opt/local/sqoop-1.4.6/bin/sqoop job ….
--exec testjob01

三、数据从mysql导入hive中后,出现数据不一致情况

我们运行hadoop fs -cat /user/hadoop/student/part-m-00000,可以看到原来字段与字段之间都用‘,’分隔开,这是sqoop默认的,这时候,如果一个字段值当中包含‘,’,再向hive中插入数据时分隔就会出错。因为hive也是用‘,’分隔的。
解决方法:建议用‘001'来进行sqoop 导入数据时的 分割。也就是--fields-terminated-by <char>参数。
例子:

sqoop import --connect "jdbc:oracle:thin:@//localhost:1521/student" --password "***" --username "***" --query "select * from student where name='zhangsan' and class_id='003' and \$CONDITIONS" --target-dir "/user/hadoop/student" --fields-terminated-by "\001" --verbose -m 1

查看原文

赞 1 收藏 1 评论 0

会炼钢的小白龙 关注了用户 · 2018-03-22

代码宇宙 @universe_of_code

我愿与你一同居住于代码宇宙中,用我的芯为你增加能量,和你一起守护只属于我们的世界。(六翼天使)

关注 11801

会炼钢的小白龙 关注了用户 · 2018-03-22

剑心无痕 @jianxinwuhen

关注 8097

会炼钢的小白龙 关注了用户 · 2018-03-22

大闲人柴毛毛 @daxianrenchaimaomao

关注 1443

会炼钢的小白龙 关注了用户 · 2018-03-22

hfhan @hfhan

砥砺前行

关注 19562

会炼钢的小白龙 关注了用户 · 2018-03-22

小蜗牛 @wangwenlin

不高不帅,时常犯二, 在逗比的道路上越走越远。。。

看完请闭眼!

关注 15079

认证与成就

  • 获得 4 次点赞
  • 获得 2 枚徽章 获得 0 枚金徽章, 获得 0 枚银徽章, 获得 2 枚铜徽章

擅长技能
编辑

开源项目 & 著作
编辑

(゚∀゚ )
暂时没有

注册于 2018-03-22
个人主页被 1.1k 人浏览