diff --git a/spec/39_gas_thin_adapter_policy.yaml b/spec/39_gas_thin_adapter_policy.yaml index ca48c1d..275e457 100644 --- a/spec/39_gas_thin_adapter_policy.yaml +++ b/spec/39_gas_thin_adapter_policy.yaml @@ -15,9 +15,20 @@ forbidden_responsibilities: - take_profit - risk_score migration_plan: - status: PLANNED + status: IN_PROGRESS + inventory_result: + tool: tools/audit_gas_thin_adapter_v1.py + output: Temp/gas_business_logic_audit_v1.json + total_functions: 273 + forbidden_count: 23 + mixed_count: 15 + allowed_count: 104 + unknown_count: 131 + compliance_pct: 38.1 + migration_candidate_count: 23 phases: - phase: inventory + status: DONE target: Temp/gas_business_logic_audit_v1.json action: 분류된 GAS 함수 목록을 확정한다. - phase: extract diff --git a/tools/audit_gas_thin_adapter_v1.py b/tools/audit_gas_thin_adapter_v1.py new file mode 100644 index 0000000..9d9372a --- /dev/null +++ b/tools/audit_gas_thin_adapter_v1.py @@ -0,0 +1,285 @@ +""" +GAS_THIN_ADAPTER_POLICY_V1 — Phase 1: Inventory +spec/39_gas_thin_adapter_policy.yaml의 inventory 단계. +src/gas_adapter_parts/*.gs 내 모든 함수를 스캔하여 allowed/forbidden/mixed/unknown으로 분류. +결과를 Temp/gas_business_logic_audit_v1.json에 저장. +""" +import re +import json +import sys +import os +from pathlib import Path + +ROOT = Path(__file__).parent.parent + +# ── 분류 키워드 ───────────────────────────────────────────────────────────── + +# forbidden_responsibilities: decision / sizing / stop_loss / take_profit / risk_score +FORBIDDEN_PATTERNS = [ + # sizing + r"\bcalcQuantities_\b", r"\bposition_size\b", r"\blimit_price_est\b", + r"\blimit_price_calc\b", r"\bqty_ladder\b", r"\btp_qty\b", + r"POSITION_SIZE_V1", r"calcPrices_\(", + # stop_loss + r"\bstop_loss\b", r"\bstop_price\b", r"STOP_PRICE_CORE", + r"calcStopPrice", r"STOP_BREACH_ALERT", + # take_profit + r"\btake_profit\b", r"\btp1_price\b", r"\btp2_price\b", + r"TAKE_PROFIT_LADDER", r"TP_TRIGGER_ALERT", + r"calcTpQuantityLadder_", r"calcProfitPreservationRow_", + r"calcCashPreservationSellEngineV2_", + # risk_score + r"\brisk_score\b", r"\briskScore\b", r"RISK_SCORE", + # decision + r"validation_status\s*[=:]\s*['\"]PASS['\"]", + r"validation_status\s*[=:]\s*['\"]BLOCK", + r"\bENTRY_ALLOWED\b", r"\bBUY_ALLOWED\b", + r"\border_type\s*[=:]\s*['\"]BUY['\"]", + r"\border_type\s*[=:]\s*['\"]SELL['\"]", + r"POSITION_SIZE_V1", r"STOP_PRICE_CORE_V1", + r"calcExitSellAction_", r"calcSmartCashRaiseV2_", + r"calcCashPreservationPlan_", r"calcCashShortfallHarness_", + r"calcCoreSatelliteExecutionState_", + r"calcMandatoryReductionPlan_", + r"calcDistributionRiskRow_", r"calcSellConflictScore_", + r"calcBreakoutQualityGate_", r"calcEntryTimingSignal_", + r"calcAntiLateEntryGateV2_", r"applyAlegGate4And5_", + r"calcSmartCashRecoverySell_", + # sizing helpers with calculation output + r"\bbase_qty\b", r"\bsell_qty\s*=", r"\bbuy_qty\s*=", + r"\bsuggested_sell_qty\b", +] + +# allowed_responsibilities: collect / normalize / export / display +ALLOWED_PATTERNS = [ + # collect + r"UrlFetchApp\.fetch\b", r"\bwithFetchCache_\b", + r"\bgetCachedFetchResult_\b", r"\bsetCachedFetchResult_\b", + r"fetchNaver", r"fetchYahoo", + r"\bisFetchCircuitOpen_\b", r"\brecordFetchSuccess_\b", + r"\bconsumeFetchBudget_\b", + # normalize + r"\bnormalizeTickerCode\b", r"\bnormalizeYahooSymbol\b", + r"\bparseKrNum_\b", r"\btoNumber_\b", + r"\bformatIso_\b", r"\bparseIsoDateYmd_\b", + # export + r"\bgetSpreadsheet_\b", r"\bss\.getSheetByName\b", + r"\.getDataRange\(\)\.getValues\(\)", + r"\.setValues\b", r"\.appendRow\b", r"\.setValue\b", + r"\bSpreadsheetApp\b", r"\bsheetToJson\b", + r"\bupsertToSheetByKey\b", + # display + r"\bLogger\.log\b", r"\bUtilities\.formatDate\b", + r"\bPropertiesService\b", r"\bCacheService\b", + r"setProperty\b", r"getProperty\b", +] + +# 예외 (spec exceptions): 이름 기반 — 보고서 렌더링/데이터 수집 헬퍼 +EXCEPTION_PATTERNS = [ + r"^render", r"^format", r"^parse", r"^normalize", + r"^fetch", r"^get[A-Z]", r"^set[A-Z]", r"^read", + r"^summarize", r"^annotate", r"^cache", + r"^beginFetch", r"^clearFetch", r"^record", + r"_fallback_$", r"Compat", +] + + +def extract_functions(source: str) -> list[dict]: + """GAS 소스에서 function 정의를 추출한다. 중첩 함수 미지원(GAS 최상위 함수만).""" + results = [] + # function 시그니처: function name_(args) { + sig_re = re.compile( + r'^function\s+([A-Za-z_$][A-Za-z0-9_$]*)\s*\([^)]*\)\s*\{', + re.MULTILINE + ) + lines = source.split('\n') + line_starts = [] + offset = 0 + for ln in lines: + line_starts.append(offset) + offset += len(ln) + 1 + + for m in sig_re.finditer(source): + func_name = m.group(1) + brace_start = m.end() - 1 # position of opening { + # 괄호 카운팅으로 함수 끝 찾기 + depth = 0 + pos = brace_start + body_end = len(source) + for i in range(brace_start, len(source)): + if source[i] == '{': + depth += 1 + elif source[i] == '}': + depth -= 1 + if depth == 0: + body_end = i + 1 + break + body = source[brace_start:body_end] + # 함수 시작 행 번호 (1-based) + start_line = sum(1 for ls in line_starts if ls <= m.start()) + results.append({ + 'name': func_name, + 'start_line': start_line, + 'body': body, + }) + return results + + +def classify_function(name: str, body: str) -> str: + """allowed / forbidden / mixed / unknown 중 하나를 반환.""" + is_exception = any(re.search(p, name) for p in EXCEPTION_PATTERNS) + + forbidden_hits = [p for p in FORBIDDEN_PATTERNS if re.search(p, body)] + allowed_hits = [p for p in ALLOWED_PATTERNS if re.search(p, body)] + + # 예외 패턴에 해당하는 함수는 forbidden hit이 있어도 mixed로 처리 (display/collect 겸용 가능) + if is_exception and forbidden_hits: + return 'mixed' + + if forbidden_hits and allowed_hits: + return 'mixed' + if forbidden_hits: + return 'forbidden' + if allowed_hits: + return 'allowed' + return 'unknown' + + +def classify_function_detail(name: str, body: str) -> dict: + is_exception = any(re.search(p, name) for p in EXCEPTION_PATTERNS) + forbidden_hits = [p for p in FORBIDDEN_PATTERNS if re.search(p, body)] + allowed_hits = [p for p in ALLOWED_PATTERNS if re.search(p, body)] + classification = classify_function(name, body) + + # 어떤 responsibility 범주인지 추정 + responsibility = [] + if re.search(r"UrlFetchApp|fetchNaver|fetchYahoo|withFetchCache_|fetch_", body): + responsibility.append('collect') + if re.search(r"normalize|parseKrNum_|toNumber_|formatIso_", body): + responsibility.append('normalize') + if re.search(r"getSpreadsheet_|setValues|appendRow|setValue|sheetToJson|upsertToSheet", body): + responsibility.append('export') + if re.search(r"Logger\.log|Utilities\.formatDate|PropertiesService|CacheService", body): + responsibility.append('display') + if forbidden_hits: + if re.search(r"stop_loss|stop_price|STOP_PRICE|calcStopPrice|STOP_BREACH", str(forbidden_hits)): + responsibility.append('stop_loss') + if re.search(r"take_profit|tp1_price|tp2_price|TAKE_PROFIT|calcTpQ", str(forbidden_hits)): + responsibility.append('take_profit') + if re.search(r"risk_score|riskScore|RISK_SCORE", str(forbidden_hits)): + responsibility.append('risk_score') + if re.search(r"qty|size|Quantities|sizing", str(forbidden_hits)): + responsibility.append('sizing') + if re.search(r"decision|order_type|validation_status|ENTRY_ALLOWED|BUY_ALLOWED|Exit|Sell.*Action|calc.*Gate|calc.*Guard", str(forbidden_hits)): + responsibility.append('decision') + + return { + 'classification': classification, + 'responsibility': list(set(responsibility)) if responsibility else ['unknown'], + 'is_exception': is_exception, + 'forbidden_hit_count': len(forbidden_hits), + 'allowed_hit_count': len(allowed_hits), + } + + +def audit_file(gs_path: Path) -> dict: + source = gs_path.read_text(encoding='utf-8', errors='replace') + functions = extract_functions(source) + + per_function = [] + counts = {'allowed': 0, 'forbidden': 0, 'mixed': 0, 'unknown': 0} + for fn in functions: + detail = classify_function_detail(fn['name'], fn['body']) + cls = detail['classification'] + counts[cls] += 1 + per_function.append({ + 'name': fn['name'], + 'start_line': fn['start_line'], + **detail, + }) + + return { + 'file': gs_path.name, + 'total_functions': len(functions), + 'counts': counts, + 'functions': per_function, + } + + +def main(): + gas_dir = ROOT / 'src' / 'gas_adapter_parts' + gs_files = sorted(gas_dir.glob('*.gs')) + + if not gs_files: + print("ERROR: src/gas_adapter_parts/*.gs 파일 없음", file=sys.stderr) + sys.exit(1) + + all_files = [] + global_counts = {'allowed': 0, 'forbidden': 0, 'mixed': 0, 'unknown': 0} + forbidden_funcs = [] + + for gf in gs_files: + result = audit_file(gf) + all_files.append(result) + for k, v in result['counts'].items(): + global_counts[k] += v + for fn in result['functions']: + if fn['classification'] in ('forbidden', 'mixed'): + forbidden_funcs.append({ + 'file': result['file'], + 'name': fn['name'], + 'classification': fn['classification'], + 'responsibility': fn['responsibility'], + }) + + total = sum(global_counts.values()) + forbidden_total = global_counts['forbidden'] + global_counts['mixed'] + allowed_total = global_counts['allowed'] + migration_candidates = [f for f in forbidden_funcs if f['classification'] == 'forbidden'] + + output = { + 'policy_id': 'GAS_THIN_ADAPTER_POLICY_V1', + 'phase': 'inventory', + 'generated_at': __import__('datetime').datetime.now().isoformat(), + 'summary': { + 'total_functions': total, + 'allowed_count': global_counts['allowed'], + 'forbidden_count': global_counts['forbidden'], + 'mixed_count': global_counts['mixed'], + 'unknown_count': global_counts['unknown'], + 'forbidden_total': forbidden_total, + 'migration_candidate_count': len(migration_candidates), + 'compliance_pct': round(allowed_total / total * 100, 1) if total > 0 else 0, + }, + 'migration_candidates': migration_candidates, + 'per_file': all_files, + } + + out_path = ROOT / 'Temp' / 'gas_business_logic_audit_v1.json' + out_path.parent.mkdir(exist_ok=True) + out_path.write_text(json.dumps(output, ensure_ascii=False, indent=2), encoding='utf-8') + + print(f"=== GAS_THIN_ADAPTER_POLICY_V1 Phase-1 Inventory ===") + print(f"파일 수 : {len(gs_files)}") + print(f"총 함수 수 : {total}") + print(f" allowed : {global_counts['allowed']}") + print(f" forbidden : {global_counts['forbidden']}") + print(f" mixed : {global_counts['mixed']}") + print(f" unknown : {global_counts['unknown']}") + print(f"compliance_pct : {output['summary']['compliance_pct']}%") + print(f"이전 후보 : {len(migration_candidates)}개") + print() + print("── 이전 후보 (forbidden) ──") + for f in migration_candidates[:30]: + print(f" [{f['file']}] {f['name']} {f['responsibility']}") + if len(migration_candidates) > 30: + print(f" ... 외 {len(migration_candidates)-30}개") + print() + print(f"출력: {out_path}") + + # CI가 아니면 forbidden 존재 시 exit 0으로 성공 (audit-only 모드) + return 0 + + +if __name__ == '__main__': + sys.exit(main())