U
    Jc)$                     @   sd  d dl mZ d dlZd dl Z d dlZd dlZd dlmZmZmZm	Z	 d dl
Z
d dl
mZ d dlmZ d dlmZ ddlmZmZ dd	lmZmZmZ dd
lmZmZmZmZmZmZmZmZ d dl m!Z! eG dd dZ"eG dd dZ#dZ$e
je
jdddZ%eedddZ&dd Z'eee(dddZ)G dd deZ*G dd dej+Z,G d d! d!eZ-dS )"    )	dataclassN)ListUnionDictcast)Tensor)Future)Path   )MetadataMetadataIndex)StorageReaderStorageWriterWriteResult)LoadItemTypeLoadPlannerLoadPlanSavePlanSavePlannerReadItem	WriteItemWriteItemType)narrow_tensor_by_indexc                   @   s*   e Zd ZU dZeed< eed< eed< dS )_StorageInfoz,
    This is the per entry storage info
    relative_pathoffsetlengthN)__name__
__module____qualname____doc__str__annotations__int r$   r$   R/tmp/pip-unpacked-wheel-gikjz4vx/torch/distributed/_shard/checkpoint/filesystem.pyr   %   s   
r   c                   @   s   e Zd ZU eed< dS )_StoragePrefixprefixN)r   r   r   r!   r"   r$   r$   r$   r%   r&   .   s   
r&   z.distcp)tensorreturnc                 C   s,   |    } |   |  kr(|  } | S N)detachcpustoragesizeZnumelclone)r(   r$   r$   r%   _trim4   s    r0   )itemr)   c                 C   s   t | j||dS )N)indexsize_in_bytesstorage_data)r   r2   )r1   r3   r4   r$   r$   r%   _result_from_write_item:   s
    r5   c                 C   s   |   }|jtjkr4t|tjs$t| |	  n0t|t
jsDt|jt
dksXtt
||  |   | }t||t|||S )Nr,   )telltyper   BYTE_IO
isinstanceioBytesIOAssertionErrorwrite	getbuffertorchr   Zdevicesaver5   r   )streamdata
write_itemZstorage_keyr   r   r$   r$   r%   _write_item@   s    
rD   
file_queueplanner	use_fsyncc              
   C   s   g }| D ]\}}}dd |D }dd |D }t |d~}	|D ]"}
||
}|t|	||
| q>|D ]8}
tttj||
}|jrt	|t|	||
| qf|rt
|	  W 5 Q R X q|S )Nc                 S   s   g | ]}|j tjkr|qS r$   r7   r   r8   .0Zwir$   r$   r%   
<listcomp>Z   s      z+_write_files_from_queue.<locals>.<listcomp>c                 S   s   g | ]}|j tjkr|qS r$   rI   rJ   r$   r$   r%   rL   [   s      wb)openZresolve_dataappendrD   r0   r   r?   r   Zis_cudar<   osfsyncfileno)rF   rG   rH   Zwrite_results	file_path	file_nameZwrite_itemsZtensor_wZbytes_wrA   rC   rB   r(   r$   r$   r%   _write_files_from_queueR   s    

rU   c                       s   e Zd ZdZdeeejf eedd fddZ	eddd	d
Z
eedddZee ee dddZeeeee  dddZeeee  ddddZ  ZS )FileSystemWriteraa  
    Basic implementation of StorageWriter using file IO.

    This implementation makes the following assumptions and simplifications:

    * The checkpoint path is an empty or non-existing directory.
    * File creation is atomic

    The checkpoint consist of one file per write request plus
    a `.metadata` file with the serialized metadata.

    FTN)pathsingle_file_per_rank
sync_filesr)   c                    s$   t    t|| _|| _|| _dS )a  
        Initialize the writer pointing to `path`

        Args:
            path: diretory where the checkpoint will be writen to.
            single_file_per_rank: Produce one file per rank instead of one file per tensor/blob. Default to True.
            sync_files: force files to be synced to permanent storage. Default to True.

        N. B. If sync_files is disabled, there's no guarantee that the checkpoint will be consistent in the case of a failure.
        N)super__init__r	   rW   rX   rY   )selfrW   rX   rY   	__class__r$   r%   r[   y   s    

zFileSystemWriter.__init__)is_coordinatorr)   c                 C   s   d S r*   r$   )r\   r_   r$   r$   r%   init   s    zFileSystemWriter.initplanr)   c                 C   s   |S r*   r$   r\   rb   r$   r$   r%   prepare_local_plan   s    z#FileSystemWriter.prepare_local_planglobal_planr)   c                 C   s&   | j jddd dd t|D }|S )NT)parentsexist_okc                 S   s*   g | ]"\}}t j|td | ddqS )___)r4   )dataclassesreplacer&   )rK   irb   r$   r$   r%   rL      s    z8FileSystemWriter.prepare_global_plan.<locals>.<listcomp>)rW   mkdir	enumerate)r\   rf   Z	new_plansr$   r$   r%   prepare_global_plan   s
    z$FileSystemWriter.prepare_global_planrb   rG   r)   c           	         s   |j d  fdd}g }| jrB| }|| j| ||jf n*|jD ]"}| }|| j| ||gf qHt||| jd}t }|| |S )Nr   c                     s   j    t }  d7  | S )Nr
   )r'   DEFAULT_SUFIX)rT   Z
