Files
QuantEngineByItz/tools/build_imputed_data_exposure_gate_v2.py
kjh2064 ee3e799de1 feat: 리밸런싱 엔진 V1 + GAS 버그 수정 (2026-06-13)
주요 변경:
- tools/build_rebalance_engine_v1.py: REBALANCE_ENGINE_V1 신규
  * account_snapshot 직접 합산(_build_snap_position_map) → 소수주 분리 행 병합
  * 레짐 소스 macro.REGIME_PRELIM 최우선 (GAS 와 동일)
- src/gas_adapter_parts/gdf_06_rebalance.gs: runRebalanceSheet_() 신규
  * Logger.log / getSpreadsheet_() 로 run_all 연동 수정
- src/gas_adapter_parts/gdc_01_fetch_fundamentals.gs
  * _mergePositionRecord_(): 소수주 중복 행 합산 신규
  * parseInt → parseFloat (qty, availQty)
- src/gas_adapter_parts/gdf_01_price_metrics.gs
  * 미보유 종목 SELL_READY → WATCH_EXIT_SIGNAL
- spec/41_release_dag.yaml: build_rebalance_sheet 노드 추가 (step_count 63)
- spec/51_formula_lifecycle_registry.yaml: REBALANCE_ENGINE_V1 등록

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-13 13:20:14 +09:00

202 lines
8.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import argparse
import json
import sys
from pathlib import Path
from typing import Any
# Directory one level above the directory that contains this file.
ROOT = Path(__file__).resolve().parents[1]
# Default locations of the payload input, the engine-audit input and the gate
# output; used as the CLI argument defaults in main().
DEFAULT_JSON = ROOT / "GatherTradingData.json"
DEFAULT_ENGINE_AUDIT = ROOT / "Temp" / "engine_audit_v1.json"
DEFAULT_OUT = ROOT / "Temp" / "imputed_data_exposure_gate_v2.json"
# Identifier emitted as "formula_id" in the gate result.
FORMULA_ID = "IMPUTED_DATA_EXPOSURE_GATE_V2"
# imputed_field_ratio >= BLOCK_RATIO yields gate_status "IMPUTED_DATA_BLOCK".
BLOCK_RATIO = 0.50
# imputed_field_ratio >= WARN_RATIO (but below BLOCK_RATIO) yields "IMPUTED_DATA_WARN".
WARN_RATIO = 0.25
# Minimum fundamental_core coverage required for fundamental_claim_allowed and
# (together with a positive t20 sample) long_horizon_allowed.
FUND_FACTOR_MIN_COVERAGE = 0.50
# Per-domain weights used when weighted_coverage has to be derived from
# domain_coverage; the weights sum to 1.0.
DOMAIN_WEIGHTS = {
    "fundamental_core": 0.30,
    "realized_outcome": 0.30,
    "trade_quality": 0.15,
    "pattern": 0.10,
    "alpha_eval": 0.15,
}
# Force UTF-8 on stdout when the console uses another encoding, so the JSON
# printed by main() (ensure_ascii=False) cannot fail to encode.
if sys.stdout.encoding and sys.stdout.encoding.lower() not in ("utf-8", "utf8"):
    if hasattr(sys.stdout, "reconfigure"):
        # Re-encode the existing stream in place instead of opening a second
        # wrapper on the same file descriptor; line buffering matches the
        # buffering=1 used by the fallback below.
        sys.stdout.reconfigure(encoding="utf-8", line_buffering=True)
    else:
        # stdout was replaced by an object without reconfigure(); fall back to
        # wrapping its file descriptor in a new UTF-8 text stream.
        sys.stdout = open(sys.stdout.fileno(), mode="w", encoding="utf-8", buffering=1)
def _load_json(path: Path) -> dict[str, Any]:
    """Read *path* as UTF-8 JSON and return it when the top level is an object.

    Returns an empty dict when the file does not exist, cannot be read or
    parsed, or when the parsed document is not a dict.
    """
    if path.exists():
        try:
            parsed = json.loads(path.read_text(encoding="utf-8"))
        except Exception:
            # Best-effort loader: any read/parse failure is reported as "no data".
            return {}
        if isinstance(parsed, dict):
            return parsed
    return {}
def _as_float(value: Any, default: float | None = None) -> float | None:
    """Convert *value* with ``float()``; return *default* if the conversion raises."""
    try:
        converted = float(value)
    except Exception:
        converted = default
    return converted
def _extract_harness_root(payload: dict[str, Any]) -> dict[str, Any]:
    """Pick the harness-context dict out of *payload*.

    Two candidates are considered: ``payload["hApex"]`` and
    ``payload["data"]["_harness_context"]``. When both are dicts a merged copy
    is returned in which ``hApex`` entries override the nested ones; when only
    one is a dict that object is returned; otherwise *payload* itself is
    returned.
    """
    top_level = payload.get("hApex")
    data_section = payload.get("data")
    nested = data_section.get("_harness_context") if isinstance(data_section, dict) else None

    has_top_level = isinstance(top_level, dict)
    has_nested = isinstance(nested, dict)

    if has_top_level and has_nested:
        # Nested context first, then hApex on top so hApex wins on key clashes.
        return {**nested, **top_level}
    if has_top_level:
        return top_level
    if has_nested:
        return nested
    return payload
