Creating Horovod Workloads

Prerequisites

  1. Internet Connection

  2. ADS cli is installed

  3. Install docker: https://docs.docker.com/get-docker

Write your training code:

Your model training script (TensorFlow or PyTorch) needs to be adapted to use (Elastic) Horovod APIs for distributed training. Refer Writing distributed code with horovod framework

Also see : Horovod Examples

For this example, the code to run was inspired from an example found here . There are minimal changes to this script to save the training artifacts and TensorBoard logs to a folder referenced by OCI__SYNC_DIR environment variable. OCI__SYNC_DIR is a pre-provisioned folder which can be synchronized with an object bucket during the training process.

train.py
# Script adapted from https://github.com/horovod/horovod/blob/master/examples/elastic/tensorflow2/tensorflow2_keras_mnist_elastic.py

# ==============================================================================


import argparse
import tensorflow as tf
import horovod.tensorflow.keras as hvd
from distutils.version import LooseVersion

import os

os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"

parser = argparse.ArgumentParser(description="Tensorflow 2.0 Keras MNIST Example")

parser.add_argument(
    "--use-mixed-precision",
    action="store_true",
    default=False,
    help="use mixed precision for training",
)

parser.add_argument(
    "--data-dir",
    help="location of the training dataset in the local filesystem (will be downloaded if needed)",
    default='/code/data/mnist.npz'
)

args = parser.parse_args()

if args.use_mixed_precision:
    print(f"using mixed precision {args.use_mixed_precision}")
    if LooseVersion(tf.__version__) >= LooseVersion("2.4.0"):
        from tensorflow.keras import mixed_precision

        mixed_precision.set_global_policy("mixed_float16")
    else:
        policy = tf.keras.mixed_precision.experimental.Policy("mixed_float16")
        tf.keras.mixed_precision.experimental.set_policy(policy)

# Horovod: initialize Horovod.
hvd.init()

# Horovod: pin GPU to be used to process local rank (one GPU per process)
gpus = tf.config.experimental.list_physical_devices("GPU")
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)
if gpus:
    tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], "GPU")

import numpy as np

minist_local = args.data_dir


def load_data():
    print("using pre-fetched dataset")
    with np.load(minist_local, allow_pickle=True) as f:
        x_train, y_train = f["x_train"], f["y_train"]
        x_test, y_test = f["x_test"], f["y_test"]
        return (x_train, y_train), (x_test, y_test)


(mnist_images, mnist_labels), _ = (
    load_data()
    if os.path.exists(minist_local)
    else tf.keras.datasets.mnist.load_data(path="mnist-%d.npz" % hvd.rank())
)


dataset = tf.data.Dataset.from_tensor_slices(
    (
        tf.cast(mnist_images[..., tf.newaxis] / 255.0, tf.float32),
        tf.cast(mnist_labels, tf.int64),
    )
)
dataset = dataset.repeat().shuffle(10000).batch(128)

model = tf.keras.Sequential(
    [
        tf.keras.layers.Conv2D(32, [3, 3], activation="relu"),
        tf.keras.layers.Conv2D(64, [3, 3], activation="relu"),
        tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
        tf.keras.layers.Dropout(0.25),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(128, activation="relu"),
        tf.keras.layers.Dropout(0.5),
        tf.keras.layers.Dense(10, activation="softmax"),
    ]
)

# Horovod: adjust learning rate based on number of GPUs.
scaled_lr = 0.001 * hvd.size()
opt = tf.optimizers.Adam(scaled_lr)

# Horovod: add Horovod DistributedOptimizer.
opt = hvd.DistributedOptimizer(
    opt, backward_passes_per_step=1, average_aggregated_gradients=True
)

# Horovod: Specify `experimental_run_tf_function=False` to ensure TensorFlow
# uses hvd.DistributedOptimizer() to compute gradients.
model.compile(
    loss=tf.losses.SparseCategoricalCrossentropy(),
    optimizer=opt,
    metrics=["accuracy"],
    experimental_run_tf_function=False,
)

# Horovod: initialize optimizer state so we can synchronize across workers
# Keras has empty optimizer variables() for TF2:
# https://sourcegraph.com/github.com/tensorflow/tensorflow@v2.4.1/-/blob/tensorflow/python/keras/optimizer_v2/optimizer_v2.py#L351:10
model.fit(dataset, steps_per_epoch=1, epochs=1, callbacks=None)

state = hvd.elastic.KerasState(model, batch=0, epoch=0)


def on_state_reset():
    tf.keras.backend.set_value(state.model.optimizer.lr, 0.001 * hvd.size())
    # Re-initialize, to join with possible new ranks
    state.model.fit(dataset, steps_per_epoch=1, epochs=1, callbacks=None)


state.register_reset_callbacks([on_state_reset])

callbacks = [
    hvd.callbacks.MetricAverageCallback(),
    hvd.elastic.UpdateEpochStateCallback(state),
    hvd.elastic.UpdateBatchStateCallback(state),
    hvd.elastic.CommitStateCallback(state),
]

