Coverage for src / jquantstats / _stats / _reporting.py: 100%
266 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-07 14:28 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-07 14:28 +0000
1"""Temporal reporting, capture ratios, and summary statistics."""
3from __future__ import annotations
5from typing import TYPE_CHECKING, Any, cast
7import polars as pl
9from ._core import _drawdown_series, _to_float, columnwise_stat
10from ._internals import _comp_return
12# ── Reporting statistics mixin ───────────────────────────────────────────────
class _ReportingStatsMixin:
    """Mixin providing temporal, capture, and summary reporting metrics.

    Covers: periods per year, average drawdown, Calmar ratio, recovery factor,
    max drawdown duration, monthly win rate, worst-N periods, up/down capture
    ratios, annual breakdown, and summary statistics table.

    Attributes (provided by the concrete subclass):
        data: The :class:`~jquantstats._data.Data` object.
        all: Combined DataFrame for efficient column selection.
    """

    if TYPE_CHECKING:
        from ._protocol import DataLike

    # Annotations only — the concrete subclass supplies the actual values.
    data: DataLike
    all: pl.DataFrame | None

    # NOTE(review): the methods below are docstring-only stubs whose bodies
    # implicitly return ``None``.  They let this mixin reference sibling-mixin
    # metrics (e.g. from ``summary``) while keeping type checkers happy.  This
    # relies on the concrete subclass's MRO resolving to the real
    # implementations on _BasicStatsMixin / _PerformanceStatsMixin — confirm
    # the base-class ordering so these stubs never shadow the real methods.

    def avg_return(self) -> dict[str, float]:
        """Defined on _BasicStatsMixin."""

    def avg_win(self) -> dict[str, float]:
        """Defined on _BasicStatsMixin."""

    def avg_loss(self) -> dict[str, float]:
        """Defined on _BasicStatsMixin."""

    def win_rate(self) -> dict[str, float]:
        """Defined on _BasicStatsMixin."""

    def profit_factor(self) -> dict[str, float]:
        """Defined on _BasicStatsMixin."""

    def payoff_ratio(self) -> dict[str, float]:
        """Defined on _BasicStatsMixin."""

    def best(self) -> dict[str, float]:
        """Defined on _BasicStatsMixin."""

    def worst(self) -> dict[str, float]:
        """Defined on _BasicStatsMixin."""

    def volatility(self) -> dict[str, float]:
        """Defined on _BasicStatsMixin."""

    def sharpe(self) -> dict[str, float]:
        """Defined on _PerformanceStatsMixin."""

    def skew(self) -> dict[str, float]:
        """Defined on _BasicStatsMixin."""

    def kurtosis(self) -> dict[str, float]:
        """Defined on _BasicStatsMixin."""

    def value_at_risk(self) -> dict[str, float]:
        """Defined on _BasicStatsMixin."""

    def conditional_value_at_risk(self) -> dict[str, float]:
        """Defined on _BasicStatsMixin."""

    def max_drawdown(self) -> dict[str, float]:
        """Defined on _PerformanceStatsMixin."""

    def cagr(self, periods: int | float | None = None) -> dict[str, float]:
        """Defined on _ReportingStatsMixin."""

    def exposure(self) -> dict[str, float]:
        """Defined on _BasicStatsMixin."""
    # ── Temporal & reporting ──────────────────────────────────────────────────

    @property
    def periods_per_year(self) -> float:
        """Estimate the number of periods per year from the data index spacing.

        Delegates to the Data object's internal estimate.

        Returns:
            float: Estimated number of observations per calendar year.
        """
        return self.data._periods_per_year
