"""Gate check for the GAS thin-adapter policy (formula ``GAS_THIN_ADAPTER_V1``).

The script builds a report containing the forbidden-business-logic count, the
function-inventory coverage percentage and whether the policy file declares a
migration plan. The report is written to
``ROOT/Temp/gas_thin_adapter_validation_v1.json`` and printed to stdout; the
process exit code is 0 when the gate is ``PASS`` and 1 otherwise.
"""
from __future__ import annotations

import json
from pathlib import Path

import yaml

from refactor_master_helpers import ROOT, collect_gas_files, read_text

try:
    from audit_gas_business_logic_v1 import _classify, _function_bodies
except Exception:  # pragma: no cover - fallback for isolated execution
    _classify = None
    _function_bodies = None

ALLOWLIST = ("collect", "normalize", "export", "display")
FORBIDDEN = ("decision", "sizing", "stop_loss", "take_profit", "risk_score")

# Non-violation classifications — lines in these categories are not forbidden.
_ADAPTER_OK_CLASSES = {"ADAPTER_OK", "COMMENT_FALSE_POSITIVE"}

# P5-T02: gas_data_feed.gs split mapping (original_line → new_file, new_line).
# Each entry is (first_original_line, last_original_line, new_file, offset);
# the line number inside new_file is original_line - offset.
_GDF_SPLITS = [
    (1, 2347, "gdf_01_price_metrics.gs", 0),
    (2348, 4560, "gdf_02_harness_assembly.gs", 2347),
    (4561, 6806, "gdf_03_portfolio_gates.gs", 4560),
    (6807, 9015, "gdf_04_execution_quality.gs", 6806),
    (9016, 10302, "gdf_05_alpha_engines.gs", 9015),
]
# Same layout as _GDF_SPLITS, for gas_data_collect.gs.
_GDC_SPLITS = [
    (1, 2405, "gdc_01_fetch_fundamentals.gs", 0),
    (2406, 4460, "gdc_02_account_satellite.gs", 2405),
]


def _remap_line(fname: str, lineno: int) -> list[tuple[str, int]]:
    """Return all (file, lineno) pairs that represent the given original location.

    After the P5-T02 file split, a line originally in gas_data_feed.gs or
    gas_data_collect.gs may now live in a split adapter part file with an
    offset line number. The result always starts with the original location;
    when the line falls inside a known split range, the remapped location is
    appended as a second entry.
    """
    results = [(fname, lineno)]
    if fname == "gas_data_feed.gs":
        splits = _GDF_SPLITS
    elif fname == "gas_data_collect.gs":
        splits = _GDC_SPLITS
    else:
        return results
    for lo, hi, new_file, offset in splits:
        if lo <= lineno <= hi:
            results.append((new_file, lineno - offset))
            break  # ranges do not overlap, so the first hit is the only hit
    return results


def _load_classified_allowlist() -> set[tuple[str, int]]:
    """Load (file, lineno) pairs classified as ADAPTER_OK or COMMENT_FALSE_POSITIVE.

    Reads runtime/gas_migration_wave1.yaml and runtime/gas_migration_wave2_4.yaml
    under ROOT, taking items from the ``wave1_items`` and ``wave_items`` keys.
    After the P5-T02 file split, original line numbers are also remapped to the
    adapter part files via ``_remap_line``.

    Loading is best-effort: a missing, unreadable or malformed wave file, a
    non-mapping document, a null/non-list item collection and non-mapping items
    are all skipped instead of raising.
    """
    allowlist: set[tuple[str, int]] = set()
    wave_files = (
        ROOT / "runtime" / "gas_migration_wave1.yaml",
        ROOT / "runtime" / "gas_migration_wave2_4.yaml",
    )
    for wf in wave_files:
        if not wf.exists():
            continue
        try:
            data = yaml.safe_load(wf.read_text(encoding="utf-8"))
        except (OSError, ValueError, yaml.YAMLError):
            # ValueError covers UnicodeDecodeError from read_text.
            continue
        if not isinstance(data, dict):
            continue

        items: list[object] = []
        for key in ("wave1_items", "wave_items"):
            value = data.get(key)
            # A key that is present but empty in YAML loads as None.
            if isinstance(value, list):
                items.extend(value)

        for item in items:
            if not isinstance(item, dict):
                continue
            cls = item.get("classification", "")
            if not isinstance(cls, str) or cls not in _ADAPTER_OK_CLASSES:
                continue
            fname = item.get("file", "")
            lineno = item.get("line")
            if not fname or not isinstance(fname, str):
                continue
            # bool is a subclass of int; `line: true` must not allow-list line 1.
            if isinstance(lineno, bool) or not isinstance(lineno, int):
                continue
            allowlist.update(_remap_line(fname, lineno))
    return allowlist


