#!/usr/bin/env python3
"""
ohlcv_lp_sim_v1.py

Hourly-OHLCV LP simulator for Uniswap V3 / Aerodrome Slipstream pools.

Uses hourly OHLCV candles (close price, volume in USD) to simulate the value
of a concentrated LP position plus its fee income.

Fee income per candle (only while the close is inside our range):

    fee_candle = our_L / (pool_L + our_L) * fee_rate * volume_usd

where
    our_L  = capital      / unit_value(price, lower, upper)
    pool_L ~= pool_tvl_usd / unit_value(price, assumed pool range)

and unit_value() is the value of one unit of liquidity under the standard
Uniswap V3 inventory math. IL and position value use the same math.
"""
from __future__ import annotations

import argparse
import itertools
import json
import math
from pathlib import Path

import numpy as np
import pandas as pd

SCRIPT_VERSION = "ohlcv_lp_sim_v1_2026_05_02"


def sqrt_raw(price: float | np.ndarray, dec0: int, dec1: int) -> float | np.ndarray:
    """Return sqrt(10**(dec1 - dec0) / price).

    This is the square root of the decimal-adjusted *inverse* price, so the
    result decreases as ``price`` increases. Prices are floored at 1e-300 to
    avoid dividing by zero.
    """
    return np.sqrt(10.0 ** (dec1 - dec0) / np.maximum(price, 1e-300))


def _range_around(price: float, lower_pct: float, upper_pct: float) -> tuple[float, float]:
    """Return (lower, upper) bounds lower_pct below and upper_pct above price."""
    return price * (1.0 - lower_pct / 100.0), price * (1.0 + upper_pct / 100.0)


def liquidity_for_capital(capital: float, p0: float, lower: float, upper: float,
                          dec0: int, dec1: int) -> float:
    """Return the liquidity L that ``capital`` buys for [lower, upper] at price p0.

    L is ``capital`` divided by the value of one unit of liquidity, which is
    obtained from :func:`position_value` so both functions share one
    implementation of the inventory math. Returns 0.0 when a unit of liquidity
    is worth (numerically) nothing.
    """
    unit_value = float(
        position_value(1.0, np.array([p0], dtype=float), lower, upper, dec0, dec1)[0]
    )
    if unit_value <= 1e-300:
        return 0.0
    return capital / unit_value


def position_value(L: float, price: np.ndarray, lower: float, upper: float,
                   dec0: int, dec1: int) -> np.ndarray:
    """Value a position of liquidity ``L`` on [lower, upper] at each price.

    The valuation treats token0 as the unit of account and token1 as the
    asset being priced: value = amount0 / 10**dec0 + amount1 / 10**dec1 * price.
    At or below ``lower`` the position holds only token1; at or above
    ``upper`` it holds only token0; in between it holds both.

    Args:
        L: liquidity of the position.
        price: array of prices (any numeric dtype).
        lower, upper: range bounds in the same units as ``price``.
        dec0, dec1: token decimals.

    Returns:
        Float array of position values, same shape as ``price``.
    """
    # Coerce to float: with an integer price array np.zeros_like would create
    # integer buffers and silently truncate the token amounts.
    price = np.asarray(price, dtype=float)
    sp = sqrt_raw(price, dec0, dec1)
    # sqrt_raw is decreasing in price, so the upper bound maps to the smaller
    # sqrt value (sa) and the lower bound to the larger one (sb).
    sa = float(sqrt_raw(upper, dec0, dec1))
    sb = float(sqrt_raw(lower, dec0, dec1))

    below = price <= lower
    above = price >= upper
    mid = ~(below | above)

    a0 = np.zeros_like(price)
    a1 = np.zeros_like(price)
    a0[above] = L * (sb - sa) / (sa * sb)
    a1[below] = L * (sb - sa)
    a0[mid] = L * (sb - sp[mid]) / (sp[mid] * sb)
    a1[mid] = L * (sp[mid] - sa)
    return a0 / 10 ** dec0 + a1 / 10 ** dec1 * price


