o
    "ch;                  !   @   s  d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	m
Z
 d dlmZ d dlmZ d dlmZ d dlmZ d dlmZmZmZmZmZmZmZmZmZmZ d dlZdd	lm Z  dd
l!m"Z"m#Z#m$Z$m%Z% erxd dlm&Z& dZ'e(dZ)dddde  d dddddddddeee*f dee*edef f deedf deee*ef  dddeeee# gdf  deee"e*ge+f  de,d e-d!e-d"ee+ d#e-d$e-d%e+d&e+d'e-f d(d)Z.dddde  d dddddd*deee*f dee*edef f deedf deee*ef  dddeeee# gef  deee"e*ge+f  de,d e-d!e-d"ee+ d%e+d&e+d'e-fd+d,Z/ed-Z0d.e*d'ee* fd/d0Z1	dKdee*edef f dd1deedf deee*ef  d2eee#  d'd3fd4d5Z2dee*edef f d'd1fd6d7Z3G d8d3 d3Z4d9e*d:ee* deedf dee*ef d'df
d;d<Z5d=e*d'efd>d?Z6d'ee* fd@dAZ7e j8d:ee* d'edB fdCdDZ9dEe-dFed'dfdGdHZ:dLdIdJZ;dS )M    N)import_module)get_context)SpawnProcess)Path)sleep)
TYPE_CHECKINGAnyCallableDict	GeneratorListOptionalSetTupleUnion   )DefaultFilter)Change
FileChangeawatchwatch)Literal)run_processarun_processdetect_target_typeimport_stringzwatchfiles.main autoi@  2      TF)argskwargstarget_typecallbackwatch_filtergrace_perioddebouncestepdebugsigint_timeoutsigkill_timeout	recursiveignore_permission_deniedpathstarget.r    r!   r"   z&Literal['function', 'command', 'auto']r#   r$   r%   r&   r'   r(   r)   r*   r+   r,   returnc              
   G   s   |dkrt | }td| | t  t| |||}d}|r'td| t| z0t|||||	d||dD ]}|o<|| |j|
