Source code for ads.evaluations.statistical_metrics

#!/usr/bin/env python
# -*- coding: utf-8; -*-

# Copyright (c) 2020, 2022 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/


from __future__ import print_function, absolute_import, division

import numpy as np
import pandas as pd
from sklearn import metrics
from sklearn.preprocessing import LabelEncoder
from ads.common import logger
from ads.common.decorator.runtime_dependency import (
    runtime_dependency,
    OptionalDependency,
)

__all__ = ["ModelEvaluator"]

DEFAULT_BIN_CLASS_METRICS = [
    "accuracy",
    "hamming_loss",
    "precision",
    "recall",
    "f1",
    "auc",
]
DEFAULT_MULTI_CLASS_METRICS = [
    "accuracy",
    "hamming_loss",
    "precision_weighted",
    "precision_micro",
    "recall_weighted",
    "recall_micro",
    "f1_weighted",
    "f1_micro",
]
DEFAULT_REG_METRICS = ["r2_score", "mse", "mae"]
DEFAULT_BIN_CLASS_LABELS_MAP = {
    "accuracy": "Accuracy",
    "hamming_loss": "Hamming distance",
    "kappa_score": "Cohen's kappa coefficient",
    "precision": "Precision",
    "recall": "Recall",
    "f1": "F1",
    "auc": "ROC AUC",
}
DEFAULT_MULTI_CLASS_LABELS_MAP = {
    "accuracy": "Accuracy",
    "hamming_loss": "Hamming distance",
    "precision_weighted": "Precision Weighted Average",
    "precision_micro": "Precision Micro Average",
    "recall_weighted": "Recall Weighted Average",
    "recall_micro": "Recall Micro Average",
    "f1_weighted": "F1 Weighted Average",
    "f1_micro": "F1 Micro Average",
}
DEFAULT_REG_LABELS_MAP = {
    "r2_score": "r-Squared Score",
    "root_mean_squared_error": "Root Mean Squared Error",
    "median_absolute_error": "Median Absolute Error",
}


class ModelEvaluator:
    """
    ModelEvaluator takes in the true and predicted values and returns a pandas dataframe

    Attributes
    ----------
    y_true : array-like
        object holding the true values for the model
    y_pred : array-like
        object holding the predicted values for the model
    model_name (str):
        the name of the model
    classes (list):
        list of target classes
    positive_class (str):
        label for positive outcome from model
    y_score : array-like
        object holding the scores for true values for the model
    metrics (dict):
        dictionary object holding model data

    Methods
    -------
    get_metrics()
        Gets the metrics information in a dataframe based on the number of classes
    safe_metrics_call(scoring_functions, *args)
        Applies sklearn scoring functions to parameters in args
    """

    def __init__(
        self,
        y_true,
        y_pred,
        model_name,
        classes=None,
        positive_class=None,
        y_score=None,
    ):
        self.y_true = y_true
        # Work around y_pred arriving from AutoML with shape (1, n) instead of (n,)
        self.y_pred = np.squeeze(y_pred)
        self.y_score = y_score
        self.model_name = model_name
        self.classes = classes
        self.positive_class = positive_class
        self.metrics = {
            "model_name": model_name,
            "classes": classes,
            "positive_class": positive_class,
            "y_true": y_true,
            "y_pred": y_pred,
            "y_score": y_score,
        }
    def get_metrics(self):
        """
        Gets the metrics information in a dataframe based on the number of classes

        Parameters
        ----------
        self: (`ModelEvaluator` instance)
            The `ModelEvaluator` instance with the metrics.

        Returns
        -------
        :class:`pandas.DataFrame`
            Pandas dataframe containing the metrics
        """
        if self.classes is None:  # regression
            self._get_regression_metrics()
        elif len(self.classes) == 2:  # binary
            self.positive_class = (
                self.positive_class if self.positive_class else self.classes[0]
            )
            self._get_binary_metrics()
        else:  # multiclass
            self._get_multiclass_metrics()
        return pd.DataFrame.from_dict(
            self.metrics, orient="index", columns=[self.model_name]
        )
    def _get_general_metrics(self):
        try:
            args = [self.y_true, self.y_pred]
            kwargs = {"labels": self.classes}
            scoring_functions = {
                "classification_report": ("classification_report", len(args)),
                "kappa_score": ("cohen_kappa_score", len(args)),
                "raw_confusion_matrix": ("confusion_matrix", len(args)),
                "hinge_loss": ("hinge_loss", len(args)),
            }
            self.safe_metrics_call(scoring_functions, *args, **kwargs)

            args = [self.y_true, self.y_pred]
            scoring_functions_without_labs = {
                "accuracy": ("accuracy_score", len(args)),
                "zero_one_loss": ("zero_one_loss", len(args)),
                "hamming_loss": ("hamming_loss", len(args)),
            }
            self.safe_metrics_call(scoring_functions_without_labs, *args)

            # Row-normalize the confusion matrix, leaving all-zero rows as zeros
            cm = self.metrics["raw_confusion_matrix"]
            a = cm.astype("float")
            b = cm.sum(axis=1)[:, np.newaxis]
            normalized_cm = np.divide(a, b, out=np.zeros_like(a), where=b != 0).tolist()
            self.metrics["confusion_matrix"] = normalized_cm

            # Ratio of the rarest class count to the most common class count
            vc = pd.DataFrame(self.y_true).value_counts()
            self.metrics["class_balance"] = min(vc) / max(vc)
        except:
            raise ValueError(
                f"Errors arose when attempting to compute metrics. "
                f"Metrics broke in the state: {self.metrics}"
            )

    def _get_binary_metrics(self):
        self._get_general_metrics()
        # add a metric to determine imbalance
        self.metrics["balanced accuracy"] = metrics.balanced_accuracy_score(
            self.y_true, self.y_pred
        )
        self.metrics["precision"] = metrics.precision_score(
            self.y_true, self.y_pred, pos_label=self.positive_class
        )
        self.metrics["recall"] = metrics.recall_score(
            self.y_true, self.y_pred, pos_label=self.positive_class
        )
        self.metrics["f1"] = metrics.f1_score(
            self.y_true, self.y_pred, pos_label=self.positive_class, average="binary"
        )
        if self.y_score is not None:
            # Clip scores into [0, 1] if any fall outside that range
            if not all(0 <= x <= 1 for x in self.y_score):
                self.y_score = np.asarray(
                    [0 if x < 0 else 1 if x > 1 else x for x in self.y_score]
                )
            if len(np.asarray(self.y_score).shape) > 1:
                # If the SKLearn classifier doesn't correctly identify the problem as
                # binary classification, y_score may be of shape (n_rows, 2)
                # instead of (n_rows,)
                pos_class_idx = self.classes.index(self.positive_class)
                positive_class_scores = self.y_score[:, pos_class_idx]
            else:
                positive_class_scores = self.y_score
            (
                self.metrics["false_positive_rate"],
                self.metrics["true_positive_rate"],
                _,
            ) = metrics.roc_curve(
                y_true=self.y_true,
                y_score=positive_class_scores,
                pos_label=self.positive_class,
            )
            self.metrics["auc"] = metrics.auc(
                self.metrics["false_positive_rate"],
                self.metrics["true_positive_rate"],
            )
            self.y_score = list(self.y_score)

            # Youden's J statistic (TPR - FPR) picks the best ROC operating point
            self.metrics["youden_j"] = (
                self.metrics["true_positive_rate"]
                - self.metrics["false_positive_rate"]
            )
            best_idx = np.argmax(self.metrics["youden_j"])
            self.metrics["roc_best_model_score"] = (
                self.metrics["false_positive_rate"][best_idx],
                self.metrics["true_positive_rate"][best_idx],
            )

            (
                self.metrics["precision_values"],
                self.metrics["recall_values"],
                _,
            ) = metrics.precision_recall_curve(
                self.y_true, self.y_score, pos_label=self.positive_class
            )
            pr_best_idx = np.argmax(
                self.metrics["precision_values"] + self.metrics["recall_values"]
            )
            self.metrics["pr_best_idx"] = pr_best_idx
            self.metrics["pr_best_model_score"] = (
                self.metrics["recall_values"][pr_best_idx],
                self.metrics["precision_values"][pr_best_idx],
            )
            self.metrics["average_precision_score"] = metrics.average_precision_score(
                self.y_true, self.y_score, pos_label=self.positive_class
            )
            self.metrics["brier score"] = metrics.brier_score_loss(
                self.y_true, self.y_score
            )
            self._get_lift_and_gain()
            # Compute KS Statistic curves
            self._binary_ks_curve()

    def _get_lift_and_gain(self):
        # make y_true a boolean vector of positive-class membership
        y_true, y_score = np.asarray(self.y_true), np.asarray(self.y_score)
        y_true = y_true == self.positive_class

        # Sort by descending score and accumulate true positives
        sorted_indices = np.argsort(y_score)[::-1]
        y_true = y_true[sorted_indices]
        gains = np.cumsum(y_true)
        percentages = np.arange(start=1, stop=len(y_true) + 1)

        tp = sum(y_true)
        perfect = np.append(
            np.arange(start=1, stop=tp + 1) / float(tp), np.ones(len(y_true) - tp)
        )
        gains = gains / float(np.sum(y_true))
        percentages = percentages / float(len(y_true))
        self.metrics["cumulative_gain"] = np.insert(gains, 0, [0]) * 100
        self.metrics["percentages"] = np.insert(percentages, 0, [0]) * 100
        self.metrics["perfect_gain"] = np.insert(perfect, 0, [0]) * 100

        percentages = percentages[1:]
        self.metrics["lift"] = gains[1:] / percentages
        self.metrics["perfect_lift"] = perfect[1:] / percentages

    def _binary_ks_curve(self):
        """This function generates the points necessary to calculate the KS
        Statistic curve.

        Args:
            y_true (array-like, shape (n_samples)): True labels of the data.
            y_probas (array-like, shape (n_samples)): Probability predictions of
                the positive class.

        Returns:
            thresholds (numpy.ndarray): An array containing the X-axis values for
                plotting the KS Statistic plot.
            pct1 (numpy.ndarray): An array containing the Y-axis values for one
                curve of the KS Statistic plot.
            pct2 (numpy.ndarray): An array containing the Y-axis values for one
                curve of the KS Statistic plot.
            ks_statistic (float): The KS Statistic, or the maximum vertical
                distance between the two curves.
            max_distance_at (float): The X-axis value at which the maximum vertical
                distance between the two curves is seen.
            classes (np.ndarray, shape (2)): An array containing the labels of the
                two classes making up `y_true`.

        Raises:
            ValueError: If `y_true` is not composed of 2 classes. The KS Statistic
                is only relevant in binary classification.
        """
        y_true, y_probas = np.asarray(self.y_true), np.asarray(self.y_score)
        lb = LabelEncoder()
        encoded_labels = lb.fit_transform(y_true)
        if len(lb.classes_) != 2:
            raise ValueError(
                "Cannot calculate KS statistic for data with "
                "{} category/ies".format(len(lb.classes_))
            )
        idx = encoded_labels == 0

        # Split the scores by class and sort each group ascending
        data1 = np.sort(y_probas[idx])
        data2 = np.sort(y_probas[np.logical_not(idx)])

        # Walk both sorted score lists, recording the cumulative count of each
        # class at every distinct threshold
        ctr1, ctr2 = 0, 0
        thresholds, pct1, pct2 = [], [], []
        while ctr1 < len(data1) or ctr2 < len(data2):
            # Check if data1 has no more elements
            if ctr1 >= len(data1):
                current = data2[ctr2]
                while ctr2 < len(data2) and current == data2[ctr2]:
                    ctr2 += 1
            # Check if data2 has no more elements
            elif ctr2 >= len(data2):
                current = data1[ctr1]
                while ctr1 < len(data1) and current == data1[ctr1]:
                    ctr1 += 1
            else:
                if data1[ctr1] > data2[ctr2]:
                    current = data2[ctr2]
                    while ctr2 < len(data2) and current == data2[ctr2]:
                        ctr2 += 1
                elif data1[ctr1] < data2[ctr2]:
                    current = data1[ctr1]
                    while ctr1 < len(data1) and current == data1[ctr1]:
                        ctr1 += 1
                else:
                    current = data2[ctr2]
                    while ctr2 < len(data2) and current == data2[ctr2]:
                        ctr2 += 1
                    while ctr1 < len(data1) and current == data1[ctr1]:
                        ctr1 += 1
            thresholds.append(current)
            pct1.append(ctr1)
            pct2.append(ctr2)

        thresholds = np.asarray(thresholds)
        pct1 = np.asarray(pct1) / float(len(data1))
        pct2 = np.asarray(pct2) / float(len(data2))

        # Pad the curves so they span the full [0, 1] threshold range
        if thresholds[0] != 0:
            thresholds = np.insert(thresholds, 0, [0.0])
            pct1 = np.insert(pct1, 0, [0.0])
            pct2 = np.insert(pct2, 0, [0.0])
        if thresholds[-1] != 1:
            thresholds = np.append(thresholds, [1.0])
            pct1 = np.append(pct1, [1.0])
            pct2 = np.append(pct2, [1.0])

        differences = pct1 - pct2
        self.metrics["ks_statistic"], self.metrics["max_distance_at"] = (
            np.max(differences),
            thresholds[np.argmax(differences)],
        )
        (
            self.metrics["ks_thresholds"],
            self.metrics["ks_pct1"],
            self.metrics["ks_pct2"],
            self.metrics["ks_labels"],
        ) = (thresholds, pct1, pct2, lb.classes_)

    def _get_multiclass_metrics(self):
        self._get_general_metrics()
        y_true, y_pred, y_score = (
            np.asarray(self.y_true),
            np.asarray(self.y_pred),
            np.asarray(self.y_score),
        )
        if not all([x in self.classes for x in np.unique(y_true)]):
            logger.warning(
                f"There are classes in the test dataset that are not specified in the `classes` "
                f"attribute of model ({self.model_name}). This may lead to erroneous results."
            )
        self.metrics["precision_weighted"] = metrics.precision_score(
            y_true, y_pred, labels=self.classes, average="weighted"
        )
        self.metrics["precision_micro"] = metrics.precision_score(
            y_true, y_pred, labels=self.classes, average="micro"
        )
        self.metrics["precision_by_label"] = metrics.precision_score(
            y_true, y_pred, labels=self.classes, average=None
        ).tolist()
        self.metrics["recall_weighted"] = metrics.recall_score(
            y_true, y_pred, labels=self.classes, average="weighted"
        )
        self.metrics["recall_micro"] = metrics.recall_score(
            y_true, y_pred, labels=self.classes, average="micro"
        )
        self.metrics["recall_by_label"] = metrics.recall_score(
            y_true, y_pred, labels=self.classes, average=None
        ).tolist()
        self.metrics["f1_weighted"] = metrics.f1_score(
            y_true, y_pred, labels=self.classes, average="weighted"
        )
        self.metrics["f1_micro"] = metrics.f1_score(
            y_true, y_pred, labels=self.classes, average="micro"
        )
        self.metrics["f1_by_label"] = metrics.f1_score(
            y_true, y_pred, labels=self.classes, average=None
        ).tolist()
        self.metrics["jaccard_weighted"] = metrics.jaccard_score(
            y_true, y_pred, labels=self.classes, average="weighted"
        )
        self.metrics["jaccard_micro"] = metrics.jaccard_score(
            y_true, y_pred, labels=self.classes, average="micro"
        )
        self.metrics["jaccard_by_label"] = metrics.jaccard_score(
            y_true, y_pred, labels=self.classes, average=None
        ).tolist()

        if self.y_score is not None:
            # Multiclass ROC: per-class one-vs-rest curve, AUC, and best operating point
            (
                self.metrics["fpr_by_label"],
                self.metrics["tpr_by_label"],
                self.metrics["auc"],
                self.metrics["roc_best_model_score"],
            ) = (dict(), dict(), dict(), dict())
            for i, label in enumerate(self.classes):
                (
                    self.metrics["fpr_by_label"][i],
                    self.metrics["tpr_by_label"][i],
                    _,
                ) = metrics.roc_curve(y_true, y_score[:, i], pos_label=self.classes[i])
                self.metrics["auc"][i] = metrics.auc(
                    self.metrics["fpr_by_label"][i], self.metrics["tpr_by_label"][i]
                )
                youden_j = (
                    self.metrics["tpr_by_label"][i] - self.metrics["fpr_by_label"][i]
                )
                best_idx = np.argmax(youden_j)
                self.metrics["roc_best_model_score"][i] = (
                    self.metrics["fpr_by_label"][i][best_idx],
                    self.metrics["tpr_by_label"][i][best_idx],
                )

            # Multiclass PR: per-class one-vs-rest precision-recall curve and best point
            (
                self.metrics["recall_values"],
                self.metrics["precision_values"],
                self.metrics["pr_best_model_score"],
            ) = (dict(), dict(), dict())
            for i, label in enumerate(self.classes):
                (
                    self.metrics["precision_values"][i],
                    self.metrics["recall_values"][i],
                    _,
                ) = metrics.precision_recall_curve(
                    y_true, y_score[:, i], pos_label=self.classes[i]
                )
                pr_best_idx = np.argmax(
                    self.metrics["precision_values"][i]
                    + self.metrics["recall_values"][i]
                )
                self.metrics["pr_best_model_score"][i] = (
                    self.metrics["recall_values"][i][pr_best_idx],
                    self.metrics["precision_values"][i][pr_best_idx],
                )

    @runtime_dependency(module="scipy", install_from=OptionalDependency.VIZ)
    def _get_regression_metrics(self):
        self.y_true = np.array(self.y_true)
        self.y_pred = np.array(np.squeeze(self.y_pred))
        args = [self.y_true, self.y_pred]
        scoring_functions = {
            "r2_score": ("r2_score", len(args)),
            "explained_variance_score": ("explained_variance_score", len(args)),
            "max_error": ("max_error", len(args)),
            "mae": ("mean_absolute_error", len(args)),
            "mse": ("mean_squared_error", len(args)),
            "median_absolute_error": ("median_absolute_error", len(args)),
        }
        self.safe_metrics_call(scoring_functions, *args)
        self.metrics["root_mean_squared_error"] = np.sqrt(self.metrics["mse"])
        self.metrics["residuals"] = self.y_true - self.y_pred
        self.metrics["mean_residuals"] = np.mean(self.metrics["residuals"])

        # For QQ Plot: compare residual quantiles against normal quantiles
        portions = min(len(self.metrics["residuals"]), 100) + 1
        norm_quantiles = (np.arange(portions) / portions)[1:]
        self.metrics["norm_quantiles"] = scipy.stats.norm.ppf(norm_quantiles)
        resid_quant = [
            np.quantile(self.metrics["residuals"], p) for p in norm_quantiles
        ]
        self.metrics["residual_quantiles"] = scipy.stats.zscore(resid_quant)
    def safe_metrics_call(self, scoring_functions, *args, **kwargs):
        """Applies the sklearn function in `scoring_functions` to parameters in
        `args`.

        Parameters
        ----------
        scoring_functions: (dict)
            Scoring functions dictionary
        args: (positional arguments)
            Arguments passed to the sklearn function from metrics

        Returns:
            Nothing

        Raises:
            Exception: If an error is encountered applying the sklearn function
                `fn` to arguments.
        """
        for name, (fn, n_params) in scoring_functions.items():
            try:
                if fn == "confusion_matrix":
                    # confusion_matrix needs the class labels passed explicitly
                    self.metrics[name] = getattr(metrics, fn)(
                        **{
                            "y_true": args[0],
                            "y_pred": args[1],
                            "labels": kwargs["labels"],
                        }
                    )
                else:
                    self.metrics[name] = getattr(metrics, fn)(
                        *(args[:n_params]), **kwargs
                    )
            except Exception as e:
                self.metrics[name] = f"Error unable to compute {fn}, due to: {e}"
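
A minimal usage sketch (not part of the module source): the labels, predictions, scores, and model name below are hypothetical; only the `ModelEvaluator` constructor and `get_metrics()` call are taken from the class above.

# Evaluate a binary classifier: with two entries in `classes`, get_metrics()
# follows the binary path and includes accuracy, precision, recall, f1, auc, etc.
from ads.evaluations.statistical_metrics import ModelEvaluator

y_true = [0, 1, 1, 0, 1]                 # hypothetical ground-truth labels
y_pred = [0, 1, 0, 0, 1]                 # hypothetical model predictions
y_score = [0.2, 0.9, 0.4, 0.1, 0.8]      # hypothetical positive-class scores

evaluator = ModelEvaluator(
    y_true,
    y_pred,
    model_name="example_model",          # hypothetical model name
    classes=[0, 1],
    positive_class=1,
    y_score=y_score,
)
metrics_df = evaluator.get_metrics()     # pandas DataFrame indexed by metric name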