Coverage for src / basanos / math / _ewm_corr.py: 100%

81 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-02 17:47 +0000

1"""Pure-NumPy exponentially weighted moving correlation. 

2 

3This module contains :func:`ewm_corr`, the vectorised 

4IIR-filter implementation of per-row EWM correlation matrices. It is 

5separated from :mod:`basanos.math.optimizer` so that the algorithm can 

6be read, tested, and profiled in isolation without loading the full 

7engine machinery. 

8""" 

9 

10import dataclasses 

11 

12import numpy as np 

13from scipy.signal import lfilter 

14 

15 

16@dataclasses.dataclass 

17class _EwmCorrState: 

18 """Final IIR filter memory after a full EWM correlation pass. 

19 

20 Returned alongside the correlation tensor by 

21 :func:`_ewm_corr_with_final_state`. Pass these values as the ``zi`` 

22 arguments to ``scipy.signal.lfilter`` on the next step to continue the 

23 IIR recurrence without replaying history. 

24 

25 For a first-order filter ``a = [1, -beta]``, ``b = [1]``, the final 

26 state after processing the last sample ``y[-1]`` is ``zf[0] = beta * 

27 y[-1]``. Each ``corr_zi_*`` field stores exactly that quantity, 

28 shaped ``(1, N, N)`` to match the ``zi`` format expected by 

29 ``lfilter(..., axis=0)``. 

30 

31 Attributes: 

32 corr_zi_x: Final state for the ``x``-accumulator; shape ``(1, N, N)``. 

33 corr_zi_x2: Final state for the ``x²``-accumulator; shape ``(1, N, N)``. 

34 corr_zi_xy: Final state for the ``xy``-accumulator; shape ``(1, N, N)``. 

35 corr_zi_w: Final state for the weight-accumulator; shape ``(1, N, N)``. 

36 count: Cumulative joint-finite observation count per asset pair 

37 at the last timestep; shape ``(N, N)`` dtype int64. 

38 """ 

39 

40 corr_zi_x: np.ndarray # (1, N, N) 

41 corr_zi_x2: np.ndarray # (1, N, N) 

42 corr_zi_xy: np.ndarray # (1, N, N) 

43 corr_zi_w: np.ndarray # (1, N, N) 

44 count: np.ndarray # (N, N) int64 

45 

46 

47def _corr_from_ewm_accumulators( 

48 s_x: np.ndarray, 

49 s_x2: np.ndarray, 

50 s_xy: np.ndarray, 

51 s_w: np.ndarray, 

52 count: np.ndarray, 

53 min_periods: int, 

54 min_corr_denom: float = 1e-14, 

55) -> np.ndarray: 

56 """Compute a single EWM correlation matrix from running accumulators. 

57 

58 Single-timestep equivalent of :func:`_ewm_corr_with_final_state`: applies 

59 the same formula to ``(N, N)`` arrays instead of ``(T, N, N)`` tensors. 

60 Called by :meth:`BasanosStream.step` after advancing the IIR filter state 

61 by one row to reconstruct the current correlation matrix without revisiting 

62 history. 

63 

64 This is the **canonical** implementation of the EWM correlation formula 

65 shared by both the batch and the incremental paths. Any change to the 

66 formula (e.g. symmetrisation, denominator guard, NaN-masking) must be made 

67 here only — :func:`_ewm_corr_with_final_state` delegates its inner 

68 computation to this function so that a single definition is maintained. 

69 

70 Args: 

71 s_x: Running EWM sum of x; shape ``(N, N)``. 

72 s_x2: Running EWM sum of x²; shape ``(N, N)``. 

73 s_xy: Running EWM sum of x·y; shape ``(N, N)``. 

74 s_w: Running EWM sum of weights; shape ``(N, N)``. 

75 count: Cumulative joint-finite observation count; shape ``(N, N)`` 

76 dtype int. 

77 min_periods: Minimum joint-finite observations before a value is 

78 reported; earlier entries are ``NaN``. 

79 min_corr_denom: Guard threshold below which the correlation denominator 

80 is treated as zero and the result is set to ``NaN``. 

81 

82 Returns: 

83 np.ndarray of shape ``(N, N)``: symmetric correlation matrix with 

84 diagonal ``1.0`` (or ``NaN`` during warm-up) and off-diagonal entries 

85 in ``[-1, 1]``. 

86 """ 

87 n_assets = s_x.shape[0] 

88 

89 with np.errstate(divide="ignore", invalid="ignore"): 

90 pos_w = s_w > 0 

91 ewm_x = np.where(pos_w, s_x / s_w, np.nan) 

92 ewm_y = np.where(pos_w, s_x.T / s_w, np.nan) 

