"""
Same-candle entry-evaluation dedup (V16 patch).

Prevents the 20×-per-candle AI hammering observed in LIVE on 29 apr:
TF/MR pre-validations passed every 15s loop iter, calling ai.ask each
time on the same M5 candle for the same symbol. Now the brain compares
tech.candle_time with last_entry_eval_time (passed by orchestrator
from SessionState.entry_eval_cache) and short-circuits if equal.

Cache update policy (matches docstring in EntryEvalResult):
  text present (success or malformed JSON)        -> update
  text None + error_kind in {credit, invalid}     -> update (permanent)
  text None + error_kind = "unknown" (transient)  -> NOT updated (retry)
"""

from __future__ import annotations

import asyncio
import json
import sys
import tempfile
from pathlib import Path

sys.path.insert(0, str(Path(__file__).resolve().parent.parent))

from analysis.tech_snapshot import TechSnapshot
from brain.ai_client import AIResponse
from brain.brain_mr import BrainMR
from brain.brain_tf import BrainTF
from core.config import RuntimeConfig, RunMode, AccountKind
from core.contracts import EntryEvalResult
from persistence.state_store import (
    EntryEvalCache, SCHEMA_VERSION, SessionState, StateStore,
)


def _ok(label: str) -> None:
    print(f"  ok  {label}")


def _bd(snap):
    """Derive a BiasData from a TechSnapshot's algo-only bias fields.

    Production passes bias_data from BiasResolver (Source A); the tests
    build it straight from the snapshot so legacy fixtures stay unchanged.
    """
    from analysis.bias import BiasData
    compat = getattr(snap, "h1_compatibility", 1.0)
    reason = getattr(snap, "h1_reason", "test")
    return BiasData(
        bias=snap.bias,
        allowed_direction=snap.allowed_direction,
        h1_compatibility=compat,
        h1_reason=reason,
        ambiguous=False,
    )


# ============================================================
# FAKES — minimal AI client w/ canned response, call counter
# ============================================================

class _CountingAI:
    """Stub AI client: hands back one canned AIResponse and counts calls.

    `calls` lets tests assert exactly how many times the brain reached
    the AI (0 on dedup/gate skips, 1 on a real evaluation).
    """

    def __init__(self, resp: AIResponse) -> None:
        self.calls: int = 0   # number of ask_for_decision invocations so far
        self.resp = resp      # canned response returned on every call

    async def ask_for_decision(self, prompt, max_tokens=1200, where=None):
        """Record the invocation, then return the canned response."""
        self.calls = self.calls + 1
        return self.resp


