Skip to content

Single API

Class Serial

A custom serial class that is threaded and event-emitter based.

Parameters:

Name Type Description Default
name str

device name.

'default'
reconnectDelay Union[int, float]

wait time between reconnection attempts.

1
maxAttempts int

max read attempts.

10
portsRefreshTime int

time between checks for serial device changes (secs).

1
emitterIsEnabled bool

enables (True) or disables (False) on/emit events (callback execution).

True
emitAsDict bool

emit events in dict format {'emitter_name': data}?

True

Events

data: it's emitted when new data is available. connection: it's emitted when the connection status is updated. ports: it's emitted when a new device is found or disconnected.

Source code in remio/serialio.py
class Serial(Emitter):
    """A custom serial class, threaded and event-emitter based.

    A background thread (see ``run``) reads lines from the device, retries
    the connection while the port is closed and watches the list of
    available serial ports.
    Args:
        name: device name.
        reconnectDelay: wait time between reconnection attempts (secs).
        maxAttempts: max consecutive failed read attempts before the port
            is closed.
        portsRefreshTime: time between checks for serial device changes (secs).
        emitterIsEnabled: enables/disables on/emit events (callbacks execution).
        emitAsDict: emit events in dict format {'emitter_name': data} ?
    Events:
        data: it's emitted when new data is available.
        connection: it's emitted when the connection status is updated.
        ports: it's emitted when a new device is found or disconnected.
    """

    def __init__(
        self,
        name: str = "default",
        reconnectDelay: Union[int, float] = 1,
        maxAttempts: int = 10,
        portsRefreshTime: int = 1,
        emitterIsEnabled: bool = True,
        emitAsDict: bool = True,
        *args,
        **kwargs,
    ):
        super().__init__(emitterIsEnabled=emitterIsEnabled, *args, **kwargs)
        self.name = name
        # The default port is kept here and removed from kwargs, so the
        # PySerial object below is created without a port; connect() assigns
        # it later.
        self.port = kwargs.pop("port", None)
        self.reconnectDelay = reconnectDelay
        self.maxAttempts = maxAttempts
        self.portsRefreshTime = portsRefreshTime
        self.emitAsDict = emitAsDict
        self.lastConnectionState = False
        self.attempts = 0  # consecutive failed reads
        self.time = 0  # time accumulated since the last ports check
        self.lastDevicesList = []
        self.serial = PySerial(*args, **kwargs)

        self.thread = Thread(target=self.run, name="serial-thread", daemon=True)
        self.running = Event()
        self.pauseEvent = Event()
        # The pause event is set while the loop is allowed to run.
        self.resume()

    def __setitem__(self, key, value):
        """Sets an attribute of the serial object, closing the port first if open."""
        if self.serial.isOpen():
            self.serial.close()
        setattr(self.serial, key, value)

    def isConnected(self):
        """Checks if the serial port is open."""
        return self.serial.isOpen()

    def getPort(self):
        """Returns the current port device."""
        return self.serial.port

    def setPort(self, port: str = None):
        """Updates the port device value and reconnects.
        Args:
            port: new port device; nothing is done when it is None.
        """
        if port is not None:
            self.serial.close()
            self.serial.port = port
            self.connect()

    def restorePort(self):
        """Restores the default serial port."""
        self.setPort(self.port)

    def resume(self):
        """Resumes the read loop."""
        self.pauseEvent.set()

    def pause(self):
        """Pauses the read loop."""
        self.pauseEvent.clear()

    def setPause(self, value: bool = True):
        """Updates the pause/resume state.
        Args:
            value: True pauses the read loop, False resumes it.
        """
        if value:
            self.pause()
        else:
            self.resume()

    def needAPause(self):
        """Blocks the caller while the read loop is paused."""
        self.pauseEvent.wait()

    def hasDevice(self):
        """Checks if a default serial device is set."""
        return self.port is not None

    def isOpen(self):
        """Checks if serial port device is open."""
        return self.serial.isOpen()

    def attemptsLimitReached(self):
        """Checks if read attempts have reached their limit."""
        return self.attempts >= self.maxAttempts

    def start(self):
        """Starts read loop."""
        self.running.set()
        self.thread.start()

    @staticmethod
    def ports():
        """Returns a list with the available serial port devices."""
        return [port.device for port in list_ports.comports()]

    def connect(self):
        """Will try to connect with the specified serial device.

        Errors raised while closing or opening the port are printed, not raised.
        """
        try:
            self.lastConnectionState = self.serial.isOpen()

            if self.serial.isOpen():
                self.serial.close()

            # Fall back to the default device when no port is assigned.
            if self.serial.port is None and self.port is not None:
                self.serial.port = self.port

            self.serial.open()

        except Exception as e:
            print(f"-> Serial - {self.name} :: {e}")

    def dictToJson(self, message: dict = {}) -> str:
        """Converts a dictionary to a json str.

        If the conversion fails the error is printed and the message is
        returned unchanged. The default dict is never mutated here.
        """
        try:
            return json.dumps(message)
        except Exception as e:
            print(f"-> Serial - {self.name} :: {e}")
        return message

    def write(
        self, message: Union[str, dict] = "", end: str = "\n", asJson: bool = False
    ):
        """Writes a message to the serial device.

        Nothing is sent when the port is closed or the message is empty.
        Errors are printed, not raised.
        Args:
            message: string to be sent.
            end: newline character to be concatenated with the message.
            asJson: convert to JSON?
        """
        if self.serial.isOpen():
            try:
                if len(message) > 0:
                    if asJson:
                        message = self.dictToJson(message)
                    message += end
                    message = message.encode()
                    self.serial.write(message)
            except Exception as e:
                print(f"-> Serial - {self.name} :: {e}")

    def readData(self):
        """Will try to read incoming data.

        Reads one line, decodes it and strips trailing whitespace. Non-empty
        data is emitted on the "data" event. Each failed read is counted and,
        once the limit is reached, the port is closed.
        Returns:
            The emitted data (a dict when emitAsDict is set), or None when
            nothing was read or the read failed.
        """
        try:
            data = self.serial.readline().decode().rstrip()
            # A read that did not raise ends the streak of failures; without
            # this reset, unrelated sporadic errors would add up over time
            # and eventually close a healthy port.
            self.attempts = 0
            if len(data) > 0:
                if self.emitAsDict:
                    data = {self.name: data}
                self.emit("data", data)
                return data
        except Exception as e:
            print(f"-> Serial - {self.name} :: {e}")
            if not self.attemptsLimitReached():
                self.attempts += 1
            else:
                self.attempts = 0
                try:
                    self.serial.close()
                except Exception as closeError:
                    print(f"-> Serial - {self.name} :: {closeError}")
        return None

    def checkSerialPorts(self, dt: Union[int, float]):
        """Monitors if there are changes in the serial devices.

        Emits the "ports" event when the list of devices has changed.
        Args:
            dt: elapsed time to add to the refresh timer.
        """
        if self.portsRefreshTime > 0:
            self.time += dt
            if self.time >= self.portsRefreshTime:
                actualDevicesList = self.ports()
                if actualDevicesList != self.lastDevicesList:
                    self.lastDevicesList = actualDevicesList
                    self.emit("ports", actualDevicesList)
                self.time = 0

    def checkConnectionStatus(self):
        """Checks if the connection status changes.

        Emits the "connection" event when the open state differs from the
        last recorded one.
        """
        # Read the state once: the port can be opened or closed from another
        # thread, and the emitted value must match the stored one.
        isOpenNow = self.serial.isOpen()
        if self.lastConnectionState != isOpenNow:
            status = isOpenNow
            if self.emitAsDict:
                status = {self.name: status}
            self.emit("connection", status)
            self.lastConnectionState = isOpenNow

    def reconnect(self):
        """Tries to reconnect with the serial device."""
        if self.hasDevice():
            self.connect()
            time.sleep(self.reconnectDelay)

    def run(self):
        """Here the run loop is executed."""
        while self.running.is_set():
            t0 = time.time()

            self.checkConnectionStatus()

            if self.serial.isOpen():
                self.readData()
            else:
                self.reconnect()
                if not self.hasDevice():
                    # reconnect() does nothing (and does not sleep) without a
                    # default device; wait here so the loop does not spin.
                    time.sleep(self.reconnectDelay)

            t1 = time.time()
            dt = t1 - t0

            self.checkSerialPorts(dt)
            self.needAPause()

    def disconnect(self, force: bool = False):
        """Clears the current serial port device.
        Args:
            force: force disconnection? prevents reconnection.
        """
        if self.serial.port is not None:
            self.serial.close()
            self.serial.port = None
            if force:
                self.port = None

    def stop(self):
        """Stops the read loop and closes the connection with the serial device."""
        wasRunning = self.running.is_set()
        # Clear the flag first so the loop cannot begin another iteration
        # (and reopen the port through reconnect) after the port is closed.
        self.running.clear()
        self.resume()
        self.disconnect()
        if wasRunning:
            self.thread.join()
            # The loop may have been in the middle of a reconnect when the
            # flag was cleared; make sure the port ends up closed.
            self.disconnect()

