"""
APEX V16 — Orchestrator (Phase B: full trade lifecycle).

Top-level async loop replacing the V15 monolith's main while loop
(APEX_PREDATOR_V15.py:2660-2712).

Phase B scope:
  - Entry path: brain.evaluate_entry -> sizing.size_for_entry ->
    risk_manager.check_entry -> opener.open_trade -> state mutation
    (active_trades, daily counters) -> save.
  - Exit path: brain.manage_exit -> dispatch on action:
        HOLD       -> persist runtime.last_exit_eval_time, save
        EXIT       -> closer.close_trade -> risk hooks -> state cleanup -> save
        PARTIAL_50 -> closer.partial_close -> risk hooks -> runtime updates
                      -> optional broker.modify_stop for set_be_after_partial
                      -> save
        MOVE_SL    -> broker.modify_stop -> runtime.current_sl_price -> save
  - PAPER and DRY collapse here: opener/closer constructed with is_paper=True.
    LIVE is_paper=False (production wireup of broker.connect() lives in main.py;
    Phase C does the full LIVE provider/broker integration for real Topstep).

Phase C (out of scope here):
  - SIGTERM signal handler + asyncio.Event wired in main.py
  - Broker.connect() + BrokerMarketDataProvider for production
  - DRY mode that connects broker but suppresses orders
  - Orphan recovery, FORCE_FLAT close-all, watchdog

Public API unchanged from Phase A:
  Orchestrator(*, config, ai_client, market_data_provider, state, store,
               logger, brain_dispatch, bias_resolver=None,
               opener=None, closer=None, risk_manager=None,
               max_iterations=None)
    .run() -> int
    .stop()
"""

from __future__ import annotations

import asyncio
import dataclasses
import logging
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Callable, Optional

from analysis.bias import BiasResolver
from analysis.tech_snapshot import (
    StaleDataError, TechSnapshot, build_tech_snapshot, invalidate_tech_cache,
    tech_log_fields,
)
from brain.brain_selector import choose_brain
from core import config_futures as cfg_fut
from core.config_futures import (
    ASSETS_MAP,
    ENABLE_TRADING_HOURS_FILTER,
    TRADING_HOURS,
)
from core.contracts import (
    BrainContext, BrainDecision, EntryDecision, TradeAction, TradeRuntime,
)
from core.news_filter import NewsFilter
from dashboard_writer import DashboardWriter
from persistence.state_store import ActiveTrade
from trading.risk_manager import RiskManager
from trading.sizing import SizingDecision, size_for_entry
from trading.tp_resolver import TPResolution, resolve_tp_price
from trading.trade_closer import TradeCloseResult, TradeCloser
from trading.trade_opener import TradeOpener, TradeOpenResult


# ============================================================
# C2b — Broker error classification + exit code
# ============================================================

EXIT_BROKER_UNRECOVERABLE: int = 4

# V17 bug #4: open_trade is retried on transient broker_failure using this
# backoff schedule (seconds). One initial attempt + 3 retries = 4 attempts,
# at most 9s of cumulative waiting. After the retries are exhausted,
# post_order_safety_check still runs — an earlier attempt may have filled
# broker-side — and only when that too yields nothing is
# "open_failed_permanent" emitted and the trade abandoned. The legacy
# "open_failed" event keeps firing on the FIRST failure (before any retry)
# for downstream monitoring continuity.
OPEN_TRADE_RETRY_DELAYS_SEC: tuple[float, ...] = (1.0, 3.0, 5.0)

# Substring tokens (matched case-insensitively against str(exc)) that mark
# connection/transport-level trouble. These push the orchestrator into
# degraded mode, unlike logical errors, which the per-symbol try/except
# absorbs on its own.
_TRANSIENT_ERROR_TOKENS: tuple[str, ...] = (
    "connection",
    "connect",
    "websocket",
    "timeout",
    "timed out",
    "reset",
    "broken pipe",
    "unreachable",
    "ssl",
    "transport",
    "network",
)

# Auth-failure tokens — recoverable via reconnect (loginKey/token refresh),
# and treated exactly like transient failures for degraded-mode purposes.
_AUTH_ERROR_TOKENS: tuple[str, ...] = (
    "401", "403", "unauthorized", "forbidden", "expired", "invalid token",
)


def classify_broker_error(exc: BaseException) -> str:
    """Bucket a broker exception as "transient" | "auth" | "fatal".

    Classification order (first match wins):
      1. auth      — message contains an auth token (401/403/expired/...);
                     recovered via reconnect + loginKey refresh.
      2. transient — message contains a connection/transport token, or the
                     exception type itself is connection/timeout-like.
      3. fatal     — anything else. Downstream still degrades on "fatal"
                     (a safer default than ignoring an unknown error class).
    """
    text = str(exc).lower()
    for kind, tokens in (
        ("auth", _AUTH_ERROR_TOKENS),
        ("transient", _TRANSIENT_ERROR_TOKENS),
    ):
        if any(token in text for token in tokens):
            return kind
    if isinstance(exc, (ConnectionError, asyncio.TimeoutError, TimeoutError)):
        return "transient"
    return "fatal"


# ============================================================
# ORCHESTRATOR
# ============================================================

class Orchestrator:
    """Phase B: full lifecycle loop with brain -> sizing -> risk -> opener/closer."""

    def __init__(
        self,
        *,
        config,
        ai_client,
        market_data_provider,
        state,
        store,
        logger,
        brain_dispatch: Optional[dict[str, Any]] = None,
        bias_resolver: Optional[BiasResolver] = None,
        opener: Optional[TradeOpener] = None,
        closer: Optional[TradeCloser] = None,
        risk_manager: Optional[RiskManager] = None,
        broker: Optional[Any] = None,
        reconciler: Optional[Any] = None,
        news_filter: Optional[NewsFilter] = None,
        now_utc_provider: Optional[Callable[[], datetime]] = None,
        max_iterations: Optional[int] = None,
    ) -> None:
        """Wire collaborators and initialize runtime trackers.

        All parameters are keyword-only. config/ai_client/
        market_data_provider/state/store/logger are required
        collaborators; the rest are optional injection points (tests pass
        pre-built instances, production wiring lives in main.py).
        `max_iterations`, when set, bounds each of the three loops — used
        by tests to run a deterministic number of iterations.
        """
        self.config = config
        self.ai_client = ai_client
        self.provider = market_data_provider
        self.state = state
        self.store = store
        self.logger = logger
        self.brain_dispatch: dict[str, Any] = dict(brain_dispatch or {})
        self.bias_resolver = bias_resolver or BiasResolver(
            ai_client=ai_client, state=state, logger=logger,
        )
        self.opener = opener
        self.closer = closer
        self.risk_manager = risk_manager
        self.broker = broker
        self.reconciler = reconciler
        # News filter: V14 port. Default-construct from config_futures unless
        # caller injected one (tests inject pre-populated instances). Gate
        # is fail-open so a missing filter behaves identically to disabled.
        self.news_filter: NewsFilter = news_filter or NewsFilter(
            enabled=cfg_fut.ENABLE_NEWS_FILTER,
            before_min=cfg_fut.NEWS_BLOCK_BEFORE_MIN,
            after_min=cfg_fut.NEWS_BLOCK_AFTER_MIN,
            source_url=cfg_fut.NEWS_SOURCE_URL,
            http_timeout=cfg_fut.NEWS_HTTP_TIMEOUT_SEC,
            logger=logger,
            cache_dir=Path("~/apex_v16/cache").expanduser(),
        )
        # Clock injection — defaults to real UTC. Tests pass a fixed-time
        # callable to keep wall-clock-sensitive code (brain_selector
        # night-TF block) deterministic. The default MUST NOT self-reference;
        # the body is written literally so a global `replace_all` over
        # `datetime.now(timezone.utc)` cannot break it.
        def _real_utc_now() -> datetime:
            return datetime.now(timezone.utc)
        self._now_utc: Callable[[], datetime] = (
            now_utc_provider if now_utc_provider is not None
            else _real_utc_now
        )
        self.max_iterations = max_iterations
        self._stop_event = asyncio.Event()
        self._iteration = 0
        self._sys_log: logging.Logger = (
            logger.system if logger is not None else logging.getLogger("orchestrator")
        )
        # C2b — broker degraded state. Stays False on PAPER (no broker).
        self._broker_degraded: bool = False
        self._degraded_since: Optional[datetime] = None
        # Optional injection point for tests; main.py reuses _connect_with_retry.
        self._reconnect_fn = None
        # State lock: serializes mutations of state.active_trades between
        # _scan_loop and _manage_loop. A single lock = no deadlock possible.
        self._state_lock = asyncio.Lock()
        # Maintenance counter — drives the watchdog cadence (every N
        # maintenance_loop iterations). Replaces the old iteration-mod scheme.
        self._maintenance_iter: int = 0
        # Broker-reported realized P&L for the current CME futures
        # session day (17:00 America/Chicago anchor, DST-aware). Refreshed
        # by _refresh_daily_rpnl every maintenance tick; None when the
        # broker is unavailable (PAPER) or the call failed. Consumed by
        # DashboardWriter as the primary daily_pnl source — falls back
        # to the in-memory counter when None.
        self._cached_daily_rpnl: Optional[float] = None
        self._cached_daily_rpnl_at: Optional[datetime] = None
        # Scan loop tracker — unix-sec OPEN time of the last M5 close
        # whose post-close window we already scanned. Prevents
        # double-scanning the same candle when wakeups are noisy or when
        # the loop body finishes faster than the candle window.
        self._last_scanned_m5_close: int = 0
        # V17 bug #2: set to True by _scan_one on StaleDataError. End of
        # scan_loop iteration rolls back _last_scanned_m5_close so next
        # tick retries the same candle window.
        self._stale_data_in_scan: bool = False
        # V18 dashboard — fail-open JSON snapshot refreshed every
        # maintenance tick (see _maintenance_loop). Read-only on state.
        self.dashboard_writer: DashboardWriter = DashboardWriter()

    # ============================================================
    # PUBLIC
    # ============================================================

    async def run(self) -> int:
        """Run the three loops to completion and return the process exit code.

        Exit codes:
          0   — clean stop (stop_event set or max_iterations reached)
          1   — a loop crashed with an unhandled exception
          4   — EXIT_BROKER_UNRECOVERABLE: still degraded past the hard timeout
          130 — KeyboardInterrupt (Ctrl+C)
        State is saved on every exit path (best-effort, in `finally`).
        """
        self._sys_log.info(
            "Orchestrator starting in %s mode, %d assets — 3-loop architecture: "
            "scan=M5 boundary +%.0fs, manage=%ds (skip if flat), maintenance=%ds",
            self.config.mode.value,
            len(self._asset_list()),
            float(getattr(self.config, "scan_loop_phase_offset_seconds", 6.0)),
            int(getattr(self.config, "manage_loop_interval_seconds", 20)),
            int(getattr(self.config, "maintenance_loop_interval_seconds", 60)),
        )
        # Sentinels: KeyboardInterrupt and Cancellation derive from
        # BaseException so asyncio.gather(return_exceptions=True) does NOT
        # capture them — they'd propagate and orphan the surviving loops.
        # Convert to plain Exception subclasses inside the wrapper so the
        # gather can collect all results before we decide the exit code.
        class _LoopKeyboardInterrupt(Exception):
            pass

        class _LoopCrashed(Exception):
            def __init__(self, original: BaseException) -> None:
                self.original = original
                super().__init__(str(original))

        async def _wrap(coro):
            try:
                await coro
            except KeyboardInterrupt:
                self._stop_event.set()
                raise _LoopKeyboardInterrupt() from None
            except asyncio.CancelledError:
                self._stop_event.set()
                raise
            except BaseException as e:
                self._stop_event.set()
                raise _LoopCrashed(e) from e

        try:
            tasks = [
                asyncio.create_task(_wrap(self._scan_loop()), name="scan_loop"),
                asyncio.create_task(_wrap(self._manage_loop()), name="manage_loop"),
                asyncio.create_task(_wrap(self._maintenance_loop()), name="maintenance_loop"),
            ]
            results = await asyncio.gather(*tasks, return_exceptions=True)
            for r in results:
                if isinstance(r, _LoopKeyboardInterrupt):
                    self._sys_log.warning("Interrupted by user (Ctrl+C)")
                    return 130
                if isinstance(r, asyncio.CancelledError):
                    raise r
                if isinstance(r, _LoopCrashed):
                    # V16 fix: logger.exception() ran OUTSIDE an `except`
                    # block here, so sys.exc_info() was empty and no
                    # traceback was ever logged. Pass the captured
                    # exception explicitly via exc_info instead.
                    self._sys_log.error(
                        "Unhandled exception in loop: %s", r.original,
                        exc_info=r.original,
                    )
                    if self.logger is not None:
                        try:
                            self.logger.log_error(
                                where="orchestrator.run", error=str(r.original),
                            )
                        except Exception:
                            pass
                    return 1

            # Hard-timeout exit code propagation: if we stopped while still
            # degraded past the limit, surface the dedicated exit code.
            if self._broker_degraded and self._degraded_since is not None:
                max_min = float(getattr(self.config, "broker_degraded_max_minutes", 15) or 15)
                elapsed_min = (
                    self._now_utc() - self._degraded_since
                ).total_seconds() / 60.0
                if elapsed_min > max_min:
                    return EXIT_BROKER_UNRECOVERABLE
            return 0
        finally:
            try:
                self.store.save(self.state)
            except Exception as e:
                self._sys_log.warning("final save failed: %s", e)

    def stop(self) -> None:
        self._stop_event.set()

    # ============================================================
    # LOOPS (3 cadenze indipendenti)
    # ============================================================

    async def _scan_loop(self) -> None:
        """Scan entries on every M5 boundary + phase_offset.

        Stateful: `self._last_scanned_m5_close` tracks which M5 close we
        already scanned, so we never rescan the same candle twice within
        its 5-minute window. Cold start: if we boot inside the valid
        post-close window for an M5 we haven't yet scanned, run
        immediately; otherwise wait until next close + offset.
        """
        while not self._stop_event.is_set():
            sleep_for = self._scan_decision_seconds()
            if sleep_for > 0:
                try:
                    await asyncio.wait_for(
                        self._stop_event.wait(), timeout=sleep_for,
                    )
                    return  # stop fired during sleep
                except asyncio.TimeoutError:
                    pass
            self._iteration += 1
            # Mark this M5 as scanned BEFORE the work runs, so a slow
            # _scan_entries doesn't trigger a duplicate iter on next pass.
            import time as _time
            prev_scanned_m5_close = self._last_scanned_m5_close
            self._stale_data_in_scan = False
            self._last_scanned_m5_close = int((_time.time() // 300) * 300)
            try:
                if not self._broker_degraded:
                    async with self._state_lock:
                        await self._scan_entries()
                if self._stale_data_in_scan:
                    # V17 bug #2: at least one symbol saw stale broker
                    # data (StaleDataError raised). Rollback so next
                    # tick retries the same candle window.
                    self._sys_log.info(
                        "[scan_loop] stale broker data detected; "
                        "rollback _last_scanned_m5_close (%d -> %d)",
                        self._last_scanned_m5_close, prev_scanned_m5_close,
                    )
                    self._last_scanned_m5_close = prev_scanned_m5_close
            except KeyboardInterrupt:
                raise
            except Exception as e:
                self._sys_log.warning("[scan_loop] iter %d failed: %s", self._iteration, e)
            if self.max_iterations is not None and self._iteration >= self.max_iterations:
                self._sys_log.info(
                    "scan_loop max_iterations=%d reached, exiting", self.max_iterations,
                )
                return
            # Mandatory yield: when offset==0 (test config), sleep_for is
            # always 0 and this would hot-loop without ceding control.
            await asyncio.sleep(0)

    async def _manage_loop(self) -> None:
        """Manage open trades. Skip tick if active_trades is empty."""
        interval = int(getattr(self.config, "manage_loop_interval_seconds", 20))
        manage_iter = 0
        while not self._stop_event.is_set():
            try:
                if self.state.active_trades:
                    async with self._state_lock:
                        await self._manage_open_trades()
            except KeyboardInterrupt:
                raise
            except Exception as e:
                self._sys_log.warning("[manage_loop] failed: %s", e)
            manage_iter += 1
            if self.max_iterations is not None and manage_iter >= self.max_iterations:
                return
            if interval > 0:
                try:
                    await asyncio.wait_for(self._stop_event.wait(), timeout=interval)
                    return
                except asyncio.TimeoutError:
                    pass
            else:
                await asyncio.sleep(0)

    async def _maintenance_loop(self) -> None:
        """Reconnect + balance refresh + news sync + watchdog + persist."""
        interval = int(getattr(self.config, "maintenance_loop_interval_seconds", 60))
        while not self._stop_event.is_set():
            self._maintenance_iter += 1
            # V18 anti-freeze heartbeat — emesso PRIMA di balance/news/watchdog
            # in modo che watchdog.sh veda liveness anche se una di quelle
            # chiamate dovesse stallare oltre il proprio timeout. Una riga
            # ogni `maintenance_loop_interval_seconds` (default 60s) tiene
            # system.log sotto la soglia STALE_SECONDS=600s.
            if getattr(self.config, "maintenance_heartbeat_enabled", True):
                self._sys_log.info(
                    "[maintenance] heartbeat iter=%d active=%d halted=%s degraded=%s "
                    "balance=$%.2f",
                    self._maintenance_iter,
                    len(self.state.active_trades),
                    self.state.halted,
                    self._broker_degraded,
                    float(getattr(self, "_cached_account_balance", 0.0) or 0.0),
                )
            try:
                # Daily reset at UTC midnight. load_state(auto_daily_reset=True)
                # only fires at boot — if the bot stays up across 00:00 UTC the
                # counters keep accumulating against yesterday's date. Mirror
                # the load_state semantics (also clear halts: they are
                # daily-scoped per persistence/state_store.py:527-548).
                today_utc = datetime.now(timezone.utc).date().isoformat()
                if self.state.daily.date != today_utc:
                    from persistence.state_store import DailyCounters
                    prev_date = self.state.daily.date
                    self.state.daily = DailyCounters(date=today_utc)
                    self.state.halted = False
                    self.state.halt_reason = ""
                    self._sys_log.info(
                        "[maintenance] daily reset %s -> %s (counters + halts cleared)",
                        prev_date, today_utc,
                    )
                if self._broker_degraded:
                    outcome = await self._maybe_reconnect()
                    if outcome == "exit":
                        self._stop_event.set()
                        return
                await self._refresh_account_balance()
                await self._refresh_daily_rpnl()
                await self._maybe_sync_news()
                if not self._broker_degraded:
                    await self._maybe_run_watchdog(self._maintenance_iter)
                try:
                    self.store.save(self.state)
                except Exception as e:
                    self._sys_log.warning("[maintenance] save failed: %s", e)
                # V18 dashboard snapshot — fail-open per design; the
                # writer logs its own warnings and never raises.
                self.dashboard_writer.write(self)
            except KeyboardInterrupt:
                raise
            except Exception as e:
                self._sys_log.warning("[maintenance_loop] failed: %s", e)
            if (
                self.max_iterations is not None
                and self._maintenance_iter >= self.max_iterations
            ):
                return
            if interval > 0:
                try:
                    await asyncio.wait_for(self._stop_event.wait(), timeout=interval)
                    return
                except asyncio.TimeoutError:
                    pass
            else:
                await asyncio.sleep(0)

    def _scan_decision_seconds(self) -> float:
        """
        Returns seconds to sleep before the next scan, or 0.0 if scan
        should execute immediately.

        Logic:
          - The M5 candle that just closed has identifier `current_close`
            (unix sec, multiple of 300).
          - The "valid scan window" for that candle is post-close in
            [offset, candle_max_age_seconds] — the same range the brain
            accepts (CANDLE_STABILIZING / CANDLE_TOO_OLD gates).
          - We scan AT MOST ONCE per current_close (tracked via
            self._last_scanned_m5_close), so the loop runs the entry
            evaluation exactly once per candle.

        Returns 0.0 only when:
          (a) we're inside the valid window for current_close, AND
          (b) current_close has not yet been scanned.
        Otherwise returns the seconds until the NEXT M5 close + offset.

        Test escape hatch: when offset==0 we bypass the boundary logic
        entirely (always 0.0). Tests use offset=0 + max_iterations to
        drive N consecutive scan iters deterministically.
        """
        import time as _time
        M5 = 300
        offset = float(getattr(self.config, "scan_loop_phase_offset_seconds", 6.0))
        if offset <= 0.0:
            return 0.0  # test mode: no M5 alignment, no per-candle dedup
        max_age = float(getattr(self.config, "candle_max_age_seconds", 60.0))
        now = _time.time()
        current_close = int((now // M5) * M5)
        secs_since_close = now - current_close
        last_scanned = getattr(self, "_last_scanned_m5_close", 0)

        in_valid_window = (offset <= secs_since_close <= max_age)
        already_scanned_this_candle = (current_close <= last_scanned)

        if in_valid_window and not already_scanned_this_candle:
            return 0.0

        # Wait until the next close + offset.
        next_close = current_close + M5
        return (next_close - now) + offset

    async def _maybe_sync_news(self) -> None:
        """
        Top-of-iteration check; sync if stale > NEWS_SYNC_INTERVAL_MIN
        (or never synced). Async, non-blocking — NewsFilter.sync wraps
        urllib in asyncio.to_thread. Fail-open on any exception.
        """
        if not self.news_filter.enabled:
            return
        last = self.news_filter.last_sync_at
        now = self._now_utc()
        interval = timedelta(minutes=cfg_fut.NEWS_SYNC_INTERVAL_MIN)
        if last is not None and (now - last) < interval:
            return
        timeout = float(getattr(self.config, "news_sync_timeout_seconds", 30.0))
        try:
            await asyncio.wait_for(self.news_filter.sync(), timeout=timeout)
        except asyncio.TimeoutError:
            self._sys_log.warning("[news] sync timed out after %.0fs", timeout)
        except Exception as e:
            self._sys_log.warning("[news] sync raised: %s", e)

    # ============================================================
    # C2b — DEGRADED MODE + RECONNECT
    # ============================================================

    def _mark_degraded(self, reason: str, exc: Optional[BaseException] = None) -> None:
        """Idempotent: first call records timestamp + logs; subsequent are no-ops."""
        if self._broker_degraded:
            return
        self._broker_degraded = True
        self._degraded_since = self._now_utc()
        self._sys_log.warning(
            "[broker] entering DEGRADED mode (reason=%s, exc=%s)",
            reason, exc,
        )
        if self.logger is not None:
            try:
                self.logger.brain_log.write(
                    "broker_degraded_entered", reason=reason, error=str(exc),
                )
            except Exception:
                pass

    def _mark_recovered(self) -> None:
        if not self._broker_degraded:
            return
        elapsed = None
        if self._degraded_since is not None:
            elapsed = (self._now_utc() - self._degraded_since).total_seconds()
        self._broker_degraded = False
        self._degraded_since = None
        self._sys_log.info("[broker] RECOVERED after %.1fs degraded", elapsed or 0.0)
        if self.logger is not None:
            try:
                self.logger.brain_log.write(
                    "broker_degraded_exited", elapsed_seconds=elapsed,
                )
            except Exception:
                pass

    def _maybe_handle_broker_exception(self, where: str, exc: BaseException) -> None:
        """Route a broker-call exception: classify and (if connection-like) degrade."""
        if self.broker is None:
            return
        kind = classify_broker_error(exc)
        if kind in ("transient", "auth", "fatal"):
            # All broker exceptions trigger degraded mode (fatal default
            # is safer than ignore — reconnect retries N times then exits).
            self._mark_degraded(reason=f"{where}:{kind}", exc=exc)

    async def _maybe_reconnect(self) -> str:
        """
        Attempt mid-loop reconnect. Returns:
          "recovered" -> reconnect succeeded, post-reconnect reconcile run
          "still_degraded" -> reconnect failed but timeout not exceeded
          "exit" -> hard timeout exceeded, orchestrator must exit
        """
        # No broker injected (PAPER): nothing to reconnect to.
        if self.broker is None:
            return "still_degraded"

        # Hard timeout check FIRST — don't even attempt if we've been
        # degraded too long (avoid burning more attempts on a dead broker).
        max_min = float(getattr(self.config, "broker_degraded_max_minutes", 15) or 15)
        if self._degraded_since is not None:
            elapsed_min = (
                self._now_utc() - self._degraded_since
            ).total_seconds() / 60.0
            if elapsed_min > max_min:
                self._sys_log.error(
                    "[broker] degraded for %.1fmin (> %.1fmin) — exiting with code %d",
                    elapsed_min, max_min, EXIT_BROKER_UNRECOVERABLE,
                )
                if self.logger is not None:
                    try:
                        self.logger.brain_log.write(
                            "broker_unrecoverable",
                            elapsed_minutes=elapsed_min, max_minutes=max_min,
                        )
                    except Exception:
                        pass
                return "exit"

        ok = await self._do_reconnect()
        if not ok:
            return "still_degraded"

        # Reconnect succeeded — run post-reconnect reconciliation to
        # rebuild state-broker alignment before resuming normal scan.
        self._mark_recovered()
        # V17 bug #2: broker reconnected; cached tech snapshots may
        # reference candles older than current. Drop the cache.
        invalidate_tech_cache()
        if self.reconciler is not None:
            try:
                await self.reconciler.reconcile_startup(post_reconnect=True)
            except TypeError:
                # Older Reconciler without the kwarg — call without it.
                # NOTE(review): this also catches a TypeError raised INSIDE
                # reconcile_startup itself; acceptable because the retry is
                # wrapped and all reconcile failures are warning-only.
                try:
                    await self.reconciler.reconcile_startup()
                except Exception as e:
                    self._sys_log.warning("post-reconnect reconcile failed: %s", e)
            except Exception as e:
                self._sys_log.warning("post-reconnect reconcile failed: %s", e)
        return "recovered"

    async def _do_reconnect(self) -> bool:
        """
        Issue a single broker.disconnect()+connect() cycle. Falls back
        to broker.connect() alone if disconnect raises. Returns True on
        success, False on failure (caller decides what to do next).

        For multi-attempt mid-loop retries, main.py composes this with
        _connect_with_retry by passing a callable via self._reconnect_fn.
        """
        if self._reconnect_fn is not None:
            try:
                return bool(await self._reconnect_fn(self.broker))
            except Exception as e:
                self._sys_log.warning("reconnect_fn raised: %s", e)
                return False
        try:
            try:
                await self.broker.disconnect()
            except Exception:
                pass
            return bool(await self.broker.connect())
        except Exception as e:
            self._sys_log.warning("broker reconnect failed: %s", e)
            return False

    async def _maybe_run_safety_check(
        self, *, symbol: str, brain: str, decision, contracts: int, tech,
        error_context: str,
    ) -> Optional[TradeOpenResult]:
        """
        After an opener failure, ask the opener to verify whether a position
        actually opened broker-side and (if so) re-attach SL/TP. Returns
        the recovered TradeOpenResult on success, None if no recovery
        possible. PAPER skips (no broker side state to inspect).
        """
        if self.opener is None or getattr(self.opener, "is_paper", False):
            return None
        if self.broker is None:
            return None
        check = getattr(self.opener, "post_order_safety_check", None)
        if check is None:
            return None
        try:
            recovered: TradeOpenResult = await check(
                symbol=symbol, brain_name=brain, decision=decision,
                contracts=contracts, tech_now=tech.to_dict(),
                error_context=error_context,
            )
        except Exception as e:
            self._sys_log.warning("[scan] %s safety_check raised: %s", symbol, e)
            return None
        if recovered.success and recovered.entry and recovered.runtime:
            return recovered
        return None

    async def _maybe_run_watchdog(self, iteration: int) -> None:
        """
        Periodically run reconciler.watchdog_naked_positions() to catch
        broker-side position closures that race the bot's main loop.

        Cadence (V16 3-loop): every N maintenance_loop iterations, where
        N = config.reconcile_interval_iterations (default 1 → 60s).
        Failures are warning-only; watchdog never raises into the loop.
        """
        if self.reconciler is None:
            return
        every = int(getattr(self.config, "reconcile_interval_iterations", 2) or 2)
        if every <= 0 or iteration % every != 0:
            return
        timeout = float(getattr(self.config, "watchdog_timeout_seconds", 30.0))
        try:
            await asyncio.wait_for(
                self.reconciler.watchdog_naked_positions(), timeout=timeout,
            )
        except asyncio.TimeoutError:
            self._sys_log.warning("[watchdog] timed out after %.0fs", timeout)
        except Exception as e:
            self._sys_log.warning("[watchdog] failed: %s", e)

    # ------------------------------------------------------------
    # SCAN ENTRIES — full Phase B path
    # ------------------------------------------------------------

    async def _scan_entries(self) -> None:
        """Evaluate entry opportunities for every configured symbol.

        Symbols with an open trade are skipped. Per-symbol failures are
        contained: logged, routed through broker-error classification,
        and recorded via logger.log_error (best-effort). If any symbol
        pushed the orchestrator into degraded mode, the rest of the sweep
        is abandoned so the next iteration can run a reconnect cycle
        before re-trying.
        """
        for symbol in self._asset_list():
            if symbol in self.state.active_trades:
                continue
            try:
                await self._scan_one(symbol)
            except Exception as e:
                self._sys_log.warning("[scan] %s failed: %s", symbol, e)
                self._maybe_handle_broker_exception("scan", e)
                if self.logger is not None:
                    try:
                        self.logger.log_error(
                            where="orchestrator.scan", error=str(e), symbol=symbol,
                        )
                    except Exception:
                        pass
            if self._broker_degraded:
                # Short-circuit the remaining symbols; reconnect first.
                return

    async def _scan_one(self, symbol: str) -> None:
        """
        Full Phase B entry path for a single symbol.

        Pipeline: pre-fetch gates (trading hours, news) -> bias + tech
        snapshot -> brain selection -> brain.evaluate_entry (with
        same-candle dedup) -> sizing -> risk gate -> TP resolution ->
        opener.open_trade (with broker_failure retry + post-failure
        safety check) -> state mutation (active_trades, daily counters).

        Every reject/skip is logged via _log_skip with a machine-readable
        reason and the method returns early; a plain return means "no
        trade opened this tick".
        """
        # 0. Trading hours gate (V14 port). Skip BEFORE H4 fetch / brain
        # to save AI cost and bandwidth on illiquid sessions. Fail-open
        # for symbols not in TRADING_HOURS (onboarding-friendly).
        if ENABLE_TRADING_HOURS_FILTER:
            allowed_hours = TRADING_HOURS.get(symbol)
            if allowed_hours is not None:
                now_hour = self._now_utc().hour
                if now_hour not in allowed_hours:
                    self._log_skip(
                        symbol, "OUTSIDE_TRADING_HOURS",
                        details={
                            "now_hour_utc": now_hour,
                            "allowed_hours": allowed_hours,
                        },
                    )
                    return

        # 0b. News filter gate (V14 port). Skip on imminent or just-passed
        # HIGH-impact event for any of symbol's currencies. Same pre-fetch
        # position as trading_hours: avoids AI cost during macro-driven
        # spread widening / impulse moves. Fail-open if filter disabled,
        # cache empty (never synced / sync failed), or symbol absent
        # from ASSET_CURRENCIES.
        blocking = self.news_filter.is_blocked(symbol, self._now_utc())
        if blocking is not None:
            self._log_skip(
                symbol, "NEWS_BLOCK",
                details={
                    "title": blocking.event.title,
                    "country": blocking.event.country,
                    "impact": blocking.event.impact,
                    "event_dt_utc": blocking.event.dt.isoformat(),
                    "minutes_to_event": round(blocking.minutes_to_event, 1),
                    "window_reason": blocking.reason,
                },
            )
            return

        # Skip H4 fetch if bias_resolver has a fresh cache entry: resolve()
        # would return the cached BiasData without touching df4. Saves 1
        # REST/asset/scan during the TTL window (3600s).
        if self.bias_resolver.has_fresh_cache(symbol):
            df4 = None
        else:
            df4 = await self._fetch_h4(symbol)
        bias = await self.bias_resolver.resolve(symbol, df4)
        # V17 bug #2: pass the just-closed M5 open time so
        # build_tech_snapshot can detect broker propagation lag and retry
        # before accepting stale indicators. On persistent lag,
        # StaleDataError is raised → log scan_skip and rollback so the
        # next tick retries.
        import time as _time
        # Open time of the most recently CLOSED 5-minute candle (300s grid).
        expected_open = (int(_time.time() // 300) - 1) * 300
        try:
            tech = await self._build_tech(symbol, min_candle_time=expected_open)
        except StaleDataError as e:
            self._stale_data_in_scan = True
            self._log_skip(
                symbol, "DATA_STALE_BROKER_LAG",
                details={
                    "candle_time_seen": e.candle_time_seen,
                    "candle_time_expected": e.candle_time_expected,
                },
            )
            return
        if tech is None:
            return

        sel = choose_brain(symbol=symbol, tech=tech, bias=bias, now_utc=self._now_utc())
        chosen = sel.chosen
        if chosen is None:
            self._log_skip(
                symbol, sel.reject_reason or "brain_selector_none",
                tech=tech, bias=bias, details=sel.details,
            )
            return

        brain = self.brain_dispatch.get(chosen)
        if brain is None:
            self._log_skip(symbol, f"brain_dispatch_missing:{chosen}", tech=tech, bias=bias)
            return

        # 1. Brain entry decision (with same-candle dedup)
        last_eval = float(
            self.state.entry_eval_cache.last_eval.get(symbol, 0.0)
        )
        try:
            eval_result = await brain.evaluate_entry(
                symbol, tech, bias_data=bias, last_entry_eval_time=last_eval,
            )
        except Exception as e:
            self._sys_log.warning("[scan] %s brain.evaluate_entry raised: %s", symbol, e)
            if self.logger is not None:
                try:
                    self.logger.log_error(
                        where="orchestrator.evaluate_entry",
                        error=str(e), symbol=symbol, brain=chosen,
                    )
                except Exception:
                    pass
            return

        if eval_result.dedup_skipped:
            self._log_skip(
                symbol, "SAME_CANDLE_DEDUP",
                tech=tech, bias=bias, brain=chosen,
            )
            return

        # Candle-state gates (CANDLE_NOT_CLOSED / CANDLE_STABILIZING /
        # CANDLE_TOO_OLD): brain returns reject_reason, orchestrator logs
        # scan_skip with that exact reason.
        if eval_result.reject_reason is not None:
            self._log_skip(
                symbol, eval_result.reject_reason,
                tech=tech, bias=bias, brain=chosen,
            )
            return

        # Update entry-eval cache when AI was actually called and a
        # cache update is warranted (candle_time set by the brain).
        if eval_result.evaluated_candle_time is not None:
            self.state.entry_eval_cache.last_eval[symbol] = float(
                eval_result.evaluated_candle_time
            )

        decision = eval_result.decision
        if decision is None:
            self._log_skip(symbol, "brain_no_setup", tech=tech, bias=bias, brain=chosen)
            return

        # Observability: emit entry_approved on brain_log (symmetric to
        # entry_rejected). Lets a single jq query on brain_log.jsonl
        # reconstruct the full AI decision flow per symbol; trade_opened
        # remains on trade_log (lifecycle event, not brain decision).
        self._log_entry_approved(symbol, chosen, decision, tech=tech)

        # 2. Sizing
        try:
            sizing = size_for_entry(
                entry=decision,
                symbol=symbol,
                account_balance=self._resolve_account_balance(),
                risk_per_trade=self.config.risk_per_trade,
                daily_pnl=self.state.daily.daily_pnl,
                daily_loss_hard_stop=self.config.daily_loss_hard_stop,
                max_risk_vs_daily_budget=self.config.max_risk_vs_daily_budget,
            )
        except ValueError as e:
            self._sys_log.warning("[scan] %s sizing rejected inputs: %s", symbol, e)
            self._log_skip(symbol, f"sizing_invalid:{e}", tech=tech, bias=bias, brain=chosen)
            return

        # 3. Risk gate (SIZING_SKIP propagates via gate #1)
        if self.risk_manager is None:
            self._log_skip(symbol, "risk_manager_missing", tech=tech, bias=bias, brain=chosen)
            return
        risk = self.risk_manager.check_entry(
            entry=decision,
            sizing=sizing,
            symbol=symbol,
            active_trades=self.state.active_trades,
        )
        if not risk.approved:
            self._log_risk_skip(symbol, chosen, decision, sizing, risk)
            self.state.daily.rejected_count += 1
            return

        # 3b. TP resolution (V17 dual-path):
        #   - When AI emitted `tp_price_suggested > 0` and direction-coherent,
        #     resolver uses the AI path with 15% conservative margin.
        #   - Otherwise resolver falls back to V16 math: tp_price from
        #     rr_multiplier × real $ risk.
        # Brain emitted tp_price=0.0 sentinel; orchestrator owns the
        # finalization since contracts/sl_ticks are only known post-sizing.
        tp_price_suggested_ai = float(
            decision.metadata.get("tp_price_suggested_ai", 0.0) or 0.0
        )
        try:
            tp_res = resolve_tp_price(
                symbol=symbol,
                direction=decision.direction,
                entry_price=decision.entry_price,
                rr_multiplier=decision.rr_multiplier,
                contracts=sizing.contracts,
                sl_ticks=sizing.sl_ticks,
                tp_price_suggested=tp_price_suggested_ai,
            )
        except (KeyError, ValueError) as e:
            self._sys_log.warning("[scan] %s tp_resolve failed: %s", symbol, e)
            self._log_skip(
                symbol, f"tp_resolve_invalid:{e}",
                tech=tech, bias=bias, brain=chosen,
            )
            return
        decision = dataclasses.replace(decision, tp_price=tp_res.tp_price)
        self._log_tp_resolved(symbol, chosen, decision, tp_res)

        # 4. Opener
        if self.opener is None:
            self._log_skip(symbol, "opener_missing", tech=tech, bias=bias, brain=chosen)
            return

        # V17 bug #4: retry transient broker_failure (1s/3s/5s backoff,
        # 4 attempts total, max 9s cumulative wait). ConnectionError-like
        # exceptions are converted to broker_failure by the catch-all and
        # participate in the same retry path. If a previous attempt actually
        # filled, the next attempt's positions_get pre-check returns
        # error_kind="duplicate" — that breaks the loop (non-broker_failure)
        # and falls through to safety_check which re-attaches SL/TP on the
        # existing orphan.
        opener_exc: Optional[Exception] = None
        result: Optional[TradeOpenResult] = None
        max_attempts = 1 + len(OPEN_TRADE_RETRY_DELAYS_SEC)
        for attempt in range(1, max_attempts + 1):
            opener_exc = None
            try:
                result = await self.opener.open_trade(
                    symbol=symbol,
                    brain_name=chosen,
                    decision=decision,
                    contracts=sizing.contracts,
                    tech_now=tech.to_dict(),
                )
            except Exception as e:
                self._sys_log.warning(
                    "[scan] %s opener raised (attempt=%d/%d): %s",
                    symbol, attempt, max_attempts, e,
                )
                opener_exc = e
                # Synthesize a broker_failure result so the exception joins
                # the same retry path as a returned failure.
                result = TradeOpenResult(
                    success=False, error=str(e), error_kind="broker_failure",
                )
                if self.logger is not None:
                    try:
                        self.logger.log_error(
                            where="orchestrator.opener", error=str(e), symbol=symbol,
                        )
                    except Exception:
                        pass

            # Legacy "open_failed" event: emitted once on the FIRST failure
            # (before retries kick in) for monitoring continuity.
            if attempt == 1 and not result.success:
                if self.logger is not None:
                    try:
                        self.logger.brain_log.write(
                            "open_failed",
                            symbol=symbol,
                            brain=chosen,
                            error=result.error,
                            error_kind=result.error_kind,
                        )
                    except Exception:
                        pass

            # Stop retrying on success or non-retryable failure
            # (duplicate / config_error / unknown).
            if result.success or result.error_kind != "broker_failure":
                break
            if attempt >= max_attempts:
                break

            delay = OPEN_TRADE_RETRY_DELAYS_SEC[attempt - 1]
            self._sys_log.warning(
                "[scan] %s open_trade broker_failure attempt=%d/%d, retry in %.1fs: %s",
                symbol, attempt, max_attempts, delay, result.error,
            )
            if self.logger is not None:
                try:
                    self.logger.brain_log.write(
                        "open_retry",
                        symbol=symbol,
                        brain=chosen,
                        attempt=attempt,
                        next_attempt=attempt + 1,
                        delay_seconds=delay,
                        error=result.error,
                        error_kind=result.error_kind,
                    )
                except Exception:
                    pass
            await asyncio.sleep(delay)

        if not result.success or result.entry is None or result.runtime is None:
            # C2b: safety check — the broker may have opened the position
            # despite the failure (any of the retry attempts may have
            # filled). opener.post_order_safety_check sleeps, polls
            # positions_get, and (if found) attempts SL/TP re-attach.
            recovered = await self._maybe_run_safety_check(
                symbol=symbol, brain=chosen, decision=decision,
                contracts=sizing.contracts, tech=tech,
                error_context=result.error or (str(opener_exc) if opener_exc else ""),
            )
            if recovered is not None:
                result = recovered
            else:
                self._sys_log.warning(
                    "[scan] %s opener failed permanently after %d attempts: %s (%s)",
                    symbol, max_attempts, result.error, result.error_kind,
                )
                if self.logger is not None:
                    try:
                        self.logger.brain_log.write(
                            "open_failed_permanent",
                            symbol=symbol,
                            brain=chosen,
                            error=result.error,
                            error_kind=result.error_kind,
                            total_attempts=max_attempts,
                        )
                    except Exception:
                        pass
                return

        # 5. State mutation + save
        # Initialize current_sl_price from entry's SL (so MOVE_SL has a baseline)
        result.runtime.current_sl_price = result.entry.sl_price
        # Passive SL A/B test (v18-dev): raw SL proposed by the AI before
        # the MIN/MAX clamp. Read from decision.metadata and kept in
        # runtime for logging only — the broker still sees
        # result.entry.sl_price.
        try:
            result.runtime.sl_price_ai_raw = float(
                decision.metadata.get("sl_price_ai_raw", 0.0) or 0.0
            )
        except (TypeError, ValueError):
            result.runtime.sl_price_ai_raw = 0.0
        self.state.active_trades[symbol] = ActiveTrade(
            entry=result.entry, runtime=result.runtime,
        )
        self.state.daily.approved_count += 1
        self.state.daily.executed_count += 1
        self._log_trade_opened(symbol, chosen, result.entry, sizing, risk)

    # ------------------------------------------------------------
    # MANAGE OPEN TRADES — full Phase B dispatch
    # ------------------------------------------------------------

    async def _manage_open_trades(self) -> None:
        """Tick every active trade once; per-symbol failures are isolated.

        Iterates over a snapshot of the keys because handlers may delete
        entries from ``state.active_trades`` mid-loop (exit / external
        close cleanup).
        """
        symbols = list(self.state.active_trades.keys())
        for sym in symbols:
            try:
                await self._manage_one(sym)
            except Exception as exc:
                self._sys_log.warning("[manage] %s failed: %s", sym, exc)
                self._maybe_handle_broker_exception("manage", exc)
                if self.logger is None:
                    continue
                try:
                    self.logger.log_error(
                        where="orchestrator.manage", error=str(exc), symbol=sym,
                    )
                except Exception:
                    pass

    async def _manage_one(self, symbol: str) -> None:
        """Single manage tick for one open trade.

        Order matters: detect a broker-side close first, then refresh the
        runtime fields, then ask the brain for an exit decision and
        dispatch on its action (HOLD / EXIT / PARTIAL_50 / MOVE_SL).
        Broker-mutating actions are deferred while the broker is degraded.
        """
        trade = self.state.active_trades.get(symbol)
        if trade is None:
            return

        # BUG A fix: ProjectX SL/TP orders are NOT server-side OCO — when
        # the broker fills TP the SL stays open (and vice versa) and the
        # orphan can fill later, creating an involuntary position. Poll
        # positions_get before each tick; on a broker-side close the
        # helper cancels orphans, drops state, and we stop here.
        if await self._check_external_close(symbol, trade):
            return

        tech_now = await self._build_tech(symbol)
        if tech_now is None:
            return

        # Brains read minutes_open / net_profit_usd in manage_exit; left
        # at their dataclass defaults the TF/MR time-stop never triggers
        # and the AI exit prompt always sees P&L=0 / "0m open".
        await self._refresh_runtime(trade, tech_now)

        brain_name = trade.entry.brain_name
        brain = self.brain_dispatch.get(brain_name)
        if brain is None:
            self._log_skip(symbol, f"brain_dispatch_missing:{brain_name}", tech=tech_now, bias=None)
            return

        ctx = BrainContext(entry=trade.entry, runtime=trade.runtime, tech_now=tech_now)
        try:
            decision: BrainDecision = await brain.manage_exit(ctx)
        except Exception as e:
            self._sys_log.warning("[manage] %s brain.manage_exit raised: %s", symbol, e)
            return

        action = decision.action
        if action == TradeAction.HOLD.value:
            self._apply_candle_dedup(decision, trade.runtime)
            return

        if action == TradeAction.EXIT.value:
            if self._broker_degraded:
                self._log_action_deferred(symbol, "EXIT", decision)
            else:
                await self._handle_exit(symbol, trade, decision, tech_now)
            return

        if action == TradeAction.PARTIAL_50.value:
            if self._broker_degraded:
                self._log_action_deferred(symbol, "PARTIAL_50", decision)
            else:
                await self._handle_partial(symbol, trade, decision, tech_now)
            return

        if action == TradeAction.MOVE_SL.value:
            if self._broker_degraded:
                self._log_action_deferred(symbol, "MOVE_SL", decision)
            else:
                await self._handle_move_sl(symbol, trade, decision)
            return

        # Anything else is unknown: warn and degrade to HOLD semantics.
        self._sys_log.warning("[manage] %s unknown action %s, holding", symbol, action)
        self._apply_candle_dedup(decision, trade.runtime)

    # ------------------------------------------------------------
    # ACTION HANDLERS
    # ------------------------------------------------------------

    async def _refresh_runtime(
        self, active: ActiveTrade, tech_now: TechSnapshot,
    ) -> None:
        """Refresh minutes_open / net_profit_usd / progress_pct on the trade.

        Brains consume these fields in manage_exit; left at their
        dataclass defaults the time-stops never fire and the AI always
        sees P&L=0. Also runs the passive AI-raw-SL A/B check, which is
        log-only and never touches the live trade.
        """
        from trading.pnl_calculator import compute_pnl

        entry = active.entry
        runtime = active.runtime

        opened = entry.opened_at
        if opened.tzinfo is None:
            # Persisted timestamps may be naive; treat them as UTC.
            opened = opened.replace(tzinfo=timezone.utc)
        elapsed_min = (datetime.now(timezone.utc) - opened).total_seconds() / 60.0
        runtime.minutes_open = max(0.0, elapsed_min)

        spec = ASSETS_MAP.get(entry.symbol, {})
        tick_size = float(spec.get("tick_size", 0.0))
        tick_value = float(spec.get("tick_value", 0.0))
        if tick_size <= 0 or tick_value <= 0:
            # Unknown instrument spec: leave P&L fields untouched.
            return

        # V17 P&L freshness: tech_now.price is M5-cached within the same
        # candle (frozen for up to 5 min between closes). Prefer the
        # broker's tick-fresh last price for the P&L estimate; any failure
        # (paper trade, no broker, exception, None/0 price) falls back
        # silently to the cached tech price — same observable behaviour,
        # no extra log noise.
        current_price = tech_now.price
        if self.broker is not None and not entry.is_paper:
            try:
                live = await self.broker.get_last_price(entry.symbol)
                if live is not None and live > 0:
                    current_price = float(live)
            except Exception:
                pass  # keep cached tech_now.price

        try:
            pnl = compute_pnl(
                entry_price=entry.entry_price,
                sl_price=entry.sl_price,
                tp_price=entry.tp_price,
                current_price=current_price,
                direction=entry.direction,
                contracts=entry.contracts,
                tick_size=tick_size,
                tick_value=tick_value,
            )
        except ValueError as e:
            self._sys_log.debug(
                "[manage] %s compute_pnl skipped: %s", entry.symbol, e,
            )
            return
        runtime.net_profit_usd = pnl.net_profit_usd
        runtime.progress_pct = pnl.progress_pct
        self._sys_log.info(
            "[refresh_runtime] %s minutes_open=%.1f net_profit_usd=%.2f",
            entry.symbol,
            runtime.minutes_open,
            pnl.net_profit_usd,
        )

        # Passive SL A/B test (v18-dev): if the current price has touched
        # the raw SL level proposed by the AI (pre MIN/MAX clamp), log the
        # event ONCE. No action on the live trade — the broker keeps
        # seeing the post-clamp entry.sl_price. Real vs simulated outcomes
        # are compared over 50+ trades to validate the clamp.
        if runtime.sl_price_ai_raw > 0.0 and not runtime.sl_ai_raw_hit:
            raw_sl = runtime.sl_price_ai_raw
            long_side = entry.direction == "BUY"
            touched = (
                current_price <= raw_sl if long_side else current_price >= raw_sl
            )
            if touched:
                runtime.sl_ai_raw_hit = True
                self._sys_log.info(
                    "[ab_test] %s sl_ai_raw HIT @ %.5f (real SL @ %.5f, "
                    "minutes_open=%.1f, pnl=%.2f)",
                    entry.symbol,
                    raw_sl,
                    entry.sl_price,
                    runtime.minutes_open,
                    runtime.net_profit_usd,
                )

    async def _check_external_close(
        self, symbol: str, active: ActiveTrade,
    ) -> bool:
        """
        Detect broker-side position close (TP or SL filled standalone).

        ProjectX SL/TP orders placed by place_market_bracket are NOT
        server-side OCO — they live independently. When one fills, the
        other stays open and can fill later, causing an involuntary
        opposite position. This polling check, run before each manage
        tick, reads the broker's position snapshot. If the position is
        gone we know one of the two filled, so we cancel the surviving
        orphan and drop the active_trade.

        Returns True if cleanup performed (caller must skip rest of tick).
        Returns False if the check is skipped (paper / no broker /
        degraded) or the position is still open.

        V18 fix: when recent_trades returns a matching closing leg, this
        path now updates risk_manager (update_daily_pnl + register_sl_hit/
        register_tp_hit) so daily counters and consecutive_sl_count stay
        in sync with broker reality. If pnl_usd cannot be recovered (no
        match / API failure), counters stay un-touched and the gap is
        logged for manual reconcile.
        """
        if active.entry.is_paper:
            return False
        if self.broker is None:
            return False
        if self._broker_degraded:
            return False
        timeout = float(getattr(self.config, "broker_call_timeout_seconds", 10.0))
        try:
            positions = await asyncio.wait_for(
                self.broker.positions_get(symbol), timeout=timeout,
            )
        except asyncio.TimeoutError:
            self._sys_log.warning(
                "[manage] %s positions_get timed out after %.0fs", symbol, timeout,
            )
            return False
        except Exception as e:
            # Be conservative on broker failure: continue normal flow.
            # Reconciler will catch it on its own cadence.
            self._sys_log.warning(
                "[manage] %s positions_get failed: %s", symbol, e,
            )
            return False

        if positions:
            return False

        # Recover exit_price + pnl_usd from broker.recent_trades.
        # Broker-authoritative (real slippage included). When unavailable
        # (broker raises / no matching fill) leave None + pnl_source="unknown"
        # — explicit null beats fabricated approximation for diagnostics.
        exit_price: Optional[float] = None
        pnl_usd: Optional[float] = None
        pnl_source = "unknown"
        try:
            since = active.entry.opened_at - timedelta(minutes=2)
            recent = await self.broker.recent_trades(
                symbol=symbol, since=since, limit=10,
            )
            # broker.recent_trades already filters by symbol server-side
            # via _symbol_matches; ClosedTrade.symbol is the full contractId
            # ("CON.F.US.EU6.M25"), not the short symbol ("6E"), so we no
            # longer compare it here. Match only on the closing-leg side
            # (opposite of the position direction).
            opp_side = "SELL" if active.entry.direction == "BUY" else "BUY"
            for t in sorted(recent, key=lambda x: x.closed_at, reverse=True):
                if t.side == opp_side:
                    exit_price = float(t.exit_price)
                    pnl_usd = float(t.pnl_usd)
                    pnl_source = "broker"
                    break
        except Exception as e:
            self._sys_log.debug(
                "[manage] %s recent_trades fallback (unknown pnl): %s",
                symbol, e,
            )

        # V17 bug #3: orphan cancellation 3-stage cascade. Position is
        # gone broker-side; we want any surviving SL/TP order cancelled.
        #   Stage 1 (id_first):  direct cancel_order on entry's stored
        #     stop_order_id / target_order_id. Bypasses the searchOpen
        #     filter — immune to contractId-format mismatch and to the
        #     order-list propagation race after a fill.
        #   Stage 2 (searchopen): cancel_all_for_symbol catches anything
        #     not in the stored IDs (e.g. modify_stop residuals, manual
        #     broker entries).
        #   Stage 3 (retry):     if Stage 1+2 found nothing, ProjectX
        #     may simply not have propagated the open-order list yet —
        #     wait 2s and retry searchopen once.
        # `orphan_cancel_method` records which stage produced hits, so
        # we can measure id_first vs searchopen vs retry effectiveness
        # over the calibration window.
        cancelled = 0
        orphan_cancel_method = "none"

        # Stage 1: ID-first
        cancelled_id_first = 0
        for tag, oid in (
            ("stop", active.entry.stop_order_id),
            ("target", active.entry.target_order_id),
        ):
            if not oid:
                continue
            try:
                res = await self.broker.cancel_order(symbol, oid)
            except Exception as e:
                self._sys_log.warning(
                    "[manage] %s cancel_order(%s=%s) raised: %s",
                    symbol, tag, oid, e,
                )
                continue
            if getattr(res, "success", False):
                cancelled_id_first += 1
        cancelled += cancelled_id_first
        if cancelled_id_first > 0:
            orphan_cancel_method = "id_first"

        # Stage 2: searchOpen safety net (always — catches non-bracket orphans)
        cancelled_searchopen = 0
        try:
            cancelled_searchopen = await self.broker.cancel_all_for_symbol(symbol)
        except Exception as e:
            self._sys_log.warning(
                "[manage] %s cancel_all_for_symbol after external close "
                "failed: %s", symbol, e,
            )
        cancelled += cancelled_searchopen
        if cancelled_searchopen > 0 and orphan_cancel_method == "none":
            orphan_cancel_method = "searchopen"

        # Stage 3: retry searchOpen once if both stages returned 0
        # (broker order-list propagation race after a fill).
        if cancelled == 0:
            await asyncio.sleep(2.0)
            cancelled_retry = 0
            try:
                cancelled_retry = await self.broker.cancel_all_for_symbol(symbol)
            except Exception as e:
                self._sys_log.warning(
                    "[manage] %s cancel_all_for_symbol retry after "
                    "external close failed: %s", symbol, e,
                )
            cancelled += cancelled_retry
            if cancelled_retry > 0:
                orphan_cancel_method = "retry"

        if self.logger is not None:
            try:
                self.logger.brain_log.write(
                    "position_closed_externally",
                    symbol=symbol,
                    brain=active.entry.brain_name,
                    direction=active.entry.direction,
                    contracts=active.entry.contracts,
                    entry_price=active.entry.entry_price,
                    sl_price=active.entry.sl_price,
                    tp_price=active.entry.tp_price,
                    stop_id=active.entry.stop_order_id,
                    target_id=active.entry.target_order_id,
                    cancelled_orphans=cancelled,
                    orphan_cancel_method=orphan_cancel_method,
                    exit_price=exit_price,
                    pnl_usd=pnl_usd,
                    pnl_source=pnl_source,
                )
            except Exception:
                pass

        self._sys_log.info(
            "[manage] %s position closed broker-side (TP/SL fill); "
            "cancelled %d orphan(s) via %s; state cleaned",
            symbol, cancelled, orphan_cancel_method,
        )

        # V18: register P&L with risk_manager when recovered from broker
        # history. Required for consecutive_sl_count, cooldown, daily_pnl
        # to stay consistent across natural SL/TP fills (which bypass the
        # bot-driven _handle_exit path). If pnl_usd is None we skip the
        # update — counters will under-count this trade, surfaced via
        # the `pnl_source=unknown` field of position_closed_externally.
        if (
            pnl_usd is not None
            and self.risk_manager is not None
        ):
            is_win = pnl_usd > 0
            self.risk_manager.update_daily_pnl(
                pnl_usd, is_win=is_win, brain=active.entry.brain_name,
            )
            if is_win:
                self.risk_manager.register_tp_hit(symbol)
            else:
                self.risk_manager.register_sl_hit(symbol)

        del self.state.active_trades[symbol]
        # V17 bug #2: drop tech cache for this symbol after external close.
        invalidate_tech_cache(symbol)

        # V17 feature #5: balance reconciliation post external close.
        # Same logic as _handle_exit but the "estimated" P&L comes from
        # broker.recent_trades (pnl_usd above) — null-tolerant per spec:
        # if recent_trades returned no match (pnl_usd is None), fall
        # back to 0 so the discrepancy still surfaces (delta = real - 0).
        # PAPER skips (no real balance).
        if self.broker is not None:
            balance_pre = getattr(self, "_cached_account_balance", None)
            balance_post: Optional[float] = None
            # Broker balance has eventual consistency post-fill: a sub-
            # second sleep lets TopstepX settle the close before we
            # snapshot, otherwise balance_post often returns the stale
            # pre-close value and balance_confirmed under-reports.
            await asyncio.sleep(0.5)
            try:
                balance_post = float(await self.broker.get_account_balance())
            except Exception as e:
                self._sys_log.warning(
                    "[manage] %s post-external-close balance fetch failed: %s",
                    symbol, e,
                )

            if balance_post is not None:
                # Refresh cache immediately — don't wait for maintenance.
                self._cached_account_balance = balance_post

                pnl_real: Optional[float] = (
                    balance_post - balance_pre
                    if balance_pre is not None and balance_pre > 0
                    else None
                )

                # Always emit balance_confirmed.
                if self.logger is not None:
                    try:
                        self.logger.brain_log.write(
                            "balance_confirmed",
                            symbol=symbol,
                            contracts=int(active.entry.contracts),
                            balance_post=round(balance_post, 2),
                            pnl_real=(
                                round(pnl_real, 2)
                                if pnl_real is not None else None
                            ),
                            exit_reason="EXTERNAL_CLOSE",
                        )
                    except Exception:
                        pass

                # Discrepancy check: estimated_pnl from recent_trades, with
                # null-fallback to 0 (don't block check on missing broker data).
                if pnl_real is not None:
                    estimated_pnl = float(pnl_usd) if pnl_usd is not None else 0.0
                    delta = pnl_real - estimated_pnl
                    if abs(delta) > 5.0:
                        self._sys_log.warning(
                            "[manage] %s PnL discrepancy (external close): "
                            "estimated=%.2f, real=%.2f, delta=%.2f",
                            symbol, estimated_pnl, pnl_real, delta,
                        )
                        if self.logger is not None:
                            try:
                                self.logger.brain_log.write(
                                    "pnl_discrepancy",
                                    symbol=symbol,
                                    estimated_pnl=round(estimated_pnl, 2),
                                    real_pnl=round(pnl_real, 2),
                                    delta=round(delta, 2),
                                    balance_pre=round(balance_pre, 2),
                                    balance_post=round(balance_post, 2),
                                    exit_reason="EXTERNAL_CLOSE",
                                    pnl_source=pnl_source,
                                )
                            except Exception:
                                pass

        return True

    async def _handle_exit(
        self, symbol: str, active: ActiveTrade, decision: BrainDecision,
        tech_now: TechSnapshot,
    ) -> None:
        """
        Close the full position for `symbol` after a Brain EXIT decision.

        Flow (order matters):
          1. closer.close_trade at tech_now.price. On exception or
             result.success=False the active_trade is left in place
             (failure emitted as close_failed / close_unconfirmed).
          2. Risk hooks: update_daily_pnl + register_tp_hit/register_sl_hit.
          3. State cleanup: remove from active_trades, log trade_closed,
             candle dedup, invalidate the symbol's tech cache.
          4. Broker present only: balance reconciliation — fetch the
             post-close balance after a 0.5s settle window, always emit
             balance_confirmed, and emit pnl_discrepancy when the real
             vs. estimated P&L differ by more than $5.
        """
        if self.closer is None or self.risk_manager is None:
            self._sys_log.warning("[manage] %s EXIT: closer/risk_manager missing, skip", symbol)
            return
        spec = ASSETS_MAP.get(symbol, {})
        tick_size = float(spec.get("tick_size", 0.0))
        tick_value = float(spec.get("tick_value", 0.0))
        exit_price = tech_now.price

        try:
            result: TradeCloseResult = await self.closer.close_trade(
                entry=active.entry, exit_price=exit_price,
                reason=decision.reason, tick_size=tick_size, tick_value=tick_value,
            )
        except Exception as e:
            self._sys_log.warning("[manage] %s closer raised: %s", symbol, e)
            return

        if not result.success:
            self._sys_log.info(
                "[manage] %s closer failed: %s (%s); leaving active_trade in place",
                symbol, result.error, result.error_kind,
            )
            if self.logger is not None:
                try:
                    event_name = (
                        "close_unconfirmed"
                        if result.error_kind == "close_success_but_position_open"
                        else "close_failed"
                    )
                    self.logger.brain_log.write(
                        event_name,
                        symbol=symbol,
                        reason=str(result.error or "")[:200],
                        exit_reason=(decision.reason or "")[:200],
                        error_kind=result.error_kind or "",
                    )
                except Exception:
                    pass
            return

        # Risk hooks
        self.risk_manager.update_daily_pnl(
            result.net_profit_usd, is_win=result.is_win, brain=active.entry.brain_name,
        )
        if result.is_win:
            self.risk_manager.register_tp_hit(symbol)
        else:
            self.risk_manager.register_sl_hit(symbol)

        # `active` stays in scope for _log_trade_closed (A/B-test SL fields).
        del self.state.active_trades[symbol]
        self._log_trade_closed(symbol, result, decision, active)
        self._apply_candle_dedup(decision, active.runtime)
        # V17 bug #2: drop tech cache for this symbol so next scan
        # rebuilds fresh (consecutive_sl_count may have changed).
        invalidate_tech_cache(symbol)

        # V17 feature #5: balance reconciliation post-close.
        # Compare bot-estimated P&L with broker-side balance delta to
        # surface fee/slippage drift. PAPER skips (no real balance).
        # `balance_pre` comes from the cache (refreshed by
        # _refresh_account_balance every maintenance tick, ≤60s old);
        # `balance_post` is fetched fresh and immediately replaces the
        # cache so we don't wait for the next maintenance cycle.
        if self.broker is not None:
            balance_pre = getattr(self, "_cached_account_balance", None)
            balance_post: Optional[float] = None
            # See _check_external_close: 0.5s broker-settle window before
            # snapshotting balance_post so balance_confirmed reflects the
            # just-closed trade and not the prior tick's balance.
            await asyncio.sleep(0.5)
            try:
                balance_post = float(await self.broker.get_account_balance())
            except Exception as e:
                self._sys_log.warning(
                    "[manage] %s post-close balance fetch failed: %s",
                    symbol, e,
                )

            if balance_post is not None:
                # Refresh cache immediately — don't wait for maintenance.
                self._cached_account_balance = balance_post

                pnl_real: Optional[float] = (
                    balance_post - balance_pre
                    if balance_pre is not None and balance_pre > 0
                    else None
                )

                # Always emit balance_confirmed.
                if self.logger is not None:
                    try:
                        self.logger.brain_log.write(
                            "balance_confirmed",
                            symbol=symbol,
                            contracts=int(result.contracts),
                            balance_post=round(balance_post, 2),
                            pnl_real=(
                                round(pnl_real, 2)
                                if pnl_real is not None else None
                            ),
                            exit_reason=decision.reason,
                        )
                    except Exception:
                        pass

                # Discrepancy check requires a reliable balance_pre.
                if pnl_real is not None:
                    estimated_pnl = float(result.net_profit_usd)
                    delta = pnl_real - estimated_pnl
                    if abs(delta) > 5.0:
                        self._sys_log.warning(
                            "[manage] %s PnL discrepancy: estimated=%.2f, "
                            "real=%.2f, delta=%.2f",
                            symbol, estimated_pnl, pnl_real, delta,
                        )
                        if self.logger is not None:
                            try:
                                self.logger.brain_log.write(
                                    "pnl_discrepancy",
                                    symbol=symbol,
                                    estimated_pnl=round(estimated_pnl, 2),
                                    real_pnl=round(pnl_real, 2),
                                    delta=round(delta, 2),
                                    balance_pre=round(balance_pre, 2),
                                    balance_post=round(balance_post, 2),
                                    exit_reason=decision.reason,
                                )
                            except Exception:
                                pass

    async def _handle_partial(
        self, symbol: str, active: ActiveTrade, decision: BrainDecision,
        tech_now: TechSnapshot,
    ) -> None:
        """
        Close half the position after a Brain PARTIAL_50 decision.

        A 1-contract position cannot be halved, so the request is demoted
        to a full EXIT. Otherwise closer.partial_close rebuilds the
        bracket on the residual (new SL = break-even if requested,
        otherwise the current SL; new TP = the original TP). On any
        failure `partial_done` is still set so the Brain does not retry
        the same partial every tick. After a successful partial the
        cached broker balance is refreshed (live only).
        """
        if self.closer is None or self.risk_manager is None:
            return
        spec = ASSETS_MAP.get(symbol, {})
        tick_size = float(spec.get("tick_size", 0.0))
        tick_value = float(spec.get("tick_value", 0.0))
        exit_price = tech_now.price

        contracts_to_close = max(1, active.entry.contracts // 2)
        if contracts_to_close >= active.entry.contracts:
            # Half of 1 -> 0 then bumped to 1 = full close. Demote to EXIT.
            await self._handle_exit(symbol, active, decision, tech_now)
            return

        # V18 12-May — the new `partial_close_via_opposite_order` places a
        # fresh bracket on the residual: here we choose new_sl (BE if
        # requested, otherwise the current SL) and new_tp (the original
        # TP). The closer then passes these prices to the broker.
        set_be = bool(decision.metadata.get("set_be_after_partial"))
        be_price = decision.metadata.get("be_price", active.entry.entry_price)
        current_sl = active.runtime.current_sl_price or active.entry.sl_price
        new_sl_price = float(be_price) if set_be else float(current_sl)
        new_tp_price = float(active.entry.tp_price)

        try:
            result: TradeCloseResult = await self.closer.partial_close(
                entry=active.entry, exit_price=exit_price,
                contracts_to_close=contracts_to_close,
                reason=decision.reason, tick_size=tick_size, tick_value=tick_value,
                new_sl_price=new_sl_price, new_tp_price=new_tp_price,
            )
        except Exception as e:
            self._sys_log.warning("[manage] %s partial raised: %s", symbol, e)
            # V18 12-May — even on exception, mark partial_done to avoid an
            # infinite retry every tick. The trade remains managed by the
            # original SL/TP (not cancelled when STEP 1 fails).
            active.runtime.partial_done = True
            self._apply_candle_dedup(decision, active.runtime)
            return

        if not result.success:
            self._sys_log.info(
                "[manage] %s partial failed: %s (%s)", symbol, result.error, result.error_kind,
            )
            # V18 12-May — partial_done=True on failure to block infinite
            # retries (the Brain would re-evaluate the same partial every
            # tick starting from the next candle). The failure is logged
            # to system.log + brain_log for analysis.
            active.runtime.partial_done = True
            self._apply_candle_dedup(decision, active.runtime)
            return

        # Risk hook for the realized half
        self.risk_manager.update_daily_pnl(
            result.net_profit_usd, is_win=result.is_win, brain=active.entry.brain_name,
        )

        active.runtime.partial_done = True
        active.runtime.partial_pnl_usd = result.net_profit_usd
        # The new bracket is already at the requested price: update the
        # runtime mirror without a separate modify_stop (removed in V18 —
        # the bracket is rebuilt by partial_close_via_opposite_order).
        active.runtime.current_sl_price = new_sl_price

        # V18 12-May — `contracts` must ALWAYS be updated to the residual
        # (paper AND live). Without this, manage_exit computed progress_pct
        # and net_profit_usd on the original contracts, and the monitor
        # showed wrong contract counts. Stop/target IDs are swapped ONLY
        # if the broker returned new IDs (live path); in paper mode the
        # broker is not called and the IDs stay as before.
        residual_contracts = active.entry.contracts - contracts_to_close
        active.entry = dataclasses.replace(
            active.entry,
            contracts=residual_contracts,
            stop_order_id=result.new_stop_order_id or active.entry.stop_order_id,
            target_order_id=result.new_target_order_id or active.entry.target_order_id,
        )

        self._log_partial_close(symbol, active, result, decision)

        # V18 13-May — refresh cached balance immediately after a partial
        # fills realized PnL into the broker account. Without this, the
        # dashboard snapshot keeps the pre-partial balance until the next
        # maintenance tick (≤60s), making equity = balance + unrealized
        # understate by exactly partial_pnl_usd in that window. Mirrors
        # the post-close refresh in _handle_exit. Live-only (broker
        # present); paper has no real balance to reconcile.
        if self.broker is not None:
            # Broker-settle delay before the post-partial balance snapshot
            # (mirrors _handle_exit / _check_external_close). Without it
            # the cached balance — and therefore the next dashboard tick's
            # equity figure — keeps the pre-partial value for up to 60s.
            await asyncio.sleep(0.5)
            try:
                balance_post = float(await self.broker.get_account_balance())
                if balance_post > 0:
                    self._cached_account_balance = balance_post
            except Exception as e:
                self._sys_log.warning(
                    "[manage] %s post-partial balance fetch failed: %s",
                    symbol, e,
                )

        self._apply_candle_dedup(decision, active.runtime)

    async def _handle_move_sl(
        self, symbol: str, active: ActiveTrade, decision: BrainDecision,
    ) -> None:
        """
        Apply a Brain MOVE_SL decision.

        With a broker and a stop_order_id present, broker.modify_stop runs
        first; on exception or success=False the runtime SL mirror is NOT
        mutated — the next iteration sees the actual SL state and may
        retry/escalate — and move_sl_failed is emitted. Only after broker
        success (or when no broker/stop order exists, e.g. paper mode) is
        runtime.current_sl_price updated and move_sl logged. Candle dedup
        is applied on every path to stop same-candle re-evaluation.
        """
        if decision.move_sl_to is None:
            self._sys_log.warning("[manage] %s MOVE_SL with no move_sl_to, ignoring", symbol)
            return
        # Snapshot pre-move SL for diagnostics (Layer 4 logging).
        prev_sl = float(active.runtime.current_sl_price or active.entry.sl_price)
        if self.broker is not None and active.entry.stop_order_id:
            try:
                result = await self.broker.modify_stop(
                    symbol, active.entry.stop_order_id, decision.move_sl_to,
                )
            except Exception as e:
                self._sys_log.warning("[manage] %s modify_stop raised: %s", symbol, e)
                self._log_move_sl_failed(
                    symbol, active, decision,
                    prev_sl=prev_sl, error=f"raised: {e}",
                )
                self._apply_candle_dedup(decision, active.runtime)
                return
            if result is not None and not getattr(result, "success", True):
                # Broker returned success=False (no exception): log and
                # do NOT mutate runtime so the next iter sees the actual
                # SL state and may retry/escalate.
                err = getattr(result, "error", "") or "broker rejected modify_stop"
                self._sys_log.warning("[manage] %s modify_stop rejected: %s", symbol, err)
                self._log_move_sl_failed(
                    symbol, active, decision,
                    prev_sl=prev_sl, error=err,
                )
                self._apply_candle_dedup(decision, active.runtime)
                return
        active.runtime.current_sl_price = decision.move_sl_to
        self._log_move_sl(symbol, active, decision, prev_sl=prev_sl)
        self._apply_candle_dedup(decision, active.runtime)

    # ============================================================
    # HELPERS
    # ============================================================

    def _resolve_account_balance(self) -> float:
        """
        Account balance for sizing.
          - PAPER (no broker)     -> config.dry_run_balance
          - LIVE / DRY (broker present) -> await broker.get_account_balance()
            with fallback to config.dry_run_balance on exception or zero.
        DRY mode reads through DryRunBroker which delegates to the real
        broker (real balance, fake writes).
        """
        balance = float(self.config.dry_run_balance)
        if self.broker is None:
            return balance
        try:
            real = asyncio.run_coroutine_threadsafe(
                self.broker.get_account_balance(),
                asyncio.get_event_loop(),
            ).result(timeout=2.0) if False else None
        except Exception:
            real = None
        # Sync resolver simplified: callers in Phase B/C live in async context
        # but this method runs synchronously inside size_for_entry. We use a
        # cached balance updated each tick (see _refresh_account_balance below).
        cached = getattr(self, "_cached_account_balance", None)
        if cached and cached > 0:
            return float(cached)
        return balance

    def _cme_session_day_start_utc(self) -> datetime:
        """
        UTC instant of the most recent 17:00 America/Chicago — the CME
        futures session-day anchor. TopstepX's "RP&L" in the UI is the
        sum of realized P&L since this moment, so we use it as the
        `since` argument when fetching broker trades for the dashboard.

        DST-aware via zoneinfo. If we're before 17:00 CT today, the
        session began at 17:00 CT *yesterday*; otherwise it's today.
        """
        try:
            from zoneinfo import ZoneInfo
        except ImportError:  # pragma: no cover — stdlib since 3.9
            return self._now_utc() - timedelta(hours=24)
        ct = ZoneInfo("America/Chicago")
        now_ct = self._now_utc().astimezone(ct)
        anchor_today = now_ct.replace(hour=17, minute=0, second=0, microsecond=0)
        if now_ct < anchor_today:
            anchor_today = anchor_today - timedelta(days=1)
        return anchor_today.astimezone(timezone.utc)

    async def _refresh_daily_rpnl(self) -> None:
        """
        Pull broker-side realized P&L for the current CME session day
        and cache it on `self._cached_daily_rpnl`. Fail-open: on any
        error keep the previous cached value (or None) so the writer
        falls back to the in-memory daily_pnl counter.
        """
        if self.broker is None:
            return
        try:
            since = self._cme_session_day_start_utc()
            value = await self.broker.get_daily_rpnl(since=since, limit=500)
        except Exception as e:
            self._sys_log.warning("[maintenance] daily_rpnl refresh failed: %s", e)
            return
        if value is None:
            return
        self._cached_daily_rpnl = float(value)
        self._cached_daily_rpnl_at = self._now_utc()

    async def _refresh_account_balance(self) -> None:
        """
        Refresh self._cached_account_balance from the broker. Called once
        per tick (via _run_iteration). On exception -> keep last value (or
        config.dry_run_balance on first failure). LIVE / DRY only.
        """
        if self.broker is None:
            self._cached_account_balance = float(self.config.dry_run_balance)
            return
        timeout = float(getattr(self.config, "broker_call_timeout_seconds", 10.0))
        try:
            v = await asyncio.wait_for(
                self.broker.get_account_balance(), timeout=timeout,
            )
            if v is None or v <= 0:
                raise ValueError(f"broker returned non-positive balance: {v}")
            self._cached_account_balance = float(v)
        except asyncio.TimeoutError:
            self._sys_log.warning(
                "broker.get_account_balance timed out after %.0fs; keeping last/dry_run",
                timeout,
            )
            if not getattr(self, "_cached_account_balance", None):
                self._cached_account_balance = float(self.config.dry_run_balance)
        except Exception as e:
            self._sys_log.warning(
                "broker.get_account_balance failed: %s; keeping last/dry_run", e,
            )
            if not getattr(self, "_cached_account_balance", None):
                self._cached_account_balance = float(self.config.dry_run_balance)

    def _apply_candle_dedup(self, decision: BrainDecision, runtime: TradeRuntime) -> None:
        cd = decision.metadata.get("evaluated_candle_time") if decision.metadata else None
        if cd:
            try:
                runtime.last_exit_eval_time = float(cd)
            except (TypeError, ValueError):
                pass

    def _asset_list(self) -> list[str]:
        """
        Symbols to scan this tick: the configured asset_filter restricted
        to known assets, or every symbol in ASSETS_MAP when no filter is
        set.
        """
        configured = self.config.asset_filter
        if not configured:
            return list(ASSETS_MAP.keys())
        return [sym for sym in configured if sym in ASSETS_MAP]

    async def _build_tech(
        self, symbol: str, *, min_candle_time: int = 0,
    ) -> Optional[TechSnapshot]:
        """
        Build a TechSnapshot for `symbol`, bounded by
        config.market_data_timeout_seconds (default 30s).

        Returns None when the symbol is unknown, the build times out, or
        any other error occurs. Re-raises StaleDataError so the caller
        (_scan_one) can handle broker lag explicitly.
        """
        meta = ASSETS_MAP.get(symbol)
        if meta is None:
            return None
        timeout = float(getattr(self.config, "market_data_timeout_seconds", 30.0))
        try:
            return await asyncio.wait_for(
                build_tech_snapshot(
                    symbol=symbol,
                    provider=self.provider,
                    tick_size=meta["tick_size"],
                    tick_value=meta["tick_value"],
                    min_candle_time=min_candle_time,
                ),
                timeout=timeout,
            )
        except StaleDataError:
            # Caller (_scan_one) handles broker-lag explicitly.
            raise
        except asyncio.TimeoutError:
            # V18 anti-freeze: SDK WS / REST stalled. Skip this symbol for
            # this tick and let the loop continue; the next M5 boundary
            # will retry. Without wait_for, the whole scan loop would
            # block and after 600s watchdog.sh would send SIGTERM.
            self._sys_log.warning(
                "[tech] %s build_tech_snapshot timed out after %.0fs", symbol, timeout,
            )
            return None
        except Exception as e:
            self._sys_log.debug("[tech] %s build failed: %s", symbol, e)
            return None

    async def _fetch_h4(self, symbol: str):
        timeout = float(getattr(self.config, "h4_fetch_timeout_seconds", 15.0))
        try:
            return await asyncio.wait_for(
                self.provider.get_bars(symbol, "4hour", 50),
                timeout=timeout,
            )
        except asyncio.TimeoutError:
            self._sys_log.warning("[tech] %s H4 fetch timed out after %.0fs", symbol, timeout)
            return None
        except Exception:
            return None

    # ------------------------------------------------------------
    # logging shims
    # ------------------------------------------------------------

    def _log_entry_approved(
        self, symbol: str, brain: str, decision: EntryDecision,
        *, tech: Optional[TechSnapshot] = None,
    ) -> None:
        """
        Emit the `entry_approved` event on brain_log, symmetric to
        brain_tf/brain_mr._reject's "entry_rejected" event.
        NOTE: tp_price is the sentinel 0.0 here (Brain didn't finalize TP);
        the resolved tp_price is logged separately via _log_tp_resolved.

        V18 12-May — when `tech` is available, the RADAR fields
        (rsi_m5/rsi_h4/h1_compat/macd_*/pattern/atr_ratio/bias/...) are
        included so the web dashboard shows the technical context of the
        approved trade without a secondary lookup.

        Best-effort: no-op when logger is absent; write failures swallowed.
        """
        if self.logger is None:
            return
        meta = decision.metadata or {}
        try:
            self.logger.brain_log.write(
                "entry_approved",
                symbol=symbol,
                brain=brain,
                direction=decision.direction,
                entry_price=decision.entry_price,
                sl_price=decision.sl_price,
                tp_price_pre_resolve=decision.tp_price,
                rr_multiplier_ai=decision.rr_multiplier,
                confidence=int(decision.confidence),
                reason=str(decision.rationale)[:200] if decision.rationale else "",
                sl_atr_multiplier=meta.get("sl_atr_multiplier"),
                tp_rationale_ai=meta.get("tp_rationale_ai"),
                clamp_active=meta.get("clamp_active"),
                sl_ticks_used=meta.get("sl_ticks_used"),
                sl_floored=meta.get("sl_floored"),
                sl_min_ticks=meta.get("sl_min_ticks"),
                profile_origin=meta.get("profile_origin"),
                risk_multiplier=meta.get("risk_multiplier"),
                step_1=meta.get("step_1"),
                step_2=meta.get("step_2"),
                step_3=meta.get("step_3"),
                key_risk=meta.get("key_risk"),
                **tech_log_fields(tech),
            )
        except Exception:
            pass

    def _log_tp_resolved(
        self,
        symbol: str,
        brain: str,
        decision: EntryDecision,
        tp_res: TPResolution,
    ) -> None:
        """
        Emit tp_resolved event on brain_log so calibration can correlate
        rr_multiplier (AI input) with tp_ticks_final / rr_effective (post
        MIN_TP_TICKS clamp) and the dollar-domain values (sl_usd_actual,
        tp_usd_target). Lifecycle event, separate from entry_approved.
        """
        if self.logger is None:
            return
        try:
            self.logger.brain_log.write(
                "tp_resolved",
                symbol=symbol,
                brain=brain,
                direction=decision.direction,
                entry_price=decision.entry_price,
                sl_price=decision.sl_price,
                tp_price=tp_res.tp_price,
                tp_source=tp_res.tp_source,
                tp_price_suggested_ai=float(
                    decision.metadata.get("tp_price_suggested_ai", 0.0) or 0.0
                ),
                rr_multiplier_ai=decision.rr_multiplier,
                rr_multiplier_used=tp_res.rr_multiplier_used,
                rr_effective=tp_res.rr_effective,
                tp_ticks_final=tp_res.tp_ticks_final,
                tp_distance=tp_res.tp_distance,
                sl_usd_actual=tp_res.sl_usd_actual,
                tp_usd_target=tp_res.tp_usd_target,
                min_tp_ticks_applied=tp_res.min_tp_ticks_applied,
            )
        except Exception:
            pass

    def _log_trade_opened(
        self, symbol: str, brain: str, entry, sizing: SizingDecision, risk,
    ) -> None:
        if self.logger is None:
            return
        try:
            self.logger.log_trade_opened(
                symbol=symbol, brain=brain,
                direction=entry.direction, contracts=entry.contracts,
                entry_price=entry.entry_price, sl_price=entry.sl_price,
                tp_price=entry.tp_price,
                rule=risk.rule, real_risk_usd=sizing.real_risk_usd,
            )
        except Exception:
            pass

    def _log_trade_closed(
        self, symbol: str, result: TradeCloseResult, decision: BrainDecision,
        active: Optional[ActiveTrade] = None,
    ) -> None:
        """
        Forward a closed trade to the main logger. Best-effort: no-op
        without a logger, write failures swallowed. When `active` is
        provided, the passive-SL A/B-test fields are attached.
        """
        if self.logger is None:
            return
        # Passive SL A/B test (v18-dev): compare the real SL (post-clamp)
        # vs. the raw AI SL (pre-clamp). `sl_ai_raw_hit` says whether the
        # raw level was touched during the trade's life; 0.0 values mean
        # "trade opened before the feature flag" or "the AI did not emit
        # sl_price_ai_raw".
        ab_extra: dict = {}
        if active is not None:
            ab_extra = {
                "sl_ai_raw_hit":   bool(active.runtime.sl_ai_raw_hit),
                "sl_price_ai_raw": float(active.runtime.sl_price_ai_raw),
                "sl_price_real":   float(active.entry.sl_price),
            }
        try:
            self.logger.log_trade_closed(
                symbol=symbol, brain=result.brain_name,
                direction=result.direction, contracts=result.contracts,
                reason=decision.reason,
                entry_price=result.entry_price, exit_price=result.exit_price,
                profit_ticks=result.profit_ticks,
                net_profit_usd=result.net_profit_usd,
                is_win=result.is_win,
                duration_minutes=result.duration_minutes,
                broker_orders_cancelled=result.broker_orders_cancelled,
                orphan_cancel_method=result.orphan_cancel_method,
                **ab_extra,
            )
        except Exception:
            pass

    def _log_partial_close(
        self, symbol: str, active: ActiveTrade, result: TradeCloseResult,
        decision: BrainDecision,
    ) -> None:
        if self.logger is None:
            return
        try:
            self.logger.brain_log.write(
                "partial_close",
                symbol=symbol, brain=active.entry.brain_name,
                contracts_closed=result.contracts,
                contracts_remaining=active.entry.contracts - result.contracts,
                partial_pnl_usd=result.net_profit_usd,
                set_be_after_partial=bool(decision.metadata.get("set_be_after_partial")),
                reason=decision.reason[:200],
            )
        except Exception:
            pass

    def _log_move_sl(
        self, symbol: str, active: ActiveTrade, decision: BrainDecision,
        *, prev_sl: Optional[float] = None,
    ) -> None:
        if self.logger is None:
            return
        meta = decision.metadata or {}
        try:
            from core.config_futures import ASSETS_MAP as _AM
            tick_size = float(_AM.get(symbol, {}).get("tick_size", 0.0) or 0.0)
        except Exception:
            tick_size = 0.0
        try:
            self.logger.brain_log.write(
                "move_sl",
                symbol=symbol, brain=active.entry.brain_name,
                current_sl_price=prev_sl,
                new_sl_price=decision.move_sl_to,
                aligned_sl_price=decision.move_sl_to,  # post-round = emitted
                raw_sl_price=meta.get("raw_sl_price"),
                tick_size=tick_size,
                target=meta.get("sl_target"),
                reason=decision.reason[:200],
            )
        except Exception:
            pass

    def _log_move_sl_failed(
        self, symbol: str, active: ActiveTrade, decision: BrainDecision,
        *, prev_sl: float, error: str,
    ) -> None:
        """
        Emitted when broker.modify_stop returns success=False or raises.
        Distinct from move_sl (which is success-only) so jq queries on
        brain_log.jsonl can isolate failed attempts.
        """
        if self.logger is None:
            return
        try:
            self.logger.brain_log.write(
                "move_sl_failed",
                symbol=symbol, brain=active.entry.brain_name,
                current_sl_price=prev_sl,
                attempted_sl_price=decision.move_sl_to,
                target=(decision.metadata or {}).get("sl_target"),
                error=str(error)[:200],
                reason=decision.reason[:200] if decision.reason else "",
            )
        except Exception:
            pass

    def _log_action_deferred(
        self, symbol: str, action: str, decision: BrainDecision,
    ) -> None:
        if self.logger is None:
            return
        try:
            self.logger.brain_log.write(
                "exit_deferred_disconnected",
                symbol=symbol, action=action,
                reason=decision.reason[:200] if decision and decision.reason else "",
            )
        except Exception:
            pass

    def _log_skip(
        self, symbol: str, reason: str, *,
        tech: Optional[TechSnapshot] = None, bias=None, brain: Optional[str] = None,
        details: Optional[dict] = None,
    ) -> None:
        if self.logger is None:
            return
        # Spread details top-level so jq queries on brain_log.jsonl can
        # filter by rsi/rsi_bouncing/night_tf_block/etc directly. Reserved
        # keys (symbol/reason/brain/regime/allowed_direction/event) are
        # protected from accidental override.
        extra: dict = {}
        if details:
            reserved = {
                "symbol", "reason", "brain", "regime", "allowed_direction", "event",
            }
            extra = {k: v for k, v in details.items() if k not in reserved}
        try:
            self.logger.brain_log.write(
                "scan_skip",
                symbol=symbol, reason=reason, brain=brain,
                regime=tech.regime if tech is not None else None,
                allowed_direction=bias.allowed_direction if bias else None,
                **extra,
            )
        except Exception:
            pass

    def _log_risk_skip(
        self, symbol: str, brain: str, decision: EntryDecision,
        sizing: SizingDecision, risk,
    ) -> None:
        if self.logger is None:
            return
        try:
            self.logger.brain_log.write(
                "scan_skip_risk",
                symbol=symbol, brain=brain,
                rule=risk.rule, reason=risk.reason,
                direction=decision.direction,
                contracts=sizing.contracts,
                real_risk_usd=sizing.real_risk_usd,
                **risk.audit_dict(),
            )
        except Exception:
            pass
