Skip to content

Multiple API

Class Serials

A class for managing multiple serial devices at the same time.

Parameters:

Name Type Description Default
name

device name.

required
reconnectDelay

wait time between reconnection attempts.

required
maxAttempts

max read attempts.

required
portsRefreshTime

time interval for checking for serial device changes.

required
emitterIsEnabled

enables or disables on/emit events (callback execution).

required

Events

data: emitted when new data is available; connection: emitted when the connection status is updated; ports: emitted when a device is found or disconnected.

Source code in remio/serialio.py
class Serials:
    """A class for managing multiple serial devices at the same time.

    Devices are stored in ``self.devices``, a dict that maps a device name to
    its ``Serial`` instance. Every start/stop/pause/resume/write helper looks
    devices up in that dict.
    Args:
        devices: mapping of device name -> settings dict. Each settings dict is
            forwarded as keyword arguments to ``Serial`` (presumably options
            such as reconnectDelay, maxAttempts, portsRefreshTime and
            emitterIsEnabled -- confirm against ``Serial``). Entries whose
            value is not a dict are ignored.
    Events:
        data: it's emitted when new data is available.
        connection: it's emitted when the connection status is updated.
        ports: it's emitted when a new device is found or disconnected.
    """

    def __init__(self, devices: dict = {}, *args, **kwargs):
        self.devices = {}
        # The default dict is only read, never mutated.
        for name, settings in devices.items():
            if isinstance(settings, dict):
                self.devices[name] = Serial(name=name, **settings)

    def __len__(self):
        """Returns the number of registered serial devices."""
        return len(self.devices)

    def __getitem__(self, key):
        """Returns the device registered under ``key``, or None if unknown."""
        return self.devices.get(key)

    def hasDevices(self):
        """Checks if there is some serial device on list."""
        return len(self.devices) > 0

    def setDevices(self, devices: list = [], *args, **kwargs):
        """Updates the current serial devices list.

        Running devices are stopped first. The new devices are stored in a
        dict keyed by port name so the other methods (which iterate
        ``self.devices.values()`` and look devices up by name) keep working.
        Args:
            devices: serial devices list (port names); duplicates are ignored.
            *args: extra positional arguments forwarded to ``Serial``.
            **kwargs: extra keyword arguments forwarded to ``Serial``.
        """
        if self.hasDevices():
            self.stopAll()
        # dict.fromkeys removes duplicates while keeping first-seen order.
        self.devices = {
            name: Serial(*args, port=name, **kwargs)
            for name in dict.fromkeys(devices)
        }

    def startAll(self):
        """Starts all serial devices"""
        for device in self.devices.values():
            device.start()

    def startOnly(self, name: str = "default"):
        """Starts only one specific serial device.
        Args:
            name: device name.
        """
        if name in self.devices:
            self.devices[name].start()

    def stopAll(self):
        """Stops all serial devices running."""
        for device in self.devices.values():
            device.stop()

    def stopOnly(self, name: str = "default"):
        """Stops only one specific serial device.
        Args:
            name: device name.
        """
        if name in self.devices:
            self.devices[name].stop()

    def pauseOnly(self, deviceName: str = "default"):
        """Pauses a specific serial device.
        Args:
            deviceName: serial device name.
        """
        if deviceName in self.devices:
            self.devices[deviceName].pause()

    def pauseAll(self):
        """Pauses all serial devices."""
        for device in self.devices.values():
            device.pause()

    def resumeAll(self):
        """Resumes all serial devices."""
        for device in self.devices.values():
            device.resume()

    def resumeOnly(self, deviceName: str = "default"):
        """Resumes a specific serial device.
        Args:
            deviceName: serial device name.
        """
        if deviceName in self.devices:
            self.devices[deviceName].resume()

    @staticmethod
    def ports():
        """Returns a list with the available serial port devices."""
        return [port.device for port in list_ports.comports()]

    def toJson(self, data: str = ""):
        """Converts a string to a json.

        Parsing is best-effort: on failure the error is printed and the
        original ``data`` is returned unchanged.
        Args:
            data: a string message.
        """
        try:
            return json.loads(data)
        except Exception as e:
            print(f"-> Serials :: {e}")
            return data

    def writeTo(
        self,
        deviceName: str = "default",
        message: str = "",
        end: str = "\n",
        asJson: bool = False,
    ):
        """Writes a message to a specific serial device.

        Unknown device names are silently ignored.
        Args:
            deviceName: name of the serial device.
            message: message to be written.
            end: newline character to be concatenated with the message.
            asJson: transform message to a json?
        """
        if deviceName in self.devices:
            self.devices[deviceName].write(message=message, end=end, asJson=asJson)

    def write(self, message: dict = {}, end: str = "\n", asJson: bool = False):
        """Writes a message given a dict with the device name and the message.

        Keys that do not match a registered device are skipped.
        Args:
            message: dict mapping device name -> message to be written.
            end: newline character to be concatenated with the message.
            asJson: transform message to a json?
        """
        for deviceName, data in message.items():
            if deviceName in self.devices:
                self.devices[deviceName].write(message=data, end=end, asJson=asJson)

    def on(self, eventName: str = "", callback=None, *args, **kwargs):
        """A wrapper function for use on/emit functions. It registers the same
        callback for the given event on every serial device of this instance.
        Args:
            eventName: name of the event to be signaled.
            callback: callback function
            *args: extra positional arguments forwarded to each device's ``on``.
            **kwargs: extra keyword arguments forwarded to each device's ``on``.
        """
        for device in self.devices.values():
            device.on(eventName, callback, *args, **kwargs)

