from __future__ import annotations import json from datetime import datetime from pathlib import Path from openpyxl import load_workbook from openpyxl.chart import BarChart, LineChart, Reference from openpyxl.styles import Font, PatternFill, Alignment from openpyxl.utils import get_column_letter import sys ROOT = Path(__file__).resolve().parent.parent if str(ROOT) not in sys.path: sys.path.insert(0, str(ROOT)) from src.quant_engine.sector_universe_refresh import build_sector_universe_refresh_audit INPUT_XLSX = ROOT / "GatherTradingData.xlsx" OUTPUT_DIR = ROOT / "outputs" / "sector_insights_enhanced" OUTPUT_XLSX = OUTPUT_DIR / "GatherTradingData_sector_insights.xlsx" SECTOR_JSON = ROOT / "Temp" / "sector_trend_analysis_v1.json" ETF_JSON = ROOT / "Temp" / "etf_representative_monitor_v1.json" READINESS_JSON = ROOT / "Temp" / "operational_alpha_calibration_v2.json" READINESS_BRIDGE_JSON = ROOT / "Temp" / "performance_readiness_replay_bridge_v1.json" READINESS_BRIDGE_V2_JSON = ROOT / "Temp" / "performance_readiness_replay_bridge_v2.json" EVAL_QUEUE_JSON = ROOT / "Temp" / "operational_eval_queue_v1.json" HEADER_FILL = PatternFill("solid", fgColor="1F4E78") SUBHEADER_FILL = PatternFill("solid", fgColor="D9EAF7") KPI_FILL = PatternFill("solid", fgColor="F3F7FB") KPI_LABEL_FILL = PatternFill("solid", fgColor="E2F0D9") KPI_VALUE_FILL = PatternFill("solid", fgColor="FFF2CC") WHITE_FONT = Font(color="FFFFFF", bold=True) BOLD_FONT = Font(bold=True) TITLE_FONT = Font(size=14, bold=True) NOTE_FONT = Font(italic=True, color="666666") def load_json(path: Path) -> dict: return json.loads(path.read_text(encoding="utf-8")) def remove_if_exists(wb, name: str) -> None: if name in wb.sheetnames: del wb[name] def style_title(ws, title: str, subtitle: str | None = None, end_col: int = 8) -> None: ws.merge_cells(start_row=1, start_column=1, end_row=1, end_column=end_col) ws["A1"] = title ws["A1"].font = TITLE_FONT ws["A1"].fill = HEADER_FILL ws["A1"].font = WHITE_FONT ws["A1"].alignment = Alignment(horizontal="left") if subtitle: ws.merge_cells(start_row=2, start_column=1, end_row=2, end_column=end_col) ws["A2"] = subtitle ws["A2"].font = NOTE_FONT def write_table(ws, start_row: int, start_col: int, headers: list[str], rows: list[list], header_fill=HEADER_FILL) -> int: for j, header in enumerate(headers, start=start_col): cell = ws.cell(start_row, j) cell.value = header cell.font = WHITE_FONT cell.fill = header_fill cell.alignment = Alignment(horizontal="center", vertical="center") for i, row in enumerate(rows, start=start_row + 1): for j, value in enumerate(row, start=start_col): cell = ws.cell(i, j) cell.value = value cell.alignment = Alignment(vertical="top") return start_row + len(rows) def add_kpi_block(ws, start_row: int, items: list[tuple[str, object]]) -> int: ws.cell(start_row, 1).value = "KPI" ws.cell(start_row, 1).fill = SUBHEADER_FILL ws.cell(start_row, 1).font = BOLD_FONT row = start_row + 1 for label, value in items: ws.cell(row, 1).value = label ws.cell(row, 1).fill = KPI_LABEL_FILL ws.cell(row, 1).font = BOLD_FONT ws.cell(row, 2).value = value ws.cell(row, 2).fill = KPI_VALUE_FILL row += 1 return row def set_col_widths(ws, widths: dict[str, int]) -> None: for col, width in widths.items(): ws.column_dimensions[col].width = width def style_sheet(ws) -> None: ws.freeze_panes = "A3" ws.sheet_view.showGridLines = False def extract_sheet_rows(wb, sheet_name: str) -> tuple[list[str], list[list]]: ws = wb[sheet_name] first_row = [ws.cell(1, c).value for c in range(1, ws.max_column + 1)] second_row = [ws.cell(2, c).value for c in range(1, ws.max_column + 1)] if ws.max_row >= 2 else [] use_second_row = ( bool(second_row) and not any(v not in (None, "") for v in first_row) and any(v not in (None, "") for v in second_row) ) headers = second_row if use_second_row else first_row start_row = 3 if use_second_row else 2 rows: list[list] = [] for r in range(start_row, ws.max_row + 1): row = [ws.cell(r, c).value for c in range(1, ws.max_column + 1)] if any(v is not None and v != "" for v in row): rows.append(row) return headers, rows def build_portfolio_summary(wb) -> None: def display(value): return value if value not in (None, "") else "DATA_MISSING — 하네스 업데이트 필요" daily_headers, daily_rows = extract_sheet_rows(wb, "daily_history") monthly_headers, monthly_rows = extract_sheet_rows(wb, "monthly_history") account_headers, account_rows = extract_sheet_rows(wb, "account_snapshot") ws = wb.create_sheet("portfolio_performance_summary") style_sheet(ws) style_title( ws, "포트폴리오 성과 요약", "내 자금의 일간/월간 추이와 최신 보유 비중을 함께 보는 요약 시트", end_col=10, ) latest_daily = daily_rows[-1] if daily_rows else [] latest_month = monthly_rows[-1] if monthly_rows else [] latest_total_asset = latest_daily[1] if len(latest_daily) > 1 else None latest_peak_asset = latest_daily[2] if len(latest_daily) > 2 else None latest_mdd = latest_daily[3] if len(latest_daily) > 3 else None latest_month_total = latest_month[1] if len(latest_month) > 1 else None latest_month_return = latest_month[8] if len(latest_month) > 8 else None latest_ytd_return = latest_month[10] if len(latest_month) > 10 else None latest_capture = None if account_rows: latest_capture = account_rows[0][0] for row in account_rows: if row and row[0] and row[0] > latest_capture: latest_capture = row[0] latest_holdings = [r for r in account_rows if r and r[0] == latest_capture] holdings_sorted = sorted( latest_holdings, key=lambda r: (r[10] if len(r) > 10 and isinstance(r[10], (int, float)) else 0), reverse=True, ) total_mv = sum(r[10] for r in holdings_sorted if len(r) > 10 and isinstance(r[10], (int, float))) total_cost = sum(r[8] for r in holdings_sorted if len(r) > 8 and isinstance(r[8], (int, float))) total_pl = sum(r[11] for r in holdings_sorted if len(r) > 11 and isinstance(r[11], (int, float))) items = [ ("latest_daily_asset", display(latest_total_asset)), ("latest_peak_asset", display(latest_peak_asset)), ("latest_daily_mdd_pct", display(latest_mdd)), ("latest_month_total_asset", display(latest_month_total)), ("latest_month_return_pct", display(latest_month_return)), ("latest_ytd_return_pct", display(latest_ytd_return)), ("latest_capture", display(latest_capture)), ("latest_holdings_count", display(len(latest_holdings))), ("latest_holdings_market_value", display(total_mv)), ("latest_holdings_profit_loss", display(total_pl)), ] add_kpi_block(ws, 4, items) ws["D4"] = "Portfolio view" ws["D4"].fill = SUBHEADER_FILL ws["D4"].font = BOLD_FONT ws["D5"] = "일간/월간 자산 추이는 실제 계좌 스냅샷 기반입니다." ws["D6"] = "보유 비중 차트는 최신 스냅샷의 시장가치 기준입니다." ws["D7"] = "수익률이 음수여도 숨기지 않고 그대로 보여줍니다." ws["G4"] = "Top holdings" ws["G4"].fill = SUBHEADER_FILL ws["G4"].font = BOLD_FONT for i, row in enumerate(holdings_sorted[:10], start=5): name = row[4] if len(row) > 4 else "" mv = row[10] if len(row) > 10 else "" ws.cell(i, 7).value = f"{name} ({mv})" # Daily history chart helper daily_sheet = wb["daily_history"] daily_max = daily_sheet.max_row daily_chart = LineChart() daily_chart.title = "Daily Asset / MDD" daily_chart.y_axis.title = "KRW / %" daily_chart.x_axis.title = "Date" daily_chart.height = 7 daily_chart.width = 13 daily_data = Reference(daily_sheet, min_col=2, max_col=4, min_row=1, max_row=daily_max) daily_cats = Reference(daily_sheet, min_col=1, min_row=2, max_row=daily_max) daily_chart.add_data(daily_data, titles_from_data=True, from_rows=False) daily_chart.set_categories(daily_cats) daily_chart.style = 2 ws.add_chart(daily_chart, "A13") # Monthly history chart monthly_sheet = wb["monthly_history"] monthly_max = monthly_sheet.max_row monthly_chart = LineChart() monthly_chart.title = "Monthly Return Trend" monthly_chart.y_axis.title = "%" monthly_chart.x_axis.title = "Month" monthly_chart.height = 7 monthly_chart.width = 13 monthly_data = Reference(monthly_sheet, min_col=8, max_col=11, min_row=1, max_row=monthly_max) monthly_cats = Reference(monthly_sheet, min_col=1, min_row=2, max_row=monthly_max) monthly_chart.add_data(monthly_data, titles_from_data=True, from_rows=False) monthly_chart.set_categories(monthly_cats) monthly_chart.style = 3 ws.add_chart(monthly_chart, "G13") # Top holdings bar chart hold_chart = BarChart() hold_chart.type = "bar" hold_chart.title = "Top Holdings by Market Value" hold_chart.y_axis.title = "Holding" hold_chart.x_axis.title = "KRW" hold_chart.height = 8 hold_chart.width = 13 ws_hold = wb.create_sheet("_portfolio_holdings_helper") helper_headers = ["name", "market_value"] helper_rows = [[r[4], r[10]] for r in holdings_sorted[:10] if len(r) > 10] write_table(ws_hold, 1, 1, helper_headers, helper_rows) hold_data = Reference(ws_hold, min_col=2, min_row=1, max_row=1 + len(helper_rows)) hold_cats = Reference(ws_hold, min_col=1, min_row=2, max_row=1 + len(helper_rows)) hold_chart.add_data(hold_data, titles_from_data=True) hold_chart.set_categories(hold_cats) hold_chart.legend = None ws.add_chart(hold_chart, "A30") set_col_widths(ws, {"A": 22, "B": 18, "C": 18, "D": 24, "E": 18, "F": 18, "G": 26, "H": 26, "I": 18, "J": 18}) def build_performance_readiness_summary(wb) -> None: def display(value): return value if value not in (None, "") else "DATA_MISSING — 하네스 업데이트 필요" readiness = load_json(READINESS_JSON) if READINESS_JSON.exists() else {} bridge = load_json(READINESS_BRIDGE_JSON) if READINESS_BRIDGE_JSON.exists() else {} bridge_v2 = load_json(READINESS_BRIDGE_V2_JSON) if READINESS_BRIDGE_V2_JSON.exists() else {} live = bridge.get("live", {}) if isinstance(bridge.get("live"), dict) else {} replay = bridge.get("replay_informational", {}) if isinstance(bridge.get("replay_informational"), dict) else {} metrics = readiness.get("metrics", {}) if isinstance(readiness.get("metrics"), dict) else {} ws = wb.create_sheet("performance_readiness_summary") style_sheet(ws) style_title( ws, "성과 준비도 요약", "CHECK_83의 live T+20 데이터 게이트와 replay 브리지를 함께 보여주는 상태 시트", end_col=10, ) items = [ ("check_83_gate", display(readiness.get("gate"))), ("confidence_score", display(readiness.get("confidence_score"))), ("performance_ready", display(readiness.get("performance_ready"))), ("readiness_reasons", display(", ".join(readiness.get("readiness_reasons", [])) if isinstance(readiness.get("readiness_reasons"), list) else readiness.get("readiness_reasons"))), ("outcome_quality_score", display(metrics.get("outcome_quality_score"))), ("t20_operational_sample", display(metrics.get("t20_operational_sample"))), ("t5_operational_pass_rate", display(metrics.get("t5_operational_pass_rate"))), ("value_damage_pct_avg", display(metrics.get("value_damage_pct_avg"))), ("live_t20_count", display(live.get("t20_count"))), ("live_sample_gate", display(live.get("sample_gate"))), ] add_kpi_block(ws, 4, items) ws["D4"] = "Readiness rule" ws["D4"].fill = SUBHEADER_FILL ws["D4"].font = BOLD_FONT ws["D5"] = "live T+20가 30건 미만이면 PERFORMANCE_READY로 승격하지 않습니다." ws["D6"] = f"현재 상태: {readiness.get('gate', 'MISSING')}" ws["D7"] = f"브리지 승격 규칙: {bridge_v2.get('promotion_rule', 'DATA_MISSING — 하네스 업데이트 필요')}" ws["D8"] = f"브리지 승격 가능: {bridge_v2.get('promotion_allowed', 'DATA_MISSING — 하네스 업데이트 필요')}" ws["G4"] = "Replay reference" ws["G4"].fill = SUBHEADER_FILL ws["G4"].font = BOLD_FONT ws["G5"] = f"replay_t20_count: {display(replay.get('t20_count'))}" ws["G6"] = f"replay_t20_pass_rate_pct: {display(replay.get('t20_pass_rate_pct'))}" ws["G7"] = f"replay_t20_avg_return_pct: {display(replay.get('t20_avg_return_pct'))}" ws["G8"] = f"replay_note: {display(replay.get('note'))}" readiness_rows = [ ["metric", "count"], ["live_t20_count", live.get("t20_count") or 0], ["required_live_t20_count", 30], ["replay_t20_count", replay.get("t20_count") or 0], ] write_table(ws, 4, 10, readiness_rows[0], readiness_rows[1:]) readiness_chart = BarChart() readiness_chart.type = "bar" readiness_chart.style = 10 readiness_chart.title = "T20 Readiness vs Threshold" readiness_chart.y_axis.title = "Metric" readiness_chart.x_axis.title = "Count" readiness_chart.height = 6.5 readiness_chart.width = 11 readiness_data = Reference(ws, min_col=11, min_row=4, max_row=7) readiness_cats = Reference(ws, min_col=10, min_row=5, max_row=7) readiness_chart.add_data(readiness_data, titles_from_data=True) readiness_chart.set_categories(readiness_cats) readiness_chart.legend = None ws.add_chart(readiness_chart, "J13") set_col_widths(ws, {"A": 24, "B": 18, "C": 18, "D": 28, "E": 18, "F": 18, "G": 26, "H": 26, "I": 18, "J": 20, "K": 16}) def build_operational_eval_queue_summary(wb) -> None: def display(value): return value if value not in (None, "") else "DATA_MISSING — 하네스 업데이트 필요" queue_data = load_json(EVAL_QUEUE_JSON) if EVAL_QUEUE_JSON.exists() else {} metrics = queue_data.get("metrics", {}) if isinstance(queue_data.get("metrics"), dict) else {} queue_rows = queue_data.get("queue", []) if isinstance(queue_data.get("queue"), list) else [] todo = queue_data.get("todo_protocol", []) if isinstance(queue_data.get("todo_protocol"), list) else [] ws = wb.create_sheet("operational_eval_queue_summary") style_sheet(ws) style_title( ws, "운영 T+20 대기열 요약", "T+20 실제 결과 입력 대기 상태와 처리 프로토콜을 한 장에 정리", end_col=10, ) items = [ ("formula_id", display(queue_data.get("formula_id"))), ("as_of", display(queue_data.get("as_of"))), ("t20_days_threshold", display(queue_data.get("t20_days_threshold"))), ("records_total", display(metrics.get("records_total"))), ("t20_evaluated_count", display(metrics.get("t20_evaluated_count"))), ("t20_due_capture_count", display(metrics.get("t20_due_capture_count"))), ("missing_due_date_count", display(metrics.get("missing_due_date_count"))), ("all_proposals_have_due_dates", display(queue_data.get("all_proposals_have_due_dates"))), ("queue_count", display(len(queue_rows))), ("todo_count", display(len(todo))), ] add_kpi_block(ws, 4, items) ws["D4"] = "Queue protocol" ws["D4"].fill = SUBHEADER_FILL ws["D4"].font = BOLD_FONT for idx, line in enumerate(todo[:5], start=5): ws.cell(idx, 4).value = line ws["G4"] = "Queue status" ws["G4"].fill = SUBHEADER_FILL ws["G4"].font = BOLD_FONT ws["G5"] = f"current_queue_rows: {display(len(queue_rows))}" ws["G6"] = f"t20_due_capture_count: {display(metrics.get('t20_due_capture_count'))}" ws["G7"] = f"missing_due_date_count: {display(metrics.get('missing_due_date_count'))}" queue_table_rows = [["metric", "value"], ["records_total", metrics.get("records_total") or 0], ["t20_evaluated_count", metrics.get("t20_evaluated_count") or 0], ["t20_due_capture_count", metrics.get("t20_due_capture_count") or 0]] write_table(ws, 17, 1, queue_table_rows[0], queue_table_rows[1:]) chart = BarChart() chart.type = "bar" chart.style = 10 chart.title = "T20 Queue Status" chart.y_axis.title = "Metric" chart.x_axis.title = "Count" chart.height = 6 chart.width = 11 data_ref = Reference(ws, min_col=2, min_row=17, max_row=20) cats = Reference(ws, min_col=1, min_row=18, max_row=20) chart.add_data(data_ref, titles_from_data=True) chart.set_categories(cats) chart.legend = None ws.add_chart(chart, "J17") set_col_widths(ws, {"A": 24, "B": 18, "C": 18, "D": 40, "E": 18, "F": 18, "G": 28, "H": 28, "I": 18, "J": 20, "K": 16}) def build_portfolio_sector_exposure(wb) -> None: daily_headers, daily_rows = extract_sheet_rows(wb, "daily_history") account_headers, account_rows = extract_sheet_rows(wb, "account_snapshot") universe_headers, universe_rows = extract_sheet_rows(wb, "universe") account_dicts = [dict(zip(account_headers, row)) for row in account_rows if any(v not in (None, "") for v in row)] universe_dicts = [dict(zip(universe_headers, row)) for row in universe_rows if any(v not in (None, "") for v in row)] sector_map: dict[str, str] = {} for row in universe_dicts: ticker = str(row.get("Ticker", "") or "").zfill(6) sector = str(row.get("Sector", "") or "").strip() if ticker and sector: sector_map[ticker] = sector latest_capture = "" for row in account_dicts: cap = str(row.get("captured_at", "") or "") if cap and cap >= latest_capture: latest_capture = cap latest_rows = [r for r in account_dicts if str(r.get("captured_at", "") or "") == latest_capture] exposure: dict[str, dict[str, float]] = {} sector_holdings: dict[str, list[dict[str, object]]] = {} for row in latest_rows: ticker = str(row.get("ticker", "") or "").zfill(6) sector = sector_map.get(ticker, "미분류") mv = float(row.get("market_value", 0) or 0) pl = float(row.get("profit_loss", 0) or 0) cost = float(row.get("total_cost", 0) or 0) bucket = exposure.setdefault(sector, {"market_value": 0.0, "profit_loss": 0.0, "cost": 0.0, "count": 0.0}) bucket["market_value"] += mv bucket["profit_loss"] += pl bucket["cost"] += cost bucket["count"] += 1 sector_holdings.setdefault(sector, []).append({ "ticker": ticker, "name": row.get("name", ""), "market_value": mv, "profit_loss": pl, "return_pct": row.get("return_pct", ""), }) total_mv = sum(v["market_value"] for v in exposure.values()) or 1.0 rows = [] for sector, vals in sorted(exposure.items(), key=lambda kv: kv[1]["market_value"], reverse=True): pct = vals["market_value"] / total_mv * 100.0 ret_pct = (vals["profit_loss"] / vals["cost"] * 100.0) if vals["cost"] else 0.0 rows.append([sector, vals["count"], vals["market_value"], pct, vals["profit_loss"], ret_pct]) ws = wb.create_sheet("portfolio_sector_exposure") style_sheet(ws) style_title( ws, "포트폴리오 섹터 노출", "최신 계좌 스냅샷 기준으로 섹터별 보유 시장가치와 손익률을 집계", end_col=8, ) items = [ ("latest_capture", latest_capture), ("sector_count", len(rows)), ("top_sector", rows[0][0] if rows else ""), ("top_sector_weight_pct", rows[0][3] if rows else 0), ("top3_sector_weight_pct", sum(r[3] for r in rows[:3]) if rows else 0), ("total_market_value", total_mv), ] add_kpi_block(ws, 4, items) headers = ["sector", "holding_count", "market_value", "weight_pct", "profit_loss", "return_pct"] write_table(ws, 4, 4, headers, rows) ws["D4"] = "Sector exposure" ws["D4"].fill = SUBHEADER_FILL ws["D4"].font = BOLD_FONT ws.freeze_panes = "A5" ws.column_dimensions["A"].width = 24 ws.column_dimensions["B"].width = 14 ws.column_dimensions["C"].width = 18 ws.column_dimensions["D"].width = 24 ws.column_dimensions["E"].width = 14 ws.column_dimensions["F"].width = 14 ws.column_dimensions["G"].width = 16 ws.column_dimensions["H"].width = 14 chart = BarChart() chart.type = "bar" chart.style = 10 chart.title = "Sector Exposure by Market Value" chart.y_axis.title = "Sector" chart.x_axis.title = "KRW" chart.height = 8 chart.width = 14 data_ref = Reference(ws, min_col=6, min_row=4, max_row=4 + len(rows)) cats = Reference(ws, min_col=4, min_row=5, max_row=4 + len(rows)) chart.add_data(data_ref, titles_from_data=True) chart.set_categories(cats) chart.legend = None ws.add_chart(chart, "J4") detail_rows: list[list[object]] = [] for sector, vals in sorted(exposure.items(), key=lambda kv: kv[1]["market_value"], reverse=True)[:5]: sector_mv = vals["market_value"] or 1.0 holdings = sorted(sector_holdings.get(sector, []), key=lambda item: float(item.get("market_value", 0) or 0), reverse=True)[:3] for idx, holding in enumerate(holdings, start=1): mv = float(holding.get("market_value", 0) or 0) detail_rows.append([ sector if idx == 1 else "", idx, holding.get("ticker", ""), holding.get("name", ""), mv, mv / sector_mv * 100.0, mv / total_mv * 100.0, holding.get("return_pct", ""), ]) ws["A18"] = "Sector top holdings detail" ws["A18"].fill = SUBHEADER_FILL ws["A18"].font = BOLD_FONT write_table( ws, 19, 1, ["sector", "rank_in_sector", "ticker", "name", "market_value", "sector_weight_pct", "portfolio_weight_pct", "return_pct"], detail_rows, ) ws.column_dimensions["I"].width = 18 ws.column_dimensions["J"].width = 18 ws.column_dimensions["K"].width = 18 ws.column_dimensions["L"].width = 18 ws.column_dimensions["M"].width = 18 if detail_rows: detail_chart = BarChart() detail_chart.type = "bar" detail_chart.style = 11 detail_chart.title = "Top Holdings Contribution" detail_chart.y_axis.title = "Holding" detail_chart.x_axis.title = "Portfolio Weight %" detail_chart.height = 8 detail_chart.width = 14 # Use the first 15 rows of the detail table for a readable chart. chart_end_row = 19 + min(len(detail_rows), 15) data_ref2 = Reference(ws, min_col=7, min_row=19, max_row=chart_end_row) cats2 = Reference(ws, min_col=4, min_row=20, max_row=chart_end_row) detail_chart.add_data(data_ref2, titles_from_data=True) detail_chart.set_categories(cats2) detail_chart.legend = None ws.add_chart(detail_chart, "J20") def build_sector_summary(wb, data: dict) -> None: ws = wb.create_sheet("sector_trend_summary") style_sheet(ws) style_title( ws, "섹터 동향 분석 요약", "ETF 프록시, 스마트머니 유입, 수익률, 유동성 경고를 한 장에 요약한 시트", end_col=8, ) summary = data.get("summary") or {} concentration = data.get("concentration") or {} items = [ ("formula_id", data.get("formula_id", "")), ("gate", data.get("gate", "")), ("latest_snapshot_date", data.get("latest_snapshot_date", "")), ("previous_snapshot_date", data.get("previous_snapshot_date", "")), ("sector_count", data.get("sector_count", 0)), ("trend_posture", summary.get("trend_posture", "")), ("rising_count", summary.get("rising_count", 0)), ("fading_count", summary.get("fading_count", 0)), ("stable_count", summary.get("stable_count", 0)), ("etf_proxy_count", summary.get("etf_proxy_count", 0)), ("smart_money_inflow_count", summary.get("smart_money_inflow_count", 0)), ("smart_money_outflow_count", summary.get("smart_money_outflow_count", 0)), ("flow_aligned_count", summary.get("flow_aligned_count", 0)), ("flow_diverging_count", summary.get("flow_diverging_count", 0)), ] add_kpi_block(ws, 4, items) ws["D4"] = "Concentration" ws["D4"].fill = SUBHEADER_FILL ws["D4"].font = BOLD_FONT for idx, (label, value) in enumerate( [ ("top_sector", concentration.get("top_sector", "")), ("top_sector_weight_pct", concentration.get("top_sector_weight_pct", 0)), ("top2_weight_pct", concentration.get("top2_weight_pct", 0)), ("concentration_gate", concentration.get("concentration_gate", "")), ], start=5, ): ws.cell(idx, 4).value = label ws.cell(idx, 4).fill = KPI_LABEL_FILL ws.cell(idx, 4).font = BOLD_FONT ws.cell(idx, 5).value = value ws.cell(idx, 5).fill = KPI_VALUE_FILL top_inflow = summary.get("top_inflow_sectors") or [] outflow = summary.get("outflow_warning_sectors") or [] ws["G4"] = "Top Inflow" ws["G4"].fill = SUBHEADER_FILL ws["G4"].font = BOLD_FONT for i, item in enumerate(top_inflow, start=5): ws.cell(i, 7).value = item ws["H4"] = "Outflow Warning" ws["H4"].fill = SUBHEADER_FILL ws["H4"].font = BOLD_FONT for i, item in enumerate(outflow, start=5): ws.cell(i, 8).value = item ws["A20"] = "Notes" ws["A20"].fill = SUBHEADER_FILL ws["A20"].font = BOLD_FONT ws["A21"] = "섹터별 ETF 프록시를 기준으로 보고, 은행/증권/지주회사는 분리해서 구성비 상위 종목을 증빙해야 합니다. 대표주 모니터는 섹터 기본 3종, 로보틱스 5종 바스켓으로 함께 확인해야 합니다." ws["A21"].alignment = Alignment(wrap_text=True) ws["A22"] = "Universe_Source가 DEFAULT_TEMPLATE인 행은 템플릿이며, 실제 시트 입력으로 전환되어야 provenance가 완성됩니다." ws["A22"].alignment = Alignment(wrap_text=True) ws["A23"] = "다음 세분화 후보는 바이오/제약과 방산/우주처럼 현재 섹터를 더 세밀하게 나누는 방향입니다. 로보틱스는 RISE 현대차고정피지컬AI를 섹터 프록시로 사용하고, 대표주는 해당 ETF의 실제 구성비 상위 5개 종목에서 뽑습니다." ws["A23"].alignment = Alignment(wrap_text=True) chart = LineChart() chart.title = "Average Sector Score / Breadth Trend" chart.y_axis.title = "Score / Count" chart.x_axis.title = "Snapshot" chart.height = 7.5 chart.width = 13 timeline_sheet = wb["sector_trend_timeline"] max_row = timeline_sheet.max_row data_ref = Reference(timeline_sheet, min_col=13, min_row=4, max_row=max_row, max_col=17) cats = Reference(timeline_sheet, min_col=12, min_row=5, max_row=max_row) chart.add_data(data_ref, titles_from_data=True, from_rows=False) chart.set_categories(cats) chart.style = 2 ws.add_chart(chart, "G12") set_col_widths(ws, {"A": 28, "B": 18, "C": 16, "D": 24, "E": 16, "F": 18, "G": 24, "H": 24}) def build_sector_analysis(wb, data: dict) -> None: ws = wb.create_sheet("sector_trend_analysis") style_sheet(ws) style_title( ws, "섹터 동향 분석", "섹터별 ETF 프록시, 대표주 모니터, 스마트머니 유입, 수익률, 유동성 방향을 함께 보는 상세 시트", end_col=18, ) headers = [ "sector", "proxy_ticker", "proxy_name", "proxy_type", "universe_source", "etf_code", "etf_execution_use", "etf_liquidity_score", "etf_liquidity_status", "etf_nav_risk", "proxy_confidence", "rank", "rank_delta_w1", "rank_delta_w2", "sector_score", "score_delta", "sector_ret5d", "sector_ret20d", "etf_return_5d", "etf_return_20d", "sector_etf_ret_gap_5d", "sector_etf_ret_gap_20d", "smart_money_5d_krw_raw", "smart_money_20d_krw_raw", "smart_money_direction", "liquidity_direction", "flow_alignment_state", "momentum_state", "concentration_weight_pct" ] rows = [] for row in data.get("rows") or []: rows.append([row.get(h, "") for h in headers]) write_table(ws, 4, 1, headers, rows) ws.auto_filter.ref = f"A4:{get_column_letter(len(headers))}{4 + len(rows)}" ws.freeze_panes = "A5" for col in ["F", "G", "H", "I", "J", "K", "L", "M", "N", "O", "P", "Q", "R", "S", "T", "U", "V", "W", "X", "Y", "Z", "AA", "AB"]: ws.column_dimensions[col].width = 16 ws.column_dimensions["C"].width = 18 ws.column_dimensions["A"].width = 16 ws.column_dimensions["B"].width = 12 ws.column_dimensions["D"].width = 12 ws.column_dimensions["E"].width = 12 ws.column_dimensions["J"].width = 14 ws.column_dimensions["P"].width = 12 ws.column_dimensions["Q"].width = 12 ws.column_dimensions["AA"].width = 18 ws.column_dimensions["AB"].width = 18 chart = BarChart() chart.type = "bar" chart.style = 10 chart.title = "Sector 20D Return by Sector" chart.y_axis.title = "Sector" chart.x_axis.title = "20D Return" chart.height = 8 chart.width = 14 data_ref = Reference(ws, min_col=18, min_row=4, max_row=4 + len(rows)) cats = Reference(ws, min_col=1, min_row=5, max_row=4 + len(rows)) chart.add_data(data_ref, titles_from_data=True) chart.set_categories(cats) chart.legend = None ws.add_chart(chart, "AD4") def build_sector_timeline(wb, data: dict, source_data: dict | None = None) -> None: ws = wb.create_sheet("sector_trend_timeline") style_sheet(ws) style_title(ws, "섹터 시계열", "최근 스냅샷 기준 경향성 추세", end_col=10) headers = [ "snapshot_date", "sector_count", "avg_sector_score", "top_sector", "top_sector_score", "positive_breadth_count", "liquidity_warn_count", "net_smart_money_5d_krw", "top_sector_rank", "top_sector_smart_money_5d_krw" ] rows = [] for row in data.get("timeline") or []: parsed_date = row.get("snapshot_date", "") if isinstance(parsed_date, str) and parsed_date: try: parsed_date = datetime.fromisoformat(parsed_date.replace("Z", "+00:00")).date() except Exception: pass rows.append([ parsed_date, row.get("sector_count", ""), row.get("avg_sector_score", ""), row.get("top_sector", ""), row.get("top_sector_score", ""), row.get("positive_breadth_count", ""), row.get("liquidity_warn_count", ""), row.get("net_smart_money_5d_krw", ""), row.get("top_sector_rank", ""), row.get("top_sector_smart_money_5d_krw", ""), ]) write_table(ws, 4, 1, headers, rows) helper_headers = [ "snapshot_date", "avg_sector_score", "top_sector_score", "positive_breadth_count", "liquidity_warn_count", "net_smart_money_5d_krw" ] helper_rows = [] for row in rows: helper_rows.append([row[0], row[2], row[4], row[5], row[6], row[7]]) write_table(ws, 4, 12, helper_headers, helper_rows) ws.freeze_panes = "A5" ws.column_dimensions["A"].width = 14 ws.column_dimensions["B"].width = 12 ws.column_dimensions["C"].width = 14 ws.column_dimensions["D"].width = 16 ws.column_dimensions["E"].width = 14 ws.column_dimensions["F"].width = 16 ws.column_dimensions["G"].width = 16 ws.column_dimensions["H"].width = 18 ws.column_dimensions["I"].width = 14 ws.column_dimensions["J"].width = 18 chart = LineChart() chart.title = "Trend Score / Breadth / Liquidity" chart.y_axis.title = "Count / Score" chart.x_axis.title = "Snapshot" chart.height = 8 chart.width = 15 data_ref = Reference(ws, min_col=13, max_col=17, min_row=4, max_row=4 + len(helper_rows)) cats = Reference(ws, min_col=12, min_row=5, max_row=4 + len(helper_rows)) chart.add_data(data_ref, titles_from_data=True, from_rows=False) chart.set_categories(cats) chart.style = 3 ws.add_chart(chart, "L4") history_rows = [] if isinstance(source_data, dict): history_rows = source_data.get("sector_flow_history") or [] if not history_rows: history_rows = data.get("timeline_history") or data.get("history") or [] if isinstance(history_rows, list) and history_rows: history_by_sector: dict[str, list[dict[str, object]]] = {} for item in history_rows: if not isinstance(item, dict): continue sector = str(item.get("Sector") or "").strip() if not sector: continue history_by_sector.setdefault(sector, []).append(item) top_sectors = [] for row in rows[:3]: if len(row) > 3 and row[3]: top_sectors.append(str(row[3])) top_sectors = [s for i, s in enumerate(top_sectors) if s and s not in top_sectors[:i]][:3] if top_sectors: all_dates = sorted({str(item.get("Snapshot_Date") or "") for item in history_rows if str(item.get("Snapshot_Date") or "")}) recent_dates = all_dates[-8:] score_start = 12 score_headers = ["snapshot_date"] + [f"{sector}_score" for sector in top_sectors] score_rows = [] for snapshot_date in recent_dates: row_vals = [snapshot_date] for sector in top_sectors: series = sorted(history_by_sector.get(sector, []), key=lambda r: str(r.get("Snapshot_Date") or "")) match = next((r for r in series if str(r.get("Snapshot_Date") or "") == snapshot_date), {}) row_vals.append(match.get("Sector_Score", "")) score_rows.append(row_vals) write_table(ws, 4, score_start, score_headers, score_rows) ws.column_dimensions[get_column_letter(score_start)].width = 14 for offset in range(1, len(score_headers)): ws.column_dimensions[get_column_letter(score_start + offset)].width = 16 score_chart = LineChart() score_chart.title = "Top Sector Score Trend" score_chart.y_axis.title = "Score" score_chart.x_axis.title = "Snapshot" score_chart.height = 8 score_chart.width = 15 score_chart.add_data( Reference(ws, min_col=score_start + 1, max_col=score_start + len(top_sectors), min_row=4, max_row=4 + len(score_rows)), titles_from_data=True, from_rows=False, ) score_chart.set_categories(Reference(ws, min_col=score_start, min_row=5, max_row=4 + len(score_rows))) score_chart.style = 2 ws.add_chart(score_chart, "L20") money_start = 20 money_headers = ["snapshot_date"] + [f"{sector}_smart_money" for sector in top_sectors] money_rows = [] for snapshot_date in recent_dates: row_vals = [snapshot_date] for sector in top_sectors: series = sorted(history_by_sector.get(sector, []), key=lambda r: str(r.get("Snapshot_Date") or "")) match = next((r for r in series if str(r.get("Snapshot_Date") or "") == snapshot_date), {}) row_vals.append(match.get("SmartMoney_5D_KRW", "")) money_rows.append(row_vals) write_table(ws, 4, money_start, money_headers, money_rows) ws.column_dimensions[get_column_letter(money_start)].width = 14 for offset in range(1, len(money_headers)): ws.column_dimensions[get_column_letter(money_start + offset)].width = 18 money_chart = LineChart() money_chart.title = "Top Sector Smart Money Trend" money_chart.y_axis.title = "KRW" money_chart.x_axis.title = "Snapshot" money_chart.height = 8 money_chart.width = 15 money_chart.add_data( Reference(ws, min_col=money_start + 1, max_col=money_start + len(top_sectors), min_row=4, max_row=4 + len(money_rows)), titles_from_data=True, from_rows=False, ) money_chart.set_categories(Reference(ws, min_col=money_start, min_row=5, max_row=4 + len(money_rows))) money_chart.style = 3 ws.add_chart(money_chart, "L36") def build_sector_universe_refresh_audit_sheet(wb, source_data: dict) -> None: ws = wb.create_sheet("sector_universe_refresh_audit") style_sheet(ws) style_title( ws, "섹터 월간 갱신 감사", "Naver ETF 페이지 기반 구성종목 갱신 상태와 provenance 분리 상태를 점검하는 감사 시트. AJAX/XHR 전제는 두지 않고 HTML 서버렌더링 테이블을 우선한다.", end_col=16, ) payload = {"data": source_data} audit = build_sector_universe_refresh_audit(payload) summary = audit.get("summary") or {} items = [ ("formula_id", audit.get("formula_id", "")), ("gate", audit.get("gate", "")), ("sector_count", summary.get("sector_count", 0)), ("current_count", summary.get("current_count", 0)), ("due_count", summary.get("due_count", 0)), ("overdue_count", summary.get("overdue_count", 0)), ("layout_changed_count", summary.get("layout_changed_count", 0)), ("missing_count", summary.get("missing_count", 0)), ("template_count", summary.get("template_count", 0)), ("sheet_input_count", summary.get("sheet_input_count", 0)), ("naver_source_count", summary.get("naver_source_count", 0)), ("missing_source_url_count", summary.get("missing_source_url_count", 0)), ("stale_sector_count", summary.get("stale_sector_count", 0)), ] add_kpi_block(ws, 4, items) ws["D4"] = "Refresh policy" ws["D4"].fill = SUBHEADER_FILL ws["D4"].font = BOLD_FONT ws["D5"] = "NAVER_ETF_PAGE rows are the monthly refreshed source." ws["D6"] = "SHEET_INPUT rows are manual/provisional and must stay separate." ws["D7"] = "DEFAULT_TEMPLATE rows are a fail in the monthly gate." ws["D8"] = "Source_URL and Source_AsOf are required for provenance." ws["D9"] = "This is HTML-server-rendered, not AJAX. JS is only a fallback probe for candidate URLs." ws["D10"] = "No guessed holdings are written when the page layout changes." ws["D11"] = "NAVER_ETF_PAGE_FAIL_LAYOUT_CHANGED is a separate layout-change failure state." ws["D12"] = "Financial sectors are split as 은행 / 증권 / 지주회사 in sector_universe; sector_flow reflects carryover until GAS runDataFeed is rerun." ws["D13"] = "This split is part of the monthly refresh harness; Source_URL and Source_AsOf must remain valid for provenance." rows = audit.get("rows") or [] if rows: headers = [ "sector", "proxy_ticker", "proxy_name", "proxy_type", "source_kind", "transport_mode", "source_url", "source_asof", "age_days", "constituent_count", "stock_count", "etf_count", "weight_sum", "status", "refresh_reason", ] write_table(ws, 14, 1, headers, [[row.get(h, "") for h in headers] for row in rows]) for col, width in { "A": 16, "B": 12, "C": 18, "D": 12, "E": 16, "F": 18, "G": 42, "H": 14, "I": 10, "J": 14, "K": 12, "L": 12, "M": 12, "N": 12, "O": 24, }.items(): ws.column_dimensions[col].width = width ws.freeze_panes = "A5" ws["A11"] = "Notes" ws["A11"].fill = SUBHEADER_FILL ws["A11"].font = BOLD_FONT ws["A12"] = "홈페이지 리뉴얼로 표 구조가 바뀌면, 파서는 추정하지 않고 실패 상태를 남겨 월간 게이트에서 잡는다." ws["A12"].alignment = Alignment(wrap_text=True) def build_etf_summary(wb, data: dict) -> None: ws = wb.create_sheet("etf_representative_summary") style_sheet(ws) style_title( ws, "ETF 대표 종목 요약", "ETF 구성비중 우선, 부족분은 유동성 우선 후보로 보강한 3종목 바스켓 요약", end_col=8, ) summary = data.get("summary") or {} items = [ ("formula_id", data.get("formula_id", "")), ("gate", data.get("gate", "")), ("etf_sector_count", data.get("etf_sector_count", 0)), ("tracked_count", data.get("tracked_count", 0)), ("complete_basket_count", summary.get("complete_basket_count", 0)), ("partial_basket_count", summary.get("partial_basket_count", 0)), ("basket_missing_total", summary.get("basket_missing_total", 0)), ("weighted_basis_count", summary.get("weighted_basis_count", 0)), ("fallback_basis_count", summary.get("fallback_basis_count", 0)), ("selected_sector_count", summary.get("selected_sector_count", 0)), ] add_kpi_block(ws, 4, items) ws["D4"] = "Representative principle" ws["D4"].fill = SUBHEADER_FILL ws["D4"].font = BOLD_FONT ws["D5"] = "1) ETF constituent weight first" ws["D6"] = "2) Missing slots filled with same-sector live candidates" ws["D7"] = "3) Missing data stays explicit as DATA_MISSING" ws["D8"] = "4) Minimum 3 names per sector basket" ws["D9"] = "5) Universe_Source=DEFAULT_TEMPLATE rows are provisional until sheet-backed data exists." ws["G4"] = "Top reps" ws["G4"].fill = SUBHEADER_FILL ws["G4"].font = BOLD_FONT for i, item in enumerate(summary.get("top_rep_names") or [], start=5): ws.cell(i, 7).value = item set_col_widths(ws, {"A": 28, "B": 18, "C": 16, "D": 30, "E": 18, "F": 18, "G": 24, "H": 24}) def build_etf_monitor(wb, data: dict) -> None: ws = wb.create_sheet("etf_representative_monitor") style_sheet(ws) style_title( ws, "ETF 대표 종목 모니터", "섹터별 3종목 바스켓과 선택 근거, 커버리지, 품질 상태를 추적", end_col=18, ) headers = [ "sector", "etf_proxy_ticker", "etf_proxy_name", "etf_proxy_type", "universe_source", "sector_rank", "sector_score", "sector_smart_money_5d_krw", "sector_ret20d", "representative_count", "representative_ticker", "representative_name", "representative_basis", "representative_basis_detail", "constituent_weight", "basket_quality_state", "basket_coverage_pct", "basket_state", "basket_buy_review_count", "basket_track_count", "basket_watch_count", "basket_caution_count", "basket_aligned_count", "basket_missing_count", "basket_real_count", "selection_source", "selection_score", "monitor_reason" ] rows = [] for row in data.get("rows") or []: rows.append([row.get(h, "") for h in headers]) write_table(ws, 4, 1, headers, rows) ws.auto_filter.ref = f"A4:{get_column_letter(len(headers))}{4 + len(rows)}" ws.freeze_panes = "A5" for col, width in {"A": 18, "B": 12, "C": 16, "D": 12, "E": 12, "F": 12, "G": 18, "H": 12, "I": 14, "J": 12, "K": 18, "L": 18, "M": 24, "N": 14, "O": 14, "P": 14, "Q": 14, "R": 14, "S": 14, "T": 14, "U": 14, "V": 14, "W": 14, "X": 18, "Y": 14, "Z": 12, "AA": 24}.items(): ws.column_dimensions[col].width = width chart = BarChart() chart.type = "bar" chart.style = 10 chart.title = "Basket Coverage by Sector" chart.y_axis.title = "Sector" chart.x_axis.title = "Coverage %" chart.height = 8 chart.width = 14 data_ref = Reference(ws, min_col=17, min_row=4, max_row=4 + len(rows)) cats = Reference(ws, min_col=1, min_row=5, max_row=4 + len(rows)) chart.add_data(data_ref, titles_from_data=True) chart.set_categories(cats) chart.legend = None ws.add_chart(chart, "AC4") def main() -> None: if not INPUT_XLSX.exists(): raise FileNotFoundError(INPUT_XLSX) if not SECTOR_JSON.exists(): raise FileNotFoundError(SECTOR_JSON) if not ETF_JSON.exists(): raise FileNotFoundError(ETF_JSON) raw_json_path = ROOT / "GatherTradingData.json" sector = load_json(SECTOR_JSON) etf = load_json(ETF_JSON) raw_data = load_json(raw_json_path) if raw_json_path.exists() else {} raw_source = raw_data.get("data", {}) if isinstance(raw_data.get("data"), dict) else {} wb = load_workbook(INPUT_XLSX) for name in [ "portfolio_performance_summary", "performance_readiness_summary", "operational_eval_queue_summary", "portfolio_sector_exposure", "sector_universe_refresh_audit", "_portfolio_holdings_helper", "sector_trend_summary", "sector_trend_analysis", "sector_trend_timeline", "etf_representative_summary", "etf_representative_monitor", ]: remove_if_exists(wb, name) # Build data sheets first so summary sheets can reference the timeline sheet. build_portfolio_summary(wb) build_performance_readiness_summary(wb) build_operational_eval_queue_summary(wb) build_portfolio_sector_exposure(wb) build_sector_universe_refresh_audit_sheet(wb, raw_source) build_sector_timeline(wb, sector, raw_source) build_sector_analysis(wb, sector) build_sector_summary(wb, sector) build_etf_monitor(wb, etf) build_etf_summary(wb, etf) # Put summary sheets near the front. order = [ "settings", "portfolio_performance_summary", "performance_readiness_summary", "operational_eval_queue_summary", "portfolio_sector_exposure", "sector_universe_refresh_audit", "sector_trend_summary", "sector_trend_analysis", "sector_trend_timeline", "etf_representative_summary", "etf_representative_monitor", ] existing = [s for s in wb.sheetnames if s not in order] wb._sheets = [wb[s] for s in order if s in wb.sheetnames] + [wb[s] for s in existing] if "_portfolio_holdings_helper" in wb.sheetnames: wb["_portfolio_holdings_helper"].sheet_state = "hidden" wb.active = wb.sheetnames.index("sector_trend_summary") OUTPUT_DIR.mkdir(parents=True, exist_ok=True) wb.save(OUTPUT_XLSX) print(f"saved {OUTPUT_XLSX}") print("sheets", wb.sheetnames[:10]) if __name__ == "__main__": main()