Files
QuantEngineByItz/tools/validate_harness_sync.py
kjh2064 ee3e799de1 feat: 리밸런싱 엔진 V1 + GAS 버그 수정 (2026-06-13)
주요 변경:
- tools/build_rebalance_engine_v1.py: REBALANCE_ENGINE_V1 신규
  * account_snapshot 직접 합산(_build_snap_position_map) → 소수주 분리 행 병합
  * 레짐 소스 macro.REGIME_PRELIM 최우선 (GAS 와 동일)
- src/gas_adapter_parts/gdf_06_rebalance.gs: runRebalanceSheet_() 신규
  * Logger.log / getSpreadsheet_() 로 run_all 연동 수정
- src/gas_adapter_parts/gdc_01_fetch_fundamentals.gs
  * _mergePositionRecord_(): 소수주 중복 행 합산 신규
  * parseInt → parseFloat (qty, availQty)
- src/gas_adapter_parts/gdf_01_price_metrics.gs
  * 미보유 종목 SELL_READY → WATCH_EXIT_SIGNAL
- spec/41_release_dag.yaml: build_rebalance_sheet 노드 추가 (step_count 63)
- spec/51_formula_lifecycle_registry.yaml: REBALANCE_ENGINE_V1 등록

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-13 13:20:14 +09:00

631 lines
26 KiB
Python

"""
validate_harness_sync.py
proposal_id: 2026-05-18_QEH_VALIDATION_V2
목적:
1. LLM의 최종 출력(output.json)이 하네스(harness_context.json)와 일치하는지 검증.
2. final_action, blocked_actions, 가격, 수량, 주문행 추가/삭제 드리프트를 차단한다.
3. legacy harness 와 확장 harness(order_blueprint_json)를 모두 지원한다.
"""
from __future__ import annotations
import json
import sys
from pathlib import Path
from typing import Any
def load_json(path: str) -> dict[str, Any]:
    """Read a UTF-8 JSON file, unwrapping an embedded harness context.

    When the parsed document is an object whose ``data`` member is itself an
    object holding a dict under ``_harness_context``, that inner dict is
    returned. In every other case the parsed document is returned as-is.

    Args:
        path: Location of the JSON file.

    Returns:
        The embedded harness context, or the parsed document itself.
    """
    with open(path, "r", encoding="utf-8") as stream:
        document = json.load(stream)
    if not isinstance(document, dict):
        return document
    envelope = document.get("data")
    if isinstance(envelope, dict):
        embedded = envelope.get("_harness_context")
        if isinstance(embedded, dict):
            return embedded
    return document
def parse_bool(value: Any) -> bool | None:
    """Interpret a loosely typed flag as a boolean.

    Real ``bool`` values pass through. Strings are trimmed and lower-cased,
    then matched against the accepted spellings: ``true``/``1``/``y``/``yes``
    and ``false``/``0``/``n``/``no``.

    Returns:
        ``True`` or ``False`` for a recognised value; ``None`` for anything
        else, including non-bool numbers and unrecognised strings.
    """
    if isinstance(value, bool):
        return value
    if not isinstance(value, str):
        return None
    spellings = {
        "true": True,
        "1": True,
        "y": True,
        "yes": True,
        "false": False,
        "0": False,
        "n": False,
        "no": False,
    }
    return spellings.get(value.strip().lower())
def parse_jsonish(value: Any) -> Any:
    """Return structured data from a value that may be JSON text.

    Lists and dicts are returned unchanged. A non-blank string is decoded
    with ``json.loads``, so whatever that call raises for malformed text
    propagates to the caller. Blank strings and every other type yield
    ``None``.
    """
    if isinstance(value, str):
        return json.loads(value) if value.strip() else None
    return value if isinstance(value, (list, dict)) else None
def _has_sell_semantics(order_type: str) -> bool:
    """Tell whether an order type reduces a position.

    The comparison is case-insensitive. Any type containing ``SELL`` counts,
    as do the exact names ``TRIM``, ``EXIT_100`` and ``EXIT_FULL``.
    """
    upper = order_type.upper()
    if "SELL" in upper:
        return True
    return upper in ("TRIM", "EXIT_100", "EXIT_FULL")
def _has_buy_semantics(order_type: str) -> bool:
    """Tell whether an order type opens or adds to a position.

    The comparison is case-insensitive and exact: only ``BUY``,
    ``STAGED_BUY`` and ``ADD_ON`` qualify.
    """
    buy_types = ("BUY", "STAGED_BUY", "ADD_ON")
    return order_type.upper() in buy_types
