Skip to content

Single API

Class Camera

Camera device object, based on threading.

Parameters:

Name Type Description Default
name str

device name

'default'
src Union[int, str]

device source

0
reconnectDelay Union[int, float]

wait time, in seconds, before trying to reopen the camera device

5
fps Optional[int]

frames per second; if it's None, no delay is applied between reads

None
verbose bool

display info messages?

False
size Union[list, tuple]

tuple or list with a dimension of the image

None
flipX bool

flip image horizontally?

False
flipY bool

flip image vertically?

False
emitterIsEnabled bool

enable on/emit events (callback execution)

False
backgroundIsEnabled bool

if some error is produced with the camera it will display a black frame with a message.

False
text str

background message

'Device not available'
font None

cv2 font

0
fontScale Union[int, float]

cv2 font scale

0.5
fontColor Union[tuple, list]

bgr tuple for set the color of the text

[255, 255, 255]
thickness int

font thickness

1
queueModeEnabled bool

enable queue mode?

False
queueMaxSize int

queue maxsize

96
processing Callable

a function that performs some image processing

None
processingParams dict

params for processing functions

{}

Events

frame-ready: emits a new frame when it's available.

frame-available: emits a notification when a new frame is available.

Examples:

camera = Camera(src=0, emitterIsEnabled=True)
camera.on('frame-ready', lambda frame: print('frame is ready:', type(frame)))

