Source code for ads.jobs.utils

#!/usr/bin/env python
# -*- coding: utf-8; -*-

# Copyright (c) 2022, 2023 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/


import os
import json

from oci.data_flow.models import Application

from ads.opctl.config.utils import read_from_ini
from ads.opctl.constants import (
    ADS_DATAFLOW_CONFIG_FILE_NAME,
    DEFAULT_ADS_CONFIG_FOLDER,
)

from ads.jobs import logger
from ads.common.utils import oci_key_profile


def get_dataflow_config(path=None, oci_profile=None):
    """Load the Data Flow settings stored under one profile of an ini file.

    Parameters
    ----------
    path : str, optional
        Location of the Data Flow config file. ``~`` is expanded and the
        result is made absolute. When falsy, the file named
        ``ADS_DATAFLOW_CONFIG_FILE_NAME`` inside ``DEFAULT_ADS_CONFIG_FOLDER``
        is used instead.
    oci_profile : str, optional
        Section of the config file to read. When falsy, the profile name
        returned by ``oci_key_profile()`` is used.

    Returns
    -------
    dict
        The key/value pairs of the selected section. An empty dict is
        returned (after logging a warning) when the config file does not
        exist.

    Raises
    ------
    ValueError
        If the file exists but the selected section is missing or yields no
        entries.
    """
    config_path = (
        os.path.abspath(os.path.expanduser(path))
        if path
        else os.path.expanduser(
            os.path.join(DEFAULT_ADS_CONFIG_FOLDER, ADS_DATAFLOW_CONFIG_FILE_NAME)
        )
    )

    # A missing file is not an error here: callers get an empty config and a
    # pointer to the setup documentation.
    if not os.path.exists(config_path):
        logger.warning(
            f"{config_path} not found. Follow this link "
            "https://accelerated-data-science.readthedocs.io/en/latest/user_guide/apachespark/dataflow.html "
            "to set up the config."
        )
        return {}

    # NOTE(review): read_from_ini is assumed to return a configparser-like
    # mapping of section name -> section; confirm against its definition.
    sections = read_from_ini(config_path)
    profile = oci_profile or oci_key_profile()

    section = dict(sections[profile]) if profile in sections else {}
    if not section:
        message = f"Dataflow configuration with profile {profile} not found."
        logger.error(message)
        raise ValueError(message)
    return section
class DataFlowConfig(Application):
    """Data Flow ``Application`` settings loaded from the ADS Data Flow config file.

    In addition to the ``Application`` fields, this object carries two
    ADS-specific settings, ``script_bucket`` and ``archive_bucket``. They are
    read from the same config section but kept out of the keyword arguments
    forwarded to ``Application.__init__``.
    """

    def __init__(self, path: str = None, oci_profile: str = None):
        """Create a DataFlowConfig object. If a path to config file is given it is loaded from the path.

        Parameters
        ----------
        path : str, optional
            path to configuration file, by default None
        oci_profile : str, optional
            oci profile to use, by default None

        Notes
        -----
        Both ``script_bucket`` and ``archive_bucket`` are optional in the
        config. When absent they are ``None`` and can be assigned later
        through the corresponding properties.
        """
        self.config = get_dataflow_config(path, oci_profile)
        # Remove the bucket entries so only the remaining keys are forwarded to
        # Application.__init__. A default of None avoids an opaque KeyError when
        # a profile omits script_bucket, matching the empty-config behavior.
        self._script_bucket = self.config.pop("script_bucket", None)
        self._archive_bucket = self.config.pop("archive_bucket", None)
        super().__init__(**self.config)

    def __repr__(self):
        """Return the settings as a quoted JSON string, omitting ``None`` values.

        The JSON is built from the base class ``__repr__`` output, with
        ``script_bucket`` added and ``archive_bucket`` added when truthy.
        """
        config = json.loads(super().__repr__())
        config["script_bucket"] = self.script_bucket
        if self.archive_bucket:
            config["archive_bucket"] = self.archive_bucket
        return f"'{json.dumps({k: v for k, v in config.items() if v is not None})}'"

    @property
    def script_bucket(self):
        """Bucket to save user script. Also accept a prefix in the format of oci://<bucket-name>@<namespace>/<prefix>.

        Returns
        -------
        str
            script bucket (path)
        """
        return self._script_bucket

    @script_bucket.setter
    def script_bucket(self, v: str):
        self._script_bucket = v

    @property
    def archive_bucket(self):
        """Bucket to save archive zip. Also accept a prefix in the format of oci://<bucket-name>@<namespace>/<prefix>.

        Returns
        -------
        str :
            archive bucket (path)
        """
        return self._archive_bucket

    @archive_bucket.setter
    def archive_bucket(self, v: str):
        self._archive_bucket = v