from collections import abc, defaultdict
import logging
from typing import Dict, List, Optional, Union

import torch
import torch.distributed as dist
from torch.cuda import FloatTensor  # type: ignore[attr-defined]
from torch.cuda.amp.grad_scaler import GradScaler, OptState, _MultiDeviceReplicator
from torch.distributed.distributed_c10d import ProcessGroup
from torch.optim.sgd import SGD


def _refresh_per_optimizer_state():
    return {"stage": OptState.READY, "found_inf_per_device": {}}


def _is_supported_device(tensor: torch.Tensor):
    return tensor.is_cuda or tensor.device.type in ("xla", "cpu")


class _GeneralMultiDeviceReplicator(_MultiDeviceReplicator):
    """
    Lazily serves tensor to request device. This class extends
    _MultiDeviceReplicator to allow support for "cpu" as a device.
    """

    def __init__(self, master_tensor: torch.Tensor) -> None:
        assert _is_supported_device(master_tensor)
        self.master = master_tensor
        self._per_device_tensors: Dict[torch.device, torch.Tensor] = {}


class ShardedGradScaler(GradScaler):
    """
    ShardedGradScaler helps perform gradient scaling in a shard aware manner. It extends
    functionality from GradScaler:
    * Supports PyTorch DDP and FSDP implementations
    * Supports CPU offloaded tensors (as used in fully sharded data parallel [FSDP])
    * Supports the custom Mixed Precision loss dtype (fp16, bf16) that FSDP returns
    * Syncs inf/nan for scaled gradient tensors on any torch.device (where tensors are placed) across
      nodes

    Example::

        # Creates a ShardedGradScaler once at the beginning of training.
        scaler = ShardedGradScaler()

        for epoch in epochs:
            for input, target in data:
                optimizer.zero_grad()
                output = model(input)
                loss = loss_fn(output, target)

                # Scales loss.  Calls backward() on scaled loss to create scaled gradients.
                scaler.scale(loss).backward()

                # scaler.step() first unscales gradients of the optimizer's params.
                # If gradients don't contain infs/NaNs, optimizer.step() is then called,
                # otherwise, optimizer.step() is skipped.
                scaler.step(optimizer)

                # Updates the scale for next iteration.
                scaler.update()

    See :class:`GradScaler` for explanation of scaling/unscaling and more use cases.

    Args:
        init_scale (float, optional, default=2.**16):  Initial scale factor.
        growth_factor (float, optional, default=2.0):  Factor by which the scale is multiplied during
            :meth:`update` if no inf/NaN gradients occur for ``growth_interval`` consecutive iterations.
        backoff_factor (float, optional, default=0.5):  Factor by which the scale is multiplied during
            :meth:`update` if inf/NaN gradients occur in an iteration.
        growth_interval (int, optional, default=2000):  Number of consecutive iterations without inf/NaN gradients
            that must occur for the scale to be multiplied by ``growth_factor``.
        enabled (bool, optional):  If ``False``, disables gradient scaling. :meth:`step` simply
            invokes the underlying ``optimizer.step()``, and other methods become no-ops.
            Default: ``True``
        process_group (ProcessGroup, optional, default=torch.distributed.group.WORLD):
            process group for sharding
    """

    def __init__(
        self,
        init_scale: float = 2.0 ** 16,
        backoff_factor: float = 0.5,
        growth_factor: float = 2.0,
        growth_interval: int = 2000,
        enabled: bool = True,
        process_group: Optional[ProcessGroup] = dist.group.WORLD,
    ):
        super().__init__(
            init_scale=init_scale,
            backoff_factor=backoff_factor,
            growth_factor=growth_factor,
            growth_interval=growth_interval,
            enabled=enabled,
        )
        if self._enabled:
            self.process_group = process_group
            self._per_optimizer_states = defaultdict(_refresh_per_optimizer_state)

    def scale(
        self, outputs: Union[torch.Tensor, List[torch.Tensor]]
    ) -> Union[torch.Tensor, List[torch.Tensor]]:
        if not self._enabled:
            return outputs

        if isinstance(outputs, torch.Tensor):
            assert _is_supported_device(outputs)
            if self._scale is None:
                self._lazy_init_scale_growth_tracker(outputs.device)
            assert self._scale is not None
            scaled_output = outputs * self._scale.to(device=outputs.device, non_blocking=True)
            # For the FSDP + Mixed Precision use case the loss may be fp16/bf16, so the
            # scaled loss is cast back to the dtype of ``outputs``.
            return scaled_output.type(outputs.dtype)

        stash: List[_GeneralMultiDeviceReplicator] = []

        def apply_scale(val: Union[torch.Tensor, abc.Iterable]) -> Union[torch.Tensor, abc.Iterable]:
            if isinstance(val, torch.Tensor):
                assert _is_supported_device(val)
                if len(stash) == 0:
                    if self._scale is None:
                        self._lazy_init_scale_growth_tracker(val.device)
                    assert self._scale is not None
                    stash.append(_GeneralMultiDeviceReplicator(self._scale))
                scaled_val = val * stash[0].get(val.device)
                # Keep the scaled value in the same dtype as the input (fp16/bf16 losses).
                return scaled_val.type(val.dtype)
            if isinstance(val, abc.Iterable):
                iterator = map(apply_scale, val)
                if isinstance(val, (list, tuple)):
                    return type(val)(iterator)
                return iterator
            raise ValueError("outputs must be a Tensor or an iterable of Tensors")

        return apply_scale(outputs)

    def _foreach_non_finite_check_and_unscale_cpu_(
        self, grads: List, found_inf: torch.Tensor, inv_scale: torch.Tensor
    ) -> None:
        if len(grads) == 0:
            return
        assert inv_scale.numel() == 1, "inv_scale must be a 1-element tensor."
        assert found_inf.numel() == 1, "found_inf must be a 1-element tensor."

        expected_device = grads[0].device
        for grad in grads:
            for tensor in grad:
                if tensor.device != expected_device:
                    logging.error(
                        "tensor device is %s and expected device is %s"
                        % (tensor.device, expected_device)
                    )
                    raise ValueError("Gradients must be on the same device.")

                # The fused multi-tensor kernel used on CUDA is not available for CPU
                # tensors, so each gradient is checked and unscaled individually.
                if torch.isinf(tensor).any().item() is True or torch.isnan(tensor).any().item() is True:
                    found_inf.data = torch.tensor([1.0])
                    break
                else:
                    tensor.data *= inv_scale.item()

    def _unscale_grads_(
        self, optimizer: SGD, inv_scale: torch.Tensor, found_inf: torch.Tensor, allow_fp16: bool = True
    ) -> Dict[torch.device, torch.Tensor]:
        per_device_inv_scale = _GeneralMultiDeviceReplicator(inv_scale)
        per_device_found_inf = _GeneralMultiDeviceReplicator(found_inf)

        # To set up _amp_foreach_non_finite_check_and_unscale_, split grads by device and dtype.
        per_device_and_dtype_grads = defaultdict(lambda: defaultdict(list))
        with torch.no_grad():
            for group in optimizer.param_groups:
                for param in group["params"]:
                    if param.grad is None:
                        continue
                    if (not allow_fp16) and param.grad.dtype == torch.float16:
                        raise ValueError("Attempting to unscale FP16 gradients.")
                    if param.grad.is_sparse:
                        # Coalescing scaled fp16 values can overflow, so coalesce in fp32
                        # and cast back before unscaling the coalesced _values().
                        if param.grad.dtype is torch.float16:
                            param_grad_fp32 = param.grad.type(torch.float32).coalesce()
                            param.grad = param_grad_fp32.type(torch.float16)
                        to_unscale = param.grad._values()
                    else:
                        to_unscale = param.grad

                    per_device_and_dtype_grads[to_unscale.device][to_unscale.dtype].append(to_unscale)

            for device, per_dtype_grads in per_device_and_dtype_grads.items():
                for grads in per_dtype_grads.values():
                    if grads[0].device.type == "cpu":
                        self._foreach_non_finite_check_and_unscale_cpu_(
                            grads,
                            per_device_found_inf.get(device),
                            per_device_inv_scale.get(device),
                        )
                    else:
                        torch._amp_foreach_non_finite_check_and_unscale_(
                            grads,
                            per_device_found_inf.get(device),
                            per_device_inv_scale.get(device),
                        )
        return per_device_found_inf._per_device_tensors

    def unscale_(self, optimizer: SGD) -> None:
        if not self._enabled:
            return

        self._check_scale_growth_tracker("unscale_")

        optimizer_state = self._per_optimizer_states[id(optimizer)]

        if optimizer_state["stage"] is OptState.UNSCALED:
            raise RuntimeError(
                "unscale_() has already been called on this optimizer since the last update()."
            )
        elif optimizer_state["stage"] is OptState.STEPPED:
            raise RuntimeError("unscale_() is being called after step().")

        # FP32 division can be imprecise for certain compile options, so carry out the reciprocal in FP64.
        assert self._scale is not None
        inv_scale = self._scale.double().reciprocal().float()
        found_inf = torch.full((1,), 0.0, dtype=torch.float32, device=self._scale.device)

        optimizer_state["found_inf_per_device"] = self._unscale_grads_(optimizer, inv_scale, found_inf, True)
        optimizer_state["stage"] = OptState.UNSCALED

        # Synchronize the detected inf across the ranks.
        optimizer_state = self._per_optimizer_states[id(optimizer)]
        future_handles = []

        for v in optimizer_state["found_inf_per_device"].values():
            if v.device.type == "cpu":
                v_on_cuda = v.cuda()
                future_handles.append(
                    dist.all_reduce(v_on_cuda, async_op=True, group=self.process_group).get_future()
                )
                v.copy_(v_on_cuda.cpu())
            else:
                future_handles.append(
                    dist.all_reduce(v, async_op=True, group=self.process_group).get_future()
                )

        # Make sure that the all-reduce calls are done before moving out.
        if future_handles:
            torch.futures.wait_all(future_handles)

    def step(self, optimizer, *args, **kwargs):
        return super().step(optimizer, *args, **kwargs)

    def _amp_update_scale_cpu_(self, found_inf) -> None:
        """
        If found_inf is 1.0 (True), then scale is multiplied by backoff_factor and growth_tracker is set to zero.
        Otherwise, scale is multiplied by the growth factor when the growth interval is reached.
        """
        if found_inf.item() >= 1.0:
            self._scale *= self._backoff_factor
            self._growth_tracker = 0
        else:
            successful = self._growth_tracker + 1
            if successful == self._growth_interval:
                self._scale *= self._growth_factor
                self._growth_tracker = 0
            else:
                self._growth_tracker = successful

    def update(self, new_scale: Optional[Union[float, FloatTensor]] = None) -> None:
        """
        Updates the scale factor.
        If any optimizer steps were skipped, the scale is multiplied by ``backoff_factor``
        to reduce it. If ``growth_interval`` unskipped iterations occurred consecutively,
        the scale is multiplied by ``growth_factor`` to increase it.
        Passing ``new_scale`` sets the new scale value manually. (``new_scale`` is not
        used directly; it is used to fill GradScaler's internal scale tensor. So if
        ``new_scale`` was a tensor, later in-place changes to that tensor will not further
        affect the scale GradScaler uses internally.)

        Args:
            new_scale (float or :class:`torch.cuda.FloatTensor`, optional, default=None):  New scale factor.

        .. warning::
            :meth:`update` should only be called at the end of the iteration, after ``scaler.step(optimizer)`` has
            been invoked for all optimizers used this iteration.
        """
        if not self._enabled:
            return

        _scale, _growth_tracker = self._check_scale_growth_tracker("update")

        if new_scale is not None:
            # Accept a new user-defined scale.
            if isinstance(new_scale, float):
                self._scale.fill_(new_scale)
            else:
                reason = "new_scale should be a float or a 1-element torch.cuda.FloatTensor with requires_grad=False."
                assert isinstance(new_scale, torch.cuda.FloatTensor), reason
                assert new_scale.numel() == 1, reason
                assert new_scale.requires_grad is False, reason
                self._scale.copy_(new_scale)
        else:
            # Consume the shared inf/nan data collected from optimizers to update the scale.
            found_infs = [
                found_inf.to(device=_scale.device, non_blocking=True)
                for state in self._per_optimizer_states.values()
                for found_inf in state["found_inf_per_device"].values()
            ]

            assert len(found_infs) > 0, "No inf checks were recorded prior to update."

            found_inf_combined = found_infs[0]
            if len(found_infs) > 1:
                for i in range(1, len(found_infs)):
                    found_inf_combined += found_infs[i]

            if _scale.device.type == "cpu":
                self._amp_update_scale_cpu_(found_inf_combined)
            else:
                torch._amp_update_scale_(
                    self._scale,
                    self._growth_tracker,
                    found_inf_combined,
                    self._growth_factor,
                    self._backoff_factor,
                    self._growth_interval,
                )

        # To prepare for the next iteration, clear the data collected from optimizers this iteration.
        self._per_optimizer_states = defaultdict(_refresh_per_optimizer_state)
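

# Illustrative sketch only, not upstream API: the usual gradient-clipping pattern with this
# scaler, where unscale_() is called before clipping so the threshold applies to the true
# (unscaled) gradient magnitudes. ``scaler``, ``model``, ``optimizer``, ``loss_fn``,
# ``batch`` and ``target`` are hypothetical placeholders supplied by the caller.
def _example_step_with_grad_clipping(scaler, model, optimizer, loss_fn, batch, target):
    optimizer.zero_grad()
    loss = loss_fn(model(batch), target)
    scaler.scale(loss).backward()
    # Unscale in place so clip_grad_norm_ sees unscaled gradients. For an FSDP-wrapped
    # model, model.clip_grad_norm_ is preferable because gradients are sharded across ranks.
    scaler.unscale_(optimizer)
    torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
    # step() skips optimizer.step() when any unscaled gradient contains inf/NaN,
    # and update() then backs off the scale for the next iteration.
    scaler.step(optimizer)
    scaler.update()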