searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

Kafka 参数使用介绍

2023-03-24 07:47:39
48
0

    通过Kafka原理介绍的介绍,大家对kafka的原理也有了一定的了解了,那么我们怎么根据这些原理来更好的使用kafka呢?我们大家都知道世界上没有银弹,如果什么都去做就会什么都做不好,我们想要用好一个产品,就要对这个产品进行一些定制化的调整,下面分别就服务端、生产者、消费者三个部分给大家介绍一些对我们业务使用性能有影响的关键配置。

关键配置

  1. 优化性能我们有三板斧:批量,压缩,异步
    1. 批量用来减少io次数
    2. 压缩用来减少网络流量
    3. 异步用来减少等待时间

服务端配置

配置

key

默认值

最大能够接收的消息体大小

max.message.bytes

1000012

最小的ISR数量

transaction.state.log.min.isr

2

压缩算法

compression.type

producer

服务器返回数据的最大值

fetch.max.bytes

52428800

等待请求响应的最长时间

request.timeout.ms

30000

生产者配置

配置

key

默认值

是否等待消息提交

acks

1

压缩

compression.type

none

消息收集数量

batch.size

16384

消息收集时间

linger.ms

0

等待请求响应的最长时间

request.timeout.ms

30000

send()返回时间上限

delivery.timeout.ms

120000

重试次数(瞬时错误)

retries

0

发送未确认请求最大数量

max.in.flight.requests.per.connection

5

缓冲内存

buffer.memory

33554432

 

还记得前面介绍的isr的副本吧 ,主节点会将收到的消息复制到isr的副本中,那么怎么确认这个消息提交成功了呢。

  1. 生产者可以在延迟和持久性中, 决定是否等待消息提交. 这个可以在生产者中的acks配置设置。
    1. 可以选择是否等待0, 1 or all(-1)个ISR副本反馈。(当只有少数ISR存活时,会存在ISR失效导致数据丢失问题,可以通过配置最小的ISR数量(transaction.state.log.min.isr)来减少问题)
  2. 端到端块压缩功能,通过compression.type配置。
    1. 由于数据以压缩格式存储在代理上,有效提取的偏移量是压缩的消息边界。因此,对于压缩数据,消耗的偏移量将每次提前一个压缩消息。如果使用者发生故障,这可能会产生重复消费的副作用。
  3. 批量发送
    1. batch.sizelinger.ms满足任意条件就会触发发送。
    2. 消息大小要小于上限max.message.bytes配置,否则实际消息大于服务端值的时候会发送失败 。
    1. 通过batch.size(消息收集数量)和linger.ms(消息收集时间)配置。
  4. 等待请求响应的最长时间:request.timeout.ms
  5. 调用send()返回后报告成功或失败时间的上限:delivery.timeout.ms
    1. 这限制了记录在发送之前将被延迟的总时间,等待来自代理的确认的时间(如果期望)以及允许可重发的发送失败的时间。如果遇到不可恢复的错误,重试已用尽则生产者可能会报告未能早于此配置发送记录。此配置的值应大于或等于request.timeout.mslinger.ms的总和 。
  6. 调重试次数(瞬时错误):retries
    1. 如果设置大于零的值将导致客户端重新发送其发送失败并带有潜在的瞬时错误的任何记录。请注意,允许重试而不将max.in.flight.requests.per.connection设置为1可能会更改记录的顺序,因为如果将两个批次发送到单个分区,并且第一个批次失败并被重试,但是第二个批次成功,则第二批中的数据可能会首先出现。
    2. 另外请注意,如果由delivery.timeout.ms配置的超时在成功确认之前首先到期,则在重试次数用完之前,生产请求将失败。用户通常应该不设置此配置,而是使用delivery.timeout.ms来控制重试行为。
  7. 客户端在阻塞之前将通过单个连接发送的未确认请求的最大数量:max.in.flight.requests.per.connection
  8. 缓冲内存:buffer.memory
    1. 生产者可以用来缓冲等待发送到服务器的记录的总内存字节。如果记录的生产的速度超过了将记录发送到服务器的速度,则生产者将阻塞max.block.ms,此后它将引发异常。

 

消费者配置

配置

key

默认值

返回数据的最大值

