#!/usr/bin/env python
# Copyright (c) 2021, 2026 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
import collections
import copy
import datetime
import time
import warnings
from typing import Any, Dict, List, Union
import oci
import oci.loggingsearch
import pandas as pd
from oci.data_science.models import (
CreateModelDeploymentDetails,
LogDetails,
UpdateModelDeploymentDetails,
)
from ads.common import auth as authutil
from ads.common import utils as ads_utils
from ads.common.oci_logging import (
LOG_INTERVAL,
LOG_RECORDS_LIMIT,
ConsolidatedLog,
OCILog,
)
from ads.config import COMPARTMENT_OCID, PROJECT_OCID
from ads.jobs.builders.base import Builder
from ads.jobs.builders.infrastructure.utils import get_value
from ads.model.common.utils import _is_json_serializable
from ads.model.deployment.common.utils import send_request
from ads.model.deployment.model_deployment_infrastructure import (
DEFAULT_BANDWIDTH_MBPS,
DEFAULT_MEMORY_IN_GBS,
DEFAULT_OCPUS,
DEFAULT_REPLICA,
DEFAULT_SHAPE_NAME,
MODEL_DEPLOYMENT_INFRASTRUCTURE_TYPE,
ModelDeploymentInfrastructure,
)
from ads.model.deployment.model_deployment_runtime import (
ModelDeploymentCondaRuntime,
ModelDeploymentContainerRuntime,
ModelDeploymentMode,
ModelDeploymentRuntime,
ModelDeploymentRuntimeType,
OCIModelDeploymentRuntimeType,
)
from ads.model.serde.model_input import JsonModelInputSERDE
from ads.model.service.oci_datascience_model_deployment import (
OCIDataScienceModelDeployment,
)
from .common import utils
from .common.utils import State
from .model_deployment_properties import ModelDeploymentProperties
# Defaults, in seconds, for the `max_wait_time` and `poll_interval` arguments
# of the deploy/delete/update/activate/deactivate methods below.
DEFAULT_WAIT_TIME = 1200
DEFAULT_POLL_INTERVAL = 10
# Workflow step counts per operation. Not referenced in the visible part of
# this module -- presumably used for progress reporting (TODO confirm).
DEFAULT_WORKFLOW_STEPS = 6
DELETE_WORKFLOW_STEPS = 2
DEACTIVATE_WORKFLOW_STEPS = 2
# Upper bound on the number of sync attempts made by `ModelDeployment.state`.
DEFAULT_RETRYING_REQUEST_ATTEMPTS = 3
# Values returned by `ModelDeployment.kind` and `ModelDeployment.type`.
MODEL_DEPLOYMENT_KIND = "deployment"
MODEL_DEPLOYMENT_TYPE = "modelDeployment"
MODEL_DEPLOYMENT_INFERENCE_SERVER_TRITON = "TRITON"
# Maps each runtime type to the runtime class that represents it.
MODEL_DEPLOYMENT_RUNTIMES = {
    ModelDeploymentRuntimeType.CONDA: ModelDeploymentCondaRuntime,
    ModelDeploymentRuntimeType.CONTAINER: ModelDeploymentContainerRuntime,
}
[docs]
class ModelDeploymentLogType:
    """String constants naming the two log categories of a model deployment."""

    PREDICT = "predict"
    ACCESS = "access"
[docs]
class ModelDeploymentType:
    """String constants naming the supported model deployment types."""

    SINGLE_MODEL = "SINGLE_MODEL"
    SINGLE_MODEL_FLEX = "SINGLE_MODEL_FLEX"
    MODEL_GROUP = "MODEL_GROUP"
[docs]
class ModelDeploymentUpdateType:
    """Allowed values for the `update_type` argument of `ModelDeployment.update`."""

    ZDT = "ZDT"
    LIVE = "LIVE"
[docs]
class ModelDeploymentPredictError(Exception):  # pragma: no cover
    """Raised by `ModelDeployment.predict` when the deployment is not ACTIVE."""

    pass
[docs]
class ModelDeployment(Builder):
"""
A class used to represent a Model Deployment.
Attributes
----------
config: (dict)
Deployment configuration parameters
properties: (ModelDeploymentProperties)
ModelDeploymentProperties object
workflow_state_progress: (str)
Workflow request id
workflow_steps: (int)
The number of steps in the workflow
dsc_model_deployment: (OCIDataScienceModelDeployment)
The OCIDataScienceModelDeployment instance.
state: (State)
Returns the deployment state of the current Model Deployment object
created_by: (str)
The user that creates the model deployment
lifecycle_state: (str)
Model deployment lifecycle state
lifecycle_details: (str)
Model deployment lifecycle details
time_created: (datetime)
The time when the model deployment is created
display_name: (str)
Model deployment display name
description: (str)
Model deployment description
freeform_tags: (dict)
Model deployment freeform tags
defined_tags: (dict)
Model deployment defined tags
runtime: (ModelDeploymentRuntime)
Model deployment runtime
infrastructure: (ModelDeploymentInfrastructure)
Model deployment infrastructure
Methods
-------
deploy(wait_for_completion, **kwargs)
Deploy the current Model Deployment object
delete(wait_for_completion, **kwargs)
Deletes the current Model Deployment object
update(wait_for_completion, **kwargs)
Updates a model deployment
activate(wait_for_completion, max_wait_time, poll_interval)
Activates a model deployment
deactivate(wait_for_completion, max_wait_time, poll_interval)
Deactivates a model deployment
list(status, compartment_id, project_id, **kwargs)
List model deployment within given compartment and project.
with_display_name(display_name)
Sets model deployment display name
with_description(description)
Sets model deployment description
with_freeform_tags(freeform_tags)
Sets model deployment freeform tags
with_defined_tags(defined_tags)
Sets model deployment defined tags
with_runtime(self, runtime)
Sets model deployment runtime
with_infrastructure(self, infrastructure)
Sets model deployment infrastructure
from_dict(obj_dict)
Deserializes model deployment instance from dict
from_id(id)
Loads model deployment instance from ocid
sync()
Updates the model deployment instance from backend
Examples
--------
>>> # Build model deployment from builder apis:
>>> ds_model_deployment = (ModelDeployment()
... .with_display_name("TestModelDeployment")
... .with_description("Testing the test model deployment")
... .with_freeform_tags(tag1="val1", tag2="val2")
... .with_infrastructure(
... (ModelDeploymentInfrastructure()
... .with_project_id(<project_id>)
... .with_compartment_id(<compartment_id>)
... .with_shape_name("VM.Standard.E4.Flex")
... .with_shape_config_details(
... ocpus=1,
... memory_in_gbs=16
... )
... .with_replica(1)
... .with_bandwidth_mbps(10)
... .with_web_concurrency(10)
... .with_access_log(
... log_group_id=<log_group_id>,
... log_id=<log_id>
... )
... .with_predict_log(
... log_group_id=<log_group_id>,
... log_id=<log_id>
... ))
... )
... .with_runtime(
... (ModelDeploymentContainerRuntime()
... .with_image(<image>)
... .with_image_digest(<image_digest>)
... .with_entrypoint(<entrypoint>)
... .with_server_port(<server_port>)
... .with_health_check_port(<health_check_port>)
... .with_env({"key":"value"})
... .with_deployment_mode("HTTPS_ONLY")
... .with_model_uri(<model_uri>)
... .with_bucket_uri(<bucket_uri>)
... .with_auth(<auth>)
... .with_timeout(<time_out>))
... )
... )
>>> ds_model_deployment.deploy()
>>> ds_model_deployment.status
>>> ds_model_deployment.with_display_name("new name").update()
>>> ds_model_deployment.deactivate()
>>> ds_model_deployment.sync()
>>> ds_model_deployment.list(status="ACTIVE")
>>> ds_model_deployment.delete()
>>> # Build model deployment from yaml
>>> ds_model_deployment = ModelDeployment.from_yaml(uri=<path_to_yaml>)
"""
# NOTE(review): not referenced in the visible part of the class; presumably a
# prefix consumed by the `Builder` base class -- confirm.
_PREFIX = "datascience_model_deployment"
# Keys under which the individual fields are stored in the spec
# (read with `get_spec` / written with `set_spec` by the accessors below).
CONST_ID = "id"
CONST_CREATED_BY = "createdBy"
CONST_DISPLAY_NAME = "displayName"
CONST_DESCRIPTION = "description"
CONST_FREEFORM_TAG = "freeformTags"
CONST_DEFINED_TAG = "definedTags"
CONST_MODEL_DEPLOYMENT_URL = "modelDeploymentUrl"
CONST_INFRASTRUCTURE = "infrastructure"
CONST_RUNTIME = "runtime"
CONST_LIFECYCLE_STATE = "lifecycleState"
CONST_LIFECYCLE_DETAILS = "lifecycleDetails"
CONST_TIME_CREATED = "timeCreated"
CONST_UPDATE_TYPE = "updateType"
# Maps each camelCase spec key to its snake_case attribute name.
# CONST_UPDATE_TYPE has no entry here.
attribute_map = {
    CONST_ID: "id",
    CONST_CREATED_BY: "created_by",
    CONST_DISPLAY_NAME: "display_name",
    CONST_DESCRIPTION: "description",
    CONST_FREEFORM_TAG: "freeform_tags",
    CONST_DEFINED_TAG: "defined_tags",
    CONST_MODEL_DEPLOYMENT_URL: "model_deployment_url",
    CONST_INFRASTRUCTURE: "infrastructure",
    CONST_RUNTIME: "runtime",
    CONST_LIFECYCLE_STATE: "lifecycle_state",
    CONST_LIFECYCLE_DETAILS: "lifecycle_details",
    CONST_TIME_CREATED: "time_created",
}
# Snake_case names of the user-settable fields.
# NOTE(review): presumably the keyword arguments that `_extract_spec_kwargs`
# accepts at construction time -- confirm against that helper.
initialize_spec_attributes = [
    "display_name",
    "description",
    "freeform_tags",
    "defined_tags",
    "infrastructure",
    "runtime",
]
# Shared serializer instance used as the default `serializer` of `predict()`.
model_input_serializer = JsonModelInputSERDE()
def __init__(
    self,
    properties: Union[ModelDeploymentProperties, Dict] = None,
    config: Dict = None,
    model_deployment_id: str = None,
    model_deployment_url: str = "",
    spec: Dict = None,
    **kwargs,
):
    """Creates a ModelDeployment from a spec, legacy properties, or kwargs.

    Parameters
    ----------
    properties: (Union[ModelDeploymentProperties, Dict], optional). Defaults to None.
        Deprecated. Object containing deployment properties. May be `None`
        when `kwargs` are used for specifying properties.
    config: (Dict, optional). Defaults to None.
        Deprecated. ADS auth dictionary for OCI authentication. It is not
        used by this constructor beyond emitting the deprecation warning.
    model_deployment_id: (str, optional). Defaults to None.
        Deprecated. Model deployment OCID; only triggers a warning here.
    model_deployment_url: (str, optional). Defaults to empty string.
        Deprecated. Model deployment url; only triggers a warning here.
    spec: (dict, optional). Defaults to None.
        Model deployment spec. Mutually exclusive with `properties`.
    kwargs:
        Keyword arguments for initializing `ModelDeploymentProperties` or
        `ModelDeployment`.

    Raises
    ------
    ValueError
        If both `spec` and `properties` are provided.
    """
    if spec and properties:
        raise ValueError(
            "You can only pass in either `spec` or `properties` to initialize model deployment instance."
        )

    # Deprecation notices for the legacy constructor arguments.
    if config:
        warnings.warn(
            "Parameter `config` was deprecated in 2.8.2 from ModelDeployment constructor and will be removed in 3.0.0. Please use `ads.set_auth()` to config the auth information. "
            "Check: https://accelerated-data-science.readthedocs.io/en/latest/user_guide/cli/authentication.html"
        )
    if properties:
        warnings.warn(
            "Parameter `properties` was deprecated in 2.8.2 from ModelDeployment constructor and will be removed in 3.0.0. Please use `spec` or the builder pattern to initialize model deployment instance. "
            "Check: https://accelerated-data-science.readthedocs.io/en/latest/user_guide/model_registration/quick_start.html"
        )
    if model_deployment_url or model_deployment_id:
        warnings.warn(
            "Parameter `model_deployment_url` and `model_deployment_id` were deprecated in 2.8.2 from ModelDeployment constructor and will be removed in 3.0.0. These two fields will be auto-populated from the service side. "
            "Check: https://accelerated-data-science.readthedocs.io/en/latest/user_guide/model_registration/quick_start.html"
        )

    # Work out what is handed to the Builder base class.
    builder_spec = {}
    builder_kwargs = {}
    if spec:
        builder_spec = spec
        builder_kwargs = self._extract_spec_kwargs(**kwargs)
    elif not properties and (
        self.CONST_INFRASTRUCTURE in kwargs or self.CONST_RUNTIME in kwargs
    ):
        # Builder-style kwargs are only honoured when neither `spec` nor
        # `properties` was given and an infrastructure/runtime is present.
        builder_kwargs = self._extract_spec_kwargs(**kwargs)
    super().__init__(spec=builder_spec, **builder_kwargs)

    # Legacy properties object: reuse it as-is or build one from dict/kwargs.
    if isinstance(properties, ModelDeploymentProperties):
        self.properties = properties
    else:
        self.properties = ModelDeploymentProperties(
            oci_model_deployment=properties, **kwargs
        )

    if self.properties.lifecycle_state:
        initial_state = State._from_str(self.properties.lifecycle_state)
    else:
        initial_state = State.UNKNOWN
    self.current_state = initial_state

    # Log handles are created lazily by `access_log` / `predict_log`.
    self._access_log = None
    self._predict_log = None
    self.dsc_model_deployment = OCIDataScienceModelDeployment()
