水位线与窗口
对于流式数据,时间是一个重要的标识。在flink的事件时间语义下,我们不依赖系统时间,而是基于数据自带的时间戳去定义了一个时钟,用来表示当前时间的进展。于是每个并行子任务都会有一个自己的逻辑时钟,它的前进是靠数据的时间戳来驱动的。
但在分布式系统中,这种驱动方式又会有一些问题。因为数据本身在处理转换的过程中会变化,如果遇到窗口聚合这样的操作,其实是要攒一批数据才会输出一个结果,那么下游的数据就会变少,时间进度的控制就不够精细了。
所以我们应该把时钟也以数据的形式传递出去,告诉下游任务当前时间的进展;而且这个时钟的传递不会因为窗口聚合之类的运算而停滞。一种简单的想法是,在数据流中加入一个时钟标记,记录当前的事件时间;这个标记可以直接广播到下游,当下游任务收到这个标记,就可以更新自己的时钟了。由于类似于水流中用来做标志的记号,在 Flink 中,这种用来衡量事件时间(Event Time)进展的标记,就被称作“水位线”(Watermark)。
水位线设置
这里我们将通过mysql-cdc来生成一个水位线,我们在读取数据源的一侧进行设置。
package cn.ctyun.demo.api.watermark;
import cn.ctyun.demo.api.utils.TransformUtil;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.time.Duration;
/**
* @classname: ViewContentStreamWithWaterMark
* @description: 拥有水位线
* @author: Liu Xinyuan
* @create: 2023-04-14 09:50
**/
public class ViewContentStreamWithWaterMark {
public static DataStream<JSONObject> getViewContentDataStream(StreamExecutionEnvironment env){
// 1.创建Flink-MySQL-CDC的Source
MySqlSource<String> viewContentSouce = MySqlSource.<String>builder()
.hostname("***")
.port(3306)
.username("***")
.password("***")
.databaseList("test_cdc_source")
.tableList("test_cdc_source.user_view")
.startupOptions(StartupOptions.initial())
.deserializer(new JsonDebeziumDeserializationSchema())
.serverTimeZone("Asia/Shanghai")
.build();
// 2.使用CDC Source从MySQL读取数据
DataStreamSource<String> mysqlDataStreamSource = env.fromSource(
viewContentSouce,
WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(1L)).withTimestampAssigner(
new SerializableTimestampAssigner<String>() {
@Override
public long extractTimestamp(String extractData, long l) {
return JSONObject.parseObject(extractData).getLong("ts_ms");
}
}
),
"ViewContentStreamWithWatermark Source"
);
// 3.转换为指定格式
return mysqlDataStreamSource.map(TransformUtil::formatResult);
}
}
我们在cdc传来的数据中获取他的日志自带更新时间戳字段ts_ms时间戳作为我们的事件时间,并生成水位线,此后此数据流将包含水位线进行后续地传递。
窗口设置
在窗口中,有着不同的设置,可以面对不同的场景。我们按照数据不同的分配规则,将窗口的具体实现分为了以下四种,如下所示:
-
滚动窗口(Tumbling Windows):滚动窗口有固定的大小,是一种对数据进行“均匀切片”的划分方式。窗口之间没有重叠,也不会有间隔,是“首尾相接”的状态。如果我们把多个窗口的创建,看作一个窗口的运动,就好像它在不停地向前“翻滚”一样。这是最简单的窗口形式,我们之前所举的例子都是滚动窗口。也正是因为滚动窗口是“无缝衔接”,所以每个数据都会被分配到一个窗口,而且只会属于一个窗口。滚动窗口也是在BI分析中最常用的窗口类型之一。
- 滑动窗口(Sliding Windows ):与滚动窗口类似,滑动窗口的大小也是固定的。区别在于,窗口之间并不是首尾相接的,而是可以“错开”一定的位置。如果看作一个窗口的运动,那么就像是向前小步“滑动”一样。所以定义滑动窗口的参数有两个:窗口大小(window size)定义了窗口的大小,还有一个“滑动步长”(window slide),代表了窗口计算的频率。
- 会话窗口(Session Windows):会话窗口顾名思义,是基于“会话”(session)来来对数据进行分组的。这里的会话类似Web 应用中 session 的概念,不过并不表示两端的通讯过程,而是借用会话超时失效的机制来 描述窗口。简单来说,就是数据来了之后就开启一个会话窗口,如果接下来还有数据陆续到来,那么就一直保持会话;如果一段时间一直没收到数据,那就认为会话超时失效,窗口自动关闭。一般而言将会给数据设置一个超时时间,如果两个数据间间隔过长并大于超时时间。在这里所有能够控制的就是超时时间(gap),其作为判定新窗口开启的一个重要指标。
- 全局窗口(Session Windows):这种窗口全局有效,会把相同 key 的所有数据都分配到同一个窗口中;无界流的数据永无止尽,所以这种窗口也没有结束的时候,默认是不会做触发计算的。如果希望它能对数据进行计算处理,还需要自定义“触发器”(Trigger)。
窗口API
窗口操作主要有两个部分:窗口分配器(Window Assigners)和窗口函数(Window Functions)
窗口函数MapReduce
在这里,我们首先定义一个MapReduce过程,用来统计目前十秒内的访问统计数量,这里的水位线设定请参考代码ViewContentStreamWithWaterMark
(上文中提供的代码),具体的MapReduce如下所示
package cn.ctyun.demo.api;
import cn.ctyun.demo.api.watermark.ViewContentStreamWithWaterMark;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
/**
* @classname: ApiTimeWindow
* @description: 时间窗的使用
* @author: Liu Xinyuan
* @create: 2023-04-17 20:39
**/
public class ApiTimeWindow {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<JSONObject> viewContentDataStream = ViewContentStreamWithWaterMark.getViewContentDataStream(env);
viewContentDataStream.filter(new FilterFunction<JSONObject>() {
@Override
public boolean filter(JSONObject value) throws Exception {
// 不将删除的数据考虑在内
return !value.getString("op").equals("d");
}
}).map(new MapFunction<JSONObject, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(JSONObject value) throws Exception {
return Tuple2.of(value.getString("user_name"), 1L);
}
}).keyBy(r -> r.f0)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.reduce(new ReduceFunction<Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
// 设定一个累加规则
return Tuple2.of(value1.f0, value1.f1 + value2.f1);
}
}).print();
env.execute();
}
}
这里设定了一个时间窗口为10秒,最终的结果为每十秒钟将统计一个登录统计,并输出到控制台。使用时间窗口后和不加的唯一区别是计算的范围变为了时间窗内计算。