Coverage for src / jquantstats / _stats / _rolling.py: 100%
75 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-07 14:28 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-07 14:28 +0000
1"""Rolling-window statistical metrics for financial returns data."""
3from __future__ import annotations
5import math
6from typing import TYPE_CHECKING, cast
8import numpy as np
9import polars as pl
11from ._core import _to_float, to_frame
12from ._internals import _annualization_factor
13from ._performance import _PerformanceStatsMixin
15# ── Rolling statistics mixin ─────────────────────────────────────────────────
18class _RollingStatsMixin:
19 """Mixin class providing rolling-window financial statistics methods.
21 Separates rolling-window computations from the core point-in-time metrics
22 in :mod:`~jquantstats._stats._core`. The concrete
23 :class:`~jquantstats._stats.Stats` dataclass inherits from both.
25 Attributes (provided by the concrete subclass):
26 data: The :class:`~jquantstats._data.Data` object.
27 all: Combined DataFrame for efficient column selection.
28 """
30 if TYPE_CHECKING:
31 from ._protocol import DataLike
33 data: DataLike
34 all: pl.DataFrame | None
def implied_volatility(self, periods: int = 252, annualize: bool = True) -> pl.DataFrame | dict[str, float]:
    """Compute log-return volatility as a rolling series or a per-asset scalar.

    Simple returns ``r`` are converted to log returns ``ln(1 + r)`` before
    the standard deviation is taken.

    With ``annualize=True`` (the default) the result is a DataFrame whose
    asset columns hold ``rolling_std(periods)`` of the log returns multiplied
    by ``_annualization_factor(periods)``. With ``annualize=False`` the
    result is a plain dict with one full-sample standard deviation per asset.

    Args:
        periods (int): Rolling window size; also the value handed to
            ``_annualization_factor``. Only used when ``annualize=True``.
            Defaults to 252.
        annualize (bool): Select the rolling, scaled output (True) or the
            scalar, unscaled output (False). Defaults to True.

    Returns:
        pl.DataFrame: Date column(s) followed by one rolling volatility
            column per asset when ``annualize=True``.
        dict[str, float]: Log-return standard deviation keyed by asset name
            when ``annualize=False``.

    """
    # Scalar branch first: no window and no scaling involved.
    if not annualize:
        return {
            name: _to_float((1.0 + returns.cast(pl.Float64)).log(math.e).cast(pl.Float64).std())
            for name, returns in self.data.items()
        }

    factor = _annualization_factor(periods)
    frame = cast(pl.DataFrame, self.all)
    date_exprs = [pl.col(name) for name in self.data.date_col]
    vol_exprs = [
        ((1.0 + pl.col(name)).log(math.e).rolling_std(window_size=periods) * factor).alias(name)
        for name, _ in self.data.items()
    ]
    return frame.select(date_exprs + vol_exprs)
72 @staticmethod
73 def _pct_rank_series(s: pl.Series) -> float:
74 """Percentile rank of the last element among all elements (pandas average method).
76 Args:
77 s (pl.Series): Window of price values.
79 Returns:
80 float: Rank of s[-1] in [0, 100].
82 """
83 arr = s.to_numpy()
84 current = arr[-1]
85 n = len(arr)
86 below = float(np.sum(arr < current))
87 equal = float(np.sum(arr == current))
88 return (below + (equal + 1) / 2) / n * 100.0
def pct_rank(self, window: int = 60) -> pl.DataFrame:
    """Compute the rolling percentile rank of each asset's price series.

    Every asset's returns are turned into prices via
    ``_PerformanceStatsMixin.prices``; ``_pct_rank_series`` is then applied
    over a trailing window of ``window`` prices with ``rolling_map``. The
    original documentation states that this mirrors ``qs.stats.pct_rank``
    (pandas ``rank(pct=True)`` with ``method='average'``).

    Args:
        window (int): Rolling window size. Defaults to 60.

    Returns:
        pl.DataFrame: Date column(s) followed by one percentile-rank column
            per asset, named after the asset.

    Raises:
        ValueError: If ``window`` is not a positive integer.

    """
    if not (isinstance(window, int) and window > 0):
        raise ValueError("window must be a positive integer")  # noqa: TRY003

    rank_columns = [
        _PerformanceStatsMixin.prices(returns)
        .rolling_map(function=self._pct_rank_series, window_size=window)
        .alias(name)
        for name, returns in self.data.items()
    ]

    frame = cast(pl.DataFrame, self.all)
    date_exprs = [pl.col(name) for name in self.data.date_col]
    return frame.select(date_exprs + rank_columns)