95 @columnwise_stat
96 def avg_drawdown(self, series: pl.Series) -> float:
97 """Average drawdown across all underwater periods.
99 Returns 0.0 when there are no underwater periods.
101 Matches the QuantStats sign convention: drawdown is expressed as a
102 negative fraction (e.g. ``-0.2`` for 20% below peak).
104 Args:
105 series (pl.Series): Series of additive daily returns.
107 Returns:
108 float: Mean drawdown in [-1, 0].
109 """
110 dd = _drawdown_series(series)
111 in_dd = dd.filter(dd > 0)
112 if in_dd.is_empty():
113 return 0.0
114 return -_to_float(in_dd.mean())
116 @columnwise_stat
117 def cagr(
118 self,
119 series: pl.Series,
120 rf: float = 0.0,
121 compounded: bool = True,
122 periods: int | float | None = None,
123 ) -> float:
124 """Calculate the Compound Annual Growth Rate (CAGR) of excess returns.
126 CAGR represents the geometric mean annual growth rate, providing a
127 smoothed annualized return that accounts for compounding effects.
129 Args:
130 series (pl.Series): Series of additive daily returns.
131 rf (float): Annualized risk-free rate. Defaults to 0.0.
132 compounded (bool): Whether to compound returns. Defaults to True.
133 periods: Periods per year for annualisation. Defaults to ``periods_per_year``.
135 Returns:
136 float: CAGR of excess returns.
137 """
138 raw_periods = periods or self.data._periods_per_year
139 n = len(series)
140 if n == 0:
141 return float("nan") # pragma: no cover
142 excess = series.cast(pl.Float64) - rf / raw_periods
143 total = _comp_return(excess) if compounded else _to_float(excess.sum())
144 years = n / raw_periods
145 return float(abs(1.0 + total) ** (1.0 / years) - 1.0)
    def expected_return(
        self,
        aggregate: str | None = None,
        compounded: bool = True,
    ) -> dict[str, float]:
        """Expected return with optional period aggregation.

        Returns the geometric mean of per-period returns,
        ``(prod(1 + r)) ** (1/n) - 1``. When *aggregate* is provided the
        returns are first compounded (or summed) within each calendar period,
        and the geometric mean is taken over those period returns.

        Args:
            aggregate (str | None): Period to aggregate to before computing the
                mean. Accepted values: ``'weekly'``, ``'monthly'``,
                ``'quarterly'``, ``'annual'`` / ``'yearly'``. Defaults to
                ``None`` (raw per-period mean).
            compounded (bool): Compound returns within each period when
                *aggregate* is set. Defaults to ``True``.

        Returns:
            dict[str, float]: Mean return per asset for the specified period.

        Raises:
            ValueError: If *aggregate* is an unrecognised string.

        Note:
            Requires a temporal (Date / Datetime) index when *aggregate* is not
            ``None``; falls back to the raw per-period mean otherwise.
        """
        # Maps the user-facing frequency name to a polars truncate interval.
        _freq_map: dict[str, str] = {
            "weekly": "1w",
            "monthly": "1mo",
            "quarterly": "3mo",
            "annual": "1y",
            "yearly": "1y",
        }

        def _geomean(s: pl.Series) -> float:
            """Per-period geometric mean: (product(1 + r))^(1/n) - 1."""
            # NOTE(review): if product(1 + r) goes negative (a return below
            # -100%), the fractional power would produce a complex result —
            # confirm inputs are bounded below by -1.
            n = s.count()
            if n == 0:
                return float("nan")
            return float(_to_float((1.0 + s.cast(pl.Float64)).product()) ** (1.0 / n) - 1.0)

        if aggregate is None:
            return {col: _geomean(series.drop_nulls()) for col, series in self.data.items()}

        if aggregate.lower() not in _freq_map:
            raise ValueError(f"aggregate must be one of {list(_freq_map)}, got {aggregate!r}")  # noqa: TRY003

        all_df = cast(pl.DataFrame, self.all)
        date_col_name = self.data.date_col[0] if self.data.date_col else None
        # Non-temporal index: aggregation is impossible, fall back to raw mean.
        if date_col_name is None or not all_df[date_col_name].dtype.is_temporal():
            return {col: _geomean(series.drop_nulls()) for col, series in self.data.items()}

        trunc = _freq_map[aggregate.lower()]
        agg_expr = ((1.0 + pl.col("ret")).product() - 1.0) if compounded else pl.col("ret").sum()

        result: dict[str, float] = {}
        for col, series in self.data.items():
            df = (
                pl.DataFrame({"date": all_df[date_col_name], "ret": series})
                .drop_nulls()
                .with_columns(pl.col("date").dt.truncate(trunc).alias("period"))
            )
            period_rets = df.group_by("period").agg(agg_expr.alias("ret"))["ret"]
            result[col] = _geomean(period_rets)
        return result
