Coverage for src / fast_minimum_variance / _base.py: 100%

96 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-09 14:08 +0000

1"""Common base for portfolio-optimisation problem classes.""" 

2 

3from abc import ABC, abstractmethod 

4from dataclasses import dataclass 

5 

6import clarabel 

7import cvxpy as cp 

8import numpy as np 

9import osqp 

10from scipy.sparse import csc_matrix, triu 

11 

12 

13@dataclass(frozen=True) 

14class _BaseProblem(ABC): 

15 """Shared fields, utilities, and solver templates for portfolio problems. 

16 

17 Subclasses must implement the seven abstract hooks: 

18 

19 * ``_constraint_active_set(solve_fn)`` — outer constraint-handling loop 

20 * ``_kkt_step(mask) -> (w, iters)`` — one direct-KKT inner step 

21 * ``_cg_step(mask) -> (w, iters)`` — one CG inner step 

22 * ``_cvxpy_constraints(w, cp) -> list`` — CVXPY constraint list 

23 * ``_clarabel_constraints() -> (A, b, cones)`` — Clarabel constraint data 

24 * ``_osqp_constraints() -> (A, l, u)`` — OSQP constraint data 

25 * ``_nnls_solve() -> (w, 1)`` — NNLS direct solve 

26 

27 All ``solve_*`` methods are implemented here as template methods that 

28 call ``_constraint_active_set`` with the appropriate ``_XXX_step`` 

29 method, then optionally clip-and-renormalize. 

30 """ 

31 

32 X: np.ndarray 

33 target: np.ndarray | None = None 

34 alpha: float = 0.0 

35 rho: float = 0.0 

36 mu: np.ndarray | None = None 

37 

38 def __post_init__(self): 

39 """Validate target shape when supplied; shrinkage is only active when target is not None.""" 

40 n = self.n 

41 if self.target is not None and self.target.shape != (n, n): 

42 raise ValueError(f"target must be a square {n} x {n} matrix, got {self.target.shape}") # noqa: TRY003 

43 

44 # ------------------------------------------------------------------ 

45 # Shared utilities 

46 # ------------------------------------------------------------------ 

47 

48 @property 

49 def t(self) -> int: 

50 """Return the number of rows in X.""" 

51 return self.X.shape[0] 

52 

53 @property 

54 def n(self) -> int: 

55 """Number of assets (columns of X).""" 

56 return self.X.shape[1] 

57 

58 @staticmethod 

59 def _clip_and_renormalize(w: np.ndarray) -> np.ndarray: 

60 """Clip weights to ``[0, ∞)`` and renormalize to sum to 1.""" 

61 w = np.maximum(w, 0) 

62 w /= w.sum() 

63 return w 

64 

65 # ------------------------------------------------------------------ 

66 # Abstract hooks (raise NotImplementedError — subclasses must override) 

67 # ------------------------------------------------------------------ 

68 @abstractmethod 

69 def _constraint_active_set(self, solve_fn, tol=1e-6, max_iter=10_000): # pragma: no cover 

70 """Run the outer constraint-handling loop, calling ``solve_fn`` each iteration.""" 

71 raise NotImplementedError 

72 

73 @abstractmethod 

74 def _kkt_step(self, active): # pragma: no cover 

75 """Solve one inner direct-KKT step; return ``(w, iters)``.""" 

76 raise NotImplementedError 

77 

78 @abstractmethod 

79 def _cvxpy_constraints(self, w, cp): # pragma: no cover 

80 """Return the list of CVXPY constraints for ``solve_cvxpy``.""" 

81 raise NotImplementedError 

82 

83 @abstractmethod 

84 def _cg_step(self, active): 

85 """Solve one inner CG step; return ``(w, iters)``.""" 

86 raise NotImplementedError # pragma: no cover 

87 

88 @abstractmethod 

89 def _nnls_solve(self): # pragma: no cover 

90 """Solve via NNLS directly (no outer loop); return ``(w, 1)``.""" 

91 raise NotImplementedError 

92 

93 @abstractmethod 

94 def _clarabel_constraints(self): # pragma: no cover 

95 """Return ``(A_mat, b_vec, cones)`` for the Clarabel QP solver.""" 

96 raise NotImplementedError 

97 

98 @abstractmethod 

99 def _osqp_constraints(self): # pragma: no cover 

100 """Return ``(A_mat, l_vec, u_vec)`` for the OSQP solver.""" 

101 raise NotImplementedError 

102 

103 # ------------------------------------------------------------------ 

104 # Template solvers 

105 # ------------------------------------------------------------------ 

106 

107 def solve_kkt(self, *, project: bool = True): 

