De PyTorch DDP a Accelerate Trainer, dominio del entrenamiento distribuido con facilidad.

Desde PyTorch DDP hasta Accelerate Trainer, dominar el entrenamiento distribuido ahora es más fácil.

Visión general general

Este tutorial asume que tienes una comprensión básica de PyTorch y cómo entrenar un modelo simple. Mostrará el entrenamiento en múltiples GPU a través de un proceso llamado Paralelismo de Datos Distribuidos (DDP) a través de tres niveles diferentes de aumento de la abstracción:

  • DDP nativo de PyTorch a través del módulo pytorch.distributed
  • Utilizando el envoltorio ligero de 🤗 Accelerate alrededor de pytorch.distributed que también ayuda a asegurar que el código se pueda ejecutar en una sola GPU y TPUs sin cambios de código y con cambios mínimos de código al código original
  • Utilizando la API de alto nivel de Trainer de 🤗 Transformer, que abstrae todo el código de plantilla y admite diversos dispositivos y escenarios distribuidos

¿Qué es el entrenamiento “Distribuido” y por qué importa?

Toma algún código de entrenamiento muy básico de PyTorch a continuación, que configura y entrena un modelo en MNIST basado en el ejemplo oficial de MNIST

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms

class BasicNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.dropout1 = nn.Dropout(0.25)
        self.dropout2 = nn.Dropout(0.5)
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)
        self.act = F.relu

    def forward(self, x):
        x = self.act(self.conv1(x))
        x = self.act(self.conv2(x))
        x = F.max_pool2d(x, 2)
        x = self.dropout1(x)
        x = torch.flatten(x, 1)
        x = self.act(self.fc1(x))
        x = self.dropout2(x)
        x = self.fc2(x)
        output = F.log_softmax(x, dim=1)
        return output

Definimos el dispositivo de entrenamiento ( cuda ):

device = "cuda"

Construimos algunos DataLoaders de PyTorch:

transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1307), (0.3081))
])

train_dset = datasets.MNIST('data', train=True, download=True, transform=transform)
test_dset = datasets.MNIST('data', train=False, transform=transform)

train_loader = torch.utils.data.DataLoader(train_dset, shuffle=True, batch_size=64)
test_loader = torch.utils.data.DataLoader(test_dset, shuffle=False, batch_size=64)

Movemos el modelo al dispositivo CUDA:

model = BasicNet().to(device)

Construimos un optimizador de PyTorch:

optimizer = optim.AdamW(model.parameters(), lr=1e-3)

Antes de finalmente crear un bucle de entrenamiento y evaluación simplista que realiza una iteración completa sobre el conjunto de datos y calcula la precisión de la prueba:

model.train()
for batch_idx, (data, target) in enumerate(train_loader):
    data, target = data.to(device), target.to(device)
    output = model(data)
    loss = F.nll_loss(output, target)
    loss.backward()
    optimizer.step()
    optimizer.zero_grad()

model.eval()
correct = 0
with torch.no_grad():
    for data, target in test_loader:
        output = model(data)
        pred = output.argmax(dim=1, keepdim=True)
        correct += pred.eq(target.view_as(pred)).sum().item()
print(f'Precisión: {100. * correct / len(test_loader.dataset)}')

Típicamente, a partir de aquí, uno podría simplemente poner todo esto en un script de Python o ejecutarlo en un Jupyter Notebook.

Sin embargo, ¿cómo se ejecutaría este script en, por ejemplo, dos GPUs o en varias máquinas si estos recursos están disponibles, lo cual podría mejorar la velocidad de entrenamiento a través del entrenamiento distribuido? Simplemente hacer python myscript.py solo ejecutaría el script usando una sola GPU. Aquí es donde entra en juego torch.distributed

Paralelismo de Datos Distribuidos de PyTorch

Como su nombre lo indica, torch.distributed está diseñado para funcionar en configuraciones distribuidas. Esto puede incluir una configuración de múltiples nodos, donde tiene varias máquinas cada una con una sola GPU, o múltiples GPU donde un solo sistema tiene múltiples GPU, o alguna combinación de ambos.

