我正在使用 Java 编写一个 kafka
消费者。我想保持消息的实时性,所以如果等待消费的消息过多,比如1000条或更多,我应该放弃未消费的消息,从最后一个偏移量开始消费。
对于这个问题,我尝试比较最后提交的偏移量和主题的结束偏移量(只有 1 个分区),如果这两个偏移量之间的差异大于一定量,我将设置主题的最后提交的偏移量为下一个偏移量,以便我可以放弃那些冗余消息。
现在我的问题是如何获取topic的end offset,有人说可以用old consumer,但是太复杂了,new consumer有这个功能吗?
原文由 Neptune 发布,翻译遵循 CC BY-SA 4.0 许可协议
新消费者也很复杂。
//assign the topic consumer.assign();
//seek to end of the topic consumer.seekToEnd();
//the position is the latest offset consumer.position();