searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

springboot集成kafka指定主题分区消费

2023-07-26 08:10:56
35
0

1、引入spring-kafka包

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>1.1.5.RELEASE</version>
</dependency>

2、定义主题分区类

package com.kafka.demo;

import lombok.Getter;
import lombok.Setter;

import java.util.List;

@Setter
@Getter
public class TopicPartitions {
    private String topic;
    private List<Integer> partitions;
}

3、定义主题消费参数类

package com.kafka.demo;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

import java.util.List;

/**
 * Externalized Kafka consumer settings bound from the "config.kafka" prefix.
 * Consumed by KafkaConsumerConfig and MyConcurrentKafkaListenerContainerFactory.
 */
@Configuration
@ConfigurationProperties(prefix = "config.kafka")
@Data
public class KafkaConsumerProperties {
    // Broker addresses (host:port); joined with "," when building the consumer config.
    private List<String> bootstrapServers;
    // Consumer group id; defaults to "kafka-demo" when not configured.
    private String groupId = "kafka-demo";
    // Client id; the consumer config class rejects a blank value at startup.
    private String clientId;
    // Offset reset policy ("earliest"/"latest"); defaults to "earliest".
    private String autoOffsetReset = "earliest";
    // Explicit topic -> partitions assignments used by the custom container factory.
    private List<TopicPartitions> topicPartitions;
}


4、定义消费配置类

package com.kafka.demo;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.util.Assert;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
 * Wires the Kafka consumer: raw client properties, the consumer factory, and the
 * listener container factory referenced by name from {@code @KafkaListener}.
 * Containers are created stopped and started explicitly after the application is ready.
 */
@Configuration
@Slf4j
public class KafkaConsumerConfig {
    /** Externalized consumer settings (brokers, group/client ids, topic-partition map). */
    private final KafkaConsumerProperties properties;
    /** Registry holding every container created for {@code @KafkaListener} endpoints. */
    private final KafkaListenerEndpointRegistry registry;

    public KafkaConsumerConfig(KafkaConsumerProperties properties, KafkaListenerEndpointRegistry registry) {
        this.properties = properties;
        this.registry = registry;
    }

    /**
     * Starts every idle listener container once the application is fully up.
     * Containers are built with autoStartup=false (see the factory bean below), so this
     * is the single place that kicks off consumption. Containers whose bean name ends
     * with "_manual" are deliberately left stopped.
     */
    @EventListener(ApplicationReadyEvent.class)
    public void receiveEvent() {
        Optional.ofNullable(registry.getListenerContainers()).orElse(new ArrayList<>()).forEach(listenerContainer -> {
            if (!listenerContainer.isRunning() &&
                    !((ConcurrentMessageListenerContainer) listenerContainer).getBeanName().endsWith("_manual")) {
                listenerContainer.start();
            }
        });
    }

    /**
     * Builds the raw kafka-clients consumer properties from {@link KafkaConsumerProperties}.
     * Auto-commit is disabled because offsets are acknowledged manually (MANUAL_IMMEDIATE).
     *
     * @throws IllegalArgumentException if no client id is configured
     */
    private Map<String, Object> configs() {
        Map<String, Object> props = new HashMap<>();
        // String.join replaces StringUtils.join from a library that was never imported
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, String.join(",", properties.getBootstrapServers()));
        props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, properties.getAutoOffsetReset());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Assert.hasText rejects null, empty and whitespace-only -- same contract as isNotBlank.
        // Message now names the actual property (prefix is "config.kafka").
        Assert.hasText(properties.getClientId(), "setting of config.kafka.client-id can not be blank");
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, properties.getClientId());
        return props;
    }

    /** Consumer factory backed by {@link #configs()}; String keys and values. */
    @Bean
    public ConsumerFactory<String, String> localConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(configs());
    }

    /**
     * Listener container factory referenced by name from {@code @KafkaListener}.
     * Uses {@link MyConcurrentKafkaListenerContainerFactory} so the "special_topic"
     * endpoint is assigned the configured topic/partitions instead of subscribing
     * by topic name.
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new MyConcurrentKafkaListenerContainerFactory(properties);
        factory.setConsumerFactory(localConsumerFactory());
        // offsets are committed only via Acknowledgment.acknowledge() in the listener
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
        factory.setBatchListener(true);
        // containers are started explicitly in receiveEvent() after ApplicationReadyEvent
        factory.setAutoStartup(false);
        return factory;
    }
}


5、定义消费工厂类

package com.kafka.demo;

import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpoint;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.support.TopicPartitionInitialOffset;
import org.springframework.util.Assert;

import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

/**
 * Container factory that, for the "special_topic" endpoint (or an endpoint without
 * an id), assigns the consumer directly to the topic/partition pairs declared in
 * {@link KafkaConsumerProperties#getTopicPartitions()} instead of subscribing by
 * topic name. All other endpoints use the default spring-kafka behaviour.
 */
