Files
QuantEngineByItz/src/quant_engine/compute_formula_outputs.py
T
kjh2064 ee3e799de1 feat: 리밸런싱 엔진 V1 + GAS 버그 수정 (2026-06-13)
주요 변경:
- tools/build_rebalance_engine_v1.py: REBALANCE_ENGINE_V1 신규
  * account_snapshot 직접 합산(_build_snap_position_map) → 소수주 분리 행 병합
  * 레짐 소스 macro.REGIME_PRELIM 최우선 (GAS 와 동일)
- src/gas_adapter_parts/gdf_06_rebalance.gs: runRebalanceSheet_() 신규
  * Logger.log / getSpreadsheet_() 로 run_all 연동 수정
- src/gas_adapter_parts/gdc_01_fetch_fundamentals.gs
  * _mergePositionRecord_(): 소수주 중복 행 합산 신규
  * parseInt → parseFloat (qty, availQty)
- src/gas_adapter_parts/gdf_01_price_metrics.gs
  * 미보유 종목 SELL_READY → WATCH_EXIT_SIGNAL
- spec/41_release_dag.yaml: build_rebalance_sheet 노드 추가 (step_count 63)
- spec/51_formula_lifecycle_registry.yaml: REBALANCE_ENGINE_V1 등록

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-13 13:20:14 +09:00

