Redis Stream: reconnecting after a network drop

微凉

I am using spring-data-redis with LettuceConnectionFactory as the connection factory. The configuration is as follows:

@Bean
    public StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer(PlanStreamMessageListener planStreamMessageListener){

        // Create the options object
        StreamMessageListenerContainer.
                StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>>
                streamMessageListenerContainerOptions =
                StreamMessageListenerContainer.
                        StreamMessageListenerContainerOptions
                        .builder()
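                        // Maximum number of messages fetched per poll
                        .batchSize(1)
                        // Handler for errors raised while consuming messages
                        .errorHandler(e->{
                            log.error(e.getMessage(),e);
                        })
                        // Poll timeout: how long each read blocks waiting for new messages (10 seconds here; 0 would mean no timeout)
                        .pollTimeout(Duration.ofSeconds(10))
                        // Serializer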
                        .serializer(new StringRedisSerializer())
                        .build();
        // Create the listener container from the options object
        StreamMessageListenerContainer<String,MapRecord<String, String, String>> streamMessageListenerContainer =
                StreamMessageListenerContainer
                .create(lettuceConnectionFactory, streamMessageListenerContainerOptions);

        // Start consuming through the listener container (manual acknowledgment mode)
        streamMessageListenerContainer.receive(Consumer.from(redisAppProperties.getGroupId(),
                redisAppProperties.getConsumerId()),
                StreamOffset.create(redisAppProperties.getReceiveStreamKey(), ReadOffset.lastConsumed()), planStreamMessageListener);
        return streamMessageListenerContainer;
    }
   
 @Override
    public void onApplicationEvent(ApplicationStartedEvent event) {
        event
                .getApplicationContext()
                // Roughly like returning a closure in JS (a functional style), mainly convenient for chained calls; it does not give lazy initialization here
                .getBeanProvider(StreamMessageListenerContainer.class)
                .ifAvailable(Lifecycle::start);
    }

Messages are consumed through a listener, and exceptions are already caught inside the listener method.
In use, the following problem appeared:

2020-12-18 09:38:38.408 ERROR 236524 --- [SimpleAsyncTaskExecutor-1] c.b.boot.business.config.RedisConfig     : Redis command timed out; nested exception is io.lettuce.core.RedisCommandTimeoutException: Command timed out after 1 minute(s)

org.springframework.dao.QueryTimeoutException: Redis command timed out; nested exception is io.lettuce.core.RedisCommandTimeoutException: Command timed out after 1 minute(s)
    at org.springframework.data.redis.connection.lettuce.LettuceExceptionConverter.convert(LettuceExceptionConverter.java:70) ~[spring-data-redis-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
    at org.springframework.data.redis.connection.lettuce.LettuceExceptionConverter.convert(LettuceExceptionConverter.java:41) ~[spring-data-redis-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
    at org.springframework.data.redis.PassThroughExceptionTranslationStrategy.translate(PassThroughExceptionTranslationStrategy.java:44) ~[spring-data-redis-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
    at org.springframework.data.redis.FallbackExceptionTranslationStrategy.translate(FallbackExceptionTranslationStrategy.java:42) ~[spring-data-redis-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
    at org.springframework.data.redis.connection.lettuce.LettuceConnection.convertLettuceAccessException(LettuceConnection.java:273) ~[spring-data-redis-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
    at org.springframework.data.redis.connection.lettuce.LettuceStreamCommands.convertLettuceAccessException(LettuceStreamCommands.java:712) ~[spring-data-redis-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
    at org.springframework.data.redis.connection.lettuce.LettuceStreamCommands.xReadGroup(LettuceStreamCommands.java:602) ~[spring-data-redis-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
    at org.springframework.data.redis.connection.DefaultedRedisConnection.xReadGroup(DefaultedRedisConnection.java:591) ~[spring-data-redis-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
    at org.springframework.data.redis.core.DefaultStreamOperations$4.inRedis(DefaultStreamOperations.java:310) ~[spring-data-redis-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
    at org.springframework.data.redis.core.DefaultStreamOperations$RecordDeserializingRedisCallback.doInRedis(DefaultStreamOperations.java:376) ~[spring-data-redis-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
    at org.springframework.data.redis.core.DefaultStreamOperations$RecordDeserializingRedisCallback.doInRedis(DefaultStreamOperations.java:371) ~[spring-data-redis-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
    at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:228) ~[spring-data-redis-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
    at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:188) ~[spring-data-redis-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
    at org.springframework.data.redis.core.AbstractOperations.execute(AbstractOperations.java:96) ~[spring-data-redis-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
    at org.springframework.data.redis.core.DefaultStreamOperations.read(DefaultStreamOperations.java:305) ~[spring-data-redis-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
    at org.springframework.data.redis.stream.DefaultStreamMessageListenerContainer.lambda$getReadFunction$3(DefaultStreamMessageListenerContainer.java:236) ~[spring-data-redis-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
    at org.springframework.data.redis.stream.StreamPollTask.doLoop(StreamPollTask.java:138) [spring-data-redis-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
    at org.springframework.data.redis.stream.StreamPollTask.run(StreamPollTask.java:123) [spring-data-redis-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
    at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_231]
Caused by: io.lettuce.core.RedisCommandTimeoutException: Command timed out after 1 minute(s)
    at io.lettuce.core.ExceptionFactory.createTimeoutException(ExceptionFactory.java:51) ~[lettuce-core-5.3.5.RELEASE.jar!/:5.3.5.RELEASE]
    at io.lettuce.core.LettuceFutures.awaitOrCancel(LettuceFutures.java:119) ~[lettuce-core-5.3.5.RELEASE.jar!/:5.3.5.RELEASE]
    at io.lettuce.core.FutureSyncInvocationHandler.handleInvocation(FutureSyncInvocationHandler.java:75) ~[lettuce-core-5.3.5.RELEASE.jar!/:5.3.5.RELEASE]
    at io.lettuce.core.internal.AbstractInvocationHandler.invoke(AbstractInvocationHandler.java:79) ~[lettuce-core-5.3.5.RELEASE.jar!/:5.3.5.RELEASE]
    at com.sun.proxy.$Proxy208.xreadgroup(Unknown Source) ~[na:na]
    at org.springframework.data.redis.connection.lettuce.LettuceStreamCommands.xReadGroup(LettuceStreamCommands.java:600) ~[spring-data-redis-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
    ... 12 common frames omitted

2020-12-18 10:01:39.805 ERROR 236524 --- [http-nio-8085-exec-5] o.a.c.c.C.[.[.[.[dispatcherServlet]      : Servlet.service() for servlet [dispatcherServlet] in context with path [/business] threw exception [Request processing failed; nested exception is org.springframework.data.redis.RedisSystemException: Redis exception; nested exception is io.lettuce.core.RedisException: java.io.IOException: An existing connection was forcibly closed by the remote host.] with root cause

java.io.IOException: An existing connection was forcibly closed by the remote host.
    at sun.nio.ch.SocketDispatcher.read0(Native Method) ~[na:1.8.0_231]
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:43) ~[na:1.8.0_231]
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) ~[na:1.8.0_231]
    at sun.nio.ch.IOUtil.read(IOUtil.java:192) ~[na:1.8.0_231]
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380) ~[na:1.8.0_231]
    at io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:253) ~[netty-buffer-4.1.54.Final.jar!/:4.1.54.Final]
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1134) ~[netty-buffer-4.1.54.Final.jar!/:4.1.54.Final]
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350) ~[netty-transport-4.1.54.Final.jar!/:4.1.54.Final]
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:151) ~[netty-transport-4.1.54.Final.jar!/:4.1.54.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719) ~[netty-transport-4.1.54.Final.jar!/:4.1.54.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655) ~[netty-transport-4.1.54.Final.jar!/:4.1.54.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581) ~[netty-transport-4.1.54.Final.jar!/:4.1.54.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.54.Final.jar!/:4.1.54.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.54.Final.jar!/:4.1.54.Final]
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.54.Final.jar!/:4.1.54.Final]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.54.Final.jar!/:4.1.54.Final]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_231]

2020-12-18 10:01:39.944  INFO 236524 --- [lettuce-eventExecutorLoop-1-2] i.l.core.protocol.ConnectionWatchdog     : Reconnecting, last destination was /xxx.xx.xx.xxx:6379
2020-12-18 10:01:40.017  INFO 236524 --- [lettuce-nioEventLoop-4-3] i.l.core.protocol.ReconnectionHandler    : Reconnected to xxx.xx.xx.xxx:6379

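Clearly the network dropped at some point. But even after the Redis connection was restored, the messageListener never received another message from the stream. This is frustrating: I cannot guarantee that the network will never drop, yet a single drop is enough to stop Redis Stream consumption. Has anyone run into this problem, and is there a solution?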

1 answer
✓ Accepted
        // Do not cancel the polling task when an exception is thrown at runtime
        StreamOffset<String> streamOffset = StreamOffset.create(redisAppProperties.getReceiveStreamKey(), ReadOffset.lastConsumed());
        Consumer consumer = Consumer.from(redisAppProperties.getGroupId(),
                redisAppProperties.getConsumerId());
        StreamMessageListenerContainer.StreamReadRequest<String> readRequest =
                StreamMessageListenerContainer.StreamReadRequest
                        .builder(streamOffset)
                        .cancelOnError(throwable -> {
                            // A query timeout may mean the network dropped, so do not cancel the task
                            return !(throwable instanceof QueryTimeoutException);
                        }).consumer(consumer)
                        .autoAcknowledge(false)
                        .build();
        streamMessageListenerContainer.register(readRequest, planStreamMessageListener);
  • @see StreamMessageListenerContainer.StreamReadRequestBuilder
  • @see Task#run StreamPollTask#doLoop
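
To see why the predicate matters, here is a small plain-Java model of a poll loop with a cancel-on-error predicate. It is only a sketch under assumptions, not the actual Spring Data Redis code: `PollLoopModel`, its `run` method, and the nested `QueryTimeoutException` are hypothetical stand-ins, and the real `StreamPollTask#doLoop` may differ in detail. The idea it illustrates is that a predicate that always returns true stops the loop on the first error, while the predicate from the answer lets the loop survive a timeout.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

// Simplified model of a poll loop; NOT the real Spring Data Redis implementation.
class PollLoopModel {

    // Hypothetical stand-in for org.springframework.dao.QueryTimeoutException.
    static class QueryTimeoutException extends RuntimeException {
        QueryTimeoutException(String message) {
            super(message);
        }
    }

    /**
     * Simulates one read per entry of polls. A String entry is a message that
     * gets delivered; a RuntimeException entry is thrown by the simulated read.
     * Returns the messages delivered before the loop was cancelled or ran out of polls.
     */
    static List<String> run(List<Object> polls, Predicate<Throwable> cancelOnError) {
        List<String> delivered = new ArrayList<>();
        boolean active = true;
        Iterator<Object> it = polls.iterator();
        while (active && it.hasNext()) {
            try {
                Object next = it.next();
                if (next instanceof RuntimeException) {
                    throw (RuntimeException) next;
                }
                delivered.add((String) next);
            } catch (RuntimeException e) {
                // The predicate decides whether this error ends the loop for good.
                if (cancelOnError.test(e)) {
                    active = false;
                }
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<Object> polls = Arrays.asList("m1", new QueryTimeoutException("timed out"), "m2");
        // Always cancel: the loop stops at the timeout, so "m2" is never read.
        System.out.println(run(polls, t -> true));
        // Predicate from the answer: a timeout does not cancel, so "m2" is still read.
        System.out.println(run(polls, t -> !(t instanceof QueryTimeoutException)));
    }
}
```

In this model the first call delivers only `m1` and the second delivers `m1` and `m2`. Any error other than the timeout still cancels the loop under the second predicate, which matches the intent of the answer: tolerate timeouts that may come from a network drop, but keep stopping on everything else.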