Source code for finance_datagen._portfolio

"""Synthetic positions and transactions for post-trade workflows."""

from __future__ import annotations

from datetime import date, datetime, time, timedelta, timezone
from typing import Sequence

import numpy as np
import polars as pl
from finance_dates import Calendar
from finance_enums import (
    Currency,
    ExchangeCode,
    OrderStatus,
    OrderType,
    PositionEffect,
    Side,
    TimeInForce,
    exchange_record,
)
from pydantic import model_validator

from ._base import DataGenerator, NonNegativeFloat, PositiveFloat, PositiveInt

# The (side value, position-effect value) pairs a synthetic transaction can be
# labelled with. TransactionsGenerator samples uniformly from these four rows.
_TRANSACTION_INTENTS = tuple(
    (side.value, effect.value)
    for side, effect in (
        (Side.Buy, PositionEffect.Open),
        (Side.Sell, PositionEffect.Close),
        (Side.Sell, PositionEffect.Open),
        (Side.Buy, PositionEffect.Close),
    )
)


def _date_range(n_dates: int, start: date | None) -> list[date]:
    first = date(2020, 1, 1) if start is None else start
    return [first + timedelta(days=i) for i in range(n_dates)]


def _trading_date_range(n_dates: int, start: date | None, exchange: str | None) -> list[date]:
    """Return the first ``n_dates`` dates for the given exchange.

    Without an exchange this is simply ``_date_range(n_dates, start)``.
    With an exchange, the exchange calendar's ``business_days`` is queried
    over a window starting at ``start`` (default 2020-01-01); the window is
    widened until it yields at least ``n_dates`` entries, and the result is
    truncated to exactly ``n_dates``.
    """
    if exchange is None:
        return _date_range(n_dates, start)

    anchor = start if start is not None else date(2020, 1, 1)
    exchange_calendar = Calendar.from_exchange(exchange)
    # Start with roughly three calendar days per requested date, then grow
    # the window by two more per requested date on every retry.
    horizon = anchor + timedelta(days=max(10, n_dates * 3))
    extension = timedelta(days=max(10, n_dates * 2))
    while True:
        trading_days = exchange_calendar.business_days(anchor, horizon)
        if len(trading_days) >= n_dates:
            return trading_days[:n_dates]
        horizon += extension


def _symbols(n_assets: int, symbols: Sequence[str] | None) -> list[str]:
    values = [f"A{i:04d}" for i in range(n_assets)] if symbols is None else list(symbols)
    if len(values) != n_assets:
        raise ValueError(f"symbols length {len(values)} != n_assets {n_assets}")
    return values


