#!/usr/bin/env python3
"""Callme 30-minute research/paper-live coordinator.

This is a deterministic server-side coordinator, not a real-money trader.
It wakes, audits current research outputs, writes the next safe pipeline
actions, may launch one guarded heavy tune, and may prepare paper-live only
after gates. Real-money live orders remain hard-blocked.
"""

import json
import os
import re
import subprocess
import time
from pathlib import Path

ROOT = Path("/var/www/vps2.happyuser.info/top/callme_overnight_20260601")
OUT = ROOT / "output"
COORD = OUT / "coordinator"
STOP = ROOT / "STOP_COORDINATOR"
SUMMARY = OUT / "summary.json"
RUNNER = ROOT / "callme_dca_v21_overnight_runner.py"
PUBLIC_STATUS = Path("/var/www/vps2.happyuser.info/veronica_pipeline_graph/callme_agent_status.json")
HEAVY_SCRIPT = ROOT / "callme_heavy_tune_once.sh"
PAPER_RUNNER = ROOT / "callme_paper_live_runner.py"
PAPER_STATUS = OUT / "paper_live" / "status.json"
HEAVY_LOCK = COORD / "heavy_job.lock"
HEAVY_LAUNCH_MARKER = COORD / "HEAVY_LAUNCHED_BY_COORDINATOR.json"
HEAVY_LAST_RUN = COORD / "HEAVY_LAST_RUN.json"
HEAVY_CURRENT_RUN = COORD / "HEAVY_CURRENT_RUN.json"

# Minimum free resources required before a heavy tune may be spawned.
HEAVY_MIN_DISK_FREE_MB = 5500
HEAVY_MIN_MEM_AVAILABLE_MB = 500

# Keys whose values are reduced to a bare file name in the public status.
_PUBLIC_BASENAME_KEYS = frozenset({"script", "log", "run_dir", "retention_manifest"})

POLICY = {
    "paper_live_allowed_if_ready": True,
    "real_live_allowed": False,
    "heavy_jobs_allowed": True,
    "max_heavy_jobs_parallel": 1,
    "heavy_job_cpu_policy": "taskset CPU 0, nice 15, ionice best-effort/7; leave CPU 1 and memory headroom for current live",
    "heavy_job_retention": "after completion keep summary.json, selected_group_configs.csv, group_top_metrics.csv, REPORT.md, STATUS.json, RETENTION_MANIFEST.json only",
}

TARGETS = {
    "baseline_name": "HYPE yearly live-model research baseline",
    "hard_gates": {
        "coverage_pct": 100.0,
        "margin_call_count": 0,
        "no_live_orders": True,
        "no_secrets_read": True,
        "entry_uses_signal_avgCost": False,
    },
    "primary_goals": {
        "net_pct_min": 52.12610575421329,
        "max_mtm_dd_pct_min": -12.265614112284059,
        "min_trade_mtm_pct_equity_min": -13.037553836794384,
    },
    "stretch_goals": {
        "pf_min": 3.7159234919152704,
        "max_realized_dd_pct_min": -5.637815184175834,
    },
}


def read_json(path):
    """Return the parsed JSON content of *path*, or None if the file is absent.

    The file is read directly instead of being probed with ``exists()`` first,
    so a file that disappears between the probe and the read cannot raise.
    Invalid JSON still raises ``json.JSONDecodeError``.
    """
    try:
        text = path.read_text(encoding="utf-8")
    except (FileNotFoundError, NotADirectoryError):
        return None
    return json.loads(text)


def write_json(path, data):
    """Serialize *data* as indented UTF-8 JSON to *path*, creating parent dirs."""
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(data, indent=2, ensure_ascii=False) + "\n", encoding="utf-8")


def redact_public(value):
    """Return a copy of *value* with server paths removed for publication.

    Dicts and lists are walked recursively. Values stored under the keys in
    ``_PUBLIC_BASENAME_KEYS`` are reduced to their final path component. Any
    other string containing the ROOT path has every occurrence replaced by
    ``$CALLME_ROOT``; the path is matched anywhere in the string, not only as
    a prefix, so paths embedded in messages are redacted too.
    """
    root_text = str(ROOT)
    if isinstance(value, dict):
        redacted = {}
        for key, item in value.items():
            if key in _PUBLIC_BASENAME_KEYS:
                redacted[key] = Path(str(item)).name if item else item
            else:
                redacted[key] = redact_public(item)
        return redacted
    if isinstance(value, list):
        return [redact_public(item) for item in value]
    if isinstance(value, str) and root_text in value:
        return value.replace(root_text, "$CALLME_ROOT")
    return value


