黑白影

黑白影 查看完整档案

填写现居城市  |  填写毕业院校  |  填写所在公司/组织填写个人主网站
编辑
_ | |__ _ _ __ _ | '_ \| | | |/ _` | | |_) | |_| | (_| | |_.__/ \__,_|\__, | |___/ 个人简介什么都没有

个人动态

黑白影 发布了文章 · 2019-09-21

Hadoop 系列(八)—— 基于 ZooKeeper 搭建 Hadoop 高可用集群

一、高可用简介

Hadoop 高可用 (High Availability) 分为 HDFS 高可用和 YARN 高可用,两者的实现基本类似,但 HDFS NameNode 对数据存储及其一致性的要求比 YARN ResourceManger 高得多,所以它的实现也更加复杂,故下面先进行讲解:

1.1 高可用整体架构

HDFS 高可用架构如下:

https://github.com/heibaiying

图片引用自:https://www.edureka.co/blog/h...

HDFS 高可用架构主要由以下组件所构成:

  • Active NameNode 和 Standby NameNode:两台 NameNode 形成互备,一台处于 Active 状态,为主 NameNode,另外一台处于 Standby 状态,为备 NameNode,只有主 NameNode 才能对外提供读写服务。
  • 主备切换控制器 ZKFailoverController:ZKFailoverController 作为独立的进程运行,对 NameNode 的主备切换进行总体控制。ZKFailoverController 能及时检测到 NameNode 的健康状况,在主 NameNode 故障时借助 Zookeeper 实现自动的主备选举和切换,当然 NameNode 目前也支持不依赖于 Zookeeper 的手动主备切换。
  • Zookeeper 集群:为主备切换控制器提供主备选举支持。
  • 共享存储系统:共享存储系统是实现 NameNode 的高可用最为关键的部分,共享存储系统保存了 NameNode 在运行过程中所产生的 HDFS 的元数据。主 NameNode 和 NameNode 通过共享存储系统实现元数据同步。在进行主备切换的时候,新的主 NameNode 在确认元数据完全同步之后才能继续对外提供服务。
  • DataNode 节点:除了通过共享存储系统共享 HDFS 的元数据信息之外,主 NameNode 和备 NameNode 还需要共享 HDFS 的数据块和 DataNode 之间的映射关系。DataNode 会同时向主 NameNode 和备 NameNode 上报数据块的位置信息。

1.2 基于 QJM 的共享存储系统的数据同步机制分析

目前 Hadoop 支持使用 Quorum Journal Manager (QJM) 或 Network File System (NFS) 作为共享的存储系统,这里以 QJM 集群为例进行说明:Active NameNode 首先把 EditLog 提交到 JournalNode 集群,然后 Standby NameNode 再从 JournalNode 集群定时同步 EditLog,当 Active NameNode 宕机后, Standby NameNode 在确认元数据完全同步之后就可以对外提供服务。

需要说明的是向 JournalNode 集群写入 EditLog 是遵循 “过半写入则成功” 的策略,所以你至少要有 3 个 JournalNode 节点,当然你也可以继续增加节点数量,但是应该保证节点总数是奇数。同时如果有 2N+1 台 JournalNode,那么根据过半写的原则,最多可以容忍有 N 台 JournalNode 节点挂掉。

https://github.com/heibaiying

1.3 NameNode 主备切换

NameNode 实现主备切换的流程下图所示:

https://github.com/heibaiying

  1. HealthMonitor 初始化完成之后会启动内部的线程来定时调用对应 NameNode 的 HAServiceProtocol RPC 接口的方法,对 NameNode 的健康状态进行检测。
  2. HealthMonitor 如果检测到 NameNode 的健康状态发生变化,会回调 ZKFailoverController 注册的相应方法进行处理。
  3. 如果 ZKFailoverController 判断需要进行主备切换,会首先使用 ActiveStandbyElector 来进行自动的主备选举。
  4. ActiveStandbyElector 与 Zookeeper 进行交互完成自动的主备选举。
  5. ActiveStandbyElector 在主备选举完成后,会回调 ZKFailoverController 的相应方法来通知当前的 NameNode 成为主 NameNode 或备 NameNode。
  6. ZKFailoverController 调用对应 NameNode 的 HAServiceProtocol RPC 接口的方法将 NameNode 转换为 Active 状态或 Standby 状态。

1.4 YARN高可用

YARN ResourceManager 的高可用与 HDFS NameNode 的高可用类似,但是 ResourceManager 不像 NameNode ,没有那么多的元数据信息需要维护,所以它的状态信息可以直接写到 Zookeeper 上,并依赖 Zookeeper 来进行主备选举。

https://github.com/heibaiying

二、集群规划

按照高可用的设计目标:需要保证至少有两个 NameNode (一主一备) 和 两个 ResourceManager (一主一备) ,同时为满足“过半写入则成功”的原则,需要至少要有 3 个 JournalNode 节点。这里使用三台主机进行搭建,集群规划如下:

https://github.com/heibaiying

三、前置条件

四、集群配置

4.1 下载并解压

下载 Hadoop。这里我下载的是 CDH 版本 Hadoop,下载地址为:http://archive.cloudera.com/c...

# tar -zvxf hadoop-2.6.0-cdh5.15.2.tar.gz 

4.2 配置环境变量

编辑 profile 文件:

# vim /etc/profile

增加如下配置:

export HADOOP_HOME=/usr/app/hadoop-2.6.0-cdh5.15.2
export  PATH=${HADOOP_HOME}/bin:$PATH

执行 source 命令,使得配置立即生效:

# source /etc/profile

4.3 修改配置

进入 ${HADOOP_HOME}/etc/hadoop 目录下,修改配置文件。各个配置文件内容如下:

1. hadoop-env.sh

# 指定JDK的安装位置
export JAVA_HOME=/usr/java/jdk1.8.0_201/

2. core-site.xml

<configuration>
    <property>
        <!-- 指定 namenode 的 hdfs 协议文件系统的通信地址 -->
        <name>fs.defaultFS</name>
        <value>hdfs://hadoop001:8020</value>
    </property>
    <property>
        <!-- 指定 hadoop 集群存储临时文件的目录 -->
        <name>hadoop.tmp.dir</name>
        <value>/home/hadoop/tmp</value>
    </property>
    <property>
        <!-- ZooKeeper 集群的地址 -->
        <name>ha.zookeeper.quorum</name>
        <value>hadoop001:2181,hadoop002:2181,hadoop002:2181</value>
    </property>
    <property>
        <!-- ZKFC 连接到 ZooKeeper 超时时长 -->
        <name>ha.zookeeper.session-timeout.ms</name>
        <value>10000</value>
    </property>
</configuration>

3. hdfs-site.xml

<configuration>
    <property>
        <!-- 指定 HDFS 副本的数量 -->
        <name>dfs.replication</name>
        <value>3</value>
    </property>
    <property>
        <!-- namenode 节点数据(即元数据)的存放位置,可以指定多个目录实现容错,多个目录用逗号分隔 -->
        <name>dfs.namenode.name.dir</name>
        <value>/home/hadoop/namenode/data</value>
    </property>
    <property>
        <!-- datanode 节点数据(即数据块)的存放位置 -->
        <name>dfs.datanode.data.dir</name>
        <value>/home/hadoop/datanode/data</value>
    </property>
    <property>
        <!-- 集群服务的逻辑名称 -->
        <name>dfs.nameservices</name>
        <value>mycluster</value>
    </property>
    <property>
        <!-- NameNode ID 列表-->
        <name>dfs.ha.namenodes.mycluster</name>
        <value>nn1,nn2</value>
    </property>
    <property>
        <!-- nn1 的 RPC 通信地址 -->
        <name>dfs.namenode.rpc-address.mycluster.nn1</name>
        <value>hadoop001:8020</value>
    </property>
    <property>
        <!-- nn2 的 RPC 通信地址 -->
        <name>dfs.namenode.rpc-address.mycluster.nn2</name>
        <value>hadoop002:8020</value>
    </property>
    <property>
        <!-- nn1 的 http 通信地址 -->
        <name>dfs.namenode.http-address.mycluster.nn1</name>
        <value>hadoop001:50070</value>
    </property>
    <property>
        <!-- nn2 的 http 通信地址 -->
        <name>dfs.namenode.http-address.mycluster.nn2</name>
        <value>hadoop002:50070</value>
    </property>
    <property>
        <!-- NameNode 元数据在 JournalNode 上的共享存储目录 -->
        <name>dfs.namenode.shared.edits.dir</name>
        <value>qjournal://hadoop001:8485;hadoop002:8485;hadoop003:8485/mycluster</value>
    </property>
    <property>
        <!-- Journal Edit Files 的存储目录 -->
        <name>dfs.journalnode.edits.dir</name>
        <value>/home/hadoop/journalnode/data</value>
    </property>
    <property>
        <!-- 配置隔离机制,确保在任何给定时间只有一个 NameNode 处于活动状态 -->
        <name>dfs.ha.fencing.methods</name>
        <value>sshfence</value>
    </property>
    <property>
        <!-- 使用 sshfence 机制时需要 ssh 免密登录 -->
        <name>dfs.ha.fencing.ssh.private-key-files</name>
        <value>/root/.ssh/id_rsa</value>
    </property>
    <property>
        <!-- SSH 超时时间 -->
        <name>dfs.ha.fencing.ssh.connect-timeout</name>
        <value>30000</value>
    </property>
    <property>
        <!-- 访问代理类,用于确定当前处于 Active 状态的 NameNode -->
        <name>dfs.client.failover.proxy.provider.mycluster</name>
        <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
    </property>
    <property>
        <!-- 开启故障自动转移 -->
        <name>dfs.ha.automatic-failover.enabled</name>
        <value>true</value>
    </property>
</configuration>

4. yarn-site.xml

