Creating PyTorch Distributed Workloads
Prerequisites

- Internet connection
- ADS CLI is installed
- Install docker: https://docs.docker.com/get-docker
Write your training code:

For this example, the training code was inspired by an example found here. Note that MASTER_ADDR, MASTER_PORT, WORLD_SIZE, RANK, and LOCAL_RANK are environment variables that will automatically be set.
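For reference, here is a minimal sketch (not part of the example script) of how the default env:// initialization method consumes these variables; the full script below relies on the same mechanism:

import os
import torch.distributed as dist

# MASTER_ADDR / MASTER_PORT tell every process where rank 0 is listening;
# WORLD_SIZE / RANK / LOCAL_RANK identify this process within the cluster.
print("master:", os.environ["MASTER_ADDR"], os.environ["MASTER_PORT"])
print("world size:", os.environ["WORLD_SIZE"],
      "rank:", os.environ["RANK"],
      "local rank:", os.environ["LOCAL_RANK"])

# With no explicit init_method, init_process_group() defaults to env:// and
# reads MASTER_ADDR, MASTER_PORT, WORLD_SIZE and RANK from the environment.
dist.init_process_group(backend="gloo")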
# Copyright (c) 2017 Facebook, Inc. All rights reserved.
# BSD 3-Clause License
#
# Script adapted from:
# https://github.com/Azure/azureml-examples/blob/32eeda9e9f394bd6c3b687b55e2740abc50b116c/sdk/python/jobs/single-step/pytorch/distributed-training/src/train.py
# ==============================================================================

import datetime
import torch
import torchvision
import torchvision.transforms as transforms
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import os, argparse


# define network architecture
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(3, 32, 3)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(32, 64, 3)
        self.conv3 = nn.Conv2d(64, 128, 3)
        self.fc1 = nn.Linear(128 * 6 * 6, 120)
        self.dropout = nn.Dropout(p=0.2)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = self.pool(F.relu(self.conv2(x)))
        x = self.pool(F.relu(self.conv3(x)))
        x = x.view(-1, 128 * 6 * 6)
        x = self.dropout(F.relu(self.fc1(x)))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x


# define functions
def train(train_loader, model, criterion, optimizer, epoch, device, print_freq, rank):
    running_loss = 0.0
    for i, data in enumerate(train_loader, 0):
        # get the inputs; data is a list of [inputs, labels]
        inputs, labels = data[0].to(device), data[1].to(device)

        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # print statistics
        running_loss += loss.item()
        if i % print_freq == 0:  # print every print_freq mini-batches
            print(
                "Rank %d: [%d, %5d] loss: %.3f"
                % (rank, epoch + 1, i + 1, running_loss / print_freq)
            )
            running_loss = 0.0


def evaluate(test_loader, model, device):
    classes = (
        "plane",
        "car",
        "bird",
        "cat",
        "deer",
        "dog",
        "frog",
        "horse",
        "ship",
        "truck",
    )

    model.eval()

    correct = 0
    total = 0
    class_correct = list(0.0 for i in range(10))
    class_total = list(0.0 for i in range(10))
    with torch.no_grad():
        for data in test_loader:
            images, labels = data[0].to(device), data[1].to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            c = (predicted == labels).squeeze()
            for i in range(10):
                label = labels[i]
                class_correct[label] += c[i].item()
                class_total[label] += 1

    # print total test set accuracy
    print(
        "Accuracy of the network on the 10000 test images: %d %%"
        % (100 * correct / total)
    )

    # print test accuracy for each of the classes
    for i in range(10):
        print(
            "Accuracy of %5s : %2d %%"
            % (classes[i], 100 * class_correct[i] / class_total[i])
        )


def main(args):
    # get PyTorch environment variables
    world_size = int(os.environ["WORLD_SIZE"])
    rank = int(os.environ["RANK"])
    local_rank = int(os.environ["LOCAL_RANK"])

    distributed = world_size > 1

    if torch.cuda.is_available():
        print("CUDA is available.")
    else:
        print("CUDA is not available.")

    # set device
    if distributed:
        if torch.cuda.is_available():
            device = torch.device("cuda", local_rank)
        else:
            device = torch.device("cpu")
    else:
        device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

    # initialize distributed process group using default env:// method
    if distributed:
        torch.distributed.init_process_group(
            backend=args.backend,
            timeout=datetime.timedelta(minutes=args.timeout)
        )

    # define train and test dataset DataLoaders
    transform = transforms.Compose(
        [transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]
    )

    train_set = torchvision.datasets.CIFAR10(
        root=args.data_dir, train=True, download=True, transform=transform
    )

    if distributed:
        train_sampler = torch.utils.data.distributed.DistributedSampler(train_set)
    else:
        train_sampler = None

    train_loader = torch.utils.data.DataLoader(
        train_set,
        batch_size=args.batch_size,
        shuffle=(train_sampler is None),
        num_workers=args.workers,
        sampler=train_sampler,
    )

    test_set = torchvision.datasets.CIFAR10(
        root=args.data_dir, train=False, download=True, transform=transform
    )
    test_loader = torch.utils.data.DataLoader(
        test_set, batch_size=args.batch_size, shuffle=False, num_workers=args.workers
    )

    model = Net().to(device)

    # wrap model with DDP
    if distributed:
        if torch.cuda.is_available():
            model = nn.parallel.DistributedDataParallel(
                model, device_ids=[local_rank], output_device=local_rank
            )
        else:
            model = nn.parallel.DistributedDataParallel(model)

    # define loss function and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(
        model.parameters(), lr=args.learning_rate, momentum=args.momentum
    )

    # train the model
    for epoch in range(args.epochs):
        print("Rank %d: Starting epoch %d" % (rank, epoch))
        if distributed:
            train_sampler.set_epoch(epoch)
        model.train()

        train(
            train_loader,
            model,
            criterion,
            optimizer,
            epoch,
            device,
            args.print_freq,
            rank,
        )

    print("Rank %d: Finished Training" % (rank))

    if not distributed or rank == 0:
        os.makedirs(args.output_dir, exist_ok=True)
        model_path = os.path.join(args.output_dir, "cifar_net.pt")
        torch.save(model.state_dict(), model_path)

        # evaluate on full test dataset
        evaluate(test_loader, model, device)


# run script
if __name__ == "__main__":
    # setup argparse
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--data-dir", type=str, help="directory containing CIFAR-10 dataset"
    )
    parser.add_argument("--epochs", default=10, type=int, help="number of epochs")
    parser.add_argument(
        "--batch-size",
        default=16,
        type=int,
        help="mini batch size for each gpu/process",
    )
    parser.add_argument(
        "--workers",
        default=2,
        type=int,
        help="number of data loading workers for each gpu/process",
    )
    parser.add_argument(
        "--learning-rate", default=0.001, type=float, help="learning rate"
    )
    parser.add_argument("--momentum", default=0.9, type=float, help="momentum")
    parser.add_argument(
        "--output-dir", default="outputs", type=str, help="directory to save model to"
    )
    parser.add_argument(
        "--print-freq",
        default=200,
        type=int,
        help="frequency of printing training statistics",
    )
    parser.add_argument(
        "--backend", default="gloo", type=str,
        help="distributed communication backend, should be gloo, nccl or mpi"
    )
    parser.add_argument(
        "--timeout", default=30, type=int,
        help="timeout in minutes for waiting for the initialization of distributed process group."
    )
    args = parser.parse_args()

    # call main function
    main(args)
Initialize a distributed-training folder:

At this point you have created a training file (or files) - train.py in the above example. Running the command below downloads the artifacts required for building the docker image. The artifacts are saved into the oci_dist_training_artifacts/pytorch/v1 directory under your current working directory.
ads opctl distributed-training init --framework pytorch --version v1
Containerize your code and build container:
Before you can build the image, you must set the following environment variables:
Specify image name and tag
export IMAGE_NAME=<region.ocir.io/my-tenancy/image-name>
export TAG=latest
Build the container image
ads opctl distributed-training build-image \
-t $TAG \
-reg $IMAGE_NAME \
-df oci_dist_training_artifacts/pytorch/v1/Dockerfile
The code is assumed to be in the current working directory. To override the source code directory, use the -s flag and specify the code directory. This folder should be within the current working directory.
ads opctl distributed-training build-image \
-t $TAG \
-reg $IMAGE_NAME \
-df oci_dist_training_artifacts/pytorch/v1/Dockerfile \
-s <code_dir>
If you are behind a proxy, ads opctl will automatically use your proxy settings (defined via no_proxy, http_proxy, and https_proxy).
Define your workload yaml:

The yaml file is a declarative way to express the workload. The following is the YAML for running the example code; you will need to replace the values in the spec sections with those for your project:

- infrastructure contains the spec for OCI Data Science Jobs. Here you need to specify a subnet that allows communication between nodes. The VM.GPU2.1 shape is used in this example.
- cluster contains the spec for the image you built and a working directory on OCI Object Storage, which is used by the job runs to share internal configurations. Environment variables specified in cluster.spec.config are available on all nodes. Here, NCCL_ASYNC_ERROR_HANDLING is set to enable the timeout for the NCCL backend; the job runs are terminated if the nodes fail to connect to each other within the number of minutes specified in your training code when calling init_process_group().
- runtime contains the spec for the name of your training script and the command line arguments for running it. Here the nccl backend is used for communication between GPUs. For CPU training, you can use the gloo backend. The timeout argument specifies the maximum number of minutes the nodes wait when calling init_process_group(). This is useful for preventing the job runs from waiting forever in case of a node failure.
kind: distributed
apiVersion: v1.0
spec:
  infrastructure:
    kind: infrastructure
    type: dataScienceJob
    apiVersion: v1.0
    spec:
      projectId: oci.xxxx.<project_ocid>
      compartmentId: oci.xxxx.<compartment_ocid>
      displayName: PyTorch-Distributed
      logGroupId: oci.xxxx.<log_group_ocid>
      logId: oci.xxx.<log_ocid>
      subnetId: oci.xxxx.<subnet-ocid>
      shapeName: VM.GPU2.1
      blockStorageSize: 50
  cluster:
    kind: pytorch
    apiVersion: v1.0
    spec:
      image: <region.ocir.io/my-tenancy/image-name>
      workDir: "oci://my-bucket@my-namespace/pytorch/distributed"
      config:
        env:
          - name: NCCL_ASYNC_ERROR_HANDLING
            value: '1'
      main:
        name: PyTorch-Distributed-main
        replicas: 1
      worker:
        name: PyTorch-Distributed-worker
        replicas: 3
  runtime:
    kind: python
    apiVersion: v1.0
    spec:
      entryPoint: "train.py"
      args:
        - --data-dir
        - /home/datascience/data
        - --output-dir
        - /home/datascience/outputs
        - --backend
        - nccl
        - --timeout
        - 5
Use ads opctl to create the cluster infrastructure and dry-run the workload:
ads opctl run -f train.yaml --dry-run
The output from the dry run shows all the actions and the infrastructure configuration.
Use ads opctl to create the cluster infrastructure and run the workload:
Test Locally:

Before submitting the workload to jobs, you can run it locally to test your code, dependencies, configurations, etc. With the -b local flag, it uses a local backend. When you need to run this workload on OCI Data Science Jobs, simply use the -b job flag instead.
ads opctl run -f train.yaml -b local
If your code uses any OCI services (such as Object Storage buckets), you need to mount your OCI keys from your local host machine onto the container. This is already done for you, assuming your OCI keys are in the typical location, ~/.oci. If your keys are at a different location, you can modify the mount in the config.ini file:

oci_key_mnt = ~/.oci:/home/oci_dist_training/.oci

Note that the local backend requires the source code for your workload to be available locally in the source folder specified in the config.ini file. If you specified a Git repository or an OCI Object Storage location as the source code location in your workload YAML, make sure you have a local copy available for local testing.
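If you want to verify that the mounted keys are picked up inside the container, the following is a minimal sketch using the OCI Python SDK (assuming the oci package is installed and the container user's home directory is the mount target above; this snippet is not part of the example script):

import oci

# from_file() reads ~/.oci/config by default, which is where the keys are
# mounted via the oci_key_mnt setting.
config = oci.config.from_file()

# A simple authenticated call: look up the Object Storage namespace.
object_storage = oci.object_storage.ObjectStorageClient(config)
print("Object Storage namespace:", object_storage.get_namespace().data)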
Submit the workload:
ads opctl run -f train.yaml -b job
Note: This will automatically push the docker image to the OCI container registry repo.

Once running, you will see output on the terminal similar to the following:
jobId: oci.xxxx.<job_ocid>
mainJobRunId:
  mainJobRunIdName: oci.xxxx.<job_run_ocid>
workDir: oci://my-bucket@my-namespace/cluster-testing/005
otherJobRunIds:
  - workerJobRunIdName_1: oci.xxxx.<job_run_ocid>
  - workerJobRunIdName_2: oci.xxxx.<job_run_ocid>
  - workerJobRunIdName_3: oci.xxxx.<job_run_ocid>
This information can be saved as a YAML file and used as input to ads opctl distributed-training show-config -f <info.yaml>. You can use the --job-info option to save the job run info into a YAML file, for example:
ads opctl run -f train.yaml --job-info info.yaml
Monitoring the workload logs

To view the logs from a job run, you can run:
ads opctl watch oci.xxxx.<job_run_ocid>
You can stream the logs from any of the job run OCIDs using the ads opctl watch command. You can run this command from multiple terminals to watch all of the job runs. Typically, watching the mainJobRunId yields the most informative log.
Saving Artifacts to Object Storage Buckets

If you want to save the artifacts generated by the training process (model checkpoints, TensorBoard logs, etc.) to an object storage bucket, you can use the 'sync' feature. The environment variable OCI__SYNC_DIR exposes the directory location that will be automatically synchronized to the configured object storage bucket location. Use this directory in your training script to save the artifacts.

To configure the destination object storage bucket location, use the following settings in the workload yaml file (train.yaml):
- name: SYNC_ARTIFACTS
  value: 1
- name: WORKSPACE
  value: "<bucket_name>"
- name: WORKSPACE_PREFIX
  value: "<bucket_prefix>"
Note: Change SYNC_ARTIFACTS to 0 to disable this feature.

Use the OCI__SYNC_DIR environment variable in your code to save the artifacts. For example:
model_path = os.path.join(os.environ.get("OCI__SYNC_DIR"), "model.pt")
torch.save(model, model_path)
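For instance, here is a sketch of saving a per-epoch checkpoint from rank 0 only, so that the workers do not overwrite each other (rank, epoch, model, and optimizer are assumed to come from the training loop in the example script; the file name pattern is illustrative):

import os
import torch

sync_dir = os.environ.get("OCI__SYNC_DIR")

# Only rank 0 writes checkpoints; the sync feature uploads them to the
# configured bucket location.
if rank == 0:
    checkpoint_path = os.path.join(sync_dir, "checkpoint_epoch_%d.pt" % epoch)
    torch.save(
        {
            "epoch": epoch,
            "model_state_dict": model.state_dict(),
            "optimizer_state_dict": optimizer.state_dict(),
        },
        checkpoint_path,
    )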
Profiling

You may want to profile your training setup for optimization and performance tuning. Profiling typically provides a detailed analysis of CPU utilization, GPU utilization, top CUDA kernels, top operators, etc. You can profile your training setup using the native PyTorch profiler or a third-party profiler such as Nvidia Nsight Systems.

Profiling using PyTorch Profiler

PyTorch Profiler is PyTorch's native offering for performance profiling. Profiling is invoked by instrumenting your code with the torch.profiler.profile API. Refer to this link for the changes you need to make in your training script for instrumentation. You should choose the OCI__SYNC_DIR directory to save the profiling logs. For example:
prof = torch.profiler.profile(
    activities=[
        torch.profiler.ProfilerActivity.CPU,
        torch.profiler.ProfilerActivity.CUDA,
    ],
    schedule=torch.profiler.schedule(wait=1, warmup=1, active=3, repeat=1),
    on_trace_ready=torch.profiler.tensorboard_trace_handler(
        os.environ.get("OCI__SYNC_DIR") + "/logs"
    ),
    with_stack=False,
)
prof.start()

# training code; call prof.step() after each iteration so that the
# wait/warmup/active schedule advances

prof.stop()
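One way to drive the schedule above from a training loop is the context-manager form, calling prof.step() after each batch so that the wait/warmup/active schedule advances. In this sketch, train_loader, model, criterion, optimizer, and device are assumed to come from the example script:

import os
import torch

with torch.profiler.profile(
    activities=[
        torch.profiler.ProfilerActivity.CPU,
        torch.profiler.ProfilerActivity.CUDA,
    ],
    schedule=torch.profiler.schedule(wait=1, warmup=1, active=3, repeat=1),
    on_trace_ready=torch.profiler.tensorboard_trace_handler(
        os.path.join(os.environ.get("OCI__SYNC_DIR"), "logs")
    ),
) as prof:
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        loss = criterion(model(inputs), labels)
        loss.backward()
        optimizer.step()
        # Advance the profiler schedule after every batch.
        prof.step()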
Also, the sync feature SYNC_ARTIFACTS should be enabled (set to '1') to sync the profiling logs to the configured object storage.

You will also need to install the PyTorch TensorBoard plugin:
pip install torch-tb-profiler
Thereafter, use TensorBoard to view the logs. Refer to the TensorBoard setup for setting it up on your computer.
Profiling using Nvidia Nsight Systems

Nvidia Nsight Systems is a system-wide profiling tool from Nvidia that can be used to profile deep learning workloads. Nsight Systems requires no changes to your training code; it works at the process level. You can enable this experimental feature in your training setup via the following configuration in the workload yaml file (see the PROFILE and PROFILE_CMD environment variables below):
spec:
  image: "@image"
  workDir: "oci://@/"
  name: "tf_multiworker"
  config:
    env:
      - name: WORKER_PORT
        value: 12345
      - name: SYNC_ARTIFACTS
        value: 1
      - name: WORKSPACE
        value: "<bucket_name>"
      - name: WORKSPACE_PREFIX
        value: "<bucket_prefix>"
      - name: PROFILE
        value: 1
      - name: PROFILE_CMD
        value: "nsys profile -w true -t cuda,nvtx,osrt,cudnn,cublas -s none -o /opt/ml/nsight_report -x true"
  main:
    name: "main"
    replicas: 1
  worker:
    name: "worker"
    replicas: 1
Refer to this link for nsys profile command options. You can modify the command within PROFILE_CMD, but keep in mind that this feature is experimental. The profiling reports are generated per node. You need to download the reports to your computer, either manually or via the oci CLI:
oci os object bulk-download \
-ns <namespace> \
-bn <bucket_name> \
--download-dir /path/on/your/computer \
--prefix path/on/bucket/<job_id>
Note: -bn corresponds to WORKSPACE and --prefix corresponds to WORKSPACE_PREFIX/<job_id>, as configured in the workload yaml file.
To view the reports, you need to install the Nsight Systems app from here. Then open the downloaded reports in the Nsight Systems app.