Source code for stats_misc.machine_learning.firthlogist

"""
A module implementing Firth's logistic regression.

Firth's regression is a penalised likelihood method that addresses small-sample
bias and perfect separation in logistic regression. It adjusts score function
to yield more reliable parameter estimates.

Note this is essentially a fork of the GitHub `firthlogist <https://github.com/jzluo/firthlogist/tree/master>`_
repo, with minor tweaks to work on python 3.10+.
"""
import warnings
import numpy as np
from math import sqrt
from copy import deepcopy
from scipy.linalg import lapack
from scipy.special import expit
from scipy.stats import chi2, norm
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.exceptions import ConvergenceWarning
from sklearn.preprocessing import LabelEncoder
from sklearn.utils.multiclass import check_classification_targets
from sklearn.utils.validation import (
    check_is_fitted,
    check_X_y,
    check_array
)
from tabulate import tabulate
from stats_misc.constants import (
    Error_MSG,
)

# @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
[docs] class FirthLogisticRegression(BaseEstimator, ClassifierMixin): """ Logistic regression with Firth's bias reduction method. This is based on the implementation in the `logistf` R package. Please see the `logistf` references [1]_ and [2]_ for details about the method. Parameters ---------- max_iter : `int`, default 25 The maximum number of Newton-Raphson iterations. max_halfstep : `int`, default 0 The maximum number of step-halvings in one Newton-Raphson iteration. max_stepsize : `int`, default 5 The maximum step size - for each coefficient, the step size is forced to be less than max_stepsize. pl_max_iter : `int`, default 100 The maximum number of Newton-Raphson iterations for finding profile likelihood confidence intervals. pl_max_halfstep : `int`, default 0 The maximum number of step-halvings in one iteration for finding profile likelihood confidence intervals. pl_max_stepsize : `int`, default 5 The maximum step size while finding PL confidence intervals. tol : `float`, default 0.0001 Convergence tolerance for stopping. fit_intercept : `bool`, default `True` Specifies if intercept should be added. skip_pvals : `bool`, default `False` If True, p-values will not be calculated. Calculating the p-values can be time-consuming if `wald=False` since the fitting procedure is repeated for each coefficient. skip_ci : `bool`, default `False` If True, confidence intervals will not be calculated. Calculating the confidence intervals via profile likelihoood is time-consuming. alpha : `float`, default 0.05 Significance level (confidence interval = 1-alpha). wald : `bool`, default `False` If True, uses Wald method to calculate p-values and confidence intervals. test_vars : `int`, `list` [`int`], or `None`, default `None` Index or list of indices of the variables for which to calculate confidence intervals and p-values. If None, calculate for all variables. This option has no effect if wald=True. Attributes ---------- bse_ Standard errors of the coefficients. classes_ A list of the class labels. ci_ The fitted profile likelihood confidence intervals. coef_ The coefficients of the features. intercept_ Fitted intercept. If `fit_intercept = False`, the intercept is set to zero. loglik_ Fitted penalized log-likelihood. llf_ A statsmodels compliant allies for `loglik_` n_iter_ Number of Newton-Raphson iterations performed. pvals_ p-values calculated by penalized likelihood ratio tests. References ---------- .. [1] Firth D (1993). Bias reduction of maximum likelihood estimates. Biometrika 80, 27–38. .. [2] Heinze G, Schemper M (2002). A solution to the problem of separation in logistic regression. Statistics in Medicine 21: 2409-2419. """ # ///////////////////////////////////////////////////////////////////////// def __init__( self, max_iter=25, max_halfstep=0, max_stepsize=5, pl_max_iter=100, pl_max_halfstep=0, pl_max_stepsize=5, tol=0.0001, fit_intercept=True, skip_pvals=False, skip_ci=False, alpha=0.05, wald=False, test_vars=None, ): # ### check input # is alpha between 0 and 1 if alpha <= 0 or alpha >= 1: raise ValueError(Error_MSG.FLOAT_LIMITS.format('alpha', 0, 1)) # ### assign input self.max_iter = max_iter self.max_stepsize = max_stepsize self.max_halfstep = max_halfstep self.pl_max_iter = pl_max_iter self.pl_max_halfstep = pl_max_halfstep self.pl_max_stepsize = pl_max_stepsize self.tol = tol self.fit_intercept = fit_intercept self.skip_pvals = skip_pvals self.skip_ci = skip_ci self.alpha = alpha self.wald = wald self.test_vars = test_vars # ///////////////////////////////////////////////////////////////////////// def _more_tags(self): # return {"binary_only": True} return {"binary_only": True, "fit requires 2 classes only": True, "requires_y": True,} # /////////////////////////////////////////////////////////////////////////
[docs] def fit(self, X, y): """ Fits a Firth logistic regression model. Parameters ---------- X : `np.ndarray` A 2d numpy array with dimmension n (subjects) by k (variables). The array represents the design matrix of independent variables. y : `np.ndarray` A 1d or 2d numpy array with dimmension n by 1. The array represents the dependent variable, typically coded as 0 and 1. """ # ### validate input # Validate X and y, forcing float64, no NaN/inf, ensures 2D for X X, y = check_X_y( X, y, dtype=np.float64, accept_sparse=False, ensure_2d=True, force_all_finite=True ) # Enforce at least 2 samples if X.shape[0] < 2: raise ValueError( f"Found array with {X.shape[0]} sample(s) (shape={X.shape}) " "while a minimum of 2 is required by " f"{self.__class__.__name__}." ) # Record the number of features, per scikit-learn convention self.n_features_in_ = X.shape[1] # Confirm classification targets are valid, must be binary check_classification_targets(y) self.classes_ = np.unique(y) if len(self.classes_) != 2: raise ValueError("Only binary classification is supported.") # Encode y as 0/1 y = LabelEncoder().fit_transform(y).astype(X.dtype, copy=False) # ### pre-process data if self.fit_intercept: X = np.hstack((X, np.ones((X.shape[0], 1)))) # ### fit model self.coef_, self.loglik_, self.n_iter_ = _firth_newton_raphson( X, y, self.max_iter, self.max_stepsize, self.max_halfstep, self.tol ) # map loglig to llf (matching statsmodels) self.llf_ = self.loglik_ self.bse_ = _bse(X, self.coef_) if not self.skip_ci: if not self.wald: self.ci_ = _profile_likelihood_ci( X=X, y=y, fitted_coef=self.coef_, full_loglik=self.loglik_, max_iter=self.pl_max_iter, max_stepsize=self.pl_max_stepsize, max_halfstep=self.pl_max_halfstep, tol=self.tol, alpha=self.alpha, test_vars=self.test_vars, ) else: self.ci_ = _wald_ci(self.coef_, self.bse_, self.alpha) else: self.ci_ = np.full((self.coef_.shape[0], 2), np.nan) # penalized likelihood ratio tests if not self.skip_pvals: if not self.wald: self.pvals_ = _penalized_lrt( self.loglik_, X, y, self.max_iter, self.max_stepsize, self.max_halfstep, self.tol, self.test_vars, ) else: self.pvals_ = _wald_test(self.coef_, self.bse_) else: self.pvals_ = np.full(self.coef_.shape[0], np.nan) if self.fit_intercept: self.intercept_ = self.coef_[-1] self.coef_ = self.coef_[:-1] else: self.intercept_ = 0 return self
# ///////////////////////////////////////////////////////////////////////// @property def llf(self): """The loglikelihood, statsmodels allies of loglik""" return self.llf_ # ///////////////////////////////////////////////////////////////////////// @property def loglik(self): """The loglikelihood""" return self.loglik_ # ///////////////////////////////////////////////////////////////////////// @property def pvalues(self): """The p-values of the regression coefficients""" return self.pvals_ # ///////////////////////////////////////////////////////////////////////// @property def params(self): """The regression coefficients""" return self.coef_ # ///////////////////////////////////////////////////////////////////////// @property def bse(self): """The standard errors of the regression coefficients""" return self.bse_ # ///////////////////////////////////////////////////////////////////////// # NOTE tabulate does not seem to be maintained anymore. Should be able to # replace this with `from statsmodels.iolib.summary import Summary`
[docs] def summary(self, xname=None, tablefmt="simple"): """ Prints a summary table. Parameters ---------- xname : `list` [`str`] or `None`, default `NoneType` Names for the X variables. Default is x1, x2, ... Must match the number of parameters in the model. tablefmt : `str` `tabulate` table format for output. Please see the documentation for `tabulate` for options. """ check_is_fitted(self, "coef_") if xname and len(xname) != len(self.coef_): raise ValueError( f"Length of xname ({len(xname)}) does not match the number of " f"parameters in the model ({len(self.coef_)})" ) if not xname: xname = [f"x{i}" for i in range(1, len(self.coef_) + 1)] coef = list(self.coef_) if self.fit_intercept: xname.append("Intercept") coef.append(self.intercept_) headers = [ "", "coef", "std err", f"[{self.alpha/2}", f"{1-self.alpha/2}]", "p-value", ] table = zip(xname, coef, self.bse_, self.ci_[:, 0], self.ci_[:, 1], self.pvals_) table = tabulate(table, headers, tablefmt=tablefmt) table += "\n\n" table += f"Log-Likelihood: {round(self.loglik_, 4)}\n" table += f"Newton-Raphson iterations: {self.n_iter_}\n" print(table) if self.fit_intercept: xname.pop() return
# /////////////////////////////////////////////////////////////////////////
[docs] def decision_function(self, X): """ Returns the linear predictors (including the intercept). Parameters ---------- X : `np.ndarray` A user supplied design matrix, should have the same number of variables used to derive the fitted model. Notes ----- The returned value is on the logit(risk) scale. """ # validation check_is_fitted(self, "coef_") # Validate X at predict time X = check_array( X, dtype=np.float64, accept_sparse=False, force_all_finite=True ) # X = self._validate_data(X, reset=False) scores = X @ self.coef_ + self.intercept_ return scores
# ///////////////////////////////////////////////////////////////////////// # NOTE this can be expaned to include a user defined threshold instead of # the hard-coded zero.
[docs] def predict(self, X): """ Returns predicted values based on a design matrix. Parameters ---------- X : `np.ndarray` A user supplied design matrix, should have the same number of variables used to derive the fitted model. Notes ----- Returns 0 or 1 depending on whether the logit(risk) is larger than zero. """ decision = self.decision_function(X) if len(decision.shape) == 1: indices = (decision > 0).astype(int) else: indices = decision.argmax(axis=1) return self.classes_[indices]
# /////////////////////////////////////////////////////////////////////////
[docs] def predict_proba(self, X): """ Returns the predicted probability. Parameters ---------- X : `np.ndarray` A user supplied design matrix, should have the same number of variables used to derive the fitted model. """ decision = self.decision_function(X) if decision.ndim == 1: decision = np.c_[-decision, decision] proba = expit(decision) return proba
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ def _firth_newton_raphson(X, y, max_iter, max_stepsize, max_halfstep, tol, mask=None): # see logistf reference manual for explanation of procedure coef = np.zeros(X.shape[1]) for iter in range(1, max_iter + 1): preds = expit(X @ coef) XW = _get_XW(X, preds, mask) fisher_info_mtx = XW.T @ XW hat = _hat_diag(XW) U_star = np.matmul(X.T, y - preds + np.multiply(hat, 0.5 - preds)) step_size = np.linalg.lstsq(fisher_info_mtx, U_star, rcond=None)[0] # if mask: # step_size[mask] = 0 # step-halving mx = np.max(np.abs(step_size)) / max_stepsize if mx > 1: step_size = step_size / mx # restrict to max_stepsize coef_new = coef + step_size preds_new = expit(X @ coef_new) loglike = _loglikelihood(X, y, preds) loglike_new = _loglikelihood(X, y, preds_new) steps = 0 while loglike < loglike_new: step_size *= 0.5 coef_new = coef + step_size preds_new = expit(X @ coef_new) loglike_new = _loglikelihood(X, y, preds_new) steps += 1 if steps == max_halfstep: warning_msg = "Step-halving failed to converge." warnings.warn(warning_msg, ConvergenceWarning, stacklevel=2) return coef_new, -loglike_new, iter if iter > 1 and np.linalg.norm(coef_new - coef) < tol: return coef_new, -loglike_new, iter coef += step_size warning_msg = ( "Firth logistic regression failed to converge. Try increasing max_iter." ) warnings.warn(warning_msg, ConvergenceWarning, stacklevel=2) # NOTE taking the negative because _loglikelihood return the negativve # loglik return coef, -loglike_new, max_iter # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ def _loglikelihood(X, y, preds, penalized:bool=True,): """ Estimate the negative log-likelihood. Parameters ---------- X : `np.ndarray` The design matrix. y : `np.ndarray` The outcome. preds : `np.ndarray` The linear predictors. penalized : `bool`, default `True` Whether to calculate the penalised or regular log likelihood. """ # penalized log-likelihood XW = _get_XW(X, preds) fisher_info_mtx = XW.T @ XW penalty = 0.5 * np.log(np.linalg.det(fisher_info_mtx)) loglik = -1 * (np.sum(y * np.log(preds) + (1 - y) * np.log(1 - preds)) + penalty) if penalized == False: loglik = -1 * np.sum(y * np.log(preds) + (1 - y) * np.log(1 - preds)) return loglik # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ def _get_XW(X, preds, mask=None): # mask is 1-indexed because 0 == None rootW = np.sqrt(preds * (1 - preds)) XW = rootW[:, np.newaxis] * X # is this equivalent?? # https://github.com/georgheinze/logistf/blob/master/src/logistf.c#L150-L159 if mask is not None: XW[:, mask] = 0 return XW # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ def _get_aug_XW(X, preds, hats): rootW = np.sqrt(preds * (1 - preds) * (1 + hats)) XW = rootW[:, np.newaxis] * X return XW # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ def _hat_diag(XW): # Get diagonal elements of the hat matrix qr, tau, _, _ = lapack.dgeqrf(XW, overwrite_a=True) Q, _, _ = lapack.dorgqr(qr, tau, overwrite_a=True) hat = np.einsum("ij,ij->i", Q, Q) return hat # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ def _bse(X, coefs): # se in logistf is diag(object$var) ^ 0.5, where var is the covariance matrix, # which is the inverse of the observed fisher information matrix # https://stats.stackexchange.com/q/68080/343314 preds = expit(X @ coefs) XW = _get_XW(X, preds) fisher_info_mtx = XW.T @ XW return np.sqrt(np.diag(np.linalg.pinv(fisher_info_mtx))) # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ def _penalized_lrt( full_loglik, X, y, max_iter, max_stepsize, max_halfstep, tol, test_vars, ): if test_vars is None: test_var_indices = range(X.shape[1]) elif isinstance(test_vars, int): # single index test_var_indices = [test_vars] else: # list, tuple, or set of indices test_var_indices = sorted(test_vars) pvals = [] for mask in test_var_indices: _, null_loglik, _ = _firth_newton_raphson( X, y, max_iter, max_stepsize, max_halfstep, tol, mask, ) pvals.append(_lrt(full_loglik, null_loglik)) if len(pvals) < X.shape[1]: pval_array = np.full(X.shape[1], np.nan) for idx, test_var_idx in enumerate(test_var_indices): pval_array[test_var_idx] = pvals[idx] return pval_array return np.array(pvals) # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ def _lrt(full_loglik, null_loglik): # in logistf: 1-pchisq(2*(fit.full$loglik-fit.i$loglik),1) lr_stat = 2 * (full_loglik - null_loglik) p_value = chi2.sf(lr_stat, df=1) return p_value # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ def _predict(X, coef): preds = expit(X @ coef) np.clip(preds, a_min=1e-15, a_max=1 - 1e-15, out=preds) return preds # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ def _profile_likelihood_ci( X, y, fitted_coef, full_loglik, max_iter, max_stepsize, max_halfstep, tol, alpha, test_vars, ): LL0 = full_loglik - chi2.ppf(1 - alpha, 1) / 2 lower_bound = [] upper_bound = [] if test_vars is None: test_var_indices = range(fitted_coef.shape[0]) elif isinstance(test_vars, int): # single index test_var_indices = [test_vars] else: # list, tuple, or set of indices test_var_indices = sorted(test_vars) for side in [-1, 1]: # for coef_idx in range(fitted_coef.shape[0]): for coef_idx in test_var_indices: coef = deepcopy(fitted_coef) for iter in range(1, max_iter + 1): preds = _predict(X, coef) # NOTE taking the negative because _loglikelihood return the # negativve loglik loglike = -_loglikelihood(X, y, preds) XW = _get_XW(X, preds) hat = _hat_diag(XW) XW = _get_aug_XW(X, preds, hat) # augmented data using hat diag fisher_info_mtx = XW.T @ XW U_star = np.matmul(X.T, y - preds + np.multiply(hat, 0.5 - preds)) # https://github.com/georgheinze/logistf/blob/master/src/logistf.c#L780-L781 inv_fisher = np.linalg.pinv(fisher_info_mtx) tmp1x1 = U_star @ np.negative(inv_fisher) @ U_star underRoot = ( -2 * ((LL0 - loglike) + 0.5 * tmp1x1) / (inv_fisher[coef_idx, coef_idx]) ) lambda_ = 0 if underRoot < 0 else side * sqrt(underRoot) U_star[coef_idx] += lambda_ step_size = np.linalg.lstsq(fisher_info_mtx, U_star, rcond=None)[0] mx = np.max(np.abs(step_size)) / max_stepsize if mx > 1: step_size = step_size / mx # restrict to max_stepsize coef += step_size loglike_old = deepcopy(loglike) for halfs in range(1, max_halfstep + 1): preds = _predict(X, coef) # NOTE taking the negative because _loglikelihood return the # negativve loglik loglike = -_loglikelihood(X, y, preds) if (abs(loglike - LL0) < abs(loglike_old - LL0)) and loglike > LL0: break step_size *= 0.5 coef -= step_size if abs(loglike - LL0) <= tol: if side == -1: lower_bound.append(coef[coef_idx]) else: upper_bound.append(coef[coef_idx]) break if abs(loglike - LL0) > tol: if side == -1: lower_bound.append(np.nan) else: upper_bound.append(np.nan) warning_msg = ( f"Non-converged PL confidence limits - max number of " f"iterations exceeded for variable x{coef_idx}. Try " f"increasing pl_max_iter." ) warnings.warn(warning_msg, ConvergenceWarning, stacklevel=2) bounds = np.column_stack([lower_bound, upper_bound]) if len(lower_bound) < fitted_coef.shape[0]: ci = np.full([fitted_coef.shape[0], 2], np.nan) for idx, test_var_idx in enumerate(test_var_indices): ci[test_var_idx] = bounds[idx] # return ci return ci # return bounds return bounds # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ def _wald_ci(coef, bse, alpha): lower_ci = coef + norm.ppf(alpha / 2) * bse upper_ci = coef + norm.ppf(1 - alpha / 2) * bse return np.column_stack([lower_ci, upper_ci]) # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ def _wald_test(coef, bse): # 1 - pchisq((beta^2/vars), 1), in our case bse = vars^0.5 return chi2.sf(np.square(coef) / np.square(bse), 1)