import os
import tempfile

from ads.jobs import DataFlow, DataFlowRun, DataFlowRuntime, Job
# Update these values
job_name = "<job_name>"
logs_bucket = "oci://<bucket_name>@<namespace>/<prefix>"
metastore_id = "<metastore_id>"
script_bucket = "oci://<bucket_name>@<namespace>/<prefix>"
compartment_id = os.environ.get("NB_SESSION_COMPARTMENT_OCID")
driver_shape = "VM.Standard2.1"
executor_shape = "VM.Standard2.1"
spark_version = "3.0.2"
# A python script to be run in Data Flow
script = '''
from pyspark.sql import SparkSession
def main():
    database_name = "employee_attrition"
    table_name = "orcl_attrition"
    # Create a Spark session
    spark = SparkSession \\
        .builder \\
        .appName("Python Spark SQL basic example") \\
        .enableHiveSupport() \\
        .getOrCreate()
    # Load a CSV file from a public Object Storage bucket
    df = spark \\
        .read \\
        .format("csv") \\
        .option("header", "true") \\
        .option("multiLine", "true") \\
        .load("oci://hosted-ds-datasets@bigdatadatasciencelarge/synthetic/orcl_attrition.csv")
    print(f"Creating {database_name}")
    spark.sql(f"DROP DATABASE IF EXISTS {database_name} CASCADE")
    spark.sql(f"CREATE DATABASE IF NOT EXISTS {database_name}")
    # Write the data to the database
    df.write.mode("overwrite").saveAsTable(f"{database_name}.{table_name}")
    # Use Spark SQL to read from the database
    query_result_df = spark.sql(f"""
        SELECT EducationField, SalaryLevel, JobRole FROM {database_name}.{table_name} LIMIT 10
    """)
    # Convert the filtered Apache Spark DataFrame into JSON format and write it out to stdout
    # so that it can be captured in the log.
    print('\\n'.join(query_result_df.toJSON().collect()))

if __name__ == '__main__':
    main()
'''
# Save the Python script to a local path.
dataflow_base_folder = tempfile.mkdtemp()
script_uri = os.path.join(dataflow_base_folder, "example.py")
with open(script_uri, 'w') as f:
    print(script.strip(), file=f)
dataflow_configs = DataFlow(
    {
        "compartment_id": compartment_id,
        "driver_shape": driver_shape,
        "executor_shape": executor_shape,
        "logs_bucket_uri": logs_bucket,
        "metastore_id": metastore_id,
        "spark_version": spark_version
    }
)
runtime_config = DataFlowRuntime(
    {
        "script_uri": script_uri,
        "script_bucket": script_bucket
    }
)
# Create a Data Flow application with DataFlow and DataFlowRuntime.
df_job = Job(name=job_name,
             infrastructure=dataflow_configs,
             runtime=runtime_config)
df_app = df_job.create()
df_run = df_app.run()
# Check the job run logs.
df_run.watch()
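# Optional: a minimal sketch of exporting the job definition for reuse.
# This assumes the to_yaml() helper on ads.jobs.Job (with a from_yaml()
# counterpart for reloading); verify against the ADS version you have installed.
print(df_job.to_yaml())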