""" GAS_THIN_ADAPTER_POLICY_V1 — Phase 1: Inventory spec/39_gas_thin_adapter_policy.yaml의 inventory 단계. src/gas_adapter_parts/*.gs 내 모든 함수를 스캔하여 allowed/forbidden/mixed/unknown으로 분류. 결과를 Temp/gas_business_logic_audit_v1.json에 저장. """ import re import json import sys import os from pathlib import Path ROOT = Path(__file__).parent.parent # ── 분류 키워드 ───────────────────────────────────────────────────────────── # forbidden_responsibilities: decision / sizing / stop_loss / take_profit / risk_score FORBIDDEN_PATTERNS = [ # sizing r"\bcalcQuantities_\b", r"\bposition_size\b", r"\blimit_price_est\b", r"\blimit_price_calc\b", r"\bqty_ladder\b", r"\btp_qty\b", r"POSITION_SIZE_V1", r"calcPrices_\(", # stop_loss r"\bstop_loss\b", r"\bstop_price\b", r"STOP_PRICE_CORE", r"calcStopPrice", r"STOP_BREACH_ALERT", # take_profit r"\btake_profit\b", r"\btp1_price\b", r"\btp2_price\b", r"TAKE_PROFIT_LADDER", r"TP_TRIGGER_ALERT", r"calcTpQuantityLadder_", r"calcProfitPreservationRow_", r"calcCashPreservationSellEngineV2_", # risk_score r"\brisk_score\b", r"\briskScore\b", r"RISK_SCORE", # decision r"validation_status\s*[=:]\s*['\"]PASS['\"]", r"validation_status\s*[=:]\s*['\"]BLOCK", r"\bENTRY_ALLOWED\b", r"\bBUY_ALLOWED\b", r"\border_type\s*[=:]\s*['\"]BUY['\"]", r"\border_type\s*[=:]\s*['\"]SELL['\"]", r"POSITION_SIZE_V1", r"STOP_PRICE_CORE_V1", r"calcExitSellAction_", r"calcSmartCashRaiseV2_", r"calcCashPreservationPlan_", r"calcCashShortfallHarness_", r"calcCoreSatelliteExecutionState_", r"calcMandatoryReductionPlan_", r"calcDistributionRiskRow_", r"calcSellConflictScore_", r"calcBreakoutQualityGate_", r"calcEntryTimingSignal_", r"calcAntiLateEntryGateV2_", r"applyAlegGate4And5_", r"calcSmartCashRecoverySell_", # sizing helpers with calculation output r"\bbase_qty\b", r"\bsell_qty\s*=", r"\bbuy_qty\s*=", r"\bsuggested_sell_qty\b", ] # allowed_responsibilities: collect / normalize / export / display ALLOWED_PATTERNS = [ # collect r"UrlFetchApp\.fetch\b", r"\bwithFetchCache_\b", r"\bgetCachedFetchResult_\b", r"\bsetCachedFetchResult_\b", r"fetchNaver", r"fetchYahoo", r"\bisFetchCircuitOpen_\b", r"\brecordFetchSuccess_\b", r"\bconsumeFetchBudget_\b", # normalize r"\bnormalizeTickerCode\b", r"\bnormalizeYahooSymbol\b", r"\bparseKrNum_\b", r"\btoNumber_\b", r"\bformatIso_\b", r"\bparseIsoDateYmd_\b", # export r"\bgetSpreadsheet_\b", r"\bss\.getSheetByName\b", r"\.getDataRange\(\)\.getValues\(\)", r"\.setValues\b", r"\.appendRow\b", r"\.setValue\b", r"\bSpreadsheetApp\b", r"\bsheetToJson\b", r"\bupsertToSheetByKey\b", # display r"\bLogger\.log\b", r"\bUtilities\.formatDate\b", r"\bPropertiesService\b", r"\bCacheService\b", r"setProperty\b", r"getProperty\b", ] # 예외 (spec exceptions): 이름 기반 — 보고서 렌더링/데이터 수집 헬퍼 EXCEPTION_PATTERNS = [ r"^render", r"^format", r"^parse", r"^normalize", r"^fetch", r"^get[A-Z]", r"^set[A-Z]", r"^read", r"^summarize", r"^annotate", r"^cache", r"^beginFetch", r"^clearFetch", r"^record", r"_fallback_$", r"Compat", ] def extract_functions(source: str) -> list[dict]: """GAS 소스에서 function 정의를 추출한다. 중첩 함수 미지원(GAS 최상위 함수만).""" results = [] # function 시그니처: function name_(args) { sig_re = re.compile( r'^function\s+([A-Za-z_$][A-Za-z0-9_$]*)\s*\([^)]*\)\s*\{', re.MULTILINE ) lines = source.split('\n') line_starts = [] offset = 0 for ln in lines: line_starts.append(offset) offset += len(ln) + 1 for m in sig_re.finditer(source): func_name = m.group(1) brace_start = m.end() - 1 # position of opening { # 괄호 카운팅으로 함수 끝 찾기 depth = 0 pos = brace_start body_end = len(source) for i in range(brace_start, len(source)): if source[i] == '{': depth += 1 elif source[i] == '}': depth -= 1 if depth == 0: body_end = i + 1 break body = source[brace_start:body_end] # 함수 시작 행 번호 (1-based) start_line = sum(1 for ls in line_starts if ls <= m.start()) results.append({ 'name': func_name, 'start_line': start_line, 'body': body, }) return results def classify_function(name: str, body: str) -> str: """allowed / forbidden / mixed / unknown 중 하나를 반환.""" is_exception = any(re.search(p, name) for p in EXCEPTION_PATTERNS) forbidden_hits = [p for p in FORBIDDEN_PATTERNS if re.search(p, body)] allowed_hits = [p for p in ALLOWED_PATTERNS if re.search(p, body)] # 예외 패턴에 해당하는 함수는 forbidden hit이 있어도 mixed로 처리 (display/collect 겸용 가능) if is_exception and forbidden_hits: return 'mixed' if forbidden_hits and allowed_hits: return 'mixed' if forbidden_hits: return 'forbidden' if allowed_hits: return 'allowed' return 'unknown' def classify_function_detail(name: str, body: str) -> dict: is_exception = any(re.search(p, name) for p in EXCEPTION_PATTERNS) forbidden_hits = [p for p in FORBIDDEN_PATTERNS if re.search(p, body)] allowed_hits = [p for p in ALLOWED_PATTERNS if re.search(p, body)] classification = classify_function(name, body) # 어떤 responsibility 범주인지 추정 responsibility = [] if re.search(r"UrlFetchApp|fetchNaver|fetchYahoo|withFetchCache_|fetch_", body): responsibility.append('collect') if re.search(r"normalize|parseKrNum_|toNumber_|formatIso_", body): responsibility.append('normalize') if re.search(r"getSpreadsheet_|setValues|appendRow|setValue|sheetToJson|upsertToSheet", body): responsibility.append('export') if re.search(r"Logger\.log|Utilities\.formatDate|PropertiesService|CacheService", body): responsibility.append('display') if forbidden_hits: if re.search(r"stop_loss|stop_price|STOP_PRICE|calcStopPrice|STOP_BREACH", str(forbidden_hits)): responsibility.append('stop_loss') if re.search(r"take_profit|tp1_price|tp2_price|TAKE_PROFIT|calcTpQ", str(forbidden_hits)): responsibility.append('take_profit') if re.search(r"risk_score|riskScore|RISK_SCORE", str(forbidden_hits)): responsibility.append('risk_score') if re.search(r"qty|size|Quantities|sizing", str(forbidden_hits)): responsibility.append('sizing') if re.search(r"decision|order_type|validation_status|ENTRY_ALLOWED|BUY_ALLOWED|Exit|Sell.*Action|calc.*Gate|calc.*Guard", str(forbidden_hits)): responsibility.append('decision') return { 'classification': classification, 'responsibility': list(set(responsibility)) if responsibility else ['unknown'], 'is_exception': is_exception, 'forbidden_hit_count': len(forbidden_hits), 'allowed_hit_count': len(allowed_hits), } def audit_file(gs_path: Path) -> dict: source = gs_path.read_text(encoding='utf-8', errors='replace') functions = extract_functions(source) per_function = [] counts = {'allowed': 0, 'forbidden': 0, 'mixed': 0, 'unknown': 0} for fn in functions: detail = classify_function_detail(fn['name'], fn['body']) cls = detail['classification'] counts[cls] += 1 per_function.append({ 'name': fn['name'], 'start_line': fn['start_line'], **detail, }) return { 'file': gs_path.name, 'total_functions': len(functions), 'counts': counts, 'functions': per_function, } def main(): gas_dir = ROOT / 'src' / 'gas_adapter_parts' gs_files = sorted(gas_dir.glob('*.gs')) if not gs_files: print("ERROR: src/gas_adapter_parts/*.gs 파일 없음", file=sys.stderr) sys.exit(1) all_files = [] global_counts = {'allowed': 0, 'forbidden': 0, 'mixed': 0, 'unknown': 0} forbidden_funcs = [] for gf in gs_files: result = audit_file(gf) all_files.append(result) for k, v in result['counts'].items(): global_counts[k] += v for fn in result['functions']: if fn['classification'] in ('forbidden', 'mixed'): forbidden_funcs.append({ 'file': result['file'], 'name': fn['name'], 'classification': fn['classification'], 'responsibility': fn['responsibility'], }) total = sum(global_counts.values()) forbidden_total = global_counts['forbidden'] + global_counts['mixed'] allowed_total = global_counts['allowed'] migration_candidates = [f for f in forbidden_funcs if f['classification'] == 'forbidden'] output = { 'policy_id': 'GAS_THIN_ADAPTER_POLICY_V1', 'phase': 'inventory', 'generated_at': __import__('datetime').datetime.now().isoformat(), 'summary': { 'total_functions': total, 'allowed_count': global_counts['allowed'], 'forbidden_count': global_counts['forbidden'], 'mixed_count': global_counts['mixed'], 'unknown_count': global_counts['unknown'], 'forbidden_total': forbidden_total, 'migration_candidate_count': len(migration_candidates), 'compliance_pct': round(allowed_total / total * 100, 1) if total > 0 else 0, }, 'migration_candidates': migration_candidates, 'per_file': all_files, } out_path = ROOT / 'Temp' / 'gas_business_logic_audit_v1.json' out_path.parent.mkdir(exist_ok=True) out_path.write_text(json.dumps(output, ensure_ascii=False, indent=2), encoding='utf-8') print(f"=== GAS_THIN_ADAPTER_POLICY_V1 Phase-1 Inventory ===") print(f"파일 수 : {len(gs_files)}") print(f"총 함수 수 : {total}") print(f" allowed : {global_counts['allowed']}") print(f" forbidden : {global_counts['forbidden']}") print(f" mixed : {global_counts['mixed']}") print(f" unknown : {global_counts['unknown']}") print(f"compliance_pct : {output['summary']['compliance_pct']}%") print(f"이전 후보 : {len(migration_candidates)}개") print() print("── 이전 후보 (forbidden) ──") for f in migration_candidates[:30]: print(f" [{f['file']}] {f['name']} {f['responsibility']}") if len(migration_candidates) > 30: print(f" ... 외 {len(migration_candidates)-30}개") print() print(f"출력: {out_path}") # CI가 아니면 forbidden 존재 시 exit 0으로 성공 (audit-only 모드) return 0 if __name__ == '__main__': sys.exit(main())