/**
 * Function interface whose single method combines two values of type {@code T} into one value
 * of the same type.
 *
 * @param <T> The type of the values being combined; the result has this same type.
 */
public interface ReduceFunction<T> extends Function, Serializable {

    /**
     * The core method of ReduceFunction, combining two values into one value of the same type. The
     * reduce function is consecutively applied to all values of a group until only a single value
     * remains.
     *
     * @param value1 The first value to combine.
     * @param value2 The second value to combine.
     * @return The combined value of both input values.
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the
     *     operation to fail and may trigger recovery.
     */
    T reduce(T value1, T value2) throws Exception;
}
注意源码注意,reduce就是combine两个value成为一个value,注意类型是一样的,
特性是 流式函数中数据是连绵不绝的 value1 value2 。。。value999
reduce函数是将 value1 value2两个值合并为一个new value
然后拿着new value 和value3 再进行合并,得到一个new new value
然后拿着new new value 和value4进行合并,得到new new new value...
一直到 value999,最后生成的结果仍然是一个与输入类型相同的 value,而不会变成另一种类型(比如另外的 tuple、list、set 或 map)。
简单案例,统计每个用户窗口时间内的访问量 (也可以是每个url被访问的量)
注意这个量肯定是逐渐增加的
// Custom source: emits one randomly generated click Event per second until cancelled.
public class ClickSource implements SourceFunction<Event> {

    // Flag that keeps the generation loop in run() alive.
    // It is volatile because cancel() writes it while run() is looping; Flink invokes the two
    // methods on different threads, and without volatile the update is not guaranteed to
    // become visible to the loop.
    private volatile boolean running = true;

    /**
     * Emits click events with a random user and a random URL, stamped with the current
     * wall-clock time in milliseconds, until {@link #cancel()} clears the running flag.
     *
     * @param ctx The source context used to emit the generated events.
     * @throws Exception If the sleep between two events is interrupted.
     */
    @Override
    public void run(SourceContext<Event> ctx) throws Exception {
        // Pick values at random from the fixed data sets below.
        Random random = new Random();
        String[] users = {"Mary", "Alice", "Bob", "Cary"};
        String[] urls = {"./home", "./cart", "./fav", "./prod?id=1", "./prod?id=2"};

        while (running) {
            ctx.collect(new Event(
                    users[random.nextInt(users.length)],
                    urls[random.nextInt(urls.length)],
                    System.currentTimeMillis()
            ));
            // Note: timestamps and watermarks could also be emitted directly from here, e.g.
            // via ctx.collectWithTimestamp(...) and ctx.emitWatermark(new Watermark(1)).

            // Emit one click event per second so the output is easy to observe.
            Thread.sleep(1000);
        }
    }

    /** Requests the generation loop in {@link #run} to stop after its current iteration. */
    @Override
    public void cancel() {
        running = false;
    }
}
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.time.Duration;
/**
 * Demo job: counts the clicks of each user inside 10-second tumbling event-time windows,
 * using an incremental {@link ReduceFunction} as the window aggregation.
 */
public class WindowReduceTest {

    /** Uses the event's own {@code timestamp} field as its event time. */
    private static final class EventTimeExtractor implements SerializableTimestampAssigner<Event> {
        @Override
        public long extractTimestamp(Event element, long recordTimestamp) {
            return element.timestamp;
        }
    }

    /** Converts every click into a (user, 1) pair so the clicks can be summed per user. */
    private static final class ClickToCount implements MapFunction<Event, Tuple2<String, Long>> {
        @Override
        public Tuple2<String, Long> map(Event value) throws Exception {
            return Tuple2.of(value.user, 1L);
        }
    }

    /**
     * Accumulation rule: keeps the user name of the first pair and adds up both counts.
     * The accumulated result is sent downstream when the window closes.
     */
    private static final class CountSum implements ReduceFunction<Tuple2<String, Long>> {
        @Override
        public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2)
                throws Exception {
            return Tuple2.of(value1.f0, value1.f1 + value2.f1);
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Author's note: reduce behaves somewhat like a pre-aggregation. Where a batch job for
        // e.g. monthly active users would process the whole month on its last day, here each
        // window's data is aggregated as it arrives. A process function, by contrast, works
        // batch-style: it handles the window's data only after all of it has arrived.

        // Bounded out-of-orderness with a zero delay, with event time taken from the events.
        WatermarkStrategy<Event> watermarks = WatermarkStrategy
                .<Event>forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner(new EventTimeExtractor());

        // Read from the custom source, then assign timestamps and generate watermarks.
        SingleOutputStreamOperator<Event> stream =
                env.addSource(new ClickSource()).assignTimestampsAndWatermarks(watermarks);

        stream.map(new ClickToCount())
                // Group by user name to count the clicks of each user.
                .keyBy(pair -> pair.f0)
                // Tumbling event-time window: one result per key every 10 seconds.
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .reduce(new CountSum())
                .print();

        env.execute();
    }
}
注意
1.这里没有设置allowedLateness 因为没有必要,测试的是reduce。
2.这里用的是 forBoundedOutOfOrderness,但是注意最大乱序时间 = 0,也就相当于数据的时间戳是单调递增的
3.自定义的source 是1s发一条数据,
注意这个打印的结果:为什么第一个窗口里只有6条数据呢?按道理每个10秒的窗口都应该有10条数据。原因是滚动窗口的起止时间是按窗口大小对齐到整10秒的(例如 [0s,10s)、[10s,20s)),而不是从作业启动的时刻开始计算;作业启动时第一个窗口已经过去了一部分,所以只有6条数据落在这个已经开启的窗口里。