"""DATA_QUALITY_GATE_V2_PY — authoritative Python recomputation of GAS calcDataQualityGateV2_.

Rationale: the GAS original (gas_data_feed.gs:8643) has a field-path bug that
zeroes out data that actually exists (false negative). Proper fix: recompute
the same 8 categories deterministically from GatherTradingData.json using the
correct keys.

Core principle (no false values AND no overstatement):
- Data-presence categories (prediction/cash/cluster/stop_loss/sell_engine):
  scored by the fill rate of real data.
- Sample-PENDING categories (trade_quality/alpha_eval/pattern): need real
  evaluation samples to accumulate, so they are reported as PENDING rather
  than 0. They are excluded from the data-quality denominator (prevents
  overstatement) and reported separately as PENDING on the performance axis.
- overall_completeness_pct = mean of the data-presence categories.
  Performance (eval) and data quality are kept separate.
"""
from __future__ import annotations

import argparse
import json
from pathlib import Path
from typing import Any

ROOT = Path(__file__).resolve().parents[1]
DEFAULT_JSON = ROOT / "GatherTradingData.json"
DEFAULT_PA = ROOT / "Temp" / "predictive_alpha_engine_v2.json"
DEFAULT_OUT = ROOT / "Temp" / "data_quality_gate_v2_py.json"

# Data-presence categories vs. sample-PENDING categories.
DATA_CATEGORIES = ["prediction", "cash", "cluster", "stop_loss", "sell_engine"]
PENDING_CATEGORIES = ["trade_quality", "alpha_eval", "pattern"]


def _load(path: Path) -> dict[str, Any]:
    """Read a JSON object from *path*.

    Returns an empty dict when the file is missing, unreadable, not valid
    JSON, or when its top-level value is not a JSON object, so callers can
    always use ``.get`` on the result.
    """
    if not path.exists():
        return {}
    try:
        loaded = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError, RecursionError):
        # ValueError covers both JSONDecodeError and UnicodeDecodeError.
        return {}
    return loaded if isinstance(loaded, dict) else {}


def _merged_hctx(payload: dict[str, Any]) -> dict[str, Any]:
    """Merge ``payload["data"]["_harness_context"]`` with ``payload["hApex"]``.

    Keys from ``hApex`` override keys from the harness context. Either part is
    ignored when it is absent or not a dict.
    """
    data = payload.get("data") if isinstance(payload.get("data"), dict) else {}
    hctx = data.get("_harness_context") if isinstance(data.get("_harness_context"), dict) else {}
    merged = dict(hctx)
    if isinstance(payload.get("hApex"), dict):
        merged.update(payload["hApex"])
    return merged


def _gj(hctx: dict[str, Any], key: str) -> Any:
    """Return ``hctx[key]``, JSON-decoding it first when it is a string.

    A string that fails to decode is returned unchanged; non-string values
    (including a missing key, i.e. ``None``) are returned as-is.
    """
    v = hctx.get(key)
    if isinstance(v, str):
        try:
            return json.loads(v)
        except (ValueError, RecursionError):
            return v
    return v


def _as_dict(v: Any) -> dict[str, Any]:
    """Return *v* when it is a dict, otherwise an empty dict."""
    return v if isinstance(v, dict) else {}


def _first_dict(v: Any) -> dict[str, Any]:
    """Return the first element of list *v* when it is a dict, else an empty dict."""
    if isinstance(v, list) and v and isinstance(v[0], dict):
        return v[0]
    return {}


def _is_valid(v: Any) -> bool:
    """Tell whether *v* counts as filled: not None and not a placeholder string."""
    return v is not None and v not in ("-", "PENDING", "", "null")


def _fill_rate(fields: list[Any]) -> int:
    """Return the percentage (0-100, rounded) of *fields* that are valid.

    An empty list scores 0.
    """
    if not fields:
        return 0
    filled = sum(1 for f in fields if _is_valid(f))
    return round(filled / len(fields) * 100)


