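1. Introduction to Schema Registry
Confluent Schema Registry provides a serving layer for metadata. It exposes a RESTful interface for storing and retrieving Avro, JSON Schema, and Protobuf schemas. It keeps a versioned history of all schemas based on the specified subject name strategy, offers multiple compatibility settings, and allows schemas to evolve according to the configured compatibility setting, with extended support for these schema types. It also provides serializers that plug into Apache Kafka clients and handle schema storage and retrieval for Kafka messages sent in any of the supported formats.
Schema Registry runs outside of, and separately from, the Kafka brokers. Kafka producers and consumers still talk to the Kafka brokers to publish and read topic data (messages). In parallel, they can also talk to Schema Registry to send and retrieve the schemas that describe the data model of those messages.
Figure: how Schema Registry stores and retrieves schemas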
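To make the interaction between the serializers and the registry more concrete, the sketch below reads the small header that Confluent serializers are documented to prepend to every message: one magic byte (0), a 4-byte big-endian schema id, then the encoded payload. The class and method names are hypothetical and exist only for illustration; this is not the Confluent implementation.

```java
import java.nio.ByteBuffer;

/** Illustrative sketch (hypothetical class): the 5-byte header in front of each payload. */
final class WireFormatHeader {
    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_SIZE = 5; // 1 magic byte + 4-byte schema id

    /** Returns the schema id stored in bytes 1..4 (big-endian). */
    static int schemaId(byte[] message) {
        if (message == null || message.length < HEADER_SIZE) {
            throw new IllegalArgumentException("message shorter than the 5-byte header");
        }
        ByteBuffer buf = ByteBuffer.wrap(message); // ByteBuffer is big-endian by default
        if (buf.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("unknown magic byte");
        }
        return buf.getInt();
    }

    /** Builds a framed message: magic byte, schema id, then the already-encoded payload. */
    static byte[] frame(int schemaId, byte[] payload) {
        return ByteBuffer.allocate(HEADER_SIZE + payload.length)
                .put(MAGIC_BYTE)
                .putInt(schemaId)
                .put(payload)
                .array();
    }

    public static void main(String[] args) {
        byte[] framed = frame(1, new byte[] {0x02, 0x41});
        System.out.println("schema id = " + schemaId(framed));
    }
}
```

Because only the id travels with each message, a consumer needs the registry (or a local cache) to turn that id back into a full schema.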
2. Deploying and starting Schema Registry
Step 1: Download Confluent
Community edition: https://packages.confluent.io/archive/7.2/confluent-community-7.2.0.tar.gz
Other versions: https://www.confluent.io/previous-versions/
Step 2: Install the Confluent package
tar -zxvf confluent-community-7.2.0.tar.gz -C /usr/local
Step 3: Edit the Schema Registry configuration file
vim /usr/local/confluent-7.2.0/etc/schema-registry/schema-registry.properties
--------------------------------------------------------------------------------------
# Address the Schema Registry service listens on
listeners=http://localhost:8081
# Use this setting to specify the bootstrap servers for your Kafka cluster and it
# will be used both for selecting the leader schema registry instance and for storing the data for
# registered schemas.
#kafkastore.bootstrap.servers=PLAINTEXT://localhost:9092
kafkastore.bootstrap.servers=SASL_PLAINTEXT://localhost1:9092,SASL_PLAINTEXT://localhost2:9092,SASL_PLAINTEXT://localhost3:9092
# The name of the topic to store schemas in
kafkastore.topic=_schemas
# If true, API requests that fail will include extra debugging information, including stack traces
debug=false
kafkastore.security.protocol=SASL_PLAINTEXT
kafkastore.sasl.mechanism=GSSAPI
kafkastore.sasl.kerberos.service.name=kafka
kafkastore.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
useKeyTab=true \
storeKey=true \
keyTab="/path/to/kafka.keytab" \
principal="XX/XXX@XXX";
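The multi-line kafkastore.sasl.jaas.config value above relies on the trailing backslash continuation of the Java properties format. The sketch below, which assumes the file is read with java.util.Properties semantics, shows how such lines collapse into a single value. The class name and the shortened sample text are hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

/** Illustrative sketch (hypothetical class): how backslash continuations are parsed. */
final class RegistryConfigCheck {
    /** Parses properties-file text with the same rules java.util.Properties applies to a file. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        // Shortened sample; the real file also carries keyTab and principal.
        String text = String.join("\n",
                "listeners=http://localhost:8081",
                "kafkastore.topic=_schemas",
                "kafkastore.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \\",
                "    useKeyTab=true \\",
                "    storeKey=true;");
        Properties props = parse(text);
        System.out.println(props.getProperty("listeners"));
        // The continuation lines are joined; leading whitespace on each one is dropped.
        System.out.println(props.getProperty("kafkastore.sasl.jaas.config"));
    }
}
```

Note that the space before each backslash is what separates the joined options, so removing it would glue two options together.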
Step 4: Start Schema Registry
cd /usr/local/confluent-7.2.0
bin/schema-registry-start -daemon etc/schema-registry/schema-registry.properties
3. Using the Schema Registry REST API
The main Schema Registry REST API calls are shown below:
# Register a new version of a schema under the subject "Kafka-key"
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\": \"string\"}"}' \
http://localhost:8081/subjects/Kafka-key/versions
{"id":1}
# Register a new version of a schema under the subject "Kafka-value"
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\": \"string\"}"}' \
http://localhost:8081/subjects/Kafka-value/versions
{"id":1}
# List all subjects
$ curl -X GET http://localhost:8081/subjects
["Kafka-value","Kafka-key"]
# List all schema versions registered under the subject "Kafka-value"
$ curl -X GET http://localhost:8081/subjects/Kafka-value/versions
[1]
# Fetch a schema by globally unique id 1
$ curl -X GET http://localhost:8081/schemas/ids/1
{"schema":"\"string\""}
# Fetch version 1 of the schema registered under subject "Kafka-value"
$ curl -X GET http://localhost:8081/subjects/Kafka-value/versions/1
{"subject":"Kafka-value","version":1,"id":1,"schema":"\"string\""}
# Fetch the most recently registered schema under subject "Kafka-value"
$ curl -X GET http://localhost:8081/subjects/Kafka-value/versions/latest
{"subject":"Kafka-value","version":1,"id":1,"schema":"\"string\""}
# Delete version 3 of the schema registered under subject "Kafka-value"
$ curl -X DELETE http://localhost:8081/subjects/Kafka-value/versions/3
3
# Delete all versions of the schema registered under subject "Kafka-value"
$ curl -X DELETE http://localhost:8081/subjects/Kafka-value
[1, 2, 3, 4, 5]
# Check whether a schema has been registered under subject "Kafka-key"
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\": \"string\"}"}' \
http://localhost:8081/subjects/Kafka-key
{"subject":"Kafka-key","version":1,"id":1,"schema":"\"string\""}
# Test compatibility of a schema with the latest schema under subject "Kafka-value"
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\": \"string\"}"}' \
http://localhost:8081/compatibility/subjects/Kafka-value/versions/latest
{"is_compatible":true}
# Get top level config
$ curl -X GET http://localhost:8081/config
{"compatibilityLevel":"BACKWARD"}
# Update compatibility requirements globally
$ curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"compatibility": "NONE"}' \
http://localhost:8081/config
{"compatibility":"NONE"}
# Update compatibility requirements under the subject "Kafka-value"
$ curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"compatibility": "BACKWARD"}' \
http://localhost:8081/config/Kafka-value
{"compatibility":"BACKWARD"}
Reference: https://blog.csdn.net/wuxintdrh/article/details/119594987
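The same registration call can be issued from Java. The sketch below uses the JDK's java.net.http client (Java 11 or later) and mirrors the first curl example: the schema definition has to be embedded as an escaped JSON string inside the "schema" field. The class and method names are hypothetical, and the simple escaping assumes a single-line schema without control characters.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Illustrative sketch (hypothetical class): POST /subjects/{subject}/versions. */
final class RegisterSchemaRequest {
    static final String CONTENT_TYPE = "application/vnd.schemaregistry.v1+json";

    /** Wraps a schema definition as the escaped string value of the "schema" field. */
    static String body(String schemaJson) {
        // Assumption: only backslashes and double quotes need escaping (single-line schema).
        String escaped = schemaJson.replace("\\", "\\\\").replace("\"", "\\\"");
        return "{\"schema\": \"" + escaped + "\"}";
    }

    /** Builds the registration request without sending it. */
    static HttpRequest build(String baseUrl, String subject, String schemaJson) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/subjects/" + subject + "/versions"))
                .header("Content-Type", CONTENT_TYPE)
                .POST(HttpRequest.BodyPublishers.ofString(body(schemaJson)))
                .build();
    }

    public static void main(String[] args) throws Exception {
        // Requires a Schema Registry instance listening on localhost:8081.
        HttpRequest request = build("http://localhost:8081", "Kafka-value", "{\"type\": \"string\"}");
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

Keeping the request construction separate from the send makes it easy to inspect the URL and body before pointing the code at a real registry.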
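4. Verifying serialized transport with Schema Registry
4.1 Import dependencies in IDEA and set up the development environment
Create a new Maven project named kafka_schema_registry; its source directory layout is as follows:
Install the jars the project depends on from the local Schema Registry installation package. The required jars and the corresponding commands are: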
(1)common-config
mvn install:install-file -Dfile=C:\Users\lenovo\Downloads\confluent-7.2.0\share\java\confluent-common\common-config-7.2.0.jar -DgroupId=io.confluent -DartifactId=common-config -Dversion=7.2.0 -Dpackaging=jar
(2)common-utils
mvn install:install-file -Dfile=C:\Users\lenovo\Downloads\confluent-7.2.0\share\java\confluent-common\common-utils-7.2.0.jar -DgroupId=io.confluent -DartifactId=common-utils -Dversion=7.2.0 -Dpackaging=jar
(3)kafka-schema-registry-client
mvn install:install-file -Dfile=C:\Users\lenovo\Downloads\confluent-7.2.0\share\java\schema-registry\kafka-schema-registry-client-7.2.0.jar -DgroupId=io.confluent -DartifactId=kafka-schema-registry-client -Dversion=7.2.0 -Dpackaging=jar
(4)kafka-avro-serializer
mvn install:install-file -Dfile=C:\Users\lenovo\Downloads\confluent-7.2.0\share\java\kafka-serde-tools\kafka-avro-serializer-7.2.0.jar -DgroupId=io.confluent -DartifactId=kafka-avro-serializer -Dversion=7.2.0 -Dpackaging=jar
(5)kafka-schema-serializer
mvn install:install-file -Dfile=C:\Users\lenovo\Downloads\confluent-7.2.0\share\java\kafka-serde-tools\kafka-schema-serializer-7.2.0.jar -DgroupId=io.confluent -DartifactId=kafka-schema-serializer -Dversion=7.2.0 -Dpackaging=jar
After creating the Maven project, run the commands above to install these jars into the local Maven repository.
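The five install commands differ only in the jar directory and the artifact id, so they can be generated rather than typed by hand. The sketch below is one hedged way to do that; the class name is hypothetical, the groupId io.confluent and version 7.2.0 are taken from the commands above, and forward slashes are used in paths on the assumption that Maven accepts them on the target platform.

```java
/** Illustrative sketch (hypothetical class): builds mvn install:install-file commands. */
final class InstallFileCommands {
    /** Builds the command for one jar named {artifactId}-{version}.jar inside jarDir. */
    static String command(String jarDir, String artifactId, String version) {
        return "mvn install:install-file"
                + " -Dfile=" + jarDir + "/" + artifactId + "-" + version + ".jar"
                + " -DgroupId=io.confluent"
                + " -DartifactId=" + artifactId
                + " -Dversion=" + version
                + " -Dpackaging=jar";
    }

    public static void main(String[] args) {
        // Assumption: base points at the share/java directory of the unpacked archive.
        String base = "C:/Users/lenovo/Downloads/confluent-7.2.0/share/java";
        String[][] jars = {
                {"confluent-common", "common-config"},
                {"confluent-common", "common-utils"},
                {"schema-registry", "kafka-schema-registry-client"},
                {"kafka-serde-tools", "kafka-avro-serializer"},
                {"kafka-serde-tools", "kafka-schema-serializer"},
        };
        for (String[] jar : jars) {
            System.out.println(command(base + "/" + jar[0], jar[1], "7.2.0"));
        }
    }
}
```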
Edit the project's pom.xml and add the following dependencies:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>kafka_schema_registry</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.8.1</version>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>common-utils</artifactId>
            <version>7.2.0</version>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>7.2.0</version>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-schema-serializer</artifactId>
            <version>7.2.0</version>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-schema-registry-client</artifactId>
            <version>7.2.0</version>
        </dependency>
        <!-- common-config was installed locally in step (1); the original listed common-utils twice -->
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>common-config</artifactId>
            <version>7.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.11.0</version>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>30.1.1-jre</version>
        </dependency>
        <!-- jackson start -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.13.2</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.13.2</version>
        </dependency>
        <!-- jackson end -->
    </dependencies>
    <!-- Package the project together with its dependency jars -->
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
4.2 Implementing the producer and consumer in Java
Producer: ProducerWithSchema.java
package com.chenxi.schemaTest;

import java.util.Properties;
import java.util.Random;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * @author: knight chen
 */
public class ProducerWithSchema {
    // Avro schema used to serialize the User record
    public static final String USER_SCHEMA = "{\"type\": \"record\", \"name\": \"User\", " +
            "\"fields\": [{\"name\": \"id\", \"type\": \"int\"}, " +
            "{\"name\": \"name\", \"type\": \"string\"}, {\"name\": \"age\", \"type\": \"int\"}]}";

    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost1:9092,localhost2:9092,localhost3:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Use the KafkaAvroSerializer provided by Confluent
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        // Address of the Schema Registry service, used to register and fetch schemas
        props.put("schema.registry.url", "http://localhost:8081");
        // Kerberos (SASL/GSSAPI) settings for connecting to the Kafka brokers
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "GSSAPI");
        props.put("sasl.kerberos.service.name", "kafka");
        props.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/path/to/kafka.keytab\" principal=\"XX/XXX@XXX\";");

        Producer<String, GenericRecord> producer = new KafkaProducer<>(props);
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(USER_SCHEMA);
        Random rand = new Random();
        int id = 0;
        // Send 100 User records, one every 100 ms
        while (id < 100) {
            id++;
            String name = "name" + id;
            int age = rand.nextInt(40) + 1;
            GenericRecord user = new GenericData.Record(schema);
            user.put("id", id);
            user.put("name", name);
            user.put("age", age);
            ProducerRecord<String, GenericRecord> record = new ProducerRecord<>("schema_test", user);
            System.out.println(user);
            producer.send(record);
            Thread.sleep(100);
        }
        // close() flushes any records that are still buffered
        producer.close();
    }
}
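The producer never names a subject explicitly. With the default subject name strategy (TopicNameStrategy), the value schema of a topic is registered under "<topic>-value" and the key schema under "<topic>-key", so the schema sent above should appear under schema_test-value. The sketch below only derives those names and the matching REST URL; the class and method names are hypothetical.

```java
/** Illustrative sketch (hypothetical class): subject names under the default TopicNameStrategy. */
final class SubjectNames {
    /** Returns "<topic>-key" for key schemas and "<topic>-value" for value schemas. */
    static String subjectFor(String topic, boolean isKey) {
        return topic + (isKey ? "-key" : "-value");
    }

    /** URL of the latest schema version registered for the topic's key or value. */
    static String latestVersionUrl(String baseUrl, String topic, boolean isKey) {
        return baseUrl + "/subjects/" + subjectFor(topic, isKey) + "/versions/latest";
    }

    public static void main(String[] args) {
        System.out.println(subjectFor("schema_test", false));
        System.out.println(latestVersionUrl("http://localhost:8081", "schema_test", false));
    }
}
```

Fetching the printed URL with curl after the producer has run is a quick way to confirm that the User schema was registered.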
Consumer: ConsumerWithSchema.java
package com.chenxi.schemaTest;

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * @author: knight chen
 */
public class ConsumerWithSchema {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost1:9092,localhost2:9092,localhost3:9092");
        props.put("group.id", "test_schema_registry_group");
        // Auto commit is disabled and the offset resets to earliest,
        // so every test run consumes the topic from the beginning
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Use the KafkaAvroDeserializer provided by Confluent
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        // Address of the Schema Registry service, used to fetch schemas
        props.put("schema.registry.url", "http://localhost:8081");
        // Kerberos (SASL/GSSAPI) settings for connecting to the Kafka brokers
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "GSSAPI");
        props.put("sasl.kerberos.service.name", "kafka");
        props.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/path/to/kafka.keytab\" principal=\"XX/XXX@XXX\";");
        System.out.println("step 1");

        KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("schema_test"));
        try {
            while (true) {
                ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(1000));
                // poll() never returns null; an empty batch means no data arrived in time
                if (records.isEmpty()) {
                    System.out.println("no data from topic schema_test yet");
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                for (ConsumerRecord<String, GenericRecord> record : records) {
                    GenericRecord user = record.value();
                    System.out.println("value = [user.id = " + user.get("id") + ", " + "user.name = "
                            + user.get("name") + ", " + "user.age = " + user.get("age") + "], "
                            + "partition = " + record.partition() + ", " + "offset = " + record.offset());
                }
            }
        } finally {
            consumer.close();
        }
    }
}
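On the consumer side, the deserializer does not need to call the registry for every message: the Confluent client is documented to cache schemas, so each schema id is typically fetched once. The sketch below illustrates that idea only; the class name is hypothetical, and the fetcher stands in for an HTTP GET on /schemas/ids/{id}.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntFunction;

/** Illustrative sketch (hypothetical class): look up schemas by id, fetching each id once. */
final class SchemaCache {
    private final Map<Integer, String> byId = new ConcurrentHashMap<>();
    private final IntFunction<String> fetcher; // stand-in for GET /schemas/ids/{id}
    private final AtomicInteger fetchCount = new AtomicInteger();

    SchemaCache(IntFunction<String> fetcher) {
        this.fetcher = fetcher;
    }

    /** Returns the cached schema for the id, calling the fetcher only on the first request. */
    String get(int id) {
        return byId.computeIfAbsent(id, key -> {
            fetchCount.incrementAndGet();
            return fetcher.apply(key);
        });
    }

    /** Number of times the fetcher was actually invoked. */
    int fetchCount() {
        return fetchCount.get();
    }

    public static void main(String[] args) {
        // Fake fetcher for demonstration; a real one would query the registry over HTTP.
        SchemaCache cache = new SchemaCache(id -> "{\"type\": \"string\"}");
        cache.get(1);
        cache.get(1);
        System.out.println("fetches = " + cache.fetchCount());
    }
}
```

This is why a short registry outage usually does not interrupt consumers that have already seen every schema id in the topic.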
4.3 Verification
(1) Producer
Start command:
java -cp kafka_schema_registry-1.0-SNAPSHOT-jar-with-dependencies.jar com.chenxi.schemaTest.ProducerWithSchema
Run screenshot:
(2) Consumer
Start command:
java -cp kafka_schema_registry-1.0-SNAPSHOT-jar-with-dependencies.jar com.chenxi.schemaTest.ConsumerWithSchema
Run screenshot: