服务器训练模型超时(Pytorch)?

新手上路,请多包涵

我正在远程服务器上运行模型训练代码,调用 2 个节点上的 8 个 GPU。但出现了以下问题,请问该怎么解决?

“torch.distributed.DistStoreError: Timed out after 901 seconds waiting for clients. 1/2 clients joined.


代码:

import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torch.utils.data import Dataset, DataLoader
from torch.utils.data.distributed import DistributedSampler
from tqdm import tqdm
import random
import numpy as np
import os
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from transformers import AutoTokenizer
from ttt import TTTForCausalLM, TTTConfig, TTT_STANDARD_CONFIGS
from torch import amp  # 混合精度训练
from sklearn.model_selection import train_test_split

# 设置随机种子
def setup_seed(seed):
    torch.manual_seed(seed)
    np.random.seed(seed)
    random.seed(seed)

# 初始化分布式环境
def setup_distributed():
    dist.init_process_group(backend='nccl', init_method='env://')
    rank = dist.get_rank()  # 获取全局 rank
    world_size = dist.get_world_size()  # 获取全局 world_size
    local_rank = rank % torch.cuda.device_count()  # 计算每个节点的局部 GPU rank
    torch.cuda.set_device(local_rank)  # 设置 GPU 设备为 local_rank
    setup_seed(0)  # 所有进程使用相同的种子
    return rank, world_size, local_rank

rank, world_size, local_rank = setup_distributed()
device = torch.device("cuda", local_rank)

PAD_TOKEN = '[PAD]'
SOS_TOKEN = '[SOS]'
EOS_TOKEN = '[EOS]'
UNK_TOKEN = '[UNK]'

# 所有进程都读取和处理数据
with open('preprocessed_sequences.txt', 'r') as f:
    sequences = [line.strip() for line in f.readlines()]
# 过滤掉空序列
sequences = [seq for seq in sequences if seq]
print(f"Rank {rank}: 共加载了 {len(sequences)} 条序列。")

# 仅在主进程中构建词汇表
if rank == 0:
    amino_acids = set(''.join(sequences))
    amino_acids = sorted(list(amino_acids))
    print(f"氨基酸种类:{amino_acids}")

    vocab = [PAD_TOKEN, SOS_TOKEN, EOS_TOKEN, UNK_TOKEN] + amino_acids
    vocab_to_idx = {token: idx for idx, token in enumerate(vocab)}
    idx_to_vocab = {idx: token for idx, token in enumerate(vocab)}
    vocab_size = len(vocab)
    print(f"词汇表大小:{vocab_size}")
else:
    amino_acids = None
    vocab = None
    vocab_to_idx = None
    idx_to_vocab = None
    vocab_size = None

# 广播词汇表到所有进程
amino_acids = [amino_acids]
vocab = [vocab]
vocab_to_idx = [vocab_to_idx]
idx_to_vocab = [idx_to_vocab]
vocab_size = [vocab_size]

dist.broadcast_object_list(amino_acids, src=0)
dist.broadcast_object_list(vocab, src=0)
dist.broadcast_object_list(vocab_to_idx, src=0)
dist.broadcast_object_list(idx_to_vocab, src=0)
dist.broadcast_object_list(vocab_size, src=0)

amino_acids = amino_acids[0]
vocab = vocab[0]
vocab_to_idx = vocab_to_idx[0]
idx_to_vocab = idx_to_vocab[0]
vocab_size = vocab_size[0]

# 定义编码函数
def encode_sequence(seq):
    return [vocab_to_idx.get(char, vocab_to_idx[UNK_TOKEN]) for char in seq]

# 编码所有序列
encoded_sequences = [encode_sequence(seq) for seq in sequences]

# 使用相同的随机种子确保数据划分一致
indices = np.arange(len(encoded_sequences))
train_indices, val_indices = train_test_split(indices, test_size=0.2, random_state=42)

train_seqs_full = [encoded_sequences[i] for i in train_indices]
val_seqs_full = [encoded_sequences[i] for i in val_indices]

