EVAO_大个子

EVAO_大个子 查看完整档案

西安编辑西北工业大学  |  交通信息工程及控制 编辑  |  填写所在公司/组织 segmentfault.com/u/evao_dagezi 编辑
编辑
_ | |__ _ _ __ _ | '_ \| | | |/ _` | | |_) | |_| | (_| | |_.__/ \__,_|\__, | |___/ 个人简介什么都没有

个人动态

EVAO_大个子 发布了文章 · 2019-07-26

Java核心技术 卷1 基础知识 学习笔记——第四章 对象与类

1.学习LocalDate

    public static void main(String\u005B\u005D args) throws IOException {
        LocalDate date = LocalDate.now();
        int month = date.getMonthValue();
        int today = date.getDayOfMonth();

        date = date.minusDays(today - 1); // Set to start of month,minusDays(int n)方法生成当前日期之后或之前N天的日期
        DayOfWeek weekday = date.getDayOfWeek();
        int value = weekday.getValue(); // 1 = Monday, ... 7 = Sunday

        System.out.println("Mon Tue Wed Thu Fri Sat Sun");
        for (int i = 1; i < value; i++)
            System.out.print("    ");
        while (date.getMonthValue() == month) {
            System.out.printf("%3d", date.getDayOfMonth());
            if (date.getDayOfMonth() == today)
                System.out.print("*");
            else
                System.out.print(" ");
            date = date.plusDays(1);
            if (date.getDayOfWeek().getValue() == 1)
                System.out.println();
        }
        if (date.getDayOfWeek().getValue() != 1)
            System.out.println();
    }

输出结果
clipboard.png
2.Java文件名必须与public类的名字相匹配,在一个源文件中,只能有一个公共类。
3.如果需要返回一个可变对象的引用,应该首先对他进行克隆(clone);
4.一个方法不能修改一个基本数据类型的参数(即数值型或布尔型);
一个方法可以改变一个对象参数的状态;
一个方法不能让对象参数引用一个新的对象;

public class ParamTest
{
   public static void main(String[] args)
   {
      /*
       * Test 1: Methods can't modify numeric parameters
       */
      System.out.println("Testing tripleValue:");
      double percent = 10;
      System.out.println("Before: percent=" + percent);
      tripleValue(percent);
      System.out.println("After: percent=" + percent);

      /*
       * Test 2: Methods can change the state of object parameters
       */
      System.out.println("\nTesting tripleSalary:");
      Employee harry = new Employee("Harry", 50000);
      System.out.println("Before: salary=" + harry.getSalary());
      tripleSalary(harry);
      System.out.println("After: salary=" + harry.getSalary());

      /*
       * Test 3: Methods can't attach new objects to object parameters
       */
      System.out.println("\nTesting swap:");
      Employee a = new Employee("Alice", 70000);
      Employee b = new Employee("Bob", 60000);
      System.out.println("Before: a=" + a.getName());
      System.out.println("Before: b=" + b.getName());
      swap(a, b);
      System.out.println("After: a=" + a.getName());
      System.out.println("After: b=" + b.getName());
   }

   public static void tripleValue(double x) // doesn't work
   {
      x = 3 * x;
      System.out.println("End of method: x=" + x);
   }

   public static void tripleSalary(Employee x) // works
   {
      x.raiseSalary(200);
      System.out.println("End of method: salary=" + x.getSalary());
   }

   public static void swap(Employee x, Employee y)
   {
      Employee temp = x;
      x = y;
      y = temp;
      System.out.println("End of method: x=" + x.getName());
      System.out.println("End of method: y=" + y.getName());
   }
}

class Employee // simplified Employee class
{
   private String name;
   private double salary;

   public Employee(String n, double s)
   {
      name = n;
      salary = s;
   }

   public String getName()
   {
      return name;
   }

   public double getSalary()
   {
      return salary;
   }

   public void raiseSalary(double byPercent)
   {
      double raise = salary * byPercent / 100;
      salary += raise;
   }
}

5.如果没有指定private或者public,这一部分可以被同一个包中的所有方法访问;

查看原文

赞 0 收藏 0 评论 0

EVAO_大个子 发布了文章 · 2019-07-24

Java核心技术 卷1 基础知识 学习笔记——第三章 java的基本程序设计结构

1. 一个正整数除以0的结果为正无穷大,0/0或者负数的平方根的结果为NaN(不是一个数字);
if(x==Double.NaN)//is never true,所有非数值的值都认为是不同的
if(Double.isNaN(x))//检查x是否为不是一个数字

2. System.out.println(2.0-1.1);
输出的值为0.8999999999999999,浮点数值采用二进制系统表示,无法精确表示分数1/10;

3. char

public static void main(Stringu005Bu005D args)
System.out.println("u0022+u0022");//表示“”+“”,输出空
// c:user 会报错,u后面未跟着4个16进制数

4. 使用static final声明一个类常量
5. 运算符
15/2输出7,15%2输出1,15.0/2输出7.5
System.out.println(1.5/0);//输出Infinity
System.out.println(2/0);System.out.println(0/0);
报Exception in thread "main" java.lang.ArithmeticException: / by zero
strictfp关键字标记的方法必须使用严格的浮点计算来生成可再生的结果。
++i:先加1;i++:先获取值再加1
a+=b+=c等于a+=(b+=c)

6.在源文件顶部引入math,使用时就不用再写Math
import static java.lang.Math.*;
System.out.println(sqrt(2));
7.类型转换
实心箭头表示无信息丢失的转换,虚线箭头表示可能有精度损失的转换;
clipboard.png
图片描述
System.out.println((byte)300);//输出44
8.字符串
String all = String.join("/", "S", "M", "L", "XL", "XXL");
输出:S/M/L/XL/XXL
9.码点的问题

10.javaApi地址:https://docs.oracle.com/javase/8/docs/api
11.每次连接字符串,都会构建一个新的String对象。
12.Scanner,printf的用法,printf的详细用法请看书

        Scanner in = new Scanner(System.in);

        // get first input
        System.out.print("What is your name? ");
        String name = in.nextLine();

        // get second input
        System.out.print("How old are you? ");
        int age = in.nextInt();

        // display output on console
        System.out.printf("Hello,%s. Next year, you'll be %d",name,age+1);
        System.out.println();
        Double x = 100000.0 / 3;
        System.out.println(x);
        System.out.printf("%8.2f", x);
Scanner in = new Scanner(Paths.get("C:\\Users\\yr\\Desktop\\3.txt"), "UTF-8");//读取文件
PrintWriter out = new PrintWriter("C:\\Users\\yr\\Desktop\\3.txt", "UTF-8");//写文件

13.

    public static void main(String\u005B\u005D args) throws IOException {
        for (double i = 0; i != 10; i += 0.1) {
            System.out.println(i);
        }

    }//这个循环可能永远无法停止,因为二级制无法精确表示0.1

clipboard.png

14.switch语句
编译代码时,可以考虑加上-Xlint:fallthrough选项
如:javac -Xlint:fallthrough Test.java
如果某个分支缺少一个break语句,编译器就会给出一个警告信息
15.带标签的break

        int n = 1;
        lable: while (true) {
            if (n < 5)
                break lable;
        }

16.对象数组的元素初始化为一个特殊值null
17.java实际没有多维数组,只有一维数组,多维数组被解释为数组的数组。

查看原文

赞 0 收藏 0 评论 0

EVAO_大个子 赞了文章 · 2019-06-14

开源区块链监控系统, CITA-Monitor 助力运维人员实时把控链的运行状态

CITA 是秘猿科技从 2016 年就开始研发,2017 年开源的高性能区块链内核。CITA 作为高性能区块链内核,可以用来开发各种联盟链,甚至公有链系统,具有为稳定、高效、灵活、可适应未来等特点。为了降低使用门槛,我们还提供了增加 CITA 易用性的工具链:包括钱包,缓存服务器,SDK,合约调试工具等等。这些项目的代码全部在 Github 上开源,用户可以根据需求进行个性化改造。本文是 CITA 工具链介绍的第一篇文章

