Creating Workloads


  1. Internet Connection

  2. ADS cli is installed

  3. Install docker:

Write your training code:

While running distributed workload, the IP address of the scheduler is known only during the runtime. The IP address is exported as environment variable - SCHEDULER_IP in all the nodes when the Job Run is in IN_PROGRESS state. Create dask.distributed.Client object using environment variable to specify the IP address. Eg. -

client = Client(f"{os.environ['SCHEDULER_IP']}:{os.environ.get('SCHEDULER_PORT','8786')}")

see Writing Dask Code for more examples.

For this example, the code to run on the cluster will be:
from dask.distributed import Client
from sklearn.datasets import make_classification
from sklearn.svm import SVC
from sklearn.model_selection import GridSearchCV

import pandas as pd
import joblib
import os
import argparse

default_n_samples = int(os.getenv("DEFAULT_N_SAMPLES", "1000"))

parser = argparse.ArgumentParser()
parser.add_argument("--n_samples", default=default_n_samples, type=int, help="size of dataset")
parser.add_argument("--cv", default=3, type=int, help="number of cross validations")
args, unknownargs = parser.parse_known_args()

# Using environment variable to fetch the SCHEDULER_IP is important.
client = Client(f"{os.environ['SCHEDULER_IP']}:{os.environ.get('SCHEDULER_PORT','8786')}")

X, y = make_classification(n_samples=args.n_samples, random_state=42)

with joblib.parallel_backend("dask"):
        SVC(gamma="auto", random_state=0, probability=True),
            "C": [0.001, 0.01, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0],
            "kernel": ["rbf", "poly", "sigmoid"],
            "shrinking": [True, False],
    ).fit(X, y)

Initialize a distributed-training folder:

At this point you have created a training file (or files) - in the above example. Now running the command below

Note: This step requires an internet connection. The init command initializes your code directory with dask related artifacts to build

ads opctl distributed-training init --framework dask

Containerize your code and build container:

Before you can build the image, you must set the following environment variables:

Specify image name and tag

export IMAGE_NAME=<>
export TAG=latest

Build the container image.

ads opctl distributed-training build-image \
    -t $TAG \
    -reg $IMAGE_NAME \
    -df oci_dist_training_artifacts/dask/v1/Dockerfile

The code is assumed to be in the current working directory. To override the source code directory, use the -s flag and specify the code dir. This folder should be within the current working directory.

ads opctl distributed-training build-image \
    -t $TAG \
    -reg $IMAGE_NAME \
    -df oci_dist_training_artifacts/dask/v1/Dockerfile
    -s <code_dir>

If you are behind proxy, ads opctl will automatically use your proxy settings (defined via no_proxy, http_proxy and https_proxy).

Define your workload yaml:

The yaml file is a declarative way to express the workload. Refer YAML schema for more details.

kind: distributed
apiVersion: v1.0
    kind: infrastructure
    type: dataScienceJob
    apiVersion: v1.0
      projectId: oci.xxxx.<project_ocid>
      compartmentId: oci.xxxx.<compartment_ocid>
      displayName: my_distributed_training
      logGroupId: oci.xxxx.<log_group_ocid>
      subnetId: oci.xxxx.<subnet-ocid>
      shapeName: VM.Standard2.4
      blockStorageSize: 50
    kind: dask
    apiVersion: v1.0
      workDir: "oci://my-bucket@my-namespace/daskexample/001"
      name: GridSearch Dask
          replicas: 2
    kind: python
    apiVersion: v1.0
      entryPoint: ""
      kwargs: "--cv 5"
        - name: DEFAULT_N_SAMPLES
          value: 5000

Use ads opctl to create the cluster infrastructure and run the workload:

Do a dry run to inspect how the yaml translates to Job and Job Runs. This does not create actual Job or Job Run.

ads opctl run -f train.yaml --dry-run

This will give an option similar to this -

