o
    3Ih}K                     @   s`  d dl mZ d dlZd dlmZmZ d dlmZmZm	Z	 d dl
mZ d dlmZmZmZmZmZ d dlmZmZmZmZmZ d dlmZ d d	lmZmZmZ d d
lmZ d dl m!Z! d dl"m#Z#m$Z$m%Z%m&Z&m'Z'm(Z( d dl)m*Z* d dl+m,Z,m-Z- d dl)Z)d dl.Z.d dl/m0Z0 e.1e2Z3ej4d ej5dej6dej7diZ8dd e89 D Z:dZ;G dd deeeZ<dS )    )cached_propertyN)ConfigurationParameter$EmbeddingsQueueConfigurationInternal)SqlDBParameterValueget_sql)BatchSizeExceededError)ProducerConsumerConsumerCallbackFndecode_vectorencode_vector)OperationRecord	LogRecordScalarEncodingSeqId	Operation)System)OpenTelemetryClientOpenTelemetryGranularitytrace_method)override)defaultdict)SequenceOptionalDictSetTuplecast)UUID)Table	functions)create_topic_name         c                 C   s   i | ]\}}||qS  r&   ).0kvr&   r&   c/home/air/sanwanet/gpt-api/venv/lib/python3.10/site-packages/chromadb/db/mixins/embeddings_queue.py
<dictcomp>/   s    r+   Fc                       s  e Zd ZU dZG dd dZeeee f ed< e	e
 ed< eed< eed< dZd	ef fd
dZedejedA fddZedejededdfddZedejededdfddZedejedededefddZedejededee dee fddZedeje			dBdedede	e de	e d e	e defd!dZed"ejed#eddfd$d"Zedefd%d&Zedefd'd(Ze ed)ejede
fd*d)Z!ed+ejdede"e	e# e	e e	e f fd,d+Z$ed-ejd.eddfd/d-Z%ed0ejde	e de	e de"e
e
f fd1d0Z&ed2ejde
fd3d2Z'ed4ejd5edee( ddfd6d4Z)ed7ejd8edee( ddfd9d7Z*e+de,fd:d;Z-d<e,ddfd=d>Z.de
fd?d@Z/  Z0S )CSqlEmbeddingsQueuea  A SQL database that stores embeddings, allowing a traditional RDBMS to be used as
    the primary ingest queue and satisfying the top level Producer/Consumer interfaces.

    Note that this class is only suitable for use cases where the producer and consumer
    are in the same process.

    This is because notification of new embeddings happens solely in-process: this
    implementation does not actively listen to the the database for new records added by
    other processes.
    c                
   @   sT   e Zd ZU eed< eed< eed< eed< eed< dededededef
ddZdS )	zSqlEmbeddingsQueue.Subscriptionid
topic_namestartendcallbackc                 C   s"   || _ || _|| _|| _|| _d S N)r-   r.   r/   r0   r1   )selfr-   r.   r/   r0   r1   r&   r&   r*   __init__I   s
   
z(SqlEmbeddingsQueue.Subscription.__init__N)	__name__
__module____qualname__r   __annotations__strintr   r4   r&   r&   r&   r*   SubscriptionB   s"   
 r;   _subscriptions_max_batch_size_tenant_topic_namespace   systemc                    sH   t t| _d | _|t| _|jd| _|jd| _	t
 | d S )N	tenant_idtopic_namespace)r   setr<   r=   requirer   _opentelemetry_clientsettingsr>   r?   superr4   )r3   rA   	__class__r&   r*   r4   ^   s   
zSqlEmbeddingsQueue.__init__zSqlEmbeddingsQueue.reset_statereturnNc                    s4   t    tt| _z| `W d S  ty   Y d S w r2   )rH   reset_stater   rD   r<   configAttributeErrorr3   rI   r&   r*   rL   f   s   


zSqlEmbeddingsQueue.delete_topiccollection_idc                 C   s   t | j| j|}td}|  ||jt|k	 }| 
 }t||  \}}||| W d    d S 1 s<w   Y  d S )Nembeddings_queue)r"   r>   r?   r    querybuilderfrom_wheretopicr   deletetxr   parameter_formatexecute)r3   rP   r.   tqcursqlparamsr&   r&   r*   
delete_logs   s   

