Kafka Schema Registry: Principles and Usage

2023-07-27 01:06:56

1. Schema Registry Overview

Confluent Schema Registry provides a serving layer for metadata. It exposes a RESTful interface for storing and retrieving Avro, JSON Schema, and Protobuf schemas. It keeps a versioned history of all schemas according to the configured subject name strategy, offers multiple compatibility settings, and lets schemas evolve within the bounds of those settings for each supported schema type. It also ships serializers that plug into Apache Kafka clients and transparently handle schema storage and retrieval for Kafka messages sent in any of the supported formats.
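
For example, under the default BACKWARD compatibility level a new schema version must be able to read data written with the previous version; adding a field with a default value is a typical compatible change. An illustrative evolution of a simple User schema (field names here are examples, not taken from this article):

v1: {"type": "record", "name": "User", "fields": [{"name": "id", "type": "int"}]}
v2: {"type": "record", "name": "User", "fields": [{"name": "id", "type": "int"},
     {"name": "email", "type": "string", "default": ""}]}

Version 2 is backward compatible because a reader using it can still decode v1 data, filling in the default for the missing field.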

Schema Registry runs outside of, and separately from, the Kafka brokers. Producers and consumers still talk to the brokers to publish and read topic data (messages); in parallel, they talk to Schema Registry to register and retrieve the schemas that describe the data models of those messages.
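
On the wire, Confluent's serializers prepend a small header to every message: a magic byte 0 followed by the 4-byte schema ID, with the serialized payload after them. A minimal sketch of reading that ID back out, assuming the raw value was fetched as a byte[] (e.g. via ByteArrayDeserializer):

import java.nio.ByteBuffer;

public class WireFormat {
    // Confluent wire format: [magic byte 0][4-byte schema id][serialized payload]
    public static int schemaId(byte[] value) {
        ByteBuffer buf = ByteBuffer.wrap(value);
        if (buf.get() != 0) {
            throw new IllegalArgumentException("unknown magic byte");
        }
        return buf.getInt(); // big-endian schema id, resolvable via GET /schemas/ids/<id>
    }
}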

[Figure: how Schema Registry stores and retrieves schemas]

2. Deploying and Starting Schema Registry

Step 1: Download Confluent

Community edition: https://packages.confluent.io/archive/7.2/confluent-community-7.2.0.tar.gz

Other versions: https://www.confluent.io/previous-versions/

Step 2: Unpack the Confluent package

tar -zxvf confluent-community-7.2.0.tar.gz -C /usr/local

Step 3: Edit the Schema Registry configuration file

vim /usr/local/confluent-7.2.0/etc/schema-registry/schema-registry.properties
--------------------------------------------------------------------------------------
# Schema Registry listener address
listeners=http://localhost:8081

# Use this setting to specify the bootstrap servers for your Kafka cluster and it
# will be used both for selecting the leader schema registry instance and for storing the data for
# registered schemas.
#kafkastore.bootstrap.servers=PLAINTEXT://localhost:9092
kafkastore.bootstrap.servers=SASL_PLAINTEXT://localhost1:9092,SASL_PLAINTEXT://localhost2:9092,SASL_PLAINTEXT://localhost3:9092

# The name of the topic to store schemas in
kafkastore.topic=_schemas

# If true, API requests that fail will include extra debugging information, including stack traces
debug=false

kafkastore.security.protocol=SASL_PLAINTEXT
kafkastore.sasl.mechanism=GSSAPI
kafkastore.sasl.kerberos.service.name=kafka

kafkastore.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
   useKeyTab=true \
   storeKey=true \
   keyTab="/path/to/kafka.keytab" \
   principal="XX/XXX@XXX";

Step 4: Start Schema Registry

cd /usr/local/confluent-7.2.0
bin/schema-registry-start -daemon etc/schema-registry/schema-registry.properties
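
If the service came up correctly, the REST endpoint configured in listeners above should respond; on a fresh deployment the subject list is empty:

curl http://localhost:8081/subjects
  []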

3. Using the Schema Registry REST API

The most commonly used Schema Registry REST APIs are shown below:

# Register a new version of a schema under the subject "Kafka-key"
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"schema": "{\"type\": \"string\"}"}' \
    http://localhost:8081/subjects/Kafka-key/versions
  {"id":1}

# Register a new version of a schema under the subject "Kafka-value"
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"schema": "{\"type\": \"string\"}"}' \
     http://localhost:8081/subjects/Kafka-value/versions
  {"id":1}

# List all subjects
$ curl -X GET http://localhost:8081/subjects
  ["Kafka-value","Kafka-key"]

# List all schema versions registered under the subject "Kafka-value"
$ curl -X GET http://localhost:8081/subjects/Kafka-value/versions
  [1]

# Fetch a schema by globally unique id 1
$ curl -X GET http://localhost:8081/schemas/ids/1
  {"schema":"\"string\""}

# Fetch version 1 of the schema registered under subject "Kafka-value"
$ curl -X GET http://localhost:8081/subjects/Kafka-value/versions/1
  {"subject":"Kafka-value","version":1,"id":1,"schema":"\"string\""}

# Fetch the most recently registered schema under subject "Kafka-value"
$ curl -X GET http://localhost:8081/subjects/Kafka-value/versions/latest
  {"subject":"Kafka-value","version":1,"id":1,"schema":"\"string\""}

# Delete version 3 of the schema registered under subject "Kafka-value"
$ curl -X DELETE http://localhost:8081/subjects/Kafka-value/versions/3
  3

# Delete all versions of the schema registered under subject "Kafka-value"
$ curl -X DELETE http://localhost:8081/subjects/Kafka-value
  [1, 2, 3, 4, 5]

# Check whether a schema has been registered under subject "Kafka-key"
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"schema": "{\"type\": \"string\"}"}' \
    http://localhost:8081/subjects/Kafka-key
  {"subject":"Kafka-key","version":1,"id":1,"schema":"\"string\""}

# Test compatibility of a schema with the latest schema under subject "Kafka-value"
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"schema": "{\"type\": \"string\"}"}' \
    http://localhost:8081/compatibility/subjects/Kafka-value/versions/latest
  {"is_compatible":true}

# Get top level config
$ curl -X GET http://localhost:8081/config
  {"compatibilityLevel":"BACKWARD"}

# Update compatibility requirements globally
$ curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"compatibility": "NONE"}' \
    http://localhost:8081/config
  {"compatibility":"NONE"}

# Update compatibility requirements under the subject "Kafka-value"
$ curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"compatibility": "BACKWARD"}' \
    http://localhost:8081/config/Kafka-value
  {"compatibility":"BACKWARD"}

Reference: https://blog.csdn.net/wuxintdrh/article/details/119594987

 

4. Verifying Serialized Transport Based on Schema Registry

4.1 Importing dependencies and setting up the development environment in IDEA

Create a new Maven project named kafka_schema_registry.

The project depends on jars shipped inside the local Confluent installation; install them into the local Maven repository with the commands below:

(1) common-config

mvn install:install-file -Dfile=C:\Users\lenovo\Downloads\confluent-7.2.0\share\java\confluent-common\common-config-7.2.0.jar -DgroupId=io.confluent -DartifactId=common-config -Dversion=7.2.0 -Dpackaging=jar

(2) common-utils

mvn install:install-file -Dfile=C:\Users\lenovo\Downloads\confluent-7.2.0\share\java\confluent-common\common-utils-7.2.0.jar -DgroupId=io.confluent -DartifactId=common-utils -Dversion=7.2.0 -Dpackaging=jar

(3) kafka-schema-registry-client

mvn install:install-file -Dfile=C:\Users\lenovo\Downloads\confluent-7.2.0\share\java\schema-registry\kafka-schema-registry-client-7.2.0.jar -DgroupId=io.confluent -DartifactId=kafka-schema-registry-client -Dversion=7.2.0 -Dpackaging=jar

(4) kafka-avro-serializer

mvn install:install-file -Dfile=C:\Users\lenovo\Downloads\confluent-7.2.0\share\java\kafka-serde-tools\kafka-avro-serializer-7.2.0.jar -DgroupId=io.confluent -DartifactId=kafka-avro-serializer -Dversion=7.2.0 -Dpackaging=jar

(5) kafka-schema-serializer

mvn install:install-file -Dfile=C:\Users\lenovo\Downloads\confluent-7.2.0\share\java\kafka-serde-tools\kafka-schema-serializer-7.2.0.jar -DgroupId=io.confluent -DartifactId=kafka-schema-serializer -Dversion=7.2.0 -Dpackaging=jar

With the jars above installed into the local Maven repository, the new project can reference them as ordinary dependencies.
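
Installing jars by hand works on an offline machine; if the build host has internet access, an alternative is to pull the same artifacts from Confluent's public Maven repository by adding it to pom.xml:

<repositories>
	<repository>
		<id>confluent</id>
		<url>https://packages.confluent.io/maven/</url>
	</repository>
</repositories>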

Edit the project's pom.xml and add the following dependencies:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>org.example</groupId>
	<artifactId>kafka_schema_registry</artifactId>
	<version>1.0-SNAPSHOT</version>
	<dependencies>
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
			<version>2.8.1</version>
		</dependency>

		<dependency>
			<groupId>io.confluent</groupId>
			<artifactId>common-utils</artifactId>
			<version>7.2.0</version>
		</dependency>
		
		<dependency>
			<groupId>io.confluent</groupId>
			<artifactId>kafka-avro-serializer</artifactId>
			<version>7.2.0</version>
		</dependency>
		
		<dependency>
			<groupId>io.confluent</groupId>
			<artifactId>kafka-schema-serializer</artifactId>
			<version>7.2.0</version>
		</dependency>
		
		<dependency>
			<groupId>io.confluent</groupId>
			<artifactId>kafka-schema-registry-client</artifactId>
			<version>7.2.0</version>
		</dependency>
		
		<dependency>
			<groupId>org.apache.avro</groupId>
			<artifactId>avro</artifactId>
			<version>1.11.0</version>
		</dependency>
		<dependency>
			<groupId>com.google.guava</groupId>
			<artifactId>guava</artifactId>
			<version>30.1.1-jre</version>
		</dependency>

		<!-- jackson start -->
		<dependency>
			<groupId>com.fasterxml.jackson.core</groupId>
			<artifactId>jackson-core</artifactId>
			<version>2.13.2</version>
		</dependency>

		<dependency>
			<groupId>com.fasterxml.jackson.core</groupId>
			<artifactId>jackson-databind</artifactId>
			<version>2.13.2</version>
		</dependency>
		<!-- jackson end -->
	</dependencies>
	
	<!-- Bundle dependency jars with the project jar at package time -->
	<build>
		<plugins>
			<plugin>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>3.6.1</version>
				<configuration>
					<source>1.8</source>
					<target>1.8</target>
				</configuration>
			</plugin>
			<plugin>
				<artifactId>maven-assembly-plugin</artifactId>
				<configuration>
					<descriptorRefs>
						<descriptorRef>jar-with-dependencies</descriptorRef>
					</descriptorRefs>
				</configuration>
				<executions>
					<execution>
						<id>make-assembly</id>
						<phase>package</phase>
						<goals>
							<goal>single</goal>
						</goals>
					</execution>
				</executions>
			</plugin>
		</plugins>
	</build>
</project>

4.2 Implementing the producer and consumer in Java

Producer: ProducerWithSchema.java

package com.chenxi.schemaTest;

import java.util.Properties;
import java.util.Random;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
* @author: knight chen
*/
public class ProducerWithSchema {
	// Avro schema describing the User record
	public static final String USER_SCHEMA = "{\"type\": \"record\", \"name\": \"User\", " +
			"\"fields\": [{\"name\": \"id\", \"type\": \"int\"}, " +
			"{\"name\": \"name\", \"type\": \"string\"}, {\"name\": \"age\", \"type\": \"int\"}]}";

	public static void main(String[] args) throws InterruptedException {
		Properties props = new Properties();
		props.put("bootstrap.servers", "localhost1:9092,localhost2:9092,,localhost3:9092");
		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		// Use Confluent's KafkaAvroSerializer for the record value
		props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
		// Schema Registry endpoint, used to register and fetch schemas
		props.put("schema.registry.url", "http://localhost:8081");
		props.put("security.protocol","SASL_PLAINTEXT");
		props.put("sasl.mechanism","GSSAPI");
		props.put("sasl.kerberos.service.name","kafka");
		props.put("sasl.jaas.config","com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/path/to/kafka.keytab\" principal=\"XX/XXX@XXX\";");
		Producer<String, GenericRecord> producer = new KafkaProducer<>(props);
		Schema.Parser parser = new Schema.Parser();
		Schema schema = parser.parse(USER_SCHEMA);
		Random rand = new Random();
		int id = 0;
		while (id < 100) {
			id++;
			String name = "name" + id;
			int age = rand.nextInt(40) + 1;
			GenericRecord user = new GenericData.Record(schema);
			user.put("id", id);
			user.put("name", name);
			user.put("age", age);
			ProducerRecord<String, GenericRecord> record = new ProducerRecord<>("schema_test", user);
			System.out.println(user);
			producer.send(record);
			Thread.sleep(100);
		}
		producer.close();
	}
}
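
Note that on the first send, KafkaAvroSerializer registers USER_SCHEMA with the registry (auto.register.schemas defaults to true) under the subject schema_test-value, and thereafter embeds only the returned schema id in each message, so the full schema text never travels with the records.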

Consumer: ConsumerWithSchema.java

package com.chenxi.schemaTest;

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * @author: knight chen
 */
public class ConsumerWithSchema {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost1:9092,localhost2:9092,localhost3:9092");
        props.put("group.id", "test_schema_registry_group");
        props.put("enable.auto.commit", "false");
        // 配置禁止自动提交,每次从头消费供测试使用
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Use Confluent's KafkaAvroDeserializer for the record value
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        // Schema Registry endpoint, used to fetch schemas by id
        props.put("schema.registry.url", "http://localhost:8081");
        props.put("security.protocol","SASL_PLAINTEXT");
        props.put("sasl.mechanism","GSSAPI");
        props.put("sasl.kerberos.service.name","kafka");
        props.put("sasl.jaas.config","com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/path/to/kafka.keytab\" principal=\"XX/XXX@XXX\";");
        System.out.println("step 1");
        KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("schema_test"));

        try {
            while (true) {
                ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(1000));
                // poll() never returns null; an empty batch just means no data arrived within the timeout
                if (records.isEmpty()) {
                    System.out.println("no data from topic schema_test yet");
                    continue;
                }
                for (ConsumerRecord<String, GenericRecord> record : records) {
                    GenericRecord user = record.value();
                    System.out.println("value = [user.id = " + user.get("id") + ", " + "user.name = "
                            + user.get("name") + ", " + "user.age = " + user.get("age") + "], "
                            + "partition = " + record.partition() + ", " + "offset = " + record.offset());
                }
            }
        } finally {
            consumer.close();
        }
    }
}
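
On the consuming side, KafkaAvroDeserializer reads the schema id from each message, fetches the writer's schema from the registry once (caching it locally), and returns the value as a GenericRecord. If code-generated Avro classes are preferred, setting specific.avro.reader=true makes the deserializer return SpecificRecord instances instead.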

4.3 Verification

(1) Producer

Launch command:

java -cp kafka_schema_registry-1.0-SNAPSHOT-jar-with-dependencies.jar com.chenxi.schemaTest.ProducerWithSchema

[Screenshot of the producer output omitted]

(2) Consumer

Launch command:

java -cp kafka_schema_registry-1.0-SNAPSHOT-jar-with-dependencies.jar com.chenxi.schemaTest.ConsumerWithSchema

[Screenshot of the consumer output omitted]
