Working with AutoML has moved from within ADS to working directly with the automlx library. To deploy an AutoMLx model, use the GenericModel class.

The following example takes your trained AutoML model using GenericModel and deploys it into production.


import pandas as pd
import numpy as np
from sklearn.datasets import fetch_openml
from sklearn.model_selection import train_test_split

import ads
import automl
from automl import init
from ads.model import GenericModel
from ads.common.model_metadata import UseCaseType

# Load the "adult" census dataset as pandas objects and split features/target.
dataset = fetch_openml(name='adult', as_frame=True)
df, y = dataset.data, dataset.target

# Several of the columns are incorrectly labeled as category type in the original dataset
numeric_columns = ['age', 'capitalgain', 'capitalloss', 'hoursperweek']
for col in df.columns:
    if col in numeric_columns:
        df[col] = df[col].astype(int)

# Encode the income label as 0/1 so the task is a binary classification.
X_train, X_test, y_train, y_test = train_test_split(
    df,
    y.map({'>50K': 1, '<=50K': 0}).astype(int),
    train_size=0.7,
    random_state=0,
)

# Create an AutoML pipeline on the local engine and train it.
init(engine='local')
est = automl.Pipeline(task='classification')
est.fit(X_train, y_train)

# Wrap the trained estimator in GenericModel and generate the model artifacts
# (score.py, input schema, runtime info) in the artifact directory.
automl_model = GenericModel(estimator=est, artifact_dir="automl_model_artifact")
automl_model.prepare(
    # NOTE(review): conda environment slugs are assumed; confirm the AutoMLx
    # environment available in your tenancy before running.
    inference_conda_env="automlx_p38_cpu_v1",
    training_conda_env="automlx_p38_cpu_v1",
    use_case_type=UseCaseType.BINARY_CLASSIFICATION,
    X_sample=X_test,
    force_overwrite=True,
)

Open automl_model_artifact/score.py and edit the code to instantiate the model class. The edits are highlighted below:

# 1.0 generated by ADS 2.8.1 on 20230226_214703
import json
import os
import sys
import importlib
from cloudpickle import cloudpickle
from functools import lru_cache
from io import StringIO
import logging
import sys
import automl
import pandas as pd
import numpy as np

# File name of the pickled model that load_model() looks for next to this script.
model_name = 'model.pkl'

"""
Inference script. This script is used for prediction by the scoring server when the schema is known.
"""

def init_automl_logger():
    """
    Initialize the AutoML engine with a logger that writes to stdout.

    Fetches the "automl" logger, attaches a stdout stream handler that uses a
    timestamp/name/level/message format, and hands the logger to
    ``automl.init`` together with a single-job local engine configuration.
    """
    logger = logging.getLogger("automl")
    handler = logging.StreamHandler(sys.stdout)
    formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    )
    # Without these two calls the handler and formatter built above are never used.
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    automl.init(engine="local", engine_opts={"n_jobs": 1}, logger=logger)

@lru_cache(maxsize=10)
def load_model(model_file_name=model_name):
    """
    Loads model from the serialized format

    Parameters
    ----------
    model_file_name: name of the serialized model file, looked up in the
        directory that contains this script.

    Returns
    -------
    model:  a model instance on which predict API can be invoked

    Raises
    ------
    Exception: if ``model_file_name`` is not present in the model directory.
    """
    # The AutoML engine has to be initialized before the pipeline is unpickled.
    # Results are cached so repeated calls neither re-read the file nor attach
    # another log handler.
    init_automl_logger()
    model_dir = os.path.dirname(os.path.realpath(__file__))
    # Make modules shipped alongside the model importable during unpickling.
    if model_dir not in sys.path:
        sys.path.insert(0, model_dir)
    contents = os.listdir(model_dir)
    if model_file_name not in contents:
        raise Exception(f'{model_file_name} is not found in model directory {model_dir}')

    print(f'Start loading {model_file_name} from model directory {model_dir} ...')
    # NOTE: unpickling executes arbitrary code; only load trusted model artifacts.
    with open(os.path.join(model_dir, model_file_name), "rb") as file:
        loaded_model = cloudpickle.load(file)

    print("Model is successfully loaded.")
    return loaded_model

def fetch_data_type_from_schema(input_schema_path=os.path.join(os.path.dirname(os.path.realpath(__file__)), "input_schema.json")):
    """
    Returns data type information fetch from input_schema.json.

    Parameters
    ----------
    input_schema_path: path of input schema.

    Returns
    -------
    data_type: data type fetch from input_schema.json, as a dict mapping each
        column name to its dtype. Empty when the schema file does not exist.
    """
    data_type = {}
    if os.path.exists(input_schema_path):
        # Use a context manager so the schema file handle is always closed.
        with open(input_schema_path) as schema_file:
            schema = json.load(schema_file)
        for col in schema['schema']:
            data_type[col['name']] = col['dtype']
    else:
        # Only warn when the schema is actually missing.
        print("input_schema has to be passed in in order to recover the same data type. pass `X_sample` in `ads.model.framework.automl_model.AutoMLModel.prepare` function to generate the input_schema. Otherwise, the data type might be changed after serialization/deserialization.")
    return data_type

