let client = new kafka.KafkaClient({kafkaHost: 'localhost:9092'});
let Consumer = kafka.Consumer;
let consumer = new Consumer(client, [{ topic: 'message-1', partition: 0 }],{autoCommit: true, fromOffset: true});
client.on('ready', function () {
console.log('client ready')
});
consumer.on('message', function(message) {
console.log(message)
});
consumer.on('offsetOutOfRange', function (e) {
console.log('offsetOutOfRange')
// 发现问题是超出位置 所以在此处设置一下offset
// 但随之而来的是另一个问题offset.fetch()方法拿到的不是最新的消费信息位置,求有经验的大佬给指点一下,kafka-node有没有可是将offset设置到最新的消费位置的方法
offset.fetch(topic, function (err, data) {
consumer.setOffset(topic, partition, data[topic][partition] + 1)
})
});
consumer.on('error', function(error) {
console.log(error)
});