#!/usr/bin/env python
# -*- coding: utf-8; -*-
# Copyright (c) 2022, 2023 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
import copy
import datetime
import logging
import os
import traceback
from typing import Any, Dict, List, Optional
import fsspec
import oci
import oci.util as oci_util
from ads.common import utils
from ads.common.oci_datascience import DSCNotebookSession, OCIDataScienceMixin
from ads.common.oci_logging import OCILog
from ads.common.oci_resource import ResourceNotFoundError
from ads.config import COMPARTMENT_OCID, NB_SESSION_OCID, PROJECT_OCID
from ads.jobs.builders.base import Builder
from ads.jobs.builders.infrastructure.dsc_job import DataScienceJob, DSCJob
from ads.jobs.builders.infrastructure.dsc_job_runtime import (
DataScienceJobRuntimeManager,
)
from ads.jobs.builders.runtimes.artifact import Artifact
from ads.jobs.builders.runtimes.python_runtime import (
GitPythonRuntime,
NotebookRuntime,
PythonRuntime,
ScriptRuntime,
)
from ads.pipeline.ads_pipeline_run import PipelineRun
from ads.pipeline.ads_pipeline_step import PipelineStep
from ads.pipeline.visualizer.base import GraphOrientation, PipelineVisualizer
from ads.pipeline.visualizer.graph_renderer import PipelineGraphRenderer
# Module-level logger for pipeline lifecycle messages.
logger = logging.getLogger(__name__)

# Maximum time (seconds) to wait for a pipeline delete operation to finish.
MAXIMUM_TIMEOUT = 1800

# Default waiter kwargs forwarded to the OCI composite-operation waiter.
DEFAULT_WAITER_KWARGS = {"max_wait_seconds": MAXIMUM_TIMEOUT}

# By default, deleting a pipeline cascades to its runs and related job runs.
DEFAULT_OPERATION_KWARGS = {
    "delete_related_pipeline_runs": True,
    "delete_related_job_runs": True,
}

# Keyword arguments accepted by the OCI delete_pipeline operation.
ALLOWED_OPERATION_KWARGS = [
    "allow_control_chars",
    "retry_strategy",
    "delete_related_job_runs",
    "delete_related_pipeline_runs",
    "if_match",
    "opc_request_id",
]

# Keyword arguments accepted by the OCI waiter.
ALLOWED_WAITER_KWARGS = [
    "max_interval_seconds",
    "max_wait_seconds",
    "succeed_on_not_found",
    "wait_callback",
    "fetch_func",
]
class Pipeline(Builder):
    """Represents a Data Science Machine Learning Pipeline."""

    # --- Spec keys (camelCase, matching the OCI Data Science API payload) ---
    CONST_ID = "id"
    CONST_PIPELINE_ID = "pipelineId"
    CONST_DISPLAY_NAME = "displayName"
    CONST_STEP_DETAILS = "stepDetails"
    CONST_STEP_OVERRIDE_DETAILS = "stepOverrideDetails"
    CONST_COMPARTMENT_ID = "compartmentId"
    CONST_PROJECT_ID = "projectId"
    CONST_LOG_GROUP_ID = "logGroupId"
    CONST_LOG_ID = "logId"
    CONST_SERVICE_LOG_ID = "serviceLogId"
    CONST_ENABLE_SERVICE_LOG = "enableServiceLog"
    CONST_ENVIRONMENT_VARIABLES = "environmentVariables"
    CONST_COMMAND_LINE_ARGUMENTS = "commandLineArguments"
    CONST_CREATED_BY = "createdBy"
    CONST_DESCRIPTION = "description"
    CONST_TYPE = "type"
    CONST_MAXIMUM_RUNTIME_IN_MINUTES = "maximumRuntimeInMinutes"
    CONST_ENABLE_LOGGING = "enableLogging"
    CONST_ENABLE_AUTO_LOG_CREATION = "enableAutoLogCreation"
    CONST_CONFIGURATION_DETAILS = "configurationDetails"
    CONST_CONFIGURATION_OVERRIDE_DETAILS = "configurationOverrideDetails"
    CONST_LOG_CONFIGURATION_DETAILS = "logConfigurationDetails"
    CONST_LOG_CONFIGURATION_OVERRIDE_DETAILS = "logConfigurationOverrideDetails"
    # NOTE: "FREEFROM" is a historical misspelling of "FREEFORM"; kept because
    # the constant is public and callers may reference it by name.
    CONST_FREEFROM_TAGS = "freeformTags"
    CONST_DEFINED_TAGS = "definedTags"
    CONST_SYSTEM_TAGS = "systemTags"
    CONST_INFRA_CONFIG_DETAILS = "infrastructureConfigurationDetails"
    CONST_SHAPE_NAME = "shapeName"
    CONST_BLOCK_STORAGE_SIZE = "blockStorageSizeInGBs"
    CONST_SHAPE_CONFIG_DETAILS = "shapeConfigDetails"
    CONST_OCPUS = "ocpus"
    CONST_MEMORY_IN_GBS = "memoryInGBs"
    CONST_SERVICE_LOG_CATEGORY = "pipelinerunlog"
    CONST_SERVICE = "datascience"
    CONST_DAG = "dag"

    # --- Pipeline lifecycle states as reported by the service ---
    LIFECYCLE_STATE_CREATING = "CREATING"
    LIFECYCLE_STATE_ACTIVE = "ACTIVE"
    LIFECYCLE_STATE_DELETING = "DELETING"
    LIFECYCLE_STATE_FAILED = "FAILED"
    LIFECYCLE_STATE_DELETED = "DELETED"
    def __init__(self, name: str = None, spec: Dict = None, **kwargs) -> None:
        """Initialize a Data Science Machine Learning Pipeline.

        Parameters
        ----------
        name: str
            The name of the pipeline, defaults to None. If a name is not provided,
            a randomly generated, easy-to-remember name with a timestamp will be
            generated, like 'strange-spider-2022-08-17-23:55.02'.
        spec: dict, optional
            Object specification, defaults to None.
        kwargs: dict
            Specification as keyword arguments. If ``spec`` contains the same key
            as one in ``kwargs``, the value from ``kwargs`` will be used.
            Supported keys: project_id, compartment_id, display_name, description,
            maximum_runtime_in_minutes, environment_variables (dict),
            command_line_arguments (str), log_id, log_group_id,
            enable_service_log (bool), shape_name, block_storage_size_in_gbs,
            shape_config_details (dict), step_details (list[PipelineStep]),
            dag (list[str]), defined_tags, freeform_tags.

        Example
        -------
        .. code-block:: python

            from ads.pipeline import Pipeline, CustomScriptStep, ScriptRuntime

            pipeline = (
                Pipeline(name="<pipeline_name>")
                .with_compartment_id("<compartment_id>")
                .with_project_id("<project_id>")
                .with_step_details([
                    PipelineStep(name="PipelineStepOne").with_job_id("<job_id>"),
                    PipelineStep(name="PipelineStepTwo")
                    .with_infrastructure(
                        CustomScriptStep().with_shape_name("VM.Standard2.1")
                    )
                    .with_runtime(
                        ScriptRuntime().with_source(
                            "oci://bucket_name@namespace/path/to/script.py"
                        )
                    ),
                ])
                .with_dag_details(["PipelineStepOne >> PipelineStepTwo"])
            )
            run = pipeline.create().run()
            run.watch()

        See Also
        --------
        https://docs.oracle.com/en-us/iaas/tools/ads-sdk/latest/user_guide/pipeline/index.html
        """
        # OCI API attribute names recognized when syncing state from service
        # responses (camelCase, as returned by the Data Science API).
        self.attribute_set = {
            "id",
            "createdBy",
            "projectId",
            "compartmentId",
            "description",
            "pipelineId",
            "displayName",
            "configurationDetails",
            "logConfigurationDetails",
            "infrastructureConfigurationDetails",
            "stepDetails",
            "lifecycleState",
            "freeformTags",
            "definedTags",
            "systemTags",
            "dag",
            "logId",
            "logGroupId",
        }
        # Maps camelCase spec keys to the snake_case keyword-argument names
        # accepted by this builder.
        self.attribute_map = {
            "projectId": "project_id",
            "compartmentId": "compartment_id",
            "displayName": "display_name",
            "description": "description",
            "maximumRuntimeInMinutes": "maximum_runtime_in_minutes",
            "environmentVariables": "environment_variables",
            "commandLineArguments": "command_line_arguments",
            "logId": "log_id",
            "logGroupId": "log_group_id",
            "enableServiceLog": "enable_service_log",
            "stepDetails": "step_details",
            "freeformTags": "freeform_tags",
            "definedTags": "defined_tags",
            "shapeName": "shape_name",
            "blockStorageSizeInGBs": "block_storage_size_in_gbs",
            "shapeConfigDetails": "shape_config_details",
            "dag": "dag",
            "id": "id",
        }
        # Cache of step artifact contents keyed per step (populated elsewhere).
        self._artifact_content_map = {}
        super().__init__(spec=spec, **kwargs)
        # Resolve the display name: explicit arg > spec value > random name.
        name = (
            name
            or self.get_spec(self.CONST_DISPLAY_NAME)
            or utils.get_random_name_for_resource()
        )
        self.set_spec(self.CONST_DISPLAY_NAME, name)
        # Applying the DAG wires up step dependencies, so it must run after
        # the step details have been set by super().__init__.
        if "dag" in kwargs:
            self.with_dag(kwargs.get("dag"))
        elif spec and "dag" in spec:
            self.with_dag(spec.get("dag"))
        # Lazily populated service-side handles.
        self.data_science_pipeline = None
        self.service_logging = None
