-
Notifications
You must be signed in to change notification settings - Fork 1
/
utils.py
217 lines (177 loc) · 6.68 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
import asyncio
import logging
import socket
import struct
import time
from fcntl import ioctl
from hashlib import sha256
from math import copysign
from sys import platform
from typing import Coroutine, List
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("slac_utils")
# commands
SIOCGIFHWADDR = 0x8927 # Get hardware address
SIOCGIFADDR = 0x8915 # get PA address
# From net/if_arp.h
ARPHDR_ETHER = 1
ARPHDR_METRICOM = 23
ARPHDR_PPP = 512
ARPHDR_LOOPBACK = 772
ARPHDR_TUN = 65534
NID_LENGTH = 7
def generate_nid(nmk: bytes):
"""
It generates a NID key, based on the NMK, which is a random 16 bytes value.
The way the NID is generated is by hashing recursively the NMK 5 times,
reinitializing the sha256 buffer each time. It then collects the first
7 bytes of the result and shifts 4 times the least significant byte.
This algorithm was extracted from the Qualcomm implementation of it
in https://github.com/qca/open-plc-utils/blob/master/key/HPAVKeyNID.c
The procedure to get the NID described in the HPGP 1.1, section 4.4.3.1
does not match the algorithm presented here and in fact makes no sense;
During the testival in Stuttgart between 28-30 September 2021, tried to get
someone to explain the logic described in HPGP but no one could.
:param nmk: NMK [16 bytes] randomly generated
:return: NID [7 bytes]
"""
# For the sake of clarity, the attribute for the method is nmk, but over
# the loop we want to keep updating the same variable and it becomes a
# digest, so we set the initial digest variable value as equal to nmk
digest = nmk
for _ in range(5):
_sha256 = sha256()
_sha256.update(digest)
digest = _sha256.digest()
truncated_digest = digest[:NID_LENGTH]
last_byte = NID_LENGTH - 1
nid = truncated_digest[:last_byte] + (truncated_digest[last_byte] >> 4).to_bytes(
1, "big"
)
return nid
def half_round(x):
"""
Python round() function rounds the float number to the nearest even number,
i.e:
>> round(2.3)
2
>> round(2.5)
2
This happens because python 3.x in contrast to 2.x, uses Banker's rounding
for the function (http://en.wikipedia.org/wiki/Banker's_rounding;
https://wiki.c2.com/?BankersRounding)
This function implements a half-way rounding, which is more commonly used,
for positive and negative numbers. The copysign(0.5, x) provides the sign
of the float number to be rounded and injects it in 0.5.
"""
return int(x + copysign(0.5, x))
def time_now_ms():
return round(time.time() * 1000)
def is_distro_linux() -> bool:
"""
Checks if the Machine is a Linux one
:return:
"""
if platform.startswith("linux"):
return True
return False
def plain_str(x):
"""Convert basic byte objects to str"""
if isinstance(x, bytes):
return x.decode(errors="backslashreplace")
return str(x)
def str2mac(s):
"""
Returns the Mac Address in the format 00:00:00:00:00:00
"""
if isinstance(s, str):
return ("%02x:" * 6)[:-1] % tuple(map(ord, s))
return ("%02x:" * 6)[:-1] % tuple(s)
def get_if_hwaddr(iff: str, to_mac_fmt=False):
"""
Returns the MAC (hardware) address of an interface in readable format
iff: interface name, e.g. en0 or enp0s3
"""
addr_family, mac = get_if_raw_hwaddr_linux(iff) # type: ignore # noqa: F405
if addr_family in [ARPHDR_ETHER, ARPHDR_LOOPBACK]:
if to_mac_fmt:
return str2mac(mac)
return mac
raise Exception(
f"Unsupported address family ({addr_family}) " "for interface [{iff}]"
)
def get_if_raw_hwaddr_linux(iff: str, siocgifhwaddr: int = SIOCGIFHWADDR):
"""Get the raw MAC address of a local interface.
This function uses SIOCGIFHWADDR (System I/O Command Get Interface HW Addr)
calls, therefore only works
on some distros.
iff: str - Interface Designation (e.g. 'en0')
siocgifhwaddr: int = SIOCGIFHWADDR - SIO command
"""
sck = socket.socket()
try:
# input/output control
raw_addr = ioctl(sck, siocgifhwaddr, struct.pack("16s16x", iff.encode("utf8")))
except OSError as e:
raise OSError(f"There is no such interface {iff}") from e
finally:
sck.close()
# raw_addr is a 32 bytes buffer
# 16 bytes are padding ones, thus they are ignored (16x)
# we expect a short (h) - 2 bytes
# a 6-bytes value that is converted as string (6s), like b'\x00\x01..\x06'
# and 8 bytes are ignored as padding (8x)
# so the value returned is a tuple with two items, like:
# (1, b'\x02B\xac\x14\x00\x02')
return struct.unpack("16xh6s8x", raw_addr)
async def cancel_task(task):
"""Cancel the task safely"""
task.cancel()
try:
await task
print("Task Canceled")
except asyncio.CancelledError:
print("Task Didnt Canceled")
def task_callback(task: asyncio.Task):
"""
Callback for a task
This is very useful when spawning tasks in the background with asyncio.create_task.
As the task runs in the background any runtime exception is not logged, so with this
callback, it is possible to keep logging the exceptions
https://stackoverflow.com/questions/66293545/asyncio-re-raise-exception-from-a-task
"""
try:
task.result()
except asyncio.CancelledError:
pass # Task cancellation should not be logged as an error.
except Exception as e:
logger.error(f"Exception raised by task: {task.get_name()}", e)
async def wait_for_tasks(
await_tasks: List[Coroutine], return_when=asyncio.FIRST_EXCEPTION
):
"""
Method to run multiple tasks concurrently.
return_when is used directly in the asyncio.wait call and sets the
condition to cancel all running tasks and return.
The arguments for it can be:
asyncio.FIRST_COMPLETED, asyncio.FIRST_EXCEPTION or
asyncio.ALL_COMPLETED
check:
https://docs.python.org/3/library/asyncio-task.html#waiting-primitives)
Similar solutions for awaiting for several tasks can be found in:
* https://python.plainenglish.io/how-to-manage-exceptions-when-waiting-on-multiple-asyncio-tasks-a5530ac10f02 # noqa: E501
* https://stackoverflow.com/questions/63583822/asyncio-wait-on-multiple-tasks-with-timeout-and-cancellation # noqa: E501
"""
tasks = []
for task in await_tasks:
if not isinstance(task, asyncio.Task):
task = asyncio.create_task(task)
tasks.append(task)
done, pending = await asyncio.wait(tasks, return_when=return_when)
for task in pending:
await cancel_task(task)
for task in done:
try:
task.result()
except Exception as e:
logger.exception(e)