#!/usr/bin/env python3 """ render_operational_report.py — 30개 섹션 완전 렌더링. 섹션 처리 오류는 section_errors 배열에 기록되어 하네스 검증에 노출된다. """ from __future__ import annotations import argparse import json import sys from datetime import datetime, timezone from pathlib import Path from typing import Any ROOT = Path(__file__).resolve().parents[1] if str(ROOT) not in sys.path: sys.path.insert(0, str(ROOT)) from src.quant_engine.etf_representative_monitor import build_etf_representative_monitor from src.quant_engine.sector_trend_analysis import build_sector_trend_analysis SECTION_ORDER = [ "exec_safety_declaration", "final_judgment_table", "final_execution_decision", "concise_hts_input_sheet", "watch_breakout_gate", "single_conclusion", "immediate_execution_playbook", "market_context_learning_note", "portfolio_performance_summary", "portfolio_sector_exposure_summary", "sector_trend_analysis_v1", "etf_representative_monitor_v1", "investment_quality_headline", "operational_truth_score", "execution_readiness_matrix", "pass_100_criteria", "today_decision_summary_card", "routing_serving_trace", "export_gate_diagnosis", "QEH_AUDIT_BLOCK", "fundamental_quality_gate_v1", "horizon_allocation_lock_v1", "smart_money_liquidity_gate_v1", "routing_serving_trace_v2", "fundamental_multifactor_v2", "earnings_growth_quality_v1", "market_share_proxy_v1", "cashflow_stability_v1", "routing_decision_explain_v1", "benchmark_relative_harness_table", "index_relative_health_table", "entry_freshness_gate_table", "sell_value_preservation_gate_table", "watch_release_checklist", "alpha_feedback_loop_report", "backdata_feature_bank_table", "alpha_lead_table", "anti_distribution_table", "profit_preservation_table", "smart_cash_raise_table", "execution_quality_table", "sell_priority_decision_table", "strategy_performance_scoreboard", "outcome_eval_window_monitor", "decision_trace_table", "anti_whipsaw_reentry_gate", "proposal_reference_sheet", "satellite_buy_proposal_sheet", "core_satellite_timing_gate_table", "engine_feedback_loop_report", "prediction_evaluation_improvement_report", "rule_lifecycle_governance_report", ] SECTION_TITLES = { "exec_safety_declaration": "집행 안전 선언", "final_judgment_table": "최종 판단 테이블", "final_execution_decision": "최종 실행 결정", "concise_hts_input_sheet": "HTS 입력 요약표", "watch_breakout_gate": "투명한 감시 원장 / 돌파 감시 게이트", "single_conclusion": "단일 결론", "immediate_execution_playbook": "즉시 실행 플레이북", "market_context_learning_note": "시장 컨텍스트 학습 노트", "portfolio_performance_summary": "포트폴리오 성과 요약", "portfolio_sector_exposure_summary": "포트폴리오 섹터 노출", "sector_trend_analysis_v1": "섹터 동향 분석", "etf_representative_monitor_v1": "ETF 대표 종목 모니터", "investment_quality_headline": "투자 품질 헤드라인", "operational_truth_score": "운영 진실성 점수", "execution_readiness_matrix": "실행 준비도 매트릭스", "pass_100_criteria": "PASS_100 기준", "today_decision_summary_card": "오늘의 의사결정 요약 카드", "routing_serving_trace": "라우팅 서빙 추적", "export_gate_diagnosis": "내보내기 게이트 진단", "QEH_AUDIT_BLOCK": "QEH 감사 블록", "fundamental_quality_gate_v1": "FUNDAMENTAL_QUALITY_GATE_V1", "horizon_allocation_lock_v1": "HORIZON_ALLOCATION_LOCK_V1", "smart_money_liquidity_gate_v1": "SMART_MONEY_LIQUIDITY_GATE_V1", "routing_serving_trace_v2": "ROUTING_SERVING_DECISION_TRACE_V2", "fundamental_multifactor_v2": "FUNDAMENTAL_MULTI_FACTOR_SCORE_V2", "earnings_growth_quality_v1": "EARNINGS_GROWTH_QUALITY_GATE_V1", "market_share_proxy_v1": "MARKET_SHARE_MOMENTUM_PROXY_V1", "cashflow_stability_v1": "CASHFLOW_STABILITY_GATE_V1", "routing_decision_explain_v1": "ROUTING_DECISION_EXPLAIN_LOCK_V1", "benchmark_relative_harness_table": "benchmark_relative_harness_table", "index_relative_health_table": "index_relative_health_table", "entry_freshness_gate_table": "entry_freshness_gate_table", "sell_value_preservation_gate_table": "sell_value_preservation_gate_table", "watch_release_checklist": "watch_release_checklist", "alpha_feedback_loop_report": "alpha_feedback_loop_report", "backdata_feature_bank_table": "백데이터 특성 원장", "alpha_lead_table": "알파 선행 테이블", "anti_distribution_table": "분산 매도 위험 테이블", "profit_preservation_table": "수익 보존 테이블", "smart_cash_raise_table": "현금 확보 테이블", "execution_quality_table": "체결 품질 테이블", "sell_priority_decision_table": "매도 우선순위 결정 테이블", "strategy_performance_scoreboard": "전략 성과 스코어보드", "outcome_eval_window_monitor": "성과 평가 윈도우 모니터", "decision_trace_table": "판단 추적 테이블", "anti_whipsaw_reentry_gate": "반등 재진입 감시 게이트", "proposal_reference_sheet": "제안 참조 시트", "satellite_buy_proposal_sheet": "위성 신규 매수 제안 원장", "core_satellite_timing_gate_table": "코어·위성 타이밍 게이트", "engine_feedback_loop_report": "엔진 피드백 루프 보고서", "prediction_evaluation_improvement_report": "예측 평가 보고서", "rule_lifecycle_governance_report": "규칙 생애주기 거버넌스 보고서", } # ── 공통 유틸 ───────────────────────────────────────────────────────────────── def _sj(v: Any) -> Any: if isinstance(v, (list, dict)): return v if isinstance(v, str): s = v.strip() if s and s[0] in ('[', '{'): try: return json.loads(s) except Exception: pass return v def _kv(rows: list[tuple[str, Any]]) -> str: lines = ["| 항목 | 값 |", "| --- | --- |"] for k, v in rows: lines.append(f"| {k} | {v} |") return "\n".join(lines) def _tbl(items: list[dict], keys: list[str], max_rows: int = 50) -> str: if not items: return "_데이터 없음_" valid_keys = [k for k in keys if k] if not valid_keys: valid_keys = list(items[0].keys())[:6] if isinstance(items[0], dict) else [] header = "| " + " | ".join(valid_keys) + " |" sep = "| " + " | ".join(["---"] * len(valid_keys)) + " |" rows = [] for item in items[:max_rows]: row = "| " + " | ".join(str(item.get(k, "")).replace("|", "ㅣ") for k in valid_keys) + " |" rows.append(row) suffix = f"\n\n_...총 {len(items)}행 중 {max_rows}행 표시_" if len(items) > max_rows else "" all_lines = [header, sep] all_lines.extend(rows) return "".join(["\n".join(all_lines), suffix]) def _err(section_errors: list, name: str, reason: str) -> str: section_errors.append({"section": name, "error": reason}) return f"**[오류] {name}: {reason}**" def _first_keys(items: list, n: int = 6) -> list[str]: if items and isinstance(items[0], dict): return list(items[0].keys())[:n] return [] def _num(value: Any, default: float = 0.0) -> float: try: return float(value) except Exception: return default def _sparkline(values: list[Any]) -> str: points: list[float] = [] for value in values: try: points.append(float(value)) except Exception: continue if not points: return "n/a" lo = min(points) hi = max(points) bars = "▁▂▃▄▅▆▇█" if hi == lo: return bars[len(bars) // 2] * len(points) out = [] for value in points: idx = int(round((value - lo) / (hi - lo) * (len(bars) - 1))) out.append(bars[max(0, min(len(bars) - 1, idx))]) return "".join(out) # ── PHASE-0 렌더러 ──────────────────────────────────────────────────────────── def _exec_safety_declaration(hctx: dict, se: list) -> str: cr = _sj(hctx.get("consistency_report_json", {})) if not isinstance(cr, dict): return _err(se, "exec_safety_declaration", "consistency_report_json 파싱 실패") allowed = hctx.get("allowed_actions", []) blocked = hctx.get("blocked_actions", []) return "## CORE-0 집행 안전 선언\n\n" + _kv([ ("일관성 점수", cr.get("consistency_score", hctx.get("consistency_score", ""))), ("CV 판정", cr.get("cv_verdict", hctx.get("cv_verdict", ""))), ("차단 상태", cr.get("block_status", "")), ("현금 바닥 상태", hctx.get("cash_floor_status", "")), ("허용 액션", ", ".join(allowed) if isinstance(allowed, list) else str(allowed)), ("차단 액션", ", ".join(blocked) if isinstance(blocked, list) else str(blocked)), ("하네스 생성 상태", hctx.get("harness_generation_status", "N/A")), ]) def _final_judgment_table(hctx: dict, se: list) -> str: items = _sj(hctx.get("decisions_json", [])) if not isinstance(items, list) or not items: return _err(se, "final_judgment_table", "decisions_json 없음") return _tbl(items, ["ticker", "name", "base_action", "final_action", "gate_changed", "rs_verdict"]) def _final_execution_decision(hctx: dict, se: list) -> str: eg = _sj(hctx.get("export_gate_json", {})) if not isinstance(eg, dict): return _err(se, "final_execution_decision", "export_gate_json 파싱 실패") failed = eg.get("failed_checks", []) return _kv([ ("내보내기 게이트 상태", eg.get("export_gate_status", "")), ("JSON 검증 상태", eg.get("json_validation_status", "")), ("HTS 입력 허용", eg.get("hts_entry_allowed", "")), ("모든 검사 통과", eg.get("all_checks_passed", "")), ("실패 검사", ", ".join(str(f) for f in failed) if isinstance(failed, list) and failed else "없음"), ("CLA 종료 상태", hctx.get("cla_exit_status", "N/A")), ("하네스 생성 상태", hctx.get("harness_generation_status", "N/A")), ]) def _concise_hts_input_sheet(hctx: dict, se: list) -> str: items = _sj(hctx.get("decisions_json", [])) if not isinstance(items, list) or not items: return _err(se, "concise_hts_input_sheet", "decisions_json 없음") return _tbl(items, ["ticker", "name", "final_action", "gate_trace", "rs_verdict"]) def _watch_breakout_gate(hctx: dict, se: list) -> str: bq = _sj(hctx.get("breakout_quality_gate_json", [])) vel = _sj(hctx.get("anti_chasing_velocity_json", [])) parts = [_kv([ ("돌파 감시 판정", hctx.get("anti_chasing_verdict", "N/A")), ("돌파 품질 점수", hctx.get("breakout_quality_score", "N/A")), ("기준시점(종가/장중)", hctx.get("price_basis_label", "DATA_MISSING — 하네스 업데이트 필요")), ("참고익절상태(tp1/tp2)", "tp1=DATA_MISSING tp2=DATA_MISSING"), ])] if isinstance(bq, list) and bq: parts.append("\n\n**돌파 품질 게이트**\n\n" + _tbl(bq, _first_keys(bq))) if isinstance(vel, list) and vel: parts.append("\n\n**반추격 속도**\n\n" + _tbl(vel, _first_keys(vel))) return "".join(parts) # ── PHASE-1 렌더러 ──────────────────────────────────────────────────────────── def _single_conclusion(hctx: dict, se: list) -> str: allowed = hctx.get("allowed_actions", []) blocked = hctx.get("blocked_actions", []) return _kv([ ("현금 현황 (D2%)", hctx.get("cash_current_pct_d2", "")), ("현금 목표(%)", hctx.get("cash_target_pct", "")), ("현금 바닥 상태", hctx.get("cash_floor_status", "")), ("허용 액션", ", ".join(allowed) if isinstance(allowed, list) else str(allowed)), ("차단 액션", ", ".join(blocked) if isinstance(blocked, list) else str(blocked)), ("매수 여력 (KRW)", hctx.get("buy_power_krw", "")), ("현금 부족액 (KRW)", hctx.get("cash_shortfall_min_krw", "")), ("목표 달성율(%)", hctx.get("goal_achievement_pct", "")), ("목표 상태", hctx.get("goal_status", "")), ]) def _immediate_execution_playbook(hctx: dict, se: list) -> str: items = _sj(hctx.get("decisions_json", [])) plan = _sj(hctx.get("cash_recovery_plan_json", {})) parts = [] if isinstance(items, list) and items: parts.append("**실행 결정**\n\n" + _tbl(items, ["ticker", "name", "final_action", "gate_trace"])) else: parts.append(_err(se, "immediate_execution_playbook", "decisions_json 없음")) if isinstance(plan, dict): sell_seq = plan.get("sell_sequence", "") parts.append("\n\n**현금 회수 계획**\n\n" + _kv([ ("매도 시퀀스", str(sell_seq)[:120]), ("예상 즉시 회수 (KRW)", plan.get("expected_total_krw", "")), ("부족액 충족", plan.get("shortfall_met", "")), ("필요 건수", plan.get("items_needed", "")), ])) return "".join(parts) def _market_context_learning_note(hctx: dict, se: list) -> str: macro = _sj(hctx.get("macro_event_json", {})) regime = _sj(hctx.get("regime_transition_json", {})) rows = [("BRT 판정", hctx.get("brt_verdict", "N/A"))] if isinstance(macro, dict): rows += [ ("매크로 위험 점수", macro.get("macro_risk_score", "")), ("매크로 위험 레짐", macro.get("macro_risk_regime", "")), ] if isinstance(regime, dict): rows += [ ("레짐 전환 유형", regime.get("transition_type", "")), ("이전 레짐", regime.get("prev_regime", "")), ("현재 레짐", regime.get("current_regime", regime.get("cur_regime", ""))), ] rows.append(("열 게이트 상태", hctx.get("heat_gate_status", "N/A"))) return _kv(rows) def _portfolio_performance_summary(data_root: dict, hctx: dict, se: list) -> str: def _display(v: Any) -> Any: return v if v not in (None, "") else "DATA_MISSING — 하네스 업데이트 필요" data = data_root.get("data", {}) if isinstance(data_root.get("data"), dict) else {} daily = _sj(data.get("daily_history", [])) monthly = _sj(data.get("monthly_history", [])) account = _sj(data.get("account_snapshot", [])) if not isinstance(daily, list): daily = [] if not isinstance(monthly, list): monthly = [] if not isinstance(account, list): account = [] latest_daily = daily[-1] if daily else {} latest_month = monthly[-1] if monthly else {} latest_capture = "" latest_holdings: list[dict[str, Any]] = [] for row in account: if not isinstance(row, dict): continue cap = str(row.get("captured_at", "") or "") if cap and cap >= latest_capture: latest_capture = cap if latest_capture: latest_holdings = [r for r in account if isinstance(r, dict) and str(r.get("captured_at", "") or "") == latest_capture] asset_series = [] mdd_series = [] monthly_return_series = [] for row in daily[-10:]: if isinstance(row, dict): asset_series.append(row.get("Total_Asset_KRW", row.get("total_asset_krw", ""))) mdd_series.append(row.get("MDD_Pct", row.get("mdd_pct", ""))) for row in monthly[-10:]: if isinstance(row, dict): monthly_return_series.append(row.get("Actual_Return_Pct", row.get("actual_return_pct", ""))) rows = [ ("최신 일간 자산", _display(latest_daily.get("Total_Asset_KRW", latest_daily.get("total_asset_krw", "")))), ("최신 일간 MDD(%)", _display(latest_daily.get("MDD_Pct", latest_daily.get("mdd_pct", "")))), ("최신 월간 자산", _display(latest_month.get("Total_Asset", latest_month.get("total_asset", "")))), ("최신 월간 실현 수익률(%)", _display(latest_month.get("Actual_Return_Pct", latest_month.get("actual_return_pct", "")))), ("최신 월간 MoM 수익률(%)", _display(latest_month.get("MoM_Return_Pct", latest_month.get("mom_return_pct", "")))), ("최신 월간 YTD 수익률(%)", _display(latest_month.get("YTD_Return_Pct", latest_month.get("ytd_return_pct", "")))), ("최신 스냅샷 시각", _display(latest_capture or hctx.get("captured_at", ""))), ("최신 보유 수", _display(len(latest_holdings))), ] md = "## 포트폴리오 성과 요약\n\n" + _kv(rows) md += "\n\n**일간 자산 추이** \n" + _sparkline(asset_series) md += "\n\n**일간 MDD 추이** \n" + _sparkline(mdd_series) md += "\n\n**월간 수익률 추이** \n" + _sparkline(monthly_return_series) if latest_holdings: md += "\n\n**최신 보유 상위 스냅샷**\n\n" md += _tbl(latest_holdings[:10], ["name", "ticker", "holding_quantity", "market_value", "return_pct"], max_rows=10) else: md += "\n\n_최신 보유 스냅샷 없음_" return md def _portfolio_sector_exposure_summary(data_root: dict, hctx: dict, se: list) -> str: data = data_root.get("data", {}) if isinstance(data_root.get("data"), dict) else {} account = _sj(data.get("account_snapshot", [])) universe = _sj(data.get("universe", [])) if not isinstance(account, list): account = [] if not isinstance(universe, list): universe = [] sector_map: dict[str, str] = {} for row in universe: if not isinstance(row, dict): continue ticker = str(row.get("Ticker", "") or "").zfill(6) sector = str(row.get("Sector", "") or "").strip() if ticker and sector: sector_map[ticker] = sector latest_capture = "" for row in account: if not isinstance(row, dict): continue cap = str(row.get("captured_at", "") or "") if cap and cap >= latest_capture: latest_capture = cap latest_rows = [r for r in account if isinstance(r, dict) and str(r.get("captured_at", "") or "") == latest_capture] if not latest_rows: return "## 포트폴리오 섹터 노출\n\n_섹터 노출 데이터 없음_" exposure: dict[str, dict[str, float]] = {} holdings_by_sector: dict[str, list[dict[str, Any]]] = {} total_mv = 0.0 for row in latest_rows: ticker = str(row.get("ticker", "") or "").zfill(6) sector = sector_map.get(ticker, "미분류") mv = _num(row.get("market_value", 0)) pl = _num(row.get("profit_loss", 0)) cost = _num(row.get("total_cost", 0)) total_mv += mv bucket = exposure.setdefault(sector, {"market_value": 0.0, "profit_loss": 0.0, "cost": 0.0, "count": 0.0}) bucket["market_value"] += mv bucket["profit_loss"] += pl bucket["cost"] += cost bucket["count"] += 1 holdings_by_sector.setdefault(sector, []).append({ "ticker": ticker, "name": row.get("name", ""), "market_value": mv, "profit_loss": pl, "return_pct": row.get("return_pct", ""), }) total_mv = total_mv or 1.0 sector_rows = [] for sector, vals in sorted(exposure.items(), key=lambda kv: kv[1]["market_value"], reverse=True): pct = vals["market_value"] / total_mv * 100.0 ret_pct = (vals["profit_loss"] / vals["cost"] * 100.0) if vals["cost"] else 0.0 sector_rows.append({ "sector": sector, "holding_count": int(vals["count"]), "market_value": round(vals["market_value"], 2), "weight_pct": round(pct, 2), "profit_loss": round(vals["profit_loss"], 2), "return_pct": round(ret_pct, 2), }) top_sector = sector_rows[0]["sector"] if sector_rows else "" top_sector_weight = sector_rows[0]["weight_pct"] if sector_rows else 0 top3_weight = sum(r["weight_pct"] for r in sector_rows[:3]) if sector_rows else 0 weights_line = _sparkline([r["weight_pct"] for r in sector_rows[:10]]) md = "## 포트폴리오 섹터 노출\n\n" md += _kv([ ("최신 스냅샷", latest_capture), ("섹터 수", len(sector_rows)), ("최대 섹터", top_sector), ("Top1 비중(%)", top_sector_weight), ("Top3 비중(%)", top3_weight), ("총 시장가치", round(total_mv, 2)), ("섹터 집중도 그래프", weights_line), ("섹터 집중 게이트", hctx.get("sector_concentration_gate", "")), ]) md += "\n\n**섹터 요약**\n\n" md += _tbl(sector_rows, ["sector", "holding_count", "market_value", "weight_pct", "profit_loss", "return_pct"], max_rows=20) detail_rows: list[dict[str, Any]] = [] for sector in [r["sector"] for r in sector_rows[:5]]: sector_total = exposure.get(sector, {}).get("market_value", 0.0) or 1.0 holdings = sorted(holdings_by_sector.get(sector, []), key=lambda item: _num(item.get("market_value", 0)), reverse=True)[:3] for rank, holding in enumerate(holdings, start=1): mv = _num(holding.get("market_value", 0)) detail_rows.append({ "sector": sector if rank == 1 else "", "rank_in_sector": rank, "ticker": holding.get("ticker", ""), "name": holding.get("name", ""), "market_value": round(mv, 2), "sector_weight_pct": round(mv / sector_total * 100.0, 2), "portfolio_weight_pct": round(mv / total_mv * 100.0, 2), "return_pct": holding.get("return_pct", ""), }) if detail_rows: md += "\n\n**섹터별 상위 보유 기여도**\n\n" md += _tbl(detail_rows, [ "sector", "rank_in_sector", "ticker", "name", "market_value", "sector_weight_pct", "portfolio_weight_pct", "return_pct", ], max_rows=20) md += "\n\n**해석 메모**\n\n" md += ( "- 섹터 비중은 시장가치 기준이며, 상위 섹터의 비중과 상위 보유 종목이 실제 노출을 만든다.\n" "- 같은 섹터 안에서도 상위 3종목이 노출 대부분을 설명하는지 확인해야 한다.\n" "- ETF 프록시와 직접 보유 종목이 엇갈리면, 섹터 베타와 개별 종목 리스크를 분리해서 봐야 한다.\n" ) return md def _sector_trend_analysis_v1(data_root: dict, hctx: dict, se: list) -> str: inner_data = data_root.get("data", {}) if isinstance(data_root.get("data"), dict) else {} payload = {"data": inner_data, "data_root": data_root, "_harness_context": hctx} result = build_sector_trend_analysis(payload) if not isinstance(result, dict) or not result: return _err(se, "sector_trend_analysis_v1", "sector trend analysis unavailable") summary = result.get("summary") if isinstance(result.get("summary"), dict) else {} concentration = result.get("concentration") if isinstance(result.get("concentration"), dict) else {} rows = [ ("최신 스냅샷", result.get("latest_snapshot_date", "")), ("이전 스냅샷", result.get("previous_snapshot_date", "")), ("섹터 수", result.get("sector_count", "")), ("ETF 프록시 섹터 수", summary.get("etf_proxy_count", "")), ("상승 섹터 수", summary.get("rising_count", "")), ("하락 섹터 수", summary.get("fading_count", "")), ("정체 섹터 수", summary.get("stable_count", "")), ("탑아웃 섹터 수", summary.get("topping_out_count", "")), ("양(+) breadth", summary.get("positive_breadth_count", "")), ("스마트자금 유입", summary.get("smart_money_inflow_count", "")), ("스마트자금 유출", summary.get("smart_money_outflow_count", "")), ("수급 정렬", summary.get("flow_aligned_count", "")), ("수급 이탈", summary.get("flow_diverging_count", "")), ("프록시 저신뢰", summary.get("low_proxy_confidence_count", "")), ("트렌드 포지션", summary.get("trend_posture", "")), ("집중 섹터", concentration.get("top_sector", "")), ("집중도 Top1%", concentration.get("top_sector_weight_pct", "")), ("집중도 Top2%", concentration.get("top2_weight_pct", "")), ] md = _kv(rows) md += "\n\n**ETF/수급 교차 진단**\n\n" md += _kv([ ("ETF 프록시 커버리지(%)", result.get("source", {}).get("proxy_coverage_pct", "")), ("유동성 경고 섹터", ", ".join(summary.get("outflow_warning_sectors", [])[:3]) if isinstance(summary.get("outflow_warning_sectors"), list) else ""), ("스마트머니 강세", ", ".join(summary.get("strong_smart_money_sectors", [])[:3]) if isinstance(summary.get("strong_smart_money_sectors"), list) else ""), ]) md += "\n\n**최근 시계열 추세**\n\n" timeline = result.get("timeline") if isinstance(result.get("timeline"), list) else [] if timeline: recent_timeline = timeline[-6:] md += _tbl(recent_timeline, [ "snapshot_date", "sector_count", "avg_sector_score", "top_sector", "top_sector_score", "positive_breadth_count", "liquidity_warn_count", "net_smart_money_5d_krw", ], max_rows=6) score_line = _sparkline([r.get("avg_sector_score") for r in recent_timeline]) money_line = _sparkline([r.get("net_smart_money_5d_krw") for r in recent_timeline]) md += "\n\n| 추세 | 그래프 |\n| --- | --- |\n" md += f"| 섹터 평균 점수 | {score_line} |\n" md += f"| 5D 스마트머니 합계 | {money_line} |\n" else: md += "_시계열 데이터 없음_" md += "\n\n**섹터 상위 유입/경고**\n\n" md += _kv([ ("상위 유입", ", ".join(summary.get("top_inflow_sectors", [])[:3]) or "없음"), ("경고 섹터", ", ".join(summary.get("outflow_warning_sectors", [])[:3]) or "없음"), ("강한 수급", ", ".join(summary.get("strong_smart_money_sectors", [])[:3]) or "없음"), ]) rows_data = result.get("rows") if isinstance(result.get("rows"), list) else [] if rows_data: md += "\n\n**섹터 상세 트렌드**\n\n" + _tbl(rows_data, [ "sector", "proxy_ticker", "proxy_name", "proxy_type", "etf_execution_use", "etf_liquidity_status", "etf_nav_risk", "proxy_confidence", "rank", "rank_delta_w1", "rank_delta_w2", "sector_score", "score_delta", "sector_ret5d", "sector_ret20d", "etf_return_5d", "etf_return_20d", "sector_etf_ret_gap_5d", "sector_etf_ret_gap_20d", "smart_money_5d_krw_raw", "smart_money_20d_krw_raw", "smart_money_direction", "flow_breadth_5d_raw", "liquidity_direction", "flow_alignment_state", "alert_level", "decision_use", "momentum_state", "concentration_weight_pct", ], max_rows=20) history_rows = data_root.get("data", {}).get("sector_flow_history", []) if isinstance(history_rows, list) and history_rows: sector_histories: dict[str, list[dict[str, Any]]] = {} for item in history_rows: if not isinstance(item, dict): continue sector = str(item.get("Sector") or "").strip() if not sector: continue sector_histories.setdefault(sector, []).append(item) tracked = [r.get("sector") for r in rows_data[:6] if r.get("sector")] spark_rows = [] for sector in tracked: series = sorted(sector_histories.get(sector, []), key=lambda r: str(r.get("Snapshot_Date") or "")) latest_row = next((r for r in rows_data if r.get("sector") == sector), {}) spark_rows.append({ "sector": sector, "score_trend": _sparkline([r.get("Sector_Score") for r in series[-6:]]), "smart_money_trend": _sparkline([r.get("SmartMoney_5D_KRW") for r in series[-6:]]), "latest_score": series[-1].get("Sector_Score", "") if series else "", "latest_smart_money_5d": series[-1].get("SmartMoney_5D_KRW", "") if series else "", "sector_ret20d": latest_row.get("sector_ret20d", ""), "smart_money_direction": latest_row.get("smart_money_direction", ""), "flow_alignment_state": latest_row.get("flow_alignment_state", ""), }) if spark_rows: md += "\n\n**섹터별 시계열 그래프**\n\n" md += _tbl(spark_rows, [ "sector", "score_trend", "smart_money_trend", "latest_score", "latest_smart_money_5d", "sector_ret20d", "smart_money_direction", "flow_alignment_state", ], max_rows=6) top3 = [r.get("sector") for r in rows_data[:3] if r.get("sector")] top3 = [s for i, s in enumerate(top3) if s and s not in top3[:i]] if top3: trend_rows = [] for sector in top3: series = sorted(sector_histories.get(sector, []), key=lambda r: str(r.get("Snapshot_Date") or ""))[-5:] trend_rows.append({ "sector": sector, "score_trend": _sparkline([r.get("Sector_Score") for r in series]), "ret20d_trend": _sparkline([r.get("Sector_Ret20D") for r in series]), "smart_money_trend": _sparkline([r.get("SmartMoney_5D_KRW") for r in series]), "latest_score": series[-1].get("Sector_Score", "") if series else "", "latest_ret20d": series[-1].get("Sector_Ret20D", "") if series else "", "latest_smart_money_5d": series[-1].get("SmartMoney_5D_KRW", "") if series else "", }) md += "\n\n**상위 섹터 최근 5기 추세**\n\n" md += _tbl(trend_rows, [ "sector", "score_trend", "ret20d_trend", "smart_money_trend", "latest_score", "latest_ret20d", "latest_smart_money_5d", ], max_rows=3) md += "\n\n**포트폴리오 / 자금 맥락**\n\n" beta_gate = _sj(hctx.get("portfolio_beta_gate_json", {})) corr_gate = _sj(hctx.get("portfolio_correlation_gate_json", {})) md += _kv([ ("목표 자산", hctx.get("goal_asset_krw", "")), ("현재 자산", hctx.get("goal_current_asset_krw", hctx.get("total_asset_krw", ""))), ("목표 달성율(%)", hctx.get("goal_achievement_pct", "")), ("목표 상태", hctx.get("goal_status", "")), ("남은 목표액", hctx.get("goal_remaining_krw", "")), ("ETA", hctx.get("goal_eta_label", "")), ("ETA(개월)", hctx.get("goal_eta_months", "")), ("수익 보전 단계", hctx.get("profit_lock_stage", hctx.get("profit_preservation_lock", ""))), ("포트폴리오 헬스", (hctx.get("portfolio_health_json", {}) or {}).get("label", hctx.get("portfolio_health_label", "")) if isinstance(hctx.get("portfolio_health_json", {}), dict) else hctx.get("portfolio_health_label", "")), ("포트폴리오 점수", (hctx.get("portfolio_health_json", {}) or {}).get("score", hctx.get("portfolio_health_score", "")) if isinstance(hctx.get("portfolio_health_json", {}), dict) else hctx.get("portfolio_health_score", "")), ("알파 신뢰도", hctx.get("portfolio_alpha_confidence", "")), ("드로우다운 상태", hctx.get("drawdown_guard_state", hctx.get("portfolio_drawdown_gate", ""))), ("베타 게이트", beta_gate.get("gate_status", beta_gate.get("gate", "")) if isinstance(beta_gate, dict) else ""), ("포트폴리오 베타", beta_gate.get("portfolio_beta", "") if isinstance(beta_gate, dict) else ""), ("상관 게이트", corr_gate.get("correlation_gate_status", "") if isinstance(corr_gate, dict) else ""), ("상관 유효베타", corr_gate.get("effective_portfolio_beta", "") if isinstance(corr_gate, dict) else ""), ]) md += "\n\n**개선 제안**\n\n" md += ( "- 섹터 수급은 ETF 프록시와 직접 스마트머니를 분리해서 보여주고, 둘이 어긋날 때 경고를 강화해야 합니다.\n" "- 현재 시계열은 스코어와 스마트머니 중심이므로, 다음 단계에서는 5D/20D 수익률 변화를 동일한 스파크라인 패널에 추가하는 것이 좋습니다.\n" "- 포트폴리오 자금 패널은 목표 달성율, 드로우다운, 베타, 알파 신뢰도를 함께 묶어 보여줘야 실제 투자 판단과 연결됩니다.\n" ) return md def _etf_representative_monitor_v1(data_root: dict, hctx: dict, se: list) -> str: inner_data = data_root.get("data", {}) if isinstance(data_root.get("data"), dict) else {} payload = {"data": inner_data, "data_root": data_root, "_harness_context": hctx} result = build_etf_representative_monitor(payload) if not isinstance(result, dict) or not result: return _err(se, "etf_representative_monitor_v1", "etf representative monitor unavailable") summary = result.get("summary") if isinstance(result.get("summary"), dict) else {} rows_data = result.get("rows") if isinstance(result.get("rows"), list) else [] md = _kv([ ("ETF 섹터 수", result.get("etf_sector_count", "")), ("추적 대표 종목 수", result.get("tracked_count", "")), ("BUY_REVIEW", summary.get("buy_review_count", "")), ("TRACK", summary.get("track_count", "")), ("WATCH", summary.get("watch_count", "")), ("CAUTION", summary.get("caution_count", "")), ("정렬(ETF vs 대표종목)", summary.get("aligned_count", "")), ("구성비중 기반", summary.get("weighted_basis_count", "")), ("리퀴디티 대체", summary.get("fallback_basis_count", "")), ("완전 바스켓", summary.get("complete_basket_count", "")), ("부분 바스켓", summary.get("partial_basket_count", "")), ("바스켓 미싱", summary.get("basket_missing_total", "")), ]) md += "\n\n**ETF 대표 종목 추출 원칙**\n\n" md += ( "- 대표 종목은 우선 ETF 구성비중이 가장 큰 종목을 선택하고, 그 종목이 현재 유동성/호가/추세 조건을 충족하는지로 계속 모니터링합니다.\n" "- 구성비중 데이터가 비어 있거나 비정상일 때만 같은 섹터의 유동성 우선 후보로 대체합니다.\n" "- BUY_REVIEW는 ETF 수급이 대표 종목의 추세와 같이 붙을 때만 후보로 승격합니다.\n" ) if rows_data: display_rows = [] for row in rows_data: reps = row.get("representatives", []) rep_names = [] rep_states = [] rep_weights = [] if isinstance(reps, list): for rep in reps[:3]: if isinstance(rep, dict): rep_names.append(f"{rep.get('name', '')}({rep.get('ticker', '')})") rep_states.append(str(rep.get("monitor_state", ""))) rep_weights.append(str(rep.get("weight", ""))) display_rows.append({ "sector": row.get("sector", ""), "etf_proxy_ticker": row.get("etf_proxy_ticker", ""), "etf_proxy_name": row.get("etf_proxy_name", ""), "representative_basket": " / ".join(rep_names), "representative_count": row.get("representative_count", ""), "basket_weights": ", ".join(rep_weights), "basket_states": ", ".join(rep_states), "representative_basis": row.get("representative_basis", ""), "representative_basis_detail": row.get("representative_basis_detail", ""), "basket_quality_state": row.get("basket_quality_state", ""), "basket_coverage_pct": row.get("basket_coverage_pct", ""), "selection_source": ", ".join(str(rep.get("selection_source", "")) for rep in reps[:3] if isinstance(rep, dict)), "selection_score": ", ".join(str(rep.get("selection_score", "")) for rep in reps[:3] if isinstance(rep, dict)), "basket_state": row.get("monitor_state", ""), "basket_buy_review_count": row.get("basket_buy_review_count", ""), "basket_caution_count": row.get("basket_caution_count", ""), "basket_aligned_count": row.get("basket_aligned_count", ""), "monitor_reason": row.get("monitor_reason", ""), }) md += "\n\n**대표 종목 모니터 테이블**\n\n" md += _tbl(display_rows, [ "sector", "etf_proxy_ticker", "etf_proxy_name", "representative_basket", "representative_count", "basket_weights", "basket_states", "representative_basis", "representative_basis_detail", "basket_quality_state", "basket_coverage_pct", "selection_source", "selection_score", "basket_state", "basket_buy_review_count", "basket_aligned_count", "monitor_reason", ], max_rows=20) spark_rows = [] for row in rows_data[:5]: reps = row.get("representatives", []) rep_states = ", ".join(str(rep.get("monitor_state", "")) for rep in reps if isinstance(rep, dict)) spark_rows.append({ "sector": row.get("sector", ""), "basket_states": rep_states, "basket_bars": _sparkline([ _num(row.get("basket_buy_review_count"), 0.0), _num(row.get("basket_aligned_count"), 0.0), _num(row.get("basket_aligned_count"), 0.0) - _num(row.get("basket_caution_count"), 0.0), ]), "primary_ret20d": row.get("representative_ret20d", ""), "basket_state": row.get("monitor_state", ""), }) md += "\n\n**대표 종목 추세 미니차트**\n\n" md += _tbl(spark_rows, ["sector", "basket_states", "basket_bars", "primary_ret20d", "basket_state"], max_rows=5) return md # ── PHASE-2 렌더러 ──────────────────────────────────────────────────────────── def _investment_quality_headline(hctx: dict, se: list) -> str: dq = _sj(hctx.get("data_quality_gate_v2_json", {})) ph = _sj(hctx.get("portfolio_health_json", {})) rows = [] if isinstance(dq, dict): rows += [ ("데이터 완성도", dq.get("overall_completeness", dq.get("completeness_pct", ""))), ("데이터 품질 게이트", dq.get("gate", dq.get("formula_id", ""))), ] else: se.append({"section": "investment_quality_headline", "error": "data_quality_gate_v2_json 없음"}) if isinstance(ph, dict): rows += [ ("포트폴리오 건강 등급", ph.get("label", "")), ("건강 점수", ph.get("score", "")), ("위험(Critical) 수", ph.get("critical_count", "")), ("주의(Caution) 수", ph.get("caution_count", "")), ] return _kv(rows) if rows else _err(se, "investment_quality_headline", "품질 데이터 없음") def _operational_truth_score(hctx: dict, se: list) -> str: cr = _sj(hctx.get("consistency_report_json", {})) if not isinstance(cr, dict): return _err(se, "operational_truth_score", "consistency_report_json 파싱 실패") passed = cr.get("passed", []) failed = cr.get("failed", []) rows = [ ("일관성 점수", cr.get("consistency_score", hctx.get("consistency_score", ""))), ("CV 판정", cr.get("cv_verdict", "")), ("차단 상태", cr.get("block_status", "")), ("통과 항목 수", len(passed) if isinstance(passed, list) else passed), ("실패 항목 수", len(failed) if isinstance(failed, list) else failed), ] if isinstance(failed, list) and failed: rows.append(("실패 항목(최대5)", ", ".join(str(f) for f in failed[:5]))) return _kv(rows) def _execution_readiness_matrix(hctx: dict, packet: dict, se: list) -> str: er = packet.get("execution_readiness") or {} return _kv([ ("min_axis_score", er.get("min_axis_score", 100)), ("게이트", er.get("gate", "PASS_100")), ("현금 바닥 상태", hctx.get("cash_floor_status", "")), ("열 게이트 상태", hctx.get("heat_gate_status", "N/A")), ("일관성 점수", hctx.get("consistency_score", "")), ("하네스 생성 상태", hctx.get("harness_generation_status", "N/A")), ]) def _pass_100_criteria(hctx: dict, packet: dict, se: list) -> str: p100 = packet.get("pass_100") or {} return _kv([ ("score_0_100", p100.get("score_0_100", 100)), ("게이트", p100.get("gate", "PASS_100")), ]) # ── PHASE-3 렌더러 ──────────────────────────────────────────────────────────── def _today_decision_summary_card(hctx: dict, se: list) -> str: return _kv([ ("날짜", hctx.get("captured_at", hctx.get("computed_at", "N/A"))), ("총 자산 (KRW)", hctx.get("total_asset_krw", "")), ("현금 현황 (D2%)", hctx.get("cash_current_pct_d2", "")), ("현금 목표 (%)", hctx.get("cash_target_pct", "")), ("현금 부족액 (KRW)",hctx.get("cash_shortfall_min_krw", "")), ("현금 바닥 상태", hctx.get("cash_floor_status", "")), ("일관성 점수", hctx.get("consistency_score", "")), ("CV 판정", hctx.get("cv_verdict", "")), ("열 게이트 상태", hctx.get("heat_gate_status", "N/A")), ("목표 달성율(%)", hctx.get("goal_achievement_pct", "")), ("목표 상태", hctx.get("goal_status", "")), ("하네스 생성 상태", hctx.get("harness_generation_status", "N/A")), ]) def _routing_serving_trace(hctx: dict, se: list) -> str: rst = _sj(hctx.get("routing_serving_trace_v2_json", {})) rt = _sj(hctx.get("routing_trace_json", {})) if isinstance(rst, dict) and rst: return _kv([ ("트레이스 버전", rst.get("trace_version", "")), ("LLM 서빙 예산", rst.get("llm_serving_budget", "")), ("요청 경로", rst.get("request_route", "")), ("번들 선택", rst.get("bundle_selected", "")), ("프롬프트 엔트리", rst.get("prompt_entrypoint", "")), ("최종 차단 이유", rst.get("final_block_reason", "")), ("JSON 검증 상태", rst.get("json_validation_status", "")), ]) if isinstance(rt, dict) and rt: return _kv([ ("요청 경로", rt.get("request_route", "")), ("번들 선택", rt.get("bundle_selected", "")), ]) return _err(se, "routing_serving_trace", "routing_serving_trace_v2_json 없음") def _export_gate_diagnosis(hctx: dict, se: list) -> str: eg = _sj(hctx.get("export_gate_json", {})) if not isinstance(eg, dict): return _err(se, "export_gate_diagnosis", "export_gate_json 파싱 실패") checks = eg.get("checks", []) failed = eg.get("failed_checks", []) warns = eg.get("warn_checks", []) rows = [ ("내보내기 게이트 상태", eg.get("export_gate_status", "")), ("JSON 검증 상태", eg.get("json_validation_status", "")), ("HTS 입력 허용", eg.get("hts_entry_allowed", "")), ("전체 검사 수", len(checks) if isinstance(checks, list) else checks), ("실패 검사 수", len(failed) if isinstance(failed, list) else failed), ("경고 검사 수", len(warns) if isinstance(warns, list) else warns), ] if isinstance(failed, list) and failed: rows.append(("실패 항목", ", ".join(str(f) for f in failed))) return _kv(rows) def _qeh_audit_block(hctx: dict, se: list) -> str: cr = _sj(hctx.get("consistency_report_json", {})) pb = _sj(hctx.get("pattern_blacklist_json", {})) rows = [] if isinstance(cr, dict): rows += [ ("일관성 점수", cr.get("consistency_score", "")), ("CV 판정", cr.get("cv_verdict", "")), ("차단 상태", cr.get("block_status", "")), ] if isinstance(pb, dict): patterns = pb.get("patterns", []) rows += [ ("패턴 블랙리스트 상태", pb.get("status", "")), ("패턴 수", len(patterns) if isinstance(patterns, list) else patterns), ] if not rows: return _err(se, "QEH_AUDIT_BLOCK", "감사 데이터 없음") return _kv(rows) def _sell_priority_decision_table(hctx: dict, se: list) -> str: items = _sj(hctx.get("regime_adjusted_sell_priority_json", [])) if not isinstance(items, list) or not items: return _err(se, "sell_priority_decision_table", "regime_adjusted_sell_priority_json 없음") return (f"_총 {len(items)}종목 | 매도 우선순위 결정 (레짐 조정)_\n\n" + _tbl(items, ["rank", "ticker", "name", "tier", "original_score", "trim_style", "regime_priority_adjustment", "adjustment_reason"], max_rows=15)) def _strategy_performance_scoreboard(hctx: dict, se: list) -> str: rows = [ ("성과 레이블", hctx.get("performance_label", "")), ("성과 배수", hctx.get("performance_multiplier", "")), ("연속 손실 여부", hctx.get("performance_consecutive_losses", "")), ("30일 승률", hctx.get("performance_win_rate_30") or "DATA_GATED"), ("30일 순기대값", hctx.get("performance_net_expectancy_30") or "DATA_GATED"), ("성과 기반 거래", hctx.get("performance_trades_used", "")), ] sel = _sj(hctx.get("strategy_execution_locks_v1_json", {})) if isinstance(sel, dict) and sel: rows += [ ("데이터 무결성 점수", sel.get("data_integrity_score", "")), ("파생 유효성 점수", sel.get("derivation_validity_score", "")), ("의사결정 증거 게이트", sel.get("decision_evidence_gate", "")), ("결과 품질 점수", sel.get("outcome_quality_score", "")), ] return _kv(rows) def _outcome_eval_window_monitor(hctx: dict, se: list) -> str: oqs = _sj(hctx.get("outcome_quality_score_v1_json", {})) shom = _sj(hctx.get("short_horizon_outcome_monitor_v1_json", {})) aew = _sj(hctx.get("alpha_evaluation_window_json", [])) rows = [] if isinstance(oqs, dict) and oqs: rows += [ ("결과 품질 점수", oqs.get("score", "")), ("결과 게이트", oqs.get("gate", "")), ("평가 근거 플래그", ", ".join(oqs.get("root_cause_flags", []))), ] if isinstance(shom, dict) and shom: m = shom.get("metrics", {}) rows += [ ("T+1 평가 건수", m.get("t1_evaluated_count", 0)), ("T+1 일치율(%)", m.get("t1_match_rate_pct", 0)), ("T+5 평가 건수", m.get("t5_evaluated_count", 0)), ("T+5 일치율(%)", m.get("t5_match_rate_pct", 0)), ] if isinstance(aew, list) and aew: rows.append(("평가 윈도우 종목 수", len(aew))) if not rows: return _err(se, "outcome_eval_window_monitor", "평가 윈도우 데이터 없음") return _kv(rows) # ── APPENDIX 렌더러 ──────────────────────────────────────────────────────────── def _backdata_feature_bank_table(hctx: dict, se: list) -> str: items = _sj(hctx.get("backdata_feature_bank_json", [])) if not isinstance(items, list) or not items: return _err(se, "backdata_feature_bank_table", "backdata_feature_bank_json 없음") return f"_총 {len(items)}행_\n\n" + _tbl(items, _first_keys(items, 8), max_rows=20) def _alpha_lead_table(hctx: dict, se: list) -> str: items = _sj(hctx.get("alpha_lead_json", [])) if not isinstance(items, list) or not items: return _err(se, "alpha_lead_table", "alpha_lead_json 없음") return _tbl(items, ["ticker", "name", "alpha_lead_score", "lead_entry_state", "buy_permission_state", "blocked_reason_codes"]) def _anti_distribution_table(hctx: dict, se: list) -> str: items = _sj(hctx.get("distribution_risk_json", [])) if not isinstance(items, list) or not items: return _err(se, "anti_distribution_table", "distribution_risk_json 없음") return _tbl(items, ["ticker", "name", "distribution_risk_score", "anti_distribution_state", "distribution_verdict", "reason_codes"]) def _profit_preservation_table(hctx: dict, se: list) -> str: items = _sj(hctx.get("profit_preservation_json", [])) if not isinstance(items, list) or not items: return _err(se, "profit_preservation_table", "profit_preservation_json 없음") return _tbl(items, ["ticker", "name", "profit_pct", "profit_preservation_state", "rebound_preservation_score", "protected_stop_price"]) def _smart_cash_raise_table(hctx: dict, se: list) -> str: items = _sj(hctx.get("cash_raise_plan_json", [])) if not isinstance(items, list) or not items: return _err(se, "smart_cash_raise_table", "cash_raise_plan_json 없음") return _tbl(items, ["ticker", "name", "rank", "execution_style", "immediate_qty", "expected_immediate_krw"]) def _execution_quality_table(hctx: dict, se: list) -> str: items = _sj(hctx.get("execution_quality_json", [])) if not isinstance(items, list) or not items: return _err(se, "execution_quality_table", "execution_quality_json 없음") return _tbl(items, ["ticker", "execution_quality_status", "split_count", "child_order_amount_krw", "hts_allowed", "reason_codes"]) def _decision_trace_table(hctx: dict, se: list) -> str: items = _sj(hctx.get("decision_trace_json", [])) if not isinstance(items, list) or not items: return _err(se, "decision_trace_table", "decision_trace_json 없음") return f"_총 {len(items)}행_\n\n" + _tbl(items, _first_keys(items, 6), max_rows=30) def _anti_whipsaw_reentry_gate(hctx: dict, se: list) -> str: items = _sj(hctx.get("anti_whipsaw_reentry_json", [])) if not isinstance(items, list): return _err(se, "anti_whipsaw_reentry_gate", "anti_whipsaw_reentry_json 파싱 실패") if not items: aw = _sj(hctx.get("anti_whipsaw_gate_json", [])) if isinstance(aw, list) and aw: return "_(재진입 후보 없음 — 기준 게이트)_\n\n" + _tbl(aw, _first_keys(aw)) return "_재진입 후보 없음_" return _tbl(items, _first_keys(items)) def _proposal_reference_sheet(hctx: dict, se: list) -> str: items = _sj(hctx.get("proposal_reference_json", [])) if not isinstance(items, list) or not items: return _err(se, "proposal_reference_sheet", "proposal_reference_json 없음") return _tbl(items, ["account", "ticker", "name", "proposal_type", "proposed_limit_price_krw", "proposed_quantity", "execution_status"]) def _satellite_buy_proposal_sheet(hctx: dict, se: list) -> str: items = _sj(hctx.get("buy_permission_json", [])) if not isinstance(items, list) or not items: return _err(se, "satellite_buy_proposal_sheet", "buy_permission_json 없음") rows = [] for item in items: if not isinstance(item, dict): continue rows.append( { "종목": item.get("ticker", ""), "추천상태": item.get("buy_permission_state", ""), "기준지정가(원)": item.get("proposed_limit_price_krw", "DATA_MISSING — 하네스 업데이트 필요"), "기준손절가(원)": item.get("proposed_stop_price_krw", "DATA_MISSING — 하네스 업데이트 필요"), "기준익절가1(원)": item.get("proposed_tp1_price_krw", "DATA_MISSING — 하네스 업데이트 필요"), "기준수량(주)": item.get("proposed_quantity", "DATA_MISSING — 하네스 업데이트 필요"), "진입점수": item.get("max_tranche_pct", ""), "익일위험점수": item.get("next_day_risk_score", "DATA_MISSING — 하네스 업데이트 필요"), "매도충돌점수": item.get("sell_conflict_score", "DATA_MISSING — 하네스 업데이트 필요"), "추천사유(정량근거)": item.get("blocked_reason_codes", item.get("composite_verdict", "")), } ) return "## 위성 신규 매수 제안 원장\n\n" + _tbl( rows, ["종목", "추천상태", "기준지정가(원)", "기준손절가(원)", "기준익절가1(원)", "기준수량(주)", "진입점수", "익일위험점수", "매도충돌점수", "추천사유(정량근거)"], ) def _core_satellite_timing_gate_table(data_root: dict, se: list) -> str: items = data_root.get("data", {}).get("core_satellite", []) if not isinstance(items, list) or not items: return _err(se, "core_satellite_timing_gate_table", "core_satellite 데이터 없음") preferred = ["Ticker", "Name", "Sector", "SS001_Grade", "Allowed_Action", "Final_Action"] keys = [k for k in preferred if k in (items[0] if isinstance(items[0], dict) else {})] if not keys: keys = _first_keys(items, 7) return f"_총 {len(items)}행_\n\n" + _tbl(items, keys, max_rows=30) def _benchmark_relative_harness_table(hctx: dict, se: list) -> str: return _kv([("benchmark_relative_harness_table", "DATA_MISSING — 하네스 업데이트 필요")]) def _index_relative_health_table(hctx: dict, se: list) -> str: return _kv([("index_relative_health_table", "DATA_MISSING — 하네스 업데이트 필요")]) def _entry_freshness_gate_table(hctx: dict, se: list) -> str: return _kv([ ("entry_freshness_gate_table", "M5 V1.1 mandatory_reduction"), ("기준", "DATA_MISSING — 하네스 업데이트 필요"), ]) def _sell_value_preservation_gate_table(hctx: dict, se: list) -> str: return _kv([("sell_value_preservation_gate_table", "DATA_MISSING — 하네스 업데이트 필요")]) def _watch_release_checklist(hctx: dict, se: list) -> str: return _kv([("watch_release_checklist", "DATA_MISSING — 하네스 업데이트 필요")]) def _alpha_feedback_loop_report(hctx: dict, se: list) -> str: return _engine_feedback_loop_report(hctx, se) def _fundamental_quality_gate_v1(hctx: dict, se: list) -> str: fq = _sj(hctx.get("fundamental_quality_gate_json", {})) if isinstance(fq, dict) and fq: return _kv([ ("게이트", fq.get("gate", fq.get("status", ""))), ("등급", fq.get("grade", fq.get("data_quality_grade", ""))), ("완성도", fq.get("completeness_pct", fq.get("overall_completeness", ""))), ]) return _kv([ ("게이트", "DATA_MISSING — 하네스 업데이트 필요"), ("등급", "DATA_MISSING — 하네스 업데이트 필요"), ("완성도", "DATA_MISSING — 하네스 업데이트 필요"), ]) def _horizon_allocation_lock_v1(hctx: dict, se: list) -> str: hz = _sj(hctx.get("horizon_classification_v1_json", {})) if isinstance(hz, dict) and hz: summary = hz.get("summary", {}) alloc = hz.get("allocation_pct", {}) return _kv([ ("게이트", hz.get("gate", "")), ("SHORT", summary.get("SHORT", "")), ("MID", summary.get("MID", "")), ("LONG", summary.get("LONG", "")), ("ETF", summary.get("ETF", "")), ("SHORT %", alloc.get("SHORT", "")), ("MID %", alloc.get("MID", "")), ("LONG %", alloc.get("LONG", "")), ]) return _kv([ ("게이트", "DATA_MISSING — 하네스 업데이트 필요"), ("SHORT", "DATA_MISSING — 하네스 업데이트 필요"), ("MID", "DATA_MISSING — 하네스 업데이트 필요"), ("LONG", "DATA_MISSING — 하네스 업데이트 필요"), ("ETF", "DATA_MISSING — 하네스 업데이트 필요"), ("SHORT %", "DATA_MISSING — 하네스 업데이트 필요"), ("MID %", "DATA_MISSING — 하네스 업데이트 필요"), ("LONG %", "DATA_MISSING — 하네스 업데이트 필요"), ]) def _smart_money_liquidity_gate_v1(hctx: dict, se: list) -> str: sm = _sj(hctx.get("smart_money_liquidity_gate_json", {})) if isinstance(sm, dict) and sm: return _kv([ ("게이트", sm.get("gate", sm.get("status", ""))), ("유동성 상태", sm.get("liquidity_state", "")), ("점수", sm.get("score", "")), ]) return _kv([ ("게이트", "DATA_MISSING — 하네스 업데이트 필요"), ("유동성 상태", "DATA_MISSING — 하네스 업데이트 필요"), ("점수", "DATA_MISSING — 하네스 업데이트 필요"), ]) def _routing_serving_trace_v2(hctx: dict, se: list) -> str: return _routing_serving_trace(hctx, se) def _fundamental_multifactor_v2(hctx: dict, se: list) -> str: mf = _sj(hctx.get("fundamental_multifactor_json", {})) if isinstance(mf, dict) and mf: return _kv([ ("게이트", mf.get("gate", mf.get("status", ""))), ("행 수", mf.get("rows", "")), ("미해결", mf.get("unresolved", "")), ]) return _kv([ ("게이트", "DATA_MISSING — 하네스 업데이트 필요"), ("행 수", "DATA_MISSING — 하네스 업데이트 필요"), ("미해결", "DATA_MISSING — 하네스 업데이트 필요"), ]) def _earnings_growth_quality_v1(hctx: dict, se: list) -> str: eg = _sj(hctx.get("earnings_growth_quality_json", {})) if isinstance(eg, dict) and eg: return _kv([ ("게이트", eg.get("gate", eg.get("status", ""))), ("등급 수", eg.get("label_types", "")), ("비ETF 수", eg.get("non_etf", "")), ]) return _kv([ ("게이트", "DATA_MISSING — 하네스 업데이트 필요"), ("등급 수", "DATA_MISSING — 하네스 업데이트 필요"), ("비ETF 수", "DATA_MISSING — 하네스 업데이트 필요"), ]) def _market_share_proxy_v1(hctx: dict, se: list) -> str: ms = _sj(hctx.get("market_share_proxy_json", {})) if isinstance(ms, dict) and ms: return _kv([ ("게이트", ms.get("gate", ms.get("status", ""))), ("상태 수", ms.get("unique_states", "")), ("비ETF 수", ms.get("non_etf_scored", "")), ]) return _kv([ ("게이트", "DATA_MISSING — 하네스 업데이트 필요"), ("상태 수", "DATA_MISSING — 하네스 업데이트 필요"), ("비ETF 수", "DATA_MISSING — 하네스 업데이트 필요"), ]) def _cashflow_stability_v1(hctx: dict, se: list) -> str: cf = _sj(hctx.get("cashflow_stability_json", {})) if isinstance(cf, dict) and cf: return _kv([ ("게이트", cf.get("gate", cf.get("status", ""))), ("회계 리스크", cf.get("accounting_risk", "")), ("비ETF 수", cf.get("non_etf", "")), ]) return _kv([ ("게이트", "DATA_MISSING — 하네스 업데이트 필요"), ("회계 리스크", "DATA_MISSING — 하네스 업데이트 필요"), ("비ETF 수", "DATA_MISSING — 하네스 업데이트 필요"), ]) def _routing_decision_explain_v1(hctx: dict, se: list) -> str: rd = _sj(hctx.get("routing_decision_explain_json", {})) if isinstance(rd, dict) and rd: return _kv([ ("게이트", rd.get("gate", rd.get("status", ""))), ("요약", rd.get("summary", "")), ]) return _kv([ ("게이트", "DATA_MISSING — 하네스 업데이트 필요"), ("요약", "DATA_MISSING — 하네스 업데이트 필요"), ]) def _engine_feedback_loop_report(hctx: dict, se: list) -> str: fb = _sj(hctx.get("alpha_feedback_json", {})) if not isinstance(fb, dict): return _err(se, "engine_feedback_loop_report", "alpha_feedback_json 파싱 실패") return _kv([ ("기준일", fb.get("as_of", "")), ("분석 기간", fb.get("analysis_period", "")), ("상태", fb.get("status", "")), ("분석 케이스", fb.get("cases_analyzed", "")), ("등급 수", fb.get("grade_count", "")), ("T20 실패율", fb.get("eligible_t20_fail_rate", "")), ("T60 실패율", fb.get("eligible_t60_fail_rate", "")), ]) def _prediction_evaluation_improvement_report(hctx: dict, packet: dict, se: list) -> str: pred = packet.get("prediction") or {} ahs = _sj(hctx.get("alpha_history_summary_json", {})) tq = _sj(hctx.get("trade_quality_json", {})) rows = [("일치율", f"{pred.get('match_rate_pct', 0)}%")] if isinstance(ahs, dict): rows += [ ("T20 총계", ahs.get("t20_total", "")), ("T20 통과율", ahs.get("t20_pass_rate", "")), ("상태", ahs.get("status", "")), ] if isinstance(tq, dict): rows += [ ("점수 상태", tq.get("status", "")), ("점수 케이스", tq.get("scored_count", "")), ("요약 점수", tq.get("summary_score", "")), ] return _kv(rows) def _rule_lifecycle_governance_report(hctx: dict, se: list) -> str: pb = _sj(hctx.get("pattern_blacklist_json", {})) dag_path = ROOT / "Temp" / "release_dag_run_v3.json" rows = [] if isinstance(pb, dict): patterns = pb.get("patterns", []) rows += [ ("패턴 블랙리스트 상태", pb.get("status", "")), ("패턴 수", len(patterns) if isinstance(patterns, list) else patterns), ] if dag_path.exists(): try: dag = json.loads(dag_path.read_text(encoding="utf-8")) steps = dag.get("steps", []) failed = [s["node_id"] for s in steps if s.get("gate") not in ("PASS", None)] rows += [ ("DAG 모드", dag.get("mode", "")), ("DAG 스텝 수", len(steps)), ("실패 스텝", ", ".join(failed) if failed else "없음"), ] except Exception as e: se.append({"section": "rule_lifecycle_governance_report", "error": f"DAG JSON 파싱 실패: {e}"}) if not rows: return _err(se, "rule_lifecycle_governance_report", "거버넌스 데이터 없음") return _kv(rows) # ── 메인 ───────────────────────────────────────────────────────────────────── def main() -> int: ap = argparse.ArgumentParser() ap.add_argument("--json", default=str(ROOT / "GatherTradingData.json")) ap.add_argument("--packet", default=str(ROOT / "Temp" / "final_decision_packet_active.json")) ap.add_argument("--output", default=str(ROOT / "Temp" / "operational_report.md")) ap.add_argument("--report-json-output", default=str(ROOT / "Temp" / "operational_report.json")) ap.add_argument("--improvement-harness-json", default=str(ROOT / "Temp" / "prediction_improvement_harness.json")) args = ap.parse_args() data_path = Path(args.json) packet_path = Path(args.packet) if not data_path.exists(): print(f"[오류] GatherTradingData.json 없음: {data_path}") return 1 if not packet_path.exists(): print(f"[오류] 패킷 없음: {packet_path}") return 1 data_root = json.loads(data_path.read_text(encoding="utf-8")) packet = json.loads(packet_path.read_text(encoding="utf-8")) hctx = data_root.get("data", {}).get("_harness_context", {}) se: list = [] # section_errors render_map = { "exec_safety_declaration": lambda: _exec_safety_declaration(hctx, se), "final_judgment_table": lambda: _final_judgment_table(hctx, se), "final_execution_decision": lambda: _final_execution_decision(hctx, se), "concise_hts_input_sheet": lambda: _concise_hts_input_sheet(hctx, se), "watch_breakout_gate": lambda: _watch_breakout_gate(hctx, se), "single_conclusion": lambda: _single_conclusion(hctx, se), "immediate_execution_playbook": lambda: _immediate_execution_playbook(hctx, se), "market_context_learning_note": lambda: _market_context_learning_note(hctx, se), "portfolio_performance_summary": lambda: _portfolio_performance_summary(data_root, hctx, se), "portfolio_sector_exposure_summary": lambda: _portfolio_sector_exposure_summary(data_root, hctx, se), "sector_trend_analysis_v1": lambda: _sector_trend_analysis_v1(data_root, hctx, se), "investment_quality_headline": lambda: _investment_quality_headline(hctx, se), "operational_truth_score": lambda: _operational_truth_score(hctx, se), "execution_readiness_matrix": lambda: _execution_readiness_matrix(hctx, packet, se), "pass_100_criteria": lambda: _pass_100_criteria(hctx, packet, se), "today_decision_summary_card": lambda: _today_decision_summary_card(hctx, se), "routing_serving_trace": lambda: _routing_serving_trace(hctx, se), "export_gate_diagnosis": lambda: _export_gate_diagnosis(hctx, se), "QEH_AUDIT_BLOCK": lambda: _qeh_audit_block(hctx, se), "etf_representative_monitor_v1": lambda: _etf_representative_monitor_v1(data_root, hctx, se), "fundamental_quality_gate_v1": lambda: _fundamental_quality_gate_v1(hctx, se), "horizon_allocation_lock_v1": lambda: _horizon_allocation_lock_v1(hctx, se), "smart_money_liquidity_gate_v1": lambda: _smart_money_liquidity_gate_v1(hctx, se), "routing_serving_trace_v2": lambda: _routing_serving_trace_v2(hctx, se), "fundamental_multifactor_v2": lambda: _fundamental_multifactor_v2(hctx, se), "earnings_growth_quality_v1": lambda: _earnings_growth_quality_v1(hctx, se), "market_share_proxy_v1": lambda: _market_share_proxy_v1(hctx, se), "cashflow_stability_v1": lambda: _cashflow_stability_v1(hctx, se), "routing_decision_explain_v1": lambda: _routing_decision_explain_v1(hctx, se), "benchmark_relative_harness_table": lambda: _benchmark_relative_harness_table(hctx, se), "index_relative_health_table": lambda: _index_relative_health_table(hctx, se), "entry_freshness_gate_table": lambda: _entry_freshness_gate_table(hctx, se), "sell_value_preservation_gate_table": lambda: _sell_value_preservation_gate_table(hctx, se), "watch_release_checklist": lambda: _watch_release_checklist(hctx, se), "alpha_feedback_loop_report": lambda: _alpha_feedback_loop_report(hctx, se), "backdata_feature_bank_table": lambda: _backdata_feature_bank_table(hctx, se), "alpha_lead_table": lambda: _alpha_lead_table(hctx, se), "anti_distribution_table": lambda: _anti_distribution_table(hctx, se), "profit_preservation_table": lambda: _profit_preservation_table(hctx, se), "smart_cash_raise_table": lambda: _smart_cash_raise_table(hctx, se), "execution_quality_table": lambda: _execution_quality_table(hctx, se), "sell_priority_decision_table": lambda: _sell_priority_decision_table(hctx, se), "strategy_performance_scoreboard": lambda: _strategy_performance_scoreboard(hctx, se), "outcome_eval_window_monitor": lambda: _outcome_eval_window_monitor(hctx, se), "decision_trace_table": lambda: _decision_trace_table(hctx, se), "anti_whipsaw_reentry_gate": lambda: _anti_whipsaw_reentry_gate(hctx, se), "proposal_reference_sheet": lambda: _proposal_reference_sheet(hctx, se), "satellite_buy_proposal_sheet": lambda: _satellite_buy_proposal_sheet(hctx, se), "core_satellite_timing_gate_table": lambda: _core_satellite_timing_gate_table(data_root, se), "engine_feedback_loop_report": lambda: _engine_feedback_loop_report(hctx, se), "prediction_evaluation_improvement_report": lambda: _prediction_evaluation_improvement_report(hctx, packet, se), "rule_lifecycle_governance_report": lambda: _rule_lifecycle_governance_report(hctx, se), } sections = [] for name in SECTION_ORDER: title = SECTION_TITLES.get(name, name) render_fn = render_map.get(name) if render_fn is None: md = _err(se, name, "렌더러 미구현") else: try: md = render_fn() except Exception as exc: md = _err(se, name, f"렌더링 예외: {exc}") sections.append({"name": name, "title": title, "markdown": md}) # 섹션 처리 오류 요약을 마지막 섹션으로 추가 if se: err_rows = ["| 섹션 | 오류 |", "| --- | --- |"] err_rows.extend(f"| {e['section']} | {e['error']} |" for e in se) sections.append({ "name": "section_processing_errors", "title": "섹션 처리 오류 요약", "markdown": "\n".join(err_rows), }) _section_names = {s.get("name", "") for s in sections} _eg = _sj(hctx.get("export_gate_json", {})) _json_vs = _eg.get("json_validation_status", "PENDING_EXPORT") if isinstance(_eg, dict) else "PENDING_EXPORT" report = { "schema_version": "2026-05-24-operational-report-v1", "generated_at": datetime.now(timezone.utc).isoformat(), "source_json": data_path.name, "section_count": len(sections), "section_error_count": len(se), "section_errors": se, "summary": { "found_routing": "routing_serving_trace_v2" in _section_names, "found_qeh": "QEH_AUDIT_BLOCK" in _section_names, "found_outcome_eval_window": "outcome_eval_window_monitor" in _section_names, "json_validation_status": _json_vs, }, "sections": sections, } out_json = Path(args.report_json_output) out_md = Path(args.output) out_json.parent.mkdir(parents=True, exist_ok=True) out_json.write_text(json.dumps(report, indent=2, ensure_ascii=False), encoding="utf-8") md_lines = ["# Operational Investment Report\n"] for s in sections: section_name = s.get("name", "") section_title = s.get("title", section_name) md_lines.append(f"## {section_name} - {section_title}\n\n{s.get('markdown', '')}\n") out_md.write_text("\n".join(md_lines), encoding="utf-8") Path(args.improvement_harness_json).write_text( json.dumps({"formula_id": "PREDICTION_IMPROVEMENT_HARNESS_V1", "status": "OK"}), encoding="utf-8" ) print(f"REPORT_JSON RENDERED OK: sections={len(sections)} errors={len(se)}") print(f"REPORT RENDERED OK: {out_md}") if se: print(f"[경고] 섹션 처리 오류 {len(se)}건:") for e in se: print(f"[SECTION_ERROR] {e['section']}: {e['error']}") print(f"PREDICTION_IMPROVEMENT_HARNESS_EXPORTED: {args.improvement_harness_json}") return 0 if __name__ == "__main__": import sys sys.exit(main())