@property
def kind(self) -> str:
    """The kind of the object as shown in YAML.

    Returns
    -------
    str
        The value of `MODEL_DEPLOYMENT_KIND` ("deployment").
    """
    return MODEL_DEPLOYMENT_KIND
@property
def type(self) -> str:
    """The type of the object as shown in YAML.

    Returns
    -------
    str
        The value of `MODEL_DEPLOYMENT_TYPE` ("modelDeployment").
    """
    return MODEL_DEPLOYMENT_TYPE
@property
def model_deployment_id(self) -> str:
    """The model deployment ocid (same spec entry as `id`).

    Returns
    -------
    str
        The value stored in the spec under `CONST_ID`; `None` is passed
        to `get_spec` as the fallback.
    """
    return self.get_spec(self.CONST_ID, None)
@property
def id(self) -> str:
    """The model deployment ocid (same spec entry as `model_deployment_id`).

    Returns
    -------
    str
        The value stored in the spec under `CONST_ID`; `None` is passed
        to `get_spec` as the fallback.
    """
    return self.get_spec(self.CONST_ID, None)
@property
def created_by(self) -> str:
    """The user that created the model deployment.

    Returns
    -------
    str
        The value stored in the spec under `CONST_CREATED_BY`; `None` is
        passed to `get_spec` as the fallback.
    """
    return self.get_spec(self.CONST_CREATED_BY, None)
@property
def url(self) -> str:
    """Model deployment url.

    Returns
    -------
    str
        The value stored in the spec under `CONST_MODEL_DEPLOYMENT_URL`;
        `None` is passed to `get_spec` as the fallback.
    """
    return self.get_spec(self.CONST_MODEL_DEPLOYMENT_URL, None)
@property
def lifecycle_state(self) -> str:
    """Model deployment lifecycle state as currently held in the spec.

    Reading this property does not contact the service; use `state` or
    `sync()` for a refreshed value.

    Returns
    -------
    str
        The value stored in the spec under `CONST_LIFECYCLE_STATE`;
        `None` is passed to `get_spec` as the fallback.
    """
    return self.get_spec(self.CONST_LIFECYCLE_STATE, None)
@property
def lifecycle_details(self) -> str:
    """Model deployment lifecycle details.

    Returns
    -------
    str
        The value stored in the spec under `CONST_LIFECYCLE_DETAILS`;
        `None` is passed to `get_spec` as the fallback.
    """
    return self.get_spec(self.CONST_LIFECYCLE_DETAILS, None)
@property
def time_created(self) -> datetime.datetime:
    """The time when the model deployment was created.

    Returns
    -------
    datetime.datetime
        The value stored in the spec under `CONST_TIME_CREATED`; `None`
        is passed to `get_spec` as the fallback.
    """
    return self.get_spec(self.CONST_TIME_CREATED, None)
@property
def display_name(self) -> str:
    """Model deployment display name.

    Returns
    -------
    str
        The value stored in the spec under `CONST_DISPLAY_NAME`; `None`
        is passed to `get_spec` as the fallback.
    """
    return self.get_spec(self.CONST_DISPLAY_NAME, None)
[docs]
def with_display_name(self, display_name: str) -> "ModelDeployment":
    """Sets the name of model deployment.

    Parameters
    ----------
    display_name: str
        The name of model deployment.

    Returns
    -------
    ModelDeployment
        The ModelDeployment instance (self).
    """
    # Stored in the spec under CONST_DISPLAY_NAME; the result of
    # `set_spec` is returned so calls can be chained.
    return self.set_spec(self.CONST_DISPLAY_NAME, display_name)
@property
def description(self) -> str:
    """Model deployment description.

    Returns
    -------
    str
        The value stored in the spec under `CONST_DESCRIPTION`; `None` is
        passed to `get_spec` as the fallback.
    """
    return self.get_spec(self.CONST_DESCRIPTION, None)
[docs]
def with_description(self, description: str) -> "ModelDeployment":
    """Sets the description of model deployment.

    Parameters
    ----------
    description: str
        The description of model deployment.

    Returns
    -------
    ModelDeployment
        The ModelDeployment instance (self).
    """
    # Stored in the spec under CONST_DESCRIPTION; the result of
    # `set_spec` is returned so calls can be chained.
    return self.set_spec(self.CONST_DESCRIPTION, description)
@property
def freeform_tags(self) -> Dict:
    """Model deployment freeform tags.

    Returns
    -------
    Dict
        The value stored in the spec under `CONST_FREEFORM_TAG`; an empty
        dict is passed to `get_spec` as the fallback.
    """
    return self.get_spec(self.CONST_FREEFORM_TAG, {})
@property
def defined_tags(self) -> Dict:
    """Model deployment defined tags.

    Returns
    -------
    Dict
        The value stored in the spec under `CONST_DEFINED_TAG`; an empty
        dict is passed to `get_spec` as the fallback.
    """
    return self.get_spec(self.CONST_DEFINED_TAG, {})
@property
def runtime(self) -> "ModelDeploymentRuntime":
    """Model deployment runtime.

    Returns
    -------
    ModelDeploymentRuntime
        The value stored in the spec under `CONST_RUNTIME`; `None` is
        passed to `get_spec` as the fallback.
    """
    return self.get_spec(self.CONST_RUNTIME, None)
