from __future__ import annotations import json import re import sys from datetime import date from pathlib import Path import yaml try: import jsonschema except Exception: # pragma: no cover - optional dependency jsonschema = None ROOT = Path(__file__).resolve().parents[1] SCHEMA_VERSION = "2026-05-15-F6-compat-output" MAX_SPEC_BYTES = 50_000 def fail(errors: list[str], message: str) -> None: errors.append(message) def load_yaml(path: Path, errors: list[str]): try: return yaml.safe_load(path.read_text(encoding="utf-8")) except Exception as exc: fail(errors, f"YAML parse failed: {path}: {type(exc).__name__}: {exc}") return None def load_json(path: Path, errors: list[str]): try: return json.loads(path.read_text(encoding="utf-8")) except Exception as exc: fail(errors, f"JSON parse failed: {path}: {type(exc).__name__}: {exc}") return None def validate_json_schema_minimal(schema: dict, sample: dict, errors: list[str]) -> None: if jsonschema is not None: try: jsonschema.validate(instance=sample, schema=schema) return except Exception as exc: fail(errors, f"jsonschema validation failed: {type(exc).__name__}: {exc}") return required = schema.get("required", []) for key in required: if key not in sample: fail(errors, f"full_output_valid.json missing required field: {key}") expected = schema.get("properties", {}).get("schema_version", {}).get("const") if sample.get("schema_version") != expected: fail(errors, f"sample schema_version mismatch: {sample.get('schema_version')} != {expected}") for order in sample.get("orders", []): for q_key in ("quantity", "stop_quantity", "take_profit_quantity"): value = order.get(q_key) if value is not None and not isinstance(value, int): fail(errors, f"order {q_key} must be integer or null") def base_field_name(field: str) -> str: name = str(field).replace("positions[].", "") return name.split(".", 1)[0] def validate_formula_registry(errors: list[str]) -> None: dictionary = load_yaml(ROOT / "spec" / "12_field_dictionary.yaml", errors) or {} registry = load_yaml(ROOT / "spec" / "13_formula_registry.yaml", errors) or {} harness_registry = load_yaml(ROOT / "spec" / "13b_harness_formulas.yaml", errors) or {} fields = (dictionary.get("field_dictionary") or {}).get("fields") or {} canonical_names = {meta.get("canonical_name") for meta in fields.values() if isinstance(meta, dict)} formulas = ((registry.get("formula_registry") or {}).get("formulas")) or {} harness_formulas = ((harness_registry.get("formula_registry") or {}).get("formulas")) or {} all_formulas = {**formulas, **harness_formulas} if not canonical_names: fail(errors, "field_dictionary has no canonical fields") if not formulas: fail(errors, "formula_registry has no formulas") # Proposal51 신규 공식: inputs/output 대신 output_contract/checks/layers 구조 사용 허용 # Phase-1/2/3 Python-tool 공식: GAS 하네스가 아닌 Python tools로 구현, inputs 필드 없음 (의도적) _ALTERNATE_STRUCTURE_FORMULAS_ = { "EXPORT_GATE_V2", "PROACTIVE_SELL_RADAR_V2", "ANTI_LATE_ENTRY_GATE_V3", "PRICE_HIERARCHY_LOCK_V1", "DATA_QUALITY_GATE_V2", "CASH_RECOVERY_DISPLAY_LOCK_V1", "SEMICONDUCTOR_CLUSTER_SYNC_V1", # inputs 있으나 output_contract 구조 "FUNDAMENTAL_MULTI_FACTOR_SCORE_V2", "EARNINGS_GROWTH_QUALITY_GATE_V1", "MARKET_SHARE_MOMENTUM_PROXY_V1", "CASHFLOW_STABILITY_GATE_V1", "ROUTING_DECISION_EXPLAIN_LOCK_V1", # Phase-1 Python-tool-only 공식 (inputs 필드 없음, expected_outputs 구조) "BLANK_CELL_AUDIT_V1", "VALUE_PRESERVATION_SCORER_V1", "SMART_CASH_RECOVERY_V3", "RATCHET_TRAILING_GENERAL_V1", "EJCE_VIEW_RENDERER_V1", "ROUTING_EXECUTION_LOG_TABLE_V1", # Phase-2 Python-tool-only 공식 "FUNDAMENTAL_RAW_INGEST_V1", "FUNDAMENTAL_MULTIFACTOR_V3", "HORIZON_CLASSIFICATION_V1", # Phase-2B Python-tool-only 공식 "EARNINGS_QUALITY_SIGNAL_V1", "GROWTH_RATE_SIGNAL_V1", "CASHFLOW_QUALITY_SIGNAL_V1", # Phase-3 Python-tool-only 공식 "SMART_MONEY_FLOW_SIGNAL_V2", "LIQUIDITY_FLOW_SIGNAL_V1", "PORTFOLIO_ALPHA_CONFIDENCE_PER_TICKER_V1", # Phase-3 Market Share V2 (proxy-based) "MARKET_SHARE_SIGNAL_V2", # Phase-4~5 Python-tool-only 공식 (실측 반영 + 신규 하네스) "TRADE_QUALITY_FROM_T5_V1", "PREDICTION_ACCURACY_HARNESS_V2", "MACRO_EVENT_TICKER_IMPACT_V1", "SELL_WATERFALL_ENGINE_V2", "LLM_NARRATIVE_TEMPLATE_LOCK_V1", "EJCE_DIVERGENCE_AUDIT_V1", "PREDICTIVE_ALPHA_REPORT_LOCK_V2", # Phase-6 Python-tool-only 공식 (판단 결정론 계층) "SMART_MONEY_LIQUIDITY_GATE_V1", "FINAL_JUDGMENT_GATE_V1", "VERDICT_CONSISTENCY_LOCK_V1", "INVESTMENT_QUALITY_HEADLINE_V1", # Phase-7 단일 진실원천 + 교차섹션 정합성 게이트 "CANONICAL_METRICS_V1", "CROSS_SECTION_CONSISTENCY_V1", # Work 7 + Work 3 분석 도구 "ALPHA_FEEDBACK_LOOP_V2", "ALPHA_LEAD_THRESHOLD_OPTIMIZER_V1", # ENGINE_AUDIT — Python-tool-only 감사 게이트 (GAS 런타임 비개입) "IMPUTED_DATA_EXPOSURE_GATE_V1", } for formula_id, formula in all_formulas.items(): if not isinstance(formula, dict): fail(errors, f"formula must be mapping: {formula_id}") continue if formula_id in _ALTERNATE_STRUCTURE_FORMULAS_ or str(formula.get("version", "")).endswith("_ORPHAN_RECONCILE"): if "purpose" not in formula: fail(errors, f"formula missing purpose: {formula_id}") continue # inputs/output 구조 검사 스킵 for key in ("purpose", "inputs", "output"): if key not in formula: fail(errors, f"formula missing {key}: {formula_id}") # GAS-internal computed fields: not in field_dictionary by design _INPUT_INTERNAL_ALLOWLIST_ = { "price", # GAS price object (price.ret10D, price.close, …) "globalKospiRet10D_", # preReads KOSPI 10-day return "portfolioStats", # harness aggregate stats object "satellite_holdings", # harness portfolio-level array "satellite_holdings[]", # base_field_name result for satellite_holdings[].field # Sprint B/C harness-internal aggregate objects (not in field_dictionary by design) "metadata", # harness metadata object (capturedAt, market_date) "today_date", # GAS runtime date passed to freshness gate "monthly_history[]", # monthly_history sheet aggregate (AFL inputs) "alpha_history[]", # alpha_history sheet aggregate (AFL inputs) # [3RD_HARNESS] harness-internal / batch-only fields "monthly_history", # PATTERN_BLACKLIST batch input (full sheet array) "highest_close", # PROFIT_RATCHET_TIERED_V2 — GAS runtime max(close) "oversold_gate", # K2_STAGED_REBOUND_SELL_V1 output used as downstream input "h2_priority_rank", # sell_priority sheet rank (GAS runtime integer) "waterfall_plan_json", # SELL_WATERFALL_ENGINE_V1 output piped as input "cash_recovery_plan_json", # CASH_RECOVERY_OPTIMIZER_V1 output piped as input "trade_quality_json", # TRADE_QUALITY_SCORER_V1 output piped to PATTERN_BLACKLIST "proactive_sell_radar_json", "routing_trace_json", "export_gate_json", "opm_pct", "revenue_growth_pct", "market_share_proxy_pct", "free_cf_krw", "eps_growth_qoq_pct", "eps_growth_yoy_pct", "accrual_ratio_pct", # Price/velocity fields computed from core_satellite sheet "velocity_1d", # (close-prev_close)/prev_close*100 — derived "velocity_5d", # Ret5D from core_satellite "prev_close", # PrevClose from core_satellite "obv_slope_20d", # OBV 20-day slope — technical indicator "anti_chasing_status", # ANTI_CHASING_VELOCITY_V1 output piped downstream # Intraday/timing fields from INTRADAY_ACTION_MATRIX_V1 "gap_down_pct", # (open - prev_close)/prev_close — intraday gap "intraday_drop", # (close - open)/open — intraday drop "intraday_change", # real-time price change pct "time_slot_label", # PRE_MARKET/INTRADAY/POST_MARKET label # TRADE_QUALITY_SCORER_V1 batch-only historical fields (not live feed fields) "velocity_1d_at_entry", "ma20_at_entry", "volume_ratio_at_entry", "t5_return_pct", "t20_vs_core_pctp", "sell_price", "ma20_at_sell", "price_t5_after_sell", "cash_recovered_krw", # Orchestrator/meta formula context objects "harness_context", # DETERMINISTIC_ROUTING_ENGINE_V1 full context # Sell order / price fields from sell_priority sheet (GAS runtime) "sell_limit_price", # GAS computed sell limit price (Sell_Limit_Price column) "stop_loss_price", # stop price from account_snapshot (stop_price column) "tick_unit", # KRX tick unit size (GAS computed from close level) # Timestamp / market-date fields "capture_time", # HTS capture timestamp (from account_snapshot.captured_at) "market_date", # GAS runtime market date string # Historical close prices for velocity computation "close_1d_ago", # PrevClose from core_satellite "close_5d_ago", # Close 5 days prior (from price history / ret5d back-calc) # [PROPOSAL50] GAS 하네스 내부 집계 객체 (field_dictionary 미등록 의도) "df", # 종목별 데이터 피드 맵 슬라이스 (dfMap[ticker]) "paeRow", # PAE 엔진 출력 행 (per-ticker predictive_alpha row) "hApex", # 하네스 Apex 컨텍스트 집계 객체 "holdings", # 보유 종목 배열 (GAS 런타임 집계) "dfMap", # 전체 데이터 피드 맵 (ticker→df) "cashShortfallInfo", # 현금부족 정보 집계 객체 "h2", # 매도우선순위 레이어 집계 객체 "semiconductorClusterGate", # 반도체 클러스터 게이트 출력 객체 "macroJson", # getMacroJson() 반환값 — 거시 지표 집계 객체 "mesResult", # MACRO_EVENT_SYNCHRONIZER_V1 출력 객체 "h3", # 수량 레이어 집계 객체 "totalAsset", # 총자산 KRW (GAS 런타임 스칼라) "capturedAtIso", # HTS 캡처 타임스탬프 ISO8601 문자열 "now", # GAS 런타임 Date 객체 # [PROPOSAL50] 신규 함수 GAS 내부 입력 "blueprints", # SHADOW_LEDGER_V1 — order_blueprint_json 배열 "order_condition_text", # VALIDATE_ORDER_CONDITION_V1 — 주문 조건 텍스트 "avg_trade_val_5d", # AVG_TRADE_VALUE_SIGNAL_V1 — 5일 평균 거래대금 "avg_trade_val_20d", # AVG_TRADE_VALUE_SIGNAL_V1 — 20일 평균 거래대금 "profit_lock_stage", # AVG_TRADE_VALUE_SIGNAL_V1 — 수익 잠금 스테이지 "sell_candidates_json", # TRIM_PLAN_MIN_CASH_V1 내부 입력 "sell_quantities_json", # TRIM_PLAN_MIN_CASH_V1 내부 입력 # Phase-1/2/3 deterministic harness internal inputs "operational_report_json", "Close", "MA20", "MA60", "ATR20", "RSI14", "BB_Position", "Frg_5D", "Inst_5D", "AvgTradeValue_5D_M", "AvgTradeValue_20D_M", "Recovery_Ratio_5D", "Stock_Drawdown_From_High_Pct", "value_preservation_scorer_v1_json", "scrs_v2_json", "macro_risk_regime", "Spread_Pct", "Profit_Pct", "High52W", "Stop_Price_Est", "Account_Avg_Cost", "ejce_json", "breakout_quality_gate_json", "anti_chasing_velocity_json", "portfolio_alpha_confidence", "routing_execution_log", "alpha_lead_json", "_harness_context", # [NF1~NF5] Python-harness 보조 공식 전용 입력 (GAS 미사용, field_dictionary 미등록 의도) "ticker_type", # NF1: export | domestic | neutral 분류 "base_macro_score", # NF1: 거시팩터 기본 점수 "down_streak", # NF2: 연속 하락 일수 (prices_json 파생) "t5_ledger", # NF3: proposal_evaluation_history 비-REPLAY T+5 행 "cut_decile", # NF3: BUY 차단 분위 경계 (EXPERT_PRIOR=3) "sample_n", # NF3: 표본 수 (캘리브레이션 판단용) "sell_qty", # NF4: 매도 수량 (scrs_v2 selected_combo 파생) "prev_trail_stop", # NF5: 이전 래칫 손절가 (ratchet_trailing_general 파생) "high_since_entry", # NF5: 진입 후 최고가 (prices_json 파생) "profit_pct", # NF5: 수익률 % (account_snapshot 파생) "buy_timing_score", # NF3: entry-timing proxy (proposal_evaluation_history 파생) "adv20", # NF4: 20일 평균 거래대금 (prices_json 파생) "emergency_full_sell", # NF4: 비상 전량매도 플래그 (scrs_v2 파생) } for input_item in formula.get("inputs", []): field = base_field_name(input_item.get("field", "")) if field and field not in canonical_names and field not in _INPUT_INTERNAL_ALLOWLIST_: fail(errors, f"formula input field not in field_dictionary: {formula_id}: {field}") output = formula.get("output") or {} output_field = base_field_name(output.get("field", "")) # Intermediate derived fields do not need dictionary entries, but final formula outputs should be named. if output_field and output_field not in canonical_names and output_field not in { "flow_credit", "total_heat_pct", "expected_edge", "target_cash_pct", "final_quantity", "peg_gate_result", "take_profit_ladder_v2", "financial_health_score", "portfolio_beta", "ratchet_stop_price", "tick_normalized_price", "alpha_lead_json", "follow_through_json", "distribution_risk_json", "profit_preservation_json", "cash_raise_plan_json", "rebound_sell_trigger_json", "execution_quality_json", "buy_permission_json", "smart_sell_quantities_json", "limit_price_policy_json", # Sprint B/C new formula outputs "data_freshness_status", # HARNESS_DATA_FRESHNESS_GATE_V1 "satellite_lifecycle_stage", # SATELLITE_LIFECYCLE_GATE_V1 "cla_exit_status", # CLA_REGIME_EXIT_CONDITION_V1 "satellite_cluster_beta", # PORTFOLIO_CORRELATION_GATE_V1 "alpha_feedback_json", # ALPHA_FEEDBACK_LOOP_V1 # [3RD_HARNESS] new formula outputs "sell_price_sanity_status", # SELL_PRICE_SANITY_V1 "cash_recovery_plan_json", # CASH_RECOVERY_OPTIMIZER_V1 "intraday_scope", # INTRADAY_ACTION_MATRIX_V1 "anti_chasing_verdict", # ANTI_CHASING_VELOCITY_V1 "pullback_entry_verdict", # PULLBACK_ENTRY_TRIGGER_V1 "distribution_sell_detector_status", # DISTRIBUTION_SELL_DETECTOR_V1 "waterfall_plan_json", # SELL_WATERFALL_ENGINE_V1 "sell_timing_verdict", # SELL_EXECUTION_TIMING_V1 "routing_execution_log", # DETERMINISTIC_ROUTING_ENGINE_V1 "llm_constraint_status", # LLM_SERVING_CONSTRAINT_V1 "auto_trailing_stop_v2", # PROFIT_RATCHET_TIERED_V2 "preservation_verdict", # SELL_VALUE_PRESERVATION_TIERED_V2 "trade_quality_json", # TRADE_QUALITY_SCORER_V1 "pattern_blacklist_status", # PATTERN_BLACKLIST_AUTO_V1 "velocity_5d", # computed from ret5d / used in SELL_VALUE_PRES. "fundamental_quality_json", "horizon_allocation_json", "smart_money_liquidity_json", "routing_serving_trace_v2_json", "fundamental_multifactor_json", "earnings_growth_quality_json", "market_share_proxy_json", "cashflow_stability_json", "routing_decision_explain_json", "blank_cell_audit_v1_json", "value_preservation_scorer_v1_json", "smart_cash_recovery_v3_json", "ratchet_trailing_general_v1_json", "ejce_view_renderer_v1_json", "routing_execution_log_v1_json", "pullback_state", # PULLBACK_ENTRY_TRIGGER_V1 output "serving_constraint_check", # LLM_SERVING_CONSTRAINT_V1 output "anti_chasing_velocity_status", # ANTI_CHASING_VELOCITY_V1 output # [3RD_HARNESS_V1] 커버리지 완성 추가 출력 필드 "ratchet_stage_v2", # PROFIT_RATCHET_TIERED_V2 "profit_lock_stage", # PROFIT_LOCK_RATCHET_V1 "auto_trailing_stop", # PROFIT_LOCK_RATCHET_V1 "flow_acceleration_status", # FLOW_ACCELERATION_V1 "signals_count", # DISTRIBUTION_SELL_DETECTOR_V1 "pullback_entry_trigger_price", # PULLBACK_ENTRY_TRIGGER_V1 "sell_execution_window", # SELL_EXECUTION_TIMING_V1 "tick_normalized_price", # TICK_NORMALIZER_V1 (duplicate-safe) "brt_verdict", # BENCHMARK_RELATIVE_TIMESERIES_V1 "brt_rs_slope", # BENCHMARK_RELATIVE_TIMESERIES_V1 "rs_verdict", # RS_VERDICT_V2 "saqg_verdict", # SATELLITE_ALPHA_QUALITY_GATE_V1 "sapg_verdict", # SATELLITE_AGGREGATE_PNL_GATE_V1 "tick_normalized_prices_json", # TICK_NORMALIZER_V1 per-ticker map "ratchet_v2_per_ticker_json", # PROFIT_RATCHET_TIERED_V2 per-ticker "sell_price_sanity_per_ticker_json", # SELL_PRICE_SANITY_V1 per-ticker "decisions_json", # DETERMINISTIC_ROUTING_ENGINE_V1 (updated) "comprehensive_proposal_json", # HS010-B 판단 제안표 원천 데이터 "satellite_candidate_json", # HS010-C 위성 후보 스크리닝 "satellite_candidate_summary", # HS010-C 요약 # SPRINT 1 신규 필드 (Direction O1/O2/O5/P1/P3/P5/A2/B1/B3/K2/C1/D1) "semiconductor_cluster_json", # O2 SEMICONDUCTOR_CLUSTER_GATE_V1 "single_position_weight_json", # O1 SINGLE_POSITION_WEIGHT_CAP_V1 "position_count", # O5 POSITION_COUNT_LIMIT_V1 "position_count_max", # O5 "position_count_gate", # O5 "stop_breach_alert_json", # P1 STOP_BREACH_ALERT_V1 "heat_concentration_json", # P3 HEAT_CONCENTRATION_ALERT_V1 "portfolio_health_blocked_json", # P5 PORTFOLIO_HEALTH_SCORE_V1 "anti_chasing_velocity_json", # A2+B1 ANTI_CHASING_VELOCITY_V1 "distribution_sell_detector_json", # B3 DISTRIBUTION_SELL_DETECTOR_V1 "k2_staged_rebound_sell_json", # K2 K2_STAGED_REBOUND_SELL_V1 "cash_recovery_plan_json", # C1/A3 SELL_WATERFALL_ENGINE_V1 # SPRINT 2 신규 필드 (Direction REGIME_CLA/RS_VERDICT/RAG) "regime_cla_json", # REGIME_CLA CONCENTRATED_LEADER_ADVANCE_V1 "cla_exit_status", # REGIME_CLA CLA_EXIT_CONFIRMED / CLA_ACTIVE "rag_v1", # RAG REPLACEMENT_ALPHA_GATE_V1 "rag_reason", # RAG 사유 텍스트 "rs_verdict_source", # RS_VERDICT V2_FUSION / V1_ONLY "rs_verdict_v1_raw", # RS_VERDICT V1 원시값 # SPRINT 3 신규 필드 (Direction L4) "pre_distribution_warning", # L4 PRE_DISTRIBUTION_EARLY_WARNING_V1 # SPRINT 4 신규 필드 (Direction SFG/F1/F2/PCG) "sfg_v1", # SFG SATELLITE_FAILURE_GATE_V1 스칼라 "sfg_broken_count", # SFG 위성 BROKEN 종목 수 "sfg_failure_rate", # SFG 위성 실패율 (0.0–1.0) "pattern_blacklist_json", # F2 PATTERN_BLACKLIST_AUTO_V1 "portfolio_correlation_gate_json", # PCG PORTFOLIO_CORRELATION_GATE_V1 "correlation_gate_status", # PCG 상태 스칼라 # [PROPOSAL46] 신규 하네스 출력 필드 "predictive_alpha_json", # PA1 PREDICTIVE_ALPHA_ENGINE_V1 "anti_late_entry_json", # PA2 ANTI_LATE_ENTRY_GATE_V2 "cash_preservation_sell_json", # PA3 CASH_PRESERVATION_SELL_ENGINE_V2 "macro_event_json", # PA4 MACRO_EVENT_SYNCHRONIZER_V1 "consistency_report_json", # PA5 CONSISTENCY_VALIDATOR_V2 # [PROPOSAL50] 신규 하네스 출력 필드 "ejce_json", # EJCE-V1 EXPERT_JUDGMENT_CONSENSUS_ENGINE_V1 "scrs_v2_json", # SCRS-V2 SMART_CASH_RECOVERY_SELL_ENGINE_V2 "mrag_v2_json", # MRAG-V2 MACRO_REGIME_ADAPTIVE_GATE_V2 "mandatory_reduction_json", # M5 V1.1 MANDATORY_REDUCTION_PLAN_V1 "serving_lock_json", # DSLE-V1 DETERMINISTIC_SERVING_LOCK_ENGINE_V1 "order_condition_validation", # HS007 VALIDATE_ORDER_CONDITION_V1 "shadow_ledger_json", # H10 SHADOW_LEDGER_V1 "llm_serving_constraint_json", # D2 LLM_SERVING_CONSTRAINT_V1 "avg_trade_val_signal_json", # H6 AVG_TRADE_VALUE_SIGNAL_V1 # [Advanced Harness Architecture] "dynamic_value_preservation_sell_v6_json", # DYNAMIC_VALUE_PRESERVATION_SELL_V6 "predictive_alpha_engine_v2_json", # PREDICTIVE_ALPHA_DIALECTIC_ENGINE_V2 "capital_style_time_stop_v1_json", # CAPITAL_STYLE_TIME_STOP_V1 "execution_integrity_gate_v1_json", # EXECUTION_INTEGRITY_GATE_V1 # [NF1~NF5] Python-harness 보조 공식 출력 (field_dictionary 미등록 의도) "macro_factor_applied", # NF1 REGIME_CONDITIONAL_MACRO_FACTOR_V1 "rebound_capture_hit", # NF2 REBOUND_CAPTURE_THESIS_FACTOR_V1 "velocity_decile_thresholds", # NF3 ENTRY_TIMING_DECILE_FACTOR_V1 "max_child_qty", # NF4 SELL_SLIPPAGE_BUDGET_FACTOR_V1 "trail_stop", # NF5 PROFIT_GIVEBACK_RATCHET_FACTOR_V1 }: fail(errors, f"formula output field not registered or allowlisted: {formula_id}: {output_field}") def validate_output_rendering_contract(schema: dict | None, errors: list[str]) -> None: output_spec = load_yaml(ROOT / "spec" / "07_output_schema.yaml", errors) or {} report_template = load_yaml(ROOT / "RetirementAssetPortfolioReportTemplate.yaml", errors) or {} analysis_prompt = (ROOT / "prompts" / "analysis_prompt.md").read_text(encoding="utf-8") display_policy = ((output_spec.get("recommendation_grade") or {}).get("display_policy")) or {} sequence = display_policy.get("output_sequence") or {} # I1: routing_serving_trace → QEH_AUDIT_BLOCK이 step_0a/0b로 등록됐는지 확인 if sequence.get("step_0a") != "routing_serving_trace": fail(errors, "output_sequence missing step_0a=routing_serving_trace (I1/G4 required)") if sequence.get("step_0b") != "QEH_AUDIT_BLOCK": fail(errors, "output_sequence missing step_0b=QEH_AUDIT_BLOCK (I1/G4 required)") expected_prefix = [ "capture_read_ledger", "data_completeness_matrix", "backdata_feature_bank_table", "benchmark_relative_harness_table", "alpha_lead_table", "anti_distribution_table", "profit_preservation_table", "smart_cash_raise_table", "execution_quality_table", "order_quantity_4stage_gate", "decision_trace_table", "sell_priority_decision_table", "current_holdings_analysis_report_template", ] actual_prefix = [sequence.get(f"step_{index}") for index in range(1, len(expected_prefix) + 1)] if actual_prefix != expected_prefix: fail(errors, f"output_sequence prefix mismatch: {actual_prefix} != {expected_prefix}") # I1: human_report.required_sections 검증 human_report = output_spec.get("human_report") or {} required_sections = human_report.get("required_sections") or [] required_section_names = {s.get("name") for s in required_sections if isinstance(s, dict)} for mandatory in ( "routing_serving_trace", "QEH_AUDIT_BLOCK", "decision_trace_table", "backdata_feature_bank_table", "alpha_lead_table", "anti_distribution_table", "smart_cash_raise_table", "execution_quality_table", "prediction_evaluation_improvement_report", ): if mandatory not in required_section_names: fail(errors, f"human_report.required_sections missing: {mandatory}") # I4: watch_ledger 컬럼 제한 검증 watch_ledger = human_report.get("watch_ledger") or {} forbidden = set(watch_ledger.get("forbidden_columns") or []) hs010_forbidden = {"지정가", "손절가", "익절가", "주문수량", "주문금액"} if not hs010_forbidden.issubset(forbidden): fail(errors, f"watch_ledger.forbidden_columns missing HS010-I4 terms: {sorted(hs010_forbidden - forbidden)}") if "two_phase_rendering" not in (output_spec.get("json_output_contract") or {}): fail(errors, "json_output_contract missing two_phase_rendering") if "terminology_control" not in (output_spec.get("output_format") or {}): fail(errors, "output_format missing terminology_control") terms = { item.get("term") for item in ((output_spec.get("output_format") or {}).get("terminology_control") or {}).get("prohibited_freeform_terms", []) if isinstance(item, dict) } expected_terms = {"부분감액", "1차 감액", "부분정리", "전량", "전량매도"} if not expected_terms.issubset(terms): fail(errors, f"terminology_control missing prohibited terms: {sorted(expected_terms - terms)}") templates = report_template.get("output_format_templates") or {} rendering_contract = templates.get("rendering_contract") or {} prohibited_order_terms = set(((rendering_contract.get("terminology_rule") or {}).get("prohibited_order_terms")) or []) if not expected_terms.issubset(prohibited_order_terms): fail(errors, f"report rendering_contract missing prohibited_order_terms: {sorted(expected_terms - prohibited_order_terms)}") required_report_templates = ( "routing_serving_trace_report", "QEH_AUDIT_BLOCK_report", "capture_read_ledger_report", "backdata_feature_bank_table", "order_quantity_4stage_gate_report", "decision_trace_table", "sell_priority_decision_table", "alpha_lead_table", "anti_distribution_table", "profit_preservation_table", "smart_cash_raise_table", "execution_quality_table", "reference_price_ledger", "prediction_evaluation_improvement_report", ) for required_template in required_report_templates: if required_template not in templates: fail(errors, f"report template missing {required_template}") required_sequence = (rendering_contract.get("required_sequence") or []) sequence_text = "\n".join(str(item) for item in required_sequence) for required_step in ( "routing_serving_trace", "QEH_AUDIT_BLOCK", "backdata_feature_bank_table", "alpha_lead_table", "anti_distribution_table", "profit_preservation_table", "smart_cash_raise_table", "execution_quality_table", "decision_trace_table", "reference_price_ledger", "prediction_evaluation_improvement_report", ): if required_step not in sequence_text: fail(errors, f"report rendering_contract.required_sequence missing {required_step}") if "schemas/output_schema.json" not in analysis_prompt or "market_context_learning_note" not in analysis_prompt: fail(errors, "analysis_prompt missing schema-first or learning-note instruction") if "sell_priority_decision_table" not in analysis_prompt: fail(errors, "analysis_prompt missing sell_priority_decision_table instruction") if "decision_trace" not in analysis_prompt or "decision_trace_table" not in analysis_prompt: fail(errors, "analysis_prompt missing decision_trace instruction") for prompt_term in ( "routing_serving_trace", "backdata_feature_bank_table", "alpha_lead_table", "anti_distribution_table", "profit_preservation_table", "smart_cash_raise_table", "execution_quality_table", "buy_permission_json", "limit_price_policy_json", "BLOCKED_REPORT", ): if prompt_term not in analysis_prompt: fail(errors, f"analysis_prompt missing required report/harness term: {prompt_term}") for term in expected_terms: if term not in analysis_prompt: fail(errors, f"analysis_prompt missing prohibited free-form term: {term}") if schema: required = set(schema.get("required") or []) if "capture_read_ledger" not in required: fail(errors, "output_schema required missing capture_read_ledger") if "decision_trace" not in required: fail(errors, "output_schema required missing decision_trace") trace = (((schema.get("properties") or {}).get("decision_trace") or {}).get("items")) or {} trace_required = set(trace.get("required") or []) for field in ("state", "check_id", "rule_ref", "inputs_used", "result", "selected_action", "blocked_actions", "missing_inputs", "tie_breaker_applied"): if field not in trace_required: fail(errors, f"output_schema decision_trace.required missing {field}") orders = (((schema.get("properties") or {}).get("orders") or {}).get("items")) or {} order_required = set(orders.get("required") or []) for field in ("current_holding_quantity", "average_cost_krw", "current_price_krw"): if field not in order_required: fail(errors, f"output_schema orders.required missing {field}") portfolio_exposure = load_yaml(ROOT / "spec" / "risk" / "portfolio_exposure.yaml", errors) or {} sell_priority = ((portfolio_exposure.get("portfolio_exposure_framework") or {}).get("sell_priority_engine")) or {} if not sell_priority: fail(errors, "portfolio_exposure_framework missing sell_priority_engine") else: for key in ("hard_precedence", "candidate_scoring", "tie_breakers", "output_required", "prohibition"): if key not in sell_priority: fail(errors, f"sell_priority_engine missing {key}") if "smart_cash_raise_execution" not in sell_priority: fail(errors, "sell_priority_engine missing smart_cash_raise_execution") position_sizing = load_yaml(ROOT / "spec" / "05_position_sizing.yaml", errors) or {} sizing_root = position_sizing.get("position_sizing") or {} if "pre_permission_gate" not in sizing_root: fail(errors, "position_sizing missing pre_permission_gate") if "BUY_PERMISSION_MATRIX_V1" not in str(sizing_root.get("sequence")): fail(errors, "position_sizing.sequence missing BUY_PERMISSION_MATRIX_V1") decision_flow = load_yaml(ROOT / "spec" / "09_decision_flow.yaml", errors) or {} deterministic = ((decision_flow.get("decision_flow") or {}).get("deterministic_execution_control")) or {} if not deterministic: fail(errors, "decision_flow missing deterministic_execution_control") else: for key in ("trace_required_fields", "tie_breaker_order", "null_propagation_rule", "no_freeform_override"): if key not in deterministic: fail(errors, f"deterministic_execution_control missing {key}") def validate_harness_contract_consistency(errors: list[str]) -> None: """E3: 19_harness_contract.yaml의 scalar/collection_keys가 validate_harness_context.py에 모두 검사되는지 교차검증.""" contract = load_yaml(ROOT / "spec" / "19_harness_contract.yaml", errors) or {} validator_path = ROOT / "tools" / "validate_harness_context.py" try: validator_text = validator_path.read_text(encoding="utf-8") except Exception as exc: fail(errors, f"cannot read validate_harness_context.py: {exc}") return keys_section = ((contract.get("harness_contract") or {}).get("required_harness_context_keys")) or {} raw_scalars = keys_section.get("scalar_keys") or [] raw_collections = keys_section.get("collection_keys") or [] # 주석(# ...) 및 비문자열 항목 제거 def clean_keys(raw: list) -> list[str]: result = [] for item in raw: if isinstance(item, str): result.append(item.strip()) return result scalar_keys = clean_keys(raw_scalars) collection_keys = clean_keys(raw_collections) for key in scalar_keys: if f'"{key}"' not in validator_text: fail(errors, f"harness_contract scalar_key not checked in validator: {key}") for key in collection_keys: if f'"{key}"' not in validator_text: fail(errors, f"harness_contract collection_key not checked in validator: {key}") def main() -> int: errors: list[str] = [] yaml_paths = [ ROOT / "RetirementAssetPortfolio.yaml", ROOT / "RetirementAssetPortfolioReportTemplate.yaml", *sorted((ROOT / "spec").rglob("*.yaml")), *sorted((ROOT / "examples").glob("*.yaml")), *sorted((ROOT / "proposals").glob("*.yaml")), *sorted((ROOT / "tests").glob("*.yaml")), ] for path in yaml_paths: load_yaml(path, errors) json_paths = sorted((ROOT / "schemas").glob("*.json")) + sorted((ROOT / "examples").glob("*.json")) parsed_json = {path: load_json(path, errors) for path in json_paths} for path in sorted((ROOT / "examples").glob("*.jsonl")): try: for line_no, line in enumerate(path.read_text(encoding="utf-8").splitlines(), 1): if line.strip(): json.loads(line) except Exception as exc: fail(errors, f"JSONL parse failed: {path}:{line_no}: {type(exc).__name__}: {exc}") manifest = load_yaml(ROOT / "RetirementAssetPortfolio.yaml", errors) or {} for step_name, step in (manifest.get("load_sequence") or {}).items(): for file_name in step.get("files", []): if "*" not in file_name and not (ROOT / file_name).exists(): fail(errors, f"manifest load_sequence missing file: {step_name}: {file_name}") for key, file_name in (manifest.get("spec_files") or {}).items(): if not isinstance(file_name, str): continue if "*" not in file_name and not (ROOT / file_name).exists(): fail(errors, f"manifest spec_files missing file: {key}: {file_name}") # All spec YAML files should be registered in manifest, governance, split indexes, or compatibility indexes. manifest_text = (ROOT / "RetirementAssetPortfolio.yaml").read_text(encoding="utf-8") for path in sorted((ROOT / "spec").rglob("*.yaml")): rel = path.relative_to(ROOT).as_posix() if rel not in manifest_text and rel not in {"spec/03_risk_policy.yaml", "spec/04_strategy_rules.yaml"}: fail(errors, f"spec file not registered in manifest: {rel}") if path.stat().st_size > MAX_SPEC_BYTES and path.name not in { "03_risk_policy.yaml", "04_strategy_rules.yaml", "13_formula_registry.yaml", "13b_harness_formulas.yaml", "12_field_dictionary.yaml", "formula_golden_cases_v2.yaml", # BCH-V1 골든케이스 — 공식 수 증가로 50KB 초과 허용 "formula_golden_cases_nf.yaml", # NF1~NF5 Python-harness 보조 공식 명세 golden cases "calibration_registry.yaml", # CALIB-V1 임계값 레지스트리 "27_bch_calibration_runbook.yaml", # BCH 런북 "output_field_owner_ledger.yaml", # generated ledger — size threshold exempt "formula_registry.normalized.yaml", # Normalized formula registry "factor_lifecycle_registry.yaml", # Factor lifecycle registry "exit.yaml", "risk.yaml", }: fail(errors, f"spec file exceeds {MAX_SPEC_BYTES} bytes and should be split/indexed: {rel}") combined_text = "\n".join( path.read_text(encoding="utf-8") for path in [ ROOT / "RetirementAssetPortfolio.yaml", ROOT / "AGENTS.md", ROOT / "spec" / "07_output_schema.yaml", ROOT / "prompts" / "analysis_prompt.md", ROOT / "schemas" / "output_schema.json", ] ) if "F2-json-output" in combined_text or "F5-complete-output" in combined_text: fail(errors, "stale JSON schema version reference remains") if SCHEMA_VERSION not in combined_text: fail(errors, "current schema version not referenced") active_text_paths = [ ROOT / "RetirementAssetPortfolio.yaml", ROOT / "AGENTS.md", ROOT / "RetirementAssetPortfolioReportTemplate.yaml", *sorted((ROOT / "spec").rglob("*.yaml")), *sorted((ROOT / "prompts").glob("*.md")), *sorted((ROOT / "tests").glob("*.yaml")), ] active_text = "\n".join(path.read_text(encoding="utf-8") for path in active_text_paths) bad_legacy = [ "llm_compact_execution_contract.non_negotiable_tables", "llm_compact_execution_contract.master_prohibitions", "llm_compact_execution_contract.hard_stops", ] legacy_allowed_file = ROOT / "spec" / "00_execution_contract.yaml" active_text_without_alias_file = "\n".join( path.read_text(encoding="utf-8") for path in active_text_paths if path != legacy_allowed_file ) for token in bad_legacy: if token in active_text_without_alias_file: fail(errors, f"legacy dangling reference remains: {token}") exec_contract = load_yaml(ROOT / "spec" / "00_execution_contract.yaml", errors) or {} mp = exec_contract.get("master_prohibitions") or {} for pid in ("P1", "P2", "P3", "P4", "P5"): if not any(str(key).startswith(pid) for key in mp): fail(errors, f"master_prohibitions missing {pid}") for match in re.finditer(r"master_prohibitions\.P([1-5])", active_text): pid = "P" + match.group(1) if not any(str(key).startswith(pid) for key in mp): fail(errors, f"reference to undefined {pid}") # Derived adapters must not claim broad canonical authority. for path in sorted((ROOT / "spec").rglob("*.yaml")): data = load_yaml(path, errors) if not isinstance(data, dict): continue meta = data.get("meta") or {} if meta.get("role") == "derived_adapter": text = path.read_text(encoding="utf-8") if re.search(r"^\s+canonical:\s+true\s*$", text, flags=re.MULTILINE): fail(errors, f"derived_adapter has broad canonical:true: {path}") numeric_lines = [] for line_no, line in enumerate(text.splitlines(), 1): if re.search(r"(? expiry: fail(errors, f"deprecated alias expired: {deprecated} remove_after={remove_after}") except ValueError: fail(errors, f"invalid alias remove_after date: {deprecated} remove_after={remove_after}") for path in active_text_paths: if path in alias_files: continue if deprecated in path.read_text(encoding="utf-8"): fail(errors, f"deprecated alias used outside alias/index files: {deprecated} in {path}") # Examples are illustrative, but they must not teach legacy paths to downstream LLM runs. example_text_paths = [*sorted((ROOT / "examples").glob("*.yaml")), *sorted((ROOT / "examples").glob("*.jsonl"))] for deprecated in alias_map: for path in example_text_paths: if deprecated in path.read_text(encoding="utf-8"): fail(errors, f"deprecated alias used in example: {deprecated} in {path}") # Bundle profiles are manifest-owned; build script must follow the manifest lists. profiles = manifest.get("bundle_profiles") or {} for profile_name in ("compact", "ultra_compact"): profile = profiles.get(profile_name) if not isinstance(profile, dict): fail(errors, f"manifest missing bundle_profiles.{profile_name}") continue for file_name in profile.get("files", []): if "*" not in file_name and not (ROOT / file_name).exists(): fail(errors, f"bundle profile missing file: {profile_name}: {file_name}") ownership = load_yaml(ROOT / "spec" / "ownership_map.yaml", errors) or {} for file_name, policy in (ownership.get("ownership_map") or {}).items(): path = ROOT / file_name if not path.exists(): continue text = path.read_text(encoding="utf-8") for forbidden in policy.get("must_not_own", []): # Korean natural-language labels are advisory. Only enforce key-like forbidden tokens. if re.match(r"^[A-Za-z0-9_.:/_-]+$", forbidden) and forbidden in text: fail(errors, f"ownership violation: {file_name} contains must_not_own token {forbidden}") xref = load_yaml(ROOT / "spec" / "xref_matrix.yaml", errors) or {} for file_name, policy in (xref.get("xref_matrix") or {}).items(): candidates = [p for p in active_text_paths if p.relative_to(ROOT).as_posix().startswith(file_name.rstrip("/"))] for path in candidates: text = path.read_text(encoding="utf-8") for forbidden in policy.get("must_not_reference", []): if forbidden and forbidden in text: fail(errors, f"xref violation: {path} references forbidden token {forbidden}") for bundle in ( ROOT / "dist" / "retirement_portfolio_bundle.yaml", ROOT / "dist" / "retirement_portfolio_compact.yaml", ROOT / "dist" / "retirement_portfolio_ultra_compact.yaml", ): if bundle.exists(): load_yaml(bundle, errors) if errors: print("VALIDATION FAIL") for err in errors: print(f"- {err}") return 1 print("VALIDATION OK") return 0 if __name__ == "__main__": raise SystemExit(main())