我用fastapi和celery构造了一个后台系统,其中一个功能是接收请求后要对pandas格式的每一列数据进行Kmeans处理,我使用了billiard多进程来加速运算,在实际测试中,请求一次成功,下一次不成功,再下一次成功,不成功都是多进程卡在Kmeans处程序不动了,很奇怪,希望大神帮我看看,celey里是采用delay方式调用的。
下面是我程序调用的代码片段
from sklearn.cluster import KMeans
from tqdm import tqdm
from yellowbrick.cluster import kelbow_visualizer
from billiard.pool import Pool
def knn_some(data_x, n=10, t=1):
data_x = np.array(data_x)
# 变形状
if len(data_x.shape) == 1:
data_x = data_x.reshape(-1, 1)
if data_x.shape[0] < n:
n = data_x.shape[0]//2
# 找出最合适的K
oz = kelbow_visualizer(KMeans(random_state=1, n_init='auto'), data_x, k=(2, n), show=False)
k = oz.elbow_value_
if k is None:
k = 10
# 再进行KMeans聚类
# 样本的数量
n_samples = data_x.shape[0]
if n_samples < k * t:
k = n_samples // t
kmeans = KMeans(n_clusters=k * t, random_state=1, n_init='auto')
kmeans.fit(data_x)
return kmeans
def process_column(column_data):
return knn_some(column_data, n=20, t=1)
def updown_deal(df, param):
thing_dic = {}
# 去除只包含一个唯一值的列
df = df.dropna(axis=1, how='all')
df = df.loc[:, df.apply(pd.Series.nunique) != 1]
columns_to_process = set(df.columns) - set([eci, sdate, target])
# 改为多进程,节约时间
num_processes = 1000
if os.cpu_count() > 2:
num_processes = os.cpu_count()-2 # 使用CPU核心数作为进程数
num_processes = min(num_processes, os.cpu_count())
num_processes = min(num_processes, len(columns_to_process))
logger.info(f'多进程处理上下门限开始,{num_processes}核心参与运算')
with Pool(processes=num_processes) as pool:
results = pool.map(process_column, [df[c].dropna().values for c in columns_to_process])
logger.info('多进程处理上下门限完毕')
1>我尝试着限制数据大小,发现不管数据多少都是没用的
2>我尝试别的多进程方式,也是不管用,我在knn_some方法中打印日志是可以打印的,但就是到Kmeans就不动了,而且还是成一次卡一次。
很奇怪,在这里的进程需要写成下面这样才能执行,但是在别的地方不这样写就可以执行。
更新过了一晚就不行了,真的奇怪