"""
Phase B end-to-end tests for orchestrator + lifecycle wireup.

Covers:
  - Entry path: brain -> sizing -> risk -> opener -> state mutation
  - Exit path: brain.manage_exit -> closer -> risk hooks -> state cleanup
  - PARTIAL_50 with set_be_after_partial: closer.partial_close + modify_stop
  - MOVE_SL: broker.modify_stop + runtime.current_sl_price
  - HOLD: candle dedup persistence
  - State persistence across iterations

Pattern: FakeBroker (duck-typed, no BrokerBase subclass — orchestrator
duck-types its broker), FakeMarketDataProvider with canned bars,
FakeBrain (canned EntryDecision + manage_exit BrainDecision).

Run:
    cd ~/apex_v16
    python -m tests.test_orchestrator_phase_b
"""

from __future__ import annotations

import asyncio
import dataclasses
import sys
import tempfile
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Any

import pandas as pd

sys.path.insert(0, str(Path(__file__).resolve().parent.parent))

from analysis.tech_snapshot import TechSnapshot
from brain.ai_client import AIResponse
from broker.broker_base import ClosedTrade, OrderResult, Position
from core.config import RuntimeConfig, RunMode, AccountKind
from core.contracts import (
    BrainContext, BrainDecision, EntryDecision, EntryEvalResult, TradeAction,
    TradeEntry, TradeRuntime, utc_now,
)
from orchestrator import Orchestrator
from persistence.state_store import ActiveTrade, SessionState, StateStore
from trading.risk_manager import RiskManager
from trading.trade_closer import TradeCloser
from trading.trade_opener import TradeOpener


def _ok(label: str) -> None:
    print(f"  ok  {label}")


# ============================================================
# FAKES
# ============================================================

class FakeProvider:
    def __init__(self, bars=None) -> None:
        self.bars = bars or {}

    async def get_bars(self, symbol, timeframe, n):
        df = self.bars.get((symbol, timeframe))
        if df is None or df.empty:
            return pd.DataFrame(columns=["open", "high", "low", "close", "volume"])
        return df.tail(n).reset_index(drop=True)


class FakeAI:
    async def ask(self, prompt, temperature=0.2, max_tokens=None):
        return AIResponse(text=None, error_kind="unknown")

    async def ask_for_decision(self, prompt, max_tokens=600, where=None):
        return AIResponse(text=None, error_kind="unknown")


class FakeBrain:
    def __init__(self, entry_decision=None, exit_decision=None):
        self.entry_decision = entry_decision
        self.exit_decision = exit_decision or BrainDecision(action="HOLD", reason="default")
        self.evaluate_calls = 0
        self.manage_calls = 0

    async def evaluate_entry(self, symbol, tech, *, bias_data=None, last_entry_eval_time=0.0):
        self.evaluate_calls += 1
        ct = float(getattr(tech, "candle_time", 0) or 0) or None
        return EntryEvalResult(
            decision=self.entry_decision,
            evaluated_candle_time=ct,
        )

    async def manage_exit(self, ctx):
        self.manage_calls += 1
        return self.exit_decision


class FakeBroker:
    """Duck-typed broker for live-path tests. Records calls."""
    def __init__(self) -> None:
        self.positions_get_calls = 0
        self.place_calls = []
        self.close_calls = []
        self.modify_stop_calls = []
        self.cancel_all_calls = []
        self.partial_close_via_opposite_calls = []
        self.next_order_ids = ("E1", "S1", "T1")
        self.next_partial_ids = ("CLOSE2", "S2", "T2")
        self.fail_modify_stop = False
        self.fail_close_position = False
        self.fail_partial_close_via_opposite = False
        # BUG A test hook: list of Position objects to return on positions_get.
        # Default [] = "no broker-side position" (legacy behavior).
        self.positions_to_return: list = []

    async def positions_get(self, symbol=None):
        self.positions_get_calls += 1
        return list(self.positions_to_return)

    async def place_market_bracket(self, symbol, direction, contracts,
                                    sl_price, tp_price, **kwargs):
        self.place_calls.append({
            "symbol": symbol, "direction": direction, "contracts": contracts,
            "sl_price": sl_price, "tp_price": tp_price,
        })
        e, s, t = self.next_order_ids
        return OrderResult(
            success=True, entry_price=5800.0, sl_price=sl_price,
            tp_price=tp_price, entry_id=e, stop_id=s, target_id=t,
        )

    async def close_position(self, symbol, contracts=None):
        self.close_calls.append({"symbol": symbol, "contracts": contracts})
        if self.fail_close_position:
            raise RuntimeError("broker rejected close_position")
        return OrderResult(success=True)

    async def partial_close_via_opposite_order(
        self, symbol, direction, contracts_to_close, residual_contracts,
        new_sl_price, new_tp_price, old_stop_order_id, old_target_order_id,
    ):
        self.partial_close_via_opposite_calls.append({
            "symbol": symbol, "direction": direction,
            "contracts_to_close": contracts_to_close,
            "residual_contracts": residual_contracts,
            "new_sl_price": new_sl_price, "new_tp_price": new_tp_price,
            "old_stop_order_id": old_stop_order_id,
            "old_target_order_id": old_target_order_id,
        })
        if self.fail_partial_close_via_opposite:
            return OrderResult(success=False, error="simulated 400 partial reject")
        close_id, s, t = self.next_partial_ids
        return OrderResult(
            success=True, entry_id=close_id, stop_id=s, target_id=t,
            sl_price=new_sl_price, tp_price=new_tp_price,
        )

    async def cancel_all_for_symbol(self, symbol):
        self.cancel_all_calls.append(symbol)
        return 0

    async def recent_trades(self, symbol=None, since=None, limit=50):
        return list(getattr(self, "recent_trades_to_return", []))

    async def modify_stop(self, symbol, order_id, new_sl_price):
        self.modify_stop_calls.append({
            "symbol": symbol, "order_id": order_id, "new_sl_price": new_sl_price,
        })
        if self.fail_modify_stop:
            raise RuntimeError("broker rejected modify_stop")
        return OrderResult(success=True, sl_price=new_sl_price)


class JsonlSink:
    def __init__(self):
        self.events = []

    def write(self, event, **fields):
        self.events.append({"event": event, **fields})


class FakeLogger:
    def __init__(self):
        import logging
        self.brain_log = JsonlSink()
        self.session_log = JsonlSink()
        self.error_log = JsonlSink()
        self.system = logging.getLogger("test.orch_b")

    def log_session_event(self, event, **fields):
        self.session_log.write(event, **fields)

    def log_error(self, where, error, **extra):
        self.error_log.write("error", where=where, error=error, **extra)

    def log_trade_opened(self, **fields):
        self.brain_log.write("trade_opened", **fields)

    def log_trade_closed(self, **fields):
        self.brain_log.write("trade_closed", **fields)


# ============================================================
# FIXTURES
# ============================================================

def make_config(**over) -> RuntimeConfig:
    cfg = RuntimeConfig(mode=RunMode.PAPER, account=AccountKind.INELIGIBLE)
    cfg.asset_filter = ["MES"]
    cfg.loop_sleep_seconds = 0
    # V16 3-loop test mode: scan offset=0 (immediate boundary), manage
    # and maintenance interval=0 (yield-only). Combined with the per-loop
    # max_iterations counter, each loop runs exactly max_iterations iters
    # then exits — preserving V15-era 1-tick-per-iter semantics.
    cfg.scan_loop_phase_offset_seconds = 0.0
    cfg.manage_loop_interval_seconds = 0
    cfg.maintenance_loop_interval_seconds = 0
    for k, v in over.items():
        setattr(cfg, k, v)
    return cfg


def make_state() -> SessionState:
    return SessionState()


def make_store(tmp: Path) -> StateStore:
    return StateStore(tmp / "state.json")


def make_entry_decision(direction="BUY") -> EntryDecision:
    return EntryDecision(
        direction=direction,
        entry_price=5800.0,
        sl_price=5790.0 if direction == "BUY" else 5810.0,
        tp_price=5820.0 if direction == "BUY" else 5780.0,
        rr_multiplier=0.50,
        confidence=80, rationale="test",
    )


def make_trade_entry(symbol="MES", brain="TF", direction="BUY",
                     contracts=2, stop_id="S1") -> TradeEntry:
    return TradeEntry(
        symbol=symbol, brain_name=brain, direction=direction, contracts=contracts,
        entry_price=5800.0, sl_price=5790.0 if direction == "BUY" else 5810.0,
        tp_price=5820.0 if direction == "BUY" else 5780.0,
        opened_at=utc_now() - timedelta(minutes=10),
        rsi_m5_at_entry=55.0, rsi_h1_at_entry=58.0, rsi_h4_at_entry=60.0,
        atr_ratio_at_entry=1.0, market_structure_at_entry="BULLISH_EXPANSION",
        regime_at_entry="TRENDING", h1_compat_at_entry=1.0, confidence_at_entry=75,
        entry_order_id="E1", stop_order_id=stop_id, target_order_id="T1",
        is_paper=True,
    )


