from __future__ import annotations

import json
import math
from collections import Counter, defaultdict
from pathlib import Path
from typing import Any

# Directory two levels above the one containing this file. Falls back to the
# containing directory when the module sits too close to the filesystem root
# for ``parents[2]`` to exist, so importing never raises IndexError.
_MODULE_PATH = Path(__file__).resolve()
ROOT = _MODULE_PATH.parents[2] if len(_MODULE_PATH.parents) > 2 else _MODULE_PATH.parent

# Substrings (upper-case) that mark a security name as ETF-like.
ETF_NAME_HINTS = (
    "KODEX",
    "TIGER",
    "RISE",
    "KBSTAR",
    "ARIRANG",
    "ACE",
    "KOSEF",
    "HANARO",
    "SOL",
    "TIMEFOLIO",
    "WOORI",
    "PLUS",
    "NPLUS",
    "TREX",
    "FOCUS",
    "KIWOOM",
)

_ROBOTICS_SECTOR = "로보틱스"

# Proxy used for the robotics sector when ``sector_flow`` has no ETF row for it.
ROBOTICS_FALLBACK_PROXY = {
    "Sector": "로보틱스",
    "Proxy_Ticker": "0190C0",
    "Proxy_Name": "RISE 현대차고정피지컬AI",
    "Proxy_Type": "ETF",
    "Sector_Rank": 12,
    "SmartMoney_5D_KRW": 0.0,
    "Sector_Ret20D": 0.0,
}

# (constituent code, constituent name, weight) rows of the fallback proxy.
_ROBOTICS_FALLBACK_CONSTITUENTS = (
    ("005380", "현대차", 0.2402),
    ("012330", "현대모비스", 0.1588),
    ("011070", "LG이노텍", 0.1450),
    ("000270", "기아", 0.1234),
    ("307950", "현대오토에버", 0.0899),
    ("277810", "레인보우로보틱스", 0.0673),
    ("064400", "LG씨엔에스", 0.0519),
    ("454910", "두산로보틱스", 0.0367),
    ("108490", "로보티즈", 0.0240),
    ("058610", "에스피지", 0.0173),
)

# Constituent universe used for the robotics sector when ``sector_universe``
# yields no usable robotics rows.
ROBOTICS_FALLBACK_UNIVERSE = [
    {
        "Sector": "로보틱스",
        "Proxy_Ticker": "0190C0",
        "Proxy_Name": "RISE 현대차고정피지컬AI",
        "Proxy_Type": "ETF",
        "Constituent_Code": code,
        "Constituent_Name": name,
        "Weight": weight,
        "Is_ETF": False,
    }
    for code, name, weight in _ROBOTICS_FALLBACK_CONSTITUENTS
]

_QUOTE_OK = frozenset({"NAVER_QUOTE_OK", "OK"})
_LIQUIDITY_MID = frozenset({"WATCH", "NORMAL", "TRACK"})
_ACTIVE_STATUSES = frozenset({"OK", "ACTIVE", "LIVE"})
_TRUE_TOKENS = frozenset({"Y", "YES", "TRUE", "T", "1"})
_FALSE_TOKENS = frozenset({"N", "NO", "FALSE", "F", "0"})

_MISSING_TEXT = "DATA_MISSING — 하네스 업데이트 필요"
_DEFAULT_REP_COUNT = 3
_REP_COUNT_BY_SECTOR = {_ROBOTICS_SECTOR: 5}


def _parse_jsonish(value: Any) -> Any:
    """Decode *value* as JSON when it is a non-blank string.

    Dicts and lists are returned as-is; strings that fail to decode, and every
    other type, are returned unchanged.
    """
    if isinstance(value, (dict, list)):
        return value
    if isinstance(value, str) and value.strip():
        try:
            return json.loads(value)
        except (ValueError, RecursionError):
            # json.JSONDecodeError is a ValueError subclass.
            return value
    return value


