Files
QuantEngineByItz/tools/validate_specs.py
kjh2064 5166750b53 WBS-7.3/7.4/7.5/7.11: 거버넌스 문서 정합성 정리 + spec-코드 동기화 게이트
2026-06-21 비판적 리뷰에서 spec/governance YAML이 코드 상태와 어긋난
채로 방치되던 3개 구체적 사례를 발견하고 정정했다. 근본 원인(동기화를
보장하는 장치 없음)에 대응하는 신규 CI 게이트도 함께 추가한다.

- spec/aliases.yaml: deprecated alias 17건 제거(활성 참조 0건 확인 후,
  2026-06-30 데드라인 전). role: deprecated_redirect인 spec/03_risk_policy.yaml,
  spec/04_strategy_rules.yaml 2개만 실삭제 — spec/06_exit_policy.yaml은
  role: compatibility_index(영구유지 설계)였음을 재확인해 보존
- governance/gas_logic_migration_ledger_v1.yaml: 존재하지 않는 파일을
  canonical 구현으로 인용하던 오류 2건 발견·정정, parity 테스트 부재로
  GAS 코드 삭제 보류(F12/F13/F14)
- spec/13_formula_registry.yaml: OVERHANG_PRESSURE_V1의 "-500000"
  절대값 폴백을 avg_volume_5d 비례식으로 교체(EXPERT_PRIOR 등록)
- tools/validate_specs.py: validate_spec_code_sync() 신규 — has_code_implementation/
  code_path 필드가 있는 spec만 검사(점진적 롤아웃, 기존 PASS 상태 비파괴),
  12개 파일 1차 태깅
2026-06-21 20:08:48 +09:00

