#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ bt_live_paper_runner.py ----------------------- Unified runner for BACKTEST, PAPER (db), PAPER-API (live prices, simulated fills), and LIVE (real orders on BingX via CCXT), using strategies from the `strategies/` folder. Key features - One config for all modes (YAML/JSON), same strategy_class across modes - PAPER (db): replays from a cache DB (price_indicators schema) using engine.* APIs - PAPER-API (NEW): pulls live prices from BingX (or a MockFetcher in --dry-run), simulates orders, writes Orders/Decisions/Equity to a session DB, and emits a cache-out DB (price_indicators) of exactly-what-we-saw - LIVE: mirrors your live bot flow (BingX + reduce-only brackets in hedge mode) - BACKTEST: delegates to backtester_core.py with the provided cfg Example Backtest: python3 bt_live_paper_runner.py --mode backtest --cfg configs/cs_C2_base_1h.yaml --limit-bars 500 Paper (DB): python3 bt_live_paper_runner.py --mode paper --paper-source db --cfg configs/cs_C2_base_1h.yaml \ --db combined_cache.db --results-dir paper_results --limit-bars 168 Paper (API, simulated orders + session DB + cache-out): python3 bt_live_paper_runner.py --mode paper --paper-source api --cfg configs/cs_C2_base_1h.yaml \ --exchange bingx --symbol-format usdtm --poll-sec 15 --bar-delay-sec 10 --limit_klines 180 \ --results-dir paper_api_results --orders-db paper_api_results/session.sqlite \ --session-db paper_api_results/session.sqlite \ --cache-out paper_api_results/combined_cache_session.db Paper (API DRY RUN, no internet, generates synthetic OHLCV, 1 step): python3 bt_live_paper_runner.py --mode paper --paper-source api --cfg dryrun_lenient.yaml \ --results-dir paper_api_dryrun --orders-db paper_api_dryrun/session.sqlite \ --session-db paper_api_dryrun/session.sqlite --cache-out paper_api_dryrun/combined_cache_session.db \ --dry-run --iterations 1 """ import os import sys import re import json import time import math import uuid import argparse import importlib import sqlite3 from datetime import datetime, timezone, timedelta from typing import Optional, List, Dict, Any import pandas as pd import numpy as np try: import yaml # optional except Exception: yaml = None # ---------- optional engine imports (from backtester core) ---------- def _try_import(mod_path, names: List[str]): mod = importlib.import_module(mod_path) return [getattr(mod, n) for n in names] EnginePortfolio = None build_md_slice = None load_cache = None try: EnginePortfolio, = _try_import("engine.portfolio", ["Portfolio"]) build_md_slice, load_cache = _try_import("engine.data", ["build_md_slice", "load_cache"]) except Exception: EnginePortfolio = None build_md_slice = None load_cache = None # ---------- helpers ---------- def load_yaml_or_json(path: str) -> dict: if not path: return {} try: if path.endswith(".json"): with open(path, "r", encoding="utf-8") as f: return json.load(f) if yaml is not None: with open(path, "r", encoding="utf-8") as f: return yaml.safe_load(f) or {} except Exception as e: print(f"[cfg] failed to parse {path}: {e}", file=sys.stderr) # naive fallback cfg = {} try: for line in open(path, "r", encoding="utf-8").read().splitlines(): if ":" in line and not line.strip().startswith("#"): k, v = line.split(":", 1) vv = v.strip() if vv.lower() in ("true", "false"): cfg[k.strip()] = (vv.lower() == "true") else: try: if "." in vv or "e" in vv.lower(): cfg[k.strip()] = float(vv) else: cfg[k.strip()] = int(vv) except Exception: cfg[k.strip()] = vv except Exception: pass return cfg def mask(s: str) -> str: s = str(s or "") if len(s) <= 4: return "*" * len(s) return s[:2] + "*" * (len(s) - 4) + s[-2:] def sleep_ms(ms: int): time.sleep(max(0.0, float(ms) / 1000.0)) # ---------- strategy loading ---------- def load_strategy(path_cls: str, cfg: dict): mod_path, cls_name = path_cls.rsplit(".", 1) mod = importlib.import_module(mod_path) cls = getattr(mod, cls_name) return cls(cfg) # ---------- DB fallback loaders for PAPER (db) ---------- def fallback_load_cache_sqlite(db_path: str): con = sqlite3.connect(db_path) cur = con.cursor() cur.execute("SELECT name FROM sqlite_master WHERE type='table'") tables = [r[0] for r in cur.fetchall()] table = None for t in tables: try: cur.execute(f"PRAGMA table_info({t})") cols = [r[1].lower() for r in cur.fetchall()] if set(["symbol", "datetime_utc", "close"]).issubset(set(cols)): table = t break except Exception: continue if table is None: con.close() raise RuntimeError("No suitable table in DB (need symbol, datetime_utc, close)") df = pd.read_sql_query(f"SELECT * FROM {table}", con) con.close() df["datetime_utc"] = pd.to_datetime(df["datetime_utc"], utc=True, errors="coerce") df = df.dropna(subset=["datetime_utc"]).sort_values(["datetime_utc", "symbol"]) times = sorted(df["datetime_utc"].unique().tolist()) dfs = {} for sym, sub in df.groupby("symbol"): sub = sub.set_index("datetime_utc").sort_index() dfs[sym] = sub return dfs, times def safe_load_cache(db_path: str): if load_cache is not None: try: return load_cache(db_path) except Exception as e: print(f"[fallback] engine.data.load_cache failed: {e}", file=sys.stderr) return fallback_load_cache_sqlite(db_path) def safe_build_md_slice(dfs: dict, t): if build_md_slice is not None: try: return build_md_slice(dfs, t) except Exception as e: print(f"[fallback] engine.data.build_md_slice failed: {e}", file=sys.stderr) out = {} for sym, df in dfs.items(): row = df[df.index <= t].tail(1) if row is None or len(row) == 0: continue out[sym] = row.iloc[0].to_dict() return out # ---------- features ---------- def compute_feats(df: pd.DataFrame) -> pd.DataFrame: out = df.copy() prev_close = out["close"].shift(1) tr = pd.concat( [(out["high"] - out["low"]).abs(), (out["high"] - prev_close).abs(), (out["low"] - prev_close).abs()], axis=1 ).max(axis=1) atr = tr.ewm(alpha=1 / 14.0, adjust=False).mean() out["atr_ratio"] = (atr / out["close"]).replace([np.inf, -np.inf], np.nan).fillna(0.0) out["quote_volume"] = (out["volume"] * out["close"]).fillna(0.0) out["qv_24h"] = out["quote_volume"].rolling(24, min_periods=1).sum() out["dp6h"] = (out["close"] / out["close"].shift(6) - 1.0).fillna(0.0) out["dp12h"] = (out["close"] / out["close"].shift(12) - 1.0).fillna(0.0) # placeholders for compatibility for k in ("rsi", "stochastic", "mfi", "overbought_index", "gain_24h_before"): if k not in out.columns: out[k] = 0.0 return out # ---------- CCXT fetchers ---------- try: import ccxt # type: ignore except Exception: ccxt = None RATE_MS = 130 class CCXTFetcher: def __init__(self, exchange="bingx", symbol_format="usdtm", debug=False, logfile=""): if not ccxt: raise RuntimeError("ccxt is not installed. pip install 'ccxt<5'") self.debug = debug self.logfile = logfile api_k = os.environ.get("BINGX_KEY") or os.environ.get("API_KEY") api_s = os.environ.get("BINGX_SECRET") or os.environ.get("API_SECRET") opts = {"enableRateLimit": True, "timeout": 20000} if api_k and api_s: opts.update({"apiKey": api_k, "secret": api_s}) self.ex = getattr(ccxt, exchange)(opts) try: self.markets = self.ex.load_markets() except Exception as e: self.markets = {} print(f"[ccxt load_markets] {e}", file=sys.stderr) self.by_base: Dict[str, str] = {} for m in self.markets.values(): try: if m.get("swap") and m.get("quote") == "USDT": b = m.get("base") if b: self.by_base[b] = m["symbol"] except Exception: continue def resolve_symbol(self, s: str) -> Optional[str]: if s in self.markets: return s if s in self.by_base: return self.by_base[s] u = s.upper().replace("-", "/").replace("USDTUSDT", "USDT:USDT") u = u.replace(":USDTUSDT", ":USDT") for cand in (u, u.replace("/USDT", "/USDT:USDT"), u.replace("/USDT:USDT", "/USDT")): if cand in self.markets: return cand b = s.split("/", 1)[0].split("-", 1)[0].replace("USDT", "") if b in self.by_base: return self.by_base[b] return None def fetch_ohlcv_df(self, sym: str, timeframe="1h", limit=180) -> Optional[pd.DataFrame]: ccxt_sym = self.resolve_symbol(sym) or sym try: data = self.ex.fetch_ohlcv(ccxt_sym, timeframe=timeframe, limit=limit) sleep_ms(RATE_MS) except Exception as e: print(f"[fetch_ohlcv] {sym}: {e}", file=sys.stderr) return None if not data: return None df = pd.DataFrame(data, columns=["ts", "open", "high", "low", "close", "volume"]) df["datetime_utc"] = pd.to_datetime(df["ts"], unit="ms", utc=True) df = df.set_index("datetime_utc")[["open", "high", "low", "close", "volume"]].astype(float) return df def fetch_ticker_price(self, sym: str) -> Optional[float]: ccxt_sym = self.resolve_symbol(sym) or sym try: t = self.ex.fetch_ticker(ccxt_sym) sleep_ms(RATE_MS) p = float(t.get("last") or t.get("close") or 0.0) return p if p > 0 else None except Exception as e: print(f"[ticker] {sym}: {e}", file=sys.stderr) return None class MockFetcher: """Network-free fetcher for DRY RUN. Generates synthetic OHLCV for a small universe.""" def __init__(self, symbols=None): if symbols is None: symbols = ["BTC/USDT:USDT", "ETH/USDT:USDT", "SOL/USDT:USDT"] self.by_base = {s.split("/")[0]: s for s in symbols} def resolve_symbol(self, s: str): if s in self.by_base.values(): return s base = s.split("/", 1)[0].split("-", 1)[0].upper().replace("USDT", "") return self.by_base.get(base, list(self.by_base.values())[0]) def fetch_ohlcv_df(self, sym: str, timeframe="1h", limit=180): now = datetime.utcnow().replace(tzinfo=timezone.utc) if timeframe == "1h": idx = [now - timedelta(hours=limit - i) for i in range(limit)] else: idx = [now - timedelta(minutes=limit - i) for i in range(limit)] idx = pd.to_datetime(idx, utc=True) base = 100.0 + (hash(sym) % 300) steps = np.random.normal(0, 0.5, size=limit).cumsum() close = base + steps high = close + np.abs(np.random.normal(0.3, 0.2, size=limit)) low = close - np.abs(np.random.normal(0.3, 0.2, size=limit)) openp = close + np.random.normal(0, 0.2, size=limit) vol = np.abs(np.random.normal(100, 25, size=limit)) df = pd.DataFrame({"open": openp, "high": high, "low": low, "close": close, "volume": vol}, index=idx) return df def fetch_ticker_price(self, sym: str): df = self.fetch_ohlcv_df(sym, limit=2) return float(df["close"].iloc[-1]) # ---------- Orders table helpers ---------- def ensure_orders_db(path: str): os.makedirs(os.path.dirname(path) or ".", exist_ok=True) con = sqlite3.connect(path) cur = con.cursor() cur.execute("""CREATE TABLE IF NOT EXISTS orders( order_id TEXT PRIMARY KEY, ts_utc TEXT, bar_time_utc TEXT, mode TEXT, symbol TEXT, side TEXT, type TEXT, price REAL, qty REAL, status TEXT, reason TEXT, run_id TEXT, extra TEXT )""") con.commit() con.close() def insert_order_row(db_path: str, row: dict): con = sqlite3.connect(db_path) cur = con.cursor() cols = ["order_id", "ts_utc", "bar_time_utc", "mode", "symbol", "side", "type", "price", "qty", "status", "reason", "run_id", "extra"] vals = [row.get(c) for c in cols] cur.execute(f"INSERT OR REPLACE INTO orders({','.join(cols)}) VALUES({','.join(['?']*len(cols))})", vals) con.commit() con.close() # ---------- Session DB & Cache-out helpers ---------- def ensure_session_dbs(results_dir: str, session_db: str = "", cache_out: str = ""): sess = session_db or os.path.join(results_dir, "session.sqlite") cachep = cache_out or os.path.join(results_dir, "combined_cache_session.db") os.makedirs(os.path.dirname(sess) or ".", exist_ok=True) os.makedirs(os.path.dirname(cachep) or ".", exist_ok=True) con = sqlite3.connect(sess) cur = con.cursor() cur.execute("""CREATE TABLE IF NOT EXISTS config_snapshots( run_id TEXT, ts_utc TEXT, cfg_json TEXT, PRIMARY KEY(run_id, ts_utc) )""") cur.execute("""CREATE TABLE IF NOT EXISTS decisions( run_id TEXT, bar_time_utc TEXT, universe_size INTEGER, ranked_json TEXT, selected_json TEXT, PRIMARY KEY(run_id, bar_time_utc) )""") cur.execute("""CREATE TABLE IF NOT EXISTS equity( run_id TEXT, ts_utc TEXT, equity_usdt REAL, cash_usdt REAL, position_value_usdt REAL, realized_pnl_cum REAL, unrealized_pnl REAL, PRIMARY KEY(run_id, ts_utc) )""") con.commit() con.close() con2 = sqlite3.connect(cachep) cur2 = con2.cursor() cur2.execute("""CREATE TABLE IF NOT EXISTS price_indicators( symbol TEXT, datetime_utc TEXT, open REAL, high REAL, low REAL, close REAL, volume REAL, rsi REAL, stochastic REAL, mfi REAL, overbought_index REAL, atr_ratio REAL, gain_24h_before REAL, dp6h REAL, dp12h REAL, quote_volume REAL, qv_24h REAL, vol_surge_mult REAL, PRIMARY KEY(symbol, datetime_utc) )""") con2.commit() con2.close() return sess, cachep def write_config_snapshot(sess_path: str, run_id: str, cfg: dict): con = sqlite3.connect(sess_path) cur = con.cursor() cur.execute("INSERT OR REPLACE INTO config_snapshots(run_id, ts_utc, cfg_json) VALUES(?,?,?)", (run_id, datetime.utcnow().isoformat(), json.dumps(cfg))) con.commit() con.close() def write_decisions(sess_path: str, run_id: str, bar_time, ranked_list, selected_list): con = sqlite3.connect(sess_path) cur = con.cursor() cur.execute("INSERT OR REPLACE INTO decisions(run_id, bar_time_utc, universe_size, ranked_json, selected_json) VALUES(?,?,?,?,?)", (run_id, bar_time.isoformat(), int(len(ranked_list)), json.dumps(list(ranked_list)), json.dumps(list(selected_list)))) con.commit() con.close() def write_equity(sess_path: str, run_id: str, t, equity_dict: dict): con = sqlite3.connect(sess_path) cur = con.cursor() cur.execute("""INSERT OR REPLACE INTO equity(run_id, ts_utc, equity_usdt, cash_usdt, position_value_usdt, realized_pnl_cum, unrealized_pnl) VALUES(?,?,?,?,?,?,?)""", (run_id, t.isoformat(), float(equity_dict.get("equity", 0.0)), float(equity_dict.get("cash", 0.0)), float(equity_dict.get("position_value", 0.0)), float(equity_dict.get("realized_pnl_cum", 0.0)), float(equity_dict.get("unrealized_pnl", 0.0)))) con.commit() con.close() def cache_out_upsert(cache_path: str, symbol: str, feats_df: pd.DataFrame): con = sqlite3.connect(cache_path) cur = con.cursor() cols = ["symbol","datetime_utc","open","high","low","close","volume", "rsi","stochastic","mfi","overbought_index","atr_ratio","gain_24h_before", "dp6h","dp12h","quote_volume","qv_24h","vol_surge_mult"] placeholders = ",".join(["?"] * len(cols)) for idx, r in feats_df.iterrows(): row = [ symbol, pd.to_datetime(idx, utc=True).strftime("%Y-%m-%dT%H:%M:%S+00:00"), float(r.get("open", 0.0)), float(r.get("high", 0.0)), float(r.get("low", 0.0)), float(r.get("close", 0.0)), float(r.get("volume", 0.0)), float(r.get("rsi", 0.0)), float(r.get("stochastic", 0.0)), float(r.get("mfi", 0.0)), float(r.get("overbought_index", 0.0)), float(r.get("atr_ratio", 0.0)), float(r.get("gain_24h_before", 0.0)), float(r.get("dp6h", 0.0)), float(r.get("dp12h", 0.0)), float(r.get("quote_volume", 0.0)), float(r.get("qv_24h", 0.0)), float(r.get("vol_surge_mult", 0.0)) ] cur.execute(f"INSERT OR REPLACE INTO price_indicators({','.join(cols)}) VALUES({placeholders})", row) con.commit() con.close() def read_hour_cache_row(cache_path: str, symbol: str, dt_utc) -> dict: """Return feature dict for exact bar time from price_indicators, or {} if missing.""" try: con = sqlite3.connect(cache_path) cur = con.cursor() ts = dt_utc.isoformat() row = cur.execute( "SELECT open,high,low,close,volume,rsi,stochastic,mfi,overbought_index,atr_ratio,gain_24h_before,dp6h,dp12h,quote_volume,qv_24h,vol_surge_mult " "FROM price_indicators WHERE symbol=? AND datetime_utc=? LIMIT 1", (symbol, ts) ).fetchone() con.close() if not row: return {} keys = ["open","high","low","close","volume","rsi","stochastic","mfi","overbought_index","atr_ratio","gain_24h_before","dp6h","dp12h","quote_volume","qv_24h","vol_surge_mult"] return {k: float(v) for k, v in zip(keys, row)} except Exception: return {} # ---------- PAPER runner (DB replay) ---------- def run_paper(db_path: str, cfg: dict, results_dir: str, limit_bars: Optional[int] = None): assert EnginePortfolio is not None, "engine.portfolio.Portfolio unavailable (place script in repo root)" strat_path = cfg.get("strategy_class", "strategies.cross_sectional_rs.CrossSectionalRS") strat = load_strategy(strat_path, cfg) dfs, times = safe_load_cache(db_path) if limit_bars is not None and limit_bars > 0: times = times[-int(limit_bars):] port_cfg = { "initial_equity": float(cfg.get("initial_equity", cfg.get("start_cash", 200.0))), "fee_rate": float(cfg.get("fee_rate", 0.0006)), "slippage_per_side": float(cfg.get("slippage_per_side", cfg.get("slip_bps", 1.5) / 10000.0 if isinstance(cfg.get("slip_bps", 0), (int, float)) else 0.0003)), "tick_pct": float(cfg.get("tick_pct", 0.0001)), "position_notional": float(cfg.get("notional", 2.2)), "max_notional_frac": float(cfg.get("max_notional_frac", 0.5)), "funding_rate_hour": float(cfg.get("funding_rate_hour", 0.0)), } pf = EnginePortfolio(port_cfg) top_n = int(cfg.get("top_n", 4)) for t in times: md = safe_build_md_slice(dfs, t) # manage for pos in list(pf.positions): row = md.get(pos.symbol) if row is None: continue adj = strat.manage_position(t, pos.symbol, pos, row, ctx={"portfolio": pf}) if adj.action == "EXIT": pf.close(pos, t, row["close"], reason=adj.reason) elif adj.action == "MOVE_SL" and adj.new_stop is not None: pos.stop_price = adj.new_stop elif adj.action == "MOVE_TP" and adj.new_tp is not None: pos.take_profit = adj.new_tp # entries uni = strat.universe(t, md) ranked = strat.rank(t, md, uni)[:top_n] for sym in ranked: row = md.get(sym) if row is None: continue sig = strat.entry_signal(t, sym, row, ctx={"portfolio": pf}) if sig is None: continue if not pf.can_open(port_cfg): continue pf.open(symbol=sym, signal=sig, t=t, last_price=row["close"]) os.makedirs(results_dir, exist_ok=True) trades_csv = os.path.join(results_dir, "trades.csv") summary_csv = os.path.join(results_dir, "summary.csv") pf.save_trades(trades_csv) pf.save_summary(summary_csv) print(f"[paper-db] saved {trades_csv} and {summary_csv}") # ---------- PAPER-API runner (live prices, simulated fills, full logging) ---------- def run_paper_api(cfg: dict, args): assert EnginePortfolio is not None, "engine.portfolio.Portfolio unavailable" strat_path = cfg.get("strategy_class", "strategies.cross_sectional_rs.CrossSectionalRS") strat = load_strategy(strat_path, cfg) port_cfg = { "initial_equity": float(cfg.get("initial_equity", cfg.get("start_cash", 200.0))), "fee_rate": float(cfg.get("fee_rate", 0.0006)), "slippage_per_side": float(cfg.get("slippage_per_side", cfg.get("slip_bps", 1.5) / 10000.0 if isinstance(cfg.get("slip_bps", 0), (int, float)) else 0.0003)), "tick_pct": float(cfg.get("tick_pct", 0.0001)), "position_notional": float(cfg.get("notional", 2.2)), "max_notional_frac": float(cfg.get("max_notional_frac", 0.5)), "funding_rate_hour": float(cfg.get("funding_rate_hour", 0.0)), } pf = EnginePortfolio(port_cfg) use_mock = bool(getattr(args, "dry_run", False)) or (ccxt is None) fetcher = MockFetcher() if use_mock else CCXTFetcher(exchange=args.exchange, symbol_format=args.symbol_format, debug=args.debug) os.makedirs(args.results_dir, exist_ok=True) # DB & cache targets for LIVE session_db_path = getattr(args, "session_db", None) or getattr(args, "orders_db", None) or os.path.join(args.results_dir, "session.sqlite") cache_out_path = getattr(args, "cache_out", None) or os.path.join(args.results_dir, "combined_cache_session.db") ensure_orders_db(session_db_path) run_id = datetime.utcnow().strftime("LIVE_%Y%m%d_%H%M%S") write_config_snapshot(session_db_path, run_id, cfg) orders_db = args.orders_db or os.path.join(args.results_dir, "orders.sqlite") ensure_orders_db(orders_db) session_db_path, cache_out_path = ensure_session_dbs(args.results_dir, args.session_db, args.cache_out) run_id = datetime.utcnow().strftime("PA_%Y%m%d_%H%M%S") write_config_snapshot(session_db_path, run_id, cfg) top_n = int(cfg.get("top_n", 4)) print(f"[paper-api] polling every {args.poll_sec}s; entries at bar close +{args.bar_delay_sec}s; orders -> {orders_db}") last_bar_ts = None iters_left = getattr(args, "iterations", None) if getattr(args, "dry_run", False) else None while True: now = datetime.utcnow().replace(tzinfo=timezone.utc) bar_close = now.replace(minute=0, second=0, microsecond=0) if cfg.get("timeframe", "1h") == "1h" else now.replace(second=0, microsecond=0) if (last_bar_ts is None or bar_close > last_bar_ts) and (now - bar_close).total_seconds() >= args.bar_delay_sec: last_bar_ts = bar_close # Build real-time md universe = sorted(set(fetcher.by_base.values())) md = {} for ccxt_sym in universe: df = fetcher.fetch_ohlcv_df(ccxt_sym, timeframe=cfg.get("timeframe", "1h"), limit=max(60, args.limit_klines)) if df is None or len(df) < 30: continue feats_df = compute_feats(df) cache_out_upsert(cache_out_path, ccxt_sym, feats_df) md[ccxt_sym] = feats_df.iloc[-1].to_dict() print(".", end="", flush=True) # exits for pos in list(pf.positions): row = md.get(pos.symbol) if row is None: continue adj = strat.manage_position(bar_close, pos.symbol, pos, row, ctx={"portfolio": pf}) if adj.action == "EXIT": px = float(row.get("close") or 0.0) * (1 - port_cfg["slippage_per_side"]) # assume sell pf.close(pos, bar_close, px, reason=adj.reason) insert_order_row(orders_db, { "order_id": str(uuid.uuid4()), "ts_utc": datetime.utcnow().isoformat(), "bar_time_utc": bar_close.isoformat(), "mode": "paper_api", "symbol": pos.symbol, "side": "sell", "type": "market", "price": float(px), "qty": float(pos.qty), "status": "filled", "reason": adj.reason or "exit", "run_id": run_id, "extra": json.dumps({"sim": True}) }) # entries + decisions logging uni = strat.universe(bar_close, md) ranked = strat.rank(bar_close, md, uni)[:top_n] selected_syms = list(ranked) write_decisions(session_db_path, run_id, bar_close, ranked, selected_syms) for sym in ranked: row = md.get(sym) if row is None: continue sig = strat.entry_signal(bar_close, sym, row, ctx={"portfolio": pf}) if sig is None: continue if not pf.can_open(port_cfg): continue entry_px = float(row.get("close") or 0.0) * (1 + port_cfg["slippage_per_side"]) # assume buy pos = pf.open(symbol=sym, signal=sig, t=bar_close, last_price=entry_px) insert_order_row(orders_db, { "order_id": str(uuid.uuid4()), "ts_utc": datetime.utcnow().isoformat(), "bar_time_utc": bar_close.isoformat(), "mode": "paper_api", "symbol": sym, "side": "buy", "type": "market", "price": float(entry_px), "qty": float(getattr(pos, "qty", 0.0)), "status": "filled", "reason": "entry", "run_id": run_id, "extra": json.dumps({"sim": True}) }) # equity snapshot + save trades & summary each bar eq = { "equity": getattr(pf, "equity", 0.0), "cash": getattr(pf, "cash", 0.0), "position_value": getattr(pf, "position_value", 0.0), "realized_pnl_cum": getattr(pf, "realized_pnl_cum", 0.0), "unrealized_pnl": getattr(pf, "unrealized_pnl", 0.0) } write_equity(session_db_path, run_id, bar_close, eq) trades_csv = os.path.join(args.results_dir, "trades.csv") summary_csv = os.path.join(args.results_dir, "summary.csv") pf.save_trades(trades_csv) pf.save_summary(summary_csv) print(f"[paper-api] bar {bar_close.isoformat()} processed: positions={len(pf.positions)}") if iters_left is not None: iters_left -= 1 if iters_left <= 0: break time.sleep(args.poll_sec) # ---------- LIVE runner (BingX via CCXT; simplified bracket flow) ---------- def qty_for_notional(mkt: dict, notional: float, price: float): min_qty = float(mkt.get("limits", {}).get("amount", {}).get("min") or 0.0) step = float(mkt.get("precision", {}).get("amount") or 0.0) min_notional_req = float(mkt.get("limits", {}).get("cost", {}).get("min") or 0.0) if step and step > 0: qty = max(min_qty, math.floor(notional / max(price, 1e-9) / step) * step) else: qty = max(min_qty, notional / max(price, 1e-9)) return qty, min_notional_req, step, min_qty def place_open_long(fetcher: CCXTFetcher, sym: str, notional: float, price: float, position_mode: str): ccxt_sym = fetcher.resolve_symbol(sym) mkt = fetcher.markets.get(ccxt_sym, {}) qty, min_notional_req, step, min_qty = qty_for_notional(mkt, notional, price) if min_notional_req > notional + 1e-9: return {"ok": False, "skip_reason": f"min_notional {min_notional_req:.6g} > {notional:.6g}", "qty_rules": {"min_qty": min_qty, "step": step}} params = {"reduceOnly": False} if position_mode == "hedge": params["positionSide"] = "LONG" try: od = fetcher.ex.create_order(ccxt_sym, "market", "buy", qty, None, params) sleep_ms(RATE_MS) return {"ok": True, "order": od, "qty": qty, "params": params} except Exception as e: msg = str(e) if "Min amount" in msg and step > 0: try: qty = max(min_qty, qty + step) od = fetcher.ex.create_order(ccxt_sym, "market", "buy", qty, None, params) sleep_ms(RATE_MS) return {"ok": True, "order": od, "qty": qty, "params": params, "retry": True} except Exception as e2: return {"ok": False, "error": str(e2), "qty": qty, "params": params} return {"ok": False, "error": msg, "qty": qty, "params": params} def place_reduce_only(fetcher: CCXTFetcher, sym: str, side_close: str, qty: float, position_mode: str): ccxt_sym = fetcher.resolve_symbol(sym) params = {"reduceOnly": True} if position_mode == "hedge": params["positionSide"] = "LONG" try: od = fetcher.ex.create_order(ccxt_sym, "market", side_close, qty, None, params) sleep_ms(RATE_MS) return od except Exception as e: print(f"[live reduceOnly] {sym}: {e}", file=sys.stderr) return None def run_live(cfg: dict, args): assert ccxt is not None, "ccxt required for LIVE mode" strat_path = cfg.get("strategy_class", "strategies.cross_sectional_rs.CrossSectionalRS") strat = load_strategy(strat_path, cfg) # Auth api_k = os.environ.get("BINGX_KEY", "") api_s = os.environ.get("BINGX_SECRET", "") print(f'[LIVE API] key="{mask(api_k)}", secret="{mask(api_s)}"') fetcher = CCXTFetcher(exchange=args.exchange, symbol_format=args.symbol_format, debug=args.debug) top_n = int(cfg.get("top_n", 4)) notional = float(cfg.get("notional", 2.2)) position_mode = cfg.get("position_mode", "hedge") os.makedirs(args.results_dir, exist_ok=True) # LIVE session DB & cache setup session_db_path = getattr(args, "session_db", None) or getattr(args, "orders_db", None) or os.path.join(args.results_dir, "session.sqlite") cache_out_path = getattr(args, "cache_out", None) or os.path.join(args.results_dir, "combined_cache_session.db") try: ensure_orders_db(session_db_path) except Exception: pass run_id = datetime.utcnow().strftime("LIVE_%Y%m%d_%H%M%S") try: write_config_snapshot(session_db_path, run_id, cfg) except Exception: pass state_path = os.path.join(args.results_dir, "live_state.json") state = {} if os.path.exists(state_path): try: state = json.load(open(state_path, "r", encoding="utf-8")) except Exception: state = {} state.setdefault("positions", {}) last_bar_ts = None print(f"[live] polling every {args.poll_sec}s; entries at bar close +{args.bar_delay_sec}s") while True: now = datetime.utcnow().replace(tzinfo=timezone.utc) bar_close = now.replace(minute=0, second=0, microsecond=0) if (last_bar_ts is None or bar_close > last_bar_ts) and (now - bar_close).total_seconds() >= args.bar_delay_sec: last_bar_ts = bar_close # md universe = sorted(set(fetcher.by_base.values())) md = {} for ccxt_sym in universe: feats = {} if args.hour_cache == "load": feats = read_hour_cache_row(cache_out_path, ccxt_sym, bar_close) if not feats: df = fetcher.fetch_ohlcv_df(ccxt_sym, timeframe=cfg.get("timeframe", "1h"), limit=max(60, args.limit_klines)) if df is None or len(df) < 30: continue feats = compute_feats(df).iloc[-1].to_dict() # save to cache if asked if args.hour_cache in ("save", "load"): try: cache_out_upsert(cache_out_path, ccxt_sym, df) except Exception: pass md[ccxt_sym] = feats print(".", end="", flush=True) # pipeline uni = strat.universe(bar_close, md) ranked = strat.rank(bar_close, md, uni)[:top_n] opened = 0 selected_syms = list(ranked) write_decisions(session_db_path, run_id, bar_close, ranked, selected_syms) # simple equity snapshot (positions unrealized not tracked here) write_equity(session_db_path, run_id, bar_close, { "equity": 0.0, "cash": 0.0, "position_value": 0.0, "realized_pnl_cum": 0.0, "unrealized_pnl": 0.0 }) for sym in ranked: row = md.get(sym) if row is None: continue sig = strat.entry_signal(bar_close, sym, row, ctx={}) if sig is None or sig.side != "LONG": continue entry_px = fetcher.fetch_ticker_price(sym) or float(row.get("close") or 0.0) if not entry_px: continue res = place_open_long(fetcher, sym, notional, entry_px, position_mode) if not res.get("ok"): print(f"[open FAIL] {sym}: {res}", file=sys.stderr) continue qty = float(res["qty"]) # log order to DB and print try: insert_order_row(session_db_path, { "order_id": f"LIVE-{uuid.uuid4()}", "ts_utc": datetime.utcnow().isoformat(), "bar_time_utc": bar_close.isoformat(), "mode": "live", "symbol": sym, "side": "buy", "type": "market", "price": float(entry_px), "qty": float(qty), "status": "filled", "reason": "entry", "run_id": run_id, "extra": json.dumps({"notional": notional}) }) except Exception: pass print(f"[open OK] {sym} qty={qty:.6g} px={entry_px}", flush=True) state["positions"][sym] = {"entry": entry_px, "qty": qty, "ts": bar_close.isoformat()} json.dump(state, open(state_path, "w", encoding="utf-8"), indent=2) opened += 1 print(f"[live] opened={opened} at {bar_close.isoformat()}") time.sleep(args.poll_sec) # ---------- BACKTEST delegator ---------- def run_backtest(cfg_path: str, limit_bars: Optional[int] = None): bt_entry = os.path.join(os.getcwd(), "backtester_core.py") if not os.path.exists(bt_entry): raise SystemExit("backtester_core.py not found. Run from the backtester repo root.") cmd = [sys.executable, bt_entry, "--cfg", cfg_path] if limit_bars: cmd += ["--limit-bars", str(int(limit_bars))] print("[backtest] exec:", " ".join(cmd)) os.execvp(sys.executable, cmd) # ---------- CLI ---------- def main(): ap = argparse.ArgumentParser() ap.add_argument("--mode", choices=["backtest", "paper", "live"], required=True) ap.add_argument("--paper-source", choices=["db", "api"], default="db") ap.add_argument("--orders-db", default="") ap.add_argument("--dry-run", action="store_true") ap.add_argument("--iterations", type=int, default=1) ap.add_argument("--cfg", required=True, help="YAML/JSON config with strategy_class and params") ap.add_argument("--db", help="Cache DB path for PAPER (db)") ap.add_argument("--results-dir", default="runner_results") ap.add_argument("--limit-bars", type=int, default=0) # Session & cache-out ap.add_argument("--session-db", default="") ap.add_argument("--cache-out", default="") ap.add_argument("--hour-cache", choices=["off","save","load"], default="off", help="Current-hour cache: save (write) or load (read) features from cache_out for live speed-up") # LIVE / PAPER-API params ap.add_argument("--env-file", default="") ap.add_argument("--exchange", default="bingx") ap.add_argument("--symbol-format", default="usdtm") ap.add_argument("--poll-sec", type=int, default=15) ap.add_argument("--bar-delay-sec", type=int, default=10) ap.add_argument("--limit_klines", type=int, default=180) ap.add_argument("--debug", action="store_true") args = ap.parse_args() cfg = load_yaml_or_json(args.cfg) if args.mode == "backtest": return run_backtest(args.cfg, args.limit_bars if args.limit_bars > 0 else None) if args.mode == "paper": if args.paper_source == "api": return run_paper_api(cfg, args) else: if not args.db: raise SystemExit("--db is required for PAPER mode when --paper-source=db") return run_paper(args.db, cfg, args.results_dir, args.limit_bars if args.limit_bars > 0 else None) # LIVE if args.env_file and os.path.exists(args.env_file): for line in open(args.env_file, "r", encoding="utf-8").read().splitlines(): if "=" in line and not line.strip().startswith("#"): k, v = line.split("=", 1) os.environ[k.strip()] = v.strip() return run_live(cfg, args) if __name__ == "__main__": main()