섹터 유니버스 분리와 월간 갱신 정합화

This commit is contained in:
2026-06-15 02:29:29 +09:00
parent e2820065d1
commit 82ca4ddbfd
11 changed files with 1658 additions and 43 deletions
+31 -2
View File
@@ -3,6 +3,7 @@ import os
import requests
import time
import subprocess
import argparse
from pathlib import Path
ROOT = Path(__file__).resolve().parent.parent
@@ -10,6 +11,7 @@ CLASPRC_PATH = ROOT / ".clasprc.json"
CLASP_PATH = ROOT / ".clasp.json"
SPREADSHEET_ID = "1e1TNlLfnT69nvw-I1wU_oBHmEtI2pfbld3e0fFmtrZM"
OUTPUT_XLSX = ROOT / "GatherTradingData.xlsx"
LOCAL_OUTPUT_XLSX = ROOT / "outputs" / "sector_insights_enhanced" / "GatherTradingData_sector_insights.xlsx"
def get_tokens():
if not CLASPRC_PATH.exists():
@@ -75,20 +77,46 @@ def download_spreadsheet(spreadsheet_id, access_token, output_path):
print(f"Successfully downloaded to {output_path}")
return True
def validate_monthly_sector_refresh(xlsx_path: Path) -> bool:
cmd = [
"python",
"tools/validate_sector_universe_monthly_refresh_v1.py",
"--xlsx",
str(xlsx_path),
]
print(f"Validating monthly sector refresh: {xlsx_path} ...")
res = subprocess.run(cmd, cwd=str(ROOT))
if res.returncode == 0:
print("Monthly sector refresh validation passed.")
return True
print("Monthly sector refresh validation failed.")
return False
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--function", default="runDataFeed", help="Primary GAS function to execute before download")
parser.add_argument("--fallback-function", default="run_all", help="Fallback GAS function to execute if primary fails")
args = parser.parse_args()
try:
tokens = get_tokens()
script_id = get_script_id()
access_token = refresh_access_token(tokens)
# Step 1: Execute GAS run_all
if run_gas_function(script_id, access_token, "run_all"):
# Step 1: Execute GAS runDataFeed first, then fallback to run_all if needed.
primary_ok = run_gas_function(script_id, access_token, args.function)
if not primary_ok and args.fallback_function and args.fallback_function != args.function:
print(f"Primary function {args.function} failed; trying fallback {args.fallback_function} ...")
primary_ok = run_gas_function(script_id, access_token, args.fallback_function)
if primary_ok:
print("Waiting a bit for GAS processes to finalize (optional)...")
time.sleep(5)
# Step 2: Download spreadsheet
if download_spreadsheet(SPREADSHEET_ID, access_token, OUTPUT_XLSX):
print("\nRoutine Part 1 & 2 complete.")
validate_monthly_sector_refresh(OUTPUT_XLSX)
print("Final step: npm run prepare-upload-zip")
else:
print("\nDownload failed. Please check Drive API scopes.")
@@ -98,6 +126,7 @@ def main():
fallback = subprocess.run(["python", "tools/update_workbook_sector_insights.py"], cwd=str(ROOT))
if fallback.returncode == 0:
print("Local sector-insight workbook updated.")
validate_monthly_sector_refresh(LOCAL_OUTPUT_XLSX)
else:
print("Local sector-insight workbook build failed.")
+616
View File
@@ -0,0 +1,616 @@
from __future__ import annotations
import argparse
import datetime as dt
import json
import re
import shutil
import sys
from collections import OrderedDict
from pathlib import Path
from typing import Any
from urllib.parse import urljoin, urlparse, parse_qs
import requests
from bs4 import BeautifulSoup
from openpyxl import load_workbook
from openpyxl.styles import Alignment, Font, PatternFill
from openpyxl.utils import get_column_letter
ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))
from src.quant_engine.sector_universe_refresh import build_sector_universe_refresh_audit
DEFAULT_INPUT_XLSX = ROOT / "GatherTradingData.xlsx"
DEFAULT_OUTPUT_XLSX = ROOT / "outputs" / "sector_universe_refresh" / "GatherTradingData_sector_universe.xlsx"
DEFAULT_USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0 Safari/537.36"
NAVER_BASE = "https://finance.naver.com"
NAVER_ITEM_CODE_RE = re.compile(r"(?:https?:)?//finance\.naver\.com(?P<path>/item/[^\"'\s<>]+code=(?P<code>\d+)[^\"'\s<>]*)", re.I)
NAVER_REL_CODE_RE = re.compile(r"(?P<path>/item/[^\"'\s<>]+code=(?P<code>\d+)[^\"'\s<>]*)", re.I)
TITLE_FILL = PatternFill("solid", fgColor="1F4E78")
HEADER_FILL = PatternFill("solid", fgColor="1F4E78")
SUBHEADER_FILL = PatternFill("solid", fgColor="D9EAF7")
WHITE_FONT = Font(color="FFFFFF", bold=True)
BOLD_FONT = Font(bold=True)
NOTE_FONT = Font(italic=True, color="666666")
def _kst_now() -> dt.datetime:
return dt.datetime.now(dt.timezone(dt.timedelta(hours=9)))
def _kst_today() -> str:
return _kst_now().strftime("%Y-%m-%d")
def _clean_text(value: Any) -> str:
if value is None:
return ""
return str(value).strip()
def _normalize_code(value: Any) -> str:
text = _clean_text(value)
if not text:
return ""
text = text.replace(",", "")
if text.endswith(".0"):
text = text[:-2]
if text.isdigit():
return text.zfill(6)
if re.fullmatch(r"\d+\.\d+", text):
return str(int(float(text))).zfill(6)
return text
def _parse_weight(value: str) -> float | None:
text = _clean_text(value).replace("%", "").replace(",", "")
if not text:
return None
try:
return float(text)
except Exception:
return None
def _discover_naver_candidate_urls(soup: BeautifulSoup, proxy_ticker: str) -> list[str]:
candidates: list[str] = []
seen: set[str] = set()
def add(url: str) -> None:
url = _clean_text(url)
if not url or url in seen:
return
seen.add(url)
candidates.append(url)
expected_code = _normalize_code(proxy_ticker)
for script in soup.find_all("script"):
src = _clean_text(script.get("src"))
if src:
if expected_code and expected_code in src:
if src.startswith("//"):
add(f"https:{src}")
elif src.startswith("/"):
add(urljoin(NAVER_BASE, src))
else:
add(src)
continue
text = script.get_text(" ", strip=True) or ""
if not text:
continue
for regex in (NAVER_ITEM_CODE_RE, NAVER_REL_CODE_RE):
for match in regex.finditer(text):
code = _normalize_code(match.groupdict().get("code") or "")
if expected_code and code and code != expected_code:
continue
path = match.groupdict().get("path") or ""
if path:
add(urljoin(NAVER_BASE, path))
return candidates
def _parse_naver_etf_holdings(session: requests.Session, proxy_ticker: str, limit: int) -> dict[str, Any]:
url_candidates = [
f"{NAVER_BASE}/item/main.naver?code={proxy_ticker}",
f"{NAVER_BASE}/item/coinfo.naver?code={proxy_ticker}&target=cu_more",
]
last_message = ""
for url in url_candidates:
response = session.get(url, timeout=20)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
discovered = _discover_naver_candidate_urls(soup, proxy_ticker)
for candidate in discovered:
if candidate not in url_candidates:
url_candidates.append(candidate)
section = soup.select_one("div.section.etf_asset")
table = section.select_one("table.tb_type1_a") if section is not None else None
if table is None:
# layout changed or this endpoint does not expose the constituent table
last_message = "ETF constituent table missing; page structure may have changed"
continue
holdings: list[dict[str, Any]] = []
for tr in table.select("tbody tr"):
tds = tr.find_all("td")
if len(tds) < 3:
continue
name_link = tr.find("a", href=re.compile(r"code=\d+"))
if name_link is None:
continue
name = _clean_text(name_link.get_text(" ", strip=True))
href = _clean_text(name_link.get("href"))
m = re.search(r"code=(\d+)", href)
code = _normalize_code(m.group(1) if m else "")
if not code or not name:
continue
weight = _parse_weight(tds[2].get_text(" ", strip=True))
if weight is None:
continue
holdings.append({
"Constituent_Code": code,
"Constituent_Name": name,
"Weight": round(weight / 100.0, 6),
"Source": "NAVER_ETF_PAGE",
})
if len(holdings) >= limit:
break
if holdings:
return {
"source_url": url,
"source_kind": "NAVER_ETF_PAGE",
"holdings": holdings,
"discovered_urls": discovered,
"message": "",
}
last_message = "no holdings parsed"
return {
"source_url": url_candidates[0],
"source_kind": "NAVER_ETF_PAGE_FAIL_LAYOUT_CHANGED",
"holdings": [],
"discovered_urls": [],
"message": last_message or "page structure changed; no expected values were inferred",
}
def _extract_sector_seed_rows(ws) -> list[dict[str, Any]]:
headers = [ws.cell(2, c).value for c in range(1, ws.max_column + 1)]
headers = [str(h).strip() if h is not None else "" for h in headers]
idx = {name: i for i, name in enumerate(headers)}
rows: list[dict[str, Any]] = []
for r in range(3, ws.max_row + 1):
row = {name: ws.cell(r, c + 1).value for c, name in enumerate(headers) if name}
if not any(v not in (None, "") for v in row.values()):
continue
rows.append(row)
return rows
def _group_seed_rows(rows: list[dict[str, Any]]) -> OrderedDict[str, dict[str, Any]]:
grouped: OrderedDict[str, dict[str, Any]] = OrderedDict()
for row in rows:
sector = _clean_text(row.get("Sector"))
if not sector:
continue
if sector not in grouped:
grouped[sector] = {
"meta": row,
"rows": [],
}
grouped[sector]["rows"].append(row)
return grouped
def _build_refreshed_rows(seed_rows: list[dict[str, Any]], limit: int) -> tuple[list[dict[str, Any]], dict[str, Any]]:
session = requests.Session()
session.headers.update({"User-Agent": DEFAULT_USER_AGENT})
grouped = _group_seed_rows(seed_rows)
refreshed: list[dict[str, Any]] = []
sector_stats: list[dict[str, Any]] = []
today = _kst_today()
for sector, bundle in grouped.items():
meta = bundle["meta"]
proxy_ticker = _normalize_code(meta.get("Proxy_Ticker"))
proxy_name = _clean_text(meta.get("Proxy_Name"))
proxy_type = _clean_text(meta.get("Proxy_Type")) or "ETF"
base_ticker = _normalize_code(meta.get("Base_Ticker")) or "069500"
if sector == "금융/은행":
split_specs = [
{"sector": "은행", "proxy_ticker": "091170", "proxy_name": "KODEX 은행", "proxy_type": "ETF"},
{"sector": "증권", "proxy_ticker": "0111J0", "proxy_name": "HANARO 증권고배당TOP3플러스", "proxy_type": "ETF"},
{"sector": "지주회사", "proxy_ticker": "307520", "proxy_name": "TIGER 지주회사", "proxy_type": "ETF"},
]
for spec in split_specs:
split_proxy_ticker = _normalize_code(spec["proxy_ticker"])
split_proxy_name = _clean_text(spec["proxy_name"])
split_proxy_type = _clean_text(spec["proxy_type"]) or "ETF"
split_source = "SHEET_INPUT"
split_source_url = ""
split_message = ""
split_source_kind = "SHEET_INPUT"
try:
scraped = _parse_naver_etf_holdings(session, split_proxy_ticker, limit)
split_source_url = scraped.get("source_url", "")
split_source_kind = scraped.get("source_kind", "NAVER_ETF_PAGE_FAIL")
holdings = scraped.get("holdings", [])
split_message = scraped.get("message", "")
if holdings:
split_source = "NAVER_ETF_PAGE"
weight_sum = round(sum(float(h["Weight"]) for h in holdings), 6)
for h in holdings:
refreshed.append({
"Sector": spec["sector"],
"Proxy_Ticker": split_proxy_ticker,
"Proxy_Name": split_proxy_name,
"Proxy_Type": split_proxy_type,
"Base_Ticker": base_ticker,
"Constituent_Code": h["Constituent_Code"],
"Constituent_Name": h["Constituent_Name"],
"Weight": h["Weight"],
"Is_ETF": "N",
"Enabled": "Y",
"Effective_Date": today,
"Source": split_source,
"Transport_Mode": "HTML_SERVER_RENDERED",
"Source_URL": split_source_url,
"Source_AsOf": today,
"Sector_Check": spec["sector"],
"Weight_Sum_All": weight_sum,
"Weight_Sum_Stocks_Only": weight_sum,
"ETF_Rows": 0,
"Status": "OK",
})
sector_stats.append({
"sector": spec["sector"],
"proxy_ticker": split_proxy_ticker,
"proxy_name": split_proxy_name,
"proxy_type": split_proxy_type,
"source_kind": split_source,
"transport_mode": "HTML_SERVER_RENDERED",
"source_url": split_source_url,
"source_asof": today,
"constituent_count": len(holdings),
"weight_sum": weight_sum,
"status": "CURRENT",
"refresh_reason": "NAVER_ETF_PAGE_SPLIT",
})
continue
except Exception as exc:
split_message = str(exc)
split_source_kind = "NAVER_ETF_PAGE_FAIL"
# 실패 시는 투명하게 남기고, 섹터 누락은 그대로 드러낸다.
sector_stats.append({
"sector": spec["sector"],
"proxy_ticker": split_proxy_ticker,
"proxy_name": split_proxy_name,
"proxy_type": split_proxy_type,
"source_kind": split_source_kind,
"transport_mode": "LAYOUT_CHANGED" if split_source_kind == "NAVER_ETF_PAGE_FAIL_LAYOUT_CHANGED" else "UNKNOWN",
"source_url": split_source_url,
"source_asof": today,
"constituent_count": 0,
"weight_sum": 0.0,
"status": "FAIL" if "FAIL" in split_source_kind else "WARN",
"refresh_reason": split_message or "split_sector_fallback",
})
continue
source = "SHEET_INPUT"
source_url = ""
message = ""
source_kind = "SHEET_INPUT"
if proxy_type != "ETF":
source_kind = "REPRESENTATIVE_STOCK_PROXY"
source = source_kind
source_url = f"{NAVER_BASE}/item/main.naver?code={proxy_ticker}" if proxy_ticker else ""
fallback_rows = bundle["rows"][:limit] if bundle["rows"] else []
weight_sum = 0.0
for row in fallback_rows:
weight = row.get("Weight")
try:
weight_sum += float(weight) if weight not in (None, "") else 0.0
except Exception:
pass
refreshed.append({
"Sector": sector,
"Proxy_Ticker": proxy_ticker,
"Proxy_Name": proxy_name,
"Proxy_Type": proxy_type,
"Base_Ticker": base_ticker,
"Constituent_Code": _normalize_code(row.get("Constituent_Code")),
"Constituent_Name": _clean_text(row.get("Constituent_Name")),
"Weight": float(row.get("Weight") or 0),
"Is_ETF": _clean_text(row.get("Is_ETF")) or "N",
"Enabled": "Y",
"Effective_Date": today,
"Source": source_kind,
"Transport_Mode": "HTML_SERVER_RENDERED" if source_kind == "REPRESENTATIVE_STOCK_PROXY" else "MANUAL_OR_TEMPLATE",
"Source_URL": source_url,
"Source_AsOf": today,
"Sector_Check": sector,
"Weight_Sum_All": weight_sum,
"Weight_Sum_Stocks_Only": weight_sum,
"ETF_Rows": 0,
"Status": "CURRENT",
})
sector_stats.append({
"sector": sector,
"proxy_ticker": proxy_ticker,
"proxy_name": proxy_name,
"proxy_type": proxy_type,
"source_kind": source_kind,
"transport_mode": "HTML_SERVER_RENDERED" if source_kind == "REPRESENTATIVE_STOCK_PROXY" else "MANUAL_OR_TEMPLATE",
"source_url": source_url,
"source_asof": today,
"constituent_count": len(fallback_rows),
"weight_sum": round(weight_sum, 6),
"status": "CURRENT",
"refresh_reason": "REPRESENTATIVE_STOCK_PROXY",
})
continue
if proxy_ticker:
try:
scraped = _parse_naver_etf_holdings(session, proxy_ticker, limit)
source_url = scraped.get("source_url", "")
source_kind = scraped.get("source_kind", "NAVER_ETF_PAGE_FAIL")
holdings = scraped.get("holdings", [])
message = scraped.get("message", "")
if holdings:
source = "NAVER_ETF_PAGE"
weight_sum = round(sum(float(h["Weight"]) for h in holdings), 6)
for h in holdings:
refreshed.append({
"Sector": sector,
"Proxy_Ticker": proxy_ticker,
"Proxy_Name": proxy_name,
"Proxy_Type": proxy_type,
"Base_Ticker": base_ticker,
"Constituent_Code": h["Constituent_Code"],
"Constituent_Name": h["Constituent_Name"],
"Weight": h["Weight"],
"Is_ETF": "N",
"Enabled": "Y",
"Effective_Date": today,
"Source": source,
"Transport_Mode": "HTML_SERVER_RENDERED",
"Source_URL": source_url,
"Source_AsOf": today,
"Sector_Check": sector,
"Weight_Sum_All": weight_sum,
"Weight_Sum_Stocks_Only": weight_sum,
"ETF_Rows": 0,
"Status": "OK",
})
sector_stats.append({
"sector": sector,
"proxy_ticker": proxy_ticker,
"proxy_name": proxy_name,
"proxy_type": proxy_type,
"source_kind": source,
"transport_mode": "HTML_SERVER_RENDERED",
"source_url": source_url,
"source_asof": today,
"constituent_count": len(holdings),
"weight_sum": weight_sum,
"status": "CURRENT",
"refresh_reason": "NAVER_ETF_PAGE",
})
continue
except Exception as exc:
message = str(exc)
source_kind = "NAVER_ETF_PAGE_FAIL"
# fallback: preserve seed rows but expose the failure transparently
fallback_rows = bundle["rows"][:limit] if bundle["rows"] else []
weight_sum = 0.0
for row in fallback_rows:
weight = row.get("Weight")
try:
weight_sum += float(weight) if weight not in (None, "") else 0.0
except Exception:
pass
refreshed.append({
"Sector": sector,
"Proxy_Ticker": proxy_ticker,
"Proxy_Name": proxy_name,
"Proxy_Type": proxy_type,
"Base_Ticker": base_ticker,
"Constituent_Code": _normalize_code(row.get("Constituent_Code")),
"Constituent_Name": _clean_text(row.get("Constituent_Name")),
"Weight": float(row.get("Weight") or 0),
"Is_ETF": _clean_text(row.get("Is_ETF")) or "N",
"Enabled": "Y",
"Effective_Date": today,
"Source": source_kind,
"Transport_Mode": "LAYOUT_CHANGED" if source_kind == "NAVER_ETF_PAGE_FAIL_LAYOUT_CHANGED" else "UNKNOWN",
"Source_URL": source_url,
"Source_AsOf": today,
"Sector_Check": sector,
"Weight_Sum_All": weight_sum,
"Weight_Sum_Stocks_Only": weight_sum,
"ETF_Rows": 0,
"Status": "FAIL" if source_kind.endswith("FAIL") else "WARN",
})
sector_stats.append({
"sector": sector,
"proxy_ticker": proxy_ticker,
"proxy_name": proxy_name,
"proxy_type": proxy_type,
"source_kind": source_kind,
"transport_mode": "LAYOUT_CHANGED" if source_kind == "NAVER_ETF_PAGE_FAIL_LAYOUT_CHANGED" else "UNKNOWN",
"source_url": source_url,
"source_asof": today,
"constituent_count": len(fallback_rows),
"weight_sum": round(weight_sum, 6),
"status": "FAIL" if "FAIL" in source_kind else "WARN",
"refresh_reason": message or "seed_fallback",
})
audit_payload = build_sector_universe_refresh_audit({"data": {"sector_universe": refreshed}})
return refreshed, {
"sector_universe_refresh_audit": audit_payload,
"sector_stats": sector_stats,
}
def _style_title(ws, title: str, subtitle: str) -> None:
ws.merge_cells(start_row=1, start_column=1, end_row=1, end_column=max(8, ws.max_column or 8))
ws["A1"] = title
ws["A1"].font = WHITE_FONT
ws["A1"].fill = TITLE_FILL
ws["A1"].alignment = Alignment(horizontal="left")
ws.merge_cells(start_row=2, start_column=1, end_row=2, end_column=max(8, ws.max_column or 8))
ws["A2"] = subtitle
ws["A2"].font = NOTE_FONT
def _write_table(ws, start_row: int, start_col: int, headers: list[str], rows: list[list[Any]]) -> int:
for i, header in enumerate(headers, start=start_col):
cell = ws.cell(start_row, i)
cell.value = header
cell.font = WHITE_FONT
cell.fill = HEADER_FILL
cell.alignment = Alignment(horizontal="center")
for r_idx, row in enumerate(rows, start=start_row + 1):
for c_idx, value in enumerate(row, start=start_col):
ws.cell(r_idx, c_idx).value = value
return start_row + len(rows)
def _write_sector_universe_sheet(wb, rows: list[dict[str, Any]]) -> None:
if "sector_universe" in wb.sheetnames:
del wb["sector_universe"]
ws = wb.create_sheet("sector_universe")
headers = [
"Sector", "Proxy_Ticker", "Proxy_Name", "Proxy_Type", "Base_Ticker",
"Constituent_Code", "Constituent_Name", "Weight", "Is_ETF", "Enabled",
"Effective_Date", "Source", "Transport_Mode", "Source_URL", "Source_AsOf", "Sector_Check",
"Weight_Sum_All", "Weight_Sum_Stocks_Only", "ETF_Rows", "Status",
]
now = _kst_now().strftime("%Y-%m-%d %H:%M:%S")
ws["A1"] = f"updated: {now} KST"
ws["A1"].font = Font(bold=True)
_write_table(ws, 2, 1, headers, [[r.get(h, "") for h in headers] for r in rows])
for col_idx, header in enumerate(headers, start=1):
if header in {"Proxy_Ticker", "Base_Ticker", "Constituent_Code"}:
for r in range(3, ws.max_row + 1):
ws.cell(r, col_idx).number_format = "@"
if header in {"Weight", "Weight_Sum_All", "Weight_Sum_Stocks_Only"}:
for r in range(3, ws.max_row + 1):
ws.cell(r, col_idx).number_format = "0.0000"
width = 16
if header in {"Constituent_Name", "Proxy_Name"}:
width = 22
elif header in {"Source_URL"}:
width = 42
elif header in {"Status", "Source", "Sector_Check", "Proxy_Type", "Transport_Mode"}:
width = 16
ws.column_dimensions[get_column_letter(col_idx)].width = width
ws.freeze_panes = "A3"
ws.sheet_view.showGridLines = False
def _write_audit_sheet(wb, audit_payload: dict[str, Any]) -> None:
audit = audit_payload["sector_universe_refresh_audit"]
if "sector_universe_refresh_audit" in wb.sheetnames:
del wb["sector_universe_refresh_audit"]
ws = wb.create_sheet("sector_universe_refresh_audit")
ws.sheet_view.showGridLines = False
_style_title(
ws,
"섹터 월간 갱신 감사",
"Naver ETF 페이지 기반 월간 갱신 상태와 provenance 분리 현황을 점검한다.",
)
summary = audit.get("summary", {})
summary_rows = [
["formula_id", audit.get("formula_id", "")],
["gate", audit.get("gate", "")],
["sector_count", summary.get("sector_count", 0)],
["current_count", summary.get("current_count", 0)],
["due_count", summary.get("due_count", 0)],
["overdue_count", summary.get("overdue_count", 0)],
["missing_count", summary.get("missing_count", 0)],
["template_count", summary.get("template_count", 0)],
["sheet_input_count", summary.get("sheet_input_count", 0)],
["naver_source_count", summary.get("naver_source_count", 0)],
["missing_source_url_count", summary.get("missing_source_url_count", 0)],
["stale_sector_count", summary.get("stale_sector_count", 0)],
["oldest_source_asof", summary.get("oldest_source_asof", "")],
["newest_source_asof", summary.get("newest_source_asof", "")],
]
_write_table(ws, 4, 1, ["key", "value"], summary_rows)
rows = audit.get("rows", []) or []
if rows:
headers = [
"sector", "proxy_ticker", "proxy_name", "proxy_type", "source_kind",
"source_url", "source_asof", "age_days", "constituent_count",
"stock_count", "etf_count", "weight_sum", "status", "refresh_reason",
]
_write_table(ws, 4, 4, headers, [[r.get(h, "") for h in headers] for r in rows])
for idx, header in enumerate(headers, start=4):
width = 16
if header in {"sector", "proxy_name", "refresh_reason"}:
width = 20
elif header == "source_url":
width = 42
ws.column_dimensions[get_column_letter(idx)].width = width
ws.freeze_panes = "A5"
def main() -> int:
ap = argparse.ArgumentParser()
ap.add_argument("--input", default=str(DEFAULT_INPUT_XLSX))
ap.add_argument("--output", default=str(DEFAULT_OUTPUT_XLSX))
ap.add_argument("--limit", type=int, default=10, help="Per-sector holdings limit from Naver ETF pages")
ap.add_argument("--apply", action="store_true", help="Overwrite the input workbook in place as well")
args = ap.parse_args()
input_path = Path(args.input)
output_path = Path(args.output)
if not input_path.exists():
raise FileNotFoundError(input_path)
wb = load_workbook(input_path)
if "sector_universe" not in wb.sheetnames:
raise RuntimeError("sector_universe sheet not found")
seed_ws = wb["sector_universe"]
seed_rows = _extract_sector_seed_rows(seed_ws)
refreshed_rows, audit_payload = _build_refreshed_rows(seed_rows, max(1, args.limit))
_write_sector_universe_sheet(wb, refreshed_rows)
_write_audit_sheet(wb, audit_payload)
output_path.parent.mkdir(parents=True, exist_ok=True)
wb.save(output_path)
if args.apply and input_path.resolve() != output_path.resolve():
shutil.copy2(output_path, input_path)
print(json.dumps({
"status": "OK",
"input": str(input_path),
"output": str(output_path),
"rows": len(refreshed_rows),
"sectors": len(audit_payload["sector_stats"]),
"current_count": audit_payload["sector_universe_refresh_audit"]["summary"]["current_count"],
"overdue_count": audit_payload["sector_universe_refresh_audit"]["summary"]["overdue_count"],
"template_count": audit_payload["sector_universe_refresh_audit"]["summary"]["template_count"],
}, ensure_ascii=False, indent=2))
return 0
if __name__ == "__main__":
sys.exit(main())
@@ -0,0 +1,173 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import datetime as dt
import json
import sys
from pathlib import Path
from typing import Any
from openpyxl import load_workbook
ROOT = Path(__file__).resolve().parents[1]
DEFAULT_XLSX = ROOT / "GatherTradingData.xlsx"
MAX_AGE_DAYS = 31
def _txt(value: Any, default: str = "") -> str:
if value is None:
return default
if isinstance(value, str):
return value.strip() or default
return str(value).strip() or default
def _parse_date(value: Any) -> dt.date | None:
text = _txt(value)
if not text:
return None
for fmt in ("%Y-%m-%d", "%Y.%m.%d", "%Y/%m/%d"):
try:
return dt.datetime.strptime(text[:10], fmt).date()
except Exception:
pass
try:
return dt.date.fromisoformat(text[:10])
except Exception:
return None
def _age_days(value: Any) -> int | None:
parsed = _parse_date(value)
if parsed is None:
return None
today = dt.datetime.now(dt.timezone(dt.timedelta(hours=9))).date()
return (today - parsed).days
def main() -> int:
ap = argparse.ArgumentParser()
ap.add_argument("--xlsx", default=str(DEFAULT_XLSX))
args = ap.parse_args()
xlsx = Path(args.xlsx)
if not xlsx.exists():
print(f"[오류] 워크북 없음: {xlsx}")
return 1
wb = load_workbook(xlsx, data_only=True)
if "sector_universe" not in wb.sheetnames:
print("[FAIL] sector_universe 시트가 없습니다.")
return 1
ws = wb["sector_universe"]
headers = [_txt(ws.cell(2, c).value) for c in range(1, ws.max_column + 1)]
idx = {name: i for i, name in enumerate(headers) if name}
required = ["Sector", "Proxy_Ticker", "Constituent_Code", "Weight", "Source", "Source_URL", "Source_AsOf"]
missing_headers = [h for h in required if h not in idx]
rows: list[dict[str, Any]] = []
for r in range(3, ws.max_row + 1):
row = {h: ws.cell(r, c + 1).value for c, h in enumerate(headers) if h}
if not any(v not in (None, "") for v in row.values()):
continue
rows.append(row)
sector_map: dict[str, list[dict[str, Any]]] = {}
for row in rows:
sector = _txt(row.get("Sector"))
if sector:
sector_map.setdefault(sector, []).append(row)
template_rows = 0
representative_rows = 0
sheet_input_rows = 0
naver_rows = 0
layout_changed_rows = 0
fail_rows = 0
missing_source_url = 0
stale_rows = 0
mixed_sector_count = 0
sector_status_rows: list[str] = []
for sector, sector_rows in sector_map.items():
source_kinds = {_txt(r.get("Source"), "SHEET_INPUT") or "SHEET_INPUT" for r in sector_rows}
if len(source_kinds) > 1:
mixed_sector_count += 1
sector_status_rows.append(f"{sector}:MIXED({','.join(sorted(source_kinds))})")
sector_template = any(src == "DEFAULT_TEMPLATE" for src in source_kinds)
sector_rep = any(src == "REPRESENTATIVE_STOCK_PROXY" for src in source_kinds)
sector_input = any(src == "SHEET_INPUT" for src in source_kinds)
sector_naver = any(src == "NAVER_ETF_PAGE" for src in source_kinds)
sector_layout_changed = any(src == "NAVER_ETF_PAGE_FAIL_LAYOUT_CHANGED" for src in source_kinds)
sector_fail = any("FAIL" in src for src in source_kinds)
if sector_template:
template_rows += len(sector_rows)
if sector_rep:
representative_rows += len(sector_rows)
if sector_input:
sheet_input_rows += len(sector_rows)
if sector_naver:
naver_rows += len(sector_rows)
if sector_layout_changed:
layout_changed_rows += len(sector_rows)
if sector_fail:
fail_rows += len(sector_rows)
source_urls = {_txt(r.get("Source_URL")) for r in sector_rows if _txt(r.get("Source_URL"))}
if not source_urls:
missing_source_url += len(sector_rows)
ages = [_age_days(r.get("Source_AsOf")) for r in sector_rows]
age_vals = [a for a in ages if a is not None]
if age_vals and max(age_vals) > MAX_AGE_DAYS:
stale_rows += sum(1 for a in age_vals if a is not None and a > MAX_AGE_DAYS)
sector_status_rows.append(f"{sector}:STALE(max={max(age_vals)})")
gate = "PASS"
if missing_headers:
gate = "FAIL"
elif template_rows > 0 or fail_rows > 0 or stale_rows > 0 or mixed_sector_count > 0:
gate = "FAIL"
elif sheet_input_rows > 0:
gate = "WARN"
print(f"[sector_universe_refresh] gate={gate}")
print(f" rows={len(rows)} sectors={len(sector_map)}")
print(f" naver_rows={naver_rows} representative_rows={representative_rows} layout_changed_rows={layout_changed_rows} sheet_input_rows={sheet_input_rows} template_rows={template_rows} fail_rows={fail_rows}")
print(f" missing_source_url={missing_source_url} stale_rows={stale_rows} mixed_sector_count={mixed_sector_count}")
if missing_headers:
print(f" missing_headers={missing_headers}")
if sector_status_rows:
print(" sector_flags=" + ", ".join(sector_status_rows[:20]))
result = {
"validator": "validate_sector_universe_monthly_refresh_v1",
"gate": gate,
"total_rows": len(rows),
"sector_count": len(sector_map),
"naver_rows": naver_rows,
"representative_rows": representative_rows,
"layout_changed_rows": layout_changed_rows,
"sheet_input_rows": sheet_input_rows,
"template_rows": template_rows,
"fail_rows": fail_rows,
"missing_source_url": missing_source_url,
"stale_rows": stale_rows,
"mixed_sector_count": mixed_sector_count,
"missing_headers": missing_headers,
"sector_flags": sector_status_rows,
"max_age_days": MAX_AGE_DAYS,
}
out = ROOT / "Temp" / "sector_universe_refresh_validation.json"
out.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")
print(f"OUTPUT: {out}")
return 0 if gate in {"PASS", "WARN"} else 1
if __name__ == "__main__":
sys.exit(main())