Coverage for src / jquantstats / _stats / _reporting.py: 100%

266 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-07 14:28 +0000

1"""Temporal reporting, capture ratios, and summary statistics.""" 

2 

3from __future__ import annotations 

4 

5from typing import TYPE_CHECKING, Any, cast 

6 

7import polars as pl 

8 

9from ._core import _drawdown_series, _to_float, columnwise_stat 

10from ._internals import _comp_return 

11 

12# ── Reporting statistics mixin ─────────────────────────────────────────────── 

13 

14 

class _ReportingStatsMixin:
    """Mixin providing temporal, capture, and summary reporting metrics.

    Covers: periods per year, average drawdown, Calmar ratio, recovery factor,
    max drawdown duration, monthly win rate, worst-N periods, up/down capture
    ratios, annual breakdown, and summary statistics table.

    Attributes (provided by the concrete subclass):
        data: The :class:`~jquantstats._data.Data` object.
        all: Combined DataFrame for efficient column selection.
    """

    if TYPE_CHECKING:
        from ._protocol import DataLike

        # Declarations for static type checkers only; the concrete subclass
        # supplies the real attributes at runtime.
        data: DataLike
        all: pl.DataFrame | None

    # Forward declarations: stubs for methods implemented on sibling mixins,
    # declared here so type checkers accept the cross-mixin calls made in
    # summary(). NOTE(review): the stub bodies return None at runtime; this
    # assumes the concrete class's MRO resolves each name to the implementing
    # mixin (i.e. the implementing mixins precede this one) — confirm.
    def avg_return(self) -> dict[str, float]:
        """Defined on _BasicStatsMixin."""

    def avg_win(self) -> dict[str, float]:
        """Defined on _BasicStatsMixin."""

    def avg_loss(self) -> dict[str, float]:
        """Defined on _BasicStatsMixin."""

    def win_rate(self) -> dict[str, float]:
        """Defined on _BasicStatsMixin."""

    def profit_factor(self) -> dict[str, float]:
        """Defined on _BasicStatsMixin."""

    def payoff_ratio(self) -> dict[str, float]:
        """Defined on _BasicStatsMixin."""

    def best(self) -> dict[str, float]:
        """Defined on _BasicStatsMixin."""

    def worst(self) -> dict[str, float]:
        """Defined on _BasicStatsMixin."""

    def volatility(self) -> dict[str, float]:
        """Defined on _BasicStatsMixin."""

    def sharpe(self) -> dict[str, float]:
        """Defined on _PerformanceStatsMixin."""

    def skew(self) -> dict[str, float]:
        """Defined on _BasicStatsMixin."""

    def kurtosis(self) -> dict[str, float]:
        """Defined on _BasicStatsMixin."""

    def value_at_risk(self) -> dict[str, float]:
        """Defined on _BasicStatsMixin."""

    def conditional_value_at_risk(self) -> dict[str, float]:
        """Defined on _BasicStatsMixin."""

    def max_drawdown(self) -> dict[str, float]:
        """Defined on _PerformanceStatsMixin."""

    # This stub is shadowed by the real `cagr` defined later in this class.
    def cagr(self, periods: int | float | None = None) -> dict[str, float]:
        """Defined on _ReportingStatsMixin."""

    def exposure(self) -> dict[str, float]:
        """Defined on _BasicStatsMixin."""

83 

84 # ── Temporal & reporting ────────────────────────────────────────────────── 

85 

86 @property 

87 def periods_per_year(self) -> float: 

88 """Estimate the number of periods per year from the data index spacing. 

89 

90 Returns: 

91 float: Estimated number of observations per calendar year. 

92 """ 

93 return self.data._periods_per_year 

94 

95 @columnwise_stat 

96 def avg_drawdown(self, series: pl.Series) -> float: 

97 """Average drawdown across all underwater periods. 

98 

99 Returns 0.0 when there are no underwater periods. 

100 

101 Matches the QuantStats sign convention: drawdown is expressed as a 

102 negative fraction (e.g. ``-0.2`` for 20% below peak). 

103 

104 Args: 

105 series (pl.Series): Series of additive daily returns. 

106 

107 Returns: 

108 float: Mean drawdown in [-1, 0]. 

109 """ 

110 dd = _drawdown_series(series) 

111 in_dd = dd.filter(dd > 0) 

