使用redis stream实现消息队列,消费者在消费时每次都是同一个线程,无法并行消费!!!
创建StreamMessageListenerContainer
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ? extends ObjectRecord<String, ?>> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofSeconds(10))
.batchSize(10)
.targetType(targetClass)
.executor(taskExecutor)
.build();
线程池配置
@Configuration
@EnableAsync
public class RedisExecutorConfig {
@Bean("redisTaskExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
//设置线程池参数信息
taskExecutor.setCorePoolSize(20);
taskExecutor.setMaxPoolSize(35);
taskExecutor.setQueueCapacity(200);
taskExecutor.setKeepAliveSeconds(60);
taskExecutor.setThreadNamePrefix("redis-container-thread-");
taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
taskExecutor.setAwaitTerminationSeconds(60);
//修改拒绝策略为使用当前线程执行
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//初始化线程池
taskExecutor.initialize();
return taskExecutor;
}
}
啥原因呢,为啥这个线程池设置了没效果,每次都是同一个线程名称,有啥好的方法设置消费端并行消费呢?
/**
* Configure a {@link Executor} to run stream polling {@link Task}s.
*
* @param executor must not be null.
* @return {@code this} {@link StreamMessageListenerContainerOptionsBuilder}.
*/
public StreamMessageListenerContainerOptionsBuilder<K, V> executor(Executor executor) {
Assert.notNull(executor, "Executor must not be null!");
this.executor = executor;
return this;
}
进入public StreamMessageListenerContainerOptionsBuilder<K, V> executor(Executor executor)方法查看源码发现注释翻译过来是配置{@link Executor}以运行流轮询{@link Task},难道是这个executor不支持多线程消费吗?
在消费者监听器的onMessage方法上加上@Async("redisTaskExecutor")注解就能实现异步消费了。初步估计应该是redis为了维护消费的顺序,所以在消费时是以轮询的方式处理的吧。