#!/usr/bin/env python3
"""
Oleg-style tranche LP-DCA replay for CL pools.

Interpretation from chat/context:
- split capital into working tranche + reserve
- open a medium/wide CL range
- if price exits the range, close and reopen
- if price exits below, optionally add another reserve tranche
- report fee PnL separately from inventory/mark-to-market PnL
"""

import argparse
import csv
import json
from dataclasses import dataclass
from datetime import datetime, timezone
from itertools import product
from pathlib import Path
from typing import Any, Dict, List, Tuple

import numpy as np

from cl_fee_replay_fast_npz_v3 import (
    Strategy,
    filter_time,
    finish_summary,
    liquidity_for_capital,
    liquidity_share,
    load_npz,
    parse_float_list,
    parse_iso_ts,
    score_row,
    sqrt_raw_token1_per_token0_from_price,
    unit_value_curve_from_sqrt,
    unit_value_one_from_sqrt,
)

SCRIPT_VERSION = "cl_fee_replay_tranche_dca_v1_2026_05_05"

# Modes understood by tranche_amount() / anchor_price(); used by main() to
# reject a mistyped grid value before the (potentially long) grid run starts.
_ADD_MODES = ("equal", "double_base", "double_once", "progressive")
_ANCHOR_MODES = ("current_price", "rolling_low")


@dataclass(frozen=True)
class TrancheConfig:
    """Immutable parameter set for one tranche-DCA replay run."""

    # Total capital (deployed + reserve) available to the strategy.
    total_capital: float
    # Number of equal parts the capital is split into; sets the base tranche size.
    parts: int
    # Fraction of total_capital deployed into the first position.
    initial_deploy_fraction: float
    # Range width below / above the anchor price, in percent.
    lower_pct: float
    upper_pct: float
    # Sizing rule for reserve adds; see tranche_amount().
    add_mode: str
    # Maximum number of reserve tranches that may be added after downward exits.
    max_reentries: int
    # Minimum time after a (re)open before an exit is looked for, in hours.
    min_reentry_hours: float
    # How the range anchor is chosen; see anchor_price().
    anchor_mode: str
    # Look-back window (hours) and downward offset (percent) for "rolling_low".
    support_lookback_hours: float
    support_offset_pct: float
    # Per-rebalance fixed cost and proportional cost (basis points of redeployed cash).
    gas_usd: float
    swap_cost_bps: float

    @property
    def name(self) -> str:
        """Return a label encoding the config parameters (cost fields are not included)."""
        return (
            f"oleg_tranche_{self.parts}p_{self.initial_deploy_fraction:g}f_{self.lower_pct:g}_{self.upper_pct:g}_"
            f"{self.add_mode}_{self.max_reentries}x_{self.min_reentry_hours:g}h_"
            f"{self.anchor_mode}_{self.support_lookback_hours:g}h_{self.support_offset_pct:g}off"
        )


def ts_to_iso(ts: int) -> str:
    """Format a Unix timestamp (seconds) as a UTC ISO-8601 string ending in ``Z``."""
    return datetime.fromtimestamp(int(ts), tz=timezone.utc).isoformat().replace("+00:00", "Z")


