# Source code for stats_misc.machine_learning.validation

"""
A collection of metrics used to evaluate a prediction model, covering
discrimination and calibration statistics.

Currently the module is focussed on binary/classification tasks.
"""

import warnings
import pandas as pd
import numpy as np
import statsmodels.api as sm
from stats_misc.utils.general import (
    invlogit,
    # logit,
)
from stats_misc.intervals import (
    beta_confidence_interval,
)
from stats_misc.constants import (
    is_type,
    is_df,
    are_columns_in_df,
    NamesValidation as NamesVal,
    NamesIntervals as NamesI,
    same_len,
    Error_MSG,
)
from sklearn.metrics import (
    roc_curve,
)
from scipy.stats import chi2, norm as normal
from typing import Any, List, Type, Union, Tuple, Optional, Dict

# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
class CstatisticResults(object):
    '''
    A results object containing c-statistic estimates.

    Attributes
    ----------
    c_statistic: float
        The c-statistic.
    lower_bound: float
        The lower bound of the confidence interval.
    upper_bound: float
        The upper bound of the confidence interval.
    confidence_interval
        The lower and upper bounds of the confidence interval.
    coverage : float
        The interval coverage.
    standard_error : float
        The standard error of the c-statistic, based on an asymptotic
        approximation to the normal distribution.
    '''

    # The only keyword arguments `__init__` accepts; subclasses override
    # this list to re-use the same initialisation logic.
    SET_ARGS = [
        NamesVal.CSTAT,
        NamesVal.CSTAT_LB,
        NamesVal.CSTAT_UB,
        NamesVal.CSTAT_INTERVAL,
        NamesVal.CSTAT_COVERAGE,
        NamesVal.CSTAT_SE,
    ]

    # Initiation the class
    def __init__(self, **kwargs):
        """
        Initialise the results object.

        Parameters
        ----------
        **kwargs
            Keyword arguments restricted to the names listed in
            ``SET_ARGS``. An unrecognised name raises an
            `AttributeError`; a missing name is set to `None` with a
            `UserWarning`.
        """
        for k in kwargs.keys():
            if k not in self.__class__.SET_ARGS:
                raise AttributeError("unrecognised argument '{0}'".format(k))
        # Loops over `SET_ARGS`, assigns the kwargs content to name `s`.
        # if argument is missing in kwargs, print a warning.
        for s in self.__class__.SET_ARGS:
            try:
                setattr(self, s, kwargs[s])
            except KeyError:
                warnings.warn("argument '{0}' is set to 'None'".format(s))
                setattr(self, s, None)

    # /////////////////////////////////////////////////////////////////////////
    def __str__(self):
        # NOTE fixed copy-paste: previously described itself as an
        # `Interval` results class.
        return f"A `CstatisticResults` results class."
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
class CalibrationResults(CstatisticResults):
    '''
    A results object containing calibration estimates.

    Attributes
    ----------
    calibration_slope: float
        The calibration slope.
    calibration_slope_se: float
        The standard error of the calibration slope.
    calibration_intercept: float
        Calibration-in-the-large estimate.
    calibration_intercept_se: float
        The standard error of the calibration-in-the-large.
    observed_predict_table: pd.DataFrame
        an optional table with observed and predicted risks.
    '''

    # Overrides the parent list; `__init__` is inherited unchanged.
    SET_ARGS = [
        NamesVal.CAL_SLOPE,
        NamesVal.CAL_SLOPE_SE,
        NamesVal.CAL_INTERCEPT,
        NamesVal.CAL_INTERCEPT_SE,
        NamesVal.CAL_TABLE,
    ]

    # /////////////////////////////////////////////////////////////////////////
    def __str__(self):
        # NOTE fixed grammar: "An Calibration" -> "A Calibration".
        return f"A `Calibration` results class."
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
class RecalibrationResults(CstatisticResults):
    '''
    A results object containing recalibration estimates.

    Attributes
    ----------
    slope: float
        The slope estimate
    intercept: float
        The intercept estimate
    table_recalibrated: pd.DataFrame
        A table with the recalibrated estimates appended.
    '''

    # Overrides the parent list; `__init__` is inherited unchanged.
    SET_ARGS = [
        NamesVal.RECAL_SLOPE,
        NamesVal.RECAL_INTERCEPT,
        NamesVal.RECAL_TABLE,
    ]

    # /////////////////////////////////////////////////////////////////////////
    def __str__(self):
        # NOTE fixed copy-paste: previously described itself as a
        # `Calibration` results class.
        return f"A `Recalibration` results class."
