#!/usr/bin/env python# -*- coding: utf-8; -*-# Copyright (c) 2022, 2023 Oracle and/or its affiliates.# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/importosimportjsonfromoci.data_flow.modelsimportApplicationfromads.opctl.config.utilsimportread_from_inifromads.opctl.constantsimport(ADS_DATAFLOW_CONFIG_FILE_NAME,DEFAULT_ADS_CONFIG_FOLDER,)fromads.jobsimportloggerfromads.common.utilsimportoci_key_profile
def get_dataflow_config(path=None, oci_profile=None):
    """Read the Data Flow settings for one OCI profile from a config file.

    Parameters
    ----------
    path : str, optional
        Location of the config file. ``~`` is expanded and the result is made
        absolute. When omitted (or empty), the file named
        ``ADS_DATAFLOW_CONFIG_FILE_NAME`` inside ``DEFAULT_ADS_CONFIG_FOLDER``
        is used instead.
    oci_profile : str, optional
        Name of the profile (section) to load. When omitted (or empty), the
        value returned by ``oci_key_profile()`` is used.

    Returns
    -------
    dict
        The key/value pairs stored under the selected profile, or an empty
        dict when the config file does not exist (a warning is logged in
        that case).

    Raises
    ------
    ValueError
        If the file exists but the selected profile is missing from it or
        contains no entries.
    """
    if path:
        config_path = os.path.abspath(os.path.expanduser(path))
    else:
        default_location = os.path.join(
            DEFAULT_ADS_CONFIG_FOLDER, ADS_DATAFLOW_CONFIG_FILE_NAME
        )
        config_path = os.path.expanduser(default_location)

    # A missing file is not an error: warn, and let the caller run unconfigured.
    if not os.path.exists(config_path):
        logger.warning(
            f"{config_path} not found. Follow this link "
            "https://accelerated-data-science.readthedocs.io/en/latest/"
            "user_guide/apachespark/dataflow.html to set up the config."
        )
        return {}

    parser = read_from_ini(config_path)
    profile = oci_profile or oci_key_profile()
    section = dict(parser[profile]) if profile in parser else {}

    # An absent profile and an empty profile are reported the same way.
    if not section:
        message = f"Dataflow configuration with profile {profile} not found."
        logger.error(message)
        raise ValueError(message)
    return section
class DataFlowConfig(Application):
    """Data Flow ``Application`` model populated from a config file.

    The ``script_bucket`` and ``archive_bucket`` entries are removed from the
    loaded settings and kept on this object; every remaining entry is passed
    to the ``Application`` constructor as a keyword argument.
    """

    def __init__(self, path: str = None, oci_profile: str = None):
        """Build the configuration, loading it from ``path`` when one is given.

        Parameters
        ----------
        path : str, optional
            Path to the configuration file, by default None.
        oci_profile : str, optional
            OCI profile to use, by default None.

        Raises
        ------
        KeyError
            If settings were loaded but contain no ``script_bucket`` entry.
        """
        self.config = get_dataflow_config(path, oci_profile)
        self._script_bucket = None
        self._archive_bucket = None
        if self.config:
            # The bucket entries are popped so that they are not forwarded to
            # the parent constructor below. ``script_bucket`` has no fallback.
            self._script_bucket = self.config.pop("script_bucket")
            self._archive_bucket = self.config.pop("archive_bucket", None)
        super().__init__(**self.config)

    def __repr__(self):
        """Return the parent's representation plus the buckets, as quoted JSON.

        The parent's ``repr`` is parsed as JSON, the bucket values are added,
        and entries whose value is ``None`` are left out of the output.
        """
        fields = json.loads(super().__repr__())
        fields["script_bucket"] = self.script_bucket
        archive = self.archive_bucket
        if archive:
            fields["archive_bucket"] = archive
        populated = {key: value for key, value in fields.items() if value is not None}
        return f"'{json.dumps(populated)}'"

    @property
    def script_bucket(self):
        """Bucket where the user script is saved.

        A prefix of the form ``oci://<bucket-name>@<namespace>/<prefix>`` is
        accepted as well.

        Returns
        -------
        str
            Script bucket (path).
        """
        return self._script_bucket

    @script_bucket.setter
    def script_bucket(self, v: str):
        self._script_bucket = v

    @property
    def archive_bucket(self):
        """Bucket where the archive zip is saved.

        A prefix of the form ``oci://<bucket-name>@<namespace>/<prefix>`` is
        accepted as well.

        Returns
        -------
        str
            Archive bucket (path).
        """
        return self._archive_bucket

    @archive_bucket.setter
    def archive_bucket(self, v: str):
        self._archive_bucket = v