class PositionsGenerator(DataGenerator[pl.DataFrame]):
    """Build a synthetic long-form positions table.

    One row is emitted per (date, symbol) pair. Prices follow a cumulative
    log-return path per asset, and each date's weights are random draws
    rescaled so that their absolute values sum to ``gross_exposure``.
    """

    n_dates: PositiveInt = 252
    n_assets: PositiveInt = 50
    portfolio_value: PositiveFloat = 1_000_000.0
    gross_exposure: PositiveFloat = 1.0
    average_price: PositiveFloat = 100.0
    price_vol: NonNegativeFloat = 0.02
    seed: int | None = None
    start: date | None = None
    symbols: tuple[str, ...] | None = None
    currency: str | None = None
    exchange: str | None = None
    include_region: bool = False

    @model_validator(mode="after")
    def _validate_symbols(self):
        """Check the symbol count and parse the optional currency/exchange.

        ``_symbols`` raises ``ValueError`` on a length mismatch. The enum
        constructors are called only for their validation side effect
        (assumes they raise on an unknown code -- confirm in finance_enums).
        """
        _symbols(self.n_assets, self.symbols)
        for raw_value, parser in ((self.currency, Currency), (self.exchange, ExchangeCode)):
            if raw_value is not None:
                parser(raw_value)
        return self

    def generate(self) -> pl.DataFrame:
        """Return ``[date, symbol, price, quantity, market_value, weight]``.

        ``currency`` and ``exchange`` columns are appended when those fields
        are set, and a ``region`` column when ``include_region`` is true.
        """
        rng = np.random.default_rng(self.seed)
        trading_days = _trading_date_range(self.n_dates, self.start, self.exchange)
        tickers = _symbols(self.n_assets, self.symbols)
        grid_shape = (self.n_dates, self.n_assets)

        # Draw order (start prices, returns, weights) is part of the seeded
        # output and must not be rearranged.
        start_prices = self.average_price * rng.lognormal(mean=0.0, sigma=0.25, size=self.n_assets)
        daily_returns = rng.normal(0.0002, self.price_vol, size=grid_shape)
        price_paths = start_prices * np.exp(daily_returns.cumsum(axis=0))

        weight_draws = rng.normal(0.0, 1.0, size=grid_shape)
        # Normalise each date's row by its gross (sum of absolute) weight.
        row_gross = np.abs(weight_draws).sum(axis=1, keepdims=True)
        weights = (weight_draws / row_gross) * self.gross_exposure
        market_values = weights * self.portfolio_value
        quantities = market_values / price_paths

        # Row-major flattening: every date is repeated n_assets times while
        # the symbol list is tiled once per date.
        date_column = np.repeat(np.array(trading_days, dtype="datetime64[D]"), self.n_assets)
        symbol_column = np.tile(np.array(tickers), self.n_dates)
        frame = pl.DataFrame(
            {
                "date": date_column,
                "symbol": symbol_column,
                "price": price_paths.ravel(),
                "quantity": quantities.ravel(),
                "market_value": market_values.ravel(),
                "weight": weights.ravel(),
            }
        ).with_columns(pl.col("date").cast(pl.Date))

        extra_columns = []
        if self.currency is not None:
            extra_columns.append(pl.lit(self.currency).alias("currency"))
        if self.exchange is not None:
            extra_columns.append(pl.lit(self.exchange).alias("exchange"))
        if self.include_region:
            record = exchange_record(self.exchange)
            region = record.region if record is not None else None
            extra_columns.append(pl.lit(region).alias("region"))
        if extra_columns:
            frame = frame.with_columns(extra_columns)
        return frame