# Fixed midday UTC clock used by all Orchestrator-instantiating tests.
# Avoids brain_selector._is_night_tf_block clobbering TF entries when the
# test suite runs between 20:00-04:00 UTC.
FIXED_DAY_UTC: datetime = datetime(2026, 4, 28, 14, 0, 0, tzinfo=timezone.utc)


def make_orch_for_test(
    *,
    brain=None,
    state=None,
    cfg=None,
    opener=None,
    closer=None,
    broker=None,
    risk=None,
    store=None,
    is_paper=True,
    max_iterations=1,
):
    cfg = cfg or make_config()
    state = state if state is not None else make_state()
    logger = FakeLogger()
    opener = opener or TradeOpener(broker=broker, is_paper=is_paper, logger=logger)
    closer = closer or TradeCloser(broker=broker, is_paper=is_paper, logger=logger)
    risk = risk or RiskManager(cfg, state, logger=logger)
    brains = {"TF": brain, "MR": brain} if brain else {}
    return Orchestrator(
        config=cfg, ai_client=FakeAI(), market_data_provider=FakeProvider(),
        state=state, store=store or _MemoryStore(), logger=logger,
        brain_dispatch=brains,
        opener=opener, closer=closer, risk_manager=risk, broker=broker,
        now_utc_provider=lambda: FIXED_DAY_UTC,
        max_iterations=max_iterations,
    ), logger


class _MemoryStore:
    def __init__(self) -> None:
        self.saves = 0
        self.last = None
    def save(self, state):
        self.saves += 1
        self.last = state.to_dict()


# Force MES tech to bypass build_tech_snapshot's None return: we monkey-patch
# orchestrator._build_tech to return a synthetic TechSnapshot.

def _synthetic_tech(symbol="MES", price=5800.0, candle_time=1730000000) -> TechSnapshot:
    return TechSnapshot(
        symbol=symbol, price=price, open=price, candle_time=candle_time,
        is_candle_closed=True, candle_age_seconds=10.0,
        rsi=50.0, rsi_prev=48.0, rsi_h1=50.0, rsi_h4=50.0,
        atr_m5_points=10.0, atr_ratio=1.0,
        vol_regime="NORMAL", vol_spike=False,
        market_structure="BULLISH_EXPANSION",
        h1_struct_bull=True, h1_struct_bear=False, trend_maturity=4,
        regime="TRENDING", regime_reason="", regime_near_trending=[],
        deviation_pct=0.0, divergence="NONE", macd_decelerating=False,
        macd_hist_last=0.0,
        candle_strength=1.0,
        hammer=False, shooting_star=False, bull_engulfing=False,
        bear_engulfing=False, doji=False, doji_type=None, piercing=False,
        dark_cloud=False, morning_star=False, evening_star=False,
        volume_weak=False,
        buy_absorption=False,
        sell_absorption=False,
        vwap=price, vwap_deviation_pct=0.0,
        bias="RIALZISTA", allowed_direction="BUY",
        h1_compatibility=1.0, h1_reason="",
        tick_size=0.25, tick_value=1.25,
    )


def patch_tech(orch, tech=None):
    """Monkey-patch _build_tech to return a known TechSnapshot."""
    tech = tech or _synthetic_tech()
    async def _bt(symbol, **kwargs):
        return tech
    orch._build_tech = _bt   # type: ignore[assignment]


# ============================================================
# 1. ENTRY: happy path
# ============================================================

def test_entry_happy_path_creates_active_trade():
    brain = FakeBrain(entry_decision=make_entry_decision())
    orch, logger = make_orch_for_test(brain=brain)
    patch_tech(orch)
    asyncio.run(orch.run())
    assert "MES" in orch.state.active_trades
    at = orch.state.active_trades["MES"]
    assert at.entry.direction == "BUY"
    assert at.entry.contracts > 0
    assert at.runtime.current_sl_price == at.entry.sl_price, "SL baseline must be entry.sl"
    assert orch.state.daily.executed_count == 1
    assert orch.state.daily.approved_count == 1
    # An open event was logged
    assert any(e["event"] == "trade_opened" for e in logger.brain_log.events)
    _ok("entry happy path: active_trade created, counters incremented, log emitted")


def test_entry_approved_carries_radar_tech_fields():
    """V18 dashboard radar: entry_approved must embed rsi_m5/rsi_h4/
    h1_compat/macd_*/pattern/atr_ratio/bias/h1_struct_*/volume_weak/
    divergence/bouncing_rsi from the TechSnapshot at decision time, so
    the web UI can render RADAR without secondary lookups."""
    decision = make_entry_decision()
    brain = FakeBrain(entry_decision=decision)
    orch, logger = make_orch_for_test(brain=brain)
    # Custom tech with distinctive values so we know the field came from here.
    custom_tech = _synthetic_tech()
    custom_tech = dataclasses.replace(
        custom_tech,
        rsi=54.3, rsi_prev=50.1,            # bouncing_rsi=True
        rsi_h4=58.7,
        h1_compatibility=0.92,
        macd_hist_last=0.001234,
        macd_decelerating=True,
        hammer=True,                        # pattern label HAMMER
        atr_ratio=1.15,
        bias="BULLISH",
        h1_struct_bull=True, h1_struct_bear=False,
        volume_weak=False,
        divergence="BULLISH",
    )
    patch_tech(orch, tech=custom_tech)
    asyncio.run(orch.run())
    approved = [e for e in logger.brain_log.events if e["event"] == "entry_approved"]
    assert len(approved) == 1
    ev = approved[0]
    assert ev["rsi_m5"] == 54.3
    assert ev["rsi_h4"] == 58.7
    assert ev["h1_compat"] == 0.92
    assert ev["macd_hist"] == 0.001234
    assert ev["macd_decel"] is True
    assert ev["pattern"] == "HAMMER"
    assert ev["atr_ratio"] == 1.15
    assert ev["bias"] == "BULLISH"
    assert ev["h1_struct_bull"] is True
    assert ev["h1_struct_bear"] is False
    assert ev["volume_weak"] is False
    assert ev["divergence"] == "BULLISH"
    assert ev["bouncing_rsi"] is True
    _ok("entry_approved carries full RADAR tech fields (V18)")


def test_entry_approved_event_emitted_in_brain_log():
    """
    BUG 7 fix: brain_log must contain an entry_approved event after a
    Brain decision is accepted (symmetric to entry_rejected). Pre-fix:
    only rejections were logged on brain_log; successful approvals were
    only visible on trade_log -> incomplete decision flow trace.

    TP variante γ: brain emits tp_price=0.0 sentinel + rr_multiplier;
    entry_approved logs tp_price_pre_resolve and rr_multiplier_ai. The
    finalized tp_price lives in the separate tp_resolved event.
    """
    decision = make_entry_decision()
    decision.metadata.update({
        "sl_atr_multiplier": 0.075,
        "clamp_active": False,
        "sl_ticks_used": 30,
        "profile_origin": "US500",
        "risk_multiplier": 1.0,
        "rr_multiplier_ai": 0.50,
        "step_1": "BUONO",
        "step_2": "BUONO",
        "step_3": "FAVOREVOLE",
        "key_risk": "macro headwind",
    })
    brain = FakeBrain(entry_decision=decision)
    orch, logger = make_orch_for_test(brain=brain)
    patch_tech(orch)
    asyncio.run(orch.run())
    approved = [e for e in logger.brain_log.events if e["event"] == "entry_approved"]
    assert len(approved) == 1
    ev = approved[0]
    assert ev["symbol"] == "MES"
    assert ev["direction"] == "BUY"
    assert ev["confidence"] == decision.confidence
    assert ev["sl_atr_multiplier"] == 0.075
    assert ev["rr_multiplier_ai"] == 0.50
    assert ev["clamp_active"] is False
    assert ev["profile_origin"] == "US500"
    assert ev["step_1"] == "BUONO"
    _ok("entry_approved emitted on brain_log with AI/profile diagnostics")


def test_tp_resolved_event_emitted_post_sizing():
    """
    TP variante γ: orchestrator finalizes tp_price post-sizing via
    tp_resolver.resolve_tp_price(...) and emits a tp_resolved event on
    brain_log with the dollar-domain values + rr_effective. Lifecycle
    event, separate from entry_approved.
    """
    decision = make_entry_decision()
    decision.metadata.update({
        "sl_atr_multiplier": 0.075,
        "clamp_active": False,
        "sl_ticks_used": 30,
        "profile_origin": "US500",
        "rr_multiplier_ai": 0.50,
    })
    brain = FakeBrain(entry_decision=decision)
    orch, logger = make_orch_for_test(brain=brain)
    patch_tech(orch)
    asyncio.run(orch.run())
    resolved = [e for e in logger.brain_log.events if e["event"] == "tp_resolved"]
    assert len(resolved) == 1
    ev = resolved[0]
    assert ev["symbol"] == "MES"
    assert ev["direction"] == "BUY"
    assert ev["rr_multiplier_ai"] == 0.50
    assert ev["rr_multiplier_used"] == 0.50      # within hard-clamp range
    assert ev["tp_ticks_final"] >= 1
    assert ev["sl_usd_actual"] > 0
    assert ev["tp_usd_target"] > 0
    # tp_price (resolved) > entry_price for BUY
    assert ev["tp_price"] > ev["entry_price"]
    _ok("tp_resolved emitted on brain_log post-sizing with rr/usd diagnostics")


