o
    Ohj8                     @  s  d dl mZ d dlZd dlZd dlZd dlZd dlZd dlmZ d dl	m
Z
 d dlZd dlZG dd de
Zd1d
dZd2ddZd3ddZd4ddZd5ddZd4ddZd4ddZd4ddZd4d d!Zd4d"d#Zd4d$d%Zd2d&d'Zd6d)d*Zejje  d+d,d2d-d.Zd2d/d0ZdS )7    )annotationsN)partial)Protocolc                   @  s   e Zd Zdd	ddZdS )
RawInput promptstrreturnc                 C  s   d S N )selfr   r   r   R/home/air/yokohama/back/venv/lib/python3.10/site-packages/trio/_tests/test_repl.py__call__   s    zRawInput.__call__Nr   r   r   r	   r   )__name__
__module____qualname__r   r   r   r   r   r      s    r   cmds	list[str]r	   c                   s"   t |  g dd	 fdd}|S )
z
    Pass in a list of strings.
    Returns a callable that returns each string, each time its called
    When there are not more strings to return, raise EOFError
    r   r   r   r	   c                   s*    |  zt W S  ty   td w r
   )appendnextStopIterationEOFError)r   	cmds_iterpromptsr   r   _raw_helper   s   

z$build_raw_input.<locals>._raw_helperNr   r   )iter)r   r   r   r   r   build_raw_input   s   r   Nonec                  C  sN   t dg} |  dksJ tt |   W d   dS 1 s w   Y  dS )z"Quick test of our helper function.cmd1N)r   pytestraisesr   )	raw_inputr   r   r   test_build_raw_input'   s
   
"r%   dict[str, object]c                   C  s   dt iS )N__builtins__)r'   r   r   r   r   build_locals2   s   r(   capsyspytest.CaptureFixture[str]monkeypatchpytest.MonkeyPatchc                   sb   t jjt d}tg d}||d| t j|I dH  |  \}}| g dks/J dS )z
    Run some basic commands through the interpreter while capturing stdout.
    Ensure that the interpreted prints the expected results.
    repl_locals)zx = 1zprint(f'{x=}')'hello'zdef func():z  print(x + 1)r   zfunc()zasync def afunc():z
  return 4r   zawait afunc()
import sysz"sys.stdout.write('hello stdout\n')r$   N)zx=1r/   24zhello stdout13)	trio_replTrioInteractiveConsoler(   r   setattrrun_repl
readouterr
splitlinesr)   r+   consoler$   out_errr   r   r   test_basic_interaction6   s   r?   c                   sl   t jjt d}tdg}| |d| tt t j	|I d H  W d    d S 1 s/w   Y  d S )Nr-   zraise SystemExitr$   )
r4   r5   r6   r(   r   r7   r"   r#   
SystemExitr8   )r+   r<   r$   r   r   r   "test_system_exits_quit_interpreter[   s   "rA   c                   sr   t jjt d}tg d}||d| t j|I d H  |  \}}d|v s+J d|vs1J d|v s7J d S )Nr-   )z"import signal, trio, trio.lowlevelzasync def f():zh  trio.lowlevel.spawn_system_task(    trio.to_thread.run_sync,    signal.raise_signal, signal.SIGINT,  )z  await trio.sleep_forever()z  print('should not see this')r   z	await f()z print('AFTER KeyboardInterrupt')r$   KeyboardInterruptshouldzAFTER KeyboardInterruptr4   r5   r6   r(   r   r7   r8   r9   r)   r+   r<   r$   r=   errr   r   r   test_KI_interruptsg   s   rG   c                   Z   t jjt d}tg d}||d| t j|I d H  |  \}}d|v s+J d S )Nr-   )r0   if sys.version_info < (3, 11):/  from exceptiongroup import BaseExceptionGroupr   z<raise BaseExceptionGroup('', [RuntimeError(), SystemExit()])!print('AFTER BaseExceptionGroup')r$   AFTER BaseExceptionGrouprD   r;   r   r   r   test_system_exits_in_exc_group   s   
rM   c                   rH   )Nr-   )r0   rI   rJ   r   zraise BaseExceptionGroup(z?  '', [BaseExceptionGroup('', [RuntimeError(), SystemExit()])])rK   r$   rL   rD   r;   r   r   r   %test_system_exits_in_nested_exc_group   s   rN   c                   sr   t jjt d}tddg}||d| t j|I d H  |  \}}d|vs+J d|vs1J d|v s7J d S )Nr-   zraise BaseExceptionprint('AFTER BaseException')r$   _threads.py_repl.pyAFTER BaseExceptionrD   rE   r   r   r   test_base_exception_captured   s   rS   c                   sZ   t jjt d}tddg}||d| t j|I d H  |  \}}d|v s+J d S )Nr-   z&raise ExceptionGroup('', [KeyError()])zprint('AFTER ExceptionGroup')r$   zAFTER ExceptionGrouprD   r;   r   r   r   test_exc_group_captured   s   rT   c                   sr   t jjt d}tg d}||d| t j|I d H  |  \}}d|vs+J d|vs1J d|v s7J d S )Nr-   )z-async def async_func_raises_base_exception():z  raise BaseExceptionr   z(await async_func_raises_base_exception()rO   r$   rP   rQ   rR   rD   rE   r   r   r   *test_base_exception_capture_from_coroutine   s   rU   c                  C  s(   t jtjddgdd} | jdksJ dS )zL
    Basic smoke test when running via the package __main__ entrypoint.
    -mr4   s   exit())inputr   N)
