ads.pipeline package
Subpackages
- ads.pipeline.builders package
- ads.pipeline.schema package
- ads.pipeline.visualizer package
- Submodules
- ads.pipeline.visualizer.base module
GraphOrientation
PipelineRenderer
PipelineVisualizer
PipelineVisualizer.pipeline
PipelineVisualizer.pipeline_run
PipelineVisualizer.steps
PipelineVisualizer.deps
PipelineVisualizer.step_status
PipelineVisualizer.render()
PipelineVisualizer.to_svg()
PipelineVisualizer.with_pipeline()
PipelineVisualizer.with_pipeline_run()
PipelineVisualizer.with_renderer()
PipelineVisualizerError
RendererItem
RendererItemStatus
RendererItemStatus.duration
RendererItemStatus.format_datetime()
RendererItemStatus.from_pipeline_run()
RendererItemStatus.from_pipeline_step_run()
RendererItemStatus.key
RendererItemStatus.kind
RendererItemStatus.lifecycle_details
RendererItemStatus.lifecycle_state
RendererItemStatus.name
RendererItemStatus.time_finished
RendererItemStatus.time_started
StepKind
StepStatus
- ads.pipeline.visualizer.graph_renderer module
- ads.pipeline.visualizer.text_renderer module
- Module contents
Submodules
ads.pipeline.ads_pipeline module
- class ads.pipeline.ads_pipeline.DataSciencePipeline(config: dict | None = None, signer: Signer | None = None, client_kwargs: dict | None = None, **kwargs)[source]¶
Bases: OCIDataScienceMixin, Pipeline
Initializes a service/resource with OCI client as a property. If config or signer is specified, it will be used to initialize the OCI client. If neither of them is specified, the client will be initialized with ads.common.auth.default_signer. If both of them are specified, both of them will be passed into the OCI client, and the authentication will be determined by the OCI Python SDK.
- Parameters:
- build_ads_pipeline() Pipeline [source]¶
Builds an ADS pipeline from OCI datascience pipeline.
- Returns:
ADS Pipeline instance.
- Return type:
- build_ads_pipeline_step(step: Dict) PipelineStep [source]¶
Builds an ADS pipeline step from OCI pipeline response.
- create_step_artifact(artifact_path: str, step_name: str) DataSciencePipeline [source]¶
Creates step artifact.
- Parameters:
- Returns:
DataSciencePipeline instance.
- Return type:
- delete(id: str, operation_kwargs: Dict = {'delete_related_job_runs': True, 'delete_related_pipeline_runs': True}, waiter_kwargs: Dict = {'max_wait_seconds': 1800}) DataSciencePipeline [source]¶
Deletes an OCI pipeline.
- Parameters:
id (str) – The ocid of pipeline.
operation_kwargs (dict, optional) – The operational kwargs to be executed when deleting the pipeline. Defaults to {"delete_related_pipeline_runs": True, "delete_related_job_runs": True}, which deletes the corresponding pipeline runs and job runs. The allowed keys are:
  * "delete_related_pipeline_runs": bool, to specify whether to delete related PipelineRuns or not.
  * "delete_related_job_runs": bool, to specify whether to delete related JobRuns or not.
  * "allow_control_chars": bool, to indicate whether or not this request should allow control characters in the response object. By default, the response will not allow control characters in strings.
  * "retry_strategy": obj, to apply to this specific operation/call. This will override any retry strategy set at the client level. This should be one of the strategies available in the retry module. This operation will not retry by default; users can also use the convenient DEFAULT_RETRY_STRATEGY provided by the SDK to enable retries for it. The specifics of the default retry strategy are described in the SDK documentation. To have this operation explicitly not perform any retries, pass an instance of NoneRetryStrategy.
  * "if_match": str, for optimistic concurrency control. In the PUT or DELETE call for a resource, set the if-match parameter to the value of the etag from a previous GET or POST response for that resource. The resource is updated or deleted only if the etag you provide matches the resource's current etag value.
  * "opc_request_id": str, unique Oracle assigned identifier for the request. If you need to contact Oracle about a particular request, then provide the request ID.
waiter_kwargs (dict, optional) – The waiter kwargs to be passed when deleting the pipeline. Defaults to {"max_wait_seconds": 1800}, which allows a maximum wait time of 1800 seconds to delete the pipeline. The allowed keys are:
  * "max_wait_seconds": int, the maximum time to wait, in seconds.
  * "max_interval_seconds": int, the maximum interval between queries, in seconds.
  * "succeed_on_not_found": bool, to determine whether or not the waiter should return successfully if the data we're waiting on is not found (e.g. a 404 is returned from the service). This defaults to False, so a 404 would cause an exception to be thrown by this function. Setting it to True may be useful when waiting for a resource to be terminated/deleted, since it is possible that the resource would not be returned by a GET call anymore.
  * "wait_callback": A function which will be called each time that we have to do an initial wait (i.e. because the property of the resource was not in the correct state, or the evaluate_response function returned False). This function should take two arguments: the first is the number of times we have checked the resource, and the second is the result of the most recent check.
  * "fetch_func": A function to be called to fetch the updated state from the server. This can be used if the call to check for state needs to be more complex than a single GET request. For example, if the goal is to wait until an item appears in a list, fetch_func can be a function that paginates through a full list on the server.
- Returns:
DataSciencePipeline instance.
- Return type:
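The waiter options listed for delete() interact in a way that may be easier to follow in code. The following is a minimal sketch of a polling loop under stated assumptions, not the OCI SDK's implementation: wait_until, is_done, the NotFound exception, the doubling back-off and the default of 30 seconds for max_interval_seconds are all hypothetical choices made for illustration, and the sleep function is injectable so the example finishes instantly.

```python
import time
from typing import Any, Callable, Optional


class NotFound(Exception):
    """Hypothetical stand-in for a 404 response from the service."""


def wait_until(
    fetch_func: Callable[[], Any],
    is_done: Callable[[Any], bool],
    max_wait_seconds: float = 1800,
    max_interval_seconds: float = 30,  # assumed default, for illustration only
    succeed_on_not_found: bool = False,
    wait_callback: Optional[Callable[[int, Any], None]] = None,
    sleep: Callable[[float], None] = time.sleep,
) -> Any:
    """Poll fetch_func until is_done(result) is true or the time budget runs out."""
    waited = 0.0
    interval = 1.0
    checks = 0
    while True:
        try:
            result = fetch_func()
        except NotFound:
            if succeed_on_not_found:
                return None  # the resource is gone, which counts as success
            raise
        checks += 1
        if is_done(result):
            return result
        if waited >= max_wait_seconds:
            raise TimeoutError(f"gave up after {waited:.0f} seconds")
        if wait_callback is not None:
            # First argument: number of checks so far; second: latest result.
            wait_callback(checks, result)
        sleep(interval)
        waited += interval
        # Back off, but never wait longer than max_interval_seconds.
        interval = min(interval * 2, max_interval_seconds)


# Simulated lifecycle states returned by successive checks.
states = iter(["DELETING", "DELETING", "DELETED"])
seen = []
final = wait_until(
    fetch_func=lambda: next(states),
    is_done=lambda state: state == "DELETED",
    wait_callback=lambda count, state: seen.append((count, state)),
    sleep=lambda _seconds: None,  # skip real sleeping in the example
)
```

In this sketch the callback fires only when another wait is needed, so it is not invoked for the final, successful check.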
- classmethod from_ocid(ocid: str) DataSciencePipeline [source]¶
Gets a datascience pipeline by OCID.
- Parameters:
ocid (str) – The OCID of the datascience pipeline.
- Returns:
An instance of DataSciencePipeline.
- Return type:
- run(pipeline_details: Dict, service_logging: OCILog | None = None) PipelineRun [source]¶
Runs an OCI pipeline.
- Parameters:
pipeline_details (dict) – A dictionary that contains pipeline details.
service_logging (OCILog instance.) – The OCILog instance.
- Returns:
PipelineRun instance.
- Return type:
- upload_artifact(step_details: List) DataSciencePipeline [source]¶
Uploads artifacts to pipeline.
- Parameters:
step_details (list) – List of pipeline step details.
- Returns:
DataSciencePipeline instance.
- Return type:
- class ads.pipeline.ads_pipeline.Pipeline(name: str | None = None, spec: Dict | None = None, **kwargs)[source]¶
Bases: Builder
Represents a Data Science Machine Learning Pipeline.
Initialize a pipeline.
- Parameters:
name (str, optional) – The name of the pipeline. Defaults to None. If no name is provided, a random, easy-to-remember name with a timestamp is generated, for example 'strange-spider-2022-08-17-23:55.02'.
spec (dict, optional) – Object specification. Defaults to None.
kwargs (dict) –
Specification as keyword arguments. If spec contains the same key as the one in kwargs, the value from kwargs will be used.
project_id: str
compartment_id: str
display_name: str
description: str
maximum_runtime_in_minutes: int
environment_variables: dict(str, str)
command_line_arguments: str
log_id: str
log_group_id: str
enable_service_log: bool
shape_name: str
block_storage_size_in_gbs: int
shape_config_details: dict
step_details: list[PipelineStep]
dag: list[str]
defined_tags: dict(str, dict(str, object))
freeform_tags: dict[str, str]
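The precedence rule for spec and kwargs (kwargs win when a key appears in both) can be illustrated with plain dictionaries. This is only a sketch of the rule as described, assuming nothing about how Pipeline stores its specification; merge_spec is a hypothetical helper, not part of ADS.

```python
from typing import Any, Dict, Optional


def merge_spec(spec: Optional[Dict[str, Any]] = None, **kwargs: Any) -> Dict[str, Any]:
    """Combine spec and kwargs; kwargs win when a key appears in both."""
    merged = dict(spec or {})  # copy, so the caller's spec is left untouched
    merged.update(kwargs)
    return merged


spec = {"display_name": "from-spec", "project_id": "<project_id>"}
result = merge_spec(spec, display_name="from-kwargs", log_id="<log_id>")
print(result["display_name"])  # from-kwargs
```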
- step_details¶
The step details of pipeline.
- Type:
List[PipelineStep]
- with_step_details(self, step_details: List[PipelineStep]) Pipeline [source]¶
Sets the step details of pipeline.
- with_compartment_id(self, compartment_id: str) Pipeline [source]¶
Sets the compartment id of pipeline.
- with_environment_variable(self, \*\*kwargs) Pipeline [source]¶
Sets the environment variables of pipeline.
- with_argument(self, \*args, \*\*kwargs) Pipeline [source]¶
Sets the command line arguments of pipeline.
- with_maximum_runtime_in_minutes(self, maximum_runtime_in_minutes: int) Pipeline [source]¶
Sets the maximum runtime in minutes of pipeline.
- with_freeform_tags(self, freeform_tags: Dict) Pipeline [source]¶
Sets the freeform tags of pipeline.
- with_shape_name(self, shape_name: str) Pipeline [source]¶
Sets the shape name of pipeline infrastructure.
- with_block_storage_size_in_gbs(self, block_storage_size_in_gbs: int) Pipeline [source]¶
Sets the block storage size of pipeline infrastructure.
- with_shape_config_details(self, shape_config_details: Dict) Pipeline [source]¶
Sets the shape config details of pipeline infrastructure.
- with_enable_service_log(self, enable_service_log: bool) Pipeline [source]¶
Sets the value to enable the service log of pipeline.
- from_dict(cls, obj_dict: dict):
Initializes the object from a dictionary.
- show(self, rankdir: str = GraphOrientation.TOP_BOTTOM)[source]¶
Render pipeline with step information in a graph.
- to_svg(self, uri: str = None, rankdir: str = GraphOrientation.TOP_BOTTOM, \*\*kwargs) str: [source]¶
Renders pipeline as graph into SVG.
- run(self, display_name: str | None = None, project_id: str | None = None, compartment_id: str | None = None, configuration_override_details: dict | None = None, log_configuration_override_details: dict | None = None, step_override_details: list | None = None, free_form_tags: dict | None = None, defined_tags: dict | None = None, system_tags: dict | None = None) PipelineRun [source]¶
Creates and/or overrides an ADS pipeline run.
- delete(self, delete_related_pipeline_runs: Optional[bool] = True, delete_related_job_runs: Optional[bool] = True, max_wait_seconds: Optional[int] = MAXIMUM_TIMEOUT, \*\*kwargs) Pipeline [source]¶
Deletes an ADS pipeline.
- to_yaml(self, uri=None, \*\*kwargs)¶
Returns Pipeline serialized as a YAML string
- from_yaml(cls, yaml_string=None, uri=None, \*\*kwargs)¶
Creates a Pipeline from the provided YAML string or from a URI location containing a YAML string.
- list(cls, compartment_id: Optional[str] = None, \*\*kwargs) List[Pipeline] [source]¶
List pipelines in a given compartment.
- run_list(self, \*\*kwargs) List[PipelineRun] [source]¶
Gets a list of runs of the pipeline.
Example
Here is an example for creating and running a pipeline using builder:
from ads.pipeline import Pipeline, PipelineStep, CustomScriptStep, ScriptRuntime

# Define an OCI Data Science pipeline
pipeline = (
    Pipeline(name="<pipeline_name>")
    .with_compartment_id("<compartment_id>")
    .with_project_id("<project_id>")
    .with_log_group_id("<log_group_id>")
    .with_log_id("<log_id>")
    .with_description("<description>")
    .with_maximum_runtime_in_minutes(200)
    .with_argument("argument", key="value")
    .with_environment_variable(env="value")
    .with_freeform_tags({"key": "value"})
    .with_step_details([
        (
            PipelineStep(name="PipelineStepOne")
            .with_job_id("<job_id>")
            .with_description("<description>")
        ),
        (
            PipelineStep(name="PipelineStepTwo")
            .with_infrastructure(
                CustomScriptStep()
                .with_shape_name("VM.Standard2.1")
                .with_block_storage_size(50)
            )
            .with_runtime(
                ScriptRuntime()
                .with_source("oci://bucket_name@namespace/path/to/script.py")
                .with_service_conda("tensorflow26_p37_cpu_v2")
                .with_environment_variable(ENV="value")
                .with_argument("argument", key="value")
                .with_maximum_runtime_in_minutes(200)
            )
        ),
    ])
    .with_dag_details(["PipelineStepOne >> PipelineStepTwo"])
)

# Create and run the pipeline
run = pipeline.create().run()

# Stream the pipeline run outputs
run.watch()
See also
https://docs.oracle.com/en-us/iaas/tools/ads-sdk/latest/user_guide/pipeline/index.html
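The dag entry in the example, "PipelineStepOne >> PipelineStepTwo", states that the second step runs after the first. As a rough illustration of that ordering, the sketch below turns such strings into a dependency map. It assumes only simple chains of the form "A >> B >> C"; parse_dag is a hypothetical helper, not the ADS parser, and ADS may accept richer syntax than this handles.

```python
from typing import Dict, List, Set


def parse_dag(dag: List[str]) -> Dict[str, Set[str]]:
    """Map each step name to the set of steps it directly depends on."""
    deps: Dict[str, Set[str]] = {}
    for entry in dag:
        names = [name.strip() for name in entry.split(">>")]
        if any(not name for name in names):
            raise ValueError(f"empty step name in {entry!r}")
        for name in names:
            deps.setdefault(name, set())
        # Each step depends on the step written immediately before it.
        for upstream, downstream in zip(names, names[1:]):
            deps[downstream].add(upstream)
    return deps


deps = parse_dag(["PipelineStepOne >> PipelineStepTwo"])
for step, upstream in deps.items():
    print(step, "depends on", sorted(upstream))
```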
- CONST_BLOCK_STORAGE_SIZE = 'blockStorageSizeInGBs'¶
- CONST_COMMAND_LINE_ARGUMENTS = 'commandLineArguments'¶
- CONST_COMPARTMENT_ID = 'compartmentId'¶
- CONST_CONFIGURATION_DETAILS = 'configurationDetails'¶
- CONST_CONFIGURATION_OVERRIDE_DETAILS = 'configurationOverrideDetails'¶
- CONST_CREATED_BY = 'createdBy'¶
- CONST_DAG = 'dag'¶
- CONST_DEFINED_TAGS = 'definedTags'¶
- CONST_DESCRIPTION = 'description'¶
- CONST_DISPLAY_NAME = 'displayName'¶
- CONST_ENABLE_AUTO_LOG_CREATION = 'enableAutoLogCreation'¶
- CONST_ENABLE_LOGGING = 'enableLogging'¶
- CONST_ENABLE_SERVICE_LOG = 'enableServiceLog'¶
- CONST_ENVIRONMENT_VARIABLES = 'environmentVariables'¶
- CONST_FREEFROM_TAGS = 'freeformTags'¶
- CONST_ID = 'id'¶
- CONST_INFRA_CONFIG_DETAILS = 'infrastructureConfigurationDetails'¶
- CONST_LOG_CONFIGURATION_DETAILS = 'logConfigurationDetails'¶
- CONST_LOG_CONFIGURATION_OVERRIDE_DETAILS = 'logConfigurationOverrideDetails'¶
- CONST_LOG_GROUP_ID = 'logGroupId'¶
- CONST_LOG_ID = 'logId'¶
- CONST_MAXIMUM_RUNTIME_IN_MINUTES = 'maximumRuntimeInMinutes'¶
- CONST_MEMORY_IN_GBS = 'memoryInGBs'¶
- CONST_OCPUS = 'ocpus'¶
- CONST_PIPELINE_ID = 'pipelineId'¶
- CONST_PROJECT_ID = 'projectId'¶
- CONST_SERVICE = 'datascience'¶
- CONST_SERVICE_LOG_CATEGORY = 'pipelinerunlog'¶
- CONST_SERVICE_LOG_ID = 'serviceLogId'¶
- CONST_SHAPE_CONFIG_DETAILS = 'shapeConfigDetails'¶
- CONST_SHAPE_NAME = 'shapeName'¶
- CONST_STEP_DETAILS = 'stepDetails'¶
- CONST_STEP_OVERRIDE_DETAILS = 'stepOverrideDetails'¶
- CONST_SYSTEM_TAGS = 'systemTags'¶
- CONST_TYPE = 'type'¶
- LIFECYCLE_STATE_ACTIVE = 'ACTIVE'¶
- LIFECYCLE_STATE_CREATING = 'CREATING'¶
- LIFECYCLE_STATE_DELETED = 'DELETED'¶
- LIFECYCLE_STATE_DELETING = 'DELETING'¶
- LIFECYCLE_STATE_FAILED = 'FAILED'¶
- property argument: str¶
The command line arguments of the pipeline.
- Returns:
The command line arguments of the pipeline.
- Return type:
- property block_storage_size_in_gbs: int¶
The block storage size of pipeline infrastructure.
- Returns:
The block storage size of the pipeline infrastructure.
- Return type:
- property compartment_id: str¶
The compartment id of the pipeline.
- Returns:
The compartment id of the pipeline.
- Return type:
- create(delete_if_fail: bool = True) Pipeline [source]¶
Creates an ADS pipeline.
- Returns:
The ADS Pipeline instance.
- Return type:
- property created_by: str¶
The id of the user who created the pipeline.
- Returns:
The id of the user who created the pipeline.
- Return type:
- property dag: List[str]¶
The dag details of the pipeline.
- Returns:
The dag details of the pipeline.
- Return type:
- delete(delete_related_pipeline_runs: bool | None = True, delete_related_job_runs: bool | None = True, max_wait_seconds: int | None = 1800, **kwargs) Pipeline [source]¶
Deletes an ADS pipeline.
- Parameters:
delete_related_pipeline_runs (bool, optional) – Specify whether to delete related PipelineRuns or not. Defaults to True.
delete_related_job_runs (bool, optional) – Specify whether to delete related JobRuns or not. Defaults to True.
max_wait_seconds (int, optional) – The maximum time to wait, in seconds. Defaults to 1800.
kwargs (optional) – The kwargs to be executed when deleting the pipeline. The allowed keys are:
  * "allow_control_chars": bool, to indicate whether or not this request should allow control characters in the response object. By default, the response will not allow control characters in strings.
  * "retry_strategy": obj, to apply to this specific operation/call. This will override any retry strategy set at the client level. This should be one of the strategies available in the retry module. This operation will not retry by default; users can also use the convenient DEFAULT_RETRY_STRATEGY provided by the SDK to enable retries for it. The specifics of the default retry strategy are described in the SDK documentation. To have this operation explicitly not perform any retries, pass an instance of NoneRetryStrategy.
  * "if_match": str, for optimistic concurrency control. In the PUT or DELETE call for a resource, set the if-match parameter to the value of the etag from a previous GET or POST response for that resource. The resource is updated or deleted only if the etag you provide matches the resource's current etag value.
  * "opc_request_id": str, unique Oracle assigned identifier for the request. If you need to contact Oracle about a particular request, then provide the request ID.
  * "max_interval_seconds": int, the maximum interval between queries, in seconds.
  * "succeed_on_not_found": bool, to determine whether or not the waiter should return successfully if the data we're waiting on is not found (e.g. a 404 is returned from the service). This defaults to False, so a 404 would cause an exception to be thrown by this function. Setting it to True may be useful when waiting for a resource to be terminated/deleted, since it is possible that the resource would not be returned by a GET call anymore.
  * "wait_callback": A function which will be called each time that we have to do an initial wait (i.e. because the property of the resource was not in the correct state, or the evaluate_response function returned False). This function should take two arguments: the first is the number of times we have checked the resource, and the second is the result of the most recent check.
  * "fetch_func": A function to be called to fetch the updated state from the server. This can be used if the call to check for state needs to be more complex than a single GET request. For example, if the goal is to wait until an item appears in a list, fetch_func can be a function that paginates through a full list on the server.
- Returns:
The ADS Pipeline instance.
- Return type:
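Pipeline.delete() accepts one flat set of keyword arguments, whereas DataSciencePipeline.delete() takes separate operation_kwargs and waiter_kwargs dictionaries. The sketch below shows one way the flat keywords could be sorted into those two groups, using only the key names listed in this section. It is an assumption about the routing, not the ADS source; split_delete_kwargs is a hypothetical helper.

```python
from typing import Any, Dict, Tuple

# Key names taken from the parameter lists in this section.
OPERATION_KEYS = {"allow_control_chars", "retry_strategy", "if_match", "opc_request_id"}
WAITER_KEYS = {"max_interval_seconds", "succeed_on_not_found", "wait_callback", "fetch_func"}


def split_delete_kwargs(
    delete_related_pipeline_runs: bool = True,
    delete_related_job_runs: bool = True,
    max_wait_seconds: int = 1800,
    **kwargs: Any,
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Return (operation_kwargs, waiter_kwargs) built from flat arguments."""
    unknown = set(kwargs) - OPERATION_KEYS - WAITER_KEYS
    if unknown:
        raise ValueError(f"unsupported keys: {sorted(unknown)}")
    operation_kwargs: Dict[str, Any] = {
        "delete_related_pipeline_runs": delete_related_pipeline_runs,
        "delete_related_job_runs": delete_related_job_runs,
    }
    waiter_kwargs: Dict[str, Any] = {"max_wait_seconds": max_wait_seconds}
    for key, value in kwargs.items():
        target = operation_kwargs if key in OPERATION_KEYS else waiter_kwargs
        target[key] = value
    return operation_kwargs, waiter_kwargs


operation_kwargs, waiter_kwargs = split_delete_kwargs(
    delete_related_job_runs=False,
    max_wait_seconds=600,
    if_match="<etag>",
    succeed_on_not_found=True,
)
```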
- property description: str¶
The description of pipeline.
- Returns:
The description of pipeline.
- Return type:
- download(to_dir: str, override_if_exists: bool | None = False) Pipeline [source]¶
Downloads artifacts from pipeline.
- property enable_service_log: bool¶
Enables service log of pipeline.
- Returns:
The bool value to enable service log of pipeline.
- Return type:
- property environment_variable: dict¶
The environment variables of the pipeline.
- Returns:
The environment variables of the pipeline.
- Return type:
- init(**kwargs) Pipeline [source]¶
Initializes a starter specification for the Pipeline.
- Returns:
The Pipeline instance (self)
- Return type:
- classmethod list(compartment_id: str | None = None, **kwargs) List[Pipeline] [source]¶
List pipelines in a given compartment.
- Parameters:
compartment_id ((str, optional). Defaults to None.) – The OCID of compartment. If None, the value will be taken from the environment variables.
kwargs – Additional keyword arguments for filtering pipelines:
  * project_id: str
  * lifecycle_state: str. Allowed values: "CREATING", "ACTIVE", "DELETING", "FAILED", "DELETED"
  * created_by: str
  * limit: int
- Returns:
The list of pipelines.
- Return type:
List[Pipeline]
- property log_group_id: str¶
The log group id of the pipeline.
- Returns:
The log group id of the pipeline.
- Return type:
- property maximum_runtime_in_minutes: int¶
The maximum runtime in minutes of the pipeline.
- Returns:
The maximum runtime minutes of the pipeline.
- Return type:
- property project_id: str¶
The project id of the pipeline.
- Returns:
The project id of the pipeline.
- Return type:
- run(display_name: str | None = None, project_id: str | None = None, compartment_id: str | None = None, configuration_override_details: dict | None = None, log_configuration_override_details: dict | None = None, step_override_details: list | None = None, free_form_tags: dict | None = None, defined_tags: dict | None = None, system_tags: dict | None = None) PipelineRun [source]¶
Creates an ADS pipeline run.
- Parameters:
display_name (str, optional) – The display name to override the one defined previously. Defaults to None.
project_id (str, optional) – The project id to override the one defined previously. Defaults to None.
compartment_id (str, optional) – The compartment id to override the one defined previously. Defaults to None.
configuration_override_details (dict, optional) – The configuration details dictionary to override the one defined previously. Defaults to None. The configuration_override_details contains the following keys:
  * "type": str, only "DEFAULT" is allowed.
  * "environment_variables": dict, optional, the environment variables
  * "command_line_arguments": str, optional, the command line arguments
  * "maximum_runtime_in_minutes": int, optional, the maximum runtime allowed in minutes
log_configuration_override_details (dict(str, str), optional) – The log configuration details dictionary to override the one defined previously. Defaults to None. The log_configuration_override_details contains the following keys:
  * "log_group_id": str, optional, the log group id
  * "log_id": str, optional, the log id
step_override_details (list[PipelineStepOverrideDetails], optional) – The step details list to override the one defined previously. Defaults to None. The PipelineStepOverrideDetails is a dict which contains the following keys:
  * step_name: str, the name of step to override
  * step_configuration_details: dict, which contains:
    * "maximum_runtime_in_minutes": int, optional
    * "environment_variables": dict, optional
    * "command_line_arguments": str, optional
free_form_tags (dict(str, str), optional) – The freeform tags dictionary to override the one defined previously. Defaults to None.
defined_tags (dict(str, dict(str, object)), optional) – The defined tags dictionary to override the one defined previously. Defaults to None.
system_tags (dict(str, dict(str, object)), optional) – The system tags dictionary to override the one defined previously. Defaults to None.
Example
# Creates a pipeline run using pipeline configurations
pipeline.run()

# Creates a pipeline run by overriding pipeline configurations
pipeline.run(
    display_name="OverrideDisplayName",
    configuration_override_details={
        "maximum_runtime_in_minutes": 30,
        "type": "DEFAULT",
        "environment_variables": {
            "key": "value"
        },
        "command_line_arguments": "ARGUMENT --KEY VALUE",
    },
    log_configuration_override_details={
        "log_group_id": "<log_group_id>"
    },
    step_override_details=[{
        "step_name": "<step_name>",
        "step_configuration_details": {
            "maximum_runtime_in_minutes": 200,
            "environment_variables": {
                "1": "2"
            },
            "command_line_arguments": "argument --key value",
        }
    }]
)
- Returns:
The ADS PipelineRun instance.
- Return type:
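Because the override dictionaries passed to run() are free-form, it can help to check them before submitting a run. The sketch below validates a configuration_override_details dictionary against the keys documented for run(). It is an illustration only: check_configuration_override is a hypothetical helper, and treating "type" as required is an assumption of this sketch, since the parameter description only says that "DEFAULT" is the one allowed value.

```python
from typing import Any, Dict

# Keys documented for configuration_override_details.
ALLOWED_CONFIG_KEYS = {
    "type",
    "environment_variables",
    "command_line_arguments",
    "maximum_runtime_in_minutes",
}


def check_configuration_override(details: Dict[str, Any]) -> Dict[str, Any]:
    """Return details unchanged if it only uses documented keys and values."""
    unknown = set(details) - ALLOWED_CONFIG_KEYS
    if unknown:
        raise ValueError(f"unsupported keys: {sorted(unknown)}")
    # Assumption of this sketch: "type" must be present and equal to "DEFAULT".
    if details.get("type") != "DEFAULT":
        raise ValueError('"type" must be "DEFAULT"')
    return details


override = check_configuration_override(
    {
        "type": "DEFAULT",
        "maximum_runtime_in_minutes": 30,
        "environment_variables": {"key": "value"},
        "command_line_arguments": "ARGUMENT --KEY VALUE",
    }
)
```

The checked dictionary could then be passed as configuration_override_details to run().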
- run_list(**kwargs) List[PipelineRun] [source]¶
Gets a list of runs of the pipeline.
- Returns:
A list of pipeline run instances.
- Return type:
List[PipelineRun]
- property service_log_id: str¶
The service log id of pipeline.
- Returns:
The service log id of pipeline.
- Return type:
- property shape_config_details: dict¶
The shape config details of pipeline infrastructure.
- Returns:
The shape config details of the pipeline infrastructure.
- Return type:
- property shape_name: str¶
The shape name of pipeline infrastructure.
- Returns:
The shape name of the pipeline infrastructure.
- Return type:
- show(rankdir: str = 'TB') None [source]¶
Render pipeline with step information in a graph
- Return type:
None
- property step_details: List[PipelineStep]¶
The step details of the pipeline.
- Returns:
The step details of the pipeline.
- Return type:
- to_dict(**kwargs) dict [source]¶
Serializes the pipeline specifications to a dictionary.
- Returns:
A dictionary containing pipeline specifications.
- Return type:
- to_svg(uri: str | None = None, rankdir: str = 'TB', **kwargs) str [source]¶
Renders pipeline as graph in svg string.
- with_argument(*args, **kwargs) Pipeline [source]¶
Adds command line arguments to the pipeline. Existing arguments will be preserved. This method can be called (chained) multiple times to add various arguments. For example, pipeline.with_argument(key="val").with_argument("path/to/file") will result in: "--key val path/to/file"
- Parameters:
args – Positional arguments. In a single method call, positional arguments are always added before keyword arguments. You can call with_argument() to add positional arguments after keyword arguments.
kwargs – Keyword arguments. To add a keyword argument without value, set the value to None.
- Returns:
The Pipeline instance (self).
- Return type:
- Raises:
ValueError – Keyword arguments with space in a key.
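The ordering rules for with_argument() (positional before keyword within one call, None for a valueless flag, ValueError for a key containing a space) can be sketched with a small stand-alone builder. ArgumentBuilder is a hypothetical class written for this illustration; it mimics the documented behaviour and is not the ADS implementation, and it does no shell quoting.

```python
from typing import Any, List


class ArgumentBuilder:
    """Collects command line arguments in the order they are added."""

    def __init__(self) -> None:
        self._parts: List[str] = []

    def with_argument(self, *args: Any, **kwargs: Any) -> "ArgumentBuilder":
        for key in kwargs:
            if " " in key:
                raise ValueError(f"argument key {key!r} cannot contain a space")
        # Within a single call, positional arguments come first.
        self._parts.extend(str(arg) for arg in args)
        for key, value in kwargs.items():
            self._parts.append(f"--{key}")
            if value is not None:  # None means a flag without a value
                self._parts.append(str(value))
        return self

    @property
    def argument(self) -> str:
        return " ".join(self._parts)


builder = ArgumentBuilder().with_argument(key="val").with_argument("path/to/file")
print(builder.argument)  # --key val path/to/file
```

Chaining two calls, as above, is how a positional argument ends up after a keyword argument.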
- with_block_storage_size_in_gbs(block_storage_size_in_gbs: int) Pipeline [source]¶
Sets the block storage size of pipeline infrastructure.
- with_compartment_id(compartment_id: str) Pipeline [source]¶
Sets the compartment id of the pipeline.
- with_enable_service_log(enable_service_log: bool) Pipeline [source]¶
Sets the bool value to enable the service log of pipeline.
- with_environment_variable(**kwargs) Pipeline [source]¶
Sets environment variables of the pipeline.
- Parameters:
kwargs – Keyword arguments. To add a keyword argument without value, set the value to None.
- Returns:
The Pipeline instance (self).
- Return type:
- with_maximum_runtime_in_minutes(maximum_runtime_in_minutes: int) Pipeline [source]¶
Sets the maximum runtime in minutes of the pipeline.
- with_shape_config_details(memory_in_gbs: float, ocpus: float, **kwargs: Dict[str, Any]) Pipeline [source]¶
Sets the shape config details of pipeline infrastructure. Specify only when a flex shape is selected. For example VM.Standard.E3.Flex allows the memory_in_gbs and cpu count to be specified.
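For a flex shape, the two values map naturally onto the 'memoryInGBs' and 'ocpus' keys defined by CONST_MEMORY_IN_GBS and CONST_OCPUS in this module. The sketch below builds such a dictionary. It is an assumption about the resulting layout rather than a copy of what with_shape_config_details() stores, and build_shape_config is a hypothetical helper.

```python
from typing import Any, Dict

# Values match the constants listed for the Pipeline class.
CONST_MEMORY_IN_GBS = "memoryInGBs"
CONST_OCPUS = "ocpus"


def build_shape_config(memory_in_gbs: float, ocpus: float, **kwargs: Any) -> Dict[str, Any]:
    """Return a shape config dictionary for a flex shape."""
    if memory_in_gbs <= 0 or ocpus <= 0:
        raise ValueError("memory_in_gbs and ocpus must be positive")
    config: Dict[str, Any] = {CONST_MEMORY_IN_GBS: memory_in_gbs, CONST_OCPUS: ocpus}
    config.update(kwargs)  # any extra settings are passed through unchanged
    return config


shape_config = build_shape_config(memory_in_gbs=16, ocpus=2)
print(shape_config)
```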
ads.pipeline.ads_pipeline_run module
- class ads.pipeline.ads_pipeline_run.LogType[source]¶
Bases: str
- CUSTOM_LOG = 'custom_log'¶
- SERVICE_LOG = 'service_log'¶
- class ads.pipeline.ads_pipeline_run.PipelineRun(config: dict | None = None, signer: Signer | None = None, client_kwargs: dict | None = None, **kwargs)[source]¶
Bases: OCIDataScienceMixin, PipelineRun, RunInstance
- custom_logging¶
Returns the OCILog object containing the custom logs from the pipeline.
- Type:
- create(self) PipelineRun [source]¶
Creates an OCI pipeline run.
- delete(self, delete_related_job_runs: Optional[bool] = True, max_wait_seconds: Optional[int] = MAXIMUM_TIMEOUT, \*\*kwargs) PipelineRun [source]¶
Deletes an OCI pipeline run.
- cancel(self, maximum_timeout: int = MAXIMUM_TIMEOUT) PipelineRun [source]¶
Cancels an OCI pipeline run.
- watch(self, steps: List[str] = None, interval: float = LOG_INTERVAL, log_type: str = LogType.CUSTOM_LOG, \*args) PipelineRun [source]¶
Watches the pipeline run until it finishes.
- list(cls, pipeline_id: str, compartment_id: Optional[str] = None, \*\*kwargs) List[PipelineRun]: [source]¶
Lists pipeline runs for a given pipeline.
- show(self, mode: str = ShowMode.GRAPH, wait: bool = False, rankdir: str = GraphOrientation.TOP_BOTTOM) None [source]¶
Renders pipeline run. Can be text or graph representation.
- to_svg(self, uri: str = None, rankdir: str = GraphOrientation.TOP_BOTTOM, \*\*kwargs)[source]¶
Renders pipeline run graph to SVG.
Initializes a service/resource with OCI client as a property. If config or signer is specified, it will be used to initialize the OCI client. If neither of them is specified, the client will be initialized with ads.common.auth.default_signer. If both of them are specified, both of them will be passed into the OCI client, and the authentication will be determined by the OCI Python SDK.
- Parameters:
- cancel(maximum_timeout: int = 1800) PipelineRun [source]¶
Cancels an OCI pipeline run.
- Parameters:
maximum_timeout (int, optional) – The maximum timeout to cancel the pipeline run. Defaults to 1800 seconds.
- Returns:
Pipeline run instance (self).
- Return type:
- create() PipelineRun [source]¶
Creates an OCI pipeline run.
- Returns:
Pipeline run instance (self).
- Return type:
- property custom_logging: OCILog¶
The OCILog object containing the custom logs from the pipeline run.
- delete(delete_related_job_runs: bool | None = True, max_wait_seconds: int | None = 1800, **kwargs) PipelineRun [source]¶
Deletes an OCI pipeline run.
- Parameters:
delete_related_job_runs (bool, optional) – Specify whether to delete related JobRuns or not. Defaults to True.
max_wait_seconds (int, optional) – The maximum time to wait, in seconds. Defaults to 1800.
kwargs (optional) – The kwargs to be executed when deleting the pipeline run. The allowed keys are:
  * "allow_control_chars": bool, to indicate whether or not this request should allow control characters in the response object. By default, the response will not allow control characters in strings.
  * "retry_strategy": obj, to apply to this specific operation/call. This will override any retry strategy set at the client level. This should be one of the strategies available in the retry module. This operation will not retry by default; users can also use the convenient DEFAULT_RETRY_STRATEGY provided by the SDK to enable retries for it. The specifics of the default retry strategy are described in the SDK documentation. To have this operation explicitly not perform any retries, pass an instance of NoneRetryStrategy.
  * "if_match": str, for optimistic concurrency control. In the PUT or DELETE call for a resource, set the if-match parameter to the value of the etag from a previous GET or POST response for that resource. The resource is updated or deleted only if the etag you provide matches the resource's current etag value.
  * "opc_request_id": str, unique Oracle assigned identifier for the request. If you need to contact Oracle about a particular request, then provide the request ID.
  * "max_interval_seconds": int, the maximum interval between queries, in seconds.
  * "wait_callback": A function which will be called each time that we have to do an initial wait (i.e. because the property of the resource was not in the correct state, or the evaluate_response function returned False). This function should take two arguments: the first is the number of times we have checked the resource, and the second is the result of the most recent check.
  * "fetch_func": A function to be called to fetch the updated state from the server. This can be used if the call to check for state needs to be more complex than a single GET request. For example, if the goal is to wait until an item appears in a list, fetch_func can be a function that paginates through a full list on the server.
- Returns:
Pipeline run instance (self).
- Return type:
- classmethod list(pipeline_id: str, compartment_id: str | None = None, **kwargs) List[PipelineRun] [source]¶
List pipeline runs for a given pipeline.
- Parameters:
pipeline_id (str.) – The OCID of pipeline.
compartment_id ((str, optional). Defaults to None.) – The OCID of compartment. If None, the value will be taken from the environment variables.
kwargs – Additional keyword arguments for filtering pipeline runs:
  * lifecycle_state: str. Allowed values: "CREATING", "ACTIVE", "DELETING", "FAILED", "DELETED"
  * created_by: str
  * limit: int
- Returns:
The list of pipeline runs.
- Return type:
List[PipelineRun]
- logs(log_type: str | None = None) ConsolidatedLog [source]¶
Builds the consolidated log for pipeline run.
- Parameters:
log_type (str) – The log type of the pipeline run. Defaults to None. Can be custom_log, service_log or None.
- Returns:
The ConsolidatedLog instance.
- Return type:
- property pipeline¶
Returns the ADS Pipeline instance. Step details will be synched with the Pipeline Run.
- Parameters:
None
- Returns:
The ADS Pipeline instance, where Step details will be synched with the Pipeline Run.
- Return type:
- property service_logging: OCILog¶
The OCILog object containing the service logs from the pipeline run.
- show(mode: str = 'graph', wait: bool = False, rankdir: str = 'TB') None [source]¶
Renders pipeline run. Can be text or graph representation.
- Parameters:
mode ((str, optional). Defaults to graph.) – Pipeline run display mode. Allowed values: graph or text.
wait ((bool, optional). Defaults to False.) – Whether to wait until the completion of the pipeline run.
rankdir ((str, optional). Defaults to TB.) – Direction of the rendered graph. Allowed values: TB or LR. Applicable only for graph mode.
- Return type:
None
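The allowed values for mode and rankdir can be checked before rendering. The following is a minimal sketch under stated assumptions: `check_show_options` is a hypothetical helper, not part of ADS, and raising ValueError on an unknown value is an assumption about how a caller might choose to fail.

```python
ALLOWED_MODES = ("graph", "text")
ALLOWED_RANKDIRS = ("TB", "LR")


def check_show_options(mode: str = "graph", rankdir: str = "TB") -> dict:
    """Validate display options before rendering (hypothetical helper)."""
    if mode not in ALLOWED_MODES:
        raise ValueError(f"mode must be one of {ALLOWED_MODES}, got {mode!r}")
    if rankdir not in ALLOWED_RANKDIRS:
        raise ValueError(f"rankdir must be one of {ALLOWED_RANKDIRS}, got {rankdir!r}")
    # rankdir applies only to graph mode, so it is dropped for text mode.
    options = {"mode": mode}
    if mode == "graph":
        options["rankdir"] = rankdir
    return options
```

The returned dictionary could then be passed on as keyword arguments, for example `run.show(**check_show_options("graph", "LR"))`, where `run` stands for an existing pipeline run object.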
- to_svg(uri: str | None = None, rankdir: str = 'TB', **kwargs) str [source]¶
Renders pipeline run graph to SVG.
- Parameters:
uri ((string, optional). Defaults to None.) – URI location to save the SVG string.
rankdir ((str, optional). Defaults to TB.) – Direction of the rendered graph. Allowed values: TB or LR.
- Returns:
Pipeline run graph in svg format.
- Return type:
- to_yaml() str [source]¶
Serializes the object into YAML string.
- Returns:
YAML stored in a string.
- Return type:
- watch(steps: List[str] | None = None, interval: float = 3, log_type: str | None = None, *args) PipelineRun [source]¶
Watches the pipeline run until it finishes. This method keeps streaming the service log of the pipeline run until the run has succeeded, failed, or been cancelled.
- Parameters:
steps ((List[str], optional). Defaults to None.) – Pipeline steps passed in to filter the logs.
interval ((float, optional). Defaults to 3 seconds.) – Time interval in seconds between each request to update the logs.
log_type ((str, optional). Defaults to None.) – The log type. Can be custom_log, service_log or None.
*args – Pipeline steps passed in to filter the logs. Example: .watch("step1", "step2")
Examples
>>> .watch()
>>> .watch(log_type="service_log")
>>> .watch("step1", "step2", log_type="custom_log", interval=3)
>>> .watch(steps=["step1", "step2"], log_type="custom_log", interval=3)
- Returns:
Pipeline run instance (self).
- Return type:
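The two ways of naming steps and the polling interval can be pictured with a small stand-in. This is a sketch, not the ADS implementation: `merge_step_filters` and `watch_states` are hypothetical helpers, the state names are assumed, and merging `steps` with the positional names is only one plausible reading of the parameter descriptions above.

```python
from typing import Callable, List, Optional

TERMINAL_STATES = {"SUCCEEDED", "FAILED", "CANCELED"}  # assumed state names


def merge_step_filters(steps: Optional[List[str]] = None, *args: str) -> List[str]:
    """Combine steps=[...] with positional step names, in order, without duplicates."""
    merged: List[str] = []
    for name in list(steps or []) + list(args):
        if name not in merged:
            merged.append(name)
    return merged


def watch_states(
    fetch_state: Callable[[], str],
    interval: float = 3,
    sleep: Callable[[float], None] = lambda seconds: None,
) -> List[str]:
    """Poll fetch_state until a terminal state is seen, sleeping between polls."""
    seen: List[str] = []
    while True:
        state = fetch_state()
        seen.append(state)
        if state in TERMINAL_STATES:
            return seen
        sleep(interval)


# Simulated sequence of lifecycle states returned by successive polls.
states = iter(["ACCEPTED", "IN_PROGRESS", "IN_PROGRESS", "SUCCEEDED"])
history = watch_states(lambda: next(states), interval=3)
```

The default `sleep` does nothing so that the example finishes immediately; passing `time.sleep` would give a real delay of `interval` seconds between polls.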
ads.pipeline.ads_pipeline_step module¶
- class ads.pipeline.ads_pipeline_step.PipelineStep(name: str | None = None, job_id: str | None = None, infrastructure=None, runtime=None, description=None, maximum_runtime_in_minutes=None, environment_variable=None, command_line_argument=None, kind=None)[source]¶
Bases:
Job
Represents the Data Science Machine Learning Pipeline Step.
Initialize a pipeline step.
- Parameters:
name (str, required) – The name of the pipeline step.
job_id (str, optional) – The job id of the pipeline step, by default None.
infrastructure (Infrastructure, optional) – Pipeline step infrastructure, by default None.
runtime (Runtime, optional) – Pipeline step runtime, by default None.
description (str, optional) – The description for pipeline step, by default None.
maximum_runtime_in_minutes (int, optional) – The maximum runtime in minutes for pipeline step, by default None.
environment_variable (dict, optional) – The environment variable for pipeline step, by default None.
command_line_argument (str, optional) – The command line argument for pipeline step, by default None.
kind (str, optional) – The kind of pipeline step.
- infrastructure¶
The infrastructure of pipeline step.
- Type:
- with_job_id(self, job_id: str) PipelineStep [source]¶
Sets the job id for pipeline step.
- with_infrastructure(self, infrastructure) PipelineStep [source]¶
Sets the infrastructure for pipeline step.
- with_runtime(self, runtime) PipelineStep [source]¶
Sets the runtime for pipeline step.
- with_description(self, description: str) PipelineStep [source]¶
Sets the description for pipeline step.
- with_maximum_runtime_in_minutes(self, maximum_runtime_in_minutes: int) PipelineStep [source]¶
Sets the maximum runtime in minutes for pipeline step.
- with_environment_variable(self, **kwargs) PipelineStep [source]¶
Sets the environment variables for pipeline step.
- with_argument(self, *args, **kwargs) PipelineStep [source]¶
Sets the command line arguments for pipeline step.
- with_kind(self, kind: str) PipelineStep [source]¶
Sets the kind for pipeline step.
- from_dict(cls, config: dict) PipelineStep [source]¶
Initializes a PipelineStep from a dictionary containing the configurations.
- to_yaml(self, uri=None, **kwargs)¶
Returns PipelineStep serialized as a YAML string.
- from_yaml(cls, yaml_string=None, uri=None, **kwargs)¶
Creates a PipelineStep from the YAML string provided or from a URI location containing a YAML string.
Example
Here is an example for defining a pipeline step using builder:
    from ads.pipeline import PipelineStep, CustomScriptStep, ScriptRuntime

    # Define an OCI Data Science pipeline step to run a python script
    pipeline_step = (
        PipelineStep(name="<pipeline_step_name>")
        .with_infrastructure(
            CustomScriptStep()
            .with_shape_name("VM.Standard2.1")
            .with_block_storage_size(50)
        )
        .with_runtime(
            ScriptRuntime()
            .with_source("oci://bucket_name@namespace/path/to/script.py")
            .with_service_conda("tensorflow26_p37_cpu_v2")
            .with_environment_variable(ENV="value")
            .with_argument("argument", key="value")
            .with_maximum_runtime_in_minutes(200)
        )
    )

    # Another way to define an OCI Data Science pipeline step from existing job
    pipeline_step = (
        PipelineStep(name="<pipeline_step_name>")
        .with_job_id("<job_id>")
        .with_description("<description>")
    )
See also
https://docs.oracle.com/en-us/iaas/tools/ads-sdk/latest/user_guide/pipeline/index.html
- CONST_COMMAND_LINE_ARGUMENTS = 'commandLineArguments'¶
- CONST_DEPENDS_ON = 'dependsOn'¶
- CONST_DESCRIPTION = 'description'¶
- CONST_ENVIRONMENT_VARIABLES = 'environmentVariables'¶
- CONST_INFRASTRUCTURE = 'infrastructure'¶
- CONST_JOB_ID = 'jobId'¶
- CONST_KIND = 'stepType'¶
- CONST_MAXIMUM_RUNTIME_IN_MINUTES = 'maximumRuntimeInMinutes'¶
- CONST_NAME = 'name'¶
- CONST_RUNTIME = 'runtime'¶
- CONST_STEP_CONFIG_DETAILS = 'stepConfigurationDetails'¶
- CONST_STEP_INFRA_CONFIG_DETAILS = 'stepInfrastructureConfigurationDetails'¶
- property argument: str¶
The command line arguments of the pipeline step.
- Returns:
The command line arguments of the pipeline step.
- Return type:
- property depends_on: list¶
The list of upstream pipeline steps for (self).
- Returns:
The list of upstream pipeline steps for (self).
- Return type:
- property description: str¶
The description of the pipeline step.
- Returns:
The description of the pipeline step.
- Return type:
- property environment_variable: dict¶
The environment variables of the pipeline step.
- Returns:
The environment variables of the pipeline step.
- Return type:
- classmethod from_dict(config: dict) PipelineStep [source]¶
Initializes a PipelineStep from a dictionary containing the configurations.
- Parameters:
config (dict) – A dictionary containing the infrastructure and runtime specifications.
- Returns:
A PipelineStep instance
- Return type:
- Raises:
NotImplementedError – If the type of the infrastructure or runtime is not supported.
- property infrastructure: DataScienceJob¶
The infrastructure of the pipeline step.
- Returns:
Data science pipeline step instance.
- Return type:
- property job_id: str¶
The job id of the pipeline step.
- Returns:
The job id of the pipeline step.
- Return type:
- property kind: str¶
The kind of the object as shown in YAML.
- Returns:
The kind of the object as shown in YAML.
- Return type:
- property maximum_runtime_in_minutes: int¶
The maximum runtime in minutes of pipeline step.
- Returns:
The maximum runtime in minutes of the pipeline step.
- Return type:
- property runtime: Runtime¶
The runtime of the pipeline step.
- Returns:
Runtime instance.
- Return type:
- to_dict() dict [source]¶
Serializes the pipeline step specification to a dictionary.
- Returns:
A dictionary containing pipeline step specification.
- Return type:
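The relationship between to_dict() and from_dict() can be illustrated with a simplified round trip. This is a sketch under stated assumptions: `StepSketch` is a hypothetical stand-in, the key names are copied from the CONST_* attributes listed above, and the flat dictionary layout is a simplification that may differ from what ADS actually emits.

```python
from dataclasses import dataclass
from typing import Optional

# Key names copied from the CONST_* attributes of PipelineStep.
CONST_NAME = "name"
CONST_JOB_ID = "jobId"
CONST_DESCRIPTION = "description"


@dataclass
class StepSketch:
    """Hypothetical stand-in holding a few pipeline step fields."""

    name: str
    job_id: Optional[str] = None
    description: Optional[str] = None

    def to_dict(self) -> dict:
        raw = {
            CONST_NAME: self.name,
            CONST_JOB_ID: self.job_id,
            CONST_DESCRIPTION: self.description,
        }
        # Leave out fields that were never set.
        return {key: value for key, value in raw.items() if value is not None}

    @classmethod
    def from_dict(cls, config: dict) -> "StepSketch":
        return cls(
            name=config[CONST_NAME],
            job_id=config.get(CONST_JOB_ID),
            description=config.get(CONST_DESCRIPTION),
        )


step = StepSketch(name="train", job_id="<job_id>")
restored = StepSketch.from_dict(step.to_dict())
```

Serializing and then deserializing yields an equal object, which is the property a to_dict/from_dict pair is usually expected to have.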
- with_argument(*args, **kwargs) PipelineStep [source]¶
Adds command line arguments to the pipeline step. Existing arguments will be preserved. This method can be called (chained) multiple times to add various arguments. For example, pipeline.with_argument(key="val").with_argument("path/to/file") will result in: "--key val path/to/file"
- Parameters:
args – Positional arguments. In a single method call, positional arguments are always added before keyword arguments. To add positional arguments after keyword arguments, call with_argument() again.
kwargs – Keyword arguments. To add a keyword argument without value, set the value to None.
- Returns:
The Pipeline step instance (self).
- Return type:
- Raises:
ValueError – If a keyword argument has a space in its key.
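The ordering rules above can be reproduced in a few lines. The class below is a hypothetical stand-in that mimics the documented behaviour, not the ADS source: the name `ArgumentSketch` is invented for illustration, and rendering keyword arguments with a `--` prefix is assumed from the example output shown for this method.

```python
from typing import List, Optional


class ArgumentSketch:
    """Hypothetical stand-in that mimics the documented with_argument() rules."""

    def __init__(self) -> None:
        self._args: List[str] = []

    def with_argument(self, *args: object, **kwargs: Optional[object]) -> "ArgumentSketch":
        # Reject bad keys first so a failed call leaves existing arguments untouched.
        for key in kwargs:
            if " " in key:
                raise ValueError(f"Argument key {key!r} cannot contain a space.")
        # Positional arguments go before keyword arguments within one call.
        self._args.extend(str(arg) for arg in args)
        for key, value in kwargs.items():
            self._args.append(f"--{key}")
            if value is not None:  # None means a key without a value
                self._args.append(str(value))
        return self  # returning self is what makes chaining possible

    @property
    def argument(self) -> str:
        return " ".join(self._args)


chained = ArgumentSketch().with_argument(key="val").with_argument("path/to/file")
```

Because each call appends to the existing list, placing a positional argument after a keyword argument takes two chained calls, as in the `chained` example.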
- with_description(description: str) PipelineStep [source]¶
Sets the description for pipeline step.
- Parameters:
description (str) – The description of pipeline step.
- Return type:
Pipeline step instance (self).
- with_environment_variable(**kwargs) PipelineStep [source]¶
Sets environment variables of the pipeline step.
- Parameters:
kwargs – Keyword arguments. To add a keyword argument without value, set the value to None.
- Returns:
The Pipeline step instance (self).
- Return type:
- with_infrastructure(infrastructure) PipelineStep [source]¶
Sets the infrastructure for pipeline step.
- Parameters:
infrastructure – The infrastructure of pipeline step.
- Return type:
Pipeline step instance (self).
- with_job_id(job_id: str) PipelineStep [source]¶
Sets the job id for pipeline step.
- Parameters:
job_id (str) – The job id of pipeline step.
- Return type:
Pipeline step instance (self).
- with_kind(kind: str) PipelineStep [source]¶
Sets the kind of pipeline step.
- Parameters:
kind (str) – The kind of pipeline step.
- Return type:
Pipeline step instance (self).
- with_maximum_runtime_in_minutes(maximum_runtime_in_minutes: int) PipelineStep [source]¶
Sets the maximum runtime in minutes of pipeline step.
- Parameters:
maximum_runtime_in_minutes (int) – The maximum runtime in minutes of pipeline step.
- Return type:
Pipeline step instance (self).
- with_runtime(runtime) PipelineStep [source]¶
Sets the runtime for pipeline step.
- Parameters:
runtime – The runtime of pipeline step.
- Return type:
Pipeline step instance (self).