fetch.max.bytes

52428800

每个分区返回数据的最大值

replica.fetch.max.bytes

1048576

poll()返回的最大记录数

max.poll.records

500

poll()之间的最大延迟

max.poll.interval.ms

300000

等待请求响应的最长时间

request.timeout.ms

305000

自动提交开关

enable.auto.commit

true

自动提交时间间隔

auto.commit.interval.ms

5000

  1. 批量消费
    1. fetch.max.bytes,服务器一次返回数据的最大值(如果第一条拉取的记录就大于该值,也是可以拉取的),客户端版本小于0.10.2的时候要大于max.message.bytes配置 。
    2. replica.fetch.max.bytes,尝试为每个分区获取的消息的字节数(如果第一条拉取的记录就大于该值,也是可以拉取的)。
  2. 一次调用poll()返回的最大记录数: max.poll.records
  3. 调用poll()之间的最大延迟:max.poll.interval.ms
    1. 控制拉取下来的记录的最大处理时间。
    2. 0.10.1版本之前,如果真实处理时间大于session.timeout.ms,则会触发服务端协调器对于该消费组中其他消费者与分区之间对应关系的重新平衡。
    3. 0.10.1版本之后,kafka引入了单独的心跳线程。
  4. 等待请求响应的最长时间:request.timeout.ms
    1. 要大于max.poll.interval.ms配置,否则有可能在客户端处理完本轮数据,开始下一轮拉取之时的间隔大于request.timeout.ms时间,触发连接超时。
  5. 自动提交
    1. 开关:enable.auto.commit
    2. 自动提交的时间间隔:auto.commit.interval.ms
    3. 为true的时候,自动提交系统将在auto.commit.interval.ms 时间间隔后自动提交处理完的偏移量。

最佳实践

生产者

  1. transaction.state.log.min.isr设置的比kafka服务器失效率大。
  2. compression.type在单个数据量大或者设置批量上传数据时设置,推荐lz4。
  3. batch.size linger.ms生产者对吞吐量要求高,丢失容忍度高情况下配置
    1. 批量数据要比max.message.bytes 小(如果配置了compression.type,就是压缩后的数据)。
  4. acks
    1. 对数据传输速度要求高,丢失容忍度高的时候设置0,
    2. 对数据传输速度要求高,丢失容忍度低的时候设置1,
    3. 不接受数据丢失的时候设置all或者-1,
  5. delivery.timeout.ms需要大于linger.msrequest.timeout.ms之和。

      注意:

  • 我们知道isr数量影响消息持久性和可见性的一个重要因素,太小了会因为机器失效导致集群不可用,大了就需要更多的机器资源
  • 有批量就有丢失,有异步就有丢失

消费者

  1. 拉取间隙:
    1. max.poll.interval.ms要小于request.timeout.ms(2.0版本之前)
    2. fetch.max.bytes replica.fetch.max.bytes需要比max.message.bytes 大(0.10.2版本之前)
  2. 批量拉取:
    1. max.poll.records要考虑批量数据的传输和客户端处理时间小于max.poll.interval.msrequest.timeout.ms
  3. 批量确认:
    1. enable.auto.commitauto.commit.interval.ms高吞吐、接受数据重复消费的情况下设置,auto.commit.interval.ms需要小于max.poll.interval.ms request.timeout.ms

      注意:

消息顺序性

    消息组件有很重要的两个功能:消息的顺序性和消息的不丢失性,kafka什么配置可以做到这两点呢?

  1. 发送消息的时候根据业务id来指定要发送到的分区,可以保证该id消息顺序性。
  2. max.in.flight.requests.per.connection设置为1。

消息不丢失(最少一次)

  1. 生产者
    1. transaction.state.log.min.isr设置为n/2+1。
    2. batch.size设置为1。
    3. acks 设置为all或者-1。
    4. retries设置为Integer.MAX_VALUE。(重试时间超出delivery.timeout.ms时候还是会丢失)
  2. 消费者
    1. enable.auto.commit 设置为false。
    2. 每次处理完数据之后在提交确认偏移量。

      注意:

  • 有批量,必存在丢失的可能性

参考文献

 

0条评论
0 / 1000
肖****睿
13文章数
0粉丝数
肖****睿
13 文章 | 0 粉丝
原创

