
    i                        d Z ddlmZ ddlZddlZddlZddlmZ erddlm	Z	 ddl
ZddlmZ  ej                  e      Z G d d	ej"                        Zy)
zAConsumer thread for processing live data with callback functions.    )annotationsN)TYPE_CHECKING)Callable   )Seisc                  b     e Zd ZdZ	 	 	 	 	 	 d	 fdZd
dZd
dZddZddZdddZ	ddZ
 xZS )ConsumeraK  Threaded consumer for processing Seis data via callback.

    Runs in a separate thread to process incoming data bars from a Seis
    by calling a user-provided callback function. Uses a queue for
    thread-safe data passing.

    Args:
        seis: Seis instance this consumer processes data from
        callback: Function(seis, data) to call with new data bars

    Example:
        >>> def my_callback(seis, data):
        ...     print(f"Received bar: {data}")
        >>> consumer = Consumer(seis, my_callback)
        >>> consumer.start()
        >>> consumer.put(new_data)
    c                L   t         |   d       t        j                         | _        || _        || _        | j                  j                   d| j
                  j                   d| j
                  j                   d| j
                  j                  j                   | _        y)zInitialize consumer thread.

        Args:
            seis: Seis to consume data from
            callback: Callback function for new data
        T)daemon_N)super__init__queueQueue_bufferseiscallback__name__symbolexchangeintervalvaluename)selfr   r   	__class__s      L/home/work/apex_v16/venv/lib/python3.12/site-packages/tvDatafeed/consumer.pyr   zConsumer.__init__%   s     	%9>	  }}%%&ayy yy!!"!yy!!''(* 		    c                P    d| j                   d| j                  j                   dS )zReturn detailed representation.z	Consumer(z, )r   r   r   r   s    r   __repr__zConsumer.__repr__>   s&    499-r$--*@*@)ACCr   c                L    | j                    d| j                  j                   S )zReturn string representation.z, callback=r    r!   s    r   __str__zConsumer.__str__B   s"    ))K(>(>'?@@r   c                D   t         j                  d| j                         	 | j                  j	                         }|!t         j                  d| j                         n	 | j                  | j                  |       [t         j                  d| j                         d| _        d| _        d| _        y# t        $ rc}t         j                  d| j
                  j                  | j                  |d       	 | j                          n# t        $ r Y nw xY wY d}~d}~ww xY w)zMain thread loop - processes data from queue.

        Continuously pulls data from the queue and calls the callback
        function. Exits when None is received (shutdown signal).
        zConsumer thread %s startedTNz+Consumer thread %s received shutdown signalz'Callback %s raised exception for %s: %s)exc_infozConsumer thread %s exiting)loggerdebugr   r   getr   r   	Exceptionerrorr   del_consumer)r   dataes      r   runzConsumer.runF   s     	1499=<<##%D |JDIIVdii. 4 	1499=	'  =MM**II!  %%'  s<    B3 3	D<8D5DD	DDDDDc                T    | j                   | j                   j                  |       yy)z{Add data to processing queue.

        Args:
            data: DataFrame with bar data, or None to signal shutdown
        Nr   put)r   r-   s     r   r2   zConsumer.putm   s%     <<#LLT" $r   c                T    | j                   y| j                   j                  | |      S )zStop thread and remove from Seis.

        Args:
            timeout: Maximum wait time in seconds (-1 for blocking)

        Returns:
            True if successful, False if timeout
        T)r   r,   )r   timeouts     r   r,   zConsumer.del_consumerv   s(     99yy%%dG44r   c                T    | j                   | j                   j                  d       yy)zcSignal thread to stop processing.

        Sends None to queue which triggers thread exit.
        Nr1   r!   s    r   stopzConsumer.stop   s%    
 <<#LLT" $r   )r   r   r   z$Callable[[Seis, pd.DataFrame], None]returnNone)r7   str)r7   r8   )r-   zpd.DataFrame | Noner7   r8   ))r4   intr7   bool)r   
__module____qualname____doc__r   r"   r$   r/   r2   r,   r6   __classcell__)r   s   @r   r	   r	      sJ    $

 7
 
	
2DA%N#5#r   r	   )r?   
__future__r   loggingr   	threadingtypingr   collections.abcr   pandaspdr   r   	getLoggerr   r'   Threadr	    r   r   <module>rK      sI    G "     (			8	$w#y w#r   