我有以下程序来使用所有发送到 Kafka 的消息。
from kafka import KafkaConsumer
consumer = KafkaConsumer('my_test_topic',
group_id='my-group',
bootstrap_servers=['my_kafka:9092'])
for message in consumer:
consumer.commit()
print ("%s key=%s value=%s" % (message.topic,message.key,
message.value))
KafkaConsumer.close()
使用上面的程序,我能够使用所有发送到 Kafka 的消息。但是一旦所有消息都被消耗掉,我想关闭没有发生的kafka消费者。我同样需要帮助。
原文由 pankmish 发布,翻译遵循 CC BY-SA 4.0 许可协议
如果我 向 KafkaConsumer 对象提供 consumer_timeout_ms 参数, 我现在可以关闭 kafka 消费者。它接受以毫秒为单位的超时值。下面是代码片段。
在上面的代码中,如果消费者在 1 秒内没有看到任何消息,它将关闭会话。