rabbitmq某消息重试会导致其后到的消息被阻塞吗?

esolve
  • 947

利用RetryOperationsInterceptor做重试机制,假如某个消息抛异常重试,会导致这个消息之后的来的消息一直被阻塞吗?还是这个重试的消息被插到队尾,让其他消息消费?

       @Bean
        SimpleRabbitListenerContainerFactory lowLoadRabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory()
            factory.connectionFactory = connectionFactory
            factory.concurrentConsumers = 1
            factory.maxConcurrentConsumers = 1
            factory.recoveryInterval = 1000L
            factory.setAdviceChain(retryOperationsInterceptor())
            return factory
        }

        @Bean
        RetryOperationsInterceptor retryOperationsInterceptor() {
            RetryTemplate retryTemplate = new RetryTemplate()
            RetryPolicy retryPolicy = new SimpleRetryPolicy(Integer.MAX_VALUE)
            retryPolicy.setMaxAttempts(5)
            ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy()
            backOffPolicy.setInitialInterval(60000)
            backOffPolicy.setMultiplier(2)
            backOffPolicy.setMaxInterval(3600000)
            retryTemplate.setRetryPolicy(retryPolicy)
            retryTemplate.setBackOffPolicy(backOffPolicy)
            retryTemplate.registerListener(new RetryListener() {
                @Override
                <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
                    return true
                }

                @Override
                <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
                    if (throwable != null) {
                        log.error("Failed: Retry count " + context.getRetryCount(), throwable)
                    }
                }

                @Override
                <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
                    log.error("Retry count " + context.getRetryCount(), throwable)
                }
            })
            RetryOperationsInterceptor interceptor = RetryInterceptorBuilder.stateless()
                    .retryOperations(retryTemplate)
                    .build()

            return interceptor
        }
回复
阅读 2k
1 个回答

会重新发布的,不会阻塞后面的

通过看spring-amqp的源码知道。这个职责是委托给MessageRecoverer来完成的。

package org.springframework.amqp.rabbit.retry;

import org.springframework.amqp.core.Message;

/**
 * @author Dave Syer
 *
 */
public interface MessageRecoverer {

    /**
     * Callback for message that was consumed but failed all retry attempts.
     *
     * @param message the message to recover
     * @param cause the cause of the error
     */
    void recover(Message message, Throwable cause);

}

clipboard.png

这个接口在spring-amqp中有两个实现,一个是 RejectAndDontRequeueRecovererRepublishMessageRecoverer,见名知意,有两个策略。前一种是放入死信队列,后一种是重新发布。更详细的机制看这两个实现类的注释会更加的清晰。

clipboard.png

--- RepublishMessageRecoverer


    @Override
    public void recover(Message message, Throwable cause) {
        Map<String, Object> headers = message.getMessageProperties().getHeaders();
        headers.put(X_EXCEPTION_STACKTRACE, getStackTraceAsString(cause));
        headers.put(X_EXCEPTION_MESSAGE, cause.getCause() != null ? cause.getCause().getMessage() : cause.getMessage());
        headers.put(X_ORIGINAL_EXCHANGE, message.getMessageProperties().getReceivedExchange());
        headers.put(X_ORIGINAL_ROUTING_KEY, message.getMessageProperties().getReceivedRoutingKey());
        Map<? extends String, ? extends Object> additionalHeaders = additionalHeaders(message, cause);
        if (additionalHeaders != null) {
            headers.putAll(additionalHeaders);
        }

        if (null != this.errorExchangeName) {
            String routingKey = this.errorRoutingKey != null ? this.errorRoutingKey : this.prefixedOriginalRoutingKey(message);
            this.errorTemplate.send(this.errorExchangeName, routingKey, message);
            if (this.logger.isWarnEnabled()) {
                this.logger.warn("Republishing failed message to exchange " + this.errorExchangeName);
            }
        }
        else {
            final String routingKey = this.prefixedOriginalRoutingKey(message);
            this.errorTemplate.send(routingKey, message);
            if (this.logger.isWarnEnabled()) {
                this.logger.warn("Republishing failed message to the template's default exchange with routing key " + routingKey);
            }
        }
    }
你知道吗?

宣传栏