from __future__ import annotations

import datetime as dt
import math
import re
import sys
import zipfile
from pathlib import Path
from typing import Any
from xml.etree import ElementTree as ET

# Validates the sample workbook in two passes:
#   1. A dependency-free pass over the raw XLSX XML that checks sheet names
#      and header columns.
#   2. An optional openpyxl pass that checks value-level invariants.
# main() prints a report and returns 0 on success, 1 on failure.

ROOT = Path(__file__).resolve().parents[1]
XLSX = ROOT / "GatherTradingData.xlsx"
WORKBOOK_ROLE = "provided_raw_analysis_data"

REQUIRED_SHEETS = {"data_feed", "sector_flow", "macro", "event_risk", "core_satellite"}

REQUIRED_COLUMNS = {
    "data_feed": {
        "Ticker",
        "Name",
        "Flow_OK",
        "Frg_5D",
        "Inst_5D",
        "Open",
        "PrevClose",
        "High",
        "Low",
        "Volume",
        "AvgVolume_5D",
        "MA20",
        "MA60",
        "Ret10D",
        "Ret20D",
        "Ret60D",
        "Timing_Score_Entry",
        "Timing_Score_Exit",
        "Timing_Action",
        "Sell_Action",
        "Sell_Qty",
        "Sell_Price_Basis",
        "Sell_Execution_Window",
        "Sell_Order_Type",
        "Sell_Validation",
        "Account_Holding_Qty",
        "Account_Parse_Status",
        "Rule_Sell_Qty",
        "Rebalance_Need_KRW",
        "Override_Sell_Qty",
        "Override_Validation",
        "Final_Action",
        "Action_Priority",
        "Priority_Score",
        "Final_Rank",
        "Decision_Source",
    },
    "sector_flow": {
        "Sector",
        "Proxy_Ticker",
        "Proxy_Name",
        "Proxy_Type",
        "Coverage_Weight",
        "Sector_Ret5D",
        "Sector_Ret20D",
        "Sector_RS_20D",
        "SmartMoney_5D_KRW",
        "SmartMoney_20D_KRW",
        "Sector_AvgTradeValue_20D_KRW",
        "SmartMoney_5D_Norm",
        "Flow_Breadth_5D",
        "Flow_Rows_Min",
        "Stale_Count",
        "ETF_Liquidity_Score",
        "ETF_NAV_Risk",
        "ETF_Liquidity_Status",
        "ETF_Execution_Use",
        "Sector_Median_PE",
        "Sector_Median_PBR",
        "Sector_Score",
        "Sector_Rank",
        "Alert_Level",
        "Data_Quality",
        "Decision_Use",
        "Reason",
        "AsOfDate",
    },
    "sector_universe": {
        "Sector",
        "Proxy_Ticker",
        "Proxy_Type",
        "Constituent_Code",
        "Weight",
        "Enabled",
    },
    "etf_nav_manual": {
        "ETF_Ticker",
        "ETF_Name",
        "Close",
        "NAV",
        "iNAV",
        "Premium_Discount_Pct",
        "Tracking_Error",
        "AUM",
        "Source_Date",
        "Source",
        "Enabled",
    },
    "sector_flow_history": {
        "Snapshot_Date",
        "Sector",
        "Sector_Score",
        "Sector_Rank",
        "SmartMoney_5D_KRW",
        "Flow_Breadth_5D",
        "Data_Quality",
        "Decision_Use",
    },
    "monthly_history": {
        "Month",
        "Total_Asset",
        "Orbit_Gap_Pct",
        "Orbit_State",
    },
    "macro": {"Symbol", "Name", "Close", "Status"},
    "event_risk": {"Date", "Event", "Impact"},
    "core_satellite": {
        "Ticker",
        "Name",
        "Open",
        "PrevClose",
        "High",
        "Low",
        "Volume",
        "AvgVolume_5D",
        "MA20",
        "MA60",
        "Ret10D",
        "Ret20D",
        "Ret60D",
    },
    "account_snapshot": {
        "ticker",
        "holding_quantity",
        "immediate_cash",
        "settlement_cash_d2",
        "parse_status",
        "user_confirmed",
    },
}

