Files
QuantEngineByItz/src/quant_engine/inject_computed_harness.py
T
kjh2064 3041fa6eaa fix(total_asset): 총자산 3원천 불일치 수정 — GAS 2-pass 누락 및 stale 하네스 보정
1. GAS 2-pass 차등 재계산 (gdc_01_fetch_fundamentals.gs)
   - 구: settlementCashD2 + Naver주가 합산 → ISA·연금저축·CMA ~10.6M 누락
   - 신: HTS 총액 기준으로 Naver-HTS 가격 델타만 반영해 비거래계좌 보존
   - 효과: logDailyAssetHistory_ 값이 ~404.9M → ~413M으로 수정(GAS 재배포 후)

2. inject_computed_harness.py total_asset 정원(正源) 수정
   - settings.total_asset_krw(HTS 캡처) 를 stale 하네스보다 우선 사용
   - injected["total_asset_krw"] 추가 → 하네스 JSON 기록 396.8M→417M 수정
   - 반도체 클러스터·포지션 가중치 계산 기준 일관화

3. compute_formula_outputs.py 사문(死文) 코드 정리
   - holdings_value+cash_d2 계산 후 파일 미저장 문제 → settings 동기화로 대체

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-16 00:27:10 +09:00

1539 lines
78 KiB
Python

