I'm running model training code on a remote server, using 8 GPUs spread across 2 nodes, but I keep hitting the error below. How can I fix it?
torch.distributed.DistStoreError: Timed out after 901 seconds waiting for clients. 1/2 clients joined.
Code:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torch.utils.data import Dataset, DataLoader
from torch.utils.data.distributed import DistributedSampler
from tqdm import tqdm
import random
import numpy as np
import os
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from transformers import AutoTokenizer
from ttt import TTTForCausalLM, TTTConfig, TTT_STANDARD_CONFIGS
from torch import amp  # mixed-precision training
from sklearn.model_selection import train_test_split
# Set random seeds
def setup_seed(seed):
    torch.manual_seed(seed)
    np.random.seed(seed)
    random.seed(seed)

# Initialize the distributed environment
def setup_distributed():
    dist.init_process_group(backend='nccl', init_method='env://')
    rank = dist.get_rank()  # global rank
    world_size = dist.get_world_size()  # global world size
    local_rank = rank % torch.cuda.device_count()  # local GPU rank on this node
    torch.cuda.set_device(local_rank)  # bind this process to its local GPU
    setup_seed(0)  # same seed on every process
    return rank, world_size, local_rank
rank, world_size, local_rank = setup_distributed()
device = torch.device("cuda", local_rank)
PAD_TOKEN = '[PAD]'
SOS_TOKEN = '[SOS]'
EOS_TOKEN = '[EOS]'
UNK_TOKEN = '[UNK]'
# Every process reads and preprocesses the data
with open('preprocessed_sequences.txt', 'r') as f:
    sequences = [line.strip() for line in f.readlines()]
# Filter out empty sequences
sequences = [seq for seq in sequences if seq]
print(f"Rank {rank}: loaded {len(sequences)} sequences.")
# Build the vocabulary only on the main process
if rank == 0:
    amino_acids = set(''.join(sequences))
    amino_acids = sorted(list(amino_acids))
    print(f"Amino acid types: {amino_acids}")
    vocab = [PAD_TOKEN, SOS_TOKEN, EOS_TOKEN, UNK_TOKEN] + amino_acids
    vocab_to_idx = {token: idx for idx, token in enumerate(vocab)}
    idx_to_vocab = {idx: token for idx, token in enumerate(vocab)}
    vocab_size = len(vocab)
    print(f"Vocabulary size: {vocab_size}")
else:
    amino_acids = None
    vocab = None
    vocab_to_idx = None
    idx_to_vocab = None
    vocab_size = None
# Broadcast the vocabulary to all processes (each object is wrapped in a list for broadcast_object_list)
amino_acids = [amino_acids]
vocab = [vocab]
vocab_to_idx = [vocab_to_idx]
idx_to_vocab = [idx_to_vocab]
vocab_size = [vocab_size]
dist.broadcast_object_list(amino_acids, src=0)
dist.broadcast_object_list(vocab, src=0)
dist.broadcast_object_list(vocab_to_idx, src=0)
dist.broadcast_object_list(idx_to_vocab, src=0)
dist.broadcast_object_list(vocab_size, src=0)
amino_acids = amino_acids[0]
vocab = vocab[0]
vocab_to_idx = vocab_to_idx[0]
idx_to_vocab = idx_to_vocab[0]
vocab_size = vocab_size[0]
# Encoding helper
def encode_sequence(seq):
    return [vocab_to_idx.get(char, vocab_to_idx[UNK_TOKEN]) for char in seq]
# Encode every sequence
encoded_sequences = [encode_sequence(seq) for seq in sequences]
# Use the same random seed so the train/val split is identical on every process
indices = np.arange(len(encoded_sequences))
train_indices, val_indices = train_test_split(indices, test_size=0.2, random_state=42)
train_seqs_full = [encoded_sequences[i] for i in train_indices]
val_seqs_full = [encoded_sequences[i] for i in val_indices]
chunk_size = 1000  # number of sequences per chunk
# Number of chunks
num_chunks = (len(train_seqs_full) + chunk_size - 1) // chunk_size
print(f"Rank {rank}: the training set will be split into {num_chunks} chunks.")
# Custom dataset
class ProteinDataset(Dataset):
    def __init__(self, sequences, max_length=2048):
        self.sequences = sequences
        self.max_length = max_length

    def __len__(self):
        return len(self.sequences)

    def __getitem__(self, idx):
        seq = self.sequences[idx]
        # Truncate to a random length, add SOS/EOS, then pad to max_length
        max_seq_len = random.randint(100, self.max_length - 2)
        seq = seq[:max_seq_len - 2]
        seq_input = [vocab_to_idx[SOS_TOKEN]] + seq + [vocab_to_idx[EOS_TOKEN]]
        seq_input += [vocab_to_idx[PAD_TOKEN]] * (self.max_length - len(seq_input))
        return torch.tensor(seq_input)