def _load_default_properties(self) -> Dict:
"""Load default properties from environment variables, notebook session, etc.
Returns
-------
Dict
A dictionary of default properties.
"""
defaults = super()._load_default_properties()
if COMPARTMENT_OCID:
defaults[self.CONST_COMPARTMENT_ID] = COMPARTMENT_OCID
if PROJECT_OCID:
defaults[self.CONST_PROJECT_ID] = PROJECT_OCID
if NB_SESSION_OCID:
try:
nb_session = DSCNotebookSession.from_ocid(NB_SESSION_OCID)
nb_config = nb_session.notebook_session_configuration_details
defaults[self.CONST_SHAPE_NAME] = nb_config.shape
defaults[
self.CONST_BLOCK_STORAGE_SIZE
] = nb_config.block_storage_size_in_gbs
if nb_config.notebook_session_shape_config_details:
notebook_shape_config_details = oci_util.to_dict(
nb_config.notebook_session_shape_config_details
)
defaults[self.CONST_SHAPE_CONFIG_DETAILS] = copy.deepcopy(
notebook_shape_config_details
)
except Exception as e:
logger.warning(
f"Error fetching details about Notebook "
f"session: {NB_SESSION_OCID}. {e}"
)
logger.debug(traceback.format_exc())
return defaults
@property
def kind(self) -> str:
"""The kind of the object as showing in YAML.
Returns
-------
str
pipeline
"""
return "pipeline"
@property
def name(self) -> str:
"""The name of the pipeline.
Returns
-------
str
The name of the pipeline.
"""
return self.get_spec(self.CONST_DISPLAY_NAME)
[docs]
def with_name(self, name: str) -> "Pipeline":
"""Sets the name of pipeline.
Parameters
----------
name: str
The name of pipeline.
Returns
-------
Pipeline
The Pipeline instance (self).
"""
return self.set_spec(self.CONST_DISPLAY_NAME, name)
@property
def id(self) -> str:
"""The id of the pipeline.
Returns
-------
str
The id of the pipeline.
"""
return self.get_spec(self.CONST_ID)
[docs]
def with_id(self, id: str) -> "Pipeline":
"""Sets the id of pipeline.
Parameters
----------
id: str
The id of pipeline.
Returns
-------
Pipeline
The Pipeline instance (self).
"""
return self.set_spec(self.CONST_ID, id)
@property
def step_details(self) -> List["PipelineStep"]:
"""The step details of the pipeline.
Returns
-------
list
The step details of the pipeline.
"""
return self.get_spec(self.CONST_STEP_DETAILS)
[docs]
def with_step_details(self, step_details: List["PipelineStep"]) -> "Pipeline":
"""Sets the pipeline step details for the pipeline.
Parameters
----------
step_details: list
A list of steps in the pipeline.
Returns
-------
Pipeline
The Pipeline instance (self).
"""
self.set_spec(self.CONST_DAG, None)
return self.set_spec(self.CONST_STEP_DETAILS, step_details)
@property
def dag(self) -> List[str]:
"""The dag details of the pipeline.
Returns
-------
list
The dag details of the pipeline.
"""
return self.get_spec(self.CONST_DAG)
[docs]
def with_dag(self, dag: List[str]) -> "Pipeline":
"""Sets the pipeline dag details for the pipeline.
Parameters
----------
dag: list
A list of dag representing step dependencies in the pipeline.
Returns
-------
Pipeline
The Pipeline instance (self).
"""
self.set_spec(self.CONST_DAG, dag)
if not self.step_details:
raise ValueError("Pipeline step details must be specified first.")
stepname_to_step_map = {x.name: x for x in self.step_details}
updated_step_details = Pipeline._add_dag_to_node(dag, stepname_to_step_map)
return self.set_spec(self.CONST_STEP_DETAILS, updated_step_details)
@staticmethod
def _add_dag_to_node(
dag: List[str], stepname_to_step_map: dict
) -> List["PipelineStep"]:
"""Add dependencies to pipeline steps.
Parameters
----------
dag: list
A list of dag representing step dependencies in the pipeline.
stepname_to_step_map: dict
A dict mapping PipelineStep name to the PipelineStep object.
Returns
-------
List
A list of PipelineStep.
"""
dag_mapping = {x: [] for x in stepname_to_step_map}
updated_step_details = []
if dag:
for dag_line in dag:
dependency = [
x.strip().strip("()").split(",") for x in dag_line.split(">>")
]
for i in range(len(dependency) - 1):
for node1 in dependency[i]:
for node2 in dependency[i + 1]:
node1 = node1.strip()
node2 = node2.strip()
if node1 not in stepname_to_step_map:
raise ValueError(
f"Pipeline step with name {node1} does not exist. Please provide a valid step name."
)
if node2 not in stepname_to_step_map:
raise ValueError(
f"Pipeline step with name {node2} does not exist. Please provide a valid step name."
)
dag_mapping[node2].append(node1)
for node, prev_list in dag_mapping.items():
node_list = [stepname_to_step_map[x] for x in prev_list]
stepname_to_step_map[node]._with_depends_on(node_list)
updated_step_details.append(stepname_to_step_map[node])
return updated_step_details
@property
def log_group_id(self) -> str:
"""The log group id of the pipeline.
Returns
-------
str:
The log group id of the pipeline.
"""
return self.get_spec(self.CONST_LOG_GROUP_ID)
[docs]
def with_log_group_id(self, log_group_id: str) -> "Pipeline":
"""Sets the log group id of the pipeline.
Parameters
----------
log_group_id: str
The log group id of the pipeline.
Returns
-------
Pipeline
The Pipeline instance (self).
"""
return self.set_spec(self.CONST_LOG_GROUP_ID, log_group_id)
@property
def log_id(self) -> str:
"""The log id of the pipeline.
Returns
-------
str:
The log id of the pipeline.
"""
return self.get_spec(self.CONST_LOG_ID)
[docs]
def with_log_id(self, log_id: str) -> "Pipeline":
"""Sets the log id of the pipeline.
Parameters
----------
log_id: str
The log id of the pipeline.
Returns
-------
Pipeline
The Pipeline instance (self).
"""
return self.set_spec(self.CONST_LOG_ID, log_id)
@property
def project_id(self) -> str:
"""The project id of the pipeline.
Returns
-------
str:
The project id of the pipeline.
"""
return self.get_spec(self.CONST_PROJECT_ID)
[docs]
def with_project_id(self, project_id: str) -> "Pipeline":
"""Sets the project id of the pipeline.
Parameters
----------
project_id: str
The project id of the pipeline.
Returns
-------
Pipeline
The Pipeline instance (self).
"""
return self.set_spec(self.CONST_PROJECT_ID, project_id)
@property
def compartment_id(self) -> str:
"""The compartment id of the pipeline.
Returns
-------
str:
The compartment id of the pipeline.
"""
return self.get_spec(self.CONST_COMPARTMENT_ID)
[docs]
def with_compartment_id(self, compartment_id: str) -> "Pipeline":
"""Sets the compartment id of the pipeline.
Parameters
----------
compartment_id: str
The compartment id of the pipeline.
Returns
-------
Pipeline
The Pipeline instance (self).
"""
return self.set_spec(self.CONST_COMPARTMENT_ID, compartment_id)
@property
def created_by(self) -> str:
"""The id that creates the pipeline.
Returns
-------
str:
The id that creates the pipeline.
"""
return self.get_spec(self.CONST_CREATED_BY)
[docs]
def with_created_by(self, created_by: str) -> "Pipeline":
"""Sets the id that creates the pipeline.
Parameters
----------
created_by: str
The id that creates the pipeline.
Returns
-------
Pipeline
The Pipeline instance (self).
"""
return self.set_spec(self.CONST_CREATED_BY, created_by)
@property
def description(self) -> str:
"""The description of pipeline.
Returns
-------
str:
The description of pipeline.
"""
return self.get_spec(self.CONST_DESCRIPTION)
[docs]
def with_description(self, description: str) -> "Pipeline":
"""Sets the description of the pipeline.
Parameters
----------
description: str
The description of the pipeline.
Returns
-------
Pipeline
The Pipeline instance (self).
"""
return self.set_spec(self.CONST_DESCRIPTION, description)
@property
def environment_variable(self) -> dict:
"""The environment variables of the pipeline.
Returns
-------
dict:
The environment variables of the pipeline.
"""
return self.get_spec(self.CONST_ENVIRONMENT_VARIABLES)
[docs]
def with_environment_variable(self, **kwargs) -> "Pipeline":
"""Sets environment variables of the pipeline.
Parameters
----------
kwargs:
Keyword arguments.
To add a keyword argument without value, set the value to None.
Returns
-------
Pipeline
The Pipeline instance (self).
"""
if kwargs:
environment_variable_dict = {}
for k, v in kwargs.items():
environment_variable_dict[k] = v
self.set_spec(self.CONST_ENVIRONMENT_VARIABLES, environment_variable_dict)
return self
@property
def argument(self) -> str:
"""The command line arguments of the pipeline.
Returns
-------
str:
The command line arguments of the pipeline.
"""
return self.get_spec(self.CONST_COMMAND_LINE_ARGUMENTS)
[docs]
def with_argument(self, *args, **kwargs) -> "Pipeline":
"""Adds command line arguments to the pipeline.
Existing arguments will be preserved.
This method can be called (chained) multiple times to add various arguments.
For example, pipeline.with_argument(key="val").with_argument("path/to/file")
will result in: "--key val path/to/file"
Parameters
----------
args:
Positional arguments.
In a single method call, positional arguments are always added before keyword arguments.
You can call with_argument() to add positional arguments after keyword arguments.
kwargs:
Keyword arguments.
To add a keyword argument without value, set the value to None.
Returns
-------
Pipeline
The Pipeline instance (self).
Raises
------
ValueError
Keyword arguments with space in a key.
"""
arg_values = self.get_spec(self.CONST_COMMAND_LINE_ARGUMENTS, [])
args = [str(arg) for arg in args]
arg_values.extend(args)
for k, v in kwargs.items():
if " " in k:
raise ValueError("Argument key %s cannot contain space.", str(k))
arg_values.append(f"--{str(k)}")
# Ignore None value
if v is None:
continue
arg_values.append(str(v))
arg_string = " ".join(arg_values)
self.set_spec(self.CONST_COMMAND_LINE_ARGUMENTS, arg_string)
return self
@property
def maximum_runtime_in_minutes(self) -> int:
"""The maximum runtime in minutes of the pipeline.
Returns
-------
int:
The maximum runtime minutes of the pipeline.
"""
return self.get_spec(self.CONST_MAXIMUM_RUNTIME_IN_MINUTES)
[docs]
def with_maximum_runtime_in_minutes(
self, maximum_runtime_in_minutes: int
) -> "Pipeline":
"""Sets the maximum runtime in minutes of the pipeline.
Parameters
----------
maximum_runtime_in_minutes: int
The maximum_runtime_in_minutes of the pipeline.
Returns
-------
Pipeline
The Pipeline instance (self).
"""
return self.set_spec(
self.CONST_MAXIMUM_RUNTIME_IN_MINUTES, maximum_runtime_in_minutes
)
@property
def shape_name(self) -> str:
"""The shape name of pipeline infrastructure.
Returns
-------
str:
The shape name of the pipeline infrastructure.
"""
return self.get_spec(self.CONST_SHAPE_NAME)
[docs]
def with_shape_name(self, shape_name: str) -> "Pipeline":
"""Sets the shape name of pipeline infrastructure.
Parameters
----------
shape_name: str
The shape name of the pipeline infrastructure.
Returns
-------
Pipeline
The Pipeline instance (self).
"""
return self.set_spec(self.CONST_SHAPE_NAME, shape_name)
@property
def block_storage_size_in_gbs(self) -> int:
"""The block storage size of pipeline infrastructure.
Returns
-------
int:
The block storage size of the pipeline infrastructure.
"""
return self.get_spec(self.CONST_BLOCK_STORAGE_SIZE)
[docs]
def with_block_storage_size_in_gbs(
self, block_storage_size_in_gbs: int
) -> "Pipeline":
"""Sets the block storage size of pipeline infrastructure.
Parameters
----------
block_storage_size_in_gbs: int
The block storage size of pipeline infrastructure.
Returns
-------
Pipeline
The Pipeline instance (self).
"""
return self.set_spec(self.CONST_BLOCK_STORAGE_SIZE, block_storage_size_in_gbs)
@property
def shape_config_details(self) -> dict:
"""The shape config details of pipeline infrastructure.
Returns
-------
dict:
The shape config details of the pipeline infrastructure.
"""
return self.get_spec(self.CONST_SHAPE_CONFIG_DETAILS)
[docs]
def with_shape_config_details(
self, memory_in_gbs: float, ocpus: float, **kwargs: Dict[str, Any]
) -> "Pipeline":
"""
Sets the shape config details of pipeline infrastructure.
Specify only when a flex shape is selected.
For example `VM.Standard.E3.Flex` allows the memory_in_gbs and cpu count to be specified.
Parameters
----------
memory_in_gbs: float
The size of the memory in GBs.
ocpus: float
The OCPUs count.
kwargs
Additional keyword arguments.
Returns
-------
Pipeline
The Pipeline instance (self).
"""
return self.set_spec(
self.CONST_SHAPE_CONFIG_DETAILS,
{
self.CONST_OCPUS: ocpus,
self.CONST_MEMORY_IN_GBS: memory_in_gbs,
**kwargs,
},
)
@property
def enable_service_log(self) -> bool:
"""Enables service log of pipeline.
Returns
-------
bool:
The bool value to enable service log of pipeline.
"""
return self.get_spec(self.CONST_ENABLE_SERVICE_LOG)
[docs]
def with_enable_service_log(self, enable_service_log: bool) -> "Pipeline":
"""Sets the bool value to enable the service log of pipeline.
Parameters
----------
enable_service_log: bool
The value to enable the service log of pipeline.
Returns
-------
Pipeline
The Pipeline instance (self).
"""
return self.set_spec(self.CONST_ENABLE_SERVICE_LOG, enable_service_log)
@property
def service_log_id(self) -> str:
"""The service log id of pipeline.
Returns
-------
str:
The service log id of pipeline.
"""
return self.get_spec(self.CONST_SERVICE_LOG_ID)
[docs]
def to_dict(self, **kwargs) -> dict:
"""Serializes the pipeline specifications to a dictionary.
Returns
-------
dict
A dictionary containing pipeline specifications.
"""
dict_details = copy.deepcopy(super().to_dict(**kwargs))
dict_details["spec"][self.CONST_DAG] = self.get_spec(self.CONST_DAG)
step_details_list = []
if self.step_details:
for step in self.step_details:
if isinstance(step, PipelineStep):
step = step.to_dict()
if not isinstance(step, dict):
raise TypeError("Pipeline step is not a valid type")
step_dict = copy.deepcopy(step)
step_dict["spec"].pop("dependsOn", None)
step_details_list.append(step_dict)
dict_details["spec"][self.CONST_STEP_DETAILS] = step_details_list
return dict_details
[docs]
@classmethod
def from_dict(cls, obj_dict: dict):
"""Initializes the object from a dictionary."""
temp_obj_dict = copy.deepcopy(obj_dict)
step_mapping = {}
for step in temp_obj_dict["spec"][cls.CONST_STEP_DETAILS]:
pipeline_step = PipelineStep.from_dict(step)
step_mapping[pipeline_step.name] = pipeline_step
if cls.CONST_DAG not in temp_obj_dict["spec"]:
temp_obj_dict["spec"][cls.CONST_DAG] = None
step_details = Pipeline._add_dag_to_node(
temp_obj_dict["spec"][cls.CONST_DAG], step_mapping
)
temp_obj_dict["spec"][cls.CONST_STEP_DETAILS] = step_details
return cls(spec=temp_obj_dict["spec"])
[docs]
def create(self, delete_if_fail: bool = True) -> "Pipeline":
"""Creates an ADS pipeline.
Returns
-------
Pipeline:
The ADS Pipeline instance.
"""
pipeline_details = self.__pipeline_details()
self.data_science_pipeline = DataSciencePipeline(**pipeline_details).create(
self.step_details, delete_if_fail
)
self.set_spec(self.CONST_ID, self.data_science_pipeline.id)
if self.enable_service_log and not self.service_log_id:
try:
self.__create_service_log()
except Exception as ex:
logger.warning("Failed to create service log: %s", str(ex))
return self
def _show(self) -> PipelineVisualizer:
"""
Prepeares `PipelineVisualizer` instance to render a graph.
Returns
-------
PipelineVisualizer
"""
return (
PipelineVisualizer()
.with_renderer(PipelineGraphRenderer(show_status=False))
.with_pipeline(self)
)
[docs]
def show(self, rankdir: str = GraphOrientation.TOP_BOTTOM) -> None:
"""
Render pipeline with step information in a graph
Returns
-------
None
"""
self._show().render(rankdir=rankdir)
[docs]
def to_svg(
self, uri: str = None, rankdir: str = GraphOrientation.TOP_BOTTOM, **kwargs
) -> str:
"""
Renders pipeline as graph in svg string.
Parameters
----------
uri: (string, optional). Defaults to None.
URI location to save the SVG string.
rankdir: str, default to "TB".
Direction of the rendered graph; allowed Values are {"TB", "LR"}.
Returns
-------
str
Graph in svg format.
"""
return self._show().to_svg(uri=uri, rankdir=rankdir, **kwargs)
[docs]
def run(
self,
display_name: Optional[str] = None,
project_id: Optional[str] = None,
compartment_id: Optional[str] = None,
configuration_override_details: Optional[dict] = None,
log_configuration_override_details: Optional[dict] = None,
step_override_details: Optional[list] = None,
free_form_tags: Optional[dict] = None,
defined_tags: Optional[dict] = None,
system_tags: Optional[dict] = None,
) -> "PipelineRun":
"""Creates an ADS pipeline run.
Parameters
----------
display_name: str, optional
The display name to override the one defined previously. Defaults to None.
project_id: str, optional
The project id to override the one defined previously. Defaults to None.
compartment_id: str, optional
The compartment id to override the one defined previously. Defaults to None.
configuration_override_details: dict, optional
The configuration details dictionary to override the one defined previously.
Defaults to None.
The configuration_override_details contains the following keys:
* "type": str, only "DEFAULT" is allowed.
* "environment_variables": dict, optional, the environment variables
* "command_line_arguments": str, optional, the command line arguments
* "maximum_runtime_in_minutes": int, optional, the maximum runtime allowed in minutes
log_configuration_override_details: dict(str, str), optional
The log configuration details dictionary to override the one defined previously.
Defaults to None.
The log_configuration_override_details contains the following keys:
* "log_group_id": str, optional, the log group id
* "log_id": str, optional, the log id
step_override_details: list[PipelineStepOverrideDetails], optional
The step details list to override the one defined previously.
Defaults to None.
The PipelineStepOverrideDetails is a dict which contains the following keys:
* step_name: str, the name of step to override
* step_configuration_details: dict, which contains:
* "maximum_runtime_in_minutes": int, optional
* "environment_variables": dict, optional
* "command_line_arguments": str, optional
free_form_tags: dict(str, str), optional
The free from tags dictionary to override the one defined previously.
Defaults to None.
defined_tags: dict(str, dict(str, object)), optional
The defined tags dictionary to override the one defined previously.
Defaults to None.
system_tags: dict(str, dict(str, object)), optional
The system tags dictionary to override the one defined previously.
Defaults to None.
Example
--------
.. code-block:: python
# Creates a pipeline run using pipeline configurations
pipeline.run()
# Creates a pipeline run by overriding pipeline configurations
pipeline.run(
display_name="OverrideDisplayName",
configuration_override_details={
"maximum_runtime_in_minutes":30,
"type":"DEFAULT",
"environment_variables": {
"key": "value"
},
"command_line_arguments": "ARGUMENT --KEY VALUE",
},
log_configuration_override_details={
"log_group_id": "<log_group_id>"
},
step_override_details=[{
"step_name" : "<step_name>",
"step_configuration_details" : {
"maximum_runtime_in_minutes": 200,
"environment_variables": {
"1":"2"
},
"command_line_arguments": "argument --key value",
}
}]
)
Returns
-------
PipelineRun:
The ADS PipelineRun instance.
"""
pipeline_details = self.__pipeline_details()
self.__override_configurations(
pipeline_details,
display_name,
project_id,
compartment_id,
configuration_override_details,
log_configuration_override_details,
step_override_details,
free_form_tags,
defined_tags,
system_tags,
)
if not self.data_science_pipeline:
self.data_science_pipeline = DataSciencePipeline(**pipeline_details)
if self.enable_service_log:
return self.data_science_pipeline.run(
pipeline_details, self.service_logging
)
return self.data_science_pipeline.run(pipeline_details)
[docs]
def delete(
    self,
    delete_related_pipeline_runs: Optional[bool] = True,
    delete_related_job_runs: Optional[bool] = True,
    max_wait_seconds: Optional[int] = MAXIMUM_TIMEOUT,
    **kwargs,
) -> "Pipeline":
    """Deletes an ADS pipeline.

    Parameters
    ----------
    delete_related_pipeline_runs: bool, optional
        Specify whether to delete related PipelineRuns or not. Defaults to True.
    delete_related_job_runs: bool, optional
        Specify whether to delete related JobRuns or not. Defaults to True.
    max_wait_seconds: int, optional
        The maximum time to wait, in seconds. Defaults to 1800.
    kwargs: optional
        Extra keyword arguments forwarded when deleting the pipeline.
        Operation keys: "allow_control_chars", "retry_strategy", "if_match",
        "opc_request_id" (see the OCI SDK ``delete_pipeline`` operation).
        Waiter keys: "max_interval_seconds", "succeed_on_not_found",
        "wait_callback", "fetch_func" (see ``oci.wait_until``).
        Keys not in either allow-list are silently ignored.

    Returns
    -------
    Pipeline:
        The ADS Pipeline instance.
    """
    if not self.data_science_pipeline:
        self.data_science_pipeline = DataSciencePipeline()
    # Split the caller-supplied kwargs into operation kwargs and waiter
    # kwargs based on the module-level allow-lists (the two lists are
    # disjoint, so each key lands in at most one bucket).
    operation_kwargs = {
        "delete_related_pipeline_runs": delete_related_pipeline_runs,
        "delete_related_job_runs": delete_related_job_runs,
    }
    operation_kwargs.update(
        (key, value)
        for key, value in kwargs.items()
        if key in ALLOWED_OPERATION_KWARGS
    )
    waiter_kwargs = {"max_wait_seconds": max_wait_seconds}
    waiter_kwargs.update(
        (key, value)
        for key, value in kwargs.items()
        if key in ALLOWED_WAITER_KWARGS
    )
    self.data_science_pipeline.delete(
        id=self.id, operation_kwargs=operation_kwargs, waiter_kwargs=waiter_kwargs
    )
    return self
