Coverage for src/fast_minimum_variance/problem.py: 100%
143 statements
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-02 13:28 +0000
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-02 13:28 +0000
1"""General mean-variance portfolio problem with growing-constraint active-set."""
3from collections.abc import Callable
4from dataclasses import dataclass
5from typing import Any
7import clarabel
8import cvxpy as cp
9import numpy as np
10from cvx.linalg import cholesky
11from scipy.optimize import nnls
12from scipy.sparse import csc_matrix, vstack
13from scipy.sparse.linalg import LinearOperator, minres
15from ._base import _BaseProblem
18@dataclass(frozen=True)
19class _Problem(_BaseProblem):
20 """Mean-variance portfolio problem with arbitrary linear constraints.
22 Encodes the optimization problem::
24 min (1-alpha)||X w||^2 + alpha*(||X||_F^2/N)*||w||^2 - rho * mu^T w
25 s.t. A^T w = b (equality constraints)
26 C^T w <= d (inequality constraints)
28 The first term is the sample portfolio variance (X is the demeaned return
29 matrix of shape T x N). The ``alpha`` term adds a Ledoit-Wolf ridge
30 ``alpha * (||X||_F^2 / N) * I`` to the covariance, improving conditioning.
31 The ``rho * mu`` term tilts the portfolio toward higher-expected-return
32 assets (Markowitz).
34 Defaults reproduce the long-only minimum-variance problem:
36 * ``A = ones(N, 1)``, ``b = [1]`` — budget constraint: sum(w) = 1
37 * ``C = -I``, ``d = 0`` — long-only: -w <= 0, i.e. w >= 0
39 The active-set loop *adds* violated inequality constraints as equalities
40 (growing approach), operating on the full N-dimensional system throughout.
41 See :class:`~fast_minimum_variance.minvar_problem._MinVarProblem` for the
42 complementary shrinking approach optimised for the default long-only case.
44 Solvers::
46 w, iters = Problem(X, A=A, b=b).solve_kkt()
47 w, iters = Problem(X, A=A, b=b).solve_minres()
48 w, iters = Problem(X, A=A, b=b).solve_cg()
49 w, iters = Problem(X, A=A, b=b).solve_cvxpy() # requires [convex] extra
50 """
52 def _cg_step(self, active: np.ndarray) -> tuple[np.ndarray, int]:
53 """Solve the KKT saddle-point system via MINRES; return ``(w, iters)``."""
54 op, rhs = self._kkt_operator(active=active)
55 iters = [0]
57 def _count(_: np.ndarray) -> None:
58 """Increment iteration counter on each MINRES callback."""
59 iters[0] += 1
61 x, _ = minres(op, rhs, callback=_count)
62 return x[: self.n], iters[0]
64 A: np.ndarray | None = None
65 b: np.ndarray | None = None
66 C: np.ndarray | None = None
67 d: np.ndarray | None = None
69 def __post_init__(self) -> None:
70 """Fill in default constraint matrices when not supplied."""
71 super().__post_init__()
72 n = self.n
73 if self.A is None:
74 object.__setattr__(self, "A", np.ones((n, 1)))
75 if self.b is None:
76 object.__setattr__(self, "b", np.ones(1))
77 if self.C is None:
78 object.__setattr__(self, "C", -np.eye(n))
79 if self.d is None:
80 object.__setattr__(self, "d", np.zeros(n))
82 @property
83 def _m(self) -> int:
84 """Number of equality constraints."""
85 assert self.A is not None # noqa: S101
86 return int(self.A.shape[1])
88 # ------------------------------------------------------------------
89 # Active-set loop (growing: add violated inequality constraints)
90 # ------------------------------------------------------------------
92 def _constraint_active_set(
93 self,
94 solve_fn: Callable[[np.ndarray], tuple[np.ndarray, int]],
95 tol: float = 1e-6, # noqa: ARG002
96 max_iter: int = 10_000, # noqa: ARG002
97 ) -> tuple[np.ndarray, int, int]:
98 """Run the active-set loop, promoting violated inequalities to equalities."""
99 assert self.C is not None # noqa: S101
100 assert self.d is not None # noqa: S101
101 p = self.d.size
102 active = np.zeros(p, dtype=bool)
103 outer_steps = 0
104 total_iters = 0
106 while True:
107 w, step_iters = solve_fn(active)
108 violations = self.C[:, ~active].T @ w - self.d[~active]
109 outer_steps += 1
110 total_iters += step_iters
111 if np.all(violations <= 1e-10):
112 break
113 active[~active] |= violations > 1e-10
115 return w, outer_steps, total_iters
117 # ------------------------------------------------------------------
118 # Inner steps (called by the template solve_* methods on the base)
119 # ------------------------------------------------------------------
121 def _kkt_step(self, active: np.ndarray) -> tuple[np.ndarray, int]:
122 """Solve the full KKT system directly; return ``(w, 1)``."""
123 K, rhs = self._kkt(active=active) # noqa: N806
124 return np.linalg.solve(K, rhs)[: self.n], 1
126 def _cvxpy_constraints(self, w: cp.Variable, cp: object) -> list[Any]: # noqa: ARG002
127 """Return equality and inequality constraints for CVXPY."""
128 assert self.A is not None # noqa: S101
129 assert self.b is not None # noqa: S101
130 assert self.C is not None # noqa: S101
131 assert self.d is not None # noqa: S101
132 return [self.A.T @ w == self.b, self.C.T @ w <= self.d]
134 # ------------------------------------------------------------------
135 # Operator builders (also accessed directly by tests)
136 # ------------------------------------------------------------------
138 def _kkt(self, active: np.ndarray | None = None) -> tuple[np.ndarray, np.ndarray]:
139 """Build the (N+m) x (N+m) KKT saddle-point system."""
140 assert self.A is not None # noqa: S101
141 assert self.b is not None # noqa: S101
142 assert self.C is not None # noqa: S101
143 assert self.d is not None # noqa: S101
144 if active is None:
145 active = np.zeros(self.C.shape[1], dtype=bool)
146 A = np.hstack([self.A, self.C[:, active]]) # noqa: N806
147 b = np.concatenate([self.b, self.d[active]])
148 m = A.shape[1]
150 # ridge = self._ridge()
151 # oma = 1.0 - self.alpha
152 K = np.zeros((self.n + m, self.n + m)) # noqa: N806
153 if self.target is None:
154 K[: self.n, : self.n] = 2 * (self.X.T @ self.X) / self.t
155 else:
156 K[: self.n, : self.n] = 2 * ((1 - self.alpha) * (self.X.T @ self.X) / self.t + self.alpha * self.target)
157 K[: self.n, self.n :] = A
158 K[self.n :, : self.n] = A.T
160 rhs = np.zeros(self.n + m)
161 if self.rho != 0.0 and self.mu is not None:
162 rhs[: self.n] = self.rho * self.mu
163 rhs[self.n :] = b
165 return K, rhs
167 def _kkt_operator(self, active: np.ndarray | None = None) -> tuple[LinearOperator, np.ndarray]:
168 """Build the matrix-free KKT saddle-point operator and RHS for MINRES."""
169 assert self.A is not None # noqa: S101
170 assert self.b is not None # noqa: S101
171 assert self.C is not None # noqa: S101
172 assert self.d is not None # noqa: S101
173 if active is None:
174 active = np.zeros(self.C.shape[1], dtype=bool)
175 aa = np.hstack([self.A, self.C[:, active]])
176 na, ma = self.n, aa.shape[1]
178 def _matvec(
179 x: np.ndarray,
180 xx: np.ndarray = self.X,
181 a: np.ndarray = aa,
182 n_: int = na,
183 m_: int = ma,
184 ) -> np.ndarray:
185 """Apply the KKT saddle-point matrix to vector ``x``."""
186 out = np.empty(n_ + m_)
187 if self.target is None:
188 out[:n_] = 2.0 * (xx.T @ (xx @ x[:n_])) / self.t + a @ x[n_:]
189 else:
190 out[:n_] = (
191 2.0 * ((1.0 - self.alpha) * (xx.T @ (xx @ x[:n_])) / self.t + self.alpha * (self.target @ x[:n_]))
192 + a @ x[n_:]
193 )
194 out[n_:] = a.T @ x[:n_]
195 return out
197 rhs = np.zeros(na + ma)
198 if self.rho != 0.0 and self.mu is not None:
199 rhs[:na] = self.rho * self.mu
200 rhs[na:] = np.concatenate([self.b, self.d[active]])
202 op = LinearOperator(shape=(na + ma, na + ma), matvec=_matvec) # ty:ignore[missing-argument, unknown-argument]
203 return op, rhs
205 def _clarabel_constraints(self) -> tuple[csc_matrix, np.ndarray, list[Any]]:
206 """Return equality and inequality constraints for Clarabel."""
207 assert self.A is not None # noqa: S101
208 assert self.b is not None # noqa: S101
209 assert self.C is not None # noqa: S101
210 assert self.d is not None # noqa: S101
211 a_mat = vstack([csc_matrix(self.A.T), csc_matrix(self.C.T)], format="csc")
212 b_vec = np.concatenate([self.b, self.d])
213 cones = [clarabel.ZeroConeT(self._m), clarabel.NonnegativeConeT(len(self.d))] # ty:ignore[unresolved-attribute]
214 return a_mat, b_vec, cones
216 def _osqp_constraints(self) -> tuple[csc_matrix, np.ndarray, np.ndarray]:
217 """Return equality and inequality constraints for OSQP."""
218 assert self.A is not None # noqa: S101
219 assert self.b is not None # noqa: S101
220 assert self.C is not None # noqa: S101
221 assert self.d is not None # noqa: S101
222 a_mat = vstack([csc_matrix(self.A.T), csc_matrix(self.C.T)], format="csc")
223 l_vec = np.concatenate([self.b, np.full(len(self.d), -np.inf)])
224 u_vec = np.concatenate([self.b, self.d])
225 return a_mat, l_vec, u_vec
227 def _nnls_solve(self) -> tuple[np.ndarray, int]:
228 """Solve via NNLS on the augmented return matrix; return ``(w, 1)``.
230 Augments ``X`` with rows for the LW ridge term and all equality
231 constraints (scaled by ``M = ||X||_F * T``); non-negativity is
232 handled natively by Lawson-Hanson. Inequality constraints beyond
233 ``w >= 0`` are not enforced; use ``solve_kkt`` for general ``C``.
234 """
235 assert self.A is not None # noqa: S101
236 assert self.b is not None # noqa: S101
237 t = self.X.shape[0]
238 # oma = 1.0 - self.alpha
239 # gamma = self._ridge()
240 m = float(np.linalg.norm(self.X, "fro")) * t
242 if self.target is not None:
243 chol = cholesky(self.target)
244 rows = [np.sqrt((1 - self.alpha) / self.t) * self.X]
245 tgt = [np.zeros(t)]
246 if self.alpha > 0.0:
247 rows.append(np.sqrt(self.alpha) * chol.T)
248 tgt.append(np.zeros(self.n))
249 else:
250 rows = [np.sqrt(1.0 / self.t) * self.X]
251 tgt = [np.zeros(t)]
252 rows.append(m * self.A.T)
253 tgt.append(m * self.b)
255 w, _ = nnls(np.vstack(rows), np.concatenate(tgt))
256 return w, 1