chunk_size = 1000  # 每个子集包含的序列数

# 计算子集数量
num_chunks = (len(train_seqs_full) + chunk_size - 1) // chunk_size
print(f"Rank {rank}: 训练集将被分成 {num_chunks} 个子集。")

# 自定义数据集类
class ProteinDataset(Dataset):
    def __init__(self, sequences, max_length=2048):
        self.sequences = sequences
        self.max_length = max_length

    def __len__(self):
        return len(self.sequences)

    def __getitem__(self, idx):
        seq = self.sequences[idx]
        max_seq_len = random.randint(100, self.max_length - 2)
        seq = seq[:max_seq_len - 2]
        seq_input = [vocab_to_idx[SOS_TOKEN]] + seq + [vocab_to_idx[EOS_TOKEN]]
        seq_input += [vocab_to_idx[PAD_TOKEN]] * (self.max_length - len(seq_input))
        return torch.tensor(seq_input)

def collate_fn(batch):
    input_ids = torch.stack(batch)
    labels = input_ids.clone()
    return input_ids, labels

# 定义模型配置
config = TTTConfig(
    vocab_size=vocab_size,
    **TTT_STANDARD_CONFIGS['350m'],
    max_position_embeddings=2048,
    ttt_layer_type='linear',
    ttt_base_lr=1.0,
    pre_conv=False,
    conv_kernel=4,
)

# 检查并加载模型
if os.path.isfile('ttt_model_complete.pth'):
    print(f"进程 {rank}:检测到已保存的模型,加载模型...")
    model = TTTForCausalLM(config)
    checkpoint = torch.load('ttt_model_complete.pth', map_location=device)
    model.load_state_dict(checkpoint['model'])
    start_epoch = checkpoint.get('epoch', 0)
    best_val_loss = checkpoint.get('loss', float('inf'))
    print(f"加载模型,epoch {start_epoch},loss: {best_val_loss:.4f}")
else:
    print(f"进程 {rank}:未检测到已保存的模型,初始化新模型...")
    model = TTTForCausalLM(config)
    start_epoch = 0
    best_val_loss = float('inf')

model = model.to(device)
model = DDP(model, device_ids=[local_rank], output_device=local_rank)

# 定义优化器和学习率调度器
optimizer = optim.AdamW(model.parameters(), lr=1e-4, weight_decay=0.01)
scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=2)

# 定义混合精度训练的梯度缩放器
scaler = amp.GradScaler()

# Early stopping 和训练相关变量
early_stopping_patience = 3
early_stopping_counter = 0
train_losses = []
val_losses = []
num_epochs = 4

# 保存模型的函数
def save_model(epoch, model, loss, file_path='ttt_model_complete.pth'):
    if rank == 0:
        state = {
            'epoch': epoch,
            'model': model.state_dict(),
            'loss': loss
        }
        torch.save(state, file_path)
        print(f"模型已保存在 epoch {epoch},路径为: {file_path}")

