node.js使用rabbitmq造成的消息队列堵塞

问题描述

使用分布式爬虫使用代理ip进行异步爬取,使用了NODE的amqplib库以单爬虫作为消费者,接收到消息后爬取相应页面然后入库,一台机器上使用PM2多开进程,同一台机器有多个消费者进行消息接收爬取,中间会出现大量的页面请求失败,请求失败后以nack作为处理,问题是随着运行时间的增加,接收消息的速度会越来越慢,在重启所有爬虫之后,速度就会提高,对rabbitmq的机制不是太了解,请问大佬为什么会出现这类的现象

具体代码如下
//绑定消息队列
async function run() {

//配置MQ
ch = await amqpcfg.createChannel(configs)
//传送客户端信息
let consumeParam = configs.ampq.Queue.Param
//绑定队列
let receiveQueueName = configs.ampq.Queue.receive.QueueName
//绑定回调
ch.consume(receiveQueueName, async(msg) => {
    try {
        doWork(msg)
    } catch (e) {
        console.log('\033[47;31m Work执行失败,异常未捕获,参数: \033[40;31m ' +e.stack  + '\033[0m')
    }
}, consumeParam)
    

}
//消息队列回调,这是接收到消息之后的回调,HotelsSpider为具体的异步爬取的网络请求
async function doWork(msg) {

let messageId = msg.properties.messageId
let Task = JSON.parse(transToString(msg.content))
//console.log(Task)
//参数检查
if (!Task.SpiderParams.PlatHotelNum || !Task.SpiderParams.TargerDate) {
    ch.ack(msg)
    await ResultTask(messageId, Task, '参数不完整', 4)
    return
}

//扩展参数为1~4人
let param = {
    hotelId: Task.SpiderParams.PlatHotelNum,
    day: getAfterDay(Task.SpiderParams.TargerDate, 0),
    GuestNum: Task.SpiderParams.GuestNum
}
//console.log(` Received ${JSON.stringify(param)} `)

let details
try {
    details = await HotelsSpider(messageId, param) // await taskQueue(messageId, param)
            Task.CompleteTime = moment().format()
            Task.State = (details && details.length ? 2 : 3)
            
                    //入mongoDB
                    let mId = await MongoDbInsertOne({
                            "Task": Task,
                            "details": details
                    })
                    Task.DateID = mId
            //}
            //确认消息
            ch.ack(msg)
            await ResultTask(messageId, Task, '', Task.State)
    //console.log('details', details.length)
} catch (e) {
    let error = `${param.hotelId}-${param.day}-${param.GuestNum},message:${e.message.substr(0,100)}`
    console.log('\033[47;31m 网页请求错误,参数: \033[40;31m ' + error + '\033[0m')
    ch.nack(msg);
    errorCount++
    return
}
resultNum += 1
console.log(`速率--${getProcessRate(startTime, resultNum)}个/分,${moment().format('LT')},查询到价格:{${details.length}},条件:${param.hotelId}-${param.day}-${param.GuestNum}`)
details = null

}
//HotelsSpider中具体网络请求如下
async function getHttpResp(hotelId, ins, out, adults) {

try {
    let url = `${configs.URL.base}/h${hotelId}.Hotel-Information`
    let params = {
        'chkin': ins,
        'chkout': out,
        'adults': adults,
        'mctc': 2
    }

    url = url + '?' + qs.stringify(params)
    let proxy = `http://${global.proxy.ip}:${global.proxy.port}`
    let res = await request_Hotels(url, proxy)
    let html = res.toString()
    return getExpediaHotelInfoByHtml(html)
} catch (e) {
    let error = `${hotelId},${ins},${out},${adults}`
    let message = e.message
    console.log('\033[47;31m 网页请求错误,参数: \033[40;31m ' + error + '\033[0m')
    
    let errorFilter = errorCodeArray.find(item => {
        return message.indexOf(item) !== -1
    })
    if(errorFilter){
        errorTimeCount(30)
    }
    //放弃异常捕获,上退异常以便退回队列
    throw e
}

}

网络请求的时间不会太长,使用的request-promise库,超时时间设置在了10秒左右,但是运行个几十分钟接收消息的速度就会明显减慢,尝试过一台机器只开一个爬虫,加大消息接收数,在开始的时候速度很快,但是运行一段时间后也会出现堵塞问题

阅读 2.8k
1 个回答

怕是一直在消费消息不管有没有处理完,加上 ch.prefetch(10)试试。

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题