Source code in remio/camio.py
class Camera(Emitter):
    """Camera device object, based on threading.

    A daemon thread (see ``run``) repeatedly reads frames from a
    ``cv2.VideoCapture`` opened on ``src``. Every frame is resized/flipped
    (``preprocess``), passed through the optional ``processing`` callable and
    stored as the current frame. Depending on the configuration the frame is
    also emitted as an event and/or pushed into an internal queue.

    Args:
        name: device name
        src: device source
        reconnectDelay: seconds to sleep after each attempt to reopen the device
        fps: frames per second; if it's None no delay is applied between reads
        verbose: display info messages?
        size: tuple or list with the (width, height) of the image
        flipX: flip image horizontally?
        flipY: flip image vertically?
        emitterIsEnabled: enable on/emit events (callbacks execution)
        backgroundIsEnabled: if some error is produced with the camera it will display
                          a black frame with a message.
        text: background message
        font: cv2 font
        fontScale: cv2 font scale
        fontColor: bgr tuple for set the color of the text
        thickness: font thickness
        queueModeEnabled: enable queue mode?
        queueMaxSize: queue maxsize
        processing: a function that performs some image processing
        processingParams: params (kwargs) for the processing function
        encoderIsEnabled: flag stored on the instance as-is
        encoderParams: keyword arguments used to build the ``MJPEGEncoder``
        *args: forwarded to the ``Emitter`` constructor
        **kwargs: forwarded to the ``Emitter`` constructor

    Events:
        frame-ready: emits ``{name: frame}`` when a new frame is available.
        frame-available: emits the device name when a new frame is available.

    Example:
        camera = Camera(src=0, emitterIsEnabled=True)
        camera.on('frame-ready', lambda frame: print('frame is ready:', type(frame)))
    """

    def __init__(
        self,
        name: str = "default",
        src: Union[int, str] = 0,
        reconnectDelay: Union[int, float] = 5,
        fps: Union[int, None] = None,
        verbose: bool = False,
        size: Union[list, tuple, None] = None,
        flipX: bool = False,
        flipY: bool = False,
        emitterIsEnabled: bool = False,
        backgroundIsEnabled: bool = False,
        text: str = "Device not available",
        font: int = cv2.FONT_HERSHEY_SIMPLEX,
        fontScale: Union[int, float] = 0.5,
        fontColor: Union[tuple, list] = [255, 255, 255],
        thickness: int = 1,
        queueModeEnabled: bool = False,
        queueMaxSize: int = 96,
        processing: Callable = None,
        processingParams: Union[dict, None] = None,
        encoderIsEnabled: bool = True,
        encoderParams: Union[dict, None] = None,
        *args,
        **kwargs,
    ):
        super().__init__(*args, emitterIsEnabled=emitterIsEnabled, **kwargs)
        self.src = src
        self.name = name
        self.frame = None
        self.fps = fps
        self.verbose = verbose
        self.size = size
        self.flipX = flipX
        self.flipY = flipY
        self.backgroundIsEnabled = backgroundIsEnabled
        self.background = None
        self.processing = processing
        # A fresh dict per instance: a shared ``{}`` default would leak params
        # between cameras as soon as one of them mutates it.
        self.processingParams = {} if processingParams is None else processingParams
        self.queueModeEnabled = queueModeEnabled
        self.queue = Queue(maxsize=queueMaxSize)

        self.frame64 = None
        self.encoderIsEnabled = encoderIsEnabled
        self.encoder = MJPEGEncoder(**(encoderParams or {}))

        self.isNewFrame = True
        if self.fps is not None:
            self.delay = 1 / self.fps
        else:
            self.delay = 0.1

        self.defaultDelay = self.delay
        self.reconnectDelay = reconnectDelay
        # (height, width, channels) used when no explicit size is given.
        self.defaultSize = [720, 1280, 3]

        self.emitterIsEnabled = emitterIsEnabled
        self.device = None
        self.thread = Thread(target=self.run, name="camera-thread", daemon=True)

        self.running = Event()
        self.pauseEvent = Event()
        self.readEvent = Event()
        self.readLock = Lock()

        # The pause event starts set so the read loop is not blocked.
        self.resume()

        if self.backgroundIsEnabled:
            self.enableBackground(
                size=self.size,
                text=text,
                font=font,
                fontColor=fontColor,
                fontScale=fontScale,
                thickness=thickness,
            )

    def __del__(self):
        # __init__ may have raised before the thread primitives were created;
        # readLock is the last one assigned, so its presence implies the rest.
        if getattr(self, "readLock", None) is not None:
            self.stop()

    def _releaseReadLock(self):
        """Releases the read lock, tolerating a lock that is already free."""
        try:
            self.readLock.release()
        except RuntimeError:
            # update() and stop() may both try to release; whoever comes
            # second finds the lock already unlocked.
            pass

    def createBackground(
        self,
        text: str = "Device not available",
        font: int = cv2.FONT_HERSHEY_SIMPLEX,
        fontScale: Union[int, float] = 0.5,
        fontColor: Union[tuple, list] = [255, 255, 255],
        thickness: int = 1,
        size: Union[tuple, list, None] = None,
    ):
        """It creates a custom background as numpy array (image) with a text message.

        Args:
            text: message to be displayed on the center of the background.
            font: cv2 font family
            fontScale: scale of the font
            fontColor: color of the font
            thickness: thickness of the font
            size: tuple, list with the size of the background. Two values are
                        read as (width, height); three values are used as the
                        array shape. If it's None, ``self.defaultSize`` is used.

        Returns:
            A black ``uint8`` image with the message drawn on it.
        """
        if size is None:
            size = self.defaultSize

        if len(size) == 2:
            # (width, height) -> (height, width, 3). Building a new list keeps
            # tuples working and leaves the caller's sequence untouched.
            size = [*size[::-1], 3]

        background = np.zeros(size, dtype=np.uint8)
        textsize = cv2.getTextSize(text, font, fontScale, thickness)[0]
        h, w, _ = size
        tw, th = textsize
        # putText expects the bottom-left corner of the text, hence (h + th).
        origin = (int((w - tw) / 2), int((h + th) / 2))
        background = cv2.putText(
            background, text, origin, font, fontScale, fontColor, thickness
        )
        return background

    def enableBackground(
        self,
        text: str = "Device not available.",
        font: int = cv2.FONT_HERSHEY_COMPLEX,
        fontScale: Union[int, float] = 2,
        fontColor: Union[tuple, list] = [255, 255, 255],
        thickness: int = 3,
        size: Union[tuple, list, None] = None,
    ):
        """It enables background as a frame.

        Args:
            text: message to be displayed on the center of the background.
            font: cv2 font family
            fontScale: scale of the font
            fontColor: color of the font
            thickness: thickness of the font
            size: tuple, list with the size of the background. If it's None,
                        size will be set automatically.
        """
        self.backgroundIsEnabled = True
        self.background = self.createBackground(
            text=text,
            font=font,
            fontScale=fontScale,
            fontColor=fontColor,
            thickness=thickness,
            size=size,
        )

    def disableBackground(self):
        """Disables the background."""
        self.backgroundIsEnabled = False
        self.background = None

    def clearFrame(self):
        """Clears the current frame."""
        self.frame = None
        self.frame64 = None

    def start(self):
        """Starts the read loop thread and returns the camera itself."""
        self.thread.start()
        return self

    def getProcessingParams(self):
        """Returns processings params (kwargs)."""
        if self.processingParams is None:
            self.processingParams = {}
        return self.processingParams

    def hasProcessing(self):
        """Checks if a processing function is available."""
        return self.processing is not None

    def isConnected(self) -> bool:
        """Checks if a camera device is available."""
        if self.device is not None:
            return self.device.isOpened()
        return False

    def loadDevice(self):
        """Loads a camera device."""
        self.device = cv2.VideoCapture(self.src)
        if not self.isConnected():
            print(f"-> {self.name} : {self.src} :: could not be connected.")

    def reconnect(self):
        """Tries to reconnect with the camera device."""
        self.readEvent.clear()
        self.loadDevice()
        time.sleep(self.reconnectDelay)
        self.readEvent.set()

    def resume(self):
        """Resumes the read loop."""
        self.pauseEvent.set()

    def pause(self):
        """Pauses the read loop."""
        self.pauseEvent.clear()

    def setPause(self, value: bool = True):
        """Updates the pause/resume state."""
        if value:
            self.pause()
        else:
            self.resume()

    def needAPause(self):
        """Blocks the caller while the camera is paused."""
        self.pauseEvent.wait()

    def preprocess(self, frame: np.ndarray = None) -> np.ndarray:
        """Preprocess the frame (resize, then horizontal/vertical flip).

        Args:
            frame: an image array
        """
        if self.size is not None:
            # Pass dsize as a tuple: some OpenCV builds reject a list here.
            frame = cv2.resize(frame, tuple(self.size[:2]))

        if self.flipX:
            frame = cv2.flip(frame, 1)

        if self.flipY:
            frame = cv2.flip(frame, 0)

        return frame

    def process(self, frame: np.ndarray = None) -> np.ndarray:
        """Executes the processing function, if any, and returns its result."""
        if self.hasProcessing():
            kwargs = self.getProcessingParams()
            frame = self.processing(frame, **kwargs)
        return frame

    def update(self):
        """Tries to read a frame from the camera.

        On a successful read the frame is preprocessed, processed, stored in
        ``self.frame`` and then emitted and/or queued according to the
        configuration. The read lock and read event are always restored, even
        when reading or processing raises.
        """
        self.readEvent.clear()
        self.readLock.acquire()
        try:
            deviceIsAvailable, frame = self.device.read()

            if deviceIsAvailable:
                frame = self.preprocess(frame)
                self.frame = self.process(frame)

                if self.emitterIsEnabled:
                    self.emit("frame-ready", {self.name: self.frame})
                    self.emit("frame-available", self.name)

                if self.queueModeEnabled:
                    self.queue.put(self.frame)
        finally:
            self.readEvent.set()
            self._releaseReadLock()

    def read(self, timeout: Union[float, int, None] = 0) -> Union[None, np.ndarray]:
        """Returns a frame or a background.

        Args:
            timeout: max time in seconds to wait for a queued frame; only used
                        in queue mode.

        Returns:
            With a connected device: the next queued frame (queue mode) or the
            current frame. Otherwise the background, which is None when the
            background is disabled.
        """
        if self.isConnected():
            if self.queueModeEnabled:
                return self.queue.get(timeout=timeout)
            return self.frame
        else:
            return self.background

    def applyDelay(self):
        """If a fps was defined it will wait for respective delay."""
        if self.fps is not None:
            time.sleep(self.delay)

    def run(self):
        """Executes the read loop."""
        self.loadDevice()
        self.running.set()

        while self.running.is_set():
            if self.isConnected():
                self.update()
            else:
                self.reconnect()
            self.applyDelay()
            self.needAPause()

        # Loop finished: leave queue mode, unblock readers and free the device.
        self.queueModeEnabled = False
        self.readEvent.set()
        if self.device is not None:
            self.device.release()

    def getName(self) -> str:
        """Returns the name of the current device."""
        return self.name

    def getFrame(self) -> Union[None, np.ndarray]:
        """Returns the current frame."""
        return self.frame

    def getFrame64(self) -> Union[None, str]:
        """Returns the current frame on base64 format."""
        return self.frame64

    def getBackground(self) -> Union[None, np.ndarray]:
        """Returns the current background, or None when it is disabled."""
        if self.backgroundIsEnabled:
            if self.background is None:
                self.background = self.createBackground(size=self.size)
            return self.background

    def setSpeed(self, fps: Union[int, None] = 10):
        """Updates the frames per second (fps).

        Args:
            fps: frames per sec. If it's None, no delay is applied between reads.
        """
        self.fps = fps
        if self.fps is not None:
            self.defaultDelay = 1 / self.fps
            self.delay = self.defaultDelay

    def setProcessing(self, processing: Callable = None, **kwargs):
        """Updates the processing function and its params (kwargs).

        Args:
            processing: a function for make some image processing
            kwargs: kwargs (params) for the processing function
        """
        self.processing = processing
        self.processingParams = kwargs

    def stop(self):
        """Stops the read loop."""
        # A paused loop would never notice the stop request.
        self.resume()

        self._releaseReadLock()

        if self.running.is_set():
            self.running.clear()
            self.thread.join()

