U
    <c(2                     @   s   d dl mZ d dlmZ d dlmZmZ d dlmZ d dl	m
Z
mZmZmZmZmZmZmZ dddd	gZed
ddZedG dd deZedG dd dee ZedG dd	 d	eZedG dd dee ZdS )    )defaultdict)functional_datapipe)IterDataPipe	DataChunk)_check_unpickable_fn)AnyCallableDefaultDictIteratorListOptionalSizedTypeVarBatcherIterDataPipeGrouperIterDataPipeShardingFilterIterDataPipeUnBatcherIterDataPipeT_coT)	covariantZsharding_filterc                   @   s>   e Zd ZdZedddZdd Zdd Zd	d
 Zdd Z	dS )r   an  
    Wrapper that allows DataPipe to be sharded (functional name: ``sharding_filter``). After ``apply_sharding`` is
    called, each instance of the DataPipe (on different workers) will have every `n`-th element of the
    original DataPipe, where `n` equals to the number of instances.

    Args:
        source_datapipe: Iterable DataPipe that will be sharded
    )source_datapipec                 C   s   || _ d| _d| _d S )N   r   )r   num_of_instancesinstance_id)selfr    r   L/tmp/pip-unpacked-wheel-gikjz4vx/torch/utils/data/datapipes/iter/grouping.py__init__   s    z#ShardingFilterIterDataPipe.__init__c                 C   s   dS )NTr   r   r   r   r   is_shardable!   s    z'ShardingFilterIterDataPipe.is_shardablec                 C   s   || _ || _d S N)r   r   )r   r   r   r   r   r   apply_sharding$   s    z)ShardingFilterIterDataPipe.apply_shardingc                 c   s.   t | jD ]\}}|| j | jkr
|V  q
d S r   )	enumerater   r   r   )r   iitemr   r   r   __iter__(   s    z#ShardingFilterIterDataPipe.__iter__c                 C   sR   t | jtr:t| j| j | jt| j| j k r4dnd S tdt| j	d S )Nr   r   %{} instance doesn't have valid length)

isinstancer   r   lenr   r   	TypeErrorformattype__name__r   r   r   r   __len__-   s
    z"ShardingFilterIterDataPipe.__len__N)
r+   
__module____qualname____doc__r   r   r   r    r$   r,   r   r   r   r   r      s   batchc                       sx   e Zd ZU dZeed< eed< eed< ee ed< de	feeedd fd	d
Z
ee	 dddZedddZ  ZS )r   a1  
    Creates mini-batches of data (functional name: ``batch``). An outer dimension will be added as
    ``batch_size`` if ``drop_last`` is set to ``True``, or ``length % batch_size`` for the
    last batch if ``drop_last`` is set to ``False``.

    Args:
        datapipe: Iterable DataPipe being batched
        batch_size: The size of each batch
        drop_last: Option to drop the last batch if it's not full
        wrapper_class: wrapper to apply onto each batch (type ``List``) before yielding,
            defaults to ``DataChunk``

    Example:
        >>> # xdoctest: +SKIP
        >>> from torchdata.datapipes.iter import IterableWrapper
        >>> dp = IterableWrapper(range(10))
        >>> dp = dp.batch(batch_size=3, drop_last=True)
        >>> list(dp)
        [[0, 1, 2], [3, 4, 5], [6, 7, 8]]
    datapipe
batch_size	drop_lastlengthFN)r1   r2   r3   returnc                    s<   |dkst dt   || _|| _|| _d | _|| _d S )Nr   z+Batch size is required to be larger than 0!)AssertionErrorsuperr   r1   r2   r3   r4   wrapper_class)r   r1   r2   r3   r8   	__class__r   r   r   O   s    
zBatcherIterDataPipe.__init__r5   c                 c   sZ   g }| j D ],}|| t|| jkr
| |V  g }q
t|dkrV| jsV| |V  d S Nr   )r1   appendr'   r2   r8   r3   )r   r0   xr   r   r   r$   ]   s    

