o
    4@Hh                     @  s   d dl mZ d dlZd dlZd dlmZ d dlZd dlZG dd deZ	d'd	d
Z
d(ddZd)ddZd*ddZd+ddZd*ddZd*ddZd*ddZd*dd Zd*d!d"Zd*d#d$Zd(d%d&ZdS ),    )annotationsN)Protocolc                   @  s   e Zd Zdd	ddZdS )
RawInput promptstrreturnc                 C  s   d S N )selfr   r
   r
   U/home/air/sanwanet/gpt-api/venv/lib/python3.10/site-packages/trio/_tests/test_repl.py__call__   s    zRawInput.__call__Nr   r   r   r   r   )__name__
__module____qualname__r   r
   r
   r
   r   r      s    r   cmds	list[str]r   c                   s"   t |  g dd	 fdd}|S )
z
    Pass in a list of strings.
    Returns a callable that returns each string, each time its called
    When there are not more strings to return, raise EOFError
    r   r   r   r   c                   s*    |  zt W S  ty   td w r	   )appendnextStopIterationEOFError)r   	cmds_iterpromptsr
   r   _raw_helper   s   

z$build_raw_input.<locals>._raw_helperNr   r   )iter)r   r   r
   r   r   build_raw_input   s   r   Nonec                  C  sN   t dg} |  dksJ tt |   W d   dS 1 s w   Y  dS )z"Quick test of our helper function.cmd1N)r   pytestraisesr   )	raw_inputr
   r
   r   test_build_raw_input#   s
   
"r$   dict[str, object]c                   C  s   dt iS )N__builtins__)r&   r
   r
   r
   r   build_locals.   s   r'   capsyspytest.CaptureFixture[str]monkeypatchpytest.MonkeyPatchc                   sb   t jjt d}tg d}||d| t j|I dH  |  \}}| g dks/J dS )z
    Run some basic commands through the interpreter while capturing stdout.
    Ensure that the interpreted prints the expected results.
    repl_locals)zx = 1zprint(f'{x=}')'hello'zdef func():z  print(x + 1)r   zfunc()zasync def afunc():z
  return 4r   zawait afunc()
import sysz"sys.stdout.write('hello stdout\n')r#   N)zx=1r.   24zhello stdout13)	trio_replTrioInteractiveConsoler'   r   setattrrun_repl
readouterr
splitlinesr(   r*   consoler#   out_errr
   r
   r   test_basic_interaction2   s   r>   c                   sl   t jjt d}tdg}| |d| tt t j	|I d H  W d    d S 1 s/w   Y  d S )Nr,   zraise SystemExitr#   )
r3   r4   r5   r'   r   r6   r!   r"   
SystemExitr7   )r*   r;   r#   r
   r
   r   "test_system_exits_quit_interpreterW   s   "r@   c                   sr   t jjt d}tg d}||d| t j|I d H  |  \}}d|v s+J d|vs1J d|v s7J d S )Nr,   )z"import signal, trio, trio.lowlevelzasync def f():zh  trio.lowlevel.spawn_system_task(    trio.to_thread.run_sync,    signal.raise_signal, signal.SIGINT,  )z  await trio.sleep_forever()z  print('should not see this')r   z	await f()z print('AFTER KeyboardInterrupt')r#   KeyboardInterruptshouldzAFTER KeyboardInterruptr3   r4   r5   r'   r   r6   r7   r8   r(   r*   r;   r#   r<   errr
   r
   r   test_KI_interruptsc   s   rF   c                   Z   t jjt d}tg d}||d| t j|I d H  |  \}}d|v s+J d S )Nr,   )r/   if sys.version_info < (3, 11):/  from exceptiongroup import BaseExceptionGroupr   z<raise BaseExceptionGroup('', [RuntimeError(), SystemExit()])!print('AFTER BaseExceptionGroup')r#   AFTER BaseExceptionGrouprC   r:   r
   r
   r   test_system_exits_in_exc_group   s   
rL   c                   rG   )Nr,   )r/   rH   rI   r   zraise BaseExceptionGroup(z?  '', [BaseExceptionGroup('', [RuntimeError(), SystemExit()])])rJ   r#   rK   rC   r:   r
   r
   r   %test_system_exits_in_nested_exc_group   s   rM   c                   sr   t jjt d}tddg}||d| t j|I d H  |  \}}d|vs+J d|vs1J d|v s7J d S )Nr,   zraise BaseExceptionprint('AFTER BaseException')r#   _threads.py_repl.pyAFTER BaseExceptionrC   rD   r
   r
   r   test_base_exception_captured   s   rR   c                   sZ   t jjt d}tddg}||d| t j|I d H  |  \}}d|v s+J d S )Nr,   z&raise ExceptionGroup('', [KeyError()])zprint('AFTER ExceptionGroup')r#   zAFTER ExceptionGrouprC   r:   r
   r
   r   test_exc_group_captured   s   rS   c                   sr   t jjt d}tg d}||d| t j|I d H  |  \}}d|vs+J d|vs1J d|v s7J d S )Nr,   )z-async def async_func_raises_base_exception():z  raise BaseExceptionr   z(await async_func_raises_base_exception()rN   r#   rO   rP   rQ   rC   rD   r
   r
   r   *test_base_exception_capture_from_coroutine   s   rT   c                  C  s(   t jtjddgdd} | jdksJ dS )zL
    Basic smoke test when running via the package __main__ entrypoint.
    z-mr3   s   exit())inputr   N)
subprocessrunsys
executable
returncode)replr
   r
   r   test_main_entrypoint   s   r\   )r   r   r   r   )r   r   )r   r%   )r(   r)   r*   r+   r   r   )r*   r+   r   r   )
__future__r   rV   rX   typingr   r!   
trio._replr3   r   r   r$   r'   r>   r@   rF   rL   rM   rR   rS   rT   r\   r
   r
   r
   r   <module>   s&    




%





