import functools
import itertools
import sys
from abc import ABC, abstractmethod
from contextlib import suppress
from copy import deepcopy
from enum import Enum, auto
from math import inf
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union
from unittest import mock

import torch
import torch.distributed as dist
import torch.nn as nn
from torch.distributed.fsdp import CPUOffload
from torch.distributed.fsdp import FullyShardedDataParallel as FSDP
from torch.distributed.fsdp.fully_sharded_data_parallel import (
    BackwardPrefetch,
    MixedPrecision,
    ShardingStrategy,
    TrainingState_,
)
from torch.distributed.fsdp.sharded_grad_scaler import ShardedGradScaler
from torch.distributed.fsdp.wrap import (
    always_wrap_policy,
    transformer_auto_wrap_policy,
    wrap,
)
from torch.nn import TransformerDecoderLayer, TransformerEncoderLayer
from torch.nn.parallel.distributed import DistributedDataParallel as DDP
from torch.testing._internal.common_distributed import (
    MultiProcessTestCase,
    TEST_SKIPS,
)
from torch.testing._internal.common_utils import FILE_SCHEMA, get_cycles_per_ms


class FSDPInitMode(Enum):
    # No FSDP wrapping
    NO_FSDP = auto()
    # FSDP recursive wrapping
    RECURSIVE = auto()


class CUDAInitMode(Enum):
    # Move the model to CUDA before passing it to the FSDP constructor
    CUDA_BEFORE = auto()
    # Move the model to CUDA after passing it to the FSDP constructor
    CUDA_AFTER = auto()
    # Keep the model on CPU
    CUDA_NEVER = auto()


class FSDPTestModel(nn.Module, ABC):
    """This defines the interface expected from all models used commonly for
    FSDP unit tests."""

    @abstractmethod
    def get_input(self, device) -> Tuple[torch.Tensor, ...]:
        """Returns an input for the model as a tuple."""
        ...

    @abstractmethod
    def get_loss(self, input, output) -> torch.Tensor:
        """Returns the loss given the input and output."""
        ...

    @abstractmethod
    def run_backward(self, loss) -> None:
        """Runs the backward pass (e.g. including ``loss.backward()``)."""
        ...

    @staticmethod
    @abstractmethod
    def init(
        group: dist.ProcessGroup,
        fsdp_init_mode: FSDPInitMode,
        *init_args: Any,
        cuda_init_mode: CUDAInitMode,
        fsdp_kwargs: Optional[Dict[str, Any]] = None,
        deterministic: bool = False,
        **init_kwargs: Any,
    ) -> nn.Module:
        """Initializes an instance of this model."""
        ...


def _assert_module_states(
    model: nn.Module,
    process_group: dist.ProcessGroup,
    assert_fn: Callable,
):
    """
    All-gathers module states across ranks and calls ``assert_fn`` on each pair
    of corresponding states from rank 0 and a nonzero rank. For example, if
    ``assert_fn`` is ``self.assertEqual()``, then this checks that all module
    states are equal across ranks.
    """
    # Include names for debugging convenience
    named_module_states = [
        (param_name, param.detach().cpu())
        for param_name, param in model.named_parameters()
    ]
    named_module_states += [
        (buffer_name, buffer.detach().cpu())
        for buffer_name, buffer in model.named_buffers()
    ]
    world_size = dist.get_world_size(process_group)
    olist = [None for _ in range(world_size)]
    dist.all_gather_object(olist, named_module_states, group=process_group)
    rank0_states = olist[0]
    for state in olist[1:]:
        for (_, p1), (_, p2) in zip(rank0_states, state):
            assert_fn(p1, p2)


def _zero_model(model: nn.Module, zero_buffers: bool = False):
    """Zeros the parameters and optionally buffers of ``model`` in place."""
    with FSDP.summon_full_params(model):
        for param in model.parameters():
            with torch.no_grad():
                param.zero_()
        if zero_buffers:
            for buffer in model.buffers():
                with torch.no_grad():
                    buffer.zero_()


def _get_state_dict(model, cpu_offload=False, half=False):
    if not cpu_offload:
        model = model.cuda()
    if half:
        model.half()
    return model.state_dict()


def subtest_name(test_name_mapping, *args):
    return "_".join(
        [test_name_mapping[str(s)] if s is not None else "none" for s in args]
    )
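

# Hedged example of ``subtest_name()``: it joins the mapped names of the given
# configuration values with underscores, mapping ``None`` to ``"none"``. The
# mapping below is made up for illustration:
#
#   mapping = {"0.1": "dropout_0_1"}
#   subtest_name(mapping, 0.1, None)  # -> "dropout_0_1_none"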


def get_full_params(model: nn.Module, recurse: bool = True):
    """
    Returns the full unsharded parameters of ``model``. Any FSDP-managed
    parameters offloaded to CPU are moved to GPU in the returned list.

    Args:
        recurse (bool): If ``False``, only unshards the parameters immediate to
            ``model``; if ``True``, recurses through the module hierarchy
            rooted at ``model``.
    """
    with FSDP.summon_full_params(model, recurse=recurse):
        return deepcopy(list(model.parameters()))


def _maybe_cuda(model: nn.Module, move_to_cuda: bool):
    return model.cuda() if move_to_cuda else model


def _maybe_wrap_fsdp(model: nn.Module, wrap_fsdp: bool, *args, **kwargs):
    return model if not wrap_fsdp else FSDP(model, *args, **kwargs)


class DummyProcessGroup:
    def __init__(self, rank: int, size: int):
        self._rank = rank
        self._size = size

    def rank(self) -> int:
        return self._rank

    def size(self) -> int:
        return self._size

    def allreduce(self, *args, **kwargs):
        dist_wait = mock.Mock()

        def get_future():
            future = torch.futures.Future()
            future.set_result(1)
            return future

        dist_wait.get_future = get_future
        return dist_wait


class DeterministicModel(torch.nn.Module):
    def __init__(self, wrap_fsdp, cpu_offload=CPUOffload(offload_params=False)):
        super().__init__()
        # Keep everything deterministic for model initialization
        torch.manual_seed(0)
        self.inner = torch.nn.Linear(2, 2).cuda()
        if wrap_fsdp:
            self.inner = FSDP(self.inner, cpu_offload=cpu_offload)
        self.outer = torch.nn.Linear(2, 2).cuda()

    def forward(self, x):
        y = self.inner(x)
        return self.outer(y)
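

# Hedged sketch of the snapshot/zero/restore pattern that the helpers above
# support in the parity tests; ``model`` is a placeholder for a model built
# elsewhere in a test:
#
#   state = _get_state_dict(model)        # snapshot parameters
#   _zero_model(model)                    # overwrite them with zeros
#   model.load_state_dict(state)          # restore the snapshot
#   full_params = get_full_params(model)  # unsharded copies for comparison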


class TransformerWithSharedParams(FSDPTestModel):
    def __init__(
        self,
        group: dist.ProcessGroup,
        cuda_init_mode: CUDAInitMode,
        add_bn: bool,
        deterministic: bool,
    ):
        super().__init__()
        self.rank = group.rank()
        self.world_size = group.size()
        if deterministic:
            torch.manual_seed(0)
        d_vocab = 23
        d_model = 16

        self.embed_tokens = nn.Embedding(d_vocab, d_model)
        self.transformer = nn.Transformer(
            d_model=d_model,
            num_encoder_layers=2,
            num_decoder_layers=2,
            dim_feedforward=8,
            dropout=0.1,
        )
        self.output_proj = nn.Linear(d_model, d_vocab)

        # Share the embedding and output projection weights
        self.output_proj.weight = self.embed_tokens.weight
        self.register_buffer(
            "vocab_bias", self.embed_tokens.weight.new_ones((d_model,))
        )
        self.register_buffer(
            "long_buffer", torch.zeros_like(self.vocab_bias, dtype=torch.long)
        )

        self.bs = 2
        self.bn = torch.nn.BatchNorm1d(self.bs) if add_bn else torch.nn.Identity()
        if cuda_init_mode == CUDAInitMode.CUDA_BEFORE:
            self = self.cuda()
        if deterministic:
            self.eval()

    def get_input(self, device):
        torch.manual_seed(1 + self.rank)  # keep everything deterministic
        src = torch.arange(12, device=device).view(6, self.bs)  # T x B
        tgt = torch.arange(self.bs * 4, device=device).view(4, self.bs)  # T x B
        return (src, tgt)

    def forward(self, src_ids, tgt_ids):
        src = self.embed_tokens(src_ids)
        src = src + self.vocab_bias + self.long_buffer.type_as(src)
        tgt = self.embed_tokens(tgt_ids)
        tgt = self.bn(tgt)
        x = self.transformer(src, tgt)
        return self.output_proj(x)

    def get_loss(self, input, output):
        _, tgt = input
        return nn.functional.cross_entropy(
            output.view(-1, output.size(-1)), tgt.view(-1), reduction="sum"
        )

    def run_backward(self, loss):
        loss.backward()

    @staticmethod
    def init(
        group: dist.ProcessGroup,
        fsdp_init_mode: FSDPInitMode,
        cuda_init_mode: CUDAInitMode,
        fsdp_kwargs: Optional[Dict[str, Any]] = None,
        deterministic: bool = False,
        add_bn: bool = True,
    ) -> Union[nn.Module, FSDP]:
        """
        Initializes a :class:`TransformerWithSharedParams` instance.

        Args:
            fsdp_init_mode (FSDPInitMode): If ``NO_FSDP``, then does not wrap
                any modules with FSDP. If ``RECURSIVE``, then wraps with
                top-level FSDP. By default, the top-level FSDP uses the
                ``transformer_auto_wrap_policy()`` for encoder and decoder
                layers, but a different auto wrap policy may be specified via
                ``fsdp_kwargs``.
            cuda_init_mode (CUDAInitMode): Determines model movement to CUDA.
            fsdp_kwargs (Optional[Dict[str, Any]]): Optional keyword arguments
                forwarded to the FSDP constructor.
            deterministic (bool): Whether to make the model deterministic
                across constructions.
            add_bn (bool): Whether to include batch norm in the model.
        """
        if fsdp_kwargs is None:
            fsdp_kwargs = {}
        if fsdp_init_mode == FSDPInitMode.NO_FSDP:
            return TransformerWithSharedParams(
                group, cuda_init_mode, add_bn, deterministic
            )
        elif fsdp_init_mode == FSDPInitMode.RECURSIVE:
            # Default to wrapping each transformer encoder and decoder layer
            if "auto_wrap_policy" not in fsdp_kwargs:
                auto_wrap_policy = functools.partial(
                    transformer_auto_wrap_policy,
                    transformer_layer_cls={
                        TransformerEncoderLayer,
                        TransformerDecoderLayer,
                    },
                )
            else:
                auto_wrap_policy = fsdp_kwargs.pop("auto_wrap_policy")
            fsdp_model = FSDP(
                TransformerWithSharedParams(
                    group, cuda_init_mode, add_bn, deterministic
                ),
                group,
                auto_wrap_policy=auto_wrap_policy,
                **fsdp_kwargs,
            )
            if cuda_init_mode == CUDAInitMode.CUDA_AFTER:
                fsdp_model = fsdp_model.cuda()
            return fsdp_model
        raise ValueError(f"Unsupported FSDP init mode: {fsdp_init_mode}")

    def get_ignored_modules(self):
        return [self.transformer]


class NestedWrappedModule(FSDPTestModel):
    def __init__(
        self,
        group: dist.ProcessGroup,
        wrap_fsdp: bool,
        cuda_init_mode: CUDAInitMode,
        deterministic: bool,
        **fsdp_kwargs,
    ):
        super().__init__()
        self.rank = group.rank()
        self.world_size = group.size()
        move_to_cuda = cuda_init_mode == CUDAInitMode.CUDA_BEFORE

        def _maybe_wrap(layer):
            if wrap_fsdp:
                return FSDP(layer, group, **fsdp_kwargs)
            return layer

        if deterministic:
            torch.manual_seed(0)
        self.module = nn.Sequential(
            _maybe_cuda(nn.Linear(8, 4), move_to_cuda),
            _maybe_wrap(
                nn.Sequential(
                    _maybe_wrap(_maybe_cuda(nn.Linear(4, 16), move_to_cuda)),
                    _maybe_cuda(nn.Linear(16, 16), move_to_cuda),
                ),
            ),
            _maybe_wrap(_maybe_cuda(nn.Linear(16, 4), move_to_cuda)),
            _maybe_cuda(nn.Linear(4, 8), move_to_cuda),
        )

    def get_input(self, device):
        torch.manual_seed(1 + self.rank)  # keep everything deterministic
        return (torch.rand(4, 8, device=device),)

    def forward(self, x):
        return self.module(x)

    def get_loss(self, input, output):
        loss = output.sum()
        return loss

    def run_backward(self, loss):
        loss.backward()

    @staticmethod
    def init(
        group: dist.ProcessGroup,
        fsdp_init_mode: FSDPInitMode,
        cuda_init_mode: CUDAInitMode,
        fsdp_kwargs: Optional[Dict[str, Any]] = None,
        deterministic: bool = False,
    ) -> nn.Module:
        """
        Initializes a :class:`NestedWrappedModule` instance.

        Args:
            fsdp_init_mode (FSDPInitMode): If ``NO_FSDP``, then does not wrap
                any modules with FSDP. If ``RECURSIVE``, then wraps some nested
                modules with FSDP but not the top-level module. The model may
                later be wrapped with a top-level FSDP external to this method
                if desired.
            cuda_init_mode (CUDAInitMode): Determines model movement to CUDA.
            fsdp_kwargs (Optional[Dict[str, Any]]): Optional keyword arguments
                forwarded to the FSDP constructor.
            deterministic (bool): Whether to make the model deterministic
                across constructions.
        """
        if fsdp_kwargs is None:
            fsdp_kwargs = {}
        if fsdp_init_mode == FSDPInitMode.NO_FSDP:
            return NestedWrappedModule(
                group,
                wrap_fsdp=False,
                cuda_init_mode=cuda_init_mode,
                deterministic=deterministic,
            )
        elif fsdp_init_mode == FSDPInitMode.RECURSIVE:
            # Does not wrap the top-level module with FSDP
            fsdp_model = NestedWrappedModule(
                group,
                wrap_fsdp=True,
                cuda_init_mode=cuda_init_mode,
                deterministic=deterministic,
                **fsdp_kwargs,
            )
            if cuda_init_mode == CUDAInitMode.CUDA_AFTER:
                fsdp_model = fsdp_model.cuda()
            return fsdp_model
        raise ValueError(f"Unsupported FSDP init mode: {fsdp_init_mode}")
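

# Hedged sketch: with ``RECURSIVE``, the returned model has FSDP-wrapped inner
# modules but an unwrapped root, so tests often add the top-level wrapper
# themselves (``process_group`` is a placeholder):
#
#   model = NestedWrappedModule.init(
#       process_group,
#       FSDPInitMode.RECURSIVE,
#       CUDAInitMode.CUDA_BEFORE,
#       deterministic=True,
#   )
#   fsdp_model = FSDP(model, process_group)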


class AlwaysWrapNestedWrappedModule(NestedWrappedModule):
    @staticmethod
    def init(
        group: dist.ProcessGroup,
        fsdp_init_mode: FSDPInitMode,
        cuda_init_mode: CUDAInitMode,
        fsdp_kwargs: Optional[Dict[str, Any]] = None,
        deterministic: bool = False,
    ):
        """
        Initializes a :class:`NestedWrappedModule` instance, but unlike
        :meth:`NestedWrappedModule.init`, for the ``RECURSIVE`` init mode, this
        wraps with top-level FSDP and the ``always_wrap_policy()`` auto wrap
        policy.
        """
        if fsdp_kwargs is None:
            fsdp_kwargs = {}
        super_ = super(AlwaysWrapNestedWrappedModule, AlwaysWrapNestedWrappedModule)
        model = super_.init(
            group=group,
            fsdp_init_mode=FSDPInitMode.NO_FSDP,
            cuda_init_mode=cuda_init_mode,
            fsdp_kwargs=fsdp_kwargs,
            deterministic=deterministic,
        )
        if fsdp_init_mode == FSDPInitMode.NO_FSDP:
            return model
        elif fsdp_init_mode == FSDPInitMode.RECURSIVE:
            fsdp_model = FSDP(model, auto_wrap_policy=always_wrap_policy, **fsdp_kwargs)
            if cuda_init_mode == CUDAInitMode.CUDA_AFTER:
                fsdp_model = fsdp_model.cuda()
            return fsdp_model
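

# Hedged note: ``always_wrap_policy`` wraps every submodule in its own FSDP
# instance, which exercises deeply nested wrapping, e.g. (sketch):
#
#   fsdp_model = AlwaysWrapNestedWrappedModule.init(
#       process_group, FSDPInitMode.RECURSIVE, CUDAInitMode.CUDA_AFTER
#   )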


class ModuleWithDelay(FSDPTestModel):
    """This class wraps a :class:`FSDPTestModel` to optionally add a delay
    after computing the loss and/or before the gradient reduction."""

    def __init__(
        self,
        module: nn.Module,
        delay_after_loss_ms: int,
        delay_before_reduction_ms: int,
    ):
        super().__init__()
        self.delay_after_loss_ms = delay_after_loss_ms
        self.delay_before_reduction_ms = delay_before_reduction_ms
        self.module = module

    def get_input(self, device):
        return self.module.get_input(device)

    def forward(self, x):
        return self.module(x)

    def get_loss(self, input, output):
        loss = self.module.get_loss(input, output)
        if self.delay_after_loss_ms > 0:
            torch.cuda._sleep(int(self.delay_after_loss_ms * get_cycles_per_ms()))
        return loss

    def run_backward(self, loss):
        orig_reduce_scatter = torch.distributed._reduce_scatter_base

        def _delayed_reduce_scatter(*args, **kwargs):
            if self.delay_before_reduction_ms > 0:
                torch.cuda._sleep(
                    int(self.delay_before_reduction_ms * get_cycles_per_ms())
                )
            return orig_reduce_scatter(*args, **kwargs)

        with mock.patch(
            "torch.distributed._reduce_scatter_base", _delayed_reduce_scatter
        ):
            self.module.run_backward(loss)

    @staticmethod
    def init(
        module_class: Type[FSDPTestModel],
        *model_args: Any,
        delay_after_loss_ms: int,
        delay_before_reduction_ms: int,
        **model_kwargs: Any,
    ):
        """
        Args:
            module_class (Type[FSDPTestModel]): Wrapped module class to which
                to add delays.
            model_args: Positional arguments forwarded to the ``module_class``
                ``init()``.
            delay_after_loss_ms (int): Delay after computing the loss/before
                the optimizer step (in ms).
            delay_before_reduction_ms (int): Delay before reduce-scattering
                gradients (in ms).
            model_kwargs: Keyword arguments forwarded to the ``module_class``
                ``init()``.
        """
        return ModuleWithDelay(
            module_class.init(*model_args, **model_kwargs),
            delay_after_loss_ms,
            delay_before_reduction_ms,
        )


class NestedWrappedModuleWithDelay(ModuleWithDelay):
    @staticmethod
    def init(
        group: dist.ProcessGroup,
        fsdp_init_mode: FSDPInitMode,
        cuda_init_mode: CUDAInitMode = CUDAInitMode.CUDA_AFTER,
        fsdp_kwargs: Optional[Dict[str, Any]] = None,
        deterministic: bool = False,
        delay_after_loss_ms: int = 0,
        delay_before_reduction_ms: int = 0,
    ):
        return super(NestedWrappedModuleWithDelay, NestedWrappedModuleWithDelay).init(
            NestedWrappedModule,
            group=group,
            fsdp_init_mode=fsdp_init_mode,
            cuda_init_mode=cuda_init_mode,
            fsdp_kwargs=fsdp_kwargs,
            deterministic=deterministic,
            delay_after_loss_ms=delay_after_loss_ms,
            delay_before_reduction_ms=delay_before_reduction_ms,
        )


class DummyDDP(nn.Module):
    def __init__(self, module):
        super().__init__()
        self.module = module

    def forward(self, *args, **kwargs):
        return self.module(*args, **kwargs)


class MixtureOfExperts(NestedWrappedModule):
    def __init__(
        self,
        group: dist.ProcessGroup,
        wrap_fsdp: bool,
        cuda_init_mode: CUDAInitMode,
        delay_before_free_ms: int,
        deterministic: bool,
        **fsdp_kwargs,
    ):
        super().__init__(
            group=group,
            wrap_fsdp=wrap_fsdp,
            cuda_init_mode=cuda_init_mode,
            deterministic=deterministic,
        )
        self.group = group
        self.delay_before_free_ms = delay_before_free_ms
        self.wrap_fsdp = wrap_fsdp
        self.move_to_cuda = cuda_init_mode == CUDAInitMode.CUDA_BEFORE
        if deterministic:
            # Give each rank different expert parameters
            torch.manual_seed(42 + self.rank)
        d_expert = 23
        d_shared = 12
        d_input = 8
        expert = _maybe_cuda(nn.Linear(d_expert, d_shared), self.move_to_cuda)

        self.num_expert_params = sum([p.numel() for p in expert.parameters()])
        for p in expert.parameters():
            p.expert = True

        if deterministic:
            # Keep all other parameters the same across ranks
            torch.manual_seed(0)

        shared = _maybe_cuda(nn.Linear(d_shared, d_expert), self.move_to_cuda)

        if wrap_fsdp:
            # Create a process group of size 1 for the expert parameters so
            # that they are not sharded
            expert_group = torch.distributed.new_group([group.rank()])
            expert = FSDP(expert, expert_group, **fsdp_kwargs)
            shared = FSDP(shared, group, **fsdp_kwargs)

        self.module = nn.Sequential(
            _maybe_cuda(nn.Linear(d_input, d_shared), self.move_to_cuda),
            shared,
            expert,
            _maybe_cuda(nn.Linear(d_shared, d_input), self.move_to_cuda),
        )

    def forward(self, x):
        if self.delay_before_free_ms > 0:
            expert = self.module[2]
            if isinstance(expert, FSDP):
                orig_reshard = expert._reshard

                def _free_full_params_with_delay(*args):
                    torch.cuda._sleep(
                        int(self.delay_before_free_ms * get_cycles_per_ms())
                    )
                    return orig_reshard(*args)

                assert hasattr(
                    expert, "_reshard"
                ), "expert FSDP module should have a `_reshard()` method"
                with mock.patch.object(
                    expert, "_reshard", _free_full_params_with_delay
                ):
                    return self.module(x)
        return self.module(x)

    def run_backward(self, loss):
        loss.backward()
        # Manually reduce gradients if not wrapped in FullyShardedDataParallel
        if not self.wrap_fsdp:
            with torch.no_grad():
                for p in self.parameters():
                    if hasattr(p, "expert"):
                        continue  # these params don't need grad reduction
                    p.grad.div_(self.world_size)
                    torch.distributed.all_reduce(p.grad, group=self.group)

    @staticmethod
    def init(
        group: dist.ProcessGroup,
        fsdp_init_mode: FSDPInitMode,
        cuda_init_mode: CUDAInitMode,
        fsdp_kwargs: Optional[Dict[str, Any]] = None,
        deterministic: bool = False,
        delay_before_free_ms: int = 0,
    ):
        """
        Initializes a :class:`MixtureOfExperts` instance.

        Args:
            fsdp_init_mode (FSDPInitMode): If ``NO_FSDP``, then does not wrap
                any modules with FSDP. If ``RECURSIVE``, then wraps some nested
                modules with FSDP, including the expert and shared layers, but
                not the top-level module. The model may later be wrapped with a
                top-level FSDP external to this method if desired.
            cuda_init_mode (CUDAInitMode): Determines model movement to CUDA.
            fsdp_kwargs (Optional[Dict[str, Any]]): Optional keyword arguments
                forwarded to the FSDP constructor.
            deterministic (bool): Whether to make the model deterministic
                across constructions.
            delay_before_free_ms (int): Delay before resharding expert
                parameters in the forward pass (in ms).
        """
        if fsdp_kwargs is None:
            fsdp_kwargs = {}
        if fsdp_init_mode == FSDPInitMode.NO_FSDP:
            return MixtureOfExperts(
                group,
                wrap_fsdp=False,
                cuda_init_mode=cuda_init_mode,
                delay_before_free_ms=delay_before_free_ms,
                deterministic=deterministic,
            )
        elif fsdp_init_mode == FSDPInitMode.RECURSIVE:
            # Does not wrap the top-level module with FSDP
            fsdp_model = MixtureOfExperts(
                group,
                wrap_fsdp=True,
                cuda_init_mode=cuda_init_mode,
                delay_before_free_ms=delay_before_free_ms,
                deterministic=deterministic,
                **fsdp_kwargs,
            )
            if cuda_init_mode == CUDAInitMode.CUDA_AFTER:
                fsdp_model = fsdp_model.cuda()
            return fsdp_model
        raise ValueError(f"Unsupported FSDP init mode: {fsdp_init_mode}")
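

# Hedged sketch: the mixture-of-experts model gives each rank its own expert
# (kept unsharded via a single-rank process group) plus shared layers sharded
# across ``process_group`` (a placeholder here):
#
#   moe = MixtureOfExperts.init(
#       process_group,
#       FSDPInitMode.RECURSIVE,
#       CUDAInitMode.CUDA_BEFORE,
#       deterministic=True,
#       delay_before_free_ms=250,
#   )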


class FSDPTest(MultiProcessTestCase):
    def setUp(self):
        super().setUp()
        self._spawn_processes()

    @property
    def world_size(self):
        return torch.cuda.device_count() if torch.cuda.is_available() else 4

    @property
    def process_group(self):
        return dist.distributed_c10d._get_default_group()

    @property
    def init_method(self):
        return "{}{file_name}".format(FILE_SCHEMA, file_name=self.file_name)

    def _check_cpu_offload(self, fsdp_model, cpu_offload):
        self.assertEqual(cpu_offload, fsdp_model.cpu_offload)

    def _check_backward_prefetch(self, fsdp_model, backward_prefetch):
        self.assertEqual(backward_prefetch, fsdp_model.backward_prefetch)

    def _check_forward_prefetch(self, fsdp_model, forward_prefetch):
        self.assertEqual(forward_prefetch, fsdp_model.forward_prefetch)

    def run_subtests(
        self,
        subtest_config: Dict[str, List[Any]],
        test_fn: Callable,
        *test_args,
        **test_kwargs: Any,
    ):
        """
        Runs a test function given by ``test_fn`` as a subtest according to the
        configurations specified by ``subtest_config``. This amortizes the
        costly setup overhead (including process spawn and initializing the
        process group) over the subtests.

        Args:
            subtest_config (Dict[str, List[Any]]): A mapping from subtest
                keyword argument name to a list of its possible values.
            test_fn (Callable): A callable that runs the actual test.
            test_args: Positional arguments to pass to ``test_fn``.
            test_kwargs: Keyword arguments to pass to ``test_fn``.
        """
        # Convert the config mapping to a list to fix the iteration order
        subtest_config_items = list(subtest_config.items())
        subtest_config_keys = [item[0] for item in subtest_config_items]
        subtest_config_values = [item[1] for item in subtest_config_items]
        for values in itertools.product(*subtest_config_values):
            # Map each subtest keyword to the chosen value
            subtest_kwargs = {
                kwarg: value for kwarg, value in zip(subtest_config_keys, values)
            }
            with self.subTest(**subtest_kwargs):
                test_fn(*test_args, **test_kwargs, **subtest_kwargs)
            dist.barrier()

    @classmethod
    def _run(cls, rank, test_name, file_name, pipe):
        self = cls(test_name)
        self.rank = rank
        self.file_name = file_name
        print(f"dist init r={self.rank}, world={self.world_size}")
        backend = "nccl" if torch.cuda.is_available() else "gloo"
        try:
            dist.init_process_group(
                init_method=self.init_method,
                backend=backend,
                world_size=int(self.world_size),
                rank=self.rank,
            )
        except RuntimeError as e:
            if "recompile" in e.args[0]:
                sys.exit(TEST_SKIPS["backend_unavailable"].exit_code)
            raise
        if torch.cuda.is_available() and torch.cuda.device_count():
            torch.cuda.set_device(self.rank % torch.cuda.device_count())
        # Execute a barrier prior to running the test so that every process
        # has finished initialization
        dist.barrier()
        self.run_test(test_name, pipe)
        dist.barrier()
        dist.destroy_process_group()
        sys.exit(0)

    def _train_for_several_steps(
        self,
        model: nn.Module,
        num_steps: int,
        autocast: bool,
        lr: float = 0.01,
        fsdp_cpu_offload: Optional[CPUOffload] = None,
        norm_type: Optional[Union[float, int]] = None,
        save_model: bool = False,
        mixed_precision: Optional[MixedPrecision] = None,
        enable_sharded_grad_scaler: bool = False,
        use_pure_fp16: bool = False,
    ):
        cpu_offload_params = fsdp_cpu_offload and fsdp_cpu_offload.offload_params

        model_device = next(model.parameters()).device
        sharded_grad_scaler = ShardedGradScaler(enabled=enable_sharded_grad_scaler)
        # Use SGD with momentum instead of Adam since Adam is scale invariant,
        # which makes it a poor choice for parity tests
        optim = torch.optim.SGD(model.parameters(), lr=lr, momentum=0.9)
        for _ in range(num_steps):
            optim.zero_grad()
            with torch.cuda.amp.autocast(enabled=autocast):
                # Inputs are always on CUDA regardless of CPU offloading
                input = model.module.get_input(torch.device("cuda"))
                if use_pure_fp16 or (mixed_precision and not isinstance(model, FSDP)):
                    if isinstance(input, torch.Tensor):
                        input = input.half()
                    else:
                        input = tuple(x.half() for x in input)
                output = model(*input)
                # Post-forward, parameters should be back on CPU if offloading
                if cpu_offload_params and isinstance(model, FSDP):
                    for p in model.parameters():
                        self.assertEqual(p.device, torch.device("cpu"))
                loss = model.module.get_loss(input, output).to(model_device)
            loss = sharded_grad_scaler.scale(loss)

            if not mixed_precision and not use_pure_fp16:
                assert (
                    loss.dtype == torch.float32
                ), "loss data type should be float32, as the original \
                    parameter data type is float32."
            else:
                if use_pure_fp16:
                    self.assertEqual(loss.dtype, torch.float16)
                # FSDP loss is in FP16 while DDP AMP loss is in FP32
                elif isinstance(model, FSDP):
                    self.assertEqual(loss.dtype, mixed_precision.param_dtype)
                else:
                    self.assertEqual(loss.dtype, torch.float32)
            model.module.run_backward(loss)
            if norm_type is not None:
                max_norm = 0.3
                if isinstance(model, FSDP):
                    model.clip_grad_norm_(max_norm, norm_type)
                    total_norm_after_clip = _collect_total_grad_norm_fsdp(
                        model, norm_type, self.rank
                    )
                else:
                    torch.nn.utils.clip_grad_norm_(
                        model.parameters(), max_norm, norm_type
                    )
                    total_norm_after_clip = _collect_total_grad_norm_local(
                        model, norm_type
                    )
                self.assertTrue(total_norm_after_clip <= max_norm)
            # Post-backward, parameters should be back on CPU if offloading
            if cpu_offload_params and isinstance(model, FSDP):
                for p in model.parameters():
                    self.assertEqual(p.device, torch.device("cpu"))
            # Unscale the gradients and step
            sharded_grad_scaler.step(optim)
            # Update the scale factor
            sharded_grad_scaler.update()
            # If `save_model`, simulate a save + load round trip
            if save_model:
                state_dict = {k: v.clone() for k, v in model.state_dict().items()}
                # Zero the parameters so that broken save/load breaks the
                # parity test with DDP
                _zero_model(model)
                model.load_state_dict(state_dict)

        if isinstance(model, FSDP):
            model._assert_state(TrainingState_.IDLE)
        return loss.detach()

    def _test_fsdp_parity(
        self,
        model_class: Type[FSDPTestModel],
        fsdp_init_mode: FSDPInitMode,
        cuda_init_mode: CUDAInitMode,
        ref_init_fn: Optional[Callable] = None,
        num_iters: int = 2,
        save_model: bool = True,
        cpu_offload: CPUOffload = CPUOffload(),
        backward_prefetch: Optional[BackwardPrefetch] = None,
        sharding_strategy: Optional[ShardingStrategy] = None,
        mixed_precision: Optional[MixedPrecision] = None,
        forward_prefetch: bool = False,
        enable_sharded_grad_scaler: bool = False,
        use_pure_fp16: bool = False,
        norm_type: Optional[Union[float, int]] = None,
        init_kwargs: Optional[Dict[str, Any]] = None,
        **fsdp_kwargs,
    ):
        """
        Tests FSDP training against a reference, which defaults to DDP but
        may be customized with ``ref_init_fn``.

        Args:
            model_class (Type[FSDPTestModel]): A model class that inherits from
                ``FSDPTestModel``, which defines the expected interface.
            fsdp_init_mode (FSDPInitMode): The mode to initialize the
                FSDP-wrapped model. This should not be ``NO_FSDP``.
            ref_init_fn (Optional[Callable]): A callable to invoke that wraps a
                non-wrapped model to construct the reference model, where this
                wrapper should provide data parallel semantics. If ``None``,
                then the callable defaults to the DDP constructor.
        """
        assert (
            fsdp_init_mode != FSDPInitMode.NO_FSDP
        ), "Expects an FSDP init mode that wraps with FSDP"
        if init_kwargs is None:
            init_kwargs = {}
        lr = 1e-2
        rank = self.process_group.rank()
        # Establish reference behavior with DDP
        model = model_class.init(
            self.process_group,
            FSDPInitMode.NO_FSDP,
            CUDAInitMode.CUDA_BEFORE,
            deterministic=True,
            **init_kwargs,
        )
        if ref_init_fn is None:
            ref_model = DDP(model, device_ids=[rank], output_device=rank)
        else:
            ref_model = ref_init_fn(model)
        if use_pure_fp16:
            ref_model = ref_model.half()
        ref_loss = self._train_for_several_steps(
            ref_model,
            num_iters,
            autocast=mixed_precision is not None,
            lr=lr,
            fsdp_cpu_offload=cpu_offload,
            mixed_precision=mixed_precision,
            norm_type=norm_type,
            enable_sharded_grad_scaler=enable_sharded_grad_scaler,
            use_pure_fp16=use_pure_fp16,
        )
        ddp_params = list(ref_model.parameters())
        # Check against FSDP behavior
        fsdp_kwargs.update(
            {
                "cpu_offload": cpu_offload,
                "backward_prefetch": backward_prefetch,
                "sharding_strategy": sharding_strategy,
                "mixed_precision": mixed_precision,
                "forward_prefetch": forward_prefetch,
            }
        )
        try:
            fsdp_model = model_class.init(
                self.process_group,
                fsdp_init_mode,
                cuda_init_mode,
                fsdp_kwargs,
                deterministic=True,
                **init_kwargs,
            )
        except Exception as e:
            raise ValueError(f"Initializing {model_class} raised error {str(e)}") from e
        if not isinstance(fsdp_model, FSDP):
            # Enforce a top-level FSDP wrapping since the reference provides
            # data parallel semantics for the full model
            fsdp_model = FSDP(fsdp_model, self.process_group, **fsdp_kwargs)
        if use_pure_fp16:
            # Change the model parameter dtype after FSDP initialization
            fsdp_model = fsdp_model.half()
        if cuda_init_mode == CUDAInitMode.CUDA_AFTER:
            fsdp_model = fsdp_model.cuda()
        offload_params = cpu_offload is not None and cpu_offload.offload_params
        # Offloading parameters with `CUDA_AFTER` should raise an error during
        # lazy initialization due to the parameter devices not being CPU;
        # otherwise, all parameter devices should be CPU
        expects_device_error = (
            offload_params and cuda_init_mode == CUDAInitMode.CUDA_AFTER
        )
        expects_cpu_device = (
            offload_params and cuda_init_mode != CUDAInitMode.CUDA_AFTER
        )
        if expects_cpu_device:
            cpu_device = torch.device("cpu")
            for param in fsdp_model.parameters():
                self.assertEqual(param.device, cpu_device)
        context = (
            self.assertRaisesRegex(AssertionError, "Expected param to be on CPU")
            if expects_device_error
            else suppress()
        )
        with context:
            fsdp_loss = self._train_for_several_steps(
                fsdp_model,
                num_iters,
                autocast=False,
                lr=lr,
                fsdp_cpu_offload=cpu_offload,
                save_model=save_model,
                mixed_precision=mixed_precision,
                norm_type=norm_type,
                enable_sharded_grad_scaler=enable_sharded_grad_scaler,
                use_pure_fp16=use_pure_fp16,
            )
        # No need to check for parameter and loss parity if expecting an error
        if expects_device_error:
            return
        # Check that parameter devices are CPU if offloading to CPU before
        # calling `get_full_params()`, which will cast the parameters to FP32
        if offload_params:
            for param in fsdp_model.parameters():
                self.assertEqual(param.device, cpu_device)
            fsdp_loss = fsdp_loss.cuda()
        fsdp_unsharded_params = get_full_params(fsdp_model)
        torch.testing.assert_allclose(ref_loss, fsdp_loss)
        # Do not check for parameter parity when using mixed precision since
        # the reference and FSDP parameters are kept in different dtypes
        if mixed_precision is not None:
            return
        self.assertEqual(
            ddp_params,
            fsdp_unsharded_params,
            exact_device=True,
            msg="FSDP did not match DDP",
        )


class SkipModule(nn.Module):
    def __init__(self):
        super().__init__()
        self.lin = nn.Linear(10, 10, bias=False)

    def forward(self, x):
        return self.lin(x)


class NestedLinear(nn.Module):
    def __init__(self, fsdp_wrap):
        super().__init__()
        if fsdp_wrap:
            self.nested_linear = wrap(nn.Linear(10, 10, bias=False).cuda())
        else:
            self.nested_linear = nn.Linear(10, 10, bias=False).cuda()

    def forward(self, x):
        return self.nested_linear(x)


class SkipModel(nn.Module):
    def __init__(self, double_nest):
        super().__init__()
        self.linear = nn.Linear(10, 10, bias=False).cuda()
        self.linear_skip = SkipModule().cuda()
        self.nested_linear = wrap(NestedLinear(fsdp_wrap=double_nest))

    def forward(self, x):
        x = self.linear(x)
        x = self.linear_skip(x)
        x = self.nested_linear(x)
        return x


def _collect_total_grad_norm_fsdp(model, norm_type, rank):
    total_norm = _collect_total_grad_norm_local(model, norm_type)
    op = torch.distributed.ReduceOp.SUM
    if norm_type == inf:
        op = torch.distributed.ReduceOp.MAX
        norm_type = 1.0
    return_norm = torch.tensor(total_norm ** norm_type, device=rank)
    dist.all_reduce(return_norm, op=op)
    return return_norm ** (1.0 / norm_type)


def _collect_total_grad_norm_local(model, norm_type):
    if norm_type == inf:
        return max(p.grad.abs().max() for p in model.parameters())
    else:
        total_norm = 0.0
        for p in model.parameters():
            local_norm = torch.linalg.vector_norm(
                p.grad, norm_type, dtype=torch.float32
            )
            total_norm += local_norm ** norm_type
        return total_norm ** (1.0 / norm_type)