174 lines
6.1 KiB
Python
174 lines
6.1 KiB
Python
#!/usr/bin/env python3
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import datetime as dt
|
|
import json
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from openpyxl import load_workbook
|
|
|
|
|
|
# Parent of the directory that contains this script (resolved, absolute).
ROOT = Path(__file__).resolve().parents[1]

# Workbook that is validated when --xlsx is not given on the command line.
DEFAULT_XLSX = ROOT.joinpath("GatherTradingData.xlsx")

# A Source_AsOf date more than this many days old is counted as stale.
MAX_AGE_DAYS = 31
|
|
|
|
|
|
def _txt(value: Any, default: str = "") -> str:
|
|
if value is None:
|
|
return default
|
|
if isinstance(value, str):
|
|
return value.strip() or default
|
|
return str(value).strip() or default
|
|
|
|
|
|
def _parse_date(value: Any) -> dt.date | None:
|
|
text = _txt(value)
|
|
if not text:
|
|
return None
|
|
for fmt in ("%Y-%m-%d", "%Y.%m.%d", "%Y/%m/%d"):
|
|
try:
|
|
return dt.datetime.strptime(text[:10], fmt).date()
|
|
except Exception:
|
|
pass
|
|
try:
|
|
return dt.date.fromisoformat(text[:10])
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def _age_days(value: Any) -> int | None:
    """Return how many days ago the date in *value* was, or None.

    "Today" is taken from the current time at a fixed UTC+9 offset.  The
    result is negative when the parsed date lies in the future.

    Args:
        value: Raw cell value handed to ``_parse_date``.

    Returns:
        Whole days between the parsed date and today, or ``None`` when
        *value* cannot be parsed as a date.
    """
    as_of = _parse_date(value)
    if as_of is not None:
        utc_plus_9 = dt.timezone(dt.timedelta(hours=9))
        current_day = dt.datetime.now(utc_plus_9).date()
        return (current_day - as_of).days
    return None
