# NOTE: the lines below are repository-viewer residue (last-commit banner and
# file-size header) captured together with the file; they are not module code.
#
# Commit: ee3e799de1
# Main changes:
# - tools/build_rebalance_engine_v1.py: new REBALANCE_ENGINE_V1
#   * sums account_snapshot directly (_build_snap_position_map) -> merges rows
#     that were split out for fractional shares
#   * regime source: macro.REGIME_PRELIM takes top priority (same as GAS)
# - src/gas_adapter_parts/gdf_06_rebalance.gs: new runRebalanceSheet_()
#   * fixed run_all integration via Logger.log / getSpreadsheet_()
# - src/gas_adapter_parts/gdc_01_fetch_fundamentals.gs
#   * _mergePositionRecord_(): new, sums duplicate rows for fractional shares
#   * parseInt -> parseFloat (qty, availQty)
# - src/gas_adapter_parts/gdf_01_price_metrics.gs
#   * tickers not held: SELL_READY -> WATCH_EXIT_SIGNAL
# - spec/41_release_dag.yaml: added build_rebalance_sheet node (step_count 63)
# - spec/51_formula_lifecycle_registry.yaml: registered REBALANCE_ENGINE_V1
# Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
#
# File info: 198 lines, 6.7 KiB, Python
"""validate_no_lookahead_bias_v1.py — spec/54: H003_ANTI_BACKFILL_LOOKAHEAD

Checks that no feature used in the current decision packet has a timestamp
that is *after* the decision_timestamp (lookahead bias). Also verifies that
TIME_STOP signals don't use future hold-day counts.

formula_id: VALIDATE_NO_LOOKAHEAD_BIAS_V1
contract: spec/54_temporal_data_integrity.yaml
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
# Directory one level above the one containing this file.
ROOT = Path(__file__).resolve().parents[1]

# Default input locations (overridable from the command line) and the fixed
# location of the JSON report written by run().
DEFAULT_HARNESS = ROOT.joinpath("Temp", "computed_harness_v1.json")
DEFAULT_DATA = ROOT.joinpath("GatherTradingData.json")
OUTPUT_PATH = ROOT.joinpath("Temp", "no_lookahead_bias_v1.json")

# SLA thresholds (spec/54)
PRICE_MAX_AGE_HOURS = 1  # compared against the data file's as_of age
FUNDAMENTAL_MAX_AGE_DAYS = 30  # declared here; not referenced by the checks in this file
MACRO_MAX_AGE_HOURS = 24  # declared here; not referenced by the checks in this file
|
|
|
|
|
|
def _load_json(path: Path) -> dict:
|
|
if not path.exists():
|
|
return {"_missing": True, "_path": str(path)}
|
|
try:
|
|
return json.loads(path.read_text(encoding="utf-8"))
|
|
except Exception as e:
|
|
return {"_error": str(e), "_path": str(path)}
|
|
|
|
|
|
def _parse_ts(ts_str: str | None) -> datetime | None:
|
|
if not ts_str:
|
|
return None
|
|
for fmt in ("%Y-%m-%dT%H:%M:%S%z", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%d"):
|
|
try:
|
|
return datetime.strptime(str(ts_str)[:19], fmt[:len(str(ts_str)[:19])])
|
|
except ValueError:
|
|
continue
|
|
return None
|
|
|
|
|
|
def _check_lookahead(harness: dict) -> tuple[int, list[str]]:
    """Find per-ticker feature timestamps that are later than the decision time.

    The decision time is ``meta.generated_at``, falling back to
    ``meta.as_of_date``. Each ``per_ticker`` entry contributes its
    ``feature_timestamp``, falling back to ``data_as_of``.

    Returns:
        ``(count, messages)``. The result is ``(0, [])`` when the decision
        time is absent or unparseable, or when ``per_ticker`` is not a dict.
    """
    meta_block = harness.get("meta") or {}
    decision_raw = meta_block.get("generated_at") or meta_block.get("as_of_date") or ""
    decision_at = _parse_ts(decision_raw)
    if decision_at is None:
        return 0, []

    ticker_table = harness.get("per_ticker") or {}
    if not isinstance(ticker_table, dict):
        return 0, []

    offenders = []
    for symbol, payload in ticker_table.items():
        if not isinstance(payload, dict):
            continue
        feature_raw = payload.get("feature_timestamp") or payload.get("data_as_of")
        feature_at = _parse_ts(feature_raw)
        # Entries without a parseable timestamp are not flagged.
        if not feature_at:
            continue
        if feature_at > decision_at:
            offenders.append(
                f"{symbol}: feature_ts={feature_raw} > decision_ts={decision_raw}"
            )

    return len(offenders), offenders
|
|
|
|
|
|
def _check_time_stop_lookahead(harness: dict, data_json: dict) -> tuple[int, list[str]]:
|
|
"""TIME_STOP: hold_days must be <= today's date diff from entry_date."""
|
|
violations = []
|
|
relative_stop = harness.get("relative_stop_signal_json")
|
|
if not relative_stop:
|
|
return 0, []
|
|
|
|
signals = relative_stop if isinstance(relative_stop, list) else []
|
|
today = datetime.now().date()
|
|
|
|
for sig in signals:
|
|
if not isinstance(sig, dict):
|
|
continue
|
|
if sig.get("signal_type") != "TIME_STOP":
|
|
continue
|
|
details = sig.get("details") or {}
|
|
hold_days = details.get("hold_days")
|
|
entry_date_str = details.get("entry_date")
|
|
if hold_days is None or entry_date_str is None:
|
|
continue
|
|
try:
|
|
entry_date = datetime.strptime(entry_date_str, "%Y-%m-%d").date()
|
|
actual_days = (today - entry_date).days
|
|
if hold_days > actual_days + 1: # +1 tolerance for intraday
|
|
violations.append(
|
|
f"{sig.get('ticker')}: TIME_STOP hold_days={hold_days} "
|
|
f"> actual_days={actual_days} (entry={entry_date_str})"
|
|
)
|
|
except (ValueError, TypeError):
|
|
continue
|
|
|
|
return len(violations), violations
|
|
|
|
|
|
def _check_freshness_sla(data_json: dict) -> list[str]:
|
|
"""Check data freshness SLA from spec/54."""
|
|
warnings = []
|
|
meta = data_json.get("meta") or {}
|
|
as_of = meta.get("as_of") or meta.get("generated_at") or ""
|
|
if not as_of:
|
|
warnings.append("DATA_SLA_SKIP: as_of timestamp not found in GatherTradingData.json")
|
|
return warnings
|
|
|
|
as_of_dt = _parse_ts(as_of)
|
|
if as_of_dt is None:
|
|
return warnings
|
|
|
|
now = datetime.now()
|
|
age_hours = (now - as_of_dt.replace(tzinfo=None)).total_seconds() / 3600
|
|
|
|
if age_hours > PRICE_MAX_AGE_HOURS:
|
|
warnings.append(
|
|
f"PRICE_DATA_STALE: age={age_hours:.1f}h > SLA={PRICE_MAX_AGE_HOURS}h"
|
|
)
|
|
|
|
return warnings
|
|
|
|
|
|
def run(harness_path: Path, data_path: Path) -> dict:
    """Run the H003 checks, write the JSON report, and return it.

    Args:
        harness_path: Computed-harness JSON file.
        data_path: Gathered trading-data JSON file.

    Returns:
        The report dict that was written to ``OUTPUT_PATH``. ``gate`` is:

        * ``"SKIP"`` when both input files are missing,
        * ``"FAIL"`` when any lookahead or TIME_STOP violation is found,
        * ``"WARN"`` when there are only freshness warnings,
        * ``"PASS"`` otherwise.
    """
    harness = _load_json(harness_path)
    data_json = _load_json(data_path)

    if harness.get("_missing") and data_json.get("_missing"):
        result = {
            "gate": "SKIP",
            "reason": "harness and data both missing — no lookahead check possible",
            "lookahead_violation_count": 0,
            "time_stop_lookahead_count": 0,
            "backfilled_after_decision_count": 0,
            "freshness_violation_tickers": [],
            "contract": "spec/54_temporal_data_integrity.yaml",
        }
    else:
        lookahead_count, lookahead_violations = _check_lookahead(harness)
        time_stop_count, time_stop_violations = _check_time_stop_lookahead(harness, data_json)
        freshness_warnings = _check_freshness_sla(data_json)

        # Violations outrank freshness warnings.
        gate = "PASS"
        if lookahead_count > 0 or time_stop_count > 0:
            gate = "FAIL"
        elif freshness_warnings:
            gate = "WARN"

        result = {
            "gate": gate,
            "lookahead_violation_count": lookahead_count,
            "lookahead_violations": lookahead_violations,
            "time_stop_lookahead_count": time_stop_count,
            "time_stop_lookahead_violations": time_stop_violations,
            # No backfill check is performed in this function; always 0.
            "backfilled_after_decision_count": 0,
            "freshness_violation_tickers": freshness_warnings,
            "contract": "spec/54_temporal_data_integrity.yaml",
        }

    OUTPUT_PATH.parent.mkdir(parents=True, exist_ok=True)
    # ensure_ascii=False emits raw non-ASCII characters (the SKIP reason has
    # an em dash), so the encoding must be explicit instead of depending on
    # the platform's locale codec.
    OUTPUT_PATH.write_text(
        json.dumps(result, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )
    return result
|
|
|
|
|
|
def main() -> None:
    """Command-line entry point: run the validator and print a summary.

    Accepts ``--harness`` and ``--data`` path overrides. Exits with status 1
    when the gate is ``FAIL``, after printing every lookahead and TIME_STOP
    violation.
    """
    import argparse

    parser = argparse.ArgumentParser(description="H003 No Lookahead Bias Validator")
    parser.add_argument("--harness", default=str(DEFAULT_HARNESS))
    parser.add_argument("--data", default=str(DEFAULT_DATA))
    args = parser.parse_args()

    result = run(Path(args.harness), Path(args.data))
    gate = result.get("gate", "FAIL")
    print(f"[H003_NO_LOOKAHEAD_BIAS] gate={gate} "
          f"lookahead_violations={result.get('lookahead_violation_count', 0)} "
          f"time_stop_violations={result.get('time_stop_lookahead_count', 0)}")
    if gate == "FAIL":
        # Report both kinds; picking one with `or` would hide the TIME_STOP
        # violations whenever lookahead violations are also present.
        reported = list(result.get("lookahead_violations") or [])
        reported.extend(result.get("time_stop_lookahead_violations") or [])
        print(" Violations:", reported)
        sys.exit(1)
|
|
|
|
|
|
# Run the validator only when this file is executed as a script.
if __name__ == "__main__":
    main()
|