ee3e799de1
주요 변경: - tools/build_rebalance_engine_v1.py: REBALANCE_ENGINE_V1 신규 * account_snapshot 직접 합산(_build_snap_position_map) → 소수주 분리 행 병합 * 레짐 소스 macro.REGIME_PRELIM 최우선 (GAS 와 동일) - src/gas_adapter_parts/gdf_06_rebalance.gs: runRebalanceSheet_() 신규 * Logger.log / getSpreadsheet_() 로 run_all 연동 수정 - src/gas_adapter_parts/gdc_01_fetch_fundamentals.gs * _mergePositionRecord_(): 소수주 중복 행 합산 신규 * parseInt → parseFloat (qty, availQty) - src/gas_adapter_parts/gdf_01_price_metrics.gs * 미보유 종목 SELL_READY → WATCH_EXIT_SIGNAL - spec/41_release_dag.yaml: build_rebalance_sheet 노드 추가 (step_count 63) - spec/51_formula_lifecycle_registry.yaml: REBALANCE_ENGINE_V1 등록 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
236 lines
8.7 KiB
Python
236 lines
8.7 KiB
Python
"""
|
|
build_cross_section_consistency_v1.py

Purpose: scan the markdown of every section in operational_report.json and
check whether any logical metric defined in canonical_metrics_registry is
rendered with different values in different sections.

Policy: AGENTS.md R1, enforcement_mode_until scheme
  - now <  enforcement_mode_until -> gate=WARN even when conflicts exist
    (publishing the report is still allowed)
  - now >= enforcement_mode_until -> gate=FAIL (hard-block)

Output Temp/cross_section_consistency_v1.json:
  {
    "formula_id": "CROSS_SECTION_CONSISTENCY_V1",
    "score": 0~100,
    "conflict_count": N,
    "conflicts": [{metric, section, rendered, canonical}],
    "forbidden_uniform_labels": N,
    "incomplete_tables": N,
    "enforcement_mode_until": "2026-06-15",
    "gate": "PASS" | "WARN" | "FAIL"
  }
|
|
"""
|
|
import json
|
|
import pathlib
|
|
import re
|
|
import sys
|
|
from datetime import date, datetime, timezone
|
|
|
|
# Directory two levels above this file; every path below is resolved from it.
ROOT = pathlib.Path(__file__).parent.parent

# Inputs loaded by main().
REPORT_PATH = ROOT.joinpath("Temp", "operational_report.json")
CANON_PATH = ROOT.joinpath("Temp", "canonical_metrics_v1.json")
# NOTE(review): defined here but not read by any code visible in this file.
REGISTRY_PATH = ROOT.joinpath("spec", "25_canonical_metrics_registry.yaml")

# Result file written by main().
OUT_PATH = ROOT.joinpath("Temp", "cross_section_consistency_v1.json")

# main() reports gate=FAIL for any finding once today's date reaches this
# value; before that date the same findings only produce gate=WARN.
ENFORCEMENT_MODE_UNTIL = date(2026, 6, 15)
|
|
|
|
# Uniform placeholder values forbidden by AGENTS.md R1.
# Note: "정상" is left out on purpose because it is the legitimate translation
# of STATUS_LABELS["NORMAL"].
# R1 targets uniform placeholders used as stubs / to mask missing data; real
# status codes are not forbidden.
FORBIDDEN_LABELS = {
    "데이터 누락",
    "DATA_MISSING",
    # LOSING is left out: it is a status code actually produced by the
    # market_share_proxy_v1 algorithm.
    # R1 only covers stubs that hide, behind a "no information" value, a
    # position where real data is expected.
}

# Whitelisted columns (forbidden values are tolerated in these columns).
WHITELIST_COLS = {
    "비고",
    "해제조건",
    "remarks",
    "해석",
    "근거",
}

# Per-section exemptions (the forbidden-label check is skipped for these).
# pa1_report_table: for tickers outside the universe, such as 471990, having
# no PA1 data collected is legitimate.
WHITELIST_SECTIONS = {"pa1_report_table"}
|
|
|
|
# Metrics to verify, per section, together with the regex used to find them.
# key = metric_id, value = {"sections": section names to scan,
#                           "pattern": regex whose capture groups hold the value}
METRIC_PATTERNS = {
    "cluster_pct": {
        "sections": [
            "cluster_sync_audit",
            "portfolio_structure_risks",
            "mandatory_reduction_plan",
        ],
        "pattern": (
            # Inline form: "cluster_pct = 12.3%" or "cluster_pct: 12.3%".
            r"cluster_pct\s*[=:]\s*([\d.]+)%"
            # Table form: the numeric cell right after a "반도체 클러스터" cell.
            r"|반도체 클러스터[^\|]*\|\s*([\d.]+)\s*\|"
        ),
    },
    "cash_min_required_krw": {
        "sections": [
            "exec_safety_declaration",
            "cash_recovery_plan_crdl",
            "QEH_AUDIT_BLOCK",
        ],
        "pattern": (
            # "최소 <amount>원"
            r"최소\s+([\d,]+)원"
            # "최소 필요 현금: **<amount>원"
            r"|최소 필요 현금:\s*\*\*([\d,]+)원"
            # Table form: the numeric cell right after a "현금 부족분" cell.
            r"|현금 부족분[^\|]*\|\s*([\d,]+)\s*\|"
        ),
    },
    "cash_reference_total_krw": {
        "sections": ["cash_recovery_plan_crdl"],
        # "참고용 전체 후보 누적 (<amount>원)"
        "pattern": r"참고용 전체 후보 누적 \(([\d,]+)원\)",
    },
}
|
|
|
|
|
|
def _load_json(p: pathlib.Path) -> dict:
    """Read *p* as UTF-8 JSON and return it when the top-level value is an object.

    This is a best-effort loader: instead of raising, it returns an empty dict
    when the file is missing or unreadable, is not valid UTF-8 / JSON, or
    decodes to something other than a dict (callers use ``.get`` on the
    result, so a list or scalar would otherwise crash them later).

    Args:
        p: path of the JSON file to read.

    Returns:
        The decoded JSON object, or ``{}`` on any of the failures above.
    """
    try:
        loaded = json.loads(p.read_text(encoding="utf-8"))
    except (OSError, ValueError, RecursionError):
        # OSError: missing file, directory, permission problems.
        # ValueError: json.JSONDecodeError and UnicodeDecodeError both derive
        # from it. RecursionError: pathologically nested documents.
        return {}
    return loaded if isinstance(loaded, dict) else {}
|
|
|
|
|
|
def _parse_krw(s: str) -> float | None:
|
|
"""'39,797,073' → 39797073.0"""
|
|
if s is None:
|
|
return None
|
|
cleaned = s.replace(",", "").replace("원", "").strip()
|
|
try:
|
|
return float(cleaned)
|
|
except ValueError:
|
|
return None
|
|
|
|
|
|
def _extract_value(text: str, pattern: str) -> str | None:
|
|
"""markdown에서 정규식으로 값 추출 (첫 번째 그룹)."""
|
|
m = re.search(pattern, text)
|
|
if not m:
|
|
return None
|
|
for g in m.groups():
|
|
if g is not None:
|
|
return g.strip()
|
|
return None
|
|
|
|
|
|
def check_conflicts(report_sections: dict[str, str], canon: dict) -> list[dict]:
    """Compare metric values rendered in section markdown with canonical values.

    For every metric in METRIC_PATTERNS that has a usable canonical value
    under ``canon["metrics"]``, the markdown of each section listed for that
    metric is searched with the metric's regex. Only the first match per
    section is inspected. Sections that are absent, empty, or do not contain
    the pattern are skipped, as are rendered values that cannot be parsed.

    Args:
        report_sections: mapping of section name to markdown text.
        canon: parsed canonical-metrics document; values are read from its
            "metrics" mapping, keyed by metric id.

    Returns:
        One dict per mismatch with the keys ``metric``, ``section``,
        ``rendered`` (raw matched text), ``canonical`` (value as stored) and
        ``diff`` (rendered minus canonical, rounded to 4 decimals).
    """
    conflicts: list[dict] = []
    # `or {}` also covers an explicit `"metrics": null` in the input.
    metrics_canon = canon.get("metrics") or {}
    if not isinstance(metrics_canon, dict):
        return conflicts

    for metric_id, info in METRIC_PATTERNS.items():
        canonical_val = metrics_canon.get(metric_id)
        if canonical_val is None:
            continue

        # Normalise the canonical value once. Strings go through the same
        # parser as rendered text so "39,797,073" is accepted; anything that
        # is not numeric is skipped instead of aborting the whole gate.
        if isinstance(canonical_val, str):
            canonical_num = _parse_krw(canonical_val)
        else:
            try:
                canonical_num = float(canonical_val)
            except (TypeError, ValueError, OverflowError):
                canonical_num = None
        if canonical_num is None:
            continue

        # Percentages may differ by up to 0.1; every other metric must match.
        tol = 0.1 if metric_id == "cluster_pct" else 0

        for section_name in info["sections"]:
            md = report_sections.get(section_name, "")
            if not md:
                continue
            rendered_raw = _extract_value(md, info["pattern"])
            if rendered_raw is None:
                continue

            # _parse_krw already ends in float(), so no second attempt with
            # float(rendered_raw) is needed: it could never succeed where
            # _parse_krw failed.
            rendered_num = _parse_krw(rendered_raw)
            if rendered_num is None:
                continue

            diff = rendered_num - canonical_num
            # Round before comparing so binary float noise (e.g. 12.3 - 12.2
            # == 0.10000000000000142) is not reported as exceeding `tol`.
            if abs(round(diff, 9)) > tol:
                conflicts.append({
                    "metric": metric_id,
                    "section": section_name,
                    "rendered": rendered_raw,
                    "canonical": canonical_val,
                    "diff": round(diff, 4),
                })

    return conflicts
