MosesDon

MosesDon 查看完整档案

填写现居城市南京信息工程大学滨江学院  |  电子信息工程 编辑  |  填写所在公司/组织填写个人主网站
编辑
_ | |__ _ _ __ _ | '_ \| | | |/ _` | | |_) | |_| | (_| | |_.__/ \__,_|\__, | |___/ 个人简介什么都没有

个人动态

MosesDon 发布了文章 · 1月29日

量化初识

赞 0 收藏 0 评论 0

MosesDon 发布了文章 · 2020-12-29

sql优化

赞 0 收藏 0 评论 0

MosesDon 发布了文章 · 2020-12-22

Flume入门案例

Flume入门案例

1 监控端口数据官方案例

1.1 案例需求:

使用Flume监听一个端口,收集该端口数据,并打印到控制台。

1.2 需求分析:

监控端口数据案例.png

1.3 实现步骤:

1.3.1 安装netcat工具
sudo yum install -y nc
1.3.2 判断44444端口是否被占用
sudo netstat -tunlp | grep 44444
1.3.3 创建Flume Agent配置文件flume-netcat-logger.conf

在flume目录下创建job文件夹并进入job文件夹。

mkdir job
cd job/

在job文件夹下创建Flume Agent配置文件flume-netcat-logger.conf。

vim flume-netcat-logger.conf

在flume-netcat-logger.conf文件中添加如下内容。

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
注:配置文件来源于官方手册http://flume.apache.org/FlumeUserGuide.html
配置文件解析

监控端口数据-配置文件解析.png

1.3.4 先开启flume监听端口

第一种写法:

bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console

第二种写法:

bin/flume-ng agent -c conf/ -n a1 –f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console

参数说明:

--conf/-c:表示配置文件存储在conf/目录
--name/-n:表示给agent起名为a1
--conf-file/-f:flume本次启动读取的配置文件是在job文件夹下的flume-telnet.conf文件。
-Dflume.root.logger=INFO,console :-D表示flume运行时动态修改flume.root.logger参数属性值,并将控制台日志打印级别设置为INFO级别。日志级别包括:log、info、warn、error。
1.3.5 使用netcat工具向本机的44444端口发送内容
nc localhost 44444
hello 
atguigu
1.3.6 在Flume监听页面观察接收数据情况

监听页面观察接收数据情况.png

2 实时监控单个文件

2.1 案例需求:

实时监控Hive日志,并上传到HDFS中

2.2 需求分析:

实时读取本地文件到hdfs
实时读取本地文件到hdfs.png

2.3 实现步骤:

2.3.1 Flume要想将数据输出到HDFS,必须持有Hadoop相关jar包


commons-configuration-1.6.jar、
hadoop-auth-2.7.2.jar、
hadoop-common-2.7.2.jar、
hadoop-hdfs-2.7.2.jar、
commons-io-2.4.jar、
htrace-core-3.1.0-incubating.jar
拷贝到/opt/module/flume/lib文件夹下(flume lib库下)。

2.3.2 创建flume-file-hdfs.conf文件

创建文件

vim flume-file-hdfs.conf

注:要想读取Linux系统中的文件,就得按照Linux命令的规则执行命令。由于Hive日志在Linux系统中所以读取文件的类型选择:exec即execute执行的意思。表示执行Linux命令来读取文件。
添加如下内容

# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log
a2.sources.r2.shell = /bin/bash -c

# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop102:9000/flume/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k2.hdfs.filePrefix = logs-
#是否按照时间滚动文件夹
a2.sinks.k2.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k2.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k2.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a2.sinks.k2.hdfs.batchSize = 100
#多久生成一个新的文件
a2.sinks.k2.hdfs.rollInterval = 60
#设置每个文件的滚动大小
a2.sinks.k2.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a2.sinks.k2.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 10000
a2.channels.c2.transactionCapacity = 1000

# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

注意:
对于所有与时间相关的转义序列,Event Header中必须存在以 “timestamp”的key(除非hdfs.useLocalTimeStamp设置为true,此方法会使用TimestampInterceptor自动添加timestamp)。

a2.sinks.k2.hdfs.useLocalTimeStamp = true
配置文件解析

实时读取本地文件到hdfs-配置文件解析.png

