"""Refresh the ``sector_universe`` workbook sheet from Naver Finance ETF pages.

The script reads seed rows from the ``sector_universe`` sheet of an input
workbook, tries to replace each sector's constituents with holdings scraped
from the sector's proxy ETF page on Naver Finance, and writes the refreshed
sheet plus a ``sector_universe_refresh_audit`` sheet to an output workbook.
"""
from __future__ import annotations

import argparse
import datetime as dt
import json
import re
import shutil
import sys
from collections import OrderedDict
from pathlib import Path
from typing import Any
from urllib.parse import urljoin, urlparse, parse_qs

import requests
from bs4 import BeautifulSoup
from openpyxl import load_workbook
from openpyxl.styles import Alignment, Font, PatternFill
from openpyxl.utils import get_column_letter

ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))

from src.quant_engine.sector_universe_refresh import build_sector_universe_refresh_audit

DEFAULT_INPUT_XLSX = ROOT / "GatherTradingData.xlsx"
DEFAULT_OUTPUT_XLSX = ROOT / "outputs" / "sector_universe_refresh" / "GatherTradingData_sector_universe.xlsx"
DEFAULT_USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0 Safari/537.36"
NAVER_BASE = "https://finance.naver.com"

# Both patterns expose the named groups ``path`` and ``code`` that
# _discover_naver_candidate_urls reads through ``match.groupdict()``.
NAVER_ITEM_CODE_RE = re.compile(r"(?:https?:)?//finance\.naver\.com(?P<path>/item/[^\"'\s<>]+code=(?P<code>\d+)[^\"'\s<>]*)", re.I)
NAVER_REL_CODE_RE = re.compile(r"(?P<path>/item/[^\"'\s<>]+code=(?P<code>\d+)[^\"'\s<>]*)", re.I)

TITLE_FILL = PatternFill("solid", fgColor="1F4E78")
HEADER_FILL = PatternFill("solid", fgColor="1F4E78")
SUBHEADER_FILL = PatternFill("solid", fgColor="D9EAF7")
WHITE_FONT = Font(color="FFFFFF", bold=True)
BOLD_FONT = Font(bold=True)
NOTE_FONT = Font(italic=True, color="666666")

# Seed sector that is replaced by several narrower sectors, each with its own
# hard-coded proxy ETF.
_SPLIT_SECTOR = "금융/은행"
_SPLIT_SPECS = [
    {"sector": "은행", "proxy_ticker": "091170", "proxy_name": "KODEX 은행", "proxy_type": "ETF"},
    {"sector": "증권", "proxy_ticker": "0111J0", "proxy_name": "HANARO 증권고배당TOP3플러스", "proxy_type": "ETF"},
    {"sector": "지주회사", "proxy_ticker": "307520", "proxy_name": "TIGER 지주회사", "proxy_type": "ETF"},
]


def _kst_now() -> dt.datetime:
    """Return the current time as an aware datetime at UTC+09:00."""
    return dt.datetime.now(dt.timezone(dt.timedelta(hours=9)))


def _kst_today() -> str:
    """Return today's date at UTC+09:00 formatted as ``YYYY-MM-DD``."""
    return _kst_now().strftime("%Y-%m-%d")


def _clean_text(value: Any) -> str:
    """Return ``str(value)`` stripped of surrounding whitespace; ``None`` becomes ``""``."""
    if value is None:
        return ""
    return str(value).strip()


def _normalize_code(value: Any) -> str:
    """Normalize a ticker-like cell value to a string code.

    Thousands separators and a trailing ``.0`` are removed. Purely numeric
    values are zero-padded to six characters; anything else (for example an
    alphanumeric code) is returned as cleaned text.
    """
    text = _clean_text(value)
    if not text:
        return ""
    text = text.replace(",", "")
    if text.endswith(".0"):
        text = text[:-2]
    if text.isdigit():
        return text.zfill(6)
    if re.fullmatch(r"\d+\.\d+", text):
        # A numeric cell read back as a float string, e.g. "5930.00".
        return str(int(float(text))).zfill(6)
    return text


