Coverage for src/jquantstats/_stats/_periodic.py: 100%

83 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-23 06:13 +0000

1"""Period-bucketed reporting tables for financial returns data. 

2 

3Tabular, period-grouped views of returns: the monthly-returns pivot, the 

4inlier/outlier distribution across calendar frequencies, the benchmark 

5comparison table, and the worst-N-periods list. 

6""" 

7 

8from __future__ import annotations 

9 

10from typing import TYPE_CHECKING, cast 

11 

12import polars as pl 

13 

14if TYPE_CHECKING: 

15 from ..data import Data 

16 

17# ── Periodic reporting mixin ────────────────────────────────────────────────── 

18 

19 

20class _PeriodicReportingMixin: 

21 """Mixin providing period-bucketed reporting tables. 

22 

23 Covers: monthly-returns pivot table, distribution across calendar 

24 frequencies (daily…yearly), benchmark comparison table, and worst-N periods. 

25 """ 

26 

27 _data: Data 

28 all: pl.DataFrame 

29 

30 if TYPE_CHECKING: 

31 from .._protocol import DataLike 

32 

33 data: DataLike 

34 

35 def monthly_returns(self, eoy: bool = True, compounded: bool = True) -> dict[str, pl.DataFrame]: 

36 """Calculate monthly returns in a pivot-table format. 

37 

38 Groups returns by calendar month and year, producing a DataFrame with 

39 years as rows and months (JAN-DEC) as columns, plus an optional EOY 

40 column with the full-year compounded return. 

41 

42 Args: 

43 eoy (bool): Include an EOY column with the annual compounded return. 

44 Defaults to True. 

45 compounded (bool): Compound returns within each period. Defaults to True. 

46 

47 Returns: 

48 dict[str, pl.DataFrame]: Per-asset pivot tables with columns 

49 ``year``, ``JAN`` … ``DEC``, and optionally ``EOY``. 

50 

51 """ 

52 all_df = self.all 

53 date_col_name = self._data.date_col[0] 

54 month_names = { 

55 1: "JAN", 

56 2: "FEB", 

57 3: "MAR", 

58 4: "APR", 

59 5: "MAY", 

60 6: "JUN", 

61 7: "JUL", 

62 8: "AUG", 

63 9: "SEP", 

64 10: "OCT", 

65 11: "NOV", 

66 12: "DEC", 

67 } 

68 month_order = list(month_names.values()) 

69 

70 result: dict[str, pl.DataFrame] = {} 

71 for col, series in self._data.items(): 

72 df = pl.DataFrame({"date": all_df[date_col_name], "ret": series}).drop_nulls() 

73 df = df.with_columns( 

74 [ 

75 pl.col("date").dt.year().alias("year"), 

76 pl.col("date").dt.month().alias("month_num"), 

77 ] 

78 ) 

79 

80 agg_expr = ((1.0 + pl.col("ret")).product() - 1.0) if compounded else pl.col("ret").sum() 

81 monthly = ( 

82 df.group_by(["year", "month_num"]) 

83 .agg(agg_expr.alias("ret")) 

84 .with_columns( 

85 pl.col("month_num") 

86 .replace_strict( 

87 list(month_names.keys()), 

88 list(month_names.values()), 

89 return_dtype=pl.String, 

90 ) 

91 .alias("month_name") 

92 ) 

93 .sort(["year", "month_num"]) 

94 ) 

95 

96 pivoted = monthly.pivot(on="month_name", index="year", values="ret", aggregate_function="first") 

97 for m in month_order: 

98 if m not in pivoted.columns: 

99 pivoted = pivoted.with_columns(pl.lit(0.0).alias(m)) 

100 pivoted = ( 

101 pivoted.select(["year", *month_order]) 

102 .fill_null(0.0) 

103 .with_columns(pl.col("year").cast(pl.Int32)) 

104 .sort("year") 

105 ) 

106 

107 if eoy: 

