-
Notifications
You must be signed in to change notification settings - Fork 3
/
comm.py
380 lines (313 loc) · 12.9 KB
/
comm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
"""
The Communications Module is responsible for sending and retrieving data
with the satellite (or the simulator).
"""
import time
import json
import signal
import sys
import os
from enum import Enum
from datetime import datetime, timezone
from groundstation.backend_api.flightschedule import FlightScheduleList, Flightschedule
from groundstation.backend_api.communications import CommunicationList, Communication
from groundstation.backend_api.housekeeping import HousekeepingLogList
from groundstation.tests.utils import fake_housekeeping_as_dict, fake_adcs_hk_as_dict, \
fake_athena_hk_as_dict, fake_eps_hk_as_dict, fake_eps_startup_hk_as_dict, \
fake_uhf_hk_as_dict, fake_sband_hk_as_dict, fake_hyperion_hk_as_dict, \
fake_charon_hk_as_dict, fake_dfgm_hk_as_dict, \
fake_northern_spirit_hk_as_dict, fake_iris_hk_as_dict
class Connection(Enum):
SIMULATOR = 1
SATELLITE = 2
class FSStatus(Enum):
QUEUED = 1
DRAFT = 2
UPLOADED = 3
# Global variables
mode = None
communication_list = CommunicationList()
communication_patch = Communication()
flightschedule_list = FlightScheduleList()
flightschedule_patch = Flightschedule()
housekeeping_post = HousekeepingLogList()
# Handle the sig alarm
def handler(signum, frame):
exit()
def change_fs_status(fs_id, new_status, execution_time=None, error=0):
"""
Given a flightschedule, change its status
:param fs_id: The id of the flightschedule to change
:param new_status: The new status of the flightschedule.
:param execution_time: The execution time of the flightschedule.
:param error: The error code returned after upload (0 means success).
:returns: A dict representing the newly patched flightschedule
"""
if execution_time is None:
fs = flightschedule_patch.get(fs_id)
execution_time = fs[0]['data']['execution_time']
patch_data = {
'status': new_status,
'execution_time': execution_time,
'commands': [],
'error': error
}
return flightschedule_patch.patch(
fs_id, local_data=json.dumps(patch_data))[0]['data']
def get_queued_fs():
"""
Fetches a queued flightschedule.
The website's API logic only allowes for one flightschedule to be queued
at any given time.
:returns: A dict representing the queued flightschedule object
"""
local_args = {'limit': 1, 'queued': 1}
fs = flightschedule_list.get(local_args=local_args)
if len(fs[0]['data']['flightschedules']) >= 1:
return fs[0]['data']['flightschedules'][0]
return None
def reset_fs_status_except_uploaded(uploadedID):
"""
Resets all previously uploaded flightschedules to 'draft' status when
a new flightschedule is uploaded.
:param uploadedID: The ID of the most recently uploaded flightschedule.
:type uploadedID: int
"""
local_args = {'limit': 5, 'queued': 3}
prev_uploaded = flightschedule_list.get(local_args=local_args)
for prev_fs in prev_uploaded[0]['data']['flightschedules']:
if prev_fs['flightschedule_id'] != uploadedID:
change_fs_status(
prev_fs['flightschedule_id'],
FSStatus.DRAFT.value,
prev_fs['execution_time']
)
def format_date_time(dt_str):
"""Generates a datetime object from a string.
:param str dt_str: A date-time string to convert
:returns: A datetime object representing the time passed in from string.
:rtype: datetime
"""
try:
exec_time = (datetime
.strptime(dt_str, '%Y-%m-%d %H:%M:%S.%f')
.replace(tzinfo=timezone.utc))
except ValueError: # Sometimes the date format is off for some reason
exec_time = (datetime
.strptime(dt_str, '%Y-%m-%d %H:%M:%S')
.replace(tzinfo=timezone.utc))
return exec_time
def generate_fs_file(fs):
"""Generates a flight schdeule file for upload.
:param dict fs: A flight schedule object fetched from db.
:return: Path to the flight schedule file.
:rtype: str
"""
file_name = 'flightschedules/fs_{}.txt'.format(fs['flightschedule_id'])
with open(file_name, "w+") as file:
for command in fs['commands']:
# Format the command string from fs
command_name = command['command']['command_name']
args = [arg['argument'] for arg in command['args']]
command_string = ('{}({})'.format(command_name, ','.join(args)))
# Format the date/time from fs
exec_time = format_date_time(command['timestamp'])
time_fields = {
'ms': '*' if command['repeats']['repeat_ms']
else int(exec_time.microsecond / 1000),
'second': '*' if command['repeats']['repeat_sec']
else exec_time.second,
'minute': '*' if command['repeats']['repeat_min']
else exec_time.minute,
'hour': '*' if command['repeats']['repeat_hr']
else exec_time.hour,
'day': '*' if command['repeats']['repeat_day']
else exec_time.day,
'month': '*' if command['repeats']['repeat_month']
else exec_time.month,
'year': '*' if command['repeats']['repeat_year']
else exec_time.year - 1970 # Offset from 1970
}
time_str = ('{ms} {second} {minute} {hour} 0 {day} {month} {year}'
.format(**time_fields))
# Write fs commands to file
print(time_str, command_string, file=file)
return file_name
def send_flightschedules(gs):
queued_fs = get_queued_fs()
if queued_fs is not None:
if mode == Connection.SIMULATOR:
resp = send_to_simulator(queued_fs)
elif mode == Connection.SATELLITE:
file_path = generate_fs_file(queued_fs)
resp = send_to_satellite(
gs, 'ex2.scheduler.replace_schedule({})'.format(file_path))
save_response('Flight Schedule Successful! (ID = {}): {}'.format(
queued_fs['flightschedule_id'], repr(resp)))
if resp['err'] == 0:
change_fs_status(
queued_fs['flightschedule_id'],
FSStatus.UPLOADED.value,
queued_fs['execution_time']
)
reset_fs_status_except_uploaded(queued_fs['flightschedule_id'])
else:
change_fs_status(
queued_fs['flightschedule_id'],
FSStatus.DRAFT.value,
queued_fs['execution_time'],
resp['err']
)
def log_housekeeping(response):
"""
Parses housekeeping data from the HOUSEKEEPING.GET_HK command and creates
a database entry for each housekeeping entry.
"""
for log in response:
if log['err'] != 0:
save_response(
'Failed to log housekeeping! (error: {})'.format(log['err']))
continue
# Form baseline schema for the post data
hk = fake_housekeeping_as_dict(
timestamp=datetime.fromtimestamp(log['UNIXtimestamp']).isoformat(),
data_position=log['dataPosition']
)
hk['adcs'] = fake_adcs_hk_as_dict()
hk['athena'] = fake_athena_hk_as_dict()
hk['eps'] = fake_eps_hk_as_dict()
hk['eps_startup'] = fake_eps_startup_hk_as_dict()
hk['uhf'] = fake_uhf_hk_as_dict()
hk['sband'] = fake_sband_hk_as_dict()
hk['hyperion'] = fake_hyperion_hk_as_dict()
hk['charon'] = fake_charon_hk_as_dict()
hk['dfgm'] = fake_dfgm_hk_as_dict()
hk['northern_spirit'] = fake_northern_spirit_hk_as_dict()
hk['iris'] = fake_iris_hk_as_dict()
# Strip the subsystem title from response data
for key in list(log):
if '#' in key:
log[key.split('\r\n')[-1]] = log.pop(key)
# Copy over response data to post data
subsystems = [
'adcs',
'athena',
'eps',
'eps_startup',
'uhf',
'sband',
'hyperion',
'charon',
'dfgm',
'northern_spirit',
'iris'
]
for subsystem in subsystems:
for key in hk[subsystem]:
hk[subsystem][key] = log[key]
# Post HK data
post_data = json.dumps(hk)
housekeeping_post.post(local_data=post_data)
# Log HK transaction
save_response('Logged housekeeping!\nTimestamp: {}\nData Position: {}'.format(
hk['timestamp'], hk['data_position']))
def send_to_simulator(msg):
try:
return antenna.send(json.dumps(msg))
except Exception as e:
print('Unexpected error occured:', e)
def send_to_satellite(gs, msg):
try:
transactObj = gs.interactive.getTransactionObject(
msg, gs.networkManager)
return transactObj.execute()
except Exception as e:
print('Unexpected error occured:', e)
return 'Unexpected error occured: {}'.format(e)
# Save the satellite response as a comm log
def save_response(message):
print('Received:', message)
message = {
'message': str(message),
'sender': 'comm',
'receiver': 'logs',
'is_queued': False
}
message = json.dumps(message)
communication_list.post(local_data=message)
def communication_loop(gs=None):
"""
Main communication loop which polls for messages that are queued and addressed to comm
(i.e. messages it needs to send to satellite). This should be run when a passover is
expected to occur.
:param Csp csp: The Csp instance. See groundStation.py
"""
if mode == Connection.SATELLITE and gs is None:
raise Exception(
'Ground station instance must be specified if sending to satellite')
request_data = {'is_queued': True,
'receiver': 'comm', 'newest-first': False}
# Check communication table every minute
while True:
# Upload any queued flight schedules
if mode == Connection.SATELLITE:
send_flightschedules(gs)
# Get queued communications
messages = communication_list.get(local_data=request_data)[0]
# If we have queued messages addressed to comm send them to the satellite
if len(messages['data']['messages']) > 0:
for message in messages['data']['messages']:
if message['message']:
# Send the message to the satellite
response = None
msg = message['message']
print('Sent:', msg)
if mode == Connection.SIMULATOR:
msg = message['message'].replace(" ", ".")
response = send_to_simulator(msg)
elif mode == Connection.SATELLITE:
response = send_to_satellite(gs, msg)
if response:
if 'housekeeping.get_hk' in msg or 'housekeeping.get_instant_hk' in msg:
log_housekeeping(response)
elif isinstance(response, list):
for item in response:
save_response(item)
else:
save_response(response)
# Denote that the message has been executed if successful
communication_patch.patch(
message['message_id'],
local_data=json.dumps({'is_queued': False}))
time.sleep(5)
def main():
# Terminate after 10 minutes
signal.signal(signal.SIGALRM, handler)
signal.alarm(60 * 60)
if mode == Connection.SIMULATOR:
communication_loop()
elif mode == Connection.SATELLITE:
opts = optionsFactory("basic")
gs = GroundStation(opts.getOptions())
communication_loop(gs)
if __name__ == '__main__':
if len(sys.argv) > 1:
print('Detected CLI arguments for Ground Station Software!')
print('Automatically setting mode to satellite...')
mode = Connection.SATELLITE
sys.path.append(os.path.join(
sys.path[0], 'ex2_ground_station_software', 'src'))
from groundStation import GroundStation
from options import optionsFactory
else:
if input('Would like to communicate with the satellite simulator (if not, the program '
'will attempt to communicate with the satellite) [Y/n]: ').strip() in ('Y', 'y'):
mode = Connection.SIMULATOR
import satellite_simulator.antenna as antenna
else:
mode = Connection.SATELLITE
sys.path.append(os.path.join(
sys.path[0], 'ex2_ground_station_software', 'src'))
from groundStation import GroundStation
from options import optionsFactory
main()