def _parse_weight(value: str) -> float | None:
    """Parse a percentage string such as ``"12.34%"`` into a float.

    Returns ``None`` when the text is empty or not a number. The result is
    still in percent units; callers divide by 100 themselves.
    """
    text = _clean_text(value).replace("%", "").replace(",", "")
    if not text:
        return None
    try:
        return float(text)
    except ValueError:
        return None


def _seed_weight(value: Any) -> float:
    """Convert a seed-sheet weight cell to float, treating blank or invalid cells as 0.0."""
    if value in (None, ""):
        return 0.0
    try:
        return float(value)
    except (TypeError, ValueError):
        return 0.0


def _discover_naver_candidate_urls(soup: BeautifulSoup, proxy_ticker: str) -> list[str]:
    """Collect further URLs referenced by ``<script>`` tags that mention the ticker.

    External scripts are kept when their ``src`` contains the normalized
    ticker. Inline scripts are scanned for ``/item/...code=<digits>`` links;
    a link is skipped only when its code differs from the expected ticker.
    The result is de-duplicated and keeps first-seen order.
    """
    candidates: list[str] = []
    seen: set[str] = set()

    def add(url: str) -> None:
        url = _clean_text(url)
        if not url or url in seen:
            return
        seen.add(url)
        candidates.append(url)

    expected_code = _normalize_code(proxy_ticker)
    for script in soup.find_all("script"):
        src = _clean_text(script.get("src"))
        if src:
            if expected_code and expected_code in src:
                # urljoin resolves protocol-relative ("//host/..."),
                # root-relative ("/path") and plain relative references, and
                # leaves absolute URLs untouched.
                add(urljoin(NAVER_BASE, src))
            continue
        text = script.get_text(" ", strip=True) or ""
        if not text:
            continue
        for regex in (NAVER_ITEM_CODE_RE, NAVER_REL_CODE_RE):
            for match in regex.finditer(text):
                groups = match.groupdict()
                code = _normalize_code(groups.get("code") or "")
                if expected_code and code and code != expected_code:
                    continue
                path = groups.get("path") or ""
                if path:
                    add(urljoin(NAVER_BASE, path))
    return candidates


