searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

Kafka消费者rebalance耗时较长的原因分析

2023-04-17 09:09:06
229
0

一、现象

topic1有3个分区

创建consumer1加入group,订阅topic,调poll试图拉取一条消息

创建consumer2加入group,订阅topic,调poll试图拉取一条消息

consumer2调poll时,该topic触发rebalance,开始重新分配分区,该rebalance过程长达几分钟,期间所有消费者无法消费该topic消息

 

二、rebalance过程

consumer2加入group时,该topic触发rebalance,所有消费者都要重新加入group,包括原来的消费者consumer1;

consumer1会定期发心跳,kafka通过心跳回包通知consumer1重新加入group,然后consumer1发送JoinGroup请求;

如果consumer1、consumer2都很快发送了JoinGroup,并走完了后面的流程,rebalance会很快结;

heartbeat.interval.ms默认3s,默认情况下,消费者每隔3s发一次心跳,因此,新消费者加入group,rebalance一般也可以在3s内完成。

 

三、长时间rebalance原因

本案例观察kafka收发包日志

consumer2发送JoinGroup,开始rebalance,此处正常;然后consumer1发送心跳,kafka回包errorcode=27(REBALANCE_IN_PROGRESS),通知consumer1重新加入group,但consumer1没有发送JoinGroup,kafka一直等待,直到rebalance超时结束,consumer1被踢出group,最后仅consumer2留在该group,整个rebalance过程通常持续几分钟(max.poll.interval.ms默认5min)

 

四、consumer1为何不发送JoinGroup

分析kafka消费者client源码,消费者收到心跳回包,得知开始rebalance,仅仅设置rejoinNeeded标志。

private class HeartbeatResponseHandler extends CoordinatorResponseHandler<HeartbeatResponse, Void> {
	public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) {
		...
		if (error == Errors.REBALANCE_IN_PROGRESS) {
			...
			requestRejoin("group is already rebalancing");
			...
		}
		...
	}
}
public synchronized void requestRejoin(final String shortReason, final String fullReason) {
    ...
    this.rejoinNeeded = true;
}

 

真正发送JoinGroup的逻辑在poll执行

public boolean poll(Timer timer, boolean waitForJoinGroup) {
       ...
       if (rejoinNeededOrPending()) {
              发送JoinGroup
       }
       ...
}
protected synchronized boolean rejoinNeededOrPending() {
    return rejoinNeeded || joinFuture != null;
}

 

根据上面的代码分析,可以得出结论:当consumer1消费一条消息之后,不再调poll拉消息,即使心跳回包知道开始rebalance,只会设置rejoinNeeded标志,不会发送JoinGroup,从而导致kafka一直等待consumer1加入group,直到超时。

 

五、解决方法

要想快速完成rebalance,有几个应对方法:

1、消费程序保持常态poll消息,比如在循环或定时器中反复调poll从而能及时发送JoinGroup;

2、消费者心跳回调方法添加JoinGroup发送,不依赖poll的执行

 

六、消费程序优化

kafka client的poll接口有两个版本:poll(long)、poll(Duration)

public ConsumerRecords<K, V> poll(final long timeoutMs) {
    return poll(time.timer(timeoutMs), false);
}
public ConsumerRecords<K, V> poll(final Duration timeout) {
    return poll(time.timer(timeout), true);
}
private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout);

poll(long)的实现,consumer会一直阻塞直到它成功获取了所需的元数据信息,之后它才会发起fetch请求去获取数据。虽然poll可以指定超时时间,但这个超时时间只适用于后面的消息获取,前面更新元数据信息不计入这个超时时间,所以设定的timout时间可能失效。当rebalance过程长时间未结束,poll(long)会阻塞超过timeout时间;

poll(Duration)的实现修改了设计,会把元数据获取也计入整个超时时间,从而避免了无效等待。

在rebalance过程可能时间很长的情景下,使用poll(Duration)接口可以避免消费程序阻塞。

0条评论
作者已关闭评论
a****k
16文章数
0粉丝数
a****k
16 文章 | 0 粉丝
原创

