U
    Jc                     @   s   d dl mZ d dlZd dlZd dlmZ d dlZd dlZd dl	m
Z ddlmZ d dl	mZ ddd	gZeeZdd
dZdddZdd Zdd ZG dd deZeejejjej dddZeejejjej ddd	ZdS )    )defaultdictN)Dict   )default_hooks)distributed_c10dPowerSGDStatepowerSGD_hookbatched_powerSGD_hookc              
   C   s   t | jdkr"| jd | jd ks&t| jd }| jd }| j}|dksX|tjtjfkrft| |d n&tjj	| | tj
|||| j|dfd dS )	z
    Decide between Gram-Schmidt or QR factorization to orthogonalize a batch of matrices.
    QR factorization doesn't work with half-precision, but it is usually faster with a rank > 2.
          r   r   )epsilondevicedtypeoutN)lenshapeAssertionErrorr   torchZfloat16Zbfloat16_orthogonalize_gram_schmidtZlinalgZqremptyr   )matricesr   Znum_matricesZrankr    r   ]/tmp/pip-unpacked-wheel-gikjz4vx/torch/distributed/algorithms/ddp_comm_hooks/powerSGD_hook.py_orthogonalize   s    &

r   c              	   C   s   | j d }t|D ]}| dddd||d f }|dkrz|tj|ddd }W q tk
r|   td |d Y qX n|tj|ddd|  }|d |k r| dddd|d df }|tj|| ddd| 8 }qdS )	z
    Applies Gram-Schmidt procedure to orthogonalize a batch of matrices.
    If epsilon is 0, this is equivalent to `torch.qr(matrices, out=(matrices, _))`,
    r   Nr   r   T)ZdimZkeepdimzThe matrices to be orthogonalized has at least a column of all 0s. Please set a small value such as 1e-8 as `orthogonalization_epsilon` in PowerSGD state.g        )	r   ranger   ZnormZeroDivisionErrorloggererrorfill_sum)r   r   num_colsicolrestr   r   r   r   )   s    
r   c                 C   s&   | | }| | | }|| |k ||fS )a  
    Returns a recommendation as to whether the 2D tensor described by the arguments is worth compressing,
    including statistics describing the expected savings from compression.  We consider a tensor worth
    compressing when ``min_compression_rate`` < uncompressed size / compressed size, where
    uncompressed size = ``num_rows`` * ``num_cols``,
    and compressed size = (``num_rows`` + ``num_cols``) * ``matrix_approximation_rank``.

    The result of this function is a tuple of the form (compression_recommendation, uncompressed_el_count, compressed_el_count), where:

    compresion_recommendation is true if the tensor is worth compressing, and false otherwise (see above);

    uncompressed_el_count is the uncompressed element count, i.e. ``num_rows`` * ``num_cols``; and,

    compress_el_count is the element count after compression, i.e. (``num_rows`` + ``num_cols``) * ``matrix_approximation_rank``.
    r   )Znum_rowsr"   matrix_approximation_rankmin_compression_rateuncompressed_sizeZcompressed_sizer   r   r   _should_compressI   s    
r)   c              	   C   sR   |   rN|j|jkrN| }td|j|d |d |d  |j|j |_dS )zy
    Report compression stats at the frequency of `compression_stats_logging_frequency` specified in PowerSGD state.
    z\Compression stats: iter {}, total before compression {}, total after compression {}, rate {}r   r   r   N)is_lastiternext_stats_reportcompression_statsr   infoformat#compression_stats_logging_frequency)bucketstatestatsr   r   r   _report_compression_statsd   s    
   r4   c                   @   sf   e Zd ZdZdddddddd	d
