为什么 kombu 的 ConsumerMixin 消费阻塞了?

from kombu.mixins import ConsumerMixin
from kombu import Connection, Exchange, Queue
from loguru import logger


class MyConsumer(ConsumerMixin):
    def __init__(self, connection):
        self.connection = connection

    def get_consumers(self, Consumer, channel):
        print('创建消费者 start')
        queue_name = 'evt-ye.events-take--dna_create_service.auth'
        exchange_name = 'ye.events'
        routing_key = 'take'

        exchange = Exchange(exchange_name, type='topic')
        queue = Queue(
            queue_name, exchange=exchange,
            routing_key=routing_key,
            queue_arguments={'x-max-priority': 10}
        )

        # 创建一个消费者,并设置预取消息数量为10
        consumer = Consumer(
            queues=[queue], callbacks=[self.on_message],
            prefetch_count=10
        )
        print('创建消费者 down')
        return [consumer]

    def on_message(self, body, message):
        logger.debug(f"Received message: {body}")


with Connection('amqp://pon:pon@192.168.38.191:5672//') as conn:
    consumer = MyConsumer(conn)
    consumer.run()

上面的代码运行后输出

╰─➤  python -u "/Users/ponponon/Desktop/code/me/pon_example/demo.py"                                                                                                                                                                                                      130 ↵
创建消费者 start
创建消费者 down
2023-07-07 14:21:29.154 | DEBUG    | __main__:on_message:32 - Received message: 哈哈哈
2023-07-07 14:21:29.155 | DEBUG    | __main__:on_message:32 - Received message: 哈哈哈
2023-07-07 14:21:29.155 | DEBUG    | __main__:on_message:32 - Received message: 哈哈哈
2023-07-07 14:21:29.155 | DEBUG    | __main__:on_message:32 - Received message: 哈哈哈
2023-07-07 14:21:29.155 | DEBUG    | __main__:on_message:32 - Received message: 哈哈哈
2023-07-07 14:21:29.155 | DEBUG    | __main__:on_message:32 - Received message: 哈哈哈
2023-07-07 14:21:29.155 | DEBUG    | __main__:on_message:32 - Received message: 哈哈哈
2023-07-07 14:21:29.155 | DEBUG    | __main__:on_message:32 - Received message: 哈哈哈
2023-07-07 14:21:29.155 | DEBUG    | __main__:on_message:32 - Received message: 哈哈哈
2023-07-07 14:21:29.156 | DEBUG    | __main__:on_message:32 - Received message: 哈哈哈

然后就没有然后了,程序没有退出,一直阻塞着

图片.png

从 rabbitmq 的监控面板看,也一直出于阻塞状态

图片.png

用 wireshark 抓包看,也没有回复 ack

为什么 ?

阅读 2.2k
1 个回答
def on_message(self, body, message):
    logger.debug(f"Received message: {body}")
    message.ack()  # acknowledge the message
撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题