How would I use concurrent.futures and queues in a real-time scenario?


Doing work in parallel with Python 3's concurrent.futures module is fairly easy, as shown below.

import concurrent.futures

with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    future_to = {executor.submit(do_work, item, 60): item for item in dictionary}
    for future in concurrent.futures.as_completed(future_to):
        data = future.result()

Putting items into a queue and taking them out is also very convenient.

import queue

q = queue.Queue()
for task in tasks:
    q.put(task)
while not q.empty():
    q.get()
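The `while not q.empty()` loop above works because one thread fills the queue first and then drains it. That changes once a separate listener thread is producing items. The `queue` documentation notes that `empty()` returning `False` does not guarantee that a later `get()` won't block. For that reason, consumers usually block on `q.get()` and stop when they see a sentinel value.

Below is a minimal sketch under two assumptions: `None` is never a real task, and the hypothetical `produce` function stands in for the background listener.

```python
import queue
import threading

SENTINEL = None  # assumption: None is never a real task


def produce(q, items):
    """Hypothetical producer standing in for the background listener."""
    for item in items:
        q.put(item)
    q.put(SENTINEL)  # tell the consumer there is no more work


def consume(q):
    """Block on q.get() until the sentinel arrives, collecting the items."""
    received = []
    while True:
        item = q.get()  # blocks instead of polling q.empty()
        if item is SENTINEL:
            break
        received.append(item)
    return received


q = queue.Queue()
producer = threading.Thread(target=produce, args=(q, range(5)))
producer.start()
result = consume(q)
producer.join()
print(result)  # [0, 1, 2, 3, 4]
```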

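I have a script running in the background that listens for updates. In theory, as those updates arrive, I would put them in a queue and process them concurrently with a ThreadPoolExecutor.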

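Each of these components works on its own and makes sense, but how do I use them together? Can a ThreadPoolExecutor be fed work from a queue in real time, or only when the data to work on is known in advance?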

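In short, all I want to do is receive updates (say, 4 messages per second), push them into a queue, and have concurrent.futures work on them. Otherwise I'm stuck with a slow, sequential approach.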

Let's take the canonical example from the Python documentation:

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

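The list of URLS is fixed. Can this list be fed in real time, with the workers processing items as they arrive, perhaps from a queue for management purposes? I'm a bit confused about whether my approach is actually feasible.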

Originally posted by Ali Gajani; translation licensed under CC BY-SA 4.0.


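Here is the example from the Python docs, extended to take its work from a queue. One change to note: this code uses concurrent.futures.wait instead of concurrent.futures.as_completed, so that new work can be started while waiting for other work to complete.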
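Why does `wait` make this possible? Called with `return_when=FIRST_COMPLETED` and a `timeout`, it returns a `(done, not_done)` pair of sets as soon as any future finishes. That hands control back to the loop, which can then submit newly queued work. `as_completed`, by contrast, iterates over the futures it was given when called, so futures submitted afterwards are not picked up.

A small sketch; `sleepy` is a hypothetical stand-in task:

```python
import concurrent.futures
import time


def sleepy(seconds):
    """Hypothetical stand-in task: sleep, then return the duration."""
    time.sleep(seconds)
    return seconds


with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    pending = {executor.submit(sleepy, 0.1), executor.submit(sleepy, 1.0)}

    # Returns as soon as at least one future is done (or after 5 s at most).
    done, not_done = concurrent.futures.wait(
        pending, timeout=5, return_when=concurrent.futures.FIRST_COMPLETED)
    finished = sorted(f.result() for f in done)
    still_running = len(not_done)
    print(finished, still_running)  # [0.1] 1

    # Control is back in our hands while the slow task runs, so new work
    # (e.g. something just taken off a queue) can be submitted now.
    not_done.add(executor.submit(sleepy, 0.1))
    done, not_done = concurrent.futures.wait(not_done)  # default: ALL_COMPLETED
    all_results = sorted(f.result() for f in done)
    print(all_results)  # [0.1, 1.0]
```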

import concurrent.futures
import urllib.request
import time
import queue

q = queue.Queue()

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

def feed_the_workers(spacing):
    """ Simulate outside actors sending in work to do, request each url twice """
    for url in URLS + URLS:
        time.sleep(spacing)
        q.put(url)
    return "DONE FEEDING"

def load_url(url, timeout):
    """ Retrieve a single page and report the URL and contents """
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:

    # start a future for a thread which sends work in through the queue
    future_to_url = {
        executor.submit(feed_the_workers, 0.25): 'FEEDER DONE'}

    while future_to_url:
        # check for status of the futures which are currently working
        done, not_done = concurrent.futures.wait(
            future_to_url, timeout=0.25,
            return_when=concurrent.futures.FIRST_COMPLETED)

        # if there is incoming work, start a new future
        while not q.empty():

            # fetch a url from the queue
            url = q.get()

            # Start the load operation and mark the future with its URL
            future_to_url[executor.submit(load_url, url, 60)] = url

        # process any completed futures
        for future in done:
            url = future_to_url[future]
            try:
                data = future.result()
            except Exception as exc:
                print('%r generated an exception: %s' % (url, exc))
            else:
                if url == 'FEEDER DONE':
                    print(data)
                else:
                    print('%r page is %d bytes' % (url, len(data)))

            # remove the now completed future
            del future_to_url[future]

Output, fetching each URL twice:

'http://www.foxnews.com/' page is 67574 bytes
'http://www.cnn.com/' page is 136975 bytes
'http://www.bbc.co.uk/' page is 193780 bytes
'http://some-made-up-domain.com/' page is 896 bytes
'http://www.foxnews.com/' page is 67574 bytes
'http://www.cnn.com/' page is 136975 bytes
DONE FEEDING
'http://www.bbc.co.uk/' page is 193605 bytes
'http://some-made-up-domain.com/' page is 896 bytes
'http://europe.wsj.com/' page is 874649 bytes
'http://europe.wsj.com/' page is 874649 bytes
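If polling every 0.25 s is not a requirement, a simpler arrangement is to block on `q.get()` and submit each item the moment it arrives, using a sentinel to stop. This is a sketch, not part of the answer above. `listener`, `handle` and the `None` sentinel are hypothetical; `listener` simulates the question's roughly 4 messages per second.

```python
import concurrent.futures
import queue
import threading
import time

SENTINEL = None  # assumption: None never appears as a real message


def listener(q, messages, rate_hz=4):
    """Hypothetical stand-in for the background update listener."""
    for msg in messages:
        time.sleep(1 / rate_hz)
        q.put(msg)
    q.put(SENTINEL)


def handle(msg):
    """Hypothetical placeholder for the real per-message work."""
    return msg.upper()


def run(messages, max_workers=4):
    q = queue.Queue()
    threading.Thread(target=listener, args=(q, messages), daemon=True).start()
    futures = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        while True:
            msg = q.get()  # blocks until the listener delivers something
            if msg is SENTINEL:
                break
            # Submit immediately; the executor queues it if all workers are busy.
            futures.append(executor.submit(handle, msg))
    # Leaving the with-block waited for every submitted task.
    return [f.result() for f in futures]


results = run(["a", "b", "c"])
print(results)  # ['A', 'B', 'C']
```

`executor.submit` never blocks. If messages arrive faster than the workers finish them, they pile up in the executor's internal, unbounded work queue. This version also keeps every future in a list, so for a never-ending stream you would handle results as they complete instead.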

Originally posted by Stephen Rauch; translation licensed under CC BY-SA 3.0.

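At work I ran into a situation where I wanted to process an infinite stream of data in parallel. I wrote a small library inspired by the excellent answer Stephen Rauch had already given.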

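I originally approached the problem with two separate threads:

- one that submits work to a queue;
- one that watches the queue for completed tasks and makes room for new work to come in.

This is similar to Stephen Rauch's proposal, where the stream is consumed by a feed_the_workers function running in a separate thread.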
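The two-thread idea can be approximated with a done-callback instead of a dedicated monitoring thread. A `threading.BoundedSemaphore` caps the number of in-flight tasks, and each future's callback releases a slot when it finishes.

This is a hypothetical sketch, not Pedro M Duarte's library. `process_stream`, `square` and the `None` sentinel are assumptions, and in practice a separate producer thread would be calling `q.put`.

```python
import concurrent.futures
import queue
import threading

SENTINEL = None  # assumption: None is never a real work item


def process_stream(q, task, num_workers=3):
    """Run task(item) for each item taken from q, keeping at most
    num_workers tasks in flight; stops at SENTINEL."""
    slots = threading.BoundedSemaphore(num_workers)
    results = []
    lock = threading.Lock()

    def on_done(future):
        # Runs when the future finishes (usually in the worker thread).
        try:
            exc = future.exception()
            with lock:
                results.append(exc if exc is not None else future.result())
        finally:
            slots.release()  # make room for the next item

    with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor:
        while True:
            slots.acquire()  # wait for a free slot before taking more work
            item = q.get()   # wait for the producer to supply something
            if item is SENTINEL:
                break
            executor.submit(task, item).add_done_callback(on_done)
    # Leaving the with-block waits for all tasks and their callbacks.
    return results


def square(x):
    return x * x


q = queue.Queue()
for i in range(6):  # in practice a producer thread would put items over time
    q.put(i)
q.put(SENTINEL)
results = process_stream(q, square)
print(sorted(results))  # [0, 1, 4, 9, 16, 25]
```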

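A colleague then helped me realize that everything can run in a single thread. The trick is a buffered iterator that lets you control how many elements are released from the input stream each time you are ready to submit more work to the thread pool.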

So we introduce BufferedIter:

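class BufferedIter:
    """Wrap an iterator so that items can be pulled out n at a time."""

    def __init__(self, iterator):
        self.iter = iter(iterator)  # also accepts any iterable

    def nextN(self, n):
        # Intended for infinite streams: if fewer than n items remain,
        # next() raises StopIteration and the items read so far are lost.
        vals = []
        for _ in range(n):
            vals.append(next(self.iter))
        return vals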
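As written, `nextN` only suits infinite streams. If the input might end, a hypothetical variant built on `itertools.islice` returns a shorter (eventually empty) list instead of raising:

```python
import itertools


class PartialBufferedIter:
    """Hypothetical BufferedIter variant that tolerates finite streams."""

    def __init__(self, iterable):
        self.iter = iter(iterable)

    def nextN(self, n):
        # Take up to n items; once the stream is exhausted this returns a
        # shorter (possibly empty) list rather than raising StopIteration.
        return list(itertools.islice(self.iter, n))


buf = PartialBufferedIter(range(5))
first, second, third = buf.nextN(3), buf.nextN(3), buf.nextN(3)
print(first, second, third)  # [0, 1, 2] [3, 4] []
```

If you pair this variant with a processing loop like the one below, that loop also needs an exit for this case: stop once `nextN` returns an empty list and no futures are pending. Otherwise `wait` on an empty dict returns immediately and the loop spins.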

With BufferedIter, the stream processor can be defined as follows:

import logging
import queue
import signal
import sys
import time
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED

level = logging.DEBUG
log = logging.getLogger(__name__)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter('%(asctime)s %(message)s'))
handler.setLevel(level)
log.addHandler(handler)
log.setLevel(level)

WAIT_SLEEP = 1  # second, adjust this based on the timescale of your tasks

def stream_processor(input_stream, task, num_workers):

    # Use a queue to signal shutdown.
    shutting_down = queue.Queue()

    def shutdown(signum, frame):
        log.warning('Caught signal %d, shutting down gracefully ...' % signum)
        # Put an item in the shutting down queue to signal shutdown.
        shutting_down.put(None)

    # Register the signal handler
    signal.signal(signal.SIGTERM, shutdown)
    signal.signal(signal.SIGINT, shutdown)

    def is_shutting_down():
        return not shutting_down.empty()

    futures = dict()
    buffer = BufferedIter(input_stream)
    with ThreadPoolExecutor(num_workers) as executor:
        num_success = 0
        num_failure = 0
        while True:
            idle_workers = num_workers - len(futures)

            if not is_shutting_down():
                items = buffer.nextN(idle_workers)
                for data in items:
                    futures[executor.submit(task, data)] = data

            done, _ = wait(futures, timeout=WAIT_SLEEP, return_when=ALL_COMPLETED)
            for f in done:
                data = futures[f]
                try:
                    f.result(timeout=0)
                except Exception as exc:
                    log.error('future encountered an exception: %r, %s' % (data, exc))
                    num_failure += 1
                else:
                    log.info('future finished successfully: %r' % data)
                    num_success += 1

                del futures[f]

            if is_shutting_down() and len(futures) == 0:
                break

        log.info("num_success=%d, num_failure=%d" % (num_success, num_failure))

Here is an example of how to use the stream processor:

import itertools

def integers():
    """Simulate an infinite stream of work."""
    for i in itertools.count():
        yield i

def task(x):
    """The task we would like to perform in parallel.
    With some delay to simulate a time consuming job.
    With a baked in exception to simulate errors.
    """
    time.sleep(3)
    if x == 4:
        raise ValueError('bad luck')
    return x * x

stream_processor(integers(), task, num_workers=3)

The output of this example looks like this:

2019-01-15 22:34:40,193 future finished successfully: 1
2019-01-15 22:34:40,193 future finished successfully: 0
2019-01-15 22:34:40,193 future finished successfully: 2
2019-01-15 22:34:43,201 future finished successfully: 5
2019-01-15 22:34:43,201 future encountered an exception: 4, bad luck
2019-01-15 22:34:43,202 future finished successfully: 3
2019-01-15 22:34:46,208 future finished successfully: 6
2019-01-15 22:34:46,209 future finished successfully: 7
2019-01-15 22:34:46,209 future finished successfully: 8
2019-01-15 22:34:49,215 future finished successfully: 11
2019-01-15 22:34:49,215 future finished successfully: 10
2019-01-15 22:34:49,215 future finished successfully: 9
^C <=== THIS IS WHEN I HIT Ctrl-C
2019-01-15 22:34:50,648 Caught signal 2, shutting down gracefully ...
2019-01-15 22:34:52,221 future finished successfully: 13
2019-01-15 22:34:52,222 future finished successfully: 14
2019-01-15 22:34:52,222 future finished successfully: 12
2019-01-15 22:34:52,222 num_success=14, num_failure=1
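`stream_processor` waits with `ALL_COMPLETED` and a `WAIT_SLEEP` timeout. A worker that goes idle mid-wait can therefore sit unused for up to `WAIT_SLEEP` seconds before new work reaches it.

A variant sketch makes three changes:

- it waits with `FIRST_COMPLETED`, so a freed worker is refilled right away;
- it takes a `threading.Event` instead of installing signal handlers itself;
- it also stops once a finite input is exhausted.

`stream_processor_v2`, `flaky_square` and the parameter names are hypothetical.

```python
import concurrent.futures
import itertools
import threading


def stream_processor_v2(input_stream, task, num_workers, stop_event, poll=1.0):
    """Hypothetical variant of stream_processor.

    Refills the pool as soon as any future finishes. Stops submitting when
    stop_event is set or input_stream runs out, then drains in-flight work.
    Returns (num_success, num_failure).
    """
    it = iter(input_stream)
    futures = {}
    exhausted = False
    num_success = num_failure = 0
    with concurrent.futures.ThreadPoolExecutor(num_workers) as executor:
        while True:
            # Top the pool back up to num_workers in-flight tasks.
            if not stop_event.is_set() and not exhausted:
                want = num_workers - len(futures)
                batch = list(itertools.islice(it, want))
                if len(batch) < want:
                    exhausted = True  # a finite stream has run out
                for data in batch:
                    futures[executor.submit(task, data)] = data
            if not futures:
                break  # nothing running and nothing more to submit
            # Wake up when *any* future finishes, or after `poll` seconds so
            # stop_event is re-checked.
            done, _ = concurrent.futures.wait(
                futures, timeout=poll,
                return_when=concurrent.futures.FIRST_COMPLETED)
            for f in done:
                futures.pop(f)
                if f.exception() is None:
                    num_success += 1
                else:
                    num_failure += 1
    return num_success, num_failure


def flaky_square(x):
    """Hypothetical task with a baked-in failure, like the example above."""
    if x == 4:
        raise ValueError('bad luck')
    return x * x


stop = threading.Event()
counts = stream_processor_v2(range(10), flaky_square, num_workers=3, stop_event=stop)
print(counts)  # (9, 1)
```

In a script, a signal handler can simply set the event, for example `signal.signal(signal.SIGINT, lambda signum, frame: stop.set())`.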

Originally posted by Pedro M Duarte; translation licensed under CC BY-SA 4.0.