93 ewm_x2 = np.where(pos_w, s_x2 / s_w, np.nan) 

94 ewm_y2 = np.where(pos_w, s_x2.T / s_w, np.nan) 

95 ewm_xy = np.where(pos_w, s_xy / s_w, np.nan) 

96 

97 var_x = np.maximum(ewm_x2 - ewm_x**2, 0.0) 

98 var_y = np.maximum(ewm_y2 - ewm_y**2, 0.0) 

99 denom_corr = np.sqrt(var_x * var_y) 

100 cov = ewm_xy - ewm_x * ewm_y 

101 

102 with np.errstate(divide="ignore", invalid="ignore"): 

103 corr = np.where(denom_corr > min_corr_denom, cov / denom_corr, np.nan) 

104 corr = np.clip(corr, -1.0, 1.0) 

105 corr[count < min_periods] = np.nan 

106 

107 diag_idx = np.arange(n_assets) 

108 corr[diag_idx, diag_idx] = np.where(count[diag_idx, diag_idx] >= min_periods, 1.0, np.nan) 

109 

110 # Enforce symmetry: average strict lower/upper triangle pairs in place. 

111 tril_i, tril_j = np.tril_indices(n_assets, k=-1) 

112 upper_vals = corr[tril_j, tril_i] 

113 lower_vals = corr[tril_i, tril_j] 

114 avg_vals = 0.5 * (upper_vals + lower_vals) 

115 corr[tril_i, tril_j] = avg_vals 

116 corr[tril_j, tril_i] = avg_vals 

117 

118 return corr 

119 

120 

121def _ewm_corr_with_final_state( 

122 data: np.ndarray, 

123 com: int, 

124 min_periods: int, 

125 min_corr_denom: float = 1e-14, 

126) -> tuple[np.ndarray, _EwmCorrState]: 

127 """Compute per-row EWM correlation matrices and return the final IIR state. 

128 

129 Identical to :func:`ewm_corr` but also returns the final filter 

130 memory as an :class:`_EwmCorrState`. Callers that need both the 

131 correlation tensor *and* the IIR state (e.g. 

132 :meth:`BasanosEngine.warmup_state`) should call this function once rather 

133 than calling :func:`ewm_corr` and rerunning ``lfilter`` separately 

134 to extract the state. 

135 

136 Args: 

137 data: Float array of shape ``(T, N)`` — typically volatility-adjusted 

138 log returns. 

139 com: EWM centre-of-mass. 

140 min_periods: Minimum joint-finite observations before a value is 

141 reported. 

142 min_corr_denom: Guard threshold for the correlation denominator. 

143 

144 Returns: 

145 tuple: ``(result, state)`` where ``result`` has shape ``(T, N, N)`` 

146 (see :func:`ewm_corr`) and ``state`` is the final 

147 :class:`_EwmCorrState`. 

148 """ 

149 _t_len, n_assets = data.shape 

150 beta = com / (1.0 + com) 

151 

152 fin = np.isfinite(data) 

153 xt_f = np.where(fin, data, 0.0) 

154 joint_fin = fin[:, :, np.newaxis] & fin[:, np.newaxis, :] 

155 

156 v_x = xt_f[:, :, np.newaxis] * joint_fin 

157 v_x2 = (xt_f * xt_f)[:, :, np.newaxis] * joint_fin 

158 v_xy = xt_f[:, :, np.newaxis] * xt_f[:, np.newaxis, :] 

159 v_w = joint_fin.astype(np.float64) 

160 

161 filt_a = np.array([1.0, -beta]) 

162 s_x = lfilter([1.0], filt_a, v_x, axis=0) 

163 s_x2 = lfilter([1.0], filt_a, v_x2, axis=0) 

164 s_xy = lfilter([1.0], filt_a, v_xy, axis=0) 

165 s_w = lfilter([1.0], filt_a, v_w, axis=0) 

166 

167 count = np.cumsum(joint_fin, axis=0) 

168 

169 # Final IIR state: for b=[1], a=[1,-beta] in direct-form-II-transposed, 

170 # the filter memory after sample y[-1] is zf[0] = beta * y[-1]. 

171 # Shaped (1, N, N) to match the zi format expected by lfilter. 

172 iir_state = _EwmCorrState( 

173 corr_zi_x=(beta * s_x[-1])[np.newaxis], 

174 corr_zi_x2=(beta * s_x2[-1])[np.newaxis], 

175 corr_zi_xy=(beta * s_xy[-1])[np.newaxis], 

176 corr_zi_w=(beta * s_w[-1])[np.newaxis], 

177 count=count[-1].astype(np.int64), 

178 ) 

179 

180 with np.errstate(divide="ignore", invalid="ignore"): 