zBatcherIterDataPipe.__iter__c                 C   sp   | j d k	r| j S t| jtrX| jr6t| j| j | _ nt| j| j d | j | _ | j S tdt	| j
d S )Nr   r%   )r4   r&   r1   r   r3   r'   r2   r(   r)   r*   r+   r   r   r   r   r,   h   s    
zBatcherIterDataPipe.__len__)r+   r-   r.   r/   r   __annotations__intboolr   r   r   r
   r$   r,   __classcell__r   r   r9   r   r   4   s   
Zunbatchc                   @   s2   e Zd ZdZdeedddZdd Zdd	 Zd
S )r   a   
    Undoes batching of data (functional name: ``unbatch``). In other words, it flattens the data up to the specified level
    within a batched DataPipe.

    Args:
        datapipe: Iterable DataPipe being un-batched
        unbatch_level: Defaults to ``1`` (only flattening the top level). If set to ``2``,
            it will flatten the top two levels, and ``-1`` will flatten the entire DataPipe.

    Example:
        >>> # xdoctest: +SKIP
        >>> from torchdata.datapipes.iter import IterableWrapper
        >>> source_dp = IterableWrapper([[[0, 1], [2]], [[3, 4], [5]], [[6]]])
        >>> dp1 = source_dp.unbatch()
        >>> list(dp1)
        [[0, 1], [2], [3, 4], [5], [6]]
        >>> dp2 = source_dp.unbatch(unbatch_level=2)
        >>> list(dp2)
        [0, 1, 2, 3, 4, 5, 6]
    r   r1   unbatch_levelc                 C   s   || _ || _d S r   rC   )r   r1   rD   r   r   r   r      s    zUnBatcherIterDataPipe.__init__c                 c   s,   | j D ] }| j|| jdD ]
}|V  qqd S )NrD   )r1   _diverD   )r   elementr"   r   r   r   r$      s    
zUnBatcherIterDataPipe.__iter__c                 c   s   |dk rt d|dkrZt|ts,t|trR|D ]}| j|ddD ]
}|V  qBq0q|V  n`|dkrj|V  nPt|ts~t|tr|D ]"}| j||d dD ]
}|V  qqntd| j dd S )Nz unbatch_level must be -1 or >= 0rE   r   r   zunbatch_level z" exceeds the depth of the DataPipe)
ValueErrorr&   listr   rF   
IndexErrorrD   )r   rG   rD   r#   r"   r   r   r   rF      s    zUnBatcherIterDataPipe._diveN)r   )	r+   r-   r.   r/   r   r@   r   r$   rF   r   r   r   r   r   t   s    groupbyc                   @   sv   e Zd ZdZdddddee eeee ee e	dddZ
d	d
 Zdd ZddddZdd Zdd Zdd ZdS )r   a	  
    Groups data from input IterDataPipe by keys which are generated from ``group_key_fn``,
    and yields a ``DataChunk`` with batch size up to ``group_size`` if defined (functional name: ``groupby``).

    The samples are read sequentially from the source ``datapipe``, and a batch of samples belonging to the same group
    will be yielded as soon as the size of the batch reaches ``group_size``. When the buffer is full,
    the DataPipe will yield the largest batch with the same key, provided that its size is larger
    than ``guaranteed_group_size``. If its size is smaller, it will be dropped if ``drop_remaining=True``.

    After iterating through the entirety of source ``datapipe``, everything not dropped due to the buffer capacity
    will be yielded from the buffer, even if the group sizes are smaller than ``guaranteed_group_size``.

    Args:
        datapipe: Iterable datapipe to be grouped
        group_key_fn: Function used to generate group key from the data of the source datapipe
        buffer_size: The size of buffer for ungrouped data
        group_size: The max size of each group, a batch is yielded as soon as it reaches this size
        guaranteed_group_size: The guaranteed minimum group size to be yielded in case the buffer is full
        drop_remaining: Specifies if the group smaller than ``guaranteed_group_size`` will be dropped from buffer
            when the buffer is full

    Example:
        >>> import os
        >>> # xdoctest: +SKIP
        >>> from torchdata.datapipes.iter import IterableWrapper
        >>> def group_fn(file):
        ...    return os.path.basename(file).split(".")[0]
        >>> source_dp = IterableWrapper(["a.png", "b.png", "a.json", "b.json", "a.jpg", "c.json"])
        >>> dp0 = source_dp.groupby(group_key_fn=group_fn)
        >>> list(dp0)
        [['a.png', 'a.json', 'a.jpg'], ['b.png', 'b.json'], ['c.json']]
        >>> # A group is yielded as soon as its size equals to `group_size`
        >>> dp1 = source_dp.groupby(group_key_fn=group_fn, group_size=2)
        >>> list(dp1)
        [['a.png', 'a.json'], ['b.png', 'b.json'], ['a.jpg'], ['c.json']]
        >>> # Scenario where `buffer` is full, and group 'a' needs to be yielded since its size > `guaranteed_group_size`
        >>> dp2 = source_dp.groupby(group_key_fn=group_fn, buffer_size=3, group_size=3, guaranteed_group_size=2)
        >>> list(dp2)
        [['a.png', 'a.json'], ['b.png', 'b.json'], ['a.jpg'], ['c.json']]
    i'  NF)buffer_size
group_sizeguaranteed_group_sizedrop_remaining)r1   group_key_fnrM   rN   rO   rP   c                C   s   t | || _|| _|| _tt| _d| _|| _d | _	|d k	rf|d k	rfd|  k rZ|ks`n t
|| _	|d k	r|d k	rd|  k r|ksn t
|| _	|| _t| _d S r<   )r   r1   rQ   max_buffer_sizer   rJ   buffer_elementscurr_buffer_sizerN   rO   r6   rP   r   r8   )r   r1   rQ   rM   rN   rO   rP   r   r   r   r      s     
"zGrouperIterDataPipe.__init__c                 C   s   d }d}d }| j  D ](}t| j | |krt| j | }|}q| jd k	rn|| jk rn| jsntdt| j | | jd ks|| jkr| j | }|  j|8  _| j |= |S )Nr   zFailed to group items)rS   keysr'   rO   rP   RuntimeErrorstrrT   )r   Zbiggest_keyZbiggest_sizeresult_to_yieldZfindkeyr   r   r   _remove_biggest_key   s    
z'GrouperIterDataPipe._remove_biggest_keyc                 c   s   | j D ]}| |}| j| | |  jd7  _| jd k	r| jt| j| kr| | j| V  |  jt| j| 8  _| j|= | j| jkr| 	 }|d k	r| |V  qt
| j D ].}| j|}|  jt|8  _| |V  qd S )Nr   )r1   rQ   rS   r=   rT   rN   r'   r8   rR   rY   tuplerU   pop)r   r>   keyrX   resr   r   r   r$     s     

