1.基本信息
kafka版本:kafka_2.11-0.9.0.1
kafka jar包版本:0.9.0.1
kafka集群:192.168.1.101,192.168.1.102,192.168.1.103
2.自定义partition分发策略, 新建MyPartitioner类, 根据key值进行自定义
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
/**
*
* <p>类描述: 自定义分区分发策略 </p>
* <p>创建人:wanghonggang </p>
* <p>创建时间:2019年9月26日 下午3:30:11 </p>
*/
public class MyPartitioner implements Partitioner {
/**
* 构造函数
* 创建一个新的实例 MyPartitioner.
*
* @param props
*/
public MyPartitioner(VerifiableProperties props) {
}
/**
* 根据key和分区数量确定的分区号
*/
public int partition(Object key, int numPartitions) {
try {
long key_ = Long.parseLong((String) key);
return (int) Math.abs(key_ % numPartitions);
} catch (Exception e) {
return Math.abs(key.hashCode() % numPartitions);
}
}
}
3.生产者引用分配策略, 新建MessageProducer类
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
/**
*
* <p>类描述: 生产者 </p>
* <p>创建人:wanghonggang </p>
* <p>创建时间:2019年9月26日 下午3:32:58 </p>
*/
public class MessageProducer {
private Producer<String, String> inner;
private String topic="topic-test";
p