redis断连重启后,stream流监听会失效?

新手上路,请多包涵

公司项目使用了StreamListener进行监听redis stream流消息数据;但每隔十几二十天的就会失效监听不到数据;
初步判断:应该是网络或者连接数等问题导致程序与redis服务断开连接,但问题还是无法定位。
以下是代码,有大佬知道或者遇到过这类问题嘛,还请指教如何解决!

@Bean
    public List<Subscription> subscription(RedisConnectionFactory factory){
        List<Subscription> resultList = new ArrayList<>();
        var options = StreamMessageListenerContainer
                .StreamMessageListenerContainerOptions
                .builder()
                .pollTimeout(Duration.ofSeconds(1))
                .build();
        for (String redisStreamName : redisStreamNames) {
            initStream(redisStreamName,groups[0]);
            var listenerContainer = StreamMessageListenerContainer.create(factory,options);
            Subscription subscription = listenerContainer.receiveAutoAck(Consumer.from(groups[0], this.getClass().getName()),
                    StreamOffset.create(redisStreamName, ReadOffset.lastConsumed()), streamListener);
            resultList.add(subscription);
            listenerContainer.start();
        }
        return resultList;
    }
public class ListenerMessage implements StreamListener<String, MapRecord<String, String, String>> {

    RedisCache redisCache;

    public ListenerMessage(RedisCache redisCache){
        this.redisCache = redisCache;
    }

    @Override
    public void onMessage(MapRecord<String, String, String> entries) {
        try{
            Map<String, String> map = entries.getValue();
            String private_chat = map.get("private_chat");
            MessageSave messageSave = JSON.toJavaObject(JSON.parseObject(private_chat),MessageSave.class);
           
            log.info("当前正在处理:{}",messageSave.getMsgtime());
            QyTagService qyTagService = SpringUtils.getBean(QyTagService.class);
            qyTagService.auditPrivateMessage(messageSave);
            //check用于验证key和对应消息是否一直
            log.info("stream name :{}, body:{}, check:{}",entries.getStream(), map,(entries.getStream().equals(map.get("name"))));
            redisCache.ack(entries.getStream(),"group2",entries.getId().getValue());
            redisCache.delField(entries.getStream(),entries.getId().getValue());
        }catch (Exception e){
            log.error("error message:{}",e.getMessage());
        }
    }
}

redis配置:

  redis:
    expire: 60000 # 过期时间
    database: 0 # Redis使用的库
    host: 
    port: 6379 #端口号
    timeout: 100000        #  连接超时时间(毫秒)
    cache:
      type: redis           #使用redis做缓存
阅读 1.7k
avatarAI BotBETA

这种问题可能是由于以下原因导致的:

  1. Redis连接超时:在你的代码中,可以看到设置了Redis连接的超时时间,但是这个超时时间可能不足以应对长时间的网络不稳定或者Redis服务器的延迟。这可能导致程序与Redis服务断开连接,从而使得stream流监听失效。可以尝试增加超时时间的设置,例如设置为60秒或者更长。
  2. Redis服务器重启:如果Redis服务器在程序断开连接后重启,那么监听可能会失效。这种情况下,可能需要重新连接Redis服务器。
  3. 网络问题:网络问题也可能导致连接丢失。可以尝试检查应用程序和Redis服务器之间的网络连接是否稳定。
  4. 订阅数量限制:Redis Streams对每个连接的订阅数量有限制,如果超过这个限制,可能会导致一些订阅被丢弃。
  5. Redis配置问题:检查Redis配置,确保Redis服务器的配置参数(如超时时间、最大客户端连接数等)与应用程序的需求相匹配。

如果以上方法都不能解决问题,可能需要进一步检查应用程序的日志和监控数据,以了解更多关于问题的详细信息。例如,可以查看应用程序是否有任何与Redis连接或操作相关的错误或异常,以及Redis服务器的状态和性能等。

1 个回答
新手上路,请多包涵

使用cancelOnError方法,返回false

//注册
            var options = StreamMessageListenerContainer.StreamReadRequest
                    .builder(StreamOffset.create(redisStreamName, ReadOffset.lastConsumed()))
                    .cancelOnError(throwable -> {
                        System.out.println("这是一个错误"+throwable);
                        // 不能取消
                        return !(throwable instanceof RuntimeException);
                    })
                    .consumer(Consumer.from(groups[0], this.getClass().getName()))
                    .autoAck(true)
                    .build();
撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题