Files
QuantEngineByItz/tools/build_cross_section_consistency_v1.py
T
kjh2064 ee3e799de1 feat: 리밸런싱 엔진 V1 + GAS 버그 수정 (2026-06-13)
주요 변경:
- tools/build_rebalance_engine_v1.py: REBALANCE_ENGINE_V1 신규
  * account_snapshot 직접 합산(_build_snap_position_map) → 소수주 분리 행 병합
  * 레짐 소스 macro.REGIME_PRELIM 최우선 (GAS 와 동일)
- src/gas_adapter_parts/gdf_06_rebalance.gs: runRebalanceSheet_() 신규
  * Logger.log / getSpreadsheet_() 로 run_all 연동 수정
- src/gas_adapter_parts/gdc_01_fetch_fundamentals.gs
  * _mergePositionRecord_(): 소수주 중복 행 합산 신규
  * parseInt → parseFloat (qty, availQty)
- src/gas_adapter_parts/gdf_01_price_metrics.gs
  * 미보유 종목 SELL_READY → WATCH_EXIT_SIGNAL
- spec/41_release_dag.yaml: build_rebalance_sheet 노드 추가 (step_count 63)
- spec/51_formula_lifecycle_registry.yaml: REBALANCE_ENGINE_V1 등록

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-13 13:20:14 +09:00

236 lines
8.7 KiB
Python

