U
    <cw                     @   sD  U d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
Z
d dlZd dlmZ d dlmZ d dlmZ d dlmZ d dlmZmZmZ d dlmZ d dlmZmZmZ d dlZd dlZd dl m!Z" d d	l#m$Z$m%Z%m&Z&m'Z'm(Z(m)Z)m*Z*m+Z+m,Z, ej-ej.d
 e/e0Z1G dd deZ2e2dde2dde2dde2dde2dde2dde2dde2dde2dde2dd e2d!d"e2d#d$e2d%d&e2d'd(e2d)d*e2d+d,d-Z3eG d.d/ d/Z4d0d1 Z5d2d3 Z6d4d5 Z7d6d7 Z8d8d9 Z9d:d; Z:d<d= Z;d>d? Z<d@dA Z=dBdC Z>dDdE Z?dFdG Z@dHdI ZAdJdK ZBdLdM ZCe)dNdOdPedQdRdPdSfdTdUZDe&rrdVZEneFeGdWdXZEdYdZiZHe%rd[eHd\< dvd]d^ZIeFd_d`daZJedbdc ZKdweFeFeFdddedfZLeFeMdgdhdiZNdaOeejP eQdj< dxeeM ddkdldmZRdd_dndoZSG dpdq dqe$ZTdaUeVd_drdsZWdtdu ZXdS )y    N)contextmanager)	dataclass)	timedelta)Enum)partialreducewraps)StringIO)
NamedTupleOptionalUnion)	TestCaseTEST_WITH_ROCMTEST_WITH_TSANFILE_SCHEMAfind_free_portretry_on_connect_failuresIS_SANDCASTLEsandcastle_skip_ifsandcastle_skip)levelc                   @   s   e Zd ZU eed< eed< dS )TestSkip	exit_codemessageN)__name__
__module____qualname__int__annotations__str r    r    N/tmp/pip-unpacked-wheel-gikjz4vx/torch/testing/_internal/common_distributed.pyr   ,   s   
r   H   z5Skipped because distributed backend is not available.I   z Skipped due to small world size.W   zSkipped due to odd world size.J   zCUDA is not available.K   zNeed at least 1 CUDA deviceM   zNeed at least 2 CUDA devicesP   zNeed at least 3 CUDA devicesQ   zNeed at least 4 CUDA devicesR   zNeed at least 5 CUDA devicesS   zNeed at least 6 CUDA devicesT   zNeed at least 7 CUDA devicesU   zNeed at least 8 CUDA devicesL   z#c10d not compiled with NCCL supportN   zTest skipped for ROCmO   z'Test skipped because no GPU peer accessV   zHTest skipped at subprocess level, look at subprocess log for skip reason)Zbackend_unavailablesmall_worldsizeodd_worldsizeno_cudazmulti-gpu-1zmulti-gpu-2zmulti-gpu-3zmulti-gpu-4zmulti-gpu-5zmulti-gpu-6zmulti-gpu-7zmulti-gpu-8nccl
skipIfRocmZno_peer_accessgenericc                   @   s   e Zd Zi Zdddhed< e ed< ddhed< ddhed< i Zddhed	< dddhed
< dddhed< dddhed< e ed< dS )DistTestCasesr5   mpiZuccZallgather_coalescedr   zsendrecv anysourcezcpu barrierZglooZgpucudaZddpZsubgroupZpluginN)r   r   r   Zskip_collectivesetZbackend_featurer    r    r    r!   r8   H   s   
r8   c                    s   t   fdd}|S )zSkips if the world size exceeds the number of GPUs, ensuring that if the
    test is run, each rank has its own GPU via ``torch.cuda.device(rank)``.c                     sV   t j sttd j ttj	d }t j
 |k rLttd|  j  | |S )Nr4   