def compute_blueprint_checksum(blueprint: list[dict[str, Any]]) -> int:
    """Compute the additive checksum of an order blueprint.

    Each row is serialised as
    ``ticker|order_type|quantity|limit_price_krw|validation_status;`` where a
    ``None`` quantity or limit price renders as an empty field. The checksum
    is the sum of the code points of the concatenated text, truncated to
    32 bits.

    Args:
        blueprint: Blueprint rows; each must support ``.get``.

    Returns:
        The 32-bit checksum (0 for an empty blueprint).
    """
    fragments = []
    for entry in blueprint:
        qty = entry.get("quantity")
        limit = entry.get("limit_price_krw")
        fragments.append(
            f"{entry.get('ticker', '')}|"
            f"{entry.get('order_type', '')}|"
            f"{'' if qty is None else qty}|"
            f"{'' if limit is None else limit}|"
            f"{entry.get('validation_status', '')};"
        )
    # Masking once at the end equals masking after every addition.
    return sum(ord(ch) for ch in "".join(fragments)) & 0xFFFFFFFF
def to_number(value: Any) -> float | None:
    """Convert a JSON scalar to ``float`` when it is numeric.

    Args:
        value: Candidate value; typically an int, float, or numeric string.

    Returns:
        The float value for ints, floats and strings that ``float()`` can
        parse after trimming whitespace; ``None`` for booleans, blank or
        non-numeric strings, and every other type.
    """
    # bool is a subclass of int; without this guard a JSON true/false in an
    # amount field would silently be read as 1.0 / 0.0.
    if isinstance(value, bool):
        return None
    if isinstance(value, (int, float)):
        return float(value)
    if isinstance(value, str):
        text = value.strip()
        if not text:
            return None
        try:
            return float(text)
        except ValueError:
            return None
    return None
def normalize_harness_decisions(harness: dict[str, Any]) -> dict[str, dict[str, Any]]:
    """Index the harness decisions by ticker.

    ``decisions_json`` (a list or JSON text) is used when it is present and
    decodes to a list; otherwise the legacy ``decisions`` list is used. Rows
    that are not dicts or have a falsy ``ticker`` are dropped. Later rows
    overwrite earlier rows for the same ticker.

    Returns:
        Mapping of ``str(ticker)`` to the decision row; empty when neither
        source holds a list.
    """

    def _by_ticker(rows: list[Any]) -> dict[str, dict[str, Any]]:
        indexed: dict[str, dict[str, Any]] = {}
        for entry in rows:
            if isinstance(entry, dict) and entry.get("ticker"):
                indexed[str(entry.get("ticker"))] = entry
        return indexed

    encoded = harness.get("decisions_json")
    if encoded is not None:
        decoded = parse_jsonish(encoded)
        if isinstance(decoded, list):
            return _by_ticker(decoded)
    legacy = harness.get("decisions")
    return _by_ticker(legacy) if isinstance(legacy, list) else {}
def normalize_harness_orders(harness: dict[str, Any]) -> dict[tuple[str, str, str], dict[str, Any]]:
    """Index the harness orders by ``(account, ticker, order_type)``.

    ``order_blueprint_json`` (a list or JSON text) is used when it is present
    and decodes to a list; otherwise the legacy ``orders`` list is used. The
    third key component is ``order_type``, falling back to ``action``.
    Missing or falsy components become empty strings. Non-dict rows are
    skipped, and later rows overwrite earlier rows with the same key.

    A missing, null, or non-list ``orders`` value is treated as "no orders".

    Returns:
        Mapping of the composite key to the original order row.
    """
    rows: list[Any] | None = None
    order_blueprint = harness.get("order_blueprint_json")
    if order_blueprint is not None:
        parsed = parse_jsonish(order_blueprint)
        if isinstance(parsed, list):
            rows = parsed
    if rows is None:
        # dict.get's default only applies to a missing key, so an explicit
        # JSON null must be handled here rather than via get("orders", []).
        legacy = harness.get("orders")
        rows = legacy if isinstance(legacy, list) else []

    result: dict[tuple[str, str, str], dict[str, Any]] = {}
    for row in rows:
        if not isinstance(row, dict):
            continue
        key = (
            str(row.get("account") or ""),
            str(row.get("ticker") or ""),
            str(row.get("order_type") or row.get("action") or ""),
        )
        result[key] = row
    return result
def normalize_output_orders(output: dict[str, Any]) -> dict[tuple[str, str, str], dict[str, Any]]:
    """Index the output's ``orders`` by ``(account, ticker, order_type)``.

    Missing or falsy key components become empty strings. Unlike the harness
    side, there is no ``action`` fallback for the third component. Non-dict
    rows are skipped, and later rows overwrite earlier rows with the same key.

    Returns:
        Mapping of the composite key to the original order row.
    """
    key_fields = ("account", "ticker", "order_type")
    indexed: dict[tuple[str, str, str], dict[str, Any]] = {}
    for entry in output.get("orders", []):
        if isinstance(entry, dict):
            account, ticker, order_type = (str(entry.get(name) or "") for name in key_fields)
            indexed[(account, ticker, order_type)] = entry
    return indexed
