Files
QuantEngineByItz/tools/build_data_quality_gate_v2_py.py
kjh2064 ee3e799de1 feat: 리밸런싱 엔진 V1 + GAS 버그 수정 (2026-06-13)
주요 변경:
- tools/build_rebalance_engine_v1.py: REBALANCE_ENGINE_V1 신규
  * account_snapshot 직접 합산(_build_snap_position_map) → 소수주 분리 행 병합
  * 레짐 소스 macro.REGIME_PRELIM 최우선 (GAS 와 동일)
- src/gas_adapter_parts/gdf_06_rebalance.gs: runRebalanceSheet_() 신규
  * Logger.log / getSpreadsheet_() 로 run_all 연동 수정
- src/gas_adapter_parts/gdc_01_fetch_fundamentals.gs
  * _mergePositionRecord_(): 소수주 중복 행 합산 신규
  * parseInt → parseFloat (qty, availQty)
- src/gas_adapter_parts/gdf_01_price_metrics.gs
  * 미보유 종목 SELL_READY → WATCH_EXIT_SIGNAL
- spec/41_release_dag.yaml: build_rebalance_sheet 노드 추가 (step_count 63)
- spec/51_formula_lifecycle_registry.yaml: REBALANCE_ENGINE_V1 등록

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-13 13:20:14 +09:00

175 lines
6.9 KiB
Python

"""DATA_QUALITY_GATE_V2_PY — authoritative Python recomputation of GAS calcDataQualityGateV2_.

Rationale: the GAS original (gas_data_feed.gs:8643) has a field-path bug that
flattens real data to 0 (false negative).
Approach: deterministically recompute the same 8 categories from
GatherTradingData.json using the correct keys.

Core principles (no false values AND no overstatement):
- Data-present categories (prediction/cash/cluster/stop_loss/sell_engine):
  scored by the fill rate of the real data.
- Sample-PENDING categories (trade_quality/alpha_eval/pattern): real
  evaluation samples must accumulate first, so they are reported as PENDING
  rather than 0. They are excluded from the data-quality denominator (to
  prevent overstatement) and flagged PENDING separately on the performance axis.
- overall_completeness_pct = mean of the data-present categories.
  Performance (eval) and data quality are kept separate.
"""
# The docstring must be the first statement to become the module's __doc__;
# the __future__ import is still valid directly after it.
from __future__ import annotations
import argparse
import json
from pathlib import Path
from typing import Any
# ROOT is the parent of the directory that contains this file.
ROOT = Path(__file__).resolve().parents[1]

# Default input/output locations, all anchored at ROOT.
DEFAULT_JSON = ROOT.joinpath("GatherTradingData.json")
DEFAULT_PA = ROOT.joinpath("Temp", "predictive_alpha_engine_v2.json")
DEFAULT_OUT = ROOT.joinpath("Temp", "data_quality_gate_v2_py.json")

# Data-present categories vs. sample-PENDING categories.
DATA_CATEGORIES = [
    "prediction",
    "cash",
    "cluster",
    "stop_loss",
    "sell_engine",
]
PENDING_CATEGORIES = [
    "trade_quality",
    "alpha_eval",
    "pattern",
]
def _load(path: Path) -> dict[str, Any]:
    """Read a JSON file and return its top-level object.

    Best-effort by design: a missing, unreadable, or malformed file yields an
    empty dict instead of an exception.

    Args:
        path: Location of the JSON file.

    Returns:
        The parsed JSON object, or ``{}`` when the file is absent, cannot be
        read or decoded, or its top-level value is not a JSON object.
    """
    try:
        parsed = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError, RecursionError):
        # OSError: missing or unreadable file.
        # ValueError: covers json.JSONDecodeError and UnicodeDecodeError.
        # RecursionError: pathologically nested JSON.
        return {}
    # The annotated contract is a dict; a top-level list or scalar would
    # break callers that call .get() on the result.
    return parsed if isinstance(parsed, dict) else {}
