import numpy as np
class PinballLoss():
    '''
    Pinball or Quantile loss function.
    Under-predictions are weighted by the quantile level, over-predictions
    by (1 - quantile), so minimizing it yields the conditional quantile.
    '''
    def __init__(self, quantile):
        '''
        INPUT:
        - quantile: float,
            the quantile to compute the loss function
        '''
        super().__init__()
        self.quantile = quantile

    def __call__(self, y_pred, y_true):
        '''
        Compute the mean pinball loss.
        INPUT:
        - y_pred: numpy array,
            the predicted values
        - y_true: numpy array,
            the true values
        OUTPUT:
        - loss: float,
            the mean pinball loss
        RAISES:
        - ValueError, if the shapes of y_pred and y_true do not match
        '''
        # Check consistency in the dimensions: promote 1d arrays to column vectors
        if len(y_pred.shape) == 1:
            y_pred = y_pred.reshape(-1, 1)
        if len(y_true.shape) == 1:
            y_true = y_true.reshape(-1, 1)
        if y_pred.shape != y_true.shape:
            raise ValueError(f'Dimensions of y_pred ({y_pred.shape}) and y_true ({y_true.shape}) do not match!!!')
        # Compute the pinball loss: positive errors (under-prediction) weighted
        # by quantile, negative errors by (quantile - 1) to keep the loss >= 0
        error = y_true - y_pred
        loss = np.where(error >= 0, self.quantile * error, (self.quantile - 1) * error)
        return np.mean(loss)
class barrera_loss():
    '''
    Barrera loss function: squared error between the spread ES - VaR and the
    rescaled exceedance residual, used to evaluate Expected Shortfall forecasts.
    '''
    def __init__(self, theta, ret_mean=True):
        '''
        INPUT:
        - theta: float,
            the threshold for the loss function
        - ret_mean: bool,
            if True, the function returns the mean of the loss. Default is True
        '''
        self.theta = theta
        self.ret_mean = ret_mean

    def __call__(self, v_, e_, y_):
        '''
        INPUT:
        - v_: numpy array,
            the quantile estimate
        - e_: numpy array,
            the expected shortfall estimate
        - y_: numpy array,
            the actual time series
        OUTPUT:
        - loss: float,
            the loss function mean value, if ret_mean is True. Otherwise, the loss for each observation
        '''
        v, e, y = v_.flatten(), e_.flatten(), y_.flatten()
        r = e - v  # Barrera loss is computed on the difference ES - VaR
        # Exceedance residual (y - VaR)/theta on violation days, 0 elsewhere
        per_obs = (r - np.where(y < v, (y - v) / self.theta, 0))**2
        # nanmean skips observations where the loss is undefined (NaN inputs)
        return np.nanmean(per_obs) if self.ret_mean else per_obs
class patton_loss():
    '''
    Patton loss function (FZ0-type joint scoring rule for VaR and ES).
    Requires the ES estimates to be strictly negative (log(-e) is taken).
    '''
    def __init__(self, theta, ret_mean=True):
        '''
        INPUT:
        - theta: float,
            the threshold for the loss function
        - ret_mean: bool,
            if True, the function returns the mean of the loss. Default is True
        '''
        self.theta = theta
        self.ret_mean = ret_mean

    def __call__(self, v_, e_, y_):
        '''
        INPUT:
        - v_: numpy array,
            the quantile estimate
        - e_: numpy array,
            the expected shortfall estimate
        - y_: numpy array,
            the actual time series
        OUTPUT:
        - loss: float,
            the loss function mean value, if ret_mean is True. Otherwise, the loss for each observation
        '''
        # Rescale to percentage units; presumably for numerical stability of the
        # log terms (note: this loss is not scale invariant) - TODO confirm
        v, e, y = v_.flatten() * 100, e_.flatten() * 100, y_.flatten() * 100
        per_obs = np.where(y <= v, (y - v) / (self.theta * e), 0) + v / e + np.log(-e) - 1
        # nanmean skips observations where the loss is undefined (NaN inputs)
        return np.nanmean(per_obs) if self.ret_mean else per_obs
