#!/usr/bin/env python
# -*- coding: utf-8; -*-
# Copyright (c) 2021, 2023 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
from __future__ import annotations
import os
from typing import Dict
from ads.common.auth import default_signer
from ads.jobs.builders.runtimes.base import Runtime
from ads.opctl.config.utils import convert_notebook
class CondaRuntime(Runtime):
"""Represents a job runtime with conda pack
This is the base class for Runtime using conda environment.
The ``CondaRuntime`` is not designed to be used directly when creating a job.
"""
CONST_CONDA = "conda"
CONST_CONDA_TYPE = "type"
CONST_CONDA_TYPE_SERVICE = "service"
CONST_CONDA_TYPE_CUSTOM = "published"
CONST_CONDA_SLUG = "slug"
CONST_CONDA_URI = "uri"
CONST_CONDA_REGION = "region"
attribute_map = {CONST_CONDA: CONST_CONDA}
attribute_map.update(Runtime.attribute_map)
@property
def conda(self) -> dict:
"""The conda environment specification.
For a service conda environment, the specification contains:
* ``type``, the type of the conda environment.
This is always ``service`` for a service conda environment.
* ``slug``, the slug of the conda environment.
For a custom conda environment, the specification contains:
* ``type``, the type of the conda environment.
This is always ``published`` for a custom conda environment.
* ``uri``, the URI of the conda environment, e.g. oci://bucket@namespace/prefix/to/conda
* ``region``, the region of the bucket in which the conda environment is stored.
By default, ADS will determine the region based on the authenticated API key or resource principal.
This is only needed if your conda environment is stored in a different region.
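For example, a service conda specification may look like
``{"type": "service", "slug": "generalml_p38_cpu_v1"}`` (the slug shown is illustrative),
while a custom one may look like
``{"type": "published", "uri": "oci://bucket@namespace/conda/my_pack"}``.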
Returns
-------
dict
A dictionary containing the conda environment specifications.
"""
return self.get_spec(self.CONST_CONDA)
def with_service_conda(self, slug: str):
"""Specifies the service conda pack for running the job
Parameters
----------
slug : str
The slug name of the service conda pack
Returns
-------
self
The runtime instance.
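Example, where the slug below is an illustrative service pack name::

    runtime = ScriptRuntime().with_service_conda("generalml_p38_cpu_v1")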
"""
return self.set_spec(
self.CONST_CONDA,
{
self.CONST_CONDA_TYPE: self.CONST_CONDA_TYPE_SERVICE,
self.CONST_CONDA_SLUG: slug,
},
)
def with_custom_conda(self, uri: str, region: str = None):
"""Specifies the custom conda pack for running the job
Make sure you have configured the IAM policy for the job run to access the conda environment.
Parameters
----------
uri : str
The OCI object storage URI for the conda pack,
e.g. "oci://your_bucket@namespace/object_name."
In the Environment Explorer of an OCI notebook session,
this is shown as the "source" of the conda pack.
region: str, optional
The region of the bucket storing the custom conda pack, by default None.
If region is not specified, ADS will use the region from your authentication credentials:
* For API Key, config["region"] is used.
* For Resource Principal, signer.region is used.
This is required if the conda pack is stored in a different region.
Returns
-------
self
The runtime instance.
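Example, with an illustrative bucket, namespace and pack path::

    runtime = ScriptRuntime().with_custom_conda(
        "oci://my_bucket@my_namespace/conda_environments/my_pack"
    )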
See Also
--------
https://docs.oracle.com/en-us/iaas/data-science/using/conda_publishs_object.htm
"""
conda_spec = {
self.CONST_CONDA_TYPE: self.CONST_CONDA_TYPE_CUSTOM,
self.CONST_CONDA_URI: uri,
}
if region:
conda_spec[self.CONST_CONDA_REGION] = region
return self.set_spec(self.CONST_CONDA, conda_spec)
def init(self, **kwargs) -> "CondaRuntime":
"""Initializes a starter specification for the runtime.
Parameters
----------
**kwargs: Dict
- conda_slug: str
The conda environment slug.
If it contains '/', it is assumed to be a custom conda environment.
Returns
-------
CondaRuntime
The runtime instance.
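Example (the slug and URI below are illustrative)::

    runtime.init(conda_slug="pytorch110_p38_cpu_v1")  # service conda environment
    runtime.init(conda_slug="oci://bucket@namespace/path/to/pack")  # custom conda environment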
"""
super().init(**kwargs)
conda_slug = kwargs.get("conda_slug", "")
if "/" not in conda_slug:
return self.with_service_conda(conda_slug)
return self.with_custom_conda(
conda_slug
or "{Path to the custom conda environment. Example: oci://bucket@namespace/prefix}"
)
class ScriptRuntime(CondaRuntime):
"""Represents job runtime with scripts and conda pack.
This runtime is designed to define job artifacts and configurations supported by OCI Data Science Jobs natively.
It can be used with any script type supported by OCI Data Science Jobs,
including shell scripts and Python scripts.
To run a script with all dependencies contained in a local folder::
runtime = (
ScriptRuntime()
# Specify the service conda environment by slug name.
.with_service_conda("pytorch110_p38_cpu_v1")
# The job artifact can be a single Python script, a directory or a zip file.
.with_source("local/path/to/code_dir")
# Environment variable
.with_environment_variable(NAME="Welcome to OCI Data Science.")
# Command line argument
.with_argument("100 linux 'hi there'")
# The entrypoint is applicable only to directory or zip file as source
# The entrypoint should be a path relative to the working dir.
# Here my_script.sh is a file in the code_dir/my_package directory
.with_entrypoint("my_package/my_script.sh")
)
References
----------
https://docs.oracle.com/en-us/iaas/data-science/using/jobs-artifact.htm
"""
CONST_ENTRYPOINT = "entrypoint"
CONST_SCRIPT_PATH = "scriptPathURI"
attribute_map = {
CONST_ENTRYPOINT: CONST_ENTRYPOINT,
CONST_SCRIPT_PATH: "script_path_uri",
}
attribute_map.update(CondaRuntime.attribute_map)
@property
def script_uri(self) -> str:
"""The URI of the source code"""
return self.get_spec(self.CONST_SCRIPT_PATH)
def with_script(self, uri: str):
"""Specifies the source code script for the job
Parameters
----------
uri : str
URI to the source code script, which can be any URI supported by fsspec,
including http://, https:// and OCI object storage.
For example: oci://your_bucket@your_namespace/path/to/script.py
Returns
-------
self
The runtime instance.
"""
return self.set_spec(self.CONST_SCRIPT_PATH, uri)
@property
def source_uri(self) -> str:
"""The URI of the source code"""
return self.get_spec(self.CONST_SCRIPT_PATH)
def with_source(self, uri: str, entrypoint: str = None):
"""Specifies the source code for the job
Parameters
----------
uri : str
URI to the source code,
which can be a (.py/.sh) script, a zip/tar file or directory containing the scripts/modules
If the source code is a single file, URI can be any URI supported by fsspec,
including http://, https:// and OCI object storage.
For example: oci://your_bucket@your_namespace/path/to/script.py
URI can also be a folder or a zip file containing the source code.
In that case, entrypoint is required.
entrypoint : str, optional
The relative path of the script to be set as entrypoint when source is a zip/tar/directory.
By default None. This is not needed when the source is a single script.
Returns
-------
self
The runtime instance.
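Example, assuming "job_scripts" is a local directory containing "main.sh"::

    runtime = ScriptRuntime().with_source("local/path/to/job_scripts", entrypoint="main.sh")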
"""
if entrypoint:
self.set_spec(self.CONST_ENTRYPOINT, entrypoint)
return self.with_script(uri)
@property
def entrypoint(self) -> str:
"""The relative path of the script to be set as entrypoint when source is a zip/tar/directory."""
return self.get_spec(self.CONST_ENTRYPOINT)
def with_entrypoint(self, entrypoint: str):
"""Specify the entrypoint for the job
Parameters
----------
entrypoint : str
The relative path of the script to be set as entrypoint when source is a zip/tar/directory.
Returns
-------
self
The runtime instance.
"""
return self.set_spec(self.CONST_ENTRYPOINT, entrypoint)
def init(self, **kwargs) -> "ScriptRuntime":
"""Initializes a starter specification for the runtime.
Returns
-------
ScriptRuntime
The runtime instance.
"""
super().init(**kwargs)
return (
self.with_entrypoint("{For MLflow and Operator will be auto generated}")
.with_script(
"{Path to the script. For MLflow and Operator will be auto generated}"
)
.with_argument(**kwargs.get("args", {}))
)
class _PythonRuntimeMixin(Runtime):
CONST_OUTPUT_DIR = "outputDir"
CONST_OUTPUT_URI = "outputUri"
CONST_PYTHON_PATH = "pythonPath"
CONST_ENTRYPOINT = "entrypoint"
CONST_ENTRY_FUNCTION = "entryFunction"
CONST_WORKING_DIR = "workingDir"
attribute_map = {
CONST_OUTPUT_DIR: "output_dir",
CONST_OUTPUT_URI: "output_uri",
CONST_PYTHON_PATH: "python_path",
CONST_ENTRYPOINT: CONST_ENTRYPOINT,
CONST_ENTRY_FUNCTION: "entry_function",
CONST_WORKING_DIR: "working_dir",
}
attribute_map.update(Runtime.attribute_map)
def with_output(self, output_dir: str, output_uri: str):
"""Specifies the outputs of the job.
The output files in output_dir will be copied to remote output_uri when the job is finished.
Parameters
----------
output_dir : str
Path to the output directory in the job run.
This path should be a relative path from the working directory.
The source code should write all outputs into this directory.
output_uri : str
The OCI object storage URI prefix for saving the output files.
For example, oci://bucket_name@namespace/path/to/directory
Returns
-------
Self
The runtime instance.
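Example, assuming the source code writes its results into a local "output" folder
(the bucket and namespace below are illustrative)::

    runtime.with_output("output", "oci://bucket_name@namespace/path/to/dir")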
"""
self.set_spec(self.CONST_OUTPUT_DIR, output_dir)
self.set_spec(self.CONST_OUTPUT_URI, output_uri)
return self
def with_python_path(self, *python_paths):
"""Specifies additional python paths for running the source code.
Parameters
----------
*python_paths :
Additional python path(s) for running the source code.
Each path should be a relative path from the working directory.
Returns
-------
self
The runtime instance.
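Example, assuming "src" and "lib" are folders in the job artifact directory::

    runtime.with_python_path("src", "lib")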
"""
python_paths = list(python_paths)
for path in python_paths:
if os.path.isabs(path):
raise ValueError(
f"{path} is an absolute path."
"Please specify relative path from the working directory as python path."
)
return self.set_spec(self.CONST_PYTHON_PATH, python_paths)
def with_entrypoint(self, path: str, func: str = None):
"""Specifies the entrypoint for the job.
The entrypoint can be a script or a function in a script.
Parameters
----------
path : str
The relative path for the script/module starting the job.
func : str, optional
The function name in the script for starting the job, by default None.
If this is not specified, the script will be run with python command in a subprocess.
Returns
-------
self
The runtime instance.
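Example, assuming "my_module.py" defines a "main" function::

    runtime.with_entrypoint("my_module.py", func="main")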
"""
self.set_spec(self.CONST_ENTRYPOINT, path)
self.set_spec(self.CONST_ENTRY_FUNCTION, func)
return self
def with_working_dir(self, working_dir: str):
"""Specifies the working directory in the job run.
By default, the working directory will be the directory containing the user code (the job artifact directory).
This can be changed by specifying a relative path to the job artifact directory.
Parameters
----------
working_dir : str
The path of the working directory.
This can be a relative path from the job artifact directory.
Returns
-------
self
The runtime instance.
"""
return self.set_spec(self.CONST_WORKING_DIR, working_dir)
@property
def working_dir(self) -> str:
"""The working directory for the job run."""
return self.get_spec(self.CONST_WORKING_DIR, ".")
@property
def output_dir(self) -> str:
"""Directory in the Job run container for saving output files generated in the job"""
return self.get_spec(self.CONST_OUTPUT_DIR)
@property
def output_uri(self) -> str:
"""OCI object storage URI prefix for saving output files generated in the job"""
return self.get_spec(self.CONST_OUTPUT_URI)
@property
def python_path(self):
"""Additional python paths for running the source code."""
return self.get_spec(self.CONST_PYTHON_PATH)
@property
def entry_script(self) -> str:
"""The path of the entry script"""
return self.get_spec(self.CONST_ENTRYPOINT)
@property
def entry_function(self) -> str:
"""The name of the entry function in the entry script"""
return self.get_spec(self.CONST_ENTRY_FUNCTION)
class PythonRuntime(ScriptRuntime, _PythonRuntimeMixin):
"""Represents a job runtime using ADS driver script to run Python code
Example::
runtime = (
PythonRuntime()
# Specify the service conda environment by slug name.
.with_service_conda("pytorch110_p38_cpu_v1")
# The job artifact can be a single Python script, a directory or a zip file.
.with_source("local/path/to/code_dir")
# Environment variable
.with_environment_variable(NAME="Welcome to OCI Data Science.")
# Command line argument, arg1 --key arg2
.with_argument("arg1", key="arg2")
# Set the working directory
# When using a directory as source, the default working dir is the parent of code_dir.
# Working dir should be a relative path beginning from the source directory (code_dir)
.with_working_dir("code_dir")
# The entrypoint is applicable only to directory or zip file as source
# The entrypoint should be a path relative to the working dir.
# Here my_script.py is a file in the code_dir/my_package directory
.with_entrypoint("my_package/my_script.py")
# Add an additional Python path, relative to the working dir (code_dir/other_packages).
.with_python_path("other_packages")
# Copy files in "code_dir/output" to object storage after job finishes.
.with_output("output", "oci://bucket_name@namespace/path/to/dir")
)
"""
attribute_map = {}
attribute_map.update(ScriptRuntime.attribute_map)
attribute_map.update(_PythonRuntimeMixin.attribute_map)
def init(self, **kwargs) -> "PythonRuntime":
"""Initializes a starter specification for the runtime.
Returns
-------
PythonRuntime
The runtime instance.
"""
super().init(**kwargs)
return (
self.with_working_dir("{For MLflow and Operator will be auto generated}")
.with_entrypoint("{For MLflow and Operator will be auto generated}")
.with_script(
"{Path to the script. For MLflow and Operator will be auto generated}"
)
)
class NotebookRuntime(CondaRuntime):
"""Represents a job runtime with Jupyter notebook
To run a job with a single Jupyter notebook,
you can define the run time as::
runtime = (
NotebookRuntime()
.with_notebook(
path="https://raw.githubusercontent.com/tensorflow/docs/master/site/en/tutorials/customization/basics.ipynb",
encoding='utf-8'
)
.with_service_conda("tensorflow28_p38_cpu_v1")
.with_environment_variable(GREETINGS="Welcome to OCI Data Science")
.with_exclude_tag(["ignore", "remove"])
.with_output("oci://bucket_name@namespace/path/to/dir")
)
Note that the notebook path can be a local path or a remote path supported by fsspec,
including OCI object storage paths like ``oci://bucket@namespace/path/to/notebook``.
"""
CONST_NOTEBOOK_PATH = "notebookPathURI"
CONST_NOTEBOOK_ENCODING = "notebookEncoding"
CONST_OUTPUT_URI = "outputUri"
CONST_OUTPUT_URI_ALT = "outputURI"
CONST_EXCLUDE_TAG = "excludeTags"
CONST_SOURCE = "source"
CONST_ENTRYPOINT = "entrypoint"
attribute_map = {
CONST_NOTEBOOK_PATH: "notebook_path_uri",
CONST_NOTEBOOK_ENCODING: "notebook_encoding",
CONST_OUTPUT_URI: "output_uri",
CONST_EXCLUDE_TAG: "exclude_tags",
CONST_SOURCE: "source",
CONST_ENTRYPOINT: "entrypoint",
}
attribute_map.update(CondaRuntime.attribute_map)
def __init__(self, spec: Dict = None, **kwargs) -> None:
if spec and self.CONST_OUTPUT_URI_ALT in spec:
val = spec.pop(self.CONST_OUTPUT_URI_ALT)
spec[self.CONST_OUTPUT_URI] = val
if self.CONST_OUTPUT_URI_ALT in kwargs:
val = kwargs.pop(self.CONST_OUTPUT_URI_ALT)
kwargs[self.CONST_OUTPUT_URI] = val
super().__init__(spec, **kwargs)
@property
def notebook_uri(self) -> str:
"""The URI of the notebook"""
return self.get_spec(self.CONST_NOTEBOOK_PATH)
@property
def notebook_encoding(self) -> str:
"""The encoding of the notebook"""
return self.get_spec(self.CONST_NOTEBOOK_ENCODING)
def with_notebook(self, path: str, encoding="utf-8") -> NotebookRuntime:
"""Specifies the notebook to be run as a job.
Use this method if you would like to run a single notebook.
Use the ``with_source()`` method if you would like to run a notebook with additional dependency files.
Parameters
----------
path : str
The path of the Jupyter notebook
encoding : str
The encoding for opening the notebook. Defaults to utf-8.
Returns
-------
self
The runtime instance.
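Example, with an illustrative local notebook path::

    runtime = NotebookRuntime().with_notebook("local/path/to/notebook.ipynb", encoding="utf-8")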
"""
self.set_spec(self.CONST_NOTEBOOK_ENCODING, encoding)
return self.set_spec(self.CONST_NOTEBOOK_PATH, path)
@property
def exclude_tag(self) -> list:
"""A list of cell tags indicating cells to be excluded from the job"""
return self.get_spec(self.CONST_EXCLUDE_TAG, [])
def with_exclude_tag(self, *tags) -> NotebookRuntime:
"""Specifies the cell tags in the notebook to exclude cells from the job script.
Parameters
----------
*tags : list
One or more tags (strings); a list of tags is also accepted.
Returns
-------
self
The runtime instance.
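Example, excluding cells tagged "ignore" or "remove"::

    runtime.with_exclude_tag("ignore", "remove")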
"""
exclude_tag_list = []
for tag in tags:
if isinstance(tag, list):
exclude_tag_list.extend(tag)
else:
exclude_tag_list.append(tag)
return self.set_spec(self.CONST_EXCLUDE_TAG, exclude_tag_list)
@property
def output_uri(self) -> list:
"""URI for storing the output notebook and files"""
return self.get_spec(self.CONST_OUTPUT_URI)
def with_output(self, output_uri: str) -> NotebookRuntime:
"""Specifies the output URI for storing the output notebook and files.
All files in the directory containing the notebook will be saved.
Parameters
----------
output_uri : str
URI for a directory storing the output notebook and files.
For example, oci://bucket@namespace/path/to/dir
Returns
-------
self
The runtime instance.
"""
return self.set_spec(self.CONST_OUTPUT_URI, output_uri)
def with_source(self, uri: str, notebook: str, encoding="utf-8"):
"""Specify source code directory containing the notebook and dependencies for the job.
Use this method if you would like to run a notebook with additional dependency files.
Use the `with_notebook()` method if you would like to run a single notebook.
In the following example, local folder "path/to/source" contains the notebook and dependencies,
The local path of the notebook is "path/to/source/relative/path/to/notebook.ipynb"::
runtime.with_source(uri="path/to/source", notebook="relative/path/to/notebook.ipynb")
Parameters
----------
uri : str
URI of the source code directory. This can be local or on OCI object storage.
notebook : str
The relative path of the notebook from the source URI.
encoding : str
The encoding for opening the notebook. Defaults to utf-8.
Returns
-------
Self
The runtime instance.
"""
self.set_spec(self.CONST_SOURCE, uri)
self.set_spec(self.CONST_ENTRYPOINT, notebook)
self.set_spec(self.CONST_NOTEBOOK_ENCODING, encoding)
return self
@property
def source(self) -> str:
"""The source code location."""
return self.get_spec(self.CONST_SOURCE)
@property
def notebook(self) -> str:
"""The path of the notebook relative to the source."""
return self.get_spec(self.CONST_ENTRYPOINT)
def init(self, **kwargs) -> "NotebookRuntime":
"""Initializes a starter specification for the runtime.
Returns
-------
NotebookRuntime
The runtime instance.
"""
super().init(**kwargs)
return self.with_source(
uri="{Path to the source code directory. For MLflow, it will be replaced with the path to the project}",
notebook="{Entrypoint notebook. For MLflow, it will be replaced with the CMD}",
).with_exclude_tag("tag1")
class GitPythonRuntime(CondaRuntime, _PythonRuntimeMixin):
"""Represents a job runtime with source code from git repository
Example::
runtime = (
GitPythonRuntime()
.with_environment_variable(GREETINGS="Welcome to OCI Data Science")
# Specify the service conda environment by slug name.
.with_service_conda("pytorch19_p37_gpu_v1")
# Specify the git repository
# Optionally, you can specify the branch or commit
.with_source("https://github.com/pytorch/tutorials.git")
# Entrypoint is a relative path from the root of the git repo.
.with_entrypoint("beginner_source/examples_nn/polynomial_nn.py")
# Copy files in "beginner_source/examples_nn" to object storage after job finishes.
.with_output(
output_dir="beginner_source/examples_nn",
output_uri="oci://bucket_name@namespace/path/to/dir"
)
)
"""
CONST_GIT_URL = "url"
CONST_BRANCH = "branch"
CONST_COMMIT = "commit"
CONST_GIT_SSH_SECRET_ID = "gitSecretId"
CONST_SKIP_METADATA = "skipMetadataUpdate"
attribute_map = {
CONST_GIT_URL: CONST_GIT_URL,
CONST_BRANCH: CONST_BRANCH,
CONST_COMMIT: CONST_COMMIT,
CONST_GIT_SSH_SECRET_ID: "git_secret_id",
CONST_SKIP_METADATA: "skip_metadata_update",
}
attribute_map.update(CondaRuntime.attribute_map)
attribute_map.update(_PythonRuntimeMixin.attribute_map)
@property
def skip_metadata_update(self):
Indicates whether the metadata update should be skipped after the job run.
By default, the job run metadata will be updated with the following freeform tags:
* repo: The URL of the Git repository
* commit: The Git commit ID
* module: The entry script/module
* method: The entry function/method
* outputs: The prefix of the output files in object storage.
This update step also requires resource principals to have the permission to update the job run.
Returns
-------
bool
True if the metadata update will be skipped. Otherwise False.
"""
return self.get_spec(self.CONST_SKIP_METADATA, False)
def with_source(
self, url: str, branch: str = None, commit: str = None, secret_ocid: str = None
):
"""Specifies the Git repository and branch/commit for the job source code.
Parameters
----------
url : str
URL of the Git repository.
branch : str, optional
Git branch name. Defaults to None, in which case the default branch is used.
commit : str, optional
Git commit ID (SHA1 hash). Defaults to None, in which case the most recent commit is used.
secret_ocid : str, optional
The OCID of the vault secret storing the SSH key content for checking out the Git repository.
Returns
-------
self
The runtime instance.
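Example, with an illustrative private repository URL and a secret OCID placeholder::

    runtime.with_source(
        "git@github.com:example_user/example_repo.git",
        branch="main",
        secret_ocid="{OCID of the vault secret storing the SSH key}",
    )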
"""
self.set_spec(self.CONST_GIT_URL, url)
self.set_spec(self.CONST_BRANCH, branch)
self.set_spec(self.CONST_COMMIT, commit)
self.set_spec(self.CONST_GIT_SSH_SECRET_ID, secret_ocid)
return self
@property
def url(self) -> str:
"""URL of the Git repository."""
return self.get_spec(self.CONST_GIT_URL)
@property
def branch(self) -> str:
"""Git branch name."""
return self.get_spec(self.CONST_BRANCH)
@property
def commit(self) -> str:
"""Git commit ID (SHA1 hash)"""
return self.get_spec(self.CONST_COMMIT)
@property
def ssh_secret_ocid(self) -> str:
"""The OCID of the OCI Vault secret storing the Git SSH key."""
return self.get_spec(self.CONST_GIT_SSH_SECRET_ID)
def init(self, **kwargs) -> "GitPythonRuntime":
"""Initializes a starter specification for the runtime.
Returns
-------
GitPythonRuntime
The runtime instance.
"""
super().init(**kwargs)
return self.with_source(
"{Git URI. For MLflow and Operator will be auto generated}"
).with_entrypoint("{For MLflow and Operator will be auto generated}")
class DataFlowRuntime(CondaRuntime):
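"""Represents a job runtime for OCI Data Flow (Spark) applications."""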
CONST_SCRIPT_BUCKET = "scriptBucket"
CONST_ARCHIVE_BUCKET = "archiveBucket"
CONST_ARCHIVE_URI = "archiveUri"
CONST_SCRIPT_PATH = "scriptPathURI"
CONST_CONFIGURATION = "configuration"
CONST_CONDA_AUTH_TYPE = "condaAuthType"
CONST_OVERWRITE = "overwrite"
attribute_map = {
CONST_SCRIPT_BUCKET: "script_bucket",
CONST_ARCHIVE_URI: "archive_bucket",
CONST_ARCHIVE_URI: "archive_uri",
CONST_SCRIPT_PATH: "script_path_uri",
CONST_CONFIGURATION: CONST_CONFIGURATION,
CONST_CONDA_AUTH_TYPE: "conda_auth_type",
CONST_OVERWRITE: CONST_OVERWRITE,
}
attribute_map.update(Runtime.attribute_map)
def with_conda(self, conda_spec: dict = None):
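"""Specifies the conda environment from a specification dictionary.
Only custom (published) conda environments are supported;
passing a service conda specification raises NotImplementedError.
Parameters
----------
conda_spec : dict, optional
The conda specification, e.g. {"type": "published", "uri": "oci://bucket@namespace/path"}.
Returns
-------
self
The runtime instance.
"""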
conda_spec = conda_spec or {}
if conda_spec.get(self.CONST_CONDA_TYPE) == self.CONST_CONDA_TYPE_SERVICE:
raise NotImplementedError(
"Service Packs not supported. Please download and re-upload as a custom pack."
)
elif conda_spec.get(self.CONST_CONDA_TYPE) == self.CONST_CONDA_TYPE_CUSTOM:
return self.with_custom_conda(
uri=conda_spec.get(self.CONST_CONDA_URI),
region=conda_spec.get(self.CONST_CONDA_REGION),
)
else:
raise ValueError(
f"Unknown conda type: {conda_spec.get(self.CONST_CONDA_TYPE)}."
)
def with_service_conda(self, slug: str):
raise NotImplementedError(
"Publish this conda pack first, and provide the published conda pack uri."
)
def with_custom_conda(self, uri: str, region: str = None, auth_type: str = None):
"""Specifies the custom conda pack for running the job
Parameters
----------
uri : str
The OCI object storage URI for the conda pack,
e.g. "oci://your_bucket@namespace/object_name."
In the Environment Explorer of an OCI notebook session,
this is shown as the "source" of the conda pack.
region: str, optional
The region of the bucket storing the custom conda pack, by default None.
If region is not specified, ADS will use the region from your authentication credentials:
* For API Key, config["region"] is used.
* For Resource Principal, signer.region is used.
This is required if the conda pack is stored in a different region.
auth_type: str, optional
The authentication mechanism used to read the conda pack URI provided,
one of "resource_principal", "api_keys", "instance_principal", etc.
Defaults to "resource_principal".
Returns
-------
self
The runtime instance.
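Example, with an illustrative published pack URI::

    runtime.with_custom_conda(
        "oci://my_bucket@my_namespace/conda_environments/my_pack",
        auth_type="resource_principal",
    )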
See Also
--------
https://docs.oracle.com/en-us/iaas/data-science/using/conda_publishs_object.htm
"""
if not auth_type:
auth_type = "resource_principal"
self.set_spec(self.CONST_CONDA_AUTH_TYPE, auth_type)
return super().with_custom_conda(uri=uri, region=region)
def with_archive_uri(self, uri: str) -> "DataFlowRuntime":
"""
Set the archive URI (a zip file containing dependencies).
Parameters
----------
uri: str
URI of the archive zip file
Returns
-------
DataFlowRuntime
runtime instance itself
"""
return self.set_spec(self.CONST_ARCHIVE_URI, uri)
@property
def archive_uri(self):
"""The Uri of archive zip"""
return self.get_spec(self.CONST_ARCHIVE_URI)
@property
def script_uri(self) -> str:
"""The URI of the source code"""
return self.get_spec(self.CONST_SCRIPT_PATH)
def with_script_uri(self, path: str) -> "DataFlowRuntime":
"""
Set the script URI.
Parameters
----------
path: str
URI of the script
Returns
-------
DataFlowRuntime
runtime instance itself
"""
return self.set_spec(self.CONST_SCRIPT_PATH, path)
def with_script_bucket(self, bucket) -> "DataFlowRuntime":
"""
Set the object storage bucket for saving the script, used when the script URI given is local.
Parameters
----------
bucket: str
name of the bucket
Returns
-------
DataFlowRuntime
runtime instance itself
"""
return self.set_spec(self.CONST_SCRIPT_BUCKET, bucket)
@property
def script_bucket(self) -> str:
"""Bucket to save script"""
return self.get_spec(self.CONST_SCRIPT_BUCKET)
def with_archive_bucket(self, bucket) -> "DataFlowRuntime":
"""
Set the object storage bucket for saving the archive zip, used when the archive URI given is local.
Parameters
----------
bucket: str
name of the bucket
Returns
-------
DataFlowRuntime
runtime instance itself
"""
return self.set_spec(self.CONST_ARCHIVE_BUCKET, bucket)
@property
def archive_bucket(self) -> str:
"""Bucket to save archive zip"""
return self.get_spec(self.CONST_ARCHIVE_BUCKET)
def with_configuration(self, config: dict) -> "DataFlowRuntime":
"""
Set Configuration for Spark.
Parameters
----------
config: dict
dictionary of configuration details
https://spark.apache.org/docs/latest/configuration.html#available-properties.
Example: { "spark.app.name" : "My App Name", "spark.shuffle.io.maxRetries" : "4" }
Returns
-------
DataFlowRuntime
runtime instance itself
"""
return self.set_spec(self.CONST_CONFIGURATION, config)
@property
def configuration(self) -> dict:
"""Configuration for Spark"""
return self.get_spec(self.CONST_CONFIGURATION)
def with_overwrite(self, overwrite: bool) -> "DataFlowRuntime":
"""
Whether to overwrite the existing script in object storage (script bucket).
If the Object Storage bucket already contains a script with the same name,
it will be overwritten with the new one when the `overwrite` flag is set to `True`.
Parameters
----------
overwrite: bool
Whether to overwrite the existing script in object storage (script bucket).
Returns
-------
DataFlowRuntime
The DataFlowRuntime instance (self).
"""
return self.set_spec(self.CONST_OVERWRITE, overwrite)
@property
def overwrite(self) -> bool:
"""Whether to overwrite the existing script in object storage (script bucket)."""
return self.get_spec(self.CONST_OVERWRITE)
def convert(self, **kwargs):
pass
def init(self, **kwargs) -> "DataFlowRuntime":
"""Initializes a starter specification for the runtime.
Returns
-------
DataFlowRuntime
The runtime instance.
"""
super().init(**kwargs)
self._spec.pop(self.CONST_ENV_VAR, None)
return (
self.with_script_uri(
"{Path to the executable script. For MLflow and Operator will auto generated}"
)
.with_script_bucket(
kwargs.get(
"script_bucket",
"{The object storage bucket to save a script. "
"Example: oci://<bucket_name>@<tenancy>/<prefix>}",
)
)
.with_overwrite(True)
.with_configuration({"spark.driverEnv.env_key": "env_value"})
)
class DataFlowNotebookRuntime(DataFlowRuntime, NotebookRuntime):
def convert(self, overwrite=False):
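"""Converts the notebook into a Python script for the Data Flow job.
Cells with tags listed in ``exclude_tag`` are excluded from the script.
The converted script is saved under ``output_uri`` if it is set,
otherwise next to the notebook, and the runtime's script path is updated accordingly.
Parameters
----------
overwrite : bool, optional
Whether to overwrite an existing converted script, by default False.
"""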
if self.output_uri:
path = os.path.join(
self.output_uri,
str(os.path.basename(self.notebook_uri)).replace(".ipynb", ".py"),
)
else:
path = os.path.splitext(self.notebook_uri)[0] + ".py"
exclude_tags = self.exclude_tag or {}
convert_notebook(
self.notebook_uri, default_signer(), exclude_tags, path, overwrite=overwrite
)
self.set_spec(self.CONST_SCRIPT_PATH, path)