"""
================================================================
  🔌 TOPSTEPX ADAPTER — V15 (ASYNC NATIVE)
================================================================

  IMPORTANTE — CAMBIO ARCHITETTURALE vs versione precedente:
  
  La versione precedente tentava un wrapper SYNC sopra il SDK ASYNC
  usando un event loop in un thread separato. Non funzionava: il SDK
  `project-x-py` spawna task interni (WebSocket callbacks, position
  processor) che si aspettano un event loop NEL thread corrente.
  Risultato: spam di errori "no running event loop" illeggibili.
  
  Questa versione è ASYNC-NATIVA:
    - Ogni metodo è `async def`
    - Tutto gira nello stesso event loop
    - I Brain V14 (sync, CPU-bound, chiamano Claude API) vanno usati
      con `await asyncio.to_thread(brain.ask_entry, ...)` dal main

  API PUBBLICA:
    async connect(instruments)               -> bool
    async disconnect()
    async get_bars(symbol, timeframe, count) -> pd.DataFrame
    async get_current_price(symbol)          -> float | None
    async get_account_info()                 -> dict
    async positions_get(symbol=None)         -> List[dict]
    async place_market_bracket(symbol, side, size, sl, tp) -> dict
    async modify_sl(symbol, stop_order_id, new_sl)         -> bool
    async close_position(symbol, size=None)  -> dict
    async get_cumulative_delta(symbol, minutes) -> float | None

    (sync, solo lookup da cache locale)
    get_instrument_info(symbol) -> dict

  REQUIREMENTS:
    pip install project-x-py pandas
"""
from __future__ import annotations

import logging
from typing import Any, Optional, List, Dict

import pandas as pd

# The SDK is optional at import time: _SDK_AVAILABLE gates every code path
# that needs it (TopstepXAdapter.__init__ raises a clear error if missing).
try:
    from project_x_py import TradingSuite
    from project_x_py.models import Position, BracketOrderResponse
    _SDK_AVAILABLE = True
except ImportError:
    _SDK_AVAILABLE = False
    TradingSuite = None  # type: ignore

# APEX V18: apply persistent SDK patches (Position kwargs filter, etc.).
# Survives pip install / venv recreation: the fix lives here, not in venv/.
if _SDK_AVAILABLE:
    from broker._sdk_patches import apply_sdk_patches
    apply_sdk_patches()

# Monkey-patch the SDK to fix the depth=None orderbook bug
try:
    from orderbook_patch import apply_orderbook_patch
    apply_orderbook_patch()
except ImportError:
    pass  # patch optional

log = logging.getLogger("tsx_adapter")


# ─────────────────────────────────────────────────────────────────
# 🔇 SDK LOG HANDLER FILTER
# ─────────────────────────────────────────────────────────────────
class _JsonNoiseFilter(logging.Filter):
    """
    Suppresses known-noisy structured JSON log lines emitted by the
    project-x-py SDK (psutil warnings, DST init notices, throttle spam)
    so the console stays readable. Installed on the root logger by
    install_log_filter().

    Matching is by specific substring — genuine errors are never hidden.
    """
    NOISE_SUBSTRINGS = [
        "psutil not available",
        "DST handling initialized",
        "throttle",
        "Dynamic resource limits",
        # Orderbook SDK bug (v3.5.x): spam 'NoneType' on depth entries.
        # Harmless but noisy. Orderbook stays off in Phase 1; Phase 2 will
        # integrate it with care.
        "Error processing depth entry",
        "Orderbook reset due to RESET event",
        "Gap will be filled when real-time data arrives",
        # Statistics bounded logger noise during init
        "Dynamic resource configuration updated",
        "Dynamic resource limits enabled",
        "Started dynamic resource monitoring",
        "Cleanup scheduler",
        # SignalR reconnect notices
        "on_reconnect not defined",
        "Connection closed EOF",
    ]

    def filter(self, record: logging.LogRecord) -> bool:
        # Keep the record only when no known noise marker appears in it.
        message = record.getMessage()
        return not any(marker in message for marker in self.NOISE_SUBSTRINGS)


def install_log_filter():
    """Attach a _JsonNoiseFilter to the root logger. Idempotent."""
    root = logging.getLogger()
    already_installed = any(
        isinstance(existing, _JsonNoiseFilter) for existing in root.filters
    )
    if not already_installed:
        root.addFilter(_JsonNoiseFilter())