def _parse_naver_etf_holdings(session: requests.Session, proxy_ticker: str, limit: int) -> dict[str, Any]:
    """Scrape up to ``limit`` ETF constituents for ``proxy_ticker`` from Naver Finance.

    Two known page URLs are tried first; URLs discovered in each fetched page
    are appended to the work queue and tried afterwards.

    Returns a dict with ``source_url``, ``source_kind``, ``holdings``,
    ``discovered_urls`` and ``message``. On success ``source_kind`` is
    ``"NAVER_ETF_PAGE"``; when no page yields holdings it is
    ``"NAVER_ETF_PAGE_FAIL_LAYOUT_CHANGED"`` with an empty ``holdings`` list.

    Raises:
        requests.RequestException: on a transport error or non-2xx response
            for any fetched URL (timeout is 20 seconds per request).
    """
    url_candidates = [
        f"{NAVER_BASE}/item/main.naver?code={proxy_ticker}",
        f"{NAVER_BASE}/item/coinfo.naver?code={proxy_ticker}&target=cu_more",
    ]
    last_message = ""
    # The queue grows while it is being processed, so walk it by index
    # instead of iterating the list that is being appended to.
    position = 0
    while position < len(url_candidates):
        url = url_candidates[position]
        position += 1

        response = session.get(url, timeout=20)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, "html.parser")

        discovered = _discover_naver_candidate_urls(soup, proxy_ticker)
        for candidate in discovered:
            if candidate not in url_candidates:
                url_candidates.append(candidate)

        section = soup.select_one("div.section.etf_asset")
        table = section.select_one("table.tb_type1_a") if section is not None else None
        if table is None:
            # layout changed or this endpoint does not expose the constituent table
            last_message = "ETF constituent table missing; page structure may have changed"
            continue

        holdings: list[dict[str, Any]] = []
        for tr in table.select("tbody tr"):
            tds = tr.find_all("td")
            if len(tds) < 3:
                continue
            name_link = tr.find("a", href=re.compile(r"code=\d+"))
            if name_link is None:
                continue
            name = _clean_text(name_link.get_text(" ", strip=True))
            href = _clean_text(name_link.get("href"))
            m = re.search(r"code=(\d+)", href)
            code = _normalize_code(m.group(1) if m else "")
            if not code or not name:
                continue
            # The third cell is read as the weight in percent.
            weight = _parse_weight(tds[2].get_text(" ", strip=True))
            if weight is None:
                continue
            holdings.append({
                "Constituent_Code": code,
                "Constituent_Name": name,
                "Weight": round(weight / 100.0, 6),
                "Source": "NAVER_ETF_PAGE",
            })
            if len(holdings) >= limit:
                break

        if holdings:
            return {
                "source_url": url,
                "source_kind": "NAVER_ETF_PAGE",
                "holdings": holdings,
                "discovered_urls": discovered,
                "message": "",
            }
        last_message = "no holdings parsed"

    return {
        "source_url": url_candidates[0],
        "source_kind": "NAVER_ETF_PAGE_FAIL_LAYOUT_CHANGED",
        "holdings": [],
        "discovered_urls": [],
        "message": last_message or "page structure changed; no expected values were inferred",
    }


def _extract_sector_seed_rows(ws) -> list[dict[str, Any]]:
    """Read the seed table from a worksheet into a list of dicts.

    Row 2 holds the column headers and data starts at row 3. Columns with an
    empty header are ignored, and rows whose cells are all ``None``/``""``
    are skipped.
    """
    headers = [ws.cell(2, c).value for c in range(1, ws.max_column + 1)]
    headers = [str(h).strip() if h is not None else "" for h in headers]
    rows: list[dict[str, Any]] = []
    for r in range(3, ws.max_row + 1):
        row = {name: ws.cell(r, c + 1).value for c, name in enumerate(headers) if name}
        if not any(v not in (None, "") for v in row.values()):
            continue
        rows.append(row)
    return rows


def _group_seed_rows(rows: list[dict[str, Any]]) -> OrderedDict[str, dict[str, Any]]:
    """Group seed rows by their ``Sector`` value, keeping first-seen order.

    Each group is ``{"meta": <first row of the sector>, "rows": [...]}``.
    Rows with a blank ``Sector`` are dropped.
    """
    grouped: OrderedDict[str, dict[str, Any]] = OrderedDict()
    for row in rows:
        sector = _clean_text(row.get("Sector"))
        if not sector:
            continue
        if sector not in grouped:
            grouped[sector] = {
                "meta": row,
                "rows": [],
            }
        grouped[sector]["rows"].append(row)
    return grouped


def _universe_row(
    proxy: dict[str, str],
    *,
    code: str,
    name: str,
    weight: float,
    is_etf: str,
    source: str,
    transport_mode: str,
    source_url: str,
    today: str,
    weight_sum: float,
    status: str,
) -> dict[str, Any]:
    """Build one output row for the ``sector_universe`` sheet."""
    return {
        "Sector": proxy["sector"],
        "Proxy_Ticker": proxy["proxy_ticker"],
        "Proxy_Name": proxy["proxy_name"],
        "Proxy_Type": proxy["proxy_type"],
        "Base_Ticker": proxy["base_ticker"],
        "Constituent_Code": code,
        "Constituent_Name": name,
        "Weight": weight,
        "Is_ETF": is_etf,
        "Enabled": "Y",
        "Effective_Date": today,
        "Source": source,
        "Transport_Mode": transport_mode,
        "Source_URL": source_url,
        "Source_AsOf": today,
        "Sector_Check": proxy["sector"],
        "Weight_Sum_All": weight_sum,
        "Weight_Sum_Stocks_Only": weight_sum,
        "ETF_Rows": 0,
        "Status": status,
    }


