Coverage for src / fast_minimum_variance / problem.py: 100%

136 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-09 14:08 +0000

1"""General mean-variance portfolio problem with growing-constraint active-set.""" 

2 

3from dataclasses import dataclass 

4 

5import clarabel 

6import numpy as np 

7from scipy.optimize import nnls 

8from scipy.sparse import csc_matrix, vstack 

9from scipy.sparse.linalg import LinearOperator, minres 

10 

11from ._base import _BaseProblem 

12 

13 

14@dataclass(frozen=True) 

15class _Problem(_BaseProblem): 

16 """Mean-variance portfolio problem with arbitrary linear constraints. 

17 

18 Encodes the optimization problem:: 

19 

20 min (1-alpha)||X w||^2 + alpha*(||X||_F^2/N)*||w||^2 - rho * mu^T w 

21 s.t. A^T w = b (equality constraints) 

22 C^T w <= d (inequality constraints) 

23 

24 The first term is the sample portfolio variance (X is the demeaned return 

25 matrix of shape T x N). The ``alpha`` term adds a Ledoit-Wolf ridge 

26 ``alpha * (||X||_F^2 / N) * I`` to the covariance, improving conditioning. 

27 The ``rho * mu`` term tilts the portfolio toward higher-expected-return 

28 assets (Markowitz). 

29 

30 Defaults reproduce the long-only minimum-variance problem: 

31 

32 * ``A = ones(N, 1)``, ``b = [1]`` — budget constraint: sum(w) = 1 

33 * ``C = -I``, ``d = 0`` — long-only: -w <= 0, i.e. w >= 0 

34 

35 The active-set loop *adds* violated inequality constraints as equalities 

36 (growing approach), operating on the full N-dimensional system throughout. 

37 See :class:`~fast_minimum_variance.minvar_problem._MinVarProblem` for the 

38 complementary shrinking approach optimised for the default long-only case. 

39 

40 Solvers:: 

41 

42 w, iters = Problem(X, A=A, b=b).solve_kkt() 

43 w, iters = Problem(X, A=A, b=b).solve_minres() 

44 w, iters = Problem(X, A=A, b=b).solve_cg() 

45 w, iters = Problem(X, A=A, b=b).solve_cvxpy() # requires [convex] extra 

46 """ 

47 

48 def _cg_step(self, active): 

49 """Solve the KKT saddle-point system via MINRES; return ``(w, iters)``.""" 

50 op, rhs = self._kkt_operator(active=active) 

51 iters = [0] 

52 

53 def _count(_): 

54 iters[0] += 1 

55 

56 x, _ = minres(op, rhs, callback=_count) 

57 return x[: self.n], iters[0] 

58 

59 A: np.ndarray | None = None 

60 b: np.ndarray | None = None 

61 C: np.ndarray | None = None 

62 d: np.ndarray | None = None 

63 

64 def __post_init__(self): 

65 """Fill in default constraint matrices when not supplied.""" 

66 super().__post_init__() 

67 n = self.n 

68 if self.A is None: 

69 object.__setattr__(self, "A", np.ones((n, 1))) 

70 if self.b is None: 

71 object.__setattr__(self, "b", np.ones(1)) 

72 if self.C is None: 

73 object.__setattr__(self, "C", -np.eye(n)) 

74 if self.d is None: 

75 object.__setattr__(self, "d", np.zeros(n)) 

76 

77 @property 

78 def _m(self) -> int: 

79 """Number of equality constraints.""" 

80 assert self.A is not None # noqa: S101 

81 return self.A.shape[1] 

82 

83 # ------------------------------------------------------------------ 

84 # Active-set loop (growing: add violated inequality constraints) 

85 # ------------------------------------------------------------------ 

86 

87 def _constraint_active_set(self, solve_fn, tol=1e-6, max_iter=10_000): 

88 """Run the active-set loop, promoting violated inequalities to equalities.""" 

89 assert self.C is not None # noqa: S101 

90 assert self.d is not None # noqa: S101 

91 p = self.d.size 

92 active = np.zeros(p, dtype=bool) 

93 total_iters = 0 

94 

95 while True: 

96 w, step_iters = solve_fn(active) 

97 violations = self.C[:, ~active].T @ w - self.d[~active] 

98 total_iters += step_iters 

99 if np.all(violations <= 1e-10): 

100 break 

101 active[~active] |= violations > 1e-10 

102 

103 return w, total_iters 

104 

105 # ------------------------------------------------------------------ 

106 # Inner steps (called by the template solve_* methods on the base) 

107 # ------------------------------------------------------------------ 

108 

109 def _kkt_step(self, active): 

110 """Solve the full KKT system directly; return ``(w, 1)``.""" 

111 K, rhs = self._kkt(active=active) # noqa: N806 

112 return np.linalg.solve(K, rhs)[: self.n], 1 

113 

114 def _cvxpy_constraints(self, w, cp): 

115 """Return equality and inequality constraints for CVXPY.""" 

116 assert self.A is not None # noqa: S101 

117 assert self.b is not None # noqa: S101 

118 assert self.C is not None # noqa: S101 

119 assert self.d is not None # noqa: S101 

120 return [self.A.T @ w == self.b, self.C.T @ w <= self.d] 

121 

122 # ------------------------------------------------------------------ 

123 # Operator builders (also accessed directly by tests) 

124 # ------------------------------------------------------------------ 

125 

126 def _kkt(self, active=None): 

127 """Build the (N+m) x (N+m) KKT saddle-point system.""" 

128 assert self.A is not None # noqa: S101 

129 assert self.b is not None # noqa: S101 