216 def rar(self, periods: int | float = 252) -> dict[str, float]:
217 """Risk-Adjusted Return: CAGR divided by exposure.
219 Measures annualised return per unit of market participation time,
220 matching the quantstats convention.
222 Args:
223 periods: Periods per year for CAGR annualisation. Defaults to ``periods_per_year``.
225 Returns:
226 dict[str, float]: RAR per asset.
227 """
228 cagr = self.cagr(periods=periods)
229 exp = self.exposure()
230 return {col: cagr[col] / exp[col] for col in cagr}
232 @columnwise_stat
233 def calmar(self, series: pl.Series, periods: int | float | None = None) -> float:
234 """Calmar ratio (CAGR divided by maximum drawdown).
236 Returns ``nan`` when the maximum drawdown is zero.
238 Args:
239 series (pl.Series): Series of additive daily returns.
240 periods: Annualisation factor. Defaults to ``periods_per_year``.
242 Returns:
243 float: Calmar ratio, or ``nan`` if max drawdown is zero.
244 """
245 raw_periods = periods or self.data._periods_per_year
246 max_dd = _to_float(_drawdown_series(series).max())
247 if max_dd <= 0:
248 return float("nan")
249 n = len(series)
250 comp_return = _comp_return(series)
251 cagr = (1.0 + comp_return) ** (raw_periods / n) - 1.0
252 return cagr / max_dd
254 @columnwise_stat
255 def recovery_factor(self, series: pl.Series) -> float:
256 """Recovery factor (total return divided by maximum drawdown).
258 Matches the quantstats convention: total return is the simple sum of
259 returns, not compounded. Returns ``nan`` when the maximum drawdown
260 is zero.
262 Args:
263 series (pl.Series): Series of additive daily returns.
265 Returns:
266 float: Recovery factor, or ``nan`` if max drawdown is zero.
267 """
268 max_dd = _to_float(_drawdown_series(series).max())
269 if max_dd <= 0:
270 return float("nan")
271 total_return = _to_float(series.sum())
272 return abs(total_return) / max_dd
    def max_drawdown_duration(self) -> dict[str, float | int | None]:
        """Maximum drawdown duration in calendar days (or periods) per asset.

        When the index is a temporal column (``Date`` / ``Datetime``) the
        duration is expressed as calendar days spanned by the longest
        underwater run. For integer-indexed data each row counts as one
        period.

        Returns:
            dict[str, float | int | None]: Asset → max drawdown duration.
                Returns 0 when there are no underwater periods.
        """
        all_df = cast(pl.DataFrame, self.all)
        date_col_name = self.data.date_col[0] if self.data.date_col else None
        has_date = date_col_name is not None and all_df[date_col_name].dtype.is_temporal()
        result: dict[str, float | int | None] = {}
        for col, series in self.data.items():
            # Additive NAV curve: 1 + cumulative sum of returns (not compounded).
            nav = 1.0 + series.cast(pl.Float64).cum_sum()
            hwm = nav.cum_max()  # running high-water mark
            in_dd = nav < hwm

            if not in_dd.any():
                result[col] = 0
                continue

            if has_date and date_col_name is not None:
                frame = pl.DataFrame({"date": all_df[date_col_name], "in_dd": in_dd})
            else:
                # Synthetic integer index: one row counts as one period.
                frame = pl.DataFrame({"date": pl.Series(list(range(len(series))), dtype=pl.Int64), "in_dd": in_dd})

            # rle_id assigns one id per consecutive run of equal in_dd values,
            # so each underwater stretch becomes its own group.
            frame = frame.with_columns(pl.col("in_dd").rle_id().alias("run_id"))
            dd_runs = (
                frame.filter(pl.col("in_dd"))
                .group_by("run_id")
                .agg([pl.col("date").min().alias("start"), pl.col("date").max().alias("end")])
            )

            if has_date:
                # +1 so a single-day dip counts as a one-day drawdown.
                dd_runs = dd_runs.with_columns(
                    ((pl.col("end") - pl.col("start")).dt.total_days() + 1).alias("duration")
                )
            else:
                dd_runs = dd_runs.with_columns((pl.col("end") - pl.col("start") + 1).alias("duration"))

            result[col] = int(_to_float(dd_runs["duration"].max()))
        return result