def _to_snap(d: dict) -> TechSnapshot:
    """
    Mirrors the dict-driven helper used in test_brain_tf.py: only
    fields named in `d` override defaults. Keeps this test resilient
    to future TechSnapshot field additions.

    Defaults of note:
      - `open` falls back to `d["price"]` when given (else 5798.0), so a
        custom price yields a flat candle unless `open` is set too.
      - `rsi_prev` defaults to `rsi`, i.e. a flat RSI unless overridden.
      - `candle_time` uses `or 0`, so an explicit falsy value (0/None)
        collapses to 0 — the fail-closed "no data" sentinel.
      - `vwap` defaults to `price`, i.e. zero VWAP deviation.
    """
    return TechSnapshot(
        symbol=d.get("symbol", "MES"),
        price=float(d.get("price", 5800.0)),
        # Open tracks a caller-supplied price; 5798.0 only if neither given.
        open=float(d.get("open", d.get("price", 5798.0))),
        # `or 0` maps explicit 0/None to the fail-closed sentinel.
        candle_time=int(d.get("candle_time", 1714378500) or 0),
        is_candle_closed=bool(d.get("is_candle_closed", True)),
        candle_age_seconds=float(d.get("candle_age_seconds", 10.0)),
        rsi=float(d.get("rsi", 50.0)),
        # Defaults to `rsi`, so RSI slope is flat unless the test sets it.
        rsi_prev=float(d.get("rsi_prev", d.get("rsi", 50.0))),
        rsi_h1=float(d.get("rsi_h1", 52.0)),
        rsi_h4=float(d.get("rsi_h4", 55.0)),
        atr_m5_points=float(d.get("atr_m5_points", 30.0)),
        atr_ratio=float(d.get("atr_ratio", 1.1)),
        vol_regime=d.get("vol_regime", "NORMAL"),
        vol_spike=bool(d.get("vol_spike", False)),
        market_structure=d.get("market_structure", "BULLISH_EXPANSION"),
        h1_struct_bull=bool(d.get("h1_struct_bull", True)),
        h1_struct_bear=bool(d.get("h1_struct_bear", False)),
        trend_maturity=int(d.get("trend_maturity", 4)),
        regime=d.get("regime", "TRENDING"),
        regime_reason=d.get("regime_reason", ""),
        regime_near_trending=list(d.get("regime_near_trending", [])),
        deviation_pct=float(d.get("deviation_pct", 0.1)),
        divergence=d.get("divergence", "NONE"),
        macd_decelerating=bool(d.get("macd_decelerating", False)),
        macd_hist_last=float(d.get("macd_hist_last", 0.0)),
        candle_strength=float(d.get("candle_strength", 1.0)),
        hammer=bool(d.get("hammer", False)),
        shooting_star=bool(d.get("shooting_star", False)),
        bull_engulfing=bool(d.get("bull_engulfing", False)),
        bear_engulfing=bool(d.get("bear_engulfing", False)),
        doji=bool(d.get("doji", False)),
        doji_type=d.get("doji_type"),
        piercing=bool(d.get("piercing", False)),
        dark_cloud=bool(d.get("dark_cloud", False)),
        morning_star=bool(d.get("morning_star", False)),
        evening_star=bool(d.get("evening_star", False)),
        volume_weak=bool(d.get("volume_weak", False)),
        buy_absorption=bool(d.get("buy_absorption", False)),
        sell_absorption=bool(d.get("sell_absorption", False)),
        # VWAP defaults to price -> vwap_deviation_pct default of 0.0 matches.
        vwap=float(d.get("vwap", d.get("price", 5800.0))),
        vwap_deviation_pct=float(d.get("vwap_deviation_pct", 0.0)),
        bias=d.get("bias", "RIALZISTA"),
        allowed_direction=d.get("allowed_direction", "BUY"),
        h1_compatibility=float(d.get("h1_compatibility", 1.0)),
        h1_reason=d.get("h1_reason", ""),
        swing_data=dict(d.get("swing_data", {"swing_found": False})),
        consecutive_sl_count=int(d.get("consecutive_sl_count", 0)),
        tick_size=float(d.get("tick_size", 0.25)),
        tick_value=float(d.get("tick_value", 1.25)),
    )


def _ai_success_payload() -> AIResponse:
    """Canned approving TF decision, JSON-encoded inside an AIResponse."""
    decision = {
        "step_1_qualita_sconto": "BUONO",
        "step_2_timing_pullback": "BUONO",
        "step_3_contesto_macro": "FAVOREVOLE",
        "approved": True,
        "confidence": 75,
        "direction": "BUY",
        "risk_multiplier": 1.0,
        # 0.78 sits inside the MES TF profile sl_range (0.36, 1.20).
        "sl_atr_multiplier": 0.78,
        "rr_multiplier": 0.50,
        "key_risk": "ok",
        "reason": "ok",
    }
    return AIResponse(text=json.dumps(decision), attempts=1)


def _make_cfg() -> RuntimeConfig:
    """Paper-mode runtime config on an ineligible account (test default)."""
    return RuntimeConfig(mode=RunMode.PAPER, account=AccountKind.INELIGIBLE)


# ============================================================
# 1. TF: first call invokes AI; same candle skipped
# ============================================================

def test_tf_first_call_invokes_ai():
    """Fresh cache (last_entry_eval_time=0) must reach the AI exactly once."""
    async def scenario():
        ai = _CountingAI(_ai_success_payload())
        snap = _to_snap({"rsi": 50.0, "rsi_prev": 48.0})
        brain = BrainTF(_make_cfg(), ai)
        res = await brain.evaluate_entry(
            "MES", snap, bias_data=_bd(snap), last_entry_eval_time=0.0,
        )
        assert isinstance(res, EntryEvalResult)
        assert res.dedup_skipped is False
        assert res.decision is not None
        assert ai.calls == 1
        # The orchestrator needs the candle time echoed back to refresh
        # SessionState.entry_eval_cache.
        assert res.evaluated_candle_time == 1714378500.0
    asyncio.run(scenario())
    _ok("TF: first call invokes AI; evaluated_candle_time populated")


