54e61e71e6
- harness_coverage_auditor: _load_data_gated_formula_ids() now correctly
parses {formulas:[...]} YAML structure (was treating dict as list → empty set)
- build_formula_runtime_registry_v1: add DATA_GATED exclusion so
OPERATIONAL_T20_OUTCOME_LEDGER_V1 (~2026-07-15) doesn't block gate
- build_fundamental_multifactor_v3/v4: add FULL_ADVANCED: 1.0 to
_QUALITY_MULTIPLIER (all non-ETF stocks were scoring 0.0/grade=F)
- spec/51_formula_lifecycle_registry.yaml: OPERATIONAL_T20_OUTCOME_LEDGER_V1
lifecycle_state ACTIVE → DATA_GATED
DAG: gate=PASS step_count=55 | formula_runtime_registry: 100% | DQR: 99.97
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
360 lines
12 KiB
Python
360 lines
12 KiB
Python
"""FUNDAMENTAL_MULTIFACTOR_V3 — per-ticker fundamental grade calculator.

Combines the per-ticker metrics found in fundamental_raw_v1.json (the output
of FUNDAMENTAL_RAW_INGEST_V1) with a deterministic formula to produce a grade
for every ticker.

Score (100 points in total):
  ROE          25 pts  (roe_pct)
  OPM          20 pts  (opm_pct)
  OCF/revenue  15 pts  (ocf_krw / revenue_krw)
  FCF          15 pts  (fcf_krw)
  Debt         10 pts  (net_debt_krw)
  Valuation    15 pts  (per/pbr)

Missing-field normalization: the points are rescaled to a 100-point scale over
the fields that actually have data, then a data-quality multiplier is applied.
  data_quality=FULL_ADVANCED → multiplier 1.00
  data_quality=FULL          → multiplier 1.00
  data_quality=PARTIAL       → multiplier 0.90
  data_quality=SPARSE        → multiplier 0.80
  data_quality=MISSING       → multiplier 0.00
  data_quality=ETF_EXCLUDED  → graded "ETF" separately (no score)
  any other value            → multiplier 0.00

Grades:
  A   ≥ 80 points
  B   ≥ 65 points
  C   ≥ 50 points
  D   ≥ 35 points
  F   < 35 points
  ETF — ETF ticker (fundamentals not applied)

buy_allowed = grade ∈ {A, B} AND len(critical_fail_reasons) == 0.
"""
|
|
from __future__ import annotations

import argparse
import json
import math
from pathlib import Path
from typing import Any
|
|
|
|
ROOT = Path(__file__).resolve().parents[1]
|
|
DEFAULT_RAW = ROOT / "Temp" / "fundamental_raw_v1.json"
|
|
DEFAULT_JSON = ROOT / "GatherTradingData.json"
|
|
DEFAULT_OUT = ROOT / "Temp" / "fundamental_multifactor_v3.json"
|
|
|
|
_QUALITY_MULTIPLIER = {
|
|
"FULL_ADVANCED": 1.00,
|
|
"FULL": 1.00,
|
|
"PARTIAL": 0.90,
|
|
"SPARSE": 0.80,
|
|
"MISSING": 0.00,
|
|
"ETF_EXCLUDED": None, # ETF는 별도 처리
|
|
}
|
|
|
|
_FIELD_MAX = {
|
|
"roe": 25.0,
|
|
"opm": 20.0,
|
|
"ocf": 15.0,
|
|
"fcf": 15.0,
|
|
"debt": 10.0,
|
|
"valuation": 15.0,
|
|
}
|
|
|
|
|
|
def _load_json(path: Path) -> dict[str, Any]:
|
|
if not path.exists():
|
|
return {}
|
|
try:
|
|
return json.loads(path.read_text(encoding="utf-8"))
|
|
except Exception:
|
|
return {}
|
|
|
|
|
|
def _num(v: Any, default: float = 0.0) -> float:
|
|
try:
|
|
if v is None or v == "" or v == "N/A":
|
|
return default
|
|
return float(v)
|
|
except (TypeError, ValueError):
|
|
return default
|
|
|
|
|
|
def _clip(v: float, lo: float = 0.0, hi: float = 100.0) -> float:
|
|
return max(lo, min(hi, v))
|
|
|
|
|
|
def _grade_from_score(score: float) -> str:
|
|
if score >= 80:
|
|
return "A"
|
|
if score >= 65:
|
|
return "B"
|
|
if score >= 50:
|
|
return "C"
|
|
if score >= 35:
|
|
return "D"
|
|
return "F"
|
|
|
|
|
|
def _tier_points(value: float, tiers: tuple[tuple[float, float], ...], fallback: float) -> float:
    """Return the points of the first ``(threshold, points)`` tier that ``value`` reaches, else ``fallback``."""
    for threshold, points in tiers:
        if value >= threshold:
            return points
    return fallback


