Quick Start¶
Data Flow is a hosted Apache Spark server. It is quick to start, and can scale to handle large datasets in parallel. ADS provides a convenient API for creating and maintaining workloads on Data Flow.
Submit a Toy Python Script to Data Flow¶
From a Python Environment¶
Submit a python script to Data Flow entirely from your python environment. The following snippet uses a toy python script that prints “Hello World” followed by the spark version, 3.2.1.
from ads.jobs import DataFlow, Job, DataFlowRuntime
from uuid import uuid4
import os
import tempfile
SCRIPT_CONTENT = """
import pyspark
def main():
print("Hello World")
print("Spark version is", pyspark.__version__)
if __name__ == "__main__":
main()
"""
with tempfile.TemporaryDirectory() as td:
with open(os.path.join(td, "script.py"), "w") as f:
f.write(SCRIPT_CONTENT)
name = f"dataflow-app-{str(uuid4())}"
dataflow_configs = (
DataFlow()
.with_compartment_id("oci.xx.<compartment_id>")
.with_logs_bucket_uri("oci://<mybucket>@<mynamespace>/<dataflow-logs-prefix>")
.with_driver_shape("VM.Standard.E4.Flex")
.with_driver_shape_config(ocpus=2, memory_in_gbs=32)
.with_executor_shape("VM.Standard.E4.Flex")
.with_executor_shape_config(ocpus=4, memory_in_gbs=64)
.with_spark_version("3.2.1")
.with_defined_tag(
**{"Oracle-Tags": {"CreatedBy": "test_name@oracle.com"}}
)
.with_freeform_tag(test_freeform_key="test_freeform_value")
)
runtime_config = (
DataFlowRuntime()
.with_script_uri(os.path.join(td, "script.py"))
.with_script_bucket("oci://<mybucket>@<mynamespace>/<subdir_to_put_and_get_script>")
)
df = Job(name=name, infrastructure=dataflow_configs, runtime=runtime_config)
df.create()
df_run = df.run()
From the Command Line¶
The same result can be achieved from the command line using ads CLI
and a yaml file.
Assuming you have the following two files written in your current directory as script.py
and dataflow.yaml
respectively:
# script.py
import pyspark
def main():
print("Hello World")
print("Spark version is", pyspark.__version__)
if __name__ == "__main__":
main()
# dataflow.yaml
kind: job
spec:
name: dataflow-app-<uuid>
infrastructure:
kind: infrastructure
spec:
compartmentId: oci.xx.<compartment_id>
logsBucketUri: oci://<mybucket>@<mynamespace>/<dataflow-logs-prefix>
driverShape: VM.Standard.E4.Flex
driverShapeConfig:
ocpus: 2
memory_in_gbs: 32
executorShape: VM.Standard.E4.Flex
executorShapeConfig:
ocpus: 4
memory_in_gbs: 64
sparkVersion: 3.2.1
numExecutors: 1
definedTags:
Oracle-Tags:
CreatedBy: test_name@oracle.com
freeformTags:
test_freeform_key: test_freeform_value
type: dataFlow
runtime:
kind: runtime
spec:
scriptUri: script.py
scriptBucket: oci://<mybucket>@<mynamespace>/<subdir_to_put_and_get_script>
ads jobs run -f dataflow.yaml
Real Data Flow Example with Conda Environment¶
From PySpark v3.0.0 and onwards, Data Flow allows a published conda environment as the Spark runtime environment when built with ADS. Data Flow supports published conda environments only. Conda packs are tar’d conda environments. When you publish your own conda packs to object storage, ensure that the Data Flow Resource has access to read the object or bucket. Below is a more built-out example using conda packs:
From a Python Environment¶
from ads.jobs import DataFlow, Job, DataFlowRuntime
from uuid import uuid4
import os
import tempfile
with tempfile.TemporaryDirectory() as td:
with open(os.path.join(td, "script.py"), "w") as f:
f.write(
'''
from pyspark.sql import SparkSession
import click
@click.command()
@click.argument("app_name")
@click.option(
"--limit", "-l", help="max number of row to print", default=10, required=False
)
@click.option("--verbose", "-v", help="print out result in verbose mode", is_flag=True)
def main(app_name, limit, verbose):
Create a Spark session
spark = SparkSession.builder.appName(app_name).getOrCreate()
Load a csv file from dataflow public storage
df = (
spark.read.format("csv")
.option("header", "true")
.option("multiLine", "true")
.load(
"oci://oow_2019_dataflow_lab@bigdatadatasciencelarge/usercontent/kaggle_berlin_airbnb_listings_summary.csv"
)
)
Create a temp view and do some SQL operations
df.createOrReplaceTempView("berlin")
query_result_df = spark.sql(
"""
SELECT
city,
zipcode,
CONCAT(latitude,',', longitude) AS lat_long
FROM berlin
"""
).limit(limit)
# Convert the filtered Spark DataFrame into JSON format
# Note: we are writing to the spark stdout log so that we can retrieve the log later at the end of the notebook.
if verbose:
rows = query_result_df.toJSON().collect()
for i, row in enumerate(rows):
print(f"record {i}")
print(row)
if __name__ == "__main__":
main()
'''
)
name = f"dataflow-app-{str(uuid4())}"
dataflow_configs = (
DataFlow()
.with_compartment_id("oci.xx.<compartment_id>")
.with_logs_bucket_uri("oci://<mybucket>@<mynamespace>/<dataflow-logs-prefix>")
.with_driver_shape("VM.Standard.E4.Flex")
.with_driver_shape_config(ocpus=2, memory_in_gbs=32)
.with_executor_shape("VM.Standard.E4.Flex")
.with_executor_shape_config(ocpus=4, memory_in_gbs=64)
.with_spark_version("3.2.1")
.with_defined_tag(
**{"Oracle-Tags": {"CreatedBy": "test_name@oracle.com"}}
)
.with_freeform_tag(test_freeform_key="test_freeform_value")
)
runtime_config = (
DataFlowRuntime()
.with_script_uri(os.path.join(td, "script.py"))
.with_script_bucket("oci://<mybucket>@<mynamespace>/<subdir_to_put_and_get_script>")
.with_custom_conda(uri="oci://<mybucket>@<mynamespace>/<path_to_conda_pack>")
.with_arguments(["run-test", "-v", "-l", "5"])
)
df = Job(name=name, infrastructure=dataflow_configs, runtime=runtime_config)
df.create()
df_run = df.run()
From the Command Line¶
Again, assume you have the following two files written in your current directory as script.py
and dataflow.yaml
respectively:
# script.py
from pyspark.sql import SparkSession
import click
@click.command()
@click.argument("app_name")
@click.option(
"--limit", "-l", help="max number of row to print", default=10, required=False
)
@click.option("--verbose", "-v", help="print out result in verbose mode", is_flag=True)
def main(app_name, limit, verbose):
Create a Spark session
spark = SparkSession.builder.appName(app_name).getOrCreate()
Load a csv file from dataflow public storage
df = (
spark.read.format("csv")
.option("header", "true")
.option("multiLine", "true")
.load(
"oci://oow_2019_dataflow_lab@bigdatadatasciencelarge/usercontent/kaggle_berlin_airbnb_listings_summary.csv"
)
)
Create a temp view and do some SQL operations
df.createOrReplaceTempView("berlin")
query_result_df = spark.sql(
"""
SELECT
city,
zipcode,
CONCAT(latitude,',', longitude) AS lat_long
FROM berlin
"""
).limit(limit)
# Convert the filtered Spark DataFrame into JSON format
# Note: we are writing to the spark stdout log so that we can retrieve the log later at the end of the notebook.
if verbose:
rows = query_result_df.toJSON().collect()
for i, row in enumerate(rows):
print(f"record {i}")
print(row)
if __name__ == "__main__":
main()
# dataflow.yaml
kind: job
spec:
name: dataflow-app-<uuid>
infrastructure:
kind: infrastructure
spec:
compartmentId: oci.xx.<compartment_id>
logsBucketUri: oci://<mybucket>@<mynamespace>/<dataflow-logs-prefix>
driverShape: VM.Standard.E4.Flex
driverShapeConfig:
ocpus: 2
memory_in_gbs: 32
executorShape: VM.Standard.E4.Flex
executorShapeConfig:
ocpus: 4
memory_in_gbs: 64
sparkVersion: 3.2.1
numExecutors: 1
definedTags:
Oracle-Tags:
CreatedBy: test_name@oracle.com
freeformTags:
test_freeform_key: test_freeform_value
type: dataFlow
runtime:
kind: runtime
spec:
scriptUri: script.py
scriptBucket: oci://<mybucket>@<mynamespace>/<subdir_to_put_and_get_script>
conda:
uri: oci://<mybucket>@<mynamespace>/<path_to_conda_pack>
type: published
args:
- "run-test"
- "-v"
- "-l"
- "5"
ads jobs run -f dataflow.yaml