# Source code for ads.opctl.distributed.cmds

#!/usr/bin/env python
# -*- coding: utf-8; -*-

# Copyright (c) 2022, 2023 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/

import json
import os
import re
import subprocess
from configparser import ConfigParser
from urllib.parse import urlparse

import fsspec
import yaml

from ads.common.auth import AuthContext, AuthType, create_signer
from ads.common.decorator.runtime_dependency import (
    OptionalDependency,
    runtime_dependency,
)
from ads.jobs import Job
from ads.jobs.builders.runtimes.artifact import Artifact
from ads.opctl.utils import publish_image as publish_image_cmd
from ads.opctl.utils import run_command

# File name (resolved against the current working directory) of the local INI
# file that holds the image registry/tag, Dockerfile path, source folder and
# key-mount settings used by the distributed-training CLI helpers.
ini_file: str = "config.ini"


def _artifact_file_name(cluster_type, version):
    return f"{cluster_type}_{version}.tar.gz".replace("-", "_")


def initialize_workspace(cluster_type, version):
    """
    Download and unpack the starter artifacts for a distributed-training cluster type.

    When the ``OCI_DISTRIBUTED_DEV_MODE`` environment variable is set, the archive is
    fetched from the base URI returned by ``dev_mode_base_uri``; otherwise it is fetched
    from ``production_mode_base_uri``. The archive is downloaded to a temporary
    ``.tmp_*`` file in the current directory and extracted with ``tar`` into
    ``./oci_dist_training_artifacts``. The temporary file is always removed afterwards.

    Parameters
    ----------
    cluster_type : str
        Cluster/framework name; used to build the artifact file name.
    version : str
        Artifact version; used to build the artifact file name.

    Raises
    ------
    RuntimeError
        If ``tar`` exits with a non-zero status while extracting the archive.
    """
    import posixpath

    artifact_name = _artifact_file_name(cluster_type, version)
    # _artifact_file_name already maps "-" to "_", so no further normalisation is needed.
    local_file_name = f".tmp_{artifact_name}"
    if os.environ.get("OCI_DISTRIBUTED_DEV_MODE"):
        base_uri = dev_mode_base_uri(cluster_type, version)
    else:
        base_uri = production_mode_base_uri(cluster_type, version)
    # The base is a URI (oci:// or https://), so always join with "/";
    # os.path.join would insert a backslash on Windows.
    artifact_location = posixpath.join(base_uri, artifact_name)
    print(f"Downloading from {artifact_location} to {local_file_name}")
    try:
        # Download inside the try so any temp file left by a failed download is cleaned up too.
        Artifact.copy_from_uri(uri=artifact_location, to_path=local_file_name)
        os.makedirs("oci_dist_training_artifacts", exist_ok=True)
        result = subprocess.run(
            ["tar", "-xvf", local_file_name, "-C", "oci_dist_training_artifacts"]
        )
        # Previously the exit status was ignored and the README hint was printed even on failure.
        if result.returncode != 0:
            raise RuntimeError(
                f"Failed to extract {local_file_name} "
                f"(tar exited with code {result.returncode})."
            )
        print(
            f"Check oci_dist_training_artifacts/{cluster_type.replace('-', '_')}/{version}/README.md for build instructions "
        )
    finally:
        if os.path.exists(local_file_name):
            os.remove(local_file_name)
def dev_mode_base_uri(cluster_type, version):
    """
    Build the developer-mode Object Storage base URI from environment variables.

    Temporary helper that mocks workspace initialization using a bucket. The URI has
    the form ``oci://<bucket>@<namespace>/<prefix>/``. The values come from
    ``OCI_DISTRIBUTED_DEV_MODE_BUCKET``, ``OCI_DISTRIBUTED_DEV_MODE_NAMESPACE`` and
    ``OCI_DISTRIBUTED_DEV_MODE_PREFIX``. An unset variable is rendered as ``None``.
    ``cluster_type`` and ``version`` are accepted but not used.
    """
    lookup = os.environ.get
    bucket = lookup("OCI_DISTRIBUTED_DEV_MODE_BUCKET")
    namespace = lookup("OCI_DISTRIBUTED_DEV_MODE_NAMESPACE")
    prefix = lookup("OCI_DISTRIBUTED_DEV_MODE_PREFIX")
    return "oci://{}@{}/{}/".format(bucket, namespace, prefix)