|
|
|
|
|
|
def main() -> int:
    """Validate the ``sector_universe`` sheet and emit a gate report.

    Reads the workbook given by ``--xlsx`` (default: ``DEFAULT_XLSX``), takes
    row 2 of the ``sector_universe`` sheet as headers and rows 3+ as data,
    and groups the non-blank rows by their ``Sector`` value.  For every
    sector it records which ``Source`` kinds occur (a blank ``Source`` counts
    as ``SHEET_INPUT``), whether any ``Source_URL`` is present, and how old
    the ``Source_AsOf`` dates are.  A summary is printed and the same figures
    are written as JSON to
    ``ROOT / "Temp" / "sector_universe_refresh_validation.json"``.

    Gate rules:
        * FAIL - a required header is missing, or some sector uses
          ``DEFAULT_TEMPLATE``, has a source kind containing ``FAIL``, has a
          row older than ``MAX_AGE_DAYS`` days, or mixes several source kinds.
        * WARN - none of the above, but some sector uses ``SHEET_INPUT``.
        * PASS - otherwise.

    The per-source counters are sector-granular: when a source kind occurs
    anywhere in a sector, all rows of that sector are added to its counter.
    ``missing_source_url`` is reported only and does not affect the gate.
    Rows whose ``Source_AsOf`` cannot be parsed are not counted as stale, and
    rows without a ``Sector`` value are counted in ``total_rows`` only.

    Returns:
        0 when the gate is PASS or WARN; 1 when it is FAIL, or when the
        workbook file or the ``sector_universe`` sheet does not exist.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--xlsx", default=str(DEFAULT_XLSX))
    args = ap.parse_args()

    xlsx = Path(args.xlsx)
    if not xlsx.exists():
        print(f"[오류] 워크북 없음: {xlsx}")
        return 1

    # data_only=True yields cached formula results instead of formula text.
    wb = load_workbook(xlsx, data_only=True)
    if "sector_universe" not in wb.sheetnames:
        print("[FAIL] sector_universe 시트가 없습니다.")
        return 1

    ws = wb["sector_universe"]

    # Row 2 holds the column headers; data rows start at row 3.
    header_cells = next(ws.iter_rows(min_row=2, max_row=2, values_only=True), ())
    headers = [_txt(cell) for cell in header_cells]
    present_headers = {name for name in headers if name}
    required = ["Sector", "Proxy_Ticker", "Constituent_Code", "Weight", "Source", "Source_URL", "Source_AsOf"]
    missing_headers = [h for h in required if h not in present_headers]

    rows: list[dict[str, Any]] = []
    for values in ws.iter_rows(min_row=3, values_only=True):
        # Columns without a header name are ignored.
        row = {h: v for h, v in zip(headers, values) if h}
        if all(v in (None, "") for v in row.values()):
            continue  # completely blank row
        rows.append(row)

    sector_map: dict[str, list[dict[str, Any]]] = {}
    for row in rows:
        sector = _txt(row.get("Sector"))
        if sector:
            sector_map.setdefault(sector, []).append(row)

    template_rows = 0
    representative_rows = 0
    sheet_input_rows = 0
    naver_rows = 0
    layout_changed_rows = 0
    fail_rows = 0
    missing_source_url = 0
    stale_rows = 0
    mixed_sector_count = 0
    sector_status_rows: list[str] = []

    for sector, sector_rows in sector_map.items():
        row_count = len(sector_rows)
        source_kinds = {_txt(r.get("Source"), "SHEET_INPUT") for r in sector_rows}

        if len(source_kinds) > 1:
            mixed_sector_count += 1
            sector_status_rows.append(f"{sector}:MIXED({','.join(sorted(source_kinds))})")

        if "DEFAULT_TEMPLATE" in source_kinds:
            template_rows += row_count
        if "REPRESENTATIVE_STOCK_PROXY" in source_kinds:
            representative_rows += row_count
        if "SHEET_INPUT" in source_kinds:
            sheet_input_rows += row_count
        if "NAVER_ETF_PAGE" in source_kinds:
            naver_rows += row_count
        if "NAVER_ETF_PAGE_FAIL_LAYOUT_CHANGED" in source_kinds:
            layout_changed_rows += row_count
        # Any source kind containing "FAIL" marks the whole sector as failed.
        if any("FAIL" in kind for kind in source_kinds):
            fail_rows += row_count

        # A sector lacks a source URL only when none of its rows carries one.
        if not any(_txt(r.get("Source_URL")) for r in sector_rows):
            missing_source_url += row_count

        # Rows with an unparseable Source_AsOf yield None and are skipped.
        age_vals = [
            age
            for age in (_age_days(r.get("Source_AsOf")) for r in sector_rows)
            if age is not None
        ]
        overdue = [age for age in age_vals if age > MAX_AGE_DAYS]
        if overdue:
            stale_rows += len(overdue)
            sector_status_rows.append(f"{sector}:STALE(max={max(age_vals)})")

    gate = "PASS"
    if missing_headers:
        gate = "FAIL"
    elif template_rows > 0 or fail_rows > 0 or stale_rows > 0 or mixed_sector_count > 0:
        gate = "FAIL"
    elif sheet_input_rows > 0:
        gate = "WARN"

    print(f"[sector_universe_refresh] gate={gate}")
    print(f" rows={len(rows)} sectors={len(sector_map)}")
    print(f" naver_rows={naver_rows} representative_rows={representative_rows} layout_changed_rows={layout_changed_rows} sheet_input_rows={sheet_input_rows} template_rows={template_rows} fail_rows={fail_rows}")
    print(f" missing_source_url={missing_source_url} stale_rows={stale_rows} mixed_sector_count={mixed_sector_count}")
    if missing_headers:
        print(f" missing_headers={missing_headers}")
    if sector_status_rows:
        # Only the first 20 flags are printed; the JSON report has all of them.
        print(" sector_flags=" + ", ".join(sector_status_rows[:20]))

    result = {
        "validator": "validate_sector_universe_monthly_refresh_v1",
        "gate": gate,
        "total_rows": len(rows),
        "sector_count": len(sector_map),
        "naver_rows": naver_rows,
        "representative_rows": representative_rows,
        "layout_changed_rows": layout_changed_rows,
        "sheet_input_rows": sheet_input_rows,
        "template_rows": template_rows,
        "fail_rows": fail_rows,
        "missing_source_url": missing_source_url,
        "stale_rows": stale_rows,
        "mixed_sector_count": mixed_sector_count,
        "missing_headers": missing_headers,
        "sector_flags": sector_status_rows,
        "max_age_days": MAX_AGE_DAYS,
    }
    out = ROOT / "Temp" / "sector_universe_refresh_validation.json"
    # The Temp directory may not exist yet (e.g. on a fresh checkout).
    out.parent.mkdir(parents=True, exist_ok=True)
    out.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")
    print(f"OUTPUT: {out}")
    return 0 if gate in {"PASS", "WARN"} else 1
|
|
|
|
|
|
if __name__ == "__main__":
    # Use main()'s return value (0 or 1) as the process exit status.
    exit_code = main()
    sys.exit(exit_code)
|