def get_blocked_actions(harness: dict[str, Any]) -> list[str]:
    """Return the harness's blocked actions as strings.

    The top-level ``blocked_actions`` list wins when it is a list, even an
    empty one. Otherwise ``risk.blocked_actions`` is used when ``risk`` is a
    dict holding a list. With neither present the result is empty.
    """
    for container in (harness, harness.get("risk", {})):
        if not isinstance(container, dict):
            continue
        actions = container.get("blocked_actions")
        if isinstance(actions, list):
            return [str(item) for item in actions]
    return []
def compare_order_fields(key: tuple[str, str, str], h_order: dict[str, Any], o_order: dict[str, Any], errors: list[str]) -> None:
    """Compare the price and quantity fields of one harness/output order pair.

    Several harness field names may map to the same output field (for
    example ``limit_price_krw`` and ``price`` both map to the output's
    ``limit_price_krw``). For each output field, only the first harness
    alias actually present in ``h_order`` is compared. Output fields with no
    alias present are not checked. Values are compared with ``!=``, and each
    difference appends one message to ``errors``.

    Args:
        key: ``(account, ticker, order_type)``; an empty account is shown as
            ``UNKNOWN_ACCOUNT`` in messages.
        h_order: The harness (authoritative) order row.
        o_order: The matching output order row.
        errors: Accumulator mutated in place.
    """
    account, ticker, order_type = key[0], key[1], key[2]
    label = f"{account or 'UNKNOWN_ACCOUNT'}:{ticker}:{order_type}"
    aliases = (
        ("limit_price_krw", "limit_price_krw"),
        ("price", "limit_price_krw"),
        ("quantity", "quantity"),
        ("stop_price_krw", "stop_price_krw"),
        ("stop_price", "stop_price_krw"),
        ("stop_quantity", "stop_quantity"),
        ("take_profit_price_krw", "take_profit_price_krw"),
        ("tp1_price", "take_profit_price_krw"),
        ("take_profit_quantity", "take_profit_quantity"),
    )
    # Resolve which harness alias feeds each output field; first match wins.
    source_for: dict[str, str] = {}
    for h_field, o_field in aliases:
        if h_field in h_order and o_field not in source_for:
            source_for[o_field] = h_field
    for o_field, h_field in source_for.items():
        h_value = h_order.get(h_field)
        o_value = o_order.get(o_field)
        if h_value != o_value:
            errors.append(f"{label} {o_field} mismatch: harness={h_value}, output={o_value}")
def _sync_goal_tracking(harness: dict[str, Any], output: dict[str, Any], errors: list[str]) -> None:
    """F2: check that the harness goal_* scalars match ``output.goal_tracking``.

    The section is optional: when ``goal_tracking`` is absent or null nothing
    is checked. A non-object section is reported as an error. A field is
    compared only when both sides hold a non-null value.
    """
    tracking = output.get("goal_tracking")
    if tracking is None:
        return  # optional section not emitted
    if not isinstance(tracking, dict):
        errors.append("output.goal_tracking must be an object")
        return
    goal_fields = ("goal_achievement_pct", "goal_remaining_krw", "goal_status", "goal_eta_label")
    for key in goal_fields:
        expected = harness.get(key)
        actual = tracking.get(key)
        if expected is None or actual is None:
            continue
        if expected != actual:
            errors.append(
                f"goal_tracking.{key} mismatch: harness={expected!r}, output={actual!r}"
            )