Kafka消费者rebalance耗时较长的原因分析

2023-04-17 09:09:06
229
0

一、现象

topic1有3个分区

创建consumer1加入group,订阅topic,调poll试图拉取一条消息

创建consumer2加入group,订阅topic,调poll试图拉取一条消息

consumer2调poll时,该topic触发rebalance,开始重新分配分区,该rebalance过程长达几分钟,期间所有消费者无法消费该topic消息

 

二、rebalance过程

consumer2加入group时,该topic触发rebalance,所有消费者都要重新加入group,包括原来的消费者consumer1;

consumer1会定期发心跳,kafka通过心跳回包通知consumer1重新加入group,然后consumer1发送JoinGroup请求;

如果consumer1、consumer2都很快发送了JoinGroup,并走完了后面的流程,rebalance会很快结;

heartbeat.interval.ms默认3s,默认情况下,消费者每隔3s发一次心跳,因此,新消费者加入group,rebalance一般也可以在3s内完成。

 

三、长时间rebalance原因

本案例观察kafka收发包日志

consumer2发送JoinGroup,开始rebalance,此处正常;然后consumer1发送心跳,kafka回包errorcode=27(REBALANCE_IN_PROGRESS),通知consumer1重新加入group,但consumer1没有发送JoinGroup,kafka一直等待,直到rebalance超时结束,consumer1被踢出group,最后仅consumer2留在该group,整个rebalance过程通常持续几分钟(max.poll.interval.ms默认5min)

 

四、consumer1为何不发送JoinGroup

分析kafka消费者client源码,消费者收到心跳回包,得知开始rebalance,仅仅设置rejoinNeeded标志。

private class HeartbeatResponseHandler extends CoordinatorResponseHandler<HeartbeatResponse, Void> {
	public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) {
		...
		if (error == Errors.REBALANCE_IN_PROGRESS) {
			...
			requestRejoin("group is already rebalancing");
			...
		}
		...
	}
}
public synchronized void requestRejoin(final String shortReason, final String fullReason) {
    ...
    this.rejoinNeeded = true;
}

 

真正发送JoinGroup的逻辑在poll执行

public boolean poll(Timer timer, boolean waitForJoinGroup) {
       ...
       if (rejoinNeededOrPending()) {
              发送JoinGroup
       }
       ...
}
protected synchronized boolean rejoinNeededOrPending() {
    return rejoinNeeded || joinFuture != null;
}

 

根据上面的代码分析,可以得出结论:当consumer1消费一条消息之后,不再调poll拉消息,即使心跳回包知道开始rebalance,只会设置rejoinNeeded标志,不会发送JoinGroup,从而导致kafka一直等待consumer1加入group,直到超时。

 

五、解决方法

要想快速完成rebalance,有几个应对方法:

1、消费程序保持常态poll消息,比如在循环或定时器中反复调poll从而能及时发送JoinGroup;

2、消费者心跳回调方法添加JoinGroup发送,不依赖poll的执行

 

六、消费程序优化

kafka client的poll接口有两个版本:poll(long)、poll(Duration)

public ConsumerRecords<K, V> poll(final long timeoutMs) {
    return poll(time.timer(timeoutMs), false);
}
public ConsumerRecords<K, V> poll(final Duration timeout) {
    return poll(time.timer(timeout), true);
}
private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout);

poll(long)的实现,consumer会一直阻塞直到它成功获取了所需的元数据信息,之后它才会发起fetch请求去获取数据。虽然poll可以指定超时时间,但这个超时时间只适用于后面的消息获取,前面更新元数据信息不计入这个超时时间,所以设定的timout时间可能失效。当rebalance过程长时间未结束,poll(long)会阻塞超过timeout时间;

poll(Duration)的实现修改了设计,会把元数据获取也计入整个超时时间,从而避免了无效等待。

在rebalance过程可能时间很长的情景下,使用poll(Duration)接口可以避免消费程序阻塞。

文章来自个人专栏
云组件
16 文章 | 1 订阅
0条评论
作者已关闭评论
作者已关闭评论
0
0