class DMtest():
    '''
    Diebold-Mariano test for the equality of forecast accuracy, with the
    Harvey-Leybourne-Newbold small-sample adjustment.
    '''
    def __init__(self, loss_func, h=1):
        '''
        INPUT:
        - loss_func: callable,
            the loss function loss_func(Q, E, Y) used to compute the
            per-observation forecast accuracy
        - h: int,
            the maximum lag to compute the autocovariance. Default is 1
        '''
        self.loss_func = loss_func
        self.h = h

    def autocovariance(self, Xi, T, k, Xs):
        '''
        Compute the lag-k autocovariance of a time series.
        INPUT:
        - Xi: numpy array,
            the time series
        - T: int,
            the length of the time series
        - k: int,
            the lag
        - Xs: float,
            the mean of the time series
        OUTPUT:
        - autoCov: float,
            the autocovariance
        '''
        # Biased estimator: the sum of T-k terms is normalized by T
        autoCov = 0
        for i in np.arange(0, T - k):
            autoCov += (Xi[i + k] - Xs) * (Xi[i] - Xs)
        return (1 / T) * autoCov

    def __call__(self, Q1, E1, Q2, E2, Y):
        '''
        INPUT:
        - Q1: numpy array,
            the first set of quantile predictions
        - E1: numpy array,
            the first set of expected shortfall predictions
        - Q2: numpy array,
            the second set of quantile predictions
        - E2: numpy array,
            the second set of expected shortfall predictions
        - Y: numpy array,
            the actual time series
        OUTPUT:
        - stat: float,
            the test statistic
        - p_value: float,
            the p-value of the test
        - mean_difference: float,
            the mean difference of the losses
        '''
        import warnings
        from scipy.stats import t
        # Compute per-observation losses of the two forecasters
        e1_lst = self.loss_func(Q1.flatten(), E1.flatten(), Y.flatten())
        e2_lst = self.loss_func(Q2.flatten(), E2.flatten(), Y.flatten())
        d_lst = e1_lst - e2_lst
        # Clean NaN values, if any
        n = len(d_lst)
        d_lst = d_lst[~np.isnan(d_lst)]
        T = len(d_lst)
        if T < n:
            warnings.warn('There are NaN in the population! They have been removed.', UserWarning)
        if T == 0:
            # Degenerate case: attribute an infinite mean difference to the
            # side whose losses are entirely NaN
            warnings.warn('All values are NaN!', UserWarning)
            if np.sum(np.isnan(e1_lst)) == n:
                return {'stat': np.nan, 'p_value': 0, 'mean_difference': np.inf}
            if np.sum(np.isnan(e2_lst)) == n:
                return {'stat': np.nan, 'p_value': 0, 'mean_difference': -np.inf}
            else:
                return {'stat': np.nan, 'p_value': 0, 'mean_difference': 0}
        else:
            mean_d = np.mean(d_lst)
            # Long-run variance of the loss differential from the
            # autocovariances up to lag h-1
            gamma = [self.autocovariance(d_lst, T, lag, mean_d) for lag in range(0, self.h)]
            V_d = (gamma[0] + 2 * np.sum(gamma[1:])) / T
            DM_stat = mean_d / np.sqrt(V_d)
            # Harvey-Leybourne-Newbold small-sample correction
            harvey_adj = np.sqrt((T + 1 - 2 * self.h + self.h * (self.h - 1) / T) / T)
            DM_stat *= harvey_adj
            # Two-sided p-value from a Student-t with T-1 degrees of freedom
            p_value = 2 * t.cdf(-abs(DM_stat), df=T - 1)
            return {'stat': DM_stat, 'p_value': p_value, 'mean_difference': mean_d}
def cr_t_test(errorsA, errorsB, train_len, test_len):
    '''
    Corrected resampled t-test (Nadeau-Bengio) for the equality of forecast
    accuracy across J cross-validation folds.
    INPUT:
    - errorsA: numpy array,
        the first set of forecast errors (one value per fold)
    - errorsB: numpy array,
        the second set of forecast errors (one value per fold)
    - train_len: int,
        the length of the training set
    - test_len: int,
        the length of the test set
    OUTPUT:
    - stat: float,
        the test statistic
    - p_value: float,
        the p-value of the test
    RAISES:
    - ValueError, if the two error samples have different lengths
    '''
    from scipy.stats import t as stud_t
    J = len(errorsA)  # Number of folds
    if J != len(errorsB):
        raise ValueError('Both samples must have the same length!')
    if isinstance(errorsA, list):
        errorsA = np.array(errorsA)
    if isinstance(errorsB, list):
        errorsB = np.array(errorsB)
    mu_j = errorsA - errorsB  # Vector of difference of generalization errors
    mu_hat = np.mean(mu_j)  # Mean of the difference of generalization errors
    S2 = np.sum((mu_j - mu_hat)**2) / (J - 1)  # In-sample variance
    # Variance corrected for the train/test overlap across folds
    sigma2 = (1 / J + test_len / train_len) * S2
    # BUG FIX: the statistic must be computed before building the result
    # (the original indexed an empty dict, raising KeyError on every call)
    stat = mu_hat / np.sqrt(sigma2)
    return {'stat': stat, 'p_value': stud_t.cdf(stat, J - 1)}
