Running your Spark Application on OCI Data Flow

Submit your code to Data Flow for workloads that require larger resources.

Notebook Extension

For most Notebook users, local or OCI Notebook Sessions, the notebook extension is the most straightforward integration with dataflow. It’s a “Set It and Forget It” API with options to update ad-hoc. You can configure your dataflow runs by running ads opctl configure in the terminal.

After setting up your dataflow config, you can return to the Notebook. Import ads and DataFlowConfig:

import ads
from ads.jobs.utils import DataFlowConfig

Load the dataflow extension inside the notebook cell -

%load_ext ads.jobs.extension

Define config. If you have not yet configured your dataflow setting, or would like to amend the defaults, you can modify as shown below:

dataflow_config = DataFlowConfig()
dataflow_config.compartment_id = "ocid1.compartment.<your compartment ocid>"
dataflow_config.driver_shape = "VM.Standard.E4.Flex"
dataflow_config.driver_shape_config = oci.data_flow.models.ShapeConfig(ocpus=2, memory_in_gbs=32)
dataflow_config.executor_shape = "VM.Standard.E4.Flex"
dataflow_config.executor_shape_config = oci.data_flow.models.ShapeConfig(ocpus=4, memory_in_gbs=64)
dataflow_config.logs_bucket_uri = "oci://<my-bucket>@<my-tenancy>/"
dataflow_config.spark_version = "3.2.1"
dataflow_config.configuration = {"spark.driver.memory": "512m"}
dataflow_config.private_endpoint_id = "ocid1.dataflowprivateendpoint.oc1.iad.<your private endpoint ocid>"
# For using Data Flow Pools
# dataflow_config.poolId = "ocid1.dataflowpool.oc1..<unique_ocid>"

Use the config defined above to submit the cell.

Tip :class: note

Get more information about the dataflow extension by running %dataflow -h

Call the dataflow magic command in the first line of your cell to run it on dataflow.

%%dataflow run -f a_script.py -c {dataflow_config} -w -o -- abc -l 5 -v

This header will: - save the cell as a file called script.py and store it in your dataflow_config.script_bucket - After the -- notation, all parameters are sent to your script. For example abc is a positional argument, and l and v are named arguments.

Below is a full example:

%%dataflow run -f a_script.py -c {dataflow_config} -w -o -- abc -l 5 -v
from pyspark.sql import SparkSession
import click


@click.command()
@click.argument("app_name")
@click.option(
    "--limit", "-l", help="max number of rows to print", default=10, required=False
)
@click.option("--verbose", "-v", help="print out result in verbose mode", is_flag=True)
def main(app_name, limit, verbose):
    # Create a Spark session
    spark = SparkSession.builder.appName(app_name).getOrCreate()

    # Load a csv file from dataflow public storage
    df = (
        spark.read.format("csv")
        .option("header", "true")
        .option("multiLine", "true")
        .load(
            "oci://oow_2019_dataflow_lab@bigdatadatasciencelarge/usercontent/kaggle_berlin_airbnb_listings_summary.csv"
        )
    )

    # Create a temp view and do some SQL operations
    df.createOrReplaceTempView("berlin")
    query_result_df = spark.sql(
        """
        SELECT
            city,
            zipcode,
            CONCAT(latitude,',', longitude) AS lat_long
        FROM berlin
    """
    ).limit(limit)

    # Convert the filtered Spark DataFrame into JSON format
    # Note: we are writing to the spark stdout log so that we can retrieve the log later at the end of the notebook.
    if verbose:
        rows = query_result_df.toJSON().collect()
        for i, row in enumerate(rows):
            print(f"record {i}")
            print(row)


if __name__ == "__main__":
    main()

ADS CLI

Prerequisites

  1. Install ADS CLI

  2. Configure Defaults

Tip

If, for some reason, you are unable to use CLI, instead skip to the Create, Run Data Flow Application Using ADS Python SDK section below.

