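I'm using RetryOperationsInterceptor for retries. If a message throws an exception and gets retried, do the messages that arrive after it stay blocked the whole time? Or is the message being retried moved to the tail of the queue so that the other messages can be consumed?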
    @Bean
    SimpleRabbitListenerContainerFactory lowLoadRabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory()
        factory.connectionFactory = connectionFactory
        factory.concurrentConsumers = 1
        factory.maxConcurrentConsumers = 1
        factory.recoveryInterval = 1000L
        factory.setAdviceChain(retryOperationsInterceptor())
        return factory
    }
    @Bean
    RetryOperationsInterceptor retryOperationsInterceptor() {
        RetryTemplate retryTemplate = new RetryTemplate()

        // At most 5 attempts in total (the first delivery plus 4 retries)
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(5)

        // Wait 1 minute before the first retry, doubling each time, capped at 1 hour
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy()
        backOffPolicy.setInitialInterval(60000)
        backOffPolicy.setMultiplier(2)
        backOffPolicy.setMaxInterval(3600000)

        retryTemplate.setRetryPolicy(retryPolicy)
        retryTemplate.setBackOffPolicy(backOffPolicy)

        // Log every failed attempt, and the final failure once retries are exhausted
        retryTemplate.registerListener(new RetryListener() {
            @Override
            <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
                return true
            }

            @Override
            <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
                if (throwable != null) {
                    log.error("Failed: Retry count " + context.getRetryCount(), throwable)
                }
            }

            @Override
            <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
                log.error("Retry count " + context.getRetryCount(), throwable)
            }
        })

        RetryOperationsInterceptor interceptor = RetryInterceptorBuilder.stateless()
                .retryOperations(retryTemplate)
                .build()
        return interceptor
    }
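It does not block the following messages forever: once the retries are exhausted, the message is republished or rejected, and consumption moves on. Note, though, that with a stateless interceptor the retries and their back-off waits run on the listener thread, so with a single consumer the later messages do wait while the retries are in progress; the message is not moved to the tail of the queue between attempts.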
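To get a feel for how long one failing message could occupy the single listener thread with the settings from the question (initial interval 60000 ms, multiplier 2, max interval 3600000 ms, 5 attempts), here is a rough sketch in plain Java. It assumes the back-off works the way ExponentialBackOffPolicy is generally described: the first wait equals the initial interval, each later wait is multiplied and capped at the max interval, and there is no wait after the last attempt. The class and method names (BackoffSchedule, waits, total) are made up for illustration and are not part of spring-retry.

```java
import java.util.Arrays;

public class BackoffSchedule {

    // Waits between attempts: one entry per retry, i.e. maxAttempts - 1 entries.
    static long[] waits(long initialMs, double multiplier, long maxMs, int maxAttempts) {
        long[] result = new long[Math.max(0, maxAttempts - 1)];
        long interval = initialMs;
        for (int i = 0; i < result.length; i++) {
            result[i] = Math.min(interval, maxMs);   // cap each wait at the max interval
            if (interval < maxMs) {
                interval = (long) (interval * multiplier);
            }
        }
        return result;
    }

    // Sum of all waits, ignoring the time spent in the listener itself.
    static long total(long[] waits) {
        long sum = 0;
        for (long w : waits) {
            sum += w;
        }
        return sum;
    }

    public static void main(String[] args) {
        long[] w = waits(60_000L, 2.0, 3_600_000L, 5);
        System.out.println(Arrays.toString(w));               // [60000, 120000, 240000, 480000]
        System.out.println(total(w) / 60_000L + " minutes");  // 15 minutes
    }
}
```

Under these assumptions a message that fails all 5 attempts keeps the consumer busy for roughly 15 minutes of waiting alone, which is worth keeping in mind when concurrentConsumers is 1.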
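You can see this from the spring-amqp source: the responsibility is delegated to a MessageRecoverer. Two commonly used implementations of this interface in spring-amqp are RejectAndDontRequeueRecoverer and RepublishMessageRecoverer. As the names suggest, they are two different strategies: the former rejects the message without requeueing it, so it ends up in the dead-letter queue if one is configured; the latter republishes it. The comments on the two implementation classes explain the mechanism in more detail.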
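As an illustration of how a recoverer can be chosen explicitly, the following is a minimal sketch in plain Java, assuming Spring AMQP 2.x or 3.x with spring-retry on the classpath. The exchange name "retry.failed.exchange" and routing key "retry.failed" are placeholders invented for this example, and the retry settings simply mirror the ones in the question; treat it as a starting point rather than the definitive configuration.

```java
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.interceptor.RetryOperationsInterceptor;

@Configuration
public class RetryRecovererConfig {

    @Bean
    RetryOperationsInterceptor retryOperationsInterceptor(AmqpTemplate amqpTemplate) {
        return RetryInterceptorBuilder.stateless()
                .maxAttempts(5)                              // same limit as in the question
                .backOffOptions(60_000L, 2.0, 3_600_000L)    // initial interval, multiplier, max interval
                // Called only after all attempts have failed; the exchange and
                // routing key below are hypothetical names for this sketch.
                .recoverer(new RepublishMessageRecoverer(
                        amqpTemplate, "retry.failed.exchange", "retry.failed"))
                .build();
    }
}
```

Swapping in new RejectAndDontRequeueRecoverer() instead would reject the exhausted message so that the broker can dead-letter it, provided the queue has a dead-letter exchange configured.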