"""Regression models (feed-forward, RNN, bidirectional LSTM) and helpers.

The model to build and its hyper-parameters are read from a nested ``cfg``
mapping: ``cfg["Model"]["Name"]`` names one of the classes defined here and
``cfg[<that name>]`` holds its settings.
"""

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Function
from torch.autograd import Variable


# Linear Regression (Feed forward Network)
class LinearRegression(nn.Module):
    """Fully connected regressor: eight ``Linear`` layers, each followed by ReLU.

    Reads ``cfg["LinearRegression"]["Bias"]`` (passed to every ``nn.Linear``
    as its ``bias`` argument) and ``cfg["LinearRegression"]["HiddenSize"]``
    (cast to ``int``). The last layer maps to a single output and is also
    followed by ReLU, so predictions are never negative.
    """

    def __init__(self, cfg, input_size):
        super().__init__()
        self.input_size = input_size
        self.bias = cfg["LinearRegression"]["Bias"]
        self.hidden_size = int(cfg["LinearRegression"]["HiddenSize"])

        # input -> hidden, six hidden -> hidden layers, hidden -> 1.
        widths = [input_size] + [self.hidden_size] * 7 + [1]
        layers = []
        for fan_in, fan_out in zip(widths, widths[1:]):
            layers.append(nn.Linear(fan_in, fan_out, self.bias))
            layers.append(nn.ReLU())
        self.regressor = nn.Sequential(*layers)

    def forward(self, x):
        """Return the network output for ``x`` (last dimension ``input_size``)."""
        return self.regressor(x)


# Recurrent Neural Networks
class RNN(nn.Module):
    """ReLU ``nn.RNN`` followed by dropout and a per-time-step linear head.

    Reads ``cfg["RNN"]["HiddenSize"]`` and ``cfg["RNN"]["NumLayers"]``.
    The recurrent layer is built with ``batch_first=True``, so inputs are
    ``(batch, seq, input_size)`` and outputs are ``(batch, seq, 1)``.
    """

    def __init__(self, cfg, input_size):
        super().__init__()
        self.hidden_size = cfg["RNN"]["HiddenSize"]
        self.num_layers = cfg["RNN"]["NumLayers"]

        # Kept as a Sequential so parameter names stay ``rnn.0.*``; forward()
        # must call the two members separately because nn.RNN takes two
        # inputs and returns a tuple, neither of which Sequential can chain.
        self.rnn = nn.Sequential(
            nn.RNN(
                input_size,
                self.hidden_size,
                self.num_layers,
                batch_first=True,
                nonlinearity="relu",
            ),
            nn.Dropout(p=0.2),
        )
        self.fc = nn.Sequential(
            nn.Linear(self.hidden_size, 1),
            nn.Dropout(p=0.2),
        )

    def forward(self, x):
        """Run the recurrent layer from a zero hidden state and apply the head."""
        recurrent, dropout = self.rnn[0], self.rnn[1]

        # Zero initial hidden state on the same device and dtype as the input.
        h0 = x.new_zeros(self.num_layers, x.size(0), self.hidden_size)

        out, _ = recurrent(x, h0)
        out = dropout(out)
        return self.fc(out)


# Bidirectional LSTM
class BidirectionalLSTM(nn.Module):
    """Bidirectional ``nn.LSTM`` with a per-time-step linear head.

    Reads ``cfg["BidirectionalLSTM"]["HiddenSize"]`` and
    ``cfg["BidirectionalLSTM"]["NumLayers"]``. The LSTM is built with
    ``batch_first=True``; the head maps the concatenated forward/backward
    features (``2 * hidden_size``) to one value per time step.
    """

    def __init__(self, cfg, input_size):
        super().__init__()
        self.hidden_size = cfg["BidirectionalLSTM"]["HiddenSize"]
        self.num_layers = cfg["BidirectionalLSTM"]["NumLayers"]
        self.lstm = nn.LSTM(
            input_size,
            self.hidden_size,
            self.num_layers,
            batch_first=True,
            bidirectional=True,
            dropout=0.2,
        )
        self.linear = nn.Linear(2 * self.hidden_size, 1)

    def forward(self, x):
        """Run the LSTM from zero hidden/cell states and apply the linear head."""
        # Two directions per layer, hence ``num_layers * 2`` state slices.
        state_shape = (self.num_layers * 2, x.size(0), self.hidden_size)
        h0 = x.new_zeros(state_shape)
        c0 = x.new_zeros(state_shape)

        out, _ = self.lstm(x, (h0, c0))
        return self.linear(out)


# Initialization
def init_he(module):
    """Apply Kaiming-uniform (fan-in, ReLU) init to ``nn.Linear`` weights."""
    if isinstance(module, nn.Linear):
        nn.init.kaiming_uniform_(module.weight, mode="fan_in", nonlinearity="relu")


def init_xavier(module):
    """Apply Xavier-uniform init to ``nn.Linear`` weights.

    Other module types are skipped: ``Module.apply`` visits every submodule,
    including containers and activations that have no ``weight``.
    """
    if isinstance(module, nn.Linear):
        nn.init.xavier_uniform_(module.weight)


# Generic function to build model
def build_model(cfg, input_size):
    """Instantiate the model class named by ``cfg["Model"]["Name"]``.

    The name is looked up among this module's globals and must refer to an
    ``nn.Module`` subclass; it is no longer passed to ``eval``, so a config
    file cannot execute arbitrary expressions.

    Raises:
        ValueError: if the name is not an ``nn.Module`` subclass defined here.
    """
    name = cfg["Model"]["Name"]
    print(f"{name}(cfg, input_size)")

    model_cls = globals().get(name)
    if not (isinstance(model_cls, type) and issubclass(model_cls, nn.Module)):
        raise ValueError(f"Unknown model name in config: {name!r}")
    return model_cls(cfg, input_size)


def initialize_model(cfg, network):
    """Apply the configured weight initializer to ``network``.

    The initializer name is read from
    ``cfg[cfg["Model"]["Name"]]["Initialization"]`` and resolved among this
    module's globals (e.g. ``"init_he"``, ``"init_xavier"``). ``None`` or the
    string ``"None"`` leaves the network untouched.

    Raises:
        ValueError: if the name does not refer to a callable in this module.
    """
    init_name = cfg[cfg["Model"]["Name"]]["Initialization"]
    if init_name is None or init_name == "None":
        return

    init_fn = globals().get(init_name)
    if not callable(init_fn):
        raise ValueError(f"Unknown initialization in config: {init_name!r}")

    # Module.apply already recurses into every submodule, so one call on the
    # root initializes each layer exactly once.
    network.apply(init_fn)


class ModelCheckpoint:
    """Save the model's ``state_dict`` whenever a new lowest loss is reported."""

    def __init__(self, filepath, model):
        self.min_loss = None  # lowest loss seen so far; None until first update
        self.filepath = filepath
        self.model = model

    def update(self, loss):
        """Save to ``filepath`` if ``loss`` is the first or a strictly lower loss."""
        if (self.min_loss is None) or (loss < self.min_loss):
            print("Saving a better model")
            torch.save(self.model.state_dict(), self.filepath)
            self.min_loss = loss


if __name__ == "__main__":
    import yaml

    # safe_load: plain data only, and no Loader argument is required.
    with open("config.yml", "r") as config_file:
        cfg = yaml.safe_load(config_file)
    print(cfg["Model"]["Name"])