Single API⚓
Class Serial
⚓
A custom serial class that runs in its own thread and reports activity through emitted events.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | device name. | 'default' |
| reconnectDelay | Union[int, float] | wait time between reconnection attempts. | 1 |
| maxAttempts | int | max read attempts. | 10 |
| portsRefreshTime | int | time interval for checking serial device changes (secs). | 1 |
| emitterIsEnabled | bool | enable or disable on/emit events (callbacks execution). | True |
| emitAsDict | bool | emit events in dict format {'emitter_name': data}? | True |
Events
- data: emitted when new data is available.
- connection: emitted when the connection status is updated.
- ports: emitted when a new device is found or disconnected.
Source code in remio/serialio.py
class Serial(Emitter):
    """A custom serial class threaded and event emit based.

    The instance wraps a PySerial object and runs a read loop in a daemon
    thread. The loop reads lines from the port, tries to reconnect while the
    port is closed and periodically checks the list of available ports.

    Args:
        name: device name.
        reconnectDelay: wait time (secs) between reconnection attempts.
        maxAttempts: max failed read attempts before the port is closed.
        portsRefreshTime: time interval for checking serial device changes
            (secs). Values <= 0 disable the check.
        emitterIsEnabled: enable or disable on/emit events (callbacks
            execution). Forwarded to Emitter.
        emitAsDict: emit events in dict format {'emitter_name': data}?
        *args: extra positional arguments, forwarded to Emitter and PySerial.
        **kwargs: extra keyword arguments, forwarded to Emitter and PySerial.
            ``port`` is stored as the default port and is not forwarded to
            PySerial as a keyword.

    Events:
        data: it's emitted when new data is available.
        connection: it's emitted when the connection status is updated.
        ports: it's emitted when a new device is found or disconnected.
    """

    def __init__(
        self,
        name: str = "default",
        reconnectDelay: Union[int, float] = 1,
        maxAttempts: int = 10,
        portsRefreshTime: int = 1,
        emitterIsEnabled: bool = True,
        emitAsDict: bool = True,
        *args,
        **kwargs,
    ):
        super().__init__(emitterIsEnabled=emitterIsEnabled, *args, **kwargs)
        self.name = name
        # Default port; popped so it is not passed to PySerial as a keyword.
        self.port = kwargs.pop("port", None)
        self.reconnectDelay = reconnectDelay
        self.maxAttempts = maxAttempts
        self.portsRefreshTime = portsRefreshTime
        self.emitAsDict = emitAsDict
        self.lastConnectionState = False
        self.attempts = 0  # failed reads counted by readData()
        self.time = 0  # seconds accumulated by checkSerialPorts()
        self.lastDevicesList = []
        self.serial = PySerial(*args, **kwargs)
        self.thread = Thread(target=self.run, name="serial-thread", daemon=True)
        self.running = Event()
        self.pauseEvent = Event()
        # The pause event starts set, so the read loop is not paused.
        self.resume()

    def __setitem__(self, key, value):
        """Sets an attribute of the wrapped serial object, closing it first if open."""
        if self.serial.isOpen():
            self.serial.close()
        setattr(self.serial, key, value)

    def isConnected(self):
        """Checks if the serial port is open."""
        return self.serial.isOpen()

    def getPort(self):
        """Returns the current port device."""
        return self.serial.port

    def setPort(self, port: str = None):
        """Updates the port device value and reconnects.

        Args:
            port: new port device. Nothing is done when it is None.
        """
        if port is not None:
            self.serial.close()
            self.serial.port = port
            self.connect()

    def restorePort(self):
        """Restores the default serial port."""
        self.setPort(self.port)

    def resume(self):
        """Resumes the read loop."""
        self.pauseEvent.set()

    def pause(self):
        """Pauses the read loop."""
        self.pauseEvent.clear()

    def setPause(self, value: bool = True):
        """Updates the pause/resume state.

        Args:
            value: True pauses the read loop, False resumes it.
        """
        if value:
            self.pause()
        else:
            self.resume()

    def needAPause(self):
        """Blocks the caller while the read loop is paused."""
        self.pauseEvent.wait()

    def hasDevice(self):
        """Checks if a default serial device is set."""
        return self.port is not None

    def isOpen(self):
        """Checks if serial port device is open."""
        return self.serial.isOpen()

    def attemptsLimitReached(self):
        """Checks if read attempts have reached their limit."""
        return self.attempts >= self.maxAttempts

    def start(self):
        """Starts read loop."""
        self.running.set()
        self.thread.start()

    @staticmethod
    def ports():
        """Returns a list with the available serial port devices."""
        return [port.device for port in list_ports.comports()]

    def connect(self):
        """Will try to connect with the specified serial device.

        Errors are printed, not raised.
        """
        try:
            # Remember the state before reconnecting so that
            # checkConnectionStatus() can detect the change.
            self.lastConnectionState = self.serial.isOpen()
            if self.serial.isOpen():
                self.serial.close()
            if self.serial.port is None and self.port is not None:
                self.serial.port = self.port
            self.serial.open()
        except Exception as e:
            print(f"-> Serial - {self.name} :: {e}")

    def dictToJson(self, message: dict = {}) -> Union[str, dict]:
        """Converts a dictionary to a json str.

        Args:
            message: object to serialize. The default is never mutated.

        Returns:
            The JSON string, or the unchanged message when it can not be
            serialized (the error is printed).
        """
        try:
            return json.dumps(message)
        except Exception as e:
            print(f"-> Serial - {self.name} :: {e}")
            return message

    def write(
        self, message: Union[str, dict] = "", end: str = "\n", asJson: bool = False
    ):
        """Writes a message to the serial device.

        Nothing is sent when the port is closed or the message is empty.
        Errors are printed, not raised.

        Args:
            message: string to be sent.
            end: newline character to be concatenated with the message.
            asJson: convert to JSON?
        """
        if self.serial.isOpen():
            try:
                if len(message) > 0:
                    if asJson:
                        message = self.dictToJson(message)
                    message += end
                    message = message.encode()
                    self.serial.write(message)
            except Exception as e:
                print(f"-> Serial - {self.name} :: {e}")

    def readData(self):
        """Will try to read incoming data.

        A non-empty line is emitted as a ``data`` event. A failed read is
        counted; once the limit is reached the counter is reset and the port
        is closed.

        Returns:
            The emitted data, or None when nothing was read or the read failed.
        """
        try:
            data = self.serial.readline().decode().rstrip()
            if len(data) > 0:
                if self.emitAsDict:
                    data = {self.name: data}
                self.emit("data", data)
                return data
        except Exception as e:
            print(f"-> Serial - {self.name} :: {e}")
            if not self.attemptsLimitReached():
                self.attempts += 1
            else:
                self.attempts = 0
                try:
                    self.serial.close()
                except Exception as e:
                    print(f"-> Serial - {self.name} :: {e}")
        return None

    def checkSerialPorts(self, dt: Union[int, float]):
        """Monitors if there are changes in the serial devices.

        Args:
            dt: elapsed time (secs) added to the internal timer.
        """
        if self.portsRefreshTime > 0:
            self.time += dt
            if self.time >= self.portsRefreshTime:
                actualDevicesList = self.ports()
                if actualDevicesList != self.lastDevicesList:
                    self.lastDevicesList = actualDevicesList
                    self.emit("ports", actualDevicesList)
                self.time = 0

    def checkConnectionStatus(self):
        """Checks if the connection status changes and emits ``connection``."""
        if self.lastConnectionState != self.serial.isOpen():
            status = self.serial.isOpen()
            if self.emitAsDict:
                status = {self.name: status}
            self.emit("connection", status)
            self.lastConnectionState = self.serial.isOpen()

    def reconnect(self):
        """Tries to reconnect with the serial device."""
        if self.hasDevice():
            self.connect()
        # Sleep even without a device so the read loop does not spin.
        time.sleep(self.reconnectDelay)

    def run(self):
        """Here the run loop is executed."""
        while self.running.is_set():
            t0 = time.time()
            self.checkConnectionStatus()
            if self.serial.isOpen():
                self.readData()
            else:
                self.reconnect()
            t1 = time.time()
            # dt covers the read/reconnect step of this iteration.
            dt = t1 - t0
            self.checkSerialPorts(dt)
            self.needAPause()

    def disconnect(self, force: bool = False):
        """Clears the current serial port device.

        Args:
            force: force disconnection? prevents reconnection.
        """
        if self.serial.port is not None:
            self.serial.close()
            self.serial.port = None
        if force:
            self.port = None

    def stop(self):
        """Stops the read loop and closes the connection with the serial device."""
        wasRunning = self.running.is_set()
        # Clear the flag before disconnecting so the loop can not begin a new
        # iteration (and reconnect) once the port has been closed.
        self.running.clear()
        # Wake the loop in case it is paused, otherwise join() would block.
        self.resume()
        self.disconnect()
        if wasRunning:
            self.thread.join()
            # An iteration already in progress may have reopened the port
            # through reconnect(); make sure it ends up closed.
            self.disconnect()
