Coverage for src / jquantstats / _stats / _basic.py: 100%

266 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-07 14:28 +0000

1"""Basic statistical metrics for financial returns data.""" 

2 

3from __future__ import annotations 

4 

5import math 

6from collections.abc import Iterable 

7from typing import TYPE_CHECKING, cast 

8 

9import numpy as np 

10import polars as pl 

11from scipy.stats import norm 

12 

13from ._core import columnwise_stat 

14from ._internals import _annualization_factor, _comp_return 

15 

16# ── Basic statistics mixin ─────────────────────────────────────────────────── 

17 

18 

19class _BasicStatsMixin: 

20 """Mixin providing basic return/risk and win/loss financial statistics. 

21 

22 Covers: basic statistics (skew, kurtosis, avg return/win/loss), volatility, 

23 win/loss metrics (payoff ratio, profit factor), and risk metrics (VaR, CVaR, 

24 win rate, kelly criterion, best/worst, exposure). 

25 

26 Attributes (provided by the concrete subclass): 

27 data: The :class:`~jquantstats._data.Data` object. 

28 all: Combined DataFrame for efficient column selection. 

29 """ 

30 

31 if TYPE_CHECKING: 

32 from ._protocol import DataLike 

33 

34 data: DataLike 

35 all: pl.DataFrame | None 

36 

@staticmethod
def _mean_positive_expr(series: pl.Series) -> float:
    """Average the strictly positive entries of *series*.

    Args:
        series (pl.Series): Values to average over.

    Returns:
        float: Whatever ``Series.mean()`` yields for the values ``> 0``.
        NOTE(review): for an empty selection polars appears to yield
        ``None`` rather than NaN — confirm before relying on a NaN sentinel.

    """
    positives = series.filter(series > 0)
    return cast(float, positives.mean())

41 

@staticmethod
def _mean_negative_expr(series: pl.Series) -> float:
    """Average the strictly negative entries of *series*.

    Args:
        series (pl.Series): Values to average over.

    Returns:
        float: Whatever ``Series.mean()`` yields for the values ``< 0``.
        NOTE(review): for an empty selection polars appears to yield
        ``None`` rather than NaN — confirm before relying on a NaN sentinel.

    """
    negatives = series.filter(series < 0)
    return cast(float, negatives.mean())

46 

47 # ── Basic statistics ────────────────────────────────────────────────────── 

48 

@columnwise_stat
def skew(self, series: pl.Series) -> int | float | None:
    """Measure the asymmetry (skewness) of a return series.

    Args:
        series (pl.Series): The returns to evaluate.

    Returns:
        int | float | None: Result of ``Series.skew`` called with
        ``bias=False``.

    """
    skewness = series.skew(bias=False)
    return skewness

61 

@columnwise_stat
def kurtosis(self, series: pl.Series) -> int | float | None:
    """Measure the kurtosis of a return series.

    Kurtosis describes how peaked or heavy-tailed a distribution is
    compared to a normal distribution.

    Args:
        series (pl.Series): The returns to evaluate.

    Returns:
        int | float | None: Result of ``Series.kurtosis`` called with
        ``bias=False``.

    """
    kurt = series.kurtosis(bias=False)
    return kurt

76 

@columnwise_stat
def avg_return(self, series: pl.Series) -> float:
    """Compute the mean return over the non-null, non-zero observations.

    Args:
        series (pl.Series): The returns to average.

    Returns:
        float: Mean of the entries that are neither null nor zero.

    """
    is_active = series.is_not_null() & (series != 0)
    active_returns = series.filter(is_active)
    return cast(float, active_returns.mean())

89 

@columnwise_stat
def avg_win(self, series: pl.Series) -> float:
    """Compute the mean of the winning (strictly positive) returns.

    Args:
        series (pl.Series): The returns to evaluate.

    Returns:
        float: Average winning return, as produced by
        ``_mean_positive_expr``.

    """
    mean_gain = self._mean_positive_expr(series)
    return mean_gain

102 