108 eoy_agg = ( 

109 df.group_by("year") 

110 .agg(agg_expr.alias("EOY")) 

111 .with_columns(pl.col("year").cast(pl.Int32)) 

112 .sort("year") 

113 ) 

114 pivoted = pivoted.join(eoy_agg, on="year").sort("year") 

115 

116 result[col] = pivoted 

117 return result 

118 

119 def distribution(self, compounded: bool = True) -> dict[str, dict[str, dict[str, list[float]]]]: 

120 """Analyse return distributions across daily, weekly, monthly, quarterly, and yearly periods. 

121 

122 For each period, splits values into inliers and outliers using the 

123 IQR method (1.5 * IQR beyond Q1/Q3). 

124 

125 Args: 

126 compounded (bool): Compound returns within each period. Defaults to True. 

127 

128 Returns: 

129 dict: Nested dict ``{asset: {period: {"values": [...], "outliers": [...]}}}`` 

130 where period is one of ``"Daily"``, ``"Weekly"``, ``"Monthly"``, 

131 ``"Quarterly"``, ``"Yearly"``. 

132 

133 """ 

134 all_df = self.all 

135 date_col_name = self._data.date_col[0] 

136 

137 def _agg(df: pl.DataFrame, group_col: str) -> pl.Series: 

138 """Aggregate returns within each group using product or sum.""" 

139 expr = ((1.0 + pl.col("ret")).product() - 1.0) if compounded else pl.col("ret").sum() 

140 return df.group_by(group_col).agg(expr.alias("ret"))["ret"] 

141 

142 def _iqr_split(s: pl.Series) -> dict[str, list[float]]: 

143 """Split series into inliers and outliers using the IQR method.""" 

144 q1 = cast(float, s.quantile(0.25)) 

145 q3 = cast(float, s.quantile(0.75)) 

146 iqr = q3 - q1 

147 mask = (s >= q1 - 1.5 * iqr) & (s <= q3 + 1.5 * iqr) 

148 return {"values": s.filter(mask).to_list(), "outliers": s.filter(~mask).to_list()} 

149 

150 result: dict[str, dict[str, dict[str, list[float]]]] = {} 

151 for col, series in self._data.items(): 

152 df = pl.DataFrame({"date": all_df[date_col_name], "ret": series}).drop_nulls() 

153 df = df.with_columns( 

154 [ 

155 pl.col("date").dt.truncate("1w").alias("week"), 

156 pl.col("date").dt.truncate("1mo").alias("month"), 

157 pl.col("date").dt.truncate("3mo").alias("quarter"), 

158 pl.col("date").dt.truncate("1y").alias("year"), 

159 ] 

160 ) 

161 result[col] = { 

162 "Daily": _iqr_split(df["ret"]), 

163 "Weekly": _iqr_split(_agg(df, "week")), 

164 "Monthly": _iqr_split(_agg(df, "month")), 

165 "Quarterly": _iqr_split(_agg(df, "quarter")), 

166 "Yearly": _iqr_split(_agg(df, "year")), 

167 } 

168 return result 

169 

170 def compare( 

171 self, 

172 aggregate: str | None = None, 

173 compounded: bool = True, 

174 round_vals: int | None = None, 

175 ) -> dict[str, pl.DataFrame]: 

176 """Compare each asset's returns against the benchmark. 

177 

178 Aligns returns and benchmark by date, multiplies by 100 (percentage), 

179 then computes a ``Multiplier`` (Returns / Benchmark) and ``Won`` 

180 indicator (``"+"`` when the asset outperformed, ``"-"`` otherwise). 

181 

182 Args: 

183 aggregate (str | None): Pandas-style resample frequency for 

184 period aggregation (e.g. ``"ME"``, ``"QE"``, ``"YE"``). 

185 ``None`` returns daily rows. Defaults to None. 

186 compounded (bool): Compound returns when aggregating. Defaults to True. 

187 round_vals (int | None): Decimal places to round. Defaults to None. 

188 

189 Returns: 

190 dict[str, pl.DataFrame]: Per-asset DataFrames with columns 

191 ``Benchmark``, ``Returns``, ``Multiplier``, ``Won``. 

192 

193 Raises: 

194 AttributeError: If no benchmark data is attached. 

195 

196 """ 

