在本教程中,我们将介绍在分布式模型训练流水线中开始使用 Opacus 所需了解的基础知识。随着最先进的模型和数据集变得越来越大,多 GPU 训练已成为常态,Opacus 提供了对分布式数据并行 (DDP) 的无缝开箱即用支持。
本教程需要具备 Opacus 和 DDP 的基础知识。如果您是第一次接触这些工具,建议先阅读以下教程:《使用差分隐私构建图像分类器》以及《分布式数据并行入门》。
在第 1 章中,我们将从一个最小可行示例开始,演示为了让 Opacus 在分布式设置中工作具体需要做什么。这应该足以让您应对大多数常见场景。
在第 2 章和第 3 章中,我们将深入了解其实现并讨论技术细节。我们将看到隐私 DDP 与常规 DDP 之间的区别,以及为什么我们需要引入这些区别。
在开始之前,有几件事我们需要提及。
首先,本教程编写环境为至少拥有 2 个 GPU 的单台 Linux 机器。对于 Windows 环境和/或多节点训练,基本原理保持不变,但您需要稍微修改 DDP 代码才能使其正常工作。
其次,众所周知 Jupyter notebooks 不支持 DDP 训练。在整个教程中,我们将使用 %%writefile 魔法命令将代码写入独立文件,稍后通过终端执行。这些文件将在本笔记本的最后一个单元格中被清理。
首先,让我们初始化分布式环境
%%writefile opacus_ddp_demo.py
import os
import torch.distributed as dist
import logging
# Install a root handler so the INFO messages logged below are actually
# emitted: without one, logging falls back to its last-resort handler, which
# only prints WARNING and above. The format matches the log lines shown in
# this tutorial's outputs (e.g. "05/13/2022 11:13:16:INFO:(rank 0) ...").
# Scripts that import this module inherit the same configuration.
logging.basicConfig(
    format="%(asctime)s:%(levelname)s:%(message)s",
    datefmt="%m/%d/%Y %H:%M:%S",
)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
def setup(rank, world_size):
    """Join the default process group for this worker.

    Every worker is pointed at localhost:12355 through the MASTER_ADDR /
    MASTER_PORT environment variables, then the "gloo" backend is initialized.

    Args:
        rank: index of this worker.
        world_size: total number of workers taking part.
    """
    os.environ.update(MASTER_ADDR="localhost", MASTER_PORT="12355")
    dist.init_process_group("gloo", rank=rank, world_size=world_size)
def cleanup():
    """Destroy the default process group that ``setup`` initialized."""
    return dist.destroy_process_group()
Overwriting opacus_ddp_demo.py
我们将使用 MNIST 作为示例,因此让我们也初始化一个简单的卷积网络并下载数据集
%%writefile -a opacus_ddp_demo.py
import torch.nn as nn
import torch.nn.functional as F
class SampleConvNet(nn.Module):
    """Compact CNN that maps 1x28x28 images (MNIST) to 10 class logits.

    Two conv -> ReLU -> max-pool stages feed a two-layer fully connected head.
    """

    def __init__(self):
        super().__init__()
        # Convolutional stages: (in_channels, out_channels, kernel, stride).
        self.conv1 = nn.Conv2d(1, 16, 8, 2, padding=3)
        self.conv2 = nn.Conv2d(16, 32, 4, 2)
        # Fully connected head; 32 channels * 4 * 4 spatial = 512 features.
        self.fc1 = nn.Linear(32 * 4 * 4, 32)
        self.fc2 = nn.Linear(32, 10)

    def forward(self, x):
        """Map a ``[B, 1, 28, 28]`` batch to ``[B, 10]`` unnormalized scores."""
        # conv1 -> [B, 16, 14, 14], pool -> [B, 16, 13, 13]
        # conv2 -> [B, 32, 5, 5],   pool -> [B, 32, 4, 4]
        for conv in (self.conv1, self.conv2):
            x = F.max_pool2d(F.relu(conv(x)), 2, 1)
        features = x.view(-1, 32 * 4 * 4)  # -> [B, 512]
        hidden = F.relu(self.fc1(features))  # -> [B, 32]
        return self.fc2(hidden)  # -> [B, 10]
Appending to opacus_ddp_demo.py
%%writefile -a opacus_ddp_demo.py
from torchvision import datasets, transforms
# Precomputed characteristics of the MNIST dataset
MNIST_MEAN = 0.1307
MNIST_STD = 0.3081
DATA_ROOT = "./mnist"


def _mnist_split(train):
    """Return the MNIST training (``train=True``) or test split.

    Images are converted to tensors and normalized with the dataset-wide
    mean/std above. ``download=True`` lets torchvision fetch the files into
    DATA_ROOT if they are not already there.
    """
    preprocess = transforms.Compose(
        [
            transforms.ToTensor(),
            transforms.Normalize((MNIST_MEAN,), (MNIST_STD,)),
        ]
    )
    return datasets.MNIST(DATA_ROOT, train=train, download=True, transform=preprocess)


mnist_train_ds = _mnist_split(train=True)
mnist_test_ds = _mnist_split(train=False)
Appending to opacus_ddp_demo.py
接下来是关键部分——也是与非隐私 DDP 唯一的不同之处。
首先,我们不使用 DistributedDataParallel 包装模型,而是使用 opacus.distributed 包中的 DifferentiallyPrivateDistributedDataParallel 进行包装。就这么简单。
第二个区别出现在初始化 DataLoader 时。通常在分布式训练中,您会专门为分布式设置初始化数据加载器。这会影响两个参数:
- batch_size:设为每个 GPU 的本地批次大小 local_batch_size,使全局批次大小为 local_batch_size * num_gpus;
- sampler:设为 sampler=DistributedSampler(dataset),以便将训练数据集分发到各个 GPU 上。
使用 Opacus,您不需要做这两件事。make_private 方法期望用户提供的 DataLoader 是非分布式的,就像您在单个 GPU 上训练时初始化的那样。
下面的代码通过注释掉需要替换或删除的代码行,重点展示了您需要对普通 DDP 训练流水线进行的更改。
%%writefile -a opacus_ddp_demo.py
import torch
import torch.optim as optim
from torch.utils.data import DataLoader, DistributedSampler
from opacus.distributed import DifferentiallyPrivateDistributedDataParallel as DPDDP
from torch.nn.parallel import DistributedDataParallel as DDP
from opacus import PrivacyEngine
# Hyper-parameters: SGD step size, and the logical batch size summed over
# all GPUs (make_private gives each worker an expected share of it).
LR, BATCH_SIZE = 0.1, 200
N_GPUS = torch.cuda.device_count()
def _describe_components(model, optimizer, data_loader):
    """Format the class names (and loader length) of the training pieces for logging."""
    return (
        f"model ({type(model).__name__}), "
        f"optimizer ({type(optimizer).__name__}), "
        f"data loader ({type(data_loader).__name__}, len={len(data_loader)})"
    )


def init_training(rank):
    """Build the model, optimizer and data loader, then privatize them.

    Compared with non-private DDP, only two things change: the model is
    wrapped in DPDDP instead of DDP, and the DataLoader is left
    non-distributed (no DistributedSampler, full BATCH_SIZE), because
    ``make_private`` distributes it.

    Args:
        rank: index of this worker; only rank 0 logs the component summaries.

    Returns:
        ``(model, optimizer, data_loader, privacy_engine)``: the three objects
        returned by ``PrivacyEngine.make_private`` plus the engine itself.
    """
    # Non-private DDP would use: model = DDP(model)
    model = DPDDP(SampleConvNet())
    optimizer = optim.SGD(model.parameters(), lr=LR, momentum=0)

    # Non-private DDP would use:
    #   batch_size=BATCH_SIZE // N_GPUS,
    #   sampler=DistributedSampler(mnist_train_ds)
    data_loader = DataLoader(mnist_train_ds, batch_size=BATCH_SIZE)

    is_main = rank == 0
    if is_main:
        logger.info(
            f"(rank {rank}) Initialized {_describe_components(model, optimizer, data_loader)}"
        )

    privacy_engine = PrivacyEngine()
    # PrivacyEngine looks at the model's class and enables
    # distributed processing if it's wrapped with DPDDP
    model, optimizer, data_loader = privacy_engine.make_private(
        module=model,
        optimizer=optimizer,
        data_loader=data_loader,
        noise_multiplier=1.0,
        max_grad_norm=1.0,
    )

    if is_main:
        logger.info(
            f"(rank {rank}) After privatization: {_describe_components(model, optimizer, data_loader)}"
        )
    # Logged by every rank, not just rank 0.
    logger.info(f"(rank {rank}) Average batch size per GPU: {int(optimizer.expected_batch_size)}")
    return model, optimizer, data_loader, privacy_engine
Appending to opacus_ddp_demo.py
现在我们只需要定义训练循环并启动它。
%%writefile -a opacus_ddp_demo.py
import numpy as np
def test(model, device):
    """Return the model's top-1 accuracy on the MNIST test split.

    The model is evaluated in eval mode without gradient tracking. Afterwards
    its previous train/eval mode is restored, even if evaluation raises.

    Args:
        model: the (possibly wrapped) network to evaluate.
        device: device every test batch is moved to before the forward pass.

    Returns:
        Fraction of test images whose argmax prediction equals the label.
    """
    test_loader = DataLoader(mnist_test_ds, batch_size=BATCH_SIZE)
    was_training = model.training
    model.eval()
    correct = 0
    try:
        with torch.no_grad():
            for data, target in test_loader:
                data, target = data.to(device), target.to(device)
                pred = model(data).argmax(dim=1, keepdim=True)
                correct += pred.eq(target.view_as(pred)).sum().item()
    finally:
        # Put the model back in whatever mode the caller had it in, rather
        # than unconditionally switching it to training mode.
        model.train(was_training)
    return correct / len(mnist_test_ds)
def launch(rank, world_size, epochs):
    """Train the private model on one worker process for ``epochs`` epochs.

    Started once per GPU by ``mp.spawn``. ``rank`` also serves as the CUDA
    device index for the model and every batch. After each epoch, rank 0
    prints training loss/accuracy, test accuracy and epsilon (at delta=1e-5).
    The process group is always torn down, even if training fails.

    Args:
        rank: index of this worker.
        world_size: total number of workers.
        epochs: number of passes over the training data loader.
    """
    setup(rank, world_size)
    try:
        criterion = nn.CrossEntropyLoss()
        model, optimizer, data_loader, privacy_engine = init_training(rank)
        model.to(rank)
        model.train()
        for e in range(epochs):
            losses = []
            correct = 0
            total = 0
            for data, target in data_loader:
                data, target = data.to(rank), target.to(rank)
                optimizer.zero_grad()
                output = model(data)
                pred = output.argmax(dim=1, keepdim=True)
                correct += pred.eq(target.view_as(pred)).sum().item()
                total += len(data)
                loss = criterion(output, target)
                loss.backward()
                # Clipping, noise and cross-worker gradient sync happen here.
                optimizer.step()
                losses.append(loss.item())
            test_accuracy = test(model, rank)
            train_accuracy = correct / total
            epsilon = privacy_engine.get_epsilon(delta=1e-5)
            if rank == 0:
                print(
                    f"Epoch: {e} \t"
                    f"Train Loss: {np.mean(losses):.4f} | "
                    f"Train Accuracy: {train_accuracy:.2f} | "
                    f"Test Accuracy: {test_accuracy:.2f} |"
                    f"(ε = {epsilon:.2f})"
                )
    finally:
        # Release the process group even if training raises, so a failed run
        # does not leave the distributed backend initialized.
        cleanup()
Appending to opacus_ddp_demo.py
%%writefile -a opacus_ddp_demo.py
import torch.multiprocessing as mp
EPOCHS = 10
# One worker process per visible CUDA device.
world_size = torch.cuda.device_count()

if __name__ == '__main__':
    # With zero workers mp.spawn would start no processes and the script
    # would exit without training; fail with an explicit message instead.
    if world_size < 1:
        raise RuntimeError(
            "This demo needs at least one CUDA device; none were found."
        )
    mp.spawn(
        launch,
        args=(world_size, EPOCHS),
        nprocs=world_size,
        join=True,
    )
Appending to opacus_ddp_demo.py
最后,运行脚本。请注意,我们初始化的 DataLoader 的 batch_size=200,这相当于完整数据集(60000 张图像)上的 300 个批次。
在每个 worker 上将其传递给 make_private 后,每个 worker 得到的数据加载器的平均(期望)批次大小为 100(在泊松采样下,每个批次的实际大小是随机的),但每个数据加载器仍然会遍历 300 个批次。
!python -W ignore opacus_ddp_demo.py
05/13/2022 11:13:16:INFO:(rank 0) Initialized model (DifferentiallyPrivateDistributedDataParallel), optimizer (SGD), data loader (DataLoader, len=300) 05/13/2022 11:13:16:INFO:(rank 1) Average batch size per GPU: 100 05/13/2022 11:13:16:INFO:(rank 0) After privatization: model (GradSampleModule), optimizer (DistributedDPOptimizer), data loader (DPDataLoader, len=300) 05/13/2022 11:13:16:INFO:(rank 0) Average batch size per GPU: 100 Epoch: 0 Train Loss: 1.5412 | Train Accuracy: 0.57 | Test Accuracy: 0.73 |(ε = 0.87) Epoch: 1 Train Loss: 0.6717 | Train Accuracy: 0.79 | Test Accuracy: 0.83 |(ε = 0.91) Epoch: 2 Train Loss: 0.5659 | Train Accuracy: 0.85 | Test Accuracy: 0.86 |(ε = 0.96) Epoch: 3 Train Loss: 0.5347 | Train Accuracy: 0.87 | Test Accuracy: 0.88 |(ε = 1.00) Epoch: 4 Train Loss: 0.5178 | Train Accuracy: 0.88 | Test Accuracy: 0.90 |(ε = 1.03) Epoch: 5 Train Loss: 0.4750 | Train Accuracy: 0.90 | Test Accuracy: 0.91 |(ε = 1.07) Epoch: 6 Train Loss: 0.4502 | Train Accuracy: 0.90 | Test Accuracy: 0.91 |(ε = 1.11) Epoch: 7 Train Loss: 0.4358 | Train Accuracy: 0.91 | Test Accuracy: 0.92 |(ε = 1.14) Epoch: 8 Train Loss: 0.4186 | Train Accuracy: 0.92 | Test Accuracy: 0.92 |(ε = 1.18) Epoch: 9 Train Loss: 0.4129 | Train Accuracy: 0.92 | Test Accuracy: 0.93 |(ε = 1.21)
注意:以下两章将讨论 Opacus 的高级用法及其实现细节。我们强烈建议在继续之前阅读 Opacus 高级功能 教程。
现在让我们查看 make_private 方法内部,看看它为了实现 DDP 处理做了什么。我们将从对 DataLoader 的修改开始。
提醒一下,DPDataLoader 与常规 DataLoader 仅在一个方面不同——它使用均匀有放回随机采样器(又称“泊松采样”)进行数据采样。这意味着我们不再使用固定的批次大小,而是使用采样率:即每个样本被包含在下一个批次中的概率。
现在让我们初始化常规数据加载器,然后将其转换为 DPDataLoader。这正是我们在 make_private() 方法中所做的。
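下面我们将初始化三个数据加载器:
- 非分布式、非隐私的普通 DataLoader;
- 分布式、非隐私的 DataLoader(使用 DistributedSampler,并将 batch_size 设为 BATCH_SIZE // world_size);
- 分布式、隐私的 DPDataLoader(通过 DPDataLoader.from_data_loader 由第一个加载器转换而来)。
这三个加载器的初始化都使得逻辑批次大小为 64。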
请注意,仅当 poisson_sampling 设置为 True(默认值)时,make_private 方法才会使用 DPDataLoader。如果在使用 DPDDP 时将 poisson_sampling 设置为 False,那么我们需要像非隐私情况一样,显式地向 DataLoader 提供 DistributedSampler,并将 batch_size 设为 BATCH_SIZE // world_size。
%%writefile opacus_distributed_data_loader_demo.py
from opacus_ddp_demo import setup, cleanup, mnist_train_ds
import logging
from torch.utils.data import DataLoader, DistributedSampler
from opacus.data_loader import DPDataLoader
logger = logging.getLogger(__name__)
logger.setLevel("INFO")

# Logical (global) batch size that all three demo data loaders are built around.
BATCH_SIZE = 64
def init_data(rank, world_size):
    """Build three MNIST loaders on one worker and log how they compare.

    The loaders are:
      * a plain, non-distributed loader with the full BATCH_SIZE;
      * a distributed, non-private loader (DistributedSampler, batch size
        BATCH_SIZE // world_size);
      * a distributed, private DPDataLoader derived from the first one.
    Only rank 0 logs. The process group created by ``setup`` is always
    destroyed before returning.

    Args:
        rank: index of this worker.
        world_size: total number of workers.
    """
    setup(rank, world_size)
    try:
        non_distributed_dl = DataLoader(
            mnist_train_ds,
            batch_size=BATCH_SIZE
        )
        distributed_non_private_dl = DataLoader(
            mnist_train_ds,
            batch_size=BATCH_SIZE // world_size,
            sampler=DistributedSampler(mnist_train_ds),
        )
        private_dl = DPDataLoader.from_data_loader(non_distributed_dl, distributed=True)
        if rank == 0:
            logger.info(
                f"(rank {rank}) Non-distributed non-private data loader. "
                f"#batches: {len(non_distributed_dl)}, "
                f"#data points: {len(non_distributed_dl.sampler)}, "
                f"batch_size: {non_distributed_dl.batch_size}"
            )
            logger.info(
                f"(rank {rank}) Distributed, non-private data loader. "
                f"#batches: {len(distributed_non_private_dl)}, "
                f"#data points: {len(distributed_non_private_dl.sampler)}, "
                f"batch_size: {distributed_non_private_dl.batch_size}"
            )
            logger.info(
                f"(rank {rank}) Distributed, private data loader. "
                f"#batches: {len(private_dl)}, "
                f"#data points: {private_dl.batch_sampler.num_samples}, "
                f"sample_rate: {private_dl.sample_rate:4f}, "
                f"avg batch_size (=sample_rate*num_data_points): {int(private_dl.sample_rate*private_dl.batch_sampler.num_samples)}"
            )
    finally:
        # Release the process group created by setup(), on every exit path.
        cleanup()
Writing opacus_distributed_data_loader_demo.py
%%writefile -a opacus_distributed_data_loader_demo.py
import torch
import torch.multiprocessing as mp
# One worker process per visible CUDA device.
world_size = torch.cuda.device_count()

if __name__ == '__main__':
    # With zero workers mp.spawn would start no processes and the script
    # would exit without logging anything; fail with an explicit message.
    if world_size < 1:
        raise RuntimeError(
            "This demo needs at least one CUDA device; none were found."
        )
    mp.spawn(
        init_data,
        args=(world_size,),
        nprocs=world_size,
        join=True,
    )
Appending to opacus_distributed_data_loader_demo.py
让我们看看运行它时会发生什么——以及 from_data_loader 工厂方法究竟做了什么。
请注意,我们的隐私 DataLoader 是使用非分布式、非隐私的数据加载器初始化的。所有基本参数(每个 GPU 的批次大小和每个 GPU 的样本数量)都与分布式、非隐私数据加载器一致。对隐私加载器而言,批次大小是平均值 sample_rate * num_data_points ≈ 31.98,日志中因向下取整显示为 31。
!python -W ignore opacus_distributed_data_loader_demo.py
05/13/2022 11:14:53:INFO:(rank 0) Non-distributed non-private data loader. #batches: 938, #data points: 60000, batch_size: 64 05/13/2022 11:14:53:INFO:(rank 0) Distributed, non-private data loader. #batches: 938, #data points: 30000, batch_size: 32 05/13/2022 11:14:53:INFO:(rank 0) Distributed, private data loader. #batches: 938, #data points: 30000, sample_rate: 0.001066, avg batch_size (=sample_rate*num_data_points): 31
DDP 和 DPDDP 之间的一个显著区别在于它们如何处理同步。
通常在分布式数据并行中,前向和反向传播是同步点,DDP 包装器会确保在反向传播期间一旦每层梯度可用,就会在所有 worker 之间同步梯度。
然而,Opacus 需要一个更晚的同步点。在我们可以使用梯度之前,我们需要对其进行裁剪并添加噪声。这是在优化器中完成的,这使得同步点从反向传播移动到了优化步骤。此外,为了简化计算,我们仅在 rank=0 的 worker 上添加噪声,并使用根据所有 worker 组合批次校准的噪声尺度。
%%writefile opacus_sync_demo.py
import sys
# Opacus is imported from the installed package; no developer-specific
# sys.path entry is needed.
from opacus_ddp_demo import setup, cleanup, mnist_train_ds, SampleConvNet
import logging
from torch.utils.data import DataLoader
import torch.optim as optim
from opacus.data_loader import DPDataLoader
from opacus import GradSampleModule
from opacus.distributed import DifferentiallyPrivateDistributedDataParallel as DPDDP
from opacus.optimizers import DistributedDPOptimizer
from torch.nn.parallel import DistributedDataParallel as DDP
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# Logical (global) batch size; each worker's optimizer expects a
# 1/world_size share of it (see init_training).
BATCH_SIZE = 64
# SGD step size, matching the main demo. This script stops after a single
# step and only inspects gradients, so the value does not change its output.
LR = 0.1
def init_training(rank, world_size):
    """Assemble private DDP components by hand, without PrivacyEngine.

    The network is wrapped as ``DPDDP(GradSampleModule(...))``. The optimizer
    is a DistributedDPOptimizer with noise_multiplier=0 and max_grad_norm=100,
    which effectively disables noise and clipping so synchronized gradients
    can be compared with per-worker ones.

    Args:
        rank: worker index (not used here).
        world_size: number of workers; sets the per-worker expected batch size.

    Returns:
        ``(model, optimizer, data_loader)``.
    """
    net = SampleConvNet()
    # Plain optimizer over the bare network's parameters, created before wrapping.
    base_optimizer = optim.SGD(net.parameters(), lr=LR, momentum=0)
    model = DPDDP(GradSampleModule(net))
    optimizer = DistributedDPOptimizer(
        optimizer=base_optimizer,
        noise_multiplier=0.0,
        max_grad_norm=100.0,
        expected_batch_size=BATCH_SIZE // world_size,
    )
    plain_loader = DataLoader(mnist_train_ds, batch_size=BATCH_SIZE)
    data_loader = DPDataLoader.from_data_loader(
        data_loader=plain_loader,
        distributed=True,
    )
    return model, optimizer, data_loader
Writing opacus_sync_demo.py
现在我们已经初始化了 DifferentiallyPrivateDistributedDataParallel 模型和 DistributedDPOptimizer,让我们看看它们是如何协同工作的。
DifferentiallyPrivateDistributedDataParallel 是一个空操作:我们仅在初始化时执行模型同步,而在前向和反向传播中不执行任何操作。
另一方面,DistributedDPOptimizer 承担了所有繁重的工作:
- 在各个 worker 上分别裁剪逐样本梯度;
- 在 rank=0 的 worker 上添加噪声;
- 在 step() 过程中、应用梯度之前调用 torch.distributed.all_reduce 同步梯度。
%%writefile -a opacus_sync_demo.py
import torch.nn as nn
import numpy as np
def launch(rank, world_size):
    """Run one private step on this worker and log gradients around ``step()``.

    Before ``optimizer.step()``, the logged gradient is this worker's own: the
    sum of its per-sample gradients divided by the expected batch size. After
    it, the logged ``p.grad`` has been synchronized across workers by the
    distributed optimizer. The process group is always torn down.

    Args:
        rank: index of this worker (also used as its CUDA device index).
        world_size: total number of workers.
    """
    setup(rank, world_size)
    try:
        criterion = nn.CrossEntropyLoss()
        model, optimizer, data_loader = init_training(rank, world_size)
        model.to(rank)
        model.train()
        for data, target in data_loader:
            # The DP data loader already yields tensors, so they go straight
            # to the device (re-wrapping with torch.tensor() only made a copy).
            data, target = data.to(rank), target.to(rank)
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            # Local (unsynchronized) gradient of this worker.
            flat_grad = torch.cat(
                [p.grad_sample.sum(dim=0).view(-1) for p in model.parameters()]
            ).cpu().numpy() / optimizer.expected_batch_size
            logger.info(
                f"(rank={rank}) Gradient norm before optimizer.step(): {np.linalg.norm(flat_grad):.4f}"
            )
            logger.info(
                f"(rank={rank}) Gradient sample before optimizer.step(): {flat_grad[:3]}"
            )
            optimizer.step()
            # Gradient after the optimizer's cross-worker synchronization.
            flat_grad = torch.cat([p.grad.view(-1) for p in model.parameters()]).cpu().numpy()
            logger.info(
                f"(rank={rank}) Gradient norm after optimizer.step(): {np.linalg.norm(flat_grad):.4f}"
            )
            logger.info(
                f"(rank={rank}) Gradient sample after optimizer.step(): {flat_grad[:3]}"
            )
            # A single batch is enough to show where synchronization happens.
            break
    finally:
        # Release the process group even if the step raises.
        cleanup()
Appending to opacus_sync_demo.py
%%writefile -a opacus_sync_demo.py
import torch.multiprocessing as mp
import torch
# One worker process per visible CUDA device.
world_size = torch.cuda.device_count()

if __name__ == '__main__':
    # With zero workers mp.spawn would start no processes and the script
    # would exit without logging anything; fail with an explicit message.
    if world_size < 1:
        raise RuntimeError(
            "This demo needs at least one CUDA device; none were found."
        )
    mp.spawn(
        launch,
        args=(world_size,),
        nprocs=world_size,
        join=True,
    )
Appending to opacus_sync_demo.py
当我们运行代码时,请注意梯度在 loss.backward() 之后并未同步,而仅在 optimizer.step() 之后同步。在这个例子中,我们设置了隐私参数以有效地禁用噪声和裁剪,因此同步后的梯度确实是各个 worker 梯度的平均值。
!python -W ignore opacus_sync_demo.py
05/13/2022 11:15:22:INFO:(rank=1) Gradient norm before optimizer.step(): 0.9924 05/13/2022 11:15:22:INFO:(rank=1) Gradient sample before optimizer.step(): [-0.00525815 -0.01079952 -0.01051272] 05/13/2022 11:15:22:INFO:(rank=0) Gradient norm before optimizer.step(): 1.7812 05/13/2022 11:15:22:INFO:(rank=0) Gradient sample before optimizer.step(): [-0.0181896 -0.02559735 -0.02745825] 05/13/2022 11:15:22:INFO:(rank=0) Gradient norm after optimizer.step(): 1.2387 05/13/2022 11:15:22:INFO:(rank=1) Gradient norm after optimizer.step(): 1.2387 05/13/2022 11:15:22:INFO:(rank=0) Gradient sample after optimizer.step(): [-0.01172432 -0.01819846 -0.01898623] 05/13/2022 11:15:22:INFO:(rank=1) Gradient sample after optimizer.step(): [-0.01172432 -0.01819846 -0.01898623]
%%bash
rm opacus_ddp_demo.py
rm opacus_distributed_data_loader_demo.py
rm opacus_sync_demo.py