[docs]
def with_runtime(self, runtime: ModelDeploymentRuntime) -> "ModelDeployment":
    """Sets the runtime of model deployment.

    Parameters
    ----------
    runtime: ModelDeploymentRuntime
        The runtime of model deployment.

    Returns
    -------
    ModelDeployment
        The ModelDeployment instance (self).
    """
    # Stored in the spec under CONST_RUNTIME; the result of `set_spec`
    # is returned so calls can be chained.
    return self.set_spec(self.CONST_RUNTIME, runtime)
@property
def infrastructure(self) -> "ModelDeploymentInfrastructure":
    """Model deployment infrastructure.

    Returns
    -------
    ModelDeploymentInfrastructure
        The value stored in the spec under `CONST_INFRASTRUCTURE`; `None`
        is passed to `get_spec` as the fallback.
    """
    return self.get_spec(self.CONST_INFRASTRUCTURE, None)
[docs]
def with_infrastructure(
    self, infrastructure: ModelDeploymentInfrastructure
) -> "ModelDeployment":
    """Sets the infrastructure of model deployment.

    Parameters
    ----------
    infrastructure: ModelDeploymentInfrastructure
        The infrastructure of model deployment.

    Returns
    -------
    ModelDeployment
        The ModelDeployment instance (self).
    """
    # Stored in the spec under CONST_INFRASTRUCTURE; the result of
    # `set_spec` is returned so calls can be chained.
    return self.set_spec(self.CONST_INFRASTRUCTURE, infrastructure)
[docs]
def deploy(
    self,
    wait_for_completion: bool = True,
    max_wait_time: int = DEFAULT_WAIT_TIME,
    poll_interval: int = DEFAULT_POLL_INTERVAL,
):
    """Creates the model deployment described by this object.

    Parameters
    ----------
    wait_for_completion: bool
        Whether to wait for the deployment to be created before returning.
        Defaults to True.
    max_wait_time: int
        Maximum amount of time to wait in seconds (Defaults to 1200).
        Negative implies infinite wait time.
    poll_interval: int
        Poll interval in seconds (Defaults to 10).

    Returns
    -------
    ModelDeployment
        The instance of ModelDeployment.
    """
    # A non-empty spec takes precedence; otherwise the create payload is
    # built from the legacy `properties` object.
    if self._spec:
        create_details = self._build_model_deployment_details()
    else:
        create_details = self.properties.build()

    created = self.dsc_model_deployment.create(
        create_model_deployment_details=create_details,
        wait_for_completion=wait_for_completion,
        max_wait_time=max_wait_time,
        poll_interval=poll_interval,
    )
    return self._update_from_oci_model(created)
[docs]
def delete(
    self,
    wait_for_completion: bool = True,
    max_wait_time: int = DEFAULT_WAIT_TIME,
    poll_interval: int = DEFAULT_POLL_INTERVAL,
):
    """Deletes the model deployment.

    Parameters
    ----------
    wait_for_completion: bool
        Whether to wait for the deployment to be deleted before returning.
        Defaults to True.
    max_wait_time: int
        Maximum amount of time to wait in seconds (Defaults to 1200).
        Negative implies infinite wait time.
    poll_interval: int
        Poll interval in seconds (Defaults to 10).

    Returns
    -------
    ModelDeployment
        The instance of ModelDeployment.
    """
    wait_options = {
        "wait_for_completion": wait_for_completion,
        "max_wait_time": max_wait_time,
        "poll_interval": poll_interval,
    }
    deleted = self.dsc_model_deployment.delete(**wait_options)
    return self._update_from_oci_model(deleted)
[docs]
def update(
    self,
    properties: Union[ModelDeploymentProperties, dict, None] = None,
    wait_for_completion: bool = True,
    max_wait_time: int = DEFAULT_WAIT_TIME,
    poll_interval: int = DEFAULT_POLL_INTERVAL,
    update_type: str = ModelDeploymentUpdateType.ZDT,
    **kwargs,
):
    """Updates a model deployment.

    You can update `model_deployment_configuration_details` and change
    `instance_shape` and `model_id` when the model deployment is in the
    ACTIVE lifecycle state. The `bandwidth_mbps` or `instance_count` can
    only be updated while the model deployment is in the `INACTIVE` state;
    such changes take effect the next time the `ActivateModelDeployment`
    action is invoked on the model deployment resource.

    Parameters
    ----------
    properties: ModelDeploymentProperties or dict
        Deprecated. The properties for updating the deployment.
    wait_for_completion: bool
        Whether to wait for the deployment to be updated before returning.
        Defaults to True.
    max_wait_time: int
        Maximum amount of time to wait in seconds (Defaults to 1200).
        Negative implies infinite wait time.
    poll_interval: int
        Poll interval in seconds (Defaults to 10).
    update_type: str
        Update type for the deployment. Allowed values: ['ZDT', 'LIVE'].
        'LIVE' update can only be used if model group id has been changed.
        Only consulted when the update payload is built from the spec,
        i.e. when neither `properties` nor `kwargs` are supplied.
    kwargs:
        dict

    Returns
    -------
    ModelDeployment
        The instance of ModelDeployment.
    """
    if properties:
        warnings.warn(
            "Parameter `properties` is deprecated from ModelDeployment `update()` in 2.8.6 and will be removed in 3.0.0. Please use the builder pattern or kwargs to update model deployment instance. "
            "Check: https://accelerated-data-science.readthedocs.io/en/latest/user_guide/model_registration/quick_start.html"
        )

    # Normalise the legacy input into a ModelDeploymentProperties object.
    if isinstance(properties, ModelDeploymentProperties):
        updated_properties = properties
    else:
        updated_properties = ModelDeploymentProperties(
            oci_model_deployment=properties, **kwargs
        )

    # Legacy path when anything was passed explicitly; otherwise the payload
    # is derived from the current spec.
    if properties or updated_properties.oci_model_deployment or kwargs:
        update_details = updated_properties.to_update_deployment()
    else:
        update_details = self._update_model_deployment_details(
            update_type=update_type, **kwargs
        )

    updated = self.dsc_model_deployment.update(
        update_model_deployment_details=update_details,
        wait_for_completion=wait_for_completion,
        max_wait_time=max_wait_time,
        poll_interval=poll_interval,
    )
    return self._update_from_oci_model(updated)
[docs]
def watch(
    self,
    log_type: str = None,
    time_start: datetime.datetime = None,
    interval: int = LOG_INTERVAL,
    log_filter: str = None,
) -> "ModelDeployment":
    """Streams the access and/or predict logs of model deployment.

    The method first polls the deployment, printing every status change,
    until `_stop_condition()` holds. It then streams logs until
    `_stream_stop_condition()` holds or the user interrupts with Ctrl-C.

    Parameters
    ----------
    log_type: str, optional
        The log type. Can be `access`, `predict` or None.
        Defaults to None.
    time_start : datetime.datetime, optional
        Starting time for the log query.
        Defaults to None, in which case `self.time_created` is used.
    interval : int, optional
        The time interval, in seconds, between status polls and between
        requests to pull logs from OCI logging service.
        Defaults to `LOG_INTERVAL`.
    log_filter : str, optional
        Expression for filtering the logs. This will be the WHERE clause of the query.
        Defaults to None.

    Returns
    -------
    ModelDeployment
        The instance of ModelDeployment. It is refreshed from the backend
        when streaming ends normally, and returned as-is when streaming is
        interrupted from the keyboard.
    """
    status = ""
    while not self._stop_condition():
        status = self._check_and_print_status(status)
        time.sleep(interval)

    time_start = time_start or self.time_created
    try:
        count = self.logs(log_type).stream(
            source=self.model_deployment_id,
            interval=interval,
            stop_condition=self._stream_stop_condition,
            time_start=time_start,
            log_filter=log_filter,
        )
        if not count:
            print(
                "No logs in the last 14 days. Please reset time_start to see older logs."
            )
        return self.sync()
    except KeyboardInterrupt:
        print("Stop watching logs.")
        # Honour the declared return type so callers can keep chaining
        # after a manual interrupt instead of receiving None.
        return self
def _stop_condition(self):
    """Tells whether status polling can stop.

    Returns
    -------
    bool
        True when the (freshly read) `state` is ACTIVE, FAILED, DELETED or
        INACTIVE.
    """
    settled_states = (State.ACTIVE, State.FAILED, State.DELETED, State.INACTIVE)
    return self.state in settled_states
def _stream_stop_condition(self):
    """Tells whether log streaming can stop.

    Unlike `_stop_condition`, ACTIVE is not included, so an active
    deployment keeps streaming.

    Returns
    -------
    bool
        True when the (freshly read) `state` is FAILED, DELETED or INACTIVE.
    """
    finished_states = (State.FAILED, State.DELETED, State.INACTIVE)
    return self.state in finished_states
def _check_and_print_status(self, prev_status) -> str:
"""Check and print the next status.
Parameters
----------
prev_status: str
The previous model deployment status.
Returns
-------
str:
The next model deployment status.
"""
status = self._model_deployment_status_text()
if status != prev_status:
timestamp = datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
print(f"{timestamp} - {status}")
return status
def _model_deployment_status_text(self) -> str:
"""Formats the status message.
Returns
-------
str:
The model deployment life status and life cycle details.
"""
details = f", {self.lifecycle_details}" if self.lifecycle_details else ""
return f"Model Deployment {self.lifecycle_state}" + details
@property
def state(self) -> State:
    """Returns the deployment state of the current Model Deployment object.

    The instance is refreshed through `sync()`, with at most
    `DEFAULT_RETRYING_REQUEST_ATTEMPTS` attempts one second apart. When
    every attempt fails, the last known `current_state` is returned
    instead of raising.

    Returns
    -------
    State
        The refreshed state, or the previously cached one on failure.
    """
    for attempt in range(1, DEFAULT_RETRYING_REQUEST_ATTEMPTS + 1):
        try:
            self.current_state = State._from_str(self.sync().lifecycle_state)
            break
        except Exception:
            # Best-effort refresh: ordinary errors are retried, while
            # KeyboardInterrupt/SystemExit now propagate so that a polling
            # loop built on this property can be interrupted.
            if attempt < DEFAULT_RETRYING_REQUEST_ATTEMPTS:
                # No point in sleeping after the final attempt.
                time.sleep(1)
    return self.current_state
