问题描述
使用分布式爬虫使用代理ip进行异步爬取,使用了NODE的amqplib库以单爬虫作为消费者,接收到消息后爬取相应页面然后入库,一台机器上使用PM2多开进程,同一台机器有多个消费者进行消息接收爬取,中间会出现大量的页面请求失败,请求失败后以nack作为处理,问题是随着运行时间的增加,接收消息的速度会越来越慢,在重启所有爬虫之后,速度就会提高,对rabbitmq的机制不是太了解,请问大佬为什么会出现这类的现象
具体代码如下
//绑定消息队列
async function run() {
//配置MQ
ch = await amqpcfg.createChannel(configs)
//传送客户端信息
let consumeParam = configs.ampq.Queue.Param
//绑定队列
let receiveQueueName = configs.ampq.Queue.receive.QueueName
//绑定回调
ch.consume(receiveQueueName, async(msg) => {
try {
doWork(msg)
} catch (e) {
console.log('\033[47;31m Work执行失败,异常未捕获,参数: \033[40;31m ' +e.stack + '\033[0m')
}
}, consumeParam)
}
//消息队列回调,这是接收到消息之后的回调,HotelsSpider为具体的异步爬取的网络请求
async function doWork(msg) {
let messageId = msg.properties.messageId
let Task = JSON.parse(transToString(msg.content))
//console.log(Task)
//参数检查
if (!Task.SpiderParams.PlatHotelNum || !Task.SpiderParams.TargerDate) {
ch.ack(msg)
await ResultTask(messageId, Task, '参数不完整', 4)
return
}
//扩展参数为1~4人
let param = {
hotelId: Task.SpiderParams.PlatHotelNum,
day: getAfterDay(Task.SpiderParams.TargerDate, 0),
GuestNum: Task.SpiderParams.GuestNum
}
//console.log(` Received ${JSON.stringify(param)} `)
let details
try {
details = await HotelsSpider(messageId, param) // await taskQueue(messageId, param)
Task.CompleteTime = moment().format()
Task.State = (details && details.length ? 2 : 3)
//入mongoDB
let mId = await MongoDbInsertOne({
"Task": Task,
"details": details
})
Task.DateID = mId
//}
//确认消息
ch.ack(msg)
await ResultTask(messageId, Task, '', Task.State)
//console.log('details', details.length)
} catch (e) {
let error = `${param.hotelId}-${param.day}-${param.GuestNum},message:${e.message.substr(0,100)}`
console.log('\033[47;31m 网页请求错误,参数: \033[40;31m ' + error + '\033[0m')
ch.nack(msg);
errorCount++
return
}
resultNum += 1
console.log(`速率--${getProcessRate(startTime, resultNum)}个/分,${moment().format('LT')},查询到价格:{${details.length}},条件:${param.hotelId}-${param.day}-${param.GuestNum}`)
details = null
}
//HotelsSpider中具体网络请求如下
async function getHttpResp(hotelId, ins, out, adults) {
try {
let url = `${configs.URL.base}/h${hotelId}.Hotel-Information`
let params = {
'chkin': ins,
'chkout': out,
'adults': adults,
'mctc': 2
}
url = url + '?' + qs.stringify(params)
let proxy = `http://${global.proxy.ip}:${global.proxy.port}`
let res = await request_Hotels(url, proxy)
let html = res.toString()
return getExpediaHotelInfoByHtml(html)
} catch (e) {
let error = `${hotelId},${ins},${out},${adults}`
let message = e.message
console.log('\033[47;31m 网页请求错误,参数: \033[40;31m ' + error + '\033[0m')
let errorFilter = errorCodeArray.find(item => {
return message.indexOf(item) !== -1
})
if(errorFilter){
errorTimeCount(30)
}
//放弃异常捕获,上退异常以便退回队列
throw e
}
}
网络请求的时间不会太长,使用的request-promise库,超时时间设置在了10秒左右,但是运行个几十分钟接收消息的速度就会明显减慢,尝试过一台机器只开一个爬虫,加大消息接收数,在开始的时候速度很快,但是运行一段时间后也会出现堵塞问题
怕是一直在消费消息不管有没有处理完,加上
ch.prefetch(10)
试试。