Coverage for src/fast_minimum_variance/_base.py: 100%

145 statements  

« prev     ^ index     » next       coverage.py v7.15.0, created at 2026-07-02 13:28 +0000

1"""Common base for portfolio-optimisation problem classes.""" 

2 

3from abc import ABC, abstractmethod 

4from collections.abc import Callable 

5from dataclasses import dataclass 

6from typing import Any 

7 

8import clarabel 

9import cvxpy as cp 

10import numpy as np 

11import osqp 

12from cvx.linalg import cholesky 

13from scipy.sparse import csc_matrix, triu 

14 

15 

16@dataclass(frozen=True) 

17class _BaseProblem(ABC): 

18 """Shared fields, utilities, and solver templates for portfolio problems. 

19 

20 Subclasses must implement the seven abstract hooks: 

21 

22 * ``_constraint_active_set(solve_fn)`` — outer constraint-handling loop 

23 * ``_kkt_step(mask) -> (w, iters)`` — one direct-KKT inner step 

24 * ``_cg_step(mask) -> (w, iters)`` — one CG inner step 

25 * ``_cvxpy_constraints(w, cp) -> list`` — CVXPY constraint list 

26 * ``_clarabel_constraints() -> (A, b, cones)`` — Clarabel constraint data 

27 * ``_osqp_constraints() -> (A, l, u)`` — OSQP constraint data 

28 * ``_nnls_solve() -> (w, 1)`` — NNLS direct solve 

29 

30 All ``solve_*`` methods are implemented here as template methods that 

31 call ``_constraint_active_set`` with the appropriate ``_XXX_step`` 

32 method, then optionally clip-and-renormalize. 

33 """ 

34 

35 X: np.ndarray 

36 target: np.ndarray | None = None 

37 alpha: float = 0.0 

38 rho: float = 0.0 

39 mu: np.ndarray | None = None 

40 target_lr: tuple[float, np.ndarray, np.ndarray] | None = None # (bar_lam, U_k, delta_k) — low-rank + identity 

41 pcg_lr: tuple[float, np.ndarray, np.ndarray] | None = None # (bar_lam, U_k, delta_k) — RMT preconditioner (§5.3) 

42 

43 def __post_init__(self) -> None: 

44 """Validate target/target_lr shapes when supplied.""" 

45 n = self.n 

46 if self.target is not None and self.target.shape != (n, n): 

47 raise ValueError(f"target must be a square {n} x {n} matrix, got {self.target.shape}") # noqa: TRY003 

48 if self.target_lr is not None: 

49 _bar_lam, U_k, delta_k = self.target_lr # noqa: N806 

50 if U_k.shape[0] != n or U_k.shape[1] != delta_k.shape[0]: 

51 raise ValueError( # noqa: TRY003 

52 f"target_lr: U_k must be ({n}, k) and delta_k (k,), got {U_k.shape}, {delta_k.shape}" 

53 ) 

54 

55 # ------------------------------------------------------------------ 

56 # Shared utilities 

57 # ------------------------------------------------------------------ 

58 

59 @property 

60 def t(self) -> int: 

61 """Return the number of rows in X.""" 

62 return int(self.X.shape[0]) 

63 

64 @property 

65 def n(self) -> int: 

66 """Number of assets (columns of X).""" 

67 return int(self.X.shape[1]) 

68 

69 @staticmethod 

70 def _clip_and_renormalize(w: np.ndarray) -> np.ndarray: 

71 """Clip weights to ``[0, ∞)`` and renormalize to sum to 1.""" 

72 w = np.maximum(w, 0) 

73 w /= w.sum() 

74 return w 

75 

76 # ------------------------------------------------------------------ 

77 # Abstract hooks (raise NotImplementedError — subclasses must override) 

78 # ------------------------------------------------------------------ 

79 @abstractmethod 

80 def _constraint_active_set( 

81 self, 

82 solve_fn: Callable[[np.ndarray], tuple[np.ndarray, int]], 

83 tol: float = 1e-6, 

84 max_iter: int = 10_000, 

85 ) -> tuple[np.ndarray, int, int]: # pragma: no cover 

86 """Run the outer constraint-handling loop, calling ``solve_fn`` each iteration.""" 

87 raise NotImplementedError 

88 

89 @abstractmethod 

90 def _kkt_step(self, active: np.ndarray) -> tuple[np.ndarray, int]: # pragma: no cover 

91 """Solve one inner direct-KKT step; return ``(w, iters)``.""" 

92 raise NotImplementedError 

93 

94 @abstractmethod 

95 def _cvxpy_constraints(self, w: cp.Variable, cp: object) -> list[Any]: # pragma: no cover 

