First, add the latest Iceberg runtime jar matching your Spark version to Spark's jars directory, e.g. iceberg-spark-runtime-3.3_2.12-1.5.0.jar.
Download page: iceberg.apache.org/releases/
Older Iceberg releases can be downloaded from the Maven repository: mvnrepository.com/artifact/org.apache.iceberg/iceberg-spark-runtime-3.3_2.12
Configure the catalog
Iceberg provides catalogs so that SQL commands can manage and load tables. A catalog is defined by setting spark.sql.catalog.(catalog_name).
Spark supports six main Iceberg catalog types: spark.sql.catalog.(catalog_name).type can be hive, hadoop, rest, glue, jdbc, or nessie. Here we use the Hive metastore as the example.
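For comparison, a hadoop-type catalog needs no metastore, only a warehouse path. A minimal sketch, assuming an HDFS warehouse (the host and path below are assumptions):

```properties
# Hadoop catalog backed by a filesystem/HDFS path (example path is an assumption)
spark.sql.catalog.hadoop_prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hadoop_prod.type = hadoop
spark.sql.catalog.hadoop_prod.warehouse = hdfs://namenode:8020/warehouse/path
```

The other types (rest, glue, jdbc, nessie) follow the same pattern, each with its own connection properties.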
# access the Hive metastore
spark.sql.catalog.hive_prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hive_prod.type = hive
spark.sql.catalog.hive_prod.uri = thrift://metastore-host:port
# omit uri to use the same URI as Spark: hive.metastore.uris in hive-site.xml
An Iceberg catalog backed by the Hive metastore loads only Iceberg tables. To load non-Iceberg tables from the same Hive metastore, configure a session catalog.
In short, with the two settings below, Spark's built-in catalog (spark_catalog by default) can also load the tables under the Iceberg catalog. The catalog created by Iceberg (hive_prod above), however, still loads only Iceberg tables.
spark.sql.catalog.spark_catalog = org.apache.iceberg.spark.SparkSessionCatalog
spark.sql.catalog.spark_catalog.type = hive
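With the session catalog configured, the expected behavior can be sketched as follows (the table names are hypothetical):

```sql
-- spark_catalog now resolves both kinds of tables in the same metastore
use spark_catalog;
select * from default.plain_hive_table;  -- a non-Iceberg Hive table (hypothetical)
select * from default.iceberg_table;     -- an Iceberg table (hypothetical)
```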
Start the Spark SQL client
spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.5.0 --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
Since the runtime jar was already copied into Spark's jars directory, --packages is optional here; it is shown for completeness.
#show the current namespace; the default catalog is spark_catalog
show current namespace;
#switch to the configured Iceberg catalog
use hive_prod;
show current namespace;
#create a database under the Iceberg catalog
create database iceberg_test;
use iceberg_test;
#you can also switch catalog and database in one step with catalog.db, e.g. use hive_prod.iceberg_test;
#reset spark.sql.storeAssignmentPolicy to its default value ANSI; the legacy value is incompatible with v2 data sources, and Iceberg uses the v2 source
set spark.sql.storeAssignmentPolicy=ANSI;
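The difference between the two policies can be sketched as follows, assuming a table t with a bigint column id (hypothetical):

```sql
-- ANSI (default): the unreasonable string-to-bigint assignment below
-- is rejected at analysis time
-- LEGACY: the string would be silently cast, producing a NULL
INSERT INTO t VALUES ('not-a-number');
```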
#create an Iceberg table, inspect its structure, and insert data
CREATE TABLE iceberg_test_1(id bigint, data string) USING iceberg;
show create table iceberg_test_1;
desc formatted iceberg_test_1;
INSERT INTO iceberg_test_1 VALUES (1, 'a'), (2, 'b'), (3, 'c');
select * from iceberg_test_1;
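Each successful INSERT commits a new snapshot, and Iceberg exposes table metadata through metadata tables that can be queried with ordinary SQL:

```sql
-- snapshots produced by commits to the table
select committed_at, snapshot_id, operation from hive_prod.iceberg_test.iceberg_test_1.snapshots;
-- data files currently referenced by the table
select file_path, record_count from hive_prod.iceberg_test.iceberg_test_1.files;
```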
Reference:
iceberg.apache.org/docs/latest/