def _load_payload(payload: dict[str, Any]) -> tuple[dict[str, Any], dict[str, Any]]:
    """Return ``(data, harness_context)`` from *payload*.

    Either element is an empty dict when the corresponding entry is missing or
    is not a dict. A non-dict *payload* is treated as empty.
    """
    if not isinstance(payload, dict):
        return {}, {}
    data = payload.get("data")
    if not isinstance(data, dict):
        data = {}
    hctx = data.get("_harness_context")
    if not isinstance(hctx, dict):
        hctx = {}
    return data, hctx


def _num(value: Any, default: float = 0.0) -> float:
    """Convert *value* to float, returning *default* when that is not possible.

    NaN is treated like a missing value: it would otherwise poison comparisons
    and make the tuple sort keys built from this function inconsistent.
    """
    try:
        number = float(value)
    except (TypeError, ValueError, OverflowError):
        return default
    return default if math.isnan(number) else number


def _txt(value: Any, default: str = "") -> str:
    """Return ``str(value)`` stripped, or *default* for ``None`` / blank text."""
    if value is None:
        return default
    text = str(value).strip()
    return text or default


def _flag(value: Any, default: bool) -> bool:
    """Interpret a yes/no style cell.

    Accepts real booleans as well as Y/N, YES/NO, TRUE/FALSE, T/F and 1/0
    (case-insensitive). Anything unrecognised yields *default*.
    """
    if isinstance(value, bool):
        return value
    token = _txt(value).upper()
    if token in _TRUE_TOKENS:
        return True
    if token in _FALSE_TOKENS:
        return False
    return default


def _is_etf_like_name(name: str) -> bool:
    """Return True when *name* contains any ``ETF_NAME_HINTS`` substring."""
    upper = name.upper()
    return any(hint in upper for hint in ETF_NAME_HINTS)


def _liquidity_rank(value: str) -> int:
    """Map a liquidity status label to a sort rank (0 = best, 3 = unknown)."""
    upper = value.upper()
    if upper in {"PREFERRED", "OK", "GOOD"}:
        return 0
    if upper in _LIQUIDITY_MID:
        return 1
    if upper in {"CAUTION", "WARN", "RISK"}:
        return 2
    return 3


def _monitor_state(row: dict[str, Any]) -> str:
    """Classify a live row as CAUTION, BUY_REVIEW, TRACK or WATCH."""
    quote = _txt(row.get("Quote_Status"), "UNKNOWN").upper()
    spread = _txt(row.get("Spread_Status"), "UNKNOWN").upper()
    # A bad quote or spread status overrides any trend signal.
    if quote not in _QUOTE_OK or spread != "OK":
        return "CAUTION"

    liquidity = _txt(row.get("Liquidity_Status"), "UNKNOWN").upper()
    uptrend = (
        _num(row.get("Ret20D"), 0.0) > 0
        and _num(row.get("Close"), 0.0) >= _num(row.get("MA20"), 0.0)
    )
    if not uptrend:
        return "WATCH"
    return "BUY_REVIEW" if liquidity == "PREFERRED" else "TRACK"


def _selection_score(row: dict[str, Any], is_weighted: bool) -> float:
    """Score a live row; higher is better. Rounded to two decimals."""
    liquidity = _txt(row.get("Liquidity_Status"), "UNKNOWN").upper()
    quote = _txt(row.get("Quote_Status"), "UNKNOWN").upper()
    spread = _num(row.get("Spread_Pct"), 99.0)

    score = 3.0 if is_weighted else 0.0
    if liquidity == "PREFERRED":
        score += 3.0
    elif liquidity in _LIQUIDITY_MID:
        score += 1.5
    if quote in _QUOTE_OK:
        score += 1.0
    if spread <= 0.2:
        score += 1.0
    elif spread <= 0.5:
        score += 0.5
    if _num(row.get("Ret20D"), 0.0) >= 0:
        score += 1.0
    if _num(row.get("AvgTradeValue_20D_KRW"), 0.0) >= 50_000_000_000:
        score += 1.0
    return round(score, 2)


