U
    Jci4                     @   s   d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlmZm	Z	m
Z
mZ d dlm  m  mZ d dlmZ d dlmZmZmZmZmZ d dlmZ d dlmZ d dlmZmZ d dlm Z  d d	l!m"Z" e" Z#d
ddgZ$dZ%dZ&G dd
 d
eZ'dS )    N)AnyDictOptionalTuple)events)	RunResultSimpleElasticAgentWorkerGroup
WorkerSpecWorkerState)EventMetadataValue)prof)PContextstart_processes)macros)
get_loggerLocalElasticAgentTORCHELASTIC_ENABLE_FILE_TIMERTORCHELASTIC_TIMER_FILEc                       s   e Zd ZdZdeeee d fddZee edd	d
Z	e
ee
eef f ddddZedddZeeej ddddZeeddddZeee
eef d fddZejfejddddZeeedddZ  ZS )r   aQ  
    An implementation of :py:class:`torchelastic.agent.server.ElasticAgent`
    that handles host-local workers.
    This agent is deployed per host and is configured to spawn ``n`` workers.
    When using GPUs, ``n`` maps to the number of GPUs available on the host.

    The local agent does not communicate to other local agents deployed on
    other hosts, even if the workers may communicate inter-host. The worker id
    is interpreted to be a local process. The agent starts and stops all worker
    processes as a single unit.


    The worker function and argument passed to the worker function must be
    python multiprocessing compatible. To pass multiprocessing data structures
    to the workers you may create the data structure in the same multiprocessing
    context as the specified ``start_method`` and pass it as a function argument.

    The ``exit_barrier_timeout`` specifies the amount of time (in seconds) to wait
    for other agents to finish. This acts as a safety net to handle cases where
    workers finish at different times, to prevent agents from viewing workers
    that finished early as a scale-down event. It is strongly advised that the
    user code deal with ensuring that workers are terminated in a synchronous
    manner rather than relying on the exit_barrier_timeout.

    A named pipe based watchdog can be enabled in ```LocalElasticAgent``` if an
    environment variable ``TORCHELASTIC_ENABLE_FILE_TIMER`` with value 1 has
    been defined in the ```LocalElasticAgent``` process.
    Optionally, another environment variable ```TORCHELASTIC_TIMER_FILE```
    can be set with a unique file name for the named pipe. If the environment
    variable ```TORCHELASTIC_TIMER_FILE``` is not set, ```LocalElasticAgent```
    will internally create a unique file name and set it to the environment
    variable ```TORCHELASTIC_TIMER_FILE```, and this environment variable will
    be propagated to the worker processes to allow them to connect to the same
    named pipe that ```LocalElasticAgent``` uses.

    Example launching function

    ::

        def trainer(args) -> str:
            return "do train"

        def main():
            start_method="spawn"
            shared_queue= multiprocessing.get_context(start_method).Queue()
            spec = WorkerSpec(
                        role="trainer",
                        local_world_size=nproc_per_process,
                        entrypoint=trainer,
                        args=("foobar",),
                        ...<OTHER_PARAMS...>)
            agent = LocalElasticAgent(spec, start_method)
            results = agent.run()

            if results.is_failed():
                print("trainer failed")
            else:
                print(f"rank 0 return value: {results.return_values[0]}")
                # prints -> rank 0 return value: do train

    Example launching binary

    ::

        def main():
            spec = WorkerSpec(
                        role="trainer",
                        local_world_size=nproc_per_process,
                        entrypoint="/usr/local/bin/trainer",
                        args=("--trainer_args", "foobar"),
                        ...<OTHER_PARAMS...>)
            agent = LocalElasticAgent(spec)
            results = agent.run()

            if not results.is_failed():
                print("binary launches do not have return values")

    spawn,  N)specexit_barrier_timeoutlog_dirc                    s<   t  || || _d | _|j }| ||| _d | _d S N)	super__init___start_method	_pcontextrdzv_handler
get_run_id_make_log_dir_log_dir_worker_watchdog)selfr   start_methodr   r   rdzv_run_id	__class__ ^/tmp/pip-unpacked-wheel-gikjz4vx/torch/distributed/elastic/agent/server/local_elastic_agent.pyr   ~   s    
zLocalElasticAgent.__init__)r   r&   c                 C   sF   |pt jdd}tj|dd t j| d|d}td|  |S )NZtorchelastic_)prefixT)exist_ok_)r+   dirzlog directory set to: )tempfilemkdtemposmakedirsloginfo)r$   r   r&   Zbase_log_dirr.   r)   r)   r*   r!      s
    zLocalElasticAgent._make_log_dir)envsreturnc                 C   s   t }t|}t}t|}|d k	rt|dkr|d krHdtt  }td| d t	j
|dd| jd| _| j  td ntd	| d
 |d k	r| D ]\}}|||< qd S )N1z/tmp/watchdog_timer_z Starting a FileTimerServer with z ...g?T)	file_pathZmax_intervaldaemonZ	log_eventzFileTimerServer startedzEnvironment variable 'z*' not found. Do not start FileTimerServer.)r   r1   getenvr   struuidZuuid4r3   r4   timerZFileTimerServer_log_watchdog_eventr#   startitems)r$   r5   Zenable_watchdog_env_nameZwatchdog_enabledZwatchdog_file_env_nameZwatchdog_file_pathr-   