<configuration>
    <property>
        <!--配置 NodeManager 上运行的附属服务。需要配置成 mapreduce_shuffle 后才可以在 Yarn 上运行 MapReduce 程序。-->
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
    <property>
        <!-- 是否启用日志聚合 (可选) -->
        <name>yarn.log-aggregation-enable</name>
        <value>true</value>
    </property>
    <property>
        <!-- 聚合日志的保存时间 (可选) -->
        <name>yarn.log-aggregation.retain-seconds</name>
        <value>86400</value>
    </property>
    <property>
        <!-- 启用 RM HA -->
        <name>yarn.resourcemanager.ha.enabled</name>
        <value>true</value>
    </property>
    <property>
        <!-- RM 集群标识 -->
        <name>yarn.resourcemanager.cluster-id</name>
        <value>my-yarn-cluster</value>
    </property>
    <property>
        <!-- RM 的逻辑 ID 列表 -->
        <name>yarn.resourcemanager.ha.rm-ids</name>
        <value>rm1,rm2</value>
    </property>
    <property>
        <!-- RM1 的服务地址 -->
        <name>yarn.resourcemanager.hostname.rm1</name>
        <value>hadoop002</value>
    </property>
    <property>
        <!-- RM2 的服务地址 -->
        <name>yarn.resourcemanager.hostname.rm2</name>
        <value>hadoop003</value>
    </property>
    <property>
        <!-- RM1 Web 应用程序的地址 -->
        <name>yarn.resourcemanager.webapp.address.rm1</name>
        <value>hadoop002:8088</value>
    </property>
    <property>
        <!-- RM2 Web 应用程序的地址 -->
        <name>yarn.resourcemanager.webapp.address.rm2</name>
        <value>hadoop003:8088</value>
    </property>
    <property>
        <!-- ZooKeeper 集群的地址 -->
        <name>yarn.resourcemanager.zk-address</name>
        <value>hadoop001:2181,hadoop002:2181,hadoop003:2181</value>
    </property>
    <property>
        <!-- 启用自动恢复 -->
        <name>yarn.resourcemanager.recovery.enabled</name>
        <value>true</value>
    </property>
    <property>
        <!-- 用于进行持久化存储的类 -->
        <name>yarn.resourcemanager.store.class</name>
        <value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value>
    </property>
</configuration>

5. mapred-site.xml

<configuration>
    <property>
        <!--指定 mapreduce 作业运行在 yarn 上-->
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
</configuration>

5. slaves

配置所有从属节点的主机名或 IP 地址,每行一个。所有从属节点上的 DataNode 服务和 NodeManager 服务都会被启动。

hadoop001
hadoop002
hadoop003

4.4 分发程序

将 Hadoop 安装包分发到其他两台服务器,分发后建议在这两台服务器上也配置一下 Hadoop 的环境变量。

# 将安装包分发到hadoop002
scp -r /usr/app/hadoop-2.6.0-cdh5.15.2/  hadoop002:/usr/app/
# 将安装包分发到hadoop003
scp -r /usr/app/hadoop-2.6.0-cdh5.15.2/  hadoop003:/usr/app/

五、启动集群

5.1 启动ZooKeeper

分别到三台服务器上启动 ZooKeeper 服务:

 zkServer.sh start

5.2 启动Journalnode

分别到三台服务器的的 ${HADOOP_HOME}/sbin 目录下,启动 journalnode 进程:

hadoop-daemon.sh start journalnode

5.3 初始化NameNode

hadop001 上执行 NameNode 初始化命令:

hdfs namenode -format

执行初始化命令后,需要将 NameNode 元数据目录的内容,复制到其他未格式化的 NameNode 上。元数据存储目录就是我们在 hdfs-site.xml 中使用 dfs.namenode.name.dir 属性指定的目录。这里我们需要将其复制到 hadoop002 上:

 scp -r /home/hadoop/namenode/data hadoop002:/home/hadoop/namenode/

5.4 初始化HA状态

在任意一台 NameNode 上使用以下命令来初始化 ZooKeeper 中的 HA 状态:

hdfs zkfc -formatZK

5.5 启动HDFS

进入到 hadoop001${HADOOP_HOME}/sbin 目录下,启动 HDFS。此时 hadoop001hadoop002 上的 NameNode 服务,和三台服务器上的 DataNode 服务都会被启动:

start-dfs.sh

5.6 启动YARN

进入到 hadoop002${HADOOP_HOME}/sbin 目录下,启动 YARN。此时 hadoop002 上的 ResourceManager 服务,和三台服务器上的 NodeManager 服务都会被启动:

start-yarn.sh

需要注意的是,这个时候 hadoop003 上的 ResourceManager 服务通常是没有启动的,需要手动启动:

yarn-daemon.sh start resourcemanager

六、查看集群

6.1 查看进程

成功启动后,每台服务器上的进程应该如下:

[root@hadoop001 sbin]# jps
4512 DFSZKFailoverController
3714 JournalNode
4114 NameNode
3668 QuorumPeerMain
5012 DataNode
4639 NodeManager


[root@hadoop002 sbin]# jps
4499 ResourceManager
4595 NodeManager
3465 QuorumPeerMain
3705 NameNode
3915 DFSZKFailoverController
5211 DataNode
3533 JournalNode


[root@hadoop003 sbin]# jps
3491 JournalNode
3942 NodeManager
4102 ResourceManager
4201 DataNode
3435 QuorumPeerMain

6.2 查看Web UI

HDFS 和 YARN 的端口号分别为 500708080,界面应该如下:

此时 hadoop001 上的 NameNode 处于可用状态:

https://github.com/heibaiying
而 hadoop002 上的 NameNode 则处于备用状态:

<br/>

https://github.com/heibaiying
<br/>

hadoop002 上的 ResourceManager 处于可用状态:

<br/>

https://github.com/heibaiying
<br/>

hadoop003 上的 ResourceManager 则处于备用状态:

<br/>

https://github.com/heibaiying
<br/>

同时界面上也有 Journal Manager 的相关信息:

<br/>

https://github.com/heibaiying

七、集群的二次启动

上面的集群初次启动涉及到一些必要初始化操作,所以过程略显繁琐。但是集群一旦搭建好后,想要再次启用它是比较方便的,步骤如下(首选需要确保 ZooKeeper 集群已经启动):

hadoop001 启动 HDFS,此时会启动所有与 HDFS 高可用相关的服务,包括 NameNode,DataNode 和 JournalNode:

start-dfs.sh

hadoop002 启动 YARN:

start-yarn.sh

这个时候 hadoop003 上的 ResourceManager 服务通常还是没有启动的,需要手动启动:

yarn-daemon.sh start resourcemanager

参考资料

以上搭建步骤主要参考自官方文档:

关于 Hadoop 高可用原理的详细分析,推荐阅读:

Hadoop NameNode 高可用 (High Availability) 实现解析

更多大数据系列文章可以参见 GitHub 开源项目大数据入门指南
查看原文

赞 0 收藏 0 评论 0

黑白影 发布了文章 · 2019-09-21

Hadoop 系列(七)—— HDFS Java API

一、 简介

想要使用 HDFS API,需要导入依赖 hadoop-client。如果是 CDH 版本的 Hadoop,还需要额外指明其仓库地址:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
         http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.heibaiying</groupId>
    <artifactId>hdfs-java-api</artifactId>
    <version>1.0</version>


    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <hadoop.version>2.6.0-cdh5.15.2</hadoop.version>
    </properties>


    <!---配置 CDH 仓库地址-->
    <repositories>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>


    <dependencies>
        <!--Hadoop-client-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

</project>

二、API的使用

2.1 FileSystem

FileSystem 是所有 HDFS 操作的主入口。由于之后的每个单元测试都需要用到它,这里使用 @Before 注解进行标注。

private static final String HDFS_PATH = "hdfs://192.168.0.106:8020";
private static final String HDFS_USER = "root";
private static FileSystem fileSystem;

@Before
public void prepare() {
    try {
        Configuration configuration = new Configuration();
        // 这里我启动的是单节点的 Hadoop,所以副本系数设置为 1,默认值为 3
        configuration.set("dfs.replication", "1");
        fileSystem = FileSystem.get(new URI(HDFS_PATH), configuration, HDFS_USER);
    } catch (IOException e) {
        e.printStackTrace();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (URISyntaxException e) {
        e.printStackTrace();
    }
}


@After
public void destroy() {
    fileSystem = null;
}

2.2 创建目录

支持递归创建目录:

@Test
public void mkDir() throws Exception {
    fileSystem.mkdirs(new Path("/hdfs-api/test0/"));
}

2.3 创建指定权限的目录

FsPermission(FsAction u, FsAction g, FsAction o) 的三个参数分别对应:创建者权限,同组其他用户权限,其他用户权限,权限值定义在 FsAction 枚举类中。

@Test
public void mkDirWithPermission() throws Exception {
    fileSystem.mkdirs(new Path("/hdfs-api/test1/"),
            new FsPermission(FsAction.READ_WRITE, FsAction.READ, FsAction.READ));
}

2.4 创建文件,并写入内容

@Test
public void create() throws Exception {
    // 如果文件存在,默认会覆盖, 可以通过第二个参数进行控制。第三个参数可以控制使用缓冲区的大小
    FSDataOutputStream out = fileSystem.create(new Path("/hdfs-api/test/a.txt"),
                                               true, 4096);
    out.write("hello hadoop!".getBytes());
    out.write("hello spark!".getBytes());
    out.write("hello flink!".getBytes());
    // 强制将缓冲区中内容刷出
    out.flush();
    out.close();
}

2.5 判断文件是否存在

@Test
public void exist() throws Exception {
    boolean exists = fileSystem.exists(new Path("/hdfs-api/test/a.txt"));
    System.out.println(exists);
}

2.6 查看文件内容

查看小文本文件的内容,直接转换成字符串后输出:

@Test
public void readToString() throws Exception {
    FSDataInputStream inputStream = fileSystem.open(new Path("/hdfs-api/test/a.txt"));
    String context = inputStreamToString(inputStream, "utf-8");
    System.out.println(context);
}

inputStreamToString 是一个自定义方法,代码如下:

/**
 * 把输入流转换为指定编码的字符
 *
 * @param inputStream 输入流
 * @param encode      指定编码类型
 */
private static String inputStreamToString(InputStream inputStream, String encode) {
    try {
        if (encode == null || ("".equals(encode))) {
            encode = "utf-8";
        }
        BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, encode));
        StringBuilder builder = new StringBuilder();
        String str = "";
        while ((str = reader.readLine()) != null) {
            builder.append(str).append("\n");
        }
        return builder.toString();
    } catch (IOException e) {
        e.printStackTrace();
    }
    return null;
}

2.7 文件重命名

@Test
public void rename() throws Exception {
    Path oldPath = new Path("/hdfs-api/test/a.txt");
    Path newPath = new Path("/hdfs-api/test/b.txt");
    boolean result = fileSystem.rename(oldPath, newPath);
    System.out.println(result);
}

