MQTT消费者接收的数据会出现丢失吗?

MQTT这边有个消费端,订阅了对应主题,messageArrived接收订阅的消息。handle方法就是不走数据库直接插入到redis,后期再消费。现在五千个设备,如果每隔1小时同时向MQTT这边上报数据,消费端这边设置了10个线程,这种写法会丢失数据吗?

ExecutorService executorService = Executors.newFixedThreadPool(10);
public synchronized void messageArrived (final String topic, MqttMessage message ) throws Exception
{
    final String msg = new String(message.getPayload());
//        System.err.println("【MQTT-消费端】接收消息主题 : " + topic);
//        System.err.println("【MQTT-消费端】接收消息内容 : " + msg);
    executorService.execute(new Runnable() {
        public void run() {
            handle(topic,msg);
        }
    });
}
阅读 1.7k
2 个回答

结果:本地模拟测试了,不会丢失

生产者 :这边作为客户端也连接进mqtt,for循环5000个发送消息至对应主题

for (int x=0;x<5000;x++){
            int finalX = x;
            new Thread(() -> mqttPushClient.publish("ruby", String.valueOf(finalX))).start();

        }

消费者:加了Thread.sleep(1000);模拟插入到redis

ExecutorService executorService = Executors.newFixedThreadPool(10);

    private static AtomicInteger num = new AtomicInteger(0);
 @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {

        log.info("接收消息主题 : " + topic);
        log.info("接收消息Qos : " + message.getQos());
        String s = new String(message.getPayload());
        log.info("接收消息内容 : " + s);
        executorService.execute(() -> {
            // 处理接收到的消息
            handler(s);
        });
    }

    private void handler(String s) {
        try {
            Thread.sleep(1000);
            num.incrementAndGet();
            System.err.println(num.get());
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

最后一个线程走完正好5000,生产者很快就发完了,消费者睡一秒导致整体这消费速度有点慢
image.png

如果是服务端作为一个mqtt客户端去消费mqtt消息,如果消息收到了,再保存,正常情况下都是不会丢的。

丢消息的情况可能是出在,程序没有订阅mqtt topic的时候,有客户端给这个topic发送了消息,这部份消息,如果程序在这段时间后面再进行订阅,是拿不到的(丢了)。

还有一个,可能要看你订阅topic是,使用的是哪种Qos,使用不好的话,可能也会出现重复或者丢失消息的情况。

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题