Files
QuantEngineByItz/tools/build_fundamental_multifactor_v3.py
kjh2064 54e61e71e6 fix: DATA_GATED exclusion for harness/registry, FULL_ADVANCED multiplier bug
- harness_coverage_auditor: _load_data_gated_formula_ids() now correctly
  parses {formulas:[...]} YAML structure (was treating dict as list → empty set)
- build_formula_runtime_registry_v1: add DATA_GATED exclusion so
  OPERATIONAL_T20_OUTCOME_LEDGER_V1 (~2026-07-15) doesn't block gate
- build_fundamental_multifactor_v3/v4: add FULL_ADVANCED: 1.0 to
  _QUALITY_MULTIPLIER (all non-ETF stocks were scoring 0.0/grade=F)
- spec/51_formula_lifecycle_registry.yaml: OPERATIONAL_T20_OUTCOME_LEDGER_V1
  lifecycle_state ACTIVE → DATA_GATED

DAG: gate=PASS step_count=55 | formula_runtime_registry: 100% | DQR: 99.97

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-13 17:59:57 +09:00

360 lines
12 KiB
Python

"""FUNDAMENTAL_MULTIFACTOR_V3 — 종목별 펀더멘털 등급 산출기.
fundamental_raw_v1.json(FUNDAMENTAL_RAW_INGEST_V1 출력)에서 per-ticker 지표를
결정론 공식으로 합산하여 종목별 등급을 산출한다.
점수 (총 100점):
ROE 25점 (roe_pct)
OPM 20점 (opm_pct)
OCF/매출 15점 (ocf_krw / revenue_krw)
FCF 15점 (fcf_krw)
Debt 10점 (net_debt_krw)
밸류에이션 15점 (per/pbr)
누락 필드 정규화: 보유 필드 기준 100점 환산 후 품질 계수 적용
data_quality=FULL_ADVANCED → multiplier 1.00
data_quality=FULL → multiplier 1.00
data_quality=PARTIAL → multiplier 0.90
data_quality=SPARSE → multiplier 0.80
data_quality=MISSING → multiplier 0.00
data_quality=ETF_EXCLUDED → ETF 등급 별도 부여 (점수 없음)
등급:
A ≥ 80점
B ≥ 65점
C ≥ 50점
D ≥ 35점
F < 35점
ETF — ETF 종목 (펀더멘털 미적용)
buy_allowed = grade ∈ {A, B} AND len(critical_fail_reasons) == 0.
"""
from __future__ import annotations

import argparse
import json
import math
from pathlib import Path
from typing import Any
# Directory one level above the folder that contains this file; relative CLI
# paths are resolved against it in main().
ROOT = Path(__file__).resolve().parents[1]
# Default values for the --raw, --json and --out command-line options.
DEFAULT_RAW = ROOT / "Temp" / "fundamental_raw_v1.json"
DEFAULT_JSON = ROOT / "GatherTradingData.json"
DEFAULT_OUT = ROOT / "Temp" / "fundamental_multifactor_v3.json"
# Factor applied to the normalized score for each data_quality label.
# Labels that are not listed here are looked up with a 0.0 fallback in
# _normalize_score, so an unknown label yields a score of 0.
_QUALITY_MULTIPLIER = {
    "FULL_ADVANCED": 1.00,
    "FULL": 1.00,
    "PARTIAL": 0.90,
    "SPARSE": 0.80,
    "MISSING": 0.00,
    "ETF_EXCLUDED": None,  # ETFs are handled separately
}
# Maximum points per score component (sums to 100); _normalize_score uses
# these to compute the attainable total for the fields that have data.
_FIELD_MAX = {
    "roe": 25.0,
    "opm": 20.0,
    "ocf": 15.0,
    "fcf": 15.0,
    "debt": 10.0,
    "valuation": 15.0,
}
def _load_json(path: Path) -> dict[str, Any]:
    """Read a JSON object from *path*, falling back to an empty dict.

    Loading is best-effort: a missing or unreadable file, undecodable text,
    malformed JSON, or a document whose top level is not an object all yield
    ``{}`` so callers can always use ``.get`` on the result.

    Args:
        path: Location of the UTF-8 encoded JSON file.

    Returns:
        The parsed top-level object, or ``{}`` when it cannot be obtained.
    """
    try:
        payload = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError, RecursionError):
        # OSError: missing/unreadable path (including directories).
        # ValueError: covers json.JSONDecodeError and UnicodeDecodeError.
        # RecursionError: pathologically deep nesting in the document.
        return {}
    # A top-level list/scalar would break callers that expect a mapping.
    return payload if isinstance(payload, dict) else {}