# 训练循环
for epoch in range(start_epoch, num_epochs):
    model.train()
    epoch_loss = 0

    # 分块处理训练数据
    for chunk_idx in range(num_chunks):
        start_idx = chunk_idx * chunk_size
        end_idx = min((chunk_idx + 1) * chunk_size, len(train_seqs_full))
        train_seqs = train_seqs_full[start_idx:end_idx]
        print(f"Rank {rank}: 正在处理训练数据子集 {chunk_idx + 1}/{num_chunks}, 包含 {len(train_seqs)} 条序列。")

        train_dataset = ProteinDataset(train_seqs)
        train_sampler = DistributedSampler(train_dataset, num_replicas=world_size, rank=rank, shuffle=True)
        train_loader = DataLoader(train_dataset, batch_size=2, sampler=train_sampler, collate_fn=collate_fn)

        train_sampler.set_epoch(epoch + chunk_idx)

        if rank == 0:
            progress_bar = tqdm(train_loader, desc=f"Epoch {epoch + 1}/{num_epochs} Chunk {chunk_idx + 1}/{num_chunks}", leave=False)
        else:
            progress_bar = train_loader

        for input_ids, labels in progress_bar:
            input_ids = input_ids.to(device)
            labels = labels.to(device)
            optimizer.zero_grad()

            with amp.autocast(device_type='cuda'):
                outputs = model(input_ids=input_ids, labels=labels)
                loss = outputs.loss

            scaler.scale(loss).backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            scaler.step(optimizer)
            scaler.update()

            epoch_loss += loss.item()
            if rank == 0:
                progress_bar.set_postfix(loss=f"{loss.item():.4f}")

    avg_loss = epoch_loss / (len(train_seqs_full))
    if rank == 0:
        train_losses.append(avg_loss)
        print(f"第 {epoch + 1}/{num_epochs} 轮,训练损失:{avg_loss:.4f}")

    model.eval()
    val_loss = 0

    val_chunk_size = 1000
    val_num_chunks = (len(val_seqs_full) + val_chunk_size - 1) // val_chunk_size

    for val_chunk_idx in range(val_num_chunks):
        val_start_idx = val_chunk_idx * val_chunk_size
        val_end_idx = min((val_chunk_idx + 1) * val_chunk_size, len(val_seqs_full))
        val_seqs = val_seqs_full[val_start_idx:val_end_idx]
        print(f"Rank {rank}: 正在处理验证数据子集 {val_chunk_idx + 1}/{val_num_chunks}, 包含 {len(val_seqs)} 条序列。")

        val_dataset = ProteinDataset(val_seqs)
        val_sampler = DistributedSampler(val_dataset, num_replicas=world_size, rank=rank)
        val_loader = DataLoader(val_dataset, batch_size=2, shuffle=False, sampler=val_sampler, collate_fn=collate_fn)

        with torch.no_grad():
            for input_ids, labels in val_loader:
                input_ids = input_ids.to(device)
                labels = labels.to(device)
                outputs = model(input_ids=input_ids, labels=labels)
                loss = outputs.loss
                val_loss += loss.item()

    avg_val_loss = val_loss / len(val_seqs_full)
    if rank == 0:
        val_losses.append(avg_val_loss)
        print(f"第 {epoch + 1}/{num_epochs} 轮,验证损失:{avg_val_loss:.4f}")

        scheduler.step(avg_val_loss)

        save_model(epoch + 1, model.module, avg_val_loss)

        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            early_stopping_counter = 0
            torch.save(model.module.state_dict(), 'ttt_language_model_best.pth')
            print("模型已保存,验证损失降低。")
        else:
            early_stopping_counter += 1
            if early_stopping_counter >= early_stopping_patience:
                print(f"验证损失没有改善,提前停止训练。在 epoch {epoch + 1} 停止。")
                break

# 加载最优模型进行预测
if rank == 0:
    model.module.load_state_dict(torch.load('ttt_language_model_best.pth'))
    model.module.to('cpu')
    model.module.eval()

    def generate_sequence(model, start_tokens, max_length=2048, temperature=1.0):
        model.eval()

        input_ids = torch.tensor(
            [vocab_to_idx[SOS_TOKEN]] + [vocab_to_idx.get(token, vocab_to_idx[UNK_TOKEN]) for token in start_tokens],
            device='cpu'
        ).unsqueeze(0)

        generated_ids = model.generate(
            input_ids=input_ids,
            max_length=max_length,
            temperature=temperature,
            eos_token_id=vocab_to_idx[EOS_TOKEN],
            pad_token_id=vocab_to_idx[PAD_TOKEN],
            do_sample=True,
            top_k=50,
            top_p=0.95
        )

        generated_sequence = ''.join([idx_to_vocab[token_id.item()] for token_id in generated_ids[0][1:]])
        return generated_sequence

    start_tokens = random.choice(amino_acids)
    generated_seq = generate_sequence(model.module, start_tokens)
    print(f"以 '{start_tokens}' 开头生成的序列:{generated_seq}")

SLURM:

#!/bin/bash
#SBATCH -N 2   
#SBATCH --gpus=8        # 申请8个GPU
#SBATCH --ntasks=8         # 总共 8 个任务
#SBATCH -t 7-00:00:00       # 设置最大运行时间为 7 天
#SBATCH -o output_%j.log     # 输出文件(%j为作业号)
#SBATCH -e error_%j.log      # 错误文件(%j为作业号)

conda activate name

export MASTER_ADDR=$(scontrol show hostname $SLURM_NODELIST | head -n 1)
export MASTER_PORT=12355  # 确保该端口号在所有节点都未被占用
export RANK=$SLURM_PROCID
export NCCL_SOCKET_IFNAME=bond0
export NCCL_BLOCKING_WAIT=1
export NCCL_DEBUG=INFO
export NCCL_TIMEOUT=1200
export WORLD_SIZE=8


# 执行训练脚本
torchrun --nnodes=2 --nproc_per_node=8 script.py

希望能在服务器上成功跑通吧(已崩溃,一个节点上的8个GPU内存不够,2个节点老是通信问题

阅读 753
avatarAI BotBETA

针对你遇到的 PyTorch 分布式训练中的 torch.distributed.DistStoreError: Timed out after 901 seconds waiting for clients. 1/2 clients joined. 错误,这通常与节点间的通信和同步问题有关。以下是一些可能的解决步骤和检查点:

1. 确保网络配置正确

确保所有节点都能通过 MASTER_ADDRMASTER_PORT 进行通信。你可以使用如下命令来测试节点间的连通性:

# 在一个节点上执行
ping $MASTER_ADDR
# 或者使用nc (netcat)测试端口
nc -zv $MASTER_ADDR $MASTER_PORT

2. 检查环境变量设置

确保所有环境变量(如 NCCL_SOCKET_IFNAMENCCL_BLOCKING_WAITNCCL_TIMEOUT 等)在所有节点上都设置正确且一致。特别是 NCCL_SOCKET_IFNAME 应该指向实际用于节点间通信的网络接口。

3. 增加超时时间

你已经在 NCCL_TIMEOUT 设置了 1200 秒,这通常是足够的,但如果网络延迟较高或系统负载较重,可能需要进一步增加。

4. 检查防火墙和安全组设置

确保没有防火墙或安全组规则阻止节点间的通信,特别是 TCP/UDP 端口 MASTER_PORT

5. 使用正确的 torchrun 参数

你的 torchrun 命令看起来基本正确,但请确保 nnodesnproc_per_node 的总和与你的 WORLD_SIZE 一致。在你的例子中,这些值是匹配的(2 节点 x 8 GPU/节点 = 16,但 WORLD_SIZE 设置为 8,这可能是一个笔误或误解)。通常,WORLD_SIZE 应该与 torchrun 的总进程数相同。

6. 检查 SLURM 脚本

  • 确保 RANK 变量在 SLURM 脚本中正确设置,但通常 PyTorch 的分布式训练会自动处理 rank。你可能不需要显式设置 RANK
  • 确保 MASTER_ADDR 总是指向第一个节点的 IP 地址。

7. 调试和日志

  • 增加 NCCL 的日志级别(如 NCCL_DEBUG=VERBOSE)可能会提供更多关于通信问题的信息。
  • 检查 PyTorch 分布式训练的输出和日志文件,看是否有其他相关的错误或警告。

8. 同步 GPU 驱动程序和 CUDA 版本

确保所有节点上的 GPU 驱动程序和 CUDA 版本都相同,以避免不兼容问题。

9. 使用 torch.distributed.launch 替代 torchrun

如果问题持续存在,你可以尝试使用 torch.distributed.launch 而不是 torchrun 来启动你的训练脚本。这可能需要你稍微修改你的脚本以适应不同的启动方式。

10. 咨询服务器管理员

如果上述步骤都不能解决问题,可能需要咨询你的服务器管理员或 IT 支持团队,以了解是否存在网络配置、防火墙设置或其他系统级问题。

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题
宣传栏