[docs]
def download(
    self, to_dir: str, override_if_exists: Optional[bool] = False
) -> "Pipeline":
    """Downloads artifacts from pipeline.

    Parameters
    ----------
    to_dir : str
        Local directory to which the artifacts will be downloaded to.
        Missing parent directories are created.
    override_if_exists: bool, optional
        Bool to decide whether to override existing folder/file or not. Defaults to False.

    Returns
    -------
    Pipeline:
        The ADS Pipeline instance.
    """
    if not self.data_science_pipeline or not self.id:
        print("Pipeline hasn't been created.")
        return self
    if (
        self.data_science_pipeline.sync().lifecycle_state
        != self.LIFECYCLE_STATE_ACTIVE
    ):
        print("Pipeline hasn't been created or not in ACTIVE state.")
        return self
    pipeline_folder = os.path.join(to_dir, self.id)
    if not os.path.exists(pipeline_folder):
        print("Creating directory: " + pipeline_folder)
        # Fix: use makedirs (not mkdir) so a missing ``to_dir`` or any
        # missing parent directory is created instead of raising.
        os.makedirs(pipeline_folder)
    elif not override_if_exists:
        print(
            f"Folder {pipeline_folder} already exists. Set override_if_exists to True to override."
        )
        return self
    for step in self.step_details:
        # Only CUSTOM_SCRIPT steps carry artifacts on the pipeline itself.
        if step.kind == "CUSTOM_SCRIPT":
            res = self.data_science_pipeline.client.get_step_artifact_content(
                self.id, step.name
            )
            if not res or not res.data or not res.data.raw:
                print(f"Failed to download {step.name} artifact.")
                return self
            # The artifact file name is carried in the Content-Disposition
            # header as "attachment; filename=<name>".
            content_disposition = res.headers.get("Content-Disposition", "")
            artifact_name = str(content_disposition).replace(
                "attachment; filename=", ""
            )
            step_folder = os.path.join(pipeline_folder, step.name)
            if not os.path.exists(step_folder):
                print("Creating directory: " + step_folder)
                os.makedirs(step_folder)
            elif not override_if_exists:
                print(
                    f"Folder {step_folder} already exists. Set override_if_exists to True to override."
                )
                continue
            file_name = os.path.join(step_folder, artifact_name)
            with open(file_name, "wb") as f:
                f.write(res.data.raw.read())
    return self