def _score_component(raw: dict[str, Any]) -> tuple[dict[str, float], list[str], dict[str, bool]]:
    """Score each fundamental component of one ticker record.

    Args:
        raw: Per-ticker record. The keys read are ``roe_pct``, ``opm_pct``,
            ``ocf_krw``, ``revenue_krw``, ``fcf_krw``, ``net_debt_krw``,
            ``per`` and ``pbr``.

    Returns:
        ``(scores, fail_reasons, has_data)``. ``scores`` and ``has_data`` are
        keyed by ``roe``, ``opm``, ``ocf``, ``fcf``, ``debt`` and
        ``valuation`` (inserted in that order); ``fail_reasons`` lists the
        flags raised while scoring, in the same component order.

    A field counts as "having data" only when its key is non-null and its
    parsed value is non-zero, so an exact 0 is treated like a missing value.
    """
    scores: dict[str, float] = {}
    fail_reasons: list[str] = []
    has_data: dict[str, bool] = {}

    def reported(key: str, value: float) -> bool:
        return raw.get(key) is not None and value != 0.0

    # ── ROE (25 pts) ─────────────────────────────────────────────────────────
    roe = _num(raw.get("roe_pct"))
    has_data["roe"] = reported("roe_pct", roe)
    if not has_data["roe"]:
        scores["roe"] = 0.0
    elif roe > 0:
        scores["roe"] = _tier_points(roe, ((20, 25.0), (15, 20.0), (10, 15.0), (5, 8.0)), 3.0)
    else:
        scores["roe"] = 0.0
        fail_reasons.append(f"ROE_NEGATIVE({roe:.1f}%)")

    # ── OPM (20 pts) ─────────────────────────────────────────────────────────
    opm = _num(raw.get("opm_pct"))
    has_data["opm"] = reported("opm_pct", opm)
    if not has_data["opm"]:
        scores["opm"] = 0.0
    elif opm > 0:
        scores["opm"] = _tier_points(opm, ((20, 20.0), (15, 16.0), (10, 12.0), (5, 7.0)), 3.0)
    else:
        scores["opm"] = 0.0
        fail_reasons.append(f"OPM_NEGATIVE({opm:.1f}%)")

    # ── OCF / revenue (15 pts) ───────────────────────────────────────────────
    ocf = _num(raw.get("ocf_krw"))
    revenue = _num(raw.get("revenue_krw"))
    has_data["ocf"] = reported("ocf_krw", ocf)
    if not has_data["ocf"]:
        scores["ocf"] = 0.0
    elif ocf > 0:
        if revenue > 0:
            ocf_margin = ocf / revenue * 100.0
            scores["ocf"] = _tier_points(ocf_margin, ((20, 15.0), (12, 12.0), (7, 8.0), (3, 4.0)), 1.0)
        else:
            # No usable revenue figure, but operating cash flow is positive.
            scores["ocf"] = 7.0
    else:
        scores["ocf"] = 0.0
        fail_reasons.append("OCF_NEGATIVE")

    # ── FCF (15 pts) ─────────────────────────────────────────────────────────
    # NOTE(review): a positive FCF earns 12.0 although _FIELD_MAX budgets 15.0
    # for this component — confirm whether the 3-point gap is intentional.
    fcf = _num(raw.get("fcf_krw"))
    has_data["fcf"] = reported("fcf_krw", fcf)
    if not has_data["fcf"]:
        scores["fcf"] = 0.0
    elif fcf > 0:
        scores["fcf"] = 12.0
    else:
        scores["fcf"] = 0.0
        fail_reasons.append("FCF_NEGATIVE")

    # ── Net debt (10 pts) ────────────────────────────────────────────────────
    net_debt = _num(raw.get("net_debt_krw"))
    has_data["debt"] = reported("net_debt_krw", net_debt)
    if not has_data["debt"]:
        scores["debt"] = 5.0  # unknown → neutral 5 points
    elif net_debt <= 0:
        scores["debt"] = 10.0  # no net debt
    elif revenue > 0 and net_debt / revenue < 0.5:
        scores["debt"] = 8.0
    elif revenue > 0 and net_debt / revenue < 1.5:
        scores["debt"] = 5.0
    else:
        # Also reached when net debt is positive but revenue is not usable.
        scores["debt"] = 2.0
        fail_reasons.append("HIGH_NET_DEBT")

    # ── Valuation (15 pts: up to 8 from PER, up to 7 from PBR) ───────────────
    per = _num(raw.get("per"))
    pbr = _num(raw.get("pbr"))
    has_data["valuation"] = (raw.get("per") is not None and per > 0) or (
        raw.get("pbr") is not None and pbr > 0
    )
    if not has_data["valuation"]:
        scores["valuation"] = 0.0
    else:
        val_score = 0.0
        if per > 0:
            if per <= 15:
                val_score += 8.0
            elif per <= 25:
                val_score += 5.0
            elif per <= 40:
                val_score += 2.0
            else:
                fail_reasons.append(f"HIGH_PER({per:.1f})")
        if pbr > 0:
            if pbr <= 1.5:
                val_score += 7.0
            elif pbr <= 3.0:
                val_score += 4.0
            elif pbr <= 6.0:
                val_score += 2.0
            # PBR above 6.0 earns nothing and raises no fail reason.
        # Float bounds keep the result a float: with int bounds min()/max()
        # hand back the int bound on ties (0 or 15).
        scores["valuation"] = _clip(val_score, 0.0, 15.0)

    return scores, fail_reasons, has_data