Para convertir nuestro código anterior para que funcione dentro de una configuración distribuida, primero se deben definir algunas configuraciones de configuración, detalladas en el Tutorial de inicio rápido de DDP

Primero, se debe declarar una función de setup y una función de cleanup. Esto abrirá un grupo de procesamiento a través del cual todos los procesos de cálculo pueden comunicarse

Nota: para esta sección del tutorial, se debe asumir que estos se envían en archivos de script de Python. Más adelante se discutirá un lanzador que utiliza Accelerate que elimina esta necesidad

import os
import torch.distributed as dist

def setup(rank, world_size):
    "Configura el grupo de procesos y la configuración para el paralelismo de datos distribuido de PyTorch"
    os.environ["MASTER_ADDR"] = 'localhost'
    os.environ["MASTER_PORT"] = "12355"

    # Inicializa el grupo de procesos
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

def cleanup():
    "Limpia el entorno distribuido"
    dist.destroy_process_group()

La última pieza del rompecabezas es cómo enviar mis datos y mi modelo a otra GPU.

Aquí es donde entra en juego el módulo DistributedDataParallel. Copiará su modelo en cada GPU, y cuando se llame a loss.backward(), se realizará la retropropagación y los gradientes resultantes en todas estas copias del modelo se promediarán/reducirán. Esto garantiza que cada dispositivo tenga los mismos pesos después del paso del optimizador.

A continuación se muestra un ejemplo de nuestra configuración de entrenamiento, refactorizada como una función, con esta capacidad:

Nota: Aquí, el rango es la clasificación general de la GPU actual en comparación con todas las demás GPU disponibles, lo que significa que tienen una clasificación de 0 -> n-1

from torch.nn.parallel import DistributedDataParallel as DDP

def train(model, rank, world_size):
    setup(rank, world_size)
    model = model.to(rank)
    ddp_model = DDP(model, device_ids=[rank])
    optimizer = optim.AdamW(ddp_model.parameters(), lr=1e-3)
    # Entrenar durante una época
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
    cleanup()

El optimizador debe declararse en función del modelo en el dispositivo específico (por lo tanto, ddp_model y no model) para que todos los gradientes se calculen correctamente.

Por último, para ejecutar el script, PyTorch tiene un práctico módulo de línea de comandos torchrun que puede ayudar. Simplemente pase el número de nodos que debe utilizar y el script que se ejecutará y listo:

torchrun --nproc_per_nodes=2 --nnodes=1 example_script.py

Lo anterior ejecutará el script de entrenamiento en dos GPU que se encuentran en una sola máquina y esta es la base para realizar solo el entrenamiento distribuido con PyTorch.

Ahora hablemos de Accelerate, una biblioteca diseñada para hacer este proceso más sencillo y también ayudar con algunas mejores prácticas

🤗 Accelerate

Accelerate es una biblioteca diseñada para permitirle realizar lo que acabamos de hacer anteriormente, sin necesidad de modificar mucho su código. Además, la canalización de datos inherente a Accelerate también puede mejorar el rendimiento de su código.

Primero, envolvamos todo el código anterior que acabamos de realizar en una sola función, para ayudarnos a visualizar la diferencia:

def train_ddp(rank, world_size):
    setup(rank, world_size)
    # Construir DataLoaders
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307), (0.3081))
    ])

    train_dset = datasets.MNIST('data', train=True, download=True, transform=transform)
    test_dset = datasets.MNIST('data', train=False, transform=transform)

    train_loader = torch.utils.data.DataLoader(train_dset, shuffle=True, batch_size=64)
    test_loader = torch.utils.data.DataLoader(test_dset, shuffle=False, batch_size=64)

    # Construir modelo
    model = model.to(rank)
    ddp_model = DDP(model, device_ids=[rank])

    # Construir optimizador
    optimizer = optim.AdamW(ddp_model.parameters(), lr=1e-3)

    # Entrenar durante una sola época
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
    
    # Evaluar
    model.eval()
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()
    print(f'Precisión: {100. * correct / len(test_loader.dataset)}')

