Source code for ads.opctl.distributed.common.cluster_runner
#!/usr/bin/env python
# -*- coding: utf-8; -*-
# Copyright (c) 2022 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
import os
from time import sleep, time_ns
from ads.opctl.distributed.common.cluster_provider_factory import ClusterProviderFactory
import traceback
[docs]
class ClusterRunner:
def __init__(self, cluster_provider=None, cluster_key=None):
self.cluster_key = cluster_key or os.environ.get("OCI__CLUSTER_TYPE")
self.mode = os.environ.get("OCI__MODE")
self.ephemeral = os.environ.get("OCI__EPHEMERAL", 1)
# life_span = os.environ.get("OCI__LIFE_SPAN") # TODO: Ask MR how this works
self.work_dir = os.environ.get("OCI__WORK_DIR")
os.environ["JOB_OCID"] = os.environ.get("JOB_OCID", 'Undefined')
os.environ["JOB_RUN_OCID"] = os.environ.get("JOB_RUN_OCID", str(time_ns()))
self.cluster = cluster_provider or ClusterProviderFactory.get_provider(
self.cluster_key,
mode=self.mode,
ephemeral=self.ephemeral,
work_dir=self.work_dir,
) # life_spanlife_span=life_span
print(f"Cluster built: {self.cluster}", flush=True)
[docs]
def run(self):
exit_code = 0
self.cluster.start()
try:
self.cluster.run_code()
# self.cluster.code_execution_complete = True # This needs to be
# set inside the run_code method of the implementation class.
except Exception as e:
print(f"Error Running the code: {e}", flush=True)
traceback.print_exc()
exit_code = 1
self.cluster.execution_failed()
while (
not self.cluster.tearable()
): # If not ephemeral, wait util it is ready for tearing down
sleep(15)
print("Signalling Stop!!!", flush=True)
self.cluster.stop() # Signal cluster tear down
print(f"Exiting with exit code: {exit_code}", flush=True)
self.cluster.sync(loop=False)
exit(exit_code)
if __name__ == "__main__":
ClusterRunner().run()