# Source file: QuantEngineByItz/tools/build_qualitative_sell_inputs_v1.py
# (file-viewer header: 469 lines, 21 KiB, Python)

"""qualitative_sell_strategy_v1 입력 ctx 조립 오케스트레이터.
데이터 출처 (2026-06-22 기준, KIS Open API 우선):
- relative_return_20d, volume_ratio_5d ← KIS Open API 우선, Naver는 fallback
- sector_export_trend ← tools/fetch_trade_statistics_motie_v1.py (--csv 경로 권장)
- short_turnover_share ← [신규] KIS Open API daily-short-sale(FHPST04830000)
output2.ssts_vol_rlim — 실측 동작 확인(실전계좌 도메인,
모의계좌 도메인은 500 에러). --kis-account real 필요.
- short_balance_ratio(잔고율) ← 여전히 미확보. KIS API도 제공하지 않음(KRX 공매도종합
포털 대량보유 공시 전용 데이터) — --short-csv 수동
다운로드로만 가능.
- microstructure_pressure(호가10단계) ← [신규] KIS Open API inquire-asking-price-exp-ccn
(FHKST01010200) output1.total_askp_rsqn/total_bidp_rsqn
— 실측 동작 확인(실전+모의 도메인 모두). --kis-account
{real,mock}로 활성화.
- macro_pressure, rate_trend, next_earnings_date, next_macro_event_date, macro_event_impact
← 기존 GAS 하네스(macro_event_synchronizer_v2,
gas_event_calendar.gs)가 이미 산출/수집 중 —
이 스크립트가 중복 수집하지 않고 --context-json/
--workbook으로 그 결과를 주입받는다.
- investing.com ← 직접 스크래핑 403(Cloudflare) 차단 확인. 사용 안 함.
[CRITICAL] KIS API는 조회(read-only)로만 사용한다 — 매수/매도 주문은 어떤 경우에도 이 코드를
통해 실행하지 않는다(governance/rules/06_no_direct_api_trading.yaml, CI 강제 게이트
tools/validate_no_direct_api_trading_v1.py).
사용 예:
python tools/build_qualitative_sell_inputs_v1.py \
--ticker 005930 --benchmark-code 069500 --sector 반도체 \
--kis-account real --short-csv Temp/krx_short_balance_manual.csv \
--context-json Temp/macro_context.json --apply
"""
from __future__ import annotations
import argparse
import datetime as dt
import json
import sys
from pathlib import Path
from typing import Any
# Directory two levels above this file; used as the import root for the
# `tools.*` / `src.*` packages imported below.
ROOT = Path(__file__).resolve().parent.parent
# Put the root at the front of sys.path once so the script also works when
# it is launched directly (e.g. `python tools/<this file>`).
if sys.path.count(str(ROOT)) == 0:
    sys.path.insert(0, str(ROOT))
from tools.fetch_naver_market_data_v1 import (
_session,
compute_relative_return_20d,
compute_volume_ratio_5d,
fetch_price_history,
)
from tools.fetch_trade_statistics_motie_v1 import (
compute_sector_export_trend,
load_trade_statistics_csv,
)
from src.quant_engine.qualitative_sell_strategy_v1 import (
compute_microstructure_pressure_from_orderbook,
compute_qualitative_sell_strategy,
compute_short_interest_composite,
)
from src.quant_engine.qualitative_sell_strategy_store_v1 import (
QualitativeSellStoreSpec,
insert_sell_strategy_result,
resolve_store_path,
)
# Default destinations: per-ticker JSON files and the SQLite database file.
DEFAULT_OUTPUT_DIR = ROOT.joinpath("outputs", "qualitative_sell_strategy")
DEFAULT_SQLITE_DB = DEFAULT_OUTPUT_DIR.joinpath("qualitative_sell_strategy.db")
def _kst_now_iso() -> str:
    """Return the current time at a fixed UTC+09:00 offset as an ISO-8601 string."""
    kst = dt.timezone(dt.timedelta(hours=9))
    now = dt.datetime.now(kst)
    return now.isoformat()
