Files
QuantEngineByItz/tools/sync_replay_sheet_to_history.py
T
kjh2064 ee3e799de1 feat: 리밸런싱 엔진 V1 + GAS 버그 수정 (2026-06-13)
주요 변경:
- tools/build_rebalance_engine_v1.py: REBALANCE_ENGINE_V1 신규
  * account_snapshot 직접 합산(_build_snap_position_map) → 소수주 분리 행 병합
  * 레짐 소스 macro.REGIME_PRELIM 최우선 (GAS 와 동일)
- src/gas_adapter_parts/gdf_06_rebalance.gs: runRebalanceSheet_() 신규
  * Logger.log / getSpreadsheet_() 로 run_all 연동 수정
- src/gas_adapter_parts/gdc_01_fetch_fundamentals.gs
  * _mergePositionRecord_(): 소수주 중복 행 합산 신규
  * parseInt → parseFloat (qty, availQty)
- src/gas_adapter_parts/gdf_01_price_metrics.gs
  * 미보유 종목 SELL_READY → WATCH_EXIT_SIGNAL
- spec/41_release_dag.yaml: build_rebalance_sheet 노드 추가 (step_count 63)
- spec/51_formula_lifecycle_registry.yaml: REBALANCE_ENGINE_V1 등록

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-13 13:20:14 +09:00

140 lines
5.4 KiB
Python

from __future__ import annotations

import argparse
import json
import math
import sys
from datetime import datetime
from pathlib import Path
from typing import Any
# Directory one level above the folder that contains this script.
ROOT = Path(__file__).resolve().parents[1]
# Default input: the gathered trading-data JSON located directly under ROOT.
DEFAULT_JSON = ROOT.joinpath("GatherTradingData.json")
# Default output: the proposal evaluation history kept under ROOT/Temp.
DEFAULT_HISTORY = ROOT.joinpath("Temp", "proposal_evaluation_history.json")
def _load_json(path: Path) -> dict[str, Any]:
    """Read *path* as UTF-8 JSON and return it when the top level is an object.

    This is a best-effort loader: a missing or unreadable file, undecodable
    bytes, malformed JSON, or a non-object top-level value all produce an
    empty dict instead of raising. Callers therefore cannot tell a missing
    file apart from a corrupt one.

    Args:
        path: Location of the JSON document.

    Returns:
        The parsed mapping, or ``{}`` when nothing usable could be loaded.
    """
    try:
        obj = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError, RecursionError):
        # OSError covers a missing/unreadable file or a directory path.
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        # RecursionError covers pathologically deep nesting in the document.
        return {}
    return obj if isinstance(obj, dict) else {}
def _text(v: Any) -> str:
    """Return *v* as a whitespace-trimmed string.

    Every falsy value (``None``, ``""``, ``0``, ``False``, empty containers)
    is mapped to the empty string.
    """
    if not v:
        return ""
    return str(v).strip()
def _to_float(v: Any) -> float | None:
    """Convert *v* to a finite float, or return ``None`` when that fails.

    ``None``, the empty string, values ``float()`` cannot convert, and
    non-finite results (NaN, +/-infinity) all yield ``None``. Non-finite
    numbers are rejected because they would propagate into the averages of
    the summary and would be serialised by ``json.dumps`` as ``NaN`` /
    ``Infinity`` tokens, which are not valid JSON.

    Args:
        v: Raw value, typically a number or numeric string from parsed JSON.

    Returns:
        The finite float value, or ``None``.
    """
    try:
        # The comparison stays inside the try so that objects with an
        # unusual ``__eq__`` still fall back to None instead of raising.
        if v is None or v == "":
            return None
        number = float(v)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: integers too large to be represented as a float.
        return None
    return number if math.isfinite(number) else None
def _summarize(records: list[dict[str, Any]]) -> dict[str, Any]:
    """Aggregate match statistics over *records* for the history summary.

    The top-level counters describe records whose ``evaluation_status`` is
    exactly ``"EVALUATED_T1"``. The ``t5_horizon`` and ``t20_horizon``
    sub-dicts describe records whose horizon status starts with
    ``"EVALUATED_"`` and additionally carry the mean of the numeric return
    values. ``last_updated`` is the current local time in ISO format with
    second precision.
    """

    def _rate(hits: int, total: int) -> float | None:
        # Percentage rounded to two decimals; None for an empty sample.
        return round((hits / total) * 100, 2) if total else None

    def _count(rows: list[dict[str, Any]], key: str, label: str) -> int:
        # Number of rows whose *key* field equals *label*.
        return sum(1 for row in rows if row.get(key) == label)

    def _horizon(status_key: str, out_key: str, ret_key: str) -> dict[str, Any]:
        evaluated = []
        for row in records:
            status = str(row.get(status_key) or "")
            if status.startswith("EVALUATED_"):
                evaluated.append(row)

        # Only genuinely numeric returns take part in the average.
        returns = []
        for row in evaluated:
            value = row.get(ret_key)
            if isinstance(value, (int, float)):
                returns.append(value)

        total = len(evaluated)
        hits = _count(evaluated, out_key, "MATCHED")
        average = round(sum(returns) / len(returns), 2) if returns else None
        return {
            "evaluated_count": total,
            "matched_count": hits,
            "mismatched_count": _count(evaluated, out_key, "MISMATCHED"),
            "match_rate_pct": _rate(hits, total),
            "avg_return_pct": average,
        }

    t1_rows = [row for row in records if row.get("evaluation_status") == "EVALUATED_T1"]
    t1_hits = _count(t1_rows, "outcome", "MATCHED")

    summary: dict[str, Any] = {
        "evaluated_count": len(t1_rows),
        "matched_count": t1_hits,
        "mismatched_count": _count(t1_rows, "outcome", "MISMATCHED"),
        "match_rate_pct": _rate(t1_hits, len(t1_rows)),
    }
    summary["t5_horizon"] = _horizon("t5_evaluation_status", "t5_outcome", "t5_return_pct")
    summary["t20_horizon"] = _horizon("t20_evaluation_status", "t20_outcome", "t20_return_pct")
    summary["last_updated"] = datetime.now().isoformat(timespec="seconds")
    return summary
