ee3e799de1
주요 변경: - tools/build_rebalance_engine_v1.py: REBALANCE_ENGINE_V1 신규 * account_snapshot 직접 합산(_build_snap_position_map) → 소수주 분리 행 병합 * 레짐 소스 macro.REGIME_PRELIM 최우선 (GAS 와 동일) - src/gas_adapter_parts/gdf_06_rebalance.gs: runRebalanceSheet_() 신규 * Logger.log / getSpreadsheet_() 로 run_all 연동 수정 - src/gas_adapter_parts/gdc_01_fetch_fundamentals.gs * _mergePositionRecord_(): 소수주 중복 행 합산 신규 * parseInt → parseFloat (qty, availQty) - src/gas_adapter_parts/gdf_01_price_metrics.gs * 미보유 종목 SELL_READY → WATCH_EXIT_SIGNAL - spec/41_release_dag.yaml: build_rebalance_sheet 노드 추가 (step_count 63) - spec/51_formula_lifecycle_registry.yaml: REBALANCE_ENGINE_V1 등록 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
375 lines
17 KiB
Python
375 lines
17 KiB
Python
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
|
|
# Repository root: the directory one level above the folder holding this file.
ROOT = Path(__file__).resolve().parents[1]

# Default input artifacts; each can be overridden by a CLI option in main().
DEFAULT_JSON = ROOT.joinpath("GatherTradingData.json")
DEFAULT_REPORT = ROOT.joinpath("Temp", "operational_report.json")
DEFAULT_DQR = ROOT.joinpath("Temp", "data_quality_reconciliation_v1.json")
DEFAULT_FJ = ROOT.joinpath("Temp", "final_judgment_gate_v1.json")
DEFAULT_SCR = ROOT.joinpath("Temp", "smart_cash_recovery_v5.json")
DEFAULT_HARDENING = ROOT.joinpath("Temp", "strategy_hardening_harness_v2.json")
DEFAULT_OUTCOME = ROOT.joinpath("Temp", "operational_outcome_lock_v1.json")
DEFAULT_ALPHA = ROOT.joinpath("Temp", "operational_alpha_calibration_v2.json")

# Default location of the JSON result written by main().
DEFAULT_OUT = ROOT.joinpath("Temp", "operational_truth_score_v1.json")
|
|
|
|
|
|
def _load(path: Path) -> dict[str, Any]:
    """Read a JSON file and return its top-level object.

    An empty dict is returned when the file does not exist, when reading or
    parsing raises, or when the parsed top-level value is not a dict.
    """
    if path.exists():
        try:
            parsed = json.loads(path.read_text(encoding="utf-8"))
        except Exception:
            # Best-effort loader: any read/parse failure counts as "no data".
            parsed = None
        if isinstance(parsed, dict):
            return parsed
    return {}
|
|
|
|
|
|
def _as_float(value: Any, default: float = 0.0) -> float:
    """Coerce *value* with ``float()``; return *default* if the conversion raises."""
    try:
        number = float(value)
    except Exception:
        return default
    return number
|
|
|
|
|
|
def _as_int(value: Any, default: int = 0) -> int:
    """Coerce *value* to ``int`` by way of ``float``; return *default* on failure.

    Going through ``float`` lets numeric strings such as ``"3.9"`` be accepted;
    the fractional part is truncated toward zero by ``int()``.
    """
    try:
        truncated = int(float(value))
    except Exception:
        truncated = default
    return truncated
|
|
|
|
|
|
def _as_dict(value: Any) -> dict[str, Any]:
    """Return *value* as a dict, decoding it from a JSON string when needed.

    A dict is returned unchanged. A non-blank string is parsed as JSON and
    returned only if it decodes to a dict. Everything else yields ``{}``.
    """
    if isinstance(value, dict):
        return value
    if not isinstance(value, str) or not value.strip():
        return {}
    try:
        decoded = json.loads(value)
    except Exception:
        return {}
    return decoded if isinstance(decoded, dict) else {}
