Kafka consumer: can't rebalance after 5 retries (ConsumerRebalanceFailedException)

package com.baidu.mrd.novel.novelNews;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringEncoder;

import org.apache.log4j.Logger;

/**
 * Consumes the {@code chapter.apply.review} Kafka topic with the ZooKeeper-based
 * high-level consumer and hands every resulting stream to a {@code ReCrawlThead}
 * worker running on a fixed-size thread pool.
 *
 * <p>Per the original author's note: the messages carry an {@code _id}; the path
 * for that {@code _id} is looked up and the chapter is crawled again. That work is
 * done by {@code ReCrawlThead}, which is not visible in this file.
 */
public class ReCrawl implements Runnable{
    private static final Logger LOG = Logger.getLogger(ReCrawl.class);

    /** Topic to consume. */
    private static String topic="chapter.apply.review";
    /** Number of streams requested for the topic, and size of the worker pool. */
    final int numThreads = 4;
    /** Consumer configuration, populated by the constructor. */
    Properties properties;

    /**
     * Builds the consumer configuration for the CMS Kafka cluster.
     */
    public ReCrawl() {
        // CMS Kafka configuration
        properties = new Properties();
        properties.put("zookeeper.connect","*******");
        // NOTE(review): serializer.class is a producer-side setting; presumably the
        // consumer ignores it -- confirm and remove if so.
        properties.put("serializer.class", StringEncoder.class.getName());
        properties.put("zookeeper.session.timeout.ms", "15000");
        properties.put("group.id", "CMS_Consumer");
        // Rebalancing can only succeed once stale ephemeral registrations of this
        // group have expired in ZooKeeper, so the total retry window
        // (rebalance.max.retries * rebalance.backoff.ms = 20000 ms) is kept larger
        // than zookeeper.session.timeout.ms (15000 ms). With a shorter window the
        // consumer can give up with ConsumerRebalanceFailedException.
        properties.put("rebalance.backoff.ms", "4000");
        properties.put("rebalance.max.retries", "5");
    }

    /**
     * Connects to Kafka, creates {@code numThreads} streams for the topic and
     * submits one {@code ReCrawlThead} per stream to a fixed thread pool.
     *
     * <p>If stream creation fails, the connector is shut down before the exception
     * is rethrown. On success, a JVM shutdown hook is registered that shuts down
     * the connector and the executor.
     *
     * @throws RuntimeException whatever {@code createMessageStreams} throws
     *         (for example {@code kafka.common.ConsumerRebalanceFailedException}),
     *         rethrown unchanged after cleanup
     */
    @Override
    public void run() {
        System.out.println("ReCrawl service start");
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, Integer.valueOf(numThreads));
        ConsumerConfig consumerConfig = new ConsumerConfig(properties);
        final ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);

        List<KafkaStream<byte[], byte[]>> streams;
        try {
            Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
            streams = consumerMap.get(topic);
        } catch (RuntimeException e) {
            // Release the connector so a failed start does not leave it open.
            LOG.error("Failed to create message streams for topic " + topic, e);
            consumer.shutdown();
            throw e;
        }

        final ExecutorService executor = Executors.newFixedThreadPool(numThreads);
        // Raw KafkaStream kept on purpose: the parameter type of ReCrawlThead's
        // constructor is not visible from this file.
        for (final KafkaStream stream : streams) {
            executor.submit(new ReCrawlThead(stream));
        }

        // The pool threads keep the JVM alive; release the connector and the pool
        // when the process is asked to exit.
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                consumer.shutdown();
                executor.shutdown();
            }
        }));
    }

    /**
     * Entry point: starts the consumer on its own thread.
     *
     * <p>Original author's note: consume the CMS Kafka topic to obtain {@code _id},
     * look up the path for that {@code _id}, and re-crawl the chapter.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Thread t = new Thread(new ReCrawl());
        t.start();
    }

}
Exception in thread "Thread-1" kafka.common.ConsumerRebalanceFailedException: CMS_Consumer_zw_93_179-1487647552758-c6567524 can't rebalance after 5 retries
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:432)
        at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:722)
        at kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:212)
        at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:80)
        at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:92)
        at com.baidu.mrd.novel.novelNews.ReCrawl.run(ReCrawl.java:45)
        at java.lang.Thread.run(Thread.java:724)

请问这是什么问题???

阅读 5.9k
1 个回答

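`rebalance.backoff.ms` 是消费者 rebalance 两次重试之间的时间间隔,默认值:

rebalance.backoff.ms 2000

出现这个异常,通常是因为 `rebalance.max.retries × rebalance.backoff.ms` 小于 `zookeeper.session.timeout.ms`(本例中为 15000):同一个 group 里旧消费者在 ZooKeeper 上注册的临时节点还没有过期,新消费者就已经把重试次数用完了。调大 `rebalance.backoff.ms` 或 `rebalance.max.retries`,使两者的乘积大于 ZooKeeper 会话超时时间即可,例如 `rebalance.backoff.ms=4000`、`rebalance.max.retries=5`。同时确认同一个 group 下没有残留的旧消费者进程。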

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题