def _constituent_priority_score(
    spec: dict[str, Any],
    live_row: dict[str, Any] | None,
) -> tuple[float, float, float, float, float, str]:
    """Build an ascending sort key for an ETF constituent.

    Order of precedence: larger weight, larger live score, better liquidity
    rank, tighter spread, larger 20-day return, then name. Without a live row
    the live components take worst-case sentinel values.
    """
    weight = _num(spec.get("Weight"), 0.0)
    name = _txt(spec.get("Constituent_Name"))
    if not isinstance(live_row, dict):
        return (-weight, -0.0, 99.0, 99.0, 999.0, name)

    live_score = _selection_score(live_row, True)
    liquidity_rank = float(_liquidity_rank(_txt(live_row.get("Liquidity_Status"), "UNKNOWN")))
    spread = _num(live_row.get("Spread_Pct"), 99.0)
    ret20d = _num(live_row.get("Ret20D"), -999.0)
    if not name:
        name = _txt(live_row.get("Name"))
    return (-weight, -live_score, liquidity_rank, spread, -ret20d, name)


def _build_rep_item(
    row: dict[str, Any],
    spec: dict[str, Any],
    proxy: dict[str, Any],
    source_kind: str,
    original_constituent: str = "",
    original_constituent_name: str = "",
) -> dict[str, Any]:
    """Build one basket entry from live *row*, falling back to *spec* for identity.

    ``proxy_alignment`` is ALIGNED when the row's 20-day return and the proxy's
    sector 20-day return have the same sign (zero counts as non-negative).
    The ``original_constituent_*`` keys are only added when non-empty.
    """
    row_up = _num(row.get("Ret20D"), 0.0) >= 0
    sector_up = _num(proxy.get("Sector_Ret20D"), 0.0) >= 0
    spec_ticker = _txt(spec.get("Constituent_Code"), _txt(spec.get("Ticker")))
    spec_name = _txt(spec.get("Constituent_Name"), _txt(spec.get("Name")))

    item = {
        "ticker": _txt(row.get("Ticker"), spec_ticker),
        "name": _txt(row.get("Name"), spec_name),
        "weight": spec.get("Weight", ""),
        "close": row.get("Close", ""),
        "ma20": row.get("MA20", ""),
        "ret10d": row.get("Ret10D", ""),
        "ret20d": row.get("Ret20D", ""),
        "ret60d": row.get("Ret60D", ""),
        "avgtradevalue20d_krw": row.get("AvgTradeValue_20D_KRW", ""),
        "spread_pct": row.get("Spread_Pct", ""),
        "quote_status": _txt(row.get("Quote_Status")),
        "liquidity_status": _txt(row.get("Liquidity_Status")),
        "frg_5d": row.get("Frg_5D", ""),
        "monitor_state": _monitor_state(row),
        "proxy_alignment": "ALIGNED" if row_up == sector_up else "DIVERGING",
        "selection_source": source_kind,
        "selection_score": _selection_score(row, source_kind == "ETF_CONSTITUENT_WEIGHT"),
    }
    if original_constituent:
        item["original_constituent_ticker"] = original_constituent
    if original_constituent_name:
        item["original_constituent_name"] = original_constituent_name
    return item


def _missing_rep_item(spec: dict[str, Any], source_kind: str) -> dict[str, Any]:
    """Build a placeholder basket entry for a constituent with no live data."""
    return {
        "ticker": _txt(spec.get("Constituent_Code"), _txt(spec.get("Ticker"))),
        "name": _txt(spec.get("Constituent_Name"), _txt(spec.get("Name"))),
        "weight": spec.get("Weight", ""),
        "close": _MISSING_TEXT,
        "ma20": _MISSING_TEXT,
        "ret10d": _MISSING_TEXT,
        "ret20d": _MISSING_TEXT,
        "ret60d": _MISSING_TEXT,
        "avgtradevalue20d_krw": _MISSING_TEXT,
        "spread_pct": _MISSING_TEXT,
        "quote_status": _MISSING_TEXT,
        "liquidity_status": _MISSING_TEXT,
        "frg_5d": _MISSING_TEXT,
        "monitor_state": "DATA_MISSING",
        "proxy_alignment": "UNKNOWN",
        "selection_source": source_kind,
        "selection_score": 0.0,
        "replacement_reason": "NO_LIVE_REPLACEMENT",
    }


