# model.py: network definitions, weight initialisers and a checkpoint helper.
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Function
from torch.autograd import Variable

# Linear Regression (Feed forward Network)
class LinearRegression(nn.Module):
    """Three stacked fully connected layers, each followed by a ReLU.

    Despite its name the module is a small feed-forward network: its output
    has ``HiddenSize`` features and, because the last layer is a ReLU, is
    never negative.

    Args:
        cfg: Configuration mapping. ``cfg["LinearRegression"]["Bias"]`` is
            forwarded as the ``bias`` argument of every linear layer and
            ``cfg["LinearRegression"]["HiddenSize"]`` (cast to ``int``) is the
            width of every layer.
        input_size: Number of features in the last dimension of the input.
    """

    def __init__(self, cfg, input_size):
        super().__init__()
        section = cfg["LinearRegression"]
        self.input_size = input_size
        self.bias = section["Bias"]
        self.hidden_size = int(section["HiddenSize"])

        # Layer widths: input -> hidden -> hidden -> hidden.
        widths = [input_size] + [self.hidden_size] * 3
        stack = []
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            stack.append(nn.Linear(fan_in, fan_out, self.bias))
            stack.append(nn.ReLU())
        self.regressor = nn.Sequential(*stack)

    def forward(self, x):
        """Run ``x`` through the linear/ReLU stack and return the result."""
        return self.regressor(x)

# Recurrent Neural Networks
class RNN(nn.Module):
    """ReLU recurrent network followed by a feed-forward head.

    The recurrent part is an ``nn.RNN`` with ``batch_first=True`` and ReLU
    non-linearity. Its per-time-step outputs go through dropout and then a
    head made of ``NumFFN`` blocks of (Linear, ReLU, Dropout) and a final
    ``Linear(hidden_size, 1)``.

    Args:
        cfg: Configuration mapping. Reads ``HiddenSize``, ``NumLayers``,
            ``NumFFN`` and ``Dropout`` from ``cfg["RNN"]``.
        input_size: Number of features per time step of the input.
    """

    def __init__(self, cfg, input_size):
        super().__init__()
        self.hidden_size = cfg["RNN"]["HiddenSize"]
        self.num_layers = cfg["RNN"]["NumLayers"]
        self.num_ffn = cfg["RNN"]["NumFFN"]
        self.dropout = cfg["RNN"]["Dropout"]

        self.rnn = nn.RNN(
            input_size,
            self.hidden_size,
            self.num_layers,
            batch_first=True,
            nonlinearity="relu",
        )

        # Feed-forward head; sub-module names are part of the state_dict keys.
        self.fc = nn.Sequential()
        for layer in range(self.num_ffn):
            self.fc.add_module(
                f"linear_{layer}", nn.Linear(self.hidden_size, self.hidden_size)
            )
            self.fc.add_module(f"relu_{layer}", nn.ReLU())
            self.fc.add_module(f"dropout_{layer}", nn.Dropout(p=self.dropout))
        self.fc.add_module("last_linear", nn.Linear(self.hidden_size, 1))

    def forward(self, x):
        """Return one prediction per time step.

        Args:
            x: Input tensor laid out as (batch, sequence, input_size), as
                required by ``batch_first=True``.

        Returns:
            Tensor of shape (batch, sequence, 1).
        """
        # Create the initial hidden state on the same device/dtype as the
        # input so the module works wherever the caller placed the data,
        # instead of guessing from global CUDA availability.
        h0 = torch.zeros(
            self.num_layers,
            x.size(0),
            self.hidden_size,
            device=x.device,
            dtype=x.dtype,
        )
        out, _ = self.rnn(x, h0)
        # Functional dropout tied to self.training: a Dropout module built
        # inside forward() would always be in training mode and would keep
        # dropping activations after model.eval().
        out = F.dropout(out, p=self.dropout, training=self.training)
        out = self.fc(out)
        return out

