Coverage for src/jquantstats/_stats/_periodic.py: 100%
83 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-23 06:13 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-23 06:13 +0000
1"""Period-bucketed reporting tables for financial returns data.
3Tabular, period-grouped views of returns: the monthly-returns pivot, the
4inlier/outlier distribution across calendar frequencies, the benchmark
5comparison table, and the worst-N-periods list.
6"""
8from __future__ import annotations
10from typing import TYPE_CHECKING, cast
12import polars as pl
14if TYPE_CHECKING:
15 from ..data import Data
17# ── Periodic reporting mixin ──────────────────────────────────────────────────
class _PeriodicReportingMixin:
    """Mixin providing period-bucketed reporting tables.

    Covers: monthly-returns pivot table, distribution across calendar
    frequencies (daily…yearly), benchmark comparison table, and worst-N periods.

    The methods read two attributes from the host class: ``_data`` (its
    ``date_col``, ``items()``, ``returns`` and ``benchmark`` members) and
    ``all`` (a frame indexed by the date column name and the asset /
    benchmark column names).
    """

    _data: Data
    all: pl.DataFrame

    if TYPE_CHECKING:
        from .._protocol import DataLike

        data: DataLike

    @staticmethod
    def _period_agg_expr(column: str, compounded: bool) -> pl.Expr:
        """Build the within-group aggregation expression for a returns column.

        Args:
            column (str): Name of the column holding the returns.
            compounded (bool): If True, compound as ``prod(1 + r) - 1``;
                otherwise use the plain sum.

        Returns:
            pl.Expr: Un-aliased aggregation expression over ``column``.

        """
        if compounded:
            return (1.0 + pl.col(column)).product() - 1.0
        return pl.col(column).sum()

    def monthly_returns(self, eoy: bool = True, compounded: bool = True) -> dict[str, pl.DataFrame]:
        """Calculate monthly returns in a pivot-table format.

        Groups returns by calendar month and year, producing a DataFrame with
        years as rows and months (JAN-DEC) as columns, plus an optional EOY
        column with the full-year aggregated return. Months without any
        observation are reported as ``0.0``.

        Args:
            eoy (bool): Include an EOY column with the annual return, aggregated
                from the raw rows with the same rule as the months.
                Defaults to True.
            compounded (bool): Compound returns within each period (otherwise
                sum them). Defaults to True.

        Returns:
            dict[str, pl.DataFrame]: Per-asset pivot tables with columns
                ``year``, ``JAN`` … ``DEC``, and optionally ``EOY``.

        """
        dates = self.all[self._data.date_col[0]]
        month_numbers = list(range(1, 13))
        month_order = ["JAN", "FEB", "MAR", "APR", "MAY", "JUN", "JUL", "AUG", "SEP", "OCT", "NOV", "DEC"]
        # The aggregation does not depend on the asset, so build it once.
        agg_expr = self._period_agg_expr("ret", compounded)

        result: dict[str, pl.DataFrame] = {}
        for col, series in self._data.items():
            df = (
                pl.DataFrame({"date": dates, "ret": series})
                .drop_nulls()
                .with_columns(
                    [
                        pl.col("date").dt.year().alias("year"),
                        pl.col("date").dt.month().alias("month_num"),
                    ]
                )
            )

            monthly = (
                df.group_by(["year", "month_num"])
                .agg(agg_expr.alias("ret"))
                .with_columns(
                    pl.col("month_num")
                    .replace_strict(month_numbers, month_order, return_dtype=pl.String)
                    .alias("month_name")
                )
                .sort(["year", "month_num"])
            )

            pivoted = monthly.pivot(on="month_name", index="year", values="ret", aggregate_function="first")
            # The pivot only creates columns for months present in the data;
            # add the rest so every table has the full JAN..DEC layout.
            missing = [m for m in month_order if m not in pivoted.columns]
            if missing:
                pivoted = pivoted.with_columns([pl.lit(0.0).alias(m) for m in missing])
            pivoted = (
                pivoted.select(["year", *month_order])
                .fill_null(0.0)
                .with_columns(pl.col("year").cast(pl.Int32))
                .sort("year")
            )

            if eoy:
                eoy_agg = (
                    df.group_by("year")
                    .agg(agg_expr.alias("EOY"))
                    .with_columns(pl.col("year").cast(pl.Int32))
                    .sort("year")
                )
                pivoted = pivoted.join(eoy_agg, on="year").sort("year")

            result[col] = pivoted
        return result

    def distribution(self, compounded: bool = True) -> dict[str, dict[str, dict[str, list[float]]]]:
        """Analyse return distributions across daily, weekly, monthly, quarterly, and yearly periods.

        For each period, splits values into inliers and outliers using the
        IQR method (1.5 * IQR beyond Q1/Q3). A series with no non-null
        observations yields empty ``values`` and ``outliers`` lists.

        Args:
            compounded (bool): Compound returns within each period (otherwise
                sum them). Defaults to True.

        Returns:
            dict: Nested dict ``{asset: {period: {"values": [...], "outliers": [...]}}}``
                where period is one of ``"Daily"``, ``"Weekly"``, ``"Monthly"``,
                ``"Quarterly"``, ``"Yearly"``. Aggregated periods are listed in
                chronological order.

        """
        dates = self.all[self._data.date_col[0]]
        agg_expr = self._period_agg_expr("ret", compounded).alias("ret")

        def _agg(df: pl.DataFrame, group_col: str) -> pl.Series:
            """Aggregate returns within each group, ordered by the group key."""
            # group_by gives no ordering guarantee; sort so the output is deterministic.
            return df.group_by(group_col).agg(agg_expr).sort(group_col)["ret"]

        def _iqr_split(s: pl.Series) -> dict[str, list[float]]:
            """Split series into inliers and outliers using the IQR method."""
            q1 = s.quantile(0.25)
            q3 = s.quantile(0.75)
            if q1 is None or q3 is None:
                # Quantiles are undefined for an empty series (e.g. an all-null
                # asset); report nothing rather than failing on ``None - None``.
                return {"values": [], "outliers": []}
            iqr = q3 - q1
            mask = (s >= q1 - 1.5 * iqr) & (s <= q3 + 1.5 * iqr)
            return {"values": s.filter(mask).to_list(), "outliers": s.filter(~mask).to_list()}

        result: dict[str, dict[str, dict[str, list[float]]]] = {}
        for col, series in self._data.items():
            df = (
                pl.DataFrame({"date": dates, "ret": series})
                .drop_nulls()
                .with_columns(
                    [
                        pl.col("date").dt.truncate("1w").alias("week"),
                        pl.col("date").dt.truncate("1mo").alias("month"),
                        pl.col("date").dt.truncate("3mo").alias("quarter"),
                        pl.col("date").dt.truncate("1y").alias("year"),
                    ]
                )
            )
            result[col] = {
                "Daily": _iqr_split(df["ret"]),
                "Weekly": _iqr_split(_agg(df, "week")),
                "Monthly": _iqr_split(_agg(df, "month")),
                "Quarterly": _iqr_split(_agg(df, "quarter")),
                "Yearly": _iqr_split(_agg(df, "year")),
            }
        return result

    def compare(
        self,
        aggregate: str | None = None,
        compounded: bool = True,
        round_vals: int | None = None,
    ) -> dict[str, pl.DataFrame]:
        """Compare each asset's returns against the benchmark.

        Aligns returns and benchmark by date, multiplies by 100 (percentage),
        then computes a ``Multiplier`` (Returns / Benchmark) and ``Won``
        indicator (``"+"`` when Returns >= Benchmark, ``"-"`` otherwise).
        ``Multiplier`` is null where the benchmark value is exactly zero.

        Args:
            aggregate (str | None): Pandas-style resample frequency for
                period aggregation. Recognised values are ``"W"``, ``"ME"``,
                ``"QE"`` and ``"YE"``. ``None`` — and any unrecognised value —
                returns one row per input date without aggregation.
                Defaults to None.
            compounded (bool): Compound returns when aggregating. Defaults to True.
            round_vals (int | None): Decimal places to round the numeric
                columns to. Defaults to None (no rounding).

        Returns:
            dict[str, pl.DataFrame]: Per-asset DataFrames with columns
                ``Benchmark``, ``Returns``, ``Multiplier``, ``Won``.

        Raises:
            AttributeError: If no benchmark data is attached.

        """
        if self._data.benchmark is None:
            raise AttributeError("No benchmark data available")  # noqa: TRY003

        all_df = self.all
        date_col_name = self._data.date_col[0]
        bench_col = self._data.benchmark.columns[0]

        freq_map = {"ME": "1mo", "QE": "3mo", "YE": "1y", "W": "1w"}
        # ``None`` means "do not aggregate"; unknown aliases deliberately end up here too.
        trunc = freq_map.get(aggregate) if aggregate is not None else None

        def _agg_series(df: pl.DataFrame, val_col: str) -> pl.DataFrame:
            """Aggregate a value column per ``period`` bucket, sorted by period."""
            expr = self._period_agg_expr(val_col, compounded)
            return df.group_by("period").agg(expr.alias(val_col)).sort("period")

        result: dict[str, pl.DataFrame] = {}
        for col in self._data.returns.columns:
            df = all_df.select(
                [
                    pl.col(date_col_name),
                    pl.col(col).alias("ret"),
                    pl.col(bench_col).alias("bench"),
                ]
            )

            if trunc is not None:
                bucketed = df.with_columns(pl.col(date_col_name).dt.truncate(trunc).alias("period"))
                # Nulls are dropped per side so a gap in one series does not
                # discard the other series' observation for the same date.
                ret_agg = _agg_series(bucketed.drop_nulls(subset=["ret"]), "ret")
                bench_agg = _agg_series(bucketed.drop_nulls(subset=["bench"]), "bench")
                df = ret_agg.join(bench_agg, on="period", how="full", coalesce=True).sort("period")

            out = df.select(
                [
                    (pl.col("bench") * 100).alias("Benchmark"),
                    (pl.col("ret") * 100).alias("Returns"),
                ]
            ).with_columns(
                [
                    # A zero benchmark is turned into null to avoid dividing by zero.
                    (pl.col("Returns") / pl.col("Benchmark").replace(0.0, None)).alias("Multiplier"),
                    pl.when(pl.col("Returns") >= pl.col("Benchmark"))
                    .then(pl.lit("+"))
                    .otherwise(pl.lit("-"))
                    .alias("Won"),
                ]
            )

            if round_vals is not None:
                out = out.with_columns(
                    [
                        pl.col("Benchmark").round(round_vals),
                        pl.col("Returns").round(round_vals),
                        pl.col("Multiplier").round(round_vals),
                    ]
                )

            result[col] = out
        return result

    def worst_n_periods(self, n: int = 5) -> dict[str, list[float | None]]:
        """Return the N worst return periods per asset.

        If a series has fewer than ``n`` non-null observations the list is
        padded with ``None`` on the right, so every list has exactly ``n``
        entries.

        Args:
            n: Number of worst periods to return; must be non-negative.
                Defaults to 5.

        Returns:
            dict[str, list[float | None]]: Worst returns per asset, sorted
                ascending (most negative first).

        Raises:
            ValueError: If ``n`` is negative.

        """
        if n < 0:
            # A negative count passed to ``head`` would mean "all but the last
            # |n| rows", which breaks the length-n contract of this method.
            raise ValueError("n must be non-negative")  # noqa: TRY003

        result: dict[str, list[float | None]] = {}
        for col, series in self._data.items():
            worst: list[float | None] = series.drop_nulls().sort(descending=False).head(n).to_list()
            worst.extend([None] * (n - len(worst)))
            result[col] = worst
        return result