@property
def status(self) -> State:
    """Alias of `state`.

    Returns
    -------
    State
        Whatever `state` returns; reading it triggers the same refresh.
    """
    return self.state
[docs]
def predict(
    self,
    json_input=None,
    data: Any = None,
    serializer: "ads.model.ModelInputSerializer" = model_input_serializer,
    auto_serialize_data: bool = False,
    model_name: str = None,
    model_version: str = None,
    **kwargs,
) -> dict:
    """Returns prediction of input data run against the model deployment endpoint.

    Examples
    --------
    >>> import numpy as np
    >>> from ads.model import ModelInputSerializer
    >>> class MySerializer(ModelInputSerializer):
    ...     def serialize(self, data):
    ...         serialized_data = 1
    ...         return serialized_data
    >>> model_deployment = ModelDeployment.from_id(<model_deployment_id>)
    >>> prediction = model_deployment.predict(
    ...        data=np.array([1, 2, 3]),
    ...        serializer=MySerializer(),
    ...        auto_serialize_data=True,
    ... )['prediction']

    Parameters
    ----------
    json_input: Json serializable
        JSON payload for the prediction.
    data: Any
        Data for the prediction.
    serializer: ads.model.ModelInputSerializer
        Defaults to ads.model.JsonModelInputSerializer.
    auto_serialize_data: bool
        Defaults to False. Indicate whether to auto serialize input data using `serializer`.
        If `auto_serialize_data=False`, `data` required to be bytes or json serializable
        and `json_input` required to be json serializable. If `auto_serialize_data` set
        to True, data will be serialized before sending to model deployment endpoint.
    model_name: str
        Defaults to None. When the `inference_server="triton"`, the name of the model to invoke.
        Only applied when `auto_serialize_data` is False.
    model_version: str
        Defaults to None. When the `inference_server="triton"`, the version of the model to invoke.
        Only applied when `auto_serialize_data` is False.
    kwargs:
        content_type: str
            Used to indicate the media type of the resource.
            By default, it will be `application/octet-stream` for bytes input and `application/json` otherwise.
            The content-type header will be set to this value when calling the model deployment endpoint.
        headers: dict
            Extra entries merged into the request header.

    Returns
    -------
    dict:
        Prediction results.

    Raises
    ------
    ModelDeploymentPredictError
        If the model deployment is not in the ACTIVE state.
    AttributeError
        If neither or both of `data` and `json_input` are provided.
    ValueError
        If `json_input` is not json serializable, or only one of
        `model_name` and `model_version` is provided.
    TypeError
        If `data` is neither bytes nor json serializable while
        `auto_serialize_data` is False.
    oci.exceptions.ServiceError
        Re-raised from the request; a 429 additionally logs a warning.
    """
    current_state = self.sync().lifecycle_state
    if current_state != State.ACTIVE.name:
        raise ModelDeploymentPredictError(
            "This model deployment is not in active state, you will not be able to use predict end point. "
            f"Current model deployment state: {current_state} "
        )
    endpoint = f"{self.url}/predict"
    signer = authutil.default_signer()["signer"]
    header = {
        "signer": signer,
        "content_type": kwargs.get("content_type", None),
    }
    header.update(kwargs.pop("headers", {}))

    # Exactly one of `data` / `json_input` must be supplied.
    if data is None and json_input is None:
        raise AttributeError(
            "Neither `data` nor `json_input` are provided. You need to provide one of them."
        )
    if data is not None and json_input is not None:
        raise AttributeError(
            "`data` and `json_input` are both provided. You can only use one of them."
        )

    try:
        if auto_serialize_data:
            # Explicit None check instead of `data or json_input`: truth
            # testing raises for multi-element numpy arrays and would
            # discard legitimate falsy payloads such as 0, "" or [].
            data = data if data is not None else json_input
            serialized_data = serializer.serialize(data=data)
            return send_request(
                data=serialized_data,
                endpoint=endpoint,
                is_json_payload=_is_json_serializable(serialized_data),
                header=header,
            )

        if json_input is not None:
            if not _is_json_serializable(json_input):
                raise ValueError(
                    "`json_input` must be json serializable. "
                    "Set `auto_serialize_data` to True, or serialize the provided input data first,"
                    "or using `data` to pass binary data."
                )
            utils.get_logger().warning(
                "The `json_input` argument of `predict()` will be deprecated soon. "
                "Please use `data` argument. "
            )
            data = json_input

        is_json_payload = _is_json_serializable(data)
        if not isinstance(data, bytes) and not is_json_payload:
            raise TypeError(
                "`data` is not bytes or json serializable. Set `auto_serialize_data` to `True` to serialize the input data."
            )

        # The model selection headers are only valid as a pair.
        if model_name and model_version:
            header["model-name"] = model_name
            header["model-version"] = model_version
        elif bool(model_version) ^ bool(model_name):
            raise ValueError(
                "`model_name` and `model_version` have to be provided together."
            )

        prediction = send_request(
            data=data,
            endpoint=endpoint,
            is_json_payload=is_json_payload,
            header=header,
        )
        return prediction
    except oci.exceptions.ServiceError as ex:
        # When bandwidth exceeds the allocated value, TooManyRequests error (429) will be raised by oci backend.
        if ex.status == 429:
            bandwidth_mbps = (
                self.infrastructure.bandwidth_mbps or DEFAULT_BANDWIDTH_MBPS
            )
            utils.get_logger().warning(
                f"Load balancer bandwidth exceeds the allocated {bandwidth_mbps} Mbps."
                "To estimate the actual bandwidth, use formula: (payload size in KB) * (estimated requests per second) * 8 / 1024."
                "To resolve the issue, try sizing down the payload, slowing down the request rate or increasing the allocated bandwidth."
            )
        raise
[docs]
def activate(
    self,
    wait_for_completion: bool = True,
    max_wait_time: int = DEFAULT_WAIT_TIME,
    poll_interval: int = DEFAULT_POLL_INTERVAL,
) -> "ModelDeployment":
    """Activates a model deployment.

    Parameters
    ----------
    wait_for_completion: bool
        Whether to wait for the deployment to be activated before returning.
        Defaults to True.
    max_wait_time: int
        Maximum amount of time to wait in seconds (Defaults to 1200).
        Negative implies infinite wait time.
    poll_interval: int
        Poll interval in seconds (Defaults to 10).

    Returns
    -------
    ModelDeployment
        The instance of ModelDeployment.
    """
    wait_options = {
        "wait_for_completion": wait_for_completion,
        "max_wait_time": max_wait_time,
        "poll_interval": poll_interval,
    }
    activated = self.dsc_model_deployment.activate(**wait_options)
    return self._update_from_oci_model(activated)
[docs]
def deactivate(
    self,
    wait_for_completion: bool = True,
    max_wait_time: int = DEFAULT_WAIT_TIME,
    poll_interval: int = DEFAULT_POLL_INTERVAL,
) -> "ModelDeployment":
    """Deactivates a model deployment.

    Parameters
    ----------
    wait_for_completion: bool
        Whether to wait for the deployment to be deactivated before returning.
        Defaults to True.
    max_wait_time: int
        Maximum amount of time to wait in seconds (Defaults to 1200).
        Negative implies infinite wait time.
    poll_interval: int
        Poll interval in seconds (Defaults to 10).

    Returns
    -------
    ModelDeployment
        The instance of ModelDeployment.
    """
    wait_options = {
        "wait_for_completion": wait_for_completion,
        "max_wait_time": max_wait_time,
        "poll_interval": poll_interval,
    }
    deactivated = self.dsc_model_deployment.deactivate(**wait_options)
    return self._update_from_oci_model(deactivated)
def _log_details(self, log_type: str = ModelDeploymentLogType.ACCESS):
    """Gets log details for the provided `log_type`.

    The legacy `properties.category_log_details` is consulted first; when
    it has nothing for `log_type` and an infrastructure is set, the details
    are taken from `_build_category_log_details()`.

    Parameters
    ----------
    log_type: (str, optional). Defaults to "access".
        The log type. Can be "access" or "predict".

    Returns
    -------
    The `log_type` attribute of `properties.category_log_details`, or a
    `LogDetails` built from the infrastructure's `logId` / `logGroupId`.

    Raises
    ------
    LogNotConfiguredError
        Deployment doesn't have requested log configuration.
    """
    legacy_details = self.properties.category_log_details
    if legacy_details and getattr(legacy_details, log_type):
        return getattr(legacy_details, log_type)

    if self.infrastructure:
        log_entry = self._build_category_log_details().get(log_type, None)
        # Both identifiers are needed to address the log.
        if (
            log_entry
            and log_entry.get("logId", None)
            and log_entry.get("logGroupId", None)
        ):
            return LogDetails(
                log_id=log_entry.get("logId"),
                log_group_id=log_entry.get("logGroupId"),
            )

    raise LogNotConfiguredError(
        f"Deployment `{self.model_deployment_id}` "
        f"has no `{log_type}` log configuration."
    )
