.. SparkPipelineModel:

SparkPipelineModel
******************

See `API Documentation <../../../ads.model.framework.html#ads.model.framework.spark_model.SparkPipelineModel>`__

Overview
========

The ``SparkPipelineModel`` class in ADS is designed to allow you to rapidly get a PySpark model into production. The ``.prepare()`` method creates the model artifacts that are needed to deploy a functioning model without you having to configure it or write code. However, you can customize the required ``score.py`` file.

.. include:: ../_template/overview.rst

The following steps take your trained ``PySpark`` model and deploy it into production with a few lines of code.

**Create a Spark Pipeline Model**

Generate a synthetic dataset:

.. code-block:: python3

    from pyspark.sql import SparkSession

    spark = SparkSession \
        .builder \
        .appName("Python Spark SQL basic example") \
        .getOrCreate()

    training = spark.createDataFrame(
        [
            (0, "a b c d e spark", 1.0),
            (1, "b d", 0.0),
            (2, "spark f g h", 1.0),
            (3, "hadoop mapreduce", 0.0),
        ],
        ["id", "text", "label"],
    )

    test = spark.createDataFrame(
        [
            (4, "spark i j k"),
            (5, "l m n"),
            (6, "spark hadoop spark"),
            (7, "apache hadoop"),
        ],
        ["id", "text"],
    )

Create a Spark Pipeline. (Note that a Spark Pipeline can be made with just one stage.)

.. code-block:: python3

    from pyspark.ml import Pipeline
    from pyspark.ml.classification import LogisticRegression
    from pyspark.ml.feature import HashingTF, Tokenizer

    tokenizer = Tokenizer(inputCol="text", outputCol="words")
    hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
    lr = LogisticRegression(maxIter=10, regParam=0.001)
    pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
    model = pipeline.fit(training)

Prepare Model Artifact
======================

.. code-block:: python3

    import tempfile

    from ads.model.framework.spark_model import SparkPipelineModel
    from ads.common.model_metadata import UseCaseType

    artifact_dir = tempfile.mkdtemp()
    spark_model = SparkPipelineModel(estimator=model, artifact_dir=artifact_dir)
    spark_model.prepare(
        inference_conda_env="pyspark32_p38_cpu_v2",
        X_sample=training,
        force_overwrite=True,
        use_case_type=UseCaseType.BINARY_CLASSIFICATION,
    )

Instantiate a ``SparkPipelineModel()`` object with a PySpark model. Each instance accepts the following parameters:

* ``artifact_dir: str``. Artifact directory to store the files needed for deployment.
* ``auth: (Dict, optional)``. Defaults to ``None``. The default authentication is set using the ``ads.set_auth`` API. To override the default, use ``ads.common.auth.api_keys()`` or ``ads.common.auth.resource_principal()`` to create the appropriate authentication signer and the ``**kwargs`` required to instantiate the ``IdentityClient`` object.
* ``estimator: Callable``. Any model object generated by the PySpark framework.
* ``properties: (ModelProperties, optional)``. Defaults to ``None``. The ``ModelProperties`` object required to save and deploy the model.

.. include:: ../_template/initialize.rst

Summary Status
==============

.. include:: ../_template/summary_status.rst

.. figure:: ../figures/summary_status.png
   :align: center

Register Model
==============

.. code-block:: python3

    model_id = spark_model.save()

    # Output:
    # Start loading model.joblib from model directory /tmp/tmphdo8dfn3 ...
    # Model is successfully loaded.
    # ['input_schema.json', 'runtime.yaml', 'model_input_data_schema.json', 'model', 'score.py']
    # 'ocid1.datasciencemodel.oc1.xxx.xxxxx'
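
Before deploying, you can confirm that the generated artifact works by calling ``.verify()``, which runs the ``predict`` function from the generated ``score.py`` locally, without creating a model deployment. A minimal sketch, reusing the ``test`` DataFrame created earlier:

.. code-block:: python3

    # Run the score.py predict function locally against the sample data;
    # no model deployment is involved in this check.
    prediction = spark_model.verify(test)
    print(prediction["prediction"])
    # Expected to mirror the deployed prediction, e.g. [0.0, 0.0, 1.0, 0.0]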

Deploy and Generate Endpoint
============================

.. code-block:: python3

    # Deploy and create an endpoint for the Spark Pipeline model
    spark_model.deploy(
        display_name="Spark Pipeline Model For Classification",
        deployment_log_group_id="ocid1.loggroup.oc1.xxx.xxxxx",
        deployment_access_log_id="ocid1.log.oc1.xxx.xxxxx",
        deployment_predict_log_id="ocid1.log.oc1.xxx.xxxxx",
        # Shape config details mandatory for flexible shapes:
        # deployment_instance_shape="VM.Standard.E4.Flex",
        # deployment_ocpus=,
        # deployment_memory_in_gbs=,
    )

    print(f"Endpoint: {spark_model.model_deployment.url}")
    # Output: "Endpoint: https://modeldeployment.{region}.oci.customer-oci.com/ocid1.datasciencemodeldeployment.oc1.xxx.xxxxx"

Run Prediction against Endpoint
===============================

.. code-block:: python3

    spark_model.predict(test)['prediction']
    # [0.0, 0.0, 1.0, 0.0]

Run Prediction with oci raw-request command
===========================================

Model deployment endpoints can be invoked with the OCI CLI. This example invokes a model deployment via the CLI with a JSON payload:

`json` payload example
----------------------

.. code-block:: python3

    >>> import json
    >>> # Prepare the data sample for the prediction payload
    >>> print(json.dumps(test.toJSON().collect()))
    ["{\"id\":4,\"text\":\"spark i j k\"}", "{\"id\":5,\"text\":\"l m n\"}", "{\"id\":6,\"text\":\"spark hadoop spark\"}", "{\"id\":7,\"text\":\"apache hadoop\"}"]

Use the printed data and the deployment endpoint to invoke the prediction with the ``oci raw-request`` command in a terminal:

.. code-block:: bash

    export uri=https://modeldeployment.{region}.oci.customer-oci.com/ocid1.datasciencemodeldeployment.oc1.xxx.xxxxx/predict

    export data='{"data": ["{\"id\":4,\"text\":\"spark i j k\"}", ... "{\"id\":7,\"text\":\"apache hadoop\"}"]}'

    oci raw-request \
        --http-method POST \
        --target-uri $uri \
        --request-body "$data"

Expected output of raw-request command
--------------------------------------

.. code-block:: bash

    {
      "data": {
        "prediction": [
          0.0,
          0.0,
          1.0,
          0.0
        ]
      },
      "headers": {
        "Connection": "keep-alive",
        "Content-Length": "32",
        "Content-Type": "application/json",
        "Date": "Thu, 08 Dec 2022 18:45:12 GMT",
        "X-Content-Type-Options": "nosniff",
        "opc-request-id": "C2E73B1679B34BAD8358B49D20619055/0EE2E5F93F48142725525D7A5BA7F5FB/049A66AA38AA0163DBBC70F225285851",
        "server": "uvicorn"
      },
      "status": "200 OK"
    }
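
The same endpoint can also be invoked from Python. The following is a minimal sketch, not part of the ADS API: it assumes API key authentication configured in ``~/.oci/config``, signs the request with the OCI Python SDK's ``oci.signer.Signer``, and posts the same payload with ``requests``. The URI is the placeholder endpoint from above, and ``test`` is the DataFrame created earlier.

.. code-block:: python3

    # Sketch: call the /predict endpoint directly from Python instead of the CLI.
    # Assumes an API key profile in ~/.oci/config; the URI below is a placeholder.
    import oci
    import requests

    config = oci.config.from_file()
    signer = oci.signer.Signer(
        tenancy=config["tenancy"],
        user=config["user"],
        fingerprint=config["fingerprint"],
        private_key_file_location=config["key_file"],
        pass_phrase=config.get("pass_phrase"),
    )

    uri = "https://modeldeployment.{region}.oci.customer-oci.com/ocid1.datasciencemodeldeployment.oc1.xxx.xxxxx/predict"
    payload = {"data": test.toJSON().collect()}  # same payload shape as the CLI example

    response = requests.post(uri, json=payload, auth=signer)
    print(response.json())
    # Expected to match the "data" field of the CLI output above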

Example
=======

Adapted from an example provided by Apache in the PySpark API Reference Documentation.

.. code-block:: python3

    import tempfile

    from pyspark.ml import Pipeline
    from pyspark.ml.classification import LogisticRegression
    from pyspark.ml.feature import HashingTF, Tokenizer
    from pyspark.sql import SparkSession

    from ads.model.framework.spark_model import SparkPipelineModel
    from ads.common.model_metadata import UseCaseType

    spark = SparkSession \
        .builder \
        .appName("Python Spark SQL basic example") \
        .getOrCreate()

    artifact_dir = tempfile.mkdtemp()

    training = spark.createDataFrame(
        [
            (0, "a b c d e spark", 1.0),
            (1, "b d", 0.0),
            (2, "spark f g h", 1.0),
            (3, "hadoop mapreduce", 0.0),
        ],
        ["id", "text", "label"],
    )

    test = spark.createDataFrame(
        [
            (4, "spark i j k"),
            (5, "l m n"),
            (6, "spark hadoop spark"),
            (7, "apache hadoop"),
        ],
        ["id", "text"],
    )

    tokenizer = Tokenizer(inputCol="text", outputCol="words")
    hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
    lr = LogisticRegression(maxIter=10, regParam=0.001)
    pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
    model = pipeline.fit(training)

    spark_model = SparkPipelineModel(estimator=model, artifact_dir=artifact_dir)
    spark_model.prepare(
        inference_conda_env="pyspark32_p38_cpu_v2",
        X_sample=training,
        force_overwrite=True,
        use_case_type=UseCaseType.BINARY_CLASSIFICATION,
    )

    # Check that the artifacts are generated correctly.
    # The verify method invokes the predict function defined inside score.py in the artifact_dir.
    prediction = spark_model.verify(test)

    # Register the model
    spark_model.save(display_name="Spark Pipeline Model")

    # Deploy and create an endpoint for the Spark model
    spark_model.deploy(
        display_name="Spark Pipeline Model For Classification",
        deployment_log_group_id="ocid1.loggroup.oc1.xxx.xxxxx",
        deployment_access_log_id="ocid1.log.oc1.xxx.xxxxx",
        deployment_predict_log_id="ocid1.log.oc1.xxx.xxxxx",
    )

    # Generate a prediction by invoking the deployed endpoint
    spark_model.predict(test)["prediction"]

    # To delete the deployed endpoint, uncomment the line below
    # spark_model.delete_deployment(wait_for_completion=True)
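
When you are done experimenting, you can tear down the deployment and release the local Spark session. A minimal cleanup sketch, using only the calls shown above (the registered model remains in the model catalog):

.. code-block:: python3

    # Delete the model deployment created above, then stop the local Spark session.
    spark_model.delete_deployment(wait_for_completion=True)
    spark.stop()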