Files
QuantEngineByItz/tools/validate_no_lookahead_bias_v1.py
kjh2064 ee3e799de1 feat: 리밸런싱 엔진 V1 + GAS 버그 수정 (2026-06-13)
주요 변경:
- tools/build_rebalance_engine_v1.py: REBALANCE_ENGINE_V1 신규
  * account_snapshot 직접 합산(_build_snap_position_map) → 소수주 분리 행 병합
  * 레짐 소스 macro.REGIME_PRELIM 최우선 (GAS 와 동일)
- src/gas_adapter_parts/gdf_06_rebalance.gs: runRebalanceSheet_() 신규
  * Logger.log / getSpreadsheet_() 로 run_all 연동 수정
- src/gas_adapter_parts/gdc_01_fetch_fundamentals.gs
  * _mergePositionRecord_(): 소수주 중복 행 합산 신규
  * parseInt → parseFloat (qty, availQty)
- src/gas_adapter_parts/gdf_01_price_metrics.gs
  * 미보유 종목 SELL_READY → WATCH_EXIT_SIGNAL
- spec/41_release_dag.yaml: build_rebalance_sheet 노드 추가 (step_count 63)
- spec/51_formula_lifecycle_registry.yaml: REBALANCE_ENGINE_V1 등록

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-13 13:20:14 +09:00

198 lines
6.7 KiB
Python