def _num(v: Any, default: float = 0.0) -> float:
    """Coerce *v* to a finite float, returning *default* when that fails.

    ``None``, the empty string and the literal ``"N/A"`` count as missing.
    Values that cannot be converted, integers too large for a float, and
    non-finite results (NaN, +/-Infinity) also fall back to *default*, so
    callers never receive a value that breaks ordering comparisons.

    Args:
        v: Raw value, typically taken from a JSON record.
        default: Value returned when *v* is missing or unusable.

    Returns:
        The finite float parsed from *v*, or *default*.
    """
    try:
        if v is None or v == "" or v == "N/A":
            return default
        parsed = float(v)
    except (TypeError, ValueError, OverflowError):
        return default
    # json.loads accepts NaN/Infinity literals and float() accepts "nan"/"inf";
    # treat those as missing rather than letting them reach the scoring rules.
    return parsed if math.isfinite(parsed) else default
def _clip(v: float, lo: float = 0.0, hi: float = 100.0) -> float:
    """Clamp *v* to the range bounded below by *lo* and above by *hi*.

    The upper bound is applied first and the lower bound second, so *lo*
    wins if the bounds are given in the wrong order.
    """
    capped = v if v < hi else hi
    return capped if capped > lo else lo
def _grade_from_score(score: float) -> str:
    """Map a numeric score to its letter grade.

    Cut-offs are inclusive lower bounds: A from 80, B from 65, C from 50,
    D from 35; anything below 35 is F.
    """
    cutoffs = ((80, "A"), (65, "B"), (50, "C"), (35, "D"))
    for minimum, letter in cutoffs:
        if score >= minimum:
            return letter
    return "F"
def _score_component(raw: dict[str, Any]) -> tuple[dict[str, float], list[str], dict[str, bool]]:
    """Score each fundamental component of a single ticker record.

    A field counts as present only when its raw value is not ``None`` and
    ``_num`` parses it to something other than 0.0, so an exact zero is
    handled the same way as a missing value.

    Args:
        raw: Per-ticker record. The keys read here are ``roe_pct``,
            ``opm_pct``, ``ocf_krw``, ``revenue_krw``, ``fcf_krw``,
            ``net_debt_krw``, ``per`` and ``pbr``.

    Returns:
        ``(scores, fail_reasons, has_data)``. ``scores`` and ``has_data`` are
        keyed by ``roe``, ``opm``, ``ocf``, ``fcf``, ``debt`` and
        ``valuation``; ``fail_reasons`` collects the warnings raised while
        scoring, in component order.
    """
    scores: dict[str, float] = {}
    fail_reasons: list[str] = []
    has_data: dict[str, bool] = {}

    def _present(key: str, parsed: float) -> bool:
        # Raw value exists and parsed to a non-zero number.
        return raw.get(key) is not None and parsed != 0.0

    def _first_rung(value: float, rungs: tuple) -> Any:
        # Points of the first (floor, points) rung that value reaches, else None.
        for floor, points in rungs:
            if value >= floor:
                return points
        return None

    # -- ROE (25 pts) ---------------------------------------------------------
    roe = _num(raw.get("roe_pct"))
    has_data["roe"] = _present("roe_pct", roe)
    if has_data["roe"]:
        roe_points = _first_rung(roe, ((20, 25.0), (15, 20.0), (10, 15.0), (5, 8.0)))
        if roe_points is None:
            if roe > 0:
                roe_points = 3.0
            else:
                roe_points = 0.0
                fail_reasons.append(f"ROE_NEGATIVE({roe:.1f}%)")
        scores["roe"] = roe_points
    else:
        scores["roe"] = 0.0

    # -- OPM (20 pts) ---------------------------------------------------------
    opm = _num(raw.get("opm_pct"))
    has_data["opm"] = _present("opm_pct", opm)
    if has_data["opm"]:
        opm_points = _first_rung(opm, ((20, 20.0), (15, 16.0), (10, 12.0), (5, 7.0)))
        if opm_points is None:
            if opm > 0:
                opm_points = 3.0
            else:
                opm_points = 0.0
                fail_reasons.append(f"OPM_NEGATIVE({opm:.1f}%)")
        scores["opm"] = opm_points
    else:
        scores["opm"] = 0.0

    # -- OCF / revenue (15 pts) -----------------------------------------------
    ocf = _num(raw.get("ocf_krw"))
    revenue = _num(raw.get("revenue_krw"))
    has_data["ocf"] = _present("ocf_krw", ocf)
    if not has_data["ocf"]:
        scores["ocf"] = 0.0
    elif ocf > 0:
        if revenue > 0:
            ocf_margin = ocf / revenue * 100.0
            margin_points = _first_rung(
                ocf_margin, ((20, 15.0), (12, 12.0), (7, 8.0), (3, 4.0))
            )
            scores["ocf"] = 1.0 if margin_points is None else margin_points
        else:
            # Positive cash flow still earns partial credit without revenue.
            scores["ocf"] = 7.0
    else:
        scores["ocf"] = 0.0
        fail_reasons.append("OCF_NEGATIVE")

    # -- FCF (15 pts) ---------------------------------------------------------
    fcf = _num(raw.get("fcf_krw"))
    has_data["fcf"] = _present("fcf_krw", fcf)
    if has_data["fcf"] and fcf > 0:
        # Only the sign is scored: positive FCF earns 12 points, which is the
        # highest value this component can take.
        scores["fcf"] = 12.0
    else:
        scores["fcf"] = 0.0
        if has_data["fcf"]:
            fail_reasons.append("FCF_NEGATIVE")

    # -- Net debt (10 pts) ----------------------------------------------------
    net_debt = _num(raw.get("net_debt_krw"))
    has_data["debt"] = _present("net_debt_krw", net_debt)
    if not has_data["debt"]:
        scores["debt"] = 5.0  # unknown -> neutral 5 points
    elif net_debt <= 0:
        scores["debt"] = 10.0  # no net debt
    else:
        # The debt-to-revenue ratio is only defined for positive revenue.
        leverage = net_debt / revenue if revenue > 0 else None
        if leverage is not None and leverage < 0.5:
            scores["debt"] = 8.0
        elif leverage is not None and leverage < 1.5:
            scores["debt"] = 5.0
        else:
            scores["debt"] = 2.0
            fail_reasons.append("HIGH_NET_DEBT")

    # -- Valuation (15 pts) ---------------------------------------------------
    per = _num(raw.get("per"))
    pbr = _num(raw.get("pbr"))
    per_usable = raw.get("per") is not None and per > 0
    pbr_usable = raw.get("pbr") is not None and pbr > 0
    has_data["valuation"] = per_usable or pbr_usable
    if has_data["valuation"]:
        val_score = 0.0
        if per > 0:
            if per <= 15:
                val_score += 8.0
            elif per <= 25:
                val_score += 5.0
            elif per <= 40:
                val_score += 2.0
            else:
                fail_reasons.append(f"HIGH_PER({per:.1f})")
        if pbr > 0:
            if pbr <= 1.5:
                val_score += 7.0
            elif pbr <= 3.0:
                val_score += 4.0
            elif pbr <= 6.0:
                val_score += 2.0
            # PBR above 6 earns no points.
        scores["valuation"] = _clip(val_score, 0, 15)
    else:
        scores["valuation"] = 0.0

    return scores, fail_reasons, has_data
