ads.opctl.distributed.common package

Submodules

ads.opctl.distributed.common.abstract_cluster_provider module

class ads.opctl.distributed.common.abstract_cluster_provider.ClusterProvider(mode, ephemeral=True, life_span='0h', work_dir='')

Bases: object

Provides a contract for implementing a framework-specific cluster life cycle manager.

The main node is identified by the environment variable MAIN, and a worker node is identified by the environment variable WORKER.

The worker and main nodes coordinate the cluster configuration through the directory provided via WORK_DIR. The main node creates config.json in WORK_DIR, and the worker nodes poll for config.json and export its contents as environment variables.
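The coordination protocol above can be sketched with a local directory standing in for WORK_DIR. This is a minimal illustration under stated assumptions, not the ADS implementation: the helpers main_write_config and worker_poll_config, the MAIN_IP key, and the timeout and polling parameters are all hypothetical, and a real WORK_DIR may be a remote URI rather than a local path.

```python
import json
import os
import tempfile
import time
from pathlib import Path


def main_write_config(work_dir: str, config: dict) -> Path:
    """Main node (hypothetical helper): write the shared configuration to <work_dir>/config.json."""
    path = Path(work_dir) / "config.json"
    # Write to a temporary file, then rename it, so a polling worker never reads a partial file.
    tmp = Path(work_dir) / "config.json.tmp"
    tmp.write_text(json.dumps(config))
    os.replace(tmp, path)
    return path


def worker_poll_config(work_dir: str, timeout: float = 10.0, interval: float = 0.1) -> dict:
    """Worker node (hypothetical helper): wait for config.json, then export it as environment variables."""
    path = Path(work_dir) / "config.json"
    deadline = time.monotonic() + timeout
    while not path.exists():
        if time.monotonic() > deadline:
            raise TimeoutError(f"{path} did not appear within {timeout} seconds")
        time.sleep(interval)
    config = json.loads(path.read_text())
    for key, value in config.items():
        # Environment variable values must be strings.
        os.environ[key] = str(value)
    return config


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as work_dir:
        main_write_config(work_dir, {"MAIN_IP": "10.0.0.2"})
        print(worker_poll_config(work_dir))
```

The write-then-rename step is a common way to make the file appear atomically on a local filesystem; object stores reached through other URIs may offer different guarantees.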

DEFAULT_CODE_DIR = '/code'
SYNC_SCRIPT_PATH = '/etc/datascience/sync.sh'
basic_configuration() → dict

Prepares a basic, framework-independent set of configuration values. Framework-specific implementations later extend this configuration through their configuration method.

check_cluster_status()
configuration(conf={}) → dict

Provides the configuration information of the cluster.

conf:

Contains generic information about the cluster, generated using basic_configuration, e.g. the IP address of the main process.
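The split between basic_configuration and a framework's configuration override can be illustrated with a stand-in class. This is a hypothetical sketch: MinimalProvider and HypotheticalDaskProvider are not ADS classes, and the MAIN_IP and SCHEDULER_ADDRESS keys are illustrative only. It shows a framework layer decorating the generic configuration rather than replacing it.

```python
from typing import Optional


class MinimalProvider:
    """Hypothetical stand-in for ClusterProvider, reduced to its configuration hooks."""

    def basic_configuration(self) -> dict:
        # Framework-independent settings; MAIN_IP is an illustrative key only.
        return {"MAIN_IP": "127.0.0.1"}

    def configuration(self, conf: Optional[dict] = None) -> dict:
        # Copy the caller's conf, or fall back to the generic configuration.
        return dict(conf) if conf else self.basic_configuration()


class HypotheticalDaskProvider(MinimalProvider):
    def configuration(self, conf: Optional[dict] = None) -> dict:
        conf = super().configuration(conf)
        # Framework-specific decoration: derive the scheduler address from the main IP.
        # 8786 is the default port of the Dask scheduler.
        conf["SCHEDULER_ADDRESS"] = f"tcp://{conf['MAIN_IP']}:8786"
        return conf


if __name__ == "__main__":
    print(HypotheticalDaskProvider().configuration())
```

The sketch uses None instead of the documented conf={} default to avoid Python's shared mutable default pitfall; this is a stylistic choice for the example, not a statement about how ADS behaves.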

static create_sync_script(sync_script_fn, sync_script)
execution_failed()

Invoked when code submitted to an ephemeral cluster fails. Calling this method puts the cluster in a tearable state.

expected_worker_count()
export_config_files()

By default, only exports the configuration generated by the main node. This behavior can be overridden.

export_configuration(files)

Reads the configuration from each file in the files list and exports it as environment variables.
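A standalone function modeled on the description above is sketched below. It assumes, without confirmation from this page, that each file holds a flat JSON object and that keys in later files override earlier ones; it is not the method's actual implementation.

```python
import json
import os
import tempfile
from typing import Iterable


def export_configuration(files: Iterable[str]) -> dict:
    """Hypothetical sketch: read flat JSON objects and export every key as an environment variable."""
    exported = {}
    for fn in files:
        with open(fn) as f:
            # Later files override keys from earlier ones (an assumption of this sketch).
            exported.update(json.load(f))
    for key, value in exported.items():
        # Environment variable values must be strings.
        os.environ[key] = str(value)
    return exported


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as d:
        path = os.path.join(d, "config.json")
        with open(path, "w") as f:
            json.dump({"MAIN_IP": "10.0.0.2"}, f)
        print(export_configuration([path]))
        print(os.environ["MAIN_IP"])
```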

fetch_all_worker_info()

Fetches all the worker configurations. In some clusters, the main node requires information about all worker IPs a priori; this method may be useful in such situations.

fetch_code()
classmethod find_self_ip(authinfo)

Identifies this node's IP address by finding which of the host's IP addresses falls within the CIDR block of the subnet associated with the JOB_OCID.
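The CIDR intersection test can be sketched with the standard ipaddress module. In the documented method, the subnet is looked up for the JOB_OCID using authinfo; here, as a simplifying assumption, the host IPs and the subnet CIDR are passed in directly, and find_ip_in_subnet is a hypothetical helper name.

```python
import ipaddress
from typing import Iterable, Optional


def find_ip_in_subnet(host_ips: Iterable[str], subnet_cidr: str) -> Optional[str]:
    """Return the first host IP that lies inside the subnet's CIDR block, or None."""
    network = ipaddress.ip_network(subnet_cidr)
    for ip in host_ips:
        # Membership is simply False (not an error) when IPv4 and IPv6 versions differ.
        if ipaddress.ip_address(ip) in network:
            return ip
    return None


if __name__ == "__main__":
    print(find_ip_in_subnet(["127.0.0.1", "10.0.1.17"], "10.0.1.0/24"))  # 10.0.1.17
```

On a real host, one option for collecting candidate addresses is socket.gethostbyname_ex(socket.gethostname())[2], though a multi-homed node may need a more thorough interface listing.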

get_oci_auth()
get_sync_loc()
get_sync_script()
profile_cmd()
reached_endoflife()
ready()
run_code()
setup_configuration(config: dict | None = None)

Writes the configuration information into the location provided by work_dir.

config:

Dictionary containing configuration information that needs to be shared with the workers. If config is None, this method calls self.configuration and saves the resulting configuration.

work_dir:

Can be any valid URI supported by fsspec.

setup_extra_configs(conf: dict)
start()

Starts the cluster processes.

start_main()

Implement this to start the main process, e.g. the scheduler for Dask.

start_ps()

Implement this to start the ps (parameter server) process, e.g. tf-parameter-server for TensorFlow.

start_worker()

Implement this to start the worker process, e.g. dask-worker for Dask.
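One way start() could route a node to the right hook is a dispatch on its mode, sketched below. The class is hypothetical; the MAIN and WORKER mode values come from the class description above, while the PS value is an assumption. Each hook records its name instead of launching a real scheduler, worker, or parameter server.

```python
class HypotheticalFrameworkProvider:
    """Hypothetical sketch of start() dispatching to start_main, start_worker, or start_ps."""

    def __init__(self, mode: str):
        self.mode = mode
        self.started = []

    def start_main(self):
        # For Dask, the scheduler would be launched here.
        self.started.append("main")

    def start_worker(self):
        # For Dask, dask-worker would be launched here.
        self.started.append("worker")

    def start_ps(self):
        # For TensorFlow, the parameter server would be launched here.
        self.started.append("ps")

    def start(self):
        # The "PS" mode value is an assumption of this sketch.
        handlers = {"MAIN": self.start_main, "WORKER": self.start_worker, "PS": self.start_ps}
        try:
            handler = handlers[self.mode.upper()]
        except KeyError:
            raise ValueError(f"Unknown mode: {self.mode!r}") from None
        handler()


if __name__ == "__main__":
    node = HypotheticalFrameworkProvider("WORKER")
    node.start()
    print(node.started)
```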

stop()

Writes a stop file and exits.

property stop_filesystem
sync(loop=True)
tearable()

Checks whether the cluster is ready for teardown. The cluster stops if a stop file exists in the tmp directory, if the cluster is ephemeral and code execution is complete, or if the TTL has been reached.
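The three teardown conditions can be expressed as a small predicate. This sketch rests on assumptions not confirmed by this page: life_span follows a "<number><unit>" pattern such as "0h" (the constructor default) or "30m", a zero life span means no TTL, and start_time and now are Unix timestamps. parse_life_span and is_tearable are hypothetical helpers, not ADS functions.

```python
import os
import re
import time
from typing import Optional


def parse_life_span(life_span: str) -> float:
    """Convert a '<number><unit>' string such as '2h' or '30m' to seconds (assumed format)."""
    match = re.fullmatch(r"(\d+(?:\.\d+)?)([smhd])", life_span.strip())
    if not match:
        raise ValueError(f"Unrecognized life_span: {life_span!r}")
    value, unit = float(match.group(1)), match.group(2)
    return value * {"s": 1, "m": 60, "h": 3600, "d": 86400}[unit]


def is_tearable(
    stop_file: str,
    ephemeral: bool,
    code_execution_complete: bool,
    start_time: float,
    life_span: str,
    now: Optional[float] = None,
) -> bool:
    """Sketch of the teardown conditions described for tearable()."""
    # 1. A stop file (as written by stop()) requests teardown.
    if os.path.exists(stop_file):
        return True
    # 2. An ephemeral cluster can be torn down once the submitted code has finished.
    if ephemeral and code_execution_complete:
        return True
    # 3. Stop once the TTL is reached; treating a zero life span as "no TTL" is an assumption.
    ttl = parse_life_span(life_span)
    now = time.time() if now is None else now
    return ttl > 0 and now - start_time >= ttl
```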

when_ready(func, *args, **kwargs)

ads.opctl.distributed.common.abstract_framework_spec_builder module

class ads.opctl.distributed.common.abstract_framework_spec_builder.AbstractFrameworkSpecBuilder(config)

Bases: object

Provides a contract for implementing a framework-specific cluster spec builder.

In the case of jobs, this class adds framework-specific environment variables to the job definition.

NOTE: This class is not invoked while the cluster is running; it is invoked only after a call to ads opctl.

update()
ads.opctl.distributed.common.abstract_framework_spec_builder.update_env_vars(config, env_vars: List)

env_vars: List, should be formatted as [{"name": "OCI__XXX", "value": YYY},]
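The name/value entry format above suggests a merge in which new entries replace existing entries with the same name. Because the layout of config is not documented here, the sketch operates only on two such lists; merge_env_vars is a hypothetical helper, not the documented update_env_vars, and the OCI__A and OCI__B names are placeholders.

```python
from typing import Any, Dict, List


def merge_env_vars(
    existing: List[Dict[str, Any]], new: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
    """Merge name/value entries; an entry in `new` overrides an existing entry with the same name."""
    # Dicts preserve insertion order, so existing names keep their positions
    # and previously unseen names are appended in the order they appear in `new`.
    merged = {item["name"]: item["value"] for item in existing}
    for item in new:
        merged[item["name"]] = item["value"]
    return [{"name": name, "value": value} for name, value in merged.items()]


if __name__ == "__main__":
    print(merge_env_vars(
        [{"name": "OCI__A", "value": "1"}],
        [{"name": "OCI__A", "value": "2"}, {"name": "OCI__B", "value": "3"}],
    ))
```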

ads.opctl.distributed.common.cluster_config_helper module

class ads.opctl.distributed.common.cluster_config_helper.ClusterConfigToJobSpecConverter(cluster_info)

Bases: object

job_def_info()
job_run_info(jobType)

ads.opctl.distributed.common.cluster_provider_factory module

class ads.opctl.distributed.common.cluster_provider_factory.ClusterProviderFactory

Bases: object

Factory class for creating provider instances.

static get_provider(key, *args, **kwargs)
provider = {}
classmethod register(cluster_type, provider_class)
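The members listed above (a provider dictionary, a register class method, and a static get_provider) follow the usual registry-factory pattern. The sketch below mirrors that shape under a hypothetical name, ProviderRegistry, with a hypothetical DummyProvider; it illustrates the pattern and is not the ADS source.

```python
from typing import Dict


class ProviderRegistry:
    """Hypothetical registry factory keyed by cluster type."""

    # Class-level mapping shared by all callers, as with `provider = {}` above.
    provider: Dict[str, type] = {}

    @classmethod
    def register(cls, cluster_type: str, provider_class: type) -> None:
        cls.provider[cluster_type] = provider_class

    @staticmethod
    def get_provider(key: str, *args, **kwargs):
        try:
            provider_class = ProviderRegistry.provider[key]
        except KeyError:
            raise KeyError(f"No provider registered for cluster type {key!r}") from None
        # Instantiate the registered class with the caller's arguments.
        return provider_class(*args, **kwargs)


class DummyProvider:
    """Hypothetical provider used only to demonstrate registration."""

    def __init__(self, mode: str):
        self.mode = mode


ProviderRegistry.register("DUMMY", DummyProvider)

if __name__ == "__main__":
    print(ProviderRegistry.get_provider("DUMMY", mode="MAIN").mode)
```

Registering classes rather than instances lets each call to get_provider construct a fresh provider with its own arguments, such as the mode of the current node.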

ads.opctl.distributed.common.cluster_runner module

class ads.opctl.distributed.common.cluster_runner.ClusterRunner(cluster_provider=None, cluster_key=None)

Bases: object

run()

ads.opctl.distributed.common.framework_factory module

Module contents