#!/usr/bin/env python3
"""Paper-live observer for Callme public Binance copytrader signals.

This runner is paper-only. It polls public Binance copytrading open positions,
uses current public market data as executable proxy, applies the selected
Callme DCA/V21 configs, and records slippage telemetry. It never sends orders,
does not read secrets, and does not touch private sessions.
"""

import argparse
import csv
import json
import math
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import requests

POSITIONS_URL = "https://www.binance.com/bapi/futures/v1/friendly/future/copy-trade/lead-data/positions"
BINANCE_MARK_URL = "https://fapi.binance.com/fapi/v1/premiumIndex"
BINANCE_DEPTH_URL = "https://fapi.binance.com/fapi/v1/depth"
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/124.0 Safari/537.36"
# Fee rate charged per side in the paper PnL formulas (applied as 2 * FEE).
FEE = 0.0005
# Default per-side slippage, in basis points, for --slippage-bp.
DEFAULT_SLIPPAGE_BP = 4.25


def utc_now() -> datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(timezone.utc)


def iso(dt: datetime) -> str:
    """Format *dt* as an ISO-8601 UTC string with a trailing ``Z``."""
    return dt.astimezone(timezone.utc).isoformat().replace("+00:00", "Z")


def parse_float(raw: Any, default: float = math.nan) -> float:
    """Convert *raw* to float, tolerating thousands separators.

    Returns *default* when *raw* is ``""``/``None`` or cannot be parsed.
    """
    try:
        if raw in ("", None):
            return default
        return float(str(raw).replace(",", ""))
    except (TypeError, ValueError):
        return default


def finite_pos(raw: Any) -> Optional[float]:
    """Return *raw* as a float if it is finite and > 0, else ``None``."""
    val = parse_float(raw)
    return val if math.isfinite(val) and val > 0 else None


def norm_side(raw: Any, amount: float = 0.0) -> str:
    """Normalise a side label to ``LONG``/``SHORT``/``UNKNOWN``.

    ``BOTH`` is resolved from the sign of *amount* (non-negative -> LONG);
    ``BUY``/``SELL`` map to ``LONG``/``SHORT``.
    """
    side = str(raw or "").strip().upper()
    if side in {"LONG", "SHORT"}:
        return side
    if side == "BOTH":
        return "LONG" if amount >= 0 else "SHORT"
    if side == "BUY":
        return "LONG"
    if side == "SELL":
        return "SHORT"
    return "UNKNOWN"


def ret_for(side: str, entry: float, exit_px: float) -> float:
    """Return the fractional return of a position from *entry* to *exit_px*.

    Any side other than ``LONG`` is treated as short.
    """
    if side == "LONG":
        return exit_px / entry - 1.0
    return entry / exit_px - 1.0


def exec_price(mark: float, side: str, action: str, slippage_bp: float) -> float:
    """Shift *mark* against the trader by *slippage_bp* basis points.

    Long entries and short exits are priced above the mark; long exits and
    short entries below it. Any *action* other than ``"entry"`` is an exit.
    """
    slip = slippage_bp / 10000.0
    if side == "LONG":
        return mark * (1.0 + slip) if action == "entry" else mark * (1.0 - slip)
    return mark * (1.0 - slip) if action == "entry" else mark * (1.0 + slip)


def headers(portfolio_id: str) -> Dict[str, str]:
    """Build the HTTP headers sent with the copy-trading positions request."""
    return {
        "Accept": "application/json",
        "Content-Type": "application/json",
        "User-Agent": USER_AGENT,
        "Origin": "https://www.binance.com",
        "Referer": f"https://www.binance.com/en/copy-trading/lead-details/{portfolio_id}",
    }


def read_json(path: Path, default: Any) -> Any:
    """Load JSON from *path*; return *default* if it is unreadable or invalid."""
    try:
        return json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # OSError: missing/unreadable file. ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError.
        return default


