U
    Jc$5                     @   s   d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	m
Z
mZmZmZmZmZ d dlmZmZ dddgZG dd deZG dd deZG d	d dZdS )
    N)CallableDictListOptionalSetTuple)TimerClientTimerRequestFileTimerClientFileTimerRequestFileTimerServerc                   @   sR   e Zd ZdZdddddgZdeeeedd	d
dZe	dddZ
edddZdS )r   a@  
    Data object representing a countdown timer acquisition and release
    that is used between the ``FileTimerClient`` and ``FileTimerServer``.
    A negative ``expiration_time`` should be interpreted as a "release"
    request.
    ``signal`` is the signal to reap the worker process from the server
    process.
    version
worker_pidscope_idexpiration_timesignalr   N)r   r   r   r   returnc                 C   s"   d| _ || _|| _|| _|| _d S )N   )r   r   r   r   r   )selfr   r   r   r    r   Z/tmp/pip-unpacked-wheel-gikjz4vx/torch/distributed/elastic/timer/file_based_local_timer.py__init__"   s
    zFileTimerRequest.__init__r   c                 C   sJ   t |trF| j|jkoD| j|jkoD| j|jkoD| j|jkoD| j|jkS dS )NF)
isinstancer   r   r   r   r   r   )r   otherr   r   r   __eq__)   s    




zFileTimerRequest.__eq__c                 C   s    t | j| j| j| j| jdS )N)r   pidr   r   r   )jsondumpsr   r   r   r   r   r   r   r   r   to_json4   s    zFileTimerRequest.to_json)r   )__name__
__module____qualname____doc__	__slots__intstrfloatr   boolr   r    r   r   r   r   r      s
   	c                       s   e Zd ZdZejdkrejnejfe	dd fddZ
eej ddd	Zedd
ddZe	eddddZe	ddddZ  ZS )r
   a  
    Client side of ``FileTimerServer``. This client is meant to be used
    on the same host that the ``FileTimerServer`` is running on and uses
    pid to uniquely identify a worker.
    This client uses a named_pipe to send timer requests to the
    ``FileTimerServer``. This client is a producer while the
    ``FileTimerServer`` is a consumer. Multiple clients can work with
    the same ``FileTimerServer``.

    Args:

        file_path: str, the path of a FIFO special file. ``FileTimerServer``
                        must have created it by calling os.mkfifo().

        signal: singal, the signal to use to kill the process. Using a
                        negative or zero signal will not kill the process.
    win32N)	file_pathr   c                    s   t    || _|| _d S N)superr   
_file_pathr   )r   r+   r   	__class__r   r   r   R   s    
zFileTimerClient.__init__r   c                 C   s@   z$t | jt jt jB }t |dW S  tk
r:   Y d S X d S )Nwt)osopenr.   O_WRONLY
O_NONBLOCKfdopen	Exception)r   fdr   r   r   _open_non_blockingX   s
    z"FileTimerClient._open_non_blocking)requestr   c              	   C   sf   |   }|d krtd|@ | }t|tjkrJtdtj d| ||d  W 5 Q R X d S )NzMCould not send the FileTimerRequest because FileTimerServer is not available.zFileTimerRequest larger than z bytes is not supported: 
)r9   BrokenPipeErrorr    lenselectPIPE_BUFRuntimeErrorwrite)r   r:   filejson_requestr   r   r   _send_request_   s    zFileTimerClient._send_request)r   r   r   c                 C   s"   | j tt ||| jdd d S )Nr   r   r   r   r:   )rD   r   r2   getpidr   )r   r   r   r   r   r   acquireq   s    zFileTimerClient.acquire)r   r   c                 C   s    | j tt |dddd d S )Nr   rE   rF   )rD   r   r2   rG   )r   r   r   r   r   release{   s    zFileTimerClient.release)r!   r"   r#   r$   sysplatformr   SIGKILLZCTRL_C_EVENTr'   r   r   ioTextIOWrapperr9   r   rD   r(   rH   rJ   __classcell__r   r   r/   r   r
   @   s   
c                   @   s  e Zd ZdZd%eeeeeee	 gdf ddddZ
ddd	d
ZddddZddddZddddZejddddZee	 ee dddZejeee	 dddZee	 ddddZee ddddZeeeee	 f dd d!Zeeed"d#d$ZdS )&r   a  
    Server that works with ``FileTimerClient``. Clients are expected to be
    running on the same host as the process that is running this server.
    Each host in the job is expected to start its own timer server locally
    and each server instance manages timers for local workers (running on
    processes on the same host).

    Args:

        file_path: str, the path of a FIFO special file to be created.

        max_interval: float, max interval in seconds for each watchdog loop.

        daemon: bool, running the watchdog thread in daemon mode or not.
                      A daemon thread will not block a process to stop.
        log_event: Callable[[Dict[str, str]], None], an optional callback for
                logging the events in JSON format.
    
   TN)r+   max_intervaldaemon	log_eventr   c                 C   sp   || _ || _|| _i | _d| _d | _tj| j r>t	| j  t