def _sync_secular_leader_gate(harness: dict[str, Any], output: dict[str, Any], errors: list[str]) -> None:
    """F2/H3: enforce take-profit rules for tickers with an active secular-leader gate.

    Reads ``secular_leader_gate_json`` (ticker -> gate info) and
    ``prices_json`` (list of price rows) from the harness. Nothing is checked
    unless the first decodes to a dict and the second to a list. For each
    ticker whose gate info has a truthy ``active`` and which has a price row:

    * a ``tp1_state`` containing ``DEFERRED`` together with a non-null
      ``tp1_price`` is a violation;
    * any output order for that ticker whose ``order_type`` (or ``action``)
      contains ``TAKE_PROFIT`` or ``TP`` is a violation.
    """
    encoded_gate = harness.get("secular_leader_gate_json")
    if not encoded_gate:
        return
    gate_map = parse_jsonish(encoded_gate)
    if not isinstance(gate_map, dict):
        return

    encoded_prices = harness.get("prices_json")
    price_rows = parse_jsonish(encoded_prices) if encoded_prices else None
    if not isinstance(price_rows, list):
        return

    rows_by_ticker: dict[str, dict[str, Any]] = {}
    for entry in price_rows:
        if isinstance(entry, dict) and entry.get("ticker"):
            rows_by_ticker[str(entry.get("ticker"))] = entry

    for ticker, gate_info in gate_map.items():
        if not (isinstance(gate_info, dict) and gate_info.get("active")):
            continue
        price_row = rows_by_ticker.get(ticker)
        if not price_row:
            continue
        tp1_state = price_row.get("tp1_state", "")
        tp1_price = price_row.get("tp1_price")
        # While take-profit is deferred, the harness must not carry a TP price.
        if tp1_price is not None and "DEFERRED" in str(tp1_state):
            errors.append(
                f"[H3] secular_leader_gate active for {ticker}: "
                f"tp1_state={tp1_state!r} but tp1_price={tp1_price} (must be null)"
            )
        # The output must not emit a take-profit order for a gated ticker.
        for candidate in output.get("orders", []):
            if not isinstance(candidate, dict):
                continue
            if str(candidate.get("ticker") or "") != ticker:
                continue
            action_upper = str(candidate.get("order_type") or candidate.get("action") or "").upper()
            if "TAKE_PROFIT" in action_upper or "TP" in action_upper:
                errors.append(
                    f"[H3] secular_leader_gate active for {ticker}: "
                    f"TP order detected in output.orders (tp1_state={tp1_state!r}) — must be blocked"
                )
def _sync_breakout_quality_gate(harness: dict[str, Any], output: dict[str, Any], errors: list[str]) -> None:
    """H6: ensure no PASS buy order is emitted for a BLOCKED_LATE_CHASE ticker.

    Tickers are collected from the rows of ``breakout_quality_gate_json``
    whose ``breakout_quality_gate`` equals ``BLOCKED_LATE_CHASE``. An output
    order is a violation when its ticker is in that set, its
    ``validation_status`` is ``PASS``, and its ``order_type`` (or ``action``)
    has buy semantics.
    """
    gate_rows = parse_jsonish(harness.get("breakout_quality_gate_json"))
    if not isinstance(gate_rows, list):
        return
    late_chase: set[str] = set()
    for gate_row in gate_rows:
        if isinstance(gate_row, dict) and gate_row.get("breakout_quality_gate") == "BLOCKED_LATE_CHASE":
            late_chase.add(str(gate_row.get("ticker")))
    if not late_chase:
        return
    for idx, candidate in enumerate(output.get("orders", [])):
        if not isinstance(candidate, dict):
            continue
        if str(candidate.get("validation_status") or "") != "PASS":
            continue
        ticker = str(candidate.get("ticker") or "")
        if ticker not in late_chase:
            continue
        kind = str(candidate.get("order_type") or candidate.get("action") or "")
        if _has_buy_semantics(kind):
            errors.append(
                f"[H6] output.orders[{idx}] emits BUY for BLOCKED_LATE_CHASE ticker={ticker}"
            )
def _sync_anti_whipsaw_gate(harness: dict[str, Any], output: dict[str, Any], errors: list[str]) -> None:
    """H7: ensure no PASS sell order is emitted for a WHIPSAW_SUSPECTED ticker.

    Tickers are collected from the rows of ``anti_whipsaw_gate_json`` whose
    ``anti_whipsaw_gate`` equals ``WHIPSAW_SUSPECTED``. An output order is a
    violation when its ticker is in that set, its ``validation_status`` is
    ``PASS``, and its ``order_type`` (or ``action``) has sell semantics.
    """
    gate_rows = parse_jsonish(harness.get("anti_whipsaw_gate_json"))
    if not isinstance(gate_rows, list):
        return
    suspected: set[str] = set()
    for gate_row in gate_rows:
        if isinstance(gate_row, dict) and gate_row.get("anti_whipsaw_gate") == "WHIPSAW_SUSPECTED":
            suspected.add(str(gate_row.get("ticker")))
    if not suspected:
        return
    for idx, candidate in enumerate(output.get("orders", [])):
        if not isinstance(candidate, dict):
            continue
        if str(candidate.get("validation_status") or "") != "PASS":
            continue
        ticker = str(candidate.get("ticker") or "")
        if ticker not in suspected:
            continue
        kind = str(candidate.get("order_type") or candidate.get("action") or "")
        if _has_sell_semantics(kind):
            errors.append(
                f"[H7] output.orders[{idx}] emits SELL/TRIM for WHIPSAW_SUSPECTED ticker={ticker}"
            )
