Coverage for src/fast_minimum_variance/problem.py: 100%

143 statements  

« prev     ^ index     » next       coverage.py v7.15.0, created at 2026-07-02 13:28 +0000

1"""General mean-variance portfolio problem with growing-constraint active-set.""" 

2 

3from collections.abc import Callable 

4from dataclasses import dataclass 

5from typing import Any 

6 

7import clarabel 

8import cvxpy as cp 

9import numpy as np 

10from cvx.linalg import cholesky 

11from scipy.optimize import nnls 

12from scipy.sparse import csc_matrix, vstack 

13from scipy.sparse.linalg import LinearOperator, minres 

14 

15from ._base import _BaseProblem 

16 

17 

18@dataclass(frozen=True) 

19class _Problem(_BaseProblem): 

20 """Mean-variance portfolio problem with arbitrary linear constraints. 

21 

22 Encodes the optimization problem:: 

23 

24 min (1-alpha)||X w||^2 + alpha*(||X||_F^2/N)*||w||^2 - rho * mu^T w 

25 s.t. A^T w = b (equality constraints) 

26 C^T w <= d (inequality constraints) 

27 

28 The first term is the sample portfolio variance (X is the demeaned return 

29 matrix of shape T x N). The ``alpha`` term adds a Ledoit-Wolf ridge 

30 ``alpha * (||X||_F^2 / N) * I`` to the covariance, improving conditioning. 

31 The ``rho * mu`` term tilts the portfolio toward higher-expected-return 

32 assets (Markowitz). 

33 

34 Defaults reproduce the long-only minimum-variance problem: 

35 

36 * ``A = ones(N, 1)``, ``b = [1]`` — budget constraint: sum(w) = 1 

37 * ``C = -I``, ``d = 0`` — long-only: -w <= 0, i.e. w >= 0 

38 

39 The active-set loop *adds* violated inequality constraints as equalities 

40 (growing approach), operating on the full N-dimensional system throughout. 

41 See :class:`~fast_minimum_variance.minvar_problem._MinVarProblem` for the 

42 complementary shrinking approach optimised for the default long-only case. 

43 

44 Solvers:: 

45 

46 w, iters = Problem(X, A=A, b=b).solve_kkt() 

47 w, iters = Problem(X, A=A, b=b).solve_minres() 

48 w, iters = Problem(X, A=A, b=b).solve_cg() 

49 w, iters = Problem(X, A=A, b=b).solve_cvxpy() # requires [convex] extra 

50 """ 

51 

52 def _cg_step(self, active: np.ndarray) -> tuple[np.ndarray, int]: 

53 """Solve the KKT saddle-point system via MINRES; return ``(w, iters)``.""" 

54 op, rhs = self._kkt_operator(active=active) 

55 iters = [0] 

56 

57 def _count(_: np.ndarray) -> None: 

58 """Increment iteration counter on each MINRES callback.""" 

59 iters[0] += 1 

60 

61 x, _ = minres(op, rhs, callback=_count) 

62 return x[: self.n], iters[0] 

63 

64 A: np.ndarray | None = None 

65 b: np.ndarray | None = None 

66 C: np.ndarray | None = None 

67 d: np.ndarray | None = None 

68 

69 def __post_init__(self) -> None: 

70 """Fill in default constraint matrices when not supplied.""" 

71 super().__post_init__() 

72 n = self.n 

73 if self.A is None: 

74 object.__setattr__(self, "A", np.ones((n, 1))) 

75 if self.b is None: 

76 object.__setattr__(self, "b", np.ones(1)) 

77 if self.C is None: 

78 object.__setattr__(self, "C", -np.eye(n)) 

79 if self.d is None: 

80 object.__setattr__(self, "d", np.zeros(n)) 

81 

82 @property 

83 def _m(self) -> int: 

84 """Number of equality constraints.""" 

85 assert self.A is not None # noqa: S101 

86 return int(self.A.shape[1]) 

87 

88 # ------------------------------------------------------------------ 

89 # Active-set loop (growing: add violated inequality constraints) 

90 # ------------------------------------------------------------------ 

91 