attemptsLimitReached(self)
⚓
checkConnectionStatus(self)
⚓
Checks if the connection status changes.
Source code in remio/serialio.py
checkSerialPorts(self, dt)
⚓
Monitors if there are changes in the serial devices.
Source code in remio/serialio.py
def checkSerialPorts(self, dt: Union[int, float]):
    """Monitors the available serial devices and emits ``ports`` on changes.

    Args:
        dt: elapsed time (secs) added to the internal timer.
    """
    # Monitoring is off unless the refresh time is positive.
    if not (self.portsRefreshTime > 0):
        return
    self.time += dt
    # Not enough time accumulated yet.
    if not (self.time >= self.portsRefreshTime):
        return
    currentPorts = self.ports()
    if currentPorts != self.lastDevicesList:
        self.lastDevicesList = currentPorts
        self.emit("ports", currentPorts)
    self.time = 0
connect(self)
⚓
Will try to connect with the specified serial device.
Source code in remio/serialio.py
def connect(self):
    """Closes an open connection and tries to open the serial device again.

    The default port is used when the serial object has no port assigned.
    Errors are printed, not raised.
    """
    try:
        device = self.serial
        # Keep the state seen before reconnecting.
        self.lastConnectionState = device.isOpen()
        if device.isOpen():
            device.close()
        useDefaultPort = device.port is None and self.port is not None
        if useDefaultPort:
            device.port = self.port
        device.open()
    except Exception as error:
        print(f"-> Serial - {self.name} :: {error}")