108 """Solve via the direct KKT system. 

109 

110 Args: 

111 project: Clip weights to ``[0, ∞)`` and renormalize to sum to 1 

112 after solving. Set to ``False`` for custom constraints. 

113 

114 Returns: 

115 ``(w, n_iters)`` — weight vector of shape ``(N,)`` and number of 

116 outer iterations taken. 

117 

118 Examples: 

119 >>> import numpy as np 

120 >>> from fast_minimum_variance import Problem 

121 >>> X = np.random.default_rng(0).standard_normal((100, 5)) 

122 >>> w, iters = Problem(X).solve_kkt() 

123 >>> float(round(w.sum(), 10)) 

124 1.0 

125 >>> bool((w >= 0).all()) 

126 True 

127 """ 

128 w, iters = self._constraint_active_set(self._kkt_step) 

129 if project: 

130 w = self._clip_and_renormalize(w) 

131 return w, iters 

132 

133 def solve_cvxpy(self, *, project: bool = True): 

134 """Solve via CVXPY / Clarabel (reference interior-point solver). 

135 

136 Requires the ``convex`` extra:: 

137 

138 pip install fast-minimum-variance[convex] 

139 

140 Args: 

141 project: Clip and renormalize after solving (see ``solve_kkt``). 

142 

143 Returns: 

144 ``(w, n_iters)`` — weight vector of shape ``(N,)`` and Clarabel 

145 iteration count. 

146 

147 Examples: 

148 >>> import numpy as np 

149 >>> from fast_minimum_variance import Problem 

150 >>> X = np.random.default_rng(0).standard_normal((100, 5)) 

151 >>> w, iters = Problem(X).solve_cvxpy() 

152 >>> float(round(w.sum(), 6)) 

153 1.0 

154 >>> bool((w >= -1e-6).all()) 

155 True 

156 """ 

157 w = cp.Variable(self.n) 

158 if self.target is not None: 

159 # target is the penalty matrix M; decompose as M = chol chol^T so ||chol^T w||^2 = w^T M w 

160 chol = np.linalg.cholesky(self.target) 

161 objective = (1.0 - self.alpha) * cp.sum_squares(self.X @ w) / self.t + self.alpha * cp.sum_squares( 

162 chol.T @ w 

163 ) 

164 else: 

165 objective = cp.sum_squares(self.X @ w) / self.t 

166 if self.rho != 0.0 and self.mu is not None: 

167 objective = objective - self.rho * (self.mu @ w) 

168 

169 problem = cp.Problem(cp.Minimize(objective), self._cvxpy_constraints(w, cp)) 

170 problem.solve(solver=cp.CLARABEL) 

171 

172 result = w.value 

173 if result is None: 

174 raise RuntimeError("CVXPY solver failed to find a solution") # noqa: TRY003 

175 if project: 

176 result = self._clip_and_renormalize(result) 

177 return result, problem.solver_stats.num_iters 

178 

179 def solve_cg(self, *, project: bool = True): 

180 """Solve via matrix-free conjugate gradients. 

181 

182 Args: 

183 project: Clip weights to ``[0, ∞)`` and renormalize to sum to 1 

184 after solving. Set to ``False`` for custom constraints. 

185 

186 Returns: 

187 ``(w, n_iters)`` — weight vector of shape ``(N,)`` and total CG 

188 iteration count across all outer active-set steps. 

189 

190 Examples: 

191 >>> import numpy as np 

192 >>> from fast_minimum_variance import Problem 

193 >>> X = np.random.default_rng(0).standard_normal((100, 5)) 

194 >>> w, iters = Problem(X).solve_cg() 

195 >>> float(round(w.sum(), 10)) 

196 1.0 

197 >>> bool((w >= 0).all()) 

198 True 

199 """ 

200 w, iters = self._constraint_active_set(self._cg_step) 

201 if project: 

202 w = self._clip_and_renormalize(w) 

203 return w, iters 

204 

205 def solve_nnls(self, *, project: bool = True): 

206 """Solve via non-negative least squares (scipy.optimize.nnls). 

207 

208 The budget constraint is enforced by augmenting the return matrix 

209 with a heavily weighted all-ones row; non-negativity is handled 

210 natively by the Lawson-Hanson algorithm. The covariance matrix 

211 ``X'X`` is formed internally by scipy. Return tilt (``rho != 0``) 

212 is not supported. 

213 

214 Args: 

215 project: Renormalize weights to sum to 1 after solving. 

216 Clipping is a no-op (NNLS already gives ``w >= 0``). 

217 

218 Returns: 

219 ``(w, 1)`` — weight vector of shape ``(N,)`` and iteration 

220 count (always 1; NNLS is a single direct solve). 

221 

222 Examples: 

223 >>> import numpy as np 

224 >>> from fast_minimum_variance import Problem 

225 >>> X = np.random.default_rng(0).standard_normal((100, 5)) 

226 >>> w, iters = Problem(X).solve_nnls() 

227 >>> float(round(w.sum(), 10)) 

228 1.0 

229 >>> bool((w >= 0).all()) 

230 True 

231 """ 