|
|
|
|
|
|
def _is_gfm_separator_row(cells: list[str]) -> bool:
    """Return True when every non-empty cell is a GFM delimiter such as ``---`` or ``:--:``."""
    filled = [cell for cell in cells if cell]
    return bool(filled) and all(re.fullmatch(r":?-+:?", cell) for cell in filled)


def check_forbidden_labels(report_sections: dict[str, str]) -> int:
    """Count GFM table cells whose whole text is a forbidden uniform label.

    Sections named in WHITELIST_SECTIONS are skipped entirely. In the other
    sections every line containing "|" is split into cells, and a cell is
    counted when its stripped text is in FORBIDDEN_LABELS, except that:

    * scanning of a row stops at the first cell whose text contains a
      WHITELIST_COLS name (this also covers key/value rows whose label cell
      is whitelisted), and
    * cells located under a header cell that contains a WHITELIST_COLS name
      are ignored. A header is a row immediately followed by a GFM delimiter
      row; the exemption lasts until the next line without "|".

    The second rule is what makes WHITELIST_COLS act on columns: without it a
    forbidden value in a data row under e.g. a "비고" header was still counted.

    Args:
        report_sections: mapping of section name to markdown text.

    Returns:
        Number of offending cells across all inspected sections.
    """
    count = 0
    for section_name, md in report_sections.items():
        if section_name in WHITELIST_SECTIONS:
            continue

        # None marks a non-table line; otherwise the stripped cells of a row.
        rows = [
            [cell.strip() for cell in line.split("|")] if "|" in line else None
            for line in md.splitlines()
        ]

        exempt_cols: set[int] = set()
        for idx, cells in enumerate(rows):
            if cells is None:
                # Left the table: header-derived exemptions no longer apply.
                exempt_cols = set()
                continue

            following = rows[idx + 1] if idx + 1 < len(rows) else None
            if following is not None and _is_gfm_separator_row(following):
                # Header row: remember which column positions are whitelisted.
                # Positions line up with data rows as long as both use the
                # same leading-pipe style.
                exempt_cols = {
                    col
                    for col, cell in enumerate(cells)
                    if any(wl in cell for wl in WHITELIST_COLS)
                }

            for col, cell in enumerate(cells):
                if any(wl in cell for wl in WHITELIST_COLS):
                    break
                if col in exempt_cols:
                    continue
                if cell in FORBIDDEN_LABELS:
                    count += 1
    return count