def production_mode_base_uri(cluster_type, version):
    """
    Return the public GitHub base URI that hosts the distributed-training artifacts.

    ``cluster_type`` and ``version`` are accepted but not used; the URI is fixed.
    """
    return (
        "https://raw.githubusercontent.com/oracle-samples/"
        "oci-data-science-ai-samples/master/distributed_training/artifacts/"
    )
def cancel_distributed_run(job_id, cluster_file_name, **kwargs):
    """
    Cancel every run of a distributed-training Data Science job.

    Parameters
    ----------
    job_id : str
        OCID of the job. Ignored when ``cluster_file_name`` is given.
    cluster_file_name : str or None
        Optional YAML file whose ``jobId`` key supplies the job OCID.
    **kwargs
        ``oci_profile`` (API-key profile; defaults to ``DEFAULT`` when missing or
        ``None``) and ``oci_config`` (config file location; the ADS default is used
        when missing or ``None``).

    Notes
    -----
    A failure to cancel an individual run is printed and does not stop the
    remaining cancellations.
    """
    if cluster_file_name:
        with open(cluster_file_name) as cf:
            content = yaml.load(cf, yaml.SafeLoader)
        job_id = content["jobId"]
    with AuthContext():
        import ads

        # Use ``or`` rather than a .get() default so an explicit ``None`` from the CLI
        # still falls back to DEFAULT (same rule as show_config_info).
        auth_kwargs = {"profile": kwargs.get("oci_profile") or "DEFAULT"}
        if kwargs.get("oci_config"):
            auth_kwargs["oci_config_location"] = kwargs["oci_config"]
        ads.set_auth(auth="api_key", **auth_kwargs)
        job = Job.from_datascience_job(job_id)
        runs = job.run_list()
        for job_run in runs:
            print(f"Cancelling Job Run: {job_run.id}")
            try:
                job_run.cancel()
            except Exception as e:
                # Best effort: report and continue with the remaining runs.
                print(f"Error cancelling: {e}")
        if not runs:
            print(f"No Job runs found for : {job_id}")
def show_config_info(job_id, work_dir, cluster_file_name, worker_info, **kwargs):
    """
    Print the main (and optionally worker) configuration JSON of a distributed job.

    The files are looked up under ``<work_dir>/<job_id>``. When that location uses the
    ``oci`` scheme, an API-key signer is built from ``oci_config`` / ``oci_profile`` in
    ``kwargs``, falling back to ``~/.oci/config`` and ``DEFAULT``.

    Parameters
    ----------
    job_id : str
        Job OCID. Overridden by ``jobId`` from ``cluster_file_name`` when given.
    work_dir : str
        Working directory URI. Overridden by ``workDir`` from ``cluster_file_name``.
    cluster_file_name : str or None
        Optional YAML file with ``jobId`` and ``workDir`` keys.
    worker_info : bool
        When true, also print every ``WORKER*.json`` file.
    """
    if cluster_file_name:
        with open(cluster_file_name) as cluster_fh:
            cluster_def = yaml.load(cluster_fh, yaml.SafeLoader)
        job_id = cluster_def["jobId"]
        work_dir = cluster_def["workDir"]

    config_location = os.path.join(work_dir, job_id)
    scheme = urlparse(config_location).scheme

    if scheme == "oci":
        config_path = kwargs.get("oci_config") or os.path.expanduser("~/.oci/config")
        profile_name = kwargs.get("oci_profile") or "DEFAULT"
        oci_auth = create_signer(AuthType.API_KEY, config_path, profile_name)
    else:
        oci_auth = {}

    # fsspec FileSystem implementation matching the URI scheme.
    filesystem = fsspec.filesystem(scheme, **oci_auth)
    main_file = os.path.join(config_location, "MAIN_config.json")
    if not filesystem.exists(main_file):
        print(
            f"MAIN_config file not found at location {config_location}. If you just started the cluster please wait until the main node is in `inprogress` state"
        )
        return

    with fsspec.open(main_file, **oci_auth) as main_fh:
        print("Main Info:")
        print(yaml.dump(json.loads(main_fh.read())))

    if not worker_info:
        return

    # Paths returned by open_files lack the protocol, so re-add it for OCI.
    protocol_prefix = "oci://" if oci_auth else ""
    worker_pattern = os.path.join(config_location, "WORKER*.json")
    for worker_file in fsspec.open_files(worker_pattern, **oci_auth):
        with fsspec.open(f"{protocol_prefix}{worker_file.path}", **oci_auth) as worker_fh:
            print(f"Worker Info from {os.path.basename(worker_file.path)}")
            print(yaml.dump(json.loads(worker_fh.read())))
