消费完所有消息后如何关闭kafka消费者?

新手上路,请多包涵

我有以下程序来使用所有发送到 Kafka 的消息。

 from kafka import KafkaConsumer

consumer = KafkaConsumer('my_test_topic',
                         group_id='my-group',
                         bootstrap_servers=['my_kafka:9092'])
for message in consumer:
    consumer.commit()
    print ("%s key=%s value=%s" % (message.topic,message.key,
                                          message.value))
KafkaConsumer.close()

使用上面的程序,我能够使用所有发送到 Kafka 的消息。但是一旦所有消息都被消耗掉,我想关闭没有发生的kafka消费者。我同样需要帮助。

原文由 pankmish 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 2k
2 个回答

如果我 向 KafkaConsumer 对象提供 consumer_timeout_ms 参数, 我现在可以关闭 kafka 消费者。它接受以毫秒为单位的超时值。下面是代码片段。

 from kafka import KafkaConsumer

consumer = KafkaConsumer('my_test_topic',
                         group_id='my-group',
                         bootstrap_servers=['my_kafka:9092'],
                         consumer_timeout_ms=1000)
for message in consumer:
    consumer.commit()
    print ("%s key=%s value=%s" % (message.topic,message.key,
                                          message.value))
KafkaConsumer.close()

在上面的代码中,如果消费者在 1 秒内没有看到任何消息,它将关闭会话。

原文由 pankmish 发布,翻译遵循 CC BY-SA 3.0 许可协议

Kafka 配置参数 enable.partition.eof 是你所需要的。将此配置设置为 true 时。每当消费者到达分区末尾时,它将发出 PARTITION_EOF 事件。因此,您可以通过一些回调函数知道何时到达分区的末尾。通过这种方式,您可以选择在到达所有分区的末尾时关闭消费者。

原文由 Joanna 发布,翻译遵循 CC BY-SA 4.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题