# Source code for ads.model.framework.spark_model

#!/usr/bin/env python
# -*- coding: utf-8 -*--

# Copyright (c) 2022, 2023 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/

import os
from typing import Any, Callable, Dict, List, Optional, Union, Tuple

import fsspec
import numpy as np
import pandas as pd

from ads.common import logger, utils
from ads.common.decorator.runtime_dependency import (
    runtime_dependency,
    OptionalDependency,
)
from ads.common.object_storage_details import ObjectStorageDetails
from ads.model.extractor.spark_extractor import SparkExtractor
from ads.model.serde.model_input import SparkModelInputSerializerType
from ads.model.serde.model_serializer import SparkModelSerializerType
from ads.model.generic_model import (
    FrameworkSpecificModel,
    DEFAULT_MODEL_FOLDER_NAME,
)
from ads.model.model_properties import ModelProperties
from ads.model.serde.common import SERDE

# Suffix appended to the model file name to build the name of the JSON file
# (stored in the artifact directory) that holds the Spark input-data schema.
SPARK_DATAFRAME_SCHEMA_PATH = "_input_data_schema.json"

class SparkPipelineModel(FrameworkSpecificModel):
    """SparkPipelineModel class for estimators from the pyspark framework.

    Attributes
    ----------
    algorithm: str
        The algorithm of the model.
    artifact_dir: str
        Artifact directory to store the files needed for deployment.
    auth: Dict
        Default authentication is set using the `ads.set_auth` API. To override the
        default, use the `ads.common.auth.api_keys` or
        `ads.common.auth.resource_principal` to create an authentication signer to
        instantiate an IdentityClient object.
    estimator: Callable
        A trained pyspark estimator/model using pyspark.
    framework: str
        "spark", the framework name of the model.
    hyperparameter: dict
        The hyperparameters of the estimator.
    metadata_custom: ModelCustomMetadata
        The model custom metadata.
    metadata_provenance: ModelProvenanceMetadata
        The model provenance metadata.
    metadata_taxonomy: ModelTaxonomyMetadata
        The model taxonomy metadata.
    model_artifact: ModelArtifact
        This is built by calling prepare.
    model_deployment: ModelDeployment
        A ModelDeployment instance.
    model_file_name: str
        Name of the serialized model.
    model_id: str
        The model ID.
    properties: ModelProperties
        ModelProperties object required to save and deploy model.
        For more details, check the documentation of
        `ads.model.model_properties.ModelProperties`.
    runtime_info: RuntimeInfo
        A RuntimeInfo instance.
    schema_input: Schema
        Schema describes the structure of the input data.
    schema_output: Schema
        Schema describes the structure of the output data.
    serialize: bool
        Whether to serialize the model to pkl file by default. If False, you need
        to serialize the model manually, save it under artifact_dir and update the
        `score.py` manually.
    version: str
        The framework version of the model.

    Methods
    -------
    delete_deployment(...)
        Deletes the current model deployment.
    deploy(..., **kwargs)
        Deploys a model.
    from_model_artifact(uri, model_file_name, artifact_dir, ..., **kwargs)
        Loads model from the specified folder, or zip/tar archive.
    from_model_catalog(model_id, model_file_name, artifact_dir, ..., **kwargs)
        Loads model from model catalog.
    introspect(...)
        Runs model introspection.
    predict(data, ...)
        Returns prediction of input data run against the model deployment endpoint.
    prepare(..., **kwargs)
        Prepare and save the `score.py`, serialized model and runtime.yaml file.
    reload(...)
        Reloads the model artifact files: `score.py` and the `runtime.yaml`.
    save(..., **kwargs)
        Saves model artifacts to the model catalog.
    summary_status(...)
        Gets a summary table of the current status.
    verify(data, ...)
        Tests if deployment works in local environment.

    Examples
    --------
    >>> import tempfile
    >>> from ads.model.framework.spark_model import SparkPipelineModel
    >>> from pyspark.ml.linalg import Vectors
    >>> from pyspark.ml.classification import LogisticRegression
    >>> from pyspark.ml import Pipeline

    >>> training = spark.createDataFrame([
    >>>     (1.0, Vectors.dense([0.0, 1.1, 0.1])),
    >>>     (0.0, Vectors.dense([2.0, 1.0, -1.0])),
    >>>     (0.0, Vectors.dense([2.0, 1.3, 1.0])),
    >>>     (1.0, Vectors.dense([0.0, 1.2, -0.5]))], ["label", "features"])
    >>> lr_estimator = LogisticRegression(maxIter=10, regParam=0.001)
    >>> pipeline = Pipeline(stages=[lr_estimator])
    >>> pipeline_model = pipeline.fit(training)

    >>> spark_model = SparkPipelineModel(estimator=pipeline_model, artifact_dir=tempfile.mkdtemp())
    >>> spark_model.prepare(inference_conda_env="dataexpl_p37_cpu_v3")
    >>> spark_model.verify(training)
    >>> spark_model.save()
    >>> model_deployment = spark_model.deploy()
    >>> spark_model.predict(training)
    >>> spark_model.delete_deployment()
    """

    _PREFIX = "spark"
    model_input_serializer_type = SparkModelInputSerializerType
    model_save_serializer_type = SparkModelSerializerType

    # NOTE(review): the body below references a bare `ml` name that is not imported
    # in this file; the decorator (short_name="ml") is presumably what makes
    # `pyspark.ml` available under that name — confirm against `runtime_dependency`.
    @runtime_dependency(
        module="pyspark",
        short_name="ml",
        object="ml",
        install_from=OptionalDependency.SPARK,
    )
    def __init__(
        self,
        estimator: Callable,
        artifact_dir: Optional[str] = None,
        properties: Optional[ModelProperties] = None,
        auth: Optional[Dict] = None,
        model_save_serializer: Optional[SERDE] = model_save_serializer_type.SPARK,
        model_input_serializer: Optional[SERDE] = model_input_serializer_type.SPARK,
        **kwargs,
    ):
        """
        Initiates a SparkPipelineModel instance.

        Parameters
        ----------
        estimator: Callable
            A fitted `pyspark.ml.PipelineModel`. Any other type is rejected.
        artifact_dir: str
            The URI for the generated artifact, which can be local path or OCI
            object storage URI.
        properties: (ModelProperties, optional). Defaults to None.
            ModelProperties object required to save and deploy model.
        auth: (Dict, optional). Defaults to None.
            The default authentication is set using `ads.set_auth` API. If you need
            to override the default, use the `ads.common.auth.api_keys` or
            `ads.common.auth.resource_principal` to create appropriate
            authentication signer and kwargs required to instantiate
            IdentityClient object.
        model_save_serializer: (SERDE or str, optional).
            Defaults to `SparkModelSerializerType.SPARK`.
            Instance of ads.model.SERDE. Used for serialize/deserialize model.
        model_input_serializer: (SERDE, optional).
            Defaults to `SparkModelInputSerializerType.SPARK`.
            Instance of ads.model.SERDE. Used for serialize/deserialize data.
        **kwargs
            Forwarded unchanged to the parent class constructor.

        Returns
        -------
        SparkPipelineModel
            SparkPipelineModel instance.

        Raises
        ------
        TypeError
            If `estimator` is not exactly a `pyspark.ml.PipelineModel`.

        Examples
        --------
        >>> import tempfile
        >>> from ads.model.framework.spark_model import SparkPipelineModel
        >>> from pyspark.ml.linalg import Vectors
        >>> from pyspark.ml.classification import LogisticRegression
        >>> from pyspark.ml import Pipeline

        >>> training = spark.createDataFrame([
        >>>     (1.0, Vectors.dense([0.0, 1.1, 0.1])),
        >>>     (0.0, Vectors.dense([2.0, 1.0, -1.0])),
        >>>     (0.0, Vectors.dense([2.0, 1.3, 1.0])),
        >>>     (1.0, Vectors.dense([0.0, 1.2, -0.5]))], ["label", "features"])
        >>> lr_estimator = LogisticRegression(maxIter=10, regParam=0.001)
        >>> pipeline = Pipeline(stages=[lr_estimator])
        >>> pipeline_model = pipeline.fit(training)

        >>> spark_model = SparkPipelineModel(estimator=pipeline_model, artifact_dir=tempfile.mkdtemp())
        >>> spark_model.prepare(inference_conda_env="pyspark30_p37_cpu_v5")
        >>> spark_model.verify(training)
        >>> spark_model.save()
        >>> model_deployment = spark_model.deploy()
        >>> spark_model.predict(training)
        >>> spark_model.delete_deployment()
        """
        # Exact-type check kept on purpose: only `PipelineModel` itself is accepted,
        # not subclasses, which is what the original implementation enforces.
        if type(estimator) not in [ml.PipelineModel]:
            raise TypeError(
                f"{str(type(estimator))} is not supported in `SparkPipelineModel`s."
            )
        super().__init__(
            estimator=estimator,
            artifact_dir=artifact_dir,
            properties=properties,
            auth=auth,
            model_save_serializer=model_save_serializer,
            model_input_serializer=model_input_serializer,
            **kwargs,
        )
        # Model metadata is read from the estimator through the Spark extractor.
        self._extractor = SparkExtractor(estimator)
        self.framework = self._extractor.framework
        self.algorithm = self._extractor.algorithm
        self.version = self._extractor.version
        self.hyperparameter = self._extractor.hyperparameter

    @staticmethod
    def _handle_model_file_name(
        as_onnx: bool, model_file_name: Optional[str] = DEFAULT_MODEL_FOLDER_NAME
    ):
        """
        Process folder name for saving model.

        Parameters
        ----------
        as_onnx: bool
            To convert to onnx format. Must be False; ONNX is not supported.
        model_file_name: Optional[str]
            File name for saving model. Defaults to `DEFAULT_MODEL_FOLDER_NAME`,
            which is also used when an empty value or None is passed.

        Returns
        -------
        str
            Processed file name. (Folder in the case of spark serialization)

        Raises
        ------
        NotImplementedError
            If `as_onnx` is truthy.
        """
        if as_onnx:
            raise NotImplementedError(
                "The Spark to Onnx Conversion is not supported because it is unstable. Please set as_onnx to False (default) to perform a spark model serialization"
            )
        if not model_file_name:
            return DEFAULT_MODEL_FOLDER_NAME
        return model_file_name

    def serialize_model(
        self,
        as_onnx: bool = False,
        X_sample: Optional[
            Union[
                Dict,
                str,
                List,
                np.ndarray,
                pd.core.series.Series,
                pd.core.frame.DataFrame,
                "pyspark.sql.DataFrame",
                "pyspark.pandas.DataFrame",
            ]
        ] = None,
        force_overwrite: bool = False,
        **kwargs,
    ) -> None:
        """
        Serialize and save pyspark model using spark serialization.

        Parameters
        ----------
        as_onnx: (bool, optional). Defaults to False.
            Must stay False; ONNX conversion is not supported for Spark models.
        X_sample: (optional). Defaults to None.
            Sample input data, forwarded unchanged to the parent implementation.
        force_overwrite: (bool, optional). Defaults to False.
            If set as True, overwrite serialized model if exists.
        **kwargs
            Forwarded unchanged to the parent implementation.

        Returns
        -------
        None

        Raises
        ------
        NotImplementedError
            If `as_onnx` is truthy.
        """
        if as_onnx:
            raise NotImplementedError(
                "The Spark to Onnx Conversion is not supported because it is unstable. Please set as_onnx to False (default) to perform a spark model serialization"
            )
        super().serialize_model(
            as_onnx=as_onnx,
            force_overwrite=force_overwrite,
            X_sample=X_sample,
            **kwargs,
        )

    # NOTE(review): as with `ml` in `__init__`, the bare `sql` name used below is
    # presumably provided by this decorator (short_name="sql") — confirm.
    @runtime_dependency(
        module="pyspark",
        short_name="sql",
        object="sql",
        install_from=OptionalDependency.SPARK,
    )
    def _prepare_data_for_schema(
        self,
        X_sample: Union[List, Tuple, pd.DataFrame, pd.Series, np.ndarray] = None,
        y_sample: Union[List, Tuple, pd.DataFrame, pd.Series, np.ndarray] = None,
        **kwargs,
    ):
        """Generate Spark Schema and format Spark DataFrame as Pandas for ADS Schema Generation.

        The schema produced by the data serializer is written as JSON next to the
        model artifact, at `<artifact_dir>/<model_file_name>_input_data_schema.json`.

        Parameters
        ----------
        X_sample: Union[List, Tuple, pd.DataFrame, pd.Series, np.ndarray]
            Sample input data, passed through the data serializer's Spark
            serialization.
        y_sample: Union[List, Tuple, pd.DataFrame, pd.Series, np.ndarray]
            Sample of the target; returned unchanged.
        **kwargs
            auth: (Dict, optional)
                Storage options passed to `fsspec.open` when writing the schema.
                A missing or None value is treated as no options.

        Returns
        -------
        Tuple
            `(X_sample, y_sample)`, where a `pyspark.sql.DataFrame` `X_sample`
            has been converted to a pandas DataFrame.

        Raises
        ------
        TypeError
            If the serializer returns no schema for the given data.
        """
        input_schema_path = os.path.join(
            self.artifact_dir,
            self.model_file_name + SPARK_DATAFRAME_SCHEMA_PATH,
        )
        (
            X_sample,
            data_type,
            schema,
        ) = self.get_data_serializer()._serialize_via_spark(X_sample)
        if not schema:
            raise TypeError(
                f"Data type: {data_type} unsupported. Please use `pyspark.sql.DataFrame`, `pyspark.pandas.DataFrame`, or `pandas.DataFrame`."
            )
        # `or {}` also covers an explicit `auth=None`, which would otherwise make
        # the `**` unpacking below raise a TypeError.
        storage_options = kwargs.get("auth") or {}
        with fsspec.open(
            input_schema_path,
            mode="w",
            **storage_options,
        ) as f:
            f.write(schema.json())

        # Convert Spark DataFrames to pandas, as the summary line describes.
        if isinstance(X_sample, sql.DataFrame):
            X_sample = X_sample.toPandas()

        return X_sample, y_sample