Streaming API

BasanosStream provides an incremental, one-row-at-a-time interface for computing optimised positions in real time without re-running the full batch engine.

BasanosStream

Incremental (streaming) optimiser backed by a single :class:_StreamState.

After warming up on a historical batch via :meth:from_warmup, each call to :meth:step advances the internal state by exactly one row in O(N^2) time, where N is the number of assets, without revisiting the warmup history.

Attributes:

assets (list[str]): Ordered list of asset column names (read-only).

Examples:

>>> import numpy as np
>>> import polars as pl
>>> from datetime import date, timedelta
>>> from basanos.math import BasanosConfig, BasanosStream
>>> rng = np.random.default_rng(0)
>>> warmup_len = 60
>>> dates = pl.date_range(
...     start=date(2024, 1, 1),
...     end=date(2024, 1, 1) + timedelta(days=warmup_len),
...     interval="1d",
...     eager=True,
... )
>>> prices = pl.DataFrame({
...     "date": dates,
...     "A": np.cumprod(1 + rng.normal(0.001, 0.02, warmup_len + 1)) * 100.0,
...     "B": np.cumprod(1 + rng.normal(0.001, 0.02, warmup_len + 1)) * 150.0,
... })
>>> mu = pl.DataFrame({
...     "date": dates,
...     "A": rng.normal(0, 0.5, warmup_len + 1),
...     "B": rng.normal(0, 0.5, warmup_len + 1),
... })
>>> cfg = BasanosConfig(vola=5, corr=10, clip=3.0, shrink=0.5, aum=1e6)
>>> stream = BasanosStream.from_warmup(prices.head(warmup_len), mu.head(warmup_len), cfg)
>>> result = stream.step(
...     prices.select(["A", "B"]).to_numpy()[warmup_len],
...     mu.select(["A", "B"]).to_numpy()[warmup_len],
...     prices["date"][warmup_len],
... )
>>> type(result).__name__  # StepResult is not imported above, so check the type by name
'StepResult'
>>> result.cash_position.shape
(2,)

_cfg: BasanosConfig instance-attribute

_assets: list[str] instance-attribute

_state: _StreamState instance-attribute

assets: list[str] property

Ordered list of asset column names.

__init__(cfg: BasanosConfig, assets: list[str], state: _StreamState) -> None

Initialise from an explicit config, asset list, and state container.

__setattr__(name: str, value: object) -> None

Prevent accidental attribute mutation — BasanosStream is immutable.
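The immutability guard described above can be illustrated with a small stand-alone class. This is a minimal sketch, not the BasanosStream source: the class name `FrozenHolder` is hypothetical, and raising `AttributeError` is an assumption, since the exception type used by BasanosStream is not stated here.

```python
class FrozenHolder:
    """Toy stand-in (hypothetical); not the BasanosStream implementation."""

    def __init__(self, assets: list[str]) -> None:
        # Bypass our own __setattr__ guard once, during construction only.
        object.__setattr__(self, "_assets", list(assets))

    def __setattr__(self, name: str, value: object) -> None:
        # Reject every later assignment, including to existing attributes.
        # Assumption: AttributeError is used here purely for illustration.
        raise AttributeError(
            f"{type(self).__name__} is immutable; cannot set {name!r}"
        )

    @property
    def assets(self) -> list[str]:
        # Return a copy so callers cannot mutate the internal list in place.
        return list(self._assets)


holder = FrozenHolder(["A", "B"])
try:
    holder.assets = ["C"]  # routed through __setattr__, which raises
    blocked = False
except AttributeError:
    blocked = True
```

Writing through `object.__setattr__` in the constructor is the usual way to initialise such a class without tripping the guard.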

from_warmup(prices: pl.DataFrame, mu: pl.DataFrame, cfg: BasanosConfig) -> BasanosStream classmethod

Build a :class:BasanosStream from a historical warmup batch.

Runs :class:~basanos.math.BasanosEngine on the full warmup batch exactly once and extracts the minimal IIR-filter state required for subsequent :meth:step calls. After this call, each :meth:step advances the optimiser in O(N^2) time without touching the warmup data again.
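To make the idea of "minimal IIR-filter state" concrete, the sketch below shows a scalar exponentially weighted mean that is advanced one observation at a time from two accumulators. It is an illustration under stated assumptions only: `EwmaMeanState` and `batch_ewma_mean` are hypothetical names, and the real state held by BasanosStream is not documented here and is presumably matrix-valued.

```python
import math
from dataclasses import dataclass


@dataclass
class EwmaMeanState:
    """Hypothetical two-number state for a recursive (IIR) weighted mean."""

    alpha: float
    num: float = 0.0  # decayed sum of observations
    den: float = 0.0  # decayed sum of weights

    def update(self, x: float) -> float:
        # One step: decay the old accumulators, then add the new observation.
        decay = 1.0 - self.alpha
        self.num = decay * self.num + x
        self.den = decay * self.den + 1.0
        return self.num / self.den


def batch_ewma_mean(xs: list[float], alpha: float) -> float:
    # Reference computation that revisits the whole history.
    decay = 1.0 - alpha
    n = len(xs)
    weights = [decay ** (n - 1 - i) for i in range(n)]
    return sum(w * x for w, x in zip(weights, xs)) / sum(weights)


history = [0.01, -0.02, 0.015, 0.003]
state = EwmaMeanState(alpha=0.2)
for x in history:  # warm up once on the historical batch
    state.update(x)

new_value = -0.007
streamed = state.update(new_value)  # constant-time step, no history needed
recomputed = batch_ewma_mean(history + [new_value], alpha=0.2)
matches = math.isclose(streamed, recomputed, rel_tol=1e-9)
```

The streamed value agrees with the full recomputation up to floating-point rounding, which is the property that lets a stream discard the warmup data after initialisation.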

Parameters:

prices: Historical price DataFrame. Must contain a 'date' column and at least one numeric asset column with strictly positive, non-monotonic values.

mu: Expected-return signal DataFrame aligned row-by-row with prices.

cfg: Engine configuration. Both :class:~basanos.math.EwmaShrinkConfig and :class:~basanos.math.SlidingWindowConfig are supported.

Returns:

BasanosStream A stream instance whose :meth:step method is ready to accept the row immediately following the last warmup row.

Notes:

Short-warmup behaviour with SlidingWindowConfig: when len(prices) < cfg.covariance_config.window, the internal rolling buffer (sw_ret_buf) is NaN-padded for the missing prefix rows. :meth:step returns StepResult(status="warmup") for each of the first window - len(prices) calls, exactly matching the EWM warmup semantics. By the time :meth:step returns the first non-warmup result the buffer contains only real data — no NaN-padded rows remain.
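The short-warmup note can be pictured with a toy fixed-length buffer. The sketch below is an assumption-laden model, not the library's code: it supposes the buffer holds one return per row (so n warmup prices yield n - 1 returns), and the helper names `make_return_buffer` and `push_price` are hypothetical.

```python
import math
from collections import deque


def make_return_buffer(prices: list[float], window: int) -> deque:
    # n prices give n - 1 simple returns; pad the missing prefix with NaN.
    returns = [b / a - 1.0 for a, b in zip(prices, prices[1:])]
    recent = returns[-window:]
    padding = [math.nan] * (window - len(recent))
    return deque(padding + recent, maxlen=window)


def push_price(buf: deque, last_price: float, new_price: float) -> str:
    # Appending to a full deque drops the oldest entry (a NaN pad at first).
    buf.append(new_price / last_price - 1.0)
    return "warmup" if any(math.isnan(r) for r in buf) else "valid"


window = 5
warmup_prices = [100.0, 101.0, 99.5]  # shorter than the window
buf = make_return_buffer(warmup_prices, window)

statuses = []
last = warmup_prices[-1]
for price in [100.2, 100.9, 101.4, 100.7]:
    statuses.append(push_price(buf, last, price))
    last = price
```

With a window of 5 and 3 warmup rows, this toy model reports "warmup" for the first 5 - 3 = 2 steps, and the buffer is free of NaN padding from the first "valid" step onward.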

Raises:

MissingDateColumnError If 'date' is absent from prices.

step(new_prices: np.ndarray | dict[str, float], new_mu: np.ndarray | dict[str, float], date: Any = None) -> StepResult

Advance the stream by one row and return the new optimised position.

Parameters:

new_prices: Per-asset prices for the new timestep. Either a numpy array of shape (N,) (assets ordered as in :attr:assets) or a dict mapping asset names to price values.

new_mu: Per-asset expected-return signals, same format as new_prices.

date: Timestamp for this step (stored in :attr:StepResult.date verbatim; not used in any computation).

Returns:

StepResult Frozen dataclass with cash_position, vola, status, and date for this timestep.
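Because step accepts either an ordered vector or a name-keyed dict, a caller-side helper that normalises both forms to the stream's asset order can be useful. The following is a rough sketch: `to_ordered_vector` is a hypothetical helper, and its error handling for missing names or wrong lengths is an assumption rather than documented BasanosStream behaviour.

```python
from collections.abc import Mapping


def to_ordered_vector(values, assets: list[str]) -> list[float]:
    """Hypothetical helper: return values in the order given by `assets`."""
    if isinstance(values, Mapping):
        # Dict input: look each asset up by name, ignoring dict ordering.
        missing = [a for a in assets if a not in values]
        if missing:
            raise KeyError(f"missing assets: {missing}")
        return [float(values[a]) for a in assets]
    # Sequence input: already positional, so only the length can be checked.
    vector = [float(v) for v in values]
    if len(vector) != len(assets):
        raise ValueError(f"expected {len(assets)} values, got {len(vector)}")
    return vector


assets = ["A", "B"]
from_dict = to_ordered_vector({"B": 150.0, "A": 100.0}, assets)
from_list = to_ordered_vector([100.0, 150.0], assets)
```

Both calls yield the same vector, which shows why the dict form is the safer choice when the caller's column order may differ from :attr:assets.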

save(path: str | os.PathLike[str]) -> None

Serialise the stream to a .npz archive at path.

All :class:_StreamState arrays, the configuration, and the asset list are written in a single :func:numpy.savez call. A stream restored via :meth:load produces bit-for-bit identical :meth:step output.
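The single-archive approach can be sketched with plain NumPy. The key names (`state`, `config`, `assets`) and the JSON encoding of the configuration below are assumptions chosen for illustration; the actual archive layout written by save is not documented here.

```python
import json
import pathlib
import tempfile

import numpy as np

state = np.random.default_rng(0).normal(size=(3, 3))
config = {"vola": 5, "corr": 10}  # stand-in for a serialisable config
assets = ["A", "B", "C"]

with tempfile.TemporaryDirectory() as tmp:
    target = pathlib.Path(tmp) / "stream"  # no suffix on purpose
    # One savez call stores arrays, config (as a JSON string), and asset names.
    np.savez(
        str(target),
        state=state,
        config=np.array(json.dumps(config)),
        assets=np.array(assets),
    )
    # savez appended ".npz" because the given name lacked the suffix.
    written = sorted(p.name for p in pathlib.Path(tmp).iterdir())

    with np.load(str(target) + ".npz") as archive:
        restored_state = archive["state"]
        restored_config = json.loads(archive["config"].item())
        restored_assets = archive["assets"].tolist()

# Comparing raw bytes checks bit-for-bit equality, not just closeness.
bit_identical = restored_state.tobytes() == state.tobytes()
```

Storing the configuration as a string and the asset names as a unicode array keeps the archive loadable without pickling.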

Parameters:

path (str | PathLike[str], required): Destination file path. :func:numpy.savez appends .npz automatically when the suffix is absent.

Examples:

>>> import tempfile, pathlib, numpy as np
>>> import polars as pl
>>> from datetime import date, timedelta
>>> from basanos.math import BasanosConfig, BasanosStream
>>> rng = np.random.default_rng(0)
>>> n = 60
>>> end = date(2024, 1, 1) + timedelta(days=n - 1)
>>> dates = pl.date_range(
...     date(2024, 1, 1), end, interval="1d", eager=True
... )
>>> prices = pl.DataFrame({
...     "date": dates,
...     "A": np.cumprod(1 + rng.normal(0.001, 0.02, n)) * 100.0,
...     "B": np.cumprod(1 + rng.normal(0.001, 0.02, n)) * 150.0,
... })
>>> mu = pl.DataFrame({
...     "date": dates,
...     "A": rng.normal(0, 0.5, n),
...     "B": rng.normal(0, 0.5, n),
... })
>>> cfg = BasanosConfig(vola=5, corr=10, clip=3.0, shrink=0.5, aum=1e6)
>>> stream = BasanosStream.from_warmup(prices, mu, cfg)
>>> with tempfile.TemporaryDirectory() as tmp:
...     p = pathlib.Path(tmp) / "stream.npz"
...     stream.save(p)
...     restored = BasanosStream.load(p)
...     restored.assets == stream.assets
True

load(path: str | os.PathLike[str]) -> BasanosStream classmethod

Restore a stream previously saved with :meth:save.

Parameters:

path (str | PathLike[str], required): Path to a .npz archive written by :meth:save.

Returns:

BasanosStream A :class:BasanosStream whose :meth:step output is bit-for-bit identical to the original stream at the time :meth:save was called.

Examples:

>>> import tempfile, pathlib, numpy as np
>>> import polars as pl
>>> from datetime import date, timedelta
>>> from basanos.math import BasanosConfig, BasanosStream
>>> rng = np.random.default_rng(1)
>>> n = 60
>>> end = date(2024, 1, 1) + timedelta(days=n - 1)
>>> dates = pl.date_range(
...     date(2024, 1, 1), end, interval="1d", eager=True
... )
>>> prices = pl.DataFrame({
...     "date": dates,
...     "A": np.cumprod(1 + rng.normal(0.001, 0.02, n)) * 100.0,
...     "B": np.cumprod(1 + rng.normal(0.001, 0.02, n)) * 150.0,
... })
>>> mu = pl.DataFrame({
...     "date": dates,
...     "A": rng.normal(0, 0.5, n),
...     "B": rng.normal(0, 0.5, n),
... })
>>> cfg = BasanosConfig(vola=5, corr=10, clip=3.0, shrink=0.5, aum=1e6)
>>> stream = BasanosStream.from_warmup(prices, mu, cfg)
>>> with tempfile.TemporaryDirectory() as tmp:
...     p = pathlib.Path(tmp) / "stream.npz"
...     stream.save(p)
...     restored = BasanosStream.load(p)
...     restored.assets == stream.assets
True

StepResult

StepResult dataclass

Frozen dataclass representing the output of a single BasanosStream step.

Each call to BasanosStream.step() returns one StepResult capturing the optimised cash positions, the per-asset volatility estimate, the step date, and a status label that describes the solver outcome for that timestep.

Attributes:

date (object): The timestamp or date label for this step. The type mirrors whatever is stored in the 'date' column of the input prices DataFrame (typically a Python :class:datetime.date, :class:datetime.datetime, or a Polars temporal scalar).

cash_position (ndarray): Optimised cash-position vector, shape (N,). Entries are NaN for assets that are still in the EWMA warmup period or that are otherwise inactive at this step.

status (SolveStatus): Solver outcome label for this timestep (:class:~basanos.math.SolveStatus). Since :class:SolveStatus is a StrEnum, values compare equal to their string equivalents (e.g. result.status == "valid" is True):

  • 'warmup': fewer rows have been seen than the EWMA warmup requires; all positions are NaN.
  • 'zero_signal': the expected-return signal vector mu is identically zero; positions are set to zero rather than solved.
  • 'degenerate': the covariance matrix is ill-conditioned or numerically singular; positions cannot be computed reliably and are returned as NaN.
  • 'valid': normal operation; cash_position holds the optimised allocations.

vola (ndarray): Per-asset EWMA percentage-return volatility, shape (N,). Values are NaN during the warmup period before the EWMA has accumulated sufficient history.
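A consumer will typically branch on the status label before acting on the positions. The sketch below is one possible pattern under stated assumptions: `Status` is a hypothetical local stand-in for SolveStatus (built with a str mixin so it runs on older Python versions), `tradable_positions` is a hypothetical helper, and treating NaN entries as flat positions is a design choice of this example, not library behaviour.

```python
import math
from enum import Enum
from typing import Optional


class Status(str, Enum):
    """Hypothetical stand-in for SolveStatus; members equal their strings."""

    WARMUP = "warmup"
    ZERO_SIGNAL = "zero_signal"
    DEGENERATE = "degenerate"
    VALID = "valid"


def tradable_positions(
    status: str, cash_position: list[float]
) -> Optional[list[float]]:
    if status == "valid":
        # Assumption: NaN marks an inactive asset, treated here as flat.
        return [0.0 if math.isnan(p) else p for p in cash_position]
    if status == "zero_signal":
        # Positions are documented as zero when the signal is all zero.
        return [0.0] * len(cash_position)
    # 'warmup' and 'degenerate' carry NaN positions, so emit nothing.
    return None


valid_case = tradable_positions(Status.VALID, [1000.0, math.nan])
warmup_case = tradable_positions("warmup", [math.nan, math.nan])
```

Comparing against plain strings works for both enum members and raw labels, which mirrors the string-equality behaviour described for the status attribute.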

Examples:

>>> import numpy as np
>>> result = StepResult(
...     date="2024-01-02",
...     cash_position=np.array([1000.0, -500.0]),
...     status="valid",
...     vola=np.array([0.012, 0.018]),
... )
>>> result.status
'valid'
>>> result.cash_position.shape
(2,)

date: object instance-attribute

cash_position: np.ndarray instance-attribute

status: SolveStatus instance-attribute

vola: np.ndarray instance-attribute

__init__(date: object, cash_position: np.ndarray, status: SolveStatus, vola: np.ndarray) -> None