def write_csv(path: Path, rows: List[Dict[str, Any]]) -> None:
    """Write *rows* to *path* as CSV, creating parent directories as needed.

    The header is the union of all row keys in first-seen order. An empty
    *rows* list produces an empty file (no header).
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    if not rows:
        path.write_text("", encoding="utf-8")
        return
    # A dict serves as an insertion-ordered set: O(1) membership per key
    # instead of scanning a list, which matters for large grid outputs.
    fields: Dict[str, None] = {}
    for row in rows:
        fields.update(dict.fromkeys(row))
    with path.open("w", newline="", encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=list(fields))
        w.writeheader()
        w.writerows(rows)


def tranche_amount(cfg: TrancheConfig, add_count: int) -> float:
    """Return the size of the next reserve tranche.

    Args:
        cfg: Run configuration; ``total_capital / parts`` is the base size.
        add_count: Number of reserve tranches already added (0 for the first).

    Returns:
        ``equal``: base; ``double_base``: 2x base; ``double_once``: 2x base
        for the first add, base afterwards; ``progressive``: base * (1 + add_count).

    Raises:
        ValueError: If ``cfg.add_mode`` is not one of the modes above.
    """
    base = cfg.total_capital / max(1, cfg.parts)
    if cfg.add_mode == "equal":
        return base
    if cfg.add_mode == "double_base":
        return base * 2.0
    if cfg.add_mode == "double_once":
        return base * 2.0 if add_count == 0 else base
    if cfg.add_mode == "progressive":
        return base * (1.0 + add_count)
    raise ValueError(f"bad add_mode: {cfg.add_mode}")


def anchor_price(price: np.ndarray, ts: np.ndarray, idx: int, cfg: TrancheConfig) -> float:
    """Return the price the new range is centred on at sample *idx*.

    ``current_price`` uses ``price[idx]``. ``rolling_low`` uses the minimum
    price over the trailing ``support_lookback_hours`` window (inclusive of
    *idx*), lowered by ``support_offset_pct`` percent.

    Raises:
        ValueError: If ``cfg.anchor_mode`` is neither of the two modes.
    """
    p = float(price[idx])
    if cfg.anchor_mode == "current_price":
        return p
    if cfg.anchor_mode != "rolling_low":
        raise ValueError(f"bad anchor_mode: {cfg.anchor_mode}")
    start_ts = int(ts[idx]) - int(cfg.support_lookback_hours * 3600)
    # searchsorted requires ts to be ascending — assumed, TODO confirm against load_npz.
    lo = int(np.searchsorted(ts, start_ts, side="left"))
    low = float(np.min(price[max(0, lo) : idx + 1]))
    return low * (1.0 - cfg.support_offset_pct / 100.0)


def range_from_anchor(price: np.ndarray, ts: np.ndarray, idx: int, cfg: TrancheConfig) -> Tuple[float, float]:
    """Return ``(lower, upper)`` range bounds around the anchor at sample *idx*.

    The bounds are clamped so that ``lower`` is strictly positive and
    ``upper`` is strictly above ``lower`` even for degenerate percentages.
    """
    a = anchor_price(price, ts, idx, cfg)
    lower = max(1e-300, a * (1.0 - cfg.lower_pct / 100.0))
    upper = max(lower * 1.000001, a * (1.0 + cfg.upper_pct / 100.0))
    return lower, upper


def run_one(
    price: np.ndarray,
    input_usd: np.ndarray,
    active_liq: np.ndarray,
    ts: np.ndarray,
    dec0: int,
    dec1: int,
    fee_rate: float,
    cfg: TrancheConfig,
) -> Dict[str, Any]:
    """Replay one tranche-DCA configuration over the series and summarise it.

    The replay opens a position with the initial deploy fraction, then
    repeatedly finds the first sample (after the re-entry cooldown) whose
    price is outside the current range, accrues fees up to that sample,
    closes the position, optionally adds a reserve tranche on a downward
    exit, charges costs and reopens around a fresh anchor.

    Args:
        price: Price per sample.
        input_usd: Per-sample value multiplied by ``fee_rate`` and our
            liquidity share to get fees (presumably swap input volume in USD).
        active_liq: Per-sample pool liquidity passed to ``liquidity_share``.
        ts: Per-sample Unix timestamps in seconds (assumed ascending).
        dec0, dec1: Token decimals forwarded to the pricing helpers.
        fee_rate: Pool fee as a fraction.
        cfg: Strategy parameters.

    Returns:
        The summary row produced by ``finish_summary`` plus three derived
        fields: ``inventory_pnl_ex_fees_usd``, ``fees_minus_inventory_loss_usd``
        and ``return_on_initial_deployed_pct``.

    Raises:
        ValueError: If the series is empty, or on an unknown anchor/add mode.
    """
    n = len(price)
    if n == 0:
        # Previously surfaced as an opaque IndexError on price[0].
        raise ValueError("empty price series: nothing to replay")
    sqrt_price = sqrt_raw_token1_per_token0_from_price(price, dec0, dec1)

    # Split capital into the initially deployed part and the reserve.
    reserve = float(cfg.total_capital)
    deploy = min(reserve, cfg.total_capital * cfg.initial_deploy_fraction)
    reserve -= deploy
    initial_deployed = float(deploy)
    initial_reserve = float(reserve)
    deployed_cash = deploy

    add_count = 0
    rebalances = 0
    fees_uncollected = 0.0
    fees_earned_total = 0.0
    fees_reinvested = 0.0
    costs_cum = 0.0
    lower = upper = 0.0
    our_liq = 0.0

    equity_arr = np.empty(n, dtype=np.float64)
    pos_arr = np.zeros(n, dtype=np.float64)
    in_arr = np.zeros(n, dtype=np.int8)
    share_arr = np.zeros(n, dtype=np.float64)
    # Benchmark: half the capital held as cash, half converted at the first price.
    hodl50 = cfg.total_capital / 2.0 + (cfg.total_capital / 2.0 / float(price[0])) * price

    def open_position(idx: int, cash: float) -> Tuple[float, float, float]:
        """Return (lower, upper, liquidity) for a position opened at *idx* with *cash*."""
        lo, up = range_from_anchor(price, ts, idx, cfg)
        liq = liquidity_for_capital(cash, float(price[idx]), lo, up, dec0, dec1)
        return lo, up, liq

    lower, upper, our_liq = open_position(0, deployed_cash)
    idx = 0
    last_reentry_ts = int(ts[0])
    while idx < n:
        # Exits are only looked for once the cooldown since the last (re)open
        # has elapsed, and never at the opening sample itself.
        min_next_ts = last_reentry_ts + int(cfg.min_reentry_hours * 3600)
        start_check = max(idx + 1, int(np.searchsorted(ts, min_next_ts, side="left")))
        if start_check >= n:
            next_idx = n
            exit_side = ""
        else:
            out_low = price[start_check:] < lower
            out_high = price[start_check:] > upper
            out = out_low | out_high
            if out.any():
                # argmax on a boolean array gives the first True.
                rel = int(np.argmax(out))
                next_idx = start_check + rel
                exit_side = "down" if bool(out_low[rel]) else "up"
            else:
                next_idx = n
                exit_side = ""

        if next_idx > idx:
            # Mark the segment [idx, next_idx) with the position currently open.
            sl = slice(idx, next_idx)
            pseg = price[sl]
            in_range = (pseg >= lower) & (pseg <= upper)
            share = liquidity_share(our_liq, active_liq[sl])
            fee_events = np.zeros_like(pseg, dtype=np.float64)
            # Fees accrue only on samples where the price is inside the range.
            fee_events[in_range] = input_usd[sl][in_range] * fee_rate * share[in_range]
            fees_cum = np.cumsum(fee_events)
            unit = unit_value_curve_from_sqrt(sqrt_price[sl], pseg, lower, upper, dec0, dec1)
            pos_value = our_liq * unit
            equity_arr[sl] = reserve + pos_value + fees_uncollected + fees_cum
            pos_arr[sl] = pos_value
            in_arr[sl] = in_range.astype(np.int8)
            share_arr[sl] = share
            segment_fees = float(fees_cum[-1]) if len(fees_cum) else 0.0
            fees_uncollected += segment_fees
            fees_earned_total += segment_fees

        idx = next_idx
        if idx >= n:
            break

        # Close the position at the exit sample and roll value + fees forward.
        p = float(price[idx])
        unit_now = unit_value_one_from_sqrt(float(sqrt_price[idx]), p, lower, upper, dec0, dec1)
        pos_val = max(0.0, our_liq * unit_now)
        deployed_cash = pos_val + fees_uncollected
        fees_reinvested += fees_uncollected
        fees_uncollected = 0.0

        # Only a downward exit draws a further tranche from the reserve.
        if exit_side == "down" and add_count < cfg.max_reentries and reserve > 0.0:
            add = min(reserve, tranche_amount(cfg, add_count))
            reserve -= add
            deployed_cash += add
            add_count += 1

        # Every reopen pays gas plus a proportional cost on the redeployed cash.
        cost = cfg.gas_usd + deployed_cash * cfg.swap_cost_bps / 10000.0
        costs_cum += cost
        deployed_cash = max(0.0, deployed_cash - cost)
        lower, upper, our_liq = open_position(idx, deployed_cash)
        rebalances += 1
        last_reentry_ts = int(ts[idx])

    # NOTE(review): `strat` is not used below; the call is kept in case the
    # Strategy constructor validates its arguments — confirm and remove.
    strat = Strategy(cfg.name, cfg.lower_pct, cfg.upper_pct, cfg.min_reentry_hours, cfg.gas_usd, cfg.swap_cost_bps, "tranche_dca")
    row = finish_summary(
        {
            "strategy": cfg.name,
            "lower_pct": cfg.lower_pct,
            "upper_pct": cfg.upper_pct,
            "rebalance_hours": cfg.min_reentry_hours,
            "rebalance_mode": "tranche_dca_oor",
            "capital_usd": cfg.total_capital,
            "initial_capital_usd": cfg.total_capital,
            "parts": cfg.parts,
            "initial_deploy_fraction": cfg.initial_deploy_fraction,
            "add_mode": cfg.add_mode,
            "max_reentries": cfg.max_reentries,
            "min_reentry_hours": cfg.min_reentry_hours,
            "anchor_mode": cfg.anchor_mode,
            "support_lookback_hours": cfg.support_lookback_hours,
            "support_offset_pct": cfg.support_offset_pct,
            "reserve_end_usd": reserve,
            "reentry_adds_used": add_count,
            "initial_deployed_usd": initial_deployed,
            "initial_reserve_usd": initial_reserve,
        },
        equity_arr,
        pos_arr,
        fees_earned_total,
        fees_reinvested,
        fees_uncollected,
        costs_cum,
        in_arr,
        share_arr,
        hodl50,
        price,
        cfg.total_capital,
        rebalances,
        cfg.total_capital,
    )
    # Separate fee income from inventory (mark-to-market) PnL: strip fees out
    # of total profit and add costs back.
    row["inventory_pnl_ex_fees_usd"] = float(row["profit_usd"] - fees_earned_total + costs_cum)
    row["fees_minus_inventory_loss_usd"] = float(fees_earned_total + min(0.0, row["inventory_pnl_ex_fees_usd"]))
    row["return_on_initial_deployed_pct"] = float(row["profit_usd"] / initial_deployed * 100.0) if initial_deployed else np.nan
    return row


def valid_capacity(row: Dict[str, Any], args: argparse.Namespace) -> bool:
    """Return True if the row's avg/p95/p99 in-range liquidity shares are within the CLI limits."""
    return (
        float(row["avg_liquidity_share_pct_when_in_range"]) <= args.max_avg_liquidity_share_pct
        and float(row["p95_liquidity_share_pct_when_in_range"]) <= args.max_p95_liquidity_share_pct
        and float(row["p99_liquidity_share_pct_when_in_range"]) <= args.max_p99_liquidity_share_pct
    )