2.3.3 运行Flume
bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf
2.3.4 开启Hadoop和Hive并操作Hive产生日志
sbin/start-dfs.sh
sbin/start-yarn.sh

bin/hive
hive (default)>
2.3.5 在HDFS上查看文件。

查看实时读取本地文件到hdfs文件.png

2.4 Execsouce 总结

execsource和异步的source一样,无法在source向channel中放入event故障时,及时通知客户端,暂停生成数据!
容易造成数据丢失!

解决方案:
  • 需要在发生故障时,及时通知客户端!
  • 如果客户端无法暂停,必须有一个数据的缓存机制!
  • 如果希望数据有强的可靠性保证,可以考虑使用SpoolingDirSource或TailDirSource或自己写Source自己控制!

3 监控多个新文件

3.1 案例需求:

使用Flume监听整个目录的文件,并上传至HDFS

3.2 需求分析:

监控多个新文件.png

3.3 实现步骤:

3.3.1 创建配置文件flume-dir-hdfs.conf

创建一个文件

vim flume-dir-hdfs.conf

添加如下内容

a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
#忽略所有以.tmp结尾的文件,不上传
a3.sources.r3.ignorePattern = \\S*\\.tmp

# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop102:9000/flume/upload/%Y%m%d/%H
#上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a3.sinks.k3.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 60
#设置每个文件的滚动大小大概是128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a3.sinks.k3.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 10000
a3.channels.c3.transactionCapacity = 1000

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

监控多个新文件-配置文件解析
监控多个新文件-配置文件解析.png

3.3.2 启动监控文件夹命令
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-dir-hdfs.conf

说明:在使用Spooling Directory Source时
不要在监控目录中创建并持续修改文件
上传完成的文件会以.COMPLETED结尾
被监控文件夹每500毫秒扫描一次文件变动

3.3.3 向upload文件夹中添加文件

在/opt/module/flume目录下创建upload文件夹

[atguigu@hadoop102 flume]$ mkdir upload

向upload文件夹中添加文件

touch atguigu.txt
touch atguigu.tmp
touch atguigu.log
3.3.4 查看HDFS上的数据

image.png

3.3.5 等待1s,再次查询upload文件夹
[atguigu@hadoop102 upload]$ ll
总用量 0
-rw-rw-r--. 1 atguigu atguigu 0 5月  20 22:31 atguigu.log.COMPLETED
-rw-rw-r--. 1 atguigu atguigu 0 5月  20 22:31 atguigu.tmp
-rw-rw-r--. 1 atguigu atguigu 0 5月  20 22:31 atguigu.txt.COMPLETED

3.4 SpoolingDirSource 总结

  • SpoolingDirSource指定本地磁盘的一个目录为"Spooling(自动收集)"的目录!这个source可以读取目录中新增的文件,将文件的内容封装为event!
  • SpoolingDirSource在读取一整个文件到channel之后,它会采取策略,要么删除文件(是否可以删除取决于配置),要么对文件进程一个完成状态的重命名,这样可以保证source持续监控新的文件!

  • SpoolingDirSource和execsource不同
    SpoolingDirSource是可靠的!即使flume被杀死或重启,依然不丢数据!但是为了保证这个特性,付出的代价是,一旦flume发现以下情况,flume就会报错,停止!

    1. 一个文件已经被放入目录,在采集文件时,不能被修改
    2. 文件的名在放入目录后又被重新使用(出现了重名的文件)

要求: 必须已经封闭的文件才能放入到SpoolingDirSource,在同一个SpoolingDirSource中都不能出现重名的文件!

使用:

必需配置

type    –    The component type name, needs to be spooldir.
spoolDir    –    The directory from which to read files from.

4 实时监控多个文件

Exec source适用于监控一个实时追加的文件,但不能保证数据不丢失;Spooldir Source能够保证数据不丢失,且能够实现断点续传,但延迟较高,不能实时监控;而Taildir Source既能够实现断点续传,又可以保证数据不丢失,还能够进行实时监控。

4.1 案例需求:

使用Flume监听整个目录的实时追加文件,并上传至HDFS

4.2 需求分析:

实时监控多个文件.png

4.3 实现步骤:

4.3.1 创建配置文件flume-taildir-hdfs.conf

创建一个文件

[atguigu@hadoop102 job]$ vim flume-taildir-hdfs.conf