def append_log(line):
    """Append *line* (trailing whitespace stripped) to the coordinator wake log."""
    COORD.mkdir(parents=True, exist_ok=True)
    with (COORD / "WAKE_LOG.md").open("a", encoding="utf-8") as fp:
        fp.write(line.rstrip() + "\n")


def disk_free_mb(path):
    """Return the space available to unprivileged users at *path*, in MiB."""
    st = os.statvfs(str(path))
    return st.f_bavail * st.f_frsize / (1024 * 1024)


def read_meminfo_mb():
    """Return ``/proc/meminfo`` entries as a ``{name: MiB}`` dict.

    Best effort: an unreadable file yields an empty dict, and a line that
    cannot be parsed is skipped without discarding the remaining lines.
    """
    try:
        lines = Path("/proc/meminfo").read_text(encoding="utf-8").splitlines()
    except (OSError, ValueError):
        # ValueError covers UnicodeDecodeError on unexpected content.
        return {}
    out = {}
    for line in lines:
        try:
            key, rest = line.split(":", 1)
            value_kb = int(rest.strip().split()[0])
        except (ValueError, IndexError):
            continue
        out[key] = value_kb / 1024.0
    return out


def audit_entry_policy():
    """Audit the runner source for its entry-price policy and persist the result.

    Scans the RUNNER file text for an ``entry = ... avgCost`` assignment and
    for the next-bar-open entry statement, then writes MARK_PRICE_AUDIT.json
    and MARK_PRICE_EXECUTION_POLICY.md under COORD.

    Returns:
        The policy dict. ``status`` is ``"PASS"`` only when the next-open
        entry is present and no avgCost-based entry assignment is found. If
        the runner file cannot be read, the audit reports ``"FAIL"`` with
        ``runner_readable`` set to False instead of raising.
    """
    try:
        text = RUNNER.read_text(encoding="utf-8")
        runner_readable = True
    except OSError:
        # Fail closed: an unreadable runner cannot be shown to be compliant.
        text = ""
        runner_readable = False
    avg_cost_entry = bool(re.search(r"entry\s*=\s*.*avgCost", text))
    next_open_entry = 'entry = rows[start]["open"]' in text or "entry = rows[start]['open']" in text
    signal_avg_cost_mentions = len(re.findall(r"avgCost", text))
    policy = {
        "status": "PASS" if next_open_entry and not avg_cost_entry else "FAIL",
        "backtest_entry_source": "next_1m_open_after_signal_receipt",
        "uses_signal_avgCost_for_entry": avg_cost_entry,
        "runner_has_next_bar_open_entry": next_open_entry,
        "signal_avgCost_mentions_in_file": signal_avg_cost_mentions,
        "runner_readable": runner_readable,
        "live_requirement": (
            "For paper/live, source signal only authorizes symbol/side/entry time. "
            "The executable entry must be the current public exchange mark price or "
            "best available market snapshot at the moment our system receives the signal. "
            "Do not use Binance copy avgCost/avgClosePrice as executable entry/exit prices."
        ),
        "research_caveat": (
            "Historical exact mark-price-at-receipt is not available in this cached dataset. "
            "The current replay uses next 1m candle open after the signal timestamp as an executable proxy."
        ),
    }
    write_json(COORD / "MARK_PRICE_AUDIT.json", policy)
    (COORD / "MARK_PRICE_EXECUTION_POLICY.md").write_text(
        "\n".join(
            [
                "# Mark Price Execution Policy",
                "",
                "Status: `%s`." % policy["status"],
                "",
                "- Backtest executable proxy: `next_1m_open_after_signal_receipt`.",
                "- Live/paper requirement: use current public exchange mark price or immediate market snapshot when our system receives the signal.",
                "- Do not use source `avgCost` or `avgClosePrice` as executable fill prices.",
                "- Source copytrader owns symbol, side, and permission timing; V21/DCA owns sizing/exit policy; runner only executes.",
                "",
                "Caveat: historical exact mark price at signal receipt is unavailable in the cached public data, so `next_1m_open` is the replay proxy.",
            ]
        )
        + "\n",
        encoding="utf-8",
    )
    return policy


