Files
QuantEngineByItz/tools/validate_gas_adapter_contract_v1.py
T
kjh2064 af1236202d WBS-7.3: GAS→Python 마이그레이션 5개 항목 완료 (F14, F02-F06)
- F14: late_chase_risk_score 검증
  * GAS가 유일한 생산처 (Python canonical 없음)
  * migration_action: KEEP_IN_GAS로 정정, status: DONE

- F02/F03/F04/F06: priceBasis 로직 포팅
  * formulas/price_basis_v1.py: select_price_basis_tier2/tier1 구현
  * tests/parity/test_price_basis_parity_v1.py: 8 parity 테스트 (모두 PASS)
  * GAS Number.isFinite() 의미론 정확히 재현 (math.isfinite 사용)
  * 모든 테스트 112/112 PASS

남은 작업 (4개):
- F05: decision_logic (action assignment)
- F07: score_logic (threshold addition)
- F10: routing decision
- F15: late_chase_gate

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
2026-06-22 22:45:00 +09:00

284 lines
10 KiB
Python

#!/usr/bin/env python3
from __future__ import annotations
import json
import re
import sys
from pathlib import Path
import yaml
try:
import jsonschema
except ImportError:
jsonschema = None
# Project root: the directory two levels above this script file.
ROOT = Path(__file__).resolve().parents[1]

# Regular expressions that classify a function name as a "write" function.
# Each entry is applied with re.search, so the first one is anchored to the
# start of the name while the two literal names match anywhere in the name.
WRITE_PATTERNS = (
    r"^(log|upsert|write|record|update|set|adjust|_write|ensure)",
    "runDataFeed",
    "evaluatePa1FeedbackBatch_",
)

# Function names whose sheet accesses are skipped entirely when checking the
# scanned code against the adapter contract.
IGNORE_FUNCTIONS = {
    "writeToSheet",
    "upsertToSheetByKey",
    "upsertMonthlyRow_",
    "appendAlphaHistory_",
    "readSectorUniverse_",
    "readEtfNavManualMap_",
    "appendSectorFlowHistoryV2_",
    "readSectorFlowHistoryPrev_",
    "readPrevLegacySectorFlow_",
    "applyTrailingStopUpdates_",
    "runMacro",
    "seedEventCalendar_",
    "runEventRisk",
    "getSheetEnvelopeJson_",
    "sheetToJson",
    "runMonthlySnapshot",
    "readSettings_",
    "writeSettingValue_",
    "readKospiRet5d_",
    "readKospiRet20d_",
    "readSectorFlowForRadar_",
}
def is_write_function(func_name: str) -> bool:
    """Return True when *func_name* matches at least one WRITE_PATTERNS entry.

    Every pattern is applied with ``re.search``, so unanchored patterns match
    anywhere inside the name.
    """
    return any(re.search(candidate, func_name) for candidate in WRITE_PATTERNS)
def collect_gas_files() -> list[Path]:
    """Return the Apps Script source files to scan.

    The result holds the known root-level ``.gs`` files that exist under ROOT
    (in the fixed order listed below), followed by every ``*.gs`` file in
    ``ROOT/src/gas_adapter_parts`` sorted by path, when that directory exists.
    """
    root_script_names = (
        "gas_apex_alpha_watch.gs",
        "gas_apex_runtime_core.gs",
        "gas_data_collect.gs",
        "gas_data_feed.gs",
        "gas_harness_rows.gs",
        "gas_lib.gs",
        "gas_report.gs",
    )
    found = []
    for script_name in root_script_names:
        candidate = ROOT / script_name
        if candidate.exists():
            found.append(candidate)

    parts_dir = ROOT / "src" / "gas_adapter_parts"
    if parts_dir.exists():
        found.extend(sorted(parts_dir.glob("*.gs")))
    return found
# Patterns used to locate function declarations and sheet lookups in the
# Apps Script sources (matched line by line).
_FUNC_DECL_RE = re.compile(r"function\s+([A-Za-z0-9_$]+)\s*\(([^)]*)\)\s*\{")
_SHEET_LITERAL_RE = re.compile(r"getSheetByName\s*\(\s*['\"]([^'\"]+)['\"]\s*\)")
_SHEET_IDENT_RE = re.compile(r"getSheetByName\s*\(\s*([A-Za-z0-9_$]+)\s*\)")

# Identifiers passed to getSheetByName(<identifier>) and the sheet key each
# one is treated as. Identifiers not listed here are not recorded.
_SHEET_VAR_TO_KEY = {
    "SETTINGS_SHEET_NAME": "settings",
    "DATA_FEED_SHEET_NAME": "data_feed",
    "AS_SHEET_NAME": "account_snapshot",
    "SHEET_NAME": "universe",
    "sheetName": "core_satellite",
}