def collate_fn(batch):
    input_ids = torch.stack(batch)
    labels = input_ids.clone()
    return input_ids, labels
# Model configuration
config = TTTConfig(
    vocab_size=vocab_size,
    **TTT_STANDARD_CONFIGS['350m'],
    max_position_embeddings=2048,
    ttt_layer_type='linear',
    ttt_base_lr=1.0,
    pre_conv=False,
    conv_kernel=4,
)
# Load a saved checkpoint if one exists
if os.path.isfile('ttt_model_complete.pth'):
    print(f"Process {rank}: found a saved model, loading it...")
    model = TTTForCausalLM(config)
    checkpoint = torch.load('ttt_model_complete.pth', map_location=device)
    model.load_state_dict(checkpoint['model'])
    start_epoch = checkpoint.get('epoch', 0)
    best_val_loss = checkpoint.get('loss', float('inf'))
    print(f"Loaded model from epoch {start_epoch}, loss: {best_val_loss:.4f}")
else:
    print(f"Process {rank}: no saved model found, initializing a new one...")
    model = TTTForCausalLM(config)
    start_epoch = 0
    best_val_loss = float('inf')
model = model.to(device)
model = DDP(model, device_ids=[local_rank], output_device=local_rank)
# Optimizer and learning rate scheduler
optimizer = optim.AdamW(model.parameters(), lr=1e-4, weight_decay=0.01)
scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=2)
# Gradient scaler for mixed-precision training
scaler = amp.GradScaler()
# Early stopping and training bookkeeping
early_stopping_patience = 3
early_stopping_counter = 0
train_losses = []
val_losses = []
num_epochs = 4
# Checkpoint-saving helper (only rank 0 writes to disk)
def save_model(epoch, model, loss, file_path='ttt_model_complete.pth'):
    if rank == 0:
        state = {
            'epoch': epoch,
            'model': model.state_dict(),
            'loss': loss
        }
        torch.save(state, file_path)
        print(f"Model saved at epoch {epoch} to: {file_path}")
