Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework transport to leave re-connect to user #150

Merged
merged 18 commits into from
Feb 25, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
220 changes: 144 additions & 76 deletions RFXtrx/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
# pylint: disable=R0903, invalid-name
# pylint: disable= too-many-lines

import functools
import glob
import socket
import threading
import time
import logging

from time import sleep
Expand Down Expand Up @@ -674,6 +674,21 @@ def __str__(self):
return "{0} device=[{1}]".format(
type(self), self.device)


class ConnectionEvent(RFXtrxEvent):
""" Connection event """
def __init__(self):
super().__init__(None)


class ConnectionLost(ConnectionEvent):
""" Connection lost """


class ConnectionDone(ConnectionEvent):
""" Connection lost """


###############################################################################
# DummySerial class
###############################################################################
Expand Down Expand Up @@ -730,10 +745,19 @@ def close(self):
self._close_event.set()


###############################################################################
# RFXtrxTransportError class
###############################################################################


class RFXtrxTransportError(Exception):
""" Connection error """

###############################################################################
# RFXtrxTransport class
###############################################################################


class RFXtrxTransport:
""" Abstract superclass for all transport mechanisms """

Expand All @@ -757,12 +781,40 @@ def parse(data):
return obj
return None

def connect(self, timeout=None):
""" connect to device """

def reset(self):
""" reset the rfxtrx device """

def close(self):
""" close connection to rfxtrx device """

def receive_blocking(self):
""" Wait until a packet is received and return with an RFXtrxEvent """

def send(self, data):
""" Send the given packet """


def transport_errors(message):
""" Decorator to wrap low level errors in known error. """
def _errors(func):
@functools.wraps(func)
def __errors(instance: RFXtrxTransport, *args, **kargs):
try:
return func(instance, *args, **kargs)
except (socket.error,
serial.SerialException,
OSError) as exception:
_LOGGER.debug("%s failed: %s", message,
str(exception), exc_info=True)
raise RFXtrxTransportError(
"{0} failed: {1}".format(message, exception)
) from exception
return __errors
return _errors

###############################################################################
# PySerialTransport class
###############################################################################
Expand All @@ -774,45 +826,39 @@ class PySerialTransport(RFXtrxTransport):
def __init__(self, port):
self.port = port
self.serial = None
self._run_event = threading.Event()
self._run_event.set()
self.connect()

def connect(self):
@transport_errors("connect")
def connect(self, timeout=None):
""" Open a serial connexion """
try:
self.serial = serial.Serial(self.port, 38400, timeout=0.1)
except serial.serialutil.SerialException:
self.serial = serial.Serial(self.port, 38400)
except serial.SerialException:
port = glob.glob('/dev/serial/by-id/usb-RFXCOM_*-port0')
if len(port) < 1:
return
self.serial = serial.Serial(port[0], 38400, timeout=0.1)
raise
_LOGGER.debug("Attempting connection by name %s", port)
self.serial = serial.Serial(port[0], 38400)

@transport_errors("receive")
def receive_blocking(self):
return self._receive_packet()

def _receive_packet(self):
""" Wait until a packet is received and return with an RFXtrxEvent """
data = None
while self._run_event.is_set():
try:
data = self.serial.read()
except TypeError:
continue
except serial.serialutil.SerialException:
try:
self.connect()
except serial.serialutil.SerialException:
time.sleep(5)
continue
if not data or data == '\x00':
continue
pkt = bytearray(data)
data = self.serial.read(pkt[0])
data = self.serial.read()
if data == '\x00':
return None
pkt = bytearray(data)
while len(pkt) < pkt[0]+1:
data = self.serial.read(pkt[0]+1 - len(pkt))
pkt.extend(bytearray(data))
_LOGGER.debug(
"Recv: %s",
" ".join("0x{0:02x}".format(x) for x in pkt)
)
return self.parse(pkt)
_LOGGER.debug(
"Recv: %s",
" ".join("0x{0:02x}".format(x) for x in pkt)
)
return self.parse(pkt)