232 w, iters = self._nnls_solve() 

233 if project: 

234 w = self._clip_and_renormalize(w) 

235 return w, iters 

236 

237 def solve_clarabel(self, *, project: bool = True): 

238 """Solve via Clarabel interior-point solver (direct API, no CVXPY overhead). 

239 

240 Assembles ``P = 2·Σ_LW`` as a sparse CSC matrix and calls Clarabel 

241 directly, bypassing CVXPY's problem-construction overhead. The 

242 problem-specific constraint data is supplied by ``_clarabel_constraints``. 

243 Returns ``(w, iters)`` where ``iters`` is the Clarabel iteration count. 

244 

245 Args: 

246 project: Clip and renormalize after solving (see ``solve_kkt``). 

247 

248 Returns: 

249 ``(w, n_iters)`` — weight vector of shape ``(N,)`` and number of 

250 interior-point iterations. 

251 

252 Examples: 

253 >>> import numpy as np 

254 >>> from fast_minimum_variance import Problem 

255 >>> X = np.random.default_rng(0).standard_normal((100, 5)) 

256 >>> w, iters = Problem(X).solve_clarabel() 

257 >>> float(round(w.sum(), 6)) 

258 1.0 

259 >>> bool((w >= -1e-6).all()) 

260 True 

261 """ 

262 n = self.n 

263 

264 if self.target is None: 

265 p_dense = 2.0 * ((self.X.T @ self.X) / self.t) 

266 else: 

267 p_dense = 2.0 * ((1 - self.alpha) * (self.X.T @ self.X) / self.t + self.alpha * self.target) 

268 

269 p_csc = csc_matrix(p_dense) 

270 

271 q = np.zeros(n) 

272 if self.rho != 0.0 and self.mu is not None: 

273 q = -self.rho * self.mu 

274 

275 a_mat, b_vec, cones = self._clarabel_constraints() 

276 

277 settings = clarabel.DefaultSettings() # type: ignore[attr-defined] 

278 settings.verbose = False 

279 sol = clarabel.DefaultSolver(p_csc, q, a_mat, b_vec, cones, settings).solve() # type: ignore[attr-defined] 

280 

281 w = np.array(sol.x) 

282 if project: 

283 w = self._clip_and_renormalize(w) 

284 return w, sol.iterations 

285 

286 def solve_osqp(self, *, project: bool = True): 

287 """Solve via OSQP (operator-splitting QP solver, direct API, no CVXPY overhead). 

288 

289 Assembles ``P = 2·Σ_LW`` as a sparse upper-triangular CSC matrix and 

290 calls OSQP directly. The problem-specific constraint data is supplied 

291 by ``_osqp_constraints``. Returns ``(w, iters)`` where ``iters`` is 

292 the number of ADMM iterations. 

293 

294 Args: 

295 project: Clip and renormalize after solving (see ``solve_kkt``). 

296 

297 Returns: 

298 ``(w, n_iters)`` — weight vector of shape ``(N,)`` and number of 

299 ADMM iterations. 

300 

301 Examples: 

302 >>> import numpy as np 

303 >>> from fast_minimum_variance import Problem 

304 >>> X = np.random.default_rng(0).standard_normal((100, 5)) 

305 >>> w, iters = Problem(X).solve_osqp() 

306 >>> float(round(w.sum(), 6)) 

307 1.0 

308 >>> bool((w >= -1e-6).all()) 

309 True 

310 """ 

311 n = self.n 

312 

313 if self.target is None: 

314 p_dense = 2.0 * ((self.X.T @ self.X) / self.t) 

315 else: 

316 p_dense = 2.0 * ((1 - self.alpha) * (self.X.T @ self.X) / self.t + self.alpha * self.target) 

317 

318 # p_dense = 2.0 * ((1-self.alpha) * (self.X.T @ self.X)/self.t + self.alpha * self.target) 

319 p_upper = triu(p_dense, format="csc") 

320 

321 q = np.zeros(n) 

322 if self.rho != 0.0 and self.mu is not None: 

323 q = -self.rho * self.mu 

324 

325 a_mat, l_vec, u_vec = self._osqp_constraints() 

326 

327 prob = osqp.OSQP() 

328 prob.setup(p_upper, q, a_mat, l_vec, u_vec, verbose=False) 

329 res = prob.solve() 

330 

331 w = np.array(res.x) 

332 if project: 

333 w = self._clip_and_renormalize(w) 

334 return w, res.info.iter