ee3e799de1
주요 변경: - tools/build_rebalance_engine_v1.py: REBALANCE_ENGINE_V1 신규 * account_snapshot 직접 합산(_build_snap_position_map) → 소수주 분리 행 병합 * 레짐 소스 macro.REGIME_PRELIM 최우선 (GAS 와 동일) - src/gas_adapter_parts/gdf_06_rebalance.gs: runRebalanceSheet_() 신규 * Logger.log / getSpreadsheet_() 로 run_all 연동 수정 - src/gas_adapter_parts/gdc_01_fetch_fundamentals.gs * _mergePositionRecord_(): 소수주 중복 행 합산 신규 * parseInt → parseFloat (qty, availQty) - src/gas_adapter_parts/gdf_01_price_metrics.gs * 미보유 종목 SELL_READY → WATCH_EXIT_SIGNAL - spec/41_release_dag.yaml: build_rebalance_sheet 노드 추가 (step_count 63) - spec/51_formula_lifecycle_registry.yaml: REBALANCE_ENGINE_V1 등록 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
175 lines
6.9 KiB
Python
175 lines
6.9 KiB
Python
from __future__ import annotations
|
|
"""DATA_QUALITY_GATE_V2_PY — GAS calcDataQualityGateV2_의 Python authoritative 재산출.
|
|
|
|
근거: GAS 원본(gas_data_feed.gs:8643)이 필드경로 버그로 실재 데이터를 0으로 깐다(false-negative).
|
|
정공법: 동일 8개 카테고리를 GatherTradingData.json에서 올바른 키로 결정론 재산출한다.
|
|
|
|
핵심 원칙 (거짓 금지 AND 과대 금지):
|
|
- 데이터-존재 카테고리(prediction/cash/cluster/stop_loss/sell_engine): 실데이터 fill rate로 채점.
|
|
- 표본-PENDING 카테고리(trade_quality/alpha_eval/pattern): 실제 평가 표본 누적 필요 → 0이 아니라 PENDING.
|
|
데이터 품질 분모에서 제외(과대 방지). 성과축에서 별도 PENDING 표기.
|
|
- overall_completeness_pct = 데이터-존재 카테고리 평균. 성과(eval)와 데이터품질을 분리.
|
|
"""
|
|
import argparse
|
|
import json
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
ROOT = Path(__file__).resolve().parents[1]
|
|
DEFAULT_JSON = ROOT / "GatherTradingData.json"
|
|
DEFAULT_PA = ROOT / "Temp" / "predictive_alpha_engine_v2.json"
|
|
DEFAULT_OUT = ROOT / "Temp" / "data_quality_gate_v2_py.json"
|
|
|
|
# 데이터-존재 카테고리 vs 표본-PENDING 카테고리
|
|
DATA_CATEGORIES = ["prediction", "cash", "cluster", "stop_loss", "sell_engine"]
|
|
PENDING_CATEGORIES = ["trade_quality", "alpha_eval", "pattern"]
|
|
|
|
|
|
def _load(path: Path) -> dict[str, Any]:
|
|
if not path.exists():
|
|
return {}
|
|
try:
|
|
return json.loads(path.read_text(encoding="utf-8"))
|
|
except Exception:
|
|
return {}
|
|
|
|
|
|
def _merged_hctx(payload: dict[str, Any]) -> dict[str, Any]:
|
|
data = payload.get("data") if isinstance(payload.get("data"), dict) else {}
|
|
hctx = data.get("_harness_context") if isinstance(data.get("_harness_context"), dict) else {}
|
|
merged = dict(hctx)
|
|
if isinstance(payload.get("hApex"), dict):
|
|
merged.update(payload["hApex"])
|
|
return merged
|
|
|
|
|
|
def _gj(hctx: dict[str, Any], key: str) -> Any:
|
|
"""harness_context의 *_json 필드를 dict/list로 파싱."""
|
|
v = hctx.get(key)
|
|
if isinstance(v, str):
|
|
try:
|
|
return json.loads(v)
|
|
except Exception:
|
|
return v
|
|
return v
|
|
|
|
|
|
def _is_valid(v: Any) -> bool:
|
|
return v is not None and v not in ("-", "PENDING", "", "null")
|
|
|
|
|
|
def _fill_rate(fields: list[Any]) -> int:
|
|
if not fields:
|
|
return 0
|
|
filled = sum(1 for f in fields if _is_valid(f))
|
|
return round(filled / len(fields) * 100)
|
|
|
|
|
|
def main() -> int:
|
|
ap = argparse.ArgumentParser()
|
|
ap.add_argument("--json", default=str(DEFAULT_JSON))
|
|
ap.add_argument("--pa", default=str(DEFAULT_PA))
|
|
ap.add_argument("--out", default=str(DEFAULT_OUT))
|
|
args = ap.parse_args()
|
|
|
|
jp = Path(args.json) if Path(args.json).is_absolute() else ROOT / args.json
|
|
pap = Path(args.pa) if Path(args.pa).is_absolute() else ROOT / args.pa
|
|
op = Path(args.out) if Path(args.out).is_absolute() else ROOT / args.out
|
|
|
|
payload = _load(jp)
|
|
hctx = _merged_hctx(payload)
|
|
pa = _load(pap)
|
|
|
|
# ── 데이터-존재 카테고리 (올바른 키로 재산출) ──────────────────────────
|
|
pa_rows = pa.get("rows") if isinstance(pa.get("rows"), list) else []
|
|
pa0 = pa_rows[0] if pa_rows else {}
|
|
prediction_fields = [
|
|
pa0.get("thesis_score"), pa0.get("antithesis_score"),
|
|
pa0.get("synthesis_verdict"), pa0.get("direction_confidence"),
|
|
]
|
|
|
|
cash_shortfall = _gj(hctx, "cash_shortfall_json")
|
|
cash_shortfall_val = (
|
|
cash_shortfall.get("cash_shortfall_min_krw") if isinstance(cash_shortfall, dict)
|
|
else hctx.get("cash_shortfall_min_krw")
|
|
)
|
|
cash_fields = [
|
|
hctx.get("settlement_cash_d2_krw"), hctx.get("cash_floor_status"), cash_shortfall_val,
|
|
]
|
|
|
|
cluster = _gj(hctx, "semiconductor_cluster_json") or {}
|
|
cluster_fields = [cluster.get("cluster_state"), cluster.get("combined_pct")]
|
|
|
|
pp = _gj(hctx, "profit_preservation_json")
|
|
pp0 = pp[0] if isinstance(pp, list) and pp else {}
|
|
stop_loss_fields = [
|
|
pp0.get("protected_stop_price"), pp0.get("auto_trailing_stop"),
|
|
pp0.get("profit_preservation_state"),
|
|
]
|
|
|
|
scrs = _gj(hctx, "scrs_v2_json") or {}
|
|
combo = scrs.get("selected_combo") or []
|
|
combo0 = combo[0] if combo else {}
|
|
sell_engine_fields = [
|
|
scrs.get("emergency_level"), combo0.get("immediate_qty"), combo0.get("rebound_wait_qty"),
|
|
]
|
|
|
|
data_scores = {
|
|
"prediction": _fill_rate(prediction_fields),
|
|
"cash": _fill_rate(cash_fields),
|
|
"cluster": _fill_rate(cluster_fields),
|
|
"stop_loss": _fill_rate(stop_loss_fields),
|
|
"sell_engine": _fill_rate(sell_engine_fields),
|
|
}
|
|
|
|
# ── 표본-PENDING 카테고리 (실표본 누적 필요 → 데이터품질 분모 제외) ────
|
|
tq = _gj(hctx, "trade_quality_report_json") or {}
|
|
tq_records = tq.get("records") or []
|
|
alpha_hist = _gj(hctx, "alpha_history_summary_json") or {}
|
|
acc_rate = alpha_hist.get("prediction_accuracy_rate")
|
|
pattern = _gj(hctx, "pattern_blacklist_auto_json")
|
|
|
|
pending_status = {
|
|
"trade_quality": "PENDING" if not tq_records else "READY",
|
|
"alpha_eval": "PENDING" if not _is_valid(acc_rate) else "READY",
|
|
"pattern": "PENDING" if not isinstance(pattern, dict) or not pattern.get("status") else "READY",
|
|
}
|
|
|
|
# ── overall = 데이터-존재 카테고리 평균 (성과/eval 분리) ───────────────
|
|
data_vals = list(data_scores.values())
|
|
overall = round(sum(data_vals) / len(data_vals)) if data_vals else 0
|
|
grade = "COMPLETE" if overall >= 90 else "PARTIAL" if overall >= 60 else "INSUFFICIENT"
|
|
|
|
# category_scores: 데이터 카테고리는 점수, PENDING 카테고리는 'PENDING' 문자열
|
|
category_scores: dict[str, Any] = dict(data_scores)
|
|
for cat, st in pending_status.items():
|
|
category_scores[cat] = st
|
|
|
|
pending_list = [c for c, s in pending_status.items() if s == "PENDING"]
|
|
|
|
result = {
|
|
"formula_id": "DATA_QUALITY_GATE_V2_PY",
|
|
"authoritative_over": "GAS calcDataQualityGateV2_ (field-path bug fix)",
|
|
"overall_completeness_pct": overall,
|
|
"completeness_grade": grade,
|
|
"data_category_scores": data_scores,
|
|
"category_scores": category_scores,
|
|
"pending_categories": pending_list,
|
|
"pending_status": pending_status,
|
|
"denominator_note": "overall = 데이터-존재 카테고리 평균. trade_quality/alpha_eval/pattern은 "
|
|
"표본 누적 필요 → PENDING(분모 제외). 거짓 0% AND 과대 0%.",
|
|
"numeric_generation_allowed": 0,
|
|
}
|
|
|
|
op.parent.mkdir(parents=True, exist_ok=True)
|
|
op.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")
|
|
print(
|
|
f"DATA_QUALITY_GATE_V2_PY overall={overall}% grade={grade} "
|
|
f"data_scores={data_scores} pending={pending_list}"
|
|
)
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|