"""CASHFLOW_QUALITY_SIGNAL_V1 — cash-flow stability signal generator.

Deterministically labels each ticker's cash-flow quality from operating
cash flow (OCF) and free cash flow (FCF).

Sources
    Primary:  fundamental_raw_v1.json  -> ocf_krw, fcf_krw
    Fallback: GatherTradingData.json   -> FCF_B (unit: billions of KRW)
    Earnings proxy: EPS > 0 (used for the accounting-risk flag and as the
    only recorded evidence when no cash-flow figure exists)

Labels
    ROBUST        OCF > 0 and FCF > 0 and OCF / revenue >= 10%
    STABLE        OCF > 0 and FCF > 0 (margin not confirmed), or the single
                  available cash-flow figure is positive
    VOLATILE      exactly one of OCF / FCF is positive
    RISKY         neither OCF nor FCF is positive, or the single available
                  cash-flow figure is not positive
    DATA_MISSING  no usable cash-flow figure in any source
    ETF_EXCLUDED  row identified as an ETF (not scored)

ACCOUNTING_RISK
    Y: EPS > 0 while the cash-flow figure is not positive (earnings are not
       backed by cash generation)
    N: no risk detected, or not enough data

buy_modifier
    ROBUST +10, STABLE 0, VOLATILE -10, RISKY -20, DATA_MISSING -5,
    ETF_EXCLUDED 0
"""
from __future__ import annotations

import argparse
import json
import math
from pathlib import Path
from typing import Any

ROOT = Path(__file__).resolve().parents[1]
DEFAULT_RAW = ROOT / "Temp" / "fundamental_raw_v1.json"
DEFAULT_JSON = ROOT / "GatherTradingData.json"
DEFAULT_OUT = ROOT / "Temp" / "cashflow_quality_signal_v1.json"

_BUY_MODIFIER: dict[str, int] = {
    "ROBUST": 10,
    "STABLE": 0,
    "VOLATILE": -10,
    "RISKY": -20,
    "DATA_MISSING": -5,
    "ETF_EXCLUDED": 0,
}


def _load(path: Path) -> dict[str, Any]:
    """Read *path* as a JSON object; return ``{}`` on any failure.

    A missing or unreadable file, malformed JSON, undecodable bytes, and a
    top-level value that is not an object all yield an empty dict, so the
    caller can treat every source as optional.
    """
    try:
        payload = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # OSError: missing/unreadable file.
        # ValueError: JSONDecodeError and UnicodeDecodeError are subclasses.
        return {}
    return payload if isinstance(payload, dict) else {}


def _rows(v: Any) -> list[dict[str, Any]]:
    """Return the dict elements of *v* if it is a list, else an empty list."""
    if not isinstance(v, list):
        return []
    return [item for item in v if isinstance(item, dict)]


def _f(v: Any, default: float | None = None) -> float | None:
    """Convert *v* to a finite float, or return *default*.

    ``None``, ``""``, ``"N/A"``, unparsable values and non-finite numbers
    (NaN / +-Infinity) all count as missing.  ``json.loads`` accepts bare
    ``NaN`` / ``Infinity`` tokens, and a NaN would otherwise fail every
    ``> 0`` test and be scored as a negative cash flow.
    """
    if v is None or v == "" or v == "N/A":
        return default
    try:
        number = float(v)
    except (TypeError, ValueError, OverflowError):
        return default
    return number if math.isfinite(number) else default


def _classify_single(
    field: str,
    value: float,
    eps: float | None,
) -> tuple[str, str, str]:
    """Classify a ticker for which only one cash-flow figure is available."""
    if value > 0:
        return "STABLE", f"{field}_positive({value:.0f})", "N"
    # Positive earnings with a negative cash flow is the accounting-risk case.
    risk = "Y" if eps is not None and eps > 0 and value < 0 else "N"
    return "RISKY", f"{field}_negative({value:.0f})", risk


