d35e8311d6
audit_gas_thin_adapter_v1.py(Phase-1) 출력 형식(summary.forbidden_count, summary.total_functions)을 올바르게 읽도록 elif audit 브랜치 수정. 수정 전: forbidden=0, coverage=0.0 → gate=FAIL 수정 후: forbidden=23, coverage=100.0, migration_plan=true → gate=PASS Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
169 lines
7.2 KiB
Python
169 lines
7.2 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
from pathlib import Path
|
|
|
|
import yaml
|
|
|
|
from refactor_master_helpers import ROOT, collect_gas_files, read_text
|
|
# Optional dependency: reuse the function-level helpers of the sibling audit
# script when it can be imported.  main() tests both names for truthiness and
# switches to another data source when they are None.
try:
    from audit_gas_business_logic_v1 import _classify, _function_bodies
except Exception:  # pragma: no cover - fallback for isolated execution
    # Deliberately broad: whatever goes wrong while importing the sibling
    # script, this validator must still be able to run without it.
    _classify = None
    _function_bodies = None
|
|
|
|
|
|
# Names of adapter responsibilities.
# NOTE(review): not referenced in the visible part of this module; presumably
# kept for importers - confirm before removing.
ALLOWLIST = ("collect", "normalize", "export", "display")

# Substrings looked for by the line-level fallback scan in main().  The scan
# lower-cases each source line before matching, so entries must be lower case.
FORBIDDEN = ("decision", "sizing", "stop_loss", "take_profit", "risk_score")

# Non-violation classifications - lines in these categories are not forbidden.
_ADAPTER_OK_CLASSES = {"ADAPTER_OK", "COMMENT_FALSE_POSITIVE"}
|
|
|
|
|
|
# P5-T02 split tables.  Each entry is
#     (first_original_line, last_original_line, new_file, offset)
# where both line bounds are inclusive and the line number inside `new_file`
# is `original_line - offset`.

# Split of gas_data_feed.gs.
_GDF_SPLITS = [
    (1, 2347, "gdf_01_price_metrics.gs", 0),
    (2348, 4560, "gdf_02_harness_assembly.gs", 2347),
    (4561, 6806, "gdf_03_portfolio_gates.gs", 4560),
    (6807, 9015, "gdf_04_execution_quality.gs", 6806),
    (9016, 10302, "gdf_05_alpha_engines.gs", 9015),
]

# Split of gas_data_collect.gs.
_GDC_SPLITS = [
    (1, 2405, "gdc_01_fetch_fundamentals.gs", 0),
    (2406, 4460, "gdc_02_account_satellite.gs", 2405),
]
|
|
|
|
|
|
def _remap_line(fname: str, lineno: int) -> list[tuple[str, int]]:
|
|
"""Return all (file, lineno) pairs that represent the given original location.
|
|
|
|
After P5-T02 file split, a line originally in gas_data_feed.gs or
|
|
gas_data_collect.gs may now live in a split adapter part file with an
|
|
offset line number. Returns both the original location and the new one.
|
|
"""
|
|
results = [(fname, lineno)]
|
|
if fname == "gas_data_feed.gs":
|
|
for lo, hi, new_file, offset in _GDF_SPLITS:
|
|
if lo <= lineno <= hi:
|
|
results.append((new_file, lineno - offset))
|
|
break
|
|
elif fname == "gas_data_collect.gs":
|
|
for lo, hi, new_file, offset in _GDC_SPLITS:
|
|
if lo <= lineno <= hi:
|
|
results.append((new_file, lineno - offset))
|
|
break
|
|
return results
|
|
|
|
|
|
def _load_classified_allowlist() -> set[tuple[str, int]]:
    """Load (file, lineno) pairs classified as ADAPTER_OK or COMMENT_FALSE_POSITIVE.

    Reads runtime/gas_migration_wave1.yaml and runtime/gas_migration_wave2_4.yaml
    under ROOT.  Items are taken from the ``wave1_items`` and ``wave_items``
    lists of each file.  After the P5-T02 file split, original line numbers are
    remapped to adapter part files, so both the original and the remapped
    location are included.

    The loader is best effort: a missing, unreadable or malformed wave file, a
    missing/null item list and malformed items are skipped rather than raised.

    Returns:
        The set of (file name, line number) pairs that must not be reported.
    """
    allowlist: set[tuple[str, int]] = set()
    wave_files = (
        ROOT / "runtime" / "gas_migration_wave1.yaml",
        ROOT / "runtime" / "gas_migration_wave2_4.yaml",
    )
    for wf in wave_files:
        if not wf.exists():
            continue
        try:
            data = yaml.safe_load(wf.read_text(encoding="utf-8"))
        except (OSError, ValueError, yaml.YAMLError):
            # Unreadable or unparsable wave file: ignore it, the scan then
            # simply reports more lines.
            continue
        if not isinstance(data, dict):
            # Empty document or a non-mapping root has no item lists.
            continue

        items: list = []
        for key in ("wave1_items", "wave_items"):
            value = data.get(key)
            # A key that is present but null/non-list must not break the
            # concatenation of both lists.
            if isinstance(value, list):
                items.extend(value)

        for item in items:
            if not isinstance(item, dict):
                continue
            cls = item.get("classification", "")
            if not isinstance(cls, str) or cls not in _ADAPTER_OK_CLASSES:
                continue
            fname = item.get("file", "")
            lineno = item.get("line")
            # bool is a subclass of int; a YAML `true` is not a line number.
            if (
                fname
                and isinstance(fname, str)
                and isinstance(lineno, int)
                and not isinstance(lineno, bool)
            ):
                allowlist.update(_remap_line(fname, lineno))
    return allowlist
