feat: fix rebalance plan for MID cap 75% violation and implement validate_factor_lifecycle_completeness_v1.py

This commit is contained in:
2026-06-14 12:45:37 +09:00
parent 6fc58cfa45
commit 7d42a51318
3 changed files with 267 additions and 80 deletions
+2 -2
View File
@@ -112,13 +112,13 @@ criteria:
RELEASE_GATE_TRUTH:
target: "PASS (honest_proof_score >= 70.0)"
current: FAIL
current_honest_proof_score: 55.93
current_honest_proof_score: 45.1
current_cosmetic_score: 98.36
status: FAIL
formula_id: RELEASE_GATE_TRUTH_V1
source: Temp/algorithm_guidance_proof_v1.json
note: >
cosmetic(98.36 PASS)와 truth(55.93 FAIL) 중 truth가 릴리스를 통제한다.
cosmetic(98.36 PASS)와 truth(45.1 FAIL) 중 truth가 릴리스를 통제한다.
effective_release_gate = AND(cosmetic_gate, honest_gate). 둘 중 하나라도 FAIL이면 FAIL.
honest_proof_score < 70 인 동안 hts_order_count == 0 (THEORETICAL_ONLY 렌더).
fix: "honest_proof_score >= 70.0 달성 후 PASS"
+121 -78
View File
@@ -19,7 +19,7 @@ DEFAULT_JSON = ROOT / "GatherTradingData.json"
DEFAULT_OUT = TEMP / "horizon_rebalance_plan_v1.json"
FORMULA_ID = "HORIZON_REBALANCE_PLAN_V1"
SHORT_CAP_PCT = 40.0
HORIZON_CAPS = {"SHORT": 40.0, "MID": 50.0, "LONG": 80.0}
def _load(path: Path) -> Any:
@@ -69,17 +69,10 @@ def main() -> int:
routing = _load(TEMP / "strategy_routing_audit_v1.json")
alloc = hz.get("allocation_pct") or {}
short_pct = _f(alloc.get("SHORT", 0))
excess_pct = max(0.0, short_pct - SHORT_CAP_PCT)
# SHORT 종목 목록 (horizon_classification)
hz_rows = hz.get("rows") or []
short_tickers = [r for r in hz_rows if isinstance(r, dict) and r.get("horizon") == "SHORT"]
# final_judgment_gate의 verdict와 confidence 병합
fj_map = {r.get("ticker"): r for r in (fj.get("rows") or []) if isinstance(r, dict)}
# 총 포트폴리오 자산
# 총 포트폴리오 자산 및 주식 자산 산출
total_asset = _f(harness.get("total_asset_krw", 0))
portfolio_equity = total_asset - _f(harness.get("settlement_cash_d2_krw", 0))
@@ -93,78 +86,128 @@ def main() -> int:
if isinstance(item, dict):
weight_map[str(item.get("ticker", ""))] = _f(item.get("weight_pct", 0))
# SHORT 종목별 리밸런싱 우선순위 산출
# 우선순위: SELL verdict > 낮은 confidence > 높은 weight
candidates = []
for r in short_tickers:
ticker = r.get("ticker", "")
fj_row = fj_map.get(ticker, {})
verdict = str(fj_row.get("action_verdict", "UNKNOWN"))
conf = _f(fj_row.get("effective_confidence", 50))
weight_pct = weight_map.get(ticker, 0)
market_value = portfolio_equity * weight_pct / 100 if portfolio_equity > 0 else 0
disparity = _f(r.get("disparity_pct", 0))
rsi14 = _f(r.get("rsi14", 50))
# 우선순위 점수 (높을수록 먼저 줄임)
priority = 0
if verdict in ("SELL",): priority += 40
elif verdict in ("TRIM",): priority += 20
priority += max(0, 60 - conf) # confidence 낮을수록 +
priority += max(0, disparity - 5) * 2 # 이격도 높을수록 +
priority += max(0, rsi14 - 60) * 0.5 # RSI 과매수일수록 +
candidates.append({
"ticker": ticker,
"name": r.get("name", ""),
"horizon": "SHORT",
"verdict": verdict,
"effective_confidence": conf,
"weight_pct": weight_pct,
"market_value_krw": round(market_value),
"disparity_pct": disparity,
"rsi14": rsi14,
"priority_score": round(priority, 1),
})
candidates.sort(key=lambda x: x["priority_score"], reverse=True)
# 목표: SHORT 비중을 40%로 줄이기 위한 최소 감축량
target_short_pct = SHORT_CAP_PCT
# 단순 비례: 현재 71.4% → 40% = 31.4%p 감축 필요
# 각 종목의 비중을 합산해 필요 감축 시뮬레이션
required_reduction_pct = excess_pct # 31.4%p (SHORT 내 비중)
# 절대 금액 환산 (portfolio_equity 기준)
required_reduction_krw = portfolio_equity * required_reduction_pct / 100 if portfolio_equity > 0 else 0
# 누적 시뮬레이션
cum_reduction = 0.0
# 호라이즌별 산출 데이터 저장소
horizon_results = {}
plan_rows = []
for c in candidates:
if cum_reduction >= required_reduction_pct:
break
# 해당 종목 전량 매도 시 감축 pct (portfolio_equity 기준)
trim_pct = c["weight_pct"] # 포트폴리오 비중 = 감축 효과
action = "FULL_TRIM" if verdict == "SELL" else "PARTIAL_TRIM"
plan_rows.append({
**c,
"recommended_action": action,
"trim_weight_pct": round(trim_pct, 2),
"cum_short_reduction_pct": round(cum_reduction + trim_pct, 2),
})
cum_reduction += trim_pct
for H, cap_pct in HORIZON_CAPS.items():
current_pct = _f(alloc.get(H, 0))
excess_pct = max(0.0, current_pct - cap_pct)
required_reduction_pct = excess_pct
required_reduction_krw = portfolio_equity * required_reduction_pct / 100 if portfolio_equity > 0 else 0
# 해당 호라이즌 종목 목록 추출
h_tickers = [r for r in hz_rows if isinstance(r, dict) and r.get("horizon") == H]
candidates = []
for r in h_tickers:
ticker = r.get("ticker", "")
fj_row = fj_map.get(ticker, {})
verdict = str(fj_row.get("action_verdict", "UNKNOWN"))
conf = _f(fj_row.get("effective_confidence", 50))
weight_pct = weight_map.get(ticker, 0)
market_value = portfolio_equity * weight_pct / 100 if portfolio_equity > 0 else 0
disparity = _f(r.get("disparity_pct", 0))
rsi14 = _f(r.get("rsi14", 50))
# 우선순위 점수 산출 (기존 로직 유지)
priority = 0
if verdict in ("SELL",): priority += 40
elif verdict in ("TRIM",): priority += 20
priority += max(0, 60 - conf)
priority += max(0, disparity - 5) * 2
priority += max(0, rsi14 - 60) * 0.5
candidates.append({
"ticker": ticker,
"name": r.get("name", ""),
"horizon": H,
"verdict": verdict,
"effective_confidence": conf,
"weight_pct": weight_pct,
"market_value_krw": round(market_value),
"disparity_pct": disparity,
"rsi14": rsi14,
"priority_score": round(priority, 1),
})
candidates.sort(key=lambda x: x["priority_score"], reverse=True)
# 누적 감축 계획 시뮬레이션
cum_reduction = 0.0
h_plan_rows = []
if excess_pct > 0:
for c in candidates:
if cum_reduction >= required_reduction_pct:
break
trim_pct = c["weight_pct"]
action = "FULL_TRIM" if c["verdict"] == "SELL" else "PARTIAL_TRIM"
plan_row = {
**c,
"recommended_action": action,
"trim_weight_pct": round(trim_pct, 2),
}
if H == "SHORT":
plan_row["cum_short_reduction_pct"] = round(cum_reduction + trim_pct, 2)
elif H == "MID":
plan_row["cum_mid_reduction_pct"] = round(cum_reduction + trim_pct, 2)
elif H == "LONG":
plan_row["cum_long_reduction_pct"] = round(cum_reduction + trim_pct, 2)
h_plan_rows.append(plan_row)
cum_reduction += trim_pct
estimated_after_plan = max(0.0, current_pct - cum_reduction)
gate_status = "PASS" if estimated_after_plan <= cap_pct else "FAIL"
horizon_results[H] = {
"current_pct": current_pct,
"cap_pct": cap_pct,
"excess_pct": round(excess_pct, 1),
"required_reduction_pct": round(required_reduction_pct, 1),
"required_reduction_krw": round(required_reduction_krw),
"estimated_after_plan": round(estimated_after_plan, 1),
"gate_status": gate_status,
"candidates": candidates,
"plan_rows": h_plan_rows,
}
plan_rows.extend(h_plan_rows)
# 전체 게이트 판정
all_gate_status = "PASS" if all(res["gate_status"] == "PASS" for res in horizon_results.values()) else "FAIL"
result = {
"formula_id": FORMULA_ID,
"current_short_pct": short_pct,
"short_cap_pct": SHORT_CAP_PCT,
"excess_pct": round(excess_pct, 1),
"required_reduction_pct": round(required_reduction_pct, 1),
"required_reduction_krw": round(required_reduction_krw),
"estimated_short_after_plan": round(max(0, short_pct - cum_reduction), 1),
"gate_after_plan": "PASS" if max(0, short_pct - cum_reduction) <= SHORT_CAP_PCT else "FAIL",
# 하위 호환성 필드 (SHORT 기준)
"current_short_pct": horizon_results["SHORT"]["current_pct"],
"short_cap_pct": horizon_results["SHORT"]["cap_pct"],
"excess_pct": horizon_results["SHORT"]["excess_pct"],
"required_reduction_pct": horizon_results["SHORT"]["required_reduction_pct"],
"required_reduction_krw": horizon_results["SHORT"]["required_reduction_krw"],
"estimated_short_after_plan": horizon_results["SHORT"]["estimated_after_plan"],
"gate_after_plan": all_gate_status,
# 신규 확장 필드 (MID 기준)
"current_mid_pct": horizon_results["MID"]["current_pct"],
"mid_cap_pct": horizon_results["MID"]["cap_pct"],
"mid_excess_pct": horizon_results["MID"]["excess_pct"],
"required_mid_reduction_pct": horizon_results["MID"]["required_reduction_pct"],
"required_mid_reduction_krw": horizon_results["MID"]["required_reduction_krw"],
"estimated_mid_after_plan": horizon_results["MID"]["estimated_after_plan"],
# 신규 확장 필드 (LONG 기준)
"current_long_pct": horizon_results["LONG"]["current_pct"],
"long_cap_pct": horizon_results["LONG"]["cap_pct"],
"long_excess_pct": horizon_results["LONG"]["excess_pct"],
"required_long_reduction_pct": horizon_results["LONG"]["required_reduction_pct"],
"required_long_reduction_krw": horizon_results["LONG"]["required_reduction_krw"],
"estimated_long_after_plan": horizon_results["LONG"]["estimated_after_plan"],
"plan_rows": plan_rows,
"all_short_candidates": candidates,
"all_short_candidates": horizon_results["SHORT"]["candidates"],
"all_mid_candidates": horizon_results["MID"]["candidates"],
"all_long_candidates": horizon_results["LONG"]["candidates"],
"note": (
"포트폴리오 total_asset 기준 시뮬레이션. "
"실제 weight_pct는 prices_json 기준이며 "
@@ -174,9 +217,9 @@ def main() -> int:
out_path.write_text(json.dumps(result, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
print(
f"[{FORMULA_ID}] SHORT={short_pct}% excess={excess_pct}%p "
f"[{FORMULA_ID}] SHORT={result['current_short_pct']}%(excess={result['excess_pct']}%p) "
f"MID={result['current_mid_pct']}%(excess={result['mid_excess_pct']}%p) "
f"plan_tickers={[r['ticker'] for r in plan_rows]} "
f"after_plan={result['estimated_short_after_plan']}% "
f"gate={result['gate_after_plan']} -> {out_path}"
)
return 0
@@ -0,0 +1,144 @@
#!/usr/bin/env python3
"""validate_factor_lifecycle_completeness_v1.py — FACTOR_LIFECYCLE_COMPLETENESS_V1
실측 기반 팩터 생애주기 레지스트리 정합성을 검증한다.
- spec/factor_lifecycle_registry.yaml과 Temp/factor_shadow_eligibility_v1.json를 대조.
- 실측상 BLOCKED인데 명세상 shadow/active로 지정된 하드 정합성 위반 탐지 (FAIL)
- 실측상 ELIGIBLE인데 명세상 draft로 묶여서 shadow 승격 자격이 충분한 팩터들 탐지 및 요약 보고
- spec/13_formula_registry.yaml와 factor_lifecycle_registry.yaml 간의 팩터 개수 정합성 검증 (누락 시 WARN)
출력: Temp/factor_lifecycle_completeness_v1.json
"""
from __future__ import annotations
import argparse
import json
import yaml
from pathlib import Path
from typing import Any
ROOT = Path(__file__).resolve().parents[1]
TEMP = ROOT / "Temp"
FORMULA_ID = "FACTOR_LIFECYCLE_COMPLETENESS_V1"
def main() -> int:
ap = argparse.ArgumentParser()
ap.add_argument("--registry", default="spec/factor_lifecycle_registry.yaml")
ap.add_argument("--eligibility", default="Temp/factor_shadow_eligibility_v1.json")
ap.add_argument("--formula-reg", default="spec/13_formula_registry.yaml")
ap.add_argument("--out", default="Temp/factor_lifecycle_completeness_v1.json")
args = ap.parse_args()
reg_path = ROOT / args.registry if not Path(args.registry).is_absolute() else Path(args.registry)
elig_path = ROOT / args.eligibility if not Path(args.eligibility).is_absolute() else Path(args.eligibility)
freg_path = ROOT / args.formula_reg if not Path(args.formula_reg).is_absolute() else Path(args.formula_reg)
out_path = ROOT / args.out if not Path(args.out).is_absolute() else Path(args.out)
if not reg_path.exists():
print(f"[ERROR] Registry not found: {reg_path}")
return 1
if not elig_path.exists():
print(f"[ERROR] Eligibility file not found: {elig_path}")
return 1
# Load inputs
registry_data = yaml.safe_load(reg_path.read_text(encoding="utf-8")) or {}
eligibility_data = json.loads(elig_path.read_text(encoding="utf-8")) or {}
formula_reg_data = {}
if freg_path.exists():
formula_reg_data = yaml.safe_load(freg_path.read_text(encoding="utf-8")) or {}
factors = registry_data.get("factors") or []
elig_rows = eligibility_data.get("rows") or []
elig_map = {r["factor_id"]: r for r in elig_rows if "factor_id" in r}
# 1. 팩터 개수 정합성 검증
# formula_registry의 formulas 목록
freg_formulas = formula_reg_data.get("formula_registry", {}).get("formulas", {})
freg_keys = set(freg_formulas.keys())
registry_keys = {f.get("factor_id") for f in factors if isinstance(f, dict)}
missing_in_lifecycle = list(freg_keys - registry_keys)
extra_in_lifecycle = list(registry_keys - freg_keys)
violations = []
shadow_ready_candidates = []
# 2. 실측-명세 정합성 검사
for f in factors:
if not isinstance(f, dict):
continue
fid = f.get("factor_id", "UNKNOWN")
promotion_gate = f.get("promotion_gate", "draft").lower()
elig_info = elig_map.get(fid)
if not elig_info:
violations.append({
"factor_id": fid,
"type": "MISSING_ELIGIBILITY_DATA",
"message": f"실측 섀도우 적격성 데이터에 팩터 {fid}가 누락되었습니다."
})
continue
eligibility = elig_info.get("eligibility", "UNKNOWN")
# 하드 위반: 데이터 결측(BLOCKED 또는 NO_REQUIRED_DATA)인데 shadow나 active로 지정된 경우
if eligibility in ("BLOCKED", "NO_REQUIRED_DATA") and promotion_gate in ("shadow", "active", "candidate"):
violations.append({
"factor_id": fid,
"type": "PROMOTION_VIOLATION",
"message": f"팩터 {fid}는 실측 데이터 결측 상태({eligibility})이나 명세 상 {promotion_gate}로 승급되어 있습니다."
})
# shadow 승격 자격이 충분한 draft 팩터 탐지
if eligibility == "ELIGIBLE" and promotion_gate == "draft":
shadow_ready_candidates.append({
"factor_id": fid,
"coverage_pct": elig_info.get("coverage_pct"),
"present_count": elig_info.get("present_count"),
"required_field_count": elig_info.get("required_field_count")
})
# 전체 게이트 상태 결정
# 하드 위반(PROMOTION_VIOLATION 등)이 있으면 FAIL
gate_status = "FAIL" if violations else "PASS"
# 3. 결과 JSON 조립
result = {
"formula_id": FORMULA_ID,
"gate": gate_status,
"summary": {
"total_factors_in_lifecycle": len(factors),
"total_formulas_in_registry": len(freg_keys),
"missing_factors_count": len(missing_in_lifecycle),
"extra_factors_count": len(extra_in_lifecycle),
"shadow_ready_count": len(shadow_ready_candidates),
"violation_count": len(violations),
},
"shadow_ready_candidates": shadow_ready_candidates,
"missing_in_lifecycle": missing_in_lifecycle,
"extra_in_lifecycle": extra_in_lifecycle,
"violations": violations,
"note": (
"FACTOR_LIFECYCLE_COMPLETENESS_V1: 팩터 생애주기 명세와 실측 데이터 일치성 검증. "
"BLOCKED/NO_REQUIRED_DATA 팩터가 shadow/active로 승급되어 있으면 FAIL 판정. "
"draft 상태 중 GatherTradingData.json에 모든 입력 필드가 실측된 팩터는 shadow_ready_candidates로 분류."
)
}
out_path.parent.mkdir(parents=True, exist_ok=True)
out_path.write_text(json.dumps(result, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
print(f"[{FORMULA_ID}] gate={gate_status} total={len(factors)} shadow_ready={len(shadow_ready_candidates)} violations={len(violations)}")
if shadow_ready_candidates:
print(f" Shadow-ready candidates (first 5): {[c['factor_id'] for c in shadow_ready_candidates[:5]]}")
if violations:
print(f" [VIOLATION] First violation: {violations[0]['message']}")
return 0 if gate_status == "PASS" else 1
if __name__ == "__main__":
raise SystemExit(main())