#!/usr/bin/env python
# -*- coding: utf-8; -*-
# Copyright (c) 2022 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
import datetime
import os
import shutil
import subprocess
import shlex
import json
import glob
from typing import Dict
import click
import yaml
from datetime import datetime
import ocifs
from ads.common.oci_client import OCIClientFactory
from ads.common.auth import create_signer
from ads.common.decorator.runtime_dependency import (
runtime_dependency,
OptionalDependency,
)
from ads.config import NO_CONTAINER
from ads.opctl.constants import (
ML_JOB_GPU_IMAGE,
ML_JOB_IMAGE,
DEFAULT_IMAGE_HOME_DIR,
DEFAULT_IMAGE_CONDA_DIR,
DEFAULT_NOTEBOOK_SESSION_SPARK_CONF_DIR,
)
from ads.opctl import logger
from ads.opctl.config.utils import read_from_ini
from ads.opctl.utils import (
parse_conda_uri,
run_container,
get_docker_client,
is_in_notebook_session,
run_command,
)
from ads.opctl.config.base import ConfigProcessor
from ads.opctl.config.merger import ConfigMerger
from ads.opctl.conda.multipart_uploader import MultiPartUploader
import tempfile
def _fetch_manifest_template() -> Dict:
    """Load the manifest template stored next to this module.

    Returns
    -------
    Dict
        Parsed content of ``manifest_template.yaml``.
    """
    module_dir = os.path.dirname(os.path.abspath(__file__))
    template_path = os.path.join(module_dir, "manifest_template.yaml")
    with open(template_path) as template_stream:
        return yaml.safe_load(template_stream)
@runtime_dependency(module="docker", install_from=OptionalDependency.OPCTL)
def _check_job_image_exists(gpu: bool) -> None:
    """Ensure the local Jobs image can be inspected through the docker client.

    Parameters
    ----------
    gpu : bool
        check the GPU image instead of the CPU image

    Raises
    ------
    RuntimeError
        The image is not found locally; the message names the build command to run.
    """
    # NOTE(review): `docker` is not imported in this file; presumably the
    # `runtime_dependency` decorator makes it available - confirm.
    image = ML_JOB_GPU_IMAGE if gpu else ML_JOB_IMAGE
    try:
        get_docker_client().api.inspect_image(image)
    except docker.errors.ImageNotFound:
        build_cmd = (
            "`ads opctl build-image -g job-local`"
            if gpu
            else "`ads opctl build-image job-local`"
        )
        raise RuntimeError(
            f"Please run {build_cmd} to build a local image for Jobs development first."
        )
def _get_name(name: str, env_file: str) -> str:
if not name and env_file:
with open(env_file) as f:
name = yaml.safe_load(f.read()).get("manifest").get("name", None)
if not name:
raise ValueError(
"Either specify environment name in environment yaml or with `--name`."
)
return name
# [docs]
def create(**kwargs) -> str:
    """Create a conda pack using the merged opctl configuration.

    Parameters
    ----------
    **kwargs
        options forwarded to ``ConfigMerger``; the resulting ``execution``
        section supplies name, version, environment file, pack folder, gpu
        and overwrite settings

    Returns
    -------
    str
        the value returned by ``_create`` (the slug of the created pack)
    """
    p = ConfigProcessor().step(ConfigMerger, **kwargs)
    exec_config = p.config["execution"]
    name = _get_name(exec_config.get("name"), exec_config.get("environment_file"))
    # Hand the slug back to the caller, as the `-> str` annotation promises.
    return _create(
        name=name,
        version=exec_config["version"],
        env_file=exec_config["environment_file"],
        conda_pack_folder=exec_config["conda_pack_folder"],
        gpu=exec_config.get("gpu", False),
        overwrite=exec_config.get("overwrite", False),
    )
