from __future__ import annotations import argparse import json from collections import Counter, defaultdict from datetime import datetime, timezone, timedelta from pathlib import Path from typing import Any ROOT = Path(__file__).resolve().parents[1] DEFAULT_HISTORY = ROOT / "Temp" / "proposal_evaluation_history.json" DEFAULT_OUTPUT = ROOT / "Temp" / "rule_lifecycle_policy.json" KST = timezone(timedelta(hours=9)) def load_json(path: Path) -> dict[str, Any]: if not path.exists(): return {} try: payload = json.loads(path.read_text(encoding="utf-8")) except Exception: return {} return payload if isinstance(payload, dict) else {} def _is_eval(row: dict[str, Any], status_key: str) -> bool: return str(row.get(status_key) or "").startswith("EVALUATED_") def _is_match(row: dict[str, Any], outcome_key: str) -> bool: return str(row.get(outcome_key) or "") == "MATCHED" def _is_mismatch(row: dict[str, Any], outcome_key: str) -> bool: return str(row.get(outcome_key) or "") == "MISMATCHED" def decide_action(mismatch_rate: float, samples: int, t20_mismatch_rate: float | None) -> tuple[str, str]: if samples < 10: return "WATCH", "SAMPLE_INSUFFICIENT" if mismatch_rate >= 70.0 and (t20_mismatch_rate is None or t20_mismatch_rate >= 60.0): return "RETIRE", "HIGH_MISMATCH_PERSISTENT" if mismatch_rate >= 55.0: return "DEMOTE", "MISMATCH_ELEVATED" if mismatch_rate >= 40.0: return "WATCH", "MISMATCH_WARNING" return "KEEP", "STABLE" def build_policy(history: dict[str, Any]) -> dict[str, Any]: records = [r for r in history.get("records", []) if isinstance(r, dict)] t1_eval = [r for r in records if _is_eval(r, "evaluation_status")] t5_eval = [r for r in records if _is_eval(r, "t5_evaluation_status")] t20_eval = [r for r in records if _is_eval(r, "t20_evaluation_status")] by_cause: dict[str, list[dict[str, Any]]] = defaultdict(list) for row in t1_eval: cause = str(row.get("error_cause") or "UNKNOWN").strip() by_cause[cause].append(row) lifecycle_rows: list[dict[str, Any]] = [] for cause, rows in sorted(by_cause.items(), key=lambda item: len(item[1]), reverse=True): samples = len(rows) mismatched = sum(1 for row in rows if _is_mismatch(row, "outcome")) matched = sum(1 for row in rows if _is_match(row, "outcome")) mismatch_rate = round((mismatched / samples) * 100, 2) if samples else None t20_rows = [row for row in rows if _is_eval(row, "t20_evaluation_status")] t20_mismatch = sum(1 for row in t20_rows if _is_mismatch(row, "t20_outcome")) t20_mismatch_rate = round((t20_mismatch / len(t20_rows)) * 100, 2) if t20_rows else None action, reason = decide_action(mismatch_rate or 0.0, samples, t20_mismatch_rate) rule_state = { "KEEP": "ACTIVE", "WATCH": "SHADOW", "DEMOTE": "DEPRECATED", "RETIRE": "RETIRED", }[action] lifecycle_rows.append( { "rule_key": cause, "rule_state": rule_state, "samples_t1": samples, "matched_t1": matched, "mismatched_t1": mismatched, "mismatch_rate_t1_pct": mismatch_rate, "samples_t20": len(t20_rows), "mismatch_rate_t20_pct": t20_mismatch_rate, "policy_action": action, "policy_reason": reason, } ) top_improvements = Counter( str(row.get("improvement_proposal") or "").strip() for row in t1_eval if str(row.get("improvement_proposal") or "").strip() ).most_common(10) active_rule_has_lifecycle_status = all( isinstance(row, dict) and str(row.get("policy_action") or "").strip() for row in lifecycle_rows ) retired_rule_active_count = 0 retired_rule_policy_count = sum(1 for row in lifecycle_rows if row.get("policy_action") == "RETIRE") # Conditional adoption gate for formulas that require enough evidence. def _count_by_keyword(rows: list[dict[str, Any]], keywords: list[str]) -> int: c = 0 for row in rows: blob = " ".join( [ str(row.get("rule_basis") or ""), str(row.get("improvement_proposal") or ""), str(row.get("error_cause") or ""), str(row.get("action") or ""), ] ).lower() if any(k.lower() in blob for k in keywords): c += 1 return c conditional_targets = [ { "formula_id": "PROACTIVE_SELL_RADAR_V2", "min_samples": 30, "keywords": ["distribution", "pre_distribution", "sell_radar", "설거지", "분배"], }, { "formula_id": "SELL_EXECUTION_QUALITY_GATE_V1", "min_samples": 30, "keywords": ["execution_quality", "체결", "slippage", "sell_execution"], }, { "formula_id": "ANTI_LATE_ENTRY_GATE_V3", "min_samples": 30, "keywords": ["late_chase", "anti_late", "추격", "breakout"], }, ] conditional_rows: list[dict[str, Any]] = [] for target in conditional_targets: observed = _count_by_keyword(t1_eval, target["keywords"]) status = "ADOPTED" if observed >= target["min_samples"] else "WATCH_PENDING_SAMPLE" conditional_rows.append( { "formula_id": target["formula_id"], "min_samples": target["min_samples"], "observed_samples": observed, "status": status, "policy": "DATA_MISSING_LOCK_WHEN_UNDER_MIN_SAMPLES", } ) return { "as_of": datetime.now(KST).isoformat(timespec="seconds"), "schema_version": "2026-05-22-rule-lifecycle-v1", "state_legend": { "ACTIVE": "production rule in use", "SHADOW": "observed but not final", "DEPRECATED": "suppressed from final decision", "RETIRED": "not used in final decision", }, "summary": { "t1_evaluated_count": len(t1_eval), "t5_evaluated_count": len(t5_eval), "t20_evaluated_count": len(t20_eval), "rule_keys_count": len(lifecycle_rows), "actions_count": dict(Counter(row["policy_action"] for row in lifecycle_rows)), "retired_rule_active_count": retired_rule_active_count, "active_rule_has_lifecycle_status": active_rule_has_lifecycle_status, "retired_rule_policy_count": retired_rule_policy_count, }, "conditional_formula_adoption": { "schema_version": "2026-05-25-conditional-adoption-v1", "rows": conditional_rows, }, "rule_lifecycle_rows": lifecycle_rows, "top_improvement_proposals": [{"proposal": key, "count": count} for key, count in top_improvements], "retired_rule_active_count": retired_rule_active_count, "active_rule_has_lifecycle_status": active_rule_has_lifecycle_status, } def main() -> int: parser = argparse.ArgumentParser(description="Build deterministic rule lifecycle policy from proposal evaluation history.") parser.add_argument("--history", default=str(DEFAULT_HISTORY)) parser.add_argument("--output", default=str(DEFAULT_OUTPUT)) args = parser.parse_args() history_path = Path(args.history) output_path = Path(args.output) if not history_path.is_absolute(): history_path = ROOT / history_path if not output_path.is_absolute(): output_path = ROOT / output_path history = load_json(history_path) policy = build_policy(history) output_path.parent.mkdir(parents=True, exist_ok=True) output_path.write_text(json.dumps(policy, ensure_ascii=False, indent=2), encoding="utf-8") print(f"RULE_LIFECYCLE_POLICY_BUILT: {output_path} rows={len(policy.get('rule_lifecycle_rows', []))}") return 0 if __name__ == "__main__": raise SystemExit(main())