@runtime_dependency(module="docker", install_from=OptionalDependency.OPCTL)
def verify_image(img_name):
    """
    Check whether registry metadata for an image can be fetched.

    Uses the Docker client configured from the environment (``docker.from_env()``)
    to query the registry for ``img_name``.

    Parameters
    ----------
    img_name : str
        Fully qualified image name.

    Returns
    -------
    int
        ``1`` if the registry lookup succeeded, ``0`` if it raised any exception.
    """
    import docker

    docker_client = docker.from_env()
    try:
        docker_client.images.get_registry_data(img_name)
    except Exception:
        # Any exception from the lookup is reported as "image not available".
        return 0
    return 1
def _split_image_name(img_name):
    """Split ``registry[:tag]`` into ``(registry, tag)``, defaulting the tag to ``"latest"``."""
    registry, sep, tag = img_name.rpartition(":")
    # A ':' followed later by '/' separates a registry host from its port
    # (e.g. ``localhost:5000/img``); it is not a tag separator.
    if not sep or "/" in tag:
        return img_name, "latest"
    return registry, tag


def update_ini(tag, registry, dockerfile, source_folder, config, nobuild):
    """
    Create or update the local ``config.ini`` (module constant ``ini_file``).

    Registry and tag default to the values parsed from the job config's image
    (``spec.cluster.spec.image``). Explicit ``tag`` / ``registry`` arguments take
    precedence. Images starting with ``@``, missing images, or ``config=None`` provide
    no defaults.

    If the file already exists, only the arguments that are not ``None`` overwrite
    stored values. If it does not exist, tag and registry are required, a Dockerfile is
    required unless ``nobuild`` is set (then ``DUMMY_PATH`` is stored), and
    ``source_folder`` defaults to ``"."``. In both cases a default ``oci_key_mnt`` is
    added when absent.

    Parameters
    ----------
    tag : str or None
        Image tag.
    registry : str or None
        Registry/repository to push to.
    dockerfile : str or None
        Relative path to the Dockerfile.
    source_folder : str or None
        Source scripts folder mounted during a local run.
    config : dict or None
        Job config.
    nobuild : bool
        When true, a Dockerfile is not required.

    Returns
    -------
    configparser.ConfigParser
        The parser holding the written configuration.

    Raises
    ------
    ValueError
        If a required value (tag, registry, dockerfile) is missing.
    """
    ini = ConfigParser(allow_no_value=True)
    if config is None:
        img_name = "@test"
    else:
        img_name = (
            config.get("spec", {}).get("cluster", {}).get("spec", {}).get("image")
        )

    # "@"-prefixed placeholders and missing image names carry no registry/tag info.
    if img_name and not img_name.startswith("@"):
        default_registry, default_tag = _split_image_name(img_name)
    else:
        default_registry, default_tag = None, None
    tag1 = tag if tag is not None else default_tag
    registry1 = registry if registry is not None else default_registry

    if os.path.isfile(ini_file):
        ini.read(ini_file)
        # A hand-edited file may lack the section; create it instead of raising NoSectionError.
        if not ini.has_section("main"):
            ini.add_section("main")
        for key, value in (
            ("tag", tag1),
            ("registry", registry1),
            ("dockerfile", dockerfile),
            ("source_folder", source_folder),
        ):
            if value is not None:
                ini.set("main", key, value)
    else:
        ini.add_section("main")
        if tag1 is None:
            raise ValueError("tag arg is missing")
        ini.set("main", "tag", tag1)
        if registry1 is None:
            raise ValueError("registry arg is missing")
        ini.set("main", "registry", registry1)
        if dockerfile is not None:
            ini.set("main", "dockerfile", dockerfile)
        elif nobuild:
            ini.set("main", "dockerfile", "DUMMY_PATH")
        else:
            raise ValueError("dockerfile arg is missing")
        ini.set(
            "main", "source_folder", source_folder if source_folder is not None else "."
        )

    # Written as a value-less key; the leading ';' makes it a comment line when re-read.
    ini.set("main", "; mount oci keys for local testing", None)
    if not ini.has_option("main", "oci_key_mnt"):
        ini.set("main", "oci_key_mnt", "~/.oci:/home/oci_dist_training/.oci")

    # A missing dockerfile option is treated like the DUMMY_PATH placeholder.
    if ini.get("main", "dockerfile", fallback="DUMMY_PATH") == "DUMMY_PATH" and not nobuild:
        raise ValueError("dockerfile arg is missing")

    # Mode "w" truncates, so no need to delete the file first.
    with open(ini_file, "w") as f:
        ini.write(f)
    return ini