WORLD_SIZE
multi-gpu-)torchr:   is_availablesysexit
TEST_SKIPSr   r   osenvirondevice_count)argskwargs
world_sizefuncr    r!   wrapper]   s    
zskip_if_no_gpu.<locals>.wrapperr   rJ   rK   r    rI   r!   skip_if_no_gpuZ   s    	rN   c                    s   t   fdd}|S )Nc                     s:   t jd dkr0tt jd dkr0ttd j  | |S )NBACKENDr9   r<      r2   rC   rD   r   r@   rA   rB   r   rF   rG   rI   r    r!   rK   k   s     z(skip_if_small_worldsize.<locals>.wrapperrL   rM   r    rI   r!   skip_if_small_worldsizej   s    rS   c                    s   t   fdd}|S )Nc                     s>   t jd dkr4tt jd d dkr4ttd j  | |S )NrO   r9   r<   rP      r3   rQ   rR   rI   r    r!   rK   u   s    $z&skip_if_odd_worldsize.<locals>.wrapperrL   rM   r    rI   r!   skip_if_odd_worldsizet   s    rU   c                    s    fdd}|S )Nc                    s   t   fdd}|S )Nc                     s<    dkr.t j k r.ttd  j n
| |S d S Nr5   r=   )r>   r:   rE   r@   rA   rB   r   rR   )backendrJ   nr    r!   rK      s    zCrequire_n_gpus_for_nccl_backend.<locals>.decorator.<locals>.wrapperrL   rM   rW   rX   rI   r!   	decorator   s    z2require_n_gpus_for_nccl_backend.<locals>.decoratorr    )rX   rW   rZ   r    rY   r!   require_n_gpus_for_nccl_backend~   s    
r[   c                    s    fdd}|S )Nc                    s   t   fdd}|S )Nc                     s<   t j r"t j kr" | |S ttd  j d S )Nr=   r>   r:   r?   rE   r@   rA   rB   r   rR   )rJ   xr    r!   rK      s    
z4skip_if_lt_x_gpu.<locals>.decorator.<locals>.wrapperrL   rM   r]   rI   r!   rZ      s    z#skip_if_lt_x_gpu.<locals>.decoratorr    )r]   rZ   r    r^   r!   skip_if_lt_x_gpu   s    	r_   c                    s    fdd}|S )Nc                    s   t   fdd}|S )Nc                     sN    dkr| |S t j r4t j kr4| |S ttd  j d S rV   r\   rR   )rW   rJ   r]   r    r!   rK      s
    