|
|
|
|
|
|
def _normalize_score(
    scores: dict[str, float],
    has_data: dict[str, bool],
    data_quality: str,
) -> float:
    """Rescale the earned points to a 0-100 score over the fields that have data.

    Args:
        scores: Points earned per component.
        has_data: Whether each component had usable data.
        data_quality: Label looked up in ``_QUALITY_MULTIPLIER``; an unknown
            label, or one mapped to ``None`` or 0, yields a score of 0.0.

    Returns:
        ``earned / attainable * 100 * multiplier`` clamped to ``[0, 100]``,
        where ``attainable`` sums the ``_FIELD_MAX`` budgets of the components
        that have data, plus the debt budget unconditionally.
    """
    quality_factor = _QUALITY_MULTIPLIER.get(data_quality, 0.0)
    not_scorable = quality_factor is None or quality_factor == 0.0
    if not_scorable:
        return 0.0

    earned = sum(scores.values())

    attainable = 0.0
    for component, ceiling in _FIELD_MAX.items():
        # Debt always earns points (a neutral value is given when unknown),
        # so its budget is counted whether or not data is present.
        if component != "debt" and not has_data.get(component):
            continue
        attainable += ceiling

    if attainable <= 0:
        return 0.0

    return _clip(earned / attainable * 100.0 * quality_factor, 0.0, 100.0)
|
|
|
|
|
|
def _score_ticker(raw: dict[str, Any]) -> tuple[float, str, list[str], dict[str, float], bool]:
    """Compute the deterministic score for one ticker record.

    Returns:
        ``(score, grade, fail_reasons, breakdown, buy_allowed)``. ETF records
        (``data_quality`` of ``ETF_EXCLUDED`` or a truthy ``is_etf``) are not
        scored and come back as ``(0.0, "ETF", [], {}, False)``.
    """
    quality_label = str(raw.get("data_quality") or "MISSING")

    # ETFs bypass fundamental scoring entirely.
    if quality_label == "ETF_EXCLUDED" or raw.get("is_etf"):
        return 0.0, "ETF", [], {}, False

    component_scores, reasons, coverage = _score_component(raw)
    total = round(_normalize_score(component_scores, coverage, quality_label), 2)
    letter = _grade_from_score(total)

    # A reason is critical when it mentions one of these markers; any critical
    # reason blocks buying regardless of grade.
    critical_markers = ("NEGATIVE", "HIGH_NET_DEBT")
    has_critical = any(
        marker in reason for reason in reasons for marker in critical_markers
    )
    purchasable = letter in ("A", "B") and not has_critical

    rounded_breakdown = {}
    for component, points in component_scores.items():
        rounded_breakdown[component] = round(points, 2)
    return total, letter, reasons, rounded_breakdown, purchasable
|
|
|
|
|
|
def _anchor_to_root(location: str) -> Path:
    """Return ``location`` as a Path, resolving relative paths against ROOT."""
    candidate = Path(location)
    return candidate if candidate.is_absolute() else ROOT / candidate