@columnwise_stat
def avg_loss(self, series: pl.Series) -> float:
    """Compute the mean of the losing (strictly negative) returns.

    Args:
        series (pl.Series): The returns to evaluate.

    Returns:
        float: Average losing return, as produced by
        ``_mean_negative_expr``.

    """
    mean_drop = self._mean_negative_expr(series)
    return mean_drop

115 

@columnwise_stat
def comp(self, series: pl.Series) -> float:
    """Compute the total compounded return across the whole series.

    Delegates to ``_comp_return``; the intended formula is
    ``product(1 + r) - 1``.

    Args:
        series (pl.Series): The returns to compound.

    Returns:
        float: Total compounded return.

    """
    total = _comp_return(series)
    return total

130 

@columnwise_stat
def geometric_mean(self, series: pl.Series, periods: int | float | None = None, annualize: bool = False) -> float:
    """Compute the geometric mean of returns.

    The per-period value is ``(prod(1 + r_i)) ** (1 / n) - 1`` over the
    non-null observations.  When ``annualize`` is true the exponent becomes
    ``periods / n`` instead of ``1 / n``.

    Args:
        series (pl.Series): The returns to evaluate.
        periods (int | float, optional): Periods per year used when
            annualizing.  A falsy value falls back to
            ``self.data._periods_per_year``.
        annualize (bool): Whether to annualize the result. Defaults to False.

    Returns:
        float: The geometric mean return, or NaN when there are no non-null
        observations or the compounded growth factor is not positive.

    """
    returns = series.drop_nulls().cast(pl.Float64)
    count = returns.len()
    if count == 0:
        return float(np.nan)

    growth = float((1.0 + returns).product())
    if growth <= 0:
        # A non-positive growth factor has no real-valued root.
        return float(np.nan)

    if annualize:
        power = (periods or self.data._periods_per_year) / count
    else:
        power = 1.0 / count
    return float(growth**power) - 1.0

156 

157 # ── Volatility & risk ───────────────────────────────────────────────────── 

158 

@columnwise_stat
def volatility(self, series: pl.Series, periods: int | float | None = None, annualize: bool = True) -> float:
    """Compute the volatility (standard deviation) of returns.

    The standard deviation is multiplied by ``_annualization_factor(periods)``
    when ``annualize`` is true, and by 1.0 otherwise.

    Args:
        series (pl.Series): The returns to evaluate.
        periods (int | float, optional): Number of periods per year. A falsy
            value falls back to ``self.data._periods_per_year``.
        annualize (bool, optional): Whether to annualize the result.
            Defaults to True.

    Returns:
        float: The volatility; a missing standard deviation counts as 0.0.

    Raises:
        TypeError: If the resolved period count is not an int or float.

    """
    resolved = periods or self.data._periods_per_year

    # Guard against a non-numeric period count before scaling.
    if not isinstance(resolved, (int, float)):
        msg = f"Expected int or float for periods, got {type(resolved).__name__}"
        raise TypeError(msg)

    scale = 1.0
    if annualize:
        scale = _annualization_factor(resolved)

    dispersion = cast(float, series.std())
    if dispersion is None:
        dispersion = 0.0
    return dispersion * scale

184 

185 # ── Win / loss metrics ──────────────────────────────────────────────────── 

186 

@columnwise_stat
def payoff_ratio(self, series: pl.Series) -> float:
    """Measure the payoff ratio.

    The payoff ratio is calculated as average win / abs(average loss).

    Args:
        series (pl.Series): The series to calculate payoff ratio for.

    Returns:
        float: The payoff ratio, or NaN when the series has no winning or
        no losing observations (the ratio is undefined in that case).

    """
    avg_win = self._mean_positive_expr(series)
    avg_loss = self._mean_negative_expr(series)

    # An empty win or loss selection has no mean; report NaN instead of
    # failing on arithmetic with a missing value.
    if avg_win is None or avg_loss is None:
        return float(np.nan)

    return avg_win / float(np.abs(avg_loss))

203 