def load_ini():
    """
    Read the local ``config.ini`` (module constant ``ini_file``).

    Returns
    -------
    configparser.ConfigParser
        Parser (with ``allow_no_value=True``) populated from the file.

    Raises
    ------
    RuntimeError
        If the file does not exist.
    """
    if not os.path.isfile(ini_file):
        raise RuntimeError(f"ini_file file {ini_file} not found !")
    parser = ConfigParser(allow_no_value=True)
    parser.read(ini_file)
    return parser
def increment_tag_in_ini(ini):
    """
    Increment the image tag and persist the result to the local INI file.

    Parameters
    ----------
    ini : configparser.ConfigParser
        Parsed config.ini contents; its ``main.tag`` is updated.

    Returns
    -------
    configparser.ConfigParser
        The updated parser.
    """
    ini = increment_tag(ini)
    # Use the module-level constant rather than re-hard-coding "config.ini",
    # so this stays in sync with every other helper that reads/writes the file.
    refresh_ini(ini, ini_file)
    return ini
def increment_tag(ini):
    """
    Bump the version suffix of ``main.tag`` in place.

    A tag ending in ``_v_<digits>`` has its trailing number incremented, keeping
    zero-padding width (``_v_09`` becomes ``_v_10``). Any other tag gets ``_v_1``
    appended.

    Parameters
    ----------
    ini : configparser.ConfigParser
        Parsed config.ini contents.

    Returns
    -------
    configparser.ConfigParser
        The same parser, with ``main.tag`` updated.
    """
    current = ini.get("main", "tag")
    if re.search(r"_v_\d+$", current) is None:
        new_tag = f"{current}_v_1"
    else:

        def _bump(match):
            digits = match.group()
            return str(int(digits) + 1).zfill(len(digits))

        new_tag = re.sub(r"[0-9]+$", _bump, current)
    ini.set("main", "tag", new_tag)
    return ini
def refresh_ini(ini, ini_file):
    """
    Overwrite ``ini_file`` with the contents of ``ini``.

    Parameters
    ----------
    ini : configparser.ConfigParser
        Configuration to write.
    ini_file : str
        Destination path. An existing file at this path is deleted first.

    Returns
    -------
    configparser.ConfigParser
        The same ``ini`` object.
    """
    destination = ini_file
    already_there = os.path.exists(destination)
    if already_there:
        os.remove(destination)
    with open(destination, "w") as out_fh:
        ini.write(out_fh)
    return ini
def docker_build_cmd(ini):
    """
    Build the Docker image described by ``ini``.

    The command comes from ``get_cmd`` and is executed by ``run_cmd``.

    Parameters
    ----------
    ini : configparser.ConfigParser
        Parsed config.ini contents.

    Returns
    -------
    int
        Whatever ``run_cmd`` returns (``1`` on success).
    """
    return run_cmd(get_cmd(ini))
