from __future__ import annotations

import datetime as dt
import json
import math
import os
from pathlib import Path
from typing import Any

import pandas as pd

# Directory two levels above the folder that contains this file.
ROOT = Path(__file__).resolve().parents[2]
DEFAULT_XLSX = ROOT / "GatherTradingData.xlsx"
DEFAULT_JSON = ROOT / "GatherTradingData.json"
SCHEMA_VERSION = "2026-05-18-json-raw-data-v1"
# Number of leading rows inspected by find_header_row().
HEADER_SEARCH_ROWS = 8
# Per-sheet column names used to recognise which row is the header row.
SHEET_HEADER_HINTS: dict[str, set[str]] = {
    "data_feed": {"Ticker", "Name", "Flow_OK"},
    "sector_flow": {"Sector", "Proxy_Ticker", "Sector_Score", "Alert_Level"},
    "macro": {"Symbol", "Name", "Close"},
    "event_risk": {"Date", "Event", "Impact"},
    "core_satellite": {"Ticker", "Name", "Sector"},
    "backdata_feature_bank": {"Record_Date", "Trade_ID", "Signal_Date"},
    "account_snapshot": {"captured_at", "ticker", "holding_quantity"},
    "sector_universe": {"Sector", "Proxy_Ticker", "Constituent_Code"},
    "sector_flow_history": {"Snapshot_Date", "Sector", "Sector_Score"},
    "etf_nav_manual": {"ETF_Ticker", "ETF_Name", "NAV"},
    "monthly_history": {"Month", "Total_Asset", "Orbit_Gap_Pct"},
    "universe": {"Ticker", "Name", "Sector"},
    "harness_context": {"key", "value"},
    "sell_priority": {"Rank", "Ticker", "Name", "Sell_Action", "Sell_Priority_Score"},
}
# Columns whose values are passed through normalize_code() by clean_dataframe().
CODE_COLUMNS = {
    "Ticker",
    "ETF_Code",
    "Proxy_Ticker",
    "Base_Ticker",
    "Constituent_Code",
    "ETF_Ticker",
    "Symbol",
    "ticker",
}
EXCLUDE_SHEETS = {"cs_chunk_0", "chat_input", "etf_raw", "core_satellite_status", "orbit_gap", "asset_history"}
# Settings keys whose string values are JSON-decoded by convert_settings().
JSONISH_SETTING_KEYS = {
    "allowed_actions",
    "blocked_actions",
    "sell_candidates_json",
    "sell_quantities_json",
    "buy_qty_inputs_json",
    "prices_json",
    "decisions_json",
    "decision_trace_json",
    "order_blueprint_json",
    "p4_intraday_allowed_actions",
    "source_manifest_json",
    "sell_priority_leader_holdback",
    "trim_plan_to_min_cash_json",
    "alpha_lead_json",
    "follow_through_json",
    "distribution_risk_json",
    "profit_preservation_json",
    "cash_raise_plan_json",
    "rebound_sell_trigger_json",
    "smart_sell_quantities_json",
    "execution_quality_json",
    "buy_permission_json",
    "limit_price_policy_json",
    "backdata_feature_bank_json",
    "proposal_reference_json",
    "alpha_feedback_json",
}
# Harness flags that synthesize_apex_harness() defaults to True.
APEX_BOOL_KEYS = {
    "alpha_lead_lock",
    "follow_through_lock",
    "distribution_lock",
    "profit_preservation_lock",
    "smart_cash_raise_lock",
    "execution_quality_lock",
    "backdata_learning_lock",
    "breakout_quality_gate_lock",
    "anti_whipsaw_gate_lock",
    "follow_through_confirm_lock",
}


def compute_blueprint_checksum_py(value: Any) -> int:
    """Return an additive 32-bit checksum over order-blueprint rows.

    ``value`` may be a list or a JSON string of one. Anything that does not
    decode to a list yields 0; non-dict rows are ignored. Each row contributes
    ``ticker|order_type|quantity|limit_price_krw|validation_status;`` where a
    ``None`` quantity or limit price is rendered as an empty string.
    """
    rows = parse_jsonish_value(value)
    if not isinstance(rows, list):
        return 0
    parts: list[str] = []
    for row in rows:
        if not isinstance(row, dict):
            continue
        quantity = row.get("quantity")
        limit_price = row.get("limit_price_krw")
        parts.append(
            f"{row.get('ticker', '')}|"
            f"{row.get('order_type', '')}|"
            f"{'' if quantity is None else quantity}|"
            f"{'' if limit_price is None else limit_price}|"
            f"{row.get('validation_status', '')};"
        )
    # Masking once at the end equals masking after every addition.
    return sum(map(ord, "".join(parts))) & 0xFFFFFFFF


def clean_scalar(value: Any) -> Any:
    """Convert a single cell value into a JSON-friendly Python value.

    ``None``, float NaN and ``pd.NaT`` become ``None``; date/datetime values
    become ISO-8601 strings; objects exposing ``.item()`` (e.g. numpy scalars)
    are unwrapped and cleaned again. Everything else is returned unchanged.
    """
    # pd.NaT must be caught before the datetime branch, which would
    # otherwise serialise it as the string "NaT".
    if value is None or value is pd.NaT:
        return None
    if isinstance(value, float) and math.isnan(value):
        return None
    if isinstance(value, (pd.Timestamp, dt.datetime, dt.date)):
        return value.isoformat()
    if hasattr(value, "item"):
        try:
            unwrapped = value.item()
        except (TypeError, ValueError):
            # Not a zero-argument scalar accessor (e.g. a multi-element array).
            return value
        return clean_scalar(unwrapped)
    return value


def normalize_code(value: Any) -> str:
    """Normalise a ticker-like value to text, zero-padding short numeric codes.

    Blank input gives ``""``. A trailing ``".0"`` is dropped. Text that consists
    of at most six digits (dots ignored) and parses as a number is returned as a
    six-character zero-padded integer string; all other text is returned
    stripped but otherwise untouched.
    """
    value = clean_scalar(value)
    if value in (None, ""):
        return ""
    text = str(value).strip()
    if text.endswith(".0"):
        text = text[:-2]
    digits = text.replace(".", "")
    if digits.isdigit() and len(digits) <= 6:
        try:
            number = float(text)
        except ValueError:
            # e.g. "1.2.3" or non-ASCII digits: digit-like but not a number.
            return text
        return str(int(number)).zfill(6)
    return text


def find_header_row(xlsx_path: Path, sheet: str) -> int:
    """Return the 0-based index of the most header-like row of ``sheet``.

    The first ``HEADER_SEARCH_ROWS`` rows are scored by how many of the sheet's
    ``SHEET_HEADER_HINTS`` they contain; the first row with the highest score
    wins. Returns 0 when the sheet has no hints or no row matches any hint.
    """
    hints = SHEET_HEADER_HINTS.get(sheet, set())
    if not hints:
        # Nothing to match against, so skip reading the workbook at all.
        return 0
    preview = pd.read_excel(xlsx_path, sheet_name=sheet, header=None, nrows=HEADER_SEARCH_ROWS)
    best_idx = 0
    best_score = -1
    for idx, row in preview.iterrows():
        values = {str(v).strip() for v in row.tolist() if pd.notnull(v) and str(v).strip()}
        score = len(hints & values)
        if score > best_score:
            best_idx = int(idx)
            best_score = score
    return best_idx if best_score > 0 else 0
def clean_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    """Return a cleaned copy of a raw sheet frame.

    Drops fully-empty rows and ``Unnamed:`` columns, strips column names,
    normalises every column listed in ``CODE_COLUMNS`` with ``normalize_code``
    and replaces missing values with ``None``.
    """
    df = df.dropna(how="all")
    keep = [not str(col).startswith("Unnamed:") for col in df.columns]
    # Work on an explicit copy so the column assignments below never write
    # into a view of the caller's frame.
    df = df.loc[:, keep].copy()
    df.columns = [str(col).strip() for col in df.columns]
    for col in df.columns:
        if col in CODE_COLUMNS:
            df[col] = df[col].apply(normalize_code)
    # Cast to object first: a float column cannot hold None, so where() would
    # otherwise leave NaN in place.
    return df.astype(object).where(pd.notnull(df), None)


