Single API⚓
Class Camera
⚓
Camera device object, based on threading.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
device name |
'default' |
src |
Union[int, str] |
device source |
0 |
reconnectDelay |
Union[int, float] |
wait time, in seconds, before trying to reopen the camera device |
5 |
fps |
Optional[int] |
frames per second; if it's None, no delay is applied between reads |
None |
verbose |
bool |
display info messages? |
False |
size |
Union[list, tuple] |
tuple or list with a dimension of the image |
None |
flipX |
bool |
flip image horizontally? |
False |
flipY |
bool |
flip image vertically? |
False |
emitterIsEnabled |
bool |
enable on/emit events (callback execution) |
False |
backgroundIsEnabled |
bool |
if some error is produced with the camera it will display a black frame with a message. |
False |
text |
str |
background message |
'Device not available' |
font |
None |
cv2 font |
0 |
fontScale |
Union[int, float] |
cv2 font scale |
0.5 |
fontColor |
Union[tuple, list] |
BGR tuple that sets the color of the text |
[255, 255, 255] |
thickness |
int |
font thickness |
1 |
queueModeEnabled |
bool |
enable queue mode? |
False |
queueMaxSize |
int |
queue maxsize |
96 |
processing |
Callable |
a function that performs some image processing |
None |
processingParams |
dict |
params for processing functions |
{} |
Events
frame-ready: emits a new frame when it's available. frame-available: emits a notification when a new frame is available.
Examples:
camera = Camera(src=0, emitterIsEnabled=True)
camera.on('frame-ready', lambda frame: print('frame is ready:', type(frame)))
Source code in remio/camio.py
class Camera(Emitter):
    """Camera device object, based on threading.

    Frames are read from a ``cv2.VideoCapture`` device inside a background
    thread and exposed through ``read()``, an optional queue and optional
    events.

    Args:
        name: device name
        src: device source
        reconnectDelay: wait time, in seconds, applied after each attempt to
            reopen the camera device
        fps: frames per second; if it's None the read loop is not delayed
        verbose: display info messages?
        size: tuple or list (width, height) used to resize every frame
        flipX: flip image horizontally?
        flipY: flip image vertically?
        emitterIsEnabled: enable on/emit events (callbacks execution)
        backgroundIsEnabled: if the camera is not available, a black frame
            with a message is used instead.
        text: background message
        font: cv2 font
        fontScale: cv2 font scale
        fontColor: BGR tuple that sets the color of the text
        thickness: font thickness
        queueModeEnabled: enable queue mode?
        queueMaxSize: queue maxsize
        processing: a function that performs some image processing
        processingParams: params (kwargs) for the processing function
        encoderIsEnabled: flag stored on the instance
            (NOTE(review): not read anywhere in this class - confirm usage)
        encoderParams: keyword arguments forwarded to ``MJPEGEncoder``

    Events:
        frame-ready: emits a new frame when it's available.
        frame-available: emits a notification when a new frame is available.

    Example:
        camera = Camera(src=0, emitterIsEnabled=True)
        camera.on('frame-ready', lambda frame: print('frame is ready:', type(frame)))
    """

    def __init__(
        self,
        name: str = "default",
        src: Union[int, str] = 0,
        reconnectDelay: Union[int, float] = 5,
        fps: Union[int, None] = None,
        verbose: bool = False,
        size: Union[list, tuple, None] = None,
        flipX: bool = False,
        flipY: bool = False,
        emitterIsEnabled: bool = False,
        backgroundIsEnabled: bool = False,
        text: str = "Device not available",
        font: int = cv2.FONT_HERSHEY_SIMPLEX,
        fontScale: Union[int, float] = 0.5,
        fontColor: Union[tuple, list] = [255, 255, 255],
        thickness: int = 1,
        queueModeEnabled: bool = False,
        queueMaxSize: int = 96,
        processing: Callable = None,
        processingParams: dict = {},
        encoderIsEnabled: bool = True,
        encoderParams: dict = {},
        *args,
        **kwargs,
    ):
        super(Camera, self).__init__(emitterIsEnabled=emitterIsEnabled, *args, **kwargs)
        self.src = src
        self.name = name
        self.frame = None
        self.fps = fps
        self.verbose = verbose
        self.size = size
        self.flipX = flipX
        self.flipY = flipY
        self.backgroundIsEnabled = backgroundIsEnabled
        self.background = None
        self.processing = processing
        # An empty/None value gets a fresh dict so that instances never share
        # the mutable default argument.
        self.processingParams = processingParams if processingParams else {}
        self.queueModeEnabled = queueModeEnabled
        self.queue = Queue(maxsize=queueMaxSize)
        self.frame64 = None
        self.encoderIsEnabled = encoderIsEnabled
        self.encoder = MJPEGEncoder(**(encoderParams or {}))
        self.isNewFrame = True

        # Delay between reads; only applied when a fps value was given
        # (see applyDelay).
        self.delay = 1 / self.fps if self.fps is not None else 0.1
        self.defaultDelay = self.delay
        self.reconnectDelay = reconnectDelay
        self.defaultSize = [720, 1280, 3]  # (height, width, channels)
        self.emitterIsEnabled = emitterIsEnabled
        self.device = None

        self.thread = Thread(target=self.run, name="camera-thread", daemon=True)
        self.running = Event()
        self.pauseEvent = Event()
        self.readEvent = Event()
        self.readLock = Lock()
        # The pause event starts set, i.e. the loop is not paused.
        self.resume()

        if self.backgroundIsEnabled:
            self.enableBackground(
                size=self.size,
                text=text,
                font=font,
                fontColor=fontColor,
                fontScale=fontScale,
                thickness=thickness,
            )

    def __del__(self):
        self.stop()

    def createBackground(
        self,
        text: str = "Device not available",
        font: int = cv2.FONT_HERSHEY_SIMPLEX,
        fontScale: Union[int, float] = 0.5,
        fontColor: Union[tuple, list] = [255, 255, 255],
        thickness: int = 1,
        size: Union[tuple, list, None] = None,
    ):
        """It creates a custom background as numpy array (image) with a text message.

        Args:
            text: message to be displayed on the center of the background.
            font: cv2 font family
            fontScale: scale of the font
            fontColor: color of the font
            thickness: thickness of the font
            size: tuple or list with the size of the background, either
                (width, height) or (height, width, channels).
                If it's None, ``self.defaultSize`` is used.

        Returns:
            A ``uint8`` numpy array with the text drawn on a black image.
        """
        if size is None:
            size = self.defaultSize
        if len(size) == 2:
            # (width, height) -> (height, width, channels). Built as a new
            # list so tuples are accepted and the caller's object is untouched.
            size = [size[1], size[0], 3]

        height, width, _ = size
        background = np.zeros(size, dtype=np.uint8)
        textWidth, textHeight = cv2.getTextSize(text, font, fontScale, thickness)[0]
        # cv2.putText takes the bottom-left corner of the text, so the
        # baseline sits half a text height below the vertical center.
        origin = (int((width - textWidth) / 2), int((height + textHeight) / 2))
        return cv2.putText(
            background, text, origin, font, fontScale, fontColor, thickness
        )

    def enableBackground(
        self,
        text: str = "Device not available.",
        font: int = cv2.FONT_HERSHEY_COMPLEX,
        fontScale: Union[int, float] = 2,
        fontColor: Union[tuple, list] = [255, 255, 255],
        thickness: int = 3,
        size: Union[tuple, list, None] = None,
    ):
        """It enables background as a frame.

        Args:
            text: message to be displayed on the center of the background.
            font: cv2 font family
            fontScale: scale of the font
            fontColor: color of the font
            thickness: thickness of the font
            size: tuple, list with the size of the background. If it's None,
                size will be set automatically.
        """
        self.backgroundIsEnabled = True
        self.background = self.createBackground(
            text=text,
            font=font,
            fontScale=fontScale,
            fontColor=fontColor,
            thickness=thickness,
            size=size,
        )

    def disableBackground(self):
        """Disables the background."""
        self.backgroundIsEnabled = False
        self.background = None

    def clearFrame(self):
        """Clears the current frame."""
        self.frame = None
        self.frame64 = None

    def start(self):
        """Starts the read loop and returns the camera itself."""
        self.thread.start()
        return self

    def getProcessingParams(self):
        """Returns processings params (kwargs)."""
        if self.processingParams is None:
            self.processingParams = {}
        return self.processingParams

    def hasProcessing(self):
        """Checks if a processing function is available."""
        return self.processing is not None

    def isConnected(self) -> bool:
        """Checks if a camera device is available."""
        if self.device is not None:
            return self.device.isOpened()
        return False

    def loadDevice(self):
        """Loads a camera device."""
        self.device = cv2.VideoCapture(self.src)
        if not self.isConnected():
            print(f"-> {self.name} : {self.src} :: could not be connected.")

    def reconnect(self):
        """Tries to reconnect with the camera device."""
        self.readEvent.clear()
        self.loadDevice()
        time.sleep(self.reconnectDelay)
        self.readEvent.set()

    def resume(self):
        """Resumes the read loop."""
        self.pauseEvent.set()

    def pause(self):
        """Pauses the read loop."""
        self.pauseEvent.clear()

    def setPause(self, value: bool = True):
        """Updates the pause/resume state."""
        if value:
            self.pause()
        else:
            self.resume()

    def needAPause(self):
        """Blocks the calling thread while the camera is paused."""
        self.pauseEvent.wait()

    def preprocess(self, frame: np.ndarray = None) -> np.ndarray:
        """Preprocess the frame (resize and flips).

        Args:
            frame: an image array
        """
        if self.size is not None:
            # dsize is passed as a tuple so list-typed sizes behave like tuples.
            frame = cv2.resize(frame, tuple(self.size[:2]))
        if self.flipX:
            frame = cv2.flip(frame, 1)
        if self.flipY:
            frame = cv2.flip(frame, 0)
        return frame

    def process(self, frame: np.ndarray = None) -> np.ndarray:
        """Executes the processing function, if there is one."""
        if self.hasProcessing():
            kwargs = self.getProcessingParams()
            frame = self.processing(frame, **kwargs)
        return frame

    def update(self):
        """Tries to read a frame from the camera.

        On success the frame is preprocessed, processed, stored in
        ``self.frame``, emitted (when the emitter is enabled) and queued
        (when queue mode is enabled).
        """
        self.readEvent.clear()
        self.readLock.acquire()
        try:
            deviceIsAvailable, frame = self.device.read()
            if deviceIsAvailable:
                frame = self.preprocess(frame)
                self.frame = self.process(frame)

                if self.emitterIsEnabled:
                    self.emit("frame-ready", {self.name: self.frame})
                    self.emit("frame-available", self.name)

                if self.queueModeEnabled:
                    # NOTE(review): put() blocks while the queue is full -
                    # confirm that a consumer is always draining it.
                    self.queue.put(self.frame)
        finally:
            # Always signal waiters and free the lock, even when the read or
            # the processing function raises.
            self.readEvent.set()
            # stop() may already have released the lock from another thread.
            if self.readLock.locked():
                self.readLock.release()

    def read(self, timeout: Union[float, int, None] = 0) -> Union[None, np.ndarray]:
        """Returns a frame or a background.

        Args:
            timeout: max time in seconds to wait for a queued frame
                (only used in queue mode)
        """
        if not self.isConnected():
            return self.background
        if self.queueModeEnabled:
            return self.queue.get(timeout=timeout)
        return self.frame

    def applyDelay(self):
        """If a fps was defined it will wait for respective delay."""
        if self.fps is not None:
            time.sleep(self.delay)

    def run(self):
        """Executes the read loop."""
        self.loadDevice()
        self.running.set()
        while self.running.is_set():
            if self.isConnected():
                self.update()
            else:
                self.reconnect()
            self.applyDelay()
            self.needAPause()
        # Loop finished: leave queue mode, unblock waiters and free the device.
        self.queueModeEnabled = False
        self.readEvent.set()
        self.device.release()

    def getName(self) -> str:
        """Returns the name of the current device."""
        return self.name

    def getFrame(self) -> Union[None, np.ndarray]:
        """Returns the current frame."""
        return self.frame

    def getFrame64(self) -> Union[None, str]:
        """Returns the current frame on base64 format."""
        return self.frame64

    def getBackground(self) -> Union[None, np.ndarray]:
        """Returns the current background, creating it on demand when enabled."""
        if self.backgroundIsEnabled:
            if self.background is None:
                self.background = self.createBackground(size=self.size)
        return self.background

    def setSpeed(self, fps: Union[int, None] = 10):
        """Updates the frames per second (fps).

        Args:
            fps: frames per sec. If it's None, no delay is applied between reads.
        """
        self.fps = fps
        if self.fps is not None:
            self.defaultDelay = 1 / self.fps
            self.delay = self.defaultDelay

    def setProcessing(self, processing: Callable = None, **kwargs):
        """Updates the processing function and its params (kwargs).

        Args:
            processing: a function that performs some image processing
            kwargs: kwargs (params) for the processing function
        """
        self.processing = processing
        self.processingParams = kwargs

    def stop(self):
        """Stops the read loop."""
        # Wake the loop if it is paused, otherwise it never sees the stop request.
        self.resume()
        if self.readLock.locked():
            self.readLock.release()
        if self.running.is_set():
            self.running.clear()
            self.thread.join()