applyDelay(self)

If a fps was defined it will wait for respective delay.

Source code in remio/camio.py
def applyDelay(self):
    """Sleep for ``self.delay`` seconds when an fps is configured; otherwise return at once."""
    if self.fps is None:
        return
    time.sleep(self.delay)

clearFrame(self)

Clears the current frame.

Source code in remio/camio.py
def clearFrame(self):
    """Drop the stored frame and its base64 counterpart."""
    self.frame = self.frame64 = None

createBackground(self, text='Device not available', font=0, fontScale=0.5, fontColor=[255, 255, 255], thickness=1, size=None)

It creates a custom background as numpy array (image) with a text message.

Parameters:

Name Type Description Default
text str

message to be displayed on the center of the background.

'Device not available'
font None

cv2 font family

0
fontScale Union[int, float]

scale of the font

0.5
fontColor Union[tuple, list]

color of the font

[255, 255, 255]
thickness int

thickness of the font

1
size Union[tuple, list]

tuple, list with the size of the background. If it's None, size will be set automatically.

None
Source code in remio/camio.py
def createBackground(
    self,
    text: str = "Device not available",
    font: int = cv2.FONT_HERSHEY_SIMPLEX,
    fontScale: Union[int, float] = 0.5,
    fontColor: Union[tuple, list] = [255, 255, 255],
    thickness: int = 1,
    size: Union[tuple, list, None] = None,
):
    """It creates a custom background as numpy array (image) with a text message.

    Args:
        text: message to be displayed on the center of the background.
        font: cv2 font family
        fontScale: scale of the font
        fontColor: color of the font
        thickness: thickness of the font
        size: tuple, list with the size of the background. Two values are
                    read as (width, height); three values are used as the
                    array shape. If it's None, ``self.defaultSize`` is used.

    Returns:
        A black ``uint8`` image with the message drawn on it.
    """
    if size is None:
        size = self.defaultSize

    if len(size) == 2:
        # (width, height) -> (height, width, 3). Building a new list keeps
        # tuples working and leaves the caller's sequence untouched.
        size = [*size[::-1], 3]

    background = np.zeros(size, dtype=np.uint8)
    textsize = cv2.getTextSize(text, font, fontScale, thickness)[0]
    h, w, _ = size
    tw, th = textsize
    # putText expects the bottom-left corner of the text, hence (h + th).
    origin = (int((w - tw) / 2), int((h + th) / 2))
    background = cv2.putText(
        background, text, origin, font, fontScale, fontColor, thickness
    )
    return background

