Processing a huge CSV file with Python and multithreading

I'm new to this, so please bear with me.

I have a function that lazily yields rows from a huge CSV file:

def get_next_line():
    with open(sample_csv, 'r') as f:
        for line in f:
            yield line

def do_long_operation(row):
    print('Do some operation that takes a long time')

I need to use threads so that I can call do_long_operation on each record obtained from the function above.

Most examples I've found online look something like this, and I'm not sure I'm on the right track.

import threading

thread_list = []
for i in range(8):
    # pseudocode: how do I hand each thread the next row from get_next_line()?
    t = threading.Thread(target=do_long_operation, args=(get_next_row from get_next_line))
    thread_list.append(t)

for thread in thread_list:
    thread.start()

for thread in thread_list:
    thread.join()

My questions are:

  1. How do I start only a limited number of threads, say 8?

  2. How do I make sure that each thread gets a row from get_next_line?

Originally asked by user3249433; translation licensed under CC BY-SA 4.0.

2 Answers

You can use a thread pool from multiprocessing and map your tasks to a pool of workers:

from multiprocessing.pool import ThreadPool as Pool
# from multiprocessing import Pool
from random import randint
from time import sleep

def process_line(l):
    print(l, "started")
    sleep(randint(0, 3))
    print(l, "done")

def get_next_line():
    with open("sample.csv", 'r') as f:
        for line in f:
            yield line

f = get_next_line()

t = Pool(processes=8)

# map consumes the generator and distributes the lines across the workers
t.map(process_line, f)
t.close()
t.join()

This creates eight workers and distributes your lines among them. As soon as a worker is free, it picks up another line.

There is also a commented-out import statement. If you comment out the ThreadPool import and import Pool from multiprocessing instead, you get subprocesses rather than threads, which may be more efficient in your case.

Originally answered by Hannu; translation licensed under CC BY-SA 4.0.
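Editor's note, not part of the original answer: `map` waits for every line before returning, and `multiprocessing.pool` also offers `imap_unordered`, which yields results as individual workers finish (though the pool may still read ahead from the generator internally; the next answer adds a bounded queue for true backpressure). A minimal sketch, with `process_line` and the in-memory rows standing in for the real per-row work and the real CSV:

```python
from multiprocessing.pool import ThreadPool

def process_line(line):
    # Stand-in for the expensive per-row operation
    return line.strip().upper()

def get_next_line():
    # Hypothetical in-memory stand-in for reading a huge CSV
    for row in ["a,1\n", "b,2\n", "c,3\n"]:
        yield row

pool = ThreadPool(processes=8)
# Yields each result as soon as its worker finishes, in completion order
results = list(pool.imap_unordered(process_line, get_next_line()))
pool.close()
pool.join()
print(sorted(results))  # → ['A,1', 'B,2', 'C,3']
```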

Use Pool/ThreadPool from multiprocessing to map tasks to a pool of workers, and a Queue to control how many tasks are held in memory (so we don't read far ahead into the huge CSV file if the workers are slow):

from multiprocessing.pool import ThreadPool as Pool
# from multiprocessing import Pool
from random import randint
import time, os
from multiprocessing import Queue

def process_line(l):
    print("{} started".format(l))
    time.sleep(randint(0, 3))
    print("{} done".format(l))

def get_next_line():
    with open("sample.csv", 'r') as f:
        for line in f:
            yield line

# use for testing
# def get_next_line():
#     for i in range(100):
#         print('yielding {}'.format(i))
#         yield i

def worker_main(queue):
    print("{} working".format(os.getpid()))
    while True:
        # Get item from queue, block until one is available
        item = queue.get(True)
        if item is None:
            # Shutdown this worker and requeue the item so other workers can shutdown as well
            queue.put(None)
            break
        else:
            # Process item
            process_line(item)
    print("{} done working".format(os.getpid()))

f = get_next_line()

# Use a multiprocessing queue with maxsize
q = Queue(maxsize=5)

# Start workers to process queue items
t = Pool(processes=8, initializer=worker_main, initargs=(q,))

# Enqueue items. This blocks if the queue is full.
for l in f:
    q.put(l)

# Enqueue the shutdown message (i.e. None)
q.put(None)

# We need to first close the pool before joining
t.close()
t.join()

Originally answered by KB5; translation licensed under CC BY-SA 4.0.
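Editor's note, not part of the original answer: on Python 3 the same bounded-in-flight idea can be sketched with the standard library's `concurrent.futures.ThreadPoolExecutor`, using a semaphore instead of a bounded queue. The helper names (`gate`, `run`) and the in-memory rows are illustrative stand-ins, not part of the original code:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

def process_line(line):
    return len(line)  # stand-in for the slow per-row operation

def get_next_line():
    # Hypothetical stand-in for reading a huge CSV
    for row in ["a,1\n", "bb,22\n", "ccc,333\n"]:
        yield row

max_in_flight = 5                       # analogous to Queue(maxsize=5)
gate = threading.Semaphore(max_in_flight)
results = []

def run(line):
    try:
        results.append(process_line(line))  # list.append is thread-safe
    finally:
        gate.release()                  # free a slot once this row is done

with ThreadPoolExecutor(max_workers=8) as pool:
    for line in get_next_line():
        gate.acquire()                  # blocks while too many rows are in flight
        pool.submit(run, line)
# leaving the with-block waits for all submitted work to finish
print(sorted(results))  # → [4, 6, 8]
```

Here the semaphore plays the role of the queue's maxsize: the producer loop blocks on `gate.acquire()` whenever five rows are already in flight, so the generator is never read far ahead of the workers.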
