RabbitMQ Pika connection reset: (-1, ConnectionResetError(104, 'Connection reset by peer'))


I searched StackOverflow before posting this question; none of the existing solutions worked for me, and my problem may be different from the others.

I'm writing a script that takes an article from a RabbitMQ queue, processes it to count the words and extract keywords, and dumps the result into a database. The script works fine, but after running for a while it throws this exception:

(-1, "ConnectionResetError(104, 'Connection reset by peer')")

I don't know why I'm getting this. I've tried many of the solutions available on StackOverflow, but none of them worked for me. I wrote my script in two different ways; both work fine, but the same exception occurs after some time.

Here is my first version:

 def app_main():

    global channel, results, speedvars
    Logger.log_message('Starting app main')

    # Edit 4
    def pika_connect():
        connection = pika.BlockingConnection(pika.ConnectionParameters(
                host=Config.AMQ_DAEMONS['base']['amq-host']))
        channel = connection.channel()
        print ("In pika connect")
        Logger.log_message('Setting up input queue consumer')
        channel.queue_declare(Config.AMQ_DAEMONS['consumer']['input'], durable=True)
        channel.basic_consume(on_message, queue=Config.AMQ_DAEMONS['consumer']['input'], no_ack=True)

        Logger.log_message('Starting loop')
        channel.start_consuming()

    #########

    speedvars = SpeedVars()
    speedtracker = SpeedTracker(speedvars)
    speedtracker.start()

    sender = ResultsSender(results, speedvars)
    sender.start()

    # Edit 5 starting 10 threads to listen to pika

    for th in range(qthreads):
        Logger.log_message('Starting thread: '+str(th))
        try:
            t = Thread(target=pika_connect, args=())
            t.start()
        except Exception as e:
            Logger.error_message("Exception in starting threads " + str(e))

try:
    app_main()
except Exception as e:
    Logger.error_message("Exception in APP MAIN " + str(e))

Here is my second version:

 def app_main():

    global channel, results, speedvars
    Logger.log_message('Starting app main')

    speedvars = SpeedVars()
    speedtracker = SpeedTracker(speedvars)
    speedtracker.start()

    sender = ResultsSender(results, speedvars)
    sender.start()

    connection = pika.BlockingConnection(pika.ConnectionParameters(
             host=Config.AMQ_DAEMONS['base']['amq-host']))
    channel = connection.channel()
    print ("In app main")
    Logger.log_message('Setting up input queue consumer')
    channel.queue_declare(Config.AMQ_DAEMONS['consumer']['input'], durable=True)
    channel.basic_consume(on_message, queue=Config.AMQ_DAEMONS['consumer']['input'], no_ack=True)

    Logger.log_message('Starting loop')

    try:
        channel.start_consuming()
    except Exception as e:
        Logger.error_message("Exception in start_consuming in main " + str(e))
        raise e

try:
    app_main()
except Exception as e:
    Logger.error_message("Exception in APP MAIN " + str(e))

In my first version I used threads because I wanted to speed up the processing of articles.

Here is my callback function:

def on_message(ch, method, properties, message):
    Logger.log_message("Starting parsing new msg ")
    handle_message(message)

Edit: complete code

import os
abspath = os.path.abspath(__file__)
dname = os.path.dirname(abspath)
os.chdir(dname)

from Modules import Logger
import pika
import Config
import json
import pickle
import Pipeline
import sys
import time
import datetime
import threading
import queue
import functools

from pid.decorator import pidfile

Logger.log_init(Config.AMQ_DAEMONS['consumer']['log-ident'])
#qthreads = Config.AMQ_DAEMONS['consumer']['threads']
results = queue.Queue()
channel = None
speedvars = None

SPD_RECEIVED = 'received'
SPD_DISCARDED = 'discarded'
SPD_SENT = 'sent'

class SpeedVars(object):
    vars = {}
    lock = None

    def __init__(self):
        self.lock = threading.Lock()

    def inc(self, var):

        self.lock.acquire()
        try:
            if var in self.vars:
                self.vars[var] += 1
            else:
                self.vars[var] = 1
        finally:
            self.lock.release()

    def dec(self, var):

        self.lock.acquire()
        try:
            if var in self.vars:
                self.vars[var] -= 1
            else:
                Logger.error_message('Cannot decrement ' + var + ', not tracked')
        finally:
            self.lock.release()

    def get(self, var):

        out = None
        self.lock.acquire()
        try:
            if var in self.vars:
                out = self.vars[var]
            else:
                Logger.error_message('Cannot get ' + var + ', not tracked')
        finally:
            self.lock.release()

        return out

    def get_all(self):

        out = None
        self.lock.acquire()
        try:
            out = self.vars.copy()
        finally:
            self.lock.release()

        return out

class SpeedTracker(threading.Thread):
    speedvars = None
    start_ts = None
    last_vars = {}

    def __init__(self, speedvars):
        super(SpeedTracker, self).__init__()
        self.start_ts = time.time()
        self.speedvars = speedvars
        Logger.log_message('Setting up speed tracker')

    def run(self):
        while True:
            time.sleep(Config.AMQ_DAEMONS['consumer']['speed-tracking-interval'])
            prev = self.last_vars
            cur = self.speedvars.get_all()
            now = time.time()
            if len(prev) > 0:
                q = {}
                for key in cur:
                    qty = cur[key] - prev[key]
                    avg = qty / Config.AMQ_DAEMONS['consumer']['speed-tracking-interval']
                    overall_avg = cur[key] / (now - self.start_ts)
                    Logger.log_message('Speed-tracking (' + key + '): total ' + str(cur[key])
                                       + ', delta ' + str(qty) + ', speed ' + '%0.2f' % avg + '/sec, '
                                       + ', overall speed ' + '%0.2f' % overall_avg + '/sec')
                pending = cur[SPD_RECEIVED] - cur[SPD_DISCARDED] - cur[SPD_SENT]
                pending_avg = pending / (now - self.start_ts)
                Logger.log_message('Speed-tracking (pending): total ' + str(pending)
                                   + ', overall speed ' + '%0.2f' % pending_avg + '/sec')
            self.last_vars = cur

class ResultsSender(threading.Thread):
    channel = None
    results = None
    speedvars = None

    def __init__(self, results, speedvars):
        super(ResultsSender, self).__init__()
        connection = pika.BlockingConnection(pika.ConnectionParameters(
            host=Config.AMQ_DAEMONS['base']['amq-host']))
        self.channel = connection.channel()
        Logger.log_message('Setting up output exchange')
        self.channel.exchange_declare(exchange=Config.AMQ_DAEMONS['consumer']['output'], exchange_type='direct')
        self.results = results
        self.speedvars = speedvars

    def run(self):
        while True:
            item = self.results.get()
            self.channel.basic_publish(
                exchange=Config.AMQ_DAEMONS['consumer']['output'],
                routing_key='',
                body=item)
            self.speedvars.inc(SPD_SENT)

def parse_message(message):
    try:
        bodytxt = message.decode('UTF-8')
        body = json.loads(bodytxt)
        return body
    except Exception as e:
        Logger.error_message("Cannot parse message - " + str(e))
        raise e

def get_body_elements(body):
    try:
        artid = str(body.get('article_id'))
        article_dt = datetime.datetime.fromtimestamp(body.get('pubTime'))
        date = article_dt.strftime(Config.DATE_FORMAT)
        article = "\n".join([body.get('title', ''), body.get('subheading', ''), body.get('content', '')])
        return (artid, date, article)
    except Exception as e:
        Logger.error_message("Cannot retrieve article attributes " + str(e))
        raise e

def process_article(id, date, text):
    global results, speedvars
    try:
        Logger.log_message('Processing article ' + id)
        keywords = Pipeline.extract_keywords(text)
        send_data = {"id": id, "date": date, "keywords": keywords}
        results.put(pickle.dumps(send_data))
        # print('Queue Size:',results.qsize())
    except Exception as e:
        Logger.error_message("Problem processing article " + str(e))
        raise e

def ack_message(ch, delivery_tag):
    """Note that `channel` must be the same pika channel instance via which
    the message being ACKed was retrieved (AMQP protocol constraint).
    """
    if channel.is_open:
        channel.basic_ack(delivery_tag)
    else:
        Logger.error_message("Channel is already closed, so we can't ACK this message")
        # Channel is already closed, so we can't ACK this message;
        # log and/or do something that makes sense for your app in this case.
        #pass

def handle_message(connection, ch, delivery_tag, message):
    global speedvars
    start = time.time()
    thread_id = threading.get_ident()

    try:
        speedvars.inc(SPD_RECEIVED)
        body = parse_message(message)
        (id, date, text) = get_body_elements(body)
        words = len(text.split())
        if words <= Config.AMQ_DAEMONS['consumer']['word-count-limit']:
            process_article(id, date, text)
        else:
            Logger.log_message('Ignoring article, over word count limit')
            speedvars.inc(SPD_DISCARDED)

    except Exception as e:
        Logger.error_message("Could not process message - " + str(e))

    cb = functools.partial(ack_message, ch, delivery_tag)
    connection.add_callback_threadsafe(cb)

    Logger.log_message("Thread id: "+str(thread_id)+" Delivery tag: "+str(delivery_tag))
    Logger.log_message("Total time taken to handle message: " + str(time.time() - start))

# CALL BACK
## def on_message(ch, method, properties, message):
##    global executor
##    executor.submit(handle_message, message)

def on_message(ch, method, header_frame, message, args):
    (connection, threads) = args
    delivery_tag = method.delivery_tag
    t = threading.Thread(target=handle_message, args=(connection, ch, delivery_tag, message))
    t.start()
    threads.append(t)

####################################################
@pidfile(piddir=Config.AMQ_DAEMONS['base']['pid-dir'], pidname=Config.AMQ_DAEMONS['consumer']['pid-file'])
def app_main():
    global channel, results, speedvars

    speedvars = SpeedVars()
    speedtracker = SpeedTracker(speedvars)
    speedtracker.start()

    sender = ResultsSender(results, speedvars)
    sender.start()

    # Pika Connection
    connection = pika.BlockingConnection(pika.ConnectionParameters(
                host=Config.AMQ_DAEMONS['base']['amq-host']))
    channel = connection.channel()

    Logger.log_message('Setting up input queue consumer')
    channel.queue_declare(Config.AMQ_DAEMONS['consumer']['input'], durable=True)

    #channel.basic_consume(on_message, queue=Config.AMQ_DAEMONS['consumer']['input'], no_ack=True)
    channel.basic_qos(prefetch_count=1)
    threads = []
    on_message_callback = functools.partial(on_message, args=(connection, threads))
    channel.basic_consume(on_message_callback, Config.AMQ_DAEMONS['consumer']['input'])

    Logger.log_message('Starting loop')
    ## channel.start_consuming()
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        channel.stop_consuming()

    # Wait for all to complete
    for thread in threads:
        thread.join()

    connection.close()

app_main()

Pika is not taking much time to handle a message, yet I am still running into the connection reset.

**Total time taken to handle message: 0.0005991458892822266**

Originally posted by irum zahra; translation licensed under CC BY-SA 4.0.

1 Answer

Your handle_message method is blocking heartbeats, because all of your code, including Pika's I/O loop, runs on the same thread. See this example of how to run your work (handle_message) on a thread separate from Pika's I/O loop and then acknowledge the message correctly.
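
For reference, here is a minimal sketch of that threaded-consumer pattern, assuming pika 1.x keyword arguments and placeholder AMQ_HOST / QUEUE values (not taken from the question; the asker's full code above already follows a similar structure). The consumer callback only spawns a worker thread, the heavy work runs off the I/O loop, and the acknowledgement is marshalled back onto the I/O loop with connection.add_callback_threadsafe():

    import functools
    import threading
    import pika

    AMQ_HOST = 'localhost'   # assumption: your broker host
    QUEUE = 'articles'       # assumption: your input queue name

    def ack_message(channel, delivery_tag):
        # Scheduled onto the connection's I/O loop thread (see work() below),
        # so it is safe to use the channel here; AMQP requires the ack to go
        # out on the same channel the message was delivered on.
        if channel.is_open:
            channel.basic_ack(delivery_tag)

    def work(connection, channel, delivery_tag, body):
        # Slow processing runs on a worker thread, so the I/O loop stays free
        # to send heartbeats; the ack is handed back to the I/O loop.
        print('processing %d bytes' % len(body))
        connection.add_callback_threadsafe(
            functools.partial(ack_message, channel, delivery_tag))

    def on_message(channel, method, properties, body, args):
        connection, threads = args
        t = threading.Thread(target=work,
                             args=(connection, channel, method.delivery_tag, body))
        t.start()
        threads.append(t)

    connection = pika.BlockingConnection(pika.ConnectionParameters(host=AMQ_HOST))
    channel = connection.channel()
    channel.queue_declare(queue=QUEUE, durable=True)
    channel.basic_qos(prefetch_count=1)

    threads = []
    callback = functools.partial(on_message, args=(connection, threads))
    channel.basic_consume(queue=QUEUE, on_message_callback=callback)

    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        channel.stop_consuming()

    for t in threads:
        t.join()
    connection.close()

With the work off the I/O loop, start_consuming() keeps servicing the socket, heartbeats go out on time, and the broker no longer closes the connection with a reset.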


Note: the RabbitMQ team monitors the rabbitmq-users mailing list and only sometimes answers questions on StackOverflow.

Originally posted by Luke Bakken; translation licensed under CC BY-SA 4.0.