subprocessrunsys
executable
returncode)replr   r   r   test_main_entrypoint   s   r^   boolc                  C  s0   t jdkrdS td} |  sdS |  dkS )NlinuxFz /proc/sys/dev/tty/legacy_tiocstiT1)rZ   platformpathlibPathexists	read_text)sysctlr   r   r   should_try_newline_injection   s   

rh   z"the ioctl we use is disabled in CI)reasonc                  C  s  t jdksJ dd l} |  \}}|dkr$tjt jgt jdddgR   d}|ds8|t|d7 }|dr+t	|
  d}t|d	 |d
sX|t|d7 }|d
rK|ddksaJ t	|
  d}t|tj |d
s|t|d7 }|d
rud|v sJ t	|
  d}t|d t|tj |d
s|t|d7 }|d
rd|v sJ t	|
  t| t|dd  d S )Nwin32r   -urV   r4       s   import trio
>>> i      print("hello!")
   >>>    hello!      KeyboardInterrupts   print("hello!")   )rZ   rb   ptyforkosexeclpr[   endswithreadprintdecodewritecountkillsignalSIGINTclosewaitpid)rs   pidpty_fdbufferr   r   r   test_ki_newline_injection  sD   








r   c                    s  t  4 I d H } | tt jtjdddgtjtj	tjtj
dkr$tjnddI d H }|j4 I d H  d}|j2 z3 d H W }||7 }|dd	d
rO nq:6 t|  d}|jdI d H  |j2 z3 d H W }||7 }|drv nqe6 d|v s~J t|  tj
dkrd}|jdI d H  |j2 z3 d H W }||7 }|dr nq6 t|  d}|jdI d H  |j2 z3 d H W }||7 }|dr nq6 t|  tj
dkrtjntj}t|j| tj
dkr|jd	I d H  n	|jd	I d H  d}|j2 z3 d H W }||7 }|dr nq6 d|v s"J t|  |jdI d H  d}d}|j2 z)3 d H W }||7 }|dd	drX|sXt|j| d}|dr` nq86 d|v sjJ d|v sqJ t|  |jdI d H  d}d}|j2 z)3 d H W }||7 }|dd	dr|st|j| d}|dr nq6 d|v sJ d|v sJ t|  W d   I d H  n1 I d H sw   Y  | j  W d   I d H  d S 1 I d H sw   Y  d S )Nrk   rV   r4   rj   r   )stdoutstderrstdincreationflagsrl   s   
   
s   import trio
>>> rm   rn   ro   sI   import ctypes; ctypes.windll.kernel32.SetConsoleCtrlHandler(None, False)
sW   import coverage; trio.lowlevel.enable_ki_protection(coverage.pytracer.PyTracer._trace)
rq   s+   print("READY"); await trio.sleep_forever()
Fs   READY
Ts   trios/   import time; print("READY"); time.sleep(99999)
s	   Traceback)r4   open_nurserystartr   run_processrZ   r[   rX   PIPESTDOUTrb   CREATE_NEW_PROCESS_GROUPr   replacerw   ry   rz   r   send_allr~   CTRL_C_EVENTr   ru   r}   r   cancel_scopecancel)nurseryprocr   partsignal_sentkilledr   r   r   test_ki_in_repl<  s   








*j0r   )r   r   r	   r   )r	   r    )r	   r&   )r)   r*   r+   r,   r	   r    )r+   r,   r	   r    )r	   r_   ) 
__future__r   ru   rc   r~   rX   rZ   	functoolsr   typingr   r"   
trio._replr4   r   r   r%   r(   r?   rA   rG   rM   rN   rS   rT   rU   r^   rh   markskipifr   r   r   r   r   r   <module>   s<    




%







4