针对eventtime处理乱序数据,如何保证在需要的窗口内获得指定的数据?
flink采用watermark allowedLateness() sideOutputLateData()三个机制来保证获取数据
先来示例
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
object demo1 {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime )
val inputStream: DataStream[String] = env.socketTextStream("hadoop102",7777)
val outputTag = new OutputTag[SensorReading]("side")
val dataStream = inputStream
.map(data => {
val dataArray = data.split(",")
SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(2)) {
override def extractTimestamp(element: SensorReading): Long = {
element.timestamp*1000 //我的测试时间戳是s,flink要求ms
}
})
val minStream: DataStream[SensorReading] = dataStream.keyBy(_.id)
// .window( SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(2)))
.timeWindow(Time.seconds(10))
.allowedLateness(Time.seconds(4))
.sideOutputLateData(outputTag)
.minBy("temperature")
dataStream.print("data")
minStream.print("min")
minStream.getSideOutput(outputTag).print("slide")
env.execute("demo1")
}
}
case class SensorReading(id: String, timestamp: Long, temperature: Double)
注意参数
1、窗口开窗为10s ,此次采用滚动窗口比较简单点
2、watermark为2s
3、允许延迟为4s
注意事项
1、如果用我的代码进行测试,不要修改测试数据第一条,因为涉及到计算窗口的start
测试数据
sensor_1, 1547718120,20
sensor_1, 1547718130,10
sensor_1, 1547718131,9
sensor_1, 1547718132,8
sensor_1, 1547718120,9
sensor_1, 1547718135,5
sensor_1, 1547718120,9
sensor_1, 1547718136,4
sensor_1, 1547718120,9
打印结果
data> SensorReading(sensor_1,1547718120,20.0)
data> SensorReading(sensor_1,1547718130,10.0)
data> SensorReading(sensor_1,1547718131,9.0)
data> SensorReading(sensor_1,1547718132,8.0)
min> SensorReading(sensor_1,1547718120,20.0)
data> SensorReading(sensor_1,1547718120,9.0)
min> SensorReading(sensor_1,1547718120,9.0)
data> SensorReading(sensor_1,1547718135,5.0)
data> SensorReading(sensor_1,1547718120,9.0)
min> SensorReading(sensor_1,1547718120,9.0)
data> SensorReading(sensor_1,1547718136,4.0)
data> SensorReading(sensor_1,1547718120,9.0)
slide> SensorReading(sensor_1,1547718120,9.0)
说明
1、经过计算窗口的开始时间是1547718120,所以第一个窗口是【20-30),
2、第一个窗口关闭的时间是20+10+2=32,所以当输入32这条数据的时候【20-30)的窗口关闭,此时窗口内的数据只有20,所以算出温度最小值为20。
3、当第一次输入SensorReading(sensor_1,1547718120,9.0)这条数据的时候,allowlateness起作用,认为这条数据也是延迟数据,对原先算出的最小值20进行修正,最后算出min=9.0
备注如果不加allowlateness。此时窗口【20-30)已经关闭了,对数据是没有影响的
4、此时需要计算一个最多延迟时间20+10+2+4=36,所以输入35的时候,这条数据,会进入到第二个窗口,同时第一个窗口还没有彻底关闭,所以再次输入 SensorReading(sensor_1,1547718120,9.0),仍然会进入到【20-30)的窗口,并在此计算最小值
5、输入SensorReading(sensor_1,1547718136,4.0),窗口彻底关闭,再次输入 SensorReading(sensor_1,1547718120,9.0),不再对第一个窗口min进行修正,直接把数据放到测输入流,以后所有的【20-30)的数据在输入都会全部放到侧输出流
总结
1、窗口window 的作用是为了周期性的获取数据
2、watermark的作用是防止数据出现乱序(经常),事件时间内获取不到指定的全部数据,而做的一种保险方法,
3、allowLateNess,是将窗口关闭时间再延迟一段时间,
思考?这里的allowLateNess 感觉就好像window变大了,那么为什么不直接把window设置大一点呢?或者把watermark加大点
业务需要,比如我业务需要统计每个小时内的数据,那么开窗一定是1h,但是数据乱序可能会达到几分钟,一般来说水印设置的都比较小(为什么呢?暂时不知道),所以提出了延迟时间这个概念
2022-09-14 更新
比如来的数据是 1 2 3 4 5 6 4 7 8 9 ....999 1000 1001 窗口假设是10s,水印是2s
设置的过大1000s,那么1-10的窗口要在数据1+1000+10=1011来的时候才会关闭,就违背了流处理的原则:实时。
设置的过小1s,上图中4s数据就会丢失,就会导致数据不能完全接收。
4、sideOutPut是最后兜底操作,所有过期延迟数据,指定窗口已经彻底关闭了,就会把数据放到侧输出流