前提条件
已配置正确的安全组。
已获取连接Kafka实例的地址。
如果Kafka实例未开启自动创建Topic功能,在连接实例前,请先创建Topic。
已创建弹性云服务器,如果使用内网同一个VPC访问实例,请设置弹性云服务器的VPC、子网、安全组与Kafka实例的VPC、子网、安全组一致。
需要用户先在用户管理页面创建用户,然后给对应的topic授予生产消费权限(注意:代码接入kafka时,sasl.jaas.config密码为创建用户时填入的密码进行md5加密,md5取32位小写)。
使用内网同一个VPC访问,实例端口为8098,实例连接地址从控制台实例详情菜单处获取,如下图所示。
Maven中引入Kafka客户端
Kafka实例基于社区版本2.8.2/3.6.2,推荐客户端保持一致。
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.2</version><!-- 若实例为3.6.2版本，请改为3.6.2；客户端版本需与实例版本保持一致 -->
</dependency>
客户端关键参数
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_ADDR);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put("sasl.mechanism", "SCRAM-SHA-512");
props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"test_user\" password=\"f6eca4dbc78df78d63fba980e448185f\";");
//注:上面的密码f6eca4dbc78df78d63fba980e448185f,为用户管理里面创建用户时填入的密码进行md5的结果,md5取32位小写
props.put("ssl.truststore.location","/kafka/client.truststore.jks");
props.put("ssl.truststore.password","sJses2tin1@23");
props.put("ssl.endpoint.identification.algorithm","");
注意
上述代码中涉及的密码,为用户管理里面创建用户时填入的密码进行md5的结果,md5取32位小写。
生产者代码示例
package com.justin.kafka.service.gw.saslssl;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
/**
 * Sample SASL_SSL (SCRAM-SHA-512) producer that sends 10 test messages to {@link #TOPIC}.
 *
 * <p>Note: the SASL password below is the MD5 digest (32 lowercase hex chars) of the
 * password entered when the user was created on the user-management page.
 */
public class Producer {
    private final KafkaProducer<String, String> producer;
    public final static String TOPIC = "test-topic3";
    public final static String BROKER_ADDR = "192.168.0.11:8098,192.168.0.9:8098,192.168.0.10:8098";

    /** Builds the producer with SASL_SSL security settings. */
    public Producer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_ADDR);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Wait for all in-sync replicas to acknowledge each record.
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put("sasl.mechanism", "SCRAM-SHA-512");
        // Password is the 32-char lowercase MD5 of the password set when creating the user.
        props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"test_user\" password=\"f6eca4dbc78df78d63fba980e448185f\";");
        props.put("ssl.truststore.location","/kafka/client.truststore.jks");
        props.put("ssl.truststore.password","sJses2tin1@23");
        // Disable hostname verification (instance certs may not match broker hostnames).
        props.put("ssl.endpoint.identification.algorithm","");
        producer = new KafkaProducer<>(props);
    }

    /**
     * Sends 10 messages asynchronously, then flushes so every buffered record
     * is actually delivered before this method returns.
     */
    public void produce() {
        try {
            for (int i = 0; i < 10; i++) {
                String data = "The msg is " + i;
                // send() only enqueues the record; the callback reports the per-record result.
                producer.send(new ProducerRecord<>(TOPIC, data), new Callback() {
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception != null) {
                            // TODO: real error handling (retry/alert) instead of just printing
                            exception.printStackTrace();
                            return;
                        }
                        System.out.println("produce msg completed, partition id = " + metadata.partition());
                    }
                });
            }
            // Block until all buffered records (and their callbacks) complete.
            // Without this, records still sitting in the client buffer are lost
            // when the JVM exits right after produce() returns.
            producer.flush();
        } catch (Exception e) {
            // TODO: real error handling
            e.printStackTrace();
        }
    }

    /** Releases the network/IO resources held by the underlying KafkaProducer. */
    public void close() {
        producer.close();
    }

    public static void main(String[] args) {
        Producer producer = new Producer();
        producer.produce();
        producer.close();
    }
}
消费者代码示例
package com.justin.kafka.service.gw.saslssl;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import java.util.Arrays;
import java.util.Properties;
/**
 * Sample SASL_SSL (SCRAM-SHA-512) consumer that polls {@link #TOPIC} forever
 * and prints each record's value.
 *
 * <p>Note: the SASL password below is the MD5 digest (32 lowercase hex chars) of the
 * password entered when the user was created on the user-management page.
 */
public class Consumer {
    private org.apache.kafka.clients.consumer.Consumer<String, String> consumer;
    private static final String GROUP_ID = "test-group2";
    private static final String TOPIC = "test-topic2";
    public final static String BROKER_ADDR = "192.168.0.11:8098,192.168.0.9:8098,192.168.0.10:8098";

    /** Builds the consumer with SASL_SSL security settings and manual offset commits. */
    public Consumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_ADDR);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        // Start from the earliest offset when the group has no committed position.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Auto-commit disabled: offsets are committed explicitly in consume().
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put("sasl.mechanism", "SCRAM-SHA-512");
        // Password is the 32-char lowercase MD5 of the password set when creating the user.
        props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"test_user\" password=\"f6eca4dbc78df78d63fba980e448185f\";");
        props.put("ssl.truststore.location","/kafka/client.truststore.jks");
        props.put("ssl.truststore.password","sJses2tin1@23");
        // Disable hostname verification (instance certs may not match broker hostnames).
        props.put("ssl.endpoint.identification.algorithm","");
        consumer = new KafkaConsumer<>(props);
    }

    /**
     * Polls forever, printing every record and committing offsets after each
     * successfully processed batch.
     */
    public void consume() {
        consumer.subscribe(Arrays.asList(TOPIC));
        while (true){
            try {
                // poll(long) is deprecated since Kafka 2.0; use the Duration overload.
                ConsumerRecords<String, String> records = consumer.poll(java.time.Duration.ofMillis(1000));
                System.out.println("the numbers of topic:" + records.count());
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("the data is " + record.value());
                }
                // enable.auto.commit is false, so offsets MUST be committed manually;
                // otherwise every restart re-delivers everything since the last commit.
                if (!records.isEmpty()) {
                    consumer.commitSync();
                }
            }catch (Exception e){
                // TODO: real error handling
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        new Consumer().consume();
    }
}