96 """Return the list of CVXPY constraints for ``solve_cvxpy``.""" 

97 raise NotImplementedError 

98 

99 @abstractmethod 

100 def _cg_step(self, active: np.ndarray) -> tuple[np.ndarray, int]: 

101 """Solve one inner CG step; return ``(w, iters)``.""" 

102 raise NotImplementedError # pragma: no cover 

103 

104 def _pcg_step(self, active: np.ndarray, x0: np.ndarray | None = None) -> tuple[np.ndarray, int]: # pragma: no cover 

105 """Solve one inner PCG step with RMT preconditioner; return ``(w, iters)``. 

106 

107 Subclasses that support PCG (e.g. ``_MinVarProblem``) override this. 

108 The base implementation raises so callers get a clear error if PCG is 

109 invoked on a problem type that has not implemented it. 

110 """ 

111 raise NotImplementedError 

112 

113 @abstractmethod 

114 def _nnls_solve(self) -> tuple[np.ndarray, int]: # pragma: no cover 

115 """Solve via NNLS directly (no outer loop); return ``(w, 1)``.""" 

116 raise NotImplementedError 

117 

118 @abstractmethod 

119 def _clarabel_constraints(self) -> tuple[csc_matrix, np.ndarray, list[Any]]: # pragma: no cover 

120 """Return ``(A_mat, b_vec, cones)`` for the Clarabel QP solver.""" 

121 raise NotImplementedError 

122 

123 @abstractmethod 

124 def _osqp_constraints(self) -> tuple[csc_matrix, np.ndarray, np.ndarray]: # pragma: no cover 

125 """Return ``(A_mat, l_vec, u_vec)`` for the OSQP solver.""" 

126 raise NotImplementedError 

127 

128 # ------------------------------------------------------------------ 

129 # Template solvers 

130 # ------------------------------------------------------------------ 

131 

132 def solve_kkt(self, *, project: bool = True) -> tuple[np.ndarray, int]: 

133 """Solve via the direct KKT system. 

134 

135 Args: 

136 project: Clip weights to ``[0, ∞)`` and renormalize to sum to 1 

137 after solving. Set to ``False`` for custom constraints. 

138 

139 Returns: 

140 ``(w, n_iters)`` — weight vector of shape ``(N,)`` and number of 

141 outer iterations taken. 

142 

143 Examples: 

144 >>> import numpy as np 

145 >>> from fast_minimum_variance import Problem 

146 >>> X = np.random.default_rng(0).standard_normal((100, 5)) 

147 >>> w, iters = Problem(X).solve_kkt() 

148 >>> float(round(w.sum(), 10)) 

149 1.0 

150 >>> bool((w >= 0).all()) 

151 True 

152 """ 

153 w, outer, _inner = self._constraint_active_set(self._kkt_step) 

154 if project: 

155 w = self._clip_and_renormalize(w) 

156 return w, outer 

157 

158 def solve_cvxpy(self, *, project: bool = True, backend: str = "clarabel") -> tuple[np.ndarray, int]: 

159 """Solve via CVXPY with a configurable backend solver. 

160 

161 Requires the ``convex`` extra:: 

162 

163 pip install fast-minimum-variance[convex] 

164 

165 Args: 

166 project: Clip and renormalize after solving (see ``solve_kkt``). 

167 backend: CVXPY solver name (default ``"clarabel"``; ``"osqp"`` is 

168 also supported). 

169 

170 Returns: 

171 ``(w, n_iters)`` — weight vector of shape ``(N,)`` and solver 

172 iteration count. 

173 

174 Examples: 

175 >>> import numpy as np 

176 >>> from fast_minimum_variance import Problem 

177 >>> X = np.random.default_rng(0).standard_normal((100, 5)) 

178 >>> w, iters = Problem(X).solve_cvxpy() 

179 >>> float(round(w.sum(), 6)) 

180 1.0 

181 >>> bool((w >= -1e-6).all()) 

182 True 

183 """ 

184 w = cp.Variable(self.n) 

185 if self.target is not None: 

186 # target is the penalty matrix M; decompose as M = chol chol^T so ||chol^T w||^2 = w^T M w 

187 chol = cholesky(self.target) 

188 objective = (1.0 - self.alpha) * cp.sum_squares(self.X @ w) / self.t + self.alpha * cp.sum_squares( 

189 chol.T @ w 

190 ) 

191 else: 

192 objective = cp.sum_squares(self.X @ w) / self.t 

193 if self.rho != 0.0 and self.mu is not None: 

194 objective = objective - self.rho * (self.mu @ w) 