def _sync_smart_cash_raise_v2(harness: dict[str, Any], output: dict[str, Any], errors: list[str]) -> None:
    """H8: check the smart-cash-raise routes at portfolio and row level.

    Only the harness is inspected; ``output`` is accepted for signature
    parity with the other gate checks. Row rules:

    * ``ROUTE_D`` requires ``emergency_full_sell`` to be ``True`` or
      ``stop_breach_gate`` to equal ``BREACH``;
    * ``ROUTE_B`` requires ``rebound_wait_pct`` to equal 50.

    Portfolio rule: a portfolio route of ``NO_ACTION`` must not coexist with
    active row routes, and any other portfolio route must appear among the
    active row routes when there are any.
    """
    portfolio_route = str(harness.get("smart_cash_raise_route") or "NO_ACTION")
    raise_rows = parse_jsonish(harness.get("smart_cash_raise_json"))
    if not isinstance(raise_rows, list):
        return

    active_routes: list[str] = []
    for idx, entry in enumerate(raise_rows):
        if not isinstance(entry, dict):
            continue
        route = str(entry.get("smart_cash_raise_route") or "NO_ACTION")
        if route != "NO_ACTION":
            active_routes.append(route)
        if route == "ROUTE_D":
            is_emergency = entry.get("emergency_full_sell") is True
            is_breach = str(entry.get("stop_breach_gate") or "") == "BREACH"
            if not (is_emergency or is_breach):
                errors.append(
                    f"[H8] smart_cash_raise_json[{idx}] ROUTE_D without emergency_full_sell=true or stop_breach_gate=BREACH"
                )
        elif route == "ROUTE_B":
            rebound_pct = entry.get("rebound_wait_pct")
            if rebound_pct != 50:
                errors.append(
                    f"[H8] smart_cash_raise_json[{idx}] ROUTE_B rebound_wait_pct must be 50, got {rebound_pct!r}"
                )

    if not active_routes:
        return
    if portfolio_route == "NO_ACTION":
        errors.append(
            f"[H8] smart_cash_raise_route=NO_ACTION but active row routes exist: {sorted(set(active_routes))}"
        )
    elif portfolio_route not in active_routes:
        errors.append(
            f"[H8] smart_cash_raise_route={portfolio_route!r} not found in smart_cash_raise_json routes={sorted(set(active_routes))}"
        )