2.8 删除目录或文件

public void delete() throws Exception {
    /*
     *  第二个参数代表是否递归删除
     *    +  如果 path 是一个目录且递归删除为 true, 则删除该目录及其中所有文件;
     *    +  如果 path 是一个目录但递归删除为 false,则会则抛出异常。
     */
    boolean result = fileSystem.delete(new Path("/hdfs-api/test/b.txt"), true);
    System.out.println(result);
}

2.9 上传文件到HDFS

@Test
public void copyFromLocalFile() throws Exception {
    // 如果指定的是目录,则会把目录及其中的文件都复制到指定目录下
    Path src = new Path("D:\\BigData-Notes\\notes\\installation");
    Path dst = new Path("/hdfs-api/test/");
    fileSystem.copyFromLocalFile(src, dst);
}

2.10 上传大文件并显示上传进度

@Test
    public void copyFromLocalBigFile() throws Exception {

        File file = new File("D:\\kafka.tgz");
        final float fileSize = file.length();
        InputStream in = new BufferedInputStream(new FileInputStream(file));

        FSDataOutputStream out = fileSystem.create(new Path("/hdfs-api/test/kafka5.tgz"),
                new Progressable() {
                  long fileCount = 0;

                  public void progress() {
                     fileCount++;
                     // progress 方法每上传大约 64KB 的数据后就会被调用一次
                     System.out.println("上传进度:" + (fileCount * 64 * 1024 / fileSize) * 100 + " %");
                   }
                });

        IOUtils.copyBytes(in, out, 4096);

    }

2.11 从HDFS上下载文件

@Test
public void copyToLocalFile() throws Exception {
    Path src = new Path("/hdfs-api/test/kafka.tgz");
    Path dst = new Path("D:\\app\\");
    /*
     * 第一个参数控制下载完成后是否删除源文件,默认是 true,即删除;
     * 最后一个参数表示是否将 RawLocalFileSystem 用作本地文件系统;
     * RawLocalFileSystem 默认为 false,通常情况下可以不设置,
     * 但如果你在执行时候抛出 NullPointerException 异常,则代表你的文件系统与程序可能存在不兼容的情况 (window 下常见),
     * 此时可以将 RawLocalFileSystem 设置为 true
     */
    fileSystem.copyToLocalFile(false, src, dst, true);
}

2.12 查看指定目录下所有文件的信息

public void listFiles() throws Exception {
    FileStatus[] statuses = fileSystem.listStatus(new Path("/hdfs-api"));
    for (FileStatus fileStatus : statuses) {
        //fileStatus 的 toString 方法被重写过,直接打印可以看到所有信息
        System.out.println(fileStatus.toString());
    }
}

FileStatus 中包含了文件的基本信息,比如文件路径,是否是文件夹,修改时间,访问时间,所有者,所属组,文件权限,是否是符号链接等,输出内容示例如下:

FileStatus{
path=hdfs://192.168.0.106:8020/hdfs-api/test; 
isDirectory=true; 
modification_time=1556680796191; 
access_time=0; 
owner=root; 
group=supergroup; 
permission=rwxr-xr-x; 
isSymlink=false
}

2.13 递归查看指定目录下所有文件的信息

@Test
public void listFilesRecursive() throws Exception {
    RemoteIterator<LocatedFileStatus> files = fileSystem.listFiles(new Path("/hbase"), true);
    while (files.hasNext()) {
        System.out.println(files.next());
    }
}

和上面输出类似,只是多了文本大小,副本系数,块大小信息。

LocatedFileStatus{
path=hdfs://192.168.0.106:8020/hbase/hbase.version; 
isDirectory=false; 
length=7; 
replication=1; 
blocksize=134217728; 
modification_time=1554129052916; 
access_time=1554902661455; 
owner=root; group=supergroup;
permission=rw-r--r--; 
isSymlink=false}

2.14 查看文件的块信息

@Test
public void getFileBlockLocations() throws Exception {

    FileStatus fileStatus = fileSystem.getFileStatus(new Path("/hdfs-api/test/kafka.tgz"));
    BlockLocation[] blocks = fileSystem.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
    for (BlockLocation block : blocks) {
        System.out.println(block);
    }
}

块输出信息有三个值,分别是文件的起始偏移量 (offset),文件大小 (length),块所在的主机名 (hosts)。

0,57028557,hadoop001

这里我上传的文件只有 57M(小于 128M),且程序中设置了副本系数为 1,所有只有一个块信息。

以上所有测试用例下载地址HDFS Java API

更多大数据系列文章可以参见 GitHub 开源项目大数据入门指南
查看原文

赞 0 收藏 0 评论 0

黑白影 发布了文章 · 2019-09-21

Hadoop 系列(六)—— HDFS 常用 Shell 命令

一、基本命令

打开 Hbase Shell:

# hbase shell

1.1 获取帮助

# 获取帮助
help
# 获取命令的详细信息
help 'status'

1.2 查看服务器状态

status

1.3 查看版本信息

version

二、关于表的操作

2.1 查看所有表

list

2.2 创建表

命令格式: create '表名称', '列族名称 1','列族名称 2','列名称 N'

# 创建一张名为Student的表,包含基本信息(baseInfo)、学校信息(schoolInfo)两个列族
create 'Student','baseInfo','schoolInfo'

2.3 查看表的基本信息

命令格式:desc '表名'

describe 'Student'

2.4 表的启用/禁用

enable 和 disable 可以启用/禁用这个表,is_enabled 和 is_disabled 来检查表是否被禁用

# 禁用表
disable 'Student'
# 检查表是否被禁用
is_disabled 'Student'
# 启用表
enable 'Student'
# 检查表是否被启用
is_enabled 'Student'

2.5 检查表是否存在

exists 'Student'

2.6 删除表

# 删除表前需要先禁用表
disable 'Student'
# 删除表
drop 'Student'

三、增删改

3.1 添加列族

命令格式: alter '表名', '列族名'

alter 'Student', 'teacherInfo'

3.2 删除列族

命令格式:alter '表名', {NAME => '列族名', METHOD => 'delete'}

alter 'Student', {NAME => 'teacherInfo', METHOD => 'delete'}

3.3 更改列族存储版本的限制

默认情况下,列族只存储一个版本的数据,如果需要存储多个版本的数据,则需要修改列族的属性。修改后可通过 desc 命令查看。

alter 'Student',{NAME=>'baseInfo',VERSIONS=>3}

3.4 插入数据

命令格式:put '表名', '行键','列族:列','值'

注意:如果新增数据的行键值、列族名、列名与原有数据完全相同,则相当于更新操作

put 'Student', 'rowkey1','baseInfo:name','tom'
put 'Student', 'rowkey1','baseInfo:birthday','1990-01-09'
put 'Student', 'rowkey1','baseInfo:age','29'
put 'Student', 'rowkey1','schoolInfo:name','Havard'
put 'Student', 'rowkey1','schoolInfo:localtion','Boston'

put 'Student', 'rowkey2','baseInfo:name','jack'
put 'Student', 'rowkey2','baseInfo:birthday','1998-08-22'
put 'Student', 'rowkey2','baseInfo:age','21'
put 'Student', 'rowkey2','schoolInfo:name','yale'
put 'Student', 'rowkey2','schoolInfo:localtion','New Haven'

put 'Student', 'rowkey3','baseInfo:name','maike'
put 'Student', 'rowkey3','baseInfo:birthday','1995-01-22'
put 'Student', 'rowkey3','baseInfo:age','24'
put 'Student', 'rowkey3','schoolInfo:name','yale'
put 'Student', 'rowkey3','schoolInfo:localtion','New Haven'

put 'Student', 'wrowkey4','baseInfo:name','maike-jack'

3.5 获取指定行、指定行中的列族、列的信息

# 获取指定行中所有列的数据信息
get 'Student','rowkey3'
# 获取指定行中指定列族下所有列的数据信息
get 'Student','rowkey3','baseInfo'
# 获取指定行中指定列的数据信息
get 'Student','rowkey3','baseInfo:name'

3.6 删除指定行、指定行中的列

# 删除指定行
delete 'Student','rowkey3'
# 删除指定行中指定列的数据
delete 'Student','rowkey3','baseInfo:name'

四、查询

hbase 中访问数据有两种基本的方式:

  • 按指定 rowkey 获取数据:get 方法;
  • 按指定条件获取数据:scan 方法。

scan 可以设置 begin 和 end 参数来访问一个范围内所有的数据。get 本质上就是 begin 和 end 相等的一种特殊的 scan。

4.1Get查询

# 获取指定行中所有列的数据信息
get 'Student','rowkey3'
# 获取指定行中指定列族下所有列的数据信息
get 'Student','rowkey3','baseInfo'
# 获取指定行中指定列的数据信息
get 'Student','rowkey3','baseInfo:name'

4.2 查询整表数据

scan 'Student'

4.3 查询指定列簇的数据

scan 'Student', {COLUMN=>'baseInfo'}

4.4 条件查询

# 查询指定列的数据
scan 'Student', {COLUMNS=> 'baseInfo:birthday'}

除了列 (COLUMNS) 修饰词外,HBase 还支持 Limit(限制查询结果行数),STARTROWROWKEY 起始行,会先根据这个 key 定位到 region,再向后扫描)、STOPROW(结束行)、TIMERANGE(限定时间戳范围)、VERSIONS(版本数)、和 FILTER(按条件过滤行)等。

如下代表从 rowkey2 这个 rowkey 开始,查找下两个行的最新 3 个版本的 name 列的数据:

scan 'Student', {COLUMNS=> 'baseInfo:name',STARTROW => 'rowkey2',STOPROW => 'wrowkey4',LIMIT=>2, VERSIONS=>3}

4.5 条件过滤

Filter 可以设定一系列条件来进行过滤。如我们要查询值等于 24 的所有数据:

scan 'Student', FILTER=>"ValueFilter(=,'binary:24')"

值包含 yale 的所有数据:

scan 'Student', FILTER=>"ValueFilter(=,'substring:yale')"

列名中的前缀为 birth 的:

scan 'Student', FILTER=>"ColumnPrefixFilter('birth')"

FILTER 中支持多个过滤条件通过括号、AND 和 OR 进行组合:

# 列名中的前缀为birth且列值中包含1998的数据
scan 'Student', FILTER=>"ColumnPrefixFilter('birth') AND ValueFilter ValueFilter(=,'substring:1998')"

