如何实现本地缓存自动更新?

背景: 项目里使用了Guava本地缓存,缓存了数据库的一部分数据,项目使用K8S部署,大概有10台左右的机器。当数据库更新时,希望所有机器的缓存同步更新。目前采用的是canal监听binlog + 刷入kafka。基于此场景,所以项目使用了广播模式来消费kafak的消息。

问题:由于机器的数目会基于整体压力动态变化,并不是固定数量,所以我们在项目里并没有写死消费者组ID,而是采用了随机数目的方式。但是每次重启容器都会造成kafka的消息堆积,虽然关掉的这个容器的消费者组状态已经是empty了,但不知为什么,腾讯云会保留empty消费者组然后消息堆积,然后就告警。时间一长就会掩盖真正的消息堆积error。虽然可以每次重启后都去删除empty消费者组,但是这样太麻烦了。以后要是100台机器,部署一次就得删100次(这玩意儿还不能多选只能一个个删)

我想追求的是:每个服务的消费者组ID是固定的。
比如:假设现在有4个节点组成的集群,那么消费者组id最好为consumer1-4. 如果扩容到20台,那么他会自动的变为consumer1-20. 如果其中一个节点挂掉了,比如consumer16挂掉,过了一会k8s自己恢复了,我希望新启动的机器自动基础consumer16这个ID。如果此时缩容了,比如由20台缩容到了10台,我希望消费者组ID变为consumer1-10;

又或者?我不用广播模式,而是所有节点用一个消费者组,其中一个消费者消费后,主动轮询调用所有节点的接口去更新缓存(异步)?那这还得维护一个节点的列表。

大家觉得哪种方式更好?或者有其他解决方法?

阅读 1.7k
avatarAI BotBETA

对于你的问题,我有以下几种解决方案:

方案一:动态消费者组ID

你可以考虑使用一些能够动态管理消费者组的工具,如Spring Cloud Stream。它可以根据你的应用所在的动态集群自动创建和删除消费者组。

你只需要将每个服务看作一个消息代理,服务启动时会自动加入到对应的消费者组中,服务停止时会自动离开消费者组。消费者组ID可以基于服务实例的名称或ID动态生成。这样,即使有节点挂掉或者新加入节点,消费者组也会自动适应。

Spring Cloud Stream支持多种消息中间件,包括Kafka,RabbitMQ等。你可以根据你的需求选择合适的消息中间件。

方案二:使用分布式锁

你可以使用分布式锁来保证同一时间只有一个节点更新缓存。当数据库有更新时,首先获取分布式锁,然后更新缓存,最后释放分布式锁。这样可以避免多个节点同时更新缓存造成的数据不一致问题。

常见的分布式锁有Zookeeper,Redis等。你可以根据项目需求选择合适的分布式锁。

方案三:使用消息队列进行通知

当数据库有更新时,将更新事件发送到消息队列中,然后由一个单独的服务消费消息队列中的事件,并通知其他节点更新缓存。这样可以保证所有节点都能及时获取到数据库的更新事件,并更新本地缓存。

你可以选择Kafka,RabbitMQ等作为消息队列。

以上三种方案都可以解决你的问题,你可以根据你的项目需求选择合适的方案。

2 个回答

当节点的数量较多,使用本地缓存不是个好选择,可以使用缓存服务来实现,比如redis。当数据库更新时,canal监听binlog之后发现数据更新,则将redis中的缓存数据进行更新

新手上路,请多包涵

听起来你的核心诉求是需要将数据更新这一事件,实时广播到各个节点。
我们的场景是下发规则/配置信息到各个节点,采用的是 redis 的发布-订阅能力,将内容双写在redis, 通过发布订阅事件触发各节点的拉取redis动作;
你可以参考下面这篇文章
https://segmentfault.com/a/1190000042333233

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题
宣传栏