def _populate_step_artifact_content(self):
    """Populates artifact information to CUSTOM_SCRIPT step.

    This method is only invoked when the existing pipeline needs to be loaded.
    """
    # Nothing to populate unless the pipeline exists and is ACTIVE.
    if (
        not self.data_science_pipeline
        or not self.id
        or self.status != self.LIFECYCLE_STATE_ACTIVE
    ):
        return
    for step in self.step_details:
        if step.kind != "CUSTOM_SCRIPT":
            continue
        # Cache artifact names per step so repeated loads avoid another
        # service round-trip.
        artifact_name = self._artifact_content_map.get(step.name)
        if not artifact_name:
            response = self.data_science_pipeline.client.get_step_artifact_content(
                self.id, step.name
            )
            content_disposition = response.headers.get("Content-Disposition", "")
            artifact_name = str(content_disposition).replace(
                "attachment; filename=", ""
            )
            self._artifact_content_map[step.name] = artifact_name
        # Attach the artifact to the runtime; the check order below is
        # significant and is kept exactly as before.
        if isinstance(step.runtime, ScriptRuntime):
            step.runtime.with_script(artifact_name)
        elif isinstance(step.runtime, PythonRuntime):
            step.runtime.with_working_dir(artifact_name)
        elif isinstance(step.runtime, NotebookRuntime):
            step.runtime.with_notebook(artifact_name)
        elif isinstance(step.runtime, GitPythonRuntime):
            step.runtime.with_source(artifact_name)