PrefixFilter 用于对 Rowkey 的前缀进行判断:

scan 'Student', FILTER=>"PrefixFilter('wr')"
更多大数据系列文章可以参见 GitHub 开源项目大数据入门指南
查看原文

赞 0 收藏 0 评论 0

黑白影 发布了文章 · 2019-09-18

Hadoop 系列(五)—— Hadoop 集群环境搭建

一、集群规划

这里搭建一个 3 节点的 HBase 集群,其中三台主机上均为 Regin Server。同时为了保证高可用,除了在 hadoop001 上部署主 Master 服务外,还在 hadoop002 上部署备用的 Master 服务。Master 服务由 Zookeeper 集群进行协调管理,如果主 Master 不可用,则备用 Master 会成为新的主 Master

https://github.com/heibaiying

二、前置条件

HBase 的运行需要依赖 Hadoop 和 JDK(HBase 2.0+ 对应 JDK 1.8+) 。同时为了保证高可用,这里我们不采用 HBase 内置的 Zookeeper 服务,而采用外置的 Zookeeper 集群。相关搭建步骤可以参阅:

三、集群搭建

3.1 下载并解压

下载并解压,这里我下载的是 CDH 版本 HBase,下载地址为:http://archive.cloudera.com/c...

# tar -zxvf hbase-1.2.0-cdh5.15.2.tar.gz

3.2 配置环境变量

# vim /etc/profile

添加环境变量:

export HBASE_HOME=usr/app/hbase-1.2.0-cdh5.15.2
export PATH=$HBASE_HOME/bin:$PATH

使得配置的环境变量立即生效:

# source /etc/profile

3.3 集群配置

进入 ${HBASE_HOME}/conf 目录下,修改配置:

1. hbase-env.sh

# 配置JDK安装位置
export JAVA_HOME=/usr/java/jdk1.8.0_201
# 不使用内置的zookeeper服务
export HBASE_MANAGES_ZK=false

2. hbase-site.xml

<configuration>
    <property>
        <!-- 指定 hbase 以分布式集群的方式运行 -->
        <name>hbase.cluster.distributed</name>
        <value>true</value>
    </property>
    <property>
        <!-- 指定 hbase 在 HDFS 上的存储位置 -->
        <name>hbase.rootdir</name>
        <value>hdfs://hadoop001:8020/hbase</value>
    </property>
    <property>
        <!-- 指定 zookeeper 的地址-->
        <name>hbase.zookeeper.quorum</name>
        <value>hadoop001:2181,hadoop002:2181,hadoop003:2181</value>
    </property>
</configuration>

3. regionservers

hadoop001
hadoop002
hadoop003

4. backup-masters

hadoop002

backup-masters 这个文件是不存在的,需要新建,主要用来指明备用的 master 节点,可以是多个,这里我们以 1 个为例。

3.4 HDFS客户端配置

这里有一个可选的配置:如果您在 Hadoop 集群上进行了 HDFS 客户端配置的更改,比如将副本系数 dfs.replication 设置成 5,则必须使用以下方法之一来使 HBase 知道,否则 HBase 将依旧使用默认的副本系数 3 来创建文件:

  1. Add a pointer to your HADOOP_CONF_DIR to the HBASE_CLASSPATH environment variable in hbase-env.sh.
  2. Add a copy of hdfs-site.xml (or hadoop-site.xml) or, better, symlinks, under ${HBASE_HOME}/conf, or
  3. if only a small set of HDFS client configurations, add them to hbase-site.xml.

以上是官方文档的说明,这里解释一下:

第一种 :将 Hadoop 配置文件的位置信息添加到 hbase-env.shHBASE_CLASSPATH 属性,示例如下:

export HBASE_CLASSPATH=usr/app/hadoop-2.6.0-cdh5.15.2/etc/hadoop

第二种 :将 Hadoop 的 hdfs-site.xmlhadoop-site.xml 拷贝到 ${HBASE_HOME}/conf 目录下,或者通过符号链接的方式。如果采用这种方式的话,建议将两者都拷贝或建立符号链接,示例如下:

# 拷贝
cp core-site.xml hdfs-site.xml /usr/app/hbase-1.2.0-cdh5.15.2/conf/
# 使用符号链接
ln -s   /usr/app/hadoop-2.6.0-cdh5.15.2/etc/hadoop/core-site.xml
ln -s   /usr/app/hadoop-2.6.0-cdh5.15.2/etc/hadoop/hdfs-site.xml
注:hadoop-site.xml 这个配置文件现在叫做 core-site.xml

第三种 :如果你只有少量更改,那么直接配置到 hbase-site.xml 中即可。

3.5 安装包分发

将 HBase 的安装包分发到其他服务器,分发后建议在这两台服务器上也配置一下 HBase 的环境变量。

scp -r /usr/app/hbase-1.2.0-cdh5.15.2/  hadoop002:usr/app/
scp -r /usr/app/hbase-1.2.0-cdh5.15.2/  hadoop003:usr/app/

四、启动集群

4.1 启动ZooKeeper集群

分别到三台服务器上启动 ZooKeeper 服务:

 zkServer.sh start

4.2 启动Hadoop集群

# 启动dfs服务
start-dfs.sh
# 启动yarn服务
start-yarn.sh

4.3 启动HBase集群

进入 hadoop001 的 ${HBASE_HOME}/bin,使用以下命令启动 HBase 集群。执行此命令后,会在 hadoop001 上启动 Master 服务,在 hadoop002 上启动备用 Master 服务,在 regionservers 文件中配置的所有节点启动 region server 服务。

start-hbase.sh

4.5 查看服务

访问 HBase 的 Web-UI 界面,这里我安装的 HBase 版本为 1.2,访问端口为 60010,如果你安装的是 2.0 以上的版本,则访问端口号为 16010。可以看到 Master 在 hadoop001 上,三个 Regin Servers 分别在 hadoop001,hadoop002,和 hadoop003 上,并且还有一个 Backup Matser 服务在 hadoop002 上。

https://github.com/heibaiying
<br/>

hadoop002 上的 HBase 出于备用状态:

<br/>

https://github.com/heibaiying

更多大数据系列文章可以参见 GitHub 开源项目大数据入门指南
查看原文

赞 0 收藏 0 评论 0

黑白影 发布了文章 · 2019-09-17

Hadoop 系列(四)—— Hadoop 开发环境搭建

一、前置条件

Hadoop 的运行依赖 JDK,需要预先安装,安装步骤见:

二、配置免密登录

Hadoop 组件之间需要基于 SSH 进行通讯。

2.1 配置映射

配置 ip 地址和主机名映射:

vim /etc/hosts
# 文件末尾增加
192.168.43.202  hadoop001

2.2 生成公私钥

执行下面命令行生成公匙和私匙:

ssh-keygen -t rsa

3.3 授权

进入 ~/.ssh 目录下,查看生成的公匙和私匙,并将公匙写入到授权文件:

[root@@hadoop001 sbin]#  cd ~/.ssh
[root@@hadoop001 .ssh]# ll
-rw-------. 1 root root 1675 3 月  15 09:48 id_rsa
-rw-r--r--. 1 root root  388 3 月  15 09:48 id_rsa.pub
# 写入公匙到授权文件
[root@hadoop001 .ssh]# cat id_rsa.pub >> authorized_keys
[root@hadoop001 .ssh]# chmod 600 authorized_keys

三、Hadoop(HDFS)环境搭建

3.1 下载并解压

下载 Hadoop 安装包,这里我下载的是 CDH 版本的,下载地址为:http://archive.cloudera.com/c...

# 解压
tar -zvxf hadoop-2.6.0-cdh5.15.2.tar.gz 

3.2 配置环境变量

# vi /etc/profile

配置环境变量:

export HADOOP_HOME=/usr/app/hadoop-2.6.0-cdh5.15.2
export  PATH=${HADOOP_HOME}/bin:$PATH

执行 source 命令,使得配置的环境变量立即生效:

# source /etc/profile

3.3 修改Hadoop配置

进入 ${HADOOP_HOME}/etc/hadoop/ 目录下,修改以下配置:

1. hadoop-env.sh

# JDK安装路径
export  JAVA_HOME=/usr/java/jdk1.8.0_201/

2. core-site.xml

<configuration>
    <property>
        <!--指定 namenode 的 hdfs 协议文件系统的通信地址-->
        <name>fs.defaultFS</name>
        <value>hdfs://hadoop001:8020</value>
    </property>
    <property>
        <!--指定 hadoop 存储临时文件的目录-->
        <name>hadoop.tmp.dir</name>
        <value>/home/hadoop/tmp</value>
    </property>
</configuration>

3. hdfs-site.xml

指定副本系数和临时文件存储位置:

<configuration>
    <property>
        <!--由于我们这里搭建是单机版本,所以指定 dfs 的副本系数为 1-->
        <name>dfs.replication</name>
        <value>1</value>
    </property>
</configuration>

4. slaves

配置所有从属节点的主机名或 IP 地址,由于是单机版本,所以指定本机即可:

hadoop001

3.4 关闭防火墙

不关闭防火墙可能导致无法访问 Hadoop 的 Web UI 界面:

# 查看防火墙状态
sudo firewall-cmd --state
# 关闭防火墙:
sudo systemctl stop firewalld.service

3.5 初始化

第一次启动 Hadoop 时需要进行初始化,进入 ${HADOOP_HOME}/bin/ 目录下,执行以下命令:

[root@hadoop001 bin]# ./hdfs namenode -format

3.6 启动HDFS

进入 ${HADOOP_HOME}/sbin/ 目录下,启动 HDFS:

[root@hadoop001 sbin]# ./start-dfs.sh

3.7 验证是否启动成功

方式一:执行 jps 查看 NameNodeDataNode 服务是否已经启动:

[root@hadoop001 hadoop-2.6.0-cdh5.15.2]# jps
9137 DataNode
9026 NameNode
9390 SecondaryNameNode

方式二:查看 Web UI 界面,端口为 50070

https://github.com/heibaiying

四、Hadoop(YARN)环境搭建

4.1 修改配置

进入 ${HADOOP_HOME}/etc/hadoop/ 目录下,修改以下配置:

1. mapred-site.xml

# 如果没有mapred-site.xml,则拷贝一份样例文件后再修改
cp mapred-site.xml.template mapred-site.xml
<configuration>
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
</configuration>

2. yarn-site.xml