112 if in_dd.is_empty(): 

113 return 0.0 

114 return -_to_float(in_dd.mean()) 

115 

116 @columnwise_stat 

117 def cagr( 

118 self, 

119 series: pl.Series, 

120 rf: float = 0.0, 

121 compounded: bool = True, 

122 periods: int | float | None = None, 

123 ) -> float: 

124 """Calculate the Compound Annual Growth Rate (CAGR) of excess returns. 

125 

126 CAGR represents the geometric mean annual growth rate, providing a 

127 smoothed annualized return that accounts for compounding effects. 

128 

129 Args: 

130 series (pl.Series): Series of additive daily returns. 

131 rf (float): Annualized risk-free rate. Defaults to 0.0. 

132 compounded (bool): Whether to compound returns. Defaults to True. 

133 periods: Periods per year for annualisation. Defaults to ``periods_per_year``. 

134 

135 Returns: 

136 float: CAGR of excess returns. 

137 """ 

138 raw_periods = periods or self.data._periods_per_year 

139 n = len(series) 

140 if n == 0: 

141 return float("nan") # pragma: no cover 

142 excess = series.cast(pl.Float64) - rf / raw_periods 

143 total = _comp_return(excess) if compounded else _to_float(excess.sum()) 

144 years = n / raw_periods 

145 return float(abs(1.0 + total) ** (1.0 / years) - 1.0) 

146 

    def expected_return(
        self,
        aggregate: str | None = None,
        compounded: bool = True,
    ) -> dict[str, float]:
        """Expected return with optional period aggregation.

        Returns the geometric mean of per-period returns,
        ``(prod(1 + r)) ** (1 / n) - 1``. When *aggregate* is provided the
        returns are first compounded (or summed) within each calendar
        period, and the geometric mean is taken over those period returns.

        Args:
            aggregate (str | None): Period to aggregate to before computing the
                mean. Accepted values: ``'weekly'``, ``'monthly'``,
                ``'quarterly'``, ``'annual'`` / ``'yearly'``. Defaults to
                ``None`` (raw per-period mean).
            compounded (bool): Compound returns within each period when
                *aggregate* is set. Defaults to ``True``.

        Returns:
            dict[str, float]: Geometric mean return per asset for the
                specified period.

        Raises:
            ValueError: If *aggregate* is an unrecognised string.

        Note:
            Requires a temporal (Date / Datetime) index when *aggregate* is not
            ``None``; falls back to the raw per-period mean otherwise.
        """
        # Accepted aggregation keywords mapped to polars truncate intervals.
        _freq_map: dict[str, str] = {
            "weekly": "1w",
            "monthly": "1mo",
            "quarterly": "3mo",
            "annual": "1y",
            "yearly": "1y",
        }

        def _geomean(s: pl.Series) -> float:
            """Per-period geometric mean: (product(1 + r))^(1/n) - 1."""
            n = s.count()
            if n == 0:
                return float("nan")
            return float(_to_float((1.0 + s.cast(pl.Float64)).product()) ** (1.0 / n) - 1.0)

        if aggregate is None:
            return {col: _geomean(series.drop_nulls()) for col, series in self.data.items()}

        if aggregate.lower() not in _freq_map:
            raise ValueError(f"aggregate must be one of {list(_freq_map)}, got {aggregate!r}")  # noqa: TRY003

        all_df = cast(pl.DataFrame, self.all)
        date_col_name = self.data.date_col[0] if self.data.date_col else None
        # Without a temporal index there is nothing to truncate on — fall back
        # to the raw per-period geometric mean.
        if date_col_name is None or not all_df[date_col_name].dtype.is_temporal():
            return {col: _geomean(series.drop_nulls()) for col, series in self.data.items()}

        trunc = _freq_map[aggregate.lower()]
        # Within-period aggregation: compound (product of growth factors) or sum.
        agg_expr = ((1.0 + pl.col("ret")).product() - 1.0) if compounded else pl.col("ret").sum()

        result: dict[str, float] = {}
        for col, series in self.data.items():
            df = (
                pl.DataFrame({"date": all_df[date_col_name], "ret": series})
                .drop_nulls()
                .with_columns(pl.col("date").dt.truncate(trunc).alias("period"))
            )
            period_rets = df.group_by("period").agg(agg_expr.alias("ret"))["ret"]
            result[col] = _geomean(period_rets)
        return result

