# model.py
import torch
import torch.nn as nn
import torch.nn.functional as F
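# A minimal sketch of the config layout these classes expect. The key names are
# taken from the lookups below; the numeric values are placeholders, and the real
# config.yml may contain more entries.
#
#   Model:
#     Name: "RNN"               # one of: LinearRegression, RNN, BidirectionalLSTM, CNN1D
#   Dataset:
#     num_days: 7               # size of the CNN1D output layer
#   LinearRegression:
#     Bias: True
#     HiddenSize: 64
#     Initialization: "init_he" # or "init_xavier" or "None"
#   RNN:
#     HiddenSize: 64
#     NumLayers: 2
#     NumFFN: 2
#     Dropout: 0.2
#     Initialization: "None"
#   BidirectionalLSTM:
#     HiddenSize: 64
#     NumLayers: 2
#     LSTMDropout: 0.2
#     FFNDropout: 0.2
#     NumFFN: 2
#     Initialization: "init_xavier"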
# Linear regression baseline (feed-forward network)
class LinearRegression(nn.Module):
    def __init__(self, cfg, input_size):
        super(LinearRegression, self).__init__()
        self.input_size = input_size
        self.bias = cfg["LinearRegression"]["Bias"]
        self.hidden_size = int(cfg["LinearRegression"]["HiddenSize"])
        # Stack of linear layers with ReLU activations
        self.regressor = nn.Sequential(
            nn.Linear(input_size, self.hidden_size, bias=self.bias),
            nn.ReLU(),
            nn.Linear(self.hidden_size, self.hidden_size, bias=self.bias),
            nn.ReLU(),
            nn.Linear(self.hidden_size, self.hidden_size, bias=self.bias),
            nn.ReLU()
        )

    def forward(self, x):
        return self.regressor(x)
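# Usage sketch (hypothetical cfg values; the real ones come from config.yml).
# Note that the network returns the last hidden layer as-is, so the output
# dimension equals HiddenSize:
#   cfg = {"LinearRegression": {"Bias": True, "HiddenSize": 64}}
#   net = LinearRegression(cfg, input_size=10)
#   y = net(torch.randn(32, 10))   # -> shape (32, 64)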
# Recurrent neural network (vanilla RNN followed by a feed-forward head)
class RNN(nn.Module):
    def __init__(self, cfg, input_size):
        super(RNN, self).__init__()
        self.hidden_size = cfg["RNN"]["HiddenSize"]
        self.num_layers = cfg["RNN"]["NumLayers"]
        self.num_ffn = cfg["RNN"]["NumFFN"]
        self.dropout = cfg["RNN"]["Dropout"]
        # RNN
        self.rnn = nn.RNN(input_size, self.hidden_size, self.num_layers,
                          batch_first=True, nonlinearity='relu')
        # Feed-forward head: NumFFN blocks of Linear -> ReLU -> Dropout, then a final Linear to 1
        self.fc = nn.Sequential()
        for layer in range(self.num_ffn):
            self.fc.add_module(
                f"linear_{layer}", nn.Linear(self.hidden_size, self.hidden_size)
            )
            self.fc.add_module(
                f"relu_{layer}",
                nn.ReLU()
            )
            self.fc.add_module(
                f"dropout_{layer}",
                nn.Dropout(p=self.dropout)
            )
        self.fc.add_module(
            "last_linear",
            nn.Linear(self.hidden_size, 1)
        )
    def forward(self, x):
        # Initialize the hidden state with zeros on the same device as the input
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size, device=x.device)
        out, hn = self.rnn(x, h0)
        # Dropout on the RNN outputs, only active in training mode
        out = F.dropout(out, p=self.dropout, training=self.training)
        out = self.fc(out)
        return out
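# Shape sketch (example sizes are placeholders). With batch_first=True the input is
# (batch, seq_len, input_size); the RNN output is (batch, seq_len, hidden_size) and the
# feed-forward head maps the last dimension to 1:
#   cfg = {"RNN": {"HiddenSize": 64, "NumLayers": 2, "NumFFN": 2, "Dropout": 0.2}}
#   net = RNN(cfg, input_size=10)
#   out = net(torch.randn(8, 30, 10))   # -> (8, 30, 1)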
# Bidirectional LSTM
class BidirectionalLSTM(nn.Module):
    def __init__(self, cfg, input_size):
        super(BidirectionalLSTM, self).__init__()
        self.hidden_size = cfg["BidirectionalLSTM"]["HiddenSize"]
        self.num_layers = cfg["BidirectionalLSTM"]["NumLayers"]
        self.LSTM_dropout = cfg["BidirectionalLSTM"]["LSTMDropout"]
        self.FFN_dropout = cfg["BidirectionalLSTM"]["FFNDropout"]
        self.num_ffn = cfg["BidirectionalLSTM"]["NumFFN"]
        self.lstm = nn.LSTM(input_size, self.hidden_size, self.num_layers,
                            batch_first=True, bidirectional=True, dropout=self.LSTM_dropout)
        # Feed-forward head: the bidirectional LSTM outputs 2*hidden_size features per step
        self.fc = nn.Sequential()
        for layer in range(self.num_ffn):
            self.fc.add_module(
                f"linear_{layer}", nn.Linear(2 * self.hidden_size, 2 * self.hidden_size)
            )
            self.fc.add_module(
                f"relu_{layer}",
                nn.ReLU()
            )
            self.fc.add_module(
                f"dropout_{layer}",
                nn.Dropout(p=self.FFN_dropout)
            )
        self.fc.add_module(
            "last_linear",
            nn.Linear(2 * self.hidden_size, 1)
        )
    def forward(self, x):
        # Initial hidden and cell states on the same device as the input
        # (num_layers * 2 because the LSTM is bidirectional)
        h0 = torch.zeros(self.num_layers * 2, x.size(0), self.hidden_size, device=x.device)
        c0 = torch.zeros(self.num_layers * 2, x.size(0), self.hidden_size, device=x.device)
        out, (hidden_state, cell_state) = self.lstm(x, (h0, c0))
        result = self.fc(out)
        # Final ReLU keeps the predictions non-negative
        result = F.relu(result)
        return result
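# Shape sketch (example sizes are placeholders). The bidirectional LSTM produces
# 2*hidden_size features per time step, and the head maps them to 1:
#   cfg = {"BidirectionalLSTM": {"HiddenSize": 64, "NumLayers": 2,
#                                "LSTMDropout": 0.2, "FFNDropout": 0.2, "NumFFN": 2}}
#   net = BidirectionalLSTM(cfg, input_size=10)
#   out = net(torch.randn(8, 30, 10))   # -> (8, 30, 1), non-negative because of the final ReLU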
# 1D convolutional network
class CNN1D(torch.nn.Module):
    def __init__(self, cfg, num_inputs):
        super(CNN1D, self).__init__()
        self.model = torch.nn.Sequential(
            *conv_block(num_inputs, 32),
            *conv_block(32, 128)
        )
        self.avg_pool = torch.nn.AdaptiveAvgPool1d(1)
        self.ffn = nn.Sequential(
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Linear(128, cfg["Dataset"]["num_days"])
        )

    def forward(self, x):
        # Conv1d expects (batch, channels, length), so move the features to the channel axis
        x = torch.transpose(x, 1, 2)
        out = self.model(x)
        out = self.avg_pool(out)
        out = out.view([out.shape[0], -1])
        out = self.ffn(out)
        out = out.view([out.shape[0], out.shape[1], 1])
        return out
def conv_block(in_channels, out_channels):
    return [
        torch.nn.Conv1d(in_channels, out_channels, kernel_size=3, stride=1, padding=1),
        torch.nn.ReLU(),
        torch.nn.BatchNorm1d(out_channels),
    ]
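# Shape sketch (example sizes are placeholders). CNN1D takes x as (batch, seq_len, num_inputs),
# transposes it to (batch, num_inputs, seq_len) for Conv1d, pools over the time axis, and
# returns (batch, num_days, 1):
#   cfg = {"Dataset": {"num_days": 7}}
#   net = CNN1D(cfg, num_inputs=10)
#   out = net(torch.randn(8, 30, 10))   # -> (8, 7, 1)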
# Weight initialization helpers (applied to Linear layers only)
def init_he(module):
    if type(module) == nn.Linear:
        nn.init.kaiming_uniform_(module.weight, mode='fan_in', nonlinearity='relu')

def init_xavier(module):
    if type(module) == nn.Linear:
        nn.init.xavier_uniform_(module.weight)
# Generic function to build the model named in the config
def build_model(cfg, input_size):
    print(f"{cfg['Model']['Name']}(cfg, input_size)")
    return eval(f"{cfg['Model']['Name']}(cfg, input_size)")

def initialize_model(cfg, network):
    # The config stores the initializer name as a string ("init_he", "init_xavier" or "None")
    if eval(cfg[cfg['Model']['Name']]['Initialization']) is not None:
        network.apply(eval(f"{cfg[cfg['Model']['Name']]['Initialization']}"))
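# Typical call site (sketch): both the model class and its initializer are looked up
# by name via eval(), so the strings in the config must match the definitions above:
#   model = build_model(cfg, input_size)   # e.g. cfg['Model']['Name'] == "RNN"
#   initialize_model(cfg, model)           # e.g. Initialization: "init_he" or "None"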
# Saves the model weights whenever the monitored loss improves
class ModelCheckpoint:
    def __init__(self, filepath, model):
        self.min_loss = None
        self.filepath = filepath
        self.model = model

    def update(self, loss):
        if (self.min_loss is None) or (loss < self.min_loss):
            print("Saving a better model")
            torch.save(self.model.state_dict(), self.filepath)
            self.min_loss = loss
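# Usage sketch (assumed training loop; "best_model.pt" is a placeholder path):
#   checkpoint = ModelCheckpoint("best_model.pt", model)
#   for epoch in range(num_epochs):
#       val_loss = ...   # validation loss computed for this epoch
#       checkpoint.update(val_loss)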
if __name__ == "__main__":
    import yaml

    # Quick manual check: load the config and print the selected model name
    with open("config.yml", "r") as config_file:
        cfg = yaml.safe_load(config_file)
    print(cfg['Model']['Name'])