"""validate_factor_conflict_matrix_v1.py — spec/53: H002_FACTOR_CONFLICT_MATRIX Verifies that every active factor family pair has a defined conflict precedence and that no unresolved conflicts exist in the current decision packet. formula_id: VALIDATE_FACTOR_CONFLICT_MATRIX_V1 contract: spec/53_factor_conflict_matrix.yaml """ from __future__ import annotations import json import sys from pathlib import Path import yaml ROOT = Path(__file__).resolve().parents[1] DEFAULT_TAXONOMY = ROOT / "spec" / "43_quant_factor_taxonomy.yaml" DEFAULT_PACKET = ROOT / "Temp" / "final_decision_packet_active.json" DEFAULT_CONFLICT_SPEC = ROOT / "spec" / "53_factor_conflict_matrix.yaml" OUTPUT_PATH = ROOT / "Temp" / "factor_conflict_matrix_v1.json" # Ordered precedence families (rank 1 = highest) CONFLICT_PRECEDENCE = [ "portfolio_risk_budget", "execution_quality", "entry_timing", "smart_money_liquidity", "fundamental_quality", "market_regime", "exit_value_preservation", ] def _load_json(path: Path) -> dict: if not path.exists(): return {"_missing": True, "_path": str(path)} try: return json.loads(path.read_text(encoding="utf-8")) except Exception as e: return {"_error": str(e), "_path": str(path)} def _load_yaml(path: Path) -> dict: if not path.exists(): return {"_missing": True, "_path": str(path)} try: obj = yaml.safe_load(path.read_text(encoding="utf-8")) return obj if isinstance(obj, dict) else {} except Exception as e: return {"_error": str(e), "_path": str(path)} def _extract_active_families(taxonomy: dict) -> list[str]: families = [] factors = taxonomy.get("factors") or {} if isinstance(factors, dict): for fid, fdata in factors.items(): if isinstance(fdata, dict) and fdata.get("lifecycle_state") == "active": fam = fdata.get("family") or fdata.get("domain") if fam and fam not in families: families.append(fam) return families def _check_precedence_coverage(active_families: list[str]) -> tuple[int, list[str]]: """Count families not covered by the precedence matrix.""" undefined = [f for f in active_families if f not in CONFLICT_PRECEDENCE] return len(undefined), undefined def _detect_conflicts_in_packet(packet: dict) -> list[dict]: """Look for explicit signal contradictions in per_ticker data.""" conflicts = [] per_ticker = packet.get("per_ticker") or {} if not isinstance(per_ticker, dict): return conflicts for ticker, data in per_ticker.items(): if not isinstance(data, dict): continue # Detect buy signal + high heat block simultaneously buy_signal = str(data.get("final_action") or "").upper() heat_gate = str(data.get("total_heat_gate") or "").upper() if "BUY" in buy_signal and heat_gate in ("BLOCK_NEW_BUY", "HALVE"): conflicts.append({ "ticker": ticker, "conflict_type": "BUY_SIGNAL_vs_HEAT_BLOCK", "family_a": "entry_timing", "family_b": "portfolio_risk_budget", "resolution": "portfolio_risk_budget wins (rank=1)", "resolved": True, }) # Detect sell signal + profit_lock preservation simultaneously if "SELL" in buy_signal or "EXIT" in buy_signal: profit_lock = str(data.get("profit_lock_stage") or "").upper() if profit_lock not in ("", "NORMAL", "NONE"): conflicts.append({ "ticker": ticker, "conflict_type": "SELL_SIGNAL_vs_PROFIT_LOCK", "family_a": "exit_value_preservation", "family_b": "portfolio_risk_budget", "resolution": "portfolio_risk_budget wins (rank=1)", "resolved": True, }) return conflicts def run(taxonomy_path: Path, packet_path: Path) -> dict: taxonomy = _load_yaml(taxonomy_path) packet = _load_json(packet_path) if taxonomy.get("_missing") and packet.get("_missing"): result = { "gate": "SKIP", "reason": "taxonomy and packet both missing", "unresolved_conflict_count": 0, "contract": "spec/53_factor_conflict_matrix.yaml", } OUTPUT_PATH.parent.mkdir(parents=True, exist_ok=True) OUTPUT_PATH.write_text(json.dumps(result, ensure_ascii=False, indent=2)) return result active_families = _extract_active_families(taxonomy) undefined_count, undefined_families = _check_precedence_coverage(active_families) conflicts = [] if not packet.get("_missing"): conflicts = _detect_conflicts_in_packet(packet) unresolved = [c for c in conflicts if not c.get("resolved")] unresolved_count = unresolved.__len__() + undefined_count gate = "PASS" if unresolved_count == 0 else "FAIL" result = { "gate": gate, "conflict_matrix": conflicts, "unresolved_conflict_count": unresolved_count, "precedence_undefined_families": undefined_families, "active_families_checked": active_families, "total_conflicts_detected": len(conflicts), "contract": "spec/53_factor_conflict_matrix.yaml", } OUTPUT_PATH.parent.mkdir(parents=True, exist_ok=True) OUTPUT_PATH.write_text(json.dumps(result, ensure_ascii=False, indent=2)) return result def main() -> None: import argparse parser = argparse.ArgumentParser(description="H002 Factor Conflict Matrix Validator") parser.add_argument("--taxonomy", default=str(DEFAULT_TAXONOMY)) parser.add_argument("--packet", default=str(DEFAULT_PACKET)) args = parser.parse_args() result = run(Path(args.taxonomy), Path(args.packet)) gate = result.get("gate", "FAIL") print(f"[H002_FACTOR_CONFLICT_MATRIX] gate={gate} " f"conflicts={result.get('total_conflicts_detected', 0)} " f"unresolved={result.get('unresolved_conflict_count', 0)}") if gate == "FAIL": print(" FAIL — undefined families:", result.get("precedence_undefined_families")) sys.exit(1) if __name__ == "__main__": main()