attemptsLimitReached(self)

Checks if read attempts have reached their limit.

Source code in remio/serialio.py
def attemptsLimitReached(self):
    """Tell whether the read-attempt counter has hit its ceiling.

    Returns:
        True when ``self.attempts`` is at or above ``self.maxAttempts``.
    """
    used, allowed = self.attempts, self.maxAttempts
    return used >= allowed

checkConnectionStatus(self)

Checks if the connection status changes.

Source code in remio/serialio.py
def checkConnectionStatus(self):
    """Checks if the connection status changes.

    Emits the "connection" event when the open state of the port differs
    from ``self.lastConnectionState``, then records the new state. The
    payload is ``{self.name: state}`` when ``self.emitAsDict`` is set,
    otherwise the bare boolean.
    """
    # Read the state once: the port can be opened or closed from another
    # thread, and the emitted value must match the one that is stored.
    isOpenNow = self.serial.isOpen()
    if isOpenNow == self.lastConnectionState:
        return
    status = {self.name: isOpenNow} if self.emitAsDict else isOpenNow
    self.emit("connection", status)
    self.lastConnectionState = isOpenNow

checkSerialPorts(self, dt)

Monitors if there are changes in the serial devices.

Source code in remio/serialio.py
def checkSerialPorts(self, dt: Union[int, float]):
    """Watch the list of serial devices and report changes.

    Adds ``dt`` to the internal timer; once the timer reaches
    ``self.portsRefreshTime`` the device list is fetched, the "ports" event
    is emitted if the list changed, and the timer is reset.
    Args:
        dt: elapsed time to add to the refresh timer.
    """
    # A non-positive refresh time turns the monitoring off.
    if not self.portsRefreshTime > 0:
        return

    self.time += dt
    if not self.time >= self.portsRefreshTime:
        return

    currentPorts = self.ports()
    if currentPorts != self.lastDevicesList:
        self.lastDevicesList = currentPorts
        self.emit("ports", currentPorts)
    self.time = 0

