diff --git a/samples/aruco_stream.py b/samples/aruco_stream.py new file mode 100644 index 0000000..90778e1 --- /dev/null +++ b/samples/aruco_stream.py @@ -0,0 +1,49 @@ +import time + +from src.modules.emu import Emu +from src.modules.imaging.detector import ArucoDetector +from src.modules.imaging.camera import DebugCamera +from src.modules.imaging.location import DebugLocationProvider +from src.modules.imaging.analysis import ImageAnalysisDelegate +from src.modules.imaging.aruco_stream import ArucoEmuStreamer +from src.modules.imaging.video_emu_stream import SharedFrameCamera, VideoEmuStreamer + + +def main(): + emu = Emu("tmp") + emu.start_comms() + time.sleep(1) + + # Base camera — swap DebugCamera for RPiCamera/OakdCamera on real hardware + base_camera = DebugCamera("res/test-image.jpeg") + + # SharedFrameCamera captures at 15fps; both video stream and analysis read from it + shared_cam = SharedFrameCamera(base_camera, fps=15) + shared_cam.start() + + # Video stream → EMU /video endpoint (browser: ) + video_streamer = VideoEmuStreamer(emu, shared_cam, fps=15, quality=70) + video_streamer.start() + + # ArUco detection pipeline reads latest frame from shared camera + detector = ArucoDetector() + location_provider = DebugLocationProvider() + + analysis = ImageAnalysisDelegate(detector, shared_cam, location_provider) + aruco_streamer = ArucoEmuStreamer(emu, "tmp") + analysis.subscribe(aruco_streamer.on_detection) + analysis.start() + + try: + while True: + time.sleep(1) + except KeyboardInterrupt: + pass + finally: + analysis.stop() + video_streamer.stop() + shared_cam.stop() + + +if __name__ == "__main__": + main() diff --git a/src/modules/emu/emu.py b/src/modules/emu/emu.py index b808e72..6531a8d 100644 --- a/src/modules/emu/emu.py +++ b/src/modules/emu/emu.py @@ -2,9 +2,8 @@ import queue import asyncio -from typing import Callable, List +from typing import Callable, List, Optional -from aiohttp import web from aiohttp import web import aiohttp @@ -28,6 +27,9 @@ def __init__(self, img_dir: str): self._on_connect = lambda: None self._is_connected = False + self._latest_video_frame: Optional[bytes] = None + self._video_lock = threading.Lock() + def start_comms(self): self._comms_thread = threading.Thread(target=self._start_comms_loop, daemon=True) self._comms_thread.start() @@ -38,9 +40,7 @@ def send_image(self, path: str): the path sent should be accessable from within self.img_dir so it can be accessed through /images/{filename} """ - print(path) img_url = "/images/" + path - print(img_url) content = { "type": "img", "value": img_url @@ -66,6 +66,14 @@ def send_msg(self, message: str): """ self._send_queue.put(message) + def send_video_frame(self, jpeg_bytes: bytes): + """ + Update the latest video frame served at /video (MJPEG stream). + Call this from a background thread with raw JPEG bytes. + """ + with self._video_lock: + self._latest_video_frame = jpeg_bytes + def set_on_connect(self, func: Callable): self._on_connect = func @@ -75,8 +83,11 @@ def _start_comms_loop(self): """ print("start_comms loop") self.app = web.Application() - self.app.add_routes([web.static('/images', self.img_dir), - web.get('/ws', self.handle_websocket)]) + self.app.add_routes([ + web.static('/images', self.img_dir), + web.get('/ws', self.handle_websocket), + web.get('/video', self.handle_video_stream), + ]) web.run_app(self.app, handle_signals=False) @@ -102,6 +113,36 @@ async def consumer_handler(self, ws): elif msg.type == aiohttp.WSMsgType.ERROR: print("WebSocket error:", ws.exception()) + async def handle_video_stream(self, request): + """ + MJPEG stream endpoint. Connect with: + No frontend JS needed — the browser handles multipart natively. + """ + response = web.StreamResponse(headers={ + 'Content-Type': 'multipart/x-mixed-replace; boundary=frame', + 'Cache-Control': 'no-cache', + }) + await response.prepare(request) + + try: + while True: + with self._video_lock: + frame = self._latest_video_frame + + if frame is not None: + await response.write( + b'--frame\r\n' + b'Content-Type: image/jpeg\r\n\r\n' + + frame + + b'\r\n' + ) + + await asyncio.sleep(1 / 15) + except (ConnectionResetError, asyncio.CancelledError): + pass + + return response + async def handle_websocket(self, request): ws = web.WebSocketResponse() await ws.prepare(request) diff --git a/src/modules/imaging/.aruco_stream.py.swp b/src/modules/imaging/.aruco_stream.py.swp new file mode 100644 index 0000000..c618dce Binary files /dev/null and b/src/modules/imaging/.aruco_stream.py.swp differ diff --git a/src/modules/imaging/analysis.py b/src/modules/imaging/analysis.py index 94ad15a..1c90d54 100644 --- a/src/modules/imaging/analysis.py +++ b/src/modules/imaging/analysis.py @@ -1,4 +1,4 @@ -from typing import Callable, Optional, List, Callable, Any +from typing import Callable, Optional, List, Any, Tuple import threading # from multiprocessing import Process @@ -59,7 +59,7 @@ def __init__(self, self.location_provider = location_provider self.navigation_provider = navigation_provider - self.subscribers: List[Callable[[Image.Image, float, float], Any]] = [] + self.subscribers: List[Callable[[Image.Image, Optional[BoundingBox], Optional[Tuple[float, float]]], Any]] = [] self.camera_attributes = CameraAttributes() self.thread = None self.loop = True @@ -110,9 +110,9 @@ def _analyze_image(self): if inference: x, y = get_object_location(self.camera_attributes, inference) - subscriber(im, (x, y)) + subscriber(im, bounding_box, (x, y)) else: - subscriber(im, None) + subscriber(im, None, None) def _analysis_loop(self): """ diff --git a/src/modules/imaging/aruco_stream.py b/src/modules/imaging/aruco_stream.py new file mode 100644 index 0000000..45daed0 --- /dev/null +++ b/src/modules/imaging/aruco_stream.py @@ -0,0 +1,33 @@ +from PIL import Image, ImageDraw +import os +from typing import Optional, Tuple + +from src.modules.emu import Emu +from src.modules.imaging.detector import BoundingBox + + +class ArucoEmuStreamer: + def __init__(self, emu: Emu, output_dir: str = "tmp"): + self.emu = emu + self.output_dir = output_dir + self.counter = 0 + + if not os.path.exists(self.output_dir): + os.makedirs(self.output_dir) + + def on_detection(self, image: Image.Image, bounding_box: Optional[BoundingBox], position: Optional[Tuple[float, float]]): + im = image.copy() + + if bounding_box is not None: + draw = ImageDraw.Draw(im) + bb = (bounding_box.position.x, bounding_box.position.y, + bounding_box.position.x + bounding_box.size.x, + bounding_box.position.y + bounding_box.size.y) + draw.rectangle(bb, outline="red") + + filename = f"{self.counter}.jpeg" + filepath = os.path.join(self.output_dir, filename) + im.save(filepath) + + self.emu.send_image(filename) + self.counter += 1 diff --git a/src/modules/imaging/detector.py b/src/modules/imaging/detector.py index 65dd8fc..0a7b381 100644 --- a/src/modules/imaging/detector.py +++ b/src/modules/imaging/detector.py @@ -1,5 +1,7 @@ -from functools import lru_cache +from functools import lru_cache, cached_property from typing import Optional +from dataclasses import dataclass +import math from PIL import Image import numpy as np @@ -56,35 +58,35 @@ def max(v1: 'Vec2', v2: 'Vec2') -> 'Vec2': class BoundingBox: -def __init__(self, position: Vec2, size: Vec2): - self.position = position - self.size = size + def __init__(self, position: Vec2, size: Vec2): + self.position = position + self.size = size -@lru_cache(maxsize=2) -def intersection(self, other: 'BoundingBox') -> float: - top_left = Vec2.max(self.position, other.position) - bottom_right = Vec2.min(self.position + self.size, - other.position + other.size) + @lru_cache(maxsize=2) + def intersection(self, other: 'BoundingBox') -> float: + top_left = Vec2.max(self.position, other.position) + bottom_right = Vec2.min(self.position + self.size, + other.position + other.size) - size = bottom_right - top_left + size = bottom_right - top_left - intersection = size.x * size.y - return max(intersection, 0) + intersection = size.x * size.y + return max(intersection, 0) -def union(self, other: 'BoundingBox') -> float: - intersection = self.intersection(other) - if intersection == 0: - return 0 + def union(self, other: 'BoundingBox') -> float: + intersection = self.intersection(other) + if intersection == 0: + return 0 - union = self.size.x * self.size.y + other.size.x * other.size.y - intersection - return union + union = self.size.x * self.size.y + other.size.x * other.size.y - intersection + return union -def intersection_over_union(self, pred: 'BoundingBox') -> Optional[float]: - intersection = self.intersection(pred) - if intersection == 0: - return 0 - iou = intersection / self.union(pred) - return iou + def intersection_over_union(self, pred: 'BoundingBox') -> Optional[float]: + intersection = self.intersection(pred) + if intersection == 0: + return 0 + iou = intersection / self.union(pred) + return iou class BaseDetector: @@ -115,16 +117,17 @@ def predict(self, image: Image.Image) -> Optional[BoundingBox]: return BoundingBox(Vec2(x, y), Vec2(w, h)) -class ArucoDetector(): +class ArucoDetector(BaseDetector): def predict(self, image: Image.Image) -> Optional[BoundingBox]: img = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR) aruco_dict = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_4X4_50) - params = cv2.aruco.DetectorParemeters() + params = cv2.aruco.DetectorParameters() - corners, ids, rejected = cv2.aruco.detectMarkers(img, aruco_dict, parameters=params) + detector = cv2.aruco.ArucoDetector(aruco_dict, params) + corners, ids, rejected = detector.detectMarkers(img) if ids: for c in zip(corners, ids): diff --git a/src/modules/imaging/video_emu_stream.py b/src/modules/imaging/video_emu_stream.py new file mode 100644 index 0000000..20aac33 --- /dev/null +++ b/src/modules/imaging/video_emu_stream.py @@ -0,0 +1,88 @@ +import io +import threading +import time + +from PIL import Image + +from src.modules.emu import Emu +from src.modules.imaging.camera import CameraProvider + + +class SharedFrameCamera(CameraProvider): + """ + Wraps a CameraProvider and shares the latest captured frame across + multiple consumers (e.g. video streamer + analysis pipeline) without + both threads calling camera.capture() simultaneously. + """ + + def __init__(self, camera: CameraProvider, fps: int = 15): + self._camera = camera + self._fps = fps + self._latest: Image.Image | None = None + self._lock = threading.Lock() + self._running = False + self._thread: threading.Thread | None = None + + def start(self): + self._running = True + self._thread = threading.Thread(target=self._capture_loop, daemon=True) + self._thread.start() + + def stop(self): + self._running = False + if self._thread: + self._thread.join() + + def get_latest(self) -> Image.Image | None: + with self._lock: + return self._latest + + def capture(self) -> Image.Image: + while True: + with self._lock: + if self._latest is not None: + return self._latest + time.sleep(0.01) + + def _capture_loop(self): + interval = 1 / self._fps + while self._running: + frame = self._camera.capture() + with self._lock: + self._latest = frame + time.sleep(interval) + + +class VideoEmuStreamer: + """ + Continuously grabs frames from a SharedFrameCamera and pushes them + to EMU's MJPEG /video endpoint at the given fps. + """ + + def __init__(self, emu: Emu, shared_cam: SharedFrameCamera, fps: int = 15, quality: int = 70): + self.emu = emu + self.shared_cam = shared_cam + self.fps = fps + self.quality = quality + self._running = False + self._thread: threading.Thread | None = None + + def start(self): + self._running = True + self._thread = threading.Thread(target=self._stream_loop, daemon=True) + self._thread.start() + + def stop(self): + self._running = False + if self._thread: + self._thread.join() + + def _stream_loop(self): + interval = 1 / self.fps + while self._running: + frame = self.shared_cam.get_latest() + if frame is not None: + buf = io.BytesIO() + frame.save(buf, format="JPEG", quality=self.quality) + self.emu.send_video_frame(buf.getvalue()) + time.sleep(interval) diff --git a/test/test_analysis.py b/test/test_analysis.py index 080f1b8..42245dd 100644 --- a/test/test_analysis.py +++ b/test/test_analysis.py @@ -37,9 +37,12 @@ def test_analysis_subscriber(): global detected detected = None - def _callback(_image, lon, lat): + def _callback(_image, bounding_box, pos): global detected - detected = Vec2(lon, lat) + if pos is not None: + detected = Vec2(pos[0], pos[1]) + else: + detected = None analysis.subscribe(_callback) @@ -87,7 +90,7 @@ def test_analysis_debugger(): location_provider = DebugLocationProvider() location_provider.set_altitude(1.0) analysis = ImageAnalysisDelegate(detector, camera, location_provider, - debug) + debugger=debug) def run_analysis(): detector.bounding_box = BoundingBox(Vec2(0, 0), Vec2(100, 100))