def _dict_rows(data: dict[str, Any], key: str) -> list[dict[str, Any]]:
    """Return the dict entries of ``data[key]``, or ``[]`` if it is not a list."""
    rows = data.get(key)
    if not isinstance(rows, list):
        return []
    return [row for row in rows if isinstance(row, dict)]


def _collect_etf_sectors(sector_flow: list[dict[str, Any]]) -> dict[str, dict[str, Any]]:
    """Map sector name to its ETF proxy row; robotics always has an entry."""
    etf_sectors: dict[str, dict[str, Any]] = {}
    for row in sector_flow:
        sector = _txt(row.get("Sector"))
        if sector and _txt(row.get("Proxy_Type")).upper() == "ETF":
            etf_sectors[sector] = row
    etf_sectors.setdefault(_ROBOTICS_SECTOR, ROBOTICS_FALLBACK_PROXY)
    return etf_sectors


def _index_core_satellite(
    core_satellite: list[dict[str, Any]],
) -> tuple[dict[str, list[dict[str, Any]]], dict[str, dict[str, Any]]]:
    """Return ``(non-ETF rows grouped by sector, every row keyed by ticker)``."""
    sector_candidates: dict[str, list[dict[str, Any]]] = defaultdict(list)
    core_by_ticker: dict[str, dict[str, Any]] = {}
    for row in core_satellite:
        sector = _txt(row.get("Sector"))
        ticker = _txt(row.get("Ticker"))
        if not sector or not ticker:
            continue
        # ETF-like rows stay reachable by ticker but are not sector candidates.
        core_by_ticker[ticker] = row
        if not _is_etf_like_name(_txt(row.get("Name"))):
            sector_candidates[sector].append(row)
    return sector_candidates, core_by_ticker


def _collect_universe_candidates(
    sector_universe: list[dict[str, Any]],
) -> dict[str, list[dict[str, Any]]]:
    """Group usable ETF constituents by sector; robotics always has an entry.

    A row is dropped when it lacks a sector or constituent code, is flagged as
    an ETF, is disabled, or has a status outside OK / ACTIVE / LIVE.
    """
    universe: dict[str, list[dict[str, Any]]] = defaultdict(list)
    for row in sector_universe:
        sector = _txt(row.get("Sector"))
        if not sector or not _txt(row.get("Constituent_Code")):
            continue
        if _flag(row.get("Is_ETF"), False):
            continue
        if not _flag(row.get("Enabled"), True):
            continue
        if _txt(row.get("Status"), "OK").upper() not in _ACTIVE_STATUSES:
            continue
        universe[sector].append(row)
    if _ROBOTICS_SECTOR not in universe:
        universe[_ROBOTICS_SECTOR] = list(ROBOTICS_FALLBACK_UNIVERSE)
    return universe


def _fallback_sort_key(row: dict[str, Any]) -> tuple[int, float, float, float]:
    """Sort key for sector candidates: liquidity, then turnover, then returns."""
    return (
        _liquidity_rank(_txt(row.get("Liquidity_Status"), "UNKNOWN")),
        -_num(row.get("AvgTradeValue_20D_KRW"), 0.0),
        -_num(row.get("Ret20D"), 0.0),
        -_num(row.get("Ret10D"), 0.0),
    )


def _select_specs(
    universe_rows: list[dict[str, Any]],
    fallback_rows: list[dict[str, Any]],
    target_rep_count: int,
) -> tuple[list[tuple[str, dict[str, Any]]], set[str]]:
    """Pick up to *target_rep_count* specs: ETF constituents first, then sector rows.

    Returns the ``(source_kind, spec)`` pairs and the set of reserved tickers.
    """
    top_rows = universe_rows[:target_rep_count]
    selected_specs: list[tuple[str, dict[str, Any]]] = [
        ("ETF_CONSTITUENT_WEIGHT", row) for row in top_rows
    ]
    selected_tickers = {_txt(row.get("Constituent_Code")) for row in top_rows}
    for row in fallback_rows:
        if len(selected_specs) >= target_rep_count:
            break
        ticker = _txt(row.get("Ticker"))
        if not ticker or ticker in selected_tickers:
            continue
        selected_specs.append(("SECTOR_LIQUIDITY_FALLBACK", row))
        selected_tickers.add(ticker)
    return selected_specs, selected_tickers