class bootstrap_mean_test():
    '''
    Bootstrap test for the mean of a sample.
    The null distribution of the studentized mean is approximated by
    resampling the data with replacement and centering on the observed mean.
    '''
    def __init__(self, mu_target, one_side=False, n_boot=10_000):
        '''
        INPUT:
        - mu_target: float,
            the mean to test against
        - one_side: bool,
            if True, the test is one sided (i.e. H0: mu >= mu_target). Default is False
        - n_boot: int,
            the number of bootstrap replications. Default is 10_000
        '''
        self.mu_target = mu_target
        self.one_side = one_side
        self.n_boot = n_boot

    def null_statistic(self, B_data):
        '''
        Compute the null statistic for a bootstrap sample: the studentized
        mean centered on the observed sample mean (i.e. under H0).
        INPUT:
        - B_data: numpy array,
            the bootstrap sample
        OUTPUT:
        - stat: float,
            the null statistic
        '''
        return (np.mean(B_data) - self.obs_mean) * np.sqrt(self.n) / np.std(B_data)

    def statistic(self, data):
        '''
        Compute the test statistic for the original sample against mu_target.
        INPUT:
        - data: numpy array,
            the original sample
        OUTPUT:
        - stat: float,
            the test statistic
        '''
        return (self.obs_mean - self.mu_target) * np.sqrt(self.n) / np.std(data)

    def __call__(self, data, seed=None):
        '''
        Compute the test.
        INPUT:
        - data: numpy array,
            the original sample
        - seed: int,
            the seed for the random number generator. Default is None
        OUTPUT:
        - statistic: float,
            the test statistic
        - p_value: float,
            the p-value of the test
        '''
        np.random.seed(seed)
        self.obs_mean = np.mean(data)
        self.n = len(data)
        # Bootstrap distribution of the null statistic
        B_stats = np.array([
            self.null_statistic(np.random.choice(data, size=self.n, replace=True))
            for _ in range(self.n_boot)
        ])
        self.B_stats = B_stats  # kept for later inspection
        obs = self.statistic(data)
        if self.one_side:
            # One-sided: reject H0 (mu >= mu_target) for small statistics
            return {'statistic': obs, 'p_value': np.mean(B_stats < obs)}
        else:
            # Two-sided: mass of the null distribution beyond |obs| in either tail
            abs_obs = np.abs(obs)
            return {'statistic': obs,
                    'p_value': np.mean((B_stats > abs_obs) | (B_stats < -abs_obs))}
class McneilFrey_test(bootstrap_mean_test):
    '''
    McNeil-Frey test for assessing the goodness of the Expected Shortfall
    estimate, as a bootstrap test that the transformed residuals have mean 0.
    '''
    def __init__(self, one_side=False, n_boot=10_000):
        '''
        INPUT:
        - one_side: bool,
            if True, the test is one sided (i.e. H0: mu >= mu_target). Default is False
        - n_boot: int,
            the number of bootstrap replications. Default is 10_000
        '''
        # Under H0 the McNeil-Frey residuals have zero mean
        super().__init__(0, one_side, n_boot)

    def __call__(self, Q, E, Y, seed=None):
        '''
        Compute the test.
        INPUT:
        - Q: numpy array,
            the quantile estimates
        - E: numpy array,
            the expected shortfall estimates
        - Y: numpy array,
            the actual time series
        - seed: int,
            the seed for the random number generator. Default is None
        OUTPUT:
        - statistic: float,
            the test statistic
        - p_value: float,
            the p-value of the test
        '''
        # NOTE(review): mnf_transform is not defined in this chunk of the
        # module; presumably provided elsewhere (mixin or subclass) - confirm
        return super().__call__(self.mnf_transform(Q, E, Y).flatten(), seed)
