from __future__ import annotations

# Build the EXECUTION_METHOD_LADDER_V1 contract.
#
# The script reads a main JSON payload plus three auxiliary JSON artifacts
# (sell timing lock, sell waterfall engine, smart cash recovery), derives an
# overall gate from them and from a fixed ladder of execution methods, then
# writes the resulting contract to disk and echoes it to stdout.

import argparse
import json
from hashlib import sha256
from pathlib import Path
from typing import Any

ROOT = Path(__file__).resolve().parents[1]
DEFAULT_JSON = ROOT / "GatherTradingData.json"
DEFAULT_OUT = ROOT / "Temp" / "execution_method_ladder_v1.json"
DEFAULT_TIMING = ROOT / "Temp" / "sell_execution_timing_lock_v2.json"
DEFAULT_WATERFALL = ROOT / "Temp" / "sell_waterfall_engine_v2.json"
DEFAULT_SCR = ROOT / "Temp" / "smart_cash_recovery_v7.json"


def _load(path: Path) -> dict[str, Any]:
    """Read *path* as UTF-8 JSON and return it if it is a JSON object.

    Loading is best-effort: a missing or unreadable file, undecodable bytes,
    malformed JSON, or a top-level value that is not an object all yield an
    empty dict instead of raising.
    """
    try:
        obj = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError, RecursionError):
        # OSError covers missing/unreadable files; ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError; RecursionError covers
        # pathologically nested documents.
        return {}
    return obj if isinstance(obj, dict) else {}


def _hash_text(text: str) -> str:
    """Return the first 16 hex characters of the SHA-256 of *text* (UTF-8)."""
    return sha256(text.encode("utf-8")).hexdigest()[:16]


def _rows(value: Any) -> list[dict[str, Any]]:
    """Return the dict elements of *value* if it is a list, else an empty list."""
    if isinstance(value, list):
        return [x for x in value if isinstance(x, dict)]
    return []


def _extract_harness_root(payload: dict[str, Any]) -> dict[str, Any]:
    """Pick the harness context dict out of *payload*.

    Two locations are consulted: ``payload["hApex"]`` and
    ``payload["data"]["_harness_context"]``. When both are dicts they are
    merged with ``hApex`` keys taking precedence; when only one is a dict it
    is returned as-is; when neither is, *payload* itself is returned.
    """
    h_apex = payload.get("hApex")
    data = payload.get("data")
    data_apex = data.get("_harness_context") if isinstance(data, dict) else None

    if isinstance(h_apex, dict) and isinstance(data_apex, dict):
        return {**data_apex, **h_apex}
    if isinstance(h_apex, dict):
        return h_apex
    if isinstance(data_apex, dict):
        return data_apex
    return payload


def _method_row(
    method: str,
    order_type: str,
    split_count: int,
    trigger_rule: str,
    emergency_full_sell: bool,
) -> dict[str, Any]:
    """Build one ladder entry.

    ``market_order_default`` is derived: it is True only when *order_type*,
    upper-cased, equals ``"MARKET"``.
    """
    return {
        "method": method,
        "order_type": order_type,
        "split_count": split_count,
        "trigger_rule": trigger_rule,
        "emergency_full_sell": emergency_full_sell,
        "market_order_default": order_type.upper() == "MARKET",
    }


def _resolve_path(path_str: str) -> Path:
    """Return *path_str* as a Path, anchoring relative paths at ``ROOT``."""
    path = Path(path_str)
    return path if path.is_absolute() else ROOT / path


def _build_methods() -> list[dict[str, Any]]:
    """Return the fixed four-entry execution method ladder."""
    return [
        _method_row(
            "NORMAL_LIQUIDITY",
            "LIMIT_NEAR_BID_OR_MID",
            3,
            "avg_trade_value_20d_m >= 50 and not emergency_full_sell",
            False,
        ),
        _method_row(
            "HIGH_LIQUIDITY_BREACH",
            "TWAP_5_SPLIT",
            5,
            "avg_trade_value_20d_m >= 500 and stop_breach_gate == BREACH",
            False,
        ),
        _method_row(
            "OVERSOLD_REBOUND",
            "K2_50_50",
            2,
            "execution_style == OVERSOLD_REBOUND_SELL and rebound_trigger_price defined",
            False,
        ),
        _method_row(
            "EMERGENCY",
            "EXIT_100",
            1,
            "emergency_full_sell == true",
            True,
        ),
    ]