215 

216 def rar(self, periods: int | float = 252) -> dict[str, float]: 

217 """Risk-Adjusted Return: CAGR divided by exposure. 

218 

219 Measures annualised return per unit of market participation time, 

220 matching the quantstats convention. 

221 

222 Args: 

223 periods: Periods per year for CAGR annualisation. Defaults to ``periods_per_year``. 

224 

225 Returns: 

226 dict[str, float]: RAR per asset. 

227 """ 

228 cagr = self.cagr(periods=periods) 

229 exp = self.exposure() 

230 return {col: cagr[col] / exp[col] for col in cagr} 

231 

232 @columnwise_stat 

233 def calmar(self, series: pl.Series, periods: int | float | None = None) -> float: 

234 """Calmar ratio (CAGR divided by maximum drawdown). 

235 

236 Returns ``nan`` when the maximum drawdown is zero. 

237 

238 Args: 

239 series (pl.Series): Series of additive daily returns. 

240 periods: Annualisation factor. Defaults to ``periods_per_year``. 

241 

242 Returns: 

243 float: Calmar ratio, or ``nan`` if max drawdown is zero. 

244 """ 

245 raw_periods = periods or self.data._periods_per_year 

246 max_dd = _to_float(_drawdown_series(series).max()) 

247 if max_dd <= 0: 

248 return float("nan") 

249 n = len(series) 

250 comp_return = _comp_return(series) 

251 cagr = (1.0 + comp_return) ** (raw_periods / n) - 1.0 

252 return cagr / max_dd 

253 

254 @columnwise_stat 

255 def recovery_factor(self, series: pl.Series) -> float: 

256 """Recovery factor (total return divided by maximum drawdown). 

257 

258 Matches the quantstats convention: total return is the simple sum of 

259 returns, not compounded. Returns ``nan`` when the maximum drawdown 

260 is zero. 

261 

262 Args: 

263 series (pl.Series): Series of additive daily returns. 

264 

265 Returns: 

266 float: Recovery factor, or ``nan`` if max drawdown is zero. 

267 """ 

268 max_dd = _to_float(_drawdown_series(series).max()) 

269 if max_dd <= 0: 

270 return float("nan") 

271 total_return = _to_float(series.sum()) 

272 return abs(total_return) / max_dd 

273 

    def max_drawdown_duration(self) -> dict[str, float | int | None]:
        """Maximum drawdown duration in calendar days (or periods) per asset.

        When the index is a temporal column (``Date`` / ``Datetime``) the
        duration is expressed as calendar days spanned by the longest
        underwater run. For integer-indexed data each row counts as one
        period.

        Returns:
            dict[str, float | int | None]: Asset → max drawdown duration.
                Returns 0 when there are no underwater periods.
        """
        all_df = cast(pl.DataFrame, self.all)
        date_col_name = self.data.date_col[0] if self.data.date_col else None
        has_date = date_col_name is not None and all_df[date_col_name].dtype.is_temporal()
        result: dict[str, float | int | None] = {}
        for col, series in self.data.items():
            # NAV from additive returns: 1 + cumulative sum, high-water mark
            # via cumulative max; a row is underwater when NAV < HWM.
            nav = 1.0 + series.cast(pl.Float64).cum_sum()
            hwm = nav.cum_max()
            in_dd = nav < hwm

            if not in_dd.any():
                result[col] = 0
                continue

            if has_date and date_col_name is not None:
                frame = pl.DataFrame({"date": all_df[date_col_name], "in_dd": in_dd})
            else:
                # Integer fallback: use the row position as the "date".
                frame = pl.DataFrame({"date": pl.Series(list(range(len(series))), dtype=pl.Int64), "in_dd": in_dd})

            # rle_id assigns one id per maximal run of equal in_dd values,
            # so grouping by it isolates each contiguous underwater stretch.
            frame = frame.with_columns(pl.col("in_dd").rle_id().alias("run_id"))
            dd_runs = (
                frame.filter(pl.col("in_dd"))
                .group_by("run_id")
                .agg([pl.col("date").min().alias("start"), pl.col("date").max().alias("end")])
            )

            # Duration is inclusive of both endpoints, hence the +1.
            if has_date:
                dd_runs = dd_runs.with_columns(
                    ((pl.col("end") - pl.col("start")).dt.total_days() + 1).alias("duration")
                )
            else:
                dd_runs = dd_runs.with_columns((pl.col("end") - pl.col("start") + 1).alias("duration"))

            result[col] = int(_to_float(dd_runs["duration"].max()))
        return result

