From 7d42a51318a81a3577953fac37819f87fe9d3dcf Mon Sep 17 00:00:00 2001 From: kjh2064 Date: Sun, 14 Jun 2026 12:45:37 +0900 Subject: [PATCH] feat: fix rebalance plan for MID cap 75% violation and implement validate_factor_lifecycle_completeness_v1.py --- spec/30_completion_criteria_contract.yaml | 4 +- tools/build_horizon_rebalance_plan_v1.py | 199 +++++++++++------- ...lidate_factor_lifecycle_completeness_v1.py | 144 +++++++++++++ 3 files changed, 267 insertions(+), 80 deletions(-) create mode 100644 tools/validate_factor_lifecycle_completeness_v1.py diff --git a/spec/30_completion_criteria_contract.yaml b/spec/30_completion_criteria_contract.yaml index e8ed7aa..39b0f1f 100644 --- a/spec/30_completion_criteria_contract.yaml +++ b/spec/30_completion_criteria_contract.yaml @@ -112,13 +112,13 @@ criteria: RELEASE_GATE_TRUTH: target: "PASS (honest_proof_score >= 70.0)" current: FAIL - current_honest_proof_score: 55.93 + current_honest_proof_score: 45.1 current_cosmetic_score: 98.36 status: FAIL formula_id: RELEASE_GATE_TRUTH_V1 source: Temp/algorithm_guidance_proof_v1.json note: > - cosmetic(98.36 PASS)와 truth(55.93 FAIL) 중 truth가 릴리스를 통제한다. + cosmetic(98.36 PASS)와 truth(45.1 FAIL) 중 truth가 릴리스를 통제한다. effective_release_gate = AND(cosmetic_gate, honest_gate). 둘 중 하나라도 FAIL이면 FAIL. honest_proof_score < 70 인 동안 hts_order_count == 0 (THEORETICAL_ONLY 렌더). fix: "honest_proof_score >= 70.0 달성 후 PASS" diff --git a/tools/build_horizon_rebalance_plan_v1.py b/tools/build_horizon_rebalance_plan_v1.py index 0397b05..a891ab6 100644 --- a/tools/build_horizon_rebalance_plan_v1.py +++ b/tools/build_horizon_rebalance_plan_v1.py @@ -19,7 +19,7 @@ DEFAULT_JSON = ROOT / "GatherTradingData.json" DEFAULT_OUT = TEMP / "horizon_rebalance_plan_v1.json" FORMULA_ID = "HORIZON_REBALANCE_PLAN_V1" -SHORT_CAP_PCT = 40.0 +HORIZON_CAPS = {"SHORT": 40.0, "MID": 50.0, "LONG": 80.0} def _load(path: Path) -> Any: @@ -69,17 +69,10 @@ def main() -> int: routing = _load(TEMP / "strategy_routing_audit_v1.json") alloc = hz.get("allocation_pct") or {} - short_pct = _f(alloc.get("SHORT", 0)) - excess_pct = max(0.0, short_pct - SHORT_CAP_PCT) - - # SHORT 종목 목록 (horizon_classification) hz_rows = hz.get("rows") or [] - short_tickers = [r for r in hz_rows if isinstance(r, dict) and r.get("horizon") == "SHORT"] - - # final_judgment_gate의 verdict와 confidence 병합 fj_map = {r.get("ticker"): r for r in (fj.get("rows") or []) if isinstance(r, dict)} - # 총 포트폴리오 자산 + # 총 포트폴리오 자산 및 주식 자산 산출 total_asset = _f(harness.get("total_asset_krw", 0)) portfolio_equity = total_asset - _f(harness.get("settlement_cash_d2_krw", 0)) @@ -93,78 +86,128 @@ def main() -> int: if isinstance(item, dict): weight_map[str(item.get("ticker", ""))] = _f(item.get("weight_pct", 0)) - # SHORT 종목별 리밸런싱 우선순위 산출 - # 우선순위: SELL verdict > 낮은 confidence > 높은 weight - candidates = [] - for r in short_tickers: - ticker = r.get("ticker", "") - fj_row = fj_map.get(ticker, {}) - verdict = str(fj_row.get("action_verdict", "UNKNOWN")) - conf = _f(fj_row.get("effective_confidence", 50)) - weight_pct = weight_map.get(ticker, 0) - market_value = portfolio_equity * weight_pct / 100 if portfolio_equity > 0 else 0 - disparity = _f(r.get("disparity_pct", 0)) - rsi14 = _f(r.get("rsi14", 50)) - - # 우선순위 점수 (높을수록 먼저 줄임) - priority = 0 - if verdict in ("SELL",): priority += 40 - elif verdict in ("TRIM",): priority += 20 - priority += max(0, 60 - conf) # confidence 낮을수록 + - priority += max(0, disparity - 5) * 2 # 이격도 높을수록 + - priority += max(0, rsi14 - 60) * 0.5 # RSI 과매수일수록 + - - candidates.append({ - "ticker": ticker, - "name": r.get("name", ""), - "horizon": "SHORT", - "verdict": verdict, - "effective_confidence": conf, - "weight_pct": weight_pct, - "market_value_krw": round(market_value), - "disparity_pct": disparity, - "rsi14": rsi14, - "priority_score": round(priority, 1), - }) - - candidates.sort(key=lambda x: x["priority_score"], reverse=True) - - # 목표: SHORT 비중을 40%로 줄이기 위한 최소 감축량 - target_short_pct = SHORT_CAP_PCT - # 단순 비례: 현재 71.4% → 40% = 31.4%p 감축 필요 - # 각 종목의 비중을 합산해 필요 감축 시뮬레이션 - required_reduction_pct = excess_pct # 31.4%p (SHORT 내 비중) - # 절대 금액 환산 (portfolio_equity 기준) - required_reduction_krw = portfolio_equity * required_reduction_pct / 100 if portfolio_equity > 0 else 0 - - # 누적 시뮬레이션 - cum_reduction = 0.0 + # 호라이즌별 산출 데이터 저장소 + horizon_results = {} plan_rows = [] - for c in candidates: - if cum_reduction >= required_reduction_pct: - break - # 해당 종목 전량 매도 시 감축 pct (portfolio_equity 기준) - trim_pct = c["weight_pct"] # 포트폴리오 비중 = 감축 효과 - action = "FULL_TRIM" if verdict == "SELL" else "PARTIAL_TRIM" - plan_rows.append({ - **c, - "recommended_action": action, - "trim_weight_pct": round(trim_pct, 2), - "cum_short_reduction_pct": round(cum_reduction + trim_pct, 2), - }) - cum_reduction += trim_pct + + for H, cap_pct in HORIZON_CAPS.items(): + current_pct = _f(alloc.get(H, 0)) + excess_pct = max(0.0, current_pct - cap_pct) + required_reduction_pct = excess_pct + required_reduction_krw = portfolio_equity * required_reduction_pct / 100 if portfolio_equity > 0 else 0 + + # 해당 호라이즌 종목 목록 추출 + h_tickers = [r for r in hz_rows if isinstance(r, dict) and r.get("horizon") == H] + + candidates = [] + for r in h_tickers: + ticker = r.get("ticker", "") + fj_row = fj_map.get(ticker, {}) + verdict = str(fj_row.get("action_verdict", "UNKNOWN")) + conf = _f(fj_row.get("effective_confidence", 50)) + weight_pct = weight_map.get(ticker, 0) + market_value = portfolio_equity * weight_pct / 100 if portfolio_equity > 0 else 0 + disparity = _f(r.get("disparity_pct", 0)) + rsi14 = _f(r.get("rsi14", 50)) + + # 우선순위 점수 산출 (기존 로직 유지) + priority = 0 + if verdict in ("SELL",): priority += 40 + elif verdict in ("TRIM",): priority += 20 + priority += max(0, 60 - conf) + priority += max(0, disparity - 5) * 2 + priority += max(0, rsi14 - 60) * 0.5 + + candidates.append({ + "ticker": ticker, + "name": r.get("name", ""), + "horizon": H, + "verdict": verdict, + "effective_confidence": conf, + "weight_pct": weight_pct, + "market_value_krw": round(market_value), + "disparity_pct": disparity, + "rsi14": rsi14, + "priority_score": round(priority, 1), + }) + + candidates.sort(key=lambda x: x["priority_score"], reverse=True) + + # 누적 감축 계획 시뮬레이션 + cum_reduction = 0.0 + h_plan_rows = [] + if excess_pct > 0: + for c in candidates: + if cum_reduction >= required_reduction_pct: + break + trim_pct = c["weight_pct"] + action = "FULL_TRIM" if c["verdict"] == "SELL" else "PARTIAL_TRIM" + plan_row = { + **c, + "recommended_action": action, + "trim_weight_pct": round(trim_pct, 2), + } + if H == "SHORT": + plan_row["cum_short_reduction_pct"] = round(cum_reduction + trim_pct, 2) + elif H == "MID": + plan_row["cum_mid_reduction_pct"] = round(cum_reduction + trim_pct, 2) + elif H == "LONG": + plan_row["cum_long_reduction_pct"] = round(cum_reduction + trim_pct, 2) + + h_plan_rows.append(plan_row) + cum_reduction += trim_pct + + estimated_after_plan = max(0.0, current_pct - cum_reduction) + gate_status = "PASS" if estimated_after_plan <= cap_pct else "FAIL" + + horizon_results[H] = { + "current_pct": current_pct, + "cap_pct": cap_pct, + "excess_pct": round(excess_pct, 1), + "required_reduction_pct": round(required_reduction_pct, 1), + "required_reduction_krw": round(required_reduction_krw), + "estimated_after_plan": round(estimated_after_plan, 1), + "gate_status": gate_status, + "candidates": candidates, + "plan_rows": h_plan_rows, + } + plan_rows.extend(h_plan_rows) + + # 전체 게이트 판정 + all_gate_status = "PASS" if all(res["gate_status"] == "PASS" for res in horizon_results.values()) else "FAIL" result = { "formula_id": FORMULA_ID, - "current_short_pct": short_pct, - "short_cap_pct": SHORT_CAP_PCT, - "excess_pct": round(excess_pct, 1), - "required_reduction_pct": round(required_reduction_pct, 1), - "required_reduction_krw": round(required_reduction_krw), - "estimated_short_after_plan": round(max(0, short_pct - cum_reduction), 1), - "gate_after_plan": "PASS" if max(0, short_pct - cum_reduction) <= SHORT_CAP_PCT else "FAIL", + + # 하위 호환성 필드 (SHORT 기준) + "current_short_pct": horizon_results["SHORT"]["current_pct"], + "short_cap_pct": horizon_results["SHORT"]["cap_pct"], + "excess_pct": horizon_results["SHORT"]["excess_pct"], + "required_reduction_pct": horizon_results["SHORT"]["required_reduction_pct"], + "required_reduction_krw": horizon_results["SHORT"]["required_reduction_krw"], + "estimated_short_after_plan": horizon_results["SHORT"]["estimated_after_plan"], + "gate_after_plan": all_gate_status, + + # 신규 확장 필드 (MID 기준) + "current_mid_pct": horizon_results["MID"]["current_pct"], + "mid_cap_pct": horizon_results["MID"]["cap_pct"], + "mid_excess_pct": horizon_results["MID"]["excess_pct"], + "required_mid_reduction_pct": horizon_results["MID"]["required_reduction_pct"], + "required_mid_reduction_krw": horizon_results["MID"]["required_reduction_krw"], + "estimated_mid_after_plan": horizon_results["MID"]["estimated_after_plan"], + + # 신규 확장 필드 (LONG 기준) + "current_long_pct": horizon_results["LONG"]["current_pct"], + "long_cap_pct": horizon_results["LONG"]["cap_pct"], + "long_excess_pct": horizon_results["LONG"]["excess_pct"], + "required_long_reduction_pct": horizon_results["LONG"]["required_reduction_pct"], + "required_long_reduction_krw": horizon_results["LONG"]["required_reduction_krw"], + "estimated_long_after_plan": horizon_results["LONG"]["estimated_after_plan"], + "plan_rows": plan_rows, - "all_short_candidates": candidates, + "all_short_candidates": horizon_results["SHORT"]["candidates"], + "all_mid_candidates": horizon_results["MID"]["candidates"], + "all_long_candidates": horizon_results["LONG"]["candidates"], "note": ( "포트폴리오 total_asset 기준 시뮬레이션. " "실제 weight_pct는 prices_json 기준이며 " @@ -174,9 +217,9 @@ def main() -> int: out_path.write_text(json.dumps(result, ensure_ascii=False, indent=2) + "\n", encoding="utf-8") print( - f"[{FORMULA_ID}] SHORT={short_pct}% excess={excess_pct}%p " + f"[{FORMULA_ID}] SHORT={result['current_short_pct']}%(excess={result['excess_pct']}%p) " + f"MID={result['current_mid_pct']}%(excess={result['mid_excess_pct']}%p) " f"plan_tickers={[r['ticker'] for r in plan_rows]} " - f"after_plan={result['estimated_short_after_plan']}% " f"gate={result['gate_after_plan']} -> {out_path}" ) return 0 diff --git a/tools/validate_factor_lifecycle_completeness_v1.py b/tools/validate_factor_lifecycle_completeness_v1.py new file mode 100644 index 0000000..27841ce --- /dev/null +++ b/tools/validate_factor_lifecycle_completeness_v1.py @@ -0,0 +1,144 @@ +#!/usr/bin/env python3 +"""validate_factor_lifecycle_completeness_v1.py — FACTOR_LIFECYCLE_COMPLETENESS_V1 + +실측 기반 팩터 생애주기 레지스트리 정합성을 검증한다. +- spec/factor_lifecycle_registry.yaml과 Temp/factor_shadow_eligibility_v1.json를 대조. +- 실측상 BLOCKED인데 명세상 shadow/active로 지정된 하드 정합성 위반 탐지 (FAIL) +- 실측상 ELIGIBLE인데 명세상 draft로 묶여서 shadow 승격 자격이 충분한 팩터들 탐지 및 요약 보고 +- spec/13_formula_registry.yaml와 factor_lifecycle_registry.yaml 간의 팩터 개수 정합성 검증 (누락 시 WARN) + +출력: Temp/factor_lifecycle_completeness_v1.json +""" +from __future__ import annotations + +import argparse +import json +import yaml +from pathlib import Path +from typing import Any + +ROOT = Path(__file__).resolve().parents[1] +TEMP = ROOT / "Temp" +FORMULA_ID = "FACTOR_LIFECYCLE_COMPLETENESS_V1" + + +def main() -> int: + ap = argparse.ArgumentParser() + ap.add_argument("--registry", default="spec/factor_lifecycle_registry.yaml") + ap.add_argument("--eligibility", default="Temp/factor_shadow_eligibility_v1.json") + ap.add_argument("--formula-reg", default="spec/13_formula_registry.yaml") + ap.add_argument("--out", default="Temp/factor_lifecycle_completeness_v1.json") + args = ap.parse_args() + + reg_path = ROOT / args.registry if not Path(args.registry).is_absolute() else Path(args.registry) + elig_path = ROOT / args.eligibility if not Path(args.eligibility).is_absolute() else Path(args.eligibility) + freg_path = ROOT / args.formula_reg if not Path(args.formula_reg).is_absolute() else Path(args.formula_reg) + out_path = ROOT / args.out if not Path(args.out).is_absolute() else Path(args.out) + + if not reg_path.exists(): + print(f"[ERROR] Registry not found: {reg_path}") + return 1 + if not elig_path.exists(): + print(f"[ERROR] Eligibility file not found: {elig_path}") + return 1 + + # Load inputs + registry_data = yaml.safe_load(reg_path.read_text(encoding="utf-8")) or {} + eligibility_data = json.loads(elig_path.read_text(encoding="utf-8")) or {} + + formula_reg_data = {} + if freg_path.exists(): + formula_reg_data = yaml.safe_load(freg_path.read_text(encoding="utf-8")) or {} + + factors = registry_data.get("factors") or [] + elig_rows = eligibility_data.get("rows") or [] + elig_map = {r["factor_id"]: r for r in elig_rows if "factor_id" in r} + + # 1. 팩터 개수 정합성 검증 + # formula_registry의 formulas 목록 + freg_formulas = formula_reg_data.get("formula_registry", {}).get("formulas", {}) + freg_keys = set(freg_formulas.keys()) + registry_keys = {f.get("factor_id") for f in factors if isinstance(f, dict)} + + missing_in_lifecycle = list(freg_keys - registry_keys) + extra_in_lifecycle = list(registry_keys - freg_keys) + + violations = [] + shadow_ready_candidates = [] + + # 2. 실측-명세 정합성 검사 + for f in factors: + if not isinstance(f, dict): + continue + fid = f.get("factor_id", "UNKNOWN") + promotion_gate = f.get("promotion_gate", "draft").lower() + + elig_info = elig_map.get(fid) + if not elig_info: + violations.append({ + "factor_id": fid, + "type": "MISSING_ELIGIBILITY_DATA", + "message": f"실측 섀도우 적격성 데이터에 팩터 {fid}가 누락되었습니다." + }) + continue + + eligibility = elig_info.get("eligibility", "UNKNOWN") + + # 하드 위반: 데이터 결측(BLOCKED 또는 NO_REQUIRED_DATA)인데 shadow나 active로 지정된 경우 + if eligibility in ("BLOCKED", "NO_REQUIRED_DATA") and promotion_gate in ("shadow", "active", "candidate"): + violations.append({ + "factor_id": fid, + "type": "PROMOTION_VIOLATION", + "message": f"팩터 {fid}는 실측 데이터 결측 상태({eligibility})이나 명세 상 {promotion_gate}로 승급되어 있습니다." + }) + + # shadow 승격 자격이 충분한 draft 팩터 탐지 + if eligibility == "ELIGIBLE" and promotion_gate == "draft": + shadow_ready_candidates.append({ + "factor_id": fid, + "coverage_pct": elig_info.get("coverage_pct"), + "present_count": elig_info.get("present_count"), + "required_field_count": elig_info.get("required_field_count") + }) + + # 전체 게이트 상태 결정 + # 하드 위반(PROMOTION_VIOLATION 등)이 있으면 FAIL + gate_status = "FAIL" if violations else "PASS" + + # 3. 결과 JSON 조립 + result = { + "formula_id": FORMULA_ID, + "gate": gate_status, + "summary": { + "total_factors_in_lifecycle": len(factors), + "total_formulas_in_registry": len(freg_keys), + "missing_factors_count": len(missing_in_lifecycle), + "extra_factors_count": len(extra_in_lifecycle), + "shadow_ready_count": len(shadow_ready_candidates), + "violation_count": len(violations), + }, + "shadow_ready_candidates": shadow_ready_candidates, + "missing_in_lifecycle": missing_in_lifecycle, + "extra_in_lifecycle": extra_in_lifecycle, + "violations": violations, + "note": ( + "FACTOR_LIFECYCLE_COMPLETENESS_V1: 팩터 생애주기 명세와 실측 데이터 일치성 검증. " + "BLOCKED/NO_REQUIRED_DATA 팩터가 shadow/active로 승급되어 있으면 FAIL 판정. " + "draft 상태 중 GatherTradingData.json에 모든 입력 필드가 실측된 팩터는 shadow_ready_candidates로 분류." + ) + } + + out_path.parent.mkdir(parents=True, exist_ok=True) + out_path.write_text(json.dumps(result, ensure_ascii=False, indent=2) + "\n", encoding="utf-8") + + print(f"[{FORMULA_ID}] gate={gate_status} total={len(factors)} shadow_ready={len(shadow_ready_candidates)} violations={len(violations)}") + if shadow_ready_candidates: + print(f" Shadow-ready candidates (first 5): {[c['factor_id'] for c in shadow_ready_candidates[:5]]}") + if violations: + print(f" [VIOLATION] First violation: {violations[0]['message']}") + + return 0 if gate_status == "PASS" else 1 + + +if __name__ == "__main__": + raise SystemExit(main())