ads.pipeline package
Subpackages
- ads.pipeline.builders package
- ads.pipeline.schema package
- ads.pipeline.visualizer package
- Submodules
- ads.pipeline.visualizer.base module
GraphOrientation
PipelineRenderer
PipelineVisualizer
PipelineVisualizer.pipeline
PipelineVisualizer.pipeline_run
PipelineVisualizer.steps
PipelineVisualizer.deps
PipelineVisualizer.step_status
PipelineVisualizer.render()
PipelineVisualizer.to_svg()
PipelineVisualizer.with_pipeline()
PipelineVisualizer.with_pipeline_run()
PipelineVisualizer.with_renderer()
PipelineVisualizerError
RendererItem
RendererItemStatus
RendererItemStatus.duration
RendererItemStatus.format_datetime()
RendererItemStatus.from_pipeline_run()
RendererItemStatus.from_pipeline_step_run()
RendererItemStatus.key
RendererItemStatus.kind
RendererItemStatus.lifecycle_details
RendererItemStatus.lifecycle_state
RendererItemStatus.name
RendererItemStatus.time_finished
RendererItemStatus.time_started
StepKind
StepStatus
- ads.pipeline.visualizer.graph_renderer module
- ads.pipeline.visualizer.text_renderer module
- Module contents
Submodules
ads.pipeline.ads_pipeline module
- class ads.pipeline.ads_pipeline.DataSciencePipeline(config: Optional[dict] = None, signer: Optional[Signer] = None, client_kwargs: Optional[dict] = None, **kwargs)
Bases:
OCIDataScienceMixin, Pipeline
Initializes a service/resource with an OCI client as a property. If config or signer is specified, it will be used to initialize the OCI client. If neither of them is specified, the client will be initialized with ads.common.auth.default_signer. If both of them are specified, both will be passed into the OCI client, and the authentication will be determined by the OCI Python SDK.
- Parameters:
config (dict, optional) – OCI API key config dictionary, by default None.
signer (oci.signer.Signer, optional) – OCI authentication signer, by default None.
client_kwargs (dict, optional) – Additional keyword arguments for initializing the OCI client.
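The rule above for combining config and signer can be summarised as a small decision function. This is only a sketch: `resolve_client_auth` and its `default_signer` argument are hypothetical names used for illustration (the argument stands in for ads.common.auth.default_signer), and the real class hands the result to the OCI Python SDK.

```python
from typing import Any, Callable, Dict, Optional


def _placeholder_default_signer() -> Dict[str, Any]:
    # Stand-in for ads.common.auth.default_signer (assumption for this sketch).
    return {"config": {}, "signer": "default"}


def resolve_client_auth(
    config: Optional[dict] = None,
    signer: Optional[Any] = None,
    default_signer: Callable[[], Dict[str, Any]] = _placeholder_default_signer,
) -> Dict[str, Any]:
    """Return the auth keyword arguments that would be given to an OCI client."""
    if config is None and signer is None:
        # Neither was given: fall back to the default signer.
        return default_signer()
    auth: Dict[str, Any] = {}
    if config is not None:
        auth["config"] = config
    if signer is not None:
        auth["signer"] = signer
    # When both are present, both are forwarded and the SDK decides.
    return auth
```

Calling `resolve_client_auth(config=my_config, signer=my_signer)` returns both entries untouched, which mirrors the "both are passed into the OCI client" case.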
- build_ads_pipeline() Pipeline
Builds an ADS pipeline from OCI datascience pipeline.
- Returns:
ADS Pipeline instance.
- Return type:
Pipeline
- build_ads_pipeline_step(step: Dict) PipelineStep
Builds an ADS pipeline step from OCI pipeline response.
- Parameters:
step (dict) – A dictionary that contains the information of a pipeline step.
- Returns:
ADS PipelineStep instance.
- Return type:
PipelineStep
- create(step_details: List, delete_if_fail: bool) str
Creates an OCI pipeline.
- Parameters:
step_details (list) – List of pipeline step details.
- Returns:
The id of OCI pipeline.
- Return type:
str
- create_step_artifact(artifact_path: str, step_name: str) DataSciencePipeline
Creates step artifact.
- Parameters:
artifact_path (str) – Local path to artifact.
step_name (str) – Pipeline step name.
- Returns:
DataSciencePipeline instance.
- Return type:
DataSciencePipeline
- delete(id: str, operation_kwargs: Dict = {'delete_related_job_runs': True, 'delete_related_pipeline_runs': True}, waiter_kwargs: Dict = {'max_wait_seconds': 1800}) DataSciencePipeline
Deletes an OCI pipeline.
- Parameters:
id (str) – The ocid of pipeline.
operation_kwargs (dict, optional) –
The operational kwargs to be executed when deleting the pipeline. Defaults to: {"delete_related_pipeline_runs": True, "delete_related_job_runs": True}, which will delete the corresponding pipeline runs and job runs.
The allowed keys are:
* "delete_related_pipeline_runs": bool, to specify whether to delete related PipelineRuns or not.
* "delete_related_job_runs": bool, to specify whether to delete related JobRuns or not.
* "allow_control_chars": bool, to indicate whether or not this request should allow control characters in the response object. By default, the response will not allow control characters in strings.
* "retry_strategy": obj, to apply to this specific operation/call. This will override any retry strategy set at the client-level. This should be one of the strategies available in the retry module. This operation will not retry by default; users can also use the convenient DEFAULT_RETRY_STRATEGY provided by the SDK to enable retries for it. The specifics of the default retry strategy are described in the OCI Python SDK documentation. To have this operation explicitly not perform any retries, pass an instance of NoneRetryStrategy.
* "if_match": str, for optimistic concurrency control. In the PUT or DELETE call for a resource, set the if-match parameter to the value of the etag from a previous GET or POST response for that resource. The resource is updated or deleted only if the etag you provide matches the resource's current etag value.
* "opc_request_id": str, unique Oracle assigned identifier for the request. If you need to contact Oracle about a particular request, then provide the request ID.
waiter_kwargs (dict, optional) –
The waiter kwargs to be passed when deleting the pipeline. Defaults to: {"max_wait_seconds": 1800}, which allows a maximum wait time of 1800 seconds to delete the pipeline.
The allowed keys are:
* "max_wait_seconds": int, the maximum time to wait, in seconds.
* "max_interval_seconds": int, the maximum interval between queries, in seconds.
* "succeed_on_not_found": bool, to determine whether or not the waiter should return successfully if the data we're waiting on is not found (e.g. a 404 is returned from the service). This defaults to False and so a 404 would cause an exception to be thrown by this function. Setting it to True may be useful in scenarios when waiting for a resource to be terminated/deleted since it is possible that the resource would not be returned by a GET call anymore.
* "wait_callback": A function which will be called each time that we have to do an initial wait (i.e. because the property of the resource was not in the correct state, or the evaluate_response function returned False). This function should take two arguments: the first argument is the number of times we have checked the resource, and the second argument is the result of the most recent check.
* "fetch_func": A function to be called to fetch the updated state from the server. This can be used if the call to check for state needs to be more complex than a single GET request. For example, if the goal is to wait until an item appears in a list, fetch_func can be a function that paginates through a full list on the server.
- Returns:
DataSciencePipeline instance.
- Return type:
DataSciencePipeline
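Because operation_kwargs and waiter_kwargs accept different keys, a caller holding one flat set of options may want to split them before calling delete(). The helper below is a hypothetical sketch, not part of ADS; the key names and defaults are copied from the parameter description above, and rejecting unknown keys is an assumption made here to catch typos early.

```python
from typing import Any, Dict, Tuple

# Key names copied from the parameter description above.
OPERATION_KEYS = {
    "delete_related_pipeline_runs",
    "delete_related_job_runs",
    "allow_control_chars",
    "retry_strategy",
    "if_match",
    "opc_request_id",
}
WAITER_KEYS = {
    "max_wait_seconds",
    "max_interval_seconds",
    "succeed_on_not_found",
    "wait_callback",
    "fetch_func",
}


def split_delete_kwargs(**options: Any) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Split flat options into (operation_kwargs, waiter_kwargs)."""
    unknown = set(options) - OPERATION_KEYS - WAITER_KEYS
    if unknown:
        raise ValueError(f"Unsupported delete options: {sorted(unknown)}")
    # Start from the documented defaults, then apply the caller's values.
    operation_kwargs: Dict[str, Any] = {
        "delete_related_pipeline_runs": True,
        "delete_related_job_runs": True,
    }
    waiter_kwargs: Dict[str, Any] = {"max_wait_seconds": 1800}
    for key, value in options.items():
        target = operation_kwargs if key in OPERATION_KEYS else waiter_kwargs
        target[key] = value
    return operation_kwargs, waiter_kwargs
```

The two returned dictionaries can then be passed as the operation_kwargs and waiter_kwargs arguments documented above.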
- classmethod from_ocid(ocid: str) DataSciencePipeline
Gets a datascience pipeline by OCID.
- Parameters:
ocid (str) – The OCID of the datascience pipeline.
- Returns:
An instance of DataSciencePipeline.
- Return type:
DataSciencePipeline
- run(pipeline_details: Dict, service_logging: Optional[OCILog] = None) PipelineRun
Runs an OCI pipeline.
- Parameters:
pipeline_details (dict) – A dictionary that contains pipeline details.
service_logging (OCILog, optional) – The OCILog instance used for service logging. Defaults to None.
- Returns:
PipelineRun instance.
- Return type:
PipelineRun
- upload_artifact(step_details: List) DataSciencePipeline
Uploads artifacts to pipeline.
- Parameters:
step_details (list) – List of pipeline step details.
- Returns:
DataSciencePipeline instance.
- Return type:
DataSciencePipeline
- class ads.pipeline.ads_pipeline.Pipeline(name: Optional[str] = None, spec: Optional[Dict] = None, **kwargs)
Bases:
Builder
Represents a Data Science Machine Learning Pipeline.
Initialize a pipeline.
- Parameters:
name (str, optional) – The name of the pipeline. Defaults to None. If a name is not provided, an easy-to-remember random name with a timestamp is generated, like 'strange-spider-2022-08-17-23:55.02'.
spec (dict, optional) – Object specification. Defaults to None.
kwargs (dict) –
Specification as keyword arguments. If spec contains the same key as the one in kwargs, the value from kwargs will be used.
project_id: str
compartment_id: str
display_name: str
description: str
maximum_runtime_in_minutes: int
environment_variables: dict(str, str)
command_line_arguments: str
log_id: str
log_group_id: str
enable_service_log: bool
shape_name: str
block_storage_size_in_gbs: int
shape_config_details: dict
step_details: list[PipelineStep]
dag: list[str]
defined_tags: dict(str, dict(str, object))
freeform_tags: dict[str, str]
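The precedence rule between spec and kwargs (kwargs win when both define the same key) can be illustrated with plain dictionaries. `merge_spec` is a hypothetical helper written for this sketch; the real constructor may also normalise key names, which is not shown here.

```python
from typing import Any, Dict, Optional


def merge_spec(spec: Optional[Dict[str, Any]] = None, **kwargs: Any) -> Dict[str, Any]:
    """Combine `spec` and keyword arguments; keyword arguments win on conflict."""
    merged = dict(spec or {})  # copy so the caller's dict is not mutated
    merged.update(kwargs)      # later values replace earlier ones
    return merged


spec = {"project_id": "<project_id>", "description": "from spec"}
merged = merge_spec(spec, description="from kwargs", shape_name="VM.Standard2.1")
```

Here `merged["description"]` is "from kwargs", while `project_id` is carried over from spec unchanged.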
- kind
The kind of the object as shown in YAML.
- Type:
str
- name
The name of pipeline.
- Type:
str
- id
The id of pipeline.
- Type:
str
- step_details
The step details of pipeline.
- Type:
List[PipelineStep]
- dag_details
The dag details of pipeline.
- Type:
List[str]
- log_group_id
The log group id of pipeline.
- Type:
str
- log_id
The log id of pipeline.
- Type:
str
- project_id
The project id of pipeline.
- Type:
str
- compartment_id
The compartment id of pipeline.
- Type:
str
- created_by
The id of the creator of the pipeline.
- Type:
str
- description
The description of pipeline.
- Type:
str
- environment_variable
The environment variables of pipeline.
- Type:
dict
- argument
The command line argument of pipeline.
- Type:
str
- maximum_runtime_in_minutes
The maximum runtime in minutes of pipeline.
- Type:
int
- shape_name
The shape name of pipeline infrastructure.
- Type:
str
- block_storage_size_in_gbs
The block storage of pipeline infrastructure.
- Type:
int
- shape_config_details
The shape config details of pipeline infrastructure.
- Type:
dict
- enable_service_log
The value to enable service log or not.
- Type:
bool
- service_log_id
The service log id of pipeline.
- Type:
str
- status
The status of the pipeline.
- Type:
str
- with_step_details(self, step_details: List[PipelineStep]) Pipeline
Sets the step details of pipeline.
- with_maximum_runtime_in_minutes(self, maximum_runtime_in_minutes: int) Pipeline
Sets the maximum runtime in minutes of pipeline.
- with_block_storage_size_in_gbs(self, block_storage_size_in_gbs: int) Pipeline
Sets the block storage size of pipeline infrastructure.
- with_shape_config_details(self, memory_in_gbs: float, ocpus: float, **kwargs) Pipeline
Sets the shape config details of pipeline infrastructure.
- with_enable_service_log(self, enable_service_log: bool) Pipeline
Sets the value to enable the service log of pipeline.
- to_dict(self) dict
Serializes the pipeline specifications to a dictionary.
- from_dict(cls, obj_dict: dict)
Initializes the object from a dictionary.
- show(self, rankdir: str = GraphOrientation.TOP_BOTTOM)
Render pipeline with step information in a graph.
- to_svg(self, uri: str = None, rankdir: str = GraphOrientation.TOP_BOTTOM, **kwargs) str
Renders pipeline as graph into SVG.
- run(self, display_name: Optional[str] = None, project_id: Optional[str] = None, compartment_id: Optional[str] = None, configuration_override_details: Optional[dict] = None, log_configuration_override_details: Optional[dict] = None, step_override_details: Optional[list] = None, free_form_tags: Optional[dict] = None, defined_tags: Optional[dict] = None, system_tags: Optional[dict] = None) PipelineRun
Creates and/or overrides an ADS pipeline run.
- delete(self, delete_related_pipeline_runs: Optional[bool] = True, delete_related_job_runs: Optional[bool] = True, max_wait_seconds: Optional[int] = MAXIMUM_TIMEOUT, **kwargs) Pipeline
Deletes an ADS pipeline.
- to_yaml(self, uri=None, **kwargs)
Returns Pipeline serialized as a YAML string.
- from_yaml(cls, yaml_string=None, uri=None, **kwargs)
Creates a Pipeline from the provided YAML string or from a URI location containing a YAML string.
- list(cls, compartment_id: Optional[str] = None, **kwargs) List[Pipeline]
List pipelines in a given compartment.
- run_list(self, **kwargs) List[PipelineRun]
Gets a list of runs of the pipeline.
Example
Here is an example for creating and running a pipeline using builder:
```python
from ads.pipeline import Pipeline, PipelineStep, CustomScriptStep, ScriptRuntime

# Define an OCI Data Science pipeline
pipeline = (
    Pipeline(name="<pipeline_name>")
    .with_compartment_id("<compartment_id>")
    .with_project_id("<project_id>")
    .with_log_group_id("<log_group_id>")
    .with_log_id("<log_id>")
    .with_description("<description>")
    .with_maximum_runtime_in_minutes(200)
    .with_argument("argument", key="value")
    .with_environment_variable(env="value")
    .with_freeform_tags({"key": "value"})
    .with_step_details([
        (
            # Step backed by an existing job
            PipelineStep(name="PipelineStepOne")
            .with_job_id("<job_id>")
            .with_description("<description>")
        ),
        (
            # Step backed by a custom script
            PipelineStep(name="PipelineStepTwo")
            .with_infrastructure(
                CustomScriptStep()
                .with_shape_name("VM.Standard2.1")
                .with_block_storage_size(50)
            )
            .with_runtime(
                ScriptRuntime()
                .with_source("oci://bucket_name@namespace/path/to/script.py")
                .with_service_conda("tensorflow26_p37_cpu_v2")
                .with_environment_variable(ENV="value")
                .with_argument("argument", key="value")
                .with_maximum_runtime_in_minutes(200)
            )
        ),
    ])
    # PipelineStepOne must finish before PipelineStepTwo starts
    .with_dag(["PipelineStepOne >> PipelineStepTwo"])
)

# Create and run the pipeline
run = pipeline.create().run()

# Stream the pipeline run outputs
run.watch()
```
See also
https://docs.oracle.com/en-us/iaas/tools/ads-sdk/latest/user_guide/pipeline/index.html
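The DAG entry in the example, "PipelineStepOne >> PipelineStepTwo", reads as "step one runs before step two". As a rough sketch of how such strings translate into dependencies, the hypothetical `parse_dag` helper below handles only simple chains of the form "A >> B >> C"; it is not the parser ADS uses, and any richer syntax the library may accept is out of scope here.

```python
from typing import Dict, List, Set


def parse_dag(dag: List[str]) -> Dict[str, Set[str]]:
    """Map each step name to the set of steps it directly depends on."""
    depends_on: Dict[str, Set[str]] = {}
    for entry in dag:
        names = [name.strip() for name in entry.split(">>")]
        if any(not name for name in names):
            raise ValueError(f"Malformed DAG entry: {entry!r}")
        for name in names:
            depends_on.setdefault(name, set())
        # Each step depends on the step immediately to its left.
        for upstream, downstream in zip(names, names[1:]):
            depends_on[downstream].add(upstream)
    return depends_on


deps = parse_dag(["PipelineStepOne >> PipelineStepTwo"])
```

In `deps`, PipelineStepOne has no dependencies and PipelineStepTwo depends on PipelineStepOne.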
- CONST_BLOCK_STORAGE_SIZE = 'blockStorageSizeInGBs'
- CONST_COMMAND_LINE_ARGUMENTS = 'commandLineArguments'
- CONST_COMPARTMENT_ID = 'compartmentId'
- CONST_CONFIGURATION_DETAILS = 'configurationDetails'
- CONST_CONFIGURATION_OVERRIDE_DETAILS = 'configurationOverrideDetails'
- CONST_CREATED_BY = 'createdBy'
- CONST_DAG = 'dag'
- CONST_DEFINED_TAGS = 'definedTags'
- CONST_DESCRIPTION = 'description'
- CONST_DISPLAY_NAME = 'displayName'
- CONST_ENABLE_AUTO_LOG_CREATION = 'enableAutoLogCreation'
- CONST_ENABLE_LOGGING = 'enableLogging'
- CONST_ENABLE_SERVICE_LOG = 'enableServiceLog'
- CONST_ENVIRONMENT_VARIABLES = 'environmentVariables'
- CONST_FREEFROM_TAGS = 'freeformTags'
- CONST_ID = 'id'
- CONST_INFRA_CONFIG_DETAILS = 'infrastructureConfigurationDetails'
- CONST_LOG_CONFIGURATION_DETAILS = 'logConfigurationDetails'
- CONST_LOG_CONFIGURATION_OVERRIDE_DETAILS = 'logConfigurationOverrideDetails'
- CONST_LOG_GROUP_ID = 'logGroupId'
- CONST_LOG_ID = 'logId'
- CONST_MAXIMUM_RUNTIME_IN_MINUTES = 'maximumRuntimeInMinutes'
- CONST_MEMORY_IN_GBS = 'memoryInGBs'
- CONST_OCPUS = 'ocpus'
- CONST_PIPELINE_ID = 'pipelineId'
- CONST_PROJECT_ID = 'projectId'
- CONST_SERVICE = 'datascience'
- CONST_SERVICE_LOG_CATEGORY = 'pipelinerunlog'
- CONST_SERVICE_LOG_ID = 'serviceLogId'
- CONST_SHAPE_CONFIG_DETAILS = 'shapeConfigDetails'
- CONST_SHAPE_NAME = 'shapeName'
- CONST_STEP_DETAILS = 'stepDetails'
- CONST_STEP_OVERRIDE_DETAILS = 'stepOverrideDetails'
- CONST_SYSTEM_TAGS = 'systemTags'
- CONST_TYPE = 'type'
- LIFECYCLE_STATE_ACTIVE = 'ACTIVE'
- LIFECYCLE_STATE_CREATING = 'CREATING'
- LIFECYCLE_STATE_DELETED = 'DELETED'
- LIFECYCLE_STATE_DELETING = 'DELETING'
- LIFECYCLE_STATE_FAILED = 'FAILED'
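The CONST_* values above are the camelCase keys used in the pipeline specification. The sketch below shows one way snake_case attribute names could be translated to those keys. The `SPEC_KEYS` table and `to_spec` helper are hypothetical; the key strings are copied from the constants, but pairing each one with an attribute name is an assumption made for illustration.

```python
from typing import Any, Dict

# Values copied from the constants above; the attribute-name pairing is assumed.
SPEC_KEYS = {
    "compartment_id": "compartmentId",
    "project_id": "projectId",
    "display_name": "displayName",
    "log_group_id": "logGroupId",
    "log_id": "logId",
    "maximum_runtime_in_minutes": "maximumRuntimeInMinutes",
    "freeform_tags": "freeformTags",
}


def to_spec(**attributes: Any) -> Dict[str, Any]:
    """Translate snake_case attributes into camelCase spec keys."""
    unknown = sorted(set(attributes) - set(SPEC_KEYS))
    if unknown:
        raise KeyError(f"No spec key known for: {unknown}")
    return {SPEC_KEYS[name]: value for name, value in attributes.items()}
```

An explicit table is used rather than a generic snake-to-camel conversion because keys such as 'blockStorageSizeInGBs' and 'memoryInGBs' do not follow a purely mechanical rule.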
- property argument: str
The command line arguments of the pipeline.
- Returns:
The command line arguments of the pipeline.
- Return type:
str
- property block_storage_size_in_gbs: int
The block storage size of pipeline infrastructure.
- Returns:
The block storage size of the pipeline infrastructure.
- Return type:
int
- property compartment_id: str
The compartment id of the pipeline.
- Returns:
The compartment id of the pipeline.
- Return type:
str
- create(delete_if_fail: bool = True) Pipeline
Creates an ADS pipeline.
- Returns:
The ADS Pipeline instance.
- Return type:
Pipeline
- property created_by: str
The id that creates the pipeline.
- Returns:
The id that creates the pipeline.
- Return type:
str
- property dag: List[str]
The dag details of the pipeline.
- Returns:
The dag details of the pipeline.
- Return type:
list
- delete(delete_related_pipeline_runs: Optional[bool] = True, delete_related_job_runs: Optional[bool] = True, max_wait_seconds: Optional[int] = 1800, **kwargs) Pipeline
Deletes an ADS pipeline.
- Parameters:
delete_related_pipeline_runs (bool, optional) – Specify whether to delete related PipelineRuns or not. Defaults to True.
delete_related_job_runs (bool, optional) – Specify whether to delete related JobRuns or not. Defaults to True.
max_wait_seconds (int, optional) – The maximum time to wait, in seconds. Defaults to 1800.
kwargs (optional) –
The kwargs to be executed when deleting the pipeline. The allowed keys are:
* "allow_control_chars": bool, to indicate whether or not this request should allow control characters in the response object. By default, the response will not allow control characters in strings.
* "retry_strategy": obj, to apply to this specific operation/call. This will override any retry strategy set at the client-level. This should be one of the strategies available in the retry module. This operation will not retry by default; users can also use the convenient DEFAULT_RETRY_STRATEGY provided by the SDK to enable retries for it. The specifics of the default retry strategy are described in the OCI Python SDK documentation. To have this operation explicitly not perform any retries, pass an instance of NoneRetryStrategy.
* "if_match": str, for optimistic concurrency control. In the PUT or DELETE call for a resource, set the if-match parameter to the value of the etag from a previous GET or POST response for that resource. The resource is updated or deleted only if the etag you provide matches the resource's current etag value.
* "opc_request_id": str, unique Oracle assigned identifier for the request. If you need to contact Oracle about a particular request, then provide the request ID.
* "max_interval_seconds": int, the maximum interval between queries, in seconds.
* "succeed_on_not_found": bool, to determine whether or not the waiter should return successfully if the data we're waiting on is not found (e.g. a 404 is returned from the service). This defaults to False and so a 404 would cause an exception to be thrown by this function. Setting it to True may be useful in scenarios when waiting for a resource to be terminated/deleted since it is possible that the resource would not be returned by a GET call anymore.
* "wait_callback": A function which will be called each time that we have to do an initial wait (i.e. because the property of the resource was not in the correct state, or the evaluate_response function returned False). This function should take two arguments: the first argument is the number of times we have checked the resource, and the second argument is the result of the most recent check.
* "fetch_func": A function to be called to fetch the updated state from the server. This can be used if the call to check for state needs to be more complex than a single GET request. For example, if the goal is to wait until an item appears in a list, fetch_func can be a function that paginates through a full list on the server.
- Returns:
The ADS Pipeline instance.
- Return type:
Pipeline
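The waiter-related options (max_wait_seconds, max_interval_seconds, wait_callback, fetch_func) describe a polling loop. The function below is a minimal sketch of that idea. `wait_until` is a hypothetical name, the doubling back-off is an assumption of this sketch, and the OCI SDK ships its own waiter with more features.

```python
import time
from typing import Any, Callable, Optional


def wait_until(
    fetch_func: Callable[[], Any],
    evaluate_response: Callable[[Any], bool],
    max_wait_seconds: float = 1800,
    max_interval_seconds: float = 30,
    wait_callback: Optional[Callable[[int, Any], None]] = None,
) -> Any:
    """Poll `fetch_func` until `evaluate_response` accepts the result."""
    deadline = time.monotonic() + max_wait_seconds
    interval = min(1.0, max_interval_seconds)
    checks = 0
    while True:
        result = fetch_func()
        checks += 1
        if evaluate_response(result):
            return result
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError(f"Gave up after {checks} checks")
        if wait_callback is not None:
            # Arguments: number of checks so far, most recent result.
            wait_callback(checks, result)
        time.sleep(min(interval, remaining))
        # Back off, but never exceed the configured maximum interval.
        interval = min(interval * 2, max_interval_seconds)
```

A deletion check could, for instance, pass a `fetch_func` that returns the current lifecycle state and an `evaluate_response` that compares it with 'DELETED'.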
- property description: str
The description of pipeline.
- Returns:
The description of pipeline.
- Return type:
str
- download(to_dir: str, override_if_exists: Optional[bool] = False) Pipeline
Downloads artifacts from pipeline.
- Parameters:
to_dir (str) – Local directory to which the artifacts will be downloaded.
override_if_exists (bool, optional) – Bool to decide whether to override existing folder/file or not. Defaults to False.
- Returns:
The ADS Pipeline instance.
- Return type:
Pipeline
- property enable_service_log: bool
Enables service log of pipeline.
- Returns:
The bool value to enable service log of pipeline.
- Return type:
bool
- property environment_variable: dict
The environment variables of the pipeline.
- Returns:
The environment variables of the pipeline.
- Return type:
dict
- classmethod from_dict(obj_dict: dict)
Initializes the object from a dictionary.
- classmethod from_id(id: str) Pipeline
Creates a pipeline by OCID.
- Parameters:
id (str) – The OCID of pipeline.
- Returns:
The Pipeline instance.
- Return type:
Pipeline
- classmethod from_ocid(ocid: str) Pipeline
Creates a pipeline by OCID.
- Parameters:
ocid (str) – The OCID of pipeline.
- Returns:
The Pipeline instance.
- Return type:
Pipeline
- property id: str
The id of the pipeline.
- Returns:
The id of the pipeline.
- Return type:
str
- property kind: str
The kind of the object as shown in YAML.
- Returns:
pipeline
- Return type:
str
- classmethod list(compartment_id: Optional[str] = None, **kwargs) List[Pipeline]
List pipelines in a given compartment.
- Parameters:
compartment_id (str, optional) – The OCID of the compartment. Defaults to None. If None, the value will be taken from the environment variables.
kwargs –
Additional keyword arguments for filtering pipelines:
* project_id: str
* lifecycle_state: str. Allowed values: "CREATING", "ACTIVE", "DELETING", "FAILED", "DELETED"
* created_by: str
* limit: int
- Returns:
The list of pipelines.
- Return type:
List[Pipeline]
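Since lifecycle_state accepts only a fixed set of values, a caller may want to validate filters before issuing the request. The helper below is a hypothetical sketch, not part of ADS; the allowed values are copied from the kwargs description above.

```python
from typing import Any, Dict

# Allowed values copied from the parameter description above.
ALLOWED_LIFECYCLE_STATES = ("CREATING", "ACTIVE", "DELETING", "FAILED", "DELETED")


def check_list_filters(**kwargs: Any) -> Dict[str, Any]:
    """Return the filters unchanged after validating `lifecycle_state`."""
    state = kwargs.get("lifecycle_state")
    if state is not None and state not in ALLOWED_LIFECYCLE_STATES:
        raise ValueError(
            f"lifecycle_state must be one of {ALLOWED_LIFECYCLE_STATES}, got {state!r}"
        )
    return dict(kwargs)


filters = check_list_filters(project_id="<project_id>", lifecycle_state="ACTIVE", limit=10)
```

The validated dictionary could then be unpacked into the call as keyword arguments, for example `Pipeline.list(compartment_id, **filters)`.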
- property log_group_id: str
The log group id of the pipeline.
- Returns:
The log group id of the pipeline.
- Return type:
str
- property log_id: str
The log id of the pipeline.
- Returns:
The log id of the pipeline.
- Return type:
str
- property maximum_runtime_in_minutes: int
The maximum runtime in minutes of the pipeline.
- Returns:
The maximum runtime in minutes of the pipeline.
- Return type:
int
- property name: str
The name of the pipeline.
- Returns:
The name of the pipeline.
- Return type:
str
- property project_id: str
The project id of the pipeline.
- Returns:
The project id of the pipeline.
- Return type:
str
- run(display_name: Optional[str] = None, project_id: Optional[str] = None, compartment_id: Optional[str] = None, configuration_override_details: Optional[dict] = None, log_configuration_override_details: Optional[dict] = None, step_override_details: Optional[list] = None, free_form_tags: Optional[dict] = None, defined_tags: Optional[dict] = None, system_tags: Optional[dict] = None) PipelineRun
Creates an ADS pipeline run.
- Parameters:
display_name (str, optional) – The display name to override the one defined previously. Defaults to None.
project_id (str, optional) – The project id to override the one defined previously. Defaults to None.
compartment_id (str, optional) – The compartment id to override the one defined previously. Defaults to None.
configuration_override_details (dict, optional) –
The configuration details dictionary to override the one defined previously. Defaults to None. The configuration_override_details contains the following keys:
* "type": str, only "DEFAULT" is allowed.
* "environment_variables": dict, optional, the environment variables.
* "command_line_arguments": str, optional, the command line arguments.
* "maximum_runtime_in_minutes": int, optional, the maximum runtime allowed in minutes.
log_configuration_override_details (dict(str, str), optional) –
The log configuration details dictionary to override the one defined previously. Defaults to None. The log_configuration_override_details contains the following keys:
* "log_group_id": str, optional, the log group id.
* "log_id": str, optional, the log id.
step_override_details (list[PipelineStepOverrideDetails], optional) –
The step details list to override the one defined previously. Defaults to None. The PipelineStepOverrideDetails is a dict which contains the following keys:
* "step_name": str, the name of the step to override.
* "step_configuration_details": dict, which contains:
  * "maximum_runtime_in_minutes": int, optional
  * "environment_variables": dict, optional
  * "command_line_arguments": str, optional
free_form_tags (dict(str, str), optional) – The freeform tags dictionary to override the one defined previously. Defaults to None.
defined_tags (dict(str, dict(str, object)), optional) – The defined tags dictionary to override the one defined previously. Defaults to None.
system_tags (dict(str, dict(str, object)), optional) – The system tags dictionary to override the one defined previously. Defaults to None.
Example
```python
# `pipeline` is an existing Pipeline instance.

# Creates a pipeline run using pipeline configurations
pipeline.run()

# Creates a pipeline run by overriding pipeline configurations
pipeline.run(
    display_name="OverrideDisplayName",
    configuration_override_details={
        "maximum_runtime_in_minutes": 30,
        "type": "DEFAULT",
        "environment_variables": {"key": "value"},
        "command_line_arguments": "ARGUMENT --KEY VALUE",
    },
    log_configuration_override_details={"log_group_id": "<log_group_id>"},
    step_override_details=[
        {
            "step_name": "<step_name>",
            "step_configuration_details": {
                "maximum_runtime_in_minutes": 200,
                "environment_variables": {"1": "2"},
                "command_line_arguments": "argument --key value",
            },
        }
    ],
)
```
- Returns:
The ADS PipelineRun instance.
- Return type:
PipelineRun
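The constraints on configuration_override_details (a fixed set of keys, with "type" restricted to "DEFAULT") lend themselves to a quick pre-flight check. The validator below is a hypothetical sketch, not part of ADS; treating "type" as required and rejecting unknown keys are assumptions drawn from the parameter description, not confirmed library behaviour.

```python
from typing import Any, Dict

# Key names copied from the parameter description above.
ALLOWED_OVERRIDE_KEYS = {
    "type",
    "environment_variables",
    "command_line_arguments",
    "maximum_runtime_in_minutes",
}


def check_configuration_override(details: Dict[str, Any]) -> Dict[str, Any]:
    """Validate a configuration_override_details dictionary and return it."""
    unknown = sorted(set(details) - ALLOWED_OVERRIDE_KEYS)
    if unknown:
        raise ValueError(f"Unexpected keys: {unknown}")
    if details.get("type") != "DEFAULT":
        raise ValueError('"type" must be "DEFAULT"')
    runtime = details.get("maximum_runtime_in_minutes")
    if runtime is not None and not isinstance(runtime, int):
        raise TypeError("maximum_runtime_in_minutes must be an int")
    return details


override = check_configuration_override(
    {"type": "DEFAULT", "maximum_runtime_in_minutes": 30}
)
```

The checked dictionary can be passed as the configuration_override_details argument of run().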
- run_list(**kwargs) List[PipelineRun]
Gets a list of runs of the pipeline.
- Returns:
A list of pipeline run instances.
- Return type:
List[PipelineRun]
- property service_log_id: str
The service log id of pipeline.
- Returns:
The service log id of pipeline.
- Return type:
str
- property shape_config_details: dict
The shape config details of pipeline infrastructure.
- Returns:
The shape config details of the pipeline infrastructure.
- Return type:
dict
- property shape_name: str
The shape name of pipeline infrastructure.
- Returns:
The shape name of the pipeline infrastructure.
- Return type:
str
- show(rankdir: str = 'TB') None
Render pipeline with step information in a graph
- Return type:
None
- property status: Optional[str]
Status of the pipeline.
- Returns:
Status of the pipeline.
- Return type:
str
- property step_details: List[PipelineStep]
The step details of the pipeline.
- Returns:
The step details of the pipeline.
- Return type:
list
- to_dict() dict
Serializes the pipeline specifications to a dictionary.
- Returns:
A dictionary containing pipeline specifications.
- Return type:
dict
- to_svg(uri: Optional[str] = None, rankdir: str = 'TB', **kwargs) str
Renders pipeline as graph in svg string.
- Parameters:
uri (str, optional) – URI location to save the SVG string. Defaults to None.
rankdir (str, optional) – Direction of the rendered graph; allowed values are {"TB", "LR"}. Defaults to "TB".
- Returns:
Graph in svg format.
- Return type:
str
- with_argument(*args, **kwargs) Pipeline
Adds command line arguments to the pipeline. Existing arguments will be preserved. This method can be called (chained) multiple times to add various arguments. For example, pipeline.with_argument(key="val").with_argument("path/to/file") will result in: "--key val path/to/file"
- Parameters:
args – Positional arguments. In a single method call, positional arguments are always added before keyword arguments. You can call with_argument() to add positional arguments after keyword arguments.
kwargs – Keyword arguments. To add a keyword argument without value, set the value to None.
- Returns:
The Pipeline instance (self).
- Return type:
Pipeline
- Raises:
ValueError – Keyword arguments with space in a key.
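The ordering rules for with_argument() are easiest to see in a toy re-implementation. `ArgumentBuilder` below is a hypothetical stand-in written for this sketch, not the ADS class: it reproduces only the behaviour described above (positional arguments before keyword arguments within one call, a None value producing a bare flag, and ValueError for keys containing a space) and ignores details such as quoting.

```python
from typing import List, Optional


class ArgumentBuilder:
    """Toy stand-in that mimics the documented with_argument() behaviour."""

    def __init__(self) -> None:
        self._parts: List[str] = []

    def with_argument(self, *args: str, **kwargs: Optional[str]) -> "ArgumentBuilder":
        # Validate first so a bad key leaves the builder unchanged.
        for key in kwargs:
            if " " in key:
                raise ValueError(f"Argument key {key!r} cannot contain a space.")
        # Positional arguments of a single call come before its keyword arguments.
        self._parts.extend(str(arg) for arg in args)
        for key, value in kwargs.items():
            self._parts.append(f"--{key}")
            if value is not None:
                # A value of None adds the flag alone.
                self._parts.append(str(value))
        return self  # returning self is what makes chaining possible

    @property
    def argument(self) -> str:
        return " ".join(self._parts)


builder = ArgumentBuilder().with_argument(key="val").with_argument("path/to/file")
```

Because each call appends to the existing parts, a second call is the way to place a positional argument after a keyword argument, as in the chained example.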
- with_block_storage_size_in_gbs(block_storage_size_in_gbs: int) Pipeline
Sets the block storage size of pipeline infrastructure.
- Parameters:
block_storage_size_in_gbs (int) – The block storage size of pipeline infrastructure.
- Returns:
The Pipeline instance (self).
- Return type:
Pipeline
- with_compartment_id(compartment_id: str) Pipeline
Sets the compartment id of the pipeline.
- Parameters:
compartment_id (str) – The compartment id of the pipeline.
- Returns:
The Pipeline instance (self).
- Return type:
Pipeline
- with_created_by(created_by: str) Pipeline
Sets the id that creates the pipeline.
- Parameters:
created_by (str) – The id that creates the pipeline.
- Returns:
The Pipeline instance (self).
- Return type:
Pipeline
- with_dag(dag: List[str]) Pipeline
Sets the pipeline dag details for the pipeline.
- Parameters:
dag (list) – A list of dag representing step dependencies in the pipeline.
- Returns:
The Pipeline instance (self).
- Return type:
Pipeline
- with_defined_tags(defined_tags: Dict) Pipeline
Sets defined tags of the pipeline.
- Parameters:
defined_tags (dict) – The defined tags dictionary.
- Returns:
The Pipeline instance (self).
- Return type:
Pipeline
- with_description(description: str) Pipeline
Sets the description of the pipeline.
- Parameters:
description (str) – The description of the pipeline.
- Returns:
The Pipeline instance (self).
- Return type:
Pipeline
- with_enable_service_log(enable_service_log: bool) Pipeline
Sets the bool value to enable the service log of pipeline.
- Parameters:
enable_service_log (bool) – The value to enable the service log of pipeline.
- Returns:
The Pipeline instance (self).
- Return type:
Pipeline
- with_environment_variable(**kwargs) Pipeline
Sets environment variables of the pipeline.
- Parameters:
kwargs – Keyword arguments. To add a keyword argument without value, set the value to None.
- Returns:
The Pipeline instance (self).
- Return type:
Pipeline
- with_freeform_tags(freeform_tags: Dict) Pipeline
Sets freeform tags of the pipeline.
- Parameters:
freeform_tags (dict) – The freeform tags dictionary.
- Returns:
The Pipeline instance (self).
- Return type:
Pipeline
- with_id(id: str) Pipeline
Sets the id of pipeline.
- Parameters:
id (str) – The id of pipeline.
- Returns:
The Pipeline instance (self).
- Return type:
Pipeline
- with_log_group_id(log_group_id: str) Pipeline
Sets the log group id of the pipeline.
- Parameters:
log_group_id (str) – The log group id of the pipeline.
- Returns:
The Pipeline instance (self).
- Return type:
Pipeline
- with_log_id(log_id: str) Pipeline
Sets the log id of the pipeline.
- Parameters:
log_id (str) – The log id of the pipeline.
- Returns:
The Pipeline instance (self).
- Return type:
Pipeline
- with_maximum_runtime_in_minutes(maximum_runtime_in_minutes: int) Pipeline
Sets the maximum runtime in minutes of the pipeline.
- Parameters:
maximum_runtime_in_minutes (int) – The maximum_runtime_in_minutes of the pipeline.
- Returns:
The Pipeline instance (self).
- Return type:
Pipeline
- with_name(name: str) Pipeline
Sets the name of pipeline.
- Parameters:
name (str) – The name of pipeline.
- Returns:
The Pipeline instance (self).
- Return type:
Pipeline
- with_project_id(project_id: str) Pipeline
Sets the project id of the pipeline.
- Parameters:
project_id (str) – The project id of the pipeline.
- Returns:
The Pipeline instance (self).
- Return type:
Pipeline
- with_shape_config_details(memory_in_gbs: float, ocpus: float, **kwargs: Dict[str, Any]) Pipeline
Sets the shape config details of pipeline infrastructure. Specify only when a flex shape is selected. For example VM.Standard.E3.Flex allows the memory_in_gbs and cpu count to be specified.
- Parameters:
memory_in_gbs (float) – The size of the memory in GBs.
ocpus (float) – The OCPUs count.
kwargs – Additional keyword arguments.
- Returns:
The Pipeline instance (self).
- Return type:
Pipeline
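For a flex shape, the two required values end up under the 'memoryInGBs' and 'ocpus' keys listed among the constants (CONST_MEMORY_IN_GBS and CONST_OCPUS). The function below is a hypothetical sketch of assembling such a dictionary; the positivity check and the way extra kwargs are merged are assumptions of this sketch, not documented ADS behaviour.

```python
from typing import Any, Dict


def shape_config_details(memory_in_gbs: float, ocpus: float, **kwargs: Any) -> Dict[str, Any]:
    """Assemble a shape configuration dictionary for a flex shape."""
    # Assumption: non-positive sizes are never meaningful, so reject them early.
    if memory_in_gbs <= 0 or ocpus <= 0:
        raise ValueError("memory_in_gbs and ocpus must be positive")
    details: Dict[str, Any] = {"memoryInGBs": memory_in_gbs, "ocpus": ocpus}
    details.update(kwargs)  # extra settings are carried along unchanged
    return details


flex = shape_config_details(memory_in_gbs=16, ocpus=2)
```

With the builder itself, the equivalent call would be `pipeline.with_shape_name("VM.Standard.E3.Flex").with_shape_config_details(memory_in_gbs=16, ocpus=2)`.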
- with_shape_name(shape_name: str) Pipeline
Sets the shape name of pipeline infrastructure.
- Parameters:
shape_name (str) – The shape name of the pipeline infrastructure.
- Returns:
The Pipeline instance (self).
- Return type:
Pipeline
- with_step_details(step_details: List[PipelineStep]) Pipeline
Sets the pipeline step details for the pipeline.
- Parameters:
step_details (list) – A list of steps in the pipeline.
- Returns:
The Pipeline instance (self).
- Return type:
Pipeline
ads.pipeline.ads_pipeline_run module
- exception ads.pipeline.ads_pipeline_run.LogNotConfiguredError
Bases:
Exception
- class ads.pipeline.ads_pipeline_run.LogType
Bases:
str
- CUSTOM_LOG = 'custom_log'
- SERVICE_LOG = 'service_log'
- class ads.pipeline.ads_pipeline_run.PipelineRun(config: Optional[dict] = None, signer: Optional[Signer] = None, client_kwargs: Optional[dict] = None, **kwargs)
Bases:
OCIDataScienceMixin, PipelineRun, RunInstance
- status
Returns Lifecycle status.
- Type:
str
- custom_logging
Returns the OCILog object containing the custom logs from the pipeline.
- Type:
OCILog
- create(self) PipelineRun
Creates an OCI pipeline run.
- delete(self, delete_related_job_runs: Optional[bool] = True, max_wait_seconds: Optional[int] = MAXIMUM_TIMEOUT, **kwargs) PipelineRun
Deletes an OCI pipeline run.
- cancel(self, maximum_timeout: int = MAXIMUM_TIMEOUT) PipelineRun
Cancels an OCI pipeline run.
- watch(self, steps: List[str] = None, interval: float = LOG_INTERVAL, log_type: str = LogType.CUSTOM_LOG, *args) PipelineRun
Watches the pipeline run until it finishes.
- list(cls, pipeline_id: str, compartment_id: Optional[str] = None, **kwargs) List[PipelineRun]
Lists pipeline runs for a given pipeline.
- to_yaml(self) str
Serializes the object into YAML string.
- show(self, mode: str = ShowMode.GRAPH, wait: bool = False, rankdir: str = GraphOrientation.TOP_BOTTOM) None
Renders pipeline run. Can be text or graph representation.
- to_svg(self, uri: str = None, rankdir: str = GraphOrientation.TOP_BOTTOM, **kwargs)
Renders pipeline run graph to SVG.
- sync(self) None
Syncs status of Pipeline run.
Initializes a service/resource with OCI client as a property. If config or signer is specified, it will be used to initialize the OCI client. If neither of them is specified, the client will be initialized with ads.common.auth.default_signer. If both of them are specified, both of them will be passed into the OCI client,
and the authentication will be determined by OCI Python SDK.
- Parameters:
config (dict, optional) – OCI API key config dictionary, by default None.
signer (oci.signer.Signer, optional) – OCI authentication signer, by default None.
client_kwargs (dict, optional) – Additional keyword arguments for initializing the OCI client.
- cancel(maximum_timeout: int = 1800) PipelineRun
Cancels an OCI pipeline run.
- Parameters:
maximum_timeout (int, optional) – The maximum timeout to cancel the pipeline run. Defaults to 1800 seconds.
- Returns:
Pipeline run instance (self).
- Return type:
PipelineRun
- create() PipelineRun
Creates an OCI pipeline run.
- Returns:
Pipeline run instance (self).
- Return type:
PipelineRun
- property custom_logging: OCILog
The OCILog object containing the custom logs from the pipeline run.
- delete(delete_related_job_runs: Optional[bool] = True, max_wait_seconds: Optional[int] = 1800, **kwargs) PipelineRun
Deletes an OCI pipeline run.
- Parameters:
delete_related_job_runs (bool, optional) – Specify whether to delete related JobRuns or not. Defaults to True.
max_wait_seconds (int, optional) – The maximum time to wait, in seconds. Defaults to 1800.
kwargs (optional) – The kwargs to be executed when deleting the pipeline. The allowed keys are:
* "allow_control_chars": allow control characters in the response object. By default, the response will not allow control characters in strings.
* "retry_strategy": override any retry strategy set at the client-level. This should be one of the strategies available in the retry module. This operation will not retry by default; users can also use the convenient DEFAULT_RETRY_STRATEGY provided by the SDK to enable retries for it. The specifics of the default retry strategy are described in the OCI Python SDK documentation. To have this operation explicitly not perform any retries, pass an instance of NoneRetryStrategy.
* "if_match": str, for optimistic concurrency control. In the PUT or DELETE call for a resource, set the if-match parameter to the value of the etag from a previous GET or POST response for that resource. The resource is updated or deleted only if the etag you provide matches the resource's current etag value.
* "opc_request_id": str, unique Oracle assigned identifier for the request. If you need to contact Oracle about a particular request, then provide the request ID.
* "max_interval_seconds": int, the maximum interval between queries, in seconds.
* "wait_callback": A function which will be called each time that we have to do an initial wait (i.e. because the property of the resource was not in the correct state, or the evaluate_response function returned False). This function should take two arguments: the first argument is the number of times we have checked the resource, and the second argument is the result of the most recent check.
* "fetch_func": A function to be called to fetch the updated state from the server. This can be used if the call to check for state needs to be more complex than a single GET request. For example, if the goal is to wait until an item appears in a list, fetch_func can be a function that paginates through a full list on the server.
- Returns:
Pipeline run instance (self).
- Return type:
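To make the waiting-related keys more concrete, here is a minimal standalone sketch of a polling loop that honours the documented contract for wait_callback (called with the number of checks so far and the most recent result), fetch_func, max_interval_seconds and max_wait_seconds. It is not the OCI SDK implementation: the function `wait_until`, its doubling back-off and the injectable `sleep` parameter are assumptions made for illustration.

```python
import time
from typing import Any, Callable, Optional


def wait_until(
    fetch_func: Callable[[], Any],
    evaluate_response: Callable[[Any], bool],
    max_wait_seconds: float = 1800,
    max_interval_seconds: float = 30,
    wait_callback: Optional[Callable[[int, Any], None]] = None,
    sleep: Callable[[float], None] = time.sleep,
) -> Any:
    """Poll fetch_func until evaluate_response accepts the result."""
    deadline = time.monotonic() + max_wait_seconds
    interval = 1.0  # assumed starting interval, doubled after each check
    times_checked = 0
    while True:
        result = fetch_func()
        times_checked += 1
        if evaluate_response(result):
            return result
        if time.monotonic() >= deadline:
            raise TimeoutError("resource did not reach the expected state in time")
        if wait_callback is not None:
            # Two arguments: number of checks so far, most recent result.
            wait_callback(times_checked, result)
        sleep(min(interval, max_interval_seconds))
        interval *= 2


# Simulated lifecycle states returned by successive checks.
states = iter(["DELETING", "DELETING", "DELETED"])
calls = []
final_state = wait_until(
    fetch_func=lambda: next(states),
    evaluate_response=lambda state: state == "DELETED",
    wait_callback=lambda count, state: calls.append((count, state)),
    sleep=lambda seconds: None,  # skip real sleeping in this demo
)
```

In the demo the callback fires once per unsuccessful check, so `calls` records the two intermediate "DELETING" results before `final_state` becomes "DELETED".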
- classmethod list(pipeline_id: str, compartment_id: Optional[str] = None, **kwargs) List[PipelineRun]
List pipeline runs for a given pipeline.
- Parameters:
pipeline_id (str) – The OCID of the pipeline.
compartment_id ((str, optional). Defaults to None.) – The OCID of the compartment. If None, the value is taken from the environment variables.
kwargs – Additional keyword arguments for filtering pipeline runs:
  - lifecycle_state: str. Allowed values: "CREATING", "ACTIVE", "DELETING", "FAILED", "DELETED"
  - created_by: str
  - limit: int
- Returns:
The list of pipeline runs.
- Return type:
List[PipelineRun]
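The effect of the filtering keyword arguments can be sketched locally. In the real API the filtering is presumably applied by the service; the `RunSummary` record and the `filter_runs` helper below are hypothetical stand-ins that only illustrate the semantics of lifecycle_state, created_by and limit.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class RunSummary:
    """Hypothetical stand-in for a pipeline run record."""
    id: str
    lifecycle_state: str
    created_by: str


def filter_runs(
    runs: List[RunSummary],
    lifecycle_state: Optional[str] = None,
    created_by: Optional[str] = None,
    limit: Optional[int] = None,
) -> List[RunSummary]:
    """Keep runs matching every filter that is set, then truncate to limit."""
    selected = [
        run
        for run in runs
        if (lifecycle_state is None or run.lifecycle_state == lifecycle_state)
        and (created_by is None or run.created_by == created_by)
    ]
    return selected if limit is None else selected[:limit]


runs = [
    RunSummary("run-1", "FAILED", "alice"),
    RunSummary("run-2", "ACTIVE", "alice"),
    RunSummary("run-3", "FAILED", "bob"),
]
failed = filter_runs(runs, lifecycle_state="FAILED")
first_by_alice = filter_runs(runs, created_by="alice", limit=1)
```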
- logs(log_type: Optional[str] = None) ConsolidatedLog
Builds the consolidated log for pipeline run.
- Parameters:
log_type (str) – The log type of the pipeline run. Defaults to None. Can be custom_log, service_log or None.
- Returns:
The ConsolidatedLog instance.
- Return type:
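How ConsolidatedLog combines its sources is not described in this section. The sketch below shows one plausible reading, assuming that entries from the custom log and the service log are merged by timestamp and that log_type=None means "all sources". The `consolidated` function and the `(timestamp, message)` entry format are hypothetical.

```python
import heapq
from typing import Dict, List, Optional, Tuple

LogEntry = Tuple[float, str]  # (timestamp, message)


def consolidated(
    sources: Dict[str, List[LogEntry]], log_type: Optional[str] = None
) -> List[Tuple[float, str, str]]:
    """Merge log entries from the selected sources in timestamp order."""
    if log_type is not None and log_type not in sources:
        raise ValueError(f"unknown log type: {log_type}")
    chosen = sources if log_type is None else {log_type: sources[log_type]}
    # Tag each entry with its source name; sort each stream so merge() is valid.
    tagged = (
        [(ts, name, msg) for ts, msg in sorted(entries)]
        for name, entries in chosen.items()
    )
    return list(heapq.merge(*tagged))


sources = {
    "custom_log": [(1.0, "step1 started"), (3.0, "step1 done")],
    "service_log": [(2.0, "run accepted")],
}
merged = consolidated(sources)
only_service = consolidated(sources, log_type="service_log")
```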
- property pipeline
Returns the ADS Pipeline instance. Step details will be synced with the Pipeline Run.
- Returns:
The ADS Pipeline instance, with step details synced with the Pipeline Run.
- Return type:
- property service_logging: OCILog
The OCILog object containing the service logs from the pipeline run.
- show(mode: str = 'graph', wait: bool = False, rankdir: str = 'TB') None
Renders pipeline run. Can be text or graph representation.
- Parameters:
mode ((str, optional). Defaults to graph.) – Pipeline run display mode. Allowed values: graph or text.
wait ((bool, optional). Default to False.) – Whether to wait until the completion of the pipeline run.
rankdir ((str, optional). Default to TB.) – Direction of the rendered graph. Allowed Values: TB or LR. Applicable only for graph mode.
- Return type:
None
- property status: str
Lifecycle status.
- Returns:
Status in a string.
- Return type:
str
- sync(**kwargs) None
Syncs status of the Pipeline Run.
- Return type:
None
- to_svg(uri: Optional[str] = None, rankdir: str = 'TB', **kwargs) str
Renders pipeline run graph to SVG.
- Parameters:
uri ((string, optional). Defaults to None.) – URI location to save the SVG string.
rankdir ((str, optional). Default to TB.) – Direction of the rendered graph. Allowed Values: TB or LR. Applicable only for graph mode.
- Returns:
Pipeline run graph in svg format.
- Return type:
str
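The rankdir values TB and LR match the Graphviz attribute of the same name, so the effect of the parameter can be sketched by generating DOT text for a small step graph. The renderer actually used is not shown in this section; `to_dot` and the shape of the `deps` mapping (step name to list of upstream steps) are assumptions for illustration.

```python
from typing import Dict, List


def to_dot(deps: Dict[str, List[str]], rankdir: str = "TB") -> str:
    """Build Graphviz DOT text for a step dependency graph."""
    if rankdir not in ("TB", "LR"):
        raise ValueError("rankdir must be 'TB' (top to bottom) or 'LR' (left to right)")
    lines = ["digraph pipeline {", f"  rankdir={rankdir};"]
    for step in deps:
        lines.append(f'  "{step}";')
    for step, upstream in deps.items():
        for parent in upstream:
            # Edges point from the upstream step to the dependent step.
            lines.append(f'  "{parent}" -> "{step}";')
    lines.append("}")
    return "\n".join(lines)


dot = to_dot({"prepare": [], "train": ["prepare"]}, rankdir="LR")
```

The resulting text could be passed to Graphviz to produce an SVG; only the `rankdir` line changes between the two orientations.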
- to_yaml() str
Serializes the object into YAML string.
- Returns:
YAML stored in a string.
- Return type:
str
- watch(steps: Optional[List[str]] = None, interval: float = 3, log_type: Optional[str] = None, *args) PipelineRun
Watches the pipeline run until it finishes. This method keeps streaming the service log of the pipeline run until the run has succeeded, failed or been cancelled.
- Parameters:
steps ((List[str], optional). Defaults to None.) – Pipeline steps passed in to filter the logs.
interval ((float, optional). Defaults to 3 seconds.) – Time interval in seconds between each request to update the logs.
log_type ((str, optional). Defaults to None.) – The log type. Can be custom_log, service_log or None.
*args – Pipeline steps passed in to filter the logs. Example: .watch("step1", "step2")
Examples
>>> .watch()
>>> .watch(log_type="service_log")
>>> .watch("step1", "step2", log_type="custom_log", interval=3)
>>> .watch(steps=["step1", "step2"], log_type="custom_log", interval=3)
- Returns:
Pipeline run instance (self).
- Return type:
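The behaviour described for watch (poll at a fixed interval, emit new log lines, optionally filter by step, stop at a final state) can be sketched as a standalone loop. This is not the library's code: the `watch` function, its callables and the terminal state names "SUCCEEDED", "FAILED" and "CANCELED" are assumptions made for illustration.

```python
import time
from typing import Callable, Iterable, List, Optional, Tuple

# Assumed names for the final lifecycle states.
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "CANCELED"}


def watch(
    get_status: Callable[[], str],
    get_new_logs: Callable[[], Iterable[Tuple[str, str]]],
    steps: Optional[List[str]] = None,
    interval: float = 3,
    sleep: Callable[[float], None] = time.sleep,
    emit: Callable[[str], None] = print,
) -> str:
    """Emit new (step, message) log lines until the run reaches a final state."""
    wanted = set(steps) if steps else None
    while True:
        # Read the status first so that logs written just before the final
        # state are still fetched and emitted in this iteration.
        status = get_status()
        for step, message in get_new_logs():
            if wanted is None or step in wanted:
                emit(f"[{step}] {message}")
        if status in TERMINAL_STATES:
            return status
        sleep(interval)


statuses = iter(["IN_PROGRESS", "SUCCEEDED"])
batches = iter([[("step1", "loading"), ("step2", "waiting")], [("step1", "done")]])
seen: List[str] = []
final_status = watch(
    get_status=lambda: next(statuses),
    get_new_logs=lambda: next(batches),
    steps=["step1"],
    sleep=lambda seconds: None,  # skip real sleeping in this demo
    emit=seen.append,
)
```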
ads.pipeline.ads_pipeline_step module
- class ads.pipeline.ads_pipeline_step.PipelineStep(name: str, job_id: Optional[str] = None, infrastructure=None, runtime=None, description=None, maximum_runtime_in_minutes=None, environment_variable=None, command_line_argument=None, kind=None)
Bases:
Job
Represents the Data Science Machine Learning Pipeline Step.
Initialize a pipeline step.
- Parameters:
name (str, required) – The name of the pipeline step.
job_id (str, optional) – The job id of the pipeline step, by default None.
infrastructure (Infrastructure, optional) – Pipeline step infrastructure, by default None.
runtime (Runtime, optional) – Pipeline step runtime, by default None.
description (str, optional) – The description for pipeline step, by default None.
maximum_runtime_in_minutes (int, optional) – The maximum runtime in minutes for pipeline step, by default None.
environment_variable (dict, optional) – The environment variable for pipeline step, by default None.
command_line_argument (str, optional) – The command line argument for pipeline step, by default None.
kind (str, optional) – The kind of pipeline step.
- kind
The kind of the object as shown in YAML.
- Type:
str
- name
The name of pipeline step.
- Type:
str
- job_id
The job id of pipeline step.
- Type:
str
- infrastructure
The infrastructure of pipeline step.
- Type:
- description
The description of pipeline step.
- Type:
str
- maximum_runtime_in_minutes
The maximum runtime in minutes of pipeline step.
- Type:
int
- environment_variable
The environment variables of pipeline step.
- Type:
dict
- argument
The argument of pipeline step.
- Type:
str
- depends_on
The list of upstream steps that this pipeline step depends on.
- Type:
list
- with_job_id(self, job_id: str) PipelineStep
Sets the job id for pipeline step.
- with_infrastructure(self, infrastructure) PipelineStep
Sets the infrastructure for pipeline step.
- with_runtime(self, runtime) PipelineStep
Sets the runtime for pipeline step.
- with_description(self, description: str) PipelineStep
Sets the description for pipeline step.
- with_maximum_runtime_in_minutes(self, maximum_runtime_in_minutes: int) PipelineStep
Sets the maximum runtime in minutes for pipeline step.
- with_environment_variable(self, **kwargs) PipelineStep
Sets the environment variables for pipeline step.
- with_argument(self, *args, **kwargs) PipelineStep
Sets the command line arguments for pipeline step.
- with_kind(self, kind: str) PipelineStep
Sets the kind for pipeline step.
- to_dict(self) dict
Serializes the pipeline step specification dictionary.
- from_dict(cls, config: dict) PipelineStep
Initializes a PipelineStep from a dictionary containing the configurations.
- to_yaml(self, uri=None, **kwargs)
Returns the PipelineStep serialized as a YAML string.
- from_yaml(cls, yaml_string=None, uri=None, **kwargs)
Creates a PipelineStep from the YAML string provided or from a URI location containing a YAML string.
Example
Here is an example for defining a pipeline step using builder:
from ads.pipeline import PipelineStep, CustomScriptStep, ScriptRuntime

# Define an OCI Data Science pipeline step to run a python script
pipeline_step = (
    PipelineStep(name="<pipeline_step_name>")
    .with_infrastructure(
        CustomScriptStep()
        .with_shape_name("VM.Standard2.1")
        .with_block_storage_size(50)
    )
    .with_runtime(
        ScriptRuntime()
        .with_source("oci://bucket_name@namespace/path/to/script.py")
        .with_service_conda("tensorflow26_p37_cpu_v2")
        .with_environment_variable(ENV="value")
        .with_argument("argument", key="value")
        .with_maximum_runtime_in_minutes(200)
    )
)

# Another way to define an OCI Data Science pipeline step from existing job
pipeline_step = (
    PipelineStep(name="<pipeline_step_name>")
    .with_job_id("<job_id>")
    .with_description("<description>")
)
See also
https://docs.oracle.com/en-us/iaas/tools/ads-sdk/latest/user_guide/pipeline/index.html
- CONST_COMMAND_LINE_ARGUMENTS = 'commandLineArguments'
- CONST_DEPENDS_ON = 'dependsOn'
- CONST_DESCRIPTION = 'description'
- CONST_ENVIRONMENT_VARIABLES = 'environmentVariables'
- CONST_INFRASTRUCTURE = 'infrastructure'
- CONST_JOB_ID = 'jobId'
- CONST_KIND = 'stepType'
- CONST_MAXIMUM_RUNTIME_IN_MINUTES = 'maximumRuntimeInMinutes'
- CONST_NAME = 'name'
- CONST_RUNTIME = 'runtime'
- CONST_STEP_CONFIG_DETAILS = 'stepConfigurationDetails'
- CONST_STEP_INFRA_CONFIG_DETAILS = 'stepInfrastructureConfigurationDetails'
- property argument: str
The command line arguments of the pipeline step.
- Returns:
The command line arguments of the pipeline step.
- Return type:
str
- property depends_on: list
The list of upstream pipeline steps for this step.
- Returns:
The list of upstream pipeline steps for this step.
- Return type:
list
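Because depends_on lists upstream steps, a valid execution order is any topological order of the resulting graph. The sketch below uses the standard-library `graphlib` module (Python 3.9+) on a hypothetical mapping of step names to their upstream steps; it illustrates the ordering constraint only and does not show how the service schedules steps.

```python
from graphlib import TopologicalSorter

# Hypothetical steps: each key maps to the steps it depends on (its upstream steps).
depends_on = {
    "prepare": [],
    "train": ["prepare"],
    "evaluate": ["train"],
    "report": ["train"],
}

# TopologicalSorter expects node -> predecessors, which matches depends_on.
order = list(TopologicalSorter(depends_on).static_order())
```

Every step appears in `order` after all of its upstream steps; the relative order of independent steps such as "evaluate" and "report" is not guaranteed.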
- property description: str
The description of the pipeline step.
- Returns:
The description of the pipeline step.
- Return type:
str
- property environment_variable: dict
The environment variables of the pipeline step.
- Returns:
The environment variables of the pipeline step.
- Return type:
dict
- classmethod from_dict(config: dict) PipelineStep
Initializes a PipelineStep from a dictionary containing the configurations.
- Parameters:
config (dict) – A dictionary containing the infrastructure and runtime specifications.
- Returns:
A PipelineStep instance
- Return type:
PipelineStep
- Raises:
NotImplementedError – If the type of the infrastructure or runtime is not supported.
- property infrastructure: DataScienceJob
The infrastructure of the pipeline step.
- Returns:
Data science pipeline step instance.
- Return type:
DataScienceJob
- property job_id: str
The job id of the pipeline step.
- Returns:
The job id of the pipeline step.
- Return type:
str
- property kind: str
The kind of the object as shown in YAML.
- Returns:
The kind of the object as shown in YAML.
- Return type:
str
- property maximum_runtime_in_minutes: int
The maximum runtime in minutes of pipeline step.
- Returns:
The maximum runtime in minutes of the pipeline step.
- Return type:
int
- property name: str
The name of pipeline step.
- Returns:
The name of the pipeline step.
- Return type:
str
- property runtime: Runtime
The runtime of the pipeline step.
- Returns:
Runtime instance.
- Return type:
Runtime
- to_dict() dict
Serializes the pipeline step specification dictionary.
- Returns:
A dictionary containing pipeline step specification.
- Return type:
dict
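A to_dict / from_dict round trip can be sketched with a small stand-in class that reuses a few of the key constants listed above ('name', 'jobId', 'description', 'dependsOn'). The class `StepSpec` is hypothetical, and the flat dictionary layout is an assumption; the actual nesting of the PipelineStep specification is not described in this section.

```python
from dataclasses import dataclass, field
from typing import List, Optional

# Key names copied from the CONST_* values documented for PipelineStep.
CONST_NAME = "name"
CONST_JOB_ID = "jobId"
CONST_DESCRIPTION = "description"
CONST_DEPENDS_ON = "dependsOn"


@dataclass
class StepSpec:
    """Hypothetical, simplified stand-in for a pipeline step specification."""
    name: str
    job_id: Optional[str] = None
    description: Optional[str] = None
    depends_on: List[str] = field(default_factory=list)

    def to_dict(self) -> dict:
        spec = {
            CONST_NAME: self.name,
            CONST_JOB_ID: self.job_id,
            CONST_DESCRIPTION: self.description,
            CONST_DEPENDS_ON: list(self.depends_on),
        }
        # Omit unset fields so the serialized form stays compact.
        return {key: value for key, value in spec.items() if value not in (None, [])}

    @classmethod
    def from_dict(cls, config: dict) -> "StepSpec":
        return cls(
            name=config[CONST_NAME],
            job_id=config.get(CONST_JOB_ID),
            description=config.get(CONST_DESCRIPTION),
            depends_on=list(config.get(CONST_DEPENDS_ON, [])),
        )


step = StepSpec("train", job_id="<job_id>", depends_on=["prepare"])
as_dict = step.to_dict()
restored = StepSpec.from_dict(as_dict)
```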
- with_argument(*args, **kwargs) PipelineStep
Adds command line arguments to the pipeline step. Existing arguments will be preserved. This method can be called (chained) multiple times to add various arguments. For example, pipeline.with_argument(key="val").with_argument("path/to/file") will result in: "--key val path/to/file"
- Parameters:
args – Positional arguments. In a single method call, positional arguments are always added before keyword arguments. You can call with_argument() to add positional arguments after keyword arguments.
kwargs – Keyword arguments. To add a keyword argument without value, set the value to None.
- Returns:
The Pipeline step instance (self).
- Return type:
PipelineStep
- Raises:
ValueError – Keyword arguments with space in a key.
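The accumulation rules described for with_argument (positional arguments first within one call, keyword arguments rendered as "--key value", None meaning a flag without a value, and a ValueError for keys containing a space) can be sketched with a small stand-in class. `ArgumentBuilder` is hypothetical and only mirrors the documented behaviour; it does not handle quoting of values that contain spaces.

```python
from typing import List


class ArgumentBuilder:
    """Hypothetical stand-in that mimics the documented with_argument rules."""

    def __init__(self) -> None:
        self._tokens: List[str] = []

    def with_argument(self, *args, **kwargs) -> "ArgumentBuilder":
        # Validate all keys first so a failed call leaves existing arguments intact.
        for key in kwargs:
            if " " in key:
                raise ValueError(f"argument key {key!r} cannot contain a space")
        # Within a single call, positional arguments come before keyword arguments.
        self._tokens.extend(str(arg) for arg in args)
        for key, value in kwargs.items():
            self._tokens.append(f"--{key}")
            if value is not None:  # None adds the key without a value
                self._tokens.append(str(value))
        return self  # returning self allows chained calls

    @property
    def argument(self) -> str:
        return " ".join(self._tokens)


chained = ArgumentBuilder().with_argument(key="val").with_argument("path/to/file")
flag_only = ArgumentBuilder().with_argument("input.csv", verbose=None)
```

Chaining two calls is what places a positional argument after a keyword argument, as in `chained`, whose `argument` property matches the "--key val path/to/file" result described above.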
- with_description(description: str) PipelineStep
Sets the description for pipeline step.
- Parameters:
description (str) – The description of pipeline step.
- Return type:
Pipeline step instance (self).
- with_environment_variable(**kwargs) PipelineStep
Sets environment variables of the pipeline step.
- Parameters:
kwargs – Keyword arguments. To add a keyword argument without value, set the value to None.
- Returns:
The Pipeline step instance (self).
- Return type:
PipelineStep
- with_infrastructure(infrastructure) PipelineStep
Sets the infrastructure for pipeline step.
- Parameters:
infrastructure – The infrastructure of pipeline step.
- Return type:
Pipeline step instance (self).
- with_job_id(job_id: str) PipelineStep
Sets the job id for pipeline step.
- Parameters:
job_id (str) – The job id of pipeline step.
- Return type:
Pipeline step instance (self).
- with_kind(kind: str) PipelineStep
Sets the kind of pipeline step.
- Parameters:
kind (str) – The kind of pipeline step.
- Return type:
Pipeline step instance (self).
- with_maximum_runtime_in_minutes(maximum_runtime_in_minutes: int) PipelineStep
Sets the maximum runtime in minutes of pipeline step.
- Parameters:
maximum_runtime_in_minutes (int) – The maximum runtime in minutes of pipeline step.
- Return type:
Pipeline step instance (self).
- with_runtime(runtime) PipelineStep
Sets the runtime for pipeline step.
- Parameters:
runtime – The runtime of pipeline step.
- Return type:
Pipeline step instance (self).
ads.pipeline.cli module
ads.pipeline.extension module
- ads.pipeline.extension.load_ipython_extension(ipython)
- ads.pipeline.extension.pipeline(line, cell=None)
- ads.pipeline.extension.pipeline_cancel(options, args)
- ads.pipeline.extension.pipeline_delete(options, args)
- ads.pipeline.extension.pipeline_log(options, args)
- ads.pipeline.extension.pipeline_run(options, args)
- ads.pipeline.extension.pipeline_show(options, args)
- ads.pipeline.extension.pipeline_status(options, args)