# Horovod: save checkpoints only on worker 0 to prevent other workers from corrupting them.
# save the artifacts in the OCI__SYNC_DIR dir.
artifacts_dir = os.environ.get("OCI__SYNC_DIR") + "/artifacts"
tb_logs_path = os.path.join(artifacts_dir, "logs")
check_point_path = os.path.join(artifacts_dir, "ckpts", "checkpoint-{epoch}.h5")
if hvd.rank() == 0:
    callbacks.append(tf.keras.callbacks.ModelCheckpoint(check_point_path))
    callbacks.append(tf.keras.callbacks.TensorBoard(tb_logs_path))

# Train the model.
# Horovod: adjust number of steps based on number of GPUs.
@hvd.elastic.run
def train(state):
    state.model.fit(
        dataset,
        steps_per_epoch=500 // hvd.size(),
        epochs=2 - state.epoch,
        callbacks=callbacks,
        verbose=1,
    )


train(state)

Initialize a distributed-training folder:

At this point you have created a training file (or files) - train.py from the above example. Now, run the command below.

ads opctl distributed-training init --framework horovod-tensorflow --version v1

Note: If you choose to run a PyTorch example instead, use horovod-pytorch as the framework.

ads opctl distributed-training init --framework horovod-pytorch --version v1

This will download the horovod-tensorflow|horovod-pytorch framework and place it inside 'oci_dist_training_artifacts' folder.

Containerize your code and build container:

To build the image:

Horovod frameworks for TensorFlow and PyTorch contains two separate docker files, for cpu and gpu. Choose the docker file based on whether you are going to use cpu or gpu based shapes.

Before you can build the image, you must set the following environment variables:

Specify image name and tag

export IMAGE_NAME=<region.ocir.io/my-tenancy/image-name>
export TAG=latest

Build the container image.

ads opctl distributed-training build-image \
    -t $TAG \
    -reg $IMAGE_NAME \
    -df oci_dist_training_artifacts/horovod/v1/<pytorch|tensorflow>.<cpu|gpu>.Dockerfile

The code is assumed to be in the current working directory. To override the source code directory, use the -s flag and specify the code dir. This folder should be within the current working directory.

ads opctl distributed-training build-image \
    -t $TAG \
    -reg $IMAGE_NAME \
     -df oci_dist_training_artifacts/horovod/v1/<pytorch|tensorflow>.<cpu|gpu>.Dockerfile
    -s <code_dir>

If you are behind proxy, ads opctl will automatically use your proxy settings (defined via no_proxy, http_proxy and https_proxy).

SSH Setup:

In Horovod distributed training, communication between scheduler and worker(s) uses a secure connection. For this purpose, SSH keys need to be provisioned in the scheduler and worker nodes. This is already taken care in the docker images. When the docker image is built, SSH key pair is placed inside the image with required configuration changes (adding public key to authorized_keys file). This enables a secure connection between scheduler and the workers.

Define your workload yaml:

The yaml file is a declarative way to express the workload.

train.yaml
kind: distributed
apiVersion: v1.0
spec:
  infrastructure: # This section maps to Job definition. Does not include environment variables
    kind: infrastructure
    type: dataScienceJob
    apiVersion: v1.0
    spec:
      projectId: oci.xxxx.<project_ocid>
      compartmentId: oci.xxxx.<compartment_ocid>
      displayName: HVD-Distributed-TF
      logGroupId: oci.xxxx.<log_group_ocid>
      subnetId: oci.xxxx.<subnet-ocid>
      shapeName: VM.GPU2.1
      blockStorageSize: 50
  cluster:
    kind: HOROVOD
    apiVersion: v1.0
    spec:
      image: "<region>.ocir.io/<tenancy_id>/<repo_name>/<image_name>:<image_tag>"
      workDir:  "oci://<bucket_name>@<bucket_namespace>/<bucket_prefix>"
      name: "horovod_tf"
      config:
        env:
          # MIN_NP, MAX_NP and SLOTS are inferred from the shape. Modify only when needed.
          # - name: MIN_NP
          #   value: 2
          # - name: MAX_NP
          #   value: 4
          # - name: SLOTS
          #   value: 2
          - name: WORKER_PORT
            value: 12345
          - name: START_TIMEOUT #Optional: Defaults to 600.
            value: 600
          - name: ENABLE_TIMELINE # Optional: Disabled by Default.Significantly increases training duration if switched on (1).
            value: 0
          - name: SYNC_ARTIFACTS #Mandatory: Switched on by Default.
            value: 1
          - name: WORKSPACE #Mandatory if SYNC_ARTIFACTS==1: Destination object bucket to sync generated artifacts to.
            value: "<bucket_name>"
          - name: WORKSPACE_PREFIX #Mandatory if SYNC_ARTIFACTS==1: Destination object bucket folder to sync generated artifacts to.
            value: "<bucket_prefix>"
          - name: HOROVOD_ARGS # Parameters for cluster tuning.
            value: "--verbose"
      main:
        name: "scheduler"
        replicas: 1 #this will be always 1
      worker:
        name: "worker"
        replicas: 2 #number of workers
  runtime:
    kind: python
    apiVersion: v1.0
    spec:
      entryPoint: "/code/train.py" #location of user's training script in docker image.
      args:  #any arguments that the training script requires.
      env:

Use ads opctl to create the cluster infrastructure and run the workload:

Do a dry run to inspect how the yaml translates to Job and Job Runs

ads opctl run -f train.yaml --dry-run

This will give output similar to this.

  -----------------------------Entering dryrun mode----------------------------------
Creating Job with payload:
kind: job
spec:
  infrastructure:
    kind: infrastructure
    spec:
      projectId: oci.xxxx.<project_ocid>
      compartmentId: oci.xxxx.<compartment_ocid>
      displayName: HVD-Distributed-TF
      logGroupId: oci.xxxx.<log_group_ocid>
      logId: oci.xxx.<log_ocid>
      subnetId: oci.xxxx.<subnet-ocid>
      shapeName: VM.GPU2.1
      blockStorageSize: 50
    type: dataScienceJob
  name: horovod_tf
  runtime:
    kind: runtime
    spec:
      entrypoint: null
      env:
      - name: WORKER_PORT
        value: 12345
      - name: START_TIMEOUT
        value: 600
      - name: ENABLE_TIMELINE
        value: 0
      - name: SYNC_ARTIFACTS
        value: 1
      - name: WORKSPACE
        value: "<bucket_name>"
      - name: WORKSPACE_PREFIX
        value: "<bucket_prefix>"
      - name: HOROVOD_ARGS
        value: --verbose
      - name: OCI__WORK_DIR
        value: oci://<bucket_name>@<bucket_namespace>/<bucket_prefix>
      - name: OCI__EPHEMERAL
        value: None
      - name: OCI__CLUSTER_TYPE
        value: HOROVOD
      - name: OCI__WORKER_COUNT
        value: '2'
      - name: OCI__START_ARGS
        value: ''
      - name: OCI__ENTRY_SCRIPT
        value: /code/train.py
      image: "<region>.ocir.io/<tenancy_id>/<repo_name>/<image_name>:<image_tag>"
    type: container

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Creating Main Job with following details:
Name: scheduler
Environment Variables:
    OCI__MODE:MAIN
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Creating 2 worker jobs with following details:
Name: worker
Environment Variables:
    OCI__MODE:WORKER
-----------------------------Ending dryrun mode----------------------------------

Test Locally:

Before submitting the workload to jobs, you can run it locally to test your code, dependencies, configurations etc. With -b local flag, it uses a local backend. Further when you need to run this workload on OCI data science jobs, simply use -b job flag instead.

ads opctl run -f train.yaml -b local

If your code requires to use any oci services (like object bucket), you need to mount oci keys from your local host machine onto the container. This is already done for you assuming the typical location of oci keys ~/.oci. You can modify it though, in-case you have keys at a different location. You need to do this in the config.ini file.

oci_key_mnt = ~/.oci:/home/oci_dist_training/.oci

Note that the local backend requires the source code for your workload is available locally in the source folder specified in the config.ini file. If you specified Git repository or OCI object storage location as source code location in your workflow YAML, please make sure you have a local copy available for local testing.

Submit the workload:

ads opctl run -f train.yaml -b job

Note:: This will automatically push the docker image to the OCI container registry repo .

Once running, you will see on the terminal outputs similar to the below

info.yaml
jobId: oci.xxxx.<job_ocid>
mainJobRunId:
  mainJobRunIdName: oci.xxxx.<job_run_ocid>
workDir: oci://my-bucket@my-namespace/cluster-testing/005
otherJobRunIds:
  - workerJobRunIdName_1: oci.xxxx.<job_run_ocid>
  - workerJobRunIdName_2: oci.xxxx.<job_run_ocid>
  - workerJobRunIdName_3: oci.xxxx.<job_run_ocid>

This information can be saved as YAML file and used as input to ads opctl distributed-training show-config -f <info.yaml>. You can use --job-info to save the job run info into YAML, for example:

ads opctl run -f train.yaml --job-info info.yaml

Saving Artifacts to Object Storage Buckets

In case you want to save the artifacts generated by the training process (model checkpoints, TensorBoard logs, etc.) to an object bucket you can use the ‘sync’ feature. The environment variable OCI__SYNC_DIR exposes the directory location that will be automatically synchronized to the configured object storage bucket location. Use this directory in your training script to save the artifacts.

To configure the destination object storage bucket location, use the following settings in the workload yaml file(train.yaml).

- name: SYNC_ARTIFACTS
  value: 1
- name: WORKSPACE
  value: "<bucket_name>"
- name: WORKSPACE_PREFIX
  value: "<bucket_prefix>"

Note: Change SYNC_ARTIFACTS to 0 to disable this feature. Use OCI__SYNC_DIR env variable in your code to save the artifacts. For Example :

tf.keras.callbacks.ModelCheckpoint(os.path.join(os.environ.get("OCI__SYNC_DIR"),"ckpts",'checkpoint-{epoch}.h5'))

Monitoring the workload logs

To view the logs from a job run, you could run -

ads jobs watch oci.xxxx.<job_run_ocid>

For more monitoring options, please refer to Monitoring Horovod Training