纪卓志 George

纪卓志 George 查看完整档案

填写现居城市  |  填写毕业院校  |  填写所在公司/组织填写个人主网站
编辑
_ | |__ _ _ __ _ | '_ \| | | |/ _` | | |_) | |_| | (_| | |_.__/ \__,_|\__, | |___/ 个人简介什么都没有

个人动态

纪卓志 George 发布了文章 · 3月2日

深入探究JDK中Timer的使用方式与源码解析

导言

在项目开发过程中,经常会遇到需要使用定时执行或延时执行任务的场景。比如我们在活动结束后自动汇总生成效果数据、导出Excel表并将文件通过邮件推送到用户手上,再比如微信运动每天都会在十点后向你发送个位数的步数(在?把摄像头从我家拆掉!)。

本文将会介绍java.util.Timer的使用,并从源码层面对它进行解析。

定时器Timer的使用

java.util.Timer是JDK提供的非常使用的工具类,用于计划在特定时间后执行的任务,可以只执行一次或定期重复执行。在JDK内部很多组件都是使用的java.util.Timer实现定时任务或延迟任务。

Timer可以创建多个对象的实例,每个对象都有且只有一个后台线程来执行任务。

Timer类是线程安全的,多个线程可以共享一个计时器,而无需使用任何的同步。

构造方法

首先我们可以看下Timer类的构造方法的API文档

  1. Timer(): 创建一个新的计时器。
  2. Timer(boolean isDaemon): 创建一个新的定时器,其关联的工作线程可以指定为守护线程。
  3. Timer(String name): 创建一个新的定时器,其关联的工作线程具有指定的名称。
  4. Timer(String name, boolean isDaemon): 创建一个新的定时器,其相关线程具有指定的名称,可以指定为守护线程。
Note: 守护线程是低优先级线程,在后台执行次要任务,比如垃圾回收。当有非守护线程在运行时,Java应用不会退出。如果所有的非守护线程都退出了,那么所有的守护线程也会随之退出。

实例方法

接下来我们看下Timer类的实例方法的API文档

  1. cancel(): 终止此计时器,并丢弃所有当前执行的任务。
  2. purge(): 从该计时器的任务队列中删除所有取消的任务。
  3. schedule(TimerTask task, Date time): 在指定的时间执行指定的任务。
  4. schedule(TimerTask task, Date firstTime, long period): 从指定 的时间开始 ,对指定的任务按照固定的延迟时间重复执行 。
  5. schedule(TimerTask task, long delay): 在指定的延迟之后执行指定的任务。
  6. schedule(TimerTask task, long delay, long period): 在指定的延迟之后开始 ,对指定的任务按照固定的延迟时间重复执行 。
  7. scheduleAtFixedRate(TimerTask task, Date firstTime, long period): 从指定的时间开始 ,对指定的任务按照固定速率重复执行 。
  8. scheduleAtFixedRate(TimerTask task, long delay, long period): 在指定的延迟之后开始 ,对指定的任务按照固定速率重复执行。
schedulescheduleAtFixedRate都是重复执行任务,区别在于schedule是在任务成功执行后,再按照固定周期再重新执行任务,比如第一次任务从0s开始执行,执行5s,周期是10s,那么下一次执行时间是15s而不是10s。而scheduleAtFixedRate是从任务开始执行时,按照固定的时间再重新执行任务,比如第一次任务从0s开始执行,执行5s,周期是10s,那么下一次执行时间是10s而不是15s。

使用方式

1. 执行时间晚于当前时间

接下来我们将分别使用schedule(TimerTask task, Date time)schedule(TimerTask task, long delay)用来在10秒后执行任务,并展示是否将Timer的工作线程设置成守护线程对Timer执行的影响。

首先我们创建类Task, 接下来我们的所有操作都会在这个类中执行, 在类中使用schedule(TimerTask task, Date time),代码如下

import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;

import static java.lang.System.currentTimeMillis;
import static java.lang.Thread.currentThread;
import static java.text.MessageFormat.format;

public class Task {

    private static final long SECOND = 1000;

    public static void main(String[] args) {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println(format("程序结束时间为: {0}", currentTimeMillis()));
        }));

        long startTimestamp = currentTimeMillis();
        System.out.println(format("程序执行时间为: {0}", startTimestamp));
        Timer timer = new Timer();
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                long exceptedTimestamp = startTimestamp + 10 * SECOND;
                long executingTimestamp = currentTimeMillis();
                long offset = executingTimestamp - exceptedTimestamp;
                System.out.println(format("任务运行在线程[{0}]上, 期望执行时间为[{1}], 实际执行时间为[{2}], 实际偏差[{3}]",
                        currentThread().getName(), exceptedTimestamp, executingTimestamp, offset));
            }
        }, new Date(startTimestamp + 10 * SECOND));
    }
}

在程序的最开始,我们注册程序结束时执行的函数,它用来打印程序的结束时间,我们稍后将会用它来展示工作线程设置为守护线程与非守护线程的差异。接下来是程序的主体部分,我们记录了程序的执行时间,定时任务执行时所在的线程、定时任务的期望执行时间与实际执行时间。

程序运行后的实际执行效果

程序执行时间为: 1,614,575,921,461
任务运行在线程[Timer-0]上, 期望执行时间为[1,614,575,931,461], 实际执行时间为[1,614,575,931,464], 实际偏差[3]

程序在定时任务执行结束后并没有退出,我们注册的生命周期函数也没有执行,我们将在稍后解释这个现象。

接下来我们在类中使用schedule(TimerTask task, long delay), 来达到相同的在10秒钟之后执行的效果

import java.util.Timer;
import java.util.TimerTask;

import static java.lang.System.currentTimeMillis;
import static java.lang.Thread.currentThread;
import static java.text.MessageFormat.format;

public class Task {

    private static final long SECOND = 1000;

    public static void main(String[] args) {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println(format("程序结束时间为: {0}", currentTimeMillis()));
        }));

        Timer timer = new Timer();
        long startTimestamp = currentTimeMillis();
        System.out.println(format("程序执行时间为: {0}", startTimestamp));
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                long exceptedTimestamp = startTimestamp + 10 * SECOND;
                long executingTimestamp = currentTimeMillis();
                long offset = executingTimestamp - exceptedTimestamp;
                System.out.println(format("任务运行在线程[{0}]上, 期望执行时间为[{1}], 实际执行时间为[{2}], 实际偏差[{3}]",
                        currentThread().getName(), exceptedTimestamp, executingTimestamp, offset));
            }
        }, 10 * SECOND);
    }
}

程序运行后的实际执行效果

程序执行时间为: 1,614,576,593,325
任务运行在线程[Timer-0]上, 期望执行时间为[1,614,576,603,325], 实际执行时间为[1,614,576,603,343], 实际偏差[18]

回到我们刚刚的问题上,为什么我们的程序在执行完定时任务后没有正常退出?我们可以从Java API中对Thread类的描述中找到相关的内容:

从这段描述中,我们可以看到,只有在两种情况下,Java虚拟机才会退出执行

  1. 手动调用Runtime.exit()方法,并且安全管理器允许进行退出操作
  2. 所有的非守护线程都结束了,要么是执行完run()方法,要么是在run()方法中抛出向上传播的异常

所有的Timer在创建后都会创建关联的工作线程,这个关联的工作线程默认是非守护线程的,所以很明显我们满足第二个条件,所以程序会继续执行而不会退出。

那么如果我们将Timer的工作线程设置成守护线程会发生什么呢?

import java.util.Timer;
import java.util.TimerTask;

import static java.lang.System.currentTimeMillis;
import static java.lang.Thread.currentThread;
import static java.text.MessageFormat.format;

public class Task {

    private static final long SECOND = 1000;

    public static void main(String[] args) {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println(format("程序结束时间为: {0}", currentTimeMillis()));
        }));

        Timer timer = new Timer(true);
        long startTimestamp = currentTimeMillis();
        System.out.println(format("程序执行时间为: {0}", startTimestamp));
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                long exceptedTimestamp = startTimestamp + 10 * SECOND;
                long executingTimestamp = currentTimeMillis();
                long offset = executingTimestamp - exceptedTimestamp;
                System.out.println(format("任务运行在线程[{0}]上, 期望执行时间为[{1}], 实际执行时间为[{2}], 实际偏差[{3}]",
                        currentThread().getName(), exceptedTimestamp, executingTimestamp, offset));
            }
        }, 10 * SECOND);
    }
}

程序运行后的实际执行结果

程序执行时间为: 1,614,578,037,976
程序结束时间为: 1,614,578,037,996

可以看到我们的延迟任务还没有开始执行,程序就已经结束了,因为在我们的主线程退出后,所有的非守护线程都结束了,所以Java虚拟机会正常退出,而不会等待Timer中所有的任务执行完成后再退出。

2. 执行时间早于当前时间