# Training loop
for epoch in range(start_epoch, num_epochs):
    model.train()
    epoch_loss = 0
    # Process the training data chunk by chunk
    for chunk_idx in range(num_chunks):
        start_idx = chunk_idx * chunk_size
        end_idx = min((chunk_idx + 1) * chunk_size, len(train_seqs_full))
        train_seqs = train_seqs_full[start_idx:end_idx]
        print(f"Rank {rank}: processing training chunk {chunk_idx + 1}/{num_chunks} with {len(train_seqs)} sequences.")
        train_dataset = ProteinDataset(train_seqs)
        train_sampler = DistributedSampler(train_dataset, num_replicas=world_size, rank=rank, shuffle=True)
        train_loader = DataLoader(train_dataset, batch_size=2, sampler=train_sampler, collate_fn=collate_fn)
        train_sampler.set_epoch(epoch + chunk_idx)
        if rank == 0:
            progress_bar = tqdm(train_loader, desc=f"Epoch {epoch + 1}/{num_epochs} Chunk {chunk_idx + 1}/{num_chunks}", leave=False)
        else:
            progress_bar = train_loader
        for input_ids, labels in progress_bar:
            input_ids = input_ids.to(device)
            labels = labels.to(device)
            optimizer.zero_grad()
            with amp.autocast(device_type='cuda'):
                outputs = model(input_ids=input_ids, labels=labels)
                loss = outputs.loss
            scaler.scale(loss).backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            scaler.step(optimizer)
            scaler.update()
            epoch_loss += loss.item()
            if rank == 0:
                progress_bar.set_postfix(loss=f"{loss.item():.4f}")
    avg_loss = epoch_loss / len(train_seqs_full)
    if rank == 0:
        train_losses.append(avg_loss)
        print(f"Epoch {epoch + 1}/{num_epochs}, training loss: {avg_loss:.4f}")
    # Validation
    model.eval()
    val_loss = 0
    val_chunk_size = 1000
    val_num_chunks = (len(val_seqs_full) + val_chunk_size - 1) // val_chunk_size
    for val_chunk_idx in range(val_num_chunks):
        val_start_idx = val_chunk_idx * val_chunk_size
        val_end_idx = min((val_chunk_idx + 1) * val_chunk_size, len(val_seqs_full))
        val_seqs = val_seqs_full[val_start_idx:val_end_idx]
        print(f"Rank {rank}: processing validation chunk {val_chunk_idx + 1}/{val_num_chunks} with {len(val_seqs)} sequences.")
        val_dataset = ProteinDataset(val_seqs)
        val_sampler = DistributedSampler(val_dataset, num_replicas=world_size, rank=rank)
        val_loader = DataLoader(val_dataset, batch_size=2, shuffle=False, sampler=val_sampler, collate_fn=collate_fn)
        with torch.no_grad():
            for input_ids, labels in val_loader:
                input_ids = input_ids.to(device)
                labels = labels.to(device)
                outputs = model(input_ids=input_ids, labels=labels)
                loss = outputs.loss
                val_loss += loss.item()
    avg_val_loss = val_loss / len(val_seqs_full)
    if rank == 0:
        val_losses.append(avg_val_loss)
        print(f"Epoch {epoch + 1}/{num_epochs}, validation loss: {avg_val_loss:.4f}")
    scheduler.step(avg_val_loss)
    save_model(epoch + 1, model.module, avg_val_loss)
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        early_stopping_counter = 0
        torch.save(model.module.state_dict(), 'ttt_language_model_best.pth')
        print("Validation loss improved; best model saved.")
    else:
        early_stopping_counter += 1
        if early_stopping_counter >= early_stopping_patience:
            print(f"Validation loss did not improve; early stopping at epoch {epoch + 1}.")
            break
# Load the best model for generation (rank 0 only)
if rank == 0:
    model.module.load_state_dict(torch.load('ttt_language_model_best.pth'))
    model.module.to('cpu')
    model.module.eval()

    def generate_sequence(model, start_tokens, max_length=2048, temperature=1.0):
        model.eval()
        input_ids = torch.tensor(
            [vocab_to_idx[SOS_TOKEN]] + [vocab_to_idx.get(token, vocab_to_idx[UNK_TOKEN]) for token in start_tokens],
            device='cpu'
        ).unsqueeze(0)
        generated_ids = model.generate(
            input_ids=input_ids,
            max_length=max_length,
            temperature=temperature,
            eos_token_id=vocab_to_idx[EOS_TOKEN],
            pad_token_id=vocab_to_idx[PAD_TOKEN],
            do_sample=True,
            top_k=50,
            top_p=0.95
        )
        generated_sequence = ''.join([idx_to_vocab[token_id.item()] for token_id in generated_ids[0][1:]])
        return generated_sequence

    start_tokens = random.choice(amino_acids)
    generated_seq = generate_sequence(model.module, start_tokens)
    print(f"Sequence generated from start token '{start_tokens}': {generated_seq}")
SLURM script:
#!/bin/bash
#SBATCH -N 2
#SBATCH --gpus=8              # request 8 GPUs
#SBATCH --ntasks=8            # 8 tasks in total
#SBATCH -t 7-00:00:00         # maximum runtime: 7 days
#SBATCH -o output_%j.log      # stdout log (%j is the job ID)
#SBATCH -e error_%j.log       # stderr log (%j is the job ID)
conda activate name
export MASTER_ADDR=$(scontrol show hostname $SLURM_NODELIST | head -n 1)
export MASTER_PORT=12355      # make sure this port is free on every node
export RANK=$SLURM_PROCID
export NCCL_SOCKET_IFNAME=bond0
export NCCL_BLOCKING_WAIT=1
export NCCL_DEBUG=INFO
export NCCL_TIMEOUT=1200
export WORLD_SIZE=8
# Run the training script
torchrun --nnodes=2 --nproc_per_node=8 script.py
I just hope I can finally get this to run on the server (I'm at my breaking point: on a single node the 8 GPUs run out of memory, and with 2 nodes I keep running into communication problems).