使用rabbitMQ作为vue项目获取数据的方法,每次调用后都会残留队列缓存在服务器上,这该怎么处理?

最开始进入页面就有6个接口,每次进主页都残留一些队列,使用channel.deleteQueue(replyToQueueInfo.queue)responsePromise后删除队列,程序就没有数据了剩下的接口也会卡住,加上定时器也没什么用。在其后的finally{}里面删除队列,有些删了有些还留在后台。

console.log(replyToQueueInfo.queue, 'consumeTag',consumeTag) 打印结果如下图
image.png

后台随机队列的缓存:
image.png

  1. rabbitMQ代码

    // 有临时队列
    const amqp = require('amqplib');
    const { v4: uuid } = require('uuid');
    
    let connection;
    let channel;
    
    
    const amqp_url = 'amqp://leadzone:LeadzoneC12@193.168.1.106:5672/vpp_system';
    async function initAmqp() {
     if (!connection) {
         connection = await amqp.connect(amqp_url);
         channel = await connection.createChannel();
     }
    }
    let consumeTag;
    let replyToQueue;
    let replyToQueueInfo;
    async function getQueueMsg(rkey, msg) {
     if (!channel) await initAmqp();
    
     try {
         const correlationId = uuid();
         replyToQueueInfo = await channel.assertQueue('', { exclusive: false });
         const replyTo = replyToQueueInfo.queue;
         let consumerResolved = false; // 标志变量用于防止多次解决Promise
    
         // 创建一个Promise等待RPC响应
         const responsePromise = new Promise((resolve, reject) => {
             // 确保从返回对象中正确提取consumerTag
             const consumeResult = channel.consume(replyTo, async (message) => {
                 if (!consumerResolved && message && message.properties.correlationId === correlationId) {
                     try {
                         const response = JSON.parse(message.content.toString());
                         resolve(response);
                         consumerResolved = true; // 解决Promise后设置标志
                         channel.ack(message); // 立即确认消息
                     } catch (err) {
                         reject(err);
                     } finally {
                         if (consumeTag) {
                             channel.cancel(consumeTag).catch(console.warn); // 取消消费者
                         }
                     }
                 } else {
                     // 如果已经有一个响应,则忽略后续消息并立即确认
                     if (message) {
                         channel.ack(message);
                     }
                 }
             }, { noAck: false });
    
             consumeTag = consumeResult.consumerTag; // 获取消费者标签
             replyToQueue = replyToQueueInfo.queue; // 保存队列名称
         });
         console.log(replyToQueueInfo.queue, 'consumeTag',consumeTag);
    
         channel.publish('', rkey, Buffer.from(JSON.stringify(msg)), {
             correlationId,
             replyTo,
         });
        
             channel.deleteQueue(replyToQueueInfo.queue); // 清除起始连接通道中的临时队列
        
         
         return await responsePromise;
     } catch (err) {
         console.error(err);
         throw err;
     } finally {
         if (consumeTag) {
             try {
                 await channel.cancel(consumeTag); // 取消消费者
                 if (replyToQueue) {
                     await channel.deleteQueue(replyToQueue); // 删除队列
                 }
                 
             } catch (cancelErr) {
                 console.warn('Failed to clean up resources:', cancelErr);
             }
         } else {
             try {
                await channel.deleteQueue(replyToQueueInfo.queue); // 清除起始连接通道中的临时队列
                // if (channel) {
                 //     await channel.close(); // 关闭通道
                 // }
                 // if (connection) {
                 //     await connection.close(); // 关闭连接
                 // }
             } catch (cancelErr) {
                 console.warn('Failed to clean up resources:', cancelErr);
             }
         }
     }
    }
    
    // 在应用退出时确保关闭连接
    process.on('exit', () => {
     if (connection) connection.close();
     channel.deleteQueue(replyToQueue); // 删除队列
    });
    
    export {
     getQueueMsg
    };
  2. vue界面调用接口方法,通过改变method和data进行接口变更

    await getQueueMsg("DbiViewServer", {
     api: "DbiViewServer",
     method: "CreateFESMeasure",
     data: [
       {
         meas_list: aMl,
         link_id: state.radioVal,
       },
     ],
      }).then((res) => {
     console.log(res);
      });
阅读 390
avatarAI BotBETA

回答

问题中描述的现象表明,队列在消息处理完成后被过早删除,或者在某些情况下,删除操作未能正确执行。这通常是因为在消息处理流程中,队列删除时机选择不当。

解决方案:

  1. 确保队列在消息完全处理后再删除
    确保在 responsePromise 解决(即消息完全处理)后再删除队列。你的代码中在 responsePromise 返回之前调用了 channel.deleteQueue(replyToQueueInfo.queue);,这会导致队列在消息处理完成前就被删除。
  2. 调整队列删除位置
    将队列删除操作移到 responsePromise.then().finally() 块之后。这样可以确保只有在消息完全处理并且响应被接收后才删除队列。

修改后的代码:

async function getQueueMsg(rkey, msg) {
    if (!channel) await initAmqp();

    try {
        const correlationId = uuid();
        replyToQueueInfo = await channel.assertQueue('', { exclusive: false });
        const replyTo = replyToQueueInfo.queue;
        let consumerResolved = false; // 标志变量用于防止多次解决Promise

        // 创建一个Promise等待RPC响应
        const responsePromise = new Promise((resolve, reject) => {
            const consumeResult = channel.consume(replyTo, async (message) => {
                if (!consumerResolved && message && message.properties.correlationId === correlationId) {
                    try {
                        const response = JSON.parse(message.content.toString());
                        resolve(response);
                        consumerResolved = true; // 解决Promise后设置标志
                        channel.ack(message); // 立即确认消息
                    } catch (err) {
                        reject(err);
                    } finally {
                        if (consumeTag) {
                            channel.cancel(consumeTag).catch(console.warn); // 取消消费者
                        }
                    }
                } else {
                    if (message) {
                        channel.ack(message);
                    }
                }
            }, { noAck: false });

            consumeTag = consumeResult.consumerTag; // 获取消费者标签
            replyToQueue = replyToQueueInfo.queue; // 保存队列名称
        });

        console.log(replyToQueueInfo.queue, 'consumeTag', consumeTag);

        channel.publish('', rkey, Buffer.from(JSON.stringify(msg)), {
            correlationId,
            replyTo,
        });

        // 等待响应Promise解决后再删除队列
        await responsePromise;

    } catch (err) {
        console.error(err);
        throw err;
    } finally {
        if (consumeTag) {
            try {
                await channel.cancel(consumeTag); // 取消消费者
                if (replyToQueue) {
                    await channel.deleteQueue(replyToQueue); // 删除队列
                }
            } catch (cancelErr) {
                console.warn('Failed to clean up resources:', cancelErr);
            }
        } else {
            // 如果没有consumeTag,则直接删除初始化的临时队列(这种情况应该很少发生)
            try {
                await channel.deleteQueue(replyToQueueInfo.queue); // 清除起始连接通道中的临时队列
            } catch (cancelErr) {
                console.warn('Failed to clean up resources:', cancelErr);
            }
        }
    }
}

注意:

  • 确保 await responsePromise;finally 块之前执行,以便在删除队列之前完成所有消息处理。
  • finally 块中检查 consumeTagreplyToQueue,确保它们被正确设置并用于资源清理。

通过上述修改,应该可以解决队列残留的问题。

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题
宣传栏