z9nccl_skip_if_lt_x_gpu.<locals>.decorator.<locals>.wrapperrL   rM   rW   r]   rI   r!   rZ      s    z(nccl_skip_if_lt_x_gpu.<locals>.decoratorr    )rW   r]   rZ   r    r`   r!   nccl_skip_if_lt_x_gpu   s    ra   c                 C   st   |   }d|kstd|ks td|ks,t|d }|ddkrF|n|dd }||ksptd| d| d S )	N	iterationZ	has_errorerrorz
Exception raised from r   zDid not find expected z in ddp logging data error: )Z_get_ddp_logging_dataAssertionErrorfindsplit)Z	model_DDPZ
err_substrZddp_logging_dataZlogging_erractualr    r    r!   verify_ddp_error_logged   s    ri   c                    s   t   fdd}|S )a2  
    Convenience decorator to set/unset NCCL_BLOCKING_WAIT flag. Note that use of
    this decorator will override the setting of NCCL_ASYNC_ERROR_HANDLING for
    the particular test. After the test, both NCCL_BLOCKING_WAIT and
    NCCL_ASYNC_ERROR_HANDLING will be restored to their original values.
    c               	      s   zt jd }t jd= W n tk
r.   d }Y nX z,zt jd }W n tk
rX   d }Y nX W 5 dt jd< X z | |}|W S |d k	r|t jd< |d k	r|t jd< X d S )NZNCCL_ASYNC_ERROR_HANDLING1ZNCCL_BLOCKING_WAIT)rC   rD   KeyError)rF   rG   Z cached_nccl_async_error_handlingZcached_nccl_blocking_waitretrI   r    r!   rK      s0    

z(with_nccl_blocking_wait.<locals>.wrapperrL   rM   r    rI   r!   with_nccl_blocking_wait   s    "rm   c                    s    fdd}|S )zK
    Runs a test for each distributed debug level specified in levels.
    c                    s   t   fdd}|S )Nc                     sR   t jdd }D ]:}|t jd< t   | |}t  |d k	r|t jd< q|S )NZTORCH_DISTRIBUTED_DEBUG)rC   rD   getc10dZset_debug_level_from_envbarrier)rF   rG   Z	old_levelr   rl   )rJ   levelsr    r!   rK      s    

z:with_dist_debug_levels.<locals>.decorator.<locals>.wrapperrL   rM   rq   rI   r!   rZ      s    z)with_dist_debug_levels.<locals>.decoratorr    )rq   rZ   r    rr   r!   with_dist_debug_levels   s    rs   c                   C   s   t t  dS )Nz+c10d was not compiled with the Gloo backend)r   ro   Zis_gloo_availabler    r    r    r!   requires_gloo   s    rt   c                 C   s<   t  stdS ttjj | k d| tjj |S d S )N+c10d was not compiled with the NCCL backendzIRequires NCCL version greater than or equal to: {}, found: {}, reason: {})	ro   is_nccl_availabler   r   r>   r:   r5   versionformat)rw   msgr    r    r!   requires_nccl_version  s     
 rz   c                   C   s   t t  dS )Nru   )r   ro   rv   r    r    r    r!   requires_nccl  s    r{   c                   C   s   t t  dS )Nz*c10d was not compiled with the MPI backend)r   ro   Zis_mpi_availabler    r    r    r!   requires_mpi  s    r|   c                    s   d _ t  fdd}|S )zSkips a test for ROCmTc                     s"   t s | |S ttd j d S )Nr6   )r   r@   rA   rB   r   rR   rI   r    r!   rK   '  s    
zskip_if_rocm.<locals>.wrapper)skip_if_rocmr   rM   r    rI   r!   r}   #  s    r}   c                   C   s   t tjdkdS )Nwin32z9This unit test case is not supportted on Windows platform)r   r@   platformr    r    r    r!   skip_if_win320  s    r   	localhostrT   T   )minutesFc                 C   sJ   t  }|r2t|tdd }tjj| ||||S tj| ||||dS dS )zL
    Creates a TCP store. Retries if the chosen port is already in use.
    rT   )Zmilliseconds)wait_for_workersN)r   r   r   r>   classesZ	dist_c10dZTCPStorero   )addrrH   	is_mastertimeoutr   Z	jit_classportZtimeout_millisecondr    r    r!   create_tcp_store7  s"            r   i  Z!DISTRIBUTED_TESTS_DEFAULT_TIMEOUTZ300Ztest_ddp_uneven_inputsi     Ztest_join_kwargsc                 C   s2   t jdks| d kr tjjddS tjj| dS d S )Nr~   z	127.0.0.1)hostnameZ	interface)r@   r   ro   ZProcessGroupGloocreate_devicer   r    r    r!   r   Z  s    r   returnc                 C   s   t | dd tS N.rd   )TIMEOUT_OVERRIDErn   rg   TIMEOUT_DEFAULT)Ztest_idr    r    r!   get_timeouta  s    r   c               	   c   sR   t  t   } }tjtj }}z | | t_t_tjtjfV  W 5 || t_t_X d S N)r	   r@   stdoutstderr)Znew_outZnew_errZold_outZold_errr    r    r!   captured_outpute  s    r   )rankrH   
num_inputsc              
      sx   dt t t t ddd}t ddd  fd	d
t|ddt|ddt|ddt|ddt|ddt|ddfD S )z
    Generate a number of basic test cases for sparse reduction.
    These cover tensors with a varying number of sparse dimensions and a varying
    number of dense dimensions. The only reduction operation we support is sum.
    rT   r   )r   rH   sparse_dims
dense_dimsc              	   S   s   t t | d d| d f}|gdd t|D  }t|d D ](}t |t d| d f}|| qBt | d gdd t|D  }t |||S )NrT   c                 S   s   g | ]}d qS rP   r    .0_r    r    r!   
<listcomp>|  s     z@simple_sparse_reduce_tests.<locals>.generate.<locals>.<listcomp>c                 S   s   g | ]}d qS r   r    r   r    r    r!   r     s     )	r>   ZreshapeZarangerangecatzerosappendZonesZsparse_coo_tensor)r   rH   r   r   indicesshaper   valuesr    r    r!   generatew  s    "z,simple_sparse_reduce_tests.<locals>.generate)rH   c                    s"   t dd  fddtD S )Nc                 S   s   | | S r   r    )abr    r    r!   <lambda>      zAsimple_sparse_reduce_tests.<locals>.compute_sum.<locals>.<lambda>c                    s   g | ]} |qS r    r    )r   r   fnrH   r    r!   r     s     zCsimple_sparse_reduce_tests.<locals>.compute_sum.<locals>.<listcomp>)r   r   r   r    r   r!   compute_sum  s     z/simple_sparse_reduce_tests.<locals>.compute_sumc                    sD   g | ]<  fd dt D  fddt D fqS )c                    s"   g | ]}  |  qS r    r    r   i)r   r   r   rH   r    r!   r     s   z9simple_sparse_reduce_tests.<locals>.<listcomp>.<listcomp>c                    s   g | ]}  qS r    r    r   )r   r   r   rH   r    r!   r     s     )r   )r   r   r   r   rH   r   r!   r     s   z.simple_sparse_reduce_tests.<locals>.<listcomp>)r   rP      )r   )rT   r   )r   r   )r   rH   r   r   r    r   r!   simple_sparse_reduce_testsp  s    	





r   )rH   rW   c                    sT   t j }t||dkr$dtjd< d | |kr8||    fddt| D }|S )zMultigpu tests are designed to simulate the multi nodes with multi
    GPUs on each node. Nccl backend requires equal #GPUs in each process.
    On a single node, all visible GPUs are evenly
    divided to subsets, each process only uses a subset.
    r5   rj   ZNCCL_MAX_NRINGSrT   c                    s*   i | ]"}|t |  |d     qS )rT   )listr   ZnGPUs_per_processZvisible_devicesr    r!   
<dictcomp>  s
    z(init_multigpu_helper.<locals>.<dictcomp>)r>   r:   rE   r   rC   rD   )rH   rW   ZnGPUsZrank_to_GPUr    r   r!   init_multigpu_helper  s    

r   tmp_dir)init_methodr   c                 C   s   t  atjtjd< ttjtjd ttjtjd tjtjd}t| | d k	rn| tjd< nt	tj|d tjd< d S )NZTEMP_DIRrp   Ztest_dirZinit_dirZINIT_METHODZshared_init_file)
tempfileTemporaryDirectoryr   namerC   rD   mkdirpathjoinr   )r   Zinit_dir_pathr    r    r!   initialize_temp_directories  s    
 r   c                   C   s   t d k	rt   d S r   )r   cleanupr    r    r    r!   cleanup_temp_dir  s    r   c                       s8  e Zd ZdZdZedddZeedddZ	dd	 Z
d.edd fddZdd fddZdd fddZedddZddddZddddZG dd deZeedddZeeeedddd Zedd!d"d#Zddd$d%Zddd&d'Zddd(d)Zddd*d+Zeedd,d-Z  ZS )/MultiProcessTestCaserd   
   r   c                 C   s   dS )NFr    selfr    r    r!   _should_stop_test_suite  s    z,MultiProcessTestCase._should_stop_test_suitec                 C   s   dS )N   r    r   r    r    r!   rH     s    zMultiProcessTestCase.world_sizec                    s    t   fdd}t|| S )Nc                    s"   | j | jkr|   n   d S r   )r   MAIN_PROCESS_RANK_join_processesr   r   r    r!   rK     s    z1MultiProcessTestCase.join_or_run.<locals>.wrapper)r   types
MethodType)r   r   rK   r    r   r!   join_or_run  s    z MultiProcessTestCase.join_or_runrunTestN)method_namer   c                    s,   t  | t| |}t| || | d S r   )super__init__getattrsetattrr   )r   r   r   	__class__r    r!   r     s    
zMultiProcessTestCase.__init__c                    s8   t    g | _g | _| j| _tjddj| _	i | _
d S )NF)delete)r   setUpskip_return_code_checks	processesr   r   r   NamedTemporaryFiler   	file_namepid_to_piper   r   r    r!   r     s    
zMultiProcessTestCase.setUpc                    s(   t    | jD ]}|  qg | _d S r   )r   tearDownr   	terminate)r   pr   r    r!   r     s    


zMultiProcessTestCase.tearDownc                 C   s   |   dd S r   )idrg   r   r    r    r!   _current_test_name  s    z'MultiProcessTestCase._current_test_namec                 C   s   g | _ tt| jD ]r}tj \}}|| jjdt	| || 
 | j|fd}|  td| d|j  || j|j< | j | qd S )Nzprocess )targetr   rF   zStarted process z
 with pid )r   r   r   rH   r>   multiprocessingPiper   _runr   r   r   startloggerinfopidr   r   )r   procr   Zparent_connZ
child_connprocessr    r    r!   _start_processes  s    
z%MultiProcessTestCase._start_processesc                 C   s   t jdj}| | d S )Nspawn)r>   r   Zget_contextProcessr   )r   r   r    r    r!   _spawn_processes)  s    z%MultiProcessTestCase._spawn_processesc                   @   s   e Zd ZdZdS )zMultiProcessTestCase.EventrT   N)r   r   r   GET_TRACEBACKr    r    r    r!   Event-  s   r   r   c              	   C   s   t d|  tj| |g}| |kr| jrDt d| d d S |  }t d| d|  |tjj	krt
jddB}t| |  |d | |  t d	| d
 W 5 Q R X ||krd S qd S )Nz(Starting event listener thread for rank Pipe closed for process z , stopping event listener threadzReceived event z on process zr+)moder   Process z sent traceback)r   r   r   
connectionwaitclosedrecvr   r   r   r   r   faulthandlerZdump_tracebackflushseeksendread)parent_pipeZsignal_piper   Zready_pipeseventZtmp_filer    r    r!   _event_listener0  s&    


z$MultiProcessTestCase._event_listener)r   	test_namer   r   c                 C   s8   ddl m} |d | |}||_||_||| d S )Nr   )_set_ddp_with_replicated_tensorT)Z.torch.nn.parallel._replicated_tensor_ddp_utilsr  r   r   run_test)clsr   r  r   r  r  r   r    r    r!   r   O  s    zMultiProcessTestCase._run)r  r   c                 C   sd  t jjdd\}}tjtj||| jfdd}|  t	j
dkrTt	j
dkrTt jd dtjd< zzt| |  W n tjk
r } z6td	| j d
| dt|  t	td j W 5 d }~X Y nb tk
r& } zBtdt  d| j dtj   |t  t	tj  W 5 d }~X Y nX W 5 |d k	r@|d  |d k	sNt|  |  X d S )NF)ZduplexT)r   rF   daemonr~   darwinrj   ZTORCH_SHOW_CPP_STACKTRACESr   z skipping test z for following reason: r7   zCaught exception: 
z exiting process z with exit code: )!r>   r   r   	threadingThreadr   r  r   r   r@   r   Z_CZ'_set_print_stack_traces_on_fatal_signalrC   rD   r  re   r   closer   unittestSkipTestr   r   r   rA   rB   r   	Exceptionrc   	traceback
format_excTEST_ERROR_EXIT_CODE)r   r  r  Zsignal_recv_pipeZsignal_send_pipeZevent_listener_threadseer    r    r!   r  [  s8    

 "

zMultiProcessTestCase.run_testc                 C   s0  g }t | jD ]x\}}|jd kr| j|j }z |tjj |	||f W q t
k
r } ztd| d|  W 5 d }~X Y qX q|D ]\}}zZ|dr|jrtd| d W q| }td| d|  ntd|  W q t
k
r( } ztd| d|  W 5 d }~X Y qX qd S )	Nz<Encountered error while trying to get traceback for process z: r   r   z, cannot retrieve tracebackr   z timed out with traceback: 

z4Could not retrieve traceback for timed out process: )	enumerater   exitcoder   r   r  r   r   r   r   ConnectionErrorr   rc   pollr   r   r   )r   Zpipesr   r   piper  r   r  r    r    r!   _get_timedout_process_traceback  s<    


z4MultiProcessTestCase._get_timedout_process_tracebackc              	   C   s6  t |  }t }d}zt| jD ]P\}}|jt	j
kr$td| d|j d tj }	|	D ]}
|
  q^d} qvq$|r|qtdd | jD rqt | }||kr|   td| d	 | jD ]}|  qqtd
 qt | }|| jkr| | n
| | W 5 | j D ]\}}|  qX d S )NFr   z terminated with exit code z", terminating remaining processes.Tc                 S   s   g | ]}|j d k	qS r   )r  )r   r   r    r    r!   r     s     z8MultiProcessTestCase._join_processes.<locals>.<listcomp>zTiming out after z" seconds and killing subprocesses.g?)r   r   timer   itemsr  r  r   r  r   r  printr>   r   active_childrenr   allr  sleepr   _check_no_test_errors_check_return_codes)r   r   r   
start_timeZsubprocess_errorr   r  r   r   r   acelapsedelapsed_timer    r    r!   r     sD    




z$MultiProcessTestCase._join_processesc                 C   sB   t | jD ]2\}}|jdkr,td||| | j|j q
dS )zV
        Checks that we didn't have any errors thrown in the child processes.
        Nz%Process {} timed out after {} seconds)r  r   r  RuntimeErrorrx   ZassertNotEqualr  )r   r(  r   r   r    r    r!   r#    s    

z*MultiProcessTestCase._check_no_test_errorsc           
   	   C   s  | j d }dd t| j D }|r`d}|D ],\}}| j|j  }|d|tj|7 }q*t|t| j D ]D\}}|j	dkrtd||| j
