Coverage for src/jquantstats/_stats/_rolling.py: 100%

82 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-23 06:13 +0000

1"""Rolling-window statistical metrics for financial returns data.""" 

2 

3from __future__ import annotations 

4 

5import math 

6from typing import TYPE_CHECKING 

7 

8import numpy as np 

9import polars as pl 

10 

11from ._core import _to_float 

12from ._drawdown import _DrawdownMixin 

13from ._internals import _annualization_factor 

14 

15if TYPE_CHECKING: 

16 from ..data import Data 

17 

18# ── Rolling statistics mixin ───────────────────────────────────────────────── 

19 

20 

21class _RollingStatsMixin: 

22 """Mixin class providing rolling-window financial statistics methods. 

23 

24 Separates rolling-window computations from the core point-in-time metrics 

25 in `_core`. The concrete `Stats` class inherits from both. 

26 """ 

27 

28 _data: Data 

29 all: pl.DataFrame 

30 

31 if TYPE_CHECKING: 

32 from .._protocol import DataLike 

33 

34 data: DataLike 

35 

36 def implied_volatility(self, periods: int = 252, annualize: bool = True) -> pl.DataFrame | dict[str, float]: 

37 """Calculate implied volatility using log returns. 

38 

39 Uses log returns (ln(1 + r)) instead of simple returns for mathematical 

40 correctness with continuous compounding. 

41 

42 When ``annualize=True`` (default), returns a rolling DataFrame of 

43 annualised log-return volatility: ``rolling_std(periods) * sqrt(periods)``. 

44 When ``annualize=False``, returns a scalar standard deviation per asset. 

45 

46 Args: 

47 periods (int): Rolling window size and annualisation factor. Defaults to 252. 

48 annualize (bool): Whether to annualize and return a rolling series. 

49 Defaults to True. 

50 

51 Returns: 

52 pl.DataFrame: Rolling annualised implied volatility (one column per 

53 asset) when ``annualize=True``. 

54 dict[str, float]: Scalar log-return std per asset when 

55 ``annualize=False``. 

56 

57 """ 

58 if annualize: 

59 scale = _annualization_factor(periods) 

60 return self.all.select( 

61 [pl.col(name) for name in self._data.date_col] 

62 + [ 

63 ((1.0 + pl.col(col)).log(math.e).rolling_std(window_size=periods) * scale).alias(col) 

64 for col, _ in self._data.items() 

65 ] 

66 ) 

67 return { 

68 col: _to_float((1.0 + series.cast(pl.Float64)).log(math.e).cast(pl.Float64).std()) 

69 for col, series in self._data.items() 

70 } 

71 

72 @staticmethod 

73 def _pct_rank_series(s: pl.Series) -> float: 

74 """Percentile rank of the last element among all elements (pandas average method). 

75 

76 Args: 

77 s (pl.Series): Window of price values. 

78 

79 Returns: 

80 float: Rank of s[-1] in [0, 100]. 

81 

82 """ 

83 arr = s.to_numpy() 

84 current = arr[-1] 

85 n = len(arr) 

86 below = float(np.sum(arr < current)) 

87 equal = float(np.sum(arr == current)) 

88 return (below + (equal + 1) / 2) / n * 100.0 

89 

90 def pct_rank(self, window: int = 60) -> pl.DataFrame: 

91 """Calculate the rolling percentile rank of prices within a window. 

92 

93 Converts returns to a cumulative price series, then for each period 

94 returns the percentile rank (0-100) of the current price within the 

95 trailing ``window`` prices. Matches ``qs.stats.pct_rank`` (pandas 

96 ``rank(pct=True)`` with ``method='average'``). 

97 

98 Args: 

99 window (int): Rolling window size. Defaults to 60. 

100 

101 Returns: 

102 pl.DataFrame: Date column(s) plus one percentile-rank column per asset. 

103 

104 Raises: 

105 ValueError: If window is not a positive integer. 

106 

107 """ 

108 if not isinstance(window, int) or window <= 0: 

109 raise ValueError("window must be a positive integer") # noqa: TRY003 

110 

111 cols: list[pl.Expr | pl.Series] = [pl.col(name) for name in self._data.date_col] 

112 for col, series in self._data.items(): 

113 prices = _DrawdownMixin.prices(series) 

114 ranked = prices.rolling_map( 

115 function=self._pct_rank_series, 

116 window_size=window, 

117 ).alias(col) 

118 cols.append(ranked) 

119 

120 return self.all.select(cols) 

121 

122 def rolling_sortino( 

123 self, 

124 rolling_period: int = 126, 

125 periods_per_year: int | float | None = None, 

126 ) -> pl.DataFrame: 

127 """Calculate the rolling Sortino ratio. 

128 

129 Args: 

130 rolling_period: Rolling window size. Defaults to 126. 

131 periods_per_year: Periods per year for annualisation. 

132 

133 Returns: 

134 pl.DataFrame: Date column(s) plus one annualised rolling Sortino 

135 column per asset. 

136 

137 Raises: 

138 ValueError: If rolling_period is not a positive integer. 

139 

140 """ 

141 if not isinstance(rolling_period, int) or rolling_period <= 0: 

142 raise ValueError("rolling_period must be a positive integer") # noqa: TRY003 

143 ppy = periods_per_year or self._data._periods_per_year 

144 scale = _annualization_factor(ppy) 

145 exprs: list[pl.Expr] = [] 

146 for col, _ in self._data.items(): 

147 mean_ret = pl.col(col).rolling_mean(window_size=rolling_period) 

148 negative_squared = pl.when(pl.col(col) < 0).then(pl.col(col) ** 2).otherwise(0.0) 

149 downside = negative_squared.rolling_mean(window_size=rolling_period) 