# Inside _installCompat_, a literal sheet access is attributed to the helper
# named here instead of to _installCompat_ itself.
_INSTALL_COMPAT_OWNERS = {
    "settings": "readSettingsTab_",
    "performance": "readPerformanceSheet_",
}


def _load_mapped_sheets(errors: list) -> set:
    """Collect the sheet names registered in the workbook mapping specs.

    Parse failures are appended to *errors*; whatever was collected up to
    that point is still returned.
    """
    mapped_sheets = set()
    mapping_path = ROOT / "spec" / "14_raw_workbook_mapping.yaml"
    snapshot_path = ROOT / "spec" / "15_account_snapshot_contract.yaml"

    if mapping_path.exists():
        try:
            mapping_data = yaml.safe_load(mapping_path.read_text(encoding="utf-8")) or {}
            workbook = mapping_data.get("raw_workbook", {})
            # Required sheets (mapping keyed by sheet name)
            mapped_sheets.update(workbook.get("required_sheets", {}).keys())
            # Support and deprecated sheets that are kept
            keep = workbook.get("sheet_diet_policy", {}).get("keep", {})
            mapped_sheets.update(keep.get("support", []))
            mapped_sheets.update(keep.get("deprecated", []))
            # Transient sheets
            delete = workbook.get("sheet_diet_policy", {}).get("delete", {})
            mapped_sheets.update(delete.get("transient_after_complete", []))
            # Printed only after every category has been merged so the debug
            # line reflects the complete set read from the mapping file.
            print(f"DEBUG: Mapped sheets loaded: {sorted(mapped_sheets)}")
        except Exception as e:
            errors.append(f"Failed to parse raw workbook mapping: {e}")

    if snapshot_path.exists():
        mapped_sheets.add("account_snapshot")
    mapped_sheets.add("settings")
    mapped_sheets.add("cs_chunk_N")
    return mapped_sheets


def _scan_sheet_accesses() -> list:
    """Return one record per getSheetByName access found in the GAS files.

    Each record is a dict with keys ``file``, ``function``, ``sheet`` and
    ``line`` (1-based).

    NOTE: an access is attributed to the most recently declared function in
    the file; the end of a function body is not tracked. A line that itself
    contains a function declaration is not scanned for accesses.
    """
    accesses = []
    for path in collect_gas_files():
        content = path.read_text(encoding="utf-8", errors="ignore")
        current_func = None
        for line_no, line in enumerate(content.splitlines(), 1):
            declaration = _FUNC_DECL_RE.search(line)
            if declaration:
                current_func = declaration.group(1)
                continue
            if not current_func:
                continue

            # Literal sheet names: getSheetByName("name")
            for sheet_name in _SHEET_LITERAL_RE.findall(line):
                owner = current_func
                if current_func == "_installCompat_":
                    owner = _INSTALL_COMPAT_OWNERS.get(sheet_name, current_func)
                accesses.append({
                    "file": path.name,
                    "function": owner,
                    "sheet": sheet_name,
                    "line": line_no,
                })

            # Known constants/variables: getSheetByName(IDENT)
            for identifier in _SHEET_IDENT_RE.findall(line):
                resolved_sheet = _SHEET_VAR_TO_KEY.get(identifier)
                if resolved_sheet:
                    accesses.append({
                        "file": path.name,
                        "function": current_func,
                        "sheet": resolved_sheet,
                        "line": line_no,
                    })
    return accesses


def _index_contract_exports(contract_data, errors: list) -> tuple:
    """Return ``(exports, exports_by_function)`` from the loaded contract.

    Malformed input (non-dict contract, non-list ``exports``, entries that
    are not mappings or lack ``function_name``/``sheet_key``) is reported in
    *errors* and skipped, so the validator can still emit a FAIL result
    instead of terminating with a traceback.
    """
    if not isinstance(contract_data, dict):
        message = "Contract data must be a dictionary"
        if message not in errors:
            errors.append(message)
        return [], {}

    raw_exports = contract_data.get("exports", [])
    if not isinstance(raw_exports, list):
        errors.append("Contract 'exports' must be a list")
        return [], {}

    exports = []
    by_function = {}
    for index, item in enumerate(raw_exports):
        if (
            not isinstance(item, dict)
            or "function_name" not in item
            or "sheet_key" not in item
        ):
            errors.append(
                f"Contract export #{index} must be a mapping with "
                "function_name and sheet_key"
            )
            continue
        exports.append(item)
        by_function.setdefault(item["function_name"], []).append(item)
    return exports, by_function


