U
    Jc                     @   sb   d dl Z d dlmZ d dlmZ d dlmZ d dlmZ e j	e j
j	e j
jgZG dd de j
ZdS )    N)ShardedTensor)distributed_c10d)get_default_nowrap_functionsc                       sp   e Zd ZU dZejed< dgZdddZdd Z	 fdd	Z
edddZedddZdd Zdd Z  ZS )ReplicatedTensora   
    ReplicatedTensor represents a tensor which is replicated across the `world_size` and
    has the same value on each rank.

    ReplicatedTensor is a :class:`~torch.Tensor` subclass, and it could be used together
    with ShardedTensor/Tensor together to express different types of computation. The
    inter-op rules defined as (using torch.add as an example op):
        ReplicatedTensor + ReplicatedTensor = ReplicatedTensor
        ReplicatedTensor + torch.Tensor = torch.Tensor
        ReplicatedTensor + ShardedTensor = ShardedTensor
        ReplicatedTensor + other type (i.e. Scalar) = other type

    NOTE: We do not gurantee equal content of ReplicatedTensor across nodes after its
    construction. Although we defined proper inter-op rules to make sure ReplicatedTensor
    stays the same, there's no enforcement on it (i.e. if you manually modify content on
    some ranks, the modified value will not automatically get synced to other nodes). If
    you wish to manually validate tensors are the same across ranks, use `validate()`.

    _process_groupNc                 C   s>   |d krt d}t j| ||j}|d k	r0|nt |_|S )Nr   )torchemptyTensorZ_make_subclassrequires_gradr   Z_get_default_groupr   )clsdataZprocess_groupr r   N/tmp/pip-unpacked-wheel-gikjz4vx/torch/distributed/_shard/replicated_tensor.py__new__(   s    
zReplicatedTensor.__new__c                 C   sH   t | |kr|t |  S t| | jjtjd| j}||t | < |S d S )N)Zmemory_format)idtyper   cloner   Zpreserve_formatr   )selfmemoresultr   r   r   __deepcopy__3   s
    zReplicatedTensor.__deepcopy__c                    s   dt t|   dS )NzReplicatedTensor())superr   __repr__r   	__class__r   r   r   ;   s    zReplicatedTensor.__repr__r   c              
      s  d kri d dd  fdd}D ]}||\}}|r4|  S q4d k	r  D ] \}	}
||
\}}|rb|  S qbtj v }t kr|W  5 Q R  S t|tjot|t } pڈoڈtk}|r|r|	t}|_
|W  5 Q R  S Q R X d S )NTc                    st   t | trd| fS t | trVd kr8| jqp| jkrptdj dnt | tjrldd nd dS )NTzOReplicatedTensor operands must be in the same process group in torch function 'zQ', but found at least two ReplicatedTensor operands in different process groups! F)FN)	
isinstancer   __torch_function__r   r   RuntimeError__name__r   r	   )argZall_replicatedargsfunckwargsZreplicated_pgZreplicated_with_non_tensortypesr   r   dispatch_argJ   s    


z9ReplicatedTensor.__torch_function__.<locals>.dispatch_arg)itemsr   _CDisableTorchFunctionr   r   r	   r   %_REPLICATED_WITH_NON_TENSOR_ALLOWLISTZas_subclassr   )r   r%   r'   r$   r&   r(   r"   ZredispatchedreskvrsZresult_not_replicatedZshould_convert_to_replicatedr   r#   r   r   >   s6    





z#ReplicatedTensor.__torch_function__)returnc                    sv   t  j}t  j} fddt|D }t j|  jd t|D ](\}}t |sHt	d| d| qHdS )a  
        Validate the ReplicatedTensor is legit by all gathering tensors on all ranks
        and check to make sure they are the same.

        If there's some ranks with different values, a ValueError will be raised.

        Keyword args:
            process_group (ProcessGroup, optional): The process group to work on. If None,
                the default process group will be used.

        Returns:
            True if validation succeed.
        c                    s   g | ]}t  qS r   )r   Z
empty_like).0_r   r   r   
<listcomp>   s     z-ReplicatedTensor.validate.<locals>.<listcomp>)groupz/ReplicatedTensor have different values on rank z and T)
distZget_world_sizer   Zget_rankrangeZ
all_gather	enumerater   Zallclose
ValueError)r   Z
world_sizeZcurrent_rankZtensors_on_rankZrankZtensorr   r   r   validate   s    zReplicatedTensor.validatec              	   C   s<   t j ( || _|j| _ddlm} | | _W 5 Q R X d S )Nr   )_get_current_process_group)r   r*   r+   r   r
   Ztorch.distributed._shard.apir;   r   )r   stater;   r   r   r   __setstate__   s
    zReplicatedTensor.__setstate__c                 C   s   | j S )N)r   r   r   r   r   __getstate__   s    zReplicatedTensor.__getstate__)NN)r   N)r!   
__module____qualname____doc__r   ZProcessGroup__annotations__	__slots__r   r   r   classmethodr   boolr:   r=   r>   __classcell__r   r   r   r   r      s   


Dr   )r   Ztorch.distributedZdistributedr6   Z+torch.distributed._shard.sharded_tensor.apir   r   Ztorch.overridesr   Z	unsqueezer	   __getitem__r,   r   r   r   r   r   <module>   s   