file_countZstorage_planr$   r%   gen_file   s    z-FileSystemWriter.write_data.<locals>.gen_filerE   )	r4   rX   rO   rW   itemsrU   rY   r   
set_result)	r\   rb   rG   rt   rF   rT   r1   resultsfutr$   rs   r%   
write_data   s$    

zFileSystemWriter.write_data)metadatarw   r)   c              	   C   sz   t  }|D ]}|dd |D  q
||_| jd d }t|| t|	  W 5 Q R X | jd 
| jd  d S )Nc                 S   s   i | ]}|j |jqS r$   )r2   r4   )rK   wrr$   r$   r%   
<dictcomp>   s     z+FileSystemWriter.finish.<locals>.<dictcomp>z.metadata.tmprM   	.metadata)dictupdater4   rW   rN   pickledumprP   rQ   rR   rename)r\   rz   rw   Z
storage_mdZwr_listmetadata_filer$   r$   r%   finish   s    

zFileSystemWriter.finish)FT)r   r   r   r    r   r!   rP   PathLikeboolr[   r`   r   rd   r   rp   r   r   r   ry   r   r   __classcell__r$   r$   r]   r%   rV   l   s"     

!rV   c                       sV   e Zd Zejeed fddZejfeeed fddZ	ed fdd	Z
  ZS )
SlicedBufferedReader)base_streamr   lenc                    s&   t  | || _|| _| d d S )Nr   )rZ   r[   r   r   seek)r\   r   r   r   r]   r$   r%   r[      s    zSlicedBufferedReader.__init__)_SlicedBufferedReader__offset_SlicedBufferedReader__whencer)   c                    sD   |t jkr| j| }n |t jkr6t j}| j| j | }t ||S r*   )rP   SEEK_SETr   SEEK_ENDr   rZ   r   )r\   r   r   r]   r$   r%   r      s    

zSlicedBufferedReader.seekr)   c                    s   t   | j S r*   )rZ   r6   r   )r\   r]   r$   r%   r6      s    zSlicedBufferedReader.tell)r   r   r   r:   	RawIOBaser#   r[   rP   r   r   r6   r   r$   r$   r]   r%   r      s   r   c                       s   e Zd Zeeejf dd fddZedddZ	e
eed dd	d
ZedddZeeddddZe
e
dddZee
 ee
 dddZ  ZS )FileSystemReaderN)rW   r)   c                    s    t    t|| _t | _d S r*   )rZ   r[   r	   rW   r~   r4   )r\   rW   r]   r$   r%   r[      s    

zFileSystemReader.__init__)sinfoc                 C   s   t tj| dd|j|jS )NF)closefd)r   r:   FileIOrR   r   r   )r\   filer   r$   r$   r%   _slice_file   s
     zFileSystemReader._slice_filerq   c                 C   sN  t  }|jD ](}| j|j }|j}||g | q| D ]\}}| j| d}	|D ]}
| j|
j }| 	|	|}|
j
tjkrt||j}|d ||
| q\tttj|dd}t||
j|
j}||
 }| | kstd|
j d|  d|  || ||
| q\W 5 Q R X q>t  }|!d  |S )Nrbr   r,   )Zmap_locationzreq z mismatch sizes z vs )"r~   ru   r4   Zstorage_indexr   
setdefaultrO   rW   rN   r   r7   r   r8   r:   r;   readr   r   Z
load_bytesr   r   r?   loadr   Zstorage_offsetslengthsZresolve_tensorr+   r.   r<   Zcopy_Zcommit_tensorr   rv   )r\   rb   rG   Zper_fileZ	read_itemZitem_mdrW   r   reqsr   reqZ
file_slicebytesr(   Ztarget_tensorrx   r$   r$   r%   	read_data   s4    



zFileSystemReader.read_datar   c              
   C   s2   | j d d}t|W  5 Q R  S Q R X d S )Nr}   r   )rW   rN   r   r   )r\   r   r$   r$   r%   read_metadata  s    zFileSystemReader.read_metadata)rz   r_   r)   c                 C   s   |j | _ | j d k	std S r*   )r4   r<   )r\   rz   r_   r$   r$   r%   r`     s    zFileSystemReader.initra   c                 C   s   |S r*   r$   rc   r$   r$   r%   rd     s    z#FileSystemReader.prepare_local_planre   c                 C   s   |S r*   r$   )r\   rf   r$   r$   r%   rp     s    z$FileSystemReader.prepare_global_plan)r   r   r   r   r!   rP   r   r[   r   r   r   r   r   r   r   r   r   r`   rd   r   rp   r   r$   r$   r]   r%   r      s   &r   ).rk   r   rP   r:   r   typingr   r   r   r   r?   r   Ztorch.futuresr   pathlibr	   rz   r   r   r-   r   r   r   rG   r   r   r   r   r   r   r   r   Ztorch.distributed._shard._utilsr   r   r&   rr   r0   r5   rD   r   rU   rV   BufferedReaderr   r   r$   r$   r$   r%   <module>   s8   (`