# ============================================================
# 2. ENTRY: brain returns None -> no change
# ============================================================

def test_entry_brain_none_no_state_change():
    brain = FakeBrain(entry_decision=None)
    orch, _ = make_orch_for_test(brain=brain)
    patch_tech(orch)
    asyncio.run(orch.run())
    assert orch.state.active_trades == {}
    assert orch.state.daily.executed_count == 0
    _ok("entry: brain returns None -> no state change")


# ============================================================
# 3. ENTRY: SIZING_SKIP -> no opener call
# ============================================================

def test_entry_skipped_when_sizing_skip():
    """sizing skip -> SIZING_SKIP rule -> no opener call.

    Con worst-case sizing su MES (sl_ticks→MAX_SL_TICKS=40, sl_usd=$50/ct)
    e MAX_CONTRACTS_PER_TRADE[MES]=6, real_risk=$300. Spingiamo daily_pnl
    vicino al hard_stop in modo che il daily budget cap (0.33 × remaining)
    scenda sotto $300 → trigger risk_exceeds_daily_budget dentro sizing.
    """
    state = make_state()
    state.daily.daily_pnl = -1200.0  # remaining=300 → cap=99 < 300
    decision = EntryDecision(
        direction="BUY", entry_price=5800.0, sl_price=5790.0, tp_price=5810.0,
        rr_multiplier=0.50, confidence=80, rationale="sizing-skip target")
    brain = FakeBrain(entry_decision=decision)
    orch, logger = make_orch_for_test(brain=brain, state=state)
    patch_tech(orch)
    asyncio.run(orch.run())
    assert orch.state.active_trades == {}, "no trade must be opened"
    assert orch.state.daily.rejected_count >= 1
    # Risk-skip event with rule=SIZING_SKIP must be logged
    risk_skips = [e for e in logger.brain_log.events if e["event"] == "scan_skip_risk"]
    assert any(e.get("rule") == "SIZING_SKIP" for e in risk_skips), risk_skips
    _ok("entry: huge SL -> SIZING_SKIP propagated, no opener call")


# ============================================================
# 4. ENTRY: HALTED -> no opener call
# ============================================================

def test_entry_skipped_when_risk_halted():
    state = make_state()
    state.halted = True
    state.halt_reason = "manual"
    brain = FakeBrain(entry_decision=make_entry_decision())
    orch, logger = make_orch_for_test(brain=brain, state=state)
    patch_tech(orch)
    asyncio.run(orch.run())
    assert orch.state.active_trades == {}
    assert any(
        e["event"] == "scan_skip_risk" and e.get("rule") == "HALTED"
        for e in logger.brain_log.events
    )
    _ok("entry: state.halted -> HALTED rule, no opener")


# ============================================================
# 5. ENTRY: DAILY_LOSS_HARD_STOP_HIT
# ============================================================

def test_entry_skipped_when_daily_loss_hard_stop_hit():
    """
    With daily_pnl <= hard_stop, the daily remaining budget is 0 ->
    sizing.size_for_entry skips with reason 'budget_hard' -> the SIZING_SKIP
    gate (rule #1, ordering by design) fires BEFORE the DAILY_LOSS_HARD_STOP_HIT
    gate. The DAILY_LOSS_HARD_STOP_HIT rule is reachable only when sizing
    somehow succeeds (inconsistent config).

    Either way, the entry is blocked and no trade opens — what the test asserts.
    """
    state = make_state()
    state.daily.daily_pnl = -1500.0
    cfg = make_config(daily_loss_hard_stop=-1500.0)
    brain = FakeBrain(entry_decision=make_entry_decision())
    orch, logger = make_orch_for_test(brain=brain, state=state, cfg=cfg)
    patch_tech(orch)
    asyncio.run(orch.run())
    assert orch.state.active_trades == {}
    risk_skips = [e for e in logger.brain_log.events if e["event"] == "scan_skip_risk"]
    assert risk_skips, "expected at least one scan_skip_risk event"
    # Either SIZING_SKIP (budget-exhausted via sizing) or DAILY_LOSS_HARD_STOP_HIT
    assert risk_skips[-1].get("rule") in (
        "SIZING_SKIP", "DAILY_LOSS_HARD_STOP_HIT",
    ), risk_skips[-1]
    _ok("entry: daily_pnl <= hard_stop -> entry blocked (SIZING_SKIP or HARD_STOP)")


# ============================================================
# 6. ENTRY: counters
# ============================================================

def test_entry_increments_counters():
    brain = FakeBrain(entry_decision=make_entry_decision())
    orch, _ = make_orch_for_test(brain=brain)
    patch_tech(orch)
    asyncio.run(orch.run())
    assert orch.state.daily.approved_count == 1
    assert orch.state.daily.executed_count == 1
    assert orch.state.daily.rejected_count == 0
    _ok("entry: approved + executed counters incremented")


# ============================================================
# 7. ENTRY: opener failure -> no state mutation
# ============================================================

def test_entry_opener_failure_no_state_mutation():
    """Opener returns success=False -> state untouched."""
    brain = FakeBrain(entry_decision=make_entry_decision())
    cfg = make_config()
    state = make_state()
    logger = FakeLogger()
    # Make a broken opener that always fails
    class BrokenOpener:
        async def open_trade(self, **kwargs):
            from trading.trade_opener import TradeOpenResult
            return TradeOpenResult(success=False, error="boom", error_kind="config_error")
    orch = Orchestrator(
        config=cfg, ai_client=FakeAI(), market_data_provider=FakeProvider(),
        state=state, store=_MemoryStore(), logger=logger,
        brain_dispatch={"TF": brain, "MR": brain},
        opener=BrokenOpener(), closer=TradeCloser(broker=None, is_paper=True),
        risk_manager=RiskManager(cfg, state),
        now_utc_provider=lambda: FIXED_DAY_UTC,
        max_iterations=1,
    )
    patch_tech(orch)
    asyncio.run(orch.run())
    assert state.active_trades == {}
    assert state.daily.executed_count == 0
    assert any(e["event"] == "open_failed" for e in logger.brain_log.events)
    _ok("entry: opener failure -> no state mutation, open_failed logged")


# ============================================================
# 8. ENTRY: MAX_OPEN_TRADES_REACHED
# ============================================================

def test_entry_skipped_when_max_open_trades_reached():
    state = make_state()
    # Pre-populate with five active trades (V18 cap = max_open_trades_total=5)
    for sym in ("6E", "GC", "CL", "NQ", "RTY"):
        e = make_trade_entry(symbol=sym)
        state.active_trades[sym] = ActiveTrade(entry=e, runtime=TradeRuntime())
    brain = FakeBrain(entry_decision=make_entry_decision())
    orch, logger = make_orch_for_test(brain=brain, state=state)
    patch_tech(orch)
    asyncio.run(orch.run())
    # Original five still there; MES (filter target) NOT added
    assert "MES" not in orch.state.active_trades
    _ok("entry: 5/5 open trades -> MAX_OPEN_TRADES_REACHED, MES skipped")


# ============================================================
# 9. EXIT: BrainDecision.action = EXIT closes trade
# ============================================================

def test_exit_action_closes_trade():
    state = make_state()
    e = make_trade_entry()
    state.active_trades["MES"] = ActiveTrade(entry=e, runtime=TradeRuntime())
    brain = FakeBrain(
        entry_decision=None,
        exit_decision=BrainDecision(action="EXIT", reason="brain_exit"),
    )
    orch, logger = make_orch_for_test(brain=brain, state=state)
    patch_tech(orch)
    asyncio.run(orch.run())
    assert "MES" not in orch.state.active_trades
    assert any(ev["event"] == "trade_closed" for ev in logger.brain_log.events)
    _ok("exit: action=EXIT -> closer success, state.active_trades cleared")


# ============================================================
# 10-11. EXIT: register_tp_hit on win, register_sl_hit on loss
# ============================================================

def test_exit_win_calls_register_tp_hit():
    state = make_state()
    e = make_trade_entry(direction="BUY")   # entry 5800, sl 5790, tp 5820
    state.active_trades["MES"] = ActiveTrade(entry=e, runtime=TradeRuntime())
    brain = FakeBrain(
        entry_decision=None,
        exit_decision=BrainDecision(action="EXIT", reason="tp_target"),
    )
    # tech price 5820 -> winning exit
    orch, _ = make_orch_for_test(brain=brain, state=state)
    patch_tech(orch, tech=_synthetic_tech(price=5820.0))
    asyncio.run(orch.run())
    # consecutive_sl reset to 0 via register_tp_hit
    assert state.metadata.get("consecutive_sl_count", {}).get("MES", 0) == 0
    # cooldown_until NOT set (TP doesn't extend cooldown)
    assert "MES" not in state.cooldown.cooldown_until
    _ok("exit-win: register_tp_hit -> consecutive_sl=0, no cooldown set")


