Spark调优之Data Serialization

新手上路,请多包涵

Spark调优之Data Serialization

序列化在分布式应用的性能表现上扮演着十分重要的角色,序列化慢并且耗费存储空间的格式会大大降低计算的速度。一般选择合适的序列化格式是优化一个Spark应用程序的一步。Spark致力于在易用性和性能之间找到一个平衡点,提供了两种序列化方式1

1.Java serialization

Java serialization是spark的默认序列化方式,spark序列化对象使用java的ObjectOutputStream框架,只要实现了java.io.Serializable的任意class都能被序列化。并且可以通过继承java.io.Serializable可以更精细的控制序列化的性能。一般来说Java serialization很灵活但是速度很慢,并且会形成非常占用空间的序列化对象。

2.Kryo serialization

Spark也可以使用Kryo框架,Kryo序列化的数据在传输速度和占用空间方面相比与Java serialization有显著的提高(一般提高10x)。但是它不支持所有的可序列化数据,并且需要提前注册在程序中用到的class

3.使用Kryo

  • 开启Kryo序列化
    在程序中可以用conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")把spark的序列化方式切换为Kyro,这样数据在shuffle和rdd存储过程中都会对数据进行kryo序列化,提高从程序的性能。spark官方称之所以kryo不是默认序列化方式的原因是它需要用户自己注册要序列化的函数,不过推荐任何network-intensive的使用Kyro。

如果不想在程序中写入启用Kryo,spark提供了另外一种方式直接在给程序打包后提交给spark-submit时添加--conf spark.serializer=org.apache.spark.serializer.KryoSerializer选项

  • class注册
    开启Kyro序列化后,对于想要使用的Kryo序列化的类都需要提前注册。spark会自动地注册一些常用的核心scala类型,具体有哪些类型可以参考Twitter chill library。用户自己定义的class可以用conf.registerKryoClasses注册。此外如果用--conf选项可以用spark.kryo.classesToRegister属性来注册class,Class用都后分隔如--conf spark.kryo.classesToRegister=MyClass1,MyClass2

选择序列化方式和注册class都是用conf对象设置的,只有在设置完conf对象后才能创建sparkcontext对象,整个流程如下:

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)
  • 关于Kryo的一些其他conf属性
    spark.kryo.registrationRequired:默认为false,如果设为true程序运行过程中检查当前被序列化的类型是否被注册如果没有就会抛出异常。这样可以保证整个程序所有序列化类型都被注册防止有类型忘记被注册。

NOTE:如果开启Kryo的情况下,被序列化的类型没有被注册,那么类型名会被写入到序列中极大的影响序列化性能,占用空间会下降甚至会不如直接使用java serialization。

**spark.kryo.registrator**指定注册类。注册类是用来注册类的类下面会有实例,使用的就是注册类。本质上这个属性的功能和**conf spark.kryo.classesToRegister**对与注册很多类的情况会相对方便许多

其他conf属性可以参考Spark Configuration

4.实验

实验在本地虚拟机进行,机器是2核心,6000M内存,IDE是IntelliJ使用默认配置
实验的数据是20368800行,每一行第一列是字符串,第二列是整数,第三列是浮点数以空格隔开。实验一共有3个类DataSerializationMyRegistrator, QualifyDataSerialization是主函数,读取测试文件并统计文件行数。Qualify用于存储每一行的数据,是被序列化的类型。MyRegistrator是注册类,作用是注册要被序列化的类,继承KryoRegistrator

package main.scala

import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

object DataSerialization {
  def main(args: Array[String]) {
  
    val start = System.currentTimeMillis()
    val conf = new SparkConf().setAppName("Test DataSerialization").setMaster("local")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.set("spark.kryo.registrator", "MyRegistrator")
    conf.set("spark.kryo.registrationRequired","true")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val dataPath = "/home/miko/test.txt"
    val data = sc.textFile(dataPath)

    val result = data.map {
      row =>
        val words = row.split(' ')
        new Qualify(words(0), word(1), word(2))
    }.persist(StorageLevel.MEMORY_AND_DISK_SER)

    println(result.count())
    println(System.currentTimeMillis() - start)
  }
}

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
  
    kryo.register(classOf[company])
  }
}

case class Qualify(s:String,i:Int,l:Long)

5.实验结果

Serializar type space cost per partition time cost
Java 144.1M 115s
Kyro 87M 80.23s
Kyro with no registion 179.3M 82.64s

6.结论

从结果上看,Kyro序列化格式占用空间是Java序列化格式的60%左右。速度上是java的70%。可以显著提升性能不过没有官网上说理论上提升10倍那么夸张,估计和机器硬件和程序本身有很大关系。另外在用Kyro serializer的情况下是否class是否注册对结果的影响很大。可以看到在不注册class的情况下,空间占用是注册情况下的两倍还多,甚至不如java serializer,不过时间上的影响到不大。不过要对一个程序中的所有class都注册是一项非常繁琐的工作,实践中应该根据实际情况,注册那些十分需要空间的类型。


  1. footnote
阅读 8.2k
撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
1 篇内容引用
宣传栏