ee3e799de1
주요 변경: - tools/build_rebalance_engine_v1.py: REBALANCE_ENGINE_V1 신규 * account_snapshot 직접 합산(_build_snap_position_map) → 소수주 분리 행 병합 * 레짐 소스 macro.REGIME_PRELIM 최우선 (GAS 와 동일) - src/gas_adapter_parts/gdf_06_rebalance.gs: runRebalanceSheet_() 신규 * Logger.log / getSpreadsheet_() 로 run_all 연동 수정 - src/gas_adapter_parts/gdc_01_fetch_fundamentals.gs * _mergePositionRecord_(): 소수주 중복 행 합산 신규 * parseInt → parseFloat (qty, availQty) - src/gas_adapter_parts/gdf_01_price_metrics.gs * 미보유 종목 SELL_READY → WATCH_EXIT_SIGNAL - spec/41_release_dag.yaml: build_rebalance_sheet 노드 추가 (step_count 63) - spec/51_formula_lifecycle_registry.yaml: REBALANCE_ENGINE_V1 등록 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
219 lines
8.2 KiB
Python
219 lines
8.2 KiB
Python
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import json
|
|
from pathlib import Path
|
|
|
|
import yaml
|
|
|
|
from orchestration_harness_v1 import run_plan
|
|
from pipeline_runtime_anomaly_lib_v1 import finalize_runtime_profile, runtime_profile_from_steps
|
|
|
|
|
|
ROOT = Path(__file__).resolve().parents[1]
|
|
DIST = ROOT / "dist"
|
|
RUNTIME_PROFILE = ROOT / "Temp" / "build_bundle_runtime_profile_v1.json"
|
|
CACHE_PATH = ROOT / "Temp" / "build_bundle_cache_v1.json"
|
|
|
|
|
|
def read(path: str) -> str:
|
|
return (ROOT / path).read_text(encoding="utf-8").rstrip()
|
|
|
|
|
|
def load_bundle_contents(paths: list[str]) -> dict[str, str]:
|
|
contents: dict[str, str] = {}
|
|
for path in paths:
|
|
contents[path] = read(path)
|
|
return contents
|
|
|
|
|
|
def compute_content_signature(contents: dict[str, str]) -> str:
|
|
digest = hashlib.sha256()
|
|
for path in sorted(contents):
|
|
digest.update(path.encode("utf-8"))
|
|
digest.update(b"\0")
|
|
digest.update(contents[path].encode("utf-8"))
|
|
digest.update(b"\0")
|
|
return digest.hexdigest()
|
|
|
|
|
|
def flatten_manifest_files(manifest: dict) -> list[str]:
|
|
ordered: list[str] = ["RetirementAssetPortfolio.yaml", "AGENTS.md"]
|
|
for step in (manifest.get("load_sequence") or {}).values():
|
|
for file_name in step.get("files", []):
|
|
if "*" not in file_name and file_name not in ordered:
|
|
ordered.append(file_name)
|
|
for extra in (
|
|
"spec/ownership_map.yaml",
|
|
"spec/aliases.yaml",
|
|
"spec/xref_matrix.yaml",
|
|
"prompts/analysis_prompt.md",
|
|
"prompts/review_prompt.md",
|
|
):
|
|
if extra not in ordered and (ROOT / extra).exists():
|
|
ordered.append(extra)
|
|
return ordered
|
|
|
|
|
|
def bundle_profile(manifest: dict, mode: str) -> dict:
|
|
if mode == "full":
|
|
return {
|
|
"output": "dist/retirement_portfolio_bundle.yaml",
|
|
"purpose": "분리된 문서를 manifest 순서로 묶은 전체 LLM 입력용 합본",
|
|
"files": flatten_manifest_files(manifest),
|
|
}
|
|
profiles = manifest.get("bundle_profiles") or {}
|
|
profile = profiles.get(mode)
|
|
if not isinstance(profile, dict):
|
|
raise ValueError(f"missing bundle profile: {mode}")
|
|
return profile
|
|
|
|
|
|
def write_bundle(
|
|
paths: list[str],
|
|
output: Path,
|
|
mode: str = "full",
|
|
purpose_text: str | None = None,
|
|
*,
|
|
contents: dict[str, str] | None = None,
|
|
) -> None:
|
|
title_suffix = {
|
|
"full": "Full",
|
|
"compact": "Compact",
|
|
"ultra_compact": "Ultra Compact",
|
|
}[mode]
|
|
purpose = purpose_text or "분리된 문서를 manifest 순서로 묶은 LLM 입력용 합본"
|
|
chunks = [
|
|
"meta:",
|
|
f" title: \"은퇴자산포트폴리오 {title_suffix} Bundle\"",
|
|
" generated_by: \"tools/build_bundle.py\"",
|
|
f" purpose: \"{purpose}\"",
|
|
f" mode: \"{mode}\"",
|
|
f" file_count: {len(paths)}",
|
|
"",
|
|
"bundle_files:",
|
|
]
|
|
for path in paths:
|
|
chunks.append(f" - \"{path}\"")
|
|
chunks.append("")
|
|
chunks.append("bundle_content:")
|
|
for path in paths:
|
|
text = contents[path] if contents and path in contents else read(path)
|
|
indented = "\n".join(" " + line for line in text.splitlines())
|
|
key = path.replace("\\", "/").replace("/", "__").replace(".", "_")
|
|
chunks.append(f" {key}: |")
|
|
chunks.append(indented if indented else " ")
|
|
output.write_text("\n".join(chunks).rstrip() + "\n", encoding="utf-8")
|
|
|
|
|
|
def _write_bundle_job(mode: str, profile: dict, contents: dict[str, str]) -> dict[str, str]:
|
|
output = ROOT / profile["output"]
|
|
paths = [path for path in profile["files"] if "*" not in path]
|
|
write_bundle(paths, output, mode=mode, purpose_text=profile.get("purpose"), contents=contents)
|
|
return {"mode": mode, "output": str(output), "status": "OK"}
|
|
|
|
|
|
def _validate_bundle(path: Path) -> dict[str, str]:
|
|
yaml.safe_load(path.read_text(encoding="utf-8"))
|
|
return {"path": str(path), "status": "OK"}
|
|
|
|
|
|
def load_cache() -> dict[str, str]:
|
|
if not CACHE_PATH.exists():
|
|
return {}
|
|
try:
|
|
data = json.loads(CACHE_PATH.read_text(encoding="utf-8"))
|
|
except Exception:
|
|
return {}
|
|
return data if isinstance(data, dict) else {}
|
|
|
|
|
|
def save_cache(payload: dict[str, str]) -> None:
|
|
CACHE_PATH.parent.mkdir(parents=True, exist_ok=True)
|
|
CACHE_PATH.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
|
|
|
|
|
|
def main() -> int:
|
|
DIST.mkdir(exist_ok=True)
|
|
# 1) 공통 입력 로드는 순차로 고정한다.
|
|
manifest = yaml.safe_load((ROOT / "RetirementAssetPortfolio.yaml").read_text(encoding="utf-8"))
|
|
profiles = {mode: bundle_profile(manifest, mode) for mode in ("full", "compact", "ultra_compact")}
|
|
all_profile_paths = [path for profile in profiles.values() for path in profile.get("files", [])]
|
|
all_unique_paths = sorted({path for path in all_profile_paths if "*" not in path})
|
|
missing = [path for path in all_profile_paths if "*" not in path and not (ROOT / path).exists()]
|
|
if missing:
|
|
print("BUNDLE BUILD FAIL")
|
|
for path in missing:
|
|
print(f"- missing: {path}")
|
|
return 1
|
|
shared_contents = load_bundle_contents(all_unique_paths)
|
|
content_signature = compute_content_signature(shared_contents)
|
|
latest_source_mtime = max((ROOT / path).stat().st_mtime for path in all_unique_paths) if all_unique_paths else 0.0
|
|
cache = load_cache()
|
|
output_paths = [ROOT / profiles[mode]["output"] for mode in ("full", "compact", "ultra_compact")]
|
|
cache_hit = bool(cache) and all(path.exists() for path in output_paths) and min(path.stat().st_mtime for path in output_paths) >= latest_source_mtime
|
|
bundle_paths = [
|
|
ROOT / profiles["full"]["output"],
|
|
ROOT / profiles["compact"]["output"],
|
|
ROOT / profiles["ultra_compact"]["output"],
|
|
]
|
|
if cache_hit:
|
|
# 소스와 산출물 해시가 같으면 기존 번들을 재사용한다.
|
|
orchestration_steps = []
|
|
for mode, profile in profiles.items():
|
|
orchestration_steps.append({
|
|
"name": f"build_{mode}",
|
|
"callable": (lambda m=mode, p=profile: {"mode": m, "output": str(ROOT / p["output"]), "status": "CACHED"}),
|
|
})
|
|
for mode, path in zip(("full", "compact", "ultra_compact"), bundle_paths):
|
|
orchestration_steps.append({
|
|
"name": f"validate_{path.stem}",
|
|
"depends_on": [f"build_{mode}"],
|
|
"callable": (lambda p=path: {"path": str(p), "status": "OK_CACHED"}),
|
|
})
|
|
results = run_plan(orchestration_steps, label="build-bundle:cached")
|
|
else:
|
|
# 2) 서로 독립인 3개 번들 생성은 병렬, 각 생성물 검증은 해당 생성 직후 수행한다.
|
|
orchestration_steps = []
|
|
for mode, profile in profiles.items():
|
|
orchestration_steps.append({
|
|
"name": f"build_{mode}",
|
|
"callable": (lambda m=mode, p=profile, c=shared_contents: _write_bundle_job(m, p, c)),
|
|
})
|
|
for mode, path in zip(("full", "compact", "ultra_compact"), bundle_paths):
|
|
orchestration_steps.append({
|
|
"name": f"validate_{path.stem}",
|
|
"depends_on": [f"build_{mode}"],
|
|
"callable": (lambda p=path: _validate_bundle(p)),
|
|
})
|
|
results = run_plan(orchestration_steps, label="build-bundle")
|
|
save_cache({
|
|
"content_signature": content_signature,
|
|
"latest_source_mtime": latest_source_mtime,
|
|
"bundle_outputs": [str(p) for p in bundle_paths],
|
|
})
|
|
profile = runtime_profile_from_steps(
|
|
harness_name="build-bundle",
|
|
mode="bundle",
|
|
steps=results,
|
|
runtime_context={
|
|
"harness_name": "build-bundle",
|
|
"mode": "bundle",
|
|
},
|
|
file_count=len(all_profile_paths),
|
|
gate_status="OK",
|
|
)
|
|
profile["cache_hit"] = cache_hit
|
|
analysis = finalize_runtime_profile(
|
|
profile_path=RUNTIME_PROFILE,
|
|
payload=profile,
|
|
)
|
|
if analysis.get("status") == "ALERT":
|
|
print("RUNTIME_ANOMALY:", ";".join(analysis.get("anomaly_reason_codes") or []))
|
|
print("BUNDLE BUILD OK" + (" (cached)" if cache_hit else ""))
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|