CITA 生态工具又增一枚利器:CITA-Monitor

区块链服务程序是一个 7x24 小时的工作软件,节点分布在不同网络的主机中。作为运维人员,需要关注服务是否正常工作,包括服务中的区块链数据是否能够正常同步、软件进程是否存活、用来存储数据的空间是否足够、其他节点是否正常工作等,因此一个能够实时、直观了解这些指标,并且在运维人员没有主动关注时,也能及时收到服务异常告警通知的监控系统,是十分重要且必要的。

因此,为了给运维人员提供更好的用户体验,秘猿科技研发并开源了 CITA-Monitor 监控系统, 用以监控 CITA 区块链服务运行状态。CITA-Monitor 监控的指标包括:区块链数据、服务进程状态、运行环境的 CPU /存储器/磁盘使用率等主机信息等。

图片描述

为了能够让数据情况能够一目了然,我们开发了数据可视化面板,节点管理员可以轻松了解节点的运行健康状态。此外,我们还内置了关键的告警规则,例如服务进程状态告警,如微服务、依赖服务进程存活;区块链数据状态告警,如出块高度、出块间隔时间、交易数据的 TPS;运行环境状态警告,如磁盘空间不足,经过简单配置收发邮箱即可第一时间收到相关告警邮件。CITA-Monitor 详细介绍请移步 GitHub 查阅。

功能列表

  • CITA 服务进程监控

    • CITA 微服务及MQ进程的存活、进程的 CPU、内存使用率、IO
  • 区块链数据健康监控

    • 节点出块高度历史、出块时间、出块间隔趋势、Quota、交易量历史、TPS、磁盘占用比例、数据目录大小增长趋势
  • 运行环境监控

    • 主机运行环境的系统负载、CPU、内存、磁盘空间使用情况、网络流量、TCP 连接数等
  • 故障告警通知

    • 支持邮件通知、Slack 通知、短信通知(Pro 版)
    • 监控告警策略
  • 节点网络监控(Pro 版)

    • 连接节点数、网络拓扑、地理位置等
  • 鉴源限流(Pro 版)

    • 鉴别请求来源、工具;限制访问来源、频率
  • JSONRPC 接口调用分析(Pro 版)

    • 统计分析 RPC 方法的请求时间、请求次数

仪表板的监控指标

  • Summary Dashboard

    • 节点列表
    • 各节点最新块高
    • 各节点 CPU 使用率变化
    • 各节点监控进程存活
  • CITA Node Info Dashboard

    • Node Info - 选定节点的详细信息,包括区块链数据、运行环境、运行软件信息
    • CITA Meta Data - 链的配置信息,如 Chain Name、创建时间等
    • Chain Info - 链的最新块高、共识节点数、共识节点出块历史趋势
  • Host Info Dashboard

    • 各节点运行主机的信息,包括系统负载、CPU、内存、硬盘使用率、网络流量
  • Process Info Dashboard

    • 节点中 CITA 微服进程的存活历史、CPU、内存、IO 变化历史
  • RabbitMQ Dashboard

    • RabbitMQ 服务的存活状态、channels 、consumers、connections、queues 等的变化记录

更细节可查看:监控指标信息结构

系统架构

图片描述

更多仪表盘截图

图片描述
图片描述

查看原文

赞 9 收藏 1 评论 1

EVAO_大个子 发布了文章 · 2019-06-14

前台AJAX传数组,后台的java接收

前台AJAX传数组,后台的java接收(后台接收前端发送的数组类型数据)两种解决方法

第一种方法,前端将数组通过JSON.stringify()方法转换为json格式数据,后台将接收的json数据转换为数组

function search() {
            var equiNames = JSON.stringify($("#equiNames").val());
            var startDate = $('#daterange-btn span').text().substring(0, 10);
            var endDate = $('#daterange-btn span').text().substring(13);
            $.ajax({
                url : "dataAcquisition/report",
                type : "post",
                dataType : "json",
                data : {
                    "equiNames" : equiNames,
                    "startDate" : startDate,
                    "endDate" : endDate
                },
                success : function(result) {
                    ……
                    }
                }
            });
        }
@RequestMapping("/report")
    public void report(String equiNames, String startDate, String endDate, HttpServletRequest request,
            HttpServletResponse response) throws ExecutionException, InterruptedException, IOException, ParseException {
        //将接收的json数据转换为数组
        List<String> equiNameList = new Gson().fromJson(equiNames, new TypeToken<List<String>>() {
        }.getType());
        List<DataAcquisitionVo> resultList = dataAcquisitionService.report(equiNameList, startDate, endDate);
        response.setContentType("application/json; charset=UTF-8");
        response.getWriter().write(new Gson().toJson(resultList));
    }

第二种方法,前端通过设置traditional属性为true直接传递数组 */,后台通过对象接收

function search() {
            var equiNames = JSON.stringify($("#equiNames").val());
            var startDate = $('#daterange-btn span').text().substring(0, 10);
            var endDate = $('#daterange-btn span').text().substring(13);
            $.ajax({
                url : "dataAcquisition/report",
                type : "post",
                dataType : "json",
                traditional : true,//用传统方式序列化数据
                data : {
                    "equiNames" : equiNames,
                    "startDate" : startDate,
                    "endDate" : endDate
                },
                success : function(result) {
                    ……
                    }
                }
            });
        }

对象

@RequestMapping("/report")
    public void report(ReportParaVo rp, HttpServletRequest request, HttpServletResponse response)
            throws ExecutionException, InterruptedException, IOException, ParseException {
        
        List<DataAcquisitionVo> resultList = dataAcquisitionService.report(rp);
        response.setContentType("application/json; charset=UTF-8");
        response.getWriter().write(new Gson().toJson(resultList));
    }
import java.util.List;

public class ReportParaVo {
    private List<String> equiNames;
    private String startDate;
    private String endDate;

    public List<String> getEquiNames() {
        return equiNames;
    }

    public void setEquiNames(List<String> equiNames) {
        this.equiNames = equiNames;
    }

    public String getStartDate() {
        return startDate;
    }

    public void setStartDate(String startDate) {
        this.startDate = startDate;
    }

    public String getEndDate() {
        return endDate;
    }

    public void setEndDate(String endDate) {
        this.endDate = endDate;
    }

}

第二种方法效果如图所示

查看原文

赞 1 收藏 0 评论 0

EVAO_大个子 发布了文章 · 2019-06-14

解决java.lang.NoClassDefFoundError

报错信息如下:

[ERROR]Activator initialize error : websocket
[ERROR]io/netty/util/concurrent/GenericFutureListener
java.lang.NoClassDefFoundError: io/netty/util/concurrent/GenericFutureListener
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:264)
    ……
Caused by: java.lang.ClassNotFoundException: io.netty.util.concurrent.GenericFutureListener
    at org.apache.catalina.loader.WebappClassLoaderBase.loadClass(WebappClassLoaderBase.java:1363)
    at org.apache.catalina.loader.WebappClassLoaderBase.loadClass(WebappClassLoaderBase.java:1186)
    ... 45 more

类找不到,应该是缺少了依赖,这个类对应的依赖可以从中得到它

https://mvnrepository.com/art...

你可以加

<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>5.0.0.Alpha2</version>
</dependency>

到pom文件或从站点下载jar并将其添加到类路径或lib文件夹

查看原文

赞 0 收藏 0 评论 0

EVAO_大个子 发布了文章 · 2019-06-14

大数据系列——Storm安装和API

1. 实时计算

  • 有别于传统的离线批处理操作(对很多数据的集合进行的操作)
  • 实时处理,说白就是针对一条一条的数据/记录进行操作
  • 实时计算计算的是无界数据

2. 有界数据和无界数据

2.1 有界数据

  • 离线计算面临的操作数据都是有界限的,无论是1G、1T、1P、1EB、1NB
  • 数据的有界必然会导致计算的有界

2.2 无界数据

  • 实时计算面临的操作数据是源源不断的向水流一样,是没有界限的
  • 数据的无界必然导致计算的无界

3. 计算中心和计算引擎

在大数据领域中存在三大计算中心三大计算引擎