320 

321 def monthly_win_rate(self) -> dict[str, float]: 

322 """Fraction of calendar months with a positive compounded return per asset. 

323 

324 Requires a temporal (Date / Datetime) index. Returns ``nan`` per 

325 asset when no temporal index is present. 

326 

327 Returns: 

328 dict[str, float]: Monthly win rate in [0, 1] per asset. 

329 """ 

330 all_df = cast(pl.DataFrame, self.all) 

331 date_col_name = self.data.date_col[0] if self.data.date_col else None 

332 if date_col_name is None or not all_df[date_col_name].dtype.is_temporal(): 

333 return {col: float("nan") for col, _ in self.data.items()} 

334 

335 result: dict[str, float] = {} 

336 for col, _ in self.data.items(): 

337 df = ( 

338 all_df.select([date_col_name, col]) 

339 .drop_nulls() 

340 .with_columns( 

341 [ 

342 pl.col(date_col_name).dt.year().alias("_year"), 

343 pl.col(date_col_name).dt.month().alias("_month"), 

344 ] 

345 ) 

346 ) 

347 monthly = ( 

348 df.group_by(["_year", "_month"]) 

349 .agg((pl.col(col) + 1.0).product().alias("gross")) 

350 .with_columns((pl.col("gross") - 1.0).alias("monthly_return")) 

351 ) 

352 n_total = len(monthly) 

353 if n_total == 0: 

354 result[col] = float("nan") 

355 else: 

356 n_positive = int((monthly["monthly_return"] > 0).sum()) 

357 result[col] = n_positive / n_total 

358 return result 

359 

    def monthly_returns(self, eoy: bool = True, compounded: bool = True) -> dict[str, pl.DataFrame]:
        """Calculate monthly returns in a pivot-table format.

        Groups returns by calendar month and year, producing a DataFrame with
        years as rows and months (JAN-DEC) as columns, plus an optional EOY
        column with the full-year compounded return.

        Args:
            eoy (bool): Include an EOY column with the annual compounded return.
                Defaults to True.
            compounded (bool): Compound returns within each period. Defaults to True.

        Returns:
            dict[str, pl.DataFrame]: Per-asset pivot tables with columns
                ``year``, ``JAN`` … ``DEC``, and optionally ``EOY``.

        Note:
            ``date_col[0]`` is accessed unconditionally here, unlike the
            guarded temporal checks elsewhere in this mixin — assumes a
            temporal index is present.
        """
        all_df = cast(pl.DataFrame, self.all)
        date_col_name = self.data.date_col[0]
        month_names = {
            1: "JAN",
            2: "FEB",
            3: "MAR",
            4: "APR",
            5: "MAY",
            6: "JUN",
            7: "JUL",
            8: "AUG",
            9: "SEP",
            10: "OCT",
            11: "NOV",
            12: "DEC",
        }
        month_order = list(month_names.values())

        result: dict[str, pl.DataFrame] = {}
        for col, series in self.data.items():
            df = pl.DataFrame({"date": all_df[date_col_name], "ret": series}).drop_nulls()
            df = df.with_columns(
                [
                    pl.col("date").dt.year().alias("year"),
                    pl.col("date").dt.month().alias("month_num"),
                ]
            )

            # Within-period aggregation: compound (product of growth factors) or sum.
            agg_expr = ((1.0 + pl.col("ret")).product() - 1.0) if compounded else pl.col("ret").sum()
            monthly = (
                df.group_by(["year", "month_num"])
                .agg(agg_expr.alias("ret"))
                .with_columns(
                    pl.col("month_num")
                    .map_elements(month_names.__getitem__, return_dtype=pl.String)
                    .alias("month_name")
                )
                .sort(["year", "month_num"])
            )

            # One row per (year, month) after the group_by, so "first" is a
            # safe (no-op) aggregate for the pivot.
            pivoted = monthly.pivot(on="month_name", index="year", values="ret", aggregate_function="first")
            # Guarantee all twelve month columns exist even for short histories.
            for m in month_order:
                if m not in pivoted.columns:
                    pivoted = pivoted.with_columns(pl.lit(0.0).alias(m))
            # Months with no observations are reported as 0.0.
            pivoted = (
                pivoted.select(["year", *month_order])
                .fill_null(0.0)
                .with_columns(pl.col("year").cast(pl.Int32))
                .sort("year")
            )

            if eoy:
                # Full-year return using the same aggregation rule.
                eoy_agg = (
                    df.group_by("year")
                    .agg(agg_expr.alias("EOY"))
                    .with_columns(pl.col("year").cast(pl.Int32))
                    .sort("year")
                )
                pivoted = pivoted.join(eoy_agg, on="year").sort("year")

            result[col] = pivoted
        return result

