鸿乃江边鸟

鸿乃江边鸟 查看完整档案

杭州编辑  |  填写毕业院校  |  填写所在公司/组织 segmentfault.com/u/hongnaijiangbianniao 编辑
编辑

// _ _
// / \ / \ / \ / \
// ( D | A | T | A )
// \_/ \_/ \_/ \_/

技术爱好者

个人动态

鸿乃江边鸟 发布了文章 · 2020-12-07

【k8s系列6】Mirantis及时现身,接过Kubernetes dockershim支持大旗

关于dockershim即将灭亡的传言无疑存在严重夸大。如果一直有关注Kubernetes生态系统,很多朋友一时之间可能确实被Kubernetes 1.20版本的发布公告弄得有点不知所措。从公告内容来看,自1.20版本开始dockershim将被全面弃用。但请不要恐慌,调整呼吸,一切都会好起来。

更重要的是,Mirantis现已同意与Docker开展合作,在Kubernetes之外独立维护shim代码并将其作为Docker Engine API的统一CRI接口。对于Mirantis客户而言,这意味着Docker Engine的商业支持版本Mirantis Container Runtime(MCR)也将提供CRI兼容能力。我们将从https://github.com/dims/cri-d...,并逐步将其转化为开源项目https://github.com/Mirantis/c...。换句话说,你可以像之前一样继续基于Docker Engine构建Kubernetes,唯一的区别就是dockershim由内置方案变成了外部方案。我们将共同努力,保证它在保持原有功能的同时,顺利通过各类一致性测试并提供与此前内置版本相同的使用体验。Mirantis将在Mirantis Kubernetes Engine中使用dockershim,Docker方面也将在Docker Desktop中继续提供dockershim。

从头说起……

用过Kubernetes的朋友都清楚,它的最大作用就是编排各类容器。对不少用户来说,容器已经与Docker完全统一了起来。但这种说法并不准确,Docker本身只是彻底改变了容器技术并将其推向了通用舞台,因此Docker Engine也成为Kubernetes所支持的第一种(也是最初唯一一种)容器运行时。

但Kubernetes社区并不打算长期保持这样的状态。

从长远来看,社区希望能够使用多种不同类型的容器,因此参与者们创建了容器运行时接口(CRI),也就是容器引擎与Kubernetes间进行通信的标准方式。如果容器引擎与CRI相兼容,即可轻松在Kubernetes当中运行。

第一款兼容CRI的容器引擎是containerd,而它来自……好吧,还是来自Docker。很明显,Docker本身不仅仅是一种容器运行时,而且提供可供其他用户消费的种种部件,甚至包括用户界面。因此,Docker提取出与容器实际相关的部分,并将其调整为第一种与CRI兼容的运行时,而后把它捐赠给了云原生计算基金会(CNCF)。由此衍生出的cri-containerd组件具有运行时中立特性,而且能够支持多种Linux与Windows操作系统。

但这还留下最后一个问题——Docker本身仍然不兼容CRI。

Dockershim是什么?

正如Kubernetes最初对Docker Engine提供内置支持一样,其中同样包含对各类存储卷解决方案、网络解决方案甚至是云服务商的内置支持。但要不断维护这些支持功能实在是太过麻烦,因此社区决定将所有第三方解决方案从核心中剥离出来并创建相关接口,例如:

  • 容器运行时接口(CRI)
  • 容器网络接口(CNI)
  • 容器存储接口(CSI)

其中的基本思路在于,只要兼容这些接口,那么任何供应商都可以创建出能自动与Kubernetes相对接的产品。

当然,这绝不是说不兼容的组件就没办法与Kubernetes配合使用;Kubernetes可以使用正确的组件完成各类协同。换言之,不兼容的组件仅仅需要加上个“shim(意为垫片)”,由其在组件与相应的Kubernetes接口之间完成转换,即可轻松解决问题。例如,dockershim会接收CRI命令并将其转换为Docker Engine能够理解的内容,反之亦然。但在第三方组件被从Kubernetes核心内剥离的背景之下,dockershim自身也需要逐步退出。

虽然听起来好像事情不小,但实际上没那么严重。大家使用docker build构建起的CRI兼容型镜像,未来仍然可以与Kubernetes正常配套使用。

Kubernetes放弃对dockershim的支持,会带来哪些影响?

对大多数人来说,弃用dockershim其实半点影响也没有。这是因为大部分用户既意识不到dockershim的存在,实际上使用的也不是Docker本体;相反,他们使用的是与CRI相兼容的containerd。

当然,也有一部分用户(包括Mirantis的客户)在运行依赖于dockershim的工作负载,借此与Kubernetes实现无缝协作。

考虑到dockershim仍然是不少企业难以割舍的重要组件,Mirantis与Docker达成协议,继续支持并开发dockershim。只不过这一次,dockershim将以独立开源组件的身份存在。

那么,这到底意味着什么?

简单来讲,如果你直接使用containerd,则不必抱有任何担心;因为containerd能够与CRI相兼容。如果你身为Mirantis的客户,同样不用担心;因为Mirantis容器运行时将包含对dockershim的支持,确保其与CRI相兼容。

但如果你使用的是开源Docker Engine,则dockershim项目将以开源组件的形式提供,您可以继续在Kubernetes上正常使用;唯一的区别就是需要对配置做出少量修改,具体请参见我们后续发布的说明文档。

所以,请大家不必惊异。Docker还在,dockershim还在,一切如常。

原文链接:https://www.mirantis.com/blog...

查看原文

赞 0 收藏 0 评论 0

鸿乃江边鸟 发布了文章 · 2020-12-07

【k8s系列5】不必惊慌:聊聊Kubernetes与Docker

在1.20版本之后,Kubernetes将不再支持把Docker作为容器运行时使用。

不必惊慌,实际上没多大影响。

摘要:这里只是不建议将Docker作为底层运行时,你仍然可以使用专为Kubernetes创建的容器运行时接口(CRI)一如既往地在集群中运行Docker镜像。

对于Kubernetes最终用户,此次调整同样不会有太大影响。Docker不会就此消亡,你也仍然可以继续将Docker作为开发工具使用。Docker会继续构建起不计其数的容器,而运行docker build命令所生成的镜像仍可在Kubernetes集群内正常运行。

如果你使用的是GKE或者EKS等托管Kubernetes服务,则需要确保在未来的Kubernetes版本彻底去除Docker支持之前,为你的工作节点引入受支持的容器运行时。如果节点中包含自定义项,你可能需要根据当前环境及运行时要求做出更新。请与服务供应商合作,确保正确完成升级测试及规划。

如果你的集群一直在滚动扩展,则需要配合变量以避免服务中断。在1.20版本中,你将收到Docker弃用警告。而在未来的Kubernets版本(计划在2021年下半年发布的1.23版本)中,Docker运行时将被彻底移除、不再受到支持,届时您必须切换至其他兼容的容器运行时,例如containerd或者CRI-O。只需要保证你所选定的运行时,能够支持当前使用的Docker守护程序配置即可(例如日志记录)。

既然问题不大,人们在慌什么?在怕什么?

这里我们需要探讨两种不同的环境,而这也是恐慌情绪的根源。首先,在Kubernetes集群内部存在一种叫作容器运行时的东西,负责提取并运行容器镜像。Docker是目前最流行的运行时选项(其他常见选项还包括containerd与CRI-O)。但Docker在设计上并未考虑到被嵌入Kubernetes这种用法,所以可能引发问题。

很明显,这里我们提到的“Docker”并不是同一种东西——它代表着一整套技术栈,而containerd高级容器运行时则是Docker中的一部分。Docker很酷、实用性极强,提供多种用户体验增强功能,让我们能够在开发过程中轻松完成协同交互。但是,用户体验增强功能对Kubernetes来说并非必需,因为Kubernetes并不是什么人类协作方。

结果就是,要想让containerd这个人类友好型抽象层发挥作用,Kubernetes集群就必须引入另一款名为Dockershimi的工具。但这款工具的介入又引发了新的问题,因为我们必须额外加以维护,否则就可能引发安全问题。事实上,Dockershim早在Kubelet 1.23版本时就已经被移除,或者说Kubelet很早就取消了将Docker作为容器运行时的功能。这时候很多朋友可能要问,既然Docker栈中已经包含containerd,Kubernetes为什么还要画蛇添足地搞出个Dockershim?

这是因为Docker与CRI(即容器运行时接口)并不相容。正是因为不相容,所以我们才需要Dockershim来缓冲一下。但这不是什么大问题,各位没必要惊慌——这件事的本质,就是把容器运行时从Docker转换为另一种受支持的选项。

这里需要注意的是:如果大家将底层Docker套接字(/var/run/docker.sock)设定为集群工作流中的一部分,那么转换至其他运行时会破坏掉当前业务的正常运行。这种模式称为Docker in Docker,好在我们可以使用多种选项解决这个特定用例,包括Kaniko、Img以及Buildah等等。

但这种变化对开发者意味着什么?我们还需要编写Dockerfiles吗?未来还应不应该继续使用Docker?

请注意,本次变更所影响到的环境,其实跟大多数人用于进行Docker交互的环境并不是一回事。你在开发中使用的Docker安装,与Kubernetes集群中的Docker运行时毫无关系。我知道,这事听起来让人有点犯迷糊。总之,对于开发人员,Docker在公布此次更改之前提供的所有方案都仍然适用。Docker生成的镜像实际上并不特定于Docker,更准确地说它应该属于OCI(开放容器倡议)镜像。任何与OCI相兼容的镜像,无论使用哪种工具构建而成,对于Kubernetes来说都是一样的。Containerd与CRI-O都能识别这些镜像并正常运行,这也是我们建立一套统一容器标准的意义所在。

因此,虽然变化即将到来,虽然会给部分用户带来麻烦,但影响并不算大。而且从长远角度看,这其实是件好事。总而言之,希望大家放下抵触和恐慌情绪,坦然接受这个变化。

原文链接:https://kubernetes.io/blog/20...

查看原文

赞 0 收藏 0 评论 0

鸿乃江边鸟 发布了文章 · 2020-12-07

【openJDK系列2】云原生时代,Java危矣?

本文转载自:https://mp.weixin.qq.com/s/fV...

Java 诞生距今已有 25 年,但它仍然长期占据着“天下第一”编程语言的宝座。只是其统治地位并非坚不可摧,反倒可以说是危机四伏。云原生时代,Java 技术体系的许多前提假设都受到了挑战,目前已经有可预见的、足以威胁动摇其根基的潜在可能性正在酝酿。同时,像 Golang、Rust 这样的新生语言,以及 C、C++、C#、Python 等老对手也都对 Java 的市场份额虎视眈眈。面对危机,Java 正在尝试哪些变革?未来,Java 是会继续向前、再攀高峰,还是由盛转衰?在今天由极客邦科技举办的 QCon 全球软件开发大会 2020(深圳站)上,远光软件研究院院长、《深入理解 Java 虚拟机》系列书籍作者周志明发表了主题演讲《云原生时代的 Java》,以下内容为演讲整理。  