|
|
|
|
|
|
def check_incomplete_tables(report_sections: dict[str, str]) -> int:
    """Count sections whose table cells are at least 5% critical stub tokens.

    Per section, every line that starts with "|" (after stripping) and does
    not contain "---" is treated as a table line. Sections with fewer than
    three such lines are skipped. The first table line is taken as the header
    and ignored; the non-empty cells of the remaining lines are pooled, and
    the section is counted when the share of cells equal to "미산출",
    "DATA_MISSING" or "데이터 누락" is >= 0.05.

    NOTE(review): AGENTS.md R1 is described as "blank >= 5% = WARN, >= 20% =
    FAIL", but only the 5% threshold is evaluated here. All tables in a
    section are pooled (later header rows count as data), any row containing
    "---" is dropped, and every column is inspected rather than only the
    price/quantity/P&L/cluster-% columns. Confirm this is intended.

    Args:
        report_sections: mapping of section name to markdown text.

    Returns:
        Number of sections flagged as incomplete.
    """
    stub_tokens = {"미산출", "DATA_MISSING", "데이터 누락"}
    flagged = 0
    for markdown in report_sections.values():
        rows = [
            row
            for row in markdown.splitlines()
            if row.strip().startswith("|") and "---" not in row
        ]
        if len(rows) < 3:
            continue

        # Pool the non-empty cells of every row after the header.
        body_cells = [
            text
            for row in rows[1:]
            for text in (piece.strip() for piece in row.split("|"))
            if text
        ]
        if not body_cells:
            continue

        stub_hits = sum(1 for text in body_cells if text in stub_tokens)
        if stub_hits / len(body_cells) >= 0.05:
            flagged += 1
    return flagged
|
|
|
|
|
|
def main() -> int:
    """Run all consistency checks, write the JSON result and print a summary.

    Loads REPORT_PATH and CANON_PATH, maps each report section's name to its
    markdown, runs the conflict / forbidden-label / incomplete-table checks,
    derives the gate and score, writes the result to OUT_PATH and prints a
    one-line summary followed by any conflicts.

    NOTE(review): when the report file is missing or unreadable the section
    map is empty, every check returns zero and the gate is PASS with score
    100 - confirm that this is the intended outcome.

    Returns:
        Process exit code: 0 for gate PASS or WARN, 1 for gate FAIL.
    """
    report_data = _load_json(REPORT_PATH)
    canon_data = _load_json(CANON_PATH)

    # Section name -> markdown. Non-dict entries are ignored; a later entry
    # with the same name replaces an earlier one.
    report_sections: dict[str, str] = {}
    for entry in report_data.get("sections") or []:
        if isinstance(entry, dict):
            section_key = str(entry.get("name") or "")
            report_sections[section_key] = str(entry.get("markdown") or "")

    conflicts = check_conflicts(report_sections, canon_data)
    forbidden_count = check_forbidden_labels(report_sections)
    incomplete_count = check_incomplete_tables(report_sections)
    conflict_count = len(conflicts)

    # Findings only warn before ENFORCEMENT_MODE_UNTIL and fail from then on.
    in_enforcement = date.today() >= ENFORCEMENT_MODE_UNTIL
    if not (conflict_count or forbidden_count or incomplete_count):
        gate = "PASS"
    else:
        gate = "FAIL" if in_enforcement else "WARN"

    # Score: -10 per conflict, -2 per forbidden label, -5 per incomplete
    # table, floored at 0.
    penalty = conflict_count * 10 + forbidden_count * 2 + incomplete_count * 5
    score = max(0, 100 - penalty)

    out = {
        "formula_id": "CROSS_SECTION_CONSISTENCY_V1",
        "generated_at": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "score": score,
        "conflict_count": conflict_count,
        "conflicts": conflicts,
        "forbidden_uniform_labels": forbidden_count,
        "incomplete_tables": incomplete_count,
        "enforcement_mode_until": str(ENFORCEMENT_MODE_UNTIL),
        "enforcement_active": in_enforcement,
        "gate": gate,
    }

    OUT_PATH.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(out, ensure_ascii=False, indent=2)
    OUT_PATH.write_text(serialized, encoding="utf-8")

    print(
        f"CROSS_SECTION_CONSISTENCY_V1: gate={gate} score={score} conflicts={conflict_count} "
        f"forbidden={forbidden_count} incomplete={incomplete_count}"
    )
    if conflicts:
        print(" CONFLICTS:")
        for c in conflicts:
            print(f" [{c['metric']}] section={c['section']} rendered={c['rendered']} canonical={c['canonical']} diff={c['diff']}")

    return 1 if gate == "FAIL" else 0
|
|
|
|
|
|
if __name__ == "__main__":
    # Run the gate as a script and use main()'s return value as the exit status.
    exit_code = main()
    sys.exit(exit_code)
|