如果我们是通过计算Date来指定执行时间的话,那么不可避免会出现一个问题——计算后的时间是早于当前时间的,这很常见,尤其是Java虚拟机会在不恰当的时候执行垃圾回收,并导致STW(Stop the world)。

接下来,我们将调整之前调用schedule(TimerTask task, Date time)的代码,让它在过去的时间执行

import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;

import static java.lang.System.currentTimeMillis;
import static java.lang.Thread.currentThread;
import static java.text.MessageFormat.format;

public class Task {

    private static final long SECOND = 1000;

    public static void main(String[] args) {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println(format("程序结束时间为: {0}", currentTimeMillis()));
        }));

        Timer timer = new Timer();
        long startTimestamp = currentTimeMillis();
        System.out.println(format("程序执行时间为: {0}", startTimestamp));
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                long exceptedTimestamp = startTimestamp - 10 * SECOND;
                long executingTimestamp = currentTimeMillis();
                long offset = executingTimestamp - exceptedTimestamp;
                System.out.println(format("任务运行在线程[{0}]上, 期望执行时间为[{1}], 实际执行时间为[{2}], 实际偏差[{3}]",
                        currentThread().getName(), exceptedTimestamp, executingTimestamp, offset));
            }
        }, new Date(startTimestamp - 10 * SECOND));
    }
}

程序运行后的执行结果

程序执行时间为: 1,614,590,000,184
任务运行在线程[Timer-0]上, 期望执行时间为[1,614,589,990,184], 实际执行时间为[1,614,590,000,203], 实际偏差[10,019]

可以看到,当我们指定运行时间为过去时间时,Timer的工作线程会立执行该任务。

但是如果我们不是通过计算时间,而是期望延迟负数时间再执行,会发生什么呢?我们将调整之前调用schedule(TimerTask task, long delay)的代码, 让他以负数延迟时间执行

import java.util.Timer;
import java.util.TimerTask;

import static java.lang.System.currentTimeMillis;
import static java.lang.Thread.currentThread;
import static java.text.MessageFormat.format;

public class Task {

    private static final long SECOND = 1000;

    public static void main(String[] args) {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println(format("程序结束时间为: {0}", currentTimeMillis()));
        }));

        Timer timer = new Timer();
        long startTimestamp = currentTimeMillis();
        System.out.println(format("程序执行时间为: {0}", startTimestamp));
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                long exceptedTimestamp = startTimestamp - 10 * SECOND;
                long executingTimestamp = currentTimeMillis();
                long offset = executingTimestamp - exceptedTimestamp;
                System.out.println(format("任务运行在线程[{0}]上, 期望执行时间为[{1}], 实际执行时间为[{2}], 实际偏差[{3}]",
                        currentThread().getName(), exceptedTimestamp, executingTimestamp, offset));
            }
        }, -10 * SECOND);
    }
}

程序运行后的执行结果

程序执行时间为: 1,614,590,267,556
Exception in thread "main" java.lang.IllegalArgumentException: Negative delay.
    at java.base/java.util.Timer.schedule(Timer.java:193)
    at cn.mgdream.schedule.Task.main(Task.java:22)

如果我们传入负数的延迟时间,那么Timer会抛出异常,告诉我们不能传入负数的延迟时间,这似乎是合理的——我们传入过去的时间是因为这是我们计算出来的,而不是我们主观传入的。在我们使用schedule(TimerTask task, long delay)需要注意这一点。

3. 向Timer中添加多个任务

接下来我们将分别向Timer中添加两个延迟任务,为了更容易地控制两个任务的调度顺序和时间,我们让第一个任务延迟5秒,第二个任务延迟10秒,同时让第一个任务阻塞10秒后再结束,通过这种方式来模拟出长任务。

import java.util.Timer;
import java.util.TimerTask;

import static java.lang.System.currentTimeMillis;
import static java.lang.Thread.currentThread;
import static java.text.MessageFormat.format;

public class Task {

    private static final long SECOND = 1000;

    public static void main(String[] args) {
        Timer timer = new Timer();
        long startTimestamp = currentTimeMillis();
        System.out.println(format("程序执行时间为: {0}", startTimestamp));
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                try {
                    long exceptedTimestamp = startTimestamp + 5 * SECOND;
                    long executingTimestamp = currentTimeMillis();
                    long offset = executingTimestamp - exceptedTimestamp;
                    System.out.println(format("任务[0]运行在线程[{0}]上, 期望执行时间为[{1}], 实际执行时间为[{2}], 实际偏差[{3}]",
                            currentThread().getName(), exceptedTimestamp, executingTimestamp, offset));
                    Thread.sleep(10 * SECOND);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, 5 * SECOND);

        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                long exceptedTimestamp = startTimestamp + 10 * SECOND;
                long executingTimestamp = currentTimeMillis();
                long offset = executingTimestamp - exceptedTimestamp;
                System.out.println(format("任务[1]运行在线程[{0}]上, 期望执行时间为[{1}], 实际执行时间为[{2}], 实际偏差[{3}]",
                        currentThread().getName(), exceptedTimestamp, executingTimestamp, offset));
            }
        }, 10 * SECOND);
    }
}

程序运行后的执行结果

程序执行时间为: 1,614,597,388,284
任务[0]运行在线程[Timer-0]上, 期望执行时间为[1,614,597,393,284], 实际执行时间为[1,614,597,393,308], 实际偏差[24]
任务[1]运行在线程[Timer-0]上, 期望执行时间为[1,614,597,398,284], 实际执行时间为[1,614,597,403,312], 实际偏差[5,028]

可以看到,两个任务在同个线程顺序执行,而第一个任务因为阻塞了10秒钟,所以是在程序开始运行后的第15秒结束,而第二个任务期望在第10秒结束,但是因为第一个任务还没有结束,所以第二个任务在第15秒开始执行,与与其执行时间偏差5秒钟。在使用Timer时尽可能不要执行长任务或使用阻塞方法,否则会影响后续任务执行时间的准确性。

4. 周期性执行任务

接下来我们将会分别使用schedulescheduleAtFixedRate实现周期性执行任务。为了节省篇幅,我们将只演示如何使用schedule(TimerTask task, long delay, long period)scheduleAtFixedRate(TimerTask task, long delay, long period)来实现周期性执行任务,并介绍它们的差异。而其他的两个方法schedule(TimerTask task, Date firstTime, long period)scheduleAtFixedRate(TimerTask task, Date firstTime, long period)具有相同的效果和差异,就不再赘述。

首先我们修改Task类,调用schedule(TimerTask task, long delay, long period)来实现第一次执行完延迟任务后,周期性地执行任务

import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicLong;

import static java.lang.System.currentTimeMillis;
import static java.lang.Thread.currentThread;
import static java.text.MessageFormat.format;

public class Task {

    private static final long SECOND = 1000;

    public static void main(String[] args) {
        AtomicLong counter = new AtomicLong(0);
        Timer timer = new Timer();
        long startTimestamp = currentTimeMillis();
        System.out.println(format("程序执行时间为: {0}", startTimestamp));
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                long count = counter.getAndIncrement();
                long exceptedTimestamp = startTimestamp + 10 * SECOND + count * SECOND;
                long executingTimestamp = currentTimeMillis();
                long offset = executingTimestamp - exceptedTimestamp;
                System.out.println(format("任务运行在线程[{0}]上, 期望执行时间为[{1}], 实际执行时间为[{2}], 实际偏差[{3}]",
                        currentThread().getName(), exceptedTimestamp, executingTimestamp, offset));
            }
        }, 10 * SECOND, SECOND);
    }
}

修改后的代码和使用schedule(TimerTask task, long delay)时的代码基本相同,我们额外添加计数器来记录任务的执行次数,方法调用添加了第三个参数period,表示任务每次执行时到下一次开始执行的时间间隔,我们这里设置成1秒钟。

程序运行后的执行结果

程序执行时间为: 1,614,609,111,434
任务运行在线程[Timer-0]上, 期望执行时间为[1,614,609,121,434], 实际执行时间为[1,614,609,121,456], 实际偏差[22]
任务运行在线程[Timer-0]上, 期望执行时间为[1,614,609,122,434], 实际执行时间为[1,614,609,122,456], 实际偏差[22]
任务运行在线程[Timer-0]上, 期望执行时间为[1,614,609,123,434], 实际执行时间为[1,614,609,123,457], 实际偏差[23]
任务运行在线程[Timer-0]上, 期望执行时间为[1,614,609,124,434], 实际执行时间为[1,614,609,124,462], 实际偏差[28]
任务运行在线程[Timer-0]上, 期望执行时间为[1,614,609,125,434], 实际执行时间为[1,614,609,125,467], 实际偏差[33]
任务运行在线程[Timer-0]上, 期望执行时间为[1,614,609,126,434], 实际执行时间为[1,614,609,126,470], 实际偏差[36]
任务运行在线程[Timer-0]上, 期望执行时间为[1,614,609,127,434], 实际执行时间为[1,614,609,127,473], 实际偏差[39]
任务运行在线程[Timer-0]上, 期望执行时间为[1,614,609,128,434], 实际执行时间为[1,614,609,128,473], 实际偏差[39]
任务运行在线程[Timer-0]上, 期望执行时间为[1,614,609,129,434], 实际执行时间为[1,614,609,129,474], 实际偏差[40]

