"""
Volume-based indicators — faithful port from V15.

Currently exposes `calc_absorption`: detects when a directional candle
is absorbed by the opposing side (high volume + small range = exhaustion).
"""

from __future__ import annotations

import pandas as pd


# Tunables for `calc_absorption`.
# Last-bar volume must exceed this multiple of the lookback average.
VOL_ABSORPTION_VOL_MULTIPLIER = 1.5
# Last-bar range (high - low) must be below this fraction of ATR.
VOL_ABSORPTION_RANGE_RATIO = 0.5
# Number of bars in the volume average; the window includes the last bar itself.
VOL_ABSORPTION_LOOKBACK = 20
# Body must be strictly greater than this fraction of the range (skips doji).
VOL_ABSORPTION_MIN_BODY_RATIO = 0.1


def calc_absorption(df: pd.DataFrame, atr: float) -> tuple[bool, bool]:
    """
    Return (buy_absorption, sell_absorption) for the last bar of `df`.

    BUY_ABSORPTION  := close > open  AND  vol > 1.5x avg20  AND  range < 0.5x ATR
    SELL_ABSORPTION := close < open  AND  vol > 1.5x avg20  AND  range < 0.5x ATR

    `avg20` is the mean volume of the last VOL_ABSORPTION_LOOKBACK bars,
    the last bar included. Volume is read from the "volume" column, falling
    back to "tick_volume" when "volume" is absent.

    Doji bodies (body <= 10% of range) are excluded — no directional bias.

    Semantics: a directional candle backed by heavy volume but compressed
    range means the opposing side is absorbing the push — exhaustion signal,
    contrarian.

    Parameters:
      df  — bars with "open", "high", "low", "close" and a volume column;
            only the last row (and the last 20 volumes) are inspected.
      atr — current ATR value used to scale the range test.

    Safe defaults, all returning (False, False):
      - atr is None, NaN or <= 0
      - df has fewer than VOL_ABSORPTION_LOOKBACK rows
      - neither "volume" nor "tick_volume" column is present
      - any NaN volume inside the lookback window, or average volume <= 0
      - last bar has zero (or negative) range

    Raises:
      KeyError if "open", "high", "low" or "close" is missing once the
      ATR, length and volume checks have passed.
    """
    # Reject unusable ATR explicitly instead of relying on NaN comparisons
    # happening to evaluate False further down.
    if atr is None or pd.isna(atr) or atr <= 0:
        return False, False
    if len(df) < VOL_ABSORPTION_LOOKBACK:
        return False, False

    if "volume" in df.columns:
        vol = df["volume"]
    elif "tick_volume" in df.columns:
        vol = df["tick_volume"]
    else:
        return False, False

    # Only the final window matters, so average the tail directly rather
    # than rolling over the whole column. skipna=False makes a NaN anywhere
    # in the window yield NaN, i.e. "not enough valid data".
    avg_vol = vol.iloc[-VOL_ABSORPTION_LOOKBACK:].mean(skipna=False)
    if pd.isna(avg_vol) or avg_vol <= 0:
        return False, False

    last_vol = float(vol.iloc[-1])
    o = float(df["open"].iloc[-1])
    h = float(df["high"].iloc[-1])
    l = float(df["low"].iloc[-1])
    c = float(df["close"].iloc[-1])

    rng = h - l
    if rng <= 0:
        # Flat bar: no range to compare against ATR.
        return False, False

    body = abs(c - o)
    if body <= VOL_ABSORPTION_MIN_BODY_RATIO * rng:
        # Doji-like candle: no directional bias to be absorbed.
        return False, False

    high_vol = last_vol > VOL_ABSORPTION_VOL_MULTIPLIER * float(avg_vol)
    small_range = rng < VOL_ABSORPTION_RANGE_RATIO * atr
    if not (high_vol and small_range):
        return False, False

    # The doji filter guarantees c != o here, so exactly one flag is True.
    return bool(c > o), bool(c < o)
