Run a Data Flow Application
Oracle Cloud Infrastructure (OCI) Data Flow is a service for creating and running Spark applications. The following examples demonstrate how to create and run Data Flow applications using ADS.
Python
To create and run a Data Flow application, you must specify a compartment and, in the same compartment, a bucket for storing logs:
compartment_id = "<compartment_id>"
logs_bucket_uri = "<logs_bucket_uri>"
Ensure that you set up the correct policies. For instance, for Data Flow to access the logs bucket, use a policy like:
ALLOW SERVICE dataflow TO READ objects IN tenancy WHERE target.bucket.name='dataflow-logs'
For more information, see the Data Flow documentation.
Update oci_profile if you are not using the default profile:

import ads

oci_profile = "DEFAULT"
config_location = "~/.oci/config"
ads.set_auth(auth="api_key", oci_config_location=config_location, profile=oci_profile)
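If you are running in an environment where resource principals are configured, such as an OCI notebook session, you could authenticate with them instead; a minimal sketch:

ads.set_auth(auth="resource_principal")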
To create a Data Flow application you need two components:

- DataFlow, a subclass of Infrastructure.
- DataFlowRuntime, a subclass of Runtime.

DataFlow stores properties specific to the Data Flow service, such as compartment_id, logs_bucket_uri, and so on. You can set them using the with_{property} functions:
- with_compartment_id
- with_configuration
- with_driver_shape
- with_executor_shape
- with_language
- with_logs_bucket_uri
- with_metastore_id
- with_num_executors
- with_spark_version
- with_warehouse_bucket_uri
For more details, see the DataFlow class documentation: https://docs.oracle.com/en-us/iaas/tools/ads-sdk/latest/ads.jobs.html#module-ads.jobs.builders.infrastructure.dataflow
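For example, a minimal sketch with placeholder values; it assumes that with_configuration accepts a dictionary of Spark configuration properties:

from ads.jobs import DataFlow

# A sketch, not a complete application: set the compartment, log bucket,
# executor count, and Spark configuration on the infrastructure object.
dataflow = DataFlow()\
    .with_compartment_id("<compartment_id>")\
    .with_logs_bucket_uri("<logs_bucket_uri>")\
    .with_num_executors(2)\
    .with_configuration({"spark.driver.memory": "512m"})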
DataFlowRuntime stores properties related to the script to be run, such as the path to the script and CLI arguments. Likewise, all properties can be set using with_{property}. The DataFlowRuntime properties are:
- with_archive_bucket
- with_archive_uri
- with_script_bucket
- with_script_uri
For more details, see the runtime class documentation.
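If the script already lives in Object Storage, no bucket needs to be specified; a minimal sketch with a placeholder path:

from ads.jobs import DataFlowRuntime

# A sketch: point the runtime at a script that is already in Object Storage.
runtime = DataFlowRuntime()\
    .with_script_uri("oci://<bucket_name>@<namespace>/<prefix>/script.py")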
Since service configurations remain mostly unchanged across multiple experiments, a DataFlow object can be reused and combined with various DataFlowRuntime parameters to create applications.

In the following "hello-world" example, DataFlow is populated with compartment_id, driver_shape, executor_shape, and spark_version. DataFlowRuntime is populated with script_uri and script_bucket. The script_uri specifies the path to the script. It can be local or remote (an Object Storage path). If the path is local, then script_bucket must also be specified, because Data Flow requires the script to be available in Object Storage. ADS performs the upload step for you, as long as you give the bucket name or the Object Storage path prefix to upload the script. Either can be given to script_bucket. For example, either with_script_bucket("<bucket_name>") or with_script_bucket("oci://<bucket_name>@<namespace>/<prefix>") is accepted. In the next example, the prefix is given for script_bucket.
import os
import tempfile
from uuid import uuid4

from ads.jobs import DataFlow, DataFlowRun, DataFlowRuntime, Job

# Object Storage prefix where ADS uploads the local script (placeholder value).
script_prefix = "oci://<bucket_name>@<namespace>/<prefix>"

with tempfile.TemporaryDirectory() as td:
    with open(os.path.join(td, "script.py"), "w") as f:
        f.write('''
import pyspark

def main():
    print("Hello World")
    print("Spark version is", pyspark.__version__)

if __name__ == "__main__":
    main()
''')
    name = f"dataflow-app-{str(uuid4())}"
    dataflow_configs = DataFlow()\
        .with_compartment_id(compartment_id)\
        .with_logs_bucket_uri(logs_bucket_uri)\
        .with_driver_shape("VM.Standard2.1")\
        .with_executor_shape("VM.Standard2.1")\
        .with_spark_version("3.2.1")
    runtime_config = DataFlowRuntime()\
        .with_script_uri(os.path.join(td, "script.py"))\
        .with_script_bucket(script_prefix)
    df = Job(name=name, infrastructure=dataflow_configs, runtime=runtime_config)
    df.create()
To run this application, you could use:
df_run = df.run()
After the run completes, check the stdout log from the application by running:
print(df_run.logs.application.stdout)
You should see this in the log:
Hello World
Spark version is 3.2.1
Data Flow supports adding third-party libraries using a ZIP file, usually called archive.zip; see the Data Flow documentation for how to create ZIP files. Similar to scripts, you can specify an archive ZIP for a Data Flow application using with_archive_uri. The archive_uri can also be a local path, in which case you must also specify with_archive_bucket, which follows the same rule as with_script_bucket.
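For the local case, a minimal sketch with placeholder paths, assuming archive.zip exists locally:

# A sketch: a local archive is uploaded to the bucket given by with_archive_bucket.
runtime_config = DataFlowRuntime()\
    .with_script_uri("oci://<bucket>@<namespace>/prefix/script.py")\
    .with_archive_uri("archive.zip")\
    .with_archive_bucket("oci://<bucket>@<namespace>/prefix")

In the next full example, archive_uri is given as an Object Storage location: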
import os
import tempfile
from uuid import uuid4

from ads.jobs import DataFlow, DataFlowRun, DataFlowRuntime, Job

with tempfile.TemporaryDirectory() as td:
    with open(os.path.join(td, "script.py"), "w") as f:
        f.write('''
from pyspark.sql import SparkSession
import click


@click.command()
@click.argument("app_name")
@click.option(
    "--limit", "-l", help="max number of rows to print", default=10, required=False
)
@click.option("--verbose", "-v", help="print out result in verbose mode", is_flag=True)
def main(app_name, limit, verbose):
    # Create a Spark session
    spark = SparkSession.builder.appName(app_name).getOrCreate()

    # Load a csv file from Data Flow public storage
    df = (
        spark.read.format("csv")
        .option("header", "true")
        .option("multiLine", "true")
        .load(
            "oci://oow_2019_dataflow_lab@bigdatadatasciencelarge/usercontent/kaggle_berlin_airbnb_listings_summary.csv"
        )
    )

    # Create a temp view and do some SQL operations
    df.createOrReplaceTempView("berlin")
    query_result_df = spark.sql(
        """
        SELECT
            city,
            zipcode,
            CONCAT(latitude, ',', longitude) AS lat_long
        FROM berlin
        """
    ).limit(limit)

    # Convert the filtered Spark DataFrame into JSON format.
    # Note: we are writing to the Spark stdout log so that we can
    # retrieve the log later at the end of the notebook.
    if verbose:
        rows = query_result_df.toJSON().collect()
        for i, row in enumerate(rows):
            print(f"record {i}")
            print(row)


if __name__ == "__main__":
    main()
''')
    name = f"dataflow-app-{str(uuid4())}"
    dataflow_configs = DataFlow()\
        .with_compartment_id(compartment_id)\
        .with_logs_bucket_uri(logs_bucket_uri)\
        .with_driver_shape("VM.Standard2.1")\
        .with_executor_shape("VM.Standard2.1")\
        .with_spark_version("3.2.1")
    runtime_config = DataFlowRuntime()\
        .with_script_uri(os.path.join(td, "script.py"))\
        .with_script_bucket("oci://<bucket>@<namespace>/prefix/path")\
        .with_archive_uri("oci://<bucket>@<namespace>/prefix/archive.zip")
    df = Job(name=name, infrastructure=dataflow_configs, runtime=runtime_config)
    df.create()
You can pass arguments to a Data Flow run as a list of strings; here, "run-test" is the app_name argument, -v turns on verbose output, and -l 5 limits the result to five rows:
df_run = df.run(args=["run-test", "-v", "-l", "5"])
You can save the application specification into a YAML file for future reuse. You could also use the JSON format.
print(df.to_yaml("sample-df.yaml"))
You can also load a Data Flow application directly from the YAML file saved in the previous example:
df2 = Job.from_yaml(uri="sample-df.yaml")
Create a new job and a run:
df_run2 = df2.create().run()
Deleting a job cancels associated runs:
df2.delete()
df_run2.status
You can also load a Data Flow application from an OCID:
df3 = Job.from_dataflow_job(df.id)
Creating a run under the same application:
df_run3 = df3.run()
Now there are two runs under the df application:
assert len(df.run_list()) == 2
When you run a Data Flow application, a DataFlowRun object is created. You can check the status, wait for a run to finish, check its logs afterwards, or cancel a run in progress. For example:
df_run.status
df_run.wait()
Note that watch is an alias of wait, so you can also call df_run.watch().
There are three types of logs for a run:
- application log
- driver log
- executor log
Each log consists of stdout and stderr. For example, to access stdout from the application log, you could use:
df_run.logs.application.stdout
Similarly, you can access the other logs:
df_run.logs.application.stderr
df_run.logs.executor.stdout
df_run.logs.executor.stderr
You can also examine the head or tail of the log, or download it to a local path. For example:
log = df_run.logs.application.stdout
log.head(n=1)
log.tail(n=1)
log.download("<local-path>")
For the sample script, the log prints the first five rows of a sample dataframe in JSON; it looks like this:
record 0
{"city":"Berlin","zipcode":"10119","lat_long":"52.53453732241747,13.402556926822387"}
record 1
{"city":"Berlin","zipcode":"10437","lat_long":"52.54851279221664,13.404552826587466"}
record 2
{"city":"Berlin","zipcode":"10405","lat_long":"52.534996191586714,13.417578665333295"}
record 3
{"city":"Berlin","zipcode":"10777","lat_long":"52.498854933130026,13.34906453348717"}
record 4
{"city":"Berlin","zipcode":"10437","lat_long":"52.5431572633131,13.415091104515707"}
Calling log.head(n=1) returns this:
'record 0'
Calling log.tail(n=1) returns this:
{"city":"Berlin","zipcode":"10437","lat_long":"52.5431572633131,13.415091104515707"}
A link to the run page in the OCI Console is available through the run_details_link property:
df_run.run_details_link
To list Data Flow applications, you must provide a compartment ID; optional filtering criteria may also be given. For example, you can filter by the display name of the application:
Job.dataflow_job(compartment_id=compartment_id, display_name=name)
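A minimal sketch of iterating over the matches, assuming dataflow_job returns a list of Job objects:

# Print the name and OCID of each matching Data Flow application.
for job in Job.dataflow_job(compartment_id=compartment_id):
    print(job.name, job.id)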
YAML
You can create a Data Flow job directly from a YAML string. You can pass a YAML string into the Job.from_yaml() function to build a Data Flow job:
kind: job
spec:
  id: <dataflow_app_ocid>
  infrastructure:
    kind: infrastructure
    spec:
      compartmentId: <compartment_id>
      driverShape: VM.Standard2.1
      executorShape: VM.Standard2.1
      id: <dataflow_app_ocid>
      language: PYTHON
      logsBucketUri: <logs_bucket_uri>
      numExecutors: 1
      sparkVersion: 3.2.1
    type: dataFlow
  name: dataflow_app_name
  runtime:
    kind: runtime
    spec:
      scriptBucket: bucket_name
      scriptPathURI: oci://<bucket_name>@<namespace>/<prefix>
    type: dataFlow
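For example, a minimal sketch, assuming the YAML above is held in a Python string named yaml_string:

from ads.jobs import Job

job = Job.from_yaml(yaml_string)  # build the job from the YAML specification
job.create()                      # create the Data Flow application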
Data Flow Infrastructure YAML Schema
kind:
  allowed:
    - infrastructure
  required: true
  type: string
spec:
  required: true
  type: dict
  schema:
    compartmentId:
      required: false
      type: string
    displayName:
      required: false
      type: string
    driverShape:
      required: false
      type: string
    executorShape:
      required: false
      type: string
    id:
      required: false
      type: string
    language:
      required: false
      type: string
    logsBucketUri:
      required: false
      type: string
    metastoreId:
      required: false
      type: string
    numExecutors:
      required: false
      type: integer
    sparkVersion:
      required: false
      type: string
type:
  allowed:
    - dataFlow
  required: true
  type: string
Data Flow Runtime YAML Schema
kind:
  allowed:
    - runtime
  required: true
  type: string
spec:
  required: true
  type: dict
  schema:
    archiveBucket:
      required: false
      type: string
    archiveUri:
      required: false
      type: string
    args:
      nullable: true
      required: false
      schema:
        type: string
      type: list
    conda:
      nullable: false
      required: false
      type: dict
      schema:
        uri:
          required: true
          type: string
        region:
          required: false
          type: string
        authType:
          required: false
          allowed:
            - "resource_principal"
            - "api_keys"
            - "instance_principal"
        type:
          allowed:
            - published
          required: true
          type: string
    env:
      type: list
      required: false
      schema:
        type: dict
    freeformTags:
      required: false
      type: dict
    scriptBucket:
      required: false
      type: string
    scriptPathURI:
      required: false
      type: string
type:
  allowed:
    - dataFlow
  required: true
  type: string
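Following the schema above, a runtime spec that attaches a published conda pack might look like this sketch (placeholder URIs):

kind: runtime
spec:
  conda:
    type: published
    uri: oci://<bucket_name>@<namespace>/<prefix>/<conda_pack>
  scriptPathURI: oci://<bucket_name>@<namespace>/<prefix>/script.py
type: dataFlow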