@property
def predict_log(self) -> OCILog:
    """Gets the model deployment predict logs object.

    The object is built on first access and cached in `_predict_log`.

    Returns
    -------
    OCILog
        The OCILog object containing the predict logs.
    """
    if self._predict_log:
        return self._predict_log

    details = self._log_details(log_type=ModelDeploymentLogType.PREDICT)
    # The compartment comes from the infrastructure when one is set,
    # otherwise from the legacy properties.
    if self.infrastructure:
        compartment_id = self.infrastructure.compartment_id
    else:
        compartment_id = self.properties.compartment_id

    self._predict_log = OCILog(
        compartment_id=compartment_id or COMPARTMENT_OCID,
        id=details.log_id,
        log_group_id=details.log_group_id,
        source=self.model_deployment_id,
        annotation=ModelDeploymentLogType.PREDICT,
    )
    return self._predict_log
@property
def access_log(self) -> OCILog:
    """Gets the model deployment access logs object.

    The object is built on first access and cached in `_access_log`.

    Returns
    -------
    OCILog
        The OCILog object containing the access logs.
    """
    if self._access_log:
        return self._access_log

    details = self._log_details(log_type=ModelDeploymentLogType.ACCESS)
    # The compartment comes from the infrastructure when one is set,
    # otherwise from the legacy properties.
    if self.infrastructure:
        compartment_id = self.infrastructure.compartment_id
    else:
        compartment_id = self.properties.compartment_id

    self._access_log = OCILog(
        compartment_id=compartment_id or COMPARTMENT_OCID,
        id=details.log_id,
        log_group_id=details.log_group_id,
        source=self.model_deployment_id,
        annotation=ModelDeploymentLogType.ACCESS,
    )
    return self._access_log
[docs]
def logs(self, log_type: str = None) -> ConsolidatedLog:
    """Gets the access and/or predict logs.

    Parameters
    ----------
    log_type: (str, optional). Defaults to None.
        The log type. Can be "access", "predict" or None. With None, every
        configured log is included.

    Returns
    -------
    ConsolidatedLog
        The ConsolidatedLog object containing the logs.

    Raises
    ------
    LogNotConfiguredError
        If the requested log, or with `log_type=None` both logs, is not
        configured.
    ValueError
        If `log_type` is any other non-empty value.
    """
    if log_type == ModelDeploymentLogType.ACCESS:
        return ConsolidatedLog(self.access_log)
    if log_type == ModelDeploymentLogType.PREDICT:
        return ConsolidatedLog(self.predict_log)
    if log_type:
        raise ValueError(
            "Parameter log_type should be either access, predict or None."
        )

    # No type requested: collect whichever logs are configured, access first.
    configured = []
    for log_attr in ("access_log", "predict_log"):
        try:
            configured.append(getattr(self, log_attr))
        except LogNotConfiguredError:
            continue
    if not configured:
        raise LogNotConfiguredError(
            "Neither `predict` nor `access` log was configured for the model deployment."
        )
    return ConsolidatedLog(*configured)
[docs]
def show_logs(
    self,
    time_start: datetime.datetime = None,
    time_end: datetime.datetime = None,
    limit: int = LOG_RECORDS_LIMIT,
    log_type: str = None,
):
    """Shows deployment logs as a pandas dataframe.

    Parameters
    ----------
    time_start: (datetime.datetime, optional). Defaults to None.
        Starting date and time in RFC3339 format for retrieving logs.
        When None, the default look-back window of the underlying log
        search applies (documented as the last 14 days).
    time_end: (datetime.datetime, optional). Defaults to None.
        Ending date and time in RFC3339 format for retrieving logs.
        When None, logs are retrieved until now.
    limit: (int, optional). Defaults to `LOG_RECORDS_LIMIT`.
        The maximum number of items to return.
    log_type: (str, optional). Defaults to None.
        The log type. Can be "access", "predict" or None.

    Returns
    -------
    A pandas DataFrame with the columns `type`, `id`, `message` and `time`,
    one row per log record.
    """
    consolidated_log = self.logs(log_type=log_type)

    def prepare_log_record(log):
        """Converts a log record to ordered dict"""
        # Missing or null sections are treated as empty, so one incomplete
        # record cannot abort the whole listing.
        log_content = log.get("logContent") or {}
        record_type = (log_content.get("type") or "").split(".")[-1]
        record_data = log_content.get("data") or {}
        return collections.OrderedDict(
            [
                ("type", record_type),
                ("id", log_content.get("id")),
                ("message", record_data.get("message")),
                ("time", log_content.get("time")),
            ]
        )

    records = consolidated_log.search(
        source=self.model_deployment_id,
        time_start=time_start,
        time_end=time_end,
        limit=limit,
    )
    return pd.DataFrame([prepare_log_record(record.data) for record in records])
[docs]
def sync(self) -> "ModelDeployment":
    """Refreshes this model deployment instance from the backend.

    The deployment is fetched again by `self.model_deployment_id` and its
    values are applied to this instance.

    Returns
    -------
    ModelDeployment
        The ModelDeployment instance (self).
    """
    latest = OCIDataScienceModelDeployment.from_id(self.model_deployment_id)
    return self._update_from_oci_model(latest)
[docs]
@classmethod
def list(
    cls,
    status: str = None,
    compartment_id: str = None,
    project_id: str = None,
    **kwargs,
) -> List["ModelDeployment"]:
    """Lists the model deployments associated with current compartment id and status

    Parameters
    ----------
    status : str
        Status of deployment. Defaults to None.
        Allowed values: `ACTIVE`, `CREATING`, `DELETED`, `DELETING`, `FAILED`, `INACTIVE` and `UPDATING`.
    compartment_id : str
        Target compartment to list deployments from.
        Defaults to the compartment set in the environment variable "NB_SESSION_COMPARTMENT_OCID".
        If "NB_SESSION_COMPARTMENT_OCID" is not set, the root compartment ID will be used.
        A ValueError will be raised if root compartment ID cannot be determined.
    project_id : str
        Target project to list deployments from.
        Defaults to the project id in the environment variable "PROJECT_OCID".
    kwargs :
        The values are passed to oci.data_science.DataScienceClient.list_model_deployments.

    Returns
    -------
    list
        A list of ModelDeployment objects.
    """
    oci_deployments = OCIDataScienceModelDeployment.list(
        status=status,
        compartment_id=compartment_id,
        project_id=project_id,
        **kwargs,
    )
    result = []
    for oci_deployment in oci_deployments:
        # Each backend record populates a fresh instance of `cls`.
        result.append(cls()._update_from_oci_model(oci_deployment))
    return result
[docs]
@classmethod
def list_df(
    cls,
    status: str = None,
    compartment_id: str = None,
    project_id: str = None,
) -> pd.DataFrame:
    """Returns the model deployments associated with current compartment and status
    as a Dataframe that can be easily visualized

    Parameters
    ----------
    status : str
        Status of deployment. Defaults to None.
        Allowed values: `ACTIVE`, `CREATING`, `DELETED`, `DELETING`, `FAILED`, `INACTIVE` and `UPDATING`.
    compartment_id : str
        Target compartment to list deployments from.
        Defaults to the compartment set in the environment variable "NB_SESSION_COMPARTMENT_OCID".
        If "NB_SESSION_COMPARTMENT_OCID" is not set, the root compartment ID will be used.
        A ValueError will be raised if root compartment ID cannot be determined.
    project_id : str
        Target project to list deployments from.
        Defaults to the project id in the environment variable "PROJECT_OCID".

    Returns
    -------
    DataFrame
        pandas Dataframe with the columns `deployment_id`, `deployment_url`
        and `current_state`, one row per matching ModelDeployment.
    """
    model_deployments = cls.list(
        status=status, compartment_id=compartment_id, project_id=project_id
    )
    # Normalize a string (or missing) status to a State member. None is
    # tested by identity rather than with `==`.
    if status is None or isinstance(status, str):
        status = State._from_str(status)

    ids, urls, status_list = [], [], []
    for model_deployment in model_deployments:
        state_of_model = State._from_str(model_deployment.lifecycle_state)
        # State.UNKNOWN disables filtering; otherwise states are matched by name.
        if status == State.UNKNOWN or status.name == state_of_model.name:
            ids.append(model_deployment.model_deployment_id)
            urls.append(model_deployment.url)
            status_list.append(model_deployment.lifecycle_state)

    display = pd.DataFrame()
    display["deployment_id"] = ids
    display["deployment_url"] = urls
    display["current_state"] = status_list
    return display
[docs]
@classmethod
def from_id(cls, id: str) -> "ModelDeployment":
    """Builds a model deployment instance from the ocid of an existing deployment.

    Parameters
    ----------
    id: str
        The ocid of model deployment.

    Returns
    -------
    ModelDeployment
        A new instance of `cls` populated from the fetched deployment.
    """
    fetched = OCIDataScienceModelDeployment.from_id(id)
    instance = cls(properties=fetched)
    return instance._update_from_oci_model(fetched)