-----------------------------Entering dryrun mode----------------------------------
Creating Job with payload:
kind: job
    kind: infrastructure
      blockStorageSize: 50
      compartmentId: oci.xxxx.<compartment_ocid>
      displayName: GridSearch Dask
      jobInfrastructureType: ME_STANDALONE
      jobType: DEFAULT
      logGroupId: oci.xxxx.<log_group_ocid>
      logId: oci.xxxx.<log_ocid>
      projectId: oci.xxxx.<project_ocid>
      shapeName: VM.Standard2.4
      subnetId: oci.xxxx.<subnet-ocid>
    type: dataScienceJob
  name: GridSearch Dask
    kind: runtime
      entrypoint: null
      - name: OCI__WORK_DIR
        value: oci://my-bucket@my-namespace/daskexample/001
      - name: OCI__EPHEMERAL
        value: None
      - name: OCI__CLUSTER_TYPE
        value: DASK
      - name: OCI__WORKER_COUNT
        value: '2'
      - name: OCI__START_ARGS
        value: ''
      - name: OCI__ENTRY_SCRIPT
        value: --cv 5
      - name: DEFAULT_N_SAMPLES
        value: '5000'
    type: container

Creating Main Job with following details:
Name: main
Environment Variables:
Creating 2 worker jobs with following details:
Name: worker
Environment Variables:
-----------------------------Ending dryrun mode----------------------------------

Test Locally:

Before submitting the workload to jobs, you can run it locally to test your code, dependencies, configurations etc. With -b local flag, it uses a local backend. Further when you need to run this workload on OCI data science jobs, simply use -b job flag instead.

ads opctl run -f train.yaml -b local

If your code requires to use any oci services (like object bucket), you need to mount oci keys from your local host machine onto the container. This is already done for you assuming the typical location of oci keys ~/.oci. You can modify it though, in-case you have keys at a different location. You need to do this in the config.ini file.

oci_key_mnt = ~/.oci:/home/oci_dist_training/.oci

Note that the local backend requires the source code for your workload is available locally in the source folder specified in the config.ini file. If you specified Git repository or OCI object storage location as source code location in your workflow YAML, please make sure you have a local copy available for local testing.

Submit the workload:

ads opctl run -f train.yaml -b job

Note:: This will automatically push the docker image to the OCI container registry repo .

Once running, you will see on the terminal outputs similar to the below

jobId: oci.xxxx.<job_ocid>
  mainJobRunIdName: oci.xxxx.<job_run_ocid>
workDir: oci://my-bucket@my-namespace/cluster-testing/005
  - workerJobRunIdName_1: oci.xxxx.<job_run_ocid>
  - workerJobRunIdName_2: oci.xxxx.<job_run_ocid>
  - workerJobRunIdName_3: oci.xxxx.<job_run_ocid>

This information can be saved as YAML file and used as input to ads opctl distributed-training show-config -f <info.yaml>. You can use --job-info to save the job run info into YAML, for example:

ads opctl run -f train.yaml --job-info info.yaml

Monitoring the workload logs

To view the logs from a job run, you could run -

ads opctl watch oci.xxxx.<job_run_ocid>

You could stream the logs from any of the job run ocid using ads opctl watch command. You could run this command from multiple terminal to watch all of the job runs. Typically, watching mainJobRunId should yield most informative log.

To find the IP address of the scheduler dashboard, you could check the configuration file generated by the Main job by running -

ads opctl distributed-training show-config -f info.yaml

This will generate an output such as follows -

Main Info:
OCI__MAIN_IP: <ip address>
SCHEDULER_IP: <ip address>
tmpdir: oci://my-bucket@my-namesapce/daskcluster-testing/005/oci.xxxx.<job_ocid>

Dask dashboard is host at : http://{SCHEDULER_IP}:8787 If the IP address is reachable from your workstation network, you can access the dashboard directly from your workstation. The alternate approach is to use either a Bastion host on the same subnet as the Job Runs and create an ssh tunnel from your workstation.

For more information about the dashboard, checkout

Saving Artifacts to Object Storage Buckets

In case you want to save the artifacts generated by the training process (model checkpoints, TensorBoard logs, etc.) to an object bucket you can use the ‘sync’ feature. The environment variable OCI__SYNC_DIR exposes the directory location that will be automatically synchronized to the configured object storage bucket location. Use this directory in your training script to save the artifacts.

To configure the destination object storage bucket location, use the following settings in the workload yaml file(train.yaml).

  value: 1
  value: "<bucket_name>"
  value: "<bucket_prefix>"

Note: Change SYNC_ARTIFACTS to 0 to disable this feature. Use OCI__SYNC_DIR env variable in your code to save the artifacts. For Example :

with open(os.path.join(os.environ.get("OCI__SYNC_DIR"),"results.txt"), "w") as rf:
  rf.write(f"Best Params are: {grid.best_params_}, Score is {grid.best_score_}")

Terminating In-Progress Cluster

To terminate a running cluster, you could run -

ads opctl distributed-training cancel -f info.yaml