@transport_errors("send")
def send(self, data):
""" Send the given packet """
if isinstance(data, bytearray):
Expand All @@ -827,18 +873,19 @@ def send(self, data):
)
self.serial.write(pkt)

@transport_errors("reset")
def reset(self):
""" Reset the RFXtrx """
self.send(b'\x0D\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')
self.send(b'\x0D\x00\x00\x00\x00\x00\x00'
b'\x00\x00\x00\x00\x00\x00\x00')
elupus marked this conversation as resolved.
Show resolved Hide resolved
sleep(0.3) # Should work with 0.05, but not for me
self.serial.flushInput()

@transport_errors("close")
def close(self):
""" close connection to rfxtrx device """
self._run_event.clear()
self.serial.close()


###############################################################################
# PyNetworkTransport class
###############################################################################
Expand All @@ -850,44 +897,40 @@ class PyNetworkTransport(RFXtrxTransport):
def __init__(self, hostport):
self.hostport = hostport # must be a (host, port) tuple
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._run_event = threading.Event()
self._run_event.set()
self.connect()

def connect(self):
@transport_errors("connect")
def connect(self, timeout=None):
""" Open a socket connection """
try:
self.sock.connect(self.hostport)
_LOGGER.info("Connected to network socket")
except socket.error:
_LOGGER.error('Failed to create socket, check host port config')
# This may throw exception for use by caller:
self.sock.connect(self.hostport)
self.sock.settimeout(timeout)
self.sock.connect(self.hostport)
self.sock.settimeout(None)
_LOGGER.debug("Connected to network socket")

@transport_errors("receive")
def receive_blocking(self):
""" Wait until a packet is received and return with an RFXtrxEvent """
data = None
while self._run_event.is_set():
try:
data = self.sock.recv(1)
except socket.error:
try:
self.connect()
except socket.error:
time.sleep(5)
continue
if not data or data == '\x00':
continue
pkt = bytearray(data)
while len(pkt) < pkt[0]+1:
data = self.sock.recv(pkt[0]+1 - len(pkt))
pkt.extend(bytearray(data))
_LOGGER.debug(
"Recv: %s",
" ".join("0x{0:02x}".format(x) for x in pkt)
)
return self.parse(pkt)
return self._receive_packet()

def _receive_packet(self):
""" Wait until a packet is received and return with an RFXtrxEvent """
data = self.sock.recv(1)
if data == b'':
raise RFXtrxTransportError("Server was shutdown")
if data == '\x00':
return None
pkt = bytearray(data)
while len(pkt) < pkt[0]+1:
data = self.sock.recv(pkt[0]+1 - len(pkt))
if data == b'':
raise RFXtrxTransportError("Server was shutdown")
pkt.extend(bytearray(data))
_LOGGER.debug(
"Recv: %s",
" ".join("0x{0:02x}".format(x) for x in pkt)
)
return self.parse(pkt)

@transport_errors("send")
def send(self, data):
""" Send the given packet """
if isinstance(data, bytearray):
Expand All @@ -902,15 +945,21 @@ def send(self, data):
)
self.sock.send(pkt)

@transport_errors("reset")
def reset(self):
""" Reset the RFXtrx """
self.send(b'\x0D\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')
sleep(0.3)
self.sock.sendall(b'')

try:
self.send(b'\x0D\x00\x00\x00\x00\x00\x00'
b'\x00\x00\x00\x00\x00\x00\x00')
sleep(0.3)
self.sock.sendall(b'')
except socket.error as exception:
raise RFXtrxTransportError(
"Reset failed: {0}".format(exception)) from exception

@transport_errors("reset")
def close(self):
""" close connection to rfxtrx device """
self._run_event.clear()
self.sock.shutdown(socket.SHUT_RDWR)
self.sock.close()

