Coverage for src / jquantstats / _portfolio_nav.py: 100%

65 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-07 14:36 +0000

1"""NAV & returns chain mixin for Portfolio.""" 

2 

3from __future__ import annotations 

4 

5import contextlib 

6from typing import TYPE_CHECKING 

7 

8import polars as pl 

9 

10from .exceptions import MissingDateColumnError 

11 

12 

class PortfolioNavMixin:
    """Mixin providing NAV & returns chain properties for Portfolio.

    The properties read ``self.prices``, ``self.cashposition`` and
    ``self.aum`` and call ``self._assert_clean_series``; all of these must be
    supplied by the host class. The ``TYPE_CHECKING`` block below only
    declares them for static analysis and defines nothing at runtime.
    """

    if TYPE_CHECKING:
        prices: pl.DataFrame
        cashposition: pl.DataFrame
        aum: float
        _profits_cache: pl.DataFrame | None
        _returns_cache: pl.DataFrame | None

        @staticmethod
        def _assert_clean_series(series: pl.Series, name: str = "") -> None:
            """Raise ValueError if *series* contains nulls or non-finite values."""
            ...

    @property
    def profits(self) -> pl.DataFrame:
        """Compute per-asset daily cash profits, preserving non-numeric columns.

        For every numeric column of ``prices`` the profit of a row is the
        price percentage change into that row multiplied by the cash position
        of the previous row. Missing and non-finite results are replaced by
        ``0.0``.

        Returns:
            pl.DataFrame: Per-asset daily profit series along with any
            non-numeric columns (e.g., ``'date'``).

        The result is cached after first access so repeated calls are O(1).

        Note:
            Caching is not thread-safe. Concurrent access from multiple
            threads may trigger redundant computation, but will never produce
            incorrect results because each thread stores the same deterministic
            value.

        Examples:
            >>> from jquantstats.portfolio import Portfolio
            >>> import polars as pl
            >>> prices = pl.DataFrame({"A": [100.0, 110.0, 105.0]})
            >>> pos = pl.DataFrame({"A": [1000.0, 1000.0, 1000.0]})
            >>> pf = Portfolio(prices=prices, cashposition=pos, aum=1e6)
            >>> pf.profits.columns
            ['A']
        """
        cached = getattr(self, "_profits_cache", None)
        if cached is not None:
            return cached

        prices = self.prices
        assets = [c for c in prices.columns if prices[c].dtype.is_numeric()]

        # The first row has neither a previous price nor a previous position;
        # both nulls are filled with 0.0 so the first profit is 0.0.
        result = prices.with_columns(
            (prices[asset].pct_change().fill_null(0.0) * self.cashposition[asset].shift(n=1).fill_null(0.0)).alias(
                asset
            )
            for asset in assets
        )

        if assets:
            # Replace inf/NaN (e.g. a percentage change from a zero price) and
            # any remaining nulls by 0.0.
            result = result.with_columns(
                pl.when(pl.col(c).is_finite()).then(pl.col(c)).otherwise(0.0).fill_null(0.0).alias(c) for c in assets
            )

        # Best-effort cache write: object.__setattr__ is used instead of plain
        # assignment (presumably because the host may restrict attribute
        # assignment -- TODO confirm); a host that cannot store the attribute
        # simply recomputes on the next access.
        with contextlib.suppress(AttributeError, TypeError):
            object.__setattr__(self, "_profits_cache", result)
        return result

    @property
    def profit(self) -> pl.DataFrame:
        """Return total daily portfolio profit including the ``'date'`` column.

        Aggregates per-asset profits into a single ``'profit'`` column and
        validates that no day's total profit is NaN/null. Non-numeric columns
        of ``profits`` (e.g. ``'date'``) are kept in front of ``'profit'``.

        Raises:
            ValueError: If ``profits`` contains no numeric (asset) column, or
                if ``_assert_clean_series`` rejects the aggregated series.
        """
        df_profits = self.profits
        assets = [c for c in df_profits.columns if df_profits[c].dtype.is_numeric()]

        if not assets:
            raise ValueError("Cannot compute portfolio profit: no numeric asset columns found.")

        # Build the lookup set once instead of once per column.
        asset_names = set(assets)
        non_assets = [c for c in df_profits.columns if c not in asset_names]

        portfolio_daily_profit = pl.sum_horizontal([pl.col(c).fill_null(0.0) for c in assets]).alias("profit")
        result = df_profits.select([*non_assets, portfolio_daily_profit])

        self._assert_clean_series(series=result["profit"])
        return result

    @property
    def nav_accumulated(self) -> pl.DataFrame:
        """Compute cumulative additive NAV of the portfolio, preserving ``'date'``.

        Adds ``'NAV_accumulated'`` = cumulative sum of ``'profit'`` plus
        ``aum`` to the frame returned by ``profit``.
        """
        return self.profit.with_columns((pl.col("profit").cum_sum() + self.aum).alias("NAV_accumulated"))

    @property
    def returns(self) -> pl.DataFrame:
        """Return daily returns as profit scaled by AUM, preserving ``'date'``.

        The returned DataFrame is the ``nav_accumulated`` frame (non-numeric
        columns such as ``'date'``, the unscaled ``'profit'`` column and
        ``'NAV_accumulated'``) with an additional ``'returns'`` column equal
        to ``profit / aum``.

        The result is cached after first access so repeated calls are O(1).

        Note:
            Caching is not thread-safe. Concurrent access from multiple
            threads may trigger redundant computation, but will never produce
            incorrect results because each thread stores the same deterministic
            value.
        """
        cached = getattr(self, "_returns_cache", None)
        if cached is not None:
            return cached

        result = self.nav_accumulated.with_columns(
            (pl.col("profit") / self.aum).alias("returns"),
        )

        # Best-effort cache write, same approach as in ``profits``.
        with contextlib.suppress(AttributeError, TypeError):
            object.__setattr__(self, "_returns_cache", result)
        return result

    @property
    def monthly(self) -> pl.DataFrame:
        """Return monthly compounded returns and calendar columns.

        Aggregates daily returns (profit/AUM) by calendar month and computes
        the compounded monthly return: prod(1 + r_d) - 1. The resulting frame
        includes:

        - ``date``: label of the month, taken from the start of the grouping
          window (``label="left"``), i.e. the first day of the month
        - ``returns``: compounded monthly return
        - ``NAV_accumulated``: last NAV within the month
        - ``profit``: summed profit within the month
        - ``year``: integer year (e.g., 2020)
        - ``month``: integer month number (1-12)
        - ``month_name``: abbreviated month name (e.g., ``"Jan"``, ``"Feb"``)

        Raises:
            MissingDateColumnError: If the portfolio data has no ``'date'``
                column.
        """
        if "date" not in self.prices.columns:
            raise MissingDateColumnError("monthly")

        daily = self.returns.select(["date", "returns", "profit", "NAV_accumulated"])

        # Windows are [month start, next month start). With closed="right" an
        # observation dated exactly on the 1st would be assigned to the
        # previous month's window, which contradicts calendar-month grouping.
        grouped = daily.group_by_dynamic(
            "date",
            every="1mo",
            period="1mo",
            label="left",
            closed="left",
        ).agg(
            [
                pl.col("profit").sum().alias("profit"),
                pl.col("NAV_accumulated").last().alias("NAV_accumulated"),
                (pl.col("returns") + 1.0).product().alias("gross"),
            ]
        )

        monthly = (
            grouped.with_columns((pl.col("gross") - 1.0).alias("returns"))
            .select(["date", "returns", "NAV_accumulated", "profit"])
            .with_columns(
                [
                    pl.col("date").dt.year().alias("year"),
                    pl.col("date").dt.month().alias("month"),
                    pl.col("date").dt.strftime("%b").alias("month_name"),
                ]
            )
            .sort("date")
        )
        return monthly

    @property
    def nav_compounded(self) -> pl.DataFrame:
        """Compute compounded NAV from returns (profit/AUM), preserving ``'date'``.

        Adds ``'NAV_compounded'`` = cumulative product of ``1 + returns``
        multiplied by ``aum`` to the frame returned by ``returns``.
        """
        return self.returns.with_columns(((pl.col("returns") + 1.0).cum_prod() * self.aum).alias("NAV_compounded"))

    @property
    def highwater(self) -> pl.DataFrame:
        """Return the cumulative maximum of NAV as the high-water mark series.

        The resulting DataFrame preserves the ``'date'`` column and adds a
        ``'highwater'`` column computed as the cumulative maximum of
        ``'NAV_accumulated'``.
        """
        return self.returns.with_columns(pl.col("NAV_accumulated").cum_max().alias("highwater"))

    @property
    def drawdown(self) -> pl.DataFrame:
        """Return drawdown as the distance from high-water mark to current NAV.

        Adds two columns to the ``highwater`` frame, preserving ``'date'``
        and the intermediate columns:

        - ``'drawdown'`` = ``'highwater'`` - ``'NAV_accumulated'``
        - ``'drawdown_pct'`` = ``'drawdown'`` / ``'highwater'``

        Note:
            ``'drawdown_pct'`` is a plain division; rows where ``'highwater'``
            is zero are not special-cased.
        """
        distance = pl.col("highwater") - pl.col("NAV_accumulated")
        return self.highwater.with_columns(
            distance.alias("drawdown"),
            (distance / pl.col("highwater")).alias("drawdown_pct"),
        )

    @property
    def all(self) -> pl.DataFrame:
        """Return a merged view of drawdown and compounded NAV.

        When a ``'date'`` column is present the two frames are joined on that
        column to ensure temporal alignment. When the data is integer-indexed
        (no ``'date'`` column) the frames are stacked horizontally — they are
        guaranteed to have identical row counts because both are derived from
        the same source portfolio.
        """
        left = self.drawdown
        if "date" in left.columns:
            right = self.nav_compounded.select(["date", "NAV_compounded"])
            return left.join(right, on="date", how="inner")

        right = self.nav_compounded.select(["NAV_compounded"])
        return left.hstack(right)

220 else: 

221 right = self.nav_compounded.select(["NAV_compounded"]) 

222 return left.hstack(right)