Coverage for src/fast_minimum_variance/minvar_problem.py: 100%
213 statements
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-02 13:28 +0000
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-02 13:28 +0000
1"""Minimum-variance solver: primal asset elimination with dual-feasibility check."""
3from collections.abc import Callable
4from dataclasses import dataclass
5from typing import Any
7import clarabel
8import numpy as np
9from cvx.linalg import DenseOperator, FactorOperator, GramOperator, SumOperator, cholesky
10from scipy.linalg import solve as spd_solve
11from scipy.optimize import nnls
12from scipy.sparse import csc_matrix, eye, vstack
13from scipy.sparse.linalg import LinearOperator, cg
15from ._base import _BaseProblem
18@dataclass(frozen=True)
19class _MinVarProblem(_BaseProblem):
20 """Minimum-variance portfolio solver via primal-dual active-set iteration.
22 Solves::
24 min (1-alpha)||X w||^2 + alpha*(||X||_F^2/N)*||w||^2 - rho*mu^T w
25 s.t. 1^T w = 1, w >= 0
27 Each inner step solves the equality-constrained subproblem over the current
28 active asset set. Stationarity gives ``2*Sigma_a*w_a = lambda*1 + rho*mu_a``
29 where ``Sigma_a = (1-alpha)*X_a^T X_a + ridge*I``. Solving the ``n_a x n_a``
30 SPD system ``Sigma_a v = 1`` (and ``Sigma_a v2 = mu_a`` when ``rho != 0``)
31 and recovering ``lambda`` from the budget constraint avoids the indefinite
32 ``(n_a+1) x (n_a+1)`` saddle-point system entirely. The outer primal-dual
33 loop enforces ``w >= 0`` and terminates when both primal and dual feasibility
34 hold simultaneously.
36 Use ``alpha = N/(N+T)`` for Ledoit-Wolf shrinkage intensity::
38 T, N = X.shape
39 w, iters = Problem(X, alpha=N/(N+T)).solve_kkt()
41 Examples:
42 >>> import numpy as np
43 >>> from fast_minimum_variance import Problem
44 >>> X = np.random.default_rng(0).standard_normal((100, 5))
45 >>> w, iters = Problem(X).solve_kkt()
46 >>> float(round(w.sum(), 6))
47 1.0
48 >>> bool((w >= 0).all())
49 True
50 """
52 # No extra fields — X, alpha, rho, mu all inherited from _BaseProblem.
54 # ------------------------------------------------------------------
55 # Shared helpers used by both active-set loop variants
56 # ------------------------------------------------------------------
58 def _compute_gradient(self, w: np.ndarray) -> np.ndarray:
59 """Return the full objective gradient at w, including rho*mu adjustment."""
60 data_grad = (self.X.T @ (self.X @ w)) / self.t
61 if self.target_lr is not None:
62 bar_lam, U_k, delta_k = self.target_lr # noqa: N806
63 tgt_w = bar_lam * w + U_k @ (delta_k * (U_k.T @ w))
64 grad = 2.0 * ((1 - self.alpha) * data_grad + self.alpha * tgt_w)
65 elif self.target is not None:
66 grad = 2.0 * ((1 - self.alpha) * data_grad + self.alpha * self.target @ w)
67 else:
68 grad = 2.0 * data_grad
69 if self.rho != 0.0 and self.mu is not None:
70 grad = grad - self.rho * self.mu
71 result: np.ndarray = grad
72 return result
74 @staticmethod
75 def _primal_drop(w_a: np.ndarray, asset_active: np.ndarray, tol: float) -> bool:
76 """Drop negative-weight assets from active set in-place; return True if any dropped."""
77 if not np.any(w_a < -tol):
78 return False
79 idx = np.where(asset_active)[0]
80 strong = w_a < -10 * tol
81 if np.any(strong):
82 asset_active[idx[strong]] = False
83 else:
84 asset_active[idx[np.argmin(w_a)]] = False
85 return True
87 def _dual_add(self, grad: np.ndarray, asset_active: np.ndarray, tol: float) -> int:
88 """Return index of excluded asset that violates KKT dual condition, or -1 if none."""
89 excluded = ~asset_active
90 if not excluded.any():
91 return -1
92 g_a = grad[asset_active]
93 lambda_ = np.median(g_a) if g_a.size > 5 else g_a.mean()
94 nu = grad - lambda_
95 idx_ex = np.where(excluded)[0]
96 j = idx_ex[np.argmin(nu[excluded])]
97 return int(j) if nu[j] < -tol else -1
99 # ------------------------------------------------------------------
100 # Outer loop: primal elimination + dual feasibility check
101 # ------------------------------------------------------------------
102 def _constraint_active_set(
103 self,
104 solve_fn: Callable[[np.ndarray], tuple[np.ndarray, int]],
105 tol: float = 1e-6,
106 max_iter: int = 10_000,
107 ) -> tuple[np.ndarray, int, int]:
108 """Run the primal-dual active-set loop enforcing ``w >= 0``.
110 Calls ``solve_fn(active_mask)`` repeatedly. The *primal step* drops assets
111 with negative weights; the *dual step* re-adds any excluded asset whose KKT
112 gradient condition is violated. Terminates when both conditions hold
113 simultaneously, which together with stationarity is sufficient for global
114 optimality.
115 """
116 n = self.n
117 asset_active = np.ones(n, dtype=bool)
118 total_inner_iters = 0
119 outer_steps = 0
120 prev_active = None
121 w = np.zeros(n)
123 for _ in range(max_iter):
124 if prev_active is not None and np.array_equal(prev_active, asset_active):
125 break # pragma: no cover - structurally unreachable safety guard
126 prev_active = asset_active.copy()
128 w_a, step_iters = solve_fn(asset_active)
129 outer_steps += 1
130 total_inner_iters += step_iters
132 if self._primal_drop(w_a, asset_active, tol):
133 continue
135 w = np.zeros(n)
136 w[asset_active] = w_a
138 j = self._dual_add(self._compute_gradient(w), asset_active, tol)
139 if j < 0:
140 break
141 asset_active[j] = True
143 return w, outer_steps, total_inner_iters
145 # ------------------------------------------------------------------
146 # Inner steps
147 # ------------------------------------------------------------------
149 def _kkt_step(self, active: np.ndarray, x0: np.ndarray | None = None) -> tuple[np.ndarray, int]: # noqa: ARG002
150 """Solve the reduced SPD system directly; return ``(w_a, 1)``.
152 Stationarity gives ``2*Sigma_a*w_a = lambda*1 + rho*mu_a``. A single
153 solve with two RHS columns yields ``v1 = Sigma_a^{-1} 1`` and
154 ``v2 = Sigma_a^{-1} mu_a``; the budget constraint then pins ``lambda``
155 analytically as ``lambda = 2*(1 - rho/2 * sum(v2)) / sum(v1)``.
157 When ``alpha=1`` and ``target_lr`` is set the system is purely the RMT
158 target ``T0 = bar_lam*I + U_k diag(delta_k) U_k^T``. The Woodbury
159 identity gives the exact inverse in O(n_a*k + k^3) without CG iterations:
160 ``T0^{-1} b = b/bar_lam - U_k_a W^{-1}(U_k_a^T b)/bar_lam^2``
161 where ``W = diag(1/delta_k) + U_k_a^T U_k_a / bar_lam``.
162 """
163 n_a = int(active.sum())
165 # Woodbury direct solve: O(n_a*k + k^3) for alpha=1, RMT target. The target
166 # T0 = bar_lam*I + U_k diag(delta_k) U_k^T is a diagonal-plus-low-rank operator,
167 # so its active-block inverse is exactly cvx-linalg's FactorOperator.solve_free.
168 if self.alpha == 1.0 and self.target_lr is not None:
169 bar_lam, U_k, delta_k = self.target_lr # noqa: N806
170 t0 = FactorOperator(np.full(U_k.shape[0], bar_lam), U_k, np.diag(delta_k))
171 idx = np.flatnonzero(active)
173 if self.rho == 0.0 or self.mu is None:
174 v = t0.solve_free(idx, np.ones(n_a))
175 return v / v.sum(), 1
176 v1, v2 = t0.solve_free(idx, np.column_stack([np.ones(n_a), self.mu[active]])).T
177 half_rho = 0.5 * self.rho
178 half_lambda = (1.0 - half_rho * v2.sum()) / v1.sum()
179 return half_lambda * v1 + half_rho * v2, 1
181 x_a = self.X[:, active]
182 if self.target is None:
183 sigma = (x_a.T @ x_a) / self.t
184 else:
185 sigma = (1.0 - self.alpha) * (x_a.T @ x_a) / self.t + self.alpha * self.target[np.ix_(active, active)]
187 if self.rho == 0.0 or self.mu is None:
188 v = spd_solve(sigma, np.ones(n_a), assume_a="pos")
189 return v / v.sum(), 1
191 v1, v2 = spd_solve(sigma, np.column_stack([np.ones(n_a), self.mu[active]]), assume_a="pos").T
192 half_rho = 0.5 * self.rho
193 half_lambda = (1.0 - half_rho * v2.sum()) / v1.sum()
194 return half_lambda * v1 + half_rho * v2, 1
196 def _cvxpy_constraints(self, w: Any, cp: Any) -> list[Any]:
197 """Return budget-equality and long-only inequality constraints for CVXPY."""
198 return [cp.sum(w) == 1, w >= 0]
200 def _system_operator(self) -> SumOperator:
201 """Build ``Sigma = (1-alpha)/T * X^T X + alpha * T0`` as a cvx-linalg operator.
203 A :class:`~cvx.linalg.SumOperator` of the data Gram term and, when present,
204 the target term (a :class:`~cvx.linalg.FactorOperator` for a low-rank RMT
205 target, else a :class:`~cvx.linalg.DenseOperator`). The full-universe
206 operators are sliced to the active set via ``apply_free``; nothing is
207 formed at ``n x n``. Without a target the data term carries the full weight.
208 """
209 has_target = self.target_lr is not None or self.target is not None
210 c_data = (1.0 - self.alpha) if has_target else 1.0
211 terms: list[tuple[float, Any]] = [(c_data / self.t, GramOperator(self.X))]
212 if self.target_lr is not None:
213 bar_lam, u_k, delta_k = self.target_lr
214 terms.append((self.alpha, FactorOperator(np.full(u_k.shape[0], bar_lam), u_k, np.diag(delta_k))))
215 elif self.target is not None:
216 terms.append((self.alpha, DenseOperator(self.target)))
217 return SumOperator(terms)
219 def _cg_step(self, active: np.ndarray, x0: np.ndarray | None = None) -> tuple[np.ndarray, int]:
220 """Solve the reduced SPD system via matrix-free CG; return ``(w_a, iters)``.
222 Runs conjugate gradients over the active-set system operator
223 (:meth:`_system_operator`, applied through ``apply_free``) without ever
224 forming ``Sigma_a`` explicitly. Low-rank and dense targets share one path.
226 Args:
227 active: Boolean mask selecting the active asset subset.
228 x0: Optional initial guess for the first CG solve (warm start).
229 When provided it must have shape ``(active.sum(),)``.
230 """
231 n_a = int(active.sum())
232 sigma = self._system_operator()
233 active_idx = np.flatnonzero(active)
234 count = [0]
236 def matvec(v: np.ndarray) -> np.ndarray:
237 """Apply Sigma_a to v via the operator's free-block product."""
238 count[0] += 1
239 return sigma.apply_free(active_idx, v)
241 op = LinearOperator((n_a, n_a), matvec=matvec, dtype=np.float64) # ty:ignore[missing-argument, parameter-already-assigned, unknown-argument]
243 if self.rho == 0.0 or self.mu is None:
244 v, _ = cg(op, np.ones(n_a), x0=x0)
245 return v / v.sum(), count[0]
247 v1, _ = cg(op, np.ones(n_a), x0=x0)
248 v2, _ = cg(op, self.mu[active], x0=x0)
249 half_rho = 0.5 * self.rho
250 half_lambda = (1.0 - half_rho * v2.sum()) / v1.sum()
251 return half_lambda * v1 + half_rho * v2, count[0]
253 def _pcg_step(self, active: np.ndarray, x0: np.ndarray | None = None) -> tuple[np.ndarray, int]:
254 """Solve the reduced SPD system via PCG with RMT preconditioner; return (w_a, iters).
256 The system matrix is the oracle-LW covariance (using self.alpha and self.target).
257 The preconditioner P = T0^RMT is applied via the Woodbury identity:
258 P^{-1} v = (1/bar_lam) v + U_k diag(1/lambda_k - 1/bar_lam) U_k^T v
259 costing O(n_a * k) per application. Requires self.pcg_lr to be set.
260 """
261 n_a = int(active.sum())
263 # System matvec — the same active-set operator as _cg_step.
264 sigma = self._system_operator()
265 active_idx = np.flatnonzero(active)
266 count = [0]
268 def matvec(v: np.ndarray) -> np.ndarray:
269 """Apply the active-set system matrix Sigma_a to v."""
270 count[0] += 1
271 return sigma.apply_free(active_idx, v)
273 op = LinearOperator((n_a, n_a), matvec=matvec, dtype=np.float64) # ty:ignore[missing-argument, parameter-already-assigned, unknown-argument]
275 # Preconditioner P^{-1}: Woodbury inverse of T0^RMT restricted to active set
276 pcg_lr = self.pcg_lr
277 if pcg_lr is None: # pragma: no cover - defensive; solve_pcg validates pcg_lr upfront (see _base.py)
278 raise RuntimeError("_pcg_step called without pcg_lr") # noqa: TRY003
279 bar_lam_p, U_k_p, delta_k_p = pcg_lr # noqa: N806
280 U_k_a_p = U_k_p[active, :] # noqa: N806 # (n_a, k)
281 inv_coeff = 1.0 / (bar_lam_p + delta_k_p) - 1.0 / bar_lam_p # (k,) negative
283 def precond(v: np.ndarray) -> np.ndarray:
284 """Apply P^{-1} to v via the Woodbury identity."""
285 result: np.ndarray = (1.0 / bar_lam_p) * v + U_k_a_p @ (inv_coeff * (U_k_a_p.T @ v))
286 return result
288 M_op = LinearOperator((n_a, n_a), matvec=precond, dtype=np.float64) # ty:ignore[missing-argument, parameter-already-assigned, unknown-argument] # noqa: N806
290 v, _ = cg(op, np.ones(n_a), x0=x0, M=M_op)
291 return v / v.sum(), count[0]
293 def _constraint_active_set_warm(
294 self,
295 solve_fn: Callable[..., tuple[np.ndarray, int]] | None = None,
296 tol: float = 1e-6,
297 max_iter: int = 10_000,
298 warm_start: tuple[np.ndarray, np.ndarray] | None = None,
299 ) -> tuple[np.ndarray, int, int, np.ndarray, np.ndarray | None]:
300 """Active-set loop with warm-starting; returns ``(w, iters, active, w_full)``.
302 Generalises ``_constraint_active_set``: accepts an initial active set
303 and passes the previous iterate as a starting guess to the inner solver.
304 Solvers that support an initial guess (CG via ``_cg_step``) benefit from
305 both the warm active set and the x0; direct solvers (KKT via
306 ``_kkt_step``) accept and silently ignore the x0, profiting only from
307 the warm active set.
309 Args:
310 solve_fn: Inner solver callable ``(active, x0=None) -> (w_a, iters)``.
311 Defaults to ``self._cg_step``.
312 tol: Primal feasibility tolerance; assets with weight below ``-tol``
313 are dropped from the active set.
314 max_iter: Maximum number of outer active-set iterations.
315 warm_start: Optional ``(active_mask, w_full)`` from a previous call.
316 ``active_mask`` is a boolean array of length ``n``;
317 ``w_full`` is the full ``n``-vector of weights.
319 Returns:
320 ``(w, total_iters, final_active, w_full)`` — solution, cumulative
321 iteration count, final active-set mask, and full weight vector
322 suitable as the ``warm_start`` argument for the next solve.
323 """
324 if solve_fn is None:
325 solve_fn = self._cg_step
326 n = self.n
328 if warm_start is not None:
329 asset_active, last_w_full = warm_start
330 asset_active = asset_active.copy()
331 else:
332 asset_active = np.ones(n, dtype=bool)
333 last_w_full = None
335 total_inner_iters = 0
336 outer_steps = 0
337 prev_active = None
338 w = np.zeros(n)
340 for _ in range(max_iter):
341 if prev_active is not None and np.array_equal(prev_active, asset_active):
342 break # pragma: no cover - structurally unreachable safety guard
343 prev_active = asset_active.copy()
345 x0 = None
346 if last_w_full is not None:
347 sub = last_w_full[asset_active]
348 s = sub.sum()
349 if s > 1e-12:
350 x0 = sub / s
352 w_a, step_iters = solve_fn(asset_active, x0=x0)
353 outer_steps += 1
354 total_inner_iters += step_iters
356 if self._primal_drop(w_a, asset_active, tol):
357 continue
359 w = np.zeros(n)
360 w[asset_active] = w_a
361 last_w_full = w.copy()
363 j = self._dual_add(self._compute_gradient(w), asset_active, tol)
364 if j < 0:
365 break
366 asset_active[j] = True
368 return w, outer_steps, total_inner_iters, asset_active.copy(), last_w_full
370 def solve_cg_warm(
371 self,
372 *,
373 project: bool = True,
374 warm_start: tuple[np.ndarray, np.ndarray] | None = None,
375 ) -> tuple[np.ndarray, int, int, tuple[np.ndarray, np.ndarray | None]]:
376 """Solve via matrix-free CG with warm-starting.
378 Like ``solve_cg`` but accepts and returns warm-start state so that a
379 sequence of related problems (e.g. an efficient-frontier sweep over
380 many ``rho`` values) can chain solves together. Adjacent problems share
381 a similar active set and similar solution, so subsequent solves need
382 far fewer outer iterations and CG steps.
384 Args:
385 project: Clip weights to ``[0, ∞)`` and renormalize to sum to 1.
386 warm_start: ``(active_mask, w_full)`` returned by a previous call,
387 or ``None`` for a cold start.
389 Returns:
390 ``(w, outer_steps, inner_iters, warm_state)`` — weight vector,
391 number of outer active-set steps, cumulative CG iteration count,
392 and warm state for the next call in the sequence.
394 Examples:
395 >>> import numpy as np
396 >>> from fast_minimum_variance import Problem
397 >>> X = np.random.default_rng(0).standard_normal((100, 5))
398 >>> mu = np.ones(5) * 0.01
399 >>> warm = None
400 >>> for rho in [0.0, 0.5, 1.0]:
401 ... p = Problem(X, rho=rho, mu=mu)
402 ... w, outer, inner, warm = p.solve_cg_warm(warm_start=warm)
403 >>> float(round(w.sum(), 10))
404 1.0
405 >>> bool((w >= 0).all())
406 True
407 """
408 w, outer, inner, final_active, final_w = self._constraint_active_set_warm(
409 solve_fn=self._cg_step, warm_start=warm_start
410 )
411 if project:
412 w = self._clip_and_renormalize(w)
413 return w, outer, inner, (final_active, final_w)
415 def solve_kkt_warm(
416 self,
417 *,
418 project: bool = True,
419 warm_start: tuple[np.ndarray, np.ndarray] | None = None,
420 ) -> tuple[np.ndarray, int, tuple[np.ndarray, np.ndarray | None]]:
421 """Solve via direct KKT factorisation with active-set warm-starting.
423 Like ``solve_kkt`` but accepts and returns warm-start state for chaining
424 a sequence of related problems. KKT is a direct solver, so only the
425 active-set mask is warm-started (not an iterative initial guess); the
426 benefit is fewer outer primal-dual iterations when consecutive problems
427 share a similar active set.
429 Args:
430 project: Clip weights to ``[0, ∞)`` and renormalize to sum to 1.
431 warm_start: ``(active_mask, w_full)`` returned by a previous call,
432 or ``None`` for a cold start.
434 Returns:
435 ``(w, outer_steps, warm_state)`` — weight vector, number of outer
436 active-set steps, and warm state for the next call in the sequence.
438 Examples:
439 >>> import numpy as np
440 >>> from fast_minimum_variance import Problem
441 >>> X = np.random.default_rng(0).standard_normal((100, 5))
442 >>> mu = np.ones(5) * 0.01
443 >>> warm = None
444 >>> for rho in [0.0, 0.5, 1.0]:
445 ... p = Problem(X, rho=rho, mu=mu)
446 ... w, outer, warm = p.solve_kkt_warm(warm_start=warm)
447 >>> float(round(w.sum(), 10))
448 1.0
449 >>> bool((w >= 0).all())
450 True
451 """
452 w, outer, _inner, final_active, final_w = self._constraint_active_set_warm(
453 solve_fn=self._kkt_step, warm_start=warm_start
454 )
455 if project:
456 w = self._clip_and_renormalize(w)
457 return w, outer, (final_active, final_w)
459 def _clarabel_constraints(self) -> tuple[csc_matrix, np.ndarray, list[Any]]:
460 """Return budget-equality and long-only inequality constraints for Clarabel."""
461 n = self.n
462 a_mat = vstack(
463 [csc_matrix(np.ones((1, n))), -eye(n, format="csc")],
464 format="csc",
465 )
467 b_vec = np.concatenate([[1.0], np.zeros(n)])
468 cones = [clarabel.ZeroConeT(1), clarabel.NonnegativeConeT(n)] # ty:ignore[unresolved-attribute]
469 return a_mat, b_vec, cones
471 def _osqp_constraints(self) -> tuple[csc_matrix, np.ndarray, np.ndarray]:
472 """Return budget-equality and long-only inequality constraints for OSQP."""
473 n = self.n
474 a_mat = vstack(
475 [csc_matrix(np.ones((1, n))), eye(n, format="csc")],
476 format="csc",
477 )
478 l_vec = np.concatenate([[1.0], np.zeros(n)])
479 u_vec = np.concatenate([[1.0], np.full(n, np.inf)])
480 return a_mat, l_vec, u_vec
482 def _nnls_solve(self) -> tuple[np.ndarray, int]:
483 """Solve via NNLS on the augmented return matrix; return ``(w, 1)``.
485 Builds ``A = [sqrt(1-alpha)*X ; sqrt(gamma)*I ; M*ones^T]`` and
486 solves ``min ||Aw||² s.t. w >= 0``. The budget row with weight
487 ``M = ||X||_F * T`` enforces ``ones^T w ≈ 1``; exact normalisation
488 is applied by the ``project`` step in ``solve_nnls``.
489 Return tilt (``rho != 0``) is not supported.
490 """
491 t = self.X.shape[0]
492 m = float(np.linalg.norm(self.X, "fro")) * t
494 if self.target is not None:
495 # target is the penalty matrix M; Cholesky gives chol s.t. chol @ chol.T = M,
496 # so sqrt(alpha)*chol.T rows enforce alpha * w^T M w in the LS objective.
497 chol = cholesky(self.target)
498 rows = [np.sqrt((1 - self.alpha) / self.t) * self.X]
499 tgt = [np.zeros(t)]
500 if self.alpha > 0.0:
501 rows.append(np.sqrt(self.alpha) * chol.T)
502 tgt.append(np.zeros(self.n))
503 else:
504 rows = [np.sqrt(1.0 / self.t) * self.X]
505 tgt = [np.zeros(t)]
506 rows.append(m * np.ones((1, self.n)))
507 tgt.append(np.array([m]))
509 w, _ = nnls(np.vstack(rows), np.concatenate(tgt))
510 return w, 1