데이터 게이트 검증기와 DAG 연결

This commit is contained in:
2026-06-18 01:57:19 +09:00
parent d7f9d3a944
commit 318eb87a26
8 changed files with 422 additions and 8 deletions
+92
View File
@@ -0,0 +1,92 @@
"""validate_alpha_feedback_loop_v2.py — ALPHA_FEEDBACK_LOOP_VALIDATE_V2
Temp/alpha_feedback_loop_v2.json의 구조와 데이터 게이트 상태를 검증한다.
"""
from __future__ import annotations
import argparse
import json
from pathlib import Path
from typing import Any
ROOT = Path(__file__).resolve().parents[1]
DEFAULT_INPUT = ROOT / "Temp" / "alpha_feedback_loop_v2.json"
DEFAULT_OUT = ROOT / "Temp" / "validate_alpha_feedback_loop_v2.json"
FORMULA_ID = "ALPHA_FEEDBACK_LOOP_VALIDATE_V2"
def _load(path: Path) -> Any:
if not path.exists():
return {}
try:
return json.loads(path.read_text(encoding="utf-8"))
except Exception:
return {}
def _is_dict(value: Any) -> bool:
return isinstance(value, dict)
def main() -> int:
ap = argparse.ArgumentParser()
ap.add_argument("--input", default=str(DEFAULT_INPUT))
ap.add_argument("--out", default=str(DEFAULT_OUT))
args = ap.parse_args()
input_path = Path(args.input)
input_path = input_path if input_path.is_absolute() else ROOT / input_path
out_path = Path(args.out)
out_path = out_path if out_path.is_absolute() else ROOT / out_path
payload = _load(input_path)
errors: list[str] = []
if not _is_dict(payload):
errors.append("payload must be object")
else:
if payload.get("formula_id") != "ALPHA_FEEDBACK_LOOP_V2":
errors.append("formula_id mismatch")
status = str(payload.get("status") or "")
if status not in {"DATA_INSUFFICIENT", "ANALYZED"}:
errors.append(f"status={status}")
if "cases_analyzed" not in payload or not isinstance(payload.get("cases_analyzed"), int):
errors.append("cases_analyzed must be int")
if "recommended_adjustments" not in payload or not isinstance(payload.get("recommended_adjustments"), list):
errors.append("recommended_adjustments must be list")
cases = payload.get("cases_analyzed")
recs = payload.get("recommended_adjustments")
if isinstance(cases, int):
if cases < 10 and status != "DATA_INSUFFICIENT":
errors.append("cases_analyzed < 10 requires DATA_INSUFFICIENT")
if cases >= 10 and status != "ANALYZED":
errors.append("cases_analyzed >= 10 requires ANALYZED")
if isinstance(recs, list) and status == "DATA_INSUFFICIENT" and recs:
errors.append("DATA_INSUFFICIENT must not carry recommendations")
if status == "ANALYZED":
for key in [
"active_signal_rate_pct", "active_signal_n",
"passive_signal_rate_pct", "passive_signal_n",
"combined_rate_pct", "sell_signal_rate_pct", "sell_signal_n",
"pa1_current_ratio", "pa1_thesis_sum", "pa1_antithesis_sum",
"component_analysis", "note",
]:
if key not in payload:
errors.append(f"missing field: {key}")
result = {
"formula_id": FORMULA_ID,
"gate": "PASS" if not errors else "FAIL",
"checked_file": str(Path(args.input).as_posix()),
"errors": errors,
}
out_path.write_text(json.dumps(result, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
print(json.dumps(result, ensure_ascii=False, indent=2))
return 0 if not errors else 1
if __name__ == "__main__":
raise SystemExit(main())
@@ -0,0 +1,105 @@
"""validate_operational_alpha_calibration_v2.py — OPERATIONAL_ALPHA_CALIBRATION_VALIDATE_V2
Temp/operational_alpha_calibration_v2.json의 최소 계약을 검증한다.
"""
from __future__ import annotations
import argparse
import json
from pathlib import Path
from typing import Any
ROOT = Path(__file__).resolve().parents[1]
DEFAULT_INPUT = ROOT / "Temp" / "operational_alpha_calibration_v2.json"
DEFAULT_OUT = ROOT / "Temp" / "validate_operational_alpha_calibration_v2.json"
FORMULA_ID = "OPERATIONAL_ALPHA_CALIBRATION_VALIDATE_V2"
def _load(path: Path) -> Any:
if not path.exists():
return {}
try:
return json.loads(path.read_text(encoding="utf-8"))
except Exception:
return {}
def _is_dict(value: Any) -> bool:
return isinstance(value, dict)
def main() -> int:
ap = argparse.ArgumentParser()
ap.add_argument("--input", default=str(DEFAULT_INPUT))
ap.add_argument("--out", default=str(DEFAULT_OUT))
args = ap.parse_args()
input_path = Path(args.input)
input_path = input_path if input_path.is_absolute() else ROOT / input_path
out_path = Path(args.out)
out_path = out_path if out_path.is_absolute() else ROOT / out_path
payload = _load(input_path)
errors: list[str] = []
if not _is_dict(payload):
errors.append("payload must be object")
else:
if payload.get("formula_id") != "OPERATIONAL_ALPHA_CALIBRATION_V2":
errors.append("formula_id mismatch")
if payload.get("gate") not in {"PERFORMANCE_READY", "NOT_READY"}:
errors.append(f"gate={payload.get('gate')}")
if "performance_ready" not in payload or not isinstance(payload.get("performance_ready"), bool):
errors.append("performance_ready must be bool")
if "confidence_score" not in payload or not isinstance(payload.get("confidence_score"), (int, float)):
errors.append("confidence_score must be numeric")
for key in ["metrics", "targets", "readiness_reasons"]:
if key not in payload:
errors.append(f"missing field: {key}")
metrics = payload.get("metrics")
if isinstance(metrics, dict):
for key in [
"outcome_quality_score",
"t20_operational_sample",
"t20_operational_pass_rate",
"t5_operational_sample",
"t5_operational_pass_rate",
"trade_quality_t5_score",
"value_damage_pct_avg",
]:
if key not in metrics:
errors.append(f"missing field: metrics.{key}")
targets = payload.get("targets")
if isinstance(targets, dict):
for key in [
"outcome_quality_score_min",
"t20_operational_sample_min",
"t20_operational_pass_rate_min",
"t5_operational_sample_min",
"t5_operational_pass_rate_min",
"trade_quality_t5_score_min",
"value_damage_pct_avg_max",
]:
if key not in targets:
errors.append(f"missing field: targets.{key}")
reasons = payload.get("readiness_reasons")
if isinstance(reasons, list):
if payload.get("gate") == "PERFORMANCE_READY" and reasons:
errors.append("PERFORMANCE_READY must not have readiness reasons")
if payload.get("gate") == "NOT_READY" and not reasons:
errors.append("NOT_READY must have readiness reasons")
result = {
"formula_id": FORMULA_ID,
"gate": "PASS" if not errors else "FAIL",
"checked_file": str(Path(args.input).as_posix()),
"errors": errors,
}
out_path.write_text(json.dumps(result, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
print(json.dumps(result, ensure_ascii=False, indent=2))
return 0 if not errors else 1
if __name__ == "__main__":
raise SystemExit(main())
@@ -0,0 +1,131 @@
"""validate_prediction_accuracy_harness_v2.py — PREDICTION_ACCURACY_HARNESS_VALIDATE_V2
Temp/prediction_accuracy_harness_v2.json의 기본 구조와 허용된 데이터 게이트 상태를 검증한다.
현재는 운영 T+5/T+20 표본이 부족할 수 있으므로 INSUFFICIENT_SAMPLES는 허용한다.
"""
from __future__ import annotations
import argparse
import json
from pathlib import Path
from typing import Any
ROOT = Path(__file__).resolve().parents[1]
DEFAULT_INPUT = ROOT / "Temp" / "prediction_accuracy_harness_v2.json"
DEFAULT_OUT = ROOT / "Temp" / "validate_prediction_accuracy_harness_v2.json"
FORMULA_ID = "PREDICTION_ACCURACY_HARNESS_VALIDATE_V2"
ALLOWED_CALIBRATION = {
"CALIBRATED",
"MONITOR",
"PAE_CALIBRATION_REQUIRED",
"BUY_PROPOSAL_FROZEN_RECOMMEND",
"INSUFFICIENT_SAMPLES",
}
def _load(path: Path) -> Any:
if not path.exists():
return {}
try:
return json.loads(path.read_text(encoding="utf-8"))
except Exception:
return {}
def _is_dict(value: Any) -> bool:
return isinstance(value, dict)
def _ensure_fields(payload: dict[str, Any], path: str, fields: list[str], errors: list[str]) -> None:
block = payload
if path:
for part in path.split("."):
block = block.get(part) if isinstance(block, dict) else None
if not isinstance(block, dict):
errors.append(f"{path or 'root'} must be object")
return
for field in fields:
if field not in block:
errors.append(f"missing field: {path + '.' if path else ''}{field}")
def main() -> int:
ap = argparse.ArgumentParser()
ap.add_argument("--input", default=str(DEFAULT_INPUT))
ap.add_argument("--out", default=str(DEFAULT_OUT))
args = ap.parse_args()
input_path = Path(args.input)
input_path = input_path if input_path.is_absolute() else ROOT / input_path
out_path = Path(args.out)
out_path = out_path if out_path.is_absolute() else ROOT / out_path
payload = _load(input_path)
errors: list[str] = []
if not _is_dict(payload):
errors.append("payload must be object")
else:
if payload.get("formula_id") != "PREDICTION_ACCURACY_HARNESS_V2":
errors.append("formula_id mismatch")
calibration_state = str(payload.get("calibration_state") or "")
if calibration_state not in ALLOWED_CALIBRATION:
errors.append(f"calibration_state={calibration_state}")
for key in [
"as_of_date",
"data_origin_audit",
"windows",
"evaluation_methodology",
]:
if key not in payload:
errors.append(f"missing field: {key}")
audit = payload.get("data_origin_audit")
if isinstance(audit, dict):
for key in [
"operational_sample_count",
"replay_sample_count",
"untagged_row_count",
"unrealized_outcome_row_count",
"replay_in_live_stats",
"operational_only_accuracy",
]:
if key not in audit:
errors.append(f"missing field: data_origin_audit.{key}")
for key in [
"t1_op_rate", "t1_sample", "t5_op_rate", "t5_sample",
"t20_op_rate", "t20_sample", "t20_replay_rate", "t20_replay_sample",
"t20_replay_avg_return_pct", "t20_replay_stdev_return_pct",
"window_90d_rate",
]:
if key not in payload:
errors.append(f"missing field: {key}")
windows = payload.get("windows")
if isinstance(windows, dict):
_ensure_fields(windows, "t1", ["all", "30d", "7d"], errors)
_ensure_fields(windows, "t5", ["all", "active_passive", "30d", "90d"], errors)
_ensure_fields(windows, "t20", ["operational", "operational_30d", "replay", "replay_return_dist"], errors)
else:
errors.append("windows must be object")
t5_sample = payload.get("t5_sample")
if isinstance(t5_sample, int) and t5_sample < 30 and calibration_state != "INSUFFICIENT_SAMPLES":
errors.append("t5_sample < 30 requires INSUFFICIENT_SAMPLES")
result = {
"formula_id": FORMULA_ID,
"gate": "PASS" if not errors else "FAIL",
"checked_file": str(Path(args.input).as_posix()),
"errors": errors,
}
out_path.write_text(json.dumps(result, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
print(json.dumps(result, ensure_ascii=False, indent=2))
return 0 if not errors else 1
if __name__ == "__main__":
raise SystemExit(main())