181 pos_w = s_w > 0 

182 ewm_x = np.where(pos_w, s_x / s_w, np.nan) 

183 ewm_y = np.where(pos_w, s_x.swapaxes(1, 2) / s_w, np.nan) 

184 ewm_x2 = np.where(pos_w, s_x2 / s_w, np.nan) 

185 ewm_y2 = np.where(pos_w, s_x2.swapaxes(1, 2) / s_w, np.nan) 

186 ewm_xy = np.where(pos_w, s_xy / s_w, np.nan) 

187 

188 var_x = np.maximum(ewm_x2 - ewm_x * ewm_x, 0.0) 

189 var_y = np.maximum(ewm_y2 - ewm_y * ewm_y, 0.0) 

190 denom = np.sqrt(var_x * var_y) 

191 cov = ewm_xy - ewm_x * ewm_y 

192 

193 with np.errstate(divide="ignore", invalid="ignore"): 

194 result = np.where(denom > min_corr_denom, cov / denom, np.nan) 

195 

196 result = np.clip(result, -1.0, 1.0) 

197 result[count < min_periods] = np.nan 

198 

199 diag_idx = np.arange(n_assets) 

200 diag_count = count[:, diag_idx, diag_idx] 

201 result[:, diag_idx, diag_idx] = np.where(diag_count >= min_periods, 1.0, np.nan) 

202 

203 # Enforce symmetry without allocating an extra (T, N, N) temporary: 

204 # average only over strict triangle pairs and mirror in place. 

205 tril_i, tril_j = np.tril_indices(n_assets, k=-1) 

206 upper_vals = result[:, tril_j, tril_i] 

207 lower_vals = result[:, tril_i, tril_j] 

208 avg_vals = 0.5 * (upper_vals + lower_vals) 

209 result[:, tril_i, tril_j] = avg_vals 

210 result[:, tril_j, tril_i] = avg_vals 

211 

212 return result, iir_state 

213 

214 

215def ewm_corr( 

216 data: np.ndarray, 

217 com: int, 

218 min_periods: int, 

219 min_corr_denom: float = 1e-14, 

220) -> np.ndarray: 

221 """Compute per-row EWM correlation matrices without pandas. 

222 

223 Matches ``pandas.DataFrame.ewm(com=com, min_periods=min_periods).corr()`` 

224 with the default ``adjust=True, ignore_na=False`` settings to within 

225 floating-point rounding error. 

226 

227 All five EWM components used to compute ``corr(i, j)`` — namely 

228 ``ewm(x_i)``, ``ewm(x_j)``, ``ewm(x_i²)``, ``ewm(x_j²)``, and 

229 ``ewm(x_i·x_j)`` — share the **same joint weight structure**: weights 

230 decay at every timestep (``ignore_na=False``) but a new observation is 

231 only added at timesteps where *both* ``x_i`` and ``x_j`` are finite. As 

232 a result the correlation for a pair is frozen once either asset goes 

233 missing, exactly mirroring pandas behaviour. 

234 

235 The EWM recurrence ``s[t] = β·s[t-1] + v[t]`` is an IIR filter and is 

236 solved for **all N² pairs simultaneously** via ``scipy.signal.lfilter`` 

237 — no Python loop over the T timesteps. 

238 

239 Args: 

240 data: Float array of shape ``(T, N)`` - typically volatility-adjusted 

241 log returns. 

242 com: EWM centre-of-mass (``alpha = 1 / (1 + com)``). 

243 min_periods: Minimum number of joint finite observations required 

244 before a correlation value is reported; earlier rows are NaN. 

245 min_corr_denom: Guard threshold below which the correlation denominator 

246 is treated as zero and the result is set to NaN. Defaults to 

247 ``1e-14``. 

248 

249 Returns: 

250 np.ndarray of shape ``(T, N, N)`` containing the per-row correlation 

251 matrices. Each matrix is symmetric with diagonal 1.0 (or NaN during 

252 warm-up). 

253 

254 Performance: 

255 **Time** — O(T·N²): ``lfilter`` processes all N² pairs simultaneously, 

256 so wall-clock time scales linearly with both T and N². 

257 

258 **Memory** — approximately 14 float64 arrays of shape ``(T, N, N)`` 

259 exist at peak, giving roughly ``112 * T * N²`` bytes. For 100 assets 

260 over 2 520 trading days (~10 years) that is ≈ 2.8 GB; for 500 assets 

261 the same period requires ≈ 70 GB, which exceeds typical workstation 

262 RAM. Reduce T or N before calling this function when working with 

263 large universes. 

264 """ 

265 result, _ = _ewm_corr_with_final_state(data, com, min_periods, min_corr_denom) 

266 return result