# %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% # discrimination functions # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def cstatistic(observed: Union[pd.Series, np.ndarray],
               predicted: Union[pd.Series, np.ndarray],
               alpha: Union[None, float] = None,
               ) -> CstatisticResults:
    '''
    Calculates Harell's "c-statistic" for models predicting a binary
    outcome

    Parameters
    ----------
    observed : pd.Series or np.array
        a column vector with the observed class.
    predicted : pd.Series or np.array
        a column vector with the predicted class or predicted probability.
    alpha: float, default `NoneType`
        the (1-alpha)% confidence interval. Default `None` for no
        confidence interval.

    Returns
    -------
    CstatisticResults
        Harrel's c-statistics with optional confidence interval.

    Raises
    ------
    ValueError
        If `alpha` does not lie strictly between 0 and 1.

    Notes
    -----
    Predictions (predicted column) can range between +- infinity,
    allowing for evaluations of non-Prob scores.

    The standard error are derived based on:

    .. [1] JA Hanley and BJ McNeil, "The meaning and use of the area under
        a receiver operating characteristic (ROC) curve, Radiology 1982;
        29-36." see (Table 2).
    '''
    # Check input
    is_type(observed, (pd.Series, np.ndarray), 'observed')
    is_type(predicted, (pd.Series, np.ndarray), 'predicted')
    is_type(alpha, (type(None), float), 'alpha')
    # is alpha between 0 and 1
    # NOTE `alpha is not None` replaces the `not alpha is None` anti-idiom
    if alpha is not None:
        if alpha <= 0 or alpha >= 1:
            raise ValueError(Error_MSG.FLOAT_LIMITS.format('alpha', 0, 1))
    same_len(predicted, observed)
    # set pd.Series to numpy array
    if isinstance(predicted, pd.Series):
        predicted = predicted.to_numpy()
    if isinstance(observed, pd.Series):
        observed = observed.to_numpy()
    # split predictions into the normal (observed == 0) and abnormal
    # (observed == 1) groups
    norm_ = predicted[observed == 0]
    abnorm = predicted[observed == 1]
    # pairwise differences: abnormal minus normal predictions
    mat_out = np.subtract.outer(abnorm, norm_)
    # c-statistic: proportion of concordant pairs, ties counted as 1/2;
    # NaN when either group is empty (no pairs exist)
    if len(norm_) == 0 or len(abnorm) == 0:
        cstat = np.nan
    else:
        cstat = np.mean((mat_out > 0) + 0.5 * (mat_out == 0))
    res_dict = {NamesVal.CSTAT: cstat}
    # standard error and confidence interval (Hanley & McNeil, 1982)
    if alpha is not None:
        # NOTE removed redundant `is_type(alpha, float)`; already checked
        # above with the argument name included in the error message.
        Q1 = cstat / (2 - cstat)
        Q2 = (2 * cstat ** 2) / (1 + cstat)
        nn = len(norm_)
        na = len(abnorm)
        num = (cstat * (1 - cstat) + (na - 1) * (Q1 - cstat ** 2)
               + (nn - 1) * (Q2 - cstat ** 2))
        denom = na * nn
        if denom == 0:
            var = np.nan
        else:
            var = num / denom
        # hoist the repeated sqrt of the variance
        se = np.sqrt(var)
        # confidence interval (large sample size!!);
        # sqrt of a chi2(1) quantile equals the two-sided normal quantile
        z = np.sqrt(chi2.ppf(1 - alpha, 1))
        # results
        res_dict[NamesVal.CSTAT_LB] = cstat - z * se
        res_dict[NamesVal.CSTAT_UB] = cstat + z * se
        res_dict[NamesVal.CSTAT_INTERVAL] = \
            (res_dict[NamesVal.CSTAT_LB],
             res_dict[NamesVal.CSTAT_UB],
             )
        res_dict[NamesVal.CSTAT_COVERAGE] = 1 - alpha
        res_dict[NamesVal.CSTAT_SE] = se
    # ignoring class warnings due to potential missing results
    warnings.filterwarnings("ignore", category=UserWarning)
    results = CstatisticResults(**res_dict)
    warnings.resetwarnings()
    # return stuff
    return results
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def format_roc(observed: Union[pd.Series, np.ndarray],
               predicted: Union[pd.Series, np.ndarray],
               **kwargs: Optional[Any],
               ) -> pd.DataFrame:
    '''
    Takes a binary `observed` column vector and a continuous `predicted`
    column vector, and returns a pd.DataFrame with the columns
    `false_positive`, `sensitivity` and `threshold`.

    Arguments
    ---------
    observed: numpy array
        A column vector of the observed binary outcomes.
    predicted: numpy array
        A column vector of the predicted outcome (should be continuous),
        e.g., representing the predicted probability.
    kwargs: Any
        Supplied to `sklearn.metrics.roc_curve`.

    Returns
    -------
    pd.DataFrame
        A table with columns: `false_positive`, `sensitivity` and
        `threshold`.
    '''
    # validate input types and matching lengths
    is_type(observed, (pd.Series, np.ndarray), 'observed')
    is_type(predicted, (pd.Series, np.ndarray), 'predicted')
    same_len(observed, predicted)
    # compute the ROC coordinates
    fpr, tpr, thresholds = roc_curve(
        y_true=observed,
        y_score=predicted,
        **kwargs,
    )
    # assemble and return the results table
    return pd.DataFrame(
        {
            NamesVal.FALSE_POSITIVE: fpr,
            NamesVal.SENSITIVITY: tpr,
            NamesVal.THRESHOLD: thresholds,
        }
    )
