#!/usr/bin/env python
# -*- coding: utf-8; -*-
# Copyright (c) 2022, 2024 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
import copy
from typing import List
from ads.jobs import Job
from ads.jobs.builders.infrastructure.dsc_job import DataScienceJob
from ads.jobs.builders.runtimes.base import Runtime
from ads.common.utils import get_random_name_for_resource
PIPELINE_STEP_KIND_TO_OCI_MAP = {
"dataScienceJob": "ML_JOB",
"customScript": "CUSTOM_SCRIPT",
}
PIPELINE_STEP_KIND_FROM_OCI_MAP = {
"ML_JOB": "dataScienceJob",
"CUSTOM_SCRIPT": "customScript",
}
PIPELINE_STEP_KIND = {"dataScienceJob", "customScript"}
PIPELINE_STEP_RESTRICTED_CHAR_SET = {",", ">", "(", ")"}
[docs]
class PipelineStep(Job):
"""Represents the Data Science Machine Learning Pipeline Step."""
CONST_NAME = "name"
CONST_JOB_ID = "jobId"
CONST_DESCRIPTION = "description"
CONST_DEPENDS_ON = "dependsOn"
CONST_KIND = "stepType"
CONST_MAXIMUM_RUNTIME_IN_MINUTES = "maximumRuntimeInMinutes"
CONST_ENVIRONMENT_VARIABLES = "environmentVariables"
CONST_COMMAND_LINE_ARGUMENTS = "commandLineArguments"
CONST_STEP_INFRA_CONFIG_DETAILS = "stepInfrastructureConfigurationDetails"
CONST_STEP_CONFIG_DETAILS = "stepConfigurationDetails"
CONST_INFRASTRUCTURE = "infrastructure"
CONST_RUNTIME = "runtime"
def __init__(
self,
name: str = None,
job_id: str = None,
infrastructure=None,
runtime=None,
description=None,
maximum_runtime_in_minutes=None,
environment_variable=None,
command_line_argument=None,
kind=None,
) -> None:
"""Initialize a pipeline step.
Parameters
----------
name : str, required
The name of the pipeline step.
job_id : str, optional
The job id of the pipeline step, by default None.
infrastructure : Infrastructure, optional
Pipeline step infrastructure, by default None.
runtime : Runtime, optional
Pipeline step runtime, by default None.
description : str, optional
The description for pipeline step, by default None.
maximum_runtime_in_minutes : int, optional
The maximum runtime in minutes for pipeline step, by default None.
environment_variable : dict, optional
The environment variable for pipeline step, by default None.
command_line_argument : str, optional
The command line argument for pipeline step, by default None.
kind: str, optional
The kind of pipeline step.
Attributes
----------
kind: str
The kind of the object as showing in YAML.
name: str
The name of pipeline step.
job_id: str
The job id of pipeline step.
infrastructure: DataScienceJob
The infrastructure of pipeline step.
runtime: Runtime
The runtime of pipeline step.
description: str
The description of pipeline step.
maximum_runtime_in_minutes: int
The maximum runtime in minutes of pipeline step.
environment_variable: dict
The environment variables of pipeline step.
argument: str
The argument of pipeline step.
depends_on: list
The depends on of pipeline step.
Methods
-------
with_job_id(self, job_id: str) -> PipelineStep
Sets the job id for pipeline step.
with_infrastructure(self, infrastructure) -> PipelineStep
Sets the infrastructure for pipeline step.
with_runtime(self, runtime) -> PipelineStep
Sets the runtime for pipeline step.
with_description(self, description: str) -> PipelineStep
Sets the description for pipeline step.
with_maximum_runtime_in_minutes(self, maximum_runtime_in_minutes: int) -> PipelineStep
Sets the maximum runtime in minutes for pipeline step.
with_environment_variable(self, **kwargs) -> PipelineStep
Sets the environment variables for pipeline step.
with_argument(self, *args, **kwargs) -> PipelineStep
Sets the command line arguments for pipeline step.
with_kind(self, kind: str) -> PipelineStep
Sets the kind for pipeline step.
to_dict(self) -> dict
Serializes the pipeline step specification dictionary.
from_dict(cls, config: dict) -> PipelineStep
Initializes a PipelineStep from a dictionary containing the configurations.
to_yaml(self, uri=None, **kwargs)
Returns PipelineStep serialized as a YAML string
from_yaml(cls, yaml_string=None, uri=None, **kwargs)
Creates an PipelineStep from YAML string provided or from URI location containing YAML string
Example
-------
Here is an example for defining a pipeline step using builder:
.. code-block:: python
from ads.pipeline import PipelineStep, CustomScriptStep, ScriptRuntime
# Define an OCI Data Science pipeline step to run a python script
pipeline_step = (
PipelineStep(name="<pipeline_step_name>")
.with_infrastructure(
CustomScriptStep()
.with_shape_name("VM.Standard2.1")
.with_block_storage_size(50)
)
.with_runtime(
ScriptRuntime()
.with_source("oci://bucket_name@namespace/path/to/script.py")
.with_service_conda("tensorflow26_p37_cpu_v2")
.with_environment_variable(ENV="value")
.with_argument("argument", key="value")
.with_maximum_runtime_in_minutes(200)
)
)
# Another way to define an OCI Data Science pipeline step from existing job
pipeline_step = (
PipelineStep(name="<pipeline_step_name>")
.with_job_id("<job_id>")
.with_description("<description>")
)
See Also
--------
https://docs.oracle.com/en-us/iaas/tools/ads-sdk/latest/user_guide/pipeline/index.html
"""
self.attribute_set = {
"jobId",
"stepType",
"stepName",
"description",
"dependsOn",
"stepInfrastructureConfigurationDetails",
"stepConfigurationDetails",
}
super().__init__()
if not name:
name = get_random_name_for_resource()
elif any(char in PIPELINE_STEP_RESTRICTED_CHAR_SET for char in name):
raise ValueError(
"PipelineStep name can not include any of the "
f"restricted characters in "
f"{''.join(PIPELINE_STEP_RESTRICTED_CHAR_SET)}."
)
self.set_spec("name", name)
if job_id:
self.with_job_id(job_id)
elif infrastructure and runtime:
self.with_infrastructure(infrastructure)
self.with_runtime(runtime)
if maximum_runtime_in_minutes:
self.with_maximum_runtime_in_minutes(maximum_runtime_in_minutes)
if environment_variable:
self.with_environment_variable(**environment_variable)
if command_line_argument:
self.with_argument(command_line_argument)
if description:
self.with_description(description)
if kind:
self.with_kind(kind)
@property
def name(self) -> str:
"""The name of pipeline step.
Returns
-------
str
The name of the pipeline step.
"""
return self.get_spec(self.CONST_NAME)
@property
def kind(self) -> str:
"""The kind of the object as showing in YAML.
Returns
-------
str
The kind of the object as showing in YAML.
"""
return self.get_spec(self.CONST_KIND)
@property
def job_id(self) -> str:
"""The job id of the pipeline step.
Returns
-------
str
The job id of the pipeline step.
"""
return self.get_spec(self.CONST_JOB_ID)
[docs]
def with_job_id(self, job_id: str) -> "PipelineStep":
"""Sets the job id for pipeline step.
Parameters
----------
job_id : str
The job id of pipeline step.
Returns
-------
Pipeline step instance (self).
"""
if not self.kind:
self.set_spec(self.CONST_KIND, "ML_JOB")
return self.set_spec(self.CONST_JOB_ID, job_id)
@property
def infrastructure(self) -> "DataScienceJob":
"""The infrastructure of the pipeline step.
Returns
-------
DataScienceJob :
Data science pipeline step instance.
"""
return self.get_spec(self.CONST_INFRASTRUCTURE)
[docs]
def with_infrastructure(self, infrastructure) -> "PipelineStep":
"""Sets the infrastructure for pipeline step.
Parameters
----------
infrastructure :
The infrastructure of pipeline step.
Returns
-------
Pipeline step instance (self).
"""
if not self.kind:
self.set_spec(self.CONST_KIND, "CUSTOM_SCRIPT")
return self.set_spec(self.CONST_INFRASTRUCTURE, infrastructure)
@property
def runtime(self) -> "Runtime":
"""The runtime of the pipeline step.
Returns
-------
Runtime :
Runtime instance.
"""
return self.get_spec(self.CONST_RUNTIME)
[docs]
def with_runtime(self, runtime) -> "PipelineStep":
"""Sets the runtime for pipeline step.
Parameters
----------
runtime :
The runtime of pipeline step.
Returns
-------
Pipeline step instance (self).
"""
if not self.kind:
self.set_spec(self.CONST_KIND, "CUSTOM_SCRIPT")
return self.set_spec(self.CONST_RUNTIME, runtime)
@property
def description(self) -> str:
"""The description of the pipeline step.
Returns
-------
str
The description of the pipeline step.
"""
return self.get_spec(self.CONST_DESCRIPTION)
[docs]
def with_description(self, description: str) -> "PipelineStep":
"""Sets the description for pipeline step.
Parameters
----------
description : str
The description of pipeline step.
Returns
-------
Pipeline step instance (self).
"""
return self.set_spec(self.CONST_DESCRIPTION, description)
@property
def maximum_runtime_in_minutes(self) -> int:
"""The maximum runtime in minutes of pipeline step.
Returns
-------
int
The maximum runtime in minutes of the pipeline step.
"""
return self.get_spec(self.CONST_MAXIMUM_RUNTIME_IN_MINUTES)
[docs]
def with_maximum_runtime_in_minutes(
self, maximum_runtime_in_minutes: int
) -> "PipelineStep":
"""Sets the maximum runtime in minutes of pipeline step.
Parameters
----------
maximum_runtime_in_minutes : int
The maximum runtime in minutes of pipeline step.
Returns
-------
Pipeline step instance (self).
"""
return self.set_spec(
self.CONST_MAXIMUM_RUNTIME_IN_MINUTES, maximum_runtime_in_minutes
)
@property
def environment_variable(self) -> dict:
"""The environment variables of the pipeline step.
Returns
-------
dict:
The environment variables of the pipeline step.
"""
return self.get_spec(self.CONST_ENVIRONMENT_VARIABLES)
[docs]
def with_environment_variable(self, **kwargs) -> "PipelineStep":
"""Sets environment variables of the pipeline step.
Parameters
----------
kwargs:
Keyword arguments.
To add a keyword argument without value, set the value to None.
Returns
-------
Pipeline
The Pipeline step instance (self).
"""
if kwargs:
environment_variable_dict = {}
for k, v in kwargs.items():
environment_variable_dict[k] = v
self.set_spec(self.CONST_ENVIRONMENT_VARIABLES, environment_variable_dict)
return self
@property
def argument(self) -> str:
"""The command line arguments of the pipeline step.
Returns
-------
str:
The command line arguments of the pipeline step.
"""
return self.get_spec(self.CONST_COMMAND_LINE_ARGUMENTS)
[docs]
def with_argument(self, *args, **kwargs) -> "PipelineStep":
"""Adds command line arguments to the pipeline step.
Existing arguments will be preserved.
This method can be called (chained) multiple times to add various arguments.
For example, pipeline.with_argument(key="val").with_argument("path/to/file") will result in:
"--key val path/to/file"
Parameters
----------
args:
Positional arguments.
In a single method call, positional arguments are always added before keyword arguments.
You can call with_argument() to add positional arguments after keyword arguments.
kwargs:
Keyword arguments.
To add a keyword argument without value, set the value to None.
Returns
-------
Pipeline
The Pipeline step instance (self).
Raises
------
ValueError
Keyword arguments with space in a key.
"""
arg_values = self.get_spec(self.CONST_COMMAND_LINE_ARGUMENTS, [])
args = [str(arg) for arg in args]
arg_values.extend(args)
for k, v in kwargs.items():
if " " in k:
raise ValueError("Argument key %s cannot contain space.", str(k))
arg_values.append(f"--{str(k)}")
# Ignore None value
if v is None:
continue
arg_values.append(str(v))
arg_string = " ".join(arg_values)
self.set_spec(self.CONST_COMMAND_LINE_ARGUMENTS, arg_string)
return self
@property
def depends_on(self) -> list:
"""The list of upstream pipeline steps for (self).
Returns
-------
list
The list of upstream pipeline steps for (self).
"""
return self.get_spec(self.CONST_DEPENDS_ON)
def _with_depends_on(self, depends_on: List["PipelineStep"]) -> "PipelineStep":
"""Sets the list of upstream pipeline steps for (self).
Parameters
----------
depends_on : list of PipelineStep objects
The list of pipeline steps that (self) depends on.
Returns
-------
Pipeline step instance (self).
"""
if not depends_on:
return self.set_spec(self.CONST_DEPENDS_ON, [])
step_list = []
for step in depends_on:
step_list.append(step.name)
return self.set_spec(self.CONST_DEPENDS_ON, step_list)
[docs]
def with_kind(self, kind: str) -> "PipelineStep":
"""Sets the kind of pipeline step.
Parameters
----------
kind : str
The kind of pipeline step.
Returns
-------
Pipeline step instance (self).
"""
if kind in PIPELINE_STEP_KIND:
self.set_spec(self.CONST_KIND, PIPELINE_STEP_KIND_TO_OCI_MAP[kind])
else:
raise ValueError(
"Invalid PipelineStep kind. The allowed "
f"values are {', '.join(PIPELINE_STEP_KIND)}."
)
return self.set_spec(self.CONST_KIND, kind)
[docs]
def to_dict(self) -> dict:
"""Serializes the pipeline step specification dictionary.
Returns
-------
dict
A dictionary containing pipeline step specification.
"""
dict_details = copy.deepcopy(super().to_dict())
if self.kind in PIPELINE_STEP_KIND_FROM_OCI_MAP:
dict_details["kind"] = PIPELINE_STEP_KIND_FROM_OCI_MAP[self.kind]
# remove information not going to show in to_dict()
if self.CONST_INFRASTRUCTURE in dict_details["spec"]:
dict_details["spec"][self.CONST_INFRASTRUCTURE].pop("type", None)
dict_details["spec"][self.CONST_INFRASTRUCTURE]["spec"].pop(
"jobInfrastructureType", None
)
dict_details["spec"][self.CONST_INFRASTRUCTURE]["spec"].pop("jobType", None)
if self.job_id:
dict_details["spec"][self.CONST_JOB_ID] = self.job_id
if self.description:
dict_details["spec"][self.CONST_DESCRIPTION] = self.description
if self.kind == "ML_JOB":
if self.environment_variable:
dict_details["spec"][
self.CONST_ENVIRONMENT_VARIABLES
] = self.environment_variable
if self.argument:
dict_details["spec"][self.CONST_COMMAND_LINE_ARGUMENTS] = self.argument
if self.maximum_runtime_in_minutes:
dict_details["spec"][
self.CONST_MAXIMUM_RUNTIME_IN_MINUTES
] = self.maximum_runtime_in_minutes
dict_details["spec"].pop(self.CONST_DEPENDS_ON, None)
return dict_details
[docs]
@classmethod
def from_dict(cls, config: dict) -> "PipelineStep":
"""Initializes a PipelineStep from a dictionary containing the configurations.
Parameters
----------
config : dict
A dictionary containing the infrastructure and runtime specifications.
Returns
-------
PipelineStep
A PipelineStep instance
Raises
------
NotImplementedError
If the type of the intrastructure or runtime is not supported.
"""
if not isinstance(config, dict):
raise ValueError("The config data for initializing the job is invalid.")
spec = config.get("spec")
mappings = {
"infrastructure": cls._INFRASTRUCTURE_MAPPING,
"runtime": cls._RUNTIME_MAPPING,
}
if spec["name"]:
pipeline_step = cls(name=spec["name"])
else:
raise ValueError("PipelineStep name must be specified.")
if config.get("kind", None):
step_kind = config.get("kind")
if step_kind in PIPELINE_STEP_KIND:
pipeline_step.set_spec(
cls.CONST_KIND, PIPELINE_STEP_KIND_TO_OCI_MAP[step_kind]
)
else:
raise ValueError(
"Invalid PipelineStep kind. The allowed "
f"values are {', '.join(PIPELINE_STEP_KIND)}."
)
else:
pipeline_step.set_spec(cls.CONST_KIND, "ML_JOB")
for key, value in spec.items():
if key in mappings:
mapping = mappings[key]
child_config = copy.deepcopy(value)
if key == "infrastructure":
child_config["type"] = "dataScienceJob"
if child_config.get("type") not in mapping:
raise NotImplementedError(
f"{key.title()} type: {child_config.get('type')} is not supported."
)
pipeline_step.set_spec(
key, mapping[child_config.get("type")].from_dict(child_config)
)
elif key == cls.CONST_STEP_CONFIG_DETAILS:
for attribute in value:
pipeline_step.set_spec(attribute, value[attribute])
else:
pipeline_step.set_spec(key, value)
return pipeline_step