[docs]
@classmethod
def from_dict(cls, obj_dict: Dict) -> "ModelDeployment":
    """Creates a model deployment instance from a dictionary of configurations.

    The `spec` entry of `obj_dict` is deep-copied and converted to camel case.
    The infrastructure and runtime sections are turned into their dedicated
    objects according to their `type`; every other key is stored as is.

    Parameters
    ----------
    obj_dict: Dict
        A dictionary of configurations.

    Returns
    -------
    ModelDeployment
        The model deployment instance.

    Raises
    ------
    ValueError
        If `obj_dict` is not a dictionary.
    NotImplementedError
        If the infrastructure or runtime `type` is not supported.
    """
    if not isinstance(obj_dict, dict):
        raise ValueError(
            "The config data for initializing the model deployment is invalid."
        )

    raw_spec = copy.deepcopy(obj_dict.get("spec"))
    spec = ads_utils.batch_convert_case(raw_spec, "camel")

    # Section name -> {type name -> class able to rebuild that section}.
    infrastructure_classes = {
        MODEL_DEPLOYMENT_INFRASTRUCTURE_TYPE: ModelDeploymentInfrastructure,
    }
    runtime_classes = {
        ModelDeploymentRuntimeType.CONDA: ModelDeploymentCondaRuntime,
        ModelDeploymentRuntimeType.CONTAINER: ModelDeploymentContainerRuntime,
    }
    mappings = {
        cls.CONST_INFRASTRUCTURE: infrastructure_classes,
        cls.CONST_RUNTIME: runtime_classes,
    }

    model_deployment = cls()
    for key, value in spec.items():
        if key not in mappings:
            # Plain attribute: stored without conversion.
            model_deployment.set_spec(key, value)
            continue

        supported = mappings[key]
        child_config = value
        child_type = child_config.get("type")
        if child_type not in supported:
            raise NotImplementedError(
                f"{key.title()} type: {child_config.get('type')} is not supported."
            )
        model_deployment.set_spec(key, supported[child_type].from_dict(child_config))
    return model_deployment
[docs]
def to_dict(self, **kwargs) -> Dict:
    """Serializes model deployment to a dictionary.

    Spec values exposing a `to_dict` method are serialized through it. The
    spec is deep-copied first, so the instance's own spec is left untouched.

    Returns
    -------
    dict
        The model deployment serialized as a dictionary with the keys
        `kind`, `type` and `spec` (spec keys converted to camel case).
    """
    spec_copy = copy.deepcopy(self._spec)
    serialized = {
        key: value.to_dict() if hasattr(value, "to_dict") else value
        for key, value in spec_copy.items()
    }
    return {
        "kind": self.kind,
        "type": self.type,
        "spec": ads_utils.batch_convert_case(serialized, "camel"),
    }
def _update_from_oci_model(self, oci_model_instance) -> "ModelDeployment":
    """Populates this model deployment from an OCIDataScienceModelDeployment.

    Top-level attributes are copied through `self.attribute_map`, then a new
    infrastructure and a new runtime are rebuilt from the OCI model and stored
    in the spec.

    Parameters
    ----------
    oci_model_instance: OCIDataScienceModelDeployment
        The OCIDataScienceModelDeployment instance.

    Returns
    -------
    ModelDeployment
        The model deployment instance.
    """
    self.dsc_model_deployment = oci_model_instance

    # Copy every mapped attribute that the OCI model actually exposes.
    for spec_key, oci_attr in self.attribute_map.items():
        if not hasattr(oci_model_instance, oci_attr):
            continue
        self.set_spec(spec_key, getattr(oci_model_instance, oci_attr))

    infrastructure = ModelDeploymentInfrastructure()
    self._extract_from_oci_model(
        infrastructure, oci_model_instance, infrastructure.sub_level_attribute_maps
    )

    # Walk down to the environment configuration type; any missing level
    # yields None, which selects the conda runtime below.
    configuration_details = getattr(
        oci_model_instance, "model_deployment_configuration_details", None
    )
    environment_details = getattr(
        configuration_details, "environment_configuration_details", None
    )
    environment_type = getattr(
        environment_details, "environment_configuration_type", None
    )
    if environment_type == OCIModelDeploymentRuntimeType.CONTAINER:
        runtime = ModelDeploymentContainerRuntime()
    else:
        runtime = ModelDeploymentCondaRuntime()
    self._extract_from_oci_model(runtime, oci_model_instance)

    # Web concurrency is read back from the runtime environment variables.
    infrastructure.set_spec(
        infrastructure.CONST_WEB_CONCURRENCY,
        runtime.env.get("WEB_CONCURRENCY", None),
    )

    container_type = runtime.env.get("CONTAINER_TYPE", None)
    if container_type == MODEL_DEPLOYMENT_INFERENCE_SERVER_TRITON:
        runtime.set_spec(
            runtime.CONST_INFERENCE_SERVER,
            MODEL_DEPLOYMENT_INFERENCE_SERVER_TRITON.lower(),
        )

    self.set_spec(self.CONST_INFRASTRUCTURE, infrastructure)
    self.set_spec(self.CONST_RUNTIME, runtime)
    return self
@staticmethod
def _extract_from_oci_model(
    dsc_instance: Union[ModelDeploymentInfrastructure, ModelDeploymentRuntime],
    oci_model_instance: OCIDataScienceModelDeployment,
    sub_level: Dict = None,
) -> Union[ModelDeploymentInfrastructure, ModelDeploymentRuntime]:
    """Extract attributes from OCIDataScienceModelDeployment.

    For every entry of `dsc_instance.payload_attribute_map`, the value is read
    from `oci_model_instance` with `get_value` and written into
    `dsc_instance._spec`. Falsy values are skipped. When an entry lists
    several source paths, each truthy one overwrites the previous result.

    Parameters
    ----------
    dsc_instance: Union[ModelDeploymentInfrastructure, ModelDeploymentRuntime]
        The ModelDeploymentInfrastructure or ModelDeploymentRuntime instance.
    oci_model_instance: OCIDataScienceModelDeployment
        The OCIDataScienceModelDeployment instance.
    sub_level: Dict, optional
        The sub level attribute maps of ModelDeploymentInfrastructure or ModelDeploymentRuntime.
        Attributes listed here are stored as a nested dict built from the
        mapped sub attributes. Defaults to None, meaning no nested attributes.

    Returns
    -------
    Union[ModelDeploymentInfrastructure, ModelDeploymentRuntime]
        The ModelDeploymentInfrastructure or ModelDeploymentRuntime instance.
    """
    # A None sentinel replaces the former mutable `{}` default so that no
    # dict object is shared between calls.
    sub_level = {} if sub_level is None else sub_level

    for infra_attr, dsc_attr in dsc_instance.payload_attribute_map.items():
        source_paths = dsc_attr if isinstance(dsc_attr, list) else [dsc_attr]
        for source_path in source_paths:
            value = get_value(oci_model_instance, source_path)
            if not value:
                continue

            if infra_attr not in sub_level:
                dsc_instance._spec[infra_attr] = value
                continue

            # Nested attribute: start from an empty dict and keep only the
            # sub attributes that resolve to a truthy value.
            dsc_instance._spec[infra_attr] = {}
            for sub_infra_attr, sub_dsc_attr in sub_level[infra_attr].items():
                sub_value = get_value(value, sub_dsc_attr)
                if sub_value:
                    dsc_instance._spec[infra_attr][sub_infra_attr] = sub_value
    return dsc_instance
def _build_model_deployment_details(self) -> CreateModelDeploymentDetails:
    """Converts this model deployment into a CreateModelDeploymentDetails.

    Missing values fall back to defaults: a random display name, the HTTPS
    deployment mode, `COMPARTMENT_OCID` and `PROJECT_OCID`. Category log
    details are only added when no compute target details are configured.

    Returns
    -------
    CreateModelDeploymentDetails
        The CreateModelDeploymentDetails instance.

    Raises
    ------
    ValueError
        If the runtime or the infrastructure is not configured.
    """
    if not self.infrastructure or not self.runtime:
        raise ValueError(
            "Missing parameter runtime or infrastructure. Try reruning it after parameters are fully configured."
        )

    display_name = self.display_name or self._random_display_name()
    description = self.description
    defined_tags = self.defined_tags
    freeform_tags = self.freeform_tags
    deployment_mode = self.runtime.deployment_mode or ModelDeploymentMode.HTTPS
    compartment_id = self.infrastructure.compartment_id or COMPARTMENT_OCID
    project_id = self.infrastructure.project_id or PROJECT_OCID
    configuration_details = self._build_model_deployment_configuration_details()

    payload = {
        self.CONST_DISPLAY_NAME: display_name,
        self.CONST_DESCRIPTION: description,
        self.CONST_DEFINED_TAG: defined_tags,
        self.CONST_FREEFORM_TAG: freeform_tags,
        self.runtime.CONST_DEPLOYMENT_MODE: deployment_mode,
        self.infrastructure.CONST_COMPARTMENT_ID: compartment_id,
        self.infrastructure.CONST_PROJECT_ID: project_id,
        self.infrastructure.CONST_MODEL_DEPLOYMENT_CONFIG_DETAILS: configuration_details,
    }

    if not self.infrastructure.compute_target_details:
        log_details_key = self.infrastructure.CONST_CATEGORY_LOG_DETAILS
        payload[log_details_key] = self._build_category_log_details()

    oci_deployment = OCIDataScienceModelDeployment(**payload)
    return oci_deployment.to_oci_model(CreateModelDeploymentDetails)
