杨闯

杨闯 查看完整档案

深圳编辑南方科技大学  |  计算机科学与技术 编辑  |  填写所在公司/组织填写个人主网站
编辑
_ | |__ _ _ __ _ | '_ \| | | |/ _` | | |_) | |_| | (_| | |_.__/ \__,_|\__, | |___/ 该用户太懒什么也没留下

个人动态

杨闯 发布了文章 · 2018-03-30

Spark DataFrame 使用UDF实现UDAF的一种方法

Background:
当我们使用Spark Dataframe的时候常常需要进行group by操作,然后针对这一个group算出一个结果来。即所谓的聚合操作

然而

Spark提供的aggregation函数太少,常常不能满足我们的需要,怎么办呢?

Spark 贴心的提供了UDAF(User-defined aggregate function),听起来不错。
但是,这个函数实现起来太复杂,反正我是看的晕晕乎乎,难受的很。反倒是UDF的实现非常简单,无非是UDF针对所有行,UDAF针对一个group中的所有行。

So,两者在某种程度上是一样的。

下面我们就看看如何用UDF实现UDAF的功能

举个例子来说明问题:
我们有一个dataframe是长这样的:

+-------+-------+-------+
|groupid|column1|column2|
+-------+-------+-------+
|   1   |  1    |   7   |
|   1   |  12   |   9   |
|   1   |  30   |   8   |
|   1   |  18   |   1   |
|   1   |  19   |   13  |
|   1   |  15   |   20  |
|   2   |  41   |   2   |
|   2   |  50   |   19  |
|   2   |  16   |   11  |
|   2   |  27   |   5   |
|   3   |  83   |   6   |
|   3   |  91   |   15  |
|   3   |  10   |   8   |

我们想对它group by id,然后对每一个group里的内容进行自定义操作。
比如寻找某一列第三大的数、通过某两列的数据计算出一个参数等等很多user-define的操作。

抽象的步骤看这里:

STEP.1. 对想要操作的列执行collect_list(),生成新列,此时一个group就是一行。
        +-------+--------------------------+-----------------------+
        |groupid|        column1           |        column2        |
        +-------+--------------------------+-----------------------+
        |   1   |  [1,12,30,18,19,15]  | [7,9,8,1,13,20]   |
        |   2   |      [41,50,16,27]       |      [2,19,11,5]      | 
        |   3   |        [83,91,10]        |      [6,15,8]         |
STEP.2.写一个UDF,传入参数为上边生成的列,相当于传入了一个或多个数组。
 import org.apache.spark.sql.functions._
    def createNewCol = udf((column1: collection.mutable.WrappedArray[Int], column2: collection.mutable.WrappedArray[Int]) => {  // udf function
      var balabala  //各种要用到的自定义变量 
      var resultArray = Array.empty[(Int, Int, Int)]
      for(column1.size):  //遍历计算
          result[i] = 对俩数组column1,column2进行某种计算操作 //一个group中第i行的结果
      resultArray[i]=(column1[i],column2[i],result[i])
      resultArray   //返回值
    })    
