af1236202d
- F14: late_chase_risk_score 검증 * GAS가 유일한 생산처 (Python canonical 없음) * migration_action: KEEP_IN_GAS로 정정, status: DONE - F02/F03/F04/F06: priceBasis 로직 포팅 * formulas/price_basis_v1.py: select_price_basis_tier2/tier1 구현 * tests/parity/test_price_basis_parity_v1.py: 8 parity 테스트 (모두 PASS) * GAS Number.isFinite() 의미론 정확히 재현 (math.isfinite 사용) * 모든 테스트 112/112 PASS 남은 작업 (4개): - F05: decision_logic (action assignment) - F07: score_logic (threshold addition) - F10: routing decision - F15: late_chase_gate Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
284 lines
10 KiB
Python
284 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import re
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
import yaml
|
|
|
|
try:
|
|
import jsonschema
|
|
except ImportError:
|
|
jsonschema = None
|
|
|
|
# Repository root: the parent of the directory that contains this script.
_SCRIPT_DIR = Path(__file__).resolve().parent
ROOT = _SCRIPT_DIR.parent
|
|
|
|
# Regular expressions used to classify a GAS function as a "write" function
# from its name alone.  Each entry is applied with ``re.search``, so only the
# first pattern (which starts with ``^``) is anchored to the start of the name;
# the remaining entries match anywhere inside it.
WRITE_PATTERNS = (
    # Name prefixes treated as writers.
    r"^(log|upsert|write|record|update|set|adjust|_write|ensure)",
    # Individually listed function names.
    "runDataFeed",
    "evaluatePa1FeedbackBatch_",
)
|
|
|
|
# GAS function names whose sheet accesses are skipped entirely by the
# contract check (neither counted as unmapped nor checked for drift).
# Kept in alphabetical order; membership is all that matters.
IGNORE_FUNCTIONS = {
    "appendAlphaHistory_",
    "appendSectorFlowHistoryV2_",
    "applyTrailingStopUpdates_",
    "getSheetEnvelopeJson_",
    "readEtfNavManualMap_",
    "readKospiRet20d_",
    "readKospiRet5d_",
    "readPrevLegacySectorFlow_",
    "readSectorFlowForRadar_",
    "readSectorFlowHistoryPrev_",
    "readSectorUniverse_",
    "readSettings_",
    "runEventRisk",
    "runMacro",
    "runMonthlySnapshot",
    "seedEventCalendar_",
    "sheetToJson",
    "upsertMonthlyRow_",
    "upsertToSheetByKey",
    "writeSettingValue_",
    "writeToSheet",
}
|
|
|
|
def is_write_function(func_name: str) -> bool:
    """Return True when *func_name* matches at least one WRITE_PATTERNS regex.

    Every pattern is tried with ``re.search``, so unanchored patterns may match
    anywhere inside the name.
    """
    return any(re.search(pattern, func_name) for pattern in WRITE_PATTERNS)
|
|
|
|
def collect_gas_files() -> list[Path]:
    """Return the Apps Script source files to scan.

    The result is the fixed list of top-level ``.gs`` files that exist under
    ROOT (in the order listed below), followed by every ``*.gs`` file in
    ``src/gas_adapter_parts`` in sorted order when that directory exists.
    """
    top_level_names = (
        "gas_apex_alpha_watch.gs",
        "gas_apex_runtime_core.gs",
        "gas_data_collect.gs",
        "gas_data_feed.gs",
        "gas_harness_rows.gs",
        "gas_lib.gs",
        "gas_report.gs",
    )

    found: list[Path] = []
    for name in top_level_names:
        candidate = ROOT / name
        if candidate.exists():
            found.append(candidate)

    parts_dir = ROOT / "src" / "gas_adapter_parts"
    if parts_dir.exists():
        found.extend(sorted(parts_dir.glob("*.gs")))

    return found
|
|
|
|
def _load_mapped_sheets(errors: list[str]) -> set[str]:
    """Return the sheet names registered by the workbook mapping specs.

    Reads ``spec/14_raw_workbook_mapping.yaml`` when it exists and merges its
    required, support, deprecated and transient sheet names.  Any failure
    while reading or walking that file is appended to *errors*; names merged
    before the failure are kept.  ``account_snapshot`` is added when the
    snapshot contract file exists, and ``settings`` / ``cs_chunk_N`` are always
    added.
    """
    mapped_sheets: set[str] = set()
    mapping_path = ROOT / "spec" / "14_raw_workbook_mapping.yaml"
    snapshot_path = ROOT / "spec" / "15_account_snapshot_contract.yaml"

    if mapping_path.exists():
        try:
            mapping_data = yaml.safe_load(mapping_path.read_text(encoding="utf-8")) or {}
            raw_workbook = mapping_data.get("raw_workbook", {})

            # Required sheets (a mapping keyed by sheet name).
            mapped_sheets.update(raw_workbook.get("required_sheets", {}).keys())

            diet_policy = raw_workbook.get("sheet_diet_policy", {})
            keep = diet_policy.get("keep", {})

            # Support sheets.
            mapped_sheets.update(keep.get("support", []))
            # NOTE: printed before the deprecated/transient names are merged;
            # position kept so the debug output stays unchanged.
            print(f"DEBUG: Mapped sheets loaded: {sorted(mapped_sheets)}")

            # Deprecated sheets.
            mapped_sheets.update(keep.get("deprecated", []))

            # Transient sheets.
            mapped_sheets.update(
                diet_policy.get("delete", {}).get("transient_after_complete", [])
            )
        except Exception as e:  # recorded, not swallowed: the gate will FAIL
            errors.append(f"Failed to parse raw workbook mapping: {e}")

    if snapshot_path.exists():
        mapped_sheets.add("account_snapshot")

    mapped_sheets.add("settings")
    mapped_sheets.add("cs_chunk_N")
    return mapped_sheets