class AS14_test(bootstrap_mean_test):
    '''
    Acerbi-Szekely (2014) test for assessing the goodness of the Expected
    Shortfall estimate, as a bootstrap test that the transformed sample has
    mean -1 (its expected value under the null hypothesis).
    '''
    def __init__(self, one_side=False, n_boot=10_000):
        '''
        INPUT:
        - one_side: bool,
            if True, the test is one sided (i.e. H0: mu >= mu_target). Default is False
        - n_boot: int,
            the number of bootstrap replications. Default is 10_000
        '''
        # Under H0 the Acerbi-Szekely statistic has expected value -1
        super().__init__(-1, one_side, n_boot)

    def __call__(self, Q, E, Y, theta, test_type='Z1', seed=None):
        '''
        Compute the test.
        INPUT:
        - Q: numpy array,
            the quantile estimates
        - E: numpy array,
            the expected shortfall estimates
        - Y: numpy array,
            the actual time series
        - theta: float,
            the threshold for the test
        - test_type: str,
            the type of test to perform. It must be either 'Z1' or 'Z2'. Default is 'Z1'
        - seed: int,
            the seed for the random number generator. Default is None
        OUTPUT:
        - statistic: float,
            the test statistic
        - p_value: float,
            the p-value of the test
        '''
        # NOTE(review): as14_transform is not defined in this chunk of the
        # module; presumably provided elsewhere (mixin or subclass) - confirm
        return super().__call__(self.as14_transform(test_type, Q, E, Y, theta).flatten(), seed)
class LossDiff_test(bootstrap_mean_test):
    '''
    Loss-difference test to assess whenever the first sample of losses is
    statistically lower than the second (one-sided bootstrap test on the
    mean of the loss differential).
    '''
    def __init__(self, loss, n_boot=10_000):
        '''
        INPUT:
        - loss: callable,
            the loss function to compute the forecast accuracy
        - n_boot: int,
            the number of bootstrap replications. Default is 10_000
        '''
        # One-sided test of H0: mean loss difference >= 0
        super().__init__(0, True, n_boot)
        self.loss = loss

    def __call__(self, Q_new, E_new, Q_bench, E_bench, Y, seed=None):
        '''
        Compute the test.
        INPUT:
        - Q_new: numpy array,
            the first set of quantile predictions
        - E_new: numpy array,
            the first set of expected shortfall predictions
        - Q_bench: numpy array,
            the second set of quantile predictions
        - E_bench: numpy array,
            the second set of expected shortfall predictions
        - Y: numpy array,
            the actual time series
        - seed: int,
            the seed for the random number generator. Default is None
        OUTPUT:
        - statistic: float,
            the test statistic
        - p_value: float,
            the p-value of the test
        '''
        # NOTE(review): ld_transform is not defined in this chunk of the
        # module; presumably provided elsewhere (mixin or subclass) - confirm
        return super().__call__(self.ld_transform(
            Q_new, E_new, Q_bench, E_bench, Y).flatten(), seed)
class Encompassing_test(bootstrap_mean_test):
    '''
    Encompassing test to assess whenever the first sample of losses is
    statistically lower than the second.
    '''
    def __init__(self, loss, n_boot=10_000):
        '''
        INPUT:
        - loss: callable,
            the loss function to compute the forecast accuracy
        - n_boot: int,
            the number of bootstrap replications. Default is 10_000
        '''
        # One-sided test of H0: mean transformed loss >= 0
        super().__init__(0, True, n_boot)
        self.loss = loss

    def __call__(self, Q_new, E_new, Q_bench, E_bench, Y, seed=None):
        '''
        Compute the test.
        INPUT:
        - Q_new: numpy array,
            the first set of quantile predictions
        - E_new: numpy array,
            the first set of expected shortfall predictions
        - Q_bench: numpy array,
            the second set of quantile predictions
        - E_bench: numpy array,
            the second set of expected shortfall predictions
        - Y: numpy array,
            the actual time series
        - seed: int,
            the seed for the random number generator. Default is None
        OUTPUT:
        - statistic: float,
            the test statistic
        - p_value: float,
            the p-value of the test
        '''
        # NOTE(review): en_transform is not defined in this chunk of the
        # module; presumably provided elsewhere (mixin or subclass) - confirm
        return super().__call__(self.en_transform(
            Q_new, E_new, Q_bench, E_bench, Y).flatten(), seed)