def test_tf_same_candle_skips_ai():
    """A cached candle_time equal to the snapshot's must short-circuit."""
    async def scenario():
        ai = _CountingAI(_ai_success_payload())
        snap = _to_snap({"rsi": 50.0, "rsi_prev": 48.0})
        brain = BrainTF(_make_cfg(), ai)
        # The cache claims this exact candle was already evaluated.
        res = await brain.evaluate_entry(
            "MES", snap, bias_data=_bd(snap), last_entry_eval_time=1714378500.0,
        )
        assert res.dedup_skipped is True
        assert res.decision is None
        assert res.evaluated_candle_time is None
        assert ai.calls == 0, "AI must NOT be called on same-candle dedup"
    asyncio.run(scenario())
    _ok("TF: same-candle dedup short-circuits AI call")


# ============================================================
# 2. MR: same dedup behaviour
# ============================================================

def test_mr_same_candle_skips_ai():
    """The MR brain must apply the identical same-candle dedup gate."""
    async def scenario():
        ai = _CountingAI(_ai_success_payload())
        brain = BrainMR(_make_cfg(), ai)
        # Satisfy MR pre-validations (extreme RSI: < 30 for BUY) so only
        # the dedup gate can be responsible for the skip.
        snap = _to_snap({"rsi": 25.0, "rsi_prev": 28.0,
                         "regime": "RANGING",
                         "market_structure": "RANGING",
                         "symbol": "6E"})
        res = await brain.evaluate_entry(
            "6E", snap, bias_data=_bd(snap), last_entry_eval_time=1714378500.0,
        )
        assert res.dedup_skipped is True
        assert res.decision is None
        assert ai.calls == 0
    asyncio.run(scenario())
    _ok("MR: same-candle dedup short-circuits AI call")


# ============================================================
# 3. Pre-val rejection: NO AI call, NO cache update
# ============================================================

def test_pre_val_rejection_does_not_update_cache():
    """Pre-val rejection (e.g. RSI outside zone) must leave
    evaluated_candle_time=None — cache stays untouched."""
    async def scenario():
        ai = _CountingAI(_ai_success_payload())
        brain = BrainTF(_make_cfg(), ai)
        # RSI 65 sits outside the TF entry zone -> algorithmic reject.
        snap = _to_snap({"rsi": 65.0, "rsi_prev": 60.0})
        res = await brain.evaluate_entry(
            "MES", snap, bias_data=_bd(snap), last_entry_eval_time=0.0,
        )
        assert res.dedup_skipped is False
        assert res.decision is None
        # AI never ran, so no candle time to feed back into the cache.
        assert res.evaluated_candle_time is None
        assert ai.calls == 0
    asyncio.run(scenario())
    _ok("pre-val rejection: cache unchanged (evaluated_candle_time None)")


# ============================================================
# 4. AI transient failure: cache NOT updated -> retry next iter
# ============================================================

def test_ai_transient_failure_does_not_update_cache():
    """error_kind='unknown' (covers 429/5xx/network/timeout) must leave
    evaluated_candle_time=None so the next iteration retries."""
    async def scenario():
        # text=None + error_kind='unknown' models a transient failure.
        transient = AIResponse(text=None, error_kind="unknown", attempts=3)
        ai = _CountingAI(transient)
        brain = BrainTF(_make_cfg(), ai)
        snap = _to_snap({"rsi": 50.0, "rsi_prev": 48.0})
        res = await brain.evaluate_entry(
            "MES", snap, bias_data=_bd(snap), last_entry_eval_time=0.0,
        )
        assert res.decision is None
        assert res.dedup_skipped is False
        assert res.evaluated_candle_time is None, \
            "transient AI failure must NOT consume the candle"
        assert ai.calls == 1
    asyncio.run(scenario())
    _ok("AI transient (unknown): cache NOT updated, retries enabled")


# ============================================================
# 5. AI permanent failure (credit/invalid): cache UPDATED
# ============================================================

