Source code for ads.model.deployment.model_deployment

#!/usr/bin/env python

# Copyright (c) 2021, 2026 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/


import collections
import copy
import datetime
import time
import warnings
from typing import Any, Dict, List, Union

import oci
import oci.loggingsearch
import pandas as pd
from oci.data_science.models import (
    CreateModelDeploymentDetails,
    LogDetails,
    UpdateModelDeploymentDetails,
)

from ads.common import auth as authutil
from ads.common import utils as ads_utils
from ads.common.oci_logging import (
    LOG_INTERVAL,
    LOG_RECORDS_LIMIT,
    ConsolidatedLog,
    OCILog,
)
from ads.config import COMPARTMENT_OCID, PROJECT_OCID
from ads.jobs.builders.base import Builder
from ads.jobs.builders.infrastructure.utils import get_value
from ads.model.common.utils import _is_json_serializable
from ads.model.deployment.common.utils import send_request
from ads.model.deployment.model_deployment_infrastructure import (
    DEFAULT_BANDWIDTH_MBPS,
    DEFAULT_MEMORY_IN_GBS,
    DEFAULT_OCPUS,
    DEFAULT_REPLICA,
    DEFAULT_SHAPE_NAME,
    MODEL_DEPLOYMENT_INFRASTRUCTURE_TYPE,
    ModelDeploymentInfrastructure,
)
from ads.model.deployment.model_deployment_runtime import (
    ModelDeploymentCondaRuntime,
    ModelDeploymentContainerRuntime,
    ModelDeploymentMode,
    ModelDeploymentRuntime,
    ModelDeploymentRuntimeType,
    OCIModelDeploymentRuntimeType,
)
from ads.model.serde.model_input import JsonModelInputSERDE
from ads.model.service.oci_datascience_model_deployment import (
    OCIDataScienceModelDeployment,
)

from .common import utils
from .common.utils import State
from .model_deployment_properties import ModelDeploymentProperties

DEFAULT_WAIT_TIME = 1200
DEFAULT_POLL_INTERVAL = 10
DEFAULT_WORKFLOW_STEPS = 6
DELETE_WORKFLOW_STEPS = 2
DEACTIVATE_WORKFLOW_STEPS = 2
DEFAULT_RETRYING_REQUEST_ATTEMPTS = 3

MODEL_DEPLOYMENT_KIND = "deployment"
MODEL_DEPLOYMENT_TYPE = "modelDeployment"
MODEL_DEPLOYMENT_INFERENCE_SERVER_TRITON = "TRITON"

MODEL_DEPLOYMENT_RUNTIMES = {
    ModelDeploymentRuntimeType.CONDA: ModelDeploymentCondaRuntime,
    ModelDeploymentRuntimeType.CONTAINER: ModelDeploymentContainerRuntime,
}