def test_exit_loss_calls_register_sl_hit_with_cooldown():
    state = make_state()
    e = make_trade_entry(direction="BUY")
    state.active_trades["MES"] = ActiveTrade(entry=e, runtime=TradeRuntime())
    brain = FakeBrain(
        entry_decision=None,
        exit_decision=BrainDecision(action="EXIT", reason="sl_hit"),
    )
    # tech price 5790 -> losing exit (entry 5800 - 10pt SL distance)
    orch, _ = make_orch_for_test(brain=brain, state=state)
    patch_tech(orch, tech=_synthetic_tech(price=5790.0))
    asyncio.run(orch.run())
    assert state.metadata.get("consecutive_sl_count", {}).get("MES", 0) == 1
    assert "MES" in state.cooldown.cooldown_until
    _ok("exit-loss: register_sl_hit -> consecutive=1, cooldown_until set")


# ============================================================
# 12. EXIT: closer failure keeps active_trade
# ============================================================

def test_exit_closer_failure_keeps_active_trade():
    state = make_state()
    e = make_trade_entry()
    state.active_trades["MES"] = ActiveTrade(entry=e, runtime=TradeRuntime())
    brain = FakeBrain(
        entry_decision=None,
        exit_decision=BrainDecision(action="EXIT", reason="brain_exit"),
    )
    # Use live broker that fails close_position
    broker = FakeBroker()
    broker.fail_close_position = True
    # BUG A guard: position must be reported as still open broker-side,
    # otherwise _check_external_close would clean up active_trades before
    # the closer is reached. This test exercises closer-failure path only.
    broker.positions_to_return = [
        Position(symbol="MES", direction="BUY", contracts=2, avg_price=5800.0),
    ]
    cfg = make_config()
    logger = FakeLogger()
    closer = TradeCloser(broker=broker, is_paper=False, logger=logger)
    # Mark trade as live so closer goes through broker path
    e_live = TradeEntry(
        **{**{f.name: getattr(e, f.name) for f in e.__dataclass_fields__.values()},
           "is_paper": False}  # type: ignore[misc]
    )
    state.active_trades["MES"] = ActiveTrade(entry=e_live, runtime=TradeRuntime())
    risk = RiskManager(cfg, state, logger=logger)
    orch = Orchestrator(
        config=cfg, ai_client=FakeAI(), market_data_provider=FakeProvider(),
        state=state, store=_MemoryStore(), logger=logger,
        brain_dispatch={"TF": brain, "MR": brain},
        opener=TradeOpener(broker=broker, is_paper=False),
        closer=closer, risk_manager=risk, broker=broker,
        now_utc_provider=lambda: FIXED_DAY_UTC,
        max_iterations=1,
    )
    patch_tech(orch)
    asyncio.run(orch.run())
    assert "MES" in orch.state.active_trades, "closer failure must NOT clear active_trade"
    _ok("exit: broker close_position fail -> active_trade preserved")


# ============================================================
# 12b. BUG A: external close detection (broker-side TP/SL fill)
# ============================================================

def test_manage_skips_when_position_closed_externally():
    """
    BUG A fix: ProjectX SL/TP not server-side OCO. When TP fills,
    SL stays open (and viceversa). manage_one polls positions_get
    before brain.manage_exit — if position is gone, cancel orphans
    + cleanup state + emit position_closed_externally event.
    """
    state = make_state()
    e = make_trade_entry()
    e_live = TradeEntry(
        **{**{f.name: getattr(e, f.name) for f in e.__dataclass_fields__.values()},
           "is_paper": False}  # type: ignore[misc]
    )
    state.active_trades["MES"] = ActiveTrade(entry=e_live, runtime=TradeRuntime())
    brain = FakeBrain(
        entry_decision=None,
        # If brain.manage_exit were called, it would say EXIT — assert
        # that it is NOT called when external close is detected.
        exit_decision=BrainDecision(action="EXIT", reason="should_not_run"),
    )
    broker = FakeBroker()
    broker.positions_to_return = []   # broker says: position is gone
    cfg = make_config()
    logger = FakeLogger()
    closer = TradeCloser(broker=broker, is_paper=False, logger=logger)
    risk = RiskManager(cfg, state, logger=logger)
    orch = Orchestrator(
        config=cfg, ai_client=FakeAI(), market_data_provider=FakeProvider(),
        state=state, store=_MemoryStore(), logger=logger,
        brain_dispatch={"TF": brain, "MR": brain},
        opener=TradeOpener(broker=broker, is_paper=False),
        closer=closer, risk_manager=risk, broker=broker,
        now_utc_provider=lambda: FIXED_DAY_UTC,
        max_iterations=1,
    )
    patch_tech(orch)
    asyncio.run(orch.run())

    # State cleared
    assert "MES" not in orch.state.active_trades, \
        "external close must drop active_trade"
    # cancel_all_for_symbol called for orphan cleanup
    assert "MES" in broker.cancel_all_calls, \
        f"orphan cleanup must call cancel_all_for_symbol, got {broker.cancel_all_calls}"
    # brain.manage_exit was NOT called (we short-circuited)
    assert brain.manage_calls == 0, \
        f"manage_exit must NOT run when position closed externally, got {brain.manage_calls}"
    # closer.close_trade was NOT called (no in-flight close decision)
    assert broker.close_calls == [], \
        f"close_position must NOT be called on external close, got {broker.close_calls}"
    # Structured event emitted
    events = [ev for ev in logger.brain_log.events
              if ev["event"] == "position_closed_externally"]
    assert len(events) == 1, f"expected 1 event, got {events}"
    ev = events[0]
    assert ev["symbol"] == "MES"
    assert ev["brain"] == "TF"
    assert ev["direction"] == "BUY"
    assert ev["entry_price"] == e_live.entry_price
    assert ev["sl_price"] == e_live.sl_price
    assert ev["tp_price"] == e_live.tp_price
    assert ev["stop_id"] == "S1"
    assert ev["target_id"] == "T1"
    _ok("BUG A: external close -> cancel orphans + state cleanup + event")


def test_manage_runs_brain_when_position_still_open():
    """
    BUG A fix: positions_get returns the open position -> manage_exit
    runs normally (no spurious cleanup).
    """
    state = make_state()
    e = make_trade_entry()
    e_live = TradeEntry(
        **{**{f.name: getattr(e, f.name) for f in e.__dataclass_fields__.values()},
           "is_paper": False}  # type: ignore[misc]
    )
    state.active_trades["MES"] = ActiveTrade(entry=e_live, runtime=TradeRuntime())
    brain = FakeBrain(
        entry_decision=None,
        exit_decision=BrainDecision(action="HOLD", reason="hold_default"),
    )
    broker = FakeBroker()
    broker.positions_to_return = [
        Position(symbol="MES", direction="BUY", contracts=2, avg_price=5800.0),
    ]
    cfg = make_config()
    logger = FakeLogger()
    closer = TradeCloser(broker=broker, is_paper=False, logger=logger)
    risk = RiskManager(cfg, state, logger=logger)
    orch = Orchestrator(
        config=cfg, ai_client=FakeAI(), market_data_provider=FakeProvider(),
        state=state, store=_MemoryStore(), logger=logger,
        brain_dispatch={"TF": brain, "MR": brain},
        opener=TradeOpener(broker=broker, is_paper=False),
        closer=closer, risk_manager=risk, broker=broker,
        now_utc_provider=lambda: FIXED_DAY_UTC,
        max_iterations=1,
    )
    patch_tech(orch)
    asyncio.run(orch.run())

    # State preserved
    assert "MES" in orch.state.active_trades, \
        "open position must NOT be cleaned up"
    # No orphan cleanup
    assert broker.cancel_all_calls == [], \
        f"no cleanup expected when position is open, got {broker.cancel_all_calls}"
    # brain.manage_exit was called normally
    assert brain.manage_calls == 1, \
        f"manage_exit must run when position is open, got {brain.manage_calls}"
    # No external-close event
    events = [ev for ev in logger.brain_log.events
              if ev["event"] == "position_closed_externally"]
    assert events == [], f"no event expected, got {events}"
    _ok("BUG A: position still open -> brain runs, no spurious cleanup")


def test_manage_skips_external_check_for_paper_trades():
    """
    BUG A fix: paper trades have no broker-side counterpart, so
    positions_get must NOT be polled. Check skipped via is_paper guard.
    """
    state = make_state()
    e = make_trade_entry()   # default is_paper=True
    state.active_trades["MES"] = ActiveTrade(entry=e, runtime=TradeRuntime())
    brain = FakeBrain(
        entry_decision=None,
        exit_decision=BrainDecision(action="HOLD", reason="hold"),
    )
    broker = FakeBroker()
    broker.positions_to_return = []   # would trigger cleanup if check fired
    orch, logger = make_orch_for_test(brain=brain, state=state, broker=broker)
    patch_tech(orch)
    asyncio.run(orch.run())

    # Paper trade survives even though broker says no positions
    assert "MES" in orch.state.active_trades
    # positions_get NOT called for paper (skip guard)
    # (it may be called by other paths like opener pre-check on a FRESH
    # entry; here no entry is opened because active_trades already has MES,
    # so opener's pre-check positions_get never runs)
    assert broker.positions_get_calls == 0, \
        f"paper trade must not poll positions_get, got {broker.positions_get_calls}"
    _ok("BUG A: paper trade skips broker-side check")