3.1 三大计算中心

  • 离线计算计算中心(mapreduce)
  • 实时计算中心(storm flink...)
  • 准实时计算中心(spark)

3.2 三大计算引擎

  • 交互式查询计算引擎(hive sparksql)
  • 图计算计算引擎
  • 机器学习计算引擎

4. Storm简介

  • 免费 开源 分布式 实时计算系统
  • 处理无界的数据流
  • Tiwtter开源的cloujre
  • Storm能实现高频数据和大规模数据的实时处理
  • 官网资料显示storm的一个节点1秒钟能够处理100万个100字节的消息(IntelE5645@2.4Ghz的CPU,24GB的内存)
  • storm是毫秒级的实时处理框架

Apache Storm是Twitter开源的一个类似于Hadoop的实时数据处理框架,它原来是由BackType开发,后BackType被Twitter收购,将Storm作为Twitter的实时数据分析系统。

5. hadoop与storm的计算

  • 数据来源

    • hadoop

      • HADOOP处理的是HDFS上TB级别的数据(历史数据)
    • storm

      • STORM是处理的是实时新增的某一笔数据(实时数据)
  • 处理过程

    • hadoop

      • HADOOP是分MAP阶段到REDUCE阶段
      • HADOOP最后是要结束的
    • storm

      • STORM是由用户定义处理流程,流程中可以包含多个步骤,每个步骤可以是数据源(SPOUT)或处理逻辑(BOLT)
      • STORM是没有结束状态,到最后一步时,就停在那,直到有新数据进入时再从头开始
  • 处理速度

    • hadoop

      • HADOOP是以处理HDFS上TB级别数据为目的,处理速度慢
    • storm

      • STORM是只要处理新增的某一笔数据即可,可以做到很快 (毫秒级的响应)
  • 适用场景

    • HADOOP是在要处理批量数据时用的 ,不讲究时效性
    • STORM是要处理某一新增数据时用的,要讲时效性

6. Storm的架构

  • Spout

    • Storm认为每个stream都有一个stream源,也就是原始元组的源头,所以它将这个源头称为Spout
    • 消息源,是消息生产者,他会从一个外部源读取数据并向topology里面面发出消息
  • Bolt

    • 消息处理者,所有的消息处理逻辑被封装在bolts里面,处理输入的数据流并产生新的输出数据流,可执行过滤,聚合,查询数据库等操作
  • 数据流
  • Task 每一个Spout和Bolt会被当作很多task在整个集群里面执行,每一个task对应到一个线程.
  • Stream groupings: 消息分发策略,定义一个Topology的其中一步是定义每个tuple接受什么样的流作为输入,stream grouping就是用来定义一个stream应该如何分配给Bolts们.

7. Storm集群的安装

  • 准备安装文件

    ​ apache-storm-1.0.2.tar.gz

  • 解压
[root@uplooking01 /soft]
    tar -zxvf apache-storm-1.0.2.tar.gz -C /opt
    mv apache-storm-1.0.2/ storm
  • 配置storm

storm-env.sh

[root@uplooking01 /soft]
    export JAVA_HOME=/opt/jdk
    export STORM_CONF_DIR="/opt/storm/conf"

storm.yaml

[root@uplooking01 /opt/storm/conf]


storm.zookeeper.servers:
  - "uplooking03"
  - "uplooking04"
  - "uplooking05"

#配置两个主节点,实现主节点的单点故障
nimbus.seeds: ["uplooking01", "uplooking02"]
storm.local.dir: "/opt/storm/storm-local"
#配置从节点的槽数
supervisor.slots.ports:
  - 6700
  - 6701
  - 6702
  - 6703
  • 分发到其他节点
[root@uplooking01 /]
    scp -r /opt/storm uplooking02:/opt
    scp -r /opt/storm uplooking03:/opt
    scp -r /opt/storm uplooking04:/opt
    scp -r /opt/storm uplooking05:/opt
  • 启动storm
[root@uplooking01 /]  
    #启动主进程和ui进程
    nohup /opt/storm/bin/storm nimbus >/dev/null 2>&1 &
    nohup /opt/storm/bin/storm ui >/dev/null 2>&1 &
    nohup /opt/storm/bin/storm logviewer >/dev/null 2>&1 &
[root@uplooking02 /]
    #启动主进程(numbus)
    nohup /opt/storm/bin/storm numbus >/dev/null 2>&1 &
    nohup /opt/storm/bin/storm logviewer >/dev/null 2>&1 &
#启动从节点进程(supervisor)
[root@uplooking03 /]
    nohup /opt/storm/bin/storm supervisor >/dev/null 2>&1 &
    nohup /opt/storm/bin/storm logviewer >/dev/null 2>&1 &
[root@uplooking04 /]
    nohup /opt/storm/bin/storm supervisor >/dev/null 2>&1 &
    nohup /opt/storm/bin/storm logviewer >/dev/null 2>&1 &
[root@uplooking05 /]
    nohup /opt/storm/bin/storm supervisor >/dev/null 2>&1 &
    nohup /opt/storm/bin/storm logviewer >/dev/null 2>&1 &

8. Storm集群的启动脚本

#!/bin/bash
#启动nimbus

for nimbusHost in  `cat /opt/shell/nimbus.host`
do
#-T 进制分配伪终端 一般自动化脚本不需要分配伪终端
ssh -T  root@${nimbusHost}    << eeooff
    nohup /opt/storm/bin/storm nimbus >/dev/null 2>&1 &
eeooff
done

#启动supervisor
for supervisorHost in  `cat /opt/shell/supervisor.host`
do
#-T 进制分配伪终端 一般自动化脚本不需要分配伪终端
ssh -T  root@${supervisorHost}    << eeooff
        nohup /opt/storm/bin/storm supervisor >/dev/null 2>&1 &
eeooff
done


#启动logviewer
for logviewerHost in  `cat /opt/shell/logviewer.host`
do
#-T 进制分配伪终端 一般自动化脚本不需要分配伪终端
ssh -T  root@${logviewerHost}    << eeooff
        nohup /opt/storm/bin/storm logviewer >/dev/null 2>&1 &
eeooff
done


#启动ui
for uiHost in  `cat /opt/shell/ui.host`
do
#-T 进制分配伪终端 一般自动化脚本不需要分配伪终端
ssh -T  root@${uiHost}    << eeooff
        nohup /opt/storm/bin/storm ui >/dev/null 2>&1 &
eeooff
done

9. Storm实现数字累加

  • 编写Spout
public class MySpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    //初始化累加的数字
    int num = 0;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        collector.emit(new Values(num));
        num++;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("mynum"));
    }
}
  • 编写Bolt
public class MyBolt extends BaseRichBolt {
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {

    }

    @Override
    public void execute(Tuple tuple) {
        Integer num = tuple.getIntegerByField("mynum");
        System.out.println(num);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }
}
  • 编写Topology
public class MyTopology {
    public static void main(String[] args) {
        //创建自定义的spout
        MySpout mySpout = new MySpout();
        //创建自定义的bolt
        MyBolt myBolt = new MyBolt();
        //创建topology名称
        String topologyName = "MyNumTopology";
        //创建topology的配置对象
        Map conf = new Config();

        //创建topology的构造器
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        //为topology设置spout和bolt
        topologyBuilder.setSpout("myspout", mySpout);
        topologyBuilder.setBolt("mybolt", myBolt).shuffleGrouping("myspout");

        //创建本地的topology提交器
        StormTopology stormTopology = topologyBuilder.createTopology();
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology(topologyName, conf, stormTopology);
    }
}

10. 多个Bolt的问题

  • 定义下一个Bolt
public class MyBolt02 extends BaseRichBolt {
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {

    }

    @Override
    public void execute(Tuple tuple) {
        System.out.println(tuple.getIntegerByField("mynum02") + ".....");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }
}
  • 第一个Bolt中给第二个Bolt发射数据
public class MyBolt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        Integer num = tuple.getIntegerByField("mynum");
        System.out.println(num);
        collector.emit(new Values(num));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("mynum02"));
    }
}
  • 在Topology中配置第二个Bolt