def _create(
name: str,
version: str,
env_file: str,
conda_pack_folder: str,
gpu: bool,
overwrite: bool,
prepare_publish: bool = False,
) -> str:
"""Create a conda pack given an environment yaml file under conda pack folder specified.
Parameters
----------
slug : str
slug of the conda pack
env_file : str
path to conda environment yaml
conda_pack_folder : str
path to conda pack folder
gpu : bool
whether to build against GPU image
overwrite : bool
whether to overwrite existing pack of the same slug
prepare_pubish : bool
whether to create conda pack archive after conda pack is created
Raises
------
FileNotFoundError
Environment file not found
RuntimeError
Environment file may be inproperly removed
RuntimeError
Creating pack failed
Return
------
str:
slug of the environment
"""
if not os.path.exists(env_file):
raise FileNotFoundError(f"Environment file {env_file} is not found.")
conda_dep = None
with open(env_file) as mfile:
conda_dep = yaml.safe_load(mfile.read())
# If manifest exists in the environment.yaml file, use that
manifest = conda_dep.get("manifest", {})
slug = manifest.get(
"slug", f"{name}_v{version}".replace(" ", "").replace(".", "_").lower()
)
pack_folder_path = os.path.join(
os.path.abspath(os.path.expanduser(conda_pack_folder)), slug
)
if os.path.exists(pack_folder_path):
overwrite = overwrite or click.confirm(
f"Conda pack with slug {slug} already exists. Do you wish to overwrite?"
)
if overwrite:
if (
os.path.commonpath(
[pack_folder_path, os.path.abspath(os.path.expanduser(env_file))]
)
== pack_folder_path
):
raise RuntimeError(
f"Environment file {os.path.abspath(os.path.expanduser(env_file))} is in {pack_folder_path}. Overwriting this folder will remove the file."
)
shutil.rmtree(pack_folder_path)
else:
return
os.makedirs(pack_folder_path, exist_ok=True)
logger.info(
f"Preparing manifest. Manifest in the environment: {conda_dep.get('manifest')}"
)
manifest_template = _fetch_manifest_template()
if "name" not in manifest:
manifest_template["manifest"]["name"] = name
manifest_template["manifest"]["slug"] = slug
if "type" not in manifest:
logger.info("Setting manifest to published")
manifest_template["manifest"]["type"] = "published"
if "version" not in manifest:
manifest_template["manifest"]["version"] = version
manifest_template["manifest"]["arch_type"] = "GPU" if gpu else "CPU"
manifest_template["manifest"]["create_date"] = datetime.utcnow().strftime(
"%a, %b %d, %Y, %H:%M:%S %Z UTC"
)
if not "manifest_version" in manifest:
manifest_template["manifest"]["manifest_version"] = "1.0"
logger.info(f"Creating conda environment {slug}")
manifest_dict = {
k: manifest_template["manifest"][k]
for k in manifest_template["manifest"]
if manifest_template["manifest"][k]
}
if "manifest" in conda_dep:
conda_dep["manifest"].update(manifest_dict)
else:
conda_dep["manifest"] = manifest_dict
logger.info(f"Updated conda environment manifest: {conda_dep.get('manifest')}")
if is_in_notebook_session() or NO_CONTAINER:
command = f"conda env create --prefix {pack_folder_path} --file {os.path.abspath(os.path.expanduser(env_file))}"
proc = run_command(command, shell=True)
if proc.returncode != 0:
raise RuntimeError(
f"Failed to create conda environment. (exit code {proc.returncode})"
)
else:
_check_job_image_exists(gpu)
docker_pack_folder_path = os.path.join(DEFAULT_IMAGE_HOME_DIR, slug)
docker_env_file_path = os.path.join(
DEFAULT_IMAGE_HOME_DIR, os.path.basename(env_file)
)
create_command = f"conda env create --prefix {docker_pack_folder_path} --file {docker_env_file_path}"
volumes = {
pack_folder_path: {"bind": docker_pack_folder_path},
os.path.abspath(os.path.expanduser(env_file)): {
"bind": docker_env_file_path
},
}
if gpu:
image = ML_JOB_GPU_IMAGE
else:
image = ML_JOB_IMAGE
try:
if prepare_publish:
tmp_file = tempfile.NamedTemporaryFile(suffix=".yaml")
# Save the manifest in the temp file that can be mounted inside the container so that archiving will work
with open(tmp_file.name, "w") as f:
yaml.safe_dump(conda_dep, f)
pack_script = os.path.join(
os.path.dirname(os.path.abspath(__file__)), "pack.py"
)
pack_command = f"python {os.path.join(DEFAULT_IMAGE_HOME_DIR, 'pack.py')} --conda-path {docker_pack_folder_path} --manifest-location {os.path.join(DEFAULT_IMAGE_HOME_DIR, 'manifest.yaml')}"
# add pack script and manifest file to the mount so that archive can be created in the same container run
condapack_script = {
pack_script: {
"bind": os.path.join(DEFAULT_IMAGE_HOME_DIR, "pack.py")
},
tmp_file.name: {
"bind": os.path.join(DEFAULT_IMAGE_HOME_DIR, "manifest.yaml")
},
}
volumes = {
**volumes,
**condapack_script,
} # | not supported in python 3.8
run_container(
image=image,
bind_volumes=volumes,
entrypoint="/bin/bash -c ",
env_vars={},
command=f" '{create_command} && {pack_command}'",
)
else:
run_container(
image=image,
bind_volumes=volumes,
env_vars={},
command=create_command,
)
except Exception:
if os.path.exists(pack_folder_path):
shutil.rmtree(pack_folder_path)
raise RuntimeError(f"Could not create environment {slug}.")
# Save the manifest file inside the host machine, where the conda environment is saved.
manifest_location = f"{os.path.join(pack_folder_path, slug)}_manifest.yaml"
with open(manifest_location, "w") as mfile:
yaml.safe_dump(conda_dep, mfile)
logger.info(f"Environment `{slug}` setup complete.")
print(f"Pack {slug} created under {pack_folder_path}.")
return slug
def _fetch_pack_path(
    slug: str,
    conda_pack_os_prefix: str,
    oci_config: str,
    oci_profile: str,
    auth_type: str,
) -> str:
    """Search an object storage prefix recursively for an entry named `slug`.

    Parameters
    ----------
    slug : str
        slug of the conda pack to look for (compared with each entry's basename)
    conda_pack_os_prefix : str
        object storage prefix to start the search from
    oci_config : str
        path to OCI config file
    oci_profile : str
        OCI config profile
    auth_type : str
        authentication method

    Returns
    -------
    str
        ``oci://`` uri of the first matching entry (depth-first, listing order),
        or None when nothing matches
    """
    # Build the signer and file system once and reuse them for every level,
    # instead of authenticating again for each sub-directory visited.
    oci_auth = create_signer(auth_type, oci_config, oci_profile)
    fs = ocifs.OCIFileSystem(**oci_auth)

    def _search(prefix: str):
        for entry in fs.ls(prefix, detail=True, refresh=True):
            if os.path.basename(entry["name"]) == slug:
                return f"oci://{entry['name']}"
            if entry["type"] == "directory":
                found = _search(f"oci://{entry['name']}")
                if found:
                    return found
        return None

    return _search(conda_pack_os_prefix)
