问题描述
spring集成RabbitMQ后做各种类型消息的测试,使用confirm模式时有正常发送和消费消息,但是回调没有执行
想法思路
起初猜想是否跟没有开启手动确认有关,尝试过后发现无果
相关代码
- rabbit监听的一些配置
<bean id="rabbitListenerContainerFactory"
class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
<property name="connectionFactory" ref="connectionFactory" />
<property name="concurrentConsumers" value="3" />
<property name="maxConcurrentConsumers" value="10" />
<property name="acknowledgeMode" value="MANUAL" />
</bean>
- 生产者代码
@Autowired
private RabbitTemplate rabbitTemplate;
public void submitConfirm() {
CorrelationData correlationData = new CorrelationData();
correlationData.setId(UUID.randomUUID().toString().replace("-", ""));
UserEntity user = new UserEntity();
user.setUsername("测试订单" + UUID.randomUUID().toString().replace("-", ""));
user.setPassword("123456");
rabbitTemplate.setConfirmCallback((correlation, ack, cause) -> {
System.out.println("correlationData--->" + correlation);
System.out.println(ack);
if (ack) {
System.out.println("正常投递回复...");
//后续执行其他业务...
} else {
System.out.println("投递异常....");
//后续记录等操作...
}
});
rabbitTemplate.convertAndSend("topic-exchange", "order.add", user, correlationData);
}
- 消费者代码
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = "topic-exchange", durable = "true", type = ExchangeTypes.TOPIC),
value = @Queue(value = "topic-queue", durable = "true"),
key = "order.#"
))
@RabbitHandler
public void onMessage(@Payload UserEntity body, @Headers Map<String, Object> headers, Channel channel) throws IOException {
Long tag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
try {
System.out.println("[corfim]接收到消息: " + body);
//后续业务处理...
channel.basicAck(tag, false);
} catch (IOException e) {
System.out.println("异常...");
e.printStackTrace();
channel.basicNack(tag, false,false);
}
}
- 结果打印:
[corfim]接收到消息: UserEntity{username='测试订单ba5a338d44ad4b6daa9fee0de0626dde', password='123456'}
以上就是相关代码及问题描述,还请了解的朋友赐教
没有配置