def _normalize_score(
    scores: dict[str, float],
    has_data: dict[str, bool],
    data_quality: str,
) -> float:
    """Rescale the component scores to a 0-100 scale over the available fields.

    Args:
        scores: Points earned per component.
        has_data: Whether each component had usable input.
        data_quality: Quality label used to look up the multiplier in
            ``_QUALITY_MULTIPLIER``; unknown labels fall back to 0.0.

    Returns:
        The earned total divided by the attainable total, times 100 and the
        quality multiplier, clipped to [0, 100]. Returns 0.0 when the
        multiplier is ``None`` or zero, or nothing is attainable.
    """
    quality_factor = _QUALITY_MULTIPLIER.get(data_quality, 0.0)
    if not quality_factor:
        return 0.0

    earned = sum(scores.values())

    # Debt always counts toward the attainable total because it is scored
    # (with a neutral value) even when its input is missing.
    attainable = sum(
        cap
        for component, cap in _FIELD_MAX.items()
        if component == "debt" or has_data.get(component)
    )
    if attainable <= 0:
        return 0.0

    return _clip(earned / attainable * 100.0 * quality_factor, 0.0, 100.0)
def _score_ticker(raw: dict[str, Any]) -> tuple[float, str, list[str], dict[str, float], bool]:
    """Compute the deterministic score and grade for one ticker record.

    Args:
        raw: Per-ticker record; ``data_quality`` and ``is_etf`` are read here
            and the remaining fields are passed on to ``_score_component``.

    Returns:
        ``(score, grade, fail_reasons, breakdown, buy_allowed)``. ETF records
        short-circuit to ``(0.0, "ETF", [], {}, False)``. ``buy_allowed`` is
        true only for grade A or B with no critical fail reason.
    """
    quality = str(raw.get("data_quality") or "MISSING")

    # ETFs are not scored on fundamentals.
    if quality == "ETF_EXCLUDED" or raw.get("is_etf"):
        return 0.0, "ETF", [], {}, False

    component_scores, fail_reasons, availability = _score_component(raw)
    total = round(_normalize_score(component_scores, availability, quality), 2)
    letter = _grade_from_score(total)

    # A reason is critical when it mentions a negative metric or high net debt.
    has_critical = any(
        "NEGATIVE" in reason or "HIGH_NET_DEBT" in reason for reason in fail_reasons
    )
    tradable = letter in ("A", "B") and not has_critical

    rounded = {component: round(points, 2) for component, points in component_scores.items()}
    return total, letter, fail_reasons, rounded, tradable
