基于spring boot,如何在Redis Stream中实现多线程并行消费消息队列?

使用redis stream实现消息队列,消费者在消费时每次都是同一个线程,无法并行消费!!!

创建StreamMessageListenerContainer

StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ? extends ObjectRecord<String, ?>> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                    .builder()
                    .pollTimeout(Duration.ofSeconds(10)) 
                    .batchSize(10) 
                    .targetType(targetClass)
                    .executor(taskExecutor)
                    .build();

线程池配置

@Configuration
@EnableAsync
public class RedisExecutorConfig {

    @Bean("redisTaskExecutor")
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        //设置线程池参数信息
        taskExecutor.setCorePoolSize(20);
        taskExecutor.setMaxPoolSize(35);
        taskExecutor.setQueueCapacity(200);
        taskExecutor.setKeepAliveSeconds(60);
        taskExecutor.setThreadNamePrefix("redis-container-thread-");
        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        taskExecutor.setAwaitTerminationSeconds(60);
        //修改拒绝策略为使用当前线程执行
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //初始化线程池
        taskExecutor.initialize();
        return taskExecutor;
    }

}

啥原因呢,为啥这个线程池设置了没效果,每次都是同一个线程名称,有啥好的方法设置消费端并行消费呢?

        /**
         * Configure a {@link Executor} to run stream polling {@link Task}s.
         *
         * @param executor must not be null.
         * @return {@code this} {@link StreamMessageListenerContainerOptionsBuilder}.
         */
        public StreamMessageListenerContainerOptionsBuilder<K, V> executor(Executor executor) {

            Assert.notNull(executor, "Executor must not be null!");

            this.executor = executor;
            return this;
        }

进入public StreamMessageListenerContainerOptionsBuilder<K, V> executor(Executor executor)方法查看源码发现注释翻译过来是配置{@link Executor}以运行流轮询{@link Task},难道是这个executor不支持多线程消费吗?

阅读 1.3k
avatarAI BotBETA

在Spring Data Redis中,使用StreamMessageListenerContainer并配置executor以支持多线程消费是正确的方法。然而,如果你发现所有的消息处理都在同一个线程上执行,这可能是由于几个原因造成的:

  1. 配置问题:确保你的StreamMessageListenerContainer确实使用了配置的executor。从你提供的代码看,你已经正确地将taskExecutor设置为了StreamMessageListenerContainerOptionsexecutor
  2. 消费者组(Consumer Group)的单一消费者实例:如果你的Redis Stream消费者组只有一个消费者实例,那么即使你配置了多线程,也只有这一个消费者实例会接收消息。Redis Stream的消费者组设计是确保消息在每个消费者组内只被处理一次。如果你想要并行处理,你需要在同一个消费者组内增加更多的消费者实例,或者在多个消费者组之间进行分布。
  3. Spring Boot版本和依赖库版本:确保你使用的Spring Boot和Spring Data Redis的版本支持多线程消息监听。通常,较新的版本会包含对多线程和并发更好的支持。
  4. 日志和调试:查看应用的日志,特别是关于Spring Data Redis和线程池的日志,以确认是否有任何异常或错误导致线程池没有按预期工作。此外,你可以在消费者方法中添加日志记录,以查看是哪个线程正在处理消息。
  5. Redis配置:检查Redis服务器的配置,确保没有限制连接数或队列的并行处理能力。
  6. 代码验证:在你的消息处理逻辑中,确保没有单线程阻塞的操作(如同步等待外部资源),这可能会阻止其他线程的执行。

解决方案

  1. 增加消费者实例:如果你是在一个Spring Boot应用中,考虑创建多个应用实例或使用Spring Cloud Stream等框架来支持多个消费者实例。
  2. 使用多个消费者组:如果你的应用可以处理这种架构,你可以将消息分发到多个消费者组,每个消费者组有自己的消费者实例和线程池。
  3. 验证和测试:确保你的配置正确无误,并进行充分的测试来验证多线程消费是否按预期工作。
  4. 查阅文档和社区:查看Spring Data Redis和Spring Boot的官方文档,以及相关的社区论坛和Stack Overflow问题,看看是否有其他人遇到了类似的问题并找到了解决方案。
  5. 更新依赖:如果可能的话,尝试更新你的Spring Boot和Spring Data Redis依赖到最新版本,以获得更好的性能和功能支持。
1 个回答

在消费者监听器的onMessage方法上加上@Async("redisTaskExecutor")注解就能实现异步消费了。初步估计应该是redis为了维护消费的顺序,所以在消费时是以轮询的方式处理的吧。

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进