ads.opctl.distributed.common package#

Submodules#

ads.opctl.distributed.common.abstract_cluster_provider module#

class ads.opctl.distributed.common.abstract_cluster_provider.ClusterProvider(mode, ephemeral=True, life_span='0h', work_dir='')[source]#

Bases: object

Provides the contract for implementing a framework-specific cluster life-cycle manager.

The main node is identified by the environment variable MAIN; the worker node is identified by the environment variable WORKER.

The worker and main coordinate cluster configuration using the directory provided via WORK_DIR. The main node creates config.json in the WORK_DIR. The worker nodes poll config.json and export the configuration as environment variables.
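The handshake described above can be sketched as follows. This is a standalone illustration of the pattern, not the library's actual implementation; the key name mainIP and the file layout are assumptions.

```python
import json
import os
import tempfile
import time

# Illustrative sketch of the WORK_DIR handshake; the key name "mainIP"
# is an assumption, not the library's actual schema.
work_dir = tempfile.mkdtemp()
config_path = os.path.join(work_dir, "config.json")

# Main node: write the cluster configuration for workers to pick up.
with open(config_path, "w") as f:
    json.dump({"mainIP": "10.0.0.5"}, f)

# Worker node: poll until config.json appears, then export its keys
# as environment variables.
while not os.path.exists(config_path):
    time.sleep(1)
with open(config_path) as f:
    for key, value in json.load(f).items():
        os.environ[key] = str(value)

print(os.environ["mainIP"])  # the worker now sees the main node's address
```

In the real cluster the main and worker run on separate hosts and WORK_DIR is a shared location, so the polling loop actually waits; here both roles run in one process for illustration.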

DEFAULT_CODE_DIR = '/code'#
SYNC_SCRIPT_PATH = '/etc/datascience/sync.sh'#
basic_configuration() dict[source]#

Prepares a basic, framework-independent set of configuration. This configuration is later decorated by the configuration method of framework-specific implementations.

check_cluster_status()[source]#
configuration(conf={}) dict[source]#

Provides the configuration information of the cluster.

conf:

Contains generic information about the cluster, generated using basic_configuration, e.g. the IP address of the main process.
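The decoration pattern, where a framework-specific subclass augments the generic configuration, can be sketched with standalone stubs. The class names and the schedulerPort key below are illustrative assumptions, not real ads classes.

```python
# Minimal sketch of the decoration pattern described above: a
# framework-specific provider augments the generic configuration.
# These stubs mimic the documented contract; they are not the actual
# ads implementation.
class StubClusterProvider:
    def basic_configuration(self) -> dict:
        # Framework-independent information, e.g. the main node's address.
        return {"mainIP": "10.0.0.5"}

    def configuration(self, conf=None) -> dict:
        return conf or self.basic_configuration()


class DaskLikeProvider(StubClusterProvider):
    def configuration(self, conf=None) -> dict:
        conf = super().configuration(conf)
        # Decorate the generic config with framework-specific keys
        # (the scheduler port here is an illustrative assumption).
        conf["schedulerPort"] = 8786
        return conf


print(DaskLikeProvider().configuration())
```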

static create_sync_script(sync_script_fn, sync_script)[source]#
execution_failed()[source]#

Invoked when code submitted to an ephemeral cluster fails. Calling this method marks the cluster as tearable.

expected_worker_count()[source]#
export_config_files()[source]#

By default, exports only the configuration generated by the main node. This behavior can be overridden.

export_configuration(files)[source]#

Reads the configuration from the files array and exports it as environment variables.

fetch_all_worker_info()[source]#

Fetches all the worker configs. In some clusters the main node requires information about all worker IPs a priori; this method may be useful in such situations.

fetch_code()[source]#
classmethod find_self_ip(authinfo)[source]#

Identifies the IP address by finding which of the host IPs falls within the CIDR block of the subnet associated with the JOB_OCID.
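The CIDR-intersection test behind this lookup can be done with the standard-library ipaddress module. The addresses and subnet below are made-up values for illustration.

```python
import ipaddress

# Sketch of the CIDR-intersection test: given the host's candidate IPs
# and the subnet's CIDR block (all values are made up), pick the IP
# that falls inside the subnet.
subnet = ipaddress.ip_network("10.0.0.0/24")
host_ips = ["127.0.0.1", "10.0.0.17", "192.168.1.5"]

self_ip = next(ip for ip in host_ips if ipaddress.ip_address(ip) in subnet)
print(self_ip)  # → 10.0.0.17
```

In the actual method the subnet CIDR comes from the OCI subnet associated with the JOB_OCID, resolved via the provided auth info.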

get_oci_auth()[source]#
get_sync_loc()[source]#
get_sync_script()[source]#
profile_cmd()[source]#
reached_endoflife()[source]#
ready()[source]#
run_code()[source]#
setup_configuration(config: dict | None = None)[source]#

Writes the configuration information into the location provided by work_dir.

config:

Dictionary containing configuration information that needs to be shared with the workers. If config is None, this method calls self.configuration and saves the result.

work_dir:

Can be any valid URI supported by fsspec.

setup_extra_configs(conf: dict)[source]#
start()[source]#

Starts the cluster processes

start_main()[source]#

Implement this for starting the main process, e.g. the scheduler for Dask.

start_ps()[source]#

Implement this for starting the ps process, e.g. the tf-parameter-server for TensorFlow.

start_worker()[source]#

Implement this for starting the worker process, e.g. dask-worker for Dask.

stop()[source]#

Writes stop file and exits.

property stop_filesystem#
sync(loop=True)[source]#
tearable()[source]#

Checks whether the cluster is ready for teardown. Tear down if a stop file exists in the tmp directory, if the cluster is of ephemeral type and code execution is complete, or if the TTL has been reached.
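The three teardown conditions can be sketched as a standalone predicate. The parameter names and the TTL representation are assumptions for illustration, not the class's actual attributes.

```python
import os
import time

# Sketch of the tearable() decision described above; names and the TTL
# representation are illustrative assumptions, not the actual attributes.
def tearable(stop_file: str, ephemeral: bool, execution_complete: bool,
             start_time: float, ttl_seconds: float) -> bool:
    if os.path.exists(stop_file):                 # explicit stop requested
        return True
    if ephemeral and execution_complete:          # ephemeral run finished
        return True
    if time.time() - start_time >= ttl_seconds:   # life span (TTL) reached
        return True
    return False

# An ephemeral cluster whose code execution has completed is tearable.
print(tearable("/tmp/ads-example-stop-file-absent", True, True,
               time.time(), 3600))
```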

when_ready(func, *args, **kwargs)[source]#

ads.opctl.distributed.common.abstract_framework_spec_builder module#

class ads.opctl.distributed.common.abstract_framework_spec_builder.AbstractFrameworkSpecBuilder(config)[source]#

Bases: object

Provides the contract for implementing a framework-specific cluster spec builder.

In the case of jobs, this class handles adding framework-specific environment variables to the job definition.

NOTE: This class is not invoked while the cluster is running; it is invoked only when ads opctl is called.

update()[source]#
ads.opctl.distributed.common.abstract_framework_spec_builder.update_env_vars(config, env_vars: List)[source]#

env_vars: List, should be formatted as [{"name": "OCI__XXX", "value": YYY},]
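The shape of the env_vars argument, and one plausible merge into a config's environment list, can be sketched as follows. The variable names (OCI__MODE, OCI__WORK_DIR) and the merge-by-name behavior are illustrative assumptions, not the function's documented semantics.

```python
# Illustrative shape of the env_vars list; the variable names and the
# merge logic below are assumptions, not the actual ads implementation.
env_vars = [
    {"name": "OCI__MODE", "value": "MAIN"},
    {"name": "OCI__WORK_DIR", "value": "oci://bucket@namespace/work"},
]

# A plausible merge: index existing variables by name, then overlay the
# new entries so later values win.
config = {"env": [{"name": "OCI__MODE", "value": "WORKER"}]}
merged = {v["name"]: v["value"] for v in config["env"]}
merged.update({v["name"]: v["value"] for v in env_vars})
config["env"] = [{"name": k, "value": v} for k, v in merged.items()]

print(config["env"])
```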

ads.opctl.distributed.common.cluster_config_helper module#

class ads.opctl.distributed.common.cluster_config_helper.ClusterConfigToJobSpecConverter(cluster_info)[source]#

Bases: object

job_def_info()[source]#
job_run_info(jobType)[source]#

ads.opctl.distributed.common.cluster_provider_factory module#

class ads.opctl.distributed.common.cluster_provider_factory.ClusterProviderFactory[source]#

Bases: object

Factory class for creating provider instances.

static get_provider(key, *args, **kwargs)[source]#
provider = {}#
classmethod register(cluster_type, provider_class)[source]#
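The register/get_provider contract can be shown in miniature with standalone stubs. The cluster type "dask" and the stub provider class below are illustrative assumptions, not real ads providers.

```python
# Standalone sketch of the factory contract (register / get_provider);
# the provider class here is a stub, not a real ads provider.
class StubFactory:
    provider = {}

    @classmethod
    def register(cls, cluster_type, provider_class):
        # Map a cluster type key to the class that implements it.
        cls.provider[cluster_type] = provider_class

    @staticmethod
    def get_provider(key, *args, **kwargs):
        # Look up the registered class and instantiate it with the
        # caller's arguments.
        return StubFactory.provider[key](*args, **kwargs)


class DaskStubProvider:
    def __init__(self, mode):
        self.mode = mode


StubFactory.register("dask", DaskStubProvider)
instance = StubFactory.get_provider("dask", mode="MAIN")
print(type(instance).__name__, instance.mode)  # → DaskStubProvider MAIN
```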

ads.opctl.distributed.common.cluster_runner module#

class ads.opctl.distributed.common.cluster_runner.ClusterRunner(cluster_provider=None, cluster_key=None)[source]#

Bases: object

run()[source]#

ads.opctl.distributed.common.framework_factory module#

Module contents#