<configuration>
    <property>
        <!--配置 NodeManager 上运行的附属服务。需要配置成 mapreduce_shuffle 后才可以在 Yarn 上运行 MapReduce 程序。-->
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
</configuration>

4.2 启动服务

进入 ${HADOOP_HOME}/sbin/ 目录下,启动 YARN:

./start-yarn.sh

4.3 验证是否启动成功

方式一:执行 jps 命令查看 NodeManagerResourceManager 服务是否已经启动:

[root@hadoop001 hadoop-2.6.0-cdh5.15.2]# jps
9137 DataNode
9026 NameNode
12294 NodeManager
12185 ResourceManager
9390 SecondaryNameNode

方式二:查看 Web UI 界面,端口号为 8088

https://github.com/heibaiying

更多大数据系列文章可以参见 GitHub 开源项目大数据入门指南
查看原文

赞 0 收藏 0 评论 0

黑白影 发布了文章 · 2019-09-16

Hadoop 系列(三)—— 分布式计算框架 MapReduce

一、MapReduce概述

Hadoop MapReduce 是一个分布式计算框架,用于编写批处理应用程序。编写好的程序可以提交到 Hadoop 集群上用于并行处理大规模的数据集。

MapReduce 作业通过将输入的数据集拆分为独立的块,这些块由 map 以并行的方式处理,框架对 map 的输出进行排序,然后输入到 reduce 中。MapReduce 框架专门用于 <key,value> 键值对处理,它将作业的输入视为一组 <key,value> 对,并生成一组 <key,value> 对作为输出。输出和输出的 keyvalue 都必须实现Writable 接口。

(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)

二、MapReduce编程模型简述

这里以词频统计为例进行说明,MapReduce 处理的流程如下:

https://github.com/heibaiying

  1. input : 读取文本文件;
  2. splitting : 将文件按照行进行拆分,此时得到的 K1 行数,V1 表示对应行的文本内容;
  3. mapping : 并行将每一行按照空格进行拆分,拆分得到的 List(K2,V2),其中 K2 代表每一个单词,由于是做词频统计,所以 V2 的值为 1,代表出现 1 次;
  4. shuffling:由于 Mapping 操作可能是在不同的机器上并行处理的,所以需要通过 shuffling 将相同 key 值的数据分发到同一个节点上去合并,这样才能统计出最终的结果,此时得到 K2 为每一个单词,List(V2) 为可迭代集合,V2 就是 Mapping 中的 V2;
  5. Reducing : 这里的案例是统计单词出现的总次数,所以 ReducingList(V2) 进行归约求和操作,最终输出。

MapReduce 编程模型中 splittingshuffing 操作都是由框架实现的,需要我们自己编程实现的只有 mappingreducing,这也就是 MapReduce 这个称呼的来源。

三、combiner & partitioner

https://github.com/heibaiying

3.1 InputFormat & RecordReaders

InputFormat 将输出文件拆分为多个 InputSplit,并由 RecordReadersInputSplit 转换为标准的<key,value>键值对,作为 map 的输出。这一步的意义在于只有先进行逻辑拆分并转为标准的键值对格式后,才能为多个 map 提供输入,以便进行并行处理。

3.2 Combiner

combinermap 运算后的可选操作,它实际上是一个本地化的 reduce 操作,它主要是在 map 计算出中间文件后做一个简单的合并重复 key 值的操作。这里以词频统计为例:

map 在遇到一个 hadoop 的单词时就会记录为 1,但是这篇文章里 hadoop 可能会出现 n 多次,那么 map 输出文件冗余就会很多,因此在 reduce 计算前对相同的 key 做一个合并操作,那么需要传输的数据量就会减少,传输效率就可以得到提升。

但并非所有场景都适合使用 combiner,使用它的原则是 combiner 的输出不会影响到 reduce 计算的最终输入,例如:求总数,最大值,最小值时都可以使用 combiner,但是做平均值计算则不能使用 combiner

不使用 combiner 的情况:

<div align="center"> <img width="600px" data-original="https://raw.githubusercontent...;/> </div>
使用 combiner 的情况:

<div align="center"> <img width="600px" data-original="https://raw.githubusercontent...;/> </div>

可以看到使用 combiner 的时候,需要传输到 reducer 中的数据由 12keys,降低到 10keys。降低的幅度取决于你 keys 的重复率,下文词频统计案例会演示用 combiner 降低数百倍的传输量。

3.3 Partitioner

partitioner 可以理解成分类器,将 map 的输出按照 key 值的不同分别分给对应的 reducer,支持自定义实现,下文案例会给出演示。

四、MapReduce词频统计案例

4.1 项目简介

这里给出一个经典的词频统计的案例:统计如下样本数据中每个单词出现的次数。

Spark    HBase
Hive    Flink    Storm    Hadoop    HBase    Spark
Flink
HBase    Storm
HBase    Hadoop    Hive    Flink
HBase    Flink    Hive    Storm
Hive    Flink    Hadoop
HBase    Hive
Hadoop    Spark    HBase    Storm
HBase    Hadoop    Hive    Flink
HBase    Flink    Hive    Storm
Hive    Flink    Hadoop
HBase    Hive

为方便大家开发,我在项目源码中放置了一个工具类 WordCountDataUtils,用于模拟产生词频统计的样本,生成的文件支持输出到本地或者直接写到 HDFS 上。

项目完整源码下载地址:hadoop-word-count

4.2 项目依赖

想要进行 MapReduce 编程,需要导入 hadoop-client 依赖:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>${hadoop.version}</version>
</dependency>

4.3 WordCountMapper

将每行数据按照指定分隔符进行拆分。这里需要注意在 MapReduce 中必须使用 Hadoop 定义的类型,因为 Hadoop 预定义的类型都是可序列化,可比较的,所有类型均实现了 WritableComparable 接口。

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, 
                                                                      InterruptedException {
        String[] words = value.toString().split("\t");
        for (String word : words) {
            context.write(new Text(word), new IntWritable(1));
        }
    }

}

WordCountMapper 对应下图的 Mapping 操作:

https://github.com/heibaiying

WordCountMapper 继承自 Mappe 类,这是一个泛型类,定义如下:

WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>

public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
   ......
}
  • KEYIN : mapping 输入 key 的类型,即每行的偏移量 (每行第一个字符在整个文本中的位置),Long 类型,对应 Hadoop 中的 LongWritable 类型;
  • VALUEIN : mapping 输入 value 的类型,即每行数据;String 类型,对应 Hadoop 中 Text 类型;
  • KEYOUTmapping 输出的 key 的类型,即每个单词;String 类型,对应 Hadoop 中 Text 类型;
  • VALUEOUTmapping 输出 value 的类型,即每个单词出现的次数;这里用 int 类型,对应 IntWritable 类型。

4.4 WordCountReducer

在 Reduce 中进行单词出现次数的统计:

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, 
                                                                                  InterruptedException {
        int count = 0;
        for (IntWritable value : values) {
            count += value.get();
        }
        context.write(key, new IntWritable(count));
    }
}

如下图,shuffling 的输出是 reduce 的输入。这里的 key 是每个单词,values 是一个可迭代的数据类型,类似 (1,1,1,...)

https://github.com/heibaiying

4.4 WordCountApp

组装 MapReduce 作业,并提交到服务器运行,代码如下:


/**
 * 组装作业 并提交到集群运行
 */
public class WordCountApp {


    // 这里为了直观显示参数 使用了硬编码,实际开发中可以通过外部传参
    private static final String HDFS_URL = "hdfs://192.168.0.107:8020";
    private static final String HADOOP_USER_NAME = "root";

    public static void main(String[] args) throws Exception {

        //  文件输入路径和输出路径由外部传参指定
        if (args.length < 2) {
            System.out.println("Input and output paths are necessary!");
            return;
        }

        // 需要指明 hadoop 用户名,否则在 HDFS 上创建目录时可能会抛出权限不足的异常
        System.setProperty("HADOOP_USER_NAME", HADOOP_USER_NAME);

        Configuration configuration = new Configuration();
        // 指明 HDFS 的地址
        configuration.set("fs.defaultFS", HDFS_URL);

        // 创建一个 Job
        Job job = Job.getInstance(configuration);

        // 设置运行的主类
        job.setJarByClass(WordCountApp.class);

        // 设置 Mapper 和 Reducer
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // 设置 Mapper 输出 key 和 value 的类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 设置 Reducer 输出 key 和 value 的类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 如果输出目录已经存在,则必须先删除,否则重复运行程序时会抛出异常
        FileSystem fileSystem = FileSystem.get(new URI(HDFS_URL), configuration, HADOOP_USER_NAME);
        Path outputPath = new Path(args[1]);
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }

        // 设置作业输入文件和输出文件的路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, outputPath);

        // 将作业提交到群集并等待它完成,参数设置为 true 代表打印显示对应的进度
        boolean result = job.waitForCompletion(true);

        // 关闭之前创建的 fileSystem
        fileSystem.close();

        // 根据作业结果,终止当前运行的 Java 虚拟机,退出程序
        System.exit(result ? 0 : -1);

    }
}

需要注意的是:如果不设置 Mapper 操作的输出类型,则程序默认它和 Reducer 操作输出的类型相同。

4.5 提交到服务器运行

在实际开发中,可以在本机配置 hadoop 开发环境,直接在 IDE 中启动进行测试。这里主要介绍一下打包提交到服务器运行。由于本项目没有使用除 Hadoop 外的第三方依赖,直接打包即可:

# mvn clean package

使用以下命令提交作业:

hadoop jar /usr/appjar/hadoop-word-count-1.0.jar \
com.heibaiying.WordCountApp \
/wordcount/input.txt /wordcount/output/WordCountApp

作业完成后查看 HDFS 上生成目录:

# 查看目录
hadoop fs -ls /wordcount/output/WordCountApp

# 查看统计结果
hadoop fs -cat /wordcount/output/WordCountApp/part-r-00000

https://github.com/heibaiying

五、词频统计案例进阶之Combiner

5.1 代码实现

想要使用 combiner 功能只要在组装作业时,添加下面一行代码即可:

// 设置 Combiner
job.setCombinerClass(WordCountReducer.class);

5.2 执行结果

加入 combiner 后统计结果是不会有变化的,但是可以从打印的日志看出 combiner 的效果:

没有加入 combiner 的打印日志:

https://github.com/heibaiying

加入 combiner 后的打印日志如下:

https://github.com/heibaiying

这里我们只有一个输入文件并且小于 128M,所以只有一个 Map 进行处理。可以看到经过 combiner 后,records 由 3519 降低为 6(样本中单词种类就只有 6 种),在这个用例中 combiner 就能极大地降低需要传输的数据量。

六、词频统计案例进阶之Partitioner

6.1 默认的Partitioner

这里假设有个需求:将不同单词的统计结果输出到不同文件。这种需求实际上比较常见,比如统计产品的销量时,需要将结果按照产品种类进行拆分。要实现这个功能,就需要用到自定义 Partitioner

这里先介绍下 MapReduce 默认的分类规则:在构建 job 时候,如果不指定,默认的使用的是 HashPartitioner:对 key 值进行哈希散列并对 numReduceTasks 取余。其实现如下:

public class HashPartitioner<K, V> extends Partitioner<K, V> {

  public int getPartition(K key, V value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

}

6.2 自定义Partitioner

这里我们继承 Partitioner 自定义分类规则,这里按照单词进行分类:

public class CustomPartitioner extends Partitioner<Text, IntWritable> {

    public int getPartition(Text text, IntWritable intWritable, int numPartitions) {
        return WordCountDataUtils.WORD_LIST.indexOf(text.toString());
    }
}

在构建 job 时候指定使用我们自己的分类规则,并设置 reduce 的个数:

// 设置自定义分区规则
job.setPartitionerClass(CustomPartitioner.class);
// 设置 reduce 个数
job.setNumReduceTasks(WordCountDataUtils.WORD_LIST.size());

6.3 执行结果

执行结果如下,分别生成 6 个文件,每个文件中为对应单词的统计结果:

https://github.com/heibaiying

参考资料

  1. 分布式计算框架 MapReduce
  2. Apache Hadoop 2.9.2 > MapReduce Tutorial
  3. MapReduce - Combiners
更多大数据系列文章可以参见 GitHub 开源项目大数据入门指南
查看原文

赞 0 收藏 0 评论 0

黑白影 发布了文章 · 2019-09-13

Hadoop 系列(二)—— 集群资源管理器 YARN

一、hadoop yarn 简介

Apache YARN (Yet Another Resource Negotiator) 是 hadoop 2.0 引入的集群资源管理系统。用户可以将各种服务框架部署在 YARN 上,由 YARN 进行统一地管理和资源分配。

<div align="center"> <img width="600px" data-original="https://raw.githubusercontent...;/> </div>

二、YARN架构

https://github.com/heibaiying

1. ResourceManager

ResourceManager 通常在独立的机器上以后台进程的形式运行,它是整个集群资源的主要协调者和管理者。ResourceManager 负责给用户提交的所有应用程序分配资源,它根据应用程序优先级、队列容量、ACLs、数据位置等信息,做出决策,然后以共享的、安全的、多租户的方式制定分配策略,调度集群资源。

2. NodeManager

NodeManager 是 YARN 集群中的每个具体节点的管理者。主要负责该节点内所有容器的生命周期的管理,监视资源和跟踪节点健康。具体如下:

  • 启动时向 ResourceManager 注册并定时发送心跳消息,等待 ResourceManager 的指令;
  • 维护 Container 的生命周期,监控 Container 的资源使用情况;
  • 管理任务运行时的相关依赖,根据 ApplicationMaster 的需要,在启动 Container 之前将需要的程序及其依赖拷贝到本地。

3. ApplicationMaster

在用户提交一个应用程序时,YARN 会启动一个轻量级的进程 ApplicationMasterApplicationMaster 负责协调来自 ResourceManager 的资源,并通过 NodeManager 监视容器内资源的使用情况,同时还负责任务的监控与容错。具体如下:

  • 根据应用的运行状态来决定动态计算资源需求;
  • ResourceManager 申请资源,监控申请的资源的使用情况;
  • 跟踪任务状态和进度,报告资源的使用情况和应用的进度信息;
  • 负责任务的容错。

4. Contain

Container 是 YARN 中的资源抽象,它封装了某个节点上的多维度资源,如内存、CPU、磁盘、网络等。当 AM 向 RM 申请资源时,RM 为 AM 返回的资源是用 Container 表示的。YARN 会为每个任务分配一个 Container,该任务只能使用该 Container 中描述的资源。ApplicationMaster 可在 Container 内运行任何类型的任务。例如,MapReduce ApplicationMaster 请求一个容器来启动 map 或 reduce 任务,而 Giraph ApplicationMaster 请求一个容器来运行 Giraph 任务。

三、YARN工作原理简述

<div align="center"> <img data-original="https://raw.githubusercontent...;/> </div>

  1. Client 提交作业到 YARN 上;
  2. Resource Manager 选择一个 Node Manager,启动一个 Container 并运行 Application Master 实例;
  3. Application Master 根据实际需要向 Resource Manager 请求更多的 Container 资源(如果作业很小, 应用管理器会选择在其自己的 JVM 中运行任务);
  4. Application Master 通过获取到的 Container 资源执行分布式计算。

四、YARN工作原理详述

https://github.com/heibaiying

1. 作业提交

client 调用 job.waitForCompletion 方法,向整个集群提交 MapReduce 作业 (第 1 步) 。新的作业 ID(应用 ID) 由资源管理器分配 (第 2 步)。作业的 client 核实作业的输出, 计算输入的 split, 将作业的资源 (包括 Jar 包,配置文件, split 信息) 拷贝给 HDFS(第 3 步)。 最后, 通过调用资源管理器的 submitApplication() 来提交作业 (第 4 步)。

2. 作业初始化

当资源管理器收到 submitApplciation() 的请求时, 就将该请求发给调度器 (scheduler), 调度器分配 container, 然后资源管理器在该 container 内启动应用管理器进程, 由节点管理器监控 (第 5 步)。

MapReduce 作业的应用管理器是一个主类为 MRAppMaster 的 Java 应用,其通过创造一些 bookkeeping 对象来监控作业的进度, 得到任务的进度和完成报告 (第 6 步)。然后其通过分布式文件系统得到由客户端计算好的输入 split(第 7 步),然后为每个输入 split 创建一个 map 任务, 根据 mapreduce.job.reduces 创建 reduce 任务对象。

3. 任务分配

如果作业很小, 应用管理器会选择在其自己的 JVM 中运行任务。

如果不是小作业, 那么应用管理器向资源管理器请求 container 来运行所有的 map 和 reduce 任务 (第 8 步)。这些请求是通过心跳来传输的, 包括每个 map 任务的数据位置,比如存放输入 split 的主机名和机架 (rack),调度器利用这些信息来调度任务,尽量将任务分配给存储数据的节点, 或者分配给和存放输入 split 的节点相同机架的节点。

4. 任务运行

当一个任务由资源管理器的调度器分配给一个 container 后,应用管理器通过联系节点管理器来启动 container(第 9 步)。任务由一个主类为 YarnChild 的 Java 应用执行, 在运行任务之前首先本地化任务需要的资源,比如作业配置,JAR 文件, 以及分布式缓存的所有文件 (第 10 步。 最后, 运行 map 或 reduce 任务 (第 11 步)。

YarnChild 运行在一个专用的 JVM 中, 但是 YARN 不支持 JVM 重用。

5. 进度和状态更新

YARN 中的任务将其进度和状态 (包括 counter) 返回给应用管理器, 客户端每秒 (通 mapreduce.client.progressmonitor.pollinterval 设置) 向应用管理器请求进度更新, 展示给用户。

6. 作业完成

除了向应用管理器请求作业进度外, 客户端每 5 分钟都会通过调用 waitForCompletion() 来检查作业是否完成,时间间隔可以通过 mapreduce.client.completion.pollinterval 来设置。作业完成之后, 应用管理器和 container 会清理工作状态, OutputCommiter 的作业清理方法也会被调用。作业的信息会被作业历史服务器存储以备之后用户核查。

五、提交作业到YARN上运行

这里以提交 Hadoop Examples 中计算 Pi 的 MApReduce 程序为例,相关 Jar 包在 Hadoop 安装目录的 share/hadoop/mapreduce 目录下:

# 提交格式: hadoop jar jar包路径 主类名称 主类参数
# hadoop jar hadoop-mapreduce-examples-2.6.0-cdh5.15.2.jar pi 3 3

参考资料

  1. 初步掌握 Yarn 的架构及原理
  2. Apache Hadoop 2.9.2 > Apache Hadoop YARN
更多大数据系列文章可以参见 GitHub 开源项目大数据入门指南
查看原文

赞 0 收藏 0 评论 0

黑白影 发布了文章 · 2019-09-11

Hadoop 系列(一)—— 分布式文件系统 HDFS

一、介绍

HDFSHadoop Distributed File System)是 Hadoop 下的分布式文件系统,具有高容错、高吞吐量等特性,可以部署在低成本的硬件上。

二、HDFS 设计原理

https://github.com/heibaiying

2.1 HDFS 架构

HDFS 遵循主/从架构,由单个 NameNode(NN) 和多个 DataNode(DN) 组成:

  • NameNode : 负责执行有关 文件系统命名空间 的操作,例如打开,关闭、重命名文件和目录等。它同时还负责集群元数据的存储,记录着文件中各个数据块的位置信息。
  • DataNode:负责提供来自文件系统客户端的读写请求,执行块的创建,删除等操作。

2.2 文件系统命名空间

HDFS 的 文件系统命名空间 的层次结构与大多数文件系统类似 (如 Linux), 支持目录和文件的创建、移动、删除和重命名等操作,支持配置用户和访问权限,但不支持硬链接和软连接。NameNode 负责维护文件系统名称空间,记录对名称空间或其属性的任何更改。

2.3 数据复制

由于 Hadoop 被设计运行在廉价的机器上,这意味着硬件是不可靠的,为了保证容错性,HDFS 提供了数据复制机制。HDFS 将每一个文件存储为一系列,每个块由多个副本来保证容错,块的大小和复制因子可以自行配置(默认情况下,块大小是 128M,默认复制因子是 3)。

https://github.com/heibaiying

2.4 数据复制的实现原理

大型的 HDFS 实例在通常分布在多个机架的多台服务器上,不同机架上的两台服务器之间通过交换机进行通讯。在大多数情况下,同一机架中的服务器间的网络带宽大于不同机架中的服务器之间的带宽。因此 HDFS 采用机架感知副本放置策略,对于常见情况,当复制因子为 3 时,HDFS 的放置策略是:

在写入程序位于 datanode 上时,就优先将写入文件的一个副本放置在该 datanode 上,否则放在随机 datanode 上。之后在另一个远程机架上的任意一个节点上放置另一个副本,并在该机架上的另一个节点上放置最后一个副本。此策略可以减少机架间的写入流量,从而提高写入性能。

<div align="center"> <img data-original="https://raw.githubusercontent...;/> </div>
如果复制因子大于 3,则随机确定第 4 个和之后副本的放置位置,同时保持每个机架的副本数量低于上限,上限值通常为 (复制系数 - 1)/机架数量 + 2,需要注意的是不允许同一个 dataNode 上具有同一个块的多个副本。

2.5 副本的选择

为了最大限度地减少带宽消耗和读取延迟,HDFS 在执行读取请求时,优先读取距离读取器最近的副本。如果在与读取器节点相同的机架上存在副本,则优先选择该副本。如果 HDFS 群集跨越多个数据中心,则优先选择本地数据中心上的副本。

2.6 架构的稳定性

1. 心跳机制和重新复制

每个 DataNode 定期向 NameNode 发送心跳消息,如果超过指定时间没有收到心跳消息,则将 DataNode 标记为死亡。NameNode 不会将任何新的 IO 请求转发给标记为死亡的 DataNode,也不会再使用这些 DataNode 上的数据。 由于数据不再可用,可能会导致某些块的复制因子小于其指定值,NameNode 会跟踪这些块,并在必要的时候进行重新复制。

2. 数据的完整性

由于存储设备故障等原因,存储在 DataNode 上的数据块也会发生损坏。为了避免读取到已经损坏的数据而导致错误,HDFS 提供了数据完整性校验机制来保证数据的完整性,具体操作如下:

当客户端创建 HDFS 文件时,它会计算文件的每个块的 校验和 ,并将 校验和 存储在同一 HDFS 命名空间下的单独的隐藏文件中。当客户端检索文件内容时,它会验证从每个 DataNode 接收的数据是否与存储在关联校验和文件中的 校验和 匹配。如果匹配失败,则证明数据已经损坏,此时客户端会选择从其他 DataNode 获取该块的其他可用副本。

3.元数据的磁盘故障

FsImageEditLog 是 HDFS 的核心数据,这些数据的意外丢失可能会导致整个 HDFS 服务不可用。为了避免这个问题,可以配置 NameNode 使其支持 FsImageEditLog 多副本同步,这样 FsImageEditLog 的任何改变都会引起每个副本 FsImageEditLog 的同步更新。

4.支持快照

快照支持在特定时刻存储数据副本,在数据意外损坏时,可以通过回滚操作恢复到健康的数据状态。

三、HDFS 的特点

3.1 高容错

由于 HDFS 采用数据的多副本方案,所以部分硬件的损坏不会导致全部数据的丢失。

3.2 高吞吐量

HDFS 设计的重点是支持高吞吐量的数据访问,而不是低延迟的数据访问。

3.3 大文件支持

HDFS 适合于大文件的存储,文档的大小应该是是 GB 到 TB 级别的。

3.3 简单一致性模型

HDFS 更适合于一次写入多次读取 (write-once-read-many) 的访问模型。支持将内容追加到文件末尾,但不支持数据的随机访问,不能从文件任意位置新增数据。

3.4 跨平台移植性

HDFS 具有良好的跨平台移植性,这使得其他大数据计算框架都将其作为数据持久化存储的首选方案。

附:图解HDFS存储原理

说明:以下图片引用自博客:翻译经典 HDFS 原理讲解漫画

1. HDFS写数据原理

https://github.com/heibaiying

https://github.com/heibaiying

https://github.com/heibaiying

2. HDFS读数据原理

https://github.com/heibaiying

3. HDFS故障类型和其检测方法

https://github.com/heibaiying

https://github.com/heibaiying

第二部分:读写故障的处理

https://github.com/heibaiying

第三部分:DataNode 故障处理

https://github.com/heibaiying

副本布局策略

https://github.com/heibaiying

参考资料

  1. Apache Hadoop 2.9.2 > HDFS Architecture
  2. Tom White . hadoop 权威指南 [M] . 清华大学出版社 . 2017.
  3. 翻译经典 HDFS 原理讲解漫画
更多大数据系列文章可以参见 GitHub 开源项目大数据入门指南
查看原文

赞 2 收藏 1 评论 0

黑白影 发布了文章 · 2019-07-04

大数据学习路线

一、大数据处理流程

大数据处理简化流程
上图是一个简化的大数据处理流程图,大数据处理的主要流程包括数据收集、数据存储、数据处理、数据应用等主要环节。下面我们逐一对各个环节所需要的技术栈进行讲解:

1.1 数据收集

大数据处理的第一步是数据的收集。现在的中大型项目通常采用微服务架构进行分布式部署,所以数据的采集需要在多台服务器上进行,且采集过程不能影响正常业务的开展。基于这种需求,就衍生了多种日志收集工具,如 Flume 、Logstash、Kibana 等,它们都能通过简单的配置完成复杂的数据收集和数据聚合。

1.2 数据存储

收集到数据后,下一个问题就是:数据该如何进行存储?通常大家最为熟知是 MySQL、Oracle 等传统的关系型数据库,它们的优点是能够快速存储结构化的数据,并支持随机访问。但大数据的数据结构通常是半结构化(如日志数据)、甚至是非结构化的(如视频、音频数据),为了解决海量半结构化和非结构化数据的存储,衍生了 Hadoop HDFS 、KFS、GFS 等分布式文件系统,它们都能够支持结构化、半结构和非结构化数据的存储,并可以通过增加机器进行横向扩展。

分布式文件系统完美地解决了海量数据存储的问题,但是一个优秀的数据存储系统需要同时考虑数据存储和访问两方面的问题,比如你希望能够对数据进行随机访问,这是传统的关系型数据库所擅长的,但却不是分布式文件系统所擅长的,那么有没有一种存储方案能够同时兼具分布式文件系统和关系型数据库的优点,基于这种需求,就产生了 HBase、MongoDB。

1.3 数据分析

大数据处理最重要的环节就是数据分析,数据分析通常分为两种:批处理和流处理。

  • 批处理:对一段时间内海量的离线数据进行统一的处理,对应的处理框架有 Hadoop MapReduce、Spark、Flink 等;
  • 流处理:对运动中的数据进行处理,即在接收数据的同时就对其进行处理,对应的处理框架有 Storm、Spark Streaming、Flink Streaming等。

批处理和流处理各有其适用的场景,时间不敏感或者硬件资源有限,可以采用批处理;时间敏感和及时性要求高就可以采用流处理。随着服务器硬件的价格越来越低和大家对及时性的要求越来越高,流处理越来越普遍,如股票价格预测和电商运营数据分析等。

上面的框架都是需要通过编程来进行数据分析,那么如果你不是一个后台工程师,是不是就不能进行数据的分析了?当然不是,大数据是一个非常完善的生态圈,有需求就有解决方案。为了能够让熟悉 SQL 的人员也能够进行数据的分析,查询分析框架应运而生,常用的有 Hive 、Spark SQL 、Flink SQL、 Pig、Phoenix 等。这些框架都能够使用标准的 SQL 或者 类SQL 语法灵活地进行数据的查询分析。这些 SQL 经过解析优化后转换为对应的作业程序来运行,如 Hive 本质上就是将 SQL 转换为 MapReduce 作业,Spark SQL 将 SQL 转换为一系列的 RDDs 和转换关系(transformations),Phoenix 将 SQL 查询转换为一个或多个HBase Scan。

1.4 数据应用

数据分析完成后,接下来就是数据应用的范畴,这取决于你实际的业务需求。比如你可以将数据进行可视化展现,或者将数据用于优化你的推荐算法,这种运用现在很普遍,比如短视频个性化推荐、电商商品推荐、头条新闻推荐等。当然你也可以将数据用于训练你的机器学习模型,这些都属于其他领域的范畴,都有着对应的框架和技术栈进行处理,这里就不一一赘述。

1.5 其他框架

上面是一个标准的大数据处理流程所用到的技术框架。但是实际的大数据处理流程比上面复杂很多,针对大数据处理中的各种复杂问题分别衍生了各类框架:

  • 单机的处理能力都是存在瓶颈的,所以大数据框架都是采用集群模式进行部署,为了更方便的进行集群的部署、监控和管理,衍生了 Ambari、Cloudera Manager 等集群管理工具;
  • 想要保证集群高可用,需要用到 ZooKeeper ,ZooKeeper 是最常用的分布式协调服务,它能够解决大多数集群问题,包括首领选举、失败恢复、元数据存储及其一致性保证。同时针对集群资源管理的需求,又衍生了 Hadoop YARN ;
  • 复杂大数据处理的另外一个显著的问题是,如何调度多个复杂的并且彼此之间存在依赖关系的作业?基于这种需求,产生了 Azkaban 和 Oozie 等工作流调度框架;
  • 大数据流处理中使用的比较多的另外一个框架是 Kafka,它可以用于消峰,避免在秒杀等场景下并发数据对流处理程序造成冲击;
  • 另一个常用的框架是 Sqoop ,主要是解决了数据迁移的问题,它能够通过简单的命令将关系型数据库中的数据导入到 HDFS 、Hive 或 HBase 中,或者从 HDFS 、Hive 导出到关系型数据库上。

二、学习路线

介绍完大数据框架,接着就可以介绍其对应的学习路线了,主要分为以下几个方面:

2.1 语言基础

1. Java

大数据框架大多采用 Java 语言进行开发,并且几乎全部的框架都会提供 Java API 。Java 是目前比较主流的后台开发语言,所以网上免费的学习资源也比较多。如果你习惯通过书本进行学习,这里推荐以下入门书籍:

  • 《Java编程的逻辑》:这里一本国人编写的系统入门 Java 的书籍,深入浅出,内容全面;
  • 《Java核心技术》:目前最新的是第10版,有卷一卷二两册,卷二可以选择性阅读,因为其中很多章节的内容在实际开发中很少用到。

目前大多数框架要求 Java 版本至少是 1.8,这是由于 Java 1.8 提供了函数式编程,使得可以用更精简的代码来实现之前同样的功能,比如你调用 Spark API,使用 1.8 可能比 1.7 少数倍的代码,所以这里额外推荐阅读 《Java 8实战》 这本书籍。

2. Scala

Scala 是一门综合了面向对象和函数式编程概念的静态类型的编程语言,它运行在 Java虚拟机上,可以与所有的Java类库无缝协作,著名的 Kafka 就是采用 Scala 语言进行开发的。

为什么需要学习 Scala 语言 ? 这是因为当前最火的计算框架 Flink 和 Spark 都提供了 Scala 语言的接口,使用它进行开发,比使用 Java 8 所需要的代码更少,且 Spark 就是使用 Scala 语言进行编写的,学习 Scala 可以帮助你更深入的理解 Spark。同样的,对于习惯书本学习的小伙伴,这里推荐两本入门书籍:

这里说明一下,如果你的时间有限,不一定要学完 Scala 才去学习大数据框架。Scala 确实足够的精简和灵活,但其在语言复杂度上略大于 Java,例如隐式转换和隐式参数等概念在初次涉及时会比较难以理解,所以你可以在了解 Spark 后再去学习 Scala,因为类似隐式转换等概念在 Spark 源码中有大量的运用。

2.2 Linux 基础

通常大数据框架都部署在 Linux 服务器上,所以需要具备一定的 Linux 知识。Linux 书籍当中比较著名的是 《鸟哥私房菜》系列,这个系列很全面也很经典。但如果你希望能够快速地入门,这里推荐《Linux就该这么学》,其网站上有免费的电子书版本。

2.3 构建工具

这里需要掌握的自动化构建工具主要是 Maven。Maven 在大数据场景中使用比较普遍,主要在以下三个方面:

  • 管理项目 JAR 包,帮助你快速构建大数据应用程序;
  • 不论你的项目是使用 Java 语言还是 Scala 语言进行开发,提交到集群环境运行时,都需要使用 Maven 进行编译打包;
  • 大部分大数据框架使用 Maven 进行源码管理,当你需要从其源码编译出安装包时,就需要使用到 Maven。

2.4 框架学习

1. 框架分类

上面我们介绍了很多大数据框架,这里进行一下分类总结:

日志收集框架:Flume 、Logstash、Kibana

分布式文件存储系统:Hadoop HDFS

数据库系统:Mongodb、HBase

分布式计算框架

  • 批处理框架:Hadoop MapReduce
  • 流处理框架:Storm
  • 混合处理框架:Spark、Flink

查询分析框架:Hive 、Spark SQL 、Flink SQL、 Pig、Phoenix

集群资源管理器:Hadoop YARN

分布式协调服务:Zookeeper

数据迁移工具:Sqoop

任务调度框架:Azkaban、Oozie

集群部署和监控:Ambari、Cloudera Manager

上面列出的都是比较主流的大数据框架,社区都很活跃,学习资源也比较丰富。建议从 Hadoop 开始入门学习,因为它是整个大数据生态圈的基石,其它框架都直接或者间接依赖于 Hadoop 。接着就可以学习计算框架,Spark 和 Flink 都是比较主流的混合处理框架,Spark 出现得较早,所以其应用也比较广泛。 Flink 是当下最火热的新一代的混合处理框架,其凭借众多优异的特性得到了众多公司的青睐。两者可以按照你个人喜好或者实际工作需要进行学习。

大数据生态圈

图片引用自https://www.edureka.co/blog/h...

至于其它框架,在学习上并没有特定的先后顺序,如果你的学习时间有限,建议初次学习时候,同一类型的框架掌握一种即可,比如日志收集框架就有很多种,初次学习时候只需要掌握一种,能够完成日志收集的任务即可,之后工作上有需要可以再进行针对性地学习。

2. 学习资料

大数据最权威和最全面的学习资料就是官方文档。热门的大数据框架社区都比较活跃、版本更新迭代也比较快,所以其出版物都明显滞后于其实际版本,基于这个原因采用书本学习不是一个最好的方案。比较庆幸的是,大数据框架的官方文档都写的比较好,内容完善,重点突出,同时都采用了大量配图进行辅助讲解。当然也有一些优秀的书籍历经时间的检验,至今依然很经典,这里列出部分个人阅读过的经典书籍:

3. 视频学习资料

上面我推荐的都是书籍学习资料,很少推荐视频学习资料,这里说明一下原因:因为书籍历经时间的考验,能够再版的或者豆瓣等平台评价高的证明都是被大众所认可的,从概率的角度上来说,其必然更加优秀,不容易浪费大家的学习时间和精力,所以我个人更倾向于官方文档或者书本的学习方式,而不是视频。因为视频学习资料,缺少一个公共的评价平台和完善的评价机制,所以其质量良莠不齐。但是视频任然有其不可替代的好处,学习起来更直观、印象也更深刻,所以对于习惯视频学习的小伙伴,这里我各推荐一个免费的和付费的视频学习资源,大家按需选择:

最后额外安利一下个人 博客 和 GitHub 项目 大数据入门指南,上面也有大数据的系列文章。

三、开发工具

这里推荐一些大数据常用的开发工具:

Java IDE:IDEA 和 Eclipse 都可以。从个人使用习惯而言,更倾向于 IDEA ;

VirtualBox:在学习过程中,你可能经常要在虚拟机上搭建服务和集群。VirtualBox 是一款开源、免费的虚拟机管理软件,虽然是轻量级软件,但功能很丰富,基本能够满足日常的使用需求;

MobaXterm:大数据的框架通常都部署在服务器上,这里推荐使用 MobaXterm 进行连接。同样是免费开源的,支持多种连接协议,支持拖拽上传文件,支持使用插件扩展;

Translate Man:一款浏览器上免费的翻译插件(谷歌和火狐均支持)。它采用谷歌的翻译接口,准确性非常高,支持划词翻译,可以辅助进行官方文档的阅读。

四、结语

以上就是个人关于大数据的学习心得和路线推荐。本片文章对大数据技术栈做了比较狭义的限定,随着学习的深入,大家也可以把 Python 语言、推荐系统、机器学习等逐步加入到自己的大数据技术栈中。

查看原文

赞 6 收藏 5 评论 0

黑白影 发布了文章 · 2019-06-18

大数据入门指南(GitHub开源项目)

项目GitHub地址:https://github.com/heibaiying...

前 言

  1. 大数据技术栈思维导图
  2. 大数据常用软件安装指南

一、Hadoop

  1. 分布式文件存储系统——HDFS
  2. 分布式计算框架——MapReduce
  3. 集群资源管理器——YARN
  4. Hadoop单机伪集群环境搭建
  5. Hadoop集群环境搭建
  6. HDFS常用Shell命令
  7. HDFS Java API的使用
  8. 基于Zookeeper搭建Hadoop高可用集群

二、Hive

  1. Hive简介及核心概念
  2. Linux环境下Hive的安装部署
  3. Hive CLI和Beeline命令行的基本使用
  4. Hive 常用DDL操作
  5. Hive 分区表和分桶表
  6. Hive 视图和索引
  7. Hive常用DML操作
  8. Hive 数据查询详解

三、Spark

Spark Core :

  1. Spark简介
  2. Spark开发环境搭建
  3. 弹性式数据集RDD
  4. RDD常用算子详解
  5. Spark运行模式与作业提交
  6. Spark累加器与广播变量
  7. 基于Zookeeper搭建Spark高可用集群

Spark SQL :

  1. DateFrame 和 DataSet
  2. Structured API的基本使用
  3. Spark SQL外部数据源
  4. Spark SQL常用聚合函数
  5. Spark SQL JOIN 操作

Spark Streaming :

  1. Spark Streaming 简介
  2. Spark Streaming 基本操作
  3. Spark Streaming 整合 Flume
  4. Spark Streaming 整合 Kafka

四、Storm

  1. Storm和流处理简介
  2. Storm核心概念详解
  3. Storm单机环境搭建
  4. Storm集群环境搭建
  5. Storm编程模型详解
  6. Storm项目三种打包方式对比分析
  7. Storm集成Redis详解
  8. Storm集成HDFS/HBase
  9. Storm集成Kafka

五、Flink

TODO

六、HBase

  1. Hbase 简介
  2. HBase系统架构及数据结构
  3. HBase基本环境搭建(Standalone /pseudo-distributed mode)
  4. HBase集群环境搭建
  5. HBase常用Shell命令
  6. HBase Java API
  7. Hbase 过滤器详解
  8. HBase 协处理器详解
  9. HBase 容灾与备份
  10. HBase的SQL中间层——Phoenix
  11. Spring/Spring Boot 整合 Mybatis + Phoenix

七、Kafka

  1. Kafka 简介
  2. 基于Zookeeper搭建Kafka高可用集群
  3. Kafka 生产者详解
  4. Kafka 消费者详解
  5. 深入理解Kafka副本机制

八、Zookeeper

  1. Zookeeper 简介及核心概念
  2. Zookeeper单机环境和集群环境搭建
  3. Zookeeper常用Shell命令
  4. Zookeeper Java 客户端——Apache Curator
  5. Zookeeper ACL权限控制

九、Flume

  1. Flume简介及基本使用
  2. Linux环境下Flume的安装部署
  3. Flume整合Kafka

十、Sqoop

  1. Sqoop简介与安装
  2. Sqoop的基本使用

十一、Azkaban

  1. Azkaban简介
  2. Azkaban3.x 编译及部署
  3. Azkaban Flow 1.0 的使用
  4. Azkaban Flow 2.0 的使用

十二、Scala

  1. Scala简介及开发环境配置
  2. 基本数据类型和运算符
  3. 流程控制语句
  4. 数组——Array
  5. 集合类型综述
  6. 常用集合类型之——List & Set
  7. 常用集合类型之——Map & Tuple
  8. 类和对象
  9. 继承和特质
  10. 函数 & 闭包 & 柯里化
  11. 模式匹配
  12. 类型参数
  13. 隐式转换和隐式参数

十三、公共内容

  1. 大数据应用常用打包方式

后 记

大数据技术栈思维导图

项目GitHub地址:https://github.com/heibaiying...
查看原文

赞 3 收藏 3 评论 0

认证与成就

  • 获得 11 次点赞
  • 获得 0 枚徽章 获得 0 枚金徽章, 获得 0 枚银徽章, 获得 0 枚铜徽章

擅长技能
编辑

(゚∀゚ )
暂时没有

开源项目 & 著作
编辑

(゚∀゚ )
暂时没有

注册于 2019-06-18
个人主页被 622 人浏览