70675a5a92
DAG (step_count 83→86): - update_proposal_evaluation_history (wave_5): 일간 실행 — core_satellite 제안 기록 + T+1/T+5/T+20 자동 평가 - build_operational_eval_queue (wave_5): T+20 평가 대기 큐 — due_date 초과 종목 목록 - build_operational_outcome_lock (wave_5): 실운영 T+20 성과 잠금 — 30건 이상 누적 후 활성화 - build_algorithm_guidance_proof depends_on에 build_operational_outcome_lock 추가 - validate_specs.py: 41_release_dag.yaml 50KB 예외 추가 (DAG 확장 예정) 렌더러/워크북: - render_operational_report.py: 섹터 상위 3개 최근 5기 추세 테이블 추가 (score/ret20d/smart_money sparkline) - update_workbook_sector_insights.py: sector_flow_history 기반 섹터 시계열 차트 추가 (score + smart money) 운영: update_proposal_evaluation_history 최초 실행 — 75건 core_satellite 제안 기록 완료 (T+20 ~2026-07-12) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
820 lines
34 KiB
Python
820 lines
34 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
from openpyxl import load_workbook
|
|
from openpyxl.chart import BarChart, LineChart, Reference
|
|
from openpyxl.styles import Font, PatternFill, Alignment
|
|
from openpyxl.utils import get_column_letter
|
|
|
|
|
|
# Parent of the directory that contains this script.
ROOT = Path(__file__).resolve().parent.parent
# Workbook that main() loads and extends with the generated sheets.
INPUT_XLSX = ROOT / "GatherTradingData.xlsx"
# The result is saved as a separate file under this directory; the input
# workbook path is never used as a save target in this script.
OUTPUT_DIR = ROOT / "outputs" / "sector_insights_enhanced"
OUTPUT_XLSX = OUTPUT_DIR / "GatherTradingData_sector_insights.xlsx"
# JSON inputs that main() requires to exist before it builds any sheet.
SECTOR_JSON = ROOT / "Temp" / "sector_trend_analysis_v1.json"
ETF_JSON = ROOT / "Temp" / "etf_representative_monitor_v1.json"

# Shared cell styles.
# Fill of the title bar (style_title) and default fill of table headers (write_table).
HEADER_FILL = PatternFill("solid", fgColor="1F4E78")
# Fill of section labels such as "KPI" or "Top holdings".
SUBHEADER_FILL = PatternFill("solid", fgColor="D9EAF7")
# NOTE(review): KPI_FILL is not referenced anywhere in the visible code.
KPI_FILL = PatternFill("solid", fgColor="F3F7FB")
# Fills of the label and value cells written by add_kpi_block.
KPI_LABEL_FILL = PatternFill("solid", fgColor="E2F0D9")
KPI_VALUE_FILL = PatternFill("solid", fgColor="FFF2CC")
# Bold white text for cells that sit on HEADER_FILL.
WHITE_FONT = Font(color="FFFFFF", bold=True)
BOLD_FONT = Font(bold=True)
# 14pt bold font assigned to the title cell in style_title.
TITLE_FONT = Font(size=14, bold=True)
# Grey italic font used for the subtitle row.
NOTE_FONT = Font(italic=True, color="666666")
|
|
|
|
|
|
def load_json(path: Path) -> dict:
    """Read the UTF-8 text file at *path* and return its decoded JSON content."""
    raw_text = path.read_text(encoding="utf-8")
    return json.loads(raw_text)
|
|
|
|
|
|
def remove_if_exists(wb, name: str) -> None:
    """Delete the sheet called *name* from *wb*; do nothing when it is absent."""
    if name not in wb.sheetnames:
        return
    del wb[name]
|
|
|
|
|
|
def style_title(ws, title: str, subtitle: str | None = None, end_col: int = 8) -> None:
    """Write the sheet title bar in row 1 and an optional subtitle in row 2.

    Args:
        ws: Worksheet to decorate.
        title: Text placed in A1; row 1 is merged from column 1 to *end_col*.
        subtitle: Optional note placed in A2, merged the same way. Skipped
            when it is empty or ``None``.
        end_col: 1-based index of the last column covered by the merged rows.
    """
    ws.merge_cells(start_row=1, start_column=1, end_row=1, end_column=end_col)
    title_cell = ws["A1"]
    title_cell.value = title
    title_cell.fill = HEADER_FILL
    # A cell holds a single font. Assigning TITLE_FONT and then WHITE_FONT kept
    # only the latter, so the 14pt title size was silently dropped. Combine the
    # size of TITLE_FONT with the colour of WHITE_FONT in one font instead.
    title_cell.font = Font(size=TITLE_FONT.size, bold=True, color=WHITE_FONT.color)
    title_cell.alignment = Alignment(horizontal="left")
    if subtitle:
        ws.merge_cells(start_row=2, start_column=1, end_row=2, end_column=end_col)
        subtitle_cell = ws["A2"]
        subtitle_cell.value = subtitle
        subtitle_cell.font = NOTE_FONT
|
|
|
|
|
|
def write_table(ws, start_row: int, start_col: int, headers: list[str], rows: list[list], header_fill=HEADER_FILL) -> int:
    """Write a header row followed by data rows into *ws*.

    Args:
        ws: Target worksheet.
        start_row: 1-based row that receives the header cells.
        start_col: 1-based column of the first header cell.
        headers: Header captions, written left to right.
        rows: Data records; each record is written left to right starting at
            *start_col*, one worksheet row per record.
        header_fill: Fill applied to every header cell.

    Returns:
        The index of the last row written (``start_row`` when *rows* is empty).
    """
    header_alignment = Alignment(horizontal="center", vertical="center")
    body_alignment = Alignment(vertical="top")

    for col_offset, caption in enumerate(headers):
        head = ws.cell(start_row, start_col + col_offset)
        head.value = caption
        head.font = WHITE_FONT
        head.fill = header_fill
        head.alignment = header_alignment

    for row_offset, record in enumerate(rows, start=1):
        target_row = start_row + row_offset
        for col_offset, item in enumerate(record):
            body = ws.cell(target_row, start_col + col_offset)
            body.value = item
            body.alignment = body_alignment

    return start_row + len(rows)
