#!/usr/bin/env python
# -*- coding: utf-8; -*-
# Copyright (c) 2021, 2023 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
import collections
import copy
import datetime
import oci
import warnings
import time
from typing import Dict, List, Union, Any
import oci.loggingsearch
from ads.common import auth as authutil
import pandas as pd
from ads.model.serde.model_input import JsonModelInputSERDE
from ads.common.oci_logging import (
LOG_INTERVAL,
LOG_RECORDS_LIMIT,
ConsolidatedLog,
OCILog,
)
from ads.config import COMPARTMENT_OCID, PROJECT_OCID
from ads.jobs.builders.base import Builder
from ads.jobs.builders.infrastructure.utils import get_value
from ads.model.common.utils import _is_json_serializable
from ads.model.deployment.common.utils import send_request
from ads.model.deployment.model_deployment_infrastructure import (
DEFAULT_BANDWIDTH_MBPS,
DEFAULT_REPLICA,
DEFAULT_SHAPE_NAME,
DEFAULT_OCPUS,
DEFAULT_MEMORY_IN_GBS,
MODEL_DEPLOYMENT_INFRASTRUCTURE_TYPE,
ModelDeploymentInfrastructure,
)
from ads.model.deployment.model_deployment_runtime import (
ModelDeploymentCondaRuntime,
ModelDeploymentContainerRuntime,
ModelDeploymentMode,
ModelDeploymentRuntime,
ModelDeploymentRuntimeType,
OCIModelDeploymentRuntimeType,
)
from ads.model.service.oci_datascience_model_deployment import (
OCIDataScienceModelDeployment,
)
from ads.common import utils as ads_utils
from .common import utils
from .common.utils import State
from .model_deployment_properties import ModelDeploymentProperties
from oci.data_science.models import (
LogDetails,
CreateModelDeploymentDetails,
UpdateModelDeploymentDetails,
)
DEFAULT_WAIT_TIME = 1200
DEFAULT_POLL_INTERVAL = 10
DEFAULT_WORKFLOW_STEPS = 6
DELETE_WORKFLOW_STEPS = 2
DEACTIVATE_WORKFLOW_STEPS = 2
DEFAULT_RETRYING_REQUEST_ATTEMPTS = 3
MODEL_DEPLOYMENT_KIND = "deployment"
MODEL_DEPLOYMENT_TYPE = "modelDeployment"
MODEL_DEPLOYMENT_INFERENCE_SERVER_TRITON = "TRITON"
MODEL_DEPLOYMENT_RUNTIMES = {
ModelDeploymentRuntimeType.CONDA: ModelDeploymentCondaRuntime,
ModelDeploymentRuntimeType.CONTAINER: ModelDeploymentContainerRuntime,
}
[docs]
class ModelDeploymentLogType:
PREDICT = "predict"
ACCESS = "access"
[docs]
class ModelDeploymentPredictError(Exception): # pragma: no cover
pass
[docs]
class ModelDeployment(Builder):
"""
A class used to represent a Model Deployment.
Attributes
----------
config: (dict)
Deployment configuration parameters
properties: (ModelDeploymentProperties)
ModelDeploymentProperties object
workflow_state_progress: (str)
Workflow request id
workflow_steps: (int)
The number of steps in the workflow
dsc_model_deployment: (OCIDataScienceModelDeployment)
The OCIDataScienceModelDeployment instance.
state: (State)
Returns the deployment state of the current Model Deployment object
created_by: (str)
The user that creates the model deployment
lifecycle_state: (str)
Model deployment lifecycle state
lifecycle_details: (str)
Model deployment lifecycle details
time_created: (datetime)
The time when the model deployment is created
display_name: (str)
Model deployment display name
description: (str)
Model deployment description
freeform_tags: (dict)
Model deployment freeform tags
defined_tags: (dict)
Model deployment defined tags
runtime: (ModelDeploymentRuntime)
Model deployment runtime
infrastructure: (ModelDeploymentInfrastructure)
Model deployment infrastructure
Methods
-------
deploy(wait_for_completion, **kwargs)
Deploy the current Model Deployment object
delete(wait_for_completion, **kwargs)
Deletes the current Model Deployment object
update(wait_for_completion, **kwargs)
Updates a model deployment
activate(wait_for_completion, max_wait_time, poll_interval)
Activates a model deployment
deactivate(wait_for_completion, max_wait_time, poll_interval)
Deactivates a model deployment
list(status, compartment_id, project_id, **kwargs)
List model deployment within given compartment and project.
with_display_name(display_name)
Sets model deployment display name
with_description(description)
Sets model deployment description
with_freeform_tags(freeform_tags)
Sets model deployment freeform tags
with_defined_tags(defined_tags)
Sets model deployment defined tags
with_runtime(self, runtime)
Sets model deployment runtime
with_infrastructure(self, infrastructure)
Sets model deployment infrastructure
from_dict(obj_dict)
Deserializes model deployment instance from dict
from_id(id)
Loads model deployment instance from ocid
sync()
Updates the model deployment instance from backend
Examples
--------
>>> # Build model deployment from builder apis:
>>> ds_model_deployment = (ModelDeployment()
... .with_display_name("TestModelDeployment")
... .with_description("Testing the test model deployment")
... .with_freeform_tags(tag1="val1", tag2="val2")
... .with_infrastructure(
... (ModelDeploymentInfrastructure()
... .with_project_id(<project_id>)
... .with_compartment_id(<compartment_id>)
... .with_shape_name("VM.Standard.E4.Flex")
... .with_shape_config_details(
... ocpus=1,
... memory_in_gbs=16
... )
... .with_replica(1)
... .with_bandwidth_mbps(10)
... .with_web_concurrency(10)
... .with_access_log(
... log_group_id=<log_group_id>,
... log_id=<log_id>
... )
... .with_predict_log(
... log_group_id=<log_group_id>,
... log_id=<log_id>
... ))
... )
... .with_runtime(
... (ModelDeploymentContainerRuntime()
... .with_image(<image>)
... .with_image_digest(<image_digest>)
... .with_entrypoint(<entrypoint>)
... .with_server_port(<server_port>)
... .with_health_check_port(<health_check_port>)
... .with_env({"key":"value"})
... .with_deployment_mode("HTTPS_ONLY")
... .with_model_uri(<model_uri>)
... .with_bucket_uri(<bucket_uri>)
... .with_auth(<auth>)
... .with_timeout(<time_out>))
... )
... )
>>> ds_model_deployment.deploy()
>>> ds_model_deployment.status
>>> ds_model_deployment.with_display_name("new name").update()
>>> ds_model_deployment.deactivate()
>>> ds_model_deployment.sync()
>>> ds_model_deployment.list(status="ACTIVE")
>>> ds_model_deployment.delete()
>>> # Build model deployment from yaml
>>> ds_model_deployment = ModelDeployment.from_yaml(uri=<path_to_yaml>)
"""
_PREFIX = "datascience_model_deployment"
CONST_ID = "id"
CONST_CREATED_BY = "createdBy"
CONST_DISPLAY_NAME = "displayName"
CONST_DESCRIPTION = "description"
CONST_FREEFORM_TAG = "freeformTags"
CONST_DEFINED_TAG = "definedTags"
CONST_MODEL_DEPLOYMENT_URL = "modelDeploymentUrl"
CONST_INFRASTRUCTURE = "infrastructure"
CONST_RUNTIME = "runtime"
CONST_LIFECYCLE_STATE = "lifecycleState"
CONST_LIFECYCLE_DETAILS = "lifecycleDetails"
CONST_TIME_CREATED = "timeCreated"
attribute_map = {
CONST_ID: "id",
CONST_CREATED_BY: "created_by",
CONST_DISPLAY_NAME: "display_name",
CONST_DESCRIPTION: "description",
CONST_FREEFORM_TAG: "freeform_tags",
CONST_DEFINED_TAG: "defined_tags",
CONST_MODEL_DEPLOYMENT_URL: "model_deployment_url",
CONST_INFRASTRUCTURE: "infrastructure",
CONST_RUNTIME: "runtime",
CONST_LIFECYCLE_STATE: "lifecycle_state",
CONST_LIFECYCLE_DETAILS: "lifecycle_details",
CONST_TIME_CREATED: "time_created",
}
initialize_spec_attributes = [
"display_name",
"description",
"freeform_tags",
"defined_tags",
"infrastructure",
"runtime",
]
model_input_serializer = JsonModelInputSERDE()
def __init__(
self,
properties: Union[ModelDeploymentProperties, Dict] = None,
config: Dict = None,
model_deployment_id: str = None,
model_deployment_url: str = "",
spec: Dict = None,
**kwargs,
):
"""Initializes a ModelDeployment object.
Parameters
----------
properties: (Union[ModelDeploymentProperties, Dict], optional). Defaults to None.
Object containing deployment properties.
The properties can be `None` when `kwargs` are used for specifying properties.
config: (Dict, optional). Defaults to None.
ADS auth dictionary for OCI authentication.
This can be generated by calling `ads.common.auth.api_keys()` or `ads.common.auth.resource_principal()`.
If this is `None` then the `ads.common.default_signer(client_kwargs)` will be used.
model_deployment_id: (str, optional). Defaults to None.
Model deployment OCID.
model_deployment_url: (str, optional). Defaults to empty string.
Model deployment url.
spec: (dict, optional). Defaults to None.
Model deployment spec.
kwargs:
Keyword arguments for initializing `ModelDeploymentProperties` or `ModelDeployment`.
"""
if spec and properties:
raise ValueError(
"You can only pass in either `spec` or `properties` to initialize model deployment instance."
)
if config:
warnings.warn(
"Parameter `config` was deprecated in 2.8.2 from ModelDeployment constructor and will be removed in 3.0.0. Please use `ads.set_auth()` to config the auth information. "
"Check: https://accelerated-data-science.readthedocs.io/en/latest/user_guide/cli/authentication.html"
)
if properties:
warnings.warn(
"Parameter `properties` was deprecated in 2.8.2 from ModelDeployment constructor and will be removed in 3.0.0. Please use `spec` or the builder pattern to initialize model deployment instance. "
"Check: https://accelerated-data-science.readthedocs.io/en/latest/user_guide/model_registration/quick_start.html"
)
if model_deployment_url or model_deployment_id:
warnings.warn(
"Parameter `model_deployment_url` and `model_deployment_id` were deprecated in 2.8.2 from ModelDeployment constructor and will be removed in 3.0.0. These two fields will be auto-populated from the service side. "
"Check: https://accelerated-data-science.readthedocs.io/en/latest/user_guide/model_registration/quick_start.html"
)
initialize_spec = {}
initialize_spec_kwargs = {}
if spec:
initialize_spec = spec
initialize_spec_kwargs = self._extract_spec_kwargs(**kwargs)
elif not properties and not spec:
if self.CONST_INFRASTRUCTURE in kwargs or self.CONST_RUNTIME in kwargs:
initialize_spec_kwargs = self._extract_spec_kwargs(**kwargs)
super().__init__(spec=initialize_spec, **initialize_spec_kwargs)
self.properties = (
properties
if isinstance(properties, ModelDeploymentProperties)
else ModelDeploymentProperties(oci_model_deployment=properties, **kwargs)
)
self.current_state = (
State._from_str(self.properties.lifecycle_state)
if self.properties.lifecycle_state
else State.UNKNOWN
)
self._access_log = None
self._predict_log = None
self.dsc_model_deployment = OCIDataScienceModelDeployment()
@property
def kind(self) -> str:
"""The kind of the object as showing in YAML.
Returns
-------
str
deployment
"""
return MODEL_DEPLOYMENT_KIND
@property
def type(self) -> str:
"""The type of the object as showing in YAML.
Returns
-------
str
deployment
"""
return MODEL_DEPLOYMENT_TYPE
@property
def model_deployment_id(self) -> str:
"""The model deployment ocid.
Returns
-------
str
The model deployment ocid.
"""
return self.get_spec(self.CONST_ID, None)
@property
def id(self) -> str:
"""The model deployment ocid.
Returns
-------
str
The model deployment ocid.
"""
return self.get_spec(self.CONST_ID, None)
@property
def created_by(self) -> str:
"""The user that creates the model deployment.
Returns
-------
str
The user that creates the model deployment.
"""
return self.get_spec(self.CONST_CREATED_BY, None)
@property
def url(self) -> str:
"""Model deployment url.
Returns
-------
str
Model deployment url.
"""
return self.get_spec(self.CONST_MODEL_DEPLOYMENT_URL, None)
@property
def lifecycle_state(self) -> str:
"""Model deployment lifecycle state.
Returns
-------
str
Model deployment lifecycle state.
"""
return self.get_spec(self.CONST_LIFECYCLE_STATE, None)
@property
def lifecycle_details(self) -> str:
"""Model deployment lifecycle details.
Returns
-------
str
Model deployment lifecycle details.
"""
return self.get_spec(self.CONST_LIFECYCLE_DETAILS, None)
@property
def time_created(self) -> datetime:
"""The time when the model deployment is created.
Returns
-------
datetime
The time when the model deployment is created.
"""
return self.get_spec(self.CONST_TIME_CREATED, None)
@property
def display_name(self) -> str:
"""Model deployment display name.
Returns
-------
str
Model deployment display name.
"""
return self.get_spec(self.CONST_DISPLAY_NAME, None)
[docs]
def with_display_name(self, display_name: str) -> "ModelDeployment":
"""Sets the name of model deployment.
Parameters
----------
display_name: str
The name of model deployment.
Returns
-------
ModelDeployment
The ModelDeployment instance (self).
"""
return self.set_spec(self.CONST_DISPLAY_NAME, display_name)
@property
def description(self) -> str:
"""Model deployment description.
Returns
-------
str
Model deployment description.
"""
return self.get_spec(self.CONST_DESCRIPTION, None)
[docs]
def with_description(self, description: str) -> "ModelDeployment":
"""Sets the description of model deployment.
Parameters
----------
description: str
The description of model deployment.
Returns
-------
ModelDeployment
The ModelDeployment instance (self).
"""
return self.set_spec(self.CONST_DESCRIPTION, description)
@property
def freeform_tags(self) -> Dict:
"""Model deployment freeform tags.
Returns
-------
Dict
Model deployment freeform tags.
"""
return self.get_spec(self.CONST_FREEFORM_TAG, {})
@property
def defined_tags(self) -> Dict:
"""Model deployment defined tags.
Returns
-------
Dict
Model deployment defined tags.
"""
return self.get_spec(self.CONST_DEFINED_TAG, {})
@property
def runtime(self) -> "ModelDeploymentRuntime":
"""Model deployment runtime.
Returns
-------
ModelDeploymentRuntime
Model deployment runtime.
"""
return self.get_spec(self.CONST_RUNTIME, None)
[docs]
def with_runtime(self, runtime: ModelDeploymentRuntime) -> "ModelDeployment":
"""Sets the runtime of model deployment.
Parameters
----------
runtime: ModelDeploymentRuntime
The runtime of model deployment.
Returns
-------
ModelDeployment
The ModelDeployment instance (self).
"""
return self.set_spec(self.CONST_RUNTIME, runtime)
@property
def infrastructure(self) -> "ModelDeploymentInfrastructure":
"""Model deployment infrastructure.
Returns
-------
ModelDeploymentInfrastructure
Model deployment infrastructure.
"""
return self.get_spec(self.CONST_INFRASTRUCTURE, None)
[docs]
def with_infrastructure(
self, infrastructure: ModelDeploymentInfrastructure
) -> "ModelDeployment":
"""Sets the infrastructure of model deployment.
Parameters
----------
infrastructure: ModelDeploymentInfrastructure
The infrastructure of model deployment.
Returns
-------
ModelDeployment
The ModelDeployment instance (self).
"""
return self.set_spec(self.CONST_INFRASTRUCTURE, infrastructure)
[docs]
def deploy(
self,
wait_for_completion: bool = True,
max_wait_time: int = DEFAULT_WAIT_TIME,
poll_interval: int = DEFAULT_POLL_INTERVAL,
):
"""Deploys the current ModelDeployment object
Parameters
----------
wait_for_completion: bool
Flag set for whether to wait for deployment to be deployed before proceeding.
Defaults to True.
max_wait_time: int
Maximum amount of time to wait in seconds (Defaults to 1200).
Negative implies infinite wait time.
poll_interval: int
Poll interval in seconds (Defaults to 10).
Returns
-------
ModelDeployment
The instance of ModelDeployment.
"""
create_model_deployment_details = (
self._build_model_deployment_details()
if self._spec
else self.properties.build()
)
response = self.dsc_model_deployment.create(
create_model_deployment_details=create_model_deployment_details,
wait_for_completion=wait_for_completion,
max_wait_time=max_wait_time,
poll_interval=poll_interval,
)
return self._update_from_oci_model(response)
[docs]
def delete(
self,
wait_for_completion: bool = True,
max_wait_time: int = DEFAULT_WAIT_TIME,
poll_interval: int = DEFAULT_POLL_INTERVAL,
):
"""Deletes the ModelDeployment
Parameters
----------
wait_for_completion: bool
Flag set for whether to wait for deployment to be deleted before proceeding.
Defaults to True.
max_wait_time: int
Maximum amount of time to wait in seconds (Defaults to 1200).
Negative implies infinite wait time.
poll_interval: int
Poll interval in seconds (Defaults to 10).
Returns
-------
ModelDeployment
The instance of ModelDeployment.
"""
response = self.dsc_model_deployment.delete(
wait_for_completion=wait_for_completion,
max_wait_time=max_wait_time,
poll_interval=poll_interval,
)
return self._update_from_oci_model(response)
[docs]
def update(
self,
properties: Union[ModelDeploymentProperties, dict, None] = None,
wait_for_completion: bool = True,
max_wait_time: int = DEFAULT_WAIT_TIME,
poll_interval: int = DEFAULT_POLL_INTERVAL,
**kwargs,
):
"""Updates a model deployment
You can update `model_deployment_configuration_details` and change `instance_shape` and `model_id`
when the model deployment is in the ACTIVE lifecycle state.
The `bandwidth_mbps` or `instance_count` can only be updated while the model deployment is in the `INACTIVE` state.
Changes to the `bandwidth_mbps` or `instance_count` will take effect the next time
the `ActivateModelDeployment` action is invoked on the model deployment resource.
Parameters
----------
properties: ModelDeploymentProperties or dict
The properties for updating the deployment.
wait_for_completion: bool
Flag set for whether to wait for deployment to be updated before proceeding.
Defaults to True.
max_wait_time: int
Maximum amount of time to wait in seconds (Defaults to 1200).
Negative implies infinite wait time.
poll_interval: int
Poll interval in seconds (Defaults to 10).
kwargs:
dict
Returns
-------
ModelDeployment
The instance of ModelDeployment.
"""
if properties:
warnings.warn(
"Parameter `properties` is deprecated from ModelDeployment `update()` in 2.8.6 and will be removed in 3.0.0. Please use the builder pattern or kwargs to update model deployment instance. "
"Check: https://accelerated-data-science.readthedocs.io/en/latest/user_guide/model_registration/quick_start.html"
)
updated_properties = properties
if not isinstance(properties, ModelDeploymentProperties):
updated_properties = ModelDeploymentProperties(
oci_model_deployment=properties, **kwargs
)
update_model_deployment_details = (
updated_properties.to_update_deployment()
if properties or updated_properties.oci_model_deployment or kwargs
else self._update_model_deployment_details(**kwargs)
)
response = self.dsc_model_deployment.update(
update_model_deployment_details=update_model_deployment_details,
wait_for_completion=wait_for_completion,
max_wait_time=max_wait_time,
poll_interval=poll_interval,
)
return self._update_from_oci_model(response)
[docs]
def watch(
self,
log_type: str = None,
time_start: datetime = None,
interval: int = LOG_INTERVAL,
log_filter: str = None,
) -> "ModelDeployment":
"""Streams the access and/or predict logs of model deployment.
Parameters
----------
log_type: str, optional
The log type. Can be `access`, `predict` or None.
Defaults to None.
time_start : datetime.datetime, optional
Starting time for the log query.
Defaults to None.
interval : int, optional
The time interval between sending each request to pull logs from OCI logging service.
Defaults to 3.
log_filter : str, optional
Expression for filtering the logs. This will be the WHERE clause of the query.
Defaults to None.
Returns
-------
ModelDeployment
The instance of ModelDeployment.
"""
status = ""
while not self._stop_condition():
status = self._check_and_print_status(status)
time.sleep(interval)
time_start = time_start or self.time_created
try:
count = self.logs(log_type).stream(
source=self.model_deployment_id,
interval=interval,
stop_condition=self._stream_stop_condition,
time_start=time_start,
log_filter=log_filter,
)
if not count:
print(
"No logs in the last 14 days. Please reset time_start to see older logs."
)
return self.sync()
except KeyboardInterrupt:
print("Stop watching logs.")
pass
def _stop_condition(self):
"""Stops the sync once the model deployment is in a terminal state."""
return self.state in [State.ACTIVE, State.FAILED, State.DELETED, State.INACTIVE]
def _stream_stop_condition(self):
"""Stops the stream sync once the model deployment is in a terminal state."""
return self.state in [State.FAILED, State.DELETED, State.INACTIVE]
def _check_and_print_status(self, prev_status) -> str:
"""Check and print the next status.
Parameters
----------
prev_status: str
The previous model deployment status.
Returns
-------
str:
The next model deployment status.
"""
status = self._model_deployment_status_text()
if status != prev_status:
timestamp = datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
print(f"{timestamp} - {status}")
return status
def _model_deployment_status_text(self) -> str:
"""Formats the status message.
Returns
-------
str:
The model deployment life status and life cycle details.
"""
details = f", {self.lifecycle_details}" if self.lifecycle_details else ""
return f"Model Deployment {self.lifecycle_state}" + details
@property
def state(self) -> State:
"""Returns the deployment state of the current Model Deployment object"""
request_attempts = 0
while request_attempts < DEFAULT_RETRYING_REQUEST_ATTEMPTS:
request_attempts += 1
try:
self.current_state = State._from_str(self.sync().lifecycle_state)
break
except:
pass
time.sleep(1)
return self.current_state
@property
def status(self) -> State:
"""Returns the deployment state of the current Model Deployment object"""
return self.state
[docs]
def predict(
self,
json_input=None,
data: Any = None,
serializer: "ads.model.ModelInputSerializer" = model_input_serializer,
auto_serialize_data: bool = False,
model_name: str = None,
model_version: str = None,
**kwargs,
) -> dict:
"""Returns prediction of input data run against the model deployment endpoint.
Examples
--------
>>> import numpy as np
>>> from ads.model import ModelInputSerializer
>>> class MySerializer(ModelInputSerializer):
... def serialize(self, data):
... serialized_data = 1
... return serialized_data
>>> model_deployment = ModelDeployment.from_id(<model_deployment_id>)
>>> prediction = model_deployment.predict(
... data=np.array([1, 2, 3]),
... serializer=MySerializer(),
... auto_serialize_data=True,
... )['prediction']
Parameters
----------
json_input: Json serializable
JSON payload for the prediction.
data: Any
Data for the prediction.
serializer: ads.model.ModelInputSerializer
Defaults to ads.model.JsonModelInputSerializer.
auto_serialize_data: bool
Defaults to False. Indicate whether to auto serialize input data using `serializer`.
If `auto_serialize_data=False`, `data` required to be bytes or json serializable
and `json_input` required to be json serializable. If `auto_serialize_data` set
to True, data will be serialized before sending to model deployment endpoint.
model_name: str
Defaults to None. When the `inference_server="triton"`, the name of the model to invoke.
model_version: str
Defaults to None. When the `inference_server="triton"`, the version of the model to invoke.
kwargs:
content_type: str
Used to indicate the media type of the resource.
By default, it will be `application/octet-stream` for bytes input and `application/json` otherwise.
The content-type header will be set to this value when calling the model deployment endpoint.
Returns
-------
dict:
Prediction results.
"""
current_state = self.sync().lifecycle_state
if current_state != State.ACTIVE.name:
raise ModelDeploymentPredictError(
"This model deployment is not in active state, you will not be able to use predict end point. "
f"Current model deployment state: {current_state} "
)
endpoint = f"{self.url}/predict"
signer = authutil.default_signer()["signer"]
header = {
"signer": signer,
"content_type": kwargs.get("content_type", None),
}
header.update(kwargs.pop("headers", {}))
if data is None and json_input is None:
raise AttributeError(
"Neither `data` nor `json_input` are provided. You need to provide one of them."
)
if data is not None and json_input is not None:
raise AttributeError(
"`data` and `json_input` are both provided. You can only use one of them."
)
try:
if auto_serialize_data:
data = data or json_input
serialized_data = serializer.serialize(data=data)
return send_request(
data=serialized_data,
endpoint=endpoint,
is_json_payload=_is_json_serializable(serialized_data),
header=header,
)
if json_input is not None:
if not _is_json_serializable(json_input):
raise ValueError(
"`json_input` must be json serializable. "
"Set `auto_serialize_data` to True, or serialize the provided input data first,"
"or using `data` to pass binary data."
)
utils.get_logger().warning(
"The `json_input` argument of `predict()` will be deprecated soon. "
"Please use `data` argument. "
)
data = json_input
is_json_payload = _is_json_serializable(data)
if not isinstance(data, bytes) and not is_json_payload:
raise TypeError(
"`data` is not bytes or json serializable. Set `auto_serialize_data` to `True` to serialize the input data."
)
if model_name and model_version:
header["model-name"] = model_name
header["model-version"] = model_version
elif bool(model_version) ^ bool(model_name):
raise ValueError(
"`model_name` and `model_version` have to be provided together."
)
prediction = send_request(
data=data,
endpoint=endpoint,
is_json_payload=is_json_payload,
header=header,
)
return prediction
except oci.exceptions.ServiceError as ex:
# When bandwidth exceeds the allocated value, TooManyRequests error (429) will be raised by oci backend.
if ex.status == 429:
bandwidth_mbps = self.infrastructure.bandwidth_mbps or DEFAULT_BANDWIDTH_MBPS
utils.get_logger().warning(
f"Load balancer bandwidth exceeds the allocated {bandwidth_mbps} Mbps."
"To estimate the actual bandwidth, use formula: (payload size in KB) * (estimated requests per second) * 8 / 1024."
"To resolve the issue, try sizing down the payload, slowing down the request rate or increasing the allocated bandwidth."
)
raise
[docs]
def activate(
self,
wait_for_completion: bool = True,
max_wait_time: int = DEFAULT_WAIT_TIME,
poll_interval: int = DEFAULT_POLL_INTERVAL,
) -> "ModelDeployment":
"""Activates a model deployment
Parameters
----------
wait_for_completion: bool
Flag set for whether to wait for deployment to be activated before proceeding.
Defaults to True.
max_wait_time: int
Maximum amount of time to wait in seconds (Defaults to 1200).
Negative implies infinite wait time.
poll_interval: int
Poll interval in seconds (Defaults to 10).
Returns
-------
ModelDeployment
The instance of ModelDeployment.
"""
response = self.dsc_model_deployment.activate(
wait_for_completion=wait_for_completion,
max_wait_time=max_wait_time,
poll_interval=poll_interval,
)
return self._update_from_oci_model(response)
[docs]
def deactivate(
self,
wait_for_completion: bool = True,
max_wait_time: int = DEFAULT_WAIT_TIME,
poll_interval: int = DEFAULT_POLL_INTERVAL,
) -> "ModelDeployment":
"""Deactivates a model deployment
Parameters
----------
wait_for_completion: bool
Flag set for whether to wait for deployment to be deactivated before proceeding.
Defaults to True.
max_wait_time: int
Maximum amount of time to wait in seconds (Defaults to 1200).
Negative implies infinite wait time.
poll_interval: int
Poll interval in seconds (Defaults to 10).
Returns
-------
ModelDeployment
The instance of ModelDeployment.
"""
response = self.dsc_model_deployment.deactivate(
wait_for_completion=wait_for_completion,
max_wait_time=max_wait_time,
poll_interval=poll_interval,
)
return self._update_from_oci_model(response)
def _log_details(self, log_type: str = ModelDeploymentLogType.ACCESS):
"""Gets log details for the provided `log_type`.
Properties
----------
log_type: (str, optional). Defaults to "access".
The log type. Can be "access" or "predict".
Returns
-------
oci.datascience_model.CategoryLogDetails
Category log details of the ModelDeployment.
Raises
------
AttributeError
Deployment doesn't have requested log configuration.
"""
if self.properties.category_log_details and getattr(
self.properties.category_log_details, log_type
):
return getattr(self.properties.category_log_details, log_type)
elif self.infrastructure:
category_log_details = self._build_category_log_details()
log = category_log_details.get(log_type, None)
if log and log.get("logId", None) and log.get("logGroupId", None):
return LogDetails(
log_id=log.get("logId"), log_group_id=log.get("logGroupId")
)
raise LogNotConfiguredError(
f"Deployment `{self.model_deployment_id}` "
f"has no `{log_type}` log configuration."
)
@property
def predict_log(self) -> OCILog:
"""Gets the model deployment predict logs object.
Returns
-------
OCILog
The OCILog object containing the predict logs.
"""
if not self._predict_log:
log_details = self._log_details(log_type=ModelDeploymentLogType.PREDICT)
compartment_id = (
self.infrastructure.compartment_id
if self.infrastructure
else self.properties.compartment_id
)
self._predict_log = OCILog(
compartment_id=compartment_id or COMPARTMENT_OCID,
id=log_details.log_id,
log_group_id=log_details.log_group_id,
source=self.model_deployment_id,
annotation=ModelDeploymentLogType.PREDICT,
)
return self._predict_log
@property
def access_log(self) -> OCILog:
"""Gets the model deployment access logs object.
Returns
-------
OCILog
The OCILog object containing the access logs.
"""
if not self._access_log:
log_details = self._log_details(log_type=ModelDeploymentLogType.ACCESS)
compartment_id = (
self.infrastructure.compartment_id
if self.infrastructure
else self.properties.compartment_id
)
self._access_log = OCILog(
compartment_id=compartment_id or COMPARTMENT_OCID,
id=log_details.log_id,
log_group_id=log_details.log_group_id,
source=self.model_deployment_id,
annotation=ModelDeploymentLogType.ACCESS,
)
return self._access_log
[docs]
def logs(self, log_type: str = None) -> ConsolidatedLog:
"""Gets the access or predict logs.
Parameters
----------
log_type: (str, optional). Defaults to None.
The log type. Can be "access", "predict" or None.
Returns
-------
ConsolidatedLog
The ConsolidatedLog object containing the logs.
"""
loggers = []
if not log_type:
try:
loggers.append(self.access_log)
except LogNotConfiguredError:
pass
try:
loggers.append(self.predict_log)
except LogNotConfiguredError:
pass
if not loggers:
raise LogNotConfiguredError(
"Neither `predict` nor `access` log was configured for the model deployment."
)
elif log_type == ModelDeploymentLogType.ACCESS:
loggers = [self.access_log]
elif log_type == ModelDeploymentLogType.PREDICT:
loggers = [self.predict_log]
else:
raise ValueError(
"Parameter log_type should be either access, predict or None."
)
return ConsolidatedLog(*loggers)
[docs]
def show_logs(
self,
time_start: datetime.datetime = None,
time_end: datetime.datetime = None,
limit: int = LOG_RECORDS_LIMIT,
log_type: str = None,
):
"""Shows deployment logs as a pandas dataframe.
Parameters
----------
time_start: (datetime.datetime, optional). Defaults to None.
Starting date and time in RFC3339 format for retrieving logs.
Defaults to None. Logs will be retrieved 14 days from now.
time_end: (datetime.datetime, optional). Defaults to None.
Ending date and time in RFC3339 format for retrieving logs.
Defaults to None. Logs will be retrieved until now.
limit: (int, optional). Defaults to 100.
The maximum number of items to return.
log_type: (str, optional). Defaults to None.
The log type. Can be "access", "predict" or None.
Returns
-------
A pandas DataFrame containing logs.
"""
logging = self.logs(log_type=log_type)
def prepare_log_record(log):
"""Converts a log record to ordered dict"""
log_content = log.get("logContent", {})
return collections.OrderedDict(
[
("type", log_content.get("type").split(".")[-1]),
("id", log_content.get("id")),
("message", log_content.get("data", {}).get("message")),
("time", log_content.get("time")),
]
)
logs = logging.search(
source=self.model_deployment_id,
time_start=time_start,
time_end=time_end,
limit=limit,
)
return pd.DataFrame([prepare_log_record(log.data) for log in logs])
[docs]
def sync(self) -> "ModelDeployment":
"""Updates the model deployment instance from backend.
Returns
-------
ModelDeployment
The ModelDeployment instance (self).
"""
return self._update_from_oci_model(
OCIDataScienceModelDeployment.from_id(self.model_deployment_id)
)
[docs]
@classmethod
def list(
cls,
status: str = None,
compartment_id: str = None,
project_id: str = None,
**kwargs,
) -> List["ModelDeployment"]:
"""Lists the model deployments associated with current compartment id and status
Parameters
----------
status : str
Status of deployment. Defaults to None.
Allowed values: `ACTIVE`, `CREATING`, `DELETED`, `DELETING`, `FAILED`, `INACTIVE` and `UPDATING`.
compartment_id : str
Target compartment to list deployments from.
Defaults to the compartment set in the environment variable "NB_SESSION_COMPARTMENT_OCID".
If "NB_SESSION_COMPARTMENT_OCID" is not set, the root compartment ID will be used.
An ValueError will be raised if root compartment ID cannot be determined.
project_id : str
Target project to list deployments from.
Defaults to the project id in the environment variable "PROJECT_OCID".
kwargs :
The values are passed to oci.data_science.DataScienceClient.list_model_deployments.
Returns
-------
list
A list of ModelDeployment objects.
"""
deployments = OCIDataScienceModelDeployment.list(
status=status,
compartment_id=compartment_id,
project_id=project_id,
**kwargs,
)
return [cls()._update_from_oci_model(deployment) for deployment in deployments]
[docs]
@classmethod
def list_df(
cls,
status: str = None,
compartment_id: str = None,
project_id: str = None,
) -> pd.DataFrame:
"""Returns the model deployments associated with current compartment and status
as a Dataframe that can be easily visualized
Parameters
----------
status : str
Status of deployment. Defaults to None.
Allowed values: `ACTIVE`, `CREATING`, `DELETED`, `DELETING`, `FAILED`, `INACTIVE` and `UPDATING`.
compartment_id : str
Target compartment to list deployments from.
Defaults to the compartment set in the environment variable "NB_SESSION_COMPARTMENT_OCID".
If "NB_SESSION_COMPARTMENT_OCID" is not set, the root compartment ID will be used.
An ValueError will be raised if root compartment ID cannot be determined.
project_id : str
Target project to list deployments from.
Defaults to the project id in the environment variable "PROJECT_OCID".
Returns
-------
DataFrame
pandas Dataframe containing information about the ModelDeployments
"""
model_deployments = cls.list(
status=status, compartment_id=compartment_id, project_id=project_id
)
if isinstance(status, str) or status == None:
status = State._from_str(status)
display = pd.DataFrame()
ids, urls, status_list = [], [], []
for model_deployment in model_deployments:
state_of_model = State._from_str(model_deployment.lifecycle_state)
if status == State.UNKNOWN or status.name == state_of_model.name:
ids.append(model_deployment.model_deployment_id)
urls.append(model_deployment.url)
status_list.append(model_deployment.lifecycle_state)
display["deployment_id"] = ids
display["deployment_url"] = urls
display["current_state"] = status_list
return display
[docs]
@classmethod
def from_id(cls, id: str) -> "ModelDeployment":
"""Loads the model deployment instance from ocid.
Parameters
----------
id: str
The ocid of model deployment.
Returns
-------
ModelDeployment
The ModelDeployment instance (self).
"""
oci_model = OCIDataScienceModelDeployment.from_id(id)
return cls(properties=oci_model)._update_from_oci_model(oci_model)
[docs]
@classmethod
def from_dict(cls, obj_dict: Dict) -> "ModelDeployment":
"""Loads model deployment instance from a dictionary of configurations.
Parameters
----------
obj_dict: Dict
A dictionary of configurations.
Returns
-------
ModelDeployment
The model deployment instance.
"""
if not isinstance(obj_dict, dict):
raise ValueError(
"The config data for initializing the model deployment is invalid."
)
spec = ads_utils.batch_convert_case(
copy.deepcopy(obj_dict.get("spec")), "camel"
)
mappings = {
cls.CONST_INFRASTRUCTURE: {
MODEL_DEPLOYMENT_INFRASTRUCTURE_TYPE: ModelDeploymentInfrastructure,
},
cls.CONST_RUNTIME: {
ModelDeploymentRuntimeType.CONDA: ModelDeploymentCondaRuntime,
ModelDeploymentRuntimeType.CONTAINER: ModelDeploymentContainerRuntime,
},
}
model_deployment = cls()
for key, value in spec.items():
if key in mappings:
mapping = mappings[key]
child_config = value
if child_config.get("type") not in mapping:
raise NotImplementedError(
f"{key.title()} type: {child_config.get('type')} is not supported."
)
model_deployment.set_spec(
key, mapping[child_config.get("type")].from_dict(child_config)
)
else:
model_deployment.set_spec(key, value)
return model_deployment
[docs]
def to_dict(self, **kwargs) -> Dict:
"""Serializes model deployment to a dictionary.
Returns
-------
dict
The model deployment serialized as a dictionary.
"""
spec = copy.deepcopy(self._spec)
for key, value in spec.items():
if hasattr(value, "to_dict"):
value = value.to_dict()
spec[key] = value
return {
"kind": self.kind,
"type": self.type,
"spec": ads_utils.batch_convert_case(spec, "camel"),
}
def _update_from_oci_model(self, oci_model_instance) -> "ModelDeployment":
"""Updates model deployment instance from OCIDataScienceModelDeployment.
Parameters
----------
oci_model_instance: OCIDataScienceModelDeployment
The OCIDataScienceModelDeployment instance.
Returns
-------
ModelDeployment
The model deployment instance.
"""
self.dsc_model_deployment = oci_model_instance
for key, value in self.attribute_map.items():
if hasattr(oci_model_instance, value):
self.set_spec(key, getattr(oci_model_instance, value))
infrastructure = ModelDeploymentInfrastructure()
self._extract_from_oci_model(
infrastructure, oci_model_instance, infrastructure.sub_level_attribute_maps
)
model_deployment_configuration_details = getattr(
oci_model_instance, "model_deployment_configuration_details", None
)
environment_configuration_details = getattr(
model_deployment_configuration_details,
"environment_configuration_details",
None,
)
runtime = (
ModelDeploymentContainerRuntime()
if getattr(
environment_configuration_details,
"environment_configuration_type",
None,
)
== OCIModelDeploymentRuntimeType.CONTAINER
else ModelDeploymentCondaRuntime()
)
self._extract_from_oci_model(runtime, oci_model_instance)
infrastructure.set_spec(
infrastructure.CONST_WEB_CONCURRENCY,
runtime.env.get("WEB_CONCURRENCY", None),
)
if (
runtime.env.get("CONTAINER_TYPE", None)
== MODEL_DEPLOYMENT_INFERENCE_SERVER_TRITON
):
runtime.set_spec(
runtime.CONST_INFERENCE_SERVER,
MODEL_DEPLOYMENT_INFERENCE_SERVER_TRITON.lower(),
)
self.set_spec(self.CONST_INFRASTRUCTURE, infrastructure)
self.set_spec(self.CONST_RUNTIME, runtime)
return self
@staticmethod
def _extract_from_oci_model(
dsc_instance: Union[ModelDeploymentInfrastructure, ModelDeploymentRuntime],
oci_model_instance: OCIDataScienceModelDeployment,
sub_level: Dict = {},
) -> Union[ModelDeploymentInfrastructure, ModelDeploymentRuntime]:
"""Extract attributes from OCIDataScienceModelDeployment.
Parameters
----------
dsc_instance: Union[ModelDeploymentInfrastructure, ModelDeploymentRuntime]
The ModelDeploymentInfrastructure or ModelDeploymentRuntime instance.
oci_model_instance: OCIDataScienceModelDeployment
The OCIDataScienceModelDeployment instance.
sub_level: Dict
The sub level attribute maps of ModelDeploymentInfrastructure or ModelDeploymentRuntime
Returns
-------
Union[ModelDeploymentInfrastructure, ModelDeploymentRuntime]
The ModelDeploymentInfrastructure or ModelDeploymentRuntime instance.
"""
for infra_attr, dsc_attr in dsc_instance.payload_attribute_map.items():
value = get_value(oci_model_instance, dsc_attr)
if value:
if infra_attr not in sub_level:
dsc_instance._spec[infra_attr] = value
else:
dsc_instance._spec[infra_attr] = {}
for sub_infra_attr, sub_dsc_attr in sub_level[infra_attr].items():
sub_value = get_value(value, sub_dsc_attr)
if sub_value:
dsc_instance._spec[infra_attr][sub_infra_attr] = sub_value
return dsc_instance
def _build_model_deployment_details(self) -> CreateModelDeploymentDetails:
"""Builds CreateModelDeploymentDetails from model deployment instance.
Returns
-------
CreateModelDeploymentDetails
The CreateModelDeploymentDetails instance.
"""
if not (self.infrastructure and self.runtime):
raise ValueError(
"Missing parameter runtime or infrastructure. Try reruning it after parameters are fully configured."
)
create_model_deployment_details = {
self.CONST_DISPLAY_NAME: self.display_name or self._random_display_name(),
self.CONST_DESCRIPTION: self.description,
self.CONST_DEFINED_TAG: self.defined_tags,
self.CONST_FREEFORM_TAG: self.freeform_tags,
self.runtime.CONST_DEPLOYMENT_MODE: self.runtime.deployment_mode
or ModelDeploymentMode.HTTPS,
self.infrastructure.CONST_COMPARTMENT_ID: self.infrastructure.compartment_id
or COMPARTMENT_OCID,
self.infrastructure.CONST_PROJECT_ID: self.infrastructure.project_id
or PROJECT_OCID,
self.infrastructure.CONST_MODEL_DEPLOYMENT_CONFIG_DETAILS: self._build_model_deployment_configuration_details(),
self.infrastructure.CONST_CATEGORY_LOG_DETAILS: self._build_category_log_details(),
}
return OCIDataScienceModelDeployment(
**create_model_deployment_details
).to_oci_model(CreateModelDeploymentDetails)
def _update_model_deployment_details(
self, **kwargs
) -> UpdateModelDeploymentDetails:
"""Builds UpdateModelDeploymentDetails from model deployment instance.
Returns
-------
UpdateModelDeploymentDetails
The UpdateModelDeploymentDetails instance.
"""
if not (self.infrastructure and self.runtime):
raise ValueError(
"Missing parameter runtime or infrastructure. Try reruning it after parameters are fully configured."
)
self._update_spec(**kwargs)
update_model_deployment_details = {
self.CONST_DISPLAY_NAME: self.display_name,
self.CONST_DESCRIPTION: self.description,
self.CONST_DEFINED_TAG: self.defined_tags,
self.CONST_FREEFORM_TAG: self.freeform_tags,
self.infrastructure.CONST_MODEL_DEPLOYMENT_CONFIG_DETAILS: self._build_model_deployment_configuration_details(),
self.infrastructure.CONST_CATEGORY_LOG_DETAILS: self._build_category_log_details(),
}
return OCIDataScienceModelDeployment(
**update_model_deployment_details
).to_oci_model(UpdateModelDeploymentDetails)
def _update_spec(self, **kwargs) -> "ModelDeployment":
"""Updates model deployment specs from kwargs.
Parameters
----------
kwargs:
display_name: (str)
Model deployment display name
description: (str)
Model deployment description
freeform_tags: (dict)
Model deployment freeform tags
defined_tags: (dict)
Model deployment defined tags
Additional kwargs arguments.
Can be any attribute that `ads.model.deployment.ModelDeploymentCondaRuntime`, `ads.model.deployment.ModelDeploymentContainerRuntime`
and `ads.model.deployment.ModelDeploymentInfrastructure` accepts.
Returns
-------
ModelDeployment
The instance of ModelDeployment.
"""
if not kwargs:
return self
converted_specs = ads_utils.batch_convert_case(kwargs, "camel")
specs = {
"self": self._spec,
"runtime": self.runtime._spec,
"infrastructure": self.infrastructure._spec,
}
sub_set = {
self.infrastructure.CONST_ACCESS_LOG,
self.infrastructure.CONST_PREDICT_LOG,
self.infrastructure.CONST_SHAPE_CONFIG_DETAILS,
}
for spec_value in specs.values():
for key in spec_value:
if key in converted_specs:
if key in sub_set:
for sub_key in converted_specs[key]:
converted_sub_key = ads_utils.snake_to_camel(sub_key)
spec_value[key][converted_sub_key] = converted_specs[key][
sub_key
]
else:
spec_value[key] = copy.deepcopy(converted_specs[key])
self = (
ModelDeployment(spec=specs["self"])
.with_runtime(
MODEL_DEPLOYMENT_RUNTIMES[self.runtime.type](spec=specs["runtime"])
)
.with_infrastructure(
ModelDeploymentInfrastructure(spec=specs["infrastructure"])
)
)
return self
def _build_model_deployment_configuration_details(self) -> Dict:
"""Builds model deployment configuration details from model deployment instance.
Returns
-------
Dict:
Dict contains model deployment configuration details.
"""
infrastructure = self.infrastructure
runtime = self.runtime
instance_configuration = {
infrastructure.CONST_INSTANCE_SHAPE_NAME: infrastructure.shape_name
or DEFAULT_SHAPE_NAME,
}
if instance_configuration[infrastructure.CONST_INSTANCE_SHAPE_NAME].endswith(
"Flex"
):
instance_configuration[
infrastructure.CONST_MODEL_DEPLOYMENT_INSTANCE_SHAPE_CONFIG_DETAILS
] = {
infrastructure.CONST_OCPUS: infrastructure.shape_config_details.get(
"ocpus", None
)
or DEFAULT_OCPUS,
infrastructure.CONST_MEMORY_IN_GBS: infrastructure.shape_config_details.get(
"memory_in_gbs", None
)
or infrastructure.shape_config_details.get("memoryInGBs", None)
or DEFAULT_MEMORY_IN_GBS,
}
if infrastructure.subnet_id:
instance_configuration[
infrastructure.CONST_SUBNET_ID
] = infrastructure.subnet_id
scaling_policy = {
infrastructure.CONST_POLICY_TYPE: "FIXED_SIZE",
infrastructure.CONST_INSTANCE_COUNT: infrastructure.replica
or DEFAULT_REPLICA,
}
if not runtime.model_uri:
raise ValueError(
"Missing parameter model uri. Try reruning it after model uri is configured."
)
model_id = runtime.model_uri
if not model_id.startswith("ocid"):
from ads.model.datascience_model import DataScienceModel
dsc_model = DataScienceModel(
name=self.display_name,
compartment_id=self.infrastructure.compartment_id or COMPARTMENT_OCID,
project_id=self.infrastructure.project_id or PROJECT_OCID,
artifact=runtime.model_uri,
).create(
bucket_uri=runtime.bucket_uri,
auth=runtime.auth,
region=runtime.region,
overwrite_existing_artifact=runtime.overwrite_existing_artifact,
remove_existing_artifact=runtime.remove_existing_artifact,
timeout=runtime.timeout,
)
model_id = dsc_model.id
model_configuration_details = {
infrastructure.CONST_BANDWIDTH_MBPS: infrastructure.bandwidth_mbps
or DEFAULT_BANDWIDTH_MBPS,
infrastructure.CONST_INSTANCE_CONFIG: instance_configuration,
runtime.CONST_MODEL_ID: model_id,
infrastructure.CONST_SCALING_POLICY: scaling_policy,
}
if runtime.env:
if not hasattr(
oci.data_science.models,
"ModelDeploymentEnvironmentConfigurationDetails",
):
raise EnvironmentError(
"Environment variable hasn't been supported in the current OCI SDK installed."
)
environment_variables = runtime.env
if infrastructure.web_concurrency:
environment_variables["WEB_CONCURRENCY"] = str(
infrastructure.web_concurrency
)
runtime.set_spec(runtime.CONST_ENV, environment_variables)
if (
hasattr(runtime, "inference_server")
and runtime.inference_server
and runtime.inference_server.upper()
== MODEL_DEPLOYMENT_INFERENCE_SERVER_TRITON
):
environment_variables[
"CONTAINER_TYPE"
] = MODEL_DEPLOYMENT_INFERENCE_SERVER_TRITON
runtime.set_spec(runtime.CONST_ENV, environment_variables)
environment_configuration_details = {
runtime.CONST_ENVIRONMENT_CONFIG_TYPE: runtime.environment_config_type,
runtime.CONST_ENVIRONMENT_VARIABLES: runtime.env,
}
if runtime.environment_config_type == OCIModelDeploymentRuntimeType.CONTAINER:
if not hasattr(
oci.data_science.models,
"OcirModelDeploymentEnvironmentConfigurationDetails",
):
raise EnvironmentError(
"Container runtime hasn't been supported in the current OCI SDK installed."
)
environment_configuration_details["image"] = runtime.image
environment_configuration_details["imageDigest"] = runtime.image_digest
environment_configuration_details["cmd"] = runtime.cmd
environment_configuration_details["entrypoint"] = runtime.entrypoint
environment_configuration_details["serverPort"] = runtime.server_port
environment_configuration_details[
"healthCheckPort"
] = runtime.health_check_port
model_deployment_configuration_details = {
infrastructure.CONST_DEPLOYMENT_TYPE: "SINGLE_MODEL",
infrastructure.CONST_MODEL_CONFIG_DETAILS: model_configuration_details,
runtime.CONST_ENVIRONMENT_CONFIG_DETAILS: environment_configuration_details,
}
if runtime.deployment_mode == ModelDeploymentMode.STREAM:
if not hasattr(oci.data_science.models, "StreamConfigurationDetails"):
raise EnvironmentError(
"Model deployment mode hasn't been supported in the current OCI SDK installed."
)
model_deployment_configuration_details[
infrastructure.CONST_STREAM_CONFIG_DETAILS
] = {
runtime.CONST_INPUT_STREAM_IDS: runtime.input_stream_ids,
runtime.CONST_OUTPUT_STREAM_IDS: runtime.output_stream_ids,
}
return model_deployment_configuration_details
def _build_category_log_details(self) -> Dict:
"""Builds category log details from model deployment instance.
Returns
-------
Dict:
Dict contains category log details.
"""
if self.infrastructure.log_group_id and self.infrastructure.log_id:
log_group_details = {
self.infrastructure.CONST_LOG_GROUP_ID: self.infrastructure.log_group_id,
self.infrastructure.CONST_LOG_ID: self.infrastructure.log_id,
}
return {
self.infrastructure.CONST_ACCESS: log_group_details,
self.infrastructure.CONST_PREDICT: log_group_details,
}
logs = {}
if (
self.infrastructure.access_log and
self.infrastructure.access_log.get(self.infrastructure.CONST_LOG_GROUP_ID, None)
and self.infrastructure.access_log.get(self.infrastructure.CONST_LOG_ID, None)
):
logs[self.infrastructure.CONST_ACCESS] = {
self.infrastructure.CONST_LOG_GROUP_ID: self.infrastructure.access_log.get(
"logGroupId", None
),
self.infrastructure.CONST_LOG_ID: self.infrastructure.access_log.get(
"logId", None
),
}
if (
self.infrastructure.predict_log and
self.infrastructure.predict_log.get(self.infrastructure.CONST_LOG_GROUP_ID, None)
and self.infrastructure.predict_log.get(self.infrastructure.CONST_LOG_ID, None)
):
logs[self.infrastructure.CONST_PREDICT] = {
self.infrastructure.CONST_LOG_GROUP_ID: self.infrastructure.predict_log.get(
"logGroupId", None
),
self.infrastructure.CONST_LOG_ID: self.infrastructure.predict_log.get(
"logId", None
),
}
return logs
def _random_display_name(self):
"""Generates a random display name."""
return f"{self._PREFIX}-{ads_utils.get_random_name_for_resource()}"
def _extract_spec_kwargs(self, **kwargs) -> Dict:
"""Extract spec related keyword arguments from kwargs.
Parameters
----------
kwargs
Returns
-------
Dict:
Dict contains model deployment spec related keyword arguments.
"""
spec_kwargs = {}
for attribute in self.initialize_spec_attributes:
if attribute in kwargs:
spec_kwargs[attribute] = kwargs[attribute]
return spec_kwargs
[docs]
def build(self) -> "ModelDeployment":
"""Load default values from the environment for the job infrastructure."""
build_method = getattr(self.infrastructure, "build", None)
if build_method and callable(build_method):
build_method()
else:
raise NotImplementedError
return self