Coverage for src / jquantstats / _utils / _data.py: 100%
56 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-07 14:28 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-07 14:28 +0000
1"""Utility methods for Data objects — the jquantstats equivalent of qs.utils."""
3from __future__ import annotations
5import dataclasses
6import math
8import polars as pl
10from ..exceptions import MissingDateColumnError
11from ._protocol import DataLike
13__all__ = ["DataUtils"]
# Human-readable period names and the Polars interval string each one stands
# for; ``DataUtils.group_returns`` passes the looked-up value as ``every=``
# to ``group_by_dynamic``.
_PERIOD_ALIASES: dict[str, str] = dict(
    daily="1d",
    weekly="1w",
    monthly="1mo",
    quarterly="1q",
    annual="1y",
    yearly="1y",
)
@dataclasses.dataclass(frozen=True)
class DataUtils:
    """Utility transforms and conversions for financial returns data.

    Mirrors the public API of ``quantstats.utils`` but operates on Polars
    DataFrames and integrates with :class:`~jquantstats.data.Data` via the
    ``data.utils`` property.

    Every public method builds its result from ``data.index`` and
    ``data.returns`` and returns a new DataFrame.

    Attributes:
        data: Any object satisfying the :class:`~jquantstats._utils._protocol.DataLike`
            protocol — typically a :class:`~jquantstats.data.Data` instance.

    """

    data: DataLike

    def __repr__(self) -> str:
        """Return a string representation of the DataUtils object."""
        return f"DataUtils(assets={self._asset_cols()})"

    # ── helpers ───────────────────────────────────────────────────────────────

    def _combined(self) -> pl.DataFrame:
        """Return index hstacked with returns (no benchmark)."""
        return pl.concat([self.data.index, self.data.returns], how="horizontal")

    def _asset_cols(self) -> list[str]:
        """Return the asset column names from returns (excluding benchmark)."""
        return list(self.data.returns.columns)

    def _require_temporal_index(self, method: str) -> str:
        """Return the name of the date column, insisting that it is temporal.

        Args:
            method: Name of the calling method, forwarded to the exception.

        Returns:
            The first entry of ``data.date_col``.

        Raises:
            MissingDateColumnError: If ``data.date_col`` is empty or the
                corresponding index column does not have a temporal dtype.

        """
        date_cols = self.data.date_col
        if not date_cols:
            raise MissingDateColumnError(method)  # pragma: no cover
        date_col = date_cols[0]
        if not self.data.index[date_col].dtype.is_temporal():
            raise MissingDateColumnError(method)
        return date_col

    # ── public API ────────────────────────────────────────────────────────────

    def to_prices(self, base: float = 1e5) -> pl.DataFrame:
        """Convert returns to a cumulative price series.

        Computes ``base * prod(1 + r_t)`` for each asset column, matching the
        behaviour of ``quantstats.utils.to_prices``. Null returns are treated
        as ``0.0`` so that a missing observation leaves the price unchanged.

        Args:
            base: Starting value for the price series. Defaults to ``1e5``.

        Returns:
            DataFrame with the same date column (if present) and one price
            column per asset.

        """
        asset_cols = self._asset_cols()
        return self._combined().with_columns(
            [(pl.col(c).fill_null(0.0) + 1.0).cum_prod().mul(base).alias(c) for c in asset_cols]
        )

    def to_log_returns(self) -> pl.DataFrame:
        """Convert simple returns to log returns: ``ln(1 + r)``.

        Matches ``quantstats.utils.to_log_returns``. Null returns are treated
        as ``0.0`` and therefore map to a log return of ``0.0``.

        Returns:
            DataFrame with the same columns as the input returns, values
            replaced by their log-return equivalents.

        """
        asset_cols = self._asset_cols()
        # log1p evaluates ln(1 + r) without first rounding 1 + r, which keeps
        # full precision for the small returns typical of daily data.
        return self._combined().with_columns([pl.col(c).fill_null(0.0).log1p().alias(c) for c in asset_cols])

    def log_returns(self) -> pl.DataFrame:
        """Alias for :meth:`to_log_returns`.

        Matches ``quantstats.utils.log_returns``.

        Returns:
            DataFrame of log returns.

        """
        return self.to_log_returns()

    def rebase(self, base: float = 100.0) -> pl.DataFrame:
        """Normalise the returns as a price series that starts at *base*.

        Converts returns to prices via :meth:`to_prices` and then rescales
        each column so its first observation equals *base* exactly, matching
        the behaviour of ``quantstats.utils.rebase``.

        Args:
            base: Target starting value. Defaults to ``100.0``.

        Returns:
            DataFrame with price columns anchored to *base* at t = 0.

        """
        prices_df = self.to_prices(base=1.0)
        asset_cols = self._asset_cols()
        return prices_df.with_columns([(pl.col(c) / pl.col(c).first() * base).alias(c) for c in asset_cols])

    def group_returns(self, period: str = "1mo", compounded: bool = True) -> pl.DataFrame:
        """Aggregate returns by a calendar period.

        Requires a temporal (Date/Datetime) index; raises
        :exc:`~jquantstats.exceptions.MissingDateColumnError` for integer-indexed data.

        Human-readable aliases are accepted alongside native Polars interval
        strings (``"1mo"``, ``"1q"``, ``"1y"``, ``"1w"``, ``"1d"``):

        ``"daily"``, ``"weekly"``, ``"monthly"``, ``"quarterly"``,
        ``"annual"`` / ``"yearly"``.

        Args:
            period: Aggregation period. Defaults to ``"1mo"`` (monthly).
            compounded: When ``True`` (default) compound the returns
                ``prod(1 + r) - 1``; when ``False`` sum them.

        Returns:
            DataFrame with one row per period and one column per asset.

        Raises:
            MissingDateColumnError: If the index is not temporal.

        """
        date_col = self._require_temporal_index("group_returns")
        # Anything that is not a known alias is handed to Polars unchanged.
        polars_period = _PERIOD_ALIASES.get(period, period)
        asset_cols = self._asset_cols()

        if compounded:
            agg_exprs = [((pl.col(c).fill_null(0.0) + 1.0).product() - 1.0).alias(c) for c in asset_cols]
        else:
            agg_exprs = [pl.col(c).fill_null(0.0).sum().alias(c) for c in asset_cols]

        return (
            self._combined()
            .sort(date_col)  # group_by_dynamic needs the index column sorted
            .group_by_dynamic(date_col, every=polars_period)
            .agg(agg_exprs)
            .sort(date_col)
        )

    def aggregate_returns(self, period: str = "1mo", compounded: bool = True) -> pl.DataFrame:
        """Alias for :meth:`group_returns`.

        Matches ``quantstats.utils.aggregate_returns``.

        Args:
            period: Aggregation period. See :meth:`group_returns` for accepted values.
            compounded: Whether to compound returns. Defaults to ``True``.

        Returns:
            DataFrame with one row per period and one column per asset.

        """
        return self.group_returns(period=period, compounded=compounded)

    def to_excess_returns(self, rf: float = 0.0, nperiods: int | None = None) -> pl.DataFrame:
        """Subtract a risk-free rate from returns.

        When *nperiods* is supplied the annual *rf* is converted to a
        per-period rate via ``(1 + rf)^(1/nperiods) - 1``, matching
        ``quantstats.utils.to_excess_returns``.

        Args:
            rf: Annual risk-free rate as a decimal (e.g. ``0.05`` for 5 %).
                Defaults to ``0.0``.
            nperiods: Number of return periods per year used to convert *rf*
                to a per-period rate. When ``None`` *rf* is applied as-is.
                Must be strictly positive when given.

        Returns:
            DataFrame of excess returns with the same columns as the input.

        Raises:
            ValueError: If *nperiods* is zero or negative.

        """
        if nperiods is None:
            rf_per_period = rf
        else:
            # Zero would divide by zero and a negative count yields a
            # meaningless rate, so reject both up front with a clear message.
            if nperiods <= 0:
                raise ValueError(f"nperiods must be a positive number of periods per year, got {nperiods!r}")
            rf_per_period = (1.0 + rf) ** (1.0 / nperiods) - 1.0

        asset_cols = self._asset_cols()
        return self._combined().with_columns([(pl.col(c) - rf_per_period).alias(c) for c in asset_cols])

    def exponential_stdev(self, window: int = 30, is_halflife: bool = False) -> pl.DataFrame:
        """Compute the exponentially weighted standard deviation of returns.

        Matches ``quantstats.utils.exponential_stdev``. Uses Polars
        ``ewm_std`` under the hood, called with ``min_samples=1``.

        Args:
            window: Span (default) or half-life (when *is_halflife* is
                ``True``) of the exponential decay. Defaults to ``30``.
            is_halflife: When ``True`` *window* is interpreted as the
                half-life; otherwise it is the EWMA span. Defaults to
                ``False``.

        Returns:
            DataFrame of rolling EWMA standard deviations with the same
            columns as the input returns.

        """
        asset_cols = self._asset_cols()
        if is_halflife:
            exprs = [pl.col(c).ewm_std(half_life=window, min_samples=1).alias(c) for c in asset_cols]
        else:
            exprs = [pl.col(c).ewm_std(span=window, min_samples=1).alias(c) for c in asset_cols]
        return self._combined().with_columns(exprs)