def main() -> int:
    """Validate the GAS adapter contract against the Apps Script sources.

    Steps: load the contract YAML and its JSON schema, validate the contract
    (with ``jsonschema`` when available, otherwise a minimal key check), load
    the registered workbook sheets, scan the ``.gs`` files for sheet
    accesses, and compare those accesses and the contract's sheet keys with
    the registered sheets.

    The result packet is written to ``ROOT/Temp/
    gas_adapter_contract_validation_v1.json`` and printed to stdout.

    Returns:
        0 when the gate passes (no unmapped reads/writes, no drift, no
        errors); 1 otherwise, including when the contract or schema file is
        missing or cannot be parsed (in which case no packet is written).
    """
    errors = []
    contract_path = ROOT / "spec" / "gas_adapter_contract.yaml"
    schema_path = ROOT / "schemas" / "generated" / "gas_adapter_contract.schema.json"

    for label, required_path in (("Contract", contract_path), ("Schema", schema_path)):
        if not required_path.exists():
            print(f"ERROR: {label} file missing: {required_path}")
            return 1

    # 1. Load contract and schema
    try:
        contract_data = yaml.safe_load(contract_path.read_text(encoding="utf-8"))
    except Exception as e:
        print(f"ERROR: Failed to parse contract YAML: {e}")
        return 1
    try:
        schema_data = json.loads(schema_path.read_text(encoding="utf-8"))
    except Exception as e:
        print(f"ERROR: Failed to parse schema JSON: {e}")
        return 1

    # 2. Validate contract against schema
    if jsonschema is not None:
        try:
            jsonschema.validate(instance=contract_data, schema=schema_data)
        except Exception as e:
            errors.append(f"Schema validation failed: {e}")
    else:
        # Minimal validation fallback
        if not isinstance(contract_data, dict):
            errors.append("Contract data must be a dictionary")
        elif "schema_version" not in contract_data or "exports" not in contract_data:
            errors.append("Contract data missing required keys: schema_version, exports")

    # 3. Load raw workbook mappings to find registered sheets
    mapped_sheets = _load_mapped_sheets(errors)

    # 4. Scan Apps Script files for sheets accessed
    code_accesses = _scan_sheet_accesses()

    # 5. Extract exports from contract (group by function name)
    contract_exports, contract_map = _index_contract_exports(contract_data, errors)

    unmapped_reads = 0
    unmapped_writes = 0
    drifts = set()

    # 6. Verify each sheet access in code
    for access in code_accesses:
        func = access["function"]
        sheet = access["sheet"]
        if func in IGNORE_FUNCTIONS:
            continue

        declared = contract_map.get(func)
        if declared is None:
            # Function is not in the contract at all
            print(f"DEBUG: Unmapped function '{func}' in '{access['file']}:{access['line']}' accessing sheet '{sheet}'")
            unmapped = True
        else:
            # Function is declared; the accessed sheet must match one of its keys
            declared_keys = [exp["sheet_key"] for exp in declared]
            unmapped = sheet not in declared_keys
            if unmapped:
                print(f"DEBUG: Mismatch in function '{func}' - declared keys {declared_keys}, found '{sheet}'")

        if unmapped:
            if is_write_function(func):
                unmapped_writes += 1
            else:
                unmapped_reads += 1

        # Check if the accessed sheet is in workbook mappings
        if sheet not in mapped_sheets:
            print(f"DEBUG: Drift - sheet '{sheet}' accessed by '{func}' not in workbook mapping contract")
            drifts.add(sheet)

    # Check if any sheet key in contract is not in workbook mappings
    for item in contract_exports:
        skey = item["sheet_key"]
        if skey not in mapped_sheets:
            print(f"DEBUG: Drift - contract sheet_key '{skey}' not in workbook mapping contract")
            drifts.add(skey)

    sheet_contract_drift_count = len(drifts)
    print(f"DEBUG: Total drifts: {drifts}")

    # Determine gate result
    gate_passed = (
        unmapped_reads == 0
        and unmapped_writes == 0
        and sheet_contract_drift_count == 0
        and not errors
    )
    result = {
        "formula_id": "GAS_ADAPTER_CONTRACT_VALIDATOR_V1",
        "unmapped_gas_read_count": unmapped_reads,
        "unmapped_gas_write_count": unmapped_writes,
        "sheet_contract_drift_count": sheet_contract_drift_count,
        "errors": errors,
        "gate": "PASS" if gate_passed else "FAIL",
    }

    # Write output packet
    out_dir = ROOT / "Temp"
    out_dir.mkdir(parents=True, exist_ok=True)
    out_path = out_dir / "gas_adapter_contract_validation_v1.json"
    out_path.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")
    print(json.dumps(result, ensure_ascii=True, indent=2))
    return 0 if gate_passed else 1
if __name__ == "__main__":
    # Use main()'s integer return value as the process exit status.
    raise SystemExit(main())