197 if self._data.benchmark is None: 

198 raise AttributeError("No benchmark data available") # noqa: TRY003 

199 

200 all_df = self.all 

201 date_col_name = self._data.date_col[0] 

202 bench_col = self._data.benchmark.columns[0] 

203 

204 _freq_map = {"ME": "1mo", "QE": "3mo", "YE": "1y", "W": "1w"} 

205 

206 def _agg_series(df: pl.DataFrame, period_col: str, val_col: str) -> pl.DataFrame: 

207 """Aggregate a value column grouped by period using product or sum.""" 

208 expr = ((1.0 + pl.col(val_col)).product() - 1.0) if compounded else pl.col(val_col).sum() 

209 return df.group_by(period_col).agg(expr.alias(val_col)).sort(period_col) 

210 

211 result: dict[str, pl.DataFrame] = {} 

212 for col in self._data.returns.columns: 

213 df = all_df.select( 

214 [ 

215 pl.col(date_col_name), 

216 pl.col(col).alias("ret"), 

217 pl.col(bench_col).alias("bench"), 

218 ] 

219 ) 

220 

221 if aggregate is not None and aggregate in _freq_map: 

222 trunc = _freq_map[aggregate] 

223 df = df.with_columns(pl.col(date_col_name).dt.truncate(trunc).alias("period")) 

224 ret_agg = _agg_series(df.drop_nulls(subset=["ret"]), "period", "ret") 

225 bench_agg = _agg_series(df.drop_nulls(subset=["bench"]), "period", "bench") 

226 df = ret_agg.join(bench_agg, on="period", how="full", coalesce=True).sort("period") 

227 ret_col, bench_col_name, _date_alias = "ret", "bench", "period" 

228 else: 

229 ret_col, bench_col_name, _date_alias = "ret", "bench", date_col_name 

230 

231 ret_pct = (df[ret_col] * 100).alias("Returns") 

232 bench_pct = (df[bench_col_name] * 100).alias("Benchmark") 

233 out = pl.DataFrame( 

234 { 

235 "Benchmark": bench_pct, 

236 "Returns": ret_pct, 

237 } 

238 ) 

239 out = out.with_columns( 

240 [ 

241 (pl.col("Returns") / pl.col("Benchmark").replace(0.0, None)).alias("Multiplier"), 

242 pl.when(pl.col("Returns") >= pl.col("Benchmark")) 

243 .then(pl.lit("+")) 

244 .otherwise(pl.lit("-")) 

245 .alias("Won"), 

246 ] 

247 ) 

248 

249 if round_vals is not None: 

250 out = out.with_columns( 

251 [ 

252 pl.col("Benchmark").round(round_vals), 

253 pl.col("Returns").round(round_vals), 

254 pl.col("Multiplier").round(round_vals), 

255 ] 

256 ) 

257 

258 result[col] = out 

259 return result 

260 

261 def worst_n_periods(self, n: int = 5) -> dict[str, list[float | None]]: 

262 """Return the N worst return periods per asset. 

263 

264 If a series has fewer than ``n`` non-null observations the list is 

265 padded with ``None`` on the right. 

266 

267 Args: 

268 n: Number of worst periods to return. Defaults to 5. 

269 

270 Returns: 

271 dict[str, list[float | None]]: Sorted worst returns per asset. 

272 """ 

273 result: dict[str, list[float | None]] = {} 

274 for col, series in self._data.items(): 

275 nonnull = series.drop_nulls() 

276 worst: list[float | None] = nonnull.sort(descending=False).head(n).to_list() 

277 while len(worst) < n: 

278 worst.append(None) 

279 result[col] = worst 

280 return result