# Standard imports
import os
import sys
import logging
import pathlib
from typing import Union
import struct
from datetime import datetime
# import multiprocessing
from multiprocessing import Lock
# from multiprocessing.synchronize import Lock

# External imports
import numpy as np
import tqdm
import torch
import torch.utils.data as data

# Imports from the project
from dataset import Dataset

_DEFAULT_TRAIN_FILEPATH = "/mounts/Datasets3/2022-ChallengePlankton/sub_2CMEMS-MEDSEA-2010-2016-training.nc.bin"
_DEFAULT_TEST_FILEPATH = (
    "/mounts/Datasets3/2022-ChallengePlankton/sub_2CMEMS-MEDSEA-2017-testing.nc.bin"
)

_ENCODING_LINEAR = "I"
_ENCODING_INDEX = "I"  # "h" (short, 2 bytes) should be sufficient
_ENCODING_OFFSET_FORMAT = ""
_ENCODING_ENDIAN = "<"


def train_valid_split(
    dataset,
    valid_ratio,
    train_subset_filepath,
    valid_subset_filepath,
    max_num_samples=None,
):
    """
    Generates two splits of (1 - valid_ratio) and valid_ratio fractions.
    Takes about 2 minutes to generate the splits for 50M datapoints.
    For small sets, we could instead keep the indices in main memory with
    torch.utils.data.Subset.
    """
    N = len(dataset)
    if max_num_samples is not None:
        pkeep = max_num_samples / N
        Nvalid = int(valid_ratio * max_num_samples)
        Ntrain = max_num_samples - Nvalid
        ptrain = Ntrain / max_num_samples
    else:
        pkeep = 1.0
        Nvalid = int(valid_ratio * N)
        Ntrain = N - Nvalid
        ptrain = Ntrain / N

    ftrain = open(train_subset_filepath, "wb")
    fvalid = open(valid_subset_filepath, "wb")

    gen = np.random.default_rng()
    logging.info(f"Generating the subset files from {N} samples")
    # Every sample is kept with probability pkeep and then routed to the train
    # or validation subset file as a packed unsigned integer index
    for i in tqdm.tqdm(range(N)):
        if gen.uniform() < pkeep:
            if gen.uniform() < ptrain:
                ftrain.write(struct.pack(_ENCODING_ENDIAN + _ENCODING_LINEAR, i))
            else:
                fvalid.write(struct.pack(_ENCODING_ENDIAN + _ENCODING_LINEAR, i))
    fvalid.close()
    ftrain.close()
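

# Illustrative sketch (not used elsewhere in this module): the subset files written
# by train_valid_split() are flat sequences of little-endian unsigned 32-bit integers,
# so they can be decoded with struct.iter_unpack using the same encoding constants.
# The helper name `load_subset_indices` is hypothetical and only given as an example.
def load_subset_indices(subset_filepath):
    """
    Read back the sample indices stored by train_valid_split.
    """
    with open(subset_filepath, "rb") as f:
        raw = f.read()
    return [
        idx
        for (idx,) in struct.iter_unpack(_ENCODING_ENDIAN + _ENCODING_LINEAR, raw)
    ]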


def get_dataloaders(
    filepath,
    num_days,
    batch_size,
    num_workers,
    pin_memory,
    valid_ratio,
    overwrite_index=True,
    train_transform=None,
    train_target_transform=None,
    valid_transform=None,
    valid_target_transform=None,
    max_num_samples=None,
):
    logging.info("= Dataloaders")
    # Load the base dataset
    logging.info("  - Dataset creation")
    dataset = Dataset(
        filepath, train=True, overwrite_index=overwrite_index, num_days=num_days
    )
    logging.info(f"  - Loaded a dataset with {len(dataset)} samples")

    # Split the samples between the two folds
    logging.info("  - Splitting the data into training and validation sets")
    train_subset_file = "train_indices.subset"
    valid_subset_file = "valid_indices.subset"
    train_valid_split(
        dataset, valid_ratio, train_subset_file, valid_subset_file, max_num_samples
    )

    logging.info("  - Subset dataset")
    train_dataset = Dataset(
        filepath,
        subset_file=train_subset_file,
        transform=train_transform,
        target_transform=train_target_transform,
        num_days=num_days,
    )
    valid_dataset = Dataset(
        filepath,
        subset_file=valid_subset_file,
        transform=valid_transform,
        target_transform=valid_target_transform,
        num_days=num_days,
    )
    # The sum of the sizes of the two folds is not expected to be exactly
    # max_num_samples because the split is sampled stochastically
    logging.info(f"  - The train fold has {len(train_dataset)} samples")
    logging.info(f"  - The valid fold has {len(valid_dataset)} samples")

    # Build the dataloaders
    train_loader = torch.utils.data.DataLoader(
        train_dataset,
        batch_size=batch_size,
        shuffle=True,
        num_workers=num_workers,
        pin_memory=pin_memory,
    )
    valid_loader = torch.utils.data.DataLoader(
        valid_dataset,
        batch_size=batch_size,
        shuffle=False,
        num_workers=num_workers,
        pin_memory=pin_memory,
    )
    return train_loader, valid_loader


def get_stats_train_dataset(
    filepath,
    num_days,
    batch_size,
    num_workers,
    pin_memory,
    valid_ratio,
    overwrite_index=True,
    train_transform=None,
    train_target_transform=None,
    valid_transform=None,
    valid_target_transform=None,
    max_num_samples=None,
):
    logging.info("= Dataloaders for mean and standard deviation")
    # Load the base dataset
    logging.info("  - Dataset creation")
    dataset = Dataset(
        filepath, train=True, overwrite_index=overwrite_index, num_days=num_days
    )
    logging.info(f"  - Loaded a dataset with {len(dataset)} samples")

    # Split the samples between the two folds
    logging.info("  - Splitting the data into training and validation sets")
    train_subset_file = "train_indices.subset"
    valid_subset_file = "valid_indices.subset"
    train_valid_split(
        dataset, valid_ratio, train_subset_file, valid_subset_file, max_num_samples
    )

    logging.info("  - Subset dataset")
    train_dataset = Dataset(
        filepath,
        subset_file=train_subset_file,
        transform=train_transform,
        target_transform=train_target_transform,
        num_days=num_days,
    )
    # The sum of the sizes of the two folds is not expected to be exactly
    # max_num_samples because the split is sampled stochastically
    logging.info(f"  - The train fold has {len(train_dataset)} samples")

    # Build the dataloader
    train_loader = torch.utils.data.DataLoader(
        train_dataset,
        batch_size=batch_size,
        shuffle=True,
        num_workers=num_workers,
        pin_memory=pin_memory,
    )

    mean = 0.0
    std = 0.0
    nb_samples = 0.0

    # Initialize the running per-feature min/max from a first minibatch
    it = iter(train_loader)
    X, Y = next(it)
    maxi = X.max(dim=0).values.max(dim=0).values
    mini = X.min(dim=0).values.min(dim=0).values

    print("Computing the statistics of the train dataset")
    # Iterate over the data
    for batch in tqdm.tqdm(train_loader, total=len(train_loader)):
        X = batch[0]
        # Accumulate the per-feature statistics, weighted by the batch size
        # (the std is approximated by averaging per-batch standard deviations)
        mean += X.mean(dim=0).mean(dim=0) * X.size(0)
        std += X.std(dim=0).mean(dim=0) * X.size(0)
        # Update the running min and max
        maxi = torch.stack(
            (X.max(dim=0).values.max(dim=0).values, maxi)
        ).max(dim=0).values
        mini = torch.stack(
            (X.min(dim=0).values.min(dim=0).values, mini)
        ).min(dim=0).values
        # Update the number of samples
        nb_samples += X.size(0)

    # Compute the mean and std
    mean /= nb_samples
    std /= nb_samples

    return mean, std, maxi, mini


def get_test_dataloader(
    filepath,
    num_days,
    batch_size,
    num_workers,
    pin_memory,
    overwrite_index=True,
    transform=None,
    target_transform=None,
):
    logging.info("= Dataloaders")
    # Load the base dataset
    logging.info("  - Dataset creation")
    test_dataset = Dataset(
        filepath,
        train=False,
        transform=transform,
        target_transform=target_transform,
        num_days=num_days,
        overwrite_index=overwrite_index,
    )
    logging.info(f"Loaded {len(test_dataset)} samples in the test set")

    # Build the dataloader; be careful not to shuffle the data
    # so that the original ordering of the test samples is preserved
    test_loader = torch.utils.data.DataLoader(
        test_dataset,
        batch_size=batch_size,
        shuffle=False,
        num_workers=num_workers,
        pin_memory=pin_memory,
    )
    return test_loader


def transform_remove_space_time():
    # Drop the first four attributes (latitude, longitude, depth, time)
    return lambda attributes: attributes[:, 4:]


def transform_normalize_with_train_statistics(MEAN, STD):
    return lambda attributes: torch.div(torch.sub(attributes, MEAN), STD)


def transform_min_max_scaling(MIN, MAX):
    return lambda attributes: torch.div(torch.sub(attributes, MIN), torch.sub(MAX, MIN))


def composite_transform(f, g):
    return lambda attributes: f(g(attributes))
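

# Illustrative sketch: the min/max returned by get_stats_train_dataset() can be used
# with transform_min_max_scaling() in the same way MEAN/STD are combined with
# transform_normalize_with_train_statistics() in the unit test below. The helper name
# `build_min_max_pipeline` is hypothetical and only given as an example.
def build_min_max_pipeline(mini, maxi):
    """
    Compose min-max scaling of all attributes with removal of the space/time columns.
    """
    return composite_transform(
        transform_remove_space_time(), transform_min_max_scaling(mini, maxi)
    )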


if __name__ == "__main__":
    logging.basicConfig(filename="logs/dataloader_unit_test.log", level=logging.INFO)
    logging.info("====> Test dataloader")

    use_cuda = torch.cuda.is_available()
    trainpath = _DEFAULT_TRAIN_FILEPATH
    num_days = 35  # Test with sequences of 35 days
    batch_size = 128
    num_workers = 7
    valid_ratio = 0.2
    # max_num_samples = 1000
    max_num_samples = None

    # Per-feature statistics precomputed on the training set
    MEAN = torch.tensor(
        [
            4.2457e+01, 7.4651e+00, 1.6738e+02, 1.3576e+09, 2.3628e+00, 4.6839e+01,
            2.3855e-01, 3.6535e+00, 1.9776e+00, 2.2628e+02, 8.1003e+00, 1.8691e-01,
            3.8384e+01, 2.6626e+00, 1.4315e+01, -4.1419e-03, 6.0274e-03, -5.1017e-01,
        ]
    )
    STD = torch.tensor(
        [
            5.8939e-01, 8.1625e-01, 1.4535e+02, 5.4952e+07, 1.7543e-02, 1.3846e+02,
            2.1302e-01, 1.9558e+00, 4.1455e+00, 1.2408e+01, 2.2938e-02, 9.9070e-02,
            1.9490e-01, 9.2847e-03, 2.2575e+00, 8.5310e-02, 7.8280e-02, 8.6237e-02,
        ]
    )

    train_loader, valid_loader = get_dataloaders(
        filepath=trainpath,
        num_days=num_days,
        batch_size=batch_size,
        num_workers=num_workers,
        pin_memory=True,
        valid_ratio=valid_ratio,
        overwrite_index=True,
        max_num_samples=max_num_samples,
        train_transform=composite_transform(
            transform_remove_space_time(),
            transform_normalize_with_train_statistics(MEAN, STD),
        ),
    )

    mean, std, maxi, mini = get_stats_train_dataset(
        filepath=trainpath,
        num_days=num_days,
        batch_size=batch_size,
        num_workers=num_workers,
        pin_memory=True,
        valid_ratio=valid_ratio,
        overwrite_index=True,
        max_num_samples=max_num_samples,
    )
    print("Mini =")
    print(mini)
    print("Maxi =")
    print(maxi)

    it = iter(train_loader)
    X, Y = next(it)

    print("Size of X :")
    print(X.shape)  # (128, 35, 18)

    # Test to see what is inside a sample: timeframe and localization
    print("Latitudes")
    print(torch.min(X[0, :, 0]), torch.max(X[0, :, 0]))
    print("Longitudes")
    print(torch.min(X[0, :, 1]), torch.max(X[0, :, 1]))
    print("Depths")
    print(torch.min(X[0, :, 2]), torch.max(X[0, :, 2]))
    print("Times")
    print(torch.min(X[0, :, 3]), torch.max(X[0, :, 3]))
    print("=" * 30)

    print("Size of Y :")
    print(Y.shape)  # (128, 35)
    print("Y for first elem of batch :")
    print(Y[0, :])
    print("-" * 30)

    def check_min(tensor, index):
        """
        For a tensor of shape (B, T, N), return the min value of feature `index`.
        """
        mini = float("inf")
        for batch in tensor:
            for value_time in batch:
                value = value_time[index].item()
                if value < mini:
                    mini = value
        return mini

    def check_max(tensor, index):
        """
        For a tensor of shape (B, T, N), return the max value of feature `index`.
        """
        maxi = float("-inf")
        for batch in tensor:
            for value_time in batch:
                value = value_time[index].item()
                if value > maxi:
                    maxi = value
        return maxi

    def check_info(tensor, index):
        print("=" * 30)
        print(f"Min value for index {index} is {check_min(tensor, index)}")
        print(f"Max value for index {index} is {check_max(tensor, index)}")
        print("=" * 30)

    print(X.shape)
    check_info(X, 0)  # latitude
    check_info(X, 1)  # longitude
    check_info(X, 2)  # depth
    check_info(X, 3)  # time

    # Size of X = (batch_size, num_days, 18 features (lat, long, depth, time, physical features))
    # Size of Y = (batch_size, num_days)
    logging.info(f"Got a minibatch of size {X.shape} -> {Y.shape}")
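
    # Illustrative sketch: the test split can be loaded the same way, reusing the
    # train-set statistics for normalization. This assumes _DEFAULT_TEST_FILEPATH
    # is available on the machine running this unit test.
    test_loader = get_test_dataloader(
        filepath=_DEFAULT_TEST_FILEPATH,
        num_days=num_days,
        batch_size=batch_size,
        num_workers=num_workers,
        pin_memory=True,
        overwrite_index=True,
        transform=composite_transform(
            transform_remove_space_time(),
            transform_normalize_with_train_statistics(MEAN, STD),
        ),
    )
    logging.info(f"The test loader has {len(test_loader)} minibatches")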