我们向kafka里发送消息的时候,kafka有多个分区,是如何将数以百万的消息发送到各个分区的,如果其中的分区节点出现了问题?这个时候又会如何?
当我们在new一个produceRecord的时候,最常见的是直接 topic+value
还有一种是topic+key+value
查源码kafkaProduce.java里
这三行代码决定了分配的分区在哪里?
前面两行都相似
byte[] serializedKey = valueSerializer.serialize(record.topic(), record.headers(), record.key());
byte[] serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
都是通过序列化 得到一个byte数组,那我们看下这个序列化怎么计算的?
head没用,不管
StringSerializer.java里 topic这个值也没用,那么制用看key和value了
发现直接就是string.getBytes
由此可以知道
serializedKey=key.getBytes
serializedValue=value.getBytes
再看 int partition = partition(record, serializedKey, serializedValue, cluster);
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) { Integer partition = record.partition(); return partition != null ? partition : partitioner.partition( record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster); }
partition=null 继续看partitioner.patition方法 那这个partitioner是啥?就是我最上面的截图的分区器
最关键的代码来了
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { //获取了所有的分区个数 List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { //如果key为null int nextValue = nextValue(topic); // 这个nextvalue方法是根据topic名称随机生成一个值 123456 第二次调用+1变成123457 第三次+1变成123458 List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); //获取可用的分区个数 if (availablePartitions.size() > 0) { //分配到可用的分区上 int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { //所有分区都不可用那么就随便分配了 // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } } else { //指明了根据key的hash值取分到一个区 //注意这里和上面不同,上面会获取了所有的可用分区,这里还是按照所有分区分配 // hash the keyBytes to choose a partition // Utils.murmur2(keyBytes) 这个b方法就是 你随便传个topic 无论传多少次都会返回一个相同的数,传不同的返回不同的数,就是和hash差不多不知道为啥这么麻烦。但是这个可以控制,比如我知道cc=24 我知道分区数=3 那么我知道他肯定会分配到0号分区。 return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } }
看到这大家伙都明白了把。
如果我们不指定key,那么kafka比较智能,获取可用分区,然后自己给这个topic搞个随机值然后就会按照分区轮询。
开始思考
思考1
如果我这边启动produce 发送一条消息到partition1.然后停掉,再重启produce再发一条有可能还是会到1号分区,依次类推大概率导致最后分区offset不一样,不过这概率小之又小,但是有时候我们offset有误差,如下图。
思考2
为什么要指定key,指定key有哪些好处?
好处:我知道这条数据会分配到哪个分区,知道了又能怎么样呢??
比如如下场景。
我们设定好分区数3 其中已知 cc1%3=0 cc2%3=1 cc2%3=2
双十一的时候阿里使用kafka记录消费数据,比如用户下一个单就会朝kafka发送一条消息
{"id":xx,tyoe:"male","amount":1000}
{"id":xx,tyoe:"female","amount":10000}
{"id":xx,tyoe:"children","amount":100}
在向kafka发送消息的时候,那么就是
key=cc1 value=male
key=cc2 value=female
key=cc3 value=children
这样有什么好处? 比如说我要实时计算
女性消费了多少数据,那么我只需要消费这个topic的 1号分区,就可以计算出女性的消费额
如果不这么做,还是采用随机的分配原则,那么我们还要订阅整个topic,消费全部数据
consumer.subscribe(Lists.newArrayList(topic));
然后比如flink.addsource(kafaConsumer).filter(x->x.type=female).sum()
一个是消费所有数据一个是消费部分数据,明显减少了服务器和客户端的压力,我真是太聪明了,这都想到了。
这个时候又有杠精来了,女的消费能力强,男的消费一般,小孩穷b消费差,你这么做会导致我计算总销售额的时候出现问题,有的消费者消费压力大。
确实按照上述,存在有的topic数据多 有的topic少。
但是我马上想到解决办法了。
比如 消费能力来算 女:男:小孩=6:3:1
那么我准备好10个分区 女的放在123456 小孩放在0 男的放在789
list1[ cc1 cc2 cc3 cc4 cc5 cc6 ]%10= 1 2 3 4 5 6
list2 [cc0]%10=0
list3 [cc7 cc8 cc9]%10=7 8 9
那么
key=list1.get(new Random().nextInt(6)) value=male 就会随机进入到分区(123456)
key=cc0 value=children 就会指定分区进入 0
key=list2.get(new Random().nextInt(3)) value=female 就会随机进入到分区(789)
这样也会减少40%的数据读取