| j  d| _d| _|d k	rb|ndd | _d S )NFr   c                 S   s   d S r,   r   )namer:   r   r   r   <lambda>       z*FileTimerServer.__init__.<locals>.<lambda>)r.   _max_interval_daemon_timers_stop_signaled_watchdog_threadr2   pathexistsremovemkfifo_request_count	_run_once
_log_event)r   r+   rR   rS   rT   r   r   r   r      s    zFileTimerServer.__init__r   c                 C   s^   t dt| j d| j d| j  tj| j| jd| _	t d | j	
  | dd  d S )Nz	Starting z... max_interval=z	, daemon=)targetrS   zStarting watchdog thread...zwatchdog started)logginginfotyper!   rX   rY   	threadingThread_watchdog_loopr\   startrc   r   r   r   r   rk      s    

zFileTimerServer.startc                 C   sv   t dt| j  d| _| jrBt d | j| j d | _n
t d tj	
| jrft| j | dd  d S )Nz	Stopping TStopping watchdog thread...)No watchdog thread running, doing nothingzwatchdog stopped)re   rf   rg   r!   r[   r\   joinrX   r2   r]   r^   r.   r_   rc   r   r   r   r   stop   s    

zFileTimerServer.stopc                 C   sP   d| _ | jr(td | j  d | _n
td tj| jrLt	| j d S )NTrl   rm   )
rb   r\   re   rf   rn   r2   r]   r^   r.   r_   r   r   r   r   run_once   s    


zFileTimerServer.run_oncec                 C   sr   t | jd\}| jsdz| j}| | |r.W qdW q tk
r` } ztjd|d W 5 d }~X Y qX qW 5 Q R X d S )NrtzError running watchdogexc_info)r3   r.   r[   rb   _run_watchdogr7   re   error)r   r8   rp   er   r   r   rj      s    
zFileTimerServer._watchdog_loop)r8   r   c           
   	   C   s
  |  || j}| | t }t }| | D ]\}}td| d| 	|  |
| |jdd d d}d }|D ]}	|	jdkr~|	j}|	} qq~|dkrtd| d q4| ||rtd	| d
|  | d| q4td| d q4| | d S )NzReaping worker_pid=[z]. Expired timers: c                 S   s   | j S r,   )r   )timerr   r   r   rV      rW   z/FileTimerServer._run_watchdog.<locals>.<lambda>)keyr   z!No signal specified with worker=[z]. Do not reap it.zSuccessfully reaped worker=[z] with signal=zkill worker processzError reaping worker=[z]. Will retry on next watchdog.)_get_requestsrX   register_timerstimesetget_expired_timersitemsre   rf   _get_scopesaddsortr   _reap_workerrc   ru   clear_timers)
r   r8   timer_requestsnowZreaped_worker_pidsr   expired_timersr   Zexpired_timerrw   r   r   r   rt      s.    


zFileTimerServer._run_watchdog)r   r   c                 C   s   dd |D S )Nc                 S   s   g | ]
}|j qS r   )r   ).0rr   r   r   
<listcomp>  s     z/FileTimerServer._get_scopes.<locals>.<listcomp>r   )r   r   r   r   r   r      s    zFileTimerServer._get_scopes)r8   rR   r   c                 C   s   t   }g }| jr| jr| }t|dkrF| jr4qt t|d n@t|}|d }|d }|d }	|d }
|	t
|||	|
d t   }|| |krqq|S )Nr   r   r   r   r   r   rE   )r{   r[   rb   readliner=   sleepminr   loadsappendr   )r   r8   rR   rk   requestsrC   r:   r   r   r   r   r   r   r   r   ry     s2    
   zFileTimerServer._get_requestsc                 C   st   |D ]j}|j }|j}|j}|  jd7  _||f}|dk rX|| jkrn| j|= | d| q|| j|< | d| qd S )Nr   r   clear timerz	set timer)r   r   r   ra   rZ   rc   )r   r   r:   r   r   r   rx   r   r   r   rz   #  s    

zFileTimerServer.register_timers)worker_pidsr   c                 C   sF   t | j D ]2\}}||kr| d| j||f  | j||f= qd S )Nr   )listrZ   keysrc   )r   r   r   r   r   r   r   r   4  s    zFileTimerServer.clear_timers)deadliner   c                 C   s:   i }| j  D ]&}|j|kr||jg }|| q|S r,   )rZ   valuesr   
setdefaultr   r   )r   r   r   r:   Zexpired_scopesr   r   r   r}   :  s    
z"FileTimerServer.get_expired_timers)r   r   r   c              
   C   st   zt || W dS  tk
r:   td| d Y dS  tk
rn } ztjd| |d W 5 d }~X Y nX dS )NTzProcess with pid=z does not exist. SkippingzError terminating pid=rr   F)r2   killProcessLookupErrorre   rf   r7   ru   )r   r   r   rv   r   r   r   r   C  s    &zFileTimerServer._reap_worker)rQ   TN)r!   r"   r#   r$   r'   r(   r)   r   r   r   r   rk   ro   rp   rj   rN   rO   rt   r   r   ry   rz   r   r&   r   r   r}   r   r   r   r   r   r      s,       	)rN   r   re   r2   r>   r   rK   rh   r{   typingr   r   r   r   r   r   Z#torch.distributed.elastic.timer.apir   r	   __all__r   r
   r   r   r   r   r   <module>   s    
*F