广播变量的概念及需求
Spark Application最开始在Driver端,在我们提交任务的时候,需要传递到各个Executor的Task上运行。对于一些只读、固定的数据(比如从DB中读出的数据),每次需要Driver广播到各个Task上,每次进下数据的拉取,查询效率比较低下。广播变量允许将变量只广播(提前广播)给各个Executor。该Executor上的各个Task再从所在节点BlockManager获取变量,如果本地没有,那么就从Driver远程拉取变量副本,并保存在本地的BlockManager中。此后这个executor上的task,都会直接使用本地的BlockManager中的副本。而不是从Driver获取变量,从而提升效率。
注:一个Executor只需要在第一个Task启动时,获得一份Broadcast数据,之后的Task都从本节点的BlockManager中获取相关数据。
广播变量的使用方法
1)调用SparkContext.broadcast方法创建一个Broadcast[T]对象。任何序列化的类型都可以这么实现。
2)通过value属性访问改对象的值(Java之中为value()方法)
3)变量只会被发送到各个节点一次,应作为只读值处理(修改这个值不会影响到别的节点)
kryo序列化概念及需求
默认情况下,Spark内部是使用Java的序列化机制,ObjectOutputStream / ObjectInputStream,对象输入输出流机制,来进行序列化。
这种默认序列化机制的好处在于,处理起来比较方便,也不需要手动去做什么事情,只在算子里面使用的变量,必须是实现Serializable接口的,可序列化即可。
但是缺点在于,默认的序列化机制的效率不高,序列化的速度比较慢,序列化以后的数据,占用的内存空间相对还是比较大。
Spark支持使用Kryo序列化机制。这种序列化机制,比默认的Java序列化机制速度要快,序列化后的数据更小,大概是Java序列化机制的1/10。
所以Kryo序列化优化以后,可以让网络传输的数据变少,在集群中耗费的内存资源大大减少。
Kryo序列化机制启用以后生效的几个地方
1)、算子函数中使用到的外部变量,使用Kryo以后:优化网络传输的性能,可以优化集群中内存的占用和消耗
2)、持久化RDD,优化内存的占用和消耗。持久化RDD占用的内存越少,task执行的时候,创建的对象,就不至于频繁的占满内存,频繁发生GC。
3)、shuffle:可以优化网络传输的性能
Kryo序列化使用方法
第一步,在SparkConf中设置一个属性,spark.serializer,org.apache.spark.serializer.KryoSerializer类。
第二步,注册你使用的需要通过Kryo序列化的一些自定义类,SparkConf.registerKryoClasses()。
项目中的使用:
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.registerKryoClasses(new Class[]{CategorySortKey.class})