Coverage for src / jquantstats / _portfolio_attribution.py: 100%

33 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-07 14:36 +0000

1"""Tilt/timing attribution mixin for Portfolio.""" 

2 

3from __future__ import annotations 

4 

5import contextlib 

6from typing import TYPE_CHECKING, Self 

7 

8import polars as pl 

9 

10if TYPE_CHECKING: 

11 pass 

12 

13 

14class PortfolioAttributionMixin: 

15 """Mixin providing tilt/timing attribution properties for Portfolio.""" 

16 

17 if TYPE_CHECKING: 

18 cashposition: pl.DataFrame 

19 prices: pl.DataFrame 

20 aum: float 

21 cost_per_unit: float 

22 cost_bps: float 

23 assets: list[str] 

24 nav_accumulated: pl.DataFrame 

25 _tilt_cache: Self | None 

26 

27 @classmethod 

28 def from_cash_position( 

29 cls, 

30 prices: pl.DataFrame, 

31 cash_position: pl.DataFrame, 

32 aum: float, 

33 cost_per_unit: float = 0.0, 

34 cost_bps: float = 0.0, 

35 ) -> Self: 

36 """Create a Portfolio directly from cash positions aligned with prices.""" 

37 ... 

38 

39 @property 

40 def tilt(self) -> Self: 

41 """Return the 'tilt' portfolio with constant average weights. 

42 

43 Computes the time-average of each asset's cash position (ignoring 

44 nulls/NaNs) and builds a new Portfolio with those constant weights 

45 applied across time. Prices and AUM are preserved. 

46 

47 The result is cached after first access so repeated calls are O(1). 

48 

49 Note: 

50 Caching is not thread-safe. Concurrent access from multiple 

51 threads may trigger redundant computation, but will never produce 

52 incorrect results because each thread stores the same deterministic 

53 value. 

54 """ 

55 cache = getattr(self, "_tilt_cache", None) 

56 if cache is not None: 

57 return cache 

58 const_position = self.cashposition.with_columns( 

59 pl.col(col).drop_nulls().drop_nans().mean().alias(col) for col in self.assets 

60 ) 

61 result = type(self).from_cash_position( 

62 self.prices, 

63 const_position, 

64 aum=self.aum, 

65 cost_per_unit=self.cost_per_unit, 

66 cost_bps=self.cost_bps, 

67 ) 

68 with contextlib.suppress(AttributeError, TypeError): 

69 object.__setattr__(self, "_tilt_cache", result) 

70 return result 

71 

72 @property 

73 def timing(self) -> Self: 

74 """Return the 'timing' portfolio capturing deviations from the tilt. 

75 

76 Constructs weights as original cash positions minus the tilt's 

77 constant positions, per asset. This isolates timing (alloc-demeaned) 

78 effects. Prices and AUM are preserved. 

79 """ 

80 const_position = self.tilt.cashposition 

81 position = self.cashposition.with_columns((pl.col(col) - const_position[col]).alias(col) for col in self.assets) 

82 return type(self).from_cash_position( 

83 self.prices, 

84 position, 

85 aum=self.aum, 

86 cost_per_unit=self.cost_per_unit, 

87 cost_bps=self.cost_bps, 

88 ) 

89 

90 @property 

91 def tilt_timing_decomp(self) -> pl.DataFrame: 

92 """Return the portfolio's tilt/timing NAV decomposition. 

93 

94 When a ``'date'`` column is present the three NAV series are joined on 

95 it. When data is integer-indexed the frames are stacked horizontally. 

96 """ 

97 if "date" in self.nav_accumulated.columns: 

98 nav_portfolio = self.nav_accumulated.select(["date", "NAV_accumulated"]) 

99 nav_tilt = self.tilt.nav_accumulated.select(["date", "NAV_accumulated"]) 

100 nav_timing = self.timing.nav_accumulated.select(["date", "NAV_accumulated"]) 

101 

102 merged_df = nav_portfolio.join(nav_tilt, on="date", how="inner", suffix="_tilt").join( 

103 nav_timing, on="date", how="inner", suffix="_timing" 

104 ) 

105 else: 

106 nav_portfolio = self.nav_accumulated.select(["NAV_accumulated"]) 

107 nav_tilt = self.tilt.nav_accumulated.select(["NAV_accumulated"]).rename( 

108 {"NAV_accumulated": "NAV_accumulated_tilt"} 

109 ) 

110 nav_timing = self.timing.nav_accumulated.select(["NAV_accumulated"]).rename( 

111 {"NAV_accumulated": "NAV_accumulated_timing"} 

112 ) 

113 merged_df = nav_portfolio.hstack(nav_tilt).hstack(nav_timing) 

114 

115 merged_df = merged_df.rename( 

116 {"NAV_accumulated_tilt": "tilt", "NAV_accumulated_timing": "timing", "NAV_accumulated": "portfolio"} 

117 ) 

118 return merged_df