"""
A module implementing Firth's logistic regression.
Firth's regression is a penalised likelihood method that addresses small-sample
bias and perfect separation in logistic regression. It adjusts the score
function to yield more reliable parameter estimates.
Note this is essentially a fork of the GitHub `firthlogist <https://github.com/jzluo/firthlogist/tree/master>`_
repo, with minor tweaks to work on python 3.10+.
"""
import warnings
import numpy as np
from math import sqrt
from copy import deepcopy
from scipy.linalg import lapack
from scipy.special import expit
from scipy.stats import chi2, norm
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.exceptions import ConvergenceWarning
from sklearn.preprocessing import LabelEncoder
from sklearn.utils.multiclass import check_classification_targets
from sklearn.utils.validation import (
check_is_fitted,
check_X_y,
check_array
)
from tabulate import tabulate
from stats_misc.constants import (
Error_MSG,
)
# @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
class FirthLogisticRegression(BaseEstimator, ClassifierMixin):
    """
    Logistic regression with Firth's bias reduction method.

    This is based on the implementation in the `logistf` R package. Please see
    the `logistf` references [1]_ and [2]_ for details about the method.

    Parameters
    ----------
    max_iter : `int`, default 25
        The maximum number of Newton-Raphson iterations.
    max_halfstep : `int`, default 0
        The maximum number of step-halvings in one Newton-Raphson iteration.
    max_stepsize : `int`, default 5
        The maximum step size - for each coefficient, the step size is forced
        to be less than max_stepsize.
    pl_max_iter : `int`, default 100
        The maximum number of Newton-Raphson iterations for finding profile
        likelihood confidence intervals.
    pl_max_halfstep : `int`, default 0
        The maximum number of step-halvings in one iteration for finding
        profile likelihood confidence intervals.
    pl_max_stepsize : `int`, default 5
        The maximum step size while finding PL confidence intervals.
    tol : `float`, default 0.0001
        Convergence tolerance for stopping.
    fit_intercept : `bool`, default `True`
        Specifies if intercept should be added.
    skip_pvals : `bool`, default `False`
        If True, p-values will not be calculated. Calculating the p-values can
        be time-consuming if `wald=False` since the fitting procedure is
        repeated for each coefficient.
    skip_ci : `bool`, default `False`
        If True, confidence intervals will not be calculated. Calculating the
        confidence intervals via profile likelihood is time-consuming.
    alpha : `float`, default 0.05
        Significance level (confidence interval = 1-alpha).
    wald : `bool`, default `False`
        If True, uses Wald method to calculate p-values and confidence
        intervals.
    test_vars : `int`, `list` [`int`], or `None`, default `None`
        Index or list of indices of the variables for which to calculate
        confidence intervals and p-values. If None, calculate for all
        variables. This option has no effect if wald=True.

    Attributes
    ----------
    bse_
        Standard errors of the coefficients.
    classes_
        A list of the class labels.
    ci_
        The fitted profile likelihood confidence intervals.
    coef_
        The coefficients of the features.
    intercept_
        Fitted intercept. If `fit_intercept = False`, the intercept is set to
        zero.
    loglik_
        Fitted penalized log-likelihood.
    llf_
        A statsmodels-compliant alias for `loglik_`.
    n_iter_
        Number of Newton-Raphson iterations performed.
    pvals_
        p-values calculated by penalized likelihood ratio tests.

    References
    ----------
    .. [1] Firth D (1993). Bias reduction of maximum likelihood estimates.
        Biometrika 80, 27–38.
    .. [2] Heinze G, Schemper M (2002). A solution to the problem of separation
        in logistic regression. Statistics in Medicine 21: 2409-2419.
    """
    # /////////////////////////////////////////////////////////////////////////
    def __init__(
        self,
        max_iter=25,
        max_halfstep=0,
        max_stepsize=5,
        pl_max_iter=100,
        pl_max_halfstep=0,
        pl_max_stepsize=5,
        tol=0.0001,
        fit_intercept=True,
        skip_pvals=False,
        skip_ci=False,
        alpha=0.05,
        wald=False,
        test_vars=None,
    ):
        # ### check input
        # alpha must lie strictly between 0 and 1
        if alpha <= 0 or alpha >= 1:
            raise ValueError(Error_MSG.FLOAT_LIMITS.format('alpha', 0, 1))
        # ### assign input
        self.max_iter = max_iter
        self.max_stepsize = max_stepsize
        self.max_halfstep = max_halfstep
        self.pl_max_iter = pl_max_iter
        self.pl_max_halfstep = pl_max_halfstep
        self.pl_max_stepsize = pl_max_stepsize
        self.tol = tol
        self.fit_intercept = fit_intercept
        self.skip_pvals = skip_pvals
        self.skip_ci = skip_ci
        self.alpha = alpha
        self.wald = wald
        self.test_vars = test_vars
    # /////////////////////////////////////////////////////////////////////////
    def _more_tags(self):
        # scikit-learn estimator tags: binary-only classifier requiring y
        return {"binary_only": True, "fit requires 2 classes only": True,
                "requires_y": True,}
    # /////////////////////////////////////////////////////////////////////////
    def fit(self, X, y):
        """
        Fits a Firth logistic regression model.

        Parameters
        ----------
        X : `np.ndarray`
            A 2d numpy array with dimension n (subjects) by k (variables).
            The array represents the design matrix of independent variables.
        y : `np.ndarray`
            A 1d or 2d numpy array with dimension n by 1. The array
            represents the dependent variable, typically coded as 0 and 1.
        """
        # ### validate input
        # Validate X and y, forcing float64, no NaN/inf, ensures 2D for X
        X, y = check_X_y(
            X, y, dtype=np.float64, accept_sparse=False, ensure_2d=True,
            force_all_finite=True
        )
        # Enforce at least 2 samples
        if X.shape[0] < 2:
            raise ValueError(
                f"Found array with {X.shape[0]} sample(s) (shape={X.shape}) "
                "while a minimum of 2 is required by "
                f"{self.__class__.__name__}."
            )
        # Record the number of features, per scikit-learn convention
        self.n_features_in_ = X.shape[1]
        # Confirm classification targets are valid, must be binary
        check_classification_targets(y)
        self.classes_ = np.unique(y)
        if len(self.classes_) != 2:
            raise ValueError("Only binary classification is supported.")
        # Encode y as 0/1
        y = LabelEncoder().fit_transform(y).astype(X.dtype, copy=False)
        # ### pre-process data
        # the intercept column is appended LAST, matching the
        # coef_/intercept_ split performed at the end of this method
        if self.fit_intercept:
            X = np.hstack((X, np.ones((X.shape[0], 1))))
        # ### fit model
        self.coef_, self.loglik_, self.n_iter_ = _firth_newton_raphson(
            X, y, self.max_iter, self.max_stepsize, self.max_halfstep, self.tol
        )
        # map loglik to llf (matching statsmodels)
        self.llf_ = self.loglik_
        self.bse_ = _bse(X, self.coef_)
        # confidence intervals: profile likelihood by default, Wald if asked
        if not self.skip_ci:
            if not self.wald:
                self.ci_ = _profile_likelihood_ci(
                    X=X,
                    y=y,
                    fitted_coef=self.coef_,
                    full_loglik=self.loglik_,
                    max_iter=self.pl_max_iter,
                    max_stepsize=self.pl_max_stepsize,
                    max_halfstep=self.pl_max_halfstep,
                    tol=self.tol,
                    alpha=self.alpha,
                    test_vars=self.test_vars,
                )
            else:
                self.ci_ = _wald_ci(self.coef_, self.bse_, self.alpha)
        else:
            self.ci_ = np.full((self.coef_.shape[0], 2), np.nan)
        # penalized likelihood ratio tests
        if not self.skip_pvals:
            if not self.wald:
                self.pvals_ = _penalized_lrt(
                    self.loglik_,
                    X,
                    y,
                    self.max_iter,
                    self.max_stepsize,
                    self.max_halfstep,
                    self.tol,
                    self.test_vars,
                )
            else:
                self.pvals_ = _wald_test(self.coef_, self.bse_)
        else:
            self.pvals_ = np.full(self.coef_.shape[0], np.nan)
        # split the intercept (last column, see pre-processing above) from
        # the feature coefficients
        if self.fit_intercept:
            self.intercept_ = self.coef_[-1]
            self.coef_ = self.coef_[:-1]
        else:
            self.intercept_ = 0
        return self
    # /////////////////////////////////////////////////////////////////////////
    @property
    def llf(self):
        """The loglikelihood, statsmodels alias of loglik"""
        return self.llf_
    # /////////////////////////////////////////////////////////////////////////
    @property
    def loglik(self):
        """The loglikelihood"""
        return self.loglik_
    # /////////////////////////////////////////////////////////////////////////
    @property
    def pvalues(self):
        """The p-values of the regression coefficients"""
        return self.pvals_
    # /////////////////////////////////////////////////////////////////////////
    @property
    def params(self):
        """The regression coefficients"""
        return self.coef_
    # /////////////////////////////////////////////////////////////////////////
    @property
    def bse(self):
        """The standard errors of the regression coefficients"""
        return self.bse_
    # /////////////////////////////////////////////////////////////////////////
    # NOTE tabulate does not seem to be maintained anymore. Should be able to
    # replace this with `from statsmodels.iolib.summary import Summary`
    def summary(self, xname=None, tablefmt="simple"):
        """
        Prints a summary table.

        Parameters
        ----------
        xname : `list` [`str`] or `None`, default `None`
            Names for the X variables. Default is x1, x2, ... Must match the
            number of parameters in the model.
        tablefmt : `str`
            `tabulate` table format for output. Please see the documentation
            for `tabulate` for options.
        """
        check_is_fitted(self, "coef_")
        if xname and len(xname) != len(self.coef_):
            raise ValueError(
                f"Length of xname ({len(xname)}) does not match the number of "
                f"parameters in the model ({len(self.coef_)})"
            )
        if not xname:
            xname = [f"x{i}" for i in range(1, len(self.coef_) + 1)]
        else:
            # copy so the caller's list is never mutated (previously
            # "Intercept" was appended to the caller's list and popped after)
            xname = list(xname)
        coef = list(self.coef_)
        if self.fit_intercept:
            xname.append("Intercept")
            coef.append(self.intercept_)
        headers = [
            "",
            "coef",
            "std err",
            f"[{self.alpha/2}",
            f"{1-self.alpha/2}]",
            "p-value",
        ]
        table = zip(xname, coef, self.bse_, self.ci_[:, 0], self.ci_[:, 1], self.pvals_)
        table = tabulate(table, headers, tablefmt=tablefmt)
        table += "\n\n"
        table += f"Log-Likelihood: {round(self.loglik_, 4)}\n"
        table += f"Newton-Raphson iterations: {self.n_iter_}\n"
        print(table)
        return
    # /////////////////////////////////////////////////////////////////////////
    def decision_function(self, X):
        """
        Returns the linear predictors (including the intercept).

        Parameters
        ----------
        X : `np.ndarray`
            A user supplied design matrix, should have the same number of
            variables used to derive the fitted model.

        Notes
        -----
        The returned value is on the logit(risk) scale.
        """
        # validation
        check_is_fitted(self, "coef_")
        # Validate X at predict time
        X = check_array(
            X, dtype=np.float64, accept_sparse=False, force_all_finite=True
        )
        scores = X @ self.coef_ + self.intercept_
        return scores
    # /////////////////////////////////////////////////////////////////////////
    # NOTE this can be expanded to include a user defined threshold instead of
    # the hard-coded zero.
    def predict(self, X):
        """
        Returns predicted values based on a design matrix.

        Parameters
        ----------
        X : `np.ndarray`
            A user supplied design matrix, should have the same number of
            variables used to derive the fitted model.

        Notes
        -----
        Returns 0 or 1 depending on whether the logit(risk) is larger than
        zero.
        """
        decision = self.decision_function(X)
        if len(decision.shape) == 1:
            indices = (decision > 0).astype(int)
        else:
            indices = decision.argmax(axis=1)
        return self.classes_[indices]
    # /////////////////////////////////////////////////////////////////////////
    def predict_proba(self, X):
        """
        Returns the predicted probability.

        Parameters
        ----------
        X : `np.ndarray`
            A user supplied design matrix, should have the same number of
            variables used to derive the fitted model.
        """
        decision = self.decision_function(X)
        if decision.ndim == 1:
            # two columns: P(class 0) = expit(-z), P(class 1) = expit(z)
            decision = np.c_[-decision, decision]
        proba = expit(decision)
        return proba
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def _firth_newton_raphson(X, y, max_iter, max_stepsize, max_halfstep, tol,
                          mask=None):
    """
    Fit Firth-penalised logistic regression via Newton-Raphson.

    See the `logistf` reference manual for an explanation of the procedure.

    Parameters
    ----------
    X : `np.ndarray`
        The design matrix, n samples by k variables.
    y : `np.ndarray`
        The binary outcome, coded 0/1.
    max_iter : `int`
        Maximum number of Newton-Raphson iterations.
    max_stepsize : `int`
        The largest absolute coefficient change allowed per iteration.
    max_halfstep : `int`
        Maximum number of step-halvings within one iteration.
    tol : `float`
        Convergence tolerance on the norm of the coefficient update.
    mask : `int` or array-like of `int`, optional
        Column index (or indices) zeroed out of the weighted design matrix,
        used to fit the null model for penalised likelihood ratio tests.

    Returns
    -------
    tuple
        ``(coef, loglik, n_iter)``, where ``loglik`` is the penalised
        log-likelihood at ``coef``.
    """
    coef = np.zeros(X.shape[1])
    # evaluate the objective at the starting point so the fall-through
    # return below is well-defined even when max_iter < 1 (previously this
    # raised NameError on `loglike_new`)
    loglike_new = _loglikelihood(X, y, expit(X @ coef))
    for iteration in range(1, max_iter + 1):
        preds = expit(X @ coef)
        XW = _get_XW(X, preds, mask)
        fisher_info_mtx = XW.T @ XW
        hat = _hat_diag(XW)
        # Firth-modified score function
        U_star = np.matmul(X.T, y - preds + np.multiply(hat, 0.5 - preds))
        step_size = np.linalg.lstsq(fisher_info_mtx, U_star, rcond=None)[0]
        # restrict each coefficient update to at most max_stepsize
        mx = np.max(np.abs(step_size)) / max_stepsize
        if mx > 1:
            step_size = step_size / mx
        coef_new = coef + step_size
        preds_new = expit(X @ coef_new)
        loglike = _loglikelihood(X, y, preds)
        loglike_new = _loglikelihood(X, y, preds_new)
        # step-halving: _loglikelihood returns the NEGATIVE log-likelihood,
        # so a larger value means a worse fit
        steps = 0
        while loglike < loglike_new:
            step_size *= 0.5
            coef_new = coef + step_size
            preds_new = expit(X @ coef_new)
            loglike_new = _loglikelihood(X, y, preds_new)
            steps += 1
            if steps == max_halfstep:
                warning_msg = "Step-halving failed to converge."
                warnings.warn(warning_msg, ConvergenceWarning, stacklevel=2)
                return coef_new, -loglike_new, iteration
        if iteration > 1 and np.linalg.norm(coef_new - coef) < tol:
            return coef_new, -loglike_new, iteration
        coef += step_size
    warning_msg = (
        "Firth logistic regression failed to converge. Try increasing max_iter."
    )
    warnings.warn(warning_msg, ConvergenceWarning, stacklevel=2)
    # negate because _loglikelihood returns the negative log-likelihood
    return coef, -loglike_new, max_iter
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def _loglikelihood(X, y, preds, penalized:bool=True,):
"""
Estimate the negative log-likelihood.
Parameters
----------
X : `np.ndarray`
The design matrix.
y : `np.ndarray`
The outcome.
preds : `np.ndarray`
The linear predictors.
penalized : `bool`, default `True`
Whether to calculate the penalised or regular log likelihood.
"""
# penalized log-likelihood
XW = _get_XW(X, preds)
fisher_info_mtx = XW.T @ XW
penalty = 0.5 * np.log(np.linalg.det(fisher_info_mtx))
loglik = -1 * (np.sum(y * np.log(preds) + (1 - y) * np.log(1 - preds)) + penalty)
if penalized == False:
loglik = -1 * np.sum(y * np.log(preds) + (1 - y) * np.log(1 - preds))
return loglik
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def _get_XW(X, preds, mask=None):
# mask is 1-indexed because 0 == None
rootW = np.sqrt(preds * (1 - preds))
XW = rootW[:, np.newaxis] * X
# is this equivalent??
# https://github.com/georgheinze/logistf/blob/master/src/logistf.c#L150-L159
if mask is not None:
XW[:, mask] = 0
return XW
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def _get_aug_XW(X, preds, hats):
rootW = np.sqrt(preds * (1 - preds) * (1 + hats))
XW = rootW[:, np.newaxis] * X
return XW
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def _hat_diag(XW):
# Get diagonal elements of the hat matrix
qr, tau, _, _ = lapack.dgeqrf(XW, overwrite_a=True)
Q, _, _ = lapack.dorgqr(qr, tau, overwrite_a=True)
hat = np.einsum("ij,ij->i", Q, Q)
return hat
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def _bse(X, coefs):
    """
    Standard errors of the coefficients.

    These are the square roots of the diagonal of the covariance matrix,
    i.e. the inverse of the observed Fisher information matrix — matching
    ``diag(object$var) ^ 0.5`` in logistf.
    https://stats.stackexchange.com/q/68080/343314
    """
    probs = expit(X @ coefs)
    weighted = _get_XW(X, probs)
    information = weighted.T @ weighted
    covariance = np.linalg.pinv(information)
    return np.sqrt(np.diag(covariance))
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def _penalized_lrt(
    full_loglik, X, y, max_iter, max_stepsize, max_halfstep, tol, test_vars,
):
    """
    Penalised likelihood ratio test p-values.

    For each tested variable, the model is refit with that variable's
    contribution removed (the null model) and compared against the full
    model's penalised log-likelihood via a 1-df chi-squared test. When only
    a subset of variables is tested, untested entries are NaN.
    """
    if test_vars is None:
        indices = list(range(X.shape[1]))
    elif isinstance(test_vars, int):  # single index
        indices = [test_vars]
    else:  # list, tuple, or set of indices
        indices = sorted(test_vars)
    pvals = []
    for var_idx in indices:
        _, null_loglik, _ = _firth_newton_raphson(
            X,
            y,
            max_iter,
            max_stepsize,
            max_halfstep,
            tol,
            var_idx,
        )
        pvals.append(_lrt(full_loglik, null_loglik))
    if len(pvals) == X.shape[1]:
        return np.array(pvals)
    # subset tested: pad untested positions with NaN
    padded = np.full(X.shape[1], np.nan)
    for position, var_idx in enumerate(indices):
        padded[var_idx] = pvals[position]
    return padded
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def _lrt(full_loglik, null_loglik):
# in logistf: 1-pchisq(2*(fit.full$loglik-fit.i$loglik),1)
lr_stat = 2 * (full_loglik - null_loglik)
p_value = chi2.sf(lr_stat, df=1)
return p_value
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def _predict(X, coef):
preds = expit(X @ coef)
np.clip(preds, a_min=1e-15, a_max=1 - 1e-15, out=preds)
return preds
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def _profile_likelihood_ci(
    X,
    y,
    fitted_coef,
    full_loglik,
    max_iter,
    max_stepsize,
    max_halfstep,
    tol,
    alpha,
    test_vars,
):
    """
    Profile-likelihood confidence intervals for the fitted coefficients.

    For each tested coefficient and each side (lower/upper), searches for
    the coefficient value at which the penalised log-likelihood drops to
    ``LL0 = full_loglik - chi2.ppf(1 - alpha, 1) / 2``, via the modified
    Newton-Raphson scheme of logistf (see the linked C source). Bounds that
    fail to converge within ``max_iter`` iterations are reported as NaN
    with a ConvergenceWarning. When ``test_vars`` selects a subset of
    coefficients, untested rows of the returned (k, 2) array are NaN.
    """
    # target penalised log-likelihood at the CI boundary
    LL0 = full_loglik - chi2.ppf(1 - alpha, 1) / 2
    lower_bound = []
    upper_bound = []
    if test_vars is None:
        test_var_indices = range(fitted_coef.shape[0])
    elif isinstance(test_vars, int):  # single index
        test_var_indices = [test_vars]
    else:  # list, tuple, or set of indices
        test_var_indices = sorted(test_vars)
    # side == -1 searches for the lower bound, side == +1 for the upper
    for side in [-1, 1]:
        # for coef_idx in range(fitted_coef.shape[0]):
        for coef_idx in test_var_indices:
            # restart from the fitted point for every bound
            coef = deepcopy(fitted_coef)
            for iter in range(1, max_iter + 1):
                preds = _predict(X, coef)
                # NOTE taking the negative because _loglikelihood return the
                # negative loglik
                loglike = -_loglikelihood(X, y, preds)
                XW = _get_XW(X, preds)
                hat = _hat_diag(XW)
                XW = _get_aug_XW(X, preds, hat)  # augmented data using hat diag
                fisher_info_mtx = XW.T @ XW
                U_star = np.matmul(X.T, y - preds + np.multiply(hat, 0.5 - preds))
                # https://github.com/georgheinze/logistf/blob/master/src/logistf.c#L780-L781
                inv_fisher = np.linalg.pinv(fisher_info_mtx)
                tmp1x1 = U_star @ np.negative(inv_fisher) @ U_star
                underRoot = (
                    -2
                    * ((LL0 - loglike) + 0.5 * tmp1x1)
                    / (inv_fisher[coef_idx, coef_idx])
                )
                # lambda_ perturbs the score so the update walks along the
                # profile toward LL0 on the requested side
                lambda_ = 0 if underRoot < 0 else side * sqrt(underRoot)
                U_star[coef_idx] += lambda_
                step_size = np.linalg.lstsq(fisher_info_mtx, U_star, rcond=None)[0]
                mx = np.max(np.abs(step_size)) / max_stepsize
                if mx > 1:
                    step_size = step_size / mx  # restrict to max_stepsize
                coef += step_size
                loglike_old = deepcopy(loglike)
                # step-halving toward LL0 from the feasible side
                # NOTE(review): when max_halfstep == 0 this loop never runs,
                # so the convergence test below uses `loglike` evaluated
                # BEFORE the coef update (one iteration stale) — confirm
                # this matches the upstream firthlogist/logistf behavior
                for halfs in range(1, max_halfstep + 1):
                    preds = _predict(X, coef)
                    # NOTE taking the negative because _loglikelihood return
                    # the negative loglik
                    loglike = -_loglikelihood(X, y, preds)
                    if (abs(loglike - LL0) < abs(loglike_old - LL0)) and loglike > LL0:
                        break
                    step_size *= 0.5
                    coef -= step_size
                if abs(loglike - LL0) <= tol:
                    if side == -1:
                        lower_bound.append(coef[coef_idx])
                    else:
                        upper_bound.append(coef[coef_idx])
                    break
            # reached only when the iteration budget is exhausted without
            # hitting LL0 within tol: record NaN and warn
            if abs(loglike - LL0) > tol:
                if side == -1:
                    lower_bound.append(np.nan)
                else:
                    upper_bound.append(np.nan)
                warning_msg = (
                    f"Non-converged PL confidence limits - max number of "
                    f"iterations exceeded for variable x{coef_idx}. Try "
                    f"increasing pl_max_iter."
                )
                warnings.warn(warning_msg, ConvergenceWarning, stacklevel=2)
    bounds = np.column_stack([lower_bound, upper_bound])
    # subset tested: scatter the computed bounds into a NaN-padded array
    if len(lower_bound) < fitted_coef.shape[0]:
        ci = np.full([fitted_coef.shape[0], 2], np.nan)
        for idx, test_var_idx in enumerate(test_var_indices):
            ci[test_var_idx] = bounds[idx]
        # return ci
        return ci
    # return bounds
    return bounds
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def _wald_ci(coef, bse, alpha):
lower_ci = coef + norm.ppf(alpha / 2) * bse
upper_ci = coef + norm.ppf(1 - alpha / 2) * bse
return np.column_stack([lower_ci, upper_ci])
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def _wald_test(coef, bse):
# 1 - pchisq((beta^2/vars), 1), in our case bse = vars^0.5
return chi2.sf(np.square(coef) / np.square(bse), 1)