Python 3: child processes never start when using multiprocessing.Pool

The code is below; for_process is the function the child processes should run.
Call the commented-out block A, and the block of Process code after it B.
My feeling is that A and B should be equivalent, but in practice B runs fine, while after swapping in A the program apparently never enters for_process (whose first line is print("child process started")); even that print never runs.

I can't figure out what I wrote wrong.
Looking at other people's examples online, it really does seem that a process pool only needs three lines of code, so where did my three lines go wrong?
System: Windows 10 64-bit, Python 3.

    # p = Pool(3)
    # for i in range(3):
    #     print(i)
    #     p.apply_async(for_process, args=(title_list_of_init_half[i], dict_init_queue))
    # p.close()

    p_fi = Process(target=for_process, args=(title_list_of_init_half[0], dict_init_queue))
    p_se = Process(target=for_process, args=(title_list_of_init_half[1], dict_init_queue))
    p_th = Process(target=for_process, args=(title_list_of_init_half[2], dict_init_queue))
    p_fi.daemon = True
    p_se.daemon = True
    p_th.daemon = True
    p_fi.start()
    p_se.start()
    p_th.start()

Here is the complete code:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import jieba
import math
import os
from datetime import datetime
from collections import defaultdict
import sqlite3
from multiprocessing import Process, Queue, Pool
from time import sleep, ctime

user_dict = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir)) + "/DATA/user_dict/game_name_list.txt"
init_path = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir)) + "/DATA/doc_list.txt"
update_path = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir)) + "/DATA/doc_list_mini.txt"
num_of_rows = 1000000 * 2.7  # placeholder for len(title_list) for now


def database_create():
    connect = sqlite3.connect("weight.db")
    cursor = connect.cursor()
    flag = True

    try:
        cursor.execute(
                'create table keywords'
                '(key varchar(20) primary key,key_weight float,docs varchar(2000),games varchar(900))'
        )
    except Exception as e:
        print("error: " + str(e) + " @ database_create")
        flag = False
    finally:
        cursor.close()
        connect.commit()
        connect.close()
        return flag


def database_merge(target_list, is_init=False):
    print("merge begin @ " + str(ctime()))
    connect = sqlite3.connect("weight.db")
    cursor = connect.cursor()
    flag = True
    try:
        for item in target_list:
            a0 = item
            a1 = target_list[item][0]
            a2 = ",".join(target_list[item][1])
            a3 = ",".join(target_list[item][2])

            if is_init:
                cursor.execute(
                        'insert into keywords(key,key_weight,docs,games) values (?,?,?,?)', (a0, a1, a2, a3))
            else:
                pass    # update if the primary key already exists, otherwise insert

    except Exception as e:
        print("error: " + str(e) + " @ database_merge")
        flag = False
    finally:
        cursor.close()
        connect.commit()
        connect.close()
    print("merge end @ " + str(ctime()))
    return flag


def load_user_dict():
    with open(user_dict, "r", encoding="utf-8") as ud:
        for line in ud.readlines():
            xxx = line.strip()
            jieba.add_word(xxx)


def default_weight():
    # Layout of each value in the weight dictionary:
    # index 0 (float): the weight
    # index 1 (set): all titles that contain this keyword
    # index 2 (set): all game names that contain this keyword
    return [0, set([]), set([])]


def load_doc_list(this_path):
    with open(this_path, "r", encoding="utf-8") as tl:
        title_list = tl.readlines()
        # title_list line format: doc_id, doc_title, doc_game_name (tab-separated)
    return title_list


def weight_calc(this_title_list):
    total_key_words = defaultdict(default_weight)

    for lines in this_title_list:
        titles = lines.split("\t")
        temp = jieba.cut(titles[1], cut_all=False)
        for keyword in temp:
            total_key_words[keyword][1].add(titles[0])  # record the doc_id under this keyword
            total_key_words[keyword][2].add(titles[2])  # record the game_name under this keyword

    dict_keywords_weight_docs_games = dict(
            (k, v) for k, v in total_key_words.items()
            # (k, v) for k, v in total_key_words.items() if (
            #     v[0] > 0 and len(v[1]) > 4 and len(k) > 1
            # )
    )

    return dict_keywords_weight_docs_games


def weight_database_init():
    title_list_of_init = load_doc_list(init_path)
    mid = int(len(title_list_of_init)/3)
    title_list_of_init_half = [
        title_list_of_init[0:mid + 1],
        title_list_of_init[mid:mid * 2 + 1],
        title_list_of_init[mid * 2:-1]
    ]
    dict_init_queue = Queue()

    # p = Pool(3)
    # for i in range(3):
    #     print(i)
    #     p.apply_async(for_process, args=(title_list_of_init_half[i], dict_init_queue))
    # p.close()

    p_fi = Process(target=for_process, args=(title_list_of_init_half[0], dict_init_queue))
    p_se = Process(target=for_process, args=(title_list_of_init_half[1], dict_init_queue))
    p_th = Process(target=for_process, args=(title_list_of_init_half[2], dict_init_queue))
    p_fi.daemon = True
    p_se.daemon = True
    p_th.daemon = True
    p_fi.start()
    p_se.start()
    p_th.start()

    dict_init = defaultdict(default_weight)
    process_count = 0
    while 1:
        a1 = dict_init_queue.get()
        this_key_word = a1[0]
        if a1 == "stop":
            process_count += 1
            if process_count == 3:
                break
            else:
                continue
        if this_key_word in dict_init:
            dict_init[this_key_word][1] = dict_init[this_key_word][1] | a1[2]
            dict_init[this_key_word][2] = dict_init[this_key_word][2] | a1[3]
        else:
            dict_init[this_key_word] = [a1[1], a1[2], a1[3]]
    for words in dict_init:
        title_count = len(dict_init[words][1])
        game_name_count = len(dict_init[words][2])
        dict_init[words][0] = math.log(num_of_rows / title_count / game_name_count)
    database_merge(dict_init, is_init=True)


def for_process(this_list, this_queue):
    print(str(os.getpid()) + " child process started @ " + str(ctime()))
    temp_dict = weight_calc(this_list)
    for item in temp_dict:
        list_1 = [item, temp_dict[item][0], temp_dict[item][1], temp_dict[item][2]]
        this_queue.put(list_1)
    this_queue.put("stop")
    print(str(os.getpid()) + " child process finished @ " + str(ctime()))


def weight_database_update():
    title_list_of_update = load_doc_list(update_path)
    dict_update = weight_calc(title_list_of_update)
    database_merge(dict_update, is_init=False)


if __name__ == '__main__':
    start = datetime.now()
    database_create()   # 创建表
    weight_database_init()
    print(datetime.now()-start)
2 Answers

Just add p.join() after p.close() in code A and the two become equivalent.

Alternatively, adding p_fi.daemon = p_se.daemon = p_th.daemon = True before p_fi.start() in code B is also equivalent (and then code B fails to run properly as well).

In short, the difference is that code A ends the main process without waiting for the child processes to finish, whereas code B waits for the children to finish before the main process exits.
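
As a minimal, self-contained sketch of that fix (the worker here is only a stand-in for the question's for_process, and keeping the AsyncResult objects is optional, but it lets .get() re-raise any exception a task hit silently):

    #!/usr/bin/env python3
    # Pool version of code A with the missing join() added.
    from multiprocessing import Pool
    import os

    def worker(chunk):                   # stand-in for for_process
        print(str(os.getpid()) + " child process started")
        return sum(chunk)                # stand-in for the real work

    if __name__ == '__main__':           # required on Windows (spawn start method)
        p = Pool(3)
        results = [p.apply_async(worker, args=([i, i + 1],)) for i in range(3)]
        p.close()                        # no more tasks will be submitted
        p.join()                         # wait for the three workers to finish
        print([r.get() for r in results])   # .get() re-raises hidden worker exceptions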


Update, after the full code was posted:

dict_init_queue = Queue()

needs to become:

from multiprocessing import Manager
manager = Manager()
dict_init_queue = manager.Queue()

Otherwise the queue cannot be shared with the Pool's worker processes.
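
For completeness, a rough self-contained sketch of code A using a Manager queue; for_process here is a simplified stand-in for the question's worker, and chunks stands in for title_list_of_init_half. A plain multiprocessing.Queue passed as an apply_async argument cannot be pickled for the pool worker, and apply_async hides that error unless .get() is called on the AsyncResult, which is why nothing was printed:

    #!/usr/bin/env python3
    # Passing a queue to Pool workers via Manager, as suggested above.
    from multiprocessing import Pool, Manager
    import os

    def for_process(this_list, this_queue):      # simplified stand-in for the real worker
        print(str(os.getpid()) + " child process started")
        for item in this_list:
            this_queue.put(item)
        this_queue.put("stop")

    if __name__ == '__main__':
        manager = Manager()
        dict_init_queue = manager.Queue()        # proxy queue, safe to pass to Pool workers
        chunks = [[1, 2], [3, 4], [5, 6]]        # stand-in for title_list_of_init_half

        p = Pool(3)
        jobs = [p.apply_async(for_process, args=(chunk, dict_init_queue)) for chunk in chunks]
        p.close()

        stop_count = 0
        while stop_count < 3:                    # drain until every worker has sent "stop"
            item = dict_init_queue.get()
            if item == "stop":
                stop_count += 1
            else:
                print("got", item)

        p.join()
        for j in jobs:
            j.get()                              # re-raises any exception a worker hit silently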