def get_cmd(ini):
    """
    Assemble the ``docker build`` argument list from config.ini values.

    The image is tagged ``<registry>:<tag>``. ``source_folder`` is passed as the
    ``CODE_DIR`` build arg. Non-empty ``no_proxy``, ``http_proxy`` and ``https_proxy``
    environment variables are forwarded as build args, in that order. The build
    context is the current directory.

    Parameters
    ----------
    ini : configparser.ConfigParser
        Parsed config.ini contents with a ``main`` section.

    Returns
    -------
    list of str
        The command suitable for ``run_command``.
    """
    code_dir = ini.get("main", "source_folder")
    image_ref = ini.get("main", "registry") + ":" + ini.get("main", "tag")
    dockerfile_path = ini.get("main", "dockerfile")
    build_cmd = [
        "docker",
        "build",
        "--build-arg",
        "CODE_DIR=" + code_dir,
        "-t",
        image_ref,
        "-f",
        dockerfile_path,
    ]
    for proxy_var in ("no_proxy", "http_proxy", "https_proxy"):
        proxy_value = os.environ.get(proxy_var)
        if proxy_value:
            build_cmd.extend(["--build-arg", f"{proxy_var}={proxy_value}"])
    build_cmd.append(".")
    return build_cmd
def run_cmd(cmd):
    """
    Execute a command via ``run_command`` and fail loudly on a non-zero exit.

    Parameters
    ----------
    cmd : list of str
        Command to run.

    Returns
    -------
    int
        ``1`` when the command exits with status 0.

    Raises
    ------
    RuntimeError
        If the command exits with a non-zero status.
    """
    completed = run_command(cmd)
    if completed.returncode == 0:
        return 1
    raise RuntimeError("Docker build failed.")
def horovod_cmd(code_mount, oci_key_mount, config):
    """
    Build the ``docker run`` command for a local Horovod test run.

    The container's entrypoint is ``horovodrun`` with Gloo, 2 processes on
    ``localhost:2``, executing the job's ``entryPoint`` with the env's Python.

    Parameters
    ----------
    code_mount : str
        ``-v`` spec mounting the source folder.
    oci_key_mount : str
        ``-v`` spec mounting the OCI keys.
    config : dict
        Job config providing ``spec.cluster.spec.image`` and
        ``spec.runtime.spec.entryPoint``.

    Returns
    -------
    list of str
        The CLI command.
    """
    image = config["spec"]["cluster"]["spec"]["image"]
    entry_point = config["spec"]["runtime"]["spec"]["entryPoint"]
    docker_part = ["docker", "run", "-v", code_mount, "-v", oci_key_mount]
    docker_part += ["--env", "OCI_IAM_TYPE=api_key", "--rm"]
    launcher_part = ["--entrypoint", "/miniconda/envs/env/bin/horovodrun", image]
    horovod_part = ["--gloo", "-np", "2", "-H", "localhost:2"]
    program_part = ["/miniconda/envs/env/bin/python", entry_point]
    return docker_part + launcher_part + horovod_part + program_part
def pytorch_cmd(code_mount, oci_key_mount, config):
    """
    Build the ``docker run`` command for a local single-process PyTorch test run.

    ``WORLD_SIZE=1``, ``RANK=0`` and ``LOCAL_RANK=0`` are set so the script runs as
    the only process.

    Parameters
    ----------
    code_mount : str
        ``-v`` spec mounting the source folder.
    oci_key_mount : str
        ``-v`` spec mounting the OCI keys.
    config : dict
        Job config providing ``spec.cluster.spec.image`` and
        ``spec.runtime.spec.entryPoint``.

    Returns
    -------
    list of str
        The CLI command.
    """
    image = config["spec"]["cluster"]["spec"]["image"]
    entry_point = config["spec"]["runtime"]["spec"]["entryPoint"]
    cmd = ["docker", "run", "-v", code_mount, "-v", oci_key_mount]
    for env_pair in ("OCI_IAM_TYPE=api_key", "WORLD_SIZE=1", "RANK=0", "LOCAL_RANK=0"):
        cmd += ["--env", env_pair]
    cmd += ["--rm", "--entrypoint", "/opt/conda/bin/python", image, entry_point]
    return cmd
