searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

Flink SQL Client使用介绍

2023-05-23 11:51:44
872
0

 

1. 入门

本节介绍如何在命令行里启动和运行你的第一个 Flink SQL 程序。SQL 客户端绑定在常规的 Flink 发行包中,因此可以直接运行。仅需要一个正在运行的 Flink 集群就可以在上面执行 Table 程序。如果仅想试用 SQL 客户端,也可以使用以下命令启动本地集群:

在Yarn集群上运行:

 注意:SQL client如果使用yarn session模式启动,会查找/tmp/.yarn-properties-{用户名}文件中指定的application id,将SQL提交到这个yarn session上运行。因此在使用Yarn方式的时候需要额外注意,启动Yarn session和SQL client必须使用相同的用户。

1.1 启动SQL客户端CLI

SQL 客户端脚本也位于 Flink 的 bin 目录中。将来,用户有两种方式来启动 SQL 客户端命令行界面:通过嵌入式独立进程或者通过连接到远程 SQL 客户端网关。目前仅支持嵌入式模式,现在默认模式就是嵌入式。可以通过以下方式启动 CLI:

 注意:低版本不能使用该命令

或者显式使用嵌入式模式:

1.2 执行SQL查询

CLI 启动后,你可以使用 HELP 命令列出所有可用的 SQL 语句。为了验证你的设置及集群连接是否正确,可以输入一条 SQL 查询语句并按 Enter 键执行:

该查询不需要数据表,并且只会产生一行结果。CLI 将从集群中检索结果并将其可视化。可以按 Q 键退出结果视图。CLI 为维护和可视化结果提供三种模式。下面具体看一下。

 注意:Flink 1.24.0 版本使用 execution.result-mode 参数。

1.2.1 表格模式

表格模式(table mode)在内存中物化结果,并将结果用规则的分页表格的形式可视化展示出来。执行如下命令启用:

 文档中 SET ‘sql-client.execution.result-mode’ = ‘xxx’ 方式配置不生效

 可以使用如下查询语句查看不同模式的的运行结果:

1.2.2 变更日志模式

变更日志模式(changelog mode)不会物化结果。可视化展示由插入(+)和撤销(-)组成的持续查询结果流。

1.2.3 Tableau模式

Tableau模式(tableau mode)更接近传统的数据库,会将执行的结果以制表的形式直接打在屏幕之上。具体显示的内容取决于作业执行模式(execution.type):

 注意:当你在流式查询上使用这种模式时,Flink 会将结果持续的打印在当前的控制台上。如果流式查询的输入是有限数据集,那么 Flink 在处理完所有的输入数据之后,作业会自动停止,同时控制台上的打印也会自动停止。如果你想提前结束这个查询,那么可以直接使用 CTRL-C 按键,这个会停止作业同时停止在控制台上的打印。

2. 配置

2.1 启动选项

可以使用如下可选 CLI 命令启动 SQL 客户端:

2.2 客户端配置

Key

默认值

类型

说明

sql-client.execution.max-table-result.rows

1000000

Integer

在表格模式下缓存的行数。如果行数超过指定值,则以 FIFO 样式重试该行。

sql-client.execution.result-mode

TABLE

枚举值,可以是 TABLE, CHANGELOG, TABLEAU

确定展示查询结果的模式。可以是 table、tableau、changelog。

sql-client.verbose

false

Boolean

确定是否将输出详细信息输出到控制台。如果将选项设置为 true,会打印异常堆栈。否则,只输出原因。

2.3 使用SQL文件初始化会话

SQL 查询需要配置执行环境。SQL 客户端支持 -i 启动选项,以在启动 SQL 客户端时执行初始化 SQL 文件以设置环境,注意,只用于初始化环境,查询或者插入操作是禁止的。细则戳这里

所谓的初始化 SQL 文件,可以使用 DDL 来定义可用的 catalogs、table source、sink、用户自定义函数以及其他执行和部署所需的属性。

下面给出了此类文件的示例:

上述配置:

  • 连接到 Hive Catalog 并使用 MyCatalog 作为当前 Catalog,使用 MyDatabase 作为目录的当前数据库
  • 定义一个可以从 CSV 文件中读取数据的表 MyTable
  • 定义一个视图 MyCustomView,它使用 SQL 查询声明一个虚拟表
  • 定义了一个可以使用类名实例化的用户定义函数 myUDF
  • 在流模式下使用 blink 计划器运行语句,并且设置并行度为 1
  • 使用表格模式运行 SQL 进行探索性查询,

使用 -i  选项初始化 SQL 客户端会话时,初始化 SQL 文件中允许使用以下语句:

  • DDL(CREATE/DROP/ALTER)
  • USE CATALOG/DATABASE
  • LOAD/UNLOAD MODULE
  • SET 命令
  • RESET 命令

执行查询或插入语句时,请进入交互模式或使用-f选项提交SQL语句。如果 SQL 客户端在初始化时遇到错误,SQL 客户端将退出并显示错误信息。

3. 使用SQL客户端提交作业

SQL 客户端可以允许用户在交互式命令行中或使用 -f 选项执行 sql 文件来提交作业。在这两种模式下,SQL 客户端都可以支持解析和执行 Flink 支持的所有类型的 SQL 语句。

3.1 交互式命令行

在交互式命令行中,SQL 客户端读取用户输入并在获取分号 (;) 时执行语句。如果语句成功执行,SQL 客户端会打印成功消息。当出现错误时,SQL 客户端也会打印错误信息。默认情况下,错误消息仅包含错误原因。为了打印完整的异常堆栈以进行调试,需要通过如下命令设置:

 将 sql-client.verbose 设置为 true

3.2 执行SQL文件

SQL 客户端支持使用 -f 选项执行 SQL 脚本文件。SQL 客户端会一一执行 SQL 脚本文件中的语句,并为每条执行的语句打印执行信息。一旦一条语句失败,SQL 客户端就会退出,所有剩余的语句也不会执行。

下面给出了此类文件的示例:

这个配置:

  • 定义从 CSV 文件读取的时态表 users,
  • 设置属性,例如作业名称,
  • 设置保存点路径,
  • 提交从指定保存点路径加载保存点的 sql 作业。

 与交互模式相比,SQL 客户端遇到错误会停止执行并退出。

3.3 执行一组SQL语句

SQL 客户端将每个 INSERT INTO 语句作为单个 Flink 作业执行。但是,这有时性能不是最佳,因为 Pipeline 的某些部分可以重复使用。SQL 客户端支持 STATEMENT SET 语法来执行一组 SQL 语句。这与 Table API 中 StatementSet 功能类似。STATEMENT SET 语法包含一个或多个 INSERT INTO 语句。 STATEMENT SET 块中的所有语句都要经过整体优化后作为一个 Flink 作业执行。

具体语法如下:

 包含在 STATEMENT SET 中的语句必须用分号 (;) 分隔。

具体看一下示例:

3.4 同步/异步执行DML语句

默认情况下,SQL 客户端异步执行 DML 语句。这意味着,SQL 客户端将 DML 语句的作业提交给 Flink 集群即可,不用等待作业完成。所以 SQL 客户端可以同时提交多个作业。这对于通常长时间运行的流作业很有用。SQL 客户端确保语句成功提交到集群。提交语句后,CLI 将显示有关 Flink 作业的信息:

SQL 客户端再提交作业后不会跟踪作业的状态。CLI 进程可以在提交后关闭而不影响查询。Flink 的重启策略负责容错。可以使用 Flink 的 Web 界面、命令行或 REST API 取消查询。但是,对于批处理用户,更常见的是下一个 DML 语句需要等待前一个 DML 语句完成才能执行。为了同步执行 DML 语句,我们可以在 SQL 客户端中设置 table.dml-sync 选项为 true:

 如果要终止作业,只需键入 CTRL-C 即可取消执行。

3.5 从保存点启动SQL作业