def _sector_stat(
    proxy: dict[str, str],
    *,
    source_kind: str,
    transport_mode: str,
    source_url: str,
    today: str,
    constituent_count: int,
    weight_sum: float,
    status: str,
    refresh_reason: str,
) -> dict[str, Any]:
    """Build one per-sector summary entry for the ``sector_stats`` list."""
    return {
        "sector": proxy["sector"],
        "proxy_ticker": proxy["proxy_ticker"],
        "proxy_name": proxy["proxy_name"],
        "proxy_type": proxy["proxy_type"],
        "source_kind": source_kind,
        "transport_mode": transport_mode,
        "source_url": source_url,
        "source_asof": today,
        "constituent_count": constituent_count,
        "weight_sum": weight_sum,
        "status": status,
        "refresh_reason": refresh_reason,
    }


def _scraped_rows(
    proxy: dict[str, str],
    holdings: list[dict[str, Any]],
    source_url: str,
    today: str,
) -> tuple[list[dict[str, Any]], float]:
    """Turn scraped holdings into output rows; also return their rounded weight sum."""
    weight_sum = round(sum(float(h["Weight"]) for h in holdings), 6)
    rows = [
        _universe_row(
            proxy,
            code=h["Constituent_Code"],
            name=h["Constituent_Name"],
            weight=h["Weight"],
            is_etf="N",
            source="NAVER_ETF_PAGE",
            transport_mode="HTML_SERVER_RENDERED",
            source_url=source_url,
            today=today,
            weight_sum=weight_sum,
            status="OK",
        )
        for h in holdings
    ]
    return rows, weight_sum


def _seed_rows_output(
    proxy: dict[str, str],
    seed_rows: list[dict[str, Any]],
    *,
    source_kind: str,
    transport_mode: str,
    source_url: str,
    today: str,
    status: str,
) -> tuple[list[dict[str, Any]], float]:
    """Re-emit seed rows as output rows; also return their rounded weight sum.

    The sum is computed over all seed rows before any row is built, so every
    row carries the same sector-wide total.
    """
    weight_sum = round(sum(_seed_weight(row.get("Weight")) for row in seed_rows), 6)
    rows = [
        _universe_row(
            proxy,
            code=_normalize_code(row.get("Constituent_Code")),
            name=_clean_text(row.get("Constituent_Name")),
            weight=_seed_weight(row.get("Weight")),
            is_etf=_clean_text(row.get("Is_ETF")) or "N",
            source=source_kind,
            transport_mode=transport_mode,
            source_url=source_url,
            today=today,
            weight_sum=weight_sum,
            status=status,
        )
        for row in seed_rows
    ]
    return rows, weight_sum


def _scrape_or_failure(session: requests.Session, proxy_ticker: str, limit: int) -> dict[str, Any]:
    """Scrape holdings and normalize the outcome; never raises.

    Any exception from the scrape is reported as a ``"NAVER_ETF_PAGE_FAIL"``
    result whose ``message`` is the exception text.
    """
    try:
        scraped = _parse_naver_etf_holdings(session, proxy_ticker, limit)
    except Exception as exc:  # best-effort boundary: one sector must not abort the run
        return {
            "source_url": "",
            "source_kind": "NAVER_ETF_PAGE_FAIL",
            "holdings": [],
            "message": str(exc),
        }
    return {
        "source_url": scraped.get("source_url", ""),
        "source_kind": scraped.get("source_kind", "NAVER_ETF_PAGE_FAIL"),
        "holdings": scraped.get("holdings", []),
        "message": scraped.get("message", ""),
    }


