Files
QuantEngineByItz/tools/build_cashflow_quality_signal_v1.py
kjh2064 ee3e799de1 feat: 리밸런싱 엔진 V1 + GAS 버그 수정 (2026-06-13)
주요 변경:
- tools/build_rebalance_engine_v1.py: REBALANCE_ENGINE_V1 신규
  * account_snapshot 직접 합산(_build_snap_position_map) → 소수주 분리 행 병합
  * 레짐 소스 macro.REGIME_PRELIM 최우선 (GAS 와 동일)
- src/gas_adapter_parts/gdf_06_rebalance.gs: runRebalanceSheet_() 신규
  * Logger.log / getSpreadsheet_() 로 run_all 연동 수정
- src/gas_adapter_parts/gdc_01_fetch_fundamentals.gs
  * _mergePositionRecord_(): 소수주 중복 행 합산 신규
  * parseInt → parseFloat (qty, availQty)
- src/gas_adapter_parts/gdf_01_price_metrics.gs
  * 미보유 종목 SELL_READY → WATCH_EXIT_SIGNAL
- spec/41_release_dag.yaml: build_rebalance_sheet 노드 추가 (step_count 63)
- spec/51_formula_lifecycle_registry.yaml: REBALANCE_ENGINE_V1 등록

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-13 13:20:14 +09:00

281 lines
9.4 KiB
Python

