kafka-clients引入依赖
在使用Kafka时,你需要在你的项目中引入相应的依赖。具体的依赖项可能会因你的项目和需求而有所不同。在使用Kafka之前,请确保查阅官方文档以获取最新的依赖项和使用说明。
以Java编程语言为例,可以使用Kafka的Java客户端库。你可以在Maven或Gradle构建工具中添加以下依赖项:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
示例代码
-
从控制台获取以下信息
-
连接地址
-
Topic名称
-
消费组名称
-
-
在实例代码中替换以上信息即可实现消息。
private static void testPlainProducer() throws InterruptedExceptionthrows { Properties props = new Properties(); props.put("bootstrap.servers","192.168.90.139;8090"); //连接地址 props.put('acks","all"); props.put('retries", 1); props.put("batch.size", 1684); props.put("linger.ms", 0) ; props.put("buffer.memory",33554432); // buffer空间32M props.put('request.timeout.ms", 1000); props.put("key.serializer", "org.apache. kafka. common. serialization. StringSerializer"); props.put("value.serializer", "org.apache.kafka. common. serialization. StringSerializer"); Producer<tring, String> producer = new KafkaProducer<~>(props); int index = 0; while(true) { String dvalue = "hello"; //消息内容 ProducerRecord record = new ProducerRecord<>( topic:"pps", key:"pps"+index++, dvalue); //主题名称 producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata paramRecordMetadata, Exception paramException) { if (paramRecordMetadata == null) { System.out.println("paramRecordMetadata is null "); paramException.printStackTrace() ; return; } System. out.println("发送的消息信息 " + aramRecordMetadata. topic() +",partition:+ paramRecordMetadata.partition(); } }); TimeUnit.SECONDS.sleep( timeout: 1) ; } producer.close() ; }
点击启动后成功发送消息。
-
同样在实例代码中替换以上信息即可消费消息。
private static void testPlainConsumer() throws InterruptedException { Properties properties = new Properties(); properties.put("bootstrap.servers","10.142.233.65:9092"); //连接地址 properties.put("group.id","ppsgroup"); //消费组名称 properties.put("enable.auto.commit","true"); properties.put("auto.offset.reset","earliest"); properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(properties); consumer.subscribe(Arrays.asList("pps")); //主题名称 while (true) { ConsumerRecords<0bject, Object> records = consumer.poll( timeout: 100); records.forEach(record->{ String format = String. format(offset = %d, key = %s, value = %s, record.offset ), record. key0, record. value(); System. out.println(format); }); TimeUnit.SECONSsleep( timeout: 1); } }