def deserialize(data, input_schema_path, task=None):
    """
    Deserialize json serialization data to data in original type when sent to predict.

    Parameters
    ----------
    data: serialized input data.
    input_schema_path: path of input schema.
    task: Machine learning task, supported: classification, regression, anomaly_detection, forecasting. Defaults to None.

    Returns
    -------
    data: deserialized input data.
    """
    if isinstance(data, bytes):
        return pd.read_json(StringIO(data.decode("utf-8")))

    # A dict payload may carry the original type name under 'data_type' and
    # the values under 'data'; anything else is treated as the raw values.
    data_type = data.get('data_type', '') if isinstance(data, dict) else ''
    json_data = data.get('data', data) if isinstance(data, dict) else data

    if task and task == "forecasting":
        try:
            # data_type looks like "<class 'pkg.module.Type'>": take the dotted
            # path between the quotes and import that type to rebuild the index.
            data_type = data_type.split("'")[1]
            module, spec = ".".join(data_type.split(".")[:-1]), data_type.split(".")[-1]
            lib = importlib.import_module(name=module)
            func = getattr(lib, spec)
            return pd.DataFrame(index=func(json_data))
        except Exception:
            # Deliberate best-effort fallback when the index type cannot be recovered.
            logging.warning("Cannot autodetect the type of the model input data. By default, convert input data to pd.DatetimeIndex and feed the model with an empty pandas DataFrame with index as input data. If assumption is not correct, modify the score.py and check with .verify() before saving model with .save().")
            return pd.DataFrame(index=pd.DatetimeIndex(json_data))
    if "pandas.core.series.Series" in data_type:
        return pd.Series(json_data)
    if "pandas.core.frame.DataFrame" in data_type or isinstance(json_data, str):
        # Wrap literal JSON in a buffer: passing a raw JSON string to
        # pd.read_json is deprecated in recent pandas releases.
        json_source = StringIO(json_data) if isinstance(json_data, str) else json_data
        return pd.read_json(json_source, dtype=fetch_data_type_from_schema(input_schema_path))
    if isinstance(json_data, dict):
        return pd.DataFrame.from_dict(json_data)

    return json_data

def pre_inference(data, input_schema_path, task=None):
    """
    Preprocess data

    Parameters
    ----------
    data: Data format as expected by the predict API of the core estimator.
    input_schema_path: path of input schema.
    task: Machine learning task, supported: classification, regression, anomaly_detection, forecasting. Defaults to None.

    Returns
    -------
    data: Data format after any processing.
    """
    # The only preprocessing step is restoring the payload to its original type.
    return deserialize(data, input_schema_path, task)

def post_inference(yhat):
    """
    Post-process the model results

    Parameters
    ----------
    yhat: Data format after calling model.predict.

    Returns
    -------
    yhat: Data format after any processing; the predictions converted to a
        plain Python list via ``tolist()``.
    """
    # A DataFrame has no tolist(); drop down to its underlying array first.
    if isinstance(yhat, pd.core.frame.DataFrame):
        yhat = yhat.values
    return yhat.tolist()

def predict(data, model=load_model(), input_schema_path=os.path.join(os.path.dirname(os.path.realpath(__file__)), "input_schema.json")):
    """
    Returns prediction given the model and data to predict

    Parameters
    ----------
    model: Model instance returned by load_model API
    data: Data format as expected by the predict API of the core estimator. For eg. in case of sckit models it could be numpy array/List of list/Pandas DataFrame
    input_schema_path: path of input schema.

    Returns
    -------
    predictions: Output from scoring server
        Format: {'prediction': output from model.predict method}
    """
    # The model's task (if it exposes one) selects the deserialization path.
    task = model.task if hasattr(model, "task") else None
    features = pre_inference(data, input_schema_path, task)
    yhat = post_inference(model.predict(features))
    return {'prediction': yhat}

Verify changes by running inference locally.

# Run inference locally on the first two test rows to check the edited scoring code.
automl_model.verify(X_test.iloc[:2], auto_serialize_data=True)

Save the model and deploy it. After it is successfully deployed, invoke the endpoint by calling the .predict() function.

# Save the prepared model, deploy it, then call the deployed endpoint with
# the first two test rows.
model_id = automl_model.save(display_name='Demo AutoMLModel model')
deploy = automl_model.deploy(display_name='Demo AutoMLModel deployment')
automl_model.predict(X_test.iloc[:2], auto_serialize_data=True)