def test_ai_permanent_failure_updates_cache():
    """error_kind='credit' or 'invalid' is permanent -- retrying within
    the same candle just burns budget. Cache must update."""
    async def scenario():
        permanent = AIResponse(text=None, error_kind="credit", attempts=1)
        ai = _CountingAI(permanent)
        brain = BrainTF(_make_cfg(), ai)
        snap = _to_snap({"rsi": 50.0, "rsi_prev": 48.0})
        res = await brain.evaluate_entry(
            "MES", snap, bias_data=_bd(snap), last_entry_eval_time=0.0,
        )
        assert res.decision is None
        # Candle is consumed: the orchestrator will cache it, so the same
        # candle is never retried against a permanently failing AI.
        assert res.evaluated_candle_time == 1714378500.0
    asyncio.run(scenario())
    _ok("AI permanent (credit/invalid): cache UPDATED, no retry loop")


# ============================================================
# 6. Persistence: EntryEvalCache survives save/load roundtrip
# ============================================================

def test_dedup_cache_persists_across_restart():
    """EntryEvalCache entries must survive a StateStore save/load cycle."""
    with tempfile.TemporaryDirectory() as tmp:
        store = StateStore(Path(tmp) / "state.json")
        state = SessionState()
        state.entry_eval_cache.last_eval["MES"] = 1714378500.0
        state.entry_eval_cache.last_eval["6E"] = 1714378200.0
        store.save(state)

        restored = store.load()
        assert restored is not None
        assert restored.schema_version == SCHEMA_VERSION
        assert restored.entry_eval_cache.last_eval["MES"] == 1714378500.0
        assert restored.entry_eval_cache.last_eval["6E"] == 1714378200.0
    _ok("EntryEvalCache survives save/load roundtrip (post-crash dedup intact)")


# ============================================================
# CANDLE-STATE GATING (V16 patch, 29 apr)
# ============================================================
# Brain.evaluate_entry skips AI before pre-val when:
#   - is_candle_closed is False                        -> CANDLE_NOT_CLOSED
#   - candle_age_seconds < cfg.candle_close_delay      -> CANDLE_STABILIZING
#   - candle_age_seconds > cfg.candle_max_age_seconds  -> CANDLE_TOO_OLD
# Defaults: 5s / 60s. Same gate behaves identically on TF and MR.

def test_evaluate_entry_skips_when_candle_not_closed():
    """is_candle_closed=False -> reject_reason CANDLE_NOT_CLOSED, no AI."""
    async def scenario():
        ai = _CountingAI(_ai_success_payload())
        brain = BrainTF(_make_cfg(), ai)
        # Negative age: the bar is still in flight (close is in the future).
        snap = _to_snap({"is_candle_closed": False,
                         "candle_age_seconds": -120.0})
        res = await brain.evaluate_entry(
            "MES", snap, bias_data=_bd(snap), last_entry_eval_time=0.0,
        )
        assert res.decision is None
        assert res.reject_reason == "CANDLE_NOT_CLOSED"
        assert res.dedup_skipped is False
        assert res.evaluated_candle_time is None
        assert ai.calls == 0
    asyncio.run(scenario())
    _ok("candle gate: not closed -> CANDLE_NOT_CLOSED, no AI")


def test_evaluate_entry_skips_when_candle_stabilizing():
    """age < cfg.candle_close_delay_seconds (5s) -> CANDLE_STABILIZING."""
    async def scenario():
        ai = _CountingAI(_ai_success_payload())
        brain = BrainTF(_make_cfg(), ai)
        # 2s is below the 5s default settling delay.
        snap = _to_snap({"is_candle_closed": True,
                         "candle_age_seconds": 2.0})
        res = await brain.evaluate_entry(
            "MES", snap, bias_data=_bd(snap), last_entry_eval_time=0.0,
        )
        assert res.decision is None
        assert res.reject_reason == "CANDLE_STABILIZING"
        assert ai.calls == 0
    asyncio.run(scenario())
    _ok("candle gate: age < 5s -> CANDLE_STABILIZING, no AI")


def test_evaluate_entry_skips_when_candle_too_old():
    """age > cfg.candle_max_age_seconds (60s) -> CANDLE_TOO_OLD."""
    async def scenario():
        ai = _CountingAI(_ai_success_payload())
        brain = BrainTF(_make_cfg(), ai)
        # 90s is past the 60s default freshness window.
        snap = _to_snap({"is_candle_closed": True,
                         "candle_age_seconds": 90.0})
        res = await brain.evaluate_entry(
            "MES", snap, bias_data=_bd(snap), last_entry_eval_time=0.0,
        )
        assert res.decision is None
        assert res.reject_reason == "CANDLE_TOO_OLD"
        assert ai.calls == 0
    asyncio.run(scenario())
    _ok("candle gate: age > 60s -> CANDLE_TOO_OLD, no AI")


