1. Dependencies
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.12</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.12</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
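The `${flink.version}` property above is assumed to be declared in the POM's `<properties>` section; the version number below is only an example (any Flink 1.x release that still publishes the Scala-suffixed `_2.12` artifacts would fit):

```xml
<properties>
    <flink.version>1.12.0</flink.version>
</properties>
```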
2. Code
- Receives data from a socket (nc -l 9001)
- Performs a word-count aggregation every two seconds
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCountJava {
    public static void main(String[] args) throws Exception {
        // env: create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // source: read lines of text from the socket
        DataStreamSource<String> text = env.socketTextStream("localhost", 9001);
        // trans: split lines into words, pair each word with 1,
        // key by the word, and sum the counts per 2-second window
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordCount = text
                .flatMap(new FlatMapFunction<String, String>() {
                    @Override
                    public void flatMap(String line, Collector<String> out) {
                        for (String word : line.split(" ")) {
                            out.collect(word);
                        }
                    }
                })
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String word) {
                        return new Tuple2<>(word, 1);
                    }
                })
                .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> tup) {
                        return tup.f0;
                    }
                })
                .timeWindow(Time.seconds(2))
                .sum(1);
        // sink: print results to stdout with parallelism 1
        wordCount.print().setParallelism(1);
        // execute: trigger the job
        env.execute("SocketWindowWordCountJava");
    }
}
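What the flatMap/map/keyBy/sum chain computes for one window can be sketched in plain Java without Flink (the class and method names here are illustrative, not part of the Flink API):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class WordCountSketch {
    // Mirrors the pipeline for a single line: flatMap splits into words,
    // map pairs each word with 1, keyBy/sum groups by word and adds the 1s.
    static Map<String, Integer> wordCount(String line) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String word : line.split(" ")) {
            counts.merge(word, 1, Integer::sum); // sum(1) per key
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(wordCount("a b a")); // {a=2, b=1}
    }
}
```

In the real job, this grouping happens per key across all lines that arrive within each 2-second window, and each window emits its own totals.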
A Flink DataStream program has these components:
- env: create the execution environment for the Flink job
- source: where the data is read from
- trans: what processing is applied to the data
- sink: where the data is written
- execute: trigger the job to run