Files
QuantEngineByItz/tools/build_canonical_metrics_v1.py
T
kjh2064 e0508324e5
WBS-9.3 - NULL Policy CI Gate / NULL Policy Validation (push) Failing after 3s
WBS-9.3 - NULL Policy CI Gate / NULL Policy Validation (pull_request) Failing after 4s
Quant Engine CI/CD Pipeline / validate-core (pull_request) Failing after 2m17s
Quant Engine CI/CD Pipeline / validate-ui-and-storage (pull_request) Has been skipped
docs: .NET 렌더러 운영 상태와 검증 기준 정리
- 운영 상태 문서와 README를 .NET canonical renderer 기준으로 정리했습니다.
- 레거시 렌더러 비운영 선언과 감사/검증기 경로를 통일했습니다.
- 운영 보정 로직의 데이터 소스 반영을 정리했습니다.
2026-06-26 14:18:48 +09:00

331 lines
13 KiB
Python

"""
build_canonical_metrics_v1.py
목적: spec/25_canonical_metrics_registry.yaml에 정의된 논리 지표를
단일 정규 원천에서 읽어 Temp/canonical_metrics_v1.json으로 산출.
렌더러(src/dotnet/QuantEngine.Tools)는 이 파일을 경유해서만 지표값을 조회하고
직접 harness_context의 중복 키를 읽지 않는다.
출력 구조:
{
"formula_id": "CANONICAL_METRICS_V1",
"generated_at": "...",
"metrics": { metric_id: value }, # 스칼라 지표
"per_ticker": { metric_id: {ticker: v} }, # 종목별 지표
"resolved_count": N,
"unresolved": [ {metric, reason} ], # 은폐 금지: 미해석도 명시
"gate": "PASS" | "WARN"
}
"""
import json
import pathlib
import sys
from datetime import datetime, timezone
ROOT = pathlib.Path(__file__).parent.parent
JSON_PATH = ROOT / "GatherTradingData.json"
OUT_PATH = ROOT / "Temp" / "canonical_metrics_v1.json"
def _load_hc():
with open(JSON_PATH, encoding="utf-8") as f:
data = json.load(f)
return data.get("data", {}).get("_harness_context", {})
def _parse(val):
"""str이면 JSON으로 파싱 시도, 실패 시 원본 반환."""
if isinstance(val, str):
try:
return json.loads(val)
except Exception:
return val
return val
def _hc_get(hc, key):
return _parse(hc.get(key))
def _list_to_ticker_map(lst, ticker_key="ticker"):
"""리스트 → {ticker: item} 딕셔너리 변환."""
if not isinstance(lst, list):
return {}
result = {}
for item in lst:
if isinstance(item, dict):
t = str(item.get(ticker_key, "")).strip()
if t:
result[t] = item
return result
def build(hc):
metrics = {}
per_ticker = {}
unresolved = []
resolved = 0
# ──────────────────────────────────────────────
# 스칼라 지표
# ──────────────────────────────────────────────
# 1. cluster_pct
sc_json = _hc_get(hc, "semiconductor_cluster_json")
if isinstance(sc_json, dict) and sc_json.get("combined_pct") is not None:
metrics["cluster_pct"] = float(sc_json["combined_pct"])
resolved += 1
else:
# fallback: mandatory_reduction_json.cluster_pct
mr = _hc_get(hc, "mandatory_reduction_json")
if isinstance(mr, dict) and mr.get("cluster_pct") is not None:
metrics["cluster_pct"] = float(mr["cluster_pct"])
resolved += 1
else:
unresolved.append({
"metric": "cluster_pct",
"reason": "semiconductor_cluster_json.combined_pct AND mandatory_reduction_json.cluster_pct 모두 None"
})
# 2. cash_min_required_krw
cd = _hc_get(hc, "cash_recovery_display_json")
if isinstance(cd, dict) and cd.get("min_required_krw") is not None:
metrics["cash_min_required_krw"] = int(cd["min_required_krw"])
resolved += 1
else:
# fallback: harness_context 최상위 cash_shortfall_min_krw
v = hc.get("cash_shortfall_min_krw")
if v is not None:
metrics["cash_min_required_krw"] = int(v)
resolved += 1
else:
unresolved.append({
"metric": "cash_min_required_krw",
"reason": "cash_recovery_display_json.min_required_krw AND cash_shortfall_min_krw 모두 None"
})
# 3. cash_reference_total_krw
# trim_plan_to_min_cash_json은 list — 마지막 accumulated_krw가 합계
tp = _hc_get(hc, "trim_plan_to_min_cash_json")
ref_total = None
if isinstance(tp, list) and tp:
last_row = tp[-1] if isinstance(tp[-1], dict) else {}
acc = last_row.get("accumulated_krw")
if acc is not None:
ref_total = int(acc)
else:
ref_total = sum(
int(r.get("estimated_sell_krw", 0))
for r in tp if isinstance(r, dict)
)
elif isinstance(tp, dict):
ref_total = tp.get("total_plan_krw")
if ref_total is not None and ref_total > 0:
metrics["cash_reference_total_krw"] = ref_total
resolved += 1
else:
# fallback: cash_recovery_display_json.reference_total_krw (0이면 미산출로 기록)
if isinstance(cd, dict) and cd.get("reference_total_krw") is not None:
val = int(cd["reference_total_krw"])
if val == 0:
unresolved.append({
"metric": "cash_reference_total_krw",
"reason": "trim_plan accumulated_krw=None or 0, fallback reference_total_krw=0(미산출)"
})
else:
metrics["cash_reference_total_krw"] = val
resolved += 1
else:
unresolved.append({
"metric": "cash_reference_total_krw",
"reason": "trim_plan_to_min_cash_json 비어 있음 AND cash_recovery_display_json.reference_total_krw None"
})
# ──────────────────────────────────────────────
# 종목별 지표
# ──────────────────────────────────────────────
# prices_json → ticker map
prices_list = _hc_get(hc, "prices_json")
prices_map = _list_to_ticker_map(prices_list) if isinstance(prices_list, list) else {}
# sell_quantities_json → ticker map
sq_list = _hc_get(hc, "sell_quantities_json")
sq_map = _list_to_ticker_map(sq_list) if isinstance(sq_list, list) else {}
# proposal_reference_json → ticker map
pr_list = _hc_get(hc, "proposal_reference_json")
pr_map = _list_to_ticker_map(pr_list) if isinstance(pr_list, list) else {}
# scrs_v2_json.selected_combo → ticker map
scrs2 = _hc_get(hc, "scrs_v2_json")
combo_list = scrs2.get("selected_combo", []) if isinstance(scrs2, dict) else []
combo_map = _list_to_ticker_map(combo_list)
# profit_preservation_json → ticker map
pp_list = _hc_get(hc, "profit_preservation_json")
pp_map = _list_to_ticker_map(pp_list) if isinstance(pp_list, list) else {}
# 보유 종목 전체 ticker 집합 (prices_json 기준)
all_tickers = set(prices_map.keys())
# 4. scrs_immediate_qty (AGENTS.md 5b 키 불일치 수정)
imm_map = {}
for t, item in combo_map.items():
# immediate_qty 가 정규 키 (immediate_sell_qty는 잘못된 키)
v = item.get("immediate_qty")
imm_map[t] = v if v is not None else "-"
per_ticker["scrs_immediate_qty"] = imm_map
if imm_map:
resolved += 1
else:
unresolved.append({"metric": "scrs_immediate_qty", "reason": "scrs_v2_json.selected_combo 비어 있음"})
# 5. scrs_rebound_qty
rb_map = {}
for t, item in combo_map.items():
v = item.get("rebound_wait_qty")
rb_map[t] = v if v is not None else "-"
per_ticker["scrs_rebound_qty"] = rb_map
if rb_map:
resolved += 1
else:
unresolved.append({"metric": "scrs_rebound_qty", "reason": "scrs_v2_json.selected_combo 비어 있음"})
# 6. ticker_profit_pct (profit_preservation_json의 unrealized_pnl_pct=None → prices_json.profit_pct)
pnl_map = {}
for t in all_tickers:
# profit_preservation_json에 profit_pct 키가 있으면 사용 (동일 값)
pp_row = pp_map.get(t, {})
v = pp_row.get("profit_pct")
if v is None:
v = (prices_map.get(t) or {}).get("profit_pct")
pnl_map[t] = v # None이어도 명시 (은폐 금지)
per_ticker["ticker_profit_pct"] = pnl_map
filled = sum(1 for v in pnl_map.values() if v is not None)
resolved += 1
if filled < len(pnl_map):
unresolved.append({
"metric": "ticker_profit_pct",
"reason": f"{len(pnl_map)-filled}개 종목 profit_pct=None (prices_json 미수집)"
})
# 7. ticker_stop_price
stop_map = {}
for t in all_tickers:
v = (prices_map.get(t) or {}).get("stop_price")
stop_map[t] = v
per_ticker["ticker_stop_price"] = stop_map
resolved += 1
# 8. ticker_limit_price (shadow_ledger용 참고방어가)
limit_map = {}
for t in all_tickers:
# 1순위: proposal_reference_json.proposed_limit_price_krw
v = (pr_map.get(t) or {}).get("proposed_limit_price_krw")
if v is None:
# 2순위: prices_json.stop_price (참고방어가)
v = (prices_map.get(t) or {}).get("stop_price")
limit_map[t] = v
per_ticker["ticker_limit_price"] = limit_map
resolved += 1
# 9. ticker_base_qty (shadow_ledger용 산출 수량)
qty_map = {}
for t in all_tickers:
v = (sq_map.get(t) or {}).get("sell_qty")
qty_map[t] = v # None이면 명시
per_ticker["ticker_base_qty"] = qty_map
resolved += 1
# 10. ticker_tp1_price
tp1_map = {}
for t in all_tickers:
v = (prices_map.get(t) or {}).get("tp1_price")
tp1_map[t] = v
per_ticker["ticker_tp1_price"] = tp1_map
resolved += 1
# ──────────────────────────────────────────────
# EVALUATION_WINDOW_HONESTY_V1: t20_is_proxy 감지 (RC5 수정)
# ──────────────────────────────────────────────
import pathlib as _pl
_oqs_path = _pl.Path(__file__).parent.parent / "Temp" / "outcome_quality_score_v1.json"
_agp_path = _pl.Path(__file__).parent.parent / "Temp" / "algorithm_guidance_proof_v1.json"
_t20_is_proxy = False
_t20_source = None
_t20_effective_rate = None
_t20_proxy_label = None
try:
_oqs = json.loads(_oqs_path.read_text(encoding="utf-8")) if _oqs_path.exists() else {}
_t20_source = (_oqs.get("metrics") or {}).get("t20_source") or _oqs.get("t20_source")
_t20_effective_rate = (_oqs.get("metrics") or {}).get("t20_effective_rate") or _oqs.get("t20_effective_rate")
if not _t20_source:
_agp = json.loads(_agp_path.read_text(encoding="utf-8")) if _agp_path.exists() else {}
_t20_effective_rate = (_agp.get("honest_components") or {}).get("t20_pass_rate", _t20_effective_rate)
except Exception:
pass
_t20_is_proxy = (_t20_source != "operational_t20") if _t20_source else True
if _t20_is_proxy:
_t20_proxy_label = f"[T20_PROXY: t20_source={_t20_source} — 실측 T+20 표본 부재]"
metrics["t20_pass_rate"] = _t20_effective_rate
metrics["t20_is_proxy"] = _t20_is_proxy
metrics["t20_source"] = _t20_source
metrics["t20_proxy_label"] = _t20_proxy_label
resolved += 1
# t20_proxy는 '정보 표기'이지 '미해결'이 아님 — unresolved에 넣지 않음 (CHECK_89 보호)
# ──────────────────────────────────────────────
# 게이트 판정
# ──────────────────────────────────────────────
gate = "PASS" if not unresolved else "WARN"
# proxy 경고는 별도 섹션으로 분리 (unresolved와 혼용 금지)
proxy_warnings = []
if _t20_is_proxy:
proxy_warnings.append({
"metric": "t20_pass_rate",
"proxy_label": _t20_proxy_label,
"enforcement": "proxy 상태에서 release_gate t20_alpha 합격 근거 사용 금지",
})
return {
"formula_id": "CANONICAL_METRICS_V1",
"generated_at": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
"metrics": metrics,
"per_ticker": per_ticker,
"resolved_count": resolved,
"unresolved": unresolved,
"proxy_warnings": proxy_warnings,
"gate": gate,
"t20_honesty": {
"t20_is_proxy": _t20_is_proxy,
"t20_source": _t20_source,
"t20_effective_rate": _t20_effective_rate,
"t20_proxy_label": _t20_proxy_label,
"release_gate_t20_alpha_blocked": _t20_is_proxy,
},
}
def main():
hc = _load_hc()
out = build(hc)
OUT_PATH.parent.mkdir(parents=True, exist_ok=True)
with open(OUT_PATH, "w", encoding="utf-8") as f:
json.dump(out, f, ensure_ascii=False, indent=2)
# 콘솔 요약
print(f"CANONICAL_METRICS_V1: gate={out['gate']} resolved={out['resolved_count']} unresolved={len(out['unresolved'])}")
print(" metrics:")
for k, v in out["metrics"].items():
print(f" {k}: {v}")
print(" per_ticker counts:", {k: len(v) for k, v in out["per_ticker"].items()})
if out["unresolved"]:
print(" UNRESOLVED:")
for u in out["unresolved"]:
print(f" {u['metric']}: {u['reason']}")
return 0 if out["gate"] == "PASS" else 1
if __name__ == "__main__":
sys.exit(main())