1. 入门
本节介绍如何在命令行里启动和运行你的第一个 Flink SQL 程序。SQL 客户端绑定在常规的 Flink 发行包中,因此可以直接运行。仅需要一个正在运行的 Flink 集群就可以在上面执行 Table 程序。如果仅想试用 SQL 客户端,也可以使用以下命令启动本地集群:
在Yarn集群上运行:
注意:SQL client如果使用yarn session模式启动,会查找/tmp/.yarn-properties-{用户名}文件中指定的application id,将SQL提交到这个yarn session上运行。因此在使用Yarn方式的时候需要额外注意,启动Yarn session和SQL client必须使用相同的用户。
1.1 启动SQL客户端CLI
SQL 客户端脚本也位于 Flink 的 bin 目录中。将来,用户有两种方式来启动 SQL 客户端命令行界面:通过嵌入式独立进程或者通过连接到远程 SQL 客户端网关。目前仅支持嵌入式模式,现在默认模式就是嵌入式。可以通过以下方式启动 CLI:
注意:低版本不能使用该命令
或者显式使用嵌入式模式:
1.2 执行SQL查询
CLI 启动后,你可以使用 HELP 命令列出所有可用的 SQL 语句。为了验证你的设置及集群连接是否正确,可以输入一条 SQL 查询语句并按 Enter 键执行:
该查询不需要数据表,并且只会产生一行结果。CLI 将从集群中检索结果并将其可视化。可以按 Q 键退出结果视图。CLI 为维护和可视化结果提供三种模式。下面具体看一下。
注意:Flink 1.24.0 版本使用 execution.result-mode 参数。
1.2.1 表格模式
表格模式(table mode)在内存中物化结果,并将结果用规则的分页表格的形式可视化展示出来。执行如下命令启用:
文档中 SET ‘sql-client.execution.result-mode’ = ‘xxx’ 方式配置不生效
可以使用如下查询语句查看不同模式的的运行结果:
1.2.2 变更日志模式
变更日志模式(changelog mode)不会物化结果。可视化展示由插入(+)和撤销(-)组成的持续查询结果流。
1.2.3 Tableau模式
Tableau模式(tableau mode)更接近传统的数据库,会将执行的结果以制表的形式直接打在屏幕之上。具体显示的内容取决于作业执行模式(execution.type):
注意:当你在流式查询上使用这种模式时,Flink 会将结果持续的打印在当前的控制台上。如果流式查询的输入是有限数据集,那么 Flink 在处理完所有的输入数据之后,作业会自动停止,同时控制台上的打印也会自动停止。如果你想提前结束这个查询,那么可以直接使用 CTRL-C 按键,这个会停止作业同时停止在控制台上的打印。
2. 配置
2.1 启动选项
可以使用如下可选 CLI 命令启动 SQL 客户端:
2.2 客户端配置
Key |
默认值 |
类型 |
说明 |
sql-client.execution.max-table-result.rows |
1000000 |
Integer |
在表格模式下缓存的行数。如果行数超过指定值,则以 FIFO 样式重试该行。 |
sql-client.execution.result-mode |
TABLE |
枚举值,可以是 TABLE, CHANGELOG, TABLEAU |
确定展示查询结果的模式。可以是 table、tableau、changelog。 |
sql-client.verbose |
false |
Boolean |
确定是否将输出详细信息输出到控制台。如果将选项设置为 true,会打印异常堆栈。否则,只输出原因。 |
2.3 使用SQL文件初始化会话
SQL 查询需要配置执行环境。SQL 客户端支持 -i 启动选项,以在启动 SQL 客户端时执行初始化 SQL 文件以设置环境,注意,只用于初始化环境,查询或者插入操作是禁止的。细则戳这里
所谓的初始化 SQL 文件,可以使用 DDL 来定义可用的 catalogs、table source、sink、用户自定义函数以及其他执行和部署所需的属性。
下面给出了此类文件的示例:
上述配置:
- 连接到 Hive Catalog 并使用 MyCatalog 作为当前 Catalog,使用 MyDatabase 作为目录的当前数据库
- 定义一个可以从 CSV 文件中读取数据的表 MyTable
- 定义一个视图 MyCustomView,它使用 SQL 查询声明一个虚拟表
- 定义了一个可以使用类名实例化的用户定义函数 myUDF
- 在流模式下使用 blink 计划器运行语句,并且设置并行度为 1
- 使用表格模式运行 SQL 进行探索性查询,
使用 -i 选项初始化 SQL 客户端会话时,初始化 SQL 文件中允许使用以下语句:
- DDL(CREATE/DROP/ALTER)
- USE CATALOG/DATABASE
- LOAD/UNLOAD MODULE
- SET 命令
- RESET 命令
执行查询或插入语句时,请进入交互模式或使用-f选项提交SQL语句。如果 SQL 客户端在初始化时遇到错误,SQL 客户端将退出并显示错误信息。
3. 使用SQL客户端提交作业
SQL 客户端可以允许用户在交互式命令行中或使用 -f 选项执行 sql 文件来提交作业。在这两种模式下,SQL 客户端都可以支持解析和执行 Flink 支持的所有类型的 SQL 语句。
3.1 交互式命令行
在交互式命令行中,SQL 客户端读取用户输入并在获取分号 (;) 时执行语句。如果语句成功执行,SQL 客户端会打印成功消息。当出现错误时,SQL 客户端也会打印错误信息。默认情况下,错误消息仅包含错误原因。为了打印完整的异常堆栈以进行调试,需要通过如下命令设置:
将 sql-client.verbose 设置为 true
3.2 执行SQL文件
SQL 客户端支持使用 -f 选项执行 SQL 脚本文件。SQL 客户端会一一执行 SQL 脚本文件中的语句,并为每条执行的语句打印执行信息。一旦一条语句失败,SQL 客户端就会退出,所有剩余的语句也不会执行。
下面给出了此类文件的示例:
这个配置:
- 定义从 CSV 文件读取的时态表 users,
- 设置属性,例如作业名称,
- 设置保存点路径,
- 提交从指定保存点路径加载保存点的 sql 作业。
与交互模式相比,SQL 客户端遇到错误会停止执行并退出。
3.3 执行一组SQL语句
SQL 客户端将每个 INSERT INTO 语句作为单个 Flink 作业执行。但是,这有时性能不是最佳,因为 Pipeline 的某些部分可以重复使用。SQL 客户端支持 STATEMENT SET 语法来执行一组 SQL 语句。这与 Table API 中 StatementSet 功能类似。STATEMENT SET 语法包含一个或多个 INSERT INTO 语句。 STATEMENT SET 块中的所有语句都要经过整体优化后作为一个 Flink 作业执行。
具体语法如下:
包含在 STATEMENT SET 中的语句必须用分号 (;) 分隔。
具体看一下示例:
3.4 同步/异步执行DML语句
默认情况下,SQL 客户端异步执行 DML 语句。这意味着,SQL 客户端将 DML 语句的作业提交给 Flink 集群即可,不用等待作业完成。所以 SQL 客户端可以同时提交多个作业。这对于通常长时间运行的流作业很有用。SQL 客户端确保语句成功提交到集群。提交语句后,CLI 将显示有关 Flink 作业的信息:
SQL 客户端再提交作业后不会跟踪作业的状态。CLI 进程可以在提交后关闭而不影响查询。Flink 的重启策略负责容错。可以使用 Flink 的 Web 界面、命令行或 REST API 取消查询。但是,对于批处理用户,更常见的是下一个 DML 语句需要等待前一个 DML 语句完成才能执行。为了同步执行 DML 语句,我们可以在 SQL 客户端中设置 table.dml-sync 选项为 true:
如果要终止作业,只需键入 CTRL-C 即可取消执行。
3.5 从保存点启动SQL作业
Flink 支持从指定的保存点启动作业。在 SQL 客户端中,允许使用 SET 命令指定保存点的路径:
当指定了保存点的路径时,Flink 会在执行 DML 语句时会尝试从保存点恢复状态。因为指定的保存点路径会影响后面所有的 DML 语句,你可以使用 RESET 命令来重置这个配置选项,即禁用从保存点恢复:
3.6 自定义作业名称
SQL 客户端支持通过 SET 命令为查询和 DML 语句定义作业名称:
因为指定的作业名会影响后面所有的查询和 DML 语句,你也可以使用 RESET 命令来重置这个配置,即使用默认的作业名:
如果未指定选项 pipeline.name,SQL 客户端将为提交的作业生成默认名称,例如 insert-into_ 用于 INSERT INTO 语句。
4. 兼容性
为了与之前版本兼容,SQL 客户端仍然支持使用 YAML 文件进行初始化,并允许在 YAML 文件中设置 key。当在 YAML 文件中定义 key 时,SQL 客户端将打印警告消息以通知:
当使用 SET 命令打印属性时,SQL 客户端会打印所有的属性。为了区分不推荐使用的 key,SQL 客户端使用 [DEPRECATED] 作为标识符:
5. 报错记录
5.1 SQL client 提交任务的时候连接拒绝
这个问题报错比较笼统。真实的原因是sql-client无法连接到Flink集群的job manager。
如果使用standalone模式,需要执行./start-cluster.sh启动一个standalone集群。
如果使用Yarn session模式,则需要:
- 启动sql client之前需要export HADOOP_CLASSPATH环境变量。
- 提交yarn session和启动sql client需要使用同一个用户,否则会找不到yarn session对应的application id。
- 确保当前机器的Yarn客户端配置无问题。可通过执行yarn命令是否能正常返回集群信息确认。