1、引入spring-kafka包
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.1.5.RELEASE</version>
</dependency>
2、定义主题分区类
package com.kafka.demo;
import lombok.Getter;
import lombok.Setter;
import java.util.List;
/**
 * Pairs a Kafka topic name with the list of partition numbers configured for it.
 *
 * <p>Accessors and mutators for both fields are generated by Lombok.
 */
@Getter
@Setter
public class TopicPartitions {

    /** Name of the topic. */
    private String topic;

    /** Partition numbers of {@link #topic}. */
    private List<Integer> partitions;
}
3、定义主题消费参数类
package com.kafka.demo;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import java.util.List;
/**
 * Consumer settings bound from configuration properties under the
 * {@code config.kafka} prefix.
 *
 * <p>Getters, setters, {@code equals}, {@code hashCode} and {@code toString}
 * are generated by Lombok's {@code @Data}.
 */
@Data
@Configuration
@ConfigurationProperties(prefix = "config.kafka")
public class KafkaConsumerProperties {

    /** Kafka bootstrap server addresses. */
    private List<String> bootstrapServers;

    /** Consumer group id; defaults to {@code "kafka-demo"}. */
    private String groupId = "kafka-demo";

    /** Consumer client id. */
    private String clientId;

    /** Value used for the consumer's auto offset reset setting; defaults to {@code "earliest"}. */
    private String autoOffsetReset = "earliest";

    /** Topics and the partitions of each topic that are configured for consumption. */
    private List<TopicPartitions> topicPartitions;
}
4、定义消费配置类
package com.kafka.demo;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.util.Assert;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
/**
 * Spring configuration that builds the Kafka consumer factory and the listener
 * container factory referenced by {@code @KafkaListener} methods.
 *
 * <p>The container factory is created with auto-startup disabled; containers
 * are started by {@link #receiveEvent()} once the application is ready.
 */
@Configuration
@Slf4j
public class KafkaConsumerConfig {

    /** Bean-name suffix of containers that {@link #receiveEvent()} must not start. */
    private static final String MANUAL_START_SUFFIX = "_manual";

    private final KafkaConsumerProperties properties;
    private final KafkaListenerEndpointRegistry registry;

    /**
     * @param properties consumer settings bound from the {@code config.kafka} prefix
     * @param registry   registry holding the listener containers to start
     */
    public KafkaConsumerConfig(KafkaConsumerProperties properties, KafkaListenerEndpointRegistry registry) {
        this.properties = properties;
        this.registry = registry;
    }

    /**
     * Starts every registered listener container that is not yet running,
     * except those whose bean name ends with {@code "_manual"}.
     */
    @EventListener(ApplicationReadyEvent.class)
    public void receiveEvent() {
        registry.getListenerContainers().forEach(listenerContainer -> {
            if (!listenerContainer.isRunning() && !isManuallyStarted(listenerContainer)) {
                listenerContainer.start();
            }
        });
    }

    /**
     * Tells whether a container is flagged for manual start-up.
     *
     * <p>Only {@link ConcurrentMessageListenerContainer} instances with a
     * non-null bean name can carry the flag; anything else is treated as a
     * regular container instead of failing on a cast or a null name.
     *
     * @param container a listener container taken from the registry
     * @return {@code true} if the container's bean name ends with {@code "_manual"}
     */
    private static boolean isManuallyStarted(Object container) {
        if (!(container instanceof ConcurrentMessageListenerContainer)) {
            return false;
        }
        String beanName = ((ConcurrentMessageListenerContainer<?, ?>) container).getBeanName();
        return beanName != null && beanName.endsWith(MANUAL_START_SUFFIX);
    }

    /**
     * Builds the Kafka consumer configuration map from {@link KafkaConsumerProperties}.
     *
     * @return consumer configuration with auto-commit disabled and String key/value deserializers
     * @throws IllegalArgumentException if no bootstrap server or no client id is configured
     */
    private Map<String, Object> configs() {
        // Fail fast with a message naming the actual property (prefix is "config.kafka").
        Assert.notEmpty(properties.getBootstrapServers(), "setting of config.kafka.bootstrap-servers can not be empty");
        Assert.hasText(properties.getClientId(), "setting of config.kafka.client-id can not be blank");

        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, String.join(",", properties.getBootstrapServers()));
        props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, properties.getAutoOffsetReset());
        // Offsets are acknowledged by the listener (see the MANUAL_IMMEDIATE ack mode below).
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, properties.getClientId());
        return props;
    }

    /**
     * @return consumer factory backed by {@link #configs()}
     */
    @Bean
    public ConsumerFactory<String, String> localConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(configs());
    }

    /**
     * Creates the container factory used by {@code @KafkaListener} methods:
     * batch listeners, {@code MANUAL_IMMEDIATE} acknowledgment and no auto-startup.
     *
     * @return the customised listener container factory
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new MyConcurrentKafkaListenerContainerFactory(properties);
        factory.setConsumerFactory(localConsumerFactory());
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
        factory.setBatchListener(true);
        // Containers are started explicitly in receiveEvent().
        factory.setAutoStartup(false);
        return factory;
    }
}
5、定义消费工厂类
package com.kafka.demo;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpoint;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.support.TopicPartitionInitialOffset;
import org.springframework.util.Assert;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
/**
 * Listener container factory that, for the endpoint with id {@code "special_topic"}
 * (or an endpoint with a blank id), creates a container assigned to the explicit
 * topic partitions listed in {@link KafkaConsumerProperties#getTopicPartitions()}.
 * All other endpoints are delegated to the parent factory.
 */
