Coverage for src / jquantstats / _portfolio_nav.py: 100%
65 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-07 14:36 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-07 14:36 +0000
1"""NAV & returns chain mixin for Portfolio."""
3from __future__ import annotations
5import contextlib
6from typing import TYPE_CHECKING
8import polars as pl
10from .exceptions import MissingDateColumnError
13class PortfolioNavMixin:
14 """Mixin providing NAV & returns chain properties for Portfolio."""
16 if TYPE_CHECKING:
17 prices: pl.DataFrame
18 cashposition: pl.DataFrame
19 aum: float
20 _profits_cache: pl.DataFrame | None
21 _returns_cache: pl.DataFrame | None
23 @staticmethod
24 def _assert_clean_series(series: pl.Series, name: str = "") -> None:
25 """Raise ValueError if *series* contains nulls or non-finite values."""
26 ...
28 @property
29 def profits(self) -> pl.DataFrame:
30 """Compute per-asset daily cash profits, preserving non-numeric columns.
32 Returns:
33 pl.DataFrame: Per-asset daily profit series along with any
34 non-numeric columns (e.g., ``'date'``).
36 The result is cached after first access so repeated calls are O(1).
38 Note:
39 Caching is not thread-safe. Concurrent access from multiple
40 threads may trigger redundant computation, but will never produce
41 incorrect results because each thread stores the same deterministic
42 value.
44 Examples:
45 >>> from jquantstats.portfolio import Portfolio
46 >>> import polars as pl
47 >>> prices = pl.DataFrame({"A": [100.0, 110.0, 105.0]})
48 >>> pos = pl.DataFrame({"A": [1000.0, 1000.0, 1000.0]})
49 >>> pf = Portfolio(prices=prices, cashposition=pos, aum=1e6)
50 >>> pf.profits.columns
51 ['A']
52 """
53 cache = getattr(self, "_profits_cache", None)
54 if cache is not None:
55 return cache
57 assets = [c for c in self.prices.columns if self.prices[c].dtype.is_numeric()]
59 result = self.prices.with_columns(
60 (self.prices[asset].pct_change().fill_null(0.0) * self.cashposition[asset].shift(n=1).fill_null(0.0)).alias(
61 asset
62 )
63 for asset in assets
64 )
66 if assets:
67 result = result.with_columns(
68 pl.when(pl.col(c).is_finite()).then(pl.col(c)).otherwise(0.0).fill_null(0.0).alias(c) for c in assets
69 )
71 with contextlib.suppress(AttributeError, TypeError):
72 object.__setattr__(self, "_profits_cache", result)
73 return result
75 @property
76 def profit(self) -> pl.DataFrame:
77 """Return total daily portfolio profit including the ``'date'`` column.
79 Aggregates per-asset profits into a single ``'profit'`` column and
80 validates that no day's total profit is NaN/null.
81 """
82 df_profits = self.profits
83 assets = [c for c in df_profits.columns if df_profits[c].dtype.is_numeric()]
85 if not assets:
86 raise ValueError
88 non_assets = [c for c in df_profits.columns if c not in set(assets)]
90 portfolio_daily_profit = pl.sum_horizontal([pl.col(c).fill_null(0.0) for c in assets]).alias("profit")
91 result = df_profits.select([*non_assets, portfolio_daily_profit])
93 self._assert_clean_series(series=result["profit"])
94 return result
96 @property
97 def nav_accumulated(self) -> pl.DataFrame:
98 """Compute cumulative additive NAV of the portfolio, preserving ``'date'``."""
99 return self.profit.with_columns((pl.col("profit").cum_sum() + self.aum).alias("NAV_accumulated"))
101 @property
102 def returns(self) -> pl.DataFrame:
103 """Return daily returns as profit scaled by AUM, preserving ``'date'``.
105 The returned DataFrame contains the original ``'date'`` column with the
106 ``'profit'`` column scaled by AUM (i.e., per-period returns), and also
107 an additional convenience column named ``'returns'`` with the same
108 values for downstream consumers.
110 The result is cached after first access so repeated calls are O(1).
112 Note:
113 Caching is not thread-safe. Concurrent access from multiple
114 threads may trigger redundant computation, but will never produce
115 incorrect results because each thread stores the same deterministic
116 value.
117 """
118 cache = getattr(self, "_returns_cache", None)
119 if cache is not None:
120 return cache
121 result = self.nav_accumulated.with_columns(
122 (pl.col("profit") / self.aum).alias("returns"),
123 )
124 with contextlib.suppress(AttributeError, TypeError):
125 object.__setattr__(self, "_returns_cache", result)
126 return result
128 @property
129 def monthly(self) -> pl.DataFrame:
130 """Return monthly compounded returns and calendar columns.
132 Aggregates daily returns (profit/AUM) by calendar month and computes
133 the compounded monthly return: prod(1 + r_d) - 1. The resulting frame
134 includes:
136 - ``date``: month-end label as a Polars Date (end of the grouping window)
137 - ``returns``: compounded monthly return
138 - ``NAV_accumulated``: last NAV within the month
139 - ``profit``: summed profit within the month
140 - ``year``: integer year (e.g., 2020)
141 - ``month``: integer month number (1-12)
142 - ``month_name``: abbreviated month name (e.g., ``"Jan"``, ``"Feb"``)
144 Raises:
145 MissingDateColumnError: If the portfolio data has no ``'date'``
146 column.
147 """
148 if "date" not in self.prices.columns:
149 raise MissingDateColumnError("monthly")
150 daily = self.returns.select(["date", "returns", "profit", "NAV_accumulated"])
151 monthly = (
152 daily.group_by_dynamic(
153 "date",
154 every="1mo",
155 period="1mo",
156 label="left",
157 closed="right",
158 )
159 .agg(
160 [
161 pl.col("profit").sum().alias("profit"),
162 pl.col("NAV_accumulated").last().alias("NAV_accumulated"),
163 (pl.col("returns") + 1.0).product().alias("gross"),
164 ]
165 )
166 .with_columns((pl.col("gross") - 1.0).alias("returns"))
167 .select(["date", "returns", "NAV_accumulated", "profit"])
168 .with_columns(
169 [
170 pl.col("date").dt.year().alias("year"),
171 pl.col("date").dt.month().alias("month"),
172 pl.col("date").dt.strftime("%b").alias("month_name"),
173 ]
174 )
175 .sort("date")
176 )
177 return monthly
179 @property
180 def nav_compounded(self) -> pl.DataFrame:
181 """Compute compounded NAV from returns (profit/AUM), preserving ``'date'``."""
182 return self.returns.with_columns(((pl.col("returns") + 1.0).cum_prod() * self.aum).alias("NAV_compounded"))
184 @property
185 def highwater(self) -> pl.DataFrame:
186 """Return the cumulative maximum of NAV as the high-water mark series.
188 The resulting DataFrame preserves the ``'date'`` column and adds a
189 ``'highwater'`` column computed as the cumulative maximum of
190 ``'NAV_accumulated'``.
191 """
192 return self.returns.with_columns(pl.col("NAV_accumulated").cum_max().alias("highwater"))
194 @property
195 def drawdown(self) -> pl.DataFrame:
196 """Return drawdown as the distance from high-water mark to current NAV.
198 Computes ``'drawdown'`` = ``'highwater'`` - ``'NAV_accumulated'`` and
199 preserves the ``'date'`` column alongside the intermediate columns.
200 """
201 return self.highwater.with_columns(
202 (pl.col("highwater") - pl.col("NAV_accumulated")).alias("drawdown"),
203 ((pl.col("highwater") - pl.col("NAV_accumulated")) / pl.col("highwater")).alias("drawdown_pct"),
204 )
206 @property
207 def all(self) -> pl.DataFrame:
208 """Return a merged view of drawdown and compounded NAV.
210 When a ``'date'`` column is present the two frames are joined on that
211 column to ensure temporal alignment. When the data is integer-indexed
212 (no ``'date'`` column) the frames are stacked horizontally — they are
213 guaranteed to have identical row counts because both are derived from
214 the same source portfolio.
215 """
216 left = self.drawdown
217 if "date" in left.columns:
218 right = self.nav_compounded.select(["date", "NAV_compounded"])
219 return left.join(right, on="date", how="inner")
220 else:
221 right = self.nav_compounded.select(["NAV_compounded"])
222 return left.hstack(right)