#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2020, 2023 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
import importlib
import multiprocessing
import os
import uuid
import psutil
from enum import Enum, auto
from time import time, sleep
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import logging
from ads.common import logger
from ads.common import utils
from ads.common.data import ADSData
from ads.common.decorator.runtime_dependency import (
runtime_dependency,
OptionalDependency,
)
from ads.hpo._imports import try_import
from ads.hpo.ads_search_space import get_model2searchspace
from ads.hpo.distributions import *
from ads.hpo.objective import _Objective
from ads.hpo.stopping_criterion import NTrials, ScoreValue, TimeBudget
from ads.hpo.utils import _num_samples, _safe_indexing, _update_space_name
from ads.hpo.validation import (
assert_is_estimator,
assert_model_is_supported,
assert_strategy_valid,
assert_tuner_is_fitted,
validate_fit_params,
validate_pipeline,
validate_search_space,
validate_params_for_plot,
)
with try_import() as _imports:
from sklearn.base import BaseEstimator, clone, is_classifier
from sklearn.model_selection import BaseCrossValidator # NOQA
from sklearn.model_selection import check_cv, cross_validate
from sklearn.pipeline import Pipeline, make_pipeline
from sklearn.utils import check_random_state
from sklearn.exceptions import NotFittedError
try:
from sklearn.metrics import check_scoring
except ImportError:
from sklearn.metrics.scorer import check_scoring
from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional, Union # NOQA
class State(Enum):
INITIATED = auto()
RUNNING = auto()
HALTED = auto()
TERMINATED = auto()
COMPLETED = auto()
class InvalidStateTransition(Exception): # pragma: no cover
"""
`InvalidStateTransition` is raised when an invalid transition request is made, such as calling
halt() without a running process.
"""
pass
class ExitCriterionError(Exception): # pragma: no cover
"""
`ExitCriterionError` is raised when an attempt is made to check the exit status for a different
exit type than the tuner was initialized with. For example, if an HPO study has an exit criterion
based on the number of trials and a request is made for the time remaining, which is a different
exit criterion, this exception is raised.
"""
pass
class DuplicatedStudyError(Exception): # pragma: no cover
"""
`DuplicatedStudyError` is raised when a new tuner process is created with a study name that
already exists in storage.
"""
class NoRestartError(Exception): # pragma: no cover
"""
`NoRestartError` is raised when checking how many seconds have elapsed since the HPO process
was last resumed from a halt, but the process has been terminated or was never halted and
resumed in the first place.
"""
pass
class DataScienceObjective:
"""This class is to replace the previous lambda function to solve the problem that python does not allow pickle local function/lambda function."""
def __init__(self, objective, X_res, y_res):
self.objective = objective
self.X_res = X_res
self.y_res = y_res
def __call__(self, trial):
return self.objective(self.X_res, self.y_res, trial)
class ADSTuner(BaseEstimator):
"""
Hyperparameter search with cross-validation.
"""
_required_parameters = ["model"]
@property
def sklearn_steps(self):
"""
Returns
-------
dict
Best candidate parameter setting, keyed with the sklearn pipeline step name (e.g. ``step__param``).
"""
return _update_space_name(self.best_params, step_name=self._step_name)
@property
def best_index(self):
"""
Returns
-------
int
Index which corresponds to the best candidate parameter setting.
"""
return self.trials["value"].idxmax()
@property
def best_params(self):
"""
Returns
-------
Dict[str, Any]
Parameters of the best trial.
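Example (a minimal sketch mirroring the class-level example; the returned keys depend on the model being tuned)::
from ads.hpo.stopping_criterion import *
from ads.hpo.search_cv import ADSTuner
from sklearn.datasets import load_iris
from sklearn.linear_model import SGDClassifier
tuner = ADSTuner(SGDClassifier(), strategy='detailed', scoring='f1_weighted', random_state=42)
X, y = load_iris(return_X_y=True)
tuner.tune(X=X, y=y, exit_criterion=[NTrials(5)], synchronous=True)
tuner.best_params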
"""
self._check_is_fitted()
return self._remove_step_name(self._study.best_params)
@property
def best_score(self):
"""
Returns
-------
float
Mean cross-validated score of the best estimator.
"""
self._check_is_fitted()
return self._study.best_value
@property
def score_remaining(self):
"""
Returns
-------
float
The difference between the best score and the optimal score.
Raises
------
:class:`ExitCriterionError`
Raised if the tuner has no score-based exit criterion.
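Example (a minimal sketch mirroring the class-level example; a `ScoreValue` exit criterion is required, otherwise `ExitCriterionError` is raised)::
from ads.hpo.stopping_criterion import *
from ads.hpo.search_cv import ADSTuner
from sklearn.datasets import load_iris
from sklearn.linear_model import SGDClassifier
tuner = ADSTuner(SGDClassifier(), strategy='detailed', scoring='f1_weighted', random_state=42)
X, y = load_iris(return_X_y=True)
tuner.tune(X=X, y=y, exit_criterion=[ScoreValue(0.90), TimeBudget(10)], synchronous=True)
tuner.score_remaining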
"""
if self._optimal_score is None:
raise ExitCriterionError(
"Tuner does not have a score-based exit condition."
)
else:
return self._optimal_score - self.best_score
@property
def scoring_name(self):
"""
Returns
-------
str
Scoring name.
"""
return self._extract_scoring_name()
@property
def n_trials(self):
"""
Returns
-------
int
Number of trials so far. Aliased by `trial_count`.
"""
self._check_is_fitted()
return len(self.trials)
# Alias for n_trials
trial_count = n_trials
@property
def trials_remaining(self):
"""
Returns
-------
int
The number of trials remaining in the budget.
Raises
------
:class:`ExitCriterionError`
Raised if the current tuner does not include a trials-based exit
condition.
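Example (a minimal sketch mirroring the class-level example; an `NTrials` exit criterion is required, otherwise `ExitCriterionError` is raised)::
from ads.hpo.stopping_criterion import *
from ads.hpo.search_cv import ADSTuner
from sklearn.datasets import load_iris
from sklearn.linear_model import SGDClassifier
tuner = ADSTuner(SGDClassifier(), strategy='detailed', scoring='f1_weighted', random_state=42)
X, y = load_iris(return_X_y=True)
tuner.tune(X=X, y=y, exit_criterion=[NTrials(5)])
tuner.trials_remaining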
"""
if self._n_trials is None:
raise ExitCriterionError(
"This tuner does not include a trials-based exit condition"
)
return self._n_trials - self.n_trials + self._previous_trial_count
@property
def trials(self):
"""
Returns
-------
:class:`pandas.DataFrame`
Trial data up to this point.
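Example (a minimal sketch mirroring the class-level example; the column names follow optuna's ``trials_dataframe()`` output)::
from ads.hpo.stopping_criterion import *
from ads.hpo.search_cv import ADSTuner
from sklearn.datasets import load_iris
from sklearn.linear_model import SGDClassifier
tuner = ADSTuner(SGDClassifier(), strategy='detailed', scoring='f1_weighted', random_state=42)
X, y = load_iris(return_X_y=True)
tuner.tune(X=X, y=y, exit_criterion=[NTrials(5)], synchronous=True)
tuner.trials[['number', 'value', 'state']]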
"""
if self.is_halted():
if self._trial_dataframe is None:
return pd.DataFrame()
return self._trial_dataframe
trials_dataframe = self._study.trials_dataframe().copy()
return trials_dataframe
@runtime_dependency(module="optuna", install_from=OptionalDependency.OPTUNA)
def __init__(
self,
model, # type: Union[BaseEstimator, Pipeline]
strategy="perfunctory", # type: Union[str, Mapping[str, optuna.distributions.BaseDistribution]]
scoring=None, # type: Optional[Union[Callable[..., float], str]]
cv=5, # type: Optional[int]
study_name=None, # type: Optional[str]
storage=None, # type: Optional[str]
load_if_exists=True, # type: Optional[bool]
random_state=None, # type: Optional[int]
loglevel=logging.INFO, # type: Optional[int]
n_jobs=1, # type: Optional[int]
X=None, # type: Union[List[List[float]], np.ndarray, pd.DataFrame, spmatrix, ADSData]
y=None, # type: Optional[Union[OneDimArrayLikeType, TwoDimArrayLikeType]]
):
# type: (...) -> None
"""
Returns a hyperparameter tuning object
Parameters
----------
model:
Object to use to fit the data. This is assumed to implement the
scikit-learn estimator or pipeline interface.
strategy:
``perfunctory``, ``detailed`` or a dictionary/mapping of hyperparameters
and their distributions. If :obj:`perfunctory`, picks a few
relatively more important hyperparameters to tune. If :obj:`detailed`,
extends to a larger search space. If :obj:`dict`, a user-defined search
space: a dictionary where keys are hyperparameters and values are distributions.
Distributions are assumed to implement the ads distribution interface.
scoring: Optional[Union[Callable[..., float], str]]
String or callable to evaluate the predictions on the validation data.
If :obj:`None`, ``score`` on the estimator is used.
cv: int
Integer to specify the number of folds in a CV splitter.
If :obj:`estimator` is a classifier and :obj:`y` is
either binary or multiclass,
``sklearn.model_selection.StratifiedKFold`` is used. Otherwise,
``sklearn.model_selection.KFold`` is used.
study_name: str, optional
Name of the current experiment for the ADSTuner object. One ADSTuner
object can only be attached to one study_name.
storage:
Database URL (e.g. ``sqlite:///example.db``). Defaults to ``sqlite:////tmp/hpo_*.db``.
load_if_exists:
Flag to control the behavior to handle a conflict of study names.
In the case where a study named ``study_name`` already exists in the ``storage``,
a :class:`DuplicatedStudyError` is raised if ``load_if_exists`` is
set to :obj:`False`.
Otherwise, the existing one is returned.
random_state:
Seed of the pseudo random number generator. If int, this is the
seed used by the random number generator. If :obj:`None`, the global random state from
``numpy.random`` is used.
loglevel:
The log level. Can be ``logging.NOTSET``, ``logging.INFO``, ``logging.DEBUG``, or ``logging.WARNING``.
n_jobs: int
Number of parallel jobs. :obj:`-1` means using all processors.
X: TwoDimArrayLikeType, Union[List[List[float]], np.ndarray,
pd.DataFrame, spmatrix, ADSData]
Training data.
y: Union[OneDimArrayLikeType, TwoDimArrayLikeType], optional
OneDimArrayLikeType: Union[List[float], np.ndarray, pd.Series]
TwoDimArrayLikeType: Union[List[List[float]], np.ndarray, pd.DataFrame, spmatrix, ADSData]
Target.
Example::
from ads.hpo.stopping_criterion import *
from ads.hpo.search_cv import ADSTuner
from sklearn.datasets import load_iris
from sklearn.svm import SVC
tuner = ADSTuner(
SVC(),
strategy='detailed',
scoring='f1_weighted',
random_state=42
)
X, y = load_iris(return_X_y=True)
tuner.tune(X=X, y=y, exit_criterion=[TimeBudget(1)])
"""
_imports.check()
self._n_jobs = n_jobs
assert (
cv > 1
), "k-fold cross-validation requires at least one train/test split by setting cv=2 or more"
self.cv = cv
self._error_score = np.nan
self.model = model
self._check_pipeline()
self._step_name = None
self._extract_estimator()
self.strategy = None
self._param_distributions = None
self._check_strategy(strategy)
self.strategy = strategy
self._param_distributions = self._get_param_distributions(self.strategy)
self._enable_pruning = hasattr(self.model, "partial_fit")
self._max_iter = 100
self.__random_state = random_state # to be used in export_trials
# check_random_state turns self.random_state into a np.random.RandomState instance,
# which makes the object hard to serialize.
self.random_state = check_random_state(random_state)
self._return_train_score = False
self.scoring = scoring
self._subsample = 1.0
self.loglevel = loglevel
self._trial_dataframe = None
self._status = State.INITIATED
self.study_name = (
study_name if study_name is not None else "hpo_" + str(uuid.uuid4())
)
self.storage = (
"sqlite:////tmp/hpo_" + str(uuid.uuid4()) + ".db"
if storage is None
else storage
)
self.oci_client = None
seed = np.random.randint(0, np.iinfo("int32").max)
self.sampler = optuna.samplers.TPESampler(seed=seed)
self.median_pruner = self._pruner(
class_name="median_pruner",
n_startup_trials=5,
n_warmup_steps=1,
interval_steps=1,
)
self.load_if_exists = load_if_exists
try:
self._study = optuna.study.create_study(
study_name=self.study_name,
direction="maximize",
pruner=self.median_pruner,
sampler=self.sampler,
storage=self.storage,
load_if_exists=self.load_if_exists,
)
except optuna.exceptions.DuplicatedStudyError as e:
if self.load_if_exists:
logger.info(
"Using an existing study with name '{}' instead of "
"creating a new one.".format(self.study_name)
)
else:
raise DuplicatedStudyError(
f"The study_name `{self.study_name}` exists in the {self.storage}. Either set load_if_exists=True, or use a new study_name."
)
self._init_data(X, y)
def search_space(self, strategy=None, overwrite=False):
"""
Returns the search space. If strategy is not passed in, returns the existing search
space. When strategy is passed in, the existing search space is overwritten if overwrite
is set to True; otherwise, it is only updated.
Parameters
----------
strategy: Union[str, dict], optional
``perfunctory``, ``detailed`` or a dictionary/mapping of the hyperparameters
and their distributions. If :obj:`perfunctory`, picks a few relatively
more important hyperparameters to tune. If :obj:`detailed`, extends to a
larger search space. If :obj:`dict`, a user-defined search space: a dictionary
where keys are parameters and values are distributions. Distributions are
assumed to implement the ads distribution interface.
overwrite: bool, optional
Ignored when strategy is None. Otherwise, search space is overwritten if overwrite
is set True and updated if it is False.
Returns
-------
dict
A mapping of the hyperparameters and their distributions.
Example::
from ads.hpo.stopping_criterion import *
from ads.hpo.search_cv import ADSTuner
from sklearn.datasets import load_iris
from sklearn.linear_model import SGDClassifier
tuner = ADSTuner(
SGDClassifier(),
strategy='detailed',
scoring='f1_weighted',
random_state=42
)
tuner.search_space({'max_iter': 100})
X, y = load_iris(return_X_y=True)
tuner.tune(X=X, y=y, exit_criterion=[TimeBudget(1)])
tuner.search_space()
"""
assert hasattr(
self, "_param_distributions"
), "Call <code>ADSTuner</code> first."
if not strategy:
return self._remove_step_name(self._param_distributions)
self._check_strategy(strategy)
self.strategy = strategy
if overwrite:
self._param_distributions = self._get_param_distributions(self.strategy)
else:
self._param_distributions.update(
self._get_param_distributions(self.strategy)
)
return self._remove_step_name(self._param_distributions)
@staticmethod
def _remove_step_name(param_distributions):
search_space = {}
for param, distributions in param_distributions.items():
if "__" in param:
param = param.split("__")[1]
search_space[param] = distributions
return search_space
def _check_pipeline(self):
self.model = validate_pipeline(self.model)
def _get_internal_param_distributions(self, strategy):
if isinstance(self.model, Pipeline):
for step_name, step in self.model.steps:
if step.__class__ in get_model2searchspace().keys():
self._step_name = step_name
param_distributions = get_model2searchspace()[step.__class__](
strategy
).suggest_space(step_name=step_name)
if len(param_distributions) == 0:
logger.warning("Nothing to tune.")
else:
assert_model_is_supported(self.model)
param_distributions = get_model2searchspace()[self.model.__class__](
strategy
).suggest_space()
self._check_search_space(param_distributions)
return param_distributions
def _get_param_distributions(self, strategy):
if isinstance(strategy, str):
param_distributions = self._get_internal_param_distributions(strategy)
if isinstance(strategy, dict):
param_distributions = _update_space_name(
strategy, step_name=self._step_name
)
self._check_search_space(param_distributions)
return param_distributions
def _check_search_space(self, param_distributions):
validate_search_space(self.model.get_params().keys(), param_distributions)
def _check_is_fitted(self):
assert_tuner_is_fitted(self)
def _check_strategy(self, strategy):
assert_strategy_valid(self._param_distributions, strategy, self.strategy)
def _add_halt_time(self):
"""Adds a new start time window to the start/stop log. This happens in two cases: when the tuning process
has commenced and when it resumes following a halt
"""
self._time_log.append(dict(halt=time(), resume=None))
def _add_resume_time(self):
"""Adds a new stopping time to the last window in the time log. This happens when the HPO process is
halted or terminated.
"""
if len(self._time_log) > 0:
entry = self._time_log.pop()
if entry["resume"] is not None:
raise Exception("Cannot close a time window without an opening time.")
self._time_log.append(dict(halt=entry["halt"], resume=time()))
def tune(
self,
X=None, # type: TwoDimArrayLikeType
y=None, # type: Optional[Union[OneDimArrayLikeType, TwoDimArrayLikeType]]
exit_criterion=[], # type: Optional[list]
loglevel=None, # type: Optional[int]
synchronous=False, # type: Optional[boolean]
):
"""
Run hyperparameter tuning until one of the <code>exit_criterion</code>
is met. The default is to run 50 trials.
Parameters
----------
X: TwoDimArrayLikeType, Union[List[List[float]], np.ndarray, pd.DataFrame, spmatrix, ADSData]
Training data.
y: Union[OneDimArrayLikeType, TwoDimArrayLikeType], optional
OneDimArrayLikeType: Union[List[float], np.ndarray, pd.Series]
TwoDimArrayLikeType: Union[List[List[float]], np.ndarray, pd.DataFrame, spmatrix, ADSData]
Target.
exit_criterion: list, optional
A list of ads stopping criteria. Can be `ScoreValue()`, `NTrials()`, `TimeBudget()`.
For example, [ScoreValue(0.96), NTrials(40), TimeBudget(10)]. Tuning exits when any of the
stopping criteria in the `exit_criterion` list is satisfied.
By default, the run stops after 50 trials.
loglevel: int, optional
Log level.
synchronous: boolean, optional
Whether to tune synchronously. Defaults to `False`.
Returns
-------
None
Nothing
Example::
from ads.hpo.stopping_criterion import *
from ads.hpo.search_cv import ADSTuner
from sklearn.datasets import load_iris
from sklearn.svm import SVC
tuner = ADSTuner(
SVC(),
strategy='detailed',
scoring='f1_weighted',
random_state=42
)
tuner.search_space({'max_iter': 100})
X, y = load_iris(return_X_y=True)
tuner.tune(X=X, y=y, exit_criterion=[TimeBudget(1)])
"""
# Get previous trial count to ensure proper counting.
try:
self._previous_trial_count = self.trial_count
except NotFittedError:
self._previous_trial_count = 0
except Exception as e:
logger.error(f"Error retrieving previous trial count: {e}")
raise
self._init_data(X, y)
if self.X is None:
raise ValueError(
"Need to either pass the data to `X` and `y` in `tune()`, or to `ADSTuner`."
)
if self.is_running():
raise InvalidStateTransition(
"Running process found. Do you need to call terminate() to stop before calling tune()?"
)
if self.is_halted():
raise InvalidStateTransition(
"Halted process found. You need to call resume()."
)
# handle ADSData
# Initialize time log for every new call to tune(). Set shared global time values
self._global_start = multiprocessing.Value("d", 0.0)
self._global_stop = multiprocessing.Value("d", 0.0)
self._time_log = []
self._tune(
X=self.X,
y=self.y,
exit_criterion=exit_criterion,
loglevel=loglevel,
synchronous=synchronous,
)
# Tune cannot exit before the clock starts in the subprocess.
while self._global_start.value == 0.0:
sleep(0.01)
def _init_data(self, X, y):
if X is not None:
if isinstance(X, ADSData):
self.y = X.y
self.X = X.X
else:
self.X = X
self.y = y
def halt(self):
"""
Halt the current running tuning process.
Returns
-------
None
Nothing
Raises
------
`InvalidStateTransition` if no running process is found
Example::
from ads.hpo.stopping_criterion import *
from ads.hpo.search_cv import ADSTuner
from sklearn.datasets import load_iris
from sklearn.linear_model import SGDClassifier
tuner = ADSTuner(
SGDClassifier(),
strategy='detailed',
scoring='f1_weighted',
random_state=42
)
tuner.search_space({'max_iter': 100})
X, y = load_iris(return_X_y=True)
tuner.tune(X=X, y=y, exit_criterion=[TimeBudget(1)])
tuner.halt()
"""
if hasattr(self, "_tune_process") and self._status == State.RUNNING:
self._trial_dataframe = self._study.trials_dataframe().copy()
psutil.Process(self._tune_process.pid).suspend()
self._status = State.HALTED
self._add_halt_time()
else:
raise InvalidStateTransition(
"No running process found. Do you need to call tune()?"
)
def resume(self):
"""
Resume the current halted tuning process.
Returns
-------
None
Nothing
Example::
from ads.hpo.stopping_criterion import *
from ads.hpo.search_cv import ADSTuner
from sklearn.datasets import load_iris
from sklearn.linear_model import SGDClassifier
tuner = ADSTuner(
SGDClassifier(),
strategy='detailed',
scoring='f1_weighted',
random_state=42
)
tuner.search_space({'max_iter': 100})
X, y = load_iris(return_X_y=True)
tuner.tune(X=X, y=y, exit_criterion=[TimeBudget(1)])
tuner.halt()
tuner.resume()
"""
if self.is_halted():
psutil.Process(self._tune_process.pid).resume()
self._add_resume_time()
self._status = State.RUNNING
else:
raise InvalidStateTransition("No paused process found.")
def wait(self):
"""
Wait for the current tuning process to finish running.
Returns
-------
None
Nothing
Example::
from ads.hpo.stopping_criterion import *
from ads.hpo.search_cv import ADSTuner
from sklearn.datasets import load_iris
from sklearn.linear_model import SGDClassifier
tuner = ADSTuner(
SGDClassifier(),
strategy='detailed',
scoring='f1_weighted',
random_state=42
)
tuner.search_space({'max_iter': 100})
X, y = load_iris(return_X_y=True)
tuner.tune(X=X, y=y, exit_criterion=[TimeBudget(1)])
tuner.wait()
"""
if self.is_running():
self._tune_process.join()
self._status = State.COMPLETED
else:
raise InvalidStateTransition("No running process.")
def terminate(self):
"""
Terminate the current tuning process.
Returns
-------
None
Nothing
Example::
from ads.hpo.stopping_criterion import *
from ads.hpo.search_cv import ADSTuner
from sklearn.datasets import load_iris
from sklearn.linear_model import SGDClassifier
tuner = ADSTuner(
SGDClassifier(),
strategy='detailed',
scoring='f1_weighted',
random_state=42
)
tuner.search_space({'max_iter': 100})
X, y = load_iris(return_X_y=True)
tuner.tune(X=X, y=y, exit_criterion=[TimeBudget(1)])
tuner.terminate()
"""
if self.is_running():
self._tune_process.terminate()
self._tune_process.join()
self._status = State.TERMINATED
# self._add_terminate_time()
self._update_failed_trial_state()
else:
raise RuntimeError("No running process found. Do you need to call tune()?")
@runtime_dependency(module="optuna", install_from=OptionalDependency.OPTUNA)
def _update_failed_trial_state(self):
from optuna.trial import TrialState
for trial in self._study.trials:
if trial.state == TrialState.RUNNING:
self._study._storage.set_trial_state(
trial._trial_id, optuna.structs.TrialState.FAIL
)
@property
def time_remaining(self):
"""Returns the number of seconds remaining in the study
Returns
-------
int: Number of seconds remaining in the budget. 0 if complete/terminated
Raises
------
:class:`ExitCriterionError`
Error is raised if time has not been included in the budget.
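Example (a minimal sketch mirroring the class-level example; a `TimeBudget` exit criterion is required, otherwise `ExitCriterionError` is raised)::
from ads.hpo.stopping_criterion import *
from ads.hpo.search_cv import ADSTuner
from sklearn.datasets import load_iris
from sklearn.linear_model import SGDClassifier
tuner = ADSTuner(SGDClassifier(), strategy='detailed', scoring='f1_weighted', random_state=42)
X, y = load_iris(return_X_y=True)
tuner.tune(X=X, y=y, exit_criterion=[TimeBudget(10)])
tuner.time_remaining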
"""
if self._time_budget is None:
raise ExitCriterionError(
"This tuner does not include a time-based exit condition"
)
elif self.is_completed() or self.is_terminated():
return 0
return max(self._time_budget - self.time_elapsed, 0)
@property
def time_since_resume(self):
"""Return the seconds since the process has been resumed from a halt.
Returns
-------
int: the number of seconds since the process was last resumed
Raises
------
`NoRestartError` if the process has not been resumed
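Example (a minimal sketch mirroring the halt()/resume() examples; assumes the tuning process is still running when resume() is called)::
from ads.hpo.stopping_criterion import *
from ads.hpo.search_cv import ADSTuner
from sklearn.datasets import load_iris
from sklearn.linear_model import SGDClassifier
tuner = ADSTuner(SGDClassifier(), strategy='detailed', scoring='f1_weighted', random_state=42)
X, y = load_iris(return_X_y=True)
tuner.tune(X=X, y=y, exit_criterion=[TimeBudget(10)])
tuner.halt()
tuner.resume()
tuner.time_since_resume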
"""
if len(self._time_log) > 0:
last_time_resumed = self._time_log[-1].get("resume")
else:
raise Exception("Time log should not be empty")
if self.is_running():
if last_time_resumed is not None:
return time() - last_time_resumed
else:
raise NoRestartError("The process has not been resumed")
elif self.is_halted():
return 0 # if halted, the amount of time since restarted from a halt is 0
elif self.is_terminated():
raise NoRestartError("The process has been terminated")
@property
def time_elapsed(self):
"""Return the time in seconds that the HPO process has been searching
Returns
-------
int: The number of seconds the HPO process has been searching
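Example (a minimal sketch mirroring the class-level example)::
from ads.hpo.stopping_criterion import *
from ads.hpo.search_cv import ADSTuner
from sklearn.datasets import load_iris
from sklearn.linear_model import SGDClassifier
tuner = ADSTuner(SGDClassifier(), strategy='detailed', scoring='f1_weighted', random_state=42)
X, y = load_iris(return_X_y=True)
tuner.tune(X=X, y=y, exit_criterion=[TimeBudget(1)], synchronous=True)
tuner.time_elapsed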
"""
time_in_halted_state = 0.0
# Add up all the halted durations, i.e. the time spent between halt and resume
for entry in self._time_log:
halt_time = entry.get("halt")
resume_time = entry.get("resume")
if resume_time is None:
# halted state.
# elapsed = halt time - global start - time halted
elapsed = halt_time - self._global_start.value - time_in_halted_state
return elapsed
else:
# running/completed/terminated state,
time_in_halted_state += resume_time - halt_time
# If the loop completes, all halts were resumed. If self._global_stop != 0, the
# process has exited.
if self._global_stop.value != 0:
global_time = self._global_stop.value - self._global_start.value
else:
global_time = time() - self._global_start.value
elapsed = global_time - time_in_halted_state
return elapsed
def best_scores(self, n: int = 5, reverse: bool = True):
"""Return the best scores from the study
Parameters
----------
n: int
The maximum number of results to show. Defaults to 5. If `None` or
negative, return all.
reverse: bool
Whether to reverse the sort order so results are in descending order.
Defaults to `True`
Returns
-------
list[float or int]
List of the best scores
Raises
------
`ValueError` if there are no trials
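Example (a minimal sketch mirroring the class-level example; the scores returned depend on the trials that complete)::
from ads.hpo.stopping_criterion import *
from ads.hpo.search_cv import ADSTuner
from sklearn.datasets import load_iris
from sklearn.linear_model import SGDClassifier
tuner = ADSTuner(SGDClassifier(), strategy='detailed', scoring='f1_weighted', random_state=42)
X, y = load_iris(return_X_y=True)
tuner.tune(X=X, y=y, exit_criterion=[NTrials(5)], synchronous=True)
tuner.best_scores(n=3)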
"""
if len(self.trials) < 1:
raise ValueError("No score data to show")
else:
scores = self.trials["value"]
scores = scores[scores.notnull()]
if scores.empty:
raise ValueError(
f"No score data despite valid trial data. Trial data length: {len(self.trials)}"
)
if not isinstance(n, int) or n <= 0:
return sorted(scores, reverse=reverse)
else:
return sorted(scores, reverse=reverse)[:n]
def get_status(self):
"""
Return the status of the current tuning process.
Alias for the property `status`.
Returns
-------
:class:`Status`
The status of the process
Example::
from ads.hpo.stopping_criterion import *
from ads.hpo.search_cv import ADSTuner
from sklearn.datasets import load_iris
from sklearn.linear_model import SGDClassifier
tuner = ADSTuner(
SGDClassifier(),
strategy='detailed',
scoring='f1_weighted',
random_state=42
)
tuner.search_space({'max_iter': 100})
X, y = load_iris(return_X_y=True)
tuner.tune(X=X, y=y, exit_criterion=[TimeBudget(1)])
tuner.get_status()
"""
return self.status
def is_running(self):
"""
Returns
-------
bool
`True` if the :class:`ADSTuner` instance is running; `False` otherwise.
"""
return self.status == State.RUNNING
def is_halted(self):
"""
Returns
-------
bool
`True` if the :class:`ADSTuner` instance is halted; `False` otherwise.
"""
return self.status == State.HALTED
def is_terminated(self):
"""
Returns
-------
bool
`True` if the :class:`ADSTuner` instance has been terminated; `False` otherwise.
"""
return self.status == State.TERMINATED
def is_completed(self):
"""
Returns
-------
bool
`True` if the :class:`ADSTuner` instance has completed; `False` otherwise.
"""
return self.status == State.COMPLETED
def _is_tuning_started(self):
"""
Returns
-------
bool
`True` if the :class:`ADSTuner` instance has been started (for example, halted or
running); `False` otherwise.
"""
return self.status == State.HALTED or self.status == State.RUNNING
def _is_tuning_finished(self):
"""
Returns
-------
bool
`True` if the :class:`ADSTuner` instance is finished running (i.e. completed
or terminated); `False` otherwise.
"""
return self.status == State.COMPLETED or self.status == State.TERMINATED
@property
def status(self):
"""
Returns
-------
:class:`Status`
The status of the current tuning process.
"""
if (
self._status == State.HALTED
or self._status == State.TERMINATED
or self._status == State.INITIATED
):
return self._status
elif hasattr(self, "_tune_process") and self._tune_process.is_alive():
return State.RUNNING
else:
return State.COMPLETED
def _extract_exit_criterion(self, exit_criterion):
# handle the exit criterion
self._time_budget = None
self._n_trials = None
self.exit_criterion = []
self._optimal_score = None
if exit_criterion is None or len(exit_criterion) == 0:
self._n_trials = 50
for i, criteria in enumerate(exit_criterion):
if isinstance(criteria, TimeBudget):
self._time_budget = criteria()
elif isinstance(criteria, NTrials):
self._n_trials = criteria()
elif isinstance(criteria, ScoreValue):
self._optimal_score = criteria.score
self.exit_criterion.append(criteria)
else:
raise NotImplementedError(
"``{}`` is not supported!".format(criteria.__class__.__name__)
)
def _extract_estimator(self):
if isinstance(self.model, Pipeline): # Pipeline
for step_name, step in self.model.steps:
if self._is_estimator(step):
self._step_name = step_name
self.estimator = step
else:
self.estimator = self.model
assert_is_estimator(self.estimator)
# assert _check_estimator(self.estimator), "Estimator must implement fit"
def _extract_scoring_name(self):
if isinstance(self.scoring, str):
return self.scoring
if not callable(self._scorer):
return (
self._scorer
if isinstance(self._scorer, str)
else str(self._scorer).split("(")[1].split(")")[0]
)
else:
if is_classifier(self.model):
return "mean accuracy"
else:
return "r2"
@runtime_dependency(module="optuna", install_from=OptionalDependency.OPTUNA)
def _set_logger(self, loglevel, class_name):
if loglevel is not None:
self.loglevel = loglevel
if class_name == "optuna":
optuna.logging.set_verbosity(self.loglevel)
else:
raise NotImplementedError("{} is not supported.".format(class_name))
def _set_sample_indices(self, X, random_state):
max_samples = self._subsample
n_samples = _num_samples(X)
self._sample_indices = np.arange(n_samples)
if isinstance(max_samples, float):
max_samples = int(max_samples * n_samples)
if max_samples < n_samples:
self._sample_indices = random_state.choice(
self._sample_indices, max_samples, replace=False
)
self._sample_indices.sort()
def _get_fit_params_res(self, X):
fit_params = {}
fit_params_res = fit_params
if fit_params_res is not None:
fit_params_res = validate_fit_params(X, fit_params, self._sample_indices)
return fit_params_res
def _can_tune(self):
assert hasattr(self, "model"), "Call <code>ADSTuner</code> first."
if self._param_distributions == {}:
logger.warning("Nothing to tune.")
if self._param_distributions is None:
raise NotImplementedError(
"There was no model specified or the model is not supported."
)
@runtime_dependency(module="optuna", install_from=OptionalDependency.OPTUNA)
def _tune(
self,
X, # type: TwoDimArrayLikeType
y, # type: Optional[Union[OneDimArrayLikeType, TwoDimArrayLikeType]]
exit_criterion=[], # type: Optional[list]
loglevel=None, # type: Optional[int]
synchronous=False, # type: Optional[boolean]
):
# type: (...) -> tuple
"""
Tune with all sets of parameters.
"""
self._can_tune()
self._set_logger(loglevel=loglevel, class_name="optuna")
self._extract_exit_criterion(exit_criterion)
self._extract_estimator()
random_state = self.random_state
old_level = logger.getEffectiveLevel()
logger.setLevel(self.loglevel)
if not synchronous:
optuna.logging.set_verbosity(optuna.logging.ERROR)
logger.setLevel(logging.ERROR)
self._set_sample_indices(X, random_state)
X_res = _safe_indexing(X, self._sample_indices)
y_res = _safe_indexing(y, self._sample_indices)
groups_res = _safe_indexing(None, self._sample_indices)
fit_params_res = self._get_fit_params_res(X)
classifier = is_classifier(self.model)
cv = check_cv(self.cv, y_res, classifier=classifier)
self._n_splits = cv.get_n_splits(X_res, y_res, groups=groups_res)
# scoring
self._scorer = check_scoring(self.estimator, scoring=self.scoring)
self._study = optuna.study.create_study(
study_name=self.study_name,
direction="maximize",
pruner=self.median_pruner,
sampler=self.sampler,
storage=self.storage,
load_if_exists=self.load_if_exists,
)
objective = _Objective(
self.model,
self._param_distributions,
cv,
self._enable_pruning,
self._error_score,
fit_params_res,
groups_res,
self._max_iter,
self._return_train_score,
self._scorer,
self.scoring_name,
self._step_name,
)
if synchronous:
logger.info(
"Optimizing hyperparameters using {} "
"samples...".format(_num_samples(self._sample_indices))
)
self._tune_process = multiprocessing.Process(
target=ADSTuner.optimizer,
args=(
self.study_name,
self.median_pruner,
self.sampler,
self.storage,
self.load_if_exists,
DataScienceObjective(objective, X_res, y_res),
self._global_start,
self._global_stop,
),
kwargs=dict(
n_jobs=self._n_jobs,
n_trials=self._n_trials,
timeout=self._time_budget,
show_progress_bar=False,
callbacks=self.exit_criterion,
gc_after_trial=False,
),
)
self._tune_process.start()
self._status = State.RUNNING
if synchronous:
self._tune_process.join()
logger.info("Finished hyperparemeter search!")
self._status = State.COMPLETED
logger.setLevel(old_level)
@staticmethod
@runtime_dependency(module="optuna", install_from=OptionalDependency.OPTUNA)
def optimizer(
study_name,
pruner,
sampler,
storage,
load_if_exists,
objective_func,
global_start,
global_stop,
**kwargs,
):
"""
Static method for running the ADSTuner tuning process.
Parameters
----------
study_name: str
The name of the study.
pruner
The pruning method for pruning trials.
sampler
The sampling method used for tuning.
storage: str
Storage endpoint.
load_if_exists: bool
Load existing study if it exists.
objective_func
The objective function to be maximized.
global_start: :class:`multiprocesing.Value`
The global start time.
global_stop: :class:`multiprocessing.Value`
The global stop time.
kwargs: dict
Keyword/value pairs passed into the optimize process
Raises
------
:class:`Exception`
Raised for any exceptions thrown by the underlying optimization process
Returns
-------
None
Nothing
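Example (a minimal sketch of calling the optimizer directly with a toy objective; the study name and storage path below are hypothetical, and ADSTuner normally invokes this in a subprocess from tune())::
import multiprocessing
import optuna
def toy_objective(trial):
    x = trial.suggest_float('x', -10, 10)
    return -(x - 2) ** 2
global_start = multiprocessing.Value('d', 0.0)
global_stop = multiprocessing.Value('d', 0.0)
ADSTuner.optimizer(
'example_study',
optuna.pruners.MedianPruner(),
optuna.samplers.TPESampler(),
'sqlite:////tmp/example_study.db',
True,
toy_objective,
global_start,
global_stop,
n_trials=5,
)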
"""
import traceback
study = optuna.study.create_study(
study_name=study_name,
direction="maximize",
pruner=pruner,
sampler=sampler,
storage=storage,
load_if_exists=load_if_exists,
)
try:
global_start.value = time()
study.optimize(objective_func, **kwargs)
global_stop.value = time()
except Exception as e:
traceback.print_exc()
raise e
@staticmethod
def _is_estimator(step):
return hasattr(step, "fit") and (
not hasattr(step, "transform")
or hasattr(step, "predict")
or hasattr(step, "fit_predict")
)
@staticmethod
@runtime_dependency(module="optuna", install_from=OptionalDependency.OPTUNA)
def _pruner(class_name, **kwargs):
if class_name == "median_pruner":
return optuna.pruners.MedianPruner(**kwargs)
else:
raise NotImplementedError("{} is not supported.".format(class_name))
def trials_export(
self, file_uri, metadata=None, script_dict={"model": None, "scoring": None}
):
"""Export the meta data as well as files needed to reconstruct the ADSTuner object to the object storage.
Data is not stored. To resume the same ADSTuner object from object storage and continue tuning from previous trials,
you have to provide the dataset.
Parameters
----------
file_uri: str
Object storage path, 'oci://bucketname@namespace/filepath/on/objectstorage'. For example,
`oci://test_bucket@ociodsccust/tuner/test.zip`
metadata: str, optional
User defined metadata
script_dict: dict, optional
Script paths for model and scoring. This is only recommended for unsupported
models and user-defined scoring functions. You can store the model and scoring
function in a dictionary with keys `model` and `scoring` and the respective
paths as values. The model and scoring scripts must import necessary libraries
for the script to run. The ``model`` and ``scoring`` variables must be set to
your model and scoring function.
Returns
-------
None
Nothing
Example::
# Print out a list of supported models
from ads.hpo.ads_search_space import model_list
print(model_list)
# Example scoring dictionary
{'model':'/home/datascience/advanced-ds/notebooks/scratch/ADSTunerV2/mymodel.py',
'scoring':'/home/datascience/advanced-ds/notebooks/scratch/ADSTunerV2/customized_scoring.py'}
Example::
from ads.hpo.stopping_criterion import *
from ads.hpo.search_cv import ADSTuner
from sklearn.datasets import load_iris
from sklearn.linear_model import SGDClassifier
tuner = ADSTuner(
SGDClassifier(),
strategy='detailed',
scoring='f1_weighted',
random_state=42
)
tuner.search_space({'max_iter': 100})
X, y = load_iris(return_X_y=True)
tuner.tune(X=X, y=y, exit_criterion=[TimeBudget(1)], synchronous=True)
tuner.trials_export('oci://<bucket_name>@<namespace>/tuner/test.zip')
"""
# oci://bucketname@namespace/filename
from ads.hpo.tuner_artifact import UploadTunerArtifact
assert self._is_tuning_finished()
assert script_dict.keys() <= set(
["model", "scoring"]
), "script_dict keys can only be model and scoring."
UploadTunerArtifact(self, file_uri, metadata).upload(script_dict)
@classmethod
def trials_import(cls, file_uri, delete_zip_file=True, target_file_path=None):
"""Import the database file from the object storage
Parameters
----------
file_uri: str
'oci://bucketname@namespace/filepath/on/objectstorage'
Example: 'oci://<bucket_name>@<namespace>/tuner/test.zip'
delete_zip_file: bool, defaults to True, optional
Whether to delete the zip file afterwards.
target_file_path: str, optional
The path where the zip file will be saved. For example, '/home/datascience/myfile.zip'.
Returns
-------
:class:`ADSTuner`
ADSTuner object
Examples
--------
>>> from ads.hpo.stopping_criterion import *
>>> from ads.hpo.search_cv import ADSTuner
>>> from sklearn.datasets import load_iris
>>> from sklearn.linear_model import SGDClassifier
>>> X, y = load_iris(return_X_y=True)
>>> tuner = ADSTuner.trials_import('oci://<bucket_name>@<namespace>/tuner/test.zip')
>>> tuner.tune(X=X, y=y, exit_criterion=[TimeBudget(1)], synchronous=True)
"""
from ads.hpo.tuner_artifact import DownloadTunerArtifact
tuner_args, cls.metadata = DownloadTunerArtifact(
file_uri, target_file_path=target_file_path
).extract_tuner_args(delete_zip_file=delete_zip_file)
return cls(**tuner_args)
@runtime_dependency(module="IPython", install_from=OptionalDependency.NOTEBOOK)
def _plot(
self, # type: ADSTuner
plot_module, # type: str
plot_func, # type: str
time_interval=0.5, # type: float
fig_size=(800, 500), # type: tuple
**kwargs,
):
if fig_size:
logger.warning(
"The param fig_size will be depreciated in future releases.",
)
spec = importlib.util.spec_from_file_location(
"plot",
os.path.join(
os.path.dirname(os.path.abspath(__file__)),
"visualization",
plot_module + ".py",
),
)
plot = importlib.util.module_from_spec(spec)
spec.loader.exec_module(plot)
_imports.check()
assert self._study is not None, "Need to call <code>.tune()</code> first."
ntrials = 0
if plot_func == "_plot_param_importances":
print("Waiting for more trials before evaluating the param importance.")
while self.status == State.RUNNING:
from IPython.display import clear_output
sleep(time_interval)
if len(self.trials[~self.trials["value"].isnull()]) > ntrials:
if plot_func == "_plot_param_importances":
if len(self.trials[~self.trials["value"].isnull()]) >= 4:
clear_output(wait=True)
getattr(plot, plot_func)(
study=self._study, fig_size=fig_size, **kwargs
)
clear_output(wait=True)
else:
getattr(plot, plot_func)(
study=self._study, fig_size=fig_size, **kwargs
)
clear_output(wait=True)
if len(self.trials) == 0:
plt.figure()
plt.title("Intermediate Values Plot")
plt.xlabel("Step")
plt.ylabel("Intermediate Value")
plt.show(block=False)
ntrials = len(self.trials[~self.trials["value"].isnull()])
getattr(plot, plot_func)(study=self._study, fig_size=fig_size, **kwargs)
def plot_best_scores(
self,
best=True, # type: bool
inferior=True, # type: bool
time_interval=1, # type: float
fig_size=(800, 500), # type: tuple
):
"""Plot optimization history of all trials in a study.
Parameters
----------
best: bool
Controls whether to plot the lines for the best scores so far.
inferior: bool
Controls whether to plot the dots for the actual objective scores.
time_interval: float
How often (in seconds) the plot refreshes to check for new trial results.
fig_size: tuple
Width and height of the figure.
Returns
-------
None
Nothing.
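Example (a minimal sketch mirroring the class-level example)::
from ads.hpo.stopping_criterion import *
from ads.hpo.search_cv import ADSTuner
from sklearn.datasets import load_iris
from sklearn.linear_model import SGDClassifier
tuner = ADSTuner(SGDClassifier(), strategy='detailed', scoring='f1_weighted', random_state=42)
X, y = load_iris(return_X_y=True)
tuner.tune(X=X, y=y, exit_criterion=[NTrials(5)], synchronous=True)
tuner.plot_best_scores()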
"""
self._plot(
"_optimization_history",
"_get_optimization_history_plot",
time_interval=time_interval,
fig_size=fig_size,
best=best,
inferior=inferior,
)
@runtime_dependency(module="optuna", install_from=OptionalDependency.OPTUNA)
def plot_param_importance(
self,
importance_evaluator="Fanova", # type: str
time_interval=1, # type: float
fig_size=(800, 500), # type: tuple
):
"""Plot hyperparameter importances.
Parameters
----------
importance_evaluator: str
Importance evaluator. Valid values: "Fanova", "MeanDecreaseImpurity". Defaults
to "Fanova".
time_interval: float
How often (in seconds) the plot refreshes to check for new trial results.
fig_size: tuple
Width and height of the figure.
Raises
------
:class:`NotImplementedError`
Raised for unsupported importance evaluators
Returns
-------
None
Nothing.
"""
assert importance_evaluator in [
"MeanDecreaseImpurity",
"Fanova",
], "Only support <code>MeanDecreaseImpurity</code> and <code>Fanova</code>."
if importance_evaluator == "Fanova":
evaluator = None
elif importance_evaluator == "MeanDecreaseImpurity":
evaluator = optuna.importance.MeanDecreaseImpurityImportanceEvaluator()
else:
raise NotImplementedError(
f"{importance_evaluator} is not supported. It can be either `Fanova` or `MeanDecreaseImpurity`."
)
try:
self._plot(
plot_module="_param_importances",
plot_func="_plot_param_importances",
time_interval=time_interval,
fig_size=fig_size,
evaluator=evaluator,
)
except Exception:
logger.error(
msg="""Cannot calculate the hyperparameter importance. Increase the number of trials or time budget. """
)
def plot_edf_scores(
self,
time_interval=1, # type: float
fig_size=(800, 500), # type: tuple
):
"""
Plot the EDF (empirical distribution function) of the scores.
Only completed trials are used.
Parameters
----------
time_interval: float
Time interval for the plot. Defaults to 1.
fig_size: tuple[int, int]
Figure size. Defaults to (800, 500).
Returns
-------
None
Nothing.
"""
self._plot(
"_edf", "_get_edf_plot", time_interval=time_interval, fig_size=fig_size
)
def plot_contour_scores(
self,
params=None, # type: Optional[List[str]]
time_interval=1, # type: float
fig_size=(800, 500), # type: tuple
):
"""
Contour plot of the scores.
Parameters
----------
params: Optional[List[str]]
Parameter list to visualize. Defaults to all.
time_interval: float
Time interval for the plot. Defaults to 1.
fig_size: tuple[int, int]
Figure size. Defaults to (800, 500).
Returns
-------
None
Nothing.
"""
validate_params_for_plot(params, self._param_distributions)
try:
self._plot(
"_contour",
"_get_contour_plot",
time_interval=time_interval,
fig_size=fig_size,
params=params,
)
except ValueError:
logger.warning(
msg="Cannot plot contour score."
" Increase the number of trials or time budget."
)
def plot_parallel_coordinate_scores(
self,
params=None, # type: Optional[List[str]]
time_interval=1, # type: float
fig_size=(800, 500), # type: tuple
):
"""
Plot the high-dimensional parameter relationships in a study.
Note that if a parameter contains missing values, trials with missing values are not plotted.
Parameters
----------
params: Optional[List[str]]
Parameter list to visualize. Defaults to all.
time_interval: float
Time interval for the plot. Defaults to 1.
fig_size: tuple[int, int]
Figure size. Defaults to (800, 500).
Returns
-------
None
Nothing.
"""
validate_params_for_plot(params, self._param_distributions)
self._plot(
"_parallel_coordinate",
"_get_parallel_coordinate_plot",
time_interval=time_interval,
fig_size=fig_size,
params=params,
)