195 

196 cvxpy_solver = cp.CLARABEL if backend.lower() == "clarabel" else cp.OSQP 

197 problem = cp.Problem(cp.Minimize(objective), self._cvxpy_constraints(w, cp)) 

198 problem.solve(solver=cvxpy_solver) 

199 

200 result = w.value 

201 if result is None: 

202 raise RuntimeError("CVXPY solver failed to find a solution") # noqa: TRY003 

203 if project: 

204 result = self._clip_and_renormalize(result) 

205 return result, int(problem.solver_stats.num_iters or 0) 

206 

207 def solve_cg(self, *, project: bool = True) -> tuple[np.ndarray, int, int]: 

208 """Solve via matrix-free conjugate gradients. 

209 

210 Args: 

211 project: Clip weights to ``[0, ∞)`` and renormalize to sum to 1 

212 after solving. Set to ``False`` for custom constraints. 

213 

214 Returns: 

215 ``(w, outer_steps, inner_iters)`` — weight vector, number of outer 

216 active-set steps, and total CG iterations summed across all steps. 

217 

218 Examples: 

219 >>> import numpy as np 

220 >>> from fast_minimum_variance import Problem 

221 >>> X = np.random.default_rng(0).standard_normal((100, 5)) 

222 >>> w, outer, inner = Problem(X).solve_cg() 

223 >>> float(round(w.sum(), 10)) 

224 1.0 

225 >>> bool((w >= 0).all()) 

226 True 

227 """ 

228 w, outer, inner = self._constraint_active_set(self._cg_step) 

229 if project: 

230 w = self._clip_and_renormalize(w) 

231 return w, outer, inner 

232 

233 def solve_pcg(self, *, project: bool = True) -> tuple[np.ndarray, int, int]: 

234 """Solve via matrix-free PCG with RMT preconditioner (Section 5.3). 

235 

236 Solves ``Sigma_LW_oracle x = 1`` using ``T0^RMT`` as preconditioner. 

237 Requires ``pcg_lr = (bar_lam, U_k, delta_k)`` from RMT preprocessing. 

238 The preconditioner is applied via the Woodbury identity at O(nk) per step; 

239 the system matvec costs O(nT). Returns the oracle-LW minimum-variance 

240 portfolio — not the RMT portfolio — in O(sqrt(1/alpha_oracle)) iterations. 

241 

242 Returns: 

243 ``(w, outer_steps, inner_iters)`` 

244 

245 Examples: 

246 >>> import numpy as np 

247 >>> from fast_minimum_variance import Problem 

248 >>> rng = np.random.default_rng(0) 

249 >>> X = rng.standard_normal((100, 5)) 

250 >>> bar_lam = float(np.trace(X.T @ X / 100) / 5) 

251 >>> U_k = np.eye(5, 2) 

252 >>> delta_k = np.array([0.1, 0.05]) 

253 >>> w, outer, inner = Problem(X, alpha=0.1, pcg_lr=(bar_lam, U_k, delta_k)).solve_pcg() 

254 >>> float(round(w.sum(), 10)) 

255 1.0 

256 >>> bool((w >= 0).all()) 

257 True 

258 """ 

259 if self.pcg_lr is None: 

260 raise ValueError("pcg_lr must be set; pass pcg_lr=(bar_lam, U_k, delta_k)") # noqa: TRY003 

261 w, outer, inner = self._constraint_active_set(self._pcg_step) 

262 if project: 

263 w = self._clip_and_renormalize(w) 

264 return w, outer, inner 

265 

266 def solve_nnls(self, *, project: bool = True) -> tuple[np.ndarray, int]: 

267 """Solve via non-negative least squares (scipy.optimize.nnls). 

268 

269 The budget constraint is enforced by augmenting the return matrix 

270 with a heavily weighted all-ones row; non-negativity is handled 

271 natively by the Lawson-Hanson algorithm. The covariance matrix 

272 ``X'X`` is formed internally by scipy. Return tilt (``rho != 0``) 

273 is not supported. 

274 

275 Args: 

276 project: Renormalize weights to sum to 1 after solving. 

277 Clipping is a no-op (NNLS already gives ``w >= 0``). 

278 

279 Returns: 

280 ``(w, 1)`` — weight vector of shape ``(N,)`` and iteration 

281 count (always 1; NNLS is a single direct solve). 

282 

283 Examples: 

284 >>> import numpy as np 

285 >>> from fast_minimum_variance import Problem 

286 >>> X = np.random.default_rng(0).standard_normal((100, 5)) 

287 >>> w, iters = Problem(X).solve_nnls() 

288 >>> float(round(w.sum(), 10)) 

289 1.0 

290 >>> bool((w >= 0).all()) 

291 True 

292 """ 