# ─────────────────────────────────────────────────────────────────
# 🔌 TOPSTEPX ASYNC ADAPTER
# ─────────────────────────────────────────────────────────────────
class TopstepXAdapter:
    """
    Async-native facade sopra project-x-py TradingSuite.
    
    Usage:
        async def main():
            tsx = TopstepXAdapter()
            await tsx.connect(["MES", "MNQ", "MGC", "MCL"])
            bars = await tsx.get_bars("MES", "5min", 200)
            await tsx.disconnect()
        
        asyncio.run(main())
    """

    # Caller timeframe aliases → SDK timeframe strings. Lookups use
    # TIMEFRAME_MAP.get(tf, tf), so unknown keys pass through unchanged.
    TIMEFRAME_MAP = {
        "M5":    "5min",
        "M15":   "15min",
        "H1":    "1hr",
        "H4":    "4hr",
        "D1":    "1day",
        # passthrough SDK-native
        "5min":  "5min",
        "15min": "15min",
        "1hr":   "1hr",
        "4hr":   "4hr",
        "1day":  "1day",
        # legacy aliases
        "1hour": "1hr",
        "4hour": "4hr",
    }

    # ProjectX convention: 0 = BUY, 1 = SELL (ints pass through unchanged)
    SIDE_MAP = {"BUY": 0, "SELL": 1, 0: 0, 1: 1}

    def __init__(self):
        """Validate SDK availability, install the log filter, and prepare
        empty connection state (populated later by connect())."""
        if not _SDK_AVAILABLE:
            raise RuntimeError(
                "project-x-py non installato. "
                "Esegui: pip install project-x-py"
            )
        install_log_filter()
        # All state below is empty/default until connect() succeeds.
        self._connected = False
        self._suite: Optional[TradingSuite] = None
        self._enable_orderbook = True
        self._instruments: List[str] = []
        self._instrument_cache: Dict[str, Dict[str, Any]] = {}

    # ── LIFECYCLE ─────────────────────────────────────────────────

    async def connect(
        self,
        instruments: List[str],
        timeframes: Optional[List[str]] = None,
        enable_orderbook: bool = False,   # PHASE 1: orderbook OFF (SDK bug v3.5.x)
    ) -> bool:
        """
        Authenticate, create the TradingSuite, subscribe to real-time data.

        Returns True on success; False on any failure (error already logged).
        Populates `_instrument_cache` for the sync get_instrument_info() lookups.

        NOTE on `enable_orderbook`:
        SDK v3.5.x has a known bug spamming "Error processing depth entry"
        on the L2 feed. Orderbook stays disabled in Phase 1 (we still get
        real volumes on the bars). It will be re-enabled in Phase 2 when
        delta / volume profile are integrated with more care (possibly via
        a custom workaround on the raw WS).
        """
        self._instruments = instruments
        self._enable_orderbook = enable_orderbook
        
        tfs = timeframes or ["5min", "1hr", "4hr"]
        features = []
        if enable_orderbook:
            features.append("orderbook")

        try:
            # 🔧 V15 optimization (24/04): disable SDK tick processing.
            # The SDK processes every quote update with filter+polars.collect
            # on large DataFrames → blocks the main thread for minutes (see
            # py-spy dump). V15 uses the REST fallback (get_bars) on every
            # scan, so live tick processing is redundant and harmful here.
            data_mgr_cfg = {
                "enable_tick_data": False,     # ← KEY: no live tick processing
                "enable_level2_data": False,   # no live orderbook depth
            }
            self._suite = await TradingSuite.create(
                instruments=instruments,
                timeframes=tfs,
                features=features,
                data_manager_config=data_mgr_cfg,
            )
            self._connected = True
            log.info(f"[TSX] Connected — account={self._get_account_name()}")

            # Cache per-instrument metadata for sync lookups (get_instrument_info)
            for sym in instruments:
                ctx = self._suite[sym]
                inst = ctx.instrument_info
                self._instrument_cache[sym] = {
                    "contract_id":  inst.id,
                    "symbol":       sym,
                    "name":         inst.name,
                    "tick_size":    float(inst.tickSize),
                    "tick_value":   float(inst.tickValue),
                    "description":  inst.description,
                }
                log.info(
                    f"[TSX] {sym} → contract={inst.id}, "
                    f"tick_size={inst.tickSize}, "
                    f"tick_value=${inst.tickValue}"
                )
            return True
        except Exception as e:
            log.error(f"[TSX] Connect failed: {e!r}")
            self._connected = False
            return False

    async def disconnect(self):
        """Tear down the TradingSuite session. Safe to call when not connected;
        a failing SDK disconnect is logged and swallowed."""
        suite, self._suite = self._suite, None
        self._connected = False
        if suite:
            try:
                await suite.disconnect()
            except Exception as e:
                log.warning(f"[TSX] Disconnect warning: {e}")
        log.info("[TSX] Disconnected")

    def is_connected(self) -> bool:
        """True only when connect() succeeded and the suite is still present."""
        if not self._connected:
            return False
        return self._suite is not None

    def _get_account_name(self) -> str:
        """Best-effort account display name; 'unknown' when unavailable."""
        try:
            account = self._suite.client.account_info
            return account.name
        except Exception:
            return "unknown"

    def _get_ctx(self, symbol: str):
        """Return the suite's per-instrument context; raise when not connected."""
        suite = self._suite
        if not suite:
            raise RuntimeError("Not connected. Call connect() first.")
        return suite[symbol]

    def _align_to_tick(self, symbol: str, price: float) -> float:
        """Snap a price onto the instrument's tick grid; plain 2-decimal
        rounding when the instrument is unknown (no cached metadata)."""
        meta = self._instrument_cache.get(symbol)
        if not meta:
            return round(price, 2)
        tick = meta["tick_size"]
        n_ticks = round(price / tick)
        return round(n_ticks * tick, 6)

    # ── MARKET DATA ───────────────────────────────────────────────

    # Timeframe label → bar interval in minutes. Used to size REST requests
    # and staleness thresholds. Covers SDK-style ("5min") and short ("5m") keys.
    _TF_TO_MINUTES = {
        "1min": 1, "5min": 5, "15min": 15, "30min": 30,
        "1hr": 60, "4hr": 240, "1d": 1440,
        "1m": 1, "5m": 5, "15m": 15, "30m": 30,
        "1h": 60, "4h": 240,
    }
    
    def _df_last_age_seconds(self, df: pd.DataFrame) -> Optional[float]:
        """Age in seconds dell'ultima barra. None se df vuoto o senza 'time'."""
        try:
            if df is None or df.empty or "time" not in df.columns:
                return None
            from datetime import datetime, timezone
            last_ts = df["time"].iloc[-1]
            if hasattr(last_ts, "to_pydatetime"):
                dt = last_ts.to_pydatetime()
            else:
                dt = last_ts
            if dt.tzinfo is None:
                dt = dt.replace(tzinfo=timezone.utc)
            return (datetime.now(timezone.utc) - dt).total_seconds()
        except Exception:
            return None
    
    def _is_close_frozen(self, symbol: str, tf: str, df: pd.DataFrame) -> bool:
        """
        True se il close dell'ultima barra è identico per N fetch consecutivi.
        
        Quando WS è disconnesso, `ctx.data.get_data` restituisce sempre gli stessi
        dati cached. La barra corrente non si aggiorna con i tick real-time → 
        close identico tra fetch consecutivi = WS probabilmente rotto.
        
        Track per (symbol, tf) key. Dopo 2 fetch consecutivi con stesso close,
        consideriamo i dati "frozen".
        """
        try:
            if df is None or df.empty or "close" not in df.columns:
                return False
            
            current_close = float(df["close"].iloc[-1])
            current_ts = df["time"].iloc[-1] if "time" in df.columns else None
            
            if not hasattr(self, "_last_seen_close"):
                self._last_seen_close = {}
            
            key = f"{symbol}:{tf}"
            prev = self._last_seen_close.get(key)
            
            # Update registro
            self._last_seen_close[key] = {
                "close": current_close,
                "ts": current_ts,
                "count": 1,
            }
            
            if prev is None:
                return False
            
            # Se stesso timestamp (stessa barra) E stesso close → incrementa counter
            # Se close identico per 2+ fetch con stesso timestamp, è frozen
            if (prev.get("ts") == current_ts and
                abs(prev.get("close", 0) - current_close) < 1e-12):
                new_count = prev.get("count", 1) + 1
                self._last_seen_close[key]["count"] = new_count
                # Frozen dopo 2 fetch consecutivi identici
                return new_count >= 2
            
            return False
        except Exception:
            return False

    async def get_bars(
        self, symbol: str, timeframe: str, count: int = 200
    ) -> pd.DataFrame:
        """
        Pandas DataFrame with [time, open, high, low, close, volume, tick_volume].

        V15 strategy (hybrid realtime + REST fallback + sticky mode):

          NORMAL mode (default):
            - Try `ctx.data.get_data` (WS-backed, fast)
            - REST fallback when: empty df, stale timestamp, frozen close,
              or FORCE_REST_ONLY

          STICKY mode (auto-enabled after N consecutive frozen fetches):
            - ALWAYS goes straight to REST (skip WS, no log noise)
            - Every `_ws_retry_interval_sec` (default 5 min) retries the WS
              once, for auto-recovery if the WebSocket comes back up

          FORCED mode (env var FORCE_REST_ONLY=1):
            - REST only; the WS path is ignored entirely
        """
        import os, time
        force_rest = os.getenv("FORCE_REST_ONLY", "0") == "1"

        tf = self.TIMEFRAME_MAP.get(timeframe, timeframe)
        tf_minutes = self._TF_TO_MINUTES.get(tf) or self._TF_TO_MINUTES.get(timeframe) or 5

        # Lazily init sticky-mode state (instance may predate this feature).
        if not hasattr(self, "_ws_sticky_down"):
            self._ws_sticky_down = False
            self._ws_sticky_since = 0.0
            self._ws_retry_interval_sec = 300  # 5 min
            self._ws_sticky_frozen_count = 0

        # Forced or sticky-down mode → straight to REST (with periodic WS retry)
        use_rest_direct = force_rest
        if not force_rest and self._ws_sticky_down:
            # Every retry interval, probe the WS once to see if it recovered
            elapsed = time.time() - self._ws_sticky_since
            if elapsed >= self._ws_retry_interval_sec:
                log.info(f"[TSX] Retry WS dopo {elapsed:.0f}s sticky REST-only")
                # Leave sticky mode and retry the WS path once
                self._ws_sticky_down = False
                self._ws_sticky_frozen_count = 0
                # Reset the freeze tracker so a fresh detection cycle can start
                if hasattr(self, "_last_seen_close"):
                    self._last_seen_close.clear()
            else:
                use_rest_direct = True

        if use_rest_direct:
            df_rest = await self._get_bars_rest(symbol, tf_minutes, count)
            return df_rest if df_rest is not None else pd.DataFrame()

        df = await self._get_bars_realtime(symbol, tf, count)

        # Trigger 1: last-bar timestamp is stale (older than 2.5 bar intervals)
        age = self._df_last_age_seconds(df)
        stale_threshold_sec = 2.5 * tf_minutes * 60
        is_time_stale = age is not None and age > stale_threshold_sec

        # Trigger 2: close frozen across consecutive fetches
        is_frozen = self._is_close_frozen(symbol, tf, df)

        # Trigger 3: empty DataFrame
        is_empty = df is None or df.empty

        if is_empty or is_time_stale or is_frozen:
            if is_empty:
                reason = "empty"
            elif is_time_stale:
                reason = f"timestamp {age:.0f}s stale"
            else:
                reason = "close frozen"

            # Count consecutive bad fetches for sticky detection
            self._ws_sticky_frozen_count += 1

            # After 3 consecutive frozen fetches the WS is considered truly
            # dead → switch to sticky REST-only mode.
            STICKY_THRESHOLD = 3  # fixed typo: was SITCKY_THRESHOLD
            if self._ws_sticky_frozen_count >= STICKY_THRESHOLD and not self._ws_sticky_down:
                log.warning(
                    f"[TSX] ⚠️  WebSocket down confermato ({self._ws_sticky_frozen_count} "
                    f"frozen). Attivo modalità REST-only per {self._ws_retry_interval_sec}s"
                )
                self._ws_sticky_down = True
                self._ws_sticky_since = time.time()
            else:
                # Single warning only while at or below the threshold
                if self._ws_sticky_frozen_count <= STICKY_THRESHOLD:
                    log.warning(
                        f"[TSX] {symbol} {tf} data {reason} → fallback REST"
                    )

            df_rest = await self._get_bars_rest(symbol, tf_minutes, count)
            if df_rest is not None and not df_rest.empty:
                # Reset the freeze tracker for this key after a fresh REST fetch
                if hasattr(self, "_last_seen_close"):
                    key = f"{symbol}:{tf}"
                    self._last_seen_close.pop(key, None)
                return df_rest
            log.error(f"[TSX] {symbol} {tf} REST fallback also empty")
        else:
            # WS fetch is fresh → reset the global frozen counter (WS is alive)
            if self._ws_sticky_frozen_count > 0:
                log.info(f"[TSX] ✅ WS ha ricevuto tick fresh → reset frozen counter")
                self._ws_sticky_frozen_count = 0

        return df if df is not None else pd.DataFrame()
    
    async def _get_bars_realtime(
        self, symbol: str, tf: str, count: int
    ) -> pd.DataFrame:
        """Fetch da realtime data manager (WebSocket-backed)."""
        try:
            ctx = self._get_ctx(symbol)
            df_pl = await ctx.data.get_data(tf, bars=count)
        except Exception as e:
            log.warning(f"[TSX] realtime get_bars({symbol}, {tf}) error: {e}")
            return pd.DataFrame()

        if df_pl is None or df_pl.is_empty():
            return pd.DataFrame()

        return self._pl_to_pandas(df_pl, count)
    
    async def _get_bars_rest(
        self, symbol: str, interval_minutes: int, count: int
    ) -> pd.DataFrame:
        """
        Fetch bars over REST.
        🔧 V15 27/04 v3: the SDK get_bars is cached, so the primary path is
        a direct /api/History/retrieveBars call that guarantees fresh bars
        even with the WS down. The SDK client is only used as a fallback
        (e.g. network error on the direct call). Returns an empty DataFrame
        when both paths fail.
        """
        # Primary path: direct REST, bypassing the SDK cache
        try:
            direct = await self._fetch_bars_direct_rest(symbol, interval_minutes, count)
            if direct is not None and not direct.empty:
                return direct
        except Exception as e:
            log.warning(f"[TSX] {symbol} direct REST bars failed: {e}, fallback SDK")

        # Fallback path: SDK client (legacy behavior, cached)
        try:
            client = self._suite.client
            bars_per_day = max(1, (1440 // interval_minutes))
            days = max(1, min(30, (count // bars_per_day) + 2))
            raw = await client.get_bars(
                symbol, days=days, interval=interval_minutes
            )
        except Exception as e:
            log.error(f"[TSX] REST get_bars({symbol}, {interval_minutes}min) error: {e}")
            return pd.DataFrame()

        if raw is None:
            return pd.DataFrame()
        # May be a polars or a pandas DataFrame depending on the SDK version
        try:
            if hasattr(raw, "is_empty") and raw.is_empty():
                return pd.DataFrame()
            if hasattr(raw, "empty") and raw.empty:
                return pd.DataFrame()
        except Exception:
            pass

        return self._pl_to_pandas(raw, count)
    
    def _pl_to_pandas(self, df_obj, count: int) -> pd.DataFrame:
        """Convert polars/pandas DF a pandas standard con colonne normalizzate."""
        # Convert polars → pandas
        if hasattr(df_obj, "to_pandas"):
            try:
                df = df_obj.to_pandas(use_pyarrow_extension_array=False)
            except (ImportError, ModuleNotFoundError):
                df = pd.DataFrame({col: df_obj[col].to_list() for col in df_obj.columns})
            except Exception:
                df = pd.DataFrame({col: df_obj[col].to_list() for col in df_obj.columns})
        else:
            df = df_obj  # already pandas

        rename_map = {
            "timestamp": "time",
            "Timestamp": "time",
            "Open":   "open", "High":   "high",
            "Low":    "low",  "Close":  "close",
            "Volume": "volume",
        }
        df = df.rename(columns={k: v for k, v in rename_map.items() if k in df.columns})

        if "volume" in df.columns and "tick_volume" not in df.columns:
            df["tick_volume"] = df["volume"]

        if "time" in df.columns:
            df = df.sort_values("time").tail(count).reset_index(drop=True)

        return df

    async def get_current_price(self, symbol: str) -> Optional[float]:
        """
        Prezzo corrente dell'instrumento.
        
        Strategia:
          1. Se WS sticky down → REST diretto (da ultima M1 bar)
          2. Altrimenti prova ctx.data.get_current_price() (WS-backed)
          3. Se ritorna None o WS frozen → REST fallback
        """
        # Se siamo in modalità sticky WS-down, vai direttamente REST
        if getattr(self, "_ws_sticky_down", False):
            return await self._get_current_price_rest(symbol)
        
        try:
            ctx = self._get_ctx(symbol)
            p = await ctx.data.get_current_price()
            if p is not None:
                return float(p)
        except Exception as e:
            log.warning(f"[TSX] get_current_price({symbol}) WS failed: {e}")
        
        # Fallback REST se WS ritorna None o fallisce
        return await self._get_current_price_rest(symbol)
    
    async def _fetch_bars_direct_rest(self, symbol, interval_minutes, count):
        """
        V15 27/04 v3: direct REST call to /api/History/retrieveBars for N bars.
        Used by _get_bars_rest to bypass the SDK cache (which serves stale
        bars while the WebSocket is down).

        The bearer token from /api/Auth/loginKey is cached on the instance
        (~30 min) and shared with _fetch_bar_direct_rest via
        _direct_rest_token / _direct_rest_token_exp.

        Returns: pandas DataFrame [time, open, high, low, close, volume,
        tick_volume], sorted by time and trimmed to the last `count` rows,
        or None on any failure (missing credentials/contract, HTTP error,
        empty payload).
        """
        import os, requests, pandas as pd
        from datetime import datetime, timezone, timedelta
        try:
            info = self.get_instrument_info(symbol)
            if not info or not info.get("contract_id"):
                return None
            contract_id = info["contract_id"]
            api_key  = os.getenv("PROJECT_X_API_KEY")
            username = os.getenv("PROJECT_X_USERNAME")
            if not api_key or not username:
                return None
            now_ts = datetime.now(timezone.utc).timestamp()
            cached_token = getattr(self, "_direct_rest_token", None)
            cached_exp   = getattr(self, "_direct_rest_token_exp", 0)
            if cached_token and now_ts < cached_exp:
                token = cached_token
            else:
                r = requests.post(
                    "https://api.topstepx.com/api/Auth/loginKey",
                    json={"userName": username, "apiKey": api_key},
                    timeout=5,
                )
                if r.status_code != 200:
                    return None
                token = r.json().get("token")
                if not token:
                    return None
                self._direct_rest_token = token
                # 30-min token lifetime assumed — TODO confirm against API docs
                self._direct_rest_token_exp = now_ts + 1800
            headers = {"Authorization": "Bearer " + token, "Content-Type": "application/json"}
            now = datetime.now(timezone.utc)
            # Window sized at ~3x the nominal span plus margin (floor 480 min)
            # so market gaps still yield enough bars to fill `count`.
            mins_back = max(count * interval_minutes * 3 + interval_minutes * 10, 480)
            start = now - timedelta(minutes=mins_back)
            r = requests.post(
                "https://api.topstepx.com/api/History/retrieveBars",
                headers=headers,
                json={
                    "contractId": contract_id,
                    "live": False,  # live=True returns errorCode 1 (see _fetch_bar_direct_rest)
                    "startTime": start.isoformat(),
                    "endTime": now.isoformat(),
                    "unit": 2,  # NOTE(review): presumably 2 = minute bars — confirm
                    "unitNumber": interval_minutes,
                    "limit": count + 5,
                    "includePartialBar": False,
                },
                timeout=8,
            )
            if r.status_code != 200:
                return None
            data = r.json()
            if not data.get("success"):
                return None
            bars = data.get("bars") or []
            if not bars:
                return None
            rows = []
            for b in bars:
                # API bar keys: t=time, o/h/l/c=OHLC, v=volume
                rows.append({
                    "time": b.get("t"),
                    "open": b.get("o"),
                    "high": b.get("h"),
                    "low": b.get("l"),
                    "close": b.get("c"),
                    "volume": b.get("v") or 0,
                })
            df = pd.DataFrame(rows)
            df["time"] = pd.to_datetime(df["time"], utc=True)
            df = df.sort_values("time").tail(count).reset_index(drop=True)
            df["tick_volume"] = df["volume"]
            return df
        except Exception as e:
            log.warning(f"[TSX] {symbol} _fetch_bars_direct_rest error: {e}")
            return None

    async def _fetch_bar_direct_rest(self, symbol, minutes_back=10):
        """
        V15 27/04: direct REST retrieveBars call, bypassing the SDK cache.
        Uses live=False (live=True returns errorCode 1).

        Returns the first bar dict from the API response (keys t/o/h/l/c/v),
        or None on any failure.
        NOTE(review): bars[0] is treated by callers as the most recent bar —
        assumes the API returns bars newest-first; confirm.
        """
        import os, requests
        from datetime import datetime, timezone, timedelta
        try:
            info = self.get_instrument_info(symbol)
            if not info or not info.get("contract_id"):
                return None
            contract_id = info["contract_id"]
            api_key  = os.getenv("PROJECT_X_API_KEY")
            username = os.getenv("PROJECT_X_USERNAME")
            if not api_key or not username:
                return None
            now_ts = datetime.now(timezone.utc).timestamp()
            # Token cache shared with _fetch_bars_direct_rest (~30 min)
            cached_token = getattr(self, "_direct_rest_token", None)
            cached_exp   = getattr(self, "_direct_rest_token_exp", 0)
            if cached_token and now_ts < cached_exp:
                token = cached_token
            else:
                r = requests.post(
                    "https://api.topstepx.com/api/Auth/loginKey",
                    json={"userName": username, "apiKey": api_key},
                    timeout=5,
                )
                if r.status_code != 200:
                    return None
                token = r.json().get("token")
                if not token:
                    return None
                self._direct_rest_token = token
                self._direct_rest_token_exp = now_ts + 1800
            headers = {"Authorization": "Bearer " + token, "Content-Type": "application/json"}
            now = datetime.now(timezone.utc)
            start = now - timedelta(minutes=minutes_back)
            r = requests.post(
                "https://api.topstepx.com/api/History/retrieveBars",
                headers=headers,
                json={
                    "contractId": contract_id,
                    "live": False,
                    "startTime": start.isoformat(),
                    "endTime": now.isoformat(),
                    "unit": 2,  # NOTE(review): presumably 2 = minute bars — confirm
                    "unitNumber": 1,
                    "limit": minutes_back + 1,
                    "includePartialBar": False,
                },
                timeout=5,
            )
            if r.status_code != 200:
                return None
            data = r.json()
            if not data.get("success"):
                return None
            bars = data.get("bars") or []
            if not bars:
                return None
            return bars[0]
        except Exception as e:
            log.warning(f"[TSX] {symbol} direct REST failed: {e}")
            return None

    async def _get_current_price_rest(self, symbol: str) -> Optional[float]:
        """
        Current price via direct REST (bypassing the SDK cache).

        🔧 V15 27/04 v2: the SDK get_bars is cached and does not refresh with
        the WS down, so this uses _fetch_bar_direct_rest (a direct
        /api/History/retrieveBars call). Bars older than 180s are discarded
        (= up to 3 closed M1 bars of acceptable delay).

        Returns the latest close as float, or None when no sufficiently
        fresh price is available.
        """
        from datetime import datetime, timezone
        MAX_AGE_SEC = 180

        try:
            bar = await self._fetch_bar_direct_rest(symbol, minutes_back=5)
            if not bar:
                # Direct REST failed → legacy SDK path (last M1 close)
                df = await self._get_bars_rest(symbol, interval_minutes=1, count=1)
                if df is None or df.empty or "close" not in df.columns:
                    return None
                return float(df["close"].iloc[-1])

            # Freshness check on the bar timestamp
            ts_str = bar.get("t")
            if ts_str:
                # Fixed: reuse the `datetime` imported above instead of the
                # redundant local re-import aliased as `_dt`.
                last_ts = datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
                age = (datetime.now(timezone.utc) - last_ts).total_seconds()
                if age > MAX_AGE_SEC:
                    log.warning(f"[TSX] {symbol} REST direct stale: bar age={age:.0f}s (> {MAX_AGE_SEC}s)")
                    return None
            close = bar.get("c")
            if close is not None:
                return float(close)
            return None
        except Exception as e:
            log.warning(f"[TSX] get_current_price REST direct({symbol}) failed: {e}")
            return None

    def get_instrument_info(self, symbol: str) -> Optional[Dict[str, Any]]:
        """Sync lookup in the cache populated by connect(); None when unknown."""
        try:
            return self._instrument_cache[symbol]
        except KeyError:
            return None

    # ── ACCOUNT ───────────────────────────────────────────────────

    async def get_account_info(self) -> Dict[str, Any]:
        try:
            acc = self._suite.client.account_info
            return {
                "name":    acc.name,
                "balance": float(acc.balance),
                "id":      getattr(acc, "id", None),
            }
        except Exception as e:
            log.warning(f"[TSX] get_account_info failed: {e}")
            return {}

    # ── POSITIONS ─────────────────────────────────────────────────

    async def _positions_direct_rest(self) -> List[Dict[str, Any]]:
        """
        🔧 V15 27/04: direct REST call to /api/Position/searchOpen, bypassing
        the SDK rate limiter (which can delay until timeout). Used by
        positions_get as the primary path; _raw_positions_search is the
        fallback on failure.
        Token cache (~30 min) shared with _fetch_bar_direct_rest via the
        _direct_rest_token / _direct_rest_token_exp instance attributes.

        Returns the raw `positions` list from the API response (list of
        dicts), or [] on any failure.
        """
        import os, time, requests
        from datetime import datetime, timezone

        def _post_retry(url, **kwargs):
            # 1 attempt + 2 retries on Timeout/ConnectionError, 0.2s backoff.
            # HTTP status != 200 is NOT retried (handled by the callers).
            last_exc = None
            for attempt in range(3):
                try:
                    return requests.post(url, **kwargs)
                except (requests.Timeout, requests.ConnectionError) as e:
                    last_exc = e
                    if attempt < 2:
                        time.sleep(0.2)
            raise last_exc

        try:
            api_key  = os.getenv("PROJECT_X_API_KEY")
            username = os.getenv("PROJECT_X_USERNAME")
            if not api_key or not username:
                return []
            now_ts = datetime.now(timezone.utc).timestamp()
            cached_token = getattr(self, "_direct_rest_token", None)
            cached_exp   = getattr(self, "_direct_rest_token_exp", 0)
            if cached_token and now_ts < cached_exp:
                token = cached_token
            else:
                r = _post_retry(
                    "https://api.topstepx.com/api/Auth/loginKey",
                    json={"userName": username, "apiKey": api_key},
                    timeout=5,
                )
                if r.status_code != 200:
                    return []
                token = r.json().get("token")
                if not token:
                    return []
                self._direct_rest_token = token
                self._direct_rest_token_exp = now_ts + 1800
            # Account id comes from the connected SDK client; without it we
            # cannot scope the search, so return [] rather than query blind.
            client = self._suite.client if self._suite else None
            acc = getattr(client, "account_info", None) if client else None
            account_id = getattr(acc, "id", None) if acc else None
            if not account_id:
                return []
            r = _post_retry(
                "https://api.topstepx.com/api/Position/searchOpen",
                headers={"Authorization": "Bearer " + token, "Content-Type": "application/json"},
                json={"accountId": int(account_id)},
                timeout=4,
            )
            if r.status_code != 200:
                return []
            return r.json().get("positions", []) or []
        except Exception as e:
            log.warning(f"[TSX] _positions_direct_rest error: {e}")
            return []

    async def _raw_positions_search(self) -> List[Dict[str, Any]]:
        """
        Raw REST call a /Position/searchOpen — bypassa la classe Position
        della SDK (che crasha con 'Position.__init__ got unexpected kwarg
        contractDisplayName' quando ProjectX API ha campi nuovi).
        
        Ritorna lista di dict con le posizioni aperte dell'account corrente,
        usando i campi raw del JSON di risposta.
        """
        if not self._suite:
            return []
        try:
            client = self._suite.client
            acc = getattr(client, "account_info", None)
            account_id = getattr(acc, "id", None) if acc else None
            if not account_id:
                return []
            payload = {"accountId": int(account_id)}
            resp = await client._make_request(
                "POST",
                "/Position/searchOpen",
                data=payload,
            )
            if not isinstance(resp, dict):
                return []
            positions_raw = resp.get("positions", []) or []
            return positions_raw  # lista di dict raw come ritornati da API
        except Exception as e:
            log.warning(f"[TSX] _raw_positions_search failed: {e}")
            return []

    async def _raw_partial_close(
        self,
        contract_id: str,
        close_size: int,
        account_id: Optional[int] = None,
    ) -> Dict[str, Any]:
        """
        Raw REST call a /Position/partialCloseContract — bypassa il wrapper
        SDK `close_position_by_contract` che internamente chiama
        `get_position` → `get_all_positions` → istanzia la classe Position
        → crasha su 'contractDisplayName' → ritorna None → wrapper risponde
        "No open position found" anche se la posizione esiste davvero.
        
        Endpoint confermato dal source SDK 3.5.9
        (position_manager/operations.py:267):
            POST /Position/partialCloseContract
            { "accountId": int, "contractId": str, "closeSize": int }
        
        Ritorna dict con:
          - success (bool)
          - orderId (str, se success)
          - errorMessage (str, se fail)
          - status_code (int, se errore HTTP)
          - server_response (dict, raw body server per diagnostica)
        """
        if not self._suite:
            return {"success": False, "errorMessage": "adapter not connected"}
        
        if close_size <= 0:
            return {"success": False, "errorMessage": f"close_size must be > 0, got {close_size}"}
        
        try:
            client = self._suite.client
            
            if account_id is None:
                acc = getattr(client, "account_info", None)
                account_id = getattr(acc, "id", None) if acc else None
                if not account_id:
                    return {"success": False, "errorMessage": "no account_id available"}
            
            payload = {
                "accountId": int(account_id),
                "contractId": str(contract_id),
                "closeSize": int(close_size),
            }
            
            log.info(
                f"[TSX] _raw_partial_close POST /Position/partialCloseContract "
                f"contract={contract_id} size={close_size}"
            )
            
            resp = await client._make_request(
                "POST",
                "/Position/partialCloseContract",
                data=payload,
            )
            
            # _make_request ritorna dict su 2xx, solleva su 4xx/5xx
            if isinstance(resp, dict):
                return resp
            return {"success": False, "errorMessage": "unexpected response shape", "raw": resp}
        
        except Exception as e:
            # Logga SEMPRE il body se è un HTTP error — serve per diagnostica
            err_str = f"{type(e).__name__}: {e}"
            log.error(f"[TSX] _raw_partial_close failed: {err_str}")
            return {"success": False, "errorMessage": err_str, "exception": True}

    async def positions_get(
        self, symbol: Optional[str] = None
    ) -> List[Dict[str, Any]]:
        """
        Open positions as V14-compatible dicts, optionally filtered by symbol.

        NOTE: SDK project_x_py 3.5.9 has a known bug — its Position class
        crashes with 'unexpected keyword contractDisplayName' because the
        ProjectX API added new fields, and the SDK logger swallows the
        exception and silently returns an empty list (a dangerous false
        negative). We therefore read the raw JSON fields directly, bypassing
        the Position class entirely.
        """
        if not self._suite:
            return []

        # Reverse lookup: broker contract id → our symbol.
        by_contract: Dict[str, str] = {}
        for sym, meta in self._instrument_cache.items():
            by_contract[meta["contract_id"]] = sym

        # ── Pure direct REST (bypasses the SDK rate limiter) ──
        # 🔧 V15 27/04: SDK._make_request goes through rate_limiter.acquire(),
        # which can delay all the way to the 5s timeout. Direct REST skips
        # that queue; the SDK raw search is kept as a fallback.
        raw = await self._positions_direct_rest()
        if not raw:
            raw = await self._raw_positions_search()

        out: List[Dict[str, Any]] = []
        for rec in raw:
            cid = rec.get("contractId", "")
            sym = by_contract.get(cid)
            if not sym:
                continue  # position on an untracked contract
            if symbol and sym != symbol:
                continue

            qty = int(rec.get("size", 0))
            long_side = rec.get("type", 0) == 1

            out.append({
                "symbol":      sym,
                "contract_id": cid,
                "position_id": rec.get("id"),
                "size":        qty if long_side else -qty,
                "abs_size":    qty,
                "avg_price":   float(rec.get("averagePrice", 0.0)),
                "type":        "BUY" if long_side else "SELL",
                "is_buy":      long_side,
                "account_id":  rec.get("accountId"),
            })

        return out

    async def has_open_position(self, symbol: str) -> bool:
        """Return True when at least one open position exists for *symbol*."""
        open_positions = await self.positions_get(symbol)
        return bool(open_positions)

    # ── ORDERS ────────────────────────────────────────────────────

    async def place_market_bracket(
        self,
        symbol: str,
        side: str,
        size: int,
        sl_price: float,
        tp_price: float,
        # V15 structural-aware params (optional — default preserves old behavior):
        sl_absolute_price: Optional[float] = None,
        sl_ticks: Optional[int] = None,
        tp_ticks: Optional[int] = None,
        sl_source: str = "ATR",
    ) -> Dict[str, Any]:
        """
        Market entry + OCO SL + TP — REST-only implementation (no WS dependency).
        
        ═══════════════════════════════════════════════════════════════════════
        🎯 NON-WS IMPLEMENTATION (v15.1+) + STRUCTURAL-AWARE (v15.2+)
        ═══════════════════════════════════════════════════════════════════════
        
        The SDK's native `place_bracket_order` WAITS for the fill event via
        WebSocket (60s timeout). With WS down, the order really does get
        filled by the broker but the SDK declares it a "timeout" → orphans →
        duplicates on the next iteration.
        
        This version removes the WS dependency:
          1. Places the market entry order via REST (place_market_order)
          2. REST-polls positions_get() to confirm the fill (1-3s)
          3. RECOMPUTES SL/TP from the real fill_price (when sl_ticks/tp_ticks given)
          4. Once the fill is confirmed → places SL stop order + TP limit order via REST
          5. Returns all the IDs like the original API
        
        SL/TP placement (v15.2+):
          - If `sl_absolute_price` is given (structural): the SL is placed at
            EXACTLY that price, regardless of fill_price.
          - If `sl_ticks` is given (ATR fallback): SL at fill_price ± sl_ticks × tick
          - If neither is given (legacy): uses sl_price as-is (old behavior,
            BUGGY under slippage)
          - TP: always recomputed from fill_price with tp_ticks when given,
            otherwise the legacy tp_price
        
        Advantages:
          - Zero WebSocket dependency
          - Fill-detection latency 1-3s vs 60s
          - Zero orphans (if the market order doesn't fill, positions_get sees it)
          - SL/TP anchored to the REAL fill, not the Brain-time price → edge preserved
        ═══════════════════════════════════════════════════════════════════════

        Returns a dict: on success {"success": True, "entry_id", "stop_id",
        "target_id", "entry_price", "sl_price", "tp_price", "sl_source",
        "error": None}; on any failure {"success": False, "error": str, ...}.
        """
        side_int = self.SIDE_MAP.get(side.upper() if isinstance(side, str) else side)
        if side_int is None:
            return {"success": False, "error": f"invalid side: {side}"}

        info = self._instrument_cache.get(symbol)
        if not info:
            return {"success": False, "error": f"no contract cached for {symbol}"}

        tick_size = info["tick_size"]
        
        # Legacy fallback alignment (used when sl_ticks/tp_ticks are not given)
        sl_aligned = self._align_to_tick(symbol, sl_price)
        tp_aligned = self._align_to_tick(symbol, tp_price)
        
        # Safety: SL/TP must sit on the correct side relative to the entry.
        # side_int: 0=BUY, 1=SELL
        contract_id = info["contract_id"]
        ctx = self._get_ctx(symbol)

        import asyncio as _aio
        
        try:
            # ═══════════════════════════════════════════════════════════════
            # STEP 1: Place the MARKET ENTRY (REST)
            # ═══════════════════════════════════════════════════════════════
            log.info(f"[TSX] place_market_bracket({symbol}) STEP1: market entry "
                     f"{side} {size}ct → contract={contract_id}")
            
            entry_resp = await ctx.orders.place_market_order(
                contract_id=contract_id,
                side=side_int,
                size=int(size),
            )
            
            entry_order_id = getattr(entry_resp, "orderId", None) or \
                             getattr(entry_resp, "order_id", None)
            
            if not entry_order_id:
                err = getattr(entry_resp, "errorMessage", None) or \
                      getattr(entry_resp, "error_message", None) or \
                      "no entry order ID returned"
                log.error(f"[TSX] place_market_order failed: {err}")
                return {"success": False, "error": f"entry order failed: {err}"}
            
            log.info(f"[TSX] ✅ Entry order placed: id={entry_order_id}")
            
            # ═══════════════════════════════════════════════════════════════
            # STEP 2: REST polling to confirm the fill (15s wall-clock deadline)
            # ═══════════════════════════════════════════════════════════════
            fill_price = None
            position_found = False

            import time as _time
            poll_deadline = _time.monotonic() + 15.0
            attempt = 0

            while _time.monotonic() < poll_deadline:
                attempt += 1
                await _aio.sleep(0.5)
                try:
                    positions = await self.positions_get(symbol)
                    if positions:
                        pos = positions[0]
                        pos_size = abs(pos.get("size", 0))
                        if pos_size >= size:  # position open with the expected size
                            fill_price = float(pos.get("avg_price", 0))
                            position_found = True
                            log.info(f"[TSX] ✅ Fill confermato via REST (tentativo {attempt}): "
                                     f"size={pos_size}, avg_price={fill_price}")
                            break
                except Exception as e:
                    log.warning(f"[TSX] positions_get poll error (tentativo {attempt}): {e}")
            
            if not position_found:
                # Entry order placed BUT not filled within 15s.
                # It may still be pending or may have actually been rejected.
                # Check its status directly.
                try:
                    order_status = await ctx.orders.get_order_by_id(int(entry_order_id))
                    status_str = str(getattr(order_status, "status", "unknown"))
                    log.warning(f"[TSX] Entry order {entry_order_id} non fillato in 15s, status={status_str}")
                except Exception as e:
                    log.warning(f"[TSX] Cannot check order status: {e}")
                
                # Try to cancel it so it doesn't stay pending forever
                try:
                    await ctx.orders.cancel_order(int(entry_order_id))
                    log.info(f"[TSX] Entry order {entry_order_id} cancellato (non fillato)")
                except Exception as e:
                    log.warning(f"[TSX] Cancel order failed: {e}")
                
                return {
                    "success": False,
                    "entry_id": entry_order_id,
                    "error": f"entry order {entry_order_id} did not fill within 15s",
                }
            
            # ═══════════════════════════════════════════════════════════════
            # STEP 2.5: RECOMPUTE SL/TP FROM FILL_PRICE (v15.2+ slippage fix)
            # ═══════════════════════════════════════════════════════════════
            # The distances the Brain computed (sl_ticks/tp_ticks) must be
            # measured from the REAL fill, not the Brain-time price.
            # For structural SL: use the absolute price (fixed invalidation level).
            
            # TP: always recomputed from fill_price when tp_ticks is given
            if tp_ticks is not None and tp_ticks > 0:
                if side_int == 0:  # BUY → TP above
                    tp_aligned = self._align_to_tick(symbol, fill_price + tp_ticks * tick_size)
                else:  # SELL → TP below
                    tp_aligned = self._align_to_tick(symbol, fill_price - tp_ticks * tick_size)
                log.info(f"[TSX] TP ricalcolato da fill: fill={fill_price} + {tp_ticks}t = {tp_aligned}")
            # else: legacy — tp_aligned already set above from tp_price
            
            # SL: two modes
            if sl_absolute_price is not None:
                # STRUCTURAL: fixed price, at the invalidation level
                sl_aligned = self._align_to_tick(symbol, sl_absolute_price)
                log.info(f"[TSX] SL STRUCTURAL @ {sl_aligned} (fisso, livello invalidazione)")
            elif sl_ticks is not None and sl_ticks > 0:
                # ATR: constant distance from the fill
                if side_int == 0:  # BUY → SL below
                    sl_aligned = self._align_to_tick(symbol, fill_price - sl_ticks * tick_size)
                else:  # SELL → SL above
                    sl_aligned = self._align_to_tick(symbol, fill_price + sl_ticks * tick_size)
                log.info(f"[TSX] SL ATR ricalcolato da fill: fill={fill_price} ± {sl_ticks}t = {sl_aligned}")
            # else: legacy — sl_aligned already set above from sl_price (BUGGY — slippage not compensated)
            
            # Safety check: SL/TP must be on the right side vs fill_price
            if side_int == 0:  # BUY
                if sl_aligned >= fill_price:
                    log.error(f"[TSX] 🚨 SL invalido BUY: sl={sl_aligned} >= fill={fill_price} — emergency close")
                    await self.close_position(symbol)
                    return {"success": False, "entry_id": entry_order_id,
                            "error": f"SL on wrong side: sl={sl_aligned} fill={fill_price}"}
                if tp_aligned <= fill_price:
                    log.warning(f"[TSX] ⚠️ TP dubbio BUY: tp={tp_aligned} <= fill={fill_price} — skip TP")
                    tp_aligned = None  # no TP, SL only
            else:  # SELL
                if sl_aligned <= fill_price:
                    log.error(f"[TSX] 🚨 SL invalido SELL: sl={sl_aligned} <= fill={fill_price} — emergency close")
                    await self.close_position(symbol)
                    return {"success": False, "entry_id": entry_order_id,
                            "error": f"SL on wrong side: sl={sl_aligned} fill={fill_price}"}
                if tp_aligned >= fill_price:
                    log.warning(f"[TSX] ⚠️ TP dubbio SELL: tp={tp_aligned} >= fill={fill_price} — skip TP")
                    tp_aligned = None
            
            # ═══════════════════════════════════════════════════════════════
            # STEP 3: Place the STOP LOSS (REST)
            # ═══════════════════════════════════════════════════════════════
            # SL side = opposite of the entry: BUY entry → SELL stop; SELL entry → BUY stop
            sl_side_int = 1 if side_int == 0 else 0  # flip
            stop_order_id = None
            try:
                log.info(f"[TSX] STEP3: piazzo STOP LOSS @ {sl_aligned}")
                sl_resp = await ctx.orders.place_stop_order(
                    contract_id=contract_id,
                    side=sl_side_int,
                    size=int(size),
                    stop_price=sl_aligned,
                )
                stop_order_id = getattr(sl_resp, "orderId", None) or \
                                getattr(sl_resp, "order_id", None)
                log.info(f"[TSX] ✅ Stop order placed: id={stop_order_id}")
            except Exception as e:
                log.error(f"[TSX] ⚠️  Stop order placement failed: {e}")
                # CRITICAL: position open without an SL. Close at market immediately for safety.
                log.error(f"[TSX] 🚨 EMERGENCY CLOSE {symbol}: posizione senza SL!")
                try:
                    await self.close_position(symbol)
                    return {
                        "success": False,
                        "entry_id": entry_order_id,
                        "error": f"SL placement failed, emergency-closed position: {e}",
                    }
                except Exception as e2:
                    log.critical(f"[TSX] 🚨🚨 EMERGENCY CLOSE FAILED: {e2} — intervento MANUALE richiesto!")
                    return {
                        "success": False,
                        "entry_id": entry_order_id,
                        "error": f"SL failed AND emergency close failed: {e} / {e2}",
                    }
            
            # ═══════════════════════════════════════════════════════════════
            # STEP 4: Place the TAKE PROFIT (REST)
            # ═══════════════════════════════════════════════════════════════
            # TP side = opposite of the entry (like the SL), but as a limit order
            target_order_id = None
            if tp_aligned is not None:
                try:
                    log.info(f"[TSX] STEP4: piazzo TAKE PROFIT @ {tp_aligned}")
                    tp_resp = await ctx.orders.place_limit_order(
                        contract_id=contract_id,
                        side=sl_side_int,   # opposite of the entry
                        size=int(size),
                        limit_price=tp_aligned,
                    )
                    target_order_id = getattr(tp_resp, "orderId", None) or \
                                      getattr(tp_resp, "order_id", None)
                    log.info(f"[TSX] ✅ TP order placed: id={target_order_id}")
                except Exception as e:
                    # TP failure is less severe than SL failure: the stop still protects the position
                    log.error(f"[TSX] ⚠️  TP placement failed: {e} — posizione ha comunque SL attivo")
                    # Continue anyway: at least we have the SL
            else:
                log.warning(f"[TSX] TP skipped per {symbol} (tp_aligned=None dal safety check)")
            
            # ═══════════════════════════════════════════════════════════════
            # STEP 5: Success — return all the IDs
            # ═══════════════════════════════════════════════════════════════
            return {
                "success":     True,
                "entry_id":    entry_order_id,
                "stop_id":     stop_order_id,
                "target_id":   target_order_id,
                "entry_price": fill_price,
                "sl_price":    sl_aligned,
                "tp_price":    tp_aligned,
                "sl_source":   sl_source,
                "error":       None,
            }
        
        except Exception as e:
            log.exception(f"[TSX] place_market_bracket({symbol}) unexpected error: {e}")
            return {"success": False, "error": str(e)}

    async def partial_close_via_opposite_order(
        self,
        symbol: str,
        direction: str,
        contracts_to_close: int,
        residual_contracts: int,
        new_sl_price: float,
        new_tp_price: float,
        old_stop_order_id: Optional[str] = None,
        old_target_order_id: Optional[str] = None,
    ) -> Dict[str, Any]:
        """V18 12-May — partial close via opposite market order + bracket rebuild.

        Workaround for `/Position/partialCloseContract` returning 400 on some
        ProjectX assets (6A confirmed). Replaces the raw call with a 4-step
        flow:

          STEP 1: market opposite-direction × contracts_to_close (closes part)
          STEP 2: cancel old SL + cancel old TP (original bracket)
          STEP 3: new STOP @ new_sl_price × residual_contracts
          STEP 4: new LIMIT @ new_tp_price × residual_contracts

        If STEP 1 fails: returns failure, no side effects.
        If STEP 1 succeeds but STEP 3 (SL) fails: emergency-close the residual
        so the position is never left without an SL.
        STEP 2 and STEP 4 are best-effort: the SL already placed covers the risk.

        Returns on success {"success": True, "closing_order_id", "new_stop_id",
        "new_target_id", "new_sl_price", "new_tp_price", "error": None};
        otherwise {"success": False, "error": str, ...}.
        """
        entry_side_int = self.SIDE_MAP.get(
            direction.upper() if isinstance(direction, str) else direction
        )
        if entry_side_int is None:
            return {"success": False, "error": f"invalid direction: {direction}"}
        opposite_side_int = 1 if entry_side_int == 0 else 0

        info = self._instrument_cache.get(symbol)
        if not info:
            return {"success": False, "error": f"no contract cached for {symbol}"}
        contract_id = info["contract_id"]
        ctx = self._get_ctx(symbol)

        # ═══ STEP 1: closing market order ═══
        try:
            log.info(
                f"[TSX] partial_close_via_opposite_order({symbol}) STEP1: "
                f"opposite market {contracts_to_close}ct (orig dir={direction})"
            )
            close_resp = await ctx.orders.place_market_order(
                contract_id=contract_id,
                side=opposite_side_int,
                size=int(contracts_to_close),
            )
            closing_order_id = (
                getattr(close_resp, "orderId", None)
                or getattr(close_resp, "order_id", None)
            )
            if not closing_order_id:
                err = (
                    getattr(close_resp, "errorMessage", None)
                    or getattr(close_resp, "error_message", None)
                    or "no order id from closing market"
                )
                log.error(f"[TSX] partial close STEP1 failed: {err}")
                return {"success": False, "error": f"closing market failed: {err}"}
        except Exception as e:
            log.error(f"[TSX] partial close STEP1 exception: {e}")
            return {"success": False, "error": f"closing market raised: {e}"}

        # ═══ STEP 2: cancel original SL + TP (best-effort) ═══
        if old_stop_order_id:
            try:
                await ctx.orders.cancel_order(int(old_stop_order_id))
                log.info(f"[TSX] partial close STEP2: cancelled old SL {old_stop_order_id}")
            except Exception as e:
                log.warning(
                    f"[TSX] partial close STEP2 cancel old SL {old_stop_order_id} "
                    f"failed (benigno se già fillato): {e}"
                )
        if old_target_order_id:
            try:
                await ctx.orders.cancel_order(int(old_target_order_id))
                log.info(f"[TSX] partial close STEP2: cancelled old TP {old_target_order_id}")
            except Exception as e:
                log.warning(
                    f"[TSX] partial close STEP2 cancel old TP {old_target_order_id} "
                    f"failed (benigno se già fillato): {e}"
                )

        # V18 12-May — safety-net sweep: cancel ALL remaining open orders on
        # the contract. Catches orphans left behind by earlier brackets
        # (MGC incident: an orphan TP 4ct @ 4697.7 survived the partial
        # because it was neither old_stop_order_id nor old_target_order_id).
        # Best-effort: if the sweep fails, the new SL placed at STEP 3 still
        # covers the residual's risk.
        try:
            sweep = await self.cancel_all_orders_for_contract(symbol)
            swept = int(sweep.get("cancelled_count", 0))
            if swept > 0:
                log.warning(
                    f"[TSX] partial close STEP2 sweep: cancellati {swept} ordini "
                    f"orfani residui su {symbol} ({sweep.get('order_ids')})"
                )
        except Exception as e:
            log.warning(
                f"[TSX] partial close STEP2 orphan sweep failed: {e} "
                f"(non blocking — new SL at STEP3 protegge il residuo)"
            )

        # ═══ STEP 3: new STOP for residual ═══
        new_sl_aligned = self._align_to_tick(symbol, new_sl_price)
        new_stop_id: Optional[str] = None
        try:
            log.info(
                f"[TSX] partial close STEP3: new SL @ {new_sl_aligned} × {residual_contracts}ct"
            )
            sl_resp = await ctx.orders.place_stop_order(
                contract_id=contract_id,
                side=opposite_side_int,
                size=int(residual_contracts),
                stop_price=new_sl_aligned,
            )
            new_stop_id = (
                getattr(sl_resp, "orderId", None)
                or getattr(sl_resp, "order_id", None)
            )
            if not new_stop_id:
                raise RuntimeError("place_stop_order returned no id")
            log.info(f"[TSX] ✅ New SL placed: id={new_stop_id}")
        except Exception as e:
            log.error(
                f"[TSX] 🚨 partial close STEP3 FAILED: {e} — residuo senza SL, emergency close"
            )
            try:
                await self.close_position(symbol)
            except Exception as e2:
                log.critical(
                    f"[TSX] 🚨🚨 EMERGENCY CLOSE FAILED post-partial: {e2} — manuale!"
                )
            return {
                "success": False,
                "closing_order_id": closing_order_id,
                "error": f"new SL placement failed after partial: {e}",
            }

        # ═══ STEP 4: new TP for residual (best-effort) ═══
        new_tp_aligned = self._align_to_tick(symbol, new_tp_price)
        new_target_id: Optional[str] = None
        try:
            log.info(
                f"[TSX] partial close STEP4: new TP @ {new_tp_aligned} × {residual_contracts}ct"
            )
            tp_resp = await ctx.orders.place_limit_order(
                contract_id=contract_id,
                side=opposite_side_int,
                size=int(residual_contracts),
                limit_price=new_tp_aligned,
            )
            new_target_id = (
                getattr(tp_resp, "orderId", None)
                or getattr(tp_resp, "order_id", None)
            )
            log.info(f"[TSX] ✅ New TP placed: id={new_target_id}")
        except Exception as e:
            log.warning(
                f"[TSX] partial close STEP4 (TP) failed: {e} — residuo coperto da SL"
            )

        return {
            "success": True,
            "closing_order_id": closing_order_id,
            "new_stop_id": new_stop_id,
            "new_target_id": new_target_id,
            "new_sl_price": new_sl_aligned,
            "new_tp_price": new_tp_aligned,
            "error": None,
        }

    async def modify_sl(
        self, symbol: str, stop_order_id: int, new_sl: float
    ) -> bool:
        """Move an existing stop order to *new_sl* (tick-aligned). True on success."""
        aligned = self._align_to_tick(symbol, new_sl)
        try:
            orders_api = self._get_ctx(symbol).orders
            outcome = await orders_api.modify_order(
                order_id=int(stop_order_id),
                stop_price=aligned,
            )
            return bool(outcome)
        except Exception as e:
            log.error(f"[TSX] modify_sl failed: {e}")
            return False

    async def cancel_order(
        self, symbol: str, order_id: int
    ) -> bool:
        """
        Cancel a pending order (used to clean up orphaned SL/TP orders).

        Returns True on success — including when the order was already
        filled/cancelled (a benign outcome); False on any other failure.
        """
        try:
            broker_ctx = self._get_ctx(symbol)
            await broker_ctx.orders.cancel_order(int(order_id))
            log.info(f"[TSX] ✅ Cancelled order {order_id} on {symbol}")
            return True
        except Exception as e:
            # Cancellation can fail when the order was already filled or
            # cancelled — treat those as benign successes.
            lowered = str(e).lower()
            if any(tok in lowered for tok in ("already", "filled", "not found")):
                log.info(f"[TSX] Order {order_id} già fillato/cancellato (benigno): {e}")
                return True
            log.warning(f"[TSX] cancel_order({order_id}) failed: {e}")
            return False

    async def cancel_all_orders_for_contract(
        self, symbol: str
    ) -> Dict[str, Any]:
        """
        Cancel ALL pending orders on the symbol's contract (no IDs needed).
        
        Used to clean up orphans when a natural SL/TP hit closes the position
        broker-side without going through V15's close_position().
        
        Strategy:
          1. Raw REST POST /Order/searchOpen to list pending orders
          2. Filter by accountId and the symbol's contractId
          3. Cancel each order via ctx.orders.cancel_order
        
        Returns:
          {"cancelled_count": int, "order_ids": [int], "error": Optional[str]}
        """
        info = self._instrument_cache.get(symbol)
        if not info:
            return {"cancelled_count": 0, "order_ids": [], "error": f"no contract for {symbol}"}
        
        contract_id = info["contract_id"]
        
        # Fix: guard a disconnected adapter up front (consistent with
        # _raw_positions_search / _raw_partial_close) instead of letting
        # self._suite.client raise AttributeError and be mis-reported as a
        # generic search failure below.
        if not self._suite:
            return {"cancelled_count": 0, "order_ids": [], "error": "adapter not connected"}
        
        # Step 1: find pending orders via raw REST
        open_orders: List[int] = []
        try:
            client = self._suite.client
            acc = getattr(client, "account_info", None)
            account_id = getattr(acc, "id", None) if acc else None
            if not account_id:
                return {"cancelled_count": 0, "order_ids": [], "error": "no account_id"}
            
            payload = {"accountId": int(account_id)}
            resp = await client._make_request(
                "POST",
                "/Order/searchOpen",
                data=payload,
            )
            
            if isinstance(resp, dict):
                orders_raw = resp.get("orders", []) or []
                for order in orders_raw:
                    if not isinstance(order, dict):
                        continue
                    # Keep only orders on this symbol's contract
                    order_contract = order.get("contractId") or order.get("contract_id")
                    if order_contract != contract_id:
                        continue
                    order_id = order.get("id")
                    if order_id is not None:
                        open_orders.append(int(order_id))
        except Exception as e:
            log.warning(f"[TSX] cancel_all: search failed for {symbol}: {e}")
            return {"cancelled_count": 0, "order_ids": [], "error": str(e)}
        
        if not open_orders:
            log.info(f"[TSX] cancel_all {symbol}: nessun ordine pendente")
            return {"cancelled_count": 0, "order_ids": [], "error": None}
        
        # Step 2: cancel each order individually (cancel_order already treats
        # "already filled/cancelled" as a benign success)
        log.info(f"[TSX] cancel_all {symbol}: trovati {len(open_orders)} ordini → {open_orders}")
        cancelled = 0
        cancelled_ids = []
        for order_id in open_orders:
            try:
                ok = await self.cancel_order(symbol, order_id)
                if ok:
                    cancelled += 1
                    cancelled_ids.append(order_id)
            except Exception as e:
                log.warning(f"[TSX] cancel_all {symbol}: order {order_id} failed: {e}")
        
        return {
            "cancelled_count": cancelled,
            "order_ids":       cancelled_ids,
            "error":           None,
        }

    async def close_position(
        self,
        symbol: str,
        size: Optional[int] = None,
        bracket_order_ids: Optional[List[int]] = None,
    ) -> Dict[str, Any]:
        """
        Close a position, fully or partially.

        size=None → close full
        size=int  → close partial

        bracket_order_ids: order IDs (SL, TP) to cancel after the close.
                           Prevents orphaned orders when we close the position
                           at market instead of letting SL/TP trigger naturally.
                           On a partial close they are NOT cancelled (SL/TP stay
                           valid for the remaining size).
        """
        info = self._instrument_cache.get(symbol)
        if not info:
            return {"success": False, "error": f"no contract for {symbol}"}

        contract_id = info["contract_id"]
        try:
            # ── PARTIAL CLOSE: raw REST (bypasses SDK 3.5.9 bug) ──
            # ctx.positions.close_position_by_contract(close_size=X) internally
            # calls get_position() → crashes on 'contractDisplayName' → returns
            # None → wrapper answers "No open position found" even though the
            # position exists. We avoid the SDK path entirely.
            if size is not None:
                raw = await self._raw_partial_close(
                    contract_id=contract_id,
                    close_size=int(size),
                )
                ok = bool(raw.get("success", False))
                if not ok:
                    log.error(
                        f"[TSX] partial close {symbol} size={size} failed: "
                        f"{raw.get('errorMessage', 'unknown')} "
                        f"(server={raw.get('server_response', {})})"
                    )
                return {
                    "success":         ok,
                    "mode":            "partial",
                    "closed_size":     size,
                    "cancelled_count": 0,  # SL/TP stay active on the residual size
                    "raw":             raw,
                    "errorMessage":    raw.get("errorMessage"),
                    "orderId":         raw.get("orderId"),
                }

            # ── FULL CLOSE: SDK path (works — leave it alone) ──
            ctx = self._get_ctx(symbol)
            resp = await ctx.positions.close_position_by_contract(
                contract_id=contract_id,
                close_size=size,
            )

            # 🧹 CLEANUP: cancel orphaned SL/TP after a FULL close.
            # (size is necessarily None here: the partial branch returned above.)
            cancelled = 0
            if bracket_order_ids:
                log.info(f"[TSX] Cleanup bracket orders per {symbol}: {bracket_order_ids}")
                for order_id in bracket_order_ids:
                    if order_id is None:
                        continue
                    try:
                        if await self.cancel_order(symbol, int(order_id)):
                            cancelled += 1
                    except Exception as e:
                        log.warning(f"[TSX] cleanup order {order_id}: {e}")

            succeeded = True
            if isinstance(resp, dict):
                succeeded = bool(resp.get("success", True))
            return {
                "success":         succeeded,
                "mode":            "full",
                "closed_size":     size,
                "cancelled_count": cancelled,
                "raw":             resp,
            }
        except Exception as e:
            log.error(f"[TSX] close_position({symbol}) failed: {e}")
            return {"success": False, "error": str(e)}

    # ── ORDERFLOW (richiede orderbook feature) ────────────────────

    async def get_cumulative_delta(
        self, symbol: str, minutes: int = 60
    ) -> Optional[float]:
        """
        Cumulative trade delta over the last `minutes`.

        Returns None when the orderbook feature is disabled, the context has
        no orderbook, or the SDK call fails for any reason (best-effort).
        """
        if not self._enable_orderbook:
            return None
        try:
            book = getattr(self._get_ctx(symbol), "orderbook", None)
            if book is None:
                return None
            raw = await book.get_cumulative_delta(time_window_minutes=minutes)
            return None if raw is None else float(raw)
        except Exception as e:
            log.debug(f"[TSX] get_cumulative_delta({symbol}) failed: {e}")
            return None

    async def get_trade_flow_summary(self, symbol: str) -> Dict[str, Any]:
        """
        Trade-flow summary from the orderbook feature.

        Returns {} when orderbook is disabled, absent, or the call fails.
        """
        if not self._enable_orderbook:
            return {}
        try:
            book = getattr(self._get_ctx(symbol), "orderbook", None)
            if book is None:
                return {}
            summary = await book.get_trade_flow_summary()
            return summary or {}
        except Exception:
            return {}

    async def get_liquidity_levels(self, symbol: str) -> List[Dict[str, Any]]:
        """
        Liquidity levels from the orderbook feature.

        Returns [] when orderbook is disabled, absent, or the call fails.
        """
        if not self._enable_orderbook:
            return []
        try:
            book = getattr(self._get_ctx(symbol), "orderbook", None)
            if book is None:
                return []
            levels = await book.get_liquidity_levels()
            return levels or []
        except Exception:
            return []

    # ── CONTEXT MANAGER ───────────────────────────────────────────
    
    async def __aenter__(self):
        """Enter `async with`: the adapter itself is the managed resource."""
        return self
    
    async def __aexit__(self, exc_type, exc, tb):
        """Leave `async with`: always disconnect; returns None, so any
        in-flight exception is propagated, not suppressed."""
        await self.disconnect()