hasDevices(self)

Checks whether there is at least one serial device in the list.

Source code in remio/serialio.py
def hasDevices(self):
    """Tells whether at least one serial device is currently registered."""
    device_count = len(self.devices)
    return device_count > 0

on(self, eventName='', callback=None, *args, **kwargs)

A wrapper around the on/emit functions. It registers the given event callback on every serial device listed in the current instance.

Parameters:

Name Type Description Default
eventName str

name of the event to be signaled.

''
callback

callback function

None
Source code in remio/serialio.py
def on(self, eventName: str = "", callback=None, *args, **kwargs):
    """A wrapper function for use on/emit functions. It registers the same
    callback for the given event on every serial device of this instance.
    Args:
        eventName: name of the event to be signaled.
        callback: callback function
        *args: extra positional arguments forwarded to each device's ``on``.
        **kwargs: extra keyword arguments forwarded to each device's ``on``.
    """
    for device in self.devices.values():
        device.on(eventName, callback, *args, **kwargs)

pauseAll(self)

Pauses all serial devices.

Source code in remio/serialio.py
def pauseAll(self):
    """Pauses every registered serial device."""
    registry = self.devices
    for serial_device in registry.values():
        serial_device.pause()

pauseOnly(self, deviceName='default')

Pauses a specific serial device.

Parameters:

Name Type Description Default
deviceName str

serial device name.

'default'
Source code in remio/serialio.py
def pauseOnly(self, deviceName: str = "default"):
    """Pauses the serial device registered under the given name.

    Names that are not registered are ignored.
    Args:
        deviceName: serial device name.
    """
    if deviceName not in self.devices:
        return
    self.devices[deviceName].pause()

ports() staticmethod

Returns a list of the available serial port devices.

Source code in remio/serialio.py
@staticmethod
def ports():
    """Lists the device identifier of every port reported by ``list_ports.comports()``."""
    found = []
    for port_info in list_ports.comports():
        found.append(port_info.device)
    return found

resumeAll(self)

Resumes all serial devices.

Source code in remio/serialio.py
def resumeAll(self):
    """Resumes every registered serial device."""
    registry = self.devices
    for serial_device in registry.values():
        serial_device.resume()

resumeOnly(self, deviceName='default')

Resumes a specific serial device.

Parameters:

Name Type Description Default
deviceName str

serial device name.

'default'
Source code in remio/serialio.py
def resumeOnly(self, deviceName: str = "default"):
    """Resumes the serial device registered under the given name.

    Names that are not registered are ignored.
    Args:
        deviceName: serial device name.
    """
    if deviceName not in self.devices:
        return
    self.devices[deviceName].resume()

setDevices(self, devices=[], *args, **kwargs)

Updates the current serial devices list.

Parameters:

Name Type Description Default
devices list

serial devices list.

