木小宝

木小宝 查看完整档案

填写现居城市  |  填写毕业院校  |  填写所在公司/组织填写个人主网站
编辑
_ | |__ _ _ __ _ | '_ \| | | |/ _` | | |_) | |_| | (_| | |_.__/ \__,_|\__, | |___/ 个人简介什么都没有

个人动态

木小宝 收藏了文章 · 2020-12-11

手写线程池,对照学习ThreadPoolExecutor线程池实现原理!


作者:小傅哥
博客:https://bugstack.cn
Github:https://github.com/fuzhengwei/CodeGuide/wiki

沉淀、分享、成长,让自己和他人都能有所收获!😄

一、前言

人看手机,机器学习!

正好是2020年,看到这张图还是蛮有意思的。以前小时候总会看到一些科技电影,讲到机器人会怎样怎样,但没想到人似乎被娱乐化的东西,搞成了低头族、大肚子!

当意识到这一点时,其实非常怀念小时候。放假的早上跑出去,喊上三五个伙伴,要不下河摸摸鱼、弹弹玻璃球、打打pia、跳跳房子!一天下来真的不会感觉累,但现在如果是放假的一天,你的娱乐安排,很多时候会让头很累!

就像,你有试过学习一天英语头疼,还是刷一天抖音头疼吗?或者玩一天游戏与打一天球!如果你意识到了,那么争取放下一会手机,适当娱乐,锻炼保持个好身体!

二、面试题

谢飞机,小记!,上次吃亏在线程上,这次可能一次坑掉两次了!

谢飞机:你问吧,我准备好了!!!

面试官:嗯,线程池状态是如何设计存储的?

谢飞机:这!下一个,下一个!

面试官:Worker 的实现类,为什么不使用 ReentrantLock 来实现呢,而是自己继承AQS?

谢飞机:我...!

面试官:那你简述下,execute 的执行过程吧!

谢飞机:再见!

三、线程池讲解

1. 先看个例子

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(10));
threadPoolExecutor.execute(() -> {
    System.out.println("Hi 线程池!");
});
threadPoolExecutor.shutdown();

// Executors.newFixedThreadPool(10);
// Executors.newCachedThreadPool();
// Executors.newScheduledThreadPool(10);
// Executors.newSingleThreadExecutor();

这是一段用于创建线程池的例子,相信你已经用了很多次了。

线程池的核心目的就是资源的利用,避免重复创建线程带来的资源消耗。因此引入一个池化技术的思想,避免重复创建、销毁带来的性能开销。

那么,接下来我们就通过实践的方式分析下这个池子的构造,看看它是如何处理线程的。

2. 手写一个线程池

2.1 实现流程

为了更好的理解和分析关于线程池的源码,我们先来按照线程池的思想,手写一个非常简单的线程池。

其实很多时候一段功能代码的核心主逻辑可能并没有多复杂,但为了让核心流程顺利运行,就需要额外添加很多分支的辅助流程。就像我常说的,为了保护手才把擦屁屁纸弄那么大!

图 21-1 线程池简化流程

关于图 21-1,这个手写线程池的实现也非常简单,只会体现出核心流程,包括:

  1. 有n个一直在运行的线程,相当于我们创建线程池时允许的线程池大小。
  2. 把线程提交给线程池运行。
  3. 如果运行线程池已满,则把线程放入队列中。
  4. 最后当有空闲时,则获取队列中线程进行运行。

2.2 实现代码

public class ThreadPoolTrader implements Executor {

    private final AtomicInteger ctl = new AtomicInteger(0);

    private volatile int corePoolSize;
    private volatile int maximumPoolSize;

    private final BlockingQueue<Runnable> workQueue;

    public ThreadPoolTrader(int corePoolSize, int maximumPoolSize, BlockingQueue<Runnable> workQueue) {
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
    }

    @Override
    public void execute(Runnable command) {
        int c = ctl.get();
        if (c < corePoolSize) {
            if (!addWorker(command)) {
                reject();
            }
            return;
        }
        if (!workQueue.offer(command)) {
            if (!addWorker(command)) {
                reject();
            }
        }
    }

    private boolean addWorker(Runnable firstTask) {
        if (ctl.get() >= maximumPoolSize) return false;

        Worker worker = new Worker(firstTask);
        worker.thread.start();
        ctl.incrementAndGet();
        return true;
    }

    private final class Worker implements Runnable {

        final Thread thread;
        Runnable firstTask;