disableBackground(self)

Disables the background.

Source code in remio/camio.py
def disableBackground(self):
    """Turn the background off and discard the cached background image."""
    self.backgroundIsEnabled, self.background = False, None

enableBackground(self, text='Device not available.', font=3, fontScale=2, fontColor=[255, 255, 255], thickness=3, size=None)

It enables background as a frame.

Parameters:

Name Type Description Default
text str

message to be displayed on the center of the background.

'Device not available.'
font None

cv2 font family

3
fontScale Union[int, float]

scale of the font

2
fontColor Union[tuple, list]

color of the font

[255, 255, 255]
thickness int

thickness of the font

3
size Union[tuple, list]

tuple, list with the size of the background. If it's None, size will be set automatically.

None
Source code in remio/camio.py
def enableBackground(
    self,
    text: str = "Device not available.",
    font: None = cv2.FONT_HERSHEY_COMPLEX,
    fontScale: Union[int, float] = 2,
    fontColor: Union[tuple, list] = [255, 255, 255],
    thickness: int = 3,
    size: Union[tuple, list, None] = None,
):
    """Switch the background on and build its image.

    Args:
        text: message drawn on the background.
        font: cv2 font family
        fontScale: scale of the font
        fontColor: color of the font
        thickness: thickness of the font
        size: tuple or list with the size of the background; None lets
                    ``createBackground`` pick the size.
    """
    # The flag is raised first, before the image is built.
    self.backgroundIsEnabled = True
    options = {
        "text": text,
        "font": font,
        "fontScale": fontScale,
        "fontColor": fontColor,
        "thickness": thickness,
        "size": size,
    }
    self.background = self.createBackground(**options)