def win_loss_ratio(self) -> dict[str, float]:
    """Return the win/loss ratio; an alias for :meth:`payoff_ratio`.

    Returns:
        dict[str, float]: Dictionary mapping asset names to win/loss ratios.

    """
    ratios = self.payoff_ratio()
    return ratios

212 

@columnwise_stat
def profit_ratio(self, series: pl.Series) -> float:
    """Measure the profit ratio (win ratio / loss ratio).

    The win ratio is ``abs(mean(wins) / count(wins))`` and the loss ratio is
    ``abs(mean(losses) / count(losses))``.  Note that zero returns are
    counted on the winning side (``>= 0``).

    Args:
        series (pl.Series): The returns to evaluate.

    Returns:
        float: The profit ratio, or NaN when computing either side raises a
        ``TypeError`` (e.g. a missing mean).

    """
    gains = series.filter(series >= 0)
    drops = series.filter(series < 0)

    try:
        gain_mean = cast(float, gains.mean())
        drop_mean = cast(float, drops.mean())
        gain_share = float(np.abs(gain_mean / gains.count()))
        drop_share = float(np.abs(drop_mean / drops.count()))
    except TypeError:
        return float(np.nan)

    return gain_share / drop_share

239 

@columnwise_stat
def profit_factor(self, series: pl.Series) -> float:
    """Measure the profit factor.

    The profit factor is calculated as abs(sum of wins / sum of losses).

    Args:
        series (pl.Series): The series to calculate profit factor for.

    Returns:
        float: The profit factor, or NaN when the losses sum to zero (for
        example when there are no losing periods), mirroring how
        ``gain_to_pain_ratio`` reports an undefined ratio.

    """
    gross_profit = float(series.filter(series > 0).sum())
    gross_loss = float(series.filter(series < 0).sum())

    # Dividing two Python floats by zero raises ZeroDivisionError, so the
    # "no losses" case must be handled explicitly.
    if gross_loss == 0:
        return float(np.nan)

    return float(np.abs(gross_profit / gross_loss))

259 

260 # ── Risk metrics ────────────────────────────────────────────────────────── 

261 

@columnwise_stat
def value_at_risk(self, series: pl.Series, sigma: float = 1.0, alpha: float = 0.05) -> float:
    """Compute the parametric (variance-covariance) value-at-risk.

    Evaluates the normal quantile at ``alpha`` for a distribution centred on
    the series mean with scale ``sigma * std(series)``.

    Args:
        series (pl.Series): The returns to evaluate.
        sigma (float, optional): Standard deviation multiplier. Defaults to 1.0.
        alpha (float, optional): Lower-tail probability. Defaults to 0.05.

    Returns:
        float: The value at risk.

    """
    location = cast(float, series.mean())
    if location is None:
        location = 0.0

    spread = cast(float, series.std())
    scale = sigma * (spread if spread is not None else 0.0)

    quantile = norm.ppf(alpha, location, scale)
    return float(quantile)

283 

@columnwise_stat
def _conditional_value_at_risk_impl(self, series: pl.Series, sigma: float = 1.0, alpha: float = 0.05) -> float:
    """Compute conditional value-at-risk for a single series.

    Args:
        series (pl.Series): The returns to evaluate.
        sigma (float, optional): Standard deviation multiplier. Defaults to 1.0.
        alpha (float, optional): Lower-tail probability. Defaults to 0.05.

    Returns:
        float: Mean of the returns strictly below the parametric VaR
        threshold, as produced by ``Series.mean()``.

    """
    location = cast(float, series.mean())
    if location is None:
        location = 0.0

    spread = cast(float, series.std())
    scale = sigma * (spread if spread is not None else 0.0)

    threshold = norm.ppf(alpha, location, scale)

    # The comparison is cast only to satisfy the static type checker.
    in_tail = cast(Iterable[bool], series < threshold)
    tail = series.filter(in_tail)
    return cast(float, tail.mean())

299 

