Streaming API¶
BasanosStream provides an incremental, one-row-at-a-time interface for
computing optimised positions in real time without re-running the full
batch engine.
BasanosStream¶
Incremental (streaming) optimiser backed by a single :class:_StreamState.
After warming up on a historical batch via :meth:from_warmup, each call
to :meth:step advances the internal state by exactly one row in
O(N^2) time — without revisiting the full warmup history.
Attributes:
| Name | Type | Description |
|---|---|---|
| assets | list[str] | Ordered list of asset column names (read-only). |
Examples:
>>> import numpy as np
>>> import polars as pl
>>> from datetime import date, timedelta
>>> from basanos.math import BasanosConfig, BasanosStream
>>> rng = np.random.default_rng(0)
>>> warmup_len = 60
>>> dates = pl.date_range(
... start=date(2024, 1, 1),
... end=date(2024, 1, 1) + timedelta(days=warmup_len),
... interval="1d",
... eager=True,
... )
>>> prices = pl.DataFrame({
... "date": dates,
... "A": np.cumprod(1 + rng.normal(0.001, 0.02, warmup_len + 1)) * 100.0,
... "B": np.cumprod(1 + rng.normal(0.001, 0.02, warmup_len + 1)) * 150.0,
... })
>>> mu = pl.DataFrame({
... "date": dates,
... "A": rng.normal(0, 0.5, warmup_len + 1),
... "B": rng.normal(0, 0.5, warmup_len + 1),
... })
>>> cfg = BasanosConfig(vola=5, corr=10, clip=3.0, shrink=0.5, aum=1e6)
>>> stream = BasanosStream.from_warmup(prices.head(warmup_len), mu.head(warmup_len), cfg)
>>> result = stream.step(
... prices.select(["A", "B"]).to_numpy()[warmup_len],
... mu.select(["A", "B"]).to_numpy()[warmup_len],
... prices["date"][warmup_len],
... )
>>> type(result).__name__
'StepResult'
>>> result.cash_position.shape
(2,)
_cfg: BasanosConfig
instance-attribute
_assets: list[str]
instance-attribute
_state: _StreamState
instance-attribute
assets: list[str]
property
Ordered list of asset column names.
__init__(cfg: BasanosConfig, assets: list[str], state: _StreamState) -> None
Initialise from an explicit config, asset list, and state container.
__setattr__(name: str, value: object) -> None
Prevent accidental attribute mutation — BasanosStream is immutable.
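The guard described above can be illustrated with a small, self-contained sketch. The class `ImmutablePoint` is hypothetical and is not part of basanos; the exception type that BasanosStream raises is not stated here, so `AttributeError` is an assumption.

```python
class ImmutablePoint:
    """Hypothetical class: attributes are set once in __init__, then locked."""

    __slots__ = ("x", "y")

    def __init__(self, x: float, y: float) -> None:
        # Bypass our own __setattr__ guard during construction.
        object.__setattr__(self, "x", x)
        object.__setattr__(self, "y", y)

    def __setattr__(self, name: str, value: object) -> None:
        # Assumption: AttributeError is used here; the real class may differ.
        raise AttributeError(f"{type(self).__name__} is immutable; cannot set {name!r}")


p = ImmutablePoint(1.0, 2.0)
try:
    p.x = 5.0          # rejected by the guard
    mutated = True
except AttributeError:
    mutated = False
```

Note that a guard of this kind only blocks rebinding attributes; it does not by itself stop in-place changes to mutable objects that the attributes refer to.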
from_warmup(prices: pl.DataFrame, mu: pl.DataFrame, cfg: BasanosConfig) -> BasanosStream
classmethod
Build a :class:BasanosStream from a historical warmup batch.
Runs :class:~basanos.math.BasanosEngine on the full warmup batch
exactly once and extracts the minimal IIR-filter state required for
subsequent :meth:step calls. After this call, each :meth:step
advances the optimiser in O(N^2) time without touching the warmup
data again.
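To see why a recursive (IIR) filter needs only its latest state, consider a minimal sketch using an exponentially weighted second-moment matrix. This is an illustration of the general idea, not the filter that BasanosStream actually stores; the function names and the smoothing factor `alpha` are assumptions.

```python
import numpy as np


def ewma_batch(x: np.ndarray, alpha: float) -> np.ndarray:
    """Run the recursion over the full history: O(T * N^2)."""
    n = x.shape[1]
    s = np.zeros((n, n))
    for row in x:
        s = (1.0 - alpha) * s + alpha * np.outer(row, row)
    return s


def ewma_step(s_prev: np.ndarray, row: np.ndarray, alpha: float) -> np.ndarray:
    """Advance the N x N state by one row: O(N^2), no history needed."""
    return (1.0 - alpha) * s_prev + alpha * np.outer(row, row)


rng = np.random.default_rng(0)
returns = rng.normal(0.0, 0.01, size=(61, 3))  # synthetic data, 3 assets
alpha = 0.1

state = ewma_batch(returns[:60], alpha)        # "warmup" on the first 60 rows
state = ewma_step(state, returns[60], alpha)   # one incremental step
full = ewma_batch(returns, alpha)              # full recomputation for comparison
```

Under these assumptions the incremental result matches the full recomputation, which is the property that lets a stream discard the warmup data once the state has been extracted.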
Parameters¶
prices:
Historical price DataFrame. Must contain a 'date' column and
at least one numeric asset column with strictly positive,
non-monotonic values.
mu:
Expected-return signal DataFrame aligned row-by-row with
prices.
cfg:
Engine configuration. Both :class:~basanos.math.EwmaShrinkConfig
and :class:~basanos.math.SlidingWindowConfig are supported.
Returns:¶
BasanosStream
A stream instance whose :meth:step method is ready to accept the
row immediately following the last warmup row.
Notes:¶
Short-warmup behaviour with SlidingWindowConfig: when
len(prices) < cfg.covariance_config.window, the internal rolling
buffer (sw_ret_buf) is NaN-padded for the missing prefix rows.
:meth:step returns StepResult(status="warmup") for each of the
first window - len(prices) calls, exactly matching the EWM warmup
semantics. By the time :meth:step returns the first non-warmup
result the buffer contains only real data — no NaN-padded rows remain.
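The warmup count in the note above can be reproduced with a small sketch of a NaN-padded rolling buffer. It assumes the buffer holds returns, so L price rows yield L - 1 return rows; the helper `step_buffer` and all values are hypothetical and do not reflect the library's internals.

```python
import numpy as np

window = 5
warmup_prices = np.array([[100.0, 150.0], [101.0, 149.0], [102.0, 151.0]])  # 3 rows < window

# Assumption: the buffer stores simple returns, so 3 prices give 2 real rows.
warmup_returns = np.diff(warmup_prices, axis=0) / warmup_prices[:-1]
buf = np.full((window, warmup_prices.shape[1]), np.nan)  # NaN-padded prefix
buf[-len(warmup_returns):] = warmup_returns
last_price = warmup_prices[-1]


def step_buffer(buf, last_price, new_price):
    """Drop the oldest row, append the newest return, and report a status."""
    new_ret = new_price / last_price - 1.0
    buf = np.vstack([buf[1:], new_ret])
    status = "warmup" if np.isnan(buf).any() else "valid"
    return buf, new_price, status


statuses = []
for new_price in np.array([[103.0, 152.0], [102.5, 151.0], [104.0, 153.0], [105.0, 152.0]]):
    buf, last_price, status = step_buffer(buf, last_price, new_price)
    statuses.append(status)
```

In this sketch the first `window - len(warmup_prices)` steps report "warmup", and from the first "valid" step onward the buffer contains no NaN rows.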
Raises:¶
MissingDateColumnError
If 'date' is absent from prices.
step(new_prices: np.ndarray | dict[str, float], new_mu: np.ndarray | dict[str, float], date: Any = None) -> StepResult
Advance the stream by one row and return the new optimised position.
Parameters¶
new_prices:
Per-asset prices for the new timestep. Either a numpy array of
shape (N,) (assets ordered as in :attr:assets) or a dict
mapping asset names to price values.
new_mu:
Per-asset expected-return signals, same format as new_prices.
date:
Timestamp for this step (stored in :attr:StepResult.date
verbatim; not used in any computation).
Returns:¶
StepResult
Frozen dataclass with cash_position, vola, status, and
date for this timestep.
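Because step accepts either an array ordered as in assets or a dict keyed by asset name, it can help to see how the two forms relate. The helper `to_ordered_array` below is hypothetical; how BasanosStream itself handles missing keys or wrong shapes is not documented here, so the error handling is an assumption.

```python
import numpy as np


def to_ordered_array(values, assets):
    """Return a float array of shape (N,) ordered like `assets` (hypothetical helper)."""
    if isinstance(values, dict):
        missing = [a for a in assets if a not in values]
        if missing:
            # Assumption: fail loudly rather than fill with NaN.
            raise KeyError(f"missing assets: {missing}")
        return np.array([values[a] for a in assets], dtype=float)
    arr = np.asarray(values, dtype=float)
    if arr.shape != (len(assets),):
        raise ValueError(f"expected shape ({len(assets)},), got {arr.shape}")
    return arr


assets = ["A", "B"]
from_dict = to_ordered_array({"B": 150.0, "A": 100.0}, assets)   # key order does not matter
from_array = to_ordered_array(np.array([100.0, 150.0]), assets)  # already in asset order
```

With a dict, the ordering comes from the asset list rather than from the dict; with an array, the caller is responsible for matching the order of assets.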
save(path: str | os.PathLike[str]) -> None
Serialise the stream to a .npz archive at path.
All :class:_StreamState arrays, the configuration, and the asset
list are written in a single :func:numpy.savez call. A stream
restored via :meth:load produces bit-for-bit identical
:meth:step output.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| path | str \| PathLike[str] | Destination file path. :func: | required |
Examples:
>>> import tempfile, pathlib, numpy as np
>>> import polars as pl
>>> from datetime import date, timedelta
>>> from basanos.math import BasanosConfig, BasanosStream
>>> rng = np.random.default_rng(0)
>>> n = 60
>>> end = date(2024, 1, 1) + timedelta(days=n - 1)
>>> dates = pl.date_range(
... date(2024, 1, 1), end, interval="1d", eager=True
... )
>>> prices = pl.DataFrame({
... "date": dates,
... "A": np.cumprod(1 + rng.normal(0.001, 0.02, n)) * 100.0,
... "B": np.cumprod(1 + rng.normal(0.001, 0.02, n)) * 150.0,
... })
>>> mu = pl.DataFrame({
... "date": dates,
... "A": rng.normal(0, 0.5, n),
... "B": rng.normal(0, 0.5, n),
... })
>>> cfg = BasanosConfig(vola=5, corr=10, clip=3.0, shrink=0.5, aum=1e6)
>>> stream = BasanosStream.from_warmup(prices, mu, cfg)
>>> with tempfile.TemporaryDirectory() as tmp:
... p = pathlib.Path(tmp) / "stream.npz"
... stream.save(p)
... restored = BasanosStream.load(p)
... restored.assets == stream.assets
True
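For intuition about why a .npz round trip can be exact, here is a minimal sketch that writes arrays, a configuration, and an asset list in a single numpy.savez call. The key names (`ewma_var`, `corr_state`, `config_json`) and the JSON encoding of the configuration are assumptions for illustration, not the archive layout that save actually uses.

```python
import json
import pathlib
import tempfile

import numpy as np

# Hypothetical state, config, and asset list.
state = {"ewma_var": np.array([0.012, 0.018]), "corr_state": np.eye(2)}
config = {"vola": 5, "corr": 10, "clip": 3.0}
assets = ["A", "B"]

with tempfile.TemporaryDirectory() as tmp:
    path = pathlib.Path(tmp) / "sketch.npz"
    # One savez call: arrays as-is, config as a JSON string, assets as a string array.
    np.savez(path, config_json=np.array(json.dumps(config)), assets=np.array(assets), **state)
    with np.load(path) as data:
        restored_state = {k: data[k] for k in state}
        restored_config = json.loads(data["config_json"].item())
        restored_assets = data["assets"].tolist()
```

Floating-point arrays are stored as raw binary, so the restored values compare equal element for element.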
load(path: str | os.PathLike[str]) -> BasanosStream
classmethod
Restore a stream previously saved with :meth:save.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| path | str \| PathLike[str] | Path to a | required |
Returns:
| Type | Description |
|---|---|
| BasanosStream | A :class:BasanosStream bit-for-bit identical to the original stream at the time :meth:save was called. |
Examples:
>>> import tempfile, pathlib, numpy as np
>>> import polars as pl
>>> from datetime import date, timedelta
>>> from basanos.math import BasanosConfig, BasanosStream
>>> rng = np.random.default_rng(1)
>>> n = 60
>>> end = date(2024, 1, 1) + timedelta(days=n - 1)
>>> dates = pl.date_range(
... date(2024, 1, 1), end, interval="1d", eager=True
... )
>>> prices = pl.DataFrame({
... "date": dates,
... "A": np.cumprod(1 + rng.normal(0.001, 0.02, n)) * 100.0,
... "B": np.cumprod(1 + rng.normal(0.001, 0.02, n)) * 150.0,
... })
>>> mu = pl.DataFrame({
... "date": dates,
... "A": rng.normal(0, 0.5, n),
... "B": rng.normal(0, 0.5, n),
... })
>>> cfg = BasanosConfig(vola=5, corr=10, clip=3.0, shrink=0.5, aum=1e6)
>>> stream = BasanosStream.from_warmup(prices, mu, cfg)
>>> with tempfile.TemporaryDirectory() as tmp:
... p = pathlib.Path(tmp) / "stream.npz"
... stream.save(p)
... restored = BasanosStream.load(p)
... restored.assets == stream.assets
True
StepResult¶
Frozen dataclass representing the output of a single BasanosStream step.
Each call to BasanosStream.step() returns one StepResult capturing
the optimised cash positions, the per-asset volatility estimate, the step
date, and a status label that describes the solver outcome for that
timestep.
Attributes:
| Name | Type | Description |
|---|---|---|
| date | object | The timestamp or date label for this step. The type mirrors whatever is stored in the |
| cash_position | ndarray | Optimised cash-position vector, shape (N,). |
| status | SolveStatus | Solver outcome label for this timestep (:class:SolveStatus). |
| vola | ndarray | Per-asset EWMA percentage-return volatility, shape (N,). |
Examples:
>>> import numpy as np
>>> result = StepResult(
... date="2024-01-02",
... cash_position=np.array([1000.0, -500.0]),
... status="valid",
... vola=np.array([0.012, 0.018]),
... )
>>> result.status
'valid'
>>> result.cash_position.shape
(2,)
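A consumer of the stream will typically branch on status before acting on a position. The sketch below uses a hypothetical stand-in dataclass, `MiniStepResult`, with the same four fields, so it runs without basanos installed. Only the "warmup" and "valid" labels appear in this page; the NaN placeholders for warmup rows are an assumption.

```python
from dataclasses import FrozenInstanceError, dataclass

import numpy as np


@dataclass(frozen=True)
class MiniStepResult:
    """Hypothetical stand-in mirroring the documented StepResult fields."""

    date: object
    cash_position: np.ndarray
    status: str
    vola: np.ndarray


def tradable(results):
    """Keep only the steps whose status is 'valid'."""
    return [r for r in results if r.status == "valid"]


results = [
    # Assumption: warmup rows carry NaN placeholders.
    MiniStepResult("2024-01-02", np.full(2, np.nan), "warmup", np.full(2, np.nan)),
    MiniStepResult("2024-01-03", np.array([1000.0, -500.0]), "valid", np.array([0.012, 0.018])),
]
kept = tradable(results)

try:
    results[0].status = "valid"   # frozen dataclasses reject field assignment
    reassigned = True
except FrozenInstanceError:
    reassigned = False
```

Freezing the dataclass prevents fields from being rebound after construction, although the arrays they reference can still be modified in place.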