可以看到,每次任务执行都会有一定时间的偏差,而这个偏差随着执行次数的增加而不断积累。这个时间偏差取决于Timer中需要执行的任务的个数,随着Timer中需要执行的任务的个数增加呈非递减趋势。因为这个程序现在只有一个任务在重复执行,因此每次执行的偏差不是很大,如果同时维护成百上千个任务,那么这个时间偏差会变得很明显。

接下来我们修改Task类,调用scheduleAtFixedRate(TimerTask task, long delay, long period)来实现周期性执行任务

import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicLong;

import static java.lang.System.currentTimeMillis;
import static java.lang.Thread.currentThread;
import static java.text.MessageFormat.format;

public class Task {

    private static final long SECOND = 1000;

    public static void main(String[] args) {
        AtomicLong counter = new AtomicLong(0);
        Timer timer = new Timer();
        long startTimestamp = currentTimeMillis();
        System.out.println(format("程序执行时间为: {0}", startTimestamp));
        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                long count = counter.getAndIncrement();
                long exceptedTimestamp = startTimestamp + 10 * SECOND + count * SECOND;
                long executingTimestamp = currentTimeMillis();
                long offset = executingTimestamp - exceptedTimestamp;
                System.out.println(format("任务运行在线程[{0}]上, 期望执行时间为[{1}], 实际执行时间为[{2}], 实际偏差[{3}]",
                        currentThread().getName(), exceptedTimestamp, executingTimestamp, offset));
            }
        }, 10 * SECOND, SECOND);
    }
}

方法scheduleAtFixedRate(TimerTask task, long delay, long period)schedule(TimerTask task, long delay)的效果基本相同,它们都可以达到周期性执行任务的效果,但是scheduleAtFixedRate方法会修正任务的下一次期望执行时间,按照每一次的期望执行时间加上period参数来计算出下一次期望执行时间,因此scheduleAtFixedRate是以固定速率重复执行的,而schedule则只保证两次执行的时间间隔相同

程序运行后的执行结果

程序执行时间为: 1,614,610,372,927
任务运行在线程[Timer-0]上, 期望执行时间为[1,614,610,383,927], 实际执行时间为[1,614,610,383,950], 实际偏差[23]
任务运行在线程[Timer-0]上, 期望执行时间为[1,614,610,384,927], 实际执行时间为[1,614,610,384,951], 实际偏差[24]
任务运行在线程[Timer-0]上, 期望执行时间为[1,614,610,385,927], 实际执行时间为[1,614,610,385,951], 实际偏差[24]
任务运行在线程[Timer-0]上, 期望执行时间为[1,614,610,386,927], 实际执行时间为[1,614,610,386,947], 实际偏差[20]
任务运行在线程[Timer-0]上, 期望执行时间为[1,614,610,387,927], 实际执行时间为[1,614,610,387,949], 实际偏差[22]
任务运行在线程[Timer-0]上, 期望执行时间为[1,614,610,388,927], 实际执行时间为[1,614,610,388,946], 实际偏差[19]
任务运行在线程[Timer-0]上, 期望执行时间为[1,614,610,389,927], 实际执行时间为[1,614,610,389,946], 实际偏差[19]
任务运行在线程[Timer-0]上, 期望执行时间为[1,614,610,390,927], 实际执行时间为[1,614,610,390,947], 实际偏差[20]
任务运行在线程[Timer-0]上, 期望执行时间为[1,614,610,391,927], 实际执行时间为[1,614,610,391,950], 实际偏差[23]
任务运行在线程[Timer-0]上, 期望执行时间为[1,614,610,392,927], 实际执行时间为[1,614,610,392,946], 实际偏差[19]

5. 停止任务

尽管我们很少会主动停止任务,但是这里还是要介绍下任务停止的方式。

停止任务的方式分为两种:停止单个任务和停止整个Timer

首先我们介绍如何停止单个任务,为了停止单个任务,我们需要调用TimerTaskcancal()方法,并调用Timerpurge()方法来移除所有已经被停止了的任务(回顾我们之前提到的,过多停止的任务不清空会影响我们的执行时间)

import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicLong;

import static java.lang.System.currentTimeMillis;
import static java.lang.Thread.currentThread;
import static java.text.MessageFormat.format;

public class Task {

    private static final long SECOND = 1000;

    public static void main(String[] args) {
        AtomicLong counter = new AtomicLong(0);
        Timer timer = new Timer();
        long startTimestamp = currentTimeMillis();
        System.out.println(format("程序执行时间为: {0}", startTimestamp));
        TimerTask[] timerTasks = new TimerTask[4096];
        for (int i = 0; i < timerTasks.length; i++) {
            final int serialNumber = i;
            timerTasks[i] = new TimerTask() {
                @Override
                public void run() {
                    long count = counter.getAndIncrement();
                    long exceptedTimestamp = startTimestamp + 10 * SECOND + count * SECOND;
                    long executingTimestamp = currentTimeMillis();
                    long offset = executingTimestamp - exceptedTimestamp;
                    System.out.println(format("任务[{0}]运行在线程[{1}]上, 期望执行时间为[{2}], 实际执行时间为[{3}], 实际偏差[{4}]",
                            serialNumber, currentThread().getName(), exceptedTimestamp, executingTimestamp, offset));
                }
            };
        }
        for (TimerTask timerTask : timerTasks) {
            timer.schedule(timerTask, 10 * SECOND, SECOND);
        }
        for (int i = 1; i < timerTasks.length; i++) {
            timerTasks[i].cancel();
        }
        timer.purge();
    }
}

首先我们创建了4096个任务,并让Timer来调度它们,接下来我们把除了第0个任务外的其他4095个任务停止掉,并从Timer中移除所有已经停止的任务。

程序运行后的执行结果

程序执行时间为: 1,614,611,843,830
任务[0]运行在线程[Timer-0]上, 期望执行时间为[1,614,611,853,830], 实际执行时间为[1,614,611,853,869], 实际偏差[39]
任务[0]运行在线程[Timer-0]上, 期望执行时间为[1,614,611,854,830], 实际执行时间为[1,614,611,854,872], 实际偏差[42]
任务[0]运行在线程[Timer-0]上, 期望执行时间为[1,614,611,855,830], 实际执行时间为[1,614,611,855,875], 实际偏差[45]
任务[0]运行在线程[Timer-0]上, 期望执行时间为[1,614,611,856,830], 实际执行时间为[1,614,611,856,876], 实际偏差[46]
任务[0]运行在线程[Timer-0]上, 期望执行时间为[1,614,611,857,830], 实际执行时间为[1,614,611,857,882], 实际偏差[52]
任务[0]运行在线程[Timer-0]上, 期望执行时间为[1,614,611,858,830], 实际执行时间为[1,614,611,858,883], 实际偏差[53]
任务[0]运行在线程[Timer-0]上, 期望执行时间为[1,614,611,859,830], 实际执行时间为[1,614,611,859,887], 实际偏差[57]
任务[0]运行在线程[Timer-0]上, 期望执行时间为[1,614,611,860,830], 实际执行时间为[1,614,611,860,890], 实际偏差[60]
任务[0]运行在线程[Timer-0]上, 期望执行时间为[1,614,611,861,830], 实际执行时间为[1,614,611,861,891], 实际偏差[61]
任务[0]运行在线程[Timer-0]上, 期望执行时间为[1,614,611,862,830], 实际执行时间为[1,614,611,862,892], 实际偏差[62]

我们可以看到,只有第0个任务再继续执行,而其他4095个任务都没有执行。

接下来我们介绍如何使用Timercancel()来停止整个Timer的所有任务,其实很简单,只需要执行timer.cancel()就可以。

import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicLong;

import static java.lang.System.currentTimeMillis;
import static java.lang.Thread.currentThread;
import static java.text.MessageFormat.format;

public class Task {

    private static final long SECOND = 1000;