# ============================================================
# 12c. V18 fix: external close -> recover P&L + risk_manager hooks
# ============================================================

def _external_close_setup(*, recent_trades: list[ClosedTrade], direction="BUY"):
    """Helper: build a live trade closed externally, broker returns the
    given recent_trades list. Returns (orch, broker, risk, state, logger).
    """
    state = make_state()
    e = make_trade_entry(direction=direction)
    e_live = TradeEntry(
        **{**{f.name: getattr(e, f.name) for f in e.__dataclass_fields__.values()},
           "is_paper": False}  # type: ignore[misc]
    )
    state.active_trades["MES"] = ActiveTrade(entry=e_live, runtime=TradeRuntime())
    brain = FakeBrain(
        entry_decision=None,
        exit_decision=BrainDecision(action="EXIT", reason="should_not_run"),
    )
    broker = FakeBroker()
    broker.positions_to_return = []                       # broker-side flat
    broker.recent_trades_to_return = recent_trades        # programmable history
    cfg = make_config()
    logger = FakeLogger()
    closer = TradeCloser(broker=broker, is_paper=False, logger=logger)
    risk = RiskManager(cfg, state, logger=logger)
    orch = Orchestrator(
        config=cfg, ai_client=FakeAI(), market_data_provider=FakeProvider(),
        state=state, store=_MemoryStore(), logger=logger,
        brain_dispatch={"TF": brain, "MR": brain},
        opener=TradeOpener(broker=broker, is_paper=False),
        closer=closer, risk_manager=risk, broker=broker,
        now_utc_provider=lambda: FIXED_DAY_UTC,
        max_iterations=1,
    )
    patch_tech(orch)
    return orch, broker, risk, state, logger


def test_external_close_loss_recovers_pnl_and_registers_sl_hit():
    """
    V18: external close with broker.recent_trades returning a closing
    leg (BUY position -> SELL closing trade) with negative P&L:
      - pnl_usd recovered from broker history
      - daily_pnl decremented
      - register_sl_hit fires (consecutive_sl counter += 1, cooldown set)
      - tf_losses += 1
      - position_closed_externally event has pnl_source="broker"
    """
    closing = ClosedTrade(
        trade_id="t1", symbol="CON.F.US.ES.M25",  # contractId, NOT short
        contracts=2, side="SELL",                  # opposite of BUY position
        exit_price=5790.0, pnl_usd=-50.0, closed_at="2026-04-28T14:05:00Z",
    )
    orch, broker, risk, state, logger = _external_close_setup(
        recent_trades=[closing], direction="BUY",
    )

    asyncio.run(orch.run())

    # State cleared
    assert "MES" not in state.active_trades
    # P&L recovered + applied
    assert risk.state.daily.daily_pnl == -50.0
    assert risk.state.brain.tf_losses == 1
    assert risk.state.brain.tf_wins == 0
    # SL hit registered (cooldown + consecutive counter)
    counters = risk.state.metadata.get("consecutive_sl_count", {})
    assert counters.get("MES") == 1, f"consecutive_sl_count must be 1, got {counters}"
    assert "MES" in risk.state.cooldown.cooldown_until
    # Event reflects broker-recovered P&L
    events = [ev for ev in logger.brain_log.events
              if ev["event"] == "position_closed_externally"]
    assert len(events) == 1, f"expected 1 event, got {events}"
    assert events[0]["pnl_usd"] == -50.0
    assert events[0]["pnl_source"] == "broker"
    assert events[0]["exit_price"] == 5790.0
    _ok("V18: external close LOSS -> pnl recovered, register_sl_hit fires")


def test_external_close_win_calls_register_tp_hit_and_resets_consecutive_sl():
    """
    V18: external close with positive P&L resets consecutive_sl_count
    via register_tp_hit and increments tf_wins. No cooldown extension.
    """
    closing = ClosedTrade(
        trade_id="t2", symbol="CON.F.US.ES.M25",
        contracts=2, side="SELL", exit_price=5820.0,
        pnl_usd=+80.0, closed_at="2026-04-28T14:05:00Z",
    )
    orch, broker, risk, state, logger = _external_close_setup(
        recent_trades=[closing], direction="BUY",
    )
    # Pre-existing 2 consecutive losses to confirm reset behavior
    risk.state.metadata["consecutive_sl_count"] = {"MES": 2}

    asyncio.run(orch.run())

    assert "MES" not in state.active_trades
    assert risk.state.daily.daily_pnl == +80.0
    assert risk.state.brain.tf_wins == 1
    assert risk.state.brain.tf_losses == 0
    # register_tp_hit zeroes the counter
    counters = risk.state.metadata.get("consecutive_sl_count", {})
    assert counters.get("MES") == 0, \
        f"register_tp_hit must reset consecutive_sl_count, got {counters}"
    events = [ev for ev in logger.brain_log.events
              if ev["event"] == "position_closed_externally"]
    assert events[0]["pnl_usd"] == +80.0
    assert events[0]["pnl_source"] == "broker"
    _ok("V18: external close WIN -> register_tp_hit, consecutive_sl reset")


def test_external_close_no_match_skips_risk_hooks():
    """
    V18: when recent_trades returns no matching closing leg (or the API
    failed), risk_manager hooks must NOT fire — counters stay un-touched.
    The trade is still dropped from state, but pnl_source="unknown" is
    emitted so the operator can reconcile manually.
    """
    # Only a same-side leg (BUY) matching the entry, no closing SELL leg:
    same_side = ClosedTrade(
        trade_id="t3", symbol="CON.F.US.ES.M25",
        contracts=2, side="BUY", exit_price=5800.0,
        pnl_usd=0.0, closed_at="2026-04-28T13:55:00Z",
    )
    orch, broker, risk, state, logger = _external_close_setup(
        recent_trades=[same_side], direction="BUY",
    )

    asyncio.run(orch.run())

    # State cleared (cleanup still runs)
    assert "MES" not in state.active_trades
    # Counters un-touched (no match found, can't trust)
    assert risk.state.daily.daily_pnl == 0.0
    assert risk.state.brain.tf_wins == 0
    assert risk.state.brain.tf_losses == 0
    counters = risk.state.metadata.get("consecutive_sl_count", {})
    assert counters.get("MES", 0) == 0
    assert "MES" not in risk.state.cooldown.cooldown_until
    # Event reflects gap
    events = [ev for ev in logger.brain_log.events
              if ev["event"] == "position_closed_externally"]
    assert len(events) == 1
    assert events[0]["pnl_usd"] is None
    assert events[0]["pnl_source"] == "unknown"
    _ok("V18: external close no-match -> counters un-touched, gap logged")


# ============================================================
# 13-14. PARTIAL_50 with / without set_be
# ============================================================

def test_partial_50_with_set_be_modifies_stop_and_runtime():
    """V18 12-mag — partial via opposite order: SL passa a BE direttamente
    nel nuovo bracket (no separate modify_stop). In paper mode, il closer
    skip-pa il broker call ma l'orchestrator aggiorna comunque
    runtime.current_sl_price al nuovo prezzo."""
    state = make_state()
    e = make_trade_entry(contracts=4)
    state.active_trades["MES"] = ActiveTrade(entry=e, runtime=TradeRuntime(
        current_sl_price=e.sl_price,
    ))
    decision = BrainDecision(
        action="PARTIAL_50", reason="rsi50_partial",
        metadata={"set_be_after_partial": True},
    )
    brain = FakeBrain(entry_decision=None, exit_decision=decision)
    broker = FakeBroker()
    cfg = make_config()
    logger = FakeLogger()
    risk = RiskManager(cfg, state, logger=logger)
    orch = Orchestrator(
        config=cfg, ai_client=FakeAI(), market_data_provider=FakeProvider(),
        state=state, store=_MemoryStore(), logger=logger,
        brain_dispatch={"TF": brain, "MR": brain},
        opener=TradeOpener(broker=broker, is_paper=True),
        closer=TradeCloser(broker=broker, is_paper=True),
        risk_manager=risk, broker=broker,
        max_iterations=1,
    )
    patch_tech(orch, tech=_synthetic_tech(price=5810.0))   # winning partial
    asyncio.run(orch.run())
    at = orch.state.active_trades["MES"]
    assert at.runtime.partial_done is True
    # V18: modify_stop NON più chiamato — il nuovo bracket è già a BE.
    assert broker.modify_stop_calls == [], broker.modify_stop_calls
    # Runtime SL aggiornato direttamente dall'orchestrator a BE.
    assert at.runtime.current_sl_price == e.entry_price
    _ok("partial_50 + set_be: partial_done=True, no modify_stop, runtime SL=entry")