connect(self)

Will try to connect with the specified serial device.

Source code in remio/serialio.py
def connect(self):
    """Open the serial port, using the default device if none is assigned.

    The open state seen before the attempt is stored in
    ``self.lastConnectionState``. An already open port is closed first.
    Any error raised along the way is printed, not raised.
    """
    try:
        device = self.serial
        self.lastConnectionState = device.isOpen()

        if device.isOpen():
            device.close()

        # Fall back to the default port only when none is assigned.
        if device.port is None:
            if self.port is not None:
                device.port = self.port

        device.open()

    except Exception as error:
        print(f"-> Serial - {self.name} :: {error}")

dictToJson(self, message={})

Converts a dictionary to a json str.

Source code in remio/serialio.py
def dictToJson(self, message: dict = {}) -> str:
    """Serialize a dictionary into a JSON string.

    If serialization fails, the error is printed and ``message`` is returned
    unchanged. The default dict is never mutated here.
    """
    try:
        encoded = json.dumps(message)
    except Exception as error:
        print(f"-> Serial - {self.name} :: {error}")
        return message
    else:
        return encoded

disconnect(self, force=False)

Clears the current serial port device.

Parameters:

Name Type Description Default
force bool

force disconnection? prevents reconnection.

False
Source code in remio/serialio.py
def disconnect(self, force: bool = False):
    """Close the port and clear the device assigned to it.

    Nothing happens when the serial object has no port assigned.
    Args:
        force: also forget the default port, which prevents reconnection.
    """
    if self.serial.port is None:
        return

    self.serial.close()
    self.serial.port = None
    if force:
        self.port = None

