1. Environment Preparation:
Doris environment:
- Doris cluster, version 2.1.2
- FE nodes: 172.XX.XX.15 - 172.XX.XX.17
- BE nodes: 172.XX.XX.18 - 172.XX.XX.20
Iceberg environment:
- Version 1.4.3
2. Introduction to Iceberg:
Iceberg is an open-source, high-performance, and highly reliable data lake table format designed for analyzing and managing data at very large scale. It is supported by many mainstream query engines, including Doris, and is compatible with HDFS as well as various cloud object storage systems. It provides ACID transactions, schema evolution, advanced filtering, hidden partitioning, and partition layout evolution, ensuring high query performance along with data reliability and consistency. Its time travel and version rollback capabilities also bring considerable flexibility to data management.
Doris provides native support for several core Iceberg features:
- Supports multiple Iceberg catalog types, including Hive Metastore, Hadoop, REST, Glue, Google Dataproc Metastore, and DLF.
- Natively reads Iceberg V1/V2 table formats, including Position Delete and Equality Delete files.
- Supports querying an Iceberg table's snapshot history through a table function.
- Supports Time Travel.
- Natively supports the Iceberg table engine: Apache Doris can directly create and manage Iceberg tables and write data into them. Full support for partition transform functions provides hidden partitioning and partition layout evolution.
3. Reading and Writing Iceberg Table Data with Doris:
Doris can directly create and manage Iceberg tables, so data can be cleaned and processed in Doris and then written into Iceberg tables, building a unified lakehouse data processing platform.
- Create an Iceberg catalog in Doris backed by Hive Metastore:
mysql> CREATE CATALOG iceberg PROPERTIES (
-> "type"="iceberg",
-> "iceberg.catalog.type"="hms",
-> "hive.metastore.uris" = "thrift://xxxx:9083",
-> "hive.metastore.sasl.enabled" = "true",
-> "hive.metastore.warehouse.dir" = '/warehouse/tablespace/managed/hive',
-> "hadoop.security.authentication" = "kerberos",
-> 'hadoop.username' = 'hive',
-> "hadoop.kerberos.principal" = "hive/ xxxx @ xxxx.dev",
-> "hadoop.kerberos.keytab" = "/etc/security/keytabs/ xxxx.keytab",
-> "fs.defaultFS" = "hdfs:// xxxx ",
-> 'dfs.nameservices'=' xxxx ',
-> 'dfs.ha.namenodes. xxxx '='nn1,nn2',
-> 'dfs.namenode.rpc-address. xxxx.nn1'='hdfs1:54310',
->'dfs.namenode.rpc-address. xxxx.nn2'=' xxxx:54310', ‘dfs.client.failover.proxy.provider. xxxx '='org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider'
-> );
Query OK, 0 rows affected (0.01 sec)
- Switch to the newly created iceberg catalog:
- Create the test table in a Doris database (internal catalog):
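The switch statement itself is not shown in the original transcript; a minimal sketch, assuming the catalog is named iceberg as created above (the next step creates a Doris internal table, so the session is assumed to be switched back to the internal catalog afterwards):

-- Switch the session to the iceberg catalog and list its databases
SWITCH iceberg;
SHOW DATABASES;
-- Switch back to the internal catalog before creating the Doris test table below
SWITCH internal;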
mysql> CREATE TABLE IF NOT EXISTS customer (
-> c_customer_sk bigint not null,
-> c_customer_id char(16) not null,
-> c_current_cdemo_sk bigint,
-> c_current_hdemo_sk bigint,
-> c_current_addr_sk bigint,
-> c_first_shipto_date_sk bigint,
-> c_first_sales_date_sk bigint,
-> c_salutation char(10),
-> c_first_name char(20),
-> c_last_name char(30),
-> c_preferred_cust_flag char(1),
-> c_birth_day integer,
-> c_birth_month integer,
-> c_birth_year integer,
-> c_birth_country varchar(20),
-> c_login char(13),
-> c_email_address char(50),
-> c_last_review_date_sk bigint
-> )
-> DUPLICATE KEY(c_customer_sk)
-> DISTRIBUTED BY HASH(c_customer_id) BUCKETS 12
-> PROPERTIES (
-> "replication_num" = "1"
-> );
Query OK, 0 rows affected (0.04 sec)
- Read Iceberg table data in Doris and write it into the Doris table:
mysql> insert into iceberg_hive.customer
    -> select c_customer_sk, c_customer_id, c_current_cdemo_sk, c_current_hdemo_sk, c_current_addr_sk, c_first_shipto_date_sk, c_first_sales_date_sk, c_salutation, c_first_name, c_last_name, c_preferred_cust_flag, c_birth_day, c_birth_month, c_birth_year, c_birth_country, c_login, c_email_address, c_last_review_date from iceberg.testiceberg.customer;
Query OK, 30000000 rows affected (4.25 sec)
{'label':'label_a94021aa3195414b_a25091xxxxxxb94', 'status':'VISIBLE', 'txnId':'54165'}
Reading 30 million rows and writing them into the Doris table took 4.25 seconds in total.
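As an optional sanity check (not part of the original transcript), the loaded row count can be verified directly:

-- Hypothetical verification query; should return 30000000 after the load above
SELECT count(*) FROM iceberg_hive.customer;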
- Write the data back to Iceberg:
Create the corresponding database and table in Iceberg table format, then write the data back from Doris.
mysql> CREATE DATABASE `testiceberg2`;
mysql> use testiceberg2;
mysql> CREATE TABLE `customer_2`
-> (
-> `c_customer_id` TEXT NULL,
-> `c_current_cdemo_sk` INT NULL,
-> `c_current_hdemo_sk` INT NULL,
-> `c_current_addr_sk` INT NULL,
-> `c_first_shipto_date_sk` INT NULL,
-> `c_first_sales_date_sk` INT NULL,
-> `c_salutation` TEXT NULL,
-> `c_first_name` TEXT NULL,
-> `c_last_name` TEXT NULL,
-> `c_preferred_cust_flag` TEXT NULL,
-> `c_birth_day` INT NULL,
-> `c_birth_month` INT NULL,
-> `c_birth_year` INT NULL,
-> `c_birth_country` TEXT NULL,
-> `c_login` TEXT NULL,
-> `c_email_address` TEXT NULL,
-> `c_last_review_date` TEXT NULL
-> );
Query OK, 0 rows affected (3.21 sec)
After refreshing the catalog, the newly created table is visible:
mysql> refresh catalog iceberg;
Query OK, 0 rows affected (0.02 sec)
mysql> show tables;
+------------------------+
| Tables_in_testiceberg2 |
+------------------------+
| customer_2 |
| test_iceberg_tb0719 |
+------------------------+
2 rows in set (0.06 sec)
Write the data back from Doris into Iceberg:
mysql> insert into iceberg.testiceberg2.customer_2
-> select c_customer_id, c_current_cdemo_sk, c_current_hdemo_sk, c_current_addr_sk, c_first_shipto_date_sk, c_first_sales_date_sk, c_salutation, c_first_name, c_last_name, c_preferred_cust_flag, c_birth_day, c_birth_month, c_birth_year, c_birth_country, c_login, c_email_address, c_last_review_date_sk
-> from iceberg_hive.customer;
Query OK, 0 rows affected (10.87 sec)
{'status':'COMMITTED', 'txnId':'2'}
Query from the Iceberg table:
mysql> use testiceberg2;
Database changed
mysql> show tables;
+------------------------+
| Tables_in_testiceberg2 |
+------------------------+
| customer_2 |
| test_iceberg_tb0719 |
+------------------------+
2 rows in set (0.00 sec)
mysql> select count(1) from customer_2;
+----------+
| count(1) |
+----------+
| 30000000 |
+----------+
1 row in set (0.48 sec)
The data was written successfully: 30 million rows were written into the Iceberg table in a total of 10.87 seconds.
Inspect the underlying Iceberg data storage:
- Partitioned table creation:
The mapping between the column types used when creating Iceberg tables in Apache Doris and the corresponding Iceberg column types is as follows:
Note: currently only these data types plus TEXT are supported; other data types will raise an error.
Column types can currently only be the default Nullable; NOT NULL is not supported.
When inserting data, if a value is not compatible with the column type (for example, 'abc' inserted into a numeric column), it is converted to NULL before being inserted, as illustrated in the sketch below.
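A minimal sketch of the NULL-coercion behavior described in the last note, using a hypothetical single-column Iceberg table t (not part of the original test):

-- Hypothetical table created in the current Iceberg database
CREATE TABLE t (c1 INT NULL);
-- 'abc' cannot be parsed as INT, so NULL is stored instead of raising an error
INSERT INTO t VALUES ('abc');
-- Returns NULL
SELECT c1 FROM t;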
mysql> CREATE TABLE call_center
-> (
-> cc_call_center_sk bigint null,
-> cc_call_center_id string null,
-> cc_rec_start_date DATE,
-> cc_rec_end_date DATE,
-> cc_closed_date_sk int,
-> cc_open_date_sk int,
-> cc_name string,
-> cc_class string,
-> cc_employees int,
-> cc_sq_ft int,
-> cc_hours string,
-> cc_manager string,
-> cc_mkt_id int,
-> cc_mkt_class string,
-> cc_mkt_desc string,
-> cc_market_manager string,
-> cc_division int,
-> cc_division_name string,
-> cc_company int,
-> cc_company_name string,
-> cc_street_number string,
-> cc_street_name string,
-> cc_street_type string,
-> cc_suite_number string,
-> cc_city string,
-> cc_county string,
-> cc_state string,
-> cc_zip string,
-> cc_country text,
-> cc_gmt_offset decimal(5,2),
-> cc_tax_percentage decimal(5,2)
-> )
-> PARTITION BY LIST (MONTH(cc_rec_start_date)) ()
-> PROPERTIES (
-> "replication_num" = "1",
-> 'write-format'='parquet',
-> 'compression-codec'='zlib'
-> );
Query OK, 0 rows affected (0.14 sec)
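The data load itself is not shown in the original transcript; a sketch, assuming the source table testiceberg.call_center referenced later in this section has the same 31-column schema:

-- Hypothetical load; the MONTH() transform on cc_rec_start_date produces the hidden partitions
INSERT INTO call_center
SELECT * FROM testiceberg.call_center;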
After inserting data, the corresponding partition directories for the month field can be seen in the underlying storage directory:
- Data queries:
Simple query:
Partition pruning:
From the output of the EXPLAIN VERBOSE statement, the predicate cc_rec_start_date < '2000-01-01' ultimately hits only one partition (partition=1/0).
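The queries themselves are not included in the transcript; a sketch of the partition-pruning check, assuming the predicate quoted above (the count aggregate is an arbitrary choice):

-- Hypothetical query; the verbose plan should show only one partition being scanned
EXPLAIN VERBOSE
SELECT count(*) FROM call_center WHERE cc_rec_start_date < '2000-01-01';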
- Time Travel:
Insert two new rows into the table:
mysql> insert into call_center
-> select '49' as cc_call_center_sk, cc_call_center_id, cc_rec_start_date, cc_rec_end_date, cc_closed_date_sk, cc_open_date_sk, cc_name, cc_class, cc_employees,cc_sq_ft, cc_hours, cc_manager, cc_mkt_id, cc_mkt_class, cc_mkt_desc, cc_market_manager, cc_division, cc_division_name, cc_company, cc_company_name, cc_street_number, cc_street_name, cc_street_type, cc_suite_number, cc_city, cc_county, cc_state, cc_zip, cc_country, cc_gmt_offset, cc_tax_percentage
-> from testiceberg.call_center where cc_call_center_sk = '3';
Query OK, 0 rows affected (0.94 sec)
{'status':'COMMITTED', 'txnId':'2'}
mysql> insert into call_center
-> select '50' as cc_call_center_sk, cc_call_center_id, cc_rec_start_date, cc_rec_end_date, cc_closed_date_sk, cc_open_date_sk, cc_name, cc_class, cc_employees, cc_sq_ft, cc_hours, cc_manager, cc_mkt_id, cc_mkt_class, cc_mkt_desc, cc_market_manager, cc_division, cc_division_name, cc_company, cc_company_name, cc_street_number, cc_street_name, cc_street_type, cc_suite_number, cc_city, cc_county, cc_state, cc_zip, cc_country, cc_gmt_offset, cc_tax_percentage
-> from testiceberg.call_center where cc_call_center_sk = '5';
Query OK, 0 rows affected (0.69 sec)
{'status':'COMMITTED', 'txnId':'3'}
Use the iceberg_meta table function to query the table's snapshot information:
mysql> select * from iceberg_meta("table" = "iceberg.testiceberg2.call_center","query_type" = "snapshots");
| committed_at | snapshot_id | parent_id | operation | manifest_list | summary |
| 2024-08-08 16:16:20 | 8846297417078174017 | -1 | append | hdfs://ctyunns/warehouse/tablespace/managed/hive/testiceberg2.db/call_center/metadata/snap-8846297417078174017-1-1682a358-6957-4994-a8ba-6dd1fed81e28.avro | {"added-data-files":"8","added-records":"48","added-files-size":"111881","changed-partition-count":"4","total-records":"48","total-files-size":"111881","total-data-files":"8","total-delete-files":"0","total-position-deletes":"0","total-equality-deletes":"0"} |
| 2024-08-08 17:11:26 | 2061265937586961988 | 8846297417078174017 | append | hdfs://ctyunns/warehouse/tablespace/managed/hive/testiceberg2.db/call_center/metadata/snap-2061265937586961988-1-4cede317-c303-471e-b97b-410c359940f2.avro | {"added-data-files":"1","added-records":"1","added-files-size":"12087","changed-partition-count":"1","total-records":"49","total-files-size":"123968","total-data-files":"9","total-delete-files":"0","total-position-deletes":"0","total-equality-deletes":"0"} |
| 2024-08-08 17:11:53 | 6569229766383265056 | 2061265937586961988 | append | hdfs://ctyunns/warehouse/tablespace/managed/hive/testiceberg2.db/call_center/metadata/snap-6569229766383265056-1-a5834abd-3493-4843-9340-e03190d40b0c.avro | {"added-data-files":"1","added-records":"1","added-files-size":"12319","changed-partition-count":"1","total-records":"50","total-files-size":"136287","total-data-files":"10","total-delete-files":"0","total-position-deletes":"0","total-equality-deletes":"0"} |
3 rows in set (0.05 sec)
Use the FOR VERSION AS OF clause to query a specific snapshot:
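The time-travel queries are not shown in the transcript; a sketch using the snapshot IDs listed above (FOR TIME AS OF works analogously with a timestamp):

-- Hypothetical queries against the first and the latest snapshot
SELECT count(*) FROM call_center FOR VERSION AS OF 8846297417078174017;
SELECT count(*) FROM call_center FOR VERSION AS OF 6569229766383265056;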
As can be seen, the old snapshot ID (8846297417078174017) contains only the historical data, while the latest snapshot ID (6569229766383265056) contains all of the data.