class TransactionsGenerator(DataGenerator[pl.DataFrame]):
    """Build a synthetic transaction log for post-trade tests.

    ``trades_per_day`` rows are produced for each of ``n_dates`` dates, with
    signed amounts, side / position-effect labels and explicit cost columns.
    """

    n_dates: PositiveInt = 252
    n_assets: PositiveInt = 50
    trades_per_day: PositiveInt = 25
    average_price: PositiveFloat = 100.0
    price_vol: NonNegativeFloat = 0.25
    max_amount: PositiveInt = 1_000
    commission: NonNegativeFloat = 1.0
    fee_bps: NonNegativeFloat = 0.2
    bps: NonNegativeFloat = 5.0
    seed: int | None = None
    start: date | None = None
    symbols: tuple[str, ...] | None = None
    currency: str | None = None
    exchange: str | None = None
    include_region: bool = False

    @model_validator(mode="after")
    def _validate_symbols(self):
        """Check the symbol count and parse the optional currency/exchange.

        ``_symbols`` raises ``ValueError`` on a length mismatch. The enum
        constructors are called only for their validation side effect
        (assumes they raise on an unknown code -- confirm in finance_enums).
        """
        _symbols(self.n_assets, self.symbols)
        for raw_value, parser in ((self.currency, Currency), (self.exchange, ExchangeCode)):
            if raw_value is not None:
                parser(raw_value)
        return self

    def generate(self) -> pl.DataFrame:
        """Return transaction rows with side labels and explicit costs.

        Columns: ``timestamp, symbol, amount, price, side, position_effect,
        notional, commission, fees, bps``, followed by ``currency`` /
        ``exchange`` when those fields are set and ``region`` when
        ``include_region`` is true. ``amount`` is positive for buys and
        negative for sells.
        """
        rng = np.random.default_rng(self.seed)
        trading_days = _trading_date_range(self.n_dates, self.start, self.exchange)
        n_rows = self.n_dates * self.trades_per_day
        intent_table = np.asarray(_TRANSACTION_INTENTS, dtype=object)
        tickers = _symbols(self.n_assets, self.symbols)
        calendar = None if self.exchange is None else Calendar.from_exchange(self.exchange)

        timestamps: list[datetime] = []
        for day in trading_days:
            day_sessions = calendar.sessions(day, day) if calendar is not None else None
            if day_sessions:
                # Spread trades over the first session; the +1 makes the
                # session close itself a possible timestamp.
                anchor, session_close = day_sessions[0]
                upper = max(1, int((session_close - anchor).total_seconds())) + 1
            else:
                # No calendar (or no session that day): 09:30 UTC plus up
                # to 6.5 hours (23,400 seconds, exclusive).
                anchor = datetime.combine(day, time(9, 30), tzinfo=timezone.utc)
                upper = 6 * 60 * 60 + 30 * 60
            second_offsets = np.sort(rng.integers(0, upper, size=self.trades_per_day))
            timestamps += [anchor + timedelta(seconds=int(seconds)) for seconds in second_offsets]

        # Draw order below (intents, amounts, prices, symbols) is part of the
        # seeded output and must not be rearranged.
        intent_rows = rng.integers(0, len(_TRANSACTION_INTENTS), size=n_rows)
        sides = intent_table[intent_rows, 0]
        position_effects = intent_table[intent_rows, 1]
        unsigned_amounts = rng.integers(1, self.max_amount + 1, size=n_rows).astype(float)
        amounts = np.where(sides == Side.Buy.value, unsigned_amounts, -unsigned_amounts)
        trade_prices = self.average_price * rng.lognormal(mean=0.0, sigma=self.price_vol, size=n_rows)
        notional = np.abs(amounts) * trade_prices
        symbol_column = rng.choice(np.array(tickers), size=n_rows)

        frame = pl.DataFrame(
            {
                "timestamp": pl.Series(timestamps).cast(pl.Datetime("ms", "UTC")),
                "symbol": symbol_column,
                "amount": amounts,
                "price": trade_prices,
                "side": sides,
                "position_effect": position_effects,
                "notional": notional,
                "commission": np.full(n_rows, self.commission),
                "fees": notional * self.fee_bps / 10_000.0,
                "bps": np.full(n_rows, self.bps),
            }
        )

        extra_columns = []
        if self.currency is not None:
            extra_columns.append(pl.lit(self.currency).alias("currency"))
        if self.exchange is not None:
            extra_columns.append(pl.lit(self.exchange).alias("exchange"))
        if self.include_region:
            record = exchange_record(self.exchange)
            region = record.region if record is not None else None
            extra_columns.append(pl.lit(region).alias("region"))
        if extra_columns:
            frame = frame.with_columns(extra_columns)
        return frame
