@KafkaListener(topics = {"${kafka.topic.topicB}"}, groupId = "groupB")
public void consumeTopicB(ConsumerRecord<String, String> consumerRecord, Acknowledgment acknowledgment) {
Optional<?> kafkaMessage = Optional.ofNullable(consumerRecord.value());
if(kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
/*
* 执行消费逻辑处理的代码,
*
*/
acknowledgment.acknowledge();// 消费成功后手动提交offset
logger.info("消费者B消费topicB:{} partition:{}的消息 -> {}", consumerRecord.topic(), consumerRecord.partition(),message);
}
比如在上面的消费逻辑处理过程中,失败了。那么此条消费要怎么处理呢?我是设置手动提交offset的。
我的思路是这样的:
如果失败了以后,把失败的数据存入到数据库中,然后在提交offset。然后后续在定时的从数据库中把失败的数据再次发送到对应的topic下,等待下次的消费。
但是这样的话有个问题,比如某条消息一直失败,不可能无限重复上面的操作吧?
所以我想的是在消息模型中添加一个失败重试次数属性:
public class KafkaMsg implements Serializable {
private static final long serialVersionUID = -1532915942422600087L;
private String msgId;
private String content;
private Integer retryTime; // 重试次数记录
public String getMsgId() {
return msgId;
}
public String getContent() {
return content;
}
public void setMsgId(String msgId) {
this.msgId = msgId;
}
public void setContent(String content) {
this.content = content;
}
public Integer getRetryTime() {
return retryTime;
}
public Integer setRetryTime(Integer time) {
this.retryTime = time;
}
@Override
public String toString() {
return "KafkaMsg{" +
"msgId='" + msgId + '\'' +
", content='" + content + '\'' +
'}';
}
}
然后消费失败后,先记录一下重试次数再把它存入数据库,然后定时再次发送到topic时,先判断它的重试次数是否达到上限,没有就再次写入topic等待再次被消费
其实不光是Kafka还有rabbitmq消费端消费失败后,重试也可以使用这样的方式处理。
不知道这样处理是否可行,还有其它的处理方式吗。大家讨论一下下
就是这样没错了,我一般设5分钟重试一次,3次失败就不管了,之后手工处理消息。