U
    Vcn                     @   s  d Z ddlZddlZddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlm	Z	 dd	lm
Z
 dd
lmZ ddlmZ ddlmZ ddlmZ ddlmZ e ZejZeddddgZe
jZe
jZe
jZe
jZediZediZdZG dd deZdd ZdddZ dd Z!dS )zATakes a generator of values, and accumulates them for a frontend.    N)directory_loader)directory_watcher)event_file_loader)
io_wrapper)plugin_asset_util)	reservoir)	tag_types)
config_pb2)	event_pb2)	graph_pb2)meta_graph_pb2)
tb_loggingTensorEvent	wall_timesteptensor_protoi  .c                   @   s   e Zd ZdZd0ddZdd Zdd	 Zd
d Zdd Zdd Z	dd Z
dd Zdd Zdd Zdd Zdd Zdd Zdd Zd d! Zd"d# Zd$d% Zd&d' Zd(d) Zd*d+ Zd,d- Zd.d/ ZdS )1EventAccumulatora  An `EventAccumulator` takes an event generator, and accumulates the
    values.

    The `EventAccumulator` is intended to provide a convenient Python
    interface for loading Event data written during a TensorFlow run.
    TensorFlow writes out `Event` protobuf objects, which have a timestamp
    and step number, and often contain a `Summary`. Summaries can have
    different kinds of data stored as arbitrary tensors. The Summaries
    also have a tag, which we use to organize logically related data. The
    `EventAccumulator` supports retrieving the `Event` and `Summary` data
    by its tag.

    Calling `Tags()` gets a map from `tagType` (i.e., `tensors`) to the
    associated tags for those data types. Then, the functional endpoint
    (i.g., `Accumulator.Tensors(tag)`) allows for the retrieval of all
    data associated with that tag.

    The `Reload()` method synchronously loads all of the data written so far.

    Fields:
      most_recent_step: Step of last Event proto added. This should only
          be accessed from the thread that calls Reload. This is -1 if
          nothing has been loaded yet.
      most_recent_wall_time: Timestamp of last Event proto added. This is
          a float containing seconds from the UNIX epoch, or -1 if
          nothing has been loaded yet. This should only be accessed from
          the thread that calls Reload.
      path: A file path to a directory containing tf events files, or a single
          tf events file. The accumulator will load events from this path.
      tensors_by_tag: A dictionary mapping each tag name to a
        reservoir.Reservoir of tensor summaries. Each such reservoir will
        only use a single key, given by `_TENSOR_RESERVOIR_KEY`.

    @@Tensors
    NTc           	      C   s   t |pt}i }tD ]&}||kr.|| ||< qt| ||< q|| _t |pJi | _d| _d| _d| _d| _i | _i | _	i | _
t | _tt | _t | _|| _t|||| _t | _|| _d| _d| _d| _d| _dS )a  Construct the `EventAccumulator`.

        Args:
          path: A file path to a directory containing tf events files, or a single
            tf events file. The accumulator will load events from this path.
          size_guidance: Information on how much data the EventAccumulator should
            store in memory. The DEFAULT_SIZE_GUIDANCE tries not to store too much
            so as to avoid OOMing the client. The size_guidance should be a map
            from a `tagType` string to an integer representing the number of
            items to keep per tag for items of that `tagType`. If the size is 0,
            all events are stored.
          tensor_size_guidance: Like `size_guidance`, but allowing finer
            granularity for tensor summaries. Should be a map from the
            `plugin_name` field on the `PluginData` proto to an integer
            representing the number of items to keep per tag. Plugins for
            which there is no entry in this map will default to the value of
            `size_guidance[event_accumulator.TENSORS]`. Defaults to `{}`.
          purge_orphaned_data: Whether to discard any events that were "orphaned" by
            a TensorFlow restart.
          event_file_active_filter: Optional predicate for determining whether an
            event file latest load timestamp should be considered active. If passed,
            this will enable multifile directory loading.
          detect_file_replacement: Optional boolean; if True, event file loading
            will try to detect when a file has been replaced with a new version
            that contains additional data, by monitoring the file size.
        NF)dictDEFAULT_SIZE_GUIDANCE_size_guidance_tensor_size_guidance_first_event_timestamp_graph_graph_from_metagraph_meta_graph_tagged_metadatasummary_metadatatensors_by_tag	threadingLock_tensors_by_tag_lockcollectionsdefaultdict_plugin_to_tag_to_content_plugin_tag_lockpath_GeneratorFromPath
