实际开发中,可能需要多kafka发送的消息在发送前进行操作,比如按照某个规则过滤掉不符合要求的消息,或者屏蔽某些关键词,或者改变某些属性等等,都可以依托拦截器链来实现。
自定义拦截器需要实现ProducerInterceptor接口,根据需要重写相应的方法
如果希望改变发送前的数据值,需要重新onSend方法,从ProducerRecord中取得对应的值,进行自定义的修改,然后重新包装回ProducerRecord
如果希望对消息发送的结果进行处理,则需要重写onAcknowledgement方法,exception有值,说明消息发送失败,可以采取自定义的措施,比如重试等等,也可以用来统计消息发送的成功率
如果希望在Producer关闭时,做一些自己的业务,则可以重写close方法。
自定义拦截器
package com.hulang.kafkaboot.Interceptor;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
/**
* 自定义拦截器
*/
public class MyProducerInterceptor implements ProducerInterceptor<String, String> {
public ProducerRecord onSend(ProducerRecord record) {
System.out.println("MyProducerInterceptor 拦截器执行");
String value = (String) record.value();
if ("hello".equals(value)) {
value = "你好";
}
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(
record.topic(),
record.partition(),
record.timestamp(),
(String) record.key(),
value, // 修改后的 value值
record.headers());
return producerRecord;
}
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
public void close() {
}
public void configure(Map<String, ?> configs) {
}
}
常规项目配置
常规项目中,KafkaProducer是自己管理的,只需要在properties添加自定义拦截器即可,如下
private static void sendCallBack() {
// 创建kafka生产者对象
Properties properties = new Properties();
//连接集群
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.163:9092,192.168.0.216:9092,192.168.0.164:9092");
// 指定对应的key和value序列号类型
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 自定义拦截器
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MyProducerInterceptor.class.getName());
KafkaProducer kafkaProducer = new KafkaProducer<String, String>(properties);
// 发送数据
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<String, String>("first", "hello call back" + i), (recordMetadata, exception) -> {
System.out.println(recordMetadata.topic() + ": " + recordMetadata.partition());
});
}
// 关闭资源
kafkaProducer.close();
}
springboot项目配置方法1
server:
port: 8877
spring:
kafka:
bootstrap-servers: 192.168.0.163:9092,192.168.0.216:9092,192.168.0.164:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
properties:
# 拦截器配置
interceptor:
classes: com.hulang.kafkaboot.Interceptor.MyProducerInterceptor
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
group-id: 9527
有多个,用逗号隔开即可,注意,执行顺序取决于配置的先后顺序,在前面的先执行
server:
port: 8877
spring:
kafka:
bootstrap-servers: 192.168.0.163:9092,192.168.0.216:9092,192.168.0.164:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
properties:
# 拦截器配置,执行先后顺序取决于配置顺序
interceptor:
classes: com.hulang.kafkaboot.Interceptor.MyProducerInterceptor2,com.hulang.kafkaboot.Interceptor.MyProducerInterceptor
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
group-id: 9527
springboot项目配置方法2
package com.hulang.kafkaboot.config;
import com.hulang.kafkaboot.Interceptor.MyProducerInterceptor;
import com.hulang.kafkaboot.Interceptor.MyProducerInterceptor2;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class KafkaConfig {
("")
private String BOOTSTRAP_SERVERS_CONFIG;
("")
private String KEY_SERIALIZER_CLASS_CONFIG;
("")
private String VALUE_SERIALIZER_CLASS_CONFIG;
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
// kafka连接地址
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
// key序列化
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER_CLASS_CONFIG);
// value序列化
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VALUE_SERIALIZER_CLASS_CONFIG);
// 自定义拦截器
List<String> interceptors = new ArrayList<>();
interceptors.add(MyProducerInterceptor.class.getName());
interceptors.add(MyProducerInterceptor2.class.getName());
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,interceptors);
return props;
}
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate(producerFactory());
}
}
添加多个拦截器小细节
细节1
如果你有多个拦截器,put自定义拦截器时,不能直接put两个连续的拦截器,错误示范如下:
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,MyProducerInterceptor.class.getName());
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,MyProducerInterceptor2.class.getName());
这样操作只会让最后一个拦截器生效,前面的拦截器被覆盖。
需要采用数组的形式,挨个添加,然后将数组作为参数传入。
// 自定义拦截器
List<String> interceptors = new ArrayList<>();
interceptors.add(MyProducerInterceptor.class.getName());
interceptors.add(MyProducerInterceptor2.class.getName());
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,interceptors);
细节2
同一个拦截器添加多次,会执行多次,如果没有这样的需要,应注意避免,浪费资源。