@to_frame
def rolling_sortino(
    self, series: pl.Expr, rolling_period: int = 126, periods_per_year: int | float | None = None
) -> pl.Expr:
    """Calculate the rolling Sortino ratio.

    The ratio is the rolling mean return divided by the rolling downside
    deviation (square root of the rolling mean of squared negative returns,
    with non-negative returns contributing 0), multiplied by
    ``sqrt(periods_per_year)``.

    Args:
        series (pl.Expr): The returns expression to compute the ratio for.
        rolling_period (int, optional): The rolling window size. Defaults to 126.
        periods_per_year (int | float, optional): Number of periods per year
            used for annualisation. A falsy value (``None`` or ``0``) falls
            back to ``self.data._periods_per_year``.

    Returns:
        pl.Expr: The rolling Sortino ratio expression.

    """
    ppy = periods_per_year or self.data._periods_per_year

    mean_ret = series.rolling_mean(window_size=rolling_period)

    # Squared negative returns, 0.0 for everything else. Built from native
    # expressions instead of a per-element Python callback (map_elements),
    # which polars documents as much slower. The second ``when`` keeps null
    # inputs null (no ``otherwise`` branch), matching the previous
    # map_elements behaviour of skipping nulls.
    squared_downside = (
        pl.when(series < 0).then(series**2).when(series.is_not_null()).then(0.0).cast(pl.Float64)
    )
    downside = squared_downside.rolling_mean(window_size=rolling_period)

    # NaN/null denominators are replaced by 0 before dividing.
    # NOTE(review): a zero denominator itself is not guarded, so a window
    # without negative returns is expected to yield inf/NaN from the float
    # division -- behaviour intentionally left unchanged here.
    sortino = mean_ret / downside.sqrt().fill_nan(0).fill_null(0)
    return cast(pl.Expr, sortino * (ppy**0.5))
def rolling_sharpe(
    self,
    rolling_period: int = 126,
    periods_per_year: int | float | None = None,
) -> pl.DataFrame:
    """Compute the annualised rolling Sharpe ratio for every asset.

    For each asset the rolling mean is divided by the rolling standard
    deviation over ``rolling_period`` observations and multiplied by
    ``_annualization_factor(periods_per_year)``.

    Args:
        rolling_period: Rolling window size. Defaults to 126.
        periods_per_year: Periods per year for annualisation. A falsy value
            (``None`` or ``0``) falls back to ``self.data._periods_per_year``.

    Returns:
        pl.DataFrame: Date column(s) followed by one rolling Sharpe column
            per asset, named after the asset.

    Raises:
        ValueError: If rolling_period is not a positive integer.

    """
    annual_periods = periods_per_year or self.data._periods_per_year
    if not (isinstance(rolling_period, int) and rolling_period > 0):
        raise ValueError("rolling_period must be a positive integer")  # noqa: TRY003

    factor = _annualization_factor(annual_periods)
    frame = cast(pl.DataFrame, self.all)
    date_exprs = [pl.col(name) for name in self.data.date_col]

    sharpe_exprs = []
    for name, _ in self.data.items():
        returns = pl.col(name)
        ratio = returns.rolling_mean(window_size=rolling_period) / returns.rolling_std(window_size=rolling_period)
        sharpe_exprs.append((ratio * factor).alias(name))

    return frame.select(date_exprs + sharpe_exprs)