_generator_generator_mutexpurge_orphaned_data_seen_session_startmost_recent_stepmost_recent_wall_timefile_version)	selfr'   Zsize_guidanceZtensor_size_guidancer+   event_file_active_filterdetect_file_replacementZsizeskey r4   a/tmp/pip-unpacked-wheel-g8kmtpbc/tensorboard/backend/event_processing/plugin_event_accumulator.py__init___   s<    #

  
zEventAccumulator.__init__c              	   C   s0   | j   | j D ]}| | qW 5 Q R X | S )zLoads all events added since the last call to `Reload`.

        If `Reload` was never called, loads all events in the file.

        Returns:
          The `EventAccumulator`.
        )r*   r)   Load_ProcessEventr0   eventr4   r4   r5   Reload   s    zEventAccumulator.Reloadc                 C   s   t | j|S )aH  Return a list of all plugin assets for the given plugin.

        Args:
          plugin_name: The string name of a plugin to retrieve assets for.

        Returns:
          A list of string plugin asset names, or empty list if none are available.
          If the plugin was not registered, an empty list is returned.
        )r   Z
ListAssetsr'   r0   plugin_namer4   r4   r5   PluginAssets   s    
zEventAccumulator.PluginAssetsc                 C   s   t | j||S )a4  Return the contents of a given plugin asset.

        Args:
          plugin_name: The string name of a plugin.
          asset_name: The string name of an asset.

        Returns:
          The string contents of the plugin asset.

        Raises:
          KeyError: If the asset is not available.
        )r   ZRetrieveAssetr'   )r0   r=   Z
asset_namer4   r4   r5   RetrievePluginAsset   s
      z$EventAccumulator.RetrievePluginAssetc              
   C   sp   | j dk	r| j S | jP z,t| j }| | | j W W  5 Q R  S  tk