|j	|j	d||j	|j	d	 qjt D ]B}	|j	|	jkrtrtd
|   d|	j   dS t|	jq| j
|j	dd|j	|jd	 dS )z
        Checks that the return codes of all spawned processes match, and skips
        tests if they returned a return code indicating a skipping condition.
        r   c                 S   s$   g | ]\}}|j tjkr||fqS r    )r  r   r  )r   r   r   r    r    r!   r     s   z<MultiProcessTestCase._check_return_codes.<locals>.<listcomp> z7Process {} exited with error code {} and exception:
{}
Nz3Process {} terminated or timed out after {} secondszJExpect process {} exit code to match Process 0 exit code of {}, but got {})ry   z	Skipping z) on sandcastle for the following reason: z.Expected zero exit code but got {} for pid: {})r   r  r   r   r   rx   r   r  r)  r  assertEqualrB   r   r   r   r   r   r   r   r  r  )
r   r(  Zfirst_processZerrored_processesrc   r   r   error_messager   skipr    r    r!   r$    s\    
  
   z(MultiProcessTestCase._check_return_codesc                 C   s
   | j dkS )Nr   r   r   r    r    r!   r   %  s    zMultiProcessTestCase.is_master)r   )r   r   r   r   r  boolr   propertyr   rH   r   r   r   r   r   r   r   r   r   r   staticmethodr  classmethodr   r  r  r   r#  r$  r   __classcell__r    r    r   r!   r     s0   	
