"""validate_execution_simulator_v1.py — spec/55: H004_EXECUTION_SIMULATOR Simulates tick normalization, minimum order quantity, cash floor, and slippage checks for every order in the final_decision_packet. formula_id: VALIDATE_EXECUTION_SIMULATOR_V1 contract: spec/55_execution_simulator_contract.yaml """ from __future__ import annotations import json import math import sys from pathlib import Path ROOT = Path(__file__).resolve().parents[1] DEFAULT_PACKET = ROOT / "Temp" / "final_decision_packet_active.json" OUTPUT_PATH = ROOT / "Temp" / "execution_simulator_v1.json" # Simulation parameters from spec/55 SLIPPAGE_BPS = 5 MINIMUM_RESERVE_KRW = 10_000_000 GOAL_TARGET_KRW = 500_000_000 D_PLUS_2_COUNTS = True # KRX tick table (spec/00 HS008 / spec/13_formula_registry TICK_NORMALIZER_V1) TICK_TABLE = [ ( 2_000, 1), ( 5_000, 5), ( 20_000, 10), ( 50_000, 50), ( 200_000, 100), ( 500_000, 500), (2_000_000, 1_000), ] TICK_ABOVE_2M = 1_000 # Load names from GatherTradingData.json to detect ETF or US Stocks TICKER_NAMES: dict[str, str] = {} gtd_path = ROOT / "GatherTradingData.json" if gtd_path.exists(): try: gtd = json.loads(gtd_path.read_text(encoding="utf-8")) df_rows = gtd.get("data", {}).get("data_feed") or [] for r in df_rows: if isinstance(r, dict) and r.get("Ticker"): TICKER_NAMES[str(r["Ticker"])] = str(r.get("Name") or "") univ_rows = gtd.get("data", {}).get("universe") or [] for r in univ_rows: if isinstance(r, dict) and r.get("Ticker"): TICKER_NAMES[str(r["Ticker"])] = str(r.get("Name") or "") except Exception: pass def _is_etf(ticker: str) -> bool: name = TICKER_NAMES.get(ticker, "").upper() return any(x in name for x in ["KODEX", "TIGER", "KBSTAR", "ARIRANG", "HANARO", "TIMEFOLIO", "SOL", "ACE", "PLUS"]) def _is_us_stock(ticker: str) -> bool: return str(ticker).isalpha() def _load_json(path: Path) -> dict: if not path.exists(): return {"_missing": True, "_path": str(path)} try: return json.loads(path.read_text(encoding="utf-8")) except Exception as e: return {"_error": str(e), "_path": str(path)} def _tick_size(price: float, ticker: str) -> float: if _is_us_stock(ticker): return 0.01 if _is_etf(ticker): return 5.0 for threshold, tick in TICK_TABLE: if price < threshold: return tick return TICK_ABOVE_2M def _normalize_tick(price: float, ticker: str) -> float: tick = _tick_size(price, ticker) return math.floor(price / tick) * tick def _apply_slippage(price: float, side: str) -> float: factor = SLIPPAGE_BPS / 10_000 if side == "BUY": return price * (1 + factor) return price * (1 - factor) def _extract_orders(packet: dict) -> list[dict]: orders = [] blueprint = packet.get("order_blueprint_json") or [] if isinstance(blueprint, list): for order in blueprint: if isinstance(order, dict): orders.append(order) per_ticker = packet.get("per_ticker") or {} if isinstance(per_ticker, dict) and not orders: for ticker, data in per_ticker.items(): if not isinstance(data, dict): continue action = str(data.get("final_action") or "").upper() if any(x in action for x in ("BUY", "SELL", "TRIM", "EXIT")): orders.append({ "ticker": ticker, "action": action, "price": data.get("entry_price") or data.get("close_price") or 0, "quantity": data.get("final_qty") or data.get("sell_qty") or 0, "validation_status": data.get("validation_status") or "UNKNOWN", }) return orders def _simulate_order(order: dict) -> dict: errors = [] price = float(order.get("price") or 0) qty = int(order.get("quantity") or order.get("qty") or 0) action = str(order.get("action") or order.get("order_type") or "").upper() side = "BUY" if "BUY" in action else "SELL" ticker = str(order.get("ticker") or "") # Tick normalization check if price > 0: normalized = _normalize_tick(price, ticker) if abs(normalized - price) > 0.01: errors.append(f"TICK_INVALID: price={price} → normalized={normalized}") else: errors.append("PRICE_MISSING") # Minimum quantity check if qty < 1: errors.append(f"QTY_BELOW_MIN: qty={qty} < 1") # Slippage-adjusted price slippage_price = _apply_slippage(price, side) if price > 0 else 0 order_amount = slippage_price * qty return { "ticker": ticker, "action": action, "original_price": price, "normalized_price": _normalize_tick(price, ticker) if price > 0 else 0, "slippage_adjusted_price": round(slippage_price, 2) if _is_us_stock(ticker) else round(slippage_price, 0), "quantity": qty, "order_amount_krw": round(order_amount), "errors": errors, "valid": len(errors) == 0, } def run(packet_path: Path) -> dict: packet = _load_json(packet_path) if packet.get("_missing"): result = { "gate": "SKIP", "reason": f"packet missing: {packet_path}", "invalid_order_count": 0, "cash_floor_after_orders_krw": 0, "slippage_adjusted_orders": [], "contract": "spec/55_execution_simulator_contract.yaml", } OUTPUT_PATH.parent.mkdir(parents=True, exist_ok=True) OUTPUT_PATH.write_text(json.dumps(result, ensure_ascii=False, indent=2)) return result orders = _extract_orders(packet) simulated = [_simulate_order(o) for o in orders] invalid_count = sum(1 for s in simulated if not s["valid"]) # Cash floor check cash_data = packet.get("cash_recovery_plan_json") or {} available_cash = float( (cash_data.get("available_cash_krw") if isinstance(cash_data, dict) else None) or packet.get("available_cash_krw") or 0 ) buy_total = sum( s["order_amount_krw"] for s in simulated if "BUY" in str(s.get("action") or "").upper() ) cash_after = available_cash - buy_total cash_floor_ok = (cash_after >= MINIMUM_RESERVE_KRW) if available_cash > 0 else True gate = "PASS" if invalid_order_count := invalid_count: gate = "FAIL" elif not cash_floor_ok: gate = "FAIL" result = { "gate": gate, "invalid_order_count": invalid_order_count, "cash_floor_after_orders_krw": round(cash_after), "cash_floor_ok": cash_floor_ok, "minimum_reserve_krw": MINIMUM_RESERVE_KRW, "available_cash_krw": available_cash, "buy_total_krw": buy_total, "slippage_adjusted_orders": simulated, "order_count": len(simulated), "contract": "spec/55_execution_simulator_contract.yaml", } OUTPUT_PATH.parent.mkdir(parents=True, exist_ok=True) OUTPUT_PATH.write_text(json.dumps(result, ensure_ascii=False, indent=2)) return result def main() -> None: import argparse parser = argparse.ArgumentParser(description="H004 Execution Simulator") parser.add_argument("--packet", default=str(DEFAULT_PACKET)) args = parser.parse_args() result = run(Path(args.packet)) gate = result.get("gate", "FAIL") print(f"[H004_EXECUTION_SIMULATOR] gate={gate} " f"orders={result.get('order_count', 0)} " f"invalid={result.get('invalid_order_count', 0)} " f"cash_floor_ok={result.get('cash_floor_ok', True)}") if gate == "FAIL": print(" Invalid orders:", [o for o in result.get("slippage_adjusted_orders", []) if not o.get("valid")]) sys.exit(1) if __name__ == "__main__": main()