前提条件
- 创建了目标云数据库ClickHouse实例。详细的操作步骤,请参考快速入门->创建实例。
- 创建了用于目标云数据库ClickHouse集群的数据库账号和密码。详细的操作步骤,请参考快速入门->创建账号。
- 确保创建的云数据库ClickHouse实例可以访问需要迁移的Kafka实例。
语法描述
建表语句如下:
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [ALIAS expr1],
name2 [type2] [ALIAS expr2],
...
) ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'host:port',
kafka_topic_list = 'topic1,topic2,...',
kafka_group_name = 'group_name',
kafka_format = 'data_format'[,]
[kafka_row_delimiter = 'delimiter_symbol',]
[kafka_schema = '',]
[kafka_num_consumers = N,]
[kafka_max_block_size = 0,]
[kafka_skip_broken_messages = N,]
[kafka_commit_every_batch = 0,]
[kafka_client_id = '',]
[kafka_poll_timeout_ms = 0,]
[kafka_poll_max_batch_size = 0,]
[kafka_flush_interval_ms = 0,]
[kafka_thread_per_consumer = 0,]
[kafka_handle_error_mode = 'default',]
[kafka_commit_on_select = false,]
[kafka_max_rows_per_message = 1];
上述是云数据库ClickHouse中创建Kafka引擎表的语法和选项。让我逐一解释每个部分的含义:
- CREATE TABLE : 创建表的语句。
- [IF NOT EXISTS] : 可选项,表示如果表不存在则创建。
- [db.]table_name : 表的名称,可以包含可选的数据库前缀。
- [ON CLUSTER cluster] : 可选项,指定表所在的集群。
- (name1 [type1] [ALIAS expr1], name2 [type2] [ALIAS expr2], ...) : 定义表的列和数据类型,可以为每个列指定别名。
- ENGINE = Kafka() : 指定表的存储引擎为Kafka引擎。
- SETTINGS : 设置选项的开始标记。
- kafka_broker_list : Kafka代理服务器的主机和端口,用于连接到Kafka集群。
- kafka_topic_list : 要消费的Kafka主题列表,可以包含多个主题。
- kafka_group_name : Kafka消费者组的名称,用于协调消息的消费。
- kafka_format : 数据的格式,例如JSON、CSV等。
- kafka_row_delimiter : 可选项,指定行分隔符,用于解析文本数据。
- kafka_schema : 可选项,指定Kafka消息中的模式信息。
- kafka_num_consumers : 可选项,指定消费者线程的数量。
- kafka_max_block_size : 可选项,指定每个消费者线程的最大块大小。
- kafka_skip_broken_messages : 可选项,指定是否跳过损坏的消息。
- kafka_commit_every_batch : 可选项,指定每个批次是否提交偏移量。
- kafka_client_id : 可选项,指定Kafka消费者的客户端ID。
- kafka_poll_timeout_ms : 可选项,指定从Kafka拉取消息时的超时时间。
- kafka_poll_max_batch_size : 可选项,指定从Kafka拉取消息时的最大批次大小。
- kafka_flush_interval_ms : 可选项,指定在写入表之前的消息刷新间隔。
- kafka_thread_per_consumer : 可选项,指定每个消费者是否使用单独的线程。
- kafka_handle_error_mode : 可选项,指定处理错误消息的模式。
- kafka_commit_on_select : 可选项,指定在执行SELECT查询时是否提交偏移量。
- kafka_max_rows_per_message : 可选项,指定每条Kafka消息包含的最大行数。
这些选项允许你根据实际的Kafka集成需求来配置Kafka引擎表。根据你的具体情况,填写相应的值以满足你的数据迁移或同步需求。
以上仅是对每个选项的概述,实际使用时应根据具体情况和需求进行适当的配置。
建表示例如下:
CREATE TABLE queue (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');
SELECT * FROM queue LIMIT 5;
CREATE TABLE queue2 (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka SETTINGS kafka_broker_list = 'localhost:9092',
kafka_topic_list = 'topic',
kafka_group_name = 'group1',
kafka_format = 'JSONEachRow',
kafka_num_consumers = 4;
CREATE TABLE queue3 (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka('localhost:9092', 'topic', 'group1')
SETTINGS kafka_format = 'JSONEachRow',
kafka_num_consumers = 4;
详细示例
通过云数据库ClickHouse的Kafka函数可以实现数据从Kafka到云数据库ClickHouse的迁移。下面是一个示例,展示了如何使用Kafka函数进行数据迁移:
- 首先,创建Kafka消费表:
CREATE TABLE queue (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');
- 然后,创建云数据库ClickHouse表以存储从Kafka迁移的数据:
CREATE TABLE daily (
day Date,
level String,
total UInt64
) ENGINE = SummingMergeTree(day, (day, level), 8192);
- 接下来,创建一个物化视图,将引擎中的数据转换并放入先前创建的表中:
CREATE MATERIALIZED VIEW consumer TO daily
AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as total
FROM queue GROUP BY day, level;
当物化视图连接到引擎时,它会在后台开始收集数据。这样,您就可以持续从 Kafka 接收消息并使用 SELECT 将其转换为所需的格式。一个 Kafka 表可以有任意多个物化视图,它们不直接从 Kafka 表中读取数据,而是接收新的记录(以块的形式),这样您就可以将数据写入具有不同详细级别的多个表中(带有分组 - 聚合和不带分组)。
- 最后,查询数据以确认迁移完成:
SELECT level, sum(total) FROM daily GROUP BY level;