Expand All @@ -922,6 +971,9 @@ def __init__(self, device=""):
self.device = device
self._close_event = threading.Event()

def connect(self, timeout=None):
pass

def receive(self, data=None):
""" Emulate a receive by parsing the given data """
if data is None:
Expand Down Expand Up @@ -958,6 +1010,8 @@ class DummyTransport2(PySerialTransport):
def __init__(self, device=""):
self.serial = _dummySerial(device, 38400, timeout=0.1)
self._run_event = threading.Event()

def connect(self, timeout=None):
self._run_event.set()


Expand All @@ -966,22 +1020,34 @@ class Connect:
Has methods for sensors.
"""
# pylint: disable=too-many-instance-attributes, too-many-arguments
def __init__(self, device, event_callback=None,
transport_protocol=PySerialTransport,
def __init__(self, transport, event_callback=None,
modes=None):
self._run_event = threading.Event()
self._sensors = {}
self._status = None
self._modes = modes
self._thread = threading.Thread(target=self._connect, daemon=True)
self.event_callback = event_callback
self.transport: RFXtrxTransport = transport

self.transport = transport_protocol(device)
self._thread = threading.Thread(target=self._connect)
self._thread.daemon = True
def connect(self, timeout=None):
"""Connect to device."""
self.transport.connect(timeout)
self._thread.start()
self._run_event.wait()
if not self._run_event.wait(timeout):
self.close_connection()
raise TimeoutError()

def _connect(self):
try:
self._connect_internal()
except RFXtrxTransportError as exception:
_LOGGER.info("Connection lost %s", exception)
finally:
if self.event_callback and self._run_event.is_set():
self.event_callback(ConnectionLost())

def _connect_internal(self):
"""Connect """
self.transport.reset()
self._status = self.send_get_status()
Expand All @@ -998,6 +1064,8 @@ def _connect(self):
self.send_start()

self._run_event.set()
if self.event_callback:
self.event_callback(ConnectionDone())

while self._run_event.is_set():
event = self.transport.receive_blocking()
Expand Down
4 changes: 2 additions & 2 deletions RFXtrx/lowlevel.py
Original file line number Diff line number Diff line change
Expand Up @@ -2278,7 +2278,7 @@ def load_receive(self, data):
(data[10] << 8) + data[11])
self.prodwatthours = ((data[12] * pow(2, 24)) + (data[13] << 16) +
(data[14] << 8) + data[15])
self.tarif_num = (data[16] & 0x0f)
self.tarif_num = data[16] & 0x0f
elupus marked this conversation as resolved.
Show resolved Hide resolved
self.voltage = data[17] + 200
self.currentwatt = (data[18] << 8) + data[19]
self.state_byte = data[20]
Expand Down Expand Up @@ -2378,7 +2378,7 @@ def set_transmit(self, subtype, seqnbr, id1, id2, sound):
self.id2 = id2
self.sound = sound
self.rssi = 0
self.rssi_byte = (self.rssi << 4)
self.rssi_byte = self.rssi << 4
elupus marked this conversation as resolved.
Show resolved Hide resolved
self.data = bytearray([self.packetlength, self.packettype,
self.subtype, self.seqnbr,
self.id1, self.id2, self.sound,
Expand Down
3 changes: 2 additions & 1 deletion examples/receive.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ def main():

modes_list = sys.argv[2].split() if len(sys.argv) > 2 else None
print ("modes: ", modes_list)
core = RFXtrx.Core(rfxcom_device, print_callback, modes=modes_list)
core = RFXtrx.Connect(RFXtrx.PySerialTransport(rfxcom_device), print_callback, modes=modes_list)
core.connect()

print (core)
while True:
Expand Down
1 change: 1 addition & 0 deletions examples/send.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from time import sleep

transport = PySerialTransport('/dev/cu.usbserial-05VN8GHS')
transport.connect()
transport.reset()

while True:
Expand Down
Loading
Loading