Coverage for src/jquantstats/_portfolio_nav.py: 100%

57 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-23 06:13 +0000

1"""NAV & returns chain mixin for Portfolio.""" 

2 

3from __future__ import annotations 

4 

5from typing import TYPE_CHECKING 

6 

7import polars as pl 

8 

9from ._cache import cached_in_slot 

10from ._portfolio_base import _PortfolioMembers 

11from .exceptions import MissingDateColumnError, NoAssetColumnsError 

12 

13 

14class PortfolioNavMixin(_PortfolioMembers): 

15 """Mixin providing NAV & returns chain properties for Portfolio.""" 

16 

17 if TYPE_CHECKING: 

18 _profits_cache: pl.DataFrame | None 

19 _returns_cache: pl.DataFrame | None 

20 

21 @staticmethod 

22 def _assert_clean_series(series: pl.Series, name: str = "") -> None: 

23 """Raise ValueError if *series* contains nulls or non-finite values.""" 

24 ... 

25 

26 @property 

27 @cached_in_slot("_profits_cache") 

28 def profits(self) -> pl.DataFrame: 

29 """Compute per-asset daily cash profits, preserving non-numeric columns. 

30 

31 Returns: 

32 pl.DataFrame: Per-asset daily profit series along with any 

33 non-numeric columns (e.g., ``'date'``). 

34 

35 The result is cached after first access so repeated calls are O(1). 

36 

37 Note: 

38 Caching is not thread-safe. Concurrent access from multiple 

39 threads may trigger redundant computation, but will never produce 

40 incorrect results because each thread stores the same deterministic 

41 value. 

42 

43 Examples: 

44 >>> from jquantstats.portfolio import Portfolio 

45 >>> import polars as pl 

46 >>> prices = pl.DataFrame({"A": [100.0, 110.0, 105.0]}) 

47 >>> pos = pl.DataFrame({"A": [1000.0, 1000.0, 1000.0]}) 

48 >>> pf = Portfolio(prices=prices, cashposition=pos, aum=1e6) 

49 >>> pf.profits.columns 

50 ['A'] 

51 """ 

52 assets = [c for c in self.prices.columns if self.prices[c].dtype.is_numeric()] 

53 

54 result = self.prices.with_columns( 

55 (self.prices[asset].pct_change().fill_null(0.0) * self.cashposition[asset].shift(n=1).fill_null(0.0)).alias( 

56 asset 

57 ) 

58 for asset in assets 

59 ) 

60 

61 if assets: 

62 result = result.with_columns( 

63 pl.when(pl.col(c).is_finite()).then(pl.col(c)).otherwise(0.0).fill_null(0.0).alias(c) for c in assets 

64 ) 

65 

66 return result 

67 

68 @property 

69 def profit(self) -> pl.DataFrame: 

70 """Return total daily portfolio profit including the ``'date'`` column. 

71 

72 Aggregates per-asset profits into a single ``'profit'`` column and 

73 validates that no day's total profit is NaN/null. 

74 

75 Raises: 

76 NoAssetColumnsError: If the profits frame has no numeric asset columns. 

77 """ 

78 df_profits = self.profits 

79 assets = [c for c in df_profits.columns if df_profits[c].dtype.is_numeric()] 

80 

81 if not assets: 

82 raise NoAssetColumnsError("profits") 

83 

84 non_assets = [c for c in df_profits.columns if c not in set(assets)] 

85 

86 portfolio_daily_profit = pl.sum_horizontal([pl.col(c).fill_null(0.0) for c in assets]).alias("profit") 

87 result = df_profits.select([*non_assets, portfolio_daily_profit]) 

88 

89 self._assert_clean_series(series=result["profit"]) 

90 return result 

91 

92 @property 

93 def nav_accumulated(self) -> pl.DataFrame: 

94 """Compute cumulative additive NAV of the portfolio, preserving ``'date'``.""" 

95 return self.profit.with_columns((pl.col("profit").cum_sum() + self.aum).alias("NAV_accumulated")) 

96 

97 @property 

98 @cached_in_slot("_returns_cache") 

99 def returns(self) -> pl.DataFrame: 

100 """Return daily returns as profit scaled by AUM, preserving ``'date'``. 

101 

102 The returned DataFrame contains the original ``'date'`` column with the 

103 ``'profit'`` column scaled by AUM (i.e., per-period returns), and also 

104 an additional convenience column named ``'returns'`` with the same 

105 values for downstream consumers. 

106 

107 The result is cached after first access so repeated calls are O(1). 

108 

109 Note: 

110 Caching is not thread-safe. Concurrent access from multiple 

111 threads may trigger redundant computation, but will never produce 

112 incorrect results because each thread stores the same deterministic 

113 value. 

114 """ 