150 exprs.append(((mean_ret / downside.sqrt()) * scale).alias(col)) 

151 return self.all.select([pl.col(name) for name in self._data.date_col] + exprs) 

152 

153 def rolling_sharpe( 

154 self, 

155 rolling_period: int = 126, 

156 periods_per_year: int | float | None = None, 

157 ) -> pl.DataFrame: 

158 """Calculate the rolling Sharpe ratio. 

159 

160 Args: 

161 rolling_period: Rolling window size. Defaults to 126. 

162 periods_per_year: Periods per year for annualisation. 

163 

164 Returns: 

165 pl.DataFrame: Date column(s) plus one annualised rolling Sharpe 

166 column per asset. 

167 

168 Raises: 

169 ValueError: If rolling_period is not a positive integer. 

170 

171 """ 

172 actual_window = rolling_period 

173 actual_periods = periods_per_year or self._data._periods_per_year 

174 if not isinstance(actual_window, int) or actual_window <= 0: 

175 raise ValueError("rolling_period must be a positive integer") # noqa: TRY003 

176 scale = _annualization_factor(actual_periods) 

177 return self.all.select( 

178 [pl.col(name) for name in self._data.date_col] 

179 + [ 

180 ( 

181 pl.col(col).rolling_mean(window_size=actual_window) 

182 / pl.col(col).rolling_std(window_size=actual_window) 

183 * scale 

184 ).alias(col) 

185 for col, _ in self._data.items() 

186 ] 

187 ) 

188 

189 def rolling_greeks( 

190 self, 

191 rolling_period: int = 126, 

192 periods_per_year: int | float | None = None, 

193 benchmark: str | None = None, 

194 ) -> pl.DataFrame: 

195 """Rolling alpha and beta versus the benchmark. 

196 

197 Computes rolling alpha (annualised) and beta for each asset against the 

198 benchmark using a trailing window. Beta is estimated via the standard 

199 OLS formula: ``cov(asset, bench) / var(bench)``. Alpha is the 

200 per-period intercept annualised by multiplying by *periods_per_year*. 

201 

202 Args: 

203 rolling_period (int): Trailing window size. Defaults to 126. 

204 periods_per_year (int | float, optional): Periods per year used to 

205 annualise alpha. Defaults to the value inferred from the data. 

206 benchmark (str, optional): Benchmark column name. Defaults to the 

207 first benchmark column. 

208 

209 Returns: 

210 pl.DataFrame: Date column(s) followed by ``{asset}_alpha`` and 

211 ``{asset}_beta`` columns for every asset. 

212 

213 Raises: 

214 AttributeError: If no benchmark data is attached. 

215 ValueError: If *rolling_period* is not a positive integer. 

216 """ 

217 if self._data.benchmark is None: 

218 raise AttributeError("No benchmark data available") # noqa: TRY003 

219 if not isinstance(rolling_period, int) or rolling_period <= 0: 

220 raise ValueError("rolling_period must be a positive integer") # noqa: TRY003 

221 

222 ppy = periods_per_year or self._data._periods_per_year 

223 all_df = self.all 

224 bench_col = benchmark or self._data.benchmark.columns[0] 

225 

226 w = rolling_period 

227 exprs: list[pl.Expr] = [] 

228 for col, _ in self._data.items(): 

229 mean_x = pl.col(col).rolling_mean(window_size=w) 

230 mean_y = pl.col(bench_col).rolling_mean(window_size=w) 

231 mean_xy = (pl.col(col) * pl.col(bench_col)).rolling_mean(window_size=w) 

232 mean_y2 = (pl.col(bench_col) ** 2).rolling_mean(window_size=w) 

233 

234 bench_var = mean_y2 - mean_y**2 

235 bench_cov = mean_xy - mean_x * mean_y 

236 

237 # beta = cov(asset, bench) / var(bench); NaN when var(bench) = 0 

238 beta_expr = (bench_cov / bench_var).alias(f"{col}_beta") 

239 # alpha (per period) = mean(asset) - beta * mean(bench), annualised 

240 alpha_expr = ((mean_x - (bench_cov / bench_var) * mean_y) * ppy).alias(f"{col}_alpha") 

241 

242 exprs.extend([beta_expr, alpha_expr]) 

243 

244 return all_df.select([pl.col(name) for name in self._data.date_col] + exprs) 

245 

246 def rolling_volatility( 

247 self, 

248 rolling_period: int = 126, 

249 periods_per_year: int | float | None = None, 

250 annualize: bool = True, 

251 ) -> pl.DataFrame: 

252 """Calculate the rolling volatility of returns. 

253 

254 Args: 

255 rolling_period: Rolling window size. Defaults to 126. 

256 periods_per_year: Periods per year for annualisation. 

257 annualize: Multiply by ``sqrt(periods_per_year)`` when True (default). 

258 

259 Returns: 

260 pl.DataFrame: Date column(s) plus one rolling volatility column 

261 per asset. 

262 

263 Raises: 

264 ValueError: If rolling_period is not a positive integer. 

265 TypeError: If periods_per_year is not numeric. 

266 

267 """ 

268 actual_window = rolling_period 

269 actual_periods = periods_per_year or self._data._periods_per_year 

270 if not isinstance(actual_window, int) or actual_window <= 0: 

271 raise ValueError("rolling_period must be a positive integer") # noqa: TRY003 

272 if not isinstance(actual_periods, int | float): 

273 raise TypeError 

274 factor = _annualization_factor(actual_periods) if annualize else 1.0 

275 return self.all.select( 

276 [pl.col(name) for name in self._data.date_col] 

277 + [ 

278 (pl.col(col).rolling_std(window_size=actual_window) * factor).alias(col) 

279 for col, _ in self._data.items() 

280 ] 

281 )