
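Training on multiple GPUs with Opacus using DistributedDataParallel

In this tutorial we cover the basics you need to start using Opacus in a distributed model training pipeline. As state-of-the-art models and datasets grow larger, multi-GPU training has become the norm, and Opacus supports Distributed Data Parallel (DDP) out of the box.

This tutorial assumes basic knowledge of Opacus and DDP. If you are new to either tool, we suggest starting with these tutorials: Building an Image Classifier with Differential Privacy and Getting Started with Distributed Data Parallel.

In Chapter 1 we start with a minimal working example that shows exactly what is needed to make Opacus work in a distributed setting. This should be enough for most common scenarios.

In Chapters 2 and 3 we look at the implementation and discuss the technical details: how private DDP differs from regular DDP, and why those differences are needed.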

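Chapter 0: Preparations

Before we begin, there are a few things we need to mention.

First, this tutorial was written for a single Linux machine with at least 2 GPUs. On Windows and/or for multi-node training the principles stay the same, but the DDP setup code needs slight modification.

Second, Jupyter notebooks are known not to support DDP training. Throughout the tutorial we use the %%writefile magic command to write code to separate files, which are later executed from the terminal. These files are cleaned up in the last cell of this notebook.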

Chapter 1: Getting started

First, let's initialize the distributed environment.

In [1]:
%%writefile opacus_ddp_demo.py
import os
import torch.distributed as dist
import logging

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

def setup(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'

    # initialize the process group
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

def cleanup():
    dist.destroy_process_group()
Overwriting opacus_ddp_demo.py

We'll use MNIST as the example, so let's also initialize a simple convolutional network and download the dataset.

In [2]:
%%writefile -a opacus_ddp_demo.py

import torch.nn as nn
import torch.nn.functional as F

class SampleConvNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 16, 8, 2, padding=3)
        self.conv2 = nn.Conv2d(16, 32, 4, 2)
        self.fc1 = nn.Linear(32 * 4 * 4, 32)
        self.fc2 = nn.Linear(32, 10)

    def forward(self, x):
        # x of shape [B, 1, 28, 28]
        x = F.relu(self.conv1(x))  # -> [B, 16, 14, 14]
        x = F.max_pool2d(x, 2, 1)  # -> [B, 16, 13, 13]
        x = F.relu(self.conv2(x))  # -> [B, 32, 5, 5]
        x = F.max_pool2d(x, 2, 1)  # -> [B, 32, 4, 4]
        x = x.view(-1, 32 * 4 * 4)  # -> [B, 512]
        x = F.relu(self.fc1(x))  # -> [B, 32]
        x = self.fc2(x)  # -> [B, 10]
        return x
Appending to opacus_ddp_demo.py
In [3]:
%%writefile -a opacus_ddp_demo.py

from torchvision import datasets, transforms

# Precomputed characteristics of the MNIST dataset
MNIST_MEAN = 0.1307
MNIST_STD = 0.3081

DATA_ROOT = "./mnist"

mnist_train_ds = datasets.MNIST(
    DATA_ROOT,
    train=True,
    download=True,
    transform=transforms.Compose(
        [
            transforms.ToTensor(),
            transforms.Normalize((MNIST_MEAN,), (MNIST_STD,)),
        ]
    ),
)

mnist_test_ds = datasets.MNIST(
    DATA_ROOT,
    train=False,
    download=True,
    transform=transforms.Compose(
        [
            transforms.ToTensor(),
            transforms.Normalize((MNIST_MEAN,), (MNIST_STD,)),
        ]
    ),
)
Appending to opacus_ddp_demo.py

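Next comes the key part, and the only place where things differ from non-private DDP.

First, instead of wrapping the model with DistributedDataParallel, we wrap it with DifferentiallyPrivateDistributedDataParallel from the opacus.distributed package. It is as simple as that.

The second difference is in how the DataLoader is initialized. In regular distributed training you initialize a data loader specific to the distributed setup, which affects two parameters:

  • batch_size is the per-GPU batch size. That is, your logical batch size (the one that matters for convergence) equals local_batch_size * num_gpus.
  • You need to pass sampler=DistributedSampler(dataset) so that the training dataset is split across the GPUs.

With Opacus you do neither. The make_private method expects the user-supplied DataLoader to be non-distributed, initialized just as it would be for single-GPU training.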
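
For reference, the non-private bookkeeping that Opacus takes off your hands can be sketched in plain Python. This is only an illustration: shard_indices is a hypothetical helper that mimics a strided, unshuffled split in the spirit of DistributedSampler, and the world size of 2 is an assumption.

```python
# Illustrative arithmetic only; no torch required.
NUM_GPUS = 2             # assumed world size
LOGICAL_BATCH_SIZE = 200
DATASET_SIZE = 60_000    # size of the MNIST training set


def shard_indices(dataset_size, rank, world_size):
    """Strided split: a simplified, unshuffled stand-in for DistributedSampler."""
    return list(range(rank, dataset_size, world_size))


# Non-private DDP: every worker gets its own shard and a smaller local batch.
local_batch_size = LOGICAL_BATCH_SIZE // NUM_GPUS
shards = [shard_indices(DATASET_SIZE, r, NUM_GPUS) for r in range(NUM_GPUS)]

print(local_batch_size)             # per-GPU batch size
print([len(s) for s in shards])     # examples seen by each GPU per epoch
print(local_batch_size * NUM_GPUS)  # logical batch size
```

With Opacus, neither the division of the batch size nor the sharding is written by hand; both are derived from the single non-distributed DataLoader.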

The code below highlights the changes you need to make to a regular DDP training pipeline by commenting out the lines to be replaced or removed.

In [4]:
%%writefile -a opacus_ddp_demo.py

import torch
import torch.optim as optim
from torch.utils.data import DataLoader, DistributedSampler

from opacus.distributed import DifferentiallyPrivateDistributedDataParallel as DPDDP
from torch.nn.parallel import DistributedDataParallel as DDP

from opacus import PrivacyEngine

LR = 0.1
BATCH_SIZE = 200
N_GPUS = torch.cuda.device_count()

def init_training(rank):
    model = SampleConvNet()
    #model = DDP(model) -- non-private
    model = DPDDP(model)

    optimizer = optim.SGD(model.parameters(), lr=LR, momentum=0)
    data_loader = DataLoader(
        mnist_train_ds,
        #batch_size=BATCH_SIZE // N_GPUS, -- non-private
        batch_size=BATCH_SIZE,
        #sampler=DistributedSampler(mnist_train_ds) -- non-private
    )

    if rank == 0:
        logger.info(
            f"(rank {rank}) Initialized model ({type(model).__name__}), "
            f"optimizer ({type(optimizer).__name__}), "
            f"data loader ({type(data_loader).__name__}, len={len(data_loader)})"
        )

    privacy_engine = PrivacyEngine()

    # PrivacyEngine looks at the model's class and enables
    # distributed processing if it's wrapped with DPDDP
    model, optimizer, data_loader = privacy_engine.make_private(
        module=model,
        optimizer=optimizer,
        data_loader=data_loader,
        noise_multiplier=1.,
        max_grad_norm=1.,
    )

    if rank == 0:
        logger.info(
            f"(rank {rank}) After privatization: model ({type(model).__name__}), "
            f"optimizer ({type(optimizer).__name__}), "
            f"data loader ({type(data_loader).__name__}, len={len(data_loader)})"
        )

    logger.info(f"(rank {rank}) Average batch size per GPU: {int(optimizer.expected_batch_size)}")

    return model, optimizer, data_loader, privacy_engine
Appending to opacus_ddp_demo.py

Now all that is left is to define the training loop and launch it.

In [5]:
%%writefile -a opacus_ddp_demo.py

import numpy as np

def test(model, device):
    test_loader = DataLoader(
        mnist_test_ds,
        batch_size=BATCH_SIZE,
    )

    model.eval()
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            pred = output.argmax(
                dim=1, keepdim=True
            )
            correct += pred.eq(target.view_as(pred)).sum().item()

    model.train()
    return correct / len(mnist_test_ds)

def launch(rank, world_size, epochs):
    setup(rank, world_size)
    criterion = nn.CrossEntropyLoss()

    model, optimizer, data_loader, privacy_engine = init_training(rank)
    model.to(rank)
    model.train()


    for e in range(epochs):
        losses = []
        correct = 0
        total = 0

        for data, target in data_loader:
            data, target = data.to(rank), target.to(rank)
            optimizer.zero_grad()
            output = model(data)

            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()
            total += len(data)

            loss = criterion(output, target)
            loss.backward()
            optimizer.step()
            losses.append(loss.item())

        test_accuracy = test(model, rank)
        train_accuracy = correct / total
        epsilon = privacy_engine.get_epsilon(delta=1e-5)

        if rank == 0:
            print(
                f"Epoch: {e} \t"
                f"Train Loss: {np.mean(losses):.4f} | "
                f"Train Accuracy: {train_accuracy:.2f} | "
                f"Test Accuracy: {test_accuracy:.2f} |"
                f"(ε = {epsilon:.2f})"
            )

    cleanup()
Appending to opacus_ddp_demo.py
In [6]:
%%writefile -a opacus_ddp_demo.py

import torch.multiprocessing as mp

EPOCHS = 10
world_size = torch.cuda.device_count()

if __name__ == '__main__':
    mp.spawn(
        launch,
        args=(world_size,EPOCHS,),
        nprocs=world_size,
        join=True
    )
Appending to opacus_ddp_demo.py

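Finally, run the script. Note that we initialized the DataLoader with batch_size=200, which corresponds to 300 batches over the full dataset (60,000 images).

After passing it to make_private on each worker, every worker has a data loader with an average batch size of 100, yet each data loader still iterates over 300 batches.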
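
The numbers quoted here can be reproduced with a few lines of arithmetic. The sketch below assumes two GPUs (as in the logs of this tutorial) and assumes the sample rate is derived as 1 / number of batches; it does not call Opacus.

```python
import math

DATASET_SIZE = 60_000
BATCH_SIZE = 200
WORLD_SIZE = 2  # assumed number of GPUs, matching the logs in this tutorial

num_batches = math.ceil(DATASET_SIZE / BATCH_SIZE)
sample_rate = 1 / num_batches  # assumption: sample rate = 1 / number of batches
expected_logical_batch = DATASET_SIZE * sample_rate
expected_batch_per_gpu = expected_logical_batch / WORLD_SIZE

print(num_batches)                    # batches per epoch
print(round(expected_batch_per_gpu))  # average batch size on each GPU
```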

In [7]:
!python -W ignore opacus_ddp_demo.py
05/13/2022 11:13:16:INFO:(rank 0) Initialized model (DifferentiallyPrivateDistributedDataParallel), optimizer (SGD), data loader (DataLoader, len=300)
05/13/2022 11:13:16:INFO:(rank 1) Average batch size per GPU: 100
05/13/2022 11:13:16:INFO:(rank 0) After privatization: model (GradSampleModule), optimizer (DistributedDPOptimizer), data loader (DPDataLoader, len=300)
05/13/2022 11:13:16:INFO:(rank 0) Average batch size per GPU: 100
Epoch: 0 	Train Loss: 1.5412 | Train Accuracy: 0.57 | Test Accuracy: 0.73 |(ε = 0.87)
Epoch: 1 	Train Loss: 0.6717 | Train Accuracy: 0.79 | Test Accuracy: 0.83 |(ε = 0.91)
Epoch: 2 	Train Loss: 0.5659 | Train Accuracy: 0.85 | Test Accuracy: 0.86 |(ε = 0.96)
Epoch: 3 	Train Loss: 0.5347 | Train Accuracy: 0.87 | Test Accuracy: 0.88 |(ε = 1.00)
Epoch: 4 	Train Loss: 0.5178 | Train Accuracy: 0.88 | Test Accuracy: 0.90 |(ε = 1.03)
Epoch: 5 	Train Loss: 0.4750 | Train Accuracy: 0.90 | Test Accuracy: 0.91 |(ε = 1.07)
Epoch: 6 	Train Loss: 0.4502 | Train Accuracy: 0.90 | Test Accuracy: 0.91 |(ε = 1.11)
Epoch: 7 	Train Loss: 0.4358 | Train Accuracy: 0.91 | Test Accuracy: 0.92 |(ε = 1.14)
Epoch: 8 	Train Loss: 0.4186 | Train Accuracy: 0.92 | Test Accuracy: 0.92 |(ε = 1.18)
Epoch: 9 	Train Loss: 0.4129 | Train Accuracy: 0.92 | Test Accuracy: 0.93 |(ε = 1.21)

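Chapter 2: Data and distributed sampler

Note: the next two chapters discuss advanced Opacus usage and implementation details. We strongly recommend reading the Deep Dive into Advanced Features of Opacus tutorial before continuing.

Now let's look inside the make_private method and see what it does to enable DDP processing. We start with the modifications to the DataLoader.

As a reminder, DPDataLoader differs from a regular DataLoader in only one respect: it samples data with a uniform with-replacement random sampler (a.k.a. "Poisson sampling"). This means that instead of a fixed batch size we work with a sample rate: the probability that each example is included in the next batch.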
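
To make the idea of a sample rate concrete, here is a minimal plain-Python sketch of Poisson sampling. It is not the Opacus sampler: poisson_batches is a hypothetical helper, and the dataset size of 6,000 is chosen only to keep the example fast.

```python
import random


def poisson_batches(num_examples, sample_rate, num_batches, seed=0):
    """Yield index batches where each example is included independently
    with probability `sample_rate` (a plain-Python illustration)."""
    rng = random.Random(seed)
    for _ in range(num_batches):
        yield [i for i in range(num_examples) if rng.random() < sample_rate]


batches = list(
    poisson_batches(num_examples=6_000, sample_rate=64 / 6_000, num_batches=50)
)
sizes = [len(b) for b in batches]

# Batch sizes vary from step to step; only their average is close to 64.
print(min(sizes), max(sizes), sum(sizes) / len(sizes))
```

Because every batch is drawn independently, the same example can appear in several batches of one epoch, and some examples may not appear at all.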

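Now let's initialize a regular data loader and then convert it to a DPDataLoader. This is exactly what the make_private() method does.

Below we initialize three data loaders:

  • non-distributed
  • distributed, non-private
  • distributed, private (with Poisson sampling)

All three are initialized so that the logical batch size is 64.

Note that make_private uses DPDataLoader only when poisson_sampling is set to True (the default). If poisson_sampling is set to False while using DPDDP, we need to pass a DistributedSampler to the DataLoader explicitly and set batch_size to BATCH_SIZE // world_size, just as in the non-private case.

In [8]: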
%%writefile opacus_distributed_data_loader_demo.py

from opacus_ddp_demo import setup, cleanup, mnist_train_ds
import logging
from torch.utils.data import DataLoader, DistributedSampler
from opacus.data_loader import DPDataLoader

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

BATCH_SIZE = 64

def init_data(rank, world_size):
    setup(rank, world_size)

    non_distributed_dl = DataLoader(
        mnist_train_ds,
        batch_size=BATCH_SIZE
    )

    distributed_non_private_dl = DataLoader(
        mnist_train_ds,
        batch_size=BATCH_SIZE // world_size,
        sampler=DistributedSampler(mnist_train_ds),
    )

    private_dl = DPDataLoader.from_data_loader(non_distributed_dl, distributed=True)

    if rank == 0:
        logger.info(
            f"(rank {rank}) Non-distributed non-private data loader. "
            f"#batches: {len(non_distributed_dl)}, "
            f"#data points: {len(non_distributed_dl.sampler)}, "
            f"batch_size: {non_distributed_dl.batch_size}"
        )

        logger.info(
            f"(rank {rank}) Distributed, non-private data loader. "
            f"#batches: {len(distributed_non_private_dl)}, "
            f"#data points: {len(distributed_non_private_dl.sampler)}, "
            f"batch_size: {distributed_non_private_dl.batch_size}"
        )

        logger.info(
            f"(rank {rank}) Distributed, private data loader. "
            f"#batches: {len(private_dl)}, "
            f"#data points: {private_dl.batch_sampler.num_samples}, "
            f"sample_rate: {private_dl.sample_rate:4f}, "
            f"avg batch_size (=sample_rate*num_data_points): {int(private_dl.sample_rate*private_dl.batch_sampler.num_samples)}"
        )

    # Tear down the process group created by setup()
    cleanup()
Writing opacus_distributed_data_loader_demo.py
In [9]:
%%writefile -a opacus_distributed_data_loader_demo.py

import torch
import torch.multiprocessing as mp

world_size = torch.cuda.device_count()

if __name__ == '__main__':
    mp.spawn(
        init_data,
        args=(world_size,),
        nprocs=world_size,
        join=True
    )
Appending to opacus_distributed_data_loader_demo.py

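Let's see what happens when we run it, and what exactly the from_data_loader factory method does.

Note that our private DataLoader is initialized from the non-distributed, non-private data loader. All the basic parameters (per-GPU batch size and number of examples per GPU) match those of the distributed, non-private data loader. The logged average batch size of 31 rather than 32 is a truncation artifact: 0.001066 × 30000 ≈ 31.98, which int() rounds down.

In [10]: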
!python -W ignore opacus_distributed_data_loader_demo.py
05/13/2022 11:14:53:INFO:(rank 0) Non-distributed non-private data loader. #batches: 938, #data points: 60000, batch_size: 64
05/13/2022 11:14:53:INFO:(rank 0) Distributed, non-private data loader. #batches: 938, #data points: 30000, batch_size: 32
05/13/2022 11:14:53:INFO:(rank 0) Distributed, private data loader. #batches: 938, #data points: 30000, sample_rate: 0.001066, avg batch_size (=sample_rate*num_data_points): 31
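
The values in this log can be reproduced by hand. The sketch below assumes two workers and assumes the sample rate equals 1 / (number of batches in the non-distributed loader), which is consistent with the logged 0.001066; it does not call Opacus.

```python
import math

DATASET_SIZE = 60_000
BATCH_SIZE = 64
WORLD_SIZE = 2  # assumed, matching the two ranks seen in the logs

num_batches = math.ceil(DATASET_SIZE / BATCH_SIZE)  # non-distributed loader length
per_gpu_examples = DATASET_SIZE // WORLD_SIZE
per_gpu_batch_size = BATCH_SIZE // WORLD_SIZE
ddp_batches = math.ceil(per_gpu_examples / per_gpu_batch_size)

sample_rate = 1 / num_batches  # assumption inferred from the logged sample_rate
avg_private_batch = sample_rate * per_gpu_examples

print(num_batches, ddp_batches)
print(f"{sample_rate:.6f}")
print(f"{avg_private_batch:.2f}", int(avg_private_batch))
```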

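Chapter 3: Synchronization

One notable difference between DDP and DPDDP is how they handle synchronization.

In regular Distributed Data Parallel, the forward and backward passes are the synchronization points: the DDP wrapper makes sure gradients are synchronized across workers as soon as each layer's gradients become available during the backward pass.

Opacus, however, needs a later synchronization point. Before gradients can be used they must be clipped and noised. This happens in the optimizer, which moves the synchronization point from the backward pass to the optimization step. In addition, to simplify the calculations, noise is added only on the worker with rank=0, using a noise scale calibrated to the combined batch across all workers.

In [11]: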
%%writefile opacus_sync_demo.py


from opacus_ddp_demo import setup, cleanup, mnist_train_ds, SampleConvNet
import logging
from torch.utils.data import DataLoader
import torch.optim as optim
from opacus.data_loader import DPDataLoader
from opacus import GradSampleModule
from opacus.distributed import DifferentiallyPrivateDistributedDataParallel as DPDDP
from opacus.optimizers import DistributedDPOptimizer
from torch.nn.parallel import DistributedDataParallel as DDP

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

BATCH_SIZE = 64
LR = 64

def init_training(rank, world_size):
    model = SampleConvNet()
    optimizer = optim.SGD(model.parameters(), lr=LR, momentum=0)

    model = GradSampleModule(model)
    model = DPDDP(model)

    optimizer = DistributedDPOptimizer(
        optimizer=optimizer,
        noise_multiplier=0.,
        max_grad_norm=100.,
        expected_batch_size=BATCH_SIZE//world_size,
    )

    data_loader = DPDataLoader.from_data_loader(
        data_loader=DataLoader(
            mnist_train_ds,
            batch_size=BATCH_SIZE,
        ),
        distributed=True,
    )


    return model, optimizer, data_loader
Writing opacus_sync_demo.py

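Now that we have initialized the DifferentiallyPrivateDistributedDataParallel model and the DistributedDPOptimizer, let's see how they work together.

DifferentiallyPrivateDistributedDataParallel is a no-op: it only synchronizes the model at initialization and does nothing on the forward and backward passes.

DistributedDPOptimizer, on the other hand, does all the heavy lifting:

  • It clips gradients independently on each worker
  • It adds noise on the worker with rank=0 only
  • It calls torch.distributed.all_reduce in step() to synchronize gradients just before applying them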
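
The three steps above can be simulated in plain Python. This is a simplified sketch under stated assumptions, not the Opacus implementation: clip_and_sum and simulated_step are hypothetical helpers, workers are modelled as list entries, and all_reduce is replaced by an ordinary sum followed by division by the world size.

```python
import math
import random


def clip_and_sum(per_sample_grads, max_grad_norm):
    """Clip each per-sample gradient to `max_grad_norm`, then sum them."""
    total = [0.0] * len(per_sample_grads[0])
    for g in per_sample_grads:
        norm = math.sqrt(sum(x * x for x in g))
        factor = min(1.0, max_grad_norm / (norm + 1e-6))
        total = [t + factor * x for t, x in zip(total, g)]
    return total


def simulated_step(grads_by_rank, max_grad_norm, noise_multiplier,
                   expected_batch_size, seed=0):
    """Return the gradient every worker would hold after the simulated sync."""
    rng = random.Random(seed)
    world_size = len(grads_by_rank)
    local = []
    for rank, per_sample_grads in enumerate(grads_by_rank):
        # Step 1: clipping happens independently on each worker.
        summed = clip_and_sum(per_sample_grads, max_grad_norm)
        # Step 2: Gaussian noise is added on a single worker only.
        if rank == 0:
            std = noise_multiplier * max_grad_norm
            summed = [s + rng.gauss(0.0, std) for s in summed]
        local.append([s / expected_batch_size for s in summed])
    # Step 3: stand-in for all_reduce(SUM) followed by division by world size.
    return [sum(col) / world_size for col in zip(*local)]


grads_by_rank = [
    [[0.3, -0.4], [0.1, 0.2]],  # per-sample gradients on rank 0
    [[-0.2, 0.5], [0.4, 0.0]],  # per-sample gradients on rank 1
]
synced = simulated_step(grads_by_rank, max_grad_norm=100.0,
                        noise_multiplier=0.0, expected_batch_size=2)
print(synced)
```

With noise disabled and a clipping threshold far above the gradient norms, the result is simply the average of the per-worker mean gradients, which is the behaviour the demo below is set up to show.
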
In [12]:
%%writefile -a opacus_sync_demo.py

import torch.nn as nn
import numpy as np

def launch(rank, world_size):
    setup(rank, world_size)
    criterion = nn.CrossEntropyLoss()

    model, optimizer, data_loader = init_training(rank, world_size)
    model.to(rank)
    model.train()

    for data, target in data_loader:
        # Batches are already tensors; just move them to this worker's device
        data, target = data.to(rank), target.to(rank)
        optimizer.zero_grad()

        output = model(data)
        loss = criterion(output, target)
        loss.backward()

        flat_grad = torch.cat([p.grad_sample.sum(dim=0).view(-1) for p in model.parameters()]).cpu().numpy() / optimizer.expected_batch_size
        logger.info(
            f"(rank={rank}) Gradient norm before optimizer.step(): {np.linalg.norm(flat_grad):.4f}"
        )
        logger.info(
            f"(rank={rank}) Gradient sample before optimizer.step(): {flat_grad[:3]}"
        )

        optimizer.step()

        flat_grad = torch.cat([p.grad.view(-1) for p in model.parameters()]).cpu().numpy()
        logger.info(
            f"(rank={rank}) Gradient norm after optimizer.step(): {np.linalg.norm(flat_grad):.4f}"
        )
        logger.info(
            f"(rank={rank}) Gradient sample after optimizer.step(): {flat_grad[:3]}"
        )

        break

    cleanup()
Appending to opacus_sync_demo.py
In [13]:
%%writefile -a opacus_sync_demo.py

import torch.multiprocessing as mp
import torch

world_size = torch.cuda.device_count()

if __name__ == '__main__':
    mp.spawn(
        launch,
        args=(world_size,),
        nprocs=world_size,
        join=True
    )
Appending to opacus_sync_demo.py

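When we run the code, notice that gradients are not synchronized after loss.backward(), only after optimizer.step(). In this example the privacy parameters are set to effectively disable noise and clipping, so the synchronized gradient is indeed the average of the individual workers' gradients.

In [14]: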
!python -W ignore opacus_sync_demo.py
05/13/2022 11:15:22:INFO:(rank=1) Gradient norm before optimizer.step(): 0.9924
05/13/2022 11:15:22:INFO:(rank=1) Gradient sample before optimizer.step(): [-0.00525815 -0.01079952 -0.01051272]
05/13/2022 11:15:22:INFO:(rank=0) Gradient norm before optimizer.step(): 1.7812
05/13/2022 11:15:22:INFO:(rank=0) Gradient sample before optimizer.step(): [-0.0181896  -0.02559735 -0.02745825]
05/13/2022 11:15:22:INFO:(rank=0) Gradient norm after optimizer.step(): 1.2387
05/13/2022 11:15:22:INFO:(rank=1) Gradient norm after optimizer.step(): 1.2387
05/13/2022 11:15:22:INFO:(rank=0) Gradient sample after optimizer.step(): [-0.01172432 -0.01819846 -0.01898623]
05/13/2022 11:15:22:INFO:(rank=1) Gradient sample after optimizer.step(): [-0.01172432 -0.01819846 -0.01898623]
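
As a quick sanity check, the averaging claim can be verified on the logged values themselves. The numbers below are copied from the log output of this run; the tolerance is an assumption that allows for float32 rounding in the printed values.

```python
# First three gradient components copied from the log output of this run.
before_rank0 = [-0.0181896, -0.02559735, -0.02745825]
before_rank1 = [-0.00525815, -0.01079952, -0.01051272]
after_step = [-0.01172432, -0.01819846, -0.01898623]

# Average the two per-worker gradients and compare with the synced gradient.
averaged = [(a + b) / 2 for a, b in zip(before_rank0, before_rank1)]
max_abs_diff = max(abs(x - y) for x, y in zip(averaged, after_step))

print(averaged)
print(max_abs_diff)
```

The largest deviation stays below 1e-5. Note that the gradient norms do not average in the same way, since the norm of an average is not the average of the norms.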

Cleanup

In [15]:
%%bash
rm opacus_ddp_demo.py
rm opacus_distributed_data_loader_demo.py
rm opacus_sync_demo.py
Copyright © 2025 Meta Platforms, Inc.