o
    3Ih                     @   sr   d dl Z d dlZd dlZd dlmZ ddlmZ ddlmZ ddlm	Z	 G dd dej
Zed	kr7e  dS dS )
    N)mock   )ConfigException)ExecProvider)
ConfigNodec                   @   s   e Zd Zdd Zdd Zeddd Zeddd	 Zedd
d Z	eddd Z
eddd Zeddd Zeddd Zeddd ZdS )ExecProviderTestc                 C   s@   t ddg ddd d| _t ddg dddd d| _d| _d S )	Ntestzaws-iam-authenticator)tokenz-idummyz$client.authentication.k8s.io/v1beta1)commandargs
apiVersionenvT)r   r   r   provideClusterInfor   z
        {
            "apiVersion": "client.authentication.k8s.io/v1beta1",
            "kind": "ExecCredential",
            "status": {
                "token": "dummy"
            }
        }
        )r   input_okinput_with_cluster	output_ok)self r   d/home/air/sanwanet/gpt-api/venv/lib/python3.10/site-packages/kubernetes/config/exec_provider_test.pysetUp   s   


zExecProviderTest.setUpc              	   C   sz   t di t dddit dddig}|D ]&}| t}t|d  W d    n1 s+w   Y  | d|jjd  qd S )	Ntest1test2r    test3r   z$exec: malformed request. missing keyr   )r   assertRaisesr   r   assertIn	exceptionr   )r   exec_configsexec_configcontextr   r   r   test_missing_input_keys4   s   
z(ExecProviderTest.test_missing_input_keyszsubprocess.Popenc                 C   st   |j }d|j_ d|j_ | t}t| jd }|  W d    n1 s%w   Y  | d|jj  |j	j
d  d S )Nr   r   r   zexec: process returned %dr   return_valuewaitcommunicater   r   r   r   runr   r   r   r   r   instancer    epr   r   r   test_error_code_returned>   s   

z)ExecProviderTest.test_error_code_returnedc                 C   sl   |j }d|j_ d|j_ | t}t| jd }|  W d    n1 s%w   Y  | d|j	j
d  d S )Nr   r"   z%exec: failed to decode process outputr#   r(   r   r   r   test_nonjson_output_returnedI   s   

z-ExecProviderTest.test_nonjson_output_returnedc              	   C   s   |j }d|j_ g d}|D ]1}|df|j_ | t}t| jd }|  W d    n1 s/w   Y  | d|j	j
d  qd S )Nr   )z
            {
                "kind": "ExecCredential",
                "status": {
                    "token": "dummy"
                }
            }
            z
            {
                "apiVersion": "client.authentication.k8s.io/v1beta1",
                "status": {
                    "token": "dummy"
                }
            }
            z
            {
                "apiVersion": "client.authentication.k8s.io/v1beta1",
                "kind": "ExecCredential"
            }
            r   z%exec: malformed response. missing keyr#   )r   r   r)   outputsoutputr    r*   r   r   r   test_missing_output_keysT   s   

z)ExecProviderTest.test_missing_output_keysc                 C   s   |j }d|j_ d}d| }|df|j_ | t}t| jd }|  W d    n1 s-w   Y  | d| |j	j
d  d S )Nr   zclient.authentication.k8s.io/v1z
        {
            "apiVersion": "%s",
            "kind": "ExecCredential",
            "status": {
                "token": "dummy"
            }
        }
        r   z*exec: plugin api version %s does not matchr#   )r   r   r)   wrong_api_versionr.   r    r*   r   r   r   test_mismatched_api_versionv   s"   	

z,ExecProviderTest.test_mismatched_api_versionc                 C   sR   |j }d|j_ | jdf|j_ t| jd }| }| t|t	 | d|v  d S )Nr   r   r	   )
r$   r%   r   r&   r   r   r'   
assertTrue
isinstancedict)r   r   r)   r*   resultr   r   r   
test_ok_01   s   zExecProviderTest.test_ok_01c                 C   sJ   |j }d|j_ | jdf|j_ t| jd}|  | |jd d d d S )Nr   r   z/some/directoryr   cwd)	r$   r%   r   r&   r   r   r'   assertEqual	call_args)r   r   r)   r*   r   r   r   test_run_in_dir   s   z ExecProviderTest.test_run_in_dirc                 C   s   |j }d|j_ | jdf|j_ tjjdd d}|" t| jd }|	 }| 
t|t | 
d|v  W d    d S 1 s=w   Y  d S )Nr   r   z
sys.stdout)newr	   )r$   r%   r   r&   unittestr   patchr   r   r'   r2   r3   r4   )r   r   r)   mock_stdoutr*   r5   r   r   r   test_ok_no_console_attached   s   "z,ExecProviderTest.test_ok_no_console_attachedc                 C   s   |j }d|j_ | jdf|j_ t| jd tdddi}| }| t	|t
 | d|v  t|jjd d }| |d	 d d d d S )
Nr   r   clusterserverzname.company.comr	   r   KUBERNETES_EXEC_INFOspec)r$   r%   r   r&   r   r   r   r'   r2   r3   r4   jsonloadsr9   kwargsr8   )r   r   r)   r*   r5   objr   r   r   test_with_cluster_info   s   z'ExecProviderTest.test_with_cluster_infoN)__name__
__module____qualname__r   r!   r   r=   r+   r,   r/   r1   r6   r:   r?   rH   r   r   r   r   r      s&    





!

	

r   __main__)rD   osr<   r   config_exceptionr   exec_providerr   kube_configr   TestCaser   rI   mainr   r   r   r   <module>   s    "