def _merged_hctx(payload: dict[str, Any]) -> dict[str, Any]:
    """Build one flat context dict from the payload.

    Starts from ``payload["data"]["_harness_context"]`` (when both levels are
    dicts) and overlays ``payload["hApex"]`` (when it is a dict), so hApex
    keys win on conflict. The returned dict is a new object; the payload's
    own dicts are not mutated.
    """
    merged: dict[str, Any] = {}

    data_section = payload.get("data")
    if isinstance(data_section, dict):
        harness = data_section.get("_harness_context")
        if isinstance(harness, dict):
            merged.update(harness)

    apex = payload.get("hApex")
    if isinstance(apex, dict):
        merged.update(apex)

    return merged
def _gj(hctx: dict[str, Any], key: str) -> Any:
    """Fetch ``hctx[key]`` and JSON-decode it when it is a string.

    Non-string values (including a missing key, which yields ``None``) are
    returned untouched. A string that fails to decode is returned as the
    original string.
    """
    raw = hctx.get(key)
    if not isinstance(raw, str):
        return raw
    try:
        decoded = json.loads(raw)
    except Exception:
        # Best-effort: hand back the undecoded text on any failure.
        return raw
    return decoded
def _is_valid(v: Any) -> bool:
    """Return True unless *v* is None or one of the placeholder strings.

    The placeholders are ``"-"``, ``"PENDING"``, ``""`` and ``"null"``.
    Every other value, including 0 and empty containers, counts as valid.
    """
    if v is None:
        return False
    return v not in ("-", "PENDING", "", "null")
def _fill_rate(fields: list[Any]) -> int:
    """Return the percentage (0-100, rounded) of *fields* that pass ``_is_valid``.

    An empty list scores 0.
    """
    if not fields:
        return 0
    filled = len([value for value in fields if _is_valid(value)])
    ratio = filled / len(fields)
    return round(ratio * 100)
def _resolve_path(raw: str) -> Path:
    """Return *raw* as a Path, anchoring relative paths at ROOT."""
    candidate = Path(raw)
    return candidate if candidate.is_absolute() else ROOT / candidate


