Source code for ads.opctl.distributed.common.abstract_framework_spec_builder

#!/usr/bin/env python
# -*- coding: utf-8; -*-

# Copyright (c) 2022 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/

from abc import ABCMeta
from typing import List


class AbstractFrameworkSpecBuilder(metaclass=ABCMeta):
    """
    Contract for implementing a framework-specific cluster spec builder.

    In the example of jobs, this class handles adding framework-specific
    environment variables to the job definition.

    NOTE: This class is not invoked while the cluster is running. Only after
    a call to `ads opctl`.
    """

    def __init__(self, config):
        # Nested mapping; `update` reads it under config["spec"].
        self.config = config

    def update(self):
        """
        Build the ``OCI__*`` environment variables and add them to the config.

        Values are read from the ``Runtime``, ``Framework`` and
        ``Infrastructure`` sections under ``self.config["spec"]``. The
        resulting list is passed through ``_add_framework_envs`` and then
        handed to ``update_env_vars`` together with ``self.config``.

        Returns
        -------
        Whatever ``update_env_vars`` returns for ``self.config``.

        Raises
        ------
        KeyError
            If any of the config keys read below is missing.
        """
        # Design notes carried over from the original implementation:
        #   Call DaskFramework().build_env_vars
        #   FrameworkFactory.get("kind").build_env_vars("spec, framework")
        spec = self.config["spec"]

        # Lookups are kept in the same order as before so that the first
        # missing key reported for a malformed config does not change.
        runtime_spec = spec["Runtime"]["spec"]
        entry_script = runtime_spec["entrypoint"]
        entry_args = runtime_spec["args"]
        # TODO:
        # kwargs = self.config["spec"]["Runtime"]["spec"]["kwargs"]

        framework_section = spec["Framework"]
        framework_spec = framework_section["spec"]
        scheduler_config = framework_spec["schedulerConfig"]
        nanny_port_range = scheduler_config["nannyPortRange"]
        worker_port_range = scheduler_config["workerPortRange"]
        timeout = framework_spec["workerConfig"]["timeout"]
        cluster_type = framework_section["kind"]
        working_dir = framework_spec["workDir"]

        cluster = spec["Infrastructure"]["spec"]["cluster"]
        is_ephemeral = cluster["ephemeral"]
        num_workers = cluster["numWorkers"]

        # (name, value) pairs in the order they are emitted.
        name_value_pairs = (
            ("OCI__ENTRY_SCRIPT", entry_script),
            ("OCI__ENTRY_SCRIPT_ARGS", entry_args),  # TODO check with QQ
            ("OCI__ENTRY_SCRIPT_KWARGS", None),  # TODO check with QQ
            ("OCI__NANNY_PORT", nanny_port_range),
            ("OCI__WORKER_PORT", worker_port_range),
            ("OCI__WORKER_TIMEOUT", timeout),
            ("OCI__WORKER_COUNT", num_workers),
            ("OCI__CLUSTER_TYPE", cluster_type),
            ("OCI__EPHEMERAL", is_ephemeral),  # TODO: remove from yaml or build out
            ("OCI__LIFE_SPAN", ""),  # TODO: for testing later
            ("OCI__WORK_DIR", working_dir),
        )
        env_vars = [
            {"name": name, "value": value} for name, value in name_value_pairs
        ]

        env_vars = self._add_framework_envs(env_vars)
        return update_env_vars(self.config, env_vars)

    def _add_framework_envs(self, new_env_vars: List):
        """
        Hook for subclasses to add or remove environment variables.

        The base implementation returns ``new_env_vars`` unchanged.
        """
        return new_env_vars
def update_env_vars(config, env_vars: List):
    """
    Append environment variables to the runtime spec of ``config``.

    Parameters
    ----------
    config
        Nested mapping; the list at
        ``config["spec"]["Runtime"]["spec"]["environmentVariables"]`` is
        extended in place. If that key is absent or set to ``None`` an empty
        list is created first, so a config that declares no environment
        variables does not fail.
    env_vars: List
        Should be formatted as [{"name": "OCI__XXX", "value": YYY},]

    Returns
    -------
    The same ``config`` object that was passed in, after modification.

    Raises
    ------
    KeyError
        If ``config["spec"]["Runtime"]["spec"]`` cannot be resolved.
    """
    # TODO move this to a class which checks the version, kind, type, etc.
    runtime_spec = config["spec"]["Runtime"]["spec"]
    # A missing key or an explicit None both mean "no variables yet".
    if runtime_spec.get("environmentVariables") is None:
        runtime_spec["environmentVariables"] = []
    runtime_spec["environmentVariables"].extend(env_vars)
    return config