public class MyTopology {
    public static void main(String[] args) {
        //创建自定义的spout
        MySpout mySpout = new MySpout();
        //创建自定义的bolt
        MyBolt myBolt = new MyBolt();

        MyBolt02 myBolt02 = new MyBolt02();
        //创建topology名称
        String topologyName = "MyNumTopology";
        //创建topology的配置对象
        Map conf = new Config();

        //创建topology的构造器
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        //为topology设置spout和bolt
        topologyBuilder.setSpout("myspout", mySpout);
        topologyBuilder.setBolt("mybolt", myBolt).shuffleGrouping("myspout");
        topologyBuilder.setBolt("mybolt02", myBolt02).shuffleGrouping("mybolt");

        //创建本地的topology提交器
        StormTopology stormTopology = topologyBuilder.createTopology();
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology(topologyName, conf, stormTopology);
    }
}

11. 提交作业到集群

 StormSubmitter.submitTopology(topologyName, conf, stormTopology);

12. Storm的并行度

在storm中的并行度说的就是一个进程的运行需要多少个线程来参与,如果storm运行的线程个数+1,则并行度+1

Worker :

  • worker是一个进程级别的概念,可以通过jps查看的到
  • worker是一个Topology实例的子集,也就是说一个Topology的实例在supervisor中运行,可以在一个或者多个supervisor中启动一个或者多个worker进程
  • 一个worker进程只能为一个Topology实例服务
  • 所以Topology和worker的关系===>1:N
  • 进程是由多个线程来组成,这里的线程就是Executor
  • conf.setNumWorkers(int workers)
  • 所以worker和executor的关系===>1:N
  • 每一个executor线程具体干活是由一个个task任务的实例来完成的
  • 在builer.setSpout/setBolt的第三个参数设置
  • Task真正在topology干活的实例,一个executor线程,默认情况下对应了1个task的实例的
  • Executor和Task的关系===>1:N
  • builder.setSpout().setNumTasks(tasks)//设置的是spout对应的executor拥有几个task实例builder.setBolt().setNumTasks(tasks)//设置的是bolt对应的executor拥有几个task实例

13. Storm中的消息确认机制

  • 在spout中如果发送消息时指定messageId则代表开启消息确认机制,如果不指定messageID则代表不开启消息确认机制
  • 如果Spout中开启了消息确认机制则在bolt中需要用ack()方法来确认消息接收成功
  • 在Soput中重写响应的fail()和ack()方法来处理消息成功或者失败的回调逻辑
  • Storm默认如果不确认消息接收成功则30s之后返回消息失败
  • 消息确认机制要慎重使用(效率换取安全)
查看原文

赞 0 收藏 0 评论 0

EVAO_大个子 发布了文章 · 2019-06-14

大数据系列——shell的简单语法

1. Linux 简介

  • Linux内核最初只是由芬兰人李纳斯·托瓦兹(Linus Torvalds)在赫尔辛基大学上学时出于个人爱好而编写的
  • Linux是一套免费使用和自由传播的类Unix操作系统
  • Linux能运行主要的UNIX工具软件、应用程序和网络协议

2. Linux的发行版

  • Linux的发行版说简单点就是将Linux内核与应用软件做一个打
  • Ubuntu(图形化接口,个人用户操作比较良好)
  • RedHat(企业使用的比较多,图形化接口比较low,收费)
  • CentOS(免费 开源 图形化接口比较low)
  • Debian、Fedora、SuSE、OpenSUSE、Arch Linux、SolusOS

3. Linux应用领域

  • 作为企业服务器
  • 嵌入式

4. Linux vs Windows

比较WindowsLinux
界面界面统一,外壳程序固定所有Windows程序菜单几乎一致,快捷键也几乎相同图形界面风格因发行版不同而不同,可能互不兼容。GNU/Linux的终端机是从UNIX传承下来,基本命令和操作方法也几乎一致。
驱动程序驱动程序丰富,版本更新频繁。默认安装程序里面一般包含有该版本发布时流行的硬件驱动程序,之后所出的新硬件驱动依赖于硬件厂商提供。对于一些老硬件,如果没有了原配的驱动有时很难支持。另外,有时硬件厂商未提供所需版本的Windows下的驱动,也会比较头痛。由志愿者开发,由Linux核心开发小组发布,很多硬件厂商基于版权考虑并未提供驱动程序,尽管多数无需手动安装,但是涉及安装则相对复杂,使得新用户面对驱动程序问题(是否存在和安装方法)会一筹莫展。但是在开源开发模式下,许多老硬件尽管在Windows下很难支持的也容易找到驱动。HP、Intel、AMD等硬件厂商逐步不同程度支持开源驱动,问题正在得到缓解。
使用使用比较简单,容易入门。图形化界面对没有计算机背景知识的用户使用十分有利。图形界面使用简单,容易入门。文字界面,需要学习才能掌握。
学习系统构造复杂、变化频繁,且知识、技能淘汰快,深入学习困难。系统构造简单、稳定,且知识、技能传承性好,深入学习相对容易。
软件每一种特定功能可能都需要商业软件的支持,需要购买相应的授权。大部分软件都可以自由获取,同样功能的软件选择较少。

5. Linux的安装

刚开始已经安装过centeros6.5

6. Linux 系统启动过程

  • 内核的引导

    • 当计算机打开电源后,首先是BIOS开机自检,按照BIOS中设置的启动设备(通常是硬盘)来启动。

      操作系统接管硬件以后,首先读入 /boot 目录下的内核文件。

  • 运行 init

    • init 进程是系统所有进程的起点,你可以把它比拟成系统所有进程的老祖宗,没有这个进程,系统中任何进程都不会启动。

      init 程序首先是需要读取配置文件 /etc/inittab

  • 系统初始化

    • 在init的配置文件中有这么一行: si::sysinit:/etc/rc.d/rc.sysinit 它调用执行了/etc/rc.d/rc.sysinit,而rc.sysinit是一个bash shell的脚本,它主要是完成一些系统初始化的工作,rc.sysinit是每一个运行级别都要首先运行的重要脚本
  • 建立终端

    • rc执行完毕后,返回init。这时基本系统环境已经设置好了,各种守护进程也已经启动了。

      init接下来会打开6个终端,以便用户登录系统。在inittab中的以下6行就是定义了6个终端:

  • 用户登录系统

    • 命令行登录
    • ssh登录
    • 图形界面登录

7. Linux 系统目录结构

  • /bin bin是Binary的缩写, 这个目录存放着最经常使用的命令
  • /boot 这里存放的是启动Linux时使用的一些核心文件,包括一些连接文件以及镜像文件
  • /dev dev是Device(设备)的缩写, 该目录下存放的是Linux的外部设备,在Linux中访问设备的方式和访问文件的方式是相同的
  • /etc 这个目录用来存放所有的系统管理所需要的配置文件和子目录
  • /home 用户的主目录,在Linux中,每个用户都有一个自己的目录,一般该目录名是以用户的账号命名的
  • /lib 这个目录里存放着系统最基本的动态连接共享库,其作用类似于Windows里的DLL文件。几乎所有的应用程序都需要用到这些共享库
  • /lost+found 这个目录一般情况下是空的,当系统非法关机后,这里就存放了一些文件
  • /media inux系统会自动识别一些设备,例如U盘、光驱等等,当识别后,linux会把识别的设备挂载到这个目录下。
  • /mnt 系统提供该目录是为了让用户临时挂载别的文件系统的,我们可以将光驱挂载在/mnt/上,然后进入该目录就可以查看光驱里的内容了
  • /opt 这是给主机额外安装软件所摆放的目录。比如你安装一个ORACLE数据库则就可以放到这个目录下。默认是空的
  • /proc 这个目录是一个虚拟的目录,它是系统内存的映射,我们可以通过直接访问这个目录来获取系统信息。
  • /root 该目录为系统管理员,也称作超级权限者的用户主目录
  • /sbin s就是Super User的意思,这里存放的是系统管理员使用的系统管理程序
  • /selinux  这个目录是Redhat/CentOS所特有的目录,Selinux是一个安全机制,类似于windows的防火墙,但是这套机制比较复杂,这个目录就是存放selinux相关的文件的
  • /srv 该目录存放一些服务启动之后需要提取的数据
  • /sys  这是linux2.6内核的一个很大的变化。该目录下安装了2.6内核中新出现的一个文件系统 sysfs 。
  • /tmp 这个目录是用来存放一些临时文件的。
  • /usr 这是一个非常重要的目录,用户的很多应用程序和文件都放在这个目录下,类似于windows下的program files目录。
  • /usr/bin 系统用户使用的应用程序
  • /usr/sbin 超级用户使用的比较高级的管理程序和系统守护程序
  • /usr/src 内核源代码默认的放置目录。
  • /var 这个目录中存放着在不断扩充着的东西,我们习惯将那些经常被修改的目录放在这个目录下。包括各种日志文件

