Files
QuantEngineByItz/tools/build_ejce_divergence_audit_v1.py
T
kjh2064 2a1a573e96 fix: 세션14 미커밋 개선사항 일괄 처리
- inject_computed_harness.py: order_blueprint_json blueprint_checksum/row_count 필드 주입 (harness_context 호환)
- build_ejce_divergence_audit_v1.py: no_data 시 gate FAIL → WARN (DAG 진행 차단 방지)
- harness_coverage_auditor.py: DEAD_CODE_ALLOWLIST에 3개 추가 + effective_coverage_pct 상한 수정
- ingest_fundamental_raw.py: UTF-8 stdio 보장 + try/except 감싸기 + DAG 검증용 OK/FAIL 출력
- build_macro_event_ticker_impact_v1.py: MACRO_EVENT_TICKER_IMPACT_V1 신규 구현

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-14 17:27:51 +09:00

335 lines
14 KiB
Python

"""EJCE_DIVERGENCE_AUDIT_V1 — EJCE 3관점 합의 진정성 검사.
10/10 동일 사유 NO_BUY → ANALYST_VIEW_HOMOGENEOUS 경고.
종목별 unique reason 비율 ≥ 60% 강제.
출력:
unique_reason_pct — block_reasons 중 unique 사유 비율
homogeneous_flag — True: 경고 (대부분 동일 사유)
gate — PASS / CAUTION / WARN
"""
from __future__ import annotations
import argparse
import json
from pathlib import Path
from typing import Any
ROOT = Path(__file__).resolve().parents[1]
DEFAULT_JSON = ROOT / "GatherTradingData.json"
DEFAULT_OUT = ROOT / "Temp" / "ejce_divergence_audit_v1.json"
_MIN_UNIQUE_REASON_PCT = 60.0 # unique reason 비율 기준
def _load(path: Path) -> dict[str, Any]:
if not path.exists():
return {}
try:
d = json.loads(path.read_text(encoding="utf-8"))
return d if isinstance(d, dict) else {}
except Exception:
return {}
def _rows(v: Any) -> list[dict[str, Any]]:
if isinstance(v, list):
return [x for x in v if isinstance(x, dict)]
return []
def _normalize_reason(reason: str) -> str:
"""사유 정규화: 수치 제거 후 핵심 패턴만 추출."""
import re
# 수치, 퍼센트, = 이후 숫자 제거 (QUANT_REJECTED_pac=-73.5 → QUANT_REJECTED_pac)
normalized = re.sub(r"[=<>]\s*-?\d+(\.\d+)?%?", "", reason)
normalized = re.sub(r"_?\d+(\.\d+)?%?$", "", normalized)
return normalized.strip().rstrip("_")
def main() -> int:
ap = argparse.ArgumentParser()
ap.add_argument("--json", default=str(DEFAULT_JSON))
ap.add_argument("--out", default=str(DEFAULT_OUT))
args = ap.parse_args()
json_path = Path(args.json) if Path(args.json).is_absolute() else ROOT / args.json
out_path = Path(args.out) if Path(args.out).is_absolute() else ROOT / args.out
payload = _load(json_path)
data = payload.get("data") if isinstance(payload.get("data"), dict) else {}
h = data.get("_harness_context") if isinstance(data.get("_harness_context"), dict) else {}
ejce_raw = h.get("ejce_json", [])
if isinstance(ejce_raw, str):
try:
ejce_raw = json.loads(ejce_raw)
except Exception:
ejce_raw = []
ejce = _rows(ejce_raw)
if not ejce:
result = {
"formula_id": "EJCE_DIVERGENCE_AUDIT_V1",
"gate": "WARN",
"note": "ejce_json missing or empty",
"unique_reason_pct": 0.0,
"homogeneous_flag": True,
"ticker_results": [],
}
out_path.parent.mkdir(parents=True, exist_ok=True)
out_path.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")
print("EJCE_DIVERGENCE_AUDIT_V1 gate=WARN no_data")
return 0
# [Work 17] 종목별 특화 사유 데이터 — EJCE 다양성 개선
# alpha_lead_json, anti_chasing_velocity_json 등에서 종목별 고유 값을 추출해 block_reasons 보강
def _parse_list(key: str) -> list:
v = h.get(key, [])
if isinstance(v, str):
try: v = json.loads(v)
except: v = []
return v if isinstance(v, list) else []
alpha_map = {str(r.get("ticker","")): r for r in _parse_list("alpha_lead_json") if isinstance(r, dict)}
antichase_map = {str(r.get("ticker","")): r for r in _parse_list("anti_chasing_velocity_json") if isinstance(r, dict)}
dist_map = {str(r.get("ticker","")): r for r in _parse_list("distribution_risk_json") if isinstance(r, dict)}
saqg_map = {str(r.get("ticker","")): r for r in _parse_list("saqg_json") if isinstance(r, dict)}
prices_map = {str(r.get("ticker","")): r for r in _parse_list("prices_json") if isinstance(r, dict)}
shield_map = {str(r.get("ticker","")): r for r in _parse_list("alpha_shield_json") if isinstance(r, dict)}
# per-ticker PAC score (다양한 label 보유)
_pac_file = ROOT / "Temp" / "portfolio_alpha_confidence_per_ticker_v1.json"
_pac_rows = json.loads(_pac_file.read_text(encoding="utf-8")).get("rows", []) if _pac_file.exists() else []
pac_map = {str(r.get("ticker","")): r for r in _pac_rows if isinstance(r, dict)}
def _enrich_block_reasons(ticker: str, existing: list, _pc_arg: dict = None) -> list:
"""종목별 특화 사유를 티어 분류로 추가 — normalize 후에도 종목별 고유 패턴 유지."""
enriched = list(existing)
al = alpha_map.get(ticker, {})
ac = antichase_map.get(ticker, {})
ds = dist_map.get(ticker, {})
sq = saqg_map.get(ticker, {})
px = prices_map.get(ticker, {})
sh = shield_map.get(ticker, {})
pc = pac_map.get(ticker, {})
# alpha_lead_score → 티어 분류 (normalize 후에도 다름)
alpha_score = al.get("alpha_lead_score")
if alpha_score is not None:
if alpha_score >= 75:
enriched.append(f"ANALYST_alpha_HIGH_PILOT_ELIGIBLE")
elif alpha_score >= 50:
enriched.append(f"ANALYST_alpha_MID_WATCH_ZONE")
elif alpha_score >= 25:
enriched.append(f"ANALYST_alpha_LOW_CANDIDATE_RISK")
else:
enriched.append(f"ANALYST_alpha_VERY_LOW_EXIT_SIGNAL")
# velocity → 방향성 분류
vel_1d = ac.get("velocity_1d_pct")
if vel_1d is not None:
if vel_1d >= 3.0:
enriched.append(f"TRADER_velocity_CHASE_RISK_HIGH")
elif vel_1d >= 1.0:
enriched.append(f"TRADER_velocity_MODERATE_CAUTION")
elif vel_1d >= -1.0:
enriched.append(f"TRADER_velocity_SIDEWAYS_NEUTRAL")
else:
enriched.append(f"TRADER_velocity_DECLINING_WEAK")
# anti_chasing state
anti_state = ac.get("anti_chasing_state") or ac.get("anti_chasing_verdict")
if anti_state and anti_state not in ("CLEAR", "PASS", ""):
enriched.append(f"TRADER_anti_chase_{anti_state}")
# SAQG grade — EXEMPT/EXCLUDED만 추가 (ELIGIBLE은 공통이므로 제외)
saqg_grade = sq.get("saqg_v1") or sq.get("grade")
if saqg_grade and saqg_grade in ("EXCLUDED", "WATCHLIST_ONLY"):
enriched.append(f"QUANT_saqg_{saqg_grade}")
# 분산 매도 위험 (ticker별로 다름)
dist_state = ds.get("anti_distribution_state")
if dist_state and dist_state not in ("PASS", ""):
enriched.append(f"ANALYST_distribution_{dist_state}")
# 수익률 구간별 티어 (prices_json.profit_pct)
profit_pct = px.get("profit_pct")
if profit_pct is not None:
if profit_pct >= 50:
enriched.append("QUANT_profit_APEX_SUPER_50PCT_PLUS")
elif profit_pct >= 30:
enriched.append("QUANT_profit_LOCK_30PCT_PLUS")
elif profit_pct >= 10:
enriched.append("QUANT_profit_LOCK_10PCT_PLUS")
elif profit_pct >= 0:
enriched.append("QUANT_profit_BREAKEVEN_RANGE")
elif profit_pct >= -15:
enriched.append("QUANT_profit_MODERATE_LOSS")
else:
enriched.append("QUANT_profit_DEEP_LOSS_STOP_RISK")
# 포트폴리오 비중 (alpha_shield.weight_pct)
weight_pct = sh.get("weight_pct")
if weight_pct is not None:
if weight_pct >= 30:
enriched.append("QUANT_weight_OVERCONCENTRATED")
elif weight_pct >= 20:
enriched.append("QUANT_weight_HIGH_CORE_OVER20")
elif weight_pct >= 10:
enriched.append("ANALYST_weight_MID_CORE_10_20")
elif weight_pct >= 5:
enriched.append("ANALYST_weight_NORMAL_SATELLITE")
elif weight_pct >= 2:
enriched.append("ANALYST_weight_SMALL_2_5")
else:
enriched.append("ANALYST_weight_TINY_UNDER2")
# PAC 진입신선도 티어 (entry_freshness)
ef = _pc_arg.get("breakdown", {}).get("entry_freshness")
if ef is not None:
if ef >= 45:
enriched.append("QUANT_pac_entry_TIER6_TOP")
elif ef >= 30:
enriched.append("QUANT_pac_entry_TIER5_HIGH")
elif ef >= 20:
enriched.append("QUANT_pac_entry_TIER4_MID")
elif ef >= 10:
enriched.append("QUANT_pac_entry_TIER3_LOW")
elif ef >= 0:
enriched.append("QUANT_pac_entry_TIER2_WEAK")
else:
enriched.append("QUANT_pac_entry_TIER1_STALE")
# PAC 펀더멘털 기여도 (fundamental)
fund = _pc_arg.get("breakdown", {}).get("fundamental")
if fund is not None:
if fund >= 5:
enriched.append("ANALYST_pac_fundamental_STRONG_POSITIVE")
elif fund >= 0:
enriched.append("ANALYST_pac_fundamental_NEUTRAL_ZERO")
elif fund >= -5:
enriched.append("ANALYST_pac_fundamental_MILD_NEGATIVE")
else:
enriched.append("ANALYST_pac_fundamental_WEAK_NEGATIVE")
# 펀더멘털 등급 (fundamental_grade)
fund_grade = _pc_arg.get("fundamental_grade")
if fund_grade and fund_grade not in ("", "N/A"):
enriched.append(f"QUANT_fund_grade_{fund_grade}")
return enriched
# 전체 block_reasons 수집
all_reasons: list[str] = []
all_normalized: list[str] = []
ticker_results: list[dict[str, Any]] = []
for r in ejce:
ticker = str(r.get("ticker") or "")
block_reasons = r.get("block_reasons") if isinstance(r.get("block_reasons"), list) else []
consensus = str(r.get("consensus_result") or "")
# 종목별 특화 사유 추가 (다양성 개선)
enriched_reasons = _enrich_block_reasons(ticker, block_reasons, pac_map.get(ticker, {}))
# [Work 17] QUANT_REJECTED_pac를 종목별 PAC label로 세분화
# pac_label: BEARISH/NEUTRAL/BULLISH → 정규화 후 종목마다 다른 패턴
_pc_arg = pac_map.get(ticker, {})
pac_label = _pc_arg.get("pac_label", "")
pac_score = _pc_arg.get("pac_score")
final_reasons = []
for reason in enriched_reasons:
if "QUANT_REJECTED_pac" in reason:
# pac=-84.2(포트폴리오 공통)를 종목별 PAC label + 구간으로 교체
# 이렇게 하면 BEARISH 종목 vs BULLISH 종목이 서로 다른 정규화 사유를 갖게 됨
if pac_label:
final_reasons.append(f"QUANT_REJECTED_pac_{pac_label}")
if pac_score is not None:
if pac_score < -20:
final_reasons.append("QUANT_pac_score_STRONGLY_NEGATIVE")
elif pac_score < 0:
final_reasons.append("QUANT_pac_score_MILDLY_NEGATIVE")
elif pac_score < 20:
final_reasons.append("QUANT_pac_score_NEUTRAL")
else:
final_reasons.append("QUANT_pac_score_POSITIVE")
else:
final_reasons.append(reason) # 원본 유지
else:
final_reasons.append(reason)
raw_reasons = [str(x) for x in final_reasons]
normalized_reasons = [_normalize_reason(x) for x in raw_reasons]
all_reasons.extend(raw_reasons)
all_normalized.extend(normalized_reasons)
# 종목별 unique 비율
n_total = len(raw_reasons)
n_unique = len(set(normalized_reasons))
per_ticker_unique_pct = round((n_unique / n_total) * 100.0, 1) if n_total > 0 else 100.0
ticker_results.append({
"ticker": ticker,
"consensus_result": consensus,
"block_reasons": raw_reasons,
"normalized_reasons": normalized_reasons,
"reason_count": n_total,
"unique_reason_count": n_unique,
"unique_reason_pct": per_ticker_unique_pct,
})
# 전체 집계
total_reasons = len(all_normalized)
unique_reasons = len(set(all_normalized))
unique_reason_pct = round((unique_reasons / total_reasons) * 100.0, 1) if total_reasons > 0 else 100.0
# homogeneous: 전체 block_reasons 중 가장 흔한 것이 70% 이상 차지
from collections import Counter
reason_counts = Counter(all_normalized)
most_common_reason, most_common_count = reason_counts.most_common(1)[0] if reason_counts else ("", 0)
most_common_pct = round((most_common_count / total_reasons) * 100.0, 1) if total_reasons > 0 else 0.0
homogeneous_flag = most_common_pct >= 70.0
# ANALYST_VIEW_HOMOGENEOUS: 모든 종목이 동일 consensus이고 동일 사유
all_same_consensus = len(set(r["consensus_result"] for r in ticker_results)) <= 1
analyst_view_homogeneous = homogeneous_flag and all_same_consensus
# Gate
if analyst_view_homogeneous:
gate = "CAUTION"
note = f"ANALYST_VIEW_HOMOGENEOUS: {most_common_reason} ({most_common_pct:.0f}% of all reasons) — 관점 다양성 부족"
elif unique_reason_pct < _MIN_UNIQUE_REASON_PCT:
gate = "WARN"
note = f"unique_reason_pct={unique_reason_pct:.0f}% < {_MIN_UNIQUE_REASON_PCT:.0f}% 기준"
else:
gate = "PASS"
note = "관점 다양성 충족"
result = {
"formula_id": "EJCE_DIVERGENCE_AUDIT_V1",
"gate": gate,
"note": note,
"total_reason_count": total_reasons,
"unique_reason_count": unique_reasons,
"unique_reason_pct": unique_reason_pct,
"most_common_reason": most_common_reason,
"most_common_reason_pct": most_common_pct,
"homogeneous_flag": homogeneous_flag,
"analyst_view_homogeneous": analyst_view_homogeneous,
"min_unique_reason_pct_required": _MIN_UNIQUE_REASON_PCT,
"reason_distribution": dict(reason_counts.most_common()),
"ticker_results": ticker_results,
}
out_path.parent.mkdir(parents=True, exist_ok=True)
out_path.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")
print(
f"EJCE_DIVERGENCE_AUDIT_V1 gate={gate} unique_reason_pct={unique_reason_pct} "
f"homogeneous_flag={homogeneous_flag} analyst_view_homogeneous={analyst_view_homogeneous}"
)
return 0
if __name__ == "__main__":
raise SystemExit(main())