from __future__ import annotations

import copy
import threading
import weakref
from dataclasses import dataclass
from functools import reduce
from typing import Callable, Dict, List, Optional, Sequence, Tuple, cast

import torch
import torch.distributed as dist
import torch.distributed._shard.sharding_spec as shard_spec
from torch.distributed import distributed_c10d, rpc
from torch.distributed._shard.metadata import ShardMetadata
from torch.distributed._shard.sharding_spec.api import (
    _dispatch_custom_op,
    _has_custom_op,
)
from torch.distributed._shard.sharding_spec._internals import (
    check_tensor,
    validate_non_overlapping_shards_metadata,
)
from torch.distributed.remote_device import _remote_device
from torch.utils._pytree import tree_map

from .metadata import TensorProperties, ShardedTensorMetadata
from .shard import Shard
from .reshard import reshuffle_local_shard, reshard_local_shard
from .utils import (
    _flatten_tensor_size,
    _parse_and_validate_remote_device,
    _validate_output_tensor_for_gather,
    build_metadata_from_local_shards,
    build_global_metadata,
)

_sharded_tensor_lock = threading.Lock()
_sharded_tensor_current_id = 0
_sharded_tensor_map: Dict[int, "weakref.ReferenceType[ShardedTensor]"] = {}

_SHARDED_OPS: Dict[Callable, Callable] = {}
_CUSTOM_SHARDED_OPS: Dict[Callable, Callable] = {}


def _register_remote_shards(
    sharded_tensor_id: int, rrefs: List[rpc.RRef[Shard]], rpc_rank: int
):
    with _sharded_tensor_lock:
        if sharded_tensor_id not in _sharded_tensor_map:
            raise RuntimeError(
                f"Could not find sharded_tensor_id: {sharded_tensor_id} "
                f"in map: {_sharded_tensor_map.keys()}"
            )

        sharded_tensor = _sharded_tensor_map[sharded_tensor_id]()
        if sharded_tensor is None:
            raise RuntimeError("ShardedTensor weakref has been deallocated")
        else:
            sharded_tensor._register_remote_shards(rrefs, rpc_rank)

class ShardedTensorBase(torch.Tensor):
    _sharding_spec: shard_spec.ShardingSpec
    _metadata: ShardedTensorMetadata
    _local_shards: List[Shard]

    def __new__(cls, sharding_spec: shard_spec.ShardingSpec, *size, **kwargs):
        # Use __new__ for logging purposes.
        torch._C._log_api_usage_once("torch.distributed._shard.sharded_tensor")

        # Check sharding spec and build sharded tensor metadata.
        if not isinstance(sharding_spec, shard_spec.ShardingSpec):
            raise ValueError(f"Expecting ShardingSpec but got: {type(sharding_spec)}")

        sizes = _flatten_tensor_size(size)
        dtype = kwargs["dtype"]
        layout = kwargs["layout"]
        pin_memory = kwargs["pin_memory"]
        requires_grad = kwargs["requires_grad"]

        if dtype is None:
            dtype = torch.get_default_dtype()

        tensor_properties = TensorProperties(
            dtype=dtype,
            layout=layout,
            requires_grad=requires_grad,
            pin_memory=pin_memory,
        )
        sharded_tensor_metadata = sharding_spec.build_metadata(
            sizes, tensor_properties=tensor_properties
        )

        r = torch.Tensor._make_wrapper_subclass(
            cls,
            sizes,
            dtype=dtype,
            layout=layout,
            pin_memory=pin_memory,
            requires_grad=requires_grad,
        )
        # Set sharding spec, metadata and local shards on the wrapper subclass.
        r._sharding_spec = sharding_spec
        r._metadata = sharded_tensor_metadata
        r._local_shards = []
        return r

    def metadata(self) -> ShardedTensorMetadata:
        """
        Returns a :class:`ShardedTensorMetadata` object corresponding to the
        metadata for the entire tensor.
        """
        return self._metadata

    def local_shards(self) -> List[Shard]:
        """
        Returns a list of :class:`Shard` corresponding to the
        local shards for this rank. Returns an empty list if the current rank
        does not host any shards for this Tensor.
        """
        return self._local_shards

    @classmethod
    def _init_from_local_shards_and_global_metadata(
        cls,
        local_shards: List[Shard],
        sharded_tensor_metadata: ShardedTensorMetadata,
        sharding_spec=None,
    ) -> "ShardedTensor":
        """
        Initialize a ShardedTensorBase with local shards and a global
        ShardedTensorMetadata built on each rank.
        Warning: This API is experimental and subject to change. It does
                 not do cross rank validations, and fully relies on the user
                 for the correctness of sharded_tensor_metadata on each rank
        """
        shards_metadata = sharded_tensor_metadata.shards_metadata
        tensor_properties = sharded_tensor_metadata.tensor_properties

        if len(shards_metadata) == 0:
            raise ValueError("shards_metadata must not be empty!")

        if tensor_properties.layout != torch.strided:
            raise ValueError("Only torch.strided layout is currently supported")

        if sharding_spec is None:
            spec = shard_spec._infer_sharding_spec_from_shards_metadata(shards_metadata)
        else:
            spec = sharding_spec

        sharded_tensor_base = ShardedTensor.__new__(
            ShardedTensor,
            spec,
            sharded_tensor_metadata.size,
            dtype=tensor_properties.dtype,
            layout=tensor_properties.layout,
            pin_memory=tensor_properties.pin_memory,
            requires_grad=tensor_properties.requires_grad,
        )

        def _raise_if_mismatch(expected, actual, prop_name, rank, is_property=False):
            tensor_property_or_metadata = (
                "tensor property" if is_property else "local ShardMetadata"
            )
            if expected != actual:
                raise ValueError(
                    f"Local shards' tensor {prop_name} property is incompatible with "
                    f"{tensor_property_or_metadata} on rank {rank}: "
                    f"{tensor_property_or_metadata} {prop_name}={expected}, "
                    f"local shard tensor {prop_name}={actual}."
                )

        # Validate each local shard against the global metadata.
        for shard in local_shards:
            shard_meta = shard.metadata
            local_shard_tensor = shard.tensor
            placement = shard_meta.placement
            assert placement is not None, "Must specify placement for `Shard`!"
            rank = placement.rank()
            local_device = placement.device()

            if not local_shard_tensor.is_contiguous():
                raise ValueError(
                    "Only torch.contiguous_format memory_format is currently supported"
                )
            _raise_if_mismatch(
                shard_meta.shard_sizes, list(local_shard_tensor.size()), "size", rank
            )
            _raise_if_mismatch(
                tensor_properties.pin_memory,
                local_shard_tensor.is_pinned(),
                "pin_memory",
                rank,
                True,
            )
            _raise_if_mismatch(local_device, local_shard_tensor.device, "device", rank)
            _raise_if_mismatch(
                tensor_properties.dtype, local_shard_tensor.dtype, "dtype", rank, True
            )
            _raise_if_mismatch(
                tensor_properties.requires_grad,
                local_shard_tensor.requires_grad,
                "requires_grad",
                rank,
                True,
            )

        # Validate the overall shards_metadata: shards must not overlap and must
        # cover the entire global tensor.
        validate_non_overlapping_shards_metadata(shards_metadata)
        check_tensor(shards_metadata, list(sharded_tensor_metadata.size))

        # Done validation, attach local_shards.
        sharded_tensor_base._local_shards = local_shards
        return sharded_tensor_base

    @classmethod
    def __torch_dispatch__(cls, func, types, args=(), kwargs=None):
        raise RuntimeError(
            f"A {cls.__name__} object is being used from c++ "
            f"while calling {func.__module__}.{func.__name__} "
            "but there is no custom __torch_dispatch__ implementation for it."
        )

class ShardedTensor(ShardedTensorBase):
    """
    ShardedTensor is a torch.Tensor subclass to represent Tensors that are sharded
    across multiple devices and multiple processes.

    ShardedTensor is initialized in an SPMD like fashion where each rank
    initializes the ShardedTensor. The ShardedTensor object on each rank
    then only stores the local shard for the Tensor and provides global
    metadata for all the shards.

    ShardedTensor doesn't provide any Tensor like operations but is a wrapper
    providing the Tensor representing the local shard and the global metadata.
    Using these, users can build their custom distributed._sharded computations
    on top of this primitive. The local shards are all initialized using the
    create_op specified by tensor_init_params.create_op, e.g., torch.ones, or
    torch.empty

    Args:
        sharding_spec (:class:`torch.distributed._shard.sharding_spec.ShardingSpec`): The specification
            describing how to shard the Tensor.
        size (int...): a sequence of integers defining the shape of the output
            tensor. Can be a variable number of arguments or a collection like a list or tuple.

    Keyword args:
        dtype (:class:`torch.dtype`, optional): the desired data type of returned tensor.
                Default: if ``None``, uses a global default (see :func:`torch.set_default_tensor_type`).
        layout (:class:`torch.layout`, optional): the desired layout of returned Tensor.
            Default: ``torch.strided``.
        requires_grad (bool, optional): If autograd should record operations on the
            returned tensor. Default: ``False``.
        pin_memory (bool, optional): If set, returned tensor would be allocated in
            the pinned memory. Works only for CPU tensors. Default: ``False``.
        memory_format (:class:`torch.memory_format`, optional): the desired memory format of
            returned Tensor. Default: ``torch.contiguous_format``.
        init_rrefs (bool, optional): Whether or not to initialize
            :class:`torch.distributed.rpc.RRef`s pointing to remote shards.
            Need to initialize the RPC Framework if specified as ``True``.
            Default: ``False``.

    .. note:: ShardedTensor uses collectives to do various operations, i.e. it
        uses all_gather to do cross rank validations. For NCCL-based process
        groups, internal tensor representations of objects must be moved to the
        GPU device before communication takes place. In this case, the device
        used is given by ``torch.cuda.current_device()`` and it is the user's
        responsibility to ensure that this is set so that each rank has an
        individual GPU, via ``torch.cuda.set_device()``
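
    Example (a minimal illustrative sketch, not a guaranteed recipe): assuming
    two ranks with one GPU each and an already initialized default process
    group, a chunk-sharded tensor can be created through the module-level
    factory ``torch.distributed._shard.sharded_tensor.empty``::

        >>> # xdoctest: +SKIP
        >>> import torch.distributed._shard.sharded_tensor as sharded_tensor
        >>> from torch.distributed._shard.sharding_spec import ChunkShardingSpec
        >>> spec = ChunkShardingSpec(
        ...     dim=0,
        ...     placements=["rank:0/cuda:0", "rank:1/cuda:1"],
        ... )
        >>> st = sharded_tensor.empty(spec, 10, 20)
        >>> st.local_shards()[0].tensor.shape  # torch.Size([5, 20]) on each rank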

    """

    def __new__(cls, sharding_spec: shard_spec.ShardingSpec, *size, **kwargs):
        self = super(ShardedTensor, cls).__new__(cls, sharding_spec, *size, **kwargs)
        return self
    def __init__(
        self,
        sharding_spec: shard_spec.ShardingSpec,
        *size,
        dtype=None,
        layout=torch.strided,
        requires_grad=False,
        pin_memory=False,
        memory_format=torch.contiguous_format,
        process_group=None,
        init_rrefs=False,
    ):
        # prepare initialization, initialize fields like
        # _process_group, _local_shards, etc.
        self._prepare_init(process_group=process_group, init_rrefs=init_rrefs)

        if layout != torch.strided:
            raise ValueError("Only torch.strided layout is currently supported")

        if memory_format != torch.contiguous_format:
            raise ValueError(
                "Only torch.contiguous_format memory_format is currently supported"
            )

        self._metadata.tensor_properties.memory_format = memory_format

        current_rank = dist.get_rank(self._process_group)

        for shard_metadata in self._metadata.shards_metadata:
            rank, device = _parse_and_validate_remote_device(
                self._process_group, shard_metadata.placement
            )
            if rank == current_rank:
                local_tensor = _create_tensor_from_params(
                    shard_metadata.shard_sizes,
                    local_device=device,
                    tensor_properties=self._metadata.tensor_properties,
                )
                self._local_shards.append(Shard(local_tensor, shard_metadata))

        # do post initialization (i.e. register sharded_tensor_id, initialize rpc)
        self._post_init()

    def _prepare_init(self, process_group=None, init_rrefs=False):
        self._init_rrefs = init_rrefs
        self._sharded_tensor_id = None

        self._process_group = (
            process_group
            if process_group is not None
            else distributed_c10d._get_default_group()
        )
        self._remote_shards: Dict[int, List[rpc.RRef[Shard]]] = {}

    def _post_init(self):
        # Initialize RPC if available.
        if self._init_rrefs:
            with _sharded_tensor_lock:
                global _sharded_tensor_current_id, _sharded_tensor_map
                self._sharded_tensor_id = _sharded_tensor_current_id
                _sharded_tensor_map[self._sharded_tensor_id] = weakref.ref(self)
                _sharded_tensor_current_id += 1

            if not rpc._is_current_rpc_agent_set():
                raise RuntimeError(
                    "RPC Framework needs to be initialized using "
                    "torch.distributed.rpc.init_rpc if init_rrefs is set to True"
                )
            self._init_rpc()

    def __del__(self):
        with _sharded_tensor_lock:
            if (
                hasattr(self, "_sharded_tensor_id")
                and self._sharded_tensor_id in _sharded_tensor_map
            ):
                _sharded_tensor_map.pop(self._sharded_tensor_id)

    def _init_rpc(self):
        # Validate PG and RPC ranks match.
        pg_rank = dist.get_rank()
        rpc_rank = rpc.get_worker_info().id
        if pg_rank != rpc_rank:
            raise ValueError(
                f"Default ProcessGroup and RPC ranks must be "
                f"the same for ShardedTensor, found process group rank: "
                f"{pg_rank} and RPC rank: {rpc_rank}"
            )

        self._remote_shards = {}

        # Gather all the sharded tensor ids.
        worker_infos = rpc._get_current_rpc_agent().get_worker_infos()
        rank_to_name = {}
        name_to_rank = {}

        for worker_info in worker_infos:
            rank_to_name[worker_info.id] = worker_info.name
            name_to_rank[worker_info.name] = worker_info.id

        all_tensor_ids = rpc.api._all_gather(self._sharded_tensor_id)

        # Share the local shards to the entire world via RRefs.
        futs = []
        rpc_rank = rpc.get_worker_info().id
        for rank in range(dist.get_world_size()):
            # Skip self.
            if rank == dist.get_rank():
                continue

            if len(self.local_shards()) != 0:
                rrefs: List[rpc.RRef[Shard]] = [
                    rpc.RRef(shard) for shard in self.local_shards()
                ]
                fut = rpc.rpc_async(
                    rank,
                    _register_remote_shards,
                    args=(all_tensor_ids[rank_to_name[rank]], rrefs, rpc_rank),
                )
                futs.append(fut)

        torch.futures.wait_all(futs)

        # Barrier for all RPCs to finish on all ranks.
        rpc.api._all_gather(None)

    def _get_preferred_device(self) -> torch.device:
        """
        Return the preferred device to be used when creating tensors for collectives.
        This method takes into account the associated process group
        """
        if dist.get_backend(self._process_group) == dist.Backend.NCCL:
            return torch.device(torch.cuda.current_device())
        return torch.device("cpu")
    def gather(self, dst: int = 0, out: Optional[torch.Tensor] = None) -> None:
        """
        Creates a full :class:`Tensor` on rank ``dst`` by gathering all shards of the
        sharded tensor.

        The API needs to be called on all ranks in SPMD fashion. All ranks should have
        the same ``dst``. ``out`` should be a tensor of the same size as the overall
        size of the sharded tensor on ``dst`` and ``None`` on all other ranks.

        Args:
            dst(int): The rank where full tensor is constructed.
                Default: 0
            out (:class:`torch.Tensor`, optional): The output full tensor.
                Must be provided ONLY on ``dst`` rank.
                Default: ``None``
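
        Example (an illustrative sketch; assumes a sharded tensor ``st`` exists
        on every rank and, for simplicity, that the process group is Gloo so the
        gathered output can live on CPU)::

            >>> # xdoctest: +SKIP
            >>> rank = dist.get_rank()
            >>> out = torch.empty(st.size(), dtype=st.dtype) if rank == 0 else None
            >>> st.gather(dst=0, out=out)
            >>> # ``out`` on rank 0 now holds the full, unsharded tensor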
        """

        def shard_size(shard_md):
            return reduce((lambda x, y: x * y), shard_md.shard_sizes)

        rank = dist.get_rank(self._process_group)
        full_size = self.metadata().size
        _validate_output_tensor_for_gather(rank, dst, full_size, out)

        local_shards = self.local_shards()
        world_size = dist.get_world_size(self._process_group)
        rank_sizes = [0 for _ in range(world_size)]
        max_rank_size = 0
        shard_placement: Dict[ShardMetadata, Tuple[int, int]] = {}
        # collect sizes
        for shard_md in self.metadata().shards_metadata:
            shard_rank = cast(_remote_device, shard_md.placement).rank()
            assert shard_rank is not None

            shard_placement[shard_md] = (shard_rank, rank_sizes[shard_rank])
            rank_sizes[shard_rank] += shard_size(shard_md)
            max_rank_size = max(max_rank_size, rank_sizes[shard_rank])

        gather_list: Optional[List[torch.Tensor]]
        if rank == dst:
            assert out is not None
            gather_list = [
                torch.empty((max_rank_size,), device=out.device)
                for _ in range(world_size)
            ]
        else:
            gather_list = None

        with torch.no_grad():
            data = torch.empty(max_rank_size, device=self._get_preferred_device())

            for shard in local_shards:
                src = shard.tensor.flatten()
                shard_offset = shard_placement[shard.metadata][1]
                data[shard_offset : shard_offset + src.numel()].copy_(src)

        dist.gather(
            tensor=data,
            gather_list=gather_list,
            dst=dst,
            group=self._process_group,
        )
        if rank != dst:
            return

        out = cast(torch.Tensor, out)
        assert gather_list is not None
        dims = len(full_size)
        for shard_md in self.metadata().shards_metadata:
            shard_rank, rank_offset = shard_placement[shard_md]
            tensor = gather_list[shard_rank]
            tensor = tensor[rank_offset : rank_offset + shard_size(shard_md)]
            tensor = tensor.view(shard_md.shard_sizes)

            out_narrow_view = out
            for dim in range(dims):
                out_narrow_view = out_narrow_view.narrow(
                    dim,
                    shard_md.shard_offsets[dim],
                    shard_md.shard_sizes[dim],
                )

            out_narrow_view.copy_(tensor)
    def cpu(
        self, memory_format=torch.preserve_format, process_group=None
    ) -> "ShardedTensor":
        """
        Returns a copy of this object in CPU memory.

        If this ShardedTensor is already on CPU memory, then no copy is
        performed and original object is returned.

        .. note:: When moving a ShardedTensor from GPU to CPU, the ShardedTensor might
            need to be managed by a different type of ProcessGroup(i.e. ProcessGroupGloo),
            it is the user's responsibility to explicitly pass in a new process_group that
            is compatible with CPU.
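
        Example (an illustrative sketch; assumes ``st`` lives on GPUs under the
        default NCCL process group and that a Gloo group is created for the CPU
        copy)::

            >>> # xdoctest: +SKIP
            >>> gloo_pg = dist.new_group(backend="gloo")
            >>> st_cpu = st.cpu(process_group=gloo_pg)
            >>> st_cpu.local_shards()[0].tensor.device  # device(type='cpu')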
        """
        if (
            memory_format != torch.preserve_format
            and memory_format != torch.contiguous_format
        ):
            raise RuntimeError(
                "Only `torch.contiguous_format` or `torch.preserve_format` is supported!"
            )
        all_on_cpu = True
        for meta in self.metadata().shards_metadata:
            all_on_cpu &= meta.placement.device().type == "cpu"

        # if every shard is already on CPU, return the original object
        if all_on_cpu:
            return self

        # if not, return a copy of this object on CPU
        list_shards: List[Shard] = []
        # move all local shards to cpu, and change metadata
        for shard in self._local_shards:
            cpu_tensor = shard.tensor.cpu(memory_format=memory_format)
            metadata = copy.deepcopy(shard.metadata)
            metadata.placement._device = torch.device("cpu")
            list_shards.append(Shard(cpu_tensor, metadata))

        st_meta = copy.deepcopy(self.metadata())
        for meta in st_meta.shards_metadata:
            if meta.placement.device().type != "cpu":
                meta.placement._device = torch.device("cpu")

        pg = self._process_group if process_group is None else process_group
        st_cpu = ShardedTensor._init_from_local_shards_and_global_metadata(
            list_shards,
            sharded_tensor_metadata=st_meta,
            process_group=pg,
            init_rrefs=self._init_rrefs,
        )
        return st_cpu
    def cuda(
        self,
        device=None,
        non_blocking=False,
        memory_format=torch.preserve_format,
        process_group=None,
    ) -> "ShardedTensor":
        """
        Returns a copy of this object in CUDA memory, if the original ShardedTensor
        is on CPU, we will move the local shard to the current GPU device of each
        process in a SPMD fashion.
        If this ShardedTensor is already on CUDA memory and local shards on each rank are
        already on current device, we still return a new ShardedTensor object with new
        metadata, but no underlying data movements are performed.
        .. note:: When moving a ShardedTensor from CPU to GPU, the ShardedTensor might
            need to be managed by a different type of ProcessGroup(i.e. ProcessGroupNCCL),
            it is the user's responsibility to explicitly pass in a new process_group that
            is compatible with GPU.
        """
        if (
            memory_format != torch.preserve_format
            and memory_format != torch.contiguous_format
        ):
            raise RuntimeError(
                "Only `torch.contiguous_format` or `torch.preserve_format` is supported!"
            )

        if device is not None:
            device = torch.device(device) if isinstance(device, str) else device
            assert (
                isinstance(device, torch.device)
                and device.index == torch.cuda.current_device()
            ), "Only device without device id (e.g. \"cpu\" or \"cuda\") is expected for ShardedTensor!"

        current_device = torch.device(torch.cuda.current_device())
        # returns a copy of ShardedTensor on CUDA current device
        list_shards: List[Shard] = []
        # move all local shards to current device, and change metadata
        for shard in self._local_shards:
            cuda_tensor = shard.tensor.cuda(
                device=current_device,
                non_blocking=non_blocking,
                memory_format=memory_format,
            )
            metadata = copy.deepcopy(shard.metadata)
            metadata.placement._device = current_device
            list_shards.append(Shard(cuda_tensor, metadata))

        st_meta = copy.deepcopy(self.metadata())
        for meta in st_meta.shards_metadata:
            if meta.placement.device().type != "cuda":
                meta.placement._device = current_device

        pg = self._process_group if process_group is None else process_group
        st_cuda = ShardedTensor._init_from_local_shards_and_global_metadata(
            list_shards,
            sharded_tensor_metadata=st_meta,
            process_group=pg,
            init_rrefs=self._init_rrefs,
        )
        return st_cuda

    def to(self, *args, **kwargs) -> "ShardedTensor":
        current_device = self._local_shards[0].tensor.device
        current_dtype = self.dtype
        device_to = current_device
        dtype_to = current_dtype
        if len(args) == 1:
            if isinstance(args[0], torch.dtype):
                dtype_to = args[0]
            elif isinstance(args[0], torch.device):
                device_to = args[0]
            elif isinstance(args[0], (str, int)):
                device_to = torch.device(args[0])
            elif isinstance(args[0], torch.Tensor):
                dtype_to = args[0].dtype
                device_to = args[0].device
            else:
                raise RuntimeError(f"ShardedTensor.to() have wrong arguments: {args}")
        elif len(args) == 2:
            device_to, dtype_to = args
        else:
            dtype_to = kwargs.get("dtype", current_dtype)
            device_to = kwargs.get("device", current_device)

        device_to = (
            torch.device(device_to) if isinstance(device_to, (str, int)) else device_to
        )

        if device_to.type == "cuda":
            # if device_to is cuda, set to current device even if user
            # specified a different device index.
            current_idx = torch.cuda.current_device()
            if device_to.index != current_idx:
                import warnings

                warnings.warn(
                    "ShardedTensor.to only move tensor to its current device"
                    "If you want to put to different device, use `reshard` instead."
                )
            device_to = torch.device(current_idx)

        copy_tensor = kwargs.get("copy", False)
        non_blocking = kwargs.get("non_blocking", False)
        memory_format = kwargs.get("memory_format", torch.preserve_format)
        process_group = kwargs.get("process_group", None)

        if (
            not copy_tensor
            and dtype_to == current_dtype
            and device_to == current_device
        ):
            # already have correct dtype and device, return itself
            return self

        # returns a copy of ShardedTensor with the new dtype/device
        list_shards: List[Shard] = []

        for shard in self._local_shards:
            new_tensor = shard.tensor.to(
                device=device_to,
                dtype=dtype_to,
                non_blocking=non_blocking,
                copy=copy_tensor,
                memory_format=memory_format,
            )
            metadata = copy.deepcopy(shard.metadata)
            if metadata.placement is not None:
                metadata.placement._device = device_to
            list_shards.append(Shard(new_tensor, metadata))

        # update metadata
        st_meta = copy.deepcopy(self.metadata())
        st_meta.tensor_properties.dtype = dtype_to
        for meta in st_meta.shards_metadata:
            meta.placement._device = device_to

        pg = self._process_group if process_group is None else process_group
        st_to = ShardedTensor._init_from_local_shards_and_global_metadata(
            list_shards,
            sharded_tensor_metadata=st_meta,
            process_group=pg,
            init_rrefs=self._init_rrefs,
        )
        return st_to

    @classmethod
    def _init_from_local_shards(
        cls,
        local_shards: List[Shard],
        *global_size,
        process_group=None,
        init_rrefs=False,
    ):
        # STEP 1: Validate the Shard metadatas locally
        process_group = (
            process_group
            if process_group is not None
            else distributed_c10d._get_default_group()
        )
        current_rank = dist.get_rank(process_group)
        world_size = dist.get_world_size(process_group)

        local_sharded_tensor_metadata: Optional[ShardedTensorMetadata] = None
        global_tensor_size = _flatten_tensor_size(global_size)

        if len(local_shards) > 0:
            local_sharded_tensor_metadata = build_metadata_from_local_shards(
                local_shards, global_tensor_size, current_rank, process_group
            )

        # STEP 2: Validate metadata across ranks, and build a global sharded
        # tensor metadata by gathering local ShardedTensorMetadata
        gathered_metadatas: List[Optional[ShardedTensorMetadata]] = []
        if world_size > 1:
            gathered_metadatas = [None for _ in range(world_size)]
            dist.all_gather_object(
                gathered_metadatas, local_sharded_tensor_metadata, group=process_group
            )
        else:
            gathered_metadatas = [local_sharded_tensor_metadata]

        global_sharded_tensor_metadata = build_global_metadata(gathered_metadatas)
        tensor_properties = global_sharded_tensor_metadata.tensor_properties

        # STEP 3: Validation done, create the actual ShardedTensor and populate fields
        spec = shard_spec._infer_sharding_spec_from_shards_metadata(
            global_sharded_tensor_metadata.shards_metadata
        )
        sharded_tensor = cls.__new__(
            cls,
            spec,
            global_sharded_tensor_metadata.size,
            dtype=tensor_properties.dtype,
            layout=tensor_properties.layout,
            pin_memory=tensor_properties.pin_memory,
            requires_grad=tensor_properties.requires_grad,
        )
        sharded_tensor._prepare_init(process_group=process_group, init_rrefs=init_rrefs)

        # attach local_shards to the ShardedTensor created
        sharded_tensor._local_shards = local_shards

        # run post initialization, i.e. map registration, rpc initialization
        sharded_tensor._post_init()
        return sharded_tensor
  
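    # Illustrative sketch of ``_init_from_local_shards`` (assumes two ranks with
    # one GPU each and an initialized process group; ``local`` and ``shard_md``
    # are example names, not part of this module):
    #
    #     >>> # xdoctest: +SKIP
    #     >>> rank = dist.get_rank()
    #     >>> local = torch.ones(2, 4, device=f"cuda:{rank}")
    #     >>> shard_md = ShardMetadata(
    #     ...     shard_offsets=[2 * rank, 0],
    #     ...     shard_sizes=[2, 4],
    #     ...     placement=f"rank:{rank}/cuda:{rank}",
    #     ... )
    #     >>> st = ShardedTensor._init_from_local_shards(
    #     ...     [Shard(tensor=local, metadata=shard_md)], 4, 4
    #     ... )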
    @classmethod
    def _init_from_local_tensor(
        cls,
        local_tensor: torch.Tensor,
        sharding_spec: shard_spec.ShardingSpec,
        *global_size: Sequence[int],
        process_group=None,
        init_rrefs=False,
    ) -> "ShardedTensor":
        """
        Initialize a ShardedTensor given only one local tensor, global sharded tensor
        size and sharding spec on each rank.

        Args:
            local_tensor (Tensor): Single tensor of local shard stored in each rank.
            sharding_spec (:class:`torch.distributed._shard.sharding_spec.ShardingSpec`):
                The specification describing how to shard the Tensor.
            global_size (Sequence[int]): Size of the sharded tensor.
            process_group (ProcessGroup, optional): The process group to aggregate on.
                Default: None
            init_rrefs (bool, optional): Whether or not to initialize
                :class:`torch.distributed.rpc.RRef`s pointing to remote shards.
                Need to initialize the RPC Framework if specified as ``True``.
                Default: ``False``.

        Returns:
            A :class:`ShardedTensor` sharded based on the given sharding_spec with local
                tensor stored in the current rank.

        Examples:
            >>> # All tensors below are of torch.int64 type.
            >>> # We have 2 process groups, 2 ranks.
            >>> # xdoctest: +SKIP
            >>> tensor = torch.arange(2, dtype=torch.int64) + 1 + 2 * rank
            >>> local_tensor = torch.unsqueeze(torch.cat([tensor, tensor + 2]), 0)
            >>> local_tensor
            tensor([[1, 2, 3, 4]]) # Rank 0
            tensor([[3, 4, 5, 6]]) # Rank 1
            >>> sharding_dim = 0
            >>> sharding_spec = ChunkShardingSpec(
                    dim=sharding_dim,
                    placements=[
                        "rank:0/cuda:0",
                        "rank:1/cuda:1",
                    ],
                )
            >>> st = ShardedTensor._init_from_local_tensor(local_tensor, sharding_spec, [2, 4])
            >>> st
            ShardedTensor(
                ShardedTensorMetadata(
                    shards_metadata=[
                        ShardMetadata(shard_offsets=[0, 0], shard_sizes=[1, 4], placement=rank:0/cuda:0),
                        ShardMetadata(shard_offsets=[1, 0], shard_sizes=[1, 4], placement=rank:1/cuda:1),
                    ],
                    size=torch.Size([2, 4])
            )
            >>> st.local_tensor()
            tensor([1, 2, 3, 4]) # Rank 0
            tensor([3, 4, 5, 6]) # Rank 1

        Warning: This API is experimental and subject to change. It does not do full
                 cross-rank validation, and we only validate the local shard on the current rank.
                 We fully rely on the user to ensure local tensor is sharded based on the
                 sharding spec.
        """
        if not local_tensor.is_contiguous():
            raise ValueError("local_tensor is not a contiguous Tensor.")

        global_tensor_size = _flatten_tensor_size(global_size)
        tensor_properties = TensorProperties(
            dtype=local_tensor.dtype,
            layout=local_tensor.layout,
            requires_grad=local_tensor.requires_grad,
            memory_format=torch.contiguous_format,
            pin_memory=local_tensor.is_pinned(),
        )
        sharded_tensor_metadata = sharding_spec.build_metadata(
            global_tensor_size, tensor_properties
        )

        process_group = (
            process_group
            if process_group is not None
            else distributed_c10d._get_default_group()
        )
        current_rank = dist.get_rank(process_group)

        local_shards: List[Shard] = []
        for shard_metadata in sharded_tensor_metadata.shards_metadata:
            rank, device = _parse_and_validate_remote_device(
                process_group, shard_metadata.placement
            )
            if rank == current_rank:
                local_shards.append(Shard(local_tensor, shard_metadata))

        return ShardedTensor._init_from_local_shards_and_global_metadata(
            local_shards,
            sharded_tensor_metadata=sharded_tensor_metadata,
            process_group=process_group,
            init_rrefs=init_rrefs,
            sharding_spec=sharding_spec,
        )
    @classmethod
    def _init_from_local_shards_and_global_metadata(
        cls,
        local_shards: List[Shard],
        sharded_tensor_metadata: ShardedTensorMetadata,
        process_group=None,
        init_rrefs=False,
        sharding_spec=None,
    ) -> "ShardedTensor":
        """
        Initialize a ShardedTensor with local shards and a global
        ShardedTensorMetadata built on each rank.

        Warning: This API is experimental and subject to change. It does
                 not do cross rank validations, and fully relies on the user
                 for the correctness of sharded_tensor_metadata on each rank
        """
        process_group = (
            process_group
            if process_group is not None
            else distributed_c10d._get_default_group()
        )
        current_rank = dist.get_rank(process_group)

        shards_metadata = sharded_tensor_metadata.shards_metadata

        local_shard_metadatas = []

        # collect local shard metadatas from the global sharded_tensor_metadata
        for shard_metadata in shards_metadata:
            rank, local_device = _parse_and_validate_remote_device(
                process_group, shard_metadata.placement
            )

            if current_rank == rank:
                local_shard_metadatas.append(shard_metadata)

        if len(local_shards) != len(local_shard_metadatas):
            raise RuntimeError(
                f"Number of local shards ({len(local_shards)}) does not match number of local "
                f"shards metadata in sharded_tensor_metadata ({len(local_shard_metadatas)}) "
                f"on rank ({current_rank}) "
            )

        sharded_tensor = super(
            ShardedTensor, cls
        )._init_from_local_shards_and_global_metadata(
            local_shards, sharded_tensor_metadata, sharding_spec=sharding_spec
        )
        sharded_tensor._prepare_init(process_group=process_group, init_rrefs=init_rrefs)

        # run post initialization, i.e. map registration, rpc initialization
        sharded_tensor._post_init()
        return sharded_tensor

    def sharding_spec(self) -> shard_spec.ShardingSpec:
        """
        Returns the ShardingSpec for the tensor.
        """
        return self._sharding_spec
    def reshard(self, resharding_spec: shard_spec.ShardingSpec) -> "ShardedTensor":
        """
        Reshard a sharded tensor given the ``resharding_spec``. For now, we only support
        single local shard.

        If ``resharding_spec`` is same as the original one, this becomes a no-op.
        If only ``resharding_spec`` shares the same sharding dim with the original one,
        we swap local shards directly.
        For more generic cases, we merge different shards across different ranks and split
        the local shards based on the ``resharding_spec`` via `all_to_all` collective API.

        Args:
            resharding_spec (:class:`torch.distributed._shard.sharding_spec.ShardingSpec`): The
                specification describing how the tensor is sharded.

        Returns:
            A :class:`ShardedTensor` object whose local shards are resharded.

        Examples:
            >>> # We have 2 process groups, 2 ranks.
            >>> # xdoctest: +SKIP
            >>> tensor = torch.arange(4, dtype=torch.int64) + 1 + 2 * rank
            >>> tensor = torch.stack([tensor, tensor])
            >>> tensor
            tensor([[1, 2, 3, 4], [1, 2, 3, 4]]) # Rank 0
            tensor([[3, 4, 5, 6], [3, 4, 5, 6]]) # Rank 1
            tensor([[5, 6, 7, 8], [5, 6, 7, 8]]) # Rank 2
            tensor([[7, 8, 9, 10], [7, 8, 9, 10]]) # Rank 3
            >>> sharding_dim = 0
            >>> spec = ChunkShardingSpec(
                    dim=sharding_dim,
                    placements=[
                        "rank:0/cuda:0",
                        "rank:1/cuda:1",
                        "rank:2/cuda:2",
                        "rank:3/cuda:3",
                    ],
                )
            >>> current_offsets = [0] * 2
            >>> current_offsets[0] = rank * 2
            >>> shard_metadata = ShardMetadata(
                    shard_offsets=copy.deepcopy(current_offsets),
                    shard_sizes=tensor.size(),
                    placement=spec.placements[rank],
                )
            >>> local_shards = [
                    Shard(
                        tensor=tensor,
                        metadata=shard_metadata,
                    )
                ]
            >>> st = ShardedTensor._init_from_local_shards(local_shards, tensor.size())
            >>> sharding_dim = 1
            >>> resharding_spec = ChunkShardingSpec(
                    dim=sharding_dim,
                    placements=[
                        "rank:0/cuda:0",
                        "rank:1/cuda:1",
                        "rank:2/cuda:2",
                        "rank:3/cuda:3",
                    ],
                )
            >>> st.reshard(resharding_spec)
            >>> tensor = st.local_shards()[0].tensor
            >>> tensor
            tensor([[1], [1], [3], [3], [5], [5], [7], [7]]) # Rank 0
            tensor([[2], [2], [4], [4], [6], [6], [8], [8]]) # Rank 1
            tensor([[3], [3], [5], [5], [7], [7], [9], [9]]) # Rank 2
            tensor([[4], [4], [6], [6], [8], [8], [10], [10]]) # Rank 3
        """
        if not isinstance(
            resharding_spec, shard_spec.ChunkShardingSpec
        ) or not isinstance(self._sharding_spec, shard_spec.ChunkShardingSpec):
            raise NotImplementedError("Only ChunkShardingSpec supported for reshard.")
        if len(self.local_shards()) != 1:
            raise NotImplementedError("Only single local shard supported for reshard.")

        if self._sharding_spec.dim == resharding_spec.dim:
            if self._sharding_spec.placements == resharding_spec.placements:
                return self
            else:
                local_shards, shards_metadata = reshuffle_local_shard(
                    self.local_tensor(),
                    self.size(),
                    self._sharding_spec,
                    resharding_spec,
                    self._process_group,
                )
        else:
            local_shards, shards_metadata = reshard_local_shard(
                self.local_tensor(),
                self.size(),
                self._sharding_spec,
                resharding_spec,
                self._process_group,
            )
        self._local_shards = local_shards
        self._metadata.shards_metadata = shards_metadata
        self._sharding_spec = resharding_spec
        return self
    def local_tensor(self) -> torch.Tensor:
        """
        Return local tensor for a sharded_tensor. For now we only support single local shard.

        Returns:
            A :class:`torch.Tensor` of the local shard.
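
        Example (illustrative; reusing the 2-rank chunk-sharded tensor from the
        ``_init_from_local_tensor`` example above)::

            >>> # xdoctest: +SKIP
            >>> st.local_tensor()
            tensor([1, 2, 3, 4]) # Rank 0
            tensor([3, 4, 5, 6]) # Rank 1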
        """
        if len(self.local_shards()) != 1:
            raise NotImplementedError("Only single local shard is supported.")
        return self.local_shards()[0].tensor

    @classmethod
    def __torch_function__(cls, func, types, args=(), kwargs=None):
        def dispatch(st: "ShardedTensor", func: Callable):
            # Dispatch to custom user provided op first if it exists.
            if func in _CUSTOM_SHARDED_OPS:
                return _CUSTOM_SHARDED_OPS[func](types, args, kwargs, st._process_group)

            # Dispatch to custom sharding spec op if it has one.
            if _has_custom_op(st._sharding_spec, func):
                return _dispatch_custom_op(
                    st._sharding_spec, func, types, args, kwargs, st._process_group
                )

            if func in _SHARDED_OPS:
                return _SHARDED_OPS[func](types, args, kwargs, st._process_group)

            raise RuntimeError(
                f"torch function '{func.__name__}', with args: {args} and "
                f"kwargs: {kwargs} not supported for ShardedTensor!"
            )

        # Find the ShardedTensor instance to get process_group and sharding_spec.
        st_instance = None

        def find_sharded_tensor(e):
            nonlocal st_instance
            if st_instance is None and isinstance(e, ShardedTensor):
                st_instance = e

        tree_map(find_sharded_tensor, args)
        tree_map(find_sharded_tensor, kwargs)

        if st_instance is not None:
            return dispatch(st_instance, func)

        raise RuntimeError(
            f"torch function '{func.__name__}', with args: {args} and "
            f"kwargs: {kwargs} not supported for ShardedTensor!"
        )
    def is_pinned(self) -> bool:
        """
        Returns True if the sharded tensor (each local shard) resides in pinned memory.
        """
        return self._metadata.tensor_properties.pin_memory

    def _register_remote_shards(
        self, remote_shards: List[rpc.RRef[Shard]], rpc_rank: int
    ):
        self._remote_shards[rpc_rank] = remote_shards
    def remote_shards(self) -> Dict[int, List[rpc.RRef[Shard]]]:
        """
        Returns a Dict[int, RRef] with keys being the RPC rank and values
        being RRefs to shards on that rank. Need to initialize the
        RPC framework for this functionality.

        Raises an exception if ShardedTensor was created with ``init_rrefs=False``
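
        Example (illustrative; assumes ``torch.distributed.rpc.init_rpc`` was
        called on every rank and the tensor was created with ``init_rrefs=True``)::

            >>> # xdoctest: +SKIP
            >>> for rpc_rank, rrefs in st.remote_shards().items():
            ...     for rref in rrefs:
            ...         print(rpc_rank, rref.owner().name)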
        """
        if not self._init_rrefs:
            raise RuntimeError(
                "ShardedTensor created with init_rrefs=False, no RRefs to remote shards available"
            )
        return self._remote_shards

    def __hash__(self):
        return id(self)

    def __repr__(self):
        return f"ShardedTensor({self._metadata})"
    @dataclass
    class ProcessGroupState:
        """
        State for ser-de of process group
        """

        local_rank: int
        global_rank: int
        local_world_size: int
        global_world_size: int

    def __getstate__(self):
        pg_state = ShardedTensor.ProcessGroupState(
            distributed_c10d.get_rank(self._process_group),
            distributed_c10d.get_rank(),
            distributed_c10d.get_world_size(self._process_group),
            distributed_c10d.get_world_size(),
        )

        return (
            self._local_shards,
            self._metadata,
            pg_state,
            self._sharding_spec,
            self._init_rrefs,
        )

    def __setstate__(self, state):
        self._sharded_tensor_id = None
        if not distributed_c10d.is_initialized():
            raise RuntimeError(
                "Need to initialize default process group using "
                '"init_process_group" before loading ShardedTensor'
            )

        (
            self._local_shards,
            self._metadata,
            pg_state,
            self._sharding_spec,
            self._init_rrefs,
        ) = state

        # Setup process group
        from torch.distributed._shard.api import _get_current_process_group

        self._process_group = _get_current_process_group()

        # Validate process group at load time against the one saved.
        local_rank = distributed_c10d.get_rank(self._process_group)
        if pg_state.local_rank != local_rank:
            raise RuntimeError(
                f"Local rank at save time was {pg_state.local_rank}, "
                f"but at load time was {local_rank}"
            )

        global_rank = distributed_c10d.get_rank()
        if pg_state.global_rank != global_rank:
            raise RuntimeError(
                f"Global rank at save time was {pg_state.global_rank}, "
                f"but at load time was {global_rank}"
            )

        local_world_size = distributed_c10d.get_world_size(self._process_group)
        if pg_state.local_world_size != local_world_size:
            raise RuntimeError(
                f"Local world size at save time was {pg_state.local_world_size}, "
                f"but at load time was {local_world_size}"
            )

        global_world_size = distributed_c10d.get_world_size()
        if pg_state.global_world_size != global_world_size:
            raise RuntimeError(
                f"Global world size at save time was {pg_state.global_world_size}, "
                f"but at load time was {global_world_size}"
            )

        self._post_init()

def _create_tensor_from_params(*size, local_device, tensor_properties: TensorProperties):
    """Helper to construct tensor from size, device and common params."""
    dtype = tensor_properties.dtype
    layout = tensor_properties.layout
    requires_grad = tensor_properties.requires_grad
    memory_format = tensor_properties.memory_format
    pin_memory = tensor_properties.pin_memory

    return torch.empty(
        *size,
        dtype=dtype,
        layout=layout,
        device=local_device,
        requires_grad=requires_grad,
        memory_format=memory_format,
        pin_memory=pin_memory,
    )