Source code for ads.jobs.env_var_parser

#!/usr/bin/env python
# -*- coding: utf-8; -*-

# Copyright (c) 2021, 2023 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
import copy
import json
from typing import Union, List, Dict
from configparser import ConfigParser, ExtendedInterpolation
from configparser import (
    InterpolationSyntaxError,
    InterpolationDepthError,
)
from configparser import NoSectionError, NoOptionError, MAX_INTERPOLATION_DEPTH


class EnvVarInterpolation(ExtendedInterpolation):
    """Lenient flavour of ``ExtendedInterpolation`` that tolerates unresolved references.

    A ``${name}`` or ``${section:name}`` reference that cannot be looked up is
    kept in the output as-is instead of raising an error.

    Adapted from https://github.com/python/cpython/blob/main/Lib/configparser.py
    """

    def before_set(self, parser, section: str, option: str, value: str) -> str:
        """Return ``value`` untouched; no validation is performed when setting."""
        return value

    def _interpolate_some(self, parser, option, accum, rest, section, map, depth):
        """Expand the references found in ``rest`` and append the pieces to ``accum``.

        Parameters
        ----------
        parser
            The parser that owns the value being interpolated.
        option
            Name of the option being interpolated (used in error messages).
        accum : list
            Output list; fragments of the expanded value are appended to it.
        rest : str
            Text that is still to be processed.
        section
            Name of the section that ``option`` belongs to.
        map
            Mapping used to resolve ``${name}`` references.
        depth : int
            Current recursion depth.

        Raises
        ------
        InterpolationDepthError
            If ``depth`` exceeds ``MAX_INTERPOLATION_DEPTH``.
        InterpolationSyntaxError
            If a ``${`` does not start a well-formed reference, or a reference
            contains more than one ``:``.
        """
        raw_value = parser.get(section, option, raw=True, fallback=rest)
        if depth > MAX_INTERPOLATION_DEPTH:
            raise InterpolationDepthError(option, section, raw_value)

        while rest:
            literal, dollar, _ = rest.partition("$")
            if not dollar:
                # No "$" left: everything remaining is plain text.
                accum.append(rest)
                return
            if literal:
                accum.append(literal)
                rest = rest[len(literal):]

            # ``rest`` now starts with "$"; the next character decides what it means.
            marker = rest[1:2]
            if marker != "{":
                # "$$" is an escaped dollar sign (both characters are consumed);
                # any other "$" is kept literally (one character is consumed).
                accum.append("$")
                rest = rest[2:] if marker == "$" else rest[1:]
                continue

            reference = self._KEYCRE.match(rest)
            if reference is None:
                raise InterpolationSyntaxError(
                    option,
                    section,
                    "Bad interpolation variable reference %r" % rest,
                )

            parts = reference.group(1).split(":")
            target_section = section
            target_option = option
            try:
                if len(parts) == 1:
                    target_option = parser.optionxform(parts[0])
                    resolved = map[target_option]
                elif len(parts) == 2:
                    target_section = parts[0]
                    target_option = parser.optionxform(parts[1])
                    resolved = parser.get(target_section, target_option, raw=True)
                else:
                    raise InterpolationSyntaxError(
                        option,
                        section,
                        "More than one ':' found: %r" % (rest[reference.end():],),
                    )
            except (KeyError, NoSectionError, NoOptionError):
                # Unknown reference: keep the remaining text (starting at the
                # reference itself) unchanged and stop processing this value.
                accum.append(rest)
                return

            rest = rest[reference.end():]
            if "$" in resolved:
                # The looked-up value may itself contain references; expand it
                # in the context of the section it came from.
                self._interpolate_some(
                    parser,
                    target_option,
                    accum,
                    resolved,
                    target_section,
                    dict(parser.items(target_section, raw=True)),
                    depth + 1,
                )
            else:
                accum.append(resolved)
def parse(env_var: Union[Dict, List[dict]]) -> dict:
    """Parse the environment variables and perform substitutions.

    This will also convert Kubernetes-style environment variables
    from a list to a dictionary.

    Values are normalised to strings before substitution: ``None`` becomes an
    empty string, and any other non-string value is dumped as a JSON string
    (falling back to ``str()`` if it is not JSON serializable).
    The input object is not modified.

    Parameters
    ----------
    env_var : dict or list
        Environment variables specified as a list or a dictionary.
        If env_var is a list, it should be in the format of:
        [{"name": "ENV_NAME_1", "value": "ENV_VALUE_1"}, {"name": "ENV_NAME_2", "value": "ENV_VALUE_2"}]
        An entry without a "value" key is treated as having an empty value.
        ``None`` is treated as "no environment variables".

    Returns
    -------
    dict
        Environment variables as a dictionary of strings.

    Raises
    ------
    KeyError
        If an entry of a list-style ``env_var`` has no "name" key.
    """
    if env_var is None:
        return {}

    # Convert kubernetes style env to dict.
    # "value" is optional in a Kubernetes EnvVar, so a missing key is read as
    # None here and normalised to an empty string below.
    if isinstance(env_var, list):
        env_var = {ev["name"]: ev.get("value") for ev in env_var}

    # Build a fresh dict of string values so the caller's object is untouched.
    envs = {}
    for name, value in env_var.items():
        if value is None:
            # Convert None to empty string
            value = ""
        elif not isinstance(value, str):
            # If the value is not a string,
            # try to dump it as json string
            try:
                value = json.dumps(value)
            except Exception:
                # Deliberate best effort:
                # cast the value to string if it is not json serializable
                value = str(value)
        envs[name] = value

    config = ConfigParser(interpolation=EnvVarInterpolation())
    # Keep option names case-sensitive (the default would lower-case them).
    config.optionxform = str
    config["envs"] = envs
    return {name: config["envs"].get(name) for name in envs}
def escape(s: str) -> str:
    """Return ``s`` with every ``$`` doubled to ``$$``."""
    pieces = s.split("$")
    return "$$".join(pieces)