Source code for ads.common.config

#!/usr/bin/env python
# -*- coding: utf-8; -*-

# Copyright (c) 2022, 2023 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/

import os
from collections import defaultdict
from configparser import ConfigParser
from copy import copy
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
from urllib.parse import urlparse

import fsspec
import yaml

from ads.common import auth as authutil
from ads.common.decorator.argument_to_case import ArgumentCase, argument_to_case

try:
    # Prefer the LibYAML-backed dumper; it is only exposed by PyYAML when the
    # C bindings are available.
    from yaml import CSafeDumper as dumper
except ImportError:
    # Only a missing C binding should trigger the fallback. A bare `except:`
    # would also swallow KeyboardInterrupt/SystemExit and unrelated errors.
    from yaml import SafeDumper as dumper


# Key of the section that every `Config` instance registers on construction
# and that `Config.default()` returns.
DEFAULT_CONFIG_PROFILE = "DEFAULT"
# Default location of the config file. `Config.__init__` passes it through
# `os.path.expanduser`, so the leading `~` is resolved there.
DEFAULT_CONFIG_PATH = "~/.ads/config"


class EventType(Enum):
    """Names of the events that a config section can emit."""

    # Fired through `Eventing.trigger` whenever a section's content is modified.
    CHANGE = "change"
class Mode:
    """File open modes expressed as plain string constants."""

    READ = "r"
    WRITE = "w"
class Eventing:
    """The class helper to register event handlers.

    Callbacks are stored per event name in a set, so registering the same
    callable twice for one event has no additional effect and the call order
    is not defined.
    """

    def __init__(self):
        """Initializes an empty event registry."""
        self._events = defaultdict(set)

    def trigger(self, event: str) -> None:
        """Triggers all the registered callbacks for the particular event.

        Parameters
        ----------
        event: str
            The name of the event to fire.

        Returns
        -------
        None
            Nothing
        """
        # Iterate over a snapshot: a callback may register further callbacks,
        # which would otherwise raise "Set changed size during iteration".
        # `.get` is used so that firing an unknown event does not create an
        # empty entry in the defaultdict.
        for callback in tuple(self._events.get(event, ())):
            callback()

    def on(self, event_name: str, callback: Callable) -> None:
        """Registers a callback for the particular event.

        Parameters
        ----------
        event_name: str
            The name of the event to subscribe to.
        callback: Callable
            A callable invoked without arguments when the event is triggered.

        Returns
        -------
        None
            Nothing
        """
        self._events[event_name].add(callback)