# [docs]
def install(**kwargs) -> None:
    """Install a conda pack described by the merged opctl configuration.

    The pack location is resolved from `slug` together with
    `conda_pack_os_prefix` when both are set, otherwise from `conda_uri`.

    Raises
    ------
    FileNotFoundError
        The slug could not be found under the given object storage prefix
    ValueError
        Neither `conda_uri` nor the `slug`/`conda_pack_os_prefix` pair is set
    """
    merged = ConfigProcessor().step(ConfigMerger, **kwargs)
    settings = merged.config["execution"]

    if settings.get("slug") and settings.get("conda_pack_os_prefix"):
        conda_uri = _fetch_pack_path(
            settings["slug"],
            settings["conda_pack_os_prefix"],
            settings.get("oci_config"),
            settings.get("oci_profile"),
            settings.get("auth"),
        )
        if not conda_uri:
            raise FileNotFoundError(
                f"{settings['slug']} not found under {settings['conda_pack_os_prefix']}"
            )
    elif settings.get("conda_uri"):
        conda_uri = settings["conda_uri"]
    else:
        raise ValueError(
            "Either `--conda-uri`, or `--slug` and `--conda-pack-os-prefix` has to be specified."
        )

    local_pack_root = os.path.abspath(
        os.path.expanduser(settings["conda_pack_folder"])
    )
    _install(
        conda_uri=conda_uri,
        conda_pack_folder=local_pack_root,
        oci_config=settings.get("oci_config"),
        oci_profile=settings.get("oci_profile"),
        overwrite=settings.get("overwrite", False),
        debug=kwargs.get("debug", False),
        auth_type=settings.get("auth"),
    )