def _resolve_basket(
    selected_specs: list[tuple[str, dict[str, Any]]],
    selected_tickers: set[str],
    fallback_rows: list[dict[str, Any]],
    core_by_ticker: dict[str, dict[str, Any]],
    fallback_by_ticker: dict[str, dict[str, Any]],
    proxy: dict[str, Any],
    target_rep_count: int,
) -> list[dict[str, Any]]:
    """Turn selected specs into basket items backed by live rows where possible.

    *selected_tickers* is mutated: each replacement row is reserved so that no
    two missing constituents are replaced by the same stock.
    """
    basket_items: list[dict[str, Any]] = []
    for source_kind, spec in selected_specs:
        if source_kind == "ETF_CONSTITUENT_WEIGHT":
            code = _txt(spec.get("Constituent_Code"))
            rep = core_by_ticker.get(code) or fallback_by_ticker.get(code)
            if rep is None:
                rep = next(
                    (r for r in fallback_rows if _txt(r.get("Ticker")) not in selected_tickers),
                    None,
                )
                if rep is not None:
                    source_kind = "SECTOR_LIQUIDITY_FALLBACK_REPLACEMENT"
                    # Reserve the replacement; otherwise every missing
                    # constituent would receive this same row.
                    selected_tickers.add(_txt(rep.get("Ticker")))
        else:
            rep = spec

        if not rep:
            basket_items.append(_missing_rep_item(spec, source_kind))
            continue
        basket_items.append(
            _build_rep_item(
                rep,
                spec,
                proxy,
                source_kind,
                _txt(spec.get("Constituent_Code")),
                _txt(spec.get("Constituent_Name")),
            )
        )

    if len(basket_items) < target_rep_count:
        used_tickers = {item["ticker"] for item in basket_items}
        for rep in fallback_rows:
            ticker = _txt(rep.get("Ticker"))
            if not ticker or ticker in used_tickers:
                continue
            basket_items.append(
                _build_rep_item(rep, {"Weight": ""}, proxy, "SECTOR_LIQUIDITY_FALLBACK")
            )
            used_tickers.add(ticker)
            if len(basket_items) >= target_rep_count:
                break
    return basket_items