    public static void main(String[] args) {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println(format("程序结束时间为: {0}", currentTimeMillis()));
        }));

        AtomicLong counter = new AtomicLong(0);
        Timer timer = new Timer();
        long startTimestamp = currentTimeMillis();
        System.out.println(format("程序执行时间为: {0}", startTimestamp));
        TimerTask[] timerTasks = new TimerTask[4096];
        for (int i = 0; i < timerTasks.length; i++) {
            final int serialNumber = i;
            timerTasks[i] = new TimerTask() {
                @Override
                public void run() {
                    long count = counter.getAndIncrement();
                    long exceptedTimestamp = startTimestamp + 10 * SECOND + count * SECOND;
                    long executingTimestamp = currentTimeMillis();
                    long offset = executingTimestamp - exceptedTimestamp;
                    System.out.println(format("任务[{0}]运行在线程[{1}]上, 期望执行时间为[{2}], 实际执行时间为[{3}], 实际偏差[{4}]",
                            serialNumber, currentThread().getName(), exceptedTimestamp, executingTimestamp, offset));
                }
            };
        }
        timer.cancel();
    }
}

在将所有的任务添加到Timer后,我们执行Timer对象的cancel()方法,为了更方便地表现出Timer的工作线程也终止了,我们注册了生命周期方法,来帮我们在程序结束后打印结束时间。

程序运行后的执行结果

程序执行时间为: 1,614,612,436,037
程序结束时间为: 1,614,612,436,061

可以看到,在执行Timer对象的cancel()方法后,Timer的工作线程也随之结束,程序正常退出。

源码解析

TimerTask

    • -

TimerTask类是一个抽象类,实现了Runnable接口

public abstract class TimerTask implements Runnable

TimerTask对象的成员

首先来看TimerTask类的成员部分

final Object lock = new Object();

int state = VIRGIN;

static final int VIRGIN      = 0;
static final int SCHEDULED   = 1;
static final int EXECUTED    = 2;
static final int CANCELLED   = 3;

long nextExecutionTime;

long period = 0;

对象lock是对外用来控制TimerTask对象修改的锁对象,它控制了锁的粒度——只会影响类属性的变更,而不会影响整个类的方法调用。接下来是state属性表示TimerTask对象的状态。nextExecutionTime属性表示TimerTask对象的下一次执行时间,当TimerTask对象被添加到任务队列后,将会使用这个属性来按照从小到大的顺序排序。period属性表示TimerTask对象的执行周期,period属性的值有三种情况

  1. 如果是0,那么表示任务不会重复执行
  2. 如果是正数,那么就表示任务按照相同的执行间隔来重复执行
  3. 如果是负数,那么就表示任务按照相同的执行速率来重复执行

TimerTask对象的构造方法

Timer对象的构造方法很简单,就是protected限定的默认构造方法,不再赘述

protected TimerTask() {
}

TimerTask对象的成员方法

接下来我们看下TimerTask对象的成员方法

public abstract void run();

public boolean cancel() {
    synchronized(lock) {
        boolean result = (state == SCHEDULED);
        state = CANCELLED;
        return result;
    }
}

public long scheduledExecutionTime() {
    synchronized(lock) {
        return (period < 0 ? nextExecutionTime + period
                           : nextExecutionTime - period);
    }
}

首先是run()方法实现自Runnable()接口,为抽象方法,所有的任务都需要实现此方法。接下来是cancel()方法,这个方法会将任务的状态标记为CANCELLED,如果在结束前任务处于被调度状态,那么就返回true,否则返回false。至于scheduledExecutionTime()只是用来计算重复执行的下一次执行时间,在Timer中并没有被使用过,不再赘述。

TimerQueue

    • -

TimerQueueTimer维护任务调度顺序的最小优先队列,使用的是最小二叉堆实现,如上文所述,排序用的Key是TimerTasknextExecutionTime属性。

在介绍TimerQueue之前,我们先补充下数据结构的基础知识

二叉堆(Binary heap)

二叉堆是一颗除了最底层的元素外,所有层都被填满,最底层的元素从左向右填充的完全二叉树(complete binary tree)。完全二叉树可以用数组表示,假设元素从1开始编号,下标为i的元素,它的左孩子的下标为2*i,它的右孩子的下标为2*i+1

二叉堆的任意非叶节点满足堆序性:假设我们定义的是最小优先队列,那么我们使用的是小根堆,任意节点的元素值都小于它的左孩子和右孩子(如果有的话)的元素值。

二叉堆的定义满足递归定义法,即二叉堆的任意子树都是二叉堆,单个节点本身就是二叉堆。

根据堆序性和递归定义法,二叉堆的根节点一定是整个二叉堆中元素值最小的节点

与堆结构有关的操作,除了add, getMinremoveMin之外,还有fixUpfixDownheapify三个关键操作,而addgetMinremoveMin也是通过这三个操作来完成的,下面来简单介绍下这三个操作

  1. fixUp: 当我们向二叉堆中添加元素时,我们可以简单地将它添加到二叉树的末尾,此时从这个节点到根的完整路径上不满足堆序性。之后将它不断向上浮,直到遇到比它小的元素,此时整个二叉树的所有节点都满足堆序性。当我们减少了二叉堆中元素的值的时候也可以通过这个方法来维护二叉堆。
  2. fixDown: 当我们从二叉堆中删除元素时,我们可以简单地将二叉树末尾的元素移动到根,此时不一定满足堆序性,之后将它不断下沉,直到遇到比它大的元素,此时整个二叉树的所有节点都满足堆序性。当我们增加了二叉堆中元素的值的时候也可以通过这个方法来维护二叉堆。
  3. heapify: 当我们拿到无序的数组的时候,也可以假设我们拿到了一棵不满足堆序性的二叉树,此时我们将所有的非叶节点向下沉,直到整个二叉树的所有节点都满足堆序性,此时我们得到了完整的二叉堆。这个操作是原地操作,不需要额外的空间复杂度,而时间复杂度是O(N)。

关于二叉堆的详细内容将会在后续的文章中展开详解,这里只做简单的介绍,了解这些我们就可以开始看TimerQueue的源码。

TimerQueue的完整代码

我们直接来看TaskQueue的完整代码

class TaskQueue {

    private TimerTask[] queue = new TimerTask[128];

    private int size = 0;

    int size() {
        return size;
    }

    void add(TimerTask task) {
        // Grow backing store if necessary
        if (size + 1 == queue.length)
            queue = Arrays.copyOf(queue, 2*queue.length);

        queue[++size] = task;
        fixUp(size);
    }

    TimerTask getMin() {
        return queue[1];
    }

    TimerTask get(int i) {
        return queue[i];
    }

    void removeMin() {
        queue[1] = queue[size];
        queue[size--] = null;  // Drop extra reference to prevent memory leak
        fixDown(1);
    }

    void quickRemove(int i) {
        assert i <= size;

        queue[i] = queue[size];
        queue[size--] = null;  // Drop extra ref to prevent memory leak
    }

    void rescheduleMin(long newTime) {
        queue[1].nextExecutionTime = newTime;
        fixDown(1);
    }

    boolean isEmpty() {
        return size==0;
    }

    void clear() {
        // Null out task references to prevent memory leak
        for (int i=1; i<=size; i++)
            queue[i] = null;

        size = 0;
    }

    private void fixUp(int k) {
        while (k > 1) {
            int j = k >> 1;
            if (queue[j].nextExecutionTime <= queue[k].nextExecutionTime)
                break;
            TimerTask tmp = queue[j];  queue[j] = queue[k]; queue[k] = tmp;
            k = j;
        }
    }

    private void fixDown(int k) {
        int j;
        while ((j = k << 1) <= size && j > 0) {
            if (j < size &&
                queue[j].nextExecutionTime > queue[j+1].nextExecutionTime)
                j++; // j indexes smallest kid
            if (queue[k].nextExecutionTime <= queue[j].nextExecutionTime)
                break;
            TimerTask tmp = queue[j];  queue[j] = queue[k]; queue[k] = tmp;
            k = j;
        }
    }

    void heapify() {
        for (int i = size/2; i >= 1; i--)
            fixDown(i);
    }
}

按照我们之前介绍的二叉堆的相关知识,我们可以看到TimerQueue维护了TimerTask的数组queue,初始大小size为0。

add操作首先判断了数组是否满了,如果数组已经满了,那么先执行扩容操作,再进行添加操作。如上所述,add操作先将元素放到二叉树末尾的元素(queue[++size]),之后对这个元素进行上浮来维护堆序性。

getMin直接返回二叉树的树根(queue[1]),get方法直接返回数组的第i个元素。removeMin方法会将二叉树末尾的元素(queue[size])移动到树根(queue[1]),并将原本二叉树末尾的元素设置成null,来让垃圾回收器回收这个TimerTask,之后执行fixDown来维护堆序性,quickRemove也是相同的过程,只不过它在移动元素后没有执行下沉操作,当连续执行多次quickRemove后统一执行heapify来维护堆序性。

rescheduleMin会将树根元素的元素值设置成newTime,并将它下沉到合适的位置。

fixUpfixDownheapify操作就如上文所述,用来维护二叉堆的读序性。不过这里面实现的fixUpfixDown并不优雅,基于交换临位元素的实现需要使用T(3log(N))的时间,而实际上有T(log(N))的实现方法。后续的文章中会详细介绍优先队列与二叉堆的实现方式。