STEP.3.UDF中可以对数组做任意操作,你对数组想怎么操作就怎么操作,最后返回一个数组就可以了,长度和你传入的数组相同(显然),数组每个元素的格式是tuple的(column1.vaule,column2.value, result)因为column1,column2的值我们后边展开的时候还要用。
STEP.4.执行UDF函数,传入的第一步中生成的列,获得结果列newcolumn,存储UDF的返回值。此时一个group还是一行。
+-------+--------------------------+-----------------------+-------------------------------+
|groupid|        column1           |        column2        |          newcolumn            |
+-------+--------------------------+-----------------------+-------------------------------+
|   1   |  [1,12,30,18,19,15]  | [7,9,8,1,13,20]   | [(1,7,v1.1),(12,9,v1.2)...]   |
|   2   |      [41,50,16,27]       |      [2,19,11,5]      | [(41,2,v2.1),(50,19,v2.2)..]  |
|   3   |        [83,91,10]        |      [6,15,8]         | [(83,91,v3.1),(6,15,v3.2)..]  |
STEP.5.column1,column2可以丢掉了,因为用不到。
+-------+-------------------------------+
|groupid|          newcolumn            |
+-------+-------------------------------+
|   1   | [(1,7,v1.1),(12,9,v1.2)...]   |
|   2   | [(41,2,v2.1),(50,19,v2.2)..]  |
|   3   | [(83,91,v3.1),(6,15,v3.2)..]  |
STEP.6.对结果列执行explode(col("newcolumn"))操作,相当于把数组撑开来到整个group中。
+-------+----------------------+
|groupid|         new          |
+-------+----------------------+
|   1   | (1,7,value1.1)    |
|   1   | (12,9,value1.2)   |
|   1   | (30,8,value1.3)   |
|   1   | (18,1,value1.4)   |
.....省略
|   2   |  (41,2,value2.1)     |
|   2   |  (50,19,value2.2)    |
|   3   |  (83,91,value3.1)    | ...大面积省略
    
STEP.7.把tuple分开成三列

select(col("groupid"), col("new._1").as("rownum"), col("new._2").as("column2"), col("new._3").as("resultcolumn")) //selecting as separate column

所有代码看这里:



df.groupBy("groupid").agg(collect_list("column1").as("column1"),collect_list("column2").as("column2")) // 把要操作的列转换成数组,作为group的一个列属性。
      .withColumn("newcolumn", createNewCol(col("column1"), col("column2")))  //把存储数组的列传入udf,返回一个新列     
      .drop("column1", "column2") //丢弃两个存储数组的列,因为用不到了                                                        
      .withColumn("new", explode(col("newcolumn"))) //把新计算出来的内容从一行explode到整个group
      .select(col("groupid"), col("new._1").as("rownum"), col("new._2").as("column2"), col("new._3").as("column3"))  //selecting as separate column                                         
      .show(false)

The end

实际案例就不举了,码字太麻烦了。这里有一个,英文的,来自我的stackoverflow
PS:collect 是 一个shuffle算子,会特别消耗资源,如果出现OOM,别怪我
查看原文

赞 1 收藏 0 评论 2

杨闯 发布了文章 · 2018-03-30

“The CM is using external DB” bug的解决办法

上周升级CDH5的jdk 从1.8_131到1.8_151,照常升完之后重启集群,结果集群挂了。
无奈手动命令行关闭集群所有服务然后重启。

service cloudera-scm-server-db stop
service cloudera-scm-server stop
service cloudera-scm-agent stop

然后就启动不了。
系统报错显示,缺了几个必要的日志文件
推测可能是强行重启导致的文件丢失

进一步,自己手动添加缺失的log文件。
起初,我是用root添加的,并未授予777权限,导致日志无法被写入,总是空的。
后来,索性登入cloudera-scm账户,创建日志文件,至此才从日志文件中发现了出问题的地方。

日志文件的目录
/var/log/cloudera-scm-server
/var/log/cloudera-scm-server-db
/var/log/cloudera-scm-agent

OK,到达这一步,server和agent服务已经可以正常启动了。但是postgresql还是有问题。迷瞪了好久,就是下面这个错误。

Starting cloudera-scm-server-db (via systemctl): Job for cloudera-scm-server-db.service failed because the control process exited with error code. See "systemctl status cloudera-scm-server-db.service" and "journalctl -xe" for details

后来发现集群使用的是内嵌的postgre数据库,而网上大部分人使用的都是外部的数据库(postgre,mysql等等等等)。
之前我排查数据库不能启动的原因时,尝试使用如下方式连接postgresql数据库。

/usr/share/cmf/schema/scm_prepare_database.sh postgresql scm scm scm_password

但是查阅cloudera官方文档发现,这条命令是针对外部数据库的!!!,会更改数据库的配置文件:

cat /etc/cloudera-scm-server/db.properties
Auto-generated by scm_prepare_database.sh
#
Sat Oct 1 12:19:15 PDT 201
#
com.cloudera.cmf.db.type=postgresql
com.cloudera.cmf.db.host=localhost
com.cloudera.cmf.db.name=scm
com.cloudera.cmf.db.user=scm
com.cloudera.cmf.db.password=scm_password
com.cloudera.cmf.db.setupType=EXTERNAL

上边是被改过的,可以看到postgre默认的7432端口变成了localhost,数据库类型也变成了EXTERNAL,这就是报External错误的原因。改一改,改成下面这个样子,

#vim cat /etc/cloudera-scm-server/db.properties
Auto-generated by scm_prepare_database.sh
Sat Oct 1 12:19:15 PDT 201
com.cloudera.cmf.db.type=postgresql
com.cloudera.cmf.db.host=localhost:7432
com.cloudera.cmf.db.name=scm
com.cloudera.cmf.db.user=scm
com.cloudera.cmf.db.password=scm_password
com.cloudera.cmf.db.setupType=EMBEDDED

如果发现不知道scm的密码,还可以改改密码,挺方便的。
至此,重启所有服务就OK啦。

Tips:

  1. service --status-all 可以查看所有服务的运行状态。例如postgre is running 这种。
  2. 登入到cloudera-scm 账户时,发现他是不可登陆的,通过cat/etc/passwd查看发现cloudera-scm:x:998:996:Cloudera Manager:/var/lib/cloudera-scm-server:/sbin/nologin,把/sbin/nologin 改成/bin/bash就好了。
  3. cloudera 官方的问答社区挺好用的,有了bug 会有官方的工作人员给你解答,这个十分nice!
  4. 每次对系统操作的时候一定要慎重!不要像我一样,网上大部分人用的外部数据库,我不假思索就用了外部数据库的操作,导致浪费了那么多时间!尤其集群不是自己搭建的这种,更要注意。
查看原文

赞 0 收藏 0 评论 0

杨闯 关注了标签 · 2018-03-30

数据库

数据库(Database)是按照数据结构来组织、存储和管理数据的仓库,它产生于距今五十年前,随着信息技术和市场的发展,特别是二十世纪九十年代以后,数据管理不再仅仅是存储和管理数据,而转变成用户所需要的各种数据管理的方式。数据库有很多种类型,从最简单的存储有各种数据的表格到能够进行海量数据存储的大型数据库系统都在各个方面得到了广泛的应用。

关注 6629

杨闯 关注了标签 · 2018-03-30

大数据

大数据(Big Data)又称为巨量资料,指需要新处理模式才能具有更强的决策力、洞察力和流程优化能力的海量、高增长率和多样化的信息资产。“大数据”概念最早由维克托·迈尔·舍恩伯格和肯尼斯·库克耶在编写《大数据时代》中提出,指不用随机分析法(抽样调查)的捷径,而是采用所有数据进行分析处理。大数据有4V特点,即Volume(大量)、Velocity(高速)、Variety(多样)、Value(价值)。

大数据 - 定义

对于“大数据”(Big data)研究机构Gartner给出了这样的定义。“大数据”是需要新处理模式才能具有更强的决策力、洞察发现力和流程优化能力的海量、高增长率和多样化的信息资产。
根据维基百科的定义,大数据是指无法在可承受的时间范围内用常规软件工具进行捕捉、管理和处理的数据集合。

大数据技术的战略意义不在于掌握庞大的数据信息,而在于对这些含有意义的数据进行专业化处理。换言之,如果把大数据比作一种产业,那么这种产业实现盈利的关键,在于提高对数据的“加工能力”,通过“加工”实现数据的“增值”。

从技术上看,大数据与云计算的关系就像一枚硬币的正反面一样密不可分。大数据必然无法用单台的计算机进行处理,必须采用分布式架构。它的特色在于对海量数据进行分布式数据挖掘,但它必须依托云计算的分布式处理、分布式数据库和云存储、虚拟化技术。