applyDelay(self)
⚓
clearFrame(self)
⚓
createBackground(self, text='Device not available', font=0, fontScale=0.5, fontColor=[255, 255, 255], thickness=1, size=None)
⚓
It creates a custom background as numpy array (image) with a text message.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
text |
str |
message to be displayed on the center of the background. |
'Device not available' |
font |
None |
cv2 font family |
0 |
fontScale |
Union[int, float] |
scale of the font |
0.5 |
fontColor |
Union[tuple, list] |
color of the font |
[255, 255, 255] |
thickness |
int |
thickness of the font |
1 |
size |
Union[tuple, list] |
tuple, list with the size of the background. If it's None, size will be set automatically. |
None |
Source code in remio/camio.py
def createBackground(
    self,
    text: str = "Device not available",
    font: int = cv2.FONT_HERSHEY_SIMPLEX,
    fontScale: Union[int, float] = 0.5,
    fontColor: Union[tuple, list] = [255, 255, 255],
    thickness: int = 1,
    size: Union[tuple, list, None] = None,
):
    """It creates a custom background as numpy array (image) with a text message.

    Args:
        text: message to be displayed on the center of the background.
        font: cv2 font family
        fontScale: scale of the font
        fontColor: color of the font
        thickness: thickness of the font
        size: tuple or list with the size of the background, either
            (width, height) or (height, width, channels).
            If it's None, ``self.defaultSize`` is used.

    Returns:
        A ``uint8`` numpy array with the text drawn on a black image.
    """
    if size is None:
        size = self.defaultSize
    if len(size) == 2:
        # (width, height) -> (height, width, channels). Built as a new list
        # so tuples are accepted and the caller's object is left untouched.
        size = [size[1], size[0], 3]

    height, width, _ = size
    background = np.zeros(size, dtype=np.uint8)
    textWidth, textHeight = cv2.getTextSize(text, font, fontScale, thickness)[0]
    # cv2.putText takes the bottom-left corner of the text, so the baseline
    # sits half a text height below the vertical center.
    origin = (int((width - textWidth) / 2), int((height + textHeight) / 2))
    return cv2.putText(
        background, text, origin, font, fontScale, fontColor, thickness
    )
