Multiple API⚓
Class Serials
⚓
A class for manage multiple serial devices at the same time.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
device name. |
required | |
reconnectDelay |
wait time between reconnection attempts. |
required | |
maxAttempts |
max read attempts. |
required | |
portsRefreshTime |
time for check serial devices changes. |
required | |
emitterIsEnabled |
disable on/emit events (callbacks execution). |
required |
Events
data: it's emitted when new data is available. connection: it's emitted when the connection status is updated. ports: it's emitted when a new device is found or disconnected.
Source code in remio/serialio.py
class Serials:
"""A class for manage multiple serial devices at the same time.
Args:
name: device name.
reconnectDelay: wait time between reconnection attempts.
maxAttempts: max read attempts.
portsRefreshTime: time for check serial devices changes.
emitterIsEnabled: disable on/emit events (callbacks execution).
Events:
data: it's emitted when new data is available.
connection: it's emitted when the connection status is updated.
ports: it's emitted when a new device is found or disconnected.
"""
def __init__(self, devices: dict = {}, *args, **kwargs):
self.devices = {}
if len(devices) > 0:
for name, settings in devices.items():
if isinstance(settings, dict):
self.devices[name] = Serial(name=name, **settings)
def __len__(self):
return len(self.devices)
def __getitem__(self, key):
if key in self.devices:
return self.devices[key]
def hasDevices(self):
"""Checks if there is some serial device on list."""
return len(self.devices) > 0
def setDevices(self, devices: list = [], *args, **kwargs):
"""Updates the current serial devices list.
Args:
devices: serial devices list.
"""
if self.hasDevices():
self.stopAll()
devices = list(set(devices))
self.devices = [Serial(port=name, *args, **kwargs) for name in devices]
def startAll(self):
"""Starts all serial devices"""
for device in self.devices.values():
device.start()
def startOnly(self, name: str = "default"):
"""Starts only one specific serial device.
Args:
name: device name.
"""
if name in self.devices:
self.devices[name].start()
def stopAll(self):
"""Stops all serial devices running."""
for device in self.devices.values():
device.stop()
def stopOnly(self, name: str = "default"):
"""Stops only one specific serial device.
Args:
name: device name.
"""
if name in self.devices:
self.device[name].stop()
def pauseOnly(self, deviceName: str = "default"):
"""Pauses a specific camera device.
Args:
deviceName: camera device name.
"""
if deviceName in self.devices:
device = self.devices[deviceName]
device.pause()
def pauseAll(self):
"""Pauses all camera devices."""
for device in self.devices.values():
device.pause()
def resumeAll(self):
"""Resumes all camera devices."""
for device in self.devices.values():
device.resume()
def resumeOnly(self, deviceName: str = "default"):
"""Resumes a specific camera device.
Args:
deviceName: camera device name.
"""
if deviceName in self.devices:
device = self.devices[deviceName]
device.resume()
@staticmethod
def ports():
"""Returns a list with the availables serial port devices."""
return [port.device for port in list_ports.comports()]
def toJson(self, data: str = ""):
"""Converts a string to a json.
Args:
data: a string message.
"""
try:
return json.loads(data)
except Exception as e:
print(f"-> Serials :: {e}")
return data
def writeTo(
self,
deviceName: str = "default",
message: str = "",
end: str = "\n",
asJson: bool = False,
):
"""Writes a message to a specific serial device.
Args:
deviceName: name of the serial device.
message: message to be written.
end: newline character to be concated with the message.
asJson: transform message to a json?
"""
if deviceName in self.devices:
self.device[deviceName].write(message=message, end=end, asJson=asJson)
def write(self, message: dict = {}, end: str = "\n", asJson: bool = False):
"""Writes a message given a dict with the device name and the message.
Args:
message: message to be written.
end: newline character to be concated with the message.
asJson: transform message to a json?
"""
for deviceName, data in message.items():
if deviceName in self.devices:
self.device[deviceName].write(message=data, end=end, asJson=asJson)
def on(self, eventName: str = "", callback=None, *args, **kwargs):
"""A wrapper function for use on/emit functions. It defines a specific event
to every serial devices listed on current instance.
Args:
eventName: name of the event to be signaled.
callback: callback function
"""
index = 0
for device in self.devices.values():
f = None
# if eventName =='data':
# f = lambda data: callback(device.getPort(), data)
# elif eventName == 'connection':
# f = lambda status: callback(device.getPort(), status)
# elif eventName == 'ports' and index == 0:
# f = lambda ports: callback(device.ports())
# device.on(eventName, f, *args, **kwargs)
# if eventName == 'ports':
# device.on(eventName, callback, *args, **kwargs)
device.on(eventName, callback, *args, **kwargs)
index += 1
hasDevices(self)
⚓
on(self, eventName='', callback=None, *args, **kwargs)
⚓
A wrapper function for use on/emit functions. It defines a specific event to every serial devices listed on current instance.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
eventName |
str |
name of the event to be signaled. |
'' |
callback |
callback function |
None |
Source code in remio/serialio.py
def on(self, eventName: str = "", callback=None, *args, **kwargs):
"""A wrapper function for use on/emit functions. It defines a specific event
to every serial devices listed on current instance.
Args:
eventName: name of the event to be signaled.
callback: callback function
"""
index = 0
for device in self.devices.values():
f = None
# if eventName =='data':
# f = lambda data: callback(device.getPort(), data)
# elif eventName == 'connection':
# f = lambda status: callback(device.getPort(), status)
# elif eventName == 'ports' and index == 0:
# f = lambda ports: callback(device.ports())
# device.on(eventName, f, *args, **kwargs)
# if eventName == 'ports':
# device.on(eventName, callback, *args, **kwargs)
device.on(eventName, callback, *args, **kwargs)
index += 1
pauseAll(self)
⚓
pauseOnly(self, deviceName='default')
⚓
Pauses a specific camera device.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deviceName |
str |
camera device name. |
'default' |
ports()
staticmethod
⚓
resumeAll(self)
⚓
resumeOnly(self, deviceName='default')
⚓
Resumes a specific camera device.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deviceName |
str |
camera device name. |
'default' |
setDevices(self, devices=[], *args, **kwargs)
⚓
Updates the current serial devices list.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
devices |
list |
serial devices list. |
[] |
Source code in remio/serialio.py
startAll(self)
⚓
startOnly(self, name='default')
⚓
Starts only one specific serial device.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
device name. |
'default' |
stopAll(self)
⚓
stopOnly(self, name='default')
⚓
Stops only one specific serial device.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
device name. |
'default' |
toJson(self, data='')
⚓
Converts a string to a json.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data |
str |
a string message. |
'' |
write(self, message={}, end='\n', asJson=False)
⚓
Writes a message given a dict with the device name and the message.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message |
dict |
message to be written. |
{} |
end |
str |
newline character to be concated with the message. |
'\n' |
asJson |
bool |
transform message to a json? |
False |
Source code in remio/serialio.py
def write(self, message: dict = {}, end: str = "\n", asJson: bool = False):
"""Writes a message given a dict with the device name and the message.
Args:
message: message to be written.
end: newline character to be concated with the message.
asJson: transform message to a json?
"""
for deviceName, data in message.items():
if deviceName in self.devices:
self.device[deviceName].write(message=data, end=end, asJson=asJson)
writeTo(self, deviceName='default', message='', end='\n', asJson=False)
⚓
Writes a message to a specific serial device.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deviceName |
str |
name of the serial device. |
'default' |
message |
str |
message to be written. |
'' |
end |
str |
newline character to be concated with the message. |
'\n' |
asJson |
bool |
transform message to a json? |
False |
Source code in remio/serialio.py
def writeTo(
self,
deviceName: str = "default",
message: str = "",
end: str = "\n",
asJson: bool = False,
):
"""Writes a message to a specific serial device.
Args:
deviceName: name of the serial device.
message: message to be written.
end: newline character to be concated with the message.
asJson: transform message to a json?
"""
if deviceName in self.devices:
self.device[deviceName].write(message=message, end=end, asJson=asJson)