"""build_fundamental_multifactor_v4.py — FUNDAMENTAL_MULTIFACTOR_V4

P1-011: changes relative to v3
  - Uses fundamental_raw_v2 (the revision whose data_quality labels are based
    on field_coverage)
  - Applies missing_penalty: -10 points per missing core field
    (OCF/FCF: -5 points each)
  - Reports raw_coverage_pct as field-level weighted coverage
  - conflict_gap_pct: makes explicit the gap between the engine_audit score and
    the data_quality schema score
  - long_horizon_buy_allowed: False when OCF/FCF is 20% or more unmet

NOTE(review): as implemented below, long_horizon_buy_allowed is False only when
both ocf_krw and fcf_krw are missing for a row, and the only missing-field
penalties are the two 5-point OCF/FCF penalties. Confirm the wording above
against the intended rule.
"""
from __future__ import annotations

import argparse
import json
import math
from collections import Counter
from datetime import datetime, timezone
from pathlib import Path

from v7_hardening_common import ROOT, TEMP, load_json, save_json

# NOTE(review): the constant name and the module docstring say "raw_v2", but
# the default file name is "fundamental_raw_v1.json" — confirm which is meant.
DEFAULT_RAW_V2 = TEMP / "fundamental_raw_v1.json"
DEFAULT_OUT = TEMP / "fundamental_multifactor_v4.json"

# Field score table (100 points in total).
# NOTE(review): the scoring functions below can award at most
# 25 (ROE) + 20 (OPM) + 15 (cash flow) + 10 (debt) + 7 (valuation) = 77 points,
# i.e. less than this table implies for ocf/fcf/per/pbr — confirm intended.
_FIELD_SCORES = {
    "roe_pct": 25,
    "opm_pct": 20,
    "ocf_krw": 15,
    "fcf_krw": 15,
    "net_debt_krw": 10,
    "per": 8,
    "pbr": 7,
}
_TOTAL = sum(_FIELD_SCORES.values())  # 100

# (floor, points): the first entry whose floor is <= the value wins.
_ROE_THRESHOLDS = [(15, 25), (10, 20), (5, 15), (0, 8)]
_OPM_THRESHOLDS = [(15, 20), (8, 15), (3, 10), (0, 5)]
# (ceiling, points): the first entry whose ceiling is >= the ratio wins.
_DEBT_THRESHOLDS = [(50, 10), (100, 7), (150, 4), (200, 0)]


def _score_by_floor(value: float | None, thresholds: list[tuple[float, float]]) -> float:
    """Return the points of the first ``(floor, points)`` entry with ``value >= floor``.

    Returns 0.0 when *value* is None or lies below every floor.
    """
    if value is None:
        return 0.0
    for floor, points in thresholds:
        if value >= floor:
            return float(points)
    return 0.0


def _score_roe(v: float | None) -> float:
    """Score ROE (percent) against ``_ROE_THRESHOLDS``; 0.0 if missing or negative."""
    return _score_by_floor(v, _ROE_THRESHOLDS)


def _score_opm(v: float | None) -> float:
    """Score operating margin (percent) against ``_OPM_THRESHOLDS``; 0.0 if missing or negative."""
    return _score_by_floor(v, _OPM_THRESHOLDS)


def _score_cf(ocf: float | None, fcf: float | None, revenue: float | None) -> float:
    """Score operating and free cash flow.

    OCF earns 7.5 / 5 / 2.5 points depending on OCF as a percentage of revenue
    (>= 10, >= 5, otherwise); when revenue is missing or not positive, a
    present OCF earns the full 7.5. FCF earns 7.5 when positive, else 2.5.
    Returns 0.0 when both OCF and FCF are None.
    """
    if ocf is None and fcf is None:
        return 0.0

    points = 0.0
    if ocf is not None:
        if revenue and revenue > 0:
            ocf_margin_pct = ocf / revenue * 100
            if ocf_margin_pct >= 10:
                points += 7.5
            elif ocf_margin_pct >= 5:
                points += 5
            else:
                points += 2.5
        else:
            # No usable revenue to normalise against: presence alone scores full.
            points += 7.5
    if fcf is not None:
        points += 7.5 if fcf > 0 else 2.5

    # The 30-point cap is never binding here (the maximum above is 15.0).
    return min(points, 30.0)