def rolling_greeks(
    self,
    rolling_period: int = 126,
    periods_per_year: int | float | None = None,
    benchmark: str | None = None,
) -> pl.DataFrame:
    """Compute rolling beta and annualised alpha of every asset versus a benchmark.

    All moments are trailing rolling means over ``rolling_period`` rows:

    * ``var(bench) = mean(bench**2) - mean(bench)**2``
    * ``cov(asset, bench) = mean(asset * bench) - mean(asset) * mean(bench)``
    * ``beta = cov(asset, bench) / var(bench)``
    * ``alpha = (mean(asset) - beta * mean(bench)) * periods_per_year``

    Args:
        rolling_period (int): Trailing window size. Defaults to 126.
        periods_per_year (int | float, optional): Multiplier used to
            annualise alpha. A falsy value (``None`` or ``0``) falls back to
            ``self.data._periods_per_year``.
        benchmark (str, optional): Benchmark column name. A falsy value
            selects the first column of ``self.data.benchmark``.

    Returns:
        pl.DataFrame: Date column(s) followed, for every asset in turn, by
            ``{asset}_beta`` and then ``{asset}_alpha``.

    Raises:
        AttributeError: If no benchmark data is attached.
        ValueError: If *rolling_period* is not a positive integer.
    """
    if self.data.benchmark is None:
        raise AttributeError("No benchmark data available")  # noqa: TRY003
    if not (isinstance(rolling_period, int) and rolling_period > 0):
        raise ValueError("rolling_period must be a positive integer")  # noqa: TRY003

    annual_periods = periods_per_year or self.data._periods_per_year
    frame = cast(pl.DataFrame, self.all)
    bench_name = benchmark or self.data.benchmark.columns[0]

    def _trailing_mean(expr: pl.Expr) -> pl.Expr:
        """Rolling mean of *expr* over the requested window."""
        return expr.rolling_mean(window_size=rolling_period)

    # Benchmark-only moments are the same for every asset.
    bench = pl.col(bench_name)
    bench_mean = _trailing_mean(bench)
    bench_var = _trailing_mean(bench**2) - bench_mean**2

    greek_exprs: list[pl.Expr] = []
    for name, _ in self.data.items():
        asset = pl.col(name)
        asset_mean = _trailing_mean(asset)
        covariance = _trailing_mean(asset * bench) - asset_mean * bench_mean

        # No guard for var(bench) = 0: the division result is passed through.
        beta = covariance / bench_var
        # Per-period intercept scaled up by the periods-per-year multiplier.
        alpha = (asset_mean - beta * bench_mean) * annual_periods

        greek_exprs.append(beta.alias(f"{name}_beta"))
        greek_exprs.append(alpha.alias(f"{name}_alpha"))

    return frame.select([pl.col(name) for name in self.data.date_col] + greek_exprs)
243 def rolling_volatility(
244 self,
245 rolling_period: int = 126,
246 periods_per_year: int | float | None = None,
247 annualize: bool = True,
248 ) -> pl.DataFrame:
249 """Calculate the rolling volatility of returns.
251 Args:
252 rolling_period: Rolling window size. Defaults to 126.
253 periods_per_year: Periods per year for annualisation.
254 annualize: Multiply by ``sqrt(periods_per_year)`` when True (default).
256 Returns:
257 pl.DataFrame: Date column(s) plus one rolling volatility column
258 per asset.
260 Raises:
261 ValueError: If rolling_period is not a positive integer.
262 TypeError: If periods_per_year is not numeric.
264 """
265 actual_window = rolling_period
266 actual_periods = periods_per_year or self.data._periods_per_year
267 if not isinstance(actual_window, int) or actual_window <= 0:
268 raise ValueError("rolling_period must be a positive integer") # noqa: TRY003
269 if not isinstance(actual_periods, int | float):
270 raise TypeError
271 factor = _annualization_factor(actual_periods) if annualize else 1.0
272 return cast(pl.DataFrame, self.all).select(
273 [pl.col(name) for name in self.data.date_col]
274 + [(pl.col(col).rolling_std(window_size=actual_window) * factor).alias(col) for col, _ in self.data.items()]
275 )