getBackground(self)

Returns the current background.

Source code in remio/camio.py
def getBackground(self) -> Union[None, np.ndarray]:
    """Return the background image, building it on first use; None when disabled."""
    if not self.backgroundIsEnabled:
        return None
    if self.background is None:
        # Lazily create a background with the default message.
        self.background = self.createBackground(size=self.size)
    return self.background

getFrame(self)

Returns the current frame.

Source code in remio/camio.py
def getFrame(self) -> Union[None, np.ndarray]:
    """Return the most recently stored frame (None if there is none)."""
    current = self.frame
    return current

getFrame64(self)

Returns the current frame on base64 format.

Source code in remio/camio.py
def getFrame64(self) -> Union[None, str]:
    """Return the stored base64 representation of the frame (None if there is none)."""
    encoded = self.frame64
    return encoded

getName(self)

Returns the name of the current device.

Source code in remio/camio.py
def getName(self) -> str:
    """Return the name given to this device."""
    deviceName = self.name
    return deviceName

getProcessingParams(self)

Returns processings params (kwargs).

Source code in remio/camio.py
def getProcessingParams(self):
    """Return the kwargs for the processing function, replacing None with a new dict."""
    params = self.processingParams
    if params is None:
        params = self.processingParams = {}
    return params

hasProcessing(self)

Checks if a processing function is available.

Source code in remio/camio.py
def hasProcessing(self):
    """Tell whether a processing function has been set."""
    return not (self.processing is None)

isConnected(self)

Checks if a camera device is available.

Source code in remio/camio.py
def isConnected(self) -> bool:
    """Tell whether a device has been loaded and reports itself as opened."""
    if self.device is None:
        return False
    return self.device.isOpened()

loadDevice(self)

Loads a camera device.

Source code in remio/camio.py
def loadDevice(self):
    """Open ``self.src`` as a video capture and report when it cannot be connected."""
    self.device = cv2.VideoCapture(self.src)
    if self.isConnected():
        return
    print(f"-> {self.name} : {self.src} :: could not be connected.")

needAPause(self)

Pauses or resume the read loop.

Source code in remio/camio.py
def needAPause(self):
    """Block until the pause event is set (i.e. while the loop is paused)."""
    gate = self.pauseEvent
    gate.wait()

pause(self)

Pauses the read loop.

Source code in remio/camio.py
def pause(self):
    """Clear the pause event so the read loop blocks at its next checkpoint."""
    gate = self.pauseEvent
    gate.clear()

preprocess(self, frame=None)

Preprocess the frame.

Parameters:

Name Type Description Default
frame ndarray

an image array

None
Source code in remio/camio.py
def preprocess(self, frame: np.ndarray = None) -> np.ndarray:
    """Preprocess the frame (resize, then horizontal/vertical flip).

    Args:
        frame: an image array

    Returns:
        The resized and/or flipped frame; the input itself when no size or
        flip is configured.
    """
    if self.size is not None:
        # Pass dsize as a tuple: some OpenCV builds reject a list here.
        frame = cv2.resize(frame, tuple(self.size[:2]))

    if self.flipX:
        frame = cv2.flip(frame, 1)

    if self.flipY:
        frame = cv2.flip(frame, 0)

    return frame

process(self, frame=None)

Executes the processing function.

Source code in remio/camio.py
def process(self, frame: np.ndarray = None) -> np.ndarray:
    """Run the processing function on the frame when one is set; otherwise return it untouched."""
    if not self.hasProcessing():
        return frame
    params = self.getProcessingParams()
    return self.processing(frame, **params)

read(self, timeout=0)

Returns a frame or a background.

Parameters:

Name Type Description Default
timeout Union[float, int]

max time in seconds to lock operation