class ConfigSection:
    """The class representing a config section.

    Field keys are stored lower-cased. Every public mutating method fires the
    `EventType.CHANGE` event on `self.events`; the underscore-prefixed
    counterparts perform the same mutation silently.
    """

    def __init__(self):
        """Initializes the config section instance."""
        self.events = Eventing()
        self._info = {}

    def clear(self) -> None:
        """Clears the config section values.

        Returns
        -------
        None
            Nothing
        """
        self._info = {}
        self.events.trigger(EventType.CHANGE.value)

    def copy(self) -> "ConfigSection":
        """Makes a copy of a config section.

        The field mapping is copied shallowly and the registered event
        handlers are not carried over to the new instance.

        Returns
        -------
        ConfigSection
            The instance of a copied ConfigSection.
        """
        return self.__class__()._with_dict(info=copy(self._info), replace=True)

    def _with_dict(
        self, info: Dict[str, Any], replace: Optional[bool] = False
    ) -> "ConfigSection":
        """Populates the config section from a dictionary without firing events.

        Parameters
        ----------
        info: Dict[str, Any]
            The config section information in a dictionary format.
        replace: (bool, optional). Defaults to False.
            If set as True, overwrites config section with the new information.

        Returns
        -------
        ConfigSection
            The instance of a ConfigSection.

        Raises
        ------
        TypeError
            If input data is not a dictionary.
        ValueError
            If the config section already contains some of the provided fields
            and the `replace` flag is set to False.
        """
        if not isinstance(info, dict):
            raise TypeError("The `info` must be a dictionary.")

        # Conflicts are detected up front so that nothing is written when the
        # update is rejected. Sorted to keep the error message deterministic.
        common_keys = sorted(set(self._info).intersection(map(str.lower, info)))
        if common_keys and not replace:
            raise ValueError(
                f"The config section is already contain fields: {common_keys}. "
                "Use `replace=True` to overwrite."
            )

        for k, v in info.items():
            self._set(key=k.lower(), value=v, replace=replace)

        return self

    def with_dict(
        self, info: Dict[str, Any], replace: Optional[bool] = False
    ) -> "ConfigSection":
        """Populates the config section from a dictionary.

        Parameters
        ----------
        info: Dict[str, Any]
            The config section information in a dictionary format.
        replace: (bool, optional). Defaults to False.
            If set as True, overwrites config section with the new information.

        Returns
        -------
        ConfigSection
            The instance of a ConfigSection.
        """
        self._with_dict(info=info, replace=replace)
        self.events.trigger(EventType.CHANGE.value)
        return self

    def keys(self) -> Tuple[str, ...]:
        """Gets the keys of a config section.

        Returns
        -------
        Tuple[str, ...]
            The config section keys.
        """
        return tuple(self._info.keys())

    def to_dict(self) -> Dict[str, Any]:
        """Converts config section to a dictionary.

        Returns
        -------
        Dict[str, Any]
            The config section in a dictionary format.
        """
        # Hand out a shallow copy: mutating the internal mapping from outside
        # would bypass key lower-casing and the CHANGE event.
        return dict(self._info)

    @argument_to_case(case=ArgumentCase.LOWER, arguments=["key"])
    def get(self, key: str) -> str:
        """Gets the config section value by key.

        Parameters
        ----------
        key: str
            The config section field key.

        Returns
        -------
        str
            A specific config section value, or None if the key is not present.
        """
        return self._info.get(key)

    @argument_to_case(case=ArgumentCase.LOWER, arguments=["key"])
    def _set(self, key: str, value: str, replace: Optional[bool] = False) -> None:
        """Sets the config section value by key without firing events.

        Parameters
        ----------
        key: str
            The config section field key.
        value: str
            The config section field value.
        replace: (bool, optional). Defaults to False.
            If set as True, an existing field is overwritten.

        Returns
        -------
        None
            Nothing

        Raises
        ------
        ValueError
            In case when field with provided key already exists and `replace`
            flag set to False.
        """
        # Nothing to do when the stored value already equals the new one.
        if self._info.get(key) == value:
            return

        if key in self._info and not replace:
            raise ValueError(
                f"The field with key `{key}` already exists. "
                "Use `replace=True` to overwrite."
            )
        self._info[key] = value

    def set(self, key: str, value: str, replace: Optional[bool] = False) -> None:
        """Sets the config section value by key.

        Parameters
        ----------
        key: str
            The config section field key.
        value: str
            The config section field value.
        replace: (bool, optional). Defaults to False.
            If set as True, an existing field is overwritten.

        Returns
        -------
        None
            Nothing

        Raises
        ------
        ValueError
            In case when field with provided key already exists and `replace`
            flag set to False.
        """
        # Keyword arguments, the same way `_with_dict` calls the decorated `_set`.
        self._set(key=key, value=value, replace=replace)
        self.events.trigger(EventType.CHANGE.value)

    @argument_to_case(case=ArgumentCase.LOWER, arguments=["key"])
    def remove(self, key: str) -> None:
        """Removes the config section field by key.

        A missing key is ignored.

        Parameters
        ----------
        key: str
            The config section field key.

        Returns
        -------
        None
            Nothing
        """
        self._info.pop(key, None)
        self.events.trigger(EventType.CHANGE.value)

    def __getitem__(self, key: str):
        return self.get(key)

    def __setitem__(self, key: str, value: str):
        self.set(key=key, value=value, replace=True)

    def __bool__(self):
        # A section is truthy only when at least one field holds a truthy value.
        return any(self._info.values())

    def __repr__(self):
        return yaml.dump(self.to_dict(), Dumper=dumper)