def _replay_row_to_record(pid: str, row: dict[str, Any]) -> dict[str, Any]:
    """Map one replay back-fill row onto the history record layout."""
    # NOTE(review): the T1/T5/T20 statuses are stamped unconditionally, so a
    # row lacking T5/T20 data is still counted as evaluated by the horizon
    # statistics in _summarize -- confirm this is intended.
    return {
        "proposal_id": pid,
        "record_type": _text(row.get("record_type") or "HISTORICAL_REPLAY_EOD"),
        "data_origin": _text(row.get("data_origin") or "REPLAY_FROM_XLSX"),
        "proposal_date": _text(row.get("proposal_date")),
        "ticker": _text(row.get("ticker")),
        "name": _text(row.get("name")),
        "action": _text(row.get("action")),
        "order_type": _text(row.get("order_type")),
        "validation_status": _text(row.get("validation_status") or "REPLAY_BACKFILL"),
        "expected_direction": _text(row.get("expected_direction") or "NEUTRAL"),
        "proposed_close": _to_float(row.get("proposed_close")),
        "proposed_limit_price": None,
        "proposed_quantity": None,
        "rule_basis": "REPLAY_BACKFILL_XLSX_PERSISTENT",
        "evaluation_status": "EVALUATED_T1",
        "result_date": _text(row.get("result_date")),
        "result_close": _to_float(row.get("result_close")),
        "next_return_pct": _to_float(row.get("next_return_pct")),
        "outcome": _text(row.get("outcome")),
        "error_cause": _text(row.get("error_cause") or "REPLAY_BACKFILL"),
        "improvement_proposal": _text(
            row.get("improvement_proposal") or "REPLAY_ONLY_DO_NOT_AUTO_ADOPT"
        ),
        "t5_evaluation_status": "EVALUATED_T5",
        "t5_result_date": _text(row.get("t5_result_date")),
        "t5_return_pct": _to_float(row.get("t5_return_pct")),
        "t5_outcome": _text(row.get("t5_outcome")),
        "t20_evaluation_status": "EVALUATED_T20",
        "t20_result_date": _text(row.get("t20_result_date")),
        "t20_return_pct": _to_float(row.get("t20_return_pct")),
        "t20_outcome": _text(row.get("t20_outcome")),
    }


def main() -> int:
    """Merge replay back-fill rows into the proposal evaluation history file.

    Reads ``data.replay_eod_backfill`` from the ``--json`` file, appends every
    row whose ``proposal_id`` is non-empty and not yet present in the
    ``--history`` file, re-sorts the records by (proposal_date, ticker,
    proposal_id), recomputes the summary and rewrites the history file.
    Relative ``--json`` / ``--history`` paths are resolved against ROOT.

    Non-dict entries found in the history's ``records`` list are dropped
    (they would otherwise raise AttributeError during sorting and
    summarising); a warning is written to stderr when that happens.

    Returns:
        Always 0, used as the process exit status.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--json", default=str(DEFAULT_JSON))
    ap.add_argument("--history", default=str(DEFAULT_HISTORY))
    args = ap.parse_args()

    jp = Path(args.json)
    hp = Path(args.history)
    if not jp.is_absolute():
        jp = ROOT / jp
    if not hp.is_absolute():
        hp = ROOT / hp

    payload = _load_json(jp)
    data = payload.get("data") if isinstance(payload.get("data"), dict) else {}
    replay = data.get("replay_eod_backfill") if isinstance(data.get("replay_eod_backfill"), list) else []
    replay = [r for r in replay if isinstance(r, dict)]

    hist = _load_json(hp)
    raw_records = hist.get("records") if isinstance(hist.get("records"), list) else []
    # Keep only dict records, mirroring the filter applied to the replay rows.
    records = [r for r in raw_records if isinstance(r, dict)]
    dropped = len(raw_records) - len(records)
    if dropped:
        print(f"REPLAY_SYNC_WARN dropped_non_dict_records={dropped}", file=sys.stderr)

    existing = {_text(r.get("proposal_id")) for r in records}
    added = 0
    for r in replay:
        pid = _text(r.get("proposal_id"))
        if not pid or pid in existing:
            continue
        records.append(_replay_row_to_record(pid, r))
        existing.add(pid)
        added += 1

    records.sort(
        key=lambda x: (
            _text(x.get("proposal_date")),
            _text(x.get("ticker")),
            _text(x.get("proposal_id")),
        )
    )
    hist["schema_version"] = "2026-05-25-proposal-evaluation-v4-replay-sync"
    hist["records"] = records
    hist["summary"] = _summarize(records)

    hp.parent.mkdir(parents=True, exist_ok=True)
    hp.write_text(json.dumps(hist, ensure_ascii=False, indent=2), encoding="utf-8")
    print(f"REPLAY_SYNC_OK added={added} total={len(records)}")
    return 0
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    exit_status = main()
    raise SystemExit(exit_status)