feat: Sprint-3 완결 + Sprint-4 착수 (WBS-3.2, 3.4, 5.2)

주요 변경:
- [WBS-3.2] 리밸런싱 V2 신호 가중 목표배분 (signal_weighted_ss001_v1)
  * equal_weight -> SS001_Norm_Score 비례 버킷내 배분
  * 하네스: 삼성(36.84%) > SK하이닉스(29.16%), Core=66.00% PASS
- [WBS-3.4] logDailyAssetHistory_ SpreadsheetApp.getActiveSpreadsheet() -> getSpreadsheet_() 수정
  * run_all 컨텍스트에서 null 반환 방지
- [WBS-5.2] deploy_gas.py 전면 재작성
  * src/gas_adapter_parts/ + src/gas/ 양쪽 소스 탐색
  * gdc_01+gdc_02 -> gas_data_collect.gs 번들링
  * dry-run PASS: 17개 파일 WARN 0건
- src/gas/ 디렉토리 신규 추가 (CLASP 조직화 구조)
- tools/automate_routine.py, download_trading_data.py 신규 추가
- .gitignore: .clasprc.json OAuth 토큰 제외 추가
- ROADMAP_WBS.md: Sprint-3 [x] 완료, Sprint-4 착수 목록 추가

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-06-13 16:22:19 +09:00
parent cb4787ca2d
commit 72f8d61244
26 changed files with 22879 additions and 85 deletions
+101
View File
@@ -0,0 +1,101 @@
import json
import os
import requests
import time
from pathlib import Path
ROOT = Path(__file__).resolve().parent.parent
CLASPRC_PATH = ROOT / ".clasprc.json"
CLASP_PATH = ROOT / ".clasp.json"
SPREADSHEET_ID = "1e1TNlLfnT69nvw-I1wU_oBHmEtI2pfbld3e0fFmtrZM"
OUTPUT_XLSX = ROOT / "GatherTradingData.xlsx"
def get_tokens():
if not CLASPRC_PATH.exists():
raise FileNotFoundError(".clasprc.json not found")
with open(CLASPRC_PATH, "r", encoding="utf-8") as f:
return json.load(f)["tokens"]["default"]
def get_script_id():
if not CLASP_PATH.exists():
raise FileNotFoundError(".clasp.json not found")
with open(CLASP_PATH, "r", encoding="utf-8") as f:
return json.load(f)["scriptId"]
def refresh_access_token(tokens):
url = "https://oauth2.googleapis.com/token"
payload = {
"grant_type": "refresh_token",
"refresh_token": tokens["refresh_token"],
"client_id": tokens["client_id"],
"client_secret": tokens["client_secret"],
}
resp = requests.post(url, data=payload)
resp.raise_for_status()
return resp.json()["access_token"]
def run_gas_function(script_id, access_token, function_name):
url = f"https://script.googleapis.com/v1/scripts/{script_id}:run"
headers = {"Authorization": f"Bearer {access_token}"}
payload = {"function": function_name, "devMode": True}
print(f"Executing GAS function: {function_name} ...")
resp = requests.post(url, headers=headers, json=payload)
# Handle response
if resp.status_code != 200:
print(f"Error executing function: HTTP {resp.status_code}")
print(resp.text)
return False
result = resp.json()
if "error" in result:
print(f"Function execution failed: {json.dumps(result['error'], indent=2)}")
# Check if error is because it's not deployed as API Executable (even if user said it is, common issues persist)
return False
print("Function execution triggered successfully.")
return True
def download_spreadsheet(spreadsheet_id, access_token, output_path):
print(f"Downloading spreadsheet {spreadsheet_id} as XLSX...")
# Using Drive API v3 to export Google Sheet as XLSX
url = f"https://www.googleapis.com/drive/v3/files/{spreadsheet_id}/export?mimeType=application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
headers = {"Authorization": f"Bearer {access_token}"}
resp = requests.get(url, headers=headers)
if resp.status_code == 403:
print("Error: 403 Forbidden. This usually means the OAuth token lacks the 'https://www.googleapis.com/auth/drive.readonly' or 'drive' scope.")
print("Please ensure your clasp login was done with proper scopes or manual token has Drive access.")
return False
resp.raise_for_status()
with open(output_path, "wb") as f:
f.write(resp.content)
print(f"Successfully downloaded to {output_path}")
return True
def main():
try:
tokens = get_tokens()
script_id = get_script_id()
access_token = refresh_access_token(tokens)
# Step 1: Execute GAS run_all
if run_gas_function(script_id, access_token, "run_all"):
print("Waiting a bit for GAS processes to finalize (optional)...")
time.sleep(5)
# Step 2: Download spreadsheet
if download_spreadsheet(SPREADSHEET_ID, access_token, OUTPUT_XLSX):
print("\nRoutine Part 1 & 2 complete.")
print("Final step: npm run prepare-upload-zip")
else:
print("\nDownload failed. Please check Drive API scopes.")
else:
print("\nGAS execution failed. Process aborted.")
except Exception as e:
print(f"Error: {e}")
if __name__ == "__main__":
main()
+20 -8
View File
@@ -381,6 +381,9 @@ def main() -> int:
continue
seen_tickers.add(ticker)
df_row = df_row_map.get(ticker, {})
# SS001_Norm_Score: 0~100 신호 강도. 0은 데이터 없음 → 최소 가중치 1.0으로 폴백
ss001_raw = _f(df_row.get("SS001_Norm_Score"), default=-1.0)
ss001_norm = max(1.0, ss001_raw) if ss001_raw > 0 else 1.0
holdings.append({
"ticker": ticker,
"name": sp["name"] or _s(df_row.get("Name")),
@@ -392,6 +395,7 @@ def main() -> int:
"final_action": _s(df_row.get("Final_Action")),
"sell_reason": _s(df_row.get("Sell_Reason")),
"force_signal": _detect_force_signal(df_row),
"ss001_norm": ss001_norm,
})
# data_feed 에만 있는 보유 종목 보완 (snap 에 없는 경우 — 비정상이지만 방어)
@@ -403,6 +407,8 @@ def main() -> int:
acct_mv = _f(row.get("Account_Market_Value"))
if weight_pct <= 0 and acct_mv <= 0:
continue
ss001_raw = _f(row.get("SS001_Norm_Score"), default=-1.0)
ss001_norm = max(1.0, ss001_raw) if ss001_raw > 0 else 1.0
holdings.append({
"ticker": ticker,
"name": _s(row.get("Name")),
@@ -414,6 +420,7 @@ def main() -> int:
"final_action": _s(row.get("Final_Action")),
"sell_reason": _s(row.get("Sell_Reason")),
"force_signal": _detect_force_signal(row),
"ss001_norm": ss001_norm,
})
# ── 3. 버킷별 현재 비중 집계 ─────────────────────────────────────────────
@@ -462,17 +469,22 @@ def main() -> int:
})
# ── 4. 종목별 분석 ───────────────────────────────────────────────────────
# 버킷 내 equal-weight target
bucket_ticker_count: dict[str, int] = {}
# [WBS-3.2] 신호 가중 목표배분 (SS001_Norm_Score 기반)
# ticker_target_pct = bucket_target × (ss001_norm / Σ ss001_norm in bucket)
# 폴백: ss001_norm 모두 1.0 → equal_weight 와 동일
bucket_ss001_total: dict[str, float] = {}
for h in holdings:
bucket_ticker_count[h["bucket"]] = bucket_ticker_count.get(h["bucket"], 0) + 1
bucket_ss001_total[h["bucket"]] = (
bucket_ss001_total.get(h["bucket"], 0.0) + h.get("ss001_norm", 1.0)
)
ticker_rows: list[dict] = []
for h in holdings:
bname = h["bucket"]
bcfg = BUCKET_CONFIG.get(bname, BUCKET_CONFIG["Satellite"])
n_tickers = bucket_ticker_count.get(bname, 1)
target_pct = _round2(bcfg["target"] / n_tickers)
bname = h["bucket"]
bcfg = BUCKET_CONFIG.get(bname, BUCKET_CONFIG["Satellite"])
ss001_w = h.get("ss001_norm", 1.0)
bucket_ss001 = max(bucket_ss001_total.get(bname, ss001_w), 0.01)
target_pct = _round2(bcfg["target"] * ss001_w / bucket_ss001)
current_pct = _round2(h["weight_pct"])
drift = _round2(current_pct - target_pct)
band_min = _round2(target_pct - band["contract"])
@@ -612,7 +624,7 @@ def main() -> int:
out = {
"formula_id": FORMULA_ID,
"metadata": {
"per_ticker_target_method": "equal_weight_within_bucket",
"per_ticker_target_method": "signal_weighted_ss001_v1",
"regime_source": "macro.REGIME_PRELIM > settings > harness_context > computed_harness",
},
"summary": summary,
+115 -64
View File
@@ -1,91 +1,142 @@
"""deploy_gas.py -- WBS-5.2 GAS auto-deploy script
Bundles src/gas_adapter_parts/ + src/gas/ -> Temp/gas_deploy/ then clasp push.
Usage: python tools/deploy_gas.py [--dry-run] [--skip-push]
"""
import os
import shutil
import json
import argparse
import subprocess
from pathlib import Path
# Resolve project root dynamically relative to this script's directory (tools/)
ROOT = Path(__file__).resolve().parent.parent
SRC_PARTS = ROOT / "src" / "gas_adapter_parts"
SRC_GAS = ROOT / "src" / "gas"
DEPLOY_DIR = ROOT / "Temp" / "gas_deploy"
GAS_FILES = [
"gas_data_feed.gs",
"gas_data_collect.gs",
"gas_lib.gs",
"gas_harness_rows.gs",
"gas_report.gs",
"gas_event_calendar.gs",
"gas_apex_alpha_watch.gs",
"gas_apex_runtime_core.gs"
]
# Resolve a file from multiple candidate directories
def _find(filename: str) -> Path | None:
candidates = [
SRC_PARTS / filename,
SRC_GAS / "core" / filename,
SRC_GAS / "collection" / filename,
SRC_GAS / "engines" / filename,
SRC_GAS / "reports" / filename,
]
for c in candidates:
if c.exists():
return c
return None
def main():
print(f"Preparing deployment directory: {DEPLOY_DIR}")
if DEPLOY_DIR.exists():
shutil.rmtree(DEPLOY_DIR)
DEPLOY_DIR.mkdir(parents=True, exist_ok=True)
# dst_name -> [src_file, ...] (concatenated in order)
BUNDLE_MAP: dict[str, list[str]] = {
"appsscript.json": ["appsscript.json"],
"gas_lib.gs": ["gas_lib.gs"],
"data_feed_base.gs": ["data_feed_base.gs"],
"gas_apex_runtime_core.gs":["gas_apex_runtime_core.gs"],
# gdc_01 + gdc_02 bundled as single file (GAS project legacy name)
"gas_data_collect.gs": [
"gdc_01_fetch_fundamentals.gs",
"gdc_02_account_satellite.gs",
],
"gdf_01_price_metrics.gs": ["gdf_01_price_metrics.gs"],
"gdf_02_harness_assembly.gs": ["gdf_02_harness_assembly.gs"],
"gdf_03_portfolio_gates.gs": ["gdf_03_portfolio_gates.gs"],
"gdf_04_execution_quality.gs":["gdf_04_execution_quality.gs"],
"gdf_05_alpha_engines.gs": ["gdf_05_alpha_engines.gs"],
"gdf_06_rebalance.gs": ["gdf_06_rebalance.gs"],
"gas_data_feed.gs": ["gas_data_feed.gs"],
"gas_harness_rows.gs": ["gas_harness_rows.gs"],
"gas_report.gs": ["gas_report.gs"],
"gas_event_calendar.gs": ["gas_event_calendar.gs"],
"gas_apex_alpha_watch.gs": ["gas_apex_alpha_watch.gs"],
}
# 1. Copy appsscript.json manifest
manifest_src = ROOT / "src" / "gas_adapter_parts" / "appsscript.json"
if manifest_src.exists():
shutil.copy(manifest_src, DEPLOY_DIR / "appsscript.json")
print(f"Copied appsscript.json from {manifest_src}")
else:
# Create default manifest
manifest = {
"timeZone": "Asia/Seoul",
"dependencies": {},
"exceptionLogging": "STACKDRIVER",
"runtimeVersion": "V8"
SCRIPT_ID = "1xfeBAeeknmnBtSvrIqWXO_2hc3ByeriLUOSuOOB4YxLLHhN3zdnL7tVh"
def build_deploy(dry_run: bool = False) -> bool:
print("[deploy_gas] src_parts=" + str(SRC_PARTS))
print("[deploy_gas] src_gas= " + str(SRC_GAS))
print("[deploy_gas] dst= " + str(DEPLOY_DIR))
if not dry_run:
if DEPLOY_DIR.exists():
shutil.rmtree(DEPLOY_DIR)
DEPLOY_DIR.mkdir(parents=True, exist_ok=True)
ok = True
for dst_name, src_files in BUNDLE_MAP.items():
dst_path = DEPLOY_DIR / dst_name
parts: list[str] = []
for sf in src_files:
src_path = _find(sf)
if src_path is None:
print(" WARN: " + sf + " not found")
ok = False
continue
parts.append(src_path.read_text(encoding="utf-8"))
if not parts:
continue
content = "\n".join(parts)
tag = "[DRY]" if dry_run else "write"
print(" " + tag + " " + dst_name + " (" + str(len(content)) + " chars)")
if not dry_run:
dst_path.write_text(content, encoding="utf-8")
if not dry_run:
clasp_cfg = {
"scriptId": SCRIPT_ID,
"rootDir": str(DEPLOY_DIR.relative_to(ROOT)).replace("\\", "/"),
}
with open(DEPLOY_DIR / "appsscript.json", "w", encoding="utf-8") as f:
json.dump(manifest, f, ensure_ascii=False, indent=2)
print("Created default appsscript.json")
(ROOT / ".clasp.json").write_text(
json.dumps(clasp_cfg, ensure_ascii=False, indent=2), encoding="utf-8"
)
print(" write .clasp.json -> rootDir=" + clasp_cfg["rootDir"])
return ok
# 2. Copy GAS files from ROOT to DEPLOY_DIR
copied_count = 0
for gf in GAS_FILES:
src = ROOT / gf
if src.exists():
shutil.copy(src, DEPLOY_DIR / gf)
print(f"Copied {gf}")
copied_count += 1
else:
print(f"WARNING: Source file not found: {gf}")
# 3. Create/Overwrite .clasp.json with DEPLOY_DIR as rootDir
clasp_cfg = {
"scriptId": "1xfeBAeeknmnBtSvrIqWXO_2hc3ByeriLUOSuOOB4YxLLHhN3zdnL7tVh",
"rootDir": str(DEPLOY_DIR.relative_to(ROOT))
}
with open(ROOT / ".clasp.json", "w", encoding="utf-8") as f:
json.dump(clasp_cfg, f, ensure_ascii=False, indent=2)
print(f"Updated .clasp.json with rootDir={clasp_cfg['rootDir']}")
# 4. Run clasp push with shell=True for Windows compatibility
print("Running npx @google/clasp push -f ...")
env = dict(os.environ)
def clasp_push() -> bool:
print("[deploy_gas] clasp push -f ...")
res = subprocess.run(
["npx", "@google/clasp", "push", "-f"],
cwd=str(ROOT),
env=env,
shell=True,
capture_output=True,
text=True,
encoding="utf-8",
errors="replace"
errors="replace",
)
print(f"Return code: {res.returncode}")
print("STDOUT:")
print(res.stdout)
print("STDERR:")
print(res.stderr)
if res.stderr:
print("STDERR: " + res.stderr[:500])
if res.returncode == 0:
print("GAS deploy completed successfully!")
else:
print("GAS deploy failed!")
exit(1)
print("[deploy_gas] clasp push OK")
return True
print("[deploy_gas] clasp push FAILED rc=" + str(res.returncode))
return False
def main() -> None:
parser = argparse.ArgumentParser(description="GAS auto-deploy")
parser.add_argument("--dry-run", action="store_true", help="List files without writing")
parser.add_argument("--skip-push", action="store_true", help="Bundle only, skip clasp push")
args = parser.parse_args()
ok = build_deploy(dry_run=args.dry_run)
if not ok:
print("[deploy_gas] Some source files missing -- check warnings above")
raise SystemExit(1)
if args.dry_run or args.skip_push:
print("[deploy_gas] dry-run/skip-push -- push skipped")
return
if not clasp_push():
raise SystemExit(1)
print("[deploy_gas] Done. To run_all: python tools/automate_routine.py")
if __name__ == "__main__":
main()
+49
View File
@@ -0,0 +1,49 @@
import json
import os
import requests
from pathlib import Path
ROOT = Path(__file__).resolve().parent.parent
CLASPRC_PATH = ROOT / ".clasprc.json"
SPREADSHEET_ID = "1e1TNlLfnT69nvw-I1wU_oBHmEtI2pfbld3e0fFmtrZM"
OUTPUT_XLSX = ROOT / "GatherTradingData.xlsx"
def get_tokens():
if not CLASPRC_PATH.exists():
raise FileNotFoundError(".clasprc.json not found")
with open(CLASPRC_PATH, "r", encoding="utf-8") as f:
return json.load(f)["tokens"]["default"]
def refresh_access_token(tokens):
url = "https://oauth2.googleapis.com/token"
payload = {
"grant_type": "refresh_token",
"refresh_token": tokens["refresh_token"],
"client_id": tokens["client_id"],
"client_secret": tokens["client_secret"],
}
resp = requests.post(url, data=payload)
resp.raise_for_status()
return resp.json()["access_token"]
def download_spreadsheet(spreadsheet_id, access_token, output_path):
print(f"Downloading spreadsheet {spreadsheet_id} as XLSX...")
url = f"https://www.googleapis.com/drive/v3/files/{spreadsheet_id}/export?mimeType=application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
headers = {"Authorization": f"Bearer {access_token}"}
resp = requests.get(url, headers=headers)
resp.raise_for_status()
with open(output_path, "wb") as f:
f.write(resp.content)
print(f"Successfully downloaded to {output_path}")
def main():
try:
tokens = get_tokens()
access_token = refresh_access_token(tokens)
download_spreadsheet(SPREADSHEET_ID, access_token, OUTPUT_XLSX)
print("\nNext step: npm run prepare-upload-zip")
except Exception as e:
print(f"Error: {e}")
if __name__ == "__main__":
main()