439 

    def distribution(self, compounded: bool = True) -> dict[str, dict[str, dict[str, list[float]]]]:
        """Analyse return distributions across daily, weekly, monthly, quarterly, and yearly periods.

        For each period, splits values into inliers and outliers using the
        IQR method (1.5 * IQR beyond Q1/Q3).

        Args:
            compounded (bool): Compound returns within each period. Defaults to True.

        Returns:
            dict: Nested dict ``{asset: {period: {"values": [...], "outliers": [...]}}}``
                where period is one of ``"Daily"``, ``"Weekly"``, ``"Monthly"``,
                ``"Quarterly"``, ``"Yearly"``.

        Note:
            Assumes a temporal index (``date_col[0]`` is accessed
            unconditionally) and non-empty data per asset.
        """
        all_df = cast(pl.DataFrame, self.all)
        date_col_name = self.data.date_col[0]

        def _agg(df: pl.DataFrame, group_col: str) -> pl.Series:
            """Aggregate returns within each group using product or sum."""
            expr = ((1.0 + pl.col("ret")).product() - 1.0) if compounded else pl.col("ret").sum()
            return df.group_by(group_col).agg(expr.alias("ret"))["ret"]

        def _iqr_split(s: pl.Series) -> dict[str, list[float]]:
            """Split series into inliers and outliers using the IQR method."""
            # NOTE(review): pl.Series.quantile returns None on an empty
            # series, which would make the subtraction fail — assumes each
            # period bucket is non-empty; confirm upstream guarantees.
            q1 = cast(float, s.quantile(0.25))
            q3 = cast(float, s.quantile(0.75))
            iqr = q3 - q1
            # Tukey fences: anything beyond 1.5 * IQR past Q1/Q3 is an outlier.
            mask = (s >= q1 - 1.5 * iqr) & (s <= q3 + 1.5 * iqr)
            return {"values": s.filter(mask).to_list(), "outliers": s.filter(~mask).to_list()}

        result: dict[str, dict[str, dict[str, list[float]]]] = {}
        for col, series in self.data.items():
            df = pl.DataFrame({"date": all_df[date_col_name], "ret": series}).drop_nulls()
            # Pre-compute one truncated-date column per aggregation horizon.
            df = df.with_columns(
                [
                    pl.col("date").dt.truncate("1w").alias("week"),
                    pl.col("date").dt.truncate("1mo").alias("month"),
                    pl.col("date").dt.truncate("3mo").alias("quarter"),
                    pl.col("date").dt.truncate("1y").alias("year"),
                ]
            )
            result[col] = {
                "Daily": _iqr_split(df["ret"]),
                "Weekly": _iqr_split(_agg(df, "week")),
                "Monthly": _iqr_split(_agg(df, "month")),
                "Quarterly": _iqr_split(_agg(df, "quarter")),
                "Yearly": _iqr_split(_agg(df, "year")),
            }
        return result

