Coverage for src / fast_minimum_variance / minvar_problem.py: 100%

115 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-09 14:08 +0000

1"""Minimum-variance solver: primal asset elimination with dual-feasibility check.""" 

2 

3from dataclasses import dataclass 

4 

5import clarabel 

6import numpy as np 

7from scipy.optimize import nnls 

8from scipy.sparse import csc_matrix, eye, vstack 

9from scipy.sparse.linalg import LinearOperator, cg 

10 

11from ._base import _BaseProblem 

12 

13 

@dataclass(frozen=True)
class _MinVarProblem(_BaseProblem):
    """Minimum-variance portfolio solver via primal-dual active-set iteration.

    Solves::

        min (1-alpha)||X w||^2 + alpha*(||X||_F^2/N)*||w||^2 - rho*mu^T w
        s.t. 1^T w = 1, w >= 0

    Each inner step solves the equality-constrained subproblem over the current
    active asset set. Stationarity gives ``2*Sigma_a*w_a = lambda*1 + rho*mu_a``
    where ``Sigma_a = (1-alpha)*X_a^T X_a + ridge*I``. Solving the ``n_a x n_a``
    SPD system ``Sigma_a v = 1`` (and ``Sigma_a v2 = mu_a`` when ``rho != 0``)
    and recovering ``lambda`` from the budget constraint avoids the indefinite
    ``(n_a+1) x (n_a+1)`` saddle-point system entirely. The outer primal-dual
    loop enforces ``w >= 0`` and terminates when both primal and dual feasibility
    hold simultaneously.

    Use ``alpha = N/(N+T)`` for Ledoit-Wolf shrinkage intensity::

        T, N = X.shape
        w, iters = Problem(X, alpha=N/(N+T)).solve_kkt()

    Examples:
        >>> import numpy as np
        >>> from fast_minimum_variance import Problem
        >>> X = np.random.default_rng(0).standard_normal((100, 5))
        >>> w, iters = Problem(X).solve_kkt()
        >>> float(round(w.sum(), 6))
        1.0
        >>> bool((w >= 0).all())
        True
    """

    # No extra fields — X, alpha, rho, mu all inherited from _BaseProblem.

    # ------------------------------------------------------------------
    # Outer loop: primal elimination + dual feasibility check
    # ------------------------------------------------------------------
    def _constraint_active_set(self, solve_fn, tol=1e-6, max_iter=10_000):
        """Run the primal-dual active-set loop enforcing ``w >= 0``.

        Calls ``solve_fn(active_mask)`` repeatedly. The *primal step* drops assets
        with negative weights; the *dual step* re-adds any excluded asset whose KKT
        gradient condition is violated. Terminates when both conditions hold
        simultaneously, which together with stationarity is sufficient for global
        optimality.

        Args:
            solve_fn: Callable taking a boolean mask of active assets and
                returning ``(w_active, iteration_count)`` for the reduced
                equality-constrained subproblem (e.g. ``_kkt_step``/``_cg_step``).
            tol: Feasibility tolerance for both the primal (``w >= -tol``) and
                dual (``nu >= -tol``) checks.
            max_iter: Hard cap on outer iterations.

        Returns:
            Tuple ``(w, total_iters)`` where ``w`` is the full-length weight
            vector (zeros on excluded assets) and ``total_iters`` is the sum of
            inner-solver iteration counts.

        NOTE(review): ``w`` is only bound after an inner solve returns no
        negative weights. If ``max_iter`` were exhausted while the primal step
        is still dropping assets, the final ``return w`` would raise
        ``NameError`` — presumably unreachable since at most ``n`` assets can
        be dropped and ``max_iter=10_000``; confirm callers never shrink it.
        """
        n = self.n
        # Start with every asset in the active set.
        asset_active = np.ones(n, dtype=bool)
        total_iters = 0

        prev_active = None

        for _ in range(max_iter):
            # Safety guard: if the active set did not change since last pass we
            # are cycling; every code path below either mutates the set or breaks.
            if prev_active is not None and np.array_equal(prev_active, asset_active):
                break  # pragma: no cover - structurally unreachable safety guard
            prev_active = asset_active.copy()

            # === Solve ===
            # Equality-constrained subproblem over the current active assets.
            w_a, step_iters = solve_fn(asset_active)
            total_iters += step_iters

            # === PRIMAL STEP ===
            neg = w_a < -tol
            if np.any(neg):
                # Map reduced-vector positions back to full asset indices.
                idx = np.where(asset_active)[0]

                # Clearly negative weights can be dropped in bulk; weights only
                # marginally below -tol are dropped one at a time (most negative
                # first) to avoid over-pruning near the boundary.
                strong = w_a < -10 * tol

                if np.any(strong):
                    asset_active[idx[strong]] = False
                else:
                    j = idx[np.argmin(w_a)]
                    asset_active[j] = False

                # CRITICAL: re-solve on the reduced set before any dual check —
                # the gradient/dual values below are only meaningful at a
                # primal-feasible iterate.
                continue

            # === Assemble full vector ===
            w = np.zeros(n)
            w[asset_active] = w_a

            # === Gradient ===
            # Gradient of the (shrunk) variance objective at the full vector w.
            if self.target is None:
                grad = 2.0 * (self.X.T @ (self.X @ w)) / self.t
            else:
                grad = 2.0 * ((1 - self.alpha) * (self.X.T @ (self.X @ w)) / self.t + self.alpha * self.target @ w)

            if self.rho != 0.0 and self.mu is not None:
                grad = grad - self.rho * self.mu

            # === Lambda ===
            # At stationarity the gradient equals lambda on every active asset,
            # so lambda is recovered from the active gradients; the median is
            # robust to a few outliers when the set is large enough, the mean
            # is used for tiny sets.
            g_a = grad[asset_active]
            lambda_ = np.median(g_a) if g_a.size > 5 else g_a.mean()

            # === Dual ===
            # Multipliers of the w >= 0 constraints; must be >= 0 at optimum.
            nu = grad - lambda_

            excluded = ~asset_active
            if not excluded.any():
                # Everything active and primal-feasible: done.
                break

            nu_ex = nu[excluded]
            idx_ex = np.where(excluded)[0]

            # Most-violated excluded asset (smallest dual multiplier).
            j = idx_ex[np.argmin(nu_ex)]
            violate = nu[j]

            # === DUAL STEP ===
            if violate >= -tol:
                # Dual feasible too — KKT conditions hold, stop.
                break

            # Re-admit the violating asset and iterate.
            asset_active[j] = True

        return w, total_iters

    # ------------------------------------------------------------------
    # Inner steps
    # ------------------------------------------------------------------

    def _kkt_step(self, active):
        """Solve the reduced SPD system directly; return ``(w_a, 1)``.

        Stationarity gives ``2*Sigma_a*w_a = lambda*1 + rho*mu_a``. A single
        solve with two RHS columns yields ``v1 = Sigma_a^{-1} 1`` and
        ``v2 = Sigma_a^{-1} mu_a``; the budget constraint then pins ``lambda``
        analytically as ``lambda = 2*(1 - rho/2 * sum(v2)) / sum(v1)``.

        Args:
            active: Boolean mask selecting the active assets.

        Returns:
            Tuple ``(w_a, 1)`` — the reduced weight vector and a unit
            iteration count (direct solve counts as one step).
        """
        x_a = self.X[:, active]
        n_a = int(active.sum())
        # Reduced covariance: sample covariance, optionally shrunk toward target.
        if self.target is None:
            sigma = (x_a.T @ x_a) / self.t
        else:
            sigma = (1.0 - self.alpha) * (x_a.T @ x_a) / self.t + self.alpha * self.target[np.ix_(active, active)]

        if self.rho == 0.0 or self.mu is None:
            # Pure min-variance: w_a ∝ Sigma_a^{-1} 1, normalised to the budget.
            v = np.linalg.solve(sigma, np.ones(n_a))
            return v / v.sum(), 1

        # One factorisation, two right-hand sides: [1, mu_a].
        v1, v2 = np.linalg.solve(sigma, np.column_stack([np.ones(n_a), self.mu[active]])).T
        half_rho = 0.5 * self.rho
        # lambda/2 chosen so that sum(w_a) == 1 exactly.
        half_lambda = (1.0 - half_rho * v2.sum()) / v1.sum()
        return half_lambda * v1 + half_rho * v2, 1

    def _cvxpy_constraints(self, w, cp):
        """Return budget-equality and long-only inequality constraints for CVXPY."""
        return [cp.sum(w) == 1, w >= 0]

    def _cg_step(self, active):
        """Solve the reduced SPD system via matrix-free CG; return ``(w_a, iters)``.

        Builds a ``LinearOperator`` for ``v -> (1-alpha)*X_a'*(X_a*v) + gamma*v``
        and runs conjugate gradients without ever forming ``Sigma_a`` explicitly.

        Args:
            active: Boolean mask selecting the active assets.

        Returns:
            Tuple ``(w_a, iters)`` — the reduced weight vector and the total
            number of CG iterations across the one or two solves.
        """
        x_a = self.X[:, active]
        n_a = int(active.sum())
        target_sub = self.target[np.ix_(active, active)] if self.target is not None else None

        def matvec(v):
            """Apply Sigma_LW matrix-free: v -> (1-alpha)/T * X_a'*(X_a*v) + alpha*target_a*v."""
            if target_sub is None:
                return (x_a.T @ (x_a @ v)) / self.t
            return (1.0 - self.alpha) * (x_a.T @ (x_a @ v)) / self.t + self.alpha * (target_sub @ v)

        op = LinearOperator((n_a, n_a), matvec=matvec, dtype=np.float64)  # type: ignore[call-arg]

        # CG exposes no iteration count directly; count via the callback.
        iters = [0]

        def _count(_):
            """Increment CG iteration counter for the first solve."""
            iters[0] += 1

        if self.rho == 0.0 or self.mu is None:
            # Pure min-variance: normalise Sigma_a^{-1} 1 to the budget.
            v, _ = cg(op, np.ones(n_a), callback=_count)
            return v / v.sum(), iters[0]

        iters2 = [0]

        def _count2(_):
            """Increment CG iteration counter for the second solve."""
            iters2[0] += 1

        # Two CG solves: v1 = Sigma_a^{-1} 1 and v2 = Sigma_a^{-1} mu_a.
        v1, _ = cg(op, np.ones(n_a), callback=_count)
        v2, _ = cg(op, self.mu[active], callback=_count2)
        half_rho = 0.5 * self.rho
        # lambda/2 from the budget constraint, as in _kkt_step.
        half_lambda = (1.0 - half_rho * v2.sum()) / v1.sum()
        return half_lambda * v1 + half_rho * v2, iters[0] + iters2[0]

    def _clarabel_constraints(self):
        """Return budget-equality and long-only inequality constraints for Clarabel.

        Clarabel expects ``b - A x`` in the cone product: the first row with a
        ZeroCone encodes ``1^T w == 1``; the ``-I`` rows with a NonnegativeCone
        encode ``w >= 0`` (since ``b - A w = w`` on those rows).
        """
        n = self.n
        a_mat = vstack(
            [csc_matrix(np.ones((1, n))), -eye(n, format="csc")],
            format="csc",
        )

        b_vec = np.concatenate([[1.0], np.zeros(n)])
        cones = [clarabel.ZeroConeT(1), clarabel.NonnegativeConeT(n)]  # type: ignore[attr-defined]
        return a_mat, b_vec, cones

    def _osqp_constraints(self):
        """Return budget-equality and long-only inequality constraints for OSQP.

        OSQP uses two-sided bounds ``l <= A w <= u``: the budget row has
        ``l = u = 1`` (equality), the identity rows have ``l = 0, u = +inf``
        (long-only).
        """
        n = self.n
        a_mat = vstack(
            [csc_matrix(np.ones((1, n))), eye(n, format="csc")],
            format="csc",
        )
        l_vec = np.concatenate([[1.0], np.zeros(n)])
        u_vec = np.concatenate([[1.0], np.full(n, np.inf)])
        return a_mat, l_vec, u_vec

    def _nnls_solve(self):
        """Solve via NNLS on the augmented return matrix; return ``(w, 1)``.

        Builds ``A = [sqrt((1-alpha)/T)*X ; sqrt(alpha)*chol(target)^T ; M*ones^T]``
        (the Cholesky row block is present only when ``target`` is set and
        ``alpha > 0``) with target vector ``b = [0 ; 0 ; M]`` and solves
        ``min ||A w - b||^2  s.t.  w >= 0``. The budget row with weight
        ``M = ||X||_F * T`` enforces ``ones^T w ≈ 1``; exact normalisation is
        presumably applied by a ``project`` step in ``solve_nnls`` (defined
        elsewhere — confirm). Return tilt (``rho != 0``) is not supported.
        """
        t = self.X.shape[0]
        # Large penalty weight so the soft budget row dominates the residual.
        m = float(np.linalg.norm(self.X, "fro")) * t

        if self.target is not None:
            # target is the penalty matrix M; Cholesky gives chol s.t. chol @ chol.T = M,
            # so sqrt(alpha)*chol.T rows enforce alpha * w^T M w in the LS objective.
            chol = np.linalg.cholesky(self.target)
            rows = [np.sqrt((1 - self.alpha) / self.t) * self.X]
            tgt = [np.zeros(t)]
            if self.alpha > 0.0:
                rows.append(np.sqrt(self.alpha) * chol.T)
                tgt.append(np.zeros(self.n))
        else:
            rows = [np.sqrt(1.0 / self.t) * self.X]
            tgt = [np.zeros(t)]
        # Soft budget row: m * 1^T w ≈ m  <=>  1^T w ≈ 1.
        rows.append(m * np.ones((1, self.n)))
        tgt.append(np.array([m]))

        w, _ = nnls(np.vstack(rows), np.concatenate(tgt))
        return w, 1