from __future__ import annotations

import json
import math
import re
from pathlib import Path
from typing import Any

# Validator for the raw analysis data JSON export.
#
# The file at JSON_PATH is expected to hold an object with a "metadata" object
# and a "data" object keyed by sheet name, where each sheet is a list of row
# objects.  Running this module prints a pass/fail report and exits with
# status 0 (OK) or 1 (at least one validation error).

ROOT = Path(__file__).resolve().parents[1]
JSON_PATH = ROOT / "GatherTradingData.json"
WORKBOOK_ROLE = "provided_raw_analysis_data_json"
EXPECTED_SCHEMA_VERSION = "2026-05-18-json-raw-data-v1"

# Sheets that must be present under "data".
REQUIRED_SHEETS = {
    "data_feed",
    "sector_flow",
    "macro",
    "event_risk",
    "core_satellite",
    "sell_priority",
}

# Columns that must appear in a sheet whenever that sheet is present.  Sheets
# listed here but not in REQUIRED_SHEETS are only checked if they exist.
REQUIRED_COLUMNS = {
    "data_feed": {
        "Ticker",
        "Name",
        "Flow_OK",
        "Frg_5D",
        "Inst_5D",
        "Open",
        "PrevClose",
        "High",
        "Low",
        "Volume",
        "AvgVolume_5D",
        "MA20",
        "MA60",
        "Ret10D",
        "Ret20D",
        "Ret60D",
        "Timing_Score_Entry",
        "Timing_Score_Exit",
        "Timing_Action",
        "Sell_Action",
        "Sell_Qty",
        "Sell_Price_Basis",
        "Sell_Execution_Window",
        "Sell_Order_Type",
        "Sell_Validation",
        "Account_Holding_Qty",
        "Account_Parse_Status",
        "Rule_Sell_Qty",
        "Rebalance_Need_KRW",
        "Override_Sell_Qty",
        "Override_Validation",
        "Final_Action",
        "Action_Priority",
        "Priority_Score",
        "Final_Rank",
        "Decision_Source",
    },
    "sector_flow": {
        "Sector",
        "Proxy_Ticker",
        "Proxy_Name",
        "Proxy_Type",
        "Coverage_Weight",
        "Sector_Ret5D",
        "Sector_Ret20D",
        "Sector_RS_20D",
        "SmartMoney_5D_KRW",
        "SmartMoney_20D_KRW",
        "Sector_AvgTradeValue_20D_KRW",
        "SmartMoney_5D_Norm",
        "Flow_Breadth_5D",
        "Flow_Rows_Min",
        "Stale_Count",
        "ETF_Liquidity_Score",
        "ETF_NAV_Risk",
        "ETF_Liquidity_Status",
        "ETF_Execution_Use",
        "Sector_Median_PE",
        "Sector_Median_PBR",
        "Sector_Score",
        "Sector_Rank",
        "Alert_Level",
        "Data_Quality",
        "Decision_Use",
        "Reason",
        "AsOfDate",
    },
    "macro": {"Symbol", "Name", "Close", "Status"},
    "event_risk": {"Date", "Event", "Impact"},
    "core_satellite": {
        "Ticker",
        "Name",
        "Open",
        "PrevClose",
        "High",
        "Low",
        "Volume",
        "AvgVolume_5D",
        "MA20",
        "MA60",
        "Ret10D",
        "Ret20D",
        "Ret60D",
        "Allowed_Action",
        "Final_Action",
        "Sell_Action",
        "Sell_Ratio_Pct",
        "Sell_Qty",
        "Sell_Limit_Price",
        "Sell_Validation",
        "Action_Reason",
        "Action_Params",
        "Cash_Preserve_Style",
        "Cash_Preserve_Ratio",
        "Cash_Preserve_Reason",
        "Candidate_Quality_Grade",
        "T1_Forced_Sell_Risk_Score",
        "T1_Forced_Sell_Risk_State",
        "Sell_Conflict_Score",
        "Sell_Conflict_State",
        "Execution_Recommendation_State",
    },
    "sell_priority": {
        "Rank",
        "Ticker",
        "Name",
        "Tier",
        "Tier_Label",
        "Action_Group",
        "Sell_Action",
        "Sell_Ratio_Pct",
        "Sell_Qty",
        "Sell_Limit_Price",
        "Sell_Validation",
        "Sell_Priority_Score",
        "Raw_Sell_Priority_Score",
        "Rebound_Holdback_Score",
        "Cash_Preserve_Style",
        "Cash_Preserve_Ratio",
        "Cash_Preserve_Reason",
        "Action_Reason",
        "Action_Params",
    },
    "account_snapshot": {
        "ticker",
        "holding_quantity",
        "immediate_cash",
        "settlement_cash_d2",
        "parse_status",
        "user_confirmed",
    },
    "monthly_history": {
        "Month",
        "Total_Asset",
        "Orbit_Gap_Pct",
        "Orbit_State",
    },
}

