af1236202d
- F14: late_chase_risk_score 검증 * GAS가 유일한 생산처 (Python canonical 없음) * migration_action: KEEP_IN_GAS로 정정, status: DONE - F02/F03/F04/F06: priceBasis 로직 포팅 * formulas/price_basis_v1.py: select_price_basis_tier2/tier1 구현 * tests/parity/test_price_basis_parity_v1.py: 8 parity 테스트 (모두 PASS) * GAS Number.isFinite() 의미론 정확히 재현 (math.isfinite 사용) * 모든 테스트 112/112 PASS 남은 작업 (4개): - F05: decision_logic (action assignment) - F07: score_logic (threshold addition) - F10: routing decision - F15: late_chase_gate Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
129 lines
4.9 KiB
Python
129 lines
4.9 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
from pathlib import Path
|
|
import yaml
|
|
|
|
ROOT = Path(__file__).resolve().parents[1]
|
|
|
|
|
|
def main() -> int:
|
|
# 1. Load spec/12_field_dictionary.yaml
|
|
field_dict_path = ROOT / "spec" / "12_field_dictionary.yaml"
|
|
if not field_dict_path.exists():
|
|
print(f"Field dictionary not found: {field_dict_path}")
|
|
return 1
|
|
|
|
field_data = yaml.safe_load(field_dict_path.read_text(encoding="utf-8")) or {}
|
|
fields = field_data.get("field_dictionary", {}).get("fields", {})
|
|
|
|
unit_missing_count = 0
|
|
alias_collision_count = 0
|
|
missing_field_dictionary_count = 0
|
|
|
|
# Build alias & canonical maps
|
|
canonical_names = set(fields.keys())
|
|
alias_to_canonicals: dict[str, list[str]] = {}
|
|
|
|
for fid, info in fields.items():
|
|
if not info:
|
|
continue
|
|
# Check unit missing
|
|
unit = info.get("unit")
|
|
if unit is None:
|
|
unit_missing_count += 1
|
|
|
|
canonical_name = info.get("canonical_name", fid)
|
|
aliases = info.get("aliases", [])
|
|
|
|
all_names = [canonical_name] + aliases
|
|
for name in all_names:
|
|
alias_to_canonicals.setdefault(name, []).append(fid)
|
|
|
|
# Check alias collisions (same name maps to multiple distinct canonical fields)
|
|
collisions = {}
|
|
for name, canonical_list in alias_to_canonicals.items():
|
|
unique_canonicals = sorted(list(set(canonical_list)))
|
|
if len(unique_canonicals) > 1:
|
|
alias_collision_count += 1
|
|
collisions[name] = unique_canonicals
|
|
|
|
# Helper function to check if a column name matches any canonical_name or aliases
|
|
def is_field_mapped(col_name: str) -> bool:
|
|
if col_name in canonical_names:
|
|
return True
|
|
for fid, info in fields.items():
|
|
if not info:
|
|
continue
|
|
aliases = info.get("aliases", [])
|
|
if col_name in aliases:
|
|
return True
|
|
return False
|
|
|
|
# 2. Load spec/14_raw_workbook_mapping.yaml
|
|
mapping_path = ROOT / "spec" / "14_raw_workbook_mapping.yaml"
|
|
unmapped_columns = []
|
|
if mapping_path.exists():
|
|
try:
|
|
mapping_data = yaml.safe_load(mapping_path.read_text(encoding="utf-8")) or {}
|
|
sheets = mapping_data.get("raw_workbook", {}).get("required_sheets", {})
|
|
for sheet_name, sheet_info in sheets.items():
|
|
req = sheet_info.get("required_columns", [])
|
|
rec = sheet_info.get("recommended_columns", [])
|
|
for col in (req + rec):
|
|
if not is_field_mapped(col):
|
|
missing_field_dictionary_count += 1
|
|
unmapped_columns.append(f"Sheet '{sheet_name}': {col}")
|
|
except Exception as e:
|
|
print(f"Error parsing raw workbook mapping: {e}")
|
|
|
|
# 3. Load spec/15_account_snapshot_contract.yaml
|
|
snapshot_path = ROOT / "spec" / "15_account_snapshot_contract.yaml"
|
|
unmapped_snapshot_fields = []
|
|
if snapshot_path.exists():
|
|
try:
|
|
snap_data = yaml.safe_load(snapshot_path.read_text(encoding="utf-8")) or {}
|
|
contract = snap_data.get("account_snapshot_contract", {})
|
|
|
|
# required fields in capture groups
|
|
groups = contract.get("required_capture_groups", {})
|
|
for group_name, group_info in groups.items():
|
|
fields_in_group = group_info.get("required_fields", [])
|
|
for f in fields_in_group:
|
|
if not is_field_mapped(f):
|
|
missing_field_dictionary_count += 1
|
|
unmapped_snapshot_fields.append(f"Capture group '{group_name}': {f}")
|
|
|
|
# canonical fields in contract
|
|
canonicals = contract.get("canonical_fields", {})
|
|
for f in canonicals.keys():
|
|
if not is_field_mapped(f):
|
|
missing_field_dictionary_count += 1
|
|
unmapped_snapshot_fields.append(f"Canonical field: {f}")
|
|
except Exception as e:
|
|
print(f"Error parsing account snapshot contract: {e}")
|
|
|
|
gate = "PASS" if (missing_field_dictionary_count == 0 and unit_missing_count == 0 and alias_collision_count == 0) else "FAIL"
|
|
|
|
result = {
|
|
"formula_id": "RAW_WORKBOOK_MAPPING_VALIDATION_V1",
|
|
"missing_field_dictionary_count": missing_field_dictionary_count,
|
|
"unit_missing_count": unit_missing_count,
|
|
"alias_collision_count": alias_collision_count,
|
|
"gate": gate,
|
|
"collisions": collisions,
|
|
"unmapped_columns": unmapped_columns,
|
|
"unmapped_snapshot_fields": unmapped_snapshot_fields
|
|
}
|
|
|
|
out_path = ROOT / "Temp" / "raw_workbook_mapping_validation_v1.json"
|
|
out_path.parent.mkdir(parents=True, exist_ok=True)
|
|
out_path.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")
|
|
|
|
print(json.dumps(result, ensure_ascii=False, indent=2))
|
|
return 0 if gate == "PASS" else 1
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|