ddddddddgZd&edddZdd Zdd  Zd!d" Z	d#d$ Z
d%S )'r   ah  
    Stores both the algorithm's hyperparameters and the internal state for all the gradients during the training.
    Particularly, ``matrix_approximation_rank`` and ``start_powerSGD_iter`` are the main hyperparameters that should be tuned by the user.
    For performance, we suggest to keep binary hyperparameters ``use_error_feedback`` and ``warm_start`` on.

    1. ``matrix_approximation_rank`` controls the size of compressed low-rank tensors, which determines the compression rate. The lower the rank, the stronger the compression.

        1.1. If ``matrix_approximation_rank`` is too low, the full model quality will need more training steps to reach or will never reach and yield loss in accuracy.

        1.2. The increase of ``matrix_approximation_rank`` can substantially increase the computation costs of the compression, and the accuracy may not be futher improved beyond a certain ``matrix_approximation_rank`` threshold.

    To tune ``matrix_approximation_rank``, we suggest to start from 1 and increase by factors of 2 (like an expoential grid search, 1, 2, 4, ...), until a satisfactory accuracy is reached. Typically only a small value 1-4 is used. For some NLP tasks (as shown in Appendix D of the original paper), this value has been increased to 32.

    2. ``start_powerSGD_iter`` defers PowerSGD compression until step ``start_powerSGD_iter``, and vanilla allreduce runs prior to step ``start_powerSGD_iter``. This hybrid scheme of **vanilla allreduce + PowerSGD** can effectively improve the accuracy, even a relatively small ``matrix_approximation_rank`` is used. This is because that, the beginning of training phase is usually very sensitive to inaccurate gradients, and compressing gradients too early may make the training quickly take a suboptimal trajectory, which can result in an irrecoverable impact on the accuracy.

    To tune ``start_powerSGD_iter``, we suggest to start with 10% of total training steps, and increase it until a satisfactory accuracy is reached. If there is a warm-up stage in the training, ``start_powerSGD_iter`` typically should be no less than the number of warm-up steps.

    3. ``min_compression_rate`` is the minimum compression rate required when a layer is compressed. Due to the computation overheads incurred by the compression, a tensor is worth compressing only if there can be sufficient saving in bandwidth, where ``(num_rows + num_cols) * matrix_approximation_rank * min_compression_rate < num_rows * num_cols``. If the specified compression rate threshold cannot be satisfied, the tensor will be directly allreduced without compression.

    Compression statistics are logged every ``compression_stats_logging_frequency`` iterations once PowerSGD compression starts.

    4. ``orthogonalization_epsilon`` can be a very small value (e.g., 1e-8) added to every normalized matrix column in orthogonalization step, to prevent div-by-zero error if any column has all 0s. If this can already be prevented (e.g., by batch normalization), an epsilon of 0 is recommended for accuracy.

    5. ``batch_tensors_with_same_shape`` controls whether to compress and decompress tensors with same shape in a batched operation to achieve higher parallelism. Note that you should also increase the bucket size (i.e., ``bucket_cap_mb`` arg in DDP constructor) to make more same-shaped tensors appear in the same bucket, however this may reduce the overlap between computation and communication, and increase the memory footprint due to stacking the tensors of the same shape. Set to ``True`` if the compression / decompression computation is a bottleneck.

    .. warning ::
        If error feedback or warm-up is enabled, the minimum value of ``start_powerSGD_iter`` allowed in DDP is 2.
        This is because there is another internal optimization that rebuilds buckets at iteration 1 in DDP,
        and this can conflict with any tensor memorized before the rebuild process.
    process_groupr&   start_powerSGD_iterr'   orthogonalization_epsilonuse_error_feedback
warm_startbatch_tensors_with_same_shaperng
error_dictp_memory_dictq_memory_dictr+   total_numel_before_compressiontotal_numel_after_compressionr0   r,   r     r   Tr   '  F)r:   c                 C   s   t d||||||||	|
	 || _|| _|s4|rD|dkrDtd|| _|| _|| _|| _	|| _
tj|| _i | _i | _i | _d| _d| _d| _td|	| _d| _|
| _d S )Na  PowerSGD config: matrix_approximation_rank = {}; start_powerSGD_iter = {}; min_compression_rate = {}; orthogonalization_epsilon = {}; use_error_feedback = {}; warm_start = {}; random_seed = {}; compression_stats_logging_frequency = {}; batch_tensors_with_same_shape = {}r   zExpect `start_powerSGD_iter` > 1 if `use_error_feedback` or `warm_start` is enabled, because PowerSGD can only be applied after the first two iterations in DDP.r   )r   r.   r/   r5   r&   
ValueErrorr6   r'   r8   r9   r7   nprandomZRandomStater;   r<   r=   r>   r+   r?   r@   maxr0   r,   r:   )selfr5   r&   r6   r'   r8   r9   r7   Zrandom_seedr0   r:   r   r   r   __init__   sJ    	 zPowerSGDState.__init__c                    s   t d  fdd jD S )z
        Returns a ``Dict[str, Any]`` which will be pickled and saved.
        ``process_group`` is not serializable and excluded from
        a returned state.
        zHNOTE: Process group is not serializable and excluded from a saved state.c                    s    i | ]}|d kr|t  |qS )r5   )getattr).0slotrG   r   r   