[docs]
@classmethod
def from_ocid(cls, ocid: str) -> "Pipeline":
    """Loads an existing pipeline given its OCID.

    Parameters
    ----------
    ocid: str
        The OCID of pipeline.

    Returns
    -------
    Pipeline:
        The Pipeline instance.
    """
    dsc_pipeline = DataSciencePipeline.from_ocid(ocid)
    ads_pipeline = dsc_pipeline.build_ads_pipeline()
    # Resolve artifact names for CUSTOM_SCRIPT steps of the loaded pipeline.
    ads_pipeline._populate_step_artifact_content()
    return ads_pipeline
[docs]
@classmethod
def from_id(cls, id: str) -> "Pipeline":
    """Loads an existing pipeline given its OCID. Alias of ``from_ocid``.

    Parameters
    ----------
    id: str
        The OCID of pipeline.

    Returns
    -------
    Pipeline:
        The Pipeline instance.
    """
    return cls.from_ocid(ocid=id)
def __create_service_log(self) -> "Pipeline":
    """Creates a service log for pipeline.

    Returns
    -------
    Pipeline:
        The ADS Pipeline instance.

    Raises
    ------
    ValueError
        If the log group OCID is not set or the pipeline has not been created.
    """
    if not self.log_group_id:
        raise ValueError(
            "Log group OCID is not specified for this pipeline. Call with_log_group_id to add it."
        )
    if not self.id:
        raise ValueError("Pipeline is not created yet. Call the create method.")
    log_source = oci.logging.models.OciService(
        service=self.CONST_SERVICE,
        resource=self.id,
        category=self.CONST_SERVICE_LOG_CATEGORY,
    )
    log_configuration = oci.logging.models.Configuration(
        source=log_source,
        compartment_id=self.compartment_id,
        archiving=oci.logging.models.Archiving(is_enabled=False),
    )
    # Suffix with a timestamp so repeated calls produce distinct log names.
    timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M")
    service_logging = OCILog(
        display_name=f"{self.name}-{timestamp}",
        log_group_id=self.log_group_id,
        log_type="SERVICE",
        configuration=log_configuration,
        annotation="service",
    ).create()
    self.__set_service_logging_resource(service_logging)
    self.set_spec(self.CONST_SERVICE_LOG_ID, self.service_logging.id)
    return self
def __set_service_logging_resource(self, service_logging: OCILog) -> None:
    """Sets the service logging for pipeline.

    Parameters
    ----------
    service_logging: OCILog
        An OCILog instance containing the service logging resources.
    """
    self.service_logging = service_logging
def _convert_step_details_to_dag(
    self, step_details: Optional[List["PipelineStep"]] = None
) -> List[str]:
    """Converts step_details to the DAG representation.

    Parameters
    ----------
    step_details: list, optional
        A list of PipelineStep objects or step dictionaries. Defaults to
        None, which is treated as an empty list.

    Returns
    -------
    List
        A list of str representing step dependencies.
    """
    # Fix: the default used to be a mutable ``[]`` literal (shared across
    # calls); None is the safe sentinel with identical behavior.
    dag_list = []
    for step in step_details or []:
        if isinstance(step, PipelineStep):
            # PipelineStep objects keep their fields under the "spec" key.
            step = step.to_dict()["spec"]
            step_name = step["name"]
        else:
            step_name = step["stepName"]
        depends_on = step["dependsOn"]
        # Steps without upstream dependencies do not appear in the DAG.
        if not depends_on:
            continue
        if len(depends_on) == 1:
            dag_list.append(depends_on[0] + " >> " + step_name)
        else:
            dag_list.append("(" + ", ".join(depends_on) + ") >> " + step_name)
    return dag_list
def __pipeline_details(self) -> dict:
    """Converts pipeline attributes to a dictionary.

    Returns
    -------
    dict:
        A dictionary that contains pipeline details.
    """
    pipeline_details = copy.deepcopy(self._spec)
    # Service-log specs are ADS-side only and never sent to the service.
    pipeline_details.pop(self.CONST_ENABLE_SERVICE_LOG, None)
    pipeline_details.pop(self.CONST_SERVICE_LOG_ID, None)
    # Each builder pops its keys out of ``pipeline_details`` and returns
    # the nested payload; a non-empty payload is stored under its key.
    # The call order matters and mirrors the service payload layout.
    for build_section, section_key in (
        (self.__pipeline_configuration_details, self.CONST_CONFIGURATION_DETAILS),
        (
            self.__pipeline_log_configuration_details,
            self.CONST_LOG_CONFIGURATION_DETAILS,
        ),
        (
            self.__pipeline_infrastructure_configuration_details,
            self.CONST_INFRA_CONFIG_DETAILS,
        ),
    ):
        section = build_section(pipeline_details)
        if section:
            pipeline_details[section_key] = section
    if self.id:
        # An existing pipeline is addressed via pipelineId, not id.
        pipeline_details[self.CONST_PIPELINE_ID] = self.id
        pipeline_details.pop(self.CONST_ID)
    pipeline_details[self.CONST_STEP_DETAILS] = self.__step_details(
        pipeline_details
    )
    return pipeline_details
def __pipeline_configuration_details(self, pipeline_details: Dict) -> dict:
    """Converts pipeline configuration details to a dictionary.

    Parameters
    ----------
    pipeline_details: dict
        A dictionary that contains pipeline details. Keys moved into the
        configuration payload are removed from it in place.

    Returns
    -------
    dict:
        A dictionary that contains pipeline configuration details.
    """
    configuration_details = {}
    # Move each truthy pipeline-level setting into the payload and pop it
    # from the flat spec dictionary.
    for value, key in (
        (self.maximum_runtime_in_minutes, self.CONST_MAXIMUM_RUNTIME_IN_MINUTES),
        (self.environment_variable, self.CONST_ENVIRONMENT_VARIABLES),
        (self.argument, self.CONST_COMMAND_LINE_ARGUMENTS),
    ):
        if value:
            configuration_details[key] = value
            pipeline_details.pop(key)
    # The type field is always "DEFAULT" in this payload.
    configuration_details[self.CONST_TYPE] = "DEFAULT"
    return configuration_details
