Files
QuantEngineByItz/tools/run_release_dag_v2.py
T
kjh2064 ee3e799de1 feat: 리밸런싱 엔진 V1 + GAS 버그 수정 (2026-06-13)
주요 변경:
- tools/build_rebalance_engine_v1.py: REBALANCE_ENGINE_V1 신규
  * account_snapshot 직접 합산(_build_snap_position_map) → 소수주 분리 행 병합
  * 레짐 소스 macro.REGIME_PRELIM 최우선 (GAS 와 동일)
- src/gas_adapter_parts/gdf_06_rebalance.gs: runRebalanceSheet_() 신규
  * Logger.log / getSpreadsheet_() 로 run_all 연동 수정
- src/gas_adapter_parts/gdc_01_fetch_fundamentals.gs
  * _mergePositionRecord_(): 소수주 중복 행 합산 신규
  * parseInt → parseFloat (qty, availQty)
- src/gas_adapter_parts/gdf_01_price_metrics.gs
  * 미보유 종목 SELL_READY → WATCH_EXIT_SIGNAL
- spec/41_release_dag.yaml: build_rebalance_sheet 노드 추가 (step_count 63)
- spec/51_formula_lifecycle_registry.yaml: REBALANCE_ENGINE_V1 등록

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-13 13:20:14 +09:00

181 lines
5.6 KiB
Python

#!/usr/bin/env python3
import argparse
import sys
import json
import time
import subprocess
import hashlib
from pathlib import Path
import yaml
ROOT = Path(__file__).resolve().parents[1]
LINEAGE_LOG = ROOT / "runtime" / "lineage_events.jsonl"
REPORT = ROOT / "Temp" / "release_dag_run_v1.json"
def file_sha256(path: Path) -> str:
if not path.exists():
return ""
h = hashlib.sha256()
try:
with path.open("rb") as f:
for chunk in iter(lambda: f.read(65536), b""):
h.update(chunk)
return h.hexdigest()
except Exception:
return ""
def compute_combined_hash(paths: list[str]) -> str:
hashes = []
for p in paths:
path = ROOT / p
h = file_sha256(path)
if h:
hashes.append(h)
if not hashes:
return ""
return hashlib.sha256("".join(hashes).encode("utf-8")).hexdigest()
def main() -> int:
parser = argparse.ArgumentParser()
parser.add_argument("--dag", default="spec/41_release_dag.yaml")
parser.add_argument("--mode", choices=["release", "full"], default="release")
args = parser.parse_args()
dag_path = ROOT / args.dag
if not dag_path.exists():
print(f"DAG file not found: {dag_path}")
return 1
try:
data = yaml.safe_load(dag_path.read_text(encoding="utf-8"))
except Exception as e:
print(f"Error parsing YAML: {e}")
return 1
nodes = data["dag"]["nodes"]
# Topological sort
visited = set()
temp = set()
order = []
def visit(nid):
if nid in temp:
raise ValueError("Cycle detected")
if nid not in visited:
temp.add(nid)
for dep in nodes[nid].get("depends_on") or []:
if dep in nodes:
visit(dep)
temp.remove(nid)
visited.add(nid)
order.append(nid)
for nid in nodes:
if nid not in visited:
try:
visit(nid)
except ValueError:
print("Cycle detected during topological sorting")
return 1
# In "release" mode, we might skip build nodes or keep only validation nodes.
# But to match run_release_dag_v1 behavior:
# "release" runs validation commands.
# "full" runs audits + validation + builds.
# Let's filter depending on the mode.
# If mode == "release", we only run validate_ nodes (or we run everything that doesn't start with build_).
# Actually, let's look at what run_release_dag_v1 did:
# release runs: validate_specs, validate_active_manifest, validate_report_packet_sync_v1, validate_field_dictionary, validate_number_provenance_strict_v3, validate_low_capability_pack_v1, validate_golden_coverage_100, validate_calibration_registry_v1, validate_schema_model_generation_v1, validate_gas_thin_adapter_v1, validate_agents_shrink_v1, validate_no_replay_live_mix_v1, validate_renderer_no_calculation_v1, validate_release_dag_v1.
# So release mode only runs nodes whose ID starts with "validate_".
# full mode runs everything.
steps_run = []
success = True
LINEAGE_LOG.parent.mkdir(parents=True, exist_ok=True)
for nid in order:
node = nodes[nid]
if args.mode == "release" and not nid.startswith("validate_"):
continue
cmd = list(node["command"])
# If the command starts with python, use sys.executable
if cmd and cmd[0] == "python":
cmd[0] = sys.executable
print(f"Running node: {nid} ...")
start_time = time.time()
# Compute input hash before running
input_hash = compute_combined_hash(node.get("inputs") or [])
proc = subprocess.run(
cmd,
cwd=ROOT,
capture_output=True,
text=True,
encoding="utf-8",
errors="replace"
)
elapsed = round(time.time() - start_time, 3)
# Compute output hash after running
output_hash = compute_combined_hash(node.get("outputs") or [])
gate = "PASS" if proc.returncode == 0 else "FAIL"
# Log to lineage_events.jsonl
event = {
"node_id": nid,
"command": " ".join(cmd),
"returncode": proc.returncode,
"elapsed_sec": elapsed,
"gate": gate,
"input_hash": input_hash,
"output_hash": output_hash,
"timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
}
with LINEAGE_LOG.open("a", encoding="utf-8") as lf:
lf.write(json.dumps(event, ensure_ascii=False) + "\n")
steps_run.append({
"command": " ".join(cmd),
"returncode": proc.returncode,
"stdout": proc.stdout[-4000:],
"stderr": proc.stderr[-4000:]
})
if proc.returncode != 0:
print(f"Node {nid} failed with returncode {proc.returncode}")
print(proc.stderr)
success = False
if node.get("strict", True):
break
# Save release_dag_run_v1.json
REPORT.parent.mkdir(parents=True, exist_ok=True)
REPORT.write_text(json.dumps({
"formula_id": "RELEASE_DAG_RUN_V1",
"mode": args.mode,
"steps": steps_run
}, ensure_ascii=False, indent=2), encoding="utf-8")
print(json.dumps({
"formula_id": "RELEASE_DAG_RUN_V1",
"mode": args.mode,
"step_count": len(steps_run),
"gate": "PASS" if success else "FAIL"
}, ensure_ascii=True, indent=2))
return 0 if success else 1
if __name__ == "__main__":
sys.exit(main())