Coverage for src / fast_minimum_variance / minvar_problem.py: 100%
115 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-09 14:08 +0000
1"""Minimum-variance solver: primal asset elimination with dual-feasibility check."""
3from dataclasses import dataclass
5import clarabel
6import numpy as np
7from scipy.optimize import nnls
8from scipy.sparse import csc_matrix, eye, vstack
9from scipy.sparse.linalg import LinearOperator, cg
11from ._base import _BaseProblem
@dataclass(frozen=True)
class _MinVarProblem(_BaseProblem):
    """Minimum-variance portfolio solver via primal-dual active-set iteration.

    Solves::

        min (1-alpha)*||X w||^2 + alpha*(||X||_F^2/N)*||w||^2 - rho*mu^T w
        s.t. 1^T w = 1, w >= 0

    Each inner step solves the equality-constrained subproblem over the current
    active asset set. Stationarity gives ``2*Sigma_a*w_a = lambda*1 + rho*mu_a``
    where ``Sigma_a = (1-alpha)*X_a^T X_a + ridge*I``. Solving the ``n_a x n_a``
    SPD system ``Sigma_a v = 1`` (and ``Sigma_a v2 = mu_a`` when ``rho != 0``)
    and recovering ``lambda`` from the budget constraint avoids the indefinite
    ``(n_a+1) x (n_a+1)`` saddle-point system entirely. The outer primal-dual
    loop enforces ``w >= 0`` and terminates when both primal and dual
    feasibility hold simultaneously.

    Use ``alpha = N/(N+T)`` for Ledoit-Wolf shrinkage intensity::

        T, N = X.shape
        w, iters = Problem(X, alpha=N/(N+T)).solve_kkt()

    Examples:
        >>> import numpy as np
        >>> from fast_minimum_variance import Problem
        >>> X = np.random.default_rng(0).standard_normal((100, 5))
        >>> w, iters = Problem(X).solve_kkt()
        >>> float(round(w.sum(), 6))
        1.0
        >>> bool((w >= 0).all())
        True
    """

    # No extra fields — X, alpha, rho, mu all inherited from _BaseProblem.

    # ------------------------------------------------------------------
    # Outer loop: primal elimination + dual feasibility check
    # ------------------------------------------------------------------
