# Source code for ads.opctl.backend.local

#!/usr/bin/env python
# -*- coding: utf-8; -*-

# Copyright (c) 2022, 2023 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v 1.0 as shown at

import copy
import json
import os
import runpy
import sys
import tempfile
from concurrent.futures import Future, ThreadPoolExecutor
from time import sleep
from typing import Dict, List, Optional, Union

from oci.data_science.models import PipelineStepRun

from ads.common.auth import AuthContext, create_signer
from ads.common.decorator.runtime_dependency import (
    OptionalDependency,
    runtime_dependency,
)
from ads.common.oci_client import OCIClientFactory
from ads.config import NO_CONTAINER
from ads.model.model_metadata import ModelCustomMetadata
from ads.model.runtime.runtime_info import RuntimeInfo
from ads.opctl import logger
from ads.opctl.backend.base import Backend
from ads.opctl.conda.cmds import _install
from ads.opctl.config.resolver import ConfigResolver
from ads.opctl.constants import (
    DEFAULT_IMAGE_CONDA_DIR,
    DEFAULT_IMAGE_HOME_DIR,
    DEFAULT_IMAGE_SCRIPT_DIR,
    DEFAULT_MODEL_DEPLOYMENT_FOLDER,
    DEFAULT_MODEL_FOLDER,
    DEFAULT_NOTEBOOK_SESSION_CONDA_DIR,
    DEFAULT_NOTEBOOK_SESSION_SPARK_CONF_DIR,
    ML_JOB_GPU_IMAGE,
    ML_JOB_IMAGE,
)
from ads.opctl.distributed.cmds import load_ini, local_run
from ads.opctl.model.cmds import _download_model
from ads.opctl.operator import __operators__
from ads.opctl.operator.common.const import ENV_OPERATOR_ARGS
from ads.opctl.operator.common.operator_loader import OperatorInfo, OperatorLoader
from ads.opctl.operator.runtime import const as operator_runtime_const
from ads.opctl.operator.runtime import runtime as operator_runtime
from ads.opctl.spark.cmds import (
    generate_core_site_properties,
    generate_core_site_properties_str,
)
from ads.opctl.utils import (
    build_image,
    get_docker_client,
    is_in_notebook_session,
    run_command,
    run_container,
)
from ads.pipeline.ads_pipeline import Pipeline, PipelineStep

class CondaPackNotFound(Exception):  # pragma: no cover
    """Raised when the requested conda pack is not present in the local
    `conda_pack_folder` and automatic installation was not requested."""

    pass