getPort(self)

Returns the current port device.

Source code in remio/serialio.py
def getPort(self):
    """Return the port currently assigned to the serial object."""
    device = self.serial
    return device.port

hasDevice(self)

Checks if a serial device is set.

Source code in remio/serialio.py
def hasDevice(self):
    """Tell whether a default serial device is set (``self.port`` is not None)."""
    defaultPort = self.port
    return defaultPort is not None

isConnected(self)

Checks if the serial port is open.

Source code in remio/serialio.py
def isConnected(self):
    """Report the open state of the underlying serial port."""
    connected = self.serial.isOpen()
    return connected

isOpen(self)

Checks if serial port device is open.

Source code in remio/serialio.py
def isOpen(self):
    """Return whatever the serial object's own ``isOpen()`` reports."""
    device = self.serial
    return device.isOpen()

needAPause(self)

Blocks the read loop while it is paused and returns once it is resumed.

Source code in remio/serialio.py
def needAPause(self):
    """Block until the pause event is set, i.e. while the loop is paused."""
    gate = self.pauseEvent
    gate.wait()

pause(self)

Pauses the read loop.

Source code in remio/serialio.py
def pause(self):
    """Pause the read loop by clearing the pause event."""
    gate = self.pauseEvent
    gate.clear()

ports() staticmethod

Returns a list of the available serial port devices.

Source code in remio/serialio.py
@staticmethod
def ports():
    """List the device names of the serial ports reported by ``list_ports``."""
    detected = list_ports.comports()
    return [entry.device for entry in detected]

readData(self)

Will try to read incoming data.

Source code in remio/serialio.py
def readData(self):
    """Will try to read incoming data.

    Reads one line from the port, decodes it and strips trailing whitespace.
    Non-empty data is emitted on the "data" event, wrapped as
    ``{self.name: data}`` when ``self.emitAsDict`` is set. Each failed read
    is printed and counted; once the limit is reached the counter is reset
    and the port is closed.
    Returns:
        The emitted data, or None when nothing was read or the read failed.
    """
    try:
        data = self.serial.readline().decode().rstrip()
        # A read that did not raise ends the streak of failures; without
        # this reset, unrelated sporadic errors would add up over time and
        # eventually close a healthy port.
        self.attempts = 0
        if len(data) > 0:
            if self.emitAsDict:
                data = {self.name: data}
            self.emit("data", data)
            return data
    except Exception as e:
        print(f"-> Serial - {self.name} :: {e}")
        if not self.attemptsLimitReached():
            self.attempts += 1
        else:
            self.attempts = 0
            try:
                self.serial.close()
            except Exception as closeError:
                print(f"-> Serial - {self.name} :: {closeError}")
    return None

reconnect(self)

Tries to reconnect with the serial device.

