diff --git a/dataloader.py b/dataloader.py
new file mode 100644
index 0000000000000000000000000000000000000000..6cd159e44c0e6eb68dae0c91740dae3e2ecc2ee2
--- /dev/null
+++ b/dataloader.py
@@ -0,0 +1,182 @@
+# Standard imports
+import logging
+import struct
+
+# External imports
+import numpy as np
+import tqdm
+import torch
+import torch.utils.data as data
+
+# Imports from the project
+from dataset import Dataset
+
+_DEFAULT_TRAIN_FILEPATH = "/mounts/Datasets3/2022-ChallengePlankton/sub_2CMEMS-MEDSEA-2010-2016-training.nc.bin"
+_DEFAULT_TEST_FILEPATH = (
+    "/mounts/Datasets3/2022-ChallengePlankton/sub_2CMEMS-MEDSEA-2017-testing.nc.bin"
+)
+
+_ENCODING_LINEAR = "I"
+_ENCODING_ENDIAN = "<"
+
+
+def train_valid_split(
+    dataset,
+    valid_ratio,
+    train_subset_filepath,
+    valid_subset_filepath,
+    max_num_samples=None,
+):
+    """
+    Generates two index files holding 1 - valid_ratio and valid_ratio
+    fractions of the dataset. Takes about 2 minutes to generate the
+    splits for 50M datapoints.
+
+    For small sets, we could instead keep the indices in main memory
+    with torch.utils.data.Subset.
+    """
+
+    N = len(dataset)
+    if max_num_samples is not None:
+        pkeep = max_num_samples / N
+        Nvalid = int(valid_ratio * max_num_samples)
+        Ntrain = max_num_samples - Nvalid
+        ptrain = Ntrain / max_num_samples
+    else:
+        pkeep = 1.0
+        Nvalid = int(valid_ratio * N)
+        Ntrain = N - Nvalid
+        ptrain = Ntrain / N
+
+    gen = np.random.default_rng()
+
+    logging.info(f"Generating the subset files from {N} samples")
+    with open(train_subset_filepath, "wb") as ftrain, open(
+        valid_subset_filepath, "wb"
+    ) as fvalid:
+        for i in tqdm.tqdm(range(N)):
+            if gen.uniform() < pkeep:
+                if gen.uniform() < ptrain:
+                    ftrain.write(struct.pack(_ENCODING_ENDIAN + _ENCODING_LINEAR, i))
+                else:
+                    fvalid.write(struct.pack(_ENCODING_ENDIAN + _ENCODING_LINEAR, i))
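+
+
+def read_subset_indices(subset_filepath):
+    """
+    A minimal decoding sketch (debugging aid, not used by the pipeline):
+    each subset file written by train_valid_split is a flat sequence of
+    little-endian uint32 sample indices, so it can be read back with a
+    single struct.iter_unpack call.
+    """
+    with open(subset_filepath, "rb") as f:
+        payload = f.read()
+    return [i for (i,) in struct.iter_unpack(_ENCODING_ENDIAN + _ENCODING_LINEAR, payload)]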
+
+
+def get_dataloaders(
+    filepath,
+    num_days,
+    batch_size,
+    num_workers,
+    pin_memory,
+    valid_ratio,
+    overwrite_index=True,
+    train_transform=None,
+    train_target_transform=None,
+    valid_transform=None,
+    valid_target_transform=None,
+    max_num_samples=None,
+):
+    logging.info("= Dataloaders")
+    # Load the base dataset
+    logging.info(" - Dataset creation")
+    dataset = Dataset(
+        filepath, train=True, overwrite_index=overwrite_index, num_days=num_days
+    )
+    logging.info(f" - Loaded a dataset with {len(dataset)} samples")
+
+    # Split the number of samples in each fold
+    logging.info(" - Splitting the data in training and validation sets")
+    train_subset_file = "train_indices.subset"
+    valid_subset_file = "valid_indices.subset"
+    train_valid_split(
+        dataset, valid_ratio, train_subset_file, valid_subset_file, max_num_samples
+    )
+
+    logging.info(" - Subset dataset")
+    train_dataset = Dataset(
+        filepath,
+        subset_file=train_subset_file,
+        transform=train_transform,
+        target_transform=train_target_transform,
+        num_days=num_days,
+    )
+    valid_dataset = Dataset(
+        filepath,
+        subset_file=valid_subset_file,
+        transform=valid_transform,
+        target_transform=valid_target_transform,
+        num_days=num_days,
+    )
+    # The sizes of the two folds do not necessarily sum exactly
+    # to max_num_samples
+    logging.info(f" - The train fold has {len(train_dataset)} samples")
+    logging.info(f" - The valid fold has {len(valid_dataset)} samples")
+
+    # Build the dataloaders
+    train_loader = torch.utils.data.DataLoader(
+        train_dataset,
+        batch_size=batch_size,
+        shuffle=True,
+        num_workers=num_workers,
+        pin_memory=pin_memory,
+    )
+
+    valid_loader = torch.utils.data.DataLoader(
+        valid_dataset,
+        batch_size=batch_size,
+        shuffle=False,
+        num_workers=num_workers,
+        pin_memory=pin_memory,
+    )
+
+    return train_loader, valid_loader
+
+
+def get_test_dataloader(
+    filepath,
+    num_days,
+    batch_size,
+    num_workers,
+    pin_memory,
+    overwrite_index=True,
+    transform=None,
+    target_transform=None,
+):
+    logging.info("= Dataloaders")
+    # Load the base dataset
+    logging.info(" - Dataset creation")
+    test_dataset = Dataset(
+        filepath,
+        train=False,
+        transform=transform,
+        target_transform=target_transform,
+        num_days=num_days,
+        overwrite_index=overwrite_index,
+    )
+    logging.info(f" - Loaded {len(test_dataset)} samples in the test set")
+
+    # Build the dataloader; be careful not to shuffle the data so that
+    # the predictions keep the original sample order
+    test_loader = torch.utils.data.DataLoader(
+        test_dataset,
+        batch_size=batch_size,
+        shuffle=False,
+        num_workers=num_workers,
+        pin_memory=pin_memory,
+    )
+    return test_loader
diff --git a/dataset.py b/dataset.py
new file mode 100644
index 0000000000000000000000000000000000000000..c814c97de8e6b745186eaef44430fbc0b776c5e9
--- /dev/null
+++ b/dataset.py
@@ -0,0 +1,368 @@
+# Standard imports
+import os
+import logging
+import pathlib
+from typing import Union
+import struct
+from datetime import datetime
+from multiprocessing import Lock
+
+# External imports
+import numpy as np
+import tqdm
+import torch
+import torch.utils.data as data
+
+_DEFAULT_TRAIN_FILEPATH = "/mounts/Datasets3/2022-ChallengePlankton/sub_2CMEMS-MEDSEA-2010-2016-training.nc.bin"
+_DEFAULT_TEST_FILEPATH = (
+    "/mounts/Datasets3/2022-ChallengePlankton/sub_2CMEMS-MEDSEA-2017-testing.nc.bin"
+)
+
+_ENCODING_LINEAR = "I"
+_ENCODING_INDEX = "I"  # "h" (2-byte short) would be large enough for these indices
+_ENCODING_ENDIAN = "<"
+
+
+class Dataset(data.Dataset):
+    """
+    Dataset for training a predictor of the phytoplankton density
+    from (a series of) environmental variables, indexed by
+
+        time, depth, latitude, longitude
+
+    The file sub_2CMEMS-MEDSEA-2010-2016-training.nc.bin contains
+    50,154,984 valid data points.
+
+    Generating the index takes approximately 20 seconds
+    """
+
+    def __init__(
+        self,
+        filepath: Union[pathlib.Path, str],
+        overwrite_index: bool = False,
+        train=True,
+        subset_file=None,
+        num_days=1,
+        transform=None,
+        target_transform=None,
+    ):
+        """
+        Builds a point dataset, generating the index if necessary or requested
+
+        Arguments:
+            filepath: the full path to the nc file to load
+            overwrite_index: if True, ignores any existing index and regenerates it
+            train: if True, accessing an element also returns the phyc
+            subset_file: a filename which holds the list of indices this dataset must use
+            num_days: the number of days that each sample considers
+            transform: a transform to apply to the input tensor
+            target_transform: a transform to apply to the phyc output tensor
+        """
+        super().__init__()
+        if isinstance(filepath, str):
+            filepath = pathlib.Path(filepath)
+        self.filepath = filepath
+
+        # Open the data file; it can be indexed by [lat, long, depth, time]
+        self.fp = open(filepath, "rb")
+        self.fp_lock = Lock()
+
+        # List all the environmental variables,
+        # to which we could add time, depth, latitude, longitude
+        self.in_variables = [
+            "dissic",
+            "mlotst",
+            "nh4",
+            "no3",
+            "nppv",
+            "o2",
+            "ph",
+            "po4",
+            "so",
+            "talk",
+            "thetao",
+            "uo",
+            "vo",
+            "zos",
+            # "phyc",
+        ]
+        self.train = train
+        if self.train:
+            self.out_variable = "phyc"
+
+        # Store the size of the timeseries, in days
+        self.num_days = num_days
+
+        # Set up the format of a row
+        self.row_format = (
+            "?" + "f" * len(self.in_variables) + ("f" if self.train else "")
+        )
+        self.row_size = struct.calcsize(_ENCODING_ENDIAN + self.row_format)
+
+        # Load the header of the file for the dimension variables
+        # Local utility function to parse the header
+        def _read_dim(fp, offset, base_format, lock):
+            fmt = _ENCODING_ENDIAN + "i"
+            (dim, nbytes_dim) = read_bin_data(fp, offset, os.SEEK_SET, fmt, lock)
+            dim = dim[0]  # the returned value is a tuple
+            offset += nbytes_dim
+
+            fmt = _ENCODING_ENDIAN + (base_format * dim)
+            (values, nbytes_values) = read_bin_data(fp, offset, os.SEEK_SET, fmt, lock)
+            return dim, np.array(values), nbytes_dim + nbytes_values
+
+        self.header_offset = 0
+        self.nlatitudes, self.latitudes, nbytes = _read_dim(
+            self.fp, self.header_offset, "f", self.fp_lock
+        )
+        self.header_offset += nbytes
+
+        self.nlongitudes, self.longitudes, nbytes = _read_dim(
+            self.fp, self.header_offset, "f", self.fp_lock
+        )
+        self.header_offset += nbytes
+
+        self.ndepths, self.depths, nbytes = _read_dim(
+            self.fp, self.header_offset, "f", self.fp_lock
+        )
+        self.header_offset += nbytes
+
+        self.ntimes, self.times, nbytes = _read_dim(
+            self.fp, self.header_offset, "i", self.fp_lock
+        )
+        # If we need to convert timestamps to datetimes:
+        # self.dtimes = np.array([datetime.fromtimestamp(di) for di in self.times.tolist()])
+        self.header_offset += nbytes
+
+        self.lat_chunk_size = self.nlongitudes * self.ndepths * self.ntimes
+        self.lon_chunk_size = self.ndepths * self.ntimes
+        self.depth_chunk_size = self.ntimes
+
+        logging.info(
+            f"The loaded dataset contains {self.nlatitudes} latitudes, {self.nlongitudes} longitudes, {self.ndepths} depths and {self.ntimes} time points"
+        )
+
+        # We store an index which maps a linear index of valid measures
+        # (i.e. excluding points where there is no phytoplankton record)
+        # to the byte offset in the data file
+        self._index_file = None
+        self._index_lock = Lock()
+        self._load_index(filepath.name, overwrite_index)
+
+        # The subset map is a file handler to a file
+        # where the i-th row contains the i-th index in the original dataset
+        self._subset_map = None
+        if subset_file is not None:
+            self._subset_lock = Lock()
+            self._load_subset_index(subset_file)
+
+        self.transform = transform
+        self.target_transform = target_transform
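+
+    # Layout reminder, as a hedged sketch of the arithmetic rather than a
+    # normative spec: after the header, the file is a dense 4D array of rows
+    # ordered latitude-major, i.e. [lat][lon][depth][time]. With "<"
+    # (standard, unpadded) sizes, a train-mode row packs 1 valid flag,
+    # 14 environmental floats and 1 phyc float:
+    #     struct.calcsize("<" + "?" + "f" * 15) == 61 bytes
+    # and the row for (ilat, ilon, idepth, itime) therefore starts at
+    #     header_offset + (ilat * lat_chunk_size + ilon * lon_chunk_size
+    #                      + idepth * depth_chunk_size + itime) * row_size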
+
+    def _generate_index(self, indexpath):
+        """
+        Generates an index for the binary data file
+        which maps the linear index to the original data file offset
+        and 4D indices of valid measures.
+        Note we only index valid measures, i.e. excluding the coast, etc.
+
+        Generating the index takes around 2 minutes for 50M samples
+        """
+
+        fp = open(self.filepath, "rb")
+        with open(indexpath, "wb") as fhindex:
+            linear_index = 0
+            # Rewind the file handler to just after the header
+            fp.seek(self.header_offset, os.SEEK_SET)
+            for ilatitude in tqdm.tqdm(range(self.nlatitudes)):
+                for ilongitude in range(self.nlongitudes):
+                    for idepth in range(self.ndepths):
+                        # No need to iterate over time:
+                        # only latitude x longitude x depth
+                        # specify a valid or invalid location
+                        t0_offset = (
+                            self.header_offset
+                            + (
+                                ilatitude * self.lat_chunk_size
+                                + ilongitude * self.lon_chunk_size
+                                + idepth * self.depth_chunk_size
+                            )
+                            * self.row_size
+                        )
+                        fp.seek(t0_offset, os.SEEK_SET)
+                        # Just read the 'valid' field
+                        (is_valid,) = struct.unpack(
+                            "<?", fp.read(struct.calcsize("<?"))
+                        )
+
+                        # If the location is valid,
+                        # we record all the time samples
+                        if is_valid:
+                            for dt in range(
+                                0, self.ntimes - self.num_days + 1, self.num_days
+                            ):
+                                fileoffset = t0_offset + dt * self.row_size
+
+                                fmt = (
+                                    _ENCODING_ENDIAN
+                                    + (_ENCODING_LINEAR * 2)
+                                    + (_ENCODING_INDEX * 4)
+                                )
+                                write_bin_data(
+                                    fhindex,
+                                    fmt,
+                                    (
+                                        linear_index,
+                                        fileoffset,
+                                        ilatitude,
+                                        ilongitude,
+                                        idepth,
+                                        dt,
+                                    ),
+                                )
+
+                                linear_index += 1
+        fp.close()
+
+    def _load_index(self, basename: str, overwrite_index: bool):
+        """
+        Loads (and possibly computes) an index file
+        to convert a linear index idx to its corresponding
+        time, depth, latitude, longitude indices
+
+        Arguments:
+            basename: the basename of the data file, used to name the index file
+            overwrite_index: if True, regenerates the index even if it already exists
+        """
+        indexpath = pathlib.Path(".") / f"{basename}_index.idx"
+        # Generate the index if necessary or requested
+        if not indexpath.exists() or overwrite_index:
+            logging.info("Generating the index")
+            self._generate_index(indexpath)
+        # And then load the index
+        logging.info(f"Loading the index from {indexpath}")
+        self._index_file = open(indexpath, "rb")
+
+    def _load_subset_index(self, subsetpath):
+        self._subset_map = open(subsetpath, "rb")
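+
+    # Each index record is six little-endian uint32 values, 24 bytes in
+    # total: (linear_index, file_offset, ilatitude, ilongitude, idepth,
+    # itime). A commented sketch of decoding record k by hand, assuming
+    # fh is the open index file:
+    #     fh.seek(k * struct.calcsize("<IIIIII"))
+    #     linidx, offset, ilat, ilon, idepth, itime = struct.unpack(
+    #         "<IIIIII", fh.read(struct.calcsize("<IIIIII"))
+    #     )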
+
+    def _get_fileoffset(self, idx):
+        fmt = _ENCODING_ENDIAN + (_ENCODING_LINEAR * 2) + (_ENCODING_INDEX * 4)
+        whence = os.SEEK_SET if idx >= 0 else os.SEEK_END
+        offset = idx * struct.calcsize(fmt)
+
+        (values, _) = read_bin_data(
+            self._index_file, offset, whence, fmt, self._index_lock
+        )
+        linidx = values[0]
+        file_offset = values[1]
+        tab_indices = values[2:]
+        return linidx, file_offset, tab_indices
+
+    def __getitem__(self, idx):
+        """
+        Accesses the idx-th sample.
+
+        Returns a (num_days, 18) input array holding, for every day, the
+        latitude, longitude, depth and time coordinates followed by the 14
+        environmental variables and, in train mode, also the (num_days,)
+        phyc values to predict.
+        """
+
+        # If we are processing a subset, convert the "idx" to the
+        # original dataset index
+        if self._subset_map is not None:
+            fmt = _ENCODING_ENDIAN + _ENCODING_LINEAR
+            offset = idx * struct.calcsize(fmt)
+            (values, _) = read_bin_data(
+                self._subset_map, offset, os.SEEK_SET, fmt, self._subset_lock
+            )
+            idx = values[0]
+
+        # File offset for the idx-th sample
+        _, file_offset, (ilatitude, ilongitude, idepth, itime) = self._get_fileoffset(
+            idx
+        )
+        values, _ = read_bin_data(
+            self.fp,
+            file_offset,
+            os.SEEK_SET,
+            _ENCODING_ENDIAN + (self.row_format * self.num_days),
+            self.fp_lock,
+        )
+
+        values = np.array(values).reshape((self.num_days, -1))
+        # The valid flag is always expected to be True since only valid
+        # locations can be indexed with our generated index map
+        assert np.all(values[:, 0])
+
+        # Prepend the latitude, longitude, depth and time
+        # TODO: take the time to think about whether this is relevant!
+        coordinates = np.ones((self.num_days, 4))
+        coordinates[:, 0] *= self.latitudes[ilatitude]
+        coordinates[:, 1] *= self.longitudes[ilongitude]
+        coordinates[:, 2] *= self.depths[idepth]
+        coordinates[:, 3] = self.times[itime : (itime + self.num_days)]
+
+        # Stick the values with the coordinates;
+        # this np array is now (num_days, num_coordinates + num_values)
+        values = np.hstack([coordinates, values[:, 1:]])
+
+        if self.train:
+            in_values = torch.Tensor(values[:, :-1])
+            out_values = torch.Tensor(values[:, -1])
+
+            if self.transform is not None:
+                in_values = self.transform(in_values)
+
+            if self.target_transform is not None:
+                out_values = self.target_transform(out_values)
+
+            return in_values, out_values
+        else:
+            in_values = torch.Tensor(values[:, :-1])
+            if self.transform is not None:
+                in_values = self.transform(in_values)
+            return in_values
+
+    def __len__(self):
+        """
+        Returns the total number of valid datapoints for this dataset
+
+        This corresponds to the length of the index or subset index
+        """
+        # If we are processing a subset, the size of the dataset
+        # is the size of the subset
+        if self._subset_map is not None:
+            subset_size = os.path.getsize(self._subset_map.name)
+            return subset_size // struct.calcsize(_ENCODING_ENDIAN + _ENCODING_LINEAR)
+        else:
+            # Access the last record of the index file
+            # and get its linear index. This linear index plus one
+            # is the total number of "rows" in the file
+            lin_idx, _, _ = self._get_fileoffset(-1)
+            return lin_idx + 1
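+
+
+# A minimal smoke test, assuming the default training file is available;
+# with num_days=1, each sample is a (1, 18) input tensor (4 coordinates
+# + 14 environmental variables) and a (1,) phyc target:
+#
+#     ds = Dataset(_DEFAULT_TRAIN_FILEPATH, num_days=1)
+#     x, y = ds[0]
+#     assert x.shape == (1, 18) and y.shape == (1,)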
+
+
+def write_bin_data(fp, fmt, values):
+    fp.write(struct.pack(fmt, *values))
+
+
+def read_bin_data(fp, offset, whence, fmt, lock):
+    # Guarding the shared file handle with the lock unfortunately does not
+    # work; if it did, it would save us from repeatedly reopening the file.
+    # Instead, we reopen the file for every read, which stays safe with
+    # multiple dataloader workers.
+    with open(fp.name, "rb") as local_fp:
+        local_fp.seek(offset, whence)
+        nbytes = struct.calcsize(fmt)
+        values = struct.unpack(fmt, local_fp.read(nbytes))
+    return values, nbytes
diff --git a/debug.py b/debug.py
new file mode 100644
index 0000000000000000000000000000000000000000..57961a826add76ca582946c5d7ae7ebde9a7e5fb
--- /dev/null
+++ b/debug.py
@@ -0,0 +1,17 @@
+from dataset import Dataset
+
+
+_DEFAULT_TRAIN_FILEPATH = "/mounts/Datasets3/2022-ChallengePlankton/sub_2CMEMS-MEDSEA-2010-2016-training.nc.bin"
+
+# Use keyword arguments: the index file is managed by the Dataset itself
+# and must not be passed as a subset file (the two binary formats differ)
+data = Dataset(_DEFAULT_TRAIN_FILEPATH, overwrite_index=False, train=True, num_days=1)
+
+# In train mode, a sample is an (input, target) pair
+print(len(data[0]))
+
+print(data[0][0].shape)
+
+print(data[0][1])
diff --git a/model.py b/model.py
new file mode 100644
index 0000000000000000000000000000000000000000..a376ef84c27aba6bda2179b717c1393631d6285d
--- /dev/null
+++ b/model.py
@@ -0,0 +1,6 @@
+import numpy as np
+import torch
+import torch.nn as nn
+import torch.nn.functional as F
+from torch.autograd import Function
+
diff --git a/train_indices.subset b/train_indices.subset
index 4c46a39bd8c3ab02ef02ad4470cfeb9b521655ba..20e13c16236888ceacd42847424c1c598cbf0fc6 100644
Binary files a/train_indices.subset and b/train_indices.subset differ