r`   tdY nX W 5 Q R X dS )a  Returns the timestamp in seconds of the first event.

        If the first event has been loaded (either by this method or by `Reload`,
        this returns immediately. Otherwise, it will load in the first event. Note
        that this means that calling `Reload` will cause this to block until
        `Reload` has finished.

        Returns:
          The timestamp in seconds of the first event that was loaded.

        Raises:
          ValueError: If no events have been loaded and there were no events found
          on disk.
        Nz!No event timestamp could be found)r   r*   nextr)   r7   r8   StopIteration
ValueErrorr9   r4   r4   r5   FirstEventTimestamp   s    

z$EventAccumulator.FirstEventTimestampc              
   C   sB   | j 2 || jkrtd| t| j| W  5 Q R  S Q R X dS )a  Returns a dict mapping tags to content specific to that plugin.

        Args:
          plugin_name: The name of the plugin for which to fetch plugin-specific
            content.

        Raises:
          KeyError: if the plugin name is not found.

        Returns:
          A dict mapping tag names to bytestrings of plugin-specific content-- by
          convention, in the form of binary serialized protos.
        zPlugin %r could not be found.N)r&   r%   KeyErrorr   r<   r4   r4   r5   PluginTagToContent   s    
z#EventAccumulator.PluginTagToContentc              
   C   s(   | j  t| jW  5 Q R  S Q R X dS )zReturn a set of plugins with summary data.

        Returns:
          The distinct union of `plugin_data.plugin_name` fields from
          all the `SummaryMetadata` protos stored in this accumulator.
        N)r&   	frozensetr%   r0   r4   r4   r5   ActivePlugins  s    zEventAccumulator.ActivePluginsc                 C   s
   | j | S )zGiven a summary tag name, return the associated metadata object.

        Args:
          tag: The name of a tag, as a string.

        Raises:
          KeyError: If the tag is not found.

        Returns:
          A `SummaryMetadata` protobuf.
        )r   r0   tagr4   r4   r5   SummaryMetadata  s    z EventAccumulator.SummaryMetadatac                 C   s
   t | jS )zReturn summary metadata for all tags.

        Returns:
          A dict `d` such that `d[tag]` is a `SummaryMetadata` proto for
          the keyed tag.
        )r   r   rG   r4   r4   r5   AllSummaryMetadata  s    z#EventAccumulator.AllSummaryMetadatac              
   C   s  | j dkr|j| _ |drPt|j}| jrJ| j|krJtd| j| || _| | |dr| j	dk	rxtd |j
| _	d| _n|dr
| jdk	rtd |j| _| j	dks| jrt }|| j |j
r| j	dk	rtd	 d
| _|j
 | _	n|drL|jj}|| jkr<td| d  |jj| j|< n|dr|jjD ]}|dr|j}|| jkr|j| j|< |jj}|jr| j |j| j|j |< W 5 Q R X ntd| |dr`|j}|j}|s|j }| !||j|j"| q`dS )z#Called whenever an event is loaded.Nr/   zqFound new file_version for event.proto. This will affect purging logic for TensorFlow restarts. Old: {0} New: {1}	graph_defzFound more than one graph event per run, or there was a metagraph containing a graph_def, as well as one or more graph events.  Overwriting the graph with the newest event.Fmeta_graph_defz]Found more than one metagraph event per run. Overwriting the metagraph with the newest event.zFound multiple metagraphs containing graph_defs,but did not find any graph events.  Overwriting the graph with the newest metagraph version.Ttagged_run_metadataz2Found more than one "run metadata" event with tag z'. Overwriting it with the newest event.summarymetadataz?This summary with tag %r is oddly not associated with a plugin.tensor)#r   r   HasField_ParseFileVersionr/   loggerwarningformat_MaybePurgeOrphanedDatar   rM   r   r   rN   r   MetaGraphDefParseFromStringZSerializeToStringrO   rJ   r   run_metadatarP   valuer   rQ   plugin_datar=   r&   contentr%   rR   Z	node_name_ProcessTensorr   )r0   r:   Znew_file_version
meta_graphrJ   r\   r]   Zdatumr4   r4   r5   r8   '  s    


 





zEventAccumulator._ProcessEventc              
   C   s4   t t| j t| jdk	t| jdk	tt| j	 iS )zReturn all tags found in the value stream.

        Returns:
          A `{tagType: ['list', 'of', 'tags']}` dictionary.
        N)
TENSORSlistr   keysGRAPHr   
META_GRAPHr   RUN_METADATAr   rG   r4   r4   r5   Tags  s        zEventAccumulator.Tagsc                 C   s.   t  }| jdk	r"|| j |S tddS )aM  Return the graph definition, if there is one.

        If the graph is stored directly, return that.  If no graph is stored
        directly but a metagraph is stored containing a graph, return that.

        Raises:
          ValueError: If there is no graph for this run.

        Returns:
          The `graph_def` proto.
        Nz*There is no graph in this EventAccumulator)r   ZGraphDefr   rZ   rB   )r0   graphr4   r4   r5   Graph  s
    
