searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

Spark Streaming开发中的一些优化点

2023-05-29 07:03:20
4
0

Spark String 是使用最广泛的实时处理框架之一,其他包括Apache Flink, Apache Storm, Kafka Streams。
Spark Streamming 相对于其他有更对的性能问题,它的处理是通过时间窗口而不是逐个事件,会导致延迟发生。
良好的配置和开发有有时候会使性能提高10到20倍,
分享一些可以优化的配置:


一. Kafka的直接实现


1) Spark2 的版本中有两种方式接收kafka数据,一种是兼容kafka0.8.x,另外一种是direct模式,
其中第一种方式缺乏并行性且不兼容TLS, 如果需要同时并行读取多个topic,就需要使用direct模式
第一种方式:
  val kafkaStream = KafkaUtils.createStream(
         streamingContext,
         [ZK quorum],
         [consumer group id],
         [per-topic number of Kafka partitions to consume]
    )
direct模式并不关心数据是否来自一个或多个topic
     val kafkaStream = KafkaUtils.createDirectStream[String, String]
(ssc,locationStrategy, consumerStrategy)


二,offset 偏移量管理


offset偏移量指示分配给spark消费者的数据从何处读取,这个非常重要,这个保证了推流过程中的HA,
避免出现错误时丢失数据。
在kafka 0.10.x和spark 2 中offset的管理是通过kafka,为了防止在数据处理过程中
丢失数据,一般有以下几种方式:
1.使用checkpointing
      val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
2.
在代码中自己管理offset
  stream.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      
      // some time later, after outputs have completed
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  }


三,分区管理


在kafka中对即将接收的topic进行分区消费非常重要,进行分区可以运行在不同的spark执行器中
并行接收数据。
以下是创建三个程序并行分别使用两个cpu和2gb内存
./bin/kafka-topics.sh --create --replication-factor 1 --partitions 3 --zookeeper zookeeper:2181 
--topic topicname

 conf.set("spark.cores.max", 6)
 conf.set("spark.executor.cores", 2)
 conf.set("spark.executor.memory", 2GB)


四, spark streaming的一些优化配置


1. 设置poll的超时时间
在spark2中默认的超时时间是512ms, 如果有很多任务是“Task failed”则需要
增加该值的value继续观察
    conf.set("spark.streaming.kafka.consumer.poll.ms", 512)
2, 系列化
使用序列化在kafka中非常重要,你可以使用默认的序列化函数
     val serializers = Map( 
         "key.deserializer" -> classOf[StringDeserializer],
         "value.deserializer" -> classOf[RowDeserializer] 
     ) 
当然你也可以使用kryo进行序列化
      conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
      conf.registerKryoClasses(Array(classOf[Foo], classOf[Var]))
      conf.set("spark.kryo.registrationRequired", "true")

 

0条评论
0 / 1000