def _scan_sheet_accesses(gas_files: list[Path]) -> list[dict]:
    """Return one record per ``getSheetByName(...)`` call found in *gas_files*.

    Each record is a dict with ``file``, ``function``, ``sheet`` and ``line``
    keys.  Calls with a string literal are always recorded; calls with an
    identifier argument are recorded only when the identifier is one of the
    known sheet-name constants below.

    NOTE(review): an access is attributed to the most recently declared
    function in the file; the line holding the ``function name(...) {``
    declaration is itself not scanned, and the end of a function body is not
    tracked.  This mirrors the existing behaviour -- confirm it is intended.
    """
    func_pattern = re.compile(r"function\s+([A-Za-z0-9_$]+)\s*\(([^)]*)\)\s*\{")
    sheet_pattern = re.compile(r"getSheetByName\s*\(\s*['\"]([^'\"]+)['\"]\s*\)")
    sheet_var_pattern = re.compile(r"getSheetByName\s*\(\s*([A-Za-z0-9_$]+)\s*\)")

    # Inside _installCompat_, accesses to these sheets are attributed to the
    # named helper instead of to _installCompat_ itself.
    compat_helpers = {
        "settings": "readSettingsTab_",
        "performance": "readPerformanceSheet_",
    }
    # Identifiers passed to getSheetByName and the sheet each is taken to mean.
    sheet_constants = {
        "SETTINGS_SHEET_NAME": "settings",
        "DATA_FEED_SHEET_NAME": "data_feed",
        "AS_SHEET_NAME": "account_snapshot",
        "SHEET_NAME": "universe",
        "sheetName": "core_satellite",
    }

    accesses: list[dict] = []
    for path in gas_files:
        content = path.read_text(encoding="utf-8", errors="ignore")
        current_func = None

        for line_no, line in enumerate(content.splitlines(), 1):
            declared = func_pattern.search(line)
            if declared:
                current_func = declared.group(1)
                continue
            if not current_func:
                continue

            for sheet_name in sheet_pattern.findall(line):
                owner = current_func
                if current_func == "_installCompat_":
                    owner = compat_helpers.get(sheet_name, current_func)
                accesses.append({
                    "file": path.name,
                    "function": owner,
                    "sheet": sheet_name,
                    "line": line_no,
                })

            for var_name in sheet_var_pattern.findall(line):
                sheet_name = sheet_constants.get(var_name)
                if sheet_name:
                    accesses.append({
                        "file": path.name,
                        "function": current_func,
                        "sheet": sheet_name,
                        "line": line_no,
                    })

    return accesses


def _extract_contract_exports(contract_data, errors: list[str]) -> list[dict]:
    """Return the usable ``exports`` entries of the contract.

    Entries must be dicts carrying both ``function_name`` and ``sheet_key``.
    Anything else (a non-dict contract, a non-list ``exports`` value, or a
    malformed entry) is reported through *errors* instead of raising, so the
    validator still emits a FAIL packet for a broken contract.
    """
    if not isinstance(contract_data, dict):
        # Schema validation / the minimal fallback has already recorded this
        # case when it applies; make sure it is never silently accepted.
        if not errors:
            errors.append("Contract data must be a dictionary")
        return []

    raw_exports = contract_data.get("exports", [])
    if not isinstance(raw_exports, list):
        errors.append("Contract 'exports' must be a list")
        return []

    exports: list[dict] = []
    for index, item in enumerate(raw_exports):
        if isinstance(item, dict) and "function_name" in item and "sheet_key" in item:
            exports.append(item)
        else:
            errors.append(
                f"Contract export #{index} must be a mapping with function_name and sheet_key"
            )
    return exports