8. Linux中忘记密码

  • !进入单用户模式更改一下root密码即可
  • 3 秒之内要按一下回车
  • 然后输入a
  • 在 末尾输入 (single s S)其中的一个,有一个空格
  • 最后按回车启动,启动后就进入了单用户模式了
  • 此时已经进入到单用户模式了,你可以更改root密码了。更密码的命令为 passwd

9. Linux的远程登录

  • ssh(putty mutty Secure CRT xshell)

10. Linux 文件基本属性

  • 当为[ d ]则是目录
  • 当为[ - ]则是文件;
  • 若是[ l ]则表示为链接文档(link file); ln -s 源文件 连接文件

11. 文件的权限

  • chmod u+x file
  • chmod u=wx file
  • chmod 421 file

12. Shell简介

  • Shell 是一个用 C 语言编写的程序,它是用户使用 Linux 的桥梁
  • Shell 既是一种命令语言,又是一种程序设计语言
  • Shell 是指一种应用程序,这个应用程序提供了一个界面,用户通过这个界面访问操作系统内核的服务

13. Shell脚本

  • Shell 脚本(shell script),是一种为 shell 编写的脚本程序。

14. Shell 环境

  • 编辑器+解释器
  • 解释器

    • Bourne Shell(/usr/bin/sh或/bin/sh)
    • Bourne Again Shell(/bin/bash)
    • C Shell(/usr/bin/csh )
    • K Shell(/usr/bin/ksh)
    • Shell for Root(/sbin/sh)

15. Shell变量

  • 定义变量

    • 操作符左右不能有空格
  • 使用变量

    • 在变量名称之前添加$
  • 变量类型

    • 局部变量
    • 环境变量
    • shell变量

16. Shell 字符串

  • 单引号
    • 单引号中的变量都会原样输出,不会进行解析
  • 双引号
    • 双引号中的变量都会被解析

17. Shell 数组

  • 定义数组

    • ages=(12,14,16)

18.Shell参数传递

  • $n ---->n为参数的下标,从0开始,0代表脚本本身的名称
  • $# 参数的个数
  • $? 上一条命令执行的结果 返回0代表成功,其他代表失败

19. Shell中的运算符

  • 算术运算符

运算符说明举例
+加法expr $a + $b 结果为 30。
-减法expr $a - $b 结果为 -10。
*乘法expr $a \* $b 结果为 200。
/除法expr $b / $a 结果为 2。
%取余expr $b % $a 结果为 0。
=赋值a=$b 将把变量 b 的值赋给 a。
==相等。用于比较两个数字,相同则返回 true。[ $a == $b ] 返回 false。
!=不相等。用于比较两个数字,不相同则返回 true。[ $a != $b ] 返回 true。

条件表达式要放在方括号之间,并且要有空格,例如: [$a==$b] 是错误的,必须写成 [ $a == $b ]

  • 关系运算符

运算符说明举例
-eq检测两个数是否相等,相等返回 true。[ $a -eq $b ] 返回 false。
-ne检测两个数是否不相等,不相等返回 true。[ $a -ne $b ] 返回 true。
-gt检测左边的数是否大于右边的,如果是,则返回 true。[ $a -gt $b ] 返回 false。
-lt检测左边的数是否小于右边的,如果是,则返回 true。[ $a -lt $b ] 返回 true。
-ge检测左边的数是否大于等于右边的,如果是,则返回 true。[ $a -ge $b ] 返回 false。
-le检测左边的数是否小于等于右边的,如果是,则返回 true。[ $a -le $b ] 返回 true。
  • 布尔运算符

运算符说明举例
!非运算,表达式为 true 则返回 false,否则返回 true。[ ! false ] 返回 true。
-o或运算,有一个表达式为 true 则返回 true。[ $a -lt 20 -o $b -gt 100 ] 返回 true。
-a与运算,两个表达式都为 true 才返回 true。[ $a -lt 20 -a $b -gt 100 ] 返回 false。
  • 逻辑运算符

运算符说明举例
&&逻辑的 AND[[ $a -lt 100 && $b -gt 100 ]] 返回 false
\\逻辑的 OR[[ $a -lt 100$b -gt 100 ]] 返回 true

20. Shell 流程控制

  • if语句
if condition
then
    command1 
    command2
    ...
    commandN 
fi
  • if.else语句
if condition
then
    command1 
    command2
    ...
    commandN
else
    command
fi
  • if..elif..else
if condition1
then
    command1
elif condition2 
then 
    command2
else
    commandN
fi
  • for循环
for var in item1 item2 ... itemN
do
    command1
    command2
    ...
    commandN
done
  • for循环遍历数组
#!/bin/bash
ages=(12 14 16)
for element in ${ages[@]}
do
echo $element
done
  • while
while condition
do
    command
done
  • 无限循环
while :
do
    command
done

或者

while true
do
    command
done

21. Shell中的函数

  • 定义函数
[ function ] funname ()

{
    action;
    [return int;]
}
  • 调用函数

    函数名称

22. Shell 输入/输出重定向

输出重定向: command1 > file1
输出追加: command1 >> file1

23. 常用的Shell脚本

23.1 启动zookeeper集群

#!/bin/bash
#在zkHosts中配置的主机中启动zookeeper
for zkHost in  `cat ./zkHosts`
do
#-T 进制分配伪终端 一般自动化脚本不需要分配伪终端
ssh -T  root@${zkHost}    << eeooff
         $ZOOKEEPER_HOME/bin/zkServer.sh  start
eeooff
done

23.2 停止zookeeper集群

#!/bin/bash
#在zkHosts中配置的主机中停止zookeeper
for zkHost in  `cat ./zkHosts`
do
#-T 进制分配伪终端 一般自动化脚本不需要分配伪终端
ssh -T  root@${zkHost} >/dev/null 2>&1  << eeooff
         $ZOOKEEPER_HOME/bin/zkServer.sh  stop          
eeooff
done
查看原文

赞 0 收藏 0 评论 0

EVAO_大个子 发布了文章 · 2019-06-14

大数据系列——Redis学习笔记

1. Redis的简介

  •    Redis是一个开源(BSD许可),内存存储的数据结构服务器,可用作数据库,高速缓存和消息队列代理
  • 它支持字符串、哈希表、列表、集合、有序集合,位图,hyperloglogs等数据类型
  • 内置复制、Lua脚本、LRU收回、事务以及不同级别磁盘持久化功能,同时通过Redis Sentinel提供高可用,通过Redis Cluster提供自动分区。
  • 简言之,Redis是一种面向“键/值”对数据类型的内存数据库,可以满足我们对海量数据的快速读写需求

2. Redis的特点

  • Redis支持数据的持久化,可以将内存中的数据保存在磁盘中,重启的时候可以再次加载进行使用
  • Redis不仅仅支持简单的k-v类型的数据,同时还提供list,set,zset,hash等数据结构的存储
  • Redis支持数据的备份,即master-slave主从模式的数据备份

3. Redis的优势

  • 性能极高——Redis读的速度为11w/s,写的速度为8.1w/s
  • 丰富的数据类型 (Strings,Lists,Hashes,Sets即Ordered Sets )
  • 原子性——Redis的所有操作都是原子性的,同时Redis还支持对几个操作合并后的原子性执行
  • 丰富的特性——Redis还支持publish/subscribe,通知,key过期等特性。

