=========== Quick Start =========== Data Flow is a hosted Apache Spark server. It is quick to start, and can scale to handle large datasets in parallel. ADS provides a convenient API for creating and maintaining workloads on Data Flow. Submit a Toy Python Script to Data Flow ======================================= From a Python Environment ------------------------- Submit a python script to Data Flow entirely from your python environment. The following snippet uses a toy python script that prints "Hello World" followed by the spark version, 3.2.1. .. code-block:: python from ads.jobs import DataFlow, Job, DataFlowRuntime from uuid import uuid4 import os import tempfile SCRIPT_CONTENT = """ import pyspark def main(): print("Hello World") print("Spark version is", pyspark.__version__) if __name__ == "__main__": main() """ with tempfile.TemporaryDirectory() as td: with open(os.path.join(td, "script.py"), "w") as f: f.write(SCRIPT_CONTENT) name = f"dataflow-app-{str(uuid4())}" dataflow_configs = ( DataFlow() .with_compartment_id("oci.xx.") .with_logs_bucket_uri("oci://@/") .with_driver_shape("VM.Standard.E4.Flex") .with_driver_shape_config(ocpus=2, memory_in_gbs=32) .with_executor_shape("VM.Standard.E4.Flex") .with_executor_shape_config(ocpus=4, memory_in_gbs=64) .with_spark_version("3.2.1") .with_defined_tag( **{"Oracle-Tags": {"CreatedBy": "test_name@oracle.com"}} ) .with_freeform_tag(test_freeform_key="test_freeform_value") ) runtime_config = ( DataFlowRuntime() .with_script_uri(os.path.join(td, "script.py")) .with_script_bucket("oci://@/") ) df = Job(name=name, infrastructure=dataflow_configs, runtime=runtime_config) df.create() df_run = df.run() From the Command Line --------------------- The same result can be achieved from the command line using ``ads CLI`` and a yaml file. Assuming you have the following two files written in your current directory as ``script.py`` and ``dataflow.yaml`` respectively: .. code-block:: python # script.py import pyspark def main(): print("Hello World") print("Spark version is", pyspark.__version__) if __name__ == "__main__": main() .. code-block:: yaml # dataflow.yaml kind: job spec: name: dataflow-app- infrastructure: kind: infrastructure spec: compartmentId: oci.xx. logsBucketUri: oci://@/ driverShape: VM.Standard.E4.Flex driverShapeConfig: ocpus: 2 memory_in_gbs: 32 executorShape: VM.Standard.E4.Flex executorShapeConfig: ocpus: 4 memory_in_gbs: 64 sparkVersion: 3.2.1 numExecutors: 1 definedTags: Oracle-Tags: CreatedBy: test_name@oracle.com freeformTags: test_freeform_key: test_freeform_value type: dataFlow runtime: kind: runtime spec: scriptUri: script.py scriptBucket: oci://@/ .. code-block:: shell ads jobs run -f dataflow.yaml Real Data Flow Example with Conda Environment ============================================= From PySpark v3.0.0 and onwards, Data Flow allows a published conda environment as the `Spark runtime environment `_ when built with `ADS`. Data Flow supports published conda environments only. Conda packs are tar'd conda environments. When you publish your own conda packs to object storage, ensure that the Data Flow Resource has access to read the object or bucket. Below is a more built-out example using conda packs: From a Python Environment ------------------------- .. code-block:: python from ads.jobs import DataFlow, Job, DataFlowRuntime from uuid import uuid4 import os import tempfile with tempfile.TemporaryDirectory() as td: with open(os.path.join(td, "script.py"), "w") as f: f.write( ''' from pyspark.sql import SparkSession import click @click.command() @click.argument("app_name") @click.option( "--limit", "-l", help="max number of row to print", default=10, required=False ) @click.option("--verbose", "-v", help="print out result in verbose mode", is_flag=True) def main(app_name, limit, verbose): Create a Spark session spark = SparkSession.builder.appName(app_name).getOrCreate() Load a csv file from dataflow public storage df = ( spark.read.format("csv") .option("header", "true") .option("multiLine", "true") .load( "oci://oow_2019_dataflow_lab@bigdatadatasciencelarge/usercontent/kaggle_berlin_airbnb_listings_summary.csv" ) ) Create a temp view and do some SQL operations df.createOrReplaceTempView("berlin") query_result_df = spark.sql( """ SELECT city, zipcode, CONCAT(latitude,',', longitude) AS lat_long FROM berlin """ ).limit(limit) # Convert the filtered Spark DataFrame into JSON format # Note: we are writing to the spark stdout log so that we can retrieve the log later at the end of the notebook. if verbose: rows = query_result_df.toJSON().collect() for i, row in enumerate(rows): print(f"record {i}") print(row) if __name__ == "__main__": main() ''' ) name = f"dataflow-app-{str(uuid4())}" dataflow_configs = ( DataFlow() .with_compartment_id("oci.xx.") .with_logs_bucket_uri("oci://@/") .with_driver_shape("VM.Standard.E4.Flex") .with_driver_shape_config(ocpus=2, memory_in_gbs=32) .with_executor_shape("VM.Standard.E4.Flex") .with_executor_shape_config(ocpus=4, memory_in_gbs=64) .with_spark_version("3.2.1") .with_defined_tag( **{"Oracle-Tags": {"CreatedBy": "test_name@oracle.com"}} ) .with_freeform_tag(test_freeform_key="test_freeform_value") ) runtime_config = ( DataFlowRuntime() .with_script_uri(os.path.join(td, "script.py")) .with_script_bucket("oci://@/") .with_custom_conda(uri="oci://@/") .with_arguments(["run-test", "-v", "-l", "5"]) ) df = Job(name=name, infrastructure=dataflow_configs, runtime=runtime_config) df.create() df_run = df.run() From the Command Line --------------------- Again, assume you have the following two files written in your current directory as ``script.py`` and ``dataflow.yaml`` respectively: .. code-block:: python # script.py from pyspark.sql import SparkSession import click @click.command() @click.argument("app_name") @click.option( "--limit", "-l", help="max number of row to print", default=10, required=False ) @click.option("--verbose", "-v", help="print out result in verbose mode", is_flag=True) def main(app_name, limit, verbose): Create a Spark session spark = SparkSession.builder.appName(app_name).getOrCreate() Load a csv file from dataflow public storage df = ( spark.read.format("csv") .option("header", "true") .option("multiLine", "true") .load( "oci://oow_2019_dataflow_lab@bigdatadatasciencelarge/usercontent/kaggle_berlin_airbnb_listings_summary.csv" ) ) Create a temp view and do some SQL operations df.createOrReplaceTempView("berlin") query_result_df = spark.sql( """ SELECT city, zipcode, CONCAT(latitude,',', longitude) AS lat_long FROM berlin """ ).limit(limit) # Convert the filtered Spark DataFrame into JSON format # Note: we are writing to the spark stdout log so that we can retrieve the log later at the end of the notebook. if verbose: rows = query_result_df.toJSON().collect() for i, row in enumerate(rows): print(f"record {i}") print(row) if __name__ == "__main__": main() .. code-block:: yaml # dataflow.yaml kind: job spec: name: dataflow-app- infrastructure: kind: infrastructure spec: compartmentId: oci.xx. logsBucketUri: oci://@/ driverShape: VM.Standard.E4.Flex driverShapeConfig: ocpus: 2 memory_in_gbs: 32 executorShape: VM.Standard.E4.Flex executorShapeConfig: ocpus: 4 memory_in_gbs: 64 sparkVersion: 3.2.1 numExecutors: 1 definedTags: Oracle-Tags: CreatedBy: test_name@oracle.com freeformTags: test_freeform_key: test_freeform_value type: dataFlow runtime: kind: runtime spec: scriptUri: script.py scriptBucket: oci://@/ conda: uri: oci://@/ type: published args: - "run-test" - "-v" - "-l" - "5" .. code-block:: shell ads jobs run -f dataflow.yaml