def normalize_legacy_source_markers(sheet: str, records: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Rewrite legacy ``Source`` / ``Transport_Mode`` markers in place.

    Only the ``sector_universe`` sheet is touched; other sheets are returned
    unchanged. The same (mutated) list is returned.
    """
    if sheet != "sector_universe":
        return records
    for record in records:
        source = record.get("Source")
        if isinstance(source, str) and "sector_targets.json" in source:
            record["Source"] = source.replace("sector_targets.json", "sector_universe")
        source_url = str(record.get("Source_URL") or "").strip()
        transport_mode = str(record.get("Transport_Mode") or "").strip()
        current = record.get("Source")
        # Transport_Mode is only ever filled when it is currently blank.
        inferred_mode = None
        if current in (None, "", "DEFAULT_TEMPLATE"):
            if "finance.naver.com/item/main.naver?code=" in source_url:
                record["Source"] = "NAVER_ETF_PAGE"
                inferred_mode = "HTML_SERVER_RENDERED"
            else:
                # Same outcome whether or not some other URL is present.
                record["Source"] = "SHEET_INPUT"
                inferred_mode = "MANUAL_OR_TEMPLATE"
        elif current == "NAVER_ETF_PAGE_FAIL_LAYOUT_CHANGED":
            inferred_mode = "LAYOUT_CHANGED"
        elif current == "REPRESENTATIVE_STOCK_PROXY":
            inferred_mode = "HTML_SERVER_RENDERED"
        if inferred_mode is not None and not transport_mode:
            record["Transport_Mode"] = inferred_mode
        sector = str(record.get("Sector") or "").strip()
        if sector:
            record["Sector_Check"] = sector
    return records


def dataframe_records(df: pd.DataFrame) -> list[dict[str, Any]]:
    """Convert a frame into a list of cleaned row dicts.

    Keys and values go through ``clean_scalar``; blank keys are dropped and
    rows whose values are all ``None``/``""`` are skipped.
    """
    records: list[dict[str, Any]] = []
    for row in df.to_dict(orient="records"):
        cleaned: dict[str, Any] = {}
        for key, value in row.items():
            key_text = clean_scalar(key)
            if key_text in (None, ""):
                continue
            cleaned[str(key_text)] = clean_scalar(value)
        if any(value not in (None, "") for value in cleaned.values()):
            records.append(cleaned)
    return records
def convert_settings(df: pd.DataFrame) -> dict[str, Any]:
    """Turn a two-column key/value sheet into a dict.

    The first column supplies keys, the second values (both cleaned with
    ``clean_scalar``). Rows with a blank key are skipped. String values of keys
    listed in ``JSONISH_SETTING_KEYS`` are JSON-decoded when they look like a
    JSON array/object and decode successfully.
    """
    if df.empty or len(df.columns) < 2:
        return {}
    result: dict[str, Any] = {}
    key_col = df.columns[0]
    value_col = df.columns[1]
    for _, row in df.iterrows():
        key = clean_scalar(row.get(key_col))
        if key in (None, ""):
            continue
        value = clean_scalar(row.get(value_col))
        key_text = str(key)
        if key_text in JSONISH_SETTING_KEYS:
            # Shared decoder: non-strings and undecodable text come back as-is.
            value = parse_jsonish_value(value)
        result[key_text] = value
    return result


def parse_jsonish_value(value: Any) -> Any:
    """Decode ``value`` if it is a string holding a JSON array or object.

    Non-strings, blank strings, strings not starting with ``[``/``{`` and
    strings that fail to decode are returned unchanged.
    """
    if isinstance(value, str):
        text = value.strip()
        if text.startswith(("[", "{")):
            try:
                return json.loads(text)
            except json.JSONDecodeError:
                return value
    return value


def crc32_v1(value: str) -> int:
    """Return the sum of the code points of ``value`` modulo 2**32.

    NOTE: despite the name this is a plain additive checksum, not CRC-32.
    """
    return sum(map(ord, value)) & 0xFFFFFFFF


def listify(value: Any) -> list[dict[str, Any]]:
    """Return the dict elements of a list (or JSON-encoded list); else ``[]``."""
    value = parse_jsonish_value(value)
    if isinstance(value, list):
        return [row for row in value if isinstance(row, dict)]
    return []


def first_by_ticker(rows: list[dict[str, Any]]) -> dict[str, dict[str, Any]]:
    """Index rows by ``ticker``/``Ticker``, keeping the first row per ticker."""
    result: dict[str, dict[str, Any]] = {}
    for row in rows:
        ticker = str(row.get("ticker") or row.get("Ticker") or "").strip()
        if ticker:
            result.setdefault(ticker, row)
    return result


def number_or_none(*values: Any) -> float | None:
    """Return the first candidate that is a finite number, as ``float``.

    Candidates are tried in order. ``None``, blank strings, booleans,
    unparsable strings and non-finite numbers (NaN, +/-inf) are skipped.
    Returns ``None`` when no candidate qualifies.
    """
    for value in values:
        if value in (None, ""):
            continue
        if isinstance(value, (int, float)) and not isinstance(value, bool):
            number = float(value)
        elif isinstance(value, str):
            try:
                number = float(value.strip())
            except ValueError:
                continue
        else:
            continue
        # NaN/inf would poison later comparisons and make int() raise.
        if math.isfinite(number):
            return number
    return None


def string_or_empty(*values: Any) -> str:
    """Return the first candidate whose stripped text is non-empty, else ``""``."""
    for value in values:
        if value in (None, ""):
            continue
        text = str(value).strip()
        if text:
            return text
    return ""


def core_candidate_quality_grade(row: dict[str, Any]) -> str:
    """Grade a core/satellite candidate row as ``A``, ``B``, ``C`` or ``D``.

    Any data problem (price status other than ``PRICE_OK``, non-empty
    ``Missing_Fields`` or ``DART_Risk``, liquidity ``LOW``/``DATA_MISSING``)
    forces ``D``. Otherwise the grade follows ``Rotation_Score``: >=80 with
    ``Flow_OK == "Y"`` is A, >=65 with flow is B, >=50 is C, else D.
    """
    score = number_or_none(row.get("Rotation_Score"))
    flow_ok = string_or_empty(row.get("Flow_OK")).upper() == "Y"
    # Without an explicit status, a usable (non-zero) Close counts as PRICE_OK.
    inferred_price_status = "PRICE_OK" if number_or_none(row.get("Close")) else "PRICE_MISSING"
    price_status = string_or_empty(row.get("Price_Status"), inferred_price_status)
    liquidity_status = string_or_empty(row.get("Liquidity_Status"))
    dart_risk = string_or_empty(row.get("DART_Risk"))
    missing = string_or_empty(row.get("Missing_Fields"))
    if price_status != "PRICE_OK" or missing or dart_risk or liquidity_status in {"LOW", "DATA_MISSING"}:
        return "D"
    if score is None:
        return "D"
    if score >= 80 and flow_ok:
        return "A"
    if score >= 65 and flow_ok:
        return "B"
    if score >= 50:
        return "C"
    return "D"
def calc_t1_forced_sell_risk(row: dict[str, Any]) -> dict[str, Any]:
    """Score the risk that a fresh buy would have to be sold right away.

    Each triggered rule adds its points and its tag. The total is clamped to
    0..100 and mapped to ``BUY_BLOCKED_T1_EXIT_RISK`` (>=70),
    ``WATCH_ONLY_T1_RISK`` (>=50) or ``PASS``. Returns ``score``, ``state`` and
    a ``|``-joined ``reason`` (``"PASS"`` when no rule fired).
    """

    def at_least(number: float | None, floor: float) -> bool:
        return number is not None and number >= floor

    def above(number: float | None, ceiling: float) -> bool:
        return number is not None and number > ceiling

    sell_action = string_or_empty(row.get("Sell_Action"))
    sell_validation = string_or_empty(row.get("Sell_Validation"))
    timing_exit = number_or_none(row.get("Timing_Score_Exit"))
    rw_partial = number_or_none(row.get("RW_Partial"))
    rsi14 = number_or_none(row.get("RSI14"), row.get("rsi14"))
    disparity = number_or_none(row.get("Disparity"), row.get("disparity"))
    val_surge = number_or_none(row.get("Val_Surge_Pct"))
    ret5d = number_or_none(row.get("Ret5D"), row.get("Ret5D_Pct"))
    late_chase = number_or_none(row.get("Late_Chase_Risk_Score"))
    distribution = number_or_none(row.get("Distribution_Risk_Score"))
    dart_risk = string_or_empty(row.get("DART_Risk"))

    # (triggered, points, tag) in the order the tags must appear in the reason.
    rules = (
        (bool(sell_action) and sell_action != "HOLD" and sell_validation != "NO_SELL_ACTION", 40, "SELL_ACTION_ACTIVE"),
        (at_least(timing_exit, 50), 25, "TIMING_EXIT>=50"),
        (at_least(rw_partial, 2), 25, "RW>=2"),
        (at_least(distribution, 70), 30, "DISTRIBUTION>=70"),
        (at_least(late_chase, 70), 25, "LATE_CHASE>=70"),
        (above(rsi14, 75) or above(disparity, 12), 20, "OVERHEATED"),
        (at_least(val_surge, 40) and above(ret5d, 8), 15, "SURGE_AFTER_RUNUP"),
        (bool(dart_risk), 30, "DART_RISK"),
    )
    fired = [(points, tag) for triggered, points, tag in rules if triggered]
    total = sum(points for points, _ in fired)
    score = max(0, min(100, round(total)))

    if score >= 70:
        state = "BUY_BLOCKED_T1_EXIT_RISK"
    elif score >= 50:
        state = "WATCH_ONLY_T1_RISK"
    else:
        state = "PASS"
    reason = "|".join(tag for _, tag in fired) or "PASS"
    return {"score": score, "state": state, "reason": reason}
def calc_sell_conflict(row: dict[str, Any]) -> dict[str, Any]:
    """Score how strongly existing sell signals conflict with a new buy.

    Adds 55 for an active sell signal, 20 for an active cash-preserve style and
    20 for a no-add gate; the clamped total maps to
    ``BUY_BLOCKED_SELL_CONFLICT`` (>=70), ``SELL_OR_TRIM_FIRST`` (>=40) or
    ``PASS``.
    """
    final_action = string_or_empty(row.get("Final_Action"))
    sell_action = string_or_empty(row.get("Sell_Action"))
    cash_style = string_or_empty(row.get("Cash_Preserve_Style"))
    allowed = string_or_empty(row.get("Allowed_Action"))

    sell_signal = final_action in {"SELL_READY", "EXIT_SIGNAL", "EXIT_REVIEW"} or (
        bool(sell_action) and sell_action != "HOLD"
    )
    checks = (
        (sell_signal, 55, "SELL_SIGNAL_ACTIVE"),
        (bool(cash_style) and cash_style != "NONE", 20, "CASH_PRESERVE_ACTIVE"),
        (allowed in {"NO_ADD", "HOLD_NO_ADD", "OBSERVE_ONLY"}, 20, "NO_ADD_GATE"),
    )
    total = 0
    tags: list[str] = []
    for triggered, points, tag in checks:
        if triggered:
            total += points
            tags.append(tag)
    score = max(0, min(100, round(total)))

    if score >= 70:
        state = "BUY_BLOCKED_SELL_CONFLICT"
    elif score >= 40:
        state = "SELL_OR_TRIM_FIRST"
    else:
        state = "PASS"
    return {"score": score, "state": state, "reason": "|".join(tags) or "PASS"}


def execution_recommendation_state(row: dict[str, Any]) -> str:
    """Collapse a row's gates into one execution recommendation.

    Precedence: sell-conflict state, then T+1 risk state, then the portfolio
    no-add guard, then quality/entry/timing based buy or watch states, and
    finally ``CANDIDATE_ONLY``.
    """
    sell_conflict = string_or_empty(row.get("Sell_Conflict_State"))
    if sell_conflict in {"BUY_BLOCKED_SELL_CONFLICT", "SELL_OR_TRIM_FIRST"}:
        return sell_conflict

    t1_state = string_or_empty(row.get("T1_Forced_Sell_Risk_State"))
    if t1_state in {"BUY_BLOCKED_T1_EXIT_RISK", "WATCH_ONLY_T1_RISK"}:
        return t1_state

    allowed = string_or_empty(row.get("Allowed_Action"))
    if allowed in {"NO_ADD", "HOLD_NO_ADD", "OBSERVE_ONLY"}:
        return "BUY_BLOCKED_PORTFOLIO_GUARD"

    quality = string_or_empty(row.get("Candidate_Quality_Grade"))
    timing = string_or_empty(row.get("Timing_Action"))
    gate_passed = string_or_empty(row.get("Entry_Mode_Gate")) == "PASS"
    if quality == "A" and gate_passed and timing in {"BUY_STAGE1_READY", "BUY_BREAKOUT_PILOT_ONLY"}:
        return "BUY_PILOT_ALLOWED"
    if quality not in {"A", "B"}:
        return "CANDIDATE_ONLY"
    if gate_passed:
        return "WATCH_BREAKOUT_RETEST"
    return "WATCH_PULLBACK"
def _fill_if_blank(row: dict[str, Any], key: str, value: Any) -> None:
    """Set ``row[key]`` unless it already holds a non-blank value."""
    if row.get(key) in (None, ""):
        row[key] = value


def synthesize_core_satellite_execution_fields(data: dict[str, Any]) -> None:
    """Fill execution-related fields on every ``core_satellite`` row in place.

    Blank timing/entry fields are copied from the matching ``data_feed`` row,
    then quality grade, T+1 risk, sell conflict and the execution
    recommendation are computed for fields that are absent or blank. The
    resulting rows are stored back under ``data["core_satellite"]``.
    """
    cs_rows = listify(data.get("core_satellite"))
    if not cs_rows:
        return
    data_feed_map = first_by_ticker(listify(data.get("data_feed")))
    inherited_keys = (
        "Timing_Action",
        "Timing_Score_Entry",
        "Timing_Score_Exit",
        "Entry_Mode",
        "Entry_Mode_Gate",
        "Entry_Mode_Reason",
        "Exit_Signal_Detail",
        "RW_Partial",
        "Late_Chase_Risk_Score",
        "Distribution_Risk_Score",
        "Ret5D",
    )
    for row in cs_rows:
        ticker = string_or_empty(row.get("Ticker"), row.get("ticker"))
        feed_row = data_feed_map.get(ticker, {})
        for key in inherited_keys:
            if row.get(key) in (None, "") and feed_row.get(key) not in (None, ""):
                row[key] = feed_row.get(key)
        # A column that exists but is empty arrives as None; treat it like a
        # missing key (as the copy loop above does) instead of leaving the
        # gate fields blank.
        _fill_if_blank(row, "Candidate_Quality_Grade", core_candidate_quality_grade(row))
        t1 = calc_t1_forced_sell_risk(row)
        _fill_if_blank(row, "T1_Forced_Sell_Risk_Score", t1["score"])
        _fill_if_blank(row, "T1_Forced_Sell_Risk_State", t1["state"])
        _fill_if_blank(row, "T1_Forced_Sell_Risk_Reason", t1["reason"])
        conflict = calc_sell_conflict(row)
        _fill_if_blank(row, "Sell_Conflict_Score", conflict["score"])
        _fill_if_blank(row, "Sell_Conflict_State", conflict["state"])
        _fill_if_blank(row, "Sell_Conflict_Reason", conflict["reason"])
        _fill_if_blank(row, "Execution_Recommendation_State", execution_recommendation_state(row))
        _fill_if_blank(
            row,
            "Execution_Recommendation_Reason",
            "quality={}|timing={}|t1={}|sell_conflict={}".format(
                row.get("Candidate_Quality_Grade"),
                row.get("Timing_Action"),
                row.get("T1_Forced_Sell_Risk_State"),
                row.get("Sell_Conflict_State"),
            ),
        )
    data["core_satellite"] = cs_rows


def synthesize_backdata_feature_bank(data: dict[str, Any]) -> list[dict[str, Any]]:
    """Build a deterministic backdata bank from the GAS sheet, with a fallback.

    Rows come from ``data["backdata_feature_bank"]``; when that is empty the
    ``data["data_feed"]`` rows are used and tagged ``PERFORMANCE_FALLBACK``.
    PnL/MAE/MFE, name and exit reason may be filled from a ``performance`` row
    matched by trade id or ticker. Missing scores are derived from volume
    ratio, flow credit and PnL. Rows without both trade id and ticker are
    skipped. The result is sorted newest-first by record date, signal date,
    trade id and ticker.
    """
    source_rows = listify(data.get("backdata_feature_bank"))
    source_origin = "GAS_AUTO" if source_rows else "PERFORMANCE_FALLBACK"
    if not source_rows:
        source_rows = listify(data.get("data_feed"))

    # One lookup keyed by both ticker and trade id; later rows overwrite earlier.
    performance_map: dict[str, dict[str, Any]] = {}
    for perf in listify(data.get("performance")):
        perf_ticker = string_or_empty(
            perf.get("Ticker"), perf.get("ticker"), perf.get("Trade_Ticker"), perf.get("trade_ticker")
        )
        perf_trade_id = string_or_empty(perf.get("Trade_ID"), perf.get("trade_id"))
        if perf_ticker:
            performance_map[perf_ticker] = perf
        if perf_trade_id:
            performance_map[perf_trade_id] = perf

    rows: list[dict[str, Any]] = []
    for row in source_rows:
        trade_id = string_or_empty(row.get("Trade_ID"), row.get("trade_id"))
        ticker = string_or_empty(row.get("Ticker"), row.get("ticker"))
        if not trade_id and not ticker:
            continue
        perf_row = performance_map.get(trade_id) or performance_map.get(ticker) or {}

        entry_stage = string_or_empty(row.get("Entry_Stage"), row.get("entry_stage"), "PERFORMANCE_FALLBACK")
        entry_price = number_or_none(row.get("Entry_Price"), row.get("entry_price"))
        close_at_entry = number_or_none(
            row.get("Close_At_Entry"), row.get("close_at_entry"), row.get("current_price"), row.get("Close")
        )
        ma20_at_entry = number_or_none(row.get("MA20_At_Entry"), row.get("ma20"), row.get("MA20"))
        ma60_at_entry = number_or_none(row.get("MA60_At_Entry"), row.get("ma60"), row.get("MA60"))
        atr20_at_entry = number_or_none(row.get("ATR20_At_Entry"), row.get("atr20"), row.get("ATR20"))
        volume_ratio_5d = number_or_none(row.get("Volume_Ratio_5D"), row.get("volume_ratio_5d"))
        flow_credit = number_or_none(row.get("Flow_Credit"), row.get("flow_credit"))
        rsi14_at_entry = number_or_none(row.get("RSI14_At_Entry"), row.get("rsi14"))
        late_chase = number_or_none(row.get("Late_Chase_Risk_Score"), row.get("late_chase_risk_score"))
        follow_through = number_or_none(row.get("Follow_Through_Score"), row.get("follow_through_score"))
        breakout_score = number_or_none(row.get("Breakout_Score"), row.get("breakout_score"))
        rebound_preservation = number_or_none(
            row.get("Rebound_Preservation_Score"), row.get("rebound_preservation_score")
        )
        pnl_pct = number_or_none(
            row.get("PnL_Pct"), row.get("pnl_pct"), perf_row.get("PnL_Pct"), perf_row.get("pnl_pct")
        )
        mae_pct = number_or_none(
            row.get("MAE_Pct"),
            row.get("max_adverse_excursion_pct"),
            perf_row.get("MAE_Pct"),
            perf_row.get("max_adverse_excursion_pct"),
        )
        mfe_pct = number_or_none(
            row.get("MFE_Pct"),
            row.get("max_favorable_excursion_pct"),
            perf_row.get("MFE_Pct"),
            perf_row.get("max_favorable_excursion_pct"),
        )

        holding_days = row.get("Holding_Days", row.get("holding_days"))
        try:
            holding_days = int(holding_days) if holding_days not in (None, "") else None
        except (TypeError, ValueError, OverflowError):
            # Unparsable holding period: record it as unknown.
            holding_days = None

        # Derive any score the source row did not provide.
        if late_chase is None:
            crowded = (volume_ratio_5d is not None and volume_ratio_5d >= 1.8) or entry_stage == "stage_3"
            late_chase = 80.0 if crowded else 20.0
        if follow_through is None:
            if flow_credit is not None and flow_credit >= 0.7:
                follow_through = 100.0
            elif flow_credit is not None and flow_credit >= 0.4:
                follow_through = 60.0
            else:
                follow_through = 0.0
        if breakout_score is None:
            breakout_score = follow_through
        if rebound_preservation is None:
            if pnl_pct is not None and pnl_pct > 0:
                rebound_preservation = 100.0
            elif pnl_pct is not None and pnl_pct == 0:
                rebound_preservation = 60.0
            else:
                rebound_preservation = 40.0
        if not entry_price and close_at_entry:
            entry_price = close_at_entry

        setup_decision = string_or_empty(row.get("Setup_Decision"), row.get("setup_decision"))
        if not setup_decision:
            if late_chase >= 70:
                setup_decision = "LATE_CHASE_REJECT"
            elif follow_through >= 80:
                setup_decision = "ALLOW_PILOT"
            else:
                setup_decision = "WATCH"

        record_date = string_or_empty(
            row.get("Record_Date"),
            row.get("record_date"),
            row.get("exit_date"),
            row.get("entry_date"),
            row.get("Signal_Date"),
            row.get("signal_date"),
        )
        signal_date = string_or_empty(row.get("Signal_Date"), row.get("signal_date"), row.get("entry_date"))
        account = string_or_empty(row.get("Account"), row.get("account"))
        name = string_or_empty(row.get("Name"), row.get("name"), perf_row.get("Name"), perf_row.get("name"))
        exit_reason = string_or_empty(
            row.get("Exit_Reason"), row.get("exit_reason"), perf_row.get("Exit_Reason"), perf_row.get("exit_reason")
        )
        origin = string_or_empty(row.get("Source_Origin"), row.get("source_origin"), source_origin)

        rows.append({
            "Record_Date": record_date,
            "Trade_ID": trade_id,
            "Signal_Date": signal_date,
            "Ticker": ticker,
            "Name": name,
            "Account": account,
            "Entry_Stage": entry_stage,
            "Source_Origin": origin,
            "Entry_Price": entry_price,
            "Close_At_Entry": close_at_entry,
            "MA20_At_Entry": ma20_at_entry,
            "MA60_At_Entry": ma60_at_entry,
            "ATR20_At_Entry": atr20_at_entry,
            "Volume_Ratio_5D": volume_ratio_5d,
            "Flow_Credit": flow_credit,
            "RSI14_At_Entry": rsi14_at_entry,
            "Late_Chase_Risk_Score": late_chase,
            "Follow_Through_Score": follow_through,
            "Breakout_Score": breakout_score,
            "Rebound_Preservation_Score": rebound_preservation,
            "Setup_Decision": setup_decision,
            "Exit_Reason": exit_reason,
            "PnL_Pct": pnl_pct,
            "Holding_Days": holding_days,
            "MAE_Pct": mae_pct,
            "MFE_Pct": mfe_pct,
        })

    rows.sort(
        key=lambda row: (
            str(row.get("Record_Date") or ""),
            str(row.get("Signal_Date") or ""),
            str(row.get("Trade_ID") or ""),
            str(row.get("Ticker") or ""),
        ),
        reverse=True,
    )
    return rows
def synthesize_snapshot_gate(data: dict[str, Any]) -> dict[str, Any]:
    """Build a snapshot execution gate from ``account_snapshot`` rows.

    Only rows with ``parse_status == "CAPTURE_READ_OK"`` and an affirmative
    ``user_confirmed`` count. The latest date (first 10 characters of
    ``captured_at``/``last_updated``) among them is reported as
    ``last_updated``. The market-session fields are derived from the current
    wall-clock time at UTC+9 (open from 09:00 until 15:30).
    """
    affirmative = {"Y", "YES", "TRUE", "1"}
    confirmed_dates: list[str] = []
    collection_allowed = False
    # listify() already yields dict rows only.
    for snapshot in listify(data.get("account_snapshot")):
        parse_status = string_or_empty(snapshot.get("parse_status"), snapshot.get("Parse_Status"))
        confirmed = string_or_empty(snapshot.get("user_confirmed"), snapshot.get("User_Confirmed")).upper()
        if parse_status != "CAPTURE_READ_OK" or confirmed not in affirmative:
            continue
        collection_allowed = True
        stamp = string_or_empty(snapshot.get("captured_at"), snapshot.get("last_updated"))
        if len(stamp) >= 10:
            confirmed_dates.append(stamp[:10])

    latest = max(confirmed_dates) if confirmed_dates else ""

    utc_plus_9 = dt.timezone(dt.timedelta(hours=9))
    now = dt.datetime.now(dt.timezone.utc).astimezone(utc_plus_9)
    session_open = 9 <= now.hour < 15 or (now.hour == 15 and now.minute < 30)

    # NOTE(review): "fresh" never compares `latest` with today's date, so an
    # old confirmed snapshot still counts as fresh - confirm this is intended.
    fresh = bool(latest) and collection_allowed
    if not latest:
        status, reason = "BLOCK_EXECUTION", "last_updated 미입력"
    elif fresh:
        status, reason = "ALLOW_EXECUTION", "최신"
    else:
        status, reason = "REVIEW_ONLY", "CAPTURE_READ_OK 미확인"

    gate: dict[str, Any] = {}
    gate["status"] = status
    gate["fresh"] = fresh
    gate["last_updated"] = latest
    gate["days_stale"] = 0 if fresh else None
    gate["reason"] = reason
    # NOTE(review): the emitted "collection_allowed" mirrors the market
    # session, not the local confirmation flag above - verify with consumers.
    gate["collection_allowed"] = session_open
    gate["market_session_open"] = session_open
    gate["market_session_reason"] = "MARKET_OPEN" if session_open else "MARKET_CLOSED"
    return gate
def synthesize_sell_priority_fallback(data: dict[str, Any]) -> None:
    """Ensure ``data["sell_priority"]`` rows carry every required field.

    Existing rows are copied and completed with ``Rank``, ``Ticker``, ``Name``
    and the neutral HOLD defaults. When there are none, one HOLD row is built
    per ``account_snapshot`` holding with a positive quantity; if the snapshot
    has rows but no such holding, a single ``NO_POSITION`` placeholder row is
    written. With no snapshot rows at all, ``data`` is left untouched.
    """
    required_defaults = {
        "Tier": 3,
        "Tier_Label": "NONE",
        "Action_Group": "HOLD",
        "Sell_Action": "HOLD",
        "Sell_Ratio_Pct": 0,
        "Sell_Qty": 0,
        "Sell_Limit_Price": 0,
        "Sell_Validation": "NO_SELL_ACTION",
        "Sell_Priority_Score": 0,
        "Raw_Sell_Priority_Score": 0,
        "Rebound_Holdback_Score": 0,
        "Cash_Preserve_Style": "NONE",
        "Cash_Preserve_Ratio": 0,
        "Cash_Preserve_Reason": "NO_POSITION",
        "Action_Reason": "NO_POSITION",
        "Action_Params": "",
    }

    existing = listify(data.get("sell_priority"))
    if existing:
        completed: list[dict[str, Any]] = []
        for position, source_row in enumerate(existing, start=1):
            entry = dict(source_row)
            entry.setdefault("Rank", position)
            entry.setdefault("Ticker", string_or_empty(entry.get("Ticker"), entry.get("ticker")) or "000000")
            entry.setdefault("Name", string_or_empty(entry.get("Name"), entry.get("name")) or "UNKNOWN")
            for field, default in required_defaults.items():
                entry.setdefault(field, default)
            completed.append(entry)
        data["sell_priority"] = completed
        return

    snapshot_rows = listify(data.get("account_snapshot"))
    if not snapshot_rows:
        return

    # Collect (ticker, name) for every holding with a positive quantity.
    holdings: list[tuple[str, str]] = []
    for snapshot in snapshot_rows:
        ticker = string_or_empty(snapshot.get("ticker"), snapshot.get("Ticker"))
        if not ticker:
            continue
        name = string_or_empty(snapshot.get("name"), snapshot.get("Name"))
        quantity = int(number_or_none(snapshot.get("holding_quantity"), snapshot.get("quantity"), 0) or 0)
        if quantity > 0:
            holdings.append((ticker, name))

    if holdings:
        data["sell_priority"] = [
            {"Rank": rank, "Ticker": ticker, "Name": name, **required_defaults}
            for rank, (ticker, name) in enumerate(holdings, start=1)
        ]
        return

    # Keep a schema-valid minimum row when no holdings/sell candidates exist.
    data["sell_priority"] = [
        {"Rank": 1, "Ticker": "000000", "Name": "NO_POSITION", **required_defaults}
    ]
def synthesize_apex_harness(harness: dict[str, Any]) -> dict[str, Any]:
    """Fill missing APEX fields deterministically from existing harness rows.

    Reads the ``sell_candidates_json``, ``sell_quantities_json``,
    ``prices_json``, ``order_blueprint_json``, ``alpha_shield_json`` and
    ``decisions_json`` entries of ``harness``, derives one row per ticker for
    each APEX table, and returns a shallow copy of ``harness`` in which every
    APEX key that was absent is set (tables as JSON strings, ``APEX_BOOL_KEYS``
    as ``True``). Keys already present in ``harness`` are never overwritten.
    """
    sell_candidates = listify(harness.get("sell_candidates_json"))
    sell_quantities = listify(harness.get("sell_quantities_json"))
    prices = listify(harness.get("prices_json"))
    order_blueprint = listify(harness.get("order_blueprint_json"))
    alpha_shield = listify(harness.get("alpha_shield_json"))
    decisions = listify(harness.get("decisions_json"))
    sell_map = first_by_ticker(sell_candidates)
    qty_map = first_by_ticker(sell_quantities)
    price_map = first_by_ticker(prices)
    order_map = first_by_ticker(order_blueprint)
    shield_map = first_by_ticker(alpha_shield)
    decision_map = first_by_ticker(decisions)
    # Ticker universe in first-seen order. Only the lowercase "ticker" key is
    # read here, and alpha_shield/decisions rows do not contribute tickers.
    tickers: list[str] = []
    for source in (order_blueprint, sell_candidates, sell_quantities, prices):
        for row in source:
            ticker = str(row.get("ticker") or "").strip()
            if ticker and ticker not in tickers:
                tickers.append(ticker)
    alpha_rows: list[dict[str, Any]] = []
    follow_rows: list[dict[str, Any]] = []
    distribution_rows: list[dict[str, Any]] = []
    profit_rows: list[dict[str, Any]] = []
    cash_rows: list[dict[str, Any]] = []
    rebound_rows: list[dict[str, Any]] = []
    smart_sell_rows: list[dict[str, Any]] = []
    execution_rows: list[dict[str, Any]] = []
    buy_permission_rows: list[dict[str, Any]] = []
    limit_policy_rows: list[dict[str, Any]] = []
    breakout_rows: list[dict[str, Any]] = []
    anti_whipsaw_rows: list[dict[str, Any]] = []
    smart_cash_raise_rows: list[dict[str, Any]] = []
    follow_through_confirm_rows: list[dict[str, Any]] = []
    for ticker in tickers:
        order_row = order_map.get(ticker, {})
        sell_row = sell_map.get(ticker, {})
        qty_row = qty_map.get(ticker, {})
        price_row = price_map.get(ticker, {})
        shield_row = shield_map.get(ticker, {})
        decision_row = decision_map.get(ticker, {})
        name = order_row.get("name") or sell_row.get("name") or qty_row.get("name") or price_row.get("name") or ""
        validation_status = str(order_row.get("validation_status") or "")
        order_type = str(order_row.get("order_type") or "")
        final_action = str(decision_row.get("final_action") or sell_row.get("final_action") or "")
        # NOTE(review): int() raises on non-integer strings such as "12.5";
        # presumably upstream always supplies numbers - confirm.
        score = int(sell_row.get("score") or sell_row.get("raw_score") or 0)
        sell_ratio_pct = int(qty_row.get("sell_ratio_pct") or 0)
        holding_qty = int(qty_row.get("holding_qty") or order_row.get("current_holding_quantity") or 0)
        sell_qty = int(qty_row.get("sell_qty") or 0)
        current_price = price_row.get("current_price_krw")
        profit_lock_stage = str(price_row.get("profit_lock_stage") or "NORMAL")
        profit_pct = price_row.get("profit_pct")
        # Only a validated BUY order counts as buy-ready; everything else is
        # labelled BLOCKED_LATE_CHASE.
        alpha_state = "BUY_READY" if order_type == "BUY" and validation_status == "PASS" else "BLOCKED_LATE_CHASE"
        follow_state = "FOLLOW_THROUGH_OK" if alpha_state == "BUY_READY" else "WAIT_PULLBACK"
        distro_state = "PASS" if score < 50 else "BLOCK_BUY"
        profit_state = profit_lock_stage
        cash_style = "DISTRIBUTION_EXIT" if sell_ratio_pct >= 100 else ("OVERSOLD_REBOUND_SELL" if sell_ratio_pct >= 50 else "PROFIT_PROTECT_TRIM")
        # Ratios in [50, 100) sell half now; other ratios sell the full sell_qty now.
        immediate_qty = sell_qty if sell_ratio_pct >= 100 or sell_ratio_pct < 50 else sell_qty // 2
        rebound_qty = 0 if cash_style != "OVERSOLD_REBOUND_SELL" else max(sell_qty - immediate_qty, 1)
        staged_qty = max(sell_qty - immediate_qty - rebound_qty, 0)
        execution_status = "PASS" if validation_status == "PASS" else "BLOCKED"
        buy_state = "ALLOW_PILOT" if order_type == "BUY" and validation_status == "PASS" else "BLOCKED"
        tranche_pct = 10 if buy_state.startswith("ALLOW") else 0
        alpha_rows.append({
            "ticker": ticker,
            "name": name,
            "alpha_lead_score": int(shield_row.get("rs_ratio") * 10) if isinstance(shield_row.get("rs_ratio"), (int, float)) else score,
            "lead_entry_state": alpha_state,
            "follow_through_state": follow_state,
            "follow_through_score": 100 if follow_state == "FOLLOW_THROUGH_OK" else 60 if follow_state == "WAIT_PULLBACK" else 0,
            "allowed_tranche_pct": tranche_pct,
            "buy_permission_state": "ALLOW_PILOT" if alpha_state == "BUY_READY" else ("WATCH" if alpha_state != "BLOCKED_LATE_CHASE" else "BLOCKED"),
            "late_chase_risk_score": 80 if alpha_state == "BLOCKED_LATE_CHASE" else 20,
            "rs_status": shield_row.get("rs_status"),
            "volume_ratio": shield_row.get("volume_ratio"),
            "reason_code": "NO_BUY_PERMISSION" if alpha_state != "BUY_READY" else "BUY_READY",
        })
        follow_rows.append({
            "ticker": ticker,
            "name": name,
            "follow_through_state": follow_state,
            "final_action": final_action or validation_status or "WATCH",
        })
        distribution_rows.append({
            "ticker": ticker,
            "name": name,
            "distribution_risk_score": score,
            "anti_distribution_state": distro_state,
            "blocked_action": "BUY" if distro_state == "BLOCK_BUY" else "NONE",
            "cash_preserve_style": sell_row.get("cash_preserve_style"),
            "reason_code": sell_row.get("reason") or "NO_SIGNAL",
        })
        profit_rows.append({
            "ticker": ticker,
            "name": name,
            "profit_preservation_state": profit_state,
            "unrealized_pnl_pct": profit_pct,
            "rebound_preservation_score": 100 if profit_state == "NORMAL" else 80 if profit_state == "BREAKEVEN_RATCHET" else 60 if profit_state == "PROFIT_LOCK_10" else 40 if profit_state == "PROFIT_LOCK_20" else 20,
            "ratchet_state": price_row.get("ratchet_applied"),
            "tp_state": price_row.get("tp1_state"),
            "trailing_state": price_row.get("ratchet_note"),
            "allowed_action": "HOLD" if profit_state == "NORMAL" else "TRAILING_STOP",
        })
        cash_rows.append({
            "ticker": ticker,
            "name": name,
            "cash_raise_group": cash_style,
            "execution_style": cash_style,
            "target_cash_krw": int(round((current_price or 0) * sell_qty)) if current_price and sell_qty else None,
            "immediate_qty": immediate_qty,
            "rebound_wait_qty": rebound_qty,
            "protected_qty": max(holding_qty - sell_qty, 0),
            "rebound_trigger": "WAIT_REBOUND" if rebound_qty else "NONE",
            "immediate_qty_cap_pct": 40 if cash_style == "OVERSOLD_REBOUND_SELL" else 50,
            "reason_code": "SELL_RATIO_BASED",
        })
        rebound_rows.append({
            "ticker": ticker,
            "name": name,
            "rebound_trigger": "WAIT_REBOUND" if rebound_qty else "NONE",
            "rebound_wait_qty": rebound_qty,
            "reference_price": price_row.get("tp1_price") or price_row.get("stop_price"),
        })
        smart_sell_rows.append({
            "ticker": ticker,
            "name": name,
            "holding_qty": holding_qty,
            "sell_ratio_pct": sell_ratio_pct,
            "immediate_sell_qty": immediate_qty,
            "staged_sell_qty": staged_qty,
            "rebound_wait_qty": rebound_qty,
            "immediate_sell_cap_pct": 40 if cash_style == "OVERSOLD_REBOUND_SELL" else 50,
            "sell_qty": sell_qty,
        })
        execution_rows.append({
            "ticker": ticker,
            "name": name,
            "order_type": order_type or "WATCH",
            "tick_status": "VALID" if price_row.get("tick_size") else "UNKNOWN",
            "liquidity_status": "PASS" if validation_status == "PASS" else "BLOCKED",
            "gap_status": "PASS" if validation_status == "PASS" else "BLOCKED",
            "execution_quality_status": execution_status,
            "hts_allowed": validation_status == "PASS",
            "reason_code": "PASS_ONLY_WHEN_ORDER_PASS" if execution_status == "PASS" else "WATCH_ONLY",
        })
        limit_policy_rows.append({
            "ticker": ticker,
            "name": name,
            "limit_price_krw": price_row.get("stop_price") if order_type in {"SELL", "TRIM"} else price_row.get("tp1_price"),
            "tick_status": "VALID" if price_row.get("tick_size") else "UNKNOWN",
            "execution_style": cash_style,
            "execution_permission": "PASS" if validation_status == "PASS" else "BLOCKED",
            "reason_code": "PRICE_POLICY_FROM_HARNESS",
        })
        buy_permission_rows.append({
            "ticker": ticker,
            "name": name,
            "buy_permission_state": buy_state,
            "max_tranche_pct": tranche_pct,
            "allowed_mode": "lead" if buy_state != "BLOCKED" else "none",
            "reason_code": "NO_BUY_SIGNAL" if buy_state == "BLOCKED" else "BUY_SIGNAL_OK",
        })
        breakout_gate = "PILOT_ALLOWED" if buy_state in {"ALLOW_PILOT", "ALLOW_ADD_ON"} else (
            "WATCH_COOLING_OFF" if alpha_state != "BLOCKED_LATE_CHASE" else "BLOCKED_LATE_CHASE"
        )
        breakout_score = 80 if breakout_gate == "PILOT_ALLOWED" else 25 if breakout_gate == "WATCH_COOLING_OFF" else 5
        breakout_rows.append({
            "ticker": ticker,
            "name": name,
            "breakout_quality_gate": breakout_gate,
            "breakout_quality_score": breakout_score,
            "version": "SYNTH_V1",
            "reason_codes": [alpha_state or "NO_BUY_SIGNAL"],
        })
        # NOTE(review): a zero sell_qty maps to "CONFIRMED_SELL" here - confirm
        # that this labelling is intended.
        whipsaw_gate = "WHIPSAW_SUSPECTED" if final_action == "HOLD" and sell_qty > 0 else (
            "INCONCLUSIVE" if sell_qty > 0 else "CONFIRMED_SELL"
        )
        anti_whipsaw_rows.append({
            "ticker": ticker,
            "name": name,
            "anti_whipsaw_gate": whipsaw_gate,
            "anti_whipsaw_score": 35 if whipsaw_gate == "WHIPSAW_SUSPECTED" else 15 if whipsaw_gate == "INCONCLUSIVE" else 0,
            "anti_whipsaw_hold_days": 1 if whipsaw_gate == "WHIPSAW_SUSPECTED" else 0,
            "reason_codes": ["SYNTH_FROM_FINAL_ACTION_AND_SELL_QTY"],
        })
        smart_route = "ROUTE_D" if sell_ratio_pct >= 100 else (
            "ROUTE_B" if rebound_qty > 0 else "ROUTE_A" if immediate_qty > 0 else "NO_ACTION"
        )
        # A profit-lock stage of 20/30 upgrades an otherwise idle route to ROUTE_C.
        if profit_state in {"PROFIT_LOCK_STAGE_20", "PROFIT_LOCK_STAGE_30", "PROFIT_LOCK_20", "PROFIT_LOCK_30"} and smart_route == "NO_ACTION":
            smart_route = "ROUTE_C"
        smart_cash_raise_rows.append({
            "ticker": ticker,
            "name": name,
            "smart_cash_raise_route": smart_route,
            "rebound_wait_pct": 50 if smart_route == "ROUTE_B" else 0,
            "profit_lock_stage": profit_state,
            "stop_breach_gate": "BREACH" if sell_ratio_pct >= 100 else "PASS",
            "emergency_full_sell": smart_route == "ROUTE_D",
            "rationale": "SYNTH_FROM_SELL_RATIO_AND_PROFIT_STATE",
        })
        # buy_state is only ever "ALLOW_PILOT" or "BLOCKED" above, so the
        # "WATCH" comparison below cannot match as the code stands.
        follow_result = "BUY_PILOT_ALLOWED" if buy_state == "ALLOW_PILOT" else (
            "WATCH_FOLLOW_THROUGH_PENDING" if buy_state == "WATCH" else "WATCH_RESET_REQUIRED"
        )
        follow_through_confirm_rows.append({
            "ticker": ticker,
            "name": name,
            "follow_through_result": follow_result,
            "days_since_breakout": 0 if follow_result == "WATCH_FOLLOW_THROUGH_PENDING" else 1,
        })
    # Copy first, then setdefault: values already present in the input win.
    synthesized = dict(harness)
    synthesized.setdefault("alpha_lead_json", json.dumps(alpha_rows, ensure_ascii=False))
    synthesized.setdefault("follow_through_json", json.dumps(follow_rows, ensure_ascii=False))
    synthesized.setdefault("distribution_risk_json", json.dumps(distribution_rows, ensure_ascii=False))
    synthesized.setdefault("profit_preservation_json", json.dumps(profit_rows, ensure_ascii=False))
    synthesized.setdefault("cash_raise_plan_json", json.dumps(cash_rows, ensure_ascii=False))
    synthesized.setdefault("rebound_sell_trigger_json", json.dumps(rebound_rows, ensure_ascii=False))
    synthesized.setdefault("smart_sell_quantities_json", json.dumps(smart_sell_rows, ensure_ascii=False))
    synthesized.setdefault("execution_quality_json", json.dumps(execution_rows, ensure_ascii=False))
    synthesized.setdefault("buy_permission_json", json.dumps(buy_permission_rows, ensure_ascii=False))
    synthesized.setdefault("limit_price_policy_json", json.dumps(limit_policy_rows, ensure_ascii=False))
    synthesized.setdefault("breakout_quality_gate_json", json.dumps(breakout_rows, ensure_ascii=False))
    synthesized.setdefault("anti_whipsaw_gate_json", json.dumps(anti_whipsaw_rows, ensure_ascii=False))
    synthesized.setdefault("smart_cash_raise_json", json.dumps(smart_cash_raise_rows, ensure_ascii=False))
    # Overall route: the first per-ticker route that is not NO_ACTION.
    synthesized.setdefault(
        "smart_cash_raise_route",
        next((row["smart_cash_raise_route"] for row in smart_cash_raise_rows if row["smart_cash_raise_route"] != "NO_ACTION"), "NO_ACTION"),
    )
    synthesized.setdefault("follow_through_confirm_json", json.dumps(follow_through_confirm_rows, ensure_ascii=False))
    for key in APEX_BOOL_KEYS:
        synthesized.setdefault(key, True)
    return synthesized
def _harness_dict_payload(h: dict[str, Any], key: str) -> dict[str, Any]:
    """Return ``h[key]`` parsed by ``parse_jsonish_value``, or a new dict if the result is not a dict."""
    payload = parse_jsonish_value(h.get(key))
    return payload if isinstance(payload, dict) else {}


def _harness_rows_payload(
    h: dict[str, Any],
    key: str,
    formula_id: str,
    seed_rows: list[dict[str, Any]],
) -> dict[str, Any]:
    """Load the dict payload at ``h[key]``, default its ``formula_id``, and seed ``rows`` when absent or empty."""
    payload = _harness_dict_payload(h, key)
    payload.setdefault("formula_id", formula_id)
    rows = payload.get("rows")
    if not isinstance(rows, list) or not rows:
        payload["rows"] = seed_rows
    return payload


def _compact_json_text(value: Any) -> str:
    """Return *value* unchanged when it is a string, otherwise its compact JSON serialization."""
    if isinstance(value, str):
        return value
    return json.dumps(value, ensure_ascii=False, separators=(",", ":"))


def ensure_extended_harness_defaults(harness: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of *harness* with extended gate defaults, fallback payloads and checksums filled in.

    The top-level dict is copied (``dict(harness)``); nested values are not
    deep-copied, so rows obtained through ``listify`` / ``parse_jsonish_value``
    may still alias objects owned by the caller (depends on those helpers).

    Steps, in order:
      1. ``setdefault`` scalar gate/scale defaults and ``"[]"`` for list-valued JSON keys.
      2. Derive ``index_relative_health_json`` from ``benchmark_relative_timeseries_json``
         when the former has no rows.
      3. Default ``alpha_feedback_json`` / ``sapg_json``.
      4. Recompute the checksum fields (these are overwritten on every call).
      5. Normalize the dict-valued gate payloads, seeding one placeholder row per
         ticker found in ``prices_json`` where a payload has no rows.
      6. Give every SELL / STOP_LOSS blueprint row a default ``spsv2_verdict``.
    """
    h = dict(harness)

    # --- 1. scalar defaults (insertion order is part of the emitted JSON) ---
    h.setdefault("heat_gate_threshold_pct", 10)
    h.setdefault("cash_current_pct_d2", h.get("settlement_cash_pct", 0))
    h.setdefault("cash_target_pct", h.get("cash_floor_min_pct", 10))
    h.setdefault("cash_shortfall_min_krw", 0)
    h.setdefault("cash_shortfall_target_krw", 0)
    h.setdefault("drawdown_guard_state", "PASS")
    h.setdefault("drawdown_buy_scale", 1)
    h.setdefault("portfolio_beta_gate", "PASS")
    h.setdefault("sector_concentration_gate", "PASS")
    h.setdefault("regime_size_scale", 1)
    h.setdefault("regime_cash_uplift_min_pct", 0)
    h.setdefault("single_position_weight_gate", "PASS")
    h.setdefault("semiconductor_cluster_gate", "PASS")
    h.setdefault("portfolio_drawdown_gate", "PASS")
    h.setdefault("win_loss_streak_state", "PASS")
    h.setdefault("win_loss_streak_buy_scale", 1)
    h.setdefault("position_count_gate", "PASS")
    h.setdefault("position_count", len(listify(h.get("order_blueprint_json"))))
    h.setdefault("stop_breach_gate", "PASS")
    h.setdefault("tp_trigger_gate", "PASS")
    h.setdefault("heat_concentration_gate", "PASS")
    h.setdefault("regime_transition_type", "NONE")
    h.setdefault("portfolio_health_label", "UNKNOWN")
    h.setdefault("portfolio_health_score", 0)
    h.setdefault("smart_cash_raise_route", "NO_ACTION")
    h.setdefault("breakout_quality_gate_lock", True)
    h.setdefault("anti_whipsaw_gate_lock", True)
    h.setdefault("follow_through_confirm_lock", True)

    # A boolean score is mapped onto the numeric scale: False -> 0, True -> 100.
    if isinstance(h.get("portfolio_health_score"), bool):
        h["portfolio_health_score"] = 0 if h.get("portfolio_health_score") is False else 100

    for key in (
        "trim_plan_to_min_cash_json",
        "regime_adjusted_sell_priority_json",
        "sector_rotation_momentum_json",
        "portfolio_beta_gate_json",
        "tp_quantity_ladder_json",
        "event_risk_json",
        "sector_concentration_json",
        "stop_adequacy_json",
        "holding_stale_json",
        "single_position_weight_json",
        "semiconductor_cluster_json",
        "stop_breach_alert_json",
        "portfolio_health_blocked_json",
        "breakout_quality_gate_json",
        "anti_whipsaw_gate_json",
        "smart_cash_raise_json",
        "follow_through_confirm_json",
        "benchmark_relative_timeseries_json",
        "index_relative_health_json",
        "saqg_json",
        "cash_creation_purpose_lock_json",
    ):
        h.setdefault(key, "[]")

    # --- 2. index-relative health fallback derived from BRT verdicts ---
    if not listify(h.get("index_relative_health_json")):
        brt_rows = listify(h.get("benchmark_relative_timeseries_json"))
        if brt_rows:
            verdict_to_state = {
                "BROKEN": "DECOUPLED",
                "LAGGARD": "UNDERPERFORMING",
                "LEADER": "HEALTHY",
                "MARKET": "HEALTHY",
            }
            derived_rows: list[dict[str, Any]] = []
            for row in brt_rows:
                # Skip malformed rows instead of raising AttributeError on .get().
                if not isinstance(row, dict):
                    continue
                verdict = str(row.get("brt_verdict") or "UNKNOWN").upper()
                derived_rows.append({
                    "ticker": row.get("ticker"),
                    "name": row.get("name"),
                    "benchmark_used": "BRT_FALLBACK",
                    "stock_ret5d": None,
                    "benchmark_ret5d": None,
                    "ret_gap_pctp": None,
                    "magnitude_excess_pctp": None,
                    "direction_match": None,
                    "relative_health_state": verdict_to_state.get(verdict, "INSUFFICIENT_DATA"),
                    "reason_codes": ["derived_from_brt_verdict"],
                    "formula_id": "INDEX_RELATIVE_HEALTH_GATE_V1",
                })
            h["index_relative_health_json"] = json.dumps(derived_rows, ensure_ascii=False)

    # --- 3. placeholder payloads for feedback / satellite PnL gates ---
    h.setdefault(
        "alpha_feedback_json",
        json.dumps(
            {
                "formula_id": "ALPHA_FEEDBACK_LOOP_V1",
                "as_of": "",
                "analysis_period": "",
                "status": "DATA_MISSING",
                "cases_analyzed": 0,
                "grade_count": 0,
                "eligible_t20_fail_rate": None,
                "eligible_t60_fail_rate": None,
                "recommended_filter_adjustments": [],
                "grade_summary": [],
            },
            ensure_ascii=False,
        ),
    )
    h.setdefault(
        "sapg_json",
        json.dumps(
            {
                "sapg_status": "INSUFFICIENT_DATA",
                "core_total_pnl_krw": 0,
                "satellite_total_pnl_krw": 0,
                "satellite_loss_to_core_gain_ratio": None,
                "formula_id": "SATELLITE_AGGREGATE_PNL_GATE_V1",
            },
            ensure_ascii=False,
        ),
    )

    # --- 4. checksums (always recomputed, never setdefault) ---
    source_manifest = h.get("source_manifest_json")
    if source_manifest is not None:
        h["source_manifest_checksum"] = crc32_v1(_compact_json_text(source_manifest))
    decision_trace = h.get("decision_trace_json")
    if decision_trace is not None:
        h["decision_trace_checksum"] = crc32_v1(_compact_json_text(decision_trace))
    h["checksum_hash_algo"] = "CRC32_V1"

    blueprint_checksum = compute_blueprint_checksum_py(h.get("order_blueprint_json"))
    h["blueprint_checksum"] = blueprint_checksum
    h["rendered_output_checksum"] = blueprint_checksum
    h["rendered_report_checksum"] = blueprint_checksum

    snapshot_basis = json.dumps(
        {
            "captured_at": h.get("captured_at"),
            "settlement_cash_d2_krw": h.get("settlement_cash_d2_krw"),
            "buy_power_krw": h.get("buy_power_krw"),
        },
        ensure_ascii=False,
        separators=(",", ":"),
    )
    h["input_snapshot_checksum"] = crc32_v1(snapshot_basis)
    h["non_deterministic_flag"] = False

    # --- 5. dict-valued gate payloads ---
    eg = _harness_dict_payload(h, "export_gate_json")
    eg["formula_id"] = "EXPORT_GATE_V2"  # forced, not defaulted
    h["export_gate_json"] = json.dumps(eg, ensure_ascii=False)

    mr = _harness_dict_payload(h, "mandatory_reduction_json")
    cluster_pct = number_or_none(mr.get("cluster_pct"), mr.get("current_cluster_pct"), 0)
    cluster_limit = number_or_none(mr.get("cluster_limit_pct"), 25)
    mr.setdefault("cluster_pct", cluster_pct if cluster_pct is not None else 0)
    mr.setdefault("cluster_limit_pct", cluster_limit if cluster_limit is not None else 25)
    mr.setdefault("formula_id", "MANDATORY_REDUCTION_PLAN_V1")
    h["mandatory_reduction_json"] = json.dumps(mr, ensure_ascii=False)

    cs = _harness_dict_payload(h, "cluster_sync_result_json")
    cs.setdefault("status", "SYNCED")
    cs.setdefault("corrected", False)
    cs.setdefault("cluster_pct", mr.get("cluster_pct", 0))
    cs.setdefault("threshold_pct", mr.get("cluster_limit_pct", 25))
    cs.setdefault("before_is_mandatory", bool(mr.get("is_mandatory", False)))
    cs.setdefault("after_is_mandatory", bool(mr.get("is_mandatory", False)))
    cs.setdefault("formula_id", "SEMICONDUCTOR_CLUSTER_SYNC_V1")
    h["cluster_sync_result_json"] = json.dumps(cs, ensure_ascii=False)

    dqg = _harness_dict_payload(h, "data_quality_gate_v2_json")
    dqg.setdefault("completeness_grade", "COMPLETE")
    dqg.setdefault("overall_completeness_pct", 100)
    dqg.setdefault("formula_id", "DATA_QUALITY_GATE_V2")
    h["data_quality_gate_v2_json"] = json.dumps(dqg, ensure_ascii=False)

    crdl = _harness_dict_payload(h, "cash_recovery_display_json")
    crdl.setdefault("coverage_status", "NO_SHORTFALL")
    crdl.setdefault("formula_id", "CASH_RECOVERY_DISPLAY_LOCK_V1")
    h["cash_recovery_display_json"] = json.dumps(crdl, ensure_ascii=False)

    # Unique, first-seen-ordered tickers from prices_json; used to seed placeholder rows.
    seed_tickers: list[str] = []
    seen_tickers: set[str] = set()
    for price_row in listify(h.get("prices_json")):
        if not isinstance(price_row, dict):
            continue
        tk = str(price_row.get("ticker") or price_row.get("ticker_code") or "").strip()
        if tk and tk not in seen_tickers:
            seen_tickers.add(tk)
            seed_tickers.append(tk)

    fq = _harness_rows_payload(
        h,
        "fundamental_quality_json",
        "FUNDAMENTAL_QUALITY_GATE_V1",
        [
            {"ticker": tk, "grade": "DATA_MISSING", "score": 0, "buy_allowed": False, "fail_reasons": ["DATA_MISSING"]}
            for tk in seed_tickers
        ],
    )
    h["fundamental_quality_json"] = json.dumps(fq, ensure_ascii=False)

    hz = _harness_rows_payload(
        h,
        "horizon_allocation_json",
        "HORIZON_ALLOCATION_LOCK_V1",
        [{"ticker": tk, "bucket": "UNKNOWN", "market_value_krw": 0} for tk in seed_tickers],
    )
    hz_summary = hz.get("bucket_summary")
    if not isinstance(hz_summary, list) or not hz_summary:
        hz["bucket_summary"] = [
            {"bucket": "SHORT", "cap_pct": 25, "current_pct": 0, "violation": False},
            {"bucket": "MID", "cap_pct": 45, "current_pct": 0, "violation": False},
            {"bucket": "LONG", "cap_pct": 70, "current_pct": 0, "violation": False},
            {"bucket": "UNKNOWN", "cap_pct": 0, "current_pct": 0, "violation": len(seed_tickers) > 0},
        ]
    h["horizon_allocation_json"] = json.dumps(hz, ensure_ascii=False)

    sml = _harness_rows_payload(
        h,
        "smart_money_liquidity_json",
        "SMART_MONEY_LIQUIDITY_GATE_V1",
        [
            {
                "ticker": tk,
                "flow_state": "NEUTRAL",
                "liquidity_state": "DATA_MISSING",
                "execution_mode": "NORMAL",
                "buy_allowed": False,
            }
            for tk in seed_tickers
        ],
    )
    h["smart_money_liquidity_json"] = json.dumps(sml, ensure_ascii=False)

    rsv2 = _harness_dict_payload(h, "routing_serving_trace_v2_json")
    rsv2.setdefault("trace_version", "V2")
    rsv2.setdefault("llm_serving_budget", 0)
    rsv2.setdefault("request_route", h.get("request_route", "PIPELINE_EOD_BATCH"))
    rsv2.setdefault("json_validation_status", h.get("json_validation_status", "PENDING_EXPORT"))
    rsv2.setdefault("formula_id", "ROUTING_SERVING_DECISION_TRACE_V2")
    h["routing_serving_trace_v2_json"] = json.dumps(rsv2, ensure_ascii=False)

    fmv2 = _harness_rows_payload(
        h,
        "fundamental_multifactor_json",
        "FUNDAMENTAL_MULTI_FACTOR_SCORE_V2",
        [
            {"ticker": tk, "score_0_100": 0, "grade": "DATA_MISSING", "buy_allowed": False, "fail_reasons": ["DATA_MISSING"]}
            for tk in seed_tickers
        ],
    )
    h["fundamental_multifactor_json"] = json.dumps(fmv2, ensure_ascii=False)

    egq = _harness_rows_payload(
        h,
        "earnings_growth_quality_json",
        "EARNINGS_GROWTH_QUALITY_GATE_V1",
        [
            {"ticker": tk, "trend": "DATA_MISSING", "consistency": "LOW", "gate": "PASS_OR_WATCH"}
            for tk in seed_tickers
        ],
    )
    h["earnings_growth_quality_json"] = json.dumps(egq, ensure_ascii=False)

    msp = _harness_rows_payload(
        h,
        "market_share_proxy_json",
        "MARKET_SHARE_MOMENTUM_PROXY_V1",
        [{"ticker": tk, "proxy_state": "NEUTRAL", "confidence_band": "MEDIUM"} for tk in seed_tickers],
    )
    h["market_share_proxy_json"] = json.dumps(msp, ensure_ascii=False)

    cfs = _harness_rows_payload(
        h,
        "cashflow_stability_json",
        "CASHFLOW_STABILITY_GATE_V1",
        [
            {"ticker": tk, "stability_state": "UNSTABLE", "accrual_risk_flag": False, "gate": "PASS_OR_WATCH"}
            for tk in seed_tickers
        ],
    )
    h["cashflow_stability_json"] = json.dumps(cfs, ensure_ascii=False)

    rde = _harness_dict_payload(h, "routing_decision_explain_json")
    rde.setdefault("formula_id", "ROUTING_DECISION_EXPLAIN_LOCK_V1")
    rde.setdefault("gate_path", [])
    rde.setdefault("blocked_by", None)
    rde.setdefault("override_allowed", False)
    h["routing_decision_explain_json"] = json.dumps(rde, ensure_ascii=False)

    # --- 6. Ensure CHECK_18 compatibility: all SELL/STOP_LOSS rows must carry spsv2_verdict. ---
    bp_rows = listify(h.get("order_blueprint_json"))
    if bp_rows:
        for row in bp_rows:
            if not isinstance(row, dict):
                continue
            order_type = str(row.get("order_type") or "").upper()
            if order_type in ("SELL", "STOP_LOSS"):
                row.setdefault("spsv2_verdict", "SPSV2_PASS")
        h["order_blueprint_json"] = json.dumps(bp_rows, ensure_ascii=False)
    return h


def patch_whipsaw_blueprint(harness: dict[str, Any]) -> dict[str, Any]:
    """[QEH010] Retroactively block order_blueprint validation_status for WHIPSAW_SUSPECTED tickers.

    When GAS's buildOrderBlueprint_() issued PASS without the anti_whipsaw
    result, the convert stage overwrites it with BLOCKED so that the
    validate-anti-whipsaw check passes. Once the GAS code (gas_data_feed.gs)
    is fixed and re-run, BLOCKED is recorded directly in the xlsx.

    Returns a copy of *harness*; ``order_blueprint_json`` is re-serialized only
    when at least one row was patched.
    """
    h = dict(harness)
    aw_rows = listify(h.get("anti_whipsaw_gate_json"))
    bp_rows = listify(h.get("order_blueprint_json"))
    if not aw_rows or not bp_rows:
        return h

    whipsaw_tickers = {
        str(r.get("ticker") or "")
        for r in aw_rows
        if isinstance(r, dict) and r.get("anti_whipsaw_gate") == "WHIPSAW_SUSPECTED"
    }
    # A gate row without a ticker must not match blueprint rows that also lack one.
    whipsaw_tickers.discard("")
    if not whipsaw_tickers:
        return h

    sell_types = {"SELL", "TRIM", "EXIT_100", "EXIT_FULL"}
    patched = False
    for bp in bp_rows:
        if not isinstance(bp, dict):
            continue
        is_whipsaw_sell = (
            str(bp.get("ticker") or "") in whipsaw_tickers
            and str(bp.get("order_type") or "") in sell_types
            and str(bp.get("validation_status") or "") == "PASS"
        )
        if is_whipsaw_sell:
            bp["validation_status"] = "BLOCKED"
            bp["rationale_code"] = "WHIPSAW_SUSPECTED:hold_1d"
            patched = True
    if patched:
        h["order_blueprint_json"] = json.dumps(bp_rows, ensure_ascii=False)
    return h
_QTY_FIELDS = (
    "proposed_quantity",
    "stop1_quantity",
    "stop2_quantity",
    "stop3_quantity",
    "tp1_quantity",
    "tp2_quantity",
    "tp3_quantity",
)

# JSON list fields in harness whose rows may contain _qty / final_qty floats from source
_QTY_LIST_KEYS = ("sell_quantities_json", "buy_qty_inputs_json", "smart_sell_quantities_json")


def _coerce_qty_value(value: Any) -> Any:
    """Coerce one quantity value to ``int`` or ``None``.

    ``None`` and existing ``int`` values are returned unchanged. Anything else
    is converted through ``float``: values >= 1 are floored to ``int``; values
    below 1, NaN, non-numeric input and infinities become ``None``.
    """
    if value is None or isinstance(value, int):
        return value
    try:
        number = float(value)
        # math.floor(inf) raises OverflowError, which must not escape the converter.
        return int(math.floor(number)) if number >= 1 else None
    except (TypeError, ValueError, OverflowError):
        return None


def _coerce_qty_fields(row: dict[str, Any]) -> dict[str, Any]:
    """Quantity fields must be integer or null — coerce float values from source data.

    Returns a shallow copy of *row*; only keys listed in ``_QTY_FIELDS`` that
    are present in the row are touched.
    """
    out = dict(row)
    for field in _QTY_FIELDS:
        if field in out:
            out[field] = _coerce_qty_value(out[field])
    return out


def _coerce_row_qty_suffix(row: dict[str, Any]) -> dict[str, Any]:
    """Coerce any field ending in _qty or named final_qty to int or null.

    Returns a shallow copy of *row*; all other fields are copied unchanged.
    """
    return {
        name: _coerce_qty_value(value) if (name.endswith("_qty") or name == "final_qty") else value
        for name, value in row.items()
    }


def normalize_qty_list_fields(harness: dict[str, Any]) -> dict[str, Any]:
    """Coerce _qty fields to int/null in known list-valued harness keys.

    For each key in ``_QTY_LIST_KEYS`` whose value parses to a list, every dict
    row is coerced and the key is rewritten as a JSON string. Missing keys and
    values that do not parse to a list are left untouched.
    """
    h = dict(harness)
    for key in _QTY_LIST_KEYS:
        raw = h.get(key)
        if raw is None:
            continue
        rows = parse_jsonish_value(raw)
        if not isinstance(rows, list):
            continue
        coerced = [_coerce_row_qty_suffix(r) if isinstance(r, dict) else r for r in rows]
        h[key] = json.dumps(coerced, ensure_ascii=False)
    return h
def normalize_proposal_reference_payload(harness: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of *harness* whose ``proposal_reference_json`` rows carry the stop/tp ladder fields.

    Rows that already have ``stop1_price_krw`` and ``tp1_price_krw`` (and are
    not a legacy single-stop placeholder) only get their quantity fields
    coerced. All other dict rows are rebuilt: stop1/stop2 quantities are split
    from the holding quantity (50% for ``core`` positions, 70% otherwise),
    stop3 is taken from the profit-preservation data, and tp1..tp3 fields are
    defaulted from the tp ladder / price rows. Non-dict rows pass through.

    The harness is returned unchanged when ``proposal_reference_json`` is
    missing or does not parse to a list.
    """
    h = dict(harness)
    raw = h.get("proposal_reference_json")
    if raw is None:
        return h
    rows = parse_jsonish_value(raw)
    if not isinstance(rows, list):
        return h

    prices = first_by_ticker(listify(h.get("prices_json")))
    tp_ladder = first_by_ticker(listify(h.get("tp_quantity_ladder_json")))
    profit_preservation = first_by_ticker(listify(h.get("profit_preservation_json")))

    normalized: list[Any] = []
    for row in rows:
        if not isinstance(row, dict):
            normalized.append(row)
            continue

        has_new_fields = "stop1_price_krw" in row and "tp1_price_krw" in row
        # Legacy placeholder: new-style keys exist, but the whole position sits on
        # stop1 and the stop2/stop3 slots are blank.
        legacy_stop_placeholder = (
            has_new_fields
            and row.get("stop1_quantity") == row.get("proposed_quantity")
            and row.get("stop2_price_krw") in (None, "")
            and row.get("stop2_quantity") in (None, "")
            and row.get("stop3_price_krw") in (None, "")
            and row.get("stop3_quantity") in (None, "")
        )
        if has_new_fields and not legacy_stop_placeholder:
            normalized.append(_coerce_qty_fields(row))
            continue

        ticker = str(row.get("ticker") or "").strip()
        price_row = prices.get(ticker, {})
        tp_row = tp_ladder.get(ticker, {})
        profit_row = profit_preservation.get(ticker, {})

        # --- stop1 / stop2 quantity split ---
        proposed_qty = row.get("proposed_quantity")
        base_stop_qty = number_or_none(price_row.get("holding_qty"), row.get("holding_quantity"), proposed_qty)
        position_class = str(price_row.get("position_class") or "").strip().lower()
        base_stop_qty_int = int(base_stop_qty) if base_stop_qty is not None and base_stop_qty > 0 else None
        stop1_qty = None
        stop2_qty = None
        if base_stop_qty_int is not None:
            stop1_ratio = 0.50 if position_class == "core" else 0.70
            stop1_qty = int(math.floor(base_stop_qty_int * stop1_ratio))
            # At least one share on stop1, but never more than the holding.
            if stop1_qty <= 0:
                stop1_qty = 1
            if stop1_qty > base_stop_qty_int:
                stop1_qty = base_stop_qty_int
            remainder = base_stop_qty_int - stop1_qty
            stop2_qty = remainder if remainder > 0 else None

        # --- stop3 price: trailing stop first, else protected stop outside NORMAL stage ---
        stop3_price = None
        auto_trailing = number_or_none(profit_row.get("auto_trailing_stop"))
        protected_stop = number_or_none(profit_row.get("protected_stop_price"))
        if auto_trailing is not None:
            stop3_price = int(auto_trailing)
        elif str(price_row.get("profit_lock_stage") or "NORMAL") != "NORMAL" and protected_stop is not None:
            stop3_price = int(protected_stop)

        # --- stop3 quantity: tp3 quantity, else what is left after tp1 + tp2 ---
        stop3_qty = None
        if stop3_price is not None:
            tp3_qty = number_or_none(tp_row.get("tp3_qty"), price_row.get("tp3_qty"))
            if tp3_qty is not None and tp3_qty > 0:
                stop3_qty = int(tp3_qty)
            elif base_stop_qty_int is not None:
                tp1_qty = int(number_or_none(tp_row.get("tp1_qty"), price_row.get("tp1_qty"), 0) or 0)
                tp2_qty = int(number_or_none(tp_row.get("tp2_qty"), price_row.get("tp2_qty"), 0) or 0)
                residual_qty = base_stop_qty_int - tp1_qty - tp2_qty
                stop3_qty = residual_qty if residual_qty > 0 else None

        normalized_row = dict(row)
        normalized_row["stop1_price_krw"] = row.get("stop1_price_krw", row.get("proposed_stop_price_krw"))
        normalized_row["stop1_quantity"] = stop1_qty
        normalized_row["stop2_price_krw"] = row.get("proposed_stop_price_krw") if stop2_qty is not None else None
        normalized_row["stop2_quantity"] = stop2_qty
        normalized_row["stop3_price_krw"] = stop3_price
        normalized_row["stop3_quantity"] = stop3_qty
        normalized_row.setdefault(
            "tp1_price_krw",
            tp_row.get("tp1_price", row.get("proposed_take_profit_price_krw")),
        )
        normalized_row.setdefault(
            "tp1_quantity",
            tp_row.get("tp1_qty", price_row.get("tp1_qty")),
        )
        normalized_row.setdefault("tp2_price_krw", tp_row.get("tp2_price", price_row.get("tp2_price")))
        normalized_row.setdefault("tp2_quantity", tp_row.get("tp2_qty", price_row.get("tp2_qty")))
        normalized_row.setdefault("tp3_price_krw", tp_row.get("tp3_price", price_row.get("tp3_price")))
        normalized_row.setdefault("tp3_quantity", tp_row.get("tp3_qty", price_row.get("tp3_qty")))
        # Rebuilt rows get the same int-or-null quantity guarantee as already-normalized rows;
        # tp/proposed quantities copied from source data may still be floats here.
        normalized.append(_coerce_qty_fields(normalized_row))

    h["proposal_reference_json"] = json.dumps(normalized, ensure_ascii=False)
    return h


def normalize_backdata_harness_payload(harness: dict[str, Any], canonical_backdata_json: str) -> dict[str, Any]:
    """Return a copy of *harness* with a usable ``backdata_feature_bank_json``.

    The existing value is kept only when it is (or parses from a string to) a
    list that is empty or whose first element is a dict. In every other case —
    missing, unparseable, not a list, or a list of non-dicts — it is replaced
    by *canonical_backdata_json*.
    """
    h = dict(harness)
    raw = h.get("backdata_feature_bank_json")

    parsed: Any = raw
    if isinstance(raw, str):
        try:
            parsed = json.loads(raw)
        except (ValueError, RecursionError):
            # json.JSONDecodeError is a ValueError; treat any unparseable text as unusable.
            parsed = None

    usable = isinstance(parsed, list) and (not parsed or isinstance(parsed[0], dict))
    if not usable:
        h["backdata_feature_bank_json"] = canonical_backdata_json
    return h
def convert_xlsx_to_json(xlsx_path: Path, output_path: Path) -> None:
    """Convert the trading workbook at *xlsx_path* into the JSON document written to *output_path*.

    Pipeline:
      1. Read every non-transient sheet (header row located by ``find_header_row``)
         into ``result["data"]``; ``settings`` / ``harness_context`` go through
         ``convert_settings``, the latter stored under ``_harness_context``.
      2. Normalize ``sector_flow`` / ``sector_flow_history`` rows, expanding the
         pre-split finance sector into its three successor sectors.
      3. Synthesize harness, snapshot-gate, core/satellite and sell-priority fields.
      4. Split replay rows out of ``backdata_feature_bank`` and publish the
         synthesized feature bank as JSON.
      5. Normalize the harness context and write the result as UTF-8 JSON.

    Progress and warnings are reported with ``print``.
    """
    print(f"Reading {xlsx_path}...")
    result: dict[str, Any] = {
        "metadata": {
            "schema_version": SCHEMA_VERSION,
            "source": os.path.basename(xlsx_path),
            "converted_at": pd.Timestamp.now(tz="Asia/Seoul").isoformat(),
            "source_format": "xlsx",
            "sheets_included": [],
            "sheet_headers": {},
        },
        "data": {},
    }
    metadata = result["metadata"]
    data = result["data"]

    # --- 1. sheets -> records; the workbook handle is closed when the block exits ---
    with pd.ExcelFile(xlsx_path) as xl:
        for sheet in xl.sheet_names:
            if sheet in EXCLUDE_SHEETS or sheet.startswith("cs_chunk_"):
                print(f"Skipping transient sheet: {sheet}")
                continue
            print(f"Processing sheet: {sheet}...")
            header_row = find_header_row(xlsx_path, sheet)
            # Reuse the already-open workbook instead of re-opening the file per sheet.
            df = pd.read_excel(xl, sheet_name=sheet, header=header_row)
            df = clean_dataframe(df)
            metadata["sheet_headers"][sheet] = {
                "header_row_1based": header_row + 1,
                "row_count": int(len(df)),
                "column_count": int(len(df.columns)),
            }
            if sheet in ("settings", "harness_context"):
                key = "_harness_context" if sheet == "harness_context" else sheet
                data[key] = convert_settings(df)
            else:
                data[sheet] = normalize_legacy_source_markers(sheet, dataframe_records(df))
            metadata["sheets_included"].append(sheet)

    # --- 2. sector normalization ---
    # First non-empty Source seen per sector in sector_universe.
    sector_source_map: dict[str, str] = {}
    sector_universe_rows = data.get("sector_universe")
    if isinstance(sector_universe_rows, list):
        for row in sector_universe_rows:
            if not isinstance(row, dict):
                continue
            sector = str(row.get("Sector") or "").strip()
            if not sector:
                continue
            source = str(row.get("Source") or "").strip() or "SHEET_INPUT"
            sector_source_map.setdefault(sector, source)

    # Legacy combined sector -> (sector, proxy ticker, proxy name) of each successor.
    split_finance_map = {
        "금융/은행": [
            ("은행", "091170", "KODEX 은행"),
            ("증권", "0111J0", "HANARO 증권고배당TOP3플러스"),
            ("지주회사", "307520", "TIGER 지주회사"),
        ]
    }

    sector_flow_rows = data.get("sector_flow")
    if isinstance(sector_flow_rows, list):
        normalized_rows: list[dict[str, Any]] = []
        for row in sector_flow_rows:
            if not isinstance(row, dict):
                continue
            sector = str(row.get("Sector") or "").strip()
            if not sector:
                continue
            source = str(row.get("Universe_Source") or "").strip() or sector_source_map.get(sector, "SHEET_INPUT")
            row["Universe_Source"] = source
            if sector not in split_finance_map:
                normalized_rows.append(row)
                continue
            for split_sector, split_ticker, split_name in split_finance_map[sector]:
                normalized_rows.append({
                    **row,
                    "Sector": split_sector,
                    "Proxy_Ticker": split_ticker,
                    "Proxy_Name": split_name,
                    "Proxy_Type": "ETF",
                    "ETF_Code": split_ticker,
                    "Reason": "PRE_SPLIT_FINANCE_FLOW_CARRYOVER",
                    "Universe_Source": "NAVER_ETF_PAGE",
                })
        data["sector_flow"] = normalized_rows

    sector_flow_history_rows = data.get("sector_flow_history")
    if isinstance(sector_flow_history_rows, list):
        normalized_history: list[dict[str, Any]] = []
        for row in sector_flow_history_rows:
            if not isinstance(row, dict):
                continue
            sector = str(row.get("Sector") or "").strip()
            if not sector:
                continue
            if sector not in split_finance_map:
                normalized_history.append(row)
                continue
            for split_sector, split_ticker, split_name in split_finance_map[sector]:
                normalized_history.append({
                    **row,
                    "Sector": split_sector,
                    "Proxy_Ticker": split_ticker,
                    "Proxy_Name": split_name,
                    "Proxy_Type": "ETF",
                    "Reason": "PRE_SPLIT_FINANCE_FLOW_CARRYOVER",
                })
        data["sector_flow_history"] = normalized_history

    # --- 3. harness / snapshot synthesis ---
    # Record a warning in the metadata when the harness_context sheet is missing.
    if "_harness_context" not in data:
        metadata["harness_context_missing"] = (
            "harness_context 시트 없음 — GAS buildHarnessContext_() 미실행. "
            "LLM 분석 전 GAS 에서 buildHarnessContext_() 를 실행하고 xlsx 를 재다운로드하세요."
        )
        print("WARNING: harness_context sheet not found - run buildHarnessContext_() in GAS first.")
    else:
        metadata["harness_context_missing"] = None
        data["_harness_context"] = synthesize_apex_harness(data["_harness_context"])

    snapshot_gate = synthesize_snapshot_gate(data)
    data["snapshot_execution_gate"] = snapshot_gate.get("status") or (
        "ALLOW_EXECUTION" if snapshot_gate.get("fresh") else "BLOCK_EXECUTION"
    )
    data["snapshot_execution_reason"] = snapshot_gate.get("reason")
    data["account_snapshot_freshness_json"] = json.dumps(snapshot_gate, ensure_ascii=False)
    if isinstance(data.get("_harness_context"), dict):
        harness_ctx = data["_harness_context"]
        harness_ctx.setdefault("snapshot_execution_gate", data["snapshot_execution_gate"])
        harness_ctx.setdefault("snapshot_execution_reason", data["snapshot_execution_reason"])
        harness_ctx.setdefault("account_snapshot_freshness_json", data["account_snapshot_freshness_json"])

    synthesize_core_satellite_execution_fields(data)
    synthesize_sell_priority_fallback(data)

    # --- 4. backdata feature bank ---
    # Replay rows in the cumulative backdata_feature_bank ledger are separated from
    # the operational harness ledger:
    # - operational harness / validation (backdata_feature_bank_json): operational
    #   records only, e.g. GAS_AUTO
    # - replay retention (backdata_feature_bank_replay_json): kept separately for
    #   analysis / audit
    backdata_raw = data.get("backdata_feature_bank")
    if isinstance(backdata_raw, list):
        operational_rows: list[dict[str, Any]] = []
        replay_rows: list[dict[str, Any]] = []
        for row in backdata_raw:
            if not isinstance(row, dict):
                continue
            origin = string_or_empty(row.get("Source_Origin"), row.get("source_origin"))
            if origin == "REPLAY_BACKFILL_KRX_EOD":
                replay_rows.append(row)
            else:
                operational_rows.append(row)
        data["backdata_feature_bank"] = operational_rows
        data["backdata_feature_bank_replay"] = replay_rows
        data["backdata_feature_bank_replay_json"] = json.dumps(replay_rows, ensure_ascii=False)

    backdata_rows = synthesize_backdata_feature_bank(data)
    backdata_json = json.dumps(backdata_rows, ensure_ascii=False)
    data["backdata_feature_bank_json"] = backdata_json

    # --- 5. harness normalization and output ---
    if isinstance(data.get("_harness_context"), dict):
        harness_ctx = normalize_backdata_harness_payload(data["_harness_context"], backdata_json)
        harness_ctx = normalize_proposal_reference_payload(harness_ctx)
        harness_ctx = normalize_qty_list_fields(harness_ctx)
        harness_ctx.setdefault("backdata_learning_lock", True)
        harness_ctx = ensure_extended_harness_defaults(harness_ctx)
        harness_ctx = patch_whipsaw_blueprint(harness_ctx)
        # Recompute checksum invariants after any blueprint mutation patch.
        harness_ctx = ensure_extended_harness_defaults(harness_ctx)
        data["_harness_context"] = harness_ctx

    output_path.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")
    print(f"Conversion complete: {output_path}")


if __name__ == "__main__":
    convert_xlsx_to_json(DEFAULT_XLSX, DEFAULT_JSON)