|
|
|
|
|
|
def add_kpi_block(ws, start_row: int, items: list[tuple[str, object]]) -> int:
    """Write a "KPI" label and one label/value pair per row beneath it.

    The label goes to column 1 and the value to column 2.

    Returns:
        The index of the first row after the block.
    """
    heading = ws.cell(start_row, 1)
    heading.value = "KPI"
    heading.fill = SUBHEADER_FILL
    heading.font = BOLD_FONT

    first_item_row = start_row + 1
    for offset, (label, value) in enumerate(items):
        current = first_item_row + offset
        label_cell = ws.cell(current, 1)
        label_cell.value = label
        label_cell.fill = KPI_LABEL_FILL
        label_cell.font = BOLD_FONT
        value_cell = ws.cell(current, 2)
        value_cell.value = value
        value_cell.fill = KPI_VALUE_FILL
    return first_item_row + len(items)
|
|
|
|
|
|
def set_col_widths(ws, widths: dict[str, int]) -> None:
    """Apply each ``column letter -> width`` entry of *widths* to *ws*."""
    dimensions = ws.column_dimensions
    for letter in widths:
        dimensions[letter].width = widths[letter]
|
|
|
|
|
|
def style_sheet(ws) -> None:
    """Hide grid lines on *ws* and freeze the rows above row 3."""
    view = ws.sheet_view
    view.showGridLines = False
    ws.freeze_panes = "A3"
|
|
|
|
|
|
def extract_sheet_rows(wb, sheet_name: str) -> tuple[list[str], list[list]]:
    """Return ``(headers, data_rows)`` read from the sheet *sheet_name*.

    Row 1 is the header row unless it is entirely blank while row 2 has
    content; in that case row 2 supplies the headers and data starts at row 3.
    Rows whose cells are all ``None`` or ``""`` are left out of the result.
    """
    sheet = wb[sheet_name]
    last_col = sheet.max_column
    last_row = sheet.max_row

    def read_row(row_idx):
        return [sheet.cell(row_idx, col_idx).value for col_idx in range(1, last_col + 1)]

    def has_content(values):
        return any(v is not None and v != "" for v in values)

    top = read_row(1)
    below = read_row(2) if last_row >= 2 else []

    if below and not has_content(top) and has_content(below):
        headers, first_data_row = below, 3
    else:
        headers, first_data_row = top, 2

    body: list[list] = []
    for row_idx in range(first_data_row, last_row + 1):
        values = read_row(row_idx)
        if has_content(values):
            body.append(values)
    return headers, body