zGrouperIterDataPipe.__iter__r;   c                 C   s   d| _ tt| _d S r<   )rT   r   rJ   rS   r   r   r   r   reset  s    zGrouperIterDataPipe.resetc              	   C   s@   | j | j| j| j| j| j| j| j| jf	}t	j
d k	r<t	
|S |S r   )r1   rQ   rR   rN   rO   rP   r8   _valid_iterator_id_number_of_samples_yieldedr   Zgetstate_hookr   stater   r   r   __getstate__  s    

z GrouperIterDataPipe.__getstate__c              
   C   s<   |\	| _ | _| _| _| _| _| _| _| _d| _	t
t| _d S r<   )r1   rQ   rR   rN   rO   rP   r8   r_   r`   rT   r   rJ   rS   ra   r   r   r   __setstate__-  s    z GrouperIterDataPipe.__setstate__c                 C   s   | j   d S r   )rS   clearr   r   r   r   __del__<  s    zGrouperIterDataPipe.__del__)r+   r-   r.   r/   r   r   r   r@   r   rA   r   rY   r$   r^   rc   rd   rf   r   r   r   r   r      s&   ,N)collectionsr   Z%torch.utils.data.datapipes._decoratorr   Z#torch.utils.data.datapipes.datapiper   r   Z'torch.utils.data.datapipes.utils.commonr   typingr   r   r	   r
   r   r   r   r   __all__r   r   r   r   r   r   r   r   r   <module>   s$   (!?6