from __future__ import annotations import argparse import csv import datetime as dt import re from pathlib import Path from typing import Any import openpyxl ROOT = Path(__file__).resolve().parents[2] DEFAULT_XLSX = ROOT / "GatherTradingData.xlsx" OUTPUT_HEADERS = [ "ETF_Ticker", "ETF_Name", "Close", "NAV", "iNAV", "Premium_Discount_Pct", "Tracking_Error", "AUM", "Source_Date", "Source", "Enabled", "Note", ] COLUMN_ALIASES = { "ticker": ["ETF_Ticker", "종목코드", "단축코드", "표준코드", "code", "ticker"], "name": ["ETF_Name", "종목명", "한글종목명", "Name", "name"], "close": ["Close", "종가", "현재가", "시장가격", "TDD_CLSPRC", "close"], "nav": ["NAV", "순자산가치", "기준가격", "기준가", "NAV(원)", "nav"], "inav": ["iNAV", "추정순자산가치", "실시간기준가", "iNAV(원)", "inav"], "premium_discount_pct": ["Premium_Discount_Pct", "괴리율", "괴리율(%)", "가격괴리율", "premium_discount_pct"], "tracking_error": ["Tracking_Error", "추적오차율", "추적오차", "추적오차율(%)", "tracking_error"], "aum": ["AUM", "순자산총액", "순자산총액(원)", "상장좌수", "aum"], "source_date": ["Source_Date", "기준일", "일자", "거래일자", "Date", "date"], } def normalize_header(value: Any) -> str: return re.sub(r"\s+", "", str(value or "").strip()).lower() def normalize_ticker(value: Any) -> str: text = str(value or "").strip() if text.endswith(".0"): text = text[:-2] text = re.sub(r"[^0-9A-Za-z]", "", text) if text.isdigit(): return text.zfill(6) if re.fullmatch(r"[0-9A-Za-z]{1,6}", text): return text.zfill(6) return text def parse_number(value: Any) -> float | None: if value in (None, ""): return None if isinstance(value, (int, float)) and not isinstance(value, bool): return float(value) text = str(value).strip() if not text or text in {"-", "N/A", "nan"}: return None text = text.replace(",", "").replace("%", "") try: return float(text) except ValueError: return None def parse_date(value: Any) -> str: if value in (None, ""): return "" if isinstance(value, (dt.datetime, dt.date)): return value.strftime("%Y-%m-%d") text = str(value).strip() match = re.search(r"(\d{4})[./-]?(\d{1,2})[./-]?(\d{1,2})", text) if not match: return "" y, m, d = match.groups() return f"{y}-{int(m):02d}-{int(d):02d}" def read_source_table(path: Path) -> list[dict[str, Any]]: if path.suffix.lower() in {".xlsx", ".xlsm"}: wb = openpyxl.load_workbook(path, data_only=True, read_only=True) ws = wb[wb.sheetnames[0]] rows = list(ws.iter_rows(values_only=True)) header_row_idx = 0 best_score = -1 alias_tokens = {normalize_header(a) for aliases in COLUMN_ALIASES.values() for a in aliases} for i, row in enumerate(rows[:20]): score = sum(1 for cell in row if normalize_header(cell) in alias_tokens) if score > best_score: best_score = score header_row_idx = i headers = [str(v or "").strip() for v in rows[header_row_idx]] return [ dict(zip(headers, row)) for row in rows[header_row_idx + 1 :] if row and any(v not in (None, "") for v in row) ] encoding_candidates = ["utf-8-sig", "cp949", "euc-kr"] last_error: Exception | None = None for encoding in encoding_candidates: try: with path.open("r", encoding=encoding, newline="") as f: sample = f.read(4096) f.seek(0) dialect = csv.Sniffer().sniff(sample, delimiters=",\t;") return list(csv.DictReader(f, dialect=dialect)) except Exception as exc: last_error = exc raise RuntimeError(f"failed to read source file {path}: {last_error}") def resolve_columns(rows: list[dict[str, Any]]) -> dict[str, str]: if not rows: return {} source_headers = list(rows[0].keys()) normalized = {normalize_header(h): h for h in source_headers} resolved: dict[str, str] = {} for field, aliases in COLUMN_ALIASES.items(): for alias in aliases: key = normalize_header(alias) if key in normalized: resolved[field] = normalized[key] break return resolved def existing_etfs(wb: openpyxl.Workbook) -> dict[str, str]: result: dict[str, str] = {} if "etf_raw" in wb.sheetnames: ws = wb["etf_raw"] headers = [ws.cell(2, c).value for c in range(1, ws.max_column + 1)] idx = {h: i + 1 for i, h in enumerate(headers) if h} if "ETF_Ticker" in idx: for r in range(3, ws.max_row + 1): ticker = normalize_ticker(ws.cell(r, idx["ETF_Ticker"]).value) if ticker: result[ticker] = str(ws.cell(r, idx.get("ETF_Name", idx["ETF_Ticker"])).value or "") return result def update_workbook(workbook_path: Path, source_path: Path, enable: bool) -> tuple[int, int]: rows = read_source_table(source_path) columns = resolve_columns(rows) if "ticker" not in columns: raise RuntimeError(f"source file has no ticker/code column. resolved={columns}") wb = openpyxl.load_workbook(workbook_path) targets = existing_etfs(wb) if "etf_nav_manual" in wb.sheetnames: del wb["etf_nav_manual"] insert_at = wb.sheetnames.index("etf_raw") + 1 if "etf_raw" in wb.sheetnames else 1 ws = wb.create_sheet("etf_nav_manual", insert_at) ws.append([f"updated: imported from {source_path.name}"]) ws.append(OUTPUT_HEADERS) imported = 0 matched = 0 seen: set[str] = set() for row in rows: ticker = normalize_ticker(row.get(columns["ticker"])) if not ticker or ticker in seen: continue seen.add(ticker) name = str(row.get(columns.get("name", ""), "") or targets.get(ticker, "")).strip() close = parse_number(row.get(columns.get("close", ""))) nav = parse_number(row.get(columns.get("nav", ""))) inav = parse_number(row.get(columns.get("inav", ""))) premium = parse_number(row.get(columns.get("premium_discount_pct", ""))) if premium is None: basis_nav = nav if nav and nav > 0 else inav if close is not None and basis_nav and basis_nav > 0: premium = ((close / basis_nav) - 1) * 100 tracking_error = parse_number(row.get(columns.get("tracking_error", ""))) aum = parse_number(row.get(columns.get("aum", ""))) source_date = parse_date(row.get(columns.get("source_date", ""))) is_match = not targets or ticker in targets if is_match: matched += 1 row_enable = "Y" if enable and is_match and (nav is not None or inav is not None) else "N" ws.append([ ticker, name, close, nav, inav, premium, tracking_error, aum, source_date, f"import:{source_path.name}", row_enable, "matched_etf_raw" if is_match else "not_in_etf_raw_review_before_enable", ]) imported += 1 for row in ws.iter_rows(min_row=1, max_row=ws.max_row): row[0].number_format = "@" for cell in ws[2]: cell.font = openpyxl.styles.Font(bold=True, color="FFFFFF") cell.fill = openpyxl.styles.PatternFill("solid", fgColor="7030A0") ws.freeze_panes = "A3" ws.auto_filter.ref = f"A2:L{ws.max_row}" widths = [14, 34, 14, 14, 14, 20, 16, 16, 16, 28, 10, 42] for i, width in enumerate(widths, 1): ws.column_dimensions[openpyxl.utils.get_column_letter(i)].width = width wb.save(workbook_path) return imported, matched def main() -> int: parser = argparse.ArgumentParser(description="Import official ETF NAV/iNAV data into etf_nav_manual sheet.") parser.add_argument("source", type=Path, help="KRX/KIND/issuer CSV or XLSX export") parser.add_argument("--workbook", type=Path, default=DEFAULT_XLSX) parser.add_argument("--enable", action="store_true", help="Set Enabled=Y for matched rows with NAV or iNAV") args = parser.parse_args() imported, matched = update_workbook(args.workbook, args.source, args.enable) print(f"ETF NAV IMPORT OK: imported={imported} matched_etf_raw={matched} workbook={args.workbook.name}") return 0 if __name__ == "__main__": raise SystemExit(main())