zEventAccumulator.Graphc                 C   s   | j S )z@Return the graph definition in serialized form, if there is one.)r   rG   r4   r4   r5   SerializedGraph  s    z EventAccumulator.SerializedGraphc                 C   s*   | j dkrtdt }|| j  |S )zReturn the metagraph definition, if there is one.

        Raises:
          ValueError: If there is no metagraph for this run.

        Returns:
          The `meta_graph_def` proto.
        Nz.There is no metagraph in this EventAccumulator)r   rB   r   rY   rZ   )r0   r`   r4   r4   r5   	MetaGraph  s
    	
zEventAccumulator.MetaGraphc                 C   s.   || j krtdt }|| j |  |S )a  Given a tag, return the associated session.run() metadata.

        Args:
          tag: A string tag associated with the event.

        Raises:
          ValueError: If the tag is not found.

        Returns:
          The metadata in form of `RunMetadata` proto.
        z+There is no run metadata with this tag name)r   rB   r	   RunMetadatarZ   )r0   rJ   r[   r4   r4   r5   rl     s
    
zEventAccumulator.RunMetadatac                 C   s   | j | tS )zGiven a summary tag, return all associated tensors.

        Args:
          tag: A string tag associated with the events.

        Raises:
          KeyError: If the tag is not found.

        Returns:
          An array of `TensorEvent`s.
        )r   ZItems_TENSOR_RESERVOIR_KEYrI   r4   r4   r5   Tensors  s    zEventAccumulator.Tensorsc                 C   sN   | j s
dS | jr&| jdkr&| | n
| | |drJ|j| _|j| _dS )a  Maybe purge orphaned data due to a TensorFlow crash.

        When TensorFlow crashes at step T+O and restarts at step T, any events
        written after step T are now "orphaned" and will be at best misleading if
        they are included in TensorBoard.

        This logic attempts to determine if there is orphaned data, and purge it
        if it is found.

        Args:
          event: The event to use as a reference, to determine if a purge is needed.
        N   rP   )	r+   r/   _CheckForRestartAndMaybePurge$_CheckForOutOfOrderStepAndMaybePurgerS   r   r-   r   r.   r9   r4   r4   r5   rX     s    

z(EventAccumulator._MaybePurgeOrphanedDatac                 C   s6   |j jtjjkrdS | js$d| _dS | j|dd dS )a  Check and discard expired events using SessionLog.START.

        The first SessionLog.START event in a run indicates the start of a
        supervisor session. Subsequent SessionLog.START events indicate a
        *restart*, which may need to preempt old events. This method checks
        for a session restart event and purges all previously seen events whose
        step is larger than or equal to this event's step.

        Because of supervisor threading, it is possible that this logic will
        cause the first few event messages to be discarded since supervisor
        threading does not guarantee that the START message is deterministically
        written first.

        This method is preferred over _CheckForOutOfOrderStepAndMaybePurge which
        can inadvertently discard events due to supervisor threading.

        Args:
          event: The event to use as reference. If the event is a START event, all
            previously seen events with a greater event.step will be purged.
        NTFby_tags)Zsession_logstatusr
   Z
SessionLogSTARTr,   _Purger9   r4   r4   r5   rp     s    z.EventAccumulator._CheckForRestartAndMaybePurgec                 C   s(   |j | jk r$|dr$| j|dd dS )a  Check for out-of-order event.step and discard expired events for
        tags.

        Check if the event is out of order relative to the global most recent step.
        If it is, purge outdated summaries for tags that the event contains.

        Args:
          event: The event to use as reference. If the event is out-of-order, all
            events with the same tags, but with a greater event.step will be purged.
        rP   Trr   N)r   r-   rS   rv   r9   r4   r4   r5   rq   "  s    z5EventAccumulator._CheckForOutOfOrderStepAndMaybePurgec              	   C   sZ   t |||d}| j* || jkr:| |}t|| j|< W 5 Q R X | j| t| d S )N)r   r   r   )r   r"   r   _GetTensorReservoirSizer   Z	ReservoirZAddItemrm   )r0   rJ   r   r   rR   tvZreservoir_sizer4   r4   r5   r_   0  s    

zEventAccumulator._ProcessTensorc                 C   s4   | j t }| j|}|d kr"|S | j|jj|S N)r   ra   r   getr   r]   r=   )r0   rJ   defaultr   r4   r4   r5   rw   8  s    
 z(EventAccumulator._GetTensorReservoirSizec                    s    fdd}d}|rL j jD ],}|j| jkr| j|j }|||t7 }qn | j D ]}|||t7 }qV|dkrt| j| j	 j
 j|}t| dS )a  Purge all events that have occurred after the given event.step.

        If by_tags is True, purge all events that occurred after the given
        event.step, but only for the tags that the event has. Non-sequential
        event.steps suggest that a TensorFlow restart occurred, and we discard
        the out-of-order events to display a consistent view in TensorBoard.

        Discarding by tags is the safer method, when we are unsure whether a restart
        has occurred, given that threading in supervisor can cause events of
        different tags to arrive with unsynchronized step values.

        If by_tags is False, then purge all events with event.step greater than the
        given event.step. This can be used when we are certain that a TensorFlow
        restart has occurred and these events can be discarded.

        Args:
          event: The event to use as reference for the purge. All events with
            the same tags, but with a greater event.step will be purged.
          by_tags: Bool to dictate whether to discard all out-of-order events or
            only those that are associated with the given reference event.
        c                    s   | j  j k S ry   )r   )xr:   r4   r5   <lambda>X      z)EventAccumulator._Purge.<locals>.<lambda>r   N)rP   r\   rJ   r   ZFilterItemsrm   values_GetPurgeMessager-   r.   r   r   rU   rV   )r0   r:   rs   Z_NotExpirednum_expiredr\   Ztag_reservoirZ	purge_msgr4   r}   r5   rv   A  s0     
 zEventAccumulator._Purge)NNTNN)__name__
__module____qualname____doc__r6   r;   r>   r?   rC   rE   rH   rK   rL   r8   rg   ri   rj   rk   rl   rn   rX   rp   rq   r_   rw   rv   r4   r4   r4   r5   r   :   s8   '     
O
	l	r   c                 C   s   d || |||S )z=Return the string message associated with TensorBoard purges.zDetected out of order event.step likely caused by a TensorFlow restart. Purging {} expired tensor events from Tensorboard display between the previous step: {} (timestamp: {}) and current step: {} (timestamp: {}).)rW   )r-   r.   Z
event_stepZevent_wall_timer   r4   r4   r5   r   r  s    	r   c                    sf   | st dt| r"t|  S |rF fdd}tj| |tj|dS  fdd}t| |tjS dS )zECreate an event generator for file or directory at given path string.zpath must be a valid stringc                    s   t |  S ry   )r   ZTimestampedEventFileLoaderr'   r2   r4   r5   r~     s    z$_GeneratorFromPath.<locals>.<lambda>)Zpath_filterZactive_filterc                    s   t |  S ry   )r   EventFileLoaderr   r   r4   r5   r~     s    N)	rB   r   ZIsSummaryEventsFiler   r   r   ZDirectoryLoaderr   ZDirectoryWatcher)r'   r1   r2   Zloader_factoryr4   r   r5   r(     s&    

r(   c                 C   s>   |  d}zt|d W S  tk
r8   td Y dS X dS )zConvert the string file_version in event.proto into a float.

    Args:
      file_version: String file_version from event.proto

    Returns:
      Version number as a float.
    zbrain.Event:r   zpInvalid event.proto file_version. Defaulting to use of out-of-order event.step logic for purging expired events.N)splitfloatrB   rU   rV   )r/   tokensr4   r4   r5   rT     s    	
rT   )NN)"r   r#   r    Z$tensorboard.backend.event_processingr   r   r   r   r   r   r   Ztensorboard.compat.protor	   r
   r   r   Ztensorboard.utilr   Z
get_loggerrU   
namedtupler   ra   rd   re   rf   r   ZSTORE_EVERYTHING_SIZE_GUIDANCErm   objectr   r   r(   rT   r4   r4   r4   r5   <module>   sL         <   