@Slf4j
public class MyConcurrentKafkaListenerContainerFactory extends ConcurrentKafkaListenerContainerFactory<String, String> {
    /** Source of the explicit topic/partition assignments; never null. */
    private final KafkaConsumerProperties localKafkaConsumerProperties;

    public MyConcurrentKafkaListenerContainerFactory(KafkaConsumerProperties localKafkaConsumerProperties) {
        this.localKafkaConsumerProperties = localKafkaConsumerProperties;
        Assert.notNull(this.localKafkaConsumerProperties, "localKafkaConsumerProperties can not be null");
    }

    /**
     * Builds a container from the configured topic/partitions for the special endpoint;
     * delegates to the parent implementation for everything else.
     */
    @Override
    protected ConcurrentMessageListenerContainer<String, String> createContainerInstance(KafkaListenerEndpoint endpoint) {
        String endpointId = endpoint.getId();
        // inline null/blank check replaces StringUtils.isBlank -- that class was never imported
        if (endpointId == null || endpointId.trim().isEmpty() || endpointId.equals("special_topic")) {
            try {
                List<TopicPartitions> topicPartitions = Optional.ofNullable(localKafkaConsumerProperties.getTopicPartitions()).orElse(Lists.newArrayList());
                // flatten each topic -> [p0, p1, ...] into one (topic, partition) entry per partition
                List<TopicPartitionInitialOffset> initialOffsets = topicPartitions.stream()
                        .flatMap(v -> v.getPartitions().stream().map(p -> new TopicPartitionInitialOffset(v.getTopic(), p)))
                        .collect(Collectors.toList());
                ContainerProperties containerProperties = new ContainerProperties(initialOffsets.toArray(new TopicPartitionInitialOffset[0]));
                return new ConcurrentMessageListenerContainer<>(getConsumerFactory(), containerProperties);
            } catch (Exception e) {
                log.error(e.getMessage(), e);
                throw e;
            }
        }
        return super.createContainerInstance(endpoint);
    }
}


6、定义消费类

package com.kafka.demo;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

import java.util.List;

/**
 * Batch consumer for the "special_topic" endpoint. The custom container factory
 * assigns the configured topic/partitions, so topicPattern is effectively a
 * placeholder. Offsets are committed manually once the whole batch is handled.
 */
@Service
@Slf4j
public class MyKafkaConsumer {

    @KafkaListener(containerFactory = "kafkaListenerContainerFactory", topicPattern = ".+", id = "special_topic")
    public void consumeConfig(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
        for (ConsumerRecord<String, String> record : records) {
            String topic = record.topic();
            // TODO: doSomething
        }
        // commit the batch's offsets (MANUAL_IMMEDIATE ack mode)
        ack.acknowledge();
    }
}

原理:@KafkaListener注解会去找containerFactory对应的工厂类,而该值配置的是 kafkaListenerContainerFactory,正是步骤4中定义的Bean, 而该Bean又引用了自定义的工厂类 MyConcurrentKafkaListenerContainerFactory,该工厂会使用消费参数(步骤3)中定义的 topicPartitions 向kafka的不同主题分区注册消费者,从而让添加了@KafkaListener注解的方法可以根据不同主题不同分区进行消费

 

0条评论
0 / 1000
洪****能
3文章数
0粉丝数
洪****能
3 文章 | 0 粉丝
洪****能
3文章数
0粉丝数
洪****能
3 文章 | 0 粉丝
原创