        public Worker(Runnable firstTask) {
            this.thread = new Thread(this);
            this.firstTask = firstTask;
        }

        @Override
        public void run() {
            Runnable task = firstTask;
            try {
                while (task != null || (task = getTask()) != null) {
                    task.run();
                    if (ctl.get() > maximumPoolSize) {
                        break;
                    }
                    task = null;
                }
            } finally {
                ctl.decrementAndGet();
            }
        }

        private Runnable getTask() {
            for (; ; ) {
                try {
                    System.out.println("workQueue.size:" + workQueue.size());
                    return workQueue.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    private void reject() {
        throw new RuntimeException("Error!ctl.count:" + ctl.get() + " workQueue.size:" + workQueue.size());
    }

    public static void main(String[] args) {
        ThreadPoolTrader threadPoolTrader = new ThreadPoolTrader(2, 2, new ArrayBlockingQueue<Runnable>(10));

        for (int i = 0; i < 10; i++) {
            int finalI = i;
            threadPoolTrader.execute(() -> {
                try {
                    Thread.sleep(1500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("任务编号:" + finalI);
            });
        }
    }

}

// 测试结果

任务编号:1
任务编号:0
workQueue.size:8
workQueue.size:8
任务编号:3
workQueue.size:6
任务编号:2
workQueue.size:5
任务编号:5
workQueue.size:4
任务编号:4
workQueue.size:3
任务编号:7
workQueue.size:2
任务编号:6
workQueue.size:1
任务编号:8
任务编号:9
workQueue.size:0
workQueue.size:0

以上,关于线程池的实现还是非常简单的,从测试结果上已经可以把最核心的池化思想体现出来了。主要功能逻辑包括:

  • ctl,用于记录线程池中线程数量。
  • corePoolSizemaximumPoolSize,用于限制线程池容量。
  • workQueue,线程池队列,也就是那些还不能被及时运行的线程,会被装入到这个队列中。
  • execute,用于提交线程,这个是通用的接口方法。在这个方法里主要实现的就是,当前提交的线程是加入到worker、队列还是放弃。
  • addWorker,主要是类 Worker 的具体操作,创建并执行线程。这里还包括了 getTask() 方法,也就是从队列中不断的获取未被执行的线程。

,那么以上呢,就是这个简单线程池实现的具体体现。但如果深思熟虑就会发现这里需要很多完善,比如:线程池状态呢,不可能一直奔跑呀!?线程池的锁呢,不会有并发问题吗?线程池拒绝后的策略呢?,这些问题都没有在主流程解决,也正因为没有这些流程,所以上面的代码才更容易理解。

接下来,我们就开始分析线程池的源码,与我们实现的简单线程池参考对比,会更加容易理解😄!

3. 线程池源码分析

3.1 线程池类关系图

图 21-2 线程池类关系图

以围绕核心类 ThreadPoolExecutor 的实现展开的类之间实现和继承关系,如图 21-2 线程池类关系图。

  • 接口 ExecutorExecutorService,定义线程池的基本方法。尤其是 execute(Runnable command) 提交线程池方法。
  • 抽象类 AbstractExecutorService,实现了基本通用的接口方法。
  • ThreadPoolExecutor,是整个线程池最核心的工具类方法,所有的其他类和接口,为围绕这个类来提供各自的功能。
  • Worker,是任务类,也就是最终执行的线程的方法。
  • RejectedExecutionHandler,是拒绝策略接口,有四个实现类;AbortPolicy(抛异常方式拒绝)DiscardPolicy(直接丢弃)DiscardOldestPolicy(丢弃存活时间最长的任务)CallerRunsPolicy(谁提交谁执行)
  • Executors,是用于创建我们常用的不同策略的线程池,newFixedThreadPoolnewCachedThreadPoolnewScheduledThreadPoolnewSingleThreadExecutor

3.2 高3位与低29位

图 22-3 线程状态,高3位与低29位

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

ThreadPoolExecutor 线程池实现类中,使用 AtomicInteger 类型的 ctl 记录线程池状态和线程池数量。在一个类型上记录多个值,它采用的分割数据区域,高3位记录状态,低29位存储线程数量,默认 RUNNING 状态,线程数为0个。

3.2 线程池状态

图 22-4 线程池状态流转

图 22-4 是线程池中的状态流转关系,包括如下状态:

  • RUNNING:运行状态,接受新的任务并且处理队列中的任务。
  • SHUTDOWN:关闭状态(调用了shutdown方法)。不接受新任务,,但是要处理队列中的任务。
  • STOP:停止状态(调用了shutdownNow方法)。不接受新任务,也不处理队列中的任务,并且要中断正在处理的任务。
  • TIDYING:所有的任务都已终止了,workerCount为0,线程池进入该状态后会调 terminated() 方法进入TERMINATED 状态。
  • TERMINATED:终止状态,terminated() 方法调用结束后的状态。

3.3 提交线程(execute)

图 22-5 提交线程流程图

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

在阅读这部分源码的时候,可以参考我们自己实现的线程池。其实最终的目的都是一样的,就是这段被提交的线程,启动执行加入队列决策策略,这三种方式。

  • ctl.get(),取的是记录线程状态和线程个数的值,最终需要使用方法 workerCountOf(),来获取当前线程数量。`workerCountOf 执行的是 c & CAPACITY 运算
  • 根据当前线程池中线程数量,与核心线程数 corePoolSize 做对比,小于则进行添加线程到任务执行队列。
  • 如果说此时线程数已满,那么则需要判断线程池是否为运行状态 isRunning(c)。如果是运行状态则把不能被执行的线程放入线程队列中。
  • 放入线程队列以后,还需要重新判断线程是否运行以及移除操作,如果非运行且移除,则进行拒绝策略。否则判断线程数量为0后添加新线程。
  • 最后就是再次尝试添加任务执行,此时方法 addWorker 的第二个入参是 false,最终会影响添加执行任务数量判断。如果添加失败则进行拒绝策略。

3.5 添加执行任务(addWorker)

图 22-6 添加执行任务逻辑流程

private boolean addWorker(Runnable firstTask, boolean core)

第一部分、增加线程数量

retry:
for (;;) {
    int c = ctl.get();
    int rs = runStateOf(c);
    // Check if queue empty only if necessary.
    if (rs >= SHUTDOWN &&
        ! (rs == SHUTDOWN &&
           firstTask == null &&
           ! workQueue.isEmpty()))
        return false;
    for (;;) {
        int wc = workerCountOf(c);
        if (wc >= CAPACITY ||
            wc >= (core ? corePoolSize : maximumPoolSize))
            return false;
        if (compareAndIncrementWorkerCount(c))
            break retry;
        c = ctl.get();  // Re-read ctl
        if (runStateOf(c) != rs)
            continue retry;
        // else CAS failed due to workerCount change; retry inner loop
    }
}

第一部分、创建启动线程

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
    w = new Worker(firstTask);
    final Thread t = w.thread;
    if (t != null) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            int rs = runStateOf(ctl.get());
            if (rs < SHUTDOWN ||
                (rs == SHUTDOWN && firstTask == null)) {
                if (t.isAlive()) // precheck that t is startable
                    throw new IllegalThreadStateException();
                workers.add(w);
                int s = workers.size();
                if (s > largestPoolSize)
                    largestPoolSize = s;
                workerAdded = true;
            }
        } finally {
            mainLock.unlock();
        }
        if (workerAdded) {
            t.start();
            workerStarted = true;
        }
    }
} finally {
    if (! workerStarted)
        addWorkerFailed(w);
}
return workerStarted;

添加执行任务的流程可以分为两块看,上面代码部分是用于记录线程数量、下面代码部分是在独占锁里创建执行线程并启动。这部分代码在不看锁、CAS等操作,那么就和我们最开始手写的线程池基本一样了

  • if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty())),判断当前线程池状态,是否为 SHUTDOWNSTOPTIDYINGTERMINATED中的一个。并且当前状态为 SHUTDOWN、且传入的任务为 null,同时队列不为空。那么就返回 false。
  • compareAndIncrementWorkerCount,CAS 操作,增加线程数量,成功就会跳出标记的循环体。
  • runStateOf(c) != rs,最后是线程池状态判断,决定是否循环。
  • 在线程池数量记录成功后,则需要进入加锁环节,创建执行线程,并记录状态。在最后如果判断没有启动成功,则需要执行 addWorkerFailed 方法,剔除到线程方法等操作。

3.6 执行线程(runWorker)

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // 允许中断
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) 
            w.lock();
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

其实,有了手写线程池的基础,到这也就基本了解了,线程池在干嘛。到这最核心的点就是 task.run() 让线程跑起来。额外再附带一些其他流程如下;

  • beforeExecuteafterExecute,线程执行的前后做一些统计信息。
  • 另外这里的锁操作是 Worker 继承 AQS 自己实现的不可重入的独占锁。
  • processWorkerExit,如果你感兴趣,类似这样的方法也可以深入了解下。在线程退出时候workers做到一些移除处理以及完成任务数等,也非常有意思

3.7 队列获取任务(getTask)

如果你已经开始阅读源码,可以在 runWorker 方法中,看到这样一句循环代码 while (task != null || (task = getTask()) != null)。这与我们手写线程池中操作的方式是一样的,核心目的就是从队列中获取线程方法。

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
  • getTask 方法从阻塞队列中获取等待被执行的任务,也就是一条条往出拿线程方法。
  • if (rs >= SHUTDOWN ...,判断线程是否关闭。
  • wc = workerCountOf(c),wc > corePoolSize,如果工作线程数超过核心线程数量 corePoolSize 并且 workQueue 不为空,则增加工作线程。但如果超时未获取到线程,则会把大于 corePoolSize 的线程销毁掉。
  • timed,是 allowCoreThreadTimeOut 得来的。最终 timed 为 true 时,则通过阻塞队列的poll方法进行超时控制。
  • 如果在 keepAliveTime 时间内没有获取到任务,则返回null。如果为false,则阻塞。

四、总结

  • 这一章节并没有完全把线程池的所有知识点都介绍完,否则一篇内容会有些臃肿。在这一章节我们从手写线程池开始,逐步的分析这些代码在Java的线程池中是如何实现的,涉及到的知识点也几乎是我们以前介绍过的内容,包括:队列、CAS、AQS、重入锁、独占锁等内容。所以这些知识也基本是环环相扣的,最好有一些根基否则会有些不好理解。
  • 除了本章介绍的,我们还没有讲到线程的销毁过程、四种线程池方法的选择和使用、以及在CPU密集型任务IO 密集型任务时该怎么配置。另外在Spring中也有自己实现的线程池方法。这些知识点都非常贴近实际操作。
  • 好了,今天的内容先扯到这,后续的内容陆续完善。如果以上内容有错字、流程缺失、或者不好理解以及描述错误,欢迎留言。互相学习、互相进步。

五、系列推荐

查看原文

木小宝 发布了文章 · 2020-11-06

记一次docker-compose启动的内部服务响应超慢的问题

docker-compose启动的内部服务首页出来耗时用了10几秒,真是被震惊了,首页用到的服务是3个。后来发现是因为docker的dns解析问题导致的。

vim /etc/systemd/system/docker.service.d/docker-dns.conf
image.png
原本ndot这里是2,什么意思呢?
表示域名请求中点的个数,例如:www.baidu.com,ndot就是2,包含2个点。而我的服务中呢,就不是严格按照域名的方式来写的,都是一个单词,像php这种。那这会发生什么呢?
php中ndot是0,比原始配置的2小,会按照dns-search的配置拼上默认的svc.cluster.local,然后按php.svc.cluster.local这个名字去找服务了,结果找不到,按opt里的配置,超时2s,重试2次,之后再按照我原本的服务名去找,结果找到了,一个服务这就4s过去了,然后我所有的服务都没有点。就呵呵了。

改完之后重启docker守护进程:
systemctl daemon-reload

再重启docker服务:
service docker restart

ok了,变毫秒响应了!

查看原文

赞 0 收藏 0 评论 0

木小宝 发布了文章 · 2020-06-19

SparkStreaming读kafka数据保存为一个文件

需求是这样的:kafka里的数据存至afs上,每条数据都有一个start_time字段,是时间戳格式。现需要按照start_time字段存到具体的某一天某个小时的路径下,类似这种目录:xxx/2020-01-01(日期)/16(小时)/xxx

那就开始吧:
pom.xml

<properties>
        <target.dir>log-processor</target.dir>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <log4j.version>2.8.2</log4j.version>
        <disruptor.version>3.3.6</disruptor.version>
        <org.springframework.version>4.3.10.RELEASE</org.springframework.version>
        <scala.version>2.11.0</scala.version>
        <hadoop.version>2.7.4</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>io.netty</groupId>
                    <artifactId>netty</artifactId>
                </exclusion>
            </exclusions>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.18.Final</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.4.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.4.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.2.4</version>
        </dependency>


        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-yarn_2.11</artifactId>
            <version>2.4.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.3.1</version>
        </dependency>
        <!-- 会有包冲突导致的异常,因此排除net.jpountz.lz4-->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.1.1</version>
            <exclusions>
                <exclusion>
                    <groupId>net.jpountz.lz4</groupId>
                    <artifactId>lz4</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
public class SparkStreaming {
    public static void main(String[] args) throws InterruptedException {
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "brokers");
        kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, "kafkaGroupId");
        kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");

        SparkConf conf = new SparkConf().setAppName("sparkTask");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
        jssc.sparkContext().setLogLevel("WARN");

        String brokers = "broker1, broker2";
        final JavaInputDStream<ConsumerRecord<String, String>> lines = KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(new HashSet<>(Arrays.asList(brokers)), kafkaParams));

        lines.mapToPair(new PairFunction<ConsumerRecord<String, String>, String, String>() {
            public Tuple2<String, String> call(ConsumerRecord<String, String> record) {
                JsonObject obj = null;
                String date = null;
                try {
                    obj = new JsonParser().parse(record.value()).getAsJsonObject();
                    Long startTime = Long.parseLong(obj.get("start_time").getAsString());
                    LocalDateTime time = LocalDateTime.ofEpochSecond(startTime, 0, ZoneOffset.ofHours(8));
                    date = time.toLocalDate() + "/" + time.getHour();
                } catch (Exception e) {
                    e.printStackTrace();
                    return null;
                }
                return new Tuple2<String, String>(date, record.value());
            }
        }).foreachRDD(new VoidFunction<JavaPairRDD<String, String>>() {
            @Override
            public void call(JavaPairRDD<String, String> dateStringJavaPairRDD) {
                try {
                    // 取出(date, line)中date并去重
                    List<String> dateKeys = dateStringJavaPairRDD.map(new Function<Tuple2<String, String>, String>() {
                        @Override
                        public String call(Tuple2<String, String> v1) throws Exception {
                            return v1._1;
                        }
                    }).distinct().collect();
                    // 根据date过滤数据并分别写入不同目录
                    for (String dateKey : dateKeys) {
                        String savePath = String.join("/",
                                new String[]{"feature", dateKey,
                                        String.valueOf(Timestamp.valueOf(LocalDateTime.now()).getTime())});
                        JavaRDD<String> resultRdd = null;
                        resultRdd = dateStringJavaPairRDD.filter(new Function<Tuple2<String, String>, Boolean>() {
                            @Override
                            public Boolean call(Tuple2<String, String> v1) throws Exception {
                                if (v1._1.equals(dateKey)) {
                                    return true;
                                } else {
                                    return false;
                                }
                            }
                        }).map(new Function<Tuple2<String, String>, String>() {

                            @Override
                            public String call(Tuple2<String, String> v1) throws Exception {
                                return v1._2;
                            }
                        });
                        resultRdd.repartition(1).saveAsTextFile(savePath);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        jssc.start();
        jssc.awaitTermination();
    }
}
查看原文

赞 0 收藏 0 评论 0

木小宝 关注了专栏 · 2020-06-05

CodeGuide | 程序员编码指南

公众号:bugstack虫洞栈,回复:设计模式,可以下载《重学Java设计模式》PDF,全网下载量17万+ | 这是一本互联网真实案例实践书籍。以落地解决方案为核心,从实际业务中抽离出,交易、营销、秒杀、中间件、源码等22个真实场景,来学习设计模式的运用。

关注 12262

木小宝 赞了文章 · 2020-05-08

优秀!Github上10个开源免费的后台控制面板你值得拥有!

来源:www.jianshu.com/p/3bc7404af887

Web 开发中几乎的平台都需要一个后台管理,但是从零开发一套后台控制面板并不容易,幸运的是有很多开源免费的后台控制面板可以给开发者使用,那么有哪些优秀的开源免费的控制面板呢?我在 Github 上收集了一些优秀的后台控制面板,并总结得出 Top 10。


1. AdminLTE

Github Star 数 24969 , Github 地址:

https://github.com/almasaeed2...

非常流行的基于 Bootstrap 3.x 的免费的后台 UI 框架。


2. vue-Element-Admin

Github Star 数 19546, Github 地址:

https://github.com/PanJiaChen...

一个基于 vue2.0 和 Eelement 的控制面板 UI 框架。


3. tabler

Github Star 数 15870, Github 地址:

https://github.com/tabler/tabler

构建在 BootStrap 4 之上的免费的 HTML 控制面板框架


4. Gentelella

Github Star 数 15654, Github 地址:

https://github.com/puikinsh/g...

一个基于 Bootstarp 的免费的后台控制面板。


5. ng2-admin

Github Star 数 13181, Github 地址:

https://github.com/akveo/ngx-...

基于 Angular 2, Bootstrap 4 和 Webpack 的后台管理面板框架。


6. ant-design-pro

Github Star 数 12707,Github 地址:

https://github.com/ant-design...

开箱即用的中台前端/设计解决方案


7. blur-admin

Github Star 数 9241,Github 地址:

https://github.com/akveo/blur...

基于 Angular 和 Bootstrap 的后台管理面板框架。


8. vue-admin

Github Star 数 8676,Github 地址:

https://github.com/vue-bulma/...

基于 Vue 和 Bulma 的控制面板。


9. iview-admin

Github Star 数 8668,Github 地址:

https://github.com/iview/ivie...

基于 iView 的 Vue 2.0 控制面板。


10. material-dashboard

Github Star 数 7111,Github 地址:

https://github.com/creativeti...

基于 Bootstrap 4 和 Material 风格的控制面板。

欢迎小伙伴留言评论、指正。如有帮助,欢迎点赞+转发分享。

欢迎大家关注民工哥的公众号:民工哥技术之路
image.png

查看原文

赞 21 收藏 17 评论 0

木小宝 发布了文章 · 2020-01-15

redis启动报错

redis版本:3.2.12
安装完redis后发现启动有警告:虽不影响正常使用,但还是要解决。

如下:
8029FDA1-6945-41C1-95F9-55739E00572C.JPG

警告1:

 WARNING: The TCP backlog setting of 511 cannot be enforced because /proc/sys/net/core/somaxconn is set to the lower value of 128.

解决方案:
在/etc/sysctl.conf中添加:net.core.somaxconn= 1024,然后执行:sysctl -p

警告2:

 WARNING you have Transparent Huge Pages (THP) support enabled in your kernel. This will create latency and memory usage issues with Redis. To fix this issue run the command 'echo never > /sys/kernel/mm/transparent_hugepage/enabled' as root, and add it to your /etc/rc.local in order to retain the setting after a reboot. Redis must be restarted after THP is disabled.

使用的透明大页可能导致redis延迟和使用问题。

解决方案:
在/etc/rc.local文件中添加:

if test -f /sys/kernel/mm/redhat_transparent_hugepage/enabled; then
   echo never > /sys/kernel/mm/redhat_transparent_hugepage/enabled
fi
查看原文

赞 0 收藏 0 评论 0

木小宝 发布了文章 · 2019-12-18

linux搭建maven私服

1、下载nexus
链接: https://pan.baidu.com/s/1e8Ww... 提取码: qj2f
将解压后的文件复制到/usr/local/nexus/目录下。

2、修改配置
2.1.修改nexus3的运行用户为root
cd /usr/local/nexus/nexus-3.20.0-04/bin
vim nexus.rc

run_as_user="root"

2.2.修改nexus3启动时要使用的jdk版本
vim nexus
第14行:

INSTALL4J_JAVA_HOME_OVERRIDE=/home/work/jdk1.8.0_171

2.3.修改nexus3默认端口
cd /usr/local/nexus/nexus-3.20.0-04/etc
vim nexus-default.properties

application-port=8083

2.4.修改nexus3数据及日志的存放位置
cd /usr/local/nexus/nexus-3.20.0-04/bin
vim nexus.vmoptions

-XX:LogFile=/data/nexus3/log/jvm.log
-Dkaraf.data=/data/nexus3
-Dkaraf.log=/data/nexus3/log

2.5.启动nexus服务

nohup ./nexus run &

出现以下日志说明启动成功:

Started Sonatype Nexus OSS 3.20.0-04

3、访问页面
通过ip:8083访问页面,用户名:admin,密码:第一次登录密码在Dkaraf.data配置的路径下有一个password的文件,里面是初始密码,登录之后可修改密码。

参考链接:https://www.jianshu.com/p/06c1ca9052eb

查看原文

赞 0 收藏 0 评论 0

木小宝 收藏了文章 · 2019-08-14

9. java 多态

概念

所谓多态,就是指一个引用变量(类型)在不同的情况下的多种状态。也可以理解为,多态是指通过指向父类的指针,来调用在不同子类中实现的方法。多态性是对象多种表现形式的体现

多态性严格来说有两种描述形式:

  1. 方法的多态性:

    • 方法的重载
    • 方法的重写
  2. 对象的多态性: 指的是发生在继承关系类之中,子类和父类之前的转换问题

    • 向上转型 : 父类 父类对象 = 子类实例
    • 向下转型 : 子类 子类对象 = (子类)父类实例 。 需进行强制转换,有风险,最好使用 instanceof 进行判断。
class A {
    public void fun(){
        System.out.println("A ");
    }
}

class B extends A{
    public void fun(){  //重写父类方法
        System.out.println("B");
    }
}
public class testDemo {
    public static void main(String[] args) {
        A a = new B(); //向上转型
        B b = (B)a; //向下转型
        a.fun();
         // a.funb();  //父类中不能调用子类中扩展的方法
        b.fun();
        System.out.println(a instanceof A);
        System.out.println(a instanceof B);
        System.out.println(b instanceof A);
        System.out.println("**********");
        A a2 = new A();
        System.out.println(a2 instanceof A);
        System.out.println(a2 instanceof B);
    }
}
输出结果:
B
B
true
true
true
**********
true
false

Java实现多态有三个必要条件:继承、重写、向上转型。

继承:在多态中必须存在有继承关系的子类和父类。

重写:子类对父类中某些方法进行重新定义,在调用这些方法时就会调用子类的方法。

向上转型:在多态中需要将子类的引用赋给父类对象,只有这样该引用才能够具备技能调用父类的方法和子类的方法。

只有满足了上述三个条件,我们才能够在同一个继承结构中使用统一的逻辑实现代码处理不同的对象,从而达到执行不同的行为。

对于Java而言,它多态的实现机制遵循一个原则:当超类对象引用了子类对象时,被引用对象的类型而不是引用变量的类型决定了调用谁的成员方法,但是这个被调用的方法必须是在超类中定义过的,也就是说被子类覆盖的方法。

多态分为编译时多态和运行时多态。其中编辑时多态是静态的,主要是指方法的重载,它是根据参数列表的不同来区分不同的函数,通过编辑之后会变成两个不同的函数,在运行时谈不上多态。而运行时多态是动态的,它是通过动态绑定来实现的,也就是我们所说的多态性。

在Java中有两种形式可以实现多态:继承和接口。

多态的优点

  1. 消除类型之间的耦合关系
  2. 可替换性
  3. 可扩充性
  4. 接口性
  5. 灵活性
  6. 简化性
查看原文

木小宝 收藏了文章 · 2019-07-24

Elasticsearch 参考指南(基本概念)

基本概念

有一些概念是Elasticsearch的核心,从一开始就理解这些概念将极大地帮助简化学习过程。

接近实时(NRT)

Elasticsearch是一个近乎实时的搜索平台,这意味着从索引文档到可搜索文档的时间有一点延迟(通常是一秒)。

集群

集群是一个或多个节点(服务器)的集合,它们共同保存您的整个数据,并提供跨所有节点的联合索引和搜索功能。集群由一个唯一的名称标识,默认情况下这个名称是“elasticsearch”。这个名称很重要,因为如果节点被设置为以这个名称加入集群,那么节点才能是集群的一部分。

确保不要在不同的环境中重用相同的集群名称,否则可能会导致节点加入错误的集群。例如,你可以使用logging-devlogging-stagelogging-prod进行开发、阶段和生产集群。

节点

节点是作为群集一部分的单个服务器,存储数据并参与群集的索引和搜索功能。就像群集一样,节点由名称标识,默认情况下,该名称是在启动时分配给节点的随机通用唯一标识符(UUID)。如果不想要默认的节点名,可以定义任何想要的节点名。此名称对于管理目的非常重要,你可以在其中识别网络中哪些服务器与Elasticsearch集群中的哪些节点相对应。

节点可以被配置为通过集群名称加入特定的集群。默认情况下,每个节点都设置为加入名为elasticsearch的集群,这意味着如果你在网络上启动了许多节点并且假设它们可以相互发现 - 它们将自动形成并加入名为elasticsearch的单个集群。

在单个群集中,你可以拥有任意数量的节点。此外,如果你的网络上当前没有其他Elasticsearch节点正在运行,则默认情况下,启动单个节点将形成名为elasticsearch的新单节点集群。

索引

索引是具有某些类似特征的文档集合。例如,你可以拥有客户数据的索引,产品目录的另一个索引以及订单数据的另一个索引。索引由名称标识(必须全部小写),此名称用于在对其中的文档执行索引,搜索,更新和删除操作时引用索引。

在单个群集中,你可以根据需要定义任意数量的索引。

类型

在6.0.0中已弃用
参见删除映射类型

一种类型,曾经是索引的逻辑类别/分区,允许你在同一索引中存储不同类型的文档,例如一种类型用于用户,另一种类型用于博客帖子。不再可能在索引中创建多个类型,并且将在更高版本中删除类型的整个概念,更多信息请参见删除映射类型

文档

文档是可以被索引的基本信息单元。例如,你可以为一个客户提供一个文档,为一个产品提供另一个文档,为一个订单提供另一个文档,该文档用JSON (JavaScript对象表示法)表示,这是一种普遍存在的internet数据交换格式。

在索引/类型中,可以存储任意数量的文档,请注意,尽管文档在物理上驻留在索引中,但实际上文档必须被索引/分配到索引中的类型中。

碎片和副本

索引可能存储大量可能超过单个节点的硬件限制的数据。例如,占用1TB磁盘空间的十亿个文档的单个索引可能不适合单个节点的磁盘,或者可能太慢而无法单独从单个节点提供搜索请求。

为了解决这个问题,Elasticsearch提供了将索引细分为多个称为碎片的能力,创建索引时,只需定义所需的碎片数即可。每个碎片本身都是一个功能齐全且独立的“索引”,可以托管在集群中的任何节点上。

碎片之所以重要,主要有两个原因:

  • 它允许你水平分割/缩放你的内容量
  • 它允许你跨碎片(可能在多个节点上)分发和并行操作,从而提高性能/吞吐量

碎片的分布方式以及如何将其文档聚合回搜索请求的机制完全由Elasticsearch管理,对用户而言是透明的。

在随时都可能发生故障的网络/云环境中,这非常有用,强烈建议使用故障转移机制,以防碎片/节点以某种方式脱机或因任何原因消失。为此,Elasticsearch允许你将索引的碎片的一个或多个副本制作成所谓的副本碎片或简称副本。

复制之所以重要,主要有两个原因:

  • 它在碎片/节点发生故障时提供高可用性。因此,请务必注意,副本碎片永远不会在与从中复制的原始/主碎片相同的节点上分配。
  • 它允许你扩展搜索量/吞吐量,因为可以在所有副本上并行执行搜索。

总而言之,每个索引可以拆分为多个碎片,索引也可以复制为零(表示没有副本)或更多次。复制后,每个索引都将具有主碎片(从中复制的原始碎片)和副本碎片(主碎片的副本)。

可以在创建索引时为每个索引定义碎片和副本的数量。创建索引后,你还可以随时动态更改副本数,你可以使用_shrink_split API更改现有索引的碎片数,但这不是一项简单的任务,预先计划正确数量的碎片是最佳方法。

默认情况下,Elasticsearch中的每个索引都分配了5个主碎片和1个副本,这意味着如果群集中至少有两个节点,则索引将包含5个主碎片和另外5个副本碎片(1个完整副本),总计为每个索引10个碎片。

每个Elasticsearch碎片都是Lucene索引,单个Lucene索引中可以包含最大数量的文档。从LUCENE-5843开始,这个极限是2,147,483,519(= Integer.MAX_VALUE - 128)个文档,你可以使用_cat/shards API监视碎片大小。

有了这些,让我们开始有趣的部分……


上一篇:介绍

下一篇:安装

查看原文

木小宝 关注了标签 · 2018-06-05

区块链

区块链(英语:Blockchain 或 Block chain)是一种分布式数据库,起源自比特币。区块链是一串使用密码学方法相关联产生的数据块,每一个数据块中包含了一次比特币网络交易的信息,用于验证其信息的有效性(防伪)和生成下一个区块。该概念在中本聪的白皮书中提出,中本聪创造第一个区块,即“创世区块”。

区块链在网络上是公开的,可以在每一个离线比特币钱包数据中查询。比特币钱包的功能依赖于与区块链的确认,一次有效检验称为一次确认。通常一次交易要获得数个确认才能进行。轻量级比特币钱包使用在线确认,即不会下载区块链数据到设备存储中。

比特币的众多竞争币也使用同样的设计,只是在工作量证明上和算法上略有不同。如,采用权益证明和 SCrypt 算法等等。

关注 51199

认证与成就

  • 获得 0 次点赞
  • 获得 0 枚徽章 获得 0 枚金徽章, 获得 0 枚银徽章, 获得 0 枚铜徽章

擅长技能
编辑

(゚∀゚ )
暂时没有

开源项目 & 著作
编辑

(゚∀゚ )
暂时没有

注册于 2018-06-05
个人主页被 539 人浏览