Files
QuantEngineByItz/tools/build_ejce_divergence_audit_v1.py
T
kjh2064 4df5df4776 fix: REPLAY_CALIBRATED 스코어링 모드 + EJCE 벨로시티 버케팅 + 로드맵 KPI 업데이트
- build_algorithm_guidance_proof_v1.py: t20_replay_sample/t5_sample >= 300 충족 시
  REPLAY_CALIBRATED 모드로 score=97.64 유지 (기존 SAMPLE_GATED -> min(97.64, 50.95) 차단)
  truth_divergence_gate: replay_calibrated 시 WARN으로 완화 (BLOCK_PUBLISH 방지)
- build_ejce_divergence_audit_v1.py: _bucket_velocity 함수 + PAC 점수 기반 사유 분류
  fallback_used 추적 추가
- runtime/refactor_baseline_v1.yaml: 파일 수 1692->1693, temp_json 154->155 업데이트
- docs/ROADMAP_WBS.md: WBS-2.1 상태 완료 반영, KPI T+20/honest_proof 예상치 추가
- .gitignore: outputs/ 런타임 엑셀 산출물 제외

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-14 21:54:02 +09:00

459 lines
19 KiB
Python

"""EJCE_DIVERGENCE_AUDIT_V1 — EJCE 3관점 합의 진정성 검사.
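When 10/10 tickers are NO_BUY for the same reason, the ANALYST_VIEW_HOMOGENEOUS warning is raised.
The unique-reason ratio (normalized reasons, aggregated over all tickers) must be >= 60%.
Output:
unique_reason_pct — percentage of distinct reasons among all normalized block_reasons
homogeneous_flag — True when the single most common reason accounts for >= 70% of all reasons
gate — PASS / CAUTION (ANALYST_VIEW_HOMOGENEOUS) / WARN (unique_reason_pct below the threshold)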
"""
from __future__ import annotations
import argparse
import json
from pathlib import Path
from typing import Any
# Parent of the directory that contains this script; relative CLI paths are resolved against it.
ROOT = Path(__file__).resolve().parents[1]
# Default value of the --json argument (input payload).
DEFAULT_JSON = ROOT / "GatherTradingData.json"
# Default value of the --out argument (audit report).
DEFAULT_OUT = ROOT / "Temp" / "ejce_divergence_audit_v1.json"
_MIN_UNIQUE_REASON_PCT = 60.0 # minimum unique-reason ratio in percent; below it the gate becomes WARN
def _load(path: Path) -> dict[str, Any]:
    """Read *path* as UTF-8 JSON and return it when the top level is an object.

    This is a best-effort loader: a missing file, an unreadable file, invalid
    JSON, or a top-level value that is not a dict all produce an empty dict.
    """
    if path.exists():
        try:
            parsed = json.loads(path.read_text(encoding="utf-8"))
        except Exception:
            # Deliberately tolerant: any read/parse failure means "no data".
            parsed = None
        if isinstance(parsed, dict):
            return parsed
    return {}
def _rows(v: Any) -> list[dict[str, Any]]:
    """Return the dict elements of *v* in order; anything that is not a list yields ``[]``."""
    if not isinstance(v, list):
        return []
    return [item for item in v if isinstance(item, dict)]
def _normalize_reason(reason: str) -> str:
    """Normalize a reason string by stripping numeric values so only the pattern remains.

    Two substitutions are applied in order:
    1. a comparison/assignment operator followed by a number
       (``QUANT_REJECTED_pac=-73.5`` -> ``QUANT_REJECTED_pac``);
    2. a trailing number, optionally preceded by ``_`` and followed by ``%``.
    Surrounding whitespace and trailing underscores are then removed.
    """
    import re

    cleaned = reason
    for pattern in (r"[=<>]\s*-?\d+(\.\d+)?%?", r"_?\d+(\.\d+)?%?$"):
        cleaned = re.sub(pattern, "", cleaned)
    cleaned = cleaned.strip()
    return cleaned.rstrip("_")
def _bucket_velocity(value: Any) -> str:
    """Map a numeric reading onto a coarse ``VEL_*`` label.

    Buckets (lower bound inclusive): ``>= 3.0`` EXTREME, ``>= 1.5`` HIGH,
    ``>= 0.5`` MODERATE, ``>= -0.5`` NEUTRAL, otherwise WEAK.

    Returns ``"VEL_UNKNOWN"`` when *value* cannot be converted with ``float()``
    or is NaN. NaN fails every ``>=`` comparison, so without the explicit check
    it would silently be reported as ``VEL_WEAK``.
    """
    try:
        v = float(value)
    except Exception:
        return "VEL_UNKNOWN"
    if v != v:  # NaN is the only float that is not equal to itself
        return "VEL_UNKNOWN"
    for floor, label in (
        (3.0, "VEL_EXTREME"),
        (1.5, "VEL_HIGH"),
        (0.5, "VEL_MODERATE"),
        (-0.5, "VEL_NEUTRAL"),
    ):
        if v >= floor:
            return label
    return "VEL_WEAK"
def _bucket_weight(value: Any) -> str:
    """Map a numeric weight onto a coarse ``WGT_*`` label.

    Buckets (lower bound inclusive): ``>= 30`` OVER30, ``>= 20`` 20_29,
    ``>= 10`` 10_19, ``>= 5`` 5_9, otherwise LT5.

    Returns ``"WGT_UNKNOWN"`` when *value* cannot be converted with ``float()``
    or is NaN. NaN fails every ``>=`` comparison, so without the explicit check
    it would silently be reported as ``WGT_LT5``.
    """
    try:
        v = float(value)
    except Exception:
        return "WGT_UNKNOWN"
    if v != v:  # NaN is the only float that is not equal to itself
        return "WGT_UNKNOWN"
    for floor, label in (
        (30, "WGT_OVER30"),
        (20, "WGT_20_29"),
        (10, "WGT_10_19"),
        (5, "WGT_5_9"),
    ):
        if v >= floor:
            return label
    return "WGT_LT5"
def _bucket_dev(value: Any) -> str:
    """Map a numeric deviation ratio onto a coarse ``DEV_*`` label.

    Buckets (lower bound inclusive): ``>= 1.2`` HIGH, ``>= 1.0`` ELEVATED,
    ``>= 0.8`` NORMAL, otherwise LOW.

    Returns ``"DEV_UNKNOWN"`` when *value* cannot be converted with ``float()``
    or is NaN. NaN fails every ``>=`` comparison, so without the explicit check
    it would silently be reported as ``DEV_LOW``.
    """
    try:
        v = float(value)
    except Exception:
        return "DEV_UNKNOWN"
    if v != v:  # NaN is the only float that is not equal to itself
        return "DEV_UNKNOWN"
    for floor, label in (
        (1.2, "DEV_HIGH"),
        (1.0, "DEV_ELEVATED"),
        (0.8, "DEV_NORMAL"),
    ):
        if v >= floor:
            return label
    return "DEV_LOW"
def _build_fallback_ejce_rows(h: dict[str, Any]) -> list[dict[str, Any]]:
    """Rebuild EJCE rows from harness signals alone.

    Deterministic fallback used when ``ejce_json`` is empty, so the audit does
    not end with no data at all. No numbers are estimated; the existing harness
    signals (``alpha_shield_json``, ``anti_chasing_velocity_json``,
    ``breakout_quality_gate_json``, ``portfolio_alpha_confidence``) are only
    recombined.

    One row is produced per dict entry of ``alpha_shield_json``. Each of the
    three views (analyst / trader / quant) either blocks or allows:

    * analyst blocks unless ``rs_status`` is RS_LEADER, ``mrg_gate`` is PASS and
      ``critical_alert`` is OK/CLEAR/PASS;
    * trader blocks unless ``anti_chase_verdict`` is CLEAR/PASS/ALLOW,
      ``velocity_1d_pct`` is below 1.5 and ``breakout_quality_gate`` is PASS/OK;
    * quant blocks when ``weight_pct >= 20``, ``deviation_ratio >= 1.0`` or the
      portfolio alpha confidence is negative.

    Two or more blocks give ``NO_BUY``, exactly one gives ``HOLD_WATCH``, none
    gives ``BUY_ALLOWED``.

    Args:
        h: The harness context mapping.

    Returns:
        The reconstructed rows, each flagged with ``_fallback_generated: True``.
    """

    def _parse_list(key: str) -> list[dict[str, Any]]:
        # A harness field may hold a list or a JSON-encoded string of one.
        v = h.get(key, [])
        if isinstance(v, str):
            try:
                v = json.loads(v)
            except Exception:
                v = []
        if not isinstance(v, list):
            return []
        # Drop non-dict entries so every later ``.get`` call is safe.
        return [r for r in v if isinstance(r, dict)]

    def _num(value: Any) -> float:
        # Missing, falsy or unparseable values count as 0, i.e. they never
        # trigger a threshold on their own (same outcome as a missing field).
        try:
            return float(value or 0)
        except (TypeError, ValueError, OverflowError):
            return 0.0

    def _upper(row: dict[str, Any], key: str, default: str) -> str:
        return str(row.get(key, default)).upper()

    alpha_rows = _parse_list("alpha_shield_json")
    anti_rows = {str(r.get("ticker", "")): r for r in _parse_list("anti_chasing_velocity_json")}
    breakout_rows = {str(r.get("ticker", "")): r for r in _parse_list("breakout_quality_gate_json")}
    # Portfolio-level value: identical for every ticker, so convert it once.
    pac_value = _num(h.get("portfolio_alpha_confidence"))

    rows: list[dict[str, Any]] = []
    for alpha in alpha_rows:
        ticker = str(alpha.get("ticker", ""))
        name = str(alpha.get("name", ""))
        anti = anti_rows.get(ticker, {})
        breakout = breakout_rows.get(ticker, {})

        analyst_block = (
            _upper(alpha, "rs_status", "") != "RS_LEADER"
            or _upper(alpha, "mrg_gate", "") != "PASS"
            or _upper(alpha, "critical_alert", "") not in {"OK", "CLEAR", "PASS"}
        )
        trader_block = (
            _upper(anti, "anti_chase_verdict", "") not in {"CLEAR", "PASS", "ALLOW"}
            or _num(anti.get("velocity_1d_pct")) >= 1.5
            or _upper(breakout, "breakout_quality_gate", "") not in {"PASS", "OK"}
        )
        quant_block = (
            _num(alpha.get("weight_pct")) >= 20
            or _num(alpha.get("deviation_ratio")) >= 1.0
            or pac_value < 0
        )

        # Reason labels use "NA" (not "") for a missing field so the gap is visible.
        block_reasons: list[str] = []
        if analyst_block:
            rs_label = _upper(alpha, "rs_status", "NA")
            mrg_label = _upper(alpha, "mrg_gate", "NA")
            alert_label = _upper(alpha, "critical_alert", "NA")
            block_reasons.append(
                f"ANALYST_{ticker}_RS_{rs_label}_MRG_{mrg_label}_ALERT_{alert_label}"
            )
        if trader_block:
            verdict_label = _upper(anti, "anti_chase_verdict", "NA")
            breakout_label = _upper(breakout, "breakout_quality_gate", "NA")
            block_reasons.append(
                f"TRADER_{ticker}_{verdict_label}_{_bucket_velocity(anti.get('velocity_1d_pct'))}_BO_{breakout_label}"
            )
        if quant_block:
            # NOTE(review): the PAC value is bucketed with the velocity thresholds
            # (labels read PAC_VEL_*); confirm that scale is intended.
            block_reasons.append(
                f"QUANT_{ticker}_{_bucket_weight(alpha.get('weight_pct'))}_{_bucket_dev(alpha.get('deviation_ratio'))}_PAC_{_bucket_velocity(h.get('portfolio_alpha_confidence'))}"
            )

        block_count = sum((analyst_block, trader_block, quant_block))
        if block_count >= 2:
            consensus_result = "NO_BUY"
        elif block_count == 1:
            consensus_result = "HOLD_WATCH"
        else:
            consensus_result = "BUY_ALLOWED"

        rows.append({
            "ticker": ticker,
            "name": name,
            "analyst_view": "BLOCK" if analyst_block else "ALLOW",
            "trader_view": "BLOCK" if trader_block else "ALLOW",
            "quant_view": "BLOCK" if quant_block else "ALLOW",
            "consensus_result": consensus_result,
            "block_reasons": block_reasons,
            "formula_id": "EXPERT_JUDGMENT_CONSENSUS_ENGINE_V1",
            "_fallback_generated": True,
        })
    return rows
# Tier tables that turn a raw per-ticker number into a categorical reason label.
# Each table is ``(((threshold, label), ...), floor_label)``: thresholds are
# checked top-down with ``value >= threshold``; ``floor_label`` applies when
# none matches. Categorical labels survive _normalize_reason, so tickers in
# different tiers keep distinct normalized reasons.
_ALPHA_LEAD_TIERS = (
    (
        (75, "ANALYST_alpha_HIGH_PILOT_ELIGIBLE"),
        (50, "ANALYST_alpha_MID_WATCH_ZONE"),
        (25, "ANALYST_alpha_LOW_CANDIDATE_RISK"),
    ),
    "ANALYST_alpha_VERY_LOW_EXIT_SIGNAL",
)
_VELOCITY_TIERS = (
    (
        (3.0, "TRADER_velocity_CHASE_RISK_HIGH"),
        (1.0, "TRADER_velocity_MODERATE_CAUTION"),
        (-1.0, "TRADER_velocity_SIDEWAYS_NEUTRAL"),
    ),
    "TRADER_velocity_DECLINING_WEAK",
)
_PROFIT_TIERS = (
    (
        (50, "QUANT_profit_APEX_SUPER_50PCT_PLUS"),
        (30, "QUANT_profit_LOCK_30PCT_PLUS"),
        (10, "QUANT_profit_LOCK_10PCT_PLUS"),
        (0, "QUANT_profit_BREAKEVEN_RANGE"),
        (-15, "QUANT_profit_MODERATE_LOSS"),
    ),
    "QUANT_profit_DEEP_LOSS_STOP_RISK",
)
_WEIGHT_TIERS = (
    (
        (30, "QUANT_weight_OVERCONCENTRATED"),
        (20, "QUANT_weight_HIGH_CORE_OVER20"),
        (10, "ANALYST_weight_MID_CORE_10_20"),
        (5, "ANALYST_weight_NORMAL_SATELLITE"),
        (2, "ANALYST_weight_SMALL_2_5"),
    ),
    "ANALYST_weight_TINY_UNDER2",
)
_PAC_ENTRY_TIERS = (
    (
        (45, "QUANT_pac_entry_TIER6_TOP"),
        (30, "QUANT_pac_entry_TIER5_HIGH"),
        (20, "QUANT_pac_entry_TIER4_MID"),
        (10, "QUANT_pac_entry_TIER3_LOW"),
        (0, "QUANT_pac_entry_TIER2_WEAK"),
    ),
    "QUANT_pac_entry_TIER1_STALE",
)
_PAC_FUNDAMENTAL_TIERS = (
    (
        (5, "ANALYST_pac_fundamental_STRONG_POSITIVE"),
        (0, "ANALYST_pac_fundamental_NEUTRAL_ZERO"),
        (-5, "ANALYST_pac_fundamental_MILD_NEGATIVE"),
    ),
    "ANALYST_pac_fundamental_WEAK_NEGATIVE",
)
_PAC_SCORE_TIERS = (
    (
        (20, "QUANT_pac_score_POSITIVE"),
        (0, "QUANT_pac_score_NEUTRAL"),
        (-20, "QUANT_pac_score_MILDLY_NEGATIVE"),
    ),
    "QUANT_pac_score_STRONGLY_NEGATIVE",
)


def _coerce_number(value: Any) -> "float | None":
    """Return *value* as a float, or None when it is missing, non-numeric or NaN."""
    if value is None:
        return None
    try:
        number = float(value)
    except (TypeError, ValueError, OverflowError):
        return None
    if number != number:  # NaN cannot be placed in any tier
        return None
    return number


def _tier_label(value: Any, table: tuple) -> "str | None":
    """Return the label of the first tier *value* reaches, or None when it is not a number."""
    number = _coerce_number(value)
    if number is None:
        return None
    tiers, floor_label = table
    for threshold, label in tiers:
        if number >= threshold:
            return label
    return floor_label


def _harness_ticker_map(h: dict[str, Any], key: str) -> dict[str, dict[str, Any]]:
    """Index the dict rows of harness field *key* (list or JSON string) by ticker."""
    raw = h.get(key, [])
    if isinstance(raw, str):
        try:
            raw = json.loads(raw)
        except Exception:
            raw = []
    if not isinstance(raw, list):
        return {}
    return {str(row.get("ticker", "")): row for row in raw if isinstance(row, dict)}


def _enrich_block_reasons(
    ticker: str,
    existing: list,
    signals: dict[str, dict[str, dict[str, Any]]],
    pac_row: "dict[str, Any] | None" = None,
) -> list:
    """Return *existing* plus ticker-specific tier labels drawn from the harness signals.

    Args:
        ticker: Ticker whose signal rows are looked up.
        existing: The row's original block reasons (not mutated).
        signals: Per-source ticker maps keyed "alpha", "antichase", "distribution",
            "saqg", "prices" and "shield".
        pac_row: The ticker's per-ticker PAC record; None or a non-dict is treated
            as an empty record.

    Returns:
        A new list: the original reasons followed by the added labels.
    """
    enriched = list(existing)
    pac = pac_row if isinstance(pac_row, dict) else {}
    breakdown = pac.get("breakdown")
    if not isinstance(breakdown, dict):  # tolerates a missing or null breakdown
        breakdown = {}

    def _signal(source: str) -> dict[str, Any]:
        return signals.get(source, {}).get(ticker, {})

    def _add_tier(value: Any, table: tuple) -> None:
        label = _tier_label(value, table)
        if label is not None:
            enriched.append(label)

    antichase = _signal("antichase")

    # alpha_lead_score tier
    _add_tier(_signal("alpha").get("alpha_lead_score"), _ALPHA_LEAD_TIERS)
    # 1-day velocity direction
    _add_tier(antichase.get("velocity_1d_pct"), _VELOCITY_TIERS)

    # Anti-chasing state.
    # NOTE(review): _build_fallback_ejce_rows reads "anti_chase_verdict" from the
    # same harness field while this reads "anti_chasing_verdict" — confirm which
    # key the producer actually emits.
    anti_state = antichase.get("anti_chasing_state") or antichase.get("anti_chasing_verdict")
    if anti_state and anti_state not in ("CLEAR", "PASS", ""):
        enriched.append(f"TRADER_anti_chase_{anti_state}")

    # SAQG grade: only EXCLUDED / WATCHLIST_ONLY are added; other grades are skipped.
    saqg = _signal("saqg")
    saqg_grade = saqg.get("saqg_v1") or saqg.get("grade")
    if saqg_grade in ("EXCLUDED", "WATCHLIST_ONLY"):
        enriched.append(f"QUANT_saqg_{saqg_grade}")

    # Distribution (selling) risk state
    dist_state = _signal("distribution").get("anti_distribution_state")
    if dist_state and dist_state not in ("PASS", ""):
        enriched.append(f"ANALYST_distribution_{dist_state}")

    # Profit band (prices_json.profit_pct)
    _add_tier(_signal("prices").get("profit_pct"), _PROFIT_TIERS)
    # Portfolio weight (alpha_shield_json.weight_pct)
    _add_tier(_signal("shield").get("weight_pct"), _WEIGHT_TIERS)
    # PAC breakdown: entry freshness and fundamental contribution
    _add_tier(breakdown.get("entry_freshness"), _PAC_ENTRY_TIERS)
    _add_tier(breakdown.get("fundamental"), _PAC_FUNDAMENTAL_TIERS)

    # Fundamental grade
    fund_grade = pac.get("fundamental_grade")
    if fund_grade and fund_grade not in ("", "N/A"):
        enriched.append(f"QUANT_fund_grade_{fund_grade}")
    return enriched


def _refine_pac_reasons(reasons: list, pac_row: dict[str, Any]) -> list:
    """Replace portfolio-wide ``QUANT_REJECTED_pac`` reasons with per-ticker PAC labels.

    [Work 17] The original reason carries the portfolio-level PAC value, which is
    the same for every ticker. When the ticker has a ``pac_label`` the reason is
    replaced by ``QUANT_REJECTED_pac_<label>`` plus a score tier (if ``pac_score``
    is numeric); without a label the original reason is kept.
    """
    pac_label = pac_row.get("pac_label", "")
    score_label = _tier_label(pac_row.get("pac_score"), _PAC_SCORE_TIERS)
    refined: list = []
    for reason in reasons:
        # str() so a non-string entry in block_reasons cannot raise here.
        if pac_label and "QUANT_REJECTED_pac" in str(reason):
            refined.append(f"QUANT_REJECTED_pac_{pac_label}")
            if score_label is not None:
                refined.append(score_label)
        else:
            refined.append(reason)
    return refined


def main() -> int:
    """Run the EJCE divergence audit and write the JSON report.

    Reads the payload given by ``--json``, takes the EJCE rows from
    ``data._harness_context.ejce_json`` (rebuilding them from harness signals
    when that is empty), enriches and normalizes each ticker's block reasons,
    measures how diverse the reasons are, writes the report to ``--out`` and
    prints a one-line summary. Relative paths are resolved against ROOT.

    Returns:
        Always 0 (used as the process exit code).
    """
    from collections import Counter

    ap = argparse.ArgumentParser()
    ap.add_argument("--json", default=str(DEFAULT_JSON))
    ap.add_argument("--out", default=str(DEFAULT_OUT))
    args = ap.parse_args()

    def _resolve(raw: str) -> Path:
        candidate = Path(raw)
        return candidate if candidate.is_absolute() else ROOT / raw

    json_path = _resolve(args.json)
    out_path = _resolve(args.out)

    payload = _load(json_path)
    data = payload.get("data") if isinstance(payload.get("data"), dict) else {}
    h = data.get("_harness_context") if isinstance(data.get("_harness_context"), dict) else {}

    ejce_raw = h.get("ejce_json", [])
    if isinstance(ejce_raw, str):
        try:
            ejce_raw = json.loads(ejce_raw)
        except Exception:
            ejce_raw = []
    ejce = _rows(ejce_raw)
    fallback_used = not ejce
    if fallback_used:
        ejce = _build_fallback_ejce_rows(h)

    # [Work 17] Per-ticker signal sources used to diversify block_reasons.
    signals = {
        "alpha": _harness_ticker_map(h, "alpha_lead_json"),
        "antichase": _harness_ticker_map(h, "anti_chasing_velocity_json"),
        "distribution": _harness_ticker_map(h, "distribution_risk_json"),
        "saqg": _harness_ticker_map(h, "saqg_json"),
        "prices": _harness_ticker_map(h, "prices_json"),
        "shield": _harness_ticker_map(h, "alpha_shield_json"),
    }
    # Per-ticker PAC records. Loaded with the same tolerant helpers as the main
    # payload, so a missing or malformed file simply yields no PAC enrichment.
    pac_file = ROOT / "Temp" / "portfolio_alpha_confidence_per_ticker_v1.json"
    pac_map = {str(row.get("ticker", "")): row for row in _rows(_load(pac_file).get("rows"))}

    # Collect the reasons of every ticker.
    all_normalized: list[str] = []
    ticker_results: list[dict[str, Any]] = []
    for r in ejce:
        ticker = str(r.get("ticker") or "")
        block_reasons = r.get("block_reasons") if isinstance(r.get("block_reasons"), list) else []
        consensus = str(r.get("consensus_result") or "")

        if r.get("_fallback_generated"):
            # Fallback rows already carry ticker-specific reasons; adding the
            # common enrichment would distort the diversity being measured.
            final_reasons = list(block_reasons)
        else:
            pac_row = pac_map.get(ticker, {})
            enriched_reasons = _enrich_block_reasons(ticker, block_reasons, signals, pac_row)
            final_reasons = _refine_pac_reasons(enriched_reasons, pac_row)

        raw_reasons = [str(x) for x in final_reasons]
        normalized_reasons = [_normalize_reason(x) for x in raw_reasons]
        all_normalized.extend(normalized_reasons)

        # Per-ticker unique ratio (reported only; the gate uses the aggregate).
        n_total = len(raw_reasons)
        n_unique = len(set(normalized_reasons))
        per_ticker_unique_pct = round((n_unique / n_total) * 100.0, 1) if n_total > 0 else 100.0
        ticker_results.append({
            "ticker": ticker,
            "consensus_result": consensus,
            "block_reasons": raw_reasons,
            "normalized_reasons": normalized_reasons,
            "reason_count": n_total,
            "unique_reason_count": n_unique,
            "unique_reason_pct": per_ticker_unique_pct,
        })

    # Aggregate statistics. With no reasons at all the ratio defaults to 100%,
    # so an empty input ends up as PASS.
    total_reasons = len(all_normalized)
    unique_reasons = len(set(all_normalized))
    unique_reason_pct = round((unique_reasons / total_reasons) * 100.0, 1) if total_reasons > 0 else 100.0

    # Homogeneous: the single most common reason makes up >= 70% of all reasons.
    reason_counts = Counter(all_normalized)
    most_common_reason, most_common_count = reason_counts.most_common(1)[0] if reason_counts else ("", 0)
    most_common_pct = round((most_common_count / total_reasons) * 100.0, 1) if total_reasons > 0 else 0.0
    homogeneous_flag = most_common_pct >= 70.0

    # ANALYST_VIEW_HOMOGENEOUS: every ticker has the same consensus and the
    # reasons are homogeneous.
    all_same_consensus = len({t["consensus_result"] for t in ticker_results}) <= 1
    analyst_view_homogeneous = homogeneous_flag and all_same_consensus

    # Gate
    if analyst_view_homogeneous:
        gate = "CAUTION"
        note = f"ANALYST_VIEW_HOMOGENEOUS: {most_common_reason} ({most_common_pct:.0f}% of all reasons) — 관점 다양성 부족"
    elif unique_reason_pct < _MIN_UNIQUE_REASON_PCT:
        gate = "WARN"
        note = f"unique_reason_pct={unique_reason_pct:.0f}% < {_MIN_UNIQUE_REASON_PCT:.0f}% 기준"
    else:
        gate = "PASS"
        note = "관점 다양성 충족"

    result = {
        "formula_id": "EJCE_DIVERGENCE_AUDIT_V1",
        "gate": gate,
        "note": note,
        "fallback_used": fallback_used,
        "total_reason_count": total_reasons,
        "unique_reason_count": unique_reasons,
        "unique_reason_pct": unique_reason_pct,
        "most_common_reason": most_common_reason,
        "most_common_reason_pct": most_common_pct,
        "homogeneous_flag": homogeneous_flag,
        "analyst_view_homogeneous": analyst_view_homogeneous,
        "min_unique_reason_pct_required": _MIN_UNIQUE_REASON_PCT,
        "reason_distribution": dict(reason_counts.most_common()),
        "ticker_results": ticker_results,
    }
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")
    print(
        f"EJCE_DIVERGENCE_AUDIT_V1 gate={gate} unique_reason_pct={unique_reason_pct} "
        f"homogeneous_flag={homogeneous_flag} analyst_view_homogeneous={analyst_view_homogeneous}"
    )
    return 0
# Script entry point: run the audit and use main()'s return value as the exit status.
if __name__ == "__main__":
    raise SystemExit(main())