1045 lines
46 KiB
Python
1045 lines
46 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
from openpyxl import load_workbook
|
|
from openpyxl.chart import BarChart, LineChart, Reference
|
|
from openpyxl.styles import Font, PatternFill, Alignment
|
|
from openpyxl.utils import get_column_letter
|
|
|
|
import sys
|
|
|
|
ROOT = Path(__file__).resolve().parent.parent
|
|
if str(ROOT) not in sys.path:
|
|
sys.path.insert(0, str(ROOT))
|
|
|
|
from src.quant_engine.sector_universe_refresh import build_sector_universe_refresh_audit
|
|
|
|
INPUT_XLSX = ROOT / "GatherTradingData.xlsx"
|
|
OUTPUT_DIR = ROOT / "outputs" / "sector_insights_enhanced"
|
|
OUTPUT_XLSX = OUTPUT_DIR / "GatherTradingData_sector_insights.xlsx"
|
|
SECTOR_JSON = ROOT / "Temp" / "sector_trend_analysis_v1.json"
|
|
ETF_JSON = ROOT / "Temp" / "etf_representative_monitor_v1.json"
|
|
READINESS_JSON = ROOT / "Temp" / "operational_alpha_calibration_v2.json"
|
|
READINESS_BRIDGE_JSON = ROOT / "Temp" / "performance_readiness_replay_bridge_v1.json"
|
|
READINESS_BRIDGE_V2_JSON = ROOT / "Temp" / "performance_readiness_replay_bridge_v2.json"
|
|
EVAL_QUEUE_JSON = ROOT / "Temp" / "operational_eval_queue_v1.json"
|
|
|
|
|
|
HEADER_FILL = PatternFill("solid", fgColor="1F4E78")
|
|
SUBHEADER_FILL = PatternFill("solid", fgColor="D9EAF7")
|
|
KPI_FILL = PatternFill("solid", fgColor="F3F7FB")
|
|
KPI_LABEL_FILL = PatternFill("solid", fgColor="E2F0D9")
|
|
KPI_VALUE_FILL = PatternFill("solid", fgColor="FFF2CC")
|
|
WHITE_FONT = Font(color="FFFFFF", bold=True)
|
|
BOLD_FONT = Font(bold=True)
|
|
TITLE_FONT = Font(size=14, bold=True)
|
|
NOTE_FONT = Font(italic=True, color="666666")
|
|
|
|
|
|
def load_json(path: Path) -> dict:
|
|
return json.loads(path.read_text(encoding="utf-8"))
|
|
|
|
|
|
def remove_if_exists(wb, name: str) -> None:
|
|
if name in wb.sheetnames:
|
|
del wb[name]
|
|
|
|
|
|
def style_title(ws, title: str, subtitle: str | None = None, end_col: int = 8) -> None:
|
|
ws.merge_cells(start_row=1, start_column=1, end_row=1, end_column=end_col)
|
|
ws["A1"] = title
|
|
ws["A1"].font = TITLE_FONT
|
|
ws["A1"].fill = HEADER_FILL
|
|
ws["A1"].font = WHITE_FONT
|
|
ws["A1"].alignment = Alignment(horizontal="left")
|
|
if subtitle:
|
|
ws.merge_cells(start_row=2, start_column=1, end_row=2, end_column=end_col)
|
|
ws["A2"] = subtitle
|
|
ws["A2"].font = NOTE_FONT
|
|
|
|
|
|
def write_table(ws, start_row: int, start_col: int, headers: list[str], rows: list[list], header_fill=HEADER_FILL) -> int:
|
|
for j, header in enumerate(headers, start=start_col):
|
|
cell = ws.cell(start_row, j)
|
|
cell.value = header
|
|
cell.font = WHITE_FONT
|
|
cell.fill = header_fill
|
|
cell.alignment = Alignment(horizontal="center", vertical="center")
|
|
for i, row in enumerate(rows, start=start_row + 1):
|
|
for j, value in enumerate(row, start=start_col):
|
|
cell = ws.cell(i, j)
|
|
cell.value = value
|
|
cell.alignment = Alignment(vertical="top")
|
|
return start_row + len(rows)
|
|
|
|
|
|
def add_kpi_block(ws, start_row: int, items: list[tuple[str, object]]) -> int:
|
|
ws.cell(start_row, 1).value = "KPI"
|
|
ws.cell(start_row, 1).fill = SUBHEADER_FILL
|
|
ws.cell(start_row, 1).font = BOLD_FONT
|
|
row = start_row + 1
|
|
for label, value in items:
|
|
ws.cell(row, 1).value = label
|
|
ws.cell(row, 1).fill = KPI_LABEL_FILL
|
|
ws.cell(row, 1).font = BOLD_FONT
|
|
ws.cell(row, 2).value = value
|
|
ws.cell(row, 2).fill = KPI_VALUE_FILL
|
|
row += 1
|
|
return row
|
|
|
|
|
|
def set_col_widths(ws, widths: dict[str, int]) -> None:
|
|
for col, width in widths.items():
|
|
ws.column_dimensions[col].width = width
|
|
|
|
|
|
def style_sheet(ws) -> None:
|
|
ws.freeze_panes = "A3"
|
|
ws.sheet_view.showGridLines = False
|
|
|
|
|
|
def extract_sheet_rows(wb, sheet_name: str) -> tuple[list[str], list[list]]:
|
|
ws = wb[sheet_name]
|
|
first_row = [ws.cell(1, c).value for c in range(1, ws.max_column + 1)]
|
|
second_row = [ws.cell(2, c).value for c in range(1, ws.max_column + 1)] if ws.max_row >= 2 else []
|
|
use_second_row = (
|
|
bool(second_row)
|
|
and not any(v not in (None, "") for v in first_row)
|
|
and any(v not in (None, "") for v in second_row)
|
|
)
|
|
headers = second_row if use_second_row else first_row
|
|
start_row = 3 if use_second_row else 2
|
|
rows: list[list] = []
|
|
for r in range(start_row, ws.max_row + 1):
|
|
row = [ws.cell(r, c).value for c in range(1, ws.max_column + 1)]
|
|
if any(v is not None and v != "" for v in row):
|
|
rows.append(row)
|
|
return headers, rows
|
|
|
|
|
|
def build_portfolio_summary(wb) -> None:
|
|
def display(value):
|
|
return value if value not in (None, "") else "DATA_MISSING — 하네스 업데이트 필요"
|
|
|
|
daily_headers, daily_rows = extract_sheet_rows(wb, "daily_history")
|
|
monthly_headers, monthly_rows = extract_sheet_rows(wb, "monthly_history")
|
|
account_headers, account_rows = extract_sheet_rows(wb, "account_snapshot")
|
|
|
|
ws = wb.create_sheet("portfolio_performance_summary")
|
|
style_sheet(ws)
|
|
style_title(
|
|
ws,
|
|
"포트폴리오 성과 요약",
|
|
"내 자금의 일간/월간 추이와 최신 보유 비중을 함께 보는 요약 시트",
|
|
end_col=10,
|
|
)
|
|
|
|
latest_daily = daily_rows[-1] if daily_rows else []
|
|
latest_month = monthly_rows[-1] if monthly_rows else []
|
|
latest_total_asset = latest_daily[1] if len(latest_daily) > 1 else None
|
|
latest_peak_asset = latest_daily[2] if len(latest_daily) > 2 else None
|
|
latest_mdd = latest_daily[3] if len(latest_daily) > 3 else None
|
|
latest_month_total = latest_month[1] if len(latest_month) > 1 else None
|
|
latest_month_return = latest_month[8] if len(latest_month) > 8 else None
|
|
latest_ytd_return = latest_month[10] if len(latest_month) > 10 else None
|
|
|
|
latest_capture = None
|
|
if account_rows:
|
|
latest_capture = account_rows[0][0]
|
|
for row in account_rows:
|
|
if row and row[0] and row[0] > latest_capture:
|
|
latest_capture = row[0]
|
|
|
|
latest_holdings = [r for r in account_rows if r and r[0] == latest_capture]
|
|
holdings_sorted = sorted(
|
|
latest_holdings,
|
|
key=lambda r: (r[10] if len(r) > 10 and isinstance(r[10], (int, float)) else 0),
|
|
reverse=True,
|
|
)
|
|
total_mv = sum(r[10] for r in holdings_sorted if len(r) > 10 and isinstance(r[10], (int, float)))
|
|
total_cost = sum(r[8] for r in holdings_sorted if len(r) > 8 and isinstance(r[8], (int, float)))
|
|
total_pl = sum(r[11] for r in holdings_sorted if len(r) > 11 and isinstance(r[11], (int, float)))
|
|
|
|
items = [
|
|
("latest_daily_asset", display(latest_total_asset)),
|
|
("latest_peak_asset", display(latest_peak_asset)),
|
|
("latest_daily_mdd_pct", display(latest_mdd)),
|
|
("latest_month_total_asset", display(latest_month_total)),
|
|
("latest_month_return_pct", display(latest_month_return)),
|
|
("latest_ytd_return_pct", display(latest_ytd_return)),
|
|
("latest_capture", display(latest_capture)),
|
|
("latest_holdings_count", display(len(latest_holdings))),
|
|
("latest_holdings_market_value", display(total_mv)),
|
|
("latest_holdings_profit_loss", display(total_pl)),
|
|
]
|
|
add_kpi_block(ws, 4, items)
|
|
|
|
ws["D4"] = "Portfolio view"
|
|
ws["D4"].fill = SUBHEADER_FILL
|
|
ws["D4"].font = BOLD_FONT
|
|
ws["D5"] = "일간/월간 자산 추이는 실제 계좌 스냅샷 기반입니다."
|
|
ws["D6"] = "보유 비중 차트는 최신 스냅샷의 시장가치 기준입니다."
|
|
ws["D7"] = "수익률이 음수여도 숨기지 않고 그대로 보여줍니다."
|
|
|
|
ws["G4"] = "Top holdings"
|
|
ws["G4"].fill = SUBHEADER_FILL
|
|
ws["G4"].font = BOLD_FONT
|
|
for i, row in enumerate(holdings_sorted[:10], start=5):
|
|
name = row[4] if len(row) > 4 else ""
|
|
mv = row[10] if len(row) > 10 else ""
|
|
ws.cell(i, 7).value = f"{name} ({mv})"
|
|
|
|
# Daily history chart helper
|
|
daily_sheet = wb["daily_history"]
|
|
daily_max = daily_sheet.max_row
|
|
daily_chart = LineChart()
|
|
daily_chart.title = "Daily Asset / MDD"
|
|
daily_chart.y_axis.title = "KRW / %"
|
|
daily_chart.x_axis.title = "Date"
|
|
daily_chart.height = 7
|
|
daily_chart.width = 13
|
|
daily_data = Reference(daily_sheet, min_col=2, max_col=4, min_row=1, max_row=daily_max)
|
|
daily_cats = Reference(daily_sheet, min_col=1, min_row=2, max_row=daily_max)
|
|
daily_chart.add_data(daily_data, titles_from_data=True, from_rows=False)
|
|
daily_chart.set_categories(daily_cats)
|
|
daily_chart.style = 2
|
|
ws.add_chart(daily_chart, "A13")
|
|
|
|
# Monthly history chart
|
|
monthly_sheet = wb["monthly_history"]
|
|
monthly_max = monthly_sheet.max_row
|
|
monthly_chart = LineChart()
|
|
monthly_chart.title = "Monthly Return Trend"
|
|
monthly_chart.y_axis.title = "%"
|
|
monthly_chart.x_axis.title = "Month"
|
|
monthly_chart.height = 7
|
|
monthly_chart.width = 13
|
|
monthly_data = Reference(monthly_sheet, min_col=8, max_col=11, min_row=1, max_row=monthly_max)
|
|
monthly_cats = Reference(monthly_sheet, min_col=1, min_row=2, max_row=monthly_max)
|
|
monthly_chart.add_data(monthly_data, titles_from_data=True, from_rows=False)
|
|
monthly_chart.set_categories(monthly_cats)
|
|
monthly_chart.style = 3
|
|
ws.add_chart(monthly_chart, "G13")
|
|
|
|
# Top holdings bar chart
|
|
hold_chart = BarChart()
|
|
hold_chart.type = "bar"
|
|
hold_chart.title = "Top Holdings by Market Value"
|
|
hold_chart.y_axis.title = "Holding"
|
|
hold_chart.x_axis.title = "KRW"
|
|
hold_chart.height = 8
|
|
hold_chart.width = 13
|
|
ws_hold = wb.create_sheet("_portfolio_holdings_helper")
|
|
helper_headers = ["name", "market_value"]
|
|
helper_rows = [[r[4], r[10]] for r in holdings_sorted[:10] if len(r) > 10]
|
|
write_table(ws_hold, 1, 1, helper_headers, helper_rows)
|
|
hold_data = Reference(ws_hold, min_col=2, min_row=1, max_row=1 + len(helper_rows))
|
|
hold_cats = Reference(ws_hold, min_col=1, min_row=2, max_row=1 + len(helper_rows))
|
|
hold_chart.add_data(hold_data, titles_from_data=True)
|
|
hold_chart.set_categories(hold_cats)
|
|
hold_chart.legend = None
|
|
ws.add_chart(hold_chart, "A30")
|
|
|
|
set_col_widths(ws, {"A": 22, "B": 18, "C": 18, "D": 24, "E": 18, "F": 18, "G": 26, "H": 26, "I": 18, "J": 18})
|
|
|
|
|
|
def build_performance_readiness_summary(wb) -> None:
|
|
def display(value):
|
|
return value if value not in (None, "") else "DATA_MISSING — 하네스 업데이트 필요"
|
|
|
|
readiness = load_json(READINESS_JSON) if READINESS_JSON.exists() else {}
|
|
bridge = load_json(READINESS_BRIDGE_JSON) if READINESS_BRIDGE_JSON.exists() else {}
|
|
bridge_v2 = load_json(READINESS_BRIDGE_V2_JSON) if READINESS_BRIDGE_V2_JSON.exists() else {}
|
|
live = bridge.get("live", {}) if isinstance(bridge.get("live"), dict) else {}
|
|
replay = bridge.get("replay_informational", {}) if isinstance(bridge.get("replay_informational"), dict) else {}
|
|
metrics = readiness.get("metrics", {}) if isinstance(readiness.get("metrics"), dict) else {}
|
|
|
|
ws = wb.create_sheet("performance_readiness_summary")
|
|
style_sheet(ws)
|
|
style_title(
|
|
ws,
|
|
"성과 준비도 요약",
|
|
"CHECK_83의 live T+20 데이터 게이트와 replay 브리지를 함께 보여주는 상태 시트",
|
|
end_col=10,
|
|
)
|
|
|
|
items = [
|
|
("check_83_gate", display(readiness.get("gate"))),
|
|
("confidence_score", display(readiness.get("confidence_score"))),
|
|
("performance_ready", display(readiness.get("performance_ready"))),
|
|
("readiness_reasons", display(", ".join(readiness.get("readiness_reasons", [])) if isinstance(readiness.get("readiness_reasons"), list) else readiness.get("readiness_reasons"))),
|
|
("outcome_quality_score", display(metrics.get("outcome_quality_score"))),
|
|
("t20_operational_sample", display(metrics.get("t20_operational_sample"))),
|
|
("t5_operational_pass_rate", display(metrics.get("t5_operational_pass_rate"))),
|
|
("value_damage_pct_avg", display(metrics.get("value_damage_pct_avg"))),
|
|
("live_t20_count", display(live.get("t20_count"))),
|
|
("live_sample_gate", display(live.get("sample_gate"))),
|
|
]
|
|
add_kpi_block(ws, 4, items)
|
|
|
|
ws["D4"] = "Readiness rule"
|
|
ws["D4"].fill = SUBHEADER_FILL
|
|
ws["D4"].font = BOLD_FONT
|
|
ws["D5"] = "live T+20가 30건 미만이면 PERFORMANCE_READY로 승격하지 않습니다."
|
|
ws["D6"] = f"현재 상태: {readiness.get('gate', 'MISSING')}"
|
|
ws["D7"] = f"브리지 승격 규칙: {bridge_v2.get('promotion_rule', 'DATA_MISSING — 하네스 업데이트 필요')}"
|
|
ws["D8"] = f"브리지 승격 가능: {bridge_v2.get('promotion_allowed', 'DATA_MISSING — 하네스 업데이트 필요')}"
|
|
|
|
ws["G4"] = "Replay reference"
|
|
ws["G4"].fill = SUBHEADER_FILL
|
|
ws["G4"].font = BOLD_FONT
|
|
ws["G5"] = f"replay_t20_count: {display(replay.get('t20_count'))}"
|
|
ws["G6"] = f"replay_t20_pass_rate_pct: {display(replay.get('t20_pass_rate_pct'))}"
|
|
ws["G7"] = f"replay_t20_avg_return_pct: {display(replay.get('t20_avg_return_pct'))}"
|
|
ws["G8"] = f"replay_note: {display(replay.get('note'))}"
|
|
|
|
readiness_rows = [
|
|
["metric", "count"],
|
|
["live_t20_count", live.get("t20_count") or 0],
|
|
["required_live_t20_count", 30],
|
|
["replay_t20_count", replay.get("t20_count") or 0],
|
|
]
|
|
write_table(ws, 4, 10, readiness_rows[0], readiness_rows[1:])
|
|
|
|
readiness_chart = BarChart()
|
|
readiness_chart.type = "bar"
|
|
readiness_chart.style = 10
|
|
readiness_chart.title = "T20 Readiness vs Threshold"
|
|
readiness_chart.y_axis.title = "Metric"
|
|
readiness_chart.x_axis.title = "Count"
|
|
readiness_chart.height = 6.5
|
|
readiness_chart.width = 11
|
|
readiness_data = Reference(ws, min_col=11, min_row=4, max_row=7)
|
|
readiness_cats = Reference(ws, min_col=10, min_row=5, max_row=7)
|
|
readiness_chart.add_data(readiness_data, titles_from_data=True)
|
|
readiness_chart.set_categories(readiness_cats)
|
|
readiness_chart.legend = None
|
|
ws.add_chart(readiness_chart, "J13")
|
|
|
|
set_col_widths(ws, {"A": 24, "B": 18, "C": 18, "D": 28, "E": 18, "F": 18, "G": 26, "H": 26, "I": 18, "J": 20, "K": 16})
|
|
|
|
|
|
def build_operational_eval_queue_summary(wb) -> None:
|
|
def display(value):
|
|
return value if value not in (None, "") else "DATA_MISSING — 하네스 업데이트 필요"
|
|
|
|
queue_data = load_json(EVAL_QUEUE_JSON) if EVAL_QUEUE_JSON.exists() else {}
|
|
metrics = queue_data.get("metrics", {}) if isinstance(queue_data.get("metrics"), dict) else {}
|
|
queue_rows = queue_data.get("queue", []) if isinstance(queue_data.get("queue"), list) else []
|
|
todo = queue_data.get("todo_protocol", []) if isinstance(queue_data.get("todo_protocol"), list) else []
|
|
|
|
ws = wb.create_sheet("operational_eval_queue_summary")
|
|
style_sheet(ws)
|
|
style_title(
|
|
ws,
|
|
"운영 T+20 대기열 요약",
|
|
"T+20 실제 결과 입력 대기 상태와 처리 프로토콜을 한 장에 정리",
|
|
end_col=10,
|
|
)
|
|
|
|
items = [
|
|
("formula_id", display(queue_data.get("formula_id"))),
|
|
("as_of", display(queue_data.get("as_of"))),
|
|
("t20_days_threshold", display(queue_data.get("t20_days_threshold"))),
|
|
("records_total", display(metrics.get("records_total"))),
|
|
("t20_evaluated_count", display(metrics.get("t20_evaluated_count"))),
|
|
("t20_due_capture_count", display(metrics.get("t20_due_capture_count"))),
|
|
("missing_due_date_count", display(metrics.get("missing_due_date_count"))),
|
|
("all_proposals_have_due_dates", display(queue_data.get("all_proposals_have_due_dates"))),
|
|
("queue_count", display(len(queue_rows))),
|
|
("todo_count", display(len(todo))),
|
|
]
|
|
add_kpi_block(ws, 4, items)
|
|
|
|
ws["D4"] = "Queue protocol"
|
|
ws["D4"].fill = SUBHEADER_FILL
|
|
ws["D4"].font = BOLD_FONT
|
|
for idx, line in enumerate(todo[:5], start=5):
|
|
ws.cell(idx, 4).value = line
|
|
|
|
ws["G4"] = "Queue status"
|
|
ws["G4"].fill = SUBHEADER_FILL
|
|
ws["G4"].font = BOLD_FONT
|
|
ws["G5"] = f"current_queue_rows: {display(len(queue_rows))}"
|
|
ws["G6"] = f"t20_due_capture_count: {display(metrics.get('t20_due_capture_count'))}"
|
|
ws["G7"] = f"missing_due_date_count: {display(metrics.get('missing_due_date_count'))}"
|
|
|
|
queue_table_rows = [["metric", "value"], ["records_total", metrics.get("records_total") or 0], ["t20_evaluated_count", metrics.get("t20_evaluated_count") or 0], ["t20_due_capture_count", metrics.get("t20_due_capture_count") or 0]]
|
|
write_table(ws, 17, 1, queue_table_rows[0], queue_table_rows[1:])
|
|
|
|
chart = BarChart()
|
|
chart.type = "bar"
|
|
chart.style = 10
|
|
chart.title = "T20 Queue Status"
|
|
chart.y_axis.title = "Metric"
|
|
chart.x_axis.title = "Count"
|
|
chart.height = 6
|
|
chart.width = 11
|
|
data_ref = Reference(ws, min_col=2, min_row=17, max_row=20)
|
|
cats = Reference(ws, min_col=1, min_row=18, max_row=20)
|
|
chart.add_data(data_ref, titles_from_data=True)
|
|
chart.set_categories(cats)
|
|
chart.legend = None
|
|
ws.add_chart(chart, "J17")
|
|
|
|
set_col_widths(ws, {"A": 24, "B": 18, "C": 18, "D": 40, "E": 18, "F": 18, "G": 28, "H": 28, "I": 18, "J": 20, "K": 16})
|
|
|
|
|
|
def build_portfolio_sector_exposure(wb) -> None:
|
|
daily_headers, daily_rows = extract_sheet_rows(wb, "daily_history")
|
|
account_headers, account_rows = extract_sheet_rows(wb, "account_snapshot")
|
|
universe_headers, universe_rows = extract_sheet_rows(wb, "universe")
|
|
|
|
account_dicts = [dict(zip(account_headers, row)) for row in account_rows if any(v not in (None, "") for v in row)]
|
|
universe_dicts = [dict(zip(universe_headers, row)) for row in universe_rows if any(v not in (None, "") for v in row)]
|
|
|
|
sector_map: dict[str, str] = {}
|
|
for row in universe_dicts:
|
|
ticker = str(row.get("Ticker", "") or "").zfill(6)
|
|
sector = str(row.get("Sector", "") or "").strip()
|
|
if ticker and sector:
|
|
sector_map[ticker] = sector
|
|
|
|
latest_capture = ""
|
|
for row in account_dicts:
|
|
cap = str(row.get("captured_at", "") or "")
|
|
if cap and cap >= latest_capture:
|
|
latest_capture = cap
|
|
latest_rows = [r for r in account_dicts if str(r.get("captured_at", "") or "") == latest_capture]
|
|
|
|
exposure: dict[str, dict[str, float]] = {}
|
|
sector_holdings: dict[str, list[dict[str, object]]] = {}
|
|
for row in latest_rows:
|
|
ticker = str(row.get("ticker", "") or "").zfill(6)
|
|
sector = sector_map.get(ticker, "미분류")
|
|
mv = float(row.get("market_value", 0) or 0)
|
|
pl = float(row.get("profit_loss", 0) or 0)
|
|
cost = float(row.get("total_cost", 0) or 0)
|
|
bucket = exposure.setdefault(sector, {"market_value": 0.0, "profit_loss": 0.0, "cost": 0.0, "count": 0.0})
|
|
bucket["market_value"] += mv
|
|
bucket["profit_loss"] += pl
|
|
bucket["cost"] += cost
|
|
bucket["count"] += 1
|
|
sector_holdings.setdefault(sector, []).append({
|
|
"ticker": ticker,
|
|
"name": row.get("name", ""),
|
|
"market_value": mv,
|
|
"profit_loss": pl,
|
|
"return_pct": row.get("return_pct", ""),
|
|
})
|
|
|
|
total_mv = sum(v["market_value"] for v in exposure.values()) or 1.0
|
|
rows = []
|
|
for sector, vals in sorted(exposure.items(), key=lambda kv: kv[1]["market_value"], reverse=True):
|
|
pct = vals["market_value"] / total_mv * 100.0
|
|
ret_pct = (vals["profit_loss"] / vals["cost"] * 100.0) if vals["cost"] else 0.0
|
|
rows.append([sector, vals["count"], vals["market_value"], pct, vals["profit_loss"], ret_pct])
|
|
|
|
ws = wb.create_sheet("portfolio_sector_exposure")
|
|
style_sheet(ws)
|
|
style_title(
|
|
ws,
|
|
"포트폴리오 섹터 노출",
|
|
"최신 계좌 스냅샷 기준으로 섹터별 보유 시장가치와 손익률을 집계",
|
|
end_col=8,
|
|
)
|
|
items = [
|
|
("latest_capture", latest_capture),
|
|
("sector_count", len(rows)),
|
|
("top_sector", rows[0][0] if rows else ""),
|
|
("top_sector_weight_pct", rows[0][3] if rows else 0),
|
|
("top3_sector_weight_pct", sum(r[3] for r in rows[:3]) if rows else 0),
|
|
("total_market_value", total_mv),
|
|
]
|
|
add_kpi_block(ws, 4, items)
|
|
headers = ["sector", "holding_count", "market_value", "weight_pct", "profit_loss", "return_pct"]
|
|
write_table(ws, 4, 4, headers, rows)
|
|
ws["D4"] = "Sector exposure"
|
|
ws["D4"].fill = SUBHEADER_FILL
|
|
ws["D4"].font = BOLD_FONT
|
|
ws.freeze_panes = "A5"
|
|
ws.column_dimensions["A"].width = 24
|
|
ws.column_dimensions["B"].width = 14
|
|
ws.column_dimensions["C"].width = 18
|
|
ws.column_dimensions["D"].width = 24
|
|
ws.column_dimensions["E"].width = 14
|
|
ws.column_dimensions["F"].width = 14
|
|
ws.column_dimensions["G"].width = 16
|
|
ws.column_dimensions["H"].width = 14
|
|
|
|
chart = BarChart()
|
|
chart.type = "bar"
|
|
chart.style = 10
|
|
chart.title = "Sector Exposure by Market Value"
|
|
chart.y_axis.title = "Sector"
|
|
chart.x_axis.title = "KRW"
|
|
chart.height = 8
|
|
chart.width = 14
|
|
data_ref = Reference(ws, min_col=6, min_row=4, max_row=4 + len(rows))
|
|
cats = Reference(ws, min_col=4, min_row=5, max_row=4 + len(rows))
|
|
chart.add_data(data_ref, titles_from_data=True)
|
|
chart.set_categories(cats)
|
|
chart.legend = None
|
|
ws.add_chart(chart, "J4")
|
|
|
|
detail_rows: list[list[object]] = []
|
|
for sector, vals in sorted(exposure.items(), key=lambda kv: kv[1]["market_value"], reverse=True)[:5]:
|
|
sector_mv = vals["market_value"] or 1.0
|
|
holdings = sorted(sector_holdings.get(sector, []), key=lambda item: float(item.get("market_value", 0) or 0), reverse=True)[:3]
|
|
for idx, holding in enumerate(holdings, start=1):
|
|
mv = float(holding.get("market_value", 0) or 0)
|
|
detail_rows.append([
|
|
sector if idx == 1 else "",
|
|
idx,
|
|
holding.get("ticker", ""),
|
|
holding.get("name", ""),
|
|
mv,
|
|
mv / sector_mv * 100.0,
|
|
mv / total_mv * 100.0,
|
|
holding.get("return_pct", ""),
|
|
])
|
|
|
|
ws["A18"] = "Sector top holdings detail"
|
|
ws["A18"].fill = SUBHEADER_FILL
|
|
ws["A18"].font = BOLD_FONT
|
|
write_table(
|
|
ws,
|
|
19,
|
|
1,
|
|
["sector", "rank_in_sector", "ticker", "name", "market_value", "sector_weight_pct", "portfolio_weight_pct", "return_pct"],
|
|
detail_rows,
|
|
)
|
|
ws.column_dimensions["I"].width = 18
|
|
ws.column_dimensions["J"].width = 18
|
|
ws.column_dimensions["K"].width = 18
|
|
ws.column_dimensions["L"].width = 18
|
|
ws.column_dimensions["M"].width = 18
|
|
|
|
if detail_rows:
|
|
detail_chart = BarChart()
|
|
detail_chart.type = "bar"
|
|
detail_chart.style = 11
|
|
detail_chart.title = "Top Holdings Contribution"
|
|
detail_chart.y_axis.title = "Holding"
|
|
detail_chart.x_axis.title = "Portfolio Weight %"
|
|
detail_chart.height = 8
|
|
detail_chart.width = 14
|
|
# Use the first 15 rows of the detail table for a readable chart.
|
|
chart_end_row = 19 + min(len(detail_rows), 15)
|
|
data_ref2 = Reference(ws, min_col=7, min_row=19, max_row=chart_end_row)
|
|
cats2 = Reference(ws, min_col=4, min_row=20, max_row=chart_end_row)
|
|
detail_chart.add_data(data_ref2, titles_from_data=True)
|
|
detail_chart.set_categories(cats2)
|
|
detail_chart.legend = None
|
|
ws.add_chart(detail_chart, "J20")
|
|
|
|
|
|
def build_sector_summary(wb, data: dict) -> None:
|
|
ws = wb.create_sheet("sector_trend_summary")
|
|
style_sheet(ws)
|
|
style_title(
|
|
ws,
|
|
"섹터 동향 분석 요약",
|
|
"ETF 프록시, 스마트머니 유입, 수익률, 유동성 경고를 한 장에 요약한 시트",
|
|
end_col=8,
|
|
)
|
|
summary = data.get("summary") or {}
|
|
concentration = data.get("concentration") or {}
|
|
items = [
|
|
("formula_id", data.get("formula_id", "")),
|
|
("gate", data.get("gate", "")),
|
|
("latest_snapshot_date", data.get("latest_snapshot_date", "")),
|
|
("previous_snapshot_date", data.get("previous_snapshot_date", "")),
|
|
("sector_count", data.get("sector_count", 0)),
|
|
("trend_posture", summary.get("trend_posture", "")),
|
|
("rising_count", summary.get("rising_count", 0)),
|
|
("fading_count", summary.get("fading_count", 0)),
|
|
("stable_count", summary.get("stable_count", 0)),
|
|
("etf_proxy_count", summary.get("etf_proxy_count", 0)),
|
|
("smart_money_inflow_count", summary.get("smart_money_inflow_count", 0)),
|
|
("smart_money_outflow_count", summary.get("smart_money_outflow_count", 0)),
|
|
("flow_aligned_count", summary.get("flow_aligned_count", 0)),
|
|
("flow_diverging_count", summary.get("flow_diverging_count", 0)),
|
|
]
|
|
add_kpi_block(ws, 4, items)
|
|
ws["D4"] = "Concentration"
|
|
ws["D4"].fill = SUBHEADER_FILL
|
|
ws["D4"].font = BOLD_FONT
|
|
for idx, (label, value) in enumerate(
|
|
[
|
|
("top_sector", concentration.get("top_sector", "")),
|
|
("top_sector_weight_pct", concentration.get("top_sector_weight_pct", 0)),
|
|
("top2_weight_pct", concentration.get("top2_weight_pct", 0)),
|
|
("concentration_gate", concentration.get("concentration_gate", "")),
|
|
],
|
|
start=5,
|
|
):
|
|
ws.cell(idx, 4).value = label
|
|
ws.cell(idx, 4).fill = KPI_LABEL_FILL
|
|
ws.cell(idx, 4).font = BOLD_FONT
|
|
ws.cell(idx, 5).value = value
|
|
ws.cell(idx, 5).fill = KPI_VALUE_FILL
|
|
|
|
top_inflow = summary.get("top_inflow_sectors") or []
|
|
outflow = summary.get("outflow_warning_sectors") or []
|
|
ws["G4"] = "Top Inflow"
|
|
ws["G4"].fill = SUBHEADER_FILL
|
|
ws["G4"].font = BOLD_FONT
|
|
for i, item in enumerate(top_inflow, start=5):
|
|
ws.cell(i, 7).value = item
|
|
ws["H4"] = "Outflow Warning"
|
|
ws["H4"].fill = SUBHEADER_FILL
|
|
ws["H4"].font = BOLD_FONT
|
|
for i, item in enumerate(outflow, start=5):
|
|
ws.cell(i, 8).value = item
|
|
|
|
ws["A20"] = "Notes"
|
|
ws["A20"].fill = SUBHEADER_FILL
|
|
ws["A20"].font = BOLD_FONT
|
|
ws["A21"] = "섹터별 ETF 프록시를 기준으로 보고, 은행/증권/지주회사는 분리해서 구성비 상위 종목을 증빙해야 합니다. 대표주 모니터는 섹터 기본 3종, 로보틱스 5종 바스켓으로 함께 확인해야 합니다."
|
|
ws["A21"].alignment = Alignment(wrap_text=True)
|
|
ws["A22"] = "Universe_Source가 DEFAULT_TEMPLATE인 행은 템플릿이며, 실제 시트 입력으로 전환되어야 provenance가 완성됩니다."
|
|
ws["A22"].alignment = Alignment(wrap_text=True)
|
|
ws["A23"] = "다음 세분화 후보는 바이오/제약과 방산/우주처럼 현재 섹터를 더 세밀하게 나누는 방향입니다. 로보틱스는 RISE 현대차고정피지컬AI를 섹터 프록시로 사용하고, 대표주는 해당 ETF의 실제 구성비 상위 5개 종목에서 뽑습니다."
|
|
ws["A23"].alignment = Alignment(wrap_text=True)
|
|
|
|
chart = LineChart()
|
|
chart.title = "Average Sector Score / Breadth Trend"
|
|
chart.y_axis.title = "Score / Count"
|
|
chart.x_axis.title = "Snapshot"
|
|
chart.height = 7.5
|
|
chart.width = 13
|
|
timeline_sheet = wb["sector_trend_timeline"]
|
|
max_row = timeline_sheet.max_row
|
|
data_ref = Reference(timeline_sheet, min_col=13, min_row=4, max_row=max_row, max_col=17)
|
|
cats = Reference(timeline_sheet, min_col=12, min_row=5, max_row=max_row)
|
|
chart.add_data(data_ref, titles_from_data=True, from_rows=False)
|
|
chart.set_categories(cats)
|
|
chart.style = 2
|
|
ws.add_chart(chart, "G12")
|
|
|
|
set_col_widths(ws, {"A": 28, "B": 18, "C": 16, "D": 24, "E": 16, "F": 18, "G": 24, "H": 24})
|
|
|
|
|
|
def build_sector_analysis(wb, data: dict) -> None:
|
|
ws = wb.create_sheet("sector_trend_analysis")
|
|
style_sheet(ws)
|
|
style_title(
|
|
ws,
|
|
"섹터 동향 분석",
|
|
"섹터별 ETF 프록시, 대표주 모니터, 스마트머니 유입, 수익률, 유동성 방향을 함께 보는 상세 시트",
|
|
end_col=18,
|
|
)
|
|
headers = [
|
|
"sector", "proxy_ticker", "proxy_name", "proxy_type", "universe_source", "etf_code",
|
|
"etf_execution_use", "etf_liquidity_score", "etf_liquidity_status", "etf_nav_risk",
|
|
"proxy_confidence", "rank", "rank_delta_w1", "rank_delta_w2", "sector_score",
|
|
"score_delta", "sector_ret5d", "sector_ret20d", "etf_return_5d", "etf_return_20d",
|
|
"sector_etf_ret_gap_5d", "sector_etf_ret_gap_20d", "smart_money_5d_krw_raw",
|
|
"smart_money_20d_krw_raw", "smart_money_direction", "liquidity_direction",
|
|
"flow_alignment_state", "momentum_state", "concentration_weight_pct"
|
|
]
|
|
rows = []
|
|
for row in data.get("rows") or []:
|
|
rows.append([row.get(h, "") for h in headers])
|
|
write_table(ws, 4, 1, headers, rows)
|
|
ws.auto_filter.ref = f"A4:{get_column_letter(len(headers))}{4 + len(rows)}"
|
|
ws.freeze_panes = "A5"
|
|
for col in ["F", "G", "H", "I", "J", "K", "L", "M", "N", "O", "P", "Q", "R", "S", "T", "U", "V", "W", "X", "Y", "Z", "AA", "AB"]:
|
|
ws.column_dimensions[col].width = 16
|
|
ws.column_dimensions["C"].width = 18
|
|
ws.column_dimensions["A"].width = 16
|
|
ws.column_dimensions["B"].width = 12
|
|
ws.column_dimensions["D"].width = 12
|
|
ws.column_dimensions["E"].width = 12
|
|
ws.column_dimensions["J"].width = 14
|
|
ws.column_dimensions["P"].width = 12
|
|
ws.column_dimensions["Q"].width = 12
|
|
ws.column_dimensions["AA"].width = 18
|
|
ws.column_dimensions["AB"].width = 18
|
|
|
|
chart = BarChart()
|
|
chart.type = "bar"
|
|
chart.style = 10
|
|
chart.title = "Sector 20D Return by Sector"
|
|
chart.y_axis.title = "Sector"
|
|
chart.x_axis.title = "20D Return"
|
|
chart.height = 8
|
|
chart.width = 14
|
|
data_ref = Reference(ws, min_col=18, min_row=4, max_row=4 + len(rows))
|
|
cats = Reference(ws, min_col=1, min_row=5, max_row=4 + len(rows))
|
|
chart.add_data(data_ref, titles_from_data=True)
|
|
chart.set_categories(cats)
|
|
chart.legend = None
|
|
ws.add_chart(chart, "AD4")
|
|
|
|
|
|
def build_sector_timeline(wb, data: dict, source_data: dict | None = None) -> None:
|
|
ws = wb.create_sheet("sector_trend_timeline")
|
|
style_sheet(ws)
|
|
style_title(ws, "섹터 시계열", "최근 스냅샷 기준 경향성 추세", end_col=10)
|
|
headers = [
|
|
"snapshot_date", "sector_count", "avg_sector_score", "top_sector", "top_sector_score",
|
|
"positive_breadth_count", "liquidity_warn_count", "net_smart_money_5d_krw",
|
|
"top_sector_rank", "top_sector_smart_money_5d_krw"
|
|
]
|
|
rows = []
|
|
for row in data.get("timeline") or []:
|
|
parsed_date = row.get("snapshot_date", "")
|
|
if isinstance(parsed_date, str) and parsed_date:
|
|
try:
|
|
parsed_date = datetime.fromisoformat(parsed_date.replace("Z", "+00:00")).date()
|
|
except Exception:
|
|
pass
|
|
rows.append([
|
|
parsed_date,
|
|
row.get("sector_count", ""),
|
|
row.get("avg_sector_score", ""),
|
|
row.get("top_sector", ""),
|
|
row.get("top_sector_score", ""),
|
|
row.get("positive_breadth_count", ""),
|
|
row.get("liquidity_warn_count", ""),
|
|
row.get("net_smart_money_5d_krw", ""),
|
|
row.get("top_sector_rank", ""),
|
|
row.get("top_sector_smart_money_5d_krw", ""),
|
|
])
|
|
write_table(ws, 4, 1, headers, rows)
|
|
helper_headers = [
|
|
"snapshot_date", "avg_sector_score", "top_sector_score",
|
|
"positive_breadth_count", "liquidity_warn_count", "net_smart_money_5d_krw"
|
|
]
|
|
helper_rows = []
|
|
for row in rows:
|
|
helper_rows.append([row[0], row[2], row[4], row[5], row[6], row[7]])
|
|
write_table(ws, 4, 12, helper_headers, helper_rows)
|
|
ws.freeze_panes = "A5"
|
|
ws.column_dimensions["A"].width = 14
|
|
ws.column_dimensions["B"].width = 12
|
|
ws.column_dimensions["C"].width = 14
|
|
ws.column_dimensions["D"].width = 16
|
|
ws.column_dimensions["E"].width = 14
|
|
ws.column_dimensions["F"].width = 16
|
|
ws.column_dimensions["G"].width = 16
|
|
ws.column_dimensions["H"].width = 18
|
|
ws.column_dimensions["I"].width = 14
|
|
ws.column_dimensions["J"].width = 18
|
|
|
|
chart = LineChart()
|
|
chart.title = "Trend Score / Breadth / Liquidity"
|
|
chart.y_axis.title = "Count / Score"
|
|
chart.x_axis.title = "Snapshot"
|
|
chart.height = 8
|
|
chart.width = 15
|
|
data_ref = Reference(ws, min_col=13, max_col=17, min_row=4, max_row=4 + len(helper_rows))
|
|
cats = Reference(ws, min_col=12, min_row=5, max_row=4 + len(helper_rows))
|
|
chart.add_data(data_ref, titles_from_data=True, from_rows=False)
|
|
chart.set_categories(cats)
|
|
chart.style = 3
|
|
ws.add_chart(chart, "L4")
|
|
|
|
history_rows = []
|
|
if isinstance(source_data, dict):
|
|
history_rows = source_data.get("sector_flow_history") or []
|
|
if not history_rows:
|
|
history_rows = data.get("timeline_history") or data.get("history") or []
|
|
if isinstance(history_rows, list) and history_rows:
|
|
history_by_sector: dict[str, list[dict[str, object]]] = {}
|
|
for item in history_rows:
|
|
if not isinstance(item, dict):
|
|
continue
|
|
sector = str(item.get("Sector") or "").strip()
|
|
if not sector:
|
|
continue
|
|
history_by_sector.setdefault(sector, []).append(item)
|
|
|
|
top_sectors = []
|
|
for row in rows[:3]:
|
|
if len(row) > 3 and row[3]:
|
|
top_sectors.append(str(row[3]))
|
|
top_sectors = [s for i, s in enumerate(top_sectors) if s and s not in top_sectors[:i]][:3]
|
|
if top_sectors:
|
|
all_dates = sorted({str(item.get("Snapshot_Date") or "") for item in history_rows if str(item.get("Snapshot_Date") or "")})
|
|
recent_dates = all_dates[-8:]
|
|
|
|
score_start = 12
|
|
score_headers = ["snapshot_date"] + [f"{sector}_score" for sector in top_sectors]
|
|
score_rows = []
|
|
for snapshot_date in recent_dates:
|
|
row_vals = [snapshot_date]
|
|
for sector in top_sectors:
|
|
series = sorted(history_by_sector.get(sector, []), key=lambda r: str(r.get("Snapshot_Date") or ""))
|
|
match = next((r for r in series if str(r.get("Snapshot_Date") or "") == snapshot_date), {})
|
|
row_vals.append(match.get("Sector_Score", ""))
|
|
score_rows.append(row_vals)
|
|
write_table(ws, 4, score_start, score_headers, score_rows)
|
|
ws.column_dimensions[get_column_letter(score_start)].width = 14
|
|
for offset in range(1, len(score_headers)):
|
|
ws.column_dimensions[get_column_letter(score_start + offset)].width = 16
|
|
|
|
score_chart = LineChart()
|
|
score_chart.title = "Top Sector Score Trend"
|
|
score_chart.y_axis.title = "Score"
|
|
score_chart.x_axis.title = "Snapshot"
|
|
score_chart.height = 8
|
|
score_chart.width = 15
|
|
score_chart.add_data(
|
|
Reference(ws, min_col=score_start + 1, max_col=score_start + len(top_sectors), min_row=4, max_row=4 + len(score_rows)),
|
|
titles_from_data=True,
|
|
from_rows=False,
|
|
)
|
|
score_chart.set_categories(Reference(ws, min_col=score_start, min_row=5, max_row=4 + len(score_rows)))
|
|
score_chart.style = 2
|
|
ws.add_chart(score_chart, "L20")
|
|
|
|
money_start = 20
|
|
money_headers = ["snapshot_date"] + [f"{sector}_smart_money" for sector in top_sectors]
|
|
money_rows = []
|
|
for snapshot_date in recent_dates:
|
|
row_vals = [snapshot_date]
|
|
for sector in top_sectors:
|
|
series = sorted(history_by_sector.get(sector, []), key=lambda r: str(r.get("Snapshot_Date") or ""))
|
|
match = next((r for r in series if str(r.get("Snapshot_Date") or "") == snapshot_date), {})
|
|
row_vals.append(match.get("SmartMoney_5D_KRW", ""))
|
|
money_rows.append(row_vals)
|
|
write_table(ws, 4, money_start, money_headers, money_rows)
|
|
ws.column_dimensions[get_column_letter(money_start)].width = 14
|
|
for offset in range(1, len(money_headers)):
|
|
ws.column_dimensions[get_column_letter(money_start + offset)].width = 18
|
|
|
|
money_chart = LineChart()
|
|
money_chart.title = "Top Sector Smart Money Trend"
|
|
money_chart.y_axis.title = "KRW"
|
|
money_chart.x_axis.title = "Snapshot"
|
|
money_chart.height = 8
|
|
money_chart.width = 15
|
|
money_chart.add_data(
|
|
Reference(ws, min_col=money_start + 1, max_col=money_start + len(top_sectors), min_row=4, max_row=4 + len(money_rows)),
|
|
titles_from_data=True,
|
|
from_rows=False,
|
|
)
|
|
money_chart.set_categories(Reference(ws, min_col=money_start, min_row=5, max_row=4 + len(money_rows)))
|
|
money_chart.style = 3
|
|
ws.add_chart(money_chart, "L36")
|
|
|
|
|
|
def build_sector_universe_refresh_audit_sheet(wb, source_data: dict) -> None:
|
|
ws = wb.create_sheet("sector_universe_refresh_audit")
|
|
style_sheet(ws)
|
|
style_title(
|
|
ws,
|
|
"섹터 월간 갱신 감사",
|
|
"Naver ETF 페이지 기반 구성종목 갱신 상태와 provenance 분리 상태를 점검하는 감사 시트. AJAX/XHR 전제는 두지 않고 HTML 서버렌더링 테이블을 우선한다.",
|
|
end_col=16,
|
|
)
|
|
payload = {"data": source_data}
|
|
audit = build_sector_universe_refresh_audit(payload)
|
|
summary = audit.get("summary") or {}
|
|
items = [
|
|
("formula_id", audit.get("formula_id", "")),
|
|
("gate", audit.get("gate", "")),
|
|
("sector_count", summary.get("sector_count", 0)),
|
|
("current_count", summary.get("current_count", 0)),
|
|
("due_count", summary.get("due_count", 0)),
|
|
("overdue_count", summary.get("overdue_count", 0)),
|
|
("layout_changed_count", summary.get("layout_changed_count", 0)),
|
|
("missing_count", summary.get("missing_count", 0)),
|
|
("template_count", summary.get("template_count", 0)),
|
|
("sheet_input_count", summary.get("sheet_input_count", 0)),
|
|
("naver_source_count", summary.get("naver_source_count", 0)),
|
|
("missing_source_url_count", summary.get("missing_source_url_count", 0)),
|
|
("stale_sector_count", summary.get("stale_sector_count", 0)),
|
|
]
|
|
add_kpi_block(ws, 4, items)
|
|
ws["D4"] = "Refresh policy"
|
|
ws["D4"].fill = SUBHEADER_FILL
|
|
ws["D4"].font = BOLD_FONT
|
|
ws["D5"] = "NAVER_ETF_PAGE rows are the monthly refreshed source."
|
|
ws["D6"] = "SHEET_INPUT rows are manual/provisional and must stay separate."
|
|
ws["D7"] = "DEFAULT_TEMPLATE rows are a fail in the monthly gate."
|
|
ws["D8"] = "Source_URL and Source_AsOf are required for provenance."
|
|
ws["D9"] = "This is HTML-server-rendered, not AJAX. JS is only a fallback probe for candidate URLs."
|
|
ws["D10"] = "No guessed holdings are written when the page layout changes."
|
|
ws["D11"] = "NAVER_ETF_PAGE_FAIL_LAYOUT_CHANGED is a separate layout-change failure state."
|
|
ws["D12"] = "Financial sectors are split as 은행 / 증권 / 지주회사 in sector_universe; sector_flow reflects carryover until GAS runDataFeed is rerun."
|
|
ws["D13"] = "This split is part of the monthly refresh harness; Source_URL and Source_AsOf must remain valid for provenance."
|
|
rows = audit.get("rows") or []
|
|
if rows:
|
|
headers = [
|
|
"sector", "proxy_ticker", "proxy_name", "proxy_type", "source_kind", "transport_mode",
|
|
"source_url", "source_asof", "age_days", "constituent_count",
|
|
"stock_count", "etf_count", "weight_sum", "status", "refresh_reason",
|
|
]
|
|
write_table(ws, 14, 1, headers, [[row.get(h, "") for h in headers] for row in rows])
|
|
for col, width in {
|
|
"A": 16, "B": 12, "C": 18, "D": 12, "E": 16, "F": 18, "G": 42, "H": 14,
|
|
"I": 10, "J": 14, "K": 12, "L": 12, "M": 12, "N": 12, "O": 24,
|
|
}.items():
|
|
ws.column_dimensions[col].width = width
|
|
ws.freeze_panes = "A5"
|
|
ws["A11"] = "Notes"
|
|
ws["A11"].fill = SUBHEADER_FILL
|
|
ws["A11"].font = BOLD_FONT
|
|
ws["A12"] = "홈페이지 리뉴얼로 표 구조가 바뀌면, 파서는 추정하지 않고 실패 상태를 남겨 월간 게이트에서 잡는다."
|
|
ws["A12"].alignment = Alignment(wrap_text=True)
|
|
|
|
|
|
def build_etf_summary(wb, data: dict) -> None:
|
|
ws = wb.create_sheet("etf_representative_summary")
|
|
style_sheet(ws)
|
|
style_title(
|
|
ws,
|
|
"ETF 대표 종목 요약",
|
|
"ETF 구성비중 우선, 부족분은 유동성 우선 후보로 보강한 3종목 바스켓 요약",
|
|
end_col=8,
|
|
)
|
|
summary = data.get("summary") or {}
|
|
items = [
|
|
("formula_id", data.get("formula_id", "")),
|
|
("gate", data.get("gate", "")),
|
|
("etf_sector_count", data.get("etf_sector_count", 0)),
|
|
("tracked_count", data.get("tracked_count", 0)),
|
|
("complete_basket_count", summary.get("complete_basket_count", 0)),
|
|
("partial_basket_count", summary.get("partial_basket_count", 0)),
|
|
("basket_missing_total", summary.get("basket_missing_total", 0)),
|
|
("weighted_basis_count", summary.get("weighted_basis_count", 0)),
|
|
("fallback_basis_count", summary.get("fallback_basis_count", 0)),
|
|
("selected_sector_count", summary.get("selected_sector_count", 0)),
|
|
]
|
|
add_kpi_block(ws, 4, items)
|
|
ws["D4"] = "Representative principle"
|
|
ws["D4"].fill = SUBHEADER_FILL
|
|
ws["D4"].font = BOLD_FONT
|
|
ws["D5"] = "1) ETF constituent weight first"
|
|
ws["D6"] = "2) Missing slots filled with same-sector live candidates"
|
|
ws["D7"] = "3) Missing data stays explicit as DATA_MISSING"
|
|
ws["D8"] = "4) Minimum 3 names per sector basket"
|
|
ws["D9"] = "5) Universe_Source=DEFAULT_TEMPLATE rows are provisional until sheet-backed data exists."
|
|
ws["G4"] = "Top reps"
|
|
ws["G4"].fill = SUBHEADER_FILL
|
|
ws["G4"].font = BOLD_FONT
|
|
for i, item in enumerate(summary.get("top_rep_names") or [], start=5):
|
|
ws.cell(i, 7).value = item
|
|
set_col_widths(ws, {"A": 28, "B": 18, "C": 16, "D": 30, "E": 18, "F": 18, "G": 24, "H": 24})
|
|
|
|
|
|
def build_etf_monitor(wb, data: dict) -> None:
|
|
ws = wb.create_sheet("etf_representative_monitor")
|
|
style_sheet(ws)
|
|
style_title(
|
|
ws,
|
|
"ETF 대표 종목 모니터",
|
|
"섹터별 3종목 바스켓과 선택 근거, 커버리지, 품질 상태를 추적",
|
|
end_col=18,
|
|
)
|
|
headers = [
|
|
"sector", "etf_proxy_ticker", "etf_proxy_name", "etf_proxy_type", "universe_source", "sector_rank",
|
|
"sector_score", "sector_smart_money_5d_krw", "sector_ret20d", "representative_count",
|
|
"representative_ticker", "representative_name", "representative_basis",
|
|
"representative_basis_detail", "constituent_weight", "basket_quality_state",
|
|
"basket_coverage_pct", "basket_state", "basket_buy_review_count",
|
|
"basket_track_count", "basket_watch_count", "basket_caution_count",
|
|
"basket_aligned_count", "basket_missing_count", "basket_real_count",
|
|
"selection_source", "selection_score", "monitor_reason"
|
|
]
|
|
rows = []
|
|
for row in data.get("rows") or []:
|
|
rows.append([row.get(h, "") for h in headers])
|
|
write_table(ws, 4, 1, headers, rows)
|
|
ws.auto_filter.ref = f"A4:{get_column_letter(len(headers))}{4 + len(rows)}"
|
|
ws.freeze_panes = "A5"
|
|
for col, width in {"A": 18, "B": 12, "C": 16, "D": 12, "E": 12, "F": 12, "G": 18, "H": 12,
|
|
"I": 14, "J": 12, "K": 18, "L": 18, "M": 24, "N": 14, "O": 14,
|
|
"P": 14, "Q": 14, "R": 14, "S": 14, "T": 14, "U": 14, "V": 14,
|
|
"W": 14, "X": 18, "Y": 14, "Z": 12, "AA": 24}.items():
|
|
ws.column_dimensions[col].width = width
|
|
|
|
chart = BarChart()
|
|
chart.type = "bar"
|
|
chart.style = 10
|
|
chart.title = "Basket Coverage by Sector"
|
|
chart.y_axis.title = "Sector"
|
|
chart.x_axis.title = "Coverage %"
|
|
chart.height = 8
|
|
chart.width = 14
|
|
data_ref = Reference(ws, min_col=17, min_row=4, max_row=4 + len(rows))
|
|
cats = Reference(ws, min_col=1, min_row=5, max_row=4 + len(rows))
|
|
chart.add_data(data_ref, titles_from_data=True)
|
|
chart.set_categories(cats)
|
|
chart.legend = None
|
|
ws.add_chart(chart, "AC4")
|
|
|
|
|
|
def main() -> None:
|
|
if not INPUT_XLSX.exists():
|
|
raise FileNotFoundError(INPUT_XLSX)
|
|
if not SECTOR_JSON.exists():
|
|
raise FileNotFoundError(SECTOR_JSON)
|
|
if not ETF_JSON.exists():
|
|
raise FileNotFoundError(ETF_JSON)
|
|
raw_json_path = ROOT / "GatherTradingData.json"
|
|
|
|
sector = load_json(SECTOR_JSON)
|
|
etf = load_json(ETF_JSON)
|
|
raw_data = load_json(raw_json_path) if raw_json_path.exists() else {}
|
|
raw_source = raw_data.get("data", {}) if isinstance(raw_data.get("data"), dict) else {}
|
|
|
|
wb = load_workbook(INPUT_XLSX)
|
|
for name in [
|
|
"portfolio_performance_summary",
|
|
"performance_readiness_summary",
|
|
"operational_eval_queue_summary",
|
|
"portfolio_sector_exposure",
|
|
"sector_universe_refresh_audit",
|
|
"_portfolio_holdings_helper",
|
|
"sector_trend_summary",
|
|
"sector_trend_analysis",
|
|
"sector_trend_timeline",
|
|
"etf_representative_summary",
|
|
"etf_representative_monitor",
|
|
]:
|
|
remove_if_exists(wb, name)
|
|
|
|
# Build data sheets first so summary sheets can reference the timeline sheet.
|
|
build_portfolio_summary(wb)
|
|
build_performance_readiness_summary(wb)
|
|
build_operational_eval_queue_summary(wb)
|
|
build_portfolio_sector_exposure(wb)
|
|
build_sector_universe_refresh_audit_sheet(wb, raw_source)
|
|
build_sector_timeline(wb, sector, raw_source)
|
|
build_sector_analysis(wb, sector)
|
|
build_sector_summary(wb, sector)
|
|
build_etf_monitor(wb, etf)
|
|
build_etf_summary(wb, etf)
|
|
|
|
# Put summary sheets near the front.
|
|
order = [
|
|
"settings",
|
|
"portfolio_performance_summary",
|
|
"performance_readiness_summary",
|
|
"operational_eval_queue_summary",
|
|
"portfolio_sector_exposure",
|
|
"sector_universe_refresh_audit",
|
|
"sector_trend_summary",
|
|
"sector_trend_analysis",
|
|
"sector_trend_timeline",
|
|
"etf_representative_summary",
|
|
"etf_representative_monitor",
|
|
]
|
|
existing = [s for s in wb.sheetnames if s not in order]
|
|
wb._sheets = [wb[s] for s in order if s in wb.sheetnames] + [wb[s] for s in existing]
|
|
if "_portfolio_holdings_helper" in wb.sheetnames:
|
|
wb["_portfolio_holdings_helper"].sheet_state = "hidden"
|
|
wb.active = wb.sheetnames.index("sector_trend_summary")
|
|
|
|
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
|
|
wb.save(OUTPUT_XLSX)
|
|
print(f"saved {OUTPUT_XLSX}")
|
|
print("sheets", wb.sheetnames[:10])
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|