今天,25岁的Java仍然是最具有统治力的编程语言,长期占据编程语言排行榜的首位,拥有一千二百万的庞大开发者群体,全世界有四百五十亿部物理设备使用着Java技术,同时,在云端数据中心的虚拟化环境里,还运行着超过两百五十亿个Java虚拟机的进程实例 (数据来自Oracle的WebCast)。

以上这些数据是Java过去25年巨大成就的功勋佐证,更是Java技术体系维持自己“天下第一”编程语言的坚实壁垒。Java与其他语言竞争,底气从来不在于语法、类库有多么先进好用,而是来自它庞大的用户群和极其成熟的软件生态,这在朝夕之间难以撼动。然而,这个现在看起来仍然坚不可摧的Java帝国,其统治地位的稳固程度不仅没有高枕无忧,反而说是危机四伏也不为过。目前已经有了可预见的、足以威胁动摇其根基的潜在可能性正在酝酿,并随云原生时代而降临。

Java 的危机

Java与云原生的矛盾,来源于Java诞生之初,植入到它基因之中的一些基本的前提假设已经逐渐开始被动摇,甚至已经不再成立。

我举个例子,每一位Java的使用者都听说过“一次编写,到处运行”(Write Once, Run Anywhere)这句口号。20多年前,Java成熟之前,开发者如果希望程序在Linux、Solaris、Windows等不同平台,在x86、AMD64、SPARC、MIPS、ARM等不同指令集架构上都能正常运行,就必须针对每种组合,编译出对应的二进制发行包,或者索性直接分发源代码,由使用者在自己的平台上编译。

面对这个问题,Java通过语言层虚拟化的方式,令每一个Java应用都自动取得平台无关(Platform Independent)、架构中立(Architecture Neutral)的先天优势,让同一套程序格式得以在不同指令集架构、不同操作系统环境下都能运行且得到一致的结果,不仅方便了程序的分发,还避免了各种平台下内存模型、线程模型、字节序等底层细节差异对程序编写的干扰。在当年,Java的这种设计带有令人趋之若鹜的强大吸引力,直接开启了托管语言(Managed Language,如Java、.NET)的一段兴盛期。

面对相同的问题,今天的云原生选择以操作系统层虚拟化的方式,通过容器实现的不可变基础设施去解决。不可变基础设施这个概念出现得比云原生要早,原本是指该如何避免由于运维人员对服务器运行环境所做的持续的变更而导致的意想不到的副作用。但在云原生时代,它的内涵已不再局限于方便运维、程序升级和部署的手段,而是升华一种为向应用代码隐藏环境复杂性的手段,是分布式服务得以成为一种可普遍推广的普适架构风格的必要前提。

将程序连同它的运行环境一起封装到稳定的镜像里,现已是一种主流的应用程序分发方式。Docker同样提出过“一次构建,到处运行”(Build Once, Run Anywhere)的口号,尽管它只能提供环境兼容性和有局限的平台无关性(指系统内核功能以上的ABI兼容),且完全不可能支撑架构中立性,所以将“一次构建,到处运行”与“一次编写,到处运行”对立起来并不严谨恰当,但是无可否认,今天Java技术“一次编译,到处运行”的优势,已经被容器大幅度地削弱,不再是大多数服务端开发者技术选型的主要考虑因素了。

如果仅仅是优势的削弱,并不足以成为Java的直接威胁,充其量只是一个潜在的不利因素,但更加迫在眉睫的风险来自于那些与技术潮流直接冲突的假设。譬如,Java总体上是面向大规模、长时间的服务端应用而设计的,严(luō)谨(suō)的语法利于约束所有人写出较一致的代码;静态类型动态链接的语言结构,利于多人协作开发,让软件触及更大规模;即时编译器、性能制导优化、垃圾收集子系统等Java最具代表性的技术特征,都是为了便于长时间运行的程序能享受到硬件规模发展的红利。

另一方面,在微服务的背景下,提倡服务围绕业务能力而非技术来构建应用,不再追求实现上的一致,一个系统由不同语言,不同技术框架所实现的服务来组成是完全合理的;服务化拆分后,很可能单个微服务不再需要再面对数十、数百GB乃至TB的内存;有了高可用的服务集群,也无须追求单个服务要7×24小时不可间断地运行,它们随时可以中断和更新。

同时,微服务又对应用的容器化亲和性,譬如镜像体积、内存消耗、启动速度,以及达到最高性能的时间等方面提出了新的要求。这两年的网红概念Serverless也进一步增加这些因素的考虑权重,而这些却正好都是Java的弱项:哪怕再小的Java程序也要带着完整的虚拟机和标准类库,使得镜像拉取和容器创建效率降低,进而使整个容器生命周期拉长。基于Java虚拟机的执行机制,使得任何Java的程序都会有固定的基础内存开销,以及固定的启动时间,而且Java生态中广泛采用的依赖注入进一步将启动时间拉长,使得容器的冷启动时间很难缩短。 

软件工业中已经出现过不止一起因Java这些弱点而导致失败的案例,如JRuby编写的Logstash,原本是同时承担部署在节点上的收集端(Shipper)和专门转换处理的服务端(Master)的职责,后来因为资源占用的原因,被Elstaic.co用Golang的Filebeat代替了Shipper部分的职能;又如Scala语言编写的边车代理Linkerd,作为服务网格概念的提出者,却最终被Envoy所取代,其主要弱点之一也是由于Java虚拟机的资源消耗所带来的劣势。 

虽然在云原生时代依然有很多适合Java发挥的领域,但是具备弹性与韧性、随时可以中断重启的微型服务的确已经形成了一股潮流,在逐步蚕食大型系统的领地。正是由于潮流趋势的改变,新一代的语言与技术尤其重视轻量化和快速响应能力,大多又重新回归到了原生语言(Native Language,如Golang、Rust)之上。

Java 的变革

面对挑战,Java的开发者和社区都没有退缩,它们在各自的领域给出了很多优秀的解决方案,涌现了如Quarkus、Micronaut、Helidon等一大批以提升Java在云原生环境下的适应性为卖点的框架。

不过,今天我们的主题将聚焦在由Java官方本身所推进的项目上。在围绕Java 25周年的研讨和布道活动中,官方的设定是以“面向未来的变革”(Innovating for the Future)为基调,你有可能在此之前已经听说过其中某个(某些)项目的名字和改进点,但这里我们不仅关心这些项目改进的是什么,还更关心它们背后的动机与困难、带来的收益,以及要付出的代价。

    Innovating for the Future

Project Leyden

对于原生语言的挑战,最有力最彻底的反击手段无疑是将字节码直接编译成可以脱离Java虚拟机的原生代码。如果真的能够生成脱离Java虚拟机运行的原生程序,将意味着启动时间长的问题能够彻底解决,因为此时已经不存在初始化虚拟机和类加载的过程;也意味着程序马上就能达到最佳的性能,因为此时已经不存在即时编译器运行时编译,所有代码都是在编译期编译和优化好的(如下图所示);没有了Java虚拟机、即时编译器这些额外的部件,也就意味着能够省去它们原本消耗的那部分内存资源与镜像体积。

     Java Performance Matrices(图片来源)

但同时,这也是风险系数最高、实现难度最大的方案。

Java并非没有尝试走过这条路,从Java 2之前的GCJ(GNU Compiler for Java),到后来的Excelsior JET,再到2018年Oracle Labs启动的GraalVM中的SubstrateVM模块,最后到2020年中期刚建立的Leyden项目,都在朝着提前编译(Ahead-of-Time Compilation,AOT)生成原生程序这个目标迈进。

Java支持提前编译最大的困难在于它是一门动态链接的语言,它假设程序的代码空间是开放的(Open World),允许在程序的任何时候通过类加载器去加载新的类,作为程序的一部分运行。要进行提前编译,就必须放弃这部分动态性,假设程序的代码空间是封闭的(Closed World),所有要运行的代码都必须在编译期全部可知。这一点不仅仅影响到了类加载器的正常运作,除了无法再动态加载外,反射(通过反射可以调用在编译期不可知的方法)、动态代理、字节码生成库(如CGLib)等一切会运行时产生新代码的功能都不再可用,如果将这些基础能力直接抽离掉,Helloworld还是能跑起来,但Spring肯定跑不起来,Hibernate也跑不起来,大部分的生产力工具都跑不起来,整个Java生态中绝大多数上层建筑都会轰然崩塌。 

要获得有实用价值的提前编译能力,只有依靠提前编译器、组件类库和开发者三方一起协同才可能办到。由于Leyden刚刚开始,几乎没有公开的资料,所以下面我是以SubstrateVM为目标对象进行的介绍: 

  • 有一些功能,像反射这样的基础特性是不可能妥协的,折衷的解决办法是由用户在编译期,以配置文件或者编译器参数的形式,明确告知编译器程序代码中有哪些方法是只通过反射来访问的,编译器将方法的添加到静态编译的范畴之中。同理,所有使用到动态代理的地方,也必须在事先列明,在编译期就将动态代理的字节码全部生成出来。其他所有无法通过程序指针分析(Points-To Analysis)得到的信息,譬如程序中用到的资源、配置文件等等,也必须照此处理。
  • 另一些功能,如动态生成字节码也十分常用,但用户自己往往无法得知那些动态字节码的具体信息,就只能由用到CGLib、javassist等库的程序去妥协放弃。在Java世界中也许最典型的场景就是Spring用CGLib来进行类增强,默认情况下,每一个Spring管理的Bean都要用到CGLib。从Spring Framework 5.2开始增加了@proxyBeanMethods注解来排除对CGLib的依赖,仅使用标准的动态代理去增强类。

2019年起,Pivotal的Spring团队与Oracle Labs的GraalVM团队共同孵化了Spring GraalVM Native项目,这个目前仍处于Experimental / Alpha状态的项目,能够让程序先以传统方式运行(启动)一次,自动化地找出程序中的反射、动态代理的代码,代替用户向编译器提供绝大部分所需的信息,并能将允许启动时初始化的Bean在编译期就完成初始化,直接绕过Spring程序启动最慢的阶段。这样从启动到程序可以提供服务,耗时竟能够低于0.1秒。

    Spring Boot Startup Time(数据来源)

以原生方式运行后,缩短启动时间的效果立竿见影,一般会有数十倍甚至更高的改善,程序容量和内存消耗也有一定程度的下降。不过至少目前而言,程序的运行效率还是要弱于传统基于Java虚拟机的方式,虽然即时编译器有编译时间的压力,但由于可以进行基于假设的激进优化和运行时性能度量的制导优化,使得即时编译器的效果仍要优于提前编译器,这方面需要GraalVM编译器团队的进一步努力,也需要从语言改进上入手,让Java变得更适合被编译器优化。

Project Valhalla

Java语言上可感知的语法变化,多数来自于Amber项目,它的项目目标是持续优化语言生产力,近期(JDK 15、16)会有很多来自这个项目的特性,如Records、Sealed Class、Pattern Matching、Raw String Literals等实装到生产环境。 

