"""
A collection of metrics used to evaluate a prediction model, covering
discrimination and calibration statistics.
Currently the module is focussed on binary/classification tasks.
"""
import warnings
import pandas as pd
import numpy as np
import statsmodels.api as sm
from stats_misc.utils.general import (
invlogit,
# logit,
)
from stats_misc.intervals import (
beta_confidence_interval,
)
from stats_misc.constants import (
is_type,
is_df,
are_columns_in_df,
NamesValidation as NamesVal,
NamesIntervals as NamesI,
same_len,
Error_MSG,
)
from sklearn.metrics import (
roc_curve,
)
from scipy.stats import chi2, norm as normal
from typing import Any, List, Type, Union, Tuple, Optional, Dict
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# [docs] — Sphinx "view source" link artifact; commented out so the module imports cleanly.
class CstatisticResults(object):
    '''
    A results object containing c-statistic estimates.

    Attributes
    ----------
    c_statistic: float
        The c-statistic.
    lower_bound: float
        The lower bound of the confidence interval.
    upper_bound: float
        The upper bound of the confidence interval.
    confidence_interval
        The lower and upper bounds of the confidence interval.
    coverage : float
        The interval coverage.
    standard_error : float
        The standard error of the c-statistic, based on an asymptotic
        approximation to the normal distribution.
    '''
    # The recognised keyword arguments. Subclasses override this list;
    # `__init__` reads it via `self.__class__.SET_ARGS`.
    SET_ARGS = [
        NamesVal.CSTAT,
        NamesVal.CSTAT_LB,
        NamesVal.CSTAT_UB,
        NamesVal.CSTAT_INTERVAL,
        NamesVal.CSTAT_COVERAGE,
        NamesVal.CSTAT_SE,
    ]
    # Initiation the class
    def __init__(self, **kwargs):
        """
        Initialise the results container.

        Parameters
        ----------
        **kwargs
            Only keys listed in ``SET_ARGS`` are accepted. Any key listed
            in ``SET_ARGS`` but absent from ``kwargs`` is set to ``None``
            and a ``UserWarning`` is emitted.

        Raises
        ------
        AttributeError
            If a keyword is not listed in ``SET_ARGS``.
        """
        # reject unknown keywords early
        for k in kwargs.keys():
            if k not in self.__class__.SET_ARGS:
                raise AttributeError("unrecognised argument '{0}'".format(k))
        # Loops over `SET_ARGS`, assigns the kwargs content to name `s`.
        # if argument is missing in kwargs, print a warning.
        for s in self.__class__.SET_ARGS:
            try:
                setattr(self, s, kwargs[s])
            except KeyError:
                warnings.warn("argument '{0}' is set to 'None'".format(s))
                setattr(self, s, None)
    # /////////////////////////////////////////////////////////////////////////
    def __str__(self):
        # BUG FIX: previously described itself as an `Interval` results
        # class; report the actual (sub)class name instead.
        return f"A `{type(self).__name__}` results class."
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# [docs] — Sphinx "view source" link artifact; commented out so the module imports cleanly.
class CalibrationResults(CstatisticResults):
    '''
    A results object containing calibration estimates.

    Attributes
    ----------
    calibration_slope: float
        The calibration slope.
    calibration_slope_se: float
        The standard error of the calibration slope.
    calibration_intercept: float
        Calibration-in-the-large estimate.
    calibration_intercept_se: float
        The standard error of the calibration-in-the-large.
    observed_predict_table: pd.DataFrame
        an optional table with observed and predicted risks.
    '''
    # Replaces the parent's list: the inherited `__init__` reads
    # `self.__class__.SET_ARGS`, so only these keys are accepted here.
    SET_ARGS = [
        NamesVal.CAL_SLOPE,
        NamesVal.CAL_SLOPE_SE,
        NamesVal.CAL_INTERCEPT,
        NamesVal.CAL_INTERCEPT_SE,
        NamesVal.CAL_TABLE,
    ]
    # /////////////////////////////////////////////////////////////////////////
    def __str__(self):
        # BUG FIX: "An `Calibration`" -> "A `Calibration`"; the f-string
        # prefix was superfluous (no placeholders).
        return "A `Calibration` results class."
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# [docs] — Sphinx "view source" link artifact; commented out so the module imports cleanly.
class RecalibrationResults(CstatisticResults):
    '''
    A results object containing recalibration estimates.

    Attributes
    ----------
    slope: float
        The slope estimate
    intercept: float
        The intercept estimate
    table_recalibrated: pd.DataFrame
        A table with the recalibrated estimates appended.
    '''
    # Replaces the parent's list: the inherited `__init__` reads
    # `self.__class__.SET_ARGS`, so only these keys are accepted here.
    SET_ARGS = [
        NamesVal.RECAL_SLOPE,
        NamesVal.RECAL_INTERCEPT,
        NamesVal.RECAL_TABLE,
    ]
    # /////////////////////////////////////////////////////////////////////////
    def __str__(self):
        # BUG FIX: copy-paste from CalibrationResults claimed this was a
        # `Calibration` class; it holds recalibration results.
        return "A `Recalibration` results class."
