#!/usr/bin/env python
# -*- coding: utf-8; -*-
# Copyright (c) 2021, 2023 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
import contextlib
import logging
import os
import shutil
import tempfile
from io import DEFAULT_BUFFER_SIZE
from urllib import request
from urllib.parse import urlparse
import fsspec
from ads.common.auth import default_signer
logger = logging.getLogger(__name__)
[docs]
class Artifact:
"""Represents a OCI Data Science Job artifact.
The Artifact class is designed to add an additional processing step on runtime/source code.
before uploading it as data science job artifact.
A sub-class should implement the build() method to do the additional processing.
A sub-class is designed to be used with context manager so that the temporary files are cleaned up properly.
For example, the NotebookArtifact implements the build() method to convert the notebook to python script.
with NotebookArtifact(runtime) as artifact:
* The build() method will be called when entering the context manager
* The final artifact for the job will be stored in artifact.path upload_artifact(artifact.path)
* Files are cleaned up when exit or if there is an exception.
"""
CONST_DRIVER_UTILS = "driver_utils.py"
CONST_DRIVER_NOTEBOOK = "driver_notebook.py"
def __init__(self, source, runtime=None) -> None:
# Get the full path of source file if it is local file.
if source and not urlparse(source).scheme:
self.source = os.path.abspath(os.path.expanduser(source))
else:
self.source = source
self.path = None
self.temp_dir = None
self.runtime = runtime
def __str__(self) -> str:
if self.path:
return self.path
return self.source
def __enter__(self):
self.temp_dir = tempfile.TemporaryDirectory()
self.build()
return self
def __exit__(self, *exc):
if self.temp_dir:
self.temp_dir.cleanup()
@staticmethod
def _write_file(fp, to_path):
"""Reads a file from a file-like object and write it to specific local path.
Parameters
----------
fp : file-like object
The source of the file.
to_path : path-like object
Local destination path.
"""
with open(to_path, "wb") as out_file:
block_size = DEFAULT_BUFFER_SIZE * 8
while True:
block = fp.read(block_size)
if not block:
break
out_file.write(block)
@staticmethod
def _download_from_web(url, to_path):
"""Downloads a single file from http/https/ftp.
Parameters
----------
url : str
The URL of the source file.
to_path : path-like object
Local destination path.
"""
url_response = request.urlopen(url)
with contextlib.closing(url_response) as fp:
logger.debug("Downloading from %s", url)
Artifact._write_file(fp, to_path)
[docs]
@staticmethod
def copy_from_uri(uri, to_path, unpack=False, **storage_options):
"""Copy file(s) to local path
Parameters
----------
uri : str
The URI of the source file or directory, which can be local path of OCI object storage URI.
to_path : path-like object
The local destination path.
If this is a directory, the source file/directory will be placed under it.
unpack : bool
Indicate if zip or tar.gz file specified by the uri should be unpacked.
This option has no effect on other files.
storage_options :
Storage options for fsspec.
For OCI object storage, the default_signer from ads.common.auth will be used
if storage option is not specified.
Returns
-------
str or path-like object
The actual path of file/directory at destination.
* For copying a single file and to_path is a filename, this will be the same as to_path.
* For copying a single file and to_path is a directory, this will be to_path + filename.
* For copying a directory, this will be to_path + directory name.
"""
scheme = urlparse(uri).scheme
# temp_dir is used only if the uri is zip/tar file
with tempfile.TemporaryDirectory() as temp_dir:
if unpack and (
str(uri).endswith(".zip")
or str(uri).endswith(".tar.gz")
or str(uri).endswith(".tar")
or str(uri).endswith(".tgz")
):
unpack_path = to_path
to_path = temp_dir
else:
unpack_path = None
if scheme in ["http", "https", "ftp"]:
if os.path.isdir(to_path):
to_path = os.path.join(to_path, os.path.basename(uri))
Artifact._download_from_web(uri, to_path)
else:
if scheme == "oci" and not storage_options:
storage_options = default_signer()
fs = fsspec.filesystem(scheme, **storage_options)
if os.path.isdir(to_path):
to_path = os.path.join(
to_path, os.path.basename(str(uri).rstrip("/"))
)
fs.get(uri, to_path, recursive=True)
if unpack_path:
shutil.unpack_archive(to_path, unpack_path)
to_path = unpack_path
return to_path
[docs]
def build(self):
"""Builds the runtime artifact in the temporary directory.
Subclass should implement this method to:
1. Process the runtime
2. Set the self.path to the final artifact path
Raises
------
NotImplementedError
When this method is not implemented in the subclass.
"""
raise NotImplementedError()
[docs]
class ScriptArtifact(Artifact):
"""Represents a ScriptRuntime job artifact"""
[docs]
def build(self):
"""Prepares job artifact for script runtime.
If the source is a file, it will be returned as is.
If the source is a directory, it will be compressed as a zip file.
"""
source = self.copy_from_uri(self.source, self.temp_dir.name)
# Zip the artifact if it is a directory
if os.path.isdir(source):
basename = os.path.basename(str(source).rstrip("/"))
source = str(source).rstrip("/")
# Runtime must have entrypoint if the source is a directory
if self.runtime and not self.runtime.entrypoint:
raise ValueError(
"Please specify entrypoint when script source is a directory."
)
output = os.path.join(self.temp_dir.name, basename)
shutil.make_archive(
output, "zip", os.path.dirname(source), base_dir=basename
)
self.path = output + ".zip"
return
# Otherwise, use the artifact directly
self.path = source
[docs]
class PythonArtifact(Artifact):
"""Represents a PythonRuntime job artifact"""
CONST_DRIVER_SCRIPT = "driver_python.py"
DEFAULT_BASENAME = "artifact"
# The directory to store user code
# This directory must match the USER_CODE_DIR in driver_python.py
USER_CODE_DIR = "code"
def __init__(self, source, runtime=None) -> None:
super().__init__(source, runtime)
self.basename = None
self.artifact_dir = None
self.code_dir = None
def _copy_artifacts(self, drivers=None):
"""Copies the drivers and artifacts to the temp artifact dir."""
# The basename of the job artifact,
# this will be the name of the zip file uploading to OCI
if self.source:
self.basename = os.path.basename(str(self.source).rstrip("/")).split(
".", 1
)[0]
else:
self.basename = self.DEFAULT_BASENAME
# The temp dir path for storing the artifacts, including drivers and user code
self.artifact_dir = os.path.join(self.temp_dir.name, self.basename)
# The temp dir path for storing the user code
self.code_dir = os.path.join(self.artifact_dir, self.USER_CODE_DIR)
os.makedirs(self.artifact_dir, exist_ok=True)
if not drivers:
drivers = []
# Copy the driver script
for filename in drivers:
file_path = os.path.join(
os.path.dirname(__file__), "../../templates", filename
)
shutil.copy(file_path, os.path.join(self.artifact_dir, filename))
# Copy user code
if self.source:
os.makedirs(self.code_dir, exist_ok=True)
Artifact.copy_from_uri(self.source, self.code_dir, unpack=True)
def _zip_artifacts(self):
"""Create a zip file from the temp artifact dir."""
output = os.path.join(self.temp_dir.name, self.basename)
shutil.make_archive(output, "zip", self.artifact_dir, base_dir="./")
return output + ".zip"
[docs]
def build(self):
"""Prepares job artifact for PythonRuntime."""
self._copy_artifacts(
drivers=[
self.CONST_DRIVER_UTILS,
self.CONST_DRIVER_SCRIPT,
self.CONST_DRIVER_NOTEBOOK,
]
)
# Check if entrypoint is valid
# If the user code is a directory,
# user should specify the entrypoint with the name of the top level directory.
# For example, if the user code is in "path/to/dir"
# The entrypoint should be something like dir/
if self.runtime and self.runtime.entrypoint:
if not os.path.exists(
os.path.join(
self.code_dir, self.runtime.working_dir, self.runtime.entrypoint
)
):
# The specific entrypoint does not exist.
# Check if user forgot to specify the top level directory.
possible_entrypoint = os.path.join(
self.code_dir,
self.runtime.working_dir,
self.basename,
self.runtime.entrypoint,
)
err_message = (
f"Invalid entrypoint. {self.runtime.entrypoint} does not exist."
)
if os.path.exists(possible_entrypoint):
suggested_entrypoint = os.path.join(
self.basename, self.runtime.entrypoint
)
err_message += f" Do you mean {suggested_entrypoint}?"
logger.warning(err_message)
# Zip the job artifact
self.path = self._zip_artifacts()
[docs]
class NotebookArtifact(PythonArtifact):
"""Represents a NotebookRuntime job artifact"""
CONST_DRIVER_SCRIPT = PythonArtifact.CONST_DRIVER_NOTEBOOK
[docs]
def build(self):
"""Prepares job artifact for notebook runtime"""
# Copy job artifacts
self._copy_artifacts([self.CONST_DRIVER_UTILS, self.CONST_DRIVER_NOTEBOOK])
if self.runtime.notebook:
notebook_path = os.path.join(
os.path.basename(self.runtime.source), self.runtime.notebook
)
if not os.path.exists(os.path.join(self.code_dir, notebook_path)):
raise ValueError(
f"Invalid notebook path: {self.runtime.notebook}\n"
+ f"Please make sure your {self.runtime.source} contains the notebook and "
+ "the notebook path is relative to the root of the source."
)
# Zip the job artifact
self.path = self._zip_artifacts()
[docs]
class GitPythonArtifact(Artifact):
CONST_DRIVER_SCRIPT = "driver_oci.py"
def __init__(self) -> None:
super().__init__("", runtime=None)
[docs]
def build(self):
"""Prepares job artifact for GitPythonRuntime."""
artifact_dir = os.path.join(self.temp_dir.name, "ads_driver")
os.makedirs(artifact_dir, exist_ok=True)
for filename in [
self.CONST_DRIVER_UTILS,
self.CONST_DRIVER_SCRIPT,
NotebookArtifact.CONST_DRIVER_SCRIPT,
]:
file_path = os.path.join(
os.path.dirname(__file__), "../../templates", filename
)
shutil.copy(file_path, os.path.join(artifact_dir, filename))
# Zip the job artifact
shutil.make_archive(artifact_dir, "zip", artifact_dir, base_dir="./")
self.path = artifact_dir + ".zip"