def conditional_value_at_risk(
    self, sigma: float = 1.0, confidence: float = 0.95, **kwargs: float
) -> dict[str, float]:
    """Calculate the conditional value-at-risk (CVaR / Expected Shortfall).

    Also known as CVaR or expected shortfall, calculated for each numeric column.

    Args:
        sigma (float, optional): Standard deviation multiplier. Defaults to 1.0.
        confidence (float, optional): Confidence level (e.g. 0.95 for 95 %).
            Converted internally to ``alpha = 1 - confidence``. Defaults to 0.95.
        **kwargs: Only ``alpha`` is accepted.  ``alpha`` is the probability
            mass in the lower (loss) tail; when supplied it takes precedence
            over ``confidence``.  For example, a 95 % confidence level
            corresponds to ``alpha = 0.05``.

    Returns:
        dict[str, float]: The conditional value at risk per asset column.

    Raises:
        TypeError: If unexpected keyword arguments are passed.

    """
    alpha = kwargs.pop("alpha", None)

    # Anything left over is a typo or unsupported option; silently ignoring
    # it would return a result for parameters the caller did not ask for.
    if kwargs:
        unexpected = ", ".join(sorted(kwargs))
        msg = f"conditional_value_at_risk() got unexpected keyword argument(s): {unexpected}"
        raise TypeError(msg)

    if alpha is None:
        alpha = 1.0 - confidence

    return self._conditional_value_at_risk_impl(sigma=sigma, alpha=alpha)

328 

@staticmethod
def _drawdown_with_baseline(series: pl.Series) -> pl.Series:
    """Build the drawdown series using a phantom zero-return starting point.

    A 0.0 return is prepended before compounding so that a negative first
    return registers as a drawdown from the initial capital of 1.0 rather
    than becoming the first high-water mark (the quantstats convention).
    The phantom point is removed from the result.

    Args:
        series (pl.Series): The returns to evaluate.

    Returns:
        pl.Series: Non-negative drawdowns, one per input observation.

    """
    baseline = pl.Series([0.0])
    padded = pl.concat([baseline, series.cast(pl.Float64)])

    equity = (1.0 + padded).cum_prod()
    peak = equity.cum_max()

    # The clip on the denominator avoids dividing by a (near-)zero peak.
    shortfall = (peak - equity) / peak.clip(lower_bound=1e-10)
    drawdown = shortfall.clip(lower_bound=0.0)
    return drawdown[1:]  # discard the phantom baseline entry

341 

@staticmethod
def _ulcer_index_series(series: pl.Series) -> float:
    """Compute the ulcer index for a single returns series.

    The index is ``sqrt(sum(dd ** 2) / (n - 1))`` where ``dd`` is the
    drawdown series and ``n`` the number of observations.

    Args:
        series (pl.Series): The returns to evaluate.

    Returns:
        float: The ulcer index, or NaN when fewer than two observations are
        available (the ``n - 1`` divisor is then zero or negative).

    """
    n = series.len()
    if n < 2:
        # Avoids a ZeroDivisionError for a single observation.
        return float(np.nan)

    dd = _BasicStatsMixin._drawdown_with_baseline(series)
    return float(np.sqrt(float((dd**2).sum()) / (n - 1)))

348 

@columnwise_stat
def ulcer_index(self, series: pl.Series) -> float:
    """Compute the Ulcer Index, a drawdown-based downside risk measure.

    Captures both depth and duration of drawdowns through
    ``sqrt(sum(dd²) / (n - 1))``.

    Args:
        series (pl.Series): The returns to evaluate.

    Returns:
        float: Ulcer Index value.

    """
    index_value = self._ulcer_index_series(series)
    return index_value

364 

@columnwise_stat
def ulcer_performance_index(self, series: pl.Series, rf: float = 0.0) -> float:
    """Compute the Ulcer Performance Index (UPI).

    A risk-adjusted return that uses the Ulcer Index as its risk measure:
    ``(compounded_return - rf) / ulcer_index``.

    Args:
        series (pl.Series): The returns to evaluate.
        rf (float): Risk-free rate. Defaults to 0.

    Returns:
        float: Ulcer Performance Index, or NaN when the Ulcer Index is zero.

    """
    total_return = _comp_return(series)
    ulcer = self._ulcer_index_series(series)
    if ulcer == 0:
        return float(np.nan)
    return (total_return - rf) / ulcer