def _failure_transport(source_kind: str) -> str:
    """Map a non-success source kind to its transport-mode label."""
    return "LAYOUT_CHANGED" if source_kind == "NAVER_ETF_PAGE_FAIL_LAYOUT_CHANGED" else "UNKNOWN"


def _build_refreshed_rows(seed_rows: list[dict[str, Any]], limit: int) -> tuple[list[dict[str, Any]], dict[str, Any]]:
    """Build refreshed ``sector_universe`` rows and the accompanying audit payload.

    Per sector (in seed order):

    * the split sector is replaced by the hard-coded sub-sectors, each scraped
      from its own ETF; a failed sub-sector produces a stat entry but no rows;
    * a sector whose proxy is not an ETF re-emits its seed rows;
    * an ETF sector is scraped, and falls back to its seed rows (flagged as
      FAIL/WARN) when scraping fails or no proxy ticker is set.

    Args:
        seed_rows: rows read from the existing sheet.
        limit: maximum number of constituents kept per sector.

    Returns:
        ``(rows, payload)`` where ``payload`` holds
        ``"sector_universe_refresh_audit"`` (built from the rows by
        ``build_sector_universe_refresh_audit``) and ``"sector_stats"``.
    """
    grouped = _group_seed_rows(seed_rows)
    refreshed: list[dict[str, Any]] = []
    sector_stats: list[dict[str, Any]] = []
    today = _kst_today()

    # The context manager releases pooled connections when scraping is done.
    with requests.Session() as session:
        session.headers.update({"User-Agent": DEFAULT_USER_AGENT})

        for sector, bundle in grouped.items():
            meta = bundle["meta"]
            base_ticker = _normalize_code(meta.get("Base_Ticker")) or "069500"

            if sector == _SPLIT_SECTOR:
                for spec in _SPLIT_SPECS:
                    split_proxy = {
                        "sector": spec["sector"],
                        "proxy_ticker": _normalize_code(spec["proxy_ticker"]),
                        "proxy_name": _clean_text(spec["proxy_name"]),
                        "proxy_type": _clean_text(spec["proxy_type"]) or "ETF",
                        "base_ticker": base_ticker,
                    }
                    scraped = _scrape_or_failure(session, split_proxy["proxy_ticker"], limit)
                    holdings = scraped["holdings"]
                    if holdings:
                        rows, weight_sum = _scraped_rows(split_proxy, holdings, scraped["source_url"], today)
                        refreshed.extend(rows)
                        sector_stats.append(_sector_stat(
                            split_proxy,
                            source_kind="NAVER_ETF_PAGE",
                            transport_mode="HTML_SERVER_RENDERED",
                            source_url=scraped["source_url"],
                            today=today,
                            constituent_count=len(holdings),
                            weight_sum=weight_sum,
                            status="CURRENT",
                            refresh_reason="NAVER_ETF_PAGE_SPLIT",
                        ))
                        continue
                    # On failure, record it transparently and let the missing
                    # sector show as-is (no rows are emitted for it).
                    split_kind = scraped["source_kind"]
                    sector_stats.append(_sector_stat(
                        split_proxy,
                        source_kind=split_kind,
                        transport_mode=_failure_transport(split_kind),
                        source_url=scraped["source_url"],
                        today=today,
                        constituent_count=0,
                        weight_sum=0.0,
                        status="FAIL" if "FAIL" in split_kind else "WARN",
                        refresh_reason=scraped["message"] or "split_sector_fallback",
                    ))
                continue

            proxy = {
                "sector": sector,
                "proxy_ticker": _normalize_code(meta.get("Proxy_Ticker")),
                "proxy_name": _clean_text(meta.get("Proxy_Name")),
                "proxy_type": _clean_text(meta.get("Proxy_Type")) or "ETF",
                "base_ticker": base_ticker,
            }
            proxy_ticker = proxy["proxy_ticker"]
            fallback_rows = bundle["rows"][:limit]

            if proxy["proxy_type"] != "ETF":
                source_kind = "REPRESENTATIVE_STOCK_PROXY"
                source_url = f"{NAVER_BASE}/item/main.naver?code={proxy_ticker}" if proxy_ticker else ""
                rows, weight_sum = _seed_rows_output(
                    proxy,
                    fallback_rows,
                    source_kind=source_kind,
                    transport_mode="HTML_SERVER_RENDERED",
                    source_url=source_url,
                    today=today,
                    status="CURRENT",
                )
                refreshed.extend(rows)
                sector_stats.append(_sector_stat(
                    proxy,
                    source_kind=source_kind,
                    transport_mode="HTML_SERVER_RENDERED",
                    source_url=source_url,
                    today=today,
                    constituent_count=len(fallback_rows),
                    weight_sum=weight_sum,
                    status="CURRENT",
                    refresh_reason="REPRESENTATIVE_STOCK_PROXY",
                ))
                continue

            source_kind = "SHEET_INPUT"
            source_url = ""
            message = ""
            if proxy_ticker:
                scraped = _scrape_or_failure(session, proxy_ticker, limit)
                source_kind = scraped["source_kind"]
                source_url = scraped["source_url"]
                message = scraped["message"]
                holdings = scraped["holdings"]
                if holdings:
                    rows, weight_sum = _scraped_rows(proxy, holdings, source_url, today)
                    refreshed.extend(rows)
                    sector_stats.append(_sector_stat(
                        proxy,
                        source_kind="NAVER_ETF_PAGE",
                        transport_mode="HTML_SERVER_RENDERED",
                        source_url=source_url,
                        today=today,
                        constituent_count=len(holdings),
                        weight_sum=weight_sum,
                        status="CURRENT",
                        refresh_reason="NAVER_ETF_PAGE",
                    ))
                    continue

            # fallback: preserve seed rows but expose the failure transparently
            # NOTE(review): rows use endswith("FAIL") while the stat entry uses
            # a substring test, so a LAYOUT_CHANGED result is WARN on the rows
            # and FAIL in sector_stats. Kept as found; confirm this is intended.
            rows, weight_sum = _seed_rows_output(
                proxy,
                fallback_rows,
                source_kind=source_kind,
                transport_mode=_failure_transport(source_kind),
                source_url=source_url,
                today=today,
                status="FAIL" if source_kind.endswith("FAIL") else "WARN",
            )
            refreshed.extend(rows)
            sector_stats.append(_sector_stat(
                proxy,
                source_kind=source_kind,
                transport_mode=_failure_transport(source_kind),
                source_url=source_url,
                today=today,
                constituent_count=len(fallback_rows),
                weight_sum=weight_sum,
                status="FAIL" if "FAIL" in source_kind else "WARN",
                refresh_reason=message or "seed_fallback",
            ))

    audit_payload = build_sector_universe_refresh_audit({"data": {"sector_universe": refreshed}})
    return refreshed, {
        "sector_universe_refresh_audit": audit_payload,
        "sector_stats": sector_stats,
    }


