De PyTorch DDP a Accelerate Trainer, dominio del entrenamiento distribuido con facilidad.
Desde PyTorch DDP hasta Accelerate Trainer, dominar el entrenamiento distribuido ahora es más fácil.
Visión general general
Este tutorial asume que tienes una comprensión básica de PyTorch y cómo entrenar un modelo simple. Mostrará el entrenamiento en múltiples GPU a través de un proceso llamado Paralelismo de Datos Distribuidos (DDP) a través de tres niveles diferentes de aumento de la abstracción:
- DDP nativo de PyTorch a través del módulo
pytorch.distributed
- Utilizando el envoltorio ligero de 🤗 Accelerate alrededor de
pytorch.distributed
que también ayuda a asegurar que el código se pueda ejecutar en una sola GPU y TPUs sin cambios de código y con cambios mínimos de código al código original - Utilizando la API de alto nivel de Trainer de 🤗 Transformer, que abstrae todo el código de plantilla y admite diversos dispositivos y escenarios distribuidos
¿Qué es el entrenamiento “Distribuido” y por qué importa?
Toma algún código de entrenamiento muy básico de PyTorch a continuación, que configura y entrena un modelo en MNIST basado en el ejemplo oficial de MNIST
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
class BasicNet(nn.Module):
def __init__(self):
super().__init__()
self.conv1 = nn.Conv2d(1, 32, 3, 1)
self.conv2 = nn.Conv2d(32, 64, 3, 1)
self.dropout1 = nn.Dropout(0.25)
self.dropout2 = nn.Dropout(0.5)
self.fc1 = nn.Linear(9216, 128)
self.fc2 = nn.Linear(128, 10)
self.act = F.relu
def forward(self, x):
x = self.act(self.conv1(x))
x = self.act(self.conv2(x))
x = F.max_pool2d(x, 2)
x = self.dropout1(x)
x = torch.flatten(x, 1)
x = self.act(self.fc1(x))
x = self.dropout2(x)
x = self.fc2(x)
output = F.log_softmax(x, dim=1)
return output
Definimos el dispositivo de entrenamiento ( cuda
):
device = "cuda"
Construimos algunos DataLoaders de PyTorch:
- Evaluando el sesgo del modelo de lenguaje con 🤗 Evaluate
- Acelere sus modelos con 🤗 Optimum Intel y OpenVINO
- Presentando nuestra nueva estructura de precios
transform = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307), (0.3081))
])
train_dset = datasets.MNIST('data', train=True, download=True, transform=transform)
test_dset = datasets.MNIST('data', train=False, transform=transform)
train_loader = torch.utils.data.DataLoader(train_dset, shuffle=True, batch_size=64)
test_loader = torch.utils.data.DataLoader(test_dset, shuffle=False, batch_size=64)
Movemos el modelo al dispositivo CUDA:
model = BasicNet().to(device)
Construimos un optimizador de PyTorch:
optimizer = optim.AdamW(model.parameters(), lr=1e-3)
Antes de finalmente crear un bucle de entrenamiento y evaluación simplista que realiza una iteración completa sobre el conjunto de datos y calcula la precisión de la prueba:
model.train()
for batch_idx, (data, target) in enumerate(train_loader):
data, target = data.to(device), target.to(device)
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
optimizer.zero_grad()
model.eval()
correct = 0
with torch.no_grad():
for data, target in test_loader:
output = model(data)
pred = output.argmax(dim=1, keepdim=True)
correct += pred.eq(target.view_as(pred)).sum().item()
print(f'Precisión: {100. * correct / len(test_loader.dataset)}')
Típicamente, a partir de aquí, uno podría simplemente poner todo esto en un script de Python o ejecutarlo en un Jupyter Notebook.
Sin embargo, ¿cómo se ejecutaría este script en, por ejemplo, dos GPUs o en varias máquinas si estos recursos están disponibles, lo cual podría mejorar la velocidad de entrenamiento a través del entrenamiento distribuido? Simplemente hacer python myscript.py
solo ejecutaría el script usando una sola GPU. Aquí es donde entra en juego torch.distributed
Paralelismo de Datos Distribuidos de PyTorch
Como su nombre lo indica, torch.distributed
está diseñado para funcionar en configuraciones distribuidas. Esto puede incluir una configuración de múltiples nodos, donde tiene varias máquinas cada una con una sola GPU, o múltiples GPU donde un solo sistema tiene múltiples GPU, o alguna combinación de ambos.
Para convertir nuestro código anterior para que funcione dentro de una configuración distribuida, primero se deben definir algunas configuraciones de configuración, detalladas en el Tutorial de inicio rápido de DDP
Primero, se debe declarar una función de setup
y una función de cleanup
. Esto abrirá un grupo de procesamiento a través del cual todos los procesos de cálculo pueden comunicarse
Nota: para esta sección del tutorial, se debe asumir que estos se envían en archivos de script de Python. Más adelante se discutirá un lanzador que utiliza Accelerate que elimina esta necesidad
import os
import torch.distributed as dist
def setup(rank, world_size):
"Configura el grupo de procesos y la configuración para el paralelismo de datos distribuido de PyTorch"
os.environ["MASTER_ADDR"] = 'localhost'
os.environ["MASTER_PORT"] = "12355"
# Inicializa el grupo de procesos
dist.init_process_group("gloo", rank=rank, world_size=world_size)
def cleanup():
"Limpia el entorno distribuido"
dist.destroy_process_group()
La última pieza del rompecabezas es cómo enviar mis datos y mi modelo a otra GPU.
Aquí es donde entra en juego el módulo DistributedDataParallel
. Copiará su modelo en cada GPU, y cuando se llame a loss.backward()
, se realizará la retropropagación y los gradientes resultantes en todas estas copias del modelo se promediarán/reducirán. Esto garantiza que cada dispositivo tenga los mismos pesos después del paso del optimizador.
A continuación se muestra un ejemplo de nuestra configuración de entrenamiento, refactorizada como una función, con esta capacidad:
Nota: Aquí, el rango es la clasificación general de la GPU actual en comparación con todas las demás GPU disponibles, lo que significa que tienen una clasificación de
0 -> n-1
from torch.nn.parallel import DistributedDataParallel as DDP
def train(model, rank, world_size):
setup(rank, world_size)
model = model.to(rank)
ddp_model = DDP(model, device_ids=[rank])
optimizer = optim.AdamW(ddp_model.parameters(), lr=1e-3)
# Entrenar durante una época
model.train()
for batch_idx, (data, target) in enumerate(train_loader):
data, target = data.to(device), target.to(device)
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
optimizer.zero_grad()
cleanup()
El optimizador debe declararse en función del modelo en el dispositivo específico (por lo tanto, ddp_model
y no model
) para que todos los gradientes se calculen correctamente.
Por último, para ejecutar el script, PyTorch tiene un práctico módulo de línea de comandos torchrun
que puede ayudar. Simplemente pase el número de nodos que debe utilizar y el script que se ejecutará y listo:
torchrun --nproc_per_nodes=2 --nnodes=1 example_script.py
Lo anterior ejecutará el script de entrenamiento en dos GPU que se encuentran en una sola máquina y esta es la base para realizar solo el entrenamiento distribuido con PyTorch.
Ahora hablemos de Accelerate, una biblioteca diseñada para hacer este proceso más sencillo y también ayudar con algunas mejores prácticas
🤗 Accelerate
Accelerate es una biblioteca diseñada para permitirle realizar lo que acabamos de hacer anteriormente, sin necesidad de modificar mucho su código. Además, la canalización de datos inherente a Accelerate también puede mejorar el rendimiento de su código.
Primero, envolvamos todo el código anterior que acabamos de realizar en una sola función, para ayudarnos a visualizar la diferencia:
def train_ddp(rank, world_size):
setup(rank, world_size)
# Construir DataLoaders
transform = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307), (0.3081))
])
train_dset = datasets.MNIST('data', train=True, download=True, transform=transform)
test_dset = datasets.MNIST('data', train=False, transform=transform)
train_loader = torch.utils.data.DataLoader(train_dset, shuffle=True, batch_size=64)
test_loader = torch.utils.data.DataLoader(test_dset, shuffle=False, batch_size=64)
# Construir modelo
model = model.to(rank)
ddp_model = DDP(model, device_ids=[rank])
# Construir optimizador
optimizer = optim.AdamW(ddp_model.parameters(), lr=1e-3)
# Entrenar durante una sola época
model.train()
for batch_idx, (data, target) in enumerate(train_loader):
data, target = data.to(device), target.to(device)
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
optimizer.zero_grad()
# Evaluar
model.eval()
correct = 0
with torch.no_grad():
for data, target in test_loader:
data, target = data.to(device), target.to(device)
output = model(data)
pred = output.argmax(dim=1, keepdim=True)
correct += pred.eq(target.view_as(pred)).sum().item()
print(f'Precisión: {100. * correct / len(test_loader.dataset)}')
A continuación, hablemos de cómo Accelerate puede ayudar. Hay algunos problemas con el código anterior:
- Esto es ligeramente ineficiente, dado que se crean
n
dataloaders basados en cada dispositivo y se empujan. - Este código solo funcionará para multi-GPU, por lo que se debe tener especial cuidado para que se ejecute en un solo nodo nuevamente, o en TPU.
Accelerate resuelve esto a través de la clase Accelerator
. A través de ella, el código permanece en gran medida igual, excepto por tres líneas de código cuando se compara un solo nodo con multinode, como se muestra a continuación:
def train_ddp_accelerate():
accelerator = Accelerator()
# Construir DataLoaders
transform = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307), (0.3081))
])
train_dset = datasets.MNIST('data', train=True, download=True, transform=transform)
test_dset = datasets.MNIST('data', train=False, transform=transform)
train_loader = torch.utils.data.DataLoader(train_dset, shuffle=True, batch_size=64)
test_loader = torch.utils.data.DataLoader(test_dset, shuffle=False, batch_size=64)
# Construir modelo
model = BasicModel()
# Construir optimizador
optimizer = optim.AdamW(model.parameters(), lr=1e-3)
# Enviar todo a través de `accelerator.prepare`
train_loader, test_loader, model, optimizer = accelerator.prepare(
train_loader, test_loader, model, optimizer
)
# Entrenar durante un solo epoch
model.train()
for batch_idx, (data, target) in enumerate(train_loader):
output = model(data)
loss = F.nll_loss(output, target)
accelerator.backward(loss)
optimizer.step()
optimizer.zero_grad()
# Evaluar
model.eval()
correct = 0
with torch.no_grad():
for data, target in test_loader:
data, target = data.to(device), target.to(device)
output = model(data)
pred = output.argmax(dim=1, keepdim=True)
correct += pred.eq(target.view_as(pred)).sum().item()
print(f'Precisión: {100. * correct / len(test_loader.dataset)}')
Con esto, tu bucle de entrenamiento de PyTorch está configurado para ejecutarse en cualquier entorno distribuido gracias al objeto Accelerator
. Este código aún se puede ejecutar a través de la CLI de torchrun
o a través de la interfaz de línea de comandos propia de Accelerate, accelerate launch
.
Como resultado, se simplifica realizar entrenamiento distribuido con Accelerate y mantener la mayor parte del código básico de PyTorch igual que sea posible.
Anteriormente se mencionó que Accelerate también hace que los DataLoaders sean más eficientes. Esto se logra a través de muestreadores personalizados que pueden enviar partes de los lotes automáticamente a diferentes dispositivos durante el entrenamiento, lo que permite que en un momento dado solo se conozca una única copia de los datos, en lugar de cuatro a la vez en memoria según la configuración. Además, solo hay una copia completa única del conjunto de datos original en memoria. Los subconjuntos de este conjunto de datos se dividen entre todos los nodos que se utilizan para el entrenamiento, lo que permite entrenar conjuntos de datos mucho más grandes en una sola instancia sin un aumento explosivo en la memoria utilizada.
Usando el notebook_launcher
Anteriormente se mencionó que se puede iniciar código distribuido directamente desde tu Jupyter Notebook. Esto se logra mediante la utilidad notebook_launcher
de Accelerate, que permite iniciar entrenamiento multi-GPU basado en código dentro de un Jupyter Notebook.
Usarlo es tan sencillo como importar el lanzador:
from accelerate import notebook_launcher
Y pasar la función de entrenamiento que declaramos anteriormente, cualquier argumento que se vaya a pasar y el número de procesos a utilizar (como 8 en un TPU o 2 para dos GPUs). Ambas funciones de entrenamiento anteriores se pueden ejecutar, pero ten en cuenta que después de iniciar un solo lanzamiento, la instancia debe reiniciarse antes de crear otro.
notebook_launcher(train_ddp, args=(), num_processes=2)
O:
notebook_launcher(train_accelerate_ddp, args=(), num_processes=2)
Usando 🤗 Trainer
Finalmente, llegamos al nivel más alto de la API: el Hugging Face Trainer.
Esto envuelve la mayor cantidad de entrenamiento posible mientras aún se puede entrenar en sistemas distribuidos sin que el usuario tenga que hacer nada en absoluto.
Primero necesitamos importar el Trainer:
from transformers import Trainer
Luego definimos algunos TrainingArguments
para controlar todos los hiperparámetros habituales. El entrenador también funciona a través de diccionarios, por lo que se debe crear una función de agrupamiento personalizada.
Finalmente, subclasificamos el entrenador y escribimos nuestro propio compute_loss
.
Después, este código también funcionará en una configuración distribuida sin necesidad de escribir ningún código de entrenamiento.
from transformers import Trainer, TrainingArguments
model = BasicNet()
training_args = TrainingArguments(
"basic-trainer",
per_device_train_batch_size=64,
per_device_eval_batch_size=64,
num_train_epochs=1,
evaluation_strategy="epoch",
remove_unused_columns=False
)
def collate_fn(examples):
pixel_values = torch.stack([example[0] for example in examples])
labels = torch.tensor([example[1] for example in examples])
return {"x":pixel_values, "labels":labels}
class MyTrainer(Trainer):
def compute_loss(self, model, inputs, return_outputs=False):
outputs = model(inputs["x"])
target = inputs["labels"]
loss = F.nll_loss(outputs, target)
return (loss, outputs) if return_outputs else loss
trainer = MyTrainer(
model,
training_args,
train_dataset=train_dset,
eval_dataset=test_dset,
data_collator=collate_fn,
)
trainer.train()
***** Ejecutando entrenamiento *****
Num ejemplos = 60000
Num épocas = 1
Tamaño de lote de entrenamiento por dispositivo = 64
Tamaño de lote de evaluación por dispositivo = 64
Total pasos de optimización = 938
De manera similar a los ejemplos anteriores con el notebook_launcher
, esto se puede hacer nuevamente aquí arrojándolo todo en una función de entrenamiento:
def train_trainer_ddp():
model = BasicNet()
training_args = TrainingArguments(
"basic-trainer",
per_device_train_batch_size=64,
per_device_eval_batch_size=64,
num_train_epochs=1,
evaluation_strategy="epoch",
remove_unused_columns=False
)
def collate_fn(examples):
pixel_values = torch.stack([example[0] for example in examples])
labels = torch.tensor([example[1] for example in examples])
return {"x":pixel_values, "labels":labels}
class MyTrainer(Trainer):
def compute_loss(self, model, inputs, return_outputs=False):
outputs = model(inputs["x"])
target = inputs["labels"]
loss = F.nll_loss(outputs, target)
return (loss, outputs) if return_outputs else loss
trainer = MyTrainer(
model,
training_args,
train_dataset=train_dset,
eval_dataset=test_dset,
data_collator=collate_fn,
)
trainer.train()
notebook_launcher(train_trainer_ddp, args=(), num_processes=2)
Recursos
Para obtener más información sobre el paralelismo distribuido de datos de PyTorch, consulte la documentación aquí
Para obtener más información sobre 🤗 Accelerate, consulte la documentación aquí
Para obtener más información sobre 🤗 Transformers, consulte la documentación aquí
We will continue to update Zepes; if you have any questions or suggestions, please contact us!
Was this article helpful?
93 out of 132 found this helpful
Related articles
- Análisis de sentimientos en datos cifrados con cifrado homomórfico
- Demostraciones de Aprendizaje Automático de Hugging Face en arXiv
- VQ-Diffusion
- Pronóstico de series de tiempo probabilísticas con 🤗 Transformers
- Usando la Difusión Estable con Core ML en Apple Silicon
- De GPT2 a Stable Diffusion Hugging Face llega a la comunidad de Elixir
- Ilustrando el Aprendizaje por Reforzamiento a través de la Retroalimentación Humana (RLHF)