def _as_dict(value: Any) -> dict[str, Any]:
    """Return *value* if it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _first_dict(value: Any) -> dict[str, Any]:
    """Return the first element of a non-empty list if it is a dict, else ``{}``."""
    if isinstance(value, list) and value and isinstance(value[0], dict):
        return value[0]
    return {}


def main(argv: list[str] | None = None) -> int:
    """Recompute the data-quality gate and write the result JSON.

    Reads the gathered trading payload (``--json``) and the predictive-alpha
    output (``--pa``), scores the five data-present categories by fill rate,
    flags the three sample-dependent categories as PENDING/READY, and writes
    the summary to ``--out``. Relative paths are resolved against ROOT.

    Sections with an unexpected shape (for example a ``*_json`` field that
    failed to parse and is still a string, or a list where an object is
    expected) are treated as missing data instead of raising, so the gate
    always produces a report.

    Args:
        argv: Command-line arguments without the program name. ``None``
            (the default) makes argparse read ``sys.argv[1:]``.

    Returns:
        Always 0; the value is used as the process exit status.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--json", default=str(DEFAULT_JSON))
    ap.add_argument("--pa", default=str(DEFAULT_PA))
    ap.add_argument("--out", default=str(DEFAULT_OUT))
    args = ap.parse_args(argv)

    jp = _resolve_path(args.json)
    pap = _resolve_path(args.pa)
    op = _resolve_path(args.out)

    # Guard the top-level shape here as well, so .get() below is always safe.
    payload = _as_dict(_load(jp))
    hctx = _merged_hctx(payload)
    pa = _as_dict(_load(pap))

    # -- Data-present categories (recomputed with the correct keys) ----------
    pa0 = _first_dict(pa.get("rows"))
    prediction_fields = [
        pa0.get("thesis_score"),
        pa0.get("antithesis_score"),
        pa0.get("synthesis_verdict"),
        pa0.get("direction_confidence"),
    ]

    cash_shortfall = _gj(hctx, "cash_shortfall_json")
    # Prefer the value inside the JSON blob; fall back to the flat context key.
    cash_shortfall_val = (
        cash_shortfall.get("cash_shortfall_min_krw")
        if isinstance(cash_shortfall, dict)
        else hctx.get("cash_shortfall_min_krw")
    )
    cash_fields = [
        hctx.get("settlement_cash_d2_krw"),
        hctx.get("cash_floor_status"),
        cash_shortfall_val,
    ]

    cluster = _as_dict(_gj(hctx, "semiconductor_cluster_json"))
    cluster_fields = [cluster.get("cluster_state"), cluster.get("combined_pct")]

    pp0 = _first_dict(_gj(hctx, "profit_preservation_json"))
    stop_loss_fields = [
        pp0.get("protected_stop_price"),
        pp0.get("auto_trailing_stop"),
        pp0.get("profit_preservation_state"),
    ]

    scrs = _as_dict(_gj(hctx, "scrs_v2_json"))
    combo0 = _first_dict(scrs.get("selected_combo"))
    sell_engine_fields = [
        scrs.get("emergency_level"),
        combo0.get("immediate_qty"),
        combo0.get("rebound_wait_qty"),
    ]

    data_scores = {
        "prediction": _fill_rate(prediction_fields),
        "cash": _fill_rate(cash_fields),
        "cluster": _fill_rate(cluster_fields),
        "stop_loss": _fill_rate(stop_loss_fields),
        "sell_engine": _fill_rate(sell_engine_fields),
    }

    # -- Sample-PENDING categories (need accumulated samples; excluded from
    #    the data-quality denominator) ---------------------------------------
    tq = _as_dict(_gj(hctx, "trade_quality_report_json"))
    tq_records = tq.get("records") or []
    alpha_hist = _as_dict(_gj(hctx, "alpha_history_summary_json"))
    acc_rate = alpha_hist.get("prediction_accuracy_rate")
    pattern = _gj(hctx, "pattern_blacklist_auto_json")
    pending_status = {
        "trade_quality": "PENDING" if not tq_records else "READY",
        "alpha_eval": "PENDING" if not _is_valid(acc_rate) else "READY",
        "pattern": "PENDING" if not isinstance(pattern, dict) or not pattern.get("status") else "READY",
    }

    # -- overall = mean of the data-present categories (eval kept separate) --
    data_vals = list(data_scores.values())
    overall = round(sum(data_vals) / len(data_vals)) if data_vals else 0
    grade = "COMPLETE" if overall >= 90 else "PARTIAL" if overall >= 60 else "INSUFFICIENT"

    # category_scores: numeric score for data categories, status string
    # ("PENDING"/"READY") for the sample-dependent ones.
    category_scores: dict[str, Any] = dict(data_scores)
    category_scores.update(pending_status)
    pending_list = [c for c, s in pending_status.items() if s == "PENDING"]

    result = {
        "formula_id": "DATA_QUALITY_GATE_V2_PY",
        "authoritative_over": "GAS calcDataQualityGateV2_ (field-path bug fix)",
        "overall_completeness_pct": overall,
        "completeness_grade": grade,
        "data_category_scores": data_scores,
        "category_scores": category_scores,
        "pending_categories": pending_list,
        "pending_status": pending_status,
        "denominator_note": "overall = 데이터-존재 카테고리 평균. trade_quality/alpha_eval/pattern은 "
        "표본 누적 필요 → PENDING(분모 제외). 거짓 0% AND 과대 0%.",
        "numeric_generation_allowed": 0,
    }

    op.parent.mkdir(parents=True, exist_ok=True)
    op.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")
    print(
        f"DATA_QUALITY_GATE_V2_PY overall={overall}% grade={grade} "
        f"data_scores={data_scores} pending={pending_list}"
    )
    return 0
# Script entry point: run main() and exit with the status it returns.
if __name__ == "__main__":
    exit_status = main()
    raise SystemExit(exit_status)