def validate_sync(harness_path: str, output_path: str) -> int:
    """Validate an LLM output document against its harness context.

    Both JSON files are loaded, every detected mismatch is collected, a
    report is printed, and a process exit code is returned.

    Checks, in order:
      1. the required routing / lock keys exist in the harness;
      2. ``cash_ledger_basis`` is ``D2_ONLY`` and ``buy_power_krw`` equals
         ``settlement_cash_d2_krw - open_order_amount_krw`` (tolerance 0.5);
      3. when the harness holds exactly one decision, its ``final_action``
         equals ``output.portfolio_decision.final_action``;
      4. with ``decision_lock`` on, each decision's ``final_action`` equals
         the ``selected_action`` of its decision trace;
      5. ``total_heat_pct`` / ``cash_floor_status`` equal ``output.risk_gate``;
      6. no output order uses a blocked ``order_type``;
      7. blueprint row count, checksum and rendered checksum are consistent;
      8. every harness order exists in the output with matching fields, and
         when any lock is on, extra output orders are rejected;
      9. the goal-tracking and gate checks.

    Malformed output sections (a ``portfolio_decision`` / ``risk_gate`` that
    is not an object, an ``orders`` that is not a list, non-dict order rows)
    are reported as validation errors instead of raising.

    NOTE(review): the statement nesting of this function was reconstructed
    from a whitespace-stripped copy of the file; confirm it against the
    repository version.

    Args:
        harness_path: Path to the harness context JSON.
        output_path: Path to the LLM output JSON.

    Returns:
        0 when no mismatch was found, 1 otherwise.
    """
    harness = load_json(harness_path)
    output = load_json(output_path)
    errors: list[str] = []

    def _coerce_digits(value: Any) -> Any:
        # Counts and checksums may arrive as digit strings; normalise to int.
        if isinstance(value, str) and value.isdigit():
            return int(value)
        return value

    # dict.get's default only applies to a *missing* key. A section that is
    # present but null or wrongly typed is reported and treated as empty, so
    # the remaining checks still run and the result stays a failure.
    portfolio_decision = output.get("portfolio_decision")
    if not isinstance(portfolio_decision, dict):
        if "portfolio_decision" in output:
            errors.append("output.portfolio_decision must be an object")
        portfolio_decision = {}
    risk_gate = output.get("risk_gate")
    if not isinstance(risk_gate, dict):
        if "risk_gate" in output:
            errors.append("output.risk_gate must be an object")
        risk_gate = {}
    output_orders = output.get("orders")
    if not isinstance(output_orders, list):
        if "orders" in output:
            errors.append("output.orders must be a list")
        output_orders = []
        # Hand every downstream consumer a well-formed orders list.
        output = {**output, "orders": output_orders}

    # Hard-lock: refuse harnesses missing routing/serving/judgment keys.
    required_keys = (
        "request_route",
        "bundle_selected",
        "prompt_entrypoint",
        "json_validation_status",
        "capture_required",
        "cash_ledger_basis",
        "decision_lock",
        "prices_lock",
        "quantities_lock",
        "decision_trace_json",
        "decisions_json",
        "order_blueprint_json",
    )
    for key in required_keys:
        if key not in harness:
            errors.append(f"missing harness key: {key}")

    if harness.get("cash_ledger_basis") != "D2_ONLY":
        errors.append(f"cash_ledger_basis must be D2_ONLY, found={harness.get('cash_ledger_basis')!r}")

    settlement_cash = to_number(harness.get("settlement_cash_d2_krw"))
    open_order_amount = to_number(harness.get("open_order_amount_krw"))
    buy_power = to_number(harness.get("buy_power_krw"))
    if settlement_cash is not None and buy_power is not None:
        expected_buy_power = settlement_cash - (open_order_amount or 0.0)
        if abs(expected_buy_power - buy_power) > 0.5:
            errors.append(
                f"buy_power_krw mismatch: harness={buy_power}, expected={expected_buy_power} "
                "(must equal settlement_cash_d2_krw - open_order_amount_krw)"
            )

    harness_decisions = normalize_harness_decisions(harness)
    output_final_action = portfolio_decision.get("final_action")
    if len(harness_decisions) == 1:
        only_decision = next(iter(harness_decisions.values())).get("final_action")
        if only_decision != output_final_action:
            errors.append(f"Final action mismatch: harness={only_decision}, output={output_final_action}")

    # With decision_lock on, decision_trace.selected_action must equal
    # decisions.final_action for every ticker present in both.
    if parse_bool(harness.get("decision_lock")) is True:
        traces = parse_jsonish(harness.get("decision_trace_json"))
        if isinstance(traces, list):
            trace_by_ticker: dict[str, str] = {}
            for row in traces:
                if not isinstance(row, dict):
                    continue
                ticker = str(row.get("ticker") or "")
                selected = str(row.get("selected_action") or "")
                if ticker and selected:
                    trace_by_ticker[ticker] = selected
            for ticker, dec in harness_decisions.items():
                final_action = str(dec.get("final_action") or "")
                trace_action = trace_by_ticker.get(str(ticker))
                if trace_action and final_action and trace_action != final_action:
                    errors.append(
                        f"decision_trace mismatch: ticker={ticker} trace={trace_action}, decision={final_action}"
                    )

    if "total_heat_pct" in harness:
        output_heat = risk_gate.get("total_heat_pct")
        if harness.get("total_heat_pct") != output_heat:
            errors.append(f"Total heat mismatch: harness={harness.get('total_heat_pct')}, output={output_heat}")
    if "cash_floor_status" in harness:
        output_cash = risk_gate.get("cash_floor_status")
        if harness.get("cash_floor_status") != output_cash:
            errors.append(f"Cash floor mismatch: harness={harness.get('cash_floor_status')}, output={output_cash}")

    blocked_actions = set(get_blocked_actions(harness))
    for row in output_orders:
        if not isinstance(row, dict):
            continue  # non-dict rows are skipped, as in every other orders loop
        order_type = str(row.get("order_type") or "")
        if order_type in blocked_actions:
            errors.append(f"Blocked action emitted in output.orders: {order_type}")

    h_orders = normalize_harness_orders(harness)
    o_orders = normalize_output_orders(output)

    if "order_blueprint_json" in harness:
        parsed_blueprint = parse_jsonish(harness.get("order_blueprint_json"))
        if isinstance(parsed_blueprint, list):
            stored_row_count = _coerce_digits(harness.get("blueprint_row_count"))
            if stored_row_count != len(parsed_blueprint):
                errors.append(f"Blueprint row count mismatch: stored={stored_row_count}, actual={len(parsed_blueprint)}")
            stored_checksum = _coerce_digits(harness.get("blueprint_checksum"))
            computed_checksum = compute_blueprint_checksum(parsed_blueprint)
            if stored_checksum != computed_checksum:
                errors.append(
                    f"Blueprint checksum mismatch: stored={stored_checksum}, computed={computed_checksum}"
                )
            rendered_checksum = _coerce_digits(
                harness.get("rendered_output_checksum", harness.get("rendered_report_checksum"))
            )
            if rendered_checksum is not None and rendered_checksum != computed_checksum:
                errors.append(
                    f"Rendered checksum mismatch: stored={rendered_checksum}, computed={computed_checksum}"
                )

    for key, h_order in h_orders.items():
        if key not in o_orders:
            errors.append(f"Missing order from output: {key}")
            continue
        compare_order_fields(key, h_order, o_orders[key], errors)

    # Any active lock makes the harness authoritative: the output may not
    # add orders the harness does not contain.
    authoritative = any(
        [
            parse_bool(harness.get("decision_lock")) is True,
            parse_bool(harness.get("prices_lock")) is True,
            parse_bool(harness.get("quantities_lock")) is True,
            parse_bool(harness.get("sell_priority_lock")) is True,
        ]
    )
    if authoritative:
        extra = sorted(set(o_orders) - set(h_orders))
        for key in extra:
            errors.append(f"Extra output order not present in harness: {key}")

    # F2: goal tracking sync - harness goal_* vs output goal_tracking section
    _sync_goal_tracking(harness, output, errors)
    # F2: secular_leader_gate compliance while the gate is active
    _sync_secular_leader_gate(harness, output, errors)
    _sync_breakout_quality_gate(harness, output, errors)
    _sync_anti_whipsaw_gate(harness, output, errors)
    _sync_smart_cash_raise_v2(harness, output, errors)

    if errors:
        print("HARNESS SYNC FAIL")
        for err in errors:
            print(f"- {err}")
        return 1
    print("HARNESS SYNC OK")
    return 0