def test_evaluate_entry_proceeds_when_candle_settled():
    """delay <= age <= max_age -> AI is called (no candle gate skip).

    NOTE: rsi/rsi_prev are now pinned to the same rising-RSI fixture
    (50 from 48) used by every other test in this file that expects the
    AI to be reached (test_tf_first_call_invokes_ai etc.). Previously
    this test leaned on the `_to_snap` default rsi_prev == rsi, which
    made the pre-validation outcome depend on a fixture default instead
    of an explicit known-passing setup.
    """
    async def run():
        ai = _CountingAI(_ai_success_payload())
        brain = BrainTF(_make_cfg(), ai)
        snap = _to_snap({"is_candle_closed": True,
                         "candle_age_seconds": 12.0,  # in [5, 60]
                         "rsi": 50.0, "rsi_prev": 48.0})
        res = await brain.evaluate_entry(
            "MES", snap, bias_data=_bd(snap), last_entry_eval_time=0.0,
        )
        assert res.reject_reason is None
        assert res.decision is not None
        assert ai.calls == 1
    asyncio.run(run())
    _ok("candle gate: 5 <= age <= 60 -> AI called, decision returned")


def test_candle_age_calculation_open_time_convention():
    """build_tech_snapshot derives age from candle_time (bar OPEN) +
    V16_M5_BAR_SECONDS using injected now_utc. Verify the formula
    directly without hitting the full builder by reading from a
    constructed TechSnapshot."""
    from analysis.tech_snapshot import V16_M5_BAR_SECONDS
    assert V16_M5_BAR_SECONDS == 300

    # Open at t=1000, close at t=1300. now=1305 -> age=5s, closed.
    settled = _to_snap({"candle_time": 1000,
                        "is_candle_closed": True,
                        "candle_age_seconds": 5.0})
    assert settled.is_candle_closed is True
    assert settled.candle_age_seconds == 5.0

    # Open at t=1000, close at t=1300. now=1295 -> age=-5s, in-flight.
    in_flight = _to_snap({"candle_time": 1000,
                          "is_candle_closed": False,
                          "candle_age_seconds": -5.0})
    assert in_flight.is_candle_closed is False
    assert in_flight.candle_age_seconds == -5.0
    _ok("candle age: open-time convention, bar duration = 300s")


def test_candle_time_zero_treated_as_not_closed():
    """Fail-closed policy: candle_time=0 (data unavailable) must NOT
    permit AI calls. Skip with CANDLE_NOT_CLOSED."""
    async def scenario():
        ai = _CountingAI(_ai_success_payload())
        brain = BrainTF(_make_cfg(), ai)
        # candle_time=0 is the "no data" sentinel (see _to_snap).
        snap = _to_snap({"candle_time": 0,
                         "is_candle_closed": False,
                         "candle_age_seconds": 0.0})
        res = await brain.evaluate_entry(
            "MES", snap, bias_data=_bd(snap), last_entry_eval_time=0.0,
        )
        assert res.reject_reason == "CANDLE_NOT_CLOSED"
        assert ai.calls == 0
    asyncio.run(scenario())
    _ok("candle gate: candle_time=0 -> fail closed, no AI")


# ============================================================
# MAIN
# ============================================================

def main() -> int:
    """Run every dedup / candle-gate test in declaration order."""
    print("test_entry_dedup.py")
    suite = (
        test_tf_first_call_invokes_ai,
        test_tf_same_candle_skips_ai,
        test_mr_same_candle_skips_ai,
        test_pre_val_rejection_does_not_update_cache,
        test_ai_transient_failure_does_not_update_cache,
        test_ai_permanent_failure_updates_cache,
        test_dedup_cache_persists_across_restart,
        test_evaluate_entry_skips_when_candle_not_closed,
        test_evaluate_entry_skips_when_candle_stabilizing,
        test_evaluate_entry_skips_when_candle_too_old,
        test_evaluate_entry_proceeds_when_candle_settled,
        test_candle_age_calculation_open_time_convention,
        test_candle_time_zero_treated_as_not_closed,
    )
    for case in suite:
        case()
    print("ALL 13 TESTS PASSED")
    return 0


if __name__ == "__main__":
    sys.exit(main())