def _style_title(ws, title: str, subtitle: str) -> None:
    """Write a merged title in row 1 and a merged subtitle in row 2.

    Both merges span at least eight columns, or the sheet's current width if
    that is larger.
    """
    ws.merge_cells(start_row=1, start_column=1, end_row=1, end_column=max(8, ws.max_column or 8))
    ws["A1"] = title
    ws["A1"].font = WHITE_FONT
    ws["A1"].fill = TITLE_FILL
    ws["A1"].alignment = Alignment(horizontal="left")
    ws.merge_cells(start_row=2, start_column=1, end_row=2, end_column=max(8, ws.max_column or 8))
    ws["A2"] = subtitle
    ws["A2"].font = NOTE_FONT


def _write_table(ws, start_row: int, start_col: int, headers: list[str], rows: list[list[Any]]) -> int:
    """Write a styled header row followed by data rows.

    Returns the index of the last row written (``start_row`` when ``rows`` is
    empty).
    """
    for i, header in enumerate(headers, start=start_col):
        cell = ws.cell(start_row, i)
        cell.value = header
        cell.font = WHITE_FONT
        cell.fill = HEADER_FILL
        cell.alignment = Alignment(horizontal="center")
    for r_idx, row in enumerate(rows, start=start_row + 1):
        for c_idx, value in enumerate(row, start=start_col):
            ws.cell(r_idx, c_idx).value = value
    return start_row + len(rows)