321 def monthly_win_rate(self) -> dict[str, float]:
322 """Fraction of calendar months with a positive compounded return per asset.
324 Requires a temporal (Date / Datetime) index. Returns ``nan`` per
325 asset when no temporal index is present.
327 Returns:
328 dict[str, float]: Monthly win rate in [0, 1] per asset.
329 """
330 all_df = cast(pl.DataFrame, self.all)
331 date_col_name = self.data.date_col[0] if self.data.date_col else None
332 if date_col_name is None or not all_df[date_col_name].dtype.is_temporal():
333 return {col: float("nan") for col, _ in self.data.items()}
335 result: dict[str, float] = {}
336 for col, _ in self.data.items():
337 df = (
338 all_df.select([date_col_name, col])
339 .drop_nulls()
340 .with_columns(
341 [
342 pl.col(date_col_name).dt.year().alias("_year"),
343 pl.col(date_col_name).dt.month().alias("_month"),
344 ]
345 )
346 )
347 monthly = (
348 df.group_by(["_year", "_month"])
349 .agg((pl.col(col) + 1.0).product().alias("gross"))
350 .with_columns((pl.col("gross") - 1.0).alias("monthly_return"))
351 )
352 n_total = len(monthly)
353 if n_total == 0:
354 result[col] = float("nan")
355 else:
356 n_positive = int((monthly["monthly_return"] > 0).sum())
357 result[col] = n_positive / n_total
358 return result
    def monthly_returns(self, eoy: bool = True, compounded: bool = True) -> dict[str, pl.DataFrame]:
        """Calculate monthly returns in a pivot-table format.

        Groups returns by calendar month and year, producing a DataFrame with
        years as rows and months (JAN-DEC) as columns, plus an optional EOY
        column with the full-year compounded return.

        Args:
            eoy (bool): Include an EOY column with the annual compounded return.
                Defaults to True.
            compounded (bool): Compound returns within each period. Defaults to True.

        Returns:
            dict[str, pl.DataFrame]: Per-asset pivot tables with columns
                ``year``, ``JAN`` … ``DEC``, and optionally ``EOY``.
        """
        all_df = cast(pl.DataFrame, self.all)
        date_col_name = self.data.date_col[0]
        # Fixed month labels so column order is deterministic regardless of data.
        month_names = {
            1: "JAN",
            2: "FEB",
            3: "MAR",
            4: "APR",
            5: "MAY",
            6: "JUN",
            7: "JUL",
            8: "AUG",
            9: "SEP",
            10: "OCT",
            11: "NOV",
            12: "DEC",
        }
        month_order = list(month_names.values())

        result: dict[str, pl.DataFrame] = {}
        for col, series in self.data.items():
            df = pl.DataFrame({"date": all_df[date_col_name], "ret": series}).drop_nulls()
            df = df.with_columns(
                [
                    pl.col("date").dt.year().alias("year"),
                    pl.col("date").dt.month().alias("month_num"),
                ]
            )

            # Compound (prod(1+r) - 1) or simple-sum returns within each month.
            agg_expr = ((1.0 + pl.col("ret")).product() - 1.0) if compounded else pl.col("ret").sum()
            monthly = (
                df.group_by(["year", "month_num"])
                .agg(agg_expr.alias("ret"))
                .with_columns(
                    pl.col("month_num")
                    .map_elements(month_names.__getitem__, return_dtype=pl.String)
                    .alias("month_name")
                )
                .sort(["year", "month_num"])
            )

            pivoted = monthly.pivot(on="month_name", index="year", values="ret", aggregate_function="first")
            # Ensure all 12 month columns exist even for partial-year data.
            for m in month_order:
                if m not in pivoted.columns:
                    pivoted = pivoted.with_columns(pl.lit(0.0).alias(m))
            # Missing months are reported as 0.0 (no return), not null.
            pivoted = (
                pivoted.select(["year", *month_order])
                .fill_null(0.0)
                .with_columns(pl.col("year").cast(pl.Int32))
                .sort("year")
            )

            if eoy:
                # Full-year return aggregated with the same compounding rule.
                eoy_agg = (
                    df.group_by("year")
                    .agg(agg_expr.alias("EOY"))
                    .with_columns(pl.col("year").cast(pl.Int32))
                    .sort("year")
                )
                pivoted = pivoted.join(eoy_agg, on="year").sort("year")

            result[col] = pivoted
        return result