0
Source code in remio/camio.py
def read(self, timeout: Union[float, int, None] = 0) -> Union[None, np.ndarray]:
    """Returns a frame or a background.

    Args:
        timeout: max time in seconds to wait for a queued frame; only used
                    in queue mode.

    Returns:
        With a connected device: the next queued frame (queue mode) or the
        current frame. Otherwise the background.
    """
    if self.isConnected():
        # A single conditional fetch; the former ``while`` never looped.
        if self.queueModeEnabled:
            return self.queue.get(timeout=timeout)
        return self.frame
    else:
        return self.background

reconnect(self)

Tries to reconnect with the camera device.

Source code in remio/camio.py
def reconnect(self):
    """Reload the device, then sleep ``reconnectDelay`` seconds.

    The read event is cleared for the duration of the attempt and set again
    once the wait is over.
    """
    readEvent = self.readEvent
    readEvent.clear()
    self.loadDevice()
    time.sleep(self.reconnectDelay)
    readEvent.set()

resume(self)

Resumes the read loop.

Source code in remio/camio.py
def resume(self):
    """Set the pause event so the read loop may proceed."""
    gate = self.pauseEvent
    gate.set()

run(self)

Executes the read loop.

Source code in remio/camio.py
def run(self):
    """Thread body: load the device and keep reading until ``running`` is cleared."""
    self.loadDevice()
    self.running.set()

    while self.running.is_set():
        # Read a frame when the device is open, otherwise try to reopen it.
        step = self.update if self.isConnected() else self.reconnect
        step()
        self.applyDelay()
        self.needAPause()

    # Loop finished: leave queue mode, signal readers and free the device.
    self.queueModeEnabled = False
    self.readEvent.set()
    self.device.release()

setPause(self, value=True)

Updates the pause/resume state.

Source code in remio/camio.py
def setPause(self, value: bool = True):
    """Pause the read loop when ``value`` is truthy, resume it otherwise."""
    action = self.pause if value else self.resume
    action()

setProcessing(self, processing=None, **kwargs)

Updates the processing function and its params (kwargs).

Parameters:

Name Type Description Default
processing Callable

a function that performs some image processing

None
kwargs

kwargs (params) for the processing function

{}
Source code in remio/camio.py
def setProcessing(self, processing: Callable = None, **kwargs):
    """Replace the processing function together with its keyword parameters.

    Args:
        processing: a function that performs some image processing
        kwargs: keyword params stored for the processing function
    """
    self.processing, self.processingParams = processing, kwargs

setSpeed(self, fps=10)

Updates the frames per second (fps).

Parameters:

Name Type Description Default
fps Optional[int]

frames per second. If it's None, no delay is applied between reads.

10
Source code in remio/camio.py
def setSpeed(self, fps: Union[int, None] = 10):
    """Set the target frames per second and derive the per-frame delay from it.

    Args:
        fps: frames per sec. With None the stored delays are left untouched.
    """
    self.fps = fps
    if fps is None:
        return
    period = 1 / fps
    self.defaultDelay = period
    self.delay = period

start(self)

Starts the read loop.

Source code in remio/camio.py
def start(self):
    """Launch the reader thread and hand back the camera for chaining."""
    worker = self.thread
    worker.start()
    return self

stop(self)

Stops the read loop.

Source code in remio/camio.py
def stop(self):
    """Stops the read loop.

    Resumes a paused loop so it can observe the stop request, frees the read
    lock, and, when the loop is running, clears the running flag and joins
    the reader thread.
    """
    self.resume()

    # Release without a prior locked() check: the reader thread may free the
    # lock between the check and the release, which would raise here.
    try:
        self.readLock.release()
    except RuntimeError:
        pass  # the lock was already free

    if self.running.is_set():
        self.running.clear()
        self.thread.join()

update(self)

Tries to read a frame from the camera.

Source code in remio/camio.py
def update(self):
    """Tries to read a frame from the camera.

    On a successful read the frame is preprocessed, processed, stored in
    ``self.frame`` and then emitted and/or queued according to the
    configuration. The read event is set and the read lock released on every
    exit path, including when reading or processing raises.
    """
    self.readEvent.clear()
    self.readLock.acquire()
    try:
        deviceIsAvailable, frame = self.device.read()

        if deviceIsAvailable:
            frame = self.preprocess(frame)
            self.frame = self.process(frame)

            if self.emitterIsEnabled:
                self.emit("frame-ready", {self.name: self.frame})
                self.emit("frame-available", self.name)

            if self.queueModeEnabled:
                self.queue.put(self.frame)
    finally:
        self.readEvent.set()
        try:
            self.readLock.release()
        except RuntimeError:
            # Another thread may already have force-released the lock.
            pass