o
    \h                      @   s   d dl Z d dlZd dlZd dlmZ d dlZd dlmZmZm	Z	 zd dl
mZ W n ey7   d dlmZ Y nw dZdZG dd deZdS )	    N)Thread)APIErrorDatetimeSerializer
batch_post)Emptyi  i  P c                   @   s\   e Zd ZdZedZ								dd	d
Zdd Zdd Z	dd Z
dd Zdd ZdS )Consumerz.Consumes the messages from the client's queue.posthogd   N      ?F
      c                 C   sV   t |  d| _|| _|| _|| _|| _|| _|| _|| _	d| _
|| _|	| _|
| _dS )zCreate a consumer thread.TN)r   __init__daemonflush_atflush_intervalapi_keyhoston_errorqueuegziprunningretriestimeouthistorical_migration)selfr   r   r   r   r   r   r   r   r   r    r   S/home/air/segue/gemini/backup/venv/lib/python3.10/site-packages/posthog/consumer.pyr      s   

zConsumer.__init__c                 C   s0   | j d | jr|   | js	| j d dS )zRuns the consumer.zconsumer is running...zconsumer exited.N)logdebugr   uploadr   r   r   r   run=   s
   zConsumer.runc                 C   s
   d| _ dS )zPause the consumer.FN)r   r    r   r   r   pauseE   s   
zConsumer.pausec              
   C   s   d}|   }t|dkrdS z;z	| | d}W n$ ty< } z| jd| d}| jr2| || W Y d}~nd}~ww W |D ]}| j  q@|S |D ]}| j  qL|     Y S )z:Upload the next batch of items, return whether successful.Fr   Tzerror uploading: %sN)	nextlenrequest	Exceptionr   errorr   r   	task_done)r   successbatcheitemr   r   r   r   I   s*   
zConsumer.uploadc                 C   s   | j }g }t }d}t|| jk rot | }|| jkr 	 |S z=|jd| j| d}ttj|t	d
 }|tkrE| jdt| W q|| ||7 }|tkr\| jd| W |S W n
 tyg   Y |S w t|| jk s|S )z)Return the next batch of items to upload.r   T)blockr   )clsz)Item exceeds 900kib limit, dropping. (%s)zhit batch size limit (size: %d))r   time	monotonicr$   r   r   getjsondumpsr   encodeMAX_MSG_SIZEr   r'   strappendBATCH_SIZE_LIMITr   r   )r   r   items
start_time
total_sizeelapsedr,   	item_sizer   r   r   r#   ^   s<   

zConsumer.nextc                    s:   dd }t jt jtjd |d fdd}|  dS )z=Attempt to upload the batch and retry before raising an errorc                 S   s@   t | tr| jdkrdS d| j  kodk n  o| jdkS dS )NzN/AFi  i  i  )
isinstancer   status)excr   r   r   fatal_exception   s
   

$z)Consumer.request.<locals>.fatal_exception   )	max_triesgiveupc                      s"   t jjjj jd d S )N)r   r   r*   r   )r   r   r   r   r   r   r   r*   r   r   r   send_request   s   
z&Consumer.request.<locals>.send_requestN)backoffon_exceptionexpor&   r   )r   r*   rA   rF   r   rE   r   r%   |   s   

zConsumer.request)r	   NNr
   Fr   r   F)__name__
__module____qualname____doc__logging	getLoggerr   r   r!   r"   r   r#   r%   r   r   r   r   r      s"    

!r   )r2   rN   r/   	threadingr   rG   posthog.requestr   r   r   r   r   ImportErrorQueuer5   r8   r   r   r   r   r   <module>   s    