U
    Jc                  	   @   s~   d dl mZ d dlmZ ddlmZ ddlmZ ddl	m
Z
 ddlmZmZ ddlmZ dee
eej eeeed
ddZdS )    )OptionalN   )SavePlanner)DefaultSavePlanner)StorageWriter)MetadataSTATE_DICT_TYPE)_DistWrapperF)
state_dictstorage_writerprocess_groupcoordinator_rankno_distplannerreturnc           
         s   t || |dkrt dk	s(tdfdd}fdd}d||  fdd}fd	d
}	d||	S )a&	  
    Save a distributed model in SPMD style.

    This function is different from ``torch.save()`` as it handles
    ``ShardedTensor`` by having each rank only save their local shards.

    To produce a state_dict with ShardedTensor instances you must call
    ``_register_state_dict_hook`` on the top module with value
    `torch.distributed._shard.sharded_tensor.state_dict_hook` prior to
    calling `state_dict()` on the top module.

    There is no guarantees of Backwards Compatibility across PyTorch versions
    for saved state_dicts.

    If using the `process_group` argument, make sure that only its ranks
    call `save_state_dict` and that all data in state_dict belong to it.

    This function can be used to save a state_dict with an intialized process
    group by passing ``no_dist=True``. This can be used to produce a checkpoint
    that can consumed by load_state_dict is a SPMD fashion.

    Args:
        state_dict (Dict[str, Any]) : A state_dict
        storage_writer (StorageWriter): Instance of StorageWrite use to perform writes.
        process_group (ProcessGroup): ProcessGroup to be used for cross-rank synchronization
        coordinator_rank (int): Rank to use to coordinate the checkpoint, rank0 is used by default
        no_dist (bool): Don't attempt to save in SPMD style. Default to False

    Example:
        >>> # xdoctest: +SKIP
        >>> my_model = MyModule()
        >>> # We must call this function prior to state_dict()
        >>> my_model._register_state_dict_hook(state_dict_hook)

        >>> model_state_dict = my_model.state_dict()

        >>> fs_storage_writer = torch.distributed._shard.checkpoint.FileSystemWriter("/checkpoint/1")
        >>> torch.distributed._shard.checkpoint.save_state_dict(
        >>>     state_dict=model_state_dict,
        >>>     storage_writer=fs_stroage_writer,
        >>> )

    .. note:: save_state_dict uses collectives to coordinate writes across ranks.
        For NCCL-based process groups, internal tensor representations of objects
        must be moved to the GPU device before communication takes place. In this
        case, the device used is given by ``torch.cuda.current_device()`` and it
        is the user's responsibility to ensure that this is set so that each rank
        has an individual GPU, via ``torch.cuda.set_device()``
    Nc                     s<   d k	st  j  j  } | } | S N)AssertionErrorinitZis_coordinatorZcreate_local_planZprepare_local_plan)Z
local_plan)distWr   r
   r    X/tmp/pip-unpacked-wheel-gikjz4vx/torch/distributed/_shard/checkpoint/state_dict_saver.py
local_stepS   s    
z#save_state_dict.<locals>.local_stepc                    s(   d k	st | \}  | } | S r   )r   Zcreate_global_planZprepare_global_plan)Zall_local_plans)global_metatadatar   r   r   r   global_step[   s    
z$save_state_dict.<locals>.global_stepZplanc                     s2   d k	st  } | }|  | S r   )r   Zfinish_plan
write_datawaitvalue)Zfinal_local_planZ
all_writes)central_planr   r   r   r   r   e   s
    
z#save_state_dict.<locals>.write_datac                    s    d k	st j | d  S )N)metadataresults)r   finish)Zall_results)r   r   r   r   finish_checkpointm   s    z*save_state_dict.<locals>.finish_checkpointwrite)r	   r   r   Zreduce_scatterZ
all_reduce)
r
   r   r   r   r   r   r   r   r   r!   r   )r   r   r   r   r
   r   r   save_state_dict   s    9r#   )Nr   FN)typingr   Ztorch.distributedZdistributeddistr   r   Zdefault_plannerr   Zstorager   r   r   r   utilsr	   ZProcessGroupintboolr#   r   r   r   r   <module>   s&       