disableBackground(self)
⚓
enableBackground(self, text='Device not available.', font=3, fontScale=2, fontColor=[255, 255, 255], thickness=3, size=None)
⚓
It enables background as a frame.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
text |
str |
message to be displayed on the center of the background. |
'Device not available.' |
font |
None |
cv2 font family |
3 |
fontScale |
Union[int, float] |
scale of the font |
2 |
fontColor |
Union[tuple, list] |
color of the font |
[255, 255, 255] |
thickness |
int |
thickness of the font |
3 |
size |
Union[tuple, list] |
tuple, list with the size of the background. If it's None, size will be set automatically. |
None |
Source code in remio/camio.py
def enableBackground(
    self,
    text: str = "Device not available.",
    font: None = cv2.FONT_HERSHEY_COMPLEX,
    fontScale: Union[int, float] = 2,
    fontColor: Union[tuple, list] = [255, 255, 255],
    thickness: int = 3,
    size: Union[tuple, list, None] = None,
):
    """Turn the background on and build the image used for it.

    The enabled flag is raised first; the image is then produced by
    ``createBackground`` with the arguments received here and stored in
    ``self.background``.

    Args:
        text: message to be displayed on the center of the background.
        font: cv2 font family
        fontScale: scale of the font
        fontColor: color of the font
        thickness: thickness of the font
        size: tuple, list with the size of the background. If it's None,
            size will be set automatically.
    """
    self.backgroundIsEnabled = True
    image = self.createBackground(
        size=size,
        thickness=thickness,
        fontColor=fontColor,
        fontScale=fontScale,
        font=font,
        text=text,
    )
    self.background = image