"""CASHFLOW_QUALITY_SIGNAL_V1 — 현금흐름 안정성 시그널 산출기.
OCF / FCF 기반으로 종목별 현금흐름 품질을 결정론적으로 라벨링한다.
주 소스: fundamental_raw_v1.json → ocf_krw, fcf_krw
보완 소스: GatherTradingData.json → FCF_B (단위: 십억원)
이익 검증 프록시: EPS > 0 확인 (OCF/FCF 없을 때 최소 수익성 확인)
라벨:
ROBUST ← OCF 양전 + FCF 양전 + OCF/매출 ≥ 10%
STABLE ← OCF 양전 + FCF 양전 (마진 미확인)
VOLATILE ← OCF 양전 XOR FCF 양전 (불일치)
RISKY ← OCF 음전 OR FCF 음전
DATA_MISSING ← 모든 소스 결손
ACCOUNTING_RISK:
Y: OCF < NI 의심 (EPS > 0이나 FCF < 0인 경우)
N: 위험 미감지 또는 데이터 부족
buy_modifier:
ROBUST → +10
STABLE → 0
VOLATILE → -10
RISKY → -20
DATA_MISSING → -5
"""
from __future__ import annotations
import argparse
import json
from pathlib import Path
from typing import Any
ROOT = Path(__file__).resolve().parents[1]
DEFAULT_RAW = ROOT / "Temp" / "fundamental_raw_v1.json"
DEFAULT_JSON = ROOT / "GatherTradingData.json"
DEFAULT_OUT = ROOT / "Temp" / "cashflow_quality_signal_v1.json"
_BUY_MODIFIER: dict[str, int] = {
"ROBUST": 10,
"STABLE": 0,
"VOLATILE": -10,
"RISKY": -20,
"DATA_MISSING": -5,
"ETF_EXCLUDED": 0,
}
def _load(path: Path) -> dict[str, Any]:
if not path.exists():
return {}
try:
d = json.loads(path.read_text(encoding="utf-8"))
except Exception:
return {}
return d if isinstance(d, dict) else {}
def _rows(v: Any) -> list[dict[str, Any]]:
if isinstance(v, list):
return [x for x in v if isinstance(x, dict)]
return []
def _f(v: Any, default: float | None = None) -> float | None:
if v is None or v == "" or v == "N/A":
return default
try:
return float(v)
except (TypeError, ValueError):
return default
def _classify_from_ocf_fcf(
ocf: float | None,
fcf: float | None,
revenue: float | None,
eps: float | None,
) -> tuple[str, str, str]:
"""OCF/FCF 수치에서 라벨, 근거, ACCOUNTING_RISK 산출."""
if ocf is None and fcf is None:
return "DATA_MISSING", "no_ocf_no_fcf", "N"
accounting_risk = "N"
if ocf is not None and fcf is not None:
ocf_positive = ocf > 0
fcf_positive = fcf > 0
# ACCOUNTING_RISK: EPS>0이나 FCF<0 → 이익 대비 현금 창출 의심
if eps is not None and eps > 0 and not fcf_positive:
accounting_risk = "Y"
if ocf_positive and fcf_positive:
# OCF 마진 확인
if revenue is not None and revenue > 0:
ocf_margin = ocf / revenue * 100.0
if ocf_margin >= 10.0:
return "ROBUST", f"ocf={ocf:.0f}_fcf={fcf:.0f}_ocf_margin={ocf_margin:.1f}%", accounting_risk
return "STABLE", f"ocf={ocf:.0f}_fcf={fcf:.0f}", accounting_risk
if ocf_positive != fcf_positive:
return "VOLATILE", f"ocf={'pos' if ocf_positive else 'neg'}_fcf={'pos' if fcf_positive else 'neg'}", accounting_risk
# 둘 다 음전
return "RISKY", f"ocf={ocf:.0f}_fcf={fcf:.0f}_both_neg", accounting_risk
# 한쪽만 있는 경우
val = ocf if ocf is not None else fcf
label_str = "ocf" if ocf is not None else "fcf"
assert val is not None
if val > 0:
return "STABLE", f"{label_str}_positive({val:.0f})", accounting_risk
# ACCOUNTING_RISK: EPS>0이나 단일 cashflow<0
if eps is not None and eps > 0 and val < 0:
accounting_risk = "Y"
return "RISKY", f"{label_str}_negative({val:.0f})", accounting_risk
def _process_ticker(
ticker: str,
name: str,
raw_row: dict[str, Any] | None,
df_row: dict[str, Any] | None,
is_etf: bool,
) -> dict[str, Any]:
if is_etf:
return {
"ticker": ticker,
"name": name,
"label": "ETF_EXCLUDED",
"buy_modifier": 0,
"confidence": "N/A",
"data_source": "etf_skip",
"proxy_basis": None,
"accounting_risk": "N/A",
"missing_fields": [],
"is_etf": True,
}
missing_fields: list[str] = []
label = "DATA_MISSING"
confidence = "NONE"
data_source = "none"
proxy_basis: str | None = None
accounting_risk = "N"
# ── 1순위: fundamental_raw ocf_krw + fcf_krw ─────────────────────────────
ocf = _f(raw_row.get("ocf_krw") if raw_row else None)
fcf = _f(raw_row.get("fcf_krw") if raw_row else None)
revenue = _f(raw_row.get("revenue_krw") if raw_row else None)
eps_raw = _f(raw_row.get("eps_krw") if raw_row else None)
if ocf is not None or fcf is not None:
label, proxy_basis, accounting_risk = _classify_from_ocf_fcf(ocf, fcf, revenue, eps_raw)
confidence = "HIGH" if (ocf is not None and fcf is not None) else "MEDIUM"
data_source = "fundamental_raw.ocf_fcf"
else:
if raw_row is not None:
missing_fields += ["fundamental_raw.ocf_krw", "fundamental_raw.fcf_krw"]
else:
missing_fields.append("fundamental_raw.(not_found)")
# ── 2순위: data_feed FCF_B (단위: 십억원) ─────────────────────────────
fcf_b = _f(df_row.get("FCF_B") if df_row else None)
eps_df = _f(df_row.get("EPS") if df_row else None)
if fcf_b is not None:
# FCF_B > 0 → positive FCF
fcf_val = fcf_b * 1e9 # 십억원 → 원
if fcf_val > 0:
label = "STABLE"
proxy_basis = f"fcf_b={fcf_b:.2f}B_positive"
confidence = "MEDIUM"
else:
label = "RISKY"
proxy_basis = f"fcf_b={fcf_b:.2f}B_negative"
confidence = "MEDIUM"
if eps_df is not None and eps_df > 0:
accounting_risk = "Y"
data_source = "data_feed.FCF_B"
else:
missing_fields.append("data_feed.FCF_B")
# DATA_MISSING 유지 — EPS만으로는 현금흐름 추정 불가
eps = eps_df
if eps is not None:
proxy_basis = f"eps_only({eps:.0f})_no_cashflow"
data_source = "none"
buy_modifier = _BUY_MODIFIER.get(label, -5)
return {
"ticker": ticker,
"name": name,
"label": label,
"buy_modifier": buy_modifier,
"confidence": confidence,
"data_source": data_source,
"proxy_basis": proxy_basis,
"accounting_risk": accounting_risk,
"missing_fields": missing_fields,
"is_etf": False,
}
def main() -> int:
ap = argparse.ArgumentParser()
ap.add_argument("--raw", default=str(DEFAULT_RAW))
ap.add_argument("--json", default=str(DEFAULT_JSON))
ap.add_argument("--out", default=str(DEFAULT_OUT))
args = ap.parse_args()
raw_path = Path(args.raw) if Path(args.raw).is_absolute() else ROOT / args.raw
json_path = Path(args.json) if Path(args.json).is_absolute() else ROOT / args.json
out_path = Path(args.out) if Path(args.out).is_absolute() else ROOT / args.out
raw_data = _load(raw_path)
raw_map: dict[str, dict[str, Any]] = {
str(r.get("ticker") or ""): r
for r in _rows(raw_data.get("rows"))
}
gtd = _load(json_path)
df_list = _rows((gtd.get("data") or {}).get("data_feed"))
tickers_seen: set[str] = set()
rows: list[dict[str, Any]] = []
label_counts: dict[str, int] = {}
accounting_risk_count = 0
for df_row in df_list:
ticker = str(df_row.get("Ticker") or "")
if not ticker or ticker in tickers_seen:
continue
tickers_seen.add(ticker)
name = str(df_row.get("Name") or "")
is_etf = (
df_row.get("EPS") is None
and df_row.get("Forward_PE") is None
and df_row.get("PBR") is None
)
raw_row = raw_map.get(ticker)
if raw_row is not None:
is_etf = bool(raw_row.get("is_etf", is_etf))
result = _process_ticker(ticker, name, raw_row, df_row, is_etf)
rows.append(result)
lbl = result["label"]
label_counts[lbl] = label_counts.get(lbl, 0) + 1
if result.get("accounting_risk") == "Y":
accounting_risk_count += 1
non_etf = [r for r in rows if not r["is_etf"]]
data_missing_pct = (
sum(1 for r in non_etf if r["label"] == "DATA_MISSING") / len(non_etf) * 100
if non_etf else 0.0
)
gate = "PASS" if non_etf else "FAIL"
out = {
"formula_id": "CASHFLOW_QUALITY_SIGNAL_V1",
"gate": gate,
"data_missing_pct": round(data_missing_pct, 1),
"accounting_risk_count": accounting_risk_count,
"label_counts": label_counts,
"row_count": len(rows),
"non_etf_count": len(non_etf),
"rows": rows,
}
out_path.parent.mkdir(parents=True, exist_ok=True)
out_path.write_text(json.dumps(out, ensure_ascii=False, indent=2), encoding="utf-8")
status = "CASHFLOW_QUALITY_SIGNAL_V1_OK" if gate != "FAIL" else "CASHFLOW_QUALITY_SIGNAL_V1_FAIL"
print(
f"CASHFLOW_QUALITY_SIGNAL_V1 gate={gate} rows={len(rows)} "
f"non_etf={len(non_etf)} data_missing_pct={data_missing_pct:.1f}% "
f"accounting_risk={accounting_risk_count} labels={label_counts}"
)
print(status)
return 0
if __name__ == "__main__":
raise SystemExit(main())