随着云时代的来临,大数据(Big data)也吸引了越来越多的关注。《着云台》的分析师团队认为,大数据(Big data)通常用来形容一个公司创造的大量非结构化数据和半结构化数据,这些数据在下载到关系型数据库用于分析时会花费过多时间和金钱。大数据分析常和云计算联系到一起,因为实时的大型数据集分析需要像MapReduce一样的框架来向数十、数百或甚至数千的电脑分配工作。

大数据需要特殊的技术,以有效地处理大量的容忍经过时间内的数据。适用于大数据的技术,包括大规模并行处理(MPP)数据库、数据挖掘电网、分布式文件系统、分布式数据库、云计算平台、互联网和可扩展的存储系统。

大数据 - 特征

容量(Volume):数据的大小决定所考虑的数据的价值的和潜在的信息;
种类(Variety):数据类型的多样性;
速度(Velocity):指获得数据的速度;
可变性(Variability):妨碍了处理和有效地管理数据的过程。
真实性(Veracity):数据的质量
复杂性(Complexity):数据量巨大,来源多渠道

大数据 - 技术盘点

HadoopMapReduce

思维模式转变的催化剂是大量新技术的诞生,它们能够处理大数据分析所带来的3个V的挑战。扎根于开源社区,Hadoop已经是目前大数据平台中应用率最高的技术,特别是针对诸如文本、社交媒体订阅以及视频等非结构化数据。除分布式文件系统之外,伴随Hadoop一同出现的还有进行大数据集处理MapReduce架构。根据权威报告显示,许多企业都开始使用或者评估Hadoop技术来作为其大数据平台的标准。

NoSQL数据库

我们生活的时代,相对稳定的数据库市场中还在出现一些新的技术,而且在未来几年,它们会发挥作用。事实上,NoSQL数据库在一个广义上派系基础上,其本身就包含了几种技术。总体而言,他们关注关系型数据库引擎的限制,如索引、流媒体和高访问量的网站服务。在这些领域,相较关系型数据库引擎,NoSQL的效率明显更高。

内存分析

在Gartner公司评选的2012年十大战略技术中,内存分析在个人消费电子设备以及其他嵌入式设备中的应用将会得到快速的发展。随着越来越多的价格低廉的内存用到数据中心中,如何利用这一优势对软件进行最大限度的优化成为关键的问题。内存分析以其实时、高性能的特性,成为大数据分析时代下的“新宠儿”。如何让大数据转化为最佳的洞察力,也许内存分析就是答案。大数据背景下,用户以及IT提供商应该将其视为长远发展的技术趋势。

集成设备

随着数据仓库设备(Data Warehouse Appliance)的出现,商业智能以及大数据分析的潜能也被激发出来,许多企业将利用数据仓库新技术的优势提升自身竞争力。集成设备将企业的数据仓库硬件软件整合在一起,提升查询性能、扩充存储空间并获得更多的分析功能,并能够提供同传统数据仓库系统一样的优势。在大数据时代,集成设备将成为企业应对数据挑战的一个重要利器。

关注 951

杨闯 关注了用户 · 2018-03-16

hfhan @hfhan

砥砺前行

关注 19557

杨闯 关注了用户 · 2018-03-16

小蜗牛 @wangwenlin

不高不帅,时常犯二, 在逗比的道路上越走越远。。。

看完请闭眼!

关注 15084

杨闯 关注了用户 · 2018-03-16

justjavac @justjavac

会写点 js 代码

关注 14440

杨闯 关注了用户 · 2018-03-16

krun @krun

喜欢挖坑造轮子

关注 11356

杨闯 关注了用户 · 2018-03-16

小翼 @xiaoyi_99

React 第一本手稿级的文档。

https://kairi1227.github.io

关注 9875

杨闯 关注了标签 · 2018-03-16

关注 64194

认证与成就

  • 获得 1 次点赞
  • 获得 0 枚徽章 获得 0 枚金徽章, 获得 0 枚银徽章, 获得 0 枚铜徽章

擅长技能
编辑

(゚∀゚ )
暂时没有

开源项目 & 著作
编辑

(゚∀゚ )
暂时没有

注册于 2018-03-16
个人主页被 202 人浏览