92 def _constraint_active_set( 

93 self, 

94 solve_fn: Callable[[np.ndarray], tuple[np.ndarray, int]], 

95 tol: float = 1e-6, # noqa: ARG002 

96 max_iter: int = 10_000, # noqa: ARG002 

97 ) -> tuple[np.ndarray, int, int]: 

98 """Run the active-set loop, promoting violated inequalities to equalities.""" 

99 assert self.C is not None # noqa: S101 

100 assert self.d is not None # noqa: S101 

101 p = self.d.size 

102 active = np.zeros(p, dtype=bool) 

103 outer_steps = 0 

104 total_iters = 0 

105 

106 while True: 

107 w, step_iters = solve_fn(active) 

108 violations = self.C[:, ~active].T @ w - self.d[~active] 

109 outer_steps += 1 

110 total_iters += step_iters 

111 if np.all(violations <= 1e-10): 

112 break 

113 active[~active] |= violations > 1e-10 

114 

115 return w, outer_steps, total_iters 

116 

117 # ------------------------------------------------------------------ 

118 # Inner steps (called by the template solve_* methods on the base) 

119 # ------------------------------------------------------------------ 

120 

121 def _kkt_step(self, active: np.ndarray) -> tuple[np.ndarray, int]: 

122 """Solve the full KKT system directly; return ``(w, 1)``.""" 

123 K, rhs = self._kkt(active=active) # noqa: N806 

124 return np.linalg.solve(K, rhs)[: self.n], 1 

125 

126 def _cvxpy_constraints(self, w: cp.Variable, cp: object) -> list[Any]: # noqa: ARG002 

127 """Return equality and inequality constraints for CVXPY.""" 

128 assert self.A is not None # noqa: S101 

129 assert self.b is not None # noqa: S101 

130 assert self.C is not None # noqa: S101 

131 assert self.d is not None # noqa: S101 

132 return [self.A.T @ w == self.b, self.C.T @ w <= self.d] 

133 

134 # ------------------------------------------------------------------ 

135 # Operator builders (also accessed directly by tests) 

136 # ------------------------------------------------------------------ 

137 

138 def _kkt(self, active: np.ndarray | None = None) -> tuple[np.ndarray, np.ndarray]: 

139 """Build the (N+m) x (N+m) KKT saddle-point system.""" 

140 assert self.A is not None # noqa: S101 

141 assert self.b is not None # noqa: S101 

142 assert self.C is not None # noqa: S101 

143 assert self.d is not None # noqa: S101 

144 if active is None: 

145 active = np.zeros(self.C.shape[1], dtype=bool) 

146 A = np.hstack([self.A, self.C[:, active]]) # noqa: N806 

147 b = np.concatenate([self.b, self.d[active]]) 

148 m = A.shape[1] 

149 

150 # ridge = self._ridge() 

151 # oma = 1.0 - self.alpha 

152 K = np.zeros((self.n + m, self.n + m)) # noqa: N806 

153 if self.target is None: 

154 K[: self.n, : self.n] = 2 * (self.X.T @ self.X) / self.t 

155 else: 

156 K[: self.n, : self.n] = 2 * ((1 - self.alpha) * (self.X.T @ self.X) / self.t + self.alpha * self.target) 

157 K[: self.n, self.n :] = A 

158 K[self.n :, : self.n] = A.T 

159 

160 rhs = np.zeros(self.n + m) 

161 if self.rho != 0.0 and self.mu is not None: 

162 rhs[: self.n] = self.rho * self.mu 

163 rhs[self.n :] = b 

164 

165 return K, rhs 

166 

167 def _kkt_operator(self, active: np.ndarray | None = None) -> tuple[LinearOperator, np.ndarray]: 

168 """Build the matrix-free KKT saddle-point operator and RHS for MINRES.""" 

169 assert self.A is not None # noqa: S101 

170 assert self.b is not None # noqa: S101 

171 assert self.C is not None # noqa: S101 

172 assert self.d is not None # noqa: S101 

173 if active is None: 

174 active = np.zeros(self.C.shape[1], dtype=bool) 

175 aa = np.hstack([self.A, self.C[:, active]]) 

176 na, ma = self.n, aa.shape[1] 

177 