def _parse_date(value: str | dt.date | None) -> dt.date | None:
    """Best-effort conversion of an externally supplied value to a date.

    Args:
        value: An ISO date string (``YYYY-MM-DD``), an ISO datetime string
            (``YYYY-MM-DDTHH:MM...`` or ``YYYY-MM-DD HH:MM...``), a
            ``datetime.date`` / ``datetime.datetime`` instance, or ``None``.

    Returns:
        The parsed ``datetime.date``, or ``None`` when the value is empty,
        of an unsupported type, or not parseable. This function never raises
        for bad input.
    """
    if not value:
        return None
    # datetime is a subclass of date, so it must be checked first.
    if isinstance(value, dt.datetime):
        return value.date()
    if isinstance(value, dt.date):
        return value
    if not isinstance(value, str):
        # e.g. a bare number from JSON; fromisoformat() would raise TypeError.
        return None

    text = value.strip()
    try:
        return dt.date.fromisoformat(text)
    except ValueError:
        pass
    # Accept full ISO datetimes by parsing only their date part. Slicing keeps
    # this independent of which datetime formats the running Python supports.
    if len(text) > 10 and text[10] in ("T", " "):
        try:
            return dt.date.fromisoformat(text[:10])
        except ValueError:
            return None
    return None
def _coerce_price_rows(rows: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Project raw price rows onto a fixed set of keys.

    Non-dict entries and rows whose ``date`` is missing or blank are dropped.
    Every kept row has exactly the keys ``date``, ``close``, ``open``,
    ``high``, ``low`` and ``volume``; ``date`` is a stripped string and the
    other values are passed through untouched (``None`` when absent).
    """
    passthrough = ("close", "open", "high", "low", "volume")
    coerced: list[dict[str, Any]] = []
    for raw in rows:
        if not isinstance(raw, dict):
            continue
        day = str(raw.get("date") or "").strip()
        if not day:
            continue
        record: dict[str, Any] = {"date": day}
        for field in passthrough:
            record[field] = raw.get(field)
        coerced.append(record)
    return coerced
def _kis_number(value: Any, default: float) -> float:
    """Convert one numeric payload field to ``float``.

    Commas are removed (treated as thousands separators). Returns *default*
    when the value is blank or cannot be parsed as a number.
    """
    text = str(value).replace(",", "").strip()
    if not text:
        return default
    try:
        return float(text)
    except ValueError:
        return default


def _parse_kis_price_rows(payload: dict[str, Any]) -> list[dict[str, Any]]:
    """Extract daily price rows from a KIS-style response payload.

    Every list found under ``output2``, ``output1`` or ``output`` (in that
    order) is scanned; non-dict items and items without a date are skipped.

    Args:
        payload: Decoded response dict.

    Returns:
        A list of dicts with keys ``date``, ``close``, ``open``, ``high``,
        ``low`` and ``volume``. ``close`` and ``volume`` default to ``0.0``
        when missing or unparseable; ``open``/``high``/``low`` default to the
        row's close. A malformed field never aborts the parse of the other
        rows.
    """
    rows: list[dict[str, Any]] = []
    for key in ("output2", "output1", "output"):
        items = payload.get(key)
        if not isinstance(items, list):
            continue
        for item in items:
            if not isinstance(item, dict):
                continue
            date = str(
                item.get("stck_bsop_date")
                or item.get("data_date")
                or item.get("trd_dd")
                or item.get("date")
                or ""
            ).strip()
            if not date:
                continue
            close = item.get("stck_clpr") or item.get("close") or item.get("price")
            volume = item.get("acml_vol") or item.get("volume") or item.get("trd_vol") or 0
            close_val = _kis_number(close, 0.0)
            # open/high/low fall back to the raw close field first, and to the
            # parsed close when the chosen field is blank or not numeric.
            open_raw = item.get("stck_oprc") or item.get("open") or close or 0
            high_raw = item.get("stck_hgpr") or item.get("high") or close or 0
            low_raw = item.get("stck_lwpr") or item.get("low") or close or 0
            rows.append(
                {
                    # Only dotted dates are rewritten; other formats pass through.
                    "date": date.replace(".", "-"),
                    "close": close_val,
                    "open": _kis_number(open_raw, close_val),
                    "high": _kis_number(high_raw, close_val),
                    "low": _kis_number(low_raw, close_val),
                    "volume": _kis_number(volume, 0.0),
                }
            )
    return rows
def fetch_price_history_kis(code: str, kis_account: str | None, benchmark_code: str | None = None) -> dict[str, Any]:
    """Fetch recent price rows for *code* through the KIS client (read-only).

    The request window is today minus 40 calendar days through today, with
    ``period="D"``.

    Args:
        code: Ticker code passed to the KIS client.
        kis_account: Account profile name for ``KisCredentials.load``; when
            falsy no request is made.
        benchmark_code: Accepted for interface compatibility; it does not
            change the result.

    Returns:
        ``{"status": "OK", "rows": [...], "source_url": ..., "source_as_of": ...}``
        when at least one row was parsed, otherwise
        ``{"status": "DATA_MISSING", "rows": []}`` with an extra ``"error"``
        message if loading credentials or the request raised.
    """
    if not kis_account:
        return {"status": "DATA_MISSING", "rows": []}

    from src.quant_engine.kis_api_client_v1 import KisCredentials, get_daily_item_chart_price

    try:
        credentials = KisCredentials.load(kis_account)
    except RuntimeError as exc:
        return {"status": "DATA_MISSING", "rows": [], "error": str(exc)}

    try:
        last_day = dt.date.today()
        first_day = last_day - dt.timedelta(days=40)
        payload = get_daily_item_chart_price(
            credentials,
            code,
            first_day.strftime("%Y%m%d"),
            last_day.strftime("%Y%m%d"),
            period="D",
        )
        parsed = _parse_kis_price_rows(payload)
        if parsed:
            return {
                "status": "OK",
                "rows": parsed,
                "source_url": "KIS Open API /uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice",
                "source_as_of": _kst_now_iso(),
            }
    except Exception as exc:  # noqa: BLE001 - any client failure degrades to DATA_MISSING
        return {"status": "DATA_MISSING", "rows": [], "error": str(exc)}
    # The request succeeded but yielded no usable rows.
    return {"status": "DATA_MISSING", "rows": []}
def _fetch_price_bundle(
    code: str,
    *,
    kis_account: str | None,
    prefer_kis: bool = True,
) -> dict[str, Any]:
    """Pick the price-history source for one code (stock or benchmark alike).

    Responsibilities are split as follows:
    - choosing the data source is this function's job;
    - relative-return / volume-ratio math stays in the compute functions;
    - the caller only consumes the returned bundle.

    Returns:
        ``{"source": "kis_open_api", "price": <kis result>}`` when KIS is
        preferred and returned status ``OK``; otherwise the Naver result as
        ``"price"`` with ``"source"`` set to ``"naver_finance"`` or
        ``"data_missing"``, plus the KIS attempt under ``"kis_price"``.
    """
    kis_result = fetch_price_history_kis(code, kis_account)
    if prefer_kis and kis_result.get("status") == "OK":
        return {"source": "kis_open_api", "price": kis_result}

    # Fallback path: query Naver with a fresh session.
    naver_result = fetch_price_history(_session(), code)
    if naver_result.get("status") == "OK":
        label = "naver_finance"
    else:
        label = "data_missing"
    return {"source": label, "price": naver_result, "kis_price": kis_result}
def load_short_interest_csv(path: Path, code: str) -> dict[str, Any]:
    """Look up one ticker in a manually downloaded short-interest CSV.

    Expected columns (Korean headers, with English aliases in parentheses):
    ``종목코드`` (``code``), ``잔고율`` (``short_balance_ratio``),
    ``잔고율변화20일`` (``short_balance_ratio_chg_20d``) and
    ``거래비중`` (``short_turnover_share``).

    Args:
        path: CSV file. It is read as UTF-8 (BOM tolerated); if that fails to
            decode, it is re-read as CP949, which Korean spreadsheet exports
            commonly use.
        code: Ticker code. Both this value and the CSV cell are zero-padded
            to six digits before comparison, so ``"5930"`` matches
            ``"005930"``.

    Returns:
        A dict with the three float metrics for the first matching row, or an
        empty dict when the ticker is not present. Blank cells become ``0.0``.

    Raises:
        ValueError: A metric cell of the matching row is not numeric.
        OSError / UnicodeDecodeError: The file cannot be read in either
            encoding.
    """
    import csv

    def _read_rows(encoding: str) -> list[dict[str, Any]]:
        # newline="" is what the csv module requires for correct line handling.
        with path.open(encoding=encoding, newline="") as handle:
            return list(csv.DictReader(handle))

    def _metric(row: dict[str, Any], korean: str, english: str) -> float:
        raw = row.get(korean) or row.get(english) or 0
        # Tolerate thousands separators and padded / whitespace-only cells.
        text = str(raw).replace(",", "").strip()
        return float(text) if text else 0.0

    try:
        rows = _read_rows("utf-8-sig")
    except UnicodeDecodeError:
        rows = _read_rows("cp949")

    wanted = str(code).strip().zfill(6)
    for row in rows:
        cell = str(row.get("종목코드") or row.get("code") or "").strip()
        # Skip rows without a code so padding cannot turn "" into "000000".
        if cell and cell.zfill(6) == wanted:
            return {
                "short_balance_ratio": _metric(row, "잔고율", "short_balance_ratio"),
                "short_balance_ratio_chg_20d": _metric(row, "잔고율변화20일", "short_balance_ratio_chg_20d"),
                "short_turnover_share": _metric(row, "거래비중", "short_turnover_share"),
            }
    return {}
def fetch_kis_supplement(code: str, kis_account: str | None) -> dict[str, Any]:
    """Query the KIS client for two optional inputs (read-only, no order calls).

    - ``microstructure_pressure``: derived from the order-book response's
      ``output1`` via ``compute_microstructure_pressure_from_orderbook``.
    - ``short_turnover_share``: ``ssts_vol_rlim`` of the first ``output2`` row
      of the daily short-sale response for the last 10 calendar days.
      NOTE(review): this assumes the first row is the most recent - confirm.

    Returns:
        ``{}`` when *kis_account* is falsy; ``{"kis_error": ...}`` when
        credentials fail to load; otherwise a dict holding whichever of the
        two values could be obtained, plus ``kis_orderbook_error`` /
        ``kis_short_sale_error`` messages for the lookups that raised.
    """
    if not kis_account:
        return {}

    from src.quant_engine.kis_api_client_v1 import KisCredentials, get_asking_price_10_level, get_daily_short_sale

    try:
        credentials = KisCredentials.load(kis_account)
    except RuntimeError as exc:
        return {"kis_error": str(exc)}

    supplement: dict[str, Any] = {}

    try:
        orderbook = get_asking_price_10_level(credentials, code)
        pressure = compute_microstructure_pressure_from_orderbook(orderbook.get("output1", {}))
        if pressure.get("status") == "OK":
            supplement["microstructure_pressure"] = pressure["microstructure_pressure"]
    except Exception as exc:  # noqa: BLE001 - a failed KIS call must not block the pipeline
        supplement["kis_orderbook_error"] = str(exc)

    try:
        last_day = dt.date.today()
        first_day = last_day - dt.timedelta(days=10)
        short_sales = get_daily_short_sale(
            credentials,
            code,
            first_day.strftime("%Y%m%d"),
            last_day.strftime("%Y%m%d"),
        )
        daily_rows = short_sales.get("output2") or []
        share = daily_rows[0].get("ssts_vol_rlim") if daily_rows else None
        if share is not None:
            supplement["short_turnover_share"] = float(share)
    except Exception as exc:  # noqa: BLE001
        supplement["kis_short_sale_error"] = str(exc)

    return supplement
def build_ctx_for_ticker(
    code: str,
    benchmark_code: str,
    sector: str | None,
    earnings_outlook: str,
    trade_csv: Path | None,
    short_csv: Path | None,
    external_context: dict[str, Any],
    kis_account: str | None = None,
) -> dict[str, Any]:
    """Gather every input for one ticker and assemble the strategy ``ctx``.

    Steps, in order: fetch price history for the ticker and the benchmark,
    derive ``relative_return_20d`` / ``volume_ratio_5d``, query the KIS
    supplement, merge short-interest inputs (CSV first, KIS turnover share
    overriding it), optionally compute the sector export trend, and finally
    combine those with values passed through from *external_context*.

    Returns:
        A dict containing ``ctx`` (``today`` and the two event dates are
        ``datetime.date`` objects or ``None``) together with the intermediate
        results and the price / benchmark source labels.
    """
    stock_bundle = _fetch_price_bundle(code, kis_account=kis_account, prefer_kis=True)
    index_bundle = _fetch_price_bundle(benchmark_code, kis_account=kis_account, prefer_kis=True)
    stock_price = stock_bundle["price"]
    index_price = index_bundle["price"]

    stock_rows = _coerce_price_rows(stock_price.get("rows") or [])
    index_rows = _coerce_price_rows(index_price.get("rows") or [])
    relative_return_20d = compute_relative_return_20d(stock_rows, index_rows)
    volume_ratio_5d = compute_volume_ratio_5d(stock_rows)

    kis_supplement = fetch_kis_supplement(code, kis_account)

    # Short-interest inputs: CSV values first, then the KIS turnover share wins.
    if short_csv and short_csv.exists():
        short_inputs: dict[str, Any] = load_short_interest_csv(short_csv, code)
    else:
        short_inputs = {}
    if "short_turnover_share" in kis_supplement:
        short_inputs["short_turnover_share"] = kis_supplement["short_turnover_share"]
    for key, fallback in (
        ("relative_return_20d", relative_return_20d),
        ("volume_ratio_5d", volume_ratio_5d),
        ("earnings_outlook", earnings_outlook),
    ):
        short_inputs.setdefault(key, fallback)
    short_interest = compute_short_interest_composite(short_inputs)

    sector_export_trend = None
    if trade_csv and trade_csv.exists() and sector:
        trade_rows = load_trade_statistics_csv(trade_csv)
        export_result = compute_sector_export_trend(trade_rows, sector, compare="yoy")
        if export_result.get("status") == "OK":
            sector_export_trend = export_result["sector_export_trend"]

    fundamental_trajectory = external_context.get("fundamental_trajectory")
    if fundamental_trajectory is None and sector_export_trend is not None:
        # Sign-flipped, divided by 15 and clamped to [-1, 1].
        # NOTE(review): presumably falling exports should raise the value - confirm.
        scaled = -sector_export_trend / 15.0
        fundamental_trajectory = max(-1.0, min(1.0, scaled))

    # A KIS-derived microstructure value takes precedence over the injected one.
    if "microstructure_pressure" in kis_supplement:
        microstructure_pressure = kis_supplement["microstructure_pressure"]
    else:
        microstructure_pressure = external_context.get("microstructure_pressure")

    ctx: dict[str, Any] = {
        "today": dt.date.today(),
        "macro_pressure": external_context.get("macro_pressure"),
        "fundamental_trajectory": fundamental_trajectory,
        "short_interest_pressure": short_interest.get("short_interest_pressure"),
        "microstructure_pressure": microstructure_pressure,
        "liquidity_rotation_risk": external_context.get("liquidity_rotation_risk"),
        "earnings_outlook": earnings_outlook,
        "next_earnings_date": _parse_date(external_context.get("next_earnings_date")),
        "next_macro_event_date": _parse_date(external_context.get("next_macro_event_date")),
        "macro_event_impact": external_context.get("macro_event_impact"),
        "rate_trend": external_context.get("rate_trend"),
    }
    return {
        "code": code,
        "ctx": ctx,
        "short_interest_composite": short_interest,
        "sector_export_trend": sector_export_trend,
        "relative_return_20d": relative_return_20d,
        "volume_ratio_5d": volume_ratio_5d,
        "kis_supplement": kis_supplement,
        "price_source": stock_bundle["source"],
        "price_source_url": stock_price.get("source_url"),
        "benchmark_source": index_bundle["source"],
        "benchmark_source_url": index_price.get("source_url"),
        "generated_at": _kst_now_iso(),
    }
def process_one(
    ticker: str,
    name: str,
    benchmark_code: str,
    sector: str | None,
    earnings_outlook: str,
    trade_csv: Path | None,
    short_csv: Path | None,
    workbook: Path | None,
    context_json: Path | None,
    kis_account: str | None = None,
) -> dict[str, Any]:
    """Run the full pipeline for one ticker and return a JSON-friendly result.

    The external context comes from *context_json* when that file exists,
    otherwise from *workbook* (via ``build_context_for_ticker``) when that
    exists, otherwise it is empty. The strategy decision is computed on the
    assembled ``ctx``; afterwards the date values inside ``ctx`` are replaced
    by their ISO strings.

    Returns:
        Everything ``build_ctx_for_ticker`` returned, with ``ctx`` made
        serialisable and the strategy output added under ``"decision"``.
    """
    if context_json and context_json.exists():
        external_context: dict[str, Any] = json.loads(context_json.read_text(encoding="utf-8"))
    elif workbook and workbook.exists():
        from tools.build_macro_context_from_workbook_v1 import build_context_for_ticker

        external_context = build_context_for_ticker(workbook, ticker, name)
    else:
        external_context = {}

    assembled = build_ctx_for_ticker(
        code=ticker,
        benchmark_code=benchmark_code,
        sector=sector,
        earnings_outlook=earnings_outlook,
        trade_csv=trade_csv,
        short_csv=short_csv,
        external_context=external_context,
        kis_account=kis_account,
    )
    # The strategy receives real date objects; only the returned copy is stringified.
    decision = compute_qualitative_sell_strategy(assembled["ctx"])

    printable_ctx: dict[str, Any] = {}
    for key, value in assembled["ctx"].items():
        printable_ctx[key] = value.isoformat() if isinstance(value, dt.date) else value

    return {**assembled, "ctx": printable_ctx, "decision": decision}
def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser; the module docstring is its description."""
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--ticker", default=None, help="6자리 종목코드(단일 실행 시 필수)")
    parser.add_argument("--name", default=None, help="실적발표 매칭용 종목명(한글)")
    parser.add_argument("--benchmark-code", default="069500")
    parser.add_argument("--sector", default=None, help="fetch_trade_statistics_motie_v1.SECTOR_HS_MAP 키")
    parser.add_argument(
        "--earnings-outlook",
        default="STABLE",
        choices=["IMPROVING", "STABLE", "DETERIORATING"],
    )
    parser.add_argument("--trade-csv", type=Path, default=None)
    parser.add_argument("--short-csv", type=Path, default=None, help="KRX 공매도종합포털 수동 다운로드 CSV")
    parser.add_argument(
        "--context-json",
        type=Path,
        default=None,
        help="macro_pressure/rate_trend/이벤트일 등 외부 산출값 JSON(수동)",
    )
    parser.add_argument(
        "--workbook",
        type=Path,
        default=None,
        help="GatherTradingData.xlsx — macro/event_risk/event_calendar 시트에서 컨텍스트 자동 추출(권장)",
    )
    parser.add_argument(
        "--batch",
        action="store_true",
        help="--workbook의 account_snapshot 실보유 종목 전체 순회(국내 6자리 코드만)",
    )
    parser.add_argument(
        "--kis-account",
        choices=["real", "mock"],
        default=None,
        help="KIS Open API로 호가10단계/공매도거래비중 보강 조회(read-only). "
        "공매도 일별추이는 real 도메인만 동작 확인됨(mock은 500 에러).",
    )
    parser.add_argument("--apply", action="store_true", help="outputs/qualitative_sell_strategy/<code>.json 저장")
    parser.add_argument(
        "--sqlite-db",
        type=Path,
        default=DEFAULT_SQLITE_DB,
        help="JSON 저장과 병행해 시계열 SQLite에도 기록(GAS/xlsx와 무관한 추가 저장소)",
    )
    parser.add_argument(
        "--store-backend",
        default="sqlite",
        help="Storage backend contract placeholder (sqlite today, postgresql planned)",
    )
    parser.add_argument(
        "--store-location",
        default=None,
        help="Backend location/DSN. sqlite path or future postgres DSN.",
    )
    parser.add_argument("--no-sqlite", action="store_true", help="SQLite 기록 비활성화")
    return parser


def _shared_process_kwargs(args: argparse.Namespace) -> dict[str, Any]:
    """Collect the ``process_one`` keyword arguments common to both run modes."""
    return {
        "benchmark_code": args.benchmark_code,
        "sector": args.sector,
        "earnings_outlook": args.earnings_outlook,
        "trade_csv": args.trade_csv,
        "short_csv": args.short_csv,
        "workbook": args.workbook,
        "kis_account": args.kis_account,
    }


def _write_result_json(result: dict[str, Any], stem: Any) -> Path:
    """Write one result as ``<stem>.json`` under DEFAULT_OUTPUT_DIR and return the path."""
    target = DEFAULT_OUTPUT_DIR / f"{stem}.json"
    target.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")
    return target


def _run_batch(args: argparse.Namespace, store_db: Any) -> int:
    """Process every six-digit ticker listed in the workbook; return the exit code."""
    if not args.workbook or not args.workbook.exists():
        raise SystemExit("--batch는 --workbook 경로가 필요합니다")
    from tools.build_macro_context_from_workbook_v1 import read_positions

    # Keep only tickers that are exactly six digits.
    positions = []
    for candidate in read_positions(args.workbook):
        ticker_text = str(candidate["ticker"])
        if ticker_text.isdigit() and len(ticker_text) == 6:
            positions.append(candidate)

    if args.apply:
        DEFAULT_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    shared = _shared_process_kwargs(args)
    results = []
    for pos in positions:
        try:
            result = process_one(
                ticker=pos["ticker"],
                name=str(pos.get("name") or ""),
                context_json=None,
                **shared,
            )
        except Exception as exc:  # noqa: BLE001 - one failing ticker must not stop the batch
            result = {"code": pos["ticker"], "status": "FETCH_ERROR", "note": str(exc)}
        results.append(result)
        if not args.apply:
            continue
        _write_result_json(result, pos["ticker"])
        if not args.no_sqlite and result.get("status") != "FETCH_ERROR":
            insert_sell_strategy_result(store_db, result)

    error_count = len([r for r in results if r.get("status") == "FETCH_ERROR"])
    action_counts: dict[str, int] = {}
    for r in results:
        action = (r.get("decision") or {}).get("action", "N/A")
        action_counts[action] = 1 + action_counts.get(action, 0)

    summary = {
        "generated_at": _kst_now_iso(),
        "ticker_count": len(results),
        "error_count": error_count,
        "action_counts": action_counts,
    }
    print(f"SUMMARY: {json.dumps(summary, ensure_ascii=False)}")
    if args.apply:
        summary_path = DEFAULT_OUTPUT_DIR / "_batch_summary.json"
        summary_path.write_text(json.dumps(summary, ensure_ascii=False, indent=2), encoding="utf-8")
        print(f"written {len(results)} files to {DEFAULT_OUTPUT_DIR}")
    else:
        print(json.dumps(results, ensure_ascii=False, indent=2))

    # Exit non-zero when half or more of the tickers failed, so CI turns red
    # and the outcome can be read from the log alone.
    if results and error_count / len(results) >= 0.5:
        print(f"BATCH_GATE: FAIL — error_count={error_count}/{len(results)}")
        return 1
    print("BATCH_GATE: PASS")
    return 0


def _run_single(args: argparse.Namespace, store_db: Any) -> int:
    """Process the one ticker given by ``--ticker``; return the exit code."""
    if not args.ticker:
        raise SystemExit("--ticker 또는 --batch 중 하나는 필수입니다")
    result = process_one(
        ticker=args.ticker,
        name=args.name or "",
        context_json=args.context_json,
        **_shared_process_kwargs(args),
    )
    if not args.apply:
        print(json.dumps(result, ensure_ascii=False, indent=2))
        return 0

    DEFAULT_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    out_path = _write_result_json(result, args.ticker)
    if not args.no_sqlite:
        insert_sell_strategy_result(store_db, result)
    print(f"written: {out_path}")
    return 0


def main() -> int:
    """CLI entry point: parse arguments, resolve the store, run batch or single mode.

    Results are written to disk (JSON, plus SQLite unless ``--no-sqlite``)
    only with ``--apply``; otherwise they are printed.

    Returns:
        ``0`` on success; ``1`` when batch mode finishes with half or more of
        the tickers in ``FETCH_ERROR``.
    """
    args = _build_arg_parser().parse_args()
    # Resolved before mode dispatch, exactly as in the single-function layout.
    store_db = resolve_store_path(
        QualitativeSellStoreSpec(
            backend=args.store_backend,
            location=args.store_location or args.sqlite_db,
        ),
        ROOT,
    )
    if args.batch:
        return _run_batch(args, store_db)
    return _run_single(args, store_db)
# Script entry point: propagate main()'s return value as the process exit code.
if __name__ == "__main__":
    sys.exit(main())