import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Function
# Linear Regression (Feed forward Network)
class LinearRegression(nn.Module):
    """Fully connected feed-forward regressor."""

    def __init__(self, cfg, input_size):
        super(LinearRegression, self).__init__()
        self.input_size = input_size
        self.bias = cfg["LinearRegression"]["Bias"]
        self.hidden_size = int(cfg["LinearRegression"]["HiddenSize"])
        # Stack of hidden layers followed by a single-unit output layer.
        # The final ReLU constrains predictions to be non-negative.
        self.regressor = nn.Sequential(
            nn.Linear(input_size, self.hidden_size, self.bias),
            nn.ReLU(),
            nn.Linear(self.hidden_size, self.hidden_size, self.bias),
            nn.ReLU(),
            nn.Linear(self.hidden_size, self.hidden_size, self.bias),
            nn.ReLU(),
            nn.Linear(self.hidden_size, self.hidden_size, self.bias),
            nn.ReLU(),
            nn.Linear(self.hidden_size, self.hidden_size, self.bias),
            nn.ReLU(),
            nn.Linear(self.hidden_size, self.hidden_size, self.bias),
            nn.ReLU(),
            nn.Linear(self.hidden_size, self.hidden_size, self.bias),
            nn.ReLU(),
            nn.Linear(self.hidden_size, 1, self.bias),
            nn.ReLU()
        )

    def forward(self, x):
        # Run the input through the fully connected stack.
        return self.regressor(x)

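# --- Usage sketch (illustrative, not part of the module above) ---
# The exact config layout is an assumption; the class only requires
# cfg["LinearRegression"]["Bias"] and cfg["LinearRegression"]["HiddenSize"].
#
#     cfg = {"LinearRegression": {"Bias": True, "HiddenSize": 64}}
#     model = LinearRegression(cfg, input_size=10)
#     preds = model(torch.randn(32, 10))   # -> shape (32, 1)
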
# Recurrent Neural Networks
class RNN(nn.Module):
    def __init__(self, cfg, input_size):
        super(RNN, self).__init__()
        self.hidden_size = int(cfg["RNN"]["HiddenSize"])
        self.num_layers = int(cfg["RNN"]["NumLayers"])
        # nn.RNN takes (input, h0) and returns an (output, hidden) tuple, so the
        # recurrent layer and its dropout are kept as separate modules rather
        # than wrapped in nn.Sequential.
        self.rnn = nn.RNN(input_size, self.hidden_size, self.num_layers,
                          batch_first=True, nonlinearity='relu')
        self.rnn_dropout = nn.Dropout(p=0.2)
        self.fc = nn.Sequential(
            nn.Linear(self.hidden_size, 1),
            nn.Dropout(p=0.2)
        )

    def forward(self, x):
        # Initialize the hidden state with zeros on the same device as the input.
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size,
                         device=x.device)
        out, hn = self.rnn(x, h0)
        out = self.rnn_dropout(out)
        out = self.fc(out)
        return out

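# --- Shape sketch (illustrative; the cfg layout is an assumption) ---
# The model expects batch-first sequences of shape (batch, seq_len, input_size)
# and returns one prediction per time step.
#
#     cfg = {"RNN": {"HiddenSize": 32, "NumLayers": 2}}
#     model = RNN(cfg, input_size=10)
#     out = model(torch.randn(8, 20, 10))   # -> shape (8, 20, 1)
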
# Bidirectional LSTM
class BidirectionalLSTM(nn.Module):
    def __init__(self, cfg, input_size):
        super(BidirectionalLSTM, self).__init__()
        self.hidden_size = int(cfg["BidirectionalLSTM"]["HiddenSize"])
        self.num_layers = int(cfg["BidirectionalLSTM"]["NumLayers"])
        self.lstm = nn.LSTM(input_size, self.hidden_size, self.num_layers,
                            batch_first=True, bidirectional=True, dropout=0.2)
        # The LSTM is bidirectional, so each time step yields 2 * hidden_size features.
        self.linear = nn.Linear(2 * self.hidden_size, 1)

    def forward(self, x):
        # Initial hidden and cell states of shape
        # (num_layers * num_directions, batch, hidden_size), on the input's device.
        h0 = torch.zeros(self.num_layers * 2, x.size(0), self.hidden_size, device=x.device)
        c0 = torch.zeros(self.num_layers * 2, x.size(0), self.hidden_size, device=x.device)
        out, (hidden_state, cell_state) = self.lstm(x, (h0, c0))
        result = self.linear(out)
        return result

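# --- Usage sketch (illustrative; the cfg layout is an assumption) ---
# The bidirectional LSTM concatenates forward and backward states, so the
# linear head reads 2 * hidden_size features at every time step.
#
#     cfg = {"BidirectionalLSTM": {"HiddenSize": 32, "NumLayers": 2}}
#     model = BidirectionalLSTM(cfg, input_size=10)
#     out = model(torch.randn(8, 20, 10))   # -> shape (8, 20, 1)
#     last_step = out[:, -1, :]             # prediction at the final time step
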
# Initialization
def init_he(module):
    # He (Kaiming) initialization suits layers followed by ReLU activations.
    if type(module) == nn.Linear:
        nn.init.kaiming_uniform_(module.weight, mode='fan_in', nonlinearity='relu')

def init_xavier(module):
    # Only linear layers carry a weight matrix to initialize; skip activations, dropout, etc.
    if type(module) == nn.Linear:
        nn.init.xavier_uniform_(module.weight)

# Generic function to initialize the model's weights
def initialize_model(cfg, network):
    # The config stores the initializer's name as a string ("init_he", "init_xavier" or "None").
    init_fn = eval(cfg[cfg['Model']['Name']]['Initialization'])
    if init_fn is not None:
        # Module.apply recurses over all submodules, so a single call covers the whole network.
        network.apply(init_fn)

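# --- Usage sketch (illustrative; field names follow the cfg accesses above) ---
#
#     cfg = {
#         "Model": {"Name": "LinearRegression"},
#         "LinearRegression": {"Bias": True, "HiddenSize": 64, "Initialization": "init_he"},
#     }
#     model = LinearRegression(cfg, input_size=10)
#     initialize_model(cfg, model)   # applies init_he to every nn.Linear layer
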
class ModelCheckpoint:
    """Keeps the model weights with the lowest loss seen so far."""

    def __init__(self, filepath, model):
        self.min_loss = None
        self.filepath = filepath
        self.model = model

    def update(self, loss):
        # Save whenever the loss improves on the best value recorded so far.
        if (self.min_loss is None) or (loss < self.min_loss):
            print("Saving a better model")
            torch.save(self.model.state_dict(), self.filepath)
            self.min_loss = loss
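
# --- Usage sketch (illustrative; loader, criterion, optimizer and the
# evaluate() validation helper are assumptions, not part of this file) ---
#
#     checkpoint = ModelCheckpoint("best_model.pt", model)
#     for epoch in range(num_epochs):
#         model.train()
#         for inputs, targets in train_loader:
#             optimizer.zero_grad()
#             loss = criterion(model(inputs), targets)
#             loss.backward()
#             optimizer.step()
#         val_loss = evaluate(model, val_loader)   # hypothetical validation helper
#         checkpoint.update(val_loss)              # saves state_dict only on improvement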