# Columns whose absence only produces a warning.
RECOMMENDED_COLUMNS = {
    "data_feed": {"AvgTradeValue_5D_KRW", "AvgTradeValue_20D_KRW", "TradeValue_Unit"},
    "core_satellite": {
        "AvgTradeValue_5D_KRW",
        "AvgTradeValue_20D_KRW",
        "TradeValue_Unit",
        "Timing_Action",
        "Timing_Score_Entry",
        "Timing_Score_Exit",
        "Candidate_Quality_Grade",
        "T1_Forced_Sell_Risk_Score",
        "T1_Forced_Sell_Risk_State",
        "Sell_Conflict_Score",
        "Sell_Conflict_State",
        "Execution_Recommendation_State",
    },
}

# Sheets whose "Ticker" column must match CODE_RE when non-empty.
STRICT_TICKER_SHEETS = {"data_feed", "core_satellite", "sell_priority"}

# Other per-sheet columns that must match CODE_RE when non-empty.
STRICT_TEXT_CODE_COLUMNS = {
    "sector_universe": {"Proxy_Ticker", "Base_Ticker", "Constituent_Code"},
    "etf_nav_manual": {"ETF_Ticker"},
    "sector_flow": {"Proxy_Ticker"},
}

# Per-sheet columns that must be blank/None or a finite number.
STRICT_NUMERIC_COLUMNS = {
    "data_feed": {
        "Close",
        "ATR20",
        "Frg_5D",
        "Inst_5D",
        "Indiv_5D",
        "Flow_Rows",
        "AvgTradeValue_5D_KRW",
        "AvgTradeValue_20D_KRW",
        "Timing_Score_Entry",
        "Timing_Score_Exit",
        "Sell_Ratio_Pct",
        "Account_Holding_Qty",
        "Account_Avg_Cost",
        "Account_Market_Value",
        "Rule_Sell_Qty",
        "Rebalance_Target_Cash_Pct",
        "Rebalance_Need_KRW",
        "Override_Sell_Qty",
        "Action_Priority",
        "Priority_Score",
        "Final_Rank",
    },
    "core_satellite": {
        "Close",
        "ATR20",
        "Frg_5D",
        "Inst_5D",
        "Indiv_5D",
        "Flow_Rows",
        "AvgTradeValue_5D_KRW",
        "AvgTradeValue_20D_KRW",
        "RS_Rank_20D",
        "RS_Pct_20D",
        "Timing_Score_Entry",
        "Timing_Score_Exit",
        "T1_Forced_Sell_Risk_Score",
        "Sell_Conflict_Score",
    },
    "sell_priority": {
        "Sell_Ratio_Pct",
        "Sell_Qty",
        "Sell_Limit_Price",
        "Sell_Priority_Score",
        "Raw_Sell_Priority_Score",
        "Rebound_Holdback_Score",
        "Cash_Preserve_Ratio",
    },
    "sector_flow": {
        "Coverage_Weight",
        "Sector_Ret5D",
        "Sector_Ret20D",
        "Sector_RS_20D",
        "SmartMoney_5D_KRW",
        "SmartMoney_20D_KRW",
        "Sector_AvgTradeValue_20D_KRW",
        "SmartMoney_5D_Norm",
        "Flow_Breadth_5D",
        "Flow_Rows_Min",
        "Stale_Count",
        "ETF_Liquidity_Score",
        "Sector_Score",
        "Sector_Rank",
    },
    "sector_universe": {"Weight"},
    "sector_flow_history": {
        "Sector_Score",
        "Sector_Rank",
        "SmartMoney_5D_KRW",
        "SmartMoney_20D_KRW",
        "Flow_Breadth_5D",
    },
    "monthly_history": {
        "Total_Asset",
        "MoM_Return_Pct",
        "YTD_Return_Pct",
        "Orbit_Gap_Pct",
    },
}

# String cells starting with a spreadsheet error token (#VALUE!, #N/A, ...).
ERROR_VALUE_RE = re.compile(r"^#(?:VALUE!|NUM!|REF!|DIV/0!|NAME\?|N/A)")

# Accepted code shapes: six digits, or four digits + one uppercase letter +
# one digit.  Always applied with ``fullmatch``.
CODE_RE = re.compile(r"\d{6}|\d{4}[A-Z]\d")


