비기계적 매도전략(가치보존) + 위성종목 추천 엔진 추가

매크로·실적·펀더멘털·공매도수급·호가미시구조·대내외 변수 5개 독립
팩터군의 confluence(최소 3/5 합의) 없이는 매도 트리거를 금지하는
정성적 매도판단 엔진과, 보유종목 제외 위성후보 추천 로직을 추가한다.

- 단일 팩터 임계값 돌파만으로는 매도 신호를 생성하지 않음
  (mechanical_sell_prohibited=true)
- 데이터 결측 시 항상 DATA_MISSING/INSUFFICIENT_DATA_NO_ACTION —
  추정값으로 채우지 않음
- KIS 호가10단계·공매도거래비중 + Naver 시세/수급 스크래핑 입력 연동
- SQLite 시계열 저장 + 사후 적중률 자체평가
  (evaluate_qualitative_sell_strategy_accuracy_v1)
- Gitea 일일 스케줄(장마감 후) + 파이프라인 계약 검증 게이트
This commit is contained in:
2026-06-21 20:05:55 +09:00
parent 4cb206a269
commit da0e1b0f7e
15 changed files with 2321 additions and 0 deletions
@@ -0,0 +1,146 @@
"""qualitative_sell_strategy_v1 산출물의 SQLite 시계열 저장소.
GAS/xlsx 구조와 완전히 분리된 추가(additive) 저장소다 — 이 모듈이 다루는 데이터는
순수 Python 산출물(KIS API 수집 + confluence 판단 결과)이며, GAS가 쓰지도 읽지도
않고 사람이 시트에서 직접 편집하지도 않는다. 기존 outputs/qualitative_sell_strategy/
*.json 파일 출력을 대체하지 않고 병행 저장한다(JSON은 1회성 점검용, SQLite는 시계열
추이 조회용). 표준 라이브러리 sqlite3만 사용 — 추가 의존성 없음.
"""
from __future__ import annotations
import json
import sqlite3
from pathlib import Path
from dataclasses import dataclass
from typing import Any
from src.quant_engine.storage_backend_v1 import StoreSpec, default_sqlite_store_path, normalize_store_spec
SCHEMA = """
CREATE TABLE IF NOT EXISTS sell_strategy_results (
id INTEGER PRIMARY KEY AUTOINCREMENT,
code TEXT NOT NULL,
generated_at TEXT NOT NULL,
action TEXT,
conviction TEXT,
market_regime TEXT,
composite_score REAL,
rationale TEXT,
raw_json TEXT NOT NULL,
inserted_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_sell_strategy_code_time
ON sell_strategy_results(code, generated_at);
CREATE TABLE IF NOT EXISTS satellite_recommendations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ticker TEXT NOT NULL,
generated_at TEXT NOT NULL,
satellite_action TEXT,
attractiveness_score REAL,
market_regime TEXT,
raw_json TEXT NOT NULL,
inserted_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_satellite_ticker_time
ON satellite_recommendations(ticker, generated_at);
"""
@dataclass(frozen=True)
class QualitativeSellStoreSpec(StoreSpec):
pass
def default_qualitative_sell_store_path(root: Path) -> Path:
return default_sqlite_store_path(root, "qualitative_sell_strategy/qualitative_sell_strategy.db")
def resolve_store_path(spec: QualitativeSellStoreSpec, root: Path) -> Path:
backend, location = normalize_store_spec(
spec,
root,
default_sqlite_name="qualitative_sell_strategy/qualitative_sell_strategy.db",
)
if backend != "sqlite":
raise ValueError(
"qualitative_sell_strategy_store_v1 currently executes on sqlite only; "
"the caller contract already allows future PostgreSQL swap-in."
)
return Path(location)
def init_db(db_path: Path) -> None:
db_path.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(db_path)
try:
conn.executescript(SCHEMA)
conn.commit()
finally:
conn.close()
def insert_sell_strategy_result(db_path: Path, result: dict[str, Any]) -> None:
"""build_qualitative_sell_inputs_v1.process_one()의 반환값(dict)을 그대로 받는다."""
init_db(db_path)
decision = result.get("decision") or {}
conn = sqlite3.connect(db_path)
try:
conn.execute(
"INSERT INTO sell_strategy_results "
"(code, generated_at, action, conviction, market_regime, composite_score, rationale, raw_json) "
"VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(
result.get("code"),
result.get("generated_at"),
decision.get("action"),
decision.get("conviction"),
decision.get("market_regime"),
decision.get("composite_score"),
decision.get("rationale"),
json.dumps(result, ensure_ascii=False, default=str),
),
)
conn.commit()
finally:
conn.close()
def insert_satellite_recommendation(db_path: Path, generated_at: str, candidate: dict[str, Any]) -> None:
"""build_satellite_candidate_recommendations_v1.py results[i] 항목 하나를 받는다."""
init_db(db_path)
score = candidate.get("score") or {}
conn = sqlite3.connect(db_path)
try:
conn.execute(
"INSERT INTO satellite_recommendations "
"(ticker, generated_at, satellite_action, attractiveness_score, market_regime, raw_json) "
"VALUES (?, ?, ?, ?, ?, ?)",
(
candidate.get("ticker"),
generated_at,
score.get("satellite_action"),
score.get("attractiveness_score"),
score.get("market_regime"),
json.dumps(candidate, ensure_ascii=False, default=str),
),
)
conn.commit()
finally:
conn.close()
def fetch_recent_sell_strategy_results(db_path: Path, code: str, limit: int = 20) -> list[dict[str, Any]]:
if not db_path.exists():
return []
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
try:
rows = conn.execute(
"SELECT code, generated_at, action, conviction, market_regime, composite_score, rationale "
"FROM sell_strategy_results WHERE code = ? ORDER BY generated_at DESC LIMIT ?",
(code, limit),
).fetchall()
return [dict(row) for row in rows]
finally:
conn.close()
@@ -0,0 +1,377 @@
from __future__ import annotations
import math
from datetime import date, timedelta
from typing import Any
# 매도 결정에 동원하는 5개 독립 팩터군. 단일 팩터의 임계값 돌파만으로는 행동을
# 트리거하지 않는다 — 최소 CONFLUENCE_MIN개 팩터군이 동일 방향으로 합의해야
# SELL/ADD 확신도가 성립한다. (기계적 단일 트리거 매도 금지 원칙)
FACTOR_FAMILIES: tuple[str, ...] = (
"macro_pressure",
"fundamental_trajectory",
"short_interest_pressure",
"microstructure_pressure",
"liquidity_rotation_risk",
)
CONFLUENCE_MIN = 3
EVENT_PRE_GUARD_DAYS = 5 # macro_event_synchronizer_v2.event_hold_gate와 동일 — HIGH 이벤트 5일 전
EVENT_POST_GUARD_DAYS = 2 # 이벤트 후 2일 변동성 소화 구간
# 금리국면별 시장 성격: 금리 상승기=실적장세(펀더멘털/수출입 실적이 가격을 주도),
# 금리 보합·하락기=기술장세(수급·미시구조가 가격을 주도). 동일한 5팩터라도
# 국면에 따라 가중치를 달리 줘야 confluence가 의미를 갖는다.
REGIME_FLAT_WEIGHTS: dict[str, float] = {family: 1.0 for family in FACTOR_FAMILIES}
REGIME_WEIGHT_TABLE: dict[str, dict[str, float]] = {
"PERFORMANCE_MARKET": { # 금리 상승기 — 실적/수출입 펀더멘털 가중 상향
"macro_pressure": 1.2,
"fundamental_trajectory": 1.8,
"short_interest_pressure": 1.0,
"microstructure_pressure": 0.5,
"liquidity_rotation_risk": 1.0,
},
"TECHNICAL_MARKET": { # 금리 보합·하락기 — 수급/미시구조 가중 상향
"macro_pressure": 0.8,
"fundamental_trajectory": 0.8,
"short_interest_pressure": 1.3,
"microstructure_pressure": 1.6,
"liquidity_rotation_risk": 1.3,
},
"NEUTRAL": REGIME_FLAT_WEIGHTS,
}
def classify_market_regime(rate_trend: str | None) -> str:
"""금리 추세 문자열(RISING/FLAT/FALLING)을 실적장세/기술장세로 분류.
RISING → PERFORMANCE_MARKET(실적장세): 금리 상승기엔 유동성보다 실적/펀더멘털이
가격을 결정. FLAT/FALLING → TECHNICAL_MARKET(기술장세): 유동성이 풍부해 수급·
미시구조·테마성 모멘텀이 가격을 주도. 입력 결측 시 NEUTRAL(가중치 변화 없음).
"""
trend = str(rate_trend or "").upper()
if trend == "RISING":
return "PERFORMANCE_MARKET"
if trend in {"FLAT", "FALLING"}:
return "TECHNICAL_MARKET"
return "NEUTRAL"
def _finite(value: Any) -> bool:
return isinstance(value, (int, float)) and math.isfinite(float(value))
def compute_short_interest_composite(ctx: dict[str, Any]) -> dict[str, Any]:
"""SHORT_INTEREST_RISK_GAUGE_V1.
5요소: 공매도잔고율 변화, 공매도거래비중, 상대수익률(섹터/지수 대비),
거래량 이상, 실적전망. 잔고율 단독으로는 매도 근거가 약함(현대로템형) —
잔고율이 낮을 때는 거래비중·상대수익률 가중치를 자동 상향한다.
"""
missing: list[str] = []
short_balance_ratio = ctx.get("short_balance_ratio") # %, 현재 잔고율
short_balance_ratio_chg_20d = ctx.get("short_balance_ratio_chg_20d") # %p, 20일 변화
short_turnover_share = ctx.get("short_turnover_share") # 당일 거래 중 공매도 비중 %
relative_return_20d = ctx.get("relative_return_20d") # 종목수익률 - 섹터(or지수)수익률, %p
volume_ratio_5d = ctx.get("volume_ratio_5d") # 5일평균거래량 대비 비율
earnings_outlook = str(ctx.get("earnings_outlook") or "").upper() # IMPROVING|STABLE|DETERIORATING|UNKNOWN
for name, value in (
("short_balance_ratio", short_balance_ratio),
("short_turnover_share", short_turnover_share),
("relative_return_20d", relative_return_20d),
):
if not _finite(value):
missing.append(name)
if missing:
return {
"short_interest_pressure": None,
"status": "DATA_MISSING",
"missing_inputs": missing,
"note": "잔고율/거래비중/상대수익률 중 결측 — 공매도 합성 점수를 산출하지 않음(추정 금지)",
}
low_balance_regime = float(short_balance_ratio) < 1.0 # 잔고율 1% 미만이면 '낮은 잔고율' 취급(현대로템형)
# 잔고율 추세: 상승=매도근거 강화, 하락=매도근거 약화(혹은 매수근거)
balance_trend_signal = 0.0
if _finite(short_balance_ratio_chg_20d):
balance_trend_signal = max(-1.0, min(1.0, float(short_balance_ratio_chg_20d) / 1.5))
turnover_signal = max(-1.0, min(1.0, (float(short_turnover_share) - 8.0) / 12.0)) # 8% 기준선
relative_return_signal = max(-1.0, min(1.0, -float(relative_return_20d) / 10.0)) # 상대 약세일수록 +
volume_signal = 0.0
if _finite(volume_ratio_5d):
volume_signal = max(-1.0, min(1.0, (float(volume_ratio_5d) - 1.0)))
outlook_signal = {
"IMPROVING": -0.6,
"STABLE": 0.0,
"DETERIORATING": 0.7,
}.get(earnings_outlook, 0.0)
if low_balance_regime:
# 잔고율 자체는 약한 근거 — 거래비중·상대수익률 가중치 상향, 잔고율추세 가중치 하향
weights = {"balance": 0.10, "turnover": 0.30, "relative": 0.30, "volume": 0.10, "outlook": 0.20}
else:
weights = {"balance": 0.30, "turnover": 0.20, "relative": 0.20, "volume": 0.10, "outlook": 0.20}
pressure = (
balance_trend_signal * weights["balance"]
+ turnover_signal * weights["turnover"]
+ relative_return_signal * weights["relative"]
+ volume_signal * weights["volume"]
+ outlook_signal * weights["outlook"]
)
pressure = max(-1.0, min(1.0, pressure))
label = "ELEVATED_SHORT_PRESSURE" if pressure >= 0.5 else "WATCH" if pressure >= 0.2 else \
"SHORT_COVERING_SUPPORTIVE" if pressure <= -0.5 else "NEUTRAL"
return {
"short_interest_pressure": round(pressure, 4),
"status": "OK",
"low_balance_regime": low_balance_regime,
"label": label,
"components": {
"balance_trend_signal": round(balance_trend_signal, 4),
"turnover_signal": round(turnover_signal, 4),
"relative_return_signal": round(relative_return_signal, 4),
"volume_signal": round(volume_signal, 4),
"outlook_signal": outlook_signal,
},
"weights_used": weights,
}
def compute_microstructure_pressure_from_orderbook(orderbook_output1: dict[str, Any]) -> dict[str, Any]:
"""MICROSTRUCTURE_PRESSURE_FROM_ORDERBOOK_V1.
KIS Open API FHKST01010200(주식현재가 호가/예상체결) output1의 10단계 호가 잔량을
-1(매수우위/지지)~+1(매도우위/압력)로 계량화. 실측 확인된 필드명(2026-06-21,
005930 라이브 호출): total_askp_rsqn, total_bidp_rsqn(10단계 합계 잔량).
이 점수는 전략 방향 결정에는 쓰지 않고 confluence가 성립한 이후의 '집행 타이밍'
보조로만 사용한다(spec/exit/qualitative_sell_strategy_v1.yaml:factor_families.
microstructure_pressure 참조).
"""
total_askp = orderbook_output1.get("total_askp_rsqn")
total_bidp = orderbook_output1.get("total_bidp_rsqn")
try:
total_askp = float(total_askp)
total_bidp = float(total_bidp)
except (TypeError, ValueError):
return {"microstructure_pressure": None, "status": "DATA_MISSING"}
denom = total_askp + total_bidp
if denom <= 0:
return {"microstructure_pressure": None, "status": "DATA_MISSING"}
pressure = max(-1.0, min(1.0, (total_askp - total_bidp) / denom))
return {
"microstructure_pressure": round(pressure, 4),
"status": "OK",
"total_askp_rsqn": total_askp,
"total_bidp_rsqn": total_bidp,
}
def _event_review_window(
today: date,
pressure_sign: int,
next_earnings_date: date | None,
next_macro_event_date: date | None,
macro_event_impact: str | None,
earnings_outlook: str,
) -> dict[str, Any]:
"""캘린더 기반 검토 구간 산출 — 임의 날짜 고정이 아니라 실제 이벤트 일정에서 역산."""
candidates: list[tuple[date, str]] = []
if next_earnings_date is not None:
if pressure_sign < 0 and earnings_outlook == "DETERIORATING":
# 실적 악화 전망 + 매도압력 → 실적발표 전 정리(서프라이즈 리스크 회피)
candidates.append((next_earnings_date - timedelta(days=EVENT_PRE_GUARD_DAYS), "PRE_EARNINGS_EXIT_BEFORE_SURPRISE_RISK"))
elif pressure_sign < 0 and earnings_outlook in {"IMPROVING", "STABLE"}:
# 단기 기술적 매도압력이지만 실적전망은 양호 → 발표 직전 매도는 가치훼손, 발표 이후로 연기
candidates.append((next_earnings_date + timedelta(days=EVENT_POST_GUARD_DAYS), "DEFER_TO_POST_EARNINGS_AVOID_PREMATURE_EXIT"))
elif pressure_sign > 0:
# 추가매수/보유 신호 — 발표 변동성 통과 후 확신 재평가
candidates.append((next_earnings_date + timedelta(days=EVENT_POST_GUARD_DAYS), "REASSESS_AFTER_EARNINGS_CONFIRM"))
if next_macro_event_date is not None and str(macro_event_impact or "").upper() in {"HIGH", "VERY_HIGH"}:
if pressure_sign < 0:
candidates.append((next_macro_event_date - timedelta(days=EVENT_PRE_GUARD_DAYS), "PRE_MACRO_EVENT_DERISK"))
else:
candidates.append((next_macro_event_date + timedelta(days=EVENT_POST_GUARD_DAYS), "POST_MACRO_EVENT_CONFIRM"))
if not candidates:
return {
"review_window_start": today.isoformat(),
"review_window_end": (today + timedelta(days=10)).isoformat(),
"window_basis": "NO_SCHEDULED_EVENT_DEFAULT_10D_REVIEW",
}
earliest = min(candidates, key=lambda item: item[0])
window_start = max(today, earliest[0] - timedelta(days=2))
window_end = earliest[0] + timedelta(days=2)
return {
"review_window_start": window_start.isoformat(),
"review_window_end": window_end.isoformat(),
"window_basis": earliest[1],
}
def compute_qualitative_sell_strategy(ctx: dict[str, Any]) -> dict[str, Any]:
"""QUALITATIVE_SELL_STRATEGY_V1.
매크로/실적/펀더멘털/공매도수급/호가미시구조/대내외(IPO·로테이션) 5개
독립 팩터군의 합의(confluence)로만 행동을 생성한다. 현금부족 사유는
입력에서 의도적으로 배제(cash_shortfall_excluded=True) — 가치보존이
유일한 목적 함수.
"""
today_raw = ctx.get("today")
today = today_raw if isinstance(today_raw, date) else date.today()
factor_values: dict[str, float | None] = {}
missing_factors: list[str] = []
for family in FACTOR_FAMILIES:
value = ctx.get(family)
if _finite(value):
factor_values[family] = max(-1.0, min(1.0, float(value)))
else:
factor_values[family] = None
missing_factors.append(family)
available = {k: v for k, v in factor_values.items() if v is not None}
if len(available) < CONFLUENCE_MIN:
return {
"action": "INSUFFICIENT_DATA_NO_ACTION",
"conviction": "NONE",
"available_factors": list(available.keys()),
"missing_factors": missing_factors,
"rationale": "5개 팩터군 중 confluence 판정에 필요한 최소 데이터가 부족 — 추정으로 행동 생성 금지",
"cash_shortfall_excluded": True,
"mechanical_sell_prohibited": True,
}
# 부호 규약: 모든 팩터군은 +1(매도압력 최대) ~ -1(보유/추가 지지 최대) 동일 스케일.
# short_interest_pressure도 동일 — ELEVATED_SHORT_PRESSURE(+) / SHORT_COVERING_SUPPORTIVE(-).
# confluence 합의 카운트는 국면 가중치와 무관하게 원시 방향성으로만 판정한다
# (가중치는 행동 '강도'에만 영향 — 합의 성립 여부 자체를 왜곡하지 않는다).
sell_agree = [k for k, v in available.items() if v >= 0.30]
hold_add_agree = [k for k, v in available.items() if v <= -0.30]
market_regime = classify_market_regime(ctx.get("rate_trend")) if "market_regime" not in ctx else str(ctx.get("market_regime") or "NEUTRAL").upper()
regime_weights = REGIME_WEIGHT_TABLE.get(market_regime, REGIME_FLAT_WEIGHTS)
weighted_sum = sum(available[k] * regime_weights.get(k, 1.0) for k in available)
weight_total = sum(regime_weights.get(k, 1.0) for k in available)
composite_score = weighted_sum / weight_total if weight_total else 0.0
earnings_outlook = str(ctx.get("earnings_outlook") or "STABLE").upper()
next_earnings_date = ctx.get("next_earnings_date") if isinstance(ctx.get("next_earnings_date"), date) else None
next_macro_event_date = ctx.get("next_macro_event_date") if isinstance(ctx.get("next_macro_event_date"), date) else None
macro_event_impact = ctx.get("macro_event_impact")
if len(sell_agree) >= CONFLUENCE_MIN:
conviction = "HIGH" if len(sell_agree) >= 4 else "MEDIUM"
action = "EXIT_REVIEW_FULL" if composite_score >= 0.6 else "TRIM_REVIEW_PARTIAL"
pressure_sign = -1
rationale = f"매도압력 합의({len(sell_agree)}/{len(available)} 팩터군 매도방향 합치): " + ", ".join(sell_agree)
elif len(hold_add_agree) >= CONFLUENCE_MIN:
conviction = "HIGH" if len(hold_add_agree) >= 4 else "MEDIUM"
action = "HOLD_ADD_CONVICTION"
pressure_sign = 1
rationale = f"보유/추가 근거 합의({len(hold_add_agree)}/{len(available)} 팩터군 지지방향 합치): " + ", ".join(hold_add_agree)
else:
conviction = "LOW"
action = "HOLD_NO_CONFLUENCE"
pressure_sign = 0
rationale = "팩터군 간 합의 미달 — 단일/소수 팩터의 임계값 돌파만으로는 매도 트리거 금지"
window = _event_review_window(
today=today,
pressure_sign=pressure_sign,
next_earnings_date=next_earnings_date,
next_macro_event_date=next_macro_event_date,
macro_event_impact=macro_event_impact,
earnings_outlook=earnings_outlook,
) if pressure_sign != 0 else None
return {
"action": action,
"conviction": conviction,
"market_regime": market_regime,
"composite_score": round(composite_score, 4),
"sell_agreeing_factors": sell_agree,
"hold_add_agreeing_factors": hold_add_agree,
"missing_factors": missing_factors,
"review_window": window,
"rationale": rationale,
"cash_shortfall_excluded": True,
"mechanical_sell_prohibited": True,
}
def compute_satellite_candidate_score(ctx: dict[str, Any]) -> dict[str, Any]:
"""SATELLITE_CANDIDATE_SCORE_V1.
미보유 유니버스 종목을 섹터 수출입 전망(sector_export_trend) + 펀더멘털
추세 + 국면적합도로 평가해 WATCH/BUY_CANDIDATE/AVOID를 산출한다. 보유종목
매도판단(compute_qualitative_sell_strategy)과 동일한 부호 규약을 쓰지 않고
별도 -1(약세)~+1(강세) 매력도 스케일을 쓴다 — 매수후보 평가와 매도판단은
목적함수가 다르므로 동일 점수를 재사용하지 않는다.
"""
sector_export_trend = ctx.get("sector_export_trend") # %, 섹터 수출 YoY/MoM 추세
fundamental_trajectory = ctx.get("fundamental_trajectory") # -1(악화)~+1(개선), 매도엔진과 동일 정의역이나 부호 반대 해석 주의
relative_return_20d = ctx.get("relative_return_20d")
market_regime = str(ctx.get("market_regime") or classify_market_regime(ctx.get("rate_trend"))).upper()
missing = [name for name, value in (
("sector_export_trend", sector_export_trend),
("fundamental_trajectory", fundamental_trajectory),
) if not _finite(value)]
if missing:
return {
"satellite_action": "INSUFFICIENT_DATA_NO_ACTION",
"missing_inputs": missing,
"market_regime": market_regime,
}
export_signal = max(-1.0, min(1.0, float(sector_export_trend) / 10.0))
fundamental_signal = max(-1.0, min(1.0, -float(fundamental_trajectory))) # 매도엔진 부호(+)=악화 -> 매력도는 반전
relative_signal = max(-1.0, min(1.0, float(relative_return_20d) / 10.0)) if _finite(relative_return_20d) else 0.0
if market_regime == "PERFORMANCE_MARKET":
weights = {"export": 0.45, "fundamental": 0.40, "relative": 0.15}
elif market_regime == "TECHNICAL_MARKET":
weights = {"export": 0.20, "fundamental": 0.25, "relative": 0.55}
else:
weights = {"export": 0.34, "fundamental": 0.33, "relative": 0.33}
attractiveness = (
export_signal * weights["export"]
+ fundamental_signal * weights["fundamental"]
+ relative_signal * weights["relative"]
)
attractiveness = max(-1.0, min(1.0, attractiveness))
if attractiveness >= 0.5:
satellite_action = "BUY_CANDIDATE"
elif attractiveness >= 0.2:
satellite_action = "WATCH"
elif attractiveness <= -0.4:
satellite_action = "AVOID"
else:
satellite_action = "NEUTRAL_NO_EDGE"
return {
"satellite_action": satellite_action,
"attractiveness_score": round(attractiveness, 4),
"market_regime": market_regime,
"components": {
"export_signal": round(export_signal, 4),
"fundamental_signal": round(fundamental_signal, 4),
"relative_signal": round(relative_signal, 4),
},
"weights_used": weights,
}