Customizing the Model

Customize score.py

Here is an example of preparing a model artifact for a TensorFlow model trained on the MNIST dataset. The final layer of the model produces 10 values, one for each digit, so the default score.py returns an array of 10 elements for each input vector. Suppose you want to change the default behavior of the predict function in score.py so that it returns the most likely digit instead of a probability distribution over all the digits. To do this, return the position of the maximum value within the output array. Here are the steps to customize score.py:

Step 1: Train your estimator and generate the model artifact as shown below:

from ads.catalog.model import ModelCatalog
from ads.model.framework.tensorflow_model import TensorFlowModel
import tensorflow as tf
from uuid import uuid4
from ads.common.model_metadata import UseCaseType

mnist = tf.keras.datasets.mnist
(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train, x_test = x_train / 255.0, x_test / 255.0

tf_estimator = tf.keras.models.Sequential(
        [
            tf.keras.layers.Flatten(input_shape=(28, 28)),
            tf.keras.layers.Dense(128, activation="relu"),
            tf.keras.layers.Dropout(0.2),
            tf.keras.layers.Dense(10),
        ]
    )
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
tf_estimator.compile(optimizer="adam", loss=loss_fn, metrics=["accuracy"])
tf_estimator.fit(x_train, y_train, epochs=1)

tf_model = TensorFlowModel(tf_estimator, artifact_dir=f"./model-artifact-{str(uuid4())}")

# Autogenerate score.py, pickled model, runtime.yaml, input_schema.json and output_schema.json
tf_model.prepare(
    inference_conda_env="generalml_p38_cpu_v1",
    use_case_type=UseCaseType.MULTINOMIAL_CLASSIFICATION,
    X_sample=x_train,
    y_sample=y_train,
)

Verify the output produced by the autogenerated score.py by calling verify() on the model object:

print(tf_model.verify(x_test[:3])['prediction'])
[[-2.9461750984191895, -5.293642997741699, 0.4030594229698181, 3.0270071029663086, -6.470805644989014, -2.07453989982605, -9.646402359008789, 9.256569862365723, -2.6433541774749756, -0.8167083263397217],
[-3.4297854900360107, 2.4863781929016113, 8.968724250793457, 3.162344217300415, -11.153030395507812, 0.15335027873516083, -0.5451826453208923, -7.817524433135986, -1.0585914850234985, -10.736929893493652],
[-4.420501232147217, 5.841022491455078, -0.17066864669322968, -1.0071465969085693, -2.261953592300415, -3.0983355045318604, -2.0874621868133545, 1.0745809078216553, -1.2511857748031616, -2.273810625076294]]
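
Each inner list above contains the raw logits for one input image. To confirm that the index of the largest value identifies the predicted digit, you can apply tf.argmax directly to the verify output. A minimal sketch, assuming the verify call above has already run:

import tensorflow as tf

# Raw logits for the first three test images, as returned by verify() above.
logits = tf_model.verify(x_test[:3])['prediction']

# The index of the largest logit in each row is the most likely digit.
print(tf.argmax(logits, axis=1).numpy().tolist())
# [7, 2, 1]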

The default score.py generated in the tf_model.artifact_dir location is:

import os
import sys
from functools import lru_cache
import pandas as pd
import numpy as np
import json
import tensorflow as tf
from io import BytesIO
import base64

model_name = 'model.h5'


"""
Inference script. This script is used for prediction by scoring server when schema is known.
"""


@lru_cache(maxsize=10)
def load_model(model_file_name=model_name):
    """
    Loads model from the serialized format

    Returns
    -------
    model:  a model instance on which predict API can be invoked
    """
    model_dir = os.path.dirname(os.path.realpath(__file__))
    if model_dir not in sys.path:
        sys.path.insert(0, model_dir)
    contents = os.listdir(model_dir)
    if model_file_name in contents:
        print(f'Start loading {model_file_name} from model directory {model_dir} ...')
        loaded_model = tf.keras.models.load_model(os.path.join(model_dir, model_file_name))

        print("Model is successfully loaded.")
        return loaded_model
    else:
        raise Exception(f'{model_file_name} is not found in model directory {model_dir}')


@lru_cache(maxsize=1)
def fetch_data_type_from_schema(input_schema_path=os.path.join(os.path.dirname(os.path.realpath(__file__)), "input_schema.json")):
    """
    Returns data type information fetch from input_schema.json.

    Parameters
    ----------
    input_schema_path: path of input schema.

    Returns
    -------
    data_type: data type fetch from input_schema.json.

    """
    data_type = {}
    if os.path.exists(input_schema_path):
        schema = json.load(open(input_schema_path))
        for col in schema['schema']:
            data_type[col['name']] = col['dtype']
    else:
        print("input_schema has to be passed in in order to recover the same data type. pass `X_sample` in `ads.model.framework.tensorflow_model.TensorFlowModel.prepare` function to generate the input_schema. Otherwise, the data type might be changed after serialization/deserialization.")
    return data_type


def deserialize(data, input_schema_path):
    """
    Deserialize json-serialized data to data in original type when sent to
predict.

    Parameters
    ----------
    data: serialized input data.
    input_schema_path: path of input schema.

    Returns
    -------
    data: deserialized input data.

    """
    data_type = data.get('data_type', '')
    json_data = data.get('data', data)

    if "numpy.ndarray" in data_type:
        load_bytes = BytesIO(base64.b64decode(json_data.encode('utf-8')))
        return np.load(load_bytes, allow_pickle=True)
    if "pandas.core.series.Series" in data_type:
        return pd.Series(json_data)
    if "pandas.core.frame.DataFrame" in data_type:
        return pd.read_json(json_data, dtype=fetch_data_type_from_schema(input_schema_path))
    if "tensorflow.python.framework.ops.EagerTensor" in data_type:
        load_bytes = BytesIO(base64.b64decode(json_data.encode('utf-8')))
        return tf.convert_to_tensor(np.load(load_bytes, allow_pickle=True))

    return json_data

def pre_inference(data, input_schema_path):
    """
    Preprocess json-serialized data to feed into predict function.

    Parameters
    ----------
    data: Data format as expected by the predict API of the core estimator.
    input_schema_path: path of input schema.

    Returns
    -------
    data: Data format after any processing.
    """
    data = deserialize(data, input_schema_path)

    # Add further data preprocessing if needed
    return data

def post_inference(yhat):
    """
    Post-process the model results.

    Parameters
    ----------
    yhat: Data format after calling model.predict.

    Returns
    -------
    yhat: Data format after any processing.

    """

    return yhat.numpy().tolist()

def predict(data, model=load_model(), input_schema_path=os.path.join(os.path.dirname(os.path.realpath(__file__)), "input_schema.json")):
    """
    Returns prediction given the model and data to predict.

    Parameters
    ----------
    model: Model instance returned by load_model API
    data: Data format as expected by the predict API of the core estimator.
    input_schema_path: path of input schema.

    Returns
    -------
    predictions: Output from scoring server
        Format: {'prediction': output from model.predict method}

    """
    inputs = pre_inference(data, input_schema_path)

    yhat = post_inference(
        model(inputs)
    )
    return {'prediction': yhat}
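
For reference, the deserialize helper above expects a NumPy input to arrive as a base64-encoded buffer produced by numpy.save. Below is a minimal sketch of building such a payload by hand; the variable names are illustrative, and in practice verify(), predict(), and the deployed endpoint construct this payload for you:

import base64
from io import BytesIO

import numpy as np

# Serialize a batch of images the same way deserialize() expects to receive them.
batch = x_test[:3]  # x_test comes from the training snippet in Step 1
buffer = BytesIO()
np.save(buffer, batch)
payload = {
    "data": base64.b64encode(buffer.getvalue()).decode("utf-8"),
    "data_type": "numpy.ndarray",
}

# Passing this payload to predict() in score.py base64-decodes the buffer,
# calls np.load, and feeds the recovered ndarray to the model.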

Step 2: Update the post_inference method in score.py to find the index corresponding to the maximum value and return it. You can use the argmax function from TensorFlow to achieve that. Here is the modified code:

import os
import sys
from functools import lru_cache
import pandas as pd
import numpy as np
import json
import tensorflow as tf
from io import BytesIO
import base64

model_name = 'model.h5'


"""
Inference script. This script is used for prediction by scoring server when schema is known.
"""


@lru_cache(maxsize=10)
def load_model(model_file_name=model_name):
    """
    Loads model from the serialized format

    Returns
    -------
    model:  a model instance on which predict API can be invoked
    """
    model_dir = os.path.dirname(os.path.realpath(__file__))
    if model_dir not in sys.path:
        sys.path.insert(0, model_dir)
    contents = os.listdir(model_dir)
    if model_file_name in contents:
        print(f'Start loading {model_file_name} from model directory {model_dir} ...')
        loaded_model = tf.keras.models.load_model(os.path.join(model_dir, model_file_name))

        print("Model is successfully loaded.")
        return loaded_model
    else:
        raise Exception(f'{model_file_name} is not found in model directory {model_dir}')


@lru_cache(maxsize=1)
def fetch_data_type_from_schema(input_schema_path=os.path.join(os.path.dirname(os.path.realpath(__file__)), "input_schema.json")):
    """
    Returns data type information fetch from input_schema.json.

    Parameters
    ----------
    input_schema_path: path of input schema.

    Returns
    -------
    data_type: data type fetch from input_schema.json.

    """
    data_type = {}
    if os.path.exists(input_schema_path):
        schema = json.load(open(input_schema_path))
        for col in schema['schema']:
            data_type[col['name']] = col['dtype']
    else:
        print("input_schema has to be passed in in order to recover the same data type. pass `X_sample` in `ads.model.framework.tensorflow_model.TensorFlowModel.prepare` function to generate the input_schema. Otherwise, the data type might be changed after serialization/deserialization.")
    return data_type


def deserialize(data, input_schema_path):
    """
    Deserialize json-serialized data to data in original type when sent to
predict.

    Parameters
    ----------
    data: serialized input data.
    input_schema_path: path of input schema.

    Returns
    -------
    data: deserialized input data.

    """
    data_type = data.get('data_type', '')
    json_data = data.get('data', data)

    if "numpy.ndarray" in data_type:
        load_bytes = BytesIO(base64.b64decode(json_data.encode('utf-8')))
        return np.load(load_bytes, allow_pickle=True)
    if "pandas.core.series.Series" in data_type:
        return pd.Series(json_data)
    if "pandas.core.frame.DataFrame" in data_type:
        return pd.read_json(json_data, dtype=fetch_data_type_from_schema(input_schema_path))
    if "tensorflow.python.framework.ops.EagerTensor" in data_type:
        load_bytes = BytesIO(base64.b64decode(json_data.encode('utf-8')))
        return tf.convert_to_tensor(np.load(load_bytes, allow_pickle=True))

    return json_data

def pre_inference(data, input_schema_path):
    """
    Preprocess json-serialized data to feed into predict function.

    Parameters
    ----------
    data: Data format as expected by the predict API of the core estimator.
    input_schema_path: path of input schema.

    Returns
    -------
    data: Data format after any processing.
    """
    data = deserialize(data, input_schema_path)

    # Add further data preprocessing if needed
    return data

def post_inference(yhat):
    """
    Post-process the model results.

    Parameters
    ----------
    yhat: Data format after calling model.predict.

    Returns
    -------
    yhat: Data format after any processing.

    """
    yhat = tf.argmax(yhat, axis=1) # Get the index of the max value
    return yhat.numpy().tolist()

def predict(data, model=load_model(), input_schema_path=os.path.join(os.path.dirname(os.path.realpath(__file__)), "input_schema.json")):
    """
    Returns prediction given the model and data to predict.

    Parameters
    ----------
    model: Model instance returned by load_model API
    data: Data format as expected by the predict API of the core estimator.
    input_schema_path: path of input schema.

    Returns
    -------
    predictions: Output from scoring server
        Format: {'prediction': output from model.predict method}

    """
    inputs = pre_inference(data, input_schema_path)

    yhat = post_inference(
        model(inputs)
    )
    return {'prediction': yhat}

Step 3: Verify the changes

print(tf_model.verify(x_test[:3])['prediction'])
Start loading model.h5 from model directory /tmp/tmppkco6xrt ...
Model is successfully loaded.
[7, 2, 1]
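
As a sanity check, the ground-truth labels for the same three test images (from the mnist.load_data() split in Step 1) agree with the customized output:

print(y_test[:3].tolist())
# [7, 2, 1]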

Step 4: Register the model

model_id = tf_model.save()

Step 5: Deploy and generate the endpoint

>>> # Deploy and create an endpoint for the TensorFlow model
>>> tf_model.deploy(
        display_name="TensorFlow Model For Classification",
        deployment_log_group_id="ocid1.loggroup.oc1.xxx.xxxxx",
        deployment_access_log_id="ocid1.log.oc1.xxx.xxxxx",
        deployment_predict_log_id="ocid1.log.oc1.xxx.xxxxx",
    )
>>> print(f"Endpoint: {tf_model.model_deployment.url}")
https://modeldeployment.{region}.oci.customer-oci.com/ocid1.datasciencemodeldeployment.oc1.xxx.xxxxx

Step 6: Run prediction from the endpoint

print(tf_model.predict(x_test[:3])['prediction'])
[7, 2, 1]
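
The endpoint can also be invoked outside of ADS over plain HTTP. Below is a minimal sketch using the requests library and an OCI API-key signer; the config file location, the payload construction, and the /predict route are assumptions based on the standard Model Deployment invocation pattern, not part of the generated artifact:

import base64
from io import BytesIO

import numpy as np
import oci
import requests

# Sign requests with an API key from the default OCI config file (assumes ~/.oci/config is set up).
config = oci.config.from_file()
signer = oci.signer.Signer(
    tenancy=config["tenancy"],
    user=config["user"],
    fingerprint=config["fingerprint"],
    private_key_file_location=config["key_file"],
)

# Serialize the images the same way the generated score.py deserializes them.
buffer = BytesIO()
np.save(buffer, x_test[:3])
payload = {
    "data": base64.b64encode(buffer.getvalue()).decode("utf-8"),
    "data_type": "numpy.ndarray",
}

endpoint = f"{tf_model.model_deployment.url}/predict"
response = requests.post(endpoint, json=payload, auth=signer)
print(response.json()["prediction"])  # expected: [7, 2, 1]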