package com.baidu.mrd.novel.novelNews;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringEncoder;
import org.apache.log4j.Logger;
public class ReCrawl implements Runnable{
private static final Logger LOG = Logger.getLogger(ReCrawl.class);
private static String topic="chapter.apply.review";
final int numThreads = 4;
Properties properties;
public ReCrawl() {
//cms kafka配置
properties = new Properties();
properties.put("zookeeper.connect","*******");
properties.put("serializer.class", StringEncoder.class.getName());
properties.put("zookeeper.session.timeout.ms", "15000");
properties.put("group.id", "CMS_Consumer");
}
public void run() {
System.out.println("ReCrawl service start");
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(numThreads));
ConsumerConfig consumerConfig = new ConsumerConfig(properties);
ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
ExecutorService executor = Executors.newFixedThreadPool(numThreads);
for (final KafkaStream stream : streams) {
executor.submit(new ReCrawlThead(stream));
}
}
//消费CMS kafka 获得 _id
//查_id对应的path 重新抓取章节
public static void main(String[] args) {
Thread t = new Thread(new ReCrawl());
t.start();
}
}
Exception in thread "Thread-1" kafka.common.ConsumerRebalanceFailedException: CMS_Consumer_zw_93_179-1487647552758-c6567524 can't rebalance after 5 retries
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:432)
at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:722)
at kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:212)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:80)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:92)
at com.baidu.mrd.novel.novelNews.ReCrawl.run(ReCrawl.java:45)
at java.lang.Thread.run(Thread.java:724)
请问这是什么问题???
消费均衡两次重试之间的时间间隔
rebalance.backoff.ms 2000