54e61e71e6
- harness_coverage_auditor: _load_data_gated_formula_ids() now correctly
parses {formulas:[...]} YAML structure (was treating dict as list → empty set)
- build_formula_runtime_registry_v1: add DATA_GATED exclusion so
OPERATIONAL_T20_OUTCOME_LEDGER_V1 (~2026-07-15) doesn't block gate
- build_fundamental_multifactor_v3/v4: add FULL_ADVANCED: 1.0 to
_QUALITY_MULTIPLIER (all non-ETF stocks were scoring 0.0/grade=F)
- spec/51_formula_lifecycle_registry.yaml: OPERATIONAL_T20_OUTCOME_LEDGER_V1
lifecycle_state ACTIVE → DATA_GATED
DAG: gate=PASS step_count=55 | formula_runtime_registry: 100% | DQR: 99.97
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
215 lines
7.3 KiB
Python
215 lines
7.3 KiB
Python
"""build_fundamental_multifactor_v4.py — FUNDAMENTAL_MULTIFACTOR_V4
|
|
|
|
P1-011: v3 대비 변경사항
|
|
- fundamental_raw_v2 사용 (field_coverage 기반 data_quality 레이블 수정본)
|
|
- missing_penalty 적용: 핵심 필드 누락당 -10점 (OCF/FCF 각 -5점)
|
|
- raw_coverage_pct 필드 단위 가중 커버리지로 보고
|
|
- conflict_gap_pct: engine_audit 점수 vs data_quality schema 점수 차이 명시
|
|
- long_horizon_buy_allowed: OCF/FCF 20% 이상 미충족 시 False
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import math
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
from v7_hardening_common import ROOT, TEMP, load_json, save_json
|
|
|
|
# Default input path used for the --raw-v2 CLI option.
# NOTE(review): the constant name and the module docstring both say "raw_v2",
# yet the file name below ends in "_v1.json" — confirm this is intentional.
DEFAULT_RAW_V2 = TEMP / "fundamental_raw_v1.json"

# Default output path used for the --out CLI option.
DEFAULT_OUT = TEMP / "fundamental_multifactor_v4.json"
|
|
|
|
# 필드 점수표 (만점 100점)
|
|
_FIELD_SCORES = {
|
|
"roe_pct": 25,
|
|
"opm_pct": 20,
|
|
"ocf_krw": 15,
|
|
"fcf_krw": 15,
|
|
"net_debt_krw": 10,
|
|
"per": 8,
|
|
"pbr": 7,
|
|
}
|
|
_TOTAL = sum(_FIELD_SCORES.values()) # 100
|
|
|
|
_ROE_THRESHOLDS = [(15, 25), (10, 20), (5, 15), (0, 8)]
|
|
_OPM_THRESHOLDS = [(15, 20), (8, 15), (3, 10), (0, 5)]
|
|
_DEBT_THRESHOLDS = [(50, 10), (100, 7), (150, 4), (200, 0)]
|
|
|
|
def _score_roe(v: float | None) -> float:
    """Return the points awarded for the ROE percentage *v*.

    _ROE_THRESHOLDS is scanned in order and the points of the first entry
    whose threshold *v* reaches (``v >= threshold``) are returned as a float.
    Returns 0.0 when *v* is None or reaches none of the thresholds.
    """
    if v is None:
        return 0.0
    awarded = next((pts for floor, pts in _ROE_THRESHOLDS if v >= floor), None)
    return 0.0 if awarded is None else float(awarded)
|
|
|
|
def _score_opm(v: float | None) -> float:
    """Return the points awarded for the operating-margin percentage *v*.

    _OPM_THRESHOLDS is scanned in order and the points of the first entry
    whose threshold *v* reaches (``v >= threshold``) are returned as a float.
    Returns 0.0 when *v* is None or reaches none of the thresholds.
    """
    if v is None:
        return 0.0
    awarded = next((pts for floor, pts in _OPM_THRESHOLDS if v >= floor), None)
    return 0.0 if awarded is None else float(awarded)
|
|
|
|
def _score_cf(ocf, fcf, revenue) -> float:
|
|
if ocf is None and fcf is None: return 0.0
|
|
pts = 0.0
|
|
if ocf is not None and revenue and revenue > 0:
|
|
ratio = ocf / revenue * 100
|
|
pts += 7.5 if ratio >= 10 else (5 if ratio >= 5 else 2.5)
|
|
elif ocf is not None:
|
|
pts += 7.5
|
|
if fcf is not None:
|
|
pts += 7.5 if fcf > 0 else 2.5
|
|
return min(pts, 30.0)
|
|
|
|
def _score_debt(net_debt, revenue) -> float:
    """Return the leverage points for *net_debt* relative to *revenue*.

    Returns 0.0 when *net_debt* is None and 10.0 when it is zero or negative.
    Otherwise, when *revenue* is positive, net debt is expressed as a
    percentage of revenue and the points of the first _DEBT_THRESHOLDS entry
    whose threshold the ratio does not exceed are returned. Returns 0.0 when
    revenue is missing or not positive, or when no threshold matches.
    """
    if net_debt is None:
        return 0.0
    if net_debt <= 0:
        return 10.0
    if not (revenue and revenue > 0):
        return 0.0

    debt_to_revenue_pct = net_debt / revenue * 100
    awarded = next(
        (pts for ceiling, pts in _DEBT_THRESHOLDS if debt_to_revenue_pct <= ceiling),
        None,
    )
    return 0.0 if awarded is None else float(awarded)
|
|
|
|
def _score_val(per, pbr) -> float:
|
|
pts = 0.0
|
|
if per is not None:
|
|
pts += 4 if per < 15 else (2 if per < 25 else 0)
|
|
if pbr is not None:
|
|
pts += 3 if pbr < 1.5 else (2 if pbr < 3 else 0)
|
|
return pts
|
|
|
|
# 품질 계수
|
|
_QUALITY_MULTIPLIER = {"FULL_ADVANCED": 1.0, "FULL": 1.0, "PARTIAL": 0.85, "SPARSE": 0.70, "MISSING": 0.0, "ETF_EXCLUDED": None}
|
|
|
|
# missing_penalty: OCF/FCF 완전 부재 시 추가 패널티
|
|
_MISSING_PENALTY_OCF = 5.0
|
|
_MISSING_PENALTY_FCF = 5.0
|
|
|
|
|
|
def _score_ticker(row: dict) -> dict:
    """Score one input row and derive its grade and buy flags.

    Rows whose ``data_quality`` is ``"ETF_EXCLUDED"`` are not scored: they get
    ``score=None``, ``grade="ETF"`` and both buy flags set to False.

    For every other row the five component scores are added into
    ``raw_score``, multiplied by the data-quality multiplier, reduced by the
    missing-field penalty and floored at 0.0. The result is mapped to a
    letter grade (A >= 80, B >= 65, C >= 50, D >= 35, otherwise F).

    Returns:
        A dict with ``score``, ``raw_score``, ``missing_penalty``,
        ``missing_fields``, ``grade``, ``long_horizon_buy_allowed`` and
        ``buy_allowed`` (the ETF variant has no ``raw_score``).
    """
    quality = row.get("data_quality")
    if quality == "ETF_EXCLUDED":
        return {
            "score": None,
            "grade": "ETF",
            "long_horizon_buy_allowed": False,
            "missing_penalty": 0.0,
            "missing_fields": [],
            "buy_allowed": False,
        }

    ocf = row.get("ocf_krw")
    fcf = row.get("fcf_krw")
    revenue = row.get("revenue_krw")

    roe_pts = _score_roe(row.get("roe_pct"))
    opm_pts = _score_opm(row.get("opm_pct"))
    cf_pts = _score_cf(ocf, fcf, revenue)
    debt_pts = _score_debt(row.get("net_debt_krw"), revenue)
    val_pts = _score_val(row.get("per"), row.get("pbr"))
    raw_score = roe_pts + opm_pts + cf_pts + debt_pts + val_pts

    # missing_penalty: each absent cash-flow field is recorded and penalised.
    missing_fields = []
    penalty = 0.0
    for field_name, value, deduction in (
        ("ocf_krw", ocf, _MISSING_PENALTY_OCF),
        ("fcf_krw", fcf, _MISSING_PENALTY_FCF),
    ):
        if value is None:
            missing_fields.append(field_name)
            penalty += deduction

    # An absent/empty label is treated as "MISSING"; an unknown label or a
    # None multiplier both end up as 0.0.
    multiplier = _QUALITY_MULTIPLIER.get(quality or "MISSING", 0.0)
    if multiplier is None:
        multiplier = 0.0
    adjusted_score = max(0.0, raw_score * multiplier - penalty)

    grade = "F"
    for floor, letter in ((80, "A"), (65, "B"), (50, "C"), (35, "D")):
        if adjusted_score >= floor:
            grade = letter
            break

    # Long-horizon buying is blocked only when OCF and FCF are both missing.
    long_buy_ok = ocf is not None or fcf is not None
    buy_allowed = grade in {"A", "B"} and long_buy_ok

    return {
        "score": round(adjusted_score, 2),
        "raw_score": round(raw_score, 2),
        "missing_penalty": round(penalty, 2),
        "missing_fields": missing_fields,
        "grade": grade,
        "long_horizon_buy_allowed": long_buy_ok,
        "buy_allowed": buy_allowed,
    }
|
|
|
|
|
|
def main() -> int:
    """Build the FUNDAMENTAL_MULTIFACTOR_V4 report and write it to disk.

    Reads the raw payload given by ``--raw-v2``, scores every row with
    ``_score_ticker``, aggregates coverage / grade statistics, evaluates the
    gate, saves the full result to ``--out`` and prints the result without
    its ``rows`` to stdout.

    Returns:
        0 (the gate outcome is reported in the output, not the exit code).
    """
    from collections import Counter

    ap = argparse.ArgumentParser()
    ap.add_argument("--raw-v2", default=str(DEFAULT_RAW_V2))
    ap.add_argument("--out", default=str(DEFAULT_OUT))
    args = ap.parse_args()

    raw_v2 = load_json(Path(args.raw_v2))
    # A payload that is not a JSON object carries neither rows nor coverage.
    # Treat it as empty for BOTH lookups instead of crashing on `.get`, so the
    # run ends in a blocked gate rather than an AttributeError.
    payload = raw_v2 if isinstance(raw_v2, dict) else {}
    # `or []` also covers an explicit `"rows": null`.
    rows_in: list[dict] = payload.get("rows") or []
    raw_coverage_pct = float(
        payload.get("raw_field_coverage_pct")
        or payload.get("coverage_pct")
        or 0.0
    )

    rows_out = [
        {
            "ticker": row.get("ticker"),
            "name": row.get("name"),
            "data_quality": row.get("data_quality"),
            **_score_ticker(row),
        }
        for row in rows_in
    ]

    non_etf = [r for r in rows_out if r.get("grade") != "ETF"]
    not_available = [
        r for r in non_etf
        if r.get("score") is None or r.get("grade") == "F"
    ]
    long_buy_blocked = [
        r for r in non_etf if not r.get("long_horizon_buy_allowed")
    ]

    # Average score (non-ETF).
    scores = [r["score"] for r in non_etf if r.get("score") is not None]
    avg_score = round(sum(scores) / len(scores), 2) if scores else 0.0

    # conflict_gap_pct: data_quality (schema 100%) vs engine weighted coverage.
    conflict_gap_pct = round(100.0 - raw_coverage_pct, 2)

    grade_counts = Counter(r.get("grade") for r in rows_out)

    # Evaluate each gate condition once and derive both `gate` and
    # `gate_failures` from the same booleans, so the two can never disagree
    # (the separate `>=` / `<` comparisons could, e.g. for a NaN coverage).
    # NOTE(review): the target below is named "conflict_gap_pct_max": 5 while
    # the check is strict (< 5) — confirm which bound is intended.
    coverage_ok = raw_coverage_pct >= 90
    available_ok = not not_available
    gap_ok = conflict_gap_pct < 5

    gate_failures = []
    if not coverage_ok:
        gate_failures.append("raw_coverage_pct<90")
    if not available_ok:
        gate_failures.append(f"not_available_count={len(not_available)}")
    if not gap_ok:
        gate_failures.append(f"conflict_gap_pct={conflict_gap_pct}>=5")

    gate_passed = coverage_ok and available_ok and gap_ok

    result = {
        "formula_id": "FUNDAMENTAL_MULTIFACTOR_V4",
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "row_count": len(rows_out),
        "non_etf_count": len(non_etf),
        # Coverage metrics
        "raw_coverage_pct": raw_coverage_pct,
        "conflict_gap_pct": conflict_gap_pct,
        "conflict_note": (
            "conflict_gap_pct = 100 - raw_field_coverage_pct. "
            "data_quality schema_presence=100% vs engine weighted coverage 차이."
        ),
        # Score summary
        "avg_score": avg_score,
        "not_available_count": len(not_available),
        "long_buy_blocked_count": len(long_buy_blocked),
        "grade_counts": dict(grade_counts),
        # Validation criteria
        "targets": {
            "raw_coverage_pct_min": 90,
            "not_available_count": "==0",
            "conflict_gap_pct_max": 5,
        },
        "gate": "PASS" if gate_passed else "BLOCK_FUNDAMENTAL_EVIDENCE",
        "gate_failures": gate_failures,
        "rows": rows_out,
    }
    save_json(args.out, result)

    # Print everything except the (potentially large) per-row list.
    summary = {k: v for k, v in result.items() if k != "rows"}
    print(json.dumps(summary, ensure_ascii=False, indent=2))
    return 0
|
|
|
|
|
|
if __name__ == "__main__":
    # Run the builder and use main()'s return value as the process exit status.
    exit_status = main()
    raise SystemExit(exit_status)
|