然而语法不仅与编码效率相关,与运行效率也有很大关系。“程序=代码+数据”这个提法至少在衡量运行效率上是合适的,无论是托管语言还是原生语言,最终产物都是处理器执行的指令流和内存存储的数据结构。Java、.NET、C、C++、Golang、Rust等各种语言谁更快,取决于特定场景下,编译器生成指令流的优化效果,以及数据在内存中的结构布局。

Java即时编译器的优化效果拔群,但是由于Java“一切皆为对象”的前提假设,导致在处理一系列不同类型的小对象时,内存访问性能非常拉垮,这点是Java在游戏、图形处理等领域一直难有建树的重要制约因素,也是Java建立Valhalla项目的目标初衷。 

这里举个例子来说明此问题,如果我想描述空间里面若干条线段的集合,在Java中定义的代码会是这样的:

public record Point(float x, float y, float z) {}
public record Line(Point start, Point end) {}
Line[] lines;

面向对象的内存布局中,对象标识符(Object Identity)存在的目的是为了允许在不暴露对象结构的前提下,依然可以引用其属性与行为,这是面向对象编程中多态性的基础。在Java中堆内存分配和回收、空值判断、引用比较、同步锁等一系列功能都会涉及到对象标识符,内存访问也是依靠对象标识符来进行链式处理的,譬如上面代码中的“若干条线段的集合”,在堆内存中将构成如下图的引用关系: 

Object Identity / Memory Layout

计算机硬件经过25年的发展,内存与处理器虽然都在进步,但是内存延迟与处理器执行性能之间的冯诺依曼瓶颈(Von Neumann Bottleneck)不仅没有缩减,反而还在持续加大,“RAM Is the New Disk”已经从嘲讽梗逐渐成为了现实。

一次内存访问(将主内存数据调入处理器Cache)大约需要耗费数百个时钟周期,而大部分简单指令的执行只需要一个时钟周期而已。因此,在程序执行性能这个问题上,如果编译器能减少一次内存访问,可能比优化掉几十、几百条其他指令都来得更有效果。 

额外知识:冯诺依曼瓶颈

不同处理器(现代处理器都集成了内存管理器,以前是在北桥芯片中)的内存延迟大概是40-80纳秒(ns,十亿分之一秒),而根据不同的时钟频率,一个时钟周期大概在0.2-0.4纳秒之间,如此短暂的时间内,即使真空中传播的光,也仅仅能够行进10厘米左右。

数据存储与处理器执行的速度矛盾是冯诺依曼架构的主要局限性之一,1977年的图灵奖得主John Backus提出了“冯诺依曼瓶颈”这个概念,专门用来描述这种局限性。 

编译器的确在努力减少内存访问,从JDK 6起,HotSpot的即时编译器就尝试通过逃逸分析来做标量替换(Scalar Replacement)和栈上分配(Stack Allocations)优化,基本原理是如果能通过分析,得知一个对象不会传递到方法之外,那就不需要真实地在对中创建完整的对象布局,完全可以绕过对象标识符,将它拆散为基本的原生数据类型来创建,甚至是直接在栈内存中分配空间(HotSpot并没有这样做),方法执行完毕后随着栈帧一起销毁掉。

不过,逃逸分析是一种过程间优化(Interprocedural Optimization),非常耗时,也很难处理那些理论有可能但实际不存在的情况。相同的问题在C、C++中却并不存在,上面场景中,程序员只要将Point和Line都定义为struct即可,C#中也有struct,是依靠.NET的值类型(Value Type)来实现的。Valhalla项目的核心改进就是提供类似的值类型支持,提供一个新的关键字(inline),让用户可以在不需要向方法外部暴露对象、不需要多态性支持、不需要将对象用作同步锁的场合中,将类标识为值类型,此时编译器就能够绕过对象标识符,以平坦的、紧凑的方式去为对象分配内存。 

有了值类型的支持后,现在Java泛型中令人诟病的不支持原数据类型(Primitive Type)、频繁装箱问题也就随之迎刃而解,现在Java的包装类,理所当然地会以代表原生类型的值类型来重新定义,这样Java泛型的性能会得到明显的提升,因为此时Integer与int的访问,在机器层面看完全可以达到一致的效率。

Project Loom

Java语言抽象出来隐藏了各种操作系统线程差异性的统一线程接口,这曾经是它区别于其他编程语言(C/C++表示有被冒犯到)的一大优势,不过,统一的线程模型不见得永远都是正确的。

Java目前主流的线程模型是直接映射到操作系统内核上的1:1模型,这对于计算密集型任务这很合适,既不用自己去做调度,也利于一条线程跑满整个处理器核心。但对于I/O密集型任务,譬如访问磁盘、访问数据库占主要时间的任务,这种模型就显得成本高昂,主要在于内存消耗和上下文切换上:64位Linux上HotSpot的线程栈容量默认是1MB,线程的内核元数据(Kernel Metadata)还要额外消耗2-16KB内存,所以单个虚拟机的最大线程数量一般只会设置到200至400条,当程序员把数以百万计的请求往线程池里面灌时,系统即便能处理得过来,其中的切换损耗也相当可观。 

Loom项目的目标是让Java支持额外的N:M线程模型,请注意是“额外支持”,而不是像当年从绿色线程过渡到内核线程那样的直接替换,也不是像Solaris平台的HotSpot虚拟机那样通过参数让用户二选其一。

Loom项目新增加一种“虚拟线程”(Virtual Thread,以前以Fiber为名进行宣传过,但因为要频繁解释啥是Fiber所以现在放弃了),本质上它是一种有栈协程(Stackful Coroutine),多条虚拟线程可以映射到同一条物理线程之中,在用户空间中自行调度,每条虚拟线程的栈容量也可由用户自行决定。 

     Virtual Thread

同时,Loom项目的另一个目标是要尽最大可能保持原有统一线程模型的交互方式,通俗地说就是原有的Thread、J.U.C、NIO、Executor、Future、ForkJoinPool等这些多线程工具都应该能以同样的方式支持新的虚拟线程,原来多线程中你理解的概念、编码习惯大多数都能够继续沿用。

为此,虚拟线程将会与物理线程一样使用java.lang.Thread来进行抽象,只是在创建线程时用到的参数或者方法稍有不同(譬如给Thread增加一个Thread.VIRTUAL_THREAD参数,或者增加一个startVirtualThread()方法)。这样现有的多线程代码迁移到虚拟线程中的成本就会变得很低,而代价就是Loom的团队必须做更多的工作以保证虚拟线程在大部分涉及到多线程的标准API中都能够兼容,甚至在调试器上虚拟线程与物理线程看起来都会有一致的外观。但很难全部都支持,譬如调用JNI的本地栈帧就很难放到虚拟线程上,所以一旦遇到本地方法,虚拟线程就会被绑定(Pinned)到一条物理线程上。 

Loom的另一个重点改进是支持结构化并发(Structured Concurrency),这是2016年才提出的新的并发编程概念,但很快就被诸多编程语言所吸纳。它是指程序的并发行为会与代码的结构对齐,譬如以下代码所示,按照传统的编程观念,如果没有额外的处理(譬如无中生有地弄一个await关键字),那在task1和task2提交之后,程序应该继续向下执行:

ThreadFactory factory = Thread.builder().virtual().factory();
try (var executor = Executors.newThreadExecutor(factory)) {
 executor.submit(task1);
 executor.submit(task2);
} // blocks and waits 

但是在结构化并发的支持下,只有两个并行启动的任务线程都结束之后,程序才会继续向下执行,很好地以同步的编码风格,来解决异步的执行问题。事实上,“Code like sync,Work like async”正是Loom简化并发编程的核心理念。

Project Portola

Portola项目的目标是将OpenJDK向Alpine Linux移植。Alpine Linux是许多Docker容器首选的基础镜像,因为它只有5 MB大小,比起其他Cent OS、Debain等动辄一百多MB的发行版来说,更适合用于容器环境。不过Alpine Linux为了尽量瘦身,默认是用musl作为C标准库的,而非传统的glibc(GNU C library),因此要以Alpine Linux为基础制作OpenJDK镜像,必须先安装glibc,此时基础镜像大约有12 MB。Portola计划将OpenJDK的上游代码移植到musl,并通过兼容性测试。使用Portola制作的标准Java SE 13镜像仅有41 MB,不仅远低于Cent OS的OpenJDK(大约396 MB),也要比官方的slim版(约200 MB)要小得多。 

$ sudo docker build .
Sending build context to Docker daemon 2.56kB
Step 1/8 : FROM alpine:latest as build
latest: Pulling from library/alpine
bdf0201b3a05: Pull complete
Digest: sha256:28ef97b8686a0b5399129e9b763d5b7e5ff03576aa5580d6f4182a49c5fe1913
Status: Downloaded newer image for alpine:latest
 ---> cdf98d1859c1
Step 2/8 : ADD https://download.java.net/java/early_access/alpine/16/binaries/openjdk-13-ea+16_linux-x64-musl_bin.tar.gz /opt/jdk/
Downloading [==================================================>] 195.2MB/195.2MB
 ---> Using cache
 ---> b1a444e9dde9
Step 3/7 : RUN tar -xzvf /opt/jdk/openjdk-13-ea+16_linux-x64-musl_bin.tar.gz -C /opt/jdk/
 ---> Using cache
 ---> ce2721c75ea0
Step 4/7 : RUN ["/opt/jdk/jdk-13/bin/jlink", "--compress=2", "--module-path", "/opt/jdk/jdk-13/jmods/", "--add-modules", "java.base", "--output", "/jlinked"]
 ---> Using cache
 ---> d7b2793ed509
Step 5/7 : FROM alpine:latest
 ---> cdf98d1859c1
Step 6/7 : COPY --from=build /jlinked /opt/jdk/
 ---> Using cache
 ---> 993fb106f2c2
Step 7/7 : CMD ["/opt/jdk/bin/java", "--version"] - to check JDK version
 ---> Running in 8e1658f5f84d
Removing intermediate container 8e1658f5f84d
 ---> 350dd3a72a7d
Successfully built 350dd3a72a7d
$ sudo docker tag 350dd3a72a7d jdk-13-musl/jdk-version:v1
$ sudo docker images
REPOSITORY TAG IMAGE ID CREATED SIZE
jdk-13-musl/jdk-version v1 350dd3a72a7d About a minute ago 41.7MB
alpine latest cdf98d1859c1 2 weeks ago 5.53M

Java 的未来

云原生时代,Java技术体系的许多前提假设都受到了挑战,“一次编译,到处运行”、“面向长时间大规模程序而设计”、“从开放的代码空间中动态加载”、“一切皆为对象”、“统一线程模型”,等等。技术发展迭代不会停歇,没有必要坚持什么“永恒的真理”,旧的原则被打破,只要合理,便是创新。