def _extract_number(text: str) -> float | None:
    """Extract the first number from free text.

    An optional sign, thousands separators (``,``) and a decimal fraction are
    recognised; the separators are removed before conversion.

    Args:
        text: Arbitrary text, e.g. a markdown table cell.

    Returns:
        The first number found as a float, or ``None`` when there is none.
    """
    import re

    # The match must start on a digit. Otherwise a stray comma ahead of the
    # value ("approx, 1,500") would be taken as the first match, fail to
    # convert, and hide the real number that follows it.
    m = re.search(r"[-+]?\d[\d,]*(?:\.\d+)?", text)
    if not m:
        return None
    try:
        return float(m.group().replace(",", ""))
    except ValueError:
        return None
def _parse_markdown_tables(text: str) -> list[dict[str, str]]:
    """Collect every data row of the markdown tables in ``text``.

    A table is a run of consecutive lines starting with ``|``. The first such
    line supplies the headers, and lines made only of ``-``, ``:``, ``|`` and
    spaces are treated as separators. A data row is kept when it has at
    least as many cells as there are headers; surplus cells are dropped.
    Any non-table line ends the current table.

    Returns:
        One ``{header: cell}`` dict per data row, in document order.
    """
    import re

    separator = re.compile(r"^[-:| ]+$")
    records: list[dict[str, str]] = []
    columns: list[str] = []
    for raw_line in text.splitlines():
        stripped = raw_line.strip()
        if not stripped.startswith("|"):
            columns = []  # left the table; the next table needs new headers
            continue
        if separator.match(stripped):
            continue
        cells = [cell.strip() for cell in stripped.strip("|").split("|")]
        if not columns:
            columns = cells
        elif len(cells) >= len(columns):
            records.append(dict(zip(columns, cells)))
    return records