def _score_debt(net_debt: float | None, revenue: float | None) -> float:
    """Score net debt: 10.0 for net cash, otherwise by net-debt-to-revenue percent.

    Returns 0.0 when net debt is missing, when revenue is missing or not
    positive, or when the ratio exceeds every ceiling in ``_DEBT_THRESHOLDS``.
    """
    if net_debt is None:
        return 0.0
    if net_debt <= 0:
        return 10.0
    if revenue and revenue > 0:
        debt_to_revenue_pct = net_debt / revenue * 100
        for ceiling, points in _DEBT_THRESHOLDS:
            if debt_to_revenue_pct <= ceiling:
                return float(points)
    return 0.0


def _score_val(per: float | None, pbr: float | None) -> float:
    """Score valuation: PER < 15 -> 4, < 25 -> 2; PBR < 1.5 -> 3, < 3 -> 2; else 0."""
    points = 0.0
    if per is not None:
        if per < 15:
            points += 4
        elif per < 25:
            points += 2
    if pbr is not None:
        if pbr < 1.5:
            points += 3
        elif pbr < 3:
            points += 2
    return points


# Quality multiplier applied to the raw score, keyed by the row's data_quality label.
_QUALITY_MULTIPLIER = {
    "FULL_ADVANCED": 1.0,
    "FULL": 1.0,
    "PARTIAL": 0.85,
    "SPARSE": 0.70,
    "MISSING": 0.0,
    "ETF_EXCLUDED": None,
}

# missing_penalty: extra penalty when OCF / FCF is entirely absent.
_MISSING_PENALTY_OCF = 5.0
_MISSING_PENALTY_FCF = 5.0


def _score_ticker(row: dict) -> dict:
    """Score one input row and return its score, grade and buy flags.

    Rows labelled ``ETF_EXCLUDED`` are not scored (score None, grade "ETF").
    For every other row the raw score is the sum of the five sub-scores,
    multiplied by the data_quality multiplier (unknown or missing labels use
    0.0), minus the OCF/FCF missing penalties, floored at 0.
    """
    if row.get("data_quality") == "ETF_EXCLUDED":
        return {
            "score": None,
            "grade": "ETF",
            "long_horizon_buy_allowed": False,
            "missing_penalty": 0.0,
            "missing_fields": [],
            "buy_allowed": False,
        }

    ocf = row.get("ocf_krw")
    fcf = row.get("fcf_krw")
    revenue = row.get("revenue_krw")

    raw_score = (
        _score_roe(row.get("roe_pct"))
        + _score_opm(row.get("opm_pct"))
        + _score_cf(ocf, fcf, revenue)
        + _score_debt(row.get("net_debt_krw"), revenue)
        + _score_val(row.get("per"), row.get("pbr"))
    )

    # missing_penalty
    missing_fields = []
    penalty = 0.0
    if ocf is None:
        missing_fields.append("ocf_krw")
        penalty += _MISSING_PENALTY_OCF
    if fcf is None:
        missing_fields.append("fcf_krw")
        penalty += _MISSING_PENALTY_FCF

    mult = _QUALITY_MULTIPLIER.get(row.get("data_quality") or "MISSING", 0.0)
    if mult is None:
        # Defensive only: ETF_EXCLUDED (the sole None entry) returned above.
        mult = 0.0
    adjusted_score = max(0.0, raw_score * mult - penalty)

    # NOTE(review): the sub-scores sum to at most 77, so the ">= 80" branch
    # cannot currently be reached — confirm the intended scale.
    if adjusted_score >= 80:
        grade = "A"
    elif adjusted_score >= 65:
        grade = "B"
    elif adjusted_score >= 50:
        grade = "C"
    elif adjusted_score >= 35:
        grade = "D"
    else:
        grade = "F"

    # Long-horizon buying is blocked only when OCF and FCF are both missing.
    long_buy_ok = not (ocf is None and fcf is None)
    buy_allowed = grade in {"A", "B"} and long_buy_ok

    return {
        "score": round(adjusted_score, 2),
        "raw_score": round(raw_score, 2),
        "missing_penalty": round(penalty, 2),
        "missing_fields": missing_fields,
        "grade": grade,
        "long_horizon_buy_allowed": long_buy_ok,
        "buy_allowed": buy_allowed,
    }