添加如下内容

a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source
a3.sources.r3.type = TAILDIR
a3.sources.r3.positionFile = /opt/module/flume/tail_dir.json
a3.sources.r3.filegroups = f1
a3.sources.r3.filegroups.f1 = /opt/module/flume/files/file.*

# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop102:9000/flume/upload/%Y%m%d/%H
#上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a3.sinks.k3.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 60
#设置每个文件的滚动大小大概是128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a3.sinks.k3.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 10000
a3.channels.c3.transactionCapacity = 1000

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
实时监控多个文件-配置文件解析

实时监控多个文件-配置文件解析.png

4.3.2 启动监控文件夹命令
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-taildir-hdfs.conf
4.3.3 向files文件夹中追加内容

在/opt/module/flume目录下创建files文件夹

[atguigu@hadoop102 flume]$ mkdir files

向upload文件夹中添加文件

[atguigu@hadoop102 files]$ echo hello >> file1.txt
[atguigu@hadoop102 files]$ echo atguigu >> file2.txt
4.3.4 查看HDFS上的数据

image.png

Taildir说明:

Taildir Source维护了一个json格式的position File,其会定期的往position File中更新每个文件读取到的最新的位置,因此能够实现断点续传。Position File的格式如下:

{"inode":2496272,"pos":12,"file":"/opt/module/flume/files/file1.txt"}
{"inode":2496275,"pos":12,"file":"/opt/module/flume/files/file2.txt"}

注:Linux中储存文件元数据的区域就叫做inode,每个inode都有一个号码,操作系统用inode号码来识别不同的文件,Unix/Linux系统内部不使用文件名,而使用inode号码来识别文件。

4.4 TailDirSource 总结

flume ng 1.7版本后提供!

常见问题:

TailDirSource采集的文件,不能随意重命名!如果日志在正在写入时,名称为 xxxx.tmp,写入完成后,滚动,改名为xxx.log,此时一旦匹配规则可以匹配上述名称,就会发生数据的重复采集!

  • Taildir Source 可以读取多个文件最新追加写入的内容!
  • Taildir Source是可靠的,即使flume出现了故障或挂掉。Taildir Source在工作时,会将读取文件的最后的位置记录在一个json文件中,一旦agent重启,会从之前已经记录的位置,继续执行tail操作!
  • Json文件中,位置是可以修改,修改后,Taildir Source会从修改的位置进行tail操作!如果JSON文件丢失了,此时会重新从每个文件的第一行,重新读取,这会造成数据的重复!
  • Taildir Source目前只能读文本文件!
必需配置:
channels    –     
type    –    The component type name, needs to be TAILDIR.
filegroups    –    Space-separated list of file groups. Each file group indicates a set of files to be tailed.
filegroups.<filegroupName>    –    Absolute path of the file group. Regular expression (and not file system patterns) can be used for filename only.
查看原文

赞 0 收藏 0 评论 0

MosesDon 发布了文章 · 2020-12-15

Flume拓扑结构--Flink进阶认识

Flume拓扑结构

1 简单串联

Flume拓扑结构-简单串联.png
Flume Agent连接
此模式不建议桥接过多的flume数量, flume数量过多不仅会影响传输速率,而且一旦传输过程中某个节点flume宕机,会影响整个传输系统。

2 复制和多路复用

Flume拓扑结构-复制和多路复用.png
单source,多channel、sink
Flume支持将事件流向一个或者多个目的地。这种模式可以将相同数据复制到多个channel中,或者将不同数据分发到不同的channel中,sink可以选择传送到不同的目的地。

3 负载均衡和故障转移

Flume拓扑结构-负载均衡和故障转移.png
Flume支持使用将多个sink逻辑上分到一个sink组,sink组配合不同的SinkProcessor可以实现负载均衡和错误恢复的功能。

4 聚合

Flume拓扑结构-聚合.png
这种模式是我们最常见的,也非常实用,日常web应用通常分布在上百个服务器,大者甚至上千个、上万个服务器。产生的日志,处理起来也非常麻烦。用flume的这种组合方式能很好的解决这一问题,每台服务器部署一个flume采集日志,传送到一个集中收集日志的flume,再由此flume上传到hdfs、hive、hbase等,进行日志分析。

查看原文

赞 0 收藏 0 评论 0