def dask_cmd(code_mount, oci_key_mount, config):
    """
    Build the ``docker run`` command for a local Dask test run.

    Inside the container, ``/bin/sh -c`` starts a background ``dask-scheduler`` and a
    ``dask-worker`` pointed at ``localhost:8786``, then runs the job's ``entryPoint``
    with the ``daskenv`` Python.

    Parameters
    ----------
    code_mount : str
        ``-v`` spec mounting the source folder.
    oci_key_mount : str
        ``-v`` spec mounting the OCI keys.
    config : dict
        Job config providing ``spec.cluster.spec.image`` and
        ``spec.runtime.spec.entryPoint``.

    Returns
    -------
    list of str
        The CLI command.
    """
    image = config["spec"]["cluster"]["spec"]["image"]
    shell_script = (
        "(nohup dask-scheduler >scheduler.log &) && (nohup dask-worker localhost:8786 >worker.log &) && "
        "/miniconda/envs/daskenv/bin/python "
        + config["spec"]["runtime"]["spec"]["entryPoint"]
    )
    cmd = ["docker", "run", "-v", code_mount, "-v", oci_key_mount]
    for env_pair in ("OCI_IAM_TYPE=api_key", "SCHEDULER_IP=tcp://127.0.0.1"):
        cmd.extend(["--env", env_pair])
    cmd.extend(["--rm", "--entrypoint", "/bin/sh", image, "-c", shell_script])
    return cmd
def tensorflow_cmd(code_mount, oci_key_mount, config):
    """
    Build the ``docker run`` command for a local TensorFlow test run.

    If the cluster spec has a ``ps`` key (parameter-server strategy), the image's
    ``/etc/datascience/local_test.sh`` is run with ``/bin/sh``. Otherwise (mirrored /
    multi-worker-mirrored strategies), the entry point runs directly with a
    single-worker ``TF_CONFIG``.

    Parameters
    ----------
    code_mount : str
        ``-v`` spec mounting the source folder.
    oci_key_mount : str
        ``-v`` spec mounting the OCI keys.
    config : dict
        Job config providing ``spec.cluster.spec.image`` and
        ``spec.runtime.spec.entryPoint``.

    Returns
    -------
    list of str
        The CLI command.
    """
    cluster_spec = config["spec"]["cluster"]["spec"]
    uses_parameter_server = "ps" in cluster_spec
    image = cluster_spec["image"]
    entry_point = config["spec"]["runtime"]["spec"]["entryPoint"]

    cmd = [
        "docker", "run",
        "-v", code_mount,
        "-v", oci_key_mount,
        "--env", "OCI_IAM_TYPE=api_key",
    ]
    if uses_parameter_server:
        cmd += ["--rm", "--entrypoint", "/bin/sh", image, "/etc/datascience/local_test.sh"]
    else:
        tf_config = 'TF_CONFIG={"cluster": {"worker": ["localhost:12345"]}, "task": {"type": "worker", "index": 0}}'
        cmd += ["--env", tf_config, "--rm", "--entrypoint", "/miniconda/bin/python", image]
    cmd.append(entry_point)
    return cmd
