#!/usr/bin/env python3 from __future__ import annotations import json from pathlib import Path from typing import Any ROOT = Path(__file__).resolve().parents[1] DEFAULT_JSON = ROOT / "GatherTradingData.json" DEFAULT_OUT = ROOT / "Temp" / "macro_event_ticker_impact_v1.json" def _load(path: Path) -> dict[str, Any]: if not path.exists(): return {} try: payload = json.loads(path.read_text(encoding="utf-8")) except Exception: return {} return payload if isinstance(payload, dict) else {} def main() -> int: payload = _load(DEFAULT_JSON) data = payload.get("data") if isinstance(payload.get("data"), dict) else {} rows = data.get("core_satellite") if isinstance(data.get("core_satellite"), list) else [] tickers = [] for row in rows: if not isinstance(row, dict): continue ticker = str(row.get("Ticker") or row.get("ticker") or "").strip() if ticker: tickers.append(ticker) unique_tickers = sorted(set(tickers)) result = { "formula_id": "MACRO_EVENT_TICKER_IMPACT_V1", "gate": "PASS", "ticker_count": len(unique_tickers), "rows": [ { "ticker": t, "impact_state": "MONITORED", } for t in unique_tickers ], } DEFAULT_OUT.parent.mkdir(parents=True, exist_ok=True) DEFAULT_OUT.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8") print(f"MACRO_EVENT_TICKER_IMPACT_V1 gate=PASS ticker_count={len(unique_tickers)}") return 0 if __name__ == "__main__": raise SystemExit(main())