Spring RabbitMQ - using manual channel acknowledgement on a service with @RabbitListener configuration

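How can I acknowledge messages manually instead of using auto-acknowledgement? Is there a way to do this with the @RabbitListener and @EnableRabbit style of configuration? Most of the documentation tells us to use SimpleMessageListenerContainer together with ChannelAwareMessageListener. However, with that approach we lose the flexibility the annotations provide. I have configured my service as follows: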

@Service
public class EventReceiver {

    @Autowired
    private MessageSender messageSender;

    @RabbitListener(queues = "${eventqueue}")
    public void receiveMessage(Order order) throws Exception {
        // code for processing order
    }
}


My RabbitConfiguration is as follows:

@EnableRabbit
public class RabbitApplication implements RabbitListenerConfigurer {

    public static void main(String[] args) {
        SpringApplication.run(RabbitApplication.class, args);
    }

    @Bean
    public MappingJackson2MessageConverter jackson2Converter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        return converter;
    }

    @Bean
    public SimpleRabbitListenerContainerFactory myRabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(rabbitConnectionFactory());
        factory.setMaxConcurrentConsumers(5);
        factory.setMessageConverter((MessageConverter) jackson2Converter());
        // MANUAL: the listener itself must ack or reject each delivery
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }

    @Bean
    public ConnectionFactory rabbitConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost("localhost");
        return connectionFactory;
    }

    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
        registrar.setContainerFactory(myRabbitListenerContainerFactory());
    }

    @Autowired
    private EventReceiver receiver;
}

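Any help on how to fit manual channel acknowledgement into the configuration style above would be appreciated. If we implement ChannelAwareMessageListener, the onMessage signature changes. Can we implement ChannelAwareMessageListener on a service?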

Originally posted by Guru; the translation follows the CC BY-SA 4.0 license

2 answers

Add the Channel to the @RabbitListener method...

@RabbitListener(queues = "${eventqueue}")
public void receiveMessage(Order order, Channel channel,
    @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
    ...
}

...and use the tag in basicAck or basicReject.
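To illustrate how the tag might be used, here is a minimal stand-alone sketch of the "ack on success, reject on failure" pattern. It is not Spring code: AckChannel is a hypothetical stand-in that mirrors only the basicAck and basicReject signatures of com.rabbitmq.client.Channel, and RecordingChannel, receive and process are made-up names so the sketch runs without a broker. In a real listener the same try/catch body would sit inside the @RabbitListener method shown above.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Stand-alone sketch; nothing here talks to RabbitMQ.
class ManualAckSketch {

    /** Hypothetical subset of com.rabbitmq.client.Channel (same two signatures). */
    interface AckChannel {
        void basicAck(long deliveryTag, boolean multiple) throws IOException;
        void basicReject(long deliveryTag, boolean requeue) throws IOException;
    }

    /** Records the calls it receives instead of sending them to a broker. */
    static class RecordingChannel implements AckChannel {
        final List<String> calls = new ArrayList<>();

        @Override
        public void basicAck(long deliveryTag, boolean multiple) {
            calls.add("ack:" + deliveryTag);
        }

        @Override
        public void basicReject(long deliveryTag, boolean requeue) {
            calls.add("reject:" + deliveryTag + ":requeue=" + requeue);
        }
    }

    /** Listener body: ack when processing succeeds, reject without requeue when it fails. */
    static void receive(String payload, AckChannel channel, long tag) throws IOException {
        try {
            process(payload);
            channel.basicAck(tag, false);     // false = acknowledge only this delivery
        } catch (IllegalArgumentException e) {
            channel.basicReject(tag, false);  // false = do not put the message back on the queue
        }
    }

    /** Hypothetical business logic: blank payloads count as failures. */
    static void process(String payload) {
        if (payload == null || payload.trim().isEmpty()) {
            throw new IllegalArgumentException("empty payload");
        }
    }

    public static void main(String[] args) throws IOException {
        RecordingChannel channel = new RecordingChannel();
        receive("order-1", channel, 1L);
        receive("", channel, 2L);
        System.out.println(channel.calls); // prints [ack:1, reject:2:requeue=false]
    }
}
```

Whether a failed message should be requeued or dropped (or dead-lettered) depends on the application; the sketch rejects without requeue only to keep the example short.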

EDIT

@SpringBootApplication
@EnableRabbit
public class So38728668Application {

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So38728668Application.class, args);
        context.getBean(RabbitTemplate.class).convertAndSend("", "so38728668", "foo");
        context.getBean(Listener.class).latch.await(60, TimeUnit.SECONDS);
        context.close();
    }

    @Bean
    public Queue so38728668() {
        return new Queue("so38728668");
    }

    @Bean
    public Listener listener() {
        return new Listener();
    }

    public static class Listener {

        private final CountDownLatch latch = new CountDownLatch(1);

        @RabbitListener(queues = "so38728668")
        public void receive(String payload, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
                throws IOException {
            System.out.println(payload);
            channel.basicAck(tag, false);
            latch.countDown();
        }

    }

}

application.properties:

 spring.rabbitmq.listener.acknowledge-mode=manual

Originally posted by Gary Russell; the translation follows the CC BY-SA 3.0 license

In case you do need to use #onMessage() from the ChannelAwareMessageListener class, you can do it like this.

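@Component
public class MyMessageListener implements ChannelAwareMessageListener {

    // Assumption: an SLF4J logger; the original snippet used `log` without declaring it
    private static final Logger log = LoggerFactory.getLogger(MyMessageListener.class);

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        log.info("Message received.");
        // do something with the message
        // basicAck throws a checked IOException, hence `throws Exception` above
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}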

And for the Rabbit configuration:

@Configuration
public class RabbitConfig {

    public static final String topicExchangeName = "exchange1";

    public static final String queueName = "queue1";

    public static final String routingKey = "queue1.route.#";

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
        connectionFactory.setUsername("xxxx");
        connectionFactory.setPassword("xxxxxxxxxx");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("vHost1");
        return connectionFactory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        return new RabbitTemplate(connectionFactory());
    }

    @Bean
    Queue queue() {
        return new Queue(queueName, true);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange(topicExchangeName);
    }

    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(routingKey);
    }

    @Bean
    public SimpleMessageListenerContainer listenerContainer(MyMessageListener myRabbitMessageListener) {
        SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
        listenerContainer.setConnectionFactory(connectionFactory());
        listenerContainer.setQueueNames(queueName);
        listenerContainer.setMessageListener(myRabbitMessageListener);
        listenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        listenerContainer.setConcurrency("4");
        listenerContainer.setPrefetchCount(20);
        return listenerContainer;
    }
}

Originally posted by Parisana Ng; the translation follows the CC BY-SA 4.0 license