def is_number(value: Any) -> bool:
    """Return True if *value* converts to a finite float.

    ``None``, the empty string and booleans are never numbers.  Values that
    ``float()`` rejects, or that overflow a float (e.g. a huge JSON integer),
    are reported as non-numeric instead of raising.
    """
    if value in (None, "") or isinstance(value, bool):
        return False
    try:
        return math.isfinite(float(value))
    except (TypeError, ValueError, OverflowError):
        # OverflowError: float(<int beyond double range>) raises rather than
        # returning inf, and JSON permits arbitrarily large integers.
        return False


def rows_for(data: dict[str, Any], sheet: str) -> list[dict[str, Any]]:
    """Return the dict rows of *sheet*; non-dict entries are dropped.

    A missing sheet, or a sheet that is not a list, yields an empty list.
    """
    value = data.get(sheet)
    if isinstance(value, list):
        return [row for row in value if isinstance(row, dict)]
    return []


def validate_required(data: dict[str, Any], errors: list[str], warnings: list[str]) -> None:
    """Check required sheets and required/recommended columns.

    Findings are appended to *errors* and *warnings* in place.  Column presence
    is judged from the union of keys in the first five rows of each sheet.
    """
    missing = sorted(REQUIRED_SHEETS - set(data))
    if missing:
        errors.append(f"missing required json sheets: {missing}; found={list(data)}")

    for sheet, required in REQUIRED_COLUMNS.items():
        if sheet not in data:
            continue
        if not isinstance(data[sheet], list):
            # NOTE(review): "settings" is not a key of REQUIRED_COLUMNS, so this
            # exemption is currently unreachable; kept in case it is added.
            if sheet == "settings" and isinstance(data[sheet], dict):
                continue
            errors.append(f"{sheet} must be a list of row objects")
            continue

        rows = rows_for(data, sheet)
        if not rows:
            errors.append(f"{sheet} has no data rows")
            continue

        columns = set().union(*(row.keys() for row in rows[:5]))
        missing_cols = sorted(required - columns)
        if missing_cols:
            errors.append(f"{sheet} missing required columns: {missing_cols}")

        recommended = RECOMMENDED_COLUMNS.get(sheet, set())
        missing_recommended = sorted(recommended - columns)
        if missing_recommended:
            warnings.append(f"{sheet} missing recommended columns: {missing_recommended}")


def _invalid_codes(rows: list[dict[str, Any]], column: str) -> list[tuple[int, str]]:
    """Return (1-based row index, text) for non-empty cells not matching CODE_RE."""
    bad = []
    for idx, row in enumerate(rows, start=1):
        text = str(row.get(column) or "").strip()
        if text and not CODE_RE.fullmatch(text):
            bad.append((idx, text))
    return bad


def _check_sheet_values(sheet: str, rows: list[dict[str, Any]], errors: list[str]) -> None:
    """Run the per-cell checks (error tokens, codes, numerics) for one sheet."""
    for idx, row in enumerate(rows, start=1):
        for key, value in row.items():
            if isinstance(value, str) and ERROR_VALUE_RE.match(value):
                errors.append(f"{sheet}[{idx}].{key} has error value {value}")

    if sheet in STRICT_TICKER_SHEETS:
        bad = _invalid_codes(rows, "Ticker")
        if bad:
            errors.append(f"{sheet}.Ticker invalid samples: {bad[:5]}")

    for col in sorted(STRICT_TEXT_CODE_COLUMNS.get(sheet, set())):
        bad = _invalid_codes(rows, col)
        if bad:
            errors.append(f"{sheet}.{col} invalid code samples: {bad[:5]}")

    for col in sorted(STRICT_NUMERIC_COLUMNS.get(sheet, set())):
        bad_values = []
        for idx, row in enumerate(rows, start=1):
            value = row.get(col)
            if value in (None, ""):
                continue  # blank cells are allowed in numeric columns
            if not is_number(value):
                bad_values.append((idx, value))
        if bad_values:
            errors.append(f"{sheet}.{col} non-numeric samples: {bad_values[:5]}")


def _check_data_feed(data: dict[str, Any], errors: list[str]) -> None:
    """Cross-row checks on data_feed: Sell_Qty blanking and Final_Rank sequence."""
    df_rows = rows_for(data, "data_feed")
    if not df_rows:
        return

    ranks = []
    final_rows = []
    bad_sell_qty = []
    for idx, row in enumerate(df_rows, start=1):
        if row.get("Sell_Validation") == "NO_HOLDING_QTY" and row.get("Sell_Qty") not in (None, ""):
            bad_sell_qty.append((idx, row.get("Sell_Qty")))
        if row.get("Final_Action") not in (None, ""):
            final_rows.append(idx)
        if is_number(row.get("Final_Rank")):
            ranks.append(int(float(row["Final_Rank"])))

    if bad_sell_qty:
        errors.append(
            "data_feed.Sell_Qty must be blank when Sell_Validation=NO_HOLDING_QTY; "
            f"samples={bad_sell_qty[:5]}"
        )
    # Ranks must be exactly 1..N, where N is the number of rows with a Final_Action.
    if final_rows and sorted(ranks) != list(range(1, len(final_rows) + 1)):
        errors.append(
            "data_feed.Final_Rank must be contiguous 1-based; "
            f"found={sorted(ranks)}, expected_count={len(final_rows)}"
        )


