Source code for ads.opctl.backend.ads_ml_pipeline

#!/usr/bin/env python
# -*- coding: utf-8; -*-

# Copyright (c) 2022, 2023 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/

from typing import Dict, Union
from ads.common.auth import create_signer, AuthContext
from ads.common.oci_client import OCIClientFactory
from ads.opctl.backend.base import Backend
from ads.opctl.backend.ads_ml_job import JobRuntimeFactory
from ads.opctl.decorator.common import print_watch_command
from ads.pipeline import Pipeline, PipelineRun, PipelineStep, CustomScriptStep

from ads.jobs import PythonRuntime


[docs] class PipelineBackend(Backend): def __init__(self, config: Dict) -> None: """ Initialize a MLPipeline object given config dictionary. Parameters ---------- config: dict dictionary of configurations """ self.config = config self.oci_auth = create_signer( config["execution"].get("auth"), config["execution"].get("oci_config", None), config["execution"].get("oci_profile", None), ) self.auth_type = config["execution"].get("auth") self.profile = config["execution"].get("oci_profile", None) self.client = OCIClientFactory(**self.oci_auth).data_science
[docs] @print_watch_command def apply(self) -> Dict: """ Create Pipeline and Pipeline Run from YAML. """ with AuthContext(auth=self.auth_type, profile=self.profile): pipeline = Pipeline.from_dict(self.config) pipeline.create() pipeline_run = pipeline.run() print("PIPELINE OCID:", pipeline.id) print("PIPELINE RUN OCID:", pipeline_run.id) return {"job_id": pipeline.id, "run_id": pipeline_run.id}
[docs] @print_watch_command def run(self) -> Dict: """ Create Pipeline and Pipeline Run from OCID. """ pipeline_id = self.config["execution"]["ocid"] with AuthContext(auth=self.auth_type, profile=self.profile): pipeline = Pipeline.from_ocid(ocid=pipeline_id) pipeline_run = pipeline.run() print("PIPELINE OCID:", pipeline.id) print("PIPELINE RUN OCID:", pipeline_run.id) return {"job_id": pipeline.id, "run_id": pipeline_run.id}
[docs] def delete(self) -> None: """ Delete Pipeline or Pipeline Run from OCID. """ if self.config["execution"].get("id"): pipeline_id = self.config["execution"]["id"] with AuthContext(auth=self.auth_type, profile=self.profile): Pipeline.from_ocid(pipeline_id).delete() print(f"Pipeline {pipeline_id} has been deleted.") elif self.config["execution"].get("run_id"): run_id = self.config["execution"]["run_id"] with AuthContext(auth=self.auth_type, profile=self.profile): PipelineRun.from_ocid(run_id).delete() print(f"Pipeline run {run_id} has been deleted.")
[docs] def cancel(self) -> None: """ Cancel Pipeline Run from OCID. """ run_id = self.config["execution"]["run_id"] with AuthContext(auth=self.auth_type, profile=self.profile): PipelineRun.from_ocid(run_id).cancel() print(f"Pipeline run {run_id} has been cancelled.")
[docs] def watch(self) -> None: """ Watch Pipeline Run from OCID. """ run_id = self.config["execution"]["run_id"] log_type = self.config["execution"].get("log_type") interval = self.config["execution"].get("interval") with AuthContext(auth=self.auth_type, profile=self.profile): PipelineRun.from_ocid(run_id).watch( interval=interval, log_type=log_type )
[docs] def init( self, uri: Union[str, None] = None, overwrite: bool = False, runtime_type: Union[str, None] = None, **kwargs: Dict, ) -> Union[str, None]: """Generates a starter YAML specification for an MLPipeline. Parameters ---------- overwrite: (bool, optional). Defaults to False. Overwrites the result specification YAML if exists. uri: (str, optional), Defaults to None. The filename to save the resulting specification template YAML. runtime_type: (str, optional). Defaults to None. The resource runtime type. **kwargs: Dict The optional arguments. Returns ------- Union[str, None] The YAML specification for the given resource if `uri` was not provided. `None` otherwise. """ with AuthContext(auth=self.auth_type, profile=self.profile): # define a pipeline step pipeline_step = ( PipelineStep("pipeline_step_name_1") .with_description("A step running a python script") .with_infrastructure(CustomScriptStep().init()) .with_runtime( JobRuntimeFactory.get_runtime( key=runtime_type or PythonRuntime().type ).init() ) ) # define a pipeline pipeline = ( Pipeline( name="Pipeline Name", spec=(self.config.get("infrastructure", {}) or {}), ) .with_step_details([pipeline_step]) .with_dag(["pipeline_step_name_1"]) .init() ) note = ( "# This YAML specification was auto generated by the `ads opctl init` command.\n" "# The more details about the jobs YAML specification can be found in the ADS documentation:\n" "# https://accelerated-data-science.readthedocs.io/en/latest/user_guide/pipeline/quick_start.html \n\n" ) return pipeline.to_yaml( uri=uri, overwrite=overwrite, note=note, filter_by_attribute_map=True, **kwargs, )