def _data_scores(hctx: dict[str, Any], pa: dict[str, Any]) -> dict[str, int]:
    """Compute the fill-rate score of each data-presence category.

    Any source value that does not have the expected container shape (for
    example a ``*_json`` string that failed to parse) is treated as missing,
    which scores its fields as unfilled instead of raising.
    """
    pa0 = _first_dict(pa.get("rows"))
    prediction_fields = [
        pa0.get("thesis_score"),
        pa0.get("antithesis_score"),
        pa0.get("synthesis_verdict"),
        pa0.get("direction_confidence"),
    ]

    cash_shortfall = _gj(hctx, "cash_shortfall_json")
    # Prefer the value nested in the parsed object; otherwise fall back to
    # the flat key on the harness context.
    cash_shortfall_val = (
        cash_shortfall.get("cash_shortfall_min_krw")
        if isinstance(cash_shortfall, dict)
        else hctx.get("cash_shortfall_min_krw")
    )
    cash_fields = [
        hctx.get("settlement_cash_d2_krw"),
        hctx.get("cash_floor_status"),
        cash_shortfall_val,
    ]

    cluster = _as_dict(_gj(hctx, "semiconductor_cluster_json"))
    cluster_fields = [cluster.get("cluster_state"), cluster.get("combined_pct")]

    pp0 = _first_dict(_gj(hctx, "profit_preservation_json"))
    stop_loss_fields = [
        pp0.get("protected_stop_price"),
        pp0.get("auto_trailing_stop"),
        pp0.get("profit_preservation_state"),
    ]

    scrs = _as_dict(_gj(hctx, "scrs_v2_json"))
    combo0 = _first_dict(scrs.get("selected_combo"))
    sell_engine_fields = [
        scrs.get("emergency_level"),
        combo0.get("immediate_qty"),
        combo0.get("rebound_wait_qty"),
    ]

    return {
        "prediction": _fill_rate(prediction_fields),
        "cash": _fill_rate(cash_fields),
        "cluster": _fill_rate(cluster_fields),
        "stop_loss": _fill_rate(stop_loss_fields),
        "sell_engine": _fill_rate(sell_engine_fields),
    }


def _pending_status(hctx: dict[str, Any]) -> dict[str, str]:
    """Classify each sample-PENDING category as ``"PENDING"`` or ``"READY"``."""
    tq = _as_dict(_gj(hctx, "trade_quality_report_json"))
    tq_records = tq.get("records") or []
    alpha_hist = _as_dict(_gj(hctx, "alpha_history_summary_json"))
    acc_rate = alpha_hist.get("prediction_accuracy_rate")
    pattern = _gj(hctx, "pattern_blacklist_auto_json")
    pattern_ready = isinstance(pattern, dict) and bool(pattern.get("status"))
    return {
        "trade_quality": "READY" if tq_records else "PENDING",
        "alpha_eval": "READY" if _is_valid(acc_rate) else "PENDING",
        "pattern": "READY" if pattern_ready else "PENDING",
    }


def _grade(overall: int) -> str:
    """Map an overall percentage to its completeness grade."""
    if overall >= 90:
        return "COMPLETE"
    if overall >= 60:
        return "PARTIAL"
    return "INSUFFICIENT"


def _build_result(hctx: dict[str, Any], pa: dict[str, Any]) -> dict[str, Any]:
    """Assemble the full gate report from the harness context and PA payload."""
    data_scores = _data_scores(hctx, pa)
    pending_status = _pending_status(hctx)

    # overall = mean of the data-presence categories only (eval kept separate).
    data_vals = list(data_scores.values())
    overall = round(sum(data_vals) / len(data_vals)) if data_vals else 0

    # category_scores: numeric score for data categories, status string for
    # the sample-PENDING categories.
    category_scores: dict[str, Any] = {**data_scores, **pending_status}
    pending_list = [c for c, s in pending_status.items() if s == "PENDING"]

    return {
        "formula_id": "DATA_QUALITY_GATE_V2_PY",
        "authoritative_over": "GAS calcDataQualityGateV2_ (field-path bug fix)",
        "overall_completeness_pct": overall,
        "completeness_grade": _grade(overall),
        "data_category_scores": data_scores,
        "category_scores": category_scores,
        "pending_categories": pending_list,
        "pending_status": pending_status,
        "denominator_note": "overall = 데이터-존재 카테고리 평균. trade_quality/alpha_eval/pattern은 "
        "표본 누적 필요 → PENDING(분모 제외). 거짓 0% AND 과대 0%.",
        "numeric_generation_allowed": 0,
    }


def _resolve(raw: str) -> Path:
    """Return *raw* as a path, anchoring relative paths at ``ROOT``."""
    candidate = Path(raw)
    return candidate if candidate.is_absolute() else ROOT / raw


def main(argv: list[str] | None = None) -> int:
    """Run the gate: read inputs, write the JSON report, print a summary.

    Args:
        argv: Command-line arguments without the program name. ``None``
            (the default) makes argparse read ``sys.argv``.

    Returns:
        Process exit code; always 0.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--json", default=str(DEFAULT_JSON))
    ap.add_argument("--pa", default=str(DEFAULT_PA))
    ap.add_argument("--out", default=str(DEFAULT_OUT))
    args = ap.parse_args(argv)

    jp = _resolve(args.json)
    pap = _resolve(args.pa)
    op = _resolve(args.out)

    hctx = _merged_hctx(_load(jp))
    pa = _load(pap)
    result = _build_result(hctx, pa)

    op.parent.mkdir(parents=True, exist_ok=True)
    op.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")

    overall = result["overall_completeness_pct"]
    grade = result["completeness_grade"]
    data_scores = result["data_category_scores"]
    pending_list = result["pending_categories"]
    print(
        f"DATA_QUALITY_GATE_V2_PY overall={overall}% grade={grade} "
        f"data_scores={data_scores} pending={pending_list}"
    )
    return 0


if __name__ == "__main__":
    raise SystemExit(main())