293 w, iters = self._nnls_solve() 

294 if project: 

295 w = self._clip_and_renormalize(w) 

296 return w, iters 

297 

298 def solve_osqp(self, *, project: bool = True) -> tuple[np.ndarray, int]: 

299 """Solve via OSQP (operator-splitting QP solver). 

300 

301 Assembles ``P = 2·Σ_LW`` as a sparse upper-triangular CSC matrix and 

302 calls OSQP directly. Returns ``(w, iters)`` where ``iters`` is the 

303 OSQP iteration count. 

304 

305 Args: 

306 project: Clip and renormalize after solving (see ``solve_kkt``). 

307 

308 Returns: 

309 ``(w, n_iters)`` — weight vector of shape ``(N,)`` and number of 

310 OSQP iterations. 

311 

312 Examples: 

313 >>> import numpy as np 

314 >>> from fast_minimum_variance import Problem 

315 >>> X = np.random.default_rng(0).standard_normal((100, 5)) 

316 >>> w, iters = Problem(X).solve_osqp() 

317 >>> float(round(w.sum(), 6)) 

318 1.0 

319 >>> bool((w >= -1e-6).all()) 

320 True 

321 """ 

322 n = self.n 

323 

324 if self.target is None: 

325 p_dense = 2.0 * ((self.X.T @ self.X) / self.t) 

326 else: 

327 p_dense = 2.0 * ((1 - self.alpha) * (self.X.T @ self.X) / self.t + self.alpha * self.target) 

328 

329 p_upper = triu(csc_matrix(p_dense), format="csc") 

330 

331 q = np.zeros(n) 

332 if self.rho != 0.0 and self.mu is not None: 

333 q = -self.rho * self.mu 

334 

335 a_mat, l_vec, u_vec = self._osqp_constraints() 

336 

337 solver = osqp.OSQP() 

338 solver.setup( 

339 p_upper, 

340 q, 

341 a_mat, 

342 l_vec, 

343 u_vec, 

344 warm_starting=True, 

345 verbose=False, 

346 eps_abs=1e-8, 

347 eps_rel=1e-8, 

348 ) 

349 result = solver.solve() 

350 

351 w = np.array(result.x) 

352 if project: 

353 w = self._clip_and_renormalize(w) 

354 return w, result.info.iter 

355 

356 def solve_clarabel(self, *, project: bool = True) -> tuple[np.ndarray, int]: 

357 """Solve via Clarabel interior-point solver (direct API, no CVXPY overhead). 

358 

359 Assembles ``P = 2·Σ_LW`` as a sparse CSC matrix and calls Clarabel 

360 directly, bypassing CVXPY's problem-construction overhead. The 

361 problem-specific constraint data is supplied by ``_clarabel_constraints``. 

362 Returns ``(w, iters)`` where ``iters`` is the Clarabel iteration count. 

363 

364 Args: 

365 project: Clip and renormalize after solving (see ``solve_kkt``). 

366 

367 Returns: 

368 ``(w, n_iters)`` — weight vector of shape ``(N,)`` and number of 

369 interior-point iterations. 

370 

371 Examples: 

372 >>> import numpy as np 

373 >>> from fast_minimum_variance import Problem 

374 >>> X = np.random.default_rng(0).standard_normal((100, 5)) 

375 >>> w, iters = Problem(X).solve_clarabel() 

376 >>> float(round(w.sum(), 6)) 

377 1.0 

378 >>> bool((w >= -1e-6).all()) 

379 True 

380 """ 

381 n = self.n 

382 

383 if self.target is None: 

384 p_dense = 2.0 * ((self.X.T @ self.X) / self.t) 

385 else: 

386 p_dense = 2.0 * ((1 - self.alpha) * (self.X.T @ self.X) / self.t + self.alpha * self.target) 

387 

388 p_csc = csc_matrix(p_dense) 

389 

390 q = np.zeros(n) 

391 if self.rho != 0.0 and self.mu is not None: 

392 q = -self.rho * self.mu 

393 

394 a_mat, b_vec, cones = self._clarabel_constraints() 

395 

396 settings = clarabel.DefaultSettings() # ty:ignore[unresolved-attribute] 

397 settings.verbose = False 

398 sol = clarabel.DefaultSolver(p_csc, q, a_mat, b_vec, cones, settings).solve() # ty:ignore[unresolved-attribute] 

399 

400 w = np.array(sol.x) 

401 if project: 

402 w = self._clip_and_renormalize(w) 