def build_gate(payload: dict[str, Any], audit: dict[str, Any]) -> dict[str, Any]:
    """Build the imputed-data exposure gate result.

    Metrics are read from ``audit["imputed_data_exposure"]``. Any metric that
    is absent or not convertible to float is derived from the per-domain
    coverage map (``domain_coverage``) and from the harness context extracted
    from ``payload``.

    Args:
        payload: Parsed payload JSON; only its harness context (see
            ``_extract_harness_root``) is read, for
            ``confidence_cap_basis_score`` and ``t20_operational_sample``.
        audit: Parsed engine audit; ``imputed_data_exposure`` and
            ``report_render_skew`` are read from it.

    Returns:
        A dict with the gate status (``IMPUTED_DATA_BLOCK`` when
        ``imputed_field_ratio >= BLOCK_RATIO``, ``IMPUTED_DATA_WARN`` when it
        is ``>= WARN_RATIO``, otherwise ``PASS``), the coverage metrics, the
        confidence figures, the permission flags, the exposure reasons, the
        thresholds, the formula text and the default source paths.
    """
    hctx = _extract_harness_root(payload)

    exposure = audit.get("imputed_data_exposure")
    if not isinstance(exposure, dict):
        exposure = {}
    domain_coverage = exposure.get("domain_coverage")
    if not isinstance(domain_coverage, dict):
        domain_coverage = {}

    # Coverage per known domain. Missing, empty or non-numeric entries (for
    # example a "not_available" marker) count as 0.0 instead of raising.
    coverage_by_domain = {
        key: _as_float(domain_coverage.get(key), 0.0) or 0.0
        for key in DOMAIN_WEIGHTS
    }

    weighted_coverage = _as_float(exposure.get("weighted_coverage"))
    imputed_field_ratio = _as_float(exposure.get("imputed_field_ratio"))
    effective_confidence_honest = _as_float(exposure.get("effective_confidence_honest"))
    raw_confidence_cap_basis = _as_float(exposure.get("raw_confidence_cap_basis"))
    confidence_cap_inflation_gap = _as_float(exposure.get("confidence_cap_inflation_gap"))
    fundamental_core_factor_coverage = _as_float(exposure.get("fundamental_core_factor_coverage"))
    fundamental_missing_ratio = _as_float(exposure.get("fundamental_missing_ratio"))
    surrogate_outcome_ratio = _as_float(exposure.get("surrogate_outcome_ratio"))

    # Derive whatever the audit did not supply.
    if weighted_coverage is None:
        weighted_coverage = 0.0
        for key, weight in DOMAIN_WEIGHTS.items():
            weighted_coverage += weight * coverage_by_domain[key]
        weighted_coverage = round(weighted_coverage, 4)
    if imputed_field_ratio is None:
        imputed_field_ratio = round(1.0 - weighted_coverage, 4)
    if raw_confidence_cap_basis is None:
        raw_confidence_cap_basis = _as_float(hctx.get("confidence_cap_basis_score"), 0.0)
    if effective_confidence_honest is None and raw_confidence_cap_basis is not None:
        effective_confidence_honest = round(raw_confidence_cap_basis * (0.4 + 0.6 * weighted_coverage), 1)
    if (
        confidence_cap_inflation_gap is None
        and raw_confidence_cap_basis is not None
        and effective_confidence_honest is not None
    ):
        confidence_cap_inflation_gap = round(raw_confidence_cap_basis - effective_confidence_honest, 1)
    if fundamental_core_factor_coverage is None:
        fundamental_core_factor_coverage = _as_float(domain_coverage.get("fundamental_core"), 0.0)
    if fundamental_missing_ratio is None:
        fundamental_missing_ratio = round(max(0.0, 1.0 - (fundamental_core_factor_coverage or 0.0)), 4)
    if surrogate_outcome_ratio is None:
        surrogate_outcome_ratio = round(
            max(0.0, 1.0 - _as_float(domain_coverage.get("realized_outcome"), 0.0)), 4
        )

    if imputed_field_ratio >= BLOCK_RATIO:
        gate_status = "IMPUTED_DATA_BLOCK"
    elif imputed_field_ratio >= WARN_RATIO:
        gate_status = "IMPUTED_DATA_WARN"
    else:
        gate_status = "PASS"

    t20_sample = _as_float(hctx.get("t20_operational_sample"), 0.0) or 0.0
    fundamental_claim_allowed = bool((fundamental_core_factor_coverage or 0.0) >= FUND_FACTOR_MIN_COVERAGE)
    long_horizon_allowed = bool(t20_sample > 0 and fundamental_claim_allowed)

    exposure_reasons: list[str] = []
    if fundamental_core_factor_coverage is not None and fundamental_core_factor_coverage < FUND_FACTOR_MIN_COVERAGE:
        exposure_reasons.append(
            "FUNDAMENTAL_CORE_FACTORS_MISSING: "
            f"coverage={fundamental_core_factor_coverage:.2f}"
        )
    if t20_sample <= 0:
        exposure_reasons.append("REALIZED_OUTCOME_T20_ZERO: t20_sample=0")
    if confidence_cap_inflation_gap is not None and confidence_cap_inflation_gap > 0:
        exposure_reasons.append(
            "CONFIDENCE_CAP_INFLATED: "
            f"reported={raw_confidence_cap_basis} honest={effective_confidence_honest} gap={confidence_cap_inflation_gap}"
        )

    # Share of known domains whose coverage is below 0.5. Counting over
    # DOMAIN_WEIGHTS keeps the ratio within [0, 1]: domains absent from the
    # audit count as imputed, and keys outside DOMAIN_WEIGHTS are ignored.
    if DOMAIN_WEIGHTS:
        low_coverage_domains = sum(1 for value in coverage_by_domain.values() if value < 0.5)
        imputed_domain_ratio = round(low_coverage_domains / len(DOMAIN_WEIGHTS), 4)
    else:
        imputed_domain_ratio = 1.0

    skew = audit.get("report_render_skew")
    if isinstance(skew, dict):
        report_render_skew = {
            "report_dqg_completeness_pct": skew.get("report_dqg_completeness_pct"),
            "authoritative_dqg_completeness_pct": skew.get("authoritative_dqg_completeness_pct"),
            "skew_detected": bool(skew.get("skew_detected")),
        }
    else:
        report_render_skew = {
            "report_dqg_completeness_pct": "not_available",
            "authoritative_dqg_completeness_pct": "not_available",
            "skew_detected": False,
        }

    result = {
        "formula_id": FORMULA_ID,
        "gate_status": gate_status,
        "imputed_field_ratio": round(imputed_field_ratio, 4),
        "imputed_domain_ratio": imputed_domain_ratio,
        "weighted_coverage": round(weighted_coverage, 4),
        "domain_coverage": {
            key: round(value, 4) for key, value in coverage_by_domain.items()
        },
        "fundamental_core_factor_coverage": round(fundamental_core_factor_coverage or 0.0, 4),
        "fundamental_missing_ratio": round(fundamental_missing_ratio or 0.0, 4),
        "surrogate_outcome_ratio": round(surrogate_outcome_ratio or 0.0, 4),
        "raw_confidence_cap_basis": raw_confidence_cap_basis,
        "effective_confidence_honest": effective_confidence_honest,
        "confidence_cap_inflation_gap": confidence_cap_inflation_gap,
        "long_horizon_allowed": long_horizon_allowed,
        "fundamental_claim_allowed": fundamental_claim_allowed,
        "report_render_skew": report_render_skew,
        "exposure_reasons": exposure_reasons,
        "thresholds": {
            "block_ratio": BLOCK_RATIO,
            "warn_ratio": WARN_RATIO,
            "fund_factor_min_coverage": FUND_FACTOR_MIN_COVERAGE,
        },
        "formula": (
            "weighted_coverage = Σ(weight_d × coverage_d); "
            "imputed_field_ratio = 1 - weighted_coverage; "
            "effective_confidence_honest = raw_cap × (0.4 + 0.6 × weighted_coverage)"
        ),
        # This function does not know which files were read, so it records the
        # module default paths.
        "source": {
            "payload_path": str(DEFAULT_JSON),
            "engine_audit_path": str(DEFAULT_ENGINE_AUDIT),
        },
    }
    return result
