Anomaly Detection using LSTM Autoencoder using Keras

The goal of this post is to walk you through the steps to create and train an AI deep learning neural network for anomaly detection using Python, Keras and TensorFlow. I will not delve too much in to the underlying theory and assume the reader has some basic knowledge of the underlying technologies. However, I will provide links to more detailed information as we go and you can find the source code for this study in my GitHub repo.

We will use vibration sensor readings from the NASA Acoustics and Vibration Database as our dataset for this study. In the NASA study, sensor readings were taken on four bearings that were run to failure under constant load over multiple days. Our dataset consists of individual files that are 1-second vibration signal snapshots recorded at 10 minute intervals. Each file contains 20,480 sensor data points per bearing that were obtained by reading the bearing sensors at a sampling rate of 20 kHz.

You can download the sensor data here. You will need to unzip them and combine them into a single data directory.

Anomaly detection is the task of determining when something has gone astray from the “norm”. Anomaly detection using neural networks is modeled in an unsupervised / self-supervised manner; as opposed to supervised learning, where there is a one-to-one correspondence between input feature samples and their corresponding output labels. The presumption is that normal behavior, and hence the quantity of available “normal” data, is the norm and that anomalies are the exception to the norm to the point where the modeling of “normalcy” is possible.

We will use an autoencoder deep learning neural network model to identify vibrational anomalies from the sensor readings. The goal is to predict future bearing failures before they happen.

#MODEL

import warnings

warnings.filterwarnings(‘ignore’)
import tensorflow as tf

import os
import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler
import seaborn as sns
sns.set(color_codes=True)
import matplotlib.pyplot as plt
# %matplotlib inline
from numpy.random import seed
#%tensorflow_version 1.x

data_dir = r”Downloads\IMS\2nd_test”
merged_data = pd.DataFrame()

for filename in os.listdir(data_dir):
dataset = pd.read_csv(os.path.join(data_dir, filename), sep=’\t’)
dataset_mean_abs = np.array(dataset.abs().mean())
dataset_mean_abs = pd.DataFrame(dataset_mean_abs.reshape(1,4))
dataset_mean_abs.index = [filename]
merged_data = merged_data.append(dataset_mean_abs)

merged_data.columns = [‘Bearing 1’, ‘Bearing 2’, ‘Bearing 3’, ‘Bearing 4’]

# transform data file index to datetime and sort in chronological order
merged_data.index = pd.to_datetime(merged_data.index, format=’%Y.%m.%d.%H.%M.%S’)
merged_data = merged_data.sort_index()
merged_data.to_csv(‘Averaged_BearingTest_Dataset.csv’)
print(“Dataset shape:”, merged_data.shape)
merged_data.head()

#merged_data.tail()

train = merged_data[‘2004-02-12 10:52:39’: ‘2004-02-15 12:52:39’]
test = merged_data[‘2004-02-15 12:52:39’:]
print(“Training dataset shape:”, train.shape)
print(“Test dataset shape:”, test.shape)

“””
from sklearn.model_selection import train_test_split

df = merged_data
RANDOM_SEED = 101

train, test = train_test_split(df, test_size=0.8, random_state = RANDOM_SEED)

print(‘Training data size :’, train.shape)
print(‘Validation data size :’, test.shape)
“””

# normalize the data
scaler = MinMaxScaler()
X_train_scaled = scaler.fit_transform(train)
X_test_scaled = scaler.transform(test)

X_train_scaled.shape

X_train = X_train_scaled.reshape(X_train_scaled.shape[0], 1, X_train_scaled.shape[1])
print(“Training data shape:”, X_train.shape)
X_test = X_test_scaled.reshape(X_test_scaled.shape[0], 1, X_test_scaled.shape[1])
print(“Test data shape:”, X_test.shape)

from tensorflow.keras.layers import Input, Dropout, LSTM , TimeDistributed , RepeatVector, Dense
from tensorflow.keras.models import Model
from tensorflow.keras import regularizers