class OrdersGenerator(DataGenerator[pl.DataFrame]):
    """Build enum-backed synthetic order fixtures.

    ``orders_per_day`` rows are produced for each of ``n_dates`` dates, with
    sequential ``ORD-########`` identifiers.
    """

    n_dates: PositiveInt = 252
    n_assets: PositiveInt = 50
    orders_per_day: PositiveInt = 25
    average_price: PositiveFloat = 100.0
    price_vol: NonNegativeFloat = 0.2
    max_quantity: PositiveInt = 1_000
    seed: int | None = None
    start: date | None = None
    symbols: tuple[str, ...] | None = None
    exchange: str | None = None
    currency: str | None = None
    include_region: bool = False

    @model_validator(mode="after")
    def _validate_symbols(self):
        """Check the symbol count and parse the optional exchange/currency.

        ``_symbols`` raises ``ValueError`` on a length mismatch. The enum
        constructors are called only for their validation side effect
        (assumes they raise on an unknown code -- confirm in finance_enums).
        """
        _symbols(self.n_assets, self.symbols)
        for raw_value, parser in ((self.exchange, ExchangeCode), (self.currency, Currency)):
            if raw_value is not None:
                parser(raw_value)
        return self

    def generate(self) -> pl.DataFrame:
        """Return ``[timestamp, symbol, order_id, side, order_type, quantity, limit_price, order_status, time_in_force]``.

        ``currency`` and ``exchange`` columns are appended when those fields
        are set, and a ``region`` column when ``include_region`` is true.
        """
        rng = np.random.default_rng(self.seed)
        trading_days = _trading_date_range(self.n_dates, self.start, self.exchange)
        tickers = _symbols(self.n_assets, self.symbols)
        n_rows = self.n_dates * self.orders_per_day
        calendar = None if self.exchange is None else Calendar.from_exchange(self.exchange)

        timestamps: list[datetime] = []
        for day in trading_days:
            day_sessions = calendar.sessions(day, day) if calendar is not None else None
            if day_sessions:
                # Spread orders over the first session; the +1 makes the
                # session close itself a possible timestamp.
                anchor, session_close = day_sessions[0]
                upper = max(1, int((session_close - anchor).total_seconds())) + 1
            else:
                # No calendar (or no session that day): 09:30 UTC plus up
                # to 6.5 hours (23,400 seconds, exclusive).
                anchor = datetime.combine(day, time(9, 30), tzinfo=timezone.utc)
                upper = 6 * 60 * 60 + 30 * 60
            second_offsets = np.sort(rng.integers(0, upper, size=self.orders_per_day))
            timestamps += [anchor + timedelta(seconds=int(seconds)) for seconds in second_offsets]

        # The draws below follow output column order; that order is part of
        # the seeded output and must not be rearranged.
        symbol_column = rng.choice(np.array(tickers), size=n_rows)
        side_column = rng.choice(np.array([Side.Buy.value, Side.Sell.value]), size=n_rows)
        order_type_column = rng.choice(
            np.array([OrderType.Market.value, OrderType.Limit.value]),
            size=n_rows,
            p=[0.55, 0.45],
        )
        quantity_column = rng.integers(1, self.max_quantity + 1, size=n_rows)
        limit_price_column = self.average_price * rng.lognormal(mean=0.0, sigma=self.price_vol, size=n_rows)
        status_choices = np.array(
            [
                OrderStatus.New.value,
                OrderStatus.PartiallyFilled.value,
                OrderStatus.Filled.value,
                OrderStatus.Canceled.value,
                OrderStatus.Rejected.value,
            ]
        )
        status_column = rng.choice(status_choices, size=n_rows, p=[0.35, 0.25, 0.25, 0.10, 0.05])
        tif_column = rng.choice(
            np.array([TimeInForce.Day.value, TimeInForce.GoodTillCanceled.value]),
            size=n_rows,
            p=[0.80, 0.20],
        )

        frame = pl.DataFrame(
            {
                "timestamp": pl.Series(timestamps).cast(pl.Datetime("ms", "UTC")),
                "symbol": symbol_column,
                "order_id": [f"ORD-{i:08d}" for i in range(n_rows)],
                "side": side_column,
                "order_type": order_type_column,
                "quantity": quantity_column,
                "limit_price": limit_price_column,
                "order_status": status_column,
                "time_in_force": tif_column,
            }
        )

        extra_columns = []
        if self.currency is not None:
            extra_columns.append(pl.lit(self.currency).alias("currency"))
        if self.exchange is not None:
            extra_columns.append(pl.lit(self.exchange).alias("exchange"))
        if self.include_region:
            record = exchange_record(self.exchange)
            region = record.region if record is not None else None
            extra_columns.append(pl.lit(region).alias("region"))
        if extra_columns:
            frame = frame.with_columns(extra_columns)
        return frame