def write_json(path: Path, payload: Any) -> None:
    """Write *payload* to *path* as indented JSON, creating parent dirs.

    The text is serialised first, written to a sibling ``.tmp`` file and then
    moved over the target with ``Path.replace``, so an interrupted write does
    not leave a truncated target file behind.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    text = json.dumps(payload, indent=2, ensure_ascii=False) + "\n"
    tmp_path = path.with_name(path.name + ".tmp")
    tmp_path.write_text(text, encoding="utf-8")
    tmp_path.replace(path)


def append_jsonl(path: Path, rows: List[Dict[str, Any]]) -> None:
    """Append each row to *path* as one JSON object per line."""
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("a", encoding="utf-8") as fp:
        for row in rows:
            fp.write(json.dumps(row, sort_keys=True, ensure_ascii=False) + "\n")


def read_configs(path: Path) -> Dict[str, Dict[str, Any]]:
    """Read the configs CSV and index its rows by the ``group`` column.

    Rows with an empty ``group`` are skipped; later rows overwrite earlier
    ones that share the same group.
    """
    out: Dict[str, Dict[str, Any]] = {}
    with path.open("r", encoding="utf-8", newline="") as fp:
        for row in csv.DictReader(fp):
            group = str(row.get("group") or "").strip()
            if group:
                out[group] = row
    return out


def fallback_config(group: str) -> Dict[str, Any]:
    """Return the default config used when *group* has no CSV row."""
    return {
        "group": group,
        "candidate": "fallback_plain_source_t500_unseen_symbol",
        "target_notional": "500",
        "base_frac": "1.0",
        "steps_pct": "[]",
        "add_weights": "[]",
        "tp_pct": "0.0",
        "exit_policy": "source_close_only",
        "fallback_config": True,
    }


def parse_tuple(raw: Any) -> List[float]:
    """Parse a JSON-list string such as ``"[1, 2.5]"`` into floats.

    Returns ``[]`` for empty input, text not starting with ``[``, or
    anything that fails to parse.
    """
    if not raw:
        return []
    text = str(raw).strip()
    if not text.startswith("["):
        return []
    try:
        return [float(x) for x in json.loads(text)]
    except (TypeError, ValueError):
        return []


def fetch_open_positions(session: requests.Session, portfolio_id: str, timeout_sec: float) -> List[Dict[str, Any]]:
    """Fetch and normalise the lead trader's currently open positions.

    Rows with zero amount and zero notional, no symbol, or an unrecognised
    side are dropped.

    Raises:
        requests.HTTPError: on a non-2xx response (via ``raise_for_status``).
        RuntimeError: when the payload's ``code`` is not ``"000000"``.
    """
    resp = session.get(
        POSITIONS_URL,
        params={"portfolioId": portfolio_id},
        headers=headers(portfolio_id),
        timeout=timeout_sec,
    )
    resp.raise_for_status()
    data = resp.json()
    if str(data.get("code")) != "000000":
        raise RuntimeError(f"Binance positions code={data.get('code')} message={data.get('message')}")
    rows = data.get("data") or []
    out = []
    for row in rows if isinstance(rows, list) else []:
        if not isinstance(row, dict):
            # Malformed entry; skip it rather than failing the whole poll.
            continue
        amount = parse_float(row.get("positionAmount"), 0.0)
        notional = abs(parse_float(row.get("notionalValue"), 0.0))
        if abs(amount) <= 0 and notional <= 0:
            continue
        symbol = str(row.get("symbol") or "").upper().strip()
        side = norm_side(row.get("positionSide"), amount)
        if not symbol or side not in {"LONG", "SHORT"}:
            continue
        out.append(
            {
                "source_id": str(row.get("id") or f"{symbol}:{side}"),
                "symbol": symbol,
                "side": side,
                "source_entry_price": finite_pos(row.get("entryPrice")) or finite_pos(row.get("breakEvenPrice")),
                "source_mark_price": finite_pos(row.get("markPrice")),
                "source_unrealized_profit": parse_float(row.get("unrealizedProfit"), 0.0),
                "source_notional_value": parse_float(row.get("notionalValue"), 0.0),
                "raw": row,
            }
        )
    return out


def market_snapshot(session: requests.Session, symbol: str, timeout_sec: float) -> Dict[str, Any]:
    """Collect the mark price and top-of-book data for *symbol*.

    Both requests are best-effort: a failure is recorded in the result
    (``mark_source`` / ``orderbook["error"]``) instead of being raised.
    """
    # perf_counter is monotonic, so the latency cannot go negative if the
    # wall clock is adjusted mid-request.
    started = time.perf_counter()
    mark = None
    mark_source = "missing"
    try:
        resp = session.get(BINANCE_MARK_URL, params={"symbol": symbol}, timeout=timeout_sec)
        resp.raise_for_status()
        mark = finite_pos(resp.json().get("markPrice"))
        if mark:
            mark_source = "binance_futures_mark"
    except Exception as exc:  # deliberate best-effort: error goes into the snapshot
        mark_source = f"binance_mark_error:{str(exc)[:120]}"

    book: Dict[str, Any] = {"source": "binance_futures_depth"}
    try:
        resp = session.get(BINANCE_DEPTH_URL, params={"symbol": symbol, "limit": 20}, timeout=timeout_sec)
        resp.raise_for_status()
        data = resp.json()
        bids = data.get("bids") or []
        asks = data.get("asks") or []
        bid = finite_pos(bids[0][0]) if bids else None
        ask = finite_pos(asks[0][0]) if asks else None
        mid = (bid + ask) / 2.0 if bid and ask else None
        book.update(
            {
                "best_bid": bid,
                "best_ask": ask,
                "mid": mid,
                "spread_bp": ((ask - bid) / mid * 10000.0) if bid and ask and mid else None,
                "bid_depth_top5_usdt": sum(float(px) * float(qty) for px, qty in bids[:5]),
                "ask_depth_top5_usdt": sum(float(px) * float(qty) for px, qty in asks[:5]),
            }
        )
    except Exception as exc:  # deliberate best-effort: error goes into the snapshot
        book["error"] = str(exc)[:160]

    return {
        "symbol": symbol,
        "mark": mark,
        "mark_source": mark_source,
        "orderbook": book,
        "request_latency_ms": round((time.perf_counter() - started) * 1000.0, 3),
    }


def slippage_model(symbol: str, notional: float, book: Dict[str, Any], fallback_bp: float) -> Dict[str, Any]:
    """Suggest a per-side slippage (bp) from spread and top-5 book depth.

    The suggestion is the largest of 0.5 bp, *fallback_bp*, and
    ``half_spread + depth_impact + 0.25``. The depth impact is capped at
    50 bp and is non-zero only when *notional* exceeds the thinner side.
    """
    spread_bp = book.get("spread_bp")
    half_spread = float(spread_bp) / 2.0 if spread_bp is not None else None
    bid_depth = finite_pos(book.get("bid_depth_top5_usdt"))
    ask_depth = finite_pos(book.get("ask_depth_top5_usdt"))
    depths = [x for x in (bid_depth, ask_depth) if x is not None]
    depth = min(depths) if depths else None
    depth_impact = 0.0
    if depth is not None and notional > depth:
        depth_impact = min(50.0, 10000.0 * (notional - depth) / max(notional, 1e-12))
    suggested = max(0.5, float(fallback_bp), float(half_spread or 0.0) + depth_impact + 0.25)
    return {
        "symbol": symbol,
        "model": "half_spread_plus_depth_floor_v1",
        "fallback_bp": fallback_bp,
        "spread_bp": spread_bp,
        "half_spread_bp": half_spread,
        "depth_impact_bp": depth_impact,
        "suggested_slippage_bp_per_side": suggested,
        "notional_usdt": notional,
    }


def dca_levels(side: str, entry: float, steps: List[float]) -> List[float]:
    """Return the DCA trigger prices, compounding each step from the last.

    Each step is a percentage: levels move down for ``LONG`` and up for any
    other side.
    """
    levels = []
    last = entry
    for step in steps:
        last = last * (1.0 - step / 100.0) if side == "LONG" else last * (1.0 + step / 100.0)
        levels.append(last)
    return levels


def allocations(cfg: Dict[str, Any]) -> Tuple[float, List[float]]:
    """Split the config's target notional into a base and weighted adds.

    Returns ``(base, adds)``. ``adds`` is empty when the weights sum to
    zero or less.
    """
    target = float(cfg.get("target_notional") or 0.0)
    base_frac = float(cfg.get("base_frac") or 1.0)
    weights = parse_tuple(cfg.get("add_weights"))
    base = target * base_frac
    remaining = max(target - base, 0.0)
    total_w = sum(weights)
    adds = [remaining * w / total_w for w in weights] if total_w > 0 else []
    return base, adds


def maybe_open_trade(
    pos: Dict[str, Any],
    cfg: Dict[str, Any],
    snap: Dict[str, Any],
    slippage_bp: float,
    now: str,
) -> Optional[Dict[str, Any]]:
    """Build a new paper trade for *pos*, or ``None`` if no mark is available.

    The entry is priced from the snapshot's mark adjusted by *slippage_bp*.
    """
    mark = snap.get("mark")
    if not mark:
        return None
    base, adds = allocations(cfg)
    entry = exec_price(float(mark), pos["side"], "entry", slippage_bp)
    return {
        "paper_key": f"{pos['source_id']}:{pos['symbol']}:{pos['side']}",
        "source_id": pos["source_id"],
        "symbol": pos["symbol"],
        "side": pos["side"],
        "opened_utc": now,
        "last_seen_utc": now,
        "status": "OPEN",
        "candidate": cfg.get("candidate"),
        "exit_policy": cfg.get("exit_policy"),
        "target_notional": float(cfg.get("target_notional") or 0.0),
        "base_notional": base,
        "adds": adds,
        "steps_pct": parse_tuple(cfg.get("steps_pct")),
        "tp_pct": float(cfg.get("tp_pct") or 0.0),
        "entry_mark_price": float(mark),
        "entry_exec_price": entry,
        "avg_entry": entry,
        "notional": base,
        "fills": 0,
        "min_mtm_usdt": 0.0,
        "min_mtm_pct_equity": 0.0,
        "entry_snapshot": snap,
        "raw_signal": pos,
    }


def update_trade(
    trade: Dict[str, Any],
    snap: Dict[str, Any],
    slippage_bp: float,
    initial_equity: float,
    now: str,
) -> Tuple[List[Dict[str, Any]], Optional[Dict[str, Any]]]:
    """Advance an open paper trade by one poll; mutates *trade* in place.

    At most one DCA level is filled per call. Afterwards the worst
    mark-to-market is tracked and, for the ``v21_tp_then_source_close``
    policy with a positive ``tp_pct``, the take-profit is evaluated.

    Returns:
        ``(events, closed)`` where *closed* is the closed-trade record when
        the take-profit was hit, else ``None``. With no mark in *snap* the
        trade is left untouched and ``([], None)`` is returned.
    """
    events: List[Dict[str, Any]] = []
    mark = snap.get("mark")
    if not mark:
        return events, None
    side = trade["side"]
    levels = dca_levels(side, float(trade["entry_exec_price"]), list(trade.get("steps_pct") or []))
    adds = list(trade.get("adds") or [])

    idx = int(trade.get("fills") or 0)
    # A level can only fill if it has an allocation: configs with more steps
    # than add weights (or weights summing to 0) must not raise IndexError.
    if idx < min(len(levels), len(adds)):
        level = levels[idx]
        crossed = float(mark) <= level if side == "LONG" else float(mark) >= level
        if crossed:
            add_notional = float(adds[idx])
            old_qty = float(trade["notional"]) / max(float(trade["avg_entry"]), 1e-12)
            add_qty = add_notional / max(level, 1e-12)
            trade["notional"] = float(trade["notional"]) + add_notional
            trade["avg_entry"] = float(trade["notional"]) / max(old_qty + add_qty, 1e-12)
            trade["fills"] = idx + 1
            events.append(
                {
                    "type": "paper_dca_fill",
                    "utc": now,
                    "paper_key": trade["paper_key"],
                    "symbol": trade["symbol"],
                    "level": level,
                    "add_notional": add_notional,
                }
            )

    # NOTE(review): avg_entry already includes entry slippage (see
    # maybe_open_trade), so subtracting 2 * slippage here is more pessimistic
    # than close_trade's PnL. Left as-is; confirm whether that is intended.
    mtm = (ret_for(side, float(trade["avg_entry"]), float(mark)) - 2 * FEE - 2 * slippage_bp / 10000.0) * float(
        trade["notional"]
    )
    trade["min_mtm_usdt"] = min(float(trade.get("min_mtm_usdt") or 0.0), mtm)
    trade["min_mtm_pct_equity"] = min(
        float(trade.get("min_mtm_pct_equity") or 0.0),
        100.0 * mtm / max(initial_equity, 1e-12),
    )
    trade["last_seen_utc"] = now
    trade["last_mark_price"] = float(mark)
    trade["last_snapshot"] = snap

    if str(trade.get("exit_policy")) == "v21_tp_then_source_close" and float(trade.get("tp_pct") or 0.0) > 0:
        tp_pct = float(trade["tp_pct"])
        if side == "LONG":
            tp_price = float(trade["avg_entry"]) * (1.0 + tp_pct / 100.0)
            hit = float(mark) >= tp_price
        else:
            tp_price = float(trade["avg_entry"]) * (1.0 - tp_pct / 100.0)
            hit = float(mark) <= tp_price
        if hit:
            closed = close_trade(trade, float(mark), snap, slippage_bp, "v21_full_sell_tp", now)
            return events, closed
    return events, None


def close_trade(
    trade: Dict[str, Any],
    mark: float,
    snap: Dict[str, Any],
    slippage_bp: float,
    reason: str,
    now: str,
) -> Dict[str, Any]:
    """Return a closed copy of *trade* priced at *mark* minus exit slippage.

    PnL is the return from ``avg_entry`` to the slipped exit price, less
    ``2 * FEE``, times the trade notional. *trade* itself is not modified.
    """
    exit_px = exec_price(mark, trade["side"], "exit", slippage_bp)
    pnl = (ret_for(trade["side"], float(trade["avg_entry"]), exit_px) - 2 * FEE) * float(trade["notional"])
    closed = dict(trade)
    closed.update(
        {
            "status": "CLOSED",
            "closed_utc": now,
            "exit_mark_price": mark,
            "exit_exec_price": exit_px,
            "exit_snapshot": snap,
            "exit_reason": reason,
            "paper_pnl_usdt": pnl,
            "paper_return_pct": 100.0 * pnl / max(float(trade["notional"]), 1e-12),
        }
    )
    return closed


def summarize(state: Dict[str, Any]) -> Dict[str, Any]:
    """Aggregate counts, realised PnL and profit factor from *state*.

    ``pf`` is 999999.0 when there are profits and no losses, and 0.0 when
    there are neither.
    """
    closed = state.get("closed_trades") or []
    open_pos = state.get("open_positions") or {}
    pnls = [float(t.get("paper_pnl_usdt") or 0.0) for t in closed]
    gross_profit = sum(p for p in pnls if p >= 0)
    gross_loss = sum(p for p in pnls if p < 0)
    if gross_loss == 0 and gross_profit > 0:
        pf = 999999.0
    else:
        pf = gross_profit / abs(gross_loss) if gross_loss else 0.0
    return {
        "open_paper_positions": len(open_pos),
        "closed_paper_trades": len(closed),
        "realized_pnl_usdt": sum(pnls),
        "gross_profit": gross_profit,
        "gross_loss": gross_loss,
        "pf": pf,
        "events_kept": len(state.get("events") or []),
    }


def poll_once(args: argparse.Namespace) -> Dict[str, Any]:
    """Run one poll cycle and return the ``last_poll`` status dict.

    Loads state, fetches the source's open positions, updates or opens paper
    trades, closes trades whose source position has disappeared, and (unless
    ``args.dry_run``) persists state, status, events and telemetry.

    On the first run (no ``seeded_at_utc`` in state) the positions already
    open are only recorded as seen; no paper trade is entered for them.
    """
    now_dt = utc_now()
    now = iso(now_dt)
    state = read_json(
        Path(args.state_path),
        {"open_positions": {}, "closed_trades": [], "events": [], "seen_source_keys": []},
    )
    if not isinstance(state, dict):
        # A state file holding valid JSON of the wrong shape is treated like
        # an unreadable one.
        state = {}
    state.setdefault("open_positions", {})
    state.setdefault("closed_trades", [])
    state.setdefault("events", [])
    if not isinstance(state.get("seen_source_keys"), list):
        state["seen_source_keys"] = []
    configs = read_configs(Path(args.configs_csv))
    events: List[Dict[str, Any]] = []
    telemetry: List[Dict[str, Any]] = []

    # The session is closed on exit so --loop does not leak a connection
    # pool on every poll.
    with requests.Session() as session:
        source_positions = fetch_open_positions(session, args.portfolio_id, args.timeout_sec)
        current_keys = {f"{p['source_id']}:{p['symbol']}:{p['side']}" for p in source_positions}

        if not state.get("seeded_at_utc"):
            state["seeded_at_utc"] = now
            state["seen_source_keys"] = sorted(current_keys)
            events.append(
                {
                    "type": "seed_existing_source_positions",
                    "utc": now,
                    "count": len(current_keys),
                    "policy": "do_not_back_enter_existing_open_positions",
                }
            )
        # Built once and kept in sync with the persisted list, instead of
        # rebuilding a set for every position.
        seen_keys = set(state["seen_source_keys"])

        for pos in source_positions:
            key = f"{pos['source_id']}:{pos['symbol']}:{pos['side']}"
            group = f"{pos['symbol']} {pos['side']}"
            snap = market_snapshot(session, pos["symbol"], args.timeout_sec)
            cfg = configs.get(group) or fallback_config(group)
            notional = float(cfg.get("target_notional") or args.initial_equity)
            model = slippage_model(pos["symbol"], notional, snap.get("orderbook") or {}, args.slippage_bp)
            telemetry.append(
                {
                    "utc": now,
                    "type": "source_open_snapshot",
                    "source_key": key,
                    "group": group,
                    "has_tuned_config": group in configs,
                    "fallback_config": bool(cfg.get("fallback_config")),
                    "snapshot": snap,
                    "slippage_model": model,
                    "source_position": pos,
                }
            )
            if key in state["open_positions"]:
                more_events, closed = update_trade(
                    state["open_positions"][key], snap, args.slippage_bp, args.initial_equity, now
                )
                events.extend(more_events)
                if closed:
                    state["closed_trades"].append(closed)
                    del state["open_positions"][key]
                    events.append(
                        {
                            "type": "paper_exit",
                            "utc": now,
                            "paper_key": key,
                            "symbol": pos["symbol"],
                            "reason": closed["exit_reason"],
                            "pnl": closed["paper_pnl_usdt"],
                        }
                    )
                continue
            if key in seen_keys:
                continue
            # Marked as seen before the entry attempt, so a signal skipped
            # for a missing mark is not retried on later polls.
            seen_keys.add(key)
            state["seen_source_keys"].append(key)
            trade = maybe_open_trade(pos, cfg, snap, args.slippage_bp, now)
            if not trade:
                events.append(
                    {"type": "skip_signal", "utc": now, "source_key": key, "group": group, "reason": "missing_mark"}
                )
                continue
            state["open_positions"][key] = trade
            events.append(
                {
                    "type": "paper_entry",
                    "utc": now,
                    "paper_key": key,
                    "symbol": pos["symbol"],
                    "side": pos["side"],
                    "entry_mark": trade["entry_mark_price"],
                    "entry_exec": trade["entry_exec_price"],
                    "candidate": trade["candidate"],
                }
            )

        # Close paper trades whose source position is gone; without a mark
        # the trade stays open and is retried on the next poll.
        for key, trade in list(state["open_positions"].items()):
            if key in current_keys:
                continue
            snap = market_snapshot(session, trade["symbol"], args.timeout_sec)
            mark = snap.get("mark")
            if not mark:
                continue
            closed = close_trade(trade, float(mark), snap, args.slippage_bp, "source_position_no_longer_open", now)
            state["closed_trades"].append(closed)
            del state["open_positions"][key]
            events.append(
                {
                    "type": "paper_exit",
                    "utc": now,
                    "paper_key": key,
                    "symbol": trade["symbol"],
                    "reason": closed["exit_reason"],
                    "pnl": closed["paper_pnl_usdt"],
                }
            )

    state["events"].extend(events)
    # A plain [-max_events:] slice keeps everything when max_events is 0
    # ([-0:] == [0:]), so the non-positive case is handled explicitly.
    state["events"] = state["events"][-args.max_events :] if args.max_events > 0 else []
    state["last_poll"] = {
        "utc": now,
        "portfolio_id": args.portfolio_id,
        "source_open_positions": len(source_positions),
        "events": events,
        "summary": summarize(state),
        "safety": {
            "paper_only": True,
            "no_live_orders": True,
            "no_secrets_read": True,
            "entry_price_source": "current_public_binance_futures_mark",
            "source_avgCost_used_for_entry": False,
        },
    }
    if not args.dry_run:
        write_json(Path(args.state_path), state)
        write_json(Path(args.status_json), state["last_poll"])
        append_jsonl(Path(args.events_jsonl), events)
        append_jsonl(Path(args.telemetry_jsonl), telemetry)
    return state["last_poll"]


def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the observer."""
    ap = argparse.ArgumentParser()
    ap.add_argument("--portfolio-id", default="4512404768792222208")
    ap.add_argument("--configs-csv", required=True)
    ap.add_argument("--state-path", required=True)
    ap.add_argument("--status-json", required=True)
    ap.add_argument("--events-jsonl", required=True)
    ap.add_argument("--telemetry-jsonl", required=True)
    ap.add_argument("--initial-equity", type=float, default=500.0)
    ap.add_argument("--slippage-bp", type=float, default=DEFAULT_SLIPPAGE_BP)
    ap.add_argument("--timeout-sec", type=float, default=20.0)
    ap.add_argument("--interval-sec", type=float, default=60.0)
    ap.add_argument("--max-events", type=int, default=2000)
    ap.add_argument("--dry-run", action="store_true")
    ap.add_argument("--once", action="store_true")
    ap.add_argument("--loop", action="store_true")
    return ap


def main() -> None:
    """Poll once, or repeatedly every ``--interval-sec`` when ``--loop`` is set.

    ``--once`` is accepted but not consulted: a single poll is already the
    behaviour whenever ``--loop`` is absent.
    """
    args = build_parser().parse_args()
    while True:
        print(json.dumps(poll_once(args), ensure_ascii=False, indent=2), flush=True)
        if not args.loop:
            break
        time.sleep(args.interval_sec)


if __name__ == "__main__":
    main()