-
Notifications
You must be signed in to change notification settings - Fork 9
/
process.py
82 lines (60 loc) · 3.12 KB
/
process.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error
import numpy as np
import pandas as pd
import math
from PythonCode.Trajectory_Prediction.load_data_traj import load_test_data, load_test_interpolated_data
import matplotlib.pyplot as plt
scalar = MinMaxScaler(feature_range=(0, 1))
# data pre-processing
def scale_data(x_train):
return scalar.fit_transform(x_train)
def get_inverse_transform(x_train):
return scalar.inverse_transform(x_train)
def reshape_data(x_train, INPUT_LEN, dim):
# samples_train = x_train.shape[0]
# x_train = x_train[:samples_train][:]
print('size = ', x_train.shape[0])
x_train.shape = (x_train.shape[0], INPUT_LEN, dim)
return x_train
def model_evaluate(train_history, trainPredict, testPredict, target_train, target_test, INPUT_LEN, dim, scaling):
plt.plot(range(len(train_history.history['loss'])), train_history.history['loss'])
plt.savefig("loss_seq2seq.png")
plt.pause(0.001)
trainPredict = trainPredict.reshape(trainPredict.__len__(), INPUT_LEN * dim)
testPredict = testPredict.reshape(testPredict.__len__(), INPUT_LEN * dim)
if scaling:
# invert predictions
trainPredict = scalar.inverse_transform(trainPredict)
testPredict = scalar.inverse_transform(testPredict)
trainScore = math.sqrt(mean_squared_error(target_train[:, 0], trainPredict[:, 0])) + \
math.sqrt(mean_squared_error(target_train[:, 1], trainPredict[:, 1]))
print('Train Score: %.2f RMSE' % (trainScore))
testScore = math.sqrt(mean_squared_error(target_test[:, 0], testPredict[:, 0])) + \
math.sqrt(mean_squared_error(target_test[:, 1], testPredict[:, 1]))
print('Test Score: %.2f RMSE' % (testScore))
def test_track(INPUT_LEN, TARGET_LEN, features, dim, track_to_check, model, ENU=False):
original_data, test_data, target_data = load_test_interpolated_data(INPUT_LEN, TARGET_LEN, features, dim, track_to_check)
# data pre-processing
data_test = test_data
data_test = scale_data(test_data)
X_test = reshape_data(data_test, INPUT_LEN, dim)
test_predict = model.predict(X_test)
# testPredict = model.predict(X_test_broken)
# invert predictions
test_predict.shape = (data_test.shape[0], INPUT_LEN * dim)
test_predict = get_inverse_transform(test_predict)
test_predict[test_predict <= 0] = np.nan
fig, ax = plt.subplots(figsize=(8, 8))
ax.plot(original_data[:, 0], original_data[:, 1], '.k', label='Original trajectory')
# ax.plot(original_data[-1, range(dim, INPUT_LEN*dim, dim)], original_data[-1, range(dim+1, INPUT_LEN*dim, dim)], '.m')
ax.plot(test_predict[:, 0], test_predict[:, 1], '.b', label='Predicted trajectory')
ax.plot(test_predict[-1, range(dim, TARGET_LEN * dim, dim)],
test_predict[-1, range(dim + 1, TARGET_LEN * dim, dim)], '.b')
plt.xlabel('Longitude')
plt.ylabel('Latitude')
# plt.title('AIS on-off switching anomaly detection')
ax.legend()
plt.show()
plt.savefig('trajectory_pred1.pdf')
plt.pause(0.001)