<dictcomp>  s     z.PowerSGDState.__getstate__.<locals>.<dictcomp>)r   warning	__slots__rL   r   rL   r   __getstate__  s    
zPowerSGDState.__getstate__c                 C   s6   t  | _td | D ]\}}t| || qdS )zz
        Takes a provided ``state`` and retrieves ``PowerSGDState``.
        ``process_group`` is set to default.
        zNOTE: Process group will be set to a default group (i.e. the world size).                If a different group is desired, please set `self.process_group` after PowerSGD state is loaded.N)r   Z_get_default_groupr5   r   rN   itemssetattr)rG   r2   rK   valuer   r   r   __setstate__  s    
zPowerSGDState.__setstate__c                 C   s8   |  r|  jd7  _| j| jkr4td| j d S )Nr   z,Start to apply PowerSGD after {} iterations.)r*   r+   r6   r   r.   r/   )rG   r1   r   r   r   maybe_increase_iter'  s    
z!PowerSGDState.maybe_increase_iterc                 C   s(   | j dkr| j| j  nd}|| j| j fS )a  
        Returns the latest compression statistics as a tuple of the form (compress_rate, numel_before_compression, numel_after_compression), where:

        compress_rate is the effective compression rate i.e. (number of elements before compression) / (number of elements after compression);

        numel_before_compression is the total number of elements before compression was applied; and,

        numel_after_compression is the total number of elements after compression was applied.
        r   )r@   r?   )rG   Zcompress_rater   r   r   r-   2  s    zPowerSGDState.compression_statsN)	r   rA   r   TTr   r   rB   F)__name__
__module____qualname____doc__rO   boolrH   rP   rT   rU   r-   r   r   r   r   r   t   sF             _)r2   r1   returnc                    s6  	j }|dk	r|ntjj   	j	jk rL	  t	
S jj}  djd }	jrĈ	jkr	j  n&td| tj||d	j< t   }g g  
d}d}|D ]}||jd d}	|	j\}
}t|
|	j}t|
||	j}	 j|d 7  _|d rr
 |	 ||
| 7 }||| 7 }	 j!|d 7  _!q | 	 j!|d 7  _!qt" 	 rt#dd	 D ntj$g |d}d
}	j%r܈	j&kr&d}	j%rtd|| tj'||d	j&< tj'||d	j(< t)t*
D ]}|j  | q2	fdd}g 
g g d}d}| D ]}|j\}}
}t|
|	j}
 |  	j& ||||
 |   ||
|  	j( |||| |   ||| |||
 | 7 }||| | 7 }qr|s:D ]}t+|	j, q$n^tj-j.g dJ t/	j01d D ],}|2tj3|jd|d t+|	j, q`W 5 Q R X t4
D ]\}}}tj5|||d qtj6|dd7 }	fdd}	
fdd} 	
fdd}|8|8|8|S )aL  
    This DDP communication hook implements PowerSGD gradient compression
    algorithm described in the `paper <https://arxiv.org/abs/1905.13727>`_.
    Once gradient tensors are aggregated across all workers, this hook applies
    compression as follows:

    1. Views the input flattened 1D gradient tensor as a list of per-parameter tensors, and divides all the tensors into two groups:

        1.1 The tensors that should be compressed before allreduce, because the compression can give enough saving in bandwidth.

        1.2 Rest of the tensors will be directly allreduced without compression, including all the vector tensors (for biases).

    2. Handles uncompressed tensors:

        2.1. Allocate contiguous memory for those uncompressed tensors, and allreduces all the uncompressed tensors as a batch, without compression;

        2.2. Copies the individual uncompressed tensors from the contiguous memory back to the input tensor.

    3. Handles the tensors that should be compressed by PowerSGD compression:

        3.1. For each tensor M, creates two low-rank tensors P and Q for decomposing M,
        such that M = PQ^T, where Q is initialized from a standard normal distribution and orthogonalized;

        3.2. Computes each P in Ps, which is equal to MQ;

        3.3. Allreduces Ps as a batch;

        3.4. Orthogonalizes each P in Ps;

        3.5. Computes each Q in Qs, which is approximately equal to M^TP;

        3.6. Allreduces Qs as a batch;

        3.7. Computes each M among all the compressed tensors, which is approximately equal to PQ^T.

    Note that this communication hook enforces vanilla allreduce for the first ``state.start_powerSGD_iter`` iterations.
    This not only gives the user more control over the tradeoff between speedup and accuracy,
    but also helps abstract away some complexity of the internal optimization of DDP for future communication hook developers.

    Args:
        state (PowerSGDState): State information to configure the compression rate and support error feedback, warm start, etc.
            To tune the compression configs, mainly need to tune ``matrix_approximation_rank``, ``start_powerSGD_iter``
            and ``min_compression_rate``.
        bucket (dist.GradBucket): Bucket that stores a 1D flattened gradient tensor that batches multiple per-variable tensors.
            Note that since DDP comm hook only supports single process single device mode,
            only exactly one tensor is stored in this bucket.

    Returns:
        Future handler of the communication, which updates the gradients in place.

    Example::
        >>> # xdoctest: +SKIP
        >>> state = PowerSGDState(process_group=process_group, matrix_approximation_rank=1,
                                  start_powerSGD_iter=10, min_compression_rate=0.5)
        >>> ddp_model.register_comm_hook(state, powerSGD_hook)
    Nr   BA zero tensor of length {} that represents local error is created.r   r   r   c                 S   s   g | ]}| d qS )r]   )view)rJ   tensorr   r   r   
