from typing import Optional import torch from .expanded_weights_impl import ExpandedWeight def standard_kwargs(kwarg_names, expanded_args): r'''Most `__torch_function__`s standardize the kwargs that they give, so this will separate the args and kwargs they pass. Functions that don't are linear and convND ''' kwarg_values = expanded_args[len(expanded_args) - len(kwarg_names):] expanded_args_without_kwargs = expanded_args[:len(expanded_args) - len(kwarg_names)] expanded_kwargs = {name: value for (name, value) in zip(kwarg_names, kwarg_values)} return expanded_args_without_kwargs, expanded_kwargs def forward_helper(func, expanded_args, expanded_kwargs): r'''Forward helper computes the forward pass for a function that has expanded weight(s) passed to it. It will run the forward pass where all ExpandedWeights are their original weight. It runs checks on the given arguments and detaches the outputs. .. note:: First argument in :attr:`expanded_args` must be the input with the batch dimension as the first element of the shape .. note:: :attr:`func` must return a Tensor or tuple of Tensors Args: func: The function to be called ctx: The context from the autograd.Function object. Will be used to save computed state from the forward pass expanded_args: Arguments to be passed to :attr:`func`. Will include arguments that need to be unpacked because they are ExpandedWeights num_true_outs: The number of outputs seen by the user since some functions return auxillary data that is only used in the backward pass ''' unexpanded_args, unexpanded_kwargs = _check_and_unexpand_args(func, expanded_args, expanded_kwargs) return func(*unexpanded_args, **unexpanded_kwargs) def _check_and_unexpand_args(func, expanded_args, expanded_kwargs): # input must be the first argument passed input = expanded_args[0] if isinstance(input, ExpandedWeight): raise RuntimeError("Expanded Weights do not support inputs that are also ExpandedWeights. " f"Input must be a Tensor, got {type(input).__name__} in function {func.__name__}") if not isinstance(input, torch.Tensor): raise RuntimeError("Expanded Weights requires a Tensor as the first input to get the batch dimension, " f"got {type(input).__name__} in function {func.__name__}") if len(input.shape) == 0: raise RuntimeError(f"Expanded Weights requires a batch dimension but got an input of size 0 in function {func.__name__}") if input.shape[0] == 0: raise RuntimeError("0 is not a valid batch size for Expanded Weights but got input tensor of " f"{input} in function {func.__name__}") batch_size = input.shape[0] for arg in expanded_args + tuple(expanded_kwargs.values()): if isinstance(arg, ExpandedWeight) and arg.batch_size != batch_size: raise RuntimeError("Expected ExpandedWeights to have batch size matching input but got " f"input batch size of {batch_size} with ExpandedWeight of batch size {arg.batch_size}") loss_reduction: Optional[str] = None for arg in expanded_args + tuple(expanded_kwargs.values()): if isinstance(arg, ExpandedWeight): if loss_reduction is None: loss_reduction = arg.loss_reduction elif loss_reduction != arg.loss_reduction: raise RuntimeError("Expected ExpandedWeights to all have the same loss_reduction argument but got one" f"with {loss_reduction} and one with {arg.loss_reduction}") unexpanded_args = tuple(arg.orig_weight if isinstance(arg, ExpandedWeight) else arg for arg in expanded_args) unexpanded_kwargs = {name: arg.orig_weight if isinstance(arg, ExpandedWeight) else arg for (name, arg) in expanded_kwargs.items()} return unexpanded_args, unexpanded_kwargs def maybe_scale_by_batch_size(grad_sample, expanded_weight): if expanded_weight.loss_reduction == "mean": return grad_sample * expanded_weight.batch_size else: return grad_sample def set_grad_sample_if_exists(maybe_expanded_weight, per_sample_grad_fn): unpacked = unpack_expanded_weight_or_tensor(maybe_expanded_weight) if isinstance(maybe_expanded_weight, ExpandedWeight): grad_sample_contribution = maybe_scale_by_batch_size(per_sample_grad_fn(unpacked), maybe_expanded_weight) if hasattr(unpacked, "grad_sample") and unpacked.grad_sample is not None: unpacked.grad_sample = unpacked.grad_sample + grad_sample_contribution else: unpacked.grad_sample = grad_sample_contribution def unpack_expanded_weight_or_tensor(maybe_expanded_weight, func=lambda x: x): if isinstance(maybe_expanded_weight, ExpandedWeight): orig_weight = maybe_expanded_weight.orig_weight return func(orig_weight) elif isinstance(maybe_expanded_weight, torch.Tensor) and not maybe_expanded_weight.requires_grad: return func(maybe_expanded_weight) elif isinstance(maybe_expanded_weight, torch.Tensor): raise RuntimeError("ExpandedWeights currently does not support a mixture of ExpandedWeight parameters " "and normal Parameters. Please file and issue with pytorch/pytorch") def sum_over_all_but_batch_and_last_n( tensor: torch.Tensor, n_dims: int ) -> torch.Tensor: r""" Calculates the sum over all dimensions, except the first (batch dimension), and excluding the last n_dims. This function will ignore the first dimension and it will not aggregate over the last n_dims dimensions. Args: tensor: An input tensor of shape ``(B, ..., X[n_dims-1])``. n_dims: Number of dimensions to keep. Example: >>> tensor = torch.ones(1, 2, 3, 4, 5) >>> sum_over_all_but_batch_and_last_n(tensor, n_dims=2).shape torch.Size([1, 4, 5]) Returns: A tensor of shape ``(B, ..., X[n_dims-1])`` """ if tensor.dim() == n_dims + 1: return tensor else: dims = list(range(1, tensor.dim() - n_dims)) return tensor.sum(dim=dims)