def __pipeline_log_configuration_details(self, pipeline_details: Dict) -> dict:
    """Converts pipeline log configuration details to a dictionary.

    Parameters
    ----------
    pipeline_details: dict
        A dictionary that contains pipeline details. Log-related keys are
        removed from it in place once consumed.

    Returns
    -------
    dict:
        A dictionary that contains pipeline log configuration details.

    Raises
    ------
    ResourceNotFoundError
        If only a log OCID is set and its log group cannot be resolved.
    """
    pipeline_log_configuration_details = {}
    if self.log_id:
        pipeline_log_configuration_details[self.CONST_LOG_ID] = self.log_id
        # Log OCID given without a log group: resolve the group from the
        # log resource itself (requires read permission on the log).
        if not self.log_group_id:
            try:
                log_obj = OCILog.from_ocid(self.log_id)
            except ResourceNotFoundError:
                raise ResourceNotFoundError(
                    f"Unable to determine log group ID for Log ({self.log_id})."
                    " The log resource may not exist or You may not have the required permission."
                    " Try to avoid this by specifying the log group ID."
                )
            self.with_log_group_id(log_obj.log_group_id)
    if self.log_group_id:
        pipeline_log_configuration_details[
            self.CONST_LOG_GROUP_ID
        ] = self.log_group_id
    if self.log_id:
        # Explicit log: logging enabled, no auto-creation by the service.
        pipeline_log_configuration_details[self.CONST_ENABLE_LOGGING] = True
        pipeline_log_configuration_details[
            self.CONST_ENABLE_AUTO_LOG_CREATION
        ] = False
        pipeline_details.pop(self.CONST_LOG_ID)
        pipeline_details.pop(self.CONST_LOG_GROUP_ID, None)
    else:
        if self.log_group_id:
            # Log group only: let the service auto-create the log.
            pipeline_log_configuration_details[self.CONST_ENABLE_LOGGING] = True
            pipeline_log_configuration_details[
                self.CONST_ENABLE_AUTO_LOG_CREATION
            ] = True
            pipeline_details.pop(self.CONST_LOG_GROUP_ID)
        else:
            # No log information at all: logging fully disabled.
            pipeline_log_configuration_details[self.CONST_ENABLE_LOGGING] = False
            pipeline_log_configuration_details[
                self.CONST_ENABLE_AUTO_LOG_CREATION
            ] = False
    return pipeline_log_configuration_details
def __pipeline_infrastructure_configuration_details(
    self, pipeline_details: Dict
) -> dict:
    """Converts pipeline infrastructure configuration details to a dictionary.

    Parameters
    ----------
    pipeline_details: dict
        A dictionary that contains pipeline details. Keys moved into the
        infrastructure payload are removed from it in place.

    Returns
    -------
    dict:
        A dictionary with the pipeline-level shape, block storage size and
        shape configuration details (only the ones that are set).
    """
    infrastructure_details = {}
    # Move each truthy infrastructure setting into the payload and pop it
    # from the flat spec dictionary.
    for value, key in (
        (self.shape_name, self.CONST_SHAPE_NAME),
        (self.block_storage_size_in_gbs, self.CONST_BLOCK_STORAGE_SIZE),
        (self.shape_config_details, self.CONST_SHAPE_CONFIG_DETAILS),
    ):
        if value:
            infrastructure_details[key] = value
            pipeline_details.pop(key)
    return infrastructure_details
def __step_details(self, pipeline_details: Dict) -> list:
    """Converts pipeline step details to a dictionary.

    Parameters
    ----------
    pipeline_details: dict
        A dictionary that contains pipeline details. Used to inherit
        pipeline-level configuration for steps without their own.

    Returns
    -------
    list:
        A list that contains pipeline step details.
    """
    step_details_list = []
    if self.step_details:
        for step in self.step_details:
            # Work on a copy so the original step spec is not mutated.
            step_details = copy.deepcopy(step._spec)
            step_details["stepName"] = step.name
            step_details.pop("name", None)
            if not step.depends_on:
                step_details[step.CONST_DEPENDS_ON] = []
            # Steps that reference an existing job (job_id set) carry no
            # infrastructure/runtime payload of their own.
            if not step.job_id:
                step_infrastructure_configuration_details = (
                    self.__step_infrastructure_configuration_details(step)
                )
                step_details[
                    step.CONST_STEP_INFRA_CONFIG_DETAILS
                ] = step_infrastructure_configuration_details
                step_details.pop(step.CONST_INFRASTRUCTURE, None)
                step_details.pop(step.CONST_RUNTIME, None)
            step_configuration_details = self.__step_configuration_details(
                pipeline_details, step
            )
            step_details[
                step.CONST_STEP_CONFIG_DETAILS
            ] = step_configuration_details
            # These flat keys were folded into the configuration payload
            # above; drop them from the step dictionary.
            step_details.pop(self.CONST_MAXIMUM_RUNTIME_IN_MINUTES, None)
            step_details.pop(self.CONST_ENVIRONMENT_VARIABLES, None)
            step_details.pop(self.CONST_COMMAND_LINE_ARGUMENTS, None)
            step_details_list.append(step_details)
    return step_details_list
def __step_infrastructure_configuration_details(self, step) -> dict:
    """Builds the infrastructure configuration payload for a step.

    Parameters
    ----------
    step: PipelineStep
        The step whose infrastructure is being converted.

    Returns
    -------
    dict:
        A dictionary with the step's block storage size, shape name and
        shape configuration details.
    """
    infrastructure = step.infrastructure
    return {
        "blockStorageSizeInGBs": infrastructure.block_storage_size,
        "shapeName": infrastructure.shape_name,
        "shapeConfigDetails": infrastructure.shape_config_details,
    }
def __step_configuration_details(self, pipeline_details: Dict, step) -> dict:
    """Builds the step configuration payload for a step.

    Parameters
    ----------
    pipeline_details: dict
        A dictionary that contains pipeline details; its configuration
        payload is inherited when the step has none of its own.
    step: PipelineStep
        The step whose configuration is being converted.

    Returns
    -------
    dict:
        A dictionary that contains step configuration details.
    """
    step_configuration_details = {}
    step_configuration_details[self.CONST_TYPE] = "DEFAULT"
    if step.runtime:
        # Derive env vars / arguments / max runtime from the translated
        # job runtime payload.
        payload = DataScienceJobRuntimeManager(step.infrastructure).translate(
            step.runtime
        )
        if "job_configuration_details" in payload:
            job_configuration_details = payload["job_configuration_details"]
            if "environment_variables" in job_configuration_details:
                step_configuration_details[
                    self.CONST_ENVIRONMENT_VARIABLES
                ] = job_configuration_details["environment_variables"]
            if "command_line_arguments" in job_configuration_details:
                step_configuration_details[
                    self.CONST_COMMAND_LINE_ARGUMENTS
                ] = job_configuration_details["command_line_arguments"]
            if "maximum_runtime_in_minutes" in job_configuration_details:
                step_configuration_details[
                    self.CONST_MAXIMUM_RUNTIME_IN_MINUTES
                ] = job_configuration_details["maximum_runtime_in_minutes"]
    elif step.CONST_STEP_CONFIG_DETAILS in step._spec:
        # A previously loaded step already carries its configuration payload.
        step_configuration_details = step._spec[step.CONST_STEP_CONFIG_DETAILS]
    # len == 1 typically means only the "type" key is present, i.e. nothing
    # was derived above; fall back to the step-level specs.
    if len(step_configuration_details) == 1:
        if step.environment_variable:
            step_configuration_details[
                self.CONST_ENVIRONMENT_VARIABLES
            ] = step.environment_variable
        if step.argument:
            step_configuration_details[
                self.CONST_COMMAND_LINE_ARGUMENTS
            ] = step.argument
        if step.maximum_runtime_in_minutes:
            step_configuration_details[
                self.CONST_MAXIMUM_RUNTIME_IN_MINUTES
            ] = step.maximum_runtime_in_minutes
    # Still only the "type" key: inherit the pipeline-level configuration.
    if len(step_configuration_details) == 1:
        if self.CONST_CONFIGURATION_DETAILS in pipeline_details:
            step_configuration_details = pipeline_details[
                self.CONST_CONFIGURATION_DETAILS
            ]
    return step_configuration_details