def _classify_pair(
    ocf: float,
    fcf: float,
    revenue: float | None,
    eps: float | None,
) -> tuple[str, str, str]:
    """Classify a ticker for which both OCF and FCF are available."""
    ocf_positive = ocf > 0
    fcf_positive = fcf > 0

    # EPS > 0 without positive FCF: earnings are not backed by cash.
    risk = "Y" if eps is not None and eps > 0 and not fcf_positive else "N"

    if ocf_positive and fcf_positive:
        # ROBUST additionally requires a confirmed OCF margin of at least 10%.
        if revenue is not None and revenue > 0:
            ocf_margin = ocf / revenue * 100.0
            if ocf_margin >= 10.0:
                return (
                    "ROBUST",
                    f"ocf={ocf:.0f}_fcf={fcf:.0f}_ocf_margin={ocf_margin:.1f}%",
                    risk,
                )
        return "STABLE", f"ocf={ocf:.0f}_fcf={fcf:.0f}", risk

    if ocf_positive != fcf_positive:
        ocf_sign = "pos" if ocf_positive else "neg"
        fcf_sign = "pos" if fcf_positive else "neg"
        return "VOLATILE", f"ocf={ocf_sign}_fcf={fcf_sign}", risk

    # Neither figure is positive.
    return "RISKY", f"ocf={ocf:.0f}_fcf={fcf:.0f}_both_neg", risk


def _classify_from_ocf_fcf(
    ocf: float | None,
    fcf: float | None,
    revenue: float | None,
    eps: float | None,
) -> tuple[str, str, str]:
    """Derive ``(label, basis, accounting_risk)`` from OCF / FCF figures.

    Args:
        ocf: Operating cash flow, or ``None`` when unavailable.
        fcf: Free cash flow, or ``None`` when unavailable.
        revenue: Revenue used for the OCF-margin check; ``None`` or a
            non-positive value skips the check.
        eps: Earnings per share, used only for the accounting-risk flag.

    Returns:
        The label, a short string describing the evidence, and ``"Y"`` or
        ``"N"`` for the accounting-risk flag.
    """
    if ocf is not None and fcf is not None:
        return _classify_pair(ocf, fcf, revenue, eps)
    if ocf is not None:
        return _classify_single("ocf", ocf, eps)
    if fcf is not None:
        return _classify_single("fcf", fcf, eps)
    return "DATA_MISSING", "no_ocf_no_fcf", "N"


def _process_ticker(
    ticker: str,
    name: str,
    raw_row: dict[str, Any] | None,
    df_row: dict[str, Any] | None,
    is_etf: bool,
) -> dict[str, Any]:
    """Build the output record for one ticker.

    Args:
        ticker: Ticker identifier copied into the record.
        name: Display name copied into the record.
        raw_row: Matching row of the primary source, or ``None`` if absent.
        df_row: Matching data-feed row (fallback source), or ``None``.
        is_etf: When true the ticker is not scored and gets ``ETF_EXCLUDED``.

    Returns:
        A dict with the keys ``ticker``, ``name``, ``label``,
        ``buy_modifier``, ``confidence``, ``data_source``, ``proxy_basis``,
        ``accounting_risk``, ``missing_fields`` and ``is_etf``.
    """
    if is_etf:
        return {
            "ticker": ticker,
            "name": name,
            "label": "ETF_EXCLUDED",
            "buy_modifier": 0,
            "confidence": "N/A",
            "data_source": "etf_skip",
            "proxy_basis": None,
            "accounting_risk": "N/A",
            "missing_fields": [],
            "is_etf": True,
        }

    missing_fields: list[str] = []
    label = "DATA_MISSING"
    confidence = "NONE"
    data_source = "none"
    proxy_basis: str | None = None
    accounting_risk = "N"

    # Priority 1: ocf_krw / fcf_krw from the primary source.
    raw = raw_row or {}
    ocf = _f(raw.get("ocf_krw"))
    fcf = _f(raw.get("fcf_krw"))
    revenue = _f(raw.get("revenue_krw"))
    eps_raw = _f(raw.get("eps_krw"))

    if ocf is not None or fcf is not None:
        label, proxy_basis, accounting_risk = _classify_from_ocf_fcf(
            ocf, fcf, revenue, eps_raw
        )
        confidence = "HIGH" if (ocf is not None and fcf is not None) else "MEDIUM"
        data_source = "fundamental_raw.ocf_fcf"
    else:
        if raw_row is not None:
            missing_fields += ["fundamental_raw.ocf_krw", "fundamental_raw.fcf_krw"]
        else:
            missing_fields.append("fundamental_raw.(not_found)")

        # Priority 2: FCF_B from the data feed (billions of KRW).  Only the
        # sign is used, so no unit conversion is needed.
        feed = df_row or {}
        fcf_b = _f(feed.get("FCF_B"))
        eps_df = _f(feed.get("EPS"))

        if fcf_b is not None:
            confidence = "MEDIUM"
            data_source = "data_feed.FCF_B"
            if fcf_b > 0:
                label = "STABLE"
                proxy_basis = f"fcf_b={fcf_b:.2f}B_positive"
            else:
                label = "RISKY"
                proxy_basis = f"fcf_b={fcf_b:.2f}B_negative"
                if eps_df is not None and eps_df > 0:
                    accounting_risk = "Y"
        else:
            missing_fields.append("data_feed.FCF_B")
            # The label stays DATA_MISSING: EPS alone cannot stand in for a
            # cash-flow figure, it is only recorded as the available evidence.
            if eps_df is not None:
                proxy_basis = f"eps_only({eps_df:.0f})_no_cashflow"
                data_source = "none"

    return {
        "ticker": ticker,
        "name": name,
        "label": label,
        "buy_modifier": _BUY_MODIFIER.get(label, -5),
        "confidence": confidence,
        "data_source": data_source,
        "proxy_basis": proxy_basis,
        "accounting_risk": accounting_risk,
        "missing_fields": missing_fields,
        "is_etf": False,
    }