def main() -> int:
    """Score every ticker in the raw fundamentals file and write the report.

    Command-line options ``--raw``, ``--json`` and ``--out`` select the raw
    input, the data-feed file used to fill in missing names, and the output
    path; relative paths are resolved against ``ROOT``.

    Gate rules (computed over non-ETF rows only):
        PASS    - at least one row, none with MISSING data quality, and at
                  least two distinct grades.
        CAUTION - at least one row, but one of the PASS conditions fails.
        FAIL    - no rows.

    Returns:
        Process exit status: 0 for PASS or CAUTION, 1 for FAIL.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--raw", default=str(DEFAULT_RAW))
    ap.add_argument("--json", default=str(DEFAULT_JSON))
    ap.add_argument("--out", default=str(DEFAULT_OUT))
    args = ap.parse_args()

    def _resolve(arg: str) -> Path:
        # Anchor relative CLI paths at the project root.
        candidate = Path(arg)
        return candidate if candidate.is_absolute() else ROOT / candidate

    raw_path = _resolve(args.raw)
    out_path = _resolve(args.out)
    json_path = _resolve(args.json)

    raw_data = _load_json(raw_path)
    raw_rows = raw_data.get("rows") if isinstance(raw_data, dict) else None
    if not isinstance(raw_rows, list):
        raw_rows = []

    # Build a ticker -> name map from the data feed as a fallback for names.
    gtd = _load_json(json_path)
    feed_parent = gtd.get("data") if isinstance(gtd, dict) else None
    df_list = feed_parent.get("data_feed") if isinstance(feed_parent, dict) else None
    if not isinstance(df_list, list):
        df_list = []
    name_map = {
        str(r.get("Ticker") or ""): str(r.get("Name") or "")
        for r in df_list
        if isinstance(r, dict)
    }

    grade_counts: dict[str, int] = {}
    rows: list[dict[str, Any]] = []
    for raw in raw_rows:
        if not isinstance(raw, dict):
            continue
        ticker = str(raw.get("ticker") or "")
        if not ticker:
            continue
        score, grade, fail_reasons, breakdown, buy_allowed = _score_ticker(raw)
        name = raw.get("name") or name_map.get(ticker, "")
        # Report the same quality label that scoring used: a null or empty
        # value is scored as MISSING, so it must also be counted as MISSING
        # by the gate below instead of slipping through as None/"".
        data_quality = str(raw.get("data_quality") or "MISSING")
        rows.append({
            "ticker": ticker,
            "name": name,
            "score": score,
            "grade": grade,
            "buy_allowed": buy_allowed,
            "fail_reasons": fail_reasons,
            "breakdown": breakdown,
            "data_quality": data_quality,
            "is_etf": bool(raw.get("is_etf")),
            "as_of_date": raw.get("as_of_date"),
            "source": raw.get("source", "fallback"),
        })
        grade_counts[grade] = grade_counts.get(grade, 0) + 1

    # Gate: evaluated on non-ETF rows only.
    non_etf_rows = [r for r in rows if r["grade"] != "ETF"]
    data_missing_count = sum(1 for r in non_etf_rows if r["data_quality"] == "MISSING")
    unique_non_etf_grades = {r["grade"] for r in non_etf_rows}
    grade_diverse = len(unique_non_etf_grades) >= 2
    if not non_etf_rows:
        gate = "FAIL"
    elif data_missing_count == 0 and grade_diverse:
        gate = "PASS"
    else:
        gate = "CAUTION"

    result = {
        "formula_id": "FUNDAMENTAL_MULTIFACTOR_V3",
        "gate": gate,
        "row_count": len(rows),
        "non_etf_count": len(non_etf_rows),
        "data_missing_count": data_missing_count,
        "grade_counts": grade_counts,
        "grade_diverse": grade_diverse,
        "rows": rows,
    }
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")
    print(
        f"FUNDAMENTAL_MULTIFACTOR_V3 gate={gate} rows={len(rows)} non_etf={len(non_etf_rows)} "
        f"missing={data_missing_count} grades={grade_counts}"
    )
    print("FUNDAMENTAL_MULTIFACTOR_V3_OK" if gate != "FAIL" else "FUNDAMENTAL_MULTIFACTOR_V3_FAIL")
    return 0 if gate != "FAIL" else 1
# Run the CLI only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())