class ExecutionsGenerator(DataGenerator[pl.DataFrame]):
    """Generate synthetic execution fixtures tied to synthetic orders.

    ``executions_per_day`` rows are produced for each of ``n_dates`` dates.
    Order identifiers use the ``ORD-########`` format, and every two
    consecutive executions share one order id.
    """

    n_dates: PositiveInt = 252
    n_assets: PositiveInt = 50
    executions_per_day: PositiveInt = 30
    average_price: PositiveFloat = 100.0
    price_vol: NonNegativeFloat = 0.2
    max_quantity: PositiveInt = 1_000
    seed: int | None = None
    start: date | None = None
    symbols: tuple[str, ...] | None = None
    exchange: str | None = None
    currency: str | None = None
    include_region: bool = False

    @model_validator(mode="after")
    def _validate_symbols(self):
        """Check the symbol count and parse the optional exchange/currency.

        ``_symbols`` raises ``ValueError`` on a length mismatch. The enum
        constructors are called only for their validation side effect
        (assumes they raise on an unknown code -- confirm in finance_enums).
        """
        _symbols(self.n_assets, self.symbols)
        if self.exchange is not None:
            ExchangeCode(self.exchange)
        if self.currency is not None:
            Currency(self.currency)
        return self

    def generate(self) -> pl.DataFrame:
        """Return ``[timestamp, execution_id, order_id, symbol, side, price, quantity, liquidity_flag, time_in_force]``.

        ``currency`` and ``exchange`` columns are appended when those fields
        are set, and a ``region`` column when ``include_region`` is true.
        """
        rng = np.random.default_rng(self.seed)
        dates = _trading_date_range(self.n_dates, self.start, self.exchange)
        symbols = _symbols(self.n_assets, self.symbols)
        n_rows = self.n_dates * self.executions_per_day
        calendar = Calendar.from_exchange(self.exchange) if self.exchange is not None else None

        timestamps: list[datetime] = []
        for current_date in dates:
            if calendar is not None:
                sessions = calendar.sessions(current_date, current_date)
                if sessions:
                    # Spread executions over the first session; the +1 makes
                    # the session close itself a possible timestamp.
                    session_open, session_close = sessions[0]
                    span_seconds = max(1, int((session_close - session_open).total_seconds()))
                    offsets = np.sort(rng.integers(0, span_seconds + 1, size=self.executions_per_day))
                    timestamps.extend(session_open + timedelta(seconds=int(offset)) for offset in offsets)
                    continue
            # No calendar (or no session that day): 09:30 UTC plus up to
            # 6.5 hours (exclusive upper bound).
            base = datetime.combine(current_date, time(9, 30), tzinfo=timezone.utc)
            offsets = np.sort(rng.integers(0, 6 * 60 * 60 + 30 * 60, size=self.executions_per_day))
            timestamps.extend(base + timedelta(seconds=int(offset)) for offset in offsets)

        # The rng draws inside the dict follow column order; that order is
        # part of the seeded output and must not be rearranged.
        frame = pl.DataFrame(
            {
                "timestamp": pl.Series(timestamps).cast(pl.Datetime("ms", "UTC")),
                "execution_id": [f"EXE-{i:08d}" for i in range(n_rows)],
                # Floor division keeps the id arithmetic in exact integers;
                # executions 2k and 2k+1 both map to order k.
                "order_id": [f"ORD-{i // 2:08d}" for i in range(n_rows)],
                "symbol": rng.choice(np.array(symbols), size=n_rows),
                "side": rng.choice(np.array([Side.Buy.value, Side.Sell.value]), size=n_rows),
                "price": self.average_price * rng.lognormal(mean=0.0, sigma=self.price_vol, size=n_rows),
                "quantity": rng.integers(1, self.max_quantity + 1, size=n_rows),
                "liquidity_flag": rng.choice(
                    np.array(["Added", "Removed", "Auction"]),
                    size=n_rows,
                    p=[0.4, 0.5, 0.1],
                ),
                "time_in_force": rng.choice(
                    np.array([TimeInForce.Day.value, TimeInForce.GoodTillCanceled.value]),
                    size=n_rows,
                    p=[0.80, 0.20],
                ),
            }
        )

        if self.currency is not None:
            frame = frame.with_columns(pl.lit(self.currency).alias("currency"))
        if self.exchange is not None:
            frame = frame.with_columns(pl.lit(self.exchange).alias("exchange"))
        if self.include_region:
            record = exchange_record(self.exchange)
            region = None if record is None else record.region
            frame = frame.with_columns(pl.lit(region).alias("region"))
        return frame
def generate_positions(
    n_dates: int = 252,
    n_assets: int = 50,
    portfolio_value: float = 1_000_000.0,
    gross_exposure: float = 1.0,
    average_price: float = 100.0,
    price_vol: float = 0.02,
    seed: int | None = None,
    start: date | None = None,
    symbols: Sequence[str] | None = None,
    currency: str | None = None,
    exchange: str | None = None,
    include_region: bool = False,
) -> pl.DataFrame:
    """Generate a synthetic positions table.

    Functional shortcut: configures a ``PositionsGenerator`` with the given
    arguments and returns the result of its ``generate()`` call. ``symbols``
    may be any sequence; it is converted to a tuple for the generator.
    """
    symbol_tuple = tuple(symbols) if symbols is not None else None
    generator = PositionsGenerator(
        n_dates=n_dates,
        n_assets=n_assets,
        portfolio_value=portfolio_value,
        gross_exposure=gross_exposure,
        average_price=average_price,
        price_vol=price_vol,
        seed=seed,
        start=start,
        symbols=symbol_tuple,
        currency=currency,
        exchange=exchange,
        include_region=include_region,
    )
    return generator.generate()