813 lines
43 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
compute_formula_outputs.py
───────────────────────────────────────────────────────────────────────────────
Python 공식 계산 엔진
GAS가 아직 구현하지 않은 핵심 공식을 Python으로 결정론적으로 계산한다.
동일 입력 → 동일 출력. 텍스트 판단 없음. 수치만 출력.
계산 대상:
- VELOCITY_V1 : velocity_1d, velocity_5d
- PROFIT_LOCK_STAGE : profit_pct → 단계 분류
- ANTI_CHASING_VELOCITY_V1 : velocity_1d 기반 뒷박 차단
- PULLBACK_ENTRY_TRIGGER_V1 : MA20 기반 눌림목 기준가
- SELL_PRICE_SANITY_V1 : 매도가 역전·비현실가 검증
- TICK_NORMALIZER_V1 : KRX 호가 단위 정규화
- CASH_RECOVERY_OPTIMIZER_V1 : 최소 주식가치 훼손 매도조합
- PROFIT_RATCHET_TIERED_V2 : APEX_SUPER ATR×1.2 trailing
사용법:
python tools/compute_formula_outputs.py [GatherTradingData.json]
python tools/compute_formula_outputs.py --output computed_harness.json
"""
from __future__ import annotations
import json
import math
import sys
from pathlib import Path
ROOT = Path(__file__).resolve().parents[2]
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))
from src.quant_engine.exit_decisions import (
compute_cash_shortfall_harness as _compute_cash_shortfall_harness,
compute_dynamic_heat_thresholds as _compute_dynamic_heat_thresholds,
compute_final_decision as _compute_final_decision,
compute_sell_decision as _compute_sell_decision,
compute_timing_decision as _compute_timing_decision,
compute_stop_action_ladder as _compute_stop_action_ladder,
compute_stop_price_core as _compute_stop_price_core,
normalize_tick as _normalize_tick,
)
# Windows cp949 터미널 호환
if sys.stdout.encoding and sys.stdout.encoding.lower() not in ("utf-8", "utf8"):
sys.stdout = open(sys.stdout.fileno(), mode="w", encoding="utf-8", buffering=1)
if sys.stderr.encoding and sys.stderr.encoding.lower() not in ("utf-8", "utf8"):
sys.stderr = open(sys.stderr.fileno(), mode="w", encoding="utf-8", buffering=1)
SEP = "=" * 70
# ─────────────────────────────────────────────────────────────────────────────
# KRX 호가 단위 테이블 (TICK_NORMALIZER_V1)
# ─────────────────────────────────────────────────────────────────────────────
KRX_TICK_TABLE = [
( 2_000, 1),
( 5_000, 5),
( 20_000, 10),
( 50_000, 50),
( 200_000, 100),
( 500_000, 500),
(math.inf, 1000),
]
def krx_tick_unit(price: float) -> int:
for threshold, tick in KRX_TICK_TABLE:
if price < threshold:
return tick
return 1000
def normalize_tick(price: float) -> int:
return _normalize_tick(price)
# ─────────────────────────────────────────────────────────────────────────────
# PROFIT_LOCK_STAGE 분류기
# ─────────────────────────────────────────────────────────────────────────────
def classify_profit_lock_stage(profit_pct: float) -> str:
if profit_pct >= 60:
return "APEX_SUPER"
elif profit_pct >= 40:
return "APEX_TRAILING"
elif profit_pct >= 30:
return "PROFIT_LOCK_30"
elif profit_pct >= 20:
return "PROFIT_LOCK_20"
elif profit_pct >= 10:
return "PROFIT_LOCK_10"
elif profit_pct >= 0:
return "BREAKEVEN_RATCHET"
else:
return "NORMAL"
# ─────────────────────────────────────────────────────────────────────────────
# PROFIT_RATCHET_TIERED_V2
# ─────────────────────────────────────────────────────────────────────────────
def compute_trailing_stop_v2(
profit_pct: float,
highest_close: float,
atr20: float,
ratchet_stop: float | None,
average_cost: float,
) -> dict:
stage = classify_profit_lock_stage(profit_pct)
ratchet_stop = ratchet_stop or average_cost
if stage == "APEX_SUPER":
raw = highest_close - 1.2 * atr20
trailing_stop = normalize_tick(max(ratchet_stop, raw))
tp_action = "강제 10% 익절 권고"
elif stage == "APEX_TRAILING":
raw = highest_close - 1.5 * atr20
trailing_stop = normalize_tick(max(ratchet_stop, raw))
tp_action = "부분익절 검토"
elif stage in ("PROFIT_LOCK_30", "PROFIT_LOCK_20"):
raw = highest_close - 2.0 * atr20
trailing_stop = normalize_tick(max(ratchet_stop, raw))
tp_action = "래칫 유지"
else:
trailing_stop = None
tp_action = "적용 안함"
return {
"ratchet_stage_v2": stage,
"auto_trailing_stop_v2": trailing_stop,
"tp_ladder_action": tp_action,
"apex_super_active": stage == "APEX_SUPER",
}
def compute_stop_price_core(entry_price: float | None, atr20: float | None, current_price: float | None) -> dict:
return _compute_stop_price_core(entry_price, atr20, current_price)
def compute_stop_action_ladder(context: dict) -> dict:
return _compute_stop_action_ladder(context)
def compute_dynamic_heat_thresholds(regime: str) -> dict:
return _compute_dynamic_heat_thresholds(regime)
def compute_cash_shortfall_harness(as_result: dict, total_asset: float, cash_floor_info: dict, mrs_score: float) -> dict:
return _compute_cash_shortfall_harness(as_result, total_asset, cash_floor_info, mrs_score)
def compute_timing_decision(ctx: dict) -> dict:
return _compute_timing_decision(ctx)
def compute_sell_decision(ctx: dict) -> dict:
return _compute_sell_decision(ctx)
def compute_final_decision(ctx: dict) -> dict:
return _compute_final_decision(ctx)
# ─────────────────────────────────────────────────────────────────────────────
# ANTI_CHASING_VELOCITY_V1
# ─────────────────────────────────────────────────────────────────────────────
def compute_anti_chasing(velocity_1d: float) -> dict:
if velocity_1d >= 3.0:
verdict = "BLOCK_CHASE"
status = "BLOCKED"
elif velocity_1d >= 1.5:
verdict = "PULLBACK_WAIT"
status = "WAIT"
else:
verdict = "CLEAR"
status = "PASS"
return {
"anti_chasing_verdict": verdict,
"anti_chasing_velocity_status": status,
"velocity_1d_input": round(velocity_1d, 4),
}
# ─────────────────────────────────────────────────────────────────────────────
# PULLBACK_ENTRY_TRIGGER_V1
# ─────────────────────────────────────────────────────────────────────────────
def compute_pullback_trigger(close: float, ma20: float, atr20: float) -> dict:
trigger_price = normalize_tick(ma20 - 0.5 * atr20)
upper_band = ma20 * 1.03
if close <= upper_band:
verdict = "PULLBACK_ZONE"
state = "PASS"
else:
verdict = "ABOVE_PULLBACK_ZONE"
state = "BLOCKED"
return {
"pullback_entry_verdict": verdict,
"pullback_state": state,
"pullback_entry_trigger_price": trigger_price,
"pullback_upper_band": normalize_tick(upper_band),
}
# ─────────────────────────────────────────────────────────────────────────────
# SELL_PRICE_SANITY_V1
# ─────────────────────────────────────────────────────────────────────────────
def check_sell_price_sanity(
sell_limit_price: float,
stop_loss_price: float | None,
prev_close: float,
ticker: str = "",
) -> dict:
issues: list[str] = []
status = "PASS"
if stop_loss_price is not None and sell_limit_price < stop_loss_price:
issues.append(
f"INVALID_PRICE_INVERSION: sell={sell_limit_price:,} < stop={stop_loss_price:,}"
)
status = "INVALID_PRICE_INVERSION"
upper_limit = prev_close * 1.30
if sell_limit_price > upper_limit:
issues.append(
f"INVALID_UNREALISTIC_PRICE: sell={sell_limit_price:,} > prev_close*1.30={upper_limit:,.0f}"
)
if status == "PASS":
status = "INVALID_UNREALISTIC_PRICE"
tick_unit = krx_tick_unit(sell_limit_price)
if sell_limit_price % tick_unit != 0:
corrected = normalize_tick(sell_limit_price)
issues.append(
f"INVALID_TICK: sell={sell_limit_price:,} 호가단위={tick_unit}원 → 정규화={corrected:,}"
)
if status == "PASS":
status = "INVALID_TICK"
return {
"sell_price_sanity_status": status,
"sell_price_sanity_issues": issues,
"hts_allowed": status == "PASS",
"shadow_ledger": status != "PASS",
"ticker": ticker,
}
# ─────────────────────────────────────────────────────────────────────────────
# CASH_RECOVERY_OPTIMIZER_V1
# ─────────────────────────────────────────────────────────────────────────────
def compute_cash_recovery_optimizer(
sell_candidates: list[dict], # sorted by h2_priority_rank asc
cash_shortfall_min_krw: float,
) -> dict:
plan: list[dict] = []
cumulative_krw = 0.0
for cand in sell_candidates:
if cumulative_krw >= cash_shortfall_min_krw:
break
ticker = cand.get("Ticker", "")
name = cand.get("Name", "")
qty = cand.get("Sell_Qty") or 0
limit_price = cand.get("Sell_Limit_Price") or cand.get("current_price", 0)
preserve_ratio = cand.get("Cash_Preserve_Ratio", 100)
style = cand.get("Cash_Preserve_Style", "FULL")
if qty and limit_price:
expected_krw = qty * limit_price * (preserve_ratio / 100)
else:
expected_krw = 0
plan.append({
"ticker": ticker,
"name": name,
"qty": qty,
"limit_price": normalize_tick(limit_price) if limit_price else None,
"preserve_style": style,
"preserve_ratio": preserve_ratio,
"expected_krw": round(expected_krw),
})
cumulative_krw += expected_krw
shortfall_met = cumulative_krw >= cash_shortfall_min_krw
return {
"cash_recovery_plan_json": {
"sell_sequence": plan,
"expected_total_krw": round(cumulative_krw),
"cash_shortfall_min_krw": cash_shortfall_min_krw,
"shortfall_met": shortfall_met,
"items_needed": len(plan),
}
}
# ─────────────────────────────────────────────────────────────────────────────
# 메인 계산 루틴
# ─────────────────────────────────────────────────────────────────────────────
def main() -> int:
output_path: Path | None = None
args = sys.argv[1:]
if "--output" in args:
idx = args.index("--output")
output_path = Path(args[idx + 1])
args = [a for i, a in enumerate(args) if i != idx and i != idx + 1]
json_path = Path(args[0]) if args else ROOT / "GatherTradingData.json"
if not json_path.exists():
print(f"[ERROR] {json_path} not found")
return 1
raw = json.loads(json_path.read_text(encoding="utf-8"))
hc = None
try:
hc = raw["data"]["_harness_context"]
except (KeyError, TypeError):
pass
if not isinstance(hc, dict):
hc = {}
account_snapshot = raw.get("data", {}).get("account_snapshot", []) or []
sell_priority = raw.get("data", {}).get("sell_priority", []) or []
core_satellite = raw.get("data", {}).get("core_satellite", []) or []
# ── TOTAL ASSET CALCULATION ─────────────────────────────────────────────
# Sum market_value of all holdings + available_cash (if present in context)
holdings_value = sum(float(r.get("market_value", 0) or 0) for r in account_snapshot if r.get("market_value"))
# Try to find cash in snapshot or context
cash_d2 = float(hc.get("settlement_cash_d2_krw") or hc.get("available_cash") or 0)
total_asset = holdings_value + cash_d2
hc["total_asset_krw"] = round(total_asset)
hc["total_asset"] = round(total_asset)
regime = str(hc.get("market_regime", "NEUTRAL"))
# ticker → account row lookup
acct_map: dict[str, dict] = {
row["ticker"]: row
for row in account_snapshot
if row.get("ticker")
}
# ticker → core_satellite row lookup
cs_map: dict[str, dict] = {
row["Ticker"]: row
for row in core_satellite
if row.get("Ticker")
}
computed: dict[str, object] = {}
per_ticker: list[dict] = []
cash_shortfall = hc.get("cash_shortfall_min_krw", 0) or 0
for sp_row in sell_priority:
ticker = sp_row.get("Ticker", "")
if not ticker:
continue
acct = acct_map.get(ticker, {})
cs = cs_map.get(ticker, {})
close = cs.get("Close") or acct.get("current_price") or 0
prev_close = cs.get("PrevClose") or close
ma20 = cs.get("MA20") or close
atr20 = cs.get("ATR20") or 0
avg_cost = acct.get("average_cost") or 0
qty_held = acct.get("holding_quantity") or 0
highest = acct.get("highest_price_since_entry") or close
stop_price = acct.get("stop_price")
sell_limit = sp_row.get("Sell_Limit_Price") or 0
ret5d = cs.get("Ret5D") or 0
# ── VELOCITY ────────────────────────────────────────────────────────
velocity_1d = ((close - prev_close) / prev_close * 100) if prev_close else 0
velocity_5d = float(ret5d) if ret5d else 0
# ── PROFIT PCT & STAGE ──────────────────────────────────────────────
profit_pct = ((close - avg_cost) / avg_cost * 100) if avg_cost else (
acct.get("return_pct") or 0
)
profit_lock = classify_profit_lock_stage(profit_pct)
# ── RATCHET V2 ──────────────────────────────────────────────────────
ratchet = compute_trailing_stop_v2(
profit_pct=profit_pct,
highest_close=highest,
atr20=atr20,
ratchet_stop=stop_price,
average_cost=avg_cost,
)
# ── ANTI_CHASING ────────────────────────────────────────────────────
anti_chase = compute_anti_chasing(velocity_1d)
# ── PULLBACK TRIGGER ─────────────────────────────────────────────────
pullback = compute_pullback_trigger(close, ma20, atr20)
# ── SELL PRICE SANITY ────────────────────────────────────────────────
sanity = {}
if sell_limit:
sanity = check_sell_price_sanity(
sell_limit_price=sell_limit,
stop_loss_price=stop_price,
prev_close=prev_close or close,
ticker=ticker,
)
# ── SELL DECISION ───────────────────────────────────────────────────
sell_ctx = {
"close": close,
"stopPrice": stop_price,
"trailingStop": ratchet.get("auto_trailing_stop_v2"),
"tp1Price": sp_row.get("TP1_Price"),
"tp2Price": sp_row.get("TP2_Price"),
"profitPct": profit_pct,
"rwPartial": sp_row.get("RW_Partial"),
"timingExitScore": sp_row.get("Timing_Score_Exit"),
"daysToTimeStop": sp_row.get("Days_To_Time_Stop"),
"timingAction": sp_row.get("Timing_Action"),
"regime": regime,
"atr20": atr20,
}
r_sell = compute_sell_decision(sell_ctx)
# ── FINAL DECISION ──────────────────────────────────────────────────
decision_ctx = {
"sellAction": r_sell.get("action"),
"sellValidation": r_sell.get("validation"),
"allowedAction": sp_row.get("Allowed_Action"),
"timingAction": sp_row.get("Timing_Action"),
"timingScoreEntry": sp_row.get("Timing_Score_Entry"),
"timingExitScore": sp_row.get("Timing_Score_Exit"),
"ss001Total": sp_row.get("SS001_Total"),
"flowCredit": sp_row.get("Flow_Credit"),
"leaderTotal": sp_row.get("Leader_Scan_Total"),
"rwPartial": sp_row.get("RW_Partial"),
"profitPct": profit_pct,
"daysToTimeStop": sp_row.get("Days_To_Time_Stop"),
"weightPct": sp_row.get("Weight_Pct"),
"acGate": sp_row.get("AC_Gate"),
"liquidityStatus": sp_row.get("Liquidity_Status"),
"spreadStatus": sp_row.get("Spread_Status"),
"dartRisk": bool(sp_row.get("DART_Risk")),
"missingFields": sp_row.get("Missing_Fields"),
}
r_final = compute_final_decision(decision_ctx)
row_result = {
"ticker": ticker,
"name": sp_row.get("Name", ""),
"close": close,
"prev_close": prev_close,
"ma20": ma20,
"atr20": atr20,
"avg_cost": avg_cost,
"profit_pct": round(profit_pct, 2),
"velocity_1d": round(velocity_1d, 4),
"velocity_5d": round(velocity_5d, 4),
"profit_lock_stage": profit_lock,
**ratchet,
**anti_chase,
**pullback,
**(sanity if sanity else {}),
**r_sell,
**r_final,
}
per_ticker.append(row_result)
# ── ORDER BLUEPRINT GENERATION ──────────────────────────────────────────
blueprint_rows = []
for r in per_ticker:
if r.get("final_action") == "SELL_READY":
blueprint_rows.append({
"ticker": r["ticker"],
"name": r["name"],
"order_type": r.get("order_type", "LIMIT_SELL"),
"limit_price": r.get("limit_price"),
"quantity": int(acct_map.get(r["ticker"], {}).get("holding_quantity", 0) * (r.get("ratio_pct", 0) / 100)),
"validation_status": "PASS",
"rationale_code": r.get("reason"),
"formula_id": "ORDER_BLUEPRINT_V1",
})
computed["order_blueprint_json"] = blueprint_rows
# ── CASH_RECOVERY_OPTIMIZER ─────────────────────────────────────────────
if cash_shortfall > 0:
recovery = compute_cash_recovery_optimizer(sell_priority, cash_shortfall)
computed.update(recovery)
computed["per_ticker"] = per_ticker
computed["meta"] = {
"source_file": str(json_path.name),
"computed_by": "tools/compute_formula_outputs.py",
"formulas_run": [
"VELOCITY_V1", "PROFIT_LOCK_STAGE_V1", "PROFIT_RATCHET_TIERED_V2",
"ANTI_CHASING_VELOCITY_V1", "PULLBACK_ENTRY_TRIGGER_V1",
"SELL_PRICE_SANITY_V1", "CASH_RECOVERY_OPTIMIZER_V1",
],
"deterministic": True,
"llm_computed": False,
}
# ── 출력 ────────────────────────────────────────────────────────────────
print(SEP)
print(" Python 공식 계산 엔진 — compute_formula_outputs")
print(f" 소스: {json_path.name} | 현금부족: {cash_shortfall:,.0f}원")
print(SEP)
print(f"\n{'ticker':<10} {'종목명':<14} {'수익률%':>7} {'단계':<20} "
f"{'velocity_1d%':>12} {'뒷박판정':<15} {'trailing_stop':>14}")
print("-" * 100)
for r in per_ticker:
ts = r.get("auto_trailing_stop_v2") or r.get("auto_trailing_stop_v2")
ts_str = f"{ts:>14,}" if ts else f"{'(없음)':>14}"
print(
f"{r['ticker']:<10} {r['name']:<14} {r['profit_pct']:>7.1f}% "
f"{r['profit_lock_stage']:<20} {r['velocity_1d']:>12.2f}% "
f"{r['anti_chasing_verdict']:<15} {ts_str}"
)
print()
# 매도가 역전 경보
invalid_prices = [r for r in per_ticker if r.get("sell_price_sanity_status", "PASS") != "PASS"]
if invalid_prices:
print(f"[!] SELL_PRICE_SANITY 경보 — {len(invalid_prices)}개 종목 HTS 입력 차단")
for r in invalid_prices:
for issue in r.get("sell_price_sanity_issues", []):
print(f" {r['ticker']} {r['name']}: {issue}")
else:
print("[✔] SELL_PRICE_SANITY — 전 종목 가격 정상")
# 현금회복 계획
crp = computed.get("cash_recovery_plan_json")
if crp:
print(f"\n[현금회복 최적 매도조합] 부족분 {cash_shortfall:,.0f}원")
print(f" {'#':<3} {'ticker':<10} {'종목명':<14} {'수량':>6} {'지정가':>10} {'예상회수':>12}")
print(" " + "-" * 58)
for i, s in enumerate(crp.get("sell_sequence", []), 1):
lp = s.get("limit_price") or 0
ek = s.get("expected_krw") or 0
print(f" {i:<3} {s['ticker']:<10} {s['name']:<14} {s.get('qty', 0):>6} "
f"{lp:>10,} {ek:>12,}")
met_str = "✔ 달성" if crp.get("shortfall_met") else "✗ 부족"
print(f" 합계: {crp.get('expected_total_krw', 0):>12,}원 ({met_str})")
# ── JSON 파일 저장 ─────────────────────────────────────────────────────
if output_path is None:
output_path = ROOT / "Temp" / "computed_harness.json"
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(
json.dumps(computed, ensure_ascii=False, indent=2), encoding="utf-8"
)
print(f"\n → 결과 저장: {output_path}")
print(f" → 결정론적 계산 완료. LLM 추정 없음.\n")
return 0
# ─────────────────────────────────────────────────────────────────────────────
# IMPUTED_DATA_EXPOSURE_GATE_V1
# weighted_coverage = Σ(weight × coverage); ifr = 1 - wc; ech = raw × (0.4 + 0.6 × wc)
# gate: ifr ≥ 0.50 → BLOCK / ≥ 0.25 → WARN / else PASS
# ─────────────────────────────────────────────────────────────────────────────
_IDEG_WEIGHTS = {"fundamental_core": 0.30, "realized_outcome": 0.30,
"trade_quality": 0.15, "pattern": 0.10, "alpha_eval": 0.15}
def compute_imputed_data_exposure(domain_coverage: dict, raw_cap: float) -> dict:
wc = sum(_IDEG_WEIGHTS.get(k, 0) * v for k, v in domain_coverage.items())
ifr = round(1.0 - wc, 4)
ech = round(raw_cap * (0.4 + 0.6 * wc), 1)
gate = "IMPUTED_DATA_BLOCK" if ifr >= 0.50 else ("IMPUTED_DATA_WARN" if ifr >= 0.25 else "PASS")
return {"gate_status": gate, "imputed_field_ratio": round(ifr, 4),
"weighted_coverage": round(wc, 4), "effective_confidence_honest": ech}
# ─────────────────────────────────────────────────────────────────────────────
# TRAILING_STOP_PRICE_V1
# trailing_stop = highest_price - atr20 × multiplier
# ─────────────────────────────────────────────────────────────────────────────
def compute_trailing_stop_price(highest_price_since_entry: float, atr20: float,
trailing_atr_multiplier: float) -> dict:
return {"trailing_stop_price": round(highest_price_since_entry - atr20 * trailing_atr_multiplier)}
# ─────────────────────────────────────────────────────────────────────────────
# EXPECTED_EDGE_V1
# edge = ((tp - entry) / (entry - stop)) × confidence
# ─────────────────────────────────────────────────────────────────────────────
def compute_expected_edge(target_price: float, entry_price: float,
stop_price: float, bayesian_confidence: float) -> dict:
r_multiple = (target_price - entry_price) / (entry_price - stop_price) if (entry_price - stop_price) != 0 else 0
edge = round(r_multiple * bayesian_confidence, 4)
return {"expected_edge": edge, "r_multiple": round(r_multiple, 4)}
# ─────────────────────────────────────────────────────────────────────────────
# TP_VALIDITY_CHECK_V1
# tp_price > current_price → valid; else null (triggered)
# ─────────────────────────────────────────────────────────────────────────────
def compute_tp_validity(tp_price: float | None, current_price: float) -> dict:
if tp_price is None:
return {"tp_validated_price": None, "tp_state": "UNKNOWN_NO_CLOSE"}
if tp_price > current_price:
return {"tp_validated_price": tp_price, "tp_state": "PENDING"}
return {"tp_validated_price": None, "tp_state": "TP1_ALREADY_TRIGGERED"}
# ─────────────────────────────────────────────────────────────────────────────
# RS_RATIO_V1
# rs_ratio = stock_5d_return / kospi_5d_return
# ─────────────────────────────────────────────────────────────────────────────
def compute_rs_ratio(stock_close_5d_return: float, kospi_close_5d_return: float) -> dict:
if kospi_close_5d_return == 0:
return {"rs_ratio": None, "note": "kospi_return=0"}
return {"rs_ratio": round(stock_close_5d_return / kospi_close_5d_return, 4)}
# ─────────────────────────────────────────────────────────────────────────────
# RATCHET_TRAILING_AUTO_V1
# PROFIT_LOCK_20: max(ratchet_stop, highest_close - 1.5×ATR20)
# PROFIT_LOCK_30/APEX: max(ratchet_stop, highest_close - 2.0×ATR20)
# Others: null
# ─────────────────────────────────────────────────────────────────────────────
def compute_ratchet_trailing_auto(profit_lock_stage: str, ratchet_stop: float,
highest_close: float, atr20: float) -> dict:
_mult = {"PROFIT_LOCK_20": 1.5, "PROFIT_LOCK_30": 2.0, "APEX_TRAILING": 2.0}
m = _mult.get(profit_lock_stage)
if m is None:
return {"auto_trailing_stop": None, "note": f"{profit_lock_stage}: no trailing"}
atr_stop = highest_close - m * atr20
result = max(ratchet_stop, atr_stop)
return {"auto_trailing_stop": round(result), "atr_stop": round(atr_stop),
"ratchet_stop": ratchet_stop, "multiplier": m}
# ─────────────────────────────────────────────────────────────────────────────
# STOP_PRICE_CORE_V1
# max(entry × 0.92, entry - atr20 × multiplier)
# ─────────────────────────────────────────────────────────────────────────────
def compute_stop_price_core(entry_price: float, atr20: float, atr_multiplier: float) -> dict:
return _compute_stop_price_core(entry_price, atr20, atr_multiplier=atr_multiplier)
# ─────────────────────────────────────────────────────────────────────────────
# TARGET_CASH_PCT_V1
# max(5 + (mrs/10)×15, cash_floor_regime_min_pct)
# ─────────────────────────────────────────────────────────────────────────────
def compute_target_cash_pct(market_risk_score: float, cash_floor_regime_min_pct: float) -> dict:
formula_result = 5 + (market_risk_score / 10) * 15
target = max(formula_result, cash_floor_regime_min_pct)
return {"target_cash_pct": round(target, 2), "formula_result": round(formula_result, 2)}
# ─────────────────────────────────────────────────────────────────────────────
# FLOW_CREDIT_V1
# C1×0.30 + C2×0.30 + C3×0.40
# ─────────────────────────────────────────────────────────────────────────────
def compute_flow_credit(c1_price_action: float, c2_volume_action: float,
c3_flow_action: float) -> dict:
credit = round(c1_price_action * 0.30 + c2_volume_action * 0.30 + c3_flow_action * 0.40, 4)
return {"flow_credit": credit}
# ─────────────────────────────────────────────────────────────────────────────
# MARKET_RISK_SCORE_V1
# min(10, vix + kospi + usd_krw + usd_jpy + credit)
# ─────────────────────────────────────────────────────────────────────────────
def compute_market_risk_score(vix_score: float, kospi_score: float, usd_krw_score: float,
usd_jpy_score: float, credit_score: float) -> dict:
raw = vix_score + kospi_score + usd_krw_score + usd_jpy_score + credit_score
return {"market_risk_score": min(10, raw), "raw_sum": raw}
# ─────────────────────────────────────────────────────────────────────────────
# PORTFOLIO_BETA_V1
# beta = Σ(beta_i × mv_i) / total_equity
# ─────────────────────────────────────────────────────────────────────────────
def compute_portfolio_beta(holdings: list, total_equity_value: float) -> dict:
if total_equity_value <= 0:
return {"portfolio_beta": None}
weighted = sum(h.get("beta", 0) * h.get("market_value", 0) for h in holdings
if h.get("beta") is not None)
return {"portfolio_beta": round(weighted / total_equity_value, 4)}
# ─────────────────────────────────────────────────────────────────────────────
# RISK_BUDGET_CASCADE_V1
# base × feedback_mult × brake_mult
# ─────────────────────────────────────────────────────────────────────────────
def compute_risk_budget_cascade(base_risk_budget: float, net_return_feedback_multiplier: float,
performance_brake_multiplier: float) -> dict:
result = base_risk_budget * net_return_feedback_multiplier * performance_brake_multiplier
return {"risk_budget": round(result, 6)}
# ─────────────────────────────────────────────────────────────────────────────
# MEAN_REVERSION_GATE_V1
# deviation_ratio = close / ma20; gate by ratio
# ─────────────────────────────────────────────────────────────────────────────
def compute_mean_reversion_gate(close_price: float, ma20: float) -> dict:
if ma20 <= 0:
return {"deviation_ratio": None, "gate": "DATA_MISSING"}
ratio = round(close_price / ma20, 4)
gate = "OVEREXTENDED" if ratio >= 1.10 else ("NORMAL" if ratio >= 0.95 else "OVERSOLD")
return {"deviation_ratio": ratio, "gate": gate}
# ─────────────────────────────────────────────────────────────────────────────
# T1_FORCED_SELL_RISK_V1
# min(100, sell_action_active*40 + timing_exit_ge_50*25 + rw_ge_2*25 + dist_ge_70*30)
# ─────────────────────────────────────────────────────────────────────────────
def compute_t1_forced_sell_risk(sell_action_active: int, timing_exit_ge_50: int,
rw_ge_2: int, distribution_ge_70: int) -> dict:
score = min(100, sell_action_active * 40 + timing_exit_ge_50 * 25 +
rw_ge_2 * 25 + distribution_ge_70 * 30)
gate = "HIGH_SELL_RISK" if score >= 70 else ("MODERATE" if score >= 40 else "LOW")
return {"t1_forced_sell_risk_score": score, "gate": gate}
# ─────────────────────────────────────────────────────────────────────────────
# SELL_CONFLICT_AWARE_RECOMMENDATION_V1
# min(100, sell_signal_active*55 + cash_preserve_active*20 + no_add_gate*20)
# ─────────────────────────────────────────────────────────────────────────────
def compute_sell_conflict_recommendation(sell_signal_active: int, cash_preserve_active: int,
no_add_gate: int) -> dict:
score = min(100, sell_signal_active * 55 + cash_preserve_active * 20 + no_add_gate * 20)
recommendation = "SELL_PRIORITY" if score >= 55 else ("REDUCE" if score >= 20 else "HOLD")
return {"conflict_score": score, "recommendation": recommendation}
# ─────────────────────────────────────────────────────────────────────────────
# DIVERGENCE_SCORE_V1
# divergence = price_above_ma20 × (frg_sell×0.40 + inst_sell×0.35 + vol_surge×0.25)
# ─────────────────────────────────────────────────────────────────────────────
def compute_divergence_score(price_above_ma20: int, foreign_net_sell: float,
institution_net_sell: float, vol_surge: float) -> dict:
score = price_above_ma20 * (foreign_net_sell * 0.40 + institution_net_sell * 0.35 +
vol_surge * 0.25)
score = round(min(100, max(-100, score)), 2)
gate = "DISTRIBUTION_SIGNAL" if score >= 50 else ("NEUTRAL" if score >= 0 else "ACCUMULATION")
return {"divergence_score": score, "gate": gate}
# ─────────────────────────────────────────────────────────────────────────────
# SEMICONDUCTOR_CLUSTER_SYNC_V1
# is_mandatory = cluster_pct > cluster_limit_pct × 2
# ─────────────────────────────────────────────────────────────────────────────
def compute_semiconductor_cluster_sync(cluster_pct: float, cluster_limit_pct: float) -> dict:
is_mandatory = cluster_pct > cluster_limit_pct * 2
ratio = round(cluster_pct / cluster_limit_pct, 3) if cluster_limit_pct > 0 else None
gate = "MANDATORY_REDUCE" if is_mandatory else "PASS"
return {"is_mandatory": is_mandatory, "cluster_ratio": ratio, "gate": gate}
# ─────────────────────────────────────────────────────────────────────────────
# GOAL_RETIREMENT_V1
# achievement_pct = round(total_asset / GOAL_KRW * 1000) / 10
# goal_remaining_krw = max(0, GOAL_KRW - total_asset)
# ─────────────────────────────────────────────────────────────────────────────
def compute_goal_retirement(total_asset_krw: float, goal_krw: float = 500_000_000) -> dict:
achievement_pct = round(total_asset_krw / goal_krw * 1000) / 10
remaining_krw = max(0.0, goal_krw - total_asset_krw)
status = "ACHIEVED" if achievement_pct >= 100 else "IN_PROGRESS"
return {
"goal_achievement_pct": achievement_pct,
"goal_remaining_krw": round(remaining_krw),
"goal_status": status,
}
# ─────────────────────────────────────────────────────────────────────────────
# VELOCITY_1D_V1 (renamed from VELOCITY_V1 for clarity)
# velocity = (close - prev_close) / prev_close × 100
# ─────────────────────────────────────────────────────────────────────────────
def compute_velocity_1d(close: float, prev_close: float) -> dict:
if prev_close <= 0:
return {"velocity_1d": None}
v = round((close - prev_close) / prev_close * 100, 4)
return {"velocity_1d": v}
# ─────────────────────────────────────────────────────────────────────────────
# POSITION_SIZE_V1
# position_size = min(atr_qty, cash_limit_qty, weight_limit_qty, sector_limit_qty)
# atr_qty = floor(risk_budget_krw / (atr20 × atr_mult × close))
# ─────────────────────────────────────────────────────────────────────────────
def compute_position_size(risk_budget_krw: float, atr20: float, atr_mult: float,
close: float, cash_limit_qty: int,
weight_limit_qty: int, sector_limit_qty: int) -> dict:
import math
if atr20 <= 0 or close <= 0 or atr_mult <= 0:
return {"position_size_qty": 0, "binding_constraint": "DATA_MISSING"}
atr_qty = math.floor(risk_budget_krw / (atr20 * atr_mult * close))
constraints = {
"atr": atr_qty, "cash": cash_limit_qty,
"weight": weight_limit_qty, "sector": sector_limit_qty
}
min_val = min(constraints.values())
binding = [k for k, v in constraints.items() if v == min_val][0]
return {"position_size_qty": max(0, min_val), "atr_qty": atr_qty,
"binding_constraint": binding, "constraints": constraints}
if __name__ == "__main__":
raise SystemExit(main())