import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Function
from torch.autograd import Variable


# Linear Regression (feed-forward network)
class LinearRegression(nn.Module):
    def __init__(self, cfg, input_size):
        super(LinearRegression, self).__init__()
        self.input_size = input_size
        self.bias = cfg["LinearRegression"]["Bias"]
        self.hidden_size = int(cfg["LinearRegression"]["HiddenSize"])
        self.regressor = nn.Sequential(
            nn.Linear(input_size, self.hidden_size, self.bias),
            nn.ReLU(),
            nn.Linear(self.hidden_size, self.hidden_size, self.bias),
            nn.ReLU(),
            nn.Linear(self.hidden_size, self.hidden_size, self.bias),
            nn.ReLU()
        )

    def forward(self, x):
        return self.regressor(x)


# Recurrent Neural Network
class RNN(nn.Module):
    def __init__(self, cfg, input_size):
        super(RNN, self).__init__()
        self.hidden_size = cfg["RNN"]["HiddenSize"]
        self.num_layers = cfg["RNN"]["NumLayers"]
        self.num_ffn = cfg["RNN"]["NumFFN"]
        self.dropout = cfg["RNN"]["Dropout"]
        # RNN
        self.rnn = nn.RNN(input_size, self.hidden_size, self.num_layers,
                          batch_first=True, nonlinearity='relu')
        # Feed-forward head applied to the RNN outputs
        self.fc = nn.Sequential()
        for layer in range(self.num_ffn):
            self.fc.add_module(
                f"linear_{layer}",
                nn.Linear(self.hidden_size, self.hidden_size)
            )
            self.fc.add_module(
                f"relu_{layer}",
                nn.ReLU()
            )
            self.fc.add_module(
                f"dropout_{layer}",
                nn.Dropout(p=self.dropout)
            )
        self.fc.add_module(
            "last_linear",
            nn.Linear(self.hidden_size, 1)
        )

    def forward(self, x):
        use_cuda = torch.cuda.is_available()
        if use_cuda:
            device = torch.device('cuda')
        else:
            device = torch.device('cpu')
        # Initialize hidden state with zeros
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device)
        out, hn = self.rnn(x, h0)
        out = nn.Dropout(p=self.dropout)(out)
        out = self.fc(out)
        return out


# Bidirectional LSTM
class BidirectionalLSTM(nn.Module):
    def __init__(self, cfg, input_size):
        super(BidirectionalLSTM, self).__init__()
        self.hidden_size = cfg["BidirectionalLSTM"]["HiddenSize"]
        self.num_layers = cfg["BidirectionalLSTM"]["NumLayers"]
        self.LSTM_dropout = cfg["BidirectionalLSTM"]["LSTMDropout"]
        self.FFN_dropout = cfg["BidirectionalLSTM"]["FFNDropout"]
        self.num_ffn = cfg["BidirectionalLSTM"]["NumFFN"]
        self.lstm = nn.LSTM(input_size, self.hidden_size, self.num_layers,
                            batch_first=True, bidirectional=True,
                            dropout=self.LSTM_dropout)
        # Feed-forward head; widths are doubled because the LSTM is bidirectional
        self.fc = nn.Sequential()
        for layer in range(self.num_ffn):
            self.fc.add_module(
                f"linear_{layer}",
                nn.Linear(2 * self.hidden_size, 2 * self.hidden_size)
            )
            self.fc.add_module(
                f"relu_{layer}",
                nn.ReLU()
            )
            self.fc.add_module(
                f"dropout_{layer}",
                nn.Dropout(p=self.FFN_dropout)
            )
        self.fc.add_module(
            "last_linear",
            nn.Linear(2 * self.hidden_size, 1)
        )

    def forward(self, x):
        use_cuda = torch.cuda.is_available()
        if use_cuda:
            device = torch.device('cuda')
        else:
            device = torch.device('cpu')
        # Initial hidden and cell states (num_layers * 2 because the LSTM is bidirectional)
        h0 = torch.zeros(self.num_layers * 2, x.size(0), self.hidden_size).to(device)
        c0 = torch.zeros(self.num_layers * 2, x.size(0), self.hidden_size).to(device)
        out, (hidden_state, cell_state) = self.lstm(x, (h0, c0))
        result = self.fc(out)
        result = nn.ReLU()(result)
        return result
# 1-D CNN
class CNN1D(torch.nn.Module):
    def __init__(self, cfg, num_inputs):
        super(CNN1D, self).__init__()
        self.model = torch.nn.Sequential(
            *conv_block(num_inputs, 32),
            *conv_block(32, 128)
        )
        self.avg_pool = torch.nn.AdaptiveAvgPool1d(1)
        self.ffn = nn.Sequential(
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Linear(128, cfg["Dataset"]["num_days"])
        )

    def forward(self, x):
        # (batch, time, features) -> (batch, features, time) for Conv1d
        x = torch.transpose(x, 1, 2)
        out = self.model(x)
        out = self.avg_pool(out)
        out = out.view([out.shape[0], -1])
        out = self.ffn(out)
        out = out.view([out.shape[0], out.shape[1], 1])
        return out


def conv_block(in_channels, out_channels):
    return [
        torch.nn.Conv1d(in_channels, out_channels, kernel_size=3, stride=1, padding=1),
        torch.nn.ReLU(),
        torch.nn.BatchNorm1d(out_channels),
    ]


# Initialization
def init_he(module):
    if type(module) == nn.Linear:
        nn.init.kaiming_uniform_(module.weight, mode='fan_in', nonlinearity='relu')


def init_xavier(module):
    if type(module) == nn.Linear:
        nn.init.xavier_uniform_(module.weight)


# Generic function to build a model from the configuration
def build_model(cfg, input_size):
    print(f"{cfg['Model']['Name']}(cfg, input_size)")
    return eval(f"{cfg['Model']['Name']}(cfg, input_size)")


def initialize_model(cfg, network):
    # The config stores the initializer's name as a string (e.g. "init_he") or "None"
    if eval(cfg[cfg['Model']['Name']]['Initialization']) != None:
        for name, layer in network.named_modules():
            layer.apply(eval(f"{cfg[cfg['Model']['Name']]['Initialization']}"))


class ModelCheckpoint:
    def __init__(self, filepath, model):
        self.min_loss = None
        self.filepath = filepath
        self.model = model

    def update(self, loss):
        if (self.min_loss is None) or (loss < self.min_loss):
            print("Saving a better model")
            torch.save(self.model.state_dict(), self.filepath)
            self.min_loss = loss


if __name__ == "__main__":
    import yaml

    config_file = open("config.yml", "r")
    cfg = yaml.load(config_file, Loader=yaml.SafeLoader)
    print(cfg['Model']['Name'])
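    # Illustrative smoke test, not part of the original script: it uses a
    # hypothetical in-memory config with the keys LinearRegression expects,
    # builds the model through build_model/initialize_model, and pushes a
    # random batch through it as a quick shape check.
    demo_cfg = {
        "Model": {"Name": "LinearRegression"},
        "LinearRegression": {"Bias": True, "HiddenSize": 32, "Initialization": "init_he"},
    }
    demo_model = build_model(demo_cfg, input_size=10)
    initialize_model(demo_cfg, demo_model)
    print(demo_model(torch.randn(4, 10)).shape)  # expected: torch.Size([4, 32])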