|
|
|
|
|
|
def _extract_harness_root(payload: dict[str, Any]) -> dict[str, Any]:
    """Pick the harness context dict out of *payload*.

    Two candidate locations are inspected: ``payload["hApex"]`` and
    ``payload["data"]["_harness_context"]``. When both are dicts they are
    merged with ``hApex`` keys taking precedence; when only one is a dict it
    is returned as-is; otherwise *payload* itself is returned.
    """
    apex = payload.get("hApex")
    data = payload.get("data")
    context = data.get("_harness_context") if isinstance(data, dict) else None

    apex_ok = isinstance(apex, dict)
    context_ok = isinstance(context, dict)

    if apex_ok and context_ok:
        # Later unpacking wins, so hApex overrides the nested context.
        return {**context, **apex}
    if apex_ok:
        return apex
    if context_ok:
        return context
    return payload
|
|
|
|
|
|
def _score_from_span(primary: float, secondary: float) -> float:
    """Return 100 minus the absolute gap between the two values.

    The result is floored at 0.0 and rounded to two decimals.
    """
    span = abs(primary - secondary)
    clamped = max(0.0, 100.0 - span)
    return round(clamped, 2)
|
|
|
|
|
|
def main() -> int:
    """Compute the OPERATIONAL_TRUTH_SCORE_V1 result and write it as JSON.

    Loads the JSON artifacts named by the CLI options (relative paths are
    resolved against ``ROOT``), derives five sub-scores (data truth, decision
    truth, execution truth, performance readiness, report consistency),
    combines them with weights 0.25 / 0.20 / 0.20 / 0.20 / 0.15, collects
    blocking reasons, selects a gate, applies a hysteresis band against the
    previously written result, and finally writes the result to ``--out`` and
    prints it to stdout.

    Returns:
        ``0`` always; intended to be used as the process exit code.
    """
    # Imported locally because ``re`` is not among the module-level imports.
    import re

    ap = argparse.ArgumentParser()
    ap.add_argument("--json", default=str(DEFAULT_JSON))
    ap.add_argument("--report", default=str(DEFAULT_REPORT))
    ap.add_argument("--dq", default=str(DEFAULT_DQR))
    ap.add_argument("--fj", default=str(DEFAULT_FJ))
    ap.add_argument("--scr", default=str(DEFAULT_SCR))
    ap.add_argument("--hardening", default=str(DEFAULT_HARDENING))
    ap.add_argument("--outcome", default=str(DEFAULT_OUTCOME))
    ap.add_argument("--alpha", default=str(DEFAULT_ALPHA))
    ap.add_argument("--out", default=str(DEFAULT_OUT))
    args = ap.parse_args()

    def _rp(path_str: str) -> Path:
        """Resolve a CLI path; relative paths are anchored at ROOT."""
        path = Path(path_str)
        return path if path.is_absolute() else ROOT / path

    # ---- Load inputs (missing or invalid files become empty dicts) ----
    payload = _load(_rp(args.json))
    report = _load(_rp(args.report))
    hctx = _extract_harness_root(payload)
    dqr = _load(_rp(args.dq))
    fj = _load(_rp(args.fj))
    scr = _load(_rp(args.scr))
    hardening = _load(_rp(args.hardening))
    outcome = _load(_rp(args.outcome))
    alpha = _load(_rp(args.alpha))
    summary = report.get("summary") if isinstance(report.get("summary"), dict) else {}
    sections = report.get("sections") if isinstance(report.get("sections"), list) else []
    section_names = {str(s.get("name") or "") for s in sections if isinstance(s, dict)}

    # ---- Data-quality basis ----
    schema = _as_float(dqr.get("schema_presence_score"))
    modern = _as_float(dqr.get("modern_investment_quality_score"))
    legacy = _as_float(dqr.get("legacy_investment_quality_score"))
    invest_score = _as_float(dqr.get("investment_quality_score"))
    # Fallback cap basis: the lower of modern/legacy, each defaulting to the
    # generic investment score when it is zero.
    cap_basis = _as_float(
        dqr.get("confidence_cap_basis_score"),
        min(modern or invest_score, legacy or invest_score),
    )
    quality_gap = max(0.0, modern - cap_basis)
    quality_conflict = bool(dqr.get("quality_conflict_flag"))

    # ---- Final-judgment gate basis ----
    fj_gate = str(fj.get("gate") or "MISSING")
    fj_coverage = _as_float(fj.get("coverage_pct"))
    fj_silent = _as_int(fj.get("silent_pass_violations"))
    fj_late = len(fj.get("late_chase_buy_violations") or [])

    # ---- Export gate / cash recovery basis ----
    export_gate = _as_dict(hctx.get("export_gate_json"))
    export_status = str(
        _first_non_null(
            export_gate.get("json_validation_status"),
            hctx.get("json_validation_status"),
            summary.get("json_validation_status"),
        )
        or "UNKNOWN"
    )
    export_allowed = export_gate.get("hts_entry_allowed")
    execution_allowed = bool(scr.get("execution_allowed"))
    cash_status = str(scr.get("status") or "UNKNOWN")
    cash_damage = _as_float(scr.get("value_damage_pct_avg"))

    # ---- Hardening / outcome / alpha basis ----
    hardening_meta = hardening.get("meta_scores") if isinstance(hardening.get("meta_scores"), dict) else {}
    hardening_overall = _as_float(hardening_meta.get("overall_hardening_score"))
    readiness_gate = str(hardening_meta.get("readiness_gate") or "MISSING")
    readiness_reasons = (
        hardening_meta.get("readiness_reasons")
        if isinstance(hardening_meta.get("readiness_reasons"), list)
        else []
    )

    outcome_metrics = outcome.get("metrics") if isinstance(outcome.get("metrics"), dict) else {}
    t20_count = _as_float(outcome_metrics.get("operational_t20_count"))
    t20_pass = _as_float(outcome_metrics.get("operational_t20_pass_rate"))
    expectancy = _as_float(outcome_metrics.get("execution_expectancy_pct"))
    win_rate = _as_float(outcome_metrics.get("execution_win_rate_pct"))

    alpha_gate = str(alpha.get("gate") or "MISSING")
    alpha_confidence = _as_float(alpha.get("confidence_score"))

    # ---- Cumulative P&L cross-check ----
    # Compare executive_brief against pnl_attribution; a difference of up to
    # 100,000 KRW is tolerated.
    def _extract_pnl_from_section(name: str) -> float | None:
        """Return the signed KRW amount found in the named report section, if any."""
        for sec in sections:
            if not isinstance(sec, dict) or sec.get("name") != name:
                continue
            md = str(sec.get("markdown") or "")
            # Capture only the KRW amount that follows the cumulative
            # valuation P&L label, so totals such as total assets on other
            # lines are not matched by mistake.
            m = re.search(r"누적\s*평가손익[^\n]*?([+\-]\s*[\d,]+)원", md)
            if m:
                try:
                    return float(m.group(1).replace(",", "").replace(" ", ""))
                except ValueError:
                    # Unparseable amount: keep scanning later sections.
                    continue
        return None

    _pnl_brief = _extract_pnl_from_section("executive_brief")
    _pnl_attr = _extract_pnl_from_section("pnl_attribution")
    # Consistent when either side is unavailable or the gap is within tolerance.
    _pnl_consistent = (
        _pnl_brief is None
        or _pnl_attr is None
        or abs(_pnl_brief - _pnl_attr) <= 100_000
    )

    # ---- Report consistency score: share of checks that pass ----
    report_consistency_checks = [
        bool(report),
        "routing_serving_trace" in section_names,
        "QEH_AUDIT_BLOCK" in section_names,
        "concise_hts_input_sheet" in section_names,
        "reference_price_ledger" in section_names,
        bool(summary.get("canonical_order_ok")),
        export_status in {"EXPORT_READY", "REVIEW_ONLY", "PENDING_EXPORT", "EXPORT_BLOCKED_CRITICAL"},
        _pnl_consistent,  # cumulative P&L agrees across sections (±100,000 KRW)
    ]
    report_consistency_score = round(
        sum(1 for ok in report_consistency_checks if ok) / len(report_consistency_checks) * 100.0,
        2,
    )

    # ---- Data truth score ----
    data_truth_score = _score_from_span(
        modern if modern else invest_score,
        cap_basis if cap_basis else invest_score,
    )
    if schema >= 99.0 and data_truth_score > 0:
        data_truth_score = round(min(100.0, (schema + data_truth_score) / 2.0), 2)

    # ---- Decision truth score ----
    decision_truth_score = 100.0
    if fj_gate != "PASS":
        decision_truth_score = min(decision_truth_score, 55.0)
    if fj_coverage < 100.0:
        decision_truth_score = min(decision_truth_score, fj_coverage)
    if fj_silent > 0:
        decision_truth_score = 0.0
    if fj_late > 0:
        decision_truth_score = min(decision_truth_score, 40.0)

    # ---- Execution truth score ----
    if export_status == "EXPORT_BLOCKED_CRITICAL":
        execution_truth_score = 0.0
    elif export_status == "EXPORT_READY" and export_allowed is True:
        execution_truth_score = 100.0
    elif export_status == "REVIEW_ONLY":
        # Partial credit: human review required but not hard-blocked.
        execution_truth_score = 40.0
    else:
        execution_truth_score = 0.0
    if not execution_allowed:
        execution_truth_score = min(execution_truth_score, 20.0)
    if cash_status != "PASS":
        execution_truth_score = min(execution_truth_score, 25.0)
    if cash_damage > 10.0:
        execution_truth_score = min(
            execution_truth_score,
            max(0.0, 100.0 - (cash_damage - 10.0) * 5.0),
        )

    # ---- Replay T+20 adjustment ----
    # When operational T+20 samples are lacking, calibrated replay
    # (estimated) samples allow a minimal relaxation of the cap below.
    # Loaded through _load/_as_int so a non-dict file or a non-numeric
    # sample count degrades to "no replay data" instead of raising.
    _pred_data = _load(ROOT / "Temp" / "prediction_accuracy_harness_v2.json")
    _replay_t20_n = _as_int(_pred_data.get("t20_replay_sample"))
    _replay_calibrated = str(_pred_data.get("replay_calibration_state") or "") == "REPLAY_CALIBRATED"

    # ---- Performance readiness score ----
    performance_readiness_score = hardening_overall if hardening_overall > 0 else 0.0
    if readiness_gate != "PERFORMANCE_READY":
        performance_readiness_score = min(performance_readiness_score, 60.0)
    # T+20 shortfall penalty: the cap is relaxed from 30 to 50 when enough
    # calibrated replay samples exist. The replay check comes first, then the
    # shortfall cap is applied.
    _t20_cap = 30.0
    if _replay_calibrated and _replay_t20_n >= 30:
        _t20_cap = 50.0
    if "OPERATIONAL_T20_SAMPLE_LT_30" in readiness_reasons or t20_count < 30:
        performance_readiness_score = min(performance_readiness_score, _t20_cap)
    # Guard: only penalise T+20 pass-rate when there is actual T+20 data.
    # t20_pass=0 when t20_count=0 is vacuously zero, not a failure signal.
    if t20_count >= 10 and t20_pass < 60.0:
        performance_readiness_score = min(performance_readiness_score, t20_pass)
    # Guard: expectancy/win_rate derived from T+20 evaluations — vacuous when count=0.
    if t20_count >= 10 and expectancy <= 0.1:
        performance_readiness_score = min(performance_readiness_score, 20.0)
    if t20_count >= 10 and win_rate < 45.0:
        performance_readiness_score = min(performance_readiness_score, win_rate)
    if cash_damage > 10.0:
        performance_readiness_score = min(
            performance_readiness_score,
            max(0.0, 100.0 - cash_damage * 4.0),
        )
    if alpha_gate != "PERFORMANCE_READY":
        performance_readiness_score = min(performance_readiness_score, alpha_confidence)

    # ---- Weighted total ----
    weighted_score = round(
        (data_truth_score * 0.25)
        + (decision_truth_score * 0.20)
        + (execution_truth_score * 0.20)
        + (performance_readiness_score * 0.20)
        + (report_consistency_score * 0.15),
        2,
    )

    # ---- Blocking reasons ----
    blocking_reasons: list[str] = []
    if cap_basis < 50.0:
        blocking_reasons.append("DATA_QUALITY_CAP_BASIS_LT_50")
    # Gap threshold raised from 20→40 after blended cap_basis fix (V2).
    # Gap of 20-40% is expected: modern harness elevates quality from sparse raw fields.
    # Gap >40% still indicates genuine data-vs-processing conflict.
    if quality_gap >= 40.0:
        blocking_reasons.append("LEGACY_MODERN_QUALITY_GAP_WIDE")
    if fj_gate != "PASS" or fj_silent > 0:
        blocking_reasons.append("DECISION_GATE_NOT_STABLE")
    if export_status == "REVIEW_ONLY":
        blocking_reasons.append("EXPORT_GATE_REVIEW_ONLY")  # soft — not a hard block
    elif export_status != "EXPORT_READY":
        # Covers EXPORT_BLOCKED_CRITICAL as well as any other non-ready status.
        blocking_reasons.append("EXPORT_GATE_NOT_READY")
    if not execution_allowed or cash_status != "PASS":
        blocking_reasons.append("CASH_RECOVERY_EXECUTION_BLOCKED")
    if readiness_gate != "PERFORMANCE_READY" or t20_count < 30:
        blocking_reasons.append("PERFORMANCE_NOT_READY")
    if cash_damage > 10.0:
        blocking_reasons.append("VALUE_DAMAGE_GT_10")
    if not bool(summary.get("canonical_order_ok")):
        blocking_reasons.append("REPORT_CANONICAL_ORDER_INVALID")

    # ---- Gate selection ----
    hard_blocking = [r for r in blocking_reasons if r != "EXPORT_GATE_REVIEW_ONLY"]
    if not hard_blocking and weighted_score >= 100.0:
        gate = "PASS_100"
        llm_allowed_actions = ["HTS_READY"]
    else:
        # Every non-passing gate restricts the LLM to the same two actions.
        llm_allowed_actions = ["EXPLAIN_ONLY", "RENDER_LEDGER_ONLY"]
        if "EXPORT_GATE_NOT_READY" in blocking_reasons or "CASH_RECOVERY_EXECUTION_BLOCKED" in blocking_reasons:
            gate = "BLOCK_EXECUTION"
        elif "DATA_QUALITY_CAP_BASIS_LT_50" in blocking_reasons or "LEGACY_MODERN_QUALITY_GAP_WIDE" in blocking_reasons:
            gate = "DATA_CONFLICT"
        elif "PERFORMANCE_NOT_READY" in blocking_reasons:
            gate = "WATCH_PENDING_SAMPLE"
        elif "EXPORT_GATE_REVIEW_ONLY" in blocking_reasons:
            gate = "REVIEW_ONLY_PENDING"
        else:
            gate = "WATCH_PENDING_SAMPLE"

    # ---- [R2-2] Hysteresis ----
    # If the score moved by at most ±3 versus the previously written result,
    # keep the previous gate (boundary banding) so that tiny input variations
    # do not make the gate jump back and forth.
    _HYSTERESIS_BAND = 3.0
    try:
        _prev_path = _rp(args.out)
        if _prev_path.exists():
            _prev = json.loads(_prev_path.read_text(encoding="utf-8"))
            _prev_score = float(_prev.get("score_0_100") or 0.0)
            _prev_gate = str(_prev.get("gate") or "")
            _gate_rank = {
                "PASS_100": 4,
                "WATCH_PENDING_SAMPLE": 3,
                "REVIEW_ONLY_PENDING": 3,
                "DATA_CONFLICT": 2,
                "BLOCK_EXECUTION": 1,
            }
            _cur_rank = _gate_rank.get(gate, 2)
            _prev_rank = _gate_rank.get(_prev_gate, 2)
            # A previous PASS_100 must never be carried over into a run that
            # did not itself qualify: doing so would emit HTS_READY while hard
            # blocking reasons exist or the score is below 100.
            _keeps_pass_safe = _prev_gate != "PASS_100" or gate == "PASS_100"
            if (
                abs(weighted_score - _prev_score) <= _HYSTERESIS_BAND
                and _prev_gate in _gate_rank
                and abs(_cur_rank - _prev_rank) <= 1
                and _keeps_pass_safe
            ):
                gate = _prev_gate
                llm_allowed_actions = _prev.get("llm_allowed_actions") or llm_allowed_actions
    except Exception:
        # Deliberate best-effort: if the previous result cannot be read or
        # interpreted, the freshly computed gate is used unchanged.
        pass

    # EXPORT_GATE_REVIEW_ONLY is soft — excluded from the hard block count.
    _counted_hard_reasons = {
        "DATA_QUALITY_CAP_BASIS_LT_50",
        "EXPORT_GATE_NOT_READY",
        "CASH_RECOVERY_EXECUTION_BLOCKED",
        "REPORT_CANONICAL_ORDER_INVALID",
    }
    hard_block_count = sum(1 for reason in blocking_reasons if reason in _counted_hard_reasons)

    result = {
        "formula_id": "OPERATIONAL_TRUTH_SCORE_V1",
        "score_0_100": weighted_score,
        "gate": gate,
        "hard_block_count": hard_block_count,
        "blocking_reasons": blocking_reasons,
        "llm_allowed_actions": llm_allowed_actions,
        "data_truth_score": round(data_truth_score, 2),
        "decision_truth_score": round(decision_truth_score, 2),
        "execution_truth_score": round(execution_truth_score, 2),
        "performance_readiness_score": round(performance_readiness_score, 2),
        "report_consistency_score": round(report_consistency_score, 2),
        "metric_basis": {
            "schema_presence_score": schema,
            "legacy_investment_quality_score": legacy,
            "modern_investment_quality_score": modern,
            "investment_quality_score": invest_score,
            "confidence_cap_basis_score": cap_basis,
            "quality_gap_pct": round(quality_gap, 2),
            "quality_conflict_flag": quality_conflict,
            "final_judgment_gate": fj_gate,
            "final_judgment_coverage_pct": fj_coverage,
            "smart_cash_recovery_status": cash_status,
            "smart_cash_recovery_execution_allowed": execution_allowed,
            "export_status": export_status,
            "export_allowed": export_allowed,
            "operational_t20_count": t20_count,
            "operational_t20_pass_rate": t20_pass,
            "execution_expectancy_pct": expectancy,
            "execution_win_rate_pct": win_rate,
            "alpha_calibration_gate": alpha_gate,
            "alpha_calibration_confidence_score": alpha_confidence,
        },
    }

    out_path = _rp(args.out)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")
    print(json.dumps(result, ensure_ascii=False, indent=2))
    return 0
|
|
|
|
|
|
def _first_non_null(*values: Any) -> Any:
    """Return the first argument that is not ``None``, or ``None`` if there is none.

    Falsy values such as ``0`` or ``""`` count as present; only ``None`` is skipped.
    """
    return next((candidate for candidate in values if candidate is not None), None)
|
|
|
|
|
|
# Script entry point: exit the process with the integer returned by main().
if __name__ == "__main__":
    raise SystemExit(main())
|