|d t| ||||}|d7 }q5W |  |S |  w )	u  
    Run a process and restart it upon file changes.

    `run_process` can work in two ways:

    * Using `multiprocessing.Process` † to run a python function
    * Or, using `subprocess.Popen` to run a command

    !!! note

        **†** technically `multiprocessing.get_context('spawn').Process` to avoid forking and improve
        code reload/import.

    Internally, `run_process` uses [`watch`][watchfiles.watch] with `raise_interrupt=False` so the function
    exits cleanly upon `Ctrl+C`.

    Args:
        *paths: matches the same argument of [`watch`][watchfiles.watch]
        target: function or command to run
        args: arguments to pass to `target`, only used if `target` is a function
        kwargs: keyword arguments to pass to `target`, only used if `target` is a function
        target_type: type of target. Can be `'function'`, `'command'`, or `'auto'` in which case
            [`detect_target_type`][watchfiles.run.detect_target_type] is used to determine the type.
        callback: function to call on each reload, the function should accept a set of changes as the sole argument
        watch_filter: matches the same argument of [`watch`][watchfiles.watch]
        grace_period: number of seconds after the process is started before watching for changes
        debounce: matches the same argument of [`watch`][watchfiles.watch]
        step: matches the same argument of [`watch`][watchfiles.watch]
        debug: matches the same argument of [`watch`][watchfiles.watch]
        sigint_timeout: the number of seconds to wait after sending sigint before sending sigkill
        sigkill_timeout: the number of seconds to wait after sending sigkill before raising an exception
        recursive: matches the same argument of [`watch`][watchfiles.watch]

    Returns:
        number of times the function was reloaded.

    ```py title="Example of run_process running a function"
    from watchfiles import run_process

    def callback(changes):
        print('changes detected:', changes)

    def foobar(a, b):
        print('foobar called with:', a, b)

    if __name__ == '__main__':
        run_process('./path/to/dir', target=foobar, args=(1, 2), callback=callback)
    ```

    As well as using a `callback` function, changes can be accessed from within the target function,
    using the `WATCHFILES_CHANGES` environment variable.

    ```py title="Example of run_process accessing changes"
    from watchfiles import run_process

    def foobar(a, b, c):
        # changes will be an empty list "[]" the first time the function is called
        changes = os.getenv('WATCHFILES_CHANGES')
        changes = json.loads(changes)
        print('foobar called due to changes:', changes)

    if __name__ == '__main__':
        run_process('./path/to/dir', target=foobar, args=(1, 2, 3))
    ```

    Again with the target as `command`, `WATCHFILES_CHANGES` can be used
    to access changes.

    ```bash title="example.sh"
    echo "changers: ${WATCHFILES_CHANGES}"
    ```

    ```py title="Example of run_process running a command"
    from watchfiles import run_process

    if __name__ == '__main__':
        run_process('.', target='./example.sh')
    ```
    r   running "%s" as %sr   3sleeping for %s seconds before watching for changesF)r$   r&   r'   r(   raise_interruptr+   r,   )r)   r*   r   )r   loggerr(   catch_sigtermstart_processr   r   stop)r.   r    r!   r"   r#   r$   r%   r&   r'   r(   r)   r*   r+   r,   r-   processreloadschangesr   r   L/home/air/goalskill/back/venv/lib/python3.10/site-packages/watchfiles/run.pyr      s:   `



r   )r    r!   r"   r#   r$   r%   r&   r'   r(   r+   r,   c              	      s  ddl }|dkrt| }td| | t  tjt| |||I dH }d}|r6td| t	|I dH  t
|||||	|
|d2 z43 dH W }|durZ||}||rZ|I dH  tj|jI dH  tjt| ||||I dH }|d7 }qB6 tj|jI dH  |S )a  
    Async equivalent of [`run_process`][watchfiles.run_process], all arguments match those of `run_process` except
    `callback` which can be a coroutine.

    Starting and stopping the process and watching for changes is done in a separate thread.

    As with `run_process`, internally `arun_process` uses [`awatch`][watchfiles.awatch], however `KeyboardInterrupt`
    cannot be caught and suppressed in `awatch` so these errors need to be caught separately, see below.

    ```py title="Example of arun_process usage"
    import asyncio
    from watchfiles import arun_process

    async def callback(changes):
        await asyncio.sleep(0.1)
        print('changes detected:', changes)

    def foobar(a, b):
        print('foobar called with:', a, b)

    async def main():
        await arun_process('.', target=foobar, args=(1, 2), callback=callback)

    if __name__ == '__main__':
        try:
            asyncio.run(main())
        except KeyboardInterrupt:
            print('stopped via KeyboardInterrupt')
    ```
    r   Nr   r0   r1   )r$   r&   r'   r(   r+   r,   r   )inspectr   r3   r(   r4   anyio	to_threadrun_syncr5   r   r   isawaitabler6   )r.   r    r!   r"   r#   r$   r%   r&   r'   r(   r+   r,   r-   r;   r7   r8   r9   rr   r   r:   r      s<   -	


r   spawncmdc                 C   s(   dd l }| j dk}tj| |dS )Nr   windows)posix)platformunamesystemlowershlexsplit)rB   rE   rD   r   r   r:   	split_cmd   s   rK   zLiteral['function', 'command']r9   CombinedProcessc           	      C   s   |d u rd}n
t dd |D }|tjd< |dkrA|pi }t| tr/| t ||f}t}i }n| }tj	|||d}|
  t|S |sE|rJtd t| tsSJ dt| }t|}t|S )	Nz[]c                 S   s   g | ]
\}}|  |gqS r   )raw_str).0cpr   r   r:   
<listcomp>  s    z!start_process.<locals>.<listcomp>WATCHFILES_CHANGESfunction)r.   r    r!   z-ignoring args and kwargs for "command" targetz+target must be a string to run as a command)jsondumpsosenviron
isinstancestrget_tty_pathrun_functionspawn_contextProcessstartr3   warningrK   
subprocessPopenrL   )	r.   r"   r    r!   r9   changes_env_vartarget_r7   
popen_argsr   r   r:   r5      s(   



r5   c                 C   s0   t | tsdS | drdS td| rdS dS )a^  
    Used by [`run_process`][watchfiles.run_process], [`arun_process`][watchfiles.arun_process]
    and indirectly the CLI to determine the target type with `target_type` is `auto`.

    Detects the target type - either `function` or `command`. This method is only called with `target_type='auto'`.

    The following logic is employed:

    * If `target` is not a string, it is assumed to be a function
    * If `target` ends with `.py` or `.sh`, it is assumed to be a command
    * Otherwise, the target is assumed to be a function if it matches the regex `[a-zA-Z0-9_]+(\.[a-zA-Z0-9_]+)+`

    If this logic does not work for you, specify the target type explicitly using the `target_type` function argument
    or `--target-type` command line argument.

    Args:
        target: The target value

    Returns:
        either `'function'` or `'command'`
    rS   )z.pyz.shcommandz[a-zA-Z0-9_]+(\.[a-zA-Z0-9_]+)+)rX   rY   endswithre	fullmatch)r.   r   r   r:   r     s   

r   c                   @   sv   e Zd ZdddZddeded	d
fddZd	efddZed	efddZ	ded	d
fddZ
ed	ee fddZd
S )rL   rP   ,Union[SpawnProcess, subprocess.Popen[bytes]]c                 C   s   || _ | jd usJ dd S )Nzprocess not yet spawned_ppid)selfrP   r   r   r:   __init__>  s   zCombinedProcess.__init__r   r   r)   r*   r/   Nc                 C   s   t jdd  |  rPtd t | jtj	 z| 
| W n tjy/   td| Y nw | jd u rItd t | jtj | 
| d S td d S td| j d S )NrR   zstopping process...z!SIGINT timed out after %r secondsz+process has not terminated, sending SIGKILLzprocess stoppedz#process already dead, exit code: %d)rV   rW   popis_aliver3   r(   killrl   signalSIGINTjoinr`   TimeoutExpiredr_   exitcodeSIGKILL)rm   r)   r*   r   r   r:   r6   B  s    


