前面学习了reduceFunction 和aggregateFunction
现在来学习processFunction
两者的区别是什么?
前者是窗口内的数据来一条处理一条
后者是窗口内的数据全部到齐了之后一起处理
功能上来看都还行,感觉reduce这种在大数据量的情况下更占优势,而process逻辑处理更清楚,因为数据都到齐了我想怎么算怎么算。
public class UvCountByWindowExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp;
}
}));
// 将数据全部发往同一分区,按窗口统计UV
stream.keyBy(data -> true)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.process(new UvCountByWindow())
.print();
env.execute();
}
// 自定义窗口处理函数
public static class UvCountByWindow extends ProcessWindowFunction<Event, String, Boolean, TimeWindow>{
//注意这个boolean是我们再keyby的时候的data->true
//注意这里elements 是一个迭代器,就是我们窗口内的所有数据
//这个函数是可以获取上下文的getRuntimeContext 获取分区keyby的信息 窗口信息
@Override
public void process(Boolean aBoolean, Context context, Iterable<Event> elements, Collector<String> out) throws Exception {
HashSet<String> userSet = new HashSet<>();
// 遍历所有数据,放到Set里去重
for (Event event: elements){
userSet.add(event.user);
}
// 结合窗口信息,包装输出内容
Long start = context.window().getStart();
Long end = context.window().getEnd();
out.collect("窗口: " + new Timestamp(start) + " ~ " + new Timestamp(end)
+ " 的独立访客数量是:" + userSet.size());
}
}
}