[]
Source code in remio/serialio.py
def setDevices(self, devices: list = [], *args, **kwargs):
    """Updates the current serial devices list.

    Running devices are stopped first. The new devices are stored in a dict
    keyed by port name, because the other methods iterate
    ``self.devices.values()`` and look devices up by name; storing a plain
    list here would break them.
    Args:
        devices: serial devices list (port names); duplicates are ignored.
        *args: extra positional arguments forwarded to ``Serial``.
        **kwargs: extra keyword arguments forwarded to ``Serial``.
    """
    if self.hasDevices():
        self.stopAll()
    # dict.fromkeys removes duplicates while keeping first-seen order.
    self.devices = {
        name: Serial(*args, port=name, **kwargs)
        for name in dict.fromkeys(devices)
    }

startAll(self)

Starts all serial devices

Source code in remio/serialio.py
def startAll(self):
    """Starts every registered serial device."""
    registry = self.devices
    for serial_device in registry.values():
        serial_device.start()

startOnly(self, name='default')

Starts only one specific serial device.

Parameters:

Name Type Description Default
name str

device name.

'default'
Source code in remio/serialio.py
def startOnly(self, name: str = "default"):
    """Starts the serial device registered under the given name.

    Names that are not registered are ignored.
    Args:
        name: device name.
    """
    if name not in self.devices:
        return
    target = self.devices[name]
    target.start()

stopAll(self)

Stops all serial devices running.

Source code in remio/serialio.py
def stopAll(self):
    """Stops every registered serial device."""
    registry = self.devices
    for serial_device in registry.values():
        serial_device.stop()

stopOnly(self, name='default')

Stops only one specific serial device.

Parameters:

Name Type Description Default
name str

device name.

'default'
Source code in remio/serialio.py
def stopOnly(self, name: str = "default"):
    """Stops only one specific serial device.

    Names that are not registered are ignored.
    Args:
        name: device name.
    """
    if name in self.devices:
        # The registry attribute is ``devices``; there is no ``self.device``.
        self.devices[name].stop()

toJson(self, data='')

Converts a string to a json.

Parameters:

Name Type Description Default
data str

a string message.

''
Source code in remio/serialio.py
def toJson(self, data: str = ""):
    """Parses a JSON string into the corresponding Python object.

    When parsing fails the error is printed and the input is handed back
    unchanged.
    Args:
        data: a string message.
    """
    try:
        parsed = json.loads(data)
    except Exception as e:
        print(f"-> Serials :: {e}")
        return data
    else:
        return parsed

write(self, message={}, end='\n', asJson=False)

Writes a message given a dict with the device name and the message.

Parameters:

Name Type Description Default
message dict

message to be written.

{}
end str

newline character to be concatenated with the message.

'\n'
asJson bool

transform message to a json?

False
Source code in remio/serialio.py
def write(self, message: dict = {}, end: str = "\n", asJson: bool = False):
    """Writes a message given a dict with the device name and the message.

    Keys that do not match a registered device are skipped.
    Args:
        message: dict mapping device name -> message to be written.
        end: newline character to be concatenated with the message.
        asJson: transform message to a json?
    """
    for deviceName, data in message.items():
        if deviceName in self.devices:
            # The registry attribute is ``devices``; there is no ``self.device``.
            self.devices[deviceName].write(message=data, end=end, asJson=asJson)

writeTo(self, deviceName='default', message='', end='\n', asJson=False)

Writes a message to a specific serial device.

Parameters:

Name Type Description Default
deviceName str

name of the serial device.

'default'
message str

message to be written.

''
end str

newline character to be concatenated with the message.

'\n'
asJson bool

transform message to a json?

False
Source code in remio/serialio.py
def writeTo(
    self,
    deviceName: str = "default",
    message: str = "",
    end: str = "\n",
    asJson: bool = False,
):
    """Writes a message to a specific serial device.

    Unknown device names are silently ignored.
    Args:
        deviceName: name of the serial device.
        message: message to be written.
        end: newline character to be concatenated with the message.
        asJson: transform message to a json?
    """
    if deviceName in self.devices:
        # The registry attribute is ``devices``; there is no ``self.device``.
        self.devices[deviceName].write(message=message, end=end, asJson=asJson)