def _migration_plan_exists(policy_path: Path) -> bool:
    """Return True when the policy YAML has a ``migration_plan`` mapping with non-empty ``phases``.

    A missing policy file or a non-mapping document yields False. Read and
    parse errors are not caught here and propagate to the caller.
    """
    if not policy_path.exists():
        return False
    policy = yaml.safe_load(policy_path.read_text(encoding="utf-8"))
    if not isinstance(policy, dict):
        return False
    migration_plan = policy.get("migration_plan")
    return isinstance(migration_plan, dict) and bool(migration_plan.get("phases"))


def _load_audit(audit_path: Path) -> dict:
    """Return the audit JSON as a dict, or ``{}`` when it is absent, unreadable or not a mapping."""
    if not audit_path.exists():
        return {}
    try:
        loaded = json.loads(audit_path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        return {}
    return loaded if isinstance(loaded, dict) else {}


def _scan_forbidden_lines() -> list[dict[str, str]]:
    """Return one finding per GAS source line containing a FORBIDDEN word and not allow-listed."""
    classified_allowlist = _load_classified_allowlist()
    findings: list[dict[str, str]] = []
    for path in collect_gas_files():
        text = read_text(path)
        rel_name = path.name  # scanner uses filename only (not relative path)
        for lineno, line in enumerate(text.splitlines(), start=1):
            low = line.lower()
            if not any(word in low for word in FORBIDDEN):
                continue
            # Skip lines confirmed as ADAPTER_OK or COMMENT_FALSE_POSITIVE in wave YAMLs.
            if (rel_name, lineno) in classified_allowlist:
                continue
            findings.append(
                {
                    "file": str(path.relative_to(ROOT)),
                    "line": str(lineno),
                    "text": line.strip(),
                }
            )
    return findings


def main() -> int:
    """Run the thin-adapter validation and return the process exit code.

    The metrics come from the first source that is available:

    1. ``Temp/gas_business_logic_audit_v1.json`` when it exists and holds a
       non-empty mapping;
    2. otherwise a live inventory built with ``_function_bodies`` and
       ``_classify`` when those helpers could be imported;
    3. otherwise a line-level scan for FORBIDDEN words.

    Returns:
        0 when the gate is ``PASS`` (coverage >= 100 and a migration plan
        exists), 1 otherwise.
    """
    policy_path = ROOT / "spec" / "39_gas_thin_adapter_policy.yaml"
    migration_plan_exists = _migration_plan_exists(policy_path)

    audit = _load_audit(ROOT / "Temp" / "gas_business_logic_audit_v1.json")

    if not audit and _function_bodies and _classify:
        rows = []
        for path in collect_gas_files():
            for name, line, body in _function_bodies(path):
                allowed_responsibility, matched_tokens = _classify(name, body, path.name)
                rows.append(
                    {
                        "file": str(path.relative_to(ROOT)),
                        "name": name,
                        "line": line,
                        "allowed_responsibility": allowed_responsibility,
                        "matched_tokens": matched_tokens,
                    }
                )
        forbidden_function_count = sum(
            1 for row in rows if row["allowed_responsibility"] == "forbidden"
        )
        function_inventory_coverage_pct = 100.0 if rows else 0.0
        sample_findings = rows[:200]
    elif audit:
        forbidden_function_count = int(audit.get("forbidden_function_count") or 0)
        function_inventory_coverage_pct = float(
            audit.get("function_inventory_coverage_pct") or 0.0
        )
        audit_rows = audit.get("rows")
        sample_findings = audit_rows[:200] if isinstance(audit_rows, list) else []
    else:
        findings = _scan_forbidden_lines()
        forbidden_function_count = len(findings)
        # NOTE(review): coverage is reported as 100 only when findings exist, so
        # a clean fallback scan fails the gate — confirm this is intended.
        function_inventory_coverage_pct = 0.0 if not findings else 100.0
        sample_findings = findings[:200]

    # NOTE(review): the gate does not take the forbidden count into account;
    # it depends only on coverage and the presence of a migration plan.
    gate_passed = function_inventory_coverage_pct >= 100.0 and migration_plan_exists
    result = {
        "formula_id": "GAS_THIN_ADAPTER_V1",
        "forbidden_gas_business_logic_count": forbidden_function_count,
        "function_inventory_coverage_pct": function_inventory_coverage_pct,
        "migration_plan_exists": migration_plan_exists,
        "findings": sample_findings,
        "gate": "PASS" if gate_passed else "FAIL",
    }

    out = ROOT / "Temp" / "gas_thin_adapter_validation_v1.json"
    # Temp/ may not exist yet (e.g. clean checkout with no prior audit run).
    out.parent.mkdir(parents=True, exist_ok=True)
    out.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")
    print(json.dumps(result, ensure_ascii=True, indent=2))
    return 0 if result["gate"] == "PASS" else 1


if __name__ == "__main__":
    raise SystemExit(main())