MosesDon 发布了文章 · 2020-12-15

Flume Agent内部原理-Flink进阶认识

Flume Agent内部原理

Flume Agent内部原理.png

重要组件:

1 ChannelSelector

ChannelSelector的作用就是选出Event将要被发往哪个Channel。其共有两种类型,分别是Replicating(复制)和Multiplexing(多路复用)。
ReplicatingSelector会将同一个Event发往所有的Channel,Multiplexing会根据相应的原则,将不同的Event发往不同的Channel。

2 SinkProcessor

SinkProcessor共有三种类型,分别是DefaultSinkProcessor、LoadBalancingSinkProcessor和FailoverSinkProcessor
DefaultSinkProcessor对应的是单个的Sink,LoadBalancingSinkProcessor和FailoverSinkProcessor对应的是Sink Group,LoadBalancingSinkProcessor可以实现负载均衡的功能,FailoverSinkProcessor可以实现故障转移的功能。

查看原文

赞 0 收藏 0 评论 0

MosesDon 发布了文章 · 2020-12-15

flume初识-基本概念

1 定义

flume定义.png

Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。Flume基于流式架构,灵活简单。

2 flume基础框架

flume基础框架.png

2.1 Agent

Agent是一个JVM进程,它以事件的形式将数据从源头送至目的。

2.2 Source

Source是负责接收数据到Flume Agent的组件。Source组件可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy。

2.3 Sink

Sink不断轮询Channel中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个Flume Agent。

Sink组件目的地包括hdfs、logger、avro、thrift、ipc、file、HBase、solr、自定义。

2.4 Channel

Channel是位于Source和Sink之间的缓冲区。因此,Channel允许Source和Sink运作在不同的速率上。Channel是线程安全的,可以同时处理几个Source的写入操作和几个Sink的读取操作。

Flume自带两种Channel:Memory Channel和File Channel。

Memory Channel是内存中的队列。Memory Channel在不需要关心数据丢失的情景下适用。如果需要关心数据丢失,那么Memory Channel就不应该使用,因为程序死亡、机器宕机或者重启都会导致数据丢失。

File Channel将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据。

2.5 Event

传输单元,Flume数据传输的基本单元,以Event的形式将数据从源头送至目的地。Event由HeaderBody两部分组成,Header用来存放该event的一些属性,为K-V结构,Body用来存放该条数据,形式为字节数组。
Event.png

2.6 Interceptors

在Flume中允许使用拦截器对传输中的event进行拦截和处理!拦截器必须实现org.apache.flume.interceptor.Interceptor接口。拦截器可以根据开发者的设定修改甚至删除event!Flume同时支持拦截器链,即由多个拦截器组合而成!通过指定拦截器链中拦截器的顺序,event将按照顺序依次被拦截器进行处理!

2.7 Channel Selectors

Channel Selectors用于source组件将event传输给多个channel的场景。常用的有replicating(默认)和multiplexing两种类型。replicating负责将event复制到多个channel,而multiplexing则根据event的属性和配置的参数进行匹配,匹配成功则发送到指定的channel!

2.8 Sink Processors

用户可以将多个sink组成一个整体(sink组),Sink Processors可用于提供组内的所有sink的负载平衡功能,或在时间故障的情况下实现从一个sink到另一个sink的故障转移。

查看原文

赞 0 收藏 0 评论 0

MosesDon 发布了文章 · 2020-12-15

flume初识-配置conf文件

以hdfs为例

配置文件来源于官方手册

1 创建flume-taildir-hdfs.conf文件

a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source
a3.sources.r3.type = TAILDIR
a3.sources.r3.positionFile = /opt/module/flume/tail_dir.json
a3.sources.r3.filegroups = f1
a3.sources.r3.filegroups.f1 = /opt/module/flume/files/file.*

# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop102:9000/flume/upload/%Y%m%d/%H
#上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a3.sinks.k3.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 60
#设置每个文件的滚动大小大概是128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a3.sinks.k3.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 10000
a3.channels.c3.transactionCapacity = 1000

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

2 启动监控文件夹命令

bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-taildir-hdfs.conf

3 向files文件夹中追加内容

向upload文件夹中添加文件

echo hello >> file1.txt
echo atguigu >> file2.txt

Taildir说明:

Taildir Source维护了一个json格式的position File,其会定期的往position File中更新每个文件读取到的最新的位置,因此能够实现断点续传。Position File的格式如下:

{"inode":2496272,"pos":12,"file":"/opt/module/flume/files/file1.txt"}
{"inode":2496275,"pos":12,"file":"/opt/module/flume/files/file2.txt"}

注:Linux中储存文件元数据的区域就叫做inode,每个inode都有一个号码,操作系统用inode号码来识别不同的文件,Unix/Linux系统内部不使用文件名,而使用inode号码来识别文件。

查看原文

赞 0 收藏 0 评论 0

MosesDon 发布了文章 · 2020-12-13

hive压缩和列式存储

Hadoop压缩配置

MR支持的压缩编码

压缩格式工具算法文件扩展名是否可切分对应的编码/解码器压缩算法原始文件大小压缩文件大小压缩速度
DEFAULTDEFAULT.deflateorg.apache.hadoop.io.compress.DefaultCodec
GzipgzipDEFAULT.gzorg.apache.hadoop.io.compress.GzipCodecgzip8.3GB1.8GB17.5MB/s
bzip2bzip2bzip2.bz2org.apache.hadoop.io.compress.BZip2Codecbzip28.3GB1.1GB2.4MB/s
LZOlzopLZO.lzocom.hadoop.compression.lzo.LzopCodecLZO8.3GB2.9GB49.3MB/s
SnappySnappy.snappyorg.apache.hadoop.io.compress.SnappyCodec

压缩参数配置

要在Hadoop中启用压缩,可以配置如下参数(mapred-site.xml文件中):

参数默认值阶段建议
io.compression.codecs (在core-site.xml中配置)org.apache.hadoop.io.compress.DefaultCodec, org.apache.hadoop.io.compress.GzipCodec, org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.Lz4Codec输入压缩Hadoop使用文件扩展名判断是否支持某种编解码器
mapreduce.map.output.compressfalsemapper输出这个参数设为true启用压缩
mapreduce.map.output.compress.codecorg.apache.hadoop.io.compress.DefaultCodecmapper输出使用LZO、LZ4或snappy编解码器在此阶段压缩数据
mapreduce.output.fileoutputformat.compressfalsereducer输出这个参数设为true启用压缩
mapreduce.output.fileoutputformat.compress.codecorg.apache.hadoop.io.compress. DefaultCodecreducer输出使用标准工具或者编解码器,如gzip和bzip2
mapreduce.output.fileoutputformat.compress.typeRECORDreducer输出SequenceFile输出使用的压缩类型:NONE和BLOCK

开启Map输出阶段压缩

开启map输出阶段压缩可以减少job中map和Reduce task间数据传输量。具体配置如下:

案例实操:

1.开启hive中间传输数据压缩功能
hive (default)>set hive.exec.compress.intermediate=true;
2.开启mapreduce中map输出压缩功能
hive (default)>set mapreduce.map.output.compress=true;
3.设置mapreduce中map输出数据的压缩方式
hive (default)>set mapreduce.map.output.compress.codec=
 org.apache.hadoop.io.compress.SnappyCodec;
4.执行查询语句
hive (default)> select count(ename) name from emp;

开启Reduce输出阶段压缩

当Hive将输出写入到表中时,输出内容同样可以进行压缩。属性hive.exec.compress.output控制着这个功能。用户可能需要保持默认设置文件中的默认值false,这样默认的输出就是非压缩的纯文本文件了。用户可以通过在查询语句或执行脚本中设置这个值为true,来开启输出结果压缩功能。

案例实操:

1.开启hive最终输出数据压缩功能

hive (default)>set hive.exec.compress.output=true;

2.开启mapreduce最终输出数据压缩

hive (default)>set mapreduce.output.fileoutputformat.compress=true;

3.设置mapreduce最终数据输出压缩方式

hive (default)> set mapreduce.output.fileoutputformat.compress.codec =

org.apache.hadoop.io.compress.SnappyCodec;

4.设置mapreduce最终数据输出压缩为块压缩

hive (default)> set mapreduce.output.fileoutputformat.compress.type=BLOCK;

5.测试一下输出结果是否是压缩文件

hive (default)> insert overwrite local directory

'/opt/module/datas/distribute-result' select * from emp distribute by deptno sort by empno desc;

文件存储格式