|
|
|
|
|
|
def build_portfolio_summary(wb) -> None:
    """Create the ``portfolio_performance_summary`` sheet in *wb*.

    Reads the ``daily_history``, ``monthly_history`` and ``account_snapshot``
    sheets and writes a KPI block, explanatory notes, a top-holdings list and
    up to three charts. The holdings chart is fed from a helper sheet named
    ``_portfolio_holdings_helper`` that this function also creates.

    Source columns are addressed by position (0-based within a row): in the
    history sheets 1/2/3 feed the asset, peak and MDD KPIs and 8/10 the
    monthly/YTD return KPIs; in ``account_snapshot`` 0 is the capture stamp,
    4 the holding name, 10 the market value and 11 the profit/loss.

    Raises:
        KeyError: If one of the three source sheets is missing from *wb*.
    """

    def display(value):
        # Missing values are shown explicitly instead of as blank cells.
        return value if value not in (None, "") else "DATA_MISSING — 하네스 업데이트 필요"

    def pick(row, index):
        return row[index] if len(row) > index else None

    def numeric_sum(records, index):
        return sum(
            r[index]
            for r in records
            if len(r) > index and isinstance(r[index], (int, float))
        )

    _, daily_rows = extract_sheet_rows(wb, "daily_history")
    _, monthly_rows = extract_sheet_rows(wb, "monthly_history")
    _, account_rows = extract_sheet_rows(wb, "account_snapshot")

    ws = wb.create_sheet("portfolio_performance_summary")
    style_sheet(ws)
    style_title(
        ws,
        "포트폴리오 성과 요약",
        "내 자금의 일간/월간 추이와 최신 보유 비중을 함께 보는 요약 시트",
        end_col=10,
    )

    latest_daily = daily_rows[-1] if daily_rows else []
    latest_month = monthly_rows[-1] if monthly_rows else []

    # Only non-empty capture stamps take part in the comparison; seeding the
    # scan with a blank first row used to raise TypeError (value > None).
    captures = [row[0] for row in account_rows if row and row[0]]
    latest_capture = None
    if captures:
        try:
            latest_capture = max(captures)
        except TypeError:
            # Mixed cell types (e.g. datetime and text): compare their text form.
            latest_capture = max(captures, key=str)

    latest_holdings = [r for r in account_rows if r and r[0] == latest_capture]
    holdings_sorted = sorted(
        latest_holdings,
        key=lambda r: r[10] if len(r) > 10 and isinstance(r[10], (int, float)) else 0,
        reverse=True,
    )
    top_holdings = holdings_sorted[:10]
    total_mv = numeric_sum(holdings_sorted, 10)
    total_pl = numeric_sum(holdings_sorted, 11)

    items = [
        ("latest_daily_asset", display(pick(latest_daily, 1))),
        ("latest_peak_asset", display(pick(latest_daily, 2))),
        ("latest_daily_mdd_pct", display(pick(latest_daily, 3))),
        ("latest_month_total_asset", display(pick(latest_month, 1))),
        ("latest_month_return_pct", display(pick(latest_month, 8))),
        ("latest_ytd_return_pct", display(pick(latest_month, 10))),
        ("latest_capture", display(latest_capture)),
        ("latest_holdings_count", display(len(latest_holdings))),
        ("latest_holdings_market_value", display(total_mv)),
        ("latest_holdings_profit_loss", display(total_pl)),
    ]
    kpi_end = add_kpi_block(ws, 4, items)

    ws["D4"] = "Portfolio view"
    ws["D4"].fill = SUBHEADER_FILL
    ws["D4"].font = BOLD_FONT
    ws["D5"] = "일간/월간 자산 추이는 실제 계좌 스냅샷 기반입니다."
    ws["D6"] = "보유 비중 차트는 최신 스냅샷의 시장가치 기준입니다."
    ws["D7"] = "수익률이 음수여도 숨기지 않고 그대로 보여줍니다."

    ws["G4"] = "Top holdings"
    ws["G4"].fill = SUBHEADER_FILL
    ws["G4"].font = BOLD_FONT
    for i, row in enumerate(top_holdings, start=5):
        name = row[4] if len(row) > 4 else ""
        mv = row[10] if len(row) > 10 else ""
        ws.cell(i, 7).value = f"{name} ({mv})"

    # The KPI block and the holdings list both run down to row 14, so charts
    # anchored at row 13 covered their last two rows. Start the charts below
    # whichever list is longer.
    chart_row = max(kpi_end, 5 + len(top_holdings)) + 1

    def add_line_chart(sheet, first_col, last_col, title, y_title, x_title, style, anchor):
        # NOTE(review): the references assume the captions are in row 1 of the
        # source sheet, while extract_sheet_rows also accepts captions in row 2.
        last_row = sheet.max_row
        if last_row < 2:
            # Header only: a category range of 2..1 would be inverted.
            return
        chart = LineChart()
        chart.title = title
        chart.y_axis.title = y_title
        chart.x_axis.title = x_title
        chart.height = 7
        chart.width = 13
        values = Reference(sheet, min_col=first_col, max_col=last_col, min_row=1, max_row=last_row)
        labels = Reference(sheet, min_col=1, min_row=2, max_row=last_row)
        chart.add_data(values, titles_from_data=True, from_rows=False)
        chart.set_categories(labels)
        chart.style = style
        ws.add_chart(chart, anchor)

    # Daily history chart: worksheet columns 2-4 share one axis ("KRW / %").
    add_line_chart(wb["daily_history"], 2, 4, "Daily Asset / MDD", "KRW / %", "Date", 2, f"A{chart_row}")
    # Monthly history chart: worksheet columns 8-11.
    add_line_chart(wb["monthly_history"], 8, 11, "Monthly Return Trend", "%", "Month", 3, f"G{chart_row}")

    # Top holdings bar chart, fed from a helper sheet.
    ws_hold = wb.create_sheet("_portfolio_holdings_helper")
    helper_rows = [[r[4], r[10]] for r in top_holdings if len(r) > 10]
    write_table(ws_hold, 1, 1, ["name", "market_value"], helper_rows)
    if helper_rows:
        hold_chart = BarChart()
        hold_chart.type = "bar"
        hold_chart.title = "Top Holdings by Market Value"
        hold_chart.y_axis.title = "Holding"
        hold_chart.x_axis.title = "KRW"
        hold_chart.height = 8
        hold_chart.width = 13
        last_helper_row = 1 + len(helper_rows)
        hold_data = Reference(ws_hold, min_col=2, min_row=1, max_row=last_helper_row)
        hold_cats = Reference(ws_hold, min_col=1, min_row=2, max_row=last_helper_row)
        hold_chart.add_data(hold_data, titles_from_data=True)
        hold_chart.set_categories(hold_cats)
        hold_chart.legend = None
        # Same 17-row gap below the line charts as the previous A13 -> A30 layout.
        ws.add_chart(hold_chart, f"A{chart_row + 17}")

    set_col_widths(ws, {"A": 22, "B": 18, "C": 18, "D": 24, "E": 18, "F": 18, "G": 26, "H": 26, "I": 18, "J": 18})
