rocketmq 广播模式消费,消费者消费消息后,rocketmq-console依旧显示消息堆积

月夜归醉
  • 2
新手上路,请多包涵

我的rocketmq配置是双主双从,异步模式的。我发现一个问题,就是我使用广播模式消费消息,消息有被消费者消费,但是rocketmq-console控制台依旧显示消息没被消费掉(消息堆积)。

我的生产者代码如下

public static void main(String[] args) throws Exception {
        //1.创建消息生产者producer,并制定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        //2.指定Nameserver地址
        producer.setNamesrvAddr("x.x.x.x:9876;x.x.x.x:9876");
        //3.启动producer
        producer.start();

        for (int i = 0; i < 10; i++) {
            //4.创建消息对象,指定主题Topic、Tag和消息体
            /**
             * 参数一:消息主题Topic
             * 参数二:消息Tag
             * 参数三:消息内容
             */
            Message msg = new Message("springboot-mq", "Tag1", ("Hello World" + i).getBytes());
            //5.发送消息
            SendResult result = producer.send(msg);
            //发送状态
            SendStatus status = result.getSendStatus();

            System.out.println("发送结果:" + result);

            //线程睡1秒
            TimeUnit.SECONDS.sleep(1);
        }

        //6.关闭生产者producer
       // producer.shutdown();
    }

消费者代码如下:

    public static void main(String[] args) throws Exception {
        //1.创建消费者Consumer,制定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        //2.指定Nameserver地址
        consumer.setNamesrvAddr("x.x.x.x:9876;x.x.x.x:9876");
        //3.订阅主题Topic和Tag
        consumer.subscribe("springboot-mq", "*");

        //设定消费模式:负载均衡|广播模式
        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        //4.设置回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            //接受消息内容cc
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //5.启动消费者consumer
        consumer.start();
    }

生产者把消息发送到rocketmq,消费者都可以消费到消息。但是消费者消费完消息后,rocketmq-console控制台依旧没有显示消息被消费掉(消息堆积)。如果把消费者采用集群模式消费,消息被消费后,rocektmq-console正常,没有消息堆积。

下面是我的rocketmq-console相关截图:
image
image

对应的源码:https://gitee.com/brozer/rock...

评论
阅读 716
1 个回答

我的印象中。 广播模式,消费进度貌似是存储在客户端~~~ 从 console 上看, 是看 broker 中的消费进度吧~~~~

撰写回答

登录后参与交流、获取后续更新提醒

宣传栏