# Bar-driven virtual exchange used to replay historical OHLCV data.
#
# The public method names (fetch_ohlcv, create_order, fetch_positions, ...)
# mirror the CCXT unified API -- presumably so this class can stand in for a
# real exchange client; confirm against the callers.
from __future__ import annotations

import copy
import json
import math
import os
import random
import sqlite3
import time
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import pandas as pd

try:
    from slippage_directional_model_v1 import predict_directional_slippage_bp
except ImportError:
    # Only needed when dynamic_slippage_model['kind'] == 'directional_knn_linear';
    # a clear error is raised at that point if the module is missing.
    predict_directional_slippage_bp = None

# Seconds per timeframe unit suffix; unknown suffixes are treated as minutes.
_TF_UNIT_SECONDS = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400, 'w': 604800}


def _tf_to_seconds(tf: str) -> int:
    """Convert a timeframe string such as ``'5m'`` or ``'1h'`` to seconds.

    Empty/blank input falls back to ``'1m'``.  An unparsable count is treated
    as 1 and an unknown unit suffix as minutes.
    """
    # The trailing ``or '1m'`` guards the blank-after-strip case, which would
    # otherwise raise IndexError on ``tf[-1]``.
    tf = str(tf or '1m').strip().lower() or '1m'
    try:
        count = int(tf[:-1])
    except ValueError:
        count = 1
    return count * _TF_UNIT_SECONDS.get(tf[-1], 60)


def _iso_from_ms(ms: int) -> str:
    """Return the UTC ISO-8601 string for a millisecond epoch timestamp."""
    return pd.to_datetime(ms, unit='ms', utc=True).isoformat()


@dataclass
class _Position:
    """Open position for one (symbol, side) pair."""

    symbol: str
    side: str
    qty: float = 0.0
    entry: float = 0.0