|
|
|
|
|
|
def build_portfolio_sector_exposure(wb) -> None:
    """Create the ``portfolio_sector_exposure`` sheet in *wb*.

    Holdings of the most recent ``captured_at`` value in ``account_snapshot``
    are mapped to sectors through the ``Ticker``/``Sector`` columns of
    ``universe`` and aggregated per sector. Holdings whose ticker is not in
    the map are grouped under "미분류". The sheet receives a KPI block, the
    per-sector table with a bar chart, and a detail table (top three holdings
    of the five largest sectors) with its own chart.

    Raises:
        KeyError: If ``account_snapshot`` or ``universe`` is missing from *wb*.
    """

    def normalize_ticker(value) -> str:
        # Spreadsheet cells may hold a ticker as a number (5930 or 5930.0);
        # render it as the zero-padded 6-character code. A blank cell stays
        # blank instead of turning into the fake code "000000".
        if value is None:
            return ""
        if isinstance(value, float) and value.is_integer():
            value = int(value)
        text = str(value).strip()
        return text.zfill(6) if text else ""

    def to_float(value) -> float:
        # Non-numeric cells count as 0 rather than aborting the whole sheet.
        if isinstance(value, str):
            value = value.replace(",", "").strip()
        try:
            return float(value or 0)
        except (TypeError, ValueError):
            return 0.0

    # extract_sheet_rows already leaves out fully blank rows.
    account_headers, account_rows = extract_sheet_rows(wb, "account_snapshot")
    universe_headers, universe_rows = extract_sheet_rows(wb, "universe")
    account_dicts = [dict(zip(account_headers, row)) for row in account_rows]
    universe_dicts = [dict(zip(universe_headers, row)) for row in universe_rows]

    sector_map: dict[str, str] = {}
    for row in universe_dicts:
        ticker = normalize_ticker(row.get("Ticker"))
        sector = str(row.get("Sector", "") or "").strip()
        if ticker and sector:
            sector_map[ticker] = sector

    # Capture stamps are compared in their text form.
    latest_capture = max((str(row.get("captured_at", "") or "") for row in account_dicts), default="")
    latest_rows = [r for r in account_dicts if str(r.get("captured_at", "") or "") == latest_capture]

    exposure: dict[str, dict[str, float]] = {}
    sector_holdings: dict[str, list[dict[str, object]]] = {}
    for row in latest_rows:
        ticker = normalize_ticker(row.get("ticker"))
        sector = sector_map.get(ticker, "미분류")
        mv = to_float(row.get("market_value", 0))
        pl = to_float(row.get("profit_loss", 0))
        cost = to_float(row.get("total_cost", 0))
        bucket = exposure.setdefault(sector, {"market_value": 0.0, "profit_loss": 0.0, "cost": 0.0, "count": 0})
        bucket["market_value"] += mv
        bucket["profit_loss"] += pl
        bucket["cost"] += cost
        bucket["count"] += 1
        sector_holdings.setdefault(sector, []).append({
            "ticker": ticker,
            "name": row.get("name", ""),
            "market_value": mv,
            "profit_loss": pl,
            "return_pct": row.get("return_pct", ""),
        })

    ranked = sorted(exposure.items(), key=lambda kv: kv[1]["market_value"], reverse=True)
    total_mv = sum(vals["market_value"] for vals in exposure.values())
    # Divisor for the weight columns only; the KPI reports the real total
    # (previously an empty portfolio was reported as a total of 1.0).
    weight_base = total_mv or 1.0

    rows = []
    for sector, vals in ranked:
        pct = vals["market_value"] / weight_base * 100.0
        ret_pct = (vals["profit_loss"] / vals["cost"] * 100.0) if vals["cost"] else 0.0
        rows.append([sector, vals["count"], vals["market_value"], pct, vals["profit_loss"], ret_pct])

    ws = wb.create_sheet("portfolio_sector_exposure")
    style_sheet(ws)
    style_title(
        ws,
        "포트폴리오 섹터 노출",
        "최신 계좌 스냅샷 기준으로 섹터별 보유 시장가치와 손익률을 집계",
        end_col=8,
    )
    items = [
        ("latest_capture", latest_capture),
        ("sector_count", len(rows)),
        ("top_sector", rows[0][0] if rows else ""),
        ("top_sector_weight_pct", rows[0][3] if rows else 0),
        ("top3_sector_weight_pct", sum(r[3] for r in rows[:3]) if rows else 0),
        ("total_market_value", total_mv),
    ]
    add_kpi_block(ws, 4, items)

    headers = ["sector", "holding_count", "market_value", "weight_pct", "profit_loss", "return_pct"]
    last_table_row = write_table(ws, 4, 4, headers, rows)
    # The section label sits above the table. Writing it to D4 replaced the
    # "sector" caption that write_table had just put there.
    ws["D3"] = "Sector exposure"
    ws["D3"].fill = SUBHEADER_FILL
    ws["D3"].font = BOLD_FONT
    ws.freeze_panes = "A5"
    set_col_widths(ws, {"A": 24, "B": 14, "C": 18, "D": 24, "E": 14, "F": 14, "G": 16, "H": 14})

    if rows:
        chart = BarChart()
        chart.type = "bar"
        chart.style = 10
        chart.title = "Sector Exposure by Market Value"
        chart.y_axis.title = "Sector"
        chart.x_axis.title = "KRW"
        chart.height = 8
        chart.width = 14
        # Column 6 is "market_value", column 4 is "sector".
        data_ref = Reference(ws, min_col=6, min_row=4, max_row=last_table_row)
        cats = Reference(ws, min_col=4, min_row=5, max_row=last_table_row)
        chart.add_data(data_ref, titles_from_data=True)
        chart.set_categories(cats)
        chart.legend = None
        ws.add_chart(chart, "J4")

    detail_rows: list[list[object]] = []
    for sector, vals in ranked[:5]:
        sector_mv = vals["market_value"] or 1.0
        holdings = sorted(sector_holdings.get(sector, []), key=lambda item: item["market_value"], reverse=True)[:3]
        for idx, holding in enumerate(holdings, start=1):
            mv = holding["market_value"]
            detail_rows.append([
                sector if idx == 1 else "",
                idx,
                holding.get("ticker", ""),
                holding.get("name", ""),
                mv,
                mv / sector_mv * 100.0,
                mv / weight_base * 100.0,
                holding.get("return_pct", ""),
            ])

    # The detail table spans columns A-H and therefore shares columns D-H with
    # the exposure table. It keeps its usual row 19 unless the exposure table
    # reaches that far, in which case it moves below it instead of overwriting it.
    detail_header_row = max(19, last_table_row + 2)
    label_cell = ws.cell(detail_header_row - 1, 1)
    label_cell.value = "Sector top holdings detail"
    label_cell.fill = SUBHEADER_FILL
    label_cell.font = BOLD_FONT
    write_table(
        ws,
        detail_header_row,
        1,
        ["sector", "rank_in_sector", "ticker", "name", "market_value", "sector_weight_pct", "portfolio_weight_pct", "return_pct"],
        detail_rows,
    )
    set_col_widths(ws, {"I": 18, "J": 18, "K": 18, "L": 18, "M": 18})

    if detail_rows:
        detail_chart = BarChart()
        detail_chart.type = "bar"
        detail_chart.style = 11
        detail_chart.title = "Top Holdings Contribution"
        detail_chart.y_axis.title = "Holding"
        detail_chart.x_axis.title = "Portfolio Weight %"
        detail_chart.height = 8
        detail_chart.width = 14
        # Use the first 15 rows of the detail table for a readable chart.
        chart_end_row = detail_header_row + min(len(detail_rows), 15)
        # Column 7 is "portfolio_weight_pct", column 4 is "name".
        data_ref2 = Reference(ws, min_col=7, min_row=detail_header_row, max_row=chart_end_row)
        cats2 = Reference(ws, min_col=4, min_row=detail_header_row + 1, max_row=chart_end_row)
        detail_chart.add_data(data_ref2, titles_from_data=True)
        detail_chart.set_categories(cats2)
        detail_chart.legend = None
        ws.add_chart(detail_chart, f"J{detail_header_row + 1}")