RECOMMENDED_COLUMNS = {
    "data_feed": {
        "AvgTradeValue_5D_KRW",
        "AvgTradeValue_20D_KRW",
        "TradeValue_Unit",
    },
    "core_satellite": {
        "AvgTradeValue_5D_KRW",
        "AvgTradeValue_20D_KRW",
        "TradeValue_Unit",
        "Timing_Action",
        "Timing_Score_Entry",
        "Timing_Score_Exit",
        "Candidate_Quality_Grade",
        "T1_Forced_Sell_Risk_Score",
        "T1_Forced_Sell_Risk_State",
        "Sell_Conflict_Score",
        "Sell_Conflict_State",
        "Execution_Recommendation_State",
    },
}

STRICT_TICKER_SHEETS = {"data_feed", "core_satellite"}

STRICT_TEXT_CODE_COLUMNS = {
    "sector_universe": {"Proxy_Ticker", "Base_Ticker", "Constituent_Code"},
    "etf_nav_manual": {"ETF_Ticker"},
    "sector_flow": {"Proxy_Ticker"},
}

STRICT_NUMERIC_COLUMNS = {
    "data_feed": {
        "Close",
        "ATR20",
        "Frg_5D",
        "Inst_5D",
        "Indiv_5D",
        "Flow_Rows",
        "AvgTradeValue_5D_KRW",
        "AvgTradeValue_20D_KRW",
        "Timing_Score_Entry",
        "Timing_Score_Exit",
        "Sell_Ratio_Pct",
        "Account_Holding_Qty",
        "Account_Avg_Cost",
        "Account_Market_Value",
        "Rule_Sell_Qty",
        "Rebalance_Target_Cash_Pct",
        "Rebalance_Need_KRW",
        "Override_Sell_Qty",
        "Action_Priority",
        "Priority_Score",
        "Final_Rank",
    },
    "core_satellite": {
        "Close",
        "ATR20",
        "Frg_5D",
        "Inst_5D",
        "Indiv_5D",
        "Flow_Rows",
        "AvgTradeValue_5D_KRW",
        "AvgTradeValue_20D_KRW",
        "RS_Rank_20D",
        "RS_Pct_20D",
        "Timing_Score_Entry",
        "Timing_Score_Exit",
        "T1_Forced_Sell_Risk_Score",
        "Sell_Conflict_Score",
    },
    "sector_flow": {
        "Coverage_Weight",
        "Sector_Ret5D",
        "Sector_Ret20D",
        "Sector_RS_20D",
        "SmartMoney_5D_KRW",
        "SmartMoney_20D_KRW",
        "Sector_AvgTradeValue_20D_KRW",
        "SmartMoney_5D_Norm",
        "Flow_Breadth_5D",
        "Flow_Rows_Min",
        "Stale_Count",
        "ETF_Liquidity_Score",
        "Sector_Score",
        "Sector_Rank",
    },
    "sector_universe": {"Weight"},
    "etf_nav_manual": {
        "Close",
        "NAV",
        "iNAV",
        "Premium_Discount_Pct",
        "Tracking_Error",
        "AUM",
    },
    "sector_flow_history": {
        "Sector_Score",
        "Sector_Rank",
        "SmartMoney_5D_KRW",
        "SmartMoney_20D_KRW",
        "Flow_Breadth_5D",
    },
    "monthly_history": {
        "Total_Asset",
        "MoM_Return_Pct",
        "YTD_Return_Pct",
        "Orbit_Gap_Pct",
    },
}

ERROR_VALUE_RE = re.compile(r"^#(?:VALUE!|NUM!|REF!|DIV/0!|NAME\?|N/A)")

# Layout assumed by the strict (openpyxl) checks: headers live on row 2 and
# data starts on row 3; a row counts as data when column A is populated.
_HEADER_ROW = 2
_FIRST_DATA_ROW = 3
# Accepted ticker/code shapes: six digits, or four digits + capital + digit.
_CODE_RE = re.compile(r"\d{6}|\d{4}[A-Z]\d")
_TEXT_NUMBER_FORMAT = "@"
_ENABLED_FLAGS = {"Y", "YES", "TRUE", "1"}
_STATUS_SHEET = "core_satellite_status"

