Coverage for src/jquantstats/_stats/_reporting.py: 100%
194 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-23 06:13 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-23 06:13 +0000
1"""Temporal reporting, capture ratios, and summary statistics."""
3from __future__ import annotations
5from typing import TYPE_CHECKING, Any, cast
7import polars as pl
9from ._core import _drawdown_series, _to_float, columnwise_stat
10from ._internals import _comp_return
12if TYPE_CHECKING:
13 from ..data import Data
15# ── Reporting statistics mixin ───────────────────────────────────────────────
class _ReportingStatsMixin:
    """Mixin providing temporal, capture, and summary reporting metrics.

    Covers: periods per year, average drawdown, CAGR, expected return,
    risk-adjusted return (RAR), Calmar ratio, recovery factor, max drawdown
    duration, monthly win rate, up/down capture ratios, annual breakdown,
    and summary statistics table.

    Cross-mixin dependencies:
        - _BasicStatsMixin: avg_return, avg_win, avg_loss, win_rate, profit_factor,
          payoff_ratio, best, worst, volatility, skew, kurtosis, value_at_risk,
          conditional_value_at_risk, exposure
        - _RiskStatsMixin: sharpe
        - _DrawdownMixin: max_drawdown
    """

    # Data container the metrics read from: per-asset series via ``items()``,
    # plus ``date_col``, ``returns``, ``benchmark`` and ``_periods_per_year``.
    _data: Data
    # Frame the methods index by the date column and by asset / benchmark column names.
    all: pl.DataFrame

    # Declarations for static type checkers only. The stubs below have no body
    # beyond a docstring; the mixins named in each docstring supply the real
    # implementations on the composed class.
    # NOTE(review): nesting reconstructed from a flattened listing — confirm the
    # stubs sit inside this TYPE_CHECKING guard.
    if TYPE_CHECKING:
        from .._protocol import DataLike

        data: DataLike

        def avg_return(self) -> dict[str, float]:
            """Defined on _BasicStatsMixin."""

        def avg_win(self) -> dict[str, float]:
            """Defined on _BasicStatsMixin."""

        def avg_loss(self) -> dict[str, float]:
            """Defined on _BasicStatsMixin."""

        def win_rate(self) -> dict[str, float]:
            """Defined on _BasicStatsMixin."""

        def profit_factor(self) -> dict[str, float]:
            """Defined on _BasicStatsMixin."""

        def payoff_ratio(self) -> dict[str, float]:
            """Defined on _BasicStatsMixin."""

        def best(self) -> dict[str, float | None]:
            """Defined on _BasicStatsMixin."""

        def worst(self) -> dict[str, float | None]:
            """Defined on _BasicStatsMixin."""

        def volatility(self) -> dict[str, float]:
            """Defined on _BasicStatsMixin."""

        def sharpe(self) -> dict[str, float]:
            """Defined on _RiskStatsMixin."""

        def skew(self) -> dict[str, int | float | None]:
            """Defined on _BasicStatsMixin."""

        def kurtosis(self) -> dict[str, int | float | None]:
            """Defined on _BasicStatsMixin."""

        def value_at_risk(self) -> dict[str, float]:
            """Defined on _BasicStatsMixin."""

        def conditional_value_at_risk(self) -> dict[str, float]:
            """Defined on _BasicStatsMixin."""

        def max_drawdown(self) -> dict[str, float]:
            """Defined on _DrawdownMixin."""

        def exposure(self) -> dict[str, float]:
            """Defined on _BasicStatsMixin."""