def local_run(config, ini):
    """
    Run the distributed job locally in Docker according to its cluster kind.

    The source folder (relative to the current working directory) is mounted at
    ``/code/``, and the host side of ``oci_key_mnt`` is ``~``-expanded before
    mounting. Supported kinds (case-insensitive): horovod, pytorch, dask, tensorflow.
    Values in ``spec.runtime.spec.args``, if present and not null, are appended as
    strings.

    Parameters
    ----------
    config : dict
        Job run config.
    ini : configparser.ConfigParser
        Parsed config.ini contents.

    Returns
    -------
    int
        ``1`` when the container exits with status 0.

    Raises
    ------
    RuntimeError
        If the cluster kind is unsupported or the command fails.
    """
    cwd = os.path.join(os.getcwd(), ini.get("main", "source_folder"))
    code_mount = os.path.join(cwd, ":/code/")
    key_parts = ini.get("main", "oci_key_mnt").split(":")
    oci_key_mount = os.path.expanduser(key_parts[0]) + ":" + key_parts[1]

    builders = {
        "horovod": horovod_cmd,
        "pytorch": pytorch_cmd,
        "dask": dask_cmd,
        "tensorflow": tensorflow_cmd,
    }
    kind = config["spec"]["cluster"]["kind"]
    builder = builders.get(kind.lower())
    if builder is None:
        raise RuntimeError(f"Framework not supported: {kind}")
    command = builder(code_mount, oci_key_mount, config)

    try:
        args = config["spec"]["runtime"]["spec"]["args"]
    except KeyError:
        args = None
    # ``args:`` left empty in YAML loads as None; treat it like "no args"
    # instead of failing on iteration.
    command += [str(arg) for arg in args or []]

    print("Running: ", " ".join(command))
    proc = run_command(command)
    if proc.returncode != 0:
        raise RuntimeError("Failed to run local")
    return 1
def update_image(config, ini):
    """
    Point the job config at the image recorded in config.ini.

    Sets ``spec.cluster.spec.image`` to ``<registry>:<tag>`` from the ``main``
    section, mutating ``config`` in place.

    Parameters
    ----------
    config : dict
        Job run config.
    ini : configparser.ConfigParser
        Parsed config.ini contents.

    Returns
    -------
    dict
        The same (mutated) ``config``.
    """
    registry = ini.get("main", "registry")
    tag = ini.get("main", "tag")
    cluster_spec = config["spec"]["cluster"]["spec"]
    cluster_spec["image"] = registry + ":" + tag
    return config
def verify_and_publish_image(nopush, config):
    """
    Make sure the job's image is available in the registry, pushing it if needed.

    When ``nopush`` is false the image is always published. Otherwise the registry is
    checked with ``verify_image``. If the image is missing, the user is prompted and
    the image is published on a ``y``/``Y`` answer (surrounding whitespace ignored).

    Parameters
    ----------
    nopush : bool
        Skip the unconditional push.
    config : dict
        Job run config providing ``spec.cluster.spec.image``.

    Returns
    -------
    int
        ``1`` when the image is (or has just been made) available.

    Raises
    ------
    RuntimeError
        If the image is missing and the user declines to push it.
    """
    image = config["spec"]["cluster"]["spec"]["image"]
    if not nopush:
        publish_image_cmd(image)
        return 1
    if verify_image(image):
        return 1

    print("\u26A0 Image: " + image + " does not exist in registry")
    print("In order to push the image to registry enter Y else N ")
    answer = input("[Y/N]\n")
    # Previously only an exact "Y" was accepted; "y" or "Y " aborted the run.
    if answer.strip().upper() == "Y":
        print("\u2705 pushing image to registry")
        publish_image_cmd(image)
        return 1
    raise RuntimeError(
        "Stopping the execution as image doesn't exist in OCI registry"
    )
def update_config_image(config):
    """
    Resolve a placeholder image name in the job config from config.ini.

    An image starting with ``@`` is a placeholder and is replaced by
    ``<registry>:<tag>`` from the local INI file (module constant ``ini_file``). Any
    other value, including a missing image, leaves ``config`` untouched.

    Parameters
    ----------
    config : dict
        Job config.

    Returns
    -------
    dict
        ``config``, updated in place when a placeholder was resolved.

    Raises
    ------
    ValueError
        If the image is a placeholder but the INI file does not exist.
    """
    img_name = config.get("spec", {}).get("cluster", {}).get("spec", {}).get("image")
    # A missing image is not a placeholder; previously this crashed on None.startswith.
    if not img_name or not img_name.startswith("@"):
        return config
    if not os.path.isfile(ini_file):
        raise ValueError(
            f"Invalid image name: {img_name} and also not able to locate config.ini file. "
            f"Please update image name in Job config."
        )
    ini = ConfigParser(allow_no_value=True)
    ini.read(ini_file)
    return update_image(config, ini)