4. 常用的NoSql数据库

  • hbase

    • 存储海量的数据(数十亿行,百万列)
  • redis

    • 基于内存的nosql数据库
    • 一般使用redis做缓存或者队列
  • mogodb

    • 在结构化数据比较简单的情况下,可以使用mogodb代替mysql,作为一个高性能的数据库

5. Redis的安装

5.1 Redis单机版的安装

  • 准备源码安装包

    ​ redis-3.2.0.tar.gz

  • 解压

    ​ tar -zxvf redis-3.2.0.tar.gz -C /opt/

  • 重命名

    ​ mv redis-3.2.0/ redis

  • 安装gcc编译器

    ​ yum -y install gcc

  • 编译安装源码

    ​ make && make install

    注意:编译安装完成之后会在src目录下出现redis的相关命令

  • 修改redis.conf的配置项

    bind node1   (默认是127.0.0.1,这是本地回环地址,只能本地应用程序之间进行通信)
    daemonize yes(后台运行)
    logfile   /opt/redis/logs/redis.log(日志文件,目录必须存在)    
  • 启动Redis-Server

    src/redis-server redis.conf

  • 启动redis客户端

    src/redis-cli -h uplooking05 -p 6379

5.2 Redis集群的安装

  • 把前面安装好的单机版的redis拷贝到uplooking04 uplooking03
  • 修改redis.conf

bind uplookingxxxxx

  • 在从节点(uplooking04 uplooking05) redis.conf 末尾追加

slaveof uplooking03 6379

  • 修改从节点为只读不写(redis官方推荐的)(uplooking04 uplooking05)

slave-read-only yes

  • 修改主节点(uplooking03)

slave-read-only no

ps:注意启动集群时尽量先启动主节点再启动从节点

6. Redis中配置的查看

Redis的配置文件位于Redis的安装目录之下,文件名为redis.conf。可以通过config命令来查看或设置配置项

config get confName: 获取配置的值

config set confName confVal: 设置配置的值

7. Redis中的数据类型

  • string(字符串)
  • hash(哈希)
  • list(列表)
  • set(集合)
  • zset(sorted set)有序的集合

8. 数据类型之String(字符串)

  • string是redis中最基本的数据类型
  • 一个key对应一个value
  • string类型的value是基于二进制存储的,所以完全可以存储图片和影音等二进制数据
  • key的大小最大512mb
  • 存储数据: set name value
  • 读取数据: get name
  • 删除redis中的key说对应的value: del key

9. 数据类型之Hash(哈希)

  • Redis hash 是一个键值对集合。Redis hash是一个string类型的field和value的映射表,hash特别适合用于存储对象 object
  • 存储一个数据: HSET KEY fieldName value
  • 获取数据: HGET KEY fieldName
  • 设置多个数据: HMSET info sex nan address gansu
  • 获取多个数据: HMGET info sex address
  • 删除hash中的多个个键值对: HDEL info admin ...
  • 删除redis中的key说对应的value: del key

10. 数据类型之List(列表)

Redis列表是简单的字符串列表,按照插入顺序排序。你可以添加一个元素到列表的头部(左边)或者尾部(右边)

  • 首部添加数据: LPUSHloves java c++ python..
  • 尾部添加元素: RPUSH linux hadoop..
  • 查看数据: lrange loves 0 10000
  • 首部移除元素: LPOP loves
  • 尾部移除元素: RPOP loves

列表最多可存储 232-1元素 (4294967295, 每个列表可存储40多亿)。

11. 数据类型之Set(集合)

Redis的Set是string类型的无序集合。

  • 集合中添加元素: SADD key member member ,member ...
  • SMEMBERS loves
  • 集合中的元素是不能重复,如果添加的value已经存在在redis中则返回0,不存在则返回1

12. 数据类型之ZSet(有序集合)

  • Redis zset和set一样也是string类型元素的集合,且不允许重复的成员。不同的是每个元素都会关联一个double类型的分数。redis正是通过分数来为集合中的成员进行从小到大的排序
  • zset的成员是唯一的,但分数(score)却可以重复
  • 给zset添加数据: ZADD loves 5 java
  • 获取固定得分范围的数据: ZRANGEBYSCORE loves 1 20

13. Redis中的常用命令

13.1 键值对相关

  • keys * 取出当前所有的key
  • exists name 查看redis是否有name这个key
  • del name 删除key name
  • expire confirm 100 设置confirm这个key100秒过期
  • ttl confirm 获取confirm 这个key的有效时长
  • persist confirm 移除confirm这个key的过期时间
  • select 0 (在redis中总共有16个数据库,默认是0号数据库)
  • move confirm 1 将当前数据库中的key移动到其他的数据库中
  • randomkey 随机返回数据库里面的一个key
  • rename key2 key3 重命名key2 为key3
  • type key2 返回key的数据类型

13.2 服务器相关

  • select 0~15 编号的数据库
  • quit /exit 退出客户端
  • dbsize 返回当前数据库中所有key的数量
  • info 返回redis的相关信息
  • ==flushdb 删除当前选择数据库中的所有key==
  • ==flushall 删除所有数据库中的数据==

14. Reds中的API操作

  • pom依赖

    <dependency>
                <groupId>redis.clients</groupId>
                <artifactId>jedis</artifactId>
                <version>2.9.0</version>
            </dependency>
  • 常用操作

    package com.uplooking.bigdata.redis;
    
    import org.junit.Test;
    import redis.clients.jedis.Jedis;
    
    public class RedisTest {
        @Test
        public void testString() {
            //创建Jedis
            Jedis jedis = new Jedis("uplooking03");
            jedis.select(0);
            jedis.set("name", "value");
            System.out.println(jedis.get("name"));
        }
    
    
        @Test
        public void testHash() {
            //创建Jedis
            Jedis jedis = new Jedis("uplooking03");
            jedis.select(0);
            jedis.hset("info", "name", "admin");
            jedis.hset("info", "age", 12 + "");
            System.out.println(jedis.hget("info", "name"));
        }
    
    
        @Test
        public void testList() {
            //创建Jedis
            Jedis jedis = new Jedis("uplooking03");
            jedis.select(0);
            jedis.rpush("loves", "java", "c++", "python");
            jedis.lpop("loves");
            System.out.println(jedis.lrange("loves", 0, 10000));
            System.out.println(jedis.rpop("loves"));
            System.out.println(jedis.lrange("loves", 0, 10000));
        }
    }
    

15. Redis可视化工具

RedisDesktopManager

查看原文

赞 1 收藏 1 评论 0

EVAO_大个子 发布了文章 · 2019-06-14

大数据系列——kafka学习笔记

1. 大数据领域数据类型

1.1 有界数据

一般批处理(一个文件 或者一批文件),不管文件多大,都是可以度量

mapreduce hive sparkcore sparksql

1.2 无界数据

源源不断的流水一样 (流数据)

Storm SparkStreaming

2. 消息队列(Message Queue)

  • 消息 Message

    • 网络中的两台计算机或者两个通讯设备之间传递的数据,例如说:文本、音乐、视频等内容
  • 队列 Queue

    • 一种特殊的线性表(数据元素首尾相接),特殊之处在于只允许在首部移除元素和在尾部追加元素。入队、出队。
  • 消息队列 MQ

    • 消息+队列
    • 保存消息的队列
    • 消息的传输过程中的容器
    • 主要提供生产、消费接口供外部调用做数据的存储和获取

3. 消息队列的分类

3.1 点对点(P2P)

  • 一个生产者生产的消息只能被一个消费者消费

3.2 发布订阅(Pub/Sub)

消息队列(Queue)、主题(Topic)、发布者(Publisher)、订阅者(Subscriber)

  • 消息的发布者
  • 消息的订阅者

    每个消息可以有多个消费者,彼此互不影响。比如我发布一个微博:关注我的人都能够看到。