490 

    def compare(
        self,
        aggregate: str | None = None,
        compounded: bool = True,
        round_vals: int | None = None,
    ) -> dict[str, pl.DataFrame]:
        """Compare each asset's returns against the benchmark.

        Aligns returns and benchmark by date, multiplies by 100 (percentage),
        then computes a ``Multiplier`` (Returns / Benchmark) and ``Won``
        indicator (``"+"`` when the asset outperformed, ``"-"`` otherwise).

        Args:
            aggregate (str | None): Pandas-style resample frequency for
                period aggregation (e.g. ``"ME"``, ``"QE"``, ``"YE"``).
                ``None`` returns daily rows. Defaults to None. An
                unrecognised frequency silently falls back to daily rows.
            compounded (bool): Compound returns when aggregating. Defaults to True.
            round_vals (int | None): Decimal places to round. Defaults to None.

        Returns:
            dict[str, pl.DataFrame]: Per-asset DataFrames with columns
                ``Benchmark``, ``Returns``, ``Multiplier``, ``Won``.

        Raises:
            AttributeError: If no benchmark data is attached.

        """
        if self.data.benchmark is None:
            raise AttributeError("No benchmark data available")  # noqa: TRY003

        all_df = cast(pl.DataFrame, self.all)
        date_col_name = self.data.date_col[0]
        bench_col = self.data.benchmark.columns[0]

        # Pandas-style resample codes mapped to polars truncate intervals.
        _freq_map = {"ME": "1mo", "QE": "3mo", "YE": "1y", "W": "1w"}

        def _agg_series(df: pl.DataFrame, period_col: str, val_col: str) -> pl.DataFrame:
            """Aggregate a value column grouped by period using product or sum."""
            expr = ((1.0 + pl.col(val_col)).product() - 1.0) if compounded else pl.col(val_col).sum()
            return df.group_by(period_col).agg(expr.alias(val_col)).sort(period_col)

        result: dict[str, pl.DataFrame] = {}
        for col in self.data.returns.columns:
            df = all_df.select(
                [
                    pl.col(date_col_name),
                    pl.col(col).alias("ret"),
                    pl.col(bench_col).alias("bench"),
                ]
            )

            if aggregate is not None and aggregate in _freq_map:
                trunc = _freq_map[aggregate]
                df = df.with_columns(pl.col(date_col_name).dt.truncate(trunc).alias("period"))
                # Aggregate each side separately (dropping its own nulls),
                # then full-join so neither side's periods are lost.
                ret_agg = _agg_series(df.drop_nulls(subset=["ret"]), "period", "ret")
                bench_agg = _agg_series(df.drop_nulls(subset=["bench"]), "period", "bench")
                df = ret_agg.join(bench_agg, on="period", how="full", coalesce=True).sort("period")
                ret_col, bench_col_name, _date_alias = "ret", "bench", "period"
            else:
                ret_col, bench_col_name, _date_alias = "ret", "bench", date_col_name

            # Express both sides in percent.
            ret_pct = (df[ret_col] * 100).alias("Returns")
            bench_pct = (df[bench_col_name] * 100).alias("Benchmark")
            out = pl.DataFrame(
                {
                    "Benchmark": bench_pct,
                    "Returns": ret_pct,
                }
            )
            out = out.with_columns(
                [
                    # Null out a zero benchmark so Multiplier is null, not inf.
                    (pl.col("Returns") / pl.col("Benchmark").replace(0.0, None)).alias("Multiplier"),
                    # Ties count as a win (>=).
                    pl.when(pl.col("Returns") >= pl.col("Benchmark"))
                    .then(pl.lit("+"))
                    .otherwise(pl.lit("-"))
                    .alias("Won"),
                ]
            )

            if round_vals is not None:
                out = out.with_columns(
                    [
                        pl.col("Benchmark").round(round_vals),
                        pl.col("Returns").round(round_vals),
                        pl.col("Multiplier").round(round_vals),
                    ]
                )

            result[col] = out
        return result

581 

582 def worst_n_periods(self, n: int = 5) -> dict[str, list[float | None]]: 

583 """Return the N worst return periods per asset. 

584 

585 If a series has fewer than ``n`` non-null observations the list is 

586 padded with ``None`` on the right. 

587 

588 Args: 

589 n: Number of worst periods to return. Defaults to 5. 

590 

591 Returns: 

592 dict[str, list[float | None]]: Sorted worst returns per asset. 

593 """ 

594 result: dict[str, list[float | None]] = {} 

595 for col, series in self.data.items(): 

596 nonnull = series.drop_nulls() 

597 worst: list[float | None] = nonnull.sort(descending=False).head(n).to_list() 

598 while len(worst) < n: 

599 worst.append(None) 

600 result[col] = worst 

601 return result 

602 

603 # ── Capture ratios ──────────────────────────────────────────────────────── 

604 

