ads.opctl.distributed.common package

Submodules

ads.opctl.distributed.common.abstract_cluster_provider module

class ads.opctl.distributed.common.abstract_cluster_provider.ClusterProvider(mode, ephemeral=True, life_span='0h', work_dir='')

Bases: object

Provides the contract for implementing a framework-specific cluster life-cycle manager.

The main node is identified by the environment variable MAIN. The worker node is identified by the environment variable WORKER.

The main and worker nodes coordinate cluster configuration through the directory provided via WORK_DIR. The main node creates config.json in WORK_DIR; the worker nodes poll config.json and export the configuration as environment variables.
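The file-based handshake described above can be sketched with the standard library alone. This is a minimal sketch, not the ADS implementation: it assumes WORK_DIR is a local directory (the real provider accepts any fsspec URI), and the helper names write_main_config and poll_main_config, as well as the key OCI__MAIN_IP, are hypothetical.

```python
import json
import os
import tempfile
import time


# Hypothetical helpers illustrating the main/worker handshake; not part of ads.opctl.
def write_main_config(work_dir: str, config: dict) -> str:
    """Main node: write the shared configuration to <work_dir>/config.json."""
    path = os.path.join(work_dir, "config.json")
    tmp_path = path + ".tmp"
    with open(tmp_path, "w") as f:
        json.dump(config, f)
    # Rename in one step so a polling worker never sees a half-written file.
    os.replace(tmp_path, path)
    return path


def poll_main_config(work_dir: str, timeout: float = 10.0, interval: float = 0.1) -> dict:
    """Worker node: wait for config.json, then export its keys as env vars."""
    path = os.path.join(work_dir, "config.json")
    deadline = time.monotonic() + timeout
    while not os.path.exists(path):
        if time.monotonic() > deadline:
            raise TimeoutError(f"{path} did not appear within {timeout}s")
        time.sleep(interval)
    with open(path) as f:
        config = json.load(f)
    for key, value in config.items():
        # os.environ only accepts strings.
        os.environ[key] = str(value)
    return config


with tempfile.TemporaryDirectory() as work_dir:
    write_main_config(work_dir, {"OCI__MAIN_IP": "10.0.0.5"})  # main node
    print(poll_main_config(work_dir))  # worker node
```

Writing to a temporary file and renaming it with os.replace keeps a worker from reading a partially written config.json on a local filesystem; on a network filesystem or object store that guarantee may not hold.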

DEFAULT_CODE_DIR = '/code'
SYNC_SCRIPT_PATH = '/etc/datascience/sync.sh'
basic_configuration() → dict

Prepares a basic, framework-independent set of configuration. Framework-specific implementations later decorate this configuration through their configuration method.

check_cluster_status()
configuration(conf={}) → dict

Provides the configuration information of the cluster.

conf:

Contains generic information about the cluster, generated using basic_configuration, e.g., the IP address of the main process.
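To illustrate how a framework-specific provider might decorate the framework-independent configuration, here is a sketch built on a hypothetical stand-in base class (StandInClusterProvider), since the real ClusterProvider needs the ADS runtime. The key names OCI__MAIN_IP, OCI__MODE and SCHEDULER_ADDRESS are assumptions; 8786 is Dask's default scheduler port.

```python
from typing import Optional


class StandInClusterProvider:
    """Hypothetical stand-in mirroring the documented hooks; not the ADS class."""

    def __init__(self, mode: str, ip: str):
        self.mode = mode
        self.ip = ip

    def basic_configuration(self) -> dict:
        # Framework-independent keys (names are illustrative assumptions).
        return {"OCI__MAIN_IP": self.ip, "OCI__MODE": self.mode}

    def configuration(self, conf: Optional[dict] = None) -> dict:
        # Base behavior in this sketch: return a copy, adding nothing.
        return dict(conf or {})


class DaskLikeProvider(StandInClusterProvider):
    """Framework-specific provider that decorates the basic configuration."""

    def configuration(self, conf: Optional[dict] = None) -> dict:
        # Start from the generic configuration if none was passed in.
        conf = dict(conf or self.basic_configuration())
        # Derive the scheduler address from the main node's IP (Dask's default port).
        conf["SCHEDULER_ADDRESS"] = f"tcp://{conf['OCI__MAIN_IP']}:8786"
        return conf


provider = DaskLikeProvider(mode="MAIN", ip="10.0.0.5")
print(provider.configuration(provider.basic_configuration()))
```

The sketch defaults conf to None rather than {} to sidestep Python's shared mutable default argument pitfall.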

static create_sync_script(sync_script_fn, sync_script)
execution_failed()

Invoked when code submitted to an ephemeral cluster fails. Calling this method puts the cluster in a tearable state.

expected_worker_count()
export_config_files()

By default, exports only the configuration generated by the main node. Subclasses can override this behavior.

export_configuration(files)

Reads the configuration from each file in the files list and exports it as environment variables.
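A minimal sketch of reading several configuration files and exporting their contents as environment variables, assuming each file holds a flat JSON object. The override order (later files win) and the str() conversion are choices made for this sketch, not documented behavior.

```python
import json
import os
from typing import Iterable


def export_configuration(files: Iterable[str]) -> dict:
    """Load each JSON file in order and export the merged keys as env vars."""
    merged = {}
    for path in files:
        with open(path) as f:
            # Later files override keys from earlier ones (a choice of this sketch).
            merged.update(json.load(f))
    for key, value in merged.items():
        # os.environ only accepts strings.
        os.environ[key] = str(value)
    return merged
```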

fetch_all_worker_info()

Fetches all the worker configurations. In some clusters, the main node requires the IP addresses of all workers a priori; this method may be useful in such situations.
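One way the main node could gather worker information, sketched under the assumption (hypothetical, not taken from the ADS source) that each worker writes its own worker_<id>.json into the shared work directory. The expected_count parameter plays the role that expected_worker_count() suggests.

```python
import glob
import json
import os
import time
from typing import List


def fetch_all_worker_info(
    work_dir: str, expected_count: int, timeout: float = 10.0, interval: float = 0.1
) -> List[dict]:
    """Wait until expected_count worker files exist, then load them all.

    Assumes (hypothetically) that each worker writes worker_<id>.json into work_dir.
    """
    pattern = os.path.join(work_dir, "worker_*.json")
    deadline = time.monotonic() + timeout
    while True:
        paths = sorted(glob.glob(pattern))
        if len(paths) >= expected_count:
            break
        if time.monotonic() > deadline:
            raise TimeoutError(f"found {len(paths)} of {expected_count} worker configs")
        time.sleep(interval)
    infos = []
    for path in paths:
        with open(path) as f:
            infos.append(json.load(f))
    return infos
```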

fetch_code()
classmethod find_self_ip(authinfo)

Identifies the node's IP address by finding which of the host's IP addresses falls within the CIDR block of the subnet associated with JOB_OCID.
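The subnet-matching step can be expressed with Python's ipaddress module. In this sketch the host IPs and the subnet CIDR are passed in directly; in practice the CIDR would come from the subnet associated with JOB_OCID (looked up with the OCI credentials in authinfo), and the host IPs could be gathered with, for example, socket.gethostbyname_ex(socket.gethostname())[2]. The function name find_ip_in_subnet is hypothetical.

```python
import ipaddress
from typing import Iterable, Optional


def find_ip_in_subnet(host_ips: Iterable[str], subnet_cidr: str) -> Optional[str]:
    """Return the first host IP inside subnet_cidr, or None if none matches."""
    network = ipaddress.ip_network(subnet_cidr)
    for ip in host_ips:
        # Membership test; addresses of a different IP version never match.
        if ipaddress.ip_address(ip) in network:
            return ip
    return None


print(find_ip_in_subnet(["127.0.0.1", "172.17.0.2", "10.0.1.23"], "10.0.1.0/24"))
```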

get_oci_auth()
get_sync_loc()
get_sync_script()
profile_cmd()
reached_endoflife()
ready()
run_code()
setup_configuration(config: Optional[dict] = None)

Writes the configuration information to the location provided by work_dir.

config:

Dictionary containing the configuration information that needs to be shared with the workers. If config is None, this method calls self.configuration and saves the result.

work_dir:

Could be any valid URI supported by fsspec

setup_extra_configs(conf: dict)
start()

Starts the cluster processes

start_main()

Implement this to start the main process, e.g., the scheduler for Dask.

start_ps()

Implement this to start the ps (parameter server) process, e.g., tf-parameter-server for TensorFlow.

start_worker()

Implement this to start the worker process, e.g., dask-worker for Dask.
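The start hooks suggest a simple dispatch on the node's role. The sketch below assumes the mode strings "MAIN", "WORKER" and "PS" and uses hypothetical classes; rather than launching processes, the Dask-like subclass returns the command a real implementation might run (for example with subprocess).

```python
class RoleDispatchProvider:
    """Hypothetical base class: start() dispatches on the node's mode."""

    def __init__(self, mode: str):
        self.mode = mode.upper()

    def start(self):
        handlers = {
            "MAIN": self.start_main,
            "WORKER": self.start_worker,
            "PS": self.start_ps,
        }
        if self.mode not in handlers:
            raise ValueError(f"unknown mode: {self.mode!r}")
        return handlers[self.mode]()

    # Framework-specific subclasses override the hooks they support.
    def start_main(self):
        raise NotImplementedError("start_main")

    def start_worker(self):
        raise NotImplementedError("start_worker")

    def start_ps(self):
        raise NotImplementedError("start_ps")


class DaskStartProvider(RoleDispatchProvider):
    """Dask has no parameter server, so start_ps is left unimplemented."""

    def start_main(self):
        # Return (rather than run) the command a real implementation might launch.
        return ["dask-scheduler"]

    def start_worker(self):
        # Workers connect to the scheduler; this address is illustrative.
        return ["dask-worker", "tcp://10.0.0.5:8786"]


print(DaskStartProvider("MAIN").start())
```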

stop()

Writes the stop file and exits.

property stop_filesystem
sync(loop=True)
tearable()

Checks whether the cluster is ready for tear-down. The cluster stops if a stop file exists in the tmp directory, if the cluster is ephemeral and code execution is complete, or if the TTL has been reached.
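The three tear-down checks can be sketched as a pure function. The life_span format ("<number><s|m|h|d>") and the rule that a TTL of 0 means no time limit are assumptions inferred from the default life_span='0h', not documented behavior; parse_life_span and is_tearable are hypothetical names.

```python
import os
import re
import time
from typing import Optional

_UNITS = {"s": 1, "m": 60, "h": 3600, "d": 86400}


def parse_life_span(life_span: str) -> int:
    """Convert strings like '0h', '30m' or '2d' into seconds (format assumed)."""
    match = re.fullmatch(r"(\d+)([smhd])", life_span.strip())
    if not match:
        raise ValueError(f"unsupported life_span: {life_span!r}")
    return int(match.group(1)) * _UNITS[match.group(2)]


def is_tearable(
    stop_file: str,
    ephemeral: bool,
    code_finished: bool,
    start_time: float,
    life_span: str,
    now: Optional[float] = None,
) -> bool:
    """Apply the documented checks: stop file, finished ephemeral run, TTL."""
    now = time.time() if now is None else now
    if os.path.exists(stop_file):
        return True
    if ephemeral and code_finished:
        return True
    ttl = parse_life_span(life_span)
    # Assumption: a TTL of 0 means "no time limit".
    return ttl > 0 and now - start_time >= ttl
```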

when_ready(func, *args, **kwargs)

ads.opctl.distributed.common.abstract_framework_spec_builder module

class ads.opctl.distributed.common.abstract_framework_spec_builder.AbstractFrameworkSpecBuilder(config)

Bases: object

Provides the contract for implementing a framework-specific cluster spec builder.

For jobs, for example, this class adds framework-specific environment variables to the job definition.

NOTE: This class is not invoked while the cluster is running; it is invoked only as a result of an ads opctl call.

update()
ads.opctl.distributed.common.abstract_framework_spec_builder.update_env_vars(config, env_vars: List)

env_vars: List, formatted as [{"name": "OCI__XXX", "value": YYY}, ...]
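A small sketch for producing env_vars in the documented shape from a plain dictionary, plus a hypothetical merge helper in which newer entries override older ones. The variable name OCI__WORKER_COUNT is illustrative, and converting values to strings is an assumption (job environment variables are usually strings).

```python
from typing import Dict, List


def to_env_var_list(env: Dict[str, object]) -> List[dict]:
    """Convert {"OCI__XXX": value} into [{"name": ..., "value": ...}] form."""
    # Values are stringified here as a precaution (an assumption of this sketch).
    return [{"name": name, "value": str(value)} for name, value in env.items()]


def merge_env_vars(existing: List[dict], new: List[dict]) -> List[dict]:
    """Hypothetical helper: merge two lists in that format; entries in new win."""
    merged = {item["name"]: item["value"] for item in existing}
    merged.update({item["name"]: item["value"] for item in new})
    return to_env_var_list(merged)


print(to_env_var_list({"OCI__WORKER_COUNT": 2}))
```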

ads.opctl.distributed.common.cluster_config_helper module

class ads.opctl.distributed.common.cluster_config_helper.ClusterConfigToJobSpecConverter(cluster_info)

Bases: object

job_def_info()
job_run_info(jobType)

ads.opctl.distributed.common.cluster_provider_factory module

class ads.opctl.distributed.common.cluster_provider_factory.ClusterProviderFactory

Bases: object

Factory class for creating provider instances.

static get_provider(key, *args, **kwargs)
provider = {}
classmethod register(cluster_type, provider_class)
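The factory follows a registry pattern: providers are registered under a cluster type and instantiated by key. The sketch below mirrors the documented names (provider, register, get_provider) on a hypothetical ProviderFactory class; the ValueError on an unknown key is a choice of this sketch, and the real class may behave differently.

```python
class ProviderFactory:
    """Hypothetical registry mirroring the documented ClusterProviderFactory API."""

    # Class-level mapping from cluster type to provider class.
    provider = {}

    @classmethod
    def register(cls, cluster_type, provider_class):
        cls.provider[cluster_type] = provider_class

    @staticmethod
    def get_provider(key, *args, **kwargs):
        try:
            provider_class = ProviderFactory.provider[key]
        except KeyError:
            raise ValueError(f"no provider registered for {key!r}") from None
        # Forward the remaining arguments to the provider's constructor.
        return provider_class(*args, **kwargs)


class DaskProvider:
    """Illustrative provider with a constructor similar to ClusterProvider."""

    def __init__(self, mode, ephemeral=True):
        self.mode = mode
        self.ephemeral = ephemeral


ProviderFactory.register("DASK", DaskProvider)
p = ProviderFactory.get_provider("DASK", "MAIN", ephemeral=False)
print(type(p).__name__, p.mode, p.ephemeral)
```

Because provider is a class-level dictionary, a registration is visible to every caller in the process, so a framework module can register its provider once and later look it up by key.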

ads.opctl.distributed.common.cluster_runner module

class ads.opctl.distributed.common.cluster_runner.ClusterRunner(cluster_provider=None, cluster_key=None)

Bases: object

run()

ads.opctl.distributed.common.framework_factory module

Module contents