<listcomp>  s     z!powerSGD_hook.<locals>.<listcomp>FTzXAllocating contiguous memory of length {} for Ps, and of length {} for Qs, respectively.c                  3   s^      D ]P} jrBt| }|dkr4| d dV  qXt| V  q| D ]}|dV  qFqd S )Nr   r   )valuesr:   r   Z	unsqueezer   stack)tensors
batch_sizer_   )shape_to_tensorsr2   r   r   !maybe_batched_tensors_to_compress  s    z8powerSGD_hook.<locals>.maybe_batched_tensors_to_compressZdevices ʚ;cpur   groupZasync_opc                    sn   |   d }d}D ]0}|||||   | || 7 }qtjj  dd 	 d S )Nr   Trj   )
rS   div_copy_ZnumelZview_asdist
all_reducer=   
get_futurewait)futuncompressed_tensors_memoryidxr_   )bucket_indexgroup_to_user2   uncompressed_tensors
world_sizer   r   ,unpack_uncompressed_tensors_and_allreduce_ps/  s      zCpowerSGD_hook.<locals>.unpack_uncompressed_tensors_and_allreduce_psc                    sv   |   j < D ]}t|j qtD ]"\}}}tj|dd||d q0tj	j
  dd  d S )Nr   r   r   Trj   r   )rS   r=   r   r7   zipr   bmm	transposern   ro   r>   rp   rq   )rr   pr_   q)ru   rv   psqsr2   tensors_to_compressr   r   
compute_qsA  s      z!powerSGD_hook.<locals>.compute_qsc                    s   |   
j< t	D ]"\}}}tj||dd|d q jr	D ]F}|jd dkrbqN|jdd   }t	|D ]\}}|
||  q|qNtj rtj jr j< jsڈj  j    S )Nr   r   r   r   )rS   rl   r>   rz   r   r{   r|   r:   r   	enumeraterm   cudais_availablesynchronizer8   r<   r9   r=   clearrU   )rr   r}   r~   r_   Zoriginal_tensorsr#   Zoriginal_tensor)r1   ru   r   input_tensorinput_tensor_cpr   r   re   r2   r   rx   r   r   
decompressW  s&    



z!powerSGD_hook.<locals>.decompress)9r5   rn   rk   WORLDsizebufferr+   r6   rU   default_allreduce_futr   r   indexr   r8   r<   add_r   r.   r/   r   zerosclonedetachZ	gradientsr^   minr&   r)   r'   r?   appendr@   r4   catr_   r9   r=   r   r>   r   listr   r7   rE   fork_rngmanual_seedr;   randintrm   randnrz   r{   ro   rp   then)r2   r1   r5   r   total_lengthrc   Ztotal_Ps_sizeZtotal_Qs_sizer_   matrixnmr&   Zcompress_testrs   Zneed_randomize_qsrf   Zp_idxZq_idxrd   r~   r}   Z-allreduce_contiguous_uncompressed_tensors_futry   r   r   r   )r1   ru   r   rv   r   r   r   r   re   r2   r   rw   rx   r   r   H  s   ;


  

   




     

        
 c              	      s  j }|dk	r|ntjj 
  jjk rL  t	
