ads.opctl.distributed.common package
Submodules
ads.opctl.distributed.common.abstract_cluster_provider module
- class ads.opctl.distributed.common.abstract_cluster_provider.ClusterProvider(mode, ephemeral=True, life_span='0h', work_dir='')
Bases:
object
Provides contract for implementing Framework specific Cluster Life Cycle Manager.
The main node is identified by the environment variable MAIN; the worker nodes are identified by the environment variable WORKER.
The workers and the main node coordinate cluster configuration through the directory provided via WORK_DIR. The main node creates config.json in the WORK_DIR; the worker nodes poll for config.json and export the configuration as environment variables.
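The worker-side half of this coordination can be sketched as follows. This is illustrative only, not the library's implementation: the function name, polling interval, and timeout are assumptions, and only the file name (config.json) and the export-as-environment-variables behavior come from the contract above.

```python
import json
import os
import time
from pathlib import Path


def poll_and_export_config(work_dir: str, timeout: int = 600, interval: int = 5) -> dict:
    """Worker-side sketch: poll WORK_DIR for the config.json written by the
    main node, then export its keys as environment variables."""
    config_file = Path(work_dir) / "config.json"  # file name per the contract above
    deadline = time.time() + timeout
    while not config_file.exists():
        if time.time() > deadline:
            raise TimeoutError(f"No config.json found in {work_dir} after {timeout}s")
        time.sleep(interval)
    config = json.loads(config_file.read_text())
    for key, value in config.items():
        # Export so the framework processes started later can read the settings.
        os.environ[str(key)] = str(value)
    return config
```

In the real provider, work_dir may be any fsspec-supported URI (such as an object storage bucket), so file access would go through fsspec rather than pathlib.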
- DEFAULT_CODE_DIR = '/code'
- SYNC_SCRIPT_PATH = '/etc/datascience/sync.sh'
- basic_configuration() dict
Prepares a basic, framework-independent set of configuration. This configuration is later decorated by the configuration method implemented by framework-specific subclasses.
- check_cluster_status()
- configuration(conf={}) dict
Provides the configuration information of the cluster.
- conf:
Contains generic information about the cluster, generated using basic_configuration, e.g. the IP address of the main process
- static create_sync_script(sync_script_fn, sync_script)
- execution_failed()
Invoked when code submitted to an ephemeral cluster fails. Calling this method marks the cluster as tearable.
- expected_worker_count()
- export_config_files()
By default, only the configuration generated by the main node is exported. This behavior can be overridden.
- export_configuration(files)
Reads the configuration from the files in the files array and exports it as environment variables
- fetch_all_worker_info()
Fetches all the worker configurations. In some clusters the main node requires information about all worker IPs a priori; this method may be useful in such situations.
- fetch_code()
- classmethod find_self_ip(authinfo)
Identifies the IP address by finding which of the host IPs falls within the CIDR block of the subnet associated with the JOB_OCID
- get_oci_auth()
- get_sync_loc()
- get_sync_script()
- profile_cmd()
- reached_endoflife()
- ready()
- run_code()
- setup_configuration(config: Optional[dict] = None)
Writes the configuration information into the location provided by work_dir
- config:
Dictionary containing configuration information that needs to be shared with the workers. If config is None, this method calls self.configuration and saves the result.
- work_dir:
Could be any valid URI supported by fsspec
- setup_extra_configs(conf: dict)
- start()
Starts the cluster processes
- start_main()
Implement this to start the main process, e.g. the scheduler for Dask
- start_ps()
Implement this to start the parameter-server process, e.g. tf-parameter-server for TensorFlow
- start_worker()
Implement this to start the worker process, e.g. dask-worker for Dask
- stop()
Writes the stop file and exits.
- property stop_filesystem
- sync(loop=True)
- tearable()
Checks whether the cluster is ready for teardown. If there is a stop file in the tmp directory, stop. If the cluster is ephemeral, check whether code execution is complete. If the TTL is reached, stop.
- when_ready(func, *args, **kwargs)
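A framework-specific provider subclasses ClusterProvider and fills in the hooks listed above (configuration, start_main, start_worker, and so on). The sketch below is illustrative only: the stand-in base class, the DaskClusterProvider name, the scheduler port, and the subprocess-based launch commands are all assumptions, not the library's actual code.

```python
import subprocess


class ClusterProvider:
    """Stand-in for the real abstract_cluster_provider.ClusterProvider."""

    def __init__(self, mode, ephemeral=True, life_span="0h", work_dir=""):
        self.mode = mode
        self.ephemeral = ephemeral
        self.life_span = life_span
        self.work_dir = work_dir

    def basic_configuration(self) -> dict:
        # The real method returns framework-independent settings,
        # e.g. the IP address of the main process.
        return {"MAIN_IP": "127.0.0.1"}


class DaskClusterProvider(ClusterProvider):
    """Hypothetical framework-specific provider for Dask."""

    SCHEDULER_PORT = 8786  # Dask's default scheduler port

    def configuration(self, conf=None) -> dict:
        # Decorate the basic configuration with Dask-specific entries.
        conf = {**self.basic_configuration(), **(conf or {})}
        conf["SCHEDULER_ADDRESS"] = f"tcp://{conf['MAIN_IP']}:{self.SCHEDULER_PORT}"
        return conf

    def start_main(self):
        # e.g. launch the Dask scheduler on the main node.
        subprocess.Popen(["dask-scheduler", "--port", str(self.SCHEDULER_PORT)])

    def start_worker(self):
        # e.g. launch a Dask worker pointed at the scheduler.
        subprocess.Popen(["dask-worker", self.configuration()["SCHEDULER_ADDRESS"]])
```

The lifecycle driver (start) would then call start_main or start_worker depending on whether the MAIN or WORKER environment variable is set for the current node.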
ads.opctl.distributed.common.abstract_framework_spec_builder module
- class ads.opctl.distributed.common.abstract_framework_spec_builder.AbstractFrameworkSpecBuilder(config)
Bases:
object
Provides contract for implementing Framework specific Cluster Spec Builder
In the example of jobs, this class handles adding framework specific environment variables to the job definition.
NOTE: This class is not invoked while the cluster is running; it is invoked only as part of an ads opctl call.
- update()
- ads.opctl.distributed.common.abstract_framework_spec_builder.update_env_vars(config, env_vars: List)
env_vars: List, should be formatted as [{"name": "OCI__XXX", "value": YYY},]
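A helper following the documented env_vars format could merge new entries into a job definition like this. The config layout (an "envVars" list at the top level) and the last-entry-wins collision rule are assumptions for illustration; the real function operates on the ads opctl cluster spec.

```python
from typing import Dict, List


def update_env_vars(config: dict, env_vars: List[Dict]) -> dict:
    """Merge env_vars, formatted as [{"name": "OCI__XXX", "value": "YYY"}, ...],
    into the job definition's environment variables.

    NOTE: the "envVars" key is a hypothetical layout used for this sketch.
    """
    existing = config.setdefault("envVars", [])
    by_name = {entry["name"]: entry for entry in existing}
    for var in env_vars:
        by_name[var["name"]] = var  # later entries win on a name collision
    config["envVars"] = list(by_name.values())
    return config
```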