# %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% # calibration function # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def get_calibration(data: pd.DataFrame,
                    observed: str,
                    predicted: str,
                    alpha: float = 0.05,
                    bins: None | int = None,
                    kwargs_ci: None | dict = None,
                    ) -> CalibrationResults:
    """
    Estimate the calibration slope and calibration-in-the-large assuming a
    binomial data generating model. A binomial model is generally
    appropriate if the predicted risk reflects an event occurring at a
    fixed moment in time, e.g. after 1 year or 1 hour.

    Parameters
    ----------
    data : pd.DataFrames
        A table with the columns `observed` and `predicted`.
    observed : str
        a column name in `data` referencing the binary outcome column.
    predicted : str
        a column name in `data` referencing the logit predicted risk.
    alpha: float, default 0.05
        the (1-alpha)% confidence interval. Used for the observed risk
        when bin is supplied.
    bins : int, optional, default `NoneType`
        an optional integer used to create equally sized bins of the
        predicted logit risk and returns the average predicted and
        observed risk in each bin (an observed vs expected table)
    kwargs_ci : dict, optional, default `NoneType`
        additional keyword arguments forwarded to
        `beta_confidence_interval` when computing the per-bin observed
        risk intervals; `alpha` is always supplied.

    Returns
    -------
    CalibrationResults
        A calibration intercept and slope estimates, and an optional
        observed and expected table.

    Raises
    ------
    ValueError
        If `alpha` is outside (0, 1), or `bins` exceeds the number of
        observations in `data`.
    RuntimeError
        If either of the underlying statsmodels fits fails.

    Notes
    -----
    The function DOES not presuppose the predicted risk is derived from a
    logistic (binomial) regression model. ANY model predicting risk at a
    fixed moment in time is acceptable, including models that typically
    provide interval predictions such a classification trees. Some/many
    models or rules may only provide predicted risk not the logit risk,
    in such cases simply call the `validation.logit` function to derive
    the appropriate variable.
    """
    # Constants
    CS = NamesVal.CAL_SLOPE
    CS_SE = NamesVal.CAL_SLOPE_SE
    CIL = NamesVal.CAL_INTERCEPT
    CIL_SE = NamesVal.CAL_INTERCEPT_SE
    # Check input
    is_type(observed, str, 'observed')
    is_type(predicted, str, 'predicted')
    is_type(alpha, float, 'alpha')
    # NOTE added the argument name, matching the other `is_type` calls
    is_type(kwargs_ci, (type(None), dict), 'kwargs_ci')
    # is alpha between 0 and 1
    if alpha <= 0 or alpha >= 1:
        raise ValueError(Error_MSG.FLOAT_LIMITS.format('alpha', 0, 1))
    dt = data.copy()
    is_df(dt)
    are_columns_in_df(dt, [observed] + [predicted])
    # empty kwargs
    kwargs_ci = kwargs_ci or {}
    # outcome and predictor (with an added intercept column)
    out = dt[observed]
    s = sm.add_constant(dt[predicted])
    # calculating the calibration slope with standard errors
    cs_model = sm.Logit(out, s)
    try:
        fit_cs = cs_model.fit(disp=0)
    except Exception as exc:
        # NOTE fixed copy-paste: this is a calibration, not a
        # recalibration, fit
        raise RuntimeError(f"Calibration error: {exc}")
    # extracting data
    res_dict = {CS: fit_cs.params.iloc[1], CS_SE: fit_cs.bse.iloc[1]}
    # calculating the calibration-in-the-large with standard errors
    # NOTE the sm.Logit function ignored the offset argument, hence sm.GLM
    cil_model = sm.GLM(out, s[s.columns[0]], offset=s[s.columns[1]],
                       family=sm.families.Binomial())
    try:
        fit_cil = cil_model.fit(disp=0)
    except Exception as exc:
        raise RuntimeError(f"Calibration error: {exc}")
    # extracting data
    res_dict[CIL] = fit_cil.params.iloc[0]
    res_dict[CIL_SE] = fit_cil.bse.iloc[0]
    # calculating average predicted and observed risk
    # NOTE `bins is not None` replaces the `not bins == None` anti-idiom
    if bins is not None:
        # check input (name included for a clear error message)
        is_type(bins, int, 'bins')
        if bins > dt.shape[0]:
            raise ValueError('The number of requested bins is larger than '
                             'the number of observations in `data`.')
        # calculate required data
        dt['bins'] = pd.qcut(dt[predicted], bins, labels=False)
        dt[NamesVal.PREDICTED_RISK] = invlogit(dt[predicted].to_numpy())
        # calculate per-bin means and counts
        observed_expect_table = dt.groupby('bins')[
            [NamesVal.PREDICTED_RISK, observed]].agg(['mean', 'count'])
        # remove duplicate col
        observed_expect_table.drop((observed, 'count'), axis=1,
                                   inplace=True)
        # get confidence interval for the observed risk in each bin;
        # the kwargs merge is loop-invariant and hence hoisted
        kwargs_ci_new = {**kwargs_ci, 'alpha': alpha}
        lower, upper = [], []
        for _, row in observed_expect_table.iterrows():
            # iloc[1] is the bin count, iloc[2] the mean observed risk
            res = beta_confidence_interval(
                row.iloc[2],
                int(row.iloc[1]),
                **kwargs_ci_new,
            )
            l, u = getattr(res, NamesI.VALUES)
            lower.append(l)
            upper.append(u)
        observed_expect_table[NamesVal.AVG_OBSERVED_RISK_LB] = lower
        observed_expect_table[NamesVal.AVG_OBSERVED_RISK_UB] = upper
        # rename (flattens the MultiIndex columns)
        observed_expect_table.columns = [
            NamesVal.AVG_PREDICTED_RISK,
            NamesVal.NO_SUBJECTS,
            NamesVal.AVG_OBSERVED_RISK,
            NamesVal.AVG_OBSERVED_RISK_LB,
            NamesVal.AVG_OBSERVED_RISK_UB,
        ]
        # reorder
        observed_expect_table = observed_expect_table[
            [NamesVal.AVG_PREDICTED_RISK,
             NamesVal.AVG_OBSERVED_RISK,
             NamesVal.AVG_OBSERVED_RISK_LB,
             NamesVal.AVG_OBSERVED_RISK_UB,
             NamesVal.NO_SUBJECTS]
        ]
        res_dict[NamesVal.CAL_TABLE] = observed_expect_table
    # ignoring class warnings due to potential missing results
    warnings.filterwarnings("ignore", category=UserWarning)
    results = CalibrationResults(**res_dict)
    warnings.resetwarnings()
    # returns results class
    return results
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def recalibrate(data: pd.DataFrame,
                score: str,
                observed: str,
                model: str = 'binomial',
                ) -> RecalibrationResults:
    """
    The original `score` will be re-trained on observed outcome data,
    improving the agreement between the observed and predicted
    risk/outcome.

    Parameters
    ----------
    data : pd.DataFrame
        A table with columns `score` and `observed`.
    observed : str
        a column name in `data`
    score : str
        a column name in `data`. Note that contrary to the calibration
        function the score can be in any format depending on intended
        use. For binomial model one would typically want to supply a
        `logit risk`.
    model : str, default `binomial`
        which model should be used (default: 'binomial', or 'gaussian')

    Returns
    -------
    RecalibrationResults
        The recalibration intercept and slope, as well as a table with
        the recalibrated predictions.

    Raises
    ------
    NameError
        If `model` is neither 'binomial' nor 'gaussian'.
    RuntimeError
        If the underlying statsmodels fit fails.
    """
    # Constants
    BINOM = NamesVal.RECAL_BINOMIAL
    GAUSS = NamesVal.RECAL_GAUSSIAN
    SLOPE = NamesVal.RECAL_SLOPE
    INTERCEPT = NamesVal.RECAL_INTERCEPT
    # Check input
    dt = data.copy()
    is_df(dt)
    are_columns_in_df(dt, [observed, score])
    # NOTE fixed: the previous substring test
    # (`any([i in model for i in [BINOM, GAUSS]])`) accepted strings such
    # as 'binomial_x' and then crashed later on an unbound `fit`; an
    # exact match is required.
    if model not in (BINOM, GAUSS):
        raise NameError("Error: model should equal {0} or {1}.".format(
            BINOM, GAUSS))
    # Recalibrate the score on the observed outcome
    out = dt[observed]
    s = sm.add_constant(dt[score])
    if model == BINOM:
        # logistic regression for a binary outcome
        recal_model = sm.Logit(out, s)
    else:
        # ordinary least squares for a continuous outcome
        recal_model = sm.OLS(out, s)
    try:
        fit = recal_model.fit(disp=0)
    except Exception as exc:
        raise RuntimeError(f"Recalibration error: {exc}")
    # results: recalibrated predictions plus intercept/slope estimates
    dt_recal = pd.DataFrame({score: fit.predict()})
    results = {INTERCEPT: fit.params.iloc[0],
               SLOPE: fit.params.iloc[1],
               NamesVal.RECAL_TABLE: dt_recal}
    # returning stuff
    return RecalibrationResults(**results)