Coverage for src/jquantstats/_stats/_rolling.py: 100%

75 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-07 14:28 +0000

1"""Rolling-window statistical metrics for financial returns data.""" 

2 

3from __future__ import annotations 

4 

5import math 

6from typing import TYPE_CHECKING, cast 

7 

8import numpy as np 

9import polars as pl 

10 

11from ._core import _to_float, to_frame 

12from ._internals import _annualization_factor 

13from ._performance import _PerformanceStatsMixin 

14 

15# ── Rolling statistics mixin ───────────────────────────────────────────────── 

16 

17 

class _RollingStatsMixin:
    """Mixin class providing rolling-window financial statistics methods.

    Separates rolling-window computations from the core point-in-time metrics
    in :mod:`~jquantstats._stats._core`.  The concrete
    :class:`~jquantstats._stats.Stats` dataclass inherits from both.

    Attributes (provided by the concrete subclass):
        data: The :class:`~jquantstats._data.Data` object.
        all: Combined DataFrame for efficient column selection.
    """

    if TYPE_CHECKING:
        from ._protocol import DataLike

        data: DataLike
        all: pl.DataFrame | None

    def implied_volatility(self, periods: int = 252, annualize: bool = True) -> pl.DataFrame | dict[str, float]:
        """Calculate implied volatility using log returns.

        Uses log returns (ln(1 + r)) instead of simple returns for mathematical
        correctness with continuous compounding.

        When ``annualize=True`` (default), returns a rolling DataFrame of
        annualised log-return volatility: ``rolling_std(periods) * sqrt(periods)``.
        When ``annualize=False``, returns a scalar standard deviation per asset
        and ``periods`` is not used.

        Args:
            periods (int): Rolling window size and annualisation factor. Defaults to 252.
            annualize (bool): Whether to annualize and return a rolling series.
                Defaults to True.

        Returns:
            pl.DataFrame: Rolling annualised implied volatility (one column per
                asset) when ``annualize=True``.
            dict[str, float]: Scalar log-return std per asset when
                ``annualize=False``.

        Raises:
            ValueError: If ``annualize=True`` and periods is not a positive integer.

        """
        if not annualize:
            # Scalar path: one standard deviation of ln(1 + r) per asset.
            return {
                col: _to_float((1.0 + series.cast(pl.Float64)).log(math.e).cast(pl.Float64).std())
                for col, series in self.data.items()
            }

        # ``periods`` doubles as the rolling window size here, so validate it the
        # same way the other rolling methods validate their window argument.
        if not isinstance(periods, int) or periods <= 0:
            raise ValueError("periods must be a positive integer")  # noqa: TRY003

        scale = _annualization_factor(periods)
        date_exprs = [pl.col(name) for name in self.data.date_col]
        vol_exprs = [
            ((1.0 + pl.col(col)).log(math.e).rolling_std(window_size=periods) * scale).alias(col)
            for col, _ in self.data.items()
        ]
        return cast(pl.DataFrame, self.all).select(date_exprs + vol_exprs)

    @staticmethod
    def _pct_rank_series(s: pl.Series) -> float:
        """Percentile rank of the last element among all elements (pandas average method).

        Ties share the average of the ranks they occupy, so the rank of the
        last element is ``below + (equal + 1) / 2`` where ``equal`` counts the
        element itself.

        Args:
            s (pl.Series): Window of price values.

        Returns:
            float: Rank of s[-1] as a percentage in (0, 100].

        """
        arr = s.to_numpy()
        current = arr[-1]
        n = len(arr)
        below = float(np.sum(arr < current))
        equal = float(np.sum(arr == current))
        return (below + (equal + 1) / 2) / n * 100.0

    def pct_rank(self, window: int = 60) -> pl.DataFrame:
        """Calculate the rolling percentile rank of prices within a window.

        Converts returns to a cumulative price series, then for each period
        returns the percentile rank of the current price within the trailing
        ``window`` prices.  Matches ``qs.stats.pct_rank`` (pandas
        ``rank(pct=True)`` with ``method='average'``).

        Args:
            window (int): Rolling window size. Defaults to 60.

        Returns:
            pl.DataFrame: Date column(s) plus one percentile-rank column per asset.

        Raises:
            ValueError: If window is not a positive integer.

        """
        if not isinstance(window, int) or window <= 0:
            raise ValueError("window must be a positive integer")  # noqa: TRY003

        ranked_cols = [
            _PerformanceStatsMixin.prices(series)
            .rolling_map(function=self._pct_rank_series, window_size=window)
            .alias(col)
            for col, series in self.data.items()
        ]
        date_exprs = [pl.col(name) for name in self.data.date_col]
        return cast(pl.DataFrame, self.all).select(date_exprs + ranked_cols)

    @to_frame
    def rolling_sortino(
        self, series: pl.Expr, rolling_period: int = 126, periods_per_year: int | float | None = None
    ) -> pl.Expr:
        """Calculate the rolling Sortino ratio.

        The ratio is the rolling mean return divided by the rolling downside
        deviation (square root of the rolling mean of squared negative
        returns), scaled by ``sqrt(periods_per_year)``.

        Args:
            series (pl.Expr): The expression to calculate rolling Sortino ratio for.
            rolling_period (int, optional): The rolling window size. Defaults to 126.
            periods_per_year (int | float, optional): Number of periods per year.
                Falls back to ``self.data._periods_per_year`` when omitted.

        Returns:
            pl.Expr: The rolling Sortino ratio expression.

        Raises:
            ValueError: If rolling_period is not a positive integer.

        """
        if not isinstance(rolling_period, int) or rolling_period <= 0:
            raise ValueError("rolling_period must be a positive integer")  # noqa: TRY003

        ppy = periods_per_year or self.data._periods_per_year

        mean_ret = series.rolling_mean(window_size=rolling_period)

        # Squared negative returns (0.0 for non-negative ones), built as a native
        # expression instead of a per-element Python callback.
        squared_losses = pl.when(series < 0).then(series.cast(pl.Float64) ** 2).otherwise(0.0)
        downside = squared_losses.rolling_mean(window_size=rolling_period)

        # Missing/NaN denominators are replaced by 0.  NOTE: this does not guard
        # against division by zero -- a window without any negative return has a
        # zero downside deviation and the ratio is then +/-inf (or NaN for 0 / 0).
        downside_dev = downside.sqrt().fill_nan(0).fill_null(0)
        sortino = mean_ret / downside_dev
        return cast(pl.Expr, sortino * (ppy**0.5))

    def rolling_sharpe(
        self,
        rolling_period: int = 126,
        periods_per_year: int | float | None = None,
    ) -> pl.DataFrame:
        """Calculate the rolling Sharpe ratio.

        Args:
            rolling_period: Rolling window size. Defaults to 126.
            periods_per_year: Periods per year for annualisation.  Falls back
                to ``self.data._periods_per_year`` when omitted.

        Returns:
            pl.DataFrame: Date column(s) plus one annualised rolling Sharpe
                column per asset.

        Raises:
            ValueError: If rolling_period is not a positive integer.

        """
        actual_periods = periods_per_year or self.data._periods_per_year
        if not isinstance(rolling_period, int) or rolling_period <= 0:
            raise ValueError("rolling_period must be a positive integer")  # noqa: TRY003

        scale = _annualization_factor(actual_periods)
        date_exprs = [pl.col(name) for name in self.data.date_col]
        sharpe_exprs = [
            (
                pl.col(col).rolling_mean(window_size=rolling_period)
                / pl.col(col).rolling_std(window_size=rolling_period)
                * scale
            ).alias(col)
            for col, _ in self.data.items()
        ]
        return cast(pl.DataFrame, self.all).select(date_exprs + sharpe_exprs)

    def rolling_greeks(
        self,
        rolling_period: int = 126,
        periods_per_year: int | float | None = None,
        benchmark: str | None = None,
    ) -> pl.DataFrame:
        """Rolling alpha and beta versus the benchmark.

        Computes rolling alpha (annualised) and beta for each asset against the
        benchmark using a trailing window.  Beta is estimated via the standard
        OLS formula: ``cov(asset, bench) / var(bench)``.  Alpha is the
        per-period intercept annualised by multiplying by *periods_per_year*.

        Args:
            rolling_period (int): Trailing window size. Defaults to 126.
            periods_per_year (int | float, optional): Periods per year used to
                annualise alpha.  Defaults to the value inferred from the data.
            benchmark (str, optional): Benchmark column name.  Defaults to the
                first benchmark column.

        Returns:
            pl.DataFrame: Date column(s) followed, for every asset, by
                ``{asset}_beta`` and then ``{asset}_alpha``.

        Raises:
            AttributeError: If no benchmark data is attached.
            ValueError: If *rolling_period* is not a positive integer.
        """
        if self.data.benchmark is None:
            raise AttributeError("No benchmark data available")  # noqa: TRY003
        if not isinstance(rolling_period, int) or rolling_period <= 0:
            raise ValueError("rolling_period must be a positive integer")  # noqa: TRY003

        ppy = periods_per_year or self.data._periods_per_year
        all_df = cast(pl.DataFrame, self.all)
        bench_col = benchmark or self.data.benchmark.columns[0]

        w = rolling_period
        exprs: list[pl.Expr] = []
        for col, _ in self.data.items():
            # Rolling first and second moments; x is the asset, y the benchmark.
            mean_x = pl.col(col).rolling_mean(window_size=w)
            mean_y = pl.col(bench_col).rolling_mean(window_size=w)
            mean_xy = (pl.col(col) * pl.col(bench_col)).rolling_mean(window_size=w)
            mean_y2 = (pl.col(bench_col) ** 2).rolling_mean(window_size=w)

            # var(y) = E[y^2] - E[y]^2 and cov(x, y) = E[xy] - E[x]E[y]
            bench_var = mean_y2 - mean_y**2
            bench_cov = mean_xy - mean_x * mean_y

            # beta = cov(asset, bench) / var(bench); NaN when var(bench) = 0
            beta = bench_cov / bench_var
            # alpha (per period) = mean(asset) - beta * mean(bench), annualised
            alpha = (mean_x - beta * mean_y) * ppy

            exprs.extend([beta.alias(f"{col}_beta"), alpha.alias(f"{col}_alpha")])

        return all_df.select([pl.col(name) for name in self.data.date_col] + exprs)

    def rolling_volatility(
        self,
        rolling_period: int = 126,
        periods_per_year: int | float | None = None,
        annualize: bool = True,
    ) -> pl.DataFrame:
        """Calculate the rolling volatility of returns.

        Args:
            rolling_period: Rolling window size. Defaults to 126.
            periods_per_year: Periods per year for annualisation.  Falls back
                to ``self.data._periods_per_year`` when omitted.
            annualize: Multiply by ``sqrt(periods_per_year)`` when True (default).

        Returns:
            pl.DataFrame: Date column(s) plus one rolling volatility column
                per asset.

        Raises:
            ValueError: If rolling_period is not a positive integer.
            TypeError: If periods_per_year is not numeric.

        """
        actual_periods = periods_per_year or self.data._periods_per_year
        if not isinstance(rolling_period, int) or rolling_period <= 0:
            raise ValueError("rolling_period must be a positive integer")  # noqa: TRY003
        # Checked even when ``annualize`` is False, as in the original contract.
        if not isinstance(actual_periods, int | float):
            raise TypeError("periods_per_year must be numeric")  # noqa: TRY003

        factor = _annualization_factor(actual_periods) if annualize else 1.0
        date_exprs = [pl.col(name) for name in self.data.date_col]
        vol_exprs = [
            (pl.col(col).rolling_std(window_size=rolling_period) * factor).alias(col) for col, _ in self.data.items()
        ]
        return cast(pl.DataFrame, self.all).select(date_exprs + vol_exprs)