Coverage for src/fast_minimum_variance/_base.py: 100%
145 statements
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-02 13:28 +0000
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-02 13:28 +0000
1"""Common base for portfolio-optimisation problem classes."""
3from abc import ABC, abstractmethod
4from collections.abc import Callable
5from dataclasses import dataclass
6from typing import Any
8import clarabel
9import cvxpy as cp
10import numpy as np
11import osqp
12from cvx.linalg import cholesky
13from scipy.sparse import csc_matrix, triu
16@dataclass(frozen=True)
17class _BaseProblem(ABC):
18 """Shared fields, utilities, and solver templates for portfolio problems.
20 Subclasses must implement the seven abstract hooks:
22 * ``_constraint_active_set(solve_fn)`` — outer constraint-handling loop
23 * ``_kkt_step(mask) -> (w, iters)`` — one direct-KKT inner step
24 * ``_cg_step(mask) -> (w, iters)`` — one CG inner step
25 * ``_cvxpy_constraints(w, cp) -> list`` — CVXPY constraint list
26 * ``_clarabel_constraints() -> (A, b, cones)`` — Clarabel constraint data
27 * ``_osqp_constraints() -> (A, l, u)`` — OSQP constraint data
28 * ``_nnls_solve() -> (w, 1)`` — NNLS direct solve
30 All ``solve_*`` methods are implemented here as template methods that
31 call ``_constraint_active_set`` with the appropriate ``_XXX_step``
32 method, then optionally clip-and-renormalize.
33 """
35 X: np.ndarray
36 target: np.ndarray | None = None
37 alpha: float = 0.0
38 rho: float = 0.0
39 mu: np.ndarray | None = None
40 target_lr: tuple[float, np.ndarray, np.ndarray] | None = None # (bar_lam, U_k, delta_k) — low-rank + identity
41 pcg_lr: tuple[float, np.ndarray, np.ndarray] | None = None # (bar_lam, U_k, delta_k) — RMT preconditioner (§5.3)
43 def __post_init__(self) -> None:
44 """Validate target/target_lr shapes when supplied."""
45 n = self.n
46 if self.target is not None and self.target.shape != (n, n):
47 raise ValueError(f"target must be a square {n} x {n} matrix, got {self.target.shape}") # noqa: TRY003
48 if self.target_lr is not None:
49 _bar_lam, U_k, delta_k = self.target_lr # noqa: N806
50 if U_k.shape[0] != n or U_k.shape[1] != delta_k.shape[0]:
51 raise ValueError( # noqa: TRY003
52 f"target_lr: U_k must be ({n}, k) and delta_k (k,), got {U_k.shape}, {delta_k.shape}"
53 )
55 # ------------------------------------------------------------------
56 # Shared utilities
57 # ------------------------------------------------------------------
59 @property
60 def t(self) -> int:
61 """Return the number of rows in X."""
62 return int(self.X.shape[0])
64 @property
65 def n(self) -> int:
66 """Number of assets (columns of X)."""
67 return int(self.X.shape[1])
69 @staticmethod
70 def _clip_and_renormalize(w: np.ndarray) -> np.ndarray:
71 """Clip weights to ``[0, ∞)`` and renormalize to sum to 1."""
72 w = np.maximum(w, 0)
73 w /= w.sum()
74 return w
76 # ------------------------------------------------------------------
77 # Abstract hooks (raise NotImplementedError — subclasses must override)
78 # ------------------------------------------------------------------
79 @abstractmethod
80 def _constraint_active_set(
81 self,
82 solve_fn: Callable[[np.ndarray], tuple[np.ndarray, int]],
83 tol: float = 1e-6,
84 max_iter: int = 10_000,
85 ) -> tuple[np.ndarray, int, int]: # pragma: no cover
86 """Run the outer constraint-handling loop, calling ``solve_fn`` each iteration."""
87 raise NotImplementedError
89 @abstractmethod
90 def _kkt_step(self, active: np.ndarray) -> tuple[np.ndarray, int]: # pragma: no cover
91 """Solve one inner direct-KKT step; return ``(w, iters)``."""
92 raise NotImplementedError
94 @abstractmethod
95 def _cvxpy_constraints(self, w: cp.Variable, cp: object) -> list[Any]: # pragma: no cover
96 """Return the list of CVXPY constraints for ``solve_cvxpy``."""
97 raise NotImplementedError
99 @abstractmethod
100 def _cg_step(self, active: np.ndarray) -> tuple[np.ndarray, int]:
101 """Solve one inner CG step; return ``(w, iters)``."""
102 raise NotImplementedError # pragma: no cover
104 def _pcg_step(self, active: np.ndarray, x0: np.ndarray | None = None) -> tuple[np.ndarray, int]: # pragma: no cover
105 """Solve one inner PCG step with RMT preconditioner; return ``(w, iters)``.
107 Subclasses that support PCG (e.g. ``_MinVarProblem``) override this.
108 The base implementation raises so callers get a clear error if PCG is
109 invoked on a problem type that has not implemented it.
110 """
111 raise NotImplementedError
113 @abstractmethod
114 def _nnls_solve(self) -> tuple[np.ndarray, int]: # pragma: no cover
115 """Solve via NNLS directly (no outer loop); return ``(w, 1)``."""
116 raise NotImplementedError
118 @abstractmethod
119 def _clarabel_constraints(self) -> tuple[csc_matrix, np.ndarray, list[Any]]: # pragma: no cover
120 """Return ``(A_mat, b_vec, cones)`` for the Clarabel QP solver."""
121 raise NotImplementedError
123 @abstractmethod
124 def _osqp_constraints(self) -> tuple[csc_matrix, np.ndarray, np.ndarray]: # pragma: no cover
125 """Return ``(A_mat, l_vec, u_vec)`` for the OSQP solver."""
126 raise NotImplementedError
128 # ------------------------------------------------------------------
129 # Template solvers
130 # ------------------------------------------------------------------
132 def solve_kkt(self, *, project: bool = True) -> tuple[np.ndarray, int]:
133 """Solve via the direct KKT system.
135 Args:
136 project: Clip weights to ``[0, ∞)`` and renormalize to sum to 1
137 after solving. Set to ``False`` for custom constraints.
139 Returns:
140 ``(w, n_iters)`` — weight vector of shape ``(N,)`` and number of
141 outer iterations taken.
143 Examples:
144 >>> import numpy as np
145 >>> from fast_minimum_variance import Problem
146 >>> X = np.random.default_rng(0).standard_normal((100, 5))
147 >>> w, iters = Problem(X).solve_kkt()
148 >>> float(round(w.sum(), 10))
149 1.0
150 >>> bool((w >= 0).all())
151 True
152 """
153 w, outer, _inner = self._constraint_active_set(self._kkt_step)
154 if project:
155 w = self._clip_and_renormalize(w)
156 return w, outer
158 def solve_cvxpy(self, *, project: bool = True, backend: str = "clarabel") -> tuple[np.ndarray, int]:
159 """Solve via CVXPY with a configurable backend solver.
161 Requires the ``convex`` extra::
163 pip install fast-minimum-variance[convex]
165 Args:
166 project: Clip and renormalize after solving (see ``solve_kkt``).
167 backend: CVXPY solver name (default ``"clarabel"``; ``"osqp"`` is
168 also supported).
170 Returns:
171 ``(w, n_iters)`` — weight vector of shape ``(N,)`` and solver
172 iteration count.
174 Examples:
175 >>> import numpy as np
176 >>> from fast_minimum_variance import Problem
177 >>> X = np.random.default_rng(0).standard_normal((100, 5))
178 >>> w, iters = Problem(X).solve_cvxpy()
179 >>> float(round(w.sum(), 6))
180 1.0
181 >>> bool((w >= -1e-6).all())
182 True
183 """
184 w = cp.Variable(self.n)
185 if self.target is not None:
186 # target is the penalty matrix M; decompose as M = chol chol^T so ||chol^T w||^2 = w^T M w
187 chol = cholesky(self.target)
188 objective = (1.0 - self.alpha) * cp.sum_squares(self.X @ w) / self.t + self.alpha * cp.sum_squares(
189 chol.T @ w
190 )
191 else:
192 objective = cp.sum_squares(self.X @ w) / self.t
193 if self.rho != 0.0 and self.mu is not None:
194 objective = objective - self.rho * (self.mu @ w)
196 cvxpy_solver = cp.CLARABEL if backend.lower() == "clarabel" else cp.OSQP
197 problem = cp.Problem(cp.Minimize(objective), self._cvxpy_constraints(w, cp))
198 problem.solve(solver=cvxpy_solver)
200 result = w.value
201 if result is None:
202 raise RuntimeError("CVXPY solver failed to find a solution") # noqa: TRY003
203 if project:
204 result = self._clip_and_renormalize(result)
205 return result, int(problem.solver_stats.num_iters or 0)
207 def solve_cg(self, *, project: bool = True) -> tuple[np.ndarray, int, int]:
208 """Solve via matrix-free conjugate gradients.
210 Args:
211 project: Clip weights to ``[0, ∞)`` and renormalize to sum to 1
212 after solving. Set to ``False`` for custom constraints.
214 Returns:
215 ``(w, outer_steps, inner_iters)`` — weight vector, number of outer
216 active-set steps, and total CG iterations summed across all steps.
218 Examples:
219 >>> import numpy as np
220 >>> from fast_minimum_variance import Problem
221 >>> X = np.random.default_rng(0).standard_normal((100, 5))
222 >>> w, outer, inner = Problem(X).solve_cg()
223 >>> float(round(w.sum(), 10))
224 1.0
225 >>> bool((w >= 0).all())
226 True
227 """
228 w, outer, inner = self._constraint_active_set(self._cg_step)
229 if project:
230 w = self._clip_and_renormalize(w)
231 return w, outer, inner
233 def solve_pcg(self, *, project: bool = True) -> tuple[np.ndarray, int, int]:
234 """Solve via matrix-free PCG with RMT preconditioner (Section 5.3).
236 Solves ``Sigma_LW_oracle x = 1`` using ``T0^RMT`` as preconditioner.
237 Requires ``pcg_lr = (bar_lam, U_k, delta_k)`` from RMT preprocessing.
238 The preconditioner is applied via the Woodbury identity at O(nk) per step;
239 the system matvec costs O(nT). Returns the oracle-LW minimum-variance
240 portfolio — not the RMT portfolio — in O(sqrt(1/alpha_oracle)) iterations.
242 Returns:
243 ``(w, outer_steps, inner_iters)``
245 Examples:
246 >>> import numpy as np
247 >>> from fast_minimum_variance import Problem
248 >>> rng = np.random.default_rng(0)
249 >>> X = rng.standard_normal((100, 5))
250 >>> bar_lam = float(np.trace(X.T @ X / 100) / 5)
251 >>> U_k = np.eye(5, 2)
252 >>> delta_k = np.array([0.1, 0.05])
253 >>> w, outer, inner = Problem(X, alpha=0.1, pcg_lr=(bar_lam, U_k, delta_k)).solve_pcg()
254 >>> float(round(w.sum(), 10))
255 1.0
256 >>> bool((w >= 0).all())
257 True
258 """
259 if self.pcg_lr is None:
260 raise ValueError("pcg_lr must be set; pass pcg_lr=(bar_lam, U_k, delta_k)") # noqa: TRY003
261 w, outer, inner = self._constraint_active_set(self._pcg_step)
262 if project:
263 w = self._clip_and_renormalize(w)
264 return w, outer, inner
266 def solve_nnls(self, *, project: bool = True) -> tuple[np.ndarray, int]:
267 """Solve via non-negative least squares (scipy.optimize.nnls).
269 The budget constraint is enforced by augmenting the return matrix
270 with a heavily weighted all-ones row; non-negativity is handled
271 natively by the Lawson-Hanson algorithm. The covariance matrix
272 ``X'X`` is formed internally by scipy. Return tilt (``rho != 0``)
273 is not supported.
275 Args:
276 project: Renormalize weights to sum to 1 after solving.
277 Clipping is a no-op (NNLS already gives ``w >= 0``).
279 Returns:
280 ``(w, 1)`` — weight vector of shape ``(N,)`` and iteration
281 count (always 1; NNLS is a single direct solve).
283 Examples:
284 >>> import numpy as np
285 >>> from fast_minimum_variance import Problem
286 >>> X = np.random.default_rng(0).standard_normal((100, 5))
287 >>> w, iters = Problem(X).solve_nnls()
288 >>> float(round(w.sum(), 10))
289 1.0
290 >>> bool((w >= 0).all())
291 True
292 """
293 w, iters = self._nnls_solve()
294 if project:
295 w = self._clip_and_renormalize(w)
296 return w, iters
298 def solve_osqp(self, *, project: bool = True) -> tuple[np.ndarray, int]:
299 """Solve via OSQP (operator-splitting QP solver).
301 Assembles ``P = 2·Σ_LW`` as a sparse upper-triangular CSC matrix and
302 calls OSQP directly. Returns ``(w, iters)`` where ``iters`` is the
303 OSQP iteration count.
305 Args:
306 project: Clip and renormalize after solving (see ``solve_kkt``).
308 Returns:
309 ``(w, n_iters)`` — weight vector of shape ``(N,)`` and number of
310 OSQP iterations.
312 Examples:
313 >>> import numpy as np
314 >>> from fast_minimum_variance import Problem
315 >>> X = np.random.default_rng(0).standard_normal((100, 5))
316 >>> w, iters = Problem(X).solve_osqp()
317 >>> float(round(w.sum(), 6))
318 1.0
319 >>> bool((w >= -1e-6).all())
320 True
321 """
322 n = self.n
324 if self.target is None:
325 p_dense = 2.0 * ((self.X.T @ self.X) / self.t)
326 else:
327 p_dense = 2.0 * ((1 - self.alpha) * (self.X.T @ self.X) / self.t + self.alpha * self.target)
329 p_upper = triu(csc_matrix(p_dense), format="csc")
331 q = np.zeros(n)
332 if self.rho != 0.0 and self.mu is not None:
333 q = -self.rho * self.mu
335 a_mat, l_vec, u_vec = self._osqp_constraints()
337 solver = osqp.OSQP()
338 solver.setup(
339 p_upper,
340 q,
341 a_mat,
342 l_vec,
343 u_vec,
344 warm_starting=True,
345 verbose=False,
346 eps_abs=1e-8,
347 eps_rel=1e-8,
348 )
349 result = solver.solve()
351 w = np.array(result.x)
352 if project:
353 w = self._clip_and_renormalize(w)
354 return w, result.info.iter
356 def solve_clarabel(self, *, project: bool = True) -> tuple[np.ndarray, int]:
357 """Solve via Clarabel interior-point solver (direct API, no CVXPY overhead).
359 Assembles ``P = 2·Σ_LW`` as a sparse CSC matrix and calls Clarabel
360 directly, bypassing CVXPY's problem-construction overhead. The
361 problem-specific constraint data is supplied by ``_clarabel_constraints``.
362 Returns ``(w, iters)`` where ``iters`` is the Clarabel iteration count.
364 Args:
365 project: Clip and renormalize after solving (see ``solve_kkt``).
367 Returns:
368 ``(w, n_iters)`` — weight vector of shape ``(N,)`` and number of
369 interior-point iterations.
371 Examples:
372 >>> import numpy as np
373 >>> from fast_minimum_variance import Problem
374 >>> X = np.random.default_rng(0).standard_normal((100, 5))
375 >>> w, iters = Problem(X).solve_clarabel()
376 >>> float(round(w.sum(), 6))
377 1.0
378 >>> bool((w >= -1e-6).all())
379 True
380 """
381 n = self.n
383 if self.target is None:
384 p_dense = 2.0 * ((self.X.T @ self.X) / self.t)
385 else:
386 p_dense = 2.0 * ((1 - self.alpha) * (self.X.T @ self.X) / self.t + self.alpha * self.target)
388 p_csc = csc_matrix(p_dense)
390 q = np.zeros(n)
391 if self.rho != 0.0 and self.mu is not None:
392 q = -self.rho * self.mu
394 a_mat, b_vec, cones = self._clarabel_constraints()
396 settings = clarabel.DefaultSettings() # ty:ignore[unresolved-attribute]
397 settings.verbose = False
398 sol = clarabel.DefaultSolver(p_csc, q, a_mat, b_vec, cones, settings).solve() # ty:ignore[unresolved-attribute]
400 w = np.array(sol.x)
401 if project:
402 w = self._clip_and_renormalize(w)
403 return w, sol.iterations
405 def solve_proximal(self, *, project: bool = True) -> tuple[np.ndarray, int]:
406 """Solve via proximal gradient descent projected onto the probability simplex.
408 Minimises ``0.5 * w^T Σ_LW w`` subject to ``w >= 0, sum(w) = 1``.
409 The gradient is computed in two separate terms so that the per-step
410 cost remains ``O(nT)`` regardless of whether shrinkage is applied:
412 grad = (1-alpha)/T * X^T(Xw) + alpha * target @ w
414 This avoids stacking a (T+n)xn matrix, which would inflate per-step
415 cost by O(n) under shrinkage. Return tilt (``rho != 0``) is not supported.
417 Args:
418 project: Clip and renormalize after solving (see ``solve_kkt``).
420 Returns:
421 ``(w, n_iters)`` — weight vector of shape ``(N,)`` and the number
422 of gradient steps taken.
424 Examples:
425 >>> import numpy as np
426 >>> from fast_minimum_variance import Problem
427 >>> X = np.random.default_rng(0).standard_normal((100, 5))
428 >>> w, iters = Problem(X).solve_proximal()
429 >>> float(round(w.sum(), 10))
430 1.0
431 >>> bool((w >= 0).all())
432 True
433 """
434 from .proximal import prox_gradient
436 extra_grad: Callable[[np.ndarray], np.ndarray] | None
437 if self.target is not None and self.alpha > 0.0:
438 c = 1.0 - self.alpha
439 mat = np.sqrt(c) / np.sqrt(self.t) * self.X
440 alpha, target = self.alpha, self.target
442 def extra_grad(v: np.ndarray, a: float = alpha, tgt: np.ndarray = target) -> np.ndarray:
443 """Return the shrinkage gradient contribution a * target @ v."""
444 result: np.ndarray = a * (tgt @ v)
445 return result
446 else:
447 mat = self.X / np.sqrt(self.t)
448 extra_grad = None
450 vec = np.zeros(self.t)
451 w, n_iters = prox_gradient(mat, vec, extra_grad=extra_grad)
452 if project:
453 w = self._clip_and_renormalize(w)
454 return w, n_iters
456 def solve_fista(self, *, project: bool = True) -> tuple[np.ndarray, int]:
457 r"""Solve via Nesterov-accelerated proximal gradient (FISTA).
459 Uses the Beck-Teboulle momentum sequence to achieve $O(1/k^2)$
460 convergence for convex objectives; for strongly convex $f$ with
461 condition number $\\kappa$ the linear rate is $(1-1/\\sqrt{\\kappa})^k$,
462 matching CG's asymptotic iteration count. Same per-step cost
463 $O(nT)$ as ``solve_proximal``, typically 2--10$\\times$ fewer
464 iterations.
466 Args:
467 project: Clip and renormalize after solving (see ``solve_proximal``).
469 Returns:
470 ``(w, n_iters)`` — weight vector of shape ``(N,)`` and the number
471 of gradient steps taken.
473 Examples:
474 >>> import numpy as np
475 >>> from fast_minimum_variance import Problem
476 >>> X = np.random.default_rng(0).standard_normal((100, 5))
477 >>> w, iters = Problem(X).solve_fista()
478 >>> float(round(w.sum(), 10))
479 1.0
480 >>> bool((w >= 0).all())
481 True
482 """
483 from .proximal import fista_gradient
485 extra_grad: Callable[[np.ndarray], np.ndarray] | None
486 if self.target is not None and self.alpha > 0.0:
487 c = 1.0 - self.alpha
488 mat = np.sqrt(c) / np.sqrt(self.t) * self.X
489 alpha, target = self.alpha, self.target
491 def extra_grad(v: np.ndarray, a: float = alpha, tgt: np.ndarray = target) -> np.ndarray:
492 """Return the shrinkage gradient contribution a * target @ v."""
493 result: np.ndarray = a * (tgt @ v)
494 return result
495 else:
496 mat = self.X / np.sqrt(self.t)
497 extra_grad = None
499 vec = np.zeros(self.t)
500 w, n_iters = fista_gradient(mat, vec, extra_grad=extra_grad)
501 if project:
502 w = self._clip_and_renormalize(w)
503 return w, n_iters