"""Auto-populate the field dictionary with names it does not yet know.

Reads column names from ``spec/14_raw_workbook_mapping.yaml`` and field names
from ``spec/15_account_snapshot_contract.yaml``; every name that is neither a
canonical key nor a listed alias in ``spec/12_field_dictionary.yaml`` is added
to that dictionary with a type/unit guessed from the name.
"""
from __future__ import annotations

from collections.abc import Iterable, Iterator
from pathlib import Path
from typing import Any

import yaml

ROOT = Path(__file__).resolve().parents[1]

# Ordered (keywords, (type, unit)) rules; the first rule with a keyword that
# occurs anywhere in the lower-cased name wins.
# NOTE(review): matching is by plain substring, so e.g. "account" hits the
# "count" keyword and "updated" hits "date". Left as-is to keep the generated
# types stable; consider token-based matching if this misclassifies fields.
_TYPE_RULES: tuple[tuple[tuple[str, ...], tuple[str, str]], ...] = (
    (("price",), ("number", "KRW_per_share")),
    (("qty", "quantity", "count"), ("integer", "shares")),
    (("pct", "ratio", "rate", "percent"), ("number", "percent")),
    (("krw", "amount", "value", "cash"), ("number", "KRW")),
    (("date", "updated"), ("date_ISO8601", "none")),
    (("status", "mode", "action", "state", "gate"), ("string", "none")),
)

# Fallback for names that match no rule (scores/metrics default to number).
_DEFAULT_TYPE_AND_UNIT: tuple[str, str] = ("number", "none")


def infer_type_and_unit(name: str) -> tuple[str, str]:
    """Guess a ``(type, unit)`` pair for a field from keywords in its name.

    Args:
        name: Column or field name; matching is case-insensitive.

    Returns:
        The ``(type, unit)`` of the first matching rule in ``_TYPE_RULES``,
        or ``("number", "none")`` when no keyword occurs in the name.
    """
    lower_name = name.lower()
    for keywords, type_and_unit in _TYPE_RULES:
        if any(keyword in lower_name for keyword in keywords):
            return type_and_unit
    return _DEFAULT_TYPE_AND_UNIT


def _as_list(node: Any) -> list[Any]:
    """Normalize a YAML node to a list: null -> [], scalar -> [scalar]."""
    if node is None:
        return []
    if isinstance(node, (list, tuple)):
        return list(node)
    return [node]


def _child_mapping(parent: dict[str, Any], key: str) -> dict[str, Any]:
    """Return ``parent[key]``, creating an empty dict in place if missing/null.

    Creating the container inside ``parent`` (rather than using a detached
    default) guarantees that later edits are visible when ``parent`` is dumped.
    """
    child = parent.get(key)
    if child is None:
        child = {}
        parent[key] = child
    return child


def _load_yaml(path: Path) -> dict[str, Any]:
    """Parse a UTF-8 YAML file, treating an empty document as ``{}``."""
    return yaml.safe_load(path.read_text(encoding="utf-8")) or {}


def _known_names(fields: dict[str, Any]) -> set[Any]:
    """Collect every canonical key and alias already in the dictionary."""
    known: set[Any] = set(fields)
    for info in fields.values():
        if info:  # entries may be null/empty placeholders
            known.update(_as_list(info.get("aliases")))
    return known


def _sheet_columns(mapping_data: dict[str, Any]) -> Iterator[Any]:
    """Yield required then recommended columns of every required sheet."""
    workbook = mapping_data.get("raw_workbook") or {}
    sheets = workbook.get("required_sheets") or {}
    for sheet_info in sheets.values():
        sheet_info = sheet_info or {}
        yield from _as_list(sheet_info.get("required_columns"))
        yield from _as_list(sheet_info.get("recommended_columns"))


def _snapshot_field_names(snap_data: dict[str, Any]) -> Iterator[Any]:
    """Yield required capture-group fields, then canonical field names."""
    contract = snap_data.get("account_snapshot_contract") or {}
    groups = contract.get("required_capture_groups") or {}
    for group_info in groups.values():
        yield from _as_list((group_info or {}).get("required_fields"))
    yield from (contract.get("canonical_fields") or {})


def _register_unmapped(fields: dict[str, Any], names: Iterable[str]) -> None:
    """Add each name to ``fields`` under its lower-cased key (in place).

    If the lower-cased key already has an entry, the original spelling is
    appended to that entry's aliases; otherwise a new entry is created with
    a type/unit inferred from the name.
    """
    for name in sorted(names):
        canonical_key = name.lower()
        existing = fields.get(canonical_key)
        if existing is None:
            # Missing key, or a null placeholder entry: create a full entry.
            ftype, funit = infer_type_and_unit(name)
            fields[canonical_key] = {
                "canonical_name": canonical_key,
                "type": ftype,
                "unit": funit,
                "aliases": [name],
            }
            continue
        aliases = _as_list(existing.get("aliases"))
        if name not in aliases:
            existing["aliases"] = [*aliases, name]


def main() -> int:
    """Add unmapped spec names to ``spec/12_field_dictionary.yaml``.

    Returns:
        ``1`` if the field dictionary file does not exist, otherwise ``0``
        (whether or not anything had to be added).
    """
    spec_dir = ROOT / "spec"
    field_dict_path = spec_dir / "12_field_dictionary.yaml"
    mapping_path = spec_dir / "14_raw_workbook_mapping.yaml"
    snapshot_path = spec_dir / "15_account_snapshot_contract.yaml"

    if not field_dict_path.exists():
        print("Field dictionary not found.")
        return 1

    # Load existing fields; containers are created in place so that saving
    # works even when the file is empty or lacks the expected keys.
    field_data = _load_yaml(field_dict_path)
    fields = _child_mapping(_child_mapping(field_data, "field_dictionary"), "fields")
    known = _known_names(fields)

    candidates: list[Any] = []
    # 1. raw mapping columns
    if mapping_path.exists():
        candidates.extend(_sheet_columns(_load_yaml(mapping_path)))
    # 2. snapshot fields
    if snapshot_path.exists():
        candidates.extend(_snapshot_field_names(_load_yaml(snapshot_path)))

    unmapped_names = {name for name in candidates if name not in known}
    if not unmapped_names:
        print("No unmapped fields found.")
        return 0

    print(f"Found {len(unmapped_names)} unmapped fields. Adding to dictionary...")
    _register_unmapped(fields, unmapped_names)

    # Save dictionary back to spec/12_field_dictionary.yaml
    field_dict_path.write_text(
        yaml.safe_dump(field_data, sort_keys=False, allow_unicode=True),
        encoding="utf-8",
    )
    print("Auto-populated 12_field_dictionary.yaml successfully.")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())