def _check_core_satellite_status(data: dict[str, Any], errors: list[str]) -> None:
    """Check that a COMPLETE status row (first row only) has consistent counts."""
    for row in rows_for(data, "core_satellite_status")[:1]:
        if row.get("Status") != "COMPLETE":
            continue
        if not (is_number(row.get("Processed_Count")) and is_number(row.get("Universe_Count"))):
            errors.append("core_satellite_status COMPLETE but counts are not numeric")
        elif int(float(row["Processed_Count"])) != int(float(row["Universe_Count"])):
            errors.append("core_satellite_status COMPLETE but processed != universe")
        if not (is_number(row.get("Coverage_Pct")) and float(row["Coverage_Pct"]) >= 99.9):
            errors.append(
                f"core_satellite_status COMPLETE but coverage < 99.9: {row.get('Coverage_Pct')}"
            )


def validate_values(data: dict[str, Any], errors: list[str]) -> None:
    """Validate cell values across all sheets, appending findings to *errors*.

    Every sheet gets the per-cell checks; data_feed and core_satellite_status
    additionally get their cross-row consistency checks afterwards.
    """
    for sheet in data:
        _check_sheet_values(sheet, rows_for(data, sheet), errors)
    _check_data_feed(data, errors)
    _check_core_satellite_status(data, errors)


def _load_payload(errors: list[str]) -> Any:
    """Read and parse JSON_PATH; on failure record an error and return ``{}``.

    Returning an empty object lets the caller keep reporting structural
    problems the same way it does for a syntactically invalid file.
    """
    try:
        text = JSON_PATH.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError) as exc:
        # e.g. the path is a directory, permission denied, or not UTF-8.
        errors.append(f"cannot read json: {exc}")
        return {}
    try:
        return json.loads(text)
    except ValueError as exc:
        # json.JSONDecodeError is a ValueError; catching the base class also
        # covers other ValueErrors the parser can raise.
        errors.append(f"invalid json: {exc}")
        return {}


def _validate_payload(payload: Any, errors: list[str], warnings: list[str]) -> None:
    """Validate the top-level metadata/data structure and then the sheets."""
    metadata = payload.get("metadata") if isinstance(payload, dict) else None
    data = payload.get("data") if isinstance(payload, dict) else None

    if not isinstance(metadata, dict):
        errors.append("metadata must be an object")
    elif metadata.get("schema_version") != EXPECTED_SCHEMA_VERSION:
        errors.append(
            f"metadata.schema_version must be {EXPECTED_SCHEMA_VERSION}; "
            f"found={metadata.get('schema_version')}"
        )

    if not isinstance(data, dict):
        errors.append("data must be an object keyed by sheet name")
        return

    # metadata.harness_context_missing must be set exactly when the
    # _harness_context entry is absent from data.
    has_harness = "_harness_context" in data
    harness_missing_flag = (
        metadata.get("harness_context_missing") if isinstance(metadata, dict) else None
    )
    if has_harness and harness_missing_flag not in (None, ""):
        errors.append(
            "metadata.harness_context_missing must be null/empty when data._harness_context exists"
        )
    if not has_harness and harness_missing_flag in (None, ""):
        errors.append(
            "data._harness_context missing but metadata.harness_context_missing is null/empty"
        )

    validate_required(data, errors, warnings)
    validate_values(data, errors)


def main() -> int:
    """Validate JSON_PATH and print a report.

    Returns:
        0 when no errors were found (warnings are printed but do not fail),
        1 otherwise.
    """
    errors: list[str] = []
    warnings: list[str] = []

    if not JSON_PATH.exists():
        errors.append(f"missing json: {JSON_PATH}")
    else:
        _validate_payload(_load_payload(errors), errors, warnings)

    if errors:
        print("DATA SAMPLE JSON VALIDATION FAIL")
        for err in errors:
            print(f"- {err}")
        return 1

    print(f"DATA SAMPLE JSON VALIDATION OK: {JSON_PATH.name} role={WORKBOOK_ROLE}")
    for warning in warnings:
        print(f"DATA SAMPLE JSON VALIDATION WARN: {warning}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())