def _update_model_deployment_details(
    self, update_type: str, **kwargs
) -> UpdateModelDeploymentDetails:
    """Converts this model deployment into an UpdateModelDeploymentDetails.

    Parameters
    ----------
    update_type: str
        Update type for the deployment. Allowed values: ['ZDT', 'LIVE'].
        'LIVE' update can only be used if model group id has been changed.
    kwargs:
        Spec overrides, forwarded to `_update_spec` before the payload is built.

    Returns
    -------
    UpdateModelDeploymentDetails
        The UpdateModelDeploymentDetails instance.

    Raises
    ------
    ValueError
        If the runtime or the infrastructure is not configured.
    """
    if not self.infrastructure or not self.runtime:
        raise ValueError(
            "Missing parameter runtime or infrastructure. Try reruning it after parameters are fully configured."
        )

    self._update_spec(**kwargs)

    payload = {
        self.CONST_DISPLAY_NAME: self.display_name,
        self.CONST_DESCRIPTION: self.description,
        self.CONST_DEFINED_TAG: self.defined_tags,
        self.CONST_FREEFORM_TAG: self.freeform_tags,
        self.infrastructure.CONST_MODEL_DEPLOYMENT_CONFIG_DETAILS: self._build_model_deployment_configuration_details(),
        self.infrastructure.CONST_CATEGORY_LOG_DETAILS: self._build_category_log_details(),
    }
    oci_deployment = OCIDataScienceModelDeployment(**payload)
    update_details: UpdateModelDeploymentDetails = oci_deployment.to_oci_model(
        UpdateModelDeploymentDetails
    )
    # The update type is set on the converted model's configuration details.
    update_details.model_deployment_configuration_details.update_type = update_type
    return update_details
def _update_spec(self, **kwargs) -> "ModelDeployment":
    """Updates model deployment specs from kwargs.

    The spec dicts of this instance, its runtime and its infrastructure are
    updated in place. Only keys already present in one of those specs are
    overwritten. Access log, predict log and shape config details are merged
    sub key by sub key; every other value is replaced by a deep copy.

    Parameters
    ----------
    kwargs:
        display_name: (str)
            Model deployment display name
        description: (str)
            Model deployment description
        freeform_tags: (dict)
            Model deployment freeform tags
        defined_tags: (dict)
            Model deployment defined tags

        Additional kwargs arguments.
        Can be any attribute that `ads.model.deployment.ModelDeploymentCondaRuntime`, `ads.model.deployment.ModelDeploymentContainerRuntime`
        and `ads.model.deployment.ModelDeploymentInfrastructure` accepts.

    Returns
    -------
    ModelDeployment
        `self` when no kwargs are given, otherwise a ModelDeployment built
        from the updated specs.
    """
    if not kwargs:
        return self

    converted_specs = ads_utils.batch_convert_case(kwargs, "camel")
    # These are references to the live spec dicts, so writes below are
    # visible on this instance as well.
    specs = {
        "self": self._spec,
        "runtime": self.runtime._spec,
        "infrastructure": self.infrastructure._spec,
    }
    merged_keys = {
        self.infrastructure.CONST_ACCESS_LOG,
        self.infrastructure.CONST_PREDICT_LOG,
        self.infrastructure.CONST_SHAPE_CONFIG_DETAILS,
    }

    for current_spec in specs.values():
        for key in current_spec:
            if key not in converted_specs:
                continue
            if key not in merged_keys:
                current_spec[key] = copy.deepcopy(converted_specs[key])
                continue
            overrides = converted_specs[key]
            for sub_key in overrides:
                camel_sub_key = ads_utils.snake_to_camel(sub_key)
                current_spec[key][camel_sub_key] = overrides[sub_key]

    deployment = ModelDeployment(spec=specs["self"])
    runtime = MODEL_DEPLOYMENT_RUNTIMES[self.runtime.type](spec=specs["runtime"])
    deployment = deployment.with_runtime(runtime)
    infrastructure = ModelDeploymentInfrastructure(spec=specs["infrastructure"])
    return deployment.with_infrastructure(infrastructure)
