Coverage for src/jquantstats/_stats/_reporting.py: 100%

194 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-23 06:13 +0000

1"""Temporal reporting, capture ratios, and summary statistics.""" 

2 

3from __future__ import annotations 

4 

5from typing import TYPE_CHECKING, Any, cast 

6 

7import polars as pl 

8 

9from ._core import _drawdown_series, _to_float, columnwise_stat 

10from ._internals import _comp_return 

11 

12if TYPE_CHECKING: 

13 from ..data import Data 

14 

15# ── Reporting statistics mixin ─────────────────────────────────────────────── 

16 

17 

class _ReportingStatsMixin:
    """Mixin providing temporal, capture, and summary reporting metrics.

    Covers: periods per year, average drawdown, CAGR, expected return,
    risk-adjusted return, Calmar ratio, recovery factor, max drawdown
    duration, monthly win rate, up/down capture ratios, annual breakdown,
    and summary statistics table.

    Cross-mixin dependencies:
        - _BasicStatsMixin: avg_return, avg_win, avg_loss, win_rate, profit_factor,
          payoff_ratio, best, worst, volatility, skew, kurtosis, value_at_risk,
          conditional_value_at_risk, exposure
        - _RiskStatsMixin: sharpe
        - _DrawdownMixin: max_drawdown
    """

    _data: Data
    all: pl.DataFrame

    # The stubs below exist only for static type checkers: they declare the
    # methods this mixin calls on ``self`` but which other mixins implement.
    if TYPE_CHECKING:
        from .._protocol import DataLike

        data: DataLike

        def avg_return(self) -> dict[str, float]:
            """Defined on _BasicStatsMixin."""

        def avg_win(self) -> dict[str, float]:
            """Defined on _BasicStatsMixin."""

        def avg_loss(self) -> dict[str, float]:
            """Defined on _BasicStatsMixin."""

        def win_rate(self) -> dict[str, float]:
            """Defined on _BasicStatsMixin."""

        def profit_factor(self) -> dict[str, float]:
            """Defined on _BasicStatsMixin."""

        def payoff_ratio(self) -> dict[str, float]:
            """Defined on _BasicStatsMixin."""

        def best(self) -> dict[str, float | None]:
            """Defined on _BasicStatsMixin."""

        def worst(self) -> dict[str, float | None]:
            """Defined on _BasicStatsMixin."""

        def volatility(self) -> dict[str, float]:
            """Defined on _BasicStatsMixin."""

        def sharpe(self) -> dict[str, float]:
            """Defined on _RiskStatsMixin."""

        def skew(self) -> dict[str, int | float | None]:
            """Defined on _BasicStatsMixin."""

        def kurtosis(self) -> dict[str, int | float | None]:
            """Defined on _BasicStatsMixin."""

        def value_at_risk(self) -> dict[str, float]:
            """Defined on _BasicStatsMixin."""

        def conditional_value_at_risk(self) -> dict[str, float]:
            """Defined on _BasicStatsMixin."""

        def max_drawdown(self) -> dict[str, float]:
            """Defined on _DrawdownMixin."""

        def exposure(self) -> dict[str, float]:
            """Defined on _BasicStatsMixin."""

    # ── Temporal & reporting ──────────────────────────────────────────────────

    @property
    def periods_per_year(self) -> float:
        """Estimate the number of periods per year from the data index spacing.

        Returns:
            float: Estimated number of observations per calendar year, as
            stored on the underlying data object.
        """
        return self._data._periods_per_year

    @columnwise_stat
    def avg_drawdown(self, series: pl.Series) -> float:
        """Average drawdown across all underwater periods.

        Returns 0.0 when there are no underwater periods.

        Matches the QuantStats sign convention: drawdown is expressed as a
        negative fraction (e.g. ``-0.2`` for 20% below peak).

        Args:
            series (pl.Series): Series of additive daily returns.

        Returns:
            float: Mean drawdown in [-1, 0].
        """
        dd = _drawdown_series(series)
        # Only strictly positive entries count as "underwater"; the mean of
        # those magnitudes is negated below to produce the reported sign.
        in_dd = dd.filter(dd > 0)
        # A series that never falls below its high-water mark has an average drawdown of exactly 0.0.
        if in_dd.is_empty():
            return 0.0
        return -_to_float(in_dd.mean())

    @columnwise_stat
    def cagr(
        self,
        series: pl.Series,
        rf: float = 0.0,
        compounded: bool = True,
        periods: int | float | None = None,
    ) -> float:
        """Calculate the Compound Annual Growth Rate (CAGR) of excess returns.

        CAGR represents the geometric mean annual growth rate, providing a
        smoothed annualized return that accounts for compounding effects.

        Args:
            series (pl.Series): Series of additive daily returns.
            rf (float): Annualized risk-free rate; ``rf / periods`` is
                subtracted from every observation. Defaults to 0.0.
            compounded (bool): Compound the excess returns when ``True``,
                otherwise sum them. Defaults to True.
            periods: Periods per year for annualisation. ``None`` (or ``0``)
                falls back to ``periods_per_year``.

        Returns:
            float: CAGR of excess returns.

        Returns NaN when:
            The series is empty.
        """
        raw_periods = periods or self._data._periods_per_year
        n = len(series)
        if n == 0:
            return float("nan")  # pragma: no cover
        excess = series.cast(pl.Float64) - rf / raw_periods
        total = _comp_return(excess) if compounded else _to_float(excess.sum())
        years = n / raw_periods
        # abs() keeps the fractional power real-valued when the total return is below -100%.
        return float(abs(1.0 + total) ** (1.0 / years) - 1.0)

    def expected_return(
        self,
        aggregate: str | None = None,
        compounded: bool = True,
    ) -> dict[str, float]:
        """Expected return with optional period aggregation.

        Returns the geometric mean of per-period returns,
        ``prod(1 + r) ** (1 / n) - 1``. When *aggregate* is provided the
        returns are first compounded (or summed) within each calendar period,
        and the geometric mean is taken over those period returns.

        Args:
            aggregate (str | None): Period to aggregate to before computing the
                mean. Accepted values (case-insensitive): ``'weekly'``,
                ``'monthly'``, ``'quarterly'``, ``'annual'`` / ``'yearly'``.
                Defaults to ``None`` (raw per-period geometric mean).
            compounded (bool): Compound returns within each period when
                *aggregate* is set; sum them when ``False``. Defaults to
                ``True``.

        Returns:
            dict[str, float]: Geometric-mean return per asset for the
            specified period.

        Raises:
            ValueError: If *aggregate* is an unrecognised string.

        Note:
            Requires a temporal (Date / Datetime) index when *aggregate* is not
            ``None``; falls back to the raw per-period geometric mean otherwise.

        Returns NaN when:
            Entries are ``float("nan")`` when an asset has no non-null
            observations, or when the product of ``1 + r`` is negative (the
            real-valued geometric mean is undefined).
        """
        _freq_map: dict[str, str] = {
            "weekly": "1w",
            "monthly": "1mo",
            "quarterly": "3mo",
            "annual": "1y",
            "yearly": "1y",
        }

        def _geomean(s: pl.Series) -> float:
            """Per-period geometric mean: (product(1 + r))^(1/n) - 1."""
            n = s.count()
            if n == 0:
                return float("nan")
            gross = _to_float((1.0 + s.cast(pl.Float64)).product())
            # A negative base raised to a fractional power is complex in Python,
            # and float(complex) raises TypeError; report NaN instead.
            if gross < 0.0:
                return float("nan")
            return float(gross ** (1.0 / n) - 1.0)

        def _raw_expected_returns() -> dict[str, float]:
            """Return the geometric mean of each raw return series."""
            return {col: _geomean(series.drop_nulls()) for col, series in self._data.items()}

        if aggregate is None:
            return _raw_expected_returns()

        if aggregate.lower() not in _freq_map:
            raise ValueError(f"aggregate must be one of {list(_freq_map)}, got {aggregate!r}")  # noqa: TRY003

        all_df = self.all
        date_col_name = self._data.date_col[0] if self._data.date_col else None
        # Calendar bucketing needs a temporal index; without one use the raw series.
        if date_col_name is None or not all_df[date_col_name].dtype.is_temporal():
            return _raw_expected_returns()

        trunc = _freq_map[aggregate.lower()]
        agg_expr = ((1.0 + pl.col("ret")).product() - 1.0) if compounded else pl.col("ret").sum()

        result: dict[str, float] = {}
        for col, series in self._data.items():
            df = (
                pl.DataFrame({"date": all_df[date_col_name], "ret": series})
                .drop_nulls()
                .with_columns(pl.col("date").dt.truncate(trunc).alias("period"))
            )
            period_rets = df.group_by("period").agg(agg_expr.alias("ret"))["ret"]
            result[col] = _geomean(period_rets)
        return result

    def rar(self, periods: int | float = 252) -> dict[str, float]:
        """Risk-Adjusted Return: CAGR divided by exposure.

        Measures annualised return per unit of market participation time,
        matching the quantstats convention.

        Args:
            periods: Periods per year for CAGR annualisation. Defaults to a
                fixed ``252`` (not ``periods_per_year``, unlike ``cagr`` and
                ``calmar``).

        Returns:
            dict[str, float]: RAR per asset.

        Returns NaN when:
            An asset's exposure is zero, so the ratio is undefined.
        """
        cagr = self.cagr(periods=periods)
        exp = self.exposure()
        # Guard per asset so one zero-exposure column cannot abort the whole
        # call with ZeroDivisionError.
        return {col: (cagr[col] / exp[col] if exp[col] != 0 else float("nan")) for col in cagr}

    @columnwise_stat
    def calmar(self, series: pl.Series, periods: int | float | None = None) -> float:
        """Calmar ratio (CAGR divided by maximum drawdown).

        Returns ``nan`` when the maximum drawdown is zero.

        Args:
            series (pl.Series): Series of additive daily returns.
            periods: Annualisation factor. ``None`` (or ``0``) falls back to
                ``periods_per_year``.

        Returns:
            float: Calmar ratio, or ``nan`` if max drawdown is zero.
        """
        raw_periods = float(periods or self._data._periods_per_year)
        max_dd = _to_float(_drawdown_series(series).max())
        if max_dd <= 0:
            return float("nan")
        n = len(series)
        comp_return = _comp_return(series)
        # Annualise the compounded total return: exponent is 1 / years = periods / n.
        cagr = float((1.0 + comp_return) ** (raw_periods / n)) - 1.0
        return cagr / max_dd

    @columnwise_stat
    def recovery_factor(self, series: pl.Series) -> float:
        """Recovery factor (total return divided by maximum drawdown).

        Matches the quantstats convention: total return is the simple sum of
        returns, not compounded, and its absolute value is used. Returns
        ``nan`` when the maximum drawdown is zero.

        Args:
            series (pl.Series): Series of additive daily returns.

        Returns:
            float: Recovery factor, or ``nan`` if max drawdown is zero.
        """
        max_dd = _to_float(_drawdown_series(series).max())
        if max_dd <= 0:
            return float("nan")
        total_return = _to_float(series.sum())
        return abs(total_return) / max_dd

    def max_drawdown_duration(self) -> dict[str, float | int | None]:
        """Maximum drawdown duration in calendar days (or periods) per asset.

        When the index is a temporal column (``Date`` / ``Datetime``) the
        duration is expressed as calendar days spanned by the longest
        underwater run. For integer-indexed data each row counts as one
        period. Both endpoints of a run are counted (hence the ``+ 1``).

        The equity curve used here is additive: ``1 + cumulative sum`` of
        the returns.

        Returns:
            dict[str, float | int | None]: Asset → max drawdown duration.
            Returns 0 when there are no underwater periods.
        """
        all_df = self.all
        date_col_name = self._data.date_col[0] if self._data.date_col else None
        has_date = date_col_name is not None and all_df[date_col_name].dtype.is_temporal()
        result: dict[str, float | int | None] = {}
        for col, series in self._data.items():
            nav = 1.0 + series.cast(pl.Float64).cum_sum()
            hwm = nav.cum_max()
            in_dd = nav < hwm

            if not in_dd.any():
                result[col] = 0
                continue

            # The second condition is redundant at runtime; it narrows the type for checkers.
            if has_date and date_col_name is not None:
                frame = pl.DataFrame({"date": all_df[date_col_name], "in_dd": in_dd})
            else:
                frame = pl.DataFrame({"date": pl.Series(list(range(len(series))), dtype=pl.Int64), "in_dd": in_dd})

            # rle_id gives every consecutive run of equal values its own id, so
            # each contiguous underwater stretch becomes one group.
            frame = frame.with_columns(pl.col("in_dd").rle_id().alias("run_id"))
            dd_runs = (
                frame.filter(pl.col("in_dd"))
                .group_by("run_id")
                .agg([pl.col("date").min().alias("start"), pl.col("date").max().alias("end")])
            )

            if has_date:
                dd_runs = dd_runs.with_columns(
                    ((pl.col("end") - pl.col("start")).dt.total_days() + 1).alias("duration")
                )
            else:
                dd_runs = dd_runs.with_columns((pl.col("end") - pl.col("start") + 1).alias("duration"))

            result[col] = int(_to_float(dd_runs["duration"].max()))
        return result

    def monthly_win_rate(self) -> dict[str, float]:
        """Fraction of calendar months with a positive compounded return per asset.

        Requires a temporal (Date / Datetime) index. Returns ``nan`` per
        asset when no temporal index is present.

        Returns:
            dict[str, float]: Monthly win rate in [0, 1] per asset.

        Returns NaN when:
            Entries are ``float("nan")`` when no temporal index is present or an
            asset has no non-null observations.
        """
        all_df = self.all
        date_col_name = self._data.date_col[0] if self._data.date_col else None
        if date_col_name is None or not all_df[date_col_name].dtype.is_temporal():
            return {col: float("nan") for col, _ in self._data.items()}

        result: dict[str, float] = {}
        for col, _ in self._data.items():
            df = (
                all_df.select([date_col_name, col])
                .drop_nulls()
                .with_columns(
                    [
                        pl.col(date_col_name).dt.year().alias("_year"),
                        pl.col(date_col_name).dt.month().alias("_month"),
                    ]
                )
            )
            # Compound within each (year, month) bucket: prod(1 + r) - 1.
            monthly = (
                df.group_by(["_year", "_month"])
                .agg((pl.col(col) + 1.0).product().alias("gross"))
                .with_columns((pl.col("gross") - 1.0).alias("monthly_return"))
            )
            n_total = len(monthly)
            if n_total == 0:
                result[col] = float("nan")
            else:
                n_positive = int((monthly["monthly_return"] > 0).sum())
                result[col] = n_positive / n_total
        return result

    # ── Capture ratios ────────────────────────────────────────────────────────

    def _capture_ratio(self, benchmark: pl.Series, mask: pl.Series) -> dict[str, float]:
        """Shared implementation behind ``up_capture`` and ``down_capture``.

        Computes, for the rows selected by *mask*, the per-period geometric
        mean return of each asset divided by that of the benchmark.

        Args:
            benchmark: Benchmark return series aligned row-by-row with the data.
            mask: Boolean series selecting the benchmark periods of interest.

        Returns:
            dict[str, float]: Capture ratio per asset; ``float("nan")`` where
            the ratio is undefined (no selected benchmark periods, a zero
            benchmark geometric mean, or no usable asset returns).
        """
        bench_sel = benchmark.filter(mask).drop_nulls()
        # With no selected benchmark periods the ratio is undefined for every asset.
        if bench_sel.is_empty():
            return {col: float("nan") for col, _ in self._data.items()}
        bench_geom = float((bench_sel + 1.0).product()) ** (1.0 / len(bench_sel)) - 1.0
        if bench_geom == 0.0:  # pragma: no cover
            return {col: float("nan") for col, _ in self._data.items()}
        result: dict[str, float] = {}
        for col, series in self._data.items():
            strat_sel = series.filter(mask).drop_nulls()
            # An asset may have no usable returns in the selected periods after null filtering.
            if strat_sel.is_empty():
                result[col] = float("nan")
            else:
                strat_geom = float((strat_sel + 1.0).product()) ** (1.0 / len(strat_sel)) - 1.0
                result[col] = strat_geom / bench_geom
        return result

    def up_capture(self, benchmark: pl.Series) -> dict[str, float]:
        """Up-market capture ratio relative to an explicit benchmark series.

        Measures the fraction of the benchmark's upside that the strategy
        captures. A value greater than 1.0 means the strategy outperformed
        the benchmark in rising markets.

        Args:
            benchmark: Benchmark return series aligned row-by-row with the data.

        Returns:
            dict[str, float]: Up capture ratio per asset.

        Returns NaN when:
            Entries are ``float("nan")`` when the benchmark has no positive
            periods, its up-market geometric mean is zero, or an asset has no
            usable returns during those periods.
        """
        return self._capture_ratio(benchmark, benchmark > 0)

    def down_capture(self, benchmark: pl.Series) -> dict[str, float]:
        """Down-market capture ratio relative to an explicit benchmark series.

        A value less than 1.0 means the strategy lost less than the benchmark
        in falling markets (a desirable property).

        Args:
            benchmark: Benchmark return series aligned row-by-row with the data.

        Returns:
            dict[str, float]: Down capture ratio per asset.

        Returns NaN when:
            Entries are ``float("nan")`` when the benchmark has no negative
            periods, its down-market geometric mean is zero, or an asset has no
            usable returns during those periods.
        """
        return self._capture_ratio(benchmark, benchmark < 0)

    # ── Summary & breakdown ────────────────────────────────────────────────────

    def annual_breakdown(self) -> pl.DataFrame:
        """Summary statistics broken down by calendar year.

        With a temporal (Date / Datetime) index the data is grouped by
        calendar year; years with fewer than 2 rows are skipped. Without one,
        the rows are split into consecutive chunks of
        ``round(periods_per_year)`` rows, numbered from 1, and chunks shorter
        than ``max(5, chunk // 4)`` rows are skipped. A full `summary` is
        computed for each group and the results are stacked with an
        additional ``year`` column.

        Returns:
            pl.DataFrame: Columns ``year``, ``metric``, then one per asset,
            in group order. When every group is skipped, the temporal path
            returns an empty frame with that schema, while the integer-index
            path returns a column-less empty ``pl.DataFrame()``.
        """
        all_df = self.all
        date_col_name = self._data.date_col[0] if self._data.date_col else None
        has_temporal = date_col_name is not None and all_df[date_col_name].dtype.is_temporal()

        # Runtime import kept local: at module level ``Data`` is only imported for type checking.
        from ..data import Data

        def _tagged_summary(subset: pl.DataFrame, index_cols: Any, label: int) -> pl.DataFrame:
            """Summarise *subset* as its own dataset and tag the rows with *label* as ``year``."""
            sub_index = subset.select(index_cols)
            sub_returns = subset.select(self._data.returns.columns)
            sub_benchmark = (
                subset.select(self._data.benchmark.columns) if self._data.benchmark is not None else None
            )
            sub_data = Data(returns=sub_returns, index=sub_index, benchmark=sub_benchmark)
            # Re-instantiate the concrete class so every mixin's metrics are available on the slice.
            sub_summary = cast(Any, type(self))(sub_data).summary()
            return sub_summary.with_columns(pl.lit(label).alias("year"))

        def _stack(parts: list[pl.DataFrame]) -> pl.DataFrame:
            """Concatenate per-group summaries and move ``year``/``metric`` to the front."""
            stacked = pl.concat(parts)
            ordered = ["year", "metric", *[c for c in stacked.columns if c not in ("year", "metric")]]
            return stacked.select(ordered)

        if not has_temporal:
            # Integer-index fallback: group by chunks of ~_periods_per_year rows
            chunk = round(self._data._periods_per_year)
            total = all_df.height
            frames_int: list[pl.DataFrame] = []
            for i, start in enumerate(range(0, total, chunk), start=1):
                chunk_all = all_df.slice(start, chunk)
                if chunk_all.height < max(5, chunk // 4):
                    continue
                frames_int.append(_tagged_summary(chunk_all, self._data.date_col, i))
            if not frames_int:
                return pl.DataFrame()
            return _stack(frames_int)

        if date_col_name is None:  # unreachable: has_temporal guarantees non-None  # pragma: no cover
            return pl.DataFrame()  # pragma: no cover
        years = all_df[date_col_name].dt.year().unique().sort().to_list()

        frames: list[pl.DataFrame] = []
        for year in years:
            year_all = all_df.filter(pl.col(date_col_name).dt.year() == year)
            if year_all.height < 2:
                continue
            frames.append(_tagged_summary(year_all, [date_col_name], year))

        if not frames:
            asset_cols = list(self._data.returns.columns)
            schema: dict[str, type[pl.DataType]] = {
                "year": pl.Int32,
                "metric": pl.String,
                **dict.fromkeys(asset_cols, pl.Float64),
            }
            return pl.DataFrame(schema=schema)

        return _stack(frames)

    def summary(self) -> pl.DataFrame:
        """Summary statistics for each asset as a tidy DataFrame.

        Each row is one metric; each column beyond ``metric`` is one asset.

        Returns:
            pl.DataFrame: A DataFrame with a ``metric`` column followed by one
            column per asset.

        Returns NaN when:
            Cells are ``float("nan")`` when computing the underlying metric
            raises any exception (e.g. no temporal index or no benchmark).
        """
        assets = [col for col, _ in self._data.items()]

        def _safe(fn: Any) -> dict[str, Any]:
            """Call *fn()* and return its result; return NaN for each asset on any exception."""
            # Deliberate best-effort: one failing metric must not prevent the
            # rest of the table from being produced.
            try:
                result: dict[str, Any] = fn()
            except Exception:
                return dict.fromkeys(assets, float("nan"))
            return result

        metrics: dict[str, dict[str, Any]] = {
            "avg_return": _safe(self.avg_return),
            "avg_win": _safe(self.avg_win),
            "avg_loss": _safe(self.avg_loss),
            "win_rate": _safe(self.win_rate),
            "profit_factor": _safe(self.profit_factor),
            "payoff_ratio": _safe(self.payoff_ratio),
            "monthly_win_rate": _safe(self.monthly_win_rate),
            "best": _safe(self.best),
            "worst": _safe(self.worst),
            "volatility": _safe(self.volatility),
            "sharpe": _safe(self.sharpe),
            "skew": _safe(self.skew),
            "kurtosis": _safe(self.kurtosis),
            "value_at_risk": _safe(self.value_at_risk),
            "conditional_value_at_risk": _safe(self.conditional_value_at_risk),
            "max_drawdown": _safe(self.max_drawdown),
            "avg_drawdown": _safe(self.avg_drawdown),
            "max_drawdown_duration": _safe(self.max_drawdown_duration),
            "calmar": _safe(self.calmar),
            "recovery_factor": _safe(self.recovery_factor),
        }

        # A metric dict missing an asset yields None for that cell via ``.get``.
        rows: list[dict[str, Any]] = [
            {"metric": name, **{asset: values.get(asset) for asset in assets}} for name, values in metrics.items()
        ]
        return pl.DataFrame(rows)