"""
Batch Gate-1 sender for AIMA FAR v9 pilot (users who downloaded but never registered a shop).

Reads:  data/processed/aima_far_v9_contacts.csv
Sends:  Telegram messages via aima_support_session
Logs:   data/processed/aima_far_v9_telegram_log.csv
Ledger: data/processed/aima_telegram_message_ledger.csv

Skips contacts already marked gate1_message_sent=yes in the log.
Honors worker-do-not-send column in the contacts CSV.

Run:
    python src/aima_batch_send_v9.py [--dry-run]
"""

import argparse
import asyncio
import csv
import os
import random
import sys
from pathlib import Path

# Console output includes non-ASCII text; force UTF-8 where the stream allows
# it so printing never fails on a legacy console code page.
if hasattr(sys.stdout, "reconfigure"):
    sys.stdout.reconfigure(encoding="utf-8", errors="replace")

from aima_telegram_log import DEFAULT_LEDGER_CSV, log_sent
from telethon import TelegramClient
from telethon.errors import RPCError
from telethon.tl.functions.contacts import GetContactsRequest, ImportContactsRequest
from telethon.tl.types import InputPhoneContact

BASE_DIR = Path(__file__).parent.parent
CONTACTS_CSV = BASE_DIR / "data" / "processed" / "aima_far_v9_contacts.csv"
LOG_CSV = BASE_DIR / "data" / "processed" / "aima_far_v9_telegram_log.csv"
LEDGER_CSV = BASE_DIR / DEFAULT_LEDGER_CSV
DEFAULT_SESSION = "data/processed/telegram/aima_support_session"
DEFAULT_ENV_FILES = [
    BASE_DIR / ".env",
    Path(r"C:\python_scripts\top_1_telegram_signals\.env"),
]

# Bounds (seconds) of the random pause inserted between consecutive sends.
DELAY_RANGE = (45, 90)
# Case-folded values of the "worker-do-not-send" column that mean "skip".
DO_NOT_SEND_VALUES = {"1", "true", "yes", "y", "так", "да", "do_not_send", "do-not-send"}


def load_env() -> None:
    """Populate ``os.environ`` from the files listed in ``DEFAULT_ENV_FILES``.

    Missing files are skipped. Only ``KEY=VALUE`` lines are read; blank lines,
    ``#`` comments and lines without ``=`` are ignored. Surrounding quotes are
    stripped from values. ``setdefault`` is used, so variables already present
    in the environment (or set by an earlier file) are never overwritten.
    """
    for path in DEFAULT_ENV_FILES:
        if not path.exists():
            continue
        for raw in path.read_text(encoding="utf-8", errors="ignore").splitlines():
            line = raw.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, value = line.split("=", 1)
            os.environ.setdefault(key.strip(), value.strip().strip('"').strip("'"))


def normalize_phone(value: str) -> str:
    """Return *value* reduced to digits and ``+`` signs (``None`` -> ``""``)."""
    return "".join(ch for ch in str(value or "") if ch.isdigit() or ch == "+")


def is_do_not_send(row: dict) -> bool:
    """Return True when the row's ``worker-do-not-send`` cell is a skip marker."""
    flag = str(row.get("worker-do-not-send", "") or "")
    return flag.strip().casefold() in DO_NOT_SEND_VALUES


def already_sent_ids() -> set:
    """Return the ``pilot_id`` values the log marks ``gate1_message_sent=yes``.

    Returns an empty set when the log file does not exist yet.
    """
    if not LOG_CSV.exists():
        return set()
    with LOG_CSV.open(encoding="utf-8-sig") as f:
        return {
            str(r["pilot_id"])
            for r in csv.DictReader(f)
            # DictReader fills missing trailing fields with None, so guard
            # before calling str methods on the cell.
            if (r.get("gate1_message_sent") or "").strip().lower() == "yes"
        }


async def resolve_recipient(
    client: TelegramClient, phone: str, first_name: str, last_name: str
):
    """Return the Telegram user for *phone*, importing the contact if needed.

    The account's existing contacts are searched first (phones compared
    without a leading ``+``). If none matches, the phone is imported as a new
    contact, named *first_name* or, failing that, the last four characters of
    *phone*.

    Raises:
        RuntimeError: the import returned no user for this phone.
    """
    wanted = normalize_phone(phone).lstrip("+")
    result = await client(GetContactsRequest(hash=0))
    for user in result.users:
        user_phone = normalize_phone(getattr(user, "phone", "") or "").lstrip("+")
        if user_phone and user_phone == wanted:
            return user

    contact = InputPhoneContact(
        client_id=0,
        phone=phone,
        first_name=first_name or phone[-4:],
        last_name=last_name or "",
    )
    imported = await client(ImportContactsRequest([contact]))
    if not imported.users:
        raise RuntimeError(f"phone {phone} has no Telegram account")
    return imported.users[0]