springboot集成kafka指定主题分区消费

2023-07-26 08:10:56
35
0

1、引入spring-kafka包

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>1.1.5.RELEASE</version>
</dependency>

2、定义主题分区类

package com.kafka.demo;

import lombok.Getter;
import lombok.Setter;

import java.util.List;

@Setter
@Getter
public class TopicPartitions {
    private String topic;
    private List<Integer> partitions;
}

3、定义主题消费参数类

package com.kafka.demo;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

import java.util.List;

/**
 * Externalized Kafka consumer settings bound from the "config.kafka" prefix.
 * Consumed by KafkaConsumerConfig and MyConcurrentKafkaListenerContainerFactory.
 */
@Configuration
@ConfigurationProperties(prefix = "config.kafka")
@Data
public class KafkaConsumerProperties {
    // Broker addresses (host:port); joined with "," when building the consumer config.
    private List<String> bootstrapServers;
    // Consumer group id; defaults to "kafka-demo" when not configured.
    private String groupId = "kafka-demo";
    // Client id; the consumer config class rejects a blank value at startup.
    private String clientId;
    // Offset reset policy ("earliest"/"latest"); defaults to "earliest".
    private String autoOffsetReset = "earliest";
    // Explicit topic -> partitions assignments used by the custom container factory.
    private List<TopicPartitions> topicPartitions;
}


4、定义消费配置类

package com.kafka.demo;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.util.Assert;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
 * Wires the Kafka consumer: raw client properties, the consumer factory, and the
 * listener container factory referenced by name from {@code @KafkaListener}.
 * Containers are created stopped and started explicitly after the application is ready.
 */
@Configuration
@Slf4j
public class KafkaConsumerConfig {
    /** Externalized consumer settings (brokers, group/client ids, topic-partition map). */
    private final KafkaConsumerProperties properties;
    /** Registry holding every container created for {@code @KafkaListener} endpoints. */
    private final KafkaListenerEndpointRegistry registry;

    public KafkaConsumerConfig(KafkaConsumerProperties properties, KafkaListenerEndpointRegistry registry) {
        this.properties = properties;
        this.registry = registry;
    }

    /**
     * Starts every idle listener container once the application is fully up.
     * Containers are built with autoStartup=false (see the factory bean below), so this
     * is the single place that kicks off consumption. Containers whose bean name ends
     * with "_manual" are deliberately left stopped.
     */
    @EventListener(ApplicationReadyEvent.class)
    public void receiveEvent() {
        Optional.ofNullable(registry.getListenerContainers()).orElse(new ArrayList<>()).forEach(listenerContainer -> {
            if (!listenerContainer.isRunning() &&
                    !((ConcurrentMessageListenerContainer) listenerContainer).getBeanName().endsWith("_manual")) {
                listenerContainer.start();
            }
        });
    }

    /**
     * Builds the raw kafka-clients consumer properties from {@link KafkaConsumerProperties}.
     * Auto-commit is disabled because offsets are acknowledged manually (MANUAL_IMMEDIATE).
     *
     * @throws IllegalArgumentException if no client id is configured
     */
    private Map<String, Object> configs() {
        Map<String, Object> props = new HashMap<>();
        // String.join replaces StringUtils.join from a library that was never imported
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, String.join(",", properties.getBootstrapServers()));
        props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, properties.getAutoOffsetReset());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Assert.hasText rejects null, empty and whitespace-only -- same contract as isNotBlank.
        // Message now names the actual property (prefix is "config.kafka").
        Assert.hasText(properties.getClientId(), "setting of config.kafka.client-id can not be blank");
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, properties.getClientId());
        return props;
    }

    /** Consumer factory backed by {@link #configs()}; String keys and values. */
    @Bean
    public ConsumerFactory<String, String> localConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(configs());
    }

    /**
     * Listener container factory referenced by name from {@code @KafkaListener}.
     * Uses {@link MyConcurrentKafkaListenerContainerFactory} so the "special_topic"
     * endpoint is assigned the configured topic/partitions instead of subscribing
     * by topic name.
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new MyConcurrentKafkaListenerContainerFactory(properties);
        factory.setConsumerFactory(localConsumerFactory());
        // offsets are committed only via Acknowledgment.acknowledge() in the listener
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
        factory.setBatchListener(true);
        // containers are started explicitly in receiveEvent() after ApplicationReadyEvent
        factory.setAutoStartup(false);
        return factory;
    }
}


5、定义消费工厂类

package com.kafka.demo;

import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpoint;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.support.TopicPartitionInitialOffset;
import org.springframework.util.Assert;

import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

/**
 * Container factory that, for the "special_topic" endpoint (or an endpoint without
 * an id), assigns the consumer directly to the topic/partition pairs declared in
 * {@link KafkaConsumerProperties#getTopicPartitions()} instead of subscribing by
 * topic name. All other endpoints use the default spring-kafka behaviour.
 */