def main() -> None:
    """CLI entry point: run the full parameter grid and write result tables.

    Writes ``summary.csv``, ``best_by_score.csv``, ``best_by_return.csv``,
    ``best_capacity.csv`` and ``summary.json`` into ``--out-dir`` and prints
    a one-line JSON status to stdout.

    Raises:
        ValueError: On an unknown add/anchor mode or an initial deploy
            fraction outside ``(0, 1]`` (checked before any replay runs).
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--npz", required=True)
    ap.add_argument("--out-dir", required=True)
    ap.add_argument("--time-from", default="")
    ap.add_argument("--time-to", default="")
    ap.add_argument("--fee-rate", type=float, default=0.003)
    ap.add_argument("--dec0", type=int, default=0)
    ap.add_argument("--dec1", type=int, default=0)
    ap.add_argument("--total-capital-grid", default="50,75,100,150,200,300,600")
    ap.add_argument("--parts-grid", default="3,4,5,6")
    ap.add_argument("--initial-deploy-fraction-grid", default="0")
    ap.add_argument("--lower-grid", default="35,40,45,50,55,60,70,80")
    ap.add_argument("--upper-grid", default="5,10,15,20,25,30")
    ap.add_argument("--add-mode", default="equal,double_base,double_once,progressive")
    ap.add_argument("--max-reentries-grid", default="1,2,3,4")
    ap.add_argument("--min-reentry-hours-grid", default="0,6,12,24")
    ap.add_argument("--anchor-mode", default="current_price,rolling_low")
    ap.add_argument("--support-lookback-hours", default="6,12,24")
    ap.add_argument("--support-offset-pct", default="0,2,5")
    ap.add_argument("--gas-usd", type=float, default=0.0)
    ap.add_argument("--swap-cost-bps", type=float, default=0.0)
    ap.add_argument("--target-mdd-pct", type=float, default=20.0)
    ap.add_argument("--max-avg-liquidity-share-pct", type=float, default=3.0)
    ap.add_argument("--max-p95-liquidity-share-pct", type=float, default=5.0)
    ap.add_argument("--max-p99-liquidity-share-pct", type=float, default=10.0)
    ap.add_argument("--max-liquidity-share-pct", type=float, default=25.0)
    ap.add_argument("--w-mdd", type=float, default=2.0)
    ap.add_argument("--w-avg-share", type=float, default=5.0)
    ap.add_argument("--w-p95-share", type=float, default=10.0)
    ap.add_argument("--w-p99-share", type=float, default=3.0)
    ap.add_argument("--w-max-share", type=float, default=0.5)
    ap.add_argument("--w-rebalance", type=float, default=0.02)
    ap.add_argument("--top-n", type=int, default=50)
    args = ap.parse_args()

    data = load_npz(args.npz)
    meta = data.get("meta", {})
    # Default the time window to the file's own bounds when metadata has them.
    time_from = args.time_from or (ts_to_iso(int(meta["timestamp_start"])) if "timestamp_start" in meta else "")
    time_to = args.time_to or (ts_to_iso(int(meta["timestamp_end"]) + 1) if "timestamp_end" in meta else "")
    d = filter_time(data, time_from, time_to)
    # 0 on the command line means "not given": fall back to metadata, then 6/18.
    dec0 = args.dec0 or int(meta.get("dec0", 6))
    dec1 = args.dec1 or int(meta.get("dec1", 18))
    price = d["price"].astype(np.float64)
    input_usd = d["input_usd"].astype(np.float64)
    active_liq = d["active_liquidity"].astype(np.float64)
    ts = d["ts"].astype(np.int64)

    # Parse every grid once, up front, rather than inside the nested loops.
    caps = list(parse_float_list(args.total_capital_grid))
    parts_grid = [int(x) for x in parse_float_list(args.parts_grid)]
    deploy_fracs_arg = parse_float_list(args.initial_deploy_fraction_grid)
    lowers = list(parse_float_list(args.lower_grid))
    uppers = list(parse_float_list(args.upper_grid))
    add_modes = [x.strip() for x in args.add_mode.split(",") if x.strip()]
    max_reentries_grid = [int(x) for x in parse_float_list(args.max_reentries_grid)]
    min_hours_grid = list(parse_float_list(args.min_reentry_hours_grid))
    anchor_modes = [x.strip() for x in args.anchor_mode.split(",") if x.strip()]
    lookbacks = list(parse_float_list(args.support_lookback_hours))
    offsets = list(parse_float_list(args.support_offset_pct))

    # Fail fast on bad grid values: otherwise a mistyped mode only surfaces
    # when (and if) a replay happens to hit the branch that uses it.
    for mode in add_modes:
        if mode not in _ADD_MODES:
            raise ValueError(f"bad add_mode: {mode}")
    for mode in anchor_modes:
        if mode not in _ANCHOR_MODES:
            raise ValueError(f"bad anchor_mode: {mode}")
    # A grid of exactly "0" means "deploy one part", i.e. 1 / parts.
    auto_deploy = deploy_fracs_arg == [0.0]
    if not auto_deploy:
        for frac in deploy_fracs_arg:
            if frac <= 0.0 or frac > 1.0:
                raise ValueError(f"bad initial deploy fraction: {frac}")

    rows: List[Dict[str, Any]] = []
    for cap, parts in product(caps, parts_grid):
        deploy_fracs = [1.0 / max(1, parts)] if auto_deploy else deploy_fracs_arg
        # product() iterates with the rightmost grid varying fastest, which
        # matches the original nesting order of the loops.
        for deploy_frac, lower, upper, add_mode, max_re, min_h, anchor, look_h, off in product(
            deploy_fracs,
            lowers,
            uppers,
            add_modes,
            max_reentries_grid,
            min_hours_grid,
            anchor_modes,
            lookbacks,
            offsets,
        ):
            cfg = TrancheConfig(
                cap,
                parts,
                deploy_frac,
                lower,
                upper,
                add_mode,
                max_re,
                min_h,
                anchor,
                look_h,
                off,
                args.gas_usd,
                args.swap_cost_bps,
            )
            row = run_one(price, input_usd, active_liq, ts, dec0, dec1, args.fee_rate, cfg)
            row["npz"] = str(args.npz)
            row["time_from"] = time_from
            row["time_to"] = time_to
            row["fee_rate"] = args.fee_rate
            row["script_version"] = SCRIPT_VERSION
            row["valid_capacity_avg_p95_p99"] = valid_capacity(row, args)
            row["valid_capacity_plus_max"] = bool(
                row["valid_capacity_avg_p95_p99"]
                and float(row["max_liquidity_share_pct_when_in_range"]) <= args.max_liquidity_share_pct
            )
            row["score"] = score_row(row, args)
            rows.append(row)

    out = Path(args.out_dir)
    out.mkdir(parents=True, exist_ok=True)
    rows_sorted = sorted(rows, key=lambda r: float(r["score"]), reverse=True)
    write_csv(out / "summary.csv", rows)
    write_csv(out / "best_by_score.csv", rows_sorted[: args.top_n])
    write_csv(out / "best_by_return.csv", sorted(rows, key=lambda r: float(r["return_pct"]), reverse=True)[: args.top_n])
    write_csv(out / "best_capacity.csv", [r for r in rows_sorted if r["valid_capacity_avg_p95_p99"]][: args.top_n])
    (out / "summary.json").write_text(
        json.dumps({"rows": len(rows), "out_dir": str(out), "script_version": SCRIPT_VERSION}, indent=2),
        encoding="utf-8",
    )
    print(json.dumps({"out_dir": str(out), "rows": len(rows), "script_version": SCRIPT_VERSION}, ensure_ascii=False))


if __name__ == "__main__":
    main()