思路如下:
- 定义一个 POJO 类。注意：要让 Flink 把一个类识别为 POJO 类型，这个类必须是 public 的，必须有一个 public 的无参构造方法，并且所有字段要么是 public 的，要么能通过类型匹配的 getter/setter 访问
- 自定义数据源（DataSource），实现 SourceFunction 接口
- 在 run() 方法中调用 `ctx.collect()`，传入想要发送的数据即可
首先定义一个POJO类:
/**
 * Simple data record holding a key, a timestamp and a value.
 *
 * <p>Provides a no-argument constructor plus public fields with getters/setters whose types
 * match the fields, so both Flink's POJO analysis and bean-style introspection see one
 * consistently typed property per field.
 */
class MyData {
    /** Key of the record. */
    public int keyId;
    /** Timestamp of the record (presumably epoch milliseconds — TODO confirm with producers). */
    public long timestamp;
    /** Payload value. */
    public double value;

    /** No-argument constructor, required for Flink to treat this class as a POJO. */
    public MyData() {
    }

    /**
     * Creates a fully initialised record.
     *
     * @param keyId     key of the record
     * @param timestamp timestamp of the record
     * @param value     payload value
     */
    public MyData(int keyId, long timestamp, double value) {
        this.keyId = keyId;
        this.timestamp = timestamp;
        this.value = value;
    }

    /**
     * Returns the key.
     *
     * <p>The return type is {@code int} to match the field and {@link #setKeyId(int)}.
     *
     * @return the key of this record
     */
    public int getKeyId() {
        return keyId;
    }

    /**
     * Sets the key.
     *
     * @param keyId new key
     */
    public void setKeyId(int keyId) {
        this.keyId = keyId;
    }

    /**
     * Returns the timestamp.
     *
     * @return the timestamp of this record
     */
    public long getTimestamp() {
        return timestamp;
    }

    /**
     * Sets the timestamp.
     *
     * @param timestamp new timestamp
     */
    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }

    /**
     * Returns the payload value.
     *
     * @return the value of this record
     */
    public double getValue() {
        return value;
    }

    /**
     * Sets the payload value.
     *
     * @param value new value
     */
    public void setValue(double value) {
        this.value = value;
    }

    /** Returns a debug representation of the form {@code MyData{keyId=.., timestamp=.., value=..}}. */
    @Override
    public String toString() {
        return "MyData{" +
                "keyId=" + keyId +
                ", timestamp=" + timestamp +
                ", value=" + value +
                '}';
    }
}
然后实现自定义数据源，生成自己的数据：
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Random;
/**
 * Demo job that reads {@link MyData} records from a custom in-process source and prints them.
 */
public class CreateMyData {

    /**
     * Builds and executes the streaming job.
     *
     * @param args unused
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the default parallelism before building the pipeline so it clearly applies to every
        // operator created below. A plain (non-parallel) SourceFunction still runs with parallelism 1.
        env.setParallelism(3);
        DataStreamSource<MyData> sourceStream = env.addSource(new MyDataSource());
        sourceStream.print();
        env.execute();
    }

    /**
     * Non-parallel source that emits one randomly generated {@link MyData} per second until
     * {@link #cancel()} is called.
     */
    private static class MyDataSource implements SourceFunction<MyData> {
        // Flag controlling data generation. cancel() is called from a different thread than
        // run(), so the field must be volatile for the loop to observe the change.
        private volatile boolean isRunning = true;
        // Fixed seed so the generated key/value sequence is reproducible across runs.
        private final Random random = new Random(0);

        /**
         * Emits records until cancelled.
         *
         * @param ctx context used to emit records downstream
         * @throws Exception if emitting fails or the sleep is interrupted
         */
        @Override
        public void run(SourceContext<MyData> ctx) throws Exception {
            while (isRunning) {
                ctx.collect(new MyData(random.nextInt(5), System.currentTimeMillis(), random.nextFloat()));
                Thread.sleep(1000L); // emit one record per second
            }
        }

        /** Signals the {@link #run} loop to stop after its current iteration. */
        @Override
        public void cancel() {
            isRunning = false;
        }
    }
}