"""
HAR-CAESar: Heterogeneous Autoregressive Conditional Autoregressive Expected Shortfall
This module implements the HAR-CAESar model, an extension of CAESar that incorporates
heterogeneous autoregressive dynamics based on the Heterogeneous Market Hypothesis
(Muller et al., 1997; Corsi, 2009).
The model includes daily, weekly (5-day), and monthly (22-day) return aggregates
to capture long-memory effects in tail risk.
Author: MSc Thesis Implementation
"""
import numpy as np
import multiprocessing as mp
from .caviar import CAViaR
import warnings
def compute_har_features(returns, weekly_window=5, monthly_window=22):
"""
Compute HAR (Heterogeneous Autoregressive) features from a return series.
Args:
returns (ndarray): Daily return series of length T.
weekly_window (int, optional): Number of days for weekly average. Default is 5.
monthly_window (int, optional): Number of days for monthly average. Default is 22.
Returns:
dict: HAR features with keys:
- 'daily': r_{t-1} (lagged returns)
- 'weekly': average of r_{t-1}, ..., r_{t-5}
- 'monthly': average of r_{t-1}, ..., r_{t-22}
Note:
For the first few observations where full windows are not available,
partial averages are computed using available data.
"""
T = len(returns)
# Daily: just lagged returns (r_{t-1} for forecasting r_t)
daily = np.zeros(T)
daily[1:] = returns[:-1]
daily[0] = returns[0] # Use first return as initial value
# Weekly: rolling 5-day average of lagged returns
weekly = np.zeros(T)
for t in range(T):
if t == 0:
weekly[t] = returns[0]
elif t < weekly_window:
weekly[t] = np.mean(returns[:t])
else:
weekly[t] = np.mean(returns[t-weekly_window:t])
# Monthly: rolling 22-day average of lagged returns
monthly = np.zeros(T)
for t in range(T):
if t == 0:
monthly[t] = returns[0]
elif t < monthly_window:
monthly[t] = np.mean(returns[:t])
else:
monthly[t] = np.mean(returns[t-monthly_window:t])
return {'daily': daily, 'weekly': weekly, 'monthly': monthly}
[docs]
class HAR_CAESar:
"""
HAR-CAESar: Heterogeneous Autoregressive CAESar for joint VaR and ES estimation.
This model extends the standard CAESar (Gatta et al., 2024) by incorporating
HAR-style dynamics that capture long-memory effects in tail risk.
The VaR equation is:
q_t = β₀ + β₁^(d) r_{t-1}^+ + β₂^(d) r_{t-1}^-
+ β₁^(w) r^(w)_{t-1}^+ + β₂^(w) r^(w)_{t-1}^-
+ β₁^(m) r^(m)_{t-1}^+ + β₂^(m) r^(m)_{t-1}^-
+ β_q q_{t-1} + β_e e_{t-1}
The ES equation has the same structure with γ coefficients.
Parameters per equation: 9 (intercept + 6 return coeffs + 2 AR terms)
Total parameters: 18
"""
def __init__(self, theta, lambdas=dict()):
"""
Initialize the HAR-CAESar model.
Args:
theta (float): Quantile level (e.g., 0.025 for 2.5% VaR).
lambdas (dict, optional): Penalty weights for soft constraints.
Keys: 'r' (ES residual), 'q' (VaR positivity), 'e' (monotonicity).
Default is {'r': 10, 'q': 10, 'e': 10}.
"""
self.theta = theta
self.lambdas = {'r': 10, 'q': 10, 'e': 10}
self.lambdas.update(lambdas)
# HAR-CAESar specific parameters
self.mdl_spec = 'HAR'
self.n_parameters = 9 # Per equation: intercept + 6 return + 2 AR
# Parameter indices for clarity
# VaR equation (indices 0-8):
# 0: intercept
# 1: daily positive, 2: daily negative
# 3: weekly positive, 4: weekly negative
# 5: monthly positive, 6: monthly negative
# 7: lagged q, 8: lagged e
# ES equation (indices 9-17): same structure
# Set the loop functions
self.loop = self.R_HARloop
self.joint_loop = self.Joint_HARloop
[docs]
def loss_function(self, v, r, y):
"""Compute the Barrera loss function for ES estimation (Step 2).
Args:
v (ndarray): Value at Risk forecast.
r (ndarray): Difference between ES forecast and VaR (r = e - q).
y (ndarray): Target return series.
Returns:
float: Barrera loss value.
"""
loss_val = np.mean(
(r - np.where(y < v, (y - v) / self.theta, 0)) ** 2
) + self.lambdas['r'] * np.mean(np.where(r > 0, r, 0))
return loss_val
[docs]
def joint_loss(self, v, e, y):
"""Compute the Fissler-Ziegel (Patton) loss function for joint estimation (Step 3).
Args:
v (ndarray): VaR forecast.
e (ndarray): ES forecast.
y (ndarray): Target return series.
Returns:
float: Fissler-Ziegel loss value.
"""
loss_val = np.mean(
np.where(y <= v, (y - v) / (self.theta * e), 0) + v / e + np.log(-e) - 1
) + self.lambdas['e'] * np.mean(np.where(e > v, e - v, 0)) + \
self.lambdas['q'] * np.mean(np.where(v > 0, v, 0))
return loss_val
[docs]
def ESloss(self, beta, y, q, r0, har_features):
"""Compute the Barrera loss for ES estimation.
Args:
beta (ndarray): Parameters for ES equation. Shape is (9,).
y (ndarray): Target return series.
q (ndarray): VaR forecast from Step 1.
r0 (float): Initial value for r = e - q.
har_features (dict): HAR features with keys 'daily', 'weekly', 'monthly'.
Returns:
float: Loss value.
"""
r = self.loop(beta, y, q, r0, har_features)
loss_val = self.loss_function(q, r, y)
return loss_val
[docs]
def Jointloss(self, beta, y, q0, e0, har_features):
"""Compute the Fissler-Ziegel loss for joint estimation.
Args:
beta (ndarray): Parameters for both equations. Shape is (18,).
y (ndarray): Target return series.
q0 (float): Initial VaR.
e0 (float): Initial ES.
har_features (dict): HAR features.
Returns:
float: Loss value.
"""
q, e = self.joint_loop(beta, y, q0, e0, har_features)
loss_val = self.joint_loss(q, e, y)
return loss_val
[docs]
def R_HARloop(self, beta, y, q, r0, har_features, pred_mode=False):
"""Recursive loop for ES residual (r = e - q) estimation with HAR specification."""
rd = har_features['daily']
rw = har_features['weekly']
rm = har_features['monthly']
# Precompute asymmetric coefficients
rd_coeff = np.where(rd > 0, beta[1], beta[2])
rw_coeff = np.where(rw > 0, beta[3], beta[4])
rm_coeff = np.where(rm > 0, beta[5], beta[6])
if pred_mode:
# In prediction mode, r0 = [last_r]
r = []
r.append(beta[0] +
rd_coeff[0] * rd[0] +
rw_coeff[0] * rw[0] +
rm_coeff[0] * rm[0] +
beta[7] * r0[1] + # lagged q from state
beta[8] * r0[2]) # lagged r from state
else:
r = [r0]
# Main loop
for t in range(1, len(y)):
r_t = (beta[0] +
rd_coeff[t] * rd[t] +
rw_coeff[t] * rw[t] +
rm_coeff[t] * rm[t] +
beta[7] * q[t-1] +
beta[8] * r[t-1])
r.append(r_t)
return np.array(r)
[docs]
def Joint_HARloop(self, beta, y, q0, e0, har_features, pred_mode=False):
"""Recursive loop for joint VaR and ES estimation with HAR specification."""
rd = har_features['daily']
rw = har_features['weekly']
rm = har_features['monthly']
# Precompute asymmetric coefficients for VaR equation
rd_coeff_q = np.where(rd > 0, beta[1], beta[2])
rw_coeff_q = np.where(rw > 0, beta[3], beta[4])
rm_coeff_q = np.where(rm > 0, beta[5], beta[6])
# Precompute asymmetric coefficients for ES equation
rd_coeff_e = np.where(rd > 0, beta[10], beta[11])
rw_coeff_e = np.where(rw > 0, beta[12], beta[13])
rm_coeff_e = np.where(rm > 0, beta[14], beta[15])
if pred_mode:
# In prediction mode, e0 = [y_history, q_history, e_history]
q = []
e = []
# First prediction uses lagged values from state
q.append(beta[0] +
rd_coeff_q[0] * rd[0] +
rw_coeff_q[0] * rw[0] +
rm_coeff_q[0] * rm[0] +
beta[7] * e0[1][-1] + # lagged q from state
beta[8] * e0[2][-1]) # lagged e from state
e.append(beta[9] +
rd_coeff_e[0] * rd[0] +
rw_coeff_e[0] * rw[0] +
rm_coeff_e[0] * rm[0] +
beta[16] * e0[1][-1] + # lagged q from state
beta[17] * e0[2][-1]) # lagged e from state
else:
q = [q0]
e = [e0]
# Main loop
for t in range(1, len(y)):
q_t = (beta[0] +
rd_coeff_q[t] * rd[t] +
rw_coeff_q[t] * rw[t] +
rm_coeff_q[t] * rm[t] +
beta[7] * q[t-1] +
beta[8] * e[t-1])
e_t = (beta[9] +
rd_coeff_e[t] * rd[t] +
rw_coeff_e[t] * rw[t] +
rm_coeff_e[t] * rm[t] +
beta[16] * q[t-1] +
beta[17] * e[t-1])
q.append(q_t)
e.append(e_t)
return np.array(q), np.array(e)
[docs]
def optim4mp(self, yi, qi, r0, beta0, n_rep, har_features, pipend):
"""Optimization routine for multiprocessing (Step 2: ES estimation)."""
from scipy.optimize import minimize
# First iteration
res = minimize(
lambda x: self.ESloss(x, yi, qi, r0, har_features), beta0,
method='SLSQP', options={'disp': False})
beta_worker, fval_worker, exitflag_worker = res.x, res.fun, int(res.success)
# Repeat until success or max iterations
for _ in range(n_rep):
res = minimize(
lambda x: self.ESloss(x, yi, qi, r0, har_features), beta_worker,
method='SLSQP', options={'disp': False})
beta_worker, fval_worker, exitflag_worker = res.x, res.fun, int(res.success)
if exitflag_worker == 1:
break
pipend.send((beta_worker, fval_worker, exitflag_worker))
[docs]
def joint_optim(self, yi, q0, e0, beta0, n_rep, har_features):
"""Joint optimization routine (Step 3)."""
from scipy.optimize import minimize
# First iteration
res = minimize(
lambda x: self.Jointloss(x, yi, q0, e0, har_features), beta0,
method='SLSQP', options={'disp': False})
beta_worker, exitflag_worker = res.x, int(res.success)
# Repeat until success or max iterations
for _ in range(n_rep):
res = minimize(
lambda x: self.Jointloss(x, yi, q0, e0, har_features), beta_worker,
method='SLSQP', options={'disp': False})
beta_worker, exitflag_worker = res.x, int(res.success)
if exitflag_worker == 1:
break
return beta_worker
[docs]
def fit(self, yi, seed=None, return_train=False, q0=None, nV=102, n_init=3, n_rep=5):
"""
Fit the HAR-CAESar model on a univariate return series.
Args:
yi (ndarray): Target return series.
seed (int, optional): Random seed for reproducibility.
return_train (bool, optional): If True, return in-sample predictions. Default is False.
q0 (list, optional): Initial [VaR, ES]. If None, computed from data.
nV (int, optional): Number of random initializations. Default is 102.
n_init (int, optional): Number of best initializations to refine. Default is 3.
n_rep (int, optional): Number of optimization repetitions. Default is 5.
Returns:
dict: Keys 'qi', 'ei', 'beta' when return_train=True, else None.
"""
warnings.simplefilter(action='ignore', category=RuntimeWarning)
if isinstance(yi, list):
yi = np.array(yi)
# Compute HAR features
self.har_features_train = compute_har_features(yi)
#-------------------- Step 0: CAViaR for initial VaR guess
# Use standard CAViaR (AS specification) for initial VaR
cav_res = CAViaR(self.theta, 'AS', p=1, u=1).fit(
yi, seed=seed, return_train=True, q0=q0)
qi, beta_cav = cav_res['qi'], cav_res['beta']
del cav_res
#-------------------- Step 1: Initialization
if isinstance(q0, type(None)):
n_emp = int(np.ceil(0.1 * len(yi)))
if round(n_emp * self.theta) == 0:
n_emp = len(yi)
y_sort = np.sort(yi[:n_emp])
quantile0 = int(round(n_emp * self.theta)) - 1
if quantile0 < 0:
quantile0 = 0
e0 = np.mean(y_sort[:quantile0 + 1])
q0_val = y_sort[quantile0]
else:
q0_val = q0[0]
e0 = q0[1]
#-------------------- Step 2: Initial guess for ES parameters
np.random.seed(seed)
nInitialVectors = [nV // 3, self.n_parameters]
beta0 = [np.random.uniform(0, 1, nInitialVectors)]
beta0.append(np.random.uniform(-1, 1, nInitialVectors))
beta0.append(np.random.randn(*nInitialVectors))
beta0 = np.concatenate(beta0, axis=0)
# Evaluate initial guesses
AEfval = np.empty(nV)
for i in range(nV):
AEfval[i] = self.ESloss(beta0[i, :], yi, qi, e0 - q0_val,
self.har_features_train)
beta0 = beta0[AEfval.argsort()][0:n_init]
#-------------------- Step 3: Optimization - Step I (Barrera loss)
beta = np.empty((n_init, self.n_parameters))
fval_beta = np.empty(n_init)
exitflag = np.empty(n_init)
# Multiprocessing
workers = []
for i in range(n_init):
parent_pipend, child_pipend = mp.Pipe()
worker = mp.Process(
target=self.optim4mp,
args=(yi, qi, e0 - q0_val, beta0[i, :], n_rep,
self.har_features_train, child_pipend))
workers.append([worker, parent_pipend])
worker.start()
# Gather results
for i, worker_list in enumerate(workers):
worker, parent_pipend = worker_list
beta_worker, fval_worker, exitflag_worker = parent_pipend.recv()
worker.join()
beta[i, :] = beta_worker
fval_beta[i] = fval_worker
exitflag[i] = exitflag_worker
ind_min = np.argmin(fval_beta)
self.beta_es = beta[ind_min, :]
#-------------------- Step 4: Optimization - Step II (Patton/FZ loss)
# Build initial joint parameters from CAViaR and ES estimates
# CAViaR AS has 5 parameters: [intercept, y_pos, y_neg, q_lag, e_lag placeholder]
# We need to map to HAR structure
# Initialize VaR parameters: use CAViaR for daily, zeros for weekly/monthly
beta_q_init = np.zeros(self.n_parameters)
beta_q_init[0] = beta_cav[0] # intercept
beta_q_init[1] = beta_cav[1] # daily positive
beta_q_init[2] = beta_cav[2] # daily negative
beta_q_init[3] = 0.0 # weekly positive (new)
beta_q_init[4] = 0.0 # weekly negative (new)
beta_q_init[5] = 0.0 # monthly positive (new)
beta_q_init[6] = 0.0 # monthly negative (new)
beta_q_init[7] = beta_cav[3] # lagged q
beta_q_init[8] = 0.0 # lagged e (cross term)
# Initialize ES parameters from ES step, adjusted
beta_e_init = np.zeros(self.n_parameters)
beta_e_init[0] = self.beta_es[0] + beta_q_init[0]
beta_e_init[1] = self.beta_es[1] + beta_q_init[1]
beta_e_init[2] = self.beta_es[2] + beta_q_init[2]
beta_e_init[3] = self.beta_es[3]
beta_e_init[4] = self.beta_es[4]
beta_e_init[5] = self.beta_es[5]
beta_e_init[6] = self.beta_es[6]
beta_e_init[7] = self.beta_es[7] + beta_q_init[7] - self.beta_es[8]
beta_e_init[8] = self.beta_es[8]
joint_beta = np.concatenate([beta_q_init, beta_e_init])
# Joint optimization
joint_beta_temp = self.joint_optim(yi, q0_val, e0, joint_beta, n_rep,
self.har_features_train)
if not np.isnan(joint_beta_temp).any():
joint_beta = joint_beta_temp
self.beta = joint_beta
# Compute fitted values
qi, ei = self.joint_loop(self.beta, yi, q0_val, e0, self.har_features_train)
# Store state for prediction (need history for HAR features)
self.last_state = [yi[-22:], qi[-1:], ei[-1:]] # Keep 22 for monthly avg
self.last_y_full = yi # Keep full history for HAR computation
if return_train:
return {'qi': qi, 'ei': ei,
'beta': self.beta.reshape((2, self.n_parameters))}
[docs]
def predict(self, yf=np.array([])):
"""
Predict VaR and ES for new observations.
Args:
yf (ndarray, optional): New return observations. Default is empty array.
Returns:
dict: Keys 'qf' (VaR forecasts) and 'ef' (ES forecasts).
"""
if len(yf) == 0:
return {'qf': np.array([]), 'ef': np.array([])}
# Compute HAR features for prediction
# Combine historical and new data for proper HAR computation
y_combined = np.concatenate([self.last_y_full, yf])
har_features_full = compute_har_features(y_combined)
# Extract features for the forecast period
n_hist = len(self.last_y_full)
har_features_pred = {
'daily': har_features_full['daily'][n_hist:],
'weekly': har_features_full['weekly'][n_hist:],
'monthly': har_features_full['monthly'][n_hist:]
}
# Run prediction loop
qf, ef = self.joint_loop(self.beta, yf, None, self.last_state,
har_features_pred, pred_mode=True)
# Update state
if len(yf) > 0:
self.last_y_full = np.concatenate([self.last_y_full, yf])
# Keep last 22 for state (needed for HAR)
if len(self.last_y_full) > 1000: # Trim to avoid memory issues
self.last_y_full = self.last_y_full[-500:]
self.last_state = [
np.concatenate([self.last_state[0], yf])[-22:],
np.array([qf[-1]]),
np.array([ef[-1]])
]
return {'qf': qf, 'ef': ef}
[docs]
def fit_predict(self, y, ti, seed=None, return_train=True, q0=None,
nV=102, n_init=3, n_rep=5):
"""
Fit the model on training data and predict on test data.
Args:
y (ndarray): Full return series.
ti (int): Training set length.
seed (int, optional): Random seed.
return_train (bool, optional): Return training predictions. Default is True.
q0 (list, optional): Initial [VaR, ES].
nV (int, optional): Number of random initializations.
n_init (int, optional): Number of best initializations to refine.
n_rep (int, optional): Number of optimization repetitions.
Returns:
dict: Keys 'qi', 'ei', 'qf', 'ef', 'beta'.
"""
yi, yf = y[:ti], y[ti:]
if return_train:
res_train = self.fit(yi, seed=seed, return_train=True, q0=q0,
nV=nV, n_init=n_init, n_rep=n_rep)
res_test = self.predict(yf)
return {
'qi': res_train['qi'],
'ei': res_train['ei'],
'qf': res_test['qf'],
'ef': res_test['ef'],
'beta': self.beta.reshape((2, self.n_parameters))
}
else:
self.fit(yi, seed=seed, return_train=False, q0=q0,
nV=nV, n_init=n_init, n_rep=n_rep)
res_test = self.predict(yf)
return {
'qf': res_test['qf'],
'ef': res_test['ef'],
'beta': self.beta.reshape((2, self.n_parameters))
}
# Convenience function for model selection
[docs]
def HAR_CAESar_model(theta, lambdas=dict()):
"""
Factory function for HAR-CAESar model.
Args:
theta (float): Quantile level.
lambdas (dict, optional): Penalty weights.
Returns:
HAR_CAESar: Model instance.
"""
return HAR_CAESar(theta, lambdas=lambdas)