MQTT这边有个消费端,订阅了对应主题,messageArrived接收订阅的消息。handle方法就是不走数据库直接插入到redis,后期再消费。现在五千个设备,如果每隔1小时同时向MQTT这边上报数据,消费端这边设置了10个线程,这种写法会丢失数据吗?
ExecutorService executorService = Executors.newFixedThreadPool(10);
public synchronized void messageArrived (final String topic, MqttMessage message ) throws Exception
{
final String msg = new String(message.getPayload());
// System.err.println("【MQTT-消费端】接收消息主题 : " + topic);
// System.err.println("【MQTT-消费端】接收消息内容 : " + msg);
executorService.execute(new Runnable() {
public void run() {
handle(topic,msg);
}
});
}
结果:本地模拟测试了,不会丢失
生产者 :这边作为客户端也连接进mqtt,for循环5000个发送消息至对应主题
消费者:加了Thread.sleep(1000);模拟插入到redis
最后一个线程走完正好5000,生产者很快就发完了,消费者睡一秒导致整体这消费速度有点慢