A continuación, hablemos de cómo Accelerate puede ayudar. Hay algunos problemas con el código anterior:

  1. Esto es ligeramente ineficiente, dado que se crean n dataloaders basados en cada dispositivo y se empujan.
  2. Este código solo funcionará para multi-GPU, por lo que se debe tener especial cuidado para que se ejecute en un solo nodo nuevamente, o en TPU.

Accelerate resuelve esto a través de la clase Accelerator. A través de ella, el código permanece en gran medida igual, excepto por tres líneas de código cuando se compara un solo nodo con multinode, como se muestra a continuación:

def train_ddp_accelerate():
    accelerator = Accelerator()
    # Construir DataLoaders
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307), (0.3081))
    ])

    train_dset = datasets.MNIST('data', train=True, download=True, transform=transform)
    test_dset = datasets.MNIST('data', train=False, transform=transform)

    train_loader = torch.utils.data.DataLoader(train_dset, shuffle=True, batch_size=64)
    test_loader = torch.utils.data.DataLoader(test_dset, shuffle=False, batch_size=64)

    # Construir modelo
    model = BasicModel()

    # Construir optimizador
    optimizer = optim.AdamW(model.parameters(), lr=1e-3)

    # Enviar todo a través de `accelerator.prepare`
    train_loader, test_loader, model, optimizer = accelerator.prepare(
        train_loader, test_loader, model, optimizer
    )

    # Entrenar durante un solo epoch
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        output = model(data)
        loss = F.nll_loss(output, target)
        accelerator.backward(loss)
        optimizer.step()
        optimizer.zero_grad()

    # Evaluar
    model.eval()
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()
    print(f'Precisión: {100. * correct / len(test_loader.dataset)}')

Con esto, tu bucle de entrenamiento de PyTorch está configurado para ejecutarse en cualquier entorno distribuido gracias al objeto Accelerator. Este código aún se puede ejecutar a través de la CLI de torchrun o a través de la interfaz de línea de comandos propia de Accelerate, accelerate launch.

Como resultado, se simplifica realizar entrenamiento distribuido con Accelerate y mantener la mayor parte del código básico de PyTorch igual que sea posible.

Anteriormente se mencionó que Accelerate también hace que los DataLoaders sean más eficientes. Esto se logra a través de muestreadores personalizados que pueden enviar partes de los lotes automáticamente a diferentes dispositivos durante el entrenamiento, lo que permite que en un momento dado solo se conozca una única copia de los datos, en lugar de cuatro a la vez en memoria según la configuración. Además, solo hay una copia completa única del conjunto de datos original en memoria. Los subconjuntos de este conjunto de datos se dividen entre todos los nodos que se utilizan para el entrenamiento, lo que permite entrenar conjuntos de datos mucho más grandes en una sola instancia sin un aumento explosivo en la memoria utilizada.

Usando el notebook_launcher

Anteriormente se mencionó que se puede iniciar código distribuido directamente desde tu Jupyter Notebook. Esto se logra mediante la utilidad notebook_launcher de Accelerate, que permite iniciar entrenamiento multi-GPU basado en código dentro de un Jupyter Notebook.

Usarlo es tan sencillo como importar el lanzador:

from accelerate import notebook_launcher

Y pasar la función de entrenamiento que declaramos anteriormente, cualquier argumento que se vaya a pasar y el número de procesos a utilizar (como 8 en un TPU o 2 para dos GPUs). Ambas funciones de entrenamiento anteriores se pueden ejecutar, pero ten en cuenta que después de iniciar un solo lanzamiento, la instancia debe reiniciarse antes de crear otro.

notebook_launcher(train_ddp, args=(), num_processes=2)

O:

notebook_launcher(train_accelerate_ddp, args=(), num_processes=2)

Usando 🤗 Trainer

Finalmente, llegamos al nivel más alto de la API: el Hugging Face Trainer.

Esto envuelve la mayor cantidad de entrenamiento posible mientras aún se puede entrenar en sistemas distribuidos sin que el usuario tenga que hacer nada en absoluto.

Primero necesitamos importar el Trainer:

from transformers import Trainer

Luego definimos algunos TrainingArguments para controlar todos los hiperparámetros habituales. El entrenador también funciona a través de diccionarios, por lo que se debe crear una función de agrupamiento personalizada.