178 def _matvec( 

179 x: np.ndarray, 

180 xx: np.ndarray = self.X, 

181 a: np.ndarray = aa, 

182 n_: int = na, 

183 m_: int = ma, 

184 ) -> np.ndarray: 

185 """Apply the KKT saddle-point matrix to vector ``x``.""" 

186 out = np.empty(n_ + m_) 

187 if self.target is None: 

188 out[:n_] = 2.0 * (xx.T @ (xx @ x[:n_])) / self.t + a @ x[n_:] 

189 else: 

190 out[:n_] = ( 

191 2.0 * ((1.0 - self.alpha) * (xx.T @ (xx @ x[:n_])) / self.t + self.alpha * (self.target @ x[:n_])) 

192 + a @ x[n_:] 

193 ) 

194 out[n_:] = a.T @ x[:n_] 

195 return out 

196 

197 rhs = np.zeros(na + ma) 

198 if self.rho != 0.0 and self.mu is not None: 

199 rhs[:na] = self.rho * self.mu 

200 rhs[na:] = np.concatenate([self.b, self.d[active]]) 

201 

202 op = LinearOperator(shape=(na + ma, na + ma), matvec=_matvec) # ty:ignore[missing-argument, unknown-argument] 

203 return op, rhs 

204 

205 def _clarabel_constraints(self) -> tuple[csc_matrix, np.ndarray, list[Any]]: 

206 """Return equality and inequality constraints for Clarabel.""" 

207 assert self.A is not None # noqa: S101 

208 assert self.b is not None # noqa: S101 

209 assert self.C is not None # noqa: S101 

210 assert self.d is not None # noqa: S101 

211 a_mat = vstack([csc_matrix(self.A.T), csc_matrix(self.C.T)], format="csc") 

212 b_vec = np.concatenate([self.b, self.d]) 

213 cones = [clarabel.ZeroConeT(self._m), clarabel.NonnegativeConeT(len(self.d))] # ty:ignore[unresolved-attribute] 

214 return a_mat, b_vec, cones 

215 

216 def _osqp_constraints(self) -> tuple[csc_matrix, np.ndarray, np.ndarray]: 

217 """Return equality and inequality constraints for OSQP.""" 

218 assert self.A is not None # noqa: S101 

219 assert self.b is not None # noqa: S101 

220 assert self.C is not None # noqa: S101 

221 assert self.d is not None # noqa: S101 

222 a_mat = vstack([csc_matrix(self.A.T), csc_matrix(self.C.T)], format="csc") 

223 l_vec = np.concatenate([self.b, np.full(len(self.d), -np.inf)]) 

224 u_vec = np.concatenate([self.b, self.d]) 

225 return a_mat, l_vec, u_vec 

226 

227 def _nnls_solve(self) -> tuple[np.ndarray, int]: 

228 """Solve via NNLS on the augmented return matrix; return ``(w, 1)``. 

229 

230 Augments ``X`` with rows for the LW ridge term and all equality 

231 constraints (scaled by ``M = ||X||_F * T``); non-negativity is 

232 handled natively by Lawson-Hanson. Inequality constraints beyond 

233 ``w >= 0`` are not enforced; use ``solve_kkt`` for general ``C``. 

234 """ 

235 assert self.A is not None # noqa: S101 

236 assert self.b is not None # noqa: S101 

237 t = self.X.shape[0] 

238 # oma = 1.0 - self.alpha 

239 # gamma = self._ridge() 

240 m = float(np.linalg.norm(self.X, "fro")) * t 

241 

242 if self.target is not None: 

243 chol = cholesky(self.target) 

244 rows = [np.sqrt((1 - self.alpha) / self.t) * self.X] 

245 tgt = [np.zeros(t)] 

246 if self.alpha > 0.0: 

247 rows.append(np.sqrt(self.alpha) * chol.T) 

248 tgt.append(np.zeros(self.n)) 

249 else: 

250 rows = [np.sqrt(1.0 / self.t) * self.X] 

251 tgt = [np.zeros(t)] 

252 rows.append(m * self.A.T) 

253 tgt.append(m * self.b) 

254 

255 w, _ = nnls(np.vstack(rows), np.concatenate(tgt)) 

256 return w, 1