403 return w, sol.iterations 

404 

405 def solve_proximal(self, *, project: bool = True) -> tuple[np.ndarray, int]: 

406 """Solve via proximal gradient descent projected onto the probability simplex. 

407 

408 Minimises ``0.5 * w^T Σ_LW w`` subject to ``w >= 0, sum(w) = 1``. 

409 The gradient is computed in two separate terms so that the per-step 

410 cost remains ``O(nT)`` regardless of whether shrinkage is applied: 

411 

412 grad = (1-alpha)/T * X^T(Xw) + alpha * target @ w 

413 

414 This avoids stacking a (T+n)xn matrix, which would inflate per-step 

415 cost by O(n) under shrinkage. Return tilt (``rho != 0``) is not supported. 

416 

417 Args: 

418 project: Clip and renormalize after solving (see ``solve_kkt``). 

419 

420 Returns: 

421 ``(w, n_iters)`` — weight vector of shape ``(N,)`` and the number 

422 of gradient steps taken. 

423 

424 Examples: 

425 >>> import numpy as np 

426 >>> from fast_minimum_variance import Problem 

427 >>> X = np.random.default_rng(0).standard_normal((100, 5)) 

428 >>> w, iters = Problem(X).solve_proximal() 

429 >>> float(round(w.sum(), 10)) 

430 1.0 

431 >>> bool((w >= 0).all()) 

432 True 

433 """ 

434 from .proximal import prox_gradient 

435 

436 extra_grad: Callable[[np.ndarray], np.ndarray] | None 

437 if self.target is not None and self.alpha > 0.0: 

438 c = 1.0 - self.alpha 

439 mat = np.sqrt(c) / np.sqrt(self.t) * self.X 

440 alpha, target = self.alpha, self.target 

441 

442 def extra_grad(v: np.ndarray, a: float = alpha, tgt: np.ndarray = target) -> np.ndarray: 

443 """Return the shrinkage gradient contribution a * target @ v.""" 

444 result: np.ndarray = a * (tgt @ v) 

445 return result 

446 else: 

447 mat = self.X / np.sqrt(self.t) 

448 extra_grad = None 

449 

450 vec = np.zeros(self.t) 

451 w, n_iters = prox_gradient(mat, vec, extra_grad=extra_grad) 

452 if project: 

453 w = self._clip_and_renormalize(w) 

454 return w, n_iters 

455 

456 def solve_fista(self, *, project: bool = True) -> tuple[np.ndarray, int]: 

457 r"""Solve via Nesterov-accelerated proximal gradient (FISTA). 

458 

459 Uses the Beck-Teboulle momentum sequence to achieve $O(1/k^2)$ 

460 convergence for convex objectives; for strongly convex $f$ with 

461 condition number $\\kappa$ the linear rate is $(1-1/\\sqrt{\\kappa})^k$, 

462 matching CG's asymptotic iteration count. Same per-step cost 

463 $O(nT)$ as ``solve_proximal``, typically 2--10$\\times$ fewer 

464 iterations. 

465 

466 Args: 

467 project: Clip and renormalize after solving (see ``solve_proximal``). 

468 

469 Returns: 

470 ``(w, n_iters)`` — weight vector of shape ``(N,)`` and the number 

471 of gradient steps taken. 

472 

473 Examples: 

474 >>> import numpy as np 

475 >>> from fast_minimum_variance import Problem 

476 >>> X = np.random.default_rng(0).standard_normal((100, 5)) 

477 >>> w, iters = Problem(X).solve_fista() 

478 >>> float(round(w.sum(), 10)) 

479 1.0 

480 >>> bool((w >= 0).all()) 

481 True 

482 """ 

483 from .proximal import fista_gradient 

484 

485 extra_grad: Callable[[np.ndarray], np.ndarray] | None 

486 if self.target is not None and self.alpha > 0.0: 

487 c = 1.0 - self.alpha 

488 mat = np.sqrt(c) / np.sqrt(self.t) * self.X 

489 alpha, target = self.alpha, self.target 

490 

491 def extra_grad(v: np.ndarray, a: float = alpha, tgt: np.ndarray = target) -> np.ndarray: 

492 """Return the shrinkage gradient contribution a * target @ v.""" 

493 result: np.ndarray = a * (tgt @ v) 

494 return result 

495 else: 

496 mat = self.X / np.sqrt(self.t) 

497 extra_grad = None 

498 

499 vec = np.zeros(self.t) 

500 w, n_iters = fista_gradient(mat, vec, extra_grad=extra_grad) 

501 if project: 

502 w = self._clip_and_renormalize(w) 

503 return w, n_iters