API⚓
Class Mockup
⚓
A class for managing multiple cameras, multiple serial devices and socketio communications.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cameraSettings |
dict |
settings for camera devices. |
{} |
serialSettings |
dict |
settings for serial devices. |
{} |
serverSettings |
dict |
settings to connect with a socketio server. |
{} |
streamSettings |
dict |
settings for stream devices. |
{} |
Source code in remio/mockup.py
class Mockup:
    """Manage multiple cameras, multiple serial devices and socketio communications.

    Building an instance creates the camera, serial, socket and streamer
    helpers and installs ``softStop`` as the handler for SIGINT and SIGTERM.

    Args:
        cameraSettings: settings for camera devices. ``None`` means ``{}``.
        serialSettings: settings for serial devices. ``None`` means ``{}``.
        serverSettings: settings to connect with a socketio server, expanded
            as keyword arguments. ``None`` means ``{}``.
        streamSettings: settings for stream devices, expanded as keyword
            arguments. ``None`` means ``{}``.
    """

    def __init__(
        self,
        cameraSettings: dict = None,
        serialSettings: dict = None,
        serverSettings: dict = None,
        streamSettings: dict = None,
        *args,
        **kwargs
    ):
        # Use a fresh dict per call instead of a shared mutable `{}` default,
        # which would be the same object for every instance.
        cameraSettings = {} if cameraSettings is None else cameraSettings
        serialSettings = {} if serialSettings is None else serialSettings
        serverSettings = {} if serverSettings is None else serverSettings
        streamSettings = {} if streamSettings is None else streamSettings

        self.camera = Cameras(devices=cameraSettings)
        self.serial = Serials(devices=serialSettings)
        self.socket = CustomSocketIO(**serverSettings)
        # The streamer is given the socket and the cameras' read method.
        self.streamer = SocketStreamer(
            socket=self.socket, reader=self.camera.read, **streamSettings
        )

        # Released by stop(); start(wait=True) blocks on it.
        # Keep this the last attribute assigned: __del__ relies on it.
        self.waitEvent = Event()
        self.waitEvent.clear()

        signal.signal(signal.SIGINT, self.softStop)
        signal.signal(signal.SIGTERM, self.softStop)

    def __del__(self):
        # __init__ may have raised part-way through; `waitEvent` is the last
        # attribute it assigns, so its presence means every component exists.
        if hasattr(self, "waitEvent"):
            self.stop()

    def start(
        self,
        camera: bool = False,
        serial: bool = False,
        socket: bool = False,
        streamer: bool = False,
        wait: bool = True,
    ):
        """Start the selected components.

        Args:
            camera: start camera?
            serial: start serial?
            socket: start socket?
            streamer: start stream?
            wait: block the calling thread until ``stop`` is called?
        """
        if camera:
            self.camera.startAll()

        if serial:
            self.serial.startAll()

        if socket:
            self.socket.start()

        if streamer:
            self.streamer.start()

        if wait:
            self.waitEvent.wait()

    def stop(self):
        """Stop all tasks of socketio, serial, camera and streamer threads/processes.

        Also sets the wait event, releasing any caller blocked in ``start``.
        """
        self.camera.stopAll()
        self.serial.stopAll()
        self.streamer.stop()
        self.socket.stop()
        self.waitEvent.set()

    def softStop(self, sig, frame):
        """Stop mockup execution when a system signal is emitted, e.g. CTRL + C.

        Args:
            sig: signal number passed by the signal machinery (unused).
            frame: current stack frame passed by the signal machinery (unused).
        """
        print(" Mockup:: STOPING...")
        self.stop()
        sys.exit(0)
softStop(self, sig, frame)
⚓
start(self, camera=False, serial=False, socket=False, streamer=False, wait=True)
⚓
It starts different programs.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
camera |
bool |
start camera? |
False |
serial |
bool |
start serial? |
False |
socket |
bool |
start socket? |
False |
streamer |
bool |
start stream? |
False |
wait |
bool |
block the thread that calls this function? |
True |
Source code in remio/mockup.py
def start(
    self,
    camera: bool = False,
    serial: bool = False,
    socket: bool = False,
    streamer: bool = False,
    wait: bool = True,
):
    """Launch whichever components were requested, then optionally block.

    Args:
        camera: when true, call ``self.camera.startAll()``.
        serial: when true, call ``self.serial.startAll()``.
        socket: when true, call ``self.socket.start()``.
        streamer: when true, call ``self.streamer.start()``.
        wait: when true, block the calling thread on ``self.waitEvent``.
    """
    # (flag, component attribute, launcher method), in launch order.
    # Attributes are resolved lazily so a disabled component is never touched.
    launch_plan = (
        (camera, "camera", "startAll"),
        (serial, "serial", "startAll"),
        (socket, "socket", "start"),
        (streamer, "streamer", "start"),
    )
    for enabled, component, launcher in launch_plan:
        if enabled:
            getattr(getattr(self, component), launcher)()

    if not wait:
        return
    self.waitEvent.wait()
stop(self)
⚓
Stops all tasks of socketio, serial, camera and streamer threads/processes.