Coverage for src / basanos / math / _ewm_corr.py: 100%
81 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-02 17:47 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-02 17:47 +0000
1"""Pure-NumPy exponentially weighted moving correlation.
3This module contains :func:`ewm_corr`, the vectorised
4IIR-filter implementation of per-row EWM correlation matrices. It is
5separated from :mod:`basanos.math.optimizer` so that the algorithm can
6be read, tested, and profiled in isolation without loading the full
7engine machinery.
8"""
10import dataclasses
12import numpy as np
13from scipy.signal import lfilter
16@dataclasses.dataclass
17class _EwmCorrState:
18 """Final IIR filter memory after a full EWM correlation pass.
20 Returned alongside the correlation tensor by
21 :func:`_ewm_corr_with_final_state`. Pass these values as the ``zi``
22 arguments to ``scipy.signal.lfilter`` on the next step to continue the
23 IIR recurrence without replaying history.
25 For a first-order filter ``a = [1, -beta]``, ``b = [1]``, the final
26 state after processing the last sample ``y[-1]`` is ``zf[0] = beta *
27 y[-1]``. Each ``corr_zi_*`` field stores exactly that quantity,
28 shaped ``(1, N, N)`` to match the ``zi`` format expected by
29 ``lfilter(..., axis=0)``.
31 Attributes:
32 corr_zi_x: Final state for the ``x``-accumulator; shape ``(1, N, N)``.
33 corr_zi_x2: Final state for the ``x²``-accumulator; shape ``(1, N, N)``.
34 corr_zi_xy: Final state for the ``xy``-accumulator; shape ``(1, N, N)``.
35 corr_zi_w: Final state for the weight-accumulator; shape ``(1, N, N)``.
36 count: Cumulative joint-finite observation count per asset pair
37 at the last timestep; shape ``(N, N)`` dtype int64.
38 """
40 corr_zi_x: np.ndarray # (1, N, N)
41 corr_zi_x2: np.ndarray # (1, N, N)
42 corr_zi_xy: np.ndarray # (1, N, N)
43 corr_zi_w: np.ndarray # (1, N, N)
44 count: np.ndarray # (N, N) int64
47def _corr_from_ewm_accumulators(
48 s_x: np.ndarray,
49 s_x2: np.ndarray,
50 s_xy: np.ndarray,
51 s_w: np.ndarray,
52 count: np.ndarray,
53 min_periods: int,
54 min_corr_denom: float = 1e-14,
55) -> np.ndarray:
56 """Compute a single EWM correlation matrix from running accumulators.
58 Single-timestep equivalent of :func:`_ewm_corr_with_final_state`: applies
59 the same formula to ``(N, N)`` arrays instead of ``(T, N, N)`` tensors.
60 Called by :meth:`BasanosStream.step` after advancing the IIR filter state
61 by one row to reconstruct the current correlation matrix without revisiting
62 history.
64 This is the **canonical** implementation of the EWM correlation formula
65 shared by both the batch and the incremental paths. Any change to the
66 formula (e.g. symmetrisation, denominator guard, NaN-masking) must be made
67 here only — :func:`_ewm_corr_with_final_state` delegates its inner
68 computation to this function so that a single definition is maintained.
70 Args:
71 s_x: Running EWM sum of x; shape ``(N, N)``.
72 s_x2: Running EWM sum of x²; shape ``(N, N)``.
73 s_xy: Running EWM sum of x·y; shape ``(N, N)``.
74 s_w: Running EWM sum of weights; shape ``(N, N)``.
75 count: Cumulative joint-finite observation count; shape ``(N, N)``
76 dtype int.
77 min_periods: Minimum joint-finite observations before a value is
78 reported; earlier entries are ``NaN``.
79 min_corr_denom: Guard threshold below which the correlation denominator
80 is treated as zero and the result is set to ``NaN``.
82 Returns:
83 np.ndarray of shape ``(N, N)``: symmetric correlation matrix with
84 diagonal ``1.0`` (or ``NaN`` during warm-up) and off-diagonal entries
85 in ``[-1, 1]``.
86 """
87 n_assets = s_x.shape[0]
89 with np.errstate(divide="ignore", invalid="ignore"):
90 pos_w = s_w > 0
91 ewm_x = np.where(pos_w, s_x / s_w, np.nan)
92 ewm_y = np.where(pos_w, s_x.T / s_w, np.nan)
93 ewm_x2 = np.where(pos_w, s_x2 / s_w, np.nan)
94 ewm_y2 = np.where(pos_w, s_x2.T / s_w, np.nan)
95 ewm_xy = np.where(pos_w, s_xy / s_w, np.nan)
97 var_x = np.maximum(ewm_x2 - ewm_x**2, 0.0)
98 var_y = np.maximum(ewm_y2 - ewm_y**2, 0.0)
99 denom_corr = np.sqrt(var_x * var_y)
100 cov = ewm_xy - ewm_x * ewm_y
102 with np.errstate(divide="ignore", invalid="ignore"):
103 corr = np.where(denom_corr > min_corr_denom, cov / denom_corr, np.nan)
104 corr = np.clip(corr, -1.0, 1.0)
105 corr[count < min_periods] = np.nan
107 diag_idx = np.arange(n_assets)
108 corr[diag_idx, diag_idx] = np.where(count[diag_idx, diag_idx] >= min_periods, 1.0, np.nan)
110 # Enforce symmetry: average strict lower/upper triangle pairs in place.
111 tril_i, tril_j = np.tril_indices(n_assets, k=-1)
112 upper_vals = corr[tril_j, tril_i]
113 lower_vals = corr[tril_i, tril_j]
114 avg_vals = 0.5 * (upper_vals + lower_vals)
115 corr[tril_i, tril_j] = avg_vals
116 corr[tril_j, tril_i] = avg_vals
118 return corr
121def _ewm_corr_with_final_state(
122 data: np.ndarray,
123 com: int,
124 min_periods: int,
125 min_corr_denom: float = 1e-14,
126) -> tuple[np.ndarray, _EwmCorrState]:
127 """Compute per-row EWM correlation matrices and return the final IIR state.
129 Identical to :func:`ewm_corr` but also returns the final filter
130 memory as an :class:`_EwmCorrState`. Callers that need both the
131 correlation tensor *and* the IIR state (e.g.
132 :meth:`BasanosEngine.warmup_state`) should call this function once rather
133 than calling :func:`ewm_corr` and rerunning ``lfilter`` separately
134 to extract the state.
136 Args:
137 data: Float array of shape ``(T, N)`` — typically volatility-adjusted
138 log returns.
139 com: EWM centre-of-mass.
140 min_periods: Minimum joint-finite observations before a value is
141 reported.
142 min_corr_denom: Guard threshold for the correlation denominator.
144 Returns:
145 tuple: ``(result, state)`` where ``result`` has shape ``(T, N, N)``
146 (see :func:`ewm_corr`) and ``state`` is the final
147 :class:`_EwmCorrState`.
148 """
149 _t_len, n_assets = data.shape
150 beta = com / (1.0 + com)
152 fin = np.isfinite(data)
153 xt_f = np.where(fin, data, 0.0)
154 joint_fin = fin[:, :, np.newaxis] & fin[:, np.newaxis, :]
156 v_x = xt_f[:, :, np.newaxis] * joint_fin
157 v_x2 = (xt_f * xt_f)[:, :, np.newaxis] * joint_fin
158 v_xy = xt_f[:, :, np.newaxis] * xt_f[:, np.newaxis, :]
159 v_w = joint_fin.astype(np.float64)
161 filt_a = np.array([1.0, -beta])
162 s_x = lfilter([1.0], filt_a, v_x, axis=0)
163 s_x2 = lfilter([1.0], filt_a, v_x2, axis=0)
164 s_xy = lfilter([1.0], filt_a, v_xy, axis=0)
165 s_w = lfilter([1.0], filt_a, v_w, axis=0)
167 count = np.cumsum(joint_fin, axis=0)
169 # Final IIR state: for b=[1], a=[1,-beta] in direct-form-II-transposed,
170 # the filter memory after sample y[-1] is zf[0] = beta * y[-1].
171 # Shaped (1, N, N) to match the zi format expected by lfilter.
172 iir_state = _EwmCorrState(
173 corr_zi_x=(beta * s_x[-1])[np.newaxis],
174 corr_zi_x2=(beta * s_x2[-1])[np.newaxis],
175 corr_zi_xy=(beta * s_xy[-1])[np.newaxis],
176 corr_zi_w=(beta * s_w[-1])[np.newaxis],
177 count=count[-1].astype(np.int64),
178 )
180 with np.errstate(divide="ignore", invalid="ignore"):
181 pos_w = s_w > 0
182 ewm_x = np.where(pos_w, s_x / s_w, np.nan)
183 ewm_y = np.where(pos_w, s_x.swapaxes(1, 2) / s_w, np.nan)
184 ewm_x2 = np.where(pos_w, s_x2 / s_w, np.nan)
185 ewm_y2 = np.where(pos_w, s_x2.swapaxes(1, 2) / s_w, np.nan)
186 ewm_xy = np.where(pos_w, s_xy / s_w, np.nan)
188 var_x = np.maximum(ewm_x2 - ewm_x * ewm_x, 0.0)
189 var_y = np.maximum(ewm_y2 - ewm_y * ewm_y, 0.0)
190 denom = np.sqrt(var_x * var_y)
191 cov = ewm_xy - ewm_x * ewm_y
193 with np.errstate(divide="ignore", invalid="ignore"):
194 result = np.where(denom > min_corr_denom, cov / denom, np.nan)
196 result = np.clip(result, -1.0, 1.0)
197 result[count < min_periods] = np.nan
199 diag_idx = np.arange(n_assets)
200 diag_count = count[:, diag_idx, diag_idx]
201 result[:, diag_idx, diag_idx] = np.where(diag_count >= min_periods, 1.0, np.nan)
203 # Enforce symmetry without allocating an extra (T, N, N) temporary:
204 # average only over strict triangle pairs and mirror in place.
205 tril_i, tril_j = np.tril_indices(n_assets, k=-1)
206 upper_vals = result[:, tril_j, tril_i]
207 lower_vals = result[:, tril_i, tril_j]
208 avg_vals = 0.5 * (upper_vals + lower_vals)
209 result[:, tril_i, tril_j] = avg_vals
210 result[:, tril_j, tril_i] = avg_vals
212 return result, iir_state
215def ewm_corr(
216 data: np.ndarray,
217 com: int,
218 min_periods: int,
219 min_corr_denom: float = 1e-14,
220) -> np.ndarray:
221 """Compute per-row EWM correlation matrices without pandas.
223 Matches ``pandas.DataFrame.ewm(com=com, min_periods=min_periods).corr()``
224 with the default ``adjust=True, ignore_na=False`` settings to within
225 floating-point rounding error.
227 All five EWM components used to compute ``corr(i, j)`` — namely
228 ``ewm(x_i)``, ``ewm(x_j)``, ``ewm(x_i²)``, ``ewm(x_j²)``, and
229 ``ewm(x_i·x_j)`` — share the **same joint weight structure**: weights
230 decay at every timestep (``ignore_na=False``) but a new observation is
231 only added at timesteps where *both* ``x_i`` and ``x_j`` are finite. As
232 a result the correlation for a pair is frozen once either asset goes
233 missing, exactly mirroring pandas behaviour.
235 The EWM recurrence ``s[t] = β·s[t-1] + v[t]`` is an IIR filter and is
236 solved for **all N² pairs simultaneously** via ``scipy.signal.lfilter``
237 — no Python loop over the T timesteps.
239 Args:
240 data: Float array of shape ``(T, N)`` - typically volatility-adjusted
241 log returns.
242 com: EWM centre-of-mass (``alpha = 1 / (1 + com)``).
243 min_periods: Minimum number of joint finite observations required
244 before a correlation value is reported; earlier rows are NaN.
245 min_corr_denom: Guard threshold below which the correlation denominator
246 is treated as zero and the result is set to NaN. Defaults to
247 ``1e-14``.
249 Returns:
250 np.ndarray of shape ``(T, N, N)`` containing the per-row correlation
251 matrices. Each matrix is symmetric with diagonal 1.0 (or NaN during
252 warm-up).
254 Performance:
255 **Time** — O(T·N²): ``lfilter`` processes all N² pairs simultaneously,
256 so wall-clock time scales linearly with both T and N².
258 **Memory** — approximately 14 float64 arrays of shape ``(T, N, N)``
259 exist at peak, giving roughly ``112 * T * N²`` bytes. For 100 assets
260 over 2 520 trading days (~10 years) that is ≈ 2.8 GB; for 500 assets
261 the same period requires ≈ 70 GB, which exceeds typical workstation
262 RAM. Reduce T or N before calling this function when working with
263 large universes.
264 """
265 result, _ = _ewm_corr_with_final_state(data, com, min_periods, min_corr_denom)
266 return result