一、现象
topic1有3个分区
创建consumer1加入group,订阅topic,调poll试图拉取一条消息
创建consumer2加入group,订阅topic,调poll试图拉取一条消息
consumer2调poll时,该topic触发rebalance,开始重新分配分区,该rebalance过程长达几分钟,期间所有消费者无法消费该topic消息
二、rebalance过程
consumer2加入group时,该topic触发rebalance,所有消费者都要重新加入group,包括原来的消费者consumer1;
consumer1会定期发心跳,kafka通过心跳回包通知consumer1重新加入group,然后consumer1发送JoinGroup请求;
如果consumer1、consumer2都很快发送了JoinGroup,并走完了后面的流程,rebalance会很快结;
heartbeat.interval.ms默认3s,默认情况下,消费者每隔3s发一次心跳,因此,新消费者加入group,rebalance一般也可以在3s内完成。
三、长时间rebalance原因
本案例观察kafka收发包日志
consumer2发送JoinGroup,开始rebalance,此处正常;然后consumer1发送心跳,kafka回包errorcode=27(REBALANCE_IN_PROGRESS),通知consumer1重新加入group,但consumer1没有发送JoinGroup,kafka一直等待,直到rebalance超时结束,consumer1被踢出group,最后仅consumer2留在该group,整个rebalance过程通常持续几分钟(max.poll.interval.ms默认5min)
四、consumer1为何不发送JoinGroup
分析kafka消费者client源码,消费者收到心跳回包,得知开始rebalance,仅仅设置rejoinNeeded标志。
private class HeartbeatResponseHandler extends CoordinatorResponseHandler<HeartbeatResponse, Void> {
public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) {
...
if (error == Errors.REBALANCE_IN_PROGRESS) {
...
requestRejoin("group is already rebalancing");
...
}
...
}
}
public synchronized void requestRejoin(final String shortReason, final String fullReason) {
...
this.rejoinNeeded = true;
}
真正发送JoinGroup的逻辑在poll执行
public boolean poll(Timer timer, boolean waitForJoinGroup) {
...
if (rejoinNeededOrPending()) {
发送JoinGroup
}
...
}
protected synchronized boolean rejoinNeededOrPending() {
return rejoinNeeded || joinFuture != null;
}
根据上面的代码分析,可以得出结论:当consumer1消费一条消息之后,不再调poll拉消息,即使心跳回包知道开始rebalance,只会设置rejoinNeeded标志,不会发送JoinGroup,从而导致kafka一直等待consumer1加入group,直到超时。
五、解决方法
要想快速完成rebalance,有几个应对方法:
1、消费程序保持常态poll消息,比如在循环或定时器中反复调poll从而能及时发送JoinGroup;
2、消费者心跳回调方法添加JoinGroup发送,不依赖poll的执行
六、消费程序优化
kafka client的poll接口有两个版本:poll(long)、poll(Duration)
public ConsumerRecords<K, V> poll(final long timeoutMs) {
return poll(time.timer(timeoutMs), false);
}
public ConsumerRecords<K, V> poll(final Duration timeout) {
return poll(time.timer(timeout), true);
}
private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout);
poll(long)的实现,consumer会一直阻塞直到它成功获取了所需的元数据信息,之后它才会发起fetch请求去获取数据。虽然poll可以指定超时时间,但这个超时时间只适用于后面的消息获取,前面更新元数据信息不计入这个超时时间,所以设定的timout时间可能失效。当rebalance过程长时间未结束,poll(long)会阻塞超过timeout时间;
poll(Duration)的实现修改了设计,会把元数据获取也计入整个超时时间,从而避免了无效等待。
在rebalance过程可能时间很长的情景下,使用poll(Duration)接口可以避免消费程序阻塞。