def _read_coverage_pct(raw: dict) -> float:
    """Return ``raw_field_coverage_pct``, else ``coverage_pct``, else 0.0.

    A key counts as missing only when it is absent, None or an empty string,
    so an explicit coverage of 0 is reported as 0.0 instead of falling through
    to the next key.
    """
    for key in ("raw_field_coverage_pct", "coverage_pct"):
        value = raw.get(key)
        if value is not None and value != "":
            return float(value)
    return 0.0


def main() -> int:
    """Score every row of the raw fundamentals file and write the v4 report.

    Reads the JSON given by ``--raw-v2``, scores each entry of its ``rows``
    list, computes coverage / gate summary fields, saves the result to
    ``--out`` and prints the summary (everything except ``rows``) as JSON.
    Always returns 0.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--raw-v2", default=str(DEFAULT_RAW_V2))
    ap.add_argument("--out", default=str(DEFAULT_OUT))
    args = ap.parse_args()

    loaded = load_json(Path(args.raw_v2))
    # Anything that is not a JSON object is treated as an empty payload, so the
    # row list and the coverage lookup degrade the same way.
    raw_v2: dict = loaded if isinstance(loaded, dict) else {}
    rows_in: list[dict] = raw_v2.get("rows") or []
    raw_coverage_pct = _read_coverage_pct(raw_v2)

    rows_out = [
        {
            "ticker": row.get("ticker"),
            "name": row.get("name"),
            "data_quality": row.get("data_quality"),
            **_score_ticker(row),
        }
        for row in rows_in
    ]

    non_etf = [r for r in rows_out if r.get("grade") != "ETF"]
    not_available = [
        r for r in non_etf if r.get("score") is None or r.get("grade") == "F"
    ]
    long_buy_blocked = [r for r in non_etf if not r.get("long_horizon_buy_allowed")]

    # Average score (non-ETF rows only)
    scores = [r["score"] for r in non_etf if r.get("score") is not None]
    avg_score = round(sum(scores) / len(scores), 2) if scores else 0.0

    # conflict_gap_pct: data_quality (schema 100%) vs engine weighted coverage
    conflict_gap_pct = round(100.0 - raw_coverage_pct, 2)

    grade_counts = Counter(r.get("grade") for r in rows_out)

    gate_ok = (
        raw_coverage_pct >= 90
        and len(not_available) == 0
        and conflict_gap_pct < 5
    )
    gate_failures = (
        (["raw_coverage_pct<90"] if raw_coverage_pct < 90 else [])
        + ([f"not_available_count={len(not_available)}"] if not_available else [])
        + ([f"conflict_gap_pct={conflict_gap_pct}>=5"] if conflict_gap_pct >= 5 else [])
    )

    result = {
        "formula_id": "FUNDAMENTAL_MULTIFACTOR_V4",
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "row_count": len(rows_out),
        "non_etf_count": len(non_etf),
        # Coverage metrics
        "raw_coverage_pct": raw_coverage_pct,
        "conflict_gap_pct": conflict_gap_pct,
        "conflict_note": (
            "conflict_gap_pct = 100 - raw_field_coverage_pct. "
            "data_quality schema_presence=100% vs engine weighted coverage 차이."
        ),
        # Score summary
        "avg_score": avg_score,
        "not_available_count": len(not_available),
        "long_buy_blocked_count": len(long_buy_blocked),
        "grade_counts": dict(grade_counts),
        # Validation criteria
        "targets": {
            "raw_coverage_pct_min": 90,
            "not_available_count": "==0",
            "conflict_gap_pct_max": 5,
        },
        "gate": "PASS" if gate_ok else "BLOCK_FUNDAMENTAL_EVIDENCE",
        "gate_failures": gate_failures,
        "rows": rows_out,
    }

    save_json(args.out, result)
    print(
        json.dumps(
            {k: v for k, v in result.items() if k != "rows"},
            ensure_ascii=False,
            indent=2,
        )
    )
    return 0


if __name__ == "__main__":
    raise SystemExit(main())