def max_drawdown(equity: np.ndarray) -> float:
    """Return the worst peak-to-trough drawdown of ``equity`` in percent (<= 0).

    Points whose running peak is exactly 0 are ignored (treated as NaN).
    """
    peak = np.maximum.accumulate(equity)
    dd = equity / np.where(peak == 0, np.nan, peak) - 1.0
    return float(np.nanmin(dd) * 100.0)


def run_strategy(prices: np.ndarray, volumes: np.ndarray,
                 fee_rate: float, pool_tvl_usd: float, pool_assumed_range_pct: float,
                 capital: float, lower_pct: float, upper_pct: float,
                 rebalance_hours: int, dec0: int, dec1: int) -> dict:
    """Simulate one LP configuration over a candle series.

    The position is opened at the first price with a range of ``lower_pct``
    below and ``upper_pct`` above it. While a candle's price is inside the
    range the position earns ``share * fee_rate * volume`` where
    ``share = our_L / (pool_L + our_L)``. Pool liquidity is estimated as if
    the whole ``pool_tvl_usd`` sat in a symmetric +/- ``pool_assumed_range_pct``
    range around the price at the last (re)entry.

    Args:
        prices: per-candle prices.
        volumes: per-candle traded volume, same length as ``prices``.
        fee_rate: pool fee as a fraction of volume.
        pool_tvl_usd: pool TVL used for the pool-liquidity estimate.
        pool_assumed_range_pct: assumed half-width (percent) of pool liquidity.
        capital: starting capital.
        lower_pct, upper_pct: range widths in percent below / above entry.
        rebalance_hours: re-centre the range every this many candles
            (candles are assumed hourly); 0 disables rebalancing.
        dec0, dec1: token decimals.

    Returns:
        Dict of summary metrics (return, max drawdown, fees, time in range, ...).

    Raises:
        ValueError: if ``prices`` is empty or ``volumes`` has a different length.
    """
    prices = np.asarray(prices, dtype=float)
    volumes = np.asarray(volumes, dtype=float)
    n = len(prices)
    if n == 0:
        raise ValueError('prices must contain at least one candle')
    if len(volumes) != n:
        raise ValueError('prices and volumes must have the same length')

    def pool_liquidity(p: float) -> float:
        # Pool liquidity proxy: TVL / unit value for the assumed pool range.
        lo, hi = _range_around(p, pool_assumed_range_pct, pool_assumed_range_pct)
        return liquidity_for_capital(pool_tvl_usd, p, lo, hi, dec0, dec1)

    def open_position(value_usd: float, p: float) -> tuple[float, float, float]:
        lo, hi = _range_around(p, lower_pct, upper_pct)
        return lo, hi, liquidity_for_capital(value_usd, p, lo, hi, dec0, dec1)

    p0 = prices[0]
    cur_lower, cur_upper, cur_L = open_position(capital, p0)
    pool_L = pool_liquidity(p0)

    equity = np.empty(n)
    in_range_arr = np.zeros(n, dtype=bool)
    share_arr = np.zeros(n)
    fees_cum = 0.0  # fees earned since the last (re)entry, not yet compounded
    rebalances = 0
    last_reb = 0

    for i, (p, vol) in enumerate(zip(prices, volumes)):
        in_r = cur_lower <= p <= cur_upper
        in_range_arr[i] = in_r
        if in_r:
            total_L = pool_L + cur_L
            share = cur_L / total_L if total_L > 0 else 0.0
            fees_cum += share * fee_rate * vol
            share_arr[i] = share

        pos_val = float(
            position_value(cur_L, np.array([p]), cur_lower, cur_upper, dec0, dec1)[0]
        )
        equity[i] = pos_val + fees_cum

        # Periodic rebalance: collect fees and re-enter around the current
        # price with everything the position is worth right now.
        if rebalance_hours > 0 and (i - last_reb) >= rebalance_hours:
            cur_lower, cur_upper, cur_L = open_position(equity[i], p)
            pool_L = pool_liquidity(p)
            fees_cum = 0.0  # fees are now part of the redeployed capital
            rebalances += 1
            last_reb = i

    time_in_range_pct = float(in_range_arr.mean() * 100.0)
    return_pct = float((equity[-1] / capital - 1.0) * 100.0)
    mdd_pct = max_drawdown(equity)
    # share_arr is 0 for out-of-range candles, so this is the total fee income.
    fees_total = float(np.sum(share_arr * fee_rate * volumes))
    avg_share = float(share_arr[in_range_arr].mean() * 100.0) if in_range_arr.any() else 0.0
    pnl_mdd = return_pct / abs(mdd_pct) if mdd_pct != 0 else float('inf')

    return {
        'return_pct': return_pct,
        'mdd_pct': mdd_pct,
        'pnl_mdd': pnl_mdd,
        'fees_total_usd': fees_total,
        'time_in_range_pct': time_in_range_pct,
        'avg_share_pct': avg_share,
        'rebalances': rebalances,
        'equity_end': float(equity[-1]),
        'price_start': float(prices[0]),
        'price_end': float(prices[-1]),
        'price_return_pct': float((prices[-1] / prices[0] - 1.0) * 100.0),
    }