|
|
|
|
|
|
def build_sector_summary(wb, data: dict) -> None:
    """Create the ``sector_trend_summary`` sheet from the sector analysis *data*.

    Writes a KPI block (top-level keys and ``data["summary"]``), the
    ``data["concentration"]`` values, the inflow / outflow-warning sector
    lists, two note lines and a line chart. The chart reads the helper table
    that starts at row 4, column 12 of the ``sector_trend_timeline`` sheet;
    it is left out when that sheet or its helper rows are not present.
    """
    ws = wb.create_sheet("sector_trend_summary")
    style_sheet(ws)
    style_title(
        ws,
        "섹터 동향 분석 요약",
        "ETF 프록시, 스마트머니 유입, 수익률, 유동성 경고를 한 장에 요약한 시트",
        end_col=8,
    )
    summary = data.get("summary") or {}
    concentration = data.get("concentration") or {}
    items = [
        ("formula_id", data.get("formula_id", "")),
        ("gate", data.get("gate", "")),
        ("latest_snapshot_date", data.get("latest_snapshot_date", "")),
        ("previous_snapshot_date", data.get("previous_snapshot_date", "")),
        ("sector_count", data.get("sector_count", 0)),
        ("trend_posture", summary.get("trend_posture", "")),
        ("rising_count", summary.get("rising_count", 0)),
        ("fading_count", summary.get("fading_count", 0)),
        ("stable_count", summary.get("stable_count", 0)),
        ("etf_proxy_count", summary.get("etf_proxy_count", 0)),
        ("smart_money_inflow_count", summary.get("smart_money_inflow_count", 0)),
        ("smart_money_outflow_count", summary.get("smart_money_outflow_count", 0)),
        ("flow_aligned_count", summary.get("flow_aligned_count", 0)),
        ("flow_diverging_count", summary.get("flow_diverging_count", 0)),
    ]
    add_kpi_block(ws, 4, items)

    ws["D4"] = "Concentration"
    ws["D4"].fill = SUBHEADER_FILL
    ws["D4"].font = BOLD_FONT
    concentration_items = [
        ("top_sector", concentration.get("top_sector", "")),
        ("top_sector_weight_pct", concentration.get("top_sector_weight_pct", 0)),
        ("top2_weight_pct", concentration.get("top2_weight_pct", 0)),
        ("concentration_gate", concentration.get("concentration_gate", "")),
    ]
    for idx, (label, value) in enumerate(concentration_items, start=5):
        ws.cell(idx, 4).value = label
        ws.cell(idx, 4).fill = KPI_LABEL_FILL
        ws.cell(idx, 4).font = BOLD_FONT
        ws.cell(idx, 5).value = value
        ws.cell(idx, 5).fill = KPI_VALUE_FILL

    top_inflow = summary.get("top_inflow_sectors") or []
    outflow = summary.get("outflow_warning_sectors") or []
    ws["G4"] = "Top Inflow"
    ws["G4"].fill = SUBHEADER_FILL
    ws["G4"].font = BOLD_FONT
    for i, item in enumerate(top_inflow, start=5):
        ws.cell(i, 7).value = item
    ws["H4"] = "Outflow Warning"
    ws["H4"].fill = SUBHEADER_FILL
    ws["H4"].font = BOLD_FONT
    for i, item in enumerate(outflow, start=5):
        ws.cell(i, 8).value = item

    ws["A20"] = "Notes"
    ws["A20"].fill = SUBHEADER_FILL
    ws["A20"].font = BOLD_FONT
    ws["A21"] = "섹터별 ETF 프록시와 스마트머니 방향이 다르면 매수 근거를 보수적으로 해석해야 합니다."
    ws["A21"].alignment = Alignment(wrap_text=True)
    ws["A22"] = "데이터 결측은 하네스 업데이트가 필요합니다."
    ws["A22"].alignment = Alignment(wrap_text=True)

    if "sector_trend_timeline" in wb.sheetnames:
        timeline_sheet = wb["sector_trend_timeline"]
        # Find the last filled row of the helper table itself. The sheet-wide
        # max_row can be larger when other tables on that sheet are longer,
        # which would add blank points to the chart.
        last_helper_row = 4
        for r in range(5, timeline_sheet.max_row + 1):
            if timeline_sheet.cell(r, 12).value in (None, ""):
                break
            last_helper_row = r
        if last_helper_row > 4:
            chart = LineChart()
            chart.title = "Average Sector Score / Breadth Trend"
            chart.y_axis.title = "Score / Count"
            chart.x_axis.title = "Snapshot"
            chart.height = 7.5
            chart.width = 13
            data_ref = Reference(timeline_sheet, min_col=13, min_row=4, max_row=last_helper_row, max_col=17)
            cats = Reference(timeline_sheet, min_col=12, min_row=5, max_row=last_helper_row)
            chart.add_data(data_ref, titles_from_data=True, from_rows=False)
            chart.set_categories(cats)
            chart.style = 2
            # Row 12 is the usual anchor; move down when the G/H lists are
            # long enough to be covered by the chart.
            chart_row = max(12, max(len(top_inflow), len(outflow)) + 5)
            ws.add_chart(chart, f"G{chart_row}")

    set_col_widths(ws, {"A": 28, "B": 18, "C": 16, "D": 24, "E": 16, "F": 18, "G": 24, "H": 24})