Kafka 参数使用介绍

2023-03-24 07:47:39
48
0

    通过Kafka原理介绍的介绍,大家对kafka的原理也有了一定的了解了,那么我们怎么根据这些原理来更好的使用kafka呢?我们大家都知道世界上没有银弹,如果什么都去做就会什么都做不好,我们想要用好一个产品,就要对这个产品进行一些定制化的调整,下面分别就服务端、生产者、消费者三个部分给大家介绍一些对我们业务使用性能有影响的关键配置。

关键配置

  1. 优化性能我们有三板斧:批量,压缩,异步
    1. 批量用来减少io次数
    2. 压缩用来减少网络流量
    3. 异步用来减少等待时间

服务端配置

配置

key

默认值

最大能够接收的消息体大小

max.message.bytes

1000012

最小的ISR数量

transaction.state.log.min.isr

2

压缩算法

compression.type

producer

服务器返回数据的最大值

fetch.max.bytes

52428800

等待请求响应的最长时间

request.timeout.ms

30000

生产者配置

配置

key

默认值

是否等待消息提交

acks

1

压缩

compression.type

none

消息收集数量

batch.size

16384

消息收集时间

linger.ms

0

等待请求响应的最长时间

request.timeout.ms

30000

send()返回时间上限

delivery.timeout.ms

120000

重试次数(瞬时错误)

retries

0

发送未确认请求最大数量

max.in.flight.requests.per.connection

5

缓冲内存

buffer.memory

33554432

 

还记得前面介绍的isr的副本吧 ,主节点会将收到的消息复制到isr的副本中,那么怎么确认这个消息提交成功了呢。

  1. 生产者可以在延迟和持久性中, 决定是否等待消息提交. 这个可以在生产者中的acks配置设置。
    1. 可以选择是否等待0, 1 or all(-1)个ISR副本反馈。(当只有少数ISR存活时,会存在ISR失效导致数据丢失问题,可以通过配置最小的ISR数量(transaction.state.log.min.isr)来减少问题)
  2. 端到端块压缩功能,通过compression.type配置。
    1. 由于数据以压缩格式存储在代理上,有效提取的偏移量是压缩的消息边界。因此,对于压缩数据,消耗的偏移量将每次提前一个压缩消息。如果使用者发生故障,这可能会产生重复消费的副作用。
  3. 批量发送
    1. batch.sizelinger.ms满足任意条件就会触发发送。
    2. 消息大小要小于上限max.message.bytes配置,否则实际消息大于服务端值的时候会发送失败 。
    1. 通过batch.size(消息收集数量)和linger.ms(消息收集时间)配置。
  4. 等待请求响应的最长时间:request.timeout.ms
  5. 调用send()返回后报告成功或失败时间的上限:delivery.timeout.ms
    1. 这限制了记录在发送之前将被延迟的总时间,等待来自代理的确认的时间(如果期望)以及允许可重发的发送失败的时间。如果遇到不可恢复的错误,重试已用尽则生产者可能会报告未能早于此配置发送记录。此配置的值应大于或等于request.timeout.mslinger.ms的总和 。
  6. 调重试次数(瞬时错误):retries
    1. 如果设置大于零的值将导致客户端重新发送其发送失败并带有潜在的瞬时错误的任何记录。请注意,允许重试而不将max.in.flight.requests.per.connection设置为1可能会更改记录的顺序,因为如果将两个批次发送到单个分区,并且第一个批次失败并被重试,但是第二个批次成功,则第二批中的数据可能会首先出现。
    2. 另外请注意,如果由delivery.timeout.ms配置的超时在成功确认之前首先到期,则在重试次数用完之前,生产请求将失败。用户通常应该不设置此配置,而是使用delivery.timeout.ms来控制重试行为。
  7. 客户端在阻塞之前将通过单个连接发送的未确认请求的最大数量:max.in.flight.requests.per.connection
  8. 缓冲内存:buffer.memory
    1. 生产者可以用来缓冲等待发送到服务器的记录的总内存字节。如果记录的生产的速度超过了将记录发送到服务器的速度,则生产者将阻塞max.block.ms,此后它将引发异常。

 

消费者配置

配置

key