_MAIN_NS = "http://schemas.openxmlformats.org/spreadsheetml/2006/main"
_NS = {"m": _MAIN_NS}


def workbook_sheet_names(path: Path) -> list[str]:
    """Return the sheet names declared in ``xl/workbook.xml``, in file order.

    Raises ``KeyError`` if the archive has no ``xl/workbook.xml`` and
    ``xml.etree.ElementTree.ParseError`` if that part is not well-formed.
    """
    with zipfile.ZipFile(path) as zf:
        root = ET.fromstring(zf.read("xl/workbook.xml"))
    found = (sheet.attrib.get("name") for sheet in root.findall(".//m:sheet", _NS))
    return [name for name in found if name]


def workbook_sheet_map(path: Path) -> dict[str, str]:
    """Map each sheet name to the archive path of its worksheet XML part.

    Relationship targets are normalised so that every returned path starts
    with ``xl/``. Sheets whose relationship id cannot be resolved are omitted.
    """
    with zipfile.ZipFile(path) as zf:
        workbook = ET.fromstring(zf.read("xl/workbook.xml"))
        rels = ET.fromstring(zf.read("xl/_rels/workbook.xml.rels"))
    ns = {
        "m": _MAIN_NS,
        "r": "http://schemas.openxmlformats.org/officeDocument/2006/relationships",
        "rel": "http://schemas.openxmlformats.org/package/2006/relationships",
    }
    rel_map = {
        rel.attrib["Id"]: rel.attrib["Target"]
        for rel in rels.findall(".//rel:Relationship", ns)
    }
    rid_attr = f"{{{ns['r']}}}id"
    result: dict[str, str] = {}
    for sheet in workbook.findall(".//m:sheet", ns):
        name = sheet.attrib.get("name")
        target = rel_map.get(sheet.attrib.get(rid_attr) or "")
        if not (name and target):
            continue
        normalized = target.lstrip("/")
        if not normalized.startswith("xl/"):
            normalized = "xl/" + normalized
        result[name] = normalized
    return result


def _rich_text(node: ET.Element, ns: dict[str, str]) -> str:
    """Join the visible text of a string item (``<si>`` or ``<is>``).

    Only the plain ``<t>`` child and the ``<t>`` children of ``<r>`` runs are
    read; ``<rPh>`` phonetic runs are skipped so that reading hints do not get
    appended to the cell text.
    """
    text_tag = f"{{{ns['m']}}}t"
    run_tag = f"{{{ns['m']}}}r"
    parts: list[str] = []
    for child in node:
        if child.tag == text_tag:
            parts.append(child.text or "")
        elif child.tag == run_tag:
            parts.extend(t.text or "" for t in child.findall("m:t", ns))
    return "".join(parts)


def shared_strings(path: Path) -> list[str]:
    """Return the shared-string table, or ``[]`` when the part is absent."""
    with zipfile.ZipFile(path) as zf:
        if "xl/sharedStrings.xml" not in zf.namelist():
            return []
        root = ET.fromstring(zf.read("xl/sharedStrings.xml"))
    return [_rich_text(si, _NS) for si in root.findall(".//m:si", _NS)]


def cell_text(cell: ET.Element, strings: list[str], ns: dict[str, str]) -> str:
    """Return the text of a worksheet ``<c>`` element.

    Shared-string cells (``t="s"``) are resolved through *strings*; an
    out-of-range index falls back to the raw index text. Inline strings are
    read from ``<is>``. Anything else yields the raw ``<v>`` text, and an
    absent or empty ``<v>`` yields ``""``.
    """
    value = cell.find("m:v", ns)
    # An empty <v/> has text None; coerce so the result is always a str.
    raw = (value.text or "") if value is not None else ""
    if cell.attrib.get("t") == "s" and raw.isdigit():
        idx = int(raw)
        return strings[idx] if idx < len(strings) else raw
    inline = cell.find("m:is", ns)
    if inline is not None:
        return _rich_text(inline, ns)
    return raw


