# Source code for torchnaut.utils
import torch
from sklearn.decomposition import PCA
import numpy as np
# [docs]
def get_batch_ixs(ref_tensor, batch_size=16, permute=False):
    """Generate batch indices for mini-batch processing.

    Args:
        ref_tensor: Reference tensor; only the size of its first dimension
            is used.
        batch_size: Maximum number of indices per batch. Must be positive.
        permute: Whether to randomly permute the indices before batching.

    Returns:
        An iterable of 1-D index tensors that together cover
        ``range(ref_tensor.shape[0])`` exactly once. When everything fits
        into a single batch this is a tensor of shape ``[1, n]``; otherwise
        it is the tuple produced by ``torch.tensor_split``.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size}")

    n_items = ref_tensor.shape[0]
    ixs = torch.randperm(n_items) if permute else torch.arange(n_items)

    if n_items <= batch_size:
        # Single batch: keep the historical [1, n] tensor return shape.
        return ixs.unsqueeze(0)

    # Ceiling division: flooring here would yield batches *larger* than
    # batch_size (e.g. 31 items with batch_size=16 became one batch of 31).
    n_batches = -(-n_items // batch_size)
    return torch.tensor_split(ixs, n_batches)
# [docs]
class LabelScaler:
    """A scaler for preprocessing label data with optional PCA transformation.

    This class provides functionality to scale and transform label data by:

    1. Centering the data by subtracting the per-feature mean
    2. Optionally applying PCA dimensionality reduction
    3. Scaling the data: by the global standard deviation without PCA, by
       PCA whitening, or by the largest per-component standard deviation
       when PCA is used without whitening

    The transformation can be reversed using the inverse_transform method.

    All state is created by :meth:`fit_transform`; ``transform`` and
    ``inverse_transform`` must only be called afterwards.
    """

    def __init__(self):
        """Initialize the LabelScaler."""

    def fit_transform(self, arr, pca_dims=-1, pca_whiten=True):
        """Fit the scaler to the data and transform it.

        Args:
            arr (numpy.ndarray): Input array of shape [dataset_size, *feature_dims]
            pca_dims (int): Number of PCA components. If -1, no PCA is applied
            pca_whiten (bool): Whether to apply whitening in PCA transformation

        Returns:
            numpy.ndarray: Transformed array of shape:
                - [dataset_size, prod(feature_dims)] if pca_dims=-1
                - [dataset_size, pca_dims] if PCA is applied
        """
        self.label_dims = arr.shape[1:]
        self.pca_whiten = pca_whiten
        self.pca_dims = pca_dims

        self.mean_ = np.mean(arr, axis=0, keepdims=True)
        arr = (arr - self.mean_).reshape(arr.shape[0], -1)

        if self.pca_dims == -1:
            # One global scale (not per-feature) so relative feature
            # magnitudes are preserved.
            self.std_ = arr.std()
            return arr / self.std_

        self.pca = PCA(n_components=pca_dims, whiten=pca_whiten)
        arr = self.pca.fit_transform(arr)

        if not self.pca_whiten:
            self.pca_mean = arr.mean(axis=0, keepdims=True)
            # using max so that relative scaling is preserved, unlike with whitening
            self.pca_std = arr.std(axis=0).max()
            arr = (arr - self.pca_mean) / self.pca_std
        return arr

    def transform(self, arr):
        """Transform new data using the fitted scaler.

        Args:
            arr (numpy.ndarray): Input array of shape [dataset_size, *feature_dims]
                Must match the dimensions of the data used in fit_transform

        Returns:
            numpy.ndarray: Transformed array of shape:
                - [dataset_size, prod(feature_dims)] if pca_dims=-1
                - [dataset_size, pca_dims] if PCA is applied
        """
        arr = (arr - self.mean_).reshape(arr.shape[0], -1)

        if self.pca_dims == -1:
            return arr / self.std_

        arr = self.pca.transform(arr)
        if not self.pca_whiten:
            arr = (arr - self.pca_mean) / self.pca_std
        return arr

    def inverse_transform(self, arr_scaled):
        """Inverse transform scaled data back to the original space.

        Args:
            arr_scaled (numpy.ndarray or torch.Tensor): Scaled input array of shape
                [*batch_dims, n_features] where n_features matches the output
                dimension of transform(). ``batch_dims`` may be empty (a single
                sample). Tensors are detached and moved to the CPU for the
                computation, so no gradient flows through this method.

        Returns:
            numpy.ndarray or torch.Tensor: Array in original space with shape
                [*batch_dims, *feature_dims] where feature_dims matches the
                original input dimensions. A tensor input yields a tensor on
                the same device as the input.

        Example:
            If original data was shape [1000, 32, 32]:
            - Can handle inputs of shape [64, 1024] -> [64, 32, 32]
            - Can handle inputs of shape [64, 20, 1024] -> [64, 20, 32, 32]
            - Can handle inputs of shape [1024] -> [32, 32]
        """
        is_tensor = isinstance(arr_scaled, torch.Tensor)
        device = arr_scaled.device if is_tensor else None
        if is_tensor:
            # detach() first: Tensor.numpy() refuses tensors that require grad.
            arr_scaled = arr_scaled.detach().cpu().numpy()

        batch_shape = arr_scaled.shape[:-1]

        if self.pca_dims != -1:
            if not self.pca_whiten:
                arr_scaled = arr_scaled * self.pca_std + self.pca_mean
            # Hand PCA a plain [n_samples, n_components] matrix so that any
            # number of batch dimensions is supported.
            flat = arr_scaled.reshape(-1, arr_scaled.shape[-1])
            restored = self.pca.inverse_transform(flat)
        else:
            restored = arr_scaled * self.std_

        restored = restored.reshape(*batch_shape, *self.label_dims)
        # `restored` is a freshly computed array, so the in-place add never
        # touches the caller's data. The mean is stored as [1, *label_dims];
        # dropping its leading axis lets it broadcast against any batch
        # shape, including an empty one.
        restored += self.mean_.reshape(self.label_dims)

        if is_tensor:
            restored = torch.from_numpy(restored).to(device)
        return restored
# [docs]
def calculate_pit_cdf(preds, y, weights=None):
    """Calculate the Probability Integral Transform (PIT) and its CDF.

    For every prediction the PIT value is the weighted fraction of its
    samples that lie strictly below the ground truth. The returned curve is,
    for each reference percentile, the fraction of PIT values strictly below
    that percentile.

    Args:
        preds (numpy.ndarray): Model predictions of shape [num_predictions, num_samples]
        y (numpy.ndarray): Ground truth values of shape [num_predictions]
        weights (numpy.ndarray): Weights for each prediction of shape
            [num_predictions, num_samples]. Defaults to uniform weights.

    Returns:
        tuple: Contains:
            - numpy.ndarray: Reference percentiles (101 points, linspace from 0 to 1)
            - numpy.ndarray: Empirical CDF of the PIT values at those percentiles
    """
    preds = np.asarray(preds)
    y = np.asarray(y)
    weights = np.ones_like(preds) if weights is None else np.asarray(weights)

    # Weight mass of the samples strictly below the ground truth. The sum
    # does not depend on sample order, so no sorting of the samples is needed.
    is_below = preds < y.reshape(-1, 1)
    mass_below = np.where(is_below, weights, np.zeros_like(weights)).sum(axis=1)
    percentiles = mass_below / weights.sum(axis=1)

    # Empirical CDF of the PIT values: side="left" counts the PIT values
    # strictly smaller than each reference percentile.
    ref_percentiles = np.linspace(0, 1, 101)
    n_below_ref = np.searchsorted(np.sort(percentiles), ref_percentiles, side="left")
    cumulative_percentiles = n_below_ref / len(percentiles)
    return ref_percentiles, cumulative_percentiles