def _write_sector_universe_sheet(wb, rows: list[dict[str, Any]]) -> None:
    """Replace the ``sector_universe`` sheet with the refreshed rows.

    Row 1 holds an update timestamp, row 2 the headers, and data starts at
    row 3. Ticker/code columns are formatted as text so leading zeros
    survive; weight columns use four decimals.
    """
    if "sector_universe" in wb.sheetnames:
        del wb["sector_universe"]
    ws = wb.create_sheet("sector_universe")
    headers = [
        "Sector",
        "Proxy_Ticker",
        "Proxy_Name",
        "Proxy_Type",
        "Base_Ticker",
        "Constituent_Code",
        "Constituent_Name",
        "Weight",
        "Is_ETF",
        "Enabled",
        "Effective_Date",
        "Source",
        "Transport_Mode",
        "Source_URL",
        "Source_AsOf",
        "Sector_Check",
        "Weight_Sum_All",
        "Weight_Sum_Stocks_Only",
        "ETF_Rows",
        "Status",
    ]
    now = _kst_now().strftime("%Y-%m-%d %H:%M:%S")
    ws["A1"] = f"updated: {now} KST"
    ws["A1"].font = Font(bold=True)
    _write_table(ws, 2, 1, headers, [[r.get(h, "") for h in headers] for r in rows])
    for col_idx, header in enumerate(headers, start=1):
        if header in {"Proxy_Ticker", "Base_Ticker", "Constituent_Code"}:
            for r in range(3, ws.max_row + 1):
                ws.cell(r, col_idx).number_format = "@"
        if header in {"Weight", "Weight_Sum_All", "Weight_Sum_Stocks_Only"}:
            for r in range(3, ws.max_row + 1):
                ws.cell(r, col_idx).number_format = "0.0000"
        width = 16
        if header in {"Constituent_Name", "Proxy_Name"}:
            width = 22
        elif header in {"Source_URL"}:
            width = 42
        elif header in {"Status", "Source", "Sector_Check", "Proxy_Type", "Transport_Mode"}:
            width = 16
        ws.column_dimensions[get_column_letter(col_idx)].width = width
    ws.freeze_panes = "A3"
    ws.sheet_view.showGridLines = False