def _coerce(value, caster, default):
    """Return ``caster(value)``, or *default* when the value cannot be converted."""
    try:
        return caster(value)
    except (TypeError, ValueError, OverflowError):
        return default


def score_summary(summary, mark_policy):
    """Score the research summary against TARGETS.

    Args:
        summary: Parsed summary.json content, or None when it is missing.
        mark_policy: Result of ``audit_entry_policy()``.

    Returns:
        A dict with ``status`` and, unless blocked, the per-gate booleans in
        ``hard_gates``, ``primary_goals`` and ``stretch_goals`` plus the
        targets and the observed metrics. Metrics that are missing, null or
        not numeric fall back to the failing default, so a malformed summary
        fails its gate instead of raising.
    """
    if not summary:
        return {"status": "BLOCKED", "reason": "missing summary.json"}
    if not isinstance(summary, dict):
        return {"status": "BLOCKED", "reason": "malformed summary.json"}
    s = summary.get("all_signal_summary", {})
    if not isinstance(s, dict):
        # e.g. JSON null: treat as no observations so every gate fails closed.
        s = {}
    hard_targets = TARGETS["hard_gates"]
    primary_targets = TARGETS["primary_goals"]
    stretch_targets = TARGETS["stretch_goals"]
    hard = {
        "coverage_ok": _coerce(s.get("coverage_pct", 0.0), float, 0.0) >= hard_targets["coverage_pct"],
        "margin_ok": _coerce(s.get("margin_call_count", 999), int, 999) == hard_targets["margin_call_count"],
        "mark_entry_ok": mark_policy.get("status") == "PASS",
    }
    primary = {
        "net_ok": _coerce(s.get("net_pct", -999.0), float, -999.0) >= primary_targets["net_pct_min"],
        "max_mtm_ok": _coerce(s.get("max_mtm_dd_pct", -999.0), float, -999.0) >= primary_targets["max_mtm_dd_pct_min"],
        "min_trade_mtm_ok": _coerce(s.get("min_trade_mtm_pct_equity", -999.0), float, -999.0)
        >= primary_targets["min_trade_mtm_pct_equity_min"],
    }
    stretch = {
        "pf_ok": _coerce(s.get("pf", 0.0), float, 0.0) >= stretch_targets["pf_min"],
        "max_realized_dd_ok": _coerce(s.get("max_realized_dd_pct", -999.0), float, -999.0)
        >= stretch_targets["max_realized_dd_pct_min"],
    }
    status = "PASS_RESEARCH_CANDIDATE"
    if not all(hard.values()):
        status = "FAIL_HARD_GATES"
    elif not all(primary.values()):
        status = "PARTIAL_PRIMARY_GOALS"
    elif not all(stretch.values()):
        status = "PASS_PRIMARY_FAIL_STRETCH"
    return {
        "status": status,
        "hard_gates": hard,
        "primary_goals": primary,
        "stretch_goals": stretch,
        "targets": TARGETS,
        "observed": s,
    }


def heavy_lock_active():
    """Return True while the heavy-job lock file exists."""
    return HEAVY_LOCK.exists()


def heavy_should_launch(score, resources):
    """Decide whether a heavy tune may be spawned now.

    Returns:
        ``(should_launch, reason)``. The checks run in a fixed order and the
        first failing one supplies the reason.
    """
    if not POLICY["heavy_jobs_allowed"]:
        return False, "heavy_jobs_disabled"
    if score.get("status") != "PASS_PRIMARY_FAIL_STRETCH":
        return False, "heavy_refine_not_needed_for_current_score"
    if HEAVY_LAUNCH_MARKER.exists():
        return False, "heavy_already_launched_by_coordinator"
    if heavy_lock_active():
        return False, "heavy_job_already_running"
    if not HEAVY_SCRIPT.exists():
        return False, "heavy_script_missing"
    if resources.get("disk_free_mb", 0.0) < HEAVY_MIN_DISK_FREE_MB:
        return False, "insufficient_disk_headroom_for_heavy_job"
    mem = resources.get("mem_available_mb")
    # An unknown memory figure (None) does not block the launch.
    if mem is not None and mem < HEAVY_MIN_MEM_AVAILABLE_MB:
        return False, "insufficient_memory_headroom_for_heavy_job"
    return True, "stretch_goals_failed_pf_or_realized_dd"


