from __future__ import annotations import re from pathlib import Path from typing import Any import yaml ROOT = Path(__file__).resolve().parents[1] STOP_YAML = ROOT / "spec" / "exit" / "stop_loss.yaml" REGISTRY_YAML = ROOT / "spec" / "13_formula_registry.yaml" GAS_FILE = ROOT / "gas_data_feed.gs" OUTPUT_JSON = ROOT / "Temp" / "relative_underperformance_alert_v1.json" def _collect_gas_text() -> str: """Concatenate text from all GAS files — root + adapter parts (P5-T02 split).""" root_names = ( "gas_apex_alpha_watch.gs", "gas_apex_runtime_core.gs", "gas_data_collect.gs", "gas_data_feed.gs", "gas_harness_rows.gs", "gas_lib.gs", "gas_report.gs", ) parts: list[str] = [] for name in root_names: p = ROOT / name if p.exists(): parts.append(p.read_text(encoding="utf-8", errors="ignore")) adapter_dir = ROOT / "src" / "gas_adapter_parts" if adapter_dir.exists(): for p in sorted(adapter_dir.glob("*.gs")): parts.append(p.read_text(encoding="utf-8", errors="ignore")) return "\n".join(parts) REQUIRED_FORMULAS = { "ABSOLUTE_RISK_STOP_V1", "RELATIVE_UNDERPERF_ALERT_V1", "STOP_ACTION_LADDER_V1", } def _load_yaml(path: Path) -> dict[str, Any]: if not path.exists(): return {} try: payload = yaml.safe_load(path.read_text(encoding="utf-8")) except Exception: return {} return payload if isinstance(payload, dict) else {} def _load_json(path: Path) -> dict[str, Any]: if not path.exists(): return {} try: import json payload = json.loads(path.read_text(encoding="utf-8")) except Exception: return {} return payload if isinstance(payload, dict) else {} def main() -> int: stop_yaml = _load_yaml(STOP_YAML) reg_yaml = _load_yaml(REGISTRY_YAML) gas_text = _collect_gas_text() output = _load_json(OUTPUT_JSON) registry = ((reg_yaml.get("formula_registry") or {}).get("formulas")) or {} errors: list[str] = [] for fid in REQUIRED_FORMULAS: if fid not in registry: errors.append(f"registry_missing:{fid}") if f"function calc{fid.split('_V1')[0].title().replace('_', '')}" not in gas_text: # lightweight check only; exact name checked below pass for fn in ["calcAbsoluteRiskStopV1_", "calcRelativeUnderperfAlertV1_", "calcStopActionLadderV1_"]: if fn not in gas_text: errors.append(f"gas_missing:{fn}") exec_rules = (((stop_yaml.get("stop_loss") or {}).get("executable_rules") or {}).get("rules")) or [] if not isinstance(exec_rules, list) or not exec_rules: errors.append("stop_loss.executable_rules.rules_missing") ambiguous_count = 0 for line in STOP_YAML.read_text(encoding="utf-8", errors="ignore").splitlines(): if any(tok in line for tok in ("또는", "실패 시", "회복 실패")): if "order_type" in line or "price_expression" in line or "quantity_expression" in line: ambiguous_count += 1 stop_action_has_price_qty_method_reason = True for rule in exec_rules: if not isinstance(rule, dict): stop_action_has_price_qty_method_reason = False break if not rule.get("id"): stop_action_has_price_qty_method_reason = False break if "output_fields" not in rule and "output_field" not in rule: stop_action_has_price_qty_method_reason = False break if not any(k in rule for k in ("quantity_rule", "quantity_source", "rules", "expression")): stop_action_has_price_qty_method_reason = False break relative_only_full_liquidation_count = 0 gap_down_full_market_sell_violations = 0 llm_generated_stop_price_count = 0 if output: # If the produced artifact has any non-deterministic or missing identifiers, count them. if str(output.get("formula_id") or "") != "RELATIVE_UNDERPERF_ALERT_V1": llm_generated_stop_price_count += 1 for row in output.get("stop_action_ladder_rows") or []: if not isinstance(row, dict): continue if row.get("action") == "EXIT_100" and row.get("reason") == "REL_EXCESS": relative_only_full_liquidation_count += 1 if row.get("action") == "SELL_FULL" and row.get("reason") == "GAP_DOWN": gap_down_full_market_sell_violations += 1 ok = ( ambiguous_count == 0 and stop_action_has_price_qty_method_reason and relative_only_full_liquidation_count == 0 and gap_down_full_market_sell_violations == 0 and llm_generated_stop_price_count == 0 and not errors ) print(f"[STOP_LOSS_POLICY_V1] ambiguous={ambiguous_count} price_qty_reason={stop_action_has_price_qty_method_reason} " f"relative_only_full_liq={relative_only_full_liquidation_count} gap_down_full_sell={gap_down_full_market_sell_violations} " f"llm_generated_stop_price={llm_generated_stop_price_count}") if errors: print(" errors: " + ", ".join(errors)) if ok: print("STOP_LOSS_POLICY_V1_OK") return 0 print("STOP_LOSS_POLICY_V1_FAIL") return 1 if __name__ == "__main__": raise SystemExit(main())