代码如下,for_process 是待启动的子进程。
用 A 代表被注释的 4 行代码,B 代表之后的 6 行代码。
我的感觉是 A 和 B 应该是等价的,但是事实上 B 代码可以正常运行,替换成 A 这一段之后,貌似就没有进入 for_process 这个函数(函数的第一行是 print("子进程启动")),连这个 print 都没有运行。
不知道是哪里写错了~
上网看了下别人的事例,貌似使用进程池的时候,确实只需要三行代码,不知道我这三行代码是哪里错了?
系统是win 10 64位,python 3。
# p = Pool(3)
# for i in range(3):
# print(i)
# p.apply_async(for_process, args=(title_list_of_init_half[i], dict_init_queue))
# p.close()
p_fi = Process(target=for_process, args=(title_list_of_init_half[0], dict_init_queue))
p_se = Process(target=for_process, args=(title_list_of_init_half[1], dict_init_queue))
p_th = Process(target=for_process, args=(title_list_of_init_half[2], dict_init_queue))
p_fi.daemon = True
p_se.daemon = True
p_th.daemon = True
p_fi.start()
p_se.start()
p_th.start()
下面是完整的代码:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import jieba
import math
import os
from datetime import datetime
from collections import defaultdict
import sqlite3
from multiprocessing import Process, Queue, Pool
from time import sleep, ctime
# Path of the jieba user dictionary (one game name per line).
user_dict = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir)) + "/DATA/user_dict/game_name_list.txt"
# Full document list used for the initial database build.
init_path = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir)) + "/DATA/doc_list.txt"
# Smaller document list used for incremental updates.
update_path = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir)) + "/DATA/doc_list_mini.txt"
num_of_rows = 1000000 * 2.7 # temporary stand-in for len(title_list)
def database_create(db_path="weight.db"):
    """Create the ``keywords`` table in the SQLite database.

    Args:
        db_path: Path of the SQLite database file (default ``"weight.db"``,
            matching the original hard-coded value).

    Returns:
        True if the table was created, False if creation failed
        (most commonly because the table already exists).
    """
    flag = True
    connect = sqlite3.connect(db_path)
    try:
        cursor = connect.cursor()
        try:
            cursor.execute(
                'create table keywords'
                '(key varchar(20) primary key,key_weight float,docs varchar(2000),games varchar(900))'
            )
        except Exception as e:
            # Expected on re-runs: "table keywords already exists".
            print("error: " + str(e) + " @ database_create")
            flag = False
        finally:
            cursor.close()
            connect.commit()
    finally:
        # Close the connection even if obtaining the cursor itself failed.
        connect.close()
    return flag
def database_merge(target_list, is_init=False, db_path="weight.db"):
    """Write a keyword-weight mapping into the ``keywords`` table.

    Args:
        target_list: Mapping ``keyword -> [weight, doc_id set, game_name set]``.
        is_init: True for the initial bulk insert; False for an incremental
            merge (not implemented yet, see TODO below).
        db_path: Path of the SQLite database file (default ``"weight.db"``,
            matching the original hard-coded value).

    Returns:
        True on success, False if any insert failed (the loop stops at the
        first failing row, matching the original behaviour).
    """
    print("merge begin @ " + str(ctime()))
    flag = True
    connect = sqlite3.connect(db_path)
    cursor = connect.cursor()
    try:
        for keyword in target_list:
            weight = target_list[keyword][0]
            # NOTE(review): set iteration order is arbitrary, so the joined
            # strings are not deterministic across runs.
            docs = ",".join(target_list[keyword][1])
            games = ",".join(target_list[keyword][2])
            if is_init:
                cursor.execute(
                    'insert into keywords(key,key_weight,docs,games) values (?,?,?,?)',
                    (keyword, weight, docs, games))
            else:
                pass  # TODO: update the row when the key exists, otherwise insert.
    except Exception as e:
        print("error: " + str(e) + " @ database_merge")
        flag = False
    finally:
        cursor.close()
        connect.commit()
        connect.close()
    print("merge end @ " + str(ctime()))
    return flag
def load_user_dict():
    """Register every game name from the user dictionary file with jieba."""
    with open(user_dict, "r", encoding="utf-8") as handle:
        for raw_line in handle:
            jieba.add_word(raw_line.strip())
def default_weight():
    """Default value factory for the keyword-weight defaultdict.

    Returns a fresh ``[weight, doc_ids, game_names]`` triple:
      index 0 (float): the keyword weight, filled in later;
      index 1 (set):   doc ids whose title contains the keyword;
      index 2 (set):   game names whose title contains the keyword.
    """
    return [0, set(), set()]
def load_doc_list(this_path):
    """Read the document list file at *this_path* and return its raw lines.

    Lines are returned with their trailing newlines preserved; each line is
    a record of doc_id, doc_title, doc_game_name (tab-separated — see the
    split in weight_calc).
    """
    with open(this_path, "r", encoding="utf-8") as src:
        return src.readlines()
