我的rocketmq配置是双主双从,异步模式的。我发现一个问题,就是我使用广播模式消费消息,消息有被消费者消费,但是rocketmq-console控制台依旧显示消息没被消费掉(消息堆积)。
我的生产者代码如下
public static void main(String[] args) throws Exception {
//1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("group1");
//2.指定Nameserver地址
producer.setNamesrvAddr("x.x.x.x:9876;x.x.x.x:9876");
//3.启动producer
producer.start();
for (int i = 0; i < 10; i++) {
//4.创建消息对象,指定主题Topic、Tag和消息体
/**
* 参数一:消息主题Topic
* 参数二:消息Tag
* 参数三:消息内容
*/
Message msg = new Message("springboot-mq", "Tag1", ("Hello World" + i).getBytes());
//5.发送消息
SendResult result = producer.send(msg);
//发送状态
SendStatus status = result.getSendStatus();
System.out.println("发送结果:" + result);
//线程睡1秒
TimeUnit.SECONDS.sleep(1);
}
//6.关闭生产者producer
// producer.shutdown();
}
消费者代码如下:
public static void main(String[] args) throws Exception {
//1.创建消费者Consumer,制定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
//2.指定Nameserver地址
consumer.setNamesrvAddr("x.x.x.x:9876;x.x.x.x:9876");
//3.订阅主题Topic和Tag
consumer.subscribe("springboot-mq", "*");
//设定消费模式:负载均衡|广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//4.设置回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
//接受消息内容cc
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//5.启动消费者consumer
consumer.start();
}
生产者把消息发送到rocketmq,消费者都可以消费到消息。但是消费者消费完消息后,rocketmq-console控制台依旧没有显示消息被消费掉(消息堆积)。如果把消费者采用集群模式消费,消息被消费后,rocektmq-console正常,没有消息堆积。
下面是我的rocketmq-console相关截图:
我的印象中。 广播模式,消费进度貌似是存储在客户端~~~ 从 console 上看, 是看 broker 中的消费进度吧~~~~