[docs] class LocalBackend(Backend): def __init__(self, config: Dict) -> None: """ Initialize a LocalBackend object with given config. Parameters ---------- config: dict dictionary of configurations """ super().__init__(config=config)
[docs] def run(self): if self.config.get("version") == "v1.0": docker_image = self.config["spec"]["Infrastructure"]["spec"]["dockerImage"] # TODO: don't hard code api keys bind_volumes = { os.path.expanduser("~/.oci"): { "bind": os.path.join(DEFAULT_IMAGE_HOME_DIR, ".oci"), "mode": "ro", } } self._run_with_image_v1(bind_volumes) else: bind_volumes = {} if not is_in_notebook_session(): bind_volumes = { os.path.expanduser( os.path.dirname(self.config["execution"]["oci_config"]) ): {"bind": os.path.join(DEFAULT_IMAGE_HOME_DIR, ".oci")} } if self.config["execution"].get("conda_slug", None): exit_code = self._run_with_conda_pack(bind_volumes) elif self.config["execution"].get("image"): exit_code = self._run_with_image(bind_volumes) else: raise ValueError("Either conda pack info or image should be specified.") if exit_code != 0: raise RuntimeError( f"Job did not complete successfully. Exit code: {exit_code}. " f"Run with the --debug argument to view container logs." )
[docs] @runtime_dependency(module="docker", install_from=OptionalDependency.OPCTL) def init_vscode_container(self) -> None: """ Create a .devcontainer.json file for development with VSCode. Returns ------- None """ source_folder = self.config["execution"].get("source_folder", None) if source_folder: source_folder = os.path.abspath(source_folder) if not os.path.exists(source_folder): raise FileNotFoundError( f"source folder {source_folder} does not exist." ) if self.config["execution"].get("gpu", False): image = self.config["execution"].get("image", ML_JOB_GPU_IMAGE) else: image = self.config["execution"].get("image", ML_JOB_IMAGE) oci_config_folder = os.path.expanduser( os.path.dirname(self.config["execution"]["oci_config"]) ) dev_container = { "image": image, "extensions": ["ms-python.python"], "mounts": [ f"source={oci_config_folder},target={os.path.join(DEFAULT_IMAGE_HOME_DIR, '.oci')},type=bind", ], "workspaceMount": f"source={source_folder},target={os.path.join(DEFAULT_IMAGE_HOME_DIR, os.path.basename(source_folder))},type=bind", "workspaceFolder": DEFAULT_IMAGE_HOME_DIR, "name": f"{image}-dev-env", } if image == ML_JOB_IMAGE or image == ML_JOB_GPU_IMAGE: conda_folder = os.path.expanduser( self.config["execution"]["conda_pack_folder"] ) dev_container["mounts"].append( f"source={conda_folder},target={DEFAULT_IMAGE_CONDA_DIR},type=bind" ) dev_container[ "postCreateCommand" ] = "conda init bash && source ~/.bashrc" else: raise ValueError( "`--source-folder` option works with image `ml-job`, `ml-job-gpu` only. Those can be build with `ads opctl build-image`. Please check `ads opctl build-image -h`." 
) else: image = self.config["execution"].get("image", None) if not image: raise ValueError("Image must be specified.") else: dev_container = { "image": image, "mounts": [], "extensions": ["ms-python.python"], } dev_container["containerEnv"] = self.config["execution"].get("env_vars", {}) for k, v in self.config["execution"]["volumes"].items(): dev_container["mounts"].append( f"source={os.path.abspath(k)},target={v['bind']},type=bind" ) try: client = get_docker_client() client.api.inspect_image(image) except docker.errors.ImageNotFound: cmd = None if image == ML_JOB_IMAGE: cmd = "ads opctl build-image job-local" elif image == ML_JOB_GPU_IMAGE: cmd = "ads opctl build-image job-local -g" if cmd: raise RuntimeError( f"Image {image} not found. Please run `{cmd}` to build the image." ) else: raise RuntimeError(f"Image {image} not found.") if source_folder: with open( os.path.join(os.path.abspath(source_folder), ".devcontainer.json"), "w" ) as f: f.write(json.dumps(dev_container, indent=2)) print(f"File {os.path.join(source_folder, '.devcontainer.json')} created.") else: with open(os.path.abspath(".devcontainer.json"), "w") as f: f.write(json.dumps(dev_container, indent=2)) print(f"File {os.path.abspath('.devcontainer.json')} created.")
def _run_with_conda_pack( self, bind_volumes: Dict, extra_cmd: str = "", install: bool = False, conda_uri: str = "", ) -> int: env_vars = self.config["execution"].get("env_vars", {}) slug = self.config["execution"]["conda_slug"] image = self.config["execution"].get("image", None) # bind_volumes is modified in-place and does not need to be returned # it is returned just to be explicit that it is changed during this function call bind_volumes, env_vars = self._check_conda_pack_and_install_if_applicable( slug, bind_volumes, env_vars, install=install, conda_uri=conda_uri ) bind_volumes = self._mount_source_folder_if_exists(bind_volumes) command = self._build_command_for_conda_run(extra_cmd) if is_in_notebook_session(): run_command(command, shell=True) else: conda_pack_path = os.path.join( os.path.expanduser(self.config["execution"]["conda_pack_folder"]), slug ) if os.path.exists(os.path.join(conda_pack_path, "spark-defaults.conf")): env_vars["SPARK_CONF_DIR"] = os.path.join(DEFAULT_IMAGE_CONDA_DIR, slug) f"Running with conda pack in a container with command {command}" ) return self._activate_conda_env_and_run( image, slug, command, bind_volumes, env_vars ) def _build_command_for_conda_run(self, extra_cmd: str = "") -> str: if ConfigResolver(self.config)._is_ads_operator(): if is_in_notebook_session(): curr_dir = os.path.dirname(os.path.abspath(__file__)) script = os.path.abspath( os.path.join(curr_dir, "..", "operators", "") ) else: script = os.path.join(DEFAULT_IMAGE_SCRIPT_DIR, "operators/") command = f"python {script} " if self.config["execution"]["auth"] == "resource_principal": command += "-r " else: entry_script = self.config["execution"].get("entrypoint") if not entry_script: raise ValueError( "An entrypoint script must be specified when running with conda pack. " "Use `--entrypoint`." 
) if not os.path.exists( os.path.join(self.config["execution"]["source_folder"], entry_script) ): raise FileNotFoundError( f"{os.path.join(self.config['execution']['source_folder'], entry_script)} is not found." ) if is_in_notebook_session(): source_folder = os.path.join(self.config["execution"]["source_folder"]) else: source_folder = os.path.join( DEFAULT_IMAGE_SCRIPT_DIR, "operators", os.path.basename(self.config["execution"]["source_folder"]), ) if os.path.splitext(entry_script)[-1] == ".py": command = f"python {os.path.join(source_folder, entry_script)} " if is_in_notebook_session(): command = ( f"source activate {os.path.join(DEFAULT_NOTEBOOK_SESSION_CONDA_DIR, self.config['execution']['conda_slug'])} && " + command ) elif os.path.splitext(entry_script)[-1] == ".sh": command = f"cd {source_folder} && /bin/bash {entry_script} " else: logger.warn( "ML Job only support .py and .sh files." "If you intend to submit to ML Job later, please update file extension." ) command = f"cd {source_folder} && {entry_script} " if self.config["execution"].get("command"): command += f"{self.config['execution']['command']}" command += extra_cmd return command def _run_with_image(self, bind_volumes: Dict) -> int: env_vars = self.config["execution"]["env_vars"] image = self.config["execution"]["image"] if ConfigResolver(self.config)._is_ads_operator(): # running operators command = ( f"python {os.path.join(DEFAULT_IMAGE_SCRIPT_DIR, 'operators/')} " ) entrypoint = None if self.config["execution"].get("command", None): command += f"{self.config['execution']['command']}" else: # in case of running a user image, entrypoint is not required entrypoint = self.config["execution"].get("entrypoint", None) command = self.config["execution"].get("command", None) if self.config["execution"].get("source_folder", None): bind_volumes.update(self._mount_source_folder_if_exists(bind_volumes)) bind_volumes.update(self.config["execution"]["volumes"]) return run_container(image, bind_volumes, env_vars, 
command, entrypoint) def _run_with_image_v1(self, bind_volumes: Dict) -> int: env_vars = [ str(d["name"]) + "=" + str(d["value"]) for d in self.config["spec"]["Runtime"]["spec"]["environmentVariables"] ] image = self.config["spec"]["Infrastructure"]["spec"]["dockerImage"] command = self.config["spec"]["Runtime"]["spec"]["entrypoint"] entrypoint = "python /etc/datascience/operator/" print("looking to bind volume") bind_volumes.update(self.config["spec"]["Framework"]["spec"]["bindVolumes"]) return run_container( image=image, bind_volumes=bind_volumes, env_vars=env_vars, command=command, entrypoint=entrypoint, ) def _check_conda_pack_and_install_if_applicable( self, slug: str, bind_volumes: Dict, env_vars: Dict, install: bool = False, conda_uri: str = None, ) -> Dict: conda_pack_folder = os.path.abspath( os.path.expanduser(self.config["execution"]["conda_pack_folder"]) ) conda_pack_path = os.path.join(conda_pack_folder, slug) if not os.path.exists(conda_pack_path): if install: f"Downloading a `{slug}` to the `{conda_pack_folder}`. If this conda pack is already installed locally in a different location, pass in `conda_pack_folder` to avoid downloading it again." ) _install( conda_uri=conda_uri, conda_pack_folder=conda_pack_folder, oci_config=self.oci_config, oci_profile=self.profile, auth_type=self.auth_type, ) else: raise CondaPackNotFound( f"Conda pack {conda_pack_path} not found. Please run `ads opctl conda create` or `ads opctl conda install`." 
) if os.path.exists(os.path.join(conda_pack_path, "spark-defaults.conf")): if not is_in_notebook_session(): env_vars["SPARK_CONF_DIR"] = os.path.join(DEFAULT_IMAGE_CONDA_DIR, slug) # write core_site.xml if self.config["execution"]["auth"] == "api_key": properties = generate_core_site_properties( "api_key", self.config["execution"]["oci_config"], self.config["execution"]["oci_profile"], ) # key path cannot have "~/" oci_config_folder = os.path.expanduser( os.path.dirname(self.config["execution"]["oci_config"]) ) properties[-1] = ( properties[-1][0], os.path.join( DEFAULT_IMAGE_HOME_DIR, ".oci", os.path.relpath( os.path.expanduser(properties[-1][1]), oci_config_folder ), ), ) else: properties = generate_core_site_properties("resource_principal") core_site_str = generate_core_site_properties_str(properties) if is_in_notebook_session(): with open( os.path.join( DEFAULT_NOTEBOOK_SESSION_SPARK_CONF_DIR, "core-site.xml" ), "w", ) as f: f.write(core_site_str) else: with open(os.path.join(conda_pack_path, "core-site.xml"), "w") as f: f.write(core_site_str) bind_volumes[ os.path.abspath( os.path.expanduser( os.path.join(self.config["execution"]["conda_pack_folder"], slug) ) ) ] = {"bind": os.path.join(DEFAULT_IMAGE_CONDA_DIR, slug)} return bind_volumes, env_vars def _mount_source_folder_if_exists(self, bind_volumes: Dict) -> Dict: source_folder = os.path.abspath(self.config["execution"]["source_folder"]) if not os.path.exists(source_folder): raise FileNotFoundError(f"source folder {source_folder} does not exist.") mount_path = os.path.join( DEFAULT_IMAGE_SCRIPT_DIR, "operators", os.path.basename(self.config["execution"]["source_folder"]), ) bind_volumes[source_folder] = {"bind": mount_path} return bind_volumes @staticmethod @runtime_dependency(module="docker", install_from=OptionalDependency.OPCTL) def _activate_conda_env_and_run( image: str, slug: str, command: List[str], bind_volumes: Dict, env_vars: Dict ) -> int: import docker try: client = get_docker_client() 
client.api.inspect_image(image) except docker.errors.ImageNotFound:"Image {image} not found. Attempt building it now....") if image == ML_JOB_IMAGE: build_image("job-local", gpu=False) else: build_image("job-local", gpu=True) with tempfile.TemporaryDirectory(dir=os.path.expanduser("~")) as td: with open(os.path.join(td, ""), "w") as f: f.write( f""" #!/bin/bash source {os.path.join(DEFAULT_IMAGE_CONDA_DIR, slug, 'bin/activate')} {command} """ ) bind_volumes[os.path.join(td, "")] = { "bind": os.path.join(DEFAULT_IMAGE_SCRIPT_DIR, "") } env_vars["conda_slug"] = slug return run_container( image, bind_volumes, env_vars, command=f"bash {os.path.join(DEFAULT_IMAGE_SCRIPT_DIR, '')}", )
class LocalBackendDistributed(LocalBackend):
    def __init__(self, config: Dict) -> None:
        """
        Initialize a LocalBackendDistributed object with given config.
        This serves local single node (docker) testing for Distributed Training.

        Parameters
        ----------
        config: dict
            dictionary of configurations
        """
        # NOTE(review): deliberately does not call super().__init__() — the
        # distributed runner only needs the raw config dict.
        self.config = config

    def run(self) -> None:
        """Run distributed training locally using the distributed cmds helper."""
        local_run(self.config, load_ini())
