Python - queue.task_done() 的用途是什么?

新手上路,请多包涵

我编写了一个脚本,该脚本具有多个线程(使用 threading.Thread 创建)从 Queue 使用 queue.get_nowait() 获取 URL,然后处理- HTML。我是多线程编程的新手,无法理解 queue.task_done() 函数的用途。

Queue 为空时,自动返回 queue.Empty 异常。所以我不明白每个线程调用 task_done() 函数的必要性。我们知道当队列为空时我们已经完成了队列,那么为什么我们需要通知它工作线程已经完成了他们的工作(这与队列无关,在他们从队列中获取 URL 之后) ?

有人可以向我提供一个代码示例(最好使用 urllib 、文件 I/O 或除斐波那契数以外的其他内容并打印“Hello”),向我展示如何在实际应用中使用此功能?

原文由 J. Taylor 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 979
2 个回答

Queue.task_done 不是为了工人的利益。它在那里支持 Queue.join


如果我给你一盒工作任务,我会关心你什么时候把所有东西都拿出来了吗?

不,我关心 _的是工作什么时候完成_。看着一个空盒子并不能告诉我这一点。你和其他 5 个人可能仍在研究你开箱即用的东西。

Queue.task_done 让工作人员说出 _任务何时完成_。等待所有工作完成的人 Queue.join 将等到足够的 task_done 调用已经完成,而不是在队列为空时。


eigenfield 在评论中指出,队列拥有 task_done / join 方法似乎真的很奇怪。没错,但这确实是一个命名问题。 queue 模块有错误的名称选择,使它听起来像一个通用队列库,而实际上它是一个线程通信库。

通用队列有 task_done / join 方法会很奇怪,但是对于线程间消息通道来说,有一种方法来指示消息有被处理。 If the class was called thread_communication.MessageChannel instead of queue.Queue and task_done was called message_processed , the intent would be a lot clearer.

(如果您需要通用队列而不是线程间消息通道,请使用 collections.deque 。)

原文由 user2357112 发布,翻译遵循 CC BY-SA 4.0 许可协议

.task_done() 用于标记 .join() 处理完成。

💡 如果您使用 .join() 并且不为每个已处理的项目调用 .task_done() ,您的脚本将永远挂起。


这不是一个简短的例子;

 import logging
import queue
import threading
import time

items_queue = queue.Queue()
running = False

def items_queue_worker():
    while running:
        try:
            item = items_queue.get(timeout=0.01)
            if item is None:
                continue

            try:
                process_item(item)
            finally:
                items_queue.task_done()

        except queue.Empty:
            pass
        except:
            logging.exception('error while processing item')

def process_item(item):
    print('processing {} started...'.format(item))
    time.sleep(0.5)
    print('processing {} done'.format(item))

if __name__ == '__main__':
    running = True

    # Create 10 items_queue_worker threads
    worker_threads = 10
    for _ in range(worker_threads):
        threading.Thread(target=items_queue_worker).start()

    # Populate your queue with data
    for i in range(100):
        items_queue.put(i)

    # Wait for all items to finish processing
    items_queue.join()

    running = False

原文由 Jossef Harush Kadouri 发布,翻译遵循 CC BY-SA 4.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题