def first_rows_values(
    path: Path, sheet_xml: str, strings: list[str], max_rows: int = 8
) -> list[list[str]]:
    """Return the stripped, non-empty cell texts of the first *max_rows* rows.

    Empty cells are dropped, so list positions do not correspond to column
    positions; the result is meant for header-name membership tests.
    """
    with zipfile.ZipFile(path) as zf:
        root = ET.fromstring(zf.read(sheet_xml))
    rows: list[list[str]] = []
    for row in root.findall(".//m:sheetData/m:row", _NS)[:max_rows]:
        texts = (cell_text(cell, strings, _NS).strip() for cell in row.findall("m:c", _NS))
        rows.append([text for text in texts if text])
    return rows


def is_number(value: object) -> bool:
    """Return True when *value* converts to a finite float.

    ``None``, ``""`` and booleans are rejected, as are NaN and infinities.
    """
    if value in (None, "") or isinstance(value, bool):
        return False
    try:
        return math.isfinite(float(value))
    except (TypeError, ValueError):
        return False


def _whole(value: Any) -> int:
    """Truncate a numeric cell value to an int (callers check is_number first)."""
    return int(float(value))


def _populated_rows(ws: Any, first_row: int) -> list[int]:
    """Row numbers from *first_row* on whose column-A cell is not blank."""
    return [
        r
        for r in range(first_row, ws.max_row + 1)
        if ws.cell(r, 1).value not in (None, "")
    ]


def _header_map(ws: Any) -> dict[str, int]:
    """Map stripped header text on the header row to its 1-based column."""
    headers = [ws.cell(_HEADER_ROW, c).value for c in range(1, ws.max_column + 1)]
    return {str(h).strip(): i + 1 for i, h in enumerate(headers) if h not in (None, "")}


def _scan_error_values(wb: Any, errors: list[str]) -> None:
    """Report every cell holding a spreadsheet error literal such as #REF!."""
    for ws in wb.worksheets:
        rows = _populated_rows(ws, 1)
        # Read the sheet dimensions once per sheet rather than once per row.
        last_col = ws.max_column
        has_header_row = ws.max_row >= _HEADER_ROW
        for r in rows:
            for c in range(1, last_col + 1):
                cell = ws.cell(r, c)
                value = cell.value
                if isinstance(value, str) and ERROR_VALUE_RE.match(value):
                    header = ws.cell(_HEADER_ROW, c).value if has_header_row else None
                    errors.append(
                        f"{ws.title}!{cell.coordinate} has error value {value} under header={header}"
                    )


def _check_code_column(
    ws: Any, col: int, data_rows: list[int]
) -> tuple[list[tuple[Any, ...]], list[tuple[Any, ...]]]:
    """Collect (row, text) for malformed codes and (row, format) for non-text cells."""
    bad_values: list[tuple[Any, ...]] = []
    bad_formats: list[tuple[Any, ...]] = []
    for r in data_rows:
        cell = ws.cell(r, col)
        text = "" if cell.value is None else str(cell.value).strip()
        if text and not _CODE_RE.fullmatch(text):
            bad_values.append((r, text))
        if cell.number_format != _TEXT_NUMBER_FORMAT:
            bad_formats.append((r, cell.number_format))
    return bad_values, bad_formats


def _check_numeric_column(
    ws: Any, sheet_name: str, column: str, col: int, data_rows: list[int], errors: list[str]
) -> None:
    """Flag non-blank cells of a numeric column that hold dates or non-numbers."""
    bad_values = []
    date_like = []
    for r in data_rows:
        value = ws.cell(r, col).value
        if value in (None, ""):
            continue
        if isinstance(value, (dt.date, dt.datetime)):
            date_like.append((r, str(value)))
        elif not is_number(value):
            bad_values.append((r, value))
    if date_like:
        errors.append(f"{sheet_name}.{column} is date-formatted/date-valued; samples={date_like[:5]}")
    if bad_values:
        errors.append(f"{sheet_name}.{column} non-numeric samples: {bad_values[:5]}")


