我的Kafka在客户端每次上线都会有日志,我看我的一个storm程序中使用了producer,而且这个程序我就启动了一次然后一直跑,如果没有问题的话,日志中就只有一条producer上线的日志,但是我的日志里面有4000多条。storm程序什么时候会重启?
public class MyBolt extends BaseBasicBolt {
private MyProducer producer;
private MyHolder holder;
@Override
public void prepare(Map stormConf, TopologyContext context) {
try{
producer = new MyProducer() ;
} catch (Exception e){
logger.error(e.getMessage());
}
holder = new MyHolder(5);
}
@Override
public void execute(Tuple tuple, MyCollector collector) {
ProducerResult result = producer.sendMessage(msg);
if (result.getProducerStatus().equals(ProducerStatus.SEND_FAILURE)){
System.out.println("fail") ;
} else if (result.getProducerStatus().equals(ProducerStatus.SEND_OK)){
System.out.println("success") ;
}
}
}
} catch (Exception e) {
StringBuilder errorMsg = new StringBuilder();
errorMsg.append(msg).append("\n #### ").append(e.getMessage());
logger.error(errorMsg.toString(), e);
eMetric.incr();
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields(Constants.FIELD, FIELD));
}
}