o
    3IhK#                     @   s<  d dl Z d dlZd dlmZmZmZmZmZ d dlZd dl	m	Z	 d dl
mZmZmZ d dlmZ d dlmZ d dlmZ d dlmZmZmZ d d	lmZ d d
lmZ d dlmZ d dlmZ d dl m!Z!m"Z"m#Z#m$Z$m%Z% d dl&m'Z' dee dee fddZ(dee dee) fddZ*edZ+edZ,G dd deZ-dS )    N)CallableDictListOptionalTypeVar)	overrides)	GetResultMetadataQueryResult)System)Executor)Scan)	CountPlanGetPlanKNNPlan)convert)QueryExecutorStub)DistributedSegmentManager)OtelInterceptor)RetryCallStateRetryingstop_after_attemptwait_exponential_jitterretry_if_exception)Spanmetadatareturnc                 C   sD   | sdS i }|   D ]\}}|ds|||< q
t|dkr dS |S )z[Remove any chroma-specific metadata keys that the client shouldn't see from a metadata map.Nzchroma:r   )items
startswithlen)r   resultkv r#   g/home/air/sanwanet/gpt-api/venv/lib/python3.10/site-packages/chromadb/execution/executor/distributed.py_clean_metadata   s   
r%   c                 C   s   | rd| v rt | d S dS )z-Retrieve the uri (if any) from a Metadata mapz
chroma:uriN)strr   r#   r#   r$   _uri&   s   r(   IOc                       s   e Zd ZU ejed< eeef ed< e	ed< e
ed< e
ed< def fddZd	eeegef  d
edefddZedede
fddZededefddZededefddZdedee fddZdedefddZ  ZS )DistributedExecutor_mtx_grpc_stub_pool_manager_request_timeout_seconds_query_replication_factorsystemc                    sH   t  | t | _i | _| t| _|j	d| _
|j	d| _d S )N$chroma_query_request_timeout_secondschroma_query_replication_factor)super__init__	threadingLockr,   r-   requirer   r.   settingsr/   r0   )selfr1   	__class__r#   r$   r5   :   s   

zDistributedExecutor.__init__funcsargsr   c              	      s   d}d dt ddf fdd}ttdtddd	d
tdd |dD ].} dur.   d | ||t|  |W  d     S 1 sGw   Y  |d7 }q"td)z
        Retry a list of functions in a round-robin fashion until one of them succeeds.

        funcs: List of functions to retry
        args: Arguments to pass to each function

        r   N_r   c                    s&   ddl m} |d ur|d d S d S )Nr   )tracerzWaiting to retry RPC) chromadb.telemetry.opentelemetryr@   
start_span)r?   r@   
sleep_spanr#   r$   before_sleepQ   s   z<DistributedExecutor._round_robin_retry.<locals>.before_sleep   g?)jitterTc                 S   s$   t | tjo|  tjjtjjfv S N)
isinstancegrpcRpcErrorcode
StatusCodeUNAVAILABLEUNKNOWN)xr#   r#   r$   <lambda>`   s    z8DistributedExecutor._round_robin_retry.<locals>.<lambda>)stopwaitreraiseretryrE      z0Unreachable code error - should never reach here)r   r   r   r   r   endr   	Exception)r:   r=   r>   attempt_countrE   attemptr#   rC   r$   _round_robin_retryF   s(   



$
z&DistributedExecutor._round_robin_retryplanc                    s:     |j} fdd|D } |t|}t|S )Nc                       g | ]}  |jqS r#   )	_get_stubCount.0endpointr:   r#   r$   
<listcomp>s       z-DistributedExecutor.count.<locals>.<listcomp>)_get_grpc_endpointsscanr[   r   to_proto_count_planfrom_proto_count_result)r:   r\   	endpointscount_funcscount_resultr#   rc   r$   countp   s   

