#!/usr/bin/env python3 """Research-only Callme DCA/V21 overnight tuner. Inputs are public Binance copytrader closed positions plus cached public 1m OHLCV. The runner does not place orders, read secrets, call private APIs, restart services, or inspect browser/session material. """ import argparse import csv import json import math import os import shutil import time from dataclasses import dataclass from pathlib import Path FEE = 0.0005 SLIPPAGE = 0.000425 MIN_ORDER_USD = 2.0 @dataclass(frozen=True) class Candidate: name: str target_notional: float base_frac: float steps_pct: tuple add_weights: tuple tp_pct: float exit_policy: str def read_positions(path): out = [] with open(path, "r", encoding="utf-8-sig", newline="") as fp: for row in csv.DictReader(fp): if row.get("status") != "All Closed": continue row["opened_ms"] = int(float(row["opened_ms"])) row["closed_ms"] = int(float(row["closed_ms"])) row["avgCost"] = float(row["avgCost"]) row["avgClosePrice"] = float(row["avgClosePrice"]) row["closingPnl"] = float(row.get("closingPnl") or 0.0) out.append(row) return sorted(out, key=lambda r: (r["opened_ms"], r["closed_ms"], r["id"])) def load_ohlcv(cache_dir): by_symbol = {} for path in sorted(Path(cache_dir).glob("*.json")): symbol = path.name.split("_", 1)[0] data = json.loads(path.read_text(encoding="utf-8")) by_symbol.setdefault(symbol, []) by_symbol[symbol].extend(data) merged = {} for symbol, rows in by_symbol.items(): seen = {} for row in rows: seen[int(row["t"])] = { "t": int(row["t"]), "open": float(row["open"]), "high": float(row["high"]), "low": float(row["low"]), "close": float(row["close"]), } merged[symbol] = [seen[k] for k in sorted(seen)] return merged def first_index_at_or_after(rows, t_ms): lo, hi = 0, len(rows) while lo < hi: mid = (lo + hi) // 2 if rows[mid]["t"] < t_ms: lo = mid + 1 else: hi = mid return min(lo, max(len(rows) - 1, 0)) def dca_levels(side, entry, steps_pct): levels = [] last = entry for step in steps_pct: if side == "LONG": last *= 1.0 - step / 100.0 else: last *= 1.0 + step / 100.0 levels.append(last) return levels def allocations(candidate): base = candidate.target_notional * candidate.base_frac remaining = max(candidate.target_notional - base, 0.0) total_w = sum(candidate.add_weights) adds = [remaining * w / total_w for w in candidate.add_weights] if total_w > 0 else [] return base, adds def ret_for(side, entry, exit_price): if side == "LONG": return (exit_price / entry) - 1.0 return (entry / exit_price) - 1.0 def max_drawdown_pct(equity): peak = None worst = 0.0 for value in equity: peak = value if peak is None else max(peak, value) if peak and peak > 0: worst = min(worst, 100.0 * (value - peak) / peak) return worst def pf(gross_profit, gross_loss): if gross_loss >= 0: return 999999.0 if gross_profit > 0 else 0.0 return gross_profit / abs(gross_loss) def hit_take_profit(side, avg_entry, tp_pct, row): if tp_pct <= 0: return None if side == "LONG": level = avg_entry * (1.0 + tp_pct / 100.0) if row["high"] >= level: return level else: level = avg_entry * (1.0 - tp_pct / 100.0) if row["low"] <= level: return level return None def simulate_trade(pos, candidate, ohlcv_by_symbol, initial_equity): rows = ohlcv_by_symbol.get(pos["symbol"], []) if not rows: return None start = first_index_at_or_after(rows, pos["opened_ms"]) + 1 end = first_index_at_or_after(rows, pos["closed_ms"]) + 1 if start >= len(rows) or end <= start: return None end = min(end, len(rows) - 1) entry = rows[start]["open"] base_notional, adds = allocations(candidate) levels = dca_levels(pos["side"], entry, candidate.steps_pct[: len(adds)]) legs = [base_notional] + adds min_leg = min(legs) if legs else 0.0 if min_leg < MIN_ORDER_USD: return None avg_entry = entry notional = base_notional fills = 0 min_mtm = 0.0 exit_price = rows[end]["open"] exit_reason = "source_close_next_bar_open" exit_t = rows[end]["t"] for i in range(start, end + 1): row = rows[i] if i not in (start, end): fills_this = 0 while fills < len(levels): crossed = row["close"] <= levels[fills] if pos["side"] == "LONG" else row["close"] >= levels[fills] if not crossed or fills_this >= 1: break add_notional = adds[fills] old_qty = notional / max(avg_entry, 1e-12) add_qty = add_notional / max(levels[fills], 1e-12) notional += add_notional avg_entry = notional / max(old_qty + add_qty, 1e-12) fills += 1 fills_this += 1 adverse_mark = row["low"] if pos["side"] == "LONG" else row["high"] mtm = (ret_for(pos["side"], avg_entry, adverse_mark) - 2 * FEE - 2 * SLIPPAGE) * notional min_mtm = min(min_mtm, mtm) if candidate.exit_policy == "v21_tp_then_source_close" and i not in (start, end): tp_price = hit_take_profit(pos["side"], avg_entry, candidate.tp_pct, row) if tp_price is not None: exit_price = tp_price exit_reason = "v21_full_sell_tp" exit_t = row["t"] break net = (ret_for(pos["side"], avg_entry, exit_price) - 2 * FEE - 2 * SLIPPAGE) * notional return { "id": pos["id"], "symbol": pos["symbol"], "side": pos["side"], "opened_ms": pos["opened_ms"], "closed_ms": pos["closed_ms"], "exit_t": exit_t, "exit_reason": exit_reason, "candidate": candidate.name, "entry": entry, "exit": exit_price, "notional": notional, "fills": fills, "net": net, "min_mtm": min_mtm, "min_mtm_pct_equity": 100.0 * min_mtm / initial_equity, "margin_call": abs(min_mtm) >= max(notional, 1e-12), } def summarize(trades, initial_equity): equity = [initial_equity] gross_profit = 0.0 gross_loss = 0.0 wins = 0 min_trade_mtm = 0.0 margin_calls = 0 for trade in sorted(trades, key=lambda r: (r["closed_ms"], r["id"])): net = trade["net"] equity.append(equity[-1] + net) if net >= 0: gross_profit += net wins += 1 else: gross_loss += net min_trade_mtm = min(min_trade_mtm, trade["min_mtm_pct_equity"]) margin_calls += 1 if trade["margin_call"] else 0 return { "trades": len(trades), "equity_start": initial_equity, "equity_end": equity[-1], "net_pct": 100.0 * (equity[-1] - initial_equity) / initial_equity, "max_realized_dd_pct": max_drawdown_pct(equity), "max_mtm_dd_pct": min_trade_mtm, "min_trade_mtm_pct_equity": min_trade_mtm, "win_rate_pct": 100.0 * wins / len(trades) if trades else 0.0, "pf": pf(gross_profit, gross_loss), "gross_profit": gross_profit, "gross_loss": gross_loss, "avg_notional": sum(t["notional"] for t in trades) / len(trades) if trades else 0.0, "max_notional": max([t["notional"] for t in trades] or [0.0]), "avg_dca_fills": sum(t["fills"] for t in trades) / len(trades) if trades else 0.0, "margin_call_count": margin_calls, } def candidate_grid(max_target_notional, quick, heavy): targets = [50, 75, 100, 150, 200, 250, 300, 400, 500] base_fracs = [0.12, 0.16, 0.22, 0.28, 0.35] step_sets = [ (0.35, 0.55, 0.80), (0.45, 0.70, 1.00), (0.55, 0.85, 1.25), (0.45, 0.70, 1.00, 1.40), (0.60, 0.90, 1.30, 1.80), (0.45, 0.70, 1.00, 1.40, 1.90), ] weight_sets = [ (1.0, 1.3, 1.8), (0.8, 1.2, 2.2), (0.7, 1.0, 1.5, 2.4), (0.6, 0.9, 1.3, 1.8, 2.6), ] tp_values = [0.0, 0.45, 0.7, 1.0, 1.4, 2.0] if heavy: targets = [50, 75, 100, 125, 150, 200, 250, 300, 350, 400, 450, 500] base_fracs = [0.10, 0.12, 0.16, 0.20, 0.24, 0.28, 0.35, 0.45] step_sets = [ (0.25, 0.40, 0.60), (0.35, 0.55, 0.80), (0.45, 0.70, 1.00), (0.55, 0.85, 1.25), (0.35, 0.55, 0.80, 1.10), (0.45, 0.70, 1.00, 1.40), (0.60, 0.90, 1.30, 1.80), (0.45, 0.70, 1.00, 1.40, 1.90), (0.70, 1.05, 1.55, 2.15, 2.95), ] weight_sets = [ (0.9, 1.1, 1.4), (1.0, 1.3, 1.8), (0.8, 1.2, 2.2), (0.75, 1.0, 1.4, 2.0), (0.7, 1.0, 1.5, 2.4), (0.55, 0.85, 1.2, 1.8, 2.7), (0.6, 0.9, 1.3, 1.8, 2.6), ] tp_values = [0.0, 0.30, 0.45, 0.60, 0.80, 1.0, 1.2, 1.5, 2.0, 2.5] if quick: targets = [50, 100, 200] base_fracs = [0.16, 0.28] step_sets = step_sets[:3] tp_values = [0.0, 0.7] out = [] for target in targets: if target > max_target_notional: continue out.append(Candidate("plain_source_t%d" % target, target, 1.0, tuple(), tuple(), 0.0, "source_close_only")) for base_frac in base_fracs: for steps in step_sets: for weights in weight_sets: if len(weights) != len(steps): continue for tp in tp_values: policy = "source_close_only" if tp <= 0 else "v21_tp_then_source_close" name = "t%d_b%d_s%s_w%s_tp%s_%s" % ( target, int(base_frac * 100), "-".join(str(x).replace(".", "p") for x in steps), "-".join(str(x).replace(".", "p") for x in weights), str(tp).replace(".", "p"), "src" if policy == "source_close_only" else "v21", ) out.append(Candidate(name, target, base_frac, steps, weights, tp, policy)) return out def write_csv(path, rows): path = Path(path) path.parent.mkdir(parents=True, exist_ok=True) if not rows: path.write_text("", encoding="utf-8") return with path.open("w", encoding="utf-8", newline="") as fp: writer = csv.DictWriter(fp, fieldnames=list(rows[0].keys())) writer.writeheader() writer.writerows(rows) def write_json(path, data): path = Path(path) path.parent.mkdir(parents=True, exist_ok=True) path.write_text(json.dumps(data, indent=2, ensure_ascii=False) + "\n", encoding="utf-8") def disk_free_mb(path): if hasattr(os, "statvfs"): st = os.statvfs(str(path)) return st.f_bavail * st.f_frsize / (1024 * 1024) return shutil.disk_usage(str(path)).free / (1024 * 1024) def main(): ap = argparse.ArgumentParser() ap.add_argument("--positions", required=True) ap.add_argument("--ohlcv-cache", required=True) ap.add_argument("--out-dir", required=True) ap.add_argument("--initial-equity", type=float, default=500.0) ap.add_argument("--max-target-notional", type=float, default=500.0) ap.add_argument("--min-free-mb", type=float, default=4096.0) ap.add_argument("--stop-file", default="") ap.add_argument("--quick", action="store_true") ap.add_argument("--heavy", action="store_true") args = ap.parse_args() out_dir = Path(args.out_dir) out_dir.mkdir(parents=True, exist_ok=True) stop_file = Path(args.stop_file) if args.stop_file else out_dir / "STOP" positions = read_positions(args.positions) ohlcv = load_ohlcv(args.ohlcv_cache) candidates = candidate_grid(args.max_target_notional, args.quick, args.heavy) groups = {} for pos in positions: groups.setdefault("%s %s" % (pos["symbol"], pos["side"]), []).append(pos) status = { "started_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), "state": "running", "safety": { "no_live_orders": True, "no_secrets_read": True, "no_private_sessions": True, "no_runtime_restarts": True, "public_ohlcv_cache_only": True, }, "inputs": { "positions": args.positions, "ohlcv_cache": args.ohlcv_cache, "positions_count": len(positions), "groups": sorted(groups), "candidate_count": len(candidates), "grid_mode": "heavy" if args.heavy else ("quick" if args.quick else "standard"), }, "resource_guards": { "min_free_mb": args.min_free_mb, "stop_file": str(stop_file), }, } write_json(out_dir / "STATUS.json", status) group_results = [] chosen = {} for group_name in sorted(groups): if stop_file.exists(): status["state"] = "stopped_by_stop_file" write_json(out_dir / "STATUS.json", status) break if disk_free_mb(out_dir) < args.min_free_mb: status["state"] = "stopped_low_disk" write_json(out_dir / "STATUS.json", status) break group_positions = groups[group_name] rows = [] best = None best_trades = [] for candidate in candidates: trades = [] for pos in group_positions: trade = simulate_trade(pos, candidate, ohlcv, args.initial_equity) if trade is not None: trades.append(trade) if len(trades) != len(group_positions): continue summary = summarize(trades, args.initial_equity) summary.update({ "group": group_name, "candidate": candidate.name, "target_notional": candidate.target_notional, "base_frac": candidate.base_frac, "steps_pct": json.dumps(candidate.steps_pct), "add_weights": json.dumps(candidate.add_weights), "tp_pct": candidate.tp_pct, "exit_policy": candidate.exit_policy, }) rows.append(summary) if summary["margin_call_count"] == 0: score = (summary["net_pct"], summary["pf"], summary["max_mtm_dd_pct"]) if best is None or score > best[0]: best = (score, summary) best_trades = trades rows.sort(key=lambda r: (r["net_pct"], r["pf"], r["max_mtm_dd_pct"]), reverse=True) write_csv(out_dir / "groups" / group_name.replace(" ", "_") / "candidate_summary.csv", rows) if best is not None: chosen[group_name] = best[1] write_csv(out_dir / "groups" / group_name.replace(" ", "_") / "best_trades.csv", best_trades) group_results.append(best[1]) status["last_completed_group"] = group_name status["completed_groups"] = len(group_results) write_json(out_dir / "STATUS.json", status) combined_trades = [] for pos in positions: group_name = "%s %s" % (pos["symbol"], pos["side"]) config = chosen.get(group_name) if not config: continue candidate = None for c in candidates: if c.name == config["candidate"]: candidate = c break if candidate is None: continue trade = simulate_trade(pos, candidate, ohlcv, args.initial_equity) if trade is not None: combined_trades.append(trade) all_signal_summary = summarize(combined_trades, args.initial_equity) all_signal_summary["covered_trades"] = len(combined_trades) all_signal_summary["source_trades"] = len(positions) all_signal_summary["coverage_pct"] = 100.0 * len(combined_trades) / max(len(positions), 1) write_csv(out_dir / "selected_group_configs.csv", group_results) write_csv(out_dir / "all_signal_backtest_trades.csv", combined_trades) write_json(out_dir / "summary.json", { "status": "complete" if len(combined_trades) == len(positions) else "partial", "all_signal_summary": all_signal_summary, "selected_group_configs": group_results, "responsibility_split": { "source_copytrader": [ "defines entry permission", "defines source-close maximum holding boundary", "defines symbol and side" ], "v21_dca": [ "chooses per-symbol-side DCA sizing ladder", "may full-sell early when tuned TP fires if exit_policy is v21_tp_then_source_close", "does not open trades without source signal" ], "runner_broker": [ "execution only", "no strategy invention", "no live authority in this research run" ] }, }) status["state"] = "complete" status["completed_at"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) status["all_signal_summary"] = all_signal_summary write_json(out_dir / "STATUS.json", status) report = [ "# Callme Overnight DCA/V21 Research Run", "", "Status: `%s`." % status["state"], "", "Safety: no live orders, no secrets, no private sessions, no runtime restarts.", "", "## All-Signal Backtest", "", "- source trades: `%s`" % all_signal_summary["source_trades"], "- covered trades: `%s`" % all_signal_summary["covered_trades"], "- net %%: `%.4f`" % all_signal_summary["net_pct"], "- PF: `%.4f`" % all_signal_summary["pf"], "- max realized DD %%: `%.4f`" % all_signal_summary["max_realized_dd_pct"], "- max/min trade MTM %% equity: `%.4f`" % all_signal_summary["min_trade_mtm_pct_equity"], "- margin calls: `%s`" % all_signal_summary["margin_call_count"], "", "## Responsibility Split", "", "- Source copytrader owns symbol, side, entry permission, and source-close maximum holding boundary.", "- V21/DCA owns per-symbol-side DCA ladder and may full-sell early only when the tuned V21 TP policy wins.", "- Runner/broker remains execution-only; this run does not authorize live or paper-live deployment.", ] (out_dir / "REPORT.md").write_text("\n".join(report) + "\n", encoding="utf-8") if __name__ == "__main__": main()