def _evaluate_gate(
    sell_timing_verdict: str,
    waterfall_gate: str,
    scr_gate: str,
    market_order_default_count: int,
    emergency_full_sell_without_flag_count: int,
) -> tuple[str, list[str]]:
    """Derive the overall gate and the ordered list of reasons behind it.

    A missing sell-timing verdict downgrades the gate to ``WARN``. Any of the
    four hard checks (waterfall gate, smart-cash-recovery gate, market-order
    default count, emergency flag count) forces ``FAIL``, which overrides
    ``WARN``. Reasons are appended in the order the checks are evaluated.
    """
    reasons: list[str] = []
    failed = False

    timing_missing = sell_timing_verdict == "DATA_MISSING"
    if timing_missing:
        reasons.append("SELL_TIMING_DATA_MISSING")
    if waterfall_gate not in {"PASS", "WARN"}:
        failed = True
        reasons.append(f"WATERFALL_GATE={waterfall_gate}")
    if scr_gate not in {"PASS", "WARN"}:
        failed = True
        reasons.append(f"SMART_CASH_RECOVERY_GATE={scr_gate}")
    if market_order_default_count != 0:
        failed = True
        reasons.append("MARKET_ORDER_DEFAULT_NOT_ZERO")
    if emergency_full_sell_without_flag_count != 0:
        failed = True
        reasons.append("EMERGENCY_FULL_SELL_FLAG_MISSING")

    if failed:
        return "FAIL", reasons
    if timing_missing:
        return "WARN", reasons
    return "PASS", reasons


def main(argv: list[str] | None = None) -> int:
    """Build the execution method ladder contract and write it out.

    Args:
        argv: Command-line arguments excluding the program name. ``None``
            (the default) makes argparse read ``sys.argv[1:]``.

    Returns:
        Always ``0``; the gate outcome is reported in the emitted JSON, not
        through the exit status.
    """
    ap = argparse.ArgumentParser(description="Build execution method ladder contract.")
    ap.add_argument("--json", default=str(DEFAULT_JSON))
    ap.add_argument("--out", default=str(DEFAULT_OUT))
    ap.add_argument("--timing", default=str(DEFAULT_TIMING))
    ap.add_argument("--waterfall", default=str(DEFAULT_WATERFALL))
    ap.add_argument("--scr", default=str(DEFAULT_SCR))
    args = ap.parse_args(argv)

    json_path = _resolve_path(args.json)
    timing_path = _resolve_path(args.timing)
    waterfall_path = _resolve_path(args.waterfall)
    scr_path = _resolve_path(args.scr)
    out_path = _resolve_path(args.out)

    payload = _load(json_path)
    hctx = _extract_harness_root(payload)
    timing = _load(timing_path)
    waterfall = _load(waterfall_path)
    scr = _load(scr_path)

    # The timing artifact wins over the harness context; falsy values fall
    # through to the next source and finally to the DATA_MISSING sentinel.
    sell_timing_verdict = str(
        timing.get("sell_timing_verdict")
        or hctx.get("sell_timing_verdict")
        or "DATA_MISSING"
    )
    waterfall_gate = str(waterfall.get("gate") or "MISSING")
    scr_gate = str(scr.get("status") or "MISSING")

    methods = _build_methods()
    market_order_default_count = sum(
        1 for row in methods if row["market_order_default"]
    )
    emergency_full_sell_without_flag_count = sum(
        1
        for row in methods
        if row["method"] == "EMERGENCY" and row["emergency_full_sell"] is not True
    )

    gate, reasons = _evaluate_gate(
        sell_timing_verdict,
        waterfall_gate,
        scr_gate,
        market_order_default_count,
        emergency_full_sell_without_flag_count,
    )

    result = {
        "formula_id": "EXECUTION_METHOD_LADDER_V1",
        "gate": gate,
        "sell_timing_verdict": sell_timing_verdict,
        "waterfall_gate": waterfall_gate,
        "smart_cash_recovery_gate": scr_gate,
        "market_order_default_count": market_order_default_count,
        "emergency_full_sell_without_flag_count": emergency_full_sell_without_flag_count,
        "methods": methods,
        "source": {
            "json_path": str(json_path),
            "sell_execution_timing_lock_v2": str(timing_path),
            "sell_waterfall_engine_v2": str(waterfall_path),
            "smart_cash_recovery_v7": str(scr_path),
            "generated_by_llm": False,
        },
        "input_hash": _hash_text(json.dumps(payload, ensure_ascii=False, sort_keys=True)),
        "validation": {
            "all_methods_defined": len(methods) == 4,
            "no_market_order_default": market_order_default_count == 0,
            "emergency_flag_locked": emergency_full_sell_without_flag_count == 0,
            "reasons": reasons,
        },
    }

    # Serialise once and reuse the text for both the file and stdout.
    rendered = json.dumps(result, ensure_ascii=False, indent=2)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(rendered, encoding="utf-8")
    print(rendered)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())