- Flink可以读取alluxio中保存的文件/将数据写入alluxio中,需要使用java程序,不支持connector形式
- 将state保存在alluxio中,降低state的读取的时间消耗
1. alluxio的单机部署
- cp -r /usr/local/alluxio-2.9.3/conf/alluxio-site.properties.template /usr/local/alluxio-2.9.3/conf/alluxio-site.properties
- 在上述的alluxio-site.properties配置如下内容
# Common properties alluxio.master.hostname=localhost alluxio.master.mount.table.root.ufs=/tmp/underFSStorage alluxio.worker.tieredstore.level0.alias=MEM alluxio.worker.tieredstore.level0.dirs.path=/data02/alluxio/ramdisk |
- 挂载RAMFS文件系统
-
运行以下命令以挂载RAMFS文件系统。
-
/usr/local/alluxio-2.9.3/bin/alluxio-mount.sh SudoMount
- 本地启动Alluxio文件系统
- 简单运行如下的命令来启动Alluxio文件系统。
- 如果您尚未挂载ramdisk或要重新挂载(如为了改变ramdisk大小)
/usr/local/alluxio-2.9.3/bin/alluxio-start.sh local SudoMount - 或者,如果已经安装了ramdisk
/usr/local/alluxio-2.9.3/bin/alluxio-start.sh local
- 如果您尚未挂载ramdisk或要重新挂载(如为了改变ramdisk大小)
- 简单运行如下的命令来启动Alluxio文件系统。
- 查看结果
2. 前期的配置
- 在hadoop的core-site.xml中添加属性:
<property>
<name>fs.alluxio.impl</name>
<value>alluxio.hadoop.FileSystem</value>
</property>
- 在
conf/flink-conf.yaml
文件中,配置fs.hdfs.hadoopconf
的值为core-site.xml
的目录
fs.hdfs.hadoopconf: /usr/local/hadoop3/etc/hadoop
- 在Flink的
lib
目录中,添加/usr/local/alluxio-2.9.3/client/alluxio-2.9.3-client.jar
在conf/alluxio-site.properties中配置的属性,在{FLINK_HOME}/conf/flink-conf .yaml文件中将这些属性转化为env.java.opts
例如,将CACHE_THROUGH作为Alluxio客户端的写文件方式
3. Flink的程序开发和配置
- Flink读取alluxio中的文件
- 将文件路径指定为alluxio的根目录
-
/usr/local/alluxio-2.9.3/bin/alluxio fs copyFromLocal /data02/test/alluxioTest/word.txt
-
- 将文件路径指定为alluxio的根目录
-
- 确保alluxio的文件权限,当前linux 用户可以编辑
- 碰到以下问题:为kerberos的问题,需要询问哪个用户可以使用,在可用用户上,获取flink组件的文件编辑权限,运行flink作业
-
同时该用户,要具有对alluxio的链接编辑权限
/usr/local/flink/bin/flink run -m yarn-cluster /usr/local/flink/examples/batch/WordCount.jar --input alluxio:
//localhost:19998/flink/word.txt --output alluxio://localhost:19998/flink/output
-
4. 结果的显示
5. 使用alluxio保存state
- flink-conf.yaml中配置state和checkpoint,或者stream程序中,添加state和checkpoint代码
execution.checkpointing.mode: EXACTLY_ONCE execution.checkpointing.interval: 5s state.checkpoints.num-retained: 1 # 用于存储 operator state 快照的 State Backend state.backend: rocksdb state.backend.incremental: true # # 存储快照的目录 state.checkpoints.dir: alluxio: //alluxioIP:19998/flink/checkpoints |
- 执行flink作业,查看state保存结果
-
/usr/local/flink/bin/flink run -m yarn-cluster /usr/local/flink/examples/streaming/TopSpeedWindowing.jar --output
-