Hive支持的存储数的格式主要有:TEXTFILE 、SEQUENCEFILE、ORC、PARQUET。

列式存储和行式存储

image.png
列式存储和行式存储
左边为逻辑表,右边第一个为行式存储,第二个为列式存储。

1.行存储的特点

查询满足条件的一整行数据的时候,列存储则需要去每个聚集的字段找到对应的每个列的值,行存储只需要找到其中一个值,其余的值都在相邻地方,所以此时行存储查询的速度更快。

2.列存储的特点

因为每个字段的数据聚集存储,在查询只需要少数几个字段的时候,能大大减少读取的数据量;每个字段的数据类型一定是相同的,列式存储可以针对性的设计更好的设计压缩算法。

TEXTFILE和SEQUENCEFILE的存储格式都是基于行存储的;

ORC和PARQUET是基于列式存储的。

TextFile格式

默认格式,数据不做压缩,磁盘开销大,数据解析开销大。可结合Gzip、Bzip2使用,但使用Gzip这种方式,hive不会对数据进行切分,从而无法对数据进行并行操作。

Orc格式

Orc (Optimized Row Columnar)是Hive 0.11版里引入的新的存储格式。

如图6-11所示可以看到每个Orc文件由1个或多个stripe组成,每个stripe250MB大小,这个Stripe实际相当于RowGroup概念,不过大小由4MB->250MB,这样应该能提升顺序读的吞吐率。每个Stripe里有三部分组成,分别是Index Data,Row Data,Stripe Footer:
image.png
Orc格式

  • Index Data:一个轻量级的index,默认是每隔1W行做一个索引。这里做的索引应该只是记录某行的各字段在Row Data中的offset。
  • Row Data:存的是具体的数据,先取部分行,然后对这些行按列进行存储。对每个列进行了编码,分成多个Stream来存储。
  • Stripe Footer:存的是各个Stream的类型,长度等信息。

每个文件有一个File Footer,这里面存的是每个Stripe的行数,每个Column的数据类型信息等;每个文件的尾部是一个PostScript,这里面记录了整个文件的压缩类型以及FileFooter的长度信息等。在读取文件时,会seek到文件尾部读PostScript,从里面解析到File Footer长度,再读FileFooter,从里面解析到各个Stripe信息,再读各个Stripe,即从后往前读。

Parquet格式

Parquet是面向分析型业务的列式存储格式,由Twitter和Cloudera合作开发,2015年5月从Apache的孵化器里毕业成为Apache顶级项目。
Parquet文件是以二进制方式存储的,所以是不可以直接读取的,文件中包括该文件的数据和元数据,因此Parquet格式文件是自解析的。
通常情况下,在存储Parquet数据的时候会按照Block大小设置行组的大小,由于一般情况下每一个Mapper任务处理数据的最小单位是一个Block,这样可以把每一个行组由一个Mapper任务处理,增大任务执行并行度。Parquet文件的格式如图所示。

image.png

Parquet格式

上图展示了一个Parquet文件的内容,一个文件中可以存储多个行组,文件的首位都是该文件的Magic Code,用于校验它是否是一个Parquet文件,Footer length记录了文件元数据的大小,通过该值和文件长度可以计算出元数据的偏移量,文件的元数据中包括每一个行组的元数据信息和该文件存储数据的Schema信息。除了文件中每一个行组的元数据,每一页的开始都会存储该页的元数据,在Parquet中,有三种类型的页:数据页、字典页和索引页。数据页用于存储当前行组中该列的值,字典页存储该列值的编码字典,每一个列块中最多包含一个字典页,索引页用来存储当前行组下该列的索引,目前Parquet中还不支持索引页。

存储文件的压缩比总结:

从存储文件的压缩比和查询速度两个角度对比。
ORC >  Parquet >  textFile
存储文件的查询速度总结:查询速度相近。

存储和压缩结合


修改Hadoop集群具有Snappy压缩方式

修改Hadoop集群具有Snappy压缩方式

存储方式和压缩总结

在实际的项目开发当中,hive表的数据存储格式一般选择:orcparquet。压缩方式一般选择snappylzo

目前企业中主流组合用法

存储:

hive支持 TEXTFILE,SequenceFile,ORC,Parquet
行式存储: TEXTFILE,SequenceFile
列式存储: ORC,Parquet 都是行列结合存储!
将一部分行(stripe或行组)按照列式存储!