TimerThread

    • -

我们直接来看TimerThread的代码

class TimerThread extends Thread {
    boolean newTasksMayBeScheduled = true;

    private TaskQueue queue;

    TimerThread(TaskQueue queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            mainLoop();
        } finally {
            // Someone killed this Thread, behave as if Timer cancelled
            synchronized(queue) {
                newTasksMayBeScheduled = false;
                queue.clear();  // Eliminate obsolete references
            }
        }
    }

    private void mainLoop() {
        while (true) {
            try {
                TimerTask task;
                boolean taskFired;
                synchronized(queue) {
                    // Wait for queue to become non-empty
                    while (queue.isEmpty() && newTasksMayBeScheduled)
                        queue.wait();
                    if (queue.isEmpty())
                        break; // Queue is empty and will forever remain; die

                    // Queue nonempty; look at first evt and do the right thing
                    long currentTime, executionTime;
                    task = queue.getMin();
                    synchronized(task.lock) {
                        if (task.state == TimerTask.CANCELLED) {
                            queue.removeMin();
                            continue;  // No action required, poll queue again
                        }
                        currentTime = System.currentTimeMillis();
                        executionTime = task.nextExecutionTime;
                        if (taskFired = (executionTime<=currentTime)) {
                            if (task.period == 0) { // Non-repeating, remove
                                queue.removeMin();
                                task.state = TimerTask.EXECUTED;
                            } else { // Repeating task, reschedule
                                queue.rescheduleMin(
                                  task.period<0 ? currentTime   - task.period
                                                : executionTime + task.period);
                            }
                        }
                    }
                    if (!taskFired) // Task hasn't yet fired; wait
                        queue.wait(executionTime - currentTime);
                }
                if (taskFired)  // Task fired; run it, holding no locks
                    task.run();
            } catch(InterruptedException e) {
            }
        }
    }
}

首先是控制变量newTasksMayBeScheduled,表示当前工作线程是否应该继续执行任务,当它为false的时候它将不会再从任务队列中取任务执行,表示当前工作线程已结束。接下来的queue变量是通过构造方法传进来的任务队列,工作线程的任务队列与Timer共享,实现生产消费者模型。

进入到run()方法,run()方法会调用mainLoop()方法来执行主循环,而finally代码块会在主循环结束后清空任务队列实现优雅退出。

mainLoop()方法中执行了死循环来拉取执行任务,在死循环中首先获取queue的锁来实现线程同步,接下来判断任务队列是否为且工作线程是否停止,如果任务队列为空且工作线程未停止,那么就使用queue.wait()来等待Timer添加任务后唤醒该线程,Object#wait()方法会释放当前线程所持有的该对象的锁,关于wait/notisfy的内容可以去看Java API)相关介绍。如果queue退出等待后依旧为空,则表示newTasksMayBeScheduledfalse,工作线程已停止,退出主循环,否则会从任务队列中取出需要最近执行的任务(并不会删除任务)。

取到需要最近执行的任务后,获取该任务的锁,并判断该任务是否已经停止,如果该任务已经停止,那么就把它从任务队列中移除,并什么都不做继续执行主循环。接下来判断当前时间是否小于等于任务的下一次执行时间,如果满足条件则将taskFired设置成true,判断当前任务是否需要重复执行。如果不需要重复执行就将它从任务队列中移除,并将任务状态设置成EXECUTED,如果需要重复执行就根据period设置它的下一次执行时间并重新调整任务队列。

完成这些操作后,如果taskFiredfalse,就让queue对象进入有限等待状态,很容易得到我们需要的最大等待时间为executionTime - currentTime。如果taskFiredtrue,那么就释放锁并执行被取出的任务。

Timer

    • -

Timer对象的成员

首先来看Timer的成员部分

private final TaskQueue queue = new TaskQueue();
private final TimerThread thread = new TimerThread(queue);

private final Object threadReaper = new Object() {
    @SuppressWarnings("deprecation")
    protected void finalize() throws Throwable {
        synchronized(queue) {
            thread.newTasksMayBeScheduled = false;
            queue.notify(); // In case queue is empty.
        }
    }
};

private static final AtomicInteger nextSerialNumber = new AtomicInteger(0);

其中queue对象是如前面所说,为了任务调度的最小优先队列。接下来是TimerThread,它是Timer的工作线程,在Timer创建时就已经被分配,并与Timer共享任务队列。

threadReaper是一个只复写了finalize方法的对象,它的作用是当Timer对象没有存活的引用后,终止任务线程,并等待任务队列中的所有任务执行结束后退出工作线程,实现优雅退出。

nextSerialNumber用来记录工作线程的序列号,全局唯一,避免生成的线程名称冲突。

Timer对象的构造方法

接下来我们看下Timer的所有构造方法

public Timer() {
    this("Timer-" + serialNumber());
}

public Timer(boolean isDaemon) {
    this("Timer-" + serialNumber(), isDaemon);
}

public Timer(String name) {
    thread.setName(name);
    thread.start();
}

public Timer(String name, boolean isDaemon) {
    thread.setName(name);
    thread.setDaemon(isDaemon);
    thread.start();
}

可以看到,所有的构造构造方法所做的事都相同:设置工作线程属性,并启动工作线程。

成员函数

接下来我们可以看下Timer的成员函数,我们首先不考虑cancel()purge()方法,直接看schedule系列方法

public void schedule(TimerTask task, long delay) {
    if (delay < 0)
        throw new IllegalArgumentException("Negative delay.");
    sched(task, System.currentTimeMillis()+delay, 0);
}

public void schedule(TimerTask task, Date time) {
    sched(task, time.getTime(), 0);
}

public void schedule(TimerTask task, long delay, long period) {
    if (delay < 0)
        throw new IllegalArgumentException("Negative delay.");
    if (period <= 0)
        throw new IllegalArgumentException("Non-positive period.");
    sched(task, System.currentTimeMillis()+delay, -period);
}

public void schedule(TimerTask task, Date firstTime, long period) {
    if (period <= 0)
        throw new IllegalArgumentException("Non-positive period.");
    sched(task, firstTime.getTime(), -period);
}

public void scheduleAtFixedRate(TimerTask task, long delay, long period) {
    if (delay < 0)
        throw new IllegalArgumentException("Negative delay.");
    if (period <= 0)
        throw new IllegalArgumentException("Non-positive period.");
    sched(task, System.currentTimeMillis()+delay, period);
}

public void scheduleAtFixedRate(TimerTask task, Date firstTime,
                                long period) {
    if (period <= 0)
        throw new IllegalArgumentException("Non-positive period.");
    sched(task, firstTime.getTime(), period);
}

可以看到,所有的schedule方法除了做参数教研外,都将延迟时间和计划执行时间转化为时间戳委托给sched方法来执行。schedulescheduleAtFixedRate传递的参数都相同,不过在传递period参数时使用符号来区分周期执行的方式。

接下来我们可以看下这位神秘嘉宾——sched方法到底做了哪些事

private void sched(TimerTask task, long time, long period) {
    if (time < 0)
        throw new IllegalArgumentException("Illegal execution time.");

    // Constrain value of period sufficiently to prevent numeric
    // overflow while still being effectively infinitely large.
    if (Math.abs(period) > (Long.MAX_VALUE >> 1))
        period >>= 1;

    synchronized(queue) {
        if (!thread.newTasksMayBeScheduled)
            throw new IllegalStateException("Timer already cancelled.");

        synchronized(task.lock) {
            if (task.state != TimerTask.VIRGIN)
                throw new IllegalStateException(
                    "Task already scheduled or cancelled");
            task.nextExecutionTime = time;
            task.period = period;
            task.state = TimerTask.SCHEDULED;
        }

        queue.add(task);
        if (queue.getMin() == task)
            queue.notify();
    }
}

sched方法首先做了一些参数校验,保证期待执行时间不小于0,且执行周期不至于太大。接下来获取任务队列queue对象的monitor(监视器锁),如果Timer的工作线程已经被停止了,那么就会抛出IllegalStateException来禁止继续添加任务,newTasksMayBeScheduled这个变量将会在稍后介绍。之后sched方法会尝试获取task.lock对象的锁,判断task的状态避免重复添加,并设置task的下一次执行时间、task的执行周期和状态。之后将task添加到任务队列中,如果当前任务就是执行时间最近的任务,那么就会唤起等待queue对象的线程(其实就是thread工作线程)继续执行。

总结

本文从各个方面介绍了java.util.Timer类的使用方式,并从源码角度介绍了java.util.Timer的实现方式。看完本文后,读者应当掌握

  1. 如何执行晚于当前时间的任务
  2. 当任务执行时间早于当前时间会发生什么
  3. 如何向Timer中添加多个任务
  4. 如何周期性执行任务
  5. 如何停止任务
  6. 如何自己实现类似的定时器

