"""Validate spec column and field names against the field dictionary.

Reads three YAML files under ``ROOT / "spec"``, checks that every column or
field they reference is a field id, canonical name, or alias declared in the
field dictionary, and writes a JSON gate report to
``ROOT / "Temp" / "raw_workbook_mapping_validation_v1.json"``.
"""

from __future__ import annotations

import json
from collections.abc import Iterator
from pathlib import Path

import yaml

ROOT = Path(__file__).resolve().parents[1]


def _load_yaml(path: Path) -> dict:
    """Parse *path* as YAML, treating an empty document as an empty mapping."""
    return yaml.safe_load(path.read_text(encoding="utf-8")) or {}


def _index_fields(fields: dict) -> tuple[dict[str, list[str]], int]:
    """Map every canonical name and alias to the field ids that declare it.

    Returns:
        A ``(name_index, unit_missing_count)`` pair, where ``name_index`` maps
        each declared name to the list of field ids using it and
        ``unit_missing_count`` is the number of fields whose ``unit`` is
        absent or null. Fields with an empty definition are skipped.
    """
    name_index: dict[str, list[str]] = {}
    unit_missing_count = 0
    for field_id, info in fields.items():
        if not info:
            continue
        if info.get("unit") is None:
            unit_missing_count += 1
        # A key left empty in YAML loads as None rather than being absent, so
        # ``.get(key, default)`` would not apply the default; use ``or``.
        canonical_name = info.get("canonical_name") or field_id
        aliases = info.get("aliases") or []
        for name in [canonical_name] + aliases:
            name_index.setdefault(name, []).append(field_id)
    return name_index, unit_missing_count


def _find_collisions(name_index: dict[str, list[str]]) -> dict[str, list[str]]:
    """Return the names claimed by more than one distinct field id."""
    collisions: dict[str, list[str]] = {}
    for name, field_ids in name_index.items():
        distinct_ids = sorted(set(field_ids))
        if len(distinct_ids) > 1:
            collisions[name] = distinct_ids
    return collisions


def _iter_unmapped_workbook_columns(
    mapping_data: dict, known_names: set
) -> Iterator[str]:
    """Yield a report line for each workbook column missing from *known_names*.

    Both ``required_columns`` and ``recommended_columns`` of every sheet under
    ``raw_workbook.required_sheets`` are checked.
    """
    sheets = (mapping_data.get("raw_workbook") or {}).get("required_sheets") or {}
    for sheet_name, sheet_info in sheets.items():
        sheet_info = sheet_info or {}
        required = sheet_info.get("required_columns") or []
        recommended = sheet_info.get("recommended_columns") or []
        for column in required + recommended:
            if column not in known_names:
                yield f"Sheet '{sheet_name}': {column}"


def _iter_unmapped_snapshot_fields(
    snap_data: dict, known_names: set
) -> Iterator[str]:
    """Yield a report line for each snapshot field missing from *known_names*.

    Covers the ``required_fields`` of every capture group, then the entries of
    ``canonical_fields``, under ``account_snapshot_contract``.
    """
    contract = snap_data.get("account_snapshot_contract") or {}
    groups = contract.get("required_capture_groups") or {}
    for group_name, group_info in groups.items():
        for field_name in (group_info or {}).get("required_fields") or []:
            if field_name not in known_names:
                yield f"Capture group '{group_name}': {field_name}"
    for field_name in contract.get("canonical_fields") or {}:
        if field_name not in known_names:
            yield f"Canonical field: {field_name}"


def main() -> int:
    """Run the validation, write and print the JSON report.

    Returns:
        ``0`` when the gate is ``PASS``; ``1`` when it is ``FAIL`` or when the
        field dictionary file does not exist (in which case no report is
        written).
    """
    # 1. Load spec/12_field_dictionary.yaml
    field_dict_path = ROOT / "spec" / "12_field_dictionary.yaml"
    if not field_dict_path.exists():
        print(f"Field dictionary not found: {field_dict_path}")
        return 1

    field_data = _load_yaml(field_dict_path)
    fields = (field_data.get("field_dictionary") or {}).get("fields") or {}

    name_index, unit_missing_count = _index_fields(fields)
    collisions = _find_collisions(name_index)
    alias_collision_count = len(collisions)

    # A column is mapped when it equals a field id, a declared canonical name,
    # or an alias. Built once so each lookup is a set membership test.
    known_names = set(fields) | set(name_index)

    parse_errors: list[str] = []

    # 2. Load spec/14_raw_workbook_mapping.yaml
    mapping_path = ROOT / "spec" / "14_raw_workbook_mapping.yaml"
    unmapped_columns: list[str] = []
    if mapping_path.exists():
        try:
            mapping_data = _load_yaml(mapping_path)
            # Append one at a time so findings gathered before a failure are
            # still reported.
            for entry in _iter_unmapped_workbook_columns(mapping_data, known_names):
                unmapped_columns.append(entry)
        except Exception as e:
            print(f"Error parsing raw workbook mapping: {e}")
            parse_errors.append(f"raw workbook mapping: {e}")

    # 3. Load spec/15_account_snapshot_contract.yaml
    snapshot_path = ROOT / "spec" / "15_account_snapshot_contract.yaml"
    unmapped_snapshot_fields: list[str] = []
    if snapshot_path.exists():
        try:
            snap_data = _load_yaml(snapshot_path)
            for entry in _iter_unmapped_snapshot_fields(snap_data, known_names):
                unmapped_snapshot_fields.append(entry)
        except Exception as e:
            print(f"Error parsing account snapshot contract: {e}")
            parse_errors.append(f"account snapshot contract: {e}")

    missing_field_dictionary_count = len(unmapped_columns) + len(
        unmapped_snapshot_fields
    )

    # A spec that could not be read has not been validated, so it must not
    # be allowed to pass the gate.
    passed = (
        missing_field_dictionary_count == 0
        and unit_missing_count == 0
        and alias_collision_count == 0
        and not parse_errors
    )
    gate = "PASS" if passed else "FAIL"

    result = {
        "formula_id": "RAW_WORKBOOK_MAPPING_VALIDATION_V1",
        "missing_field_dictionary_count": missing_field_dictionary_count,
        "unit_missing_count": unit_missing_count,
        "alias_collision_count": alias_collision_count,
        "gate": gate,
        "collisions": collisions,
        "unmapped_columns": unmapped_columns,
        "unmapped_snapshot_fields": unmapped_snapshot_fields,
        "parse_errors": parse_errors,
    }

    report = json.dumps(result, ensure_ascii=False, indent=2)
    out_path = ROOT / "Temp" / "raw_workbook_mapping_validation_v1.json"
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(report, encoding="utf-8")
    print(report)

    return 0 if gate == "PASS" else 1


if __name__ == "__main__":
    raise SystemExit(main())