383 

@columnwise_stat
def serenity_index(self, series: pl.Series, rf: float = 0.0) -> float:
    """Calculate the Serenity Index.

    Combines the Ulcer Index with a CVaR-based pitfall measure:
    (sum_returns - rf) / (ulcer_index * pitfall), where
    pitfall = -CVaR(drawdowns) / std(returns).

    Args:
        series (pl.Series): The series to calculate serenity index for.
        rf (float): Risk-free rate. Defaults to 0.

    Returns:
        float: Serenity Index, or NaN when the standard deviation of returns
        is missing or zero, when no drawdown falls below the VaR threshold
        (CVaR undefined), or when the denominator is zero.

    """
    std_val = cast(float, series.std())
    if not std_val:
        return float(np.nan)

    # Negate drawdowns to match quantstats sign convention (negative = below peak)
    dd_neg = -self._drawdown_with_baseline(series)
    mu = cast(float, dd_neg.mean())
    sigma = cast(float, dd_neg.std())
    var_threshold = float(norm.ppf(0.05, mu, sigma))
    mask = cast(Iterable[bool], dd_neg < var_threshold)
    cvar_val = cast(float, dd_neg.filter(mask).mean())

    # With few observations the tail below the threshold can be empty, in
    # which case there is no tail mean to negate.
    if cvar_val is None:
        return float(np.nan)

    pitfall = -cvar_val / std_val
    ui = self._ulcer_index_series(series)
    denominator = ui * pitfall
    return float(np.nan) if denominator == 0 else (float(series.sum()) - rf) / denominator

416 

@columnwise_stat
def win_rate(self, series: pl.Series) -> float:
    """Calculate the win ratio for a period.

    The win rate is the number of strictly positive returns divided by the
    number of non-zero returns.

    Args:
        series (pl.Series): The series to calculate win rate for.

    Returns:
        float: The win rate value, or NaN when the series contains no
        non-zero returns (the ratio is undefined).

    """
    num_pos = series.filter(series > 0).count()
    num_nonzero = series.filter(series != 0).count()

    # Integer division by zero would raise; an all-flat or empty series
    # simply has no defined win rate.
    if num_nonzero == 0:
        return float(np.nan)

    return float(num_pos / num_nonzero)

431 

@columnwise_stat
def autocorr_penalty(self, series: pl.Series) -> float:
    """Compute the autocorrelation penalty for risk-adjusted metrics.

    Autocorrelated returns can inflate Sharpe and Sortino ratios; this factor
    is ``sqrt(1 + 2 * sum(((n - k) / n) * rho ** k))`` for ``k = 1 .. n - 1``,
    where ``rho`` is the absolute lag-1 correlation of the non-null returns.

    Args:
        series (pl.Series): The returns to evaluate.

    Returns:
        float: Autocorrelation penalty factor.

    """
    values = series.drop_nulls().to_numpy()
    size = len(values)

    lag_one = np.corrcoef(values[:-1], values[1:])[0, 1]
    rho = float(np.abs(lag_one))

    lags = np.arange(1, size)
    weights = (size - lags) / size
    weighted_sum = np.sum(weights * (rho**lags))
    return float(np.sqrt(1 + 2 * weighted_sum))

452 

@staticmethod
def _max_consecutive(mask: pl.Series) -> int:
    """Find the length of the longest run of True values in *mask*.

    Each row is numbered by its 1-based position inside its run (runs are
    identified with ``rle_id``); multiplying by the 0/1 mask zeroes the
    positions that belong to False runs, so the maximum is the longest
    True streak.

    Args:
        mask (pl.Series): Boolean series (True = qualifying period).

    Returns:
        int: Length of the longest consecutive True run, or 0 when the
        maximum is missing.

    """
    runs = pl.DataFrame({"v": mask.cast(pl.Int32), "g": mask.rle_id()})
    position_in_run = (pl.int_range(pl.len()).over("g") + 1).alias("rank")
    streak = pl.col("v") * pl.col("rank")

    longest = runs.with_columns(position_in_run).select(streak.max()).item()
    if longest is None:
        return 0
    return int(longest)