Flink 支持从指定的保存点启动作业。在 SQL 客户端中,允许使用 SET 命令指定保存点的路径:

当指定了保存点的路径时,Flink 会在执行 DML 语句时会尝试从保存点恢复状态。因为指定的保存点路径会影响后面所有的 DML 语句,你可以使用 RESET 命令来重置这个配置选项,即禁用从保存点恢复:

3.6 自定义作业名称

SQL 客户端支持通过 SET 命令为查询和 DML 语句定义作业名称:

因为指定的作业名会影响后面所有的查询和 DML 语句,你也可以使用 RESET 命令来重置这个配置,即使用默认的作业名:

如果未指定选项 pipeline.name,SQL 客户端将为提交的作业生成默认名称,例如 insert-into_ 用于 INSERT INTO 语句。

4. 兼容性

为了与之前版本兼容,SQL 客户端仍然支持使用 YAML 文件进行初始化,并允许在 YAML 文件中设置 key。当在 YAML 文件中定义 key 时,SQL 客户端将打印警告消息以通知:

当使用 SET 命令打印属性时,SQL 客户端会打印所有的属性。为了区分不推荐使用的 key,SQL 客户端使用 [DEPRECATED] 作为标识符:

 

 

 

5. 报错记录

5.1 SQL client 提交任务的时候连接拒绝

这个问题报错比较笼统。真实的原因是sql-client无法连接到Flink集群的job manager。

如果使用standalone模式,需要执行./start-cluster.sh启动一个standalone集群。

如果使用Yarn session模式,则需要:

  • 启动sql client之前需要export HADOOP_CLASSPATH环境变量。
  • 提交yarn session和启动sql client需要使用同一个用户,否则会找不到yarn session对应的application id。
  • 确保当前机器的Yarn客户端配置无问题。可通过执行yarn命令是否能正常返回集群信息确认。

 

 

0条评论
0 / 1000
Sirius.
4文章数
0粉丝数
Sirius.
4 文章 | 0 粉丝
Sirius.
4文章数
0粉丝数
Sirius.
4 文章 | 0 粉丝
原创

Flink SQL Client使用介绍

2023-05-23 11:51:44
872
0

 

1. 入门

本节介绍如何在命令行里启动和运行你的第一个 Flink SQL 程序。SQL 客户端绑定在常规的 Flink 发行包中,因此可以直接运行。仅需要一个正在运行的 Flink 集群就可以在上面执行 Table 程序。如果仅想试用 SQL 客户端,也可以使用以下命令启动本地集群:

在Yarn集群上运行:

 注意:SQL client如果使用yarn session模式启动,会查找/tmp/.yarn-properties-{用户名}文件中指定的application id,将SQL提交到这个yarn session上运行。因此在使用Yarn方式的时候需要额外注意,启动Yarn session和SQL client必须使用相同的用户。

1.1 启动SQL客户端CLI

SQL 客户端脚本也位于 Flink 的 bin 目录中。将来,用户有两种方式来启动 SQL 客户端命令行界面:通过嵌入式独立进程或者通过连接到远程 SQL 客户端网关。目前仅支持嵌入式模式,现在默认模式就是嵌入式。可以通过以下方式启动 CLI:

 注意:低版本不能使用该命令

或者显式使用嵌入式模式:

1.2 执行SQL查询

CLI 启动后,你可以使用 HELP 命令列出所有可用的 SQL 语句。为了验证你的设置及集群连接是否正确,可以输入一条 SQL 查询语句并按 Enter 键执行:

该查询不需要数据表,并且只会产生一行结果。CLI 将从集群中检索结果并将其可视化。可以按 Q 键退出结果视图。CLI 为维护和可视化结果提供三种模式。下面具体看一下。

 注意:Flink 1.24.0 版本使用 execution.result-mode 参数。

1.2.1 表格模式

表格模式(table mode)在内存中物化结果,并将结果用规则的分页表格的形式可视化展示出来。执行如下命令启用:

 文档中 SET ‘sql-client.execution.result-mode’ = ‘xxx’ 方式配置不生效

 可以使用如下查询语句查看不同模式的的运行结果:

1.2.2 变更日志模式

变更日志模式(changelog mode)不会物化结果。可视化展示由插入(+)和撤销(-)组成的持续查询结果流。

1.2.3 Tableau模式

Tableau模式(tableau mode)更接近传统的数据库,会将执行的结果以制表的形式直接打在屏幕之上。具体显示的内容取决于作业执行模式(execution.type):

 注意:当你在流式查询上使用这种模式时,Flink 会将结果持续的打印在当前的控制台上。如果流式查询的输入是有限数据集,那么 Flink 在处理完所有的输入数据之后,作业会自动停止,同时控制台上的打印也会自动停止。如果你想提前结束这个查询,那么可以直接使用 CTRL-C 按键,这个会停止作业同时停止在控制台上的打印。

2. 配置

2.1 启动选项

可以使用如下可选 CLI 命令启动 SQL 客户端:

2.2 客户端配置

Key

默认值

类型

说明

sql-client.execution.max-table-result.rows

1000000

Integer

在表格模式下缓存的行数。如果行数超过指定值,则以 FIFO 样式重试该行。

sql-client.execution.result-mode

TABLE

枚举值,可以是 TABLE, CHANGELOG, TABLEAU

确定展示查询结果的模式。可以是 table、tableau、changelog。

sql-client.verbose

false

Boolean

确定是否将输出详细信息输出到控制台。如果将选项设置为 true,会打印异常堆栈。否则,只输出原因。

2.3 使用SQL文件初始化会话

SQL 查询需要配置执行环境。SQL 客户端支持 -i 启动选项,以在启动 SQL 客户端时执行初始化 SQL 文件以设置环境,注意,只用于初始化环境,查询或者插入操作是禁止的。细则戳这里

所谓的初始化 SQL 文件,可以使用 DDL 来定义可用的 catalogs、table source、sink、用户自定义函数以及其他执行和部署所需的属性。

下面给出了此类文件的示例:

上述配置:

  • 连接到 Hive Catalog 并使用 MyCatalog 作为当前 Catalog,使用 MyDatabase 作为目录的当前数据库
  • 定义一个可以从 CSV 文件中读取数据的表 MyTable
  • 定义一个视图 MyCustomView,它使用 SQL 查询声明一个虚拟表
  • 定义了一个可以使用类名实例化的用户定义函数 myUDF
  • 在流模式下使用 blink 计划器运行语句,并且设置并行度为 1
  • 使用表格模式运行 SQL 进行探索性查询,

使用 -i  选项初始化 SQL 客户端会话时,初始化 SQL 文件中允许使用以下语句:

  • DDL(CREATE/DROP/ALTER)
  • USE CATALOG/DATABASE
  • LOAD/UNLOAD MODULE
  • SET 命令
  • RESET 命令

执行查询或插入语句时,请进入交互模式或使用-f选项提交SQL语句。如果 SQL 客户端在初始化时遇到错误,SQL 客户端将退出并显示错误信息。

3. 使用SQL客户端提交作业

SQL 客户端可以允许用户在交互式命令行中或使用 -f 选项执行 sql 文件来提交作业。在这两种模式下,SQL 客户端都可以支持解析和执行 Flink 支持的所有类型的 SQL 语句。

3.1 交互式命令行

在交互式命令行中,SQL 客户端读取用户输入并在获取分号 (;) 时执行语句。如果语句成功执行,SQL 客户端会打印成功消息。当出现错误时,SQL 客户端也会打印错误信息。默认情况下,错误消息仅包含错误原因。为了打印完整的异常堆栈以进行调试,需要通过如下命令设置:

 将 sql-client.verbose 设置为 true

3.2 执行SQL文件

SQL 客户端支持使用 -f 选项执行 SQL 脚本文件。SQL 客户端会一一执行 SQL 脚本文件中的语句,并为每条执行的语句打印执行信息。一旦一条语句失败,SQL 客户端就会退出,所有剩余的语句也不会执行。