"""
build_cross_section_consistency_v1.py
목적: operational_report.json의 모든 섹션 markdown에서 canonical_metrics_registry에
정의된 논리 지표가 여러 섹션에서 상이한 값으로 렌더링되는지 검사.
정책: AGENTS.md R1 enforcement_mode_until 방식
- now < enforcement_mode_until → conflict 있어도 gate=WARN (보고서 발행 허용)
- now >= enforcement_mode_until → gate=FAIL (hard-block)
출력 Temp/cross_section_consistency_v1.json:
{
"formula_id": "CROSS_SECTION_CONSISTENCY_V1",
"score": 0~100,
"conflict_count": N,
"conflicts": [{metric, section, rendered, canonical}],
"forbidden_uniform_labels": N,
"incomplete_tables": N,
"enforcement_mode_until": "2026-06-15",
"gate": "PASS" | "WARN" | "FAIL"
}
"""
import json
import pathlib
import re
import sys
from datetime import date, datetime, timezone
ROOT = pathlib.Path(__file__).parent.parent
REPORT_PATH = ROOT / "Temp" / "operational_report.json"
CANON_PATH = ROOT / "Temp" / "canonical_metrics_v1.json"
REGISTRY_PATH = ROOT / "spec" / "25_canonical_metrics_registry.yaml"
OUT_PATH = ROOT / "Temp" / "cross_section_consistency_v1.json"
ENFORCEMENT_MODE_UNTIL = date(2026, 6, 15)
# AGENTS.md R1 금지 일률값
# 주의: "정상"은 STATUS_LABELS["NORMAL"] 정상 번역값이므로 제외.
# R1 금지 대상은 stub/은폐용 일률 placeholder이며 실제 상태코드는 제외.
FORBIDDEN_LABELS = {
"데이터 누락", "DATA_MISSING",
# LOSING은 market_share_proxy_v1에서 실제 알고리즘 산출 상태코드이므로 제외.
# R1 금지 대상: 실질 데이터가 있어야 할 위치에 "정보 없음"으로 은폐하는 stub만 해당.
}
# 화이트리스트 컬럼 (이 컬럼에 있으면 금지 값 허용)
WHITELIST_COLS = {"비고", "해제조건", "remarks", "해석", "근거"}
# 섹션별 예외 허용 (해당 섹션에서는 forbidden_labels 검사 제외)
# pa1_report_table: 471990처럼 universe에 없는 종목은 PA1 미수집이 정당
WHITELIST_SECTIONS = {"pa1_report_table"}
# 섹션별 검사 대상 지표 + 검색 패턴
# key = metric_id, value = 섹션·패턴 맵핑
METRIC_PATTERNS = {
"cluster_pct": {
"sections": ["cluster_sync_audit", "portfolio_structure_risks", "mandatory_reduction_plan"],
"pattern": r"cluster_pct\s*[=:]\s*([\d.]+)%|반도체 클러스터[^\|]*\|\s*([\d.]+)\s*\|",
},
"cash_min_required_krw": {
"sections": ["exec_safety_declaration", "cash_recovery_plan_crdl", "QEH_AUDIT_BLOCK"],
"pattern": r"최소\s+([\d,]+)원|최소 필요 현금:\s*\*\*([\d,]+)원|현금 부족분[^\|]*\|\s*([\d,]+)\s*\|",
},
"cash_reference_total_krw": {
"sections": ["cash_recovery_plan_crdl"],
"pattern": r"참고용 전체 후보 누적 \(([\d,]+)원\)",
},
}
def _load_json(p: pathlib.Path) -> dict:
if p.exists():
try:
return json.loads(p.read_text(encoding="utf-8"))
except Exception:
pass
return {}
def _parse_krw(s: str) -> float | None:
"""'39,797,073' → 39797073.0"""
if s is None:
return None
cleaned = s.replace(",", "").replace("원", "").strip()
try:
return float(cleaned)
except ValueError:
return None
def _extract_value(text: str, pattern: str) -> str | None:
"""markdown에서 정규식으로 값 추출 (첫 번째 그룹)."""
m = re.search(pattern, text)
if not m:
return None
for g in m.groups():
if g is not None:
return g.strip()
return None
def check_conflicts(report_sections: dict[str, str], canon: dict) -> list[dict]:
conflicts = []
metrics_canon = canon.get("metrics", {})
for metric_id, info in METRIC_PATTERNS.items():
canonical_val = metrics_canon.get(metric_id)
if canonical_val is None:
continue
for section_name in info["sections"]:
md = report_sections.get(section_name, "")
if not md:
continue
rendered_raw = _extract_value(md, info["pattern"])
if rendered_raw is None:
continue
# 값 파싱 — 숫자형 비교
rendered_num = _parse_krw(rendered_raw)
if rendered_num is None:
try:
rendered_num = float(rendered_raw)
except ValueError:
continue
tol = 0.1 if metric_id == "cluster_pct" else 0
if abs(rendered_num - float(canonical_val)) > tol:
conflicts.append({
"metric": metric_id,
"section": section_name,
"rendered": rendered_raw,
"canonical": canonical_val,
"diff": round(rendered_num - float(canonical_val), 4),
})
return conflicts
def check_forbidden_labels(report_sections: dict[str, str]) -> int:
"""GFM 표 셀에서 금지 일률값 개수 반환."""
count = 0
for section_name, md in report_sections.items():
if section_name in WHITELIST_SECTIONS:
continue
for line in md.splitlines():
if "|" not in line:
continue
cells = [c.strip() for c in line.split("|")]
for cell in cells:
if any(wl in cell for wl in WHITELIST_COLS):
break
if cell in FORBIDDEN_LABELS:
count += 1
return count
def check_incomplete_tables(report_sections: dict[str, str]) -> int:
"""
핵심 산출 표(stub_token이 실질 데이터여야 할 컬럼에 있을 때) INCOMPLETE_TABLE 판정.
AGENTS.md R1 기준: blank ≥ 5% = WARN, ≥ 20% = FAIL.
단, 이 게이트는 교차섹션 일관성 검사 범위이므로
'지정가·수량·손익률·클러스터%' 핵심 컬럼에 stub이 있는 경우만 집계.
"""
CRITICAL_STUBS = {"미산출", "DATA_MISSING", "데이터 누락"}
incomplete = 0
for md in report_sections.values():
table_lines = [l for l in md.splitlines() if l.strip().startswith("|") and "---" not in l]
if len(table_lines) < 3:
continue
data_lines = table_lines[1:]
critical_stubs = 0
total_data_cells = 0
for line in data_lines:
cells = [c.strip() for c in line.split("|") if c.strip()]
total_data_cells += len(cells)
critical_stubs += sum(1 for c in cells if c in CRITICAL_STUBS)
if total_data_cells > 0 and critical_stubs / total_data_cells >= 0.05:
incomplete += 1
return incomplete
def main() -> int:
report_data = _load_json(REPORT_PATH)
canon_data = _load_json(CANON_PATH)
sections_raw = report_data.get("sections") or []
report_sections: dict[str, str] = {
str(s.get("name") or ""): str(s.get("markdown") or "")
for s in sections_raw if isinstance(s, dict)
}
conflicts = check_conflicts(report_sections, canon_data)
forbidden_count = check_forbidden_labels(report_sections)
incomplete_count = check_incomplete_tables(report_sections)
conflict_count = len(conflicts)
today = date.today()
in_enforcement = today >= ENFORCEMENT_MODE_UNTIL
if conflict_count == 0 and forbidden_count == 0 and incomplete_count == 0:
gate = "PASS"
elif in_enforcement:
gate = "FAIL"
else:
gate = "WARN"
# 점수: conflict 1건당 -10, forbidden 1개당 -2, incomplete 1개당 -5
score = max(0, 100 - conflict_count * 10 - forbidden_count * 2 - incomplete_count * 5)
out = {
"formula_id": "CROSS_SECTION_CONSISTENCY_V1",
"generated_at": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
"score": score,
"conflict_count": conflict_count,
"conflicts": conflicts,
"forbidden_uniform_labels": forbidden_count,
"incomplete_tables": incomplete_count,
"enforcement_mode_until": str(ENFORCEMENT_MODE_UNTIL),
"enforcement_active": in_enforcement,
"gate": gate,
}
OUT_PATH.parent.mkdir(parents=True, exist_ok=True)
OUT_PATH.write_text(json.dumps(out, ensure_ascii=False, indent=2), encoding="utf-8")
print(f"CROSS_SECTION_CONSISTENCY_V1: gate={gate} score={score} conflicts={conflict_count} "
f"forbidden={forbidden_count} incomplete={incomplete_count}")
if conflicts:
print(" CONFLICTS:")
for c in conflicts:
print(f" [{c['metric']}] section={c['section']} rendered={c['rendered']} canonical={c['canonical']} diff={c['diff']}")
return 0 if gate in ("PASS", "WARN") else 1
if __name__ == "__main__":
sys.exit(main())