def weight_calc(this_title_list):
    """Build ``keyword -> [weight, doc_id set, game_name set]`` from raw lines.

    Each line is split on tabs into (doc_id, title, game_name); the title is
    tokenised with jieba and each token accumulates the doc ids and game
    names it appears in. The weight slot (index 0) keeps its default value
    here — callers compute it afterwards.
    """
    accumulator = defaultdict(default_weight)
    for raw_line in this_title_list:
        fields = raw_line.split("\t")
        for token in jieba.cut(fields[1], cut_all=False):
            accumulator[token][1].add(fields[0])  # doc ids containing the token
            accumulator[token][2].add(fields[2])  # game names containing the token
    # NOTE(review): the original had a disabled filter here (weight > 0,
    # more than 4 docs, multi-char keyword); it stays disabled.
    return dict(accumulator.items())
def weight_database_init():
    """Build the keyword-weight table from scratch with three worker processes.

    Splits the document list into three disjoint chunks, lets each child
    compute partial keyword statistics (see ``for_process``), merges the
    partial results streamed through the queue, computes the final weights
    and bulk-inserts everything via ``database_merge``.
    """
    title_list_of_init = load_doc_list(init_path)
    mid = int(len(title_list_of_init) / 3)
    # Three disjoint chunks covering the whole list. The original sliced
    # [0:mid+1], [mid:mid*2+1], [mid*2:-1], which processed the elements at
    # mid and 2*mid twice and silently dropped the very last line.
    title_list_of_init_half = [
        title_list_of_init[0:mid],
        title_list_of_init[mid:mid * 2],
        title_list_of_init[mid * 2:]
    ]
    # NOTE: a plain multiprocessing.Queue cannot be passed to Pool.apply_async
    # — the worker dies before its first line and apply_async swallows the
    # error. Use multiprocessing.Manager().Queue() if a Pool is preferred
    # over the explicit Process objects below.
    dict_init_queue = Queue()
    workers = [
        Process(target=for_process, args=(chunk, dict_init_queue))
        for chunk in title_list_of_init_half
    ]
    for worker in workers:
        worker.daemon = True
        worker.start()
    dict_init = defaultdict(default_weight)
    process_count = 0
    while 1:
        a1 = dict_init_queue.get()
        # Check the sentinel before indexing: "stop"[0] is a valid (but
        # meaningless) expression, so the original's early a1[0] never
        # crashed — it was just misleading.
        if a1 == "stop":
            process_count += 1
            if process_count == 3:  # all three workers are done
                break
            continue
        this_key_word = a1[0]
        if this_key_word in dict_init:
            # Union the partial doc-id and game-name sets from this worker.
            dict_init[this_key_word][1] = dict_init[this_key_word][1] | a1[2]
            dict_init[this_key_word][2] = dict_init[this_key_word][2] | a1[3]
        else:
            dict_init[this_key_word] = [a1[1], a1[2], a1[3]]
    for words in dict_init:
        title_count = len(dict_init[words][1])
        game_name_count = len(dict_init[words][2])
        # IDF-style weight: the rarer a keyword (few docs, few games), the
        # higher its weight.
        dict_init[words][0] = math.log(num_of_rows / title_count / game_name_count)
    database_merge(dict_init, is_init=True)
def for_process(this_list, this_queue):
    """Worker entry point: compute keyword statistics for *this_list* and
    stream each ``[keyword, weight, doc_ids, game_names]`` record through
    *this_queue*, finishing with a "stop" sentinel for the parent."""
    print(str(os.getpid()) + " 子进程启动 @:" + str(ctime()))
    partial = weight_calc(this_list)
    for keyword, (weight, doc_ids, game_names) in partial.items():
        this_queue.put([keyword, weight, doc_ids, game_names])
    this_queue.put("stop")
    print(str(os.getpid()) + " 子进程结束 @:" + str(ctime()))
def weight_database_update():
    """Incrementally merge keyword statistics from the mini document list."""
    fresh_titles = load_doc_list(update_path)
    database_merge(weight_calc(fresh_titles), is_init=False)
if __name__ == '__main__':
    # Build the weight database from scratch and report the elapsed time.
    start = datetime.now()
    database_create() # create the keywords table
    weight_database_init()
    print(datetime.now()-start)
只要在 A 代码
p.close()
之后加一句p.join()
就是等效的。反过来,如果在 B 代码的
p_fi.start()
之前加上 p_fi.daemon = p_se.daemon = p_th.daemon = True,并且主进程不再等待队列里的结果就退出,
那么 B 代码也会像 A 一样不能正常运行。总之区别就在于:A 代码没有等子进程完成就结束了主进程,而 B 代码会等子进程完成之后再结束主进程。
更新完整代码后:如果改用进程池(A 代码),还需要把
dict_init_queue = Queue()
改为 dict_init_queue = multiprocessing.Manager().Queue(),
否则队列不能在进程池的子进程之间共享——直接把 multiprocessing.Queue 传给 Pool.apply_async 会让子进程在启动时就出错退出,而 apply_async 会吞掉这个异常,看起来就像子进程从未启动一样。