class VirtualExchange:
    """Simulated exchange that fills orders against the current historical bar.

    Data is loaded once from an NPZ file or a SQLite database.  A per-symbol
    cursor (see ``set_cursor``) selects the "current" bar, which drives the
    clock, the synthetic order book, and fill prices.
    """

    id = 'virtual'
    name = 'VirtualExchange'

    def __init__(
        self,
        *,
        npz_path: str = '',
        db_path: str = '',
        symbols: Optional[List[str]] = None,
        default_timeframe: str = '1m',
        mode: str = 'hedge',
        initial_balance: float = 1000.0,
        maker_fee: float = 0.0,
        taker_fee: float = 0.0005,
        base_slippage_bps: float = 1.0,
        volume_impact_bps: float = 35.0,
        max_participation: float = 0.15,
        default_quote_volume: float = 25000.0,
        order_ttl_bars: int = 1,
        seed: int = 42,
        error_config: Optional[Dict[str, Any]] = None,
        debug: bool = False,
        dynamic_slippage_model: Optional[Dict[str, Any]] = None,
        broker_id: str = 'generic',
    ):
        """Load bar data and initialise account state.

        Raises:
            ValueError: if neither ``npz_path`` nor ``db_path`` is given.
        """
        if not npz_path and not db_path:
            raise ValueError('Provide npz_path or db_path')
        self.debug = bool(debug)
        self.mode = str(mode or 'hedge').lower()
        self.timeframes = {
            '1s': '1s', '15s': '15s', '30s': '30s', '1m': '1m', '5m': '5m',
            '15m': '15m', '30m': '30m', '1h': '1h', '4h': '4h', '1d': '1d',
        }
        self.default_timeframe = default_timeframe
        self.enableRateLimit = True
        self.timeout = 0
        self.maker_fee = float(maker_fee)
        self.taker_fee = float(taker_fee)
        self.base_slippage_bps = float(base_slippage_bps)
        self.volume_impact_bps = float(volume_impact_bps)
        self.max_participation = float(max_participation)
        self.default_quote_volume = float(default_quote_volume)
        self.order_ttl_bars = int(order_ttl_bars)
        self.random = random.Random(seed)
        self.error_config = dict(error_config or {})
        self.dynamic_slippage_model = dict(dynamic_slippage_model or {})
        self.broker_id = str(broker_id or 'generic').lower()
        self.data = self._load_data(npz_path=npz_path, db_path=db_path, symbols=symbols)
        self.symbols = sorted(self.data.keys())
        self._cursor = {sym: 0 for sym in self.symbols}
        # The clock starts at the earliest first bar across all symbols.
        self._clock_ms = (
            min(int(df['ts'].iloc[0]) for df in self.data.values())
            if self.data
            else int(time.time() * 1000)
        )
        self.markets = self._build_markets(self.symbols)
        self.balance_total = float(initial_balance)
        self.balance_free = float(initial_balance)
        self.positions: Dict[Tuple[str, str], _Position] = {}
        self.orders: Dict[str, Dict[str, Any]] = {}
        self.order_seq = 0
        self.rejections_left = int(self.error_config.get('reject_next_n_orders', 0) or 0)

    @classmethod
    def from_env(cls, debug: bool = False) -> 'VirtualExchange':
        """Build an instance from ``VIRTUAL_EXCHANGE_*`` environment variables."""
        err_cfg: Dict[str, Any] = {}
        raw_errors = os.getenv('VIRTUAL_EXCHANGE_ERRORS_JSON', '').strip()
        if raw_errors:
            try:
                err_cfg = json.loads(raw_errors)
            except ValueError as exc:
                # Error injection is best-effort: invalid JSON disables it
                # rather than aborting start-up.
                if debug:
                    print('[virtual-ex]', 'ignoring invalid VIRTUAL_EXCHANGE_ERRORS_JSON:', exc, flush=True)
                err_cfg = {}
        syms = [s.strip() for s in os.getenv('VIRTUAL_EXCHANGE_SYMBOLS', '').split(',') if s.strip()]
        raw_model = os.getenv('VIRTUAL_EXCHANGE_DYNAMIC_SLIPPAGE_JSON', '').strip()
        return cls(
            npz_path=os.getenv('VIRTUAL_EXCHANGE_NPZ', ''),
            db_path=os.getenv('VIRTUAL_EXCHANGE_DB', ''),
            symbols=syms or None,
            default_timeframe=os.getenv('VIRTUAL_EXCHANGE_TF', '1m'),
            mode=os.getenv('VIRTUAL_EXCHANGE_MODE', 'hedge'),
            initial_balance=float(os.getenv('VIRTUAL_EXCHANGE_BALANCE', '1000')),
            maker_fee=float(os.getenv('VIRTUAL_EXCHANGE_MAKER_FEE', '0')),
            taker_fee=float(os.getenv('VIRTUAL_EXCHANGE_TAKER_FEE', '0.0005')),
            base_slippage_bps=float(os.getenv('VIRTUAL_EXCHANGE_BASE_SLIP_BPS', '1.0')),
            volume_impact_bps=float(os.getenv('VIRTUAL_EXCHANGE_VOL_IMPACT_BPS', '35.0')),
            max_participation=float(os.getenv('VIRTUAL_EXCHANGE_MAX_PARTICIPATION', '0.15')),
            default_quote_volume=float(os.getenv('VIRTUAL_EXCHANGE_DEFAULT_QV', '25000.0')),
            order_ttl_bars=int(os.getenv('VIRTUAL_EXCHANGE_ORDER_TTL_BARS', '1')),
            seed=int(os.getenv('VIRTUAL_EXCHANGE_SEED', '42')),
            error_config=err_cfg,
            debug=debug,
            # A malformed slippage model is a configuration error and is
            # allowed to raise.
            dynamic_slippage_model=json.loads(raw_model) if raw_model else None,
            broker_id=os.getenv('VIRTUAL_EXCHANGE_BROKER_ID', os.getenv('VIRTUAL_EXCHANGE_EXCHANGE', 'generic')),
        )

    def _log(self, *parts):
        """Print a debug line when ``debug`` is enabled."""
        if self.debug:
            print('[virtual-ex]', *parts, flush=True)

    def _parse_symbol(self, sym: str) -> Tuple[str, str]:
        """Split a symbol into ``(base, quote)``; the quote defaults to USDT."""
        s = str(sym)
        if '/' in s:
            base, rest = s.split('/', 1)
            return base, rest.split(':')[0]
        if s.endswith('USDT'):
            return s[:-4], 'USDT'
        return s, 'USDT'

    def _build_markets(self, symbols: List[str]) -> Dict[str, Dict[str, Any]]:
        """Build one market description per symbol with fixed precision/limits."""
        out: Dict[str, Dict[str, Any]] = {}
        for sym in symbols:
            base, quote = self._parse_symbol(sym)
            out[sym] = {
                'id': sym.replace('/', '').replace(':', ''),
                'symbol': sym,
                'base': base,
                'quote': quote,
                'active': True,
                'swap': True,
                'future': True,
                'linear': True,
                'precision': {'amount': 0.001, 'price': 0.0001},
                'limits': {
                    'amount': {'min': 0.001, 'step': 0.001},
                    'price': {'step': 0.0001},
                    'cost': {'min': 1.0},
                },
                'info': {
                    'positionMode': self.mode,
                    'tickSize': '0.0001',
                    'stepSize': '0.001',
                    'minQty': '0.001',
                },
            }
        return out

    def _load_data(self, *, npz_path: str, db_path: str, symbols: Optional[List[str]]) -> Dict[str, pd.DataFrame]:
        """Load bars from the NPZ file if given, otherwise from the database."""
        if npz_path:
            return self._load_from_npz(npz_path, symbols)
        return self._load_from_db(db_path, symbols)

    def _load_from_npz(self, path: str, symbols: Optional[List[str]]) -> Dict[str, pd.DataFrame]:
        """Load bars from an NPZ archive.

        Two layouts are accepted: a single-symbol archive
        (``symbol``, ``timestamp_s``, ``close`` [+ OHLV]) and a multi-symbol
        archive (``symbols``, ``offsets``, ``timestamp_s``, ``close`` [+ OHLV]).

        Raises:
            ValueError: if the archive matches neither layout.
        """
        out: Dict[str, pd.DataFrame] = {}
        # SECURITY: allow_pickle=True can execute arbitrary code from a crafted
        # file; only load archives from trusted sources.  It is kept because
        # existing archives may store symbols as object arrays.
        # The context manager closes the underlying file handle.
        with np.load(path, allow_pickle=True) as z:
            files = set(z.files)
            if {'symbol', 'timestamp_s', 'close'}.issubset(files):
                raw_sym = z['symbol']
                sym = str(raw_sym.item() if getattr(raw_sym, 'shape', ()) == () else raw_sym[0])
                if symbols and sym not in symbols:
                    return {}
                ts = z['timestamp_s'].astype('int64') * 1000
                close = z['close'].astype('float64')
                if {'open', 'high', 'low', 'volume'}.issubset(files):
                    open_ = z['open'].astype('float64')
                    high = z['high'].astype('float64')
                    low = z['low'].astype('float64')
                    vol = z['volume'].astype('float64')
                else:
                    # Close-only data: synthesise flat bars with zero volume.
                    open_ = high = low = close.copy()
                    vol = np.zeros_like(close)
                out[sym] = pd.DataFrame({'ts': ts, 'open': open_, 'high': high, 'low': low, 'close': close, 'volume': vol})
                return out

            req = {'symbols', 'offsets', 'timestamp_s', 'close'}
            if not req.issubset(files):
                raise ValueError(f'Unsupported NPZ format: {sorted(files)}')
            syms = [str(s) for s in z['symbols'].tolist()]
            offs = z['offsets'].astype('int64').tolist()
            if len(offs) == len(syms):
                # Offsets hold start indices only; append the final end index.
                offs = offs + [len(z['close'])]
            ts_all = z['timestamp_s'].astype('int64') * 1000
            close_all = z['close'].astype('float64')
            open_all = z['open'].astype('float64') if 'open' in files else close_all
            high_all = z['high'].astype('float64') if 'high' in files else close_all
            low_all = z['low'].astype('float64') if 'low' in files else close_all
            vol_all = z['volume'].astype('float64') if 'volume' in files else np.zeros_like(close_all)

        keep = set(symbols or syms)
        for i, sym in enumerate(syms):
            if sym not in keep:
                continue
            a, b = offs[i], offs[i + 1]
            out[sym] = pd.DataFrame({
                'ts': ts_all[a:b],
                'open': open_all[a:b],
                'high': high_all[a:b],
                'low': low_all[a:b],
                'close': close_all[a:b],
                'volume': vol_all[a:b],
            })
        return out

    def _load_from_db(self, path: str, symbols: Optional[List[str]]) -> Dict[str, pd.DataFrame]:
        """Load bars from the ``price_indicators`` table of a SQLite database."""
        q = 'SELECT symbol, datetime_utc, open, high, low, close, volume FROM price_indicators'
        params: List[Any] = []
        if symbols:
            # Only the placeholders are interpolated; values are bound.
            q += ' WHERE symbol IN (%s)' % ','.join(['?'] * len(symbols))
            params.extend(symbols)
        q += ' ORDER BY symbol ASC, datetime_utc ASC'
        con = sqlite3.connect(path)
        try:
            df = pd.read_sql_query(q, con, params=params)
        finally:
            # Close the connection even when the query fails.
            con.close()
        out: Dict[str, pd.DataFrame] = {}
        for sym, part in df.groupby('symbol', sort=True):
            # NOTE(review): the // 10**6 assumes nanosecond datetimes -- confirm.
            ts = pd.to_datetime(part['datetime_utc'], utc=True).astype('int64') // 10**6
            out[str(sym)] = pd.DataFrame({
                'ts': ts.astype('int64').to_numpy(),
                'open': part['open'].astype('float64').to_numpy(),
                'high': part['high'].astype('float64').to_numpy(),
                'low': part['low'].astype('float64').to_numpy(),
                'close': part['close'].astype('float64').to_numpy(),
                'volume': part['volume'].astype('float64').to_numpy(),
            })
        return out

    def load_markets(self) -> Dict[str, Dict[str, Any]]:
        """Return the market table built at construction time."""
        return self.markets

    def market(self, symbol: str) -> Dict[str, Any]:
        """Return the market entry for an exact symbol (``KeyError`` if unknown)."""
        return self.markets[symbol]

    def parse_timeframe(self, tf: str) -> int:
        """Return the length of timeframe ``tf`` in seconds."""
        return _tf_to_seconds(tf)

    def milliseconds(self) -> int:
        """Return the simulated clock in epoch milliseconds."""
        return int(self._clock_ms)

    def resolve_symbol(self, sym: str) -> Optional[str]:
        """Map a loosely formatted symbol onto a loaded market symbol.

        Tries the exact name, then ``/USDT`` <-> ``/USDT:USDT`` variants, then
        the first loaded symbol sharing the base asset.  Returns ``None`` when
        nothing matches.
        """
        if sym in self.markets:
            return sym
        u = str(sym).upper().replace('-', '/')
        for cand in (u, u.replace('/USDT', '/USDT:USDT'), u.replace('/USDT:USDT', '/USDT')):
            if cand in self.markets:
                return cand
        base = u.split('/', 1)[0].replace('USDT', '')
        for s in self.symbols:
            if s.startswith(base + '/'):
                return s
        return None

    def _infer_base_tf_seconds(self, symbol: str) -> int:
        """Estimate the source bar length from the median timestamp spacing."""
        df = self.data[symbol]
        if len(df) < 2:
            return _tf_to_seconds(self.default_timeframe)
        diffs = np.diff(df['ts'].to_numpy()[: min(len(df), 1000)])
        diffs = diffs[diffs > 0]
        if len(diffs) == 0:
            return _tf_to_seconds(self.default_timeframe)
        return max(1, int(np.median(diffs) // 1000))

    def set_cursor(self, symbol: str, index: int) -> None:
        """Move ``symbol`` to bar ``index`` (clamped), advance the clock and
        expire timed-out open orders for that symbol."""
        sym = self.resolve_symbol(symbol) or symbol
        n = len(self.data[sym])
        self._cursor[sym] = max(0, min(int(index), n - 1))
        self._clock_ms = int(self.data[sym]['ts'].iloc[self._cursor[sym]])
        self._expire_open_orders(sym)

    def current_bar(self, symbol: str) -> Dict[str, Any]:
        """Return the bar at the symbol's cursor as a plain dict."""
        sym = self.resolve_symbol(symbol) or symbol
        row = self.data[sym].iloc[self._cursor[sym]]
        return {
            'symbol': sym,
            'timestamp': int(row['ts']),
            'open': float(row['open']),
            'high': float(row['high']),
            'low': float(row['low']),
            'close': float(row['close']),
            'volume': float(row['volume']),
        }

    def fetch_ohlcv(self, symbol: str, timeframe: str = '1m', since: Optional[int] = None, limit: Optional[int] = None):
        """Return ``[ts, open, high, low, close, volume]`` rows up to the clock.

        Bars are aggregated when ``timeframe`` is coarser than the source data;
        each aggregated row carries the timestamp of its last source bar.

        Raises:
            ValueError: if ``timeframe`` is finer than the source data.
        """
        sym = self.resolve_symbol(symbol) or symbol
        # Boolean indexing already yields a new frame, so the stored data is
        # never mutated and no up-front copy is required.
        base = self.data[sym]
        base = base[base['ts'] <= self._clock_ms]
        if since is not None:
            base = base[base['ts'] >= int(since)]
        if base.empty:
            return []
        tf = str(timeframe or self.default_timeframe)
        base_sec = self._infer_base_tf_seconds(sym)
        req_sec = _tf_to_seconds(tf)
        if req_sec < base_sec:
            raise ValueError(f'cannot downsample below source timeframe {base_sec}s -> {req_sec}s')
        if req_sec > base_sec:
            slot_ms = req_sec * 1000
            work = base.copy()
            work['slot'] = (work['ts'] // slot_ms) * slot_ms
            base = work.groupby('slot', as_index=False).agg(
                {'ts': 'max', 'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last', 'volume': 'sum'}
            )
        if limit is not None:
            base = base.tail(int(limit))
        return [
            [int(r.ts), float(r.open), float(r.high), float(r.low), float(r.close), float(r.volume)]
            for r in base.itertuples(index=False)
        ]

    def _synthetic_spread_bps(self, symbol: str) -> float:
        """Derive a spread (bps, clamped to [0.5, 25]) from the current bar's
        range and volume."""
        bar = self.current_bar(symbol)
        open_px = max(float(bar.get('open') or bar.get('close') or 0.0), 1e-12)
        range_bp = 10000.0 * (float(bar.get('high') or open_px) - float(bar.get('low') or open_px)) / open_px
        volume = max(float(bar.get('volume') or 0.0), 1.0)
        spread_bp = 0.8 + 0.12 * max(range_bp, 0.0) + 0.25 * (1.0 / max(math.log1p(volume), 1e-6))
        return float(max(0.5, min(spread_bp, 25.0)))

    def fetch_order_book(self, symbol: str, limit: Optional[int] = None):
        """Return a synthetic book (at most 10 levels) centred on the bar close."""
        sym = self.resolve_symbol(symbol) or symbol
        bar = self.current_bar(sym)
        mid = float(bar['close'])
        spread_bp = self._synthetic_spread_bps(sym)
        spread_abs = mid * spread_bp / 10000.0
        best_bid = max(mid - spread_abs / 2.0, 1e-12)
        best_ask = mid + spread_abs / 2.0
        base_qty = max(float(bar.get('volume') or 0.0) / 40.0, 10.0)
        depth_mult = [1.0, 1.15, 1.3, 1.5, 1.7, 1.9, 2.15, 2.4, 2.7, 3.0]
        lim = int(limit or 10)
        tick = max(mid * 0.0002, 1e-6)
        bids = []
        asks = []
        for i, mult in enumerate(depth_mult[:max(min(lim, len(depth_mult)), 0)]):
            qty = base_qty * mult
            bids.append([round(best_bid - i * tick, 8), round(qty, 6)])
            asks.append([round(best_ask + i * tick, 8), round(qty, 6)])
        return {
            'symbol': sym,
            'timestamp': bar['timestamp'],
            'datetime': _iso_from_ms(bar['timestamp']),
            'bids': bids,
            'asks': asks,
            'nonce': int(bar['timestamp']),
        }

    def fetch_ticker(self, symbol: str) -> Dict[str, Any]:
        """Return last/bid/ask for the current bar; bid/ask come from the
        synthetic book."""
        bar = self.current_bar(symbol)
        book = self.fetch_order_book(symbol, 1)
        bid = float(book['bids'][0][0]) if book.get('bids') else bar['close']
        ask = float(book['asks'][0][0]) if book.get('asks') else bar['close']
        return {
            'symbol': bar['symbol'],
            'timestamp': bar['timestamp'],
            'datetime': _iso_from_ms(bar['timestamp']),
            'last': bar['close'],
            'close': bar['close'],
            'bid': bid,
            'ask': ask,
            'info': {'markPrice': bar['close'], 'spreadBps': self._synthetic_spread_bps(symbol)},
        }

    def fetch_positions(self, symbols: Optional[List[str]] = None):
        """Return open positions, optionally restricted to ``symbols``.

        Requested symbols are resolved the same way as in the other public
        methods, so aliases such as ``'BTCUSDT'`` match stored positions.
        """
        if symbols:
            keep = {self.resolve_symbol(s) or s for s in symbols}
        else:
            keep = set(self.symbols)
        rows = []
        for (sym, side), pos in sorted(self.positions.items()):
            if sym not in keep or pos.qty <= 0:
                continue
            mark = self.fetch_ticker(sym)['last']
            unreal = (mark - pos.entry) * pos.qty if side == 'LONG' else (pos.entry - mark) * pos.qty
            rows.append({
                'symbol': sym,
                'side': side.lower(),
                'contracts': pos.qty,
                'entryPrice': pos.entry,
                'entry': pos.entry,
                'unrealizedPnl': unreal,
                'info': {'positionSide': side, 'availableAmt': pos.qty, 'contracts': pos.qty},
            })
        return rows

    def fetch_balance(self) -> Dict[str, Any]:
        """Return balances; ``total``/``equity`` are free balance plus
        unrealised PnL of all open positions."""
        equity = self.balance_free
        total_upnl = sum(float(p.get('unrealizedPnl') or 0.0) for p in self.fetch_positions())
        equity += total_upnl
        return {
            'total': {'USDT': equity},
            'free': {'USDT': self.balance_free},
            'used': {'USDT': max(0.0, self.balance_total - self.balance_free)},
            'equity': equity,
            'info': {'equity': equity, 'total': equity},
        }

    def fetch_open_orders(self, symbol: Optional[str] = None):
        """Return deep copies of open orders, optionally for one symbol.

        ``symbol`` is resolved first because orders are stored under the
        resolved market symbol.
        """
        wanted = (self.resolve_symbol(symbol) or symbol) if symbol else None
        return [
            copy.deepcopy(od)
            for od in self.orders.values()
            if od['status'] == 'open' and (wanted is None or od['symbol'] == wanted)
        ]

    def create_order(self, symbol: str, type: str, side: str, amount: float, price: Optional[float] = None, params: Optional[Dict[str, Any]] = None):
        """Submit an order and fill it immediately against the current bar.

        The order is recorded as ``closed`` when fillable, otherwise as
        ``open`` (it can later time out or be cancelled).  A taker fee is
        charged on every fill.

        Raises:
            RuntimeError: non-positive amount, injected/probabilistic
                rejection, or invalid ``positionSide``/``reduceOnly`` usage
                for the configured position mode.
        """
        params = dict(params or {})
        sym = self.resolve_symbol(symbol) or symbol
        side = str(side).lower()
        qty = float(amount or 0.0)
        if qty <= 0:
            raise RuntimeError('amount must be > 0')
        if self.rejections_left > 0:
            self.rejections_left -= 1
            raise RuntimeError('VirtualExchange: injected create_order rejection')
        prob = float(self.error_config.get('reject_probability', 0.0) or 0.0)
        if prob > 0 and self.random.random() < prob:
            raise RuntimeError('VirtualExchange: probabilistic create_order rejection')
        is_hedge = self.mode.startswith('hedge')
        if is_hedge:
            ps = str(params.get('positionSide') or '').upper()
            if ps not in {'LONG', 'SHORT'}:
                raise RuntimeError('positionSide required in hedge mode')
        elif params.get('positionSide'):
            raise RuntimeError('positionSide not allowed in one-way mode')
        reduce_only = bool(params.get('reduceOnly'))
        if reduce_only and is_hedge and self.error_config.get('simulate_bingx_reduceonly_reject', False):
            raise RuntimeError("bingx {\"code\":109400,\"msg\":\"In the Hedge mode, the 'ReduceOnly' field can not be filled.\",\"data\":{}}")

        order_id = f'vex-{self.order_seq+1:08d}'
        self.order_seq += 1
        # Reduce-only orders are exits, so the dynamic slippage model must be
        # evaluated with its CLOSE action rather than OPEN.
        fillable, exec_px, slip_bps, reason = self._decide_fill(
            sym, side, qty, requested_price=price, is_exit=reduce_only
        )
        ts = self.milliseconds()
        ob = self.fetch_order_book(sym, 10)
        best_bid = float(ob['bids'][0][0]) if ob.get('bids') else None
        best_ask = float(ob['asks'][0][0]) if ob.get('asks') else None
        od = {
            'id': order_id,
            'orderId': order_id,
            'clientOrderId': params.get('clientOrderId') or order_id,
            'symbol': sym,
            'type': str(type).lower(),
            'side': side,
            'amount': qty,
            'remaining': 0.0 if fillable else qty,
            'filled': qty if fillable else 0.0,
            'average': exec_px if fillable else None,
            'price': exec_px if fillable else price,
            'status': 'closed' if fillable else 'open',
            'timestamp': ts,
            'datetime': _iso_from_ms(ts),
            'reduceOnly': reduce_only,
            'info': {
                'positionSide': params.get('positionSide') or 'BOTH',
                'reduceOnly': reduce_only,
                'slippageBps': slip_bps,
                'reason': reason,
                'bestBid': best_bid,
                'bestAsk': best_ask,
                'spreadBps': self._synthetic_spread_bps(sym),
            },
            # Bookkeeping used by _expire_open_orders.
            '_bar_index': self._cursor[sym],
            '_ttl_bars': self.order_ttl_bars,
        }
        self.orders[order_id] = od
        if fillable:
            fee = abs(exec_px * qty) * self.taker_fee
            self._apply_fill(
                sym,
                side=side,
                qty=qty,
                px=exec_px,
                reduce_only=reduce_only,
                position_side=str(params.get('positionSide') or '').upper() or None,
            )
            self.balance_free -= fee
            od['fee'] = {'cost': fee, 'currency': 'USDT'}
        else:
            self._log('order-open', order_id, sym, side, qty, reason)
        return copy.deepcopy(od)

    def fetch_order(self, id: str, symbol: Optional[str] = None):
        """Return a deep copy of the order with the given id (``KeyError`` if unknown)."""
        return copy.deepcopy(self.orders[str(id)])

    def cancel_order(self, id: str, symbol: Optional[str] = None, params: Optional[Dict[str, Any]] = None):
        """Cancel an open order; orders in any other state are returned unchanged."""
        od = self.orders[str(id)]
        if od['status'] == 'open':
            od['status'] = 'canceled'
            od['remaining'] = od['amount'] - od.get('filled', 0.0)
            od['info']['reason'] = 'canceled'
        return copy.deepcopy(od)

    def _expire_open_orders(self, symbol: str):
        """Cancel open orders of ``symbol`` whose age in bars reached their TTL."""
        sym = self.resolve_symbol(symbol) or symbol
        cur_idx = self._cursor[sym]
        for od in self.orders.values():
            if od['symbol'] != sym or od['status'] != 'open':
                continue
            # Test for None explicitly: bar index 0 is a valid placement bar
            # and must not be mistaken for "missing".
            placed_at = od.get('_bar_index')
            age = 0 if placed_at is None else cur_idx - int(placed_at)
            if age >= int(od.get('_ttl_bars') or 1):
                od['status'] = 'canceled'
                od['info']['reason'] = 'timeout'
                od['remaining'] = od['amount'] - od.get('filled', 0.0)

    def _predict_dynamic_slippage_bps(self, bar: Dict[str, Any], side: str, qty: float, requested_price: float, is_exit: bool = False) -> float:
        """Evaluate the optional dynamic slippage model for one order.

        Returns 0.0 when no model is configured.  Kind
        ``'directional_knn_linear'`` delegates to
        ``predict_directional_slippage_bp``; any other kind is evaluated as a
        linear model over bar/order features and clipped to
        ``[clip_min_bp, clip_max_bp]``.

        Raises:
            ImportError: if the directional model is requested but
                ``slippage_directional_model_v1`` is not importable.
        """
        model = self.dynamic_slippage_model
        if not model:
            return 0.0
        kind = str(model.get('kind', 'linear_bp'))
        action = 'CLOSE' if is_exit else 'OPEN'
        # Convert exchange-side buy/sell + exit/open into the strategy-side
        # long/short direction used by the model.
        if str(side).lower() == 'buy':
            model_side = 'SHORT' if is_exit else 'LONG'
        else:
            model_side = 'LONG' if is_exit else 'SHORT'
        if kind == 'directional_knn_linear':
            if predict_directional_slippage_bp is None:
                raise ImportError(
                    "dynamic slippage kind 'directional_knn_linear' requires "
                    "the slippage_directional_model_v1 module"
                )
            row = {
                'open': float(bar.get('open') or bar.get('close') or 0.0),
                'high': float(bar.get('high') or bar.get('close') or 0.0),
                'low': float(bar.get('low') or bar.get('close') or 0.0),
                'close': float(bar.get('close') or 0.0),
                'volume': float(bar.get('volume') or 0.0),
                'quote_volume': max(
                    float(bar.get('close') or 0.0) * float(bar.get('volume') or 0.0),
                    self.default_quote_volume,
                ),
            }
            return float(predict_directional_slippage_bp(model, row, model_side, action, float(qty)))

        open_px = max(float(bar.get('open') or bar.get('close') or 0.0), 1e-12)
        close_px = float(bar.get('close') or open_px)
        high_px = float(bar.get('high') or close_px)
        low_px = float(bar.get('low') or close_px)
        volume = float(bar.get('volume') or 0.0)
        quote_vol = max(volume * max(close_px, 1e-12), self.default_quote_volume)
        participation = abs(float(qty) * requested_price) / max(quote_vol, 1e-12)
        signed_body_bp = 10000.0 * (close_px - open_px) / open_px
        range_bp = 10000.0 * (high_px - low_px) / open_px
        is_buy = side == 'buy'
        direction = 1.0 if is_buy else -1.0
        feats = {
            'log_volume': math.log1p(max(volume, 0.0)),
            'log_quote_volume': math.log1p(max(quote_vol, 0.0)),
            'signed_body_bp': signed_body_bp,
            'range_bp': range_bp,
            'participation': participation,
            'log_participation': math.log(max(participation, 1e-12)),
            'side_long': 1.0 if is_buy else 0.0,
            'side_short': 0.0 if is_buy else 1.0,
            'is_open': 0.0 if is_exit else 1.0,
            'is_exit': 1.0 if is_exit else 0.0,
            'side_x_body_signed_bp': signed_body_bp * direction,
            'side_x_range_bp': range_bp * direction,
        }
        val = float(model.get('base_bp', 0.0))
        for k, w in (model.get('coefficients') or {}).items():
            val += float(w) * float(feats.get(k, 0.0))
        clip_min = float(model.get('clip_min_bp', 0.0))
        clip_max = float(model.get('clip_max_bp', 1000.0))
        return float(np.clip(val, clip_min, clip_max))

    def _decide_fill(self, symbol: str, side: str, qty: float, requested_price: Optional[float] = None, is_exit: bool = False):
        """Decide whether an order fills on the current bar and at what price.

        Args:
            is_exit: when True the dynamic slippage model is evaluated for a
                closing order; the default (False) keeps the opening-order
                behaviour.

        Returns:
            ``(fillable, exec_px, slip_bps, reason)``.  ``exec_px`` is ``None``
            when not fillable; ``slip_bps`` is ``None`` when the order was
            rejected on participation before slippage was computed.
        """
        bar = self.current_bar(symbol)
        px0 = float(bar['close']) if requested_price is None else float(requested_price)
        notional = abs(px0 * qty)
        quote_vol = max(float(bar.get('volume') or 0.0) * max(float(bar['close']), 1e-12), self.default_quote_volume)
        participation = 0.0 if quote_vol <= 0 else notional / quote_vol
        if participation > self.max_participation:
            return False, None, None, f'participation {participation:.4f} > {self.max_participation:.4f}'
        slip_bps = self.base_slippage_bps + self.volume_impact_bps * participation
        # The static estimate acts as a floor under the dynamic model.
        slip_bps = max(slip_bps, self._predict_dynamic_slippage_bps(bar, side, qty, px0, is_exit=is_exit))
        max_slip = self.error_config.get('max_slippage_bps')
        if max_slip is not None and slip_bps > float(max_slip):
            return False, None, slip_bps, f'slippage {slip_bps:.2f}bp > max {float(max_slip):.2f}bp'
        hi = float(bar['high'])
        lo = float(bar['low'])
        close = float(bar['close'])
        # Slippage is applied to the bar close and capped by the bar's
        # high (buys) / low (sells).
        if side == 'buy':
            exec_px = min(hi if hi > 0 else close, close * (1.0 + slip_bps / 10000.0))
        else:
            exec_px = max(lo if lo > 0 else close, close * (1.0 - slip_bps / 10000.0))
        return True, exec_px, slip_bps, 'filled'

    def _apply_fill(self, symbol: str, side: str, qty: float, px: float, reduce_only: bool, position_side: Optional[str]):
        """Apply a fill to the position book and realise PnL on reductions.

        NOTE(review): only hedge mode is handled in the portion of this method
        visible here; in one-way mode a fill leaves positions untouched.
        NOTE(review): a non-reduce-only order whose side opposes
        ``position_side`` (e.g. sell + LONG) increases that position here --
        confirm whether such orders should close it instead.
        """
        side_up = str(position_side or ('LONG' if side == 'buy' else 'SHORT')).upper()
        if reduce_only:
            if self.mode.startswith('hedge'):
                key = (symbol, side_up)
                pos = self.positions.get(key)
                if pos is None:
                    return
                close_qty = min(pos.qty, qty)
                pnl = (px - pos.entry) * close_qty if side_up == 'LONG' else (pos.entry - px) * close_qty
                self.balance_free += pnl
                pos.qty -= close_qty
                if pos.qty <= 1e-12:
                    self.positions.pop(key, None)
                else:
                    self.positions[key] = pos
            return
        if self.mode.startswith('hedge'):
            key = (symbol, side_up)
            pos = self.positions.get(key)
            if pos is None or pos.qty <= 0:
                self.positions[key] = _Position(symbol=symbol, side=side_up, qty=qty, entry=px)
            else:
                # Size-weighted average entry price.
                new_qty = pos.qty + qty
                pos.entry = ((pos.entry * pos.qty) + (px * qty)) / max(new_qty, 1e-12)
                pos.qty = new_qty
                self.positions[key] = pos