worker_envr)   r)   r*   _setup_local_watchdog   s(    


z'LocalElasticAgent._setup_local_watchdog)r6   c                 C   s   t t  S r   )socketgetfqdngethostname)r$   r)   r)   r*   _get_fq_hostname   s    z"LocalElasticAgent._get_fq_hostname)namerequestr6   c           
      C   s   | j }|j}d|i}|d k	rPt|j|d< |j|d< t|j|d< t|j|d< t|}d}|j	
 d |jd |j|  || j|j	 d ||j| j d}tj|tjj|d}	t|	 d S )	NZwatchdog_event
worker_pidscope_idexpiration_timesignalZRUNNING)Zrun_idglobal_rank
group_rankZ	worker_idrolehostnamestateZtotal_run_timeZrdzv_backendZ	raw_errormetadataZagent_restarts)rG   sourcerR   )Z_worker_groupr   r;   rI   rJ   rK   rL   jsondumpsr   r    rN   rO   rF   Z_total_execution_timeget_backendmax_restarts_remaining_restartsr   EventZEventSourceZAGENTrecord)
r$   rG   rH   Zwgr   ZmdZmd_strrQ   rR   eventr)   r)   r*   r>      s>     


  z%LocalElasticAgent._log_watchdog_event)worker_groupr6   c                 C   s   |    d S r   )	_shutdown)r$   r\   r)   r)   r*   _stop_workers   s    zLocalElasticAgent._stop_workersc                    s  |j }|j}|d k	stt |\}}|j| j }|j dk}i }i }	|j	D ]}
|
j
}t|t|
jt|jt|
j|jt|jt|
jt|jt|
j|t|t|t|j|j t|tdtdd}dtjkrtjd |d< ||	|< t|j}t|t|}t|||< qPtj| jd| }t j!|dd t"| | j#|	d	 |j$d k	sftt%|j|j$||	|| j&|j'|j(d
| _)| j)* S )NZstaticNCCL_ASYNC_ERROR_HANDLING   )Z
LOCAL_RANKZRANKZ
GROUP_RANKZ	ROLE_RANKZ	ROLE_NAMEZLOCAL_WORLD_SIZEZ
WORLD_SIZEZGROUP_WORLD_SIZEZROLE_WORLD_SIZEZMASTER_ADDRZMASTER_PORTZTORCHELASTIC_RESTART_COUNTZTORCHELASTIC_MAX_RESTARTSZTORCHELASTIC_RUN_IDZTORCHELASTIC_USE_AGENT_STOREr_   ZOMP_NUM_THREADSZattempt_T)ignore_errors)r5   )rG   
entrypointargsr5   r   r%   	redirectstee)+r   storeAssertionErrorr   Z_get_master_addr_portrW   rX   r   rV   workers
local_rankr;   rM   rN   Z	role_rankrO   Zlocal_world_sizeZ
world_sizeZgroup_world_sizeZrole_world_sizer    r1   r:   environlistrc   r   
substitutetuplepathjoinr"   shutilrmtreer2   rB   rb   r   r   rd   re   r   pids)r$   r\   r   rf   Zmaster_addrZmaster_portZrestart_countZuse_agent_storerc   r5   workerri   rA   Zworker_argsZattempt_log_dirr'   r)   r*   _start_workers   sh    
 


z LocalElasticAgent._start_workers)	death_sigr6   c                 C   s0   | j d k	r| j   d | _ | jr,| j| d S r   )r#   stopr   close)r$   ru   r)   r)   r*   r]     s
    

zLocalElasticAgent._shutdownc                 C   s  |j j}dd |jD }| jd k	s&tt| j  }||krht	d| d| d|  t
tjdS | jd}|r| ri }|j D ]\}}|j| }	|||	j< qt
tj|dS i }
|j D ]\}}|j| }	||
|	j< qt
tj|
d	S nt
tjdS d S )
Nc                 S   s   h | ]
}|j qS r)   )id).0wr)   r)   r*   	<setcomp>(  s     z5LocalElasticAgent._monitor_workers.<locals>.<setcomp>[z;] worker pids do not match process_context pids. Expected: z
, actual: )rQ   r   )rQ   failures)rQ   return_values)r   rO   rh   r   rg   setrr   valuesr3   errorr   r   UNKNOWNwaitZ	is_failedr}   r@   rM   ZFAILEDr~   Z	SUCCEEDEDZHEALTHY)r$   r\   rO   Zworker_pidsZpc_pidsresultZworker_failuresri   Zfailurers   Zworkers_ret_valsZret_valr)   r)   r*   _monitor_workers%  s:    

z"LocalElasticAgent._monitor_workers)r   r   N)__name__
__module____qualname____doc__r
   floatr   r;   r   r!   r   intrB   rF   r=   ZFileTimerRequestr>   r   r	   r^   r   rt   rL   SIGTERMSignalsr]   r   r   __classcell__r)   r)   r'   r*   r   .   s.   R    (?	)(rT   r1   rp   rL   rC   r/   r<   typingr   r   r   r   Ztorch.distributed.elastic.timerZdistributedZelasticr=   Ztorch.distributed.elasticr   Z*torch.distributed.elastic.agent.server.apir   r   r	   r
   r   Z$torch.distributed.elastic.events.apir   Z%torch.distributed.elastic.metrics.apir   Z)torch.distributed.elastic.multiprocessingr   r   Ztorch.distributed.elastic.utilsr   Z'torch.distributed.elastic.utils.loggingr   r3   __all__r   r   r   r)   r)   r)   r*   <module>
   s.   