S jjd 	 j	7  _tt	 jj d 7  _d }| 	| d t    djr&jkrj  n(td| tj|jdj< t ! "j#rFj$krj#rbtdj fdd}|d	j%d
j$< |dj%d
j&< t'j&  tj(j& j$ d tj)j$ dd* }fdd} 	
f	dd}|+|+|S )a
  
    This DDP communication hook implements a simplified PowerSGD gradient compression
    algorithm described in the `paper <https://arxiv.org/abs/1905.13727>`_.
    This variant does not compress the gradients layer by layer,
    but instead compresses the flattened input tensor that batches all the gradients.
    Therefore, it is **faster** than :meth:`powerSGD_hook`,
    but usually results in a **much lower accuracy**, unless ``matrix_approximation_rank`` is 1.

    .. warning ::
        Increasing ``matrix_approximation_rank`` here may not necessarily increase the accuracy,
        because batching per-parameter tensors without column/row alignment can destroy low-rank structure.
        Therefore, the user should always consider :meth:`powerSGD_hook` first,
        and only consider this variant when a satisfactory accuracy can be achieved when ``matrix_approximation_rank`` is 1.

    Once gradient tensors are aggregated across all workers, this hook applies
    compression as follows:

    1. Views the input flattened 1D gradient tensor as a square-shaped tensor M with 0 paddings;

    2. Creates two low-rank tensors P and Q for decomposing M, such that M = PQ^T, where Q is initialized from a standard normal distribution and orthogonalized;

    3. Computes P, which is equal to MQ;

    4. Allreduces P;

    5. Orthogonalizes P;

    6. Computes Q, which is approximately equal to M^TP;

    7. Allreduces Q;

    8. Computes M, which is approximately equal to PQ^T.

    9. Truncates the input tensor to the original length.

    Note that this communication hook enforces vanilla allreduce for the first ``state.start_powerSGD_iter`` iterations.
    This not only gives the user more control over the tradeoff between speedup and accuracy,
    but also helps abstract away some complexity of the internal optimization of DDP for future communication hook developers.

    Args:
        state (PowerSGDState): State information to configure the compression rate and support error feedback, warm start, etc.
            To tune the compression configs, mainly need to tune ``matrix_approximation_rank`` and ``start_powerSGD_iter``.
        bucket (dist.GradBucket): Bucket that stores a 1D flattened gradient tensor that batches multiple per-variable tensors.
            Note that since DDP comm hook only supports single process single device mode,
            only exactly one tensor is stored in this bucket.

    Returns:
        Future handler of the communication, which updates the gradients in place.

    Example::
        >>> # xdoctest: +SKIP
        >>> state = PowerSGDState(process_group=process_group, matrix_approximation_rank=1)
        >>> ddp_model.register_comm_hook(state, batched_powerSGD_hook)
    Nr   r   r\   r   zLInitializing low-rank tensors P and Q, each of which has a shape of {} x {}.c              
      sn   | rTt jjg d: t |d t jjdjd W  5 Q R  S Q R X nt j	j jdS dS )zOReturns a low-rank 2D tensor of square_side_length * matrix_approximation_rank.rg   rh   ri   r   N)
r   rE   r   r   r   r   r&   r   tor   Zfill_random_valuesr;   )r   r   square_side_lengthr2   r   r   create_low_rank_tensor  s"    z5batched_powerSGD_hook.<locals>.create_low_rank_tensorFr   Tr   rj   c                    sb   |   d j < tj   tj j  j  d tjj  dd	 
 d S )Nr   r   Trj   )rS   r=   r   r   matmultr>   rn   ro   rp   rq   )rr   )ru   rv   r   r2   r   r   	compute_q  s      z(batched_powerSGD_hook.<locals>.compute_qc                    s   |   j< tjj j  d jrH j< tj	
 r^tj	 jsxj  j  }  |S )Nr   )rS   rl   r>   r   r   r=   r   r8   r<   r   r   r   r9   r   resize_rU   )rr   ret)	r1   ru   r   r   r   r   r2   r   rx   r   r   r   3  s     




z)batched_powerSGD_hook.<locals>.decompress),r5   rn   rk   r   r   r   r+   r6   rU   r   r   r   r   r?   mathceilsqrtr@   r&   r   r    r4   r   r8   r<   r   r   r.   r/   r   r   r   r   r   r^   r9   r=   r;   r>   r   r   ro   rp   r   )r2   r1   r5   Zpadded_total_lengthr   Zallreduce_p_futr   r   r   )r1   ru   r   rv   r   r   r   r   r2   r   rx   r   r	   ~  s    9




         
)r   )r   ) collectionsr   loggingr   typingr   ZnumpyrD   r   Ztorch.distributedZdistributedrn    r   r   r   __all__	getLoggerrV   r   r   r   r)   r4   objectr   Z
GradBucketZfuturesZFutureZTensorr   r	   r   r   r   r   <module>   s:     


  V   9 