getBackground(self)
⚓
getFrame(self)
⚓
getFrame64(self)
⚓
getName(self)
⚓
getProcessingParams(self)
⚓
hasProcessing(self)
⚓
isConnected(self)
⚓
loadDevice(self)
⚓
needAPause(self)
⚓
pause(self)
⚓
preprocess(self, frame=None)
⚓
Preprocess the frame.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
frame |
ndarray |
an image array |
None |
Source code in remio/camio.py
process(self, frame=None)
⚓
read(self, timeout=0)
⚓
Returns a frame or a background.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
timeout |
Union[float, int] |
maximum time in seconds to wait for a queued frame (queue mode only) |
0 |
Source code in remio/camio.py
def read(self, timeout: Union[float, int, None] = 0) -> Union[None, np.ndarray]:
    """Return the frame to show: queued frame, latest frame or background.

    When the device is not connected the background is returned. Otherwise,
    in queue mode the next item is taken from ``self.queue`` (its ``get`` is
    called with ``timeout``); without queue mode the latest frame is returned.

    Args:
        timeout: max time in seconds passed to the queue ``get`` call
    """
    # Guard clause: no device, nothing to read.
    if not self.isConnected():
        return self.background
    if self.queueModeEnabled:
        return self.queue.get(timeout=timeout)
    return self.frame