Sometimes your code is too complex to run in a single cell, and it’s better run as a notebook or file. In that case, use the ADS Opctl CLI.

To submit your notebook to Data Flow using the ads CLI, run:

ads opctl run -s <folder where notebook is located> -e <notebook name> -b dataflow

Tip :class: note

You can avoid running cells that are not DataFlow environment compatible by tagging the cells and then providing the tag names to ignore. In the following example cells that are tagged ignore and remove will be ignored - --exclude-tag ignore --exclude-tag remove

Tip :class: note

You can run the notebook in your local pyspark environment before submitting to DataFlow using the same CLI with -b local

# Activate the Pyspark conda environment in local
ads opctl run -s <notebook directory> -e <notebook file> -b local

You could submit a notebook using ADS SDK APIs. Here is an example to submit a notebook -

from ads.jobs import Job, DataFlow, DataFlowNotebookRuntime

df = (
    DataFlow()
    .with_compartment_id(
        "ocid1.compartment.oc1.<your compartment id>"
    )
    .with_driver_shape("VM.Standard.E4.Flex")
    .with_driver_shape_config(ocpus=2, memory_in_gbs=32)
    .with_executor_shape("VM.Standard.E4.Flex")
    .with_executor_shape_config(ocpus=4, memory_in_gbs=64)
    .with_logs_bucket_uri("oci://mybucket@mytenancy/")
    .with_private_endpoint_id("ocid1.dataflowprivateendpoint.oc1.iad.<your private endpoint ocid>")
    .with_configuration({
        "spark.driverEnv.myEnvVariable": "value1",
        "spark.executorEnv.myEnvVariable": "value2",
    })
)
rt = (
    DataFlowNotebookRuntime()
    .with_notebook(
        "<path to notebook>"
    )  # This could be local path or http path to notebook ipynb file
    .with_script_bucket("<my-bucket>")
    .with_exclude_tag(["ignore", "remove"])  # Cells to Ignore
)
job = Job(infrastructure=df, runtime=rt).create(overwrite=True)
df_run = job.run(wait=True)

ADS Python SDK

To create a Data Flow application using the ADS Python API you need two components:

  • DataFlow, a subclass of Infrastructure.

  • DataFlowRuntime, a subclass of Runtime.

DataFlow stores properties specific to Data Flow service, such as compartment_id, logs_bucket_uri, and so on. You can set them using the with_{property} functions:

  • with_compartment_id

  • with_configuration

  • with_driver_shape

  • with_driver_shape_config

  • with_executor_shape

  • with_executor_shape_config

  • with_language

  • with_logs_bucket_uri

  • with_metastore_id (doc)

  • with_num_executors

  • with_spark_version

  • with_warehouse_bucket_uri

  • with_private_endpoint_id (doc)

  • with_pool_id (doc)

  • with_defined_tags

  • with_freeform_tags

For more details, see Data Flow class documentation.

DataFlowRuntime stores properties related to the script to be run, such as the path to the script and CLI arguments. Likewise all properties can be set using with_{property}. The DataFlowRuntime properties are:

  • with_script_uri

  • with_script_bucket

  • with_archive_uri (doc)

  • with_archive_bucket

  • with_custom_conda

  • with_configuration

For more details, see the runtime class documentation.

Since service configurations remain mostly unchanged across multiple experiments, a DataFlow object can be reused and combined with various DataFlowRuntime parameters to create applications.

In the following “hello-world” example, DataFlow is populated with compartment_id, driver_shape, driver_shape_config, executor_shape, executor_shape_config , spark_version, defined_tags and freeform_tags. DataFlowRuntime is populated with script_uri and script_bucket. The script_uri specifies the path to the script. It can be local or remote (an Object Storage path). If the path is local, then script_bucket must be specified additionally because Data Flow requires a script to be available in Object Storage. ADS performs the upload step for you, as long as you give the bucket name or the Object Storage path prefix to upload the script. Either can be given to script_bucket. For example,  either with_script_bucket("<bucket_name>") or with_script_bucket("oci://<bucket_name>@<namespace>/<prefix>") is accepted. In the next example, the prefix is given for script_bucket.

