diff --git a/tests/parity/test_distribution_risk_parity.py b/tests/parity/test_distribution_risk_parity.py new file mode 100644 index 0000000..e999deb --- /dev/null +++ b/tests/parity/test_distribution_risk_parity.py @@ -0,0 +1,83 @@ +from __future__ import annotations + +import sys +import unittest +from pathlib import Path + +ROOT = Path(__file__).resolve().parents[2] +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) + +from tools.build_distribution_risk_score_v2 import calculate_distribution_risk + + +class TestDistributionRiskParity(unittest.TestCase): + + def test_distribution_risk_parity_scenarios(self): + # Scenario 1: Smart Money Outflow only + row_1 = { + "close": 10000, + "ma20": 10000, + "frg_5d": -100, + "inst_5d": -200, + } + res_1 = calculate_distribution_risk(row_1, kospi_ret_5d=0.0) + self.assertEqual(res_1["distribution_risk_score"], 30) + self.assertIn("smart_money_outflow", res_1["reason_codes"]) + self.assertEqual(res_1["anti_distribution_state"], "PASS") + + # Scenario 2: High upper wick and low flow credit under priceAboveMa20 + row_2 = { + "close": 12000, + "ma20": 10000, # priceAboveMa20 = True + "high": 15000, + "low": 10000, + # upperWickRatio = (15000-12000)/5000 = 3000/5000 = 0.60 >= 0.45 + "flow_credit": 0.35, # flow_credit < 0.40 + } + res_2 = calculate_distribution_risk(row_2, kospi_ret_5d=0.0) + self.assertIn("upper_wick_distribution", res_2["reason_codes"]) + self.assertIn("flow_credit_low", res_2["reason_codes"]) + # score = 15 (upper wick) + 20 (flow credit low) = 35 + self.assertEqual(res_2["distribution_risk_score"], 35) + + # Scenario 3: Trim Review threshold (score >= 55) + row_3 = { + "close": 10000, + "ma20": 10000, + "frg_5d": -100, + "inst_5d": -200, # +30 + "flow_credit": 0.30, # +20 + "volume": 70, + "avg_volume_5d": 100, # volume < 80% of avg_vol_5d -> +20 + } + res_3 = calculate_distribution_risk(row_3, kospi_ret_5d=0.0) + # score = 30 + 20 + 20 = 70 (BLOCK_BUY) + self.assertEqual(res_3["distribution_risk_score"], 70) + self.assertEqual(res_3["anti_distribution_state"], "BLOCK_BUY") + + def test_distribution_risk_early_warning_signals(self): + # Early warning signal 1: New high volume contraction + row_4 = { + "close": 9800, + "high_52w": 10000, # close >= 97% of 52w high -> nearNewHigh = True + "volume": 70, + "avg_volume_5d": 100, # volume < 80% -> +12 + } + res_4 = calculate_distribution_risk(row_4, kospi_ret_5d=0.0) + self.assertIn("new_high_volume_contraction", res_4["reason_codes"]) + self.assertEqual(res_4["pre_distribution_warning"], "EARLY_WARNING") + + # Early warning signal 2: Surge weak flow + row_5 = { + "close": 10000, + "ret_5d": 6.0, # ret5d >= 5 + "flow_credit": 0.40, # flow_credit < 0.45 -> +10 + } + res_5 = calculate_distribution_risk(row_5, kospi_ret_5d=0.0) + self.assertIn("surge_weak_flow", res_5["reason_codes"]) + self.assertEqual(res_5["pre_distribution_warning"], "EARLY_WARNING") + + +if __name__ == "__main__": + unittest.main() diff --git a/tools/build_distribution_risk_score_v2.py b/tools/build_distribution_risk_score_v2.py index 59111d7..05cb649 100644 --- a/tools/build_distribution_risk_score_v2.py +++ b/tools/build_distribution_risk_score_v2.py @@ -6,6 +6,93 @@ from pathlib import Path ROOT = Path(__file__).resolve().parents[1] +def calculate_distribution_risk(row: dict, kospi_ret_5d: float) -> dict: + close = float(row.get("Close") or row.get("close") or 0.0) + ma20 = float(row.get("MA20") or row.get("ma20") or 0.0) + high = float(row.get("High") or row.get("high") or close) + low = float(row.get("Low") or row.get("low") or close) + volume = row.get("Volume") or row.get("volume") + avg_vol_5d = row.get("AvgVolume5d") or row.get("avg_volume_5d") or row.get("Avg_Volume_5D") + flow_credit = row.get("FlowCredit") or row.get("flow_credit") or row.get("Flow_Credit") + + # Coerce to float if valid + volume = float(volume) if volume not in (None, "") else None + avg_vol_5d = float(avg_vol_5d) if avg_vol_5d not in (None, "") else None + flow_credit = float(flow_credit) if flow_credit not in (None, "") else None + + price_above_ma20 = close > 0 and ma20 > 0 and close > ma20 + + score = 0 + reasons = [] + + frg5d = row.get("Frg5D") or row.get("frg_5d") or row.get("Frg_5D") + inst5d = row.get("Inst5D") or row.get("inst_5d") or row.get("Inst_5D") + frg5d = float(frg5d) if frg5d not in (None, "") else None + inst5d = float(inst5d) if inst5d not in (None, "") else None + + if frg5d is not None and inst5d is not None and frg5d < 0 and inst5d < 0: + score += 30 + reasons.append("smart_money_outflow") + + if volume is not None and avg_vol_5d is not None and avg_vol_5d > 0 and volume < avg_vol_5d * 0.80: + score += 20 + reasons.append("volume_fade_after_surge") + + if high > low and close > 0: + upper_wick_ratio = (high - close) / max(high - low, 1.0) + if upper_wick_ratio >= 0.45 and price_above_ma20: + score += 15 + reasons.append("upper_wick_distribution") + + if flow_credit is not None and flow_credit < 0.40: + score += 20 + reasons.append("flow_credit_low") + + ret5d = row.get("Ret5D") or row.get("ret_5d") or row.get("Ret_5D") + ret5d = float(ret5d) if ret5d not in (None, "") else None + if ret5d is not None and kospi_ret_5d is not None and ret5d < kospi_ret_5d - 3: + score += 15 + reasons.append("sector_relative_lag") + + ac_gate = row.get("acGate") or row.get("ac_gate") + if ac_gate and "CLIMAX" in str(ac_gate).upper(): + score += 15 + reasons.append("anti_climax_gate") + + ac_total = row.get("acTotal") or row.get("ac_total") + ac_total = float(ac_total) if ac_total not in (None, "") else None + if ac_total is not None and ac_total >= 2: + score += 10 + reasons.append("ac_total_gte2") + + val_surge_pct = row.get("valSurgePct") or row.get("val_surge_pct") + val_surge_pct = float(val_surge_pct) if val_surge_pct not in (None, "") else None + if val_surge_pct is not None and val_surge_pct >= 40 and price_above_ma20 and (flow_credit is None or flow_credit < 0.50): + score += 10 + reasons.append("val_surge_no_flow_support") + + high52w = row.get("high52w") or row.get("high_52w") or row.get("High_52W") + high52w = float(high52w) if high52w not in (None, "") and float(high52w) > 0 else None + near_new_high = (high52w is not None and close > 0 and close >= high52w * 0.97) or (ma20 > 0 and close > ma20 * 1.15) + if near_new_high and volume is not None and avg_vol_5d is not None and avg_vol_5d > 0 and volume < avg_vol_5d * 0.80: + score += 12 + reasons.append("new_high_volume_contraction") + + if ret5d is not None and ret5d >= 5 and flow_credit is not None and flow_credit < 0.45: + score += 10 + reasons.append("surge_weak_flow") + + final_score = min(100, max(0, score)) + state = "BLOCK_BUY" if final_score >= 70 else "TRIM_REVIEW" if final_score >= 55 else "PASS" + pre_dist_warning = "EARLY_WARNING" if ("new_high_volume_contraction" in reasons or "surge_weak_flow" in reasons) else "NONE" + + return { + "distribution_risk_score": final_score, + "anti_distribution_state": state, + "pre_distribution_warning": pre_dist_warning, + "reason_codes": reasons + } + def main(): parser = argparse.ArgumentParser() parser.add_argument("--json", default="GatherTradingData.json") @@ -18,22 +105,23 @@ def main(): sys.exit(1) raw = json.loads(json_path.read_text(encoding="utf-8")) - core_satellite = raw.get("data", {}).get("core_satellite", []) or [] + data_feed = raw.get("data", {}).get("data_feed", []) or [] + + # Find KOSPI ret_5d if present in macro to align with kospiRet5d + macro = raw.get("data", {}).get("macro", []) or [] + kospi_ret_5d = 0.0 + for m in macro: + if str(m.get("Ticker") or m.get("ticker")).strip() == "KOSPI": + kospi_ret_5d = float(m.get("Ret5D") or m.get("ret_5d") or 0.0) + break scores = {} - for row in core_satellite: - ticker = row.get("Ticker") + for row in data_feed: + ticker = row.get("Ticker") or row.get("ticker") if not ticker: continue - close = row.get("Close") or 0.0 - ma20 = row.get("MA20") or close - - # Calculate distribution risk score: 0 to 100 - score = min(100, max(0, int((close - ma20) / ma20 * 200))) - scores[ticker] = { - "distribution_risk_score": score, - "status": "HIGH" if score >= 50 else "NORMAL" - } + res = calculate_distribution_risk(row, kospi_ret_5d) + scores[ticker] = res out_path = ROOT / args.out out_path.parent.mkdir(parents=True, exist_ok=True)