115 return self.nav_accumulated.with_columns( 

116 (pl.col("profit") / self.aum).alias("returns"), 

117 ) 

118 

119 @property 

120 def monthly(self) -> pl.DataFrame: 

121 """Return monthly compounded returns and calendar columns. 

122 

123 Aggregates daily returns (profit/AUM) by calendar month and computes 

124 the compounded monthly return: prod(1 + r_d) - 1. The resulting frame 

125 includes: 

126 

127 - ``date``: month-end label as a Polars Date (end of the grouping window) 

128 - ``returns``: compounded monthly return 

129 - ``NAV_accumulated``: last NAV within the month 

130 - ``profit``: summed profit within the month 

131 - ``year``: integer year (e.g., 2020) 

132 - ``month``: integer month number (1-12) 

133 - ``month_name``: abbreviated month name (e.g., ``"Jan"``, ``"Feb"``) 

134 

135 Raises: 

136 MissingDateColumnError: If the portfolio data has no ``'date'`` 

137 column. 

138 """ 

139 if "date" not in self.prices.columns: 

140 raise MissingDateColumnError("monthly") 

141 daily = self.returns.select(["date", "returns", "profit", "NAV_accumulated"]) 

142 monthly = ( 

143 daily.group_by_dynamic( 

144 "date", 

145 every="1mo", 

146 period="1mo", 

147 label="left", 

148 closed="right", 

149 ) 

150 .agg( 

151 [ 

152 pl.col("profit").sum().alias("profit"), 

153 pl.col("NAV_accumulated").last().alias("NAV_accumulated"), 

154 (pl.col("returns") + 1.0).product().alias("gross"), 

155 ] 

156 ) 

157 .with_columns((pl.col("gross") - 1.0).alias("returns")) 

158 .select(["date", "returns", "NAV_accumulated", "profit"]) 

159 .with_columns( 

160 [ 

161 pl.col("date").dt.year().alias("year"), 

162 pl.col("date").dt.month().alias("month"), 

163 pl.col("date").dt.strftime("%b").alias("month_name"), 

164 ] 

165 ) 

166 .sort("date") 

167 ) 

168 return monthly 

169 

170 @property 

171 def nav_compounded(self) -> pl.DataFrame: 

172 """Compute compounded NAV from returns (profit/AUM), preserving ``'date'``.""" 

173 return self.returns.with_columns(((pl.col("returns") + 1.0).cum_prod() * self.aum).alias("NAV_compounded")) 

174 

175 @property 

176 def highwater(self) -> pl.DataFrame: 

177 """Return the cumulative maximum of NAV as the high-water mark series. 

178 

179 The resulting DataFrame preserves the ``'date'`` column and adds a 

180 ``'highwater'`` column computed as the cumulative maximum of 

181 ``'NAV_accumulated'``. 

182 """ 

183 return self.returns.with_columns(pl.col("NAV_accumulated").cum_max().alias("highwater")) 

184 

185 @property 

186 def drawdown(self) -> pl.DataFrame: 

187 """Return drawdown as the distance from high-water mark to current NAV. 

188 

189 Computes ``'drawdown'`` = ``'highwater'`` - ``'NAV_accumulated'`` and 

190 preserves the ``'date'`` column alongside the intermediate columns. 

191 """ 

192 return self.highwater.with_columns( 

193 (pl.col("highwater") - pl.col("NAV_accumulated")).alias("drawdown"), 

194 ((pl.col("highwater") - pl.col("NAV_accumulated")) / pl.col("highwater")).alias("drawdown_pct"), 

195 ) 

196 

197 @property 

198 def all(self) -> pl.DataFrame: 

199 """Return a merged view of drawdown and compounded NAV. 

200 

201 When a ``'date'`` column is present the two frames are joined on that 

202 column to ensure temporal alignment. When the data is integer-indexed 

203 (no ``'date'`` column) the frames are stacked horizontally — they are 

204 guaranteed to have identical row counts because both are derived from 

205 the same source portfolio. 

206 """ 

207 left = self.drawdown 

208 if "date" in left.columns: 

209 right = self.nav_compounded.select(["date", "NAV_compounded"]) 

210 return left.join(right, on="date", how="inner") 

211 else: 

212 right = self.nav_compounded.select(["NAV_compounded"]) 

213 return left.hstack(right)