class Config:
    """The class representing a config.

    A config is a mapping of upper-cased section keys to `ConfigSection`
    objects that can be loaded from, and saved to, a config file.
    """

    __DEFAULT_SECTIONS = {
        DEFAULT_CONFIG_PROFILE: ConfigSection,
    }

    def __init__(
        self,
        uri: Optional[str] = DEFAULT_CONFIG_PATH,
        auth: Optional[Dict] = None,
    ):
        """Initializes a config instance.

        Parameters
        ----------
        uri: (str, optional). Defaults to `~/.ads/config`.
            The path to the config file. Can be local or Object Storage file.
        auth: (Dict, optional). Defaults to None.
            The default authentication is set using `ads.set_auth` API. If you need to override the
            default, use the `ads.common.auth.api_keys` or `ads.common.auth.resource_principal` to create appropriate
            authentication signer and kwargs required to instantiate IdentityClient object.
        """
        self._config = {}
        self.auth = auth or authutil.default_signer()

        # configure default config sections
        for key, default_section in self.__DEFAULT_SECTIONS.items():
            self._config[key] = default_section()
            self._config[key].events.on(EventType.CHANGE.value, self._on_change)

        # `uri` is declared Optional: fall back to the default path rather
        # than letting `os.path.expanduser(None)` raise a TypeError.
        self.uri = os.path.expanduser(uri or DEFAULT_CONFIG_PATH)
        self._config_parser = ExtendedConfigParser(uri=self.uri, auth=self.auth)

    def _on_change(self):
        """This method will be called when config modified."""
        pass

    def default(self) -> ConfigSection:
        """Gets default config section.

        Returns
        -------
        ConfigSection
            A default config section.
        """
        return self.section_get(DEFAULT_CONFIG_PROFILE)

    @argument_to_case(case=ArgumentCase.UPPER, arguments=["key"])
    def section_exists(self, key: str) -> bool:
        """Checks if a config section exists.

        Parameters
        ----------
        key: str
            A key of a config section.

        Returns
        -------
        bool
            True if a config section exists, False otherwise.
        """
        return key in self._config

    @argument_to_case(case=ArgumentCase.UPPER, arguments=["key"])
    def section_get(self, key: str) -> ConfigSection:
        """Gets the config section by key.

        Parameters
        ----------
        key: str
            A key of a config section.

        Returns
        -------
        ConfigSection
            A config section object.

        Raises
        ------
        KeyError
            If a config section not exists.
        """
        if key not in self._config:
            raise KeyError(f"The config section `{key}` not found.")
        return self._config[key]

    @argument_to_case(case=ArgumentCase.UPPER, arguments=["key"])
    def section_set(
        self,
        key: str,
        info: Union[dict, ConfigSection],
        replace: Optional[bool] = False,
    ) -> ConfigSection:
        """
        Sets a config section to config.
        The new config section will be added in case if it doesn't exist.
        Otherwise the existing config section will be merged with the new fields.

        Parameters
        ----------
        key: str
            A key of a config section.
        info: Union[dict, ConfigSection]
            The config section information in a dictionary or ConfigSection format.
        replace: (bool, optional). Defaults to False.
            If set as True, overwrites config section with the new information.

        Returns
        -------
        ConfigSection
            A config section object.

        Raises
        ------
        ValueError
            If section with given key already exists and `replace` flag set to False.
        TypeError
            If input `info` has a wrong format.
        """
        if key in self._config and not replace:
            raise ValueError(
                f"A config section `{key}` is already exist. "
                "Use `replace=True` if you want to overwrite."
            )

        if not isinstance(info, (dict, ConfigSection)):
            raise TypeError(
                "Parameter `info` must be either a `dictionary` or `ConfigSection` object."
            )

        if key not in self._config:
            self._config[key] = ConfigSection()
            self._config[key].events.on(EventType.CHANGE.value, self._on_change)

        # Work on a copy so the caller's object is never shared with the config.
        if isinstance(info, ConfigSection):
            self._config[key].with_dict(info.copy().to_dict(), replace=replace)
        else:
            self._config[key].with_dict(copy(info), replace=replace)

        return self._config[key]

    @argument_to_case(case=ArgumentCase.UPPER, arguments=["key"])
    def section_remove(self, key: str) -> "Config":
        """Removes config section from config.

        A missing section is ignored.

        Parameters
        ----------
        key: str
            A key of a config section that needs to be removed.

        Returns
        -------
        Config
            A config object.
        """
        self._config.pop(key, None)
        self._on_change()
        return self

    def save(
        self,
        uri: Optional[str] = None,
        auth: Optional[Dict] = None,
        force_overwrite: Optional[bool] = False,
    ) -> "Config":
        """Saves config to a config file.

        Parameters
        ----------
        uri: (str, optional). Defaults to `~/.ads/config`.
            The path to the config file. Can be local or Object Storage file.
        auth: (Dict, optional). Defaults to None.
            The default authentication is set using `ads.set_auth` API. If you need to override the
            default, use the `ads.common.auth.api_keys` or `ads.common.auth.resource_principal` to create appropriate
            authentication signer and kwargs required to instantiate IdentityClient object.
        force_overwrite: (bool, optional). Defaults to `False`.
            Overwrites the config if exists.

        Returns
        -------
        Config
            A config object.
        """
        uri = uri or self.uri
        auth = auth or self.auth or authutil.default_signer()

        # The parser instance outlives individual load()/save() calls and
        # `read_dict` only adds to it. Without a reset, sections or fields
        # removed from this config since the last load/save would still be
        # written to the file.
        self._config_parser.clear()
        self._config_parser.with_dict(self.to_dict()).save(
            uri=uri, auth=auth, force_overwrite=force_overwrite
        )
        return self

    def load(self, uri: Optional[str] = None, auth: Optional[Dict] = None) -> "Config":
        """Loads config from a config file.

        The loaded sections are merged into the current config, overwriting
        fields that already exist.

        Parameters
        ----------
        uri: (str, optional). Defaults to `~/.ads/config`.
            The path to the config file that needs to be loaded.
            Can be local or Object Storage file.
        auth: (Dict, optional). Defaults to None.
            The default authentication is set using `ads.set_auth` API. If you need to override the
            default, use the `ads.common.auth.api_keys` or `ads.common.auth.resource_principal` to create appropriate
            authentication signer and kwargs required to instantiate IdentityClient object.

        Returns
        -------
        Config
            A config object.
        """
        uri = uri or self.uri
        auth = auth or self.auth or authutil.default_signer()

        # Reset the shared parser so only the content of `uri` is merged in,
        # not values left over from a previously read or saved file.
        self._config_parser.clear()
        return self.with_dict(
            self._config_parser.read(uri=uri, auth=auth).to_dict(), replace=True
        )

    def with_dict(
        self,
        info: Dict[str, Union[Dict[str, Any], ConfigSection]],
        replace: Optional[bool] = False,
    ) -> "Config":
        """Merging dictionary to config.

        Parameters
        ----------
        info: Dict[str, Union[Dict[str, Any], ConfigSection]]
            A dictionary that needs to be merged to config.
        replace: (bool, optional). Defaults to False.
            If set as True, overwrites config section with the new information.

        Returns
        -------
        Config
            A config object.

        Raises
        ------
        TypeError
            If `info` is empty or not a dictionary.
        ValueError
            If a section value has a wrong format, or a section already
            exists and `replace` flag set to False.
        """
        self._validate(info)
        try:
            for key, value in info.items():
                self.section_set(key, value, replace=replace)
        finally:
            # Sections set before a failure stay in place, so the change hook
            # runs whether or not the loop completed.
            self._on_change()
        return self

    def to_dict(self) -> Dict[str, Dict[str, Any]]:
        """Converts config to a dictionary format.

        Returns
        -------
        Dict[str, Dict[str, Any]]
            A config in a dictionary format.
        """
        return {key: value.to_dict() for key, value in self._config.items()}

    def keys(self) -> List[str]:
        """Gets the all registered config section keys.

        Returns
        -------
        List[str]
            The list of the all registered config section keys.
        """
        # Materialize the keys: the documented contract is a list, not a live
        # `dict_keys` view of the internal mapping.
        return list(self._config)

    def _validate(self, info: Dict[str, Union[Dict[str, Any], ConfigSection]]) -> None:
        """Validates input dictionary.

        Raises
        ------
        TypeError
            If `info` is empty or not a dictionary.
        ValueError
            If a non-empty section value is neither a dictionary nor a
            `ConfigSection` instance.
        """
        if not info or not isinstance(info, dict):
            raise TypeError("The input data should be a dictionary.")
        for key, value in info.items():
            if value and not isinstance(value, (dict, ConfigSection)):
                raise ValueError(
                    f"The `{key}` must be a dictionary or a `ConfigSection` instance."
                )

    def __getitem__(self, key: str):
        return self.section_get(key)

    def __setitem__(self, key, value: Union[Dict, ConfigSection]):
        self.section_set(key, value, replace=True)

    def __repr__(self):
        return yaml.dump(self.to_dict(), Dumper=dumper)