440 def distribution(self, compounded: bool = True) -> dict[str, dict[str, dict[str, list[float]]]]:
441 """Analyse return distributions across daily, weekly, monthly, quarterly, and yearly periods.
443 For each period, splits values into inliers and outliers using the
444 IQR method (1.5 * IQR beyond Q1/Q3).
446 Args:
447 compounded (bool): Compound returns within each period. Defaults to True.
449 Returns:
450 dict: Nested dict ``{asset: {period: {"values": [...], "outliers": [...]}}}``
451 where period is one of ``"Daily"``, ``"Weekly"``, ``"Monthly"``,
452 ``"Quarterly"``, ``"Yearly"``.
454 """
455 all_df = cast(pl.DataFrame, self.all)
456 date_col_name = self.data.date_col[0]
458 def _agg(df: pl.DataFrame, group_col: str) -> pl.Series:
459 """Aggregate returns within each group using product or sum."""
460 expr = ((1.0 + pl.col("ret")).product() - 1.0) if compounded else pl.col("ret").sum()
461 return df.group_by(group_col).agg(expr.alias("ret"))["ret"]
463 def _iqr_split(s: pl.Series) -> dict[str, list[float]]:
464 """Split series into inliers and outliers using the IQR method."""
465 q1 = cast(float, s.quantile(0.25))
466 q3 = cast(float, s.quantile(0.75))
467 iqr = q3 - q1
468 mask = (s >= q1 - 1.5 * iqr) & (s <= q3 + 1.5 * iqr)
469 return {"values": s.filter(mask).to_list(), "outliers": s.filter(~mask).to_list()}
471 result: dict[str, dict[str, dict[str, list[float]]]] = {}
472 for col, series in self.data.items():
473 df = pl.DataFrame({"date": all_df[date_col_name], "ret": series}).drop_nulls()
474 df = df.with_columns(
475 [
476 pl.col("date").dt.truncate("1w").alias("week"),
477 pl.col("date").dt.truncate("1mo").alias("month"),
478 pl.col("date").dt.truncate("3mo").alias("quarter"),
479 pl.col("date").dt.truncate("1y").alias("year"),
480 ]
481 )
482 result[col] = {
483 "Daily": _iqr_split(df["ret"]),
484 "Weekly": _iqr_split(_agg(df, "week")),
485 "Monthly": _iqr_split(_agg(df, "month")),
486 "Quarterly": _iqr_split(_agg(df, "quarter")),
487 "Yearly": _iqr_split(_agg(df, "year")),
488 }
489 return result
    def compare(
        self,
        aggregate: str | None = None,
        compounded: bool = True,
        round_vals: int | None = None,
    ) -> dict[str, pl.DataFrame]:
        """Compare each asset's returns against the benchmark.

        Aligns returns and benchmark by date, multiplies by 100 (percentage),
        then computes a ``Multiplier`` (Returns / Benchmark) and ``Won``
        indicator (``"+"`` when the asset outperformed, ``"-"`` otherwise).

        Args:
            aggregate (str | None): Pandas-style resample frequency for
                period aggregation (e.g. ``"ME"``, ``"QE"``, ``"YE"``).
                ``None`` returns daily rows. Defaults to None.
            compounded (bool): Compound returns when aggregating. Defaults to True.
            round_vals (int | None): Decimal places to round. Defaults to None.

        Returns:
            dict[str, pl.DataFrame]: Per-asset DataFrames with columns
                ``Benchmark``, ``Returns``, ``Multiplier``, ``Won``.

        Raises:
            AttributeError: If no benchmark data is attached.
        """
        if self.data.benchmark is None:
            raise AttributeError("No benchmark data available")  # noqa: TRY003

        all_df = cast(pl.DataFrame, self.all)
        date_col_name = self.data.date_col[0]
        bench_col = self.data.benchmark.columns[0]

        # Pandas-style frequency aliases → polars truncate intervals.
        # Unrecognised aliases silently fall through to the daily path.
        _freq_map = {"ME": "1mo", "QE": "3mo", "YE": "1y", "W": "1w"}

        def _agg_series(df: pl.DataFrame, period_col: str, val_col: str) -> pl.DataFrame:
            """Aggregate a value column grouped by period using product or sum."""
            expr = ((1.0 + pl.col(val_col)).product() - 1.0) if compounded else pl.col(val_col).sum()
            return df.group_by(period_col).agg(expr.alias(val_col)).sort(period_col)

        result: dict[str, pl.DataFrame] = {}
        for col in self.data.returns.columns:
            df = all_df.select(
                [
                    pl.col(date_col_name),
                    pl.col(col).alias("ret"),
                    pl.col(bench_col).alias("bench"),
                ]
            )

            if aggregate is not None and aggregate in _freq_map:
                trunc = _freq_map[aggregate]
                df = df.with_columns(pl.col(date_col_name).dt.truncate(trunc).alias("period"))
                # Aggregate each side independently so a null in one does not
                # drop the other's row, then full-join them back by period.
                ret_agg = _agg_series(df.drop_nulls(subset=["ret"]), "period", "ret")
                bench_agg = _agg_series(df.drop_nulls(subset=["bench"]), "period", "bench")
                df = ret_agg.join(bench_agg, on="period", how="full", coalesce=True).sort("period")
                # NOTE(review): _date_alias is assigned but never used below.
                ret_col, bench_col_name, _date_alias = "ret", "bench", "period"
            else:
                ret_col, bench_col_name, _date_alias = "ret", "bench", date_col_name

            # Express both sides in percent, per quantstats convention.
            ret_pct = (df[ret_col] * 100).alias("Returns")
            bench_pct = (df[bench_col_name] * 100).alias("Benchmark")
            out = pl.DataFrame(
                {
                    "Benchmark": bench_pct,
                    "Returns": ret_pct,
                }
            )
            out = out.with_columns(
                [
                    # replace(0.0, None) avoids division by zero: a flat
                    # benchmark period yields a null Multiplier instead.
                    (pl.col("Returns") / pl.col("Benchmark").replace(0.0, None)).alias("Multiplier"),
                    pl.when(pl.col("Returns") >= pl.col("Benchmark"))
                    .then(pl.lit("+"))
                    .otherwise(pl.lit("-"))
                    .alias("Won"),
                ]
            )

            if round_vals is not None:
                out = out.with_columns(
                    [
                        pl.col("Benchmark").round(round_vals),
                        pl.col("Returns").round(round_vals),
                        pl.col("Multiplier").round(round_vals),
                    ]
                )

            result[col] = out
        return result