zCombinedProcess.stopc                 C   s$   t | jtr| j S | j d u S N)rX   rk   r   rp   pollrm   r   r   r:   rp   Z  s   
zCombinedProcess.is_alivec                 C   s   | j jS rx   rj   rz   r   r   r:   rl   `  s   zCombinedProcess.pidtimeoutc                 C   s,   t | jtr| j| d S | j| d S rx   )rX   rk   r   rt   wait)rm   r{   r   r   r:   rt   e  s   zCombinedProcess.joinc                 C   s   t | jtr
| jjS | jjS rx   )rX   rk   r   rv   
returncoderz   r   r   r:   rv   k  s   zCombinedProcess.exitcode)rP   ri   )r   r   )__name__
__module____qualname__rn   intr6   boolrp   propertyrl   rt   r   rv   r   r   r   r:   rL   =  s    
rS   tty_pathc                 C   sD   t | t| }||i | W d    d S 1 sw   Y  d S rx   )set_ttyr   )rS   r   r    r!   funcr   r   r:   r[   s  s   
"r[   dotted_pathc              
   C   s   z|  ddd\}}W n ty" } z	td|  d|d}~ww t|}zt||W S  tyE } ztd| d| d	|d}~ww )
z
    Stolen approximately from django. Import a dotted module path and return the attribute/class designated by the
    last name in the path. Raise ImportError if the import fails.
     .r   "z!" doesn't look like a module pathNzModule "z" does not define a "z" attribute)striprsplit
ValueErrorImportErrorr   getattrAttributeError)r   module_path
class_nameemoduler   r   r:   r   y  s   r   c                   C   s:   z	t tj W S  ty   Y dS  ty   Y dS w )zr
    Return the path to the current TTY, if any.

    Virtually impossible to test in pytest, hence no cover.
    z/dev/ttyN)rV   ttynamesysstdinfilenoOSErrorr   r   r   r   r:   rZ     s   rZ   )NNNc                 c   sj    | r0zt | }|t_d V  W d    W d S 1 sw   Y  W d S  ty/   d V  Y d S w d V  d S rx   )openr   r   r   )r   ttyr   r   r:   r     s   
&
r   signum_framec                 C   s   t dt|  t)Nz-received signal %s, raising KeyboardInterrupt)r3   r_   rr   SignalsKeyboardInterrupt)r   r   r   r   r:   raise_keyboard_interrupt  s   r   c                   C   s"   t dt  ttjt dS )a  
    Catch SIGTERM and raise KeyboardInterrupt instead. This means watchfiles will stop quickly
    on `docker compose stop` and other cases where SIGTERM is sent.

    Without this the watchfiles process will be killed while a running process will continue uninterrupted.
    z8registering handler for SIGTERM on watchfiles process %dN)r3   r(   rV   getpidrr   SIGTERMr   r   r   r   r:   r4     s   r4   rx   )r/   N)<
contextlibrT   loggingrV   rg   rI   rr   r`   r   	importlibr   multiprocessingr   multiprocessing.contextr   pathlibr   timer   typingr   r   r	   r
   r   r   r   r   r   r   r<   filtersr   mainr   r   r   r   r   __all__	getLoggerr3   rY   r   floatr   r   r   r\   rK   r5   r   rL   r[   r   rZ   contextmanagerr   r   r4   r   r   r   r:   <module>   s   0


	

 

	

R


"$ 26