Skip to content

Streaming API Guide

BasanosStream provides an incremental, one-row-at-a-time interface for computing optimised positions in real-time without re-running the full batch engine. It is designed for live trading systems, paper-trading loops, and research workflows where data arrives sequentially.

For the complete API reference see Streaming API.


When to use BasanosStream vs BasanosEngine

BasanosEngine BasanosStream
Use case Batch backtests, research Live feeds, sequential updates
Input Full price + signal history One row at a time
State Recomputed from scratch Maintained incrementally
Memory O(T·N²) peak O(N²) — only current state
Per-step cost O(N²) correlation update + O(N³) or O(k³+kN) solve

Quick start

Step 1 — warm up from historical data

import numpy as np
import polars as pl
from basanos.math import BasanosConfig, BasanosStream

cfg = BasanosConfig(vola=16, corr=32, clip=3.5, shrink=0.5, aum=1e6)

# Use a history of prices + signals to initialise state
# prices and mu are pl.DataFrames with a "date" column and one column per asset
stream = BasanosStream.from_warmup(prices=prices, mu=mu, cfg=cfg)

Step 2 — advance one row at a time

# new_prices and new_mu are single-row pl.DataFrames or dicts
result = stream.step(new_prices=new_prices, new_mu=new_mu)

# result.status is one of: "warmup", "zero_signal", "degenerate", "valid"
if result.status == "valid":
    cash_pos = result.cash_position  # dict[str, float] — one value per asset

StepResult

Each call to stream.step() returns a StepResult frozen dataclass:

Field Type Description
status str "warmup" / "zero_signal" / "degenerate" / "valid"
cash_position dict[str, float] \| None Cash positions, or None when status is not "valid"
risk_position dict[str, float] \| None Risk positions before vol-scaling, or None

Status codes

Status Meaning
warmup Not enough rows have been processed yet (step_count < cfg.corr); positions are zero
zero_signal All signal values are zero or below cfg.denom_tol; no solve performed
degenerate Correlation matrix is singular or ill-conditioned; positions are zeroed for safety
valid Normal operation; cash_position contains usable values

Immutability

BasanosStream instances are immutablestream.step() returns a new BasanosStream with updated internal state rather than mutating the original. This makes it safe to checkpoint state and replay from any point:

# Save a checkpoint before processing a risky period
checkpoint = stream

# Process live rows
for row_prices, row_mu in live_feed:
    result = stream.step(row_prices, row_mu)
    stream = result.next_stream  # advance to the updated stream

# Roll back to checkpoint if needed
stream = checkpoint

Performance notes

  • Per-step cost is O(N²) for the IIR correlation update and O(N³) (EWMA mode) or O(k³ + kN) (sliding-window mode) for the linear solve.
  • State is a fixed-size object regardless of how many rows have been processed — memory does not grow over time.
  • For large universes (N > 200) prefer SlidingWindowConfig to reduce the per-step solve cost; see Factor Models and Concepts — Choosing Between Modes.