Java语言意识到了挑战,也意识到了要面向未来而变革。文中提到的这些项目,Amber和Portola已经明确会在2021年3月的Java 16中发布,至少也会达到Feature Preview的程度:

  • JEP 394:Pattern Matching for instanceof
  • JEP 395:Records
  • JEP 397:Sealed Classes
  • JEP 386:Alpine Linux Port

至于更受关注,同时也是难度更高的 Valhalla 和 Loom 项目,目前仍然没有明确的版本计划信息,尽管它们已经开发了数年时间,非常希望能够赶在 Java 17 这个 LTS 版本中面世,但前路还是困难重重。

至于难度最高、创建时间最晚的 Leyden 项目,目前还完全处于特性讨论阶段,连个胚胎都算不上。对于 Java 的原生编译,我们中短期内只可能寄希望于 Oracle 的 GraalVM。

未来一段时间,是Java重要的转型窗口期,如果作为下一个LTS版的Java 17,能够成功集Amber、Portola、Valhalla、Loom和Panama(用于外部函数接口访问,本文没有提到)的新能力、新特性于一身,GraalVM也能给予足够强力支持的话,那Java 17 LTS大概率会是一个里程碑式的版本,带领着整个Java生态从大规模服务端应用,向新的云原生时代软件系统转型。可能成为比肩当年从面向嵌入式设备与浏览器Web Applets的Java 1,到确立现代Java语言方向(Java SE/EE/ME和JavaCard)雏形的Java 2转型那样的里程碑。 

但是,如果Java不能加速自己的发展步伐,那由强大生态所构建的护城河终究会消耗殆尽,被Golang、Rust这样的新生语言,以及C、C++、C#、Python等老对手蚕食掉很大一部分市场份额,以至被迫从“天下第一”编程语言的宝座中退位。 

Java的未来是继续向前,再攀高峰,还是由盛转衰,锋芒挫缩,你我拭目以待。

查看原文

赞 1 收藏 0 评论 0

鸿乃江边鸟 发布了文章 · 2020-12-05

【go系列5】golang中的通道

golang中的通道类型是一种特殊的类型, 类型名字为chan。在任何时候,同时只有一个goroutine访问通道进行并发和获取数据,goroutine间通过通道就可以进行通信。我们可以通过go关键字创建goroutine。
通道本身是同步的,通道的发送和接受数据默认是同步的,且遵循先进先出的规则以保证数据发送的顺序。

  1. 通道分为双向通道和单向通道。
  • 双向通道:
chan1 := make(chan int, 10)
  • 单向通道:
#单向只写通道,10 表示通道的容量
chan2 := make(chan <- int, 10)
#单向只读通道,10表示通道的容量
chan3 := make(<- chan int, 10)
package main

import (
    "time"

    "github.com/golang/glog"
)

func read(readChan <-chan int) {
    for data := range readChan {
        glog.Info(data)
    }
}

func write(writeChan chan<- int) {
    for i := 0; i < 100; i++ {
        writeChan <- i
        glog.Infof("write: %s", i)
    }
}

func main() {
    // 双向通道
    writeReadChan := make(chan int)
    // 传入双向通道自动会转换成一个单项通道
    go write(writeReadChan)
    glog.Info("start to read data from channel!")
    // 传入双向通道会自动转换成一个单项通道`
    go read(writeReadChan)
    // 关闭chan
    close(writeReadChan)
    time.Sleep(time.Second * 100)
    glog.Info("finishedAll!!")

}
  1. 通道分无缓冲通道和缓冲通道
  • 无缓冲通道
unbufferChan1 := make(chan int)
unbufferChan2 := make(chan int, 0)
  • 缓冲通道
bufferChan := make(chan int, 1)
  • 无缓冲通道的特点是,发送的数据需要被读取后,发送才会完成,它阻塞场景:

    1. 通道中无数据,但执行读通道。
    2. 通道中无数据,向通道写数据,但无协程读取。
func occasion1() {
    noBufChan := make(chan int)
    <-noBufChan
    fmt.Println("read ")
}

// 场景2
func occasion2() {
    ch := make(chan int)
    ch <- 1
    fmt.Println("write success no block")
}
  • 有缓存通道的特点是,有缓存时可以向通道中写入数据后直接返回,缓存中有数据时可以从通道中读到数据直接返回,这时有缓存通道是不会阻塞的,它阻塞场景是:

    1. 通道的缓存无数据,但执行读通道。
    2. 通道的缓存已经占满,向通道写数据,但无协程读。
// 场景1
func occasion1() {
    bufCh := make(chan int, 2)
    <-bufCh
    fmt.Println("read from no buffer channel success")
}

// 场景2
func occasion2() {
    ch := make(chan int, 2)
    ch <- 1
    ch <- 2
    ch <- 3
    fmt.Println("write success no block")
}
查看原文

赞 0 收藏 0 评论 0

鸿乃江边鸟 发布了文章 · 2020-12-04

【go系列4】golang 函数命名返回值

golang的函数返回值是可以命名的,且一个不带任何返回值的return语句,返回所有的命名返回值,如:

package main

import "fmt"

func NameReturnDemo(count int) (x int, y int) {
    x = count * 4 / 9
    y = count - x
    return
}

func main() {
    fmt.Println(NameReturnDemo(17))
}

# 运行
go run demo.go
7 10
查看原文

赞 0 收藏 0 评论 0

鸿乃江边鸟 发布了文章 · 2020-12-03

【go系列3】go 的包管理工具go modules的使用

背景

由于之前写k8s operator在引入go的包的时候,涉及到go的包管理工具,因为自己写java已经很多年,maven的包管理工具很是好用,便想到golang是否也有类似的工具, goer都知道go的包引入带上github.com类似的前缀,就会自动从go的第三方检索服务pkg.go.dev下载对应的文件,而该检索服务会定期从github更新最新的代码。

go mod 是golang 1.11版本引入的。相对于之前要么是没有包管理,要么是管理工具不好用

使用

  1. 查看golang的版本,确保在1.11及以上版本

go version
go version go1.15.2 darwin/amd64

  1. 设置GO111MODULE为on

GO111MODULE 有三个值 : off,on,auto(默认)

  • off 关闭go mod的功能,查找包沿用一起的GOPATH模式或者通过vendor目录方式
  • on 开启go mod功能,不会去查找GOPATH目录
  • auto,分情况,

    1. 当前目录在GOPATH/src之外且该目录包含go.mod文件,会开启go mod功能
    2. 当前文件在包含go.mod文件的目录下面
  1. 初始化项目
    直接在当前项目下执行go mod init,则会在当前项目下建立go.mod文件
    如果该项目在GOPATH目录下,且没设置GO111MODULE为on,则会报错:
go: modules disabled inside GOPATH/src by GO111MODULE=auto; see 'go help modules'

再执行go build 则会把项目依赖的文件自动添加到go.mod中且会生成go.sum文件,
如果有存在包依赖冲问题,直接修改按照提示修改go.mod文件对应的版本就可以。
如:

../../../../pkg/mod/k8s.io/client-go@v11.0.0+incompatible/kubernetes/scheme/register.go:26:2: module k8s.io/api@latest found (v0.19.4), but does not contain package k8s.io/api/auditregistration/v1alpha1
go.mod  
module github.com/monkeyboy123/custom-controller

go 1.15

require (
    github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
    github.com/imdario/mergo v0.3.11 // indirect
    golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e // indirect
    k8s.io/api v0.19.4 // indirect
    k8s.io/apimachinery v0.19.4
    k8s.io/client-go v11.0.0+incompatible
    k8s.io/klog v1.0.0 // indirect
    k8s.io/utils v0.0.0-20201110183641-67b214c5f920 // indirect
)

直接修改k8s.io/client-go v11.0.0+incompatible为k8s.io/client-go v0.19.4 即可

关于使用go mod管理的话,依赖包的下载会下到$GOPATH/pkg/mod目录下,而不再是$GOPATH/src目录下

查看原文

赞 0 收藏 0 评论 0

鸿乃江边鸟 发布了文章 · 2020-12-01

【spark系列3】spark 3.0.1 AQE(Adaptive Query Exection)分析

AQE简介

spark configuration,到在最早在spark 1.6版本就已经有了AQE;到了spark 2.x版本,intel大数据团队进行了相应的原型开发和实践;到了spark 3.0时代,Databricks和intel一起为社区贡献了新的AQE

spark 3.0.1中的AQE的配置

配置项默认值官方说明分析
spark.sql.adaptive.enabledfalse是否开启自适应查询此处设置为true开启
spark.sql.adaptive.coalescePartitions.enabledtrue是否合并临近的shuffle分区(根据'spark.sql.adaptive.advisoryPartitionSizeInBytes'的阈值来合并)此处默认为true开启,分析见: 分析1
spark.sql.adaptive.coalescePartitions.initialPartitionNum(none)shuffle合并分区之前的初始分区数,默认为spark.sql.shuffle.partitions的值分析见:分析2
spark.sql.adaptive.coalescePartitions.minPartitionNum(none)shuffle 分区合并后的最小分区数,默认为spark集群的默认并行度分析见: 分析3
spark.sql.adaptive.advisoryPartitionSizeInBytes64MB建议的shuffle分区的大小,在合并分区和处理join数据倾斜的时候用到分析见:分析3
spark.sql.adaptive.skewJoin.enabledtrue是否开启join中数据倾斜的自适应处理
spark.sql.adaptive.skewJoin.skewedPartitionFactor5数据倾斜判断因子,必须同时满足skewedPartitionFactor和skewedPartitionThresholdInBytes分析见:分析4
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes256MB数据倾斜判断阈值,必须同时满足skewedPartitionFactor和skewedPartitionThresholdInBytes分析见:分析4
spark.sql.adaptive.logLeveldebug配置自适应执行的计划改变日志调整为info级别,便于观察自适应计划的改变
spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin0.2转为broadcastJoin的非空分区比例阈值,>=该值,将不会转换为broadcastjoin分析见:分析5

分析1

OptimizeSkewedJoin.scala中,我们看到ADVISORY_PARTITION_SIZE_IN_BYTES,也就是spark.sql.adaptive.advisoryPartitionSizeInBytes被引用的地方, (OptimizeSkewedJoin是物理计划中的规则)

 /**
   * The goal of skew join optimization is to make the data distribution more even. The target size
   * to split skewed partitions is the average size of non-skewed partition, or the
   * advisory partition size if avg size is smaller than it.
   */
  private def targetSize(sizes: Seq[Long], medianSize: Long): Long = {
    val advisorySize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
    val nonSkewSizes = sizes.filterNot(isSkewed(_, medianSize))
    // It's impossible that all the partitions are skewed, as we use median size to define skew.
    assert(nonSkewSizes.nonEmpty)
    math.max(advisorySize, nonSkewSizes.sum / nonSkewSizes.length)
  }

其中:

  1. nonSkewSizes为task非倾斜的分区
  2. targetSize返回的是max(非倾斜的分区的平均值,advisorySize),其中advisorySize为spark.sql.adaptive.advisoryPartitionSizeInBytes值,所以说