def launch_heavy_if_needed(score, resources, now):
    """Spawn the heavy tune script when allowed and report the decision.

    Args:
        score: Result of ``score_summary()``.
        resources: Dict with ``disk_free_mb`` and ``mem_available_mb``.
        now: Timestamp string recorded in the launch marker.

    Returns:
        A state dict whose ``launch_decision`` is ``"skip"``, ``"launched"``
        or ``"failed"``. On a spawn failure no launch marker is written, so a
        later wake can retry.
    """
    should_launch, reason = heavy_should_launch(score, resources)
    state = {
        "allowed": POLICY["heavy_jobs_allowed"],
        "max_parallel": POLICY["max_heavy_jobs_parallel"],
        "lock_active": heavy_lock_active(),
        "launch_decision": "skip",
        "reason": reason,
        "last_run": read_json(HEAVY_LAST_RUN),
        "current_run": read_json(HEAVY_CURRENT_RUN),
    }
    if not should_launch:
        return state

    log_path = COORD / "heavy_tune_spawn.log"
    try:
        # The child inherits its own copy of the log descriptor, so closing
        # ours when the ``with`` block exits does not affect its output.
        with log_path.open("a", encoding="utf-8") as log:
            subprocess.Popen(
                [str(HEAVY_SCRIPT)],
                stdout=log,
                stderr=subprocess.STDOUT,
                close_fds=True,
            )
    except OSError as exc:
        # Report the failure in the status instead of aborting the wake.
        state.update({
            "launch_decision": "failed",
            "reason": "heavy_spawn_failed",
            "error": str(exc),
        })
        return state

    marker = {
        "launched_at": now,
        "script": str(HEAVY_SCRIPT),
        "reason": reason,
        "log": str(log_path),
        "retention_policy": POLICY["heavy_job_retention"],
    }
    write_json(HEAVY_LAUNCH_MARKER, marker)
    state.update({
        "lock_active": True,
        "launch_decision": "launched",
        "reason": reason,
        "marker": marker,
    })
    return state


def _all_gates_pass(gates):
    """Return True only for a non-empty gate dict whose values are all truthy.

    ``all()`` of an empty iterable is True, so an absent gate section must be
    rejected explicitly rather than counted as passing.
    """
    return isinstance(gates, dict) and bool(gates) and all(gates.values())


def paper_live_readiness(score, mark_policy):
    """Report whether paper-live staging may be prepared.

    Paper-live is ready only when the policy allows it, the hard and primary
    gates were actually evaluated and all pass, the mark-price audit passes,
    and the paper runner file exists. A score without gate sections (such as
    a ``BLOCKED`` score) never passes.
    """
    hard_ok = _all_gates_pass(score.get("hard_gates"))
    primary_ok = _all_gates_pass(score.get("primary_goals"))
    stretch_ok = _all_gates_pass(score.get("stretch_goals"))
    research_gates_pass = bool(hard_ok and primary_ok and mark_policy.get("status") == "PASS")
    runner_verified = PAPER_RUNNER.exists()
    ready = bool(POLICY["paper_live_allowed_if_ready"] and research_gates_pass and runner_verified)
    if ready:
        reason = "research_gates_pass_and_paper_runner_verified"
    elif research_gates_pass:
        reason = "paper_runner_not_verified"
    else:
        reason = "research_gates_not_sufficient"
    return {
        "paper_live_allowed_if_ready": POLICY["paper_live_allowed_if_ready"],
        "paper_live_research_gates_pass": research_gates_pass,
        "paper_live_runner_verified": runner_verified,
        "paper_live_ready": ready,
        "paper_live_status": read_json(PAPER_STATUS),
        "real_live_allowed": POLICY["real_live_allowed"],
        "real_live_hard_blocked": not POLICY["real_live_allowed"],
        "reason": reason,
        "stretch_goals_pass": stretch_ok,
        "paper_live_scope": (
            "paper-only telemetry/staging may be prepared; it must not send real exchange orders "
            "and must not read secrets unless a later human-approved paper connector requires it."
        ),
    }