希望本文可以帮助大家在工作中更好地使用java.util.Timer

查看原文

赞 0 收藏 0 评论 0

纪卓志 George 发布了文章 · 2月23日

基于Kafka和Elasticsearch构建实时站内搜索功能的实践

目前我们在构建一个多租户多产品类网站,为了让用户更好的找到他们所需要的产品,我们需要构建站内搜索功能,并且它应该是实时更新的。本文将会讨论构建这一功能的核心基础设施,以及支持此搜索能力的技术栈。

问题的定义与决策

为了构建一个快速、实时的搜索引擎,我们必须做出某些设计决策。我们使用MySQL作为主数据库存储,因此有以下选择:

  1. 直接在MySQL数据库中查询用户在搜索框中输入的每个关键词,就像%#{word1}%#{word2}%...这样。 😐
  2. 使用一个高效的搜索数据库,如Elasticsearch。😮

考虑到我们是一个多租户应用程序,同时被搜索的实体可能需要大量的关联操作(如果我们使用的是MySQL一类的关系型数据库),因为不同类型的产品有不同的数据结构,所以我们还可以能需要同时遍历多个数据表来查询用户输入的关键词。所以我们决定不使用直接在MySQL中查询关键词的方案。🤯

因此,我们必须决定一种高效、可靠的方式,将数据实时地从MySQL迁移到Elasticsearch中。接下来需要做出如下的决定:

  1. 使用Worker定期查询MySQL数据库,并将所有变化的数据发送到Elasticsearch。😶
  2. 在应用程序中使用Elasticsearch客户端,将数据同时写入到MySQL和Elasticsearch中。🤔
  3. 使用基于事件的流引擎,将MySQL数据库中的数据更改作为事件,发送到流处理服务器上,经过处理后将其转发到Elasticsearch。🥳

选项1并不是实时的,所以可以直接排除,而且即使我们缩短轮询间隔,也会造成全表扫描给数据库造成查询压力。除了不是实时的之外,选项1无法支持对数据的删除操作,如果对数据进行了删除,那么我们需要额外的表记录之前存在过的数据,这样才能保证用户不会搜索到已经删除了的脏数据。对于其他两种选择,不同的应用场景做出的决定可能会有所不同。在我们的场景中,如果选择选项2,那么我们可以预见一些问题:如过Elasticsearch建立网络连接并确认更新时速度很慢,那么这可能会降低我们应用程序的速度;或者在写入Elasticsearch时发生了未知异常,我们该如何对这一操作进行重试来保证数据完整性;不可否认开发团队中不是所有开发人员都能了解所有的功能,如果有开发人员在开发新的与产品有关的业务逻辑时没有引入Elasticsearch客户端,那么我们将在Elasticsearch中更新这次数据的更改,无法保证MySQL与Elasticsearch间的数据一致性。

接下来我们该考虑如何将MySQL数据库中的数据更改作为事件,发送到流处理服务器上。我们可以在数据库变更后,在应用程序中使用消息管道的客户端同步地将事件发送到消息管道,但是这并没有解决上面提到的使用Elasticsearch客户端带来的问题,只不过是将风险从Elasticsearch转移到了消息管道。最终我们决定通过采集MySQL Binlog,将MySQL Binlog作为事件发送到消息管道中的方式来实现基于事件的流引擎。关于binlog的内容可以点击链接,在这里不再赘述。

服务简介

为了对外提供统一的搜索接口,我们首先需要定义用于搜索的数据结构。对于大部分的搜索系统而言,对用户展示的搜索结果通常包括为标题内容,这部分内容我们称之可搜索内容(Searchable Content)。在多租户系统中我们还需要在搜索结果中标示出该搜索结果属于哪个租户,或用来过滤当前租户下可搜索的内容,我们还需要额外的信息来帮助用户筛选自己想要搜索的产品类别,我们将这部分通用的但不用来进行搜索的内容称为元数据(Metadata)。最后,在我们展示搜索结果时可能希望根据不同类型的产品提供不同的展示效果,我们需要在搜索结果中返回这些个性化展示所需要的原始内容(Raw Content)。到此为止我们可以定义出了存储到Elasticsearch中的通用数据结构:

{
    "searchable": {
        "title": "string",
        "content": "string"
    },
    "metadata": {
        "tenant_id": "long",
        "type": "long",
        "created_at": "date",
        "created_by": "string",
        "updated_at": "date",
        "updated_by": "string"
    },
    "raw": {}
}

基础设施

Apache Kafka: Apache Kafka是开源的分布式事件流平台。我们使用Apache kafka作为数据库事件(插入、修改和删除)的持久化存储。

mysql-binlog-connector-java: 我们使用mysql-binlog-connector-java从MySQL Binlog中获取数据库事件,并将它发送到Apache Kafka中。我们将单独启动一个服务来完成这个过程。

在接收端我们也将单独启动一个服务来消费Kafka中的事件,并对数据进行处理然后发送到Elasticsearch中。

Q:为什么不使用Elasticsearch connector之类的连接器对数据进行处理并发送到Elasticsearch中?
A:在我们的系统中是不允许将大文本存入到MySQL中的,所以我们使用了额外的对象存储服务来存放我们的产品文档,所以我们无法直接使用连接器将数据发送到Elasticsearch中。
Q:为什么不在发送到Kafka前就将数据进行处理?
A:这样会有大量的数据被持久化到Kafka中,占用Kafka的磁盘空间,而这部分数据实际上也被存储到了Elasticsearch。
Q:为什么要用单独的服务来采集binlog,而不是使用Filebeat之类的agent?
A:当然可以直接在MySQL数据库中安装agent来直接采集binlog并发送到Kafka中。但是在部分情况下开发者使用的是云服务商或其他基础设施部门提供的MySQL服务器,这种情况下我们无法直接进入服务器安装agent,所以使用更加通用的、无侵入性的C/S结构来消费MySQL的binlog。

配置技术栈

我们使用docker和docker-compose来配置和部署服务。为了简单起见,MySQL直接使用了root作为用户名和密码,Kafka和Elasticsearch使用的是单节点集群,且没有设置任何鉴权方式,仅供开发环境使用,请勿直接用于生产环境。

version: "3"
services:
  mysql:
    image: mysql:5.7
    container_name: mysql
    environment:
      MYSQL_ROOT_PASSWORD: root
      MYSQL_DATABASE: app
    ports:
      - 3306:3306
    volumes:
      - mysql:/var/lib/mysql
  zookeeper:
    image: bitnami/zookeeper:3.6.2
    container_name: zookeeper
    ports:
      - 2181:2181
    volumes:
      - zookeeper:/bitnami
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: bitnami/kafka:2.7.0
    container_name: kafka
    ports:
      - 9092:9092
    volumes:
      - kafka:/bitnami
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.11.0
    container_name: elasticsearch
    environment:
      - discovery.type=single-node
    volumes:
      - elasticsearch:/usr/share/elasticsearch/data
    ports:
      - 9200:9200
volumes:
  mysql:
    driver: local
  zookeeper:
    driver: local
  kafka:
    driver: local
  elasticsearch:
    driver: local

在服务启动成功后我们需要为Elasticsearch创建索引,在这里我们直接使用curl调用Elasticsearch的RESTful API,也可以使用busybox基础镜像创建服务来完成这个步骤。

# Elasticsearch
curl "http://localhost:9200/search" -XPUT -d '
{
  "mappings": {
    "properties": {
      "searchable": {
        "type": "nested",
        "properties": {
          "title": {
            "type": "text"
          },
          "content": {
            "type": "text"
          }
        }
      },
      "metadata": {
        "type": "nested",
        "properties": {
          "tenant_id": {
            "type": "long"
          },
          "type": {
            "type": "integer"
          },
          "created_at": {
            "type": "date"
          },
          "created_by": {
            "type": "keyword"
          },
          "updated_at": {
            "type": "date"
          },
          "updated_by": {
            "type": "keyword"
          }
        }
      },
      "raw": {
        "type": "nested"
      }
    }
  }
}'

核心代码实现(SpringBoot + Kotlin)

Binlog采集端:

    override fun run() {
        client.serverId = properties.serverId
        val eventDeserializer = EventDeserializer()
        eventDeserializer.setCompatibilityMode(
            EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG
        )
        client.setEventDeserializer(eventDeserializer)
        client.registerEventListener {
            val header = it.getHeader<EventHeader>()
            val data = it.getData<EventData>()
            if (header.eventType == EventType.TABLE_MAP) {
                tableRepository.updateTable(Table.of(data as TableMapEventData))
            } else if (EventType.isRowMutation(header.eventType)) {
                val events = when {
                    EventType.isWrite(header.eventType) -> mapper.map(data as WriteRowsEventData)
                    EventType.isUpdate(header.eventType) -> mapper.map(data as UpdateRowsEventData)
                    EventType.isDelete(header.eventType) -> mapper.map(data as DeleteRowsEventData)
                    else -> emptyList()
                }
                logger.info("Mutation events: {}", events)
                for (event in events) {
                    kafkaTemplate.send("binlog", objectMapper.writeValueAsString(event))
                }
            }
        }
        client.connect()
    }

