diff --git a/.github/workflows/calculate-size-delta.yml b/.github/workflows/calculate-size-delta.yml
index e199789e..33179b48 100644
--- a/.github/workflows/calculate-size-delta.yml
+++ b/.github/workflows/calculate-size-delta.yml
@@ -5,6 +5,7 @@ on:
 
 permissions:
   contents: read
+  pull-requests: read
 
 jobs:
   build:
diff --git a/containers/ei-models-runner/Dockerfile b/containers/ei-models-runner/Dockerfile
index b7a542fa..af80e73a 100644
--- a/containers/ei-models-runner/Dockerfile
+++ b/containers/ei-models-runner/Dockerfile
@@ -2,7 +2,7 @@
 #
 # SPDX-License-Identifier: MPL-2.0
 
-FROM public.ecr.aws/z9b3d4t5/inference-container-qc-adreno-702:4d7979284677b6bdb557abe8948fa1395dc89a63
+FROM public.ecr.aws/z9b3d4t5/inference-container-qc-adreno-702:a7ec92c6c9b2b9c94255baed18328cf4e2fda6d0
 
 # Create the user and group needed to run the container as non-root
 RUN set -ex; \
diff --git a/pyproject.toml b/pyproject.toml
index 477f8080..060c2be3 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -31,6 +31,7 @@ dev = [
     "setuptools",
     "build",
     "pytest",
+    "pytest-asyncio",
     "websocket-client",
     "ruff",
     "docstring_parser>=0.16",
diff --git a/src/arduino/app_bricks/camera_code_detection/README.md b/src/arduino/app_bricks/camera_code_detection/README.md
index 786da81d..0b1b10c5 100644
--- a/src/arduino/app_bricks/camera_code_detection/README.md
+++ b/src/arduino/app_bricks/camera_code_detection/README.md
@@ -6,8 +6,8 @@ This Brick enables real-time barcode and QR code scanning from a camera video st
 
 The Camera Code Detection Brick allows you to:
 
-- Capture frames from a USB camera.
-- Configure camera settings (resolution and frame rate).
+- Capture frames from a camera (see the Camera peripheral for supported cameras).
+- Configure camera settings (resolution and frame rate).
 - Define the type of code to detect: barcodes and/or QR codes.
 - Process detections with customizable callbacks.
 
@@ -22,7 +22,7 @@ The Camera Code Detection Brick allows you to:
 
 ## Prerequisites
 
-To use this Brick you should have a USB camera connected to your board.
+To use this Brick you can either connect a camera directly to your board or use a network camera.
 
 **Tip**: Use a USB-C® Hub with USB-A connectors to support commercial web cameras.
 
@@ -37,9 +37,27 @@ def render_frame(frame):
 ...
 
 def handle_detected_code(frame, detection):
 ...
 
-# Select the camera you want to use, its resolution and the max fps
-detection = CameraCodeDetection(camera=0, resolution=(640, 360), fps=10)
+detection = CameraCodeDetection()
 detection.on_frame(render_frame)
 detection.on_detection(handle_detected_code)
-detection.start()
+
+App.run()
 ```
+
+You can also select a specific camera to use:
+
+```python
+from arduino.app_peripherals.camera import Camera
+from arduino.app_bricks.camera_code_detection import CameraCodeDetection
+from arduino.app_utils.app import App
+
+def handle_detected_code(frame, detection):
+    ...
+
+# Select the camera you want to use, its resolution and the max fps
+camera = Camera("rtsp://...", resolution=(640, 360), fps=10)
+detection = CameraCodeDetection(camera)
+detection.on_detection(handle_detected_code)
+
+App.run()
+```
\ No newline at end of file
diff --git a/src/arduino/app_bricks/camera_code_detection/__init__.py b/src/arduino/app_bricks/camera_code_detection/__init__.py
index 3bb7eb0e..c396a841 100644
--- a/src/arduino/app_bricks/camera_code_detection/__init__.py
+++ b/src/arduino/app_bricks/camera_code_detection/__init__.py
@@ -2,7 +2,6 @@
 #
 # SPDX-License-Identifier: MPL-2.0
 
-from .detection import Detection, CameraCodeDetection
-from .utils import draw_bounding_boxes, draw_bounding_box
+from .detection import CameraCodeDetection, Detection
 
-__all__ = ["CameraCodeDetection", "Detection", "draw_bounding_boxes", "draw_bounding_box"]
+__all__ = ["CameraCodeDetection", "Detection"]
diff --git a/src/arduino/app_bricks/camera_code_detection/detection.py b/src/arduino/app_bricks/camera_code_detection/detection.py
index 6c67c972..48c79e14 100644
--- a/src/arduino/app_bricks/camera_code_detection/detection.py
+++ b/src/arduino/app_bricks/camera_code_detection/detection.py
@@ -6,12 +6,12 @@
 import threading
 from typing import Callable
 
-import cv2
 from pyzbar.pyzbar import decode, ZBarSymbol, PyZbarError
 import numpy as np
-from PIL.Image import Image
+from PIL.Image import Image, fromarray
 
-from arduino.app_peripherals.usb_camera import USBCamera
+from arduino.app_peripherals.camera import Camera, BaseCamera
+from arduino.app_utils.image import greyscale
 from arduino.app_utils import brick, Logger
 
 logger = Logger("CameraCodeDetection")
@@ -44,7 +44,7 @@ class CameraCodeDetection:
     """Scans a camera video feed for QR codes and/or barcodes.
 
     Args:
-        camera (USBCamera): The USB camera instance. If None, a default camera will be initialized.
+        camera (BaseCamera): The camera instance to use for capturing video. If None, a default camera will be initialized.
         detect_qr (bool): Whether to detect QR codes. Defaults to True.
         detect_barcode (bool): Whether to detect barcodes. Defaults to True.
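+
+    Example:
+        A minimal sketch (assumes the default camera):
+
+            detection = CameraCodeDetection(detect_barcode=False)
+            detection.on_frame(lambda frame: None)  # receives each frame as a PIL Image
+            detection.start()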
@@ -55,7 +55,7 @@ class CameraCodeDetection:
     def __init__(
         self,
-        camera: USBCamera = None,
+        camera: BaseCamera = None,
         detect_qr: bool = True,
         detect_barcode: bool = True,
     ):
@@ -63,10 +63,12 @@ def __init__(
         if detect_qr is False and detect_barcode is False:
             raise ValueError("At least one of 'detect_qr' or 'detect_barcode' must be True.")
 
+        self._camera = camera if camera else Camera()
+
         self._detect_qr = detect_qr
         self._detect_barcode = detect_barcode
 
-        # These callbacks do not require locks as long as we're running on CPython
+        # These callbacks don't require locking as long as we're running on CPython
         self._on_frame_cb = None
         self._on_error_cb = None
@@ -76,8 +78,6 @@ def __init__(
 
         self.already_seen_codes = set()
 
-        self._camera = camera if camera else USBCamera()
-
     def start(self):
         """Start the detector and begin scanning for codes."""
         self._camera.start()
@@ -154,13 +154,13 @@ def loop(self):
                 self._on_error(e)
             return
 
-        # Use grayscale for barcode/QR code detection
-        gs_frame = cv2.cvtColor(np.asarray(frame), cv2.COLOR_RGB2GRAY)
-
-        self._on_frame(frame)
+        pil_frame = fromarray(frame)
+        self._on_frame(pil_frame)
 
+        # Use grayscale for barcode/QR code detection
+        gs_frame = greyscale(frame)
         detections = self._scan_frame(gs_frame)
-        self._on_detect(frame, detections)
+        self._on_detect(pil_frame, detections)
 
     def _on_frame(self, frame: Image):
         if self._on_frame_cb:
@@ -170,7 +170,7 @@ def _on_frame(self, frame: Image):
                 logger.error(f"Failed to run on_frame callback: {e}")
                 self._on_error(e)
 
-    def _scan_frame(self, frame: cv2.typing.MatLike) -> list[Detection]:
+    def _scan_frame(self, frame: np.ndarray) -> list[Detection]:
         """Scan the frame for a single barcode or QR code."""
         detections = []
 
diff --git a/src/arduino/app_bricks/camera_code_detection/examples/2_detection_list.py b/src/arduino/app_bricks/camera_code_detection/examples/2_detection_list.py
index 7d63bb46..554b0f46 100644
--- a/src/arduino/app_bricks/camera_code_detection/examples/2_detection_list.py
+++ b/src/arduino/app_bricks/camera_code_detection/examples/2_detection_list.py
@@ -19,4 +19,4 @@ def on_codes_detected(frame: Image, detections: list[Detection]):
 detector = CameraCodeDetection()
 detector.on_detect(on_codes_detected)
 
-App.run()  # This will block until the app is stopped
+App.run()
diff --git a/src/arduino/app_bricks/camera_code_detection/examples/3_detection_with_overrides.py b/src/arduino/app_bricks/camera_code_detection/examples/3_detection_with_overrides.py
index d128cd96..4d15cadd 100644
--- a/src/arduino/app_bricks/camera_code_detection/examples/3_detection_with_overrides.py
+++ b/src/arduino/app_bricks/camera_code_detection/examples/3_detection_with_overrides.py
@@ -6,7 +6,7 @@
 # EXAMPLE_REQUIRES = "Requires an USB webcam connected to the Arduino board."
 from PIL.Image import Image
 from arduino.app_utils.app import App
-from arduino.app_peripherals.usb_camera import USBCamera
+from arduino.app_peripherals.camera import Camera
 
 from arduino.app_bricks.camera_code_detection import CameraCodeDetection, Detection
 
@@ -17,7 +17,7 @@ def on_code_detected(frame: Image, detection: Detection):
     # e.g., draw a bounding box, save it to a database or log it.
-camera = USBCamera(camera=0, resolution=(640, 360), fps=10)
+camera = Camera(2, resolution=(640, 360), fps=10)
 detector = CameraCodeDetection(camera)
 detector.on_detect(on_code_detected)
 
diff --git a/src/arduino/app_bricks/object_detection/README.md b/src/arduino/app_bricks/object_detection/README.md
index 3234ca67..9489e695 100644
--- a/src/arduino/app_bricks/object_detection/README.md
+++ b/src/arduino/app_bricks/object_detection/README.md
@@ -23,23 +23,24 @@ The Object Detection Brick allows you to:
 ```python
-import os
 from arduino.app_bricks.object_detection import ObjectDetection
+from arduino.app_utils.image import draw_bounding_boxes
 
 object_detection = ObjectDetection()
 
-# Image frame can be as bytes or PIL image
-frame = os.read("path/to/your/image.jpg")
+# Image can be provided as bytes or PIL.Image
+with open("path/to/your/image.jpg", "rb") as f:
+    img = f.read()
 
-out = object_detection.detect(frame)
-# is it possible to customize image type, confidence level and box overlap
-# out = object_detection.detect(frame, image_type = "png", confidence = 0.35, overlap = 0.5)
+out = object_detection.detect(img)
+# You can also provide a confidence level
+# out = object_detection.detect(img, confidence=0.35)
 
 if out and "detection" in out:
     for i, obj_det in enumerate(out["detection"]):
-        # For every object detected, get its details
+        # For every object detected, print its details
         detected_object = obj_det.get("class_name", None)
-        bounding_box = obj_det.get("bounding_box_xyxy", None)
         confidence = obj_det.get("confidence", None)
+        bounding_box = obj_det.get("bounding_box_xyxy", None)
 
-# draw the bounding box and key points on the image
-out_image = object_detection.draw_bounding_boxes(frame, out)
+# Draw the bounding boxes
+out_image = draw_bounding_boxes(img, out)
 ```
diff --git a/src/arduino/app_bricks/object_detection/__init__.py b/src/arduino/app_bricks/object_detection/__init__.py
index 87e0ef05..112a961d 100644
--- a/src/arduino/app_bricks/object_detection/__init__.py
+++ b/src/arduino/app_bricks/object_detection/__init__.py
@@ -4,7 +4,8 @@
 from typing import Any
 from PIL import Image
 
-from arduino.app_utils import brick, Logger, draw_bounding_boxes, Shape
+from arduino.app_utils import brick, Logger
+from arduino.app_utils.image import draw_bounding_boxes, Shape
 from arduino.app_internal.core import EdgeImpulseRunnerFacade
 
 logger = Logger("ObjectDetection")
diff --git a/src/arduino/app_bricks/object_detection/examples/object_detection_example.py b/src/arduino/app_bricks/object_detection/examples/object_detection_example.py
index 166f5b7c..51a4b4b4 100644
--- a/src/arduino/app_bricks/object_detection/examples/object_detection_example.py
+++ b/src/arduino/app_bricks/object_detection/examples/object_detection_example.py
@@ -3,23 +3,24 @@
 # SPDX-License-Identifier: MPL-2.0
 
 # EXAMPLE_NAME = "Object Detection"
 from arduino.app_bricks.object_detection import ObjectDetection
+from arduino.app_utils.image import draw_bounding_boxes
 
 object_detection = ObjectDetection()
 
-# Image frame can be as bytes or PIL image
-with open("image.png", "rb") as f:
-    frame = f.read()
+# Image can be provided as bytes or PIL.Image
+with open("path/to/your/image.jpg", "rb") as f:
+    img = f.read()
 
-out = object_detection.detect(frame)
-# is it possible to customize image type, confidence level and box overlap
-# out = object_detection.detect(frame, image_type = "png", confidence = 0.35, overlap = 0.5)
+out = object_detection.detect(img)
+# You can also provide a confidence level
+# out = object_detection.detect(img, confidence=0.35)
 
 if out and "detection" in out:
     for i, obj_det in enumerate(out["detection"]):
-        # For every object detected, get its details
+        # For every object detected, print its details
         detected_object = obj_det.get("class_name", None)
-        bounding_box = obj_det.get("bounding_box_xyxy", None)
         confidence = obj_det.get("confidence", None)
+        bounding_box = obj_det.get("bounding_box_xyxy", None)
 
-# draw the bounding box and key points on the image
-out_image = object_detection.draw_bounding_boxes(frame, out)
+# Draw the bounding boxes
+out_image = draw_bounding_boxes(img, out)
diff --git a/src/arduino/app_bricks/video_imageclassification/__init__.py b/src/arduino/app_bricks/video_imageclassification/__init__.py
index e479939b..6ce10f77 100644
--- a/src/arduino/app_bricks/video_imageclassification/__init__.py
+++ b/src/arduino/app_bricks/video_imageclassification/__init__.py
@@ -2,16 +2,22 @@
 #
 # SPDX-License-Identifier: MPL-2.0
 
-from arduino.app_utils import brick, Logger
-from arduino.app_internal.core import load_brick_compose_file, resolve_address
-from arduino.app_internal.core import EdgeImpulseRunnerFacade
-import threading
 import time
+import json
+import inspect
+import threading
+import socket
+import numpy as np
 from typing import Callable
+
 from websockets.sync.client import connect, ClientConnection
 from websockets.exceptions import ConnectionClosedOK, ConnectionClosedError
-import json
-import inspect
+
+from arduino.app_peripherals.camera import Camera, BaseCamera
+from arduino.app_internal.core import load_brick_compose_file, resolve_address
+from arduino.app_internal.core import EdgeImpulseRunnerFacade
+from arduino.app_utils.image import compress_to_jpeg
+from arduino.app_utils import brick, Logger
 
 logger = Logger("VideoImageClassification")
 
@@ -25,10 +31,11 @@ class VideoImageClassification:
 
     ALL_HANDLERS_KEY = "__ALL"
 
-    def __init__(self, confidence: float = 0.3, debounce_sec: float = 0.0):
+    def __init__(self, camera: BaseCamera = None, confidence: float = 0.3, debounce_sec: float = 0.0):
         """Initialize the VideoImageClassification class.
 
         Args:
+            camera (BaseCamera): The camera instance to use for capturing video. If None, a default camera will be initialized.
             confidence (float): The minimum confidence level for a classification to be considered valid. Default is 0.3.
             debounce_sec (float): The minimum time in seconds between consecutive detections of the same object
                 to avoid multiple triggers. Default is 0 seconds.
@@ -36,6 +43,8 @@ def __init__(self, confidence: float = 0.3, debounce_sec: float = 0.0):
         Raises:
             RuntimeError: If the host address could not be resolved.
         """
+        self._camera = camera if camera else Camera()
+
         self._confidence = confidence
         self._debounce_sec = debounce_sec
         self._last_detected = {}
@@ -114,40 +123,26 @@ def on_detect(self, object: str, callback: Callable[[], None]):
         self._handlers[object] = callback
 
     def start(self):
-        """Start the classification stream.
-
-        This only sets the internal running flag. You must call
-        `execute` in a loop or a separate thread to actually begin receiving classification results.
-        """
+        """Start the classification."""
+        self._camera.start()
         self._is_running.set()
 
     def stop(self):
-        """Stop the classification stream and release resources.
-
-        This clears the running flag. Any active `execute` loop
-        will exit gracefully at its next iteration.
-        """
+        """Stop the classification and release resources."""
         self._is_running.clear()
+        self._camera.stop()
+
+    @brick.execute
+    def classification_loop(self):
+        """Classification main loop.
-    def execute(self):
-        """Run the main classification loop.
-
-        Behavior:
-        - Opens a WebSocket connection to the model runner.
-        - Receives classification messages in real time.
-        - Filters classifications below the confidence threshold.
-        - Applies debounce rules before invoking callbacks.
-        - Retries on transient connection errors until stopped.
-
-        Exceptions:
-            ConnectionClosedOK:
-                Raised to exit when the server closes the connection cleanly.
-            ConnectionClosedError, TimeoutError, ConnectionRefusedError:
-                Logged and retried with backoff.
+
+        Maintains WebSocket connection to the model runner and processes classification messages.
+        Retries on connection errors until stopped.
         """
         while self._is_running.is_set():
             try:
                 with connect(self._uri) as ws:
+                    logger.info("WebSocket connection established")
                     while self._is_running.is_set():
                         try:
                             message = ws.recv()
@@ -157,21 +152,62 @@
                         except ConnectionClosedOK:
                             raise
                         except (TimeoutError, ConnectionRefusedError, ConnectionClosedError):
-                            logger.warning(f"Connection lost. Retrying...")
+                            logger.warning("WebSocket connection lost. Retrying...")
                             raise
                         except Exception as e:
                             logger.exception(f"Failed to process detection: {e}")
             except ConnectionClosedOK:
-                logger.debug(f"Disconnected cleanly, exiting WebSocket read loop.")
+                logger.debug("WebSocket disconnected cleanly, exiting loop.")
                 return
             except (TimeoutError, ConnectionRefusedError, ConnectionClosedError):
                 logger.debug(f"Waiting for model runner. Retrying...")
-                import time
                 time.sleep(2)
                 continue
             except Exception as e:
                 logger.exception(f"Failed to establish WebSocket connection to {self._host}: {e}")
+                time.sleep(2)
+
+    @brick.execute
+    def camera_loop(self):
+        """Camera main loop.
+
+        Captures images from the camera and forwards them over the TCP connection.
+        Retries on connection errors until stopped.
+        """
+        while self._is_running.is_set():
+            try:
+                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as tcp_socket:
+                    tcp_socket.connect((self._host, 5050))
+                    logger.info(f"TCP connection established to {self._host}:5050")
+
+                    # Send a priming frame to initialize the EI pipeline and its web server
+                    frame = np.zeros((320, 320, 3), dtype=np.uint8)
+                    jpeg_frame = compress_to_jpeg(frame)
+                    tcp_socket.sendall(jpeg_frame.tobytes())
+
+                    while self._is_running.is_set():
+                        try:
+                            frame = self._camera.capture()
+                            if frame is None:
+                                time.sleep(0.01)  # Brief sleep if no image available
+                                continue
+
+                            jpeg_frame = compress_to_jpeg(frame)
+                            tcp_socket.sendall(jpeg_frame.tobytes())
+
+                        except (BrokenPipeError, ConnectionResetError, OSError) as e:
+                            logger.warning(f"TCP connection lost: {e}. Retrying...")
+                            break
+                        except Exception as e:
+                            logger.exception(f"Error capturing/sending image: {e}")
+
+            except (ConnectionRefusedError, OSError) as e:
+                logger.debug(f"TCP connection failed: {e}. 
Retrying in 2 seconds...")
+                time.sleep(2)
+            except Exception as e:
+                logger.exception(f"Unexpected error in TCP loop: {e}")
+                time.sleep(2)
 
     def _process_message(self, ws: ClientConnection, message: str):
         jmsg = json.loads(message)
diff --git a/src/arduino/app_bricks/video_imageclassification/brick_compose.yaml b/src/arduino/app_bricks/video_imageclassification/brick_compose.yaml
index 7e054acc..431f2e4c 100644
--- a/src/arduino/app_bricks/video_imageclassification/brick_compose.yaml
+++ b/src/arduino/app_bricks/video_imageclassification/brick_compose.yaml
@@ -9,13 +9,14 @@ services:
         max-size: "5m"
         max-file: "2"
     ports:
-      - ${BIND_ADDRESS:-0.0.0.0}:4912:4912
+      - ${BIND_ADDRESS:-0.0.0.0}:5050:5050 # TCP input for video frames
+      - ${BIND_ADDRESS:-0.0.0.0}:4912:4912 # Embedded UI port
     volumes:
       - "${CUSTOM_MODEL_PATH:-/home/arduino/.arduino-bricks/ei-models/}:${CUSTOM_MODEL_PATH:-/home/arduino/.arduino-bricks/ei-models/}"
       - "/run/udev:/run/udev"
-    command: ["--model-file", "${EI_CLASSIFICATION_MODEL:-/models/ootb/ei/mobilenet-v2-224px.eim}", "--dont-print-predictions", "--mode", "streaming", "--preview-original-resolution", "--camera", "${VIDEO_DEVICE:-/dev/video1}"]
+    command: ["--model-file", "${EI_CLASSIFICATION_MODEL:-/models/ootb/ei/mobilenet-v2-224px.eim}", "--dont-print-predictions", "--mode", "streaming-tcp-server", "--preview-original-resolution"]
     healthcheck:
-      test: [ "CMD-SHELL", "wget -q --spider http://ei-video-classification-runner:4912 || exit 1" ]
+      test: [ "CMD-SHELL", "netstat -tuln | grep :5050 || exit 1" ]
       interval: 2s
       timeout: 2s
       retries: 25
diff --git a/src/arduino/app_bricks/video_objectdetection/__init__.py b/src/arduino/app_bricks/video_objectdetection/__init__.py
index 2f48de1c..58ad8c16 100644
--- a/src/arduino/app_bricks/video_objectdetection/__init__.py
+++ b/src/arduino/app_bricks/video_objectdetection/__init__.py
@@ -2,16 +2,22 @@
 #
 # SPDX-License-Identifier: MPL-2.0
 
-from arduino.app_utils import brick, Logger
-from arduino.app_internal.core import load_brick_compose_file, resolve_address
-from arduino.app_internal.core import EdgeImpulseRunnerFacade
 import time
+import json
+import inspect
 import threading
+import socket
+import numpy as np
 from typing import Callable
+
 from websockets.sync.client import connect, ClientConnection
 from websockets.exceptions import ConnectionClosedOK, ConnectionClosedError
-import json
-import inspect
+
+from arduino.app_peripherals.camera import Camera, BaseCamera
+from arduino.app_internal.core import load_brick_compose_file, resolve_address
+from arduino.app_internal.core import EdgeImpulseRunnerFacade
+from arduino.app_utils.image.adjustments import compress_to_jpeg
+from arduino.app_utils import brick, Logger
 
 logger = Logger("VideoObjectDetection")
 
@@ -30,16 +36,19 @@ class VideoObjectDetection:
 
     ALL_HANDLERS_KEY = "__ALL"
 
-    def __init__(self, confidence: float = 0.3, debounce_sec: float = 0.0):
+    def __init__(self, camera: BaseCamera = None, confidence: float = 0.3, debounce_sec: float = 0.0):
         """Initialize the VideoObjectDetection class.
 
         Args:
+            camera (BaseCamera): The camera instance to use for capturing video. If None, a default camera will be initialized.
             confidence (float): Confidence level for detection. Default is 0.3 (30%).
             debounce_sec (float): Minimum seconds between repeated detections of the same object. Default is 0 seconds.
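+
+        Example:
+            A minimal sketch (assumes the default camera and the Brick's model runner container):
+
+                detection = VideoObjectDetection(confidence=0.5)
+                detection.on_detect_all(lambda detections: print(detections))
+                detection.start()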
        Raises:
             RuntimeError: If the host address could not be resolved.
         """
+        self._camera = camera if camera else Camera()
+
         self._confidence = confidence
         self._debounce_sec = debounce_sec
         self._last_detected: dict[str, float] = {}
@@ -107,32 +116,25 @@ def on_detect_all(self, callback: Callable[[dict], None]):
 
     def start(self):
         """Start the video object detection process."""
+        self._camera.start()
         self._is_running.set()
 
     def stop(self):
-        """Stop the video object detection process."""
+        """Stop the video object detection process and release resources."""
         self._is_running.clear()
+        self._camera.stop()
 
-    def execute(self):
-        """Connect to the model runner and process messages until `stop` is called.
-
-        Behavior:
-        - Establishes a WebSocket connection to the runner.
-        - Parses ``"hello"`` messages to capture model metadata and optionally
-          performs a threshold override to align the runner with the local setting.
-        - Parses ``"classification"`` messages, filters detections by confidence,
-          applies debounce, then invokes registered callbacks.
-        - Retries on transient WebSocket errors while running.
-
-        Exceptions:
-            ConnectionClosedOK:
-                Propagated to exit cleanly when the server closes the connection.
-            ConnectionClosedError, TimeoutError, ConnectionRefusedError:
-                Logged and retried with a short backoff while running.
+    @brick.execute
+    def object_detection_loop(self):
+        """Object detection main loop.
+
+        Maintains WebSocket connection to the model runner and processes object detection messages.
+        Retries on connection errors until stopped.
         """
         while self._is_running.is_set():
             try:
                 with connect(self._uri) as ws:
+                    logger.info("WebSocket connection established")
                     while self._is_running.is_set():
                         try:
                             message = ws.recv()
@@ -142,21 +144,62 @@
                         except ConnectionClosedOK:
                             raise
                         except (TimeoutError, ConnectionRefusedError, ConnectionClosedError):
-                            logger.warning(f"Connection lost. Retrying...")
+                            logger.warning("WebSocket connection lost. Retrying...")
                             raise
                         except Exception as e:
                             logger.exception(f"Failed to process detection: {e}")
             except ConnectionClosedOK:
-                logger.debug(f"Disconnected cleanly, exiting WebSocket read loop.")
+                logger.debug("WebSocket disconnected cleanly, exiting loop.")
                 return
             except (TimeoutError, ConnectionRefusedError, ConnectionClosedError):
                 logger.debug(f"Waiting for model runner. Retrying...")
-                import time
                 time.sleep(2)
                 continue
             except Exception as e:
                 logger.exception(f"Failed to establish WebSocket connection to {self._host}: {e}")
+                time.sleep(2)
+
+    @brick.execute
+    def camera_loop(self):
+        """Camera main loop.
+
+        Captures images from the camera and forwards them over the TCP connection.
+        Retries on connection errors until stopped.
+        """
+        while self._is_running.is_set():
+            try:
+                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as tcp_socket:
+                    tcp_socket.connect((self._host, 5050))
+                    logger.info(f"TCP connection established to {self._host}:5050")
+
+                    # Send a priming frame to initialize the EI pipeline and its web server
+                    frame = np.zeros((320, 320, 3), dtype=np.uint8)
+                    jpeg_frame = compress_to_jpeg(frame)
+                    tcp_socket.sendall(jpeg_frame.tobytes())
+
+                    while self._is_running.is_set():
+                        try:
+                            frame = self._camera.capture()
+                            if frame is None:
+                                time.sleep(0.01)  # Brief sleep if no image available
+                                continue
+
+                            jpeg_frame = compress_to_jpeg(frame)
+                            tcp_socket.sendall(jpeg_frame.tobytes())
+
+                        except (BrokenPipeError, ConnectionResetError, OSError) as e:
+                            logger.warning(f"TCP connection lost: {e}. 
Retrying...")
+                            break
+                        except Exception as e:
+                            logger.exception(f"Error sending image: {e}")
+
+            except (ConnectionRefusedError, OSError) as e:
+                logger.debug(f"TCP connection failed: {e}. Retrying in 2 seconds...")
+                time.sleep(2)
+            except Exception as e:
+                logger.exception(f"Unexpected error in TCP loop: {e}")
+                time.sleep(2)
 
     def _process_message(self, ws: ClientConnection, message: str):
         jmsg = json.loads(message)
diff --git a/src/arduino/app_bricks/video_objectdetection/brick_compose.yaml b/src/arduino/app_bricks/video_objectdetection/brick_compose.yaml
index 82ac93ad..7cf67aec 100644
--- a/src/arduino/app_bricks/video_objectdetection/brick_compose.yaml
+++ b/src/arduino/app_bricks/video_objectdetection/brick_compose.yaml
@@ -9,13 +9,14 @@ services:
         max-size: "5m"
         max-file: "2"
     ports:
-      - ${BIND_ADDRESS:-0.0.0.0}:4912:4912
+      - ${BIND_ADDRESS:-0.0.0.0}:5050:5050 # TCP input for video frames
+      - ${BIND_ADDRESS:-0.0.0.0}:4912:4912 # Embedded UI port
     volumes:
      - "${CUSTOM_MODEL_PATH:-/home/arduino/.arduino-bricks/ei-models/}:${CUSTOM_MODEL_PATH:-/home/arduino/.arduino-bricks/ei-models/}"
      - "/run/udev:/run/udev"
-    command: ["--model-file", "${EI_OBJ_DETECTION_MODEL:-/models/ootb/ei/yolo-x-nano.eim}", "--dont-print-predictions", "--mode", "streaming", "--preview-original-resolution", "--camera", "${VIDEO_DEVICE:-/dev/video1}"]
+    command: ["--model-file", "${EI_OBJ_DETECTION_MODEL:-/models/ootb/ei/yolo-x-nano.eim}", "--dont-print-predictions", "--mode", "streaming-tcp-server", "--preview-original-resolution"]
     healthcheck:
-      test: [ "CMD-SHELL", "wget -q --spider http://ei-video-obj-detection-runner:4912 || exit 1" ]
+      test: [ "CMD-SHELL", "netstat -tuln | grep :5050 || exit 1" ]
       interval: 2s
       timeout: 2s
       retries: 25
diff --git a/src/arduino/app_bricks/visual_anomaly_detection/examples/visual_anomaly_example.py b/src/arduino/app_bricks/visual_anomaly_detection/examples/visual_anomaly_example.py
new file mode 100644
index 00000000..91797686
--- /dev/null
+++ b/src/arduino/app_bricks/visual_anomaly_detection/examples/visual_anomaly_example.py
@@ -0,0 +1,24 @@
+# SPDX-FileCopyrightText: Copyright (C) ARDUINO SRL (http://www.arduino.cc)
+#
+# SPDX-License-Identifier: MPL-2.0
+
+# EXAMPLE_NAME = "Visual Anomaly Detection"
+from arduino.app_bricks.visual_anomaly_detection import VisualAnomalyDetection
+from arduino.app_utils.image import draw_anomaly_markers
+
+anomaly_detection = VisualAnomalyDetection()
+
+# Image can be provided as bytes or PIL.Image
+with open("path/to/your/image.jpg", "rb") as f:
+    img = f.read()
+
+out = anomaly_detection.detect(img)
+if out and "detection" in out:
+    for i, anomaly in enumerate(out["detection"]):
+        # For every anomaly detected, print its details
+        detected_anomaly = anomaly.get("class_name", None)
+        score = anomaly.get("score", None)
+        bounding_box = anomaly.get("bounding_box_xyxy", None)
+
+# Draw the anomaly markers
+out_image = draw_anomaly_markers(img, out)
diff --git a/src/arduino/app_internal/core/ei.py b/src/arduino/app_internal/core/ei.py
index 15ab6d2f..3cea9c07 100644
--- a/src/arduino/app_internal/core/ei.py
+++ b/src/arduino/app_internal/core/ei.py
@@ -5,8 +5,8 @@
 import requests
 import io
 from arduino.app_internal.core import load_brick_compose_file, resolve_address
-from arduino.app_utils import get_image_bytes, get_image_type, HttpClient
-from arduino.app_utils import Logger
+from 
arduino.app_utils.image import get_image_bytes, get_image_type +from arduino.app_utils import Logger, HttpClient logger = Logger(__name__) diff --git a/src/arduino/app_internal/core/peripherals/__init__.py b/src/arduino/app_internal/core/peripherals/__init__.py new file mode 100644 index 00000000..f2c9968e --- /dev/null +++ b/src/arduino/app_internal/core/peripherals/__init__.py @@ -0,0 +1,8 @@ +# SPDX-FileCopyrightText: Copyright (C) ARDUINO SRL (http://www.arduino.cc) +# +# SPDX-License-Identifier: MPL-2.0 + +from .bpp_codec import BPPCodec +from .bpp_stream_codec import BPPStreamCodec + +__all__ = ["BPPCodec", "BPPStreamCodec"] diff --git a/src/arduino/app_internal/core/peripherals/bpp_codec.py b/src/arduino/app_internal/core/peripherals/bpp_codec.py new file mode 100644 index 00000000..45dd5c08 --- /dev/null +++ b/src/arduino/app_internal/core/peripherals/bpp_codec.py @@ -0,0 +1,253 @@ +# SPDX-FileCopyrightText: Copyright (C) ARDUINO SRL (http://www.arduino.cc) +# +# SPDX-License-Identifier: MPL-2.0 + +import base64 +import time +import hashlib +import hmac +import secrets +import struct +from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305 +from cryptography.exceptions import InvalidTag + +from arduino.app_utils.logger import Logger + +logger = Logger("BPPCodec") + +BPP_VERSION = 0x00 +MODE_NONE = 0x00 +MODE_SIGN = 0x01 +MODE_ENC = 0x02 + +# Big-endian header: [Version:1] [Mode:1] [Timestamp:8] [Random:4] +HEADER_FORMAT = ">BBQL" +HEADER_SIZE = struct.calcsize(HEADER_FORMAT) # 14 bytes +WINDOW_US = 10_000_000 # 10s in µs + + +class ReplayProtection: + """ + Manages the sliding window replay protection and the temporary cache storing + the IVs already seen within the validity window. + """ + + def __init__(self, window_us: int = WINDOW_US): + self.window_us = window_us + self.cache: dict[bytes, int] = {} # IV -> Expiration timestamp + + def check_and_update(self, iv: bytes, timestamp_us: int) -> bool: + """ + Determines if the message is valid by assessing replay attack conditions: + timestamp out of validity window and IV reuse. + """ + now = time.time_ns() // 1_000 + + # Check time window + if abs(now - timestamp_us) > self.window_us: + logger.warning(f"Message outside validity window. Drift: {(now - timestamp_us) / 1000}ms") + return False + + # Check IV reuse + if iv in self.cache: + logger.warning("IV reuse detected") + return False + + # Prune old entries if cache grows too large + if len(self.cache) > 1000: + self._prune(now) + + self.cache[iv] = now + self.window_us + + return True + + def _prune(self, now: int): + # Remove expired entries + expired_keys = [k for k, v in self.cache.items() if now > v] + for k in expired_keys: + del self.cache[k] + + +class BPPCodec: + """ + Binary Peripheral Protocol (BPP) Codec. + Implements a secure container format for peripherals and allows to encode and + decode payloads. + This codec is intended to be used with message-based protocols, i.e. with builtin + message boundaries (e.g., WebSocket). If used with stream-based protocols (e.g., + TCP, BLE, UART), it must be wrapped in BPPStreamCodec. + + The protocol supports three security modes: + - Mode 0: No Security; + - Mode 1: HMAC-SHA256 Signing, useful for authentication and data integrity; + - Mode 2: ChaCha20-Poly1305 Encryption and Signing, providing confidentiality, + authentication and data integrity. 
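+
+    Example:
+        An illustrative sketch (assumes both peers share the same pre-shared secret):
+
+            codec = BPPCodec(secret="my-secret", enable_encryption=True)
+            packet = codec.encode(b"hello")
+            payload = codec.decode(packet)  # b"hello", or None if invalid or replayed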
+
+    The binary format is as follows:
+
+    [Version (1)] [Mode (1)] [Timestamp (8)] [Random (4)] [Payload (Var)] [AuthTag/Sig (16/32)]
+
+    - Version: Protocol version (currently 0x00).
+    - Mode: Security mode (0x00: None, 0x01: HMAC-SHA256, 0x02: ChaCha20-Poly1305).
+    - Timestamp: Microsecond-precision timestamp (Unix epoch).
+    - Random: 32-bit random value for uniqueness.
+    - Payload: Actual data being transmitted.
+    - AuthTag/Sig: HMAC signature (32 bytes for Mode 1) or AuthTag (16 bytes for Mode 2).
+
+    Text-safe encoding/decoding via Base64 is also provided.
+    """
+
+    def __init__(self, secret: str = "", enable_encryption: bool = False):
+        """
+        Initialize codec.
+
+        Args:
+            secret: Pre-shared secret. Default: empty (no security).
+            enable_encryption: If True, uses ChaCha20-Poly1305. If False, uses HMAC-SHA256 if
+                secret is provided. Default: False.
+        """
+        self.secret = secret.encode() if secret else b""
+        self.enable_encryption = enable_encryption and bool(secret)
+        self.cc_cipher = None
+
+        if self.enable_encryption:
+            # Derive 32-byte key for ChaCha20
+            key = hashlib.sha256(self.secret).digest()
+            self.cc_cipher = ChaCha20Poly1305(key)
+
+        self.replay_protection = ReplayProtection()
+
+    def encode(self, data: bytes) -> bytes:
+        """
+        Packs data into a BPP message and returns its bytes.
+
+        Args:
+            data: The payload to encode.
+        Returns:
+            The complete BPP message (bytes).
+        """
+        # Assemble the header
+        mode = MODE_ENC if self.enable_encryption else (MODE_SIGN if self.secret else MODE_NONE)
+        timestamp_us = time.time_ns() // 1_000
+        random_val = secrets.randbits(32)
+        header = struct.pack(HEADER_FORMAT, BPP_VERSION, mode, timestamp_us, random_val)
+
+        if mode == MODE_ENC and self.cc_cipher:
+            # Encrypt with ChaCha20-Poly1305, use header as AAD
+            # Note: cryptography lib appends the 16-byte Poly1305 AuthTag automatically
+            iv = header[2:]  # Last 12 bytes of header (Timestamp + Random)
+            encrypted_payload = self.cc_cipher.encrypt(iv, data, header)
+            return header + encrypted_payload
+
+        elif mode == MODE_SIGN and self.secret:
+            # HMAC Signature
+            msg_to_sign = header + data
+            signature = hmac.new(self.secret, msg_to_sign, hashlib.sha256).digest()
+            return header + data + signature
+
+        else:
+            # No Security
+            return header + data
+
+    def decode(self, message: bytes) -> bytes | None:
+        """
+        Unpacks a BPP message and returns its payload.
+
+        Args:
+            message: The complete BPP message to decode.
+        Returns:
+            The decoded payload (bytes) if valid, else None.
+        """
+        if len(message) < HEADER_SIZE:
+            logger.warning("Message too short for header")
+            return None
+
+        try:
+            ver, mode, timestamp_us, random_val = struct.unpack(HEADER_FORMAT, message[:HEADER_SIZE])
+        except struct.error:
+            logger.warning("Header parsing failed")
+            return None
+
+        if ver != BPP_VERSION:
+            logger.warning(f"Unsupported version {ver}")
+            return None
+
+        # Check expected minimum size
+        footer_size = 16 if mode == MODE_ENC else (32 if mode == MODE_SIGN else 0)
+        min_size = HEADER_SIZE + footer_size
+        if len(message) < min_size:
+            logger.warning("Message too short (truncated)")
+            return None
+
+        # Check for downgrade attacks
+        if self.enable_encryption:
+            if mode != MODE_ENC:
+                logger.warning(f"Security mode mismatch: expected Mode 2 (encrypt), but received Mode {mode}.")
+                return None
+        elif self.secret:
+            if mode != MODE_SIGN:
+                logger.warning(f"Security mode mismatch: expected Mode 1 (sign), but received Mode {mode}.")
+                return None
+        else:
+            if mode != MODE_NONE:
+                logger.warning(f"Security mode mismatch: expected Mode 0 (none), but received Mode {mode}.")
+                return None
+
+        # Check for replay attacks
+        replay_id = message[2:HEADER_SIZE]  # Timestamp (8) + Random (4)
+        if not self.replay_protection.check_and_update(replay_id, timestamp_us):
+            return None
+
+        header_bytes = message[:HEADER_SIZE]
+
+        # Decrypt/verify
+        try:
+            if mode == MODE_ENC:
+                iv = replay_id
+                ciphertext_with_tag = message[HEADER_SIZE:]
+                return self.cc_cipher.decrypt(iv, ciphertext_with_tag, header_bytes)
+
+            elif mode == MODE_SIGN:
+                if len(message) < HEADER_SIZE + 32:
+                    return None
+
+                payload = message[HEADER_SIZE:-32]
+                received_sig = message[-32:]
+
+                msg_to_verify = header_bytes + payload
+                expected_sig = hmac.new(self.secret, msg_to_verify, hashlib.sha256).digest()
+
+                if not hmac.compare_digest(received_sig, expected_sig):
+                    logger.warning("HMAC verification failed")
+                    return None
+
+                return payload
+
+            elif mode == MODE_NONE:
+                return message[HEADER_SIZE:]
+
+        except InvalidTag:
+            logger.warning("Decryption failed: encryption key or data integrity issue")
+            return None
+        except Exception as e:
+            logger.error(f"Unknown error while decoding: {e} ({type(e)})")
+            return None
+
+    def encode_text(self, data: bytes) -> str:
+        """
+        Encodes a BPP packet to a text-safe Base64 string.
+        """
+        binary_packet = self.encode(data)
+        return base64.b64encode(binary_packet).decode("ascii")
+
+    def decode_text(self, b64_str: str) -> bytes | None:
+        """
+        Decodes a BPP packet from a text-safe Base64 string.
+        """
+        try:
+            binary_packet = base64.b64decode(b64_str)
+            return self.decode(binary_packet)
+
+        except Exception as e:
+            logger.warning(f"Text decode failed: {e}")
+            return None
diff --git a/src/arduino/app_internal/core/peripherals/bpp_stream_codec.py b/src/arduino/app_internal/core/peripherals/bpp_stream_codec.py
new file mode 100644
index 00000000..b8b5d5a1
--- /dev/null
+++ b/src/arduino/app_internal/core/peripherals/bpp_stream_codec.py
@@ -0,0 +1,93 @@
+# SPDX-FileCopyrightText: Copyright (C) ARDUINO SRL (http://www.arduino.cc)
+#
+# SPDX-License-Identifier: MPL-2.0
+
+import struct
+from typing import Iterator
+
+from .bpp_codec import BPPCodec
+
+MAGIC = 0xAA
+HEADER_FORMAT = ">BIB"
+HEADER_SIZE = struct.calcsize(HEADER_FORMAT)  # 6 bytes
+
+
+class BPPStreamCodec:
+    """
+    Wraps a BPPCodec to provide support for stream-based protocols (e.g. TCP,
+    BLE, UART).
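+
+    Example:
+        An illustrative sketch (assumes both ends share the same BPPCodec configuration):
+
+            stream = BPPStreamCodec(BPPCodec())
+            frame = stream.encode(b"ping")
+            # Chunks may arrive with arbitrary boundaries; payloads are yielded once complete
+            received = list(stream.decode(frame[:3])) + list(stream.decode(frame[3:]))
+            assert received == [b"ping"]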
+
+    The binary format is as follows:
+
+    [Magic(1)] [Length(4)] [HeaderCRC(1)] [BPP Packet]
+
+    - Magic Byte: 0xAA, marks the start of a BPP frame.
+    - Length: 4-byte big-endian unsigned int indicating the length of the BPP packet.
+    - HeaderCRC: Simple checksum over the Length and Magic byte for header integrity.
+    - BPP Packet: The actual BPP-encoded packet as per BPPCodec.
+    """
+
+    def __init__(self, codec: BPPCodec):
+        self.codec = codec
+        self._buffer = bytearray()
+
+    def encode(self, data: bytes) -> bytes:
+        bpp_packet = self.codec.encode(data)
+        length = len(bpp_packet)
+        checksum = self._calc_header_checksum(length)
+        header = struct.pack(HEADER_FORMAT, MAGIC, length, checksum)
+        return header + bpp_packet
+
+    def decode(self, chunk: bytes = b"") -> Iterator[bytes]:
+        """
+        Ingests a stream chunk and yields all fully decoded BPP payloads found.
+
+        Yields:
+            Decoded payloads (bytes)
+        """
+        if chunk:
+            self._buffer.extend(chunk)
+
+        while True:
+            if not self._buffer:
+                break
+
+            # Look for the Magic byte
+            if self._buffer[0] != MAGIC:
+                try:
+                    idx = self._buffer.index(MAGIC)
+                    del self._buffer[:idx]
+                except ValueError:
+                    self._buffer.clear()
+                    break
+
+            # Do we have a full header?
+            if len(self._buffer) < HEADER_SIZE:
+                break  # Need more data, wait for next chunk
+
+            magic, length, checksum = struct.unpack(HEADER_FORMAT, self._buffer[:HEADER_SIZE])
+            if self._calc_header_checksum(length) != checksum:
+                del self._buffer[0]
+                continue
+
+            total_frame_size = HEADER_SIZE + length
+
+            # Do we have the full frame?
+            if len(self._buffer) < total_frame_size:
+                break  # Need more data, wait for next chunk
+
+            bpp_raw = self._buffer[HEADER_SIZE:total_frame_size]
+            del self._buffer[:total_frame_size]
+
+            # Decode BPP payload and yield if valid
+            payload = self.codec.decode(bytes(bpp_raw))
+            if payload is not None:
+                yield payload
+
+    def _calc_header_checksum(self, length: int) -> int:
+        """Calculates a simple checksum for header integrity verification."""
+        b0 = (length >> 24) & 0xFF
+        b1 = (length >> 16) & 0xFF
+        b2 = (length >> 8) & 0xFF
+        b3 = length & 0xFF
+        return (MAGIC ^ b0 ^ b1 ^ b2 ^ b3) & 0xFF
diff --git a/src/arduino/app_internal/pipeline/pipeline.py b/src/arduino/app_internal/pipeline/pipeline.py
index c265b4b6..aa4217f3 100644
--- a/src/arduino/app_internal/pipeline/pipeline.py
+++ b/src/arduino/app_internal/pipeline/pipeline.py
@@ -177,11 +177,13 @@ def _run_loop(self, loop_ready_event: threading.Event):
 
             self._loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True))
             self._loop.run_until_complete(self._loop.shutdown_asyncgens())
-            self._loop.close()
-            logger.debug("Internal event loop stopped.")
         except Exception as e:
             logger.exception(f"Error during event loop cleanup: {e}")
-        self._loop = None
+        finally:
+            if self._loop and not self._loop.is_closed():
+                self._loop.close()
+            self._loop = None
+            logger.debug("Internal event loop stopped.")
 
     async def _async_run_pipeline(self):
         """The main async logic using Adapters."""
diff --git a/src/arduino/app_peripherals/camera/README.md b/src/arduino/app_peripherals/camera/README.md
new file mode 100644
index 00000000..a6512a2d
--- /dev/null
+++ b/src/arduino/app_peripherals/camera/README.md
@@ -0,0 +1,149 @@
+# Camera
+
+The `Camera` peripheral provides a unified abstraction for capturing images from different camera types and protocols.
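+
+For example, the same API covers all supported source types (illustrative values):
+
+```python
+from arduino.app_peripherals.camera import Camera
+
+local_cam = Camera(0)                               # V4L/USB camera index
+ip_cam = Camera("rtsp://192.168.1.100:554/stream")  # IP camera stream URL
+ws_cam = Camera("ws://0.0.0.0:8080")                # WebSocket camera server
+```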
+
+## Features
+
+- **Universal Interface**: Single API for V4L/USB, IP cameras, and WebSocket cameras
+- **Automatic Detection**: Selects appropriate camera implementation based on source
+- **Multiple Protocols**: Supports V4L, RTSP, HTTP/MJPEG, and WebSocket streams
+- **Thread-Safe**: Safe concurrent access with proper locking
+- **Context Manager**: Automatic resource management
+
+## Quick Start
+
+Instantiate the default camera:
+```python
+from arduino.app_peripherals.camera import Camera
+
+# Default camera (V4L camera at index 0)
+camera = Camera()
+```
+
+The camera needs to be started and stopped explicitly:
+
+```python
+# Specify camera and configuration
+camera = Camera(0, resolution=(640, 480), fps=15)
+camera.start()
+
+image = camera.capture()
+
+camera.stop()
+```
+
+Or you can use it as a context manager to do that automatically:
+```python
+with Camera(source, **options) as camera:
+    frame = camera.capture()
+    if frame is not None:
+        print(f"Captured frame with shape: {frame.shape}")
+    # Camera automatically stopped when exiting
+```
+
+## Frame Adjustments
+
+The `adjustments` parameter allows you to apply custom transformations to captured frames. This parameter accepts a callable that takes a numpy array (the frame) and returns a modified numpy array. It's also possible to build adjustment pipelines by concatenating these functions with the pipe (|) operator.
+
+```python
+import cv2
+from arduino.app_peripherals.camera import Camera
+from arduino.app_utils.image import greyscaled
+from arduino.app_utils.image import PipeableFunction  # assumed location of the pipeline wrapper
+
+
+def blurred():
+    def apply_blur(frame):
+        return cv2.GaussianBlur(frame, (15, 15), 0)
+    return PipeableFunction(apply_blur)
+
+# Using adjustments with Camera
+with Camera(0, adjustments=greyscaled) as camera:
+    frame = camera.capture()
+    # frame is now grayscale
+
+# Or with multiple transformations
+with Camera(0, adjustments=greyscaled | blurred()) as camera:
+    frame = camera.capture()
+    # frame is now greyscaled and blurred
+```
+
+See the arduino.app_utils.image module for more supported adjustments.
+
+## Camera Types
+The Camera class provides automatic camera type detection based on the format of its source argument. Keyword arguments are propagated to the underlying implementation.
+
+Note: Camera's constructor arguments (except those in its signature) must be passed as keyword arguments so they can be forwarded correctly to the specific camera implementations.
+
+The underlying camera implementations can also be instantiated explicitly (V4LCamera, IPCamera and WebSocketCamera), if needed.
+
+### V4L Cameras
+For local USB cameras and V4L-compatible devices.
+
+**Features:**
+- Supports cameras compatible with the Video4Linux2 drivers
+
+```python
+camera = Camera(0)  # Camera index
+camera = Camera("/dev/video0")  # Device path
+camera = V4LCamera(0)
+```
+
+### IP Cameras
+For network cameras supporting RTSP (Real-Time Streaming Protocol) and HLS (HTTP Live Streaming).
+
+**Features:**
+- Supports capturing RTSP and HLS streams
+- Authentication support
+- Automatic reconnection
+
+```python
+camera = Camera("rtsp://admin:secret@192.168.1.100/stream")
+camera = Camera("http://camera.local/stream",
+                username="admin", password="secret")
+camera = IPCamera("http://camera.local/stream",
+                  username="admin", password="secret")
+```
+
+### WebSocket Cameras
+For hosting a WebSocket server that receives frames from a single client at a time.
+
+**Features:**
+- **Single client limitation**: Only one client can connect at a time
+- Stream data from any client with WebSockets support
+- Base64, binary, and JSON frame formats
+- Supports 8-bit images (e.g. JPEG, PNG 8-bit)
+
+```python
+camera = Camera("ws://0.0.0.0:8080", timeout=5)
+camera = WebSocketCamera("0.0.0.0", 8080, timeout=5)
+```
+
+Client implementation example:
+```python
+import time
+import base64
+import cv2
+import websockets.sync.client as wsclient
+import websockets.exceptions as wsexc
+
+
+# Open camera
+camera = cv2.VideoCapture(0)
+with wsclient.connect("ws://<board-ip>:8080") as websocket:  # replace <board-ip> with the server host
+    while True:
+        time.sleep(1.0 / 15.0)  # 15 FPS
+        ret, frame = camera.read()
+        if ret:
+            # Compress frame to JPEG
+            _, buffer = cv2.imencode('.jpg', frame)
+            # Convert to base64
+            jpeg_b64 = base64.b64encode(buffer).decode('utf-8')
+            try:
+                websocket.send(jpeg_b64)
+            except wsexc.ConnectionClosed:
+                break
+```
+
+## Migration from Legacy Camera
+
+The new Camera abstraction is backward compatible with the legacy USBCamera implementation. Existing code using the old API will continue to work, but will run on the new Camera backend. New code should use the new abstraction for better flexibility and features.
diff --git a/src/arduino/app_peripherals/camera/__init__.py b/src/arduino/app_peripherals/camera/__init__.py
new file mode 100644
index 00000000..5f09a8a2
--- /dev/null
+++ b/src/arduino/app_peripherals/camera/__init__.py
@@ -0,0 +1,23 @@
+# SPDX-FileCopyrightText: Copyright (C) ARDUINO SRL (http://www.arduino.cc)
+#
+# SPDX-License-Identifier: MPL-2.0
+
+from .camera import Camera
+from .base_camera import BaseCamera
+from .v4l_camera import V4LCamera
+from .ip_camera import IPCamera
+from .websocket_camera import WebSocketCamera
+from .errors import *
+
+__all__ = [
+    "Camera",
+    "BaseCamera",
+    "V4LCamera",
+    "IPCamera",
+    "WebSocketCamera",
+    "CameraError",
+    "CameraConfigError",
+    "CameraOpenError",
+    "CameraReadError",
+    "CameraTransformError",
+]
diff --git a/src/arduino/app_peripherals/camera/base_camera.py b/src/arduino/app_peripherals/camera/base_camera.py
new file mode 100644
index 00000000..22b997b9
--- /dev/null
+++ b/src/arduino/app_peripherals/camera/base_camera.py
@@ -0,0 +1,423 @@
+# SPDX-FileCopyrightText: Copyright (C) ARDUINO SRL (http://www.arduino.cc)
+#
+# SPDX-License-Identifier: MPL-2.0
+
+import threading
+import time
+from abc import ABC, abstractmethod
+from concurrent.futures import ThreadPoolExecutor
+from typing import Literal, Optional, Callable
+import numpy as np
+
+from arduino.app_utils import Logger
+
+from .errors import CameraOpenError, CameraReadError, CameraTransformError
+
+logger = Logger("Camera")
+
+
+class BaseCamera(ABC):
+    """
+    Abstract base class for camera implementations.
+
+    This class defines the common interface that all camera implementations must follow,
+    providing a unified API regardless of the underlying camera protocol or type.
+    """
+
+    def __init__(
+        self,
+        resolution: tuple[int, int] = (640, 480),
+        fps: int = 10,
+        adjustments: Callable[[np.ndarray], np.ndarray] | None = None,
+        auto_reconnect: bool = True,
+    ):
+        """
+        Initialize the camera base.
+
+        Args:
+            resolution (tuple, optional): Resolution as (width, height). Default: (640, 480).
+            fps (int): Frames per second to capture from the camera.
+            adjustments (callable, optional): Function or function pipeline to adjust frames that takes
+                a numpy array and returns a numpy array. Default: None
+            auto_reconnect (bool, optional): Enable automatic reconnection on failure. Default: True.
+        """
+        self.resolution = resolution
+        if fps <= 0:
+            raise ValueError("FPS must be a positive integer")
+        self.fps = fps
+        self.adjustments = adjustments
+        self.logger = logger  # This will be overridden by subclasses if needed
+        self.name = self.__class__.__name__  # This will be overridden by subclasses if needed
+        self._status: Literal["disconnected", "connected", "streaming", "paused"] = "disconnected"
+
+        self._camera_lock = threading.Lock()
+        self._is_started = False
+        self._last_capture_time = time.monotonic()
+        self._desired_interval = 1.0 / fps if fps > 0 else 0
+
+        # Auto-reconnection parameters
+        self.auto_reconnect = auto_reconnect
+        self.auto_reconnect_delay = 1.0
+        self.first_connection_max_retries = 10
+
+        # Stream interruption detection
+        self._consecutive_none_frames = 0
+
+        # Event handling
+        self._on_status_changed_cb: Callable[[str, dict], None] | None = None
+        self._event_executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="CameraEvent")
+
+    @property
+    def status(self) -> Literal["disconnected", "connected", "streaming", "paused"]:
+        """Read-only property for camera status."""
+        return self._status
+
+    @property
+    def _none_frame_threshold(self) -> int:
+        """Heuristic: 750ms of empty frames based on current fps."""
+        return int(0.75 * self.fps) if self.fps > 0 else 10
+
+    def start(self) -> None:
+        """
+        Start the camera capture with retries, if enabled.
+
+        Raises:
+            CameraOpenError: If the camera fails to start after the retries.
+            Exception: If the underlying implementation fails to start the camera.
+        """
+        with self._camera_lock:
+            self.logger.info("Starting camera...")
+
+            attempt = 0
+            while not self.is_started():
+                try:
+                    self._open_camera()
+                    self._is_started = True
+                    self._last_capture_time = time.monotonic()
+                    self.logger.info(f"Successfully started {self.name}")
+                except CameraOpenError as e:  # We consider this a fatal error so we don't retry
+                    self.logger.error(f"Fatal error while starting {self.name}: {e}")
+                    raise
+                except Exception as e:
+                    if not self.auto_reconnect:
+                        raise
+                    attempt += 1
+                    if attempt >= self.first_connection_max_retries:
+                        raise CameraOpenError(
+                            f"Failed to start camera {self.name} after {self.first_connection_max_retries} attempts, last error is: {e}"
+                        )
+
+                    delay = min(self.auto_reconnect_delay * (2 ** (attempt - 1)), 60)  # Exponential backoff
+                    self.logger.warning(
+                        f"Failed attempt {attempt}/{self.first_connection_max_retries} at starting camera {self.name}: {e}. "
+                        f"Retrying in {delay:.1f}s..."
+                    )
+                    time.sleep(delay)
+
+    def stop(self) -> None:
+        """Stop the camera and release resources."""
+        with self._camera_lock:
+            if not self.is_started():
+                return
+
+            self.logger.info("Stopping camera...")
+
+            try:
+                self._close_camera()
+                self._event_executor.shutdown()
+                self._is_started = False
+                self.logger.info(f"Successfully stopped {self.name}")
+            except Exception as e:
+                self.logger.warning(f"Failed to stop camera: {e}")
+
+    def capture(self) -> Optional[np.ndarray]:
+        """
+        Capture a frame from the camera, respecting the configured FPS.
+
+        Returns:
+            Numpy array or None if no frame is available.
+
+        Raises:
+            CameraReadError: If the camera is not started.
+            Exception: If the underlying implementation fails to read a frame.
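+
+        Example:
+            A minimal sketch (assumes a started camera):
+
+                frame = camera.capture()  # numpy array, or None if no frame was available
+                if frame is not None:
+                    print(frame.shape)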
+ """ + with self._camera_lock: + if not self.is_started(): + raise CameraReadError(f"Attempted to read from {self.name} before starting it.") + + # Apply FPS throttling + if self._desired_interval > 0: + current_time = time.monotonic() + elapsed = current_time - self._last_capture_time + if elapsed < self._desired_interval: + time.sleep(self._desired_interval - elapsed) + + self._last_capture_time = time.monotonic() + + frame = self._read_frame() + if frame is None: + self._consecutive_none_frames += 1 + if self._consecutive_none_frames >= self._none_frame_threshold: + self._set_status("paused") + return None + + self._set_status("streaming") + + self._consecutive_none_frames = 0 + + if self.adjustments is not None: + try: + frame = self.adjustments(frame) + except Exception as e: + raise CameraTransformError(f"Frame transformation failed ({self.adjustments}): {e}") + + return frame + + def stream(self): + """ + Continuously capture frames from the camera. + + This is a generator that yields frames continuously while the camera is started. + Built on top of capture() for convenience. + + Yields: + np.ndarray: Video frames as numpy arrays. + + Raises: + CameraReadError: If the camera is not started. + """ + if not self.is_started(): + raise CameraReadError(f"Attempted to acquire stream from {self.name} before starting it.") + + while self.is_started(): + frame = self.capture() + if frame is not None: + yield frame + + def record(self, duration) -> np.ndarray: + """ + Record video for a specified duration and return it as a numpy array of raw frames. + + Args: + duration (float): Recording duration in seconds. + + Returns: + np.ndarray: numpy array of raw frames. + + Raises: + CameraReadError: If camera is not started or any read error occurs. + ValueError: If duration is not positive. + MemoryError: If memory allocation for the full recording fails. + """ + if duration <= 0: + raise ValueError("Duration must be positive") + + total_frames = int(self.fps * duration) + + # Get shape and dtype from first frame + first_frame = self.capture() + if first_frame is None: + raise CameraOpenError("Failed to inspect the video stream for metadata.") + frame_shape = first_frame.shape + frame_dtype = first_frame.dtype + + try: + frames = np.zeros((total_frames, *frame_shape), dtype=frame_dtype) + except Exception as e: + raise MemoryError(f"Could not allocate memory for {total_frames} frames: {e}") + + count = 1 + frames[0] = first_frame + while count < total_frames: + frame = self.capture() + if frame is not None: + frames[count] = frame + count += 1 + + if not frames.any(): + raise CameraReadError("No frames captured during recording.") + + return frames[:count] + + def record_avi(self, duration) -> np.ndarray: + """ + Record video for a specified duration and return as MJPEG in AVI container. + + Args: + duration (float): Recording duration in seconds. + + Returns: + np.ndarray: AVI file containing MJPEG video. + + Raises: + CameraReadError: If camera is not started or any read error occurs. + ValueError: If duration is not positive. + MemoryError: If memory allocation for the full recording fails. 
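+
+        Example:
+            A minimal sketch (assumes a started camera):
+
+                avi = camera.record_avi(5.0)  # 5 seconds of MJPEG video in an AVI container
+                avi.tofile("clip.avi")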
+ """ + if duration <= 0: + raise ValueError("Duration must be positive") + + import os + import tempfile + import cv2 + + total_frames = int(self.fps * duration) + + # Get width and height from first frame + first_frame = self.capture() + if first_frame is None: + raise CameraOpenError("Failed to inspect the video stream for metadata.") + height, width = first_frame.shape[:2] + + # Write MJPEG AVI to a temp file + with tempfile.NamedTemporaryFile(suffix=".avi", delete=False) as tmp: + filename = tmp.name + + avi_data = np.empty(0, dtype=np.uint8) + try: + fourcc = cv2.VideoWriter.fourcc(*"MJPG") + out = cv2.VideoWriter(filename, fourcc, self.fps, (width, height)) + + frame = first_frame + for i in range(total_frames): + if frame is not None: + if frame.dtype != np.uint8: + frame = _to_uint8(frame) + out.write(frame) + + if i < total_frames - 1: + frame = self.capture() + + out.release() + with open(filename, "rb") as f: + avi_data = f.read() + finally: + os.remove(filename) + + return np.frombuffer(avi_data, dtype=np.uint8) + + def is_started(self) -> bool: + """Check if the camera has been started.""" + return self._is_started + + def on_status_changed(self, callback: Callable[[str, dict], None] | None): + """Registers or removes a callback to be triggered on camera lifecycle events. + + When a camera status changes, the provided callback function will be invoked. + If None is provided, the callback will be removed. + + Args: + callback (Callable[[str, dict], None]): A callback that will be called every time the + camera status changes with the new status and any associated data. The status names + depend on the actual camera implementation being used. Some common events are: + - 'connected': The camera has been reconnected. + - 'disconnected': The camera has been disconnected. + - 'streaming': The stream is streaming. + - 'paused': The stream has been paused and is temporarily unavailable. + callback (None): To unregister the current callback, if any. + + Example: + def on_status(status: str, data: dict): + print(f"Camera is now: {status}") + print(f"Data: {data}") + # Here you can add your code to react to the event + + camera.on_status_changed(on_status) + """ + if callback is None: + self._on_status_changed_cb = None + else: + + def _callback_wrapper(new_status: str, data: dict): + try: + callback(new_status, data) + except Exception as e: + self.logger.error(f"Callback for '{new_status}' status failed with error: {e}") + + self._on_status_changed_cb = _callback_wrapper + + @abstractmethod + def _open_camera(self) -> None: + """ + Open the camera connection. + + Must be implemented by subclasses and status changes should be emitted accordingly. + """ + pass + + @abstractmethod + def _close_camera(self) -> None: + """ + Close the camera connection. + + Must be implemented by subclasses and status changes should be emitted accordingly. + """ + pass + + @abstractmethod + def _read_frame(self) -> Optional[np.ndarray]: + """ + Read a single frame from the camera. + + Must be implemented by subclasses. + """ + pass + + def _set_status(self, new_status: Literal["disconnected", "connected", "streaming", "paused"], data: dict | None = None) -> None: + """ + Updates the current status of the camera and invokes the registered status + changed callback in the background, if any. + + Only allowed states and transitions are considered, other states are ignored. + Allowed states are: + - disconnected + - connected + - streaming + - paused + + Args: + new_status (str): The name of the new status. 
+            data (dict): Additional data associated with the status change.
+        """
+
+        if self.status == new_status:
+            return
+
+        allowed_transitions = {
+            "disconnected": ["connected"],
+            "connected": ["disconnected", "streaming"],
+            "streaming": ["paused", "disconnected"],
+            "paused": ["streaming", "disconnected"],
+        }
+
+        # If the new status is not in the state machine, ignore it
+        if new_status not in allowed_transitions:
+            return
+
+        # Check if new_status is an allowed transition for the current status
+        if new_status in allowed_transitions[self._status]:
+            self._status = new_status
+            if self._on_status_changed_cb is not None:
+                self._event_executor.submit(self._on_status_changed_cb, new_status, data if data is not None else {})
+
+    def __enter__(self):
+        """Context manager entry."""
+        self.start()
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        """Context manager exit."""
+        self.stop()
+
+
+def _to_uint8(frame) -> np.ndarray:
+    """Normalize and convert to uint8."""
+    if np.issubdtype(frame.dtype, np.floating):
+        # We adopt the OpenCV convention: float images are in [0, 1]
+        frame = np.clip(frame * 255, 0, 255)
+
+    elif np.issubdtype(frame.dtype, np.integer) and frame.dtype != np.uint8:
+        info = np.iinfo(frame.dtype)
+        frame = (frame.astype(np.float32) - info.min) / (info.max - info.min) * 255
+
+    return frame.astype(np.uint8)
diff --git a/src/arduino/app_peripherals/camera/camera.py b/src/arduino/app_peripherals/camera/camera.py
new file mode 100644
index 00000000..6461fc37
--- /dev/null
+++ b/src/arduino/app_peripherals/camera/camera.py
@@ -0,0 +1,121 @@
+# SPDX-FileCopyrightText: Copyright (C) ARDUINO SRL (http://www.arduino.cc)
+#
+# SPDX-License-Identifier: MPL-2.0
+
+from collections.abc import Callable
+from urllib.parse import urlparse
+
+import numpy as np
+
+from .base_camera import BaseCamera
+from .errors import CameraConfigError
+
+
+class Camera:
+    """
+    Unified Camera class that can be configured for different camera types.
+
+    This class serves as both a factory and a wrapper, automatically creating
+    the appropriate camera implementation based on the provided configuration.
+
+    Supports:
+    - V4L cameras (local cameras connected to the system), the default
+    - IP cameras (network-based cameras via RTSP, HLS)
+    - WebSocket cameras (input video streams via a WebSocket client)
+
+    Note: camera-specific constructor arguments (i.e. those not in the signature below)
+    must be passed as keyword arguments so they are forwarded correctly to the
+    specific camera implementations.
+    """
+
+    def __new__(
+        cls,
+        source: str | int = 0,
+        resolution: tuple[int, int] = (640, 480),
+        fps: int = 10,
+        adjustments: Callable[[np.ndarray], np.ndarray] | None = None,
+        **kwargs,
+    ) -> BaseCamera:
+        """Create a camera instance based on the source type.
+
+        Args:
+            source (str | int): Camera source identifier. Supports:
+                - int: V4L camera index (e.g., 0, 1)
+                - str: V4L camera index (e.g., "0", "1") or device path (e.g., "/dev/video0", "/dev/v4l/by-id/...", "/dev/v4l/by-path/...")
+                - str: URL for IP cameras (e.g., "rtsp://...", "http://...")
+                - str: WebSocket URL for input streams (e.g., "ws://0.0.0.0:8080")
+            resolution (tuple, optional): Frame resolution as (width, height).
+                Default: (640, 480)
+            fps (int, optional): Target frames per second. Default: 10
+            adjustments (callable, optional): Function pipeline to adjust frames that takes a
+                numpy array and returns a numpy array.
+                Default: None
+            **kwargs: Camera-specific configuration parameters grouped by type.
+                Note that the V4L device, the IP camera URL and the WebSocket port are
+                all derived from `source` and must not be passed again as keyword arguments.
+                IP Camera Parameters:
+                    username (str, optional): Authentication username
+                    password (str, optional): Authentication password
+                    timeout (float, optional): Connection timeout in seconds. Default: 10.0
+                WebSocket Camera Parameters:
+                    host (str, optional): WebSocket server bind host. Default: "0.0.0.0"
+                    timeout (float, optional): Connection timeout in seconds. Default: 10.0
+                    frame_format (str, optional): Expected frame format ("base64", "binary",
+                        "json"). Default: "base64"
+
+        Returns:
+            BaseCamera: Appropriate camera implementation instance
+
+        Raises:
+            CameraConfigError: If the source type is not supported or parameters are invalid
+            CameraOpenError: If the camera cannot be opened
+
+        Examples:
+            V4L Camera:
+
+            ```python
+            camera = Camera(0, resolution=(640, 480), fps=30)
+            camera = Camera("/dev/video1", fps=15)
+            ```
+
+            IP Camera:
+
+            ```python
+            camera = Camera("rtsp://192.168.1.100:554/stream", username="admin", password="secret", timeout=15.0)
+            camera = Camera("http://192.168.1.100:8080/video.mp4")
+            ```
+
+            WebSocket Camera:
+
+            ```python
+            camera = Camera("ws://0.0.0.0:8080", frame_format="json")
+            camera = Camera("ws://192.168.1.100:8080", timeout=5)
+            ```
+        """
+        if isinstance(source, int) or (isinstance(source, str) and source.isdigit()):
+            # V4L Camera
+            from .v4l_camera import V4LCamera
+
+            return V4LCamera(source, resolution=resolution, fps=fps, adjustments=adjustments, **kwargs)
+        elif isinstance(source, str):
+            parsed = urlparse(source)
+            if parsed.scheme in ["http", "https", "rtsp"]:
+                # IP Camera
+                from .ip_camera import IPCamera
+
+                return IPCamera(source, resolution=resolution, fps=fps, adjustments=adjustments, **kwargs)
+            elif parsed.scheme in ["ws", "wss"]:
+                # WebSocket Camera - the port is taken from the URL, the bind host from kwargs (if any)
+                from .websocket_camera import WebSocketCamera
+
+                port = parsed.port or 8080
+                return WebSocketCamera(port=port, resolution=resolution, fps=fps, adjustments=adjustments, **kwargs)
+            elif source.startswith("/dev/video") or source.startswith("/dev/v4l/by-id/") or source.startswith("/dev/v4l/by-path/"):
+                # V4L device path, by-id, or by-path
+                from .v4l_camera import V4LCamera
+
+                return V4LCamera(source, resolution=resolution, fps=fps, adjustments=adjustments, **kwargs)
+            else:
+                raise CameraConfigError(f"Unsupported camera source: {source}")
+        else:
+            raise CameraConfigError(f"Invalid source type: {type(source)}")
diff --git a/src/arduino/app_peripherals/camera/errors.py b/src/arduino/app_peripherals/camera/errors.py
new file mode 100644
index 00000000..45cf80fb
--- /dev/null
+++ b/src/arduino/app_peripherals/camera/errors.py
@@ -0,0 +1,33 @@
+# SPDX-FileCopyrightText: Copyright (C) ARDUINO SRL (http://www.arduino.cc)
+#
+# SPDX-License-Identifier: MPL-2.0
+
+
+class CameraError(Exception):
+    """Base exception for camera-related errors."""
+
+    pass
+
+
+class CameraOpenError(CameraError):
+    """Exception raised when the camera cannot be opened."""
+
+    pass
+
+
+class CameraReadError(CameraError):
+    """Exception raised when reading from the camera fails."""
+
+    pass
+
+
+class CameraConfigError(CameraError):
+    """Exception raised when the camera configuration is invalid."""
+
+    pass
+
+
+class CameraTransformError(CameraError):
+    """Exception raised when frame transformation fails."""
+
+    pass
diff --git a/src/arduino/app_peripherals/camera/examples/1_initialize.py b/src/arduino/app_peripherals/camera/examples/1_initialize.py
new file mode 100644
index 00000000..4d55cbdd
--- /dev/null
+++ b/src/arduino/app_peripherals/camera/examples/1_initialize.py
@@ -0,0 +1,18 @@
+# SPDX-FileCopyrightText: Copyright (C) ARDUINO SRL (http://www.arduino.cc)
+#
+# SPDX-License-Identifier: MPL-2.0
+
+# EXAMPLE_NAME = "Initialize camera input"
+# EXAMPLE_REQUIRES = "Requires a connected camera"
+from arduino.app_peripherals.camera import Camera, V4LCamera
+
+
+default = Camera()  # Uses default camera (V4L)
+
+# The following two are equivalent
+camera = Camera(2, resolution=(640, 480), fps=15)  # Infers camera type
+v4l = V4LCamera(2, (640, 480), 15)  # Explicitly request a V4L camera
+
+# Note: Camera's constructor arguments (except those in its signature)
+# must be provided in keyword format to forward them correctly to the
+# specific camera implementations.
diff --git a/src/arduino/app_peripherals/camera/examples/2_capture_image.py b/src/arduino/app_peripherals/camera/examples/2_capture_image.py
new file mode 100644
index 00000000..9295573f
--- /dev/null
+++ b/src/arduino/app_peripherals/camera/examples/2_capture_image.py
@@ -0,0 +1,14 @@
+# SPDX-FileCopyrightText: Copyright (C) ARDUINO SRL (http://www.arduino.cc)
+#
+# SPDX-License-Identifier: MPL-2.0
+
+# EXAMPLE_NAME = "Capture an image"
+# EXAMPLE_REQUIRES = "Requires a connected camera"
+import numpy as np
+from arduino.app_peripherals.camera import Camera
+
+
+camera = Camera()
+camera.start()
+image: np.ndarray = camera.capture()
+camera.stop()
diff --git a/src/arduino/app_peripherals/camera/examples/3_capture_video.py b/src/arduino/app_peripherals/camera/examples/3_capture_video.py
new file mode 100644
index 00000000..a5f30206
--- /dev/null
+++ b/src/arduino/app_peripherals/camera/examples/3_capture_video.py
@@ -0,0 +1,27 @@
+# SPDX-FileCopyrightText: Copyright (C) ARDUINO SRL (http://www.arduino.cc)
+#
+# SPDX-License-Identifier: MPL-2.0
+
+# EXAMPLE_NAME = "Capture a video"
+# EXAMPLE_REQUIRES = "Requires a connected camera"
+import time
+import numpy as np
+from arduino.app_peripherals.camera import Camera
+
+
+camera = Camera(fps=15)
+camera.start()
+
+# You can capture a video by capturing frames in a loop
+start_time = time.time()
+while time.time() - start_time < 5:
+    image: np.ndarray = camera.capture()
+    # You can process the image here if needed, e.g. save it
+
+# Or you can achieve the same with a single call
+recording: np.ndarray = camera.record(duration=5)
+
+# Or you can ask for an AVI recording instead
+recording: np.ndarray = camera.record_avi(duration=5)
+
+camera.stop()
diff --git a/src/arduino/app_peripherals/camera/examples/4_capture_hls.py b/src/arduino/app_peripherals/camera/examples/4_capture_hls.py
new file mode 100644
index 00000000..64bf777c
--- /dev/null
+++ b/src/arduino/app_peripherals/camera/examples/4_capture_hls.py
@@ -0,0 +1,23 @@
+# SPDX-FileCopyrightText: Copyright (C) ARDUINO SRL (http://www.arduino.cc)
+#
+# SPDX-License-Identifier: MPL-2.0
+
+# EXAMPLE_NAME = "Capture an HLS (HTTP Live Stream) video"
+import time
+import numpy as np
+from arduino.app_peripherals.camera import Camera
+
+
+# Capture a freely available HLS playlist for testing
+# Note: Public streams can be unreliable and may go offline without notice.
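+# Network sources accept the same resolution/fps options as local cameras,
+# e.g. Camera(url, resolution=(1280, 720), fps=5)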
+url = "https://demo.unified-streaming.com/k8s/features/stable/video/tears-of-steel/tears-of-steel.ism/.m3u8"
+
+camera = Camera(url)
+camera.start()
+
+start_time = time.time()
+while time.time() - start_time < 5:
+    image: np.ndarray = camera.capture()
+    # You can process the image here if needed, e.g. save it
+
+camera.stop()
diff --git a/src/arduino/app_peripherals/camera/examples/5_capture_rtsp.py b/src/arduino/app_peripherals/camera/examples/5_capture_rtsp.py
new file mode 100644
index 00000000..0e71c47f
--- /dev/null
+++ b/src/arduino/app_peripherals/camera/examples/5_capture_rtsp.py
@@ -0,0 +1,23 @@
+# SPDX-FileCopyrightText: Copyright (C) ARDUINO SRL (http://www.arduino.cc)
+#
+# SPDX-License-Identifier: MPL-2.0
+
+# EXAMPLE_NAME = "Capture an RTSP (Real-Time Streaming Protocol) video"
+import time
+import numpy as np
+from arduino.app_peripherals.camera import Camera
+
+
+# Capture a freely available RTSP stream for testing
+# Note: Public streams can be unreliable and may go offline without notice.
+url = "rtsp://170.93.143.139/rtplive/470011e600ef003a004ee33696235daa"
+
+camera = Camera(url)
+camera.start()
+
+start_time = time.time()
+while time.time() - start_time < 5:
+    image: np.ndarray = camera.capture()
+    # You can process the image here if needed, e.g. save it
+
+camera.stop()
diff --git a/src/arduino/app_peripherals/camera/examples/6_capture_websocket.py b/src/arduino/app_peripherals/camera/examples/6_capture_websocket.py
new file mode 100644
index 00000000..724644c0
--- /dev/null
+++ b/src/arduino/app_peripherals/camera/examples/6_capture_websocket.py
@@ -0,0 +1,21 @@
+# SPDX-FileCopyrightText: Copyright (C) ARDUINO SRL (http://www.arduino.cc)
+#
+# SPDX-License-Identifier: MPL-2.0
+
+# EXAMPLE_NAME = "Capture an input WebSocket video stream"
+# EXAMPLE_REQUIRES = "Requires a client streaming video to this device over WebSocket"
+import time
+import numpy as np
+from arduino.app_peripherals.camera import Camera
+
+
+# Start a WebSocket server and capture the video that clients stream to it
+camera = Camera("ws://0.0.0.0:8080", timeout=5)
+camera.start()
+
+start_time = time.time()
+while time.time() - start_time < 5:
+    image: np.ndarray = camera.capture()
+    # You can process the image here if needed, e.g. save it
+
+camera.stop()
diff --git a/src/arduino/app_peripherals/camera/ip_camera.py b/src/arduino/app_peripherals/camera/ip_camera.py
new file mode 100644
index 00000000..091ac80d
--- /dev/null
+++ b/src/arduino/app_peripherals/camera/ip_camera.py
@@ -0,0 +1,171 @@
+# SPDX-FileCopyrightText: Copyright (C) ARDUINO SRL (http://www.arduino.cc)
+#
+# SPDX-License-Identifier: MPL-2.0
+
+import time
+from collections.abc import Callable
+from urllib.parse import urlparse
+
+import cv2
+import numpy as np
+import requests
+
+from arduino.app_utils import Logger
+
+from .base_camera import BaseCamera
+from .errors import CameraConfigError, CameraOpenError, CameraReadError
+
+logger = Logger("IPCamera")
+
+
+class IPCamera(BaseCamera):
+    """
+    IP Camera implementation for network-based cameras.
+
+    Supports RTSP, HTTP, and HTTPS camera streams.
+    Can handle authentication and various streaming protocols.
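+
+    Example:
+        A minimal sketch (URL and credentials are illustrative):
+
+            camera = IPCamera("rtsp://192.168.1.10:554/stream", username="admin", password="secret")
+            camera.start()
+            frame = camera.capture()
+            camera.stop()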
+    """
+
+    def __init__(
+        self,
+        url: str,
+        username: str | None = None,
+        password: str | None = None,
+        timeout: float = 10.0,
+        resolution: tuple[int, int] = (640, 480),
+        fps: int = 10,
+        adjustments: Callable[[np.ndarray], np.ndarray] | None = None,
+        auto_reconnect: bool = True,
+    ):
+        """
+        Initialize IP camera.
+
+        Args:
+            url: Camera stream URL (e.g. rtsp://..., http://..., https://...)
+            username: Optional authentication username
+            password: Optional authentication password
+            timeout: Connection timeout in seconds
+            resolution (tuple, optional): Resolution as (width, height). Default: (640, 480).
+            fps (int): Frames per second to capture from the camera.
+            adjustments (callable, optional): Function or function pipeline to adjust frames that takes
+                a numpy array and returns a numpy array. Default: None
+            auto_reconnect (bool, optional): Enable automatic reconnection on failure. Default: True.
+        """
+        super().__init__(resolution, fps, adjustments, auto_reconnect)
+        self.url = url
+        self.username = username
+        self.password = password
+        self.timeout = timeout
+        self.logger = logger
+
+        self._cap = None
+
+        self._last_reconnection_attempt = 0.0  # Used for auto-reconnection when _read_frame is called
+
+        self._validate_url()
+
+    def _validate_url(self) -> None:
+        """Validate the camera URL format."""
+        try:
+            parsed = urlparse(self.url)
+        except ValueError as e:
+            raise CameraConfigError(f"Invalid URL format: {e}") from e
+        if parsed.scheme not in ["http", "https", "rtsp"]:
+            raise CameraConfigError(f"Unsupported URL scheme: {parsed.scheme}")
+
+    def _open_camera(self) -> None:
+        """Open the IP camera connection."""
+        url = self._build_url()
+
+        # Test connectivity first for HTTP streams
+        if self.url.startswith(("http://", "https://")):
+            self._test_http_connectivity()
+
+        try:
+            self._cap = cv2.VideoCapture(url)
+            if not self._cap.isOpened():
+                raise CameraOpenError(f"Failed to open IP camera at {self.url}")
+
+            self._cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)  # Reduce buffer to minimize latency
+
+            # Test by reading one frame
+            ret, frame = self._cap.read()
+            if not ret or frame is None:
+                raise CameraOpenError(f"Read test failed for IP camera at {self.url}")
+
+            self._set_status("connected", {"camera_url": self.url})
+
+        except Exception as e:
+            self.logger.error(f"Unexpected error opening IP camera at {self.url}: {e}")
+            if self._cap is not None:
+                self._cap.release()
+                self._cap = None
+            raise
+
+    def _build_url(self) -> str:
+        """Build the URL with authentication, if credentials are provided."""
+        # If no username or password provided as parameters, return original URL
+        if not self.username or not self.password:
+            return self.url
+
+        parsed = urlparse(self.url)
+
+        # Override any URL credentials with the provided ones
+        auth_netloc = f"{self.username}:{self.password}@{parsed.hostname}"
+        if parsed.port:
+            auth_netloc += f":{parsed.port}"
+
+        return f"{parsed.scheme}://{auth_netloc}{parsed.path}"
+
+    def _test_http_connectivity(self) -> None:
+        """Test HTTP/HTTPS camera connectivity."""
+        try:
+            auth = None
+            if self.username and self.password:
+                auth = (self.username, self.password)
+
+            response = requests.head(self.url, auth=auth, timeout=self.timeout, allow_redirects=True)
+
+            if response.status_code not in [200, 206]:  # 206 for partial content
+                raise CameraOpenError(f"HTTP camera returned status {response.status_code}: {self.url}")
+
+        except requests.RequestException as e:
+            raise CameraOpenError(f"Cannot connect to HTTP camera {self.url}: {e}") from e
+
+    def _close_camera(self) -> None:
+        """Close the IP camera connection."""
+        if self._cap is not None:
+            self._cap.release()
+            self._cap = None
+            self._set_status("disconnected", {"camera_url": self.url})
+
+    def _read_frame(self) -> np.ndarray | None:
+        """Read a frame from the IP camera with automatic reconnection."""
+        try:
+            if self._cap is None:
+                if not self.auto_reconnect:
+                    return None
+
+                # Prevent spamming connection attempts
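+                # Fixed-delay throttle between reconnection attempts; auto_reconnect_delay
+                # is assumed to come from BaseCamera's auto-reconnect configuration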
+                current_time = time.monotonic()
+                elapsed = current_time - self._last_reconnection_attempt
+                if elapsed < self.auto_reconnect_delay:
+                    time.sleep(self.auto_reconnect_delay - elapsed)
+                self._last_reconnection_attempt = current_time
+
+                self._open_camera()
+                self.logger.info(f"Successfully reconnected to IP camera at {self.url}")
+
+            ret, frame = self._cap.read()
+            if not ret or frame is None or not self._cap.isOpened():
+                raise CameraReadError("Invalid frame returned")
+
+            return frame
+
+        except Exception as e:
+            # Covers CameraOpenError, CameraReadError and any unexpected error
+            self.logger.error(
+                f"Failed to read from IP camera at {self.url}: {e}."
+                f"{' Retrying...' if self.auto_reconnect else ' Auto-reconnect is disabled, please restart the app.'}"
+            )
+            self._close_camera()  # Will reconnect on next call
+            return None
diff --git a/src/arduino/app_peripherals/camera/v4l_camera.py b/src/arduino/app_peripherals/camera/v4l_camera.py
new file mode 100644
index 00000000..d6d3a951
--- /dev/null
+++ b/src/arduino/app_peripherals/camera/v4l_camera.py
@@ -0,0 +1,241 @@
+# SPDX-FileCopyrightText: Copyright (C) ARDUINO SRL (http://www.arduino.cc)
+#
+# SPDX-License-Identifier: MPL-2.0
+
+import os
+import time
+from typing import Optional
+import cv2
+import numpy as np
+from collections.abc import Callable
+
+from arduino.app_utils import Logger
+
+from .base_camera import BaseCamera
+from .errors import CameraOpenError, CameraReadError
+
+logger = Logger("V4LCamera")
+
+
+class V4LCamera(BaseCamera):
+    """
+    V4L (Video4Linux) camera implementation for physically connected cameras.
+
+    This class handles USB cameras and other V4L-compatible devices on Linux systems.
+    """
+
+    def __init__(
+        self,
+        device: str | int = 0,
+        resolution: tuple[int, int] = (640, 480),
+        fps: int = 10,
+        adjustments: Optional[Callable[[np.ndarray], np.ndarray]] = None,
+        auto_reconnect: bool = True,
+    ):
+        """
+        Initialize V4L camera.
+
+        Args:
+            device: Camera identifier - can be:
+                - int: Camera index (e.g., 0, 1)
+                - str: Camera index as string or device path
+            resolution (tuple, optional): Resolution as (width, height). Default: (640, 480).
+            fps (int, optional): Frames per second to capture from the camera. Default: 10.
+            adjustments (callable, optional): Function or function pipeline to adjust frames that takes
+                a numpy array and returns a numpy array. Default: None
+            auto_reconnect (bool, optional): Enable automatic reconnection on failure. Default: True.
+        """
+        super().__init__(resolution, fps, adjustments, auto_reconnect)
+
+        self.v4l_path = self._resolve_stable_path(device)
+        self.name = self._resolve_name(self.v4l_path)  # Override parent name with a human-readable name
+        self.logger = logger
+
+        self._cap = None
+
+        self._last_reconnection_attempt = 0.0  # Used for auto-reconnection when _read_frame is called
+
+    def _resolve_stable_path(self, device: str | int) -> str:
+        """
+        Resolve a camera identifier to a link that is stable across reconnections.
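+
+        Note: plain /dev/videoN indices can change across reboots and replugs; whenever
+        possible the identifier is resolved to a persistent /dev/v4l/by-id/ symlink.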
+ + Args: + device: Camera identifier + + Returns: + str: stable path to the camera device + + Raises: + CameraOpenError: If camera cannot be resolved + """ + if isinstance(device, str) and device.startswith("/dev/v4l/by-id"): + # Already a stable link + return device + elif isinstance(device, str) and device.startswith("/dev/v4l/by-path"): + # A stable link, but not the one we want, resolve to by-id + if not os.path.exists(device): + raise CameraOpenError(f"Device path {device} does not exist") + resolved_path = os.path.realpath(device) + video_path = resolved_path + elif isinstance(device, int) or (isinstance(device, str) and device.isdigit()): + # Treat as /dev/video + dev_num = int(device) + video_path = f"/dev/video{dev_num}" + elif isinstance(device, str) and device.startswith("/dev/video"): + # A device node path + video_path = device + else: + raise CameraOpenError(f"Unrecognized device identifier: {device}") + + # Now map /dev/videoX to a stable link in /dev/v4l/by-id + by_id_dir = "/dev/v4l/by-id/" + if not os.path.exists(by_id_dir): + raise CameraOpenError(f"Directory '{by_id_dir}' not found.") + + try: + for entry in os.listdir(by_id_dir): + full_path = os.path.join(by_id_dir, entry) + if os.path.islink(full_path): + target = os.path.realpath(full_path) + if target == video_path: + return full_path + except Exception as e: + raise CameraOpenError(f"Error resolving stable link: {e}") + + raise CameraOpenError(f"No stable link found for device {device} (resolved as {video_path})") + + def _resolve_name(self, stable_path: str) -> str: + """ + Resolve a human-readable name for the camera whose stable path is provided + by looking at /sys/class/video4linux/