FastAPI 和 Celery 中使用 billiard 多进程进行 KMeans 聚类时程序卡住怎么办?

新手上路,请多包涵

我用fastapi和celery构造了一个后台系统,其中一个功能是接收请求后要对pandas格式的每一列数据进行Kmeans处理,我使用了billiard多进程来加速运算,在实际测试中,请求一次成功,下一次不成功,再下一次成功,不成功都是多进程卡在Kmeans处程序不动了,很奇怪,希望大神帮我看看,celey里是采用delay方式调用的。
下面是我程序调用的代码片段

from sklearn.cluster import KMeans
from tqdm import tqdm
from yellowbrick.cluster import kelbow_visualizer
from billiard.pool import Pool

def knn_some(data_x, n=10, t=1):
    data_x = np.array(data_x)
    # 变形状
    if len(data_x.shape) == 1:
        data_x = data_x.reshape(-1, 1)
    if data_x.shape[0] < n:
        n = data_x.shape[0]//2
    # 找出最合适的K
    oz = kelbow_visualizer(KMeans(random_state=1, n_init='auto'), data_x, k=(2, n), show=False)
    k = oz.elbow_value_
    if k is None:
        k = 10
    # 再进行KMeans聚类
    # 样本的数量
    n_samples = data_x.shape[0]
    if n_samples < k * t:
        k = n_samples // t
    kmeans = KMeans(n_clusters=k * t, random_state=1, n_init='auto')
    kmeans.fit(data_x)
    return kmeans

def process_column(column_data):
    return knn_some(column_data, n=20, t=1)
    
def updown_deal(df, param):
    thing_dic = {}
    # 去除只包含一个唯一值的列
    df = df.dropna(axis=1, how='all')
    df = df.loc[:, df.apply(pd.Series.nunique) != 1]
    columns_to_process = set(df.columns) - set([eci, sdate, target])
    # 改为多进程,节约时间
    num_processes = 1000
    if os.cpu_count() > 2:
        num_processes = os.cpu_count()-2  # 使用CPU核心数作为进程数 
    num_processes = min(num_processes, os.cpu_count())
    num_processes = min(num_processes, len(columns_to_process))
    logger.info(f'多进程处理上下门限开始,{num_processes}核心参与运算')
    with Pool(processes=num_processes) as pool:
        results = pool.map(process_column, [df[c].dropna().values for c in columns_to_process])
    logger.info('多进程处理上下门限完毕')

1>我尝试着限制数据大小,发现不管数据多少都是没用的
2>我尝试别的多进程方式,也是不管用,我在knn_some方法中打印日志是可以打印的,但就是到Kmeans就不动了,而且还是成一次卡一次。

阅读 345
avatarAI BotBETA

回答

问题可能是由于在使用 billiard 多进程时,KMeans 聚类操作中的某些资源或状态没有正确管理或同步。这里有几个可能的解决方案和检查点:

  1. 确保数据在进程间正确传递
    在使用多进程时,确保传递给每个进程的数据是独立的,并且不会受到其他进程的影响。在你的代码中,你传递了 df[c].dropna().values 给每个进程,这看起来是正确的。然而,确保这些数据在进程间没有共享的状态。
  2. 检查资源限制
    KMeans 聚类可能会消耗大量内存和计算资源。如果你的系统资源有限(例如,内存),多个 KMeans 实例同时运行可能会导致资源竞争或耗尽。尝试减少并行进程的数量,看看是否有所改善。
  3. 避免全局变量和共享状态
    确保 knn_someprocess_column 函数不依赖于全局变量或共享状态。在多进程环境中,全局变量和共享状态可能导致不可预测的行为。
  4. 使用 apply_async 而不是 map
    使用 Pool.apply_async 而不是 Pool.map 可以更好地控制任务的分配和结果的收集。这允许你设置回调函数来处理每个任务的结果,并可能更好地管理内存和进程状态。
  5. 检查日志和错误输出
    增加更多的日志输出,特别是在 KMeans.fit 调用之前和之后。这可以帮助你确定程序是否卡在了 KMeans 调用上,还是由于其他原因。
  6. 限制并行任务的数量
    即使你有多个 CPU 核心,也可能需要限制并行运行的任务数量,以避免资源耗尽。尝试减少 num_processes 的值,看看是否有助于稳定程序。
  7. 考虑使用其他并行化策略
    如果问题持续存在,考虑使用其他并行化策略,如使用 Dask 或 Apache Spark 等框架,这些框架在大数据和并行计算方面可能更健壮。

如果上述建议都不能解决问题,可能需要更深入地检查 billiardKMeans 的内部实现,看看是否有已知的兼容性问题或资源争用问题。

1 个回答

很奇怪,在这里的进程需要写成下面这样才能执行,但是在别的地方不这样写就可以执行。

    with Pool(processes=num_processes) as pool:
        results = []
        for column_name in columns_to_process:
            column_data = df[column_name].dropna().values
            # 使用apply_async异步地将任务分配到进程池中
            result = pool.apply_async(process_column, args=(column_data,))
            results.append(result)
            # 关闭进程池,不再接受新的任务
        pool.close()
            # 等待所有异步任务完成
        pool.join()


# [i.get() for i in results]

更新过了一晚就不行了,真的奇怪

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
宣传栏