Kotlin+SpringBoot连接MQTT频繁断开又成功连接、发送信息和回调经常失败,同样的代码用Java可以实现发送与接收MQTT信息
MQTT配置类
import com.atri.utils.log
import org.eclipse.paho.client.mqttv3.IMqttClient
import org.eclipse.paho.client.mqttv3.MqttConnectOptions
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory
import org.springframework.integration.mqtt.core.MqttPahoClientFactory
@Configuration
class MqttConfig(@Value("\${mqtt.url:tcp://127.0.0.1:1883}") val url: String) {
// private val log = LoggerFactory.getLogger(this::class.java)
@Bean
fun mqttClientFactory(): MqttPahoClientFactory {
// 创建 MQTT 客户端工厂对象
val factory = DefaultMqttPahoClientFactory()
// 创建 MQTT 连接选项对象
val options = MqttConnectOptions()
// 设置连接超时时间,单位为秒
options.connectionTimeout = 10
// 设置心跳间隔,单位为秒
options.keepAliveInterval = 5
// 设置是否为清洁会话,true 表示每次连接时,服务器会清除之前的会话数据
options.isCleanSession = true
// 设置是否自动重连
options.isAutomaticReconnect = true
// 将配置好的连接选项设置到 MQTT 客户端工厂对象中
factory.connectionOptions = options
return factory;
}
// 创建 MQTT 客户端实例
@Bean
fun mqttClient(factory: MqttPahoClientFactory): IMqttClient {
// 生成包含当前时间的客户端 ID
val clientId = "${System.currentTimeMillis()}"
var client: IMqttClient? = null
try {
// 使用工厂对象和 URL 及生成的客户端 ID 获取 MQTT 客户端实例
client = factory.getClientInstance(url, clientId)
// 连接 MQTT
client.connect()
log().info("MQTT 客户端连接成功: $clientId")
} catch (e: Exception) {
// 通过扩展函数的工具类打印日志
log().error("MQTT 连接失败:${e}")
}
// !!:解包
return client!!
}
}
消息发送类
import com.atri.utils.log
import org.eclipse.paho.client.mqttv3.IMqttClient
import org.eclipse.paho.client.mqttv3.MqttException
import org.eclipse.paho.client.mqttv3.MqttMessage
import org.springframework.stereotype.Component
@Component
class MqttPublisher(val mqtt: IMqttClient) {
val lock = Any()
fun publish(topic: String, qos: Int, payload: String) {
synchronized(lock) {
if (mqtt.isConnected == false) {
mqtt.reconnect()
}
synchronized(lock) {
// 构建消息体
val message: MqttMessage = MqttMessage(payload.toByteArray())
// 设置 QOS
message.qos = qos
try {
// 发送 MQTT 消息,发送失败则尝试重连
mqtt.publish(topic, message)
} catch (e: Exception) {
try {
// 重连 MQTT
mqtt.reconnect()
} catch (_: MqttException) {
}
log().error("发送 MQTT 消息失败,$e")
}
}
}
}
}
消息回调类
import com.atri.utils.log
import org.eclipse.paho.client.mqttv3.IMqttClient
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended
import org.eclipse.paho.client.mqttv3.MqttException
import org.eclipse.paho.client.mqttv3.MqttMessage
import org.springframework.stereotype.Component
import java.nio.charset.StandardCharsets
import java.util.concurrent.CopyOnWriteArrayList
@Component
// 类后接 : 表示实现接口
class MqttSubscriber(val mqtt: IMqttClient) : MqttCallbackExtended {
// 私有最终列表,用于存储主题
final val topics = CopyOnWriteArrayList<String>()
// 注意:以下方式定义的列表没有删除元素方法,即不能定义变量类型为 List<?>
// 因为 Kotlin 的原生 List 类型不带删除元素的方法
// final val topics: List<String> = CopyOnWriteArrayList<String>()
// 私有最终列表,用于存储 QOS
final val qosS = CopyOnWriteArrayList<Int>()
// 断线后触发,只会执行一次
// override:表示重写方法
override fun connectionLost(p0: Throwable?) {
TODO("Not yet implemented")
}
// 断线重连 MQTT 后触发
override fun connectComplete(connect: Boolean, msg: String?) {
(0 .. topics.size - 1).forEach {
// 重新订阅列表中的主题
try {
mqtt.subscribe(topics[it], qosS[it])
} catch (_: MqttException) {
}
}
}
// 处理接收到的消息回调
override fun messageArrived(topic: String?, message: MqttMessage?) {
// .let {...} :表示当对象/值不为空时,执行的代码块,it 即对象/值
message?.payload?.let { log().info("Received Message From $topic ==> ${String(it, StandardCharsets.UTF_8)}") }
}
// 只要 MQTT 有发送完成的消息,都会执行回调方法
override fun deliveryComplete(token: IMqttDeliveryToken?) {
TODO("Not yet implemented")
}
// 订阅一个主题
fun subscribeToTopic(topic: String, qos: Int?) {
if (topic !in topics) {
topics += topic
qosS += qos ?: 0
mqtt.subscribe(topic, this::messageArrived)
} else {
log().warn("已存在重复主题 $topic,订阅失败")
}
}
// 取消订阅一个主题
fun unsubscribeFromTopic(topic: String) {
val topicIndex = topics.indexOf(topic)
val qos = qosS[topicIndex]
try {
if (topics.remove(topic) && qosS.remove(qos)) {
mqtt.unsubscribe(topic)
} else {
log().error("取消订阅主题失败,主题名称:$topic")
}
} catch (_: MqttException) {
topics += topic
qosS += qos
log().error("MQTT 未连接,取消订阅主题失败")
}
}
}
测试类
import com.atri.test.Anime
import com.atri.utils.toPrettyString
import jakarta.annotation.PostConstruct
import org.eclipse.paho.client.mqttv3.IMqttClient
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Service
@Service
class MqttService(
val mqtt: IMqttClient,
val publisher: MqttPublisher,
val subscriber: MqttSubscriber
) {
// 此处不要使用 @Scheduled 定时任务,而是直接初始化一次
@PostConstruct
fun init() {
// 初始化订阅主题
subscriber.subscribeToTopic("topic-1", 1)
// 执行回调方法,此处是自定义的消息接收回调类
mqtt.setCallback(subscriber)
}
@Scheduled(fixedRate = 1000)
fun push() {
val animeMsg = Anime(id = 1, name = "败角女主太多了", type = "", heat = 100)
// 发送字符串消息
publisher.publish("topic-1", 0, "Hello World!")
// 发送 JSON 字符串消息
publisher.publish("topic-2", 0, animeMsg.toPrettyString())
}
}
Kotlin 的结果:
Java 的结果:
Kotlin+SpringBoot连接MQTT频繁断开又成功连接、发送信息和回调经常失败,同样的代码用Java可以实现发送与接收MQTT信息,怎么让服务不频繁断开MQTT连接