def write_next_actions(score, paper, heavy):
    """Write NEXT_ACTIONS.md under COORD summarizing the current gated decisions."""
    actions = [
        "# Callme Next Actions",
        "",
        "Coordinator status: `%s`." % score.get("status"),
        "",
        "Safe next steps without human approval:",
        "",
        "1. Keep real-money live blocked.",
        "2. Paper-live staging is allowed only if the runner is paper-only and all hard/primary/mark gates pass.",
        "3. Run one guarded heavy tune when stretch goals fail; never more than one heavy job in parallel.",
        "4. After heavy tune cleanup, retain only parameter/result tables and metrics.",
        "5. Add OOS/time-split validation because some symbol-side groups have only 1-4 trades.",
        "6. Compare source-close-only vs V21 early full-sell on the same trades.",
        "7. Update public graph state after each completed verification artifact.",
        "",
        "Current gated decisions:",
        "",
        "- paper_live_ready: `%s`" % paper.get("paper_live_ready"),
        "- real_live_allowed: `%s`" % paper.get("real_live_allowed"),
        "- heavy_launch_decision: `%s` (`%s`)" % (heavy.get("launch_decision"), heavy.get("reason")),
        "",
        "Still blocked unless the human explicitly approves a new scope:",
        "",
        "- real exchange orders / real live deployment;",
        "- reading secrets/private sessions;",
        "- restarting existing live processes.",
    ]
    (COORD / "NEXT_ACTIONS.md").write_text("\n".join(actions) + "\n", encoding="utf-8")


def main():
    """Run one coordinator wake cycle and return the process exit code.

    Does nothing beyond logging when the STOP file is present. Otherwise it
    audits the entry policy, scores the summary, evaluates paper-live
    readiness, possibly launches one heavy tune, and writes the private and
    redacted public status files plus the next-actions note.
    """
    COORD.mkdir(parents=True, exist_ok=True)
    now = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
    if STOP.exists():
        append_log("- `%s` stop file present; coordinator skipped." % now)
        return 0

    summary = read_json(SUMMARY)
    mark_policy = audit_entry_policy()
    score = score_summary(summary, mark_policy)
    mem = read_meminfo_mb()
    resources = {
        "disk_free_mb": disk_free_mb(ROOT),
        "mem_available_mb": mem.get("MemAvailable"),
        "swap_free_mb": mem.get("SwapFree"),
    }
    paper = paper_live_readiness(score, mark_policy)
    heavy = launch_heavy_if_needed(score, resources, now)
    status = {
        "woke_at": now,
        "state": "awake_complete",
        "sleep_seconds": 1800,
        "tmux_window": "Callme",
        "safety": {
            "no_live_orders": True,
            "no_secrets_read": True,
            "no_private_sessions": True,
            "no_runtime_restarts": True,
            "paper_live_allowed_if_ready": POLICY["paper_live_allowed_if_ready"],
            "real_live_allowed": POLICY["real_live_allowed"],
        },
        "policy": POLICY,
        "resources": resources,
        "score": score,
        "mark_price_policy": mark_policy,
        "paper_live": paper,
        "heavy_job": heavy,
    }
    write_json(COORD / "WAKE_STATUS.json", status)
    write_json(COORD / "METRIC_TARGETS.json", TARGETS)
    # Only the redacted copy goes to the publicly served location.
    write_json(PUBLIC_STATUS, redact_public(status))
    write_next_actions(score, paper, heavy)
    append_log("- `%s` wake complete: `%s`; disk_free_mb=%.1f; mem_available_mb=%s." % (
        now,
        score.get("status"),
        status["resources"]["disk_free_mb"],
        status["resources"]["mem_available_mb"],
    ))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())