def _install(
    conda_uri: str,
    conda_pack_folder: str,
    oci_config: str = None,
    oci_profile: str = None,
    overwrite: bool = False,
    debug: bool = False,
    auth_type: str = None,
) -> None:
    """
    Install conda pack.

    Downloads the pack archive with the `oci` command line tool, extracts it
    with `tar`, runs `conda-unpack` when the pack ships it, adjusts the spark
    configuration when present and records the source uri in the manifest.

    Parameters
    ----------
    conda_uri: str
        OCI object storage uri to conda pack
    conda_pack_folder: str
        local folder to save conda packs
    oci_config: str
        path to OCI config file
    oci_profile: str
        OCI config profile
    overwrite: bool
        whether to overwrite existing pack
    debug: bool
        whether to turn on debug mode
    auth_type : str
        authentication method; passed as `--auth` to the download command only
        when no config/profile pair is given and a value is set

    Raises
    ------
    ValueError
        The uri points to the service conda pack bucket, or the profile is
        missing from the OCI config file
    RuntimeError
        Download or `conda-unpack` in the container failed
    Exception
        Extracting the archive failed
    FileNotFoundError
        No manifest file found in the extracted pack

    Returns
    -------
    None
    """
    ns, bucket, path, slug = parse_conda_uri(conda_uri)
    if bucket == "service-conda-packs":
        raise ValueError(
            "Download service conda pack is not allowed. Only custom conda pack can be downloaded to local machine. You need to publish it to your own bucket first."
        )

    # Expand `~` once so the folder that gets created is the same one every
    # derived path below points into.
    conda_pack_folder = os.path.expanduser(conda_pack_folder)
    os.makedirs(conda_pack_folder, exist_ok=True)
    pack_path = os.path.join(conda_pack_folder, slug + ".tar.gz")
    pack_folder_path = os.path.join(conda_pack_folder, slug)

    if not (is_in_notebook_session() or NO_CONTAINER):
        _check_job_image_exists(gpu=False)

    # Ask until the target folder is free or the user agrees to overwrite.
    while os.path.exists(pack_folder_path) and not overwrite:
        ans = click.prompt(
            f"conda pack with slug {slug} already exists. Enter a new name or 'o' for overwrite:",
            default="o",
        )
        if ans == "o":
            overwrite = True
        else:
            slug = ans
            pack_path = os.path.join(conda_pack_folder, slug + ".tar.gz")
            pack_folder_path = os.path.join(conda_pack_folder, slug)

    download_command = [
        "oci",
        "os",
        "object",
        "get",
        "--name",
        f"{path}",
        "-bn",
        bucket,
        "-ns",
        ns,
        "--file",
        pack_path,
    ]
    if oci_config is None or oci_profile is None:
        # `auth_type` defaults to None; a None entry in the argument list would
        # raise a TypeError, so only pass `--auth` when a value is given.
        if auth_type is not None:
            download_command += ["--auth", auth_type]
    else:
        oci_config = os.path.abspath(os.path.expanduser(oci_config))
        parser = read_from_ini(oci_config)
        if oci_profile not in parser:
            raise ValueError(f"PROFILE {oci_profile} not found in {oci_config}.")
        download_command += [
            "--profile",
            oci_profile,
            "--config-file",
            oci_config,
        ]
    if debug:
        download_command.append("-d")

    print(" ".join([shlex.quote(c) for c in download_command]))
    process = subprocess.run(download_command)
    if process.returncode != 0:
        # Remove a partially downloaded archive.
        if os.path.exists(pack_path):
            os.remove(pack_path)
        raise RuntimeError(
            f"Downloading pack failed with return code {process.returncode}. Please double check the path and make sure you have access."
        )
    print(f"Download {conda_uri} completed")

    print(f"Start unpacking {pack_path}")
    os.makedirs(pack_folder_path, exist_ok=True)
    process = subprocess.run(["tar", "-xf", pack_path, "-C", pack_folder_path])
    if process.returncode != 0:
        shutil.rmtree(pack_folder_path)
        raise Exception(
            f"Error extracting {pack_path} to {pack_folder_path}. Please try again."
        )

    # Run the conda-unpack script to clean up prefixes
    # This will fix problems related to repeated "placehold" paths.
    # See https://conda.github.io/conda-pack/#commandline-usage
    unpack_script = os.path.join(pack_folder_path, "bin", "conda-unpack")
    if os.path.exists(unpack_script):
        if is_in_notebook_session() or NO_CONTAINER:
            run_command(unpack_script)
        else:
            volumes = {
                pack_folder_path: {"bind": os.path.join(DEFAULT_IMAGE_HOME_DIR, slug)}
            }
            try:
                run_container(
                    image=ML_JOB_IMAGE,
                    bind_volumes=volumes,
                    env_vars={},
                    command=os.path.join(
                        DEFAULT_IMAGE_HOME_DIR, slug, "bin/conda-unpack"
                    ),
                )
            except Exception as e:
                raise RuntimeError(f"Error unpacking environment {slug}.") from e

    spark_conf_path = os.path.join(pack_folder_path, "spark-defaults.conf")
    if os.path.exists(spark_conf_path):
        if is_in_notebook_session() or NO_CONTAINER:
            # Copy the spark configuration and any jar shipped at the top level
            # of the pack into the notebook session spark conf directory.
            if not os.path.exists(DEFAULT_NOTEBOOK_SESSION_SPARK_CONF_DIR):
                os.makedirs(DEFAULT_NOTEBOOK_SESSION_SPARK_CONF_DIR)
            shutil.copy(spark_conf_path, DEFAULT_NOTEBOOK_SESSION_SPARK_CONF_DIR)
            for file in os.listdir(pack_folder_path):
                if os.path.splitext(file)[1] == ".jar":
                    shutil.copy(
                        os.path.join(pack_folder_path, file),
                        DEFAULT_NOTEBOOK_SESSION_SPARK_CONF_DIR,
                    )
        else:
            # Rewrite references to the notebook session conf directory so they
            # point at the pack's directory under the image's conda directory.
            with open(spark_conf_path) as f:
                confs = f.read()
            confs = confs.replace(
                DEFAULT_NOTEBOOK_SESSION_SPARK_CONF_DIR,
                os.path.join(DEFAULT_IMAGE_CONDA_DIR, slug),
            )
            with open(spark_conf_path, "w") as f:
                f.write(confs)

    os.remove(pack_path)

    manifest_paths = glob.glob(os.path.join(pack_folder_path, "*_manifest.yaml"))
    if not manifest_paths:
        # Report the actual problem instead of an IndexError on an empty list.
        raise FileNotFoundError(
            f"Could not locate manifest file in {pack_folder_path}."
        )
    manifest_path = manifest_paths[0]
    with open(manifest_path) as f:
        env = yaml.safe_load(f.read())
    # Remember where the pack came from.
    env["manifest"]["pack_uri"] = conda_uri
    with open(manifest_path, "w") as f:
        yaml.safe_dump(env, f)
    print(f"{slug} set up at {pack_folder_path}.")
