您好,登錄后才能下訂單哦!
這篇“怎么使用PyTorch和LSTM實現(xiàn)單變量時間序列預測”文章的知識點大部分人都不太理解,所以小編給大家總結(jié)了以下內(nèi)容,內(nèi)容詳細,步驟清晰,具有一定的借鑒價值,希望大家閱讀完這篇文章能有所收獲,下面我們一起來看看這篇“怎么使用PyTorch和LSTM實現(xiàn)單變量時間序列預測”文章吧。
時間序列是指在一段時間內(nèi)發(fā)生的任何可量化的度量或事件。盡管這聽起來微不足道,但幾乎任何東西都可以被認為是時間序列。一個月里你每小時的平均心率,一年里一只股票的日收盤價,一年里某個城市每周發(fā)生的交通事故數(shù)。
在任何一段時間段內(nèi)記錄這些信息都被認為是一個時間序列。對于這些例子中的每一個,都有事件發(fā)生的頻率(每天、每周、每小時等)和事件發(fā)生的時間長度(一個月、一年、一天等)。
我們的目標是接收一個值序列,預測該序列中的下一個值。最簡單的方法是使用自回歸模型,我們將專注于使用LSTM來解決這個問題。
讓我們看一個時間序列樣本。下圖顯示了2013年至2018年石油價格的一些數(shù)據(jù)。
這只是一個日期軸上單個數(shù)字序列的圖。下表顯示了這個時間序列的前10個條目。每天都有價格數(shù)據(jù)。
date dcoilwtico
2013-01-01 NaN
2013-01-02 93.14
2013-01-03 92.97
2013-01-04 93.12
2013-01-07 93.20
2013-01-08 93.21
2013-01-09 93.08
2013-01-10 93.81
2013-01-11 93.60
2013-01-14 94.27
許多機器學習模型在標準化數(shù)據(jù)上的表現(xiàn)要好得多。標準化數(shù)據(jù)的標準方法是對數(shù)據(jù)進行轉(zhuǎn)換,使得每一列的均值為0,標準差為1。下面的代碼scikit-learn進行標準化
from sklearn.preprocessing import StandardScaler # Fit scalers scalers = {} for x in df.columns: scalers[x] = StandardScaler().fit(df[x].values.reshape(-1, 1)) # Transform data via scalers norm_df = df.copy() for i, key in enumerate(scalers.keys()): norm = scalers[key].transform(norm_df.iloc[:, i].values.reshape(-1, 1)) norm_df.iloc[:, i] = norm
我們還希望數(shù)據(jù)具有統(tǒng)一的頻率——在這個例子中,有這5年里每天的石油價格,如果你的數(shù)據(jù)情況并非如此,Pandas有幾種不同的方法來重新采樣數(shù)據(jù)以適應統(tǒng)一的頻率,請參考我們公眾號以前的文章
對于訓練數(shù)據(jù)我們需要將完整的時間序列數(shù)據(jù)截取成固定長度的序列。假設(shè)我們有一個序列:[1, 2, 3, 4, 5, 6]。
通過選擇長度為 3 的序列,我們可以生成以下序列及其相關(guān)目標:
[Sequence] Target
[1, 2, 3] → 4
[2, 3, 4] → 5
[3, 4, 5] → 6
或者說我們定義了為了預測下一個值需要回溯多少步。我們將這個值稱為訓練窗口,而要預測的值的數(shù)量稱為預測窗口。在這個例子中,它們分別是3和1。下面的函數(shù)詳細說明了這是如何完成的。
# 如上所示,定義一個創(chuàng)建序列和目標的函數(shù) def generate_sequences(df: pd.DataFrame, tw: int, pw: int, target_columns, drop_targets=False): ''' df: Pandas DataFrame of the univariate time-series tw: Training Window - Integer defining how many steps to look back pw: Prediction Window - Integer defining how many steps forward to predict returns: dictionary of sequences and targets for all sequences ''' data = dict() # Store results into a dictionary L = len(df) for i in range(L-tw): # Option to drop target from dataframe if drop_targets: df.drop(target_columns, axis=1, inplace=True) # Get current sequence sequence = df[i:i+tw].values # Get values right after the current sequence target = df[i+tw:i+tw+pw][target_columns].values data[i] = {'sequence': sequence, 'target': target} return data
這樣我們就可以在PyTorch中使用Dataset類自定義數(shù)據(jù)集
class SequenceDataset(Dataset): def __init__(self, df): self.data = df def __getitem__(self, idx): sample = self.data[idx] return torch.Tensor(sample['sequence']), torch.Tensor(sample['target']) def __len__(self): return len(self.data)
然后,我們可以使用PyTorch DataLoader來遍歷數(shù)據(jù)。使用DataLoader的好處是它在內(nèi)部自動進行批處理和數(shù)據(jù)的打亂,所以我們不必自己實現(xiàn)它,代碼如下:
# 這里我們?yōu)槲覀兊哪P投x屬性 BATCH_SIZE = 16 # Training batch size split = 0.8 # Train/Test Split ratio sequences = generate_sequences(norm_df.dcoilwtico.to_frame(), sequence_len, nout, 'dcoilwtico') dataset = SequenceDataset(sequences) # 根據(jù)拆分比例拆分數(shù)據(jù),并將每個子集加載到單獨的DataLoader對象中 train_len = int(len(dataset)*split) lens = [train_len, len(dataset)-train_len] train_ds, test_ds = random_split(dataset, lens) trainloader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True, drop_last=True) testloader = DataLoader(test_ds, batch_size=BATCH_SIZE, shuffle=True, drop_last=True)
在每次迭代中,DataLoader將產(chǎn)生16個(批量大小)序列及其相關(guān)目標,我們將這些目標傳遞到模型中。
我們將使用一個單獨的LSTM層,然后是模型的回歸部分的一些線性層,當然在它們之間還有dropout層。該模型將為每個訓練輸入輸出單個值。
class LSTMForecaster(nn.Module): def __init__(self, n_features, n_hidden, n_outputs, sequence_len, n_lstm_layers=1, n_deep_layers=10, use_cuda=False, dropout=0.2): ''' n_features: number of input features (1 for univariate forecasting) n_hidden: number of neurons in each hidden layer n_outputs: number of outputs to predict for each training example n_deep_layers: number of hidden dense layers after the lstm layer sequence_len: number of steps to look back at for prediction dropout: float (0 < dropout < 1) dropout ratio between dense layers ''' super().__init__() self.n_lstm_layers = n_lstm_layers self.nhid = n_hidden self.use_cuda = use_cuda # set option for device selection # LSTM Layer self.lstm = nn.LSTM(n_features, n_hidden, num_layers=n_lstm_layers, batch_first=True) # As we have transformed our data in this way # first dense after lstm self.fc1 = nn.Linear(n_hidden * sequence_len, n_hidden) # Dropout layer self.dropout = nn.Dropout(p=dropout) # Create fully connected layers (n_hidden x n_deep_layers) dnn_layers = [] for i in range(n_deep_layers): # Last layer (n_hidden x n_outputs) if i == n_deep_layers - 1: dnn_layers.append(nn.ReLU()) dnn_layers.append(nn.Linear(nhid, n_outputs)) # All other layers (n_hidden x n_hidden) with dropout option else: dnn_layers.append(nn.ReLU()) dnn_layers.append(nn.Linear(nhid, nhid)) if dropout: dnn_layers.append(nn.Dropout(p=dropout)) # compile DNN layers self.dnn = nn.Sequential(*dnn_layers) def forward(self, x): # Initialize hidden state hidden_state = torch.zeros(self.n_lstm_layers, x.shape[0], self.nhid) cell_state = torch.zeros(self.n_lstm_layers, x.shape[0], self.nhid) # move hidden state to device if self.use_cuda: hidden_state = hidden_state.to(device) cell_state = cell_state.to(device) self.hidden = (hidden_state, cell_state) # Forward Pass x, h = self.lstm(x, self.hidden) # LSTM x = self.dropout(x.contiguous().view(x.shape[0], -1)) # Flatten lstm out x = self.fc1(x) # First Dense return self.dnn(x) # Pass forward through fully connected DNN.
我們設(shè)置了2個可以自由地調(diào)優(yōu)的參數(shù)n_hidden和n_deep_players。更大的參數(shù)意味著模型更復雜和更長的訓練時間,所以這里我們可以使用這兩個參數(shù)靈活調(diào)整。
剩下的參數(shù)如下:sequence_len指的是訓練窗口,nout定義了要預測多少步;將sequence_len設(shè)置為180,nout設(shè)置為1,意味著模型將查看180天(半年)后的情況,以預測明天將發(fā)生什么。
nhid = 50 # Number of nodes in the hidden layer n_dnn_layers = 5 # Number of hidden fully connected layers nout = 1 # Prediction Window sequence_len = 180 # Training Window # Number of features (since this is a univariate timeseries we'll set # this to 1 -- multivariate analysis is coming in the future) ninp = 1 # Device selection (CPU | GPU) USE_CUDA = torch.cuda.is_available() device = 'cuda' if USE_CUDA else 'cpu' # Initialize the model model = LSTMForecaster(ninp, nhid, nout, sequence_len, n_deep_layers=n_dnn_layers, use_cuda=USE_CUDA).to(device)
定義好模型后,我們可以選擇損失函數(shù)和優(yōu)化器,設(shè)置學習率和周期數(shù),并開始我們的訓練循環(huán)。由于這是一個回歸問題(即我們試圖預測一個連續(xù)值),最簡單也是最安全的損失函數(shù)是均方誤差。這提供了一種穩(wěn)健的方法來計算實際值和模型預測值之間的誤差。
優(yōu)化器和損失函數(shù)如下:
# Set learning rate and number of epochs to train over lr = 4e-4 n_epochs = 20 # Initialize the loss function and optimizer criterion = nn.MSELoss().to(device) optimizer = torch.optim.AdamW(model.parameters(), lr=lr)
下面就是訓練循環(huán)的代碼:在每次訓練迭代中,我們將計算之前創(chuàng)建的訓練集和驗證集的損失:
# Lists to store training and validation losses t_losses, v_losses = [], [] # Loop over epochs for epoch in range(n_epochs): train_loss, valid_loss = 0.0, 0.0 # train step model.train() # Loop over train dataset for x, y in trainloader: optimizer.zero_grad() # move inputs to device x = x.to(device) y = y.squeeze().to(device) # Forward Pass preds = model(x).squeeze() loss = criterion(preds, y) # compute batch loss train_loss += loss.item() loss.backward() optimizer.step() epoch_loss = train_loss / len(trainloader) t_losses.append(epoch_loss) # validation step model.eval() # Loop over validation dataset for x, y in testloader: with torch.no_grad(): x, y = x.to(device), y.squeeze().to(device) preds = model(x).squeeze() error = criterion(preds, y) valid_loss += error.item() valid_loss = valid_loss / len(testloader) v_losses.append(valid_loss) print(f'{epoch} - train: {epoch_loss}, valid: {valid_loss}') plot_losses(t_losses, v_losses)
這樣模型已經(jīng)訓練好了,可以評估預測了。
我們調(diào)用訓練過的模型來預測未打亂的數(shù)據(jù),并比較預測與真實觀察有多大不同。
def make_predictions_from_dataloader(model, unshuffled_dataloader): model.eval() predictions, actuals = [], [] for x, y in unshuffled_dataloader: with torch.no_grad(): p = model(x) predictions.append(p) actuals.append(y.squeeze()) predictions = torch.cat(predictions).numpy() actuals = torch.cat(actuals).numpy() return predictions.squeeze(), actuals
石油歷史上的常態(tài)化預測與實際價格
我們的預測看起來還不錯!預測的效果還可以,表明我們沒有過度擬合模型,讓我們看看能否用它來預測未來。
如果我們將歷史定義為預測時刻之前的序列,算法很簡單:
1.從歷史(訓練窗口長度)中獲取最新的有效序列。
2.將最新的序列輸入模型并預測下一個值。
3.將預測值附加到歷史記錄上。
4.迭代重復步驟1。
這里需要注意的是,根據(jù)訓練模型時選擇的參數(shù),你預測的越長(遠),模型就越容易表現(xiàn)出它自己的偏差,開始預測平均值。因此,如果沒有必要,我們不希望總是預測得太超前,因為這會影響預測的準確性。
這在下面的函數(shù)中實現(xiàn):
def one_step_forecast(model, history): ''' model: PyTorch model object history: a sequence of values representing the latest values of the time series, requirement -> len(history.shape) == 2 outputs a single value which is the prediction of the next value in the sequence. ''' model.cpu() model.eval() with torch.no_grad(): pre = torch.Tensor(history).unsqueeze(0) pred = self.model(pre) return pred.detach().numpy().reshape(-1) def n_step_forecast(data: pd.DataFrame, target: str, tw: int, n: int, forecast_from: int=None, plot=False): ''' n: integer defining how many steps to forecast forecast_from: integer defining which index to forecast from. None if you want to forecast from the end. plot: True if you want to output a plot of the forecast, False if not. ''' history = data[target].copy().to_frame() # Create initial sequence input based on where in the series to forecast # from. if forecast_from: pre = list(history[forecast_from - tw : forecast_from][target].values) else: pre = list(history[self.target])[-tw:] # Call one_step_forecast n times and append prediction to history for i, step in enumerate(range(n)): pre_ = np.array(pre[-tw:]).reshape(-1, 1) forecast = self.one_step_forecast(pre_).squeeze() pre.append(forecast) # The rest of this is just to add the forecast to the correct time of # the history series res = history.copy() ls = [np.nan for i in range(len(history))] # Note: I have not handled the edge case where the start index + n is # before the end of the dataset and crosses past it. if forecast_from: ls[forecast_from : forecast_from + n] = list(np.array(pre[-n:])) res['forecast'] = ls res.columns = ['actual', 'forecast'] else: fc = ls + list(np.array(pre[-n:])) ls = ls + [np.nan for i in range(len(pre[-n:]))] ls[:len(history)] = history[self.target].values res = pd.DataFrame([ls, fc], index=['actual', 'forecast']).T return res
我們來看看實際的效果
我們在這個時間序列的中間從不同的地方進行預測,這樣我們就可以將預測與實際發(fā)生的情況進行比較。我們的預測程序,可以從任何地方對任何合理數(shù)量的步驟進行預測,紅線表示預測。(這些圖表顯示的是y軸上的標準化后的價格)
預測2013年第三季度后200天
預測2014/15 后200天
從2016年第一季度開始預測200天
從數(shù)據(jù)的最后一天開始預測200天
以上就是關(guān)于“怎么使用PyTorch和LSTM實現(xiàn)單變量時間序列預測”這篇文章的內(nèi)容,相信大家都有了一定的了解,希望小編分享的內(nèi)容對大家有幫助,若想了解更多相關(guān)的知識內(nèi)容,請關(guān)注億速云行業(yè)資訊頻道。
免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。