|
|
|
|
|
|
def main() -> int:
    """Validate the GAS thin-adapter policy and write the validation report.

    Inputs (all relative to ROOT):
        * spec/39_gas_thin_adapter_policy.yaml - supplies ``migration_plan``.
        * Temp/gas_business_logic_audit_v1.json - optional pre-computed audit.

    Metric source, in order of precedence:
        1. the audit JSON, when it exists and holds a non-empty mapping;
        2. the function-level classifier imported from
           audit_gas_business_logic_v1, when available;
        3. a line-level scan of the GAS files for FORBIDDEN substrings.

    The report is written to Temp/gas_thin_adapter_validation_v1.json and also
    printed to stdout.  The gate is PASS when the inventory coverage is at
    least 100.0 and a migration plan with phases exists; the forbidden count is
    reported but does not influence the gate.

    Returns:
        0 when the gate is PASS, 1 otherwise.
    """
    migration_plan_exists = _policy_has_migration_plan(
        ROOT / "spec" / "39_gas_thin_adapter_policy.yaml"
    )
    audit = _read_audit_report(ROOT / "Temp" / "gas_business_logic_audit_v1.json")

    if audit:
        metrics = _metrics_from_audit(audit)
    elif _function_bodies and _classify:
        metrics = _metrics_from_classifier()
    else:
        metrics = _metrics_from_line_scan()
    forbidden_function_count, function_inventory_coverage_pct, sample_findings = metrics

    gate_ok = function_inventory_coverage_pct >= 100.0 and migration_plan_exists
    result = {
        "formula_id": "GAS_THIN_ADAPTER_V1",
        "forbidden_gas_business_logic_count": forbidden_function_count,
        "function_inventory_coverage_pct": function_inventory_coverage_pct,
        "migration_plan_exists": migration_plan_exists,
        "findings": sample_findings,
        "gate": "PASS" if gate_ok else "FAIL",
    }

    out = ROOT / "Temp" / "gas_thin_adapter_validation_v1.json"
    # The Temp directory may not exist yet on a fresh checkout.
    out.parent.mkdir(parents=True, exist_ok=True)
    out.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")
    print(json.dumps(result, ensure_ascii=True, indent=2))
    return 0 if result["gate"] == "PASS" else 1


