RocketMQ里的一个Consumer Group代表一个Consumer群组。对于大多数分布式应用来说,一个Consumer Group下通常会有多个Consumer实例。订阅关系一致指的是同一个Consumer Group下所有Consumer实例的处理逻辑必须完全一致,一旦订阅关系不一致,消息消费的逻辑就会混乱,甚至导致消息丢失。
背景信息
RocketMQ 中一个消费者代表一个Consumer实例群组。在大多数场景中,一个消费者组下面包含多个Consumer实例。
由于分布式消息服务RocketMQ的订阅关系主要由Topic+Tag共同组成,因此,保持订阅关系一致意味着同一个消费者Group ID下所有的Consumer实例订阅关系的一致性大概包括下面几个方面:
同一个消费组订阅的Topic必须一致,例如:在同一个消费组下,ConsumerA订阅Topic1和Topic2,ConsumerB也必须订阅Topic1和Topic2,只订阅Topic1、只订阅Topic2或订阅Topic2和Topic3都是不允许的。
同一个消费者订阅的同一个Topic的场景下Tag必须一致,包括Tag的数量和T顺序,例如:ConsumerA订阅Topic1的Tag配置为Tag1||Tag2,ConsumerB订阅Topic1的Tag也必须是Tag1||Tag2,只订阅Tag1、只订阅Tag2或者订阅Tag2||Tag1都是不允许的。
正确的订阅关系如下,多个不同的topic可以被多个消费组订阅,但是同一个消费组下的多个Consumer实例订阅Topic和Tag都必须一致。
代码示例
- 订阅一个Topic、一个Tag
同一个消费组下面的全部消费者实例均订阅一个topic,且均配置同一个tag这种是符合订阅关系一致性原则的。
consumer.setConsumerGroup("group1");
consumer.subscribe(topic,"Tag1");
consumer.registerMessageListener((MessageListenerConcurrently)(msgs, context)->{
// do something
}
- 订阅一个topic多个tag
每个消费者订阅消息的代码必须一致
consumer.setConsumerGroup("group1");
consumer.subscribe(topic,"Tag1||Tag2");
consumer.registerMessageListener((MessageListenerConcurrently)(msgs, context)->{
// do something
}
- 订阅多个topic且订阅多个tag
consumer.setConsumerGroup("group1");
consumer.subscribe(topic1,"Tag1");
consumer.subscribe(topic2,"Tag1|Tag2");
consumer.subscribe(topic3,"*");
consumer.registerMessageListener((MessageListenerConcurrently)(msgs, context)->{
// do something
}
常见的订阅关系不一致情况
如果Rocketmq实例消费到的消息不符合预期,可以检查一下消费者逻辑是否存在订阅关系不一致的情况
下面列举几种常见的错误示例
同一个消费组下订阅的topic不一致
消费者实例1的代码:
consumer.setConsumerGroup("group1");
consumer.subscribe(topic1,"*");
consumer.registerMessageListener((MessageListenerConcurrently)(msgs, context)->{
// do something
}
消费者实例2的代码:
consumer.setConsumerGroup("group1");
consumer.subscribe(topic2,"*");
consumer.registerMessageListener((MessageListenerConcurrently)(msgs, context)->{
// do something
}
消费者实例3的代码:
consumer.setConsumerGroup("group1");
consumer.subscribe(topic3,"*");
consumer.registerMessageListener((MessageListenerConcurrently)(msgs, context)->{
// do something
}
同一个消费组的消费实例订阅的topic相同但订阅的tag不一致
消费者实例1的代码:
consumer.setConsumerGroup("group1");
consumer.subscribe(topic,"Tag1");
consumer.registerMessageListener((MessageListenerConcurrently)(msgs, context)->{
// do something
}
消费者实例2的代码:
consumer.setConsumerGroup("group1");
consumer.subscribe(topic,"Tag2");
consumer.registerMessageListener((MessageListenerConcurrently)(msgs, context)->{
// do something
}
消费者实例3的代码:
consumer.setConsumerGroup("group1");
consumer.subscribe(topic,"Tag2");
consumer.registerMessageListener((MessageListenerConcurrently)(msgs, context)->{
// do something
}
同一个消费组下全部消费者实例订阅的topic以及tag都一致但订阅tag的顺序不一致
消费者实例1的代码
:
consumer.setConsumerGroup("group1");
consumer.subscribe(topic,"Tag1||Tag2");
consumer.registerMessageListener((MessageListenerConcurrently)(msgs, context)->{
// do something
}
消费者实例2的代码:
consumer.setConsumerGroup("group1");
consumer.subscribe(topic,"Tag2||Tag1");
consumer.registerMessageListener((MessageListenerConcurrently)(msgs, context)->{
// do something
}
消费者实例3的代码:
consumer.setConsumerGroup("group1");
consumer.subscribe(topic,"Tag2||Tag1");
consumer.registerMessageListener((MessageListenerConcurrently)(msgs, context)->{
// do something
}