def _check_data_feed_rules(
    ws: Any, header_map: dict[str, int], data_rows: list[int], errors: list[str]
) -> None:
    """Cross-column sell/override/ranking rules for the data_feed sheet.

    The rules are skipped entirely unless every column they read is present;
    missing columns are reported by the header check in main().
    """
    required = {
        "Sell_Validation",
        "Sell_Qty",
        "Sell_Action",  # read below; without it the lookup raised KeyError
        "Final_Action",
        "Sell_Price_Basis",
        "Sell_Execution_Window",
        "Sell_Order_Type",
        "Action_Priority",
        "Priority_Score",
        "Final_Rank",
        "Account_Holding_Qty",
        "Rule_Sell_Qty",
        "Override_Sell_Qty",
        "Override_Validation",
    }
    if not required <= set(header_map):
        return

    def value_at(row: int, name: str) -> Any:
        return ws.cell(row, header_map[name]).value

    bad_sell_qty = []
    missing_sell_basis = []
    bad_rule_qty = []
    bad_override_qty = []
    ranks = []
    priority_rows = []
    for r in data_rows:
        sell_validation = value_at(r, "Sell_Validation")
        sell_qty = value_at(r, "Sell_Qty")
        sell_action = value_at(r, "Sell_Action")
        if sell_validation == "NO_HOLDING_QTY" and sell_qty not in (None, ""):
            bad_sell_qty.append((r, sell_qty))
        if sell_action not in (None, "", "HOLD"):
            basis = value_at(r, "Sell_Price_Basis")
            window = value_at(r, "Sell_Execution_Window")
            order_type = value_at(r, "Sell_Order_Type")
            if basis in (None, "") or window in (None, "") or order_type in (None, ""):
                missing_sell_basis.append((r, sell_action, basis, window, order_type))

        account_qty = value_at(r, "Account_Holding_Qty")
        rule_sell_qty = value_at(r, "Rule_Sell_Qty")
        override_qty = value_at(r, "Override_Sell_Qty")
        override_validation = value_at(r, "Override_Validation")
        if sell_validation == "PASS" and is_number(rule_sell_qty) and is_number(account_qty):
            if _whole(rule_sell_qty) > _whole(account_qty):
                bad_rule_qty.append((r, rule_sell_qty, account_qty))
        if override_validation == "PASS_USER_CASH_TARGET":
            if not (is_number(override_qty) and is_number(account_qty)):
                bad_override_qty.append((r, override_qty, account_qty, "missing_numeric"))
            elif _whole(override_qty) > _whole(account_qty):
                bad_override_qty.append((r, override_qty, account_qty, "exceeds_holding"))

        final_action = value_at(r, "Final_Action")
        if final_action not in (None, ""):
            action_priority = value_at(r, "Action_Priority")
            priority_score = value_at(r, "Priority_Score")
            final_rank = value_at(r, "Final_Rank")
            priority_rows.append((r, action_priority, priority_score, final_rank))
            if is_number(final_rank):
                ranks.append(_whole(final_rank))

    if bad_sell_qty:
        errors.append(
            "data_feed.Sell_Qty must be blank when Sell_Validation=NO_HOLDING_QTY; "
            f"samples={bad_sell_qty[:5]}"
        )
    if missing_sell_basis:
        errors.append(
            f"data_feed sell actions require price basis/window/order type samples: {missing_sell_basis[:5]}"
        )
    if bad_rule_qty:
        errors.append(f"data_feed.Rule_Sell_Qty exceeds Account_Holding_Qty samples: {bad_rule_qty[:5]}")
    if bad_override_qty:
        errors.append(f"data_feed.Override_Sell_Qty invalid samples: {bad_override_qty[:5]}")
    if not priority_rows:
        return
    expected = list(range(1, len(priority_rows) + 1))
    if sorted(ranks) != expected:
        errors.append(
            "data_feed.Final_Rank must be a contiguous 1-based rank across final-action rows; "
            f"found={sorted(ranks)[:20]}, expected_count={len(priority_rows)}"
        )
    missing_priority = [
        row
        for row in priority_rows
        if not (is_number(row[1]) and is_number(row[2]) and is_number(row[3]))
    ]
    if missing_priority:
        errors.append(f"data_feed final priority fields missing/non-numeric samples: {missing_priority[:5]}")


