from typing import Any, Dict, Optional

import torch.distributed as dist

from .storage import StorageReader
from .planner import LoadPlanner
from .default_planner import DefaultLoadPlanner
from .utils import _DistWrapper


def load_state_dict(
    state_dict: Dict[str, Any],
    storage_reader: StorageReader,
    process_group: Optional[dist.ProcessGroup] = None,
    coordinator_rank: int = 0,
    no_dist: bool = False,
    planner: Optional[LoadPlanner] = None,
) -> None:
    """
    Load a distributed state_dict in SPMD style.

    Each rank will try to read the least amount of data necessary
    to fulfill the requested `state_dict`.

    When loading ShardedTensor instances, each rank only
    reads data for their local shards.

    All tensors in ``state_dict`` must be allocated on their
    destination device prior to calling this function.

    All non-tensor data is loaded using `torch.load()` and modified in place
    on the ``state_dict``.

    Users must call `load_state_dict` on the root module to ensure load
    post-processing and non-tensor data properly propagate.

    This function can be used for local inference to load a checkpoint
    produced by ``save_state_dict`` without a process group being initialized,
    by passing ``no_dist=True`` and by using Tensors instead of ShardedTensors
    (see the second example below).

    Args:
        state_dict (Dict[str, Any]) : The state_dict to load. Note that this
            state dict will be updated in place.
        storage_reader (StorageReader): StorageReader used to load data from.
        process_group (ProcessGroup): ProcessGroup to be used for cross-rank synchronization.
        coordinator_rank (int): Rank to use to coordinate the checkpoint. rank0 is used by default.
        no_dist (bool): Don't attempt to load in SPMD style. Defaults to False.
        planner (LoadPlanner): Planner used to coordinate the load. If ``None``,
            ``DefaultLoadPlanner`` is used.

    Returns:
        None.

    Examples:
        >>> # xdoctest: +SKIP
        >>> my_model = MyModule()
        >>> optimizer = Adagrad(my_model.parameters())
        >>> model_state_dict = my_model.state_dict()
        >>> fs_storage_loader = torch.distributed._shard.checkpoint.FileSystemReader("/checkpoint/1")

        >>> torch.distributed._shard.checkpoint.load_state_dict(
        >>>     state_dict=model_state_dict,
        >>>     storage_reader=fs_storage_loader,
        >>> )

        >>> # The module's load_state_dict() may have customized steps
        >>> # to flush the state_dict; it must be called to
        >>> # ensure correct behavior.
        >>> my_model.load_state_dict(model_state_dict)
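
        A minimal sketch of the local-inference path described above (assuming the
        checkpoint holds plain Tensors rather than ShardedTensors and no process
        group has been initialized):

        >>> torch.distributed._shard.checkpoint.load_state_dict(
        >>>     state_dict=model_state_dict,
        >>>     storage_reader=fs_storage_loader,
        >>>     no_dist=True,
        >>> )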

    .. note:: load_state_dict uses collectives to coordinate reads across ranks.
        For NCCL-based process groups, internal tensor representations of objects
        must be moved to the GPU device before communication takes place. In this
        case, the device used is given by ``torch.cuda.current_device()`` and it
        is the user's responsibility to ensure that this is set so that each rank
        has an individual GPU, via ``torch.cuda.set_device()``.
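
        A minimal sketch of the per-rank device setup this note assumes; the
        ``LOCAL_RANK`` environment variable stands in for however your launcher
        (e.g. ``torchrun``) exposes the local rank:

        >>> # xdoctest: +SKIP
        >>> import os
        >>> torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))
        >>> torch.distributed.init_process_group(backend="nccl")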
    """
    distW = _DistWrapper(process_group, not no_dist, coordinator_rank)
    if planner is None:
        planner = DefaultLoadPlanner()

    def local_step():
        assert planner is not None
        # Each rank reads the checkpoint metadata, initializes the planner and
        # the storage reader, and produces a plan for its own reads.
        metadata = storage_reader.read_metadata()
        planner.init(state_dict, metadata, distW.is_coordinator)
        storage_reader.init(metadata, distW.is_coordinator)

        local_plan = planner.create_local_plan()
        local_plan = storage_reader.prepare_local_plan(local_plan)
        return local_plan

    def global_step(all_local_plans):
        assert planner is not None
        # The coordinator merges the per-rank plans into a global plan.
        all_local_plans = planner.create_global_plan(all_local_plans)
        all_local_plans = storage_reader.prepare_global_plan(all_local_plans)
        return all_local_plans

    central_plan = distW.reduce_scatter("plan", local_step, global_step)

    def read_data():
        assert planner is not None
        # Each rank executes its slice of the global plan and waits for the
        # reads to complete.
        final_local_plan = planner.finish_plan(central_plan)
        all_reads = storage_reader.read_data(final_local_plan, planner)

        all_reads.wait()
        return None

    _ = distW.all_gather("read", read_data)