import functools
from typing import Callable, Dict, TYPE_CHECKING

import torch
import torch.distributed as dist
import torch.distributed._shard.sharding_spec as shard_spec
from torch.distributed import distributed_c10d
from torch.distributed.nn.functional import reduce_scatter
from torch.distributed._shard.common_op_utils import _register_default_op
from torch.distributed._shard.op_registry_utils import _decorator_func
from torch.utils._pytree import tree_map

if TYPE_CHECKING:
    # Only needed for type annotations; avoids a circular import at runtime.
    from torch.distributed._shard.sharded_tensor import ShardedTensor

# Registry mapping a torch function to its custom _PartialTensor implementation.
_PARTIAL_TENSOR_OPS: Dict[Callable, Callable] = {}


def _custom_partial_tensor_op(func):
    """
    Decorator for a custom partial tensor op.
    Args:
        func(Callable): Torch function for which we want to provide a PartialTensor
            implementation (ex: torch.nn.functional.linear)
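
    Example (a minimal sketch of how a handler could be registered;
    ``my_partial_linear`` is hypothetical and not part of this module)::

        @_custom_partial_tensor_op(torch.nn.functional.linear)
        def my_partial_linear(types, args=(), kwargs=None, process_group=None):
            # Apply the op to the wrapped local shard and keep the result partial.
            # (Bias handling is omitted to keep the sketch short.)
            partial_input, weight = args[0], args[1]
            local_out = torch.nn.functional.linear(partial_input._local_shard, weight)
            return _PartialTensor(local_out, process_group, partial_input._reduce_op)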
    )opZop_table)	functoolspartialr   r   )func r   K/tmp/pip-unpacked-wheel-gikjz4vx/torch/distributed/_shard/partial_tensor.py_custom_partial_tensor_op   s
    r   c                       s   e Zd ZU dZejed< ejed< ej	ed< dddgZ
dej	jfddZdd	 Zejd
dddZedddZedddZ fddZ  ZS )_PartialTensora;  
    PartialTensor is an abstraction to represent Tensors that need
    aggregation across multiple devices and multiple processes.

    PartialTensor is initialized in an SPMD like fashion where each rank
    initializes the PartialTensor. The PartialTensor object on each rank
    then only stores the local partial shard, process group and the
    aggregation way to get a full tensor.

    PartialTensor doesn't provide any Tensor like operations but is a
    wrapper providing the Tensor representing the local partial shard.

    We assume the size of each local tensor to be exactly the same.

    Users can apply custom distributed sharded computations on top of
    this primitive.

    Args:
        local_partial_shard (Tensor): Partial result stored across ranks.
        process_group (ProcessGroup): The process group to aggregate on.
        reduce_op (distributed_c10d.ReduceOp): Way to aggregate the partial result.
            Default: ``distributed_c10d.ReduceOp.SUM``

    Examples:
        >>> # All tensors below are of torch.int64 type.
        >>> # We have 2 process groups, 2 ranks.
        >>> # xdoctest: +SKIP
        >>> tensor = torch.arange(2, dtype=torch.int64) + 1 + 2 * rank
        >>> tensor = torch.cat([tensor, tensor + 2])
        >>> tensor
        tensor([1, 2, 3, 4]) # Rank 0
        tensor([3, 4, 5, 6]) # Rank 1
        >>> partial_tensor = _PartialTensor(tensor, distributed_c10d.ReduceOp.MAX)
        >>> sharding_dim = 0
        >>> collect_spec = shard_spec.ChunkShardingSpec(
                dim=sharding_dim,
                placements=[
                    "rank:0/cuda:0",
                    "rank:1/cuda:1",
                ],
            )
        >>> complete_tensor = partial_tensor.reshard(collect_spec)
        >>> complete_tensor
        ShardedTensor(
            ShardedTensorMetadata(
                shards_metadata=[
                    ShardMetadata(shard_offsets=[0], shard_sizes=[2], placement=rank:0/cuda:0),
                    ShardMetadata(shard_offsets=[2], shard_sizes=[2], placement=rank:1/cuda:1)],
                size=torch.Size([4])
        )
        >>> complete_tensor.local_tensor()
        tensor([3, 4]) # Rank 0
        tensor([5, 6]) # Rank 1

        >>> # All tensors below are of torch.int64 type.
        >>> # We have 2 process groups, 2 ranks.
        >>> tensor = torch.tensor([1, 2]) + 2 * rank
        >>> tensor = torch.cat([tensor, tensor + 2])
        >>> tensor
        tensor([1, 2, 3, 4]) # Rank 0
        tensor([3, 4, 5, 6]) # Rank 1
        >>> partial_tensor = _PartialTensor(tensor)
        >>> complete_tensor = partial_tensor.reshard(collect_spec)
        >>> complete_tensor
        ShardedTensor(
            ShardedTensorMetadata(
                shards_metadata=[
                    ShardMetadata(shard_offsets=[0], shard_sizes=[2], placement=rank:0/cuda:0),
                    ShardMetadata(shard_offsets=[2], shard_sizes=[2], placement=rank:1/cuda:1)],
                size=torch.Size([4])
        )
        >>> complete_tensor.local_tensor()
        tensor([4, 6]) # Rank 0
        tensor([8, 10]) # Rank 1
    _process_group_local_shard
_reduce_opNc                 C   sL   t jj| | |j|j| |jd}|d k	r2|nt	 |_
||_||_|S )N)dtypelayoutZ
pin_memoryrequires_grad)torchTensorZ_make_wrapper_subclasssizer   r   	is_pinnedr   r   Z_get_default_groupr   r   r   )clslocal_shardprocess_group	reduce_oprr   r   r   __new__w   s    	z_PartialTensor.__new__c                 C   s   t | jtjstdd S )Nz<reduce_op needs to be a member of distributed_c10d.ReduceOp.)
isinstancer   r   ReduceOp
ValueErrorselfr   r   r   __post_init__   s    z_PartialTensor.__post_init__r   )resharding_specreturnc                    s  ddl m} t|tjs td| j r2tdt|j	}| j
|| j
  }| j}|dkrdg|	 d  }| j
 | |d< tjj|t|dd}t| j}d}d	}	dg| j
  }
t|jD ]4\}}| |kr|}| |krd
}	||
| < q|j| j
 |d |	r, fdd|
D  tt d t | j| jd}| j
 }|dkr| jj| j
 |d}|| 
 }|
 |kr||d|| }|j|||| jdS )au  
        The reshard happens in two steps logically:

        1. Aggregate all the shards of the partial tensor.
        2. Shard this tensor according to the provided spec.

        In reality, for the sake of performance, we consolidate all partial tensors
        across multiple ranks and convert to a sharded tensor in one step.

        Args:
            resharding_spec (:class:`torch.distributed._shard.sharding_spec.ShardingSpec`):
                The specification describing how we reshard the aggregated local result.

        Returns:
            A :class:`ShardedTensor` filled with local aggregated result.
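
        Example (illustrative only; assumes 2 ranks, the default ``ReduceOp.SUM``
        and a ``ChunkShardingSpec`` over dim 0)::

            rank 0 partial shard: [1, 2, 3, 4]
            rank 1 partial shard: [10, 20, 30, 40]
            aggregated result:    [11, 22, 33, 44]
            after reshard:        rank 0 holds [11, 22], rank 1 holds [33, 44]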
        """
        from torch.distributed._shard.sharded_tensor.api import ShardedTensor

        if not isinstance(resharding_spec, shard_spec.ChunkShardingSpec):
            raise NotImplementedError("Only ChunkShardingSpec supported for reshard.")
        if self._local_shard.is_complex():
            raise NotImplementedError("Only real partial tensor supported for reshard.")

        sharding_dim = int(resharding_spec.dim)
        chunk_mode_res = self._local_shard.size(sharding_dim) % self._process_group.size()
        local_shard = self._local_shard
        # Add padding when the sharding dim is not evenly divisible by the world size.
        if chunk_mode_res != 0:
            padding = [0] * (local_shard.dim() * 2)
            padding[-1] = self._process_group.size() - chunk_mode_res
            local_shard = torch.nn.functional.pad(
                local_shard,
                tuple(padding),
                "constant",
                0,
            )
        current_rank = dist.get_rank(self._process_group)
        rank_idx = None
        rearrange_local_shards = False
        indices = [0] * self._process_group.size()
        for idx, placement in enumerate(resharding_spec.placements):
            if placement.rank() == current_rank:
                rank_idx = idx
            if placement.rank() != idx:
                rearrange_local_shards = True
            indices[placement.rank()] = idx

        local_shards = local_shard.chunk(self._process_group.size(), dim=sharding_dim)
        if rearrange_local_shards:
            # Reorder the chunks so they line up with the placement order of the spec.
            local_shards = [local_shards[idx] for idx in indices]

        # Aggregate the partial results and scatter one shard back to each rank.
        local_result = reduce_scatter(
            torch.empty_like(local_shards[0]),
            list(local_shards),
            op=self._reduce_op,
            group=self._process_group,
        )

        sharded_tensor_size = self._local_shard.size()
        # Strip the padding added above so each rank keeps its original extent.
        if chunk_mode_res != 0:
            uneven_local_shards = self._local_shard.chunk(
                self._process_group.size(), dim=sharding_dim
            )
            expected_size = uneven_local_shards[rank_idx].size()
            if local_result.size() != expected_size:
                local_result = local_result.narrow(
                    sharding_dim,
                    0,
                    expected_size[sharding_dim],
                )
        return ShardedTensor._init_from_local_tensor(
            local_result,
            resharding_spec,
            sharded_tensor_size,
            process_group=self._process_group,
        )

    @classmethod
    def __torch_function__(cls, func, types, args=(), kwargs=None):
        # Find a process group from any _PartialTensor argument.
        process_group = None

        def find_process_group(e):
            nonlocal process_group
            if process_group is None and isinstance(e, _PartialTensor):
                process_group = e._process_group

        tree_map(find_process_group, args)
        tree_map(find_process_group, kwargs)

        if func in _PARTIAL_TENSOR_OPS:
            return _PARTIAL_TENSOR_OPS[func](types, args, kwargs, process_group)

        # Need to disable all dispatch to print args and kwargs appropriately.
        guard = torch._C._DisableTorchDispatch()
        try:
            with torch._C.DisableTorchFunction():
                raise RuntimeError(
                    f"torch function '{func.__name__}', with args: {args} and "
                    f"kwargs: {kwargs} not supported for PartialTensor!"
                )
        finally:
            del guard

    @classmethod
    def __torch_dispatch__(cls, func, types, args=(), kwargs=None):
        raise RuntimeError(
            f"A {cls.__name__} object is being used from c++ "
            f"while calling {func.__module__}.{func.__name__} "
            "but there is no custom __torch_dispatch__ implementation for it."
        )

    def __repr__(self):
        return f"PartialTensor({super(_PartialTensor, self).__repr__()})"


def _transpose_impl(types, args=(), kwargs=None, process_group=None):
    partial_tensor = args[0]
    input = partial_tensor._local_shard
    dim0 = args[1]
    dim1 = args[2]
    return _PartialTensor(
        torch.transpose(input, dim0, dim1),
        process_group,
        partial_tensor._reduce_op,
    )


@_custom_partial_tensor_op(torch.Tensor.transpose)
def partial_transpose(types, args=(), kwargs=None, process_group=None):
    return _transpose_impl(types, args, kwargs, process_group)


@_custom_partial_tensor_op(torch.transpose)
def partial_torch_transpose(types, args=(), kwargs=None, process_group=None):
    return _transpose_impl(types, args, kwargs, process_group)


@_custom_partial_tensor_op(torch.cat)
def partial_cat(types, args=(), kwargs=None, process_group=None):
    input_list = args[0]
    if len(input_list) == 0:
        raise RuntimeError('Empty list of tensors to torch.cat!')

    local_shards = []
    for idx, input in enumerate(input_list):
        if not isinstance(input, _PartialTensor):
            raise RuntimeError('All inputs need to be an instance of _PartialTensor')
        if idx == 0:
            reduce_op = input._reduce_op
        elif reduce_op != input._reduce_op:
            raise RuntimeError(
                'All _PartialTensor reduce_ops need to be the same, found: '
                f'{reduce_op} and {input._reduce_op}'
            )
        local_shards.append(input._local_shard)

    if kwargs is None:
        dim = 0
    else:
        if 'out' in kwargs:
            raise RuntimeError('"out" kwarg is not supported!')
        dim = kwargs['dim'] if 'dim' in kwargs else 0

    return _PartialTensor(torch.cat(local_shards, dim), process_group, reduce_op)


# Register default tensor ops (property/metadata access) for _PartialTensor.
_register_default_op(torch.Tensor.requires_grad.__get__, _custom_partial_tensor_op)
_register_default_op(torch.Tensor.shape.__get__, _custom_partial_tensor_op)
_register_default_op(torch.Tensor.dtype.__get__, _custom_partial_tensor_op)
_register_default_op(torch.Tensor.layout.__get__, _custom_partial_tensor_op)
_register_default_op(torch.Tensor.size, _custom_partial_tensor_op)
_register_default_op(torch.Tensor.dim, _custom_partial_tensor_op)
_register_default_op(torch.Tensor.ndim.__get__, _custom_partial_tensor_op)
_register_default_op(torch.Tensor.is_contiguous, _custom_partial_tensor_op)
_register_default_op(torch.Tensor.contiguous, _custom_partial_tensor_op)