reconnect(self)
⚓
resume(self)
⚓
run(self)
⚓
Executes the read loop.
Source code in remio/camio.py
setPause(self, value=True)
⚓
setProcessing(self, processing=None, **kwargs)
⚓
Updates the processing function and its params (kwargs).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
processing |
Callable |
a function that performs some image processing |
None |
kwargs |
kwargs (params) for the processing function |
{} |
Source code in remio/camio.py
setSpeed(self, fps=10)
⚓
Updates the frames per second (fps).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
fps |
Optional[int] |
frames per sec. If None is passed, no delay is applied between reads. |
10 |
Source code in remio/camio.py
start(self)
⚓
stop(self)
⚓
update(self)
⚓
Tries to read a frame from the camera.
Source code in remio/camio.py
def update(self):
    """Tries to read a frame from the camera.

    On success the frame is preprocessed, processed, stored in ``self.frame``,
    emitted (when the emitter is enabled) and queued (when queue mode is
    enabled). The read event is set and the read lock is released even if
    reading or processing raises.
    """
    self.readEvent.clear()
    self.readLock.acquire()
    try:
        deviceIsAvailable, frame = self.device.read()
        if deviceIsAvailable:
            frame = self.preprocess(frame)
            self.frame = self.process(frame)

            if self.emitterIsEnabled:
                self.emit("frame-ready", {self.name: self.frame})
                self.emit("frame-available", self.name)

            if self.queueModeEnabled:
                # NOTE(review): put() presumably blocks while the queue is
                # full - confirm that a consumer is always draining it.
                self.queue.put(self.frame)
    finally:
        # Always signal waiters and free the lock so a failing read or
        # processing function cannot leave other threads blocked.
        self.readEvent.set()
        # The lock may already have been released elsewhere, so check first.
        if self.readLock.locked():
            self.readLock.release()