def _write_audit_sheet(wb, audit_payload: dict[str, Any]) -> None:
    """Replace the ``sector_universe_refresh_audit`` sheet.

    A key/value summary table is written at column A and, when the audit has
    rows, a per-sector table at column D; both start at row 4.
    """
    audit = audit_payload["sector_universe_refresh_audit"]
    if "sector_universe_refresh_audit" in wb.sheetnames:
        del wb["sector_universe_refresh_audit"]
    ws = wb.create_sheet("sector_universe_refresh_audit")
    ws.sheet_view.showGridLines = False
    _style_title(
        ws,
        "섹터 월간 갱신 감사",
        "Naver ETF 페이지 기반 월간 갱신 상태와 provenance 분리 현황을 점검한다.",
    )
    summary = audit.get("summary", {})
    summary_rows = [
        ["formula_id", audit.get("formula_id", "")],
        ["gate", audit.get("gate", "")],
        ["sector_count", summary.get("sector_count", 0)],
        ["current_count", summary.get("current_count", 0)],
        ["due_count", summary.get("due_count", 0)],
        ["overdue_count", summary.get("overdue_count", 0)],
        ["missing_count", summary.get("missing_count", 0)],
        ["template_count", summary.get("template_count", 0)],
        ["sheet_input_count", summary.get("sheet_input_count", 0)],
        ["naver_source_count", summary.get("naver_source_count", 0)],
        ["missing_source_url_count", summary.get("missing_source_url_count", 0)],
        ["stale_sector_count", summary.get("stale_sector_count", 0)],
        ["oldest_source_asof", summary.get("oldest_source_asof", "")],
        ["newest_source_asof", summary.get("newest_source_asof", "")],
    ]
    _write_table(ws, 4, 1, ["key", "value"], summary_rows)

    rows = audit.get("rows", []) or []
    if rows:
        headers = [
            "sector",
            "proxy_ticker",
            "proxy_name",
            "proxy_type",
            "source_kind",
            "source_url",
            "source_asof",
            "age_days",
            "constituent_count",
            "stock_count",
            "etf_count",
            "weight_sum",
            "status",
            "refresh_reason",
        ]
        _write_table(ws, 4, 4, headers, [[r.get(h, "") for h in headers] for r in rows])
        for idx, header in enumerate(headers, start=4):
            width = 16
            if header in {"sector", "proxy_name", "refresh_reason"}:
                width = 20
            elif header == "source_url":
                width = 42
            ws.column_dimensions[get_column_letter(idx)].width = width
    ws.freeze_panes = "A5"


def main() -> int:
    """Command-line entry point: refresh the workbook and print a JSON summary.

    Returns:
        ``0`` on success.

    Raises:
        FileNotFoundError: when the input workbook does not exist.
        RuntimeError: when the workbook has no ``sector_universe`` sheet.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--input", default=str(DEFAULT_INPUT_XLSX))
    ap.add_argument("--output", default=str(DEFAULT_OUTPUT_XLSX))
    ap.add_argument("--limit", type=int, default=10, help="Per-sector holdings limit from Naver ETF pages")
    ap.add_argument("--apply", action="store_true", help="Overwrite the input workbook in place as well")
    args = ap.parse_args()

    input_path = Path(args.input)
    output_path = Path(args.output)
    if not input_path.exists():
        raise FileNotFoundError(input_path)

    wb = load_workbook(input_path)
    if "sector_universe" not in wb.sheetnames:
        raise RuntimeError("sector_universe sheet not found")

    seed_ws = wb["sector_universe"]
    seed_rows = _extract_sector_seed_rows(seed_ws)
    # A limit below 1 would drop every constituent, so clamp it.
    refreshed_rows, audit_payload = _build_refreshed_rows(seed_rows, max(1, args.limit))

    _write_sector_universe_sheet(wb, refreshed_rows)
    _write_audit_sheet(wb, audit_payload)

    output_path.parent.mkdir(parents=True, exist_ok=True)
    wb.save(output_path)

    # When input and output are the same file, the save above already
    # updated it in place.
    if args.apply and input_path.resolve() != output_path.resolve():
        shutil.copy2(output_path, input_path)

    # Read counts defensively so a missing key cannot crash the run after the
    # workbook has already been written.
    summary = audit_payload["sector_universe_refresh_audit"].get("summary", {})
    print(json.dumps({
        "status": "OK",
        "input": str(input_path),
        "output": str(output_path),
        "rows": len(refreshed_rows),
        "sectors": len(audit_payload["sector_stats"]),
        "current_count": summary.get("current_count", 0),
        "overdue_count": summary.get("overdue_count", 0),
        "template_count": summary.get("template_count", 0),
    }, ensure_ascii=False, indent=2))
    return 0


if __name__ == "__main__":
    sys.exit(main())