def _sector_row(
    sector: str,
    proxy: dict[str, Any],
    universe_rows: list[dict[str, Any]],
    basket_items: list[dict[str, Any]],
    rep_source: str,
    rep_basis_detail: str,
    target_rep_count: int,
) -> dict[str, Any]:
    """Summarise one sector's non-empty basket into an output row."""
    primary = basket_items[0]
    states = Counter(item.get("monitor_state") for item in basket_items)
    basket_buy = states["BUY_REVIEW"]
    basket_track = states["TRACK"]
    basket_watch = states["WATCH"]
    basket_caution = states["CAUTION"]
    basket_missing = states["DATA_MISSING"]
    basket_aligned = sum(1 for item in basket_items if item.get("proxy_alignment") == "ALIGNED")
    basket_real = len(basket_items) - basket_missing
    basket_coverage_pct = round((basket_real / len(basket_items)) * 100.0, 2)

    if basket_buy >= 2 and basket_aligned >= 2:
        basket_state = "BUY_REVIEW"
    elif basket_caution > 0:
        basket_state = "CAUTION"
    elif basket_track > 0:
        basket_state = "TRACK"
    else:
        basket_state = "WATCH"

    if basket_state == "BUY_REVIEW":
        monitor_reason = f"ETF 구성비중 상위 {target_rep_count}종목이 같은 방향으로 정렬"
    elif basket_state == "TRACK":
        monitor_reason = "대표 종목 바스켓 추세 확인 중"
    else:
        monitor_reason = "유동성/추세 보수 모니터링"

    top_spec = universe_rows[0] if universe_rows else {}
    return {
        "sector": sector,
        "etf_proxy_ticker": _txt(proxy.get("Proxy_Ticker")),
        "etf_proxy_name": _txt(proxy.get("Proxy_Name")),
        "etf_proxy_type": _txt(proxy.get("Proxy_Type")),
        "universe_source": _txt(proxy.get("Universe_Source"), "DEFAULT_TEMPLATE"),
        "sector_rank": proxy.get("Sector_Rank", ""),
        "sector_score": proxy.get("Sector_Score", ""),
        "sector_smart_money_5d_krw": proxy.get("SmartMoney_5D_KRW", ""),
        "sector_ret20d": proxy.get("Sector_Ret20D", ""),
        "representative_count": len(basket_items),
        "representative_ticker": primary["ticker"],
        "representative_name": primary["name"],
        "representative_basis": rep_source,
        "representative_basis_detail": rep_basis_detail,
        "constituent_weight": primary["weight"],
        "weight_sum_stocks_only": top_spec.get("Weight_Sum_Stocks_Only", ""),
        "weight_sum_all": top_spec.get("Weight_Sum_All", ""),
        "representative_close": primary["close"],
        "representative_ma20": primary["ma20"],
        "representative_ret10d": primary["ret10d"],
        "representative_ret20d": primary["ret20d"],
        "representative_ret60d": primary["ret60d"],
        "representative_avgtradevalue20d_krw": primary["avgtradevalue20d_krw"],
        "representative_spread_pct": primary["spread_pct"],
        "representative_quote_status": primary["quote_status"],
        "representative_liquidity_status": primary["liquidity_status"],
        "representative_frg_5d": primary["frg_5d"],
        "monitor_state": basket_state,
        "proxy_alignment": "ALIGNED" if basket_aligned >= 2 else "DIVERGING",
        "basket_buy_review_count": basket_buy,
        "basket_track_count": basket_track,
        "basket_watch_count": basket_watch,
        "basket_caution_count": basket_caution,
        "basket_aligned_count": basket_aligned,
        "basket_missing_count": basket_missing,
        "basket_real_count": basket_real,
        "basket_coverage_pct": basket_coverage_pct,
        "basket_quality_state": "COMPLETE" if basket_missing == 0 else "PARTIAL",
        "representatives": basket_items,
        "monitor_reason": monitor_reason,
    }