def main() -> int:
    """Validate the GAS adapter contract against the Apps Script sources.

    Steps:
      1. Load ``spec/gas_adapter_contract.yaml`` and its generated JSON schema.
      2. Validate the contract against the schema (with a minimal key check
         when ``jsonschema`` is not installed).
      3. Collect the sheet names registered in the workbook mapping specs.
      4. Scan the ``.gs`` files for ``getSheetByName`` accesses.
      5. Count accesses by functions missing from the contract or touching a
         sheet not declared for them, and collect sheets unknown to the
         workbook mapping ("drift").
      6. Write the result packet to ``Temp/gas_adapter_contract_validation_v1.json``
         and print it.

    Returns:
        0 when the gate passes (no unmapped reads/writes, no drift, no
        errors), otherwise 1.  A missing or unparsable contract/schema file
        returns 1 immediately without writing the packet.
    """
    errors: list[str] = []

    contract_path = ROOT / "spec" / "gas_adapter_contract.yaml"
    schema_path = ROOT / "schemas" / "generated" / "gas_adapter_contract.schema.json"

    if not contract_path.exists():
        print(f"ERROR: Contract file missing: {contract_path}")
        return 1

    if not schema_path.exists():
        print(f"ERROR: Schema file missing: {schema_path}")
        return 1

    # 1. Load contract and schema
    try:
        contract_data = yaml.safe_load(contract_path.read_text(encoding="utf-8"))
    except Exception as e:
        print(f"ERROR: Failed to parse contract YAML: {e}")
        return 1

    try:
        schema_data = json.loads(schema_path.read_text(encoding="utf-8"))
    except Exception as e:
        print(f"ERROR: Failed to parse schema JSON: {e}")
        return 1

    # 2. Validate contract against schema
    if jsonschema is not None:
        try:
            jsonschema.validate(instance=contract_data, schema=schema_data)
        except Exception as e:
            errors.append(f"Schema validation failed: {e}")
    elif not isinstance(contract_data, dict):
        # Minimal validation fallback
        errors.append("Contract data must be a dictionary")
    elif "schema_version" not in contract_data or "exports" not in contract_data:
        errors.append("Contract data missing required keys: schema_version, exports")

    # 3. Load raw workbook mappings to find registered sheets
    mapped_sheets = _load_mapped_sheets(errors)

    # 4. Scan Apps Script files for sheets accessed
    code_accesses = _scan_sheet_accesses(collect_gas_files())

    # 5. Extract exports from contract (group by function name)
    contract_exports = _extract_contract_exports(contract_data, errors)
    contract_map: dict = {}
    for item in contract_exports:
        contract_map.setdefault(item["function_name"], []).append(item)

    unmapped_reads = 0
    unmapped_writes = 0
    drifts = set()

    # 6. Verify each sheet access in code
    for access in code_accesses:
        func = access["function"]
        sheet = access["sheet"]

        if func in IGNORE_FUNCTIONS:
            continue

        declared = contract_map.get(func)
        if declared is None:
            # Function is not in the contract at all.
            unmapped = True
            print(f"DEBUG: Unmapped function '{func}' in '{access['file']}:{access['line']}' accessing sheet '{sheet}'")
        else:
            # Function is known; the sheet must match one of its sheet_keys.
            unmapped = all(exp["sheet_key"] != sheet for exp in declared)
            if unmapped:
                print(f"DEBUG: Mismatch in function '{func}' - declared keys {[e['sheet_key'] for e in declared]}, found '{sheet}'")

        if unmapped:
            if is_write_function(func):
                unmapped_writes += 1
            else:
                unmapped_reads += 1

        # Check if the accessed sheet is in workbook mappings
        if sheet not in mapped_sheets:
            print(f"DEBUG: Drift - sheet '{sheet}' accessed by '{func}' not in workbook mapping contract")
            drifts.add(sheet)

    # Check if any sheet key in contract is not in workbook mappings
    for item in contract_exports:
        skey = item["sheet_key"]
        if skey not in mapped_sheets:
            print(f"DEBUG: Drift - contract sheet_key '{skey}' not in workbook mapping contract")
            drifts.add(skey)

    sheet_contract_drift_count = len(drifts)
    print(f"DEBUG: Total drifts: {drifts}")

    # Determine gate result
    gate_passed = (
        unmapped_reads == 0
        and unmapped_writes == 0
        and sheet_contract_drift_count == 0
        and not errors
    )

    result = {
        "formula_id": "GAS_ADAPTER_CONTRACT_VALIDATOR_V1",
        "unmapped_gas_read_count": unmapped_reads,
        "unmapped_gas_write_count": unmapped_writes,
        "sheet_contract_drift_count": sheet_contract_drift_count,
        "errors": errors,
        "gate": "PASS" if gate_passed else "FAIL",
    }

    # Write output packet
    out_dir = ROOT / "Temp"
    out_dir.mkdir(parents=True, exist_ok=True)
    out_path = out_dir / "gas_adapter_contract_validation_v1.json"
    out_path.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")

    print(json.dumps(result, ensure_ascii=True, indent=2))
    return 0 if gate_passed else 1
|
|
|
|
# Script entry point: the process exit status is main()'s return value.
if __name__ == "__main__":
    raise SystemExit(main())
|