def _build_model_deployment_configuration_details(self) -> Dict:
"""Builds model deployment configuration details from model deployment instance.
The payload is assembled in stages: instance configuration, scaling policy,
model configuration, environment configuration and, depending on the runtime
and infrastructure settings, model group, managed compute cluster and stream
configuration.
Side effects visible in this method: a `DataScienceModel` is created when
`runtime.model_uri` does not start with "ocid", and the runtime env spec is
set again when `WEB_CONCURRENCY` or `CONTAINER_TYPE` is injected.
Returns
-------
Dict:
Dict contains model deployment configuration details.
Raises
------
ValueError
If neither `runtime.model_uri` nor `runtime.model_group_id` is set.
OSError
If the installed OCI SDK lacks the model or attribute required by a
requested feature (private endpoint, capacity reservation, environment
variables, container runtime, managed compute cluster or stream mode).
"""
infrastructure = self.infrastructure
runtime = self.runtime
# --- Instance configuration -------------------------------------------
instance_configuration = {
infrastructure.CONST_INSTANCE_SHAPE_NAME: infrastructure.shape_name
or DEFAULT_SHAPE_NAME,
}
# Shapes whose name ends with "Flex" additionally carry OCPU and memory
# settings; unset values fall back to the module defaults.
if instance_configuration[infrastructure.CONST_INSTANCE_SHAPE_NAME].endswith(
"Flex"
):
instance_configuration[
infrastructure.CONST_MODEL_DEPLOYMENT_INSTANCE_SHAPE_CONFIG_DETAILS
] = {
infrastructure.CONST_OCPUS: infrastructure.shape_config_details.get(
"ocpus", None
)
or DEFAULT_OCPUS,
# Memory is looked up under both the snake case and camel case key.
infrastructure.CONST_MEMORY_IN_GBS: infrastructure.shape_config_details.get(
"memory_in_gbs", None
)
or infrastructure.shape_config_details.get("memoryInGBs", None)
or DEFAULT_MEMORY_IN_GBS,
}
if infrastructure.subnet_id:
instance_configuration[infrastructure.CONST_SUBNET_ID] = (
infrastructure.subnet_id
)
if infrastructure.private_endpoint_id:
# Feature detection: the SDK model must expose `private_endpoint_id`.
if not hasattr(
oci.data_science.models.InstanceConfiguration, "private_endpoint_id"
):
# TODO: add oci version with private endpoint support.
raise OSError(
"Private endpoint is not supported in the current OCI SDK installed."
)
instance_configuration[infrastructure.CONST_PRIVATE_ENDPOINT_ID] = (
infrastructure.private_endpoint_id
)
# Add capacity reservation IDs if provided
if infrastructure.capacity_reservation_ids:
if not hasattr(
oci.data_science.models.InstanceConfiguration,
"capacity_reservation_ids",
):
raise OSError(
"Capacity reservation is not supported in the current OCI SDK installed. "
"Please upgrade to a newer version of the OCI SDK."
)
instance_configuration[infrastructure.CONST_CAPACITY_RESERVATION_IDS] = (
infrastructure.capacity_reservation_ids
)
def _drop_none_values(d: Dict) -> Dict:
"""Drops keys with None values from the provided dict."""
return {k: v for k, v in d.items() if v is not None}
# --- Scaling policy ---------------------------------------------------
# Fixed-size is the default. If autoscaling is configured on infrastructure,
# emit an AUTOSCALING policy (supported for both SINGLE_MODEL and MODEL_GROUP).
auto_scaling = getattr(infrastructure, "auto_scaling", None) or {}
if auto_scaling:
# The metric type is the upper-cased `scalingType` value; a missing or
# None value results in an empty string.
scaling_type = str(auto_scaling.get("scalingType", "") or "").lower()
metric_type = scaling_type.upper()
scaling_policy = {
infrastructure.CONST_POLICY_TYPE: "AUTOSCALING",
"isEnabled": auto_scaling.get("isEnabled", True),
"coolDownInSeconds": auto_scaling.get("coolDownInSeconds", None),
"autoScalingPolicies": [
_drop_none_values(
{
"autoScalingPolicyType": "THRESHOLD",
"maximumInstanceCount": auto_scaling.get(
"maximumInstanceCount", 3
),
"minimumInstanceCount": auto_scaling.get(
"minimumInstanceCount", 1
),
"initialInstanceCount": auto_scaling.get(
"initialInstanceCount",
infrastructure.replica or DEFAULT_REPLICA,
),
"rules": [
{
"metricExpressionRuleType": "PREDEFINED_EXPRESSION",
"metricType": metric_type,
"scaleInConfiguration": _drop_none_values(
{
"scalingConfigurationType": "THRESHOLD",
"threshold": auto_scaling.get(
"scaleInThreshold", 30
),
}
),
"scaleOutConfiguration": _drop_none_values(
{
"scalingConfigurationType": "THRESHOLD",
"threshold": auto_scaling.get(
"scaleOutThreshold", 70
),
}
),
}
],
}
)
],
}
# Removes top-level None entries, e.g. an unset `coolDownInSeconds`.
scaling_policy = _drop_none_values(scaling_policy)
else:
scaling_policy = {
infrastructure.CONST_POLICY_TYPE: "FIXED_SIZE",
infrastructure.CONST_INSTANCE_COUNT: infrastructure.replica
or DEFAULT_REPLICA,
}
# --- Model configuration ----------------------------------------------
if not (runtime.model_uri or runtime.model_group_id):
raise ValueError(
"Missing parameter model uri and model group id. Try reruning it after model or model group is configured."
)
model_id = runtime.model_uri
# A model uri that does not start with "ocid" is passed as the artifact of a
# new DataScienceModel, whose id is then used as the model id.
# NOTE(review): presumably such a uri is an artifact location - confirm.
if model_id and not model_id.startswith("ocid"):
# Imported locally rather than at module level.
# NOTE(review): presumably to avoid a circular import - confirm.
from ads.model.datascience_model import DataScienceModel
dsc_model = DataScienceModel(
name=self.display_name,
compartment_id=self.infrastructure.compartment_id or COMPARTMENT_OCID,
project_id=self.infrastructure.project_id or PROJECT_OCID,
artifact=runtime.model_uri,
).create(
bucket_uri=runtime.bucket_uri,
auth=runtime.auth,
region=runtime.region,
overwrite_existing_artifact=runtime.overwrite_existing_artifact,
remove_existing_artifact=runtime.remove_existing_artifact,
timeout=runtime.timeout,
)
model_id = dsc_model.id
model_configuration_details = {
infrastructure.CONST_BANDWIDTH_MBPS: infrastructure.bandwidth_mbps
or DEFAULT_BANDWIDTH_MBPS,
infrastructure.CONST_INSTANCE_CONFIG: instance_configuration,
runtime.CONST_MODEL_ID: model_id,
infrastructure.CONST_SCALING_POLICY: scaling_policy,
}
# --- Environment configuration ----------------------------------------
# The injections below only happen when the runtime already defines
# environment variables.
if runtime.env:
if not hasattr(
oci.data_science.models,
"ModelDeploymentEnvironmentConfigurationDetails",
):
raise OSError(
"Environment variable hasn't been supported in the current OCI SDK installed."
)
environment_variables = runtime.env
if infrastructure.web_concurrency:
# Stored as a string in the environment variables.
environment_variables["WEB_CONCURRENCY"] = str(
infrastructure.web_concurrency
)
runtime.set_spec(runtime.CONST_ENV, environment_variables)
# Runtimes without an `inference_server` attribute skip this branch.
if (
hasattr(runtime, "inference_server")
and runtime.inference_server
and runtime.inference_server.upper()
== MODEL_DEPLOYMENT_INFERENCE_SERVER_TRITON
):
environment_variables["CONTAINER_TYPE"] = (
MODEL_DEPLOYMENT_INFERENCE_SERVER_TRITON
)
runtime.set_spec(runtime.CONST_ENV, environment_variables)
environment_configuration_details = {
runtime.CONST_ENVIRONMENT_CONFIG_TYPE: runtime.environment_config_type,
runtime.CONST_ENVIRONMENT_VARIABLES: runtime.env,
}
# Container runtimes additionally describe the image and its ports.
if runtime.environment_config_type == OCIModelDeploymentRuntimeType.CONTAINER:
if not hasattr(
oci.data_science.models,
"OcirModelDeploymentEnvironmentConfigurationDetails",
):
raise OSError(
"Container runtime hasn't been supported in the current OCI SDK installed."
)
environment_configuration_details["image"] = runtime.image
environment_configuration_details["imageDigest"] = runtime.image_digest
environment_configuration_details["cmd"] = runtime.cmd
environment_configuration_details["entrypoint"] = runtime.entrypoint
environment_configuration_details["serverPort"] = runtime.server_port
environment_configuration_details["healthCheckPort"] = (
runtime.health_check_port
)
# --- Deployment type --------------------------------------------------
# SINGLE_MODEL is the starting point; the branches below may override it.
model_deployment_configuration_details = {
infrastructure.CONST_DEPLOYMENT_TYPE: ModelDeploymentType.SINGLE_MODEL,
infrastructure.CONST_MODEL_CONFIG_DETAILS: model_configuration_details,
runtime.CONST_ENVIRONMENT_CONFIG_DETAILS: environment_configuration_details,
}
# Model group: the deployment references the group, the instance pool
# settings are repeated under "infrastructureConfigurationDetails", and the
# single model id is removed from the model configuration.
if runtime.model_group_id:
model_deployment_configuration_details[
infrastructure.CONST_DEPLOYMENT_TYPE
] = ModelDeploymentType.MODEL_GROUP
model_deployment_configuration_details["modelGroupConfigurationDetails"] = {
runtime.CONST_MODEL_GROUP_ID: runtime.model_group_id
}
model_deployment_configuration_details[
"infrastructureConfigurationDetails"
] = {
"infrastructureType": "INSTANCE_POOL",
infrastructure.CONST_BANDWIDTH_MBPS: infrastructure.bandwidth_mbps
or DEFAULT_BANDWIDTH_MBPS,
infrastructure.CONST_INSTANCE_CONFIG: instance_configuration,
infrastructure.CONST_SCALING_POLICY: scaling_policy,
}
model_configuration_details.pop(runtime.CONST_MODEL_ID)
# Managed compute cluster: evaluated after the model group branch, so it
# overrides the deployment type and "infrastructureConfigurationDetails"
# when both are configured.
if infrastructure.compute_target_details:
if not hasattr(
oci.data_science.models,
"ManagedComputeClusterModelDeploymentResourceConfiguration",
):
raise OSError(
"Managed compute cluster is not supported in the current OCI SDK installed. "
"Please upgrade to a newer version of the OCI SDK."
)
compute_target_details = infrastructure.compute_target_details
model_deployment_configuration_details[
infrastructure.CONST_DEPLOYMENT_TYPE
] = ModelDeploymentType.SINGLE_MODEL_FLEX
model_deployment_configuration_details[
"infrastructureConfigurationDetails"
] = {
"infrastructureType": "MANAGED_COMPUTE_CLUSTER",
infrastructure.CONST_COMPUTE_TARGET_ID: compute_target_details.get(
"compute_target_id"
),
infrastructure.CONST_MODEL_DEPLOYMENT_RESOURCE_CONFIGURATION: {
infrastructure.CONST_RESOURCE_REQUEST_CONFIGURATION: {
infrastructure.CONST_OCPUS: compute_target_details.get("ocpus"),
infrastructure.CONST_MEMORY_IN_GBS: compute_target_details.get(
"memory_in_gbs"
),
infrastructure.CONST_GPUS: compute_target_details.get(
"gpu_count"
),
},
},
infrastructure.CONST_SCALING_POLICY: scaling_policy,
}
# Bandwidth, instance configuration and scaling policy are removed from
# the model configuration for this deployment type.
model_configuration_details.pop(infrastructure.CONST_BANDWIDTH_MBPS)
model_configuration_details.pop(infrastructure.CONST_INSTANCE_CONFIG)
model_configuration_details.pop(infrastructure.CONST_SCALING_POLICY)
# --- Stream configuration ---------------------------------------------
if runtime.deployment_mode == ModelDeploymentMode.STREAM:
if not hasattr(oci.data_science.models, "StreamConfigurationDetails"):
raise OSError(
"Model deployment mode hasn't been supported in the current OCI SDK installed."
)
model_deployment_configuration_details[
infrastructure.CONST_STREAM_CONFIG_DETAILS
] = {
runtime.CONST_INPUT_STREAM_IDS: runtime.input_stream_ids,
runtime.CONST_OUTPUT_STREAM_IDS: runtime.output_stream_ids,
}
return model_deployment_configuration_details
def _build_category_log_details(self) -> Dict:
    """Assembles the category log details of the model deployment.

    When the infrastructure defines both `log_group_id` and `log_id`, that
    pair is used for the access and the predict category alike. Otherwise
    each category is taken from `access_log` / `predict_log`, and a category
    is only included when both its log group id and log id are set.

    Returns
    -------
    Dict:
        Dict contains category log details.
    """
    infrastructure = self.infrastructure

    if infrastructure.log_group_id and infrastructure.log_id:
        shared_log = {
            infrastructure.CONST_LOG_GROUP_ID: infrastructure.log_group_id,
            infrastructure.CONST_LOG_ID: infrastructure.log_id,
        }
        # Both categories reference the same dict object.
        return {
            infrastructure.CONST_ACCESS: shared_log,
            infrastructure.CONST_PREDICT: shared_log,
        }

    def _complete_log(log_config):
        """Returns the log group/log id pair of `log_config`, or None if incomplete."""
        if not log_config:
            return None
        if not log_config.get(infrastructure.CONST_LOG_GROUP_ID, None):
            return None
        if not log_config.get(infrastructure.CONST_LOG_ID, None):
            return None
        return {
            infrastructure.CONST_LOG_GROUP_ID: log_config.get("logGroupId", None),
            infrastructure.CONST_LOG_ID: log_config.get("logId", None),
        }

    logs = {}
    access_details = _complete_log(infrastructure.access_log)
    if access_details is not None:
        logs[infrastructure.CONST_ACCESS] = access_details
    predict_details = _complete_log(infrastructure.predict_log)
    if predict_details is not None:
        logs[infrastructure.CONST_PREDICT] = predict_details
    return logs
def _random_display_name(self):
    """Generates a random display name.

    The name is `self._PREFIX` followed by a dash and a random resource name.
    """
    prefix = self._PREFIX
    random_name = ads_utils.get_random_name_for_resource()
    return f"{prefix}-{random_name}"
def _extract_spec_kwargs(self, **kwargs) -> Dict:
    """Picks the spec related keyword arguments out of kwargs.

    Parameters
    ----------
    kwargs
        Arbitrary keyword arguments. Only those named in
        `self.initialize_spec_attributes` are kept.

    Returns
    -------
    Dict:
        Dict contains model deployment spec related keyword arguments,
        ordered as in `self.initialize_spec_attributes`.
    """
    return {
        attribute: kwargs[attribute]
        for attribute in self.initialize_spec_attributes
        if attribute in kwargs
    }
[docs]
def build(self) -> "ModelDeployment":
    """Load default values from the environment for the job infrastructure.

    Delegates to the `build` method of the infrastructure.

    Returns
    -------
    ModelDeployment
        The ModelDeployment instance (self).

    Raises
    ------
    NotImplementedError
        If the infrastructure has no callable `build` attribute.
    """
    infrastructure_build = getattr(self.infrastructure, "build", None)
    if not (infrastructure_build and callable(infrastructure_build)):
        raise NotImplementedError
    infrastructure_build()
    return self