Finalmente, subclasificamos el entrenador y escribimos nuestro propio compute_loss.

Después, este código también funcionará en una configuración distribuida sin necesidad de escribir ningún código de entrenamiento.

from transformers import Trainer, TrainingArguments

model = BasicNet()

training_args = TrainingArguments(
    "basic-trainer",
    per_device_train_batch_size=64,
    per_device_eval_batch_size=64,
    num_train_epochs=1,
    evaluation_strategy="epoch",
    remove_unused_columns=False
)

def collate_fn(examples):
    pixel_values = torch.stack([example[0] for example in examples])
    labels = torch.tensor([example[1] for example in examples])
    return {"x":pixel_values, "labels":labels}

class MyTrainer(Trainer):
    def compute_loss(self, model, inputs, return_outputs=False):
        outputs = model(inputs["x"])
        target = inputs["labels"]
        loss = F.nll_loss(outputs, target)
        return (loss, outputs) if return_outputs else loss

trainer = MyTrainer(
    model,
    training_args,
    train_dataset=train_dset,
    eval_dataset=test_dset,
    data_collator=collate_fn,
)

trainer.train()

    ***** Ejecutando entrenamiento *****
      Num ejemplos = 60000
      Num épocas = 1
      Tamaño de lote de entrenamiento por dispositivo = 64
      Tamaño de lote de evaluación por dispositivo = 64
      Total pasos de optimización = 938

De manera similar a los ejemplos anteriores con el notebook_launcher, esto se puede hacer nuevamente aquí arrojándolo todo en una función de entrenamiento:

def train_trainer_ddp():
    model = BasicNet()

    training_args = TrainingArguments(
        "basic-trainer",
        per_device_train_batch_size=64,
        per_device_eval_batch_size=64,
        num_train_epochs=1,
        evaluation_strategy="epoch",
        remove_unused_columns=False
    )

    def collate_fn(examples):
        pixel_values = torch.stack([example[0] for example in examples])
        labels = torch.tensor([example[1] for example in examples])
        return {"x":pixel_values, "labels":labels}

    class MyTrainer(Trainer):
        def compute_loss(self, model, inputs, return_outputs=False):
            outputs = model(inputs["x"])
            target = inputs["labels"]
            loss = F.nll_loss(outputs, target)
            return (loss, outputs) if return_outputs else loss

    trainer = MyTrainer(
        model,
        training_args,
        train_dataset=train_dset,
        eval_dataset=test_dset,
        data_collator=collate_fn,
    )

    trainer.train()

notebook_launcher(train_trainer_ddp, args=(), num_processes=2)

Recursos

Para obtener más información sobre el paralelismo distribuido de datos de PyTorch, consulte la documentación aquí

Para obtener más información sobre 🤗 Accelerate, consulte la documentación aquí

Para obtener más información sobre 🤗 Transformers, consulte la documentación aquí

We will continue to update Zepes; if you have any questions or suggestions, please contact us!

Share:

Was this article helpful?

93 out of 132 found this helpful

Discover more

Ciencia de Datos

Haz que cada dólar de marketing cuente con la ciencia de datos.

La economía actual nos exige ser más diligentes en el gasto publicitario. Afortunadamente, los caminos viables para u...

Inteligencia Artificial

Permite un entrenamiento más rápido con la biblioteca de paralelismo de datos de Amazon SageMaker

El entrenamiento de modelos de lenguaje de gran tamaño (LLM, por sus siglas en inglés) se ha vuelto cada vez más popu...

Inteligencia Artificial

Investigadores de CMU proponen GILL un método de IA para fusionar LLMs con modelos de codificador y decodificador de imágenes

Con el lanzamiento del nuevo GPT 4 de OpenAI, se ha introducido la multimodalidad en los Modelos de Lenguaje Grandes....

Inteligencia Artificial

El cucaracha cibernético puede navegar por un laberinto

Los investigadores han desarrollado un método para crear cucarachas ciborg para ser utilizadas en misiones de búsqued...

Inteligencia Artificial

Este boletín de inteligencia artificial es todo lo que necesitas #59

Esta semana los cambios en los términos de servicio de Zoom (desde marzo) se pusieron en foco después de los temores ...