def test_partial_50_live_rewires_bracket_ids_on_success():
    """V18 12-mag — live partial: broker.partial_close_via_opposite_order
    returns new stop/target IDs; orchestrator swaps them into active.entry."""
    state = make_state()
    e = make_trade_entry(contracts=4)
    # Stash distinctive original IDs and force is_paper=False so the
    # closer routes through the broker.
    e_with_ids = dataclasses.replace(
        e, stop_order_id="OLD-STOP", target_order_id="OLD-TARGET",
        is_paper=False,
    )
    state.active_trades["MES"] = ActiveTrade(entry=e_with_ids, runtime=TradeRuntime(
        current_sl_price=e_with_ids.sl_price,
    ))
    decision = BrainDecision(
        action="PARTIAL_50", reason="rsi50_partial",
        metadata={"set_be_after_partial": True, "be_price": e.entry_price},
    )
    brain = FakeBrain(entry_decision=None, exit_decision=decision)
    broker = FakeBroker()
    broker.next_partial_ids = ("CLOSE-X", "NEW-STOP", "NEW-TARGET")
    # _check_external_close polls positions_get before every manage tick;
    # if empty in live mode, it treats the position as externally closed.
    broker.positions_to_return = [
        Position(symbol="MES", direction="BUY", contracts=4, avg_price=5800.0),
    ]
    cfg = make_config()
    logger = FakeLogger()
    risk = RiskManager(cfg, state, logger=logger)
    orch = Orchestrator(
        config=cfg, ai_client=FakeAI(), market_data_provider=FakeProvider(),
        state=state, store=_MemoryStore(), logger=logger,
        brain_dispatch={"TF": brain, "MR": brain},
        # is_paper=False so the closer routes through the broker.
        opener=TradeOpener(broker=broker, is_paper=False),
        closer=TradeCloser(broker=broker, is_paper=False),
        risk_manager=risk, broker=broker,
        max_iterations=1,
    )
    patch_tech(orch, tech=_synthetic_tech(price=5810.0))
    asyncio.run(orch.run())
    at = orch.state.active_trades["MES"]
    # 1. Broker received the new partial method (NOT close_position).
    assert len(broker.partial_close_via_opposite_calls) == 1
    call = broker.partial_close_via_opposite_calls[0]
    assert call["symbol"] == "MES"
    assert call["direction"] == "BUY"
    assert call["contracts_to_close"] == 2          # 4 // 2
    assert call["residual_contracts"] == 2
    assert call["new_sl_price"] == e.entry_price    # BE
    assert call["new_tp_price"] == e.tp_price
    assert call["old_stop_order_id"] == "OLD-STOP"
    assert call["old_target_order_id"] == "OLD-TARGET"
    assert broker.close_calls == []                 # old path NOT taken
    # 2. State: partial_done=True, current_sl=BE, IDs swapped to new ones.
    assert at.runtime.partial_done is True
    assert at.runtime.current_sl_price == e.entry_price
    assert at.entry.stop_order_id == "NEW-STOP"
    assert at.entry.target_order_id == "NEW-TARGET"
    _ok("partial_50 live: opposite-order called + bracket IDs rewired")


def test_partial_50_failure_sets_partial_done_to_block_retries():
    """V18 12-mag — broker rejects partial → runtime.partial_done=True
    anche su fallimento, per evitare retry infiniti ogni tick."""
    state = make_state()
    e_paper = make_trade_entry(contracts=4)
    e = dataclasses.replace(e_paper, is_paper=False)
    state.active_trades["MES"] = ActiveTrade(entry=e, runtime=TradeRuntime(
        current_sl_price=e.sl_price,
        partial_done=False,
    ))
    decision = BrainDecision(
        action="PARTIAL_50", reason="auto_partial",
        metadata={"set_be_after_partial": True, "be_price": e.entry_price},
    )
    brain = FakeBrain(entry_decision=None, exit_decision=decision)
    broker = FakeBroker()
    broker.fail_partial_close_via_opposite = True
    broker.positions_to_return = [
        Position(symbol="MES", direction="BUY", contracts=4, avg_price=5800.0),
    ]
    cfg = make_config()
    logger = FakeLogger()
    risk = RiskManager(cfg, state, logger=logger)
    orch = Orchestrator(
        config=cfg, ai_client=FakeAI(), market_data_provider=FakeProvider(),
        state=state, store=_MemoryStore(), logger=logger,
        brain_dispatch={"TF": brain, "MR": brain},
        opener=TradeOpener(broker=broker, is_paper=False),
        closer=TradeCloser(broker=broker, is_paper=False),
        risk_manager=risk, broker=broker,
        max_iterations=1,
    )
    patch_tech(orch, tech=_synthetic_tech(price=5810.0))
    asyncio.run(orch.run())
    at = orch.state.active_trades["MES"]
    # Broker WAS called (got rejected), runtime marked done to block retries.
    assert len(broker.partial_close_via_opposite_calls) == 1
    assert at.runtime.partial_done is True
    # SL/TP IDs NOT swapped (success=False), original entry preserved.
    assert at.entry.stop_order_id == e.stop_order_id
    assert at.entry.target_order_id == e.target_order_id
    _ok("partial_50 failure: partial_done=True (block retries), entry IDs preserved")


def test_partial_50_without_set_be():
    state = make_state()
    e = make_trade_entry(contracts=4)
    state.active_trades["MES"] = ActiveTrade(entry=e, runtime=TradeRuntime(
        current_sl_price=e.sl_price,
    ))
    decision = BrainDecision(
        action="PARTIAL_50", reason="auto_partial",
        metadata={},   # no set_be_after_partial
    )
    brain = FakeBrain(entry_decision=None, exit_decision=decision)
    broker = FakeBroker()
    orch, _ = make_orch_for_test(brain=brain, state=state, broker=broker)
    patch_tech(orch, tech=_synthetic_tech(price=5810.0))
    asyncio.run(orch.run())
    at = orch.state.active_trades["MES"]
    assert at.runtime.partial_done is True
    # No set_be -> no modify_stop call
    assert broker.modify_stop_calls == []
    # SL stays at original
    assert at.runtime.current_sl_price == e.sl_price
    _ok("partial_50 no set_be: partial_done=True, NO modify_stop, SL unchanged")


# ============================================================
# 15-16. MOVE_SL breakeven + trailing
# ============================================================

def test_move_sl_breakeven_updates_runtime_sl_price():
    state = make_state()
    e = make_trade_entry()
    state.active_trades["MES"] = ActiveTrade(entry=e, runtime=TradeRuntime(
        current_sl_price=e.sl_price,
    ))
    decision = BrainDecision(
        action="MOVE_SL", reason="move_to_be", move_sl_to=e.entry_price,
        metadata={"sl_target": "breakeven"},
    )
    brain = FakeBrain(entry_decision=None, exit_decision=decision)
    broker = FakeBroker()
    orch, _ = make_orch_for_test(brain=brain, state=state, broker=broker)
    patch_tech(orch)
    asyncio.run(orch.run())
    assert broker.modify_stop_calls[-1]["new_sl_price"] == e.entry_price
    assert orch.state.active_trades["MES"].runtime.current_sl_price == e.entry_price
    _ok("MOVE_SL breakeven: broker.modify_stop called, runtime.current_sl_price updated")


def test_move_sl_trailing_updates_runtime_sl_price():
    state = make_state()
    e = make_trade_entry()
    state.active_trades["MES"] = ActiveTrade(entry=e, runtime=TradeRuntime(
        current_sl_price=e.sl_price,
    ))
    new_trail = 5805.0
    decision = BrainDecision(
        action="MOVE_SL", reason="trail", move_sl_to=new_trail,
        metadata={"sl_target": "trailing"},
    )
    brain = FakeBrain(entry_decision=None, exit_decision=decision)
    broker = FakeBroker()
    orch, _ = make_orch_for_test(brain=brain, state=state, broker=broker)
    patch_tech(orch)
    asyncio.run(orch.run())
    assert broker.modify_stop_calls[-1]["new_sl_price"] == new_trail
    assert orch.state.active_trades["MES"].runtime.current_sl_price == new_trail
    _ok("MOVE_SL trailing: runtime.current_sl_price updated")


# ============================================================
# 17. HOLD: candle dedup persists
# ============================================================

def test_hold_persists_runtime_last_exit_eval_time():
    state = make_state()
    e = make_trade_entry()
    state.active_trades["MES"] = ActiveTrade(entry=e, runtime=TradeRuntime())
    decision = BrainDecision(
        action="HOLD", reason="grace_period",
        metadata={"evaluated_candle_time": 1730000000},
    )
    brain = FakeBrain(entry_decision=None, exit_decision=decision)
    orch, _ = make_orch_for_test(brain=brain, state=state)
    patch_tech(orch)
    asyncio.run(orch.run())
    assert orch.state.active_trades["MES"].runtime.last_exit_eval_time == 1730000000.0
    _ok("HOLD: candle_time persisted in runtime.last_exit_eval_time")


# ============================================================
# 18. CANDLE DEDUP across all action types
# ============================================================