def __override_configurations(
    self,
    pipeline_details,
    display_name,
    project_id,
    compartment_id,
    configuration_override_details,
    log_configuration_override_details,
    step_override_details,
    free_form_tags,
    defined_tags,
    system_tags,
) -> None:
    """Applies run-time overrides to ``pipeline_details`` in place.

    Note: the previous ``-> dict`` annotation was wrong; this method
    mutates ``pipeline_details`` and returns None.

    Parameters
    ----------
    pipeline_details: dict
        The pipeline details dictionary to mutate.
    display_name, project_id, compartment_id:
        Scalar overrides, applied only when truthy.
    configuration_override_details: dict
        Pipeline configuration overrides; the type is forced to "DEFAULT".
    log_configuration_override_details: dict
        Log configuration overrides; the log group OCID is resolved from
        the log OCID when only the latter is given.
    step_override_details: list
        Per-step configuration overrides ("step_name" +
        "step_configuration_details" entries).
    free_form_tags, defined_tags, system_tags: dict
        Tag overrides.

    Raises
    ------
    ResourceNotFoundError
        If a log group cannot be resolved from an overriding log OCID.
    """
    if display_name:
        pipeline_details[self.CONST_DISPLAY_NAME] = display_name
    if project_id:
        pipeline_details[self.CONST_PROJECT_ID] = project_id
    if compartment_id:
        pipeline_details[self.CONST_COMPARTMENT_ID] = compartment_id
    if configuration_override_details:
        configuration_override_details[self.CONST_TYPE] = "DEFAULT"
        pipeline_details[
            self.CONST_CONFIGURATION_OVERRIDE_DETAILS
        ] = self._standardize_spec(configuration_override_details)
    if log_configuration_override_details:
        pipeline_details[
            self.CONST_LOG_CONFIGURATION_OVERRIDE_DETAILS
        ] = self._standardize_spec(log_configuration_override_details)
        # Work on the standardized copy from here on.
        log_configuration_override_details = pipeline_details[
            self.CONST_LOG_CONFIGURATION_OVERRIDE_DETAILS
        ]
        # Log OCID without a log group: resolve the group from the log.
        if (
            self.CONST_LOG_ID in log_configuration_override_details
            and self.CONST_LOG_GROUP_ID not in log_configuration_override_details
        ):
            try:
                log_obj = OCILog.from_ocid(
                    log_configuration_override_details[self.CONST_LOG_ID]
                )
            except ResourceNotFoundError:
                raise ResourceNotFoundError(
                    f"Unable to determine log group ID for Log ({log_configuration_override_details[self.CONST_LOG_ID]})."
                    " The log resource may not exist or You may not have the required permission."
                    " Try to avoid this by specifying the log group ID."
                )
            if log_obj and log_obj.log_group_id:
                log_configuration_override_details[
                    self.CONST_LOG_GROUP_ID
                ] = log_obj.log_group_id
        # Same enable-logging/auto-creation matrix as the non-override
        # log configuration: explicit log, group only, or nothing.
        if self.CONST_LOG_ID in log_configuration_override_details:
            log_configuration_override_details[self.CONST_ENABLE_LOGGING] = True
            log_configuration_override_details[
                self.CONST_ENABLE_AUTO_LOG_CREATION
            ] = False
        else:
            if self.CONST_LOG_GROUP_ID in log_configuration_override_details:
                log_configuration_override_details[self.CONST_ENABLE_LOGGING] = True
                log_configuration_override_details[
                    self.CONST_ENABLE_AUTO_LOG_CREATION
                ] = True
            else:
                log_configuration_override_details[
                    self.CONST_ENABLE_LOGGING
                ] = False
                log_configuration_override_details[
                    self.CONST_ENABLE_AUTO_LOG_CREATION
                ] = False
    if step_override_details:
        step_override_details_list = []
        for step in step_override_details:
            step_detail = {}
            step_detail["stepName"] = step["step_name"]
            step_detail["stepConfigurationDetails"] = self._standardize_spec(
                step["step_configuration_details"]
            )
            step_override_details_list.append(step_detail)
        pipeline_details[
            self.CONST_STEP_OVERRIDE_DETAILS
        ] = step_override_details_list
    if free_form_tags:
        pipeline_details[self.CONST_FREEFROM_TAGS] = free_form_tags
    if defined_tags:
        pipeline_details[self.CONST_DEFINED_TAGS] = defined_tags
    if system_tags:
        pipeline_details[self.CONST_SYSTEM_TAGS] = system_tags
# TODO: Needs to improve the validation logic
# Ticket: https://jira.oci.oraclecorp.com/browse/ODSC-31996
# @classmethod
# def from_yaml(cls, uri: str) -> "Pipeline":
# pipeline_schema = {}
# schema_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "schema")
# with open(
# os.path.join(schema_path, "pipeline_schema.json")
# ) as pipeline_schema_file:
# pipeline_schema = json.load(pipeline_schema_file)
# cs_step_schema = {}
# with open(
# os.path.join(schema_path, "cs_step_schema.json")
# ) as cs_step_schema_file:
# cs_step_schema = json.load(cs_step_schema_file)
# ml_step_schema = {}
# with open(
# os.path.join(schema_path, "ml_step_schema.json")
# ) as ml_step_schema_file:
# ml_step_schema = json.load(ml_step_schema_file)
# yaml_dict = yaml.load(Pipeline._read_from_file(uri=uri), Loader=yaml.FullLoader)
# pipeline_validator = Validator(pipeline_schema)
# if not pipeline_validator.validate(yaml_dict):
# raise ValueError(pipeline_validator.errors)
# step_details = yaml_dict["spec"]["stepDetails"]
# if len(step_details) == 0:
# raise ValueError("Pipeline must have at least one step.")
# ml_step_validator = Validator(ml_step_schema)
# cs_step_validator = Validator(cs_step_schema)
# for step in step_details:
# if not ml_step_validator.validate(step) and not cs_step_validator.validate(
# step
# ):
# if ml_step_validator.errors:
# raise ValueError(ml_step_validator.errors)
# else:
# raise ValueError(cs_step_validator.errors)
# return super().from_yaml(uri=uri)
[docs]
@classmethod
def list(cls, compartment_id: Optional[str] = None, **kwargs) -> List["Pipeline"]:
    """
    List pipelines in a given compartment.

    Parameters
    ----------
    compartment_id: (str, optional). Defaults to None.
        The OCID of compartment.
        If `None`, the value will be taken from the environment variables.
    kwargs
        Additional keyword arguments for filtering pipelines.
        - project_id: str
        - lifecycle_state: str. Allowed values: "CREATING", "ACTIVE", "DELETING", "FAILED", "DELETED"
        - created_by: str
        - limit: int

    Returns
    -------
    List[Pipeline]
        The list of pipelines.
    """

    def _to_ads_pipeline(resource) -> "Pipeline":
        # Convert one service resource and resolve its step artifacts.
        ads_pipeline = resource.build_ads_pipeline()
        ads_pipeline._populate_step_artifact_content()
        return ads_pipeline

    return [
        _to_ads_pipeline(item)
        for item in DataSciencePipeline.list_resource(compartment_id, **kwargs)
    ]
[docs]
def run_list(self, **kwargs) -> List[PipelineRun]:
    """Gets a list of runs of the pipeline.

    Parameters
    ----------
    kwargs: optional
        Additional keyword arguments passed through to ``PipelineRun.list``
        for filtering the runs.

    Returns
    -------
    List[PipelineRun]
        A list of pipeline run instances.
    """
    return PipelineRun.list(
        compartment_id=self.compartment_id, pipeline_id=self.id, **kwargs
    )
@property
def status(self) -> Optional[str]:
"""Status of the pipeline.
Returns
-------
str
Status of the pipeline.
"""
if self.data_science_pipeline:
return self.data_science_pipeline.lifecycle_state
return None
[docs]
def init(self, **kwargs) -> "Pipeline":
    """Initializes a starter specification for the Pipeline.

    Returns
    -------
    Pipeline
        The Pipeline instance (self)
    """
    pipeline = self.build()
    # Fall back to placeholder text so the generated spec is self-describing.
    pipeline = pipeline.with_compartment_id(
        self.compartment_id or "{Provide a compartment OCID}"
    )
    return pipeline.with_project_id(self.project_id or "{Provide a project OCID}")
[docs]
class DataSciencePipeline(OCIDataScienceMixin, oci.data_science.models.Pipeline):
[docs]
@classmethod
def from_ocid(cls, ocid: str) -> "DataSciencePipeline":
    """Gets a datascience pipeline by OCID.

    Parameters
    ----------
    ocid: str
        The OCID of the datascience pipeline.

    Returns
    -------
    DataSciencePipeline
        An instance of DataSciencePipeline.
    """
    # Pure delegation to the mixin implementation; overridden here so the
    # documented return type is DataSciencePipeline.
    return super().from_ocid(ocid)
[docs]
def build_ads_pipeline(self) -> "Pipeline":
    """Builds an ADS pipeline from OCI datascience pipeline.

    Returns
    -------
    Pipeline:
        ADS Pipeline instance.
    """
    pipeline_details = self.to_dict()
    ads_pipeline = Pipeline(pipeline_details["displayName"])
    ads_pipeline.data_science_pipeline = self
    for key in pipeline_details:
        if key not in ads_pipeline.attribute_set:
            continue
        value = pipeline_details[key]
        if key == "stepDetails":
            # Each OCI step payload becomes an ADS PipelineStep.
            step_details = [self.build_ads_pipeline_step(step) for step in value]
            ads_pipeline.set_spec(key, step_details)
        elif key in ["freeformTags", "systemTags", "definedTags"]:
            # Tag dictionaries are stored as-is, not flattened.
            ads_pipeline.set_spec(key, value)
        elif isinstance(value, dict):
            # Idiom fix: isinstance replaces ``type(x) is dict``. Nested
            # payloads (e.g. configuration details) are flattened into
            # individual specs.
            for attribute in value:
                ads_pipeline.set_spec(attribute, value[attribute])
        else:
            ads_pipeline.set_spec(key, value)
    dag_list = ads_pipeline._convert_step_details_to_dag(
        pipeline_details["stepDetails"]
    )
    ads_pipeline.set_spec(Pipeline.CONST_DAG, dag_list)
    return ads_pipeline
