Source code for ads.opctl.distributed.common.abstract_framework_spec_builder
#!/usr/bin/env python# -*- coding: utf-8; -*-# Copyright (c) 2022 Oracle and/or its affiliates.# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/fromabcimportABCMetafromtypingimportList
class AbstractFrameworkSpecBuilder(metaclass=ABCMeta):
    """
    Contract for framework-specific cluster spec builders.

    For jobs, a builder is responsible for adding the framework-specific
    environment variables to the job definition.

    NOTE: This class is not invoked while the cluster is running, only
    after a call to `ads opctl`.
    """

    def __init__(self, config):
        # Nested configuration mapping; ``update`` reads its "spec" section.
        self.config = config

    def update(self):
        """
        Collect the ``OCI__*`` environment variables from ``self.config``
        and append them to the configuration.

        The values are read from the "Runtime", "Framework" and
        "Infrastructure" sections under ``self.config["spec"]``. The
        resulting list is passed through ``_add_framework_envs`` so that
        subclasses can adjust it, and is then handed to
        ``update_env_vars``.

        Returns
        -------
        The value returned by ``update_env_vars(self.config, env_vars)``.

        Raises
        ------
        KeyError
            If ``self.config`` is a dict and one of the keys read here is
            missing.
        """
        # Call DaskFramework().build_env_vars
        # FrameworkFactory.get("kind").build_env_vars("spec, framework")
        spec = self.config["spec"]

        # Sections are resolved lazily, in the order their values are
        # needed, so the first missing key is always the one reported.
        runtime_spec = spec["Runtime"]["spec"]
        entry_script = runtime_spec["entrypoint"]
        entry_args = runtime_spec["args"]
        # TODO:
        # kwargs = self.config["spec"]["Runtime"]["spec"]["kwargs"]

        framework_section = spec["Framework"]
        framework_spec = framework_section["spec"]
        scheduler_config = framework_spec["schedulerConfig"]
        nanny_port_range = scheduler_config["nannyPortRange"]
        worker_port_range = scheduler_config["workerPortRange"]
        timeout = framework_spec["workerConfig"]["timeout"]
        cluster_type = framework_section["kind"]
        working_dir = framework_spec["workDir"]

        cluster = spec["Infrastructure"]["spec"]["cluster"]
        is_ephemeral = cluster["ephemeral"]
        num_workers = cluster["numWorkers"]

        name_value_pairs = (
            ("OCI__ENTRY_SCRIPT", entry_script),
            ("OCI__ENTRY_SCRIPT_ARGS", entry_args),  # TODO check with QQ
            ("OCI__ENTRY_SCRIPT_KWARGS", None),  # TODO check with QQ
            ("OCI__NANNY_PORT", nanny_port_range),
            ("OCI__WORKER_PORT", worker_port_range),
            ("OCI__WORKER_TIMEOUT", timeout),
            ("OCI__WORKER_COUNT", num_workers),
            ("OCI__CLUSTER_TYPE", cluster_type),
            ("OCI__EPHEMERAL", is_ephemeral),  # TODO: remove from yaml or build out
            ("OCI__LIFE_SPAN", ""),  # TODO: for testing later
            ("OCI__WORK_DIR", working_dir),
        )
        env_vars = [
            {"name": name, "value": value} for name, value in name_value_pairs
        ]

        env_vars = self._add_framework_envs(env_vars)
        return update_env_vars(self.config, env_vars)

    def _add_framework_envs(self, new_env_vars: List):
        """
        Hook for subclasses: override to add or remove environment
        variables. The base implementation returns the list unchanged.
        """
        return new_env_vars
def update_env_vars(config, env_vars: List):
    """
    Append ``env_vars`` to the runtime environment variables of ``config``.

    The target list lives at
    ``config["spec"]["Runtime"]["spec"]["environmentVariables"]``. When that
    key is absent or holds ``None``, a new list is created there first, so
    a runtime spec that declares no environment variables is still
    accepted. An existing list is extended in place.

    Parameters
    ----------
    config
        Nested configuration mapping; it is modified in place.
    env_vars: List
        Should be formatted as [{"name": "OCI__XXX", "value": YYY},]

    Returns
    -------
    The same ``config`` object that was passed in.

    Raises
    ------
    KeyError
        If ``config`` is a dict lacking the "spec" / "Runtime" / "spec"
        path.
    """
    # TODO move this to a class which checks the version, kind, type, etc.
    runtime_spec = config["spec"]["Runtime"]["spec"]
    current = runtime_spec.get("environmentVariables")
    if current is None:
        # Key missing or explicitly null: start from an empty list so the
        # new variables are not lost.
        current = []
        runtime_spec["environmentVariables"] = current
    current.extend(env_vars)
    return config