def _parse_grid(text: str, cast) -> list:
    """Parse a comma-separated option into a list, ignoring blank tokens."""
    return [cast(token) for token in map(str.strip, text.split(',')) if token]


def _json_safe(obj):
    """Return ``obj`` with non-finite floats replaced by None.

    json.dumps would otherwise emit ``Infinity`` / ``NaN``, which are not
    valid JSON and are rejected by strict parsers.
    """
    if isinstance(obj, dict):
        return {key: _json_safe(value) for key, value in obj.items()}
    if isinstance(obj, (list, tuple)):
        return [_json_safe(value) for value in obj]
    if isinstance(obj, float) and not math.isfinite(obj):
        return None
    return obj


def main():
    """CLI entry point: run the parameter grid and write summary files.

    Writes ``summary.csv`` (all configurations), ``valid.csv`` (configurations
    passing the drawdown / return filter) and ``summary.json`` into --out-dir.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument('--csv', required=True,
                    help='Hourly OHLCV CSV with columns: timestamp,open,high,low,close,volume,datetime_utc')
    ap.add_argument('--weth-csv', default='',
                    help='Optional: hourly WETH/USDC price CSV to convert WETH-quoted price to USD')
    ap.add_argument('--fee-rate', type=float, required=True)
    ap.add_argument('--pool-tvl-usd', type=float, required=True,
                    help='Pool TVL in USD at deployment time')
    ap.add_argument('--pool-assumed-range-pct', type=float, default=30.0,
                    help='Assumed average LP range for pool liquidity calibration')
    ap.add_argument('--capital', type=float, default=600.0)
    ap.add_argument('--dec0', type=int, default=18, help='token0 decimals')
    ap.add_argument('--dec1', type=int, default=18, help='token1 decimals')
    ap.add_argument('--grid-lower', default='15,20,25,30,35,40,50')
    ap.add_argument('--grid-upper', default='5,10,15,20,25,30')
    ap.add_argument('--grid-rebalance-h', default='0,168,336,504', help='0=static')
    ap.add_argument('--time-from', default='')
    ap.add_argument('--time-to', default='')
    ap.add_argument('--out-dir', required=True)
    ap.add_argument('--pool-name', default='pool')
    args = ap.parse_args()

    print(f'[script_version] {__file__} SCRIPT_VERSION={SCRIPT_VERSION}')

    df = pd.read_csv(args.csv)
    df['datetime_utc'] = pd.to_datetime(df['datetime_utc'], utc=True)
    if args.time_from:
        df = df[df['datetime_utc'] >= pd.Timestamp(args.time_from, tz='UTC')]
    if args.time_to:
        df = df[df['datetime_utc'] < pd.Timestamp(args.time_to, tz='UTC')]
    df = df.sort_values('datetime_utc').reset_index(drop=True)
    if df.empty:
        raise SystemExit('No data in time range')

    # Convert prices to USD if weth-csv provided
    if args.weth_csv:
        wdf = pd.read_csv(args.weth_csv)
        wdf['datetime_utc'] = pd.to_datetime(wdf['datetime_utc'], utc=True)
        weth_usd = wdf.set_index('datetime_utc')['price_usdc'].sort_index()
        # One vectorized as-of lookup: last WETH quote at or before each candle.
        rate = weth_usd.asof(df['datetime_utc']).to_numpy(dtype=float)
        if np.isnan(rate).any():
            # A candle earlier than the first WETH quote has no conversion
            # rate; a NaN price would turn every result into NaN.
            raise SystemExit('WETH price CSV has no quote at or before some candles')
        df['close_usd'] = df['close'].to_numpy(dtype=float) * rate
        price_col = 'close_usd'
    else:
        price_col = 'close'

    prices = df[price_col].values.astype(float)
    volumes = df['volume'].values.astype(float)

    lowers = _parse_grid(args.grid_lower, float)
    uppers = _parse_grid(args.grid_upper, float)
    rebs = _parse_grid(args.grid_rebalance_h, int)
    if not (lowers and uppers and rebs):
        raise SystemExit('Empty parameter grid')

    rows = []
    for lo, up, reb in itertools.product(lowers, uppers, rebs):
        res = run_strategy(
            prices, volumes,
            fee_rate=args.fee_rate,
            pool_tvl_usd=args.pool_tvl_usd,
            pool_assumed_range_pct=args.pool_assumed_range_pct,
            capital=args.capital,
            lower_pct=lo, upper_pct=up,
            rebalance_hours=reb,
            dec0=args.dec0, dec1=args.dec1,
        )
        mode = 'static' if reb == 0 else f'reb_{reb}h'
        res.update({
            'pool': args.pool_name,
            'lower_pct': lo,
            'upper_pct': up,
            'rebalance_h': reb,
            'capital': args.capital,
            'fee_rate': args.fee_rate,
            'pool_tvl_usd': args.pool_tvl_usd,
            'strategy': f'{mode}_{lo}_{up}',
        })
        rows.append(res)

    out = Path(args.out_dir)
    out.mkdir(parents=True, exist_ok=True)

    df_out = pd.DataFrame(rows)
    df_out.to_csv(out / 'summary.csv', index=False)

    # Valid: MDD >= -20% AND PnL/MDD >= 2 AND return > 0
    valid = df_out[
        (df_out['mdd_pct'] >= -20)
        & (df_out['pnl_mdd'] >= 2.0)
        & (df_out['return_pct'] > 0)
    ]
    valid.to_csv(out / 'valid.csv', index=False)
    valid_sorted = valid.sort_values('return_pct', ascending=False)

    print(f'\nTotal configs: {len(df_out)}')
    print(f'Valid (MDD>-20%, PnL/MDD>2, return>0): {len(valid)}')
    print(f'\nTop 15 by return (valid only):')
    cols = ['strategy', 'return_pct', 'mdd_pct', 'pnl_mdd', 'fees_total_usd',
            'time_in_range_pct', 'rebalances']
    if len(valid) > 0:
        print(valid_sorted.head(15)[cols].to_string(index=False))
    else:
        print('No valid configs. Top 10 by PnL/MDD:')
        top = df_out[df_out['return_pct'] > 0].sort_values('pnl_mdd', ascending=False).head(10)
        print(top[cols].to_string(index=False))

    # Write JSON summary
    result = {
        'script_version': SCRIPT_VERSION,
        'pool': args.pool_name,
        'n_candles': len(df),
        'period_start': str(df['datetime_utc'].iloc[0]),
        'period_end': str(df['datetime_utc'].iloc[-1]),
        'price_start': float(prices[0]),
        'price_end': float(prices[-1]),
        'total_volume': float(volumes.sum()),
        'daily_volume_avg': float(volumes.sum() / max(len(df) / 24, 1)),
        'n_valid': len(valid),
        'top_valid': valid_sorted.head(5)[cols].to_dict('records') if len(valid) > 0 else [],
    }
    # pnl_mdd can be inf (zero drawdown); write such values as null so the
    # file stays valid JSON.
    (out / 'summary.json').write_text(json.dumps(_json_safe(result), indent=2))
    print(f'\nSaved to {out}')


if __name__ == '__main__':
    main()