130 assert self.C is not None # noqa: S101 

131 assert self.d is not None # noqa: S101 

132 if active is None: 

133 active = np.zeros(self.C.shape[1], dtype=bool) 

134 A = np.hstack([self.A, self.C[:, active]]) # noqa: N806 

135 b = np.concatenate([self.b, self.d[active]]) 

136 m = A.shape[1] 

137 

138 # ridge = self._ridge() 

139 # oma = 1.0 - self.alpha 

140 K = np.zeros((self.n + m, self.n + m)) # noqa: N806 

141 if self.target is None: 

142 K[: self.n, : self.n] = 2 * (self.X.T @ self.X) / self.t 

143 else: 

144 K[: self.n, : self.n] = 2 * ((1 - self.alpha) * (self.X.T @ self.X) / self.t + self.alpha * self.target) 

145 K[: self.n, self.n :] = A 

146 K[self.n :, : self.n] = A.T 

147 

148 rhs = np.zeros(self.n + m) 

149 if self.rho != 0.0 and self.mu is not None: 

150 rhs[: self.n] = self.rho * self.mu 

151 rhs[self.n :] = b 

152 

153 return K, rhs 

154 

155 def _kkt_operator(self, active=None): 

156 """Build the matrix-free KKT saddle-point operator and RHS for MINRES.""" 

157 assert self.A is not None # noqa: S101 

158 assert self.b is not None # noqa: S101 

159 assert self.C is not None # noqa: S101 

160 assert self.d is not None # noqa: S101 

161 if active is None: 

162 active = np.zeros(self.C.shape[1], dtype=bool) 

163 aa = np.hstack([self.A, self.C[:, active]]) 

164 na, ma = self.n, aa.shape[1] 

165 

166 def _matvec(x, xx=self.X, a=aa, n_=na, m_=ma): 

167 """Apply the KKT saddle-point matrix to vector ``x``.""" 

168 out = np.empty(n_ + m_) 

169 if self.target is None: 

170 out[:n_] = 2.0 * (xx.T @ (xx @ x[:n_])) / self.t + a @ x[n_:] 

171 else: 

172 out[:n_] = ( 

173 2.0 * ((1.0 - self.alpha) * (xx.T @ (xx @ x[:n_])) / self.t + self.alpha * (self.target @ x[:n_])) 

174 + a @ x[n_:] 

175 ) 

176 out[n_:] = a.T @ x[:n_] 

177 return out 

178 

179 rhs = np.zeros(na + ma) 

180 if self.rho != 0.0 and self.mu is not None: 

181 rhs[:na] = self.rho * self.mu 

182 rhs[na:] = np.concatenate([self.b, self.d[active]]) 

183 

184 return LinearOperator(shape=(na + ma, na + ma), matvec=_matvec), rhs # type: ignore[call-arg] 

185 

186 def _clarabel_constraints(self): 

187 """Return equality and inequality constraints for Clarabel.""" 

188 assert self.A is not None # noqa: S101 

189 assert self.b is not None # noqa: S101 

190 assert self.C is not None # noqa: S101 

191 assert self.d is not None # noqa: S101 

192 a_mat = vstack([csc_matrix(self.A.T), csc_matrix(self.C.T)], format="csc") 

193 b_vec = np.concatenate([self.b, self.d]) 

194 cones = [clarabel.ZeroConeT(self._m), clarabel.NonnegativeConeT(len(self.d))] # type: ignore[attr-defined] 

195 return a_mat, b_vec, cones 

196 

197 def _osqp_constraints(self): 

198 """Return equality and inequality constraints for OSQP.""" 

199 assert self.A is not None # noqa: S101 

200 assert self.b is not None # noqa: S101 

201 assert self.C is not None # noqa: S101 

202 assert self.d is not None # noqa: S101 

203 a_mat = vstack([csc_matrix(self.A.T), csc_matrix(self.C.T)], format="csc") 

204 l_vec = np.concatenate([self.b, np.full(len(self.d), -np.inf)]) 

205 u_vec = np.concatenate([self.b, self.d]) 

206 return a_mat, l_vec, u_vec 

207 

208 def _nnls_solve(self): 

209 """Solve via NNLS on the augmented return matrix; return ``(w, 1)``. 

210 

211 Augments ``X`` with rows for the LW ridge term and all equality 

212 constraints (scaled by ``M = ||X||_F * T``); non-negativity is 

213 handled natively by Lawson-Hanson. Inequality constraints beyond 

214 ``w >= 0`` are not enforced; use ``solve_kkt`` for general ``C``. 

215 """ 

216 assert self.A is not None # noqa: S101 

217 assert self.b is not None # noqa: S101 

218 t = self.X.shape[0] 

219 # oma = 1.0 - self.alpha 

220 # gamma = self._ridge() 

221 m = float(np.linalg.norm(self.X, "fro")) * t 

222 

223 if self.target is not None: 

224 chol = np.linalg.cholesky(self.target) 

225 rows = [np.sqrt((1 - self.alpha) / self.t) * self.X] 

226 tgt = [np.zeros(t)] 

227 if self.alpha > 0.0: 

228 rows.append(np.sqrt(self.alpha) * chol.T) 

229 tgt.append(np.zeros(self.n)) 

230 else: 

231 rows = [np.sqrt(1.0 / self.t) * self.X] 

232 tgt = [np.zeros(t)] 

233 rows.append(m * self.A.T) 

234 tgt.append(m * self.b) 

235 

236 w, _ = nnls(np.vstack(rows), np.concatenate(tgt)) 

237 return w, 1