Source code for ads.opctl.backend.local

#!/usr/bin/env python
# -*- coding: utf-8; -*-

# Copyright (c) 2022, 2023 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/

import copy
import json
import os
import runpy
import sys
import tempfile
from concurrent.futures import Future, ThreadPoolExecutor
from time import sleep
from typing import Dict, List, Optional, Union

from oci.data_science.models import PipelineStepRun

from ads.common.auth import AuthContext, create_signer
from ads.common.decorator.runtime_dependency import (
    OptionalDependency,
    runtime_dependency,
)
from ads.common.oci_client import OCIClientFactory
from ads.config import NO_CONTAINER
from ads.model.model_metadata import ModelCustomMetadata
from ads.model.runtime.runtime_info import RuntimeInfo
from ads.opctl import logger
from ads.opctl.backend.base import Backend
from ads.opctl.conda.cmds import _install
from ads.opctl.config.resolver import ConfigResolver
from ads.opctl.constants import (
    DEFAULT_IMAGE_CONDA_DIR,
    DEFAULT_IMAGE_HOME_DIR,
    DEFAULT_IMAGE_SCRIPT_DIR,
    DEFAULT_MODEL_DEPLOYMENT_FOLDER,
    DEFAULT_MODEL_FOLDER,
    DEFAULT_NOTEBOOK_SESSION_CONDA_DIR,
    DEFAULT_NOTEBOOK_SESSION_SPARK_CONF_DIR,
    ML_JOB_GPU_IMAGE,
    ML_JOB_IMAGE,
)
from ads.opctl.distributed.cmds import load_ini, local_run
from ads.opctl.model.cmds import _download_model
from ads.opctl.operator import __operators__
from ads.opctl.operator.common.const import ENV_OPERATOR_ARGS
from ads.opctl.operator.common.operator_loader import OperatorInfo, OperatorLoader
from ads.opctl.operator.runtime import const as operator_runtime_const
from ads.opctl.operator.runtime import runtime as operator_runtime
from ads.opctl.spark.cmds import (
    generate_core_site_properties,
    generate_core_site_properties_str,
)
from ads.opctl.utils import (
    build_image,
    get_docker_client,
    is_in_notebook_session,
    run_command,
    run_container,
)
from ads.pipeline.ads_pipeline import Pipeline, PipelineStep


class CondaPackNotFound(Exception):  # pragma: no cover
    pass

class LocalBackend(Backend):
    def __init__(self, config: Dict) -> None:
        """
        Initialize a LocalBackend object with given config.

        Parameters
        ----------
        config: dict
            dictionary of configurations
        """
        super().__init__(config=config)

    def run(self):
        if self.config.get("version") == "v1.0":
            docker_image = self.config["spec"]["Infrastructure"]["spec"]["dockerImage"]
            # TODO: don't hard code api keys
            bind_volumes = {
                os.path.expanduser("~/.oci"): {
                    "bind": os.path.join(DEFAULT_IMAGE_HOME_DIR, ".oci"),
                    "mode": "ro",
                }
            }
            self._run_with_image_v1(bind_volumes)
        else:
            bind_volumes = {}
            if not is_in_notebook_session():
                bind_volumes = {
                    os.path.expanduser(
                        os.path.dirname(self.config["execution"]["oci_config"])
                    ): {"bind": os.path.join(DEFAULT_IMAGE_HOME_DIR, ".oci")}
                }
            if self.config["execution"].get("conda_slug", None):
                exit_code = self._run_with_conda_pack(bind_volumes)
            elif self.config["execution"].get("image"):
                exit_code = self._run_with_image(bind_volumes)
            else:
                raise ValueError("Either conda pack info or image should be specified.")

            if exit_code != 0:
                raise RuntimeError(
                    f"Job did not complete successfully. Exit code: {exit_code}. "
                    f"Run with the --debug argument to view container logs."
                )

    @runtime_dependency(module="docker", install_from=OptionalDependency.OPCTL)
    def init_vscode_container(self) -> None:
        """
        Create a .devcontainer.json file for development with VSCode.

        Returns
        -------
        None
        """
        source_folder = self.config["execution"].get("source_folder", None)
        if source_folder:
            source_folder = os.path.abspath(source_folder)
            if not os.path.exists(source_folder):
                raise FileNotFoundError(
                    f"source folder {source_folder} does not exist."
                )
            if self.config["execution"].get("gpu", False):
                image = self.config["execution"].get("image", ML_JOB_GPU_IMAGE)
            else:
                image = self.config["execution"].get("image", ML_JOB_IMAGE)
            oci_config_folder = os.path.expanduser(
                os.path.dirname(self.config["execution"]["oci_config"])
            )
            dev_container = {
                "image": image,
                "extensions": ["ms-python.python"],
                "mounts": [
                    f"source={oci_config_folder},target={os.path.join(DEFAULT_IMAGE_HOME_DIR, '.oci')},type=bind",
                ],
                "workspaceMount": f"source={source_folder},target={os.path.join(DEFAULT_IMAGE_HOME_DIR, os.path.basename(source_folder))},type=bind",
                "workspaceFolder": DEFAULT_IMAGE_HOME_DIR,
                "name": f"{image}-dev-env",
            }
            if image == ML_JOB_IMAGE or image == ML_JOB_GPU_IMAGE:
                conda_folder = os.path.expanduser(
                    self.config["execution"]["conda_pack_folder"]
                )
                dev_container["mounts"].append(
                    f"source={conda_folder},target={DEFAULT_IMAGE_CONDA_DIR},type=bind"
                )
                dev_container[
                    "postCreateCommand"
                ] = "conda init bash && source ~/.bashrc"
            else:
                raise ValueError(
                    "`--source-folder` option works with image `ml-job`, `ml-job-gpu` only. "
                    "Those can be built with `ads opctl build-image`. Please check `ads opctl build-image -h`."
                )
        else:
            image = self.config["execution"].get("image", None)
            if not image:
                raise ValueError("Image must be specified.")
            else:
                dev_container = {
                    "image": image,
                    "mounts": [],
                    "extensions": ["ms-python.python"],
                }
        dev_container["containerEnv"] = self.config["execution"].get("env_vars", {})
        for k, v in self.config["execution"]["volumes"].items():
            dev_container["mounts"].append(
                f"source={os.path.abspath(k)},target={v['bind']},type=bind"
            )
        try:
            client = get_docker_client()
            client.api.inspect_image(image)
        except docker.errors.ImageNotFound:
            cmd = None
            if image == ML_JOB_IMAGE:
                cmd = "ads opctl build-image job-local"
            elif image == ML_JOB_GPU_IMAGE:
                cmd = "ads opctl build-image job-local -g"
            if cmd:
                raise RuntimeError(
                    f"Image {image} not found. Please run `{cmd}` to build the image."
                )
            else:
                raise RuntimeError(f"Image {image} not found.")
        if source_folder:
            with open(
                os.path.join(os.path.abspath(source_folder), ".devcontainer.json"), "w"
            ) as f:
                f.write(json.dumps(dev_container, indent=2))
            print(f"File {os.path.join(source_folder, '.devcontainer.json')} created.")
        else:
            with open(os.path.abspath(".devcontainer.json"), "w") as f:
                f.write(json.dumps(dev_container, indent=2))
            print(f"File {os.path.abspath('.devcontainer.json')} created.")
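
    # Illustrative sketch (not part of the original module): the shape of the
    # .devcontainer.json that init_vscode_container() writes for the default
    # ml-job image. Constant values are shown symbolically; the real values
    # come from ads.opctl.constants, and the folders are placeholders.
    #
    #   {
    #     "image": "<ML_JOB_IMAGE>",
    #     "extensions": ["ms-python.python"],
    #     "mounts": [
    #       "source=<oci_config_folder>,target=<DEFAULT_IMAGE_HOME_DIR>/.oci,type=bind",
    #       "source=<conda_pack_folder>,target=<DEFAULT_IMAGE_CONDA_DIR>,type=bind"
    #     ],
    #     "workspaceMount": "source=<source_folder>,target=<DEFAULT_IMAGE_HOME_DIR>/<basename>,type=bind",
    #     "workspaceFolder": "<DEFAULT_IMAGE_HOME_DIR>",
    #     "name": "<ML_JOB_IMAGE>-dev-env",
    #     "postCreateCommand": "conda init bash && source ~/.bashrc",
    #     "containerEnv": {}
    #   }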

    def _run_with_conda_pack(
        self,
        bind_volumes: Dict,
        extra_cmd: str = "",
        install: bool = False,
        conda_uri: str = "",
    ) -> int:
        env_vars = self.config["execution"].get("env_vars", {})
        slug = self.config["execution"]["conda_slug"]
        image = self.config["execution"].get("image", None)

        # bind_volumes is modified in-place and does not need to be returned
        # it is returned just to be explicit that it is changed during this function call
        bind_volumes, env_vars = self._check_conda_pack_and_install_if_applicable(
            slug, bind_volumes, env_vars, install=install, conda_uri=conda_uri
        )
        bind_volumes = self._mount_source_folder_if_exists(bind_volumes)
        command = self._build_command_for_conda_run(extra_cmd)
        if is_in_notebook_session():
            run_command(command, shell=True)
        else:
            conda_pack_path = os.path.join(
                os.path.expanduser(self.config["execution"]["conda_pack_folder"]), slug
            )
            if os.path.exists(os.path.join(conda_pack_path, "spark-defaults.conf")):
                env_vars["SPARK_CONF_DIR"] = os.path.join(DEFAULT_IMAGE_CONDA_DIR, slug)
            logger.info(
                f"Running with conda pack in a container with command {command}"
            )
            return self._activate_conda_env_and_run(
                image, slug, command, bind_volumes, env_vars
            )

    def _build_command_for_conda_run(self, extra_cmd: str = "") -> str:
        if ConfigResolver(self.config)._is_ads_operator():
            if is_in_notebook_session():
                curr_dir = os.path.dirname(os.path.abspath(__file__))
                script = os.path.abspath(
                    os.path.join(curr_dir, "..", "operators", "run.py")
                )
            else:
                script = os.path.join(DEFAULT_IMAGE_SCRIPT_DIR, "operators/run.py")
            command = f"python {script} "
            if self.config["execution"]["auth"] == "resource_principal":
                command += "-r "
        else:
            entry_script = self.config["execution"].get("entrypoint")
            if not entry_script:
                raise ValueError(
                    "An entrypoint script must be specified when running with conda pack. "
                    "Use `--entrypoint`."
                )
            if not os.path.exists(
                os.path.join(self.config["execution"]["source_folder"], entry_script)
            ):
                raise FileNotFoundError(
                    f"{os.path.join(self.config['execution']['source_folder'], entry_script)} is not found."
                )
            if is_in_notebook_session():
                source_folder = os.path.join(self.config["execution"]["source_folder"])
            else:
                source_folder = os.path.join(
                    DEFAULT_IMAGE_SCRIPT_DIR,
                    "operators",
                    os.path.basename(self.config["execution"]["source_folder"]),
                )
            if os.path.splitext(entry_script)[-1] == ".py":
                command = f"python {os.path.join(source_folder, entry_script)} "
                if is_in_notebook_session():
                    command = (
                        f"source activate {os.path.join(DEFAULT_NOTEBOOK_SESSION_CONDA_DIR, self.config['execution']['conda_slug'])} && "
                        + command
                    )
            elif os.path.splitext(entry_script)[-1] == ".sh":
                command = f"cd {source_folder} && /bin/bash {entry_script} "
            else:
                logger.warn(
                    "ML Job only supports .py and .sh files. "
                    "If you intend to submit to ML Job later, please update the file extension."
                )
                command = f"cd {source_folder} && {entry_script} "
        if self.config["execution"].get("command"):
            command += f"{self.config['execution']['command']}"
        command += extra_cmd
        return command

    def _run_with_image(self, bind_volumes: Dict) -> int:
        env_vars = self.config["execution"]["env_vars"]
        image = self.config["execution"]["image"]
        if ConfigResolver(self.config)._is_ads_operator():
            # running operators
            command = (
                f"python {os.path.join(DEFAULT_IMAGE_SCRIPT_DIR, 'operators/run.py')} "
            )
            entrypoint = None
            if self.config["execution"].get("command", None):
                command += f"{self.config['execution']['command']}"
        else:
            # in case of running a user image, entrypoint is not required
            entrypoint = self.config["execution"].get("entrypoint", None)
            command = self.config["execution"].get("command", None)
        if self.config["execution"].get("source_folder", None):
            bind_volumes.update(self._mount_source_folder_if_exists(bind_volumes))
        bind_volumes.update(self.config["execution"]["volumes"])
        return run_container(image, bind_volumes, env_vars, command, entrypoint)

    def _run_with_image_v1(self, bind_volumes: Dict) -> int:
        env_vars = [
            str(d["name"]) + "=" + str(d["value"])
            for d in self.config["spec"]["Runtime"]["spec"]["environmentVariables"]
        ]
        image = self.config["spec"]["Infrastructure"]["spec"]["dockerImage"]
        command = self.config["spec"]["Runtime"]["spec"]["entrypoint"]
        entrypoint = "python /etc/datascience/operator/cluster_helper.py"

        print("looking to bind volume")
        bind_volumes.update(self.config["spec"]["Framework"]["spec"]["bindVolumes"])
        return run_container(
            image=image,
            bind_volumes=bind_volumes,
            env_vars=env_vars,
            command=command,
            entrypoint=entrypoint,
        )

    def _check_conda_pack_and_install_if_applicable(
        self,
        slug: str,
        bind_volumes: Dict,
        env_vars: Dict,
        install: bool = False,
        conda_uri: str = None,
    ) -> Dict:
        conda_pack_folder = os.path.abspath(
            os.path.expanduser(self.config["execution"]["conda_pack_folder"])
        )
        conda_pack_path = os.path.join(conda_pack_folder, slug)
        if not os.path.exists(conda_pack_path):
            if install:
                logger.info(
                    f"Downloading a `{slug}` to the `{conda_pack_folder}`. If this conda pack is already installed locally in a different location, pass in `conda_pack_folder` to avoid downloading it again."
                )
                _install(
                    conda_uri=conda_uri,
                    conda_pack_folder=conda_pack_folder,
                    oci_config=self.oci_config,
                    oci_profile=self.profile,
                    auth_type=self.auth_type,
                )
            else:
                raise CondaPackNotFound(
                    f"Conda pack {conda_pack_path} not found. Please run `ads opctl conda create` or `ads opctl conda install`."
                )
        if os.path.exists(os.path.join(conda_pack_path, "spark-defaults.conf")):
            if not is_in_notebook_session():
                env_vars["SPARK_CONF_DIR"] = os.path.join(DEFAULT_IMAGE_CONDA_DIR, slug)
            # write core_site.xml
            if self.config["execution"]["auth"] == "api_key":
                properties = generate_core_site_properties(
                    "api_key",
                    self.config["execution"]["oci_config"],
                    self.config["execution"]["oci_profile"],
                )
                # key path cannot have "~/"
                oci_config_folder = os.path.expanduser(
                    os.path.dirname(self.config["execution"]["oci_config"])
                )
                properties[-1] = (
                    properties[-1][0],
                    os.path.join(
                        DEFAULT_IMAGE_HOME_DIR,
                        ".oci",
                        os.path.relpath(
                            os.path.expanduser(properties[-1][1]), oci_config_folder
                        ),
                    ),
                )
            else:
                properties = generate_core_site_properties("resource_principal")

            core_site_str = generate_core_site_properties_str(properties)
            if is_in_notebook_session():
                with open(
                    os.path.join(
                        DEFAULT_NOTEBOOK_SESSION_SPARK_CONF_DIR, "core-site.xml"
                    ),
                    "w",
                ) as f:
                    f.write(core_site_str)
            else:
                with open(os.path.join(conda_pack_path, "core-site.xml"), "w") as f:
                    f.write(core_site_str)
        bind_volumes[
            os.path.abspath(
                os.path.expanduser(
                    os.path.join(self.config["execution"]["conda_pack_folder"], slug)
                )
            )
        ] = {"bind": os.path.join(DEFAULT_IMAGE_CONDA_DIR, slug)}
        return bind_volumes, env_vars

    def _mount_source_folder_if_exists(self, bind_volumes: Dict) -> Dict:
        source_folder = os.path.abspath(self.config["execution"]["source_folder"])
        if not os.path.exists(source_folder):
            raise FileNotFoundError(f"source folder {source_folder} does not exist.")
        mount_path = os.path.join(
            DEFAULT_IMAGE_SCRIPT_DIR,
            "operators",
            os.path.basename(self.config["execution"]["source_folder"]),
        )
        bind_volumes[source_folder] = {"bind": mount_path}
        return bind_volumes

    @staticmethod
    @runtime_dependency(module="docker", install_from=OptionalDependency.OPCTL)
    def _activate_conda_env_and_run(
        image: str, slug: str, command: List[str], bind_volumes: Dict, env_vars: Dict
    ) -> int:
        import docker

        try:
            client = get_docker_client()
            client.api.inspect_image(image)
        except docker.errors.ImageNotFound:
            logger.info(f"Image {image} not found. Attempt building it now....")
            if image == ML_JOB_IMAGE:
                build_image("job-local", gpu=False)
            else:
                build_image("job-local", gpu=True)

        with tempfile.TemporaryDirectory(dir=os.path.expanduser("~")) as td:
            with open(os.path.join(td, "entryscript.sh"), "w") as f:
                f.write(
                    f"""
#!/bin/bash
source {os.path.join(DEFAULT_IMAGE_CONDA_DIR, slug, 'bin/activate')}
{command}
"""
                )
            bind_volumes[os.path.join(td, "entryscript.sh")] = {
                "bind": os.path.join(DEFAULT_IMAGE_SCRIPT_DIR, "entryscript.sh")
            }
            env_vars["conda_slug"] = slug
            return run_container(
                image,
                bind_volumes,
                env_vars,
                command=f"bash {os.path.join(DEFAULT_IMAGE_SCRIPT_DIR, 'entryscript.sh')}",
            )
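
# Illustrative usage sketch (not part of the original module): driving
# LocalBackend directly with a hand-built config instead of `ads opctl run`.
# The keys below mirror what run() and _build_command_for_conda_run() read;
# the slug, folders and entrypoint are placeholder values.
#
#   backend = LocalBackend(
#       config={
#           "execution": {
#               "backend": "local",
#               "auth": "api_key",
#               "oci_config": "~/.oci/config",
#               "oci_profile": "DEFAULT",
#               "conda_pack_folder": "~/conda",    # placeholder
#               "conda_slug": "myenv_p38_cpu_v1",  # placeholder
#               "source_folder": "./src",          # placeholder
#               "entrypoint": "main.py",           # placeholder
#               "env_vars": {},
#               "volumes": {},
#           }
#       }
#   )
#   backend.run()  # raises RuntimeError on a non-zero container exit code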

class LocalBackendDistributed(LocalBackend):
    def __init__(self, config: Dict) -> None:
        """
        Initialize a LocalBackendDistributed object with given config.
        This serves local single-node (Docker) testing for Distributed Training.

        Parameters
        ----------
        config: dict
            dictionary of configurations
        """
        self.config = config

    def run(self):
        local_run(self.config, load_ini())
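
# Illustrative sketch (not part of the original module): LocalBackendDistributed
# simply forwards the full config, plus the ini settings returned by load_ini(),
# to local_run(), e.g.
#
#   LocalBackendDistributed(config=distributed_config).run()
#
# where `distributed_config` is a placeholder for the parsed distributed
# training YAML that `ads opctl run` would otherwise assemble.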

class LocalPipelineBackend(Backend):
    LOG_PREFIX = "Local Pipeline:"
    DEFAULT_PARALLEL_CONTAINER_MAXIMUM = 4
    DEFAULT_STATUS_POLL_INTERVAL_SECONDS = 5

    def __init__(self, config: Dict) -> None:
        """
        Initialize a LocalPipelineBackend object with given config.

        Parameters
        ----------
        config: dict
            dictionary of configurations
        """
        self.config = config

    def run(self) -> None:
        pipeline = Pipeline.from_dict(self.config)
        self._log_orchestration_message(f"Starting pipeline {pipeline.name} locally.")
        if pipeline.dag:
            self._log_orchestration_message("Pipeline DAG:")
            for d in pipeline.dag:
                self._log_orchestration_message(f" {d}")

        completed_status = {}
        waiting_steps = {}
        for s in pipeline.step_details:
            waiting_steps[s.name] = s
        futures = {}
        pipeline_failure = False
        done = False

        if "max_parallel_containers" in self.config["infrastructure"]:
            max_parallel_containers = int(
                self.config["infrastructure"]["max_parallel_containers"]
            )
        else:
            max_parallel_containers = min(
                self.DEFAULT_PARALLEL_CONTAINER_MAXIMUM, os.cpu_count()
            )
            logger.warn(
                f"max_parallel_containers not specified in the config. Defaulting to {max_parallel_containers}."
                " Run `ads opctl configure` to define your local backend config."
            )

        poll_interval_seconds = int(
            self.config["infrastructure"].get(
                "pipeline_status_poll_interval_seconds",
                self.DEFAULT_STATUS_POLL_INTERVAL_SECONDS,
            )
        )

        with ThreadPoolExecutor(max_workers=max_parallel_containers) as executor:
            while not done:
                # Check if any running steps have completed
                for s in list(futures):
                    if futures[s].done():
                        if futures[s].exception() is None:
                            self._log_orchestration_message(
                                f"Step {s} completed successfully."
                            )
                            completed_status[
                                s
                            ] = PipelineStepRun.LIFECYCLE_STATE_SUCCEEDED
                        else:
                            pipeline_failure = True
                            self._log_orchestration_message(f"Step {s} failed:")
                            logger.error(futures[s].exception())
                            completed_status[s] = PipelineStepRun.LIFECYCLE_STATE_FAILED
                        del futures[s]

                for s in list(waiting_steps):
                    # Cancel all waiting steps if a failure is encountered
                    if pipeline_failure:
                        self._log_orchestration_message(
                            f"Skipping step {s} - pipeline failure encountered."
                        )
                        completed_status[s] = PipelineStepRun.LIFECYCLE_STATE_SKIPPED
                        del waiting_steps[s]
                        continue

                    # Start a waiting step if all of its dependencies have completed successfully
                    completed_deps = [
                        dep
                        for dep in waiting_steps[s].depends_on
                        if dep in completed_status
                    ]
                    if len(waiting_steps[s].depends_on) == len(completed_deps):
                        self._log_orchestration_message(f"Starting step {s}")
                        futures[s] = self._start_pipeline_step(
                            waiting_steps[s], executor
                        )
                        del waiting_steps[s]

                if len(completed_status) == len(pipeline.step_details):
                    done = True
                else:
                    sleep(poll_interval_seconds)

        self._log_orchestration_message("Pipeline run complete!")
        self._log_orchestration_message("Summary:")
        for step in pipeline.step_details:
            self._log_orchestration_message(
                f" {step.name} - {completed_status[step.name]}"
            )

    def _start_pipeline_step(
        self, step: PipelineStep, executor: ThreadPoolExecutor
    ) -> Future:
        """
        Starts a single pipeline step.

        Parameters
        ----------
        step: PipelineStep
            The pipeline step to start.
        executor: ThreadPoolExecutor
            The executor used to run the pipeline step.

        Returns
        -------
        future: Future
            The Future that can be used to query the status of the pipeline step.
        """
        step_config = self._create_step_config(step)

        # Have a local job backend execute the step using the updated step config
        local_job = LocalBackend(step_config)
        return executor.submit(local_job.run)

    def _create_step_config(self, pipeline_step: PipelineStep) -> Dict:
        """
        Creates the config for local execution of an individual pipeline step.

        Parameters
        ----------
        pipeline_step: PipelineStep
            The pipeline step whose config should be generated.

        Returns
        -------
        step_config: Dict
            The config for the individual pipeline step.
        """
        if pipeline_step.kind.upper() != "CUSTOM_SCRIPT":
            raise ValueError(
                f"Step {pipeline_step.name} has unsupported kind. "
                f"Local pipeline execution only supports pipeline steps with kind customScript."
            )

        step_config = copy.deepcopy(self.config)
        step_config["kind"] = pipeline_step.kind
        step_config["type"] = pipeline_step.type
        del step_config["spec"]
        step_execution_config = step_config["execution"]
        step_execution_config["conda_slug"] = pipeline_step.runtime.conda["slug"]
        step_execution_config["env_vars"] = pipeline_step.runtime.envs

        if pipeline_step.runtime.type == "script":
            step_execution_config["entrypoint"] = pipeline_step.runtime.script_uri
        elif pipeline_step.runtime.type == "python":
            step_execution_config["entrypoint"] = pipeline_step.runtime.script_uri
            step_execution_config["source_folder"] = pipeline_step.runtime.working_dir
        elif pipeline_step.runtime.type == "notebook":
            step_execution_config["entrypoint"] = pipeline_step.runtime.notebook_uri
        else:
            raise ValueError(
                f"Step {pipeline_step.name} has unsupported runtime. "
                f"Supported values are: script, python, notebook"
            )

        if not step_execution_config.get("source_folder"):
            logger.warn(
                "No source_folder provided; defaulting to the current working directory. To specify a source "
                "folder for all pipeline steps, use the --source-folder parameter. To specify a source folder "
                "for individual steps, use a runtime with type python and specify the workingDir property."
            )
            step_execution_config["source_folder"] = os.getcwd()

        ConfigResolver(step_config).process()
        return step_config

    def _log_orchestration_message(self, str: str) -> None:
        """
        Logs a message related to pipeline run orchestration.

        Parameters
        ----------
        str: str
            The message to log.
        """
        logger.info(f"{self.LOG_PREFIX}: {str}")
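
# Illustrative sketch (not part of the original module): the infrastructure
# keys read by LocalPipelineBackend.run() when orchestrating steps locally.
# Both values below are placeholders; when omitted, the defaults defined on
# the class are used instead.
#
#   config["infrastructure"] = {
#       "max_parallel_containers": 2,                 # placeholder
#       "pipeline_status_poll_interval_seconds": 10,  # placeholder
#   }
#   LocalPipelineBackend(config).run()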

class LocalModelDeploymentBackend(LocalBackend):
    def __init__(self, config: Dict) -> None:
        """
        Initialize a LocalModelDeploymentBackend object with given config.

        Parameters
        ----------
        config: dict
            dictionary of configurations
        """
        super().__init__(config)
        self.oci_auth = create_signer(
            self.auth_type,
            self.oci_config,
            self.profile,
        )
        self.client = OCIClientFactory(**self.oci_auth).data_science

    def predict(self) -> None:
        """
        Conducts a local verify (prediction) run.

        Returns
        -------
        None
            Nothing.
        """
        # model artifact in artifact directory
        artifact_directory = self.config["execution"].get("artifact_directory")
        ocid = self.config["execution"].get("ocid")

        model_folder = os.path.expanduser(
            self.config["execution"].get("model_save_folder", DEFAULT_MODEL_FOLDER)
        )
        artifact_directory = artifact_directory or os.path.join(model_folder, str(ocid))
        if ocid and (
            not os.path.exists(artifact_directory)
            or len(os.listdir(artifact_directory)) == 0
        ):
            region = self.config["execution"].get("region", None)
            bucket_uri = self.config["execution"].get("bucket_uri", None)
            timeout = self.config["execution"].get("timeout", None)
            logger.info(
                f"No cached model found. Downloading the model {ocid} to {artifact_directory}. If you already have a copy of the model, specify `artifact_directory` instead of `ocid`. You can specify `model_save_folder` to decide where to store the model artifacts."
            )
            _download_model(
                auth=self.auth_type,
                profile=self.profile,
                ocid=ocid,
                artifact_directory=artifact_directory,
                region=region,
                bucket_uri=bucket_uri,
                timeout=timeout,
                force_overwrite=True,
            )

        # conda
        conda_slug = self.config["execution"].get("conda_slug")
        conda_path = self.config["execution"].get("conda_path")
        if not conda_slug and not conda_path and ocid:
            conda_slug, conda_path = self._get_conda_info_from_custom_metadata(ocid)
        if not conda_slug and not conda_path:
            conda_slug, conda_path = self._get_conda_info_from_runtime(
                artifact_dir=artifact_directory
            )
        if "conda_slug" not in self.config["execution"]:
            self.config["execution"]["conda_slug"] = (
                conda_path.split("/")[-1] if conda_path else conda_slug
            )
        self.config["execution"]["image"] = ML_JOB_IMAGE

        # bind volumes
        bind_volumes = {}
        SCRIPT = "script.py"
        dir_path = os.path.dirname(os.path.realpath(__file__))
        if not is_in_notebook_session():
            bind_volumes = {
                os.path.expanduser(
                    os.path.dirname(self.config["execution"]["oci_config"])
                ): {"bind": os.path.join(DEFAULT_IMAGE_HOME_DIR, ".oci")}
            }

            self.config["execution"]["source_folder"] = os.path.abspath(
                os.path.join(dir_path, "..")
            )
            self.config["execution"]["entrypoint"] = SCRIPT
            bind_volumes[artifact_directory] = {"bind": DEFAULT_MODEL_DEPLOYMENT_FOLDER}

        # extra cmd
        data = self.config["execution"].get("payload")
        extra_cmd = f"--payload '{data}' " + f"--auth {self.auth_type} "
        if self.auth_type != "resource_principal":
            extra_cmd += f"--profile {self.profile}"

        if is_in_notebook_session() or NO_CONTAINER:
            # _run_with_conda_pack has code to handle the notebook session case;
            # however, it activates the conda pack and then runs the script.
            # For the deployment, we just take the current conda env and run it.
            # Hence we handle the notebook case directly here.
            script_path = os.path.join(os.path.join(dir_path, ".."), SCRIPT)
            cmd = (
                f"python {script_path} "
                + f"--artifact-directory {artifact_directory} "
                + extra_cmd
            )
            logger.info(f"Running in a notebook or NO_CONTAINER with command {cmd}")
            run_command(cmd=cmd, shell=True)
        else:
            extra_cmd = (
                f"--artifact-directory {DEFAULT_MODEL_DEPLOYMENT_FOLDER} " + extra_cmd
            )
            exit_code = self._run_with_conda_pack(
                bind_volumes, extra_cmd, install=True, conda_uri=conda_path
            )
            if exit_code != 0:
                raise RuntimeError(
                    f"`predict` did not complete successfully. Exit code: {exit_code}. "
                    f"Run with the --debug argument to view container logs."
                )

    def _get_conda_info_from_custom_metadata(self, ocid):
        """
        Get conda env info from the custom metadata in the model catalog.

        Returns
        -------
        (str, str)
            conda slug and conda path.
        """
        response = self.client.get_model(ocid)
        custom_metadata = ModelCustomMetadata._from_oci_metadata(
            response.data.custom_metadata_list
        )
        conda_slug, conda_path = None, None
        if "CondaEnvironmentPath" in custom_metadata.keys:
            conda_path = custom_metadata["CondaEnvironmentPath"].value
        if "SlugName" in custom_metadata.keys:
            conda_slug = custom_metadata["SlugName"].value
        return conda_slug, conda_path

    @staticmethod
    def _get_conda_info_from_runtime(artifact_dir):
        """
        Get conda env info from the runtime yaml file.

        Returns
        -------
        (str, str)
            conda slug and conda path.
        """
        runtime_yaml_file = os.path.join(artifact_dir, "runtime.yaml")
        runtime_info = RuntimeInfo.from_yaml(uri=runtime_yaml_file)
        conda_slug = (
            runtime_info.model_deployment.inference_conda_env.inference_env_slug
        )
        conda_path = (
            runtime_info.model_deployment.inference_conda_env.inference_env_path
        )
        return conda_slug, conda_path
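
# Illustrative sketch (not part of the original module): the execution keys
# read by LocalModelDeploymentBackend.predict(). The OCID and payload are
# placeholders; conda info is resolved from the config, then from the model's
# custom metadata, then from the artifact's runtime.yaml, in that order.
#
#   backend = LocalModelDeploymentBackend(
#       config={
#           "execution": {
#               "auth": "api_key",
#               "oci_config": "~/.oci/config",
#               "oci_profile": "DEFAULT",
#               "conda_pack_folder": "~/conda",        # placeholder
#               "ocid": "ocid1.datasciencemodel...",   # placeholder
#               "payload": '{"data": [[1, 2, 3]]}',    # placeholder
#           }
#       }
#   )
#   backend.predict()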

class LocalOperatorBackend(Backend):
    """
    The local operator backend to execute an operator in the local environment.
    Two scenarios are currently supported:

        * Running the operator within a local conda environment.
        * Running the operator within a local container.

    Attributes
    ----------
    runtime_config: (Dict)
        The runtime config for the operator.
    operator_config: (Dict)
        The operator specification config.
    operator_type: str
        The type of the operator.
    operator_info: OperatorInfo
        The detailed information about the operator.
    """

    def __init__(
        self, config: Optional[Dict], operator_info: OperatorInfo = None
    ) -> None:
        """
        Instantiates the operator backend.

        Parameters
        ----------
        config: (Dict)
            The configuration containing the operator's specification details and execution section.
        operator_info: (OperatorInfo, optional)
            The operator's detailed information extracted from the operator.__init__ file.
            Will be extracted from the operator type if not provided.
        """
        super().__init__(config=config or {})

        self.runtime_config = self.config.get("runtime", {})
        self.operator_config = {
            **{
                key: value
                for key, value in self.config.items()
                if key not in ("runtime", "infrastructure", "execution")
            }
        }
        self.operator_type = self.operator_config.get("type")

        self._RUNTIME_MAP = {
            operator_runtime.ContainerRuntime.type: self._run_with_container,
            operator_runtime.PythonRuntime.type: self._run_with_python,
        }

        self.operator_info = operator_info

    def _run_with_python(self, **kwargs: Dict) -> int:
        """Runs the operator within a local python environment.

        Returns
        -------
        int
            The operator's run exit code.
        """
        # build runtime object
        runtime = operator_runtime.PythonRuntime.from_dict(
            self.runtime_config, ignore_unknown=True
        )

        # run operator
        operator_spec = json.dumps(self.operator_config)
        sys.argv = [self.operator_info.type, "--spec", operator_spec]

        logger.info(f"{'*' * 50} Runtime Config {'*' * 50}")
        logger.info(runtime.to_yaml())

        try:
            runpy.run_module(self.operator_info.type, run_name="__main__")
        except SystemExit as exception:
            return exception.code
        else:
            return 0

    def _run_with_container(self, **kwargs: Dict) -> int:
        """Runs the operator within a container.

        Returns
        -------
        int
            The operator's run exit code.
        """
        # build runtime object
        runtime: operator_runtime.ContainerRuntime = (
            operator_runtime.ContainerRuntime.from_dict(
                self.runtime_config, ignore_unknown=True
            )
        )

        # prepare environment variables
        env_vars = {
            **{env["name"]: env["value"] for env in runtime.spec.env},
            ENV_OPERATOR_ARGS: json.dumps(self.operator_config),
        }

        # prepare container volumes
        bind_volumes = {}
        for volume in runtime.spec.volume:
            host_path, container_path = volume.split(":")
            bind_volumes[host_path.lstrip().rstrip()] = {
                "bind": container_path.lstrip().rstrip()
            }

        logger.info(f"{'*' * 50} Runtime Config {'*' * 50}")
        logger.info(runtime.to_yaml())

        return run_container(
            image=runtime.spec.image,
            bind_volumes=bind_volumes,
            env_vars=env_vars,
            command=f"'python3 -m {self.operator_info.type}'",
        )

    def run(self, **kwargs: Dict) -> Dict:
        """Runs the operator."""

        # extract runtime
        runtime_type = self.runtime_config.get(
            "type", operator_runtime.OPERATOR_LOCAL_RUNTIME_TYPE.PYTHON
        )

        if runtime_type not in self._RUNTIME_MAP:
            raise RuntimeError(
                f"Unsupported runtime - {runtime_type} for local backend. "
                f"Supported values: {self._RUNTIME_MAP.keys()}"
            )

        if not self.operator_info:
            self.operator_info = OperatorLoader.from_uri(self.operator_type).load()

        if self.config.get("dry_run"):
            logger.info(
                "The dry run option is not supported for "
                "the local backends and will be ignored."
            )

        # run operator with provided runtime
        exit_code = self._RUNTIME_MAP.get(runtime_type, lambda: None)(**kwargs)

        if exit_code != 0:
            raise RuntimeError(
                f"Operation did not complete successfully. Exit code: {exit_code}. "
                f"Run with the --debug argument to view logs."
            )

    def init(
        self,
        uri: Union[str, None] = None,
        overwrite: bool = False,
        runtime_type: Union[str, None] = None,
        **kwargs: Dict,
    ) -> Union[str, None]:
        """Generates a starter YAML specification for the operator local runtime.

        Parameters
        ----------
        overwrite: (bool, optional). Defaults to False.
            Overwrites the result specification YAML if it exists.
        uri: (str, optional). Defaults to None.
            The filename to save the resulting specification template YAML.
        runtime_type: (str, optional). Defaults to None.
            The resource runtime type.
        **kwargs: Dict
            The optional arguments.

        Returns
        -------
        Union[str, None]
            The YAML specification for the given resource if `uri` was not provided.
            `None` otherwise.
        """
        runtime_type = runtime_type or operator_runtime.ContainerRuntime.type
        if runtime_type not in operator_runtime_const.RUNTIME_TYPE_MAP:
            raise ValueError(
                f"Unsupported runtime type {runtime_type}. "
                f"Supported values: {operator_runtime_const.RUNTIME_TYPE_MAP.keys()}"
            )

        RUNTIME_KWARGS_MAP = {
            operator_runtime.ContainerRuntime.type: {
                "image": f"{self.operator_config['type']}:{self.operator_config['version']}",
                "volume": [
                    os.path.expanduser(
                        os.path.dirname(self.config["execution"]["oci_config"])
                    )
                    + ":"
                    + "/root/.oci"
                ],
                "env": [
                    {
                        "name": "operator",
                        "value": f"{self.operator_config['type']}:{self.operator_config['version']}",
                    }
                ],
            },
            operator_runtime.PythonRuntime.type: {},
        }

        with AuthContext(auth=self.auth_type, profile=self.profile):
            note = (
                "# This YAML specification was auto generated by the "
                "`ads operator init` command.\n"
                "# More details about the operator's runtime YAML "
                "specification can be found in the ADS documentation:\n"
                "# https://accelerated-data-science.readthedocs.io/en/latest \n\n"
            )

            return (
                operator_runtime_const.RUNTIME_TYPE_MAP[runtime_type]
                .init(**RUNTIME_KWARGS_MAP[runtime_type])
                .to_yaml(
                    uri=uri,
                    overwrite=overwrite,
                    note=note,
                    **kwargs,
                )
            )
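
# Illustrative sketch (not part of the original module): running an operator
# locally with the python runtime. The operator type and version are
# placeholders; the runtime type strings are taken from the runtime classes
# referenced by _RUNTIME_MAP above.
#
#   backend = LocalOperatorBackend(
#       config={
#           "type": "my_operator",  # placeholder
#           "version": "v1",        # placeholder
#           "runtime": {"type": operator_runtime.PythonRuntime.type, "spec": {}},
#           "execution": {"auth": "api_key", "oci_config": "~/.oci/config"},
#       }
#   )
#   backend.run()
#
#   # Or generate a starter runtime YAML for the container runtime:
#   print(backend.init(runtime_type=operator_runtime.ContainerRuntime.type))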