Quick Start

Data Flow is a hosted Apache Spark server. It is quick to start, and can scale to handle large datasets in parallel. ADS provides a convenient API for creating and maintaining workloads on Data Flow.

Submit a Toy Python Script to Data Flow

From a Python Environment

Submit a Python script to Data Flow entirely from your Python environment. The following snippet uses a toy Python script that prints “Hello World” followed by the Spark version, 3.2.1.

from ads.jobs import DataFlow, Job, DataFlowRuntime
from uuid import uuid4
import os
import tempfile

# Source of the toy PySpark application that is submitted to Data Flow.
SCRIPT_CONTENT = """
import pyspark

def main():
    print("Hello World")
    print("Spark version is", pyspark.__version__)

if __name__ == "__main__":
    main()
"""

with tempfile.TemporaryDirectory() as workdir:
    # Materialize the script as a local file inside the scratch directory.
    script_path = os.path.join(workdir, "script.py")
    with open(script_path, "w") as script_file:
        script_file.write(SCRIPT_CONTENT)

    # A random suffix keeps the application name unique per submission.
    name = f"dataflow-app-{uuid4()}"

    # Infrastructure: where the application runs and with which shapes.
    # Values in <angle brackets> are placeholders.
    defined_tags = {"Oracle-Tags": {"CreatedBy": "test_name@oracle.com"}}
    dataflow_configs = (
        DataFlow()
        .with_compartment_id("oci.xx.<compartment_id>")
        .with_logs_bucket_uri("oci://<mybucket>@<mynamespace>/<dataflow-logs-prefix>")
        .with_driver_shape("VM.Standard.E4.Flex")
        .with_driver_shape_config(ocpus=2, memory_in_gbs=32)
        .with_executor_shape("VM.Standard.E4.Flex")
        .with_executor_shape_config(ocpus=4, memory_in_gbs=64)
        .with_spark_version("3.2.1")
        .with_defined_tag(**defined_tags)
        .with_freeform_tag(test_freeform_key="test_freeform_value")
    )

    # Runtime: the local script plus the bucket location configured for it.
    runtime_config = (
        DataFlowRuntime()
        .with_script_uri(script_path)
        .with_script_bucket("oci://<mybucket>@<mynamespace>/<subdir_to_put_and_get_script>")
    )

    # Create the job and start a run; both calls stay inside the `with`
    # block, so the temporary script file still exists at this point.
    df = Job(name=name, infrastructure=dataflow_configs, runtime=runtime_config)
    df.create()
    df_run = df.run()

From the Command Line

The same result can be achieved from the command line using the ADS CLI and a YAML file.

Assume you have the following two files written in your current directory as script.py and dataflow.yaml, respectively:

# script.py
import pyspark
def main():
    """Print a greeting followed by the version of the pyspark package."""
    print("Hello World")
    spark_version = pyspark.__version__
    print("Spark version is", spark_version)


if __name__ == "__main__":
    main()
# dataflow.yaml
# Job definition for the toy script.
# Values in <angle brackets> are placeholders.
kind: job
spec:
    name: dataflow-app-<uuid>
    infrastructure:
        kind: infrastructure
        spec:
            compartmentId: oci.xx.<compartment_id>
            logsBucketUri: oci://<mybucket>@<mynamespace>/<dataflow-logs-prefix>
            driverShape: VM.Standard.E4.Flex
            driverShapeConfig:
                ocpus: 2
                memory_in_gbs: 32
            executorShape: VM.Standard.E4.Flex
            executorShapeConfig:
                ocpus: 4
                memory_in_gbs: 64
            sparkVersion: 3.2.1
            numExecutors: 1
            definedTags:
                Oracle-Tags:
                    CreatedBy: test_name@oracle.com
            freeformTags:
                test_freeform_key: test_freeform_value
        type: dataFlow
    runtime:
        kind: runtime
        spec:
            # Local script to run, and the bucket location configured for it.
            scriptUri: script.py
            scriptBucket: oci://<mybucket>@<mynamespace>/<subdir_to_put_and_get_script>
ads jobs run -f dataflow.yaml

Real Data Flow Example with Conda Environment

For PySpark v3.0.0 and later, Data Flow can use a published conda environment as the Spark runtime environment when the application is built with ADS. Data Flow supports published conda environments only. Conda packs are conda environments packaged as tar archives. When you publish your own conda packs to object storage, ensure that the Data Flow Resource has access to read the object or bucket. Below is a more built-out example using conda packs:

From a Python Environment

from ads.jobs import DataFlow, Job, DataFlowRuntime
from uuid import uuid4
import os
import tempfile