# [docs]
def publish(**kwargs) -> None:
    """Publish a conda pack using the merged opctl configuration.

    When an environment file is configured the pack is created first and then
    published; otherwise the existing local pack identified by `slug` is
    published.

    Raises
    ------
    ValueError
        Neither an environment file nor a slug is configured, or the object
        storage prefix is missing
    """
    p = ConfigProcessor().step(ConfigMerger, **kwargs)
    exec_config = p.config["execution"]
    skip_archive = False
    if exec_config.get("environment_file", None):
        name = _get_name(exec_config.get("name"), exec_config.get("environment_file"))
        slug = _create(
            name=name,
            version=exec_config["version"],
            env_file=exec_config["environment_file"],
            conda_pack_folder=exec_config["conda_pack_folder"],
            gpu=exec_config.get("gpu", False),
            overwrite=exec_config["overwrite"],
            prepare_publish=True,
        )
        if not slug:
            # `_create` returns None when a pack with the same slug exists and
            # the user declined to overwrite it; there is no slug to publish.
            print("Conda pack was not created. Nothing to publish.")
            return
        # The conda pack archive is created during the create process only when
        # the pack is built inside a container. In a notebook session or with
        # NO_CONTAINER, `_create` just creates the environment, so `_publish`
        # still has to build the archive.
        skip_archive = not (is_in_notebook_session() or NO_CONTAINER)
    else:
        slug = exec_config.get("slug")
    if not slug:
        raise ValueError("Please specify slug of the conda pack via `--slug`.")
    if not exec_config.get("conda_pack_os_prefix"):
        raise ValueError(
            "Please specify object storage path to save conda pack either via `ads opctl configure` or `--conda-pack-os-prefix`."
        )
    _publish(
        conda_slug=slug,
        conda_uri_prefix=exec_config["conda_pack_os_prefix"],
        conda_pack_folder=exec_config["conda_pack_folder"],
        oci_config=exec_config.get("oci_config"),
        oci_profile=exec_config.get("oci_profile"),
        overwrite=exec_config["overwrite"],
        auth_type=exec_config["auth"],
        skip_archive=skip_archive,
    )
