U
    Jc=%                     @   s   d dl mZ d dlmZ d dlmZmZmZmZ d dl	Z	d dl
mZ d dlmZmZmZmZmZmZ d dlmZ d dlmZmZmZ eG d	d
 d
ZedddZeeef eeef dddZeee	jj dddZG dd de	jZ dS )    )	dataclass)partial)AnyListOptionalTupleN)_disabled_torch_function_impl)_ProxyTensorfetch_tensor_proxy	get_proxyget_proxy_slotsset_proxy_slottrack_tensor_tree)no_dispatch)tree_flattentree_maptree_map_onlyc                   @   s$   e Zd ZU ejed< ejjed< dS )_CommResult_tensor_workN)__name__
__module____qualname__torchTensor__annotations__distributed_Work r   r   G/tmp/pip-unpacked-wheel-gikjz4vx/torch/distributed/_spmd/comm_tensor.pyr      s   

r   comm_resultc                 C   s   | j   | jS N)r   waitr   r    r   r   r   
_wait_comm   s    
r$   )resultreturnc                 C   s(   dd }| d }t t||| d |fS )Nc                 S   s   t |tjstdt|| S )NzeExcepting collection of tensors as the first element in the return value of communication operations.)
isinstancer   r   AssertionErrorr   worker   r   r   wrap'   s    z_wrap_comm_result.<locals>.wrap   r   )r   r   )r%   r,   r*   r   r   r   _wrap_comm_result&   s    r.   )objr&   c                 C   s8   t | }|d krd S t| }t|dks0t|d S )Nr-   r   )r   tuplekeyslenr(   )r/   slotsr1   r   r   r   _get_tracer6   s    r4   c                   @   s   e Zd ZU dZdddddgZee ed< ej	ed< e
ejj ed	< eej	d
ddZdd ZeZedd ZedddZdS )
CommTensora   
    A Tensor subclass to wrap input tensors for collective communications. This
    Tensor subclass works for both eager and tracing mode.

    In eager mode, it will record whether the inplace collective communication
    has been launched using this Tensor and remember the corresponding work
    handle. If yes, it will expliclty call wait() in the ``__torch_dispatch__``
    function before subsequent operations consuming the value of the Tensor.

    In tracing mode, ``CommTensor`` inserts two node into the graph using the
    ``__torch_dispatch__`` function.
    1. The first node is inserted right after the
    communication, wrapping both the inplace output tensor and the returned
    work handle into a custom ``_CommResult`` type. We have to do this because
    ``ProxyTorchDispatchMode`` only handles ``torch.Tensor``, ``_ProxyTensor``,
    and ``torch.nn.Parameter`` objects and will treat the work handle
    as a constant and embed that into the graph. As a result, during execution,
    it will use the work handle created during tracing and will lead to wrong
    result. The solution in this test is to manually create a proxy on the
    return value of ``allreduce_`` which is ``([tensor], work)``, and wrap that
    to ``[(_CommResult(tensor, work)), work]``. In this way, subsequent nodes can
    directly consume ``_CommResult``.
    2. The second node is inserted right before any subsequent node reads from
    ``_CommResult``. It will call ``wait()`` on the stashed work handle to ensure
    that computation waits for communication.
    Z
allreduce_Z
allgather_Z
broadcast_Zreduce_scatter_Zscatter__supported_commsr   r   )tensorc                 C   sH   t |tr|jn|}t|d kr$|S tjj| ||jd}||_d |_|S )N)Zrequire_grad)	r'   r5   r   r4   r   r   Z_make_subclassZrequires_gradr   )clsr7   trr   r   r   __new__f   s    zCommTensor.__new__c                 C   s   d| j  d| j dS )NzCommTensor(z, work=))r   r   )selfr   r   r   __repr__w   s    zCommTensor.__repr__c                    s   t  fdd| jD S )Nc                    s   g | ]}| kqS r   r   ).0Zcommop_namer   r   
<listcomp>   s     z,CommTensor._is_supported.<locals>.<listcomp>)anyr6   )r8   rA   r   r@   r   _is_supported~   s    zCommTensor._is_supportedr   Nc              	      s  d  d t d fdd}t ddd}tjjt ddd}t||}t||}	| |jrf d k	r>ttd	d
 ttj	t
 ||	f\}
}||
|} jdt|fi dd}t  |||	}W 5 Q R X t||d  d tt||d |d  t|d \}}t|d \}}t||D ]\}}t| t| q|S |||	}tt||d |d  |S n$d k	rz|||	S t||||	S d S )Nr+   c                    sl   t | trd| jt| j d k	r^ d k	rV jdtt| jjfi dd}t	| j | 
  | jS | S d S )Ncall_functionZ	wait_commname)r'   r5   r   r4   r   create_proxyr$   r   proxyr   r#   )r+   	proxy_restracerr*   r   r   unwrap   s     

z-CommTensor.__torch_dispatch__.<locals>.unwrapc                 S   s   t | tjrt| S | S r"   )r'   r   r   r5   rE   r   r   r   r,      s    z+CommTensor.__torch_dispatch__.<locals>.wrapr)   c                 S   s*   t |tr| |_nt |tjr&td|S )NzwType of output tensors from collective communication during tracing should always be CommTensor instead of torch.Tensor)r'   r5   r   r   r   RuntimeErrorr)   r   r   r   set_work   s    
z/CommTensor.__torch_dispatch__.<locals>.set_workc                 S   s   | j S r"   )rJ   rE   r   r   r   <lambda>       z/CommTensor.__torch_dispatch__.<locals>.<lambda>rF   r!   rG   )ZconstantrM   r-   r   )r   r   r   r   r   rD   r   r   r	   r   r
   rI   r.   r   r   r   r   zipr   r   )r8   functypesargskwargsrN   r,   rP   Zunwrapped_argsZunwrapped_kwargsZ
proxy_argsproxy_kwargsrK   Zcomm_result_proxyoutZ	flat_argsZ	args_specZflat_outZout_specaor   rL   r   __torch_dispatch__   sR    







zCommTensor.__torch_dispatch__)r   N)r   r   r   __doc__r6   r   strr   r   r   r   r   r   staticmethodr;   r>   r   Z__torch_function__classmethodrD   r\   r   r   r   r   r5   ?   s"   


r5   )!Zdataclassesr   	functoolsr   typingr   r   r   r   r   Ztorch._Cr   Z"torch.fx.experimental.proxy_tensorr	   r
   r   r   r   r   Ztorch.utils._mode_utilsr   Ztorch.utils._pytreer   r   r   r   r$   r.   ZfxZTracerr4   r   r5   r   r   r   r   <module>   s     	