Coverage for src / fast_minimum_variance / _base.py: 100%
96 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-09 14:08 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-09 14:08 +0000
1"""Common base for portfolio-optimisation problem classes."""
3from abc import ABC, abstractmethod
4from dataclasses import dataclass
6import clarabel
7import cvxpy as cp
8import numpy as np
9import osqp
10from scipy.sparse import csc_matrix, triu
13@dataclass(frozen=True)
14class _BaseProblem(ABC):
15 """Shared fields, utilities, and solver templates for portfolio problems.
17 Subclasses must implement the seven abstract hooks:
19 * ``_constraint_active_set(solve_fn)`` — outer constraint-handling loop
20 * ``_kkt_step(mask) -> (w, iters)`` — one direct-KKT inner step
21 * ``_cg_step(mask) -> (w, iters)`` — one CG inner step
22 * ``_cvxpy_constraints(w, cp) -> list`` — CVXPY constraint list
23 * ``_clarabel_constraints() -> (A, b, cones)`` — Clarabel constraint data
24 * ``_osqp_constraints() -> (A, l, u)`` — OSQP constraint data
25 * ``_nnls_solve() -> (w, 1)`` — NNLS direct solve
27 All ``solve_*`` methods are implemented here as template methods that
28 call ``_constraint_active_set`` with the appropriate ``_XXX_step``
29 method, then optionally clip-and-renormalize.
30 """
32 X: np.ndarray
33 target: np.ndarray | None = None
34 alpha: float = 0.0
35 rho: float = 0.0
36 mu: np.ndarray | None = None
38 def __post_init__(self):
39 """Validate target shape when supplied; shrinkage is only active when target is not None."""
40 n = self.n
41 if self.target is not None and self.target.shape != (n, n):
42 raise ValueError(f"target must be a square {n} x {n} matrix, got {self.target.shape}") # noqa: TRY003
44 # ------------------------------------------------------------------
45 # Shared utilities
46 # ------------------------------------------------------------------
48 @property
49 def t(self) -> int:
50 """Return the number of rows in X."""
51 return self.X.shape[0]
53 @property
54 def n(self) -> int:
55 """Number of assets (columns of X)."""
56 return self.X.shape[1]
58 @staticmethod
59 def _clip_and_renormalize(w: np.ndarray) -> np.ndarray:
60 """Clip weights to ``[0, ∞)`` and renormalize to sum to 1."""
61 w = np.maximum(w, 0)
62 w /= w.sum()
63 return w
65 # ------------------------------------------------------------------
66 # Abstract hooks (raise NotImplementedError — subclasses must override)
67 # ------------------------------------------------------------------
68 @abstractmethod
69 def _constraint_active_set(self, solve_fn, tol=1e-6, max_iter=10_000): # pragma: no cover
70 """Run the outer constraint-handling loop, calling ``solve_fn`` each iteration."""
71 raise NotImplementedError
73 @abstractmethod
74 def _kkt_step(self, active): # pragma: no cover
75 """Solve one inner direct-KKT step; return ``(w, iters)``."""
76 raise NotImplementedError
78 @abstractmethod
79 def _cvxpy_constraints(self, w, cp): # pragma: no cover
80 """Return the list of CVXPY constraints for ``solve_cvxpy``."""
81 raise NotImplementedError
83 @abstractmethod
84 def _cg_step(self, active):
85 """Solve one inner CG step; return ``(w, iters)``."""
86 raise NotImplementedError # pragma: no cover
88 @abstractmethod
89 def _nnls_solve(self): # pragma: no cover
90 """Solve via NNLS directly (no outer loop); return ``(w, 1)``."""
91 raise NotImplementedError
93 @abstractmethod
94 def _clarabel_constraints(self): # pragma: no cover
95 """Return ``(A_mat, b_vec, cones)`` for the Clarabel QP solver."""
96 raise NotImplementedError
98 @abstractmethod
99 def _osqp_constraints(self): # pragma: no cover
100 """Return ``(A_mat, l_vec, u_vec)`` for the OSQP solver."""
101 raise NotImplementedError
103 # ------------------------------------------------------------------
104 # Template solvers
105 # ------------------------------------------------------------------
107 def solve_kkt(self, *, project: bool = True):
108 """Solve via the direct KKT system.
110 Args:
111 project: Clip weights to ``[0, ∞)`` and renormalize to sum to 1
112 after solving. Set to ``False`` for custom constraints.
114 Returns:
115 ``(w, n_iters)`` — weight vector of shape ``(N,)`` and number of
116 outer iterations taken.
118 Examples:
119 >>> import numpy as np
120 >>> from fast_minimum_variance import Problem
121 >>> X = np.random.default_rng(0).standard_normal((100, 5))
122 >>> w, iters = Problem(X).solve_kkt()
123 >>> float(round(w.sum(), 10))
124 1.0
125 >>> bool((w >= 0).all())
126 True
127 """
128 w, iters = self._constraint_active_set(self._kkt_step)
129 if project:
130 w = self._clip_and_renormalize(w)
131 return w, iters
133 def solve_cvxpy(self, *, project: bool = True):
134 """Solve via CVXPY / Clarabel (reference interior-point solver).
136 Requires the ``convex`` extra::
138 pip install fast-minimum-variance[convex]
140 Args:
141 project: Clip and renormalize after solving (see ``solve_kkt``).
143 Returns:
144 ``(w, n_iters)`` — weight vector of shape ``(N,)`` and Clarabel
145 iteration count.
147 Examples:
148 >>> import numpy as np
149 >>> from fast_minimum_variance import Problem
150 >>> X = np.random.default_rng(0).standard_normal((100, 5))
151 >>> w, iters = Problem(X).solve_cvxpy()
152 >>> float(round(w.sum(), 6))
153 1.0
154 >>> bool((w >= -1e-6).all())
155 True
156 """
157 w = cp.Variable(self.n)
158 if self.target is not None:
159 # target is the penalty matrix M; decompose as M = chol chol^T so ||chol^T w||^2 = w^T M w
160 chol = np.linalg.cholesky(self.target)
161 objective = (1.0 - self.alpha) * cp.sum_squares(self.X @ w) / self.t + self.alpha * cp.sum_squares(
162 chol.T @ w
163 )
164 else:
165 objective = cp.sum_squares(self.X @ w) / self.t
166 if self.rho != 0.0 and self.mu is not None:
167 objective = objective - self.rho * (self.mu @ w)
169 problem = cp.Problem(cp.Minimize(objective), self._cvxpy_constraints(w, cp))
170 problem.solve(solver=cp.CLARABEL)
172 result = w.value
173 if result is None:
174 raise RuntimeError("CVXPY solver failed to find a solution") # noqa: TRY003
175 if project:
176 result = self._clip_and_renormalize(result)
177 return result, problem.solver_stats.num_iters
179 def solve_cg(self, *, project: bool = True):
180 """Solve via matrix-free conjugate gradients.
182 Args:
183 project: Clip weights to ``[0, ∞)`` and renormalize to sum to 1
184 after solving. Set to ``False`` for custom constraints.
186 Returns:
187 ``(w, n_iters)`` — weight vector of shape ``(N,)`` and total CG
188 iteration count across all outer active-set steps.
190 Examples:
191 >>> import numpy as np
192 >>> from fast_minimum_variance import Problem
193 >>> X = np.random.default_rng(0).standard_normal((100, 5))
194 >>> w, iters = Problem(X).solve_cg()
195 >>> float(round(w.sum(), 10))
196 1.0
197 >>> bool((w >= 0).all())
198 True
199 """
200 w, iters = self._constraint_active_set(self._cg_step)
201 if project:
202 w = self._clip_and_renormalize(w)
203 return w, iters
205 def solve_nnls(self, *, project: bool = True):
206 """Solve via non-negative least squares (scipy.optimize.nnls).
208 The budget constraint is enforced by augmenting the return matrix
209 with a heavily weighted all-ones row; non-negativity is handled
210 natively by the Lawson-Hanson algorithm. The covariance matrix
211 ``X'X`` is formed internally by scipy. Return tilt (``rho != 0``)
212 is not supported.
214 Args:
215 project: Renormalize weights to sum to 1 after solving.
216 Clipping is a no-op (NNLS already gives ``w >= 0``).
218 Returns:
219 ``(w, 1)`` — weight vector of shape ``(N,)`` and iteration
220 count (always 1; NNLS is a single direct solve).
222 Examples:
223 >>> import numpy as np
224 >>> from fast_minimum_variance import Problem
225 >>> X = np.random.default_rng(0).standard_normal((100, 5))
226 >>> w, iters = Problem(X).solve_nnls()
227 >>> float(round(w.sum(), 10))
228 1.0
229 >>> bool((w >= 0).all())
230 True
231 """
232 w, iters = self._nnls_solve()
233 if project:
234 w = self._clip_and_renormalize(w)
235 return w, iters
237 def solve_clarabel(self, *, project: bool = True):
238 """Solve via Clarabel interior-point solver (direct API, no CVXPY overhead).
240 Assembles ``P = 2·Σ_LW`` as a sparse CSC matrix and calls Clarabel
241 directly, bypassing CVXPY's problem-construction overhead. The
242 problem-specific constraint data is supplied by ``_clarabel_constraints``.
243 Returns ``(w, iters)`` where ``iters`` is the Clarabel iteration count.
245 Args:
246 project: Clip and renormalize after solving (see ``solve_kkt``).
248 Returns:
249 ``(w, n_iters)`` — weight vector of shape ``(N,)`` and number of
250 interior-point iterations.
252 Examples:
253 >>> import numpy as np
254 >>> from fast_minimum_variance import Problem
255 >>> X = np.random.default_rng(0).standard_normal((100, 5))
256 >>> w, iters = Problem(X).solve_clarabel()
257 >>> float(round(w.sum(), 6))
258 1.0
259 >>> bool((w >= -1e-6).all())
260 True
261 """
262 n = self.n
264 if self.target is None:
265 p_dense = 2.0 * ((self.X.T @ self.X) / self.t)
266 else:
267 p_dense = 2.0 * ((1 - self.alpha) * (self.X.T @ self.X) / self.t + self.alpha * self.target)
269 p_csc = csc_matrix(p_dense)
271 q = np.zeros(n)
272 if self.rho != 0.0 and self.mu is not None:
273 q = -self.rho * self.mu
275 a_mat, b_vec, cones = self._clarabel_constraints()
277 settings = clarabel.DefaultSettings() # type: ignore[attr-defined]
278 settings.verbose = False
279 sol = clarabel.DefaultSolver(p_csc, q, a_mat, b_vec, cones, settings).solve() # type: ignore[attr-defined]
281 w = np.array(sol.x)
282 if project:
283 w = self._clip_and_renormalize(w)
284 return w, sol.iterations
286 def solve_osqp(self, *, project: bool = True):
287 """Solve via OSQP (operator-splitting QP solver, direct API, no CVXPY overhead).
289 Assembles ``P = 2·Σ_LW`` as a sparse upper-triangular CSC matrix and
290 calls OSQP directly. The problem-specific constraint data is supplied
291 by ``_osqp_constraints``. Returns ``(w, iters)`` where ``iters`` is
292 the number of ADMM iterations.
294 Args:
295 project: Clip and renormalize after solving (see ``solve_kkt``).
297 Returns:
298 ``(w, n_iters)`` — weight vector of shape ``(N,)`` and number of
299 ADMM iterations.
301 Examples:
302 >>> import numpy as np
303 >>> from fast_minimum_variance import Problem
304 >>> X = np.random.default_rng(0).standard_normal((100, 5))
305 >>> w, iters = Problem(X).solve_osqp()
306 >>> float(round(w.sum(), 6))
307 1.0
308 >>> bool((w >= -1e-6).all())
309 True
310 """
311 n = self.n
313 if self.target is None:
314 p_dense = 2.0 * ((self.X.T @ self.X) / self.t)
315 else:
316 p_dense = 2.0 * ((1 - self.alpha) * (self.X.T @ self.X) / self.t + self.alpha * self.target)
318 # p_dense = 2.0 * ((1-self.alpha) * (self.X.T @ self.X)/self.t + self.alpha * self.target)
319 p_upper = triu(p_dense, format="csc")
321 q = np.zeros(n)
322 if self.rho != 0.0 and self.mu is not None:
323 q = -self.rho * self.mu
325 a_mat, l_vec, u_vec = self._osqp_constraints()
327 prob = osqp.OSQP()
328 prob.setup(p_upper, q, a_mat, l_vec, u_vec, verbose=False)
329 res = prob.solve()
331 w = np.array(res.x)
332 if project:
333 w = self._clip_and_renormalize(w)
334 return w, res.info.iter