Source code for ads.opctl.distributed.common.abstract_cluster_provider

#!/usr/bin/env python
# -*- coding: utf-8; -*-

# Copyright (c) 2022 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/

import ipaddress
import json
import os
import sys
from time import sleep, time, time_ns
from urllib.parse import urlparse
import subprocess


import ads
import fsspec
import oci
import pandas as pd  # Have to find a better way for timedelta
import psutil
from ads.jobs import Job
from ads.opctl.distributed.common import cluster_config_helper
from ads.jobs.templates.driver_oci import GitSSHKey, GitManager
from ads.jobs.templates.driver_utils import JobRunner
from ads.jobs.builders.runtimes.artifact import Artifact


class ClusterProvider:
    """
    Provides the contract for implementing framework-specific Cluster Life Cycle Managers.

    The main node is identified by the environment variable `MAIN`, the worker
    nodes by `WORKER`. The workers and the main node coordinate cluster
    configuration through the directory provided via `WORK_DIR`: the main node
    creates a config.json in the `WORK_DIR`, and the worker nodes poll for the
    config.json and export the configuration as environment variables.

    A minimal illustrative subclass sketch appears at the end of this module.
    """

    SYNC_SCRIPT_PATH = "/etc/datascience/sync.sh"
    DEFAULT_CODE_DIR = "/code"

    def __init__(self, mode, ephemeral=True, life_span="0h", work_dir=""):
        self.mode = mode
        self.start_time = time()
        self.ephemeral = ephemeral
        self.authinfo = self.get_oci_auth()
        self.ip = self.find_self_ip(self.authinfo)
        self.end_time = (
            pd.Timedelta(int(life_span[:-1]), life_span[-1]).total_seconds()
            + self.start_time
        )
        self.work_dir = work_dir
        self._get_my_work_dir()
        # A worker has two config files: the one it generates and the main config.
        self.main_config_file = os.path.join(self.my_work_dir, "MAIN_config.json")
        # Timeout (in seconds) for waiting on the config file.
        self.time_out = int(os.environ.get("OCI__TIMEOUT", "600"))
        self.fetch_code()
        # Write cluster configuration to `work_dir`. Eg. IP address of scheduler, etc.
        self.setup_configuration()
        # Control file used to instruct the cluster to stop.
        self.stop_file = os.path.join(self.my_work_dir, "stop")
        self.scheme = urlparse(self.stop_file).scheme
        self.execution_failure = False
        self.code_execution_complete = False
        self.sync()

    def sync(self, loop=True):
        sync_artifacts = os.environ.get("SYNC_ARTIFACTS", "0")
        print(f"sync_artifacts - {sync_artifacts}")
        if sync_artifacts == "1":
            bkt_name, prefix = self.get_sync_loc()
            if bkt_name is None:
                print(
                    "WARNING: 'WORKSPACE', 'WORKSPACE_PREFIX' or 'work_dir' not configured. Skipping sync."
                )
                return
            sync_script_fn = self.SYNC_SCRIPT_PATH
            if not os.path.exists(sync_script_fn):
                sync_script = self.get_sync_script()
                self.create_sync_script(sync_script_fn, sync_script)
            if loop:
                subprocess.Popen([sync_script_fn, bkt_name, prefix, "-l"])
            else:
                subprocess.Popen([sync_script_fn, bkt_name, prefix])
                sleep_duration = int(os.environ.get("POST_PROCESSING_WAIT", 60))
                print(f"post processing wait..{sleep_duration} seconds")
                sleep(sleep_duration)
                print("..")

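    # The generated script is invoked as `sync.sh <bucket> <prefix> [-l]`: with
    # `-l` it loops forever, sleeping SYNC_SLEEP seconds between passes; without
    # it, it performs a single final sync. See `sync_script_str` at the bottom
    # of this module.
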
    def get_sync_loc(self):
        bckt_name = os.environ.get("WORKSPACE")
        pfx = os.environ.get("WORKSPACE_PREFIX")
        if bckt_name is None:
            parsed = urlparse(self.work_dir)
            if parsed.scheme == "oci":
                bckt_name = parsed.netloc.split("@")[0]
                pfx = parsed.path.strip("/")
        return bckt_name, pfx

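    # A hedged illustration of the decomposition above (bucket and namespace
    # names are made up):
    #
    #   parsed = urlparse("oci://my-bucket@my-namespace/cluster/work_dir")
    #   parsed.scheme                 # "oci"
    #   parsed.netloc.split("@")[0]   # "my-bucket"
    #   parsed.path.strip("/")        # "cluster/work_dir"
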
    def profile_cmd(self):
        profile = os.environ.get("PROFILE", "0")
        cmd = []
        if profile == "1":
            print("Profiler ON")
            # Guard against an unset PROFILE_CMD to avoid calling split() on None.
            cmd = os.environ.get("PROFILE_CMD", "").split()
        return cmd

    @staticmethod
    def create_sync_script(sync_script_fn, sync_script):
        with open(sync_script_fn, "w") as sync_script_fn_obj:
            sync_script_fn_obj.write(sync_script)
        os.chmod(sync_script_fn, 0o755)  # octal literal; `755` would be a decimal mode

    def get_sync_script(self):
        return sync_script_str

    def fetch_code(self):
        runtime_type = os.environ.get(cluster_config_helper.OCI__RUNTIME_TYPE)
        if not runtime_type:
            return
        delegates = {"git": self._fetch_git, "remote": self._fetch_remote}
        if runtime_type not in delegates:
            raise ValueError(f"Runtime type {runtime_type} is not supported.")
        if cluster_config_helper.OCI__CODE_DIR not in os.environ:
            os.environ[cluster_config_helper.OCI__CODE_DIR] = self.DEFAULT_CODE_DIR
        delegates[runtime_type](
            code_dir=os.environ.get(cluster_config_helper.OCI__CODE_DIR)
        )

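    # A sketch of the environment that drives `fetch_code`, assuming the
    # `cluster_config_helper` constants resolve to the OCI__RUNTIME_* variable
    # names suggested by their identifiers (all values are illustrative):
    #
    #   OCI__RUNTIME_TYPE=git                         -> dispatches to _fetch_git
    #   OCI__RUNTIME_URI=git@github.com:org/repo.git
    #   OCI__RUNTIME_GIT_BRANCH=main
    #   OCI__CODE_DIR unset                           -> falls back to /code
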
    def _fetch_git(self, code_dir):
        uri = os.environ.get(cluster_config_helper.OCI__RUNTIME_URI)
        branch = os.environ.get(cluster_config_helper.OCI__RUNTIME_GIT_BRANCH)
        commit = os.environ.get(cluster_config_helper.OCI__RUNTIME_GIT_COMMIT)
        secret_ocid = os.environ.get(cluster_config_helper.OCI__RUNTIME_GIT_SECRET_ID)
        # GitSSHKey does nothing if secret_ocid is None or empty.
        with GitSSHKey(secret_ocid):
            GitManager(uri, code_dir=code_dir).fetch_repo().checkout_code(
                branch=branch, commit=commit
            )
        JobRunner(code_dir=code_dir).setup_python_path(
            python_path=os.environ.get(cluster_config_helper.OCI__RUNTIME_PYTHON_PATH),
        )

    def _fetch_remote(self, code_dir):
        uri = os.environ.get(cluster_config_helper.OCI__RUNTIME_URI)
        Artifact.copy_from_uri(uri, code_dir)

    def run_code(self):
        # Subclasses should implement this method to run the code and set
        # `code_execution_complete` to True once it finishes.
        self.code_execution_complete = True

    @property
    def stop_filesystem(self):
        authinfo = {}
        if self.scheme == "oci":
            authinfo = self.get_oci_auth()
        # FileSystem class corresponding to the URI scheme.
        return fsspec.filesystem(self.scheme, **authinfo)

    def _get_my_work_dir(self):
        """
        Get the work dir subfolder for the currently running job.
        """
        # Use "Undefined" as the job identifier when not running on OCI Jobs.
        self.jobDefID = os.environ.get("JOB_OCID", "Undefined")
        # Use the current time as the job run identifier when not running on OCI Jobs.
        self.jobRunID = os.environ.get("JOB_RUN_OCID", str(time_ns()))
        self.my_work_dir = os.path.join(self.work_dir, self.jobDefID)
        # Config file location.
        self.config_file = os.path.join(
            self.my_work_dir,
            "MAIN_config.json"
            if self.mode == "MAIN"
            else f"{self.mode}_{self.jobRunID}_config.json",
        )

    def export_config_files(self):
        """
        By default, exports only the configuration generated by main.
        This behavior can be overridden.
        """
        return [self.main_config_file if self.mode == "WORKER" else self.config_file]

    def reached_endoflife(self):
        # TODO: We can get rid of this method as JobRun takes a parameter to set
        # the max run time. We don't have to handle this here.
        # Compare against the current time; comparing `end_time` with the fixed
        # `start_time` would only ever measure the configured life span itself.
        return time() >= self.end_time

    def get_oci_auth(self):
        profile = os.environ.get("OCI_CONFIG_PROFILE") or os.environ.get(
            "OCIFS_CONFIG_PROFILE"
        )
        ads.set_auth(
            os.environ.get("OCI_IAM_TYPE", "resource_principal"),
            profile=profile or "DEFAULT",
        )
        authinfo = ads.common.auth.default_signer()
        return authinfo

    @classmethod
    def find_self_ip(cls, authinfo):
        """
        Identify the IP address by finding which of the host's IPs falls within
        the CIDR block of the subnet associated with the JOB_OCID.
        """
        if os.environ.get("JOB_OCID"):
            job_ocid = os.environ["JOB_OCID"]
            jobDef = Job.from_datascience_job(job_ocid)
            subnet_id = jobDef.infrastructure.subnet_id
            core_client = oci.core.VirtualNetworkClient(**authinfo)
            cidr = core_client.get_subnet(subnet_id).data.cidr_block
            for interface, snics in psutil.net_if_addrs().items():
                ip = snics[0].address
                if ipaddress.ip_address(ip) in ipaddress.ip_network(cidr):
                    print(f"IP address: {ip}")
                    os.environ["GLOO_SOCKET_IFNAME"] = interface
                    os.environ["NCCL_SOCKET_IFNAME"] = interface
                    return ip
            print("IP ADDRESS NOT FOUND!!")
            return None
        else:
            import socket

            hostname = socket.gethostname()
            ip = socket.gethostbyname(hostname)
            print(f"IP address: {ip}")
            return ip

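    # The subnet test above is the standard-library containment check, e.g.:
    #
    #   cidr = ipaddress.ip_network("10.0.0.0/24")
    #   ipaddress.ip_address("10.0.0.12") in cidr     # True  -> NIC is on the job subnet
    #   ipaddress.ip_address("192.168.1.5") in cidr   # False -> skip this interface
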
    def basic_configuration(self) -> dict:
        """
        Prepares a basic, framework-independent set of configuration. This
        configuration is decorated later by the `configuration` method of the
        framework-specific implementations.
        """
        config = {}
        config["OCI__MAIN_IP" if self.mode == "MAIN" else "OCI__WORKER_IP"] = self.ip
        config["tmpdir"] = self.my_work_dir
        return config

    def configuration(self, conf={}) -> dict:
        """
        Provides the configuration information of the cluster.

        conf: Contains generic information about the cluster, generated using
            `basic_configuration`. Eg. IP address of the main process.
        """
        return None

    def setup_extra_configs(self, conf: dict):
        return None

    def setup_configuration(self, config: dict = None):
        """
        Writes the configuration information into the location provided by `work_dir`.

        config: dictionary containing configuration information that needs to be
            shared with the workers. If config is None, this method calls
            `self.configuration` and saves the result.
        work_dir: Could be any valid URI supported by fsspec.
        """
        config = config or self.configuration()
        print(
            f"Writing configuration: {config} to file: {self.config_file}", flush=True
        )
        with fsspec.open(self.config_file, "w", **self.authinfo) as conf:
            conf.write(json.dumps(config))

    def export_configuration(self, files):
        """
        Reads the configuration from the `files` list and exports it as
        environment variables.
        """
        print(f"{self.mode}. Config File: {files}", flush=True)
        for file in files:
            with fsspec.open(file, **self.authinfo) as conf:
                config = json.loads(conf.read())
            print(f"Loading config: {config}", flush=True)
            for k in config:
                os.environ[k] = str(config[k])

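    # Together, `setup_configuration` and `export_configuration` implement a
    # JSON handshake over the shared work dir. A local sketch of the round trip
    # (path and values are illustrative):
    #
    #   config = {"OCI__MAIN_IP": "10.0.0.12", "tmpdir": "/work/jobdef"}
    #   with open("/tmp/MAIN_config.json", "w") as f:   # main node writes
    #       f.write(json.dumps(config))
    #   with open("/tmp/MAIN_config.json") as f:        # worker reads and exports
    #       for k, v in json.loads(f.read()).items():
    #           os.environ[k] = str(v)
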
    def expected_worker_count(self):
        return int(os.environ.get("OCI__WORKER_COUNT", 1))

    def fetch_all_worker_info(self):
        """
        Fetches all the worker configs.

        In some clusters the main node requires information about all worker IPs
        up front. This method may be useful in such situations.
        """
        # `glob` expands the wildcard; `ls` would treat it as a literal path.
        files = self.stop_filesystem.glob(
            f"{self.my_work_dir}/WORKER*config.json", refresh=True
        )
        worker_details = {}
        for file in files:
            # Open through the filesystem object so remote (e.g. oci://) paths work.
            with self.stop_filesystem.open(file) as wcf:
                worker_details[file] = json.loads(wcf.read())
        return worker_details

    def start_main(self):
        """
        Implement this for starting the main process. Eg. `scheduler` for Dask.
        """
        pass

    def start_worker(self):
        """
        Implement this for starting the worker process. Eg. `dask-worker` for Dask.
        """
        pass

    def start_ps(self):
        """
        Implement this for starting the parameter server process.
        Eg. `tf-parameter-server` for TensorFlow.
        """
        pass

    def start(self):
        """
        Starts the cluster processes.
        """
        if self.mode == "MAIN":  # Container started in main (scheduler) mode
            print("Starting main process", flush=True)
            self.start_main()
        elif self.mode == "WORKER":  # Container started in worker mode
            print("Starting worker process", flush=True)
            self.when_ready(self.start_worker)
        elif self.mode == "PS":  # Container started in parameter server mode
            print("Starting ps process", flush=True)
            self.when_ready(self.start_ps)
        else:
            print(f"Not a valid mode: {self.mode}", flush=True)
            raise Exception("Not a valid mode")

    def check_cluster_status(self):
        pass

    def execution_failed(self):
        """
        Invoked when code submitted to an ephemeral cluster fails. Calling this
        method marks the cluster as tearable.
        """
        self.execution_failure = True

    def tearable(self):
        """
        Checks if the cluster is ready for teardown.
        If there is a `stop` file in the work directory, then stop.
        If the cluster is ephemeral, check whether code execution is complete.
        If the TTL is reached, then stop.
        """
        if self.stop_filesystem.exists(self.stop_file) or self.execution_failure:
            print(
                f"Stopping the process. Reason: {'Code Execution Failure' if self.execution_failure else 'Stop file found'}",
                flush=True,
            )
            return True
        if self.ephemeral:
            return self.code_execution_complete
        else:
            return self.reached_endoflife()

    def stop(self):
        """
        Writes the stop file and exits.
        """
        if not self.stop_filesystem.exists(self.stop_file):
            print("Stop file not found. Writing stop file....", flush=True)
            authinfo = {}
            if self.scheme == "oci":
                authinfo = self.get_oci_auth()
            with fsspec.open(self.stop_file, "w", **authinfo) as sf:
                sf.write("stop")

    def ready(self):
        return self.stop_filesystem.exists(self.main_config_file)

    def when_ready(self, func, *args, **kwargs):
        start_time = time()
        while not self.ready():
            sleep(10)
            if time() - start_time > self.time_out:
                raise Exception(
                    "Timed out waiting for the ready status. The likely cause is "
                    "that the configuration is missing from the `WORK_DIR`."
                )
        func(*args, **kwargs)

sync_script_str = """#!/bin/bash
loop=false
if [ "$3" == "-l" ]; then
  loop=true
fi
echo "loop: $loop"
sleep_duration=${SYNC_SLEEP:-60}
if [ "$OCI_IAM_TYPE" = 'api_key' ]; then
  auth_method=api_key
else
  auth_method=resource_principal
fi
echo "auth method: $auth_method"
echo "OCI__SYNC_DIR dir: $OCI__SYNC_DIR"
echo "sleep duration is $sleep_duration"
bucket=$1
prefix=$2/sync/$JOB_OCID/$JOB_RUN_OCID/
while true; do
  if [[ -d $OCI__SYNC_DIR && -n "$(ls -A $OCI__SYNC_DIR)" ]]; then
    echo "syncing $OCI__SYNC_DIR to $bucket/$prefix"
    $HOME/bin/oci os object sync --auth $auth_method --bucket-name $bucket --prefix $prefix --src-dir $OCI__SYNC_DIR
  else
    echo "nothing to sync in $OCI__SYNC_DIR"
  fi
  if [ "$loop" = false ]; then
    break
  fi
  sleep $sleep_duration
done
"""
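
# A minimal, hypothetical sketch of how a framework-specific provider plugs
# into the contract above. `EchoClusterProvider` and the `echo` commands are
# illustrative only and not part of ADS; a real implementation would start the
# framework's scheduler/worker processes instead.
class EchoClusterProvider(ClusterProvider):
    def configuration(self, conf={}) -> dict:
        # Decorate the framework-independent config with framework-specific keys.
        config = self.basic_configuration()
        config.update(conf)
        return config

    def start_main(self):
        # Start the long-running "scheduler" process for this framework.
        subprocess.Popen(["echo", f"main listening on {self.ip}"])

    def start_worker(self):
        # `when_ready` has already seen MAIN_config.json by the time this runs,
        # so the main node's configuration can be exported and consumed here.
        self.export_configuration(self.export_config_files())
        subprocess.Popen(["echo", f"worker connecting to {os.environ['OCI__MAIN_IP']}"])


# A hedged sketch of a driver loop tying the lifecycle methods together
# (the mode, work dir, and polling cadence are illustrative):
#
#   provider = EchoClusterProvider(mode="MAIN", work_dir="oci://my-bucket@my-namespace/work")
#   provider.start()                 # main starts immediately; workers wait via when_ready
#   while not provider.tearable():   # stop file, failure, completion, or TTL
#       provider.check_cluster_status()
#       sleep(10)
#   provider.stop()                  # write the stop file so every node tears down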