“””
from keras.layers import Input, Dropout, Dense, LSTM, TimeDistributed, RepeatVector
from keras.models import Model
from keras import regularizers
“””

def autoencoder_model(X):
inputs = Input(shape=(X.shape[1], X.shape[2]))
L1 = LSTM(16, activation=’relu’, return_sequences=True,
kernel_regularizer=regularizers.l2(0.00))(inputs)
L2 = LSTM(4, activation=’relu’, return_sequences=False)(L1)
L3 = RepeatVector(X.shape[1])(L2)
L4 = LSTM(4, activation=’relu’, return_sequences=True)(L3)
L5 = LSTM(16, activation=’relu’, return_sequences=True)(L4)
output = TimeDistributed(Dense(X.shape[2]))(L5)
model = Model(inputs=inputs, outputs=output)
return model

model = autoencoder_model(X_train)
model.compile(optimizer=’adam’, loss=’mae’, metrics=[“accuracy”])
model.summary()

nb_epochs = 100
batch_size = 10

import datetime
t_ini = datetime.datetime.now()

history = model.fit(X_train, X_train, epochs=nb_epochs, batch_size=batch_size,
validation_split=0.05).history

t_fin = datetime.datetime.now()
print(‘Time to run the model: {} Sec.’.format((t_fin – t_ini).total_seconds()))

df_history = pd.DataFrame(history)

predictions = model.predict(X_test)

#plotting

import seaborn as sns
sns.set(color_codes=True)
import matplotlib.pyplot as plt

fig, ax = plt.subplots(figsize=(14, 6), dpi=80)
ax.plot(history[‘loss’], ‘b’, label=’Train’, linewidth=2)
ax.plot(history[‘val_loss’], ‘r’, label=’Validation’, linewidth=2)
ax.set_title(‘Model loss’, fontsize=16)
ax.set_ylabel(‘Loss (mae)’)
ax.set_xlabel(‘Epoch’)
ax.legend(loc=’upper right’)
plt.show()

# plot the loss distribution of the training set
X_pred = model.predict(X_train)
X_pred = X_pred.reshape(X_pred.shape[0], X_pred.shape[2])
X_pred = pd.DataFrame(X_pred, columns=train.columns)
X_pred.index = train.index

scored = pd.DataFrame(index=train.index)
Xtrain = X_train.reshape(X_train.shape[0], X_train.shape[2])
scored[‘Loss_mae’] = np.mean(np.abs(X_pred-Xtrain), axis = 1)
plt.figure(figsize=(16,9), dpi=80)
plt.title(‘Loss Distribution’, fontsize=16)
sns.distplot(scored[‘Loss_mae’], bins = 20, kde= True, color = ‘blue’);
# plt.xlim([0.0,.5])

mx = round(max(scored[‘Loss_mae’]),2)

th = round(((mx * 10)/100) + mx,3)

X_pred = model.predict(X_test)
X_pred = X_pred.reshape(X_pred.shape[0], X_pred.shape[2])
X_pred = pd.DataFrame(X_pred, columns=test.columns)
X_pred.index = test.index

scored = pd.DataFrame(index=test.index)
Xtest = X_test.reshape(X_test.shape[0], X_test.shape[2])
scored[‘Loss_mae’] = np.mean(np.abs(X_pred-Xtest), axis = 1)
scored[‘Threshold’] = th
scored[‘Anomaly’] = scored[‘Loss_mae’] > scored[‘Threshold’]
scored.head()

scored[‘Anomaly’].value_counts()

scored.tail()

model.save(“LSTM_model.h5”)
print(“Model saved”)

 

In the next article, we’ll do another way Anomaly Detection using CNN Autoencoder using Keras

One Reply to “Anomaly Detection using LSTM Autoencoder using Keras”

Leave a Reply

Your email address will not be published. Required fields are marked *