|
|
|
|
|
|
def build_sector_analysis(wb, data: dict) -> None:
    """Create the ``sector_trend_analysis`` sheet from ``data["rows"]``.

    Each entry of ``data["rows"]`` becomes one table row; a key that is absent
    from an entry is written as an empty string. The sheet gets an auto-filter
    over the table and a bar chart of the ``sector_ret20d`` column.
    """
    sheet = wb.create_sheet("sector_trend_analysis")
    style_sheet(sheet)
    style_title(
        sheet,
        "섹터 동향 분석",
        "섹터별 ETF 프록시, 스마트머니 유입, 수익률, 유동성 방향을 함께 보는 상세 시트",
        end_col=18,
    )

    headers = [
        "sector",
        "proxy_ticker",
        "proxy_name",
        "proxy_type",
        "etf_code",
        "etf_execution_use",
        "etf_liquidity_score",
        "etf_liquidity_status",
        "etf_nav_risk",
        "proxy_confidence",
        "rank",
        "rank_delta_w1",
        "rank_delta_w2",
        "sector_score",
        "score_delta",
        "sector_ret5d",
        "sector_ret20d",
        "etf_return_5d",
        "etf_return_20d",
        "sector_etf_ret_gap_5d",
        "sector_etf_ret_gap_20d",
        "smart_money_5d_krw_raw",
        "smart_money_20d_krw_raw",
        "smart_money_direction",
        "liquidity_direction",
        "flow_alignment_state",
        "momentum_state",
        "concentration_weight_pct",
    ]
    table = [[entry.get(key, "") for key in headers] for entry in (data.get("rows") or [])]
    last_row = 4 + len(table)

    write_table(sheet, 4, 1, headers, table)
    sheet.auto_filter.ref = f"A4:{get_column_letter(len(headers))}{last_row}"
    sheet.freeze_panes = "A5"

    # Columns F..AB default to 16; the entries below are the exceptions.
    widths = {get_column_letter(index): 16 for index in range(6, 29)}
    widths.update({
        "A": 16, "B": 12, "C": 18, "D": 12, "E": 12,
        "J": 14, "P": 12, "Q": 12, "AA": 18, "AB": 18,
    })
    for letter, width in widths.items():
        sheet.column_dimensions[letter].width = width

    ret20d_col = headers.index("sector_ret20d") + 1
    bar = BarChart()
    bar.type = "bar"
    bar.style = 10
    bar.title = "Sector 20D Return by Sector"
    bar.y_axis.title = "Sector"
    bar.x_axis.title = "20D Return"
    bar.height = 8
    bar.width = 14
    bar.add_data(Reference(sheet, min_col=ret20d_col, min_row=4, max_row=last_row), titles_from_data=True)
    bar.set_categories(Reference(sheet, min_col=1, min_row=5, max_row=last_row))
    bar.legend = None
    sheet.add_chart(bar, "AD4")