targetSize不一定是spark.sql.adaptive.advisoryPartitionSizeInBytes值

  1. medianSize值为task的分区大小的中位值

分析2

SQLConf.scala

def numShufflePartitions: Int = {
    if (adaptiveExecutionEnabled && coalesceShufflePartitionsEnabled) {
      getConf(COALESCE_PARTITIONS_INITIAL_PARTITION_NUM).getOrElse(defaultNumShufflePartitions)
    } else {
      defaultNumShufflePartitions
    }
  }

从spark 3.0.1开始如果开启了AQE和shuffle分区合并,则用的是spark.sql.adaptive.coalescePartitions.initialPartitionNum,这在如果有多个shuffle stage的情况下,增加分区数,可以有效的增强shuffle分区合并的效果

分析3

CoalesceShufflePartitions.scala,CoalesceShufflePartitions是一个物理计划的规则,会执行如下操作

 if (!shuffleStages.forall(_.shuffle.canChangeNumPartitions)) {
      plan
    } else {
      // `ShuffleQueryStageExec#mapStats` returns None when the input RDD has 0 partitions,
      // we should skip it when calculating the `partitionStartIndices`.
      val validMetrics = shuffleStages.flatMap(_.mapStats)

      // We may have different pre-shuffle partition numbers, don't reduce shuffle partition number
      // in that case. For example when we union fully aggregated data (data is arranged to a single
      // partition) and a result of a SortMergeJoin (multiple partitions).
      val distinctNumPreShufflePartitions =
        validMetrics.map(stats => stats.bytesByPartitionId.length).distinct
      if (validMetrics.nonEmpty && distinctNumPreShufflePartitions.length == 1) {
        // We fall back to Spark default parallelism if the minimum number of coalesced partitions
        // is not set, so to avoid perf regressions compared to no coalescing.
        val minPartitionNum = conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM)
          .getOrElse(session.sparkContext.defaultParallelism)
        val partitionSpecs = ShufflePartitionsUtil.coalescePartitions(
          validMetrics.toArray,
          advisoryTargetSize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES),
          minNumPartitions = minPartitionNum)
        // This transformation adds new nodes, so we must use `transformUp` here.
        val stageIds = shuffleStages.map(_.id).toSet
        plan.transformUp {
          // even for shuffle exchange whose input RDD has 0 partition, we should still update its
          // `partitionStartIndices`, so that all the leaf shuffles in a stage have the same
          // number of output partitions.
          case stage: ShuffleQueryStageExec if stageIds.contains(stage.id) =>
            CustomShuffleReaderExec(stage, partitionSpecs, COALESCED_SHUFFLE_READER_DESCRIPTION)
        }
      } else {
        plan
      }
    }
  }

也就是说:

  1. 如果是用户自己指定的分区操作,如repartition操作,spark.sql.adaptive.coalescePartitions.minPartitionNum无效,且跳过分区合并优化
  2. 如果多个task进行shuffle,且task有不同的分区数的话,spark.sql.adaptive.coalescePartitions.minPartitionNum无效,且跳过分区合并优化
  3. 见ShufflePartitionsUtil.coalescePartition分析

分析4

OptimizeSkewedJoin.scala中,我们看到

/**
   * A partition is considered as a skewed partition if its size is larger than the median
   * partition size * ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR and also larger than
   * ADVISORY_PARTITION_SIZE_IN_BYTES.
   */
  private def isSkewed(size: Long, medianSize: Long): Boolean = {
    size > medianSize * conf.getConf(SQLConf.SKEW_JOIN_SKEWED_PARTITION_FACTOR) &&
      size > conf.getConf(SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD)
  }
  1. OptimizeSkewedJoin是个物理计划的规则,会根据isSkewed来判断是否数据数据有倾斜,而且必须是满足SKEW_JOIN_SKEWED_PARTITION_FACTOR和SKEW_JOIN_SKEWED_PARTITION_THRESHOLD才会判断为数据倾斜了
  2. medianSize为task的分区大小的中位值

分析5

在AdaptiveSparkPlanExec方法getFinalPhysicalPlan中调用了reOptimize方法,而reOptimize方法则会执行逻辑计划的优化操作:

private def reOptimize(logicalPlan: LogicalPlan): (SparkPlan, LogicalPlan) = {
    logicalPlan.invalidateStatsCache()
    val optimized = optimizer.execute(logicalPlan)
    val sparkPlan = context.session.sessionState.planner.plan(ReturnAnswer(optimized)).next()
    val newPlan = applyPhysicalRules(sparkPlan, preprocessingRules ++ queryStagePreparationRules)
    (newPlan, optimized)
  }

而optimizer 中有个DemoteBroadcastHashJoin规则:

@transient private val optimizer = new RuleExecutor[LogicalPlan] {
    // TODO add more optimization rules
    override protected def batches: Seq[Batch] = Seq(
      Batch("Demote BroadcastHashJoin", Once, DemoteBroadcastHashJoin(conf))
    )
  }

而对于DemoteBroadcastHashJoin则有对是否broadcastjoin的判断:

case class DemoteBroadcastHashJoin(conf: SQLConf) extends Rule[LogicalPlan] {

  private def shouldDemote(plan: LogicalPlan): Boolean = plan match {
    case LogicalQueryStage(_, stage: ShuffleQueryStageExec) if stage.resultOption.isDefined
      && stage.mapStats.isDefined =>
      val mapStats = stage.mapStats.get
      val partitionCnt = mapStats.bytesByPartitionId.length
      val nonZeroCnt = mapStats.bytesByPartitionId.count(_ > 0)
      partitionCnt > 0 && nonZeroCnt > 0 &&
        (nonZeroCnt * 1.0 / partitionCnt) < conf.nonEmptyPartitionRatioForBroadcastJoin
    case _ => false
  }

  def apply(plan: LogicalPlan): LogicalPlan = plan.transformDown {
    case j @ Join(left, right, _, _, hint) =>
      var newHint = hint
      if (!hint.leftHint.exists(_.strategy.isDefined) && shouldDemote(left)) {
        newHint = newHint.copy(leftHint =
          Some(hint.leftHint.getOrElse(HintInfo()).copy(strategy = Some(NO_BROADCAST_HASH))))
      }
      if (!hint.rightHint.exists(_.strategy.isDefined) && shouldDemote(right)) {
        newHint = newHint.copy(rightHint =
          Some(hint.rightHint.getOrElse(HintInfo()).copy(strategy = Some(NO_BROADCAST_HASH))))
      }
      if (newHint.ne(hint)) {
        j.copy(hint = newHint)
      } else {
        j
      }
  }
}

shouldDemote就是对是否进行broadcastjoin的判断:

  1. 首先得是ShuffleQueryStageExec操作
  2. 如果非空分区比列大于nonEmptyPartitionRatioForBroadcastJoin,也就是spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin,则不会把mergehashjoin转换为broadcastJoin
  3. 这在sql中先join在groupby的场景中比较容易出现

ShufflePartitionsUtil.coalescePartition分析(合并分区的核心代码)

coalescePartition如示:

def coalescePartitions(
      mapOutputStatistics: Array[MapOutputStatistics],
      advisoryTargetSize: Long,
      minNumPartitions: Int): Seq[ShufflePartitionSpec] = {
    // If `minNumPartitions` is very large, it is possible that we need to use a value less than
    // `advisoryTargetSize` as the target size of a coalesced task.
    val totalPostShuffleInputSize = mapOutputStatistics.map(_.bytesByPartitionId.sum).sum
    // The max at here is to make sure that when we have an empty table, we only have a single
    // coalesced partition.
    // There is no particular reason that we pick 16. We just need a number to prevent
    // `maxTargetSize` from being set to 0.
    val maxTargetSize = math.max(
      math.ceil(totalPostShuffleInputSize / minNumPartitions.toDouble).toLong, 16)
    val targetSize = math.min(maxTargetSize, advisoryTargetSize)

    val shuffleIds = mapOutputStatistics.map(_.shuffleId).mkString(", ")
    logInfo(s"For shuffle($shuffleIds), advisory target size: $advisoryTargetSize, " +
      s"actual target size $targetSize.")

    // Make sure these shuffles have the same number of partitions.
    val distinctNumShufflePartitions =
      mapOutputStatistics.map(stats => stats.bytesByPartitionId.length).distinct
    // The reason that we are expecting a single value of the number of shuffle partitions
    // is that when we add Exchanges, we set the number of shuffle partitions
    // (i.e. map output partitions) using a static setting, which is the value of
    // `spark.sql.shuffle.partitions`. Even if two input RDDs are having different
    // number of partitions, they will have the same number of shuffle partitions
    // (i.e. map output partitions).
    assert(
      distinctNumShufflePartitions.length == 1,
      "There should be only one distinct value of the number of shuffle partitions " +
        "among registered Exchange operators.")

    val numPartitions = distinctNumShufflePartitions.head
    val partitionSpecs = ArrayBuffer[CoalescedPartitionSpec]()
    var latestSplitPoint = 0
    var coalescedSize = 0L
    var i = 0
    while (i < numPartitions) {
      // We calculate the total size of i-th shuffle partitions from all shuffles.
      var totalSizeOfCurrentPartition = 0L
      var j = 0
      while (j < mapOutputStatistics.length) {
        totalSizeOfCurrentPartition += mapOutputStatistics(j).bytesByPartitionId(i)
        j += 1
      }

      // If including the `totalSizeOfCurrentPartition` would exceed the target size, then start a
      // new coalesced partition.
      if (i > latestSplitPoint && coalescedSize + totalSizeOfCurrentPartition > targetSize) {
        partitionSpecs += CoalescedPartitionSpec(latestSplitPoint, i)
        latestSplitPoint = i
        // reset postShuffleInputSize.
        coalescedSize = totalSizeOfCurrentPartition
      } else {
        coalescedSize += totalSizeOfCurrentPartition
      }
      i += 1
    }
    partitionSpecs += CoalescedPartitionSpec(latestSplitPoint, numPartitions)

    partitionSpecs
  }
  1. totalPostShuffleInputSize 先计算出总的shuffle的数据大小
  2. maxTargetSize取max(totalPostShuffleInputSize/minNumPartitions,16)的最大值,minNumPartitions也就是spark.sql.adaptive.coalescePartitions.minPartitionNum的值
  3. targetSize取min(maxTargetSize,advisoryTargetSize),advisoryTargetSize也就是spark.sql.adaptive.advisoryPartitionSizeInBytes的值,所以说该值只是建议值,不一定是targetSize
  4. while循环就是取相邻的分区合并,对于每个task中的每个相邻分区合并,直到不大于targetSize

OptimizeSkewedJoin.optimizeSkewJoin分析(数据倾斜优化的核心代码)

optimizeSkewJoin如示:

def optimizeSkewJoin(plan: SparkPlan): SparkPlan = plan.transformUp {
    case smj @ SortMergeJoinExec(_, _, joinType, _,
        s1 @ SortExec(_, _, ShuffleStage(left: ShuffleStageInfo), _),
        s2 @ SortExec(_, _, ShuffleStage(right: ShuffleStageInfo), _), _)
        if supportedJoinTypes.contains(joinType) =>
      assert(left.partitionsWithSizes.length == right.partitionsWithSizes.length)
      val numPartitions = left.partitionsWithSizes.length
      // Use the median size of the actual (coalesced) partition sizes to detect skewed partitions.
      val leftMedSize = medianSize(left.partitionsWithSizes.map(_._2))
      val rightMedSize = medianSize(right.partitionsWithSizes.map(_._2))
      logDebug(
        s"""
          |Optimizing skewed join.
          |Left side partitions size info:
          |${getSizeInfo(leftMedSize, left.partitionsWithSizes.map(_._2))}
          |Right side partitions size info:
          |${getSizeInfo(rightMedSize, right.partitionsWithSizes.map(_._2))}
        """.stripMargin)
      val canSplitLeft = canSplitLeftSide(joinType)
      val canSplitRight = canSplitRightSide(joinType)
      // We use the actual partition sizes (may be coalesced) to calculate target size, so that
      // the final data distribution is even (coalesced partitions + split partitions).
      val leftActualSizes = left.partitionsWithSizes.map(_._2)
      val rightActualSizes = right.partitionsWithSizes.map(_._2)
      val leftTargetSize = targetSize(leftActualSizes, leftMedSize)
      val rightTargetSize = targetSize(rightActualSizes, rightMedSize)

      val leftSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
      val rightSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
      val leftSkewDesc = new SkewDesc
      val rightSkewDesc = new SkewDesc
      for (partitionIndex <- 0 until numPartitions) {
        val isLeftSkew = isSkewed(leftActualSizes(partitionIndex), leftMedSize) && canSplitLeft
        val leftPartSpec = left.partitionsWithSizes(partitionIndex)._1
        val isLeftCoalesced = leftPartSpec.startReducerIndex + 1 < leftPartSpec.endReducerIndex

        val isRightSkew = isSkewed(rightActualSizes(partitionIndex), rightMedSize) && canSplitRight
        val rightPartSpec = right.partitionsWithSizes(partitionIndex)._1
        val isRightCoalesced = rightPartSpec.startReducerIndex + 1 < rightPartSpec.endReducerIndex

        // A skewed partition should never be coalesced, but skip it here just to be safe.
        val leftParts = if (isLeftSkew && !isLeftCoalesced) {
          val reducerId = leftPartSpec.startReducerIndex
          val skewSpecs = createSkewPartitionSpecs(
            left.mapStats.shuffleId, reducerId, leftTargetSize)
          if (skewSpecs.isDefined) {
            logDebug(s"Left side partition $partitionIndex is skewed, split it into " +
              s"${skewSpecs.get.length} parts.")
            leftSkewDesc.addPartitionSize(leftActualSizes(partitionIndex))
          }
          skewSpecs.getOrElse(Seq(leftPartSpec))
        } else {
          Seq(leftPartSpec)
        }

        // A skewed partition should never be coalesced, but skip it here just to be safe.
        val rightParts = if (isRightSkew && !isRightCoalesced) {
          val reducerId = rightPartSpec.startReducerIndex
          val skewSpecs = createSkewPartitionSpecs(
            right.mapStats.shuffleId, reducerId, rightTargetSize)
          if (skewSpecs.isDefined) {
            logDebug(s"Right side partition $partitionIndex is skewed, split it into " +
              s"${skewSpecs.get.length} parts.")
            rightSkewDesc.addPartitionSize(rightActualSizes(partitionIndex))
          }
          skewSpecs.getOrElse(Seq(rightPartSpec))
        } else {
          Seq(rightPartSpec)
        }

        for {
          leftSidePartition <- leftParts
          rightSidePartition <- rightParts
        } {
          leftSidePartitions += leftSidePartition
          rightSidePartitions += rightSidePartition
        }
      }

      logDebug("number of skewed partitions: " +
        s"left ${leftSkewDesc.numPartitions}, right ${rightSkewDesc.numPartitions}")
      if (leftSkewDesc.numPartitions > 0 || rightSkewDesc.numPartitions > 0) {
        val newLeft = CustomShuffleReaderExec(
          left.shuffleStage, leftSidePartitions, leftSkewDesc.toString)
        val newRight = CustomShuffleReaderExec(
          right.shuffleStage, rightSidePartitions, rightSkewDesc.toString)
        smj.copy(
          left = s1.copy(child = newLeft), right = s2.copy(child = newRight), isSkewJoin = true)
      } else {
        smj
      }
  }
  1. SortMergeJoinExec说明适用于sort merge join
  2. assert(left.partitionsWithSizes.length == right.partitionsWithSizes.length)保证进行join的两个task的分区数相等
  3. 分别计算进行join的task的分区中位数的大小leftMedSize和rightMedSize
  4. 分别计算进行join的task的分区的targetzise大小leftTargetSize和rightTargetSize
  5. 循环判断两个task的每个分区的是否存在倾斜,如果倾斜且满足没有进行过shuffle分区合并,则进行倾斜分区处理,否则不处理
  6. createSkewPartitionSpecs方法为:
    1.获取每个join的task的对应分区的数据大小
    2.根据targetSize分成多个slice
  7. 如果存在数据倾斜,则构造包装成CustomShuffleReaderExec,进行后续任务的运行,最最终调用ShuffledRowRDD的compute方法 匹配case PartialMapperPartitionSpec进行数据的读取,其中还会自动开启“spark.sql.adaptive.fetchShuffleBlocksInBatch”批量fetch减少io

OptimizeSkewedJoin/CoalesceShufflePartitions 在哪里被调用

如:AdaptiveSparkPlanExec

@transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
    ReuseAdaptiveSubquery(conf, context.subqueryCache),
    CoalesceShufflePartitions(context.session),
    // The following two rules need to make use of 'CustomShuffleReaderExec.partitionSpecs'
    // added by `CoalesceShufflePartitions`. So they must be executed after it.
    OptimizeSkewedJoin(conf),
    OptimizeLocalShuffleReader(conf)
  )

可见在AdaptiveSparkPlanExec中被调用 ,且CoalesceShufflePartitions先于OptimizeSkewedJoin,
而AdaptiveSparkPlanExec在InsertAdaptiveSparkPlan中被调用
,而InsertAdaptiveSparkPlan在QueryExecution中被调用

而在InsertAdaptiveSparkPlan.shouldApplyAQE方法和supportAdaptive中我们看到

private def shouldApplyAQE(plan: SparkPlan, isSubquery: Boolean): Boolean = {
    conf.getConf(SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY) || isSubquery || {
      plan.find {
        case _: Exchange => true
        case p if !p.requiredChildDistribution.forall(_ == UnspecifiedDistribution) => true
        case p => p.expressions.exists(_.find {
          case _: SubqueryExpression => true
          case _ => false
        }.isDefined)
      }.isDefined
    }
  }

private def supportAdaptive(plan: SparkPlan): Boolean = {
    // TODO migrate dynamic-partition-pruning onto adaptive execution.
    sanityCheck(plan) &&
      !plan.logicalLink.exists(_.isStreaming) &&
      !plan.expressions.exists(_.find(_.isInstanceOf[DynamicPruningSubquery]).isDefined) &&
    plan.children.forall(supportAdaptive)
  }

如果不满足以上条件也是不会开启AQE的,如果要强制开启,也可以配置spark.sql.adaptive.forceApply 为true(文档中提示是内部配置)

注意:

在spark 3.0.1中已经废弃了如下的配置:

spark.sql.adaptive.skewedPartitionMaxSplits    
spark.sql.adaptive.skewedPartitionRowCountThreshold    
spark.sql.adaptive.skewedPartitionSizeThreshold   

本文部分参考:
https://mp.weixin.qq.com/s?__...
https://mp.weixin.qq.com/s/Rv...

查看原文

赞 0 收藏 0 评论 0

鸿乃江边鸟 发布了文章 · 2020-11-28

Data Lakehouse (湖仓一体) 到底是什么

本文转载自 https://mp.weixin.qq.com/s/Il...

背景

数据湖(Data Lake),湖仓一体(Data Lakehouse)俨然已经成为了大数据领域最为火热的流行词,在接受这些流行词洗礼的时候,身为技术人员我们往往会发出这样的疑问,这是一种新的技术吗,还是仅仅只是概念上的翻新(新瓶装旧酒)呢?它到底解决了什么问题,拥有什么样新的特性呢?它的现状是什么,还存在什么问题呢?

带着这些问题,今天就从笔者的理解,为大家揭开Data Lakehouse的神秘面纱,来探一探起技术的本质到底是什么?

Data Lakehouse(湖仓一体)是新出现的一种数据架构,它同时吸收了数据仓库和数据湖的优势,数据分析师和数据科学家可以在同一个数据存储中对数据进行操作,同时它也能为公司进行数据治理带来更多的便利性。那么何为Data Lakehouse呢,它具备些什么特性呢?

本文参考自https://www.xplenty.com/gloss...://www.xplenty.com/glossary/what-is-a-data-lakehouse/。

Data Lakehouse具备什么特性?

一直以来,我们都在使用两种数据存储方式来架构数据:

•数据仓库:数仓这样的一种数据存储架构,它主要存储的是以关系型数据库组织起来的结构化数据。数据通过转换、整合以及清理,并导入到目标表中。在数仓中,数据存储的结构与其定义的schema是强匹配的。
•数据湖:数据湖这样的一种数据存储结构,它可以存储任何类型的数据,包括像图片、文档这样的非结构化数据。数据湖通常更大,其存储成本也更为廉价。存储其中的数据不需要满足特定的schema,数据湖也不会尝试去将特定的schema施行其上。相反的是,数据的拥有者通常会在读取数据的时候解析schema(schema-on-read),当处理相应的数据时,将转换施加其上。

现在许多的公司往往同时会搭建数仓、数据湖这两种存储架构,一个大的数仓和多个小的数据湖。这样,数据在这两种存储中就会有一定的冗余。

Data Lakehouse的出现试图去融合数仓和数据湖这两者之间的差异,通过将数仓构建在数据湖上,使得存储变得更为廉价和弹性,同时lakehouse能够有效地提升数据质量,减小数据冗余。在lakehouse的构建中,ETL起了非常重要的作用,它能够将未经规整的数据湖层数据转换成数仓层结构化的数据。

Data Lakehouse概念是由Databricks在此文[1]中提出的,在提出概念的同时,也列出了如下一些特性:

事务支持:Lakehouse可以处理多条不同的数据管道。这意味着它可以在不破坏数据完整性的前提下支持并发的读写事务。
Schemas:数仓会在所有存储其上的数据上施加Schema,而数据湖则不会。Lakehouse的架构可以根据应用的需求为绝大多数的数据施加schema,使其标准化。
•  报表以及分析应用的支持:报表和分析应用都可以使用这一存储架构。Lakehouse里面所保存的数据经过了清理和整合的过程,它可以用来加速分析。同时相比于数仓,它能够保存更多的数据,数据的时效性也会更高,能显著提升报表的质量。
数据类型扩展:数仓仅可以支持结构化数据,而Lakehouse的结构可以支持更多不同类型的数据,包括文件、视频、音频和系统日志。
端到端的流式支持:Lakehouse可以支持流式分析,从而能够满足实时报表的需求,实时报表在现在越来越多的企业中重要性在逐渐提高。
计算存储分离:我们往往使用低成本硬件和集群化架构来实现数据湖,这样的架构提供了非常廉价的分离式存储。Lakehouse是构建在数据湖之上的,因此自然也采用了存算分离的架构,数据存储在一个集群中,而在另一个集群中进行处理。
开放性:Lakehouse在其构建中通常会使Iceberg,Hudi,Delta Lake等构建组件,首先这些组件是开源开放的,其次这些组件采用了Parquet,ORC这样开放兼容的存储格式作为下层的数据存储格式,因此不同的引擎,不同的语言都可以在Lakehouse上进行操作。

Lakehouse的概念最早是由Databricks所提出的,而其他的类似的产品有Azure Synapse Analytics。Lakehouse技术仍然在发展中,因此上面所述的这些特性也会被不断的修订和改进。

Data lakehouse解决了什么问题

那说完了Data Lakehouse的特性,它到底解决了什么问题呢?

这些年来,在许多的公司里,数仓和数据湖一直并存且各自发展着,也没有遇到过太过严重的问题。但是仍有一些领域有值得进步的空间,比如:

数据重复性:如果一个组织同时维护了一个数据湖和多个数仓,这无疑会带来数据冗余。在最好的情况下,这仅仅只会带来数据处理的不高效,但是在最差的情况下,它会导致数据不一致的情况出现。Data Lakehouse统一了一切,它去除了数据的重复性,真正做到了Single Version of Truth。
高存储成本:数仓和数据湖都是为了降低数据存储的成本。数仓往往是通过降低冗余,以及整合异构的数据源来做到降低成本。而数据湖则往往使用大数据文件系统(譬如Hadoop HDFS)和Spark在廉价的硬件上存储计算数据。而最为廉价的方式是结合这些技术来降低成本,这就是现在Lakehouse架构的目标。
报表和分析应用之间的差异:报表分析师们通常倾向于使用整合后的数据,比如数仓或是数据集市。而数据科学家则更倾向于同数据湖打交道,使用各种分析技术来处理未经加工的数据。在一个组织内,往往这两个团队之间没有太多的交集,但实际上他们之间的工作又有一定的重复和矛盾。而当使用Data Lakehouse后,两个团队可以在同一数据架构上进行工作,避免不必要的重复。
数据停滞(Data stagnation):在数据湖中,数据停滞是一个最为严重的问题,如果数据一直无人治理,那将很快变为数据沼泽。我们往往轻易的将数据丢入湖中,但缺乏有效的治理,长此以往,数据的时效性变得越来越难追溯。Lakehouse的引入,对于海量数据进行catalog,能够更有效地帮助提升分析数据的时效性。
潜在不兼容性带来的风险:数据分析仍是一门兴起的技术,新的工具和技术每年仍在不停地出现中。一些技术可能只和数据湖兼容,而另一些则又可能只和数仓兼容。Lakehouse灵活的架构意味着公司可以为未来做两方面的准备。

Data Lakehouse存在的问题

现有的Lakehouse架构仍存在着一些问题,其中最为显著的是:

大一统的架构:Lakehouse大一统的架构有许多的有点,但也会引入一些问题。通常,大一统的架构缺乏灵活性,难于维护,同时难以满足所有用户的需求,架构师通常更倾向于使用多模的架构,为不同的场景定制不同的范式。
并非现有架构上本质的改进:现在对于Lakehouse是否真的能够带来额外的价值仍存在疑问。同时,也有不同的意见 - 将现有的数仓、数据湖结构与合适的工具结合 - 是否会带来类似的效率呢?
技术尚未成熟:Lakehouse技术当前尚未成熟,在达到上文所提的能力之前仍有较长的路要走。

References

[1] 此文: _https://databricks.com/blog/2...

查看原文

赞 0 收藏 0 评论 0

鸿乃江边鸟 发布了文章 · 2020-11-26

【go系列2】 关于go中与java C++中应该注意的语法糖以及不同点

背景

最近在用golang写k8s operator(其中涉及到informer controler )用于内部调度平台用,刚好借此机会能够与java版本的 informer controller进行对比,由于之前对golang没怎么接触过,遇到了和java c++不同的语法糖,现在列举一下:

具体不同

glang中package

每个 Go 文件都属于且仅属于一个包,必须在源文件中非注释的第一行指明这个文件属于哪个包,如:package main,package main 表示一个可独立执行的程序,
每个 Go 应用程序都包含一个名为 main 的包。package main 包下可以有多个文件,但所有文件中只能有一个 main () 方法,main () 方法代表程序入口

golang中struct

golang中没有class的概念,但是有struct,而且可以给struct增加方法,如下:

type Member struct {
    Id     int    `json:"id"`
}
//绑定到Member结构的方法,但是这种不会改变member的值,因为结构体是值传递,当我们调用setID时,方法接收器接收到是只是结构体变量的一个副本,通过副本对值进行修复,并不会影响调用者,因此,我们可以将方法接收器定义为指针变量,就可达到修改结构体的目的
func (m Member)setId(id int){
    m.Id = id
}
m := Member{}
m.setId(1)
fmt.Println(m.Id)//输出为空
//绑定到Member结构的方法,会改变member的值 
func (m *Member)setId(id int){
    m.Id = id
}
m := Member{}
m.setId(1)
fmt.Println(m.Id)//输出为1

关于方法的接受者和接口如何被调用,参考如下:

  • 不管方法的接收者是值还是指针,对象的值和指针均可以调用该方法型,那么方法的接受者可以是值类型也可以是指针类型
  • 当方法的接收者是值时,不管是值调用还是指针调用,方法内部都是对原对象的副本进行操作,不会影响原对象型,那么方法的接受者必须也是值类型该方法才可以被调用
  • 当方法的接收者是指针时,不管是值调用还是指针调用,方法内部都是通过指针对原对象进行操作,会影响原对象

注意:struct的属性可以没有名字而只有类型,使用时类型即为属性名。(因此,一个struct中同一个类型的匿名属性只能有一个)

定义struct的tags

在定义strut的结构体时,可以添加tag,tag可以在运行时用到,以及形成json或者xml时用到如下:


type NetworkList struct {
    Project  `json:",inline"`
    f2 int    `json:"id,-"`
    f3 string `json:"f3,omitempty"`
    f4 string `json:"f4"`
}

type Project struct {
    Key   string `json:"key"`
    Value string `json:"value"`
}

其中,

名词解释
json:",inline"表示内嵌类型的key和外层struct的key是平行关系,如NetworkList里的key和Project key在形成json的时候是平行关系,不是内嵌关系,也就是在同一级
json:"id,-"-(横杠) 表示私有字段,形成json的时候不包括该key
json:"f3,omitemptyomitempty表示该字段为空,则生成json时,不包括该key
json:"f4"表示生成json时,key为f4

golang的类型转换

c++ java 中有隐式类型转换,golang没有,golang中有强制类型转换和类型断言

  • 强制类型转换
package main

import "fmt"

func main() {
    var a float32 = 5.6
    var b int = 10
    fmt.Println (a * float32(b))
}
  • 类型断言
package main

import "fmt"

func main() {
    var a interface{} =10
    t,ok:= a.(int)
    if ok{
        fmt.Println("int",t)
    }
    t2,ok:= a.(float32)
    if ok{
        fmt.Println("float32",t2)
    }
}

golang中的interface{}

interface{}是空接口没有任何方法,且所有类型都实现了空接口,相当于java中的object,interface类型默认是一个指针

golang中的go关键字

go 关键字用来创建 goroutine (协程),是实现并发的关键

//go 关键字放在方法调用前新建一个 goroutine 并让他执行方法体
go GetThingDone(param1, param2);

//上例的变种,新建一个匿名方法并执行
go func(param1, param2) {
}(val1, val2)

//直接新建一个 goroutine 并在 goroutine 中执行代码块
go {
    //do someting...
}

golang中的defer关键字

  • defer 的执行方式类似其他语言中的析构函数,在函数体执行结束后按照调用顺序的相反顺序逐个执行
  • 即使函数发生严重错误也会执行,相当于finally
  • 常用于资源清理、文件关闭、解锁以及记录时间等操作
func main() {
  fmt.Println("a")
  
  defer fmt.Println("b")
  defer fmt.Println("c")
}
//打印
//a
//c
//b

golang中的init 函数

golang的init函数在该文件被被引用时才执行(是import时,不是调用包函数时)

package lib
import "fmt"
func init() {
    fmt.Println("lib empty init ")
}
----
package main

import (
    "Test/lib"
    "fmt"
)

func main() {
    fmt.Println("wint")
}
// 输出
//lib empty init  
//wint 

匿名函数

该用法和java C++中差不多

func() {
    //func body
}()     //花括号后加()表示函数调用,此处声明时为指定参数列表,
        //故调用执行时也不需要传参
查看原文

赞 1 收藏 0 评论 0

鸿乃江边鸟 发布了文章 · 2020-11-25

浅淡 Apache Kylin 与 ClickHouse 的对比

文章转载于:https://mp.weixin.qq.com/s?__...

Apache Kylin 和 ClickHouse 都是目前市场流行的大数据 OLAP 引擎;Kylin 最初由 eBay 中国研发中心开发,2014 年开源并贡献给 Apache 软件基金会,凭借着亚秒级查询的能力和超高的并发查询能力,被许多大厂所采用,包括美团,滴滴,携程,贝壳找房,腾讯,58同城等;

OLAP 领域这两年炙手可热的 ClickHouse,由俄罗斯搜索巨头 Yandex 开发,于2016年开源,典型用户包括字节跳动、新浪、腾讯等知名企业。

这两种 OLAP 引擎有什么差异,各自有什么优势,如何选择 ?本文将尝试从技术原理、存储结构、优化方法和优势场景等方面,对比这两种 OLAP 引擎, 为大家的技术选型提供一些参考。

01

技术原理

技术原理方面,我们主要从架构生态两方面做个比较。

1.1 技术架构