def main() -> int:
    """Command-line entry point: build the gate and write it as JSON.

    Reads the payload (``--json``) and engine audit (``--audit``), builds the
    gate with ``build_gate``, writes the result to ``--out`` (creating parent
    directories) and prints the same JSON to stdout. Relative paths are
    resolved against ``ROOT``.

    Returns:
        Always 0; missing or unreadable inputs are treated as empty documents
        by ``_load_json``.
    """
    parser = argparse.ArgumentParser(description="Build imputed data exposure gate from engine audit artifacts.")
    parser.add_argument("--json", default=str(DEFAULT_JSON))
    parser.add_argument("--audit", default=str(DEFAULT_ENGINE_AUDIT))
    parser.add_argument("--out", default=str(DEFAULT_OUT))
    args = parser.parse_args()

    def _resolve(raw: str) -> Path:
        # Anchor relative paths at the repository root, not the current directory.
        candidate = Path(raw)
        return candidate if candidate.is_absolute() else ROOT / candidate

    json_path = _resolve(args.json)
    audit_path = _resolve(args.audit)
    out_path = _resolve(args.out)

    result = build_gate(_load_json(json_path), _load_json(audit_path))
    # build_gate fills "source" with the module default paths; record the
    # files that were actually read so the artifact's provenance is accurate
    # when --json / --audit are overridden.
    result["source"] = {
        "payload_path": str(json_path),
        "engine_audit_path": str(audit_path),
    }

    # Serialise once so the file and stdout are guaranteed to match.
    rendered = json.dumps(result, ensure_ascii=False, indent=2)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(rendered, encoding="utf-8")
    print(rendered)
    return 0
# When run as a script, execute the CLI and use main()'s return value as the
# process exit status.
if __name__ == "__main__":
    raise SystemExit(main())