[docs]
def christoffersen_cc_test(violations, theta):
'''
Christoffersen (1998) conditional coverage test for VaR backtesting.
Tests both unconditional coverage (correct violation rate) and independence
(no clustering of violations). The test statistic is LR_CC = LR_UC + LR_IND,
distributed as chi-squared with 2 degrees of freedom under the null.
INPUT:
- violations: numpy array,
binary array where 1 indicates a VaR violation at time t
- theta: float,
the nominal coverage level (e.g., 0.025 for 2.5% VaR)
OUTPUT:
- dict with keys:
- 'LR_UC': float, unconditional coverage test statistic
- 'LR_IND': float, independence test statistic
- 'LR_CC': float, conditional coverage test statistic
- 'p_value_UC': float, p-value for unconditional coverage
- 'p_value_IND': float, p-value for independence
- 'p_value_CC': float, p-value for conditional coverage
- 'violation_rate': float, observed violation rate
'''
from scipy.stats import chi2
# Flatten and convert to binary array
violations = violations.flatten().astype(int)
T = len(violations)
n_violations = np.sum(violations)
violation_rate = n_violations / T
# Unconditional Coverage Test (Kupiec 1995)
if n_violations == 0 or n_violations == T:
# Edge case: no violations or all violations
LR_UC = np.inf if (n_violations == 0 and theta > 0) or (n_violations == T and theta < 1) else 0
p_value_UC = 0 if LR_UC == np.inf else 1
else:
LR_UC = -2 * (
n_violations * np.log(theta) +
(T - n_violations) * np.log(1 - theta) -
n_violations * np.log(violation_rate) -
(T - n_violations) * np.log(1 - violation_rate)
)
p_value_UC = 1 - chi2.cdf(LR_UC, df=1)
# Independence Test (Christoffersen 1998)
# Count transitions: n_ij = number of times state i is followed by state j
# States: 0 = no violation, 1 = violation
n_00 = np.sum((violations[:-1] == 0) & (violations[1:] == 0))
n_01 = np.sum((violations[:-1] == 0) & (violations[1:] == 1))
n_10 = np.sum((violations[:-1] == 1) & (violations[1:] == 0))
n_11 = np.sum((violations[:-1] == 1) & (violations[1:] == 1))
# Transition probabilities
# pi_0 = P(violation at t | no violation at t-1)
# pi_1 = P(violation at t | violation at t-1)
if n_00 + n_01 == 0:
pi_0 = 0
else:
pi_0 = n_01 / (n_00 + n_01)
if n_10 + n_11 == 0:
pi_1 = 0
else:
pi_1 = n_11 / (n_10 + n_11)
# Unconditional probability (pooled)
pi = (n_01 + n_11) / (T - 1)
# Likelihood ratio test for independence
# H0: pi_0 = pi_1 (violations are independent)
# H1: pi_0 ≠ pi_1 (violations are clustered)
if pi_0 == 0 and pi_1 == 0:
# No violations in the sequence
LR_IND = 0
p_value_IND = 1
elif pi == 0 or pi == 1:
# All same state
LR_IND = 0
p_value_IND = 1
else:
# Compute log-likelihood under H0 (independence)
LL_H0 = (n_00 + n_10) * np.log(1 - pi) + (n_01 + n_11) * np.log(pi)
# Compute log-likelihood under H1 (dependence)
LL_H1 = 0
if pi_0 > 0 and pi_0 < 1:
LL_H1 += n_00 * np.log(1 - pi_0) + n_01 * np.log(pi_0)
elif pi_0 == 0:
LL_H1 += n_00 * np.log(1 - 1e-10) # Avoid log(0)
else: # pi_0 == 1
LL_H1 += n_01 * np.log(1 - 1e-10)
if pi_1 > 0 and pi_1 < 1:
LL_H1 += n_10 * np.log(1 - pi_1) + n_11 * np.log(pi_1)
elif pi_1 == 0:
LL_H1 += n_10 * np.log(1 - 1e-10)
else: # pi_1 == 1
LL_H1 += n_11 * np.log(1 - 1e-10)
LR_IND = -2 * (LL_H0 - LL_H1)
p_value_IND = 1 - chi2.cdf(LR_IND, df=1)
# Conditional Coverage Test
LR_CC = LR_UC + LR_IND
p_value_CC = 1 - chi2.cdf(LR_CC, df=2)
return {
'LR_UC': LR_UC,
'LR_IND': LR_IND,
'LR_CC': LR_CC,
'p_value_UC': p_value_UC,
'p_value_IND': p_value_IND,
'p_value_CC': p_value_CC,
'violation_rate': violation_rate,
'n_violations': n_violations
}
def gaussian_tail_stats(theta, loc=0, scale=1):
    '''
    Compute the Value at Risk and Expected Shortfall for a Gaussian distribution.
    INPUT:
    - theta: float,
        the quantile to compute the statistics (lower tail, e.g. 0.05)
    - loc: numpy array or scalar,
        the mean of the distribution
    - scale: numpy array or scalar,
        the standard deviation of the distribution
    OUTPUT:
    - var: numpy array (or scalar, if the input was scalar),
        the Value at Risk
    - es: numpy array (or scalar, if the input was scalar),
        the Expected Shortfall
    RAISES:
    - ValueError, if loc and scale have different shapes
    '''
    from scipy.stats import norm
    # If working with scalar, convert to numpy array
    if isinstance(loc, (int, float)):
        loc = np.array([loc])
    if isinstance(scale, (int, float)):
        scale = np.array([scale])
    # Raise error if the dimensions do not match
    if loc.shape != scale.shape:
        raise ValueError(f'loc and scale must have the same dimensions!\nFound loc={loc.shape} and scale={scale.shape}')
    # BUG FIX: cast to float; with integer inputs the original allocated
    # integer output arrays (np.zeros_like) and silently truncated VaR/ES
    loc = loc.astype(float)
    scale = scale.astype(float)
    # Vectorized closed forms:
    # VaR_theta = loc + scale * z_theta
    # ES_theta  = loc - scale * phi(z_{1-theta}) / theta  (phi is symmetric)
    var = loc + scale * norm.ppf(theta)
    es = loc - scale * norm.pdf(norm.ppf(1 - theta)) / theta
    # If the input was a scalar, return scalars
    if len(var) == 1:
        return {'var': var[0], 'es': es[0]}
    else:
        return {'var': var, 'es': es}