Kylin 是基于 Hadoop 的 MOLAP (Multi-dimensional OLAP) 技术,核心技术是 OLAP Cube;与传统 MOLAP 技术不同,Kylin 运行在 Hadoop 这个功能强大、扩展性强的平台上,从而可以支持海量 (TB到PB) 的数据;它将预计算(通过 MapReduce 或 Spark 执行)好的多维 Cube 导入到 HBase 这个低延迟的分布式数据库中,从而可以实现亚秒级的查询响应;最近的 Kylin 4 开始使用 Spark + Parquet 来替换 HBase,从而进一步简化架构。由于大量的聚合计算在离线任务(Cube 构建)过程中已经完成,所以执行 SQL 查询时,它不需要再访问原始数据,而是直接利用索引结合聚合结果再二次计算,性能比访问原始数据高百倍甚至千倍;由于 CPU  使用率低,它可以支持较高的并发量,尤其适合自助分析、固定报表等多用户、交互式分析的场景。

ClickHouse 是基于 MPP 架构的分布式 ROLAP (Relational OLAP)分析引擎,各节点职责对等,各自负责一部分数据的处理(shared nothing),开发了向量化执行引擎,利用日志合并树、稀疏索引与 CPU 的 SIMD(单指令多数据 ,Single Instruction Multiple Data)等特性,充分发挥硬件优势,达到高效计算的目的。因此当 ClickHouse 面对大数据量计算的场景,通常能达到 CPU 性能的极限。

1. 2 技术生态

Kylin 采用 Java 编写,充分融入 Hadoop 生态系统,使用 HDFS 做分布式存储,计算引擎可选 MapReduce、Spark、Flink;存储引擎可选 HBase、Parquet(结合 Spark)。源数据接入支持 Hive、Kafka、RDBMS 等,多节点协调依赖 Zookeeper;兼容 Hive 元数据,Kylin 只支持 SELECT 查询,schema 的修改等都需要在 Hive 中完成,然后同步到 Kylin;建模等操作通过 Web UI 完成,任务调度通过 Rest API 进行,Web UI 上可以查看任务进度。

ClickHouse 采用 C++ 编写,自成一套体系,对第三方工具依赖少。支持较完整的 DDL 和 DML,大部分操作可以通过命令行结合 SQL 就可以完成;分布式集群依赖 Zookeper 管理,单节点不用依赖 Zookeper,大部分配置需要通过修改配置文件完成。

02

存储

Kylin 采用 Hadoop 生态的 HBase 或 Parquet 做存储结构,依靠 HBase 的 rowkey 索引或 Parquet 的 Row group 稀疏索引来做查询提速,使用 HBase Region Server 或 Spark executor 做分布式并行计算。ClickHouse 自己管理数据存储,它的存储特点包括:MergeTree 作主要的存储结构,数据压缩分块,稀疏索引等。下面将针对两者的引擎做详细对比。

2.1 Kylin 的存储结构

Kylin 通过预聚合计算出多维 Cube 数据,查询的时候根据查询条件,动态选择最优的 Cuboid (类似于物化视图),这会极大减小 CPU 计算量和 IO 的读取量。

在 Cube 构建过程中,Kylin 将维度值进行一定的编码压缩如字典编码,力图最小化数据存储;由于 Kylin 的存储引擎和构建引擎都是可插拔式的,对于不同的存储引擎,存储结构也有所差异。

HBase 存储

在使用 HBase 作为存储引擎的情况下,在预计算时会对各个维度进行编码,保证维度值长度固定,并且在生成 hfile 时把计算结果中的维度拼接成 rowkey,聚合值作为 value。维度的顺序决定 rowkey 的设计,也会直接影响查询的效率。

Parquet 存储引擎

在使用 Parquet 作为存储格式时则会直接存储维度值和聚合值,而不需要进行编码和 rowkey 拼接。在存成 Parquet 之前,计算引擎会根据维度对计算结果进行排序,维度字段越是靠前,那么在其上的过滤效率也就越高。另外在同一个分区下 shard 的数量和 parquet 文件的 row group 数量也同样会影响查询的效率。

2.2 ClickHouse 的存储结构

ClickHouse 在创建表结构的时候一般要求用户指定分区列。采用数据压缩和纯粹的列式存储技术, 使用 Mergetree 对每一列单独存储并压缩分块,

同时数据总会以片段的形式写入磁盘,当满足一定条件后 ClickHouse 会通过后台线程定期合并这些数据片段。

当数据量持续增大,ClickHouse,会针对分区目录的数据进行合并,提高数据扫描的效率。

同时 ClickHouse 针对每个数据块,提供稀疏索引。在处理查询请求的时候,就能够利用稀疏索引,减少数据扫描起到加速作用。

03

优化方法

Kylin 和 ClickHouse 都是大数据处理系统,当数据量级持续增大的时候,采用合适的优化方法往往能事半功倍,极大地降低查询响应时间,减少存储空间,提升查询性能。由于二者的计算系统和存储系统不同,因此采用的优化方式也不一样,下一小节将着重分析 Kylin 和 ClickHouse 两者的优化方法。

3.1 Kylin 的优化方法

Kylin 的核心原理是预计算,正如第一小节技术原理所说:Kylin 的计算引擎用 Apache Spark,MapReduce;存储用 HBase,Parquet;SQL 解析和后计算用 Apache Calcite。Kylin 的核心技术是研发了一系列的优化方法,来帮助解决维度爆炸和扫描数据过多的问题,这些方法包括:设置聚合组,设置联合维度,设置衍生维度,设置维度表快照,设置 Rowkey 顺序,设置 shard by 列等。

  • 设置聚合组:通过聚合组进行剪枝,减少不必要的预计算组合;
  • 设置联合维度:将经常成对出现的维度组合放在一起,减少不必要的预计算;
  • 设置衍生维度:将能通过其他维度计算出来的维度(例如年,月,日能通过日期计算出来)设置为衍生维度,减少不必要的预计算;
  • 设置维度表快照:放入内存现算,减少占用的存储空间;
  • 字典编码:减少占用的存储空间;
  • RowKey 编码,设置 shard by 列:通过减少数据扫描的行数,加速查询效率

3.2 ClickHouse 优化方法

MPP 架构的系统最常见的优化方式就是分库分表,类似的,ClickHouse 最常见的优化方式包括设置分区和分片,此外 ClickHouse 也包括一些特有的引擎。总结归纳下来,这些优化方法包括:

  1. 用平表结构,代替多表 Join,避免昂贵的 Join 操作和数据混洗
  2. 设置合理的分区键,排序键,二级索引,减少数据扫描
  3. 搭建 ClickHouse 分布式集群增加分片和副本,添加计算资源
  4. 结合物化视图,适当采用 SummingMergetree,AggregateMergetree 等以预计算为核心的引擎

随着后面性能和并发的要求越来越高,对机器的资源消耗也越来越大。在 ClickHouse 的官方网站文档中建议 ClickHouse 的并发数不超过 100,当并发要求高,为减少 ClickHouse 的资源消耗,可以结合 ClickHouse 的一些特殊引擎进行优化。

特殊引擎中最常用的是 SummingMergetree 和 AggregateMergetree,这两种数据结构是从 Mergetree 中派生而来,本质是通过预计算将需要查询的数据提前算出来,保存在 ClickHouse 中,这样查询的时候就能进一步减少资源消耗。

从使用原理来看 SummingMergetree 和 AggregateMergetree 与 Kylin 的Cube有异曲同工之妙。但是当维度过多的时候,管理很多个物化视图是不现实的做法,存在管理成本高等问题。与 ClickHouse 不同,Kylin 提供一系列简单直接的优化方法,来避免维度爆炸的问题。

可以看到,ClickHouse 和 Kylin 都提供一些方法减少存储占用的空间,降低查询时扫描数据的行数。通常认为:对 ClickHouse 和 Kylin 进行适当优化,都能在大数据量场景下满足业务需求。ClickHouse 采用 MPP 现算,Kylin 采用预计算,由于两者采用的技术路线不同因此相应优势场景也不同。

04

优势场景

Kylin 因为采用预计算技术, 适合有固定模式的聚合查询,例如:SQL 中的 join、group by、where条件模式比较固定等,数据量越大,使用 Kylin 的优势越明显;特别的,Kylin 在去重(count distinct)、Top N、Percentile 等场景的优势尤为巨大,大量使用在 Dashboard、各类报表、大屏展示、流量统计、用户行为分析等场景。美团、极光、贝壳找房等使用 Kylin 构建了他们的数据服务平台,每日提供高达数百万到数千万次的查询服务,且大部分查询可以在 2 - 3 秒内完成。这样的高并发场景几乎没有更好的替代方案。

ClickHouse 因为采用 MPP 架构现场计算能力很强,当查询请求比较灵活,或者有明细查询需求,并发量不大的时候比较适用。场景包括:非常多列且 where 条件随意组合的用户标签筛选,并发量不大的复杂即席查询等。如果数据量和访问量较大,需要部署分布式 ClickHouse 集群,这时候对运维的挑战会比较高。

如果有些查询非常灵活,但不经常查,采用现算就比较节省资源,由于查询量少,即使每个查询消耗计算资源大整体来说也可以是划算的。如果有些查询有固定的模式,查询量较大就更适合 Kylin,因为查询量大,利用大的计算资源将计算结果保存,前期的计算成本能够摊薄每个查询中,因此是最经济的。

05

总结

本文就技术原理,存储结构,优化方法及优势场景,对 Kylin 和 ClickHouse 进行了对比。

技术原理方面:ClickHouse 采用 MPP + Shared nothing 架构,查询比较灵活,安装部署和操作简便,由于数据存储在本地,扩容和运维相对较麻烦;Kylin 采用 MOLAP 预计算,基于 Hadoop,计算与存储分离(特别是使用 Parquet 存储后)、Shared storage 的架构,更适合场景相对固定但数据体量很大的场景,基于 Hadoop 便于与现有大数据平台融合,也便于水平伸缩(特别是从 HBase 升级为 Spark + Parquet 后)。

存储结构方面:ClickHouse 存储明细数据,特点包括MergeTree 存储结构和稀疏索引,在明细之上可以进一步创建聚合表来加速性能;Kylin 采用预聚合以及 HBase 或 Parquet 做存储,物化视图对查询透明,聚合查询非常高效但不支持明细查询。

优化方法方面:ClickHouse 包括分区分片和二级索引等优化手段, Kylin 采用聚合组、联合维度、衍生维度、层级维度,以及 rowkey 排序等优化手段

优势场景方面:ClickHouse 通常适合几亿~几十亿量级的灵活查询(更多量级也支持只是集群运维难度会加大)。Kylin 则更适合几十亿~百亿以上的相对固定的查询场景。

下图是一个多方面的汇总:

综合下来, Kylin 和 ClickHouse 有各种使用的领域和场景 。现代数据分析领域没有一种能适应所有场景的分析引擎。企业需要根据自己的业务场景,选择合适的工具解决具体问题。希望本文能够帮助企业做出合适的技术选型。

查看原文

赞 0 收藏 0 评论 1

认证与成就

  • 获得 2 次点赞
  • 获得 1 枚徽章 获得 0 枚金徽章, 获得 0 枚银徽章, 获得 1 枚铜徽章

擅长技能
编辑

开源项目 & 著作
编辑

(゚∀゚ )
暂时没有

注册于 2020-10-16
个人主页被 1.8k 人浏览