from ads.jobs import DataFlow, Job, DataFlowRuntime
from uuid import uuid4
import os
import tempfile

with tempfile.TemporaryDirectory() as td:
    with open(os.path.join(td, "script.py"), "w") as f:
        f.write(
            """
import pyspark

def main():
    print("Hello World")
    print("Spark version is", pyspark.__version__)

if __name__ == "__main__":
    main()
        """
        )
    name = f"dataflow-app-{str(uuid4())}"
    dataflow_configs = (
        DataFlow()
        .with_compartment_id("oci.xx.<compartment_id>")
        .with_logs_bucket_uri("oci://mybucket@mynamespace/dflogs")
        .with_driver_shape("VM.Standard.E4.Flex")
        .with_driver_shape_config(ocpus=2, memory_in_gbs=32)
        .with_executor_shape("VM.Standard.E4.Flex")
        .with_executor_shape_config(ocpus=4, memory_in_gbs=64)
        .with_spark_version("3.0.2")
        # For using Data Flow Pool
        # .with_pool_id("ocid1.dataflowpool.oc1..<unique_ocid>")
        .with_defined_tag(
            **{"Oracle-Tags": {"CreatedBy": "test_name@oracle.com"}}
        )
        .with_freeform_tag(test_freeform_key="test_freeform_value")
    )
    runtime_config = (
        DataFlowRuntime()
        .with_script_uri(os.path.join(td, "script.py"))
        .with_script_bucket("oci://mybucket@namespace/prefix")
        .with_custom_conda("oci://<mybucket>@<mynamespace>/<path/to/conda_pack>")
        .with_configuration({
            "spark.driverEnv.myEnvVariable": "value1",
            "spark.executorEnv.myEnvVariable": "value2",
        })
    )
    df = Job(name=name, infrastructure=dataflow_configs, runtime=runtime_config)
    df.create()

To run this application, you could use:

df_run = df.run()

After the run completes, check the stdout log from the application by running:

print(df_run.logs.application.stdout)

You should this in the log:

Hello World
Spark version is 3.0.2

Note on Policy

ALLOW SERVICE dataflow TO READ objects IN tenancy WHERE target.bucket.name='dataflow-logs'

Data Flow supports adding third-party libraries using a ZIP file, usually called archive.zip, see the Data Flow documentation about how to create ZIP files. Similar to scripts, you can specify an archive ZIP for a Data Flow application using with_archive_uri. In the next example, archive_uri is given as an Object Storage location. archive_uri can also be local so you must specify with_archive_bucket and follow the same rule as with_script_bucket.

from ads.jobs import DataFlow, DataFlowRun, DataFlowRuntime, Job
from uuid import uuid4
import tempfile
import os