def tstudent_tail_stats(theta, df, loc=0, scale=1):
    '''
    Compute the Value at Risk and Expected Shortfall for a Student's t distribution.
    INPUT:
    - theta: float,
        the quantile to compute the statistics (lower tail, e.g. 0.05)
    - df: int,
        the degrees of freedom of the distribution (must be > 1 for the ES to exist)
    - loc: numpy array or scalar,
        the mean of the distribution
    - scale: numpy array or scalar,
        the standard deviation of the distribution
    OUTPUT:
    - var: numpy array (or scalar, if the input was scalar),
        the Value at Risk
    - es: numpy array (or scalar, if the input was scalar),
        the Expected Shortfall
    RAISES:
    - ValueError, if loc and scale have different shapes
    '''
    from scipy.stats import t as t_dist
    from scipy.special import gamma as gamma_func
    # If working with scalar, convert to numpy array
    if isinstance(loc, (int, float)):
        loc = np.array([loc])
    if isinstance(scale, (int, float)):
        scale = np.array([scale])
    # Raise error if the dimensions do not match
    if loc.shape != scale.shape:
        raise ValueError('loc and scale must have the same dimensions!')
    # BUG FIX: cast to float; with integer inputs the original allocated
    # integer output arrays (np.zeros_like) and silently truncated VaR/ES
    loc = loc.astype(float)
    scale = scale.astype(float)
    # Normalizing constant of the standard t density
    cte = gamma_func((df + 1) / 2) / (np.sqrt(np.pi * df) * gamma_func(df / 2))
    # Standardized quantile and density value are loop invariants, so the
    # per-element loop of the original is replaced by vectorized arithmetic
    q = t_dist.ppf(theta, df=df, loc=0, scale=1)
    tau = cte * (1 + q**2 / df)**(-(1 + df) / 2)  # t density at q
    # Closed forms: ES_theta = loc - scale * (df + q^2) * f(q) / ((df - 1) * theta)
    es = loc - scale * (df + q**2) * tau / ((df - 1) * theta)
    var = loc + q * scale
    # If the input was a scalar, return scalars
    if len(var) == 1:
        return {'var': var[0], 'es': es[0]}
    else:
        return {'var': var, 'es': es}