Source code in remio/serialio.py
def reconnect(self):
    """Attempt a new connection, then wait ``self.reconnectDelay`` seconds.

    Does nothing (and does not wait) when no default device is set.
    """
    if not self.hasDevice():
        return
    self.connect()
    time.sleep(self.reconnectDelay)

restorePort(self)

Restores the default serial port.

Source code in remio/serialio.py
def restorePort(self):
    """Switch back to the default port by passing ``self.port`` to ``setPort``."""
    applyPort = self.setPort
    applyPort(self.port)

resume(self)

Resumes the read loop.

Source code in remio/serialio.py
def resume(self):
    """Let the read loop run again by setting the pause event."""
    gate = self.pauseEvent
    gate.set()

run(self)

Here the run loop is executed.

Source code in remio/serialio.py
def run(self):
    """Here the run loop is executed.

    While the running flag is set, each iteration reports connection
    changes, reads from the port when it is open (or tries to reconnect when
    it is not), feeds the elapsed time to the ports monitor and finally
    blocks while the loop is paused.
    """
    while self.running.is_set():
        t0 = time.time()

        self.checkConnectionStatus()

        if self.serial.isOpen():
            self.readData()
        else:
            self.reconnect()
            if not self.hasDevice():
                # reconnect() does nothing (and does not sleep) without a
                # default device; wait here so the loop does not spin at
                # full speed while there is nothing to connect to.
                time.sleep(self.reconnectDelay)

        t1 = time.time()
        dt = t1 - t0

        self.checkSerialPorts(dt)
        self.needAPause()

setPause(self, value=True)

Updates the pause/resume state.

Source code in remio/serialio.py
def setPause(self, value: bool = True):
    """Pause or resume the read loop.
    Args:
        value: truthy pauses the loop, falsy resumes it.
    """
    action = self.pause if value else self.resume
    action()

setPort(self, port=None)

Updates the port device value.

Source code in remio/serialio.py
def setPort(self, port: str = None):
    """Assign a new port to the serial object and connect to it.

    The port is closed before the new value is assigned. Nothing happens
    when ``port`` is None.
    Args:
        port: the new port device.
    """
    if port is None:
        return

    device = self.serial
    device.close()
    device.port = port
    self.connect()

start(self)

Starts read loop.

Source code in remio/serialio.py
def start(self):
    """Set the running flag and launch the read-loop thread."""
    runFlag = self.running
    runFlag.set()
    worker = self.thread
    worker.start()

stop(self)

Stops the read loop and closes the connection with the serial device.

Source code in remio/serialio.py
def stop(self):
    """Stops the read loop and closes the connection with the serial device.

    The running flag is cleared before the port is closed, so the loop
    cannot start another iteration and reopen the port through a reconnect.
    The loop is also resumed, so a paused thread can reach the exit check.
    """
    wasRunning = self.running.is_set()
    self.running.clear()
    self.resume()
    self.disconnect()
    if wasRunning:
        self.thread.join()
        # The loop may have been in the middle of a reconnect when the flag
        # was cleared; make sure the port ends up closed.
        self.disconnect()

write(self, message='', end='\n', asJson=False)

Writes a message to the serial device.

Parameters:

Name Type Description Default
message Union[str, dict]

string to be sent.

''
end str

newline character to be concatenated with the message.

'\n'
asJson bool

convert to JSON?

False
Source code in remio/serialio.py
def write(
    self, message: Union[str, dict] = "", end: str = "\n", asJson: bool = False
):
    """Send a message through the serial port.

    Nothing is sent when the port is closed or the message is empty. Errors
    raised while preparing or sending the message are printed, not raised.
    Args:
        message: string to be sent.
        end: newline character to be concatenated with the message.
        asJson: convert to JSON?
    """
    if not self.serial.isOpen():
        return

    try:
        if not len(message) > 0:
            return
        payload = self.dictToJson(message) if asJson else message
        payload += end
        self.serial.write(payload.encode())
    except Exception as error:
        print(f"-> Serial - {self.name} :: {error}")