Coverage for src/jquantstats/_stats/_rolling.py: 100%
82 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-23 06:13 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-23 06:13 +0000
1"""Rolling-window statistical metrics for financial returns data."""
3from __future__ import annotations
5import math
6from typing import TYPE_CHECKING
8import numpy as np
9import polars as pl
11from ._core import _to_float
12from ._drawdown import _DrawdownMixin
13from ._internals import _annualization_factor
15if TYPE_CHECKING:
16 from ..data import Data
18# ── Rolling statistics mixin ─────────────────────────────────────────────────
21class _RollingStatsMixin:
22 """Mixin class providing rolling-window financial statistics methods.
24 Separates rolling-window computations from the core point-in-time metrics
25 in `_core`. The concrete `Stats` class inherits from both.
26 """
28 _data: Data
29 all: pl.DataFrame
31 if TYPE_CHECKING:
32 from .._protocol import DataLike
34 data: DataLike
36 def implied_volatility(self, periods: int = 252, annualize: bool = True) -> pl.DataFrame | dict[str, float]:
37 """Calculate implied volatility using log returns.
39 Uses log returns (ln(1 + r)) instead of simple returns for mathematical
40 correctness with continuous compounding.
42 When ``annualize=True`` (default), returns a rolling DataFrame of
43 annualised log-return volatility: ``rolling_std(periods) * sqrt(periods)``.
44 When ``annualize=False``, returns a scalar standard deviation per asset.
46 Args:
47 periods (int): Rolling window size and annualisation factor. Defaults to 252.
48 annualize (bool): Whether to annualize and return a rolling series.
49 Defaults to True.
51 Returns:
52 pl.DataFrame: Rolling annualised implied volatility (one column per
53 asset) when ``annualize=True``.
54 dict[str, float]: Scalar log-return std per asset when
55 ``annualize=False``.
57 """
58 if annualize:
59 scale = _annualization_factor(periods)
60 return self.all.select(
61 [pl.col(name) for name in self._data.date_col]
62 + [
63 ((1.0 + pl.col(col)).log(math.e).rolling_std(window_size=periods) * scale).alias(col)
64 for col, _ in self._data.items()
65 ]
66 )
67 return {
68 col: _to_float((1.0 + series.cast(pl.Float64)).log(math.e).cast(pl.Float64).std())
69 for col, series in self._data.items()
70 }
72 @staticmethod
73 def _pct_rank_series(s: pl.Series) -> float:
74 """Percentile rank of the last element among all elements (pandas average method).
76 Args:
77 s (pl.Series): Window of price values.
79 Returns:
80 float: Rank of s[-1] in [0, 100].
82 """
83 arr = s.to_numpy()
84 current = arr[-1]
85 n = len(arr)
86 below = float(np.sum(arr < current))
87 equal = float(np.sum(arr == current))
88 return (below + (equal + 1) / 2) / n * 100.0
90 def pct_rank(self, window: int = 60) -> pl.DataFrame:
91 """Calculate the rolling percentile rank of prices within a window.
93 Converts returns to a cumulative price series, then for each period
94 returns the percentile rank (0-100) of the current price within the
95 trailing ``window`` prices. Matches ``qs.stats.pct_rank`` (pandas
96 ``rank(pct=True)`` with ``method='average'``).
98 Args:
99 window (int): Rolling window size. Defaults to 60.
101 Returns:
102 pl.DataFrame: Date column(s) plus one percentile-rank column per asset.
104 Raises:
105 ValueError: If window is not a positive integer.
107 """
108 if not isinstance(window, int) or window <= 0:
109 raise ValueError("window must be a positive integer") # noqa: TRY003
111 cols: list[pl.Expr | pl.Series] = [pl.col(name) for name in self._data.date_col]
112 for col, series in self._data.items():
113 prices = _DrawdownMixin.prices(series)
114 ranked = prices.rolling_map(
115 function=self._pct_rank_series,
116 window_size=window,
117 ).alias(col)
118 cols.append(ranked)
120 return self.all.select(cols)
122 def rolling_sortino(
123 self,
124 rolling_period: int = 126,
125 periods_per_year: int | float | None = None,
126 ) -> pl.DataFrame:
127 """Calculate the rolling Sortino ratio.
129 Args:
130 rolling_period: Rolling window size. Defaults to 126.
131 periods_per_year: Periods per year for annualisation.
133 Returns:
134 pl.DataFrame: Date column(s) plus one annualised rolling Sortino
135 column per asset.
137 Raises:
138 ValueError: If rolling_period is not a positive integer.
140 """
141 if not isinstance(rolling_period, int) or rolling_period <= 0:
142 raise ValueError("rolling_period must be a positive integer") # noqa: TRY003
143 ppy = periods_per_year or self._data._periods_per_year
144 scale = _annualization_factor(ppy)
145 exprs: list[pl.Expr] = []
146 for col, _ in self._data.items():
147 mean_ret = pl.col(col).rolling_mean(window_size=rolling_period)
148 negative_squared = pl.when(pl.col(col) < 0).then(pl.col(col) ** 2).otherwise(0.0)
149 downside = negative_squared.rolling_mean(window_size=rolling_period)
150 exprs.append(((mean_ret / downside.sqrt()) * scale).alias(col))
151 return self.all.select([pl.col(name) for name in self._data.date_col] + exprs)
153 def rolling_sharpe(
154 self,
155 rolling_period: int = 126,
156 periods_per_year: int | float | None = None,
157 ) -> pl.DataFrame:
158 """Calculate the rolling Sharpe ratio.
160 Args:
161 rolling_period: Rolling window size. Defaults to 126.
162 periods_per_year: Periods per year for annualisation.
164 Returns:
165 pl.DataFrame: Date column(s) plus one annualised rolling Sharpe
166 column per asset.
168 Raises:
169 ValueError: If rolling_period is not a positive integer.
171 """
172 actual_window = rolling_period
173 actual_periods = periods_per_year or self._data._periods_per_year
174 if not isinstance(actual_window, int) or actual_window <= 0:
175 raise ValueError("rolling_period must be a positive integer") # noqa: TRY003
176 scale = _annualization_factor(actual_periods)
177 return self.all.select(
178 [pl.col(name) for name in self._data.date_col]
179 + [
180 (
181 pl.col(col).rolling_mean(window_size=actual_window)
182 / pl.col(col).rolling_std(window_size=actual_window)
183 * scale
184 ).alias(col)
185 for col, _ in self._data.items()
186 ]
187 )
189 def rolling_greeks(
190 self,
191 rolling_period: int = 126,
192 periods_per_year: int | float | None = None,
193 benchmark: str | None = None,
194 ) -> pl.DataFrame:
195 """Rolling alpha and beta versus the benchmark.
197 Computes rolling alpha (annualised) and beta for each asset against the
198 benchmark using a trailing window. Beta is estimated via the standard
199 OLS formula: ``cov(asset, bench) / var(bench)``. Alpha is the
200 per-period intercept annualised by multiplying by *periods_per_year*.
202 Args:
203 rolling_period (int): Trailing window size. Defaults to 126.
204 periods_per_year (int | float, optional): Periods per year used to
205 annualise alpha. Defaults to the value inferred from the data.
206 benchmark (str, optional): Benchmark column name. Defaults to the
207 first benchmark column.
209 Returns:
210 pl.DataFrame: Date column(s) followed by ``{asset}_alpha`` and
211 ``{asset}_beta`` columns for every asset.
213 Raises:
214 AttributeError: If no benchmark data is attached.
215 ValueError: If *rolling_period* is not a positive integer.
216 """
217 if self._data.benchmark is None:
218 raise AttributeError("No benchmark data available") # noqa: TRY003
219 if not isinstance(rolling_period, int) or rolling_period <= 0:
220 raise ValueError("rolling_period must be a positive integer") # noqa: TRY003
222 ppy = periods_per_year or self._data._periods_per_year
223 all_df = self.all
224 bench_col = benchmark or self._data.benchmark.columns[0]
226 w = rolling_period
227 exprs: list[pl.Expr] = []
228 for col, _ in self._data.items():
229 mean_x = pl.col(col).rolling_mean(window_size=w)
230 mean_y = pl.col(bench_col).rolling_mean(window_size=w)
231 mean_xy = (pl.col(col) * pl.col(bench_col)).rolling_mean(window_size=w)
232 mean_y2 = (pl.col(bench_col) ** 2).rolling_mean(window_size=w)
234 bench_var = mean_y2 - mean_y**2
235 bench_cov = mean_xy - mean_x * mean_y
237 # beta = cov(asset, bench) / var(bench); NaN when var(bench) = 0
238 beta_expr = (bench_cov / bench_var).alias(f"{col}_beta")
239 # alpha (per period) = mean(asset) - beta * mean(bench), annualised
240 alpha_expr = ((mean_x - (bench_cov / bench_var) * mean_y) * ppy).alias(f"{col}_alpha")
242 exprs.extend([beta_expr, alpha_expr])
244 return all_df.select([pl.col(name) for name in self._data.date_col] + exprs)
246 def rolling_volatility(
247 self,
248 rolling_period: int = 126,
249 periods_per_year: int | float | None = None,
250 annualize: bool = True,
251 ) -> pl.DataFrame:
252 """Calculate the rolling volatility of returns.
254 Args:
255 rolling_period: Rolling window size. Defaults to 126.
256 periods_per_year: Periods per year for annualisation.
257 annualize: Multiply by ``sqrt(periods_per_year)`` when True (default).
259 Returns:
260 pl.DataFrame: Date column(s) plus one rolling volatility column
261 per asset.
263 Raises:
264 ValueError: If rolling_period is not a positive integer.
265 TypeError: If periods_per_year is not numeric.
267 """
268 actual_window = rolling_period
269 actual_periods = periods_per_year or self._data._periods_per_year
270 if not isinstance(actual_window, int) or actual_window <= 0:
271 raise ValueError("rolling_period must be a positive integer") # noqa: TRY003
272 if not isinstance(actual_periods, int | float):
273 raise TypeError
274 factor = _annualization_factor(actual_periods) if annualize else 1.0
275 return self.all.select(
276 [pl.col(name) for name in self._data.date_col]
277 + [
278 (pl.col(col).rolling_std(window_size=actual_window) * factor).alias(col)
279 for col, _ in self._data.items()
280 ]
281 )