*%0@r   c                   C   sH   t dk	rt S ztdddddgjdka W n tk
rB   da Y nX t S )	a   
    If shell command `fi_info -p efa -t FI_EP_RDM` returns exit code 0 then we assume that the machine has
    Libfabric EFA interfaces and EFA software components installed,
    see https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/efa-start.html.
    NZfi_infoz-pZefaz-tZ	FI_EP_RDMr   F)EFA_PROBE_RESULT
subprocessrun
returncodeFileNotFoundErrorr    r    r    r!   has_efa,  s    
r8  c                   C   s   t  rddgS dS )a  
    If the machine has Libfabric EFA interfaces and EFA software components installed it may cause
    'RuntimeError: In operator() at tensorpipe/common/ibv.h:172 "": Operation not supported' if tensorpipe
    uses InfiniBand transport, so we exclude it from tensorpipe transports,
    see https://github.com/pytorch/pytorch/issues/73885 and https://github.com/pytorch/pytorch/issues/65022
    ZshmZuvN)r8  r    r    r    r!   tp_transports=  s    r9  )N)rT   )N)Yr   loggingr   rC   r@   r   r  r4  r  r  r   r  
contextlibr   Zdataclassesr   datetimer   enumr   	functoolsr   r   r   ior	   typingr
   r   r   r>   Ztorch.cuda.ncclZtorch.distributedZdistributedro   Z$torch.testing._internal.common_utilsr   r   r   r   r   r   r   r   r   basicConfigINFO	getLoggerr   r   r   rB   r8   rN   rS   rU   r[   r_   ra   ri   rm   rs   rt   rz   r{   r|   r}   r   r   r   r   getenvr   r   r   r   r   r   r   r   r   r   r   r   r   r3  r.  r8  r9  r    r    r    r!   <module>   s    ,
  

.


,   K