kafka消费者自动提交是如何工作的?

新手上路,请多包涵

我正在读 这个

自动提交提交偏移量的最简单方法是让消费者为您做这件事。如果您配置 enable.auto.commit=true,那么每五秒消费者将提交您的客户端从 poll() 收到的最大偏移量。五秒间隔是默认值,由设置 auto.commit.interval.ms 控制。就像消费者中的其他一切一样,自动提交由轮询循环驱动。每当您进行轮询时,消费者都会检查是否到了提交时间,如果是,它将提交它在上次轮询中返回的偏移量。

也许问题是我的英语不好,但我没有完全理解这个描述。

假设我使用默认间隔自动提交 - 5 秒,轮询每 7 秒发生一次。在这种情况下,提交将每 5 秒或每 7 秒发生一次?

如果轮询每 3 秒发生一次,您能否澄清行为?提交是每 5 秒还是每 6 秒发生一次?

我读过 这个

自动提交:您可以将 auto.commit 设置为 true 并使用以毫秒为单位的值设置 auto.commit.interval.ms 属性。启用此功能后,Kafka 消费者将提交收到的最后一条消息的偏移量以响应其 poll() 调用。 poll() 调用在后台以设置的 auto.commit.interval.ms 发出。

它与答案相矛盾。

你能详细解释一下这个东西吗?

假设我有这样的图表:

0 秒 - 轮询

4 秒 - 轮询

8 秒 - 轮询

offset 什么时候提交,什么时候提交?

原文由 gstackoverflow 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 1.2k
2 个回答

每次轮询都会调用自动提交检查,它会检查经过的时间是否大于配置的时间。如果是,则提交偏移量。

如果提交间隔为 5 秒并且轮询在 7 秒内发生,则提交将仅在 7 秒后发生。

原文由 Liju John 发布,翻译遵循 CC BY-SA 4.0 许可协议

它会尝试在轮询完成后尽快自动提交。你可以看看consumer coordinator的源码,里面定义了一组类级别的局部字段,用来了解是否启用自动提交,间隔时间是多少,下一次执行自动提交的截止日期是什么时候。

https://github.com/apache/kafka/blob/10cd98cc894b88c5d1e24fc54c66361ad9914df2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L625

民意调查中执行存储调用的地方之一 https://github.com/apache/kafka/blob/10cd98cc894b88c5d1e24fc54c66361ad9914df2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ ConsumerCoordinator.java#L279

也就是说,例如轮询每 7 秒执行一次,自动提交设置为 5:

0 - 轮询,+ 将截止日期设置为第 5 秒

7 - 轮询 + 由于截止日期提交,将截止日期更新为 7+5=12

14 - 轮询 + 由于截止日期提交,将截止日期更新为 12+5=17

但是,如果轮询设置为每 3 秒一次,并且自动提交设置为 5:

0 - 轮询,+ 将截止日期设置为第 5 秒

3 - 轮询,不提交

6 - 轮询 + 由于截止日期提交,将截止日期更新为 6+5=11

原文由 zubrabubra 发布,翻译遵循 CC BY-SA 4.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题
logo
Stack Overflow 翻译
子站问答
访问
宣传栏