Compile and Run the Demo Java Project
Last updated: 2025-06-16 11:54:11
This topic describes how to compile and run a demo Java project that connects to Kafka.
Adding the kafka-clients dependency
To use Kafka, you need to add the corresponding dependency to your project. The exact dependencies may vary with your project and requirements, so check the official Kafka documentation for the latest versions and usage instructions before you start.
Taking Java as an example, use Kafka's Java client library. Add the following dependency to your Maven (or Gradle) build, specifying a kafka-clients version compatible with your instance:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <!-- Add a <version> element matching the kafka-clients version required by your environment -->
</dependency>
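As a quick sanity check that the client library resolved correctly, the following minimal sketch prints the kafka-clients version found on the classpath using the library's AppInfoParser utility. The class name VersionCheck is illustrative and not part of the demo project.

import org.apache.kafka.common.utils.AppInfoParser;

// Minimal sketch: prints the kafka-clients version baked into the jar on the classpath.
// The class name VersionCheck is illustrative and not part of the demo project.
public class VersionCheck {
    public static void main(String[] args) {
        System.out.println("kafka-clients version: " + AppInfoParser.getVersion());
    }
}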
Sample Code
- Obtain the following information from the console:
  - Connection address: obtain the instance connection address from the Instance Details page in the console, in the Endpoint Information section.
  - Topic name: on the Topic Management page, select the topic to use.
  - Consumer group name: on the Consumer Group Management page, select the consumer group to use.
- Replace the above information in the sample code to produce messages:
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class Producer {
    private final KafkaProducer<String, String> producer;

    public final static String TOPIC = "test-topic";
    public final static String BROKER_ADDR = "192.168.0.11:8090,192.168.0.9:8090,192.168.0.10:8090";

    public Producer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_ADDR);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Wait for acknowledgement from all in-sync replicas before a send is considered successful.
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        producer = new KafkaProducer<>(props);
    }

    public void produce() {
        try {
            for (int i = 0; i < 10; i++) {
                String data = "The msg is " + i;
                producer.send(new ProducerRecord<>(TOPIC, data), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception != null) {
                            // TODO: handle the exception
                            exception.printStackTrace();
                            return;
                        }
                        System.out.println("produce msg completed, partition id = " + metadata.partition());
                    }
                });
            }
        } catch (Exception e) {
            // TODO: handle the exception
            e.printStackTrace();
        }
        producer.flush();
        producer.close();
    }

    public static void main(String[] args) {
        Producer producer = new Producer();
        producer.produce();
    }
}
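The producer above sends value-only records, so the client spreads them across partitions on its own. If related messages need to land on the same partition, a key can be supplied when building the ProducerRecord. Below is a minimal sketch, assuming the same broker address and topic as the sample; the key "order-1001" and class name KeyedProducer are purely illustrative.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

// Minimal sketch: sends a keyed record so that all messages sharing the same key
// are hashed to the same partition. Broker address, topic and key are placeholders.
public class KeyedProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Reuse the connection address obtained from the console.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.11:8090,192.168.0.9:8090,192.168.0.10:8090");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("test-topic", "order-1001", "keyed msg"),
                    (metadata, exception) -> {
                        if (exception != null) {
                            exception.printStackTrace();
                        } else {
                            System.out.println("keyed msg sent to partition " + metadata.partition());
                        }
                    });
            producer.flush();
        }
    }
}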
- Similarly, replace the above information in the sample code to consume messages:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class Consumer {
    private org.apache.kafka.clients.consumer.Consumer<String, String> consumer;

    private static final String GROUP_ID = "test-group";
    private static final String TOPIC = "test-topic";
    public final static String BROKER_ADDR = "192.168.0.11:8090,192.168.0.9:8090,192.168.0.10:8090";

    public Consumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_ADDR);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        // Start from the earliest offset when the group has no committed offset.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Auto-commit is disabled; see the manual-commit note below the sample.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<>(props);
    }

    public void consume() {
        consumer.subscribe(Arrays.asList(TOPIC));
        while (true) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                System.out.println("the numbers of topic:" + records.count());
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("the data is " + record.value());
                }
            } catch (Exception e) {
                // TODO: handle the exception
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        new Consumer().consume();
    }
}
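Note that the consumer sample sets ENABLE_AUTO_COMMIT_CONFIG to false but never commits offsets, so a restarted consumer falls back to the auto.offset.reset policy. If you keep auto-commit disabled, commit offsets manually after processing each batch. The following is a minimal sketch of such a poll loop, written as a method that could be added to the Consumer class above; it reuses the consumer and TOPIC fields and the imports from the sample.

// Minimal sketch: poll loop with manual, synchronous offset commits.
// Assumes the same consumer configuration as the sample above (auto-commit disabled).
public void consumeWithManualCommit() {
    consumer.subscribe(Arrays.asList(TOPIC));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println("the data is " + record.value());
        }
        if (!records.isEmpty()) {
            // Commit the offsets of the records returned by the last poll.
            consumer.commitSync();
        }
    }
}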