def _publish(
    conda_slug: str,
    conda_uri_prefix: str,
    conda_pack_folder: str,
    oci_config: str,
    oci_profile: str,
    overwrite: bool,
    auth_type: str,
    skip_archive: bool = False,
) -> None:
    """Publish a local conda pack to object storage location

    Parameters
    ----------
    conda_slug : str
        slug of conda pack
    conda_uri_prefix : str
        object storage prefix to save conda pack
    conda_pack_folder : str
        path to local conda folder
    oci_config : str
        oci config file path
    oci_profile : str
        oci config profile
    overwrite : bool
        whether to overwrite existing pack of the same slug
    auth_type : str
        authentication method
    skip_archive : bool
        when True the archive `<conda_slug>.tar.gz` is expected to exist in the
        pack folder already and the packing step is skipped

    Raises
    ------
    FileNotFoundError
        local conda pack folder not found
    FileNotFoundError
        manifest file not found inside conda pack
    RuntimeError
        IP packages found inside conda pack
    ValueError
        slug contains characters that are not allowed
    RuntimeError
        Packing conda failed
    RuntimeError
        Pack file not found
    """
    ns, bucket, prefix, _ = parse_conda_uri(conda_uri_prefix)

    # ======= check if conda pack and manifest exists ==============
    pack_folder_path = os.path.abspath(
        os.path.expanduser(os.path.join(conda_pack_folder, conda_slug))
    )
    if not os.path.exists(pack_folder_path):
        raise FileNotFoundError(
            f"Could not find environment {conda_slug} in {conda_pack_folder}."
        )
    paths = glob.glob(os.path.join(pack_folder_path, "*_manifest.yaml"))
    if len(paths) != 1:
        raise FileNotFoundError(
            "Could not locate manifest file in the provided environment."
        )
    manifest_location = paths[0]

    # ===== check if pack contains IP packages =========
    print(f"Loading environment information from {manifest_location}.")
    with open(manifest_location) as mlf:
        env = yaml.safe_load(mlf.read())
    if "IP" in env["manifest"] and env["manifest"]["IP"].lower() == "y":
        raise RuntimeError("This environment has IP restricted packages.")

    oci_auth = create_signer(auth_type, oci_config, oci_profile)
    client = OCIClientFactory(**oci_auth).object_storage

    publish_slug = conda_slug
    if not overwrite:
        existing_packs = client.list_objects(ns, bucket, prefix=prefix).data.objects
        pack_names = [os.path.basename(pack.name) for pack in existing_packs]
        # Keep asking while the chosen name clashes, unless the user picks overwrite.
        while publish_slug in pack_names:
            ans = click.prompt(
                f"{conda_slug} exists in {conda_uri_prefix}. Enter a new name or overwrite (o)",
                default="o",
            )
            if ans == "o":
                break
            publish_slug = "_".join(ans.lower().split(" "))

    # Validate the slug before the archive step so an invalid name fails fast
    # instead of after the pack has been built.
    NOT_ALLOWED_CHARS = "@#$%^&*/"
    if any(char in conda_slug for char in NOT_ALLOWED_CHARS):
        raise ValueError(
            f"Invalid conda_slug. Found {NOT_ALLOWED_CHARS} in slug name. Please use a different slug name."
        )

    pack_script = os.path.join(os.path.dirname(os.path.abspath(__file__)), "pack.py")
    if not skip_archive:
        if is_in_notebook_session() or NO_CONTAINER:
            # Set the CONDA_PUBLISH_TYPE environment variable so that the `type` attribute inside the manifest is not changed
            publish_type = os.environ.get("CONDA_PUBLISH_TYPE")
            command = f"python {pack_script} --conda-path {pack_folder_path}"
            if publish_type:
                command = f"CONDA_PUBLISH_TYPE={publish_type} {command}"
            proc = run_command(command, shell=True)
            if proc.returncode != 0:
                raise RuntimeError(
                    f"Failed to archive the conda environment. (exit code {proc.returncode})"
                )
        else:
            volumes = {
                pack_folder_path: {
                    "bind": os.path.join(DEFAULT_IMAGE_HOME_DIR, conda_slug)
                },
                pack_script: {"bind": os.path.join(DEFAULT_IMAGE_HOME_DIR, "pack.py")},
            }
            command = f"python {os.path.join(DEFAULT_IMAGE_HOME_DIR, 'pack.py')} --conda-path {os.path.join(DEFAULT_IMAGE_HOME_DIR, conda_slug)}"
            # Default to CPU when the manifest has no arch_type, matching how
            # the upload path is derived further down.
            gpu = env["manifest"].get("arch_type", "CPU") == "GPU"
            _check_job_image_exists(gpu)
            image = ML_JOB_GPU_IMAGE if gpu else ML_JOB_IMAGE
            try:
                run_container(
                    image=image, bind_volumes=volumes, env_vars={}, command=command
                )
            except Exception as e:
                raise RuntimeError(f"Could not pack environment {conda_slug}.") from e

    pack_file = os.path.join(pack_folder_path, f"{conda_slug}.tar.gz")
    if not os.path.exists(pack_file):
        raise RuntimeError(f"Pack {pack_file} was not created.")

    # Archive size in MiB, rounded to two decimals.
    pack_size = round(os.path.getsize(pack_file) / 2**20, 2)

    # Read the manifest again from disk before updating it.
    # NOTE(review): presumably because the packing step may have modified it - confirm.
    with open(manifest_location) as mlf:
        env = yaml.safe_load(mlf.read())
    manifest = env["manifest"]
    manifest["slug"] = conda_slug
    manifest["create_date"] = datetime.utcnow().strftime(
        "%a, %b %d, %Y, %H:%M:%S %Z UTC"
    )
    manifest["size_mb"] = pack_size

    arch_folder = manifest.get("arch_type", "CPU").lower()
    pack_uri = os.path.join(
        conda_uri_prefix,
        arch_folder,
        manifest["name"],
        str(manifest["version"]),
        publish_slug,
    )
    if os.environ.get("CONDA_PUBLISH_TYPE") != "service":
        # Set these values only for published conda pack
        manifest["pack_path"] = os.path.join(
            prefix,
            arch_folder,
            manifest["name"],
            str(manifest["version"]),
            publish_slug,
        )
        manifest["pack_uri"] = pack_uri
    else:
        manifest["type"] = "published"

    with open(manifest_location, "w") as f:
        yaml.safe_dump(env, f)

    if pack_size > 100:
        # Archives above 100 MiB go through the multipart uploader.
        MultiPartUploader(
            pack_file, pack_uri, 10, oci_config, oci_profile, auth_type
        ).upload(manifest)
    else:
        # NOTE(review): relies on `pack_path` being present in the manifest; in
        # the "service" branch above it is not set here - confirm it always
        # comes from the manifest file in that case.
        with open(pack_file, "rb") as pkf:
            client.put_object(
                ns,
                bucket,
                manifest["pack_path"],
                pkf,
                opc_meta={"manifest": json.dumps(manifest)},
            )
    print(f"Conda pack {pack_uri} published.")
    os.remove(pack_file)