RabbitMQ 示例:多线程、通道和队列

新手上路,请多包涵

我刚刚阅读了 RabbitMQ 的 Java API 文档,发现它内容丰富且简单明了。如何设置一个简单的 Channel 用于发布/消费的示例非常容易理解和理解。但这是一个非常简单/基本的示例,它给我留下了一个重要的问题: 如何设置 1+ Channels 向多个队列发布/消费?

假设我有一个带有 3 个队列的 RabbitMQ 服务器: loggingsecurity_eventscustomer_orders 所以我们要么需要一个 Channel 来发布/消费所有 3 个队列,或者更可能需要 3 个独立的 Channels ,每个专用于一个队列。

除此之外,RabbitMQ 的最佳实践规定我们为每个消费者线程设置 1 Channel 。对于这个例子,假设 security_events 只有 1 个消费者线程就可以了,但是 loggingcustomer_order 都需要 5 个线程来处理音量。所以,如果我理解正确的话,这是否意味着我们需要:

  • 1 Channel 和 1 个用于发布/消费的消费者线程 security_events ;和
  • 5 Channels 和 5 个用于发布/消费的消费者线程 logging ;和
  • 5 Channels 和 5 个用于发布/消费的消费者线程 customer_orders

如果我的理解在这里被误导,请首先纠正我。无论哪种方式,一些厌倦了战斗的 RabbitMQ 老手 可以帮助我用一个体面的代码示例“连接点”来设置满足我的要求的发布者/消费者吗? 提前致谢!

原文由 user1768830 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 1.5k
1 个回答

我认为你在初步理解上有几个问题。坦率地说,看到以下内容我有点惊讶: both need 5 threads to handle the volume 。您如何确定您需要那个确切的号码?你能保证 5 个线程就足够了吗?

RabbitMQ 经过调优和时间测试,因此它完全取决于正确的设计和高效的消息处理。

让我们尝试审查问题并找到合适的解决方案。顺便说一句,消息队列本身不会提供任何保证你有真正好的解决方案。您必须了解自己在做什么,并且还要进行一些额外的测试。

正如您肯定知道的那样,有许多可能的布局:

在此处输入图像描述

我将使用布局 B 作为最简单的方式来说明 1 生产者 N 消费者问题。由于您非常担心吞吐量。顺便说一句,正如您所期望的那样,RabbitMQ 表现得很好( 来源)。注意 prefetchCount ,后面会讲到:

在此处输入图像描述

因此,消息处理逻辑很可能是确保您有足够吞吐量的正确位置。每次需要处理消息时,您自然可以跨越一个新线程,但最终这种方法会杀死您的系统。基本上,线程越多,延迟就越大(如果需要,可以查看 Amdahl 定律)。

在此处输入图像描述

(参见 阿姆达尔定律图示

提示 #1:小心线程,使用 ThreadPools( 详细信息

线程池可以描述为 Runnable 对象(工作队列)的集合和运行线程的连接。这些线程不断运行并检查工作查询是否有新工作。如果有新的工作要做,他们就执行这个 Runnable。 Thread 类本身提供了一个方法,例如execute(Runnable r) 来将一个新的Runnable 对象添加到工作队列中。

 public class Main {
  private static final int NTHREDS = 10;

  public static void main(String[] args) {
    ExecutorService executor = Executors.newFixedThreadPool(NTHREDS);
    for (int i = 0; i < 500; i++) {
      Runnable worker = new MyRunnable(10000000L + i);
      executor.execute(worker);
    }
    // This will make the executor accept no new threads
    // and finish all existing threads in the queue
    executor.shutdown();
    // Wait until all threads are finish
    executor.awaitTermination();
    System.out.println("Finished all threads");
  }
}

提示 #2:小心消息处理开销

我会说这是显而易见的优化技术。您很可能会发送小而易于处理的消息。整个方法是关于不断设置和处理较小的消息。大消息最终会开一个糟糕的玩笑,所以最好避免这种情况。

在此处输入图像描述

所以最好发送小块信息,但是处理呢?每次提交作业都会产生开销。在传入消息率高的情况下,批处理非常有用。

在此处输入图像描述

例如,假设我们有简单的消息处理逻辑,我们不希望每次处理消息时都有线程特定的开销。为了优化这个非常简单的 CompositeRunnable can be introduced

 class CompositeRunnable implements Runnable {

    protected Queue<Runnable> queue = new LinkedList<>();

    public void add(Runnable a) {
        queue.add(a);
    }

    @Override
    public void run() {
        for(Runnable r: queue) {
            r.run();
        }
    }
}

或者以稍微不同的方式执行相同操作,即收集要处理的消息:

 class CompositeMessageWorker<T> implements Runnable {

    protected Queue<T> queue = new LinkedList<>();

    public void add(T message) {
        queue.add(message);
    }

    @Override
    public void run() {
        for(T message: queue) {
            // process a message
        }
    }
}

通过这种方式,您可以更有效地处理消息。

提示 #3:优化消息处理

尽管您知道可以并行处理消息( Tip #1 )并减少处理开销( Tip #2 ),但您必须快速完成所有操作。冗余的处理步骤、繁重的循环等可能会对性能产生很大影响。请参阅有趣的案例研究:

在此处输入图像描述

通过选择正确的 XML 解析器将 Message Queue 吞吐量提高十倍

技巧 #4:连接和频道管理

  • 在现有连接上启动新通道涉及一次网络往返 - 启动新连接需要多次。
  • 每个连接都使用服务器上的一个文件描述符。频道没有。
  • 在一个频道上发布一条大消息会在它出去时阻塞连接。除此之外,多路复用是相当透明的。
  • 如果服务器过载,正在发布的连接可能会被阻塞——最好将发布连接和消费连接分开
  • 准备好处理消息突发

来源

请注意,所有提示都可以完美地协同工作。如果您需要更多详细信息,请随时告诉我。

完整的消费者示例( 来源

请注意以下事项:

  • channel.basicQos(prefetch) - 正如您之前看到的 prefetchCount 可能非常有用:

此命令允许消费者选择一个预取窗口,该窗口指定它准备接收的未确认消息的数量。通过将预取计数设置为非零值,代理将不会向消费者发送任何违反该限制的消息。要向前移动窗口,消费者必须确认收到一条消息(或一组消息)。

  • ExecutorService threadExecutor - 您可以指定正确配置的执行程序服务。

例子:

 static class Worker extends DefaultConsumer {

    String name;
    Channel channel;
    String queue;
    int processed;
    ExecutorService executorService;

    public Worker(int prefetch, ExecutorService threadExecutor,
                  , Channel c, String q) throws Exception {
        super(c);
        channel = c;
        queue = q;
        channel.basicQos(prefetch);
        channel.basicConsume(queue, false, this);
        executorService = threadExecutor;
    }

    @Override
    public void handleDelivery(String consumerTag,
                               Envelope envelope,
                               AMQP.BasicProperties properties,
                               byte[] body) throws IOException {
        Runnable task = new VariableLengthTask(this,
                                               envelope.getDeliveryTag(),
                                               channel);
        executorService.submit(task);
    }
}

您还可以检查以下内容:

原文由 Renat Gilmanov 发布,翻译遵循 CC BY-SA 3.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题