最开始进入页面就有6个接口,每次进主页都残留一些队列,使用channel.deleteQueue(replyToQueueInfo.queue)
在responsePromise
后删除队列,程序就没有数据了剩下的接口也会卡住,加上定时器也没什么用。在其后的finally{}里面删除队列,有些删了有些还留在后台。
console.log(replyToQueueInfo.queue, 'consumeTag',consumeTag)
打印结果如下图
后台随机队列的缓存:
rabbitMQ代码
// 有临时队列 const amqp = require('amqplib'); const { v4: uuid } = require('uuid'); let connection; let channel; const amqp_url = 'amqp://leadzone:LeadzoneC12@193.168.1.106:5672/vpp_system'; async function initAmqp() { if (!connection) { connection = await amqp.connect(amqp_url); channel = await connection.createChannel(); } } let consumeTag; let replyToQueue; let replyToQueueInfo; async function getQueueMsg(rkey, msg) { if (!channel) await initAmqp(); try { const correlationId = uuid(); replyToQueueInfo = await channel.assertQueue('', { exclusive: false }); const replyTo = replyToQueueInfo.queue; let consumerResolved = false; // 标志变量用于防止多次解决Promise // 创建一个Promise等待RPC响应 const responsePromise = new Promise((resolve, reject) => { // 确保从返回对象中正确提取consumerTag const consumeResult = channel.consume(replyTo, async (message) => { if (!consumerResolved && message && message.properties.correlationId === correlationId) { try { const response = JSON.parse(message.content.toString()); resolve(response); consumerResolved = true; // 解决Promise后设置标志 channel.ack(message); // 立即确认消息 } catch (err) { reject(err); } finally { if (consumeTag) { channel.cancel(consumeTag).catch(console.warn); // 取消消费者 } } } else { // 如果已经有一个响应,则忽略后续消息并立即确认 if (message) { channel.ack(message); } } }, { noAck: false }); consumeTag = consumeResult.consumerTag; // 获取消费者标签 replyToQueue = replyToQueueInfo.queue; // 保存队列名称 }); console.log(replyToQueueInfo.queue, 'consumeTag',consumeTag); channel.publish('', rkey, Buffer.from(JSON.stringify(msg)), { correlationId, replyTo, }); channel.deleteQueue(replyToQueueInfo.queue); // 清除起始连接通道中的临时队列 return await responsePromise; } catch (err) { console.error(err); throw err; } finally { if (consumeTag) { try { await channel.cancel(consumeTag); // 取消消费者 if (replyToQueue) { await channel.deleteQueue(replyToQueue); // 删除队列 } } catch (cancelErr) { console.warn('Failed to clean up resources:', cancelErr); } } else { try { await channel.deleteQueue(replyToQueueInfo.queue); // 清除起始连接通道中的临时队列 // if (channel) { // await channel.close(); // 关闭通道 // } // if (connection) { // await connection.close(); // 关闭连接 // } } catch (cancelErr) { console.warn('Failed to clean up resources:', cancelErr); } } } } // 在应用退出时确保关闭连接 process.on('exit', () => { if (connection) connection.close(); channel.deleteQueue(replyToQueue); // 删除队列 }); export { getQueueMsg };
vue界面调用接口方法,通过改变method和data进行接口变更
await getQueueMsg("DbiViewServer", { api: "DbiViewServer", method: "CreateFESMeasure", data: [ { meas_list: aMl, link_id: state.radioVal, }, ], }).then((res) => { console.log(res); });