Quick Start#

Data Flow#

from ads.jobs import DataFlow, DataFlowRun, DataFlowRuntime

# Update these values
job_name = "<job_name>"
logs_bucket = "oci://<bucket_name>@<namespace>/<prefix>"
metastore_id = "<metastore_id>"
script_bucket = "oci://<bucket_name>@<namespace>/<prefix>"

compartment_id = os.environ.get("NB_SESSION_COMPARTMENT_OCID")
driver_shape = "VM.Standard.E4.Flex"
driver_shape_config = {"ocpus":2, "memory_in_gbs":32}
executor_shape = "VM.Standard.E4.Flex"
executor_shape_config = {"ocpus":4, "memory_in_gbs":64}
spark_version = "3.2.1"

# A python script to be run in Data Flow
script = '''
from pyspark.sql import SparkSession

def main():

    database_name = "employee_attrition"
    table_name = "orcl_attrition"

    # Create a Spark session
    spark = SparkSession \\
        .builder \\
        .appName("Python Spark SQL basic example") \\
        .enableHiveSupport() \\

    # Load a CSV file from a public Object Storage bucket
    df = spark \\
        .read \\
        .format("csv") \\
        .option("header", "true") \\
        .option("multiLine", "true") \\

    print(f"Creating {database_name}")
    spark.sql(f"DROP DATABASE IF EXISTS {database_name} CASCADE")
    spark.sql(f"CREATE DATABASE IF NOT EXISTS {database_name}")

    # Write the data to the database

    # Use Spark SQL to read from the database.
    query_result_df = spark.sql(f"""
                                SELECT EducationField, SalaryLevel, JobRole FROM {database_name}.{table_name} limit 10

    # Convert the filtered Apache Spark DataFrame into JSON format and write it out to stdout
    # so that it can be captured in the log.

if __name__ == '__main__':

# Saves the python script to local path.
dataflow_base_folder = tempfile.mkdtemp()
script_uri = os.path.join(dataflow_base_folder, "example.py")

with open(script_uri, 'w') as f:
    print(script.strip(), file=f)

dataflow_configs = DataFlow(
        "compartment_id": compartment_id,
        "driver_shape": driver_shape,
        "driver_shape_config": driver_shape_config,
        "executor_shape": executor_shape,
        "executor_shape_config": executor_shape_config,
        "logs_bucket_uri": log_bucket_uri,
        "metastore_id": metastore_id,
        "spark_version": spark_version

runtime_config = DataFlowRuntime(
        "script_uri": pyspark_file_path,
        "script_bucket": script_uri

# creates a Data Flow application with DataFlow and DataFlowRuntime.
df_job = Job(name=job_name,
df_app = df_job.create()
df_run = df_app.run()

# check a job log

Interactive Spark#

from pyspark.sql import SparkSession

# Update these values
warehouse_uri = "<warehouse_uri>"
metastore_id = "<metastore_id>"

database_name = "ODSC_DEMO"

# create a spark session
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL Hive integration example") \
    .config("spark.sql.warehouse.dir", warehouse_uri) \
    .config("spark.hadoop.oracle.dcat.metastore.id", metastore_id) \
    .enableHiveSupport() \

# show the databases in the warehouse:
spark.sql("SHOW DATABASES").show()
spark.sql(f"DROP DATABASE IF EXISTS {database_name} CASCADE")
spark.sql(f"CREATE DATABASE {database_name}")

# Load the Employee Attrition data file from OCI Object Storage into a Spark DataFrame:
file_path = "oci://hosted-ds-datasets@bigdatadatasciencelarge/synthetic/orcl_attrition.csv"
input_dataframe = spark.read.option("header", "true").csv(file_path)

# explore data
spark_df = spark.sql(f"""
                     SELECT EducationField, SalaryLevel, JobRole FROM {database_name}.{table_name} limit 10