def test_candle_dedup_runtime_field_updated_on_move_sl():
    state = make_state()
    e = make_trade_entry()
    state.active_trades["MES"] = ActiveTrade(entry=e, runtime=TradeRuntime())
    decision = BrainDecision(
        action="MOVE_SL", reason="trail", move_sl_to=5805.0,
        metadata={"sl_target": "trailing", "evaluated_candle_time": 1730000500},
    )
    brain = FakeBrain(entry_decision=None, exit_decision=decision)
    broker = FakeBroker()
    orch, _ = make_orch_for_test(brain=brain, state=state, broker=broker)
    patch_tech(orch)
    asyncio.run(orch.run())
    assert orch.state.active_trades["MES"].runtime.last_exit_eval_time == 1730000500.0
    _ok("candle dedup: MOVE_SL also persists last_exit_eval_time")


# ============================================================
# 19. STATE persists across iterations
# ============================================================

def test_state_persists_across_iterations():
    """Open trade in iter 1, manage_exit HOLD in iter 2 -> active_trade survives."""
    state = make_state()
    decisions = [make_entry_decision(), None]
    class _SeqBrain(FakeBrain):
        def __init__(self):
            super().__init__()
            self.exit_decision = BrainDecision(action="HOLD", reason="hold")
        async def evaluate_entry(self, symbol, tech, *, bias_data=None, last_entry_eval_time=0.0):
            self.evaluate_calls += 1
            d = decisions[min(self.evaluate_calls - 1, 1)]
            ct = float(getattr(tech, "candle_time", 0) or 0) or None
            return EntryEvalResult(decision=d, evaluated_candle_time=ct)
    brain = _SeqBrain()
    orch, _ = make_orch_for_test(brain=brain, state=state, max_iterations=2)
    patch_tech(orch)
    asyncio.run(orch.run())
    assert "MES" in orch.state.active_trades, "trade opened in iter 1 must survive iter 2"
    assert brain.evaluate_calls == 1, \
        "iter 2 must skip evaluate_entry because MES already active"
    # iter 1 runs scan (opens) AND manage_exit (immediately after); iter 2
    # runs manage_exit only -> total 2 manage_exit calls.
    assert brain.manage_calls == 2, \
        f"expected 2 manage_exit calls (iter1 + iter2), got {brain.manage_calls}"
    _ok("state persistence: open in iter 1, hold in iter 2, active_trade preserved")


# ============================================================
# 20. TechSnapshot.to_dict provides all 7 opener fields
# ============================================================

def test_techsnapshot_to_dict_provides_opener_fields():
    snap = _synthetic_tech()
    d = snap.to_dict()
    for key in ("rsi", "rsi_h1", "rsi_h4", "atr_ratio",
                "market_structure", "regime", "h1_compatibility"):
        assert key in d, f"missing key for opener: {key}"
    assert d["market_structure"] == "BULLISH_EXPANSION"
    _ok("TechSnapshot.to_dict: all 7 fields used by trade_opener present")


# ============================================================
# 21. TRADING HOURS FILTER (V14 port)
# ============================================================

def test_trading_hours_inside_passes_to_brain():
    """
    FIXED_DAY_UTC=14:00, MES allowed_hours=[13..18]. Gate passes;
    Brain returns None setup -> scan_skip with brain_no_setup, NOT
    OUTSIDE_TRADING_HOURS. Verifies the gate doesn't false-positive.
    """
    brain = FakeBrain(entry_decision=None)
    orch, logger = make_orch_for_test(brain=brain)
    patch_tech(orch)
    asyncio.run(orch.run())
    skips = [e for e in logger.brain_log.events if e["event"] == "scan_skip"]
    reasons = [e.get("reason") for e in skips]
    assert "OUTSIDE_TRADING_HOURS" not in reasons, \
        f"hour 14:00 inside MES [13..18] must pass; got {reasons}"
    assert "brain_no_setup" in reasons, f"expected brain_no_setup, got {reasons}"
    _ok("trading_hours: 14:00 UTC inside MES window -> passes to brain")


def test_trading_hours_outside_skips_before_brain():
    """
    Clock at 22:00 UTC, MES allowed [13..18] -> OUTSIDE_TRADING_HOURS.
    Skip happens BEFORE brain is called: brain.evaluate_entry must
    NOT have been invoked (V14 cost-saver).
    """
    cfg = make_config()
    state = make_state()
    logger = FakeLogger()
    brain = FakeBrain(entry_decision=None)
    orch = Orchestrator(
        config=cfg, ai_client=FakeAI(), market_data_provider=FakeProvider(),
        state=state, store=_MemoryStore(), logger=logger,
        brain_dispatch={"TF": brain, "MR": brain},
        opener=TradeOpener(broker=None, is_paper=True, logger=logger),
        closer=TradeCloser(broker=None, is_paper=True, logger=logger),
        risk_manager=RiskManager(cfg, state, logger=logger),
        broker=None,
        # 22:00 UTC -> outside MES [13..18]
        now_utc_provider=lambda: datetime(2026, 4, 28, 22, 0, 0, tzinfo=timezone.utc),
        max_iterations=1,
    )
    patch_tech(orch)
    asyncio.run(orch.run())
    skips = [e for e in logger.brain_log.events if e["event"] == "scan_skip"]
    assert len(skips) >= 1
    ev = skips[0]
    assert ev["reason"] == "OUTSIDE_TRADING_HOURS"
    assert ev["now_hour_utc"] == 22
    assert 14 in ev["allowed_hours"]
    # Brain must NOT have been called (gate is BEFORE brain)
    assert brain.evaluate_calls == 0, \
        f"brain called {brain.evaluate_calls}× despite outside-hours gate"
    _ok("trading_hours: 22:00 UTC outside MES window -> OUTSIDE_TRADING_HOURS, no brain call")


def test_trading_hours_unknown_symbol_fail_open():
    """
    Symbol not in TRADING_HOURS -> gate is fail-open (don't block
    onboarding of new assets). Even at 22:00 UTC, scan proceeds.
    Achieved by monkey-patching TRADING_HOURS to delete "MES" entry.
    """
    import core.config_futures as cfg_fut
    saved = cfg_fut.TRADING_HOURS.pop("MES")
    try:
        cfg = make_config()
        state = make_state()
        logger = FakeLogger()
        brain = FakeBrain(entry_decision=None)
        orch = Orchestrator(
            config=cfg, ai_client=FakeAI(), market_data_provider=FakeProvider(),
            state=state, store=_MemoryStore(), logger=logger,
            brain_dispatch={"TF": brain, "MR": brain},
            opener=TradeOpener(broker=None, is_paper=True, logger=logger),
            closer=TradeCloser(broker=None, is_paper=True, logger=logger),
            risk_manager=RiskManager(cfg, state, logger=logger),
            broker=None,
            now_utc_provider=lambda: datetime(2026, 4, 28, 22, 0, 0, tzinfo=timezone.utc),
            max_iterations=1,
        )
        patch_tech(orch)
        asyncio.run(orch.run())
    finally:
        cfg_fut.TRADING_HOURS["MES"] = saved
    skips = [e for e in logger.brain_log.events if e["event"] == "scan_skip"]
    reasons = [e.get("reason") for e in skips]
    assert "OUTSIDE_TRADING_HOURS" not in reasons, \
        f"unknown symbol must fail-open on hours gate, got {reasons}"
    # Past the gate, downstream stages may still skip (selector, brain,
    # etc.). The proof of fail-open is just: the OUTSIDE_TRADING_HOURS
    # event does not appear, so the gate did not block.
    assert len(reasons) >= 1, "scan must have run past the hours gate"
    _ok("trading_hours: symbol absent from TRADING_HOURS -> fail-open (gate bypassed)")