605 def up_capture(self, benchmark: pl.Series) -> dict[str, float]: 

606 """Up-market capture ratio relative to an explicit benchmark series. 

607 

608 Measures the fraction of the benchmark's upside that the strategy 

609 captures. A value greater than 1.0 means the strategy outperformed 

610 the benchmark in rising markets. 

611 

612 Args: 

613 benchmark: Benchmark return series aligned row-by-row with the data. 

614 

615 Returns: 

616 dict[str, float]: Up capture ratio per asset. 

617 """ 

618 up_mask = benchmark > 0 

619 bench_up = benchmark.filter(up_mask).drop_nulls() 

620 if bench_up.is_empty(): 

621 return {col: float("nan") for col, _ in self.data.items()} 

622 bench_geom = float((bench_up + 1.0).product()) ** (1.0 / len(bench_up)) - 1.0 

623 if bench_geom == 0.0: # pragma: no cover 

624 return {col: float("nan") for col, _ in self.data.items()} 

625 result: dict[str, float] = {} 

626 for col, series in self.data.items(): 

627 strat_up = series.filter(up_mask).drop_nulls() 

628 if strat_up.is_empty(): 

629 result[col] = float("nan") 

630 else: 

631 strat_geom = float((strat_up + 1.0).product()) ** (1.0 / len(strat_up)) - 1.0 

632 result[col] = strat_geom / bench_geom 

633 return result 

634 

635 def down_capture(self, benchmark: pl.Series) -> dict[str, float]: 

636 """Down-market capture ratio relative to an explicit benchmark series. 

637 

638 A value less than 1.0 means the strategy lost less than the benchmark 

639 in falling markets (a desirable property). 

640 

641 Args: 

642 benchmark: Benchmark return series aligned row-by-row with the data. 

643 

644 Returns: 

645 dict[str, float]: Down capture ratio per asset. 

646 """ 

647 down_mask = benchmark < 0 

648 bench_down = benchmark.filter(down_mask).drop_nulls() 

649 if bench_down.is_empty(): 

650 return {col: float("nan") for col, _ in self.data.items()} 

651 bench_geom = float((bench_down + 1.0).product()) ** (1.0 / len(bench_down)) - 1.0 

652 if bench_geom == 0.0: # pragma: no cover 

653 return {col: float("nan") for col, _ in self.data.items()} 

654 result: dict[str, float] = {} 

655 for col, series in self.data.items(): 

656 strat_down = series.filter(down_mask).drop_nulls() 

657 if strat_down.is_empty(): 

658 result[col] = float("nan") 

659 else: 

660 strat_geom = float((strat_down + 1.0).product()) ** (1.0 / len(strat_down)) - 1.0 

661 result[col] = strat_geom / bench_geom 

662 return result 

663 

664 # ── Summary & breakdown ──────────────────────────────────────────────────── 