@Slf4j
public class MyConcurrentKafkaListenerContainerFactory extends ConcurrentKafkaListenerContainerFactory<String, String> {
    /** Source of the explicit topic/partition assignments; never null. */
    private final KafkaConsumerProperties localKafkaConsumerProperties;

    public MyConcurrentKafkaListenerContainerFactory(KafkaConsumerProperties localKafkaConsumerProperties) {
        this.localKafkaConsumerProperties = localKafkaConsumerProperties;
        Assert.notNull(this.localKafkaConsumerProperties, "localKafkaConsumerProperties can not be null");
    }

    /**
     * Builds a container from the configured topic/partitions for the special endpoint;
     * delegates to the parent implementation for everything else.
     */
    @Override
    protected ConcurrentMessageListenerContainer<String, String> createContainerInstance(KafkaListenerEndpoint endpoint) {
        String endpointId = endpoint.getId();
        // inline null/blank check replaces StringUtils.isBlank -- that class was never imported
        if (endpointId == null || endpointId.trim().isEmpty() || endpointId.equals("special_topic")) {
            try {
                List<TopicPartitions> topicPartitions = Optional.ofNullable(localKafkaConsumerProperties.getTopicPartitions()).orElse(Lists.newArrayList());
                // flatten each topic -> [p0, p1, ...] into one (topic, partition) entry per partition
                List<TopicPartitionInitialOffset> initialOffsets = topicPartitions.stream()
                        .flatMap(v -> v.getPartitions().stream().map(p -> new TopicPartitionInitialOffset(v.getTopic(), p)))
                        .collect(Collectors.toList());
                ContainerProperties containerProperties = new ContainerProperties(initialOffsets.toArray(new TopicPartitionInitialOffset[0]));
                return new ConcurrentMessageListenerContainer<>(getConsumerFactory(), containerProperties);
            } catch (Exception e) {
                log.error(e.getMessage(), e);
                throw e;
            }
        }
        return super.createContainerInstance(endpoint);
    }
}


6、定义消费类

package com.kafka.demo;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

import java.util.List;

/**
 * Batch consumer for the "special_topic" endpoint. The custom container factory
 * assigns the configured topic/partitions, so topicPattern is effectively a
 * placeholder. Offsets are committed manually once the whole batch is handled.
 */
@Service
@Slf4j
public class MyKafkaConsumer {

    @KafkaListener(containerFactory = "kafkaListenerContainerFactory", topicPattern = ".+", id = "special_topic")
    public void consumeConfig(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
        for (ConsumerRecord<String, String> record : records) {
            String topic = record.topic();
            // TODO: doSomething
        }
        // commit the batch's offsets (MANUAL_IMMEDIATE ack mode)
        ack.acknowledge();
    }
}

原理:@KafkaListener注解会去找containerFactory对应的工厂类,而该值配置的是 kafkaListenerContainerFactory,正是步骤4中定义的Bean, 而该Bean又引用了自定义的工厂类 MyConcurrentKafkaListenerContainerFactory,该工厂会使用消费参数(步骤3)中定义的 topicPartitions 向kafka的不同主题分区注册消费者,从而让添加了@KafkaListener注解的方法可以根据不同主题不同分区进行消费

 

文章来自个人专栏
开发指引
3 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0