@Slf4j
public class MyConcurrentKafkaListenerContainerFactory extends ConcurrentKafkaListenerContainerFactory<String, String> {

    /** Id of the listener endpoint that consumes the configured topic partitions. */
    private static final String SPECIAL_TOPIC_ENDPOINT_ID = "special_topic";

    private final KafkaConsumerProperties localKafkaConsumerProperties;

    /**
     * @param localKafkaConsumerProperties consumer settings providing the topic partitions; must not be null
     * @throws IllegalArgumentException if {@code localKafkaConsumerProperties} is null
     */
    public MyConcurrentKafkaListenerContainerFactory(KafkaConsumerProperties localKafkaConsumerProperties) {
        Assert.notNull(localKafkaConsumerProperties, "localKafkaConsumerProperties can not be null");
        this.localKafkaConsumerProperties = localKafkaConsumerProperties;
    }

    /**
     * Creates the container for an endpoint, using the configured topic partitions
     * when the endpoint id is blank or equals {@code "special_topic"}.
     *
     * @param endpoint the listener endpoint being registered
     * @return the container instance for the endpoint
     * @throws IllegalArgumentException if no topic partition is configured for the special endpoint
     */
    @Override
    protected ConcurrentMessageListenerContainer<String, String> createContainerInstance(KafkaListenerEndpoint endpoint) {
        String endpointId = endpoint.getId();
        if (!isBlank(endpointId) && !SPECIAL_TOPIC_ENDPOINT_ID.equals(endpointId)) {
            return super.createContainerInstance(endpoint);
        }
        try {
            ContainerProperties properties = new ContainerProperties(configuredAssignments());
            return new ConcurrentMessageListenerContainer<>(getConsumerFactory(), properties);
        } catch (RuntimeException e) {
            log.error(e.getMessage(), e);
            throw e;
        }
    }

    /**
     * Expands the configured topic/partition lists into one assignment per partition.
     * Entries without a partition list are skipped with a warning.
     *
     * @return the explicit partition assignments, never empty
     * @throws IllegalArgumentException if the configuration yields no assignment
     */
    private TopicPartitionInitialOffset[] configuredAssignments() {
        List<TopicPartitionInitialOffset> assignments = Lists.newArrayList();
        List<TopicPartitions> configured = localKafkaConsumerProperties.getTopicPartitions();
        if (configured != null) {
            for (TopicPartitions topicPartitions : configured) {
                if (topicPartitions.getPartitions() == null) {
                    log.warn("no partitions configured for topic {}, skipping it", topicPartitions.getTopic());
                    continue;
                }
                for (Integer partition : topicPartitions.getPartitions()) {
                    assignments.add(new TopicPartitionInitialOffset(topicPartitions.getTopic(), partition));
                }
            }
        }
        Assert.notEmpty(assignments, "setting of config.kafka.topic-partitions must define at least one topic partition");
        return assignments.toArray(new TopicPartitionInitialOffset[0]);
    }

    /**
     * @param value the string to test, may be null
     * @return {@code true} if {@code value} is null, empty or whitespace only
     */
    private static boolean isBlank(String value) {
        return value == null || value.chars().allMatch(Character::isWhitespace);
    }
}
6、定义消费类
package com.kafka.demo;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;
import java.util.List;
/**
 * Service hosting the batch Kafka listener registered under the id
 * {@code "special_topic"}.
 */
@Slf4j
@Service
public class MyKafkaConsumer {

    /**
     * Receives a batch of records and acknowledges the batch after iterating it.
     *
     * @param records the records delivered in this batch
     * @param ack     handle used to acknowledge the batch
     */
    @KafkaListener(id = "special_topic", topicPattern = ".+", containerFactory = "kafkaListenerContainerFactory")
    public void consumeConfig(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
        for (ConsumerRecord<String, String> record : records) {
            String topic = record.topic();
            // TODO: doSomething
        }
        // Acknowledge only after the whole batch has been iterated.
        ack.acknowledge();
    }
}
原理:@KafkaListener 注解会去找 containerFactory 对应的工厂类,而该值配置的 kafkaListenerContainerFactory 正是步骤4中定义的 Bean;该 Bean 又引用了步骤5中自定义的工厂类 MyConcurrentKafkaListenerContainerFactory。该工厂会使用消费参数类(步骤3)中定义的 topicPartitions 向 Kafka 的不同主题分区注册消费者,从而让添加了 @KafkaListener 注解的方法可以根据不同主题、不同分区进行消费。