默认值

返回数据的最大值

fetch.max.bytes

52428800

每个分区返回数据的最大值

replica.fetch.max.bytes

1048576

poll()返回的最大记录数

max.poll.records

500

poll()之间的最大延迟

max.poll.interval.ms

300000

等待请求响应的最长时间

request.timeout.ms

305000

自动提交开关

enable.auto.commit

true

自动提交时间间隔

auto.commit.interval.ms

5000

  1. 批量消费
    1. fetch.max.bytes,服务器一次返回数据的最大值(如果第一条拉取的记录就大于该值,也是可以拉取的),客户端版本小于0.10.2的时候要大于max.message.bytes配置 。
    2. replica.fetch.max.bytes,尝试为每个分区获取的消息的字节数(如果第一条拉取的记录就大于该值,也是可以拉取的)。
  2. 一次调用poll()返回的最大记录数: max.poll.records
  3. 调用poll()之间的最大延迟:max.poll.interval.ms
    1. 控制拉取下来的记录的最大处理时间。
    2. 0.10.1版本之前,如果真实处理时间大于session.timeout.ms,则会触发服务端协调器对于该消费组中其他消费者与分区之间对应关系的重新平衡。
    3. 0.10.1版本之后,kafka引入了单独的心跳线程。
  4. 等待请求响应的最长时间:request.timeout.ms
    1. 要大于max.poll.interval.ms配置,否则有可能在客户端处理完本轮数据,开始下一轮拉取之时的间隔大于request.timeout.ms时间,触发连接超时。
  5. 自动提交
    1. 开关:enable.auto.commit
    2. 自动提交的时间间隔:auto.commit.interval.ms
    3. 为true的时候,自动提交系统将在auto.commit.interval.ms 时间间隔后自动提交处理完的偏移量。

最佳实践

生产者

  1. transaction.state.log.min.isr设置的比kafka服务器失效率大。
  2. compression.type在单个数据量大或者设置批量上传数据时设置,推荐lz4。
  3. batch.size linger.ms生产者对吞吐量要求高,丢失容忍度高情况下配置
    1. 批量数据要比max.message.bytes 小(如果配置了compression.type,就是压缩后的数据)。
  4. acks
    1. 对数据传输速度要求高,丢失容忍度高的时候设置0,
    2. 对数据传输速度要求高,丢失容忍度低的时候设置1,
    3. 不接受数据丢失的时候设置all或者-1,
  5. delivery.timeout.ms需要大于linger.msrequest.timeout.ms之和。

      注意:

  • 我们知道isr数量影响消息持久性和可见性的一个重要因素,太小了会因为机器失效导致集群不可用,大了就需要更多的机器资源
  • 有批量就有丢失,有异步就有丢失

消费者

  1. 拉取间隙:
    1. max.poll.interval.ms要小于request.timeout.ms(2.0版本之前)
    2. fetch.max.bytes replica.fetch.max.bytes需要比max.message.bytes 大(0.10.2版本之前)
  2. 批量拉取:
    1. max.poll.records要考虑批量数据的传输和客户端处理时间小于max.poll.interval.msrequest.timeout.ms
  3. 批量确认:
    1. enable.auto.commitauto.commit.interval.ms高吞吐、接受数据重复消费的情况下设置,auto.commit.interval.ms需要小于max.poll.interval.ms request.timeout.ms

      注意:

消息顺序性

    消息组件有很重要的两个功能:消息的顺序性和消息的不丢失性,kafka什么配置可以做到这两点呢?

  1. 发送消息的时候根据业务id来指定要发送到的分区,可以保证该id消息顺序性。
  2. max.in.flight.requests.per.connection设置为1。

消息不丢失(最少一次)

  1. 生产者
    1. transaction.state.log.min.isr设置为n/2+1。
    2. batch.size设置为1。
    3. acks 设置为all或者-1。
    4. retries设置为Integer.MAX_VALUE。(重试时间超出delivery.timeout.ms时候还是会丢失)
  2. 消费者
    1. enable.auto.commit 设置为false。
    2. 每次处理完数据之后在提交确认偏移量。

      注意:

  • 有批量,必存在丢失的可能性

参考文献

 

文章来自个人专栏
大数据
6 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0