[docs] class LocalPipelineBackend(Backend): LOG_PREFIX = "Local Pipeline:" DEFAULT_PARALLEL_CONTAINER_MAXIMUM = 4 DEFAULT_STATUS_POLL_INTERVAL_SECONDS = 5 def __init__(self, config: Dict) -> None: """ Initialize a LocalPipelineBackend object with given config. Parameters ---------- config: dict dictionary of configurations """ self.config = config
[docs] def run(self) -> None: pipeline = Pipeline.from_dict(self.config) self._log_orchestration_message(f"Starting pipeline {} locally.") if pipeline.dag: self._log_orchestration_message(f"Pipeline DAG:") for d in pipeline.dag: self._log_orchestration_message(f" {d}") completed_status = {} waiting_steps = {} for s in pipeline.step_details: waiting_steps[] = s futures = {} pipeline_failure = False done = False if "max_parallel_containers" in self.config["infrastructure"]: max_parallel_containers = int( self.config["infrastructure"]["max_parallel_containers"] ) else: max_parallel_containers = min( self.DEFAULT_PARALLEL_CONTAINER_MAXIMUM, os.cpu_count() ) logger.warn( f"max_parallel_containers not specified in the config. Defaulting to {max_parallel_containers}." " Run `ads opctl configure` to define your local backend config." ) poll_interval_seconds = int( self.config["infrastructure"].get( "pipeline_status_poll_interval_seconds", self.DEFAULT_STATUS_POLL_INTERVAL_SECONDS, ) ) with ThreadPoolExecutor(max_workers=max_parallel_containers) as executor: while not done: # Check if any running steps have completed for s in list(futures): if futures[s].done(): if futures[s].exception() is None: self._log_orchestration_message( f"Step {s} completed successfully." ) completed_status[ s ] = PipelineStepRun.LIFECYCLE_STATE_SUCCEEDED else: pipeline_failure = True self._log_orchestration_message(f"Step {s} failed:") logger.error(futures[s].exception()) completed_status[s] = PipelineStepRun.LIFECYCLE_STATE_FAILED del futures[s] for s in list(waiting_steps): # Cancel all waiting steps if a failure is encountered if pipeline_failure: self._log_orchestration_message( f"Skipping step {s} - pipeline failure encountered." 
) completed_status[s] = PipelineStepRun.LIFECYCLE_STATE_SKIPPED del waiting_steps[s] continue # Start a waiting step if all of its dependencies have completed successfully completed_deps = [ dep for dep in waiting_steps[s].depends_on if dep in completed_status ] if len(waiting_steps[s].depends_on) == len(completed_deps): self._log_orchestration_message(f"Starting step {s}") futures[s] = self._start_pipeline_step( waiting_steps[s], executor ) del waiting_steps[s] if len(completed_status) == len(pipeline.step_details): done = True else: sleep(poll_interval_seconds) self._log_orchestration_message("Pipeline run complete!") self._log_orchestration_message("Summary:") for step in pipeline.step_details: self._log_orchestration_message( f" {} - {completed_status[]}" )
def _start_pipeline_step( self, step: PipelineStep, executor: ThreadPoolExecutor ) -> Future: """ Starts a single pipeline step. Parameters ---------- step: PipelineStep The pipeline step to start executor: ThreadPoolExecutor The executor used to run the pipeline step. Returns ------- future: Future The Future that can be used to query the status of the pipeline step. """ step_config = self._create_step_config(step) # Have a local job backend execute the step using the updated step config local_job = LocalBackend(step_config) return executor.submit( def _create_step_config(self, pipeline_step: PipelineStep) -> Dict: """ Creates the config for local execution of an individual pipeline step. Parameters ---------- pipeline_step: PipelineStep The pipeline step whose config should be generated Returns ------- step_config: Dict The config for the individual pipeline step. """ if pipeline_step.kind.upper() != "CUSTOM_SCRIPT": raise ValueError( f"Step {} has unsupported kind. " f"Local pipeline execution only supports pipeline steps with kind customScript." ) step_config = copy.deepcopy(self.config) step_config["kind"] = pipeline_step.kind step_config["type"] = pipeline_step.type del step_config["spec"] step_execution_config = step_config["execution"] step_execution_config["conda_slug"] = pipeline_step.runtime.conda["slug"] step_execution_config["env_vars"] = pipeline_step.runtime.envs if pipeline_step.runtime.type == "script": step_execution_config["entrypoint"] = pipeline_step.runtime.script_uri elif pipeline_step.runtime.type == "python": step_execution_config["entrypoint"] = pipeline_step.runtime.script_uri step_execution_config["source_folder"] = pipeline_step.runtime.working_dir elif pipeline_step.runtime.type == "notebook": step_execution_config["entrypoint"] = pipeline_step.runtime.notebook_uri else: raise ValueError( f"Step {} has unsupported runtime. 
" f"Supported values are: script, python, notebook" ) if not step_execution_config.get("source_folder"): logger.warn( "No source_folder provided; defaulting to the current working directory. To specify a source" "folder for all pipeline steps, use the --source-folder parameter. To specify a source folder" "for individual steps, use a runtime with type python and specify the workingDir property." ) step_execution_config["source_folder"] = os.getcwd() ConfigResolver(step_config).process() return step_config def _log_orchestration_message(self, str: str) -> None: """ Logs a message related to pipeline run orchestration Parameters ---------- str: str The message to log """"{self.LOG_PREFIX}: {str}")
[docs] class LocalModelDeploymentBackend(LocalBackend): def __init__(self, config: Dict) -> None: """ Initialize a LocalModelDeploymentBackend object with given config. Parameters ---------- config: dict dictionary of configurations """ super().__init__(config) self.oci_auth = create_signer( self.auth_type, self.oci_config, self.profile, ) self.client = OCIClientFactory(**self.oci_auth).data_science
[docs] def predict(self) -> None: """ Conducts local verify. Returns ------- None Nothing. """ # model artifact in artifact directory artifact_directory = self.config["execution"].get("artifact_directory") ocid = self.config["execution"].get("ocid") model_folder = os.path.expanduser( self.config["execution"].get("model_save_folder", DEFAULT_MODEL_FOLDER) ) artifact_directory = artifact_directory or os.path.join(model_folder, str(ocid)) if ocid and ( not os.path.exists(artifact_directory) or len(os.listdir(artifact_directory)) == 0 ): region = self.config["execution"].get("region", None) bucket_uri = self.config["execution"].get("bucket_uri", None) timeout = self.config["execution"].get("timeout", None) f"No cached model found. Downloading the model {ocid} to {artifact_directory}. If you already have a copy of the model, specify `artifact_directory` instead of `ocid`. You can specify `model_save_folder` to decide where to store the model artifacts." ) _download_model( auth=self.auth_type, profile=self.profile, ocid=ocid, artifact_directory=artifact_directory, region=region, bucket_uri=bucket_uri, timeout=timeout, force_overwrite=True, ) # conda conda_slug = self.config["execution"].get("conda_slug") conda_path = self.config["execution"].get("conda_path") if not conda_slug and not conda_path and ocid: conda_slug, conda_path = self._get_conda_info_from_custom_metadata(ocid) if not conda_slug and not conda_path: conda_slug, conda_path = self._get_conda_info_from_runtime( artifact_dir=artifact_directory ) if "conda_slug" not in self.config["execution"]: self.config["execution"]["conda_slug"] = ( conda_path.split("/")[-1] if conda_path else conda_slug ) self.config["execution"]["image"] = ML_JOB_IMAGE # bind_volumnes bind_volumes = {} SCRIPT = "" dir_path = os.path.dirname(os.path.realpath(__file__)) if not is_in_notebook_session(): bind_volumes = { os.path.expanduser( os.path.dirname(self.config["execution"]["oci_config"]) ): {"bind": 
os.path.join(DEFAULT_IMAGE_HOME_DIR, ".oci")} } self.config["execution"]["source_folder"] = os.path.abspath( os.path.join(dir_path, "..") ) self.config["execution"]["entrypoint"] = SCRIPT bind_volumes[artifact_directory] = {"bind": DEFAULT_MODEL_DEPLOYMENT_FOLDER} # extra cmd data = self.config["execution"].get("payload") extra_cmd = f"--payload '{data}' " + f"--auth {self.auth_type} " if self.auth_type != "resource_principal": extra_cmd += f"--profile {self.profile}" if is_in_notebook_session() or NO_CONTAINER: # _run_with_conda_pack has code to handle notebook session case, # however, it activate the conda pack and then run the script. # For the deployment, we just take the current conda env and run it. # Hence we just handle the notebook case directly here. script_path = os.path.join(os.path.join(dir_path, ".."), SCRIPT) cmd = ( f"python {script_path} " + f"--artifact-directory {artifact_directory} " + extra_cmd )"Running in a notebook or NO_CONTAINER with command {cmd}") run_command(cmd=cmd, shell=True) else: extra_cmd = ( f"--artifact-directory {DEFAULT_MODEL_DEPLOYMENT_FOLDER} " + extra_cmd ) exit_code = self._run_with_conda_pack( bind_volumes, extra_cmd, install=True, conda_uri=conda_path ) if exit_code != 0: raise RuntimeError( f"`predict` did not complete successfully. Exit code: {exit_code}. " f"Run with the --debug argument to view container logs." )
def _get_conda_info_from_custom_metadata(self, ocid): """ Get conda env info from custom metadata from model catalog. Returns ------- (str, str) conda slug and conda path. """ response = self.client.get_model(ocid) custom_metadata = ModelCustomMetadata._from_oci_metadata( ) conda_slug, conda_path = None, None if "CondaEnvironmentPath" in custom_metadata.keys: conda_path = custom_metadata["CondaEnvironmentPath"].value if "SlugName" in custom_metadata.keys: conda_slug = custom_metadata["SlugName"].value return conda_slug, conda_path @staticmethod def _get_conda_info_from_runtime(artifact_dir): """ Get conda env info from runtime yaml file. Returns ------- (str, str) conda slug and conda path. """ runtime_yaml_file = os.path.join(artifact_dir, "runtime.yaml") runtime_info = RuntimeInfo.from_yaml(uri=runtime_yaml_file) conda_slug = ( runtime_info.model_deployment.inference_conda_env.inference_env_slug ) conda_path = ( runtime_info.model_deployment.inference_conda_env.inference_env_path ) return conda_slug, conda_path
[docs] class LocalOperatorBackend(Backend): """ The local operator backend to execute operator in the local environment. Currently supported two scenarios: * Running operator within local conda environment. * Running operator within local container. Attributes ---------- runtime_config: (Dict) The runtime config for the operator. operator_config: (Dict) The operator specification config. operator_type: str The type of the operator. operator_info: OperatorInfo The detailed information about the operator. """ def __init__( self, config: Optional[Dict], operator_info: OperatorInfo = None ) -> None: """ Instantiates the operator backend. Parameters ---------- config: (Dict) The configuration file containing operator's specification details and execution section. operator_info: (OperatorInfo, optional) The operator's detailed information extracted from the operator.__init__ file. Will be extracted from the operator type in case if not provided. """ super().__init__(config=config or {}) self.runtime_config = self.config.get("runtime", {}) self.operator_config = { **{ key: value for key, value in self.config.items() if key not in ("runtime", "infrastructure", "execution") } } self.operator_type = self.operator_config.get("type") self._RUNTIME_MAP = { operator_runtime.ContainerRuntime.type: self._run_with_container, operator_runtime.PythonRuntime.type: self._run_with_python, } self.operator_info = operator_info def _run_with_python(self, **kwargs: Dict) -> int: """Runs the operator within a local python environment. Returns ------- int The operator's run exit code. 
""" # build runtime object runtime = operator_runtime.PythonRuntime.from_dict( self.runtime_config, ignore_unknown=True ) # run operator operator_spec = json.dumps(self.operator_config) sys.argv = [self.operator_info.type, "--spec", operator_spec]"{'*' * 50} Runtime Config {'*' * 50}") try: runpy.run_module(self.operator_info.type, run_name="__main__") except SystemExit as exception: return exception.code else: return 0 def _run_with_container(self, **kwargs: Dict) -> int: """Runs the operator within a container. Returns ------- int The operator's run exit code. """ # build runtime object runtime: operator_runtime.ContainerRuntime = ( operator_runtime.ContainerRuntime.from_dict( self.runtime_config, ignore_unknown=True ) ) # prepare environment variables env_vars = { **{env["name"]: env["value"] for env in runtime.spec.env}, ENV_OPERATOR_ARGS: json.dumps(self.operator_config), } # prepare container volumes bind_volumes = {} for volume in runtime.spec.volume: host_path, container_path = volume.split(":") bind_volumes[host_path.lstrip().rstrip()] = { "bind": container_path.lstrip().rstrip() }"{'*' * 50} Runtime Config {'*' * 50}") return run_container( image=runtime.spec.image, bind_volumes=bind_volumes, env_vars=env_vars, command=f"'python3 -m {self.operator_info.type}'", )
[docs] def run(self, **kwargs: Dict) -> Dict: """Runs the operator.""" # extract runtime runtime_type = self.runtime_config.get( "type", operator_runtime.OPERATOR_LOCAL_RUNTIME_TYPE.PYTHON ) if runtime_type not in self._RUNTIME_MAP: raise RuntimeError( f"Not supported runtime - {runtime_type} for local backend. " f"Supported values: {self._RUNTIME_MAP.keys()}" ) if not self.operator_info: self.operator_info = OperatorLoader.from_uri(self.operator_type).load() if self.config.get("dry_run"): "The dry run option is not supported for " "the local backends and will be ignored." ) # run operator with provided runtime exit_code = self._RUNTIME_MAP.get(runtime_type, lambda: None)(**kwargs) if exit_code != 0: raise RuntimeError( f"Operation did not complete successfully. Exit code: {exit_code}. " f"Run with the --debug argument to view logs." )
[docs] def init( self, uri: Union[str, None] = None, overwrite: bool = False, runtime_type: Union[str, None] = None, **kwargs: Dict, ) -> Union[str, None]: """Generates a starter YAML specification for the operator local runtime. Parameters ---------- overwrite: (bool, optional). Defaults to False. Overwrites the result specification YAML if exists. uri: (str, optional), Defaults to None. The filename to save the resulting specification template YAML. runtime_type: (str, optional). Defaults to None. The resource runtime type. **kwargs: Dict The optional arguments. Returns ------- Union[str, None] The YAML specification for the given resource if `uri` was not provided. `None` otherwise. """ runtime_type = runtime_type or operator_runtime.ContainerRuntime.type if runtime_type not in operator_runtime_const.RUNTIME_TYPE_MAP: raise ValueError( f"Not supported runtime type {runtime_type}. " f"Supported values: {operator_runtime_const.RUNTIME_TYPE_MAP.keys()}" ) RUNTIME_KWARGS_MAP = { operator_runtime.ContainerRuntime.type: { "image": f"{self.operator_config['type']}:{self.operator_config['version']}", "volume": [ os.path.expanduser( os.path.dirname(self.config["execution"]["oci_config"]) ) + ":" + "/root/.oci" ], "env": [ { "name": "operator", "value": f"{self.operator_config['type']}:{self.operator_config['version']}", } ], }, operator_runtime.PythonRuntime.type: {}, } with AuthContext(auth=self.auth_type, profile=self.profile): note = ( "# This YAML specification was auto generated by the " "`ads operator init` command.\n" "# The more details about the operator's runtime YAML " "specification can be found in the ADS documentation:\n" "# \n\n" ) return ( operator_runtime_const.RUNTIME_TYPE_MAP[runtime_type] .init(**RUNTIME_KWARGS_MAP[runtime_type]) .to_yaml( uri=uri, overwrite=overwrite, note=note, **kwargs, ) )