话不多说直接上代码
object windowtest1 {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1) //必加!!!!!因为不加的话 每个分区都有自己watermark 需要大量的数据,可以自行尝试
env.getConfig.setAutoWatermarkInterval(200L) //可不加
val ds: DataStream[String] = env.socketTextStream("hadoop102",8888)
val min: DataStream[SensorReading] = ds.map(data => {
val dataArray = data.split(",")
SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
})
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(3)) {
override def extractTimestamp(element: SensorReading) = {
element.timestamp * 1000
}
})
.keyBy(_.id)
.timeWindow(Time.seconds(5))
.minBy("temperature")
ds.print("data")
min.print("min")
env.execute("window test1")
}
}
case class SensorReading(id: String, timestamp: Long, temperature: Double)
注意上面参数
1、timeWindow(Time.seconds(5))开窗长度5s
2、new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(3)) 窗口乱序最大3s
3、这个是滚动开窗,根据源码可知窗口开始时间为1547718120
测试数据
sensor_1, 1547718120,20
sensor_1, 1547718121,19
sensor_1, 1547718122,18
sensor_1, 1547718123,17
sensor_1, 1547718124,16
sensor_1, 1547718125,15
sensor_1, 1547718126,14
sensor_1, 1547718127,13
sensor_1, 1547718128,12
sensor_1, 1547718129,11
sensor_1, 1547718130,10
sensor_1, 1547718131,9
sensor_1, 1547718132,8
sensor_1, 1547718133,7
sensor_1, 1547718134,6
最后数据结果
注意:所有时间戳是以s为单位,但是系统是以ms为单位
为了方便解释,后续所有时间戳以后两位代替说明
说明1、黄色区域为窗口划分情况[20-24),[25-29),前闭后开行
2、红色是闭窗时输出的内容
解析
问题1、为什么会在28s的时候输出最小值?
在输入时间戳28到的时候,此时watermark=28-3(延时时间)=25,此时要关闭25的窗口即[20-25),然后会输出该窗口的最小值
就是温度为16的
问题2、滚动时间窗口,窗口中的内容和watermark有什么关系
目前来看这两者没有直接关系,你的数据在哪个窗口,只取决于你的数据中指定的时间戳,时区(一般不考虑),windowsize三个要素
比如我开5s的窗口,延迟时间为1s,第一个窗口是0-4.9999s,watermark=当前最大时间-延迟时间,那么哪些数据会放到第一个窗口呢?应该是0-5.99999s的数据,前提是watermark<5,一旦watermark=5,那么第一个窗口会立马关闭,即使是0-5.999的数据也不会进入窗口
例如以下数据
0 0.1 1 2 3 4 5 5. 5 5.9 6 这批数据除了6都会进入到第一个窗口
0 0.1 1 2 6 3 4 5 5. 5 5.9 这批数据只有在6之前的数据会进入窗口,其余3 4 5 5. 5 5.9 不会进入其他窗口
貌似会进入测输出流,需要allowedLateness() /sideOutputLateData()参数设置
总给:
watermark=最大时间-延迟时间,只能递增
watermark决定了窗口是否关闭