U
    Jc                     @   s
  d dl Z d dlmZ d dlmZ d dlmZ d dlm	Z	m
Z
mZmZ g Ze	e	eee	df eedf f dddZee	df eedf eee	df e
ee	f f d	d
dZdd Zdd ZdddZdd Zejee j eedddZe
ee	f eeddddZdS )    N)_get_stream)_is_namedtuple)AnyDictListTuple.)argskwargsreturnc                  O   sB   g }t | }| D ]\}}|| || qt|t|fS )a  
    Turn argument list into separate key list and value list (unpack_kwargs does the opposite)
    Inspiration: https://github.com/facebookresearch/fairscale/blob/eeb6684/fairscale/internal/containers.py#L70
    Usage::

        kwarg_keys, flat_args = pack_kwargs(1, 2, a=3, b=4)
        assert kwarg_keys == ("a", "b")
        assert flat_args == (1, 2, 3, 4)
        args, kwargs = unpack_kwargs(kwarg_keys, flat_args)
        assert args == (1, 2)
        assert kwargs == {"a": 3, "b": 4}
    Returns:
        Tuple[Tuple[Any, ...], Tuple[str, ...]]: The first tuple element gives
        gives both positional args and kwarg values, where the positional args
        proceed kwarg values and kwarg values are ordered consistently with the
        kwarg keys. The second tuple element gives the kwarg keys.
        The second tuple element's length is at most the first tuple element's length.
    )listitemsappendtuple)r   r	   
kwarg_keys	flat_argskv r   ;/tmp/pip-unpacked-wheel-gikjz4vx/torch/distributed/utils.py_pack_kwargs   s    
r   )r   r   r
   c                 C   s|   t |t | ks,tdt | dt |  t |dkr@| i fS | dt |  }dd t|| t | d D }||fS )zSee _pack_kwargs.ztoo many keys z vs. r   Nc                 S   s   i | ]\}}||qS r   r   ).0r   r   r   r   r   
<dictcomp>-   s      z"_unpack_kwargs.<locals>.<dictcomp>)lenAssertionErrorzip)r   r   r   r	   r   r   r   _unpack_kwargs'   s    ,"r   c                    s(    fddz| }W 5 dX |S )z4
    Recursively moves input to the target_gpu.
    c              	      sX  t  tjr jtdkr$ fS s4 fS t}tj|  }W 5 Q R X tj$ tj }|	| |
| W 5 Q R X |fS t r fddtt  D S t  trt dkrttt  S t  trt dkrdd tt  D S t  trRt dkrR fddtt   D S  gS )Ncudac                    s   g | ]}t  | qS r   type)r   r   objr   r   
<listcomp>K   s     z1_recursive_to.<locals>.to_map.<locals>.<listcomp>r   c                 S   s   g | ]}t |qS r   )r   r   ir   r   r   r!   O   s     c                    s   g | ]}t  |qS r   r   r"   r   r   r   r!   Q   s     )
isinstancetorchTensorZdevicetor   r   streamcurrent_streamZwait_streamZrecord_streamr   r   mapr   r   r   dictr   )r    r(   outputr)   
target_gputo_map!use_side_stream_for_tensor_copiesr   r   r/   5   s,    

 z_recursive_to.<locals>.to_mapNr   )inputsr.   r0   resr   r-   r   _recursive_to0   s
     r3   c                 C   s   | rt | ||ng } |r$t |||ng }t| t|k r^| dd tt|t|  D  n4t|t| k r|dd tt| t| D  t| } t|}| |fS )Nc                 S   s   g | ]}d qS )r   r   r   _r   r   r   r!   h   s     z_to_kwargs.<locals>.<listcomp>c                 S   s   g | ]}i qS r   r   r4   r   r   r   r!   j   s     )r3   r   extendranger   )r1   r	   Z	device_idr0   r   r   r   
_to_kwargs\   s    &$r8   c                 C   s   t | ||S )N)distZ_verify_params_across_processes)process_groupZtensorsloggerr   r   r   $_verify_param_shape_across_processeso   s    r<   c           	      C   sf   g }|   D ]\}}||kr||  q|  D ]\}}||kr4||  q4t|||| dS )a2  
    Syncs ``module``'s parameters and buffers state so that all ranks contain
    the same module state across all ranks. Note that this API assumes that all
    parameter shapes are consistent before running the synchronization. This can
    be checked with ``_verify_param_shape_across_processes``.
    N)Znamed_parametersr   detachZnamed_buffers_sync_params_and_buffers)	moduler:   broadcast_bucket_sizesrcZparams_and_buffers_to_ignoremodule_statesnameparambufferr   r   r   _sync_module_statesr   s    rF   r:   rB   r@   rA   c                 C   s    t |dkrt| ||| dS )zu
    Synchronizes ``module_states`` (list of tensors) across all processes by
    broadcasting them from rank 0.
    r   N)r   r9   Z_broadcast_coalescedrG   r   r   r   r>      s    
   r>   )
state_dict
old_prefix
new_prefixr
   c                 C   sX   ||krt dt|  D ]6}||s,q||t|d  }| | | |< | |= qdS )a  
    Replace all keys that match a given old_prefix with a new_prefix (in-place).

    Usage::

        state_dict = {"layer.xyz": torch.tensor(1)}
        replace_by_prefix_(state_dict, "layer.", "module.layer.")
        assert state_dict == {"module.layer.xyz": torch.tensor(1)}
    z*old_prefix and new_prefix must be distinctN)
ValueErrorr   keys
startswithr   )rH   rI   rJ   keyZnew_keyr   r   r   _replace_by_prefix   s    
rO   )N)r%   Ztorch.distributedZdistributedr9   Ztorch.nn.parallel._functionsr   Z torch.nn.parallel.scatter_gatherr   typingr   r   r   r   __all__strr   r   r3   r8   r<   rF   ZProcessGroupr&   intr>   rO   r   r   r   r   <module>   s*   *:	,

