SparkPipelineModel

See API Documentation

Overview

The SparkPipelineModel class in ADS is designed to allow you to rapidly get a PySpark model into production. The .prepare() method creates the model artifacts that are needed to deploy a functioning model without you having to configure it or write code. However, you can customize the required score.py file.

The .verify() method simulates a model deployment by calling the load_model() and predict() methods in the score.py file. With the .verify() method, you can debug your score.py file without deploying any models. The .save() method saves the model artifact to the model catalog. The .deploy() method deploys the model to a REST endpoint.

The following steps take your trained PySpark model and deploy it into production with a few lines of code.

Create a Spark Pipeline Model

Generate a synthetic dataset:

from pyspark.sql import SparkSession


spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .getOrCreate()
training = spark.createDataFrame(
    [
        (0, "a b c d e spark", 1.0),
        (1, "b d", 0.0),
        (2, "spark f g h", 1.0),
        (3, "hadoop mapreduce", 0.0),
    ],
    ["id", "text", "label"],
)
test = spark.createDataFrame(
    [
        (4, "spark i j k"),
        (5, "l m n"),
        (6, "spark hadoop spark"),
        (7, "apache hadoop"),
    ],
    ["id", "text"],
)

Create a Spark Pipeline. (Note that a Spark Pipeline can consist of a single stage; see the sketch after the following code block.)

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)

pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
model = pipeline.fit(training)
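
As noted above, a pipeline with a single stage is also valid. A minimal sketch, reusing the tokenizer and training data defined above:

# A single-stage Spark Pipeline; fit() returns a PipelineModel as usual
single_stage_pipeline = Pipeline(stages=[tokenizer])
single_stage_model = single_stage_pipeline.fit(training)
single_stage_model.transform(training).show()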

Prepare Model Artifact

import tempfile
from ads.model.framework.spark_model import SparkPipelineModel
from ads.common.model_metadata import UseCaseType

artifact_dir = tempfile.mkdtemp()
spark_model = SparkPipelineModel(estimator=model, artifact_dir=artifact_dir)

spark_model.prepare(inference_conda_env="pyspark32_p38_cpu_v2",
                    X_sample=training,
                    force_overwrite=True,
                    use_case_type=UseCaseType.BINARY_CLASSIFICATION)
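
After .prepare() completes, the artifact directory contains the generated files, such as score.py and runtime.yaml. A quick way to inspect them (the exact file list may vary by ADS version):

import os

# List the generated model artifacts, e.g. score.py, runtime.yaml, and the serialized model
print(os.listdir(artifact_dir))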

Instantiate a SparkPipelineModel() object with a PySpark model. Each instance accepts the following parameters:

  • artifact_dir: str. Artifact directory to store the files needed for deployment.

  • auth: (Dict, optional). Defaults to None. The default authentication is set using the ads.set_auth API. To override the default, use ads.common.auth.api_keys() or ads.common.auth.resource_principal() to create the appropriate authentication signer and the **kwargs required to instantiate the IdentityClient object (see the sketch after this list).

  • estimator: Callable. Any model object generated by the PySpark framework.

  • properties: (ModelProperties, optional). Defaults to None. The ModelProperties object required to save and deploy the model.
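
For example, to override the default authentication, you can set it globally or pass a signer per instance. A minimal sketch, assuming an API key configuration exists in ~/.oci/config:

import ads

# Set the default authentication globally (e.g., resource principal in a notebook session)
ads.set_auth("resource_principal")

# Or override it for a single instance with an API-key signer
spark_model = SparkPipelineModel(
    estimator=model,
    artifact_dir=artifact_dir,
    auth=ads.common.auth.api_keys(),
)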

properties is an instance of the ModelProperties class and has the following predefined fields:

  • bucket_uri: str

  • compartment_id: str

  • deployment_access_log_id: str

  • deployment_bandwidth_mbps: int

  • deployment_instance_count: int

  • deployment_instance_shape: str

  • deployment_log_group_id: str

  • deployment_predict_log_id: str

  • deployment_memory_in_gbs: Union[float, int]

  • deployment_ocpus: Union[float, int]

  • inference_conda_env: str

  • inference_python_version: str

  • overwrite_existing_artifact: bool

  • project_id: str

  • remove_existing_artifact: bool

  • training_conda_env: str

  • training_id: str

  • training_python_version: str

  • training_resource_id: str

  • training_script_path: str

By default, properties is populated from environment variables when not specified. For example, in notebook sessions the project ID and compartment ID are preset in the environment variables PROJECT_OCID and NB_SESSION_COMPARTMENT_OCID, so properties picks up these values and uses them in methods such as .save() and .deploy(). Pass in values to overwrite the defaults. When you use a method that accepts an instance of properties, properties records the values that you pass in. For example, when you pass inference_conda_env into the .prepare() method, properties records the value. To reuse properties in different places, export it with the .to_yaml() method and reload it on a different machine with the .from_yaml() method, as sketched below.
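
A minimal sketch of exporting and reloading properties; the import path and the uri keyword are assumptions based on the ADS serialization helpers, so check the API documentation for the exact signature:

# Export the properties recorded on this model instance to a YAML file
spark_model.properties.to_yaml(uri="properties.yaml")

# On a different machine, reload the file and pass it into a new instance
from ads.model.model_properties import ModelProperties  # assumed import path

properties = ModelProperties.from_yaml(uri="properties.yaml")
new_model = SparkPipelineModel(
    estimator=model, artifact_dir=artifact_dir, properties=properties
)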

Summary Status

You can call the .summary_status() method after a model serialization instance such as GenericModel, SklearnModel, TensorFlowModel, or PyTorchModel is created. The .summary_status() method returns a Pandas dataframe that guides you through the entire workflow. It shows which methods are available to call and which ones aren’t, outlines what each method does, and lists any extra actions that are required.

The following image displays an example summary status table created after a user initiates a model instance. The table’s Status column shows Done for the initiate step, and the Details column explains what the initiate step did, such as generating a score.py file. The Step column also lists the prepare(), verify(), save(), deploy(), and predict() methods for the model, and the Status column indicates which method is available next. After the initiate step, the prepare() method is available, so the next step is to call the prepare() method.

[Image: summary_status.png, an example summary status table]
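
For example, you can render the table at any point in the workflow:

# Returns a Pandas dataframe describing each step, its status, and what it does
spark_model.summary_status()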

Register Model

model_id = spark_model.save()

Start loading model.joblib from model directory /tmp/tmphdo8dfn3 ...
Model is successfully loaded.
['input_schema.json', 'runtime.yaml', 'model_input_data_schema.json', 'model', 'score.py']

'ocid1.datasciencemodel.oc1.xxx.xxxxx'
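
Once registered, the model can be reloaded from the model catalog, for example on a different machine. A minimal sketch, assuming the from_model_catalog() helper and the parameters shown here (check the API documentation for the exact signature):

# Reload the registered model artifacts into a fresh local directory
restored_model = SparkPipelineModel.from_model_catalog(
    model_id=model_id,
    model_file_name="model",  # assumed: the name of the saved model artifact
    artifact_dir=tempfile.mkdtemp(),
)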

Deploy and Generate Endpoint

# Deploy and create an endpoint for the Spark Pipeline model
spark_model.deploy(
    display_name="Spark Pipeline Model For Classification",
    deployment_log_group_id="ocid1.loggroup.oc1.xxx.xxxxx",
    deployment_access_log_id="ocid1.log.oc1.xxx.xxxxx",
    deployment_predict_log_id="ocid1.log.oc1.xxx.xxxxx",
    # Shape config details mandatory for flexible shapes:
    # deployment_instance_shape="VM.Standard.E4.Flex",
    # deployment_ocpus=<number>,
    # deployment_memory_in_gbs=<number>,
)
print(f"Endpoint: {spark_model.model_deployment.url}")
# Output: "Endpoint: https://modeldeployment.{region}.oci.customer-oci.com/ocid1.datasciencemodeldeployment.oc1.xxx.xxxxx"

Run Prediction against Endpoint

spark_model.predict(test)['prediction']
# [0.0, 0.0, 1.0, 0.0]

Run Prediction with oci raw-request command

Model deployment endpoints can be invoked with the OCI CLI. This example invokes a model deployment through the CLI with a JSON payload:

JSON payload example

>>> # Prepare a data sample for prediction
>>> import json
>>> print(json.dumps(test.toJSON().collect()))
["{\"id\":4,\"text\":\"spark i j k\"}", "{\"id\":5,\"text\":\"l m n\"}",
"{\"id\":6,\"text\":\"spark hadoop spark\"}", "{\"id\":7,\"text\":\"apache hadoop\"}"]

Use the printed data and the model deployment endpoint to invoke the prediction with the oci raw-request command in a terminal:

export uri=https://modeldeployment.{region}.oci.customer-oci.com/ocid1.datasciencemodeldeployment.oc1.xxx.xxxxx/predict
export data='{"data": ["{\"id\":4,\"text\":\"spark i j k\"}", ... "{\"id\":7,\"text\":\"apache hadoop\"}"]}'
oci raw-request \
    --http-method POST \
    --target-uri $uri \
    --request-body "$data"

Expected output of raw-request command

{
  "data": {
    "prediction": [
      0.0,
      0.0,
      1.0,
      0.0
    ]
  },
  "headers": {
    "Connection": "keep-alive",
    "Content-Length": "32",
    "Content-Type": "application/json",
    "Date": "Thu, 08 Dec 2022 18:45:12 GMT",
    "X-Content-Type-Options": "nosniff",
    "opc-request-id": "C2E73B1679B34BAD8358B49D20619055/0EE2E5F93F48142725525D7A5BA7F5FB/049A66AA38AA0163DBBC70F225285851",
    "server": "uvicorn"
  },
  "status": "200 OK"
}

Example

Adapted from an example provided by Apache in the PySpark API Reference Documentation.

import tempfile
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import SparkSession
from ads.model.framework.spark_model import SparkPipelineModel
from ads.common.model_metadata import UseCaseType

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .getOrCreate()

artifact_dir = tempfile.mkdtemp()

training = spark.createDataFrame(
    [
        (0, "a b c d e spark", 1.0),
        (1, "b d", 0.0),
        (2, "spark f g h", 1.0),
        (3, "hadoop mapreduce", 0.0),
    ],
    ["id", "text", "label"],
)

test = spark.createDataFrame(
    [
        (4, "spark i j k"),
        (5, "l m n"),
        (6, "spark hadoop spark"),
        (7, "apache hadoop"),
    ],
    ["id", "text"],
)

tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

model = pipeline.fit(training)

spark_model = SparkPipelineModel(estimator=model, artifact_dir=artifact_dir)

spark_model.prepare(inference_conda_env="pyspark32_p38_cpu_v2",
                    X_sample=training,
                    force_overwrite=True,
                    use_case_type=UseCaseType.BINARY_CLASSIFICATION)

# Check if the artifacts are generated correctly.
# The verify method invokes the ``predict`` function defined inside ``score.py`` in the artifact_dir
prediction = spark_model.verify(test)


# Register the model
spark_model.save(display_name="Spark Pipeline Model")

# Deploy and create an endpoint for the Spark model
spark_model.deploy(
    display_name="Spark Pipeline Model For Classification",
    deployment_log_group_id="ocid1.loggroup.oc1.xxx.xxxxx",
    deployment_access_log_id="ocid1.log.oc1.xxx.xxxxx",
    deployment_predict_log_id="ocid1.log.oc1.xxx.xxxxx",
)

# Generate prediction by invoking the deployed endpoint
spark_model.predict(test)["prediction"]

# To delete the deployed endpoint uncomment the line below
# spark_model.delete_deployment(wait_for_completion=True)