def _resolve(arg: str) -> Path:
    """Return *arg* as a path, anchoring relative paths at ``ROOT``."""
    candidate = Path(arg)
    return candidate if candidate.is_absolute() else ROOT / candidate


def main() -> int:
    """Compute the signal for every data-feed ticker and write the JSON report.

    Command-line options ``--raw``, ``--json`` and ``--out`` override the
    default input/output paths; relative paths are resolved against ``ROOT``.

    Returns:
        Always ``0``.  A failed gate (no non-ETF rows) is reported through
        the ``gate`` field of the output file and the final status line, not
        through the return value.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--raw", default=str(DEFAULT_RAW))
    ap.add_argument("--json", default=str(DEFAULT_JSON))
    ap.add_argument("--out", default=str(DEFAULT_OUT))
    args = ap.parse_args()

    raw_path = _resolve(args.raw)
    json_path = _resolve(args.json)
    out_path = _resolve(args.out)

    raw_data = _load(raw_path)
    raw_map: dict[str, dict[str, Any]] = {
        str(r.get("ticker") or ""): r for r in _rows(raw_data.get("rows"))
    }

    gtd = _load(json_path)
    data_section = gtd.get("data")
    # Guard against a "data" value that is truthy but not an object.
    df_list = _rows(
        data_section.get("data_feed") if isinstance(data_section, dict) else None
    )

    tickers_seen: set[str] = set()
    rows: list[dict[str, Any]] = []
    label_counts: dict[str, int] = {}
    accounting_risk_count = 0

    for df_row in df_list:
        ticker = str(df_row.get("Ticker") or "")
        # Skip rows without a ticker and keep only the first row per ticker.
        if not ticker or ticker in tickers_seen:
            continue
        tickers_seen.add(ticker)
        name = str(df_row.get("Name") or "")

        # Heuristic: a row with no EPS, Forward_PE and PBR is taken as an ETF.
        is_etf = (
            df_row.get("EPS") is None
            and df_row.get("Forward_PE") is None
            and df_row.get("PBR") is None
        )
        raw_row = raw_map.get(ticker)
        if raw_row is not None:
            # An explicit flag in the primary source overrides the heuristic.
            is_etf = bool(raw_row.get("is_etf", is_etf))

        result = _process_ticker(ticker, name, raw_row, df_row, is_etf)
        rows.append(result)

        lbl = result["label"]
        label_counts[lbl] = label_counts.get(lbl, 0) + 1
        if result.get("accounting_risk") == "Y":
            accounting_risk_count += 1

    non_etf = [r for r in rows if not r["is_etf"]]
    data_missing_pct = (
        sum(1 for r in non_etf if r["label"] == "DATA_MISSING") / len(non_etf) * 100
        if non_etf
        else 0.0
    )
    gate = "PASS" if non_etf else "FAIL"

    out = {
        "formula_id": "CASHFLOW_QUALITY_SIGNAL_V1",
        "gate": gate,
        "data_missing_pct": round(data_missing_pct, 1),
        "accounting_risk_count": accounting_risk_count,
        "label_counts": label_counts,
        "row_count": len(rows),
        "non_etf_count": len(non_etf),
        "rows": rows,
    }
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(json.dumps(out, ensure_ascii=False, indent=2), encoding="utf-8")

    status = "CASHFLOW_QUALITY_SIGNAL_V1_OK" if gate != "FAIL" else "CASHFLOW_QUALITY_SIGNAL_V1_FAIL"
    print(
        f"CASHFLOW_QUALITY_SIGNAL_V1 gate={gate} rows={len(rows)} "
        f"non_etf={len(non_etf)} data_missing_pct={data_missing_pct:.1f}% "
        f"accounting_risk={accounting_risk_count} labels={label_counts}"
    )
    print(status)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())