# %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
# discrimination functions
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# [docs] — Sphinx "view source" link artifact; commented out so the module imports cleanly.
def cstatistic(observed: Union[pd.Series, np.ndarray],
               predicted: Union[pd.Series, np.ndarray],
               alpha: Union[None, float] = None) -> CstatisticResults:
    '''
    Calculates Harrell's "c-statistic" for models predicting
    a binary outcome.

    Parameters
    ----------
    observed : pd.Series or np.array
        a column vector with the observed class.
    predicted : pd.Series or np.array
        a column vector with the predicted class or predicted probability.
    alpha: float, default `NoneType`
        the (1-alpha)% confidence interval. Default `None` for no confidence
        interval.

    Returns
    -------
    CstatisticResults
        Harrell's c-statistic with optional confidence interval.

    Notes
    -----
    Predictions (predicted column) can range between +- infinity, allowing for
    evaluations of non-Prob scores.
    The standard error are derived based on:
    .. [1] JA Hanley and BJ McNeil, "The meaning and use of the area under a
       receiver operating characteristic (ROC) curve, Radiology 1982; 29-36."
       see (Table 2).
    '''
    # Check input
    is_type(observed, (pd.Series, np.ndarray), 'observed')
    is_type(predicted, (pd.Series, np.ndarray), 'predicted')
    is_type(alpha, (type(None), float), 'alpha')
    # is alpha between 0 and 1 (idiom fix: `is not None` over `not ... is`)
    if alpha is not None:
        if alpha <= 0 or alpha >= 1:
            raise ValueError(Error_MSG.FLOAT_LIMITS.format('alpha', 0, 1))
    same_len(predicted, observed)
    # map pd.Series to numpy arrays
    if isinstance(predicted, pd.Series):
        predicted = predicted.to_numpy()
    if isinstance(observed, pd.Series):
        observed = observed.to_numpy()
    # predictions of the "normal" (observed == 0) versus "abnormal"
    # (observed == 1) group
    norm_ = predicted[observed == 0]
    abnorm = predicted[observed == 1]
    # outer subtraction: pairwise differences abnormal minus normal
    mat_out = np.subtract.outer(abnorm, norm_)
    # c-statistic: concordant pairs count 1, ties count 0.5
    if len(norm_) == 0 or len(abnorm) == 0:
        # one of the groups is empty: the c-statistic is undefined
        cstat = np.nan
    else:
        cstat = np.mean((mat_out > 0) + 0.5 * (mat_out == 0))
    res_dict = {NamesVal.CSTAT: cstat}
    # standard error and confidence interval
    if alpha is not None:
        # NOTE: removed a redundant second `is_type(alpha, float)` check;
        # the type was already validated above.
        # Hanley & McNeil (1982), Table 2
        Q1 = cstat / (2 - cstat)
        Q2 = (2 * cstat ** 2) / (1 + cstat)
        nn = len(norm_)
        na = len(abnorm)
        num = (cstat * (1 - cstat) + (na - 1) * (Q1 - cstat ** 2)
               + (nn - 1) * (Q2 - cstat ** 2))
        denom = na * nn
        # guard against an empty group
        var = np.nan if denom == 0 else num / denom
        se = np.sqrt(var)
        # confidence interval (large sample size!!)
        z = np.sqrt(chi2.ppf(1 - alpha, 1))
        # results
        res_dict[NamesVal.CSTAT_LB] = cstat - z * se
        res_dict[NamesVal.CSTAT_UB] = cstat + z * se
        res_dict[NamesVal.CSTAT_INTERVAL] = (
            res_dict[NamesVal.CSTAT_LB],
            res_dict[NamesVal.CSTAT_UB],
        )
        res_dict[NamesVal.CSTAT_COVERAGE] = 1 - alpha
        res_dict[NamesVal.CSTAT_SE] = se
    # BUG FIX: use a context manager so the process-wide warning filters
    # are restored afterwards; `resetwarnings()` discarded filters set by
    # callers, not just the one added here.
    with warnings.catch_warnings():
        # ignoring class warnings due to potential missing results
        warnings.simplefilter("ignore", category=UserWarning)
        results = CstatisticResults(**res_dict)
    # return stuff
    return results
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
# calibration function
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# [docs] — Sphinx "view source" link artifact; commented out so the module imports cleanly.
def get_calibration(data: pd.DataFrame,
                    observed: str,
                    predicted: str,
                    alpha: float = 0.05,
                    bins: None | int = None,
                    kwargs_ci: None | dict = None,
                    ) -> CalibrationResults:
    """
    Estimate the calibration slope and calibration-in-the-large assuming a
    binomial data generating model. A binomial model is generally appropriate
    if the predicted risk reflects an event occurring at a fixed moment in
    time, e.g. after 1 year or 1 hour.

    Parameters
    ----------
    data : pd.DataFrame
        A table with the columns `observed` and `predicted`.
    observed : str
        a column name in `data` referencing the binary outcome column.
    predicted : str
        a column name in `data` referencing the logit predicted risk.
    alpha: float, default 0.05
        the (1-alpha)% confidence interval. Used for the observed risk when
        `bins` is supplied.
    bins : int, optional, default `NoneType`
        an optional integer used to create equally sized bins of the predicted
        logit risk and returns the average predicted and observed risk in
        each bin (an observed vs expected table)
    kwargs_ci : dict, optional, default `NoneType`
        extra keyword arguments forwarded to `beta_confidence_interval`
        (its `alpha` is always set from the `alpha` parameter above).

    Returns
    -------
    CalibrationResults
        A calibration intercept and slope estimates, and an optional
        observed and expected table.

    Notes
    -----
    The function DOES not presuppose the predicted risk is derived from a
    logistic (binomial) regression model. ANY model predicting risk at a fixed
    moment in time is acceptable, including models that typically provide
    interval predictions such a classification trees. Some/many models or
    rules may only provide predicted risk not the logit risk, in such cases
    simply call the `validation.logit` function to derive the appropriate
    variable.
    """
    # Constants: short aliases for the result-dictionary keys
    CS = NamesVal.CAL_SLOPE
    CS_SE = NamesVal.CAL_SLOPE_SE
    CIL = NamesVal.CAL_INTERCEPT
    CIL_SE = NamesVal.CAL_INTERCEPT_SE
    # Check input
    is_type(observed, str, 'observed')
    is_type(predicted, str, 'predicted')
    is_type(alpha, float, 'alpha')
    # BUG FIX: the parameter name was missing from this check
    is_type(kwargs_ci, (type(None), dict), 'kwargs_ci')
    # is alpha between 0 and 1
    if alpha <= 0 or alpha >= 1:
        raise ValueError(Error_MSG.FLOAT_LIMITS.format('alpha', 0, 1))
    dt = data.copy()
    is_df(dt)
    are_columns_in_df(dt, [observed] + [predicted])
    # empty kwargs
    kwargs_ci = kwargs_ci or {}
    # outcome and predictor (with intercept column prepended)
    out = dt[observed]
    s = sm.add_constant(dt[predicted])
    # calculating the calibration slope with standard errors
    cs_model = sm.Logit(out, s)
    try:
        fit_cs = cs_model.fit(disp=0)
    except Exception as exc:
        # BUG FIX: message said "Recalibration" (copy-paste from
        # `recalibrate`); this is the calibration function.
        raise RuntimeError(f"Calibration error: {exc}")
    # extracting data
    res_dict = {CS: fit_cs.params.iloc[1], CS_SE: fit_cs.bse.iloc[1]}
    # calculating the calibration-in-the-large with standard errors
    # NOTE the sm.Logit function ignored the offset argument
    cil_model = sm.GLM(out, s[s.columns[0]],
                       offset=s[s.columns[1]], family=sm.families.Binomial())
    try:
        fit_cil = cil_model.fit(disp=0)
    except Exception as exc:
        raise RuntimeError(f"Calibration error: {exc}")
    # extracting data
    res_dict[CIL] = fit_cil.params.iloc[0]
    res_dict[CIL_SE] = fit_cil.bse.iloc[0]
    # calculating average predicted and observed risk per bin
    if bins is not None:  # idiom fix: was `not bins == None`
        # check input
        is_type(bins, int)
        if bins > dt.shape[0]:
            raise ValueError('The number of requested bins is larger than the '
                             'number of observations in `data`.')
        # bin subjects by quantiles of the predicted logit risk
        dt['bins'] = pd.qcut(dt[predicted], bins, labels=False)
        dt[NamesVal.PREDICTED_RISK] = invlogit(dt[predicted].to_numpy())
        # per-bin means and counts
        observed_expect_table = dt.groupby('bins')[
            [NamesVal.PREDICTED_RISK, observed]].agg(['mean', 'count'])
        # remove duplicate count column (both variables share the same count)
        observed_expect_table.drop((observed, 'count'), axis=1, inplace=True)
        # confidence interval around each bin's observed risk; the kwargs
        # are loop-invariant, so build them once (alpha always overrides)
        kwargs_ci_new = {**kwargs_ci, 'alpha': alpha}
        lower, upper = [], []
        for _, row in observed_expect_table.iterrows():
            # row layout after the drop: [pred mean, count, observed mean]
            res = beta_confidence_interval(
                row.iloc[2], int(row.iloc[1]),
                **kwargs_ci_new,
            )
            lb, ub = getattr(res, NamesI.VALUES)
            lower.append(lb)
            upper.append(ub)
        observed_expect_table[NamesVal.AVG_OBSERVED_RISK_LB] = lower
        observed_expect_table[NamesVal.AVG_OBSERVED_RISK_UB] = upper
        # flatten the MultiIndex columns to canonical names
        observed_expect_table.columns = [
            NamesVal.AVG_PREDICTED_RISK, NamesVal.NO_SUBJECTS,
            NamesVal.AVG_OBSERVED_RISK, NamesVal.AVG_OBSERVED_RISK_LB,
            NamesVal.AVG_OBSERVED_RISK_UB,
        ]
        # reorder
        observed_expect_table = observed_expect_table[
            [NamesVal.AVG_PREDICTED_RISK, NamesVal.AVG_OBSERVED_RISK,
             NamesVal.AVG_OBSERVED_RISK_LB, NamesVal.AVG_OBSERVED_RISK_UB,
             NamesVal.NO_SUBJECTS]
        ]
        res_dict[NamesVal.CAL_TABLE] = observed_expect_table
    # BUG FIX: use a context manager so the process-wide warning filters
    # are restored afterwards; `resetwarnings()` discarded filters set by
    # callers, not just the one added here.
    with warnings.catch_warnings():
        # ignoring class warnings due to potential missing results
        warnings.simplefilter("ignore", category=UserWarning)
        results = CalibrationResults(**res_dict)
    # returns results class
    return results
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# [docs] — Sphinx "view source" link artifact; commented out so the module imports cleanly.
def recalibrate(data: pd.DataFrame,
                score: str, observed: str,
                model: str = 'binomial',
                ) -> RecalibrationResults:
    """
    The original `score` will be re-trained on observed outcome data,
    improving the agreement between the observed and predicted risk/outcome.

    Parameters
    ----------
    data : pd.DataFrame
        A table with columns `score` and `observed`.
    score : str
        a column name in `data`. Note that contrary to the calibration
        function the score can be in any format depending on intended use.
        For a binomial model one would typically want to supply a
        `logit risk`.
    observed : str
        a column name in `data`
    model : str, default `binomial`
        which model should be used (default: 'binomial', or 'gaussian')

    Returns
    -------
    RecalibrationResults
        The recalibration intercept and slope, as well as a table with the
        recalibrated predictions.

    Raises
    ------
    NameError
        If `model` is neither of the two supported model names.
    RuntimeError
        If the underlying statsmodels fit fails.
    """
    # Constants
    BINOM = NamesVal.RECAL_BINOMIAL
    GAUSS = NamesVal.RECAL_GAUSSIAN
    SLOPE = NamesVal.RECAL_SLOPE
    INTERCEPT = NamesVal.RECAL_INTERCEPT
    # Check input
    dt = data.copy()
    is_df(dt)
    are_columns_in_df(dt, [observed, score])
    # BUG FIX: was a substring test (`any(i in model ...)`) which accepted
    # e.g. 'binomials' and then left `fit` unbound (NameError below);
    # require an exact match instead.
    if model not in (BINOM, GAUSS):
        raise NameError("Error: model should equal {0} or {1}.".format(
            BINOM, GAUSS))
    # Recalibrate the score: regress the outcome on the score plus intercept
    out = dt[observed]
    s = sm.add_constant(dt[score])
    # CONSISTENCY FIX: compare against the same constants used for the
    # validation above (previously compared to the literals 'binomial' /
    # 'gaussian'; assumes the constants hold those values — TODO confirm).
    if model == BINOM:
        recal_model = sm.Logit(out, s)
        fit_kwargs = {'disp': 0}  # silence the optimiser output
    else:
        recal_model = sm.OLS(out, s)
        # BUG FIX: `disp` is not an OLS.fit argument; do not pass it.
        fit_kwargs = {}
    try:
        fit = recal_model.fit(**fit_kwargs)
    except Exception as exc:
        raise RuntimeError(f"Recalibration error: {exc}")
    # results: recalibrated predictions plus intercept/slope estimates
    dt_recal = pd.DataFrame({score: fit.predict()})
    results = {INTERCEPT: fit.params.iloc[0], SLOPE: fit.params.iloc[1],
               NamesVal.RECAL_TABLE: dt_recal}
    # returning stuff
    return RecalibrationResults(**results)