前提条件
已配置正确的安全组。
已获取连接Kafka实例的地址。
如果Kafka实例未开启自动创建Topic功能,在连接实例前,请先创建Topic。
已创建弹性云服务器,如果使用内网同一个VPC访问实例,请设置弹性云服务器的VPC、子网、安全组与Kafka实例的VPC、子网、安全组一致。
使用内网同一个VPC访问,实例端口为8090,实例连接地址从控制台实例详情菜单处获取,如下图所示。
Maven中引入Kafka客户端
Kafka实例基于社区版本2.8.2/3.6.2,推荐客户端版本与实例版本保持一致。
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <!-- "2.8.2/3.6.2" is not a valid Maven version string: pick the single
         version that matches your instance (2.8.2 or 3.6.2). -->
    <version>3.6.2</version>
</dependency>
客户端关键参数
// Minimal client configuration: broker address list plus key/value serializers.
Properties props = new Properties();
// Comma-separated host:port list obtained from the instance detail page (port 8090 for in-VPC access).
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_ADDR);
// Both keys and values are sent as UTF-8 strings in this example.
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
生产者代码示例
package com.justin.kafka.service.gw.plain;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
/**
 * Minimal Kafka producer example: sends 10 string records to {@link #TOPIC}
 * and logs the partition each record landed on.
 */
public class Producer {
    // KafkaProducer is thread-safe; one instance can be shared by many threads.
    private final KafkaProducer<String, String> producer;
    public final static String TOPIC = "test-topic";
    public final static String BROKER_ADDR = "192.168.0.11:8090,192.168.0.9:8090,192.168.0.10:8090";

    public Producer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_ADDR);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // "all": wait for the full in-sync replica set to acknowledge each record.
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        // Use the typed constant rather than the raw "retries" string,
        // consistent with the other configuration keys above.
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        producer = new KafkaProducer<>(props);
    }

    /** Asynchronously sends 10 demo records; completion is reported via the callback. */
    public void produce() {
        try {
            for (int i = 0; i < 10; i++) {
                String data = "The msg is " + i;
                // send() is asynchronous; the callback runs on the producer's I/O thread.
                producer.send(new ProducerRecord<>(TOPIC, data), (metadata, exception) -> {
                    if (exception != null) {
                        // TODO: 异常处理
                        exception.printStackTrace();
                        return;
                    }
                    System.out.println("produce msg completed, partition id = " + metadata.partition());
                });
            }
        } catch (Exception e) {
            // TODO: 异常处理
            e.printStackTrace();
        }
    }

    /** Flushes any buffered records and releases network/thread resources. */
    public void close() {
        producer.close();
    }

    public static void main(String[] args) {
        Producer producer = new Producer();
        try {
            producer.produce();
        } finally {
            // Without close() buffered records may be lost and the producer's
            // sender thread and sockets leak.
            producer.close();
        }
    }
}
消费者代码示例
package com.justin.kafka.service.gw.plain;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
/**
 * Minimal Kafka consumer example: subscribes to {@link #TOPIC} as group
 * {@link #GROUP_ID}, prints every record, and commits offsets manually.
 */
public class Consumer {
    // KafkaConsumer is NOT thread-safe; use one instance per thread.
    private org.apache.kafka.clients.consumer.Consumer<String, String> consumer;
    private static final String GROUP_ID = "test-group";
    private static final String TOPIC = "test-topic";
    public final static String BROKER_ADDR = "192.168.0.11:8090,192.168.0.9:8090,192.168.0.10:8090";

    public Consumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_ADDR);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        // Start from the earliest offset when the group has no committed position.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Auto-commit is off, so offsets must be committed explicitly (see consume()).
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<>(props);
    }

    /** Polls forever, printing each record and committing offsets after each batch. */
    public void consume() {
        consumer.subscribe(Arrays.asList(TOPIC));
        while (true) {
            try {
                // poll(long) is deprecated and removed in 3.x clients; use the
                // Duration overload so this compiles against the recommended 3.6.2.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                System.out.println("the numbers of topic:" + records.count());
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("the data is " + record.value());
                }
                // Auto-commit is disabled, so commit after successful processing;
                // otherwise offsets are never persisted and every restart re-reads
                // the whole topic from "earliest".
                if (!records.isEmpty()) {
                    consumer.commitSync();
                }
            } catch (Exception e) {
                // TODO: 异常处理
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        new Consumer().consume();
    }
}