在这段代码里面,我们首先是对binlog客户端进行了初始化,随后开始监听binlog事件。binlog事件类型有很多,大部分都是我们不需要关心的事件,我们只需要关注TABLE\_MAP和WRITE/UPDATE/DELETE就可以。当我们接收到TABLE\_MAP事件,我们会对内存中的数据库表结构进行更新,在后续的WRITE/UPDATE/DELETE事件中,我们会使用内存缓存的数据库结构进行映射。整个过程大概如下所示:

Table: ["id", "title", "content",...]
Row: [1, "Foo", "Bar",...]
=>
{
    "id": 1,
    "title": "Foo",
    "content": "Bar"
}

随后我们将收集到的事件发送到Kafka中,并由Event Processor进行消费处理。

事件处理器

@Component
class KafkaBinlogTopicListener(
    val binlogEventHandler: BinlogEventHandler
) {

    companion object {
        private val logger = LoggerFactory.getLogger(KafkaBinlogTopicListener::class.java)
    }

    private val objectMapper = jacksonObjectMapper()

    @KafkaListener(topics = ["binlog"])
    fun process(message: String) {
        val binlogEvent = objectMapper.readValue<BinlogEvent>(message)
        logger.info("Consume binlog event: {}", binlogEvent)
        binlogEventHandler.handle(binlogEvent)
    }
}

首先使用SpringBoot Message Kafka提供的注解对事件进行消费,接下来将事件委托到binlogEventHandler去进行处理。实际上BinlogEventHandler是个自定义的函数式接口,我们自定义事件处理器实现该接口后通过Spring Bean的方式注入到KafkaBinlogTopicListener中。

@Component
class ElasticsearchIndexerBinlogEventHandler(
    val restHighLevelClient: RestHighLevelClient
) : BinlogEventHandler {
    override fun handle(binlogEvent: BinlogEvent) {
        val payload = binlogEvent.payload as Map<*, *>
        val documentId = "${binlogEvent.database}_${binlogEvent.table}_${payload["id"]}"
        // Should delete from Elasticsearch
        if (binlogEvent.eventType == EVENT_TYPE_DELETE) {
            val deleteRequest = DeleteRequest()
            deleteRequest
                .index("search")
                .id(documentId)
            restHighLevelClient.delete(deleteRequest, DEFAULT)
        } else {
            // Not ever WRITE or UPDATE, just reindex
            val indexRequest = IndexRequest()
            indexRequest
                .index("search")
                .id(documentId)
                .source(
                    mapOf<String, Any>(
                        "searchable" to mapOf(
                            "title" to payload["title"],
                            "content" to payload["content"]
                        ),
                        "metadata" to mapOf(
                            "tenantId" to payload["tenantId"],
                            "type" to payload["type"],
                            "createdAt" to payload["createdAt"],
                            "createdBy" to payload["createdBy"],
                            "updatedAt" to payload["updatedAt"],
                            "updatedBy" to payload["updatedBy"]
                        )
                    )
                )
            restHighLevelClient.index(indexRequest, DEFAULT)
        }
    }
}

在这里我们只需要简单地判断是否为删除操作就可以,如果是删除操作需要在Elasticsearch中将数据删除,而如果是非删除操作只需要在Elasticsearch重新按照为文档建立索引即可。这段代码简单地使用了Kotlin中提供的mapOf方法对数据进行映射,如果需要其他复杂的处理只需要按照Java代码的方式编写处理器即可。

总结

在本文是作者工作后编写的第一篇博客,才学疏浅,可能有的地方会出现技术理解不够深入用词不当等问题,感谢大家帮忙指正。
其实Binlog的处理部分有很多开源的处理引擎,包括Alibaba Canal,本文使用手动处理的方式也是为其他使用非MySQL数据源的同学类似的解决方案。大家可以按需所取,因地制宜,为自己的网站设计属于自己的实时站内搜索引擎!
本文中提到的完整代码已上传到Gitee,欢迎给个Star~传送门

查看原文

赞 3 收藏 2 评论 0

纪卓志 George 发布了文章 · 2月22日

为SpringBoot项目中的自定义配置添加IDE支持

代码是写给人看的,不是写给机器看的,只是顺便计算机可以执行而已

——《计算机程序的构造和解释(SICP)》

导言

在我们的项目里经常会出现需要添加自定义配置的应用场景,例如某个开关变量,在测试环境打开,在生产环境不打开,通常我们都会使用下面的代码来实现,然后在Spring Boot配置文件中添加这个key和Value

Application.java:

application.properties

或者是没有使用@Value而直接在XML中使用我们配置的属性值

application.xml

这样的代码和配置在Spring Boot项目中可以正常启动并读取配置,但是在我们的IDE中却不会为我们提示配置的类型和代码补全。当我们有新同事到来,或者是需要为配置文件添加新的环境的支持的时候,我们很容易会把配置文件的Key拼错,或者Value的值与我们的变量类型并不兼容(实际上真的发生过这样的问题导致项目启动失败)。

但是在我们使用Spring Boot提供的配置的时候,IDE总是能为我们自动补全,告诉我们这个配置的变量类型,甚至是给我们把这个配置的描述显示出来。

我们是否也可以为我们自己写的配置添加这样的IDE支持呢?

配置项元数据(Configuration Metadata)

Spring Boot的Jar文件包含元数据文件,这些文件提供了我们所需要的配置属性的详细信息。IDE通过读取这些元数据文件,然后在使用application.properties或application.yml的时候提供上下文信息和代码补全。那么只要知道如何编写并存放配置项元数据信息文件,我们也可以让IDE知道如何为我们的自定义配置提供上下文信息。

元数据格式

Spring Boot项目的配置项元数据文件都放在META-INF/spring-configuration-metadata.json中。下图是当我们配置好Spring Boot项目后默认使用的Spring Boot自动配置的配置项元数据存放的位置

配置项元数据文件按照groups, properties和hints组织。properties下的每个property都是程序中需要使用的配置项的key值,比如server.port是服务启动后的端口号。而我们可以将一些property按照某些规则组合起来,这个组合就是group(通常我们并不需要为properties组织对应的group)。而hints是为我们的配置项提供额外的信息,比如时区time.zone支持Asia/Shanghai,我们可以为它提供"Asia/Shanghai"的hint。

properties的参数

名称类型描述
nameString属性的全名。名称是小写字母以句点分割。此属性是必须的
typeString属性的数据类型的完整签名(java.lang.String),如果是范型的话还应当包含完整的范型参数(java.util.List<java.lang.String>)。为了保证一致性,需要使用包装类型来替代基本类型。此属性不是必须的,但是无法得到类型诊断的支持。

hints的参数

名称类型描述
nameString该提示所引用的属性的全称,和properties的name参数相同。此属性是必须的
valuesValueHint[]ValueHint对象定义的有效值列表。每个条目都定义该值,并且可以具有描述

ValueHint的参数

名称类型描述
valueObjectproperty给定的类型的有效值,如果property的类型是数组,那么它也可以是值的数组。此属性是必须的。如果是Map类型的属性,可以使用.keys和.values来指定对应的有效值。
descriptionString和properties的description相同,提示给用户的简短描述。此属性不是必须的。

这里只展示了我们常用的参数,关于配置项元数据文件格式的详细信息可以看Spring Boot的官方文档(https://docs.spring.io/spring-boot/docs/current/reference/html/appendix-configuration-metadata.html#configuration-metadata-format)

为自定义配置编写配置项元数据

接下来我们将对值、数组场景编写对应的配置项元数据,并为这些配置项添加提示(Talk is cheap,show me the code)

在IDE中实际使用的效果

不过这种方式虽然好,但是需要我们写很多的JSON配置来告诉IDE该如何进行代码补全和附加上下文信息,配置和代码还是处于分离的状态,如果能通过写一个配置类,直接通过这个类和它的注释就能为我们做到IDE支持就好了——Spring Boot开发者也是这么想的。

为代码自动生成配置项元数据

首先我们要改掉随处使用@Value的习惯,使用专门的数据类来存放我们的配置项

接下来我们创建一个Bean,让Spring Boot容器来接管这个类的实例

通过@ConfigurationProperties注解,Spring就会自动将配置注入到我们的配置Bean中,但是此时IDE还无法识别我们添加的自动配置,我们需要添加Spring Boot的注解处理器(annotation processor,从Java 1.6开始支持的特性)

添加注解处理器后重新编译,我们就会在target目录下看到自动生成的META-INF/spring-configuration-metadata.json

里面的内容基本就是我们之前自己手动输入的内容,只是受于Java代码表达信息的局限性,没有办法生成hints信息。其中sourceType和sourceMethod属性还可以帮助IDE跳转到我们声明这个配置的类和方法

如果我们想要让我们使用代码生成的配置类也能添加提示的话,可以在我们的META-INF目录下添加additional-spring-configuration-metadata.json文件,将hints写到这个文件里面

这样Spring Boot在编译的时候就会将我们的提示信息合并到配置信息元数据文件里面了

虽然这些工作不会增加代码的运行效率,但是让我们的配置集中起来并有IDE的加成,会让我们更改配置的时候更加有信心。正如开头所说的,代码是写给人看的,不是写给机器看的,只是顺便计算机可以执行而已。

查看原文

赞 0 收藏 0 评论 0

纪卓志 George 关注了标签 · 2月22日

vue.js

Reactive Components for Modern Web Interfaces.

Vue.js 是一个用于创建 web 交互界面的。其特点是

  • 简洁 HTML 模板 + JSON 数据,再创建一个 Vue 实例,就这么简单。
  • 数据驱动 自动追踪依赖的模板表达式和计算属性。
  • 组件化 用解耦、可复用的组件来构造界面。
  • 轻量 ~24kb min+gzip,无依赖。
  • 快速 精确有效的异步批量 DOM 更新。
  • 模块友好 通过 NPM 或 Bower 安装,无缝融入你的工作流。

官网:https://vuejs.org
GitHub:https://github.com/vuejs/vue

关注 134157

纪卓志 George 关注了标签 · 2月22日

java

Java 是一种可以撰写跨平台应用软件的面向对象的程序设计语言,是由 Sun Microsystems 公司于 1995 年 5 月推出的 Java 程序设计语言和 Java 平台(即 JavaSE, JavaEE, JavaME)的总称。Java 技术具有卓越的通用性、高效性、平台移植性和安全性。

Java编程语言的风格十分接近 C++ 语言。继承了 C++ 语言面向对象技术的核心,Java舍弃了 C++ 语言中容易引起错误的指針,改以引用取代,同时卸载原 C++ 与原来运算符重载,也卸载多重继承特性,改用接口取代,增加垃圾回收器功能。在 Java SE 1.5 版本中引入了泛型编程、类型安全的枚举、不定长参数和自动装/拆箱特性。太阳微系统对 Java 语言的解释是:“Java编程语言是个简单、面向对象、分布式、解释性、健壮、安全与系统无关、可移植、高性能、多线程和动态的语言”。

版本历史

重要版本号版本代号发布日期
JDK 1.01996 年 1 月 23 日
JDK 1.11997 年 2 月 19 日
J2SE 1.2Playground1998 年 12 月 8 日
J2SE 1.3Kestrel2000 年 5 月 8 日
J2SE 1.4Merlin2002 年 2 月 6 日
J2SE 5.0 (1.5.0)Tiger2004 年 9 月 30 日
Java SE 6Mustang2006 年 11 月 11 日
Java SE 7Dolphin2011 年 7 月 28 日
Java SE 8JSR 3372014 年 3 月 18 日
最新发布的稳定版本:
Java Standard Edition 8 Update 11 (1.8.0_11) - (July 15, 2014)
Java Standard Edition 7 Update 65 (1.7.0_65) - (July 15, 2014)

更详细的版本更新查看 J2SE Code NamesJava version history 维基页面

新手帮助

不知道如何开始写你的第一个 Java 程序?查看 Oracle 的 Java 上手文档

在你遇到问题提问之前,可以先在站内搜索一下关键词,看是否已经存在你想提问的内容。

命名规范

Java 程序应遵循以下的 命名规则,以增加可读性,同时降低偶然误差的概率。遵循这些命名规范,可以让别人更容易理解你的代码。

  • 类型名(类,接口,枚举等)应以大写字母开始,同时大写化后续每个单词的首字母。例如:StringThreadLocaland NullPointerException。这就是著名的帕斯卡命名法。
  • 方法名 应该是驼峰式,即以小写字母开头,同时大写化后续每个单词的首字母。例如:indexOfprintStackTraceinterrupt
  • 字段名 同样是驼峰式,和方法名一样。
  • 常量表达式的名称static final 不可变对象)应该全大写,同时用下划线分隔每个单词。例如:YELLOWDO_NOTHING_ON_CLOSE。这个规范也适用于一个枚举类的值。然而,static final 引用的非不可变对象应该是驼峰式。

Hello World

public class HelloWorld {
    public static void main(String[] args) {
        System.out.println("Hello, World!");
    }
}

编译并调用:

javac -d . HelloWorld.java
java -cp . HelloWorld

Java 的源代码会被编译成可被 Java 命令执行的中间形式(用于 Java 虚拟机的字节代码指令)。

可用的 IDE

学习资源

常见的问题

下面是一些 SegmentFault 上在 Java 方面经常被人问到的问题:

(待补充)

关注 139155

纪卓志 George 关注了标签 · 2月22日

数据库

数据库(Database)是按照数据结构来组织、存储和管理数据的仓库,它产生于距今五十年前,随着信息技术和市场的发展,特别是二十世纪九十年代以后,数据管理不再仅仅是存储和管理数据,而转变成用户所需要的各种数据管理的方式。数据库有很多种类型,从最简单的存储有各种数据的表格到能够进行海量数据存储的大型数据库系统都在各个方面得到了广泛的应用。

关注 8344

纪卓志 George 关注了标签 · 2月22日

css

层叠样式表(英语:Cascading Style Sheets,简写CSS),又称串样式列表,由W3C定义和维护的标准,一种用来为结构化文档(如HTML文档或XML应用)添加样式(字体、间距和颜色等)的计算机语言。

关注 93997

纪卓志 George 关注了标签 · 2月22日

php

PHP,是英文超文本预处理语言 Hypertext Preprocessor 的缩写。PHP 是一种开源的通用计算机脚本语言,尤其适用于网络开发并可嵌入HTML 中使用。PHP 的语法借鉴吸收 C语言、Java 和 Perl 等流行计算机语言的特点,易于一般程序员学习。(目前是 Web 开发性价比最高的语言)

关注 89074

纪卓志 George 关注了标签 · 2月22日

javascript

JavaScript 是一门弱类型的动态脚本语言,支持多种编程范式,包括面向对象和函数式编程,被广泛用于 Web 开发。

一般来说,完整的JavaScript包括以下几个部分:

  • ECMAScript,描述了该语言的语法和基本对象
  • 文档对象模型(DOM),描述处理网页内容的方法和接口
  • 浏览器对象模型(BOM),描述与浏览器进行交互的方法和接口

它的基本特点如下:

  • 是一种解释性脚本语言(代码不进行预编译)。
  • 主要用来向HTML页面添加交互行为。
  • 可以直接嵌入HTML页面,但写成单独的js文件有利于结构和行为的分离。

JavaScript常用来完成以下任务:

  • 嵌入动态文本于HTML页面
  • 对浏览器事件作出响应
  • 读写HTML元素
  • 在数据被提交到服务器之前验证数据
  • 检测访客的浏览器信息

《 Javascript 优点在整个语言中占多大比例?

关注 172953

纪卓志 George 关注了标签 · 2月22日

python

Python(发音:英[ˈpaɪθən],美[ˈpaɪθɑ:n]),是一种面向对象、直译式电脑编程语言,也是一种功能强大的通用型语言,已经具有近二十年的发展历史,成熟且稳定。它包含了一组完善而且容易理解的标准库,能够轻松完成很多常见的任务。它的语法非常简捷和清晰,与其它大多数程序设计语言不一样,它使用缩进来定义语句。

Python支持命令式程序设计、面向对象程序设计、函数式编程、面向切面编程、泛型编程多种编程范式。与Scheme、Ruby、Perl、Tcl等动态语言一样,Python具备垃圾回收功能,能够自动管理存储器使用。它经常被当作脚本语言用于处理系统管理任务和网络程序编写,然而它也非常适合完成各种高级任务。Python虚拟机本身几乎可以在所有的作业系统中运行。使用一些诸如py2exe、PyPy、PyInstaller之类的工具可以将Python源代码转换成可以脱离Python解释器运行的程序。

Python的主要参考实现是CPython,它是一个由社区驱动的自由软件。目前由Python软件基金会管理。基于这种语言的相关技术正在飞快的发展,用户数量快速扩大,相关的资源非常多。

关注 134837

认证与成就

  • 获得 3 次点赞
  • 获得 0 枚徽章 获得 0 枚金徽章, 获得 0 枚银徽章, 获得 0 枚铜徽章

擅长技能
编辑

(゚∀゚ )
暂时没有

开源项目 & 著作
编辑

(゚∀゚ )
暂时没有

注册于 2月22日
个人主页被 203 人浏览