U
    Jc                     @   sd   d dl Z d dlZd dlZd dlmZmZ d dlmZ d dlZd dl	m
Z
 dd ZG dd de
ZdS )	    N)	b64decode	b64encode)Optional)Storec                   C   s   t tdd d S )Nr   g?)timesleeprandomuniform r
   r
   S/tmp/pip-unpacked-wheel-gikjz4vx/torch/distributed/elastic/rendezvous/etcd_store.py	cas_delay   s    r   c                       s   e Zd ZdZdeej d fddZdd Ze	dd	d
Z
eedddZdeej dddZedddZedddZe	dddZdddZ  ZS )	EtcdStorez
    Implements a c10 Store interface by piggybacking on the rendezvous etcd
    instance. This is the store object returned by ``EtcdRendezvous``
    N)timeoutc                    sF   t    || _|| _|d k	r(| | | jdsB|  jd7  _d S )N/)super__init__clientprefixZset_timeoutendswith)selfZetcd_clientZetcd_store_prefixr   	__class__r
   r   r       s    

zEtcdStore.__init__c                 C   s&   | j j| j| | | |d dS )z
        Write a key/value pair into ``EtcdStore``.
        Both key and value may be either Python ``str`` or ``bytes``.
        keyvalueN)r   setr   _encode)r   r   r   r
   r
   r   r   2   s    zEtcdStore.set)returnc                 C   sB   | j | | }| |g}|dkr4td| d| || S )aV  
        Get a value by key, possibly doing a blocking wait.

        If key is not immediately present, will do a blocking wait
        for at most ``timeout`` duration or until the key is published.


        Returns:
            value ``(bytes)``

        Raises:
            LookupError - If key still not published after timeout
        NzKey z not found in EtcdStore)r   r   _try_wait_getLookupError_decode)r   r   b64_keykvsr
   r
   r   get9   s
    zEtcdStore.get)numr   c                 C   s   |  |}z4| jj| j| |  t|dd}t| |jW S  tj	k
rT   Y nX | jj
| j| d}|  tt| |j| }z(| jj|j||jd}t| |jW S  tjk
r   t  Y qVX qVdS )a  
        Atomically increment a value by an integer amount. The integer is
        represented as a string using base 10. If key is not present,
        a default value of ``0`` will be assumed.

        Returns:
             the new (incremented) value


        F)r   r   Z	prevExistr   )r   r   Z
prev_valueN)r   r   writer   strintr    r   etcdZEtcdAlreadyExistr#   Ztest_and_setr   ZEtcdCompareFailedr   )r   r   r$   r!   node	new_valuer
   r
   r   addO   s*    
  zEtcdStore.addoverride_timeoutc                    s2    fdd|D }  ||}|dkr.tddS )z
        Waits until all of the keys are published, or until timeout.

        Raises:
            LookupError - if timeout occurs
        c                    s   g | ]} j  | qS r
   r   r   .0r   r   r
   r   
<listcomp>{   s     z"EtcdStore.wait.<locals>.<listcomp>Nz+Timeout while waiting for keys in EtcdStore)r   r   )r   keysr.   b64_keysr"   r
   r2   r   waitt   s    zEtcdStore.waitc                    s0    fdd|D } j |tjddd}|dk	S )zU
        Check if all of the keys are immediately present (without waiting).
        c                    s   g | ]} j  | qS r
   r/   r0   r2   r
   r   r3      s     z#EtcdStore.check.<locals>.<listcomp>   )microsecondsr-   N)r   datetime	timedelta)r   r4   r5   r"   r
   r2   r   check   s    
zEtcdStore.checkc                 C   s@   t |tkrt| S t |tkr4t|  S tdd S Nz"Value must be of type str or bytes)typebytesr   decoder'   encode
ValueErrorr   r   r
   r
   r   r      s
    zEtcdStore._encodec                 C   s8   t |tkrt|S t |tkr,t| S tdd S r<   )r=   r>   r   r'   r@   rA   rB   r
   r
   r   r       s
    zEtcdStore._decodec                    s   |d kr| j n|}t |  }| jj| jd} fdd|jD }t|t krZ|S |t  }|dkrrd S z | jj| jd||j	d d W q" t
jk
r   t |krY d S Y q"Y q" t
jk
r   Y q"Y q"X q"d S )Nr%   c                    s    i | ]}|j  kr|j |jqS r
   r   )r1   r*   r5   r
   r   
<dictcomp>   s    
  z+EtcdStore._try_wait_get.<locals>.<dictcomp>r   Tr7   )r   	recursiver   index)r   r   total_secondsr   r#   r   childrenlenZwatchZ
etcd_indexr)   ZEtcdWatchTimedOutZEtcdEventIndexCleared)r   r5   r.   r   deadlineZ	all_nodesZ	req_nodesZwatch_timeoutr
   rC   r   r      s0    

zEtcdStore._try_wait_get)N)N)N)__name__
__module____qualname____doc__r   r9   r:   r   r   r>   r#   r(   r,   r6   boolr;   r'   r   r    r   __classcell__r
   r
   r   r   r      s   
 %r   )r9   r   r   base64r   r   typingr   r)   Ztorch.distributedr   r   r   r
   r
   r
   r   <module>   s   