def test_trading_hours_per_asset_coverage_liquidity_tuned():
    """
    Liquidity-tuned UTC ranges sanity (post V14 baseline + market review):
      - Equity index micros (MES/MNQ/MYM): cash open + EU/US overlap,
        DROP 18 UTC (= 14 ET "lunch lull" chop).
      - MGC: London AM/PM fix + US data window, DROP pre-London thin
        (< 8 UTC).
      - MCL: NYMEX pit window 13-17, DROP 18 (pre-pit-close gap risk).
      - 6E: Frankfurt → London close, DROP pre-Frankfurt + post-London.
      - 6B: London-bound, ends at 16 UTC (London close fix).
      - 6A: Tokyo prime + US session, DROP EU mattina (6-8) + US PM zombie.
      - 6J: Tokyo + London + US-prime, DROP Tokyo lunch (6) + US PM lull.
      - 6C: USA-only futures, restricted to [12-17] (CAD thin outside US).
    """
    import core.config_futures as cfg_fut
    th = cfg_fut.TRADING_HOURS

    # Equity index micros: USA cash session present, lunch-lull dropped
    for sym in ("MES", "MNQ", "MYM"):
        assert 14 in th[sym], f"{sym} must allow 14:00 UTC (US cash open)"
        assert 18 not in th[sym], f"{sym} must DROP 18:00 UTC (lunch lull)"

    # MGC: London AM fix area + US data; pre-London thin dropped
    assert 6 not in th["MGC"], "MGC must DROP 06 UTC (pre-London thin)"
    assert 7 not in th["MGC"], "MGC must DROP 07 UTC (pre-London thin)"
    assert 8 in th["MGC"], "MGC must allow 08 UTC (London AM warming up)"
    assert 18 in th["MGC"], "MGC must allow 18 UTC (Gold stays liquid in US PM)"

    # MCL: NYMEX pit window, 18 dropped (pre-close gap risk)
    assert 13 in th["MCL"]
    assert 17 in th["MCL"]
    assert 18 not in th["MCL"], "MCL must DROP 18 UTC (NYMEX pit closes 18:30)"
    assert 11 not in th["MCL"], "MCL must DROP 11 UTC (no European pre-session)"

    # 6E: Frankfurt-bounded, post-London dropped
    assert 6 not in th["6E"], "6E must DROP 06 UTC (pre-Frankfurt thin)"
    assert 7 in th["6E"]
    assert 16 in th["6E"]
    assert 17 not in th["6E"], "6E must DROP 17 UTC (post-London, USD-only chop)"

    # 6B: London-bound, ends at 16
    assert 7 in th["6B"]
    assert 16 in th["6B"]
    assert 17 not in th["6B"]

    # 6A: Asia core 0-5 + US session 11-16; EU morning + US PM dropped
    for h in (0, 3, 5):
        assert h in th["6A"], f"6A must allow {h}:00 UTC (Asia core)"
    for h in (6, 7, 8, 17, 18):
        assert h not in th["6A"], f"6A must DROP {h}:00 UTC (zombie hour)"

    # 6J: Tokyo + London + US-prime; lunch gap (6) + PM lull (17,18) dropped
    for h in (0, 3, 5, 8, 10, 14):
        assert h in th["6J"], f"6J must allow {h}:00 UTC"
    for h in (6, 17, 18):
        assert h not in th["6J"], f"6J must DROP {h}:00 UTC"

    # 6C: USA-only [12-17]
    assert 11 not in th["6C"], "6C must DROP 11 UTC (CAD thin EU)"
    assert 12 in th["6C"]
    assert 17 in th["6C"]
    assert 18 not in th["6C"], "6C must DROP 18 UTC (CAD post-US thin)"

    _ok("trading_hours: liquidity-tuned UTC ranges per-asset coverage verified")


def test_trading_hours_18_utc_cap_applied():
    """All assets respect Topstep no-overnight 18 UTC cap (no entry after)."""
    import core.config_futures as cfg_fut
    for sym, hours in cfg_fut.TRADING_HOURS.items():
        assert all(h <= 18 for h in hours), \
            f"{sym} has hour > 18 UTC (Topstep no-overnight violated): {hours}"
        assert all(0 <= h <= 23 for h in hours), \
            f"{sym} has hour outside [0,23]: {hours}"
    _ok("trading_hours: 18 UTC cap respected across all assets")


# ============================================================
# 22. NEWS FILTER GATE (V14 port)
# ============================================================

def test_news_block_skips_before_brain():
    """
    HIGH USD event at +5min within before_min=45 -> scan_skip
    NEWS_BLOCK with full event details. Brain MUST NOT be called
    (gate is pre-fetch H4 / pre-Brain to save AI cost).
    """
    from core.news_filter import NewsEvent, NewsFilter
    fixed = FIXED_DAY_UTC  # 14:00 UTC, MES inside trading hours
    nf = NewsFilter(
        enabled=True, before_min=45, after_min=15,
        source_url="https://test.invalid", logger=None,
    )
    nf._upcoming = [NewsEvent(
        dt=fixed + timedelta(minutes=5),
        country="USD", impact="High", title="FOMC Statement",
    )]
    nf._last_sync_at = fixed

    cfg = make_config()
    state = make_state()
    logger = FakeLogger()
    brain = FakeBrain(entry_decision=None)
    orch = Orchestrator(
        config=cfg, ai_client=FakeAI(), market_data_provider=FakeProvider(),
        state=state, store=_MemoryStore(), logger=logger,
        brain_dispatch={"TF": brain, "MR": brain},
        opener=TradeOpener(broker=None, is_paper=True, logger=logger),
        closer=TradeCloser(broker=None, is_paper=True, logger=logger),
        risk_manager=RiskManager(cfg, state, logger=logger),
        broker=None,
        news_filter=nf,
        now_utc_provider=lambda: fixed,
        max_iterations=1,
    )
    patch_tech(orch)
    asyncio.run(orch.run())
    skips = [e for e in logger.brain_log.events if e["event"] == "scan_skip"]
    blk = [e for e in skips if e.get("reason") == "NEWS_BLOCK"]
    assert len(blk) == 1, f"expected 1 NEWS_BLOCK, got {[s.get('reason') for s in skips]}"
    ev = blk[0]
    assert ev["title"] == "FOMC Statement"
    assert ev["country"] == "USD"
    assert ev["impact"] == "High"
    assert ev["window_reason"] == "BEFORE"
    assert 4 < ev["minutes_to_event"] < 6
    # Brain MUST NOT have been called (gate is pre-Brain)
    assert brain.evaluate_calls == 0, \
        f"brain called {brain.evaluate_calls}× despite NEWS_BLOCK gate"
    _ok("news_block: HIGH USD event +5min -> NEWS_BLOCK, no brain call")


def test_news_filter_disabled_passes_to_brain():
    """ENABLE_NEWS_FILTER=False (filter constructed disabled) never blocks."""
    from core.news_filter import NewsEvent, NewsFilter
    fixed = FIXED_DAY_UTC
    nf = NewsFilter(
        enabled=False, before_min=45, after_min=15,
        source_url="https://test.invalid", logger=None,
    )
    # Even with imminent USD event in cache, disabled filter must not block
    nf._upcoming = [NewsEvent(
        dt=fixed + timedelta(minutes=5),
        country="USD", impact="High", title="FOMC",
    )]

    cfg = make_config()
    state = make_state()
    logger = FakeLogger()
    brain = FakeBrain(entry_decision=None)
    orch = Orchestrator(
        config=cfg, ai_client=FakeAI(), market_data_provider=FakeProvider(),
        state=state, store=_MemoryStore(), logger=logger,
        brain_dispatch={"TF": brain, "MR": brain},
        opener=TradeOpener(broker=None, is_paper=True, logger=logger),
        closer=TradeCloser(broker=None, is_paper=True, logger=logger),
        risk_manager=RiskManager(cfg, state, logger=logger),
        broker=None,
        news_filter=nf,
        now_utc_provider=lambda: fixed,
        max_iterations=1,
    )
    patch_tech(orch)
    asyncio.run(orch.run())
    skips = [e for e in logger.brain_log.events if e["event"] == "scan_skip"]
    reasons = [e.get("reason") for e in skips]
    assert "NEWS_BLOCK" not in reasons, \
        f"disabled filter must never produce NEWS_BLOCK, got {reasons}"
    _ok("news_filter: disabled -> no NEWS_BLOCK, passes through to brain")


# ============================================================
# MAIN
# ============================================================

def main() -> int:
    print("test_orchestrator_phase_b.py")
    test_entry_happy_path_creates_active_trade()
    test_entry_approved_event_emitted_in_brain_log()
    test_entry_approved_carries_radar_tech_fields()
    test_tp_resolved_event_emitted_post_sizing()
    test_entry_brain_none_no_state_change()
    test_entry_skipped_when_sizing_skip()
    test_entry_skipped_when_risk_halted()
    test_entry_skipped_when_daily_loss_hard_stop_hit()
    test_entry_increments_counters()
    test_entry_opener_failure_no_state_mutation()
    test_entry_skipped_when_max_open_trades_reached()
    test_exit_action_closes_trade()
    test_exit_win_calls_register_tp_hit()
    test_exit_loss_calls_register_sl_hit_with_cooldown()
    test_exit_closer_failure_keeps_active_trade()
    test_manage_skips_when_position_closed_externally()
    test_manage_runs_brain_when_position_still_open()
    test_manage_skips_external_check_for_paper_trades()
    test_external_close_loss_recovers_pnl_and_registers_sl_hit()
    test_external_close_win_calls_register_tp_hit_and_resets_consecutive_sl()
    test_external_close_no_match_skips_risk_hooks()
    test_partial_50_with_set_be_modifies_stop_and_runtime()
    test_partial_50_live_rewires_bracket_ids_on_success()
    test_partial_50_failure_sets_partial_done_to_block_retries()
    test_partial_50_without_set_be()
    test_move_sl_breakeven_updates_runtime_sl_price()
    test_move_sl_trailing_updates_runtime_sl_price()
    test_hold_persists_runtime_last_exit_eval_time()
    test_candle_dedup_runtime_field_updated_on_move_sl()
    test_state_persists_across_iterations()
    test_techsnapshot_to_dict_provides_opener_fields()
    test_trading_hours_inside_passes_to_brain()
    test_trading_hours_outside_skips_before_brain()
    test_trading_hours_unknown_symbol_fail_open()
    test_trading_hours_per_asset_coverage_liquidity_tuned()
    test_trading_hours_18_utc_cap_applied()
    test_news_block_skips_before_brain()
    test_news_filter_disabled_passes_to_brain()
    print("ALL TESTS PASSED")
    return 0


if __name__ == "__main__":
    sys.exit(main())