[docs]
def build_ads_pipeline_step(self, step: Dict) -> "PipelineStep":
    """Builds an ADS pipeline step from OCI pipeline response.

    Parameters
    ----------
    step: dict
        A dictionary that contains the information of a pipeline step.

    Returns
    -------
    PipelineStep:
        ADS PipelineStep instance.
    """
    ads_pipeline_step = PipelineStep(step["stepName"])
    for key in step:
        if key in ads_pipeline_step.attribute_set:
            # NOTE(review): a fresh DataScienceJob is created for every
            # matching key. Only the infra-config branch populates it; the
            # step-config branch uses a fresh (empty) one for constants and
            # runtime extraction — confirm this is intended.
            infrastructure = DataScienceJob()
            if key == ads_pipeline_step.CONST_STEP_INFRA_CONFIG_DETAILS:
                # Copy the infra payload into a DataScienceJob spec.
                for attribute in step[key]:
                    infrastructure.set_spec(attribute, step[key][attribute])
                ads_pipeline_step.set_spec(
                    ads_pipeline_step.CONST_INFRASTRUCTURE, infrastructure
                )
            elif key == ads_pipeline_step.CONST_STEP_CONFIG_DETAILS:
                if step["stepType"] == "CUSTOM_SCRIPT":
                    # Rebuild a job configuration payload so the runtime can
                    # be extracted via the job runtime manager.
                    job_configuration_details_dict = {}
                    for attribute in step[key]:
                        # NOTE(review): the job type is re-set to "DEFAULT"
                        # on every iteration; this only matters if the
                        # payload itself carried a job type key.
                        job_configuration_details_dict[
                            infrastructure.CONST_JOB_TYPE
                        ] = "DEFAULT"
                        job_configuration_details_dict[attribute] = step[key][
                            attribute
                        ]
                    dsc_job = DSCJob(
                        job_configuration_details=job_configuration_details_dict
                    )
                    runtime = DataScienceJobRuntimeManager(infrastructure).extract(
                        dsc_job
                    )
                    ads_pipeline_step.set_spec(
                        ads_pipeline_step.CONST_RUNTIME, runtime
                    )
                else:
                    # Non-CUSTOM_SCRIPT steps keep their configuration as
                    # flat specs.
                    for attribute in step[key]:
                        ads_pipeline_step.set_spec(attribute, step[key][attribute])
            else:
                ads_pipeline_step.set_spec(key, step[key])
    return ads_pipeline_step
[docs]
def create(self, step_details: List, delete_if_fail: bool) -> "DataSciencePipeline":
    """Creates an OCI pipeline.

    Note: the previous ``-> str`` annotation was wrong; this method
    returns self.

    Parameters
    ----------
    step_details: list
        List of pipeline step details.
    delete_if_fail: bool
        Whether to delete the newly created pipeline when the artifact
        upload fails.

    Returns
    -------
    DataSciencePipeline:
        DataSciencePipeline instance (self).
    """
    response = self.client.create_pipeline(
        self.to_oci_model(oci.data_science.models.CreatePipelineDetails)
    )
    self.update_from_oci_model(response.data)
    try:
        self.upload_artifact(step_details)
    except Exception as ex:
        # Roll back the partially created pipeline if requested, then
        # re-raise the original failure.
        if delete_if_fail:
            self.delete(self.id)
        raise ex
    self.step_details = step_details
    return self
[docs]
def upload_artifact(self, step_details: List) -> "DataSciencePipeline":
    """Uploads artifacts to pipeline.

    Parameters
    ----------
    step_details: list
        List of pipeline step details.

    Returns
    -------
    DataSciencePipeline:
        DataSciencePipeline instance.
    """
    for step in step_details:
        # Steps without a runtime have no artifact to upload.
        if not step.runtime:
            continue
        payload = DataScienceJobRuntimeManager(step.infrastructure).translate(
            step.runtime
        )
        target_artifact = payload["artifact"]
        # Idiom fix: isinstance replaces issubclass(x.__class__, ...).
        if isinstance(target_artifact, Artifact):
            # Artifact objects are context managers; the entered object
            # exposes the local path to upload.
            with target_artifact as artifact:
                self.create_step_artifact(artifact.path, step.name)
        else:
            # Otherwise the payload carries a plain path/URI.
            self.create_step_artifact(target_artifact, step.name)
    return self
[docs]
def create_step_artifact(
    self, artifact_path: str, step_name: str
) -> "DataSciencePipeline":
    """Creates step artifact.

    Parameters
    ----------
    artifact_path: str
        Local path to artifact.
    step_name: str
        Pipeline step name.

    Returns
    -------
    DataSciencePipeline:
        DataSciencePipeline instance.
    """
    # The service reads the artifact file name from the Content-Disposition
    # header, mirroring how download resolves it.
    file_name = os.path.basename(artifact_path)
    with fsspec.open(artifact_path, "rb") as artifact_file:
        self.client.create_step_artifact(
            self.id,
            step_name,
            artifact_file,
            content_disposition=f"attachment; filename={file_name}",
        )
    return self
[docs]
def run(
    self, pipeline_details: Dict, service_logging: Optional[OCILog] = None
) -> "PipelineRun":
    """Runs an OCI pipeline.

    Parameters
    ----------
    pipeline_details: dict
        A dictionary that contains pipeline details.
    service_logging: OCILog, optional
        The OCILog instance to attach as the run's service log.
        Defaults to None.

    Returns
    -------
    PipelineRun:
        PipelineRun instance.
    """
    data_science_pipeline_run = PipelineRun(**pipeline_details)
    if service_logging:
        data_science_pipeline_run._set_service_logging_resource(service_logging)
    # create() submits the run to the service.
    data_science_pipeline_run.create()
    return data_science_pipeline_run
[docs]
def delete(
    self,
    id: str,
    operation_kwargs: Optional[Dict] = None,
    waiter_kwargs: Optional[Dict] = None,
) -> "DataSciencePipeline":
    """Deletes an OCI pipeline and waits for the work request to finish.

    Parameters
    ----------
    id: str
        The ocid of pipeline.
    operation_kwargs: dict, optional
        The operational kwargs to be executed when deleting the pipeline.
        Defaults to None, which is resolved to a copy of
        ``DEFAULT_OPERATION_KWARGS`` ({"delete_related_pipeline_runs": True,
        "delete_related_job_runs": True}) so the corresponding pipeline runs
        and job runs are deleted as well.
        The allowed keys are:

        * "delete_related_pipeline_runs": bool, to specify whether to delete
          related PipelineRuns or not.
        * "delete_related_job_runs": bool, to specify whether to delete
          related JobRuns or not.
        * "allow_control_chars": bool, whether this request should allow
          control characters in the response object.
        * "retry_strategy": obj, retry strategy from the :py:mod:`~oci.retry`
          module, overriding any client-level strategy.
        * "if_match": str, etag value for optimistic concurrency control.
        * "opc_request_id": str, unique Oracle assigned identifier for the
          request.
    waiter_kwargs: dict, optional
        The waiter kwargs to be passed when deleting the pipeline.
        Defaults to None, which is resolved to a copy of
        ``DEFAULT_WAITER_KWARGS`` ({"max_wait_seconds": 1800}).
        The allowed keys are:

        * "max_wait_seconds": int, the maximum time to wait, in seconds.
        * "max_interval_seconds": int, the maximum interval between queries,
          in seconds.
        * "succeed_on_not_found": bool, whether the waiter should return
          successfully if the data it waits on is not found (e.g. a 404).
        * "wait_callback": callable invoked on each wait iteration with the
          check count and the most recent check result.
        * "fetch_func": callable used to fetch the updated state from the
          server when a single GET is not enough.

    Returns
    -------
    DataSciencePipeline:
        DataSciencePipeline instance.
    """
    # Fix: the module-level default dicts were used directly as parameter
    # defaults (shared mutable state across all calls); resolve the None
    # sentinels to fresh copies instead.
    if operation_kwargs is None:
        operation_kwargs = dict(DEFAULT_OPERATION_KWARGS)
    if waiter_kwargs is None:
        waiter_kwargs = dict(DEFAULT_WAITER_KWARGS)
    self.client_composite.delete_pipeline_and_wait_for_state(
        pipeline_id=id,
        wait_for_states=[
            oci.data_science.models.WorkRequest.STATUS_SUCCEEDED,
            oci.data_science.models.WorkRequest.STATUS_FAILED,
        ],
        operation_kwargs=operation_kwargs,
        waiter_kwargs=waiter_kwargs,
    )
    # Refresh this object from the service after deletion.
    return self.sync()