[Legacy]

The ads.dataflow.dataflow module was removed in version v2.8.6 (June 2023).

Deprecated since version v2.6.3 (July 2022).

ADS can be used to create and run PySpark applications directly from a notebook session.

Prerequisite

To access Data Flow, there are a number of steps that need to be completed.

  • Data Flow requires a bucket to store the logs, and a data warehouse bucket. Refer to the Data Flow documentation for setting up storage.

  • Data Flow requires policies to be set in IAM to access resources to manage and run applications/sessions. Refer to the Data Flow documentation on how to set up policies.

  • Data Flow natively supports conda packs published to OCI Object Storage. Ensure the Data Flow Resource has read access to the bucket or path of your published conda pack, and that the Spark version is >= 3 when running your Data Flow Application/Session.

Create a Data Flow Instance

First, you create a DataFlow object instance.

You can specify where artifacts are stored using the optional dataflow_base_folder argument. By default, all artifacts are stored in /home/datascience/dataflow. The dataflow_base_folder directory contains multiple subdirectories, each corresponding to a different application. The name of a subdirectory is the application name with a random string added as a suffix. In each application directory, artifacts generated by separate runs are stored in different folders. Each folder is identified by the run display name and the run creation time. All the run-specific artifacts, including the script, the run configuration, and the run logs, are saved in the corresponding run folder.
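For example, with the default settings, the artifact directory tree could look like this (the names shown are illustrative, not literal):

/home/datascience/dataflow/
  <application-name>-<random-suffix>/
    <run-display-name>-<run-creation-time>/
      the PySpark script, the run configuration, and the run logs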

Also, you can choose to use a specific compartment using the optional compartment_id argument when creating the dataflow instance. Otherwise, it uses the same compartment as your notebook session to create the instance.

from ads.dataflow.dataflow import DataFlow
data_flow = DataFlow(
  compartment_id="<compartmentA_OCID>",
  dataflow_base_folder="<my_dataflow_dir>"
)

Generate a Script Using a Template

We provide simple PySpark and sparksql templates for you to get started with Data Flow. You can use data_flow.template() to generate a pre-written template.

We support these templates:

  • The standard_pyspark template is used for standard PySpark jobs.

  • The sparksql template is used for sparksql jobs.

from ads.dataflow.dataflow import DataFlow
data_flow = DataFlow()
data_flow.template(job_type='standard_pyspark')

data_flow.template() returns the local path to the script you have generated.
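For example, you can capture the returned path and preview the generated script before editing it (a minimal sketch):

template_path = data_flow.template(job_type='standard_pyspark')
with open(template_path) as f:
    print(f.read())   # review the generated template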

Create a Data Flow Application

The application creation process has two stages, preparation and creation.

In the preparation stage, you prepare the configuration object necessary to create an application. You must provide values for these three parameters:

  • display_name: The name you give your application.

  • pyspark_file_path: The local path to your PySpark script.

  • script_bucket: The bucket used to read/write the PySpark script in Object Storage.

ADS checks that the bucket exists, and that you can write to it from your notebook session. Optionally, you can change values for these parameters:

  • compartment_id: The OCID of the compartment in which to create the application. If it’s not provided, the same compartment as your dataflow object is used.

  • driver_shape: The driver shape used to create the application. The default value is "VM.Standard2.4".

  • executor_shape: The executor shape used to create the application. The default value is "VM.Standard2.4".

  • logs_bucket: The bucket used to store run logs in Object Storage. The default value is "dataflow-logs".

  • num_executors: The number of executor VMs requested. The default value is 1.

Note

If you want to use a private bucket as the logs_bucket, ensure that you add a corresponding service policy using Identity: Policy Set Up (https://docs.cloud.oracle.com/en-us/iaas/data-flow/using/dfs_getting_started.htm#policy_set_up).

Then you can use prepare_app() to create the configuration object necessary to create the application.

from ads.dataflow.dataflow import DataFlow

data_flow = DataFlow()
app_config = data_flow.prepare_app(
  display_name="<app-display-name>",
  script_bucket="<your-script-bucket>" ,
  pyspark_file_path="<your-scirpt-path>"
)
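If you want to change any of the optional parameters described above, you can pass them to prepare_app() as well. The following is a sketch; the shape names and executor count are illustrative values only:

app_config = data_flow.prepare_app(
  display_name="<app-display-name>",
  script_bucket="<your-script-bucket>",
  pyspark_file_path="<your-script-path>",
  logs_bucket="<your-logs-bucket>",   # defaults to "dataflow-logs"
  driver_shape="VM.Standard2.8",      # defaults to "VM.Standard2.4"
  executor_shape="VM.Standard2.8",    # defaults to "VM.Standard2.4"
  num_executors=2                     # defaults to 1
)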

After you have the application configured, you can create an application using create_app:

app = data_flow.create_app(app_config)

Your local script is uploaded to the script bucket in this application creation step. Object Storage supports file versioning that creates an object version when the content changes, or the object is deleted. You can enable Object Versioning in your bucket in the OCI Console to prevent overwriting of existing files in Object Storage.

You can create an application with a script file that exists in Object Storage by setting overwrite_script=True in create_app. Similarly, you can set overwrite_archive=True to create an application with an archive file that exists in Object Storage. By default, the overwrite_script and overwrite_archive options are set to False.

app = data_flow.create_app(app_config, overwrite_script=True, overwrite_archive=True)

You can explore a few attributes of the DataFlowApp object.

First, you can look at the configuration of the application.

app.config

Next, you could get a URL link to the OCI Console Application Details page.

app.oci_link

Load an Existing Application

As an alternative to creating applications in ADS, you can load existing applications created elsewhere. These applications must be Python applications. To load an existing application, you need the application’s OCID.

existing_app = data_flow.load_app(app_id, target_folder)

You can find the app_id in the OCI Console or by listing existing applications.

Optionally, you could assign a value to the parameter target_folder. This parameter is the directory you want to store the local artifacts of this application in. If target_folder is not provided, then the local artifacts of this application are stored in the dataflow_base_folder folder defined by the dataflow object instance.
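For example, to store the local artifacts of the loaded application in a specific directory (the OCID and path below are placeholders):

existing_app = data_flow.load_app(
  app_id="<existing_app_OCID>",
  target_folder="/home/datascience/my-dataflow-app"
)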

Listing Applications

From ADS, you can list applications. They are returned as a list of dictionaries, and a helper function provides the data as a Pandas dataframe. The default sort order is the most recent run first.

For example, to list the most recent five applications use this code:

from ads.dataflow.dataflow import DataFlow
data_flow = DataFlow()
data_flow.list_apps().to_dataframe().head(5)

Create a Run

After an application is created or loaded in your notebook session, the next logical step is to execute a run of that application. The process of running (or creating) a run is similar to creating an application.

First, you configure the run using the prepare_run() method of the DataFlowApp object. You only need to provide a value for the name of your run using run_display_name:

run_config = app.prepare_run(run_display_name="<run-display-name>")

You could use a compartment different from your application to create a run by specifying the compartment_id in prepare_run. By default, it uses the same compartment as your application to create the run.

Optionally, you can specify the logs_bucket to store the logs of your run. By default, the run inherits the logs_bucket from the parent application, but you can overwrite that option.
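For example, a run configuration that overrides both options could look like this (a sketch using placeholder values for the optional parameters described above):

run_config = app.prepare_run(
  run_display_name="<run-display-name>",
  compartment_id="<compartmentB_OCID>",    # optional: create the run in a different compartment
  logs_bucket="<your-other-logs-bucket>"   # optional: override the logs bucket inherited from the application
)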

Every time the application launches a run, a local folder representing this run is created. This folder stores all the information including the script, the run configuration, and any logs that are stored in the logs bucket.

Then, you can create a run using the run_config generated in the preparation stage. During this process, you can monitor the run while the job is running. You can also pull logs into your local directories by setting save_log_to_local=True.

run = app.run(run_config, save_log_to_local=True)

The DataFlowRun object has some useful attributes similar to the DataFlowApp object.

You can check the status of the run with:

run.status

You can get the configuration file that created this run. The run configuration and the PySpark script used in this run are also saved in the corresponding run directory in your notebook environment.

run.config

You can get the run directory where the artifacts are stored in your notebook environment with:

run.local_dir
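For example, you could list the artifacts saved for this run (a minimal sketch):

import os
os.listdir(run.local_dir)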

Similarly, you can get a clickable link to the OCI Console Run Details page with:

run.oci_link

Fetching Logs

After a run has completed, you can examine the logs using ADS. There are two types of logs, stdout and stderr.

run.log_stdout.head()   # show first rows of stdout
run.log_stdout.tail()   # show last lines of stdout

# where the logs are stored on OCI Storage
run.log_stdout.oci_path

# the path to the saved logs in the notebook environment if ``save_log_to_local`` was ``True`` when you created this run
run.log_stdout.local_path

If save_log_to_local is set to False during app.run(...), you can fetch logs by calling the fetch_log(...).save() method on the DataFlowRun object with the correct log type.

run.fetch_log("stdout").save()
run.fetch_log("stderr").save()

Note

Due to a limitation of PySpark (specifically Python applications in Spark), both stdout and stderr are merged into the stdout stream.

Edit and Synchronize PySpark Script

The integration with ADS supports the edit-run-edit cycle, so the local PySpark script can be edited, and is automatically synchronized to Object Storage each time the application is run.

Data Flow obtains the PySpark script from Object Storage, so the local files in the notebook session are not visible to Data Flow. The app.run(...) method compares the content hash of the local file with the remote copy on Object Storage. If any change is detected, the new local version is copied over to the remote. For the first run, the synchronization creates the remote file and generates a fully qualified URL with the namespace that’s required by Data Flow.

Synchronizing is the default setting in app.run(...). If you don’t want the application to sync with the local modified files, you need to include sync=False as an argument parameter in app.run(...).
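For example, to run without synchronizing the local script to Object Storage:

run = app.run(run_config, sync=False)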

Arguments and Parameters

Passing arguments to PySpark scripts is done with the arguments value in prepare_app. In addition to arguments, prepare_app supports a script_parameters dictionary that you can use to interpolate arguments. To just pass arguments, the script_parameters section may be ignored. However, any key-value pair defined in script_parameters can be referenced in arguments using the ${key} syntax, and the value of that key is passed as the argument value.

from ads.dataflow.dataflow import DataFlow

data_flow = DataFlow()
app_config = data_flow.prepare_app(
  display_name,
  script_bucket,
  pyspark_file_path,
  arguments = ['${foo}', 'bar', '-d', '--file', '${filename}'],
  script_parameters={
    'foo': 'val1 val2',
    'filename': 'file1',
  }
)
app = data_flow.create_app(app_config)

run_config = app.prepare_run(run_display_name="test-run")
run = app.run(run_config)

Note

The arguments in the format of ${arg} are replaced by the value provided in script parameters when passed in, while arguments not in this format are passed into the script verbatim.
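For example, with the script_parameters defined above, the resolved argument list passed to the PySpark script would be:

['val1 val2', 'bar', '-d', '--file', 'file1']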

You can override the values of some or all script parameters in each run by passing different values to prepare_run().

run_config = app.prepare_run(run_display_name="test-run", foo='val3')
run = app.run(run_config)

Add Third-Party Libraries

Your PySpark applications might have custom dependencies in the form of Python wheels or virtual environments. See Adding Third-Party Libraries to Applications.

Pass the archive file to your applications with archive_path and archive_bucket values in prepare_app.

  • archive_path: The local path to the archive file.

  • archive_bucket: The bucket used to read and write the archive file in Object Storage. If it’s not provided, the bucket used for the PySpark script is used by default.

Use prepare_app() to create the configuration object necessary to create the application.

from ads.dataflow.dataflow import DataFlow

data_flow = DataFlow()
app_config = data_flow.prepare_app(
  display_name="<app-display-name>",
  script_bucket="<your-script-bucket>",
  pyspark_file_path="<your-scirpt-path>",
  archive_path="<your-archive-path>",
  archive_bucket="<your-archive-bucket>"
)

The behavior of the archive file is very similar to that of the PySpark script:

  • When creating an application, the local archive file is uploaded to the specified bucket in Object Storage.

  • When creating a run, the latest local archive file is synchronized to the remote file in Object Storage. The sync parameter controls this behavior.

  • When loading an existing application created with archive_uri, the archive file is obtained from Object Storage and saved in the local directory.

Fetching PySpark Output

After the application has run, any stdout is captured in the log file. The PySpark script likely produces some form of output; usually a PySpark script batch processes something, for example, sampling, aggregating, or preprocessing data. You can load the resulting output with ADSDataset.open() using the ocis:// protocol handler.

The only way to get output from PySpark back into the notebook session is to create files in Object Storage that are read into the notebook, or to use the stdout stream.

Following is a simple example of a PySpark script producing output printed in a portable JSON-L format, though CSV works too. This method, while convenient as an example, is not recommended for large data.

from pyspark.sql import SparkSession

def main():

    # create a spark session
    spark = SparkSession \
        .builder \
        .appName("Python Spark SQL basic example") \
        .getOrCreate()

    # load an example csv file from dataflow public storage into DataFrame
    original_df = spark\
          .read\
          .format("csv")\
          .option("header", "true")\
          .option("multiLine", "true")\
          .load("oci://oow_2019_dataflow_lab@bigdatadatasciencelarge/usercontent/kaggle_berlin_airbnb_listings_summary.csv")

    # register the dataframe as a SQL view so we can perform SQL on it
    original_df.createOrReplaceTempView("berlin")

    query_result_df = spark.sql("""
                      SELECT
                        city,
                        zipcode,
                        number_of_reviews,
                        CONCAT(latitude, ',', longitude) AS lat_long
                      FROM
                        berlin"""
                    )

    # Convert the filtered Spark DataFrame into JSON format
    # Note: we are writing to the spark stdout log so that we can retrieve the log later at the end of the notebook.

    print('\n'\
            .join(query_result_df\
            .toJSON()\
            .collect()))

if __name__ == '__main__':
    main()

After the run completes, the stdout stream (which contains the JSON-L formatted data) can be read back and interpreted using Pandas.

import io
import pandas as pd

# the PySpark script wrote to the log as jsonL, and we read the log back as a pandas dataframe
df = pd.read_json((str(run.log_stdout)), lines=True)

df.head()

Example Notebook: Develop Pyspark jobs locally - from local to remote workflows

This notebook demonstrates Spark operations that bridge existing local Spark workflows with cloud-based capabilities. Data scientists can use their familiar local environments with JupyterLab, and work with remote data and remote clusters simply by selecting a kernel. The operations demonstrated are how to:

  • Use the interactive spark environment and produce a spark script,

  • Prepare and create an application,

  • Prepare and create a run,

  • List existing dataflow applications,

  • Retrieve and display the logs.

The purpose of the dataflow module is to provide an efficient and convenient way for you to launch a Spark application, and run Spark jobs. The interactive Spark kernel provides a simple and efficient way to edit and build your Spark script, and easy access to read from an OCI filesystem.

import io
import matplotlib.pyplot as plt
import os
from os import path
import pandas as pd
import tempfile
import uuid

from ads.dataflow.dataflow import DataFlow

from pyspark.sql import SparkSession

Build your PySpark Script Using an Interactive Spark Kernel

Set up a Spark session in your PySpark conda environment:

# create a spark session
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.driver.cores", "4") \
    .config("spark.executor.cores", "4") \
    .getOrCreate()

Load the Employee Attrition data file from OCI Object Storage into a Spark DataFrame:

emp_attrition = spark\
      .read\
      .format("csv")\
      .option("header", "true")\
      .option("inferSchema", "true")\
      .option("multiLine", "true")\
      .load("oci://hosted-ds-datasets@bigdatadatasciencelarge/synthetic/orcl_attrition.csv") \
      .cache() # cache the dataset to increase computing speed
emp_attrition.createOrReplaceTempView("emp_attrition")

Next, explore the dataframe:

spark.sql('select * from emp_attrition limit 5').toPandas()
Age Attrition TravelForWork ... name
0 42 Yes infrequent ... Tracy Moore
1 50 No often ... Andrew Hoover
2 38 Yes infrequent ... Julie Bell
3 34 No often ... Thomas Adams
4 28 No infrequent ... Johnathan Burnett

5 rows × 36 columns

Visualize how monthly income and age relate to one another in the context of years in industry:

fig, ax = plt.subplots()
plot = spark.sql("""
          SELECT
              Age,
              MonthlyIncome,
              YearsInIndustry
          FROM
            emp_attrition
          """).toPandas().plot.scatter(x="Age", y="MonthlyIncome", title='Age vs Monthly Income',
                                       c="YearsInIndustry", cmap="viridis", figsize=(12,12), ax=ax)
plot.set_xlabel("Age")
plot.set_ylabel("Monthly Income")
plot
<AxesSubplot:title={'center':'Age vs Monthly Income'}, xlabel='Age', ylabel='Monthly Income'>

View all of the columns in the table:

spark.sql("show columns from emp_attrition").show()
+--------------------+
|            col_name|
+--------------------+
|                 Age|
|           Attrition|
|       TravelForWork|
|         SalaryLevel|
|         JobFunction|
|       CommuteLength|
|    EducationalLevel|
|      EducationField|
|             Directs|
|      EmployeeNumber|
| EnvironmentSatisf..|
|              Gender|
|          HourlyRate|
|      JobInvolvement|
|            JobLevel|
|             JobRole|
|     JobSatisfaction|
|       MaritalStatus|
|       MonthlyIncome|
|         MonthlyRate|
+--------------------+
only showing top 20 rows

Select a few columns using Spark, and convert the result into a Pandas dataframe:

df = spark.sql("""
         SELECT
            Age,
            MonthlyIncome,
            YearsInIndustry
          FROM
            emp_attrition """).limit(10).toPandas()
df
Age MonthlyIncome YearsInIndustry
0 42 5993 8
1 50 5130 10
2 38 2090 7
3 34 2909 8
4 28 3468 6
5 33 3068 8
6 60 2670 12
7 31 2693 1
8 39 9526 10
9 37 5237 17

You can work with different compression formats within Data Flow. For example, snappy Parquet:

# Writing to a snappy parquet file
df.to_parquet('emp_attrition.parquet.snappy', compression='snappy')
pd.read_parquet('emp_attrition.parquet.snappy')
Age MonthlyIncome YearsInIndustry
0 42 5993 8
1 50 5130 10
2 38 2090 7
3 34 2909 8
4 28 3468 6
5 33 3068 8
6 60 2670 12
7 31 2693 1
8 39 9526 10
9 37 5237 17
# We are able to read in this snappy parquet file to a spark dataframe
read_snappy_df = SparkSession \
    .builder \
    .appName("Snappy Compression Loading Example") \
    .config("spark.io.compression.codec", "org.apache.spark.io.SnappyCompressionCodec") \
    .getOrCreate() \
    .read \
    .format("parquet") \
    .load(f"{os.getcwd()}/emp_attrition.parquet.snappy")

read_snappy_df.first()
Row(Age=42, MonthlyIncome=5993, YearsInIndustry=8)

Other compression formats that Data Flow supports include snappy Parquet, and Gzip on both CSV and Parquet.
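For example, you could also write the same dataframe as a Gzip-compressed CSV file and read it back with Pandas (a sketch; Pandas infers the compression from the file extension):

# Writing to a gzip-compressed csv file
df.to_csv('emp_attrition.csv.gz', compression='gzip', index=False)
pd.read_csv('emp_attrition.csv.gz')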

You might have a query from previous explorations that you want to run in Data Flow. Review the dataflow.ipynb notebook example that shows you how to submit a job to Data Flow.

dataflow_base_folder = tempfile.mkdtemp()
data_flow = DataFlow(dataflow_base_folder=dataflow_base_folder)
print("Data flow directory: {}".format(dataflow_base_folder))
Data flow directory: /tmp/tmpe18x_qbr
pyspark_file_path = path.join(dataflow_base_folder, "example-{}.py".format(str(uuid.uuid4())[-6:]))
script = '''
from pyspark.sql import SparkSession

def main():

    # Create a Spark session
    spark = SparkSession \\
        .builder \\
        .appName("Python Spark SQL basic example") \\
        .getOrCreate()

    # Load a csv file from dataflow public storage
    df = spark \\
        .read \\
        .format("csv") \\
        .option("header", "true") \\
        .option("multiLine", "true") \\
        .load("oci://hosted-ds-datasets@bigdatadatasciencelarge/synthetic/orcl_attrition.csv")

    # Create a temp view and do some SQL operations
    df.createOrReplaceTempView("emp_attrition")
    query_result_df = spark.sql("""
        SELECT
            Age,
            MonthlyIncome,
            YearsInIndustry
        FROM emp_attrition
    """)

    # Convert the filtered Spark DataFrame into JSON format
    # Note: we are writing to the spark stdout log so that we can retrieve the log later at the end of the notebook.
    print('\\n'.join(query_result_df.toJSON().collect()))

if __name__ == '__main__':
    main()
'''

with open(pyspark_file_path, 'w') as f:
    print(script.strip(), file=f)

print("Script path: {}".format(pyspark_file_path))
Script path: /tmp/example.py
script_bucket = "test"                     # Update the value
logs_bucket = "dataflow-log"               # Update the value
display_name = "sample_Data_Flow_app"

app_config = data_flow.prepare_app(display_name=display_name,
                                   script_bucket=script_bucket,
                                   pyspark_file_path=pyspark_file_path,
                                   logs_bucket=logs_bucket)

app = data_flow.create_app(app_config)

run_display_name = "sample_Data_Flow_run"
run_config = app.prepare_run(run_display_name=run_display_name)

run = app.run(run_config, save_log_to_local=True)
run.status
'SUCCEEDED'
run.config
{'compartment_id': 'ocid1.compartment..<unique_ID>',
 'script_bucket': 'test',
 'pyspark_file_path': '/tmp/tmpe18x_qbr/example-0054ed.py',
 'archive_path': None,
 'archive_bucket': None,
 'run_display_name': 'sample_Data_Flow_run',
 'logs_bucket': 'dataflow-log',
 'logs_bucket_uri': 'oci://dataflow-log@ociodscdev',
 'driver_shape': 'VM.Standard2.4',
 'executor_shape': 'VM.Standard2.4',
 'num_executors': 1}
run.oci_link
Saving processed data to jdbc:oracle:thin:@database_high?TNS_ADMIN=/tmp/

Read from the Database Using PySpark

PySpark can be used to load data from an Oracle Autonomous Database (ADB) into a Spark application. The next cell makes a JDBC connection to the database defined using the adb_url variable, and accesses the table defined with table_name. The credentials stored in the vault and previously read into memory are used. After this command runs, you can perform Spark operations on the loaded dataframe.

The table is relatively small so the notebook uses PySpark in the notebook session. However, for larger jobs, we recommend that you use the Oracle Data Flow service.

if "adb_url" in globals():
    output_dataframe = sc.read \
        .format("jdbc") \
        .option("url", adb_url) \
        .option("dbtable", table_name) \
        .option("user", user) \
        .option("password", password) \
        .load()
else:
    print("Skipping as it appears that you do not have adb_url configured.")

The database table is loaded into Spark so that you can perform operations to transform, model, and more. In the next cell, the notebook prints the table demonstrating that it was successfully loaded into Spark from the ADB.

if "adb_url" in globals():
    output_dataframe.show()
else:
    print("Skipping as it appears that you do not have output_dataframe configured.")
+----+----------+--------------+------------+-------------------+-------------+-----------------+---------------+--------+---------------+------------------------+-------+-----------+---------------+---------+---------------------+---------------+--------------+--------------+------------+-------------------+-------+---------+------------------+------------------+-------------------------+------------------+-----------------+----------------+----------------------+----------------+-----------+--------------------+------------------------+---------------------+------------------+
| Age| Attrition| TravelForWork| SalaryLevel|       JobFunction| CommuteLength| EducationalLevel| EducationField| Directs| EmployeeNumber| EnvironmentSatisfaction| Gender| HourlyRate| JobInvolvement| JobLevel|             JobRole| JobSatisfaction| MaritalStatus| MonthlyIncome| MonthlyRate| NumCompaniesWorked| Over18| OverTime| PercentSalaryHike| PerformanceRating| RelationshipSatisfaction| WeeklyWorkedHours| StockOptionLevel| YearsinIndustry| TrainingTimesLastYear| WorkLifeBalance| YearsOnJob| YearsAtCurrentLevel| YearsSinceLastPromotion| YearsWithCurrManager|              name|
+----+----------+--------------+------------+-------------------+-------------+-----------------+---------------+--------+---------------+------------------------+-------+-----------+---------------+---------+---------------------+---------------+--------------+--------------+------------+-------------------+-------+---------+------------------+------------------+-------------------------+------------------+-----------------+----------------+----------------------+----------------+-----------+--------------------+------------------------+---------------------+------------------+
|  42|       Yes|    infrequent|        5054| Product Management|            2|               L2|  Life Sciences|       1|              1|                       2| Female|         94|              3|        2|      Sales Executive|              4|        Single|          5993|       19479|                  8|      Y|      Yes|                11|                 3|                        1|                80|                0|               8|                     0|               1|          6|                   4|                       0|                    5|       Tracy Moore|
|  50|        No|         often|        1278| Software Developer|            9|               L1|  Life Sciences|       1|              2|                       3|   Male|         61|              2|        2|   Research Scientist|              2|       Married|          5130|       24907|                  1|      Y|       No|                23|                 4|                        4|                80|                1|              10|                     3|               3|         10|                   7|                       1|                    7|     Andrew Hoover|
|  38|       Yes|    infrequent|        6296| Software Developer|            3|               L2|          Other|       1|              4|                       4|   Male|         92|              2|        1| Laboratory Techni...|              3|        Single|          2090|        2396|                  6|      Y|      Yes|                15|                 3|                        2|                80|                0|               7|                     3|               3|          0|                   0|                       0|                    0|        Julie Bell|
|  34|        No|         often|        6384| Software Developer|            4|               L4|  Life Sciences|       1|              5|                       4| Female|         56|              3|        1|   Research Scientist|              3|       Married|          2909|       23159|                  1|      Y|      Yes|                11|                 3|                        3|                80|                0|               8|                     3|               3|          8|                   7|                       3|                    0|      Thomas Adams|
|  28|        No|    infrequent|        2710| Software Developer|            3|               L1|        Medical|       1|              7|                       1|   Male|         40|              3|        1| Laboratory Techni...|              2|       Married|          3468|       16632|                  9|      Y|       No|                12|                 3|                        4|                80|                1|               6|                     3|               3|          2|                   2|                       2|                    2| Johnathan Burnett|
|  33|        No|         often|        4608| Software Developer|            3|               L2|  Life Sciences|       1|              8|                       4|   Male|         79|              3|        1| Laboratory Techni...|              4|        Single|          3068|       11864|                  0|      Y|       No|                13|                 3|                        3|                80|                0|               8|                     2|               2|          7|                   7|                       3|                    6|      Rhonda Grant|
|  60|        No|    infrequent|        6072| Software Developer|            4|               L3|        Medical|       1|             10|                       3| Female|         81|              4|        1| Laboratory Techni...|              1|       Married|          2670|        9964|                  4|      Y|      Yes|                20|                 4|                        1|                80|                3|              12|                     3|               2|          1|                   0|                       0|                    0|      Brandon Gill|
|  31|        No|    infrequent|        6228| Software Developer|           25|               L1|  Life Sciences|       1|             11|                       4|   Male|         67|              3|        1| Laboratory Techni...|              3|      Divorced|          2693|       13335|                  1|      Y|       No|                22|                 4|                        2|                80|                1|               1|                     2|               3|          1|                   0|                       0|                    0|       Debbie Chan|
|  39|        No|         often|         990| Software Developer|           24|               L3|  Life Sciences|       1|             12|                       4|   Male|         44|              2|        3| Manufacturing Dir...|              3|        Single|          9526|        8787|                  0|      Y|       No|                21|                 4|                        2|                80|                0|              10|                     2|               3|          9|                   7|                       1|                    8|        Kayla Ward|
|  37|        No|    infrequent|        5958| Software Developer|           28|               L3|        Medical|       1|             13|                       3|   Male|         94|              3|        2| Healthcare Repres...|              3|       Married|          5237|       16577|                  6|      Y|       No|                13|                 3|                        2|                80|                2|              17|                     3|               2|          7|                   7|                       7|                    7|      Angel Vaughn|
|  36|        No|    infrequent|        3710| Software Developer|           17|               L3|        Medical|       1|             14|                       1|   Male|         84|              4|        1| Laboratory Techni...|              2|       Married|          2426|       16479|                  0|      Y|       No|                13|                 3|                        3|                80|                1|               6|                     5|               3|          5|                   4|                       0|                    3|   Samantha Parker|
|  30|        No|    infrequent|         700| Software Developer|           16|               L2|  Life Sciences|       1|             15|                       4| Female|         49|              2|        2| Laboratory Techni...|              3|        Single|          4193|       12682|                  0|      Y|      Yes|                12|                 3|                        4|                80|                0|              10|                     3|               3|          9|                   5|                       0|                    8|   Melanie Mcbride|
|  32|        No|    infrequent|        3072| Software Developer|           27|               L1|  Life Sciences|       1|             16|                       1|   Male|         31|              3|        1|   Research Scientist|              3|      Divorced|          2911|       15170|                  1|      Y|       No|                17|                 3|                        4|                80|                1|               5|                     1|               2|          5|                   2|                       4|                    3|      Bradley Hall|
|  35|        No|    infrequent|        6172| Software Developer|           20|               L2|        Medical|       1|             18|                       2|   Male|         93|              3|        1| Laboratory Techni...|              4|      Divorced|          2661|        8758|                  0|      Y|       No|                11|                 3|                        3|                80|                1|               3|                     2|               3|          2|                   2|                       1|                    2|       Patrick Lee|
|  29|       Yes|    infrequent|         472| Software Developer|           25|               L3|  Life Sciences|       1|             19|                       3|   Male|         50|              2|        1| Laboratory Techni...|              3|        Single|          2028|       12947|                  5|      Y|      Yes|                14|                 3|                        2|                80|                0|               6|                     4|               3|          4|                   2|                       0|                    3|    Jessica Willis|
|  30|        No|    infrequent|        6370| Software Developer|           22|               L4|  Life Sciences|       1|             20|                       2| Female|         51|              4|        3| Manufacturing Dir...|              1|      Divorced|          9980|       10195|                  1|      Y|       No|                11|                 3|                        3|                80|                1|              10|                     1|               3|         10|                   9|                       8|                    8|        Chad Scott|
|  33|        No|    infrequent|        1530| Software Developer|            6|               L2|  Life Sciences|       1|             21|                       1|   Male|         80|              4|        1|   Research Scientist|              2|      Divorced|          3298|       15053|                  0|      Y|      Yes|                12|                 3|                        4|                80|                2|               7|                     5|               2|          6|                   2|                       0|                    5|   Gregory Bennett|
|  23|        No|          none|        5150| Software Developer|           17|               L2|        Medical|       1|             22|                       4|   Male|         96|              4|        1| Laboratory Techni...|              4|      Divorced|          2935|        7324|                  1|      Y|      Yes|                13|                 3|                        2|                80|                2|               1|                     2|               2|          1|                   0|                       0|                    0|      Jesse Palmer|
|  54|        No|    infrequent|        5590| Product Management|            3|               L4|  Life Sciences|       1|             23|                       1| Female|         78|              2|        4|              Manager|              4|       Married|         15427|       22021|                  2|      Y|       No|                16|                 3|                        3|                80|                0|              31|                     3|               3|         25|                   8|                       3|                    7| Dr. Erin Good DDS|
|  39|        No|    infrequent|        1700| Software Developer|            3|               L3|  Life Sciences|       1|             24|                       4|   Male|         45|              3|        1|   Research Scientist|              4|        Single|          3944|        4306|                  5|      Y|      Yes|                11|                 3|                        3|                80|                0|               6|                     3|               3|          3|                   2|                       1|                    2|     Kathy Patrick|
+----+----------+--------------+------------+-------------------+-------------+-----------------+---------------+--------+---------------+------------------------+-------+-----------+---------------+---------+---------------------+---------------+--------------+--------------+------------+-------------------+-------+---------+------------------+------------------+-------------------------+------------------+-----------------+----------------+----------------------+----------------+-----------+--------------------+------------------------+---------------------+------------------+
only showing top 20 rows

Cleaning Up Artifacts

This example created a number of artifacts, such as the unzipped wallet file, a database table, and a running Spark cluster. Next, you remove these resources.

if wallet_path != "<wallet_path>":
    connection.update_repository(key="pyspark_adb", value=adb_creds)
    connection.import_wallet(wallet_path=wallet_path, key="pyspark_adb")
    conn = cx_Oracle.connect(user, password, tnsname)
    cursor = conn.cursor()
    cursor.execute(f"DROP TABLE {table_name}")
    cursor.close()
    conn.close()
else:
    print("Skipping as it appears that you do not have wallet_path specified.")

if "tns_path" in globals():
    shutil.rmtree(tns_path)

sc.stop()

Example Notebook: Using the ADB with PySpark

This notebook demonstrates how to use PySpark to process data in Object Storage, and save the results to an ADB. It also demonstrates how to query data from an ADB using a local PySpark session.

import base64
import cx_Oracle
import oci
import os
import shutil
import tempfile
import zipfile

from ads.database import connection
from ads.vault.vault import Vault
from pyspark import SparkConf
from pyspark.sql import SparkSession
from urllib.parse import urlparse

Introduction

It has become a common practice to store structured and semi-structured data using services such as Object Storage. This provides a scalable solution to store vast quantities of data that can be post-processed. However, using a relational database management system (RDBMS) such as the Oracle ADB provides advantages like ACID compliance, rapid relational joins, support for complex business logic, and more. It is important to be able to access information stored in Object Storage, process that information, and load it into an RDBMS. This notebook demonstrates how to use PySpark, a Python interface to Apache Spark, to perform these operations.

This notebook uses a publicly accessible Object Storage location to read from. However, an ADB needs to be configured with permissions to create a table, write to that table, and read from it. It also assumes that the credentials to access the database are stored in the Vault. This is the best practice because it prevents the credentials from being stored locally or in the notebook where they may be accessible to others. If you do not have credentials stored in the Vault, see the vault.ipynb example notebook for the steps to store them. Once the database credentials are stored in the Vault, you need the OCIDs for the Vault, encryption key, and the secret.
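The secret read back from the Vault later in this notebook is expected to be a dictionary of this shape (an illustration; the keys match how the credentials are consumed below, and the values are placeholders):

adb_creds = {
    "username": "<db-username>",
    "password": "<db-password>"
}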

ADBs have an additional level of security that is needed to access them: the wallet file. You can obtain the wallet file from your account administrator, or download it using the steps outlined in downloading a wallet (https://docs.oracle.com/en-us/iaas/Content/Database/Tasks/adbconnecting.htm#access). The wallet file is a ZIP file. This notebook unzips the wallet and updates the configuration settings so you don’t have to.

The database connection also needs the TNS name of the database. Your database administrator can give you the TNS name of the database that you have access to.

Setup the Required Variables

The required variables to set up are:

  1. vault_id, key_id, secret_ocid: The OCIDs of the Vault, the encryption key, and the secret that stores the username and password required to connect to your ADB in the OCI Vault service. Note that the secret is the credential needed to access the database. This notebook is designed so that any secret can be stored as long as it is in the form of a dictionary. To store your secret, just modify the dictionary; see the vault.ipynb example notebook for detailed steps to generate these OCIDs.

  2. tnsname: A TNS name valid for the database.

  3. wallet_path: The local path to your wallet ZIP file, see the autonomous_database.ipynb example notebook for instructions on accessing the wallet file.

secret_ocid = "<secret_ocid>"
tnsname = "<tnsname>"
wallet_path = "<wallet_path>"
vault_id = "<vault_id>"
key_id = "<key_id>"

Obtain Credentials from the Vault

If the vault_id, key_id, and secret_ocid have been updated, then the notebook obtains a handle to the vault with a variable called vault. It uses the get_secret() method to return a dictionary with the user credentials. The approach assumes that the Accelerated Data Science (ADS) library was used to store the secret.

if vault_id != "<vault_id>" and key_id != "<key_id>" and secret_ocid != "<secret_ocid>":
    print("Getting wallet username and password")
    vault = Vault(vault_id=vault_id, key_id=key_id)
    adb_creds = vault.get_secret(secret_ocid)
    user = adb_creds["username"]
    password = adb_creds["password"]
else:
    print("Skipping as it appears that you do not have vault, key, and secret ocid specified.")
Getting wallet username and password

Setup the Wallet

An ADB requires a wallet file to access the database. The wallet_path variable defines the location of this file. The next cell prepares the wallet file to make a connection to the database. It also creates the ADB connection string, adb_url.

def setup_wallet(wallet_path):
    """
    Prepare ADB wallet file for use in PySpark.
    """

    temporary_directory = tempfile.mkdtemp()
    zip_file_path = os.path.join(temporary_directory, "wallet.zip")

    # Extract everything locally.
    with zipfile.ZipFile(wallet_path, "r") as zip_ref:
        zip_ref.extractall(temporary_directory)

    return temporary_directory

if wallet_path != "<wallet_path>":
    print("Setting up wallet")
    tns_path = setup_wallet(wallet_path)
else:
    print("Skipping as it appears that you do not have wallet_path specified.")
Setting up wallet
if "tns_path" in globals() and tnsname != "<tnsname>":
    adb_url = f"jdbc:oracle:thin:@{tnsname}?TNS_ADMIN={tns_path}"
else:
    print("Skipping, as the tns_path or tnsname are not defined.")

Reading Data from Object Storage

This notebook uses PySpark to access the Object Storage file. The next cell creates a Spark application called “Python Spark SQL Example” and returns a SparkSession. The session, stored here in the variable sc, is a handle to the Spark application.

The data file that is used is relatively small, so the notebook uses PySpark by running a version of Spark in local mode; that is, it runs in the notebook session. For larger jobs, we recommend that you use the Oracle Data Flow service, which is an Oracle-managed Spark service.

# create a spark session
sc = SparkSession \
    .builder \
    .appName("Python Spark SQL Example") \
    .getOrCreate()

This notebook reads in a data file that is stored in Oracle Object Storage. The location is defined with the file_path variable. The session’s read.option().csv() methods are used to read the CSV file from Object Storage into a dataframe.

file_path = "oci://hosted-ds-datasets@bigdatadatasciencelarge/synthetic/orcl_attrition.csv"
input_dataframe = sc.read.option("header", "true").csv(file_path)

Save the Data to the Database

This notebook creates a table in your database with the name specified with table_name. The name that is defined should be unique so that it does not interfere with any existing table in your database. If it does, change the value to something that is unique.

table_name = "ODSC_PYSPARK_ADB_DEMO"

if tnsname != "<tnsname>" and "adb_url" in globals():
    print("Saving processed data to " + adb_url)
    properties = {
        "oracle.net.tns_admin": tnsname,
        "password": password,
        "user": user,
    }
    input_dataframe.write.jdbc(
        url=adb_url, table=table_name, properties=properties
    )
else:
    print("Skipping as it appears that you do not have tnsname specified.")