def generate_transactions(
    n_dates: int = 252,
    n_assets: int = 50,
    trades_per_day: int = 25,
    average_price: float = 100.0,
    price_vol: float = 0.25,
    max_amount: int = 1_000,
    commission: float = 1.0,
    fee_bps: float = 0.2,
    bps: float = 5.0,
    seed: int | None = None,
    start: date | None = None,
    symbols: Sequence[str] | None = None,
    currency: str | None = None,
    exchange: str | None = None,
    include_region: bool = False,
) -> pl.DataFrame:
    """Generate a synthetic transaction log.

    Functional shortcut: configures a ``TransactionsGenerator`` with the
    given arguments and returns the result of its ``generate()`` call.
    ``symbols`` may be any sequence; it is converted to a tuple for the
    generator.
    """
    symbol_tuple = tuple(symbols) if symbols is not None else None
    generator = TransactionsGenerator(
        n_dates=n_dates,
        n_assets=n_assets,
        trades_per_day=trades_per_day,
        average_price=average_price,
        price_vol=price_vol,
        max_amount=max_amount,
        commission=commission,
        fee_bps=fee_bps,
        bps=bps,
        seed=seed,
        start=start,
        symbols=symbol_tuple,
        currency=currency,
        exchange=exchange,
        include_region=include_region,
    )
    return generator.generate()
def generate_orders(
    n_dates: int = 252,
    n_assets: int = 50,
    orders_per_day: int = 25,
    average_price: float = 100.0,
    price_vol: float = 0.2,
    max_quantity: int = 1_000,
    seed: int | None = None,
    start: date | None = None,
    symbols: Sequence[str] | None = None,
    currency: str | None = None,
    exchange: str | None = None,
    include_region: bool = False,
) -> pl.DataFrame:
    """Generate synthetic order fixtures.

    Functional shortcut: configures an ``OrdersGenerator`` with the given
    arguments and returns the result of its ``generate()`` call. ``symbols``
    may be any sequence; it is converted to a tuple for the generator.
    """
    symbol_tuple = tuple(symbols) if symbols is not None else None
    generator = OrdersGenerator(
        n_dates=n_dates,
        n_assets=n_assets,
        orders_per_day=orders_per_day,
        average_price=average_price,
        price_vol=price_vol,
        max_quantity=max_quantity,
        seed=seed,
        start=start,
        symbols=symbol_tuple,
        currency=currency,
        exchange=exchange,
        include_region=include_region,
    )
    return generator.generate()
def generate_executions(
    n_dates: int = 252,
    n_assets: int = 50,
    executions_per_day: int = 30,
    average_price: float = 100.0,
    price_vol: float = 0.2,
    max_quantity: int = 1_000,
    seed: int | None = None,
    start: date | None = None,
    symbols: Sequence[str] | None = None,
    currency: str | None = None,
    exchange: str | None = None,
    include_region: bool = False,
) -> pl.DataFrame:
    """Generate synthetic execution fixtures.

    Functional shortcut: configures an ``ExecutionsGenerator`` with the
    given arguments and returns the result of its ``generate()`` call.
    ``symbols`` may be any sequence; it is converted to a tuple for the
    generator.
    """
    symbol_tuple = tuple(symbols) if symbols is not None else None
    generator = ExecutionsGenerator(
        n_dates=n_dates,
        n_assets=n_assets,
        executions_per_day=executions_per_day,
        average_price=average_price,
        price_vol=price_vol,
        max_quantity=max_quantity,
        seed=seed,
        start=start,
        symbols=symbol_tuple,
        currency=currency,
        exchange=exchange,
        include_region=include_region,
    )
    return generator.generate()