from __future__ import annotations import json from datetime import datetime from pathlib import Path from openpyxl import load_workbook from openpyxl.chart import BarChart, LineChart, Reference from openpyxl.styles import Font, PatternFill, Alignment from openpyxl.utils import get_column_letter ROOT = Path(__file__).resolve().parent.parent INPUT_XLSX = ROOT / "GatherTradingData.xlsx" OUTPUT_DIR = ROOT / "outputs" / "sector_insights_enhanced" OUTPUT_XLSX = OUTPUT_DIR / "GatherTradingData_sector_insights.xlsx" SECTOR_JSON = ROOT / "Temp" / "sector_trend_analysis_v1.json" ETF_JSON = ROOT / "Temp" / "etf_representative_monitor_v1.json" HEADER_FILL = PatternFill("solid", fgColor="1F4E78") SUBHEADER_FILL = PatternFill("solid", fgColor="D9EAF7") KPI_FILL = PatternFill("solid", fgColor="F3F7FB") KPI_LABEL_FILL = PatternFill("solid", fgColor="E2F0D9") KPI_VALUE_FILL = PatternFill("solid", fgColor="FFF2CC") WHITE_FONT = Font(color="FFFFFF", bold=True) BOLD_FONT = Font(bold=True) TITLE_FONT = Font(size=14, bold=True) NOTE_FONT = Font(italic=True, color="666666") def load_json(path: Path) -> dict: return json.loads(path.read_text(encoding="utf-8")) def remove_if_exists(wb, name: str) -> None: if name in wb.sheetnames: del wb[name] def style_title(ws, title: str, subtitle: str | None = None, end_col: int = 8) -> None: ws.merge_cells(start_row=1, start_column=1, end_row=1, end_column=end_col) ws["A1"] = title ws["A1"].font = TITLE_FONT ws["A1"].fill = HEADER_FILL ws["A1"].font = WHITE_FONT ws["A1"].alignment = Alignment(horizontal="left") if subtitle: ws.merge_cells(start_row=2, start_column=1, end_row=2, end_column=end_col) ws["A2"] = subtitle ws["A2"].font = NOTE_FONT def write_table(ws, start_row: int, start_col: int, headers: list[str], rows: list[list], header_fill=HEADER_FILL) -> int: for j, header in enumerate(headers, start=start_col): cell = ws.cell(start_row, j) cell.value = header cell.font = WHITE_FONT cell.fill = header_fill cell.alignment = Alignment(horizontal="center", vertical="center") for i, row in enumerate(rows, start=start_row + 1): for j, value in enumerate(row, start=start_col): cell = ws.cell(i, j) cell.value = value cell.alignment = Alignment(vertical="top") return start_row + len(rows) def add_kpi_block(ws, start_row: int, items: list[tuple[str, object]]) -> int: ws.cell(start_row, 1).value = "KPI" ws.cell(start_row, 1).fill = SUBHEADER_FILL ws.cell(start_row, 1).font = BOLD_FONT row = start_row + 1 for label, value in items: ws.cell(row, 1).value = label ws.cell(row, 1).fill = KPI_LABEL_FILL ws.cell(row, 1).font = BOLD_FONT ws.cell(row, 2).value = value ws.cell(row, 2).fill = KPI_VALUE_FILL row += 1 return row def set_col_widths(ws, widths: dict[str, int]) -> None: for col, width in widths.items(): ws.column_dimensions[col].width = width def style_sheet(ws) -> None: ws.freeze_panes = "A3" ws.sheet_view.showGridLines = False def extract_sheet_rows(wb, sheet_name: str) -> tuple[list[str], list[list]]: ws = wb[sheet_name] first_row = [ws.cell(1, c).value for c in range(1, ws.max_column + 1)] second_row = [ws.cell(2, c).value for c in range(1, ws.max_column + 1)] if ws.max_row >= 2 else [] use_second_row = ( bool(second_row) and not any(v not in (None, "") for v in first_row) and any(v not in (None, "") for v in second_row) ) headers = second_row if use_second_row else first_row start_row = 3 if use_second_row else 2 rows: list[list] = [] for r in range(start_row, ws.max_row + 1): row = [ws.cell(r, c).value for c in range(1, ws.max_column + 1)] if any(v is not None and v != "" for v in row): rows.append(row) return headers, rows def build_portfolio_summary(wb) -> None: def display(value): return value if value not in (None, "") else "DATA_MISSING — 하네스 업데이트 필요" daily_headers, daily_rows = extract_sheet_rows(wb, "daily_history") monthly_headers, monthly_rows = extract_sheet_rows(wb, "monthly_history") account_headers, account_rows = extract_sheet_rows(wb, "account_snapshot") ws = wb.create_sheet("portfolio_performance_summary") style_sheet(ws) style_title( ws, "포트폴리오 성과 요약", "내 자금의 일간/월간 추이와 최신 보유 비중을 함께 보는 요약 시트", end_col=10, ) latest_daily = daily_rows[-1] if daily_rows else [] latest_month = monthly_rows[-1] if monthly_rows else [] latest_total_asset = latest_daily[1] if len(latest_daily) > 1 else None latest_peak_asset = latest_daily[2] if len(latest_daily) > 2 else None latest_mdd = latest_daily[3] if len(latest_daily) > 3 else None latest_month_total = latest_month[1] if len(latest_month) > 1 else None latest_month_return = latest_month[8] if len(latest_month) > 8 else None latest_ytd_return = latest_month[10] if len(latest_month) > 10 else None latest_capture = None if account_rows: latest_capture = account_rows[0][0] for row in account_rows: if row and row[0] and row[0] > latest_capture: latest_capture = row[0] latest_holdings = [r for r in account_rows if r and r[0] == latest_capture] holdings_sorted = sorted( latest_holdings, key=lambda r: (r[10] if len(r) > 10 and isinstance(r[10], (int, float)) else 0), reverse=True, ) total_mv = sum(r[10] for r in holdings_sorted if len(r) > 10 and isinstance(r[10], (int, float))) total_cost = sum(r[8] for r in holdings_sorted if len(r) > 8 and isinstance(r[8], (int, float))) total_pl = sum(r[11] for r in holdings_sorted if len(r) > 11 and isinstance(r[11], (int, float))) items = [ ("latest_daily_asset", display(latest_total_asset)), ("latest_peak_asset", display(latest_peak_asset)), ("latest_daily_mdd_pct", display(latest_mdd)), ("latest_month_total_asset", display(latest_month_total)), ("latest_month_return_pct", display(latest_month_return)), ("latest_ytd_return_pct", display(latest_ytd_return)), ("latest_capture", display(latest_capture)), ("latest_holdings_count", display(len(latest_holdings))), ("latest_holdings_market_value", display(total_mv)), ("latest_holdings_profit_loss", display(total_pl)), ] add_kpi_block(ws, 4, items) ws["D4"] = "Portfolio view" ws["D4"].fill = SUBHEADER_FILL ws["D4"].font = BOLD_FONT ws["D5"] = "일간/월간 자산 추이는 실제 계좌 스냅샷 기반입니다." ws["D6"] = "보유 비중 차트는 최신 스냅샷의 시장가치 기준입니다." ws["D7"] = "수익률이 음수여도 숨기지 않고 그대로 보여줍니다." ws["G4"] = "Top holdings" ws["G4"].fill = SUBHEADER_FILL ws["G4"].font = BOLD_FONT for i, row in enumerate(holdings_sorted[:10], start=5): name = row[4] if len(row) > 4 else "" mv = row[10] if len(row) > 10 else "" ws.cell(i, 7).value = f"{name} ({mv})" # Daily history chart helper daily_sheet = wb["daily_history"] daily_max = daily_sheet.max_row daily_chart = LineChart() daily_chart.title = "Daily Asset / MDD" daily_chart.y_axis.title = "KRW / %" daily_chart.x_axis.title = "Date" daily_chart.height = 7 daily_chart.width = 13 daily_data = Reference(daily_sheet, min_col=2, max_col=4, min_row=1, max_row=daily_max) daily_cats = Reference(daily_sheet, min_col=1, min_row=2, max_row=daily_max) daily_chart.add_data(daily_data, titles_from_data=True, from_rows=False) daily_chart.set_categories(daily_cats) daily_chart.style = 2 ws.add_chart(daily_chart, "A13") # Monthly history chart monthly_sheet = wb["monthly_history"] monthly_max = monthly_sheet.max_row monthly_chart = LineChart() monthly_chart.title = "Monthly Return Trend" monthly_chart.y_axis.title = "%" monthly_chart.x_axis.title = "Month" monthly_chart.height = 7 monthly_chart.width = 13 monthly_data = Reference(monthly_sheet, min_col=8, max_col=11, min_row=1, max_row=monthly_max) monthly_cats = Reference(monthly_sheet, min_col=1, min_row=2, max_row=monthly_max) monthly_chart.add_data(monthly_data, titles_from_data=True, from_rows=False) monthly_chart.set_categories(monthly_cats) monthly_chart.style = 3 ws.add_chart(monthly_chart, "G13") # Top holdings bar chart hold_chart = BarChart() hold_chart.type = "bar" hold_chart.title = "Top Holdings by Market Value" hold_chart.y_axis.title = "Holding" hold_chart.x_axis.title = "KRW" hold_chart.height = 8 hold_chart.width = 13 ws_hold = wb.create_sheet("_portfolio_holdings_helper") helper_headers = ["name", "market_value"] helper_rows = [[r[4], r[10]] for r in holdings_sorted[:10] if len(r) > 10] write_table(ws_hold, 1, 1, helper_headers, helper_rows) hold_data = Reference(ws_hold, min_col=2, min_row=1, max_row=1 + len(helper_rows)) hold_cats = Reference(ws_hold, min_col=1, min_row=2, max_row=1 + len(helper_rows)) hold_chart.add_data(hold_data, titles_from_data=True) hold_chart.set_categories(hold_cats) hold_chart.legend = None ws.add_chart(hold_chart, "A30") set_col_widths(ws, {"A": 22, "B": 18, "C": 18, "D": 24, "E": 18, "F": 18, "G": 26, "H": 26, "I": 18, "J": 18}) def build_portfolio_sector_exposure(wb) -> None: daily_headers, daily_rows = extract_sheet_rows(wb, "daily_history") account_headers, account_rows = extract_sheet_rows(wb, "account_snapshot") universe_headers, universe_rows = extract_sheet_rows(wb, "universe") account_dicts = [dict(zip(account_headers, row)) for row in account_rows if any(v not in (None, "") for v in row)] universe_dicts = [dict(zip(universe_headers, row)) for row in universe_rows if any(v not in (None, "") for v in row)] sector_map: dict[str, str] = {} for row in universe_dicts: ticker = str(row.get("Ticker", "") or "").zfill(6) sector = str(row.get("Sector", "") or "").strip() if ticker and sector: sector_map[ticker] = sector latest_capture = "" for row in account_dicts: cap = str(row.get("captured_at", "") or "") if cap and cap >= latest_capture: latest_capture = cap latest_rows = [r for r in account_dicts if str(r.get("captured_at", "") or "") == latest_capture] exposure: dict[str, dict[str, float]] = {} sector_holdings: dict[str, list[dict[str, object]]] = {} for row in latest_rows: ticker = str(row.get("ticker", "") or "").zfill(6) sector = sector_map.get(ticker, "미분류") mv = float(row.get("market_value", 0) or 0) pl = float(row.get("profit_loss", 0) or 0) cost = float(row.get("total_cost", 0) or 0) bucket = exposure.setdefault(sector, {"market_value": 0.0, "profit_loss": 0.0, "cost": 0.0, "count": 0.0}) bucket["market_value"] += mv bucket["profit_loss"] += pl bucket["cost"] += cost bucket["count"] += 1 sector_holdings.setdefault(sector, []).append({ "ticker": ticker, "name": row.get("name", ""), "market_value": mv, "profit_loss": pl, "return_pct": row.get("return_pct", ""), }) total_mv = sum(v["market_value"] for v in exposure.values()) or 1.0 rows = [] for sector, vals in sorted(exposure.items(), key=lambda kv: kv[1]["market_value"], reverse=True): pct = vals["market_value"] / total_mv * 100.0 ret_pct = (vals["profit_loss"] / vals["cost"] * 100.0) if vals["cost"] else 0.0 rows.append([sector, vals["count"], vals["market_value"], pct, vals["profit_loss"], ret_pct]) ws = wb.create_sheet("portfolio_sector_exposure") style_sheet(ws) style_title( ws, "포트폴리오 섹터 노출", "최신 계좌 스냅샷 기준으로 섹터별 보유 시장가치와 손익률을 집계", end_col=8, ) items = [ ("latest_capture", latest_capture), ("sector_count", len(rows)), ("top_sector", rows[0][0] if rows else ""), ("top_sector_weight_pct", rows[0][3] if rows else 0), ("top3_sector_weight_pct", sum(r[3] for r in rows[:3]) if rows else 0), ("total_market_value", total_mv), ] add_kpi_block(ws, 4, items) headers = ["sector", "holding_count", "market_value", "weight_pct", "profit_loss", "return_pct"] write_table(ws, 4, 4, headers, rows) ws["D4"] = "Sector exposure" ws["D4"].fill = SUBHEADER_FILL ws["D4"].font = BOLD_FONT ws.freeze_panes = "A5" ws.column_dimensions["A"].width = 24 ws.column_dimensions["B"].width = 14 ws.column_dimensions["C"].width = 18 ws.column_dimensions["D"].width = 24 ws.column_dimensions["E"].width = 14 ws.column_dimensions["F"].width = 14 ws.column_dimensions["G"].width = 16 ws.column_dimensions["H"].width = 14 chart = BarChart() chart.type = "bar" chart.style = 10 chart.title = "Sector Exposure by Market Value" chart.y_axis.title = "Sector" chart.x_axis.title = "KRW" chart.height = 8 chart.width = 14 data_ref = Reference(ws, min_col=6, min_row=4, max_row=4 + len(rows)) cats = Reference(ws, min_col=4, min_row=5, max_row=4 + len(rows)) chart.add_data(data_ref, titles_from_data=True) chart.set_categories(cats) chart.legend = None ws.add_chart(chart, "J4") detail_rows: list[list[object]] = [] for sector, vals in sorted(exposure.items(), key=lambda kv: kv[1]["market_value"], reverse=True)[:5]: sector_mv = vals["market_value"] or 1.0 holdings = sorted(sector_holdings.get(sector, []), key=lambda item: float(item.get("market_value", 0) or 0), reverse=True)[:3] for idx, holding in enumerate(holdings, start=1): mv = float(holding.get("market_value", 0) or 0) detail_rows.append([ sector if idx == 1 else "", idx, holding.get("ticker", ""), holding.get("name", ""), mv, mv / sector_mv * 100.0, mv / total_mv * 100.0, holding.get("return_pct", ""), ]) ws["A18"] = "Sector top holdings detail" ws["A18"].fill = SUBHEADER_FILL ws["A18"].font = BOLD_FONT write_table( ws, 19, 1, ["sector", "rank_in_sector", "ticker", "name", "market_value", "sector_weight_pct", "portfolio_weight_pct", "return_pct"], detail_rows, ) ws.column_dimensions["I"].width = 18 ws.column_dimensions["J"].width = 18 ws.column_dimensions["K"].width = 18 ws.column_dimensions["L"].width = 18 ws.column_dimensions["M"].width = 18 if detail_rows: detail_chart = BarChart() detail_chart.type = "bar" detail_chart.style = 11 detail_chart.title = "Top Holdings Contribution" detail_chart.y_axis.title = "Holding" detail_chart.x_axis.title = "Portfolio Weight %" detail_chart.height = 8 detail_chart.width = 14 # Use the first 15 rows of the detail table for a readable chart. chart_end_row = 19 + min(len(detail_rows), 15) data_ref2 = Reference(ws, min_col=7, min_row=19, max_row=chart_end_row) cats2 = Reference(ws, min_col=4, min_row=20, max_row=chart_end_row) detail_chart.add_data(data_ref2, titles_from_data=True) detail_chart.set_categories(cats2) detail_chart.legend = None ws.add_chart(detail_chart, "J20") def build_sector_summary(wb, data: dict) -> None: ws = wb.create_sheet("sector_trend_summary") style_sheet(ws) style_title( ws, "섹터 동향 분석 요약", "ETF 프록시, 스마트머니 유입, 수익률, 유동성 경고를 한 장에 요약한 시트", end_col=8, ) summary = data.get("summary") or {} concentration = data.get("concentration") or {} items = [ ("formula_id", data.get("formula_id", "")), ("gate", data.get("gate", "")), ("latest_snapshot_date", data.get("latest_snapshot_date", "")), ("previous_snapshot_date", data.get("previous_snapshot_date", "")), ("sector_count", data.get("sector_count", 0)), ("trend_posture", summary.get("trend_posture", "")), ("rising_count", summary.get("rising_count", 0)), ("fading_count", summary.get("fading_count", 0)), ("stable_count", summary.get("stable_count", 0)), ("etf_proxy_count", summary.get("etf_proxy_count", 0)), ("smart_money_inflow_count", summary.get("smart_money_inflow_count", 0)), ("smart_money_outflow_count", summary.get("smart_money_outflow_count", 0)), ("flow_aligned_count", summary.get("flow_aligned_count", 0)), ("flow_diverging_count", summary.get("flow_diverging_count", 0)), ] add_kpi_block(ws, 4, items) ws["D4"] = "Concentration" ws["D4"].fill = SUBHEADER_FILL ws["D4"].font = BOLD_FONT for idx, (label, value) in enumerate( [ ("top_sector", concentration.get("top_sector", "")), ("top_sector_weight_pct", concentration.get("top_sector_weight_pct", 0)), ("top2_weight_pct", concentration.get("top2_weight_pct", 0)), ("concentration_gate", concentration.get("concentration_gate", "")), ], start=5, ): ws.cell(idx, 4).value = label ws.cell(idx, 4).fill = KPI_LABEL_FILL ws.cell(idx, 4).font = BOLD_FONT ws.cell(idx, 5).value = value ws.cell(idx, 5).fill = KPI_VALUE_FILL top_inflow = summary.get("top_inflow_sectors") or [] outflow = summary.get("outflow_warning_sectors") or [] ws["G4"] = "Top Inflow" ws["G4"].fill = SUBHEADER_FILL ws["G4"].font = BOLD_FONT for i, item in enumerate(top_inflow, start=5): ws.cell(i, 7).value = item ws["H4"] = "Outflow Warning" ws["H4"].fill = SUBHEADER_FILL ws["H4"].font = BOLD_FONT for i, item in enumerate(outflow, start=5): ws.cell(i, 8).value = item ws["A20"] = "Notes" ws["A20"].fill = SUBHEADER_FILL ws["A20"].font = BOLD_FONT ws["A21"] = "섹터별 ETF 프록시와 스마트머니 방향이 다르면 매수 근거를 보수적으로 해석해야 합니다." ws["A21"].alignment = Alignment(wrap_text=True) ws["A22"] = "데이터 결측은 하네스 업데이트가 필요합니다." ws["A22"].alignment = Alignment(wrap_text=True) chart = LineChart() chart.title = "Average Sector Score / Breadth Trend" chart.y_axis.title = "Score / Count" chart.x_axis.title = "Snapshot" chart.height = 7.5 chart.width = 13 timeline_sheet = wb["sector_trend_timeline"] max_row = timeline_sheet.max_row data_ref = Reference(timeline_sheet, min_col=13, min_row=4, max_row=max_row, max_col=17) cats = Reference(timeline_sheet, min_col=12, min_row=5, max_row=max_row) chart.add_data(data_ref, titles_from_data=True, from_rows=False) chart.set_categories(cats) chart.style = 2 ws.add_chart(chart, "G12") set_col_widths(ws, {"A": 28, "B": 18, "C": 16, "D": 24, "E": 16, "F": 18, "G": 24, "H": 24}) def build_sector_analysis(wb, data: dict) -> None: ws = wb.create_sheet("sector_trend_analysis") style_sheet(ws) style_title( ws, "섹터 동향 분석", "섹터별 ETF 프록시, 스마트머니 유입, 수익률, 유동성 방향을 함께 보는 상세 시트", end_col=18, ) headers = [ "sector", "proxy_ticker", "proxy_name", "proxy_type", "etf_code", "etf_execution_use", "etf_liquidity_score", "etf_liquidity_status", "etf_nav_risk", "proxy_confidence", "rank", "rank_delta_w1", "rank_delta_w2", "sector_score", "score_delta", "sector_ret5d", "sector_ret20d", "etf_return_5d", "etf_return_20d", "sector_etf_ret_gap_5d", "sector_etf_ret_gap_20d", "smart_money_5d_krw_raw", "smart_money_20d_krw_raw", "smart_money_direction", "liquidity_direction", "flow_alignment_state", "momentum_state", "concentration_weight_pct" ] rows = [] for row in data.get("rows") or []: rows.append([row.get(h, "") for h in headers]) write_table(ws, 4, 1, headers, rows) ws.auto_filter.ref = f"A4:{get_column_letter(len(headers))}{4 + len(rows)}" ws.freeze_panes = "A5" for col in ["F", "G", "H", "I", "J", "K", "L", "M", "N", "O", "P", "Q", "R", "S", "T", "U", "V", "W", "X", "Y", "Z", "AA", "AB"]: ws.column_dimensions[col].width = 16 ws.column_dimensions["C"].width = 18 ws.column_dimensions["A"].width = 16 ws.column_dimensions["B"].width = 12 ws.column_dimensions["D"].width = 12 ws.column_dimensions["E"].width = 12 ws.column_dimensions["J"].width = 14 ws.column_dimensions["P"].width = 12 ws.column_dimensions["Q"].width = 12 ws.column_dimensions["AA"].width = 18 ws.column_dimensions["AB"].width = 18 chart = BarChart() chart.type = "bar" chart.style = 10 chart.title = "Sector 20D Return by Sector" chart.y_axis.title = "Sector" chart.x_axis.title = "20D Return" chart.height = 8 chart.width = 14 data_ref = Reference(ws, min_col=17, min_row=4, max_row=4 + len(rows)) cats = Reference(ws, min_col=1, min_row=5, max_row=4 + len(rows)) chart.add_data(data_ref, titles_from_data=True) chart.set_categories(cats) chart.legend = None ws.add_chart(chart, "AD4") def build_sector_timeline(wb, data: dict) -> None: ws = wb.create_sheet("sector_trend_timeline") style_sheet(ws) style_title(ws, "섹터 시계열", "최근 스냅샷 기준 경향성 추세", end_col=10) headers = [ "snapshot_date", "sector_count", "avg_sector_score", "top_sector", "top_sector_score", "positive_breadth_count", "liquidity_warn_count", "net_smart_money_5d_krw", "top_sector_rank", "top_sector_smart_money_5d_krw" ] rows = [] for row in data.get("timeline") or []: parsed_date = row.get("snapshot_date", "") if isinstance(parsed_date, str) and parsed_date: try: parsed_date = datetime.fromisoformat(parsed_date.replace("Z", "+00:00")).date() except Exception: pass rows.append([ parsed_date, row.get("sector_count", ""), row.get("avg_sector_score", ""), row.get("top_sector", ""), row.get("top_sector_score", ""), row.get("positive_breadth_count", ""), row.get("liquidity_warn_count", ""), row.get("net_smart_money_5d_krw", ""), row.get("top_sector_rank", ""), row.get("top_sector_smart_money_5d_krw", ""), ]) write_table(ws, 4, 1, headers, rows) helper_headers = [ "snapshot_date", "avg_sector_score", "top_sector_score", "positive_breadth_count", "liquidity_warn_count", "net_smart_money_5d_krw" ] helper_rows = [] for row in rows: helper_rows.append([row[0], row[2], row[4], row[5], row[6], row[7]]) write_table(ws, 4, 12, helper_headers, helper_rows) ws.freeze_panes = "A5" ws.column_dimensions["A"].width = 14 ws.column_dimensions["B"].width = 12 ws.column_dimensions["C"].width = 14 ws.column_dimensions["D"].width = 16 ws.column_dimensions["E"].width = 14 ws.column_dimensions["F"].width = 16 ws.column_dimensions["G"].width = 16 ws.column_dimensions["H"].width = 18 ws.column_dimensions["I"].width = 14 ws.column_dimensions["J"].width = 18 chart = LineChart() chart.title = "Trend Score / Breadth / Liquidity" chart.y_axis.title = "Count / Score" chart.x_axis.title = "Snapshot" chart.height = 8 chart.width = 15 data_ref = Reference(ws, min_col=13, max_col=17, min_row=4, max_row=4 + len(helper_rows)) cats = Reference(ws, min_col=12, min_row=5, max_row=4 + len(helper_rows)) chart.add_data(data_ref, titles_from_data=True, from_rows=False) chart.set_categories(cats) chart.style = 3 ws.add_chart(chart, "L4") def build_etf_summary(wb, data: dict) -> None: ws = wb.create_sheet("etf_representative_summary") style_sheet(ws) style_title( ws, "ETF 대표 종목 요약", "ETF 구성비중 우선, 부족분은 유동성 우선 후보로 보강한 3종목 바스켓 요약", end_col=8, ) summary = data.get("summary") or {} items = [ ("formula_id", data.get("formula_id", "")), ("gate", data.get("gate", "")), ("etf_sector_count", data.get("etf_sector_count", 0)), ("tracked_count", data.get("tracked_count", 0)), ("complete_basket_count", summary.get("complete_basket_count", 0)), ("partial_basket_count", summary.get("partial_basket_count", 0)), ("basket_missing_total", summary.get("basket_missing_total", 0)), ("weighted_basis_count", summary.get("weighted_basis_count", 0)), ("fallback_basis_count", summary.get("fallback_basis_count", 0)), ("selected_sector_count", summary.get("selected_sector_count", 0)), ] add_kpi_block(ws, 4, items) ws["D4"] = "Representative principle" ws["D4"].fill = SUBHEADER_FILL ws["D4"].font = BOLD_FONT ws["D5"] = "1) ETF constituent weight first" ws["D6"] = "2) Missing slots filled with same-sector live candidates" ws["D7"] = "3) Missing data stays explicit as DATA_MISSING" ws["D8"] = "4) Minimum 3 names per sector basket" ws["G4"] = "Top reps" ws["G4"].fill = SUBHEADER_FILL ws["G4"].font = BOLD_FONT for i, item in enumerate(summary.get("top_rep_names") or [], start=5): ws.cell(i, 7).value = item set_col_widths(ws, {"A": 28, "B": 18, "C": 16, "D": 30, "E": 18, "F": 18, "G": 24, "H": 24}) def build_etf_monitor(wb, data: dict) -> None: ws = wb.create_sheet("etf_representative_monitor") style_sheet(ws) style_title( ws, "ETF 대표 종목 모니터", "섹터별 3종목 바스켓과 선택 근거, 커버리지, 품질 상태를 추적", end_col=18, ) headers = [ "sector", "etf_proxy_ticker", "etf_proxy_name", "etf_proxy_type", "sector_rank", "sector_score", "sector_smart_money_5d_krw", "sector_ret20d", "representative_count", "representative_ticker", "representative_name", "representative_basis", "representative_basis_detail", "constituent_weight", "basket_quality_state", "basket_coverage_pct", "basket_state", "basket_buy_review_count", "basket_track_count", "basket_watch_count", "basket_caution_count", "basket_aligned_count", "basket_missing_count", "basket_real_count", "selection_source", "selection_score", "monitor_reason" ] rows = [] for row in data.get("rows") or []: rows.append([row.get(h, "") for h in headers]) write_table(ws, 4, 1, headers, rows) ws.auto_filter.ref = f"A4:{get_column_letter(len(headers))}{4 + len(rows)}" ws.freeze_panes = "A5" for col, width in {"A": 18, "B": 12, "C": 16, "D": 12, "E": 12, "F": 12, "G": 18, "H": 12, "I": 14, "J": 12, "K": 18, "L": 18, "M": 24, "N": 14, "O": 14, "P": 14, "Q": 14, "R": 14, "S": 14, "T": 14, "U": 14, "V": 14, "W": 14, "X": 18, "Y": 14, "Z": 12, "AA": 24}.items(): ws.column_dimensions[col].width = width chart = BarChart() chart.type = "bar" chart.style = 10 chart.title = "Basket Coverage by Sector" chart.y_axis.title = "Sector" chart.x_axis.title = "Coverage %" chart.height = 8 chart.width = 14 data_ref = Reference(ws, min_col=16, min_row=4, max_row=4 + len(rows)) cats = Reference(ws, min_col=1, min_row=5, max_row=4 + len(rows)) chart.add_data(data_ref, titles_from_data=True) chart.set_categories(cats) chart.legend = None ws.add_chart(chart, "AC4") def main() -> None: if not INPUT_XLSX.exists(): raise FileNotFoundError(INPUT_XLSX) if not SECTOR_JSON.exists(): raise FileNotFoundError(SECTOR_JSON) if not ETF_JSON.exists(): raise FileNotFoundError(ETF_JSON) sector = load_json(SECTOR_JSON) etf = load_json(ETF_JSON) wb = load_workbook(INPUT_XLSX) for name in [ "portfolio_performance_summary", "portfolio_sector_exposure", "_portfolio_holdings_helper", "sector_trend_summary", "sector_trend_analysis", "sector_trend_timeline", "etf_representative_summary", "etf_representative_monitor", ]: remove_if_exists(wb, name) # Build data sheets first so summary sheets can reference the timeline sheet. build_portfolio_summary(wb) build_portfolio_sector_exposure(wb) build_sector_timeline(wb, sector) build_sector_analysis(wb, sector) build_sector_summary(wb, sector) build_etf_monitor(wb, etf) build_etf_summary(wb, etf) # Put summary sheets near the front. order = [ "settings", "portfolio_performance_summary", "portfolio_sector_exposure", "sector_trend_summary", "sector_trend_analysis", "sector_trend_timeline", "etf_representative_summary", "etf_representative_monitor", ] existing = [s for s in wb.sheetnames if s not in order] wb._sheets = [wb[s] for s in order if s in wb.sheetnames] + [wb[s] for s in existing] if "_portfolio_holdings_helper" in wb.sheetnames: wb["_portfolio_holdings_helper"].sheet_state = "hidden" wb.active = wb.sheetnames.index("sector_trend_summary") OUTPUT_DIR.mkdir(parents=True, exist_ok=True) wb.save(OUTPUT_XLSX) print(f"saved {OUTPUT_XLSX}") print("sheets", wb.sheetnames[:10]) if __name__ == "__main__": main()