with tempfile.TemporaryDirectory() as td:
    # Write the PySpark application to a temporary local file. The text
    # below is the complete content of script.py, so it must itself be valid
    # Python: its explanatory lines are written as `#` comments.
    with open(os.path.join(td, "script.py"), "w") as f:
        f.write(
'''
from pyspark.sql import SparkSession
import click

@click.command()
@click.argument("app_name")
@click.option(
    "--limit", "-l", help="max number of row to print", default=10, required=False
)
@click.option("--verbose", "-v", help="print out result in verbose mode", is_flag=True)
def main(app_name, limit, verbose):
    # Create a Spark session
    spark = SparkSession.builder.appName(app_name).getOrCreate()

    # Load a csv file from dataflow public storage
    df = (
        spark.read.format("csv")
        .option("header", "true")
        .option("multiLine", "true")
        .load(
            "oci://oow_2019_dataflow_lab@bigdatadatasciencelarge/usercontent/kaggle_berlin_airbnb_listings_summary.csv"
        )
    )

    # Create a temp view and do some SQL operations
    df.createOrReplaceTempView("berlin")
    query_result_df = spark.sql(
        """
        SELECT
            city,
            zipcode,
            CONCAT(latitude,',', longitude) AS lat_long
        FROM berlin
        """
    ).limit(limit)

    # Convert the filtered Spark DataFrame into JSON format
    # Note: we are writing to the spark stdout log so that we can retrieve the log later at the end of the notebook.
    if verbose:
        rows = query_result_df.toJSON().collect()
        for i, row in enumerate(rows):
            print(f"record {i}")
            print(row)

if __name__ == "__main__":
    main()
'''
    )
    # A random suffix keeps the application name unique per submission.
    name = f"dataflow-app-{str(uuid4())}"
    # Infrastructure settings; values in <angle brackets> are placeholders.
    dataflow_configs = (
        DataFlow()
        .with_compartment_id("oci.xx.<compartment_id>")
        .with_logs_bucket_uri("oci://<mybucket>@<mynamespace>/<dataflow-logs-prefix>")
        .with_driver_shape("VM.Standard.E4.Flex")
        .with_driver_shape_config(ocpus=2, memory_in_gbs=32)
        .with_executor_shape("VM.Standard.E4.Flex")
        .with_executor_shape_config(ocpus=4, memory_in_gbs=64)
        .with_spark_version("3.2.1")
        .with_defined_tag(
            **{"Oracle-Tags": {"CreatedBy": "test_name@oracle.com"}}
        )
        .with_freeform_tag(test_freeform_key="test_freeform_value")
    )
    # Runtime settings: the script, the conda pack to run it in, and the
    # command-line arguments handed to the script's click entry point
    # ("run-test" is app_name, "-v" enables verbose output, "-l 5" sets limit).
    runtime_config = (
        DataFlowRuntime()
        .with_script_uri(os.path.join(td, "script.py"))
        .with_script_bucket("oci://<mybucket>@<mynamespace>/<subdir_to_put_and_get_script>")
        .with_custom_conda(uri="oci://<mybucket>@<mynamespace>/<path_to_conda_pack>")
        .with_arguments(["run-test", "-v", "-l", "5"])
    )
    # Create the job and start a run while the temporary script still exists.
    df = Job(name=name, infrastructure=dataflow_configs, runtime=runtime_config)
    df.create()
    df_run = df.run()

From the Command Line

Again, assume you have the following two files written in your current directory as script.py and dataflow.yaml respectively:

# script.py
from pyspark.sql import SparkSession
import click

# Command-line entry point: takes a positional APP_NAME plus optional
# --limit/-l (default 10) and --verbose/-v. Documented with comments rather
# than a docstring so the command's click help text is left unchanged.
@click.command()
@click.argument("app_name")
@click.option(
    "--limit", "-l", help="max number of row to print", default=10, required=False
)
@click.option("--verbose", "-v", help="print out result in verbose mode", is_flag=True)
def main(app_name, limit, verbose):
    # Create a Spark session
    spark = SparkSession.builder.appName(app_name).getOrCreate()

    # Load a csv file from dataflow public storage
    df = (
        spark.read.format("csv")
        .option("header", "true")
        .option("multiLine", "true")
        .load(
            "oci://oow_2019_dataflow_lab@bigdatadatasciencelarge/usercontent/kaggle_berlin_airbnb_listings_summary.csv"
        )
    )

    # Create a temp view and do some SQL operations
    df.createOrReplaceTempView("berlin")
    query_result_df = spark.sql(
        """
        SELECT
            city,
            zipcode,
            CONCAT(latitude,',', longitude) AS lat_long
        FROM berlin
        """
    ).limit(limit)

    # Convert the filtered Spark DataFrame into JSON format
    # Note: we are writing to the spark stdout log so that we can retrieve the log later at the end of the notebook.
    if verbose:
        rows = query_result_df.toJSON().collect()
        for i, row in enumerate(rows):
            print(f"record {i}")
            print(row)


if __name__ == "__main__":
    main()
# dataflow.yaml
# Job definition for the conda-based example.
# Values in <angle brackets> are placeholders.
kind: job
spec:
    name: dataflow-app-<uuid>
    infrastructure:
        kind: infrastructure
        spec:
            compartmentId: oci.xx.<compartment_id>
            logsBucketUri: oci://<mybucket>@<mynamespace>/<dataflow-logs-prefix>
            driverShape: VM.Standard.E4.Flex
            driverShapeConfig:
                ocpus: 2
                memory_in_gbs: 32
            executorShape: VM.Standard.E4.Flex
            executorShapeConfig:
                ocpus: 4
                memory_in_gbs: 64
            sparkVersion: 3.2.1
            numExecutors: 1
            definedTags:
                Oracle-Tags:
                    CreatedBy: test_name@oracle.com
            freeformTags:
                test_freeform_key: test_freeform_value
        type: dataFlow
    runtime:
        kind: runtime
        spec:
            scriptUri: script.py
            scriptBucket: oci://<mybucket>@<mynamespace>/<subdir_to_put_and_get_script>
            # Published conda pack used as the Spark runtime environment.
            conda:
                uri: oci://<mybucket>@<mynamespace>/<path_to_conda_pack>
                type: published
            # Arguments for script.py: app_name, then -v (verbose) and -l 5 (limit).
            args:
                - "run-test"
                - "-v"
                - "-l"
                - "5"
ads jobs run -f dataflow.yaml