|
|
|
|
|
|
def build_sector_timeline(wb, data: dict, source_data: dict | None = None) -> None:
    """Create the ``sector_trend_timeline`` sheet.

    Layout (all tables have their header in row 4):
      * columns 1-10: the main table built from ``data["timeline"]``;
      * columns 12-17: a helper table (date plus five numeric series) that
        the trend chart on this sheet and the chart on the summary sheet read;
      * columns 19+ and 24+: per-sector score and smart-money tables for up to
        three sectors, written only when history rows are available.

    Args:
        wb: Workbook that receives the sheet.
        data: Sector analysis payload. Uses ``timeline`` and, as a fallback
            history source, ``timeline_history`` or ``history``.
        source_data: Optional mapping whose ``sector_flow_history`` list is
            the preferred history source. History items are read through the
            keys ``Sector``, ``Snapshot_Date``, ``Sector_Score`` and
            ``SmartMoney_5D_KRW``.
    """
    ws = wb.create_sheet("sector_trend_timeline")
    style_sheet(ws)
    style_title(ws, "섹터 시계열", "최근 스냅샷 기준 경향성 추세", end_col=10)
    headers = [
        "snapshot_date", "sector_count", "avg_sector_score", "top_sector", "top_sector_score",
        "positive_breadth_count", "liquidity_warn_count", "net_smart_money_5d_krw",
        "top_sector_rank", "top_sector_smart_money_5d_krw",
    ]
    rows = []
    for row in data.get("timeline") or []:
        parsed_date = row.get("snapshot_date", "")
        if isinstance(parsed_date, str) and parsed_date:
            try:
                parsed_date = datetime.fromisoformat(parsed_date.replace("Z", "+00:00")).date()
            except ValueError:
                # Not an ISO date: keep the original text in the cell.
                pass
        rows.append([parsed_date] + [row.get(key, "") for key in headers[1:]])
    write_table(ws, 4, 1, headers, rows)

    helper_col = 12
    helper_headers = [
        "snapshot_date", "avg_sector_score", "top_sector_score",
        "positive_breadth_count", "liquidity_warn_count", "net_smart_money_5d_krw",
    ]
    helper_rows = [[row[0], row[2], row[4], row[5], row[6], row[7]] for row in rows]
    write_table(ws, 4, helper_col, helper_headers, helper_rows)
    ws.freeze_panes = "A5"
    set_col_widths(ws, {"A": 14, "B": 12, "C": 14, "D": 16, "E": 14, "F": 16, "G": 16, "H": 18, "I": 14, "J": 18})

    if helper_rows:
        chart = LineChart()
        chart.title = "Trend Score / Breadth / Liquidity"
        chart.y_axis.title = "Count / Score"
        chart.x_axis.title = "Snapshot"
        chart.height = 8
        chart.width = 15
        last_helper_row = 4 + len(helper_rows)
        data_ref = Reference(ws, min_col=helper_col + 1, max_col=helper_col + 5, min_row=4, max_row=last_helper_row)
        cats = Reference(ws, min_col=helper_col, min_row=5, max_row=last_helper_row)
        chart.add_data(data_ref, titles_from_data=True, from_rows=False)
        chart.set_categories(cats)
        chart.style = 3
        ws.add_chart(chart, "L4")

    history_rows = []
    if isinstance(source_data, dict):
        history_rows = source_data.get("sector_flow_history") or []
    if not history_rows:
        history_rows = data.get("timeline_history") or data.get("history") or []
    if not (isinstance(history_rows, list) and history_rows):
        return

    # Index the history once: the first item seen per (sector, date) wins.
    first_match: dict[tuple[str, str], dict] = {}
    snapshot_dates: set[str] = set()
    for item in history_rows:
        if not isinstance(item, dict):
            continue
        snapshot = str(item.get("Snapshot_Date") or "")
        if snapshot:
            snapshot_dates.add(snapshot)
        sector = str(item.get("Sector") or "").strip()
        if sector:
            first_match.setdefault((sector, snapshot), item)

    # NOTE(review): the sectors are the distinct "top_sector" values of the
    # first three timeline rows, so fewer than three may result — confirm
    # this is the intended "top 3" selection.
    top_sectors = list(dict.fromkeys(
        str(row[3]) for row in rows[:3] if len(row) > 3 and row[3]
    ))[:3]
    if not top_sectors:
        return
    recent_dates = sorted(snapshot_dates)[-8:]

    def add_sector_series(start_col, suffix, value_key, value_width, title, y_title, style, anchor):
        # Write one "date + one column per sector" table and chart it.
        series_headers = ["snapshot_date"] + [f"{sector}_{suffix}" for sector in top_sectors]
        series_rows = [
            [snapshot] + [first_match.get((sector, snapshot), {}).get(value_key, "") for sector in top_sectors]
            for snapshot in recent_dates
        ]
        write_table(ws, 4, start_col, series_headers, series_rows)
        ws.column_dimensions[get_column_letter(start_col)].width = 14
        for offset in range(1, len(series_headers)):
            ws.column_dimensions[get_column_letter(start_col + offset)].width = value_width
        if not series_rows:
            return
        last_row = 4 + len(series_rows)
        series_chart = LineChart()
        series_chart.title = title
        series_chart.y_axis.title = y_title
        series_chart.x_axis.title = "Snapshot"
        series_chart.height = 8
        series_chart.width = 15
        series_chart.add_data(
            Reference(ws, min_col=start_col + 1, max_col=start_col + len(top_sectors), min_row=4, max_row=last_row),
            titles_from_data=True,
            from_rows=False,
        )
        series_chart.set_categories(Reference(ws, min_col=start_col, min_row=5, max_row=last_row))
        series_chart.style = style
        ws.add_chart(series_chart, anchor)

    # The helper table owns columns 12-17. Writing the score table at column
    # 12 replaced the helper values that two charts reference, so the sector
    # tables start after a spacer column: 19-22 and 24-27.
    score_start = helper_col + len(helper_headers) + 1
    money_start = score_start + 1 + 3 + 1
    add_sector_series(score_start, "score", "Sector_Score", 16, "Top Sector Score Trend", "Score", 2, "L20")
    add_sector_series(money_start, "smart_money", "SmartMoney_5D_KRW", 18, "Top Sector Smart Money Trend", "KRW", 3, "L36")
|
|
|
|
|
|
def build_etf_summary(wb, data: dict) -> None:
    """Create the ``etf_representative_summary`` sheet from the ETF monitor *data*.

    The sheet holds a KPI block (top-level keys plus ``data["summary"]``
    counters), four fixed principle lines and the ``top_rep_names`` list.
    """
    sheet = wb.create_sheet("etf_representative_summary")
    style_sheet(sheet)
    style_title(
        sheet,
        "ETF 대표 종목 요약",
        "ETF 구성비중 우선, 부족분은 유동성 우선 후보로 보강한 3종목 바스켓 요약",
        end_col=8,
    )

    summary = data.get("summary") or {}
    summary_keys = (
        "complete_basket_count",
        "partial_basket_count",
        "basket_missing_total",
        "weighted_basis_count",
        "fallback_basis_count",
        "selected_sector_count",
    )
    kpis = [(key, data.get(key, "")) for key in ("formula_id", "gate")]
    kpis += [(key, data.get(key, 0)) for key in ("etf_sector_count", "tracked_count")]
    kpis += [(key, summary.get(key, 0)) for key in summary_keys]
    add_kpi_block(sheet, 4, kpis)

    def section_label(ref, text):
        cell = sheet[ref]
        cell.value = text
        cell.fill = SUBHEADER_FILL
        cell.font = BOLD_FONT

    section_label("D4", "Representative principle")
    principles = (
        "1) ETF constituent weight first",
        "2) Missing slots filled with same-sector live candidates",
        "3) Missing data stays explicit as DATA_MISSING",
        "4) Minimum 3 names per sector basket",
    )
    for offset, line in enumerate(principles):
        sheet.cell(5 + offset, 4).value = line

    section_label("G4", "Top reps")
    for offset, rep_name in enumerate(summary.get("top_rep_names") or []):
        sheet.cell(5 + offset, 7).value = rep_name

    set_col_widths(sheet, {"A": 28, "B": 18, "C": 16, "D": 30, "E": 18, "F": 18, "G": 24, "H": 24})