916 lines
49 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import json
import re
import sys
from datetime import date
from pathlib import Path
import yaml
try:
import jsonschema
except Exception: # pragma: no cover - optional dependency
jsonschema = None
ROOT = Path(__file__).resolve().parents[1]
SCHEMA_VERSION = "2026-05-15-F6-compat-output"
MAX_SPEC_BYTES = 50_000
def fail(errors: list[str], message: str) -> None:
errors.append(message)
def load_yaml(path: Path, errors: list[str]):
try:
return yaml.safe_load(path.read_text(encoding="utf-8"))
except Exception as exc:
fail(errors, f"YAML parse failed: {path}: {type(exc).__name__}: {exc}")
return None
def load_json(path: Path, errors: list[str]):
try:
return json.loads(path.read_text(encoding="utf-8"))
except Exception as exc:
fail(errors, f"JSON parse failed: {path}: {type(exc).__name__}: {exc}")
return None
def validate_json_schema_minimal(schema: dict, sample: dict, errors: list[str]) -> None:
if jsonschema is not None:
try:
jsonschema.validate(instance=sample, schema=schema)
return
except Exception as exc:
fail(errors, f"jsonschema validation failed: {type(exc).__name__}: {exc}")
return
required = schema.get("required", [])
for key in required:
if key not in sample:
fail(errors, f"full_output_valid.json missing required field: {key}")
expected = schema.get("properties", {}).get("schema_version", {}).get("const")
if sample.get("schema_version") != expected:
fail(errors, f"sample schema_version mismatch: {sample.get('schema_version')} != {expected}")
for order in sample.get("orders", []):
for q_key in ("quantity", "stop_quantity", "take_profit_quantity"):
value = order.get(q_key)
if value is not None and not isinstance(value, int):
fail(errors, f"order {q_key} must be integer or null")
def base_field_name(field: str) -> str:
name = str(field).replace("positions[].", "")
return name.split(".", 1)[0]
def validate_formula_registry(errors: list[str]) -> None:
dictionary = load_yaml(ROOT / "spec" / "12_field_dictionary.yaml", errors) or {}
registry = load_yaml(ROOT / "spec" / "13_formula_registry.yaml", errors) or {}
harness_registry = load_yaml(ROOT / "spec" / "13b_harness_formulas.yaml", errors) or {}
fields = (dictionary.get("field_dictionary") or {}).get("fields") or {}
canonical_names = {meta.get("canonical_name") for meta in fields.values() if isinstance(meta, dict)}
formulas = ((registry.get("formula_registry") or {}).get("formulas")) or {}
harness_formulas = ((harness_registry.get("formula_registry") or {}).get("formulas")) or {}
all_formulas = {**formulas, **harness_formulas}
if not canonical_names:
fail(errors, "field_dictionary has no canonical fields")
if not formulas:
fail(errors, "formula_registry has no formulas")
# Proposal51 신규 공식: inputs/output 대신 output_contract/checks/layers 구조 사용 허용
# Phase-1/2/3 Python-tool 공식: GAS 하네스가 아닌 Python tools로 구현, inputs 필드 없음 (의도적)
_ALTERNATE_STRUCTURE_FORMULAS_ = {
"EXPORT_GATE_V2", "PROACTIVE_SELL_RADAR_V2", "ANTI_LATE_ENTRY_GATE_V3",
"PRICE_HIERARCHY_LOCK_V1", "DATA_QUALITY_GATE_V2", "CASH_RECOVERY_DISPLAY_LOCK_V1",
"SEMICONDUCTOR_CLUSTER_SYNC_V1", # inputs 있으나 output_contract 구조
"FUNDAMENTAL_MULTI_FACTOR_SCORE_V2", "EARNINGS_GROWTH_QUALITY_GATE_V1",
"MARKET_SHARE_MOMENTUM_PROXY_V1", "CASHFLOW_STABILITY_GATE_V1",
"ROUTING_DECISION_EXPLAIN_LOCK_V1",
# Phase-1 Python-tool-only 공식 (inputs 필드 없음, expected_outputs 구조)
"BLANK_CELL_AUDIT_V1", "VALUE_PRESERVATION_SCORER_V1",
"SMART_CASH_RECOVERY_V3", "RATCHET_TRAILING_GENERAL_V1",
"EJCE_VIEW_RENDERER_V1", "ROUTING_EXECUTION_LOG_TABLE_V1",
# Phase-2 Python-tool-only 공식
"FUNDAMENTAL_RAW_INGEST_V1", "FUNDAMENTAL_MULTIFACTOR_V3",
"HORIZON_CLASSIFICATION_V1",
# Phase-2B Python-tool-only 공식
"EARNINGS_QUALITY_SIGNAL_V1", "GROWTH_RATE_SIGNAL_V1",
"CASHFLOW_QUALITY_SIGNAL_V1",
# Phase-3 Python-tool-only 공식
"SMART_MONEY_FLOW_SIGNAL_V2", "LIQUIDITY_FLOW_SIGNAL_V1",
"PORTFOLIO_ALPHA_CONFIDENCE_PER_TICKER_V1",
# Phase-3 Market Share V2 (proxy-based)
"MARKET_SHARE_SIGNAL_V2",
# Phase-4~5 Python-tool-only 공식 (실측 반영 + 신규 하네스)
"TRADE_QUALITY_FROM_T5_V1", "PREDICTION_ACCURACY_HARNESS_V2",
"MACRO_EVENT_TICKER_IMPACT_V1", "SELL_WATERFALL_ENGINE_V2",
"LLM_NARRATIVE_TEMPLATE_LOCK_V1", "EJCE_DIVERGENCE_AUDIT_V1",
"PREDICTIVE_ALPHA_REPORT_LOCK_V2",
# Phase-6 Python-tool-only 공식 (판단 결정론 계층)
"SMART_MONEY_LIQUIDITY_GATE_V1", "FINAL_JUDGMENT_GATE_V1",
"VERDICT_CONSISTENCY_LOCK_V1", "INVESTMENT_QUALITY_HEADLINE_V1",
# Phase-7 단일 진실원천 + 교차섹션 정합성 게이트
"CANONICAL_METRICS_V1", "CROSS_SECTION_CONSISTENCY_V1",
# Work 7 + Work 3 분석 도구
"ALPHA_FEEDBACK_LOOP_V2", "ALPHA_LEAD_THRESHOLD_OPTIMIZER_V1",
# ENGINE_AUDIT — Python-tool-only 감사 게이트 (GAS 런타임 비개입)
"IMPUTED_DATA_EXPOSURE_GATE_V1",
# Phase-8 비기계적 매도전략 — confluence 기반 판단 게이트 (output_contract 구조)
"SHORT_INTEREST_RISK_GAUGE_V1", "QUALITATIVE_SELL_STRATEGY_V1",
"MARKET_REGIME_CLASSIFIER_V1", "SATELLITE_CANDIDATE_SCORE_V1",
"MICROSTRUCTURE_PRESSURE_FROM_ORDERBOOK_V1",
}
for formula_id, formula in all_formulas.items():
if not isinstance(formula, dict):
fail(errors, f"formula must be mapping: {formula_id}")
continue
if formula_id in _ALTERNATE_STRUCTURE_FORMULAS_ or str(formula.get("version", "")).endswith("_ORPHAN_RECONCILE"):
if "purpose" not in formula:
fail(errors, f"formula missing purpose: {formula_id}")
continue # inputs/output 구조 검사 스킵
for key in ("purpose", "inputs", "output"):
if key not in formula:
fail(errors, f"formula missing {key}: {formula_id}")
# GAS-internal computed fields: not in field_dictionary by design
_INPUT_INTERNAL_ALLOWLIST_ = {
"price", # GAS price object (price.ret10D, price.close, …)
"globalKospiRet10D_", # preReads KOSPI 10-day return
"portfolioStats", # harness aggregate stats object
"satellite_holdings", # harness portfolio-level array
"satellite_holdings[]", # base_field_name result for satellite_holdings[].field
# Sprint B/C harness-internal aggregate objects (not in field_dictionary by design)
"metadata", # harness metadata object (capturedAt, market_date)
"today_date", # GAS runtime date passed to freshness gate
"monthly_history[]", # monthly_history sheet aggregate (AFL inputs)
"alpha_history[]", # alpha_history sheet aggregate (AFL inputs)
# [3RD_HARNESS] harness-internal / batch-only fields
"monthly_history", # PATTERN_BLACKLIST batch input (full sheet array)
"highest_close", # PROFIT_RATCHET_TIERED_V2 — GAS runtime max(close)
"oversold_gate", # K2_STAGED_REBOUND_SELL_V1 output used as downstream input
"h2_priority_rank", # sell_priority sheet rank (GAS runtime integer)
"waterfall_plan_json", # SELL_WATERFALL_ENGINE_V1 output piped as input
"cash_recovery_plan_json", # CASH_RECOVERY_OPTIMIZER_V1 output piped as input
"trade_quality_json", # TRADE_QUALITY_SCORER_V1 output piped to PATTERN_BLACKLIST
"proactive_sell_radar_json",
"routing_trace_json",
"export_gate_json",
"opm_pct",
"revenue_growth_pct",
"market_share_proxy_pct",
"free_cf_krw",
"eps_growth_qoq_pct",
"eps_growth_yoy_pct",
"accrual_ratio_pct",
# Price/velocity fields computed from core_satellite sheet
"velocity_1d", # (close-prev_close)/prev_close*100 — derived
"velocity_5d", # Ret5D from core_satellite
"prev_close", # PrevClose from core_satellite
"obv_slope_20d", # OBV 20-day slope — technical indicator
"anti_chasing_status", # ANTI_CHASING_VELOCITY_V1 output piped downstream
# Intraday/timing fields from INTRADAY_ACTION_MATRIX_V1
"gap_down_pct", # (open - prev_close)/prev_close — intraday gap
"intraday_drop", # (close - open)/open — intraday drop
"intraday_change", # real-time price change pct
"time_slot_label", # PRE_MARKET/INTRADAY/POST_MARKET label
# TRADE_QUALITY_SCORER_V1 batch-only historical fields (not live feed fields)
"velocity_1d_at_entry", "ma20_at_entry", "volume_ratio_at_entry",
"t5_return_pct", "t20_vs_core_pctp",
"sell_price", "ma20_at_sell", "price_t5_after_sell",
"cash_recovered_krw",
# Orchestrator/meta formula context objects
"harness_context", # DETERMINISTIC_ROUTING_ENGINE_V1 full context
# Sell order / price fields from sell_priority sheet (GAS runtime)
"sell_limit_price", # GAS computed sell limit price (Sell_Limit_Price column)
"stop_loss_price", # stop price from account_snapshot (stop_price column)
"tick_unit", # KRX tick unit size (GAS computed from close level)
# Timestamp / market-date fields
"capture_time", # HTS capture timestamp (from account_snapshot.captured_at)
"market_date", # GAS runtime market date string
# Historical close prices for velocity computation
"close_1d_ago", # PrevClose from core_satellite
"close_5d_ago", # Close 5 days prior (from price history / ret5d back-calc)
# [PROPOSAL50] GAS 하네스 내부 집계 객체 (field_dictionary 미등록 의도)
"df", # 종목별 데이터 피드 맵 슬라이스 (dfMap[ticker])
"paeRow", # PAE 엔진 출력 행 (per-ticker predictive_alpha row)
"hApex", # 하네스 Apex 컨텍스트 집계 객체
"holdings", # 보유 종목 배열 (GAS 런타임 집계)
"dfMap", # 전체 데이터 피드 맵 (ticker→df)
"cashShortfallInfo", # 현금부족 정보 집계 객체
"h2", # 매도우선순위 레이어 집계 객체
"semiconductorClusterGate", # 반도체 클러스터 게이트 출력 객체
"macroJson", # getMacroJson() 반환값 — 거시 지표 집계 객체
"mesResult", # MACRO_EVENT_SYNCHRONIZER_V1 출력 객체
"h3", # 수량 레이어 집계 객체
"totalAsset", # 총자산 KRW (GAS 런타임 스칼라)
"capturedAtIso", # HTS 캡처 타임스탬프 ISO8601 문자열
"now", # GAS 런타임 Date 객체
# [PROPOSAL50] 신규 함수 GAS 내부 입력
"blueprints", # SHADOW_LEDGER_V1 — order_blueprint_json 배열
"order_condition_text", # VALIDATE_ORDER_CONDITION_V1 — 주문 조건 텍스트
"avg_trade_val_5d", # AVG_TRADE_VALUE_SIGNAL_V1 — 5일 평균 거래대금
"avg_trade_val_20d", # AVG_TRADE_VALUE_SIGNAL_V1 — 20일 평균 거래대금
"profit_lock_stage", # AVG_TRADE_VALUE_SIGNAL_V1 — 수익 잠금 스테이지
"sell_candidates_json", # TRIM_PLAN_MIN_CASH_V1 내부 입력
"sell_quantities_json", # TRIM_PLAN_MIN_CASH_V1 내부 입력
# Phase-1/2/3 deterministic harness internal inputs
"operational_report_json",
"Close",
"MA20",
"MA60",
"ATR20",
"RSI14",
"BB_Position",
"Frg_5D",
"Inst_5D",
"AvgTradeValue_5D_M",
"AvgTradeValue_20D_M",
"Recovery_Ratio_5D",
"Stock_Drawdown_From_High_Pct",
"value_preservation_scorer_v1_json",
"scrs_v2_json",
"macro_risk_regime",
"Spread_Pct",
"Profit_Pct",
"High52W",
"Stop_Price_Est",
"Account_Avg_Cost",
"ejce_json",
"breakout_quality_gate_json",
"anti_chasing_velocity_json",
"portfolio_alpha_confidence",
"routing_execution_log",
"alpha_lead_json",
"_harness_context",
# [NF1~NF5] Python-harness 보조 공식 전용 입력 (GAS 미사용, field_dictionary 미등록 의도)
"ticker_type", # NF1: export | domestic | neutral 분류
"base_macro_score", # NF1: 거시팩터 기본 점수
"down_streak", # NF2: 연속 하락 일수 (prices_json 파생)
"t5_ledger", # NF3: proposal_evaluation_history 비-REPLAY T+5 행
"cut_decile", # NF3: BUY 차단 분위 경계 (EXPERT_PRIOR=3)
"sample_n", # NF3: 표본 수 (캘리브레이션 판단용)
"sell_qty", # NF4: 매도 수량 (scrs_v2 selected_combo 파생)
"prev_trail_stop", # NF5: 이전 래칫 손절가 (ratchet_trailing_general 파생)
"high_since_entry", # NF5: 진입 후 최고가 (prices_json 파생)
"profit_pct", # NF5: 수익률 % (account_snapshot 파생)
"buy_timing_score", # NF3: entry-timing proxy (proposal_evaluation_history 파생)
"adv20", # NF4: 20일 평균 거래대금 (prices_json 파생)
"emergency_full_sell", # NF4: 비상 전량매도 플래그 (scrs_v2 파생)
}
for input_item in formula.get("inputs", []):
field = base_field_name(input_item.get("field", ""))
if field and field not in canonical_names and field not in _INPUT_INTERNAL_ALLOWLIST_:
fail(errors, f"formula input field not in field_dictionary: {formula_id}: {field}")
output = formula.get("output") or {}
output_field = base_field_name(output.get("field", ""))
# Intermediate derived fields do not need dictionary entries, but final formula outputs should be named.
if output_field and output_field not in canonical_names and output_field not in {
"flow_credit",
"total_heat_pct",
"expected_edge",
"target_cash_pct",
"final_quantity",
"peg_gate_result",
"take_profit_ladder_v2",
"financial_health_score",
"portfolio_beta",
"ratchet_stop_price",
"tick_normalized_price",
"alpha_lead_json",
"follow_through_json",
"distribution_risk_json",
"profit_preservation_json",
"cash_raise_plan_json",
"rebound_sell_trigger_json",
"execution_quality_json",
"buy_permission_json",
"smart_sell_quantities_json",
"limit_price_policy_json",
# Sprint B/C new formula outputs
"data_freshness_status", # HARNESS_DATA_FRESHNESS_GATE_V1
"satellite_lifecycle_stage", # SATELLITE_LIFECYCLE_GATE_V1
"cla_exit_status", # CLA_REGIME_EXIT_CONDITION_V1
"satellite_cluster_beta", # PORTFOLIO_CORRELATION_GATE_V1
"alpha_feedback_json", # ALPHA_FEEDBACK_LOOP_V1
# [3RD_HARNESS] new formula outputs
"sell_price_sanity_status", # SELL_PRICE_SANITY_V1
"cash_recovery_plan_json", # CASH_RECOVERY_OPTIMIZER_V1
"intraday_scope", # INTRADAY_ACTION_MATRIX_V1
"anti_chasing_verdict", # ANTI_CHASING_VELOCITY_V1
"pullback_entry_verdict", # PULLBACK_ENTRY_TRIGGER_V1
"distribution_sell_detector_status", # DISTRIBUTION_SELL_DETECTOR_V1
"waterfall_plan_json", # SELL_WATERFALL_ENGINE_V1
"sell_timing_verdict", # SELL_EXECUTION_TIMING_V1
"routing_execution_log", # DETERMINISTIC_ROUTING_ENGINE_V1
"llm_constraint_status", # LLM_SERVING_CONSTRAINT_V1
"auto_trailing_stop_v2", # PROFIT_RATCHET_TIERED_V2
"preservation_verdict", # SELL_VALUE_PRESERVATION_TIERED_V2
"trade_quality_json", # TRADE_QUALITY_SCORER_V1
"pattern_blacklist_status", # PATTERN_BLACKLIST_AUTO_V1
"velocity_5d", # computed from ret5d / used in SELL_VALUE_PRES.
"fundamental_quality_json",
"horizon_allocation_json",
"smart_money_liquidity_json",
"routing_serving_trace_v2_json",
"fundamental_multifactor_json",
"earnings_growth_quality_json",
"market_share_proxy_json",
"cashflow_stability_json",
"routing_decision_explain_json",
"blank_cell_audit_v1_json",
"value_preservation_scorer_v1_json",
"smart_cash_recovery_v3_json",
"ratchet_trailing_general_v1_json",
"ejce_view_renderer_v1_json",
"routing_execution_log_v1_json",
"pullback_state", # PULLBACK_ENTRY_TRIGGER_V1 output
"serving_constraint_check", # LLM_SERVING_CONSTRAINT_V1 output
"anti_chasing_velocity_status", # ANTI_CHASING_VELOCITY_V1 output
# [3RD_HARNESS_V1] 커버리지 완성 추가 출력 필드
"ratchet_stage_v2", # PROFIT_RATCHET_TIERED_V2
"profit_lock_stage", # PROFIT_LOCK_RATCHET_V1
"auto_trailing_stop", # PROFIT_LOCK_RATCHET_V1
"flow_acceleration_status", # FLOW_ACCELERATION_V1
"signals_count", # DISTRIBUTION_SELL_DETECTOR_V1
"pullback_entry_trigger_price", # PULLBACK_ENTRY_TRIGGER_V1
"sell_execution_window", # SELL_EXECUTION_TIMING_V1
"tick_normalized_price", # TICK_NORMALIZER_V1 (duplicate-safe)
"brt_verdict", # BENCHMARK_RELATIVE_TIMESERIES_V1
"brt_rs_slope", # BENCHMARK_RELATIVE_TIMESERIES_V1
"rs_verdict", # RS_VERDICT_V2
"saqg_verdict", # SATELLITE_ALPHA_QUALITY_GATE_V1
"sapg_verdict", # SATELLITE_AGGREGATE_PNL_GATE_V1
"tick_normalized_prices_json", # TICK_NORMALIZER_V1 per-ticker map
"ratchet_v2_per_ticker_json", # PROFIT_RATCHET_TIERED_V2 per-ticker
"sell_price_sanity_per_ticker_json", # SELL_PRICE_SANITY_V1 per-ticker
"decisions_json", # DETERMINISTIC_ROUTING_ENGINE_V1 (updated)
"comprehensive_proposal_json", # HS010-B 판단 제안표 원천 데이터
"satellite_candidate_json", # HS010-C 위성 후보 스크리닝
"satellite_candidate_summary", # HS010-C 요약
# SPRINT 1 신규 필드 (Direction O1/O2/O5/P1/P3/P5/A2/B1/B3/K2/C1/D1)
"semiconductor_cluster_json", # O2 SEMICONDUCTOR_CLUSTER_GATE_V1
"single_position_weight_json", # O1 SINGLE_POSITION_WEIGHT_CAP_V1
"position_count", # O5 POSITION_COUNT_LIMIT_V1
"position_count_max", # O5
"position_count_gate", # O5
"stop_breach_alert_json", # P1 STOP_BREACH_ALERT_V1
"heat_concentration_json", # P3 HEAT_CONCENTRATION_ALERT_V1
"portfolio_health_blocked_json", # P5 PORTFOLIO_HEALTH_SCORE_V1
"anti_chasing_velocity_json", # A2+B1 ANTI_CHASING_VELOCITY_V1
"distribution_sell_detector_json", # B3 DISTRIBUTION_SELL_DETECTOR_V1
"k2_staged_rebound_sell_json", # K2 K2_STAGED_REBOUND_SELL_V1
"cash_recovery_plan_json", # C1/A3 SELL_WATERFALL_ENGINE_V1
# SPRINT 2 신규 필드 (Direction REGIME_CLA/RS_VERDICT/RAG)
"regime_cla_json", # REGIME_CLA CONCENTRATED_LEADER_ADVANCE_V1
"cla_exit_status", # REGIME_CLA CLA_EXIT_CONFIRMED / CLA_ACTIVE
"rag_v1", # RAG REPLACEMENT_ALPHA_GATE_V1
"rag_reason", # RAG 사유 텍스트
"rs_verdict_source", # RS_VERDICT V2_FUSION / V1_ONLY
"rs_verdict_v1_raw", # RS_VERDICT V1 원시값
# SPRINT 3 신규 필드 (Direction L4)
"pre_distribution_warning", # L4 PRE_DISTRIBUTION_EARLY_WARNING_V1
# SPRINT 4 신규 필드 (Direction SFG/F1/F2/PCG)
"sfg_v1", # SFG SATELLITE_FAILURE_GATE_V1 스칼라
"sfg_broken_count", # SFG 위성 BROKEN 종목 수
"sfg_failure_rate", # SFG 위성 실패율 (0.01.0)
"pattern_blacklist_json", # F2 PATTERN_BLACKLIST_AUTO_V1
"portfolio_correlation_gate_json", # PCG PORTFOLIO_CORRELATION_GATE_V1
"correlation_gate_status", # PCG 상태 스칼라
# [PROPOSAL46] 신규 하네스 출력 필드
"predictive_alpha_json", # PA1 PREDICTIVE_ALPHA_ENGINE_V1
"anti_late_entry_json", # PA2 ANTI_LATE_ENTRY_GATE_V2
"cash_preservation_sell_json", # PA3 CASH_PRESERVATION_SELL_ENGINE_V2
"macro_event_json", # PA4 MACRO_EVENT_SYNCHRONIZER_V1
"consistency_report_json", # PA5 CONSISTENCY_VALIDATOR_V2
# [PROPOSAL50] 신규 하네스 출력 필드
"ejce_json", # EJCE-V1 EXPERT_JUDGMENT_CONSENSUS_ENGINE_V1
"scrs_v2_json", # SCRS-V2 SMART_CASH_RECOVERY_SELL_ENGINE_V2
"mrag_v2_json", # MRAG-V2 MACRO_REGIME_ADAPTIVE_GATE_V2
"mandatory_reduction_json", # M5 V1.1 MANDATORY_REDUCTION_PLAN_V1
"serving_lock_json", # DSLE-V1 DETERMINISTIC_SERVING_LOCK_ENGINE_V1
"order_condition_validation", # HS007 VALIDATE_ORDER_CONDITION_V1
"shadow_ledger_json", # H10 SHADOW_LEDGER_V1
"llm_serving_constraint_json", # D2 LLM_SERVING_CONSTRAINT_V1
"avg_trade_val_signal_json", # H6 AVG_TRADE_VALUE_SIGNAL_V1
# [Advanced Harness Architecture]
"dynamic_value_preservation_sell_v6_json", # DYNAMIC_VALUE_PRESERVATION_SELL_V6
"predictive_alpha_engine_v2_json", # PREDICTIVE_ALPHA_DIALECTIC_ENGINE_V2
"capital_style_time_stop_v1_json", # CAPITAL_STYLE_TIME_STOP_V1
"execution_integrity_gate_v1_json", # EXECUTION_INTEGRITY_GATE_V1
# [NF1~NF5] Python-harness 보조 공식 출력 (field_dictionary 미등록 의도)
"macro_factor_applied", # NF1 REGIME_CONDITIONAL_MACRO_FACTOR_V1
"rebound_capture_hit", # NF2 REBOUND_CAPTURE_THESIS_FACTOR_V1
"velocity_decile_thresholds", # NF3 ENTRY_TIMING_DECILE_FACTOR_V1
"max_child_qty", # NF4 SELL_SLIPPAGE_BUDGET_FACTOR_V1
"trail_stop", # NF5 PROFIT_GIVEBACK_RATCHET_FACTOR_V1
}:
fail(errors, f"formula output field not registered or allowlisted: {formula_id}: {output_field}")
def validate_output_rendering_contract(schema: dict | None, errors: list[str]) -> None:
output_spec = load_yaml(ROOT / "spec" / "07_output_schema.yaml", errors) or {}
report_template = load_yaml(ROOT / "RetirementAssetPortfolioReportTemplate.yaml", errors) or {}
analysis_prompt = (ROOT / "prompts" / "analysis_prompt.md").read_text(encoding="utf-8")
display_policy = ((output_spec.get("recommendation_grade") or {}).get("display_policy")) or {}
sequence = display_policy.get("output_sequence") or {}
# I1: routing_serving_trace → QEH_AUDIT_BLOCK이 step_0a/0b로 등록됐는지 확인
if sequence.get("step_0a") != "routing_serving_trace":
fail(errors, "output_sequence missing step_0a=routing_serving_trace (I1/G4 required)")
if sequence.get("step_0b") != "QEH_AUDIT_BLOCK":
fail(errors, "output_sequence missing step_0b=QEH_AUDIT_BLOCK (I1/G4 required)")
expected_prefix = [
"capture_read_ledger",
"data_completeness_matrix",
"backdata_feature_bank_table",
"benchmark_relative_harness_table",
"alpha_lead_table",
"anti_distribution_table",
"profit_preservation_table",
"smart_cash_raise_table",
"execution_quality_table",
"order_quantity_4stage_gate",
"decision_trace_table",
"sell_priority_decision_table",
"current_holdings_analysis_report_template",
]
actual_prefix = [sequence.get(f"step_{index}") for index in range(1, len(expected_prefix) + 1)]
if actual_prefix != expected_prefix:
fail(errors, f"output_sequence prefix mismatch: {actual_prefix} != {expected_prefix}")
# I1: human_report.required_sections 검증
human_report = output_spec.get("human_report") or {}
required_sections = human_report.get("required_sections") or []
required_section_names = {s.get("name") for s in required_sections if isinstance(s, dict)}
for mandatory in (
"routing_serving_trace",
"QEH_AUDIT_BLOCK",
"decision_trace_table",
"backdata_feature_bank_table",
"alpha_lead_table",
"anti_distribution_table",
"smart_cash_raise_table",
"execution_quality_table",
"prediction_evaluation_improvement_report",
):
if mandatory not in required_section_names:
fail(errors, f"human_report.required_sections missing: {mandatory}")
# I4: watch_ledger 컬럼 제한 검증
watch_ledger = human_report.get("watch_ledger") or {}
forbidden = set(watch_ledger.get("forbidden_columns") or [])
hs010_forbidden = {"지정가", "손절가", "익절가", "주문수량", "주문금액"}
if not hs010_forbidden.issubset(forbidden):
fail(errors, f"watch_ledger.forbidden_columns missing HS010-I4 terms: {sorted(hs010_forbidden - forbidden)}")
if "two_phase_rendering" not in (output_spec.get("json_output_contract") or {}):
fail(errors, "json_output_contract missing two_phase_rendering")
if "terminology_control" not in (output_spec.get("output_format") or {}):
fail(errors, "output_format missing terminology_control")
terms = {
item.get("term")
for item in ((output_spec.get("output_format") or {}).get("terminology_control") or {}).get("prohibited_freeform_terms", [])
if isinstance(item, dict)
}
expected_terms = {"부분감액", "1차 감액", "부분정리", "전량", "전량매도"}
if not expected_terms.issubset(terms):
fail(errors, f"terminology_control missing prohibited terms: {sorted(expected_terms - terms)}")
templates = report_template.get("output_format_templates") or {}
rendering_contract = templates.get("rendering_contract") or {}
prohibited_order_terms = set(((rendering_contract.get("terminology_rule") or {}).get("prohibited_order_terms")) or [])
if not expected_terms.issubset(prohibited_order_terms):
fail(errors, f"report rendering_contract missing prohibited_order_terms: {sorted(expected_terms - prohibited_order_terms)}")
required_report_templates = (
"routing_serving_trace_report",
"QEH_AUDIT_BLOCK_report",
"capture_read_ledger_report",
"backdata_feature_bank_table",
"order_quantity_4stage_gate_report",
"decision_trace_table",
"sell_priority_decision_table",
"alpha_lead_table",
"anti_distribution_table",
"profit_preservation_table",
"smart_cash_raise_table",
"execution_quality_table",
"reference_price_ledger",
"prediction_evaluation_improvement_report",
)
for required_template in required_report_templates:
if required_template not in templates:
fail(errors, f"report template missing {required_template}")
required_sequence = (rendering_contract.get("required_sequence") or [])
sequence_text = "\n".join(str(item) for item in required_sequence)
for required_step in (
"routing_serving_trace",
"QEH_AUDIT_BLOCK",
"backdata_feature_bank_table",
"alpha_lead_table",
"anti_distribution_table",
"profit_preservation_table",
"smart_cash_raise_table",
"execution_quality_table",
"decision_trace_table",
"reference_price_ledger",
"prediction_evaluation_improvement_report",
):
if required_step not in sequence_text:
fail(errors, f"report rendering_contract.required_sequence missing {required_step}")
if "schemas/output_schema.json" not in analysis_prompt or "market_context_learning_note" not in analysis_prompt:
fail(errors, "analysis_prompt missing schema-first or learning-note instruction")
if "sell_priority_decision_table" not in analysis_prompt:
fail(errors, "analysis_prompt missing sell_priority_decision_table instruction")
if "decision_trace" not in analysis_prompt or "decision_trace_table" not in analysis_prompt:
fail(errors, "analysis_prompt missing decision_trace instruction")
for prompt_term in (
"routing_serving_trace",
"backdata_feature_bank_table",
"alpha_lead_table",
"anti_distribution_table",
"profit_preservation_table",
"smart_cash_raise_table",
"execution_quality_table",
"buy_permission_json",
"limit_price_policy_json",
"BLOCKED_REPORT",
):
if prompt_term not in analysis_prompt:
fail(errors, f"analysis_prompt missing required report/harness term: {prompt_term}")
for term in expected_terms:
if term not in analysis_prompt:
fail(errors, f"analysis_prompt missing prohibited free-form term: {term}")
if schema:
required = set(schema.get("required") or [])
if "capture_read_ledger" not in required:
fail(errors, "output_schema required missing capture_read_ledger")
if "decision_trace" not in required:
fail(errors, "output_schema required missing decision_trace")
trace = (((schema.get("properties") or {}).get("decision_trace") or {}).get("items")) or {}
trace_required = set(trace.get("required") or [])
for field in ("state", "check_id", "rule_ref", "inputs_used", "result", "selected_action", "blocked_actions", "missing_inputs", "tie_breaker_applied"):
if field not in trace_required:
fail(errors, f"output_schema decision_trace.required missing {field}")
orders = (((schema.get("properties") or {}).get("orders") or {}).get("items")) or {}
order_required = set(orders.get("required") or [])
for field in ("current_holding_quantity", "average_cost_krw", "current_price_krw"):
if field not in order_required:
fail(errors, f"output_schema orders.required missing {field}")
portfolio_exposure = load_yaml(ROOT / "spec" / "risk" / "portfolio_exposure.yaml", errors) or {}
sell_priority = ((portfolio_exposure.get("portfolio_exposure_framework") or {}).get("sell_priority_engine")) or {}
if not sell_priority:
fail(errors, "portfolio_exposure_framework missing sell_priority_engine")
else:
for key in ("hard_precedence", "candidate_scoring", "tie_breakers", "output_required", "prohibition"):
if key not in sell_priority:
fail(errors, f"sell_priority_engine missing {key}")
if "smart_cash_raise_execution" not in sell_priority:
fail(errors, "sell_priority_engine missing smart_cash_raise_execution")
position_sizing = load_yaml(ROOT / "spec" / "05_position_sizing.yaml", errors) or {}
sizing_root = position_sizing.get("position_sizing") or {}
if "pre_permission_gate" not in sizing_root:
fail(errors, "position_sizing missing pre_permission_gate")
if "BUY_PERMISSION_MATRIX_V1" not in str(sizing_root.get("sequence")):
fail(errors, "position_sizing.sequence missing BUY_PERMISSION_MATRIX_V1")
decision_flow = load_yaml(ROOT / "spec" / "09_decision_flow.yaml", errors) or {}
deterministic = ((decision_flow.get("decision_flow") or {}).get("deterministic_execution_control")) or {}
if not deterministic:
fail(errors, "decision_flow missing deterministic_execution_control")
else:
for key in ("trace_required_fields", "tie_breaker_order", "null_propagation_rule", "no_freeform_override"):
if key not in deterministic:
fail(errors, f"deterministic_execution_control missing {key}")
def validate_harness_contract_consistency(errors: list[str]) -> None:
"""E3: 19_harness_contract.yaml의 scalar/collection_keys가 validate_harness_context.py에 모두 검사되는지 교차검증."""
contract = load_yaml(ROOT / "spec" / "19_harness_contract.yaml", errors) or {}
validator_path = ROOT / "tools" / "validate_harness_context.py"
try:
validator_text = validator_path.read_text(encoding="utf-8")
except Exception as exc:
fail(errors, f"cannot read validate_harness_context.py: {exc}")
return
keys_section = ((contract.get("harness_contract") or {}).get("required_harness_context_keys")) or {}
raw_scalars = keys_section.get("scalar_keys") or []
raw_collections = keys_section.get("collection_keys") or []
# 주석(# ...) 및 비문자열 항목 제거
def clean_keys(raw: list) -> list[str]:
result = []
for item in raw:
if isinstance(item, str):
result.append(item.strip())
return result
scalar_keys = clean_keys(raw_scalars)
collection_keys = clean_keys(raw_collections)
for key in scalar_keys:
if f'"{key}"' not in validator_text:
fail(errors, f"harness_contract scalar_key not checked in validator: {key}")
for key in collection_keys:
if f'"{key}"' not in validator_text:
fail(errors, f"harness_contract collection_key not checked in validator: {key}")
def validate_spec_code_sync(errors: list[str]) -> dict:
"""WBS-7.11(2026-06-22) — spec YAML이 code_path로 가리키는 파일이 실제로 존재하는지 검사.
has_code_implementation 필드가 있는 파일만 검사한다(점진적 롤아웃 — 필드가 없는
파일은 스킵되므로 1차 태깅이 기존 PASS 상태를 절대 깨지 않는다). redirect_only:true인
파일은 의도적으로 코드가 없는 순수 호환 인덱스이므로 code_path 검사 대상이 아니며,
has_code_implementation:true와 동시에 있으면 그 자체로 모순이라 fail한다.
"""
all_yaml_paths = sorted((ROOT / "spec").rglob("*.yaml")) + sorted((ROOT / "governance").rglob("*.yaml"))
total_files = len(all_yaml_paths)
checked = 0
missing = 0
for path in all_yaml_paths:
try:
data = yaml.safe_load(path.read_text(encoding="utf-8"))
except Exception:
continue
if not isinstance(data, dict):
continue
meta = data.get("meta") if isinstance(data.get("meta"), dict) else data
has_code = meta.get("has_code_implementation")
if has_code is None:
continue
redirect_only = bool(meta.get("redirect_only"))
checked += 1
if redirect_only and has_code:
fail(errors, f"spec_code_sync contradiction: {path} has redirect_only=true AND has_code_implementation=true")
missing += 1
continue
if not has_code:
continue
code_path = meta.get("code_path")
candidates = code_path if isinstance(code_path, list) else [code_path] if code_path else []
if not candidates:
fail(errors, f"spec_code_sync: {path} declares has_code_implementation=true but no code_path")
missing += 1
continue
for rel in candidates:
if not (ROOT / str(rel)).exists():
fail(errors, f"spec declares code_path that does not exist: {path} -> {rel}")
missing += 1
result = {
"formula_id": "SPEC_CODE_SYNC_V1",
"total_spec_files": total_files,
"checked_count": checked,
"missing_code_path_count": missing,
"sync_field_coverage_pct": round(100.0 * checked / total_files, 2) if total_files else 0.0,
"gate": "PASS" if missing == 0 else "FAIL",
}
out = ROOT / "Temp" / "spec_code_sync_v1.json"
out.parent.mkdir(parents=True, exist_ok=True)
out.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")
return result
def main() -> int:
errors: list[str] = []
yaml_paths = [
ROOT / "RetirementAssetPortfolio.yaml",
ROOT / "RetirementAssetPortfolioReportTemplate.yaml",
*sorted((ROOT / "spec").rglob("*.yaml")),
*sorted((ROOT / "examples").glob("*.yaml")),
*sorted((ROOT / "proposals").glob("*.yaml")),
*sorted((ROOT / "tests").glob("*.yaml")),
]
for path in yaml_paths:
load_yaml(path, errors)
json_paths = sorted((ROOT / "schemas").glob("*.json")) + sorted((ROOT / "examples").glob("*.json"))
parsed_json = {path: load_json(path, errors) for path in json_paths}
for path in sorted((ROOT / "examples").glob("*.jsonl")):
try:
for line_no, line in enumerate(path.read_text(encoding="utf-8").splitlines(), 1):
if line.strip():
json.loads(line)
except Exception as exc:
fail(errors, f"JSONL parse failed: {path}:{line_no}: {type(exc).__name__}: {exc}")
manifest = load_yaml(ROOT / "RetirementAssetPortfolio.yaml", errors) or {}
for step_name, step in (manifest.get("load_sequence") or {}).items():
for file_name in step.get("files", []):
# Temp/ 파일은 런타임 생성 아티팩트 — CI 체크아웃 환경에서는 존재하지 않음
if "*" not in file_name and not file_name.startswith("Temp/") and not (ROOT / file_name).exists():
fail(errors, f"manifest load_sequence missing file: {step_name}: {file_name}")
for key, file_name in (manifest.get("spec_files") or {}).items():
if not isinstance(file_name, str):
continue
if "*" not in file_name and not file_name.startswith("Temp/") and not (ROOT / file_name).exists():
fail(errors, f"manifest spec_files missing file: {key}: {file_name}")
# All spec YAML files should be registered in manifest, governance, split indexes, or compatibility indexes.
manifest_text = (ROOT / "RetirementAssetPortfolio.yaml").read_text(encoding="utf-8")
for path in sorted((ROOT / "spec").rglob("*.yaml")):
rel = path.relative_to(ROOT).as_posix()
if rel not in manifest_text:
fail(errors, f"spec file not registered in manifest: {rel}")
if path.stat().st_size > MAX_SPEC_BYTES and path.name not in {
"13_formula_registry.yaml", "13b_harness_formulas.yaml",
"12_field_dictionary.yaml",
"51_formula_lifecycle_registry.yaml", # 290+ formula lifecycle registry (Proposal51-P1)
"formula_golden_cases_v2.yaml", # BCH-V1 골든케이스 — 공식 수 증가로 50KB 초과 허용
"formula_golden_cases_nf.yaml", # NF1~NF5 Python-harness 보조 공식 명세 golden cases
"calibration_registry.yaml", # CALIB-V1 임계값 레지스트리
"27_bch_calibration_runbook.yaml", # BCH 런북
"output_field_owner_ledger.yaml", # generated ledger — size threshold exempt
"formula_registry.normalized.yaml", # Normalized formula registry
"factor_lifecycle_registry.yaml", # Factor lifecycle registry
"exit.yaml",
"risk.yaml",
"41_release_dag.yaml", # release DAG grows with each new pipeline step
}:
fail(errors, f"spec file exceeds {MAX_SPEC_BYTES} bytes and should be split/indexed: {rel}")
combined_text = "\n".join(
path.read_text(encoding="utf-8")
for path in [
ROOT / "RetirementAssetPortfolio.yaml",
ROOT / "AGENTS.md",
ROOT / "spec" / "07_output_schema.yaml",
ROOT / "prompts" / "analysis_prompt.md",
ROOT / "schemas" / "output_schema.json",
]
)
if "F2-json-output" in combined_text or "F5-complete-output" in combined_text:
fail(errors, "stale JSON schema version reference remains")
if SCHEMA_VERSION not in combined_text:
fail(errors, "current schema version not referenced")
active_text_paths = [
ROOT / "RetirementAssetPortfolio.yaml",
ROOT / "AGENTS.md",
ROOT / "RetirementAssetPortfolioReportTemplate.yaml",
*sorted((ROOT / "spec").rglob("*.yaml")),
*sorted((ROOT / "prompts").glob("*.md")),
*sorted((ROOT / "tests").glob("*.yaml")),
]
active_text = "\n".join(path.read_text(encoding="utf-8") for path in active_text_paths)
bad_legacy = [
"llm_compact_execution_contract.non_negotiable_tables",
"llm_compact_execution_contract.master_prohibitions",
"llm_compact_execution_contract.hard_stops",
]
legacy_allowed_file = ROOT / "spec" / "00_execution_contract.yaml"
active_text_without_alias_file = "\n".join(
path.read_text(encoding="utf-8")
for path in active_text_paths
if path != legacy_allowed_file
)
for token in bad_legacy:
if token in active_text_without_alias_file:
fail(errors, f"legacy dangling reference remains: {token}")
exec_contract = load_yaml(ROOT / "spec" / "00_execution_contract.yaml", errors) or {}
mp = exec_contract.get("master_prohibitions") or {}
for pid in ("P1", "P2", "P3", "P4", "P5"):
if not any(str(key).startswith(pid) for key in mp):
fail(errors, f"master_prohibitions missing {pid}")
for match in re.finditer(r"master_prohibitions\.P([1-5])", active_text):
pid = "P" + match.group(1)
if not any(str(key).startswith(pid) for key in mp):
fail(errors, f"reference to undefined {pid}")
# Derived adapters must not claim broad canonical authority.
for path in sorted((ROOT / "spec").rglob("*.yaml")):
data = load_yaml(path, errors)
if not isinstance(data, dict):
continue
meta = data.get("meta") or {}
if meta.get("role") == "derived_adapter":
text = path.read_text(encoding="utf-8")
if re.search(r"^\s+canonical:\s+true\s*$", text, flags=re.MULTILINE):
fail(errors, f"derived_adapter has broad canonical:true: {path}")
numeric_lines = []
for line_no, line in enumerate(text.splitlines(), 1):
if re.search(r"(?<![A-Za-z])\d+(?:\.\d+)?%?", line) and "canonical_ref" not in line and "version:" not in line:
numeric_lines.append(line_no)
if numeric_lines and "authority_rule" not in text:
fail(errors, f"derived_adapter numeric thresholds without authority_rule: {path}:{numeric_lines[:5]}")
# Decision-flow required_refs should point to existing files.
flow = load_yaml(ROOT / "spec" / "09_decision_flow.yaml", errors) or {}
states = (((flow.get("decision_flow") or {}).get("states")) or {})
for state_name, state in states.items():
for ref in state.get("required_refs", []):
file_name = str(ref).split(":", 1)[0]
if file_name.endswith(".yaml") and not (ROOT / file_name).exists():
fail(errors, f"decision_flow missing required_ref file: {state_name}: {file_name}")
# AGENTS should mention the same critical top-level authority files as manifest.
agents_text = (ROOT / "AGENTS.md").read_text(encoding="utf-8")
for critical in ("spec/00_execution_contract.yaml", "spec/risk/aggregate_risk.yaml", "spec/risk/portfolio_exposure.yaml", "spec/12_field_dictionary.yaml", "spec/13_formula_registry.yaml", "spec/14_raw_workbook_mapping.yaml", "spec/15_account_snapshot_contract.yaml", "spec/02_data_contract.yaml", "spec/09_decision_flow.yaml"):
if critical not in agents_text:
fail(errors, f"AGENTS.md missing critical source file: {critical}")
schema = parsed_json.get(ROOT / "schemas" / "output_schema.json")
sample = parsed_json.get(ROOT / "examples" / "full_output_valid.json")
if schema and sample:
validate_json_schema_minimal(schema, sample, errors)
validate_formula_registry(errors)
validate_output_rendering_contract(schema, errors)
validate_harness_contract_consistency(errors)
validate_spec_code_sync(errors)
aliases = load_yaml(ROOT / "spec" / "aliases.yaml", errors) or {}
alias_map = aliases.get("aliases") or {}
alias_files = {
ROOT / "spec" / "aliases.yaml",
ROOT / "spec" / "06_exit_policy.yaml",
ROOT / "spec" / "risk" / "risk_control.yaml",
ROOT / "spec" / "strategy" / "entry_gates.yaml",
}
for deprecated in alias_map:
meta = alias_map.get(deprecated) or {}
remove_after = meta.get("remove_after")
if remove_after:
try:
expiry = date.fromisoformat(str(remove_after))
if date.today() > expiry:
fail(errors, f"deprecated alias expired: {deprecated} remove_after={remove_after}")
except ValueError:
fail(errors, f"invalid alias remove_after date: {deprecated} remove_after={remove_after}")
for path in active_text_paths:
if path in alias_files:
continue
if deprecated in path.read_text(encoding="utf-8"):
fail(errors, f"deprecated alias used outside alias/index files: {deprecated} in {path}")
# Examples are illustrative, but they must not teach legacy paths to downstream LLM runs.
example_text_paths = [*sorted((ROOT / "examples").glob("*.yaml")), *sorted((ROOT / "examples").glob("*.jsonl"))]
for deprecated in alias_map:
for path in example_text_paths:
if deprecated in path.read_text(encoding="utf-8"):
fail(errors, f"deprecated alias used in example: {deprecated} in {path}")
# Bundle profiles are manifest-owned; build script must follow the manifest lists.
profiles = manifest.get("bundle_profiles") or {}
for profile_name in ("compact", "ultra_compact"):
profile = profiles.get(profile_name)
if not isinstance(profile, dict):
fail(errors, f"manifest missing bundle_profiles.{profile_name}")
continue
for file_name in profile.get("files", []):
# Temp/ 파일은 런타임 생성 아티팩트 — CI 환경에서는 스킵
if "*" not in file_name and not file_name.startswith("Temp/") and not (ROOT / file_name).exists():
fail(errors, f"bundle profile missing file: {profile_name}: {file_name}")
ownership = load_yaml(ROOT / "spec" / "ownership_map.yaml", errors) or {}
for file_name, policy in (ownership.get("ownership_map") or {}).items():
path = ROOT / file_name
if not path.exists():
continue
text = path.read_text(encoding="utf-8")
for forbidden in policy.get("must_not_own", []):
# Korean natural-language labels are advisory. Only enforce key-like forbidden tokens.
if re.match(r"^[A-Za-z0-9_.:/_-]+$", forbidden) and forbidden in text:
fail(errors, f"ownership violation: {file_name} contains must_not_own token {forbidden}")
xref = load_yaml(ROOT / "spec" / "xref_matrix.yaml", errors) or {}
for file_name, policy in (xref.get("xref_matrix") or {}).items():
candidates = [p for p in active_text_paths if p.relative_to(ROOT).as_posix().startswith(file_name.rstrip("/"))]
for path in candidates:
text = path.read_text(encoding="utf-8")
for forbidden in policy.get("must_not_reference", []):
if forbidden and forbidden in text:
fail(errors, f"xref violation: {path} references forbidden token {forbidden}")
for bundle in (
ROOT / "dist" / "retirement_portfolio_bundle.yaml",
ROOT / "dist" / "retirement_portfolio_compact.yaml",
ROOT / "dist" / "retirement_portfolio_ultra_compact.yaml",
):
if bundle.exists():
load_yaml(bundle, errors)
if errors:
print("VALIDATION FAIL")
for err in errors:
print(f"- {err}")
return 1
print("VALIDATION OK")
return 0
if __name__ == "__main__":
raise SystemExit(main())