Kotlin+SpringBoot连接MQTT频繁断开又成功连接、发送信息和回调经常失败,怎么让服务稳定连接MQTT?

新手上路,请多包涵

Kotlin+SpringBoot连接MQTT频繁断开又成功连接、发送信息和回调经常失败,同样的代码用Java可以实现发送与接收MQTT信息

MQTT配置类

import com.atri.utils.log
import org.eclipse.paho.client.mqttv3.IMqttClient
import org.eclipse.paho.client.mqttv3.MqttConnectOptions
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory
import org.springframework.integration.mqtt.core.MqttPahoClientFactory

@Configuration
class MqttConfig(@Value("\${mqtt.url:tcp://127.0.0.1:1883}") val url: String) {
    // private val log = LoggerFactory.getLogger(this::class.java)

    @Bean
    fun mqttClientFactory(): MqttPahoClientFactory {
        // 创建 MQTT 客户端工厂对象
        val factory = DefaultMqttPahoClientFactory()
        // 创建 MQTT 连接选项对象
        val options = MqttConnectOptions()
        // 设置连接超时时间,单位为秒
        options.connectionTimeout = 10
        // 设置心跳间隔,单位为秒
        options.keepAliveInterval = 5
        // 设置是否为清洁会话,true 表示每次连接时,服务器会清除之前的会话数据
        options.isCleanSession = true
        // 设置是否自动重连
        options.isAutomaticReconnect = true
        // 将配置好的连接选项设置到 MQTT 客户端工厂对象中
        factory.connectionOptions = options
        return factory;
    }

    // 创建 MQTT 客户端实例
    @Bean
    fun mqttClient(factory: MqttPahoClientFactory): IMqttClient {
        // 生成包含当前时间的客户端 ID
        val clientId = "${System.currentTimeMillis()}"
        var client: IMqttClient? = null
        try {
            // 使用工厂对象和 URL 及生成的客户端 ID 获取 MQTT 客户端实例
            client = factory.getClientInstance(url, clientId)
            // 连接 MQTT
            client.connect()
            log().info("MQTT 客户端连接成功: $clientId")
        } catch (e: Exception) {
            // 通过扩展函数的工具类打印日志
            log().error("MQTT 连接失败:${e}")
        }
        // !!:解包
        return client!!
    }
}

消息发送类

import com.atri.utils.log
import org.eclipse.paho.client.mqttv3.IMqttClient
import org.eclipse.paho.client.mqttv3.MqttException
import org.eclipse.paho.client.mqttv3.MqttMessage
import org.springframework.stereotype.Component

@Component
class MqttPublisher(val mqtt: IMqttClient) {

    val lock = Any()

    fun publish(topic: String, qos: Int, payload: String) {
        synchronized(lock) {
            if (mqtt.isConnected == false) {
                mqtt.reconnect()
            }
            synchronized(lock) {
                // 构建消息体
                val message: MqttMessage = MqttMessage(payload.toByteArray())
                // 设置 QOS
                message.qos = qos
                try {
                    // 发送 MQTT 消息,发送失败则尝试重连
                    mqtt.publish(topic, message)
                } catch (e: Exception) {
                    try {
                        // 重连 MQTT
                        mqtt.reconnect()
                    } catch (_: MqttException) {

                    }
                    log().error("发送 MQTT 消息失败,$e")
                }
            }
        }
    }
}

消息回调类

import com.atri.utils.log
import org.eclipse.paho.client.mqttv3.IMqttClient
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended
import org.eclipse.paho.client.mqttv3.MqttException
import org.eclipse.paho.client.mqttv3.MqttMessage
import org.springframework.stereotype.Component
import java.nio.charset.StandardCharsets
import java.util.concurrent.CopyOnWriteArrayList

@Component
// 类后接 : 表示实现接口
class MqttSubscriber(val mqtt: IMqttClient) : MqttCallbackExtended {
    // 私有最终列表,用于存储主题
    final val topics = CopyOnWriteArrayList<String>()

    // 注意:以下方式定义的列表没有删除元素方法,即不能定义变量类型为 List<?>
    // 因为 Kotlin 的原生 List 类型不带删除元素的方法
    // final val topics: List<String> = CopyOnWriteArrayList<String>()

    // 私有最终列表,用于存储 QOS
    final val qosS = CopyOnWriteArrayList<Int>()

    // 断线后触发,只会执行一次
    // override:表示重写方法
    override fun connectionLost(p0: Throwable?) {
        TODO("Not yet implemented")
    }

    // 断线重连 MQTT 后触发
    override fun connectComplete(connect: Boolean, msg: String?) {
        (0 .. topics.size - 1).forEach {
            // 重新订阅列表中的主题
            try {
                mqtt.subscribe(topics[it], qosS[it])
            } catch (_: MqttException) {

            }
        }
    }

    // 处理接收到的消息回调
    override fun messageArrived(topic: String?, message: MqttMessage?) {
        // .let {...} :表示当对象/值不为空时,执行的代码块,it 即对象/值
        message?.payload?.let { log().info("Received Message From $topic ==> ${String(it, StandardCharsets.UTF_8)}") }
    }

    // 只要 MQTT 有发送完成的消息,都会执行回调方法
    override fun deliveryComplete(token: IMqttDeliveryToken?) {
        TODO("Not yet implemented")
    }

    // 订阅一个主题
    fun subscribeToTopic(topic: String, qos: Int?) {
        if (topic !in topics) {
            topics += topic
            qosS += qos ?: 0
            mqtt.subscribe(topic, this::messageArrived)
        } else {
            log().warn("已存在重复主题 $topic,订阅失败")
        }
    }

    // 取消订阅一个主题
    fun unsubscribeFromTopic(topic: String) {
        val topicIndex = topics.indexOf(topic)
        val qos = qosS[topicIndex]
        try {
            if (topics.remove(topic) && qosS.remove(qos)) {
                mqtt.unsubscribe(topic)
            } else {
                log().error("取消订阅主题失败,主题名称:$topic")
            }
        } catch (_: MqttException) {
            topics += topic
            qosS += qos
            log().error("MQTT 未连接,取消订阅主题失败")
        }
    }
}

测试类

import com.atri.test.Anime
import com.atri.utils.toPrettyString
import jakarta.annotation.PostConstruct
import org.eclipse.paho.client.mqttv3.IMqttClient
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Service

@Service
class MqttService(
    val mqtt: IMqttClient,
    val publisher: MqttPublisher,
    val subscriber: MqttSubscriber
) {
    // 此处不要使用 @Scheduled 定时任务,而是直接初始化一次
    @PostConstruct
    fun init() {
        // 初始化订阅主题
        subscriber.subscribeToTopic("topic-1", 1)
        // 执行回调方法,此处是自定义的消息接收回调类
        mqtt.setCallback(subscriber)
    }

    @Scheduled(fixedRate = 1000)
    fun push() {
        val animeMsg = Anime(id = 1, name = "败角女主太多了", type = "", heat = 100)
        // 发送字符串消息
        publisher.publish("topic-1", 0, "Hello World!")
        // 发送 JSON 字符串消息
        publisher.publish("topic-2", 0, animeMsg.toPrettyString())
    }
}

Kotlin 的结果:
image.png
Java 的结果:
image.png

Kotlin+SpringBoot连接MQTT频繁断开又成功连接、发送信息和回调经常失败,同样的代码用Java可以实现发送与接收MQTT信息,怎么让服务不频繁断开MQTT连接

阅读 556
撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题
宣传栏