def build_etf_representative_monitor(payload: dict[str, Any]) -> dict[str, Any]:
    """Build the ETF representative-stock monitor from a harness payload.

    Reads ``sector_flow``, ``core_satellite`` and ``sector_universe`` from
    ``payload["data"]``. For every sector whose proxy type is ETF (robotics is
    always included via the built-in fallback) it selects a basket of
    representative stocks: 5 for robotics, 3 otherwise. ETF constituents are
    taken by descending weight, then topped up or replaced with the sector's
    most liquid core/satellite rows. A constituent with no live row and no
    replacement left becomes a ``DATA_MISSING`` placeholder.

    Returns a dict with ``formula_id``, ``gate``, counts, a ``summary`` block,
    per-sector ``rows`` and input ``source`` sizes.
    """
    data, _ = _load_payload(payload)
    sector_flow = _dict_rows(data, "sector_flow")
    core_satellite = _dict_rows(data, "core_satellite")
    sector_universe = _dict_rows(data, "sector_universe")

    etf_sectors = _collect_etf_sectors(sector_flow)
    sector_candidates, core_by_ticker = _index_core_satellite(core_satellite)
    universe_candidates = _collect_universe_candidates(sector_universe)

    ranked_sectors = sorted(
        etf_sectors.items(),
        key=lambda item: (
            _num(item[1].get("Sector_Rank"), 999),
            -abs(_num(item[1].get("SmartMoney_5D_KRW"), 0.0)),
        ),
    )

    rows: list[dict[str, Any]] = []
    for sector, proxy in ranked_sectors:
        target_rep_count = _REP_COUNT_BY_SECTOR.get(sector, _DEFAULT_REP_COUNT)
        fallback_rows = sorted(sector_candidates.get(sector, []), key=_fallback_sort_key)
        # First row per ticker wins, matching a first-match scan of the list.
        fallback_by_ticker: dict[str, dict[str, Any]] = {}
        for row in fallback_rows:
            fallback_by_ticker.setdefault(_txt(row.get("Ticker")), row)

        def _priority(
            spec: dict[str, Any],
            _sector_index: dict[str, dict[str, Any]] = fallback_by_ticker,
        ) -> tuple[Any, ...]:
            code = _txt(spec.get("Constituent_Code"))
            live = core_by_ticker.get(code) or _sector_index.get(code)
            return (-_num(spec.get("Weight"), 0.0), _constituent_priority_score(spec, live))

        # ETF representatives are ordered by descending constituent weight
        # first; the live score only breaks ties between equal weights.
        universe_rows = sorted(universe_candidates.get(sector, []), key=_priority)

        selected_specs, selected_tickers = _select_specs(
            universe_rows, fallback_rows, target_rep_count
        )

        if not universe_rows:
            rep_source = "SECTOR_LIQUIDITY_FALLBACK"
            rep_basis_detail = "SECTOR_LIQUIDITY_FALLBACK"
        else:
            rep_source = "ETF_CONSTITUENT_WEIGHT"
            topped_up = (
                len(universe_rows) < target_rep_count
                and len(selected_specs) >= target_rep_count
            )
            rep_basis_detail = (
                "ETF_WEIGHT_PRIMARY_PLUS_SECTOR_TOPUP" if topped_up else "ETF_WEIGHT_PRIMARY"
            )

        basket_items = _resolve_basket(
            selected_specs,
            selected_tickers,
            fallback_rows,
            core_by_ticker,
            fallback_by_ticker,
            proxy,
            target_rep_count,
        )
        if not basket_items:
            continue
        rows.append(
            _sector_row(
                sector,
                proxy,
                universe_rows,
                basket_items,
                rep_source,
                rep_basis_detail,
                target_rep_count,
            )
        )

    states = Counter(r.get("monitor_state") for r in rows)
    bases = Counter(r.get("representative_basis") for r in rows)
    qualities = Counter(r.get("basket_quality_state") for r in rows)
    basket_missing_total = sum(_num(r.get("basket_missing_count"), 0.0) for r in rows)

    return {
        "formula_id": "ETF_REPRESENTATIVE_MONITOR_V1",
        "gate": "PASS" if rows else "DATA_MISSING",
        "etf_sector_count": len(etf_sectors),
        "tracked_count": len(rows),
        "summary": {
            "buy_review_count": states["BUY_REVIEW"],
            "track_count": states["TRACK"],
            "watch_count": states["WATCH"],
            "caution_count": states["CAUTION"],
            "aligned_count": sum(1 for r in rows if r.get("proxy_alignment") == "ALIGNED"),
            "weighted_basis_count": bases["ETF_CONSTITUENT_WEIGHT"],
            "fallback_basis_count": bases["SECTOR_LIQUIDITY_FALLBACK"],
            "complete_basket_count": qualities["COMPLETE"],
            "partial_basket_count": qualities["PARTIAL"],
            "basket_missing_total": basket_missing_total,
            "selected_sector_count": len({r["sector"] for r in rows}),
            "top_rep_names": [
                ", ".join(rep["name"] for rep in r.get("representatives", []))
                for r in rows[:3]
            ],
        },
        "rows": rows,
        "source": {
            "sector_flow_rows": len(sector_flow),
            "core_satellite_rows": len(core_satellite),
            "sector_universe_rows": len(sector_universe),
            "template_source_count": sum(
                1
                for r in rows
                if str(r.get("universe_source") or "").upper() == "DEFAULT_TEMPLATE"
            ),
        },
    }