dictToJson(self, message={})
⚓
disconnect(self, force=False)
⚓
Clears the current serial port device.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| force | bool | force disconnection? prevents reconnection. | False |
getPort(self)
⚓
hasDevice(self)
⚓
isConnected(self)
⚓
isOpen(self)
⚓
needAPause(self)
⚓
pause(self)
⚓
ports()
staticmethod
⚓
readData(self)
⚓
Will try to read incoming data.
Source code in remio/serialio.py
def readData(self):
    """Tries to read one line of incoming data and emit it as ``data``.

    A failed read is counted; when the limit has been reached the counter
    is reset and the port is closed. Errors are printed, not raised.

    Returns:
        The emitted data, or None when nothing was read or the read failed.
    """
    try:
        rawLine = self.serial.readline()
        text = rawLine.decode().rstrip()
        if text:
            payload = {self.name: text} if self.emitAsDict else text
            self.emit("data", payload)
            return payload
    except Exception as readError:
        print(f"-> Serial - {self.name} :: {readError}")
        if self.attemptsLimitReached():
            # Too many failed reads: start counting again and drop the port.
            self.attempts = 0
            try:
                self.serial.close()
            except Exception as closeError:
                print(f"-> Serial - {self.name} :: {closeError}")
        else:
            self.attempts += 1
    return None
reconnect(self)
⚓
restorePort(self)
⚓
resume(self)
⚓
run(self)
⚓
Here the run loop is executed.
setPause(self, value=True)
⚓
setPort(self, port=None)
⚓
start(self)
⚓
stop(self)
⚓
write(self, message='', end='\n', asJson=False)
⚓
Writes a message to the serial device.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| message | Union[str, dict] | string to be sent. | '' |
| end | str | newline character to be concatenated with the message. | '\n' |
| asJson | bool | convert to JSON? | False |
Source code in remio/serialio.py
def write(
    self, message: Union[str, dict] = "", end: str = "\n", asJson: bool = False
):
    """Sends a message to the serial device.

    Nothing is sent when the port is closed or the message is empty.
    Errors are printed, not raised.

    Args:
        message: string to be sent.
        end: newline character to be concatenated with the message.
        asJson: convert to JSON?
    """
    if not self.serial.isOpen():
        return
    try:
        if len(message) == 0:
            return
        payload = self.dictToJson(message) if asJson else message
        payload += end
        self.serial.write(payload.encode())
    except Exception as error:
        print(f"-> Serial - {self.name} :: {error}")