Creating Workloads
Prerequisites

Internet connection
ADS CLI is installed
Docker is installed: https://docs.docker.com/get-docker
Write your training code:
While running a distributed workload, the IP address of the scheduler is known only at runtime. The IP address is exported as the environment variable SCHEDULER_IP in all the nodes when the Job Run is in the IN_PROGRESS state. Create the dask.distributed.Client object using this environment variable to specify the IP address. For example:

client = Client(f"{os.environ['SCHEDULER_IP']}:{os.environ.get('SCHEDULER_PORT','8786')}")

See Writing Dask Code for more examples.
For this example, the code to run on the cluster will be:
from dask.distributed import Client
from sklearn.datasets import make_classification
from sklearn.svm import SVC
from sklearn.model_selection import GridSearchCV

import pandas as pd
import joblib
import os
import argparse

default_n_samples = int(os.getenv("DEFAULT_N_SAMPLES", "1000"))

parser = argparse.ArgumentParser()
parser.add_argument("--n_samples", default=default_n_samples, type=int, help="size of dataset")
parser.add_argument("--cv", default=3, type=int, help="number of cross validations")
args, unknownargs = parser.parse_known_args()

# Using the environment variable to fetch the SCHEDULER_IP is important.
client = Client(f"{os.environ['SCHEDULER_IP']}:{os.environ.get('SCHEDULER_PORT','8786')}")

X, y = make_classification(n_samples=args.n_samples, random_state=42)

with joblib.parallel_backend("dask"):
    # Keep a reference to the fitted search so results such as grid.best_params_
    # can be reused later (for example, when saving artifacts to OCI__SYNC_DIR).
    grid = GridSearchCV(
        SVC(gamma="auto", random_state=0, probability=True),
        param_grid={
            "C": [0.001, 0.01, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0],
            "kernel": ["rbf", "poly", "sigmoid"],
            "shrinking": [True, False],
        },
        return_train_score=False,
        cv=args.cv,
        n_jobs=-1,
    )
    grid.fit(X, y)
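Optionally, you can block until all workers have joined the cluster before starting the grid search. Below is a minimal sketch, reusing client and os from the example above and assuming the OCI__WORKER_COUNT environment variable (shown later in the dry-run output) is visible to the entry script:

# Optional: wait for the expected number of Dask workers to register with the
# scheduler before fitting. OCI__WORKER_COUNT is assumed to match the worker
# replica count in the cluster spec; fall back to 2 as in this example.
expected_workers = int(os.environ.get("OCI__WORKER_COUNT", "2"))
client.wait_for_workers(n_workers=expected_workers)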
Initialize a distributed-training folder:

At this point you have created a training file (or files), gridsearch.py in the above example. Now run the command below to initialize your code directory.

Note: This step requires an internet connection. The init command initializes your code directory with the Dask-related artifacts needed to build the container image.

ads opctl distributed-training init --framework dask
Containerize your code and build the container:

Before you can build the image, you must set the following environment variables.

Specify the image name and tag:

export IMAGE_NAME=<region.ocir.io/my-tenancy/image-name>
export TAG=latest

Build the container image:

ads opctl distributed-training build-image \
  -t $TAG \
  -reg $IMAGE_NAME \
  -df oci_dist_training_artifacts/dask/v1/Dockerfile
The code is assumed to be in the current working directory. To override the source code directory, use the -s flag and specify the code directory. This folder should be within the current working directory.

ads opctl distributed-training build-image \
  -t $TAG \
  -reg $IMAGE_NAME \
  -df oci_dist_training_artifacts/dask/v1/Dockerfile \
  -s <code_dir>
If you are behind a proxy, ads opctl will automatically use your proxy settings (defined via no_proxy, http_proxy, and https_proxy).
Define your workload yaml:

The yaml file is a declarative way to express the workload. Refer to the YAML schema for more details.
kind: distributed
apiVersion: v1.0
spec:
  infrastructure:
    kind: infrastructure
    type: dataScienceJob
    apiVersion: v1.0
    spec:
      projectId: oci.xxxx.<project_ocid>
      compartmentId: oci.xxxx.<compartment_ocid>
      displayName: my_distributed_training
      logGroupId: oci.xxxx.<log_group_ocid>
      logId: oci.xxxx.<log_ocid>
      subnetId: oci.xxxx.<subnet-ocid>
      shapeName: VM.Standard2.4
      blockStorageSize: 50
  cluster:
    kind: dask
    apiVersion: v1.0
    spec:
      image: my-region.ocir.io/my-tenancy/dask-cluster-examples:dev
      workDir: "oci://my-bucket@my-namespace/daskexample/001"
      name: GridSearch Dask
      main:
        config:
      worker:
        config:
        replicas: 2
  runtime:
    kind: python
    apiVersion: v1.0
    spec:
      entryPoint: "gridsearch.py"
      kwargs: "--cv 5"
      env:
        - name: DEFAULT_N_SAMPLES
          value: 5000
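For reference, the runtime section above maps directly onto the training script: each env entry becomes an environment variable, and kwargs are appended to the entry point's command line. The snippet below is only a sketch of what gridsearch.py observes at runtime, not additional required code:

import os
import sys

# DEFAULT_N_SAMPLES comes from spec.runtime.spec.env in the workload YAML.
print(os.environ.get("DEFAULT_N_SAMPLES"))   # "5000"

# kwargs ("--cv 5") are passed as command-line arguments to the entry point,
# which is why gridsearch.py can read them with argparse.
print(sys.argv[1:])                          # e.g. ["--cv", "5"]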
Use ads opctl to create the cluster infrastructure and run the workload:

Do a dry run to inspect how the YAML translates to Jobs and Job Runs. This does not create an actual Job or Job Run.

ads opctl run -f train.yaml --dry-run

This will give an output similar to this:
-----------------------------Entering dryrun mode----------------------------------
Creating Job with payload:
kind: job
spec:
  infrastructure:
    kind: infrastructure
    spec:
      blockStorageSize: 50
      compartmentId: oci.xxxx.<compartment_ocid>
      displayName: GridSearch Dask
      jobInfrastructureType: ME_STANDALONE
      jobType: DEFAULT
      logGroupId: oci.xxxx.<log_group_ocid>
      logId: oci.xxxx.<log_ocid>
      projectId: oci.xxxx.<project_ocid>
      shapeName: VM.Standard2.4
      subnetId: oci.xxxx.<subnet-ocid>
    type: dataScienceJob
  name: GridSearch Dask
  runtime:
    kind: runtime
    spec:
      entrypoint: null
      env:
      - name: OCI__WORK_DIR
        value: oci://my-bucket@my-namespace/daskexample/001
      - name: OCI__EPHEMERAL
        value: None
      - name: OCI__CLUSTER_TYPE
        value: DASK
      - name: OCI__WORKER_COUNT
        value: '2'
      - name: OCI__START_ARGS
        value: ''
      - name: OCI__ENTRY_SCRIPT
        value: gridsearch.py
      - name: OCI__ENTRY_SCRIPT_KWARGS
        value: --cv 5
      - name: DEFAULT_N_SAMPLES
        value: '5000'
      image: my-region.ocir.io/my-tenancy/dask-cluster-examples:dev
      type: container
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Creating Main Job with following details:
Name: main
Environment Variables:
OCI__MODE:MAIN
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Creating 2 worker jobs with following details:
Name: worker
Environment Variables:
OCI__MODE:WORKER
-----------------------------Ending dryrun mode----------------------------------
Test Locally:

Before submitting the workload to jobs, you can run it locally to test your code, dependencies, configurations, etc. With the -b local flag, it uses a local backend. Later, when you need to run this workload on OCI Data Science Jobs, simply use the -b job flag instead.

ads opctl run -f train.yaml -b local
If your code needs to use any OCI services (such as Object Storage), you need to mount your OCI keys from your local host machine onto the container. This is already done for you, assuming your keys are in the typical location, ~/.oci. If your keys are at a different location, you can change the mount in the config.ini file:

oci_key_mnt = ~/.oci:/home/oci_dist_training/.oci

Note that the local backend requires the source code for your workload to be available locally in the source folder specified in the config.ini file. If you specified a Git repository or an OCI Object Storage location as the source code location in your workload YAML, make sure you have a local copy available for local testing.
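If you want to confirm that the mounted keys are usable inside the container, here is a minimal sketch using the OCI Python SDK, assuming the oci package is installed in the image and the container user's home directory resolves to /home/oci_dist_training as in the mount above:

import oci

# Load the API key configuration mounted from the host (~/.oci on the host maps
# to /home/oci_dist_training/.oci inside the container).
config = oci.config.from_file()
oci.config.validate_config(config)
print("Using region:", config.get("region"))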
Submit the workload:

ads opctl run -f train.yaml -b job

Note: This will automatically push the docker image to the OCI container registry repository.

Once running, you will see output on the terminal similar to the following:
jobId: oci.xxxx.<job_ocid>
mainJobRunId:
  mainJobRunIdName: oci.xxxx.<job_run_ocid>
workDir: oci://my-bucket@my-namespace/cluster-testing/005
otherJobRunIds:
  - workerJobRunIdName_1: oci.xxxx.<job_run_ocid>
  - workerJobRunIdName_2: oci.xxxx.<job_run_ocid>
  - workerJobRunIdName_3: oci.xxxx.<job_run_ocid>
This information can be saved as a YAML file and used as input to ads opctl distributed-training show-config -f <info.yaml>. You can use the --job-info flag to save the job run info into a YAML file, for example:

ads opctl run -f train.yaml --job-info info.yaml
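If you want to pick up the run OCIDs programmatically, for example to feed mainJobRunIdName to ads opctl watch, here is a minimal sketch, assuming the saved info.yaml mirrors the terminal output shown above:

import yaml

# Read the job info saved with: ads opctl run -f train.yaml --job-info info.yaml
with open("info.yaml") as f:
    info = yaml.safe_load(f)

print(info["jobId"])
print(info["mainJobRunId"]["mainJobRunIdName"])   # pass this OCID to `ads opctl watch`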
Monitoring the workload logs
To view the logs from a job run, you can run:

ads opctl watch oci.xxxx.<job_run_ocid>

You can stream the logs from any of the job runs by passing the corresponding job run OCID to the ads opctl watch command. You can run this command from multiple terminals to watch all of the job runs. Typically, watching mainJobRunId yields the most informative logs.
To find the IP address of the scheduler dashboard, you can check the configuration file generated by the Main job by running:

ads opctl distributed-training show-config -f info.yaml

This will generate output such as the following:

Main Info:
OCI__MAIN_IP: <ip address>
SCHEDULER_IP: <ip address>
tmpdir: oci://my-bucket@my-namespace/daskcluster-testing/005/oci.xxxx.<job_ocid>

The Dask dashboard is hosted at http://{SCHEDULER_IP}:8787. If the IP address is reachable from your workstation network, you can access the dashboard directly from your workstation. Alternatively, you can use a Bastion host on the same subnet as the Job Runs and create an SSH tunnel from your workstation. For more information about the dashboard, check out https://docs.dask.org/en/stable/diagnostics-distributed.html
Saving Artifacts to Object Storage Buckets
If you want to save the artifacts generated by the training process (model checkpoints, TensorBoard logs, etc.) to an object storage bucket, you can use the 'sync' feature. The environment variable OCI__SYNC_DIR exposes the directory location that will be automatically synchronized to the configured object storage bucket location. Use this directory in your training script to save the artifacts.

To configure the destination object storage bucket location, use the following settings in the workload yaml file (train.yaml):
- name: SYNC_ARTIFACTS
  value: 1
- name: WORKSPACE
  value: "<bucket_name>"
- name: WORKSPACE_PREFIX
  value: "<bucket_prefix>"
Note: Change SYNC_ARTIFACTS to 0 to disable this feature.

Use the OCI__SYNC_DIR environment variable in your code to save the artifacts. For example:
with open(os.path.join(os.environ.get("OCI__SYNC_DIR"), "results.txt"), "w") as rf:
    rf.write(f"Best Params are: {grid.best_params_}, Score is {grid.best_score_}")
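Model artifacts can be written to the same directory so they are synchronized as well. Below is a minimal sketch, assuming the fitted GridSearchCV object from the training script is available as grid:

import os
import joblib

# Persist the best estimator into the sync directory so it is uploaded to the
# configured bucket along with the other artifacts.
sync_dir = os.environ.get("OCI__SYNC_DIR")
joblib.dump(grid.best_estimator_, os.path.join(sync_dir, "best_model.joblib"))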
Terminating In-Progress Cluster
To terminate a running cluster, you can run:
ads opctl distributed-training cancel -f info.yaml