89 # ── Temporal & reporting ──────────────────────────────────────────────────
91 @property
92 def periods_per_year(self) -> float:
93 """Estimate the number of periods per year from the data index spacing.
95 Returns:
96 float: Estimated number of observations per calendar year.
97 """
98 return self._data._periods_per_year
100 @columnwise_stat
101 def avg_drawdown(self, series: pl.Series) -> float:
102 """Average drawdown across all underwater periods.
104 Returns 0.0 when there are no underwater periods.
106 Matches the QuantStats sign convention: drawdown is expressed as a
107 negative fraction (e.g. ``-0.2`` for 20% below peak).
109 Args:
110 series (pl.Series): Series of additive daily returns.
112 Returns:
113 float: Mean drawdown in [-1, 0].
114 """
115 dd = _drawdown_series(series)
116 in_dd = dd.filter(dd > 0)
117 # A series that never falls below its high-water mark has an average drawdown of exactly 0.0.
118 if in_dd.is_empty():
119 return 0.0
120 return -_to_float(in_dd.mean())
122 @columnwise_stat
123 def cagr(
124 self,
125 series: pl.Series,
126 rf: float = 0.0,
127 compounded: bool = True,
128 periods: int | float | None = None,
129 ) -> float:
130 """Calculate the Compound Annual Growth Rate (CAGR) of excess returns.
132 CAGR represents the geometric mean annual growth rate, providing a
133 smoothed annualized return that accounts for compounding effects.
135 Args:
136 series (pl.Series): Series of additive daily returns.
137 rf (float): Annualized risk-free rate. Defaults to 0.0.
138 compounded (bool): Whether to compound returns. Defaults to True.
139 periods: Periods per year for annualisation. Defaults to ``periods_per_year``.
141 Returns:
142 float: CAGR of excess returns.
144 Returns NaN when:
145 ``float("nan")`` when the series is empty.
146 """
147 raw_periods = periods or self._data._periods_per_year
148 n = len(series)
149 if n == 0:
150 return float("nan") # pragma: no cover
151 excess = series.cast(pl.Float64) - rf / raw_periods
152 total = _comp_return(excess) if compounded else _to_float(excess.sum())
153 years = n / raw_periods
154 return float(abs(1.0 + total) ** (1.0 / years) - 1.0)
156 def expected_return(
157 self,
158 aggregate: str | None = None,
159 compounded: bool = True,
160 ) -> dict[str, float]:
161 """Expected return with optional period aggregation.
163 Returns the arithmetic mean of per-period returns. When *aggregate* is
164 provided the returns are first compounded (or summed) within each
165 calendar period, and the mean is taken over those period returns.
167 Args:
168 aggregate (str | None): Period to aggregate to before computing the
169 mean. Accepted values: ``'weekly'``, ``'monthly'``,
170 ``'quarterly'``, ``'annual'`` / ``'yearly'``. Defaults to
171 ``None`` (raw per-period mean).
172 compounded (bool): Compound returns within each period when
173 *aggregate* is set. Defaults to ``True``.
175 Returns:
176 dict[str, float]: Mean return per asset for the specified period.
178 Raises:
179 ValueError: If *aggregate* is an unrecognised string.
181 Note:
182 Requires a temporal (Date / Datetime) index when *aggregate* is not
183 ``None``; falls back to the raw per-period mean otherwise.
185 Returns NaN when:
186 Entries are ``float("nan")`` when an asset has no non-null
187 observations.
188 """
189 _freq_map: dict[str, str] = {
190 "weekly": "1w",
191 "monthly": "1mo",
192 "quarterly": "3mo",
193 "annual": "1y",
194 "yearly": "1y",
195 }
197 def _geomean(s: pl.Series) -> float:
198 """Per-period geometric mean: (product(1 + r))^(1/n) - 1."""
199 n = s.count()
200 if n == 0:
201 return float("nan")
202 return float(_to_float((1.0 + s.cast(pl.Float64)).product()) ** (1.0 / n) - 1.0)
204 def _raw_expected_returns() -> dict[str, float]:
205 """Return the geometric mean of each raw return series."""
206 return {col: _geomean(series.drop_nulls()) for col, series in self._data.items()}
208 if aggregate is None:
209 return _raw_expected_returns()
211 if aggregate.lower() not in _freq_map:
212 raise ValueError(f"aggregate must be one of {list(_freq_map)}, got {aggregate!r}") # noqa: TRY003
214 all_df = self.all
215 date_col_name = self._data.date_col[0] if self._data.date_col else None
216 if date_col_name is None or not all_df[date_col_name].dtype.is_temporal():
217 return _raw_expected_returns()
219 trunc = _freq_map[aggregate.lower()]
220 agg_expr = ((1.0 + pl.col("ret")).product() - 1.0) if compounded else pl.col("ret").sum()
222 result: dict[str, float] = {}
223 for col, series in self._data.items():
224 df = (
225 pl.DataFrame({"date": all_df[date_col_name], "ret": series})
226 .drop_nulls()
227 .with_columns(pl.col("date").dt.truncate(trunc).alias("period"))
228 )
229 period_rets = df.group_by("period").agg(agg_expr.alias("ret"))["ret"]
230 result[col] = _geomean(period_rets)
231 return result
233 def rar(self, periods: int | float = 252) -> dict[str, float]:
234 """Risk-Adjusted Return: CAGR divided by exposure.
236 Measures annualised return per unit of market participation time,
237 matching the quantstats convention.
239 Args:
240 periods: Periods per year for CAGR annualisation. Defaults to ``periods_per_year``.
242 Returns:
243 dict[str, float]: RAR per asset.
244 """
245 cagr = self.cagr(periods=periods)
246 exp = self.exposure()
247 return {col: cagr[col] / exp[col] for col in cagr}
249 @columnwise_stat
250 def calmar(self, series: pl.Series, periods: int | float | None = None) -> float:
251 """Calmar ratio (CAGR divided by maximum drawdown).
253 Returns ``nan`` when the maximum drawdown is zero.
255 Args:
256 series (pl.Series): Series of additive daily returns.
257 periods: Annualisation factor. Defaults to ``periods_per_year``.
259 Returns:
260 float: Calmar ratio, or ``nan`` if max drawdown is zero.
261 """
262 raw_periods = float(periods or self._data._periods_per_year)
263 max_dd = _to_float(_drawdown_series(series).max())
264 if max_dd <= 0:
265 return float("nan")
266 n = len(series)
267 comp_return = _comp_return(series)
268 cagr = float((1.0 + comp_return) ** (raw_periods / n)) - 1.0
269 return cagr / max_dd
271 @columnwise_stat
272 def recovery_factor(self, series: pl.Series) -> float:
273 """Recovery factor (total return divided by maximum drawdown).
275 Matches the quantstats convention: total return is the simple sum of
276 returns, not compounded. Returns ``nan`` when the maximum drawdown
277 is zero.
279 Args:
280 series (pl.Series): Series of additive daily returns.
282 Returns:
283 float: Recovery factor, or ``nan`` if max drawdown is zero.
284 """
285 max_dd = _to_float(_drawdown_series(series).max())
286 if max_dd <= 0:
287 return float("nan")
288 total_return = _to_float(series.sum())
289 return abs(total_return) / max_dd
291 def max_drawdown_duration(self) -> dict[str, float | int | None]:
292 """Maximum drawdown duration in calendar days (or periods) per asset.
294 When the index is a temporal column (``Date`` / ``Datetime``) the
295 duration is expressed as calendar days spanned by the longest
296 underwater run. For integer-indexed data each row counts as one
297 period.
299 Returns:
300 dict[str, float | int | None]: Asset → max drawdown duration.
301 Returns 0 when there are no underwater periods.
302 """
303 all_df = self.all
304 date_col_name = self._data.date_col[0] if self._data.date_col else None
305 has_date = date_col_name is not None and all_df[date_col_name].dtype.is_temporal()
306 result: dict[str, float | int | None] = {}
307 for col, series in self._data.items():
308 nav = 1.0 + series.cast(pl.Float64).cum_sum()
309 hwm = nav.cum_max()
310 in_dd = nav < hwm
312 if not in_dd.any():
313 result[col] = 0
314 continue
316 if has_date and date_col_name is not None:
317 frame = pl.DataFrame({"date": all_df[date_col_name], "in_dd": in_dd})
318 else:
319 frame = pl.DataFrame({"date": pl.Series(list(range(len(series))), dtype=pl.Int64), "in_dd": in_dd})
321 frame = frame.with_columns(pl.col("in_dd").rle_id().alias("run_id"))
322 dd_runs = (
323 frame.filter(pl.col("in_dd"))
324 .group_by("run_id")
325 .agg([pl.col("date").min().alias("start"), pl.col("date").max().alias("end")])
326 )
328 if has_date:
329 dd_runs = dd_runs.with_columns(
330 ((pl.col("end") - pl.col("start")).dt.total_days() + 1).alias("duration")
331 )
332 else:
333 dd_runs = dd_runs.with_columns((pl.col("end") - pl.col("start") + 1).alias("duration"))
335 result[col] = int(_to_float(dd_runs["duration"].max()))
336 return result
338 def monthly_win_rate(self) -> dict[str, float]:
339 """Fraction of calendar months with a positive compounded return per asset.
341 Requires a temporal (Date / Datetime) index. Returns ``nan`` per
342 asset when no temporal index is present.
344 Returns:
345 dict[str, float]: Monthly win rate in [0, 1] per asset.
347 Returns NaN when:
348 Entries are ``float("nan")`` when no temporal index is present or an
349 asset has no non-null observations.
350 """
351 all_df = self.all
352 date_col_name = self._data.date_col[0] if self._data.date_col else None
353 if date_col_name is None or not all_df[date_col_name].dtype.is_temporal():
354 return {col: float("nan") for col, _ in self._data.items()}
356 result: dict[str, float] = {}
357 for col, _ in self._data.items():
358 df = (
359 all_df.select([date_col_name, col])
360 .drop_nulls()
361 .with_columns(
362 [
363 pl.col(date_col_name).dt.year().alias("_year"),
364 pl.col(date_col_name).dt.month().alias("_month"),
365 ]
366 )
367 )
368 monthly = (
369 df.group_by(["_year", "_month"])
370 .agg((pl.col(col) + 1.0).product().alias("gross"))
371 .with_columns((pl.col("gross") - 1.0).alias("monthly_return"))
372 )
373 n_total = len(monthly)
374 if n_total == 0:
375 result[col] = float("nan")
376 else:
377 n_positive = int((monthly["monthly_return"] > 0).sum())
378 result[col] = n_positive / n_total
379 return result
381 # ── Capture ratios ────────────────────────────────────────────────────────
383 def up_capture(self, benchmark: pl.Series) -> dict[str, float]:
384 """Up-market capture ratio relative to an explicit benchmark series.
386 Measures the fraction of the benchmark's upside that the strategy
387 captures. A value greater than 1.0 means the strategy outperformed
388 the benchmark in rising markets.
390 Args:
391 benchmark: Benchmark return series aligned row-by-row with the data.
393 Returns:
394 dict[str, float]: Up capture ratio per asset.
396 Returns NaN when:
397 Entries are ``float("nan")`` when the benchmark has no positive
398 periods, its up-market geometric mean is zero, or an asset has no
399 usable returns during those periods.
400 """
401 up_mask = benchmark > 0
402 bench_up = benchmark.filter(up_mask).drop_nulls()
403 # A benchmark with no positive periods makes up-capture undefined for every asset.
404 if bench_up.is_empty():
405 return {col: float("nan") for col, _ in self._data.items()}
406 bench_geom = float((bench_up + 1.0).product()) ** (1.0 / len(bench_up)) - 1.0
407 if bench_geom == 0.0: # pragma: no cover
408 return {col: float("nan") for col, _ in self._data.items()}
409 result: dict[str, float] = {}
410 for col, series in self._data.items():
411 strat_up = series.filter(up_mask).drop_nulls()
412 # An asset may have no usable returns during the benchmark's up periods after null filtering.
413 if strat_up.is_empty():
414 result[col] = float("nan")
415 else:
416 strat_geom = float((strat_up + 1.0).product()) ** (1.0 / len(strat_up)) - 1.0
417 result[col] = strat_geom / bench_geom
418 return result
420 def down_capture(self, benchmark: pl.Series) -> dict[str, float]:
421 """Down-market capture ratio relative to an explicit benchmark series.
423 A value less than 1.0 means the strategy lost less than the benchmark
424 in falling markets (a desirable property).
426 Args:
427 benchmark: Benchmark return series aligned row-by-row with the data.
429 Returns:
430 dict[str, float]: Down capture ratio per asset.
432 Returns NaN when:
433 Entries are ``float("nan")`` when the benchmark has no negative
434 periods, its down-market geometric mean is zero, or an asset has no
435 usable returns during those periods.
436 """
437 down_mask = benchmark < 0
438 bench_down = benchmark.filter(down_mask).drop_nulls()
439 # A benchmark with no negative periods makes down-capture undefined for every asset.
440 if bench_down.is_empty():
441 return {col: float("nan") for col, _ in self._data.items()}
442 bench_geom = float((bench_down + 1.0).product()) ** (1.0 / len(bench_down)) - 1.0
443 if bench_geom == 0.0: # pragma: no cover
444 return {col: float("nan") for col, _ in self._data.items()}
445 result: dict[str, float] = {}
446 for col, series in self._data.items():
447 strat_down = series.filter(down_mask).drop_nulls()
448 # An asset may have no usable returns during the benchmark's down periods after null filtering.
449 if strat_down.is_empty():
450 result[col] = float("nan")
451 else:
452 strat_geom = float((strat_down + 1.0).product()) ** (1.0 / len(strat_down)) - 1.0
453 result[col] = strat_geom / bench_geom
454 return result
456 # ── Summary & breakdown ────────────────────────────────────────────────────
    def annual_breakdown(self) -> pl.DataFrame:
        """Summary statistics broken down by calendar year.

        With a temporal date index the rows are grouped by calendar year; a
        full `summary` is computed for each year on a fresh instance of this
        class, and the results are stacked with an additional ``year``
        column. Years with fewer than two rows are skipped.

        Without a temporal index the rows are split into consecutive chunks
        of ``round(_periods_per_year)`` rows, and ``year`` holds the 1-based
        chunk number. Chunks shorter than ``max(5, chunk // 4)`` rows are
        skipped, but they still consume a chunk number.

        Returns:
            pl.DataFrame: Columns ``year``, ``metric``, one per asset, sorted
                by ``year``. When nothing qualifies, the temporal path returns
                an empty frame with that schema, while the integer-index
                fallback returns a completely empty ``pl.DataFrame()``.
        """
        all_df = self.all
        date_col_name = self._data.date_col[0] if self._data.date_col else None
        has_temporal = date_col_name is not None and all_df[date_col_name].dtype.is_temporal()

        # Runtime import: at module level ``Data`` is only imported under TYPE_CHECKING
        # (presumably to avoid a circular import — confirm).
        from ..data import Data

        if not has_temporal:
            # Integer-index fallback: group by chunks of ~_periods_per_year rows
            chunk = round(self._data._periods_per_year)
            total = all_df.height
            frames_int: list[pl.DataFrame] = []
            for i, start in enumerate(range(0, total, chunk), start=1):
                chunk_all = all_df.slice(start, chunk)
                # Skip chunks too short to summarise (typically a trailing partial chunk).
                if chunk_all.height < max(5, chunk // 4):
                    continue
                chunk_index = chunk_all.select(self._data.date_col)
                chunk_returns = chunk_all.select(self._data.returns.columns)
                chunk_benchmark = (
                    chunk_all.select(self._data.benchmark.columns) if self._data.benchmark is not None else None
                )
                chunk_data = Data(returns=chunk_returns, index=chunk_index, benchmark=chunk_benchmark)
                # Build a new instance of the concrete class around the chunk and summarise it.
                chunk_summary = cast(Any, type(self))(chunk_data).summary()
                chunk_summary = chunk_summary.with_columns(pl.lit(i).alias("year"))
                frames_int.append(chunk_summary)
            if not frames_int:
                return pl.DataFrame()
            result_int = pl.concat(frames_int)
            # Put ``year`` and ``metric`` first, followed by the asset columns.
            ordered_int = ["year", "metric", *[c for c in result_int.columns if c not in ("year", "metric")]]
            return result_int.select(ordered_int)

        if date_col_name is None:  # unreachable: has_temporal guarantees non-None  # pragma: no cover
            return pl.DataFrame()  # pragma: no cover
        years = all_df[date_col_name].dt.year().unique().sort().to_list()

        frames: list[pl.DataFrame] = []
        for year in years:
            year_all = all_df.filter(pl.col(date_col_name).dt.year() == year)
            # Skip years with fewer than two observations.
            if year_all.height < 2:
                continue
            year_index = year_all.select([date_col_name])
            year_returns = year_all.select(self._data.returns.columns)
            year_benchmark = year_all.select(self._data.benchmark.columns) if self._data.benchmark is not None else None
            year_data = Data(returns=year_returns, index=year_index, benchmark=year_benchmark)
            # Build a new instance of the concrete class around the year's rows and summarise it.
            year_summary = cast(Any, type(self))(year_data).summary()
            year_summary = year_summary.with_columns(pl.lit(year).alias("year"))
            frames.append(year_summary)

        if not frames:
            # No year qualified: return an empty frame that still carries the expected columns.
            asset_cols = list(self._data.returns.columns)
            schema: dict[str, type[pl.DataType]] = {
                "year": pl.Int32,
                "metric": pl.String,
                **dict.fromkeys(asset_cols, pl.Float64),
            }
            return pl.DataFrame(schema=schema)

        result = pl.concat(frames)
        # Put ``year`` and ``metric`` first, followed by the asset columns.
        ordered = ["year", "metric", *[c for c in result.columns if c not in ("year", "metric")]]
        return result.select(ordered)
532 def summary(self) -> pl.DataFrame:
533 """Summary statistics for each asset as a tidy DataFrame.
535 Each row is one metric; each column beyond ``metric`` is one asset.
537 Returns:
538 pl.DataFrame: A DataFrame with a ``metric`` column followed by one
539 column per asset.
541 Returns NaN when:
542 Cells are ``float("nan")`` when the underlying metric is unavailable
543 for the data (e.g. no temporal index or no benchmark).
544 """
545 assets = [col for col, _ in self._data.items()]
547 def _safe(fn: Any) -> dict[str, Any]:
548 """Call *fn()* and return its result; return NaN for each asset on any exception."""
549 try:
550 result: dict[str, Any] = fn()
551 except Exception:
552 return dict.fromkeys(assets, float("nan"))
553 return result
555 metrics: dict[str, dict[str, Any]] = {
556 "avg_return": _safe(self.avg_return),
557 "avg_win": _safe(self.avg_win),
558 "avg_loss": _safe(self.avg_loss),
559 "win_rate": _safe(self.win_rate),
560 "profit_factor": _safe(self.profit_factor),
561 "payoff_ratio": _safe(self.payoff_ratio),
562 "monthly_win_rate": _safe(self.monthly_win_rate),
563 "best": _safe(self.best),
564 "worst": _safe(self.worst),
565 "volatility": _safe(self.volatility),
566 "sharpe": _safe(self.sharpe),
567 "skew": _safe(self.skew),
568 "kurtosis": _safe(self.kurtosis),
569 "value_at_risk": _safe(self.value_at_risk),
570 "conditional_value_at_risk": _safe(self.conditional_value_at_risk),
571 "max_drawdown": _safe(self.max_drawdown),
572 "avg_drawdown": _safe(self.avg_drawdown),
573 "max_drawdown_duration": _safe(self.max_drawdown_duration),
574 "calmar": _safe(self.calmar),
575 "recovery_factor": _safe(self.recovery_factor),
576 }
578 rows: list[dict[str, Any]] = [
579 {"metric": name, **{asset: values.get(asset) for asset in assets}} for name, values in metrics.items()
580 ]
581 return pl.DataFrame(rows)