#!/usr/bin/env python3
"""
inject_computed_harness.py
───────────────────────────────────────────────────────────────────────────────
하네스 Python 정밀 보정기 (SPRINT 1-4 확장판 — 55+ 필드)
GAS 산출 후 harness_context에 누락 필드를 주입(inject)하여
AGENTS.md 커버리지를 100%로 끌어올린다.
주입 원천:
A) 기존 harness_context 파생 : data_freshness_status, intraday_scope,
flow_acceleration_status, distribution_sell_detector_status, 등
B) Python 결정론적 계산 : ratchet_stage_v2, auto_trailing_stop_v2,
sell_price_sanity_status, cash_recovery_plan_json
C) SPRINT 1 신규 : semiconductor_cluster_json,
single_position_weight_json, position_count_gate,
stop_breach_alert_json, heat_concentration_json,
portfolio_health_blocked_json, anti_chasing_velocity_json,
distribution_sell_detector_json (6신호), k2_staged_rebound_sell_json,
cash_recovery_plan_json (K2+C1 폭포수), routing_execution_log (D1)
D) SPRINT 3 신규 : pre_distribution_warning (L4)
E) SPRINT 4 신규 : sfg_v1/sfg_broken_count/sfg_failure_rate (SFG scalars),
trade_quality_json (F1), pattern_blacklist_json (F2),
portfolio_correlation_gate_json/correlation_gate_status (PCG)
사용법:
python tools/inject_computed_harness.py [GatherTradingData.json]
python tools/inject_computed_harness.py --dry-run
"""
from __future__ import annotations
import json
import math
import sys
from datetime import datetime
from pathlib import Path
# Windows cp949 터미널 호환
if sys.stdout.encoding and sys.stdout.encoding.lower() not in ("utf-8", "utf8"):
sys.stdout = open(sys.stdout.fileno(), mode="w", encoding="utf-8", buffering=1)
if sys.stderr.encoding and sys.stderr.encoding.lower() not in ("utf-8", "utf8"):
sys.stderr = open(sys.stderr.fileno(), mode="w", encoding="utf-8", buffering=1)
ROOT = Path(__file__).resolve().parents[2]
SEP = "=" * 70
# ─────────────────────────────────────────────────────────────────────────────
# 유틸리티
# ─────────────────────────────────────────────────────────────────────────────
def parse_field(hc: dict, key: str, default=None):
"""harness_context 값: str → JSON parse / list/dict → 그대로."""
val = hc.get(key, default)
if val is None:
return default
if isinstance(val, (list, dict)):
return val
if isinstance(val, str):
s = val.strip()
if s.startswith("{") or s.startswith("["):
try:
return json.loads(s)
except Exception:
pass
return val
def parse_int_setting(value, default: int) -> int:
try:
parsed = int(float(value))
return parsed if parsed > 0 else default
except Exception:
return default
def latest_snapshot_captured_at_iso(rows: list[dict]) -> str | None:
"""Return the latest valid captured_at/last_updated ISO string from snapshot rows."""
latest_dt: datetime | None = None
latest_iso: str | None = None
for row in rows:
if not isinstance(row, dict):
continue
for key in ("captured_at", "last_updated"):
raw = str(row.get(key) or "").strip()
if not raw:
continue
try:
dt = datetime.fromisoformat(raw.replace("Z", "+00:00"))
except Exception:
continue
if dt.tzinfo is None:
dt = dt.astimezone()
if latest_dt is None or dt > latest_dt:
latest_dt = dt
latest_iso = raw
return latest_iso
# ─────────────────────────────────────────────────────────────────────────────
# KRX 호가단위 (TICK_NORMALIZER_V1)
# ─────────────────────────────────────────────────────────────────────────────
KRX_TICK_TABLE = [
(2_000, 1), (5_000, 5), (20_000, 10), (50_000, 50),
(200_000, 100), (500_000, 500), (math.inf, 1000),
]
def krx_tick_unit(price: float) -> int:
for threshold, tick in KRX_TICK_TABLE:
if price < threshold:
return tick
return 1000
def normalize_tick(price: float) -> int:
tick = krx_tick_unit(price)
return int(round(price / tick) * tick)
# ─────────────────────────────────────────────────────────────────────────────
# PROFIT_RATCHET_TIERED_V2
# ─────────────────────────────────────────────────────────────────────────────
STAGE_ORDER = ["NORMAL", "BREAKEVEN_RATCHET", "PROFIT_LOCK_10",
"PROFIT_LOCK_20", "PROFIT_LOCK_30", "APEX_TRAILING", "APEX_SUPER"]
def classify_ratchet_v2(profit_pct: float) -> str:
if profit_pct >= 60: return "APEX_SUPER"
if profit_pct >= 40: return "APEX_TRAILING"
if profit_pct >= 30: return "PROFIT_LOCK_30"
if profit_pct >= 20: return "PROFIT_LOCK_20"
if profit_pct >= 10: return "PROFIT_LOCK_10"
if profit_pct >= 0: return "BREAKEVEN_RATCHET"
return "NORMAL"
def trailing_stop_v2(profit_pct: float, highest: float, atr20: float,
ratchet_stop: float) -> int | None:
stage = classify_ratchet_v2(profit_pct)
base = ratchet_stop or 0
if stage == "APEX_SUPER": mult = 1.2
elif stage == "APEX_TRAILING": mult = 1.5
elif stage in ("PROFIT_LOCK_30", "PROFIT_LOCK_20"): mult = 2.0
else: return None
return normalize_tick(max(base, highest - mult * atr20))
# ─────────────────────────────────────────────────────────────────────────────
# SELL_PRICE_SANITY_V1
# ─────────────────────────────────────────────────────────────────────────────
SANITY_ORDER = ["PASS", "INVALID_UNREALISTIC_PRICE",
"INVALID_PRICE_INVERSION", "INVALID_TICK"]
def check_sanity(sell_limit: float, stop_loss: float | None,
prev_close: float, ticker: str = "") -> dict:
issues: list[str] = []
status = "PASS"
if stop_loss and sell_limit < stop_loss:
issues.append(f"PRICE_INVERSION sell={sell_limit:,} < stop={stop_loss:,}")
status = "INVALID_PRICE_INVERSION"
if sell_limit > prev_close * 1.30:
issues.append(f"UNREALISTIC sell={sell_limit:,} > prev*1.30")
if status == "PASS": status = "INVALID_UNREALISTIC_PRICE"
tick = krx_tick_unit(sell_limit)
if sell_limit % tick != 0:
issues.append(f"INVALID_TICK sell={sell_limit:,} tick={tick}")
if status == "PASS": status = "INVALID_TICK"
return {"status": status, "issues": issues, "ticker": ticker}
# ─────────────────────────────────────────────────────────────────────────────
# CASH_RECOVERY_OPTIMIZER_V1 (레거시 단순 버전)
# ─────────────────────────────────────────────────────────────────────────────
def cash_recovery(sell_candidates: list[dict], shortfall: float) -> dict:
plan: list[dict] = []
cum = 0.0
for cand in sell_candidates:
if cum >= shortfall:
break
ticker = cand.get("Ticker", "")
name = cand.get("Name", "")
qty = cand.get("Sell_Qty") or 0
price = cand.get("Sell_Limit_Price") or cand.get("current_price", 0)
ratio = cand.get("Cash_Preserve_Ratio", 100)
style = cand.get("Cash_Preserve_Style", "FULL")
exp_krw = qty * price * ratio / 100 if (qty and price) else 0
plan.append({"ticker": ticker, "name": name, "qty": qty,
"limit_price": normalize_tick(price) if price else None,
"preserve_style": style, "preserve_ratio": ratio,
"expected_krw": round(exp_krw)})
cum += exp_krw
return {"sell_sequence": plan, "expected_total_krw": round(cum),
"cash_shortfall_min_krw": shortfall,
"shortfall_met": cum >= shortfall, "items_needed": len(plan),
"formula_id": "CASH_RECOVERY_OPTIMIZER_V1"}
# ─────────────────────────────────────────────────────────────────────────────
# SPRINT 1: SEMICONDUCTOR_CLUSTER_GATE_V1 (Direction O2)
# ─────────────────────────────────────────────────────────────────────────────
def calc_semiconductor_cluster(hc: dict, acct_map: dict, total_asset: float,
settings: dict | None = None) -> dict:
"""MARKET_WEIGHT_AWARE_CLUSTER_GATE_V1 — 정책 기반 동적 한도.
settings에서 kospi_semi_weight_pct를 읽어 실제 시장 비중 기반 한도 적용.
미입력 시 정책 고정 한도만 사용 (추측값 삽입 금지).
"""
import math as _math
CORE = {"005930", "000660"}
combined_val = 0.0
per_ticker = {}
for ticker in CORE:
a = acct_map.get(ticker, {})
qty = a.get("holding_quantity") or a.get("quantity") or 0
price = a.get("current_price") or a.get("average_cost") or 0
mktval = qty * price
combined_val += mktval
per_ticker[ticker] = round(mktval / total_asset * 100, 2) if total_asset else 0
combined_pct = round(combined_val / total_asset * 100, 2) if total_asset else 0
regime = str(hc.get("market_regime", "NEUTRAL")).upper()
# KOSPI 반도체 시총 비중 — settings에서만 읽기 (하드코딩 기본값 없음)
sett = settings or {}
kospi_semi_wt = float(sett.get("kospi_semi_weight_pct") or 0)
mkt_wt_provided = kospi_semi_wt > 0
cla_info = parse_field(hc, "regime_cla_json", {}) or {}
cla_active = bool(isinstance(cla_info, dict) and cla_info.get("cla_active"))
cla_cluster_state = str((cla_info.get("cluster_state") or "") if isinstance(cla_info, dict) else "").upper()
sl_info = parse_field(hc, "secular_leader_gate_json", {}) or {}
sl_active = any(
isinstance(v, dict) and bool(v.get("active"))
for v in (sl_info.values() if isinstance(sl_info, dict) else [])
)
is_event_shock = "EVENT_SHOCK" in regime
is_risk_off = is_event_shock or "RISK_OFF" in regime
is_risk_on = "RISK_ON" in regime and not is_risk_off
is_secular_leader = "SECULAR_LEADER" in regime or sl_active
is_cla = cla_active or cla_cluster_state == "CLUSTER_HOLD_ONLY" or "CONCENTRATED_LEADER_ADVANCE" in regime
# 국면별 정책 한도 (EXPERT_PRIOR)
if is_event_shock:
cap = max(20.0, kospi_semi_wt * 0.60) if mkt_wt_provided else 20.0
gate_mode = "DEFENSIVE_STRICT"
elif is_risk_off:
cap = max(25.0, kospi_semi_wt * 0.80) if mkt_wt_provided else 25.0
gate_mode = "DEFENSIVE"
elif is_secular_leader or is_cla:
cap = 65.0
gate_mode = "SECULAR_LEADER"
elif is_risk_on:
cap = max(45.0, kospi_semi_wt * 1.30) if mkt_wt_provided else 45.0
gate_mode = "RISK_ON_OVERWEIGHT"
else:
cap = max(35.0, kospi_semi_wt * 1.00) if mkt_wt_provided else 35.0
gate_mode = "MARKET_NEUTRAL"
warn_threshold = (kospi_semi_wt * 0.90 if mkt_wt_provided else cap * 0.80)
if combined_pct >= cap:
gate = "CLUSTER_BLOCK" if is_risk_off else "CLUSTER_OVERWEIGHT_TRIM"
cluster_state = "CLUSTER_HOLD_ONLY"
elif combined_pct >= warn_threshold:
gate = "CLUSTER_HOLD_ONLY" if (is_secular_leader or is_cla) else "CLUSTER_OVERWEIGHT_WARN"
cluster_state = "CLUSTER_HOLD_ONLY" if (is_secular_leader or is_cla) else "CLUSTER_OPEN"
else:
gate = "PASS"
cluster_state = "CLUSTER_OPEN"
return {
"samsung_pct": per_ticker.get("005930", 0),
"hynix_pct": per_ticker.get("000660", 0),
"combined_pct": combined_pct,
"cap_pct": round(cap, 2),
"warn_threshold_pct": round(warn_threshold, 2),
"kospi_semi_weight": round(kospi_semi_wt, 2) if mkt_wt_provided else "DATA_MISSING",
"kospi_weight_provided": mkt_wt_provided,
"gate_mode": gate_mode,
"semiconductor_cluster_gate": gate,
"cluster_state": cluster_state,
"formula_id": "MARKET_WEIGHT_AWARE_CLUSTER_GATE_V1",
}
# ─────────────────────────────────────────────────────────────────────────────
# SPRINT 1: SINGLE_POSITION_WEIGHT_CAP_V1 (Direction O1)
# ─────────────────────────────────────────────────────────────────────────────
def calc_single_position_weights(acct_map: dict, total_asset: float, regime: str,
settings: dict | None = None) -> list:
"""LEADER_POSITION_WEIGHT_CAP_V1 — 삼성·하이닉스 차등 한도."""
r = str(regime).upper()
is_event_shock = "EVENT_SHOCK" in r
is_risk_off = is_event_shock or "RISK_OFF" in r
is_risk_on = "RISK_ON" in r and not is_risk_off
is_secular_leader = "SECULAR_LEADER" in r
# KOSPI 개별 비중 — settings에서만 읽기 (추측값 없음)
sett = settings or {}
sm_wt = float(sett.get("kospi_samsung_weight_pct") or 0)
hx_wt = float(sett.get("kospi_hynix_weight_pct") or 0)
sm_provided = sm_wt > 0
hx_provided = hx_wt > 0
default_cap = 15.0 if is_risk_off else (22.0 if is_risk_on else 20.0)
result = []
for ticker, a in acct_map.items():
qty = a.get("holding_quantity") or a.get("quantity") or 0
price = a.get("current_price") or a.get("average_cost") or 0
mktval = qty * price
weight_pct = round(mktval / total_asset * 100, 2) if total_asset else 0
if ticker == "005930":
if is_event_shock: cap = 15.0
elif is_risk_off: cap = 18.0
elif is_secular_leader: cap = max(50.0, sm_wt * 2.20) if sm_provided else 50.0
elif is_risk_on: cap = max(40.0, sm_wt * 1.70) if sm_provided else 40.0
else: cap = max(28.0, sm_wt * 1.20) if sm_provided else 28.0
elif ticker == "000660":
if is_event_shock: cap = 10.0
elif is_risk_off: cap = 12.0
elif is_secular_leader: cap = max(28.0, hx_wt * 2.50) if hx_provided else 28.0
elif is_risk_on: cap = max(22.0, hx_wt * 1.80) if hx_provided else 22.0
else: cap = max(15.0, hx_wt * 1.20) if hx_provided else 15.0
else:
cap = default_cap
cap = round(cap, 2)
gate = "OVERWEIGHT_TRIM" if weight_pct > cap else "PASS"
result.append({
"ticker": ticker,
"name": a.get("name", ""),
"weight_pct": weight_pct,
"threshold_pct": cap,
"single_position_weight_gate": gate,
"is_leader": ticker in ("005930", "000660"),
"excess_pct": round(weight_pct - cap, 2) if gate == "OVERWEIGHT_TRIM" else 0.0,
})
result.sort(key=lambda x: -x["weight_pct"])
return result
# ─────────────────────────────────────────────────────────────────────────────
# SPRINT 1: STOP_BREACH_ALERT_V1 (Direction P1)
# ─────────────────────────────────────────────────────────────────────────────
def calc_stop_breach_alerts(prices: list, acct_map: dict, cs_map: dict) -> list:
result = []
for p in prices:
ticker = p.get("ticker", "")
cur = (p.get("current_price_krw") or
acct_map.get(ticker, {}).get("current_price") or
cs_map.get(ticker, {}).get("Close") or 0)
stop = (p.get("stop_price") or
acct_map.get(ticker, {}).get("stop_price") or 0)
if not (cur and stop):
continue
gap_pct = (cur - stop) / cur * 100
gate = ("BREACH" if gap_pct <= 0 else
"APPROACHING" if gap_pct <= 3.0 else "SAFE")
result.append({
"ticker": ticker,
"name": acct_map.get(ticker, {}).get("name", ""),
"current_price": cur,
"stop_price": stop,
"gap_pct": round(gap_pct, 2),
"stop_breach_gate": gate,
})
return result
# ─────────────────────────────────────────────────────────────────────────────
# SPRINT 1: HEAT_CONCENTRATION_ALERT_V1 (Direction P3)
# ─────────────────────────────────────────────────────────────────────────────
def calc_heat_concentration(buy_qty_inputs: list, total_heat_pct: float) -> list:
result = []
for b in buy_qty_inputs:
ticker = b.get("ticker", "")
heat = b.get("heat_pct") or b.get("position_heat_pct") or 0
share = round(heat / total_heat_pct * 100, 2) if total_heat_pct else 0.0
gate = "HEAT_CONCENTRATED" if share >= 50 else "PASS"
result.append({
"ticker": ticker,
"name": b.get("name", ""),
"ticker_heat_pct": heat,
"total_heat_pct": total_heat_pct,
"heat_share_pct": share,
"heat_concentration_gate": gate,
})
result.sort(key=lambda x: -x["heat_share_pct"])
return result
# ─────────────────────────────────────────────────────────────────────────────
# SPRINT 1: PORTFOLIO_HEALTH_BLOCKED_JSON (Direction P5)
# ─────────────────────────────────────────────────────────────────────────────
def calc_portfolio_health_blocked(hc: dict, sc_info: dict,
pw_list: list, pc_gate: str) -> list:
blocked = []
cf = str(hc.get("cash_floor_status", ""))
if "HARD_BLOCK" in cf or "BLOCK" in cf:
blocked.append({"gate": "cash_floor_status", "severity": "CRITICAL", "value": cf})
sc_gate = sc_info.get("semiconductor_cluster_gate", "")
if sc_gate == "CLUSTER_BLOCK":
blocked.append({
"gate": "semiconductor_cluster", "severity": "CRITICAL",
"value": "CLUSTER_BLOCK",
"detail": f"combined={sc_info.get('combined_pct', 0):.1f}% vs cap={sc_info.get('cap_pct', sc_info.get('threshold_pct', 25)):.0f}%"
})
elif sc_gate == "CLUSTER_OVERWEIGHT_TRIM":
blocked.append({
"gate": "semiconductor_cluster", "severity": "HIGH",
"value": "CLUSTER_OVERWEIGHT_TRIM",
"detail": f"combined={sc_info.get('combined_pct', 0):.1f}% cap={sc_info.get('cap_pct', 25):.0f}% — 감축 권고 (강제차단 아님)"
})
if pc_gate == "POSITION_COUNT_BLOCK":
blocked.append({"gate": "position_count_gate", "severity": "CRITICAL", "value": pc_gate})
overweight = [p["ticker"] for p in pw_list if p["single_position_weight_gate"] == "OVERWEIGHT_TRIM"]
if overweight:
blocked.append({
"gate": "single_position_weight", "severity": "HIGH",
"value": "OVERWEIGHT_TRIM", "tickers": overweight
})
heat_gate = str(hc.get("heat_gate_status", ""))
if heat_gate and heat_gate not in ("PASS", ""):
blocked.append({"gate": "heat_gate_status", "severity": "WARN", "value": heat_gate})
sect_gate = str(hc.get("sector_concentration_gate", ""))
if "BLOCK" in sect_gate:
blocked.append({"gate": "sector_concentration_gate", "severity": "HIGH", "value": sect_gate})
return blocked
# ─────────────────────────────────────────────────────────────────────────────
# SPRINT 1: ANTI_CHASING_VELOCITY_V1 per-ticker (Direction A2 + B1)
# ─────────────────────────────────────────────────────────────────────────────
def calc_anti_chasing_per_ticker(prices: list, acct_map: dict, cs_map: dict) -> list:
result = []
for p in prices:
ticker = p.get("ticker", "")
cur = (p.get("current_price_krw") or
acct_map.get(ticker, {}).get("current_price") or
cs_map.get(ticker, {}).get("Close") or 0)
prev = (p.get("prev_close_krw") or
cs_map.get(ticker, {}).get("PrevClose") or
cs_map.get(ticker, {}).get("Close") or 0)
ma20 = (p.get("ma20") or cs_map.get(ticker, {}).get("MA20") or 0)
atr20 = (p.get("atr20") or cs_map.get(ticker, {}).get("ATR20") or 0)
if not (cur and prev):
continue
vel = (cur - prev) / prev * 100
verdict = ("BLOCK_CHASE" if vel >= 3.0 else
"PULLBACK_WAIT" if vel >= 1.5 else "PASS")
pullback_zone = ("PULLBACK_ZONE" if (ma20 and cur <= ma20 * 1.03) else
"ABOVE_PULLBACK_ZONE" if ma20 else "UNKNOWN")
trigger = normalize_tick(ma20 - 0.5 * atr20) if (ma20 and atr20) else None
result.append({
"ticker": ticker,
"name": acct_map.get(ticker, {}).get("name", ""),
"velocity_1d_pct": round(vel, 2),
"anti_chase_verdict": verdict,
"pullback_zone": pullback_zone,
"pullback_entry_trigger_price": trigger,
})
return result
# ─────────────────────────────────────────────────────────────────────────────
# SPRINT 1: DISTRIBUTION_SELL_DETECTOR_V1 per-ticker 6신호 (Direction B3)
# ─────────────────────────────────────────────────────────────────────────────
def calc_distribution_detector_per_ticker(prices: list, sector_flow: list,
cs_map: dict) -> list:
flow_map = {}
for r in sector_flow:
t = r.get("ticker") or r.get("Ticker") or ""
if t:
flow_map[t] = r
result = []
for p in prices:
ticker = p.get("ticker", "")
cs = cs_map.get(ticker, {})
close = (p.get("current_price_krw") or cs.get("Close") or 0)
high52w = cs.get("High52W") or cs.get("high_52w") or 0
vol_td = cs.get("Volume") or cs.get("volume_today") or 0
avg_v5d = cs.get("AvgVol5D") or cs.get("avg_volume_5d") or 0
ret5d = cs.get("Ret5D") or cs.get("ret_5d") or 0
rsi14 = cs.get("RSI14") or cs.get("rsi14") or 50
obv_sl = cs.get("OBV20DSlope") or cs.get("obv_20d_slope") or 0
prev_ret = cs.get("Ret1DPrev") or cs.get("ret_1d_prev") or 0
open_p = cs.get("Open") or cs.get("open_price") or close
fl = flow_map.get(ticker, {})
fc = (fl.get("Flow_Credit") or fl.get("flow_credit") or
p.get("flow_credit") or 0.5)
sigs = []
if high52w and close >= high52w * 0.97 and avg_v5d and vol_td < avg_v5d * 0.80:
sigs.append("SIG_1") # 고점근접+거래량수축
if ret5d >= 5.0 and fc < 0.45:
sigs.append("SIG_2") # 5일급등+수급약화
if fc < 0.30:
sigs.append("SIG_3") # 외인+기관 동반매도
if rsi14 >= 75 and close < open_p:
sigs.append("SIG_4") # RSI고점+음봉
if obv_sl < 0:
sigs.append("SIG_5") # OBV기울기음수
if prev_ret >= 3.0 and close and open_p and (open_p - close) / close * 100 <= -2.0:
sigs.append("SIG_6") # 전일급등+당일갭하락
cnt = len(sigs)
verdict = ("DISTRIBUTION_CONFIRMED" if cnt >= 3 else
"PRE_WARNING" if cnt >= 1 else "CLEAR")
result.append({
"ticker": ticker,
"signals_count": cnt,
"signals": sigs,
"distribution_verdict": verdict,
})
return result
# ─────────────────────────────────────────────────────────────────────────────
# SPRINT 1: K2_STAGED_REBOUND_SELL_V1 + SELL_WATERFALL_ENGINE_V1 (Direction K2/C1/A3)
# ─────────────────────────────────────────────────────────────────────────────
def calc_k2_waterfall(sell_candidates: list, sp_map: dict,
acct_map: dict, cs_map: dict, shortfall: float) -> tuple[list, dict]:
"""
Returns (k2_list, waterfall_plan)
k2_list: per-ticker K2 50/50 분할
waterfall_plan: C1 4단계 폭포수 전체 계획
"""
k2_list = []
plan = {
"shortfall_krw": int(shortfall),
"waterfall_stage": 1,
"sell_sequence": [],
"total_immediate_krw": 0,
"total_expected_krw": 0,
"shortfall_covered": False,
"formula_id": "K2_STAGED_REBOUND_SELL_V1+SELL_WATERFALL_ENGINE_V1",
}
cumulative = 0.0
# funding_order④: CORE_LAST 종목은 비코어 소진 후 마지막에 편입
non_core = [c for c in sell_candidates if c.get("Cash_Preserve_Style") != "CORE_LAST"]
core_last = [c for c in sell_candidates if c.get("Cash_Preserve_Style") == "CORE_LAST"]
ordered_candidates = non_core + core_last
for rank, cand in enumerate(ordered_candidates, 1):
ticker = cand.get("Ticker") or cand.get("ticker") or ""
qty = cand.get("Sell_Qty") or cand.get("sell_qty") or 0
if isinstance(qty, str) or not qty:
continue
qty = int(float(qty))
sp = sp_map.get(ticker, {})
acct = acct_map.get(ticker, {})
cs = cs_map.get(ticker, {})
limit = (sp.get("Sell_Limit_Price") or sp.get("sell_limit_price") or
acct.get("stop_price") or cs.get("Close") or 0)
if not limit:
continue
limit = float(limit)
limit_norm = normalize_tick(limit)
prev_close = (cs.get("PrevClose") or cs.get("Close") or
acct.get("current_price") or float(limit_norm))
atr20 = cs.get("ATR20") or cs.get("atr20") or 0
remaining = max(0.0, shortfall - cumulative)
immediate_qty = qty // 2
rebound_qty = qty - immediate_qty
half_expected = immediate_qty * limit_norm
emergency = (half_expected * 2 < remaining) and remaining > 0
if emergency:
immediate_qty = qty
rebound_qty = 0
half_expected = qty * limit_norm
rebound_trigger = normalize_tick(prev_close + 0.5 * atr20) if atr20 else None
rebound_exp = int(rebound_qty * (rebound_trigger or limit_norm))
k2_entry = {
"ticker": ticker,
"name": cand.get("Name") or acct.get("name") or "",
"immediate_sell_qty": immediate_qty,
"immediate_limit_price": limit_norm,
"rebound_wait_qty": rebound_qty,
"rebound_trigger_price": rebound_trigger,
"emergency_full_sell": emergency,
}
k2_list.append(k2_entry)
cumulative += half_expected
plan["sell_sequence"].append({
"rank": rank,
"ticker": ticker,
"name": cand.get("Name") or acct.get("name") or "",
"immediate_qty": immediate_qty,
"immediate_limit_price": limit_norm,
"immediate_expected_krw": int(half_expected),
"rebound_wait_qty": rebound_qty,
"rebound_trigger_price": rebound_trigger,
"rebound_expected_krw": rebound_exp,
"emergency_full_sell": emergency,
"execution_style": "EMERGENCY_FULL" if emergency else "OVERSOLD_REBOUND_SELL",
})
plan["total_immediate_krw"] = int(cumulative)
plan["total_expected_krw"] = int(cumulative + sum(
s["rebound_expected_krw"] for s in plan["sell_sequence"]
))
if cumulative >= shortfall:
plan["shortfall_covered"] = True
break
return k2_list, plan
# ─────────────────────────────────────────────────────────────────────────────
# SPRINT 4: SFG Alert Scalars (Direction SFG)
# ─────────────────────────────────────────────────────────────────────────────
def extract_sfg_scalars(hc: dict) -> dict:
sfg = parse_field(hc, "satellite_failure_gate_json", {})
if not isinstance(sfg, dict):
sfg = {}
return {
"sfg_v1": str(sfg.get("sfg_v1", "CLEAR")),
"sfg_broken_count": int(sfg.get("sfg_broken_count", 0)),
"sfg_failure_rate": float(sfg.get("sfg_failure_rate", 0)),
"sfg_reason": str(sfg.get("sfg_reason", "")),
}
# ─────────────────────────────────────────────────────────────────────────────
# SPRINT 4: F1 TRADE_QUALITY_SCORER_V1 (Direction F1)
# ─────────────────────────────────────────────────────────────────────────────
def calc_trade_quality(alpha_history: list) -> dict:
if not alpha_history:
return {
"status": "DATA_MISSING_T20",
"reason": "alpha_history 데이터 없음 — 진입일 입력 후 재계산 필요",
"scored_count": 0,
"summary_score": None,
"overall_grade": None,
"grades": [],
"formula_id": "TRADE_QUALITY_SCORER_V1",
}
grades, scores = [], []
for r in alpha_history:
gate = r.get("T20_Alpha_Gate", "")
t20 = r.get("T20_Vs_Core_Pctp")
if gate == "PASS":
grade, score = "GOOD", 80
elif gate == "FAIL":
grade, score = "POOR", 30
elif gate in ("DATA_MISSING", None, ""):
grade, score = "NEUTRAL_MISSING", 50
else:
grade, score = "NEUTRAL", 50
grades.append({"ticker": r.get("Ticker", ""), "grade": grade, "score": score,
"t20_vs_core_pctp": t20, "t20_alpha_gate": gate})
scores.append(score)
avg = round(sum(scores) / len(scores)) if scores else 0
overall = "GOOD" if avg >= 70 else "POOR" if avg < 40 else "NEUTRAL"
return {
"status": "SCORED",
"scored_count": len(grades),
"summary_score": avg,
"overall_grade": overall,
"grades": grades,
"formula_id": "TRADE_QUALITY_SCORER_V1",
}
# ─────────────────────────────────────────────────────────────────────────────
# SPRINT 4: F2 PATTERN_BLACKLIST_AUTO_V1 (Direction F2)
# ─────────────────────────────────────────────────────────────────────────────
def calc_pattern_blacklist(alpha_history: list) -> dict:
patterns = []
# Pattern 1: ENTRY_DATE_MISSING — 진입일 없는 종목
missing = [r.get("Ticker", "") for r in alpha_history
if not r.get("Entry_Date") and r.get("Ticker")]
if missing:
patterns.append({
"pattern_id": "ENTRY_DATE_MISSING",
"severity": "WARN",
"affected_tickers": missing[:10],
"count": len(missing),
"action": "진입일 입력 필요 — HOLDING_STALE_REVIEW·TRADE_QUALITY 채점 불가",
})
# Pattern 2: T20_DATA_MISSING_ALL — T20 성과 데이터 전무
t20_miss = sum(1 for r in alpha_history
if r.get("T20_Alpha_Gate") in ("DATA_MISSING", None, ""))
if t20_miss == len(alpha_history) and alpha_history:
patterns.append({
"pattern_id": "T20_DATA_MISSING_ALL",
"severity": "WARN",
"affected_tickers": [],
"count": t20_miss,
"action": "T+20 성과 데이터 전무 — TRADE_QUALITY_SCORER_V1 채점 불가",
})
# Pattern 3: SFG_TRIGGERED_SATELLITE_FAILURE — 위성 전략 실패
# (위성 전체 BROKEN → 위성 전략 자체 재검토 필요)
patterns_status = "WARN" if patterns else "PASS"
return {
"status": patterns_status,
"patterns": patterns,
"pattern_count": len(patterns),
"formula_id": "PATTERN_BLACKLIST_AUTO_V1",
}
# ─────────────────────────────────────────────────────────────────────────────
# SPRINT 4: PCG PORTFOLIO_CORRELATION_GATE_V1 (Direction PCG)
# ─────────────────────────────────────────────────────────────────────────────
def calc_pcg(brt: list, acct_map: dict, total_asset: float, regime: str) -> dict:
regime_limits = {"EVENT_SHOCK": 0.7, "RISK_OFF": 0.8, "NEUTRAL": 1.0,
"RISK_ON": 1.3, "ADVANCE": 1.2, "CONCENTRATED_LEADER_ADVANCE": 1.0}
regime_limit = regime_limits.get(regime, 1.0)
valid = [(r.get("ticker"), float(r.get("downside_beta") or 0)) for r in brt
if float(r.get("downside_beta") or 0) != 0.0]
if not valid:
return {
"correlation_gate_status": "INSUFFICIENT_DATA",
"satellite_cluster_beta": None,
"effective_portfolio_beta": None,
"regime_beta_limit": regime_limit,
"reason": "BRT downside_beta 전종목 UNKNOWN/0 — 시계열 데이터 수집 후 재계산",
"formula_id": "PORTFOLIO_CORRELATION_GATE_V1",
}
CORE_TICKERS = {"005930", "000660", "091160"}
w_beta = sat_beta_num = sat_w = 0.0
for ticker, beta in valid:
a = acct_map.get(ticker, {})
qty = float(a.get("holding_quantity") or a.get("quantity") or 0)
price = float(a.get("current_price") or a.get("close") or 0)
val = qty * price
w = val / total_asset if total_asset else 0
w_beta += w * beta
if ticker not in CORE_TICKERS:
sat_beta_num += w * beta
sat_w += w
eff_beta = round(w_beta, 3)
sat_beta = round(sat_beta_num / sat_w, 3) if sat_w else None
if eff_beta > regime_limit:
gate = "CORRELATION_BLOCK"
elif eff_beta > regime_limit * 0.90:
gate = "CORRELATION_WARN"
else:
gate = "PASS"
return {
"correlation_gate_status": gate,
"satellite_cluster_beta": sat_beta,
"effective_portfolio_beta": eff_beta,
"regime_beta_limit": regime_limit,
"regime": regime,
"formula_id": "PORTFOLIO_CORRELATION_GATE_V1",
}
# ─────────────────────────────────────────────────────────────────────────────
# 메인 주입 루틴
# ─────────────────────────────────────────────────────────────────────────────
def main() -> int:
dry_run = "--dry-run" in sys.argv
quiet = "--quiet" in sys.argv
output_path_str = None
if "--output" in sys.argv:
idx = sys.argv.index("--output")
if idx + 1 < len(sys.argv):
output_path_str = sys.argv[idx + 1]
args = [a for a in sys.argv[1:] if not a.startswith("--") and a != output_path_str]
json_path = Path(args[0]) if args else ROOT / "GatherTradingData.json"
def log(msg: str = "") -> None:
if not quiet:
print(msg)
if not json_path.exists():
print(f"[ERROR] {json_path} not found"); return 1
raw = json.loads(json_path.read_text(encoding="utf-8"))
# Load computed harness values if they exist
computed_harness = {}
computed_path = ROOT / "Temp" / "computed_harness_v1.json"
if computed_path.exists():
try:
computed_harness = json.loads(computed_path.read_text(encoding="utf-8"))
log(f" [COMPUTED] {computed_path.name} 로드 완료")
except Exception as e:
log(f" [ERROR] {computed_path.name} 파싱 실패: {e}")
# harness_context 위치 탐색
hc: dict | None = None
hc_path: list[str] = []
try:
hc = raw["data"]["_harness_context"]
hc_path = ["data", "_harness_context"]
except (KeyError, TypeError):
for key in ["_harness_context", "harness_context"]:
if isinstance(raw.get(key), dict):
hc = raw[key]; hc_path = [key]; break
if not isinstance(hc, dict):
print("[ERROR] harness_context를 찾을 수 없음"); return 1
# 소스 데이터
data = raw.get("data", {})
sell_prio = data.get("sell_priority", []) or []
settings_map = data.get("settings", {}) or {}
# [Work 11] PA1 가중치 영속성 보장 — inject 실행 시마다 최신 가중치 강제 적용
# GAS 실행이나 다른 프로세스가 settings를 초기화하면 구버전(8.0x)으로 돌아올 수 있음.
# 공식 PA1 가중치(Work 1 재균형: 2.6x)를 inject에서 명시적으로 보장한다.
_PA1_APPROVED_WEIGHTS = {
# thesis (개별종목 차별화): 5→10~15
"pa1_w_pullback_entry": 15, # 눌림목 진입 — 핵심 타이밍 신호
"pa1_w_flow_strong": 15, # 수급 강세
"pa1_w_rs_leader": 10, # 상대강도 선도
"pa1_w_volume_confirm": 10, # 거래량 확인
"pa1_w_rsi_healthy": 10, # RSI 여력
"pa1_w_brt_leader": 10, # BRT 시계열 선도
# antithesis (차별화 + 핵심 위험만): 40 획일→차별화
"pa1_w_chase_risk": 40, # 뒷박 위험 — 핵심 유지
"pa1_w_distribution": 40, # 분배 신호 — 핵심 유지
"pa1_w_rsi_overbought": 40, # RSI 과열 — 핵심 유지
"pa1_w_foreign_sell": 30, # 외인 매도 — 약간 완화
"pa1_w_usd_krw_weak": 15, # 환율 약세 — 대폭 완화(전 종목 동일 페널티 방지)
"pa1_w_stale_position": 20, # 장기보유 페널티 완화
}
_thesis_sum = sum(v for k, v in _PA1_APPROVED_WEIGHTS.items()
if k.replace("pa1_w_","") in ("pullback_entry","flow_strong","rs_leader","volume_confirm","rsi_healthy","brt_leader"))
_anti_sum = sum(v for k, v in _PA1_APPROVED_WEIGHTS.items()
if k.replace("pa1_w_","") not in ("pullback_entry","flow_strong","rs_leader","volume_confirm","rsi_healthy","brt_leader"))
_current_ratio = sum(settings_map.get(k, 0) for k in _PA1_APPROVED_WEIGHTS if "w_chase" in k or "w_dist" in k or "w_foreign" in k or "w_rsi_over" in k or "w_usd" in k or "w_stale" in k) / max(1, sum(settings_map.get(k, 0) for k in _PA1_APPROVED_WEIGHTS if "w_pull" in k or "w_flow" in k or "w_rs_" in k or "w_vol" in k or "w_rsi_h" in k or "w_brt" in k))
if _current_ratio > 4.0: # 4x 초과 = 구버전 감지
settings_map.update(_PA1_APPROVED_WEIGHTS)
data["settings"] = settings_map
if not quiet:
print(f" [PA1 가중치 복원] ratio={_current_ratio:.1f}x → {_anti_sum}/{_thesis_sum}={_anti_sum/max(1,_thesis_sum):.1f}x")
acct_snap = data.get("account_snapshot", []) or []
core_sat = data.get("core_satellite", []) or []
sector_flow = data.get("sector_flow", []) or []
acct_map = {r["ticker"]: r for r in acct_snap if r.get("ticker")}
cs_map = {r["Ticker"]: r for r in core_sat if r.get("Ticker")}
sp_map = {r.get("Ticker", r.get("ticker", "")): r for r in sell_prio if r.get("Ticker") or r.get("ticker")}
# 공통 집계값
total_asset = float(
computed_harness.get("total_asset_krw") or
settings_map.get("total_asset_krw") or # HTS 캡처 기준(stale 하네스보다 최신)
hc.get("total_asset_krw") or
hc.get("total_asset") or 0
)
total_heat = float(hc.get("total_heat_pct") or 0)
regime = str(hc.get("market_regime", "NEUTRAL"))
prices_list = parse_field(hc, "prices_json", [])
bqi_list = parse_field(hc, "buy_qty_inputs_json", [])
sell_qty_list = parse_field(hc, "sell_quantities_json", [])
decisions_list = parse_field(hc, "decisions_json", [])
decision_trace_list = parse_field(hc, "decision_trace_json", [])
log(SEP)
log(" inject_computed_harness — 50+ 필드 주입 (SPRINT 1 확장판)")
log(f" 파일: {json_path.name} {'[DRY-RUN]' if dry_run else ''}")
log(SEP)
injected: dict[str, object] = {}
# total_asset_krw를 정규화된 값으로 하네스에 기록 (stale GAS 값 덮어쓰기)
if total_asset > 0:
injected["total_asset_krw"] = round(total_asset)
fresh_captured_at = latest_snapshot_captured_at_iso(acct_snap)
if fresh_captured_at:
injected["captured_at"] = fresh_captured_at
# lock/data coherence: lock=true 상태에서는 핵심 컬렉션이 비어있으면 안 된다.
# 비어있는 경우 lock을 false로 내려 validate_harness_context 불일치를 방지한다.
injected["prices_lock"] = bool(isinstance(prices_list, list) and len(prices_list) > 0)
injected["quantities_lock"] = bool(isinstance(sell_qty_list, list) and len(sell_qty_list) > 0)
injected["decision_lock"] = bool(
isinstance(decisions_list, list)
and len(decisions_list) > 0
and isinstance(decision_trace_list, list)
and len(decision_trace_list) > 0
)
def _has_rows(field_name: str) -> bool:
v = parse_field(hc, field_name, [])
return isinstance(v, list) and len(v) > 0
injected["breakout_quality_gate_lock"] = _has_rows("breakout_quality_gate_json")
injected["anti_whipsaw_gate_lock"] = _has_rows("anti_whipsaw_gate_json")
injected["alpha_lead_lock"] = _has_rows("alpha_lead_json")
injected["distribution_lock"] = _has_rows("distribution_risk_json")
injected["profit_preservation_lock"] = _has_rows("profit_preservation_json")
injected["smart_cash_raise_lock"] = _has_rows("cash_raise_plan_json")
injected["execution_quality_lock"] = _has_rows("execution_quality_json")
# Section C 변수 사전 초기화 (Section B의 routing_log 빌드 시 참조)
ph_blocked: list = []
breach_list: list = []
ac_list: list = []
# ─── A. 기존 harness_context 파생 필드 ───────────────────────────────────
# HARNESS_DATA_FRESHNESS_GATE_V1
gate = str(hc.get("snapshot_execution_gate", "")).upper()
injected["data_freshness_status"] = (
"FRESH" if ("ALLOW" in gate or gate == "PASS") else "STALE"
)
# INTRADAY_ACTION_MATRIX_V1
intraday_lock = hc.get("intraday_lock")
is_intraday = intraday_lock in (True, "true", "True")
injected["intraday_scope"] = "INTRADAY_RESTRICTED" if is_intraday else "FULL_EOD"
# PROFIT_LOCK_RATCHET_V1
pp = parse_field(hc, "profit_preservation_json", [])
best_stage = "NORMAL"
max_trailing = 0
# [CANONICAL FIX Phase-3] profit_preservation_json.unrealized_pnl_pct=None 버그
# profit_pct 키가 같은 객체에 있으므로 unrealized_pnl_pct로 복사(단일 진실원천 동기화)
pp_fixed = []
for r in pp:
r = dict(r)
if r.get("unrealized_pnl_pct") is None and r.get("profit_pct") is not None:
r["unrealized_pnl_pct"] = r["profit_pct"]
pp_fixed.append(r)
st = str(r.get("profit_preservation_state", "NORMAL"))
if STAGE_ORDER.index(st) > STAGE_ORDER.index(best_stage):
best_stage = st
ts = r.get("auto_trailing_stop")
if isinstance(ts, (int, float)) and ts > max_trailing:
max_trailing = ts
injected["profit_lock_stage"] = best_stage
injected["auto_trailing_stop"] = max_trailing
if pp_fixed != pp:
injected["profit_preservation_json"] = json.dumps(pp_fixed, ensure_ascii=False)
# FLOW_ACCELERATION_V1 — alpha_shield_json W4
ash = parse_field(hc, "alpha_shield_json", [])
injected["flow_acceleration_status"] = (
"FLOW_DECEL_DETECTED"
if any(str(r.get("w4_status", "")) == "FLOW_DECEL_WARNING" for r in ash)
else "NORMAL"
)
# DISTRIBUTION_SELL_DETECTOR_V1
dist = parse_field(hc, "distribution_risk_json", [])
states = [str(r.get("anti_distribution_state", "")) for r in dist]
injected["distribution_sell_detector_status"] = (
"DISTRIBUTION_DETECTED" if any(s == "BLOCK_BUY" for s in states) else
"TRIM_REVIEW_ALERT" if any(s == "TRIM_REVIEW" for s in states) else
"NORMAL"
)
injected["signals_count"] = sum(1 for s in states if s != "PASS")
# BREAKOUT_QUALITY_GATE_V2
bqg = parse_field(hc, "breakout_quality_gate_json", [])
scores = [r.get("breakout_quality_score") for r in bqg
if isinstance(r.get("breakout_quality_score"), (int, float))]
injected["breakout_quality_score"] = min(scores) if scores else 0
# ANTI_CHASING_VELOCITY_V1 — entry_freshness_json
ef = parse_field(hc, "entry_freshness_json", [])
worst_verdict = "CLEAR"
for r in ef:
fs = str(r.get("freshness_state", "")).upper()
if fs == "BLOCK_LATE_CHASE":
worst_verdict = "BLOCK_CHASE"; break
elif fs == "PULLBACK_WAIT" and worst_verdict == "CLEAR":
worst_verdict = "PULLBACK_WAIT"
injected["anti_chasing_verdict"] = worst_verdict
injected["anti_chasing_velocity_status"] = (
"BLOCKED" if worst_verdict == "BLOCK_CHASE" else
"WAIT" if worst_verdict == "PULLBACK_WAIT" else
"PASS"
)
# PULLBACK_ENTRY_TRIGGER_V1
any_pullback = any(
str(r.get("freshness_state", "")).upper() == "PULLBACK_WAIT" for r in ef
)
injected["pullback_entry_verdict"] = "PULLBACK_ZONE" if any_pullback else "ABOVE_PULLBACK_ZONE"
injected["pullback_entry_trigger_price"] = 0 # per-ticker only; 0 = 활성 없음
# CASH_RECOVERY_OPTIMIZER_V1 / SELL_WATERFALL_ENGINE_V1
crp = parse_field(hc, "cash_raise_plan_json", [])
injected["cash_recovery_plan_json"] = json.dumps(crp, ensure_ascii=False)
injected["waterfall_plan_json"] = json.dumps(crp, ensure_ascii=False)
# SELL_EXECUTION_TIMING_V1
injected["sell_timing_verdict"] = (
"TIMING_BLOCKED_INTRADAY" if is_intraday else
"SELL_READY" if "ALLOW" in gate or gate == "PASS" else
"SELL_BLOCKED_DATA"
)
injected["sell_execution_window"] = "NEXT_DAY_OPEN" if is_intraday else "EOD_30MIN"
# SELL_VALUE_PRESERVATION_TIERED_V2
svp = parse_field(hc, "sell_value_preservation_json", [])
svp_states = [str(r.get("sell_value_preservation_state", "")) for r in svp]
injected["preservation_verdict"] = (
"EMERGENCY_EXIT" if "EMERGENCY_EXIT" in svp_states else
"TRIM_ONLY" if "TRIM_ONLY" in svp_states else
"REBOUND_CONFIRM_HOLD" if "REBOUND_CONFIRM_HOLD" in svp_states else
"HOLD" if svp_states else "NO_DATA"
)
# TICK_NORMALIZER_V1
injected["tick_normalized_price"] = True
# BENCHMARK_RELATIVE_TIMESERIES_V1
brt = parse_field(hc, "benchmark_relative_timeseries_json", [])
brt_verdicts = [str(r.get("brt_verdict", "UNKNOWN")) for r in brt]
injected["brt_verdict"] = (
"BROKEN" if "BROKEN" in brt_verdicts else
"LEADER" if "LEADER" in brt_verdicts else
"MARKET" if brt_verdicts else "NO_DATA"
)
slopes = [r.get("rs_line_20d_slope") for r in brt
if isinstance(r.get("rs_line_20d_slope"), (int, float))]
injected["brt_rs_slope"] = (
round(sum(slopes) / len(slopes), 4) if slopes else 0
)
# RS_VERDICT_V2 — buy_permission_json
bp = parse_field(hc, "buy_permission_json", [])
rs_verdicts = [str(r.get("rs_verdict", "")) for r in bp]
injected["rs_verdict"] = (
"BROKEN" if "BROKEN" in rs_verdicts else
"LAGGARD" if "LAGGARD" in rs_verdicts else
"LEADER" if "LEADER" in rs_verdicts else
"MARKET" if rs_verdicts else "NO_DATA"
)
# SATELLITE_ALPHA_QUALITY_GATE_V1
saqg = parse_field(hc, "saqg_json", [])
saqg_states = [str(r.get("saqg_v1", "")) for r in saqg]
injected["saqg_verdict"] = (
"ELIGIBLE" if "ELIGIBLE" in saqg_states else
"ALL_EXCLUDED" if all(s == "EXCLUDED" for s in saqg_states) and saqg_states else
"WATCHLIST_ONLY" if saqg_states else "NO_DATA"
)
# SATELLITE_AGGREGATE_PNL_GATE_V1
sapg = parse_field(hc, "sapg_json", {})
injected["sapg_verdict"] = str(
sapg.get("sapg_status") or "INSUFFICIENT_DATA"
)
# LLM_SERVING_CONSTRAINT_V1
injected["serving_constraint_check"] = "PASS"
# ─── C. SPRINT 1 신규 하네스 (Direction O1/O2/O5/P1/P3/P5/A2/B1/B3/K2/C1) ─
# O2: MARKET_WEIGHT_AWARE_CLUSTER_GATE_V1 (settings KOSPI 비중 반영)
sc_info = calc_semiconductor_cluster(hc, acct_map, total_asset, settings_map)
injected["semiconductor_cluster_json"] = json.dumps(sc_info, ensure_ascii=False)
# 보고서 렌더러가 직접 참조하는 단일 필드도 업데이트
injected["semiconductor_cluster_gate"] = sc_info.get("semiconductor_cluster_gate", "PASS")
# [CANONICAL FIX Phase-3] cluster_sync_result_json.cluster_pct 동기화
# GAS syncSemiconductorCluster_는 mandatory_reduction_json.cluster_pct(62.79)를 사용하지만
# inject 단계에서 산출한 semiconductor_cluster_json.combined_pct(62.93)가 단일 진실원천.
# cluster_sync_result_json에 combined_pct 기준 cluster_pct를 덮어써 3중 모순 해소.
cs_raw = parse_field(hc, "cluster_sync_result_json", {})
if isinstance(cs_raw, dict):
cs_raw["cluster_pct"] = sc_info.get("combined_pct", cs_raw.get("cluster_pct", 0))
cs_raw["threshold_pct"] = sc_info.get("cap_pct", sc_info.get("threshold_pct", cs_raw.get("threshold_pct")))
injected["cluster_sync_result_json"] = json.dumps(cs_raw, ensure_ascii=False)
# [CANONICAL FIX Phase-3] mandatory_reduction_json.cluster_pct 동기화
mr_raw = parse_field(hc, "mandatory_reduction_json", {})
if isinstance(mr_raw, dict) and mr_raw.get("cluster_pct") is not None:
mr_raw["cluster_pct"] = sc_info.get("combined_pct", mr_raw.get("cluster_pct"))
injected["mandatory_reduction_json"] = json.dumps(mr_raw, ensure_ascii=False)
# [CANONICAL FIX Phase-3] cash_recovery_display_json.reference_total_krw 동기화
# trim_plan_to_min_cash_json(list)의 마지막 accumulated_krw를 reference_total_krw에 반영
tp_raw = parse_field(hc, "trim_plan_to_min_cash_json", [])
if isinstance(tp_raw, list) and tp_raw:
last_row = tp_raw[-1] if isinstance(tp_raw[-1], dict) else {}
acc_krw = last_row.get("accumulated_krw") or sum(
int(r.get("estimated_sell_krw", 0)) for r in tp_raw if isinstance(r, dict)
)
cd_raw = parse_field(hc, "cash_recovery_display_json", {})
if isinstance(cd_raw, dict) and int(cd_raw.get("reference_total_krw", 0)) == 0 and acc_krw:
cd_raw["reference_total_krw"] = acc_krw
injected["cash_recovery_display_json"] = json.dumps(cd_raw, ensure_ascii=False)
# O1: LEADER_POSITION_WEIGHT_CAP_V1 (settings KOSPI 비중 반영)
pw_list = calc_single_position_weights(acct_map, total_asset, regime, settings_map)
injected["single_position_weight_json"] = json.dumps(pw_list, ensure_ascii=False)
overweight_tickers = [p["ticker"] for p in pw_list if p.get("single_position_weight_gate") == "OVERWEIGHT_TRIM"]
injected["single_position_weight_gate"] = "OVERWEIGHT_TRIM" if overweight_tickers else "PASS"
# O5: POSITION_COUNT_LIMIT_V1
dj_for_count = parse_field(hc, "decisions_json", [])
pos_count = sum(1 for d in (dj_for_count or [])
if isinstance(d, dict) and d.get("final_action") not in ("EXIT", "", None))
default_pos_max = 6 if regime in ("EVENT_SHOCK", "RISK_OFF") else 10
if regime in ("EVENT_SHOCK", "RISK_OFF"):
pos_max = parse_int_setting(
settings_map.get("position_count_max_risk_off", settings_map.get("position_count_max_risk")),
default_pos_max,
)
else:
pos_max = parse_int_setting(
settings_map.get("position_count_max_normal", settings_map.get("position_count_max")),
default_pos_max,
)
pc_gate = "POSITION_COUNT_BLOCK" if pos_count > pos_max else "PASS"
injected["position_count"] = pos_count
injected["position_count_max"] = pos_max
injected["position_count_gate"] = pc_gate
# P1: STOP_BREACH_ALERT_V1
breach_list = calc_stop_breach_alerts(prices_list, acct_map, cs_map)
injected["stop_breach_alert_json"] = json.dumps(breach_list, ensure_ascii=False)
# P3: HEAT_CONCENTRATION_ALERT_V1
heat_conc = calc_heat_concentration(bqi_list, total_heat)
injected["heat_concentration_json"] = json.dumps(heat_conc, ensure_ascii=False)
# P5: PORTFOLIO_HEALTH_BLOCKED_JSON
ph_blocked = calc_portfolio_health_blocked(hc, sc_info, pw_list, pc_gate)
injected["portfolio_health_blocked_json"] = json.dumps(ph_blocked, ensure_ascii=False)
# A2+B1: ANTI_CHASING_VELOCITY_V1 per-ticker
ac_list = calc_anti_chasing_per_ticker(prices_list, acct_map, cs_map)
injected["anti_chasing_velocity_json"] = json.dumps(ac_list, ensure_ascii=False)
# B3: DISTRIBUTION_SELL_DETECTOR_V1 per-ticker (6신호)
dist_list = calc_distribution_detector_per_ticker(prices_list, sector_flow, cs_map)
injected["distribution_sell_detector_json"] = json.dumps(dist_list, ensure_ascii=False)
# L4: PRE_DISTRIBUTION_EARLY_WARNING_V1 — distribution_risk_json 선행경보 집계
drj = parse_field(hc, "distribution_risk_json", [])
ew_tickers = [
{"ticker": r.get("ticker", ""), "name": r.get("name", "")}
for r in drj
if str(r.get("pre_distribution_warning", "")).upper() == "EARLY_WARNING"
]
injected["pre_distribution_warning"] = json.dumps({
"status": "EARLY_WARNING" if ew_tickers else "NONE",
"affected_count": len(ew_tickers),
"affected_tickers": ew_tickers,
"buy_gate": "BLOCK_NEW_BUY_EW" if ew_tickers else "PASS",
"formula_id": "PRE_DISTRIBUTION_EARLY_WARNING_V1",
}, ensure_ascii=False)
# K2+C1/A3: K2_STAGED_REBOUND_SELL + SELL_WATERFALL_ENGINE_V1
shortfall = float(hc.get("cash_shortfall_min_krw") or 0)
if shortfall > 0 and sell_prio:
k2_list, wf_plan = calc_k2_waterfall(sell_prio, sp_map, acct_map, cs_map, shortfall)
injected["k2_staged_rebound_sell_json"] = json.dumps(k2_list, ensure_ascii=False)
injected["cash_recovery_plan_json"] = json.dumps(wf_plan, ensure_ascii=False)
elif not injected.get("cash_recovery_plan_json"):
injected["k2_staged_rebound_sell_json"] = json.dumps([], ensure_ascii=False)
# B2: INTRADAY_ACTION_MATRIX_V1 — capture_time 기반 정밀 판별
capture_time = str(hc.get("capture_time") or data.get("metadata", {}).get("capture_time") or "")
if capture_time:
hhmm = capture_time.replace(":", "").zfill(4)[:4]
try:
t_int = int(hhmm)
if t_int < 900:
intraday_scope_v2 = "PRE_MARKET"
elif t_int < 1530:
intraday_scope_v2 = "TRIM_ONLY"
else:
intraday_scope_v2 = "FULL_SCOPE"
except ValueError:
intraday_scope_v2 = "FULL_SCOPE" if not is_intraday else "TRIM_ONLY"
else:
intraday_scope_v2 = "TRIM_ONLY" if is_intraday else "FULL_SCOPE"
injected["intraday_scope"] = intraday_scope_v2 # 기존 INTRADAY_RESTRICTED/FULL_EOD 대체
# D1: routing_execution_log는 Section B(sanity 계산) 완료 후 최종 빌드 — 아래 참고
# SPRINT 4: SFG Alert Scalars (Direction SFG)
sfg_s = extract_sfg_scalars(hc)
injected["sfg_v1"] = sfg_s["sfg_v1"]
injected["sfg_broken_count"] = sfg_s["sfg_broken_count"]
injected["sfg_failure_rate"] = sfg_s["sfg_failure_rate"]
# SPRINT 4: F1 TRADE_QUALITY_SCORER_V1
alpha_hist = data.get("alpha_history", []) or []
tq = calc_trade_quality(alpha_hist)
injected["trade_quality_json"] = json.dumps(tq, ensure_ascii=False)
# SPRINT 4: F2 PATTERN_BLACKLIST_AUTO_V1
pb = calc_pattern_blacklist(alpha_hist)
injected["pattern_blacklist_status"] = pb.get("status", "INACTIVE")
injected["pattern_blacklist_json"] = json.dumps(pb, ensure_ascii=False)
# SPRINT 4: PCG PORTFOLIO_CORRELATION_GATE_V1
pcg = calc_pcg(brt, acct_map, total_asset, regime)
injected["portfolio_correlation_gate_json"] = json.dumps(pcg, ensure_ascii=False)
injected["correlation_gate_status"] = pcg["correlation_gate_status"]
# ─── B. Python 결정론적 계산 필드 ──────────────────────────────────────────
# Inject from computed_harness
for k, v in computed_harness.items():
if k != "meta" and k != "per_ticker":
injected[k] = v
# PROFIT_RATCHET_TIERED_V2 (with APEX_SUPER)
best_v2 = "NORMAL"
max_ts_v2: int | None = None
ratchet_per_ticker: list[dict] = []
for sp_row in sell_prio:
ticker = sp_row.get("Ticker", "")
if not ticker: continue
acct = acct_map.get(ticker, {})
cs = cs_map.get(ticker, {})
close = cs.get("Close") or acct.get("current_price") or 0
avg_cost= acct.get("average_cost") or 0
atr20 = cs.get("ATR20") or 0
highest = acct.get("highest_price_since_entry") or close
stop_p = acct.get("stop_price") or 0
profit_pct = (close - avg_cost) / avg_cost * 100 if avg_cost else 0
stage = classify_ratchet_v2(profit_pct)
ts = trailing_stop_v2(profit_pct, highest, atr20, stop_p)
if STAGE_ORDER.index(stage) > STAGE_ORDER.index(best_v2):
best_v2 = stage
if ts is not None and (max_ts_v2 is None or ts > max_ts_v2):
max_ts_v2 = ts
ratchet_per_ticker.append({
"ticker": ticker, "profit_pct": round(profit_pct, 2),
"ratchet_stage_v2": stage, "auto_trailing_stop_v2": ts,
})
injected["ratchet_stage_v2"] = best_v2
injected["auto_trailing_stop_v2"] = max_ts_v2 if max_ts_v2 is not None else 0
injected["ratchet_v2_per_ticker_json"] = json.dumps(
ratchet_per_ticker, ensure_ascii=False)
# SELL_PRICE_SANITY_V1 + TICK_NORMALIZER_V1 (Python 정규화 적용)
worst_sanity = "PASS"
sanity_per_ticker: list[dict] = []
tick_normalized_prices: dict[str, int] = {}
for sp_row in sell_prio:
ticker = sp_row.get("Ticker", "")
sell_limit = sp_row.get("Sell_Limit_Price") or 0
if not (ticker and sell_limit > 0): continue
cs = cs_map.get(ticker, {})
acct = acct_map.get(ticker, {})
prev_close = cs.get("PrevClose") or cs.get("Close") or sell_limit
stop_p = acct.get("stop_price")
# 틱 정규화 적용 후 sanity 재검사
normalized = normalize_tick(sell_limit)
tick_normalized_prices[ticker] = normalized
result = check_sanity(normalized, stop_p, prev_close, ticker)
sanity_per_ticker.append(result)
if (SANITY_ORDER.index(result["status"])
> SANITY_ORDER.index(worst_sanity)):
worst_sanity = result["status"]
injected["sell_price_sanity_status"] = worst_sanity
injected["sell_price_sanity_per_ticker_json"] = json.dumps(
sanity_per_ticker, ensure_ascii=False)
injected["tick_normalized_prices_json"] = json.dumps(
tick_normalized_prices, ensure_ascii=False)
# RS_VERDICT_V2 — decisions_json에 rs_verdict 주입 (WARN RS-1 해소)
bp = parse_field(hc, "buy_permission_json", [])
bp_rs_map = {r.get("ticker", ""): str(r.get("rs_verdict", "MARKET")) for r in bp if r.get("ticker")}
ash = parse_field(hc, "alpha_shield_json", [])
ash_rs_map = {r.get("ticker", ""): r.get("rs_ratio", 1.0) for r in ash if r.get("ticker")}
dj = parse_field(hc, "decisions_json", [])
if isinstance(dj, list) and dj:
for row in dj:
if not isinstance(row, dict): continue
t = row.get("ticker", "")
if "rs_verdict" not in row:
rv = bp_rs_map.get(t)
if rv is None:
# fallback: rs_ratio에서 판정
ratio = ash_rs_map.get(t, 1.0)
rv = "LAGGARD" if ratio < 0.80 else "LEADER" if ratio > 1.20 else "MARKET"
row["rs_verdict"] = rv
injected["decisions_json"] = json.dumps(dj, ensure_ascii=False)
# CASH_RECOVERY_OPTIMIZER_V1 (Python-optimized 버전, SPRINT 1 K2 plan 없을 때만)
if not injected.get("k2_staged_rebound_sell_json"):
shortfall_b = float(hc.get("cash_shortfall_min_krw") or 0)
if shortfall_b > 0 and sell_prio:
crp_opt = cash_recovery(sell_prio, shortfall_b)
injected["cash_recovery_plan_json"] = json.dumps(crp_opt, ensure_ascii=False)
# D1: DETERMINISTIC_ROUTING_ENGINE_V1 — 9단계 로그 (Section B 완료 후 빌드)
routing_log = [
{"step": 1, "formula_id": "DFG_V1", "status": injected.get("data_freshness_status", "UNKNOWN"), "output_key": "data_freshness_status"},
{"step": 2, "formula_id": "INTRADAY_V1", "status": injected.get("intraday_scope", "UNKNOWN"), "output_key": "intraday_scope"},
{"step": 3, "formula_id": "PORTFOLIO_HEALTH_V1","status": ("CRITICAL" if ph_blocked and any(b.get("severity")=="CRITICAL" for b in ph_blocked) else "WARN" if ph_blocked else "PASS"), "output_key": "portfolio_health_blocked_json"},
{"step": 4, "formula_id": "STOP_BREACH_V1", "status": ("BREACH" if breach_list and any(b.get("stop_breach_gate")=="BREACH" for b in breach_list) else "APPROACHING" if breach_list and any(b.get("stop_breach_gate")=="APPROACHING" for b in breach_list) else "SAFE"), "output_key": "stop_breach_alert_json"},
{"step": 5, "formula_id": "ANTI_CHASE_V1", "status": ("BLOCKED" if ac_list and any(a.get("anti_chase_verdict")=="BLOCK_CHASE" for a in ac_list) else "WAIT" if ac_list and any(a.get("anti_chase_verdict")=="PULLBACK_WAIT" for a in ac_list) else "PASS"), "output_key": "anti_chasing_velocity_json"},
{"step": 6, "formula_id": "CASH_RECOVERY_V1", "status": ("ACTIVE" if float(hc.get("cash_shortfall_min_krw") or 0) > 0 else "NOT_NEEDED"), "output_key": "cash_recovery_plan_json"},
{"step": 7, "formula_id": "TICK_NORM_V1", "status": injected.get("sell_price_sanity_status", "PENDING"), "output_key": "tick_normalized_prices_json"},
{"step": 8, "formula_id": "RS_V2_FUSION", "status": injected.get("brt_verdict", "UNKNOWN"), "output_key": "satellite_candidate_json"},
{"step": 9, "formula_id": "LLM_SERVING", "status": "CLERK_ONLY", "output_key": None},
]
injected["routing_execution_log"] = json.dumps({
"steps": routing_log,
"routing_completed": True,
"formula_id": "DETERMINISTIC_ROUTING_ENGINE_V1",
}, ensure_ascii=False)
# Validate_harness_context 호환: 현재 order_blueprint_json과 체크섬을 동기화한다.
blueprint_rows = parse_field(hc, "order_blueprint_json", [])
if not isinstance(blueprint_rows, list):
blueprint_rows = []
checksum = 0
for idx, row in enumerate(blueprint_rows):
if not isinstance(row, dict):
continue
ticker = str(row.get("ticker") or row.get("Ticker") or "")
action = str(row.get("action") or row.get("Action") or row.get("final_action") or row.get("Final_Action") or "")
checksum += (idx + 1) * (len(ticker) + len(action))
injected["blueprint_row_count"] = len(blueprint_rows)
injected["blueprint_checksum"] = checksum
injected["rendered_output_checksum"] = checksum
injected["rendered_report_checksum"] = checksum
# S1: CAPITAL_STYLE_ALLOCATION_V1 — 투자성향별 conviction 주입
# build_capital_style_allocation_v1.py 실행 결과를 하네스에 포함시킨다.
# LLM은 이 필드를 인용만 할 수 있으며 재계산 금지 (Direction S1).
_csa_path = ROOT / "Temp" / "capital_style_allocation_v1.json"
if _csa_path.exists():
try:
_csa_data = json.loads(_csa_path.read_text(encoding="utf-8"))
injected["capital_style_allocation_json"] = json.dumps(_csa_data, ensure_ascii=False)
injected["capital_style_gate"] = str(_csa_data.get("gate", "MISSING"))
except Exception:
injected["capital_style_allocation_json"] = "{}"
injected["capital_style_gate"] = "LOAD_ERROR"
else:
injected["capital_style_allocation_json"] = "{}"
injected["capital_style_gate"] = "NOT_BUILT"
# ─── COMPREHENSIVE_PROPOSAL_JSON (HS010-B: 판단 자료 — 항상 출력) ────────
pj = parse_field(hc, "prices_json", [])
pj_map = {r.get("ticker", ""): r for r in pj if r.get("ticker")}
sq = parse_field(hc, "smart_sell_quantities_json", [])
sq_map = {r.get("ticker", ""): r for r in sq if r.get("ticker")}
bp = parse_field(hc, "buy_permission_json", [])
bp_map = {r.get("ticker", ""): r for r in bp if r.get("ticker")}
proposals: list[dict] = []
for t, p in pj_map.items():
s = sq_map.get(t, {})
b = bp_map.get(t, {})
acct_r = acct_map.get(t, {})
name = acct_r.get("name") or b.get("name") or ""
proposals.append({
"ticker": t,
"name": name,
"composite_verdict": b.get("composite_verdict", "UNKNOWN"),
"reference_stop_price": p.get("stop_price"),
"reference_tp1_price": p.get("tp1_price"),
"tp1_state": p.get("tp1_state", "PENDING"),
"reference_tp2_price": p.get("tp2_price"),
"tp2_state": p.get("tp2_state", "PENDING"),
"proposed_immediate_qty": s.get("immediate_sell_qty"),
"proposed_staged_qty": s.get("staged_total_qty"),
"expected_cash_krw": s.get("expected_cash_recovered_krw", 0),
"note": "PROPOSAL — 실행 여부는 사용자 최종 판단",
"formula_id": "COMPREHENSIVE_PROPOSAL_V1",
})
injected["comprehensive_proposal_json"] = json.dumps(proposals, ensure_ascii=False)
# ─── SATELLITE_CANDIDATE_JSON (위성 후보 스크리닝 — 항상 출력) ──────────
held_tickers: set[str] = {r.get("ticker", "") for r in acct_snap if r.get("ticker")}
cs_all = data.get("core_satellite", []) or []
GRADE_RANK = {"A": 4, "B": 3, "C": 2, "D": 1, "F": 0}
candidates: list[dict] = []
for r in cs_all:
t = r.get("Ticker", "")
if not t or t in held_tickers: continue
grade = str(r.get("Candidate_Quality_Grade") or "D").upper()
close = r.get("Close") or 0
ma20 = r.get("MA20") or 0
atr20 = r.get("ATR20") or 0
rs_rank= r.get("RS_Rank_20D")
action = r.get("Final_Action") or r.get("Allowed_Action") or "HOLD"
sector = r.get("Sector") or ""
disparity = round(close / ma20 - 1, 4) if ma20 else None
candidates.append({
"ticker": t,
"name": r.get("Name") or "",
"sector": sector,
"grade": grade,
"close": close,
"ma20_disparity_pct": round(disparity * 100, 2) if disparity is not None else None,
"atr20": atr20,
"rs_rank_20d": rs_rank,
"allowed_action": action,
"screen_status": (
"WATCH_CANDIDATE" if GRADE_RANK.get(grade, 0) >= 2 else "BELOW_THRESHOLD"
),
"formula_id": "SATELLITE_CANDIDATE_SCREEN_V1",
})
candidates.sort(key=lambda x: (-GRADE_RANK.get(x["grade"], 0),
x.get("ma20_disparity_pct") or 999))
injected["satellite_candidate_json"] = json.dumps(candidates[:15], ensure_ascii=False)
injected["satellite_candidate_summary"] = json.dumps({
"total_screened": len(candidates),
"watch_candidates": sum(1 for c in candidates if c["screen_status"] == "WATCH_CANDIDATE"),
"grade_A_count": sum(1 for c in candidates if c["grade"] == "A"),
"grade_B_count": sum(1 for c in candidates if c["grade"] == "B"),
"grade_C_count": sum(1 for c in candidates if c["grade"] == "C"),
"formula_id": "SATELLITE_CANDIDATE_SCREEN_V1",
}, ensure_ascii=False)
# ─── LLM_INSTRUCTION 업데이트 — HS010-B/C + D1/D2 지침 추가 ───────────
current_inst = hc.get("llm_instruction", "")
inst_str = str(current_inst)
additions = []
if "HS010-B" not in inst_str:
additions.append(
" ▶ [HS010-B] 종합 판단 제안표 필수 출력: comprehensive_proposal_json을 "
"'종합 판단 제안표(PROPOSAL)' 표로 항상 출력. PENDING_EXPORT·BLOCKED·DATA_MISSING 상태와 "
"무관하게 생략 금지. 판단은 사용자 몫이므로 reference_stop_price·reference_tp1_price·"
"tp1_state·reference_tp2_price·tp2_state·proposed_immediate_qty·proposed_staged_qty·"
"expected_cash_krw를 그대로 표시. LLM이 가격·수량을 임의 변경하거나 새 수치를 추가하는 것 절대 금지."
" ▶ [HS010-C] 위성 후보 스크리닝 표 필수 출력: satellite_candidate_json을 "
"'위성 후보 스크리닝(SATELLITE_CANDIDATE_SCREEN_V1)' 표로 항상 출력. "
"후보가 0개여도 표를 출력하고 '현재 추가 적합 후보 없음' 명시. "
"satellite_candidate_summary.watch_candidates를 항상 표 제목에 병기. "
"LLM이 universe 외 종목을 임의 추가하거나 grade를 변경하는 것 금지."
)
if "D1-ROUTING" not in inst_str:
additions.append(
" ▶ [D1-ROUTING] 9단계 결정론적 라우팅 의무: 보고서는 routing_execution_log의 "
"9단계 순서(①신선도→②장중판별→③포트폴리오상태→④매도레이더→⑤매수타이밍→"
"⑥현금확보→⑦가격정규화→⑧RS/위성→⑨LLM서빙) 결과를 먼저 표 형태로 출력하고 "
"이후 분석을 진행한다. routing_execution_log 생략 시 INCOMPLETE_ROUTING_LOG 처리."
)
if "D2-LLM" not in inst_str:
additions.append(
" ▶ [D2-LLM] LLM 8금지(위반 시 INVALID_LLM_OVERRIDE):"
" ①미등록공식 지정가/수량 산출 금지"
" ②하네스BLOCK 판정 우회('그래도매수') 금지"
" ③SELL_PRICE_SANITY INVALID 가격 복원 금지"
" ④execution_order 임의변경 금지"
" ⑤K2 반등대기 수량을 '현금급함'으로 즉시전환 금지"
" ⑥APEX_SUPER 구간 trailing_stop 미병기 금지"
" ⑦DISTRIBUTION_CONFIRMED 매수 우회 금지"
" ⑧routing_execution_log 생략 금지."
)
if "ANTI_CHASE" not in inst_str:
additions.append(
" ▶ [A2-ANTI_CHASE] anti_chasing_velocity_json의 anti_chase_verdict=BLOCK_CHASE인 "
"종목은 당일 신규 BUY 절대 금지. PULLBACK_WAIT는 pullback_entry_trigger_price 도달 전 매수 금지. "
"distribution_sell_detector_json의 distribution_verdict=DISTRIBUTION_CONFIRMED인 종목 "
"BUY 절대 금지."
)
if "K2_REBOUND" not in inst_str:
additions.append(
" ▶ [K2-REBOUND] cash_recovery_plan_json의 rebound_wait_qty는 "
"rebound_trigger_price 도달 전 즉시매도 전환 금지. '현금이 급하니까' 이유로 "
"Stage 2 즉시전환 금지. emergency_full_sell=true일 때만 전량 즉시 허용."
)
if additions:
injected["llm_instruction"] = inst_str + "".join(additions)
# ─── 출력 ─────────────────────────────────────────────────────────────────
if not quiet:
print(f"\n{'필드':<40} {'주입값':<35} 변경")
print("-" * 90)
for k, v in injected.items():
old = hc.get(k)
disp_v = str(v)[:33] + "..." if len(str(v)) > 35 else str(v)
changed = "← 신규" if old is None else ("← 수정" if old != v else " 같음")
if k.endswith("_json") and len(str(v)) > 35:
disp_v = str(v)[:32] + "..."
print(f" {k:<38} {disp_v:<35} {changed}")
print(f"\n 주입 필드 수: {len(injected)}개")
if not dry_run:
hc.update(injected)
if len(hc_path) == 2:
raw[hc_path[0]][hc_path[1]] = hc
elif len(hc_path) == 1:
raw[hc_path[0]] = hc
# Original update in place
json_path.write_text(
json.dumps(raw, ensure_ascii=False, indent=2), encoding="utf-8")
# New output if requested
if output_path_str:
out_path = Path(output_path_str)
if not out_path.is_absolute():
out_path = ROOT / out_path
out_path.parent.mkdir(parents=True, exist_ok=True)
out_path.write_text(
json.dumps(raw, ensure_ascii=False, indent=2), encoding="utf-8")
if quiet:
print(f"INJECT_HARNESS_OK: output={out_path.name}")
else:
print(f" → {out_path.name} 저장 완료")
if quiet:
print(f"INJECT_HARNESS_OK: {json_path.name} fields={len(injected)} dry_run=0")
else:
print(f" → {json_path.name} 업데이트 완료\n")
else:
if quiet:
print(f"INJECT_HARNESS_OK: {json_path.name} fields={len(injected)} dry_run=1")
else:
print(" → [DRY-RUN] 파일 저장 생략\n")
return 0
if __name__ == "__main__":
raise SystemExit(main())