4. Kafka的简介

  • 在大数据领域呢,为了满足日益增长的数据量,也有一款可以满足百万级别消息的生成和消费,分布式、持久稳定的产品——Kafka
  • Kafka是分布式的发布—订阅消息系统(基于PS的一个消息队列)
  • 它最初由LinkedIn(领英)公司发布,使用Scala语言编写
  • Kafka是一个高吞吐量的、持久性的、分布式发布订阅消息系统
  • 它主要用于处理活跃的数据(登录、浏览、点击、分享、喜欢等用户行为产生的数据

5. Kafka的特点

  • 高吞吐量

    • 可以满足每秒百万级别消息的生产和消费(生产消费 )
  • 持久性

    • 有一套完善的消息存储机制,确保数据的高效安全的持久化 (数据的存储)
  • 分布式

    • 基于分布式的扩展和容错机制;Kafka的数据都会复制到几台服务器上。当某一台故障失效时,生产者和消费者转而使用其它的机器——整体健壮性

6. Kafka的组件

  • 一个消息队列需要哪些部分?

    • 生产
    • 消费
    • 消息类别
    • 存储等等
  • Topic(主题)

    • Kafka处理的消息的不同分类
  • Broker (消息代理)

    • Kafka集群中的一个kafka服务节点称为一个broker,主要存储消息数据,存在硬盘中。每个topic都是有分区的
  • Partition (物理上的分区)

    • 一个topic在broker中被分为1个或者多个partition,分区在创建topic的时候指定
  • Message (消息)

    • 消息,是通信的基本单位,每个消息都属于一个partition

7. Kafka的服务

  • Producer : 消息和数据的生产者,向Kafka的一个topic发布消息
  • Consumer :消息和数据的消费者,定于topic并处理其发布的消息
  • Zookeeper :协调kafka的正常运行

8. Kafka的安装

8.1 单机版的安装

  • 准备kafka

    • kafka_2.10-0.10.0.1.tgz
  • 解压kafka

    • tar -zxvf kafka_2.10-0.10.0.1.tgz -C /opt/
  • 重命名

    • mv kafka_2.10-0.10.0.1.tgz kafka
  • 配置环境变量

    export KAFKA_HOME=/opt/kafka
    export PATH=$PATH:$KAFKA_HOME/bin
  • 编辑server.properties

    broker.id=1
    log.dirs=/opt/kafka/logs
    zookeeper.connect=uplooking03:2181,uplooking04:2181,uplooking05:2181
    listeners=PLAINTEXT://:9092          
  • 启动kafka-server服务

    kafka-server-start.sh [-daemon] server.properties
  • 停止kafka服务

     kafka-server-stop.sh

8.2 集群的安装

只需要在每个机器上修改对应的 ==broker.id=1== 即可

9. Kafka中Topic的操作

  • 创建topic

    kafka-topics.sh  --create --topic t1 --partitions 3 --replication-factor 1  --zookeeper uplooking03:2181,uplooking04:2181

    ==注意: 创建topic过程的问题,replication-factor个数不能超过brokerserver的个数==

  • 查看topic

    kafka-topics.sh  --list --zookeeper uplooking03
  • 查看具体topic的详情

    kafka-topics.sh  --describe --topic t1 --zookeeper uplooking04:2181
    PartitionCount:topic对应的partition的个数
    ReplicationFactor:topic对应的副本因子,说白就是副本个数
    Partition:partition编号,从0开始递增
    Leader:当前partition起作用的breaker.id
    Replicas: 当前副本数据存在的breaker.id,是一个列表,排在最前面的其作用
    Isr:当前kakfa集群中可用的breaker.id列表    
  • 修改topic(不能修改replication-factor,以及只能对partition个数进行增加,不能减少 )

    kafka-topics.sh --alter --topic t1 --partitions 4 --zookeeper uplooking03
  • 删除Topic

    kafka-topics.sh --delete --topic t1 --zookeeper uplooking03

    ps:这种删除只是标记删除,要想彻底删除必须设置一个属性,在server.properties中配置delete.topic.enable=true,否则只是标记删除

    配置完成之后,需要重启kafka服务

10. Kafka中的生产者和消费者接口

  • 自己写代码实现kafka提供的消息生产和消费的接口
  • kafka自身也实现了自身的生产和消费的接口,给出了两个工具(kafka-console-producer.sh , kafka-console-consumer.sh)

11. Kafka自带的生产和消费消息的工具

11.1 kafka-console-producer.sh(生产工具)

kafka-console-producer.sh --topic t1  --broker-list uplooking03:9092,uploo
king04:9092,uplooking05:9092

11.2 kafka-console-consumer.sh(消费工具)

kafka-console-consumer.sh  --zookeeper uplooking03 --topic t1
--from-beginning:从头开始消费
--blacklist:黑名单过滤(kafka-console-consumer.sh  --zookeeper uplooking03   --blacklist t1,t3)
--whitelist:白名单过滤(kafka-console-consumer.sh  --zookeeper uplooking03   --whitelist t2)    

ps:--topic|--blacklist|--whitelist 只能出现其中一个

12. ==Flume与Kafka的整合==

  • 配置flume的agent配置文件

    touch flume-kafka.properties

    # 对各个组件的描述说明
    # 其中a1为agent的名字
    # r1是a1的source的代号名字
    # c1是a1的channel的代号名字
    # k1是a1的sink的代号名字
    ############################################
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # 用于描述source的,类型是netcat网络
    a1.sources.r1.type = netcat
    # source监听的网络ip地址和端口号
    a1.sources.r1.bind = uplooking01
    a1.sources.r1.port = 44444
    
    
    
    # 用于描述sink,类型是kafka
    
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.topic = hadoop
    a1.sinks.k1.brokerList = uplooking03:9092,uplooking04:9092,uplooking05:9092
    a1.sinks.k1.requiredAcks = 1
    a1.sinks.k1.batchSize = 2
    
    
    # 用于描述channel,在内存中做数据的临时的存储
    a1.channels.c1.type = memory
    # 该内存中最大的存储容量,1000个events事件
    a1.channels.c1.capacity = 1000
    # 能够同时对100个events事件监管事务
    a1.channels.c1.transactionCapacity = 100
    
    
    # 将a1中的各个组件建立关联关系,将source和sink都指向了同一个channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
  • 启动flume开始采集数据

    [root@uplooking01:/opt/flume/conf]
        flume-ng agent --name a1 --conf-file flume-kafka.properties
  • 开启Kafka消息消费工具

    [root@uplooking03:/opt/flume/conf]
        kafka-console-consumer.sh  --zookeeper uplooking03 --topic hadoop
  • 给flume监听的Source发送数据

    [root@uplooking03:/]
        nc uplooking01 44444
  • 现在就可以到kafka的消费工具(kafka-console-consumer.sh)中区查看nc发送的数据

13. Kafka的API操作(生产者和消费者)

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.10</artifactId>
  <version>0.10.0.1</version>
</dependency>

13.1 Kafka的生产者

  • 创建生产者的配置文件 producer.properties

    bootstrap.servers=uplooking03:9092,uplooking04:9092,uplooking05:9092
    key.serializer=org.apache.kafka.common.serialization.StringSerializer
    value.serializer=org.apache.kafka.common.serialization.StringSerializer
  • 创建生产者并且发送数据到topic中

    public class MyKafkaProducer {
        public static void main(String[] args) throws IOException {
            Properties prop = new Properties();
            prop.load(MyKafkaProducer.class.getClassLoader().getResourceAsStream("producer.properties"));
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(prop);
            kafkaProducer.send(new ProducerRecord<String, String>("hadoop", "name", "admin123"));
            kafkaProducer.close();
        }
    }

13.2 Kafka的消费者

  • 创建消费者的配置文件consumer.properties

    zookeeper.connect=uplooking03:2181,uplooking04:2181,uplooking05:2181
    group.id=test-consumer-group
    bootstrap.servers=uplooking03:9092,uplooking04:9092,uplooking05:9092
    key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
  • 创建消息消费者消费topic中的数据

    public static void main(String[] args) throws Exception {
        Properties prop = new Properties();
        prop.load(MyKafkaConsumer.class.getClassLoader().getResourceAsStream("consumer.properties"));
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(prop);
        Collection topics = new ArrayList();
        topics.add("hadoop");
        kafkaConsumer.subscribe(topics);
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }
  • 自定义分区(MyCustomPartition)

    package com.uplooking.bigdata.kafka.partition;
    public class MyCustomPartition implements Partitioner {
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster)  {
    //获取分区数,    分区编号一般都是从0开始
    int partitionSize = cluster.partitionCountForTopic(topic);
    int keyHash = Math.abs(key.hashCode());
    int valueHash = Math.abs(value.hashCode());
    return keyHash % partitionSize;
    }
    public void close() {
    }
    public void configure(Map<String, ?> configs) {
    }
    }

    配置自定义分区(producer.properties)

    partitioner.class=com.uplooking.bigdata.kafka.partition.MyCustomPartition
