o
    dTi6                     @  s  d dl mZ d dlZd dlZd dlZd dlZd dlZd dlmZ d dl	m
Z
 d dlZd dlZG dd de
Zd.d
dZd/ddZd0ddZd1ddZd0ddZd0ddZd0ddZd0ddZd0dd Zd0d!d"Zd/d#d$Zd2d&d'Zejje  d(d)d/d*d+Zd/d,d-ZdS )3    )annotationsN)partial)Protocolc                   @  s   e Zd Zdd	ddZdS )
RawInput promptstrreturnc                 C  s   d S N )selfr   r   r   Q/home/air/sos-pdf/back/venv/lib/python3.10/site-packages/trio/_tests/test_repl.py__call__   s    zRawInput.__call__Nr   r   r   r	   r   )__name__
__module____qualname__r   r   r   r   r   r      s    r   cmds	list[str]r	   c                   s"   t |  g dd	 fdd}|S )
z
    Pass in a list of strings.
    Returns a callable that returns each string, each time its called
    When there are not more strings to return, raise EOFError
    r   r   r   r	   c                   s*    |  zt W S  ty   td w r
   )appendnextStopIterationEOFError)r   	cmds_iterpromptsr   r   _raw_helper   s   

z$build_raw_input.<locals>._raw_helperNr   r   )iter)r   r   r   r   r   build_raw_input   s   r   Nonec                  C  sN   t dg} |  dksJ tt |   W d   dS 1 s w   Y  dS )z"Quick test of our helper function.cmd1N)r   pytestraisesr   )	raw_inputr   r   r   test_build_raw_input'   s
   
"r%   capsyspytest.CaptureFixture[str]monkeypatchpytest.MonkeyPatchc                   s\   t j }tg d}||d| t j|I dH  |  \}}| g dks,J dS )z
    Run some basic commands through the interpreter while capturing stdout.
    Ensure that the interpreted prints the expected results.
    )zx = 1zprint(f'{x=}')'hello'zdef func():z  print(x + 1)r   zfunc()zasync def afunc():z
  return 4r   zawait afunc()