async def run_batch(dry_run: bool, session: str) -> None:
    """Send the Gate-1 message to every eligible contact in ``CONTACTS_CSV``.

    A contact is eligible when it is not flagged do-not-send and its
    ``pilot_id`` is not already recorded as sent in ``LOG_CSV``.

    Args:
        dry_run: When true, print a preview of each message instead of
            resolving the recipient and sending. The Telegram session is
            still connected and checked for authorization.
        session: Telethon session path; relative paths are resolved against
            ``BASE_DIR``.

    Raises:
        SystemExit: API credentials are missing or the session is not
            authorized.
    """
    load_env()
    api_id = os.environ.get("TG_API_ID")
    api_hash = os.environ.get("TG_API_HASH")
    if not api_id or not api_hash:
        raise SystemExit("TG_API_ID/TG_API_HASH missing from .env")

    session_path = Path(session)
    if not session_path.is_absolute():
        session_path = BASE_DIR / session_path

    with CONTACTS_CSV.open(encoding="utf-8-sig") as f:
        queue = list(csv.DictReader(f))

    sent_ids = already_sent_ids()
    to_send = [
        r for r in queue
        if not is_do_not_send(r) and str(r["pilot_id"]) not in sent_ids
    ]
    skipped_dns = [r for r in queue if is_do_not_send(r)]

    print(
        f"[init] queue={len(queue)} worker-do-not-send={len(skipped_dns)} "
        f"already_sent={len(sent_ids)} to_send={len(to_send)}"
    )
    if skipped_dns:
        print(f"[skip] worker-do-not-send: {[r['pilot_id'] for r in skipped_dns]}")

    if not to_send:
        print("[done] nothing to send.")
        return

    sent_ok = 0
    no_account = 0
    errors = 0
    total = len(to_send)

    client = TelegramClient(str(session_path), int(api_id), api_hash)
    try:
        await client.connect()
        if not await client.is_user_authorized():
            raise SystemExit("Session not authorized — run aima_telegram_login.py first.")

        me = await client.get_me()
        print(f"[auth] {getattr(me, 'first_name', '')} (@{getattr(me, 'username', 'no_username')})")

        for position, row in enumerate(to_send, start=1):
            pilot_id = row["pilot_id"]
            phone = normalize_phone(row["phone"])
            message = row["gate1_text"]
            first_name = row.get("first_name", "")
            last_name = row.get("last_name", "")
            variant = row.get("variant", "")

            print(f"\n[{position}/{total}] pilot_id={pilot_id} ...{phone[-4:]} variant={variant}")

            if dry_run:
                print(f"  [dry-run] {message[:80]}...")
                continue

            try:
                entity = await resolve_recipient(client, phone, first_name, last_name)
            except RuntimeError as exc:
                # resolve_recipient raises RuntimeError when the phone has no
                # Telegram account; counted separately from real failures.
                print(f"  [skip] {exc}")
                no_account += 1
                continue
            except Exception as exc:
                # Per-contact boundary: one bad contact must not stop the batch.
                print(f"  [error] resolve: {exc}")
                errors += 1
                continue

            try:
                await client.send_message(entity, message)
            except RPCError as exc:
                print(f"  [error] RPC {type(exc).__name__}: {exc}")
                errors += 1
            else:
                recipient_id = str(getattr(entity, "id", "unknown"))
                # Record the send; the log is what already_sent_ids() reads on
                # the next run to avoid messaging the same contact twice.
                log_sent(
                    LOG_CSV,
                    LEDGER_CSV,
                    pilot_id=pilot_id,
                    phone=phone,
                    recipient_id=recipient_id,
                    message=message,
                    notes="batch_send_v9",
                )
                print(f"  [sent] id={recipient_id}")
                sent_ok += 1

            # Randomised pause between send attempts; not needed after the last.
            if position < total:
                delay = random.uniform(*DELAY_RANGE)
                print(f"  [pause] {delay:.0f}s...")
                await asyncio.sleep(delay)
    finally:
        # Always release the session, including on the unauthorized exit and
        # on unexpected errors mid-batch.
        await client.disconnect()

    print(f"\n[done] sent={sent_ok} no_account={no_account} errors={errors}")
    print(f"[log]  {LOG_CSV}")


def main() -> None:
    """Parse command-line arguments and run the batch."""
    parser = argparse.ArgumentParser(description="Batch Gate-1 sender for AIMA FAR v9 pilot.")
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--session", default=DEFAULT_SESSION)
    args = parser.parse_args()
    # asyncio.run creates and closes the loop; the TelegramClient is built
    # inside run_batch, so it is bound to this loop.
    asyncio.run(run_batch(dry_run=args.dry_run, session=args.session))


if __name__ == "__main__":
    main()