def _check_etf_nav_manual(
    ws: Any, header_map: dict[str, int], data_rows: list[int], errors: list[str]
) -> None:
    """Enabled ETF rows must carry a numeric NAV or iNAV and a Source_Date."""
    if not {"ETF_Ticker", "NAV", "iNAV", "Source_Date", "Enabled"} <= set(header_map):
        return
    invalid_enabled = []
    for r in data_rows:
        enabled = str(ws.cell(r, header_map["Enabled"]).value or "").strip().upper()
        if enabled not in _ENABLED_FLAGS:
            continue
        nav = ws.cell(r, header_map["NAV"]).value
        inav = ws.cell(r, header_map["iNAV"]).value
        source_date = ws.cell(r, header_map["Source_Date"]).value
        if not (is_number(nav) or is_number(inav)) or source_date in (None, ""):
            invalid_enabled.append((r, nav, inav, source_date))
    if invalid_enabled:
        errors.append(
            "etf_nav_manual Enabled=Y rows require NAV or iNAV plus Source_Date; "
            f"samples={invalid_enabled[:5]}"
        )


def _check_core_satellite_status(
    ws: Any, header_map: dict[str, int], data_rows: list[int], errors: list[str]
) -> None:
    """A COMPLETE status row must show full processing and >= 99.9 coverage.

    Only the first data row is inspected.
    """
    if not {"Status", "Universe_Count", "Processed_Count", "Coverage_Pct"} <= set(header_map):
        return
    for r in data_rows[:1]:
        status = str(ws.cell(r, header_map["Status"]).value or "").strip()
        if status != "COMPLETE":
            continue
        universe = ws.cell(r, header_map["Universe_Count"]).value
        processed = ws.cell(r, header_map["Processed_Count"]).value
        coverage = ws.cell(r, header_map["Coverage_Pct"]).value
        counts_match = (
            is_number(universe)
            and is_number(processed)
            and _whole(universe) == _whole(processed)
        )
        if not counts_match:
            errors.append(f"core_satellite_status COMPLETE but processed != universe at row {r}")
        if not (is_number(coverage) and float(coverage) >= 99.9):
            errors.append(f"core_satellite_status COMPLETE but coverage < 100 at row {r}: {coverage}")


def strict_workbook_checks(path: Path) -> tuple[list[str], list[str]]:
    """Validate value-level invariants not visible from raw XML header checks.

    Returns ``(errors, warnings)``. When openpyxl cannot be imported the
    checks are skipped and a single warning is returned instead.

    Checks performed:
      * no cell on any sheet holds a spreadsheet error literal;
      * for each sheet named in STRICT_NUMERIC_COLUMNS that exists: at least
        one data row, well-formed text-formatted ticker/code columns, and
        numeric (non-date) values in the listed columns;
      * sheet-specific rules for data_feed and etf_nav_manual;
      * the core_satellite_status completeness rule, when that sheet exists.
    """
    errors: list[str] = []
    warnings: list[str] = []
    try:
        import openpyxl  # type: ignore
    except ImportError:
        warnings.append("openpyxl unavailable; skipped strict value/format checks")
        return errors, warnings

    wb = openpyxl.load_workbook(path, data_only=True, read_only=False)
    _scan_error_values(wb, errors)

    for sheet_name, numeric_columns in STRICT_NUMERIC_COLUMNS.items():
        if sheet_name not in wb.sheetnames:
            continue
        ws = wb[sheet_name]
        header_map = _header_map(ws)
        data_rows = _populated_rows(ws, _FIRST_DATA_ROW)
        if not data_rows:
            errors.append(f"{sheet_name} has no data rows")
            continue

        if sheet_name in STRICT_TICKER_SHEETS and "Ticker" in header_map:
            bad_tickers, bad_formats = _check_code_column(ws, header_map["Ticker"], data_rows)
            if bad_tickers:
                errors.append(f"{sheet_name}.Ticker invalid samples: {bad_tickers[:5]}")
            if bad_formats:
                errors.append(f"{sheet_name}.Ticker must be text format '@'; samples={bad_formats[:5]}")

        for column in sorted(STRICT_TEXT_CODE_COLUMNS.get(sheet_name, set())):
            if column not in header_map:
                continue
            bad_codes, bad_formats = _check_code_column(ws, header_map[column], data_rows)
            if bad_codes:
                errors.append(f"{sheet_name}.{column} invalid code samples: {bad_codes[:5]}")
            if bad_formats:
                errors.append(f"{sheet_name}.{column} must be text format '@'; samples={bad_formats[:5]}")

        for column in sorted(numeric_columns):
            if column in header_map:
                _check_numeric_column(ws, sheet_name, column, header_map[column], data_rows, errors)

        if sheet_name == "data_feed":
            _check_data_feed_rules(ws, header_map, data_rows, errors)
        if sheet_name == "etf_nav_manual":
            _check_etf_nav_manual(ws, header_map, data_rows, errors)

    # core_satellite_status has no entry in STRICT_NUMERIC_COLUMNS, so it must
    # be handled outside the loop above or its rule would never run.
    if _STATUS_SHEET in wb.sheetnames:
        ws = wb[_STATUS_SHEET]
        _check_core_satellite_status(
            ws, _header_map(ws), _populated_rows(ws, _FIRST_DATA_ROW), errors
        )

    return errors, warnings