53 def _constraint_active_set(self, solve_fn, tol=1e-6, max_iter=10_000):
54 """Run the primal-dual active-set loop enforcing ``w >= 0``.
56 Calls ``solve_fn(active_mask)`` repeatedly. The *primal step* drops assets
57 with negative weights; the *dual step* re-adds any excluded asset whose KKT
58 gradient condition is violated. Terminates when both conditions hold
59 simultaneously, which together with stationarity is sufficient for global
60 optimality.
61 """
62 n = self.n
63 asset_active = np.ones(n, dtype=bool)
64 total_iters = 0
66 prev_active = None
68 for _ in range(max_iter):
69 if prev_active is not None and np.array_equal(prev_active, asset_active):
70 break # pragma: no cover - structurally unreachable safety guard
71 prev_active = asset_active.copy()
73 # === Solve ===
74 w_a, step_iters = solve_fn(asset_active)
75 total_iters += step_iters
77 # === PRIMAL STEP ===
78 neg = w_a < -tol
79 if np.any(neg):
80 idx = np.where(asset_active)[0]
82 strong = w_a < -10 * tol
84 if np.any(strong):
85 asset_active[idx[strong]] = False
86 else:
87 j = idx[np.argmin(w_a)]
88 asset_active[j] = False
90 continue # CRITICAL
92 # === Assemble full vector ===
93 w = np.zeros(n)
94 w[asset_active] = w_a
96 # === Gradient ===
97 if self.target is None:
98 grad = 2.0 * (self.X.T @ (self.X @ w)) / self.t
99 else:
100 grad = 2.0 * ((1 - self.alpha) * (self.X.T @ (self.X @ w)) / self.t + self.alpha * self.target @ w)
102 if self.rho != 0.0 and self.mu is not None:
103 grad = grad - self.rho * self.mu
105 # === Lambda ===
106 g_a = grad[asset_active]
107 lambda_ = np.median(g_a) if g_a.size > 5 else g_a.mean()
109 # === Dual ===
110 nu = grad - lambda_
112 excluded = ~asset_active
113 if not excluded.any():
114 break
116 nu_ex = nu[excluded]
117 idx_ex = np.where(excluded)[0]
119 j = idx_ex[np.argmin(nu_ex)]
120 violate = nu[j]
122 # === DUAL STEP ===
123 if violate >= -tol:
124 break
126 asset_active[j] = True
128 return w, total_iters
130 # ------------------------------------------------------------------
131 # Inner steps
132 # ------------------------------------------------------------------
134 def _kkt_step(self, active):
135 """Solve the reduced SPD system directly; return ``(w_a, 1)``.
137 Stationarity gives ``2*Sigma_a*w_a = lambda*1 + rho*mu_a``. A single
138 solve with two RHS columns yields ``v1 = Sigma_a^{-1} 1`` and
139 ``v2 = Sigma_a^{-1} mu_a``; the budget constraint then pins ``lambda``
140 analytically as ``lambda = 2*(1 - rho/2 * sum(v2)) / sum(v1)``.
141 """
142 x_a = self.X[:, active]
143 n_a = int(active.sum())
144 if self.target is None:
145 sigma = (x_a.T @ x_a) / self.t
146 else:
147 sigma = (1.0 - self.alpha) * (x_a.T @ x_a) / self.t + self.alpha * self.target[np.ix_(active, active)]
149 if self.rho == 0.0 or self.mu is None:
150 v = np.linalg.solve(sigma, np.ones(n_a))
151 return v / v.sum(), 1
153 v1, v2 = np.linalg.solve(sigma, np.column_stack([np.ones(n_a), self.mu[active]])).T
154 half_rho = 0.5 * self.rho
155 half_lambda = (1.0 - half_rho * v2.sum()) / v1.sum()
156 return half_lambda * v1 + half_rho * v2, 1
158 def _cvxpy_constraints(self, w, cp):
159 """Return budget-equality and long-only inequality constraints for CVXPY."""
160 return [cp.sum(w) == 1, w >= 0]
162 def _cg_step(self, active):
163 """Solve the reduced SPD system via matrix-free CG; return ``(w_a, iters)``.
165 Builds a ``LinearOperator`` for ``v -> (1-alpha)*X_a'*(X_a*v) + gamma*v``
166 and runs conjugate gradients without ever forming ``Sigma_a`` explicitly.
167 """
168 x_a = self.X[:, active]
169 n_a = int(active.sum())
170 target_sub = self.target[np.ix_(active, active)] if self.target is not None else None
172 def matvec(v):
173 """Apply Sigma_LW matrix-free: v -> (1-alpha)/T * X_a'*(X_a*v) + alpha*target_a*v."""
174 if target_sub is None:
175 return (x_a.T @ (x_a @ v)) / self.t
176 return (1.0 - self.alpha) * (x_a.T @ (x_a @ v)) / self.t + self.alpha * (target_sub @ v)
178 op = LinearOperator((n_a, n_a), matvec=matvec, dtype=np.float64) # type: ignore[call-arg]
180 iters = [0]
182 def _count(_):
183 """Increment CG iteration counter for the first solve."""
184 iters[0] += 1
186 if self.rho == 0.0 or self.mu is None:
187 v, _ = cg(op, np.ones(n_a), callback=_count)
188 return v / v.sum(), iters[0]
190 iters2 = [0]
192 def _count2(_):
193 """Increment CG iteration counter for the second solve."""
194 iters2[0] += 1
196 v1, _ = cg(op, np.ones(n_a), callback=_count)
197 v2, _ = cg(op, self.mu[active], callback=_count2)
198 half_rho = 0.5 * self.rho
199 half_lambda = (1.0 - half_rho * v2.sum()) / v1.sum()
200 return half_lambda * v1 + half_rho * v2, iters[0] + iters2[0]
202 def _clarabel_constraints(self):
203 """Return budget-equality and long-only inequality constraints for Clarabel."""
204 n = self.n
205 a_mat = vstack(
206 [csc_matrix(np.ones((1, n))), -eye(n, format="csc")],
207 format="csc",
208 )
210 b_vec = np.concatenate([[1.0], np.zeros(n)])
211 cones = [clarabel.ZeroConeT(1), clarabel.NonnegativeConeT(n)] # type: ignore[attr-defined]
212 return a_mat, b_vec, cones
214 def _osqp_constraints(self):
215 """Return budget-equality and long-only inequality constraints for OSQP."""
216 n = self.n
217 a_mat = vstack(
218 [csc_matrix(np.ones((1, n))), eye(n, format="csc")],
219 format="csc",
220 )
221 l_vec = np.concatenate([[1.0], np.zeros(n)])
222 u_vec = np.concatenate([[1.0], np.full(n, np.inf)])
223 return a_mat, l_vec, u_vec
225 def _nnls_solve(self):
226 """Solve via NNLS on the augmented return matrix; return ``(w, 1)``.
228 Builds ``A = [sqrt(1-alpha)*X ; sqrt(gamma)*I ; M*ones^T]`` and
229 solves ``min ||Aw||² s.t. w >= 0``. The budget row with weight
230 ``M = ||X||_F * T`` enforces ``ones^T w ≈ 1``; exact normalisation
231 is applied by the ``project`` step in ``solve_nnls``.
232 Return tilt (``rho != 0``) is not supported.
233 """
234 t = self.X.shape[0]
235 m = float(np.linalg.norm(self.X, "fro")) * t
237 if self.target is not None:
238 # target is the penalty matrix M; Cholesky gives chol s.t. chol @ chol.T = M,
239 # so sqrt(alpha)*chol.T rows enforce alpha * w^T M w in the LS objective.
240 chol = np.linalg.cholesky(self.target)
241 rows = [np.sqrt((1 - self.alpha) / self.t) * self.X]
242 tgt = [np.zeros(t)]
243 if self.alpha > 0.0:
244 rows.append(np.sqrt(self.alpha) * chol.T)
245 tgt.append(np.zeros(self.n))
246 else:
247 rows = [np.sqrt(1.0 / self.t) * self.X]
248 tgt = [np.zeros(t)]
249 rows.append(m * np.ones((1, self.n)))
250 tgt.append(np.array([m]))
252 w, _ = nnls(np.vstack(rows), np.concatenate(tgt))
253 return w, 1