#!/usr/bin/env python3 from __future__ import annotations import json import re import sys from pathlib import Path import yaml try: import jsonschema except ImportError: jsonschema = None ROOT = Path(__file__).resolve().parent.parent # Classified write functions based on naming patterns WRITE_PATTERNS = ( r"^(log|upsert|write|record|update|set|adjust|_write|ensure)", "runDataFeed", "evaluatePa1FeedbackBatch_" ) IGNORE_FUNCTIONS = { "writeToSheet", "upsertToSheetByKey", "upsertMonthlyRow_", "appendAlphaHistory_", "readSectorUniverse_", "readEtfNavManualMap_", "appendSectorFlowHistoryV2_", "readSectorFlowHistoryPrev_", "readPrevLegacySectorFlow_", "applyTrailingStopUpdates_", "runMacro", "seedEventCalendar_", "runEventRisk", "getSheetEnvelopeJson_", "sheetToJson", "runMonthlySnapshot", "readSettings_", "writeSettingValue_", "readKospiRet5d_", "readKospiRet20d_", "readSectorFlowForRadar_" } def is_write_function(func_name: str) -> bool: for pattern in WRITE_PATTERNS: if re.search(pattern, func_name): return True return False def collect_gas_files() -> list[Path]: root_files = [ROOT / n for n in ("gas_apex_alpha_watch.gs", "gas_apex_runtime_core.gs", "gas_data_collect.gs", "gas_data_feed.gs", "gas_harness_rows.gs", "gas_lib.gs", "gas_report.gs") if (ROOT / n).exists()] adapter_parts_dir = ROOT / "src" / "gas_adapter_parts" adapter_files = sorted(adapter_parts_dir.glob("*.gs")) if adapter_parts_dir.exists() else [] return root_files + adapter_files def main() -> int: errors = [] contract_path = ROOT / "spec" / "gas_adapter_contract.yaml" schema_path = ROOT / "schemas" / "generated" / "gas_adapter_contract.schema.json" if not contract_path.exists(): errors.append(f"Contract file missing: {contract_path}") print(f"ERROR: {errors[-1]}") return 1 if not schema_path.exists(): errors.append(f"Schema file missing: {schema_path}") print(f"ERROR: {errors[-1]}") return 1 # 1. Load contract and schema try: contract_data = yaml.safe_load(contract_path.read_text(encoding="utf-8")) except Exception as e: errors.append(f"Failed to parse contract YAML: {e}") print(f"ERROR: {errors[-1]}") return 1 try: schema_data = json.loads(schema_path.read_text(encoding="utf-8")) except Exception as e: errors.append(f"Failed to parse schema JSON: {e}") print(f"ERROR: {errors[-1]}") return 1 # 2. Validate contract against schema if jsonschema is not None: try: jsonschema.validate(instance=contract_data, schema=schema_data) except Exception as e: errors.append(f"Schema validation failed: {e}") else: # Minimal validation fallback if not isinstance(contract_data, dict): errors.append("Contract data must be a dictionary") elif "schema_version" not in contract_data or "exports" not in contract_data: errors.append("Contract data missing required keys: schema_version, exports") # 3. Load raw workbook mappings to find registered sheets mapped_sheets = set() mapping_path = ROOT / "spec" / "14_raw_workbook_mapping.yaml" snapshot_path = ROOT / "spec" / "15_account_snapshot_contract.yaml" if mapping_path.exists(): try: mapping_data = yaml.safe_load(mapping_path.read_text(encoding="utf-8")) or {} # Required sheets required = mapping_data.get("raw_workbook", {}).get("required_sheets", {}) mapped_sheets.update(required.keys()) # Support sheets support = mapping_data.get("raw_workbook", {}).get("sheet_diet_policy", {}).get("keep", {}).get("support", []) mapped_sheets.update(support) print(f"DEBUG: Mapped sheets loaded: {sorted(mapped_sheets)}") # Deprecated sheets deprecated = mapping_data.get("raw_workbook", {}).get("sheet_diet_policy", {}).get("keep", {}).get("deprecated", []) mapped_sheets.update(deprecated) # Transient sheets transient = mapping_data.get("raw_workbook", {}).get("sheet_diet_policy", {}).get("delete", {}).get("transient_after_complete", []) mapped_sheets.update(transient) # Additional keys from required_sheets if "required_sheets" in mapping_data.get("raw_workbook", {}): mapped_sheets.update(mapping_data["raw_workbook"]["required_sheets"].keys()) except Exception as e: errors.append(f"Failed to parse raw workbook mapping: {e}") if snapshot_path.exists(): mapped_sheets.add("account_snapshot") mapped_sheets.add("settings") mapped_sheets.add("cs_chunk_N") # 4. Scan Apps Script files for sheets accessed func_pattern = re.compile(r"function\s+([A-Za-z0-9_$]+)\s*\(([^)]*)\)\s*\{") sheet_pattern = re.compile(r"getSheetByName\s*\(\s*['\"]([^'\"]+)['\"]\s*\)") sheet_var_pattern = re.compile(r"getSheetByName\s*\(\s*([A-Za-z0-9_$]+)\s*\)") code_accesses = [] gas_files = collect_gas_files() for path in gas_files: content = path.read_text(encoding="utf-8", errors="ignore") lines = content.splitlines() current_func = None in_func = False brace_count = 0 for i, line in enumerate(lines, 1): m = func_pattern.search(line) if m: current_func = m.group(1) brace_count = line.count("{") - line.count("}") in_func = True continue if in_func: brace_count += line.count("{") - line.count("}") if brace_count <= 0: in_func = False if current_func: sm = sheet_pattern.findall(line) for sname in sm: # Map _installCompat_ inline helpers resolved_func = current_func if current_func == "_installCompat_": if sname == "settings": resolved_func = "readSettingsTab_" elif sname == "performance": resolved_func = "readPerformanceSheet_" code_accesses.append({ "file": path.name, "function": resolved_func, "sheet": sname, "line": i }) svm = sheet_var_pattern.findall(line) for svar in svm: resolved_sheet = None if svar == "SETTINGS_SHEET_NAME": resolved_sheet = "settings" elif svar == "DATA_FEED_SHEET_NAME": resolved_sheet = "data_feed" elif svar == "AS_SHEET_NAME": resolved_sheet = "account_snapshot" elif svar == "SHEET_NAME": resolved_sheet = "universe" elif svar == "sheetName": resolved_sheet = "core_satellite" if resolved_sheet: code_accesses.append({ "file": path.name, "function": current_func, "sheet": resolved_sheet, "line": i }) # 5. Extract exports from contract (group by function name) contract_exports = contract_data.get("exports", []) contract_map = {} for item in contract_exports: contract_map.setdefault(item["function_name"], []).append(item) unmapped_reads = 0 unmapped_writes = 0 drifts = set() # 6. Verify each sheet access in code for access in code_accesses: func = access["function"] sheet = access["sheet"] if func in IGNORE_FUNCTIONS: continue # Check if function is in contract if func not in contract_map: print(f"DEBUG: Unmapped function '{func}' in '{access['file']}:{access['line']}' accessing sheet '{sheet}'") if is_write_function(func): unmapped_writes += 1 else: unmapped_reads += 1 else: # Check if the accessed sheet matches any declared sheet_key for the function matched = any(exp["sheet_key"] == sheet for exp in contract_map[func]) if not matched: print(f"DEBUG: Mismatch in function '{func}' - declared keys {[e['sheet_key'] for e in contract_map[func]]}, found '{sheet}'") if is_write_function(func): unmapped_writes += 1 else: unmapped_reads += 1 # Check if the accessed sheet is in workbook mappings if sheet not in mapped_sheets: print(f"DEBUG: Drift - sheet '{sheet}' accessed by '{func}' not in workbook mapping contract") drifts.add(sheet) # Check if any sheet key in contract is not in workbook mappings for item in contract_exports: skey = item["sheet_key"] if skey not in mapped_sheets: print(f"DEBUG: Drift - contract sheet_key '{skey}' not in workbook mapping contract") drifts.add(skey) sheet_contract_drift_count = len(drifts) print(f"DEBUG: Total drifts: {drifts}") # Determine gate result gate_passed = ( unmapped_reads == 0 and unmapped_writes == 0 and sheet_contract_drift_count == 0 and not errors ) result = { "formula_id": "GAS_ADAPTER_CONTRACT_VALIDATOR_V1", "unmapped_gas_read_count": unmapped_reads, "unmapped_gas_write_count": unmapped_writes, "sheet_contract_drift_count": sheet_contract_drift_count, "errors": errors, "gate": "PASS" if gate_passed else "FAIL" } # Write output packet out_dir = ROOT / "Temp" out_dir.mkdir(parents=True, exist_ok=True) out_path = out_dir / "gas_adapter_contract_validation_v1.json" out_path.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8") print(json.dumps(result, ensure_ascii=True, indent=2)) return 0 if gate_passed else 1 if __name__ == "__main__": sys.exit(main())