#!/usr/bin/env python3 from __future__ import annotations """ Evidence-driven CL LP algorithm backtester. This is intentionally not a lower/upper grid tuner. It evaluates fixed policy families inspired by the evidence checklist in "DEX-алгоритми з реальною доказовістю.pdf": - realized volatility and drift, not chart indicators - swap-flow/fee density gate - asymmetric inventory shield during directional regimes - explicit capacity share metrics - low rebalance frequency to avoid rebalance drag """ import argparse import csv import json import math from dataclasses import dataclass from datetime import datetime, timezone from pathlib import Path from typing import Any, Dict, List, Tuple import numpy as np from cl_fee_replay_fast_npz_v3 import ( finish_summary, filter_time, liquidity_for_capital, liquidity_share, load_npz, parse_float_list, parse_iso_ts, score_row, sqrt_raw_token1_per_token0_from_price, Strategy, unit_value_one_from_sqrt, unit_value_curve_from_sqrt, ) SCRIPT_VERSION = "cl_adaptive_algorithmic_lp_v1_2026_05_05" @dataclass(frozen=True) class PolicyDecision: deploy: bool lower_pct: float upper_pct: float reason: str @dataclass(frozen=True) class AlgoPolicy: name: str rebalance_hours: float lookback_hours: float gas_usd: float = 0.0 swap_cost_bps: float = 0.0 def parse_fee_specs(spec: str) -> List[Tuple[str, float]]: out: List[Tuple[str, float]] = [] for item in str(spec).split(","): item = item.strip() if not item: continue if ":" not in item: raise ValueError(f"Bad fee spec: {item}; expected name:rate") name, rate = item.split(":", 1) out.append((name, float(rate))) return out def ts_to_iso(ts: int) -> str: return datetime.fromtimestamp(int(ts), tz=timezone.utc).isoformat().replace("+00:00", "Z") def robust_pct(x: np.ndarray, q: float, default: float) -> float: if len(x) == 0: return default v = float(np.nanpercentile(x, q)) if not np.isfinite(v): return default return v def window_features(price: np.ndarray, input_usd: np.ndarray, ts: np.ndarray, end_idx: int, lookback_hours: float) -> Dict[str, float]: end_ts = int(ts[end_idx]) start_ts = end_ts - int(lookback_hours * 3600) lo = int(np.searchsorted(ts, start_ts, side="left")) lo = min(max(0, lo), end_idx) p = price[lo : end_idx + 1] flow = input_usd[lo : end_idx + 1] t = ts[lo : end_idx + 1] if len(p) < 3: return { "drift_pct": 0.0, "path_pct": 0.0, "vol_pct": 0.0, "move_p95_pct": 1.0, "flow_usd_per_hour": 0.0, "events_per_hour": 0.0, "toxicity": 0.0, } lr = np.diff(np.log(np.maximum(p, 1e-300))) abs_lr = np.abs(lr) drift_pct = float((p[-1] / p[0] - 1.0) * 100.0) path_pct = float(np.sum(abs_lr) * 100.0) vol_pct = float(np.std(lr) * math.sqrt(max(1, len(lr))) * 100.0) move_p95_pct = robust_pct(abs_lr * 100.0, 95, 1.0) hours = max(1.0 / 60.0, (int(t[-1]) - int(t[0])) / 3600.0) toxicity = abs(drift_pct) / max(path_pct, 1e-9) return { "drift_pct": drift_pct, "path_pct": path_pct, "vol_pct": vol_pct, "move_p95_pct": move_p95_pct, "flow_usd_per_hour": float(np.sum(flow) / hours), "events_per_hour": float(len(p) / hours), "toxicity": float(min(1.0, toxicity)), } def enrich_multitimeframe_features(feat: Dict[str, float], price: np.ndarray, input_usd: np.ndarray, ts: np.ndarray, end_idx: int, policy: AlgoPolicy) -> Dict[str, float]: if policy.name != "pdf_ensemble_selector_v8": return feat out = dict(feat) for hours in (6.0, 12.0, 168.0): sub = window_features(price, input_usd, ts, end_idx, hours) prefix = f"s{int(hours)}_" for key, value in sub.items(): out[prefix + key] = value return out def decide(policy: AlgoPolicy, feat: Dict[str, float], fee_rate: float, capital: float) -> PolicyDecision: drift = feat["drift_pct"] path = max(feat["path_pct"], 1e-9) vol = feat["vol_pct"] flow = feat["flow_usd_per_hour"] events_h = feat["events_per_hour"] toxicity = feat["toxicity"] move_p95 = max(0.01, feat["move_p95_pct"]) fee_budget_pct_per_day = fee_rate * flow * 24.0 / max(capital, 1e-9) * 100.0 trend = drift / path vol_lvr_budget = 100.0 * (vol / 100.0) ** 2 if policy.name in {"pdf_ensemble_selector_v1", "pdf_ensemble_selector_v2", "pdf_ensemble_selector_v3", "pdf_ensemble_selector_v4", "pdf_ensemble_selector_v5", "pdf_ensemble_selector_v6", "pdf_ensemble_selector_v7", "pdf_ensemble_selector_v8"}: if events_h < 1.0 or fee_budget_pct_per_day < 0.03: return PolicyDecision(False, 0.0, 0.0, "ensemble-idle-thin-flow") stress_down = drift < -8.0 or (trend < -0.08 and vol > 20.0) favorable_fee_vol = fee_budget_pct_per_day >= max(0.15, 0.55 * vol_lvr_budget) clean_flow = toxicity < 0.35 or fee_budget_pct_per_day >= 1.5 * max(0.25, vol) if policy.name == "pdf_ensemble_selector_v1": # Aggressive: range-order whenever directional evidence is clean. if abs(trend) > 0.04 and abs(drift) > 1.0 and clean_flow: if drift > 0: lower = float(np.clip(max(0.05, move_p95 * 0.20), 0.05, 1.0)) upper = float(np.clip(max(0.25, move_p95 * 1.2), 0.25, 5.0)) return PolicyDecision(True, lower, upper, "ensemble-range-up") lower = float(np.clip(max(0.25, move_p95 * 1.2), 0.25, 5.0)) upper = float(np.clip(max(0.05, move_p95 * 0.20), 0.05, 1.0)) return PolicyDecision(True, lower, upper, "ensemble-range-down") if policy.name == "pdf_ensemble_selector_v2": # Defensive: stress downtrends use the slow asymmetric harvester; # range-order is reserved for cleaner uptrends. if stress_down: lower = float(np.clip(max(85.0, abs(drift) * 2.5, vol * 4.0), 85.0, 99.0)) upper = float(np.clip(max(0.01, move_p95 * 0.10), 0.01, 0.20)) return PolicyDecision(True, lower, upper, "ensemble-downtrend-harvester") if trend > 0.04 and drift > 1.0 and clean_flow: lower = float(np.clip(max(0.05, move_p95 * 0.20), 0.05, 1.0)) upper = float(np.clip(max(0.25, move_p95 * 1.2), 0.25, 5.0)) return PolicyDecision(True, lower, upper, "ensemble-range-up") if policy.name == "pdf_ensemble_selector_v3": # Balanced: hard downtrends are harvested, normal directional flow # uses range-order, otherwise require fee/vol evidence. if stress_down and (toxicity > 0.20 or not favorable_fee_vol): lower = float(np.clip(max(85.0, abs(drift) * 2.5, vol * 4.0), 85.0, 99.0)) upper = float(np.clip(max(0.01, move_p95 * 0.10), 0.01, 0.20)) return PolicyDecision(True, lower, upper, "ensemble-downtrend-harvester") if abs(trend) > 0.04 and abs(drift) > 1.0 and clean_flow: if drift > 0: lower = float(np.clip(max(0.05, move_p95 * 0.20), 0.05, 1.0)) upper = float(np.clip(max(0.25, move_p95 * 1.2), 0.25, 5.0)) return PolicyDecision(True, lower, upper, "ensemble-range-up") lower = float(np.clip(max(0.25, move_p95 * 1.2), 0.25, 5.0)) upper = float(np.clip(max(0.05, move_p95 * 0.20), 0.05, 1.0)) return PolicyDecision(True, lower, upper, "ensemble-range-down") if policy.name == "pdf_ensemble_selector_v4": # Guarded selector: the key failure mode is entering range-order # during high realized-vol windows. Use range-order only when the # trend is directional but local volatility remains moderate. if stress_down: lower = float(np.clip(max(85.0, abs(drift) * 2.5, vol * 4.0), 85.0, 99.0)) upper = float(np.clip(max(0.01, move_p95 * 0.10), 0.01, 0.20)) return PolicyDecision(True, lower, upper, "ensemble-downtrend-harvester") if abs(trend) > 0.04 and abs(drift) > 1.0 and clean_flow and vol < 18.0: if drift > 0: lower = float(np.clip(max(0.05, move_p95 * 0.20), 0.05, 1.0)) upper = float(np.clip(max(0.25, move_p95 * 1.2), 0.25, 5.0)) return PolicyDecision(True, lower, upper, "ensemble-range-up") lower = float(np.clip(max(0.25, move_p95 * 1.2), 0.25, 5.0)) upper = float(np.clip(max(0.05, move_p95 * 0.20), 0.05, 1.0)) return PolicyDecision(True, lower, upper, "ensemble-range-down") if favorable_fee_vol and toxicity < 0.60: width = float(np.clip(max(3.0, move_p95 * 10.0, vol * 1.1), 3.0, 75.0)) return PolicyDecision(True, width, width, "ensemble-vol-tox-gated") if policy.name == "pdf_ensemble_selector_v5": # Final defensive selector: do not use range-order on downtrends. # The focused test showed that range-down reacts too late on scaled # CHECK stress. Downtrends are harvested asymmetrically instead. if drift < -1.0 or trend < -0.04: lower = float(np.clip(max(85.0, abs(drift) * 2.5, vol * 4.0), 85.0, 99.0)) upper = float(np.clip(max(0.01, move_p95 * 0.10), 0.01, 0.20)) return PolicyDecision(True, lower, upper, "ensemble-downtrend-harvester") if trend > 0.04 and drift > 1.0 and clean_flow and vol < 18.0: lower = float(np.clip(max(0.05, move_p95 * 0.20), 0.05, 1.0)) upper = float(np.clip(max(0.25, move_p95 * 1.2), 0.25, 5.0)) return PolicyDecision(True, lower, upper, "ensemble-range-up") if favorable_fee_vol and toxicity < 0.60: width = float(np.clip(max(3.0, move_p95 * 10.0, vol * 1.1), 3.0, 75.0)) return PolicyDecision(True, width, width, "ensemble-vol-tox-gated") if policy.name == "pdf_ensemble_selector_v6": # Live filter: only LP into downtrends when observed fee density # is clearly paying for volatility/LVR and flow is not thin. This # intentionally gives up some upside to avoid scaled-CHECK style # drawdown where the correct action is mostly to stay in USDC. fee_edge = fee_budget_pct_per_day / max(0.20, vol_lvr_budget) dense_flow = events_h >= 2.0 and fee_budget_pct_per_day >= 0.08 if drift < -1.0 or trend < -0.04: if dense_flow and fee_edge >= 1.15 and toxicity < 0.55: lower = float(np.clip(max(85.0, abs(drift) * 2.5, vol * 4.0), 85.0, 99.0)) upper = float(np.clip(max(0.01, move_p95 * 0.10), 0.01, 0.20)) return PolicyDecision(True, lower, upper, "ensemble-downtrend-fee-edge") return PolicyDecision(False, 0.0, 0.0, "ensemble-idle-downtrend-low-edge") if trend > 0.04 and drift > 1.0 and clean_flow and vol < 18.0 and dense_flow: lower = float(np.clip(max(0.05, move_p95 * 0.20), 0.05, 1.0)) upper = float(np.clip(max(0.25, move_p95 * 1.2), 0.25, 5.0)) return PolicyDecision(True, lower, upper, "ensemble-range-up") if favorable_fee_vol and toxicity < 0.55 and dense_flow: width = float(np.clip(max(3.0, move_p95 * 10.0, vol * 1.1), 3.0, 75.0)) return PolicyDecision(True, width, width, "ensemble-vol-tox-gated") if policy.name == "pdf_ensemble_selector_v7": # Live-first selector. No symmetric LP fallback: focused tests show # that generic vol/tox LP can be the main loss source on CHECK-like # regimes. Trade only two evidence regimes, otherwise stay in USDC. fee_edge = fee_budget_pct_per_day / max(0.20, vol_lvr_budget) dense_flow = events_h >= 2.0 and fee_budget_pct_per_day >= 0.08 if drift < -1.0 or trend < -0.04: if dense_flow and fee_edge >= 1.15 and toxicity < 0.55: lower = float(np.clip(max(85.0, abs(drift) * 2.5, vol * 4.0), 85.0, 99.0)) upper = float(np.clip(max(0.01, move_p95 * 0.10), 0.01, 0.20)) return PolicyDecision(True, lower, upper, "ensemble-downtrend-fee-edge") return PolicyDecision(False, 0.0, 0.0, "ensemble-idle-downtrend-low-edge") if trend > 0.04 and drift > 1.0 and clean_flow and vol < 18.0 and dense_flow: lower = float(np.clip(max(0.05, move_p95 * 0.20), 0.05, 1.0)) upper = float(np.clip(max(0.25, move_p95 * 1.2), 0.25, 5.0)) return PolicyDecision(True, lower, upper, "ensemble-range-up") return PolicyDecision(False, 0.0, 0.0, "ensemble-idle-no-evidence") if policy.name == "pdf_ensemble_selector_v8": # Multi-timeframe live selector. Short windows catch range-order # opportunities; long windows veto counter-trend entries and choose # slow defensive harvesting. The objective is regime quality, not # maximum deployment time. s_drift = feat.get("s6_drift_pct", drift) s_path = max(feat.get("s6_path_pct", path), 1e-9) s_trend = s_drift / s_path s_vol = feat.get("s6_vol_pct", vol) s_move = max(0.01, feat.get("s6_move_p95_pct", move_p95)) s_events = feat.get("s6_events_per_hour", events_h) s_toxicity = feat.get("s6_toxicity", toxicity) s_fee = fee_rate * feat.get("s6_flow_usd_per_hour", flow) * 24.0 / max(capital, 1e-9) * 100.0 s_lvr = 100.0 * (s_vol / 100.0) ** 2 s_edge = s_fee / max(0.20, s_lvr) m_drift = feat.get("s12_drift_pct", drift) m_path = max(feat.get("s12_path_pct", path), 1e-9) m_trend = m_drift / m_path m_vol = feat.get("s12_vol_pct", vol) m_move = max(0.01, feat.get("s12_move_p95_pct", move_p95)) m_events = feat.get("s12_events_per_hour", events_h) m_toxicity = feat.get("s12_toxicity", toxicity) m_fee = fee_rate * feat.get("s12_flow_usd_per_hour", flow) * 24.0 / max(capital, 1e-9) * 100.0 m_lvr = 100.0 * (m_vol / 100.0) ** 2 m_edge = m_fee / max(0.20, m_lvr) l_drift = feat.get("s168_drift_pct", drift) l_path = max(feat.get("s168_path_pct", path), 1e-9) l_trend = l_drift / l_path l_vol = feat.get("s168_vol_pct", vol) l_move = max(0.01, feat.get("s168_move_p95_pct", move_p95)) l_events = feat.get("s168_events_per_hour", events_h) l_toxicity = feat.get("s168_toxicity", toxicity) l_fee = fee_rate * feat.get("s168_flow_usd_per_hour", flow) * 24.0 / max(capital, 1e-9) * 100.0 l_lvr = 100.0 * (l_vol / 100.0) ** 2 l_edge = l_fee / max(0.20, l_lvr) if l_events < 0.5 and m_events < 1.0 and s_events < 1.0: return PolicyDecision(False, 0.0, 0.0, "ensemble-idle-thin-flow") long_down = l_drift < -1.0 or l_trend < -0.04 mid_down = m_drift < -1.0 or m_trend < -0.04 if long_down and l_events >= 1.0 and l_edge >= 1.05 and l_toxicity < 0.60: lower = float(np.clip(max(85.0, abs(l_drift) * 2.5, l_vol * 4.0), 85.0, 99.0)) upper = float(np.clip(max(0.01, l_move * 0.10), 0.01, 0.20)) return PolicyDecision(True, lower, upper, "ensemble-long-downtrend-fee-edge") if mid_down and m_events >= 2.0 and m_edge >= 1.15 and m_toxicity < 0.55: lower = float(np.clip(max(85.0, abs(m_drift) * 2.5, m_vol * 4.0), 85.0, 99.0)) upper = float(np.clip(max(0.01, m_move * 0.10), 0.01, 0.20)) return PolicyDecision(True, lower, upper, "ensemble-mid-downtrend-fee-edge") long_veto = l_drift < -6.0 or (l_trend < -0.08 and l_vol > 18.0) short_clean = s_toxicity < 0.45 or s_edge >= 1.5 if not long_veto and s_trend > 0.05 and s_drift > 1.0 and s_vol < 18.0 and s_events >= 2.0 and short_clean: lower = float(np.clip(max(0.05, s_move * 0.20), 0.05, 1.0)) upper = float(np.clip(max(0.25, s_move * 1.2), 0.25, 5.0)) return PolicyDecision(True, lower, upper, "ensemble-mtf-range-up") return PolicyDecision(False, 0.0, 0.0, "ensemble-idle-no-evidence") if favorable_fee_vol and toxicity < 0.55: width = float(np.clip(max(3.0, move_p95 * 10.0, vol * 1.0), 3.0, 70.0)) return PolicyDecision(True, width, width, "ensemble-vol-tox-gated") return PolicyDecision(False, 0.0, 0.0, "ensemble-idle-no-edge") if policy.name == "pdf_vol_gated_lp": # PDF principle: quote only when realized fee density plausibly pays for # volatility/LVR. This is a policy rule, not a lower/upper tuner. if events_h < 1.0 or fee_budget_pct_per_day < max(0.20, 0.75 * vol_lvr_budget): return PolicyDecision(False, 0.0, 0.0, "idle-fee-below-realized-vol") if toxicity > 0.72 and fee_budget_pct_per_day < 2.0 * max(0.20, vol_lvr_budget): return PolicyDecision(False, 0.0, 0.0, "idle-toxic-flow") width = float(np.clip(max(2.0, move_p95 * 8.0, vol * 0.9), 2.0, 70.0)) return PolicyDecision(True, width, width, "pdf-vol-gated") if policy.name == "pdf_range_order": # Synthetic limit order from the PDF: a narrow range placed in the # direction of demonstrated flow. The runner closes/restarts at boundary # crossings in run_policy(), unlike periodic range tuning. if events_h < 1.0 or fee_budget_pct_per_day < 0.10: return PolicyDecision(False, 0.0, 0.0, "idle-thin-flow") if trend > 0.04 and drift > 1.0: lower = float(np.clip(max(0.05, move_p95 * 0.20), 0.05, 1.0)) upper = float(np.clip(max(0.25, move_p95 * 1.2), 0.25, 5.0)) return PolicyDecision(True, lower, upper, "pdf-range-order-up") if trend < -0.04 and drift < -1.0: lower = float(np.clip(max(0.25, move_p95 * 1.2), 0.25, 5.0)) upper = float(np.clip(max(0.05, move_p95 * 0.20), 0.05, 1.0)) return PolicyDecision(True, lower, upper, "pdf-range-order-down") return PolicyDecision(False, 0.0, 0.0, "idle-no-trend") if policy.name == "pdf_toxicity_gated_lp": # Avoid directional/informed flow unless fee density is exceptional. if events_h < 1.0 or fee_budget_pct_per_day < 0.10: return PolicyDecision(False, 0.0, 0.0, "idle-thin-flow") if toxicity > 0.45 and fee_budget_pct_per_day < 1.5 * max(0.25, vol): return PolicyDecision(False, 0.0, 0.0, "idle-toxic-flow") width = float(np.clip(max(3.0, move_p95 * 10.0, vol * 1.1), 3.0, 75.0)) return PolicyDecision(True, width, width, "pdf-toxicity-gated") if policy.name == "passive_wide_evidence": return PolicyDecision(True, 80.0, 80.0, "passive-wide") if policy.name == "fee_vol_gate_symmetric": # Only quote when event flow is enough to plausibly compensate LVR. if events_h < 2.0 or fee_budget_pct_per_day < 0.25 * max(1.0, vol): return PolicyDecision(False, 0.0, 0.0, "idle-low-fee-vol") width = float(np.clip(max(2.0, move_p95 * 8.0, vol * 1.2), 2.0, 80.0)) return PolicyDecision(True, width, width, "symmetric-fee-vol") if policy.name == "adaptive_inventory_shield": if events_h < 1.0 or fee_budget_pct_per_day < 0.10: return PolicyDecision(False, 0.0, 0.0, "idle-thin-flow") if trend > 0.05: lower = float(np.clip(max(3.0, move_p95 * 6.0), 3.0, 20.0)) upper = float(np.clip(max(20.0, abs(drift) * 2.5, vol * 4.0), 20.0, 150.0)) return PolicyDecision(True, lower, upper, "uptrend-inventory-runner") if trend < -0.05: lower = float(np.clip(max(45.0, abs(drift) * 2.0, vol * 3.0), 45.0, 95.0)) upper = float(np.clip(max(0.01, move_p95 * 0.25), 0.01, 2.0)) return PolicyDecision(True, lower, upper, "downtrend-inventory-shield") width = float(np.clip(max(5.0, move_p95 * 10.0, vol * 1.5), 5.0, 60.0)) return PolicyDecision(True, width, width, "range-flow") if policy.name == "momentum_range_order": if events_h < 1.0 or (toxicity < 0.05 and abs(drift) < 3.0): return PolicyDecision(False, 0.0, 0.0, "idle-no-trend") if trend > 0.0: return PolicyDecision(True, 2.0, float(np.clip(max(30.0, abs(drift) * 3.0), 30.0, 150.0)), "range-order-up") return PolicyDecision(True, float(np.clip(max(60.0, abs(drift) * 2.5), 60.0, 98.0)), 0.05, "range-order-down") if policy.name == "lvr_resistant_slow": if events_h < 1.0 or fee_budget_pct_per_day < 0.15 * max(1.0, vol) or toxicity > 0.65: return PolicyDecision(False, 0.0, 0.0, "idle-toxic-or-low-fee") width = float(np.clip(max(10.0, vol * 2.0, move_p95 * 12.0), 10.0, 95.0)) return PolicyDecision(True, width, width, "slow-lvr-resistant") if policy.name == "downtrend_fee_harvester": if events_h < 1.0 or fee_budget_pct_per_day < 0.05: return PolicyDecision(False, 0.0, 0.0, "idle-thin-flow") if drift < -1.0 or trend < -0.05: lower = float(np.clip(max(85.0, abs(drift) * 2.5, vol * 4.0), 85.0, 99.0)) upper = float(np.clip(max(0.01, move_p95 * 0.10), 0.01, 0.20)) return PolicyDecision(True, lower, upper, "ultra-defensive-downtrend") if drift > 2.0 and trend > 0.08: return PolicyDecision(True, 5.0, float(np.clip(max(40.0, drift * 2.5), 40.0, 150.0)), "uptrend-runner") return PolicyDecision(False, 0.0, 0.0, "idle-no-edge") if policy.name in {"delta_hedged_fee_lp", "inverse_delta_hedged_fee_lp"}: # A distinct algorithmic family: quote CL only when fees/flow exist, but # neutralize first-order inventory exposure with an external hedge. if events_h < 1.0 or fee_budget_pct_per_day < 0.03: return PolicyDecision(False, 0.0, 0.0, "idle-thin-flow") if trend < -0.05: return PolicyDecision(True, 90.0, 0.25, "hedged-downtrend-fees") if trend > 0.08: return PolicyDecision(True, 8.0, 80.0, "hedged-uptrend-fees") width = float(np.clip(max(8.0, move_p95 * 10.0, vol * 1.2), 8.0, 60.0)) return PolicyDecision(True, width, width, "hedged-neutral-fees") raise ValueError(f"Unknown policy: {policy.name}") def lp_value_at_price(price_usd: float, lower: float, upper: float, liq: float, dec0: int, dec1: int) -> float: sqrt_price = sqrt_raw_token1_per_token0_from_price(np.array([price_usd], dtype=np.float64), dec0, dec1)[0] unit = unit_value_one_from_sqrt(float(sqrt_price), price_usd, lower, upper, dec0, dec1) return float(max(0.0, liq * unit)) def lp_delta_usd_per_price(price_usd: float, lower: float, upper: float, liq: float, dec0: int, dec1: int) -> float: eps = max(1e-8, abs(price_usd) * 1e-4) p_hi = price_usd + eps p_lo = max(1e-12, price_usd - eps) v_hi = lp_value_at_price(p_hi, lower, upper, liq, dec0, dec1) v_lo = lp_value_at_price(p_lo, lower, upper, liq, dec0, dec1) return (v_hi - v_lo) / max(1e-12, p_hi - p_lo) def run_policy( price: np.ndarray, input_usd: np.ndarray, active_liq: np.ndarray, ts: np.ndarray, dec0: int, dec1: int, initial_capital: float, total_capital_usd: float, fee_rate: float, policy: AlgoPolicy, ) -> Dict[str, Any]: n = len(price) sqrt_price = sqrt_raw_token1_per_token0_from_price(price, dec0, dec1) interval = max(1, int(policy.rebalance_hours * 3600)) cash = float(initial_capital) our_liq = 0.0 lower = upper = 0.0 deployed = False fees_uncollected = 0.0 fees_earned_total = 0.0 fees_reinvested = 0.0 hedge_pnl_pending = 0.0 hedge_pnl_total = 0.0 costs_cum = 0.0 rebalances = 0 decisions = 0 idle_decisions = 0 reason_counts: Dict[str, int] = {} equity_arr = np.empty(n, dtype=np.float64) pos_arr = np.zeros(n, dtype=np.float64) in_arr = np.zeros(n, dtype=np.int8) share_arr = np.zeros(n, dtype=np.float64) hodl50 = initial_capital / 2.0 + (initial_capital / 2.0 / float(price[0])) * price active_reason = "" # Bootstrap decision: a live bot must either open a conservative first # virtual position or explicitly sit in cash before the first rebalance. first_feat = enrich_multitimeframe_features(window_features(price, input_usd, ts, 0, policy.lookback_hours), price, input_usd, ts, 0, policy) first_decision = decide(policy, first_feat, fee_rate, max(cash, 1e-9)) decisions += 1 reason_counts[first_decision.reason] = reason_counts.get(first_decision.reason, 0) + 1 if first_decision.deploy and cash > 0.0: p = float(price[0]) lower = p * (1.0 - first_decision.lower_pct / 100.0) upper = p * (1.0 + first_decision.upper_pct / 100.0) our_liq = liquidity_for_capital(cash, p, lower, upper, dec0, dec1) deployed = our_liq > 0.0 active_reason = first_decision.reason if deployed else "" else: idle_decisions += 1 idx = 0 while idx < n: next_ts = int(ts[idx]) + interval next_idx = int(np.searchsorted(ts, next_ts, side="left")) next_idx = min(max(idx + 1, next_idx), n) event_driven_exit = policy.name == "pdf_range_order" or active_reason.startswith("ensemble-range-") if deployed and our_liq > 0.0 and event_driven_exit: probe = price[idx:next_idx] exit_mask = np.zeros(len(probe), dtype=bool) if active_reason.endswith("-up"): exit_mask = probe >= upper elif active_reason.endswith("-down"): exit_mask = probe <= lower if exit_mask.any(): rel = int(np.argmax(exit_mask)) next_idx = max(idx + 1, idx + rel + 1) if deployed and our_liq > 0.0: sl = slice(idx, next_idx) pseg = price[sl] in_range = (pseg >= lower) & (pseg <= upper) share = liquidity_share(our_liq, active_liq[sl]) fee_events = np.zeros_like(pseg, dtype=np.float64) fee_events[in_range] = input_usd[sl][in_range] * fee_rate * share[in_range] fees_cum = np.cumsum(fee_events) unit = unit_value_curve_from_sqrt(sqrt_price[sl], pseg, lower, upper, dec0, dec1) pos_value = our_liq * unit hedge_pnl = 0.0 hedge_curve = 0.0 if policy.name in {"delta_hedged_fee_lp", "inverse_delta_hedged_fee_lp"}: p0 = float(price[idx]) hedge_sign = 1.0 if policy.name == "inverse_delta_hedged_fee_lp" else -1.0 hedge_units = hedge_sign * lp_delta_usd_per_price(p0, lower, upper, our_liq, dec0, dec1) hedge_curve = hedge_units * (pseg - p0) hedge_pnl = float(hedge_curve[-1]) if len(pseg) else 0.0 equity_arr[sl] = pos_value + fees_uncollected + fees_cum + hedge_pnl_pending + hedge_curve pos_arr[sl] = pos_value in_arr[sl] = in_range.astype(np.int8) share_arr[sl] = share segment_fees = float(fees_cum[-1]) if len(fees_cum) else 0.0 fees_uncollected += segment_fees fees_earned_total += segment_fees if policy.name in {"delta_hedged_fee_lp", "inverse_delta_hedged_fee_lp"}: hedge_pnl_pending += hedge_pnl hedge_pnl_total += hedge_pnl else: sl = slice(idx, next_idx) equity_arr[sl] = cash pos_arr[sl] = 0.0 in_arr[sl] = 0 share_arr[sl] = 0.0 idx = next_idx if idx >= n: break # Close old virtual position and decide the next one at the event boundary. if deployed and our_liq > 0.0: p = float(price[idx]) unit_now = unit_value_one_from_sqrt(float(sqrt_price[idx]), p, lower, upper, dec0, dec1) pos_val = our_liq * max(0.0, unit_now) cash = pos_val + fees_uncollected + hedge_pnl_pending fees_reinvested += fees_uncollected fees_uncollected = 0.0 hedge_pnl_pending = 0.0 cost = policy.gas_usd + cash * (policy.swap_cost_bps / 10000.0) costs_cum += cost cash = max(0.0, cash - cost) rebalances += 1 feat = enrich_multitimeframe_features(window_features(price, input_usd, ts, idx, policy.lookback_hours), price, input_usd, ts, idx, policy) decision = decide(policy, feat, fee_rate, max(cash, 1e-9)) decisions += 1 reason_counts[decision.reason] = reason_counts.get(decision.reason, 0) + 1 if not decision.deploy or cash <= 0.0: deployed = False our_liq = 0.0 lower = upper = 0.0 active_reason = "" idle_decisions += 1 continue p = float(price[idx]) lower = p * (1.0 - decision.lower_pct / 100.0) upper = p * (1.0 + decision.upper_pct / 100.0) our_liq = liquidity_for_capital(cash, p, lower, upper, dec0, dec1) deployed = our_liq > 0.0 active_reason = decision.reason if deployed else "" st = Strategy(policy.name, 0.0, 0.0, policy.rebalance_hours, policy.gas_usd, policy.swap_cost_bps, "adaptive") row = finish_summary( { "strategy": policy.name, "lower_pct": np.nan, "upper_pct": np.nan, "rebalance_hours": float(policy.rebalance_hours), "rebalance_mode": "adaptive", "capital_usd": float(initial_capital), "initial_capital_usd": float(initial_capital), }, equity_arr, pos_arr, fees_earned_total, fees_reinvested, fees_uncollected, costs_cum, in_arr, share_arr, hodl50, price, initial_capital, rebalances, total_capital_usd, ) row["decisions"] = int(decisions) row["idle_decisions"] = int(idle_decisions) row["idle_decision_pct"] = float(idle_decisions / decisions * 100.0) if decisions else 0.0 row["policy_reason_top"] = max(reason_counts.items(), key=lambda kv: kv[1])[0] if reason_counts else "" row["policy_reason_counts_json"] = json.dumps(reason_counts, sort_keys=True) row["hedge_pnl_total"] = float(hedge_pnl_total) row["hedge_pnl_pending"] = float(hedge_pnl_pending) return row def write_rows_csv(path: Path, rows: List[Dict[str, Any]]) -> None: if not rows: path.write_text("", encoding="utf-8") return fields: List[str] = [] for row in rows: for k in row: if k not in fields: fields.append(k) with path.open("w", newline="", encoding="utf-8") as f: w = csv.DictWriter(f, fieldnames=fields) w.writeheader() w.writerows(rows) def valid_capacity(row: Dict[str, Any], args: argparse.Namespace) -> bool: return ( float(row["avg_liquidity_share_pct_when_in_range"]) <= args.max_avg_liquidity_share_pct and float(row["p95_liquidity_share_pct_when_in_range"]) <= args.max_p95_liquidity_share_pct and float(row["p99_liquidity_share_pct_when_in_range"]) <= args.max_p99_liquidity_share_pct ) def make_policies(args: argparse.Namespace) -> List[AlgoPolicy]: names = [x.strip() for x in args.policies.split(",") if x.strip()] return [ AlgoPolicy(name=n, rebalance_hours=args.rebalance_hours, lookback_hours=args.lookback_hours, gas_usd=args.gas_usd, swap_cost_bps=args.swap_cost_bps) for n in names ] def main() -> None: ap = argparse.ArgumentParser() ap.add_argument("--npz", required=True) ap.add_argument("--out-dir", required=True) ap.add_argument("--fee-rates", default="metadata:0.003") ap.add_argument("--time-from", default="") ap.add_argument("--time-to", default="") ap.add_argument("--dec0", type=int, default=0) ap.add_argument("--dec1", type=int, default=0) ap.add_argument("--capital-grid", default="5,10,25,50,100,500,1000") ap.add_argument("--total-capital-usd", type=float, default=1000.0) ap.add_argument("--policies", default="passive_wide_evidence,fee_vol_gate_symmetric,adaptive_inventory_shield,momentum_range_order,lvr_resistant_slow,downtrend_fee_harvester,pdf_vol_gated_lp,pdf_range_order,pdf_toxicity_gated_lp,pdf_ensemble_selector_v1,pdf_ensemble_selector_v2,pdf_ensemble_selector_v3,pdf_ensemble_selector_v4,pdf_ensemble_selector_v5,pdf_ensemble_selector_v6,pdf_ensemble_selector_v7,pdf_ensemble_selector_v8,delta_hedged_fee_lp,inverse_delta_hedged_fee_lp") ap.add_argument("--rebalance-hours", type=float, default=168.0) ap.add_argument("--lookback-hours", type=float, default=72.0) ap.add_argument("--gas-usd", type=float, default=0.0) ap.add_argument("--swap-cost-bps", type=float, default=0.0) ap.add_argument("--target-mdd-pct", type=float, default=25.0) ap.add_argument("--max-avg-liquidity-share-pct", type=float, default=3.0) ap.add_argument("--max-p95-liquidity-share-pct", type=float, default=5.0) ap.add_argument("--max-p99-liquidity-share-pct", type=float, default=10.0) ap.add_argument("--max-liquidity-share-pct", type=float, default=25.0) ap.add_argument("--w-mdd", type=float, default=2.0) ap.add_argument("--w-avg-share", type=float, default=5.0) ap.add_argument("--w-p95-share", type=float, default=10.0) ap.add_argument("--w-p99-share", type=float, default=3.0) ap.add_argument("--w-max-share", type=float, default=0.5) ap.add_argument("--w-rebalance", type=float, default=0.02) ap.add_argument("--top-n", type=int, default=50) args = ap.parse_args() data = load_npz(args.npz) meta = data.get("meta", {}) time_from = args.time_from time_to = args.time_to if not time_from and "timestamp_start" in meta: time_from = ts_to_iso(int(meta["timestamp_start"])) if not time_to and "timestamp_end" in meta: time_to = ts_to_iso(int(meta["timestamp_end"]) + 1) d = filter_time(data, time_from, time_to) dec0 = args.dec0 or int(meta.get("dec0", 6)) dec1 = args.dec1 or int(meta.get("dec1", 18)) ts = d["ts"].astype(np.int64) price = d["price"].astype(np.float64) input_usd = d["input_usd"].astype(np.float64) active_liq = d["active_liquidity"].astype(np.float64) out_dir = Path(args.out_dir) out_dir.mkdir(parents=True, exist_ok=True) rows: List[Dict[str, Any]] = [] for fee_name, fee_rate in parse_fee_specs(args.fee_rates): for policy in make_policies(args): for cap in parse_float_list(args.capital_grid): row = run_policy(price, input_usd, active_liq, ts, dec0, dec1, float(cap), args.total_capital_usd, fee_rate, policy) row.update({ "npz": str(args.npz), "time_from": time_from, "time_to": time_to, "fee_scenario": fee_name, "fee_rate": fee_rate, "run_name": f"{fee_name}__{policy.name}__cap_{float(cap):g}", "script_version": SCRIPT_VERSION, }) row["valid_capacity_avg_p95_p99"] = valid_capacity(row, args) row["valid_capacity_plus_max"] = bool(row["valid_capacity_avg_p95_p99"] and row["max_liquidity_share_pct_when_in_range"] <= args.max_liquidity_share_pct) row["score"] = score_row(row, args) rows.append(row) rows = sorted(rows, key=lambda r: float(r["score"]), reverse=True) write_rows_csv(out_dir / "summary.csv", rows) write_rows_csv(out_dir / "best_by_score.csv", rows[: args.top_n]) write_rows_csv(out_dir / "best_by_return.csv", sorted(rows, key=lambda r: float(r["return_pct"]), reverse=True)[: args.top_n]) valid = [r for r in rows if r["valid_capacity_avg_p95_p99"]] write_rows_csv(out_dir / "best_capacity.csv", valid[: args.top_n]) (out_dir / "summary.json").write_text( json.dumps( { "script_version": SCRIPT_VERSION, "npz": str(args.npz), "time_from": time_from, "time_to": time_to, "rows": int(len(price)), "best_by_score": rows[: min(args.top_n, 20)], }, indent=2, ensure_ascii=False, ), encoding="utf-8", ) print(json.dumps({"script_version": SCRIPT_VERSION, "out_dir": str(out_dir), "rows": len(rows)}, ensure_ascii=False)) if __name__ == "__main__": main()