# Bidirectional LSTM
class BidirectionalLSTM(nn.Module):
    """Bidirectional LSTM followed by a feed-forward head and a final ReLU.

    The LSTM uses ``batch_first=True``; because it is bidirectional its
    per-time-step output has ``2 * hidden_size`` features. The head is
    ``NumFFN`` blocks of (Linear, ReLU, Dropout) and a final
    ``Linear(2 * hidden_size, 1)``; the result is clamped at zero by a ReLU.

    Args:
        cfg: Configuration mapping. Reads ``HiddenSize``, ``NumLayers``,
            ``LSTMDropout``, ``FFNDropout`` and ``NumFFN`` from
            ``cfg["BidirectionalLSTM"]``.
        input_size: Number of features per time step of the input.
    """

    def __init__(self, cfg, input_size):
        super().__init__()
        self.hidden_size = cfg["BidirectionalLSTM"]["HiddenSize"]
        self.num_layers = cfg["BidirectionalLSTM"]["NumLayers"]
        self.LSTM_dropout = cfg["BidirectionalLSTM"]["LSTMDropout"]
        self.FFN_dropout = cfg["BidirectionalLSTM"]["FFNDropout"]
        self.num_ffn = cfg["BidirectionalLSTM"]["NumFFN"]

        self.lstm = nn.LSTM(
            input_size,
            self.hidden_size,
            self.num_layers,
            batch_first=True,
            bidirectional=True,
            dropout=self.LSTM_dropout,
        )

        # Feed-forward head; sub-module names are part of the state_dict keys.
        self.fc = nn.Sequential()
        for layer in range(self.num_ffn):
            self.fc.add_module(
                f"linear_{layer}",
                nn.Linear(2 * self.hidden_size, 2 * self.hidden_size),
            )
            self.fc.add_module(f"relu_{layer}", nn.ReLU())
            self.fc.add_module(f"dropout_{layer}", nn.Dropout(p=self.FFN_dropout))
        self.fc.add_module("last_linear", nn.Linear(2 * self.hidden_size, 1))

    def forward(self, x):
        """Return one non-negative prediction per time step.

        Args:
            x: Input tensor laid out as (batch, sequence, input_size), as
                required by ``batch_first=True``.

        Returns:
            Tensor of shape (batch, sequence, 1) with values >= 0.
        """
        # One initial state per layer and direction, created on the input's
        # device/dtype rather than inferred from global CUDA availability.
        state_shape = (self.num_layers * 2, x.size(0), self.hidden_size)
        h0 = torch.zeros(state_shape, device=x.device, dtype=x.dtype)
        c0 = torch.zeros(state_shape, device=x.device, dtype=x.dtype)

        out, _ = self.lstm(x, (h0, c0))
        result = self.fc(out)
        return F.relu(result)


# CNN
class CNN1D(torch.nn.Module):
    """1-D convolutional network producing one value per forecast day.

    Two convolution stages built by ``conv_block`` (``num_inputs`` -> 32 ->
    128 channels) are followed by global average pooling over the length
    axis and a two-layer feed-forward head whose output width is
    ``cfg["Dataset"]["num_days"]``.

    Args:
        cfg: Configuration mapping; only ``cfg["Dataset"]["num_days"]`` is
            read.
        num_inputs: Number of input channels of the first convolution.
    """

    def __init__(self, cfg, num_inputs):
        super().__init__()
        self.model = torch.nn.Sequential(
            *conv_block(num_inputs, 32),
            *conv_block(32, 128),
        )
        # Collapse the length axis to a single averaged value per channel.
        self.avg_pool = torch.nn.AdaptiveAvgPool1d(1)

        self.ffn = nn.Sequential(
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Linear(128, cfg["Dataset"]["num_days"]),
        )

    def forward(self, x):
        """Compute the network output for a batch.

        Args:
            x: 3-D input tensor. Axes 1 and 2 are swapped before the
                convolutions, so the last axis must hold the ``num_inputs``
                channels.

        Returns:
            Tensor of shape (batch, num_days, 1).
        """
        # Conv1d expects channels on axis 1.
        x = torch.transpose(x, 1, 2)

        out = self.model(x)
        out = self.avg_pool(out)

        # Drop the pooled length-1 axis: (batch, 128, 1) -> (batch, 128).
        out = out.view([out.shape[0], -1])

        out = self.ffn(out)
        # Append a trailing singleton axis to the head output.
        out = out.view([out.shape[0], out.shape[1], 1])
        return out

def conv_block(in_channels, out_channels):
    """Create the layers of one convolution stage.

    The stage is Conv1d (kernel 3, stride 1, padding 1, so the sequence
    length is unchanged), then ReLU, then BatchNorm1d over the output
    channels.

    Args:
        in_channels: Channel count expected by the convolution.
        out_channels: Channel count produced by the convolution.

    Returns:
        A list ``[Conv1d, ReLU, BatchNorm1d]`` ready to be unpacked into a
        ``torch.nn.Sequential``.
    """
    convolution = torch.nn.Conv1d(
        in_channels, out_channels, kernel_size=3, stride=1, padding=1
    )
    activation = torch.nn.ReLU()
    normalisation = torch.nn.BatchNorm1d(out_channels)
    return [convolution, activation, normalisation]