|
|
|
|
|
|
def build_etf_monitor(wb, data: dict) -> None:
    """Create the ``etf_representative_monitor`` sheet from ``data["rows"]``.

    Each entry of ``data["rows"]`` becomes one table row; a key that is absent
    from an entry is written as an empty string. The sheet gets an auto-filter
    over the table and a bar chart of the ``basket_coverage_pct`` column.
    """
    sheet = wb.create_sheet("etf_representative_monitor")
    style_sheet(sheet)
    style_title(
        sheet,
        "ETF 대표 종목 모니터",
        "섹터별 3종목 바스켓과 선택 근거, 커버리지, 품질 상태를 추적",
        end_col=18,
    )

    headers = [
        "sector",
        "etf_proxy_ticker",
        "etf_proxy_name",
        "etf_proxy_type",
        "sector_rank",
        "sector_score",
        "sector_smart_money_5d_krw",
        "sector_ret20d",
        "representative_count",
        "representative_ticker",
        "representative_name",
        "representative_basis",
        "representative_basis_detail",
        "constituent_weight",
        "basket_quality_state",
        "basket_coverage_pct",
        "basket_state",
        "basket_buy_review_count",
        "basket_track_count",
        "basket_watch_count",
        "basket_caution_count",
        "basket_aligned_count",
        "basket_missing_count",
        "basket_real_count",
        "selection_source",
        "selection_score",
        "monitor_reason",
    ]
    table = [[entry.get(key, "") for key in headers] for entry in (data.get("rows") or [])]
    last_row = 4 + len(table)

    write_table(sheet, 4, 1, headers, table)
    sheet.auto_filter.ref = f"A4:{get_column_letter(len(headers))}{last_row}"
    sheet.freeze_panes = "A5"

    column_widths = {
        "A": 18, "B": 12, "C": 16, "D": 12, "E": 12, "F": 12, "G": 18, "H": 12,
        "I": 14, "J": 12, "K": 18, "L": 18, "M": 24, "N": 14, "O": 14,
        "P": 14, "Q": 14, "R": 14, "S": 14, "T": 14, "U": 14, "V": 14,
        "W": 14, "X": 18, "Y": 14, "Z": 12, "AA": 24,
    }
    for letter in column_widths:
        sheet.column_dimensions[letter].width = column_widths[letter]

    coverage_col = headers.index("basket_coverage_pct") + 1
    bar = BarChart()
    bar.type = "bar"
    bar.style = 10
    bar.title = "Basket Coverage by Sector"
    bar.y_axis.title = "Sector"
    bar.x_axis.title = "Coverage %"
    bar.height = 8
    bar.width = 14
    bar.add_data(Reference(sheet, min_col=coverage_col, min_row=4, max_row=last_row), titles_from_data=True)
    bar.set_categories(Reference(sheet, min_col=1, min_row=5, max_row=last_row))
    bar.legend = None
    sheet.add_chart(bar, "AC4")
|
|
|
|
|
|
def main() -> None:
    """Build the enhanced workbook and save it to ``OUTPUT_XLSX``.

    Loads ``INPUT_XLSX`` plus the sector and ETF JSON files, drops any sheets
    left over from a previous run, rebuilds the generated sheets, moves them
    (after ``settings``) to the front of the workbook and saves the result as
    a separate file. ``GatherTradingData.json`` next to the workbook is an
    optional extra source for the timeline sheet.

    Raises:
        FileNotFoundError: If the input workbook or one of the two required
            JSON files does not exist.
    """
    for required in (INPUT_XLSX, SECTOR_JSON, ETF_JSON):
        if not required.exists():
            raise FileNotFoundError(required)
    raw_json_path = ROOT / "GatherTradingData.json"

    sector = load_json(SECTOR_JSON)
    etf = load_json(ETF_JSON)
    raw_data = load_json(raw_json_path) if raw_json_path.exists() else {}
    # The raw export is optional: a JSON document whose top level is not an
    # object (or that has no "data" object) counts as "no extra source"
    # instead of failing on ``.get``.
    raw_payload = raw_data.get("data") if isinstance(raw_data, dict) else None
    raw_source = raw_payload if isinstance(raw_payload, dict) else {}

    wb = load_workbook(INPUT_XLSX)
    generated_sheets = [
        "portfolio_performance_summary",
        "portfolio_sector_exposure",
        "_portfolio_holdings_helper",
        "sector_trend_summary",
        "sector_trend_analysis",
        "sector_trend_timeline",
        "etf_representative_summary",
        "etf_representative_monitor",
    ]
    for name in generated_sheets:
        remove_if_exists(wb, name)

    # Build data sheets first so summary sheets can reference the timeline sheet.
    build_portfolio_summary(wb)
    build_portfolio_sector_exposure(wb)
    build_sector_timeline(wb, sector, raw_source)
    build_sector_analysis(wb, sector)
    build_sector_summary(wb, sector)
    build_etf_monitor(wb, etf)
    build_etf_summary(wb, etf)

    # Put summary sheets near the front.
    order = [
        "settings",
        "portfolio_performance_summary",
        "portfolio_sector_exposure",
        "sector_trend_summary",
        "sector_trend_analysis",
        "sector_trend_timeline",
        "etf_representative_summary",
        "etf_representative_monitor",
    ]
    existing = [s for s in wb.sheetnames if s not in order]
    # NOTE: this assigns the workbook's private sheet list directly.
    wb._sheets = [wb[s] for s in order if s in wb.sheetnames] + [wb[s] for s in existing]
    if "_portfolio_holdings_helper" in wb.sheetnames:
        wb["_portfolio_holdings_helper"].sheet_state = "hidden"
    wb.active = wb.sheetnames.index("sector_trend_summary")

    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    wb.save(OUTPUT_XLSX)
    print(f"saved {OUTPUT_XLSX}")
    print("sheets", wb.sheetnames[:10])
|
|
|
|
|
|
# Run the workbook build only when the file is executed as a script.
if __name__ == "__main__":
    main()
|