#!/usr/bin/env python
# -*- coding: utf-8; -*-
# Copyright (c) 2022, 2023 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
import copy
import os
import shlex
import shutil
import tempfile
from distutils import dir_util
from typing import Dict, Tuple, Union
from jinja2 import Environment, PackageLoader
from ads.common.auth import AuthContext, create_signer
from ads.common.oci_client import OCIClientFactory
from ads.jobs import (
ContainerRuntime,
DataScienceJob,
DataScienceJobRun,
GitPythonRuntime,
Job,
NotebookRuntime,
PythonRuntime,
ScriptRuntime,
)
from ads.opctl import logger
from ads.opctl.backend.base import (
Backend,
RuntimeFactory,
)
from ads.opctl.config.resolver import ConfigResolver
from ads.opctl.constants import DEFAULT_IMAGE_SCRIPT_DIR
from ads.opctl.distributed.common.cluster_config_helper import (
ClusterConfigToJobSpecConverter,
)
REQUIRED_FIELDS = [
"project_id",
"compartment_id",
"subnet_id",
"block_storage_size_in_GBs",
"shape_name",
]
[docs]class MLJobBackend(Backend):
def __init__(self, config: Dict) -> None:
"""
Initialize a MLJobBackend object given config dictionary.
Parameters
----------
config: dict
dictionary of configurations
"""
self.config = config
self.oci_auth = create_signer(
config["execution"].get("auth"),
config["execution"].get("oci_config", None),
config["execution"].get("oci_profile", None),
)
self.auth_type = config["execution"].get("auth")
self.profile = config["execution"].get("oci_profile", None)
self.client = OCIClientFactory(**self.oci_auth).data_science
[docs] def init(
self,
uri: Union[str, None] = None,
overwrite: bool = False,
runtime_type: Union[str, None] = None,
**kwargs: Dict,
) -> Union[str, None]:
"""Generates a starter YAML specification for a Data Science Job.
Parameters
----------
overwrite: (bool, optional). Defaults to False.
Overwrites the result specification YAML if exists.
uri: (str, optional), Defaults to None.
The filename to save the resulting specification template YAML.
runtime_type: (str, optional). Defaults to None.
The resource runtime type.
**kwargs: Dict
The optional arguments.
Returns
-------
Union[str, None]
The YAML specification for the given resource if `uri` was not provided.
`None` otherwise.
"""
with AuthContext(auth=self.auth_type, profile=self.profile):
# define a job
job = (
Job()
.with_name(
"{Job name. For MLflow, it will be replaced with the Project name}"
)
.with_infrastructure(
DataScienceJob(
**(self.config.get("infrastructure", {}) or {})
).init()
)
.with_runtime(
JobRuntimeFactory.get_runtime(
key=runtime_type or PythonRuntime().type
).init()
)
)
note = (
"# This YAML specification was auto generated by the `ads opctl init` command.\n"
"# The more details about the jobs YAML specification can be found in the ADS documentation:\n"
"# https://accelerated-data-science.readthedocs.io/en/latest/user_guide/jobs/index.html \n\n"
)
return job.to_yaml(
uri=uri,
overwrite=overwrite,
note=note,
filter_by_attribute_map=True,
**kwargs,
)
[docs] def apply(self) -> None:
"""
Create Job and Job Run from YAML.
"""
with AuthContext(auth=self.auth_type, profile=self.profile):
job = Job.from_dict(self.config)
job.create()
job_run = job.run()
print("JOB OCID:", job.id)
print("JOB RUN OCID:", job_run.id)
[docs] def run(self) -> None:
"""
Create Job and Job Run from OCID or cli parameters.
"""
# TODO Check that this still runs smoothly for distributed
with AuthContext(auth=self.auth_type, profile=self.profile):
if self.config["execution"].get("ocid", None):
job_id = self.config["execution"]["ocid"]
run_id = (
Job.from_datascience_job(self.config["execution"]["ocid"]).run().id
)
else:
payload = self._create_payload() # create job with infrastructure
src_folder = self.config["execution"].get("source_folder")
if self.config["execution"].get("conda_type") and self.config[
"execution"
].get("conda_slug"):
# add conda runtime
job_id, run_id = self._run_with_conda_pack(payload, src_folder)
elif self.config["execution"].get("image"):
# add docker image runtime
job_id, run_id = self._run_with_image(payload)
else:
raise ValueError(
"Either conda info or image name should be provided."
)
print("JOB OCID:", job_id)
print("JOB RUN OCID:", run_id)
return {"job_id": job_id, "run_id": run_id}
[docs] def init_operator(self):
# TODO: check if folder is empty, check for force overwrite
# TODO: check that command is being run from advanced-ds repo (important until ads released)
operator_folder = self.config["execution"].get("operator_folder_path")
os.makedirs(operator_folder, exist_ok=True)
operator_folder_name = os.path.basename(os.path.normpath(operator_folder))
docker_tag = f"{os.path.join(self.config['infrastructure'].get('docker_registry'), operator_folder_name)}:latest"
self.config["execution"]["operator_folder_name"] = operator_folder_name
self.config["execution"]["docker_tag"] = docker_tag
operator_slug = self.config["execution"].get("operator_slug")
self._jinja_write(operator_slug, operator_folder)
# DONE
print(
"\nInitialization Successful.\n"
f"All code should be written in main.py located at: {os.path.join(operator_folder, 'main.py')}\n"
f"Additional libraries should be added to environment.yaml located at: {os.path.join(operator_folder, 'environment.yaml')}\n"
"Any changes to main.py will require re-building the docker image, whereas changes to args in the"
" runtime section of the yaml file do not. Write accordingly.\n"
"Run this cluster with:\n"
f"\tdocker build -t {docker_tag} -f {os.path.join(operator_folder, 'Dockerfile')} .\n"
f"\tads opctl publish-image {docker_tag} \n"
f"\tads opctl run -f {os.path.join(operator_folder, operator_slug + '.yaml')} \n"
)
return operator_folder
[docs] def delete(self):
"""
Delete Job or Job Run from OCID.
"""
if self.config["execution"].get("id"):
job_id = self.config["execution"]["id"]
with AuthContext(auth=self.auth_type, profile=self.profile):
Job.from_datascience_job(job_id).delete()
print(f"Job {job_id} has been deleted.")
elif self.config["execution"].get("run_id"):
run_id = self.config["execution"]["run_id"]
with AuthContext(auth=self.auth_type, profile=self.profile):
DataScienceJobRun.from_ocid(run_id).delete()
print(f"Job run {run_id} has been deleted.")
[docs] def cancel(self):
"""
Cancel Job Run from OCID.
"""
run_id = self.config["execution"]["run_id"]
with AuthContext(auth=self.auth_type, profile=self.profile):
DataScienceJobRun.from_ocid(run_id).cancel()
print(f"Job run {run_id} has been cancelled.")
[docs] def watch(self):
"""
Watch Job Run from OCID.
"""
run_id = self.config["execution"]["run_id"]
with AuthContext(auth=self.auth_type, profile=self.profile):
run = DataScienceJobRun.from_ocid(run_id)
run.watch()
def _jinja_write(self, operator_slug, operator_folder):
# TODO AH: fill in templates with relevant details
env = Environment(
loader=PackageLoader("ads", f"opctl/operators/{operator_slug}")
)
for setup_file in [
"Dockerfile",
"environment.yaml",
"main.py",
"run.py",
"start_scheduler.sh",
"start_worker.sh",
"dask_cluster.yaml",
]:
template = env.get_template(setup_file + ".jinja2")
with open(os.path.join(operator_folder, setup_file), "w") as ff:
ff.write(template.render(config=self.config))
def _create_payload(self, infra=None, name=None) -> Job:
if not infra:
infra = self.config.get("infrastructure", {})
# if any(k not in infra for k in REQUIRED_FIELDS):
# missing = [k for k in REQUIRED_FIELDS if k not in infra]
# raise ValueError(
# f"Following fields are missing but are required for OCI ML Jobs: {missing}. Please run `ads opctl configure`."
# )
ml_job = DataScienceJob(spec=infra if "spec" not in infra else infra["spec"])
log_group_id = infra.get("log_group_id")
log_id = infra.get("log_id")
if log_group_id:
ml_job.with_log_group_id(log_group_id)
if log_id:
ml_job.with_log_id(log_id)
if not name:
try:
name = infra.get("displayName") or self.config["execution"].get(
"job_name"
)
except:
pass
return Job(
name=name,
infrastructure=ml_job,
)
def _run_with_conda_pack(self, payload: Job, src_folder: str) -> Tuple[str, str]:
payload.with_runtime(
ScriptRuntime().with_environment_variable(
**self.config["execution"]["env_vars"]
)
)
if self.config["execution"].get("conda_type") == "service":
payload.runtime.with_service_conda(self.config["execution"]["conda_slug"])
else:
payload.runtime.with_custom_conda(self.config["execution"]["conda_uri"])
if ConfigResolver(self.config)._is_ads_operator():
with tempfile.TemporaryDirectory() as td:
os.makedirs(os.path.join(td, "operators"), exist_ok=True)
dir_util.copy_tree(
src_folder,
os.path.join(td, "operators", os.path.basename(src_folder)),
)
curr_dir = os.path.dirname(os.path.abspath(__file__))
shutil.copy(
os.path.join(curr_dir, "..", "operators", "run.py"),
os.path.join(td, "operators"),
)
payload.runtime.with_source(
os.path.join(td, "operators"), entrypoint="operators/run.py"
)
payload.runtime.set_spec(
"args", shlex.split(self.config["execution"]["command"] + " -r")
)
job = payload.create()
job_id = job.id
run_id = job.run().id
else:
with tempfile.TemporaryDirectory() as td:
dir_util.copy_tree(
src_folder, os.path.join(td, os.path.basename(src_folder))
)
payload.runtime.with_source(
os.path.normpath(os.path.join(td, os.path.basename(src_folder))),
entrypoint=os.path.join(
os.path.basename(src_folder),
self.config["execution"]["entrypoint"],
),
)
if self.config["execution"].get("command"):
payload.runtime.set_spec(
"args", shlex.split(self.config["execution"]["command"])
)
job = payload.create()
job_id = job.id
run_id = job.run().id
return job_id, run_id
def _run_with_image(self, payload: Job) -> Tuple[str, str]:
payload.with_runtime(
ContainerRuntime().with_environment_variable(
**self.config["execution"]["env_vars"]
)
)
image = self.config["execution"]["image"]
if ":" not in image:
image += ":latest"
payload.runtime.with_image(image)
if os.path.basename(image) == image:
logger.warn("Did you include registry in image name?")
if ConfigResolver(self.config)._is_ads_operator():
command = f"python {os.path.join(DEFAULT_IMAGE_SCRIPT_DIR, 'operators/run.py')} -r "
else:
command = ""
# running a non-operator image
if self.config["execution"].get("entrypoint"):
payload.runtime.with_entrypoint(self.config["execution"]["entrypoint"])
if self.config["execution"].get("command"):
command += f"{self.config['execution']['command']}"
if len(command) > 0:
payload.runtime.with_cmd(",".join(shlex.split(command)))
job = payload.create()
job_id = job.id
run_id = job.run().id
return job_id, run_id
[docs]class MLJobDistributedBackend(MLJobBackend):
DIAGNOSTIC_COMMAND = "python -m ads.opctl.diagnostics -t distributed"
def __init__(self, config: Dict) -> None:
"""
Initialize a MLJobDistributedBackend object given config dictionary.
Parameters
----------
config: dict
dictionary of configurations
"""
super().__init__(config=config)
self.job = None
[docs] def prepare_job_config(self, cluster_info):
job_conf_helper = ClusterConfigToJobSpecConverter(cluster_info)
jobdef_conf = job_conf_helper.job_def_info()
infrastructure = cluster_info.infrastructure
if jobdef_conf.get("name"):
infrastructure["spec"]["displayName"] = jobdef_conf.get("name")
job = self._create_payload(infrastructure["spec"])
envVars = {}
envVars.update(
cluster_info.cluster.config.envVars
) # Add user provided environment variables
envVars.update(
jobdef_conf.get("envVars") or {}
) # Update with `OCI__` environment variables
job.with_runtime(ContainerRuntime().with_environment_variable(**envVars))
job.runtime.with_image(image=jobdef_conf["image"])
self.job = job
if os.path.basename(jobdef_conf["image"]) == jobdef_conf["image"]:
logger.warning("Did you include registry in image name?")
main_jobrun_conf = job_conf_helper.job_run_info("main")
main_jobrun_conf["envVars"]["RANK"] = "0"
main_jobrun_conf["name"] = main_jobrun_conf.get("name") or "main"
worker_jobrun_conf = job_conf_helper.job_run_info("worker")
worker_jobrun_conf_list = []
if worker_jobrun_conf:
for i in range(cluster_info.cluster.worker.replicas):
conf = copy.deepcopy(worker_jobrun_conf)
conf["envVars"]["RANK"] = str(i + 1)
conf["name"] = (
conf.get("name", worker_jobrun_conf["envVars"]["OCI__MODE"])
+ "_"
+ str(i)
)
worker_jobrun_conf_list.append(conf)
ps_jobrun_conf = job_conf_helper.job_run_info("ps")
ps_jobrun_conf_list = []
if ps_jobrun_conf:
for i in range(cluster_info.cluster.ps.replicas):
conf = copy.deepcopy(ps_jobrun_conf)
conf["name"] = (
conf.get("name", worker_jobrun_conf["envVars"]["OCI__MODE"])
+ "_"
+ str(i)
)
ps_jobrun_conf_list.append(conf)
worker_jobrun_conf_list.extend(ps_jobrun_conf_list)
return main_jobrun_conf, worker_jobrun_conf_list
[docs] @staticmethod
def generate_worker_name(worker_jobrun_conf, i):
return f"{worker_jobrun_conf['name']}-{i}"
[docs] def run_diagnostics(self, cluster_info, dry_run=False, **kwargs):
with AuthContext(auth=self.auth_type, profile=self.profile):
main_jobrun_conf, worker_jobrun_conf_list = self.prepare_job_config(
cluster_info=cluster_info
)
self.job.runtime.with_entrypoint(["/bin/bash", "--login", "-c"])
self.job.runtime.with_cmd(MLJobDistributedBackend.DIAGNOSTIC_COMMAND)
if dry_run: # If dry run, print the job yaml on the console.
print(
"-----------------------------Entering dryrun mode----------------------------------"
)
print(f"Creating Job with payload: \n{self.job}")
print("+" * 200)
print(f"Creating Main Job Run with following details:")
print(f"Name: {main_jobrun_conf['name']}")
print(f"Additional Environment Variables: ")
main_env_Vars = main_jobrun_conf.get("envVars", {})
for k in main_env_Vars:
print(f"\t{k}:{main_env_Vars[k]}")
print("~" * 200)
print(
"-----------------------------Ending dryrun mode----------------------------------"
)
return None
else:
job = self.job.create()
# Start main job
conf = dict(main_jobrun_conf)
main_jobrun = job.run(
conf["name"],
env_var=conf["envVars"],
# freeform_tags={"distributed_training": "oracle-ads"},
)
self.job = job
main_jobrun.watch()
return job, main_jobrun
[docs] def run(self, cluster_info, dry_run=False) -> None:
"""
* Creates Job Definition and starts main and worker jobruns from that job definition
* The Job Definition will contain all the environment variables defined at the cluster/spec/config level, environment variables defined by the user at runtime/spec/env level and `OCI__` derived from the yaml specification
* The Job Run will have overrides provided by the user under cluster/spec/{main|worker}/config section and `OCI__MODE`={MASTER|WORKER} depending on the run type
"""
with AuthContext(auth=self.auth_type, profile=self.profile):
main_jobrun_conf, worker_jobrun_conf_list = self.prepare_job_config(
cluster_info=cluster_info
)
if dry_run: # If dry run, print the job yaml on the console.
print(
"-----------------------------Entering dryrun mode----------------------------------"
)
print(f"Creating Job with payload: \n{self.job}")
print(
"++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++"
)
print(f"Creating Main Job Run with following details:")
print(f"Name: {main_jobrun_conf['name']}")
print(f"Additional Environment Variables: ")
main_env_Vars = main_jobrun_conf.get("envVars", {})
for k in main_env_Vars:
print(f"\t{k}:{main_env_Vars[k]}")
print(
"~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~"
)
if cluster_info.cluster.worker:
print(f"Creating Job Runs with following details:")
for i in range(len(worker_jobrun_conf_list)):
worker_jobrun_conf = worker_jobrun_conf_list[i]
print("Name: " + worker_jobrun_conf.get("name"))
print("Additional Environment Variables: ")
worker_env_Vars = worker_jobrun_conf.get("envVars", {})
for k in worker_env_Vars:
print(f"\t{k}:{worker_env_Vars[k]}")
print(
"-----------------------------Ending dryrun mode----------------------------------"
)
return None
else:
job = self.job.create()
# Start main job
conf = dict(main_jobrun_conf)
main_jobrun = job.run(
conf["name"],
env_var=conf["envVars"],
# freeform_tags={"distributed_training": "oracle-ads"},
)
# Start worker job
worker_jobruns = []
if cluster_info.cluster.worker:
for i in range(len(worker_jobrun_conf_list)):
worker_jobrun_conf = worker_jobrun_conf_list[i]
conf = dict(worker_jobrun_conf)
jobrun = job.run(
worker_jobrun_conf.get("name"),
env_var=conf["envVars"],
)
worker_jobruns.append(jobrun)
self.job = job
return job, main_jobrun, worker_jobruns
[docs]class JobRuntimeFactory(RuntimeFactory):
"""Job runtime factory."""
_MAP = {
ContainerRuntime().type: ContainerRuntime,
ScriptRuntime().type: ScriptRuntime,
PythonRuntime().type: PythonRuntime,
NotebookRuntime().type: NotebookRuntime,
GitPythonRuntime().type: GitPythonRuntime,
}