查看原文

赞 0 收藏 0 评论 0

EVAO_大个子 发布了文章 · 2019-06-14

大数据系列——Flume入门和认识

1. Flume简介

  • Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统
  • 支持在日志系统中定制各类数据发送方,用于收集数据
  • Flume提供对数据进行简单处理,并写到各种数据接收方

2. Flume OG 与Flume NG

Flume OG:Flume original generation,即Flume0.9x版本

Flume NG:Flume next generation,即Flume1.x版本

3. Flume体系结构

flume的事件(agent)

Source: 用来定义采集系统的源头

Channel: 把Source采集到的日志进行传输,处理

Sink:定义数据的目的地

4. Flume的安装

  • 准备安装文件

    • apache-flume-1.6.0-bin.tar.gz
  • 解压

    • tar -zxvf apache-flume-1.6.0-bin.tar.gz -C /opt/
  • 重命名

    • mv apache-flume-1.6.0-bin flume
  • 添加环境变量

    #配置Flume的环境变量
    export FLUME_HOME=/opt/flume
    export PATH=$PATH:$FLUME_HOME/bin
  • 配置文件

    [root@uplooking01 /opt/flume/conf]
        mv flume-env.sh.template flume-env.sh
    [root@uplooking01 /opt/flume/conf/flume-env.sh]
        export JAVA_HOME=/opt/jdk        

5. Flume采集网络端口数据

5.1 定义flume的事件配置文件

flume-nc.properties

# flume-nc.conf: 用于监听网络数据的flume agent实例的配置文件
############################################
# 对各个组件的描述说明
# 其中a1为agent的名字
# r1是a1的source的代号名字
# c1是a1的channel的代号名字
# k1是a1的sink的代号名字
############################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 用于描述source的,类型是netcat网络
a1.sources.r1.type = netcat
# source监听的网络ip地址和端口号
a1.sources.r1.bind = uplooking01
a1.sources.r1.port = 44444

# 用于描述sink,类型是日志格式
a1.sinks.k1.type = logger


# 用于描述channel,在内存中做数据的临时的存储
a1.channels.c1.type = memory
# 该内存中最大的存储容量,1000个events事件
a1.channels.c1.capacity = 1000
# 能够同时对100个events事件监管事务
a1.channels.c1.transactionCapacity = 100

# 将a1中的各个组件建立关联关系,将source和sink都指向了同一个channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  • 启动agent(flume事件)

    flume-ng agent --name a1 --conf /opt/flume/conf --conf-file /opt/flume/conf/flume-nc.properties -Dflume.root.logger=INFO,console

  • 启动agent(flume事件) ,简单化

    flume-ng agent --name a1 --conf-file /opt/flume/conf/flume-nc.properties

6. 安装nc(瑞士军刀)

rpm -ivh nc-1.84-22.el6.x86_64.rpm

监听端口: nc -lk 4444

连接主机的端口: nc host port

7. 查看文件的命令

cat xxx: 查看文件的全部内容

head -100 xxx:查看文件内容开始的100行(默认不加-100是查看10行)

tail -100 xxx:查看文件内容末尾的100行(默认不加-100是查看10行)

tail -f xxx:查看文件内容末尾10行,并且监听文件的变化

8. Flume基本的组件

8.1 Source

  • 从外界采集各种类型的数据,将数据传递给Channel
  • 常见采集的数据类型

    • NetCat Source
    • Spooling Directory Source }
    • Exec Source
    • Kafka Source (后面说)
    • HTTP Source

8.2 Channel

  • 一个数据的存储池,中间通道
  • 接受source传出的数据,向sink指定的目的地传输。Channel中的数据直到进入到下一个channel中或者进入终端才会被删除。当sink写入失败后,可以自动重写,不会造成数据丢失,因此很可靠
  • channel的类型 :

    • Memory Channel (常用) 使用内存作为数据的存储。速度快
    • File Channel 使用文件来作为数据的存储。安全可靠
    • Spillable Memory Channel 用内存和文件作为数据的存储,即:先存在内存中,如果内存中数据达到阀值则flush到文件中
    • JDBC Channel 使用jdbc数据源来作为数据的存储。
    • Kafka Channel 使用kafka服务来作为数据的存储

8.3 Sink

  • 数据的最终的目的地
  • 接受channel写入的数据以指定的形式表现出来(或存储或展示)。

    sink的表现形式很多比如:打印到控制台、hdfs上、avro服务中、文件中等

  • 常见采集的数据类型

    • HDFS Sink
    • Hive Sink
    • Logger Sink
    • Avro Sink
    • HBaseSink
    • File Roll Sink

8.4 Event

event是Flume NG传输的数据的基本单位,也是事务的基本单位

在文本文件,通常是一行记录就是一个event。

网络消息传输系统中,一条消息就是一个event。

event里有header、body

Event里面的header类型:Map<String, String>

我们可以在source中自定义header的key:value,在某些channel和sink中使用header

9. Flume监听命令的执行结果

配置agent

############################################
# 对各个组件的描述说明
# 其中a1为agent的名字
# r1是a1的source的代号名字
# c1是a1的channel的代号名字
# k1是a1的sink的代号名字
############################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 用于描述source的,类型是linux命令
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F  /opt/flume/conf/logs/test01.txt

# 用于描述sink,类型是日志格式
a1.sinks.k1.type = logger

# 用于描述channel,在内存中做数据的临时的存储
a1.channels.c1.type = memory
# 该内存中最大的存储容量,1000个events事件
a1.channels.c1.capacity = 1000
# 能够同时对100个events事件监管事务
a1.channels.c1.transactionCapacity = 100

# 将a1中的各个组件建立关联关系,将source和sink都指向了同一个channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

运行agent,开始采集数据

flume-ng agent --name a1 --conf-file flume-exec.properties

10. 采集目录中新增文件的数据

配置agent

############################################
# 对各个组件的描述说明
# 其中a1为agent的名字
# r1是a1的source的代号名字
# c1是a1的channel的代号名字
# k1是a1的sink的代号名字
############################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 用于描述source的,监听的是一个目录
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /opt/flume/conf/logs/aa
a1.sources.r1.fileSuffix = .OK
a1.sources.r1.deletePolicy = never
a1.sources.r1.fileHeader = true

# 用于描述sink,类型是日志格式
a1.sinks.k1.type = logger

# 用于描述channel,在内存中做数据的临时的存储
a1.channels.c1.type = memory
# 该内存中最大的存储容量,1000个events事件
a1.channels.c1.capacity = 1000
# 能够同时对100个events事件监管事务
a1.channels.c1.transactionCapacity = 100

# 将a1中的各个组件建立关联关系,将source和sink都指向了同一个channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1    

运行agent,开始采集数据

flume-ng agent --name a1 --conf-file flume-dir.properties

11. 后台运行flume

nohup flume-ng agent --name a1 --conf-file flume-dir.properties >dev/null 2>&1 &

查看原文

赞 1 收藏 0 评论 0

认证与成就

  • 获得 19 次点赞
  • 获得 1 枚徽章 获得 0 枚金徽章, 获得 0 枚银徽章, 获得 1 枚铜徽章

擅长技能
编辑

开源项目 & 著作
编辑

(゚∀゚ )
暂时没有

注册于 2019-06-05
个人主页被 1.3k 人浏览