def validate_from_markdown(harness_path: str, report_path: str) -> int:
    """I2: extract key figures from a human-readable markdown/text report and compare them with the harness.

    Extracted items:
    - D+2 cash / buy_power_krw (settlement_cash_d2_krw, buy_power_krw)
    - Total Heat (total_heat_pct)
    - cash_floor_status
    - order types listed in blocked_actions

    Prints a JSON summary (paths, found_* flags, errors, status) to stdout.

    NOTE(review): the statement nesting of this function was reconstructed
    from a whitespace-stripped copy of the file; in particular, confirm where
    the ``found_settlement`` / ``found_heat`` assignments sit.

    Args:
        harness_path: Path to the harness context JSON.
        report_path: Path to the UTF-8 markdown/text report.

    Returns:
        1 when at least one mismatch was recorded, otherwise 0.
    """
    import re
    harness = load_json(harness_path)
    report_text = Path(report_path).read_text(encoding="utf-8")
    errors: list[str] = []
    table_rows = _parse_markdown_tables(report_text)
    # -- key figure mapping --
    # 1. D+2 cash (settlement_cash_d2_krw / buy_power_krw)
    h_settlement = to_number(harness.get("settlement_cash_d2_krw"))
    h_buy_power = to_number(harness.get("buy_power_krw"))
    h_heat = harness.get("total_heat_pct")
    h_cash_floor = harness.get("cash_floor_status")
    blocked_set = set(get_blocked_actions(harness))
    found_settlement = False
    found_heat = False

    def _get_value_cell(row: dict[str, str]) -> str | None:
        """Return the value cell of an item|value|assessment table; plain key:value tables are supported too."""
        for key in ("확인값", "value", "하네스 값", "harness_value"):
            if key in row:
                return row[key]
        # fallback: second column
        values = list(row.values())
        return values[1] if len(values) >= 2 else None

    def _get_key_cell(row: dict[str, str]) -> str | None:
        """Return the field-name cell of an item|value table (first column as fallback)."""
        for key in ("항목", "field", "필드", "key", "필드명"):
            if key in row:
                return row[key]
        values = list(row.values())
        return values[0] if values else None

    for row in table_rows:
        key_cell = _get_key_cell(row) or ""
        val_cell = _get_value_cell(row) or ""
        # settlement_cash_d2_krw / D+2 cash (tolerance: 1)
        if any(t in key_cell for t in ("settlement_cash_d2", "D+2 현금", "D+2현금", "d2_cash")):
            val = _extract_number(val_cell)
            if val is not None and h_settlement is not None:
                if abs(val - h_settlement) > 1:
                    errors.append(
                        f"[MD] settlement_cash_d2_krw mismatch: report={val}, harness={h_settlement}"
                    )
            found_settlement = True
        # total_heat_pct (tolerance: 0.01)
        if any(t in key_cell for t in ("total_heat_pct", "Total Heat", "heat_pct")):
            val = _extract_number(val_cell)
            if val is not None and h_heat is not None:
                h_heat_num = to_number(h_heat)
                if h_heat_num is not None and abs(val - h_heat_num) > 0.01:
                    errors.append(
                        f"[MD] total_heat_pct mismatch: report={val}, harness={h_heat}"
                    )
            found_heat = True
        # cash_floor_status - compared only when val_cell holds just the status code
        if "cash_floor_status" in key_cell:
            # trust val_cell only when it is pure ASCII (avoids encoding false positives on cells mixed with Korean)
            if h_cash_floor and val_cell and val_cell.isascii():
                if str(h_cash_floor) not in val_cell:
                    errors.append(
                        f"[MD] cash_floor_status mismatch: report={val_cell!r}, harness={h_cash_floor!r}"
                    )
        # blocked_actions - check whether the report describes a BLOCKED order type as if it were allowed
        for action in blocked_set:
            pattern = re.compile(
                r"(?:허용|가능|실행|주문)\s*[:\-]?\s*" + re.escape(action), re.IGNORECASE
            )
            if pattern.search(val_cell):
                errors.append(
                    f"[MD] blocked action '{action}' appears as allowed in report: {val_cell[:80]!r}"
                )
    # buy_power_krw - standalone number search (also in body text outside table cells)
    if h_buy_power is not None:
        buy_power_pattern = re.compile(r"buy_power_krw\s*[:\|]\s*([\d,]+)")
        for m in buy_power_pattern.finditer(report_text):
            val = _extract_number(m.group(1))
            if val is not None and abs(val - h_buy_power) > 1:
                errors.append(
                    f"[MD] buy_power_krw mismatch: report={val}, harness={h_buy_power}"
                )
    result = {
        "harness_path": harness_path,
        "report_path": report_path,
        "found_settlement": found_settlement,
        "found_heat": found_heat,
        "errors": errors,
        "status": "MARKDOWN_SYNC_FAIL" if errors else "MARKDOWN_SYNC_OK",
    }
    print(json.dumps(result, ensure_ascii=False, indent=2))
    return 1 if errors else 0
if __name__ == "__main__":
    # CLI: either "<harness_json> <output_json>" or
    # "--from-markdown <harness_json> <report>".
    cli_args = sys.argv[1:]
    if cli_args and cli_args[0] == "--from-markdown":
        if len(cli_args) < 3:
            print("Usage: python tools/validate_harness_sync.py --from-markdown <harness_json> <report_txt_or_md>")
            sys.exit(1)
        sys.exit(validate_from_markdown(cli_args[1], cli_args[2]))
    if len(cli_args) < 2:
        print("Usage: python tools/validate_harness_sync.py <harness_json> <output_json>")
        print(" python tools/validate_harness_sync.py --from-markdown <harness_json> <report.txt|.md>")
        sys.exit(1)
    sys.exit(validate_sync(cli_args[0], cli_args[1]))