searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

Flink集成Alluxio

2023-12-08 06:08:51
30
0
  • Flink可以读取alluxio中保存的文件/将数据写入alluxio中,需要使用java程序,不支持connector形式
  • 将state保存在alluxio中,降低state的读取的时间消耗

1. alluxio的单机部署

  1. cp -r  /usr/local/alluxio-2.9.3/conf/alluxio-site.properties.template    /usr/local/alluxio-2.9.3/conf/alluxio-site.properties
  2. 在上述的alluxio-site.properties配置如下内容
# Common properties
 alluxio.master.hostname=localhost
alluxio.master.mount.table.root.ufs=/tmp/underFSStorage
alluxio.worker.tieredstore.level0.alias=MEM
alluxio.worker.tieredstore.level0.dirs.path=/data02/alluxio/ramdisk
  • 挂载RAMFS文件系统
    • 运行以下命令以挂载RAMFS文件系统。

                    /usr/local/alluxio-2.9.3/bin/alluxio-mount.sh SudoMount

  • 本地启动Alluxio文件系统
    • 简单运行如下的命令来启动Alluxio文件系统。
      • 如果您尚未挂载ramdisk或要重新挂载(如为了改变ramdisk大小)
         /usr/local/alluxio-2.9.3/bin/alluxio-start.sh local SudoMount
      • 或者,如果已经安装了ramdisk
         /usr/local/alluxio-2.9.3/bin/alluxio-start.sh local
  • 查看结果

2. 前期的配置

  • 在hadoop的core-site.xml中添加属性:

         <property>
                <name>fs.alluxio.impl</name>
               <value>alluxio.hadoop.FileSystem</value>
        </property>

  • conf/flink-conf.yaml文件中,配置fs.hdfs.hadoopconf的值为core-site.xml的目录

             fs.hdfs.hadoopconf: /usr/local/hadoop3/etc/hadoop

  • 在Flink的lib目录中,添加/usr/local/alluxio-2.9.3/client/alluxio-2.9.3-client.jar
  • 在conf/alluxio-site.properties中配置的属性,在{FLINK_HOME}/conf/flink-conf .yaml文件中将这些属性转化为env.java.opts
    • 例如,将CACHE_THROUGH作为Alluxio客户端的写文件方式

3. Flink的程序开发和配置

  • Flink读取alluxio中的文件
    •   将文件路径指定为alluxio的根目录
      • /usr/local/alluxio-2.9.3/bin/alluxio fs copyFromLocal /data02/test/alluxioTest/word.txt

    • 确保alluxio的文件权限,当前linux 用户可以编辑
    • 碰到以下问题:为kerberos的问题,需要询问哪个用户可以使用,在可用用户上,获取flink组件的文件编辑权限,运行flink作业
      • 同时该用户,要具有对alluxio的链接编辑权限

        /usr/local/flink/bin/flink run -m yarn-cluster /usr/local/flink/examples/batch/WordCount.jar  --input alluxio://localhost:19998/flink/word.txt  --output alluxio://localhost:19998/flink/output

4. 结果的显示

5. 使用alluxio保存state

  • flink-conf.yaml中配置state和checkpoint,或者stream程序中,添加state和checkpoint代码
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.interval: 5s
state.checkpoints.num-retained: 1
# 用于存储 operator state 快照的 State Backend
state.backend: rocksdb
state.backend.incremental: true
# # 存储快照的目录
state.checkpoints.dir: alluxio://alluxioIP:19998/flink/checkpoints
  • 执行flink作业,查看state保存结果
    • /usr/local/flink/bin/flink run -m yarn-cluster /usr/local/flink/examples/streaming/TopSpeedWindowing.jar --output 

0条评论
0 / 1000
张****领
4文章数
0粉丝数
张****领
4 文章 | 0 粉丝
张****领
4文章数
0粉丝数
张****领
4 文章 | 0 粉丝
原创

Flink集成Alluxio

2023-12-08 06:08:51
30
0
  • Flink可以读取alluxio中保存的文件/将数据写入alluxio中,需要使用java程序,不支持connector形式
  • 将state保存在alluxio中,降低state的读取的时间消耗

1. alluxio的单机部署

  1. cp -r  /usr/local/alluxio-2.9.3/conf/alluxio-site.properties.template    /usr/local/alluxio-2.9.3/conf/alluxio-site.properties
  2. 在上述的alluxio-site.properties配置如下内容
# Common properties
 alluxio.master.hostname=localhost
alluxio.master.mount.table.root.ufs=/tmp/underFSStorage
alluxio.worker.tieredstore.level0.alias=MEM
alluxio.worker.tieredstore.level0.dirs.path=/data02/alluxio/ramdisk
  • 挂载RAMFS文件系统
    • 运行以下命令以挂载RAMFS文件系统。

                    /usr/local/alluxio-2.9.3/bin/alluxio-mount.sh SudoMount

  • 本地启动Alluxio文件系统
    • 简单运行如下的命令来启动Alluxio文件系统。
      • 如果您尚未挂载ramdisk或要重新挂载(如为了改变ramdisk大小)
         /usr/local/alluxio-2.9.3/bin/alluxio-start.sh local SudoMount
      • 或者,如果已经安装了ramdisk
         /usr/local/alluxio-2.9.3/bin/alluxio-start.sh local
  • 查看结果

2. 前期的配置

  • 在hadoop的core-site.xml中添加属性:

         <property>
                <name>fs.alluxio.impl</name>
               <value>alluxio.hadoop.FileSystem</value>
        </property>

  • conf/flink-conf.yaml文件中,配置fs.hdfs.hadoopconf的值为core-site.xml的目录

             fs.hdfs.hadoopconf: /usr/local/hadoop3/etc/hadoop

  • 在Flink的lib目录中,添加/usr/local/alluxio-2.9.3/client/alluxio-2.9.3-client.jar
  • 在conf/alluxio-site.properties中配置的属性,在{FLINK_HOME}/conf/flink-conf .yaml文件中将这些属性转化为env.java.opts
    • 例如,将CACHE_THROUGH作为Alluxio客户端的写文件方式

3. Flink的程序开发和配置

  • Flink读取alluxio中的文件
    •   将文件路径指定为alluxio的根目录
      • /usr/local/alluxio-2.9.3/bin/alluxio fs copyFromLocal /data02/test/alluxioTest/word.txt

    • 确保alluxio的文件权限,当前linux 用户可以编辑
    • 碰到以下问题:为kerberos的问题,需要询问哪个用户可以使用,在可用用户上,获取flink组件的文件编辑权限,运行flink作业
      • 同时该用户,要具有对alluxio的链接编辑权限

        /usr/local/flink/bin/flink run -m yarn-cluster /usr/local/flink/examples/batch/WordCount.jar  --input alluxio://localhost:19998/flink/word.txt  --output alluxio://localhost:19998/flink/output

4. 结果的显示

5. 使用alluxio保存state

  • flink-conf.yaml中配置state和checkpoint,或者stream程序中,添加state和checkpoint代码
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.interval: 5s
state.checkpoints.num-retained: 1
# 用于存储 operator state 快照的 State Backend
state.backend: rocksdb
state.backend.incremental: true
# # 存储快照的目录
state.checkpoints.dir: alluxio://alluxioIP:19998/flink/checkpoints
  • 执行flink作业,查看state保存结果
    • /usr/local/flink/bin/flink run -m yarn-cluster /usr/local/flink/examples/streaming/TopSpeedWindowing.jar --output 

文章来自个人专栏
Flink的学习
4 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0