#!/usr/bin/env python# -*- coding: utf-8; -*-importlogging# Copyright (c) 2022, 2024 Oracle and/or its affiliates.# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/importosimportshutilfromabcimportABC,abstractmethodfromtypingimportDict,Optionalfromads.commonimportutilsfromads.common.object_storage_detailsimportObjectStorageDetailsfromads.model.commonimportutilsasmodel_utilsfromads.model.service.oci_datascience_modelimportOCIDataScienceModel
[docs]classArtifactUploader(ABC):"""The abstract class to upload model artifacts."""PROGRESS_STEPS_COUNT=3def__init__(self,dsc_model:OCIDataScienceModel,artifact_path:str):"""Initializes `ArtifactUploader` instance. Parameters ---------- dsc_model: OCIDataScienceModel The data scince model instance. artifact_path: str The model artifact location. """ifnot(ObjectStorageDetails.is_oci_path(artifact_path)oros.path.exists(artifact_path)):raiseValueError(f"The `{artifact_path}` does not exist")self.dsc_model=dsc_modelself.artifact_path=artifact_pathself.artifact_file_path=Noneself.progress=None
[docs]defupload(self):"""Uploads model artifacts."""try:withutils.get_progress_bar(ArtifactUploader.PROGRESS_STEPS_COUNT+self.PROGRESS_STEPS_COUNT)asprogress:self.progress=progressself.progress.update("Preparing model artifacts file.")self._prepare_artifact_tmp_file()self.progress.update("Uploading model artifacts.")self._upload()self.progress.update("Uploading model artifacts has been successfully completed.")self.progress.update("Done.")exceptException:raisefinally:self._remove_artifact_tmp_file()
def_prepare_artifact_tmp_file(self)->str:"""Prepares model artifacts file. Returns ------- str Path to the model artifact file. """ifObjectStorageDetails.is_oci_path(self.artifact_path):self.artifact_file_path=self.artifact_pathelifos.path.isfile(self.artifact_path)andself.artifact_path.lower().endswith((".zip",".json")):self.artifact_file_path=self.artifact_pathelse:self.artifact_file_path=model_utils.zip_artifact(artifact_dir=self.artifact_path)returnself.artifact_file_pathdef_remove_artifact_tmp_file(self):"""Removes temporary created artifact file."""if(self.artifact_file_pathandself.artifact_file_path.lower()!=self.artifact_path.lower()):shutil.rmtree(self.artifact_file_path,ignore_errors=True)@abstractmethoddef_upload(self):"""Uploads model artifacts. Needs to be implemented in a child class."""
[docs]classSmallArtifactUploader(ArtifactUploader):"""The class helper to upload small model artifacts."""PROGRESS_STEPS_COUNT=1def_upload(self):"""Uploads model artifacts to the model catalog."""_,ext=os.path.splitext(self.artifact_file_path)self.progress.update("Uploading model artifacts to the catalog")withopen(self.artifact_file_path,"rb")asfile_data:self.dsc_model.create_model_artifact(bytes_content=file_data,extension=ext)
[docs]classLargeArtifactUploader(ArtifactUploader):""" The class helper to upload large model artifacts. Attributes ---------- artifact_path: str The model artifact location. Possible values are: - object storage path to zip archive. Example: `oci://<bucket_name>@<namespace>/prefix/mymodel.zip`. - local path to zip archive. Example: `./mymodel.zip`. - local path to folder with artifacts. Example: `./mymodel`. artifact_file_path: str The uri of the zip of model artifact. auth: dict The default authetication is set using `ads.set_auth` API. If you need to override the default, use the `ads.common.auth.api_keys` or `ads.common.auth.resource_principal` to create appropriate authentication signer and kwargs required to instantiate IdentityClient object. bucket_uri: str The OCI Object Storage URI where model artifacts will be copied to. The `bucket_uri` is only necessary for uploading large artifacts which size is greater than 2GB. Example: `oci://<bucket_name>@<namespace>/prefix/`. .. versionadded:: 2.8.10 If artifact_path is object storage path to a zip archive, bucket_uri will be ignored. dsc_model: OCIDataScienceModel The data scince model instance. overwrite_existing_artifact: bool Overwrite target bucket artifact if exists. progress: TqdmProgressBar An instance of the TqdmProgressBar. region: str The destination Object Storage bucket region. By default the value will be extracted from the `OCI_REGION_METADATA` environment variables. remove_existing_artifact: bool Wether artifacts uploaded to object storage bucket need to be removed or not. upload_manager: UploadManager The uploadManager simplifies interaction with the Object Storage service. """PROGRESS_STEPS_COUNT=4def__init__(self,dsc_model:OCIDataScienceModel,artifact_path:str,bucket_uri:str=None,auth:Optional[Dict]=None,region:Optional[str]=None,overwrite_existing_artifact:Optional[bool]=True,remove_existing_artifact:Optional[bool]=True,parallel_process_count:int=utils.DEFAULT_PARALLEL_PROCESS_COUNT,):"""Initializes `LargeArtifactUploader` instance. Parameters ---------- dsc_model: OCIDataScienceModel The data scince model instance. artifact_path: str The model artifact location. Possible values are: - object storage path to zip archive. Example: `oci://<bucket_name>@<namespace>/prefix/mymodel.zip`. - local path to zip archive. Example: `./mymodel.zip`. - local path to folder with artifacts. Example: `./mymodel`. bucket_uri: (str, optional). Defaults to `None`. The OCI Object Storage URI where model artifacts will be copied to. The `bucket_uri` is only necessary for uploading large artifacts from local which size is greater than 2GB. Example: `oci://<bucket_name>@<namespace>/prefix/`. .. versionadded:: 2.8.10 If `artifact_path` is object storage path to a zip archive, `bucket_uri` will be ignored. auth: (Dict, optional). Defaults to `None`. The default authetication is set using `ads.set_auth` API. If you need to override the default, use the `ads.common.auth.api_keys` or `ads.common.auth.resource_principal` to create appropriate authentication signer and kwargs required to instantiate IdentityClient object. region: (str, optional). Defaults to `None`. The destination Object Storage bucket region. By default the value will be extracted from the `OCI_REGION_METADATA` environment variables. overwrite_existing_artifact: (bool, optional). Defaults to `True`. Overwrite target bucket artifact if exists. remove_existing_artifact: (bool, optional). Defaults to `True`. Whether artifacts uploaded to object storage bucket need to be removed or not. parallel_process_count: (int, optional). The number of worker processes to use in parallel for uploading individual parts of a multipart upload. """self.auth=authordsc_model.authifObjectStorageDetails.is_oci_path(artifact_path):ifnotartifact_path.endswith(".zip"):raiseValueError(f"The `artifact_path={artifact_path}` is invalid.""The remote path for model artifact should be a zip archive, ""e.g. `oci://<bucket_name>@<namespace>/prefix/mymodel.zip`.")ifnotutils.is_path_exists(uri=artifact_path,auth=self.auth):raiseValueError(f"The `{artifact_path}` does not exist.")bucket_uri=artifact_pathifnotbucket_uri:raiseValueError("The `bucket_uri` must be provided.")super().__init__(dsc_model=dsc_model,artifact_path=artifact_path)self.region=regionorutils.extract_region(self.auth)self.bucket_uri=bucket_uriself.overwrite_existing_artifact=overwrite_existing_artifactself.remove_existing_artifact=remove_existing_artifactself._parallel_process_count=parallel_process_countdef_upload(self):"""Uploads model artifacts to the model catalog."""bucket_uri=self.bucket_uriself.progress.update("Copying model artifact to the Object Storage bucket")ifnotbucket_uri==self.artifact_file_path:bucket_uri_file_name=os.path.basename(bucket_uri)ifnotbucket_uri_file_name:bucket_uri=os.path.join(bucket_uri,f"{self.dsc_model.id}.zip")elifnotbucket_uri.lower().endswith(".zip"):bucket_uri=f"{bucket_uri}.zip"ifnotself.overwrite_existing_artifactandutils.is_path_exists(uri=bucket_uri,auth=self.auth):raiseFileExistsError(f"The bucket_uri=`{self.bucket_uri}` exists. Please use a new file name or ""set `overwrite_existing_artifact` to `True` if you wish to overwrite.")try:utils.upload_to_os(src_uri=self.artifact_file_path,dst_uri=bucket_uri,auth=self.auth,parallel_process_count=self._parallel_process_count,force_overwrite=self.overwrite_existing_artifact,progressbar_description="Copying model artifact to the Object Storage bucket.",)exceptExceptionasex:raiseRuntimeError(f"Failed to upload model artifact to the given Object Storage path `{self.bucket_uri}`."f"See Exception: {ex}")self.progress.update("Exporting model artifact to the model catalog")self.dsc_model.export_model_artifact(bucket_uri=bucket_uri,region=self.region)ifself.remove_existing_artifact:self.progress.update("Removing temporary artifacts from the Object Storage bucket")utils.remove_file(bucket_uri)else:self.progress.update()