Rabbitmq confirm 模式回调不生效

问题描述

spring集成RabbitMQ后做各种类型消息的测试,使用confirm模式时有正常发送和消费消息,但是回调没有执行

想法思路

起初猜想是否跟没有开启手动确认有关,尝试过后发现无果

相关代码

  • rabbit监听的一些配置
<bean id="rabbitListenerContainerFactory"
          class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="concurrentConsumers" value="3" />
        <property name="maxConcurrentConsumers" value="10" />
        <property name="acknowledgeMode" value="MANUAL" />
    </bean>
  • 生产者代码
@Autowired
private RabbitTemplate rabbitTemplate;

public void submitConfirm() {
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(UUID.randomUUID().toString().replace("-", ""));
        UserEntity user = new UserEntity();
        user.setUsername("测试订单" + UUID.randomUUID().toString().replace("-", ""));
        user.setPassword("123456");
        rabbitTemplate.setConfirmCallback((correlation, ack, cause) -> {
            System.out.println("correlationData--->" + correlation);
            System.out.println(ack);
            if (ack) {
                System.out.println("正常投递回复...");
                //后续执行其他业务...
            } else {
                System.out.println("投递异常....");
                //后续记录等操作...
            }
        });
        rabbitTemplate.convertAndSend("topic-exchange", "order.add", user, correlationData);
    }
  • 消费者代码
 @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange(value = "topic-exchange", durable = "true", type = ExchangeTypes.TOPIC),
            value = @Queue(value = "topic-queue", durable = "true"),
            key = "order.#"
    ))
    @RabbitHandler
    public void onMessage(@Payload UserEntity body, @Headers Map<String, Object> headers, Channel channel) throws IOException {
        Long tag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        try {
            System.out.println("[corfim]接收到消息: " + body);
            //后续业务处理...
            channel.basicAck(tag, false);
        } catch (IOException e) {
            System.out.println("异常...");
            e.printStackTrace();
            channel.basicNack(tag, false,false);
        }

    }
  • 结果打印:

[corfim]接收到消息: UserEntity{username='测试订单ba5a338d44ad4b6daa9fee0de0626dde', password='123456'}

以上就是相关代码及问题描述,还请了解的朋友赐教

阅读 7k
1 个回答

没有配置

publisher-confirms=true
撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题