472 

@columnwise_stat
def consecutive_wins(self, series: pl.Series) -> int:
    """Find the longest streak of winning (strictly positive) periods.

    Args:
        series (pl.Series): The returns to evaluate.

    Returns:
        int: Maximum number of consecutive winning periods.

    """
    is_win = series > 0
    return self._max_consecutive(is_win)

485 

@columnwise_stat
def consecutive_losses(self, series: pl.Series) -> int:
    """Find the longest streak of losing (strictly negative) periods.

    Args:
        series (pl.Series): The returns to evaluate.

    Returns:
        int: Maximum number of consecutive losing periods.

    """
    is_loss = series < 0
    return self._max_consecutive(is_loss)

498 

@columnwise_stat
def risk_of_ruin(self, series: pl.Series) -> float:
    """Calculate the risk of ruin (probability of losing all capital).

    Uses the formula: ((1 - win_rate) / (1 + win_rate)) ^ n,
    where n is the number of periods and win_rate is the share of strictly
    positive returns among the non-zero returns.

    Args:
        series (pl.Series): The series to calculate risk of ruin for.

    Returns:
        float: The risk of ruin probability, or NaN when the series contains
        no non-zero returns (the win rate is undefined).

    """
    num_pos = series.filter(series > 0).count()
    num_nonzero = series.filter(series != 0).count()

    # Without any non-zero return the win rate would divide by zero.
    if num_nonzero == 0:
        return float(np.nan)

    wins = float(num_pos / num_nonzero)
    n = series.len()
    return ((1 - wins) / (1 + wins)) ** n

518 

@columnwise_stat
def tail_ratio(self, series: pl.Series, cutoff: float = 0.95) -> float:
    """Compute the tail ratio (right tail / left tail).

    Compares the upper and lower tails of the return distribution as
    ``abs(quantile(cutoff) / quantile(1 - cutoff))`` using linear
    interpolation for both quantiles.

    Args:
        series (pl.Series): The returns to evaluate.
        cutoff (float): Percentile cutoff for tail analysis. Defaults to 0.95.

    Returns:
        float: Tail ratio, or NaN when either quantile is missing or the
        lower quantile is zero.

    """
    right = cast(float, series.quantile(cutoff, interpolation="linear"))
    left = cast(float, series.quantile(1 - cutoff, interpolation="linear"))

    undefined = right is None or left is None or left == 0
    if undefined:
        return float(np.nan)
    return float(np.abs(right / left))

539 

def cpc_index(self) -> dict[str, float]:
    """Compute the CPC Index: profit factor * win rate * win/loss ratio.

    Returns:
        dict[str, float]: Dictionary mapping asset names to CPC Index values.

    """
    profit_factors = self.profit_factor()
    win_rates = self.win_rate()
    payoffs = self.win_loss_ratio()
    return {name: factor * win_rates[name] * payoffs[name] for name, factor in profit_factors.items()}

551 

def common_sense_ratio(self) -> dict[str, float]:
    """Compute the Common Sense Ratio: profit factor * tail ratio.

    Returns:
        dict[str, float]: Dictionary mapping asset names to Common Sense Ratio values.

    """
    profit_factors = self.profit_factor()
    tail_ratios = self.tail_ratio()
    return {name: factor * tail_ratios[name] for name, factor in profit_factors.items()}

562 

def outliers(self, quantile: float = 0.95) -> dict[str, pl.Series]:
    """Select, per asset, only the returns above a quantile threshold.

    Args:
        quantile (float): Upper quantile threshold. Defaults to 0.95.

    Returns:
        dict[str, pl.Series]: For each asset, the returns strictly greater
        than that asset's quantile value (linear interpolation), with nulls
        dropped.

    """
    selected = {}
    for name, returns in self.data.items():
        cutoff = cast(float, returns.quantile(quantile, interpolation="linear"))
        above = returns.filter(returns > cutoff)
        selected[name] = above.drop_nulls()
    return selected