with tempfile.TemporaryDirectory() as td:
    with open(os.path.join(td, "script.py"), "w") as f:
        f.write(
            '''
from pyspark.sql import SparkSession
import click


@click.command()
@click.argument("app_name")
@click.option(
    "--limit", "-l", help="max number of row to print", default=10, required=False
)
@click.option("--verbose", "-v", help="print out result in verbose mode", is_flag=True)
def main(app_name, limit, verbose):
    # Create a Spark session
    spark = SparkSession.builder.appName(app_name).getOrCreate()

    # Load a csv file from dataflow public storage
    df = (
        spark.read.format("csv")
        .option("header", "true")
        .option("multiLine", "true")
        .load(
            "oci://oow_2019_dataflow_lab@bigdatadatasciencelarge/usercontent/kaggle_berlin_airbnb_listings_summary.csv"
        )
    )

    # Create a temp view and do some SQL operations
    df.createOrReplaceTempView("berlin")
    query_result_df = spark.sql(
        """
        SELECT
            city,
            zipcode,
            CONCAT(latitude,',', longitude) AS lat_long
        FROM berlin
    """
    ).limit(limit)

    # Convert the filtered Spark DataFrame into JSON format
    # Note: we are writing to the spark stdout log so that we can retrieve the log later at the end of the notebook.
    if verbose:
        rows = query_result_df.toJSON().collect()
        for i, row in enumerate(rows):
            print(f"record {i}")
            print(row)


if __name__ == "__main__":
    main()
        '''
        )

    name = f"dataflow-app-{str(uuid4())}"
    dataflow_configs = (
        DataFlow()
        .with_compartment_id("oci1.xxx.<compartment_ocid>")
        .with_logs_bucket_uri("oci://mybucket@mynamespace/prefix")
        .with_driver_shape("VM.Standard.E4.Flex")
        .with_driver_shape_config(ocpus=2, memory_in_gbs=32)
        .with_executor_shape("VM.Standard.E4.Flex")
        .with_executor_shape_config(ocpus=4, memory_in_gbs=64)
        .with_spark_version("3.0.2")
        .with_configuration({
            "spark.driverEnv.myEnvVariable": "value1",
            "spark.executorEnv.myEnvVariable": "value2",
        })
        .with_defined_tag(
            **{"Oracle-Tags": {"CreatedBy": "test_name@oracle.com"}}
        )
        .with_freeform_tag(test_freeform_key="test_freeform_value")
    )
    runtime_config = (
        DataFlowRuntime()
        .with_script_uri(os.path.join(td, "script.py"))
        .with_script_bucket("oci://<bucket>@<namespace>/prefix/path")
        .with_archive_uri("oci://<bucket>@<namespace>/prefix/archive.zip")
        .with_custom_conda(uri="oci://<mybucket>@<mynamespace>/<my-conda-uri>")
    )
    df = Job(name=name, infrastructure=dataflow_configs, runtime=runtime_config)
    df.create()

You can pass arguments to a Data Flow run as a list of strings:

df_run = df.run(args=["run-test", "-v", "-l", "5"])

You can save the application specification into a YAML file for future reuse. You could also use the json format.

print(df.to_yaml("sample-df.yaml"))

You can also load a Data Flow application directly from the YAML file saved in the previous example:

df2 = Job.from_yaml(uri="sample-df.yaml")

Creating a new job and a run:

df_run2 = df2.create().run()

Deleting a job cancels associated runs:

df2.delete()
df_run2.status

You can also load a Data Flow application from an OCID:

df3 = Job.from_dataflow_job(df.id)

Creating a run under the same application:

df_run3 = df3.run()

Now there are 2 runs under the df application:

assert len(df.run_list()) == 2

When you run a Data Flow application, a DataFlowRun object is created. You can check the status, wait for a run to finish, check its logs afterwards, or cancel a run in progress. For example:

df_run.status
df_run.wait()

watch is an alias of wait, so you can also call df_run.watch().

There are three types of logs for a run:

  • application log

  • driver log

  • executor log

Each log consists of stdout and stderr. For example, to access stdout from application log, you could use:

df_run.logs.application.stdout

Then you could check it with:

df_run.logs.application.stderr
df_run.logs.executor.stdout
df_run.logs.executor.stderr

You can also examine head or tail of the log, or download it to a local path. For example,

log = df_run.logs.application.stdout
log.head(n=1)
log.tail(n=1)
log.download(<local-path>)

For the sample script, the log prints first five rows of a sample dataframe in JSON and it looks like:

record 0
{"city":"Berlin","zipcode":"10119","lat_long":"52.53453732241747,13.402556926822387"}
record 1
{"city":"Berlin","zipcode":"10437","lat_long":"52.54851279221664,13.404552826587466"}
record 2
{"city":"Berlin","zipcode":"10405","lat_long":"52.534996191586714,13.417578665333295"}
record 3
{"city":"Berlin","zipcode":"10777","lat_long":"52.498854933130026,13.34906453348717"}
record 4
{"city":"Berlin","zipcode":"10437","lat_long":"52.5431572633131,13.415091104515707"}