# Initialization
def init_he(module):
    """Apply He (Kaiming) uniform initialisation to an ``nn.Linear`` weight.

    Only modules whose exact type is ``nn.Linear`` are touched; every other
    module, including subclasses of ``nn.Linear``, is left as is. The bias is
    not modified.

    Args:
        module: Module to initialise in place.
    """
    if type(module) is not nn.Linear:
        return
    nn.init.kaiming_uniform_(module.weight, mode="fan_in", nonlinearity="relu")

def init_xavier(module):
    """Apply Xavier (Glorot) uniform initialisation to a module's weight.

    The function is meant to be passed to ``Module.apply``, which calls it on
    every sub-module, so it must tolerate modules it cannot initialise.
    Modules without a ``weight`` tensor (activations, dropout, containers)
    and modules whose weight has fewer than two dimensions (e.g. batch norm)
    are skipped, since Xavier initialisation is undefined for them.

    Args:
        module: Module to initialise in place.
    """
    weight = getattr(module, "weight", None)
    if isinstance(weight, torch.Tensor) and weight.dim() >= 2:
        nn.init.xavier_uniform_(weight)

# Generic function to build model

def build_model(cfg, input_size):
    """Instantiate the model class named by ``cfg["Model"]["Name"]``.

    The name is resolved among this module's global names and the resulting
    object is called as ``Name(cfg, input_size)``. A plain lookup is used
    instead of ``eval`` so that a configuration value can only select an
    existing name and can never execute arbitrary code.

    Args:
        cfg: Configuration mapping; ``cfg["Model"]["Name"]`` selects the
            class and the whole mapping is forwarded to its constructor.
        input_size: Forwarded to the constructor as second argument.

    Returns:
        The newly constructed model.

    Raises:
        NameError: If no module-level object has the requested name.
    """
    model_name = cfg["Model"]["Name"]
    print(f"{model_name}(cfg, input_size)")
    try:
        factory = globals()[model_name]
    except KeyError:
        raise NameError(f"name {model_name!r} is not defined") from None
    return factory(cfg, input_size)

def initialize_model(cfg, network):
    """Initialise ``network`` with the initialiser named in the configuration.

    The initialiser name is read from
    ``cfg[cfg["Model"]["Name"]]["Initialization"]`` and resolved among this
    module's global names (a plain lookup, not ``eval``, so the configuration
    cannot execute arbitrary code). Nothing happens when the value is ``None``
    or the string ``"None"``.

    ``Module.apply`` already visits every sub-module recursively, so it is
    called once on the root; calling it on each entry of ``named_modules()``
    would re-initialise nested modules once per ancestor.

    Args:
        cfg: Configuration mapping as described above.
        network: The ``nn.Module`` to initialise in place.

    Raises:
        NameError: If no module-level object has the requested name.
    """
    init_name = cfg[cfg["Model"]["Name"]]["Initialization"]
    if init_name is None or init_name == "None":
        return
    try:
        init_fn = globals()[init_name]
    except KeyError:
        raise NameError(f"name {init_name!r} is not defined") from None
    network.apply(init_fn)

class ModelCheckpoint:
    """Keep on disk the weights of the model with the lowest loss seen so far.

    Args:
        filepath: Destination passed to ``torch.save``.
        model: Module whose ``state_dict()`` is written.
    """

    def __init__(self, filepath, model):
        self.filepath = filepath
        self.model = model
        # No loss recorded yet; the first update always saves.
        self.min_loss = None

    def update(self, loss):
        """Save the model if ``loss`` is strictly lower than the best so far.

        Args:
            loss: Loss value to compare against the current minimum.
        """
        has_best = self.min_loss is not None
        # Written as "not (a < b)" so a NaN loss is rejected like any
        # non-improving value.
        if has_best and not (loss < self.min_loss):
            return
        print("Saving a better model")
        torch.save(self.model.state_dict(), self.filepath)
        self.min_loss = loss

if __name__ == "__main__":
    import yaml

    # Manual smoke test: load the configuration and show the selected model.
    # safe_load is used because yaml.load without an explicit Loader is
    # rejected by current PyYAML releases and the default loader of older
    # releases can construct arbitrary Python objects; the context manager
    # closes the file handle.
    with open("config.yml", "r") as config_file:
        cfg = yaml.safe_load(config_file)
    print(cfg['Model']['Name'])