diff --git a/.gitea/workflows/qualitative_sell_strategy.yml b/.gitea/workflows/qualitative_sell_strategy.yml new file mode 100644 index 0000000..2ceec01 --- /dev/null +++ b/.gitea/workflows/qualitative_sell_strategy.yml @@ -0,0 +1,84 @@ +name: Qualitative Sell Strategy (Read-Only, SQLite Canonical) + +on: + schedule: + - cron: "0 10 * * 1-5" # KST 19:00-ish daily post-close batch window (UTC 10:00) + workflow_dispatch: + +jobs: + evaluate-qualitative-sell: + runs-on: self-hosted + + steps: + - name: Checkout Code + run: | + if [ -d .git ]; then + git remote set-url origin http://x-access-token:${{ secrets.GITHUB_TOKEN }}@192.168.123.100:8418/KimJaeHyun/myfinance.git + else + git init + git remote add origin http://x-access-token:${{ secrets.GITHUB_TOKEN }}@192.168.123.100:8418/KimJaeHyun/myfinance.git + fi + git fetch origin main --depth=1 + git reset --hard FETCH_HEAD + + - name: Configure Runtime Paths + run: | + export PATH=/usr/local/bin:$PATH + echo "/usr/local/bin" >> $GITHUB_PATH + /usr/bin/python3 --version + + - name: Setup Python Environment + run: | + VENV_BASE=/volume1/gitea/python_venv + REQ_HASH=$(md5sum tools/build_qualitative_sell_inputs_v1.py 2>/dev/null | cut -d' ' -f1 || echo "qual-default") + VENV="$VENV_BASE/$REQ_HASH" + if [ ! -f "$VENV/bin/python" ]; then + mkdir -p "$VENV_BASE" + /usr/bin/python3 -m venv "$VENV" + "$VENV/bin/pip" install --upgrade pip --quiet + "$VENV/bin/pip" install requests beautifulsoup4 pyyaml openpyxl --quiet + fi + echo "$VENV/bin" >> $GITHUB_PATH + + - name: "[CRITICAL] No Direct API Trading Gate" + run: python3 tools/validate_no_direct_api_trading_v1.py + + - name: "[CRITICAL] Validate KIS API Credentials (mock)" + env: + KIS_APP_Key_TEST: ${{ secrets.KIS_APP_KEY_TEST }} + KIS_APP_Secret_TEST: ${{ secrets.KIS_APP_SECRET_TEST }} + run: python3 tools/validate_kis_api_credentials_v1.py --account mock --ticker 005930 + + - name: Build Qualitative Sell Inputs (batch) + env: + KIS_APP_Key: ${{ secrets.KIS_APP_KEY }} + KIS_APP_Secret: ${{ secrets.KIS_APP_SECRET }} + run: | + if [ -f GatherTradingData.xlsx ]; then + python3 tools/build_qualitative_sell_inputs_v1.py \ + --batch \ + --workbook GatherTradingData.xlsx \ + --kis-account real \ + --apply + else + echo "GatherTradingData.xlsx missing -> skip batch build" + fi + + - name: Build Satellite Recommendations + run: | + if [ -f GatherTradingData.xlsx ]; then + python3 tools/build_satellite_candidate_recommendations_v1.py \ + --workbook GatherTradingData.xlsx \ + --apply + else + echo "GatherTradingData.xlsx missing -> skip satellite build" + fi + + - name: Evaluate Qualitative Sell Accuracy + run: | + if [ -f outputs/qualitative_sell_strategy/qualitative_sell_strategy.db ]; then + python3 tools/evaluate_qualitative_sell_strategy_accuracy_v1.py \ + --sqlite-db outputs/qualitative_sell_strategy/qualitative_sell_strategy.db + else + echo "qualitative_sell_strategy.db missing -> skip accuracy evaluation" + fi diff --git a/spec/exit/qualitative_sell_strategy_v1.yaml b/spec/exit/qualitative_sell_strategy_v1.yaml new file mode 100644 index 0000000..0740bde --- /dev/null +++ b/spec/exit/qualitative_sell_strategy_v1.yaml @@ -0,0 +1,153 @@ +meta: + title: "은퇴자산포트폴리오 — 비기계적 매도전략(가치보존) 명세" + parent_file: "RetirementAssetPortfolio.yaml" + version: "2026-06-21-PHASE8_qualitative_sell" + language: "ko-KR" + timezone: "Asia/Seoul" + role: "canonical" + has_code_implementation: true + code_path: "src/quant_engine/qualitative_sell_strategy_v1.py" + purpose: > + 익절/손절을 고정 % 임계값으로 기계적으로 트리거하지 않고, 매크로·실적·펀더멘털· + 공매도수급·호가 미시구조·대내외 변수(대형 IPO·섹터 로테이션) 5개 독립 팩터군의 + 합의(confluence)로 매도/보유/추가 확신도를 산출해 주식가치를 최대치로 보존한다. + 현금부족 사유는 입력에서 의도적으로 배제한다. + +qualitative_sell_strategy: + policy: + execution: "보유 포지션 검토 시 항상 실행. STOP_PRICE_CORE_V1/PROFIT_RATCHET_TIERED_V2 등 + 기존 기계적 손절/래칫 라인과 병행 — 이 명세가 그것들을 대체하지 않으며, '서두르지 않는 + 재량적 정리' 판단을 보강한다." + confluence_rule: "5개 팩터군 중 최소 3개가 동일 방향(+/-)으로 합의해야 행동 생성. 단일 + 팩터의 임계값 돌파만으로 매도 트리거 금지." + cash_shortfall_exclusion: "현금부족·리밸런싱 강제매도 사유는 이 명세의 입력에서 제외한다. + 해당 사유의 매도는 spec/exit/value_preserving_cash_raise_optimizer_v7.yaml 책임." + date_basis: "review_window는 실제 실적발표일·고영향 매크로 이벤트일(spec/strategy/ + macro_event_synchronizer_v2.yaml:event_hold_gate)에서 역산한다. 임의 고정일 금지." + + factor_families: + macro_pressure: + id: "F1" + formula_ref: "spec/strategy/macro_event_synchronizer_v2.yaml:position_size_scale_formula" + sources: ["macro_risk_score", "FX", "금리", "산업통상부 수출입동향(섹터별)"] + note: "수출입 동향으로 섹터별 실적 선행지표를 추정해 가중." + + fundamental_trajectory: + id: "F2" + formula_ref: "spec/strategy/fundamental_quality_v3.yaml" + sources: ["EPS 추정치 변화", "영업이익률 추세", "실적발표 컨센서스 서프라이즈"] + + short_interest_pressure: + id: "F3" + formula_ref: "spec/13b_harness_formulas.yaml:formula_registry.formulas.SHORT_INTEREST_RISK_GAUGE_V1" + sources: ["공매도잔고율 추세", "공매도거래비중", "상대수익률", "거래량 이상", "실적전망"] + note: > + 잔고율은 '매수/매도 버튼'이 아니라 위험계기판. 잔고율이 낮은 종목(예: 현대로템형, + <1%)은 잔고율 자체보다 거래비중·상대수익률을 더 중요하게 본다. + + microstructure_pressure: + id: "F4" + sources: ["호가 10단계 매수/매도 잔량 불균형", "체결강도", "스프레드"] + note: "전략적 방향 결정에는 쓰지 않고 confluence가 SELL/ADD로 합의된 이후의 + '집행 타이밍'에만 사용 — execution_window 산정 보조." + + liquidity_rotation_risk: + id: "F5" + sources: ["대형 IPO 청약/상장에 따른 섹터 자금 이탈", "동일 섹터 로테이션", + "외국인/기관 섹터 비중 변화"] + + output: + formula_ref: "spec/13b_harness_formulas.yaml:formula_registry.formulas.QUALITATIVE_SELL_STRATEGY_V1" + python_tool: "src/quant_engine/qualitative_sell_strategy_v1.py:compute_qualitative_sell_strategy" + actions: + EXIT_REVIEW_FULL: "4-5개 팩터군 매도방향 합의 + composite_score>=0.6 — 전량 정리 검토" + TRIM_REVIEW_PARTIAL: "3개 이상 팩터군 매도방향 합의, composite_score<0.6 — 부분 정리 검토" + HOLD_ADD_CONVICTION: "3개 이상 팩터군 지지방향 합의 — 보유/추가 확신" + HOLD_NO_CONFLUENCE: "합의 미달 — 보유, 관찰 지속" + INSUFFICIENT_DATA_NO_ACTION: "confluence 판정에 필요한 최소 데이터 부족 — 추정 금지" + + market_regime: + formula_ref: "spec/13b_harness_formulas.yaml:formula_registry.formulas.MARKET_REGIME_CLASSIFIER_V1" + rule: "금리 상승기(RISING)=PERFORMANCE_MARKET(실적장세) — fundamental_trajectory 가중 상향. + 금리 보합/하락기(FLAT/FALLING)=TECHNICAL_MARKET(기술장세) — short_interest_pressure/ + microstructure_pressure 가중 상향. confluence 합의건수 판정 자체는 가중치와 무관 — + composite_score(행동 강도)에만 영향." + + satellite_candidate_score: + formula_ref: "spec/13b_harness_formulas.yaml:formula_registry.formulas.SATELLITE_CANDIDATE_SCORE_V1" + purpose: "미보유 위성 유니버스 종목의 BUY_CANDIDATE/WATCH/AVOID 사전 평가. sector_export_trend + (관세청/산업통상부 수출입동향)·fundamental_trajectory·relative_return_20d를 market_regime별 + 가중치로 종합." + + data_sources: + note: "2026-06-21 세션 실측 결과. investing.com 직접 스크래핑은 403(Cloudflare) 차단 확인 — + 자동 수집 경로로 채택하지 않는다." + relative_return_20d: + tool: "tools/fetch_naver_market_data_v1.py:compute_relative_return_20d" + source: "finance.naver.com/item/sise_day.naver (무인증, 동작 확인)" + status: "WORKING" + volume_ratio_5d: + tool: "tools/fetch_naver_market_data_v1.py:compute_volume_ratio_5d" + source: "finance.naver.com/item/sise_day.naver" + status: "WORKING" + foreign_institution_flow: + tool: "tools/fetch_naver_market_data_v1.py:fetch_foreign_institution_flow" + source: "finance.naver.com/item/frgn.naver (GAS gdc_01_fetch_fundamentals.gs와 동일 소스 — + 보유종목은 기존 GAS 수집 결과 재사용 권장, 위성 후보군만 직접 호출)" + status: "WORKING" + sector_export_trend: + tool: "tools/fetch_trade_statistics_motie_v1.py:compute_sector_export_trend" + source: "관세청/산업통상부 수출입통계 — 1차: --csv 수동 다운로드 경로(안정적, 권장). + 2차: data.go.kr OpenAPI(CUSTOMS_API_KEY 필요, 미설정 시 DATA_MISSING)." + status: "CSV_PATH_WORKING / API_PATH_NEEDS_KEY" + short_balance_ratio: + source: "KRX 공매도종합포털(open.krx.co.kr/contents/SRT) — 직접 API 호출은 OTP 세션 필요, + LOGOUT 응답으로 차단 확인. KIS Open API도 잔고율(보유 포지션 개념)은 제공하지 않음 + (실측 확인, 2026-06-21). 수동 다운로드 CSV(--short-csv)로만 안정 확보 — 자동화 + 재시도 불필요(차단 확정)." + status: "MANUAL_CSV_ONLY" + short_turnover_share: + source: "[2026-06-21 해결] KIS Open API daily-short-sale(FHPST04830000, + /uapi/domestic-stock/v1/quotations/daily-short-sale) output2.ssts_vol_rlim — + 실전계좌 도메인(--kis-account real)에서 실측 동작 확인. 모의계좌 도메인은 + 500 에러(미지원). Naver는 KRX iframe 위임으로 값 없음(폐기)." + tool: "tools/build_qualitative_sell_inputs_v1.py:fetch_kis_supplement" + status: "KIS_API_WORKING (real account only)" + microstructure_pressure_10_level_orderbook: + source: "[2026-06-21 해결] KIS Open API inquire-asking-price-exp-ccn(FHKST01010200, + /uapi/domestic-stock/v1/quotations/inquire-asking-price-exp-ccn) output1 — + 실전+모의계좌 도메인 모두 실측 동작 확인. 필드명: askp1~10/bidp1~10/ + askp_rsqn1~10/bidp_rsqn1~10/total_askp_rsqn/total_bidp_rsqn(전부 소문자, + 실측 확인). 전략 방향 결정에는 쓰지 않고 confluence 성립 후 집행 타이밍 + 보조로만 사용(factor_families.microstructure_pressure 참조)." + tool: "src/quant_engine/qualitative_sell_strategy_v1.py:compute_microstructure_pressure_from_orderbook" + status: "KIS_API_WORKING" + investor_trend_official: + source: "[참고, 미연동] KIS Open API inquire-investor(FHKST01010900) — + 개인/외국인/기관 순매수수량(prsn_ntby_qty/frgn_ntby_qty/orgn_ntby_qty) 등 실측 + 확인. Naver frgn.naver 스크래핑을 대체할 수 있는 공식 소스이나 아직 미연동 + (기존 GAS 수급 피드와 중복 — 필요 시 후속 작업)." + status: "VERIFIED_NOT_WIRED" + kis_open_api_constraints: + note: "[CRITICAL] governance/rules/06_no_direct_api_trading.yaml(주문 미실행), + governance/rules/07_no_kis_account_balance_query.yaml(계좌 보유종목 조회 금지) — + KIS API는 시장 전체 공개 데이터(시세/호가/공매도/투자자동향) 조회에만 사용. + CI 강제 게이트: tools/validate_no_direct_api_trading_v1.py(strict, warn_only 불가)." + macro_pressure / rate_trend / next_earnings_date / next_macro_event_date / macro_event_impact: + source: "기존 GAS 하네스(macro_event_synchronizer_v2, gas_event_calendar.gs)가 이미 + 산출/수집 — 중복 수집 금지, --context-json으로 그 결과를 주입." + status: "REUSE_EXISTING_HARNESS" + + orchestrator: + tool: "tools/build_qualitative_sell_inputs_v1.py" + purpose: "위 출처들을 종목별 ctx로 조립해 QUALITATIVE_SELL_STRATEGY_V1을 호출하고 + outputs/qualitative_sell_strategy/.json에 기록한다. --batch --workbook으로 + account_snapshot 실보유 종목 전체 일괄 처리." + + satellite_orchestrator: + tool: "tools/build_satellite_candidate_recommendations_v1.py" + purpose: "universe 시트(미보유 위성 유니버스)에서 보유종목을 제외한 후보 전체를 + SATELLITE_CANDIDATE_SCORE_V1로 평가해 outputs/qualitative_sell_strategy/ + satellite_recommendations.json에 기록한다. universe.Sector 한글 라벨은 부분 + 문자열 매칭으로 SECTOR_HS_MAP에 연결 — 매칭 실패 시 sector_export_trend를 + 추정하지 않고 None 유지(추정 금지 원칙)." diff --git a/src/quant_engine/qualitative_sell_strategy_store_v1.py b/src/quant_engine/qualitative_sell_strategy_store_v1.py new file mode 100644 index 0000000..5e1e14a --- /dev/null +++ b/src/quant_engine/qualitative_sell_strategy_store_v1.py @@ -0,0 +1,146 @@ +"""qualitative_sell_strategy_v1 산출물의 SQLite 시계열 저장소. + +GAS/xlsx 구조와 완전히 분리된 추가(additive) 저장소다 — 이 모듈이 다루는 데이터는 +순수 Python 산출물(KIS API 수집 + confluence 판단 결과)이며, GAS가 쓰지도 읽지도 +않고 사람이 시트에서 직접 편집하지도 않는다. 기존 outputs/qualitative_sell_strategy/ +*.json 파일 출력을 대체하지 않고 병행 저장한다(JSON은 1회성 점검용, SQLite는 시계열 +추이 조회용). 표준 라이브러리 sqlite3만 사용 — 추가 의존성 없음. +""" +from __future__ import annotations + +import json +import sqlite3 +from pathlib import Path +from dataclasses import dataclass +from typing import Any + +from src.quant_engine.storage_backend_v1 import StoreSpec, default_sqlite_store_path, normalize_store_spec + +SCHEMA = """ +CREATE TABLE IF NOT EXISTS sell_strategy_results ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + code TEXT NOT NULL, + generated_at TEXT NOT NULL, + action TEXT, + conviction TEXT, + market_regime TEXT, + composite_score REAL, + rationale TEXT, + raw_json TEXT NOT NULL, + inserted_at TEXT DEFAULT (datetime('now')) +); +CREATE INDEX IF NOT EXISTS idx_sell_strategy_code_time + ON sell_strategy_results(code, generated_at); + +CREATE TABLE IF NOT EXISTS satellite_recommendations ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + ticker TEXT NOT NULL, + generated_at TEXT NOT NULL, + satellite_action TEXT, + attractiveness_score REAL, + market_regime TEXT, + raw_json TEXT NOT NULL, + inserted_at TEXT DEFAULT (datetime('now')) +); +CREATE INDEX IF NOT EXISTS idx_satellite_ticker_time + ON satellite_recommendations(ticker, generated_at); +""" + + +@dataclass(frozen=True) +class QualitativeSellStoreSpec(StoreSpec): + pass + + +def default_qualitative_sell_store_path(root: Path) -> Path: + return default_sqlite_store_path(root, "qualitative_sell_strategy/qualitative_sell_strategy.db") + + +def resolve_store_path(spec: QualitativeSellStoreSpec, root: Path) -> Path: + backend, location = normalize_store_spec( + spec, + root, + default_sqlite_name="qualitative_sell_strategy/qualitative_sell_strategy.db", + ) + if backend != "sqlite": + raise ValueError( + "qualitative_sell_strategy_store_v1 currently executes on sqlite only; " + "the caller contract already allows future PostgreSQL swap-in." + ) + return Path(location) + + +def init_db(db_path: Path) -> None: + db_path.parent.mkdir(parents=True, exist_ok=True) + conn = sqlite3.connect(db_path) + try: + conn.executescript(SCHEMA) + conn.commit() + finally: + conn.close() + + +def insert_sell_strategy_result(db_path: Path, result: dict[str, Any]) -> None: + """build_qualitative_sell_inputs_v1.process_one()의 반환값(dict)을 그대로 받는다.""" + init_db(db_path) + decision = result.get("decision") or {} + conn = sqlite3.connect(db_path) + try: + conn.execute( + "INSERT INTO sell_strategy_results " + "(code, generated_at, action, conviction, market_regime, composite_score, rationale, raw_json) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + ( + result.get("code"), + result.get("generated_at"), + decision.get("action"), + decision.get("conviction"), + decision.get("market_regime"), + decision.get("composite_score"), + decision.get("rationale"), + json.dumps(result, ensure_ascii=False, default=str), + ), + ) + conn.commit() + finally: + conn.close() + + +def insert_satellite_recommendation(db_path: Path, generated_at: str, candidate: dict[str, Any]) -> None: + """build_satellite_candidate_recommendations_v1.py results[i] 항목 하나를 받는다.""" + init_db(db_path) + score = candidate.get("score") or {} + conn = sqlite3.connect(db_path) + try: + conn.execute( + "INSERT INTO satellite_recommendations " + "(ticker, generated_at, satellite_action, attractiveness_score, market_regime, raw_json) " + "VALUES (?, ?, ?, ?, ?, ?)", + ( + candidate.get("ticker"), + generated_at, + score.get("satellite_action"), + score.get("attractiveness_score"), + score.get("market_regime"), + json.dumps(candidate, ensure_ascii=False, default=str), + ), + ) + conn.commit() + finally: + conn.close() + + +def fetch_recent_sell_strategy_results(db_path: Path, code: str, limit: int = 20) -> list[dict[str, Any]]: + if not db_path.exists(): + return [] + conn = sqlite3.connect(db_path) + conn.row_factory = sqlite3.Row + try: + rows = conn.execute( + "SELECT code, generated_at, action, conviction, market_regime, composite_score, rationale " + "FROM sell_strategy_results WHERE code = ? ORDER BY generated_at DESC LIMIT ?", + (code, limit), + ).fetchall() + return [dict(row) for row in rows] + finally: + conn.close() diff --git a/src/quant_engine/qualitative_sell_strategy_v1.py b/src/quant_engine/qualitative_sell_strategy_v1.py new file mode 100644 index 0000000..67858de --- /dev/null +++ b/src/quant_engine/qualitative_sell_strategy_v1.py @@ -0,0 +1,377 @@ +from __future__ import annotations + +import math +from datetime import date, timedelta +from typing import Any + +# 매도 결정에 동원하는 5개 독립 팩터군. 단일 팩터의 임계값 돌파만으로는 행동을 +# 트리거하지 않는다 — 최소 CONFLUENCE_MIN개 팩터군이 동일 방향으로 합의해야 +# SELL/ADD 확신도가 성립한다. (기계적 단일 트리거 매도 금지 원칙) +FACTOR_FAMILIES: tuple[str, ...] = ( + "macro_pressure", + "fundamental_trajectory", + "short_interest_pressure", + "microstructure_pressure", + "liquidity_rotation_risk", +) +CONFLUENCE_MIN = 3 +EVENT_PRE_GUARD_DAYS = 5 # macro_event_synchronizer_v2.event_hold_gate와 동일 — HIGH 이벤트 5일 전 +EVENT_POST_GUARD_DAYS = 2 # 이벤트 후 2일 변동성 소화 구간 + +# 금리국면별 시장 성격: 금리 상승기=실적장세(펀더멘털/수출입 실적이 가격을 주도), +# 금리 보합·하락기=기술장세(수급·미시구조가 가격을 주도). 동일한 5팩터라도 +# 국면에 따라 가중치를 달리 줘야 confluence가 의미를 갖는다. +REGIME_FLAT_WEIGHTS: dict[str, float] = {family: 1.0 for family in FACTOR_FAMILIES} +REGIME_WEIGHT_TABLE: dict[str, dict[str, float]] = { + "PERFORMANCE_MARKET": { # 금리 상승기 — 실적/수출입 펀더멘털 가중 상향 + "macro_pressure": 1.2, + "fundamental_trajectory": 1.8, + "short_interest_pressure": 1.0, + "microstructure_pressure": 0.5, + "liquidity_rotation_risk": 1.0, + }, + "TECHNICAL_MARKET": { # 금리 보합·하락기 — 수급/미시구조 가중 상향 + "macro_pressure": 0.8, + "fundamental_trajectory": 0.8, + "short_interest_pressure": 1.3, + "microstructure_pressure": 1.6, + "liquidity_rotation_risk": 1.3, + }, + "NEUTRAL": REGIME_FLAT_WEIGHTS, +} + + +def classify_market_regime(rate_trend: str | None) -> str: + """금리 추세 문자열(RISING/FLAT/FALLING)을 실적장세/기술장세로 분류. + + RISING → PERFORMANCE_MARKET(실적장세): 금리 상승기엔 유동성보다 실적/펀더멘털이 + 가격을 결정. FLAT/FALLING → TECHNICAL_MARKET(기술장세): 유동성이 풍부해 수급· + 미시구조·테마성 모멘텀이 가격을 주도. 입력 결측 시 NEUTRAL(가중치 변화 없음). + """ + trend = str(rate_trend or "").upper() + if trend == "RISING": + return "PERFORMANCE_MARKET" + if trend in {"FLAT", "FALLING"}: + return "TECHNICAL_MARKET" + return "NEUTRAL" + + +def _finite(value: Any) -> bool: + return isinstance(value, (int, float)) and math.isfinite(float(value)) + + +def compute_short_interest_composite(ctx: dict[str, Any]) -> dict[str, Any]: + """SHORT_INTEREST_RISK_GAUGE_V1. + + 5요소: 공매도잔고율 변화, 공매도거래비중, 상대수익률(섹터/지수 대비), + 거래량 이상, 실적전망. 잔고율 단독으로는 매도 근거가 약함(현대로템형) — + 잔고율이 낮을 때는 거래비중·상대수익률 가중치를 자동 상향한다. + """ + missing: list[str] = [] + + short_balance_ratio = ctx.get("short_balance_ratio") # %, 현재 잔고율 + short_balance_ratio_chg_20d = ctx.get("short_balance_ratio_chg_20d") # %p, 20일 변화 + short_turnover_share = ctx.get("short_turnover_share") # 당일 거래 중 공매도 비중 % + relative_return_20d = ctx.get("relative_return_20d") # 종목수익률 - 섹터(or지수)수익률, %p + volume_ratio_5d = ctx.get("volume_ratio_5d") # 5일평균거래량 대비 비율 + earnings_outlook = str(ctx.get("earnings_outlook") or "").upper() # IMPROVING|STABLE|DETERIORATING|UNKNOWN + + for name, value in ( + ("short_balance_ratio", short_balance_ratio), + ("short_turnover_share", short_turnover_share), + ("relative_return_20d", relative_return_20d), + ): + if not _finite(value): + missing.append(name) + + if missing: + return { + "short_interest_pressure": None, + "status": "DATA_MISSING", + "missing_inputs": missing, + "note": "잔고율/거래비중/상대수익률 중 결측 — 공매도 합성 점수를 산출하지 않음(추정 금지)", + } + + low_balance_regime = float(short_balance_ratio) < 1.0 # 잔고율 1% 미만이면 '낮은 잔고율' 취급(현대로템형) + + # 잔고율 추세: 상승=매도근거 강화, 하락=매도근거 약화(혹은 매수근거) + balance_trend_signal = 0.0 + if _finite(short_balance_ratio_chg_20d): + balance_trend_signal = max(-1.0, min(1.0, float(short_balance_ratio_chg_20d) / 1.5)) + + turnover_signal = max(-1.0, min(1.0, (float(short_turnover_share) - 8.0) / 12.0)) # 8% 기준선 + relative_return_signal = max(-1.0, min(1.0, -float(relative_return_20d) / 10.0)) # 상대 약세일수록 + + volume_signal = 0.0 + if _finite(volume_ratio_5d): + volume_signal = max(-1.0, min(1.0, (float(volume_ratio_5d) - 1.0))) + + outlook_signal = { + "IMPROVING": -0.6, + "STABLE": 0.0, + "DETERIORATING": 0.7, + }.get(earnings_outlook, 0.0) + + if low_balance_regime: + # 잔고율 자체는 약한 근거 — 거래비중·상대수익률 가중치 상향, 잔고율추세 가중치 하향 + weights = {"balance": 0.10, "turnover": 0.30, "relative": 0.30, "volume": 0.10, "outlook": 0.20} + else: + weights = {"balance": 0.30, "turnover": 0.20, "relative": 0.20, "volume": 0.10, "outlook": 0.20} + + pressure = ( + balance_trend_signal * weights["balance"] + + turnover_signal * weights["turnover"] + + relative_return_signal * weights["relative"] + + volume_signal * weights["volume"] + + outlook_signal * weights["outlook"] + ) + pressure = max(-1.0, min(1.0, pressure)) + + label = "ELEVATED_SHORT_PRESSURE" if pressure >= 0.5 else "WATCH" if pressure >= 0.2 else \ + "SHORT_COVERING_SUPPORTIVE" if pressure <= -0.5 else "NEUTRAL" + + return { + "short_interest_pressure": round(pressure, 4), + "status": "OK", + "low_balance_regime": low_balance_regime, + "label": label, + "components": { + "balance_trend_signal": round(balance_trend_signal, 4), + "turnover_signal": round(turnover_signal, 4), + "relative_return_signal": round(relative_return_signal, 4), + "volume_signal": round(volume_signal, 4), + "outlook_signal": outlook_signal, + }, + "weights_used": weights, + } + + +def compute_microstructure_pressure_from_orderbook(orderbook_output1: dict[str, Any]) -> dict[str, Any]: + """MICROSTRUCTURE_PRESSURE_FROM_ORDERBOOK_V1. + + KIS Open API FHKST01010200(주식현재가 호가/예상체결) output1의 10단계 호가 잔량을 + -1(매수우위/지지)~+1(매도우위/압력)로 계량화. 실측 확인된 필드명(2026-06-21, + 005930 라이브 호출): total_askp_rsqn, total_bidp_rsqn(10단계 합계 잔량). + 이 점수는 전략 방향 결정에는 쓰지 않고 confluence가 성립한 이후의 '집행 타이밍' + 보조로만 사용한다(spec/exit/qualitative_sell_strategy_v1.yaml:factor_families. + microstructure_pressure 참조). + """ + total_askp = orderbook_output1.get("total_askp_rsqn") + total_bidp = orderbook_output1.get("total_bidp_rsqn") + try: + total_askp = float(total_askp) + total_bidp = float(total_bidp) + except (TypeError, ValueError): + return {"microstructure_pressure": None, "status": "DATA_MISSING"} + + denom = total_askp + total_bidp + if denom <= 0: + return {"microstructure_pressure": None, "status": "DATA_MISSING"} + + pressure = max(-1.0, min(1.0, (total_askp - total_bidp) / denom)) + return { + "microstructure_pressure": round(pressure, 4), + "status": "OK", + "total_askp_rsqn": total_askp, + "total_bidp_rsqn": total_bidp, + } + + +def _event_review_window( + today: date, + pressure_sign: int, + next_earnings_date: date | None, + next_macro_event_date: date | None, + macro_event_impact: str | None, + earnings_outlook: str, +) -> dict[str, Any]: + """캘린더 기반 검토 구간 산출 — 임의 날짜 고정이 아니라 실제 이벤트 일정에서 역산.""" + candidates: list[tuple[date, str]] = [] + + if next_earnings_date is not None: + if pressure_sign < 0 and earnings_outlook == "DETERIORATING": + # 실적 악화 전망 + 매도압력 → 실적발표 전 정리(서프라이즈 리스크 회피) + candidates.append((next_earnings_date - timedelta(days=EVENT_PRE_GUARD_DAYS), "PRE_EARNINGS_EXIT_BEFORE_SURPRISE_RISK")) + elif pressure_sign < 0 and earnings_outlook in {"IMPROVING", "STABLE"}: + # 단기 기술적 매도압력이지만 실적전망은 양호 → 발표 직전 매도는 가치훼손, 발표 이후로 연기 + candidates.append((next_earnings_date + timedelta(days=EVENT_POST_GUARD_DAYS), "DEFER_TO_POST_EARNINGS_AVOID_PREMATURE_EXIT")) + elif pressure_sign > 0: + # 추가매수/보유 신호 — 발표 변동성 통과 후 확신 재평가 + candidates.append((next_earnings_date + timedelta(days=EVENT_POST_GUARD_DAYS), "REASSESS_AFTER_EARNINGS_CONFIRM")) + + if next_macro_event_date is not None and str(macro_event_impact or "").upper() in {"HIGH", "VERY_HIGH"}: + if pressure_sign < 0: + candidates.append((next_macro_event_date - timedelta(days=EVENT_PRE_GUARD_DAYS), "PRE_MACRO_EVENT_DERISK")) + else: + candidates.append((next_macro_event_date + timedelta(days=EVENT_POST_GUARD_DAYS), "POST_MACRO_EVENT_CONFIRM")) + + if not candidates: + return { + "review_window_start": today.isoformat(), + "review_window_end": (today + timedelta(days=10)).isoformat(), + "window_basis": "NO_SCHEDULED_EVENT_DEFAULT_10D_REVIEW", + } + + earliest = min(candidates, key=lambda item: item[0]) + window_start = max(today, earliest[0] - timedelta(days=2)) + window_end = earliest[0] + timedelta(days=2) + return { + "review_window_start": window_start.isoformat(), + "review_window_end": window_end.isoformat(), + "window_basis": earliest[1], + } + + +def compute_qualitative_sell_strategy(ctx: dict[str, Any]) -> dict[str, Any]: + """QUALITATIVE_SELL_STRATEGY_V1. + + 매크로/실적/펀더멘털/공매도수급/호가미시구조/대내외(IPO·로테이션) 5개 + 독립 팩터군의 합의(confluence)로만 행동을 생성한다. 현금부족 사유는 + 입력에서 의도적으로 배제(cash_shortfall_excluded=True) — 가치보존이 + 유일한 목적 함수. + """ + today_raw = ctx.get("today") + today = today_raw if isinstance(today_raw, date) else date.today() + + factor_values: dict[str, float | None] = {} + missing_factors: list[str] = [] + for family in FACTOR_FAMILIES: + value = ctx.get(family) + if _finite(value): + factor_values[family] = max(-1.0, min(1.0, float(value))) + else: + factor_values[family] = None + missing_factors.append(family) + + available = {k: v for k, v in factor_values.items() if v is not None} + if len(available) < CONFLUENCE_MIN: + return { + "action": "INSUFFICIENT_DATA_NO_ACTION", + "conviction": "NONE", + "available_factors": list(available.keys()), + "missing_factors": missing_factors, + "rationale": "5개 팩터군 중 confluence 판정에 필요한 최소 데이터가 부족 — 추정으로 행동 생성 금지", + "cash_shortfall_excluded": True, + "mechanical_sell_prohibited": True, + } + + # 부호 규약: 모든 팩터군은 +1(매도압력 최대) ~ -1(보유/추가 지지 최대) 동일 스케일. + # short_interest_pressure도 동일 — ELEVATED_SHORT_PRESSURE(+) / SHORT_COVERING_SUPPORTIVE(-). + # confluence 합의 카운트는 국면 가중치와 무관하게 원시 방향성으로만 판정한다 + # (가중치는 행동 '강도'에만 영향 — 합의 성립 여부 자체를 왜곡하지 않는다). + sell_agree = [k for k, v in available.items() if v >= 0.30] + hold_add_agree = [k for k, v in available.items() if v <= -0.30] + + market_regime = classify_market_regime(ctx.get("rate_trend")) if "market_regime" not in ctx else str(ctx.get("market_regime") or "NEUTRAL").upper() + regime_weights = REGIME_WEIGHT_TABLE.get(market_regime, REGIME_FLAT_WEIGHTS) + weighted_sum = sum(available[k] * regime_weights.get(k, 1.0) for k in available) + weight_total = sum(regime_weights.get(k, 1.0) for k in available) + composite_score = weighted_sum / weight_total if weight_total else 0.0 + + earnings_outlook = str(ctx.get("earnings_outlook") or "STABLE").upper() + next_earnings_date = ctx.get("next_earnings_date") if isinstance(ctx.get("next_earnings_date"), date) else None + next_macro_event_date = ctx.get("next_macro_event_date") if isinstance(ctx.get("next_macro_event_date"), date) else None + macro_event_impact = ctx.get("macro_event_impact") + + if len(sell_agree) >= CONFLUENCE_MIN: + conviction = "HIGH" if len(sell_agree) >= 4 else "MEDIUM" + action = "EXIT_REVIEW_FULL" if composite_score >= 0.6 else "TRIM_REVIEW_PARTIAL" + pressure_sign = -1 + rationale = f"매도압력 합의({len(sell_agree)}/{len(available)} 팩터군 매도방향 합치): " + ", ".join(sell_agree) + elif len(hold_add_agree) >= CONFLUENCE_MIN: + conviction = "HIGH" if len(hold_add_agree) >= 4 else "MEDIUM" + action = "HOLD_ADD_CONVICTION" + pressure_sign = 1 + rationale = f"보유/추가 근거 합의({len(hold_add_agree)}/{len(available)} 팩터군 지지방향 합치): " + ", ".join(hold_add_agree) + else: + conviction = "LOW" + action = "HOLD_NO_CONFLUENCE" + pressure_sign = 0 + rationale = "팩터군 간 합의 미달 — 단일/소수 팩터의 임계값 돌파만으로는 매도 트리거 금지" + + window = _event_review_window( + today=today, + pressure_sign=pressure_sign, + next_earnings_date=next_earnings_date, + next_macro_event_date=next_macro_event_date, + macro_event_impact=macro_event_impact, + earnings_outlook=earnings_outlook, + ) if pressure_sign != 0 else None + + return { + "action": action, + "conviction": conviction, + "market_regime": market_regime, + "composite_score": round(composite_score, 4), + "sell_agreeing_factors": sell_agree, + "hold_add_agreeing_factors": hold_add_agree, + "missing_factors": missing_factors, + "review_window": window, + "rationale": rationale, + "cash_shortfall_excluded": True, + "mechanical_sell_prohibited": True, + } + + +def compute_satellite_candidate_score(ctx: dict[str, Any]) -> dict[str, Any]: + """SATELLITE_CANDIDATE_SCORE_V1. + + 미보유 유니버스 종목을 섹터 수출입 전망(sector_export_trend) + 펀더멘털 + 추세 + 국면적합도로 평가해 WATCH/BUY_CANDIDATE/AVOID를 산출한다. 보유종목 + 매도판단(compute_qualitative_sell_strategy)과 동일한 부호 규약을 쓰지 않고 + 별도 -1(약세)~+1(강세) 매력도 스케일을 쓴다 — 매수후보 평가와 매도판단은 + 목적함수가 다르므로 동일 점수를 재사용하지 않는다. + """ + sector_export_trend = ctx.get("sector_export_trend") # %, 섹터 수출 YoY/MoM 추세 + fundamental_trajectory = ctx.get("fundamental_trajectory") # -1(악화)~+1(개선), 매도엔진과 동일 정의역이나 부호 반대 해석 주의 + relative_return_20d = ctx.get("relative_return_20d") + market_regime = str(ctx.get("market_regime") or classify_market_regime(ctx.get("rate_trend"))).upper() + + missing = [name for name, value in ( + ("sector_export_trend", sector_export_trend), + ("fundamental_trajectory", fundamental_trajectory), + ) if not _finite(value)] + if missing: + return { + "satellite_action": "INSUFFICIENT_DATA_NO_ACTION", + "missing_inputs": missing, + "market_regime": market_regime, + } + + export_signal = max(-1.0, min(1.0, float(sector_export_trend) / 10.0)) + fundamental_signal = max(-1.0, min(1.0, -float(fundamental_trajectory))) # 매도엔진 부호(+)=악화 -> 매력도는 반전 + relative_signal = max(-1.0, min(1.0, float(relative_return_20d) / 10.0)) if _finite(relative_return_20d) else 0.0 + + if market_regime == "PERFORMANCE_MARKET": + weights = {"export": 0.45, "fundamental": 0.40, "relative": 0.15} + elif market_regime == "TECHNICAL_MARKET": + weights = {"export": 0.20, "fundamental": 0.25, "relative": 0.55} + else: + weights = {"export": 0.34, "fundamental": 0.33, "relative": 0.33} + + attractiveness = ( + export_signal * weights["export"] + + fundamental_signal * weights["fundamental"] + + relative_signal * weights["relative"] + ) + attractiveness = max(-1.0, min(1.0, attractiveness)) + + if attractiveness >= 0.5: + satellite_action = "BUY_CANDIDATE" + elif attractiveness >= 0.2: + satellite_action = "WATCH" + elif attractiveness <= -0.4: + satellite_action = "AVOID" + else: + satellite_action = "NEUTRAL_NO_EDGE" + + return { + "satellite_action": satellite_action, + "attractiveness_score": round(attractiveness, 4), + "market_regime": market_regime, + "components": { + "export_signal": round(export_signal, 4), + "fundamental_signal": round(fundamental_signal, 4), + "relative_signal": round(relative_signal, 4), + }, + "weights_used": weights, + } diff --git a/tests/unit/test_evaluate_qualitative_sell_strategy_accuracy_v1.py b/tests/unit/test_evaluate_qualitative_sell_strategy_accuracy_v1.py new file mode 100644 index 0000000..c805401 --- /dev/null +++ b/tests/unit/test_evaluate_qualitative_sell_strategy_accuracy_v1.py @@ -0,0 +1,81 @@ +from __future__ import annotations + +import sys +from pathlib import Path + +ROOT = Path(__file__).resolve().parents[2] +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) + +from tools.evaluate_qualitative_sell_strategy_accuracy_v1 import ( + _scoreable_direction, + build_accuracy_report, + evaluate_decision, +) +from src.quant_engine.qualitative_sell_strategy_store_v1 import insert_sell_strategy_result + + +def test_scoreable_direction(): + assert _scoreable_direction("EXIT_REVIEW_FULL") == -1 + assert _scoreable_direction("TRIM_REVIEW_PARTIAL") == -1 + assert _scoreable_direction("HOLD_ADD_CONVICTION") == 1 + assert _scoreable_direction("HOLD_NO_CONFLUENCE") is None + assert _scoreable_direction("INSUFFICIENT_DATA_NO_ACTION") is None + + +def test_evaluate_decision_sell_success_when_price_drops(): + decision = {"action": "EXIT_REVIEW_FULL"} + result = evaluate_decision(decision, price_at_decision=100.0, price_after=90.0) + assert result["success"] is True + assert result["realized_return_pct"] == -10.0 + + +def test_evaluate_decision_sell_failure_when_price_rises(): + decision = {"action": "TRIM_REVIEW_PARTIAL"} + result = evaluate_decision(decision, price_at_decision=100.0, price_after=110.0) + assert result["success"] is False + + +def test_evaluate_decision_hold_add_success_when_price_rises(): + decision = {"action": "HOLD_ADD_CONVICTION"} + result = evaluate_decision(decision, price_at_decision=100.0, price_after=105.0) + assert result["success"] is True + + +def test_evaluate_decision_returns_none_for_non_directional_action(): + assert evaluate_decision({"action": "HOLD_NO_CONFLUENCE"}, 100.0, 105.0) is None + + +def test_build_accuracy_report_data_gated_when_sample_too_small(tmp_path): + db_path = tmp_path / "test.db" + insert_sell_strategy_result(db_path, { + "code": "005930", "generated_at": "2026-06-01T12:00:00", + "decision": {"action": "EXIT_REVIEW_FULL"}, + }) + report = build_accuracy_report(db_path, price_lookup={ + "005930": {"2026-06-01": 100.0, "2026-06-06": 90.0}, + }) + assert report["status"] == "DATA_GATED" + assert report["scored_sample_count"] == 1 + + +def test_build_accuracy_report_ok_with_enough_samples(tmp_path): + db_path = tmp_path / "test.db" + price_lookup: dict = {} + for i in range(12): + code = f"00000{i % 3}" + gen_at = f"2026-05-{(i % 20) + 1:02d}T12:00:00" + insert_sell_strategy_result(db_path, { + "code": code, "generated_at": gen_at, + "decision": {"action": "EXIT_REVIEW_FULL"}, + }) + date_key = gen_at[:10] + future_key = ( + __import__("datetime").date.fromisoformat(date_key) + __import__("datetime").timedelta(days=5) + ).isoformat() + price_lookup.setdefault(code, {})[date_key] = 100.0 + price_lookup[code][future_key] = 90.0 # 매도신호 후 하락 — success + report = build_accuracy_report(db_path, price_lookup) + assert report["status"] == "OK" + assert report["hit_rate_pct"] == 100.0 + assert report["scored_sample_count"] == 12 diff --git a/tests/unit/test_qualitative_sell_strategy_store_v1.py b/tests/unit/test_qualitative_sell_strategy_store_v1.py new file mode 100644 index 0000000..23ad288 --- /dev/null +++ b/tests/unit/test_qualitative_sell_strategy_store_v1.py @@ -0,0 +1,70 @@ +from __future__ import annotations + +import sys +from pathlib import Path + +ROOT = Path(__file__).resolve().parents[2] +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) + +from src.quant_engine.qualitative_sell_strategy_store_v1 import ( + QualitativeSellStoreSpec, + fetch_recent_sell_strategy_results, + insert_satellite_recommendation, + insert_sell_strategy_result, + resolve_store_path, +) + + +def test_insert_and_fetch_sell_strategy_result(tmp_path): + db_path = tmp_path / "test.db" + result = { + "code": "005930", + "generated_at": "2026-06-21T12:00:00+09:00", + "decision": { + "action": "TRIM_REVIEW_PARTIAL", + "conviction": "MEDIUM", + "market_regime": "TECHNICAL_MARKET", + "composite_score": 0.42, + "rationale": "test rationale", + }, + } + insert_sell_strategy_result(db_path, result) + rows = fetch_recent_sell_strategy_results(db_path, "005930") + assert len(rows) == 1 + assert rows[0]["action"] == "TRIM_REVIEW_PARTIAL" + assert rows[0]["composite_score"] == 0.42 + + +def test_fetch_returns_empty_list_when_db_missing(tmp_path): + rows = fetch_recent_sell_strategy_results(tmp_path / "nonexistent.db", "005930") + assert rows == [] + + +def test_multiple_inserts_ordered_by_generated_at_desc(tmp_path): + db_path = tmp_path / "test.db" + for ts in ("2026-06-19T12:00:00", "2026-06-21T12:00:00", "2026-06-20T12:00:00"): + insert_sell_strategy_result(db_path, { + "code": "005930", "generated_at": ts, + "decision": {"action": "HOLD_NO_CONFLUENCE"}, + }) + rows = fetch_recent_sell_strategy_results(db_path, "005930") + assert [r["generated_at"] for r in rows] == ["2026-06-21T12:00:00", "2026-06-20T12:00:00", "2026-06-19T12:00:00"] + + +def test_insert_satellite_recommendation(tmp_path): + db_path = tmp_path / "test.db" + insert_satellite_recommendation(db_path, "2026-06-21T12:00:00+09:00", { + "ticker": "042700", + "score": {"satellite_action": "BUY_CANDIDATE", "attractiveness_score": 0.6, "market_regime": "PERFORMANCE_MARKET"}, + }) + import sqlite3 + conn = sqlite3.connect(db_path) + row = conn.execute("SELECT ticker, satellite_action, attractiveness_score FROM satellite_recommendations").fetchone() + conn.close() + assert row == ("042700", "BUY_CANDIDATE", 0.6) + + +def test_resolve_store_path_supports_sqlite(tmp_path): + db_path = resolve_store_path(QualitativeSellStoreSpec(location=tmp_path / "qualitative.db"), ROOT) + assert str(db_path).endswith("qualitative.db") diff --git a/tests/unit/test_qualitative_sell_strategy_v1.py b/tests/unit/test_qualitative_sell_strategy_v1.py new file mode 100644 index 0000000..232326d --- /dev/null +++ b/tests/unit/test_qualitative_sell_strategy_v1.py @@ -0,0 +1,148 @@ +from __future__ import annotations + +import sys +from datetime import date +from pathlib import Path + +ROOT = Path(__file__).resolve().parents[2] +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) + +from src.quant_engine.qualitative_sell_strategy_v1 import ( + classify_market_regime, + compute_microstructure_pressure_from_orderbook, + compute_qualitative_sell_strategy, + compute_satellite_candidate_score, + compute_short_interest_composite, +) + + +def test_classify_market_regime(): + assert classify_market_regime("RISING") == "PERFORMANCE_MARKET" + assert classify_market_regime("FLAT") == "TECHNICAL_MARKET" + assert classify_market_regime("FALLING") == "TECHNICAL_MARKET" + assert classify_market_regime(None) == "NEUTRAL" + assert classify_market_regime("garbage") == "NEUTRAL" + + +def test_short_interest_composite_data_missing_without_estimating(): + result = compute_short_interest_composite({"short_balance_ratio": 0.6}) + assert result["status"] == "DATA_MISSING" + assert "short_turnover_share" in result["missing_inputs"] + assert result["short_interest_pressure"] is None + + +def test_short_interest_composite_low_balance_regime_reweights(): + low_balance = compute_short_interest_composite({ + "short_balance_ratio": 0.6, "short_balance_ratio_chg_20d": 0.1, + "short_turnover_share": 14.0, "relative_return_20d": -8.0, + "volume_ratio_5d": 1.8, "earnings_outlook": "DETERIORATING", + }) + assert low_balance["low_balance_regime"] is True + assert low_balance["weights_used"]["balance"] < low_balance["weights_used"]["turnover"] + assert low_balance["label"] == "ELEVATED_SHORT_PRESSURE" + + +def test_confluence_requires_minimum_three_agreeing_factors(): + # 2개 팩터만 매도방향(macro, short_interest) 합의 — 3개 미달이므로 매도 액션 금지 + ctx = { + "macro_pressure": 0.5, "short_interest_pressure": 0.6, + "fundamental_trajectory": -0.5, "microstructure_pressure": -0.4, + "liquidity_rotation_risk": 0.1, + } + out = compute_qualitative_sell_strategy(ctx) + assert out["action"] not in {"EXIT_REVIEW_FULL", "TRIM_REVIEW_PARTIAL"} + + +def test_confluence_triggers_trim_when_three_factors_agree(): + ctx = { + "macro_pressure": 0.5, "short_interest_pressure": 0.5, + "fundamental_trajectory": 0.4, "microstructure_pressure": 0.1, + "liquidity_rotation_risk": 0.0, + } + out = compute_qualitative_sell_strategy(ctx) + assert out["action"] == "TRIM_REVIEW_PARTIAL" + assert set(out["sell_agreeing_factors"]) == {"macro_pressure", "short_interest_pressure", "fundamental_trajectory"} + + +def test_insufficient_data_does_not_fabricate_action(): + out = compute_qualitative_sell_strategy({"macro_pressure": 0.9}) + assert out["action"] == "INSUFFICIENT_DATA_NO_ACTION" + assert out["mechanical_sell_prohibited"] is True + + +def test_review_window_pre_earnings_when_outlook_deteriorating(): + ctx = { + "macro_pressure": 0.5, "fundamental_trajectory": 0.5, "short_interest_pressure": 0.5, + "earnings_outlook": "DETERIORATING", + "next_earnings_date": date(2026, 7, 24), + "today": date(2026, 6, 21), + } + out = compute_qualitative_sell_strategy(ctx) + assert out["review_window"]["window_basis"] == "PRE_EARNINGS_EXIT_BEFORE_SURPRISE_RISK" + assert out["review_window"]["review_window_end"] < "2026-07-24" + + +def test_review_window_defers_past_earnings_when_outlook_improving(): + ctx = { + "macro_pressure": -0.5, "fundamental_trajectory": -0.5, "short_interest_pressure": -0.5, + "earnings_outlook": "IMPROVING", + "next_earnings_date": date(2026, 7, 24), + "today": date(2026, 6, 21), + } + out = compute_qualitative_sell_strategy(ctx) + assert out["action"] == "HOLD_ADD_CONVICTION" + assert out["review_window"]["window_basis"] == "REASSESS_AFTER_EARNINGS_CONFIRM" + + +def test_regime_weighting_shifts_composite_score_without_changing_confluence_count(): + base_ctx = { + "macro_pressure": 0.4, "fundamental_trajectory": 0.6, "short_interest_pressure": 0.35, + "microstructure_pressure": 0.1, "liquidity_rotation_risk": 0.0, + } + performance = compute_qualitative_sell_strategy({**base_ctx, "rate_trend": "RISING"}) + technical = compute_qualitative_sell_strategy({**base_ctx, "rate_trend": "FALLING"}) + assert performance["market_regime"] == "PERFORMANCE_MARKET" + assert technical["market_regime"] == "TECHNICAL_MARKET" + assert performance["sell_agreeing_factors"] == technical["sell_agreeing_factors"] + assert performance["composite_score"] != technical["composite_score"] + + +def test_satellite_candidate_score_insufficient_data(): + out = compute_satellite_candidate_score({"fundamental_trajectory": 0.2}) + assert out["satellite_action"] == "INSUFFICIENT_DATA_NO_ACTION" + + +def test_satellite_candidate_score_buy_candidate_on_strong_export_and_fundamentals(): + out = compute_satellite_candidate_score({ + "sector_export_trend": 12.0, "fundamental_trajectory": -0.4, + "relative_return_20d": 3.0, "rate_trend": "RISING", + }) + assert out["satellite_action"] == "BUY_CANDIDATE" + assert out["market_regime"] == "PERFORMANCE_MARKET" + + +def test_microstructure_pressure_from_orderbook_ask_heavy_is_positive(): + out = compute_microstructure_pressure_from_orderbook({"total_askp_rsqn": "300000", "total_bidp_rsqn": "100000"}) + assert out["status"] == "OK" + assert out["microstructure_pressure"] > 0 + + +def test_microstructure_pressure_from_orderbook_bid_heavy_is_negative(): + out = compute_microstructure_pressure_from_orderbook({"total_askp_rsqn": "100000", "total_bidp_rsqn": "300000"}) + assert out["microstructure_pressure"] < 0 + + +def test_microstructure_pressure_from_orderbook_missing_fields(): + out = compute_microstructure_pressure_from_orderbook({}) + assert out["status"] == "DATA_MISSING" + assert out["microstructure_pressure"] is None + + +def test_map_universe_sector_to_hs_sector_substring_match(): + from tools.build_satellite_candidate_recommendations_v1 import map_universe_sector_to_hs_sector + + assert map_universe_sector_to_hs_sector("반도체/PCB") == "반도체" + assert map_universe_sector_to_hs_sector("자동차/부품") == "자동차" + assert map_universe_sector_to_hs_sector("AI전력/기기") is None + assert map_universe_sector_to_hs_sector(None) is None diff --git a/tests/unit/test_validate_qualitative_sell_strategy_pipeline_v1.py b/tests/unit/test_validate_qualitative_sell_strategy_pipeline_v1.py new file mode 100644 index 0000000..91e9267 --- /dev/null +++ b/tests/unit/test_validate_qualitative_sell_strategy_pipeline_v1.py @@ -0,0 +1,24 @@ +from __future__ import annotations + +import json +import sys +from pathlib import Path + +ROOT = Path(__file__).resolve().parents[2] +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) + +import tools.validate_qualitative_sell_strategy_pipeline_v1 as validator + + +def test_validate_qualitative_sell_strategy_pipeline_passes(tmp_path, monkeypatch): + out = tmp_path / "qualitative_sell_strategy_pipeline_v1.json" + monkeypatch.setattr(sys, "argv", ["validate_qualitative_sell_strategy_pipeline_v1.py"]) + monkeypatch.setattr(validator, "ROOT", ROOT) + + rc = validator.main() + payload = json.loads((ROOT / "Temp" / "qualitative_sell_strategy_pipeline_v1.json").read_text(encoding="utf-8")) + + assert rc == 0 + assert payload["gate"] == "PASS" + assert payload["checks"]["store_contract"] is True diff --git a/tools/build_macro_context_from_workbook_v1.py b/tools/build_macro_context_from_workbook_v1.py new file mode 100644 index 0000000..d992f21 --- /dev/null +++ b/tools/build_macro_context_from_workbook_v1.py @@ -0,0 +1,204 @@ +"""GatherTradingData.xlsx에서 실제 매크로/이벤트/포지션 컨텍스트를 추출. + +build_qualitative_sell_inputs_v1.py의 --context-json을 수동 작성하지 않고, 이미 +GAS 하네스가 산출/수집해 둔 시트 값을 그대로 읽어 자동 조립한다(중복 수집 금지 +원칙 — qualitative_sell_strategy_v1.yaml:data_sources 참조). + +실측 확인된 시트/컬럼(2026-06-21): + - macro 시트: Symbol='MRS_COMPUTED'.Close = market_risk_score(0~10, 하네스 산출). + Symbol='^TNX'(US10Y_Yield).Ret20D = 20일 금리추세 proxy(국내 기준금리 시트 없음 — + 한국은행 금통위 일정은 event_calendar Type='BOK'로 별도 포착). + - event_risk 시트: Date/DaysLeft/Event/Type/Impact(HIGH/MEDIUM/LOW)/Alert/AsOfDate. + - event_calendar 시트: Date/Event/Type(EARNINGS/FOMC/BOK/...)/Impact/DaysLeft 등. + Type='EARNINGS'에 종목명이 Event 텍스트에 포함된 행만 종목별 실적발표일로 매칭. + - account_snapshot 시트: ticker/name/holding_quantity/parse_status='CAPTURE_READ_OK'. +""" +from __future__ import annotations + +import argparse +import datetime as dt +import json +import sys +from pathlib import Path +from typing import Any + +from openpyxl import load_workbook + +ROOT = Path(__file__).resolve().parents[1] +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) + +RATE_RISING_THRESHOLD_PCT = 2.0 +RATE_FALLING_THRESHOLD_PCT = -2.0 + + +def _read_sheet_rows(xlsx_path: Path, sheet: str) -> tuple[tuple, list[dict[str, Any]]]: + """헤더 행을 탐색한다. 일부 시트(macro/event_risk)는 1행에 'updated: ...' 배너 + 셀 1개만 있고 실제 헤더는 2행 — 비어있거나 단일 셀뿐인 선행 행은 건너뛴다.""" + wb = load_workbook(xlsx_path, read_only=True, data_only=True) + ws = wb[sheet] + rows_iter = ws.iter_rows(min_row=1, values_only=True) + header: tuple = () + for row in rows_iter: + non_empty = [c for c in row if c is not None] + if len(non_empty) >= 2: + header = row + break + rows = [dict(zip(header, row)) for row in rows_iter if any(c is not None for c in row)] + return header, rows + + +def read_macro_pressure_and_regime(xlsx_path: Path) -> dict[str, Any]: + """MRS_COMPUTED.Close(0~10) -> macro_pressure(-1~+1, 위험도 높을수록 매도압력). + + ^TNX Ret20D(%) -> rate_trend(RISING/FLAT/FALLING) — 국내 기준금리 시트가 없어 + 미국채 10년물 20일 변화율을 proxy로 사용한다(국내 금리는 미 국채와 강한 동행성). + """ + _, rows = _read_sheet_rows(xlsx_path, "macro") + by_symbol = {row.get("Symbol"): row for row in rows} + + mrs_row = by_symbol.get("MRS_COMPUTED") + macro_pressure = None + market_risk_score = None + if mrs_row is not None and isinstance(mrs_row.get("Close"), (int, float)): + market_risk_score = float(mrs_row["Close"]) + macro_pressure = max(-1.0, min(1.0, (market_risk_score / 10.0) * 2.0 - 1.0)) + + tnx_row = by_symbol.get("^TNX") + rate_trend = None + rate_ret20d_pct = None + if tnx_row is not None and tnx_row.get("Ret20D") not in (None, ""): + try: + rate_ret20d_pct = float(tnx_row["Ret20D"]) + except (TypeError, ValueError): + rate_ret20d_pct = None + if rate_ret20d_pct is not None: + if rate_ret20d_pct >= RATE_RISING_THRESHOLD_PCT: + rate_trend = "RISING" + elif rate_ret20d_pct <= RATE_FALLING_THRESHOLD_PCT: + rate_trend = "FALLING" + else: + rate_trend = "FLAT" + + regime_row = by_symbol.get("REGIME_PRELIM") + regime_prelim = regime_row.get("Close") if regime_row else None + + return { + "macro_pressure": macro_pressure, + "market_risk_score": market_risk_score, + "rate_trend": rate_trend, + "rate_ret20d_pct": rate_ret20d_pct, + "regime_prelim": regime_prelim, + "macro_pressure_source": "GatherTradingData.xlsx:macro", + } + + +def read_next_macro_event(xlsx_path: Path, today: dt.date | None = None) -> dict[str, Any]: + """event_risk 시트에서 오늘 이후 가장 가까운 HIGH 임팩트 이벤트일.""" + today = today or dt.date.today() + _, rows = _read_sheet_rows(xlsx_path, "event_risk") + candidates = [] + for row in rows: + event_date = row.get("Date") + if not isinstance(event_date, dt.datetime): + continue + event_date = event_date.date() + if event_date < today or row.get("Impact") not in {"HIGH"}: + continue + candidates.append((event_date, row.get("Event"), row.get("Impact"))) + if not candidates: + return {"next_macro_event_date": None, "macro_event_impact": None} + candidates.sort(key=lambda item: item[0]) + event_date, event_name, impact = candidates[0] + return { + "next_macro_event_date": event_date.isoformat(), + "macro_event_impact": impact, + "macro_event_name": event_name, + "macro_event_source": "GatherTradingData.xlsx:event_risk", + } + + +def read_next_earnings_date(xlsx_path: Path, company_name: str, today: dt.date | None = None) -> dict[str, Any]: + """event_calendar에서 Type='EARNINGS'이며 Event 텍스트에 종목명이 포함된 가장 빠른 미래 일정.""" + today = today or dt.date.today() + _, rows = _read_sheet_rows(xlsx_path, "event_calendar") + candidates = [] + name = (company_name or "").strip() + if not name: + return {"next_earnings_date": None, "earnings_event_impact": None} + for row in rows: + if row.get("Type") != "EARNINGS": + continue + event_text = str(row.get("Event") or "") + if name not in event_text: + continue + event_date = row.get("Date") + if isinstance(event_date, dt.datetime): + event_date = event_date.date() + elif isinstance(event_date, str): + try: + event_date = dt.date.fromisoformat(event_date) + except ValueError: + continue + else: + continue + if event_date < today: + continue + candidates.append((event_date, row.get("Impact"))) + if not candidates: + return {"next_earnings_date": None, "earnings_event_impact": None} + candidates.sort(key=lambda item: item[0]) + event_date, impact = candidates[0] + return { + "next_earnings_date": event_date.isoformat(), + "earnings_event_impact": impact, + "earnings_source": "GatherTradingData.xlsx:event_calendar", + } + + +def read_positions(xlsx_path: Path) -> list[dict[str, Any]]: + """account_snapshot에서 실제 보유 종목 목록(CAPTURE_READ_OK, 보유수량>0).""" + _, rows = _read_sheet_rows(xlsx_path, "account_snapshot") + positions: dict[str, dict[str, Any]] = {} + for row in rows: + if row.get("parse_status") != "CAPTURE_READ_OK": + continue + ticker_raw = row.get("ticker") + qty = row.get("holding_quantity") or 0 + if ticker_raw is None or not isinstance(qty, (int, float)) or qty <= 0: + continue + ticker = str(ticker_raw) + ticker = ticker.zfill(6) if ticker.isdigit() else ticker + entry = positions.setdefault(ticker, {"ticker": ticker, "name": row.get("name"), "holding_quantity": 0.0}) + entry["holding_quantity"] += float(qty) # 소수주 분리 행 합산 + return list(positions.values()) + + +def build_context_for_ticker(xlsx_path: Path, ticker: str, company_name: str) -> dict[str, Any]: + today = dt.date.today() + ctx: dict[str, Any] = {} + ctx.update(read_macro_pressure_and_regime(xlsx_path)) + ctx.update(read_next_macro_event(xlsx_path, today)) + ctx.update(read_next_earnings_date(xlsx_path, company_name, today)) + return ctx + + +def main() -> int: + ap = argparse.ArgumentParser(description=__doc__) + ap.add_argument("--xlsx", type=Path, default=ROOT / "GatherTradingData.xlsx") + ap.add_argument("--ticker", default=None) + ap.add_argument("--name", default=None, help="실적발표 일정 매칭용 종목명(한글)") + ap.add_argument("--list-positions", action="store_true") + args = ap.parse_args() + + if args.list_positions: + print(json.dumps(read_positions(args.xlsx), ensure_ascii=False, indent=2)) + return 0 + + result = build_context_for_ticker(args.xlsx, args.ticker or "", args.name or "") + print(json.dumps(result, ensure_ascii=False, indent=2, default=str)) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tools/build_qualitative_sell_inputs_v1.py b/tools/build_qualitative_sell_inputs_v1.py new file mode 100644 index 0000000..b066a20 --- /dev/null +++ b/tools/build_qualitative_sell_inputs_v1.py @@ -0,0 +1,342 @@ +"""qualitative_sell_strategy_v1 입력 ctx 조립 오케스트레이터. + +데이터 출처 (2026-06-21 세션 실측 기준, KIS Open API 연동 이후): + - relative_return_20d, volume_ratio_5d ← tools/fetch_naver_market_data_v1.py (무인증, 동작 확인) + - sector_export_trend ← tools/fetch_trade_statistics_motie_v1.py (--csv 경로 권장) + - short_turnover_share ← [신규] KIS Open API daily-short-sale(FHPST04830000) + output2.ssts_vol_rlim — 실측 동작 확인(실전계좌 도메인, + 모의계좌 도메인은 500 에러). --kis-account real 필요. + - short_balance_ratio(잔고율) ← 여전히 미확보. KIS API도 제공하지 않음(KRX 공매도종합 + 포털 대량보유 공시 전용 데이터) — --short-csv 수동 + 다운로드로만 가능. + - microstructure_pressure(호가10단계) ← [신규] KIS Open API inquire-asking-price-exp-ccn + (FHKST01010200) output1.total_askp_rsqn/total_bidp_rsqn + — 실측 동작 확인(실전+모의 도메인 모두). --kis-account + {real,mock}로 활성화. + - macro_pressure, rate_trend, next_earnings_date, next_macro_event_date, macro_event_impact + ← 기존 GAS 하네스(macro_event_synchronizer_v2, + gas_event_calendar.gs)가 이미 산출/수집 중 — + 이 스크립트가 중복 수집하지 않고 --context-json/ + --workbook으로 그 결과를 주입받는다. + - investing.com ← 직접 스크래핑 403(Cloudflare) 차단 확인. 사용 안 함. + +[CRITICAL] KIS API는 조회(read-only)로만 사용한다 — 매수/매도 주문은 어떤 경우에도 이 코드를 +통해 실행하지 않는다(governance/rules/06_no_direct_api_trading.yaml, CI 강제 게이트 +tools/validate_no_direct_api_trading_v1.py). + +사용 예: + python tools/build_qualitative_sell_inputs_v1.py \ + --ticker 005930 --benchmark-code 069500 --sector 반도체 \ + --kis-account real --short-csv Temp/krx_short_balance_manual.csv \ + --context-json Temp/macro_context.json --apply +""" +from __future__ import annotations + +import argparse +import datetime as dt +import json +import sys +from pathlib import Path +from typing import Any + +ROOT = Path(__file__).resolve().parents[1] +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) + +from tools.fetch_naver_market_data_v1 import ( + _session, + compute_relative_return_20d, + compute_volume_ratio_5d, + fetch_price_history, +) +from tools.fetch_trade_statistics_motie_v1 import ( + compute_sector_export_trend, + load_trade_statistics_csv, +) +from src.quant_engine.qualitative_sell_strategy_v1 import ( + compute_microstructure_pressure_from_orderbook, + compute_qualitative_sell_strategy, + compute_short_interest_composite, +) +from src.quant_engine.qualitative_sell_strategy_store_v1 import ( + QualitativeSellStoreSpec, + insert_sell_strategy_result, + resolve_store_path, +) + +DEFAULT_OUTPUT_DIR = ROOT / "outputs" / "qualitative_sell_strategy" +DEFAULT_SQLITE_DB = DEFAULT_OUTPUT_DIR / "qualitative_sell_strategy.db" + + +def _kst_now_iso() -> str: + return dt.datetime.now(dt.timezone(dt.timedelta(hours=9))).isoformat() + + +def _parse_date(value: str | None) -> dt.date | None: + if not value: + return None + try: + return dt.date.fromisoformat(value) + except ValueError: + return None + + +def load_short_interest_csv(path: Path, code: str) -> dict[str, Any]: + """KRX 공매도종합포털 수동 다운로드 CSV. 컬럼: 종목코드, 잔고율, 잔고율변화20일, 거래비중.""" + import csv + + with path.open(encoding="utf-8-sig", newline="") as f: + for row in csv.DictReader(f): + row_code = str(row.get("종목코드") or row.get("code") or "").strip().zfill(6) + if row_code == code: + return { + "short_balance_ratio": float(row.get("잔고율") or row.get("short_balance_ratio") or 0), + "short_balance_ratio_chg_20d": float(row.get("잔고율변화20일") or row.get("short_balance_ratio_chg_20d") or 0), + "short_turnover_share": float(row.get("거래비중") or row.get("short_turnover_share") or 0), + } + return {} + + +def fetch_kis_supplement(code: str, kis_account: str | None) -> dict[str, Any]: + """KIS Open API에서 short_turnover_share(공매도거래비중)와 microstructure_pressure + (호가10단계)를 조회한다. 조회(read-only)만 수행 — 주문 관련 호출 없음.""" + if not kis_account: + return {} + from src.quant_engine.kis_api_client_v1 import KisCredentials, get_asking_price_10_level, get_daily_short_sale + + result: dict[str, Any] = {} + try: + creds = KisCredentials.load(kis_account) + except RuntimeError as exc: + return {"kis_error": str(exc)} + + try: + ob = get_asking_price_10_level(creds, code) + micro = compute_microstructure_pressure_from_orderbook(ob.get("output1", {})) + if micro.get("status") == "OK": + result["microstructure_pressure"] = micro["microstructure_pressure"] + except Exception as exc: # noqa: BLE001 — KIS 호출 실패가 전체 파이프라인을 막지 않음 + result["kis_orderbook_error"] = str(exc) + + try: + today = dt.date.today() + start = (today - dt.timedelta(days=10)).strftime("%Y%m%d") + end = today.strftime("%Y%m%d") + ss = get_daily_short_sale(creds, code, start, end) + rows = ss.get("output2") or [] + if rows: + latest = rows[0] + ssts_vol_rlim = latest.get("ssts_vol_rlim") + if ssts_vol_rlim is not None: + result["short_turnover_share"] = float(ssts_vol_rlim) + except Exception as exc: # noqa: BLE001 + result["kis_short_sale_error"] = str(exc) + + return result + + +def build_ctx_for_ticker( + code: str, + benchmark_code: str, + sector: str | None, + earnings_outlook: str, + trade_csv: Path | None, + short_csv: Path | None, + external_context: dict[str, Any], + kis_account: str | None = None, +) -> dict[str, Any]: + session = _session() + price = fetch_price_history(session, code) + benchmark = fetch_price_history(session, benchmark_code) + + relative_return_20d = compute_relative_return_20d(price.get("rows", []), benchmark.get("rows", [])) + volume_ratio_5d = compute_volume_ratio_5d(price.get("rows", [])) + kis_supplement = fetch_kis_supplement(code, kis_account) + + short_inputs: dict[str, Any] = {} + if short_csv and short_csv.exists(): + short_inputs = load_short_interest_csv(short_csv, code) + if "short_turnover_share" in kis_supplement: + short_inputs["short_turnover_share"] = kis_supplement["short_turnover_share"] + short_inputs.setdefault("relative_return_20d", relative_return_20d) + short_inputs.setdefault("volume_ratio_5d", volume_ratio_5d) + short_inputs.setdefault("earnings_outlook", earnings_outlook) + short_interest = compute_short_interest_composite(short_inputs) + + sector_export_trend = None + if trade_csv and trade_csv.exists() and sector: + rows = load_trade_statistics_csv(trade_csv) + export_result = compute_sector_export_trend(rows, sector, compare="yoy") + if export_result.get("status") == "OK": + sector_export_trend = export_result["sector_export_trend"] + + fundamental_trajectory = external_context.get("fundamental_trajectory") + if fundamental_trajectory is None and sector_export_trend is not None: + fundamental_trajectory = max(-1.0, min(1.0, -sector_export_trend / 15.0)) + + ctx: dict[str, Any] = { + "today": dt.date.today(), + "macro_pressure": external_context.get("macro_pressure"), + "fundamental_trajectory": fundamental_trajectory, + "short_interest_pressure": short_interest.get("short_interest_pressure"), + "microstructure_pressure": kis_supplement.get("microstructure_pressure", external_context.get("microstructure_pressure")), + "liquidity_rotation_risk": external_context.get("liquidity_rotation_risk"), + "earnings_outlook": earnings_outlook, + "next_earnings_date": _parse_date(external_context.get("next_earnings_date")), + "next_macro_event_date": _parse_date(external_context.get("next_macro_event_date")), + "macro_event_impact": external_context.get("macro_event_impact"), + "rate_trend": external_context.get("rate_trend"), + } + return { + "code": code, + "ctx": ctx, + "short_interest_composite": short_interest, + "sector_export_trend": sector_export_trend, + "relative_return_20d": relative_return_20d, + "volume_ratio_5d": volume_ratio_5d, + "kis_supplement": kis_supplement, + "generated_at": _kst_now_iso(), + } + + +def process_one( + ticker: str, + name: str, + benchmark_code: str, + sector: str | None, + earnings_outlook: str, + trade_csv: Path | None, + short_csv: Path | None, + workbook: Path | None, + context_json: Path | None, + kis_account: str | None = None, +) -> dict[str, Any]: + external_context: dict[str, Any] = {} + if context_json and context_json.exists(): + external_context = json.loads(context_json.read_text(encoding="utf-8")) + elif workbook and workbook.exists(): + from tools.build_macro_context_from_workbook_v1 import build_context_for_ticker + external_context = build_context_for_ticker(workbook, ticker, name) + + assembled = build_ctx_for_ticker( + code=ticker, + benchmark_code=benchmark_code, + sector=sector, + earnings_outlook=earnings_outlook, + trade_csv=trade_csv, + short_csv=short_csv, + external_context=external_context, + kis_account=kis_account, + ) + decision = compute_qualitative_sell_strategy(assembled["ctx"]) + result = {**assembled, "decision": decision} + result["ctx"] = {k: (v.isoformat() if isinstance(v, dt.date) else v) for k, v in result["ctx"].items()} + return result + + +def main() -> int: + ap = argparse.ArgumentParser(description=__doc__) + ap.add_argument("--ticker", default=None, help="6자리 종목코드(단일 실행 시 필수)") + ap.add_argument("--name", default=None, help="실적발표 매칭용 종목명(한글)") + ap.add_argument("--benchmark-code", default="069500") + ap.add_argument("--sector", default=None, help="fetch_trade_statistics_motie_v1.SECTOR_HS_MAP 키") + ap.add_argument("--earnings-outlook", default="STABLE", choices=["IMPROVING", "STABLE", "DETERIORATING"]) + ap.add_argument("--trade-csv", type=Path, default=None) + ap.add_argument("--short-csv", type=Path, default=None, help="KRX 공매도종합포털 수동 다운로드 CSV") + ap.add_argument("--context-json", type=Path, default=None, help="macro_pressure/rate_trend/이벤트일 등 외부 산출값 JSON(수동)") + ap.add_argument("--workbook", type=Path, default=None, help="GatherTradingData.xlsx — macro/event_risk/event_calendar 시트에서 컨텍스트 자동 추출(권장)") + ap.add_argument("--batch", action="store_true", help="--workbook의 account_snapshot 실보유 종목 전체 순회(국내 6자리 코드만)") + ap.add_argument("--kis-account", choices=["real", "mock"], default=None, + help="KIS Open API로 호가10단계/공매도거래비중 보강 조회(read-only). " + "공매도 일별추이는 real 도메인만 동작 확인됨(mock은 500 에러).") + ap.add_argument("--apply", action="store_true", help="outputs/qualitative_sell_strategy/.json 저장") + ap.add_argument("--sqlite-db", type=Path, default=DEFAULT_SQLITE_DB, + help="JSON 저장과 병행해 시계열 SQLite에도 기록(GAS/xlsx와 무관한 추가 저장소)") + ap.add_argument("--store-backend", default="sqlite", help="Storage backend contract placeholder (sqlite today, postgresql planned)") + ap.add_argument("--store-location", default=None, help="Backend location/DSN. sqlite path or future postgres DSN.") + ap.add_argument("--no-sqlite", action="store_true", help="SQLite 기록 비활성화") + args = ap.parse_args() + store_db = resolve_store_path( + QualitativeSellStoreSpec( + backend=args.store_backend, + location=args.store_location or args.sqlite_db, + ), + ROOT, + ) + + if args.batch: + if not args.workbook or not args.workbook.exists(): + raise SystemExit("--batch는 --workbook 경로가 필요합니다") + from tools.build_macro_context_from_workbook_v1 import read_positions + positions = [p for p in read_positions(args.workbook) if str(p["ticker"]).isdigit() and len(str(p["ticker"])) == 6] + if args.apply: + DEFAULT_OUTPUT_DIR.mkdir(parents=True, exist_ok=True) + results = [] + for pos in positions: + try: + result = process_one( + ticker=pos["ticker"], name=str(pos.get("name") or ""), + benchmark_code=args.benchmark_code, sector=args.sector, + earnings_outlook=args.earnings_outlook, trade_csv=args.trade_csv, + short_csv=args.short_csv, workbook=args.workbook, context_json=None, + kis_account=args.kis_account, + ) + except Exception as exc: # noqa: BLE001 — 종목 1건 실패가 배치 전체를 막지 않음 + result = {"code": pos["ticker"], "status": "FETCH_ERROR", "note": str(exc)} + results.append(result) + if args.apply: + out_path = DEFAULT_OUTPUT_DIR / f"{pos['ticker']}.json" + out_path.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8") + if not args.no_sqlite and result.get("status") != "FETCH_ERROR": + insert_sell_strategy_result(store_db, result) + error_count = sum(1 for r in results if r.get("status") == "FETCH_ERROR") + action_counts: dict[str, int] = {} + for r in results: + action = (r.get("decision") or {}).get("action", "N/A") + action_counts[action] = action_counts.get(action, 0) + 1 + summary = { + "generated_at": _kst_now_iso(), + "ticker_count": len(results), + "error_count": error_count, + "action_counts": action_counts, + } + print(f"SUMMARY: {json.dumps(summary, ensure_ascii=False)}") + if args.apply: + (DEFAULT_OUTPUT_DIR / "_batch_summary.json").write_text( + json.dumps(summary, ensure_ascii=False, indent=2), encoding="utf-8" + ) + print(f"written {len(results)} files to {DEFAULT_OUTPUT_DIR}") + else: + print(json.dumps(results, ensure_ascii=False, indent=2)) + # 절반 이상 실패면 CI에서 빨간불로 보이도록 — 호출결과를 로그만으로 확인 가능하게 함 + if results and error_count / len(results) >= 0.5: + print(f"BATCH_GATE: FAIL — error_count={error_count}/{len(results)}") + return 1 + print("BATCH_GATE: PASS") + return 0 + + if not args.ticker: + raise SystemExit("--ticker 또는 --batch 중 하나는 필수입니다") + + result = process_one( + ticker=args.ticker, name=args.name or "", + benchmark_code=args.benchmark_code, sector=args.sector, + earnings_outlook=args.earnings_outlook, trade_csv=args.trade_csv, + short_csv=args.short_csv, workbook=args.workbook, context_json=args.context_json, + kis_account=args.kis_account, + ) + + if args.apply: + DEFAULT_OUTPUT_DIR.mkdir(parents=True, exist_ok=True) + out_path = DEFAULT_OUTPUT_DIR / f"{args.ticker}.json" + out_path.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8") + if not args.no_sqlite: + insert_sell_strategy_result(store_db, result) + print(f"written: {out_path}") + else: + print(json.dumps(result, ensure_ascii=False, indent=2)) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tools/build_satellite_candidate_recommendations_v1.py b/tools/build_satellite_candidate_recommendations_v1.py new file mode 100644 index 0000000..9340813 --- /dev/null +++ b/tools/build_satellite_candidate_recommendations_v1.py @@ -0,0 +1,139 @@ +"""universe 시트(미보유 위성 유니버스) 전체를 SATELLITE_CANDIDATE_SCORE_V1로 평가. + +WBS-6 후속 — qualitative_sell_strategy_v1.compute_satellite_candidate_score를 실제 +GatherTradingData.xlsx universe 시트(Ticker/Name/Sector/AddedDate, 실측 확인됨)에 연동. +보유 종목(account_snapshot)은 제외하고 미보유 후보만 평가한다. + +universe.Sector 한글 라벨은 fetch_trade_statistics_motie_v1.SECTOR_HS_MAP 키와 1:1로 +일치하지 않으므로 부분 문자열 매칭으로 연결한다. 매칭 실패 종목은 sector_export_trend를 +추정하지 않고 None으로 두어 컨플루언스 부족(INSUFFICIENT_DATA_NO_ACTION)으로 자연 처리된다 +(추정 금지 원칙 — qualitative_sell_strategy_v1.yaml과 동일). +""" +from __future__ import annotations + +import argparse +import datetime as dt +import json +import sys +from pathlib import Path +from typing import Any + +from openpyxl import load_workbook + +ROOT = Path(__file__).resolve().parents[1] +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) + +from tools.build_macro_context_from_workbook_v1 import _read_sheet_rows, read_positions, read_macro_pressure_and_regime +from tools.fetch_naver_market_data_v1 import _session, compute_relative_return_20d, fetch_price_history +from tools.fetch_trade_statistics_motie_v1 import SECTOR_HS_MAP, compute_sector_export_trend, load_trade_statistics_csv +from src.quant_engine.qualitative_sell_strategy_v1 import compute_satellite_candidate_score +from src.quant_engine.qualitative_sell_strategy_store_v1 import ( + QualitativeSellStoreSpec, + insert_satellite_recommendation, + resolve_store_path, +) + +DEFAULT_OUTPUT = ROOT / "outputs" / "qualitative_sell_strategy" / "satellite_recommendations.json" +DEFAULT_SQLITE_DB = ROOT / "outputs" / "qualitative_sell_strategy" / "qualitative_sell_strategy.db" + + +def map_universe_sector_to_hs_sector(universe_sector: str) -> str | None: + text = str(universe_sector or "") + for hs_sector in SECTOR_HS_MAP: + if hs_sector in text: + return hs_sector + return None + + +def read_universe_candidates(xlsx_path: Path, exclude_tickers: set[str]) -> list[dict[str, Any]]: + _, rows = _read_sheet_rows(xlsx_path, "universe") + candidates = [] + for row in rows: + ticker = str(row.get("Ticker") or "").strip() + if not ticker or ticker in exclude_tickers: + continue + candidates.append({ + "ticker": ticker, + "name": row.get("Name"), + "universe_sector": row.get("Sector"), + "hs_sector": map_universe_sector_to_hs_sector(row.get("Sector")), + }) + return candidates + + +def main() -> int: + ap = argparse.ArgumentParser(description=__doc__) + ap.add_argument("--workbook", type=Path, default=ROOT / "GatherTradingData.xlsx") + ap.add_argument("--benchmark-code", default="069500") + ap.add_argument("--trade-csv", type=Path, default=None, help="관세청/산업통상부 수출입통계 CSV — 없으면 sector_export_trend는 전부 DATA_MISSING") + ap.add_argument("--apply", action="store_true", help=str(DEFAULT_OUTPUT) + " 저장") + ap.add_argument("--sqlite-db", type=Path, default=DEFAULT_SQLITE_DB, + help="JSON 저장과 병행해 시계열 SQLite에도 기록(GAS/xlsx와 무관한 추가 저장소)") + ap.add_argument("--store-backend", default="sqlite", help="Storage backend contract placeholder (sqlite today, postgresql planned)") + ap.add_argument("--store-location", default=None, help="Backend location/DSN. sqlite path or future postgres DSN.") + ap.add_argument("--no-sqlite", action="store_true", help="SQLite 기록 비활성화") + args = ap.parse_args() + store_db = resolve_store_path( + QualitativeSellStoreSpec( + backend=args.store_backend, + location=args.store_location or args.sqlite_db, + ), + ROOT, + ) + + held = {p["ticker"] for p in read_positions(args.workbook) if str(p["ticker"]).isdigit()} + candidates = read_universe_candidates(args.workbook, held) + + trade_rows = load_trade_statistics_csv(args.trade_csv) if args.trade_csv and args.trade_csv.exists() else [] + macro = read_macro_pressure_and_regime(args.workbook) + rate_trend = macro.get("rate_trend") + + session = _session() + benchmark = fetch_price_history(session, args.benchmark_code) + + results = [] + for cand in candidates: + sector_export_trend = None + if cand["hs_sector"] and trade_rows: + export_result = compute_sector_export_trend(trade_rows, cand["hs_sector"], compare="yoy") + if export_result.get("status") == "OK": + sector_export_trend = export_result["sector_export_trend"] + + relative_return_20d = None + if cand["ticker"].isdigit() and len(cand["ticker"]) == 6: + try: + price = fetch_price_history(session, cand["ticker"]) + relative_return_20d = compute_relative_return_20d(price.get("rows", []), benchmark.get("rows", [])) + except Exception: # noqa: BLE001 — 개별 종목 수집 실패가 전체 배치를 막지 않음 + relative_return_20d = None + + score = compute_satellite_candidate_score({ + "sector_export_trend": sector_export_trend, + "fundamental_trajectory": None, # universe 시트에 펀더멘털 추세 없음 — 추정 금지 + "relative_return_20d": relative_return_20d, + "rate_trend": rate_trend, + }) + results.append({**cand, "sector_export_trend": sector_export_trend, "relative_return_20d": relative_return_20d, "score": score}) + + output = { + "generated_at": dt.datetime.now(dt.timezone(dt.timedelta(hours=9))).isoformat(), + "rate_trend": rate_trend, + "candidate_count": len(results), + "results": results, + } + + if args.apply: + DEFAULT_OUTPUT.parent.mkdir(parents=True, exist_ok=True) + DEFAULT_OUTPUT.write_text(json.dumps(output, ensure_ascii=False, indent=2), encoding="utf-8") + if not args.no_sqlite: + for cand in results: + insert_satellite_recommendation(store_db, output["generated_at"], cand) + print(f"written: {DEFAULT_OUTPUT} ({len(results)} candidates)") + else: + print(json.dumps(output, ensure_ascii=False, indent=2)) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tools/evaluate_qualitative_sell_strategy_accuracy_v1.py b/tools/evaluate_qualitative_sell_strategy_accuracy_v1.py new file mode 100644 index 0000000..f6129b9 --- /dev/null +++ b/tools/evaluate_qualitative_sell_strategy_accuracy_v1.py @@ -0,0 +1,143 @@ +"""qualitative_sell_strategy_v1 자체 평가 루프 — "한 번 만들고 끝"이 아니라 결정이 +실제로 가치를 보존했는지 사후 검증한다(30년 시니어 퀀트의 핵심 습관: 판단 → 결과 → +재보정). 기존 T+5/T+20 outcome ledger(proposal_evaluation_history)와 별개로, +qualitative_sell_strategy_store_v1.db에 쌓인 SQLite 시계열을 사용한다 — GAS/xlsx와 +무관하므로 이 모듈만의 독립 평가 루프를 구성해도 기존 시스템과 충돌하지 않는다. + +판정 기준(가치보존 관점, 기계적 승률 게임이 아님): + - EXIT_REVIEW_FULL / TRIM_REVIEW_PARTIAL(매도방향) → 이후 가격이 하락했으면 + "가치보존 성공"(매도가 손실을 막았다). 상승했으면 "기회비용 발생"(조급한 매도). + - HOLD_ADD_CONVICTION(지지방향) → 이후 가격이 상승했으면 성공. + - HOLD_NO_CONFLUENCE / INSUFFICIENT_DATA_NO_ACTION → 방향성 주장이 없으므로 평가 대상 제외. + +표본이 부족하면(DATA_GATED) 추정하지 않고 명시적으로 보류한다 — honest_proof_score와 +동일한 원칙(spec/algorithm_guidance_proof 계열). +""" +from __future__ import annotations + +import argparse +import datetime as dt +import json +import sqlite3 +import sys +from pathlib import Path +from typing import Any + +ROOT = Path(__file__).resolve().parents[1] +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) + +from src.quant_engine.qualitative_sell_strategy_store_v1 import QualitativeSellStoreSpec, resolve_store_path + +MIN_HOLDING_DAYS = 5 # T+5 수준 — 너무 짧으면 노이즈, 너무 길면 표본 희소 +MIN_SAMPLE_FOR_HIT_RATE = 10 # 이보다 적으면 hit_rate를 신뢰 구간 없이 표기하지 않음(DATA_GATED) + + +def _scoreable_direction(action: str) -> int | None: + if action in {"EXIT_REVIEW_FULL", "TRIM_REVIEW_PARTIAL"}: + return -1 # 매도 방향 — 가격 하락이 "성공" + if action == "HOLD_ADD_CONVICTION": + return 1 # 지지 방향 — 가격 상승이 "성공" + return None # HOLD_NO_CONFLUENCE / INSUFFICIENT_DATA_NO_ACTION — 평가 제외 + + +def load_scoreable_decisions(db_path: Path, min_age_days: int = MIN_HOLDING_DAYS) -> list[dict[str, Any]]: + if not db_path.exists(): + return [] + cutoff = (dt.date.today() - dt.timedelta(days=min_age_days)).isoformat() + conn = sqlite3.connect(db_path) + conn.row_factory = sqlite3.Row + try: + rows = conn.execute( + "SELECT code, generated_at, action, conviction, market_regime, composite_score " + "FROM sell_strategy_results WHERE generated_at <= ? ORDER BY generated_at", + (cutoff,), + ).fetchall() + return [dict(row) for row in rows] + finally: + conn.close() + + +def evaluate_decision(decision: dict[str, Any], price_at_decision: float, price_after: float) -> dict[str, Any] | None: + direction = _scoreable_direction(decision["action"]) + if direction is None or not price_at_decision or price_at_decision <= 0: + return None + realized_return_pct = (price_after / price_at_decision - 1.0) * 100.0 + success = (direction * realized_return_pct) > 0 # 방향 일치 시 성공 + return { + **decision, + "price_at_decision": price_at_decision, + "price_after": price_after, + "realized_return_pct": round(realized_return_pct, 4), + "success": success, + } + + +def build_accuracy_report(db_path: Path, price_lookup: dict[str, dict[str, float]]) -> dict[str, Any]: + """price_lookup: {code: {generated_at_date_iso: close_price}} — 호출측이 실제 가격 + 히스토리(fetch_naver_market_data_v1 등)로 조립해 주입한다. 이 함수는 가격을 추정하지 + 않는다 — 주어진 값만 사용.""" + decisions = load_scoreable_decisions(db_path) + evaluated: list[dict[str, Any]] = [] + skipped_no_price = 0 + for decision in decisions: + prices = price_lookup.get(decision["code"], {}) + decision_date = decision["generated_at"][:10] + price_at = prices.get(decision_date) + future_date = (dt.date.fromisoformat(decision_date) + dt.timedelta(days=MIN_HOLDING_DAYS)).isoformat() + price_after = prices.get(future_date) + if price_at is None or price_after is None: + skipped_no_price += 1 + continue + result = evaluate_decision(decision, price_at, price_after) + if result is not None: + evaluated.append(result) + + scored = [e for e in evaluated if e is not None] + if len(scored) < MIN_SAMPLE_FOR_HIT_RATE: + return { + "status": "DATA_GATED", + "scored_sample_count": len(scored), + "min_sample_required": MIN_SAMPLE_FOR_HIT_RATE, + "note": "표본 부족 — hit_rate를 산출하지 않음(추정 금지). 결정 누적과 가격 매칭이 더 필요.", + "skipped_no_price": skipped_no_price, + } + + hit_rate_pct = round(100.0 * sum(1 for e in scored if e["success"]) / len(scored), 2) + return { + "status": "OK", + "scored_sample_count": len(scored), + "hit_rate_pct": hit_rate_pct, + "evaluations": scored, + "skipped_no_price": skipped_no_price, + } + + +def main() -> int: + ap = argparse.ArgumentParser(description=__doc__) + ap.add_argument("--sqlite-db", type=Path, + default=ROOT / "outputs" / "qualitative_sell_strategy" / "qualitative_sell_strategy.db") + ap.add_argument("--store-backend", default="sqlite", help="Storage backend contract placeholder (sqlite today, postgresql planned)") + ap.add_argument("--store-location", default=None, help="Backend location/DSN. sqlite path or future postgres DSN.") + ap.add_argument("--price-lookup-json", type=Path, default=None, + help='{"code": {"YYYY-MM-DD": close_price, ...}} 형식 — 미지정 시 가격 매칭 없이 표본 카운트만 보고') + args = ap.parse_args() + db_path = resolve_store_path( + QualitativeSellStoreSpec( + backend=args.store_backend, + location=args.store_location or args.sqlite_db, + ), + ROOT, + ) + + price_lookup: dict[str, dict[str, float]] = {} + if args.price_lookup_json and args.price_lookup_json.exists(): + price_lookup = json.loads(args.price_lookup_json.read_text(encoding="utf-8")) + + report = build_accuracy_report(db_path, price_lookup) + print(json.dumps(report, ensure_ascii=False, indent=2)) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tools/fetch_naver_market_data_v1.py b/tools/fetch_naver_market_data_v1.py new file mode 100644 index 0000000..d869307 --- /dev/null +++ b/tools/fetch_naver_market_data_v1.py @@ -0,0 +1,168 @@ +"""Naver Finance 시세/수급 수집기 — qualitative_sell_strategy_v1 입력용. + +확인된 무인증 엔드포인트만 사용한다(2026-06-21 세션 실측): + - https://finance.naver.com/item/sise_day.naver?code={code}&page=N (일별 시세/거래량) + - https://finance.naver.com/item/frgn.naver?code={code}&page=N (외국인/기관 수급) + - https://polling.finance.naver.com/api/realtime/domestic/stock/{code} (실시간 스냅샷, JSON) + +investing.com 직접 스크래핑은 403(Cloudflare 차단) 확인됨 — 시도하지 않는다. +KRX 공매도 잔고(data.krx.co.kr)는 OTP 세션 필요(LOGOUT 응답) — 시도하지 않는다. +이미 GAS(gdc_01_fetch_fundamentals.gs/gas_event_calendar.gs)에서 수집 중인 +외국인/기관 수급·실적발표 일정·경제지표 일정은 보유종목에 대해서는 account_snapshot/ +GatherTradingData.xlsx에서 재사용하고, 이 스크립트는 그 시트에 없는 위성 후보군 +티커를 평가할 때만 직접 호출한다(중복 수집 금지). +""" +from __future__ import annotations + +import argparse +import datetime as dt +import json +import sys +from pathlib import Path +from typing import Any + +import requests +from bs4 import BeautifulSoup + +ROOT = Path(__file__).resolve().parents[1] +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) + +USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0 Safari/537.36" +NAVER_REFERER = "https://finance.naver.com/" + + +def _session() -> requests.Session: + s = requests.Session() + s.headers.update({ + "User-Agent": USER_AGENT, + "Referer": NAVER_REFERER, + "Accept-Language": "ko-KR,ko;q=0.9,en;q=0.8", + }) + return s + + +def _num(text: str) -> float: + cleaned = text.replace(",", "").replace("+", "").strip() + try: + return float(cleaned) + except ValueError: + return 0.0 + + +def fetch_price_history(session: requests.Session, code: str, pages: int = 3) -> dict[str, Any]: + """일별 [date, close, change, open, high, low, volume] 최신순. 페이지당 10행.""" + rows: list[dict[str, Any]] = [] + for page in range(1, pages + 1): + url = f"https://finance.naver.com/item/sise_day.naver?code={code}&page={page}" + resp = session.get(url, timeout=10) + resp.encoding = "euc-kr" + soup = BeautifulSoup(resp.text, "html.parser") + table = soup.find("table", {"class": "type2"}) + if table is None: + break + for tr in table.find_all("tr"): + cells = [td.get_text(strip=True) for td in tr.find_all("td")] + if len(cells) != 7 or not cells[0]: + continue + rows.append({ + "date": cells[0].replace(".", "-"), + "close": _num(cells[1]), + "open": _num(cells[3]), + "high": _num(cells[4]), + "low": _num(cells[5]), + "volume": _num(cells[6]), + }) + if not rows: + return {"status": "DATA_MISSING", "rows": [], "source_url": NAVER_REFERER} + return { + "status": "OK", + "rows": rows, + "source_url": f"https://finance.naver.com/item/sise_day.naver?code={code}", + "source_as_of": dt.datetime.now(dt.timezone(dt.timedelta(hours=9))).isoformat(), + } + + +def fetch_foreign_institution_flow(session: requests.Session, code: str, pages: int = 2) -> dict[str, Any]: + """외국인/기관 5일·20일 수급. tds: [date, close, change, ret_pct, volume, inst, frgn, frgn_ratio].""" + rows: list[dict[str, Any]] = [] + for page in range(1, pages + 1): + url = f"https://finance.naver.com/item/frgn.naver?code={code}&page={page}" + resp = session.get(url, timeout=10) + resp.encoding = "euc-kr" + soup = BeautifulSoup(resp.text, "html.parser") + for table in soup.find_all("table", {"class": "type2"}): + for tr in table.find_all("tr"): + cells = [td.get_text(strip=True) for td in tr.find_all("td")] + if len(cells) < 8 or not cells[0] or "." not in cells[0]: + continue + rows.append({ + "date": cells[0].replace(".", "-"), + "close": _num(cells[1]), + "inst_net": _num(cells[5]), + "frgn_net": _num(cells[6]), + }) + if not rows: + return {"status": "DATA_MISSING", "rows": []} + return { + "status": "OK", + "rows": rows, + "source_url": f"https://finance.naver.com/item/frgn.naver?code={code}", + "source_as_of": dt.datetime.now(dt.timezone(dt.timedelta(hours=9))).isoformat(), + } + + +def compute_relative_return_20d(stock_rows: list[dict[str, Any]], benchmark_rows: list[dict[str, Any]]) -> float | None: + """종목수익률(최신 vs 20거래일전) - 벤치마크(섹터ETF/KOSPI)수익률, %p.""" + def _ret(rows: list[dict[str, Any]]) -> float | None: + closes = [r["close"] for r in rows if r.get("close")] + if len(closes) < 2: + return None + recent, past = closes[0], closes[min(len(closes) - 1, 19)] + if not past: + return None + return (recent / past - 1.0) * 100.0 + + stock_ret = _ret(stock_rows) + bench_ret = _ret(benchmark_rows) + if stock_ret is None or bench_ret is None: + return None + return round(stock_ret - bench_ret, 4) + + +def compute_volume_ratio_5d(rows: list[dict[str, Any]]) -> float | None: + """오늘 거래량 / 직전 5일 평균거래량.""" + volumes = [r["volume"] for r in rows if r.get("volume")] + if len(volumes) < 6: + return None + today_vol = volumes[0] + avg5 = sum(volumes[1:6]) / 5.0 + if avg5 <= 0: + return None + return round(today_vol / avg5, 4) + + +def main() -> int: + ap = argparse.ArgumentParser(description=__doc__) + ap.add_argument("--code", required=True, help="6자리 종목코드") + ap.add_argument("--benchmark-code", default="069500", help="비교 벤치마크 코드(기본 KODEX200 069500)") + args = ap.parse_args() + + session = _session() + price = fetch_price_history(session, args.code) + benchmark = fetch_price_history(session, args.benchmark_code) + flow = fetch_foreign_institution_flow(session, args.code) + + result = { + "code": args.code, + "price_history": price, + "foreign_institution_flow": flow, + "relative_return_20d": compute_relative_return_20d(price.get("rows", []), benchmark.get("rows", [])), + "volume_ratio_5d": compute_volume_ratio_5d(price.get("rows", [])), + } + print(json.dumps(result, ensure_ascii=False, indent=2)) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tools/fetch_trade_statistics_motie_v1.py b/tools/fetch_trade_statistics_motie_v1.py new file mode 100644 index 0000000..810754d --- /dev/null +++ b/tools/fetch_trade_statistics_motie_v1.py @@ -0,0 +1,186 @@ +"""관세청/산업통상부 수출입동향 → 섹터별 수출 추세(sector_export_trend) 산출기. + +실측 결과(2026-06-21 세션): investing.com 직접 스크래핑은 403(Cloudflare)으로 차단되고, +관세청·산업통상부는 실시간 무인증 JSON API를 공개하지 않는다(통계청/관세청 수출입통계는 +data.go.kr 공공데이터포털의 서비스키 기반 OpenAPI 또는 매월 발표되는 보도자료 첨부 +XLSX/CSV로만 배포). 따라서 이 모듈은 두 경로를 모두 지원한다: + + 1) API 경로 — data.go.kr 관세청 수출입통계 API. CUSTOMS_API_KEY 환경변수(또는 + --api-key) 필요. 키가 없거나 호출 실패 시 추정하지 않고 DATA_MISSING 반환. + 2) CSV 경로(권장, 안정적) — 관세청 수출입무역통계(https://unipass.customs.go.kr/ets/) + 또는 산업통상부 보도자료에서 사용자가 다운로드한 월별 HS코드별 수출입 CSV를 + --csv 인자로 입력. 이 경로가 실패할 일이 없어 1차 권장 경로다. + +산출물 sector_export_trend(%, MoM 또는 YoY)는 qualitative_sell_strategy_v1의 +fundamental_trajectory 보강 입력 및 compute_satellite_candidate_score의 1차 팩터로 쓰인다. +""" +from __future__ import annotations + +import argparse +import csv +import json +import os +import sys +from collections import defaultdict +from pathlib import Path +from typing import Any + +import requests + +ROOT = Path(__file__).resolve().parents[1] +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) + +# 섹터 → HS코드 prefix(2~4자리). 위성종목 추천/매도판단에 쓰는 핵심 수출 섹터만 우선 등록. +SECTOR_HS_MAP: dict[str, tuple[str, ...]] = { + "반도체": ("8541", "8542"), + "자동차": ("8701", "8702", "8703", "8704"), + "2차전지": ("8507",), + "조선": ("8901", "8902", "8905"), + "철강": ("72",), + "석유화학": ("29", "39"), + "디스플레이": ("8524", "9013"), + "기계": ("84",), + "바이오": ("30",), # universe.Sector 실측 라벨이 "바이오"(헬스 접미사 없음) — 그대로 매칭 + "방산": ("93",), # 무기류·탄약(HS Ch.93) — 현대로템 등 보유종목 K-방산 테마 대응 +} + +CUSTOMS_API_BASE = "https://apis.data.go.kr/1220000/nitemtrade/getNitemtradeList" + + +def fetch_customs_trade_api( + session: requests.Session, + api_key: str | None, + hs_code: str, + start_ym: str, + end_ym: str, +) -> dict[str, Any]: + """data.go.kr 관세청 수출입통계 API 호출. 키 없거나 실패 시 DATA_MISSING(추정 금지).""" + if not api_key: + return {"status": "DATA_MISSING", "note": "CUSTOMS_API_KEY 미설정 — --csv 경로 사용 권장"} + try: + resp = session.get( + CUSTOMS_API_BASE, + params={ + "serviceKey": api_key, + "strtYymm": start_ym, + "endYymm": end_ym, + "hsSgn": hs_code, + "type": "json", + }, + timeout=15, + ) + resp.raise_for_status() + data = resp.json() + except Exception as exc: # noqa: BLE001 — 외부 API 실패는 광범위하게 잡아 DATA_MISSING 처리 + return {"status": "API_ERROR", "note": str(exc)} + return {"status": "OK", "raw": data, "source_url": CUSTOMS_API_BASE} + + +def load_trade_statistics_csv(path: Path) -> list[dict[str, Any]]: + """관세청/산업통상부 배포 CSV. 컬럼: 기간(YYYYMM), HS코드, 수출액(달러), 수입액(달러). + + 헤더명은 배포처마다 다를 수 있어 한글/영문 별칭을 모두 허용한다. + """ + alias = { + "기간": "period", "year_month": "period", "period": "period", + "hs코드": "hs_code", "hs_code": "hs_code", "hscode": "hs_code", + "수출액": "export_usd", "export": "export_usd", "export_usd": "export_usd", + "수입액": "import_usd", "import": "import_usd", "import_usd": "import_usd", + } + rows: list[dict[str, Any]] = [] + with path.open(encoding="utf-8-sig", newline="") as f: + reader = csv.DictReader(f) + for raw_row in reader: + row: dict[str, Any] = {} + for key, value in raw_row.items(): + norm_key = alias.get(str(key).strip().lower()) + if norm_key: + row[norm_key] = value + if {"period", "hs_code"}.issubset(row): + for money_field in ("export_usd", "import_usd"): + if money_field in row: + try: + row[money_field] = float(str(row[money_field]).replace(",", "")) + except ValueError: + row[money_field] = 0.0 + rows.append(row) + return rows + + +def compute_sector_export_trend( + rows: list[dict[str, Any]], + sector: str, + compare: str = "yoy", +) -> dict[str, Any]: + """sector_export_trend(%) = 최신월 수출액 / 비교월 수출액 - 1. + + compare="yoy": 12개월 전 동월 대비. compare="mom": 직전월 대비. + 데이터 부족 시 추정하지 않고 DATA_MISSING. + """ + hs_prefixes = SECTOR_HS_MAP.get(sector) + if not hs_prefixes: + return {"status": "UNKNOWN_SECTOR", "sector": sector, "known_sectors": list(SECTOR_HS_MAP)} + + by_period: dict[str, float] = defaultdict(float) + for row in rows: + hs_code = str(row.get("hs_code") or "") + if any(hs_code.startswith(prefix) for prefix in hs_prefixes): + period = str(row.get("period") or "") + by_period[period] += float(row.get("export_usd") or 0.0) + + if len(by_period) < 2: + return {"status": "DATA_MISSING", "sector": sector, "note": "기간별 수출액 표본 부족"} + + periods_sorted = sorted(by_period) + latest_period = periods_sorted[-1] + latest_value = by_period[latest_period] + + if compare == "mom": + compare_period = periods_sorted[-2] + else: + latest_ym = int(latest_period) + target_ym = latest_ym - 100 # YYYYMM에서 12개월 전 = -100 + compare_period = str(target_ym) + if compare_period not in by_period: + return {"status": "DATA_MISSING", "sector": sector, "note": f"YoY 비교월({compare_period}) 데이터 없음 — MoM으로 재시도 권장"} + + compare_value = by_period.get(compare_period, 0.0) + if compare_value <= 0: + return {"status": "DATA_MISSING", "sector": sector, "note": "비교월 수출액이 0 이하"} + + trend_pct = round((latest_value / compare_value - 1.0) * 100.0, 4) + return { + "status": "OK", + "sector": sector, + "compare": compare, + "latest_period": latest_period, + "compare_period": compare_period, + "sector_export_trend": trend_pct, + } + + +def main() -> int: + ap = argparse.ArgumentParser(description=__doc__) + ap.add_argument("--csv", type=Path, help="관세청/산업통상부 배포 수출입 CSV 경로(권장 경로)") + ap.add_argument("--sector", default="반도체", choices=list(SECTOR_HS_MAP)) + ap.add_argument("--compare", default="yoy", choices=["yoy", "mom"]) + ap.add_argument("--api-key", default=os.environ.get("CUSTOMS_API_KEY")) + ap.add_argument("--hs-code", default="", help="API 경로 사용 시 HS코드") + ap.add_argument("--start-ym", default="") + ap.add_argument("--end-ym", default="") + args = ap.parse_args() + + if args.csv: + rows = load_trade_statistics_csv(args.csv) + result = compute_sector_export_trend(rows, args.sector, args.compare) + else: + session = requests.Session() + result = fetch_customs_trade_api(session, args.api_key, args.hs_code, args.start_ym, args.end_ym) + + print(json.dumps(result, ensure_ascii=False, indent=2)) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tools/validate_qualitative_sell_strategy_pipeline_v1.py b/tools/validate_qualitative_sell_strategy_pipeline_v1.py new file mode 100644 index 0000000..babd5fc --- /dev/null +++ b/tools/validate_qualitative_sell_strategy_pipeline_v1.py @@ -0,0 +1,56 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import json +from pathlib import Path + +ROOT = Path(__file__).resolve().parents[1] + + +def _read(path: Path) -> str: + return path.read_text(encoding="utf-8", errors="replace") if path.exists() else "" + + +def main() -> int: + files = { + "workflow": ROOT / ".gitea" / "workflows" / "qualitative_sell_strategy.yml", + "build_inputs": ROOT / "tools" / "build_qualitative_sell_inputs_v1.py", + "build_satellite": ROOT / "tools" / "build_satellite_candidate_recommendations_v1.py", + "evaluate": ROOT / "tools" / "evaluate_qualitative_sell_strategy_accuracy_v1.py", + "store": ROOT / "src" / "quant_engine" / "qualitative_sell_strategy_store_v1.py", + "package": ROOT / "package.json", + } + errors: list[str] = [] + + for name, path in files.items(): + if not path.exists(): + errors.append(f"missing:{name}") + + checks = { + "build_inputs_flags": ("--store-backend" in _read(files["build_inputs"]) and "--store-location" in _read(files["build_inputs"])), + "build_satellite_flags": ("--store-backend" in _read(files["build_satellite"]) and "--store-location" in _read(files["build_satellite"])), + "evaluate_flags": ("--store-backend" in _read(files["evaluate"]) and "--store-location" in _read(files["evaluate"])), + "store_contract": ("resolve_store_path" in _read(files["store"]) and "QualitativeSellStoreSpec" in _read(files["store"])), + "workflow_mentions_mock_validation": ("validate_kis_api_credentials_v1.py" in _read(files["workflow"])), + "workflow_has_schedule": ("schedule:" in _read(files["workflow"]) and "workflow_dispatch:" in _read(files["workflow"])), + "package_scripts": ("ops:sell-build" in _read(files["package"]) and "ops:sell-eval" in _read(files["package"]) and "ops:sell-validate" in _read(files["package"])), + } + + for key, ok in checks.items(): + if not ok: + errors.append(key) + + result = { + "formula_id": "QUALITATIVE_SELL_STRATEGY_PIPELINE_V1", + "gate": "PASS" if not errors else "FAIL", + "checks": checks, + "errors": errors, + } + out = ROOT / "Temp" / "qualitative_sell_strategy_pipeline_v1.json" + out.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8") + print(json.dumps(result, ensure_ascii=False, indent=2)) + return 0 if not errors else 1 + + +if __name__ == "__main__": + raise SystemExit(main())