""" build_canonical_metrics_v1.py 목적: spec/25_canonical_metrics_registry.yaml에 정의된 논리 지표를 단일 정규 원천에서 읽어 Temp/canonical_metrics_v1.json으로 산출. 렌더러(render_operational_report.py)는 이 파일을 경유해서만 지표값을 조회하고 직접 harness_context의 중복 키를 읽지 않는다. 출력 구조: { "formula_id": "CANONICAL_METRICS_V1", "generated_at": "...", "metrics": { metric_id: value }, # 스칼라 지표 "per_ticker": { metric_id: {ticker: v} }, # 종목별 지표 "resolved_count": N, "unresolved": [ {metric, reason} ], # 은폐 금지: 미해석도 명시 "gate": "PASS" | "WARN" } """ import json import pathlib import sys from datetime import datetime, timezone ROOT = pathlib.Path(__file__).parent.parent JSON_PATH = ROOT / "GatherTradingData.json" OUT_PATH = ROOT / "Temp" / "canonical_metrics_v1.json" def _load_hc(): with open(JSON_PATH, encoding="utf-8") as f: data = json.load(f) return data.get("data", {}).get("_harness_context", {}) def _parse(val): """str이면 JSON으로 파싱 시도, 실패 시 원본 반환.""" if isinstance(val, str): try: return json.loads(val) except Exception: return val return val def _hc_get(hc, key): return _parse(hc.get(key)) def _list_to_ticker_map(lst, ticker_key="ticker"): """리스트 → {ticker: item} 딕셔너리 변환.""" if not isinstance(lst, list): return {} result = {} for item in lst: if isinstance(item, dict): t = str(item.get(ticker_key, "")).strip() if t: result[t] = item return result def build(hc): metrics = {} per_ticker = {} unresolved = [] resolved = 0 # ────────────────────────────────────────────── # 스칼라 지표 # ────────────────────────────────────────────── # 1. cluster_pct sc_json = _hc_get(hc, "semiconductor_cluster_json") if isinstance(sc_json, dict) and sc_json.get("combined_pct") is not None: metrics["cluster_pct"] = float(sc_json["combined_pct"]) resolved += 1 else: # fallback: mandatory_reduction_json.cluster_pct mr = _hc_get(hc, "mandatory_reduction_json") if isinstance(mr, dict) and mr.get("cluster_pct") is not None: metrics["cluster_pct"] = float(mr["cluster_pct"]) resolved += 1 else: unresolved.append({ "metric": "cluster_pct", "reason": "semiconductor_cluster_json.combined_pct AND mandatory_reduction_json.cluster_pct 모두 None" }) # 2. cash_min_required_krw cd = _hc_get(hc, "cash_recovery_display_json") if isinstance(cd, dict) and cd.get("min_required_krw") is not None: metrics["cash_min_required_krw"] = int(cd["min_required_krw"]) resolved += 1 else: # fallback: harness_context 최상위 cash_shortfall_min_krw v = hc.get("cash_shortfall_min_krw") if v is not None: metrics["cash_min_required_krw"] = int(v) resolved += 1 else: unresolved.append({ "metric": "cash_min_required_krw", "reason": "cash_recovery_display_json.min_required_krw AND cash_shortfall_min_krw 모두 None" }) # 3. cash_reference_total_krw # trim_plan_to_min_cash_json은 list — 마지막 accumulated_krw가 합계 tp = _hc_get(hc, "trim_plan_to_min_cash_json") ref_total = None if isinstance(tp, list) and tp: last_row = tp[-1] if isinstance(tp[-1], dict) else {} acc = last_row.get("accumulated_krw") if acc is not None: ref_total = int(acc) else: ref_total = sum( int(r.get("estimated_sell_krw", 0)) for r in tp if isinstance(r, dict) ) elif isinstance(tp, dict): ref_total = tp.get("total_plan_krw") if ref_total is not None and ref_total > 0: metrics["cash_reference_total_krw"] = ref_total resolved += 1 else: # fallback: cash_recovery_display_json.reference_total_krw (0이면 미산출로 기록) if isinstance(cd, dict) and cd.get("reference_total_krw") is not None: val = int(cd["reference_total_krw"]) if val == 0: unresolved.append({ "metric": "cash_reference_total_krw", "reason": "trim_plan accumulated_krw=None or 0, fallback reference_total_krw=0(미산출)" }) else: metrics["cash_reference_total_krw"] = val resolved += 1 else: unresolved.append({ "metric": "cash_reference_total_krw", "reason": "trim_plan_to_min_cash_json 비어 있음 AND cash_recovery_display_json.reference_total_krw None" }) # ────────────────────────────────────────────── # 종목별 지표 # ────────────────────────────────────────────── # prices_json → ticker map prices_list = _hc_get(hc, "prices_json") prices_map = _list_to_ticker_map(prices_list) if isinstance(prices_list, list) else {} # sell_quantities_json → ticker map sq_list = _hc_get(hc, "sell_quantities_json") sq_map = _list_to_ticker_map(sq_list) if isinstance(sq_list, list) else {} # proposal_reference_json → ticker map pr_list = _hc_get(hc, "proposal_reference_json") pr_map = _list_to_ticker_map(pr_list) if isinstance(pr_list, list) else {} # scrs_v2_json.selected_combo → ticker map scrs2 = _hc_get(hc, "scrs_v2_json") combo_list = scrs2.get("selected_combo", []) if isinstance(scrs2, dict) else [] combo_map = _list_to_ticker_map(combo_list) # profit_preservation_json → ticker map pp_list = _hc_get(hc, "profit_preservation_json") pp_map = _list_to_ticker_map(pp_list) if isinstance(pp_list, list) else {} # 보유 종목 전체 ticker 집합 (prices_json 기준) all_tickers = set(prices_map.keys()) # 4. scrs_immediate_qty (AGENTS.md 5b 키 불일치 수정) imm_map = {} for t, item in combo_map.items(): # immediate_qty 가 정규 키 (immediate_sell_qty는 잘못된 키) v = item.get("immediate_qty") imm_map[t] = v if v is not None else "-" per_ticker["scrs_immediate_qty"] = imm_map if imm_map: resolved += 1 else: unresolved.append({"metric": "scrs_immediate_qty", "reason": "scrs_v2_json.selected_combo 비어 있음"}) # 5. scrs_rebound_qty rb_map = {} for t, item in combo_map.items(): v = item.get("rebound_wait_qty") rb_map[t] = v if v is not None else "-" per_ticker["scrs_rebound_qty"] = rb_map if rb_map: resolved += 1 else: unresolved.append({"metric": "scrs_rebound_qty", "reason": "scrs_v2_json.selected_combo 비어 있음"}) # 6. ticker_profit_pct (profit_preservation_json의 unrealized_pnl_pct=None → prices_json.profit_pct) pnl_map = {} for t in all_tickers: # profit_preservation_json에 profit_pct 키가 있으면 사용 (동일 값) pp_row = pp_map.get(t, {}) v = pp_row.get("profit_pct") if v is None: v = (prices_map.get(t) or {}).get("profit_pct") pnl_map[t] = v # None이어도 명시 (은폐 금지) per_ticker["ticker_profit_pct"] = pnl_map filled = sum(1 for v in pnl_map.values() if v is not None) resolved += 1 if filled < len(pnl_map): unresolved.append({ "metric": "ticker_profit_pct", "reason": f"{len(pnl_map)-filled}개 종목 profit_pct=None (prices_json 미수집)" }) # 7. ticker_stop_price stop_map = {} for t in all_tickers: v = (prices_map.get(t) or {}).get("stop_price") stop_map[t] = v per_ticker["ticker_stop_price"] = stop_map resolved += 1 # 8. ticker_limit_price (shadow_ledger용 참고방어가) limit_map = {} for t in all_tickers: # 1순위: proposal_reference_json.proposed_limit_price_krw v = (pr_map.get(t) or {}).get("proposed_limit_price_krw") if v is None: # 2순위: prices_json.stop_price (참고방어가) v = (prices_map.get(t) or {}).get("stop_price") limit_map[t] = v per_ticker["ticker_limit_price"] = limit_map resolved += 1 # 9. ticker_base_qty (shadow_ledger용 산출 수량) qty_map = {} for t in all_tickers: v = (sq_map.get(t) or {}).get("sell_qty") qty_map[t] = v # None이면 명시 per_ticker["ticker_base_qty"] = qty_map resolved += 1 # 10. ticker_tp1_price tp1_map = {} for t in all_tickers: v = (prices_map.get(t) or {}).get("tp1_price") tp1_map[t] = v per_ticker["ticker_tp1_price"] = tp1_map resolved += 1 # ────────────────────────────────────────────── # EVALUATION_WINDOW_HONESTY_V1: t20_is_proxy 감지 (RC5 수정) # ────────────────────────────────────────────── import pathlib as _pl _oqs_path = _pl.Path(__file__).parent.parent / "Temp" / "outcome_quality_score_v1.json" _agp_path = _pl.Path(__file__).parent.parent / "Temp" / "algorithm_guidance_proof_v1.json" _t20_is_proxy = False _t20_source = None _t20_effective_rate = None _t20_proxy_label = None try: _oqs = json.loads(_oqs_path.read_text(encoding="utf-8")) if _oqs_path.exists() else {} _t20_source = (_oqs.get("metrics") or {}).get("t20_source") or _oqs.get("t20_source") _t20_effective_rate = (_oqs.get("metrics") or {}).get("t20_effective_rate") or _oqs.get("t20_effective_rate") if not _t20_source: _agp = json.loads(_agp_path.read_text(encoding="utf-8")) if _agp_path.exists() else {} _t20_effective_rate = (_agp.get("honest_components") or {}).get("t20_pass_rate", _t20_effective_rate) except Exception: pass _t20_is_proxy = (_t20_source != "operational_t20") if _t20_source else True if _t20_is_proxy: _t20_proxy_label = f"[T20_PROXY: t20_source={_t20_source} — 실측 T+20 표본 부재]" metrics["t20_pass_rate"] = _t20_effective_rate metrics["t20_is_proxy"] = _t20_is_proxy metrics["t20_source"] = _t20_source metrics["t20_proxy_label"] = _t20_proxy_label resolved += 1 # t20_proxy는 '정보 표기'이지 '미해결'이 아님 — unresolved에 넣지 않음 (CHECK_89 보호) # ────────────────────────────────────────────── # 게이트 판정 # ────────────────────────────────────────────── gate = "PASS" if not unresolved else "WARN" # proxy 경고는 별도 섹션으로 분리 (unresolved와 혼용 금지) proxy_warnings = [] if _t20_is_proxy: proxy_warnings.append({ "metric": "t20_pass_rate", "proxy_label": _t20_proxy_label, "enforcement": "proxy 상태에서 release_gate t20_alpha 합격 근거 사용 금지", }) return { "formula_id": "CANONICAL_METRICS_V1", "generated_at": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"), "metrics": metrics, "per_ticker": per_ticker, "resolved_count": resolved, "unresolved": unresolved, "proxy_warnings": proxy_warnings, "gate": gate, "t20_honesty": { "t20_is_proxy": _t20_is_proxy, "t20_source": _t20_source, "t20_effective_rate": _t20_effective_rate, "t20_proxy_label": _t20_proxy_label, "release_gate_t20_alpha_blocked": _t20_is_proxy, }, } def main(): hc = _load_hc() out = build(hc) OUT_PATH.parent.mkdir(parents=True, exist_ok=True) with open(OUT_PATH, "w", encoding="utf-8") as f: json.dump(out, f, ensure_ascii=False, indent=2) # 콘솔 요약 print(f"CANONICAL_METRICS_V1: gate={out['gate']} resolved={out['resolved_count']} unresolved={len(out['unresolved'])}") print(" metrics:") for k, v in out["metrics"].items(): print(f" {k}: {v}") print(" per_ticker counts:", {k: len(v) for k, v in out["per_ticker"].items()}) if out["unresolved"]: print(" UNRESOLVED:") for u in out["unresolved"]: print(f" {u['metric']}: {u['reason']}") return 0 if out["gate"] == "PASS" else 1 if __name__ == "__main__": sys.exit(main())