ORC:

hive独有。性能略好!

Parquet:

希望整个hadoop生态圈都可以支持Parquet格式!实用性更广!

主流

使用ORC+snappy组合!
使用Parquet+LZO组合!

压缩:

hive基于hadoop,hive支持的压缩格式也是hadoop支持的压缩格式

查看原文

赞 0 收藏 0 评论 0

MosesDon 发布了文章 · 2020-12-13

修改Hadoop集群具有Snappy压缩方式

1.查看hadoop checknative命令使用

[atguigu@hadoop104 hadoop-2.7.2]$ hadoop

  checknative [-a|-h]  check native hadoop and compression libraries availability

2.查看hadoop支持的压缩方式

[atguigu@hadoop104 hadoop-2.7.2]$ hadoop checknative

17/12/24 20:32:52 WARN bzip2.Bzip2Factory: Failed to load/initialize native-bzip2 library system-native, will use pure-Java version

17/12/24 20:32:52 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library

Native library checking:

hadoop:  true /opt/module/hadoop-2.7.2/lib/native/libhadoop.so

zlib:    true /lib64/libz.so.1

snappy:  false

lz4:     true revision:99

bzip2:   false

3.将编译好的支持Snappy压缩的hadoop-2.7.2.tar.gz包导入到

hadoop102的/opt/software中

4.解压hadoop-2.7.2.tar.gz到当前路径

[atguigu@hadoop102 software]$ tar -zxvf hadoop-2.7.2.tar.gz

5.进入到/opt/software/hadoop-2.7.2/lib/native路径可以看到支持Snappy压缩的动态链接库

[atguigu@hadoop102 native]$ pwd

/opt/software/hadoop-2.7.2/lib/native

[atguigu@hadoop102 native]$ ll

-rw-r--r--. 1 atguigu atguigu  472950 9月 1 10:19 libsnappy.a

-rwxr-xr-x. 1 atguigu atguigu     955 9月 1 10:19 libsnappy.la

lrwxrwxrwx. 1 atguigu atguigu      18 12月 24 20:39 libsnappy.so -> libsnappy.so.1.3.0

lrwxrwxrwx. 1 atguigu atguigu      18 12月 24 20:39 libsnappy.so.1 -> libsnappy.so.1.3.0

-rwxr-xr-x. 1 atguigu atguigu  228177 9月 1 10:19 libsnappy.so.1.3.0

6.拷贝/opt/software/hadoop-2.7.2/lib/native里面的所有内容到开发集群的/opt/module/hadoop-2.7.2/lib/native路径上

[atguigu@hadoop102 native]$ cp ../native/* /opt/module/hadoop-2.7.2/lib/native/

7.分发集群

[atguigu@hadoop102 lib]$ xsync native/

8.再次查看hadoop支持的压缩类型

[atguigu@hadoop102 hadoop-2.7.2]$ hadoop checknative

17/12/24 20:45:02 WARN bzip2.Bzip2Factory: Failed to load/initialize native-bzip2 library system-native, will use pure-Java version

17/12/24 20:45:02 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library

Native library checking:

hadoop:  true /opt/module/hadoop-2.7.2/lib/native/libhadoop.so

zlib:    true /lib64/libz.so.1

snappy:  true /opt/module/hadoop-2.7.2/lib/native/libsnappy.so.1

lz4:     true revision:99

bzip2:   false

9.重新启动hadoop集群和hive

查看原文

赞 0 收藏 0 评论 0

MosesDon 发布了文章 · 2020-12-12

sql执行初识

执行顺序

select * from 表1 join 表2 on xxx where xxx group by xxx having xxx order by limit xxx

  1. 先关联表 join
  2. where
  3. group by
  4. having
  5. select
  6. order by
  7. limit
查看原文

赞 0 收藏 0 评论 0

认证与成就

  • 获得 1 次点赞
  • 获得 1 枚徽章 获得 0 枚金徽章, 获得 0 枚银徽章, 获得 1 枚铜徽章

擅长技能
编辑

(゚∀゚ )
暂时没有

开源项目 & 著作
编辑

(゚∀゚ )
暂时没有

注册于 2020-05-06
个人主页被 1.7k 人浏览