• 1.3k

如何控制消息队列的消费速度?

一、需求:

比如我消费1000个队列。我将速度等级分为100个等级。

1倍速,每小时消费800个。
100倍速就是每小时消费 800*100个。

这样就可以计算每个队列的消费间隔,比如1倍速间隔是 4500 毫秒。
100倍速就是 45毫秒。

1倍速要搞这个间隔,没问题。
100倍速,45毫秒,这个就有大问题了。

你想想,分布式系统,如何控制所有服务器间隔45毫秒去消费一个消息?就算可以实现,性能也是大打折扣的。

所以,间隔1秒以下,这种方式就必须淘汰了。

二、解决方案

使用节点的方式。

比如1倍速,使用1个节点去消费,100倍速,使用100个节点去消费。
这就等于1倍速使用1个人帮你干活,100倍速,使用100个人帮你干活。这个貌似是比较符合逻辑的,也比较可行的。

但如何均衡又是一个问题。比如我一台服务器10个nodejs节点去消费。10台就是100个节点去消费。

A任务使用1倍速,B任务使用100倍速,1000个任务,100台服务器呢?怎么去群衡,分配合理的资源去消费?

感觉挺复杂的。

------------1月12日补充-------------
如果使用redis限流,这个redis在分布式环境下,压力可想而知,所以这条路是错的。

我最后找到了一种高效的限流方式。不需要redis。消息队列每次吐出一个队列,每次计算两个队列的间隔就行了。

比如:

1、吐出队列A
2、间隔45毫秒
3、标记队列A已消费
4、吐出队列B

这样能保证队列A和队列B的间隔是45毫秒,而且所有的计算都是本地完成的,不需要redis。不需要网络通信,这个是我目前找到的最好的办法。

------------1月17日补充-------------

我最近写了这部分的代码,我需要遵循下面的标准:

1、所有的运算都在本地运行,不通过网络
2、可以多台计算机分布式限流,也就是N个实例运行的时候你依然可以起到限流的作用
3、每次运算的时间控制在10毫秒内

我一共需要判断5个限流:
   a:  域名限流
   b:  消息队列限流
   等等。总共5个

如何做?

1、首次启动实例,从redis缓存读取所有的限流数据,包括各种限流配置,目的是保证所有的限流运算都不通过网络请求。

比如某个 mq=test-abc 的 消息队列上次访问时间是10毫秒之前,我需要记录起来,放到redis里面。

2、计算。比如上次访问是10毫秒之前,根据qps计算出下次访问时间。

3、运行消息队列。

4、运行之后,把这次的运行时间记录起来,每隔N分钟缓存到redis数据。

问题来了,如何保证分布式限流?你现在所有的计算都是单机上,不通过网络,如何保证N个实例也能限流?

这个简单。

比如我一个消息队列设定每秒钟 消费 10个,假如有10个实例在消费,那么我单机控制每秒钟消费1个就行了。

假如设定的qps很高,比如1秒钟1万个,计算下来,间隔也就是 0.1毫秒而已, 那么这种情况下没必要限流。

我测试了一下代码性能,每次判断限流,所用的时间都在2毫秒一下,这个算性能很高了,又能达到限流的目的。

网上搜索出来的redis方案,其实都是很垃圾的办法。

你的redis没法承受那么高的计算,只要走网络的方案都要丢弃。

阅读 1.8k
评论
    1 个回答
    • 1.3k

    我的解决方案:

    1、具体思路

    根据时间去划分等级估计不行,只能根据资源去划分。如何根据资源划分?那就使用节点的多少。只要能做到100倍速是1倍速的100倍资源就行了。

    2、如何实施

    使用kubernetes容器技术,将程序分成N个节点去执行任务。比如整个分布式系统划分成1000个节点。那么1倍速,就是N个节点运行任务,100倍速,就是100*N个节点运行任务。

    大致思路就是这样。
    不过具体的细节估计还有很多东西要解决。
    就这样,说干就干。


    隔了不久,在某本书上面看到一个东西,叫“限流器”。
    啥叫限流器,搜索了一下,就是限制速度的东西。然后搜出一堆论文和文章。原来早有人研究了,我只是不知道这东西叫限流器而已

    相关文章:

    How to Design a Scalable Rate Limiting Algorithm

    API 调用次数限制实现

    token bucket


    这几天用 lua + redis 实现了一个滑动窗口算法的限流器,代码如下

    
    
    local hsub_key =  KEYS[1];
    local hkey =  KEYS[2];
    local max_count =  tonumber(KEYS[3]);
    
    local hval_res = redis.call('hmget', hsub_key, hkey)
    
    local hval = tonumber(hval_res[1]);
    
    local res = true;
    
    
    if(hval == nil)
    then
       hval = 0;
    end;
    
    if(hval > max_count or hval == max_count)
    
    then
       res = false;
    else
       redis.call('hincrby', hsub_key, hkey, 1)
       res = true;
    
    end
    
    
    if(hval == 0)
    then
       redis.call('expire', hsub_key, 60*5);
    end;
    
    
    return res;
    
                

    因为是分布式,所以必须lua实现才能保证原子操作