Merge pull request 'feat: GAS_THIN_ADAPTER_POLICY_V1 Phase-1 Inventory' (#36) from feature/gas-thin-adapter-phase1-inventory into main

feat: GAS_THIN_ADAPTER_POLICY_V1 Phase-1 Inventory (#36)
This commit is contained in:
2026-06-14 10:31:54 +09:00
2 changed files with 297 additions and 1 deletions
+12 -1
View File
@@ -15,9 +15,20 @@ forbidden_responsibilities:
- take_profit
- risk_score
migration_plan:
status: PLANNED
status: IN_PROGRESS
inventory_result:
tool: tools/audit_gas_thin_adapter_v1.py
output: Temp/gas_business_logic_audit_v1.json
total_functions: 273
forbidden_count: 23
mixed_count: 15
allowed_count: 104
unknown_count: 131
compliance_pct: 38.1
migration_candidate_count: 23
phases:
- phase: inventory
status: DONE
target: Temp/gas_business_logic_audit_v1.json
action: 분류된 GAS 함수 목록을 확정한다.
- phase: extract
+285
View File
@@ -0,0 +1,285 @@
"""
GAS_THIN_ADAPTER_POLICY_V1 — Phase 1: Inventory
spec/39_gas_thin_adapter_policy.yaml의 inventory 단계.
src/gas_adapter_parts/*.gs 내 모든 함수를 스캔하여 allowed/forbidden/mixed/unknown으로 분류.
결과를 Temp/gas_business_logic_audit_v1.json에 저장.
"""
import re
import json
import sys
import os
from pathlib import Path
ROOT = Path(__file__).parent.parent
# ── 분류 키워드 ─────────────────────────────────────────────────────────────
# forbidden_responsibilities: decision / sizing / stop_loss / take_profit / risk_score
FORBIDDEN_PATTERNS = [
# sizing
r"\bcalcQuantities_\b", r"\bposition_size\b", r"\blimit_price_est\b",
r"\blimit_price_calc\b", r"\bqty_ladder\b", r"\btp_qty\b",
r"POSITION_SIZE_V1", r"calcPrices_\(",
# stop_loss
r"\bstop_loss\b", r"\bstop_price\b", r"STOP_PRICE_CORE",
r"calcStopPrice", r"STOP_BREACH_ALERT",
# take_profit
r"\btake_profit\b", r"\btp1_price\b", r"\btp2_price\b",
r"TAKE_PROFIT_LADDER", r"TP_TRIGGER_ALERT",
r"calcTpQuantityLadder_", r"calcProfitPreservationRow_",
r"calcCashPreservationSellEngineV2_",
# risk_score
r"\brisk_score\b", r"\briskScore\b", r"RISK_SCORE",
# decision
r"validation_status\s*[=:]\s*['\"]PASS['\"]",
r"validation_status\s*[=:]\s*['\"]BLOCK",
r"\bENTRY_ALLOWED\b", r"\bBUY_ALLOWED\b",
r"\border_type\s*[=:]\s*['\"]BUY['\"]",
r"\border_type\s*[=:]\s*['\"]SELL['\"]",
r"POSITION_SIZE_V1", r"STOP_PRICE_CORE_V1",
r"calcExitSellAction_", r"calcSmartCashRaiseV2_",
r"calcCashPreservationPlan_", r"calcCashShortfallHarness_",
r"calcCoreSatelliteExecutionState_",
r"calcMandatoryReductionPlan_",
r"calcDistributionRiskRow_", r"calcSellConflictScore_",
r"calcBreakoutQualityGate_", r"calcEntryTimingSignal_",
r"calcAntiLateEntryGateV2_", r"applyAlegGate4And5_",
r"calcSmartCashRecoverySell_",
# sizing helpers with calculation output
r"\bbase_qty\b", r"\bsell_qty\s*=", r"\bbuy_qty\s*=",
r"\bsuggested_sell_qty\b",
]
# allowed_responsibilities: collect / normalize / export / display
ALLOWED_PATTERNS = [
# collect
r"UrlFetchApp\.fetch\b", r"\bwithFetchCache_\b",
r"\bgetCachedFetchResult_\b", r"\bsetCachedFetchResult_\b",
r"fetchNaver", r"fetchYahoo",
r"\bisFetchCircuitOpen_\b", r"\brecordFetchSuccess_\b",
r"\bconsumeFetchBudget_\b",
# normalize
r"\bnormalizeTickerCode\b", r"\bnormalizeYahooSymbol\b",
r"\bparseKrNum_\b", r"\btoNumber_\b",
r"\bformatIso_\b", r"\bparseIsoDateYmd_\b",
# export
r"\bgetSpreadsheet_\b", r"\bss\.getSheetByName\b",
r"\.getDataRange\(\)\.getValues\(\)",
r"\.setValues\b", r"\.appendRow\b", r"\.setValue\b",
r"\bSpreadsheetApp\b", r"\bsheetToJson\b",
r"\bupsertToSheetByKey\b",
# display
r"\bLogger\.log\b", r"\bUtilities\.formatDate\b",
r"\bPropertiesService\b", r"\bCacheService\b",
r"setProperty\b", r"getProperty\b",
]
# 예외 (spec exceptions): 이름 기반 — 보고서 렌더링/데이터 수집 헬퍼
EXCEPTION_PATTERNS = [
r"^render", r"^format", r"^parse", r"^normalize",
r"^fetch", r"^get[A-Z]", r"^set[A-Z]", r"^read",
r"^summarize", r"^annotate", r"^cache",
r"^beginFetch", r"^clearFetch", r"^record",
r"_fallback_$", r"Compat",
]
def extract_functions(source: str) -> list[dict]:
"""GAS 소스에서 function 정의를 추출한다. 중첩 함수 미지원(GAS 최상위 함수만)."""
results = []
# function 시그니처: function name_(args) {
sig_re = re.compile(
r'^function\s+([A-Za-z_$][A-Za-z0-9_$]*)\s*\([^)]*\)\s*\{',
re.MULTILINE
)
lines = source.split('\n')
line_starts = []
offset = 0
for ln in lines:
line_starts.append(offset)
offset += len(ln) + 1
for m in sig_re.finditer(source):
func_name = m.group(1)
brace_start = m.end() - 1 # position of opening {
# 괄호 카운팅으로 함수 끝 찾기
depth = 0
pos = brace_start
body_end = len(source)
for i in range(brace_start, len(source)):
if source[i] == '{':
depth += 1
elif source[i] == '}':
depth -= 1
if depth == 0:
body_end = i + 1
break
body = source[brace_start:body_end]
# 함수 시작 행 번호 (1-based)
start_line = sum(1 for ls in line_starts if ls <= m.start())
results.append({
'name': func_name,
'start_line': start_line,
'body': body,
})
return results
def classify_function(name: str, body: str) -> str:
"""allowed / forbidden / mixed / unknown 중 하나를 반환."""
is_exception = any(re.search(p, name) for p in EXCEPTION_PATTERNS)
forbidden_hits = [p for p in FORBIDDEN_PATTERNS if re.search(p, body)]
allowed_hits = [p for p in ALLOWED_PATTERNS if re.search(p, body)]
# 예외 패턴에 해당하는 함수는 forbidden hit이 있어도 mixed로 처리 (display/collect 겸용 가능)
if is_exception and forbidden_hits:
return 'mixed'
if forbidden_hits and allowed_hits:
return 'mixed'
if forbidden_hits:
return 'forbidden'
if allowed_hits:
return 'allowed'
return 'unknown'
def classify_function_detail(name: str, body: str) -> dict:
is_exception = any(re.search(p, name) for p in EXCEPTION_PATTERNS)
forbidden_hits = [p for p in FORBIDDEN_PATTERNS if re.search(p, body)]
allowed_hits = [p for p in ALLOWED_PATTERNS if re.search(p, body)]
classification = classify_function(name, body)
# 어떤 responsibility 범주인지 추정
responsibility = []
if re.search(r"UrlFetchApp|fetchNaver|fetchYahoo|withFetchCache_|fetch_", body):
responsibility.append('collect')
if re.search(r"normalize|parseKrNum_|toNumber_|formatIso_", body):
responsibility.append('normalize')
if re.search(r"getSpreadsheet_|setValues|appendRow|setValue|sheetToJson|upsertToSheet", body):
responsibility.append('export')
if re.search(r"Logger\.log|Utilities\.formatDate|PropertiesService|CacheService", body):
responsibility.append('display')
if forbidden_hits:
if re.search(r"stop_loss|stop_price|STOP_PRICE|calcStopPrice|STOP_BREACH", str(forbidden_hits)):
responsibility.append('stop_loss')
if re.search(r"take_profit|tp1_price|tp2_price|TAKE_PROFIT|calcTpQ", str(forbidden_hits)):
responsibility.append('take_profit')
if re.search(r"risk_score|riskScore|RISK_SCORE", str(forbidden_hits)):
responsibility.append('risk_score')
if re.search(r"qty|size|Quantities|sizing", str(forbidden_hits)):
responsibility.append('sizing')
if re.search(r"decision|order_type|validation_status|ENTRY_ALLOWED|BUY_ALLOWED|Exit|Sell.*Action|calc.*Gate|calc.*Guard", str(forbidden_hits)):
responsibility.append('decision')
return {
'classification': classification,
'responsibility': list(set(responsibility)) if responsibility else ['unknown'],
'is_exception': is_exception,
'forbidden_hit_count': len(forbidden_hits),
'allowed_hit_count': len(allowed_hits),
}
def audit_file(gs_path: Path) -> dict:
source = gs_path.read_text(encoding='utf-8', errors='replace')
functions = extract_functions(source)
per_function = []
counts = {'allowed': 0, 'forbidden': 0, 'mixed': 0, 'unknown': 0}
for fn in functions:
detail = classify_function_detail(fn['name'], fn['body'])
cls = detail['classification']
counts[cls] += 1
per_function.append({
'name': fn['name'],
'start_line': fn['start_line'],
**detail,
})
return {
'file': gs_path.name,
'total_functions': len(functions),
'counts': counts,
'functions': per_function,
}
def main():
gas_dir = ROOT / 'src' / 'gas_adapter_parts'
gs_files = sorted(gas_dir.glob('*.gs'))
if not gs_files:
print("ERROR: src/gas_adapter_parts/*.gs 파일 없음", file=sys.stderr)
sys.exit(1)
all_files = []
global_counts = {'allowed': 0, 'forbidden': 0, 'mixed': 0, 'unknown': 0}
forbidden_funcs = []
for gf in gs_files:
result = audit_file(gf)
all_files.append(result)
for k, v in result['counts'].items():
global_counts[k] += v
for fn in result['functions']:
if fn['classification'] in ('forbidden', 'mixed'):
forbidden_funcs.append({
'file': result['file'],
'name': fn['name'],
'classification': fn['classification'],
'responsibility': fn['responsibility'],
})
total = sum(global_counts.values())
forbidden_total = global_counts['forbidden'] + global_counts['mixed']
allowed_total = global_counts['allowed']
migration_candidates = [f for f in forbidden_funcs if f['classification'] == 'forbidden']
output = {
'policy_id': 'GAS_THIN_ADAPTER_POLICY_V1',
'phase': 'inventory',
'generated_at': __import__('datetime').datetime.now().isoformat(),
'summary': {
'total_functions': total,
'allowed_count': global_counts['allowed'],
'forbidden_count': global_counts['forbidden'],
'mixed_count': global_counts['mixed'],
'unknown_count': global_counts['unknown'],
'forbidden_total': forbidden_total,
'migration_candidate_count': len(migration_candidates),
'compliance_pct': round(allowed_total / total * 100, 1) if total > 0 else 0,
},
'migration_candidates': migration_candidates,
'per_file': all_files,
}
out_path = ROOT / 'Temp' / 'gas_business_logic_audit_v1.json'
out_path.parent.mkdir(exist_ok=True)
out_path.write_text(json.dumps(output, ensure_ascii=False, indent=2), encoding='utf-8')
print(f"=== GAS_THIN_ADAPTER_POLICY_V1 Phase-1 Inventory ===")
print(f"파일 수 : {len(gs_files)}")
print(f"총 함수 수 : {total}")
print(f" allowed : {global_counts['allowed']}")
print(f" forbidden : {global_counts['forbidden']}")
print(f" mixed : {global_counts['mixed']}")
print(f" unknown : {global_counts['unknown']}")
print(f"compliance_pct : {output['summary']['compliance_pct']}%")
print(f"이전 후보 : {len(migration_candidates)}")
print()
print("── 이전 후보 (forbidden) ──")
for f in migration_candidates[:30]:
print(f" [{f['file']}] {f['name']} {f['responsibility']}")
if len(migration_candidates) > 30:
print(f" ... 외 {len(migration_candidates)-30}")
print()
print(f"출력: {out_path}")
# CI가 아니면 forbidden 존재 시 exit 0으로 성공 (audit-only 모드)
return 0
if __name__ == '__main__':
sys.exit(main())