Spark String 是使用最广泛的实时处理框架之一,其他包括Apache Flink, Apache Storm, Kafka Streams。
Spark Streamming 相对于其他有更对的性能问题,它的处理是通过时间窗口而不是逐个事件,会导致延迟发生。
良好的配置和开发有有时候会使性能提高10到20倍,
分享一些可以优化的配置:
一. Kafka的直接实现
1) Spark2 的版本中有两种方式接收kafka数据,一种是兼容kafka0.8.x,另外一种是direct模式,
其中第一种方式缺乏并行性且不兼容TLS, 如果需要同时并行读取多个topic,就需要使用direct模式
第一种方式:
val kafkaStream = KafkaUtils.createStream(
streamingContext,
[ZK quorum],
[consumer group id],
[per-topic number of Kafka partitions to consume]
)
direct模式并不关心数据是否来自一个或多个topic
val kafkaStream = KafkaUtils.createDirectStream[String, String]
(ssc,locationStrategy, consumerStrategy)
二,offset 偏移量管理
offset偏移量指示分配给spark消费者的数据从何处读取,这个非常重要,这个保证了推流过程中的HA,
避免出现错误时丢失数据。
在kafka 0.10.x和spark 2 中offset的管理是通过kafka,为了防止在数据处理过程中
丢失数据,一般有以下几种方式:
1.使用checkpointing
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
2.
在代码中自己管理offset
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
// some time later, after outputs have completed
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
三,分区管理
在kafka中对即将接收的topic进行分区消费非常重要,进行分区可以运行在不同的spark执行器中
并行接收数据。
以下是创建三个程序并行分别使用两个cpu和2gb内存
./bin/kafka-topics.sh --create --replication-factor 1 --partitions 3 --zookeeper zookeeper:2181
--topic topicname
conf.set("spark.cores.max", 6)
conf.set("spark.executor.cores", 2)
conf.set("spark.executor.memory", 2GB)
四, spark streaming的一些优化配置
1. 设置poll的超时时间
在spark2中默认的超时时间是512ms, 如果有很多任务是“Task failed”则需要
增加该值的value继续观察
conf.set("spark.streaming.kafka.consumer.poll.ms", 512)
2, 系列化
使用序列化在kafka中非常重要,你可以使用默认的序列化函数
val serializers = Map(
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[RowDeserializer]
)
当然你也可以使用kryo进行序列化
conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
conf.registerKryoClasses(Array(classOf[Foo], classOf[Var]))
conf.set("spark.kryo.registrationRequired", "true")