def main() -> int:
    """CLI entry point: grade every ticker in the raw file and write the report.

    Options ``--raw``, ``--json`` and ``--out`` default to ``DEFAULT_RAW``,
    ``DEFAULT_JSON`` and ``DEFAULT_OUT``; relative paths are resolved against
    ``ROOT``.

    The gate is PASS when at least one non-ETF row exists, none of them has
    ``data_quality == "MISSING"`` and at least two distinct grades occur;
    CAUTION when non-ETF rows exist but those conditions fail; FAIL when there
    are no non-ETF rows.

    Returns:
        Process exit status: 1 when the gate is FAIL, otherwise 0.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--raw", default=str(DEFAULT_RAW))
    ap.add_argument("--json", default=str(DEFAULT_JSON))
    ap.add_argument("--out", default=str(DEFAULT_OUT))
    args = ap.parse_args()

    raw_path = _anchor_to_root(args.raw)
    out_path = _anchor_to_root(args.out)
    json_path = _anchor_to_root(args.json)

    # Tolerate malformed inputs: anything that is not the expected shape is
    # treated as "no rows" instead of raising.
    raw_data = _load_json(raw_path)
    raw_rows = raw_data.get("rows") if isinstance(raw_data, dict) else None
    if not isinstance(raw_rows, list):
        raw_rows = []

    # Build a ticker → name map from data_feed, used as a fallback for names.
    gtd = _load_json(json_path)
    data_section = gtd.get("data") if isinstance(gtd, dict) else None
    df_list = data_section.get("data_feed") if isinstance(data_section, dict) else None
    if not isinstance(df_list, list):
        df_list = []
    name_map = {
        str(r.get("Ticker") or ""): str(r.get("Name") or "")
        for r in df_list
        if isinstance(r, dict)
    }

    grade_counts: dict[str, int] = {}
    rows: list[dict[str, Any]] = []
    for raw in raw_rows:
        if not isinstance(raw, dict):
            continue
        ticker = str(raw.get("ticker") or "")
        if not ticker:
            continue
        score, grade, fail_reasons, breakdown, buy_allowed = _score_ticker(raw)
        name = raw.get("name") or name_map.get(ticker, "")
        # Normalize exactly as the scorer does, so a null/empty label that was
        # scored as MISSING is also reported and counted as MISSING below.
        data_quality = str(raw.get("data_quality") or "MISSING")
        rows.append({
            "ticker": ticker,
            "name": name,
            "score": score,
            "grade": grade,
            "buy_allowed": buy_allowed,
            "fail_reasons": fail_reasons,
            "breakdown": breakdown,
            "data_quality": data_quality,
            "is_etf": bool(raw.get("is_etf")),
            "as_of_date": raw.get("as_of_date"),
            "source": raw.get("source", "fallback"),
        })
        grade_counts[grade] = grade_counts.get(grade, 0) + 1

    # Gate: evaluated over non-ETF tickers only.
    non_etf_rows = [r for r in rows if r["grade"] != "ETF"]
    data_missing_count = sum(1 for r in non_etf_rows if r["data_quality"] == "MISSING")
    unique_non_etf_grades = {r["grade"] for r in non_etf_rows}
    grade_diverse = len(unique_non_etf_grades) >= 2
    if not non_etf_rows:
        gate = "FAIL"
    elif data_missing_count == 0 and grade_diverse:
        gate = "PASS"
    else:
        gate = "CAUTION"

    result = {
        "formula_id": "FUNDAMENTAL_MULTIFACTOR_V3",
        "gate": gate,
        "row_count": len(rows),
        "non_etf_count": len(non_etf_rows),
        "data_missing_count": data_missing_count,
        "grade_counts": grade_counts,
        "grade_diverse": grade_diverse,
        "rows": rows,
    }
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")
    print(
        f"FUNDAMENTAL_MULTIFACTOR_V3 gate={gate} rows={len(rows)} non_etf={len(non_etf_rows)} "
        f"missing={data_missing_count} grades={grade_counts}"
    )
    print("FUNDAMENTAL_MULTIFACTOR_V3_OK" if gate != "FAIL" else "FUNDAMENTAL_MULTIFACTOR_V3_FAIL")
    return 0 if gate != "FAIL" else 1
|
|
|
|
|
|
if __name__ == "__main__":
    # Hand main()'s return value to the interpreter as the process exit status.
    exit_status = main()
    raise SystemExit(exit_status)
|