下面给出了此类文件的示例:

这个配置:

  • 定义从 CSV 文件读取的时态表 users,
  • 设置属性,例如作业名称,
  • 设置保存点路径,
  • 提交从指定保存点路径加载保存点的 sql 作业。

 与交互模式相比,SQL 客户端遇到错误会停止执行并退出。

3.3 执行一组SQL语句

SQL 客户端将每个 INSERT INTO 语句作为单个 Flink 作业执行。但是,这有时性能不是最佳,因为 Pipeline 的某些部分可以重复使用。SQL 客户端支持 STATEMENT SET 语法来执行一组 SQL 语句。这与 Table API 中 StatementSet 功能类似。STATEMENT SET 语法包含一个或多个 INSERT INTO 语句。 STATEMENT SET 块中的所有语句都要经过整体优化后作为一个 Flink 作业执行。

具体语法如下:

 包含在 STATEMENT SET 中的语句必须用分号 (;) 分隔。

具体看一下示例:

3.4 同步/异步执行DML语句

默认情况下,SQL 客户端异步执行 DML 语句。这意味着,SQL 客户端将 DML 语句的作业提交给 Flink 集群即可,不用等待作业完成。所以 SQL 客户端可以同时提交多个作业。这对于通常长时间运行的流作业很有用。SQL 客户端确保语句成功提交到集群。提交语句后,CLI 将显示有关 Flink 作业的信息:

SQL 客户端再提交作业后不会跟踪作业的状态。CLI 进程可以在提交后关闭而不影响查询。Flink 的重启策略负责容错。可以使用 Flink 的 Web 界面、命令行或 REST API 取消查询。但是,对于批处理用户,更常见的是下一个 DML 语句需要等待前一个 DML 语句完成才能执行。为了同步执行 DML 语句,我们可以在 SQL 客户端中设置 table.dml-sync 选项为 true:

 如果要终止作业,只需键入 CTRL-C 即可取消执行。

3.5 从保存点启动SQL作业

Flink 支持从指定的保存点启动作业。在 SQL 客户端中,允许使用 SET 命令指定保存点的路径:

当指定了保存点的路径时,Flink 会在执行 DML 语句时会尝试从保存点恢复状态。因为指定的保存点路径会影响后面所有的 DML 语句,你可以使用 RESET 命令来重置这个配置选项,即禁用从保存点恢复:

3.6 自定义作业名称

SQL 客户端支持通过 SET 命令为查询和 DML 语句定义作业名称:

因为指定的作业名会影响后面所有的查询和 DML 语句,你也可以使用 RESET 命令来重置这个配置,即使用默认的作业名:

如果未指定选项 pipeline.name,SQL 客户端将为提交的作业生成默认名称,例如 insert-into_ 用于 INSERT INTO 语句。

4. 兼容性

为了与之前版本兼容,SQL 客户端仍然支持使用 YAML 文件进行初始化,并允许在 YAML 文件中设置 key。当在 YAML 文件中定义 key 时,SQL 客户端将打印警告消息以通知:

当使用 SET 命令打印属性时,SQL 客户端会打印所有的属性。为了区分不推荐使用的 key,SQL 客户端使用 [DEPRECATED] 作为标识符:

 

 

 

5. 报错记录

5.1 SQL client 提交任务的时候连接拒绝

这个问题报错比较笼统。真实的原因是sql-client无法连接到Flink集群的job manager。

如果使用standalone模式,需要执行./start-cluster.sh启动一个standalone集群。

如果使用Yarn session模式,则需要:

  • 启动sql client之前需要export HADOOP_CLASSPATH环境变量。
  • 提交yarn session和启动sql client需要使用同一个用户,否则会找不到yarn session对应的application id。
  • 确保当前机器的Yarn客户端配置无问题。可通过执行yarn命令是否能正常返回集群信息确认。

 

 

文章来自个人专栏
Sirius
4 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0