class ExtendedConfigParser(ConfigParser):
    """Class helper to read/write information to the config file."""

    def __init__(
        self, uri: Optional[str] = DEFAULT_CONFIG_PATH, auth: Optional[Dict] = None
    ):
        """Initializes a config parser instance.

        Parameters
        ----------
        uri: (str, optional). Defaults to `~/.ads/config`.
            The path to the config file. Can be local or Object Storage file.
        auth: (Dict, optional). Defaults to None.
            The default authentication is set using `ads.set_auth` API. If you need to override the
            default, use the `ads.common.auth.api_keys` or `ads.common.auth.resource_principal` to create appropriate
            authentication signer and kwargs required to instantiate IdentityClient object.
        """
        # The parser's default section is given a placeholder name, so a
        # section called "DEFAULT" in a file is handled like any other section.
        super().__init__(default_section="EXCLUDE_DEFAULT_SECTION")
        self.auth = auth or authutil.default_signer()
        self.uri = uri

    def save(
        self,
        uri: Optional[str] = None,
        auth: Optional[Dict] = None,
        force_overwrite: Optional[bool] = False,
    ) -> None:
        """Saves the config to the file.

        Parameters
        ----------
        uri: (str, optional). Defaults to `~/.ads/config`.
            The path to the config file. Can be local or Object Storage file.
        auth: (Dict, optional). Defaults to None.
            The default authentication is set using `ads.set_auth` API. If you need to override the
            default, use the `ads.common.auth.api_keys` or `ads.common.auth.resource_principal` to create appropriate
            authentication signer and kwargs required to instantiate IdentityClient object.
        force_overwrite: (bool, optional). Defaults to `False`.
            Overwrites the config if exists.

        Returns
        -------
        None

        Raises
        ------
        FileExistsError
            In case if file exists and force_overwrite is false.
        """
        target_uri = uri or self.uri
        storage_options = auth or self.auth or authutil.default_signer()

        if not force_overwrite:
            # A URI without a scheme is treated as a local file.
            scheme = urlparse(target_uri).scheme or "file"
            filesystem = fsspec.filesystem(scheme, **storage_options)
            if filesystem.exists(target_uri):
                raise FileExistsError(
                    f"The `{target_uri}` exists. Set `force_overwrite` to True "
                    "if you wish to overwrite."
                )

        with fsspec.open(target_uri, mode="w", **storage_options) as f:
            self.write(f)

    def to_dict(self) -> Dict[str, Any]:
        """Converts config to a dictionary.

        Sections without any fields are left out.

        Returns
        -------
        Dict[str, Any]
            Config in a dictionary format.
        """
        result = {}
        for name in self.keys():
            section = self[name]
            if section:
                result[name] = dict(section)
        return result

    def read(
        self, uri: Optional[str] = None, auth: Optional[Dict] = None
    ) -> "ExtendedConfigParser":
        """Reads config file.

        Parameters
        ----------
        uri: (str, optional). Defaults to `~/.ads/config`.
            The path to the config file. Can be local or Object Storage file.
        auth: (Dict, optional). Defaults to None.
            The default authentication is set using `ads.set_auth` API. If you need to override the
            default, use the `ads.common.auth.api_keys` or `ads.common.auth.resource_principal` to create appropriate
            authentication signer and kwargs required to instantiate IdentityClient object.

        Returns
        -------
        ExtendedConfigParser
            Config parser object.
        """
        source_uri = uri or self.uri
        storage_options = auth or self.auth or authutil.default_signer()

        with fsspec.open(source_uri, mode="r", **storage_options) as f:
            content = f.read()
            self.read_string(content)
        return self

    def with_dict(self, info: Dict[str, Dict[str, Any]]) -> "ExtendedConfigParser":
        """Populates config with values from a dictionary.

        Parameters
        ----------
        info: Dict[str, Dict[str, Any]]
            Config in a dictionary format.

        Returns
        -------
        ExtendedConfigParser
            Config parser object.
        """
        self.read_dict(info)
        return self