spark序列化方式
分布式的程序存在着网络传输,无论是数据还是程序本身的序列化都是必不可少的。spark自身提供两种序列化方式:
- java序列化:这是spark默认的序列化方式,使用java的ObjectOutputStream框架,只要是实现了
java.io.Serializable
接口的类都可以,这种方式虽然通用但是性能差,占用的空间也比较大 - kryo序列化:相比于java序列化,kryo更高效且序列化结果紧凑,占用空间小,但是不能序列化所有数据类型,且为更好的性能,需要在程序中注册需要序列化的类
kryo不作为默认的序列化方式,是因为需要显式注册自定义的类型,自spark2.0后,对于一些简单类型的rdd(AllScalaRegistrar默认注册了一些常用的基本类型)在shuffling时内部默认使用kryo作序列化
SparkSql与序列化
SparkSql并不使用kryo或java序列化,Dataset使用的是Encoder将jvm对象转换为二进制(《spark数据格式UnsafeRow》),类似于序列化过程,但是Encoder是动态生成代码,并使用标准的InternalRow格式,使得spark可以直接基于字节上做很多操作(不需要反序列化过程),比如filtering,sorting和hashing;Encoder比kryo和java序列化更轻量级,因为它不用额外保存类的描述信息。
在spark中使用kryo
kryo使用比较麻烦,但为了更好的性能和使用更少的内存,还是建议使用kryo序列化。
- 初始化sparkcontext时指定使用kryo序列化
- 向kryo注册自定义类(
registerKryoClasses->org.apache.spark.serializer.KryoSerializer.classesToRegister
)
val conf=new SparkConf().setAppName("kryo-test").setMaster("local[*]")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.kryo.registrationRequired", "true")
.registerKryoClasses(Array(classOf[com.test.spark.KryoTest.Person],
classOf[Array[com.test.spark.KryoTest.Person]],
classOf[scala.collection.mutable.WrappedArray.ofRef[_]]))
val spark=SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
case class Person(val name: String, val age: Long)
val array = (1 to 100000).map(v => Person("person" + v, v)).toSeq
val rdd = spark.sparkContext.parallelize(array, 5).persist(StorageLevel.MEMORY_ONLY_SER)
rdd.count
rdd.take(10).foreach(p => println(p.age, p.name))
上面的示例执行完后在sparkui上看到使用内存为1641.3 KB
使用kryo但不注册类
val conf=new SparkConf().setAppName("kryo-test").setMaster("local[*]")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val spark=SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
case class Person(val name: String, val age: Long)
val array = (1 to 100000).map(v => Person("person" + v, v)).toSeq
val rdd = spark.sparkContext.parallelize(array, 5).persist(StorageLevel.MEMORY_ONLY_SER)
rdd.count
rdd.take(10).foreach(p => println(p.age, p.name))
上面的示例执行完后在sparkui上看到使用内存为4.8 MB,是注册类时1641.3 KB的差不多3倍,不仅如此,如果在Stage界面观察Task Deserialization Time
和Result Serialization Time
两项指标,也可以看出注册了类的话耗时更少。
当序列化Person实例对象时,如果不注册Person类,那么会写入Person类的完全限定类名,如果注册了,则会使用一个int类型的ID(1-2字节)代替完全限定类名,显然注册类更加高效且节省空间
这个ID是通过com.esotericsoftware.kryo.Kryo#nextRegisterID自增生成的,与类唯一对应,在反序列化时通过ID反向找到类然后实例化。所以在spark这种分布式的程序中,Person类在所有executor中都必需对应着相同的ID值,这是如何保证的?是因为在所有的executor中代码相同所以注册类的顺序一致,还是在driver中把所有类统一注册然后广播到各个executor中?
kryo与java占用内存对比
将上面代码改为使用java序列化方式,最终得到2.7MB
序列化方式 | 占用内存 |
---|---|
kryo且注册类 | 1641.3 KB |
kryo不注册类 | 4.8 MB |
java | 2.7 MB |
可以看到kryo在不注册类的情况时,rdd缓存占用的内存比使用java时还要多
Encoders.kryo vs Encoders.javaSerialization
在Dataset中如果想使用kryo序列化,可以通过工厂类org.apache.spark.sql.Encoders
生成一个使用kryo序列化/java序列化的Encoder,但是创建的Dataset并不是一个标准的数据集,因为得到的数据集中有唯一一列”value”,而这个列的值则是整行记录的二进制数据
val df_java_ser = spark.createDataset(array)(Encoders.javaSerialization(classOf[Person])).persist(StorageLevel.MEMORY_ONLY_SER)
df_java_ser.count
df_java_ser.show
val df_kryo_ser = spark.createDataset(array)(Encoders.kryo(classOf[Person])).persist(StorageLevel.MEMORY_ONLY_SER)
df_kryo_ser.count
df_kryo_ser.show(3)
//show的结果如下
+--------------------+
| value|
+--------------------+
|[01 00 63 6F 6D 2...|
|[01 00 63 6F 6D 2...|
|[01 00 63 6F 6D 2...|
+--------------------+
上面分别使用kryo和java序列化时分别占用内在为:5.2M和16.8M
在Dataset两种方式最后大小都比上面rdd的多,猜测这是因为行数据序列化为二进制后被包装成UnsafeRow,所以需要很多额外的空间。
但是在同样没注册到Person类的情况下,RDD是java优于kryo,Dataset是kryo优于java,这是什么原因,求留言告知!!!
撇开Dataset直接对Person对象进行序列化(一样不注册Person类):
val bf_kryo = new org.apache.spark.serializer.KryoSerializer(new SparkConf()).newInstance().serialize(Person("abcd" , 1))
val bf_java = new org.apache.spark.serializer.JavaSerializer(new SparkConf()).newInstance().serialize(Person("abcd" , 1))
kryo使用了42字节,java使用了103字节,既然单个对象的序列化结果都是kryo优于java,为什么在RDD中却相反,why?