579 

def remove_outliers(self, quantile: float = 0.95) -> dict[str, pl.Series]:
    """Drop, per asset, the returns at or above a quantile threshold.

    Args:
        quantile (float): Upper quantile threshold. Defaults to 0.95.

    Returns:
        dict[str, pl.Series]: For each asset, the returns strictly less than
        that asset's quantile value (linear interpolation).

    """
    kept = {}
    for name, returns in self.data.items():
        cutoff = cast(float, returns.quantile(quantile, interpolation="linear"))
        below = returns.filter(returns < cutoff)
        kept[name] = below
    return kept

596 

@columnwise_stat
def outlier_win_ratio(self, series: pl.Series, quantile: float = 0.99) -> float:
    """Compute the outlier winners ratio.

    Divides the high-quantile return by the mean of the non-negative
    returns (``>= 0``), indicating how much outlier wins contribute to
    overall performance.

    Args:
        series (pl.Series): The returns to evaluate.
        quantile (float): Quantile for the outlier threshold. Defaults to 0.99.

    Returns:
        float: Outlier win ratio, or NaN when the non-negative mean is
        missing or zero.

    """
    baseline = cast(float, series.filter(series >= 0).mean())
    if not baseline:  # covers both a missing mean and an exact zero
        return float(np.nan)

    extreme = cast(float, series.quantile(quantile, interpolation="linear"))
    return float(extreme / baseline)

617 

@columnwise_stat
def outlier_loss_ratio(self, series: pl.Series, quantile: float = 0.01) -> float:
    """Compute the outlier losers ratio.

    Divides the low-quantile return by the mean of the negative returns,
    indicating how much outlier losses contribute to overall risk.

    Args:
        series (pl.Series): The returns to evaluate.
        quantile (float): Quantile for the outlier threshold. Defaults to 0.01.

    Returns:
        float: Outlier loss ratio, or NaN when the negative mean is missing
        or zero.

    """
    baseline = cast(float, series.filter(series < 0).mean())
    if not baseline:  # covers both a missing mean and an exact zero
        return float(np.nan)

    extreme = cast(float, series.quantile(quantile, interpolation="linear"))
    return float(extreme / baseline)

638 

@columnwise_stat
def gain_to_pain_ratio(self, series: pl.Series) -> float:
    """Compute Jack Schwager's Gain-to-Pain Ratio.

    The ratio is the sum of all returns divided by the sum of the absolute
    values of the negative returns.

    Args:
        series (pl.Series): The returns to evaluate.

    Returns:
        float: The gain to pain ratio, or NaN when the total pain is zero.

    """
    net_gain = float(series.sum())
    pain = float(series.filter(series < 0).abs().sum())

    # Float division only fails for a zero divisor, so test for it directly.
    if pain == 0:
        return float(np.nan)
    return float(net_gain / pain)

658 

@columnwise_stat
def risk_return_ratio(self, series: pl.Series) -> float:
    """Compute the return/risk ratio (mean divided by standard deviation).

    This corresponds to a Sharpe ratio without a risk-free rate.

    Args:
        series (pl.Series): The returns to evaluate.

    Returns:
        float: The risk return ratio.  A missing mean counts as 0.0 and a
        missing standard deviation as 1.0.

    """
    average = cast(float, series.mean())
    dispersion = cast(float, series.std())

    numerator = 0.0 if average is None else average
    denominator = 1.0 if dispersion is None else dispersion
    return numerator / denominator

675 

def kelly_criterion(self) -> dict[str, float]:
    """Compute the optimal capital allocation per column.

    Applies the Kelly Criterion ``f* = (b * p - q) / b`` with:
        - b = payoff ratio
        - p = win rate
        - q = 1 - p.

    Returns:
        dict[str, float]: Dictionary mapping asset names to Kelly criterion values.

    """
    payoffs = self.payoff_ratio()
    win_rates = self.win_rate()

    fractions = {}
    for name, payoff in payoffs.items():
        win_prob = win_rates[name]
        loss_prob = 1 - win_prob
        fractions[name] = ((payoff * win_prob) - loss_prob) / payoff
    return fractions