582 def worst_n_periods(self, n: int = 5) -> dict[str, list[float | None]]:
583 """Return the N worst return periods per asset.
585 If a series has fewer than ``n`` non-null observations the list is
586 padded with ``None`` on the right.
588 Args:
589 n: Number of worst periods to return. Defaults to 5.
591 Returns:
592 dict[str, list[float | None]]: Sorted worst returns per asset.
593 """
594 result: dict[str, list[float | None]] = {}
595 for col, series in self.data.items():
596 nonnull = series.drop_nulls()
597 worst: list[float | None] = nonnull.sort(descending=False).head(n).to_list()
598 while len(worst) < n:
599 worst.append(None)
600 result[col] = worst
601 return result
603 # ── Capture ratios ────────────────────────────────────────────────────────
605 def up_capture(self, benchmark: pl.Series) -> dict[str, float]:
606 """Up-market capture ratio relative to an explicit benchmark series.
608 Measures the fraction of the benchmark's upside that the strategy
609 captures. A value greater than 1.0 means the strategy outperformed
610 the benchmark in rising markets.
612 Args:
613 benchmark: Benchmark return series aligned row-by-row with the data.
615 Returns:
616 dict[str, float]: Up capture ratio per asset.
617 """
618 up_mask = benchmark > 0
619 bench_up = benchmark.filter(up_mask).drop_nulls()
620 if bench_up.is_empty():
621 return {col: float("nan") for col, _ in self.data.items()}
622 bench_geom = float((bench_up + 1.0).product()) ** (1.0 / len(bench_up)) - 1.0
623 if bench_geom == 0.0: # pragma: no cover
624 return {col: float("nan") for col, _ in self.data.items()}
625 result: dict[str, float] = {}
626 for col, series in self.data.items():
627 strat_up = series.filter(up_mask).drop_nulls()
628 if strat_up.is_empty():
629 result[col] = float("nan")
630 else:
631 strat_geom = float((strat_up + 1.0).product()) ** (1.0 / len(strat_up)) - 1.0
632 result[col] = strat_geom / bench_geom
633 return result
635 def down_capture(self, benchmark: pl.Series) -> dict[str, float]:
636 """Down-market capture ratio relative to an explicit benchmark series.
638 A value less than 1.0 means the strategy lost less than the benchmark
639 in falling markets (a desirable property).
641 Args:
642 benchmark: Benchmark return series aligned row-by-row with the data.
644 Returns:
645 dict[str, float]: Down capture ratio per asset.
646 """
647 down_mask = benchmark < 0
648 bench_down = benchmark.filter(down_mask).drop_nulls()
649 if bench_down.is_empty():
650 return {col: float("nan") for col, _ in self.data.items()}
651 bench_geom = float((bench_down + 1.0).product()) ** (1.0 / len(bench_down)) - 1.0
652 if bench_geom == 0.0: # pragma: no cover
653 return {col: float("nan") for col, _ in self.data.items()}
654 result: dict[str, float] = {}
655 for col, series in self.data.items():
656 strat_down = series.filter(down_mask).drop_nulls()
657 if strat_down.is_empty():
658 result[col] = float("nan")
659 else:
660 strat_geom = float((strat_down + 1.0).product()) ** (1.0 / len(strat_down)) - 1.0
661 result[col] = strat_geom / bench_geom
662 return result
    # ── Summary & breakdown ────────────────────────────────────────────────────

    def annual_breakdown(self) -> pl.DataFrame:
        """Summary statistics broken down by calendar year.

        Groups the data by calendar year using the date index, computes a
        full :py:meth:`summary` for each year, and stacks the results with an
        additional ``year`` column.

        Returns:
            pl.DataFrame: Columns ``year``, ``metric``, one per asset, sorted
                by ``year``.

        Raises:
            ValueError: If the data has no date index.
        """
        all_df = cast(pl.DataFrame, self.all)
        date_col_name = self.data.date_col[0] if self.data.date_col else None
        has_temporal = date_col_name is not None and all_df[date_col_name].dtype.is_temporal()

        # Local import avoids a circular import at module load time.
        from ..data import Data

        if not has_temporal:
            # Integer-index fallback: group by chunks of ~_periods_per_year rows
            chunk = round(self.data._periods_per_year)
            total = all_df.height
            frames_int: list[pl.DataFrame] = []
            for i, start in enumerate(range(0, total, chunk), start=1):
                chunk_all = all_df.slice(start, chunk)
                # Skip trailing stubs too short to yield meaningful stats.
                if chunk_all.height < max(5, chunk // 4):
                    continue
                chunk_index = chunk_all.select(self.data.date_col)
                chunk_returns = chunk_all.select(self.data.returns.columns)
                chunk_benchmark = (
                    chunk_all.select(self.data.benchmark.columns) if self.data.benchmark is not None else None
                )
                # Rebuild a Data object per chunk, then run the full summary on it.
                chunk_data = Data(returns=chunk_returns, index=chunk_index, benchmark=chunk_benchmark)
                chunk_summary = cast(Any, type(self))(chunk_data).summary()
                chunk_summary = chunk_summary.with_columns(pl.lit(i).alias("year"))
                frames_int.append(chunk_summary)
            if not frames_int:
                return pl.DataFrame()
            result_int = pl.concat(frames_int)
            ordered_int = ["year", "metric", *[c for c in result_int.columns if c not in ("year", "metric")]]
            return result_int.select(ordered_int)

        if date_col_name is None:  # unreachable: has_temporal guarantees non-None # pragma: no cover
            return pl.DataFrame()  # pragma: no cover
        years = all_df[date_col_name].dt.year().unique().sort().to_list()

        frames: list[pl.DataFrame] = []
        for year in years:
            year_all = all_df.filter(pl.col(date_col_name).dt.year() == year)
            # A single row cannot produce variance-based statistics.
            if year_all.height < 2:
                continue
            year_index = year_all.select([date_col_name])
            year_returns = year_all.select(self.data.returns.columns)
            year_benchmark = year_all.select(self.data.benchmark.columns) if self.data.benchmark is not None else None
            year_data = Data(returns=year_returns, index=year_index, benchmark=year_benchmark)
            year_summary = cast(Any, type(self))(year_data).summary()
            year_summary = year_summary.with_columns(pl.lit(year).alias("year"))
            frames.append(year_summary)

        if not frames:
            # Preserve the expected schema even when every year was skipped.
            asset_cols = list(self.data.returns.columns)
            schema: dict[str, type[pl.DataType]] = {
                "year": pl.Int32,
                "metric": pl.String,
                **dict.fromkeys(asset_cols, pl.Float64),
            }
            return pl.DataFrame(schema=schema)

        result = pl.concat(frames)
        ordered = ["year", "metric", *[c for c in result.columns if c not in ("year", "metric")]]
        return result.select(ordered)
740 def summary(self) -> pl.DataFrame:
741 """Summary statistics for each asset as a tidy DataFrame.
743 Each row is one metric; each column beyond ``metric`` is one asset.
745 Returns:
746 pl.DataFrame: A DataFrame with a ``metric`` column followed by one
747 column per asset.
748 """
749 assets = [col for col, _ in self.data.items()]
751 def _safe(fn: Any) -> dict[str, Any]:
752 """Call *fn()* and return its result; return NaN for each asset on any exception."""
753 try:
754 return fn()
755 except Exception:
756 return dict.fromkeys(assets, float("nan"))
758 metrics: dict[str, dict[str, Any]] = {
759 "avg_return": _safe(self.avg_return),
760 "avg_win": _safe(self.avg_win),
761 "avg_loss": _safe(self.avg_loss),
762 "win_rate": _safe(self.win_rate),
763 "profit_factor": _safe(self.profit_factor),
764 "payoff_ratio": _safe(self.payoff_ratio),
765 "monthly_win_rate": _safe(self.monthly_win_rate),
766 "best": _safe(self.best),
767 "worst": _safe(self.worst),
768 "volatility": _safe(self.volatility),
769 "sharpe": _safe(self.sharpe),
770 "skew": _safe(self.skew),
771 "kurtosis": _safe(self.kurtosis),
772 "value_at_risk": _safe(self.value_at_risk),
773 "conditional_value_at_risk": _safe(self.conditional_value_at_risk),
774 "max_drawdown": _safe(self.max_drawdown),
775 "avg_drawdown": _safe(self.avg_drawdown),
776 "max_drawdown_duration": _safe(self.max_drawdown_duration),
777 "calmar": _safe(self.calmar),
778 "recovery_factor": _safe(self.recovery_factor),
779 }
781 rows: list[dict[str, Any]] = [
782 {"metric": name, **{asset: values.get(asset) for asset in assets}} for name, values in metrics.items()
783 ]
784 return pl.DataFrame(rows)