#!/usr/bin/env python
# -*- coding: utf-8; -*-

# Copyright (c) 2022, 2023 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/

from abc import abstractmethod
from typing import Any, Dict, Union


class UnsupportedRuntime(Exception):
    """Raised when the requested runtime type is not supported by a resource.

    Attributes
    ----------
    runtime_type: str
        The runtime type that was rejected, exactly as passed by the caller.
    """

    def __init__(self, runtime_type: str):
        """Build the error for the rejected runtime type.

        Parameters
        ----------
        runtime_type: str
            The runtime type that is not supported.
        """
        # Keep the offending value on the instance so handlers can inspect it
        # without having to parse the human-readable message.
        self.runtime_type = runtime_type
        super().__init__(
            f"The provided runtime: `{runtime_type}` "
            "is not supported by given resource."
        )
class Backend:
    """Interface for backend.

    Stores the configuration it is given and exposes the operations a
    backend may support. In this base class every operation is either a
    no-op that returns ``None`` or raises ``NotImplementedError``.

    Attributes
    ----------
    config: Dict
        The configuration passed to the constructor, stored as-is.
    auth_type: str
        Value of ``config["execution"]["auth"]``; ``"api_key"`` if absent.
    profile: Union[str, None]
        Value of ``config["execution"]["oci_profile"]``; ``None`` if absent.
    oci_config: Union[str, None]
        Value of ``config["execution"]["oci_config"]``; ``None`` if absent.
    """

    def __init__(self, config: Dict) -> None:
        """Store the configuration and read the execution settings from it.

        Parameters
        ----------
        config: Dict
            The backend configuration. Its optional ``"execution"`` section
            is consulted for ``auth``, ``oci_profile`` and ``oci_config``.
        """
        self.config = config
        # Every key of the section already has a default, so a missing (or
        # explicitly empty/None) section is treated as "use all defaults"
        # instead of failing with a KeyError/AttributeError.
        execution = config.get("execution") or {}
        self.auth_type = execution.get("auth", "api_key")
        self.profile = execution.get("oci_profile", None)
        self.oci_config = execution.get("oci_config", None)

    # NOTE: `Backend` does not use an ABC metaclass, so this decorator is not
    # enforced at instantiation time; it only marks `run` as the method
    # implementations are expected to provide.
    @abstractmethod
    def run(self) -> Dict:
        """
        Initiate a run.

        The base implementation does nothing and returns ``None``.

        Returns
        -------
        Dict
        """

    def delete(self) -> None:
        """
        Delete a remote run.

        The base implementation does nothing.

        Returns
        -------
        None
        """

    def watch(self) -> None:
        """
        Stream logs from a remote run.

        The base implementation does nothing.

        Returns
        -------
        None
        """

    def cancel(self) -> None:
        """
        Cancel a remote run.

        The base implementation does nothing.

        Returns
        -------
        None
        """

    def apply(self) -> Dict:
        """
        Initiate Data Science service from YAML.

        The base implementation does nothing and returns ``None``.

        Returns
        -------
        Dict
        """

    def activate(self) -> None:
        """
        Activate a remote service.

        Returns
        -------
        None

        Raises
        ------
        NotImplementedError
            Always, in this base class.
        """
        raise NotImplementedError("`activate` has not been implemented yet.")

    def deactivate(self) -> None:
        """
        Deactivate a remote service.

        Returns
        -------
        None

        Raises
        ------
        NotImplementedError
            Always, in this base class.
        """
        raise NotImplementedError("`deactivate` has not been implemented yet.")

    def run_diagnostics(self):
        """
        Implement Diagnostics check appropriate for the backend.

        The base implementation does nothing.
        """

    def init(
        self,
        uri: Union[str, None] = None,
        overwrite: bool = False,
        **kwargs: Any,
    ) -> Union[str, None]:
        """Generates a YAML specification for the resource.

        Parameters
        ----------
        uri: (str, optional)
            The filename to save the resulting specification template YAML.
        overwrite: (bool, optional). Defaults to False.
            Overwrites the result specification YAML if exists.
        **kwargs: Any
            The optional arguments.
            runtime_type: str
                The resource runtime type.

        Returns
        -------
        Union[str, None]
            The YAML specification for the given resource if `uri` was not provided.
            `None` otherwise.

        Raises
        ------
        NotImplementedError
            Always, in this base class.
        """
        raise NotImplementedError(
            "The `init` has not been implemented yet for the given resource."
        )

    def predict(self) -> None:
        """
        Run model predict.

        Returns
        -------
        None

        Raises
        ------
        NotImplementedError
            Always, in this base class.
        """
        raise NotImplementedError("`predict` has not been implemented yet.")
[docs]classRuntimeFactory:"""Base factory for runtime."""_MAP={}