"""validate_no_lookahead_bias_v1.py — spec/54: H003_ANTI_BACKFILL_LOOKAHEAD
Checks that no feature used in the current decision packet has a timestamp
that is *after* the decision_timestamp (lookahead bias). Also verifies
TIME_STOP signals don't use future hold-day counts.
formula_id: VALIDATE_NO_LOOKAHEAD_BIAS_V1
contract: spec/54_temporal_data_integrity.yaml
"""
from __future__ import annotations
import json
import sys
from datetime import datetime, timezone
from pathlib import Path
# Directory one level above the folder that contains this file.
ROOT = Path(__file__).resolve().parents[1]
# Default input for the --harness CLI option (see main()).
DEFAULT_HARNESS = ROOT / "Temp" / "computed_harness_v1.json"
# Default input for the --data CLI option (see main()).
DEFAULT_DATA = ROOT / "GatherTradingData.json"
# Where run() writes the validation result as JSON.
OUTPUT_PATH = ROOT / "Temp" / "no_lookahead_bias_v1.json"
# SLA thresholds (spec/54)
# Maximum age of the data snapshot, in hours; enforced by _check_freshness_sla().
PRICE_MAX_AGE_HOURS = 1
# NOTE(review): not referenced anywhere in the visible part of this file.
FUNDAMENTAL_MAX_AGE_DAYS = 30
# NOTE(review): not referenced anywhere in the visible part of this file.
MACRO_MAX_AGE_HOURS = 24
def _load_json(path: Path) -> dict:
if not path.exists():
return {"_missing": True, "_path": str(path)}
try:
return json.loads(path.read_text(encoding="utf-8"))
except Exception as e:
return {"_error": str(e), "_path": str(path)}
def _parse_ts(ts_str: str | None) -> datetime | None:
if not ts_str:
return None
for fmt in ("%Y-%m-%dT%H:%M:%S%z", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%d"):
try:
return datetime.strptime(str(ts_str)[:19], fmt[:len(str(ts_str)[:19])])
except ValueError:
continue
return None
def _check_lookahead(harness: dict) -> tuple[int, list[str]]:
    """Flag tickers whose feature timestamp is later than the decision time.

    The decision time is read from ``meta.generated_at`` (falling back to
    ``meta.as_of_date``); each ``per_ticker`` entry contributes its
    ``feature_timestamp`` (falling back to ``data_as_of``).

    Returns:
        ``(count, messages)``. ``(0, [])`` is also returned when the decision
        time cannot be parsed or ``per_ticker`` is not a mapping.
    """
    header = harness.get("meta") or {}
    decision_raw = header.get("generated_at") or header.get("as_of_date") or ""
    decision_at = _parse_ts(decision_raw)
    if decision_at is None:
        return 0, []

    ticker_map = harness.get("per_ticker") or {}
    if not isinstance(ticker_map, dict):
        return 0, []

    found = []
    for symbol, payload in ticker_map.items():
        if not isinstance(payload, dict):
            continue
        feature_raw = payload.get("feature_timestamp") or payload.get("data_as_of")
        feature_at = _parse_ts(feature_raw)
        if not feature_at:
            # No usable timestamp for this ticker: nothing to compare.
            continue
        if feature_at > decision_at:
            found.append(
                f"{symbol}: feature_ts={feature_raw} > decision_ts={decision_raw}"
            )
    return len(found), found
def _check_time_stop_lookahead(harness: dict, data_json: dict) -> tuple[int, list[str]]:
"""TIME_STOP: hold_days must be <= today's date diff from entry_date."""
violations = []
relative_stop = harness.get("relative_stop_signal_json")
if not relative_stop:
return 0, []
signals = relative_stop if isinstance(relative_stop, list) else []
today = datetime.now().date()
for sig in signals:
if not isinstance(sig, dict):
continue
if sig.get("signal_type") != "TIME_STOP":
continue
details = sig.get("details") or {}
hold_days = details.get("hold_days")
entry_date_str = details.get("entry_date")
if hold_days is None or entry_date_str is None:
continue
try:
entry_date = datetime.strptime(entry_date_str, "%Y-%m-%d").date()
actual_days = (today - entry_date).days
if hold_days > actual_days + 1: # +1 tolerance for intraday
violations.append(
f"{sig.get('ticker')}: TIME_STOP hold_days={hold_days} "
f"> actual_days={actual_days} (entry={entry_date_str})"
)
except (ValueError, TypeError):
continue
return len(violations), violations
def _check_freshness_sla(data_json: dict) -> list[str]:
"""Check data freshness SLA from spec/54."""
warnings = []
meta = data_json.get("meta") or {}
as_of = meta.get("as_of") or meta.get("generated_at") or ""
if not as_of:
warnings.append("DATA_SLA_SKIP: as_of timestamp not found in GatherTradingData.json")
return warnings
as_of_dt = _parse_ts(as_of)
if as_of_dt is None:
return warnings
now = datetime.now()
age_hours = (now - as_of_dt.replace(tzinfo=None)).total_seconds() / 3600
if age_hours > PRICE_MAX_AGE_HOURS:
warnings.append(
f"PRICE_DATA_STALE: age={age_hours:.1f}h > SLA={PRICE_MAX_AGE_HOURS}h"
)
return warnings
def _write_result(result: dict, output_path: Path) -> None:
    """Serialise *result* as UTF-8 JSON at *output_path*, creating parent dirs."""
    output_path.parent.mkdir(parents=True, exist_ok=True)
    # ensure_ascii=False emits raw non-ASCII characters, so the encoding must
    # be pinned; the platform default is not guaranteed to be UTF-8.
    output_path.write_text(
        json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8"
    )


def run(harness_path: Path, data_path: Path, output_path: Path | None = None) -> dict:
    """Run all temporal-integrity checks and persist the result as JSON.

    Args:
        harness_path: Path to the computed harness JSON.
        data_path: Path to the gathered trading data JSON.
        output_path: Where to write the result; defaults to ``OUTPUT_PATH``.

    Returns:
        The result dict that was written. ``gate`` is ``SKIP`` when both
        inputs are missing, ``FAIL`` when any lookahead or TIME_STOP violation
        is found, ``WARN`` when only freshness warnings exist, else ``PASS``.

    NOTE(review): when only the harness is missing or unreadable, the
    lookahead checks find nothing to inspect and the gate is still PASS/WARN;
    confirm that this is the intended policy.
    """
    destination = OUTPUT_PATH if output_path is None else output_path
    harness = _load_json(harness_path)
    data_json = _load_json(data_path)

    if harness.get("_missing") and data_json.get("_missing"):
        result = {
            "gate": "SKIP",
            "reason": "harness and data both missing — no lookahead check possible",
            "lookahead_violation_count": 0,
            "time_stop_lookahead_count": 0,
            "backfilled_after_decision_count": 0,
            "freshness_violation_tickers": [],
            "contract": "spec/54_temporal_data_integrity.yaml",
        }
        _write_result(result, destination)
        return result

    lookahead_count, lookahead_violations = _check_lookahead(harness)
    time_stop_count, time_stop_violations = _check_time_stop_lookahead(harness, data_json)
    freshness_warnings = _check_freshness_sla(data_json)

    gate = "PASS"
    if lookahead_count > 0 or time_stop_count > 0:
        gate = "FAIL"
    elif freshness_warnings:
        gate = "WARN"

    result = {
        "gate": gate,
        "lookahead_violation_count": lookahead_count,
        "lookahead_violations": lookahead_violations,
        "time_stop_lookahead_count": time_stop_count,
        "time_stop_lookahead_violations": time_stop_violations,
        # No backfill detection is implemented in this function; the count is
        # always reported as zero.
        "backfilled_after_decision_count": 0,
        "freshness_violation_tickers": freshness_warnings,
        "contract": "spec/54_temporal_data_integrity.yaml",
    }
    _write_result(result, destination)
    return result
def main() -> None:
    """CLI entry point: run the validator and report the gate on stdout.

    Options ``--harness`` and ``--data`` override the default input paths.
    Exits with status 1 when the gate is ``FAIL``; otherwise returns normally.
    """
    import argparse

    parser = argparse.ArgumentParser(description="H003 No Lookahead Bias Validator")
    parser.add_argument("--harness", default=str(DEFAULT_HARNESS))
    parser.add_argument("--data", default=str(DEFAULT_DATA))
    args = parser.parse_args()

    result = run(Path(args.harness), Path(args.data))
    gate = result.get("gate", "FAIL")
    print(
        f"[H003_NO_LOOKAHEAD_BIAS] gate={gate} "
        f"lookahead_violations={result.get('lookahead_violation_count', 0)} "
        f"time_stop_violations={result.get('time_stop_lookahead_count', 0)}"
    )
    if gate == "FAIL":
        # Report both categories; choosing one with ``or`` would hide the
        # TIME_STOP violations whenever feature-timestamp violations exist.
        violations = [
            *(result.get("lookahead_violations") or []),
            *(result.get("time_stop_lookahead_violations") or []),
        ]
        print(" Violations:", violations)
        sys.exit(1)


if __name__ == "__main__":
    main()