665 

    def annual_breakdown(self) -> pl.DataFrame:
        """Summary statistics broken down by calendar year.

        Groups the data by calendar year using the date index, computes a
        full :py:meth:`summary` for each year, and stacks the results with an
        additional ``year`` column.

        Returns:
            pl.DataFrame: Columns ``year``, ``metric``, one per asset, sorted
                by ``year``.

        Raises:
            ValueError: If the data has no date index.
        """
        all_df = cast(pl.DataFrame, self.all)
        date_col_name = self.data.date_col[0] if self.data.date_col else None
        has_temporal = date_col_name is not None and all_df[date_col_name].dtype.is_temporal()

        # Local import — presumably deferred to avoid a circular import with
        # the package's data module; confirm before moving to module level.
        from ..data import Data

        if not has_temporal:
            # Integer-index fallback: group by chunks of ~_periods_per_year rows
            chunk = round(self.data._periods_per_year)
            total = all_df.height
            frames_int: list[pl.DataFrame] = []
            for i, start in enumerate(range(0, total, chunk), start=1):
                chunk_all = all_df.slice(start, chunk)
                # Skip a trailing fragment too small to yield meaningful stats.
                if chunk_all.height < max(5, chunk // 4):
                    continue
                chunk_index = chunk_all.select(self.data.date_col)
                chunk_returns = chunk_all.select(self.data.returns.columns)
                chunk_benchmark = (
                    chunk_all.select(self.data.benchmark.columns) if self.data.benchmark is not None else None
                )
                # Rebuild a Data object for the chunk and run summary() on it
                # via a fresh instance of this same stats class.
                chunk_data = Data(returns=chunk_returns, index=chunk_index, benchmark=chunk_benchmark)
                chunk_summary = cast(Any, type(self))(chunk_data).summary()
                # "year" is the 1-based chunk ordinal for integer-indexed data.
                chunk_summary = chunk_summary.with_columns(pl.lit(i).alias("year"))
                frames_int.append(chunk_summary)
            if not frames_int:
                return pl.DataFrame()
            result_int = pl.concat(frames_int)
            ordered_int = ["year", "metric", *[c for c in result_int.columns if c not in ("year", "metric")]]
            return result_int.select(ordered_int)

        if date_col_name is None:  # unreachable: has_temporal guarantees non-None  # pragma: no cover
            return pl.DataFrame()  # pragma: no cover
        years = all_df[date_col_name].dt.year().unique().sort().to_list()

        frames: list[pl.DataFrame] = []
        for year in years:
            year_all = all_df.filter(pl.col(date_col_name).dt.year() == year)
            # A single-row year cannot support the summary statistics.
            if year_all.height < 2:
                continue
            year_index = year_all.select([date_col_name])
            year_returns = year_all.select(self.data.returns.columns)
            year_benchmark = year_all.select(self.data.benchmark.columns) if self.data.benchmark is not None else None
            year_data = Data(returns=year_returns, index=year_index, benchmark=year_benchmark)
            year_summary = cast(Any, type(self))(year_data).summary()
            year_summary = year_summary.with_columns(pl.lit(year).alias("year"))
            frames.append(year_summary)

        if not frames:
            # Preserve the output schema even when no year qualified.
            asset_cols = list(self.data.returns.columns)
            schema: dict[str, type[pl.DataType]] = {
                "year": pl.Int32,
                "metric": pl.String,
                **dict.fromkeys(asset_cols, pl.Float64),
            }
            return pl.DataFrame(schema=schema)

        result = pl.concat(frames)
        ordered = ["year", "metric", *[c for c in result.columns if c not in ("year", "metric")]]
        return result.select(ordered)

739 

740 def summary(self) -> pl.DataFrame: 

741 """Summary statistics for each asset as a tidy DataFrame. 

742 

743 Each row is one metric; each column beyond ``metric`` is one asset. 

744 

745 Returns: 

746 pl.DataFrame: A DataFrame with a ``metric`` column followed by one 

747 column per asset. 

748 """ 

749 assets = [col for col, _ in self.data.items()] 

750 

751 def _safe(fn: Any) -> dict[str, Any]: 

752 """Call *fn()* and return its result; return NaN for each asset on any exception.""" 

753 try: 

754 return fn() 

755 except Exception: 

756 return dict.fromkeys(assets, float("nan")) 

757 

758 metrics: dict[str, dict[str, Any]] = { 

759 "avg_return": _safe(self.avg_return), 

760 "avg_win": _safe(self.avg_win), 

761 "avg_loss": _safe(self.avg_loss), 

762 "win_rate": _safe(self.win_rate), 

763 "profit_factor": _safe(self.profit_factor), 

764 "payoff_ratio": _safe(self.payoff_ratio), 

765 "monthly_win_rate": _safe(self.monthly_win_rate), 

766 "best": _safe(self.best), 

767 "worst": _safe(self.worst), 

768 "volatility": _safe(self.volatility), 

769 "sharpe": _safe(self.sharpe), 

770 "skew": _safe(self.skew), 

771 "kurtosis": _safe(self.kurtosis), 

772 "value_at_risk": _safe(self.value_at_risk), 

773 "conditional_value_at_risk": _safe(self.conditional_value_at_risk), 

774 "max_drawdown": _safe(self.max_drawdown), 

775 "avg_drawdown": _safe(self.avg_drawdown), 

776 "max_drawdown_duration": _safe(self.max_drawdown_duration), 

777 "calmar": _safe(self.calmar), 

778 "recovery_factor": _safe(self.recovery_factor), 

779 } 

780 

781 rows: list[dict[str, Any]] = [ 

782 {"metric": name, **{asset: values.get(asset) for asset in assets}} for name, values in metrics.items() 

783 ] 

784 return pl.DataFrame(rows)