Calling log.head(n=1) returns this:

'record 0'

Calling log.tail(n=1) returns this:

{"city":"Berlin","zipcode":"10437","lat_long":"52.5431572633131,13.415091104515707"}

A link to run the page in the OCI Console is given using the run_details_link property:

df_run.run_details_link

To list Data Flow applications, a compartment id must be given with any optional filtering criteria. For example, you can filter by name of the application:

Job.dataflow_job(compartment_id=compartment_id, display_name=name)

YAML

You can create a Data Flow job directly from a YAML string. You can pass a YAML string into the Job.from_yaml() function to build a Data Flow job:

kind: job
spec:
  id: <dataflow_app_ocid>
  infrastructure:
    kind: infrastructure
    spec:
      compartmentId: <compartment_id>
      driverShape: VM.Standard.E4.Flex
      driverShapeConfig:
        ocpus: 2
        memory_in_gbs: 32
      executorShape: VM.Standard.E4.Flex
      executorShapeConfig:
        ocpus: 4
        memory_in_gbs: 64
      id: <dataflow_app_ocid>
      language: PYTHON
      logsBucketUri: <logs_bucket_uri>
      numExecutors: 1
      sparkVersion: 3.2.1
      privateEndpointId: <private_endpoint_ocid>
      poolId: <dataflow_pool_ocid>
      definedTags:
        Oracle-Tags:
          CreatedBy: test_name@oracle.com
      freeformTags:
        test_freeform_key: test_freeform_value
    type: dataFlow
  name: dataflow_app_name
  runtime:
    kind: runtime
    spec:
      configuration:
          spark.driverEnv.myEnvVariable: value1
          spark.executorEnv.myEnvVariable: value2
      scriptBucket: bucket_name
      scriptPathURI: oci://<bucket_name>@<namespace>/<prefix>
    type: dataFlow

Data Flow Infrastructure YAML Schema

kind:
    allowed:
        - infrastructure
    required: true
    type: string
spec:
    required: true
    type: dict
    schema:
        compartmentId:
            required: false
            type: string
        displayName:
            required: false
            type: string
        driverShape:
            required: false
            type: string
        driverShapeConfig:
            required: false
            type: dict
            schema:
                ocpus:
                    required: true
                    type: float
                memory_in_gbs:
                    required: true
                    type: float
        executorShape:
            required: false
            type: string
        executorShapeConfig:
            required: false
            type: dict
            schema:
                ocpus:
                    required: true
                    type: float
                memory_in_gbs:
                    required: true
                    type: float
        id:
            required: false
            type: string
        language:
            required: false
            type: string
        logsBucketUri:
            required: false
            type: string
        metastoreId:
            required: false
            type: string
        numExecutors:
            required: false
            type: integer
        sparkVersion:
            required: false
            type: string
        privateEndpointId:
            required: false
            type: string
        poolId:
            required: false
            type: string
        configuration:
            required: false
            type: dict
        definedTags:
            required: false
            type: dict
        freeformTags:
            required: false
            type: dict
type:
    allowed:
        - dataFlow
    required: true
    type: string

Data Flow Runtime YAML Schema

kind:
    allowed:
        - runtime
    required: true
    type: string
spec:
    required: true
    type: dict
    schema:
        archiveBucket:
            required: false
            type: string
        archiveUri:
            required: false
            type: string
        args:
            nullable: true
            required: false
            schema:
                type: string
            type: list
        conda:
            nullable: false
            required: false
            type: dict
            schema:
                slug:
                    required: true
                    type: string
                type:
                    allowed:
                        - service
                    required: true
                    type: string
        configuration:
            required: false
            type: dict
        definedTags:
            required: false
            type: dict
        freeformTags:
            required: false
            type: dict
        scriptBucket:
            required: false
            type: string
        scriptPathURI:
            required: false
            type: string
type:
    allowed:
        - dataFlow
    required: true
    type: string