[docs] class ModelDeploymentLogType: PREDICT = "predict" ACCESS = "access"
[docs] class ModelDeploymentType: SINGLE_MODEL = "SINGLE_MODEL" SINGLE_MODEL_FLEX = "SINGLE_MODEL_FLEX" MODEL_GROUP = "MODEL_GROUP"
[docs] class ModelDeploymentUpdateType: ZDT = "ZDT" LIVE = "LIVE"
[docs] class LogNotConfiguredError(Exception): # pragma: no cover pass
[docs] class ModelDeploymentPredictError(Exception): # pragma: no cover pass
[docs] class ModelDeployment(Builder): """ A class used to represent a Model Deployment. Attributes ---------- config: (dict) Deployment configuration parameters properties: (ModelDeploymentProperties) ModelDeploymentProperties object workflow_state_progress: (str) Workflow request id workflow_steps: (int) The number of steps in the workflow dsc_model_deployment: (OCIDataScienceModelDeployment) The OCIDataScienceModelDeployment instance. state: (State) Returns the deployment state of the current Model Deployment object created_by: (str) The user that creates the model deployment lifecycle_state: (str) Model deployment lifecycle state lifecycle_details: (str) Model deployment lifecycle details time_created: (datetime) The time when the model deployment is created display_name: (str) Model deployment display name description: (str) Model deployment description freeform_tags: (dict) Model deployment freeform tags defined_tags: (dict) Model deployment defined tags runtime: (ModelDeploymentRuntime) Model deployment runtime infrastructure: (ModelDeploymentInfrastructure) Model deployment infrastructure Methods ------- deploy(wait_for_completion, **kwargs) Deploy the current Model Deployment object delete(wait_for_completion, **kwargs) Deletes the current Model Deployment object update(wait_for_completion, **kwargs) Updates a model deployment activate(wait_for_completion, max_wait_time, poll_interval) Activates a model deployment deactivate(wait_for_completion, max_wait_time, poll_interval) Deactivates a model deployment list(status, compartment_id, project_id, **kwargs) List model deployment within given compartment and project. with_display_name(display_name) Sets model deployment display name with_description(description) Sets model deployment description with_freeform_tags(freeform_tags) Sets model deployment freeform tags with_defined_tags(defined_tags) Sets model deployment defined tags with_runtime(self, runtime) Sets model deployment runtime with_infrastructure(self, infrastructure) Sets model deployment infrastructure from_dict(obj_dict) Deserializes model deployment instance from dict from_id(id) Loads model deployment instance from ocid sync() Updates the model deployment instance from backend Examples -------- >>> # Build model deployment from builder apis: >>> ds_model_deployment = (ModelDeployment() ... .with_display_name("TestModelDeployment") ... .with_description("Testing the test model deployment") ... .with_freeform_tags(tag1="val1", tag2="val2") ... .with_infrastructure( ... (ModelDeploymentInfrastructure() ... .with_project_id(<project_id>) ... .with_compartment_id(<compartment_id>) ... .with_shape_name("VM.Standard.E4.Flex") ... .with_shape_config_details( ... ocpus=1, ... memory_in_gbs=16 ... ) ... .with_replica(1) ... .with_bandwidth_mbps(10) ... .with_web_concurrency(10) ... .with_access_log( ... log_group_id=<log_group_id>, ... log_id=<log_id> ... ) ... .with_predict_log( ... log_group_id=<log_group_id>, ... log_id=<log_id> ... )) ... ) ... .with_runtime( ... (ModelDeploymentContainerRuntime() ... .with_image(<image>) ... .with_image_digest(<image_digest>) ... .with_entrypoint(<entrypoint>) ... .with_server_port(<server_port>) ... .with_health_check_port(<health_check_port>) ... .with_env({"key":"value"}) ... .with_deployment_mode("HTTPS_ONLY") ... .with_model_uri(<model_uri>) ... .with_bucket_uri(<bucket_uri>) ... .with_auth(<auth>) ... .with_timeout(<time_out>)) ... ) ... ) >>> ds_model_deployment.deploy() >>> ds_model_deployment.status >>> ds_model_deployment.with_display_name("new name").update() >>> ds_model_deployment.deactivate() >>> ds_model_deployment.sync() >>> ds_model_deployment.list(status="ACTIVE") >>> ds_model_deployment.delete() >>> # Build model deployment from yaml >>> ds_model_deployment = ModelDeployment.from_yaml(uri=<path_to_yaml>) """ _PREFIX = "datascience_model_deployment" CONST_ID = "id" CONST_CREATED_BY = "createdBy" CONST_DISPLAY_NAME = "displayName" CONST_DESCRIPTION = "description" CONST_FREEFORM_TAG = "freeformTags" CONST_DEFINED_TAG = "definedTags" CONST_MODEL_DEPLOYMENT_URL = "modelDeploymentUrl" CONST_INFRASTRUCTURE = "infrastructure" CONST_RUNTIME = "runtime" CONST_LIFECYCLE_STATE = "lifecycleState" CONST_LIFECYCLE_DETAILS = "lifecycleDetails" CONST_TIME_CREATED = "timeCreated" CONST_UPDATE_TYPE = "updateType" attribute_map = { CONST_ID: "id", CONST_CREATED_BY: "created_by", CONST_DISPLAY_NAME: "display_name", CONST_DESCRIPTION: "description", CONST_FREEFORM_TAG: "freeform_tags", CONST_DEFINED_TAG: "defined_tags", CONST_MODEL_DEPLOYMENT_URL: "model_deployment_url", CONST_INFRASTRUCTURE: "infrastructure", CONST_RUNTIME: "runtime", CONST_LIFECYCLE_STATE: "lifecycle_state", CONST_LIFECYCLE_DETAILS: "lifecycle_details", CONST_TIME_CREATED: "time_created", } initialize_spec_attributes = [ "display_name", "description", "freeform_tags", "defined_tags", "infrastructure", "runtime", ] model_input_serializer = JsonModelInputSERDE() def __init__( self, properties: Union[ModelDeploymentProperties, Dict] = None, config: Dict = None, model_deployment_id: str = None, model_deployment_url: str = "", spec: Dict = None, **kwargs, ): """Initializes a ModelDeployment object. Parameters ---------- properties: (Union[ModelDeploymentProperties, Dict], optional). Defaults to None. Object containing deployment properties. The properties can be `None` when `kwargs` are used for specifying properties. config: (Dict, optional). Defaults to None. ADS auth dictionary for OCI authentication. This can be generated by calling `ads.common.auth.api_keys()` or `ads.common.auth.resource_principal()`. If this is `None` then the `ads.common.default_signer(client_kwargs)` will be used. model_deployment_id: (str, optional). Defaults to None. Model deployment OCID. model_deployment_url: (str, optional). Defaults to empty string. Model deployment url. spec: (dict, optional). Defaults to None. Model deployment spec. kwargs: Keyword arguments for initializing `ModelDeploymentProperties` or `ModelDeployment`. """ if spec and properties: raise ValueError( "You can only pass in either `spec` or `properties` to initialize model deployment instance." ) if config: warnings.warn( "Parameter `config` was deprecated in 2.8.2 from ModelDeployment constructor and will be removed in 3.0.0. Please use `ads.set_auth()` to config the auth information. " "Check: https://accelerated-data-science.readthedocs.io/en/latest/user_guide/cli/authentication.html" ) if properties: warnings.warn( "Parameter `properties` was deprecated in 2.8.2 from ModelDeployment constructor and will be removed in 3.0.0. Please use `spec` or the builder pattern to initialize model deployment instance. " "Check: https://accelerated-data-science.readthedocs.io/en/latest/user_guide/model_registration/quick_start.html" ) if model_deployment_url or model_deployment_id: warnings.warn( "Parameter `model_deployment_url` and `model_deployment_id` were deprecated in 2.8.2 from ModelDeployment constructor and will be removed in 3.0.0. These two fields will be auto-populated from the service side. " "Check: https://accelerated-data-science.readthedocs.io/en/latest/user_guide/model_registration/quick_start.html" ) initialize_spec = {} initialize_spec_kwargs = {} if spec: initialize_spec = spec initialize_spec_kwargs = self._extract_spec_kwargs(**kwargs) elif not properties and not spec: if self.CONST_INFRASTRUCTURE in kwargs or self.CONST_RUNTIME in kwargs: initialize_spec_kwargs = self._extract_spec_kwargs(**kwargs) super().__init__(spec=initialize_spec, **initialize_spec_kwargs) self.properties = ( properties if isinstance(properties, ModelDeploymentProperties) else ModelDeploymentProperties(oci_model_deployment=properties, **kwargs) ) self.current_state = ( State._from_str(self.properties.lifecycle_state) if self.properties.lifecycle_state else State.UNKNOWN ) self._access_log = None self._predict_log = None self.dsc_model_deployment = OCIDataScienceModelDeployment() @property def kind(self) -> str: """The kind of the object as showing in YAML. Returns ------- str deployment """ return MODEL_DEPLOYMENT_KIND @property def type(self) -> str: """The type of the object as showing in YAML. Returns ------- str deployment """ return MODEL_DEPLOYMENT_TYPE @property def model_deployment_id(self) -> str: """The model deployment ocid. Returns ------- str The model deployment ocid. """ return self.get_spec(self.CONST_ID, None) @property def id(self) -> str: """The model deployment ocid. Returns ------- str The model deployment ocid. """ return self.get_spec(self.CONST_ID, None) @property def created_by(self) -> str: """The user that creates the model deployment. Returns ------- str The user that creates the model deployment. """ return self.get_spec(self.CONST_CREATED_BY, None) @property def url(self) -> str: """Model deployment url. Returns ------- str Model deployment url. """ return self.get_spec(self.CONST_MODEL_DEPLOYMENT_URL, None) @property def lifecycle_state(self) -> str: """Model deployment lifecycle state. Returns ------- str Model deployment lifecycle state. """ return self.get_spec(self.CONST_LIFECYCLE_STATE, None) @property def lifecycle_details(self) -> str: """Model deployment lifecycle details. Returns ------- str Model deployment lifecycle details. """ return self.get_spec(self.CONST_LIFECYCLE_DETAILS, None) @property def time_created(self) -> datetime: """The time when the model deployment is created. Returns ------- datetime The time when the model deployment is created. """ return self.get_spec(self.CONST_TIME_CREATED, None) @property def display_name(self) -> str: """Model deployment display name. Returns ------- str Model deployment display name. """ return self.get_spec(self.CONST_DISPLAY_NAME, None)
[docs] def with_display_name(self, display_name: str) -> "ModelDeployment": """Sets the name of model deployment. Parameters ---------- display_name: str The name of model deployment. Returns ------- ModelDeployment The ModelDeployment instance (self). """ return self.set_spec(self.CONST_DISPLAY_NAME, display_name)
@property def description(self) -> str: """Model deployment description. Returns ------- str Model deployment description. """ return self.get_spec(self.CONST_DESCRIPTION, None)
[docs] def with_description(self, description: str) -> "ModelDeployment": """Sets the description of model deployment. Parameters ---------- description: str The description of model deployment. Returns ------- ModelDeployment The ModelDeployment instance (self). """ return self.set_spec(self.CONST_DESCRIPTION, description)
@property def freeform_tags(self) -> Dict: """Model deployment freeform tags. Returns ------- Dict Model deployment freeform tags. """ return self.get_spec(self.CONST_FREEFORM_TAG, {})
[docs] def with_freeform_tags(self, **kwargs) -> "ModelDeployment": """Sets the freeform tags of model deployment. Parameters ---------- kwargs The freeform tags of model deployment. Returns ------- ModelDeployment The ModelDeployment instance (self). """ return self.set_spec(self.CONST_FREEFORM_TAG, kwargs)
@property def defined_tags(self) -> Dict: """Model deployment defined tags. Returns ------- Dict Model deployment defined tags. """ return self.get_spec(self.CONST_DEFINED_TAG, {})
[docs] def with_defined_tags(self, **kwargs) -> "ModelDeployment": """Sets the defined tags of model deployment. Parameters ---------- kwargs The defined tags of model deployment. Returns ------- ModelDeployment The ModelDeployment instance (self). """ return self.set_spec(self.CONST_DEFINED_TAG, kwargs)
@property def runtime(self) -> "ModelDeploymentRuntime": """Model deployment runtime. Returns ------- ModelDeploymentRuntime Model deployment runtime. """ return self.get_spec(self.CONST_RUNTIME, None)
[docs] def with_runtime(self, runtime: ModelDeploymentRuntime) -> "ModelDeployment": """Sets the runtime of model deployment. Parameters ---------- runtime: ModelDeploymentRuntime The runtime of model deployment. Returns ------- ModelDeployment The ModelDeployment instance (self). """ return self.set_spec(self.CONST_RUNTIME, runtime)
@property def infrastructure(self) -> "ModelDeploymentInfrastructure": """Model deployment infrastructure. Returns ------- ModelDeploymentInfrastructure Model deployment infrastructure. """ return self.get_spec(self.CONST_INFRASTRUCTURE, None)
[docs] def with_infrastructure( self, infrastructure: ModelDeploymentInfrastructure ) -> "ModelDeployment": """Sets the infrastructure of model deployment. Parameters ---------- infrastructure: ModelDeploymentInfrastructure The infrastructure of model deployment. Returns ------- ModelDeployment The ModelDeployment instance (self). """ return self.set_spec(self.CONST_INFRASTRUCTURE, infrastructure)
[docs] def deploy( self, wait_for_completion: bool = True, max_wait_time: int = DEFAULT_WAIT_TIME, poll_interval: int = DEFAULT_POLL_INTERVAL, ): """Deploys the current ModelDeployment object Parameters ---------- wait_for_completion: bool Flag set for whether to wait for deployment to be deployed before proceeding. Defaults to True. max_wait_time: int Maximum amount of time to wait in seconds (Defaults to 1200). Negative implies infinite wait time. poll_interval: int Poll interval in seconds (Defaults to 10). Returns ------- ModelDeployment The instance of ModelDeployment. """ create_model_deployment_details = ( self._build_model_deployment_details() if self._spec else self.properties.build() ) response = self.dsc_model_deployment.create( create_model_deployment_details=create_model_deployment_details, wait_for_completion=wait_for_completion, max_wait_time=max_wait_time, poll_interval=poll_interval, ) return self._update_from_oci_model(response)
[docs] def delete( self, wait_for_completion: bool = True, max_wait_time: int = DEFAULT_WAIT_TIME, poll_interval: int = DEFAULT_POLL_INTERVAL, ): """Deletes the ModelDeployment Parameters ---------- wait_for_completion: bool Flag set for whether to wait for deployment to be deleted before proceeding. Defaults to True. max_wait_time: int Maximum amount of time to wait in seconds (Defaults to 1200). Negative implies infinite wait time. poll_interval: int Poll interval in seconds (Defaults to 10). Returns ------- ModelDeployment The instance of ModelDeployment. """ response = self.dsc_model_deployment.delete( wait_for_completion=wait_for_completion, max_wait_time=max_wait_time, poll_interval=poll_interval, ) return self._update_from_oci_model(response)
[docs] def update( self, properties: Union[ModelDeploymentProperties, dict, None] = None, wait_for_completion: bool = True, max_wait_time: int = DEFAULT_WAIT_TIME, poll_interval: int = DEFAULT_POLL_INTERVAL, update_type: str = ModelDeploymentUpdateType.ZDT, **kwargs, ): """Updates a model deployment You can update `model_deployment_configuration_details` and change `instance_shape` and `model_id` when the model deployment is in the ACTIVE lifecycle state. The `bandwidth_mbps` or `instance_count` can only be updated while the model deployment is in the `INACTIVE` state. Changes to the `bandwidth_mbps` or `instance_count` will take effect the next time the `ActivateModelDeployment` action is invoked on the model deployment resource. Parameters ---------- properties: ModelDeploymentProperties or dict The properties for updating the deployment. wait_for_completion: bool Flag set for whether to wait for deployment to be updated before proceeding. Defaults to True. max_wait_time: int Maximum amount of time to wait in seconds (Defaults to 1200). Negative implies infinite wait time. poll_interval: int Poll interval in seconds (Defaults to 10). update_type: str Update type for the deployment. Allowed values: ['ZDT', 'LIVE']. 'LIVE' update can only be used if model group id has been changed. kwargs: dict Returns ------- ModelDeployment The instance of ModelDeployment. """ if properties: warnings.warn( "Parameter `properties` is deprecated from ModelDeployment `update()` in 2.8.6 and will be removed in 3.0.0. Please use the builder pattern or kwargs to update model deployment instance. " "Check: https://accelerated-data-science.readthedocs.io/en/latest/user_guide/model_registration/quick_start.html" ) updated_properties = properties if not isinstance(properties, ModelDeploymentProperties): updated_properties = ModelDeploymentProperties( oci_model_deployment=properties, **kwargs ) update_model_deployment_details = ( updated_properties.to_update_deployment() if properties or updated_properties.oci_model_deployment or kwargs else self._update_model_deployment_details( update_type=update_type, **kwargs ) ) response = self.dsc_model_deployment.update( update_model_deployment_details=update_model_deployment_details, wait_for_completion=wait_for_completion, max_wait_time=max_wait_time, poll_interval=poll_interval, ) return self._update_from_oci_model(response)
[docs] def watch( self, log_type: str = None, time_start: datetime = None, interval: int = LOG_INTERVAL, log_filter: str = None, ) -> "ModelDeployment": """Streams the access and/or predict logs of model deployment. Parameters ---------- log_type: str, optional The log type. Can be `access`, `predict` or None. Defaults to None. time_start : datetime.datetime, optional Starting time for the log query. Defaults to None. interval : int, optional The time interval between sending each request to pull logs from OCI logging service. Defaults to 3. log_filter : str, optional Expression for filtering the logs. This will be the WHERE clause of the query. Defaults to None. status_list : List[str], optional List of status of model deployment. This is used to store list of status from logs. Returns ------- ModelDeployment The instance of ModelDeployment. """ status = "" while not self._stop_condition(): status = self._check_and_print_status(status) time.sleep(interval) time_start = time_start or self.time_created try: count = self.logs(log_type).stream( source=self.model_deployment_id, interval=interval, stop_condition=self._stream_stop_condition, time_start=time_start, log_filter=log_filter, ) if not count: print( "No logs in the last 14 days. Please reset time_start to see older logs." ) return self.sync() except KeyboardInterrupt: print("Stop watching logs.") pass
def _stop_condition(self): """Stops the sync once the model deployment is in a terminal state.""" return self.state in [State.ACTIVE, State.FAILED, State.DELETED, State.INACTIVE] def _stream_stop_condition(self): """Stops the stream sync once the model deployment is in a terminal state.""" return self.state in [State.FAILED, State.DELETED, State.INACTIVE] def _check_and_print_status(self, prev_status) -> str: """Check and print the next status. Parameters ---------- prev_status: str The previous model deployment status. Returns ------- str: The next model deployment status. """ status = self._model_deployment_status_text() if status != prev_status: timestamp = datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") print(f"{timestamp} - {status}") return status def _model_deployment_status_text(self) -> str: """Formats the status message. Returns ------- str: The model deployment life status and life cycle details. """ details = f", {self.lifecycle_details}" if self.lifecycle_details else "" return f"Model Deployment {self.lifecycle_state}" + details @property def state(self) -> State: """Returns the deployment state of the current Model Deployment object""" request_attempts = 0 while request_attempts < DEFAULT_RETRYING_REQUEST_ATTEMPTS: request_attempts += 1 try: self.current_state = State._from_str(self.sync().lifecycle_state) break except: pass time.sleep(1) return self.current_state @property def status(self) -> State: """Returns the deployment state of the current Model Deployment object""" return self.state
[docs] def predict( self, json_input=None, data: Any = None, serializer: "ads.model.ModelInputSerializer" = model_input_serializer, auto_serialize_data: bool = False, model_name: str = None, model_version: str = None, **kwargs, ) -> dict: """Returns prediction of input data run against the model deployment endpoint. Examples -------- >>> import numpy as np >>> from ads.model import ModelInputSerializer >>> class MySerializer(ModelInputSerializer): ... def serialize(self, data): ... serialized_data = 1 ... return serialized_data >>> model_deployment = ModelDeployment.from_id(<model_deployment_id>) >>> prediction = model_deployment.predict( ... data=np.array([1, 2, 3]), ... serializer=MySerializer(), ... auto_serialize_data=True, ... )['prediction'] Parameters ---------- json_input: Json serializable JSON payload for the prediction. data: Any Data for the prediction. serializer: ads.model.ModelInputSerializer Defaults to ads.model.JsonModelInputSerializer. auto_serialize_data: bool Defaults to False. Indicate whether to auto serialize input data using `serializer`. If `auto_serialize_data=False`, `data` required to be bytes or json serializable and `json_input` required to be json serializable. If `auto_serialize_data` set to True, data will be serialized before sending to model deployment endpoint. model_name: str Defaults to None. When the `inference_server="triton"`, the name of the model to invoke. model_version: str Defaults to None. When the `inference_server="triton"`, the version of the model to invoke. kwargs: content_type: str Used to indicate the media type of the resource. By default, it will be `application/octet-stream` for bytes input and `application/json` otherwise. The content-type header will be set to this value when calling the model deployment endpoint. Returns ------- dict: Prediction results. """ current_state = self.sync().lifecycle_state if current_state != State.ACTIVE.name: raise ModelDeploymentPredictError( "This model deployment is not in active state, you will not be able to use predict end point. " f"Current model deployment state: {current_state} " ) endpoint = f"{self.url}/predict" signer = authutil.default_signer()["signer"] header = { "signer": signer, "content_type": kwargs.get("content_type", None), } header.update(kwargs.pop("headers", {})) if data is None and json_input is None: raise AttributeError( "Neither `data` nor `json_input` are provided. You need to provide one of them." ) if data is not None and json_input is not None: raise AttributeError( "`data` and `json_input` are both provided. You can only use one of them." ) try: if auto_serialize_data: data = data or json_input serialized_data = serializer.serialize(data=data) return send_request( data=serialized_data, endpoint=endpoint, is_json_payload=_is_json_serializable(serialized_data), header=header, ) if json_input is not None: if not _is_json_serializable(json_input): raise ValueError( "`json_input` must be json serializable. " "Set `auto_serialize_data` to True, or serialize the provided input data first," "or using `data` to pass binary data." ) utils.get_logger().warning( "The `json_input` argument of `predict()` will be deprecated soon. " "Please use `data` argument. " ) data = json_input is_json_payload = _is_json_serializable(data) if not isinstance(data, bytes) and not is_json_payload: raise TypeError( "`data` is not bytes or json serializable. Set `auto_serialize_data` to `True` to serialize the input data." ) if model_name and model_version: header["model-name"] = model_name header["model-version"] = model_version elif bool(model_version) ^ bool(model_name): raise ValueError( "`model_name` and `model_version` have to be provided together." ) prediction = send_request( data=data, endpoint=endpoint, is_json_payload=is_json_payload, header=header, ) return prediction except oci.exceptions.ServiceError as ex: # When bandwidth exceeds the allocated value, TooManyRequests error (429) will be raised by oci backend. if ex.status == 429: bandwidth_mbps = ( self.infrastructure.bandwidth_mbps or DEFAULT_BANDWIDTH_MBPS ) utils.get_logger().warning( f"Load balancer bandwidth exceeds the allocated {bandwidth_mbps} Mbps." "To estimate the actual bandwidth, use formula: (payload size in KB) * (estimated requests per second) * 8 / 1024." "To resolve the issue, try sizing down the payload, slowing down the request rate or increasing the allocated bandwidth." ) raise
[docs] def activate( self, wait_for_completion: bool = True, max_wait_time: int = DEFAULT_WAIT_TIME, poll_interval: int = DEFAULT_POLL_INTERVAL, ) -> "ModelDeployment": """Activates a model deployment Parameters ---------- wait_for_completion: bool Flag set for whether to wait for deployment to be activated before proceeding. Defaults to True. max_wait_time: int Maximum amount of time to wait in seconds (Defaults to 1200). Negative implies infinite wait time. poll_interval: int Poll interval in seconds (Defaults to 10). Returns ------- ModelDeployment The instance of ModelDeployment. """ response = self.dsc_model_deployment.activate( wait_for_completion=wait_for_completion, max_wait_time=max_wait_time, poll_interval=poll_interval, ) return self._update_from_oci_model(response)
[docs] def deactivate( self, wait_for_completion: bool = True, max_wait_time: int = DEFAULT_WAIT_TIME, poll_interval: int = DEFAULT_POLL_INTERVAL, ) -> "ModelDeployment": """Deactivates a model deployment Parameters ---------- wait_for_completion: bool Flag set for whether to wait for deployment to be deactivated before proceeding. Defaults to True. max_wait_time: int Maximum amount of time to wait in seconds (Defaults to 1200). Negative implies infinite wait time. poll_interval: int Poll interval in seconds (Defaults to 10). Returns ------- ModelDeployment The instance of ModelDeployment. """ response = self.dsc_model_deployment.deactivate( wait_for_completion=wait_for_completion, max_wait_time=max_wait_time, poll_interval=poll_interval, ) return self._update_from_oci_model(response)
def _log_details(self, log_type: str = ModelDeploymentLogType.ACCESS): """Gets log details for the provided `log_type`. Properties ---------- log_type: (str, optional). Defaults to "access". The log type. Can be "access" or "predict". Returns ------- oci.datascience_model.CategoryLogDetails Category log details of the ModelDeployment. Raises ------ AttributeError Deployment doesn't have requested log configuration. """ if self.properties.category_log_details and getattr( self.properties.category_log_details, log_type ): return getattr(self.properties.category_log_details, log_type) elif self.infrastructure: category_log_details = self._build_category_log_details() log = category_log_details.get(log_type, None) if log and log.get("logId", None) and log.get("logGroupId", None): return LogDetails( log_id=log.get("logId"), log_group_id=log.get("logGroupId") ) raise LogNotConfiguredError( f"Deployment `{self.model_deployment_id}` " f"has no `{log_type}` log configuration." ) @property def predict_log(self) -> OCILog: """Gets the model deployment predict logs object. Returns ------- OCILog The OCILog object containing the predict logs. """ if not self._predict_log: log_details = self._log_details(log_type=ModelDeploymentLogType.PREDICT) compartment_id = ( self.infrastructure.compartment_id if self.infrastructure else self.properties.compartment_id ) self._predict_log = OCILog( compartment_id=compartment_id or COMPARTMENT_OCID, id=log_details.log_id, log_group_id=log_details.log_group_id, source=self.model_deployment_id, annotation=ModelDeploymentLogType.PREDICT, ) return self._predict_log @property def access_log(self) -> OCILog: """Gets the model deployment access logs object. Returns ------- OCILog The OCILog object containing the access logs. """ if not self._access_log: log_details = self._log_details(log_type=ModelDeploymentLogType.ACCESS) compartment_id = ( self.infrastructure.compartment_id if self.infrastructure else self.properties.compartment_id ) self._access_log = OCILog( compartment_id=compartment_id or COMPARTMENT_OCID, id=log_details.log_id, log_group_id=log_details.log_group_id, source=self.model_deployment_id, annotation=ModelDeploymentLogType.ACCESS, ) return self._access_log
[docs] def logs(self, log_type: str = None) -> ConsolidatedLog: """Gets the access or predict logs. Parameters ---------- log_type: (str, optional). Defaults to None. The log type. Can be "access", "predict" or None. Returns ------- ConsolidatedLog The ConsolidatedLog object containing the logs. """ loggers = [] if not log_type: try: loggers.append(self.access_log) except LogNotConfiguredError: pass try: loggers.append(self.predict_log) except LogNotConfiguredError: pass if not loggers: raise LogNotConfiguredError( "Neither `predict` nor `access` log was configured for the model deployment." ) elif log_type == ModelDeploymentLogType.ACCESS: loggers = [self.access_log] elif log_type == ModelDeploymentLogType.PREDICT: loggers = [self.predict_log] else: raise ValueError( "Parameter log_type should be either access, predict or None." ) return ConsolidatedLog(*loggers)
[docs] def show_logs( self, time_start: datetime.datetime = None, time_end: datetime.datetime = None, limit: int = LOG_RECORDS_LIMIT, log_type: str = None, ): """Shows deployment logs as a pandas dataframe. Parameters ---------- time_start: (datetime.datetime, optional). Defaults to None. Starting date and time in RFC3339 format for retrieving logs. Defaults to None. Logs will be retrieved 14 days from now. time_end: (datetime.datetime, optional). Defaults to None. Ending date and time in RFC3339 format for retrieving logs. Defaults to None. Logs will be retrieved until now. limit: (int, optional). Defaults to 100. The maximum number of items to return. log_type: (str, optional). Defaults to None. The log type. Can be "access", "predict" or None. Returns ------- A pandas DataFrame containing logs. """ logging = self.logs(log_type=log_type) def prepare_log_record(log): """Converts a log record to ordered dict""" log_content = log.get("logContent", {}) return collections.OrderedDict( [ ("type", log_content.get("type").split(".")[-1]), ("id", log_content.get("id")), ("message", log_content.get("data", {}).get("message")), ("time", log_content.get("time")), ] ) logs = logging.search( source=self.model_deployment_id, time_start=time_start, time_end=time_end, limit=limit, ) return pd.DataFrame([prepare_log_record(log.data) for log in logs])
[docs] def sync(self) -> "ModelDeployment": """Updates the model deployment instance from backend. Returns ------- ModelDeployment The ModelDeployment instance (self). """ return self._update_from_oci_model( OCIDataScienceModelDeployment.from_id(self.model_deployment_id) )
[docs] @classmethod def list( cls, status: str = None, compartment_id: str = None, project_id: str = None, **kwargs, ) -> List["ModelDeployment"]: """Lists the model deployments associated with current compartment id and status Parameters ---------- status : str Status of deployment. Defaults to None. Allowed values: `ACTIVE`, `CREATING`, `DELETED`, `DELETING`, `FAILED`, `INACTIVE` and `UPDATING`. compartment_id : str Target compartment to list deployments from. Defaults to the compartment set in the environment variable "NB_SESSION_COMPARTMENT_OCID". If "NB_SESSION_COMPARTMENT_OCID" is not set, the root compartment ID will be used. An ValueError will be raised if root compartment ID cannot be determined. project_id : str Target project to list deployments from. Defaults to the project id in the environment variable "PROJECT_OCID". kwargs : The values are passed to oci.data_science.DataScienceClient.list_model_deployments. Returns ------- list A list of ModelDeployment objects. """ deployments = OCIDataScienceModelDeployment.list( status=status, compartment_id=compartment_id, project_id=project_id, **kwargs, ) return [cls()._update_from_oci_model(deployment) for deployment in deployments]
[docs] @classmethod def list_df( cls, status: str = None, compartment_id: str = None, project_id: str = None, ) -> pd.DataFrame: """Returns the model deployments associated with current compartment and status as a Dataframe that can be easily visualized Parameters ---------- status : str Status of deployment. Defaults to None. Allowed values: `ACTIVE`, `CREATING`, `DELETED`, `DELETING`, `FAILED`, `INACTIVE` and `UPDATING`. compartment_id : str Target compartment to list deployments from. Defaults to the compartment set in the environment variable "NB_SESSION_COMPARTMENT_OCID". If "NB_SESSION_COMPARTMENT_OCID" is not set, the root compartment ID will be used. An ValueError will be raised if root compartment ID cannot be determined. project_id : str Target project to list deployments from. Defaults to the project id in the environment variable "PROJECT_OCID". Returns ------- DataFrame pandas Dataframe containing information about the ModelDeployments """ model_deployments = cls.list( status=status, compartment_id=compartment_id, project_id=project_id ) if isinstance(status, str) or status == None: status = State._from_str(status) display = pd.DataFrame() ids, urls, status_list = [], [], [] for model_deployment in model_deployments: state_of_model = State._from_str(model_deployment.lifecycle_state) if status == State.UNKNOWN or status.name == state_of_model.name: ids.append(model_deployment.model_deployment_id) urls.append(model_deployment.url) status_list.append(model_deployment.lifecycle_state) display["deployment_id"] = ids display["deployment_url"] = urls display["current_state"] = status_list return display
[docs] @classmethod def from_id(cls, id: str) -> "ModelDeployment": """Loads the model deployment instance from ocid. Parameters ---------- id: str The ocid of model deployment. Returns ------- ModelDeployment The ModelDeployment instance (self). """ oci_model = OCIDataScienceModelDeployment.from_id(id) return cls(properties=oci_model)._update_from_oci_model(oci_model)
[docs] @classmethod def from_dict(cls, obj_dict: Dict) -> "ModelDeployment": """Loads model deployment instance from a dictionary of configurations. Parameters ---------- obj_dict: Dict A dictionary of configurations. Returns ------- ModelDeployment The model deployment instance. """ if not isinstance(obj_dict, dict): raise ValueError( "The config data for initializing the model deployment is invalid." ) spec = ads_utils.batch_convert_case( copy.deepcopy(obj_dict.get("spec")), "camel" ) mappings = { cls.CONST_INFRASTRUCTURE: { MODEL_DEPLOYMENT_INFRASTRUCTURE_TYPE: ModelDeploymentInfrastructure, }, cls.CONST_RUNTIME: { ModelDeploymentRuntimeType.CONDA: ModelDeploymentCondaRuntime, ModelDeploymentRuntimeType.CONTAINER: ModelDeploymentContainerRuntime, }, } model_deployment = cls() for key, value in spec.items(): if key in mappings: mapping = mappings[key] child_config = value if child_config.get("type") not in mapping: raise NotImplementedError( f"{key.title()} type: {child_config.get('type')} is not supported." ) model_deployment.set_spec( key, mapping[child_config.get("type")].from_dict(child_config) ) else: model_deployment.set_spec(key, value) return model_deployment
[docs] def to_dict(self, **kwargs) -> Dict: """Serializes model deployment to a dictionary. Returns ------- dict The model deployment serialized as a dictionary. """ spec = copy.deepcopy(self._spec) for key, value in spec.items(): if hasattr(value, "to_dict"): value = value.to_dict() spec[key] = value return { "kind": self.kind, "type": self.type, "spec": ads_utils.batch_convert_case(spec, "camel"), }
def _update_from_oci_model(self, oci_model_instance) -> "ModelDeployment": """Updates model deployment instance from OCIDataScienceModelDeployment. Parameters ---------- oci_model_instance: OCIDataScienceModelDeployment The OCIDataScienceModelDeployment instance. Returns ------- ModelDeployment The model deployment instance. """ self.dsc_model_deployment = oci_model_instance for key, value in self.attribute_map.items(): if hasattr(oci_model_instance, value): self.set_spec(key, getattr(oci_model_instance, value)) infrastructure = ModelDeploymentInfrastructure() self._extract_from_oci_model( infrastructure, oci_model_instance, infrastructure.sub_level_attribute_maps ) model_deployment_configuration_details = getattr( oci_model_instance, "model_deployment_configuration_details", None ) environment_configuration_details = getattr( model_deployment_configuration_details, "environment_configuration_details", None, ) runtime = ( ModelDeploymentContainerRuntime() if getattr( environment_configuration_details, "environment_configuration_type", None, ) == OCIModelDeploymentRuntimeType.CONTAINER else ModelDeploymentCondaRuntime() ) self._extract_from_oci_model(runtime, oci_model_instance) infrastructure.set_spec( infrastructure.CONST_WEB_CONCURRENCY, runtime.env.get("WEB_CONCURRENCY", None), ) if ( runtime.env.get("CONTAINER_TYPE", None) == MODEL_DEPLOYMENT_INFERENCE_SERVER_TRITON ): runtime.set_spec( runtime.CONST_INFERENCE_SERVER, MODEL_DEPLOYMENT_INFERENCE_SERVER_TRITON.lower(), ) self.set_spec(self.CONST_INFRASTRUCTURE, infrastructure) self.set_spec(self.CONST_RUNTIME, runtime) return self @staticmethod def _extract_from_oci_model( dsc_instance: Union[ModelDeploymentInfrastructure, ModelDeploymentRuntime], oci_model_instance: OCIDataScienceModelDeployment, sub_level: Dict = {}, ) -> Union[ModelDeploymentInfrastructure, ModelDeploymentRuntime]: """Extract attributes from OCIDataScienceModelDeployment. Parameters ---------- dsc_instance: Union[ModelDeploymentInfrastructure, ModelDeploymentRuntime] The ModelDeploymentInfrastructure or ModelDeploymentRuntime instance. oci_model_instance: OCIDataScienceModelDeployment The OCIDataScienceModelDeployment instance. sub_level: Dict The sub level attribute maps of ModelDeploymentInfrastructure or ModelDeploymentRuntime Returns ------- Union[ModelDeploymentInfrastructure, ModelDeploymentRuntime] The ModelDeploymentInfrastructure or ModelDeploymentRuntime instance. """ for infra_attr, dsc_attr in dsc_instance.payload_attribute_map.items(): dsc_attr = dsc_attr if isinstance(dsc_attr, list) else [dsc_attr] for dsc_value in dsc_attr: value = get_value(oci_model_instance, dsc_value) if value: if infra_attr not in sub_level: dsc_instance._spec[infra_attr] = value else: dsc_instance._spec[infra_attr] = {} for sub_infra_attr, sub_dsc_attr in sub_level[ infra_attr ].items(): sub_value = get_value(value, sub_dsc_attr) if sub_value: dsc_instance._spec[infra_attr][sub_infra_attr] = ( sub_value ) return dsc_instance def _build_model_deployment_details(self) -> CreateModelDeploymentDetails: """Builds CreateModelDeploymentDetails from model deployment instance. Returns ------- CreateModelDeploymentDetails The CreateModelDeploymentDetails instance. """ if not (self.infrastructure and self.runtime): raise ValueError( "Missing parameter runtime or infrastructure. Try reruning it after parameters are fully configured." ) create_model_deployment_details = { self.CONST_DISPLAY_NAME: self.display_name or self._random_display_name(), self.CONST_DESCRIPTION: self.description, self.CONST_DEFINED_TAG: self.defined_tags, self.CONST_FREEFORM_TAG: self.freeform_tags, self.runtime.CONST_DEPLOYMENT_MODE: self.runtime.deployment_mode or ModelDeploymentMode.HTTPS, self.infrastructure.CONST_COMPARTMENT_ID: self.infrastructure.compartment_id or COMPARTMENT_OCID, self.infrastructure.CONST_PROJECT_ID: self.infrastructure.project_id or PROJECT_OCID, self.infrastructure.CONST_MODEL_DEPLOYMENT_CONFIG_DETAILS: self._build_model_deployment_configuration_details(), } if not self.infrastructure.compute_target_details: create_model_deployment_details[ self.infrastructure.CONST_CATEGORY_LOG_DETAILS ] = self._build_category_log_details() return OCIDataScienceModelDeployment( **create_model_deployment_details ).to_oci_model(CreateModelDeploymentDetails) def _update_model_deployment_details( self, update_type: str, **kwargs ) -> UpdateModelDeploymentDetails: """Builds UpdateModelDeploymentDetails from model deployment instance. Parameters ---------- update_type: str Update type for the deployment. Allowed values: ['ZDT', 'LIVE']. 'LIVE' update can only be used if model group id has been changed. Returns ------- UpdateModelDeploymentDetails The UpdateModelDeploymentDetails instance. """ if not (self.infrastructure and self.runtime): raise ValueError( "Missing parameter runtime or infrastructure. Try reruning it after parameters are fully configured." ) self._update_spec(**kwargs) update_model_deployment_details = { self.CONST_DISPLAY_NAME: self.display_name, self.CONST_DESCRIPTION: self.description, self.CONST_DEFINED_TAG: self.defined_tags, self.CONST_FREEFORM_TAG: self.freeform_tags, self.infrastructure.CONST_MODEL_DEPLOYMENT_CONFIG_DETAILS: self._build_model_deployment_configuration_details(), self.infrastructure.CONST_CATEGORY_LOG_DETAILS: self._build_category_log_details(), } dsc_update_model_deployment_details: UpdateModelDeploymentDetails = ( OCIDataScienceModelDeployment( **update_model_deployment_details ).to_oci_model(UpdateModelDeploymentDetails) ) dsc_update_model_deployment_details.model_deployment_configuration_details.update_type = update_type return dsc_update_model_deployment_details def _update_spec(self, **kwargs) -> "ModelDeployment": """Updates model deployment specs from kwargs. Parameters ---------- kwargs: display_name: (str) Model deployment display name description: (str) Model deployment description freeform_tags: (dict) Model deployment freeform tags defined_tags: (dict) Model deployment defined tags Additional kwargs arguments. Can be any attribute that `ads.model.deployment.ModelDeploymentCondaRuntime`, `ads.model.deployment.ModelDeploymentContainerRuntime` and `ads.model.deployment.ModelDeploymentInfrastructure` accepts. Returns ------- ModelDeployment The instance of ModelDeployment. """ if not kwargs: return self converted_specs = ads_utils.batch_convert_case(kwargs, "camel") specs = { "self": self._spec, "runtime": self.runtime._spec, "infrastructure": self.infrastructure._spec, } sub_set = { self.infrastructure.CONST_ACCESS_LOG, self.infrastructure.CONST_PREDICT_LOG, self.infrastructure.CONST_SHAPE_CONFIG_DETAILS, } for spec_value in specs.values(): for key in spec_value: if key in converted_specs: if key in sub_set: for sub_key in converted_specs[key]: converted_sub_key = ads_utils.snake_to_camel(sub_key) spec_value[key][converted_sub_key] = converted_specs[key][ sub_key ] else: spec_value[key] = copy.deepcopy(converted_specs[key]) self = ( ModelDeployment(spec=specs["self"]) .with_runtime( MODEL_DEPLOYMENT_RUNTIMES[self.runtime.type](spec=specs["runtime"]) ) .with_infrastructure( ModelDeploymentInfrastructure(spec=specs["infrastructure"]) ) ) return self def _build_model_deployment_configuration_details(self) -> Dict: """Builds model deployment configuration details from model deployment instance. Returns ------- Dict: Dict contains model deployment configuration details. """ infrastructure = self.infrastructure runtime = self.runtime instance_configuration = { infrastructure.CONST_INSTANCE_SHAPE_NAME: infrastructure.shape_name or DEFAULT_SHAPE_NAME, } if instance_configuration[infrastructure.CONST_INSTANCE_SHAPE_NAME].endswith( "Flex" ): instance_configuration[ infrastructure.CONST_MODEL_DEPLOYMENT_INSTANCE_SHAPE_CONFIG_DETAILS ] = { infrastructure.CONST_OCPUS: infrastructure.shape_config_details.get( "ocpus", None ) or DEFAULT_OCPUS, infrastructure.CONST_MEMORY_IN_GBS: infrastructure.shape_config_details.get( "memory_in_gbs", None ) or infrastructure.shape_config_details.get("memoryInGBs", None) or DEFAULT_MEMORY_IN_GBS, } if infrastructure.subnet_id: instance_configuration[infrastructure.CONST_SUBNET_ID] = ( infrastructure.subnet_id ) if infrastructure.private_endpoint_id: if not hasattr( oci.data_science.models.InstanceConfiguration, "private_endpoint_id" ): # TODO: add oci version with private endpoint support. raise OSError( "Private endpoint is not supported in the current OCI SDK installed." ) instance_configuration[infrastructure.CONST_PRIVATE_ENDPOINT_ID] = ( infrastructure.private_endpoint_id ) # Add capacity reservation IDs if provided if infrastructure.capacity_reservation_ids: if not hasattr( oci.data_science.models.InstanceConfiguration, "capacity_reservation_ids", ): raise OSError( "Capacity reservation is not supported in the current OCI SDK installed. " "Please upgrade to a newer version of the OCI SDK." ) instance_configuration[infrastructure.CONST_CAPACITY_RESERVATION_IDS] = ( infrastructure.capacity_reservation_ids ) def _drop_none_values(d: Dict) -> Dict: """Drops keys with None values from the provided dict.""" return {k: v for k, v in d.items() if v is not None} # Fixed-size is the default. If autoscaling is configured on infrastructure, # emit an AUTOSCALING policy (supported for both SINGLE_MODEL and MODEL_GROUP). auto_scaling = getattr(infrastructure, "auto_scaling", None) or {} if auto_scaling: scaling_type = str(auto_scaling.get("scalingType", "") or "").lower() metric_type = scaling_type.upper() scaling_policy = { infrastructure.CONST_POLICY_TYPE: "AUTOSCALING", "isEnabled": auto_scaling.get("isEnabled", True), "coolDownInSeconds": auto_scaling.get("coolDownInSeconds", None), "autoScalingPolicies": [ _drop_none_values( { "autoScalingPolicyType": "THRESHOLD", "maximumInstanceCount": auto_scaling.get( "maximumInstanceCount", 3 ), "minimumInstanceCount": auto_scaling.get( "minimumInstanceCount", 1 ), "initialInstanceCount": auto_scaling.get( "initialInstanceCount", infrastructure.replica or DEFAULT_REPLICA, ), "rules": [ { "metricExpressionRuleType": "PREDEFINED_EXPRESSION", "metricType": metric_type, "scaleInConfiguration": _drop_none_values( { "scalingConfigurationType": "THRESHOLD", "threshold": auto_scaling.get( "scaleInThreshold", 30 ), } ), "scaleOutConfiguration": _drop_none_values( { "scalingConfigurationType": "THRESHOLD", "threshold": auto_scaling.get( "scaleOutThreshold", 70 ), } ), } ], } ) ], } scaling_policy = _drop_none_values(scaling_policy) else: scaling_policy = { infrastructure.CONST_POLICY_TYPE: "FIXED_SIZE", infrastructure.CONST_INSTANCE_COUNT: infrastructure.replica or DEFAULT_REPLICA, } if not (runtime.model_uri or runtime.model_group_id): raise ValueError( "Missing parameter model uri and model group id. Try reruning it after model or model group is configured." ) model_id = runtime.model_uri if model_id and not model_id.startswith("ocid"): from ads.model.datascience_model import DataScienceModel dsc_model = DataScienceModel( name=self.display_name, compartment_id=self.infrastructure.compartment_id or COMPARTMENT_OCID, project_id=self.infrastructure.project_id or PROJECT_OCID, artifact=runtime.model_uri, ).create( bucket_uri=runtime.bucket_uri, auth=runtime.auth, region=runtime.region, overwrite_existing_artifact=runtime.overwrite_existing_artifact, remove_existing_artifact=runtime.remove_existing_artifact, timeout=runtime.timeout, ) model_id = dsc_model.id model_configuration_details = { infrastructure.CONST_BANDWIDTH_MBPS: infrastructure.bandwidth_mbps or DEFAULT_BANDWIDTH_MBPS, infrastructure.CONST_INSTANCE_CONFIG: instance_configuration, runtime.CONST_MODEL_ID: model_id, infrastructure.CONST_SCALING_POLICY: scaling_policy, } if runtime.env: if not hasattr( oci.data_science.models, "ModelDeploymentEnvironmentConfigurationDetails", ): raise OSError( "Environment variable hasn't been supported in the current OCI SDK installed." ) environment_variables = runtime.env if infrastructure.web_concurrency: environment_variables["WEB_CONCURRENCY"] = str( infrastructure.web_concurrency ) runtime.set_spec(runtime.CONST_ENV, environment_variables) if ( hasattr(runtime, "inference_server") and runtime.inference_server and runtime.inference_server.upper() == MODEL_DEPLOYMENT_INFERENCE_SERVER_TRITON ): environment_variables["CONTAINER_TYPE"] = ( MODEL_DEPLOYMENT_INFERENCE_SERVER_TRITON ) runtime.set_spec(runtime.CONST_ENV, environment_variables) environment_configuration_details = { runtime.CONST_ENVIRONMENT_CONFIG_TYPE: runtime.environment_config_type, runtime.CONST_ENVIRONMENT_VARIABLES: runtime.env, } if runtime.environment_config_type == OCIModelDeploymentRuntimeType.CONTAINER: if not hasattr( oci.data_science.models, "OcirModelDeploymentEnvironmentConfigurationDetails", ): raise OSError( "Container runtime hasn't been supported in the current OCI SDK installed." ) environment_configuration_details["image"] = runtime.image environment_configuration_details["imageDigest"] = runtime.image_digest environment_configuration_details["cmd"] = runtime.cmd environment_configuration_details["entrypoint"] = runtime.entrypoint environment_configuration_details["serverPort"] = runtime.server_port environment_configuration_details["healthCheckPort"] = ( runtime.health_check_port ) model_deployment_configuration_details = { infrastructure.CONST_DEPLOYMENT_TYPE: ModelDeploymentType.SINGLE_MODEL, infrastructure.CONST_MODEL_CONFIG_DETAILS: model_configuration_details, runtime.CONST_ENVIRONMENT_CONFIG_DETAILS: environment_configuration_details, } if runtime.model_group_id: model_deployment_configuration_details[ infrastructure.CONST_DEPLOYMENT_TYPE ] = ModelDeploymentType.MODEL_GROUP model_deployment_configuration_details["modelGroupConfigurationDetails"] = { runtime.CONST_MODEL_GROUP_ID: runtime.model_group_id } model_deployment_configuration_details[ "infrastructureConfigurationDetails" ] = { "infrastructureType": "INSTANCE_POOL", infrastructure.CONST_BANDWIDTH_MBPS: infrastructure.bandwidth_mbps or DEFAULT_BANDWIDTH_MBPS, infrastructure.CONST_INSTANCE_CONFIG: instance_configuration, infrastructure.CONST_SCALING_POLICY: scaling_policy, } model_configuration_details.pop(runtime.CONST_MODEL_ID) if infrastructure.compute_target_details: if not hasattr( oci.data_science.models, "ManagedComputeClusterModelDeploymentResourceConfiguration", ): raise OSError( "Managed compute cluster is not supported in the current OCI SDK installed. " "Please upgrade to a newer version of the OCI SDK." ) compute_target_details = infrastructure.compute_target_details model_deployment_configuration_details[ infrastructure.CONST_DEPLOYMENT_TYPE ] = ModelDeploymentType.SINGLE_MODEL_FLEX model_deployment_configuration_details[ "infrastructureConfigurationDetails" ] = { "infrastructureType": "MANAGED_COMPUTE_CLUSTER", infrastructure.CONST_COMPUTE_TARGET_ID: compute_target_details.get( "compute_target_id" ), infrastructure.CONST_MODEL_DEPLOYMENT_RESOURCE_CONFIGURATION: { infrastructure.CONST_RESOURCE_REQUEST_CONFIGURATION: { infrastructure.CONST_OCPUS: compute_target_details.get("ocpus"), infrastructure.CONST_MEMORY_IN_GBS: compute_target_details.get( "memory_in_gbs" ), infrastructure.CONST_GPUS: compute_target_details.get( "gpu_count" ), }, }, infrastructure.CONST_SCALING_POLICY: scaling_policy, } model_configuration_details.pop(infrastructure.CONST_BANDWIDTH_MBPS) model_configuration_details.pop(infrastructure.CONST_INSTANCE_CONFIG) model_configuration_details.pop(infrastructure.CONST_SCALING_POLICY) if runtime.deployment_mode == ModelDeploymentMode.STREAM: if not hasattr(oci.data_science.models, "StreamConfigurationDetails"): raise OSError( "Model deployment mode hasn't been supported in the current OCI SDK installed." ) model_deployment_configuration_details[ infrastructure.CONST_STREAM_CONFIG_DETAILS ] = { runtime.CONST_INPUT_STREAM_IDS: runtime.input_stream_ids, runtime.CONST_OUTPUT_STREAM_IDS: runtime.output_stream_ids, } return model_deployment_configuration_details def _build_category_log_details(self) -> Dict: """Builds category log details from model deployment instance. Returns ------- Dict: Dict contains category log details. """ if self.infrastructure.log_group_id and self.infrastructure.log_id: log_group_details = { self.infrastructure.CONST_LOG_GROUP_ID: self.infrastructure.log_group_id, self.infrastructure.CONST_LOG_ID: self.infrastructure.log_id, } return { self.infrastructure.CONST_ACCESS: log_group_details, self.infrastructure.CONST_PREDICT: log_group_details, } logs = {} if ( self.infrastructure.access_log and self.infrastructure.access_log.get( self.infrastructure.CONST_LOG_GROUP_ID, None ) and self.infrastructure.access_log.get( self.infrastructure.CONST_LOG_ID, None ) ): logs[self.infrastructure.CONST_ACCESS] = { self.infrastructure.CONST_LOG_GROUP_ID: self.infrastructure.access_log.get( "logGroupId", None ), self.infrastructure.CONST_LOG_ID: self.infrastructure.access_log.get( "logId", None ), } if ( self.infrastructure.predict_log and self.infrastructure.predict_log.get( self.infrastructure.CONST_LOG_GROUP_ID, None ) and self.infrastructure.predict_log.get( self.infrastructure.CONST_LOG_ID, None ) ): logs[self.infrastructure.CONST_PREDICT] = { self.infrastructure.CONST_LOG_GROUP_ID: self.infrastructure.predict_log.get( "logGroupId", None ), self.infrastructure.CONST_LOG_ID: self.infrastructure.predict_log.get( "logId", None ), } return logs def _random_display_name(self): """Generates a random display name.""" return f"{self._PREFIX}-{ads_utils.get_random_name_for_resource()}" def _extract_spec_kwargs(self, **kwargs) -> Dict: """Extract spec related keyword arguments from kwargs. Parameters ---------- kwargs Returns ------- Dict: Dict contains model deployment spec related keyword arguments. """ spec_kwargs = {} for attribute in self.initialize_spec_attributes: if attribute in kwargs: spec_kwargs[attribute] = kwargs[attribute] return spec_kwargs
[docs] def build(self) -> "ModelDeployment": """Load default values from the environment for the job infrastructure.""" build_method = getattr(self.infrastructure, "build", None) if build_method and callable(build_method): build_method() else: raise NotImplementedError return self