693 

@columnwise_stat
def best(self, series: pl.Series) -> float | None:
    """Return the largest return in the series (best period).

    Args:
        series (pl.Series): The returns to evaluate.

    Returns:
        float | None: The maximum return, or None when ``Series.max()``
        yields no value.

    """
    return cast(float, series.max())

707 

@columnwise_stat
def worst(self, series: pl.Series) -> float | None:
    """Return the smallest return in the series (worst period).

    Args:
        series (pl.Series): The returns to evaluate.

    Returns:
        float | None: The minimum return, or None when ``Series.min()``
        yields no value.

    """
    return cast(float, series.min())

721 

@columnwise_stat
def exposure(self, series: pl.Series) -> float:
    """Compute the market exposure time (share of periods with returns != 0).

    The count of non-zero returns is divided by the height of ``self.all``
    and rounded up to two decimal places.

    Args:
        series (pl.Series): The returns to evaluate.

    Returns:
        float: The exposure value.

    """
    frame = cast(pl.DataFrame, self.all)
    active_periods = series.filter(series != 0).count()
    share = active_periods / frame.height
    return math.ceil(share * 100) / 100

736 

@staticmethod
def _pearson_corr_shifted(series: pl.Series, lag: int) -> float:
    """Correlate *series* with a copy of itself shifted by *lag* positions.

    Rows where either the original or the shifted value is null are
    discarded before the correlation is computed.

    Args:
        series (pl.Series): The input series.
        lag (int): Number of positions to shift.

    Returns:
        float: Pearson correlation coefficient, or NaN if no valid pairs remain.

    """
    pairs = pl.DataFrame({"x": series, "y": series.shift(lag)}).drop_nulls()
    if pairs.is_empty():
        return float("nan")

    matrix = np.corrcoef(pairs["x"].to_numpy(), pairs["y"].to_numpy())
    return float(matrix[0, 1])

754 

@columnwise_stat
def autocorr(self, series: pl.Series, lag: int = 1) -> float:
    """Compute the lag-n autocorrelation of returns.

    Args:
        series (pl.Series): The returns to evaluate.
        lag (int): Number of periods to lag. Must be a positive integer.

    Returns:
        float: Pearson correlation between returns and their lagged values.

    Raises:
        TypeError: If *lag* is not an ``int``.
        ValueError: If *lag* is not a positive integer (>= 1).

    """
    # Validate the type first so the range check below is well defined.
    if not isinstance(lag, int):
        type_error = f"lag must be an int, got {type(lag).__name__}"
        raise TypeError(type_error)

    if lag < 1:
        range_error = f"lag must be a positive integer, got {lag}"
        raise ValueError(range_error)

    return self._pearson_corr_shifted(series, lag)

778 

def acf(self, nlags: int = 20) -> pl.DataFrame:
    """Compute the autocorrelation function for lags 0 through *nlags*.

    Lag 0 is fixed at 1.0; every other lag is the Pearson correlation
    between the series and its shifted copy.

    Args:
        nlags (int): Maximum number of lags to include. Default is 20.

    Returns:
        pl.DataFrame: DataFrame with a ``lag`` column (0..nlags) and one
        column per asset containing the ACF values.

    Raises:
        TypeError: If *nlags* is not an ``int``.
        ValueError: If *nlags* is negative.

    """
    if not isinstance(nlags, int):
        type_error = f"nlags must be an int, got {type(nlags).__name__}"
        raise TypeError(type_error)

    if nlags < 0:
        range_error = f"nlags must be non-negative, got {nlags}"
        raise ValueError(range_error)

    columns: dict[str, list[float]] = {"lag": list(range(nlags + 1))}
    positive_lags = range(1, nlags + 1)
    for name, returns in self.data.items():
        correlations = [self._pearson_corr_shifted(returns, k) for k in positive_lags]
        columns[name] = [1.0, *correlations]
    return pl.DataFrame(columns)