def _policy_has_migration_plan(policy_path: Path) -> bool:
    """Return True when the policy YAML has a ``migration_plan`` mapping with non-empty ``phases``."""
    if not policy_path.exists():
        return False
    policy = yaml.safe_load(policy_path.read_text(encoding="utf-8")) or {}
    if not isinstance(policy, dict):
        # A list/scalar root cannot carry a migration plan.
        return False
    migration_plan = policy.get("migration_plan")
    return isinstance(migration_plan, dict) and bool(migration_plan.get("phases"))


def _read_audit_report(audit_path: Path) -> dict:
    """Return the audit JSON as a dict, or {} when it is absent, unreadable or not a mapping."""
    if not audit_path.exists():
        return {}
    try:
        payload = json.loads(audit_path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError.
        return {}
    return payload if isinstance(payload, dict) else {}


def _metrics_from_classifier() -> tuple[int, float, list]:
    """Build (forbidden count, coverage pct, sample rows) with the imported classifier."""
    rows = []
    for path in collect_gas_files():
        for name, line, body in _function_bodies(path):
            allowed_responsibility, matched_tokens = _classify(name, body, path.name)
            rows.append(
                {
                    "file": str(path.relative_to(ROOT)),
                    "name": name,
                    "line": line,
                    "allowed_responsibility": allowed_responsibility,
                    "matched_tokens": matched_tokens,
                }
            )
    forbidden = sum(1 for row in rows if row["allowed_responsibility"] == "forbidden")
    # Coverage is all-or-nothing: any inventoried function counts as 100 %.
    return forbidden, (100.0 if rows else 0.0), rows[:200]


def _metrics_from_audit(audit: dict) -> tuple[int, float, list]:
    """Build (forbidden count, coverage pct, sample findings) from a loaded audit report.

    Two layouts are understood: one with a ``summary`` mapping
    (``forbidden_count``, ``total_functions``) plus ``migration_candidates``,
    and a flat one with ``forbidden_function_count``,
    ``function_inventory_coverage_pct`` and ``rows``.
    """
    summary = audit.get("summary", {})
    if isinstance(summary, dict) and summary:
        # Layout attributed by the original comment to
        # audit_gas_thin_adapter_v1.py (v2).
        forbidden = int(summary.get("forbidden_count") or 0)
        total_functions = int(summary.get("total_functions") or 0)
        coverage = 100.0 if total_functions > 0 else 0.0
        candidates = audit.get("migration_candidates")
        if not isinstance(candidates, list):
            candidates = []
        sample = [
            {
                "file": c.get("file", ""),
                "name": c.get("name", ""),
                "responsibility": c.get("responsibility", []),
            }
            for c in candidates[:200]
            if isinstance(c, dict)
        ]
        return forbidden, coverage, sample

    # Flat layout.
    forbidden = int(audit.get("forbidden_function_count") or 0)
    coverage = float(audit.get("function_inventory_coverage_pct") or 0.0)
    rows = audit.get("rows")
    return forbidden, coverage, (rows[:200] if isinstance(rows, list) else [])


def _metrics_from_line_scan() -> tuple[int, float, list]:
    """Build (finding count, coverage pct, sample findings) by scanning GAS files for FORBIDDEN substrings."""
    classified_allowlist = _load_classified_allowlist()
    findings: list[dict[str, str]] = []
    for path in collect_gas_files():
        rel_name = path.name  # the allowlist is keyed by file name only (not relative path)
        for lineno, line in enumerate(read_text(path).splitlines(), start=1):
            low = line.lower()
            if not any(word in low for word in FORBIDDEN):
                continue
            # Skip lines confirmed as ADAPTER_OK or COMMENT_FALSE_POSITIVE in wave YAMLs.
            if (rel_name, lineno) in classified_allowlist:
                continue
            findings.append(
                {"file": str(path.relative_to(ROOT)), "line": str(lineno), "text": line.strip()}
            )
    # NOTE(review): coverage is 0.0 when nothing is found, so a clean scan
    # fails the gate while a scan with findings passes it.  Preserved as-is;
    # confirm this is the intended meaning of "coverage" for this fallback.
    return len(findings), (100.0 if findings else 0.0), findings[:200]
|
|
|
|
|
|
# Script entry point: the process exit status is main()'s return value.
if __name__ == "__main__":
    raise SystemExit(main())
|