zDistributedExecutor.countc              	      s     |j} fdd|D } |t|}t|}dd |D }|jjr/dd |D nd }|jjr<dd |D nd }|jj	rIdd |D nd }	|jj
rVdd |D nd }
t||||	d |
|jjdS )	Nc                    r]   r#   )r^   Getr`   rc   r#   r$   rd   |   re   z+DistributedExecutor.get.<locals>.<listcomp>c                 S      g | ]}|d  qS )idr#   ra   recordr#   r#   r$   rd          c                 S   ro   )	embeddingr#   rq   r#   r#   r$   rd      rs   c                 S   ro   )documentr#   rq   r#   r#   r$   rd      rs   c                 S      g | ]}t |d  qS r'   r(   rq   r#   r#   r$   rd      re   c                 S   rv   r'   r%   rq   r#   r#   r$   rd      re   )ids
embeddings	documentsurisdata	metadatasincluded)rf   rg   r[   r   to_proto_get_planfrom_proto_get_result
projectionrt   ru   urir   r   r   )r:   r\   rj   	get_funcs
get_resultrecordsry   rz   r{   r|   r~   r#   rc   r$   gety   s<   
zDistributedExecutor.getc              
      s     |j} fdd|D } |t|}t|}dd |D }|jjr/dd |D nd }|jjr<dd |D nd }|jj	rIdd |D nd }	|jj
rVdd |D nd }
|jjrcdd |D nd }t||||	d |
||jjd	S )
Nc                    r]   r#   )r^   KNNr`   rc   r#   r$   rd      re   z+DistributedExecutor.knn.<locals>.<listcomp>c                 S      g | ]	}d d |D qS )c                 S      g | ]}|d  d qS )rr   rp   r#   rq   r#   r#   r$   rd      re   6DistributedExecutor.knn.<locals>.<listcomp>.<listcomp>r#   ra   r   r#   r#   r$   rd          c                 S   r   )c                 S   r   )rr   rt   r#   rq   r#   r#   r$   rd      re   r   r#   r   r#   r#   r$   rd          c                 S   r   )c                 S   r   )rr   ru   r#   rq   r#   r#   r$   rd      re   r   r#   r   r#   r#   r$   rd      r   c                 S   r   )c                 S      g | ]
}t |d  d qS rr   r   rw   rq   r#   r#   r$   rd          r   r#   r   r#   r#   r$   rd      r   c                 S   r   )c                 S   r   r   rx   rq   r#   r#   r$   rd      r   r   r#   r   r#   r#   r$   rd      r   c                 S   r   )c                 S   ro   )distancer#   rq   r#   r#   r$   rd      rs   r   r#   r   r#   r#   r$   rd      r   )ry   rz   r{   r|   r}   r~   	distancesr   )rf   rg   r[   r   to_proto_knn_planfrom_proto_knn_batch_resultr   rt   ru   r   r   rankr
   r   )r:   r\   rj   	knn_funcs
knn_resultresultsry   rz   r{   r|   r~   r   r#   rc   r$   knn   sV   

zDistributedExecutor.knnrg   c                 C   s    | j |j| j}t| |S rH   )r.   get_endpointsrr   r0   randomshuffle)r:   rg   	grpc_urlsr#   r#   r$   rf      s
   
z'DistributedExecutor._get_grpc_endpointsgrpc_urlc                 C   sx   | j / || jvr&tj|ddgd}t g}tj|g|R  }t|| j|< | j| W  d    S 1 s5w   Y  d S )N)zgrpc.max_concurrent_streamsi  )zgrpc.max_receive_message_lengthi H)options)r,   r-   rJ   insecure_channelr   intercept_channelr   )r:   r   channelinterceptorsr#   r#   r$   r^      s   
$zDistributedExecutor._get_stub)__name__
__module____qualname__r6   r7   __annotations__r   r&   r   r   intr   r5   r   r   r)   r*   r[   r   r   rm   r   r   r   r   r
   r   r   rf   r^   __classcell__r#   r#   r;   r$   r+   3   s    
 
$*'9
r+   ).r6   r   typingr   r   r   r   r   rJ   r   chromadb.api.typesr   r	   r
   chromadb.configr   $chromadb.execution.executor.abstractr   &chromadb.execution.expression.operatorr   "chromadb.execution.expression.planr   r   r   chromadb.protor   &chromadb.proto.query_executor_pb2_grpcr   )chromadb.segment.impl.manager.distributedr   %chromadb.telemetry.opentelemetry.grpcr   tenacityr   r   r   r   r   opentelemetry.tracer   r%   r&   r(   r)   r*   r+   r#   r#   r#   r$   <module>   s*    	