def _check_sheet_structure(path: Path, errors: list[str], warnings: list[str]) -> None:
    """Check sheet names and header columns using only the raw XLSX XML."""
    names = workbook_sheet_names(path)
    missing = sorted(REQUIRED_SHEETS - set(names))
    if missing:
        errors.append(f"missing required sheets: {missing}; found={names}")
    bad_names = [name for name in names if re.search(r"\s+$", name)]
    if bad_names:
        errors.append(f"sheet names have trailing spaces: {bad_names}")

    sheet_map = workbook_sheet_map(path)
    strings = shared_strings(path)
    for sheet_name, required in REQUIRED_COLUMNS.items():
        if sheet_name not in sheet_map:
            continue
        candidate_rows = first_rows_values(path, sheet_map[sheet_name], strings)
        row_sets = [set(row) for row in candidate_rows]
        # The header may sit on any of the sampled rows; accept the first row
        # that contains every required column.
        if not any(required <= row for row in row_sets):
            best = max(row_sets, key=lambda row: len(required & row), default=set())
            missing_cols = sorted(required - best)
            errors.append(
                f"{sheet_name} missing required columns: {missing_cols}; sampled_rows={candidate_rows}"
            )
        recommended = RECOMMENDED_COLUMNS.get(sheet_name, set())
        if recommended:
            best = max(row_sets, key=lambda row: len(recommended & row), default=set())
            missing_recommended = sorted(recommended - best)
            if missing_recommended:
                warnings.append(f"{sheet_name} missing recommended columns: {missing_recommended}")


def main() -> int:
    """Validate XLSX and print a report; return 0 on success, 1 on failure.

    Warnings are printed only when validation succeeds.
    """
    errors: list[str] = []
    warnings: list[str] = []
    if not XLSX.exists():
        errors.append(f"missing xlsx: {XLSX}")
    elif not zipfile.is_zipfile(XLSX):
        errors.append(f"not a valid xlsx zip: {XLSX}")
    else:
        try:
            _check_sheet_structure(XLSX, errors, warnings)
        except (KeyError, ET.ParseError, zipfile.BadZipFile) as exc:
            # A zip without the expected parts, or with malformed XML, is a
            # validation failure rather than a reason to crash.
            errors.append(f"unreadable workbook structure in {XLSX}: {exc!r}")
        else:
            strict_errors, strict_warnings = strict_workbook_checks(XLSX)
            errors.extend(strict_errors)
            warnings.extend(strict_warnings)

    if errors:
        print("DATA SAMPLE VALIDATION FAIL")
        for err in errors:
            print(f"- {err}")
        return 1

    print(f"DATA SAMPLE VALIDATION OK: {XLSX.name} role={WORKBOOK_ROLE}")
    for warning in warnings:
        print(f"DATA SAMPLE VALIDATION WARN: {warning}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())