import sysz"sys.stdout.write('hello stdout\n')r$   N)zx=1r*   24zhello stdout13)trio_replTrioInteractiveConsoler   setattrrun_repl
readouterr
splitlinesr&   r(   consoler$   out_errr   r   r   test_basic_interaction/   s   
r:   c                   sf   t j }tdg}| |d| tt t j|I d H  W d    d S 1 s,w   Y  d S )Nzraise SystemExitr$   )	r/   r0   r1   r   r2   r"   r#   
SystemExitr3   )r(   r7   r$   r   r   r   "test_system_exits_quit_interpreterT   s   
"r<   c                   sl   t j }tg d}||d| t j|I d H  |  \}}d|v s(J d|vs.J d|v s4J d S )N)z"import signal, trio, trio.lowlevelzasync def f():zh  trio.lowlevel.spawn_system_task(    trio.to_thread.run_sync,    signal.raise_signal, signal.SIGINT,  )z  await trio.sleep_forever()z  print('should not see this')r   z	await f()z print('AFTER KeyboardInterrupt')r$   KeyboardInterruptshouldzAFTER KeyboardInterruptr/   r0   r1   r   r2   r3   r4   r&   r(   r7   r$   r8   errr   r   r   test_KI_interrupts`   s   
rB   c                   T   t j }tg d}||d| t j|I d H  |  \}}d|v s(J d S )N)r+   if sys.version_info < (3, 11):/  from exceptiongroup import BaseExceptionGroupr   z<raise BaseExceptionGroup('', [RuntimeError(), SystemExit()])!print('AFTER BaseExceptionGroup')r$   AFTER BaseExceptionGroupr?   r6   r   r   r   test_system_exits_in_exc_group|   s   

rH   c                   rC   )N)r+   rD   rE   r   zraise BaseExceptionGroup(z?  '', [BaseExceptionGroup('', [RuntimeError(), SystemExit()])])rF   r$   rG   r?   r6   r   r   r   %test_system_exits_in_nested_exc_group   s   
rI   c                   sl   t j }tddg}||d| t j|I d H  |  \}}d|vs(J d|vs.J d|v s4J d S )Nzraise BaseExceptionprint('AFTER BaseException')r$   _threads.py_repl.pyAFTER BaseExceptionr?   r@   r   r   r   test_base_exception_captured   s   
rN   c                   sT   t j }tddg}||d| t j|I d H  |  \}}d|v s(J d S )Nz&raise ExceptionGroup('', [KeyError()])zprint('AFTER ExceptionGroup')r$   zAFTER ExceptionGroupr?   r6   r   r   r   test_exc_group_captured   s   
rO   c                   sl   t j }tg d}||d| t j|I d H  |  \}}d|vs(J d|vs.J d|v s4J d S )N)z-async def async_func_raises_base_exception():z  raise BaseExceptionr   z(await async_func_raises_base_exception()rJ   r$   rK   rL   rM   r?   r@   r   r   r   *test_base_exception_capture_from_coroutine   s   
rP   c                  C  s(   t jtjddgdd} | jdksJ dS )zL
    Basic smoke test when running via the package __main__ entrypoint.
    -mr/   s   exit())inputr   N)
subprocessrunsys
executable
returncode)replr   r   r   test_main_entrypoint   s   rY   boolc                  C  s0   t jdkrdS td} |  sdS |  dkS )NlinuxFz /proc/sys/dev/tty/legacy_tiocstiT1)rU   platformpathlibPathexists	read_text)sysctlr   r   r   should_try_newline_injection   s   

rc   z"the ioctl we use is disabled in CI)reasonc                  C  s  t jdksJ dd l} |  \}}|dkr$tjt jgt jdddgR   d}|ds8|t|d7 }|dr+t	|
  d}t|d	 |d
sX|t|d7 }|d
rK|ddksaJ t	|
  d}t|tj |d
s|t|d7 }|d
rud|v sJ t	|
  d}t|d t|tj |d
s|t|d7 }|d
rd|v sJ t	|
  t| t|dd  d S )Nwin32r   -urQ   r/       s   import trio
>>> i      print("hello!")
   >>>    hello!      KeyboardInterrupts   print("hello!")   )rU   r]   ptyforkosexeclprV   endswithreadprintdecodewritecountkillsignalSIGINTclosewaitpid)rn   pidpty_fdbufferr   r   r   test_ki_newline_injection   sD   








r   c                    s  t  4 I d H } | tt jtjdddgtjtj	tjtj
dkr$tjnddI d H }|j4 I d H  d}|j2 z3 d H W }||7 }|dd	d
rO nq:6 t|  d}|jdI d H  |j2 z3 d H W }||7 }|drv nqe6 d|v s~J t|  tj
dkrd}|jdI d H  |j2 z3 d H W }||7 }|dr nq6 t|  d}|jdI d H  |j2 z3 d H W }||7 }|dr nq6 t|  tj
dkrtjntj}t|j| tj
dkr|jd	I d H  n	|jd	I d H  d}|j2 z3 d H W }||7 }|dr nq6 d|v s"J t|  |jdI d H  d}d}|j2 z)3 d H W }||7 }|dd	drX|sXt|j| d}|dr` nq86 d|v sjJ d|v sqJ t|  |jdI d H  d}d}|j2 z)3 d H W }||7 }|dd	dr|st|j| d}|dr nq6 d|v sJ d|v sJ t|  W d   I d H  n1 I d H sw   Y  | j  W d   I d H  d S 1 I d H sw   Y  d S )Nrf   rQ   r/   re   r   )stdoutstderrstdincreationflagsrg   s   
   
s   import trio
>>> rh   ri   rj   sI   import ctypes; ctypes.windll.kernel32.SetConsoleCtrlHandler(None, False)
sW   import coverage; trio.lowlevel.enable_ki_protection(coverage.pytracer.PyTracer._trace)
rl   s+   print("READY"); await trio.sleep_forever()
Fs   READY
Ts   trios/   import time; print("READY"); time.sleep(99999)
s	   Traceback)r/   open_nurserystartr   run_processrU   rV   rS   PIPESTDOUTr]   CREATE_NEW_PROCESS_GROUPr   replacerr   rt   ru   r   send_allry   CTRL_C_EVENTrz   rp   rx   r}   cancel_scopecancel)nurseryprocr   partsignal_sentkilledr   r   r   test_ki_in_repl5  s   








*j0r   )r   r   r	   r   )r	   r    )r&   r'   r(   r)   r	   r    )r(   r)   r	   r    )r	   rZ   )
__future__r   rp   r^   ry   rS   rU   	functoolsr   typingr   r"   
trio._replr/   r   r   r%   r:   r<   rB   rH   rI   rN   rO   rP   rY   rc   markskipifr   r   r   r   r   r   <module>   s:    



%







4