"zSqlEmbeddingsQueue.delete_logzSqlEmbeddingsQueue.purge_logc                 C   sB  t d}|  |tt djd|jt	| 
|kt d|jt djk}t| j| j|}|  ]}t||  \}}||| | }|rZtdd |D }	n		 W d    d S t d}
|  |
|
jt	|	k |
jt	|k }t||  \}}||| W d    d S 1 sw   Y  d S )Nsegments
max_seq_idc                 s   s    | ]}|d  V  qdS )r   Nr&   )r'   rowr&   r&   r*   	<genexpr>   s    z/SqlEmbeddingsQueue.purge_log.<locals>.<genexpr>rQ   )r    rR   rS   selectr!   Coalesceseq_idrT   
collectionr   
uuid_to_db	left_joinonr-   
segment_idr"   r>   r?   rW   r   rX   rY   fetchallminrU   rV   )r3   rP   
segments_tsegment_ids_qr.   r\   r]   r^   results
min_seq_idrZ   r[   r&   r&   r*   	purge_log   s>   


	"z#SqlEmbeddingsQueue.submit_embedding	embeddingc                 C   s    | j std| ||gd S )NComponent not runningr   )_runningRuntimeErrorsubmit_embeddings)r3   rP   rt   r&   r&   r*   submit_embedding   s   z$SqlEmbeddingsQueue.submit_embeddings
embeddingsc                 C   s  | j stdt|dkrg S t|| jkr%td| jdd| jdd| j}t| j| j|}t	d}| 
 ||j|j|j|j|j|j}i }|D ].}| |\}	}
}|tt|d  t|t|d	 t|	t|
t|}t|||d	 < qM|  k}t||  \}}| d
}||| }ttd gt| }g }|D ]*\}}|||| < |||  }t|t||d |d |d |d dd}| | q| !|| | j"dj#r| $| |W  d    S 1 sw   Y  d S )Nru   r   z)
                Cannot submit more than ,zf embeddings at once.
                Please submit your embeddings in batches of size
                z or less.
                rQ   	operationr-   z RETURNING seq_id, idrt   encodingmetadata)r-   rt   r}   r~   r|   
log_offsetrecordautomatically_purge)%rv   rw   lenmax_batch_sizer   rM   r"   r>   r?   r    rR   intocolumnsr|   rU   r-   vectorr}   r~   !_prepare_vector_encoding_metadatainsertr   _operation_codesrW   r   rX   rY   rm   r   r   r   r   append_notify_allget_parametervaluers   )r3   rP   rz   _r.   rZ   r   	id_to_idxrt   embedding_bytesr}   r~   r\   r]   r^   rq   seq_idsembedding_recordsrg   r-   submit_embedding_recordembedding_recordr&   r&   r*   rx      s   





$zSqlEmbeddingsQueue.subscribe
consume_fnr/   r0   r-   c           	      C   sj   | j stdt| j| j|}|pt }| ||\}}| |||||}| 	| | j
| | |S )Nru   )rv   rw   r"   r>   r?   uuiduuid4_validate_ranger;   	_backfillr<   add)	r3   rP   r   r/   r0   r-   r.   subscription_idsubscriptionr&   r&   r*   	subscribe  s   



zSqlEmbeddingsQueue.unsubscriber   c                 C   sR   | j  D ]!\}}|D ]}|j|kr%|| t|dkr!| j |=   d S qqd S )Nr   )r<   itemsr-   remover   )r3   r   r.   subscriptionsr   r&   r&   r*   unsubscribe.  s   

c                 C      dS )Nrb   r&   rO   r&   r&   r*   	min_seqid9     zSqlEmbeddingsQueue.min_seqidc                 C   r   )Nl    r&   rO   r&   r&   r*   	max_seqid=  r   zSqlEmbeddingsQueue.max_seqidz!SqlEmbeddingsQueue.max_batch_sizec                 C   s   | j d u rR|  A}|d | }|D ]}d|d v r,t|d dd | j | _ q| j d u rAd| j | _ W d    | j S W d    | j S 1 sMw   Y  | j S )NzPRAGMA compile_options;MAX_VARIABLE_NUMBERr   =r#   i  )r=   rW   rY   rm   r:   splitVARIABLES_PER_RECORD)r3   r\   compile_optionsoptionr&   r&   r*   r   A  s&   





z4SqlEmbeddingsQueue._prepare_vector_encoding_metadatac                 C   s\   |d d urt t|d }|j}t|d |}nd }d }|d r't|d nd }|||fS )Nrt   r}   r~   )r   r   r   r   jsondumps)r3   rt   encoding_typer}   r   r~   r&   r&   r*   r   X  s   
zSqlEmbeddingsQueue._backfillr   c                 C   sD  t d}|  ||jt|jk|jt|jk|jt|j	k
|j|j|j|j|j|j|j}|  \}t||  \}}||| | }|D ]=}|d rft|d }	t|d |	}
nd}	d}
| |t|d tt|d  |d |
|	|d rt|d ndd	d
g qRW d   dS 1 sw   Y  dS )zUBackfill the given subscription with any currently matching records in the
        DBrQ   r%      Nr   r#   r$      )r|   r-   rt   r}   r~   r   )r    rR   rS   rT   rU   r   r.   rg   r/   r0   re   r|   r-   r   r}   r~   orderbyrW   r   rX   rY   rm   r   r   _notify_oner   r   _operation_codes_invr   loads)r3   r   rZ   r[   r\   r]   r^   rowsrc   r}   r   r&   r&   r*   r   i  sF   
	
"z"SqlEmbeddingsQueue._validate_rangec                 C   sX   |p|   }|p|  }t|trt|tstd||kr(td| d| ||fS )z[Validate and normalize the start and end SeqIDs for a subscription using this
        impl.z2SeqIDs must be integers for sql-based EmbeddingsDBzInvalid SeqID range: z to )_next_seq_idr   
isinstancer:   	TypeError
ValueError)r3   r/   r0   r&   r&   r*   r     s   zSqlEmbeddingsQueue._next_seq_idc                 C   sr   t d}|  |t|j}|  }||	  t
| d d W  d   S 1 s2w   Y  dS )z%Get the next SeqID for this database.rQ   r   r#   N)r    rR   rS   re   r!   Maxrg   rW   rY   r   r:   fetchoner3   rZ   r[   r\   r&   r&   r*   r     s   
$zSqlEmbeddingsQueue._notify_allrU   c                 C   s*   | j r| j| D ]
}| || qdS dS )z:Send a notification to each subscriber of the given topic.N)rv   r<   r   )r3   rU   rz   subr&   r&   r*   r     s
   zSqlEmbeddingsQueue._notify_oner   c              
   C   s   d}g }|D ]}|d |j krq|d |jkrd} n|| qzt|dkr-|| |r8| |j W dS W dS  tye } zt	d|jj
 d|j d t| trZ|W Y d}~dS d}~ww )	z+Send a notification to a single subscriber.Fr   Tr   z6Exception occurred invoking consumer for subscription z	to topic z %sN)r/   r0   r   r   r1   r   r-   BaseExceptionloggererrorhexr.   r9   _called_from_test)r3   r   rz   should_unsubscribefiltered_embeddingsrt   er&   r&   r*   r     s6   
c                 C   s   t d}|  ||jd}|  }||  |	 }W d    n1 s,w   Y  |d u rJ| 
 dk}ttd|g}| | |S t|d S )Nembeddings_queue_configr#   r   r   )r    rR   rS   re   config_json_strlimitrW   rY   r   r   _get_wal_sizer   r   
set_configfrom_json_str)r3   rZ   r[   r\   resultis_fresh_systemrM   r&   r&   r*   rM     s   



zSqlEmbeddingsQueue.configrM   c                 C   s\   |   }|dd| f W d    n1 sw   Y  z| `W d S  ty-   Y d S w )Nz
                INSERT OR REPLACE INTO embeddings_queue_config (id, config_json_str)
                VALUES (?, ?)
            r#   )rW   rY   to_json_strrM   rN   )r3   rM   r\   r&   r&   r*   r     s   

zSqlEmbeddingsQueue.set_configc                 C   sl   t d}|  |td}|  }||  t	|
 d W  d    S 1 s/w   Y  d S )NrQ   *r   )r    rR   rS   re   r!   CountrW   rY   r   r:   r   r   r&   r&   r*   r     s   
$z SqlEmbeddingsQueue._get_wal_size)rK   N)NNN)1r5   r6   r7   __doc__r;   r   r9   r   r8   r   r:   r   r   r4   r   r   ALLr   rL   r   r_   rs   r   r   ry   r   rx   r   r   r   r   r   propertyr   r   bytesr   r   r   r   r   r   r   r   r   rM   r   r   __classcell__r&   r&   rI   r*   r,   6   s   
 



+

S
	


(



r,   )=	functoolsr   r   chromadb.api.configurationr   r   chromadb.db.baser   r   r   chromadb.errorsr   chromadb.ingestr	   r
   r   r   r   chromadb.typesr   r   r   r   r   chromadb.configr    chromadb.telemetry.opentelemetryr   r   r   	overridesr   collectionsr   typingr   r   r   r   r   r   r   r   pypikar    r!   loggingchromadb.ingest.impl.utilsr"   	getLoggerr5   r   ADDUPDATEUPSERTDELETEr   r   r   r   r,   r&   r&   r&   r*   <module>   s4     
