diff --git a/samples/aruco_stream.py b/samples/aruco_stream.py
new file mode 100644
index 0000000..90778e1
--- /dev/null
+++ b/samples/aruco_stream.py
@@ -0,0 +1,49 @@
+import time
+
+from src.modules.emu import Emu
+from src.modules.imaging.detector import ArucoDetector
+from src.modules.imaging.camera import DebugCamera
+from src.modules.imaging.location import DebugLocationProvider
+from src.modules.imaging.analysis import ImageAnalysisDelegate
+from src.modules.imaging.aruco_stream import ArucoEmuStreamer
+from src.modules.imaging.video_emu_stream import SharedFrameCamera, VideoEmuStreamer
+
+
+def main():
+ emu = Emu("tmp")
+ emu.start_comms()
+ time.sleep(1)
+
+ # Base camera — swap DebugCamera for RPiCamera/OakdCamera on real hardware
+ base_camera = DebugCamera("res/test-image.jpeg")
+
+ # SharedFrameCamera captures at 15fps; both video stream and analysis read from it
+ shared_cam = SharedFrameCamera(base_camera, fps=15)
+ shared_cam.start()
+
+ # Video stream → EMU /video endpoint (browser:
)
+ video_streamer = VideoEmuStreamer(emu, shared_cam, fps=15, quality=70)
+ video_streamer.start()
+
+ # ArUco detection pipeline reads latest frame from shared camera
+ detector = ArucoDetector()
+ location_provider = DebugLocationProvider()
+
+ analysis = ImageAnalysisDelegate(detector, shared_cam, location_provider)
+ aruco_streamer = ArucoEmuStreamer(emu, "tmp")
+ analysis.subscribe(aruco_streamer.on_detection)
+ analysis.start()
+
+ try:
+ while True:
+ time.sleep(1)
+ except KeyboardInterrupt:
+ pass
+ finally:
+ analysis.stop()
+ video_streamer.stop()
+ shared_cam.stop()
+
+
+if __name__ == "__main__":
+ main()
diff --git a/src/modules/emu/emu.py b/src/modules/emu/emu.py
index b808e72..6531a8d 100644
--- a/src/modules/emu/emu.py
+++ b/src/modules/emu/emu.py
@@ -2,9 +2,8 @@
import queue
import asyncio
-from typing import Callable, List
+from typing import Callable, List, Optional
-from aiohttp import web
from aiohttp import web
import aiohttp
@@ -28,6 +27,9 @@ def __init__(self, img_dir: str):
self._on_connect = lambda: None
self._is_connected = False
+ self._latest_video_frame: Optional[bytes] = None
+ self._video_lock = threading.Lock()
+
def start_comms(self):
self._comms_thread = threading.Thread(target=self._start_comms_loop, daemon=True)
self._comms_thread.start()
@@ -38,9 +40,7 @@ def send_image(self, path: str):
the path sent should be accessable from within self.img_dir so it can be accessed through
/images/{filename}
"""
- print(path)
img_url = "/images/" + path
- print(img_url)
content = {
"type": "img",
"value": img_url
@@ -66,6 +66,14 @@ def send_msg(self, message: str):
"""
self._send_queue.put(message)
+ def send_video_frame(self, jpeg_bytes: bytes):
+ """
+ Update the latest video frame served at /video (MJPEG stream).
+ Call this from a background thread with raw JPEG bytes.
+ """
+ with self._video_lock:
+ self._latest_video_frame = jpeg_bytes
+
def set_on_connect(self, func: Callable):
self._on_connect = func
@@ -75,8 +83,11 @@ def _start_comms_loop(self):
"""
print("start_comms loop")
self.app = web.Application()
- self.app.add_routes([web.static('/images', self.img_dir),
- web.get('/ws', self.handle_websocket)])
+ self.app.add_routes([
+ web.static('/images', self.img_dir),
+ web.get('/ws', self.handle_websocket),
+ web.get('/video', self.handle_video_stream),
+ ])
web.run_app(self.app, handle_signals=False)
@@ -102,6 +113,36 @@ async def consumer_handler(self, ws):
elif msg.type == aiohttp.WSMsgType.ERROR:
print("WebSocket error:", ws.exception())
+ async def handle_video_stream(self, request):
+ """
+ MJPEG stream endpoint. Connect with:
+ No frontend JS needed — the browser handles multipart natively.
+ """
+ response = web.StreamResponse(headers={
+ 'Content-Type': 'multipart/x-mixed-replace; boundary=frame',
+ 'Cache-Control': 'no-cache',
+ })
+ await response.prepare(request)
+
+ try:
+ while True:
+ with self._video_lock:
+ frame = self._latest_video_frame
+
+ if frame is not None:
+ await response.write(
+ b'--frame\r\n'
+ b'Content-Type: image/jpeg\r\n\r\n' +
+ frame +
+ b'\r\n'
+ )
+
+ await asyncio.sleep(1 / 15)
+ except (ConnectionResetError, asyncio.CancelledError):
+ pass
+
+ return response
+
async def handle_websocket(self, request):
ws = web.WebSocketResponse()
await ws.prepare(request)
diff --git a/src/modules/imaging/.aruco_stream.py.swp b/src/modules/imaging/.aruco_stream.py.swp
new file mode 100644
index 0000000..c618dce
Binary files /dev/null and b/src/modules/imaging/.aruco_stream.py.swp differ
diff --git a/src/modules/imaging/analysis.py b/src/modules/imaging/analysis.py
index 94ad15a..1c90d54 100644
--- a/src/modules/imaging/analysis.py
+++ b/src/modules/imaging/analysis.py
@@ -1,4 +1,4 @@
-from typing import Callable, Optional, List, Callable, Any
+from typing import Callable, Optional, List, Any, Tuple
import threading
# from multiprocessing import Process
@@ -59,7 +59,7 @@ def __init__(self,
self.location_provider = location_provider
self.navigation_provider = navigation_provider
- self.subscribers: List[Callable[[Image.Image, float, float], Any]] = []
+ self.subscribers: List[Callable[[Image.Image, Optional[BoundingBox], Optional[Tuple[float, float]]], Any]] = []
self.camera_attributes = CameraAttributes()
self.thread = None
self.loop = True
@@ -110,9 +110,9 @@ def _analyze_image(self):
if inference:
x, y = get_object_location(self.camera_attributes,
inference)
- subscriber(im, (x, y))
+ subscriber(im, bounding_box, (x, y))
else:
- subscriber(im, None)
+ subscriber(im, None, None)
def _analysis_loop(self):
"""
diff --git a/src/modules/imaging/aruco_stream.py b/src/modules/imaging/aruco_stream.py
new file mode 100644
index 0000000..45daed0
--- /dev/null
+++ b/src/modules/imaging/aruco_stream.py
@@ -0,0 +1,33 @@
+from PIL import Image, ImageDraw
+import os
+from typing import Optional, Tuple
+
+from src.modules.emu import Emu
+from src.modules.imaging.detector import BoundingBox
+
+
+class ArucoEmuStreamer:
+ def __init__(self, emu: Emu, output_dir: str = "tmp"):
+ self.emu = emu
+ self.output_dir = output_dir
+ self.counter = 0
+
+ if not os.path.exists(self.output_dir):
+ os.makedirs(self.output_dir)
+
+ def on_detection(self, image: Image.Image, bounding_box: Optional[BoundingBox], position: Optional[Tuple[float, float]]):
+ im = image.copy()
+
+ if bounding_box is not None:
+ draw = ImageDraw.Draw(im)
+ bb = (bounding_box.position.x, bounding_box.position.y,
+ bounding_box.position.x + bounding_box.size.x,
+ bounding_box.position.y + bounding_box.size.y)
+ draw.rectangle(bb, outline="red")
+
+ filename = f"{self.counter}.jpeg"
+ filepath = os.path.join(self.output_dir, filename)
+ im.save(filepath)
+
+ self.emu.send_image(filename)
+ self.counter += 1
diff --git a/src/modules/imaging/detector.py b/src/modules/imaging/detector.py
index 65dd8fc..0a7b381 100644
--- a/src/modules/imaging/detector.py
+++ b/src/modules/imaging/detector.py
@@ -1,5 +1,7 @@
-from functools import lru_cache
+from functools import lru_cache, cached_property
from typing import Optional
+from dataclasses import dataclass
+import math
from PIL import Image
import numpy as np
@@ -56,35 +58,35 @@ def max(v1: 'Vec2', v2: 'Vec2') -> 'Vec2':
class BoundingBox:
-def __init__(self, position: Vec2, size: Vec2):
- self.position = position
- self.size = size
+ def __init__(self, position: Vec2, size: Vec2):
+ self.position = position
+ self.size = size
-@lru_cache(maxsize=2)
-def intersection(self, other: 'BoundingBox') -> float:
- top_left = Vec2.max(self.position, other.position)
- bottom_right = Vec2.min(self.position + self.size,
- other.position + other.size)
+ @lru_cache(maxsize=2)
+ def intersection(self, other: 'BoundingBox') -> float:
+ top_left = Vec2.max(self.position, other.position)
+ bottom_right = Vec2.min(self.position + self.size,
+ other.position + other.size)
- size = bottom_right - top_left
+ size = bottom_right - top_left
- intersection = size.x * size.y
- return max(intersection, 0)
+ intersection = size.x * size.y
+ return max(intersection, 0)
-def union(self, other: 'BoundingBox') -> float:
- intersection = self.intersection(other)
- if intersection == 0:
- return 0
+ def union(self, other: 'BoundingBox') -> float:
+ intersection = self.intersection(other)
+ if intersection == 0:
+ return 0
- union = self.size.x * self.size.y + other.size.x * other.size.y - intersection
- return union
+ union = self.size.x * self.size.y + other.size.x * other.size.y - intersection
+ return union
-def intersection_over_union(self, pred: 'BoundingBox') -> Optional[float]:
- intersection = self.intersection(pred)
- if intersection == 0:
- return 0
- iou = intersection / self.union(pred)
- return iou
+ def intersection_over_union(self, pred: 'BoundingBox') -> Optional[float]:
+ intersection = self.intersection(pred)
+ if intersection == 0:
+ return 0
+ iou = intersection / self.union(pred)
+ return iou
class BaseDetector:
@@ -115,16 +117,17 @@ def predict(self, image: Image.Image) -> Optional[BoundingBox]:
return BoundingBox(Vec2(x, y), Vec2(w, h))
-class ArucoDetector():
+class ArucoDetector(BaseDetector):
def predict(self, image: Image.Image) -> Optional[BoundingBox]:
img = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)
aruco_dict = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_4X4_50)
- params = cv2.aruco.DetectorParemeters()
+ params = cv2.aruco.DetectorParameters()
- corners, ids, rejected = cv2.aruco.detectMarkers(img, aruco_dict, parameters=params)
+ detector = cv2.aruco.ArucoDetector(aruco_dict, params)
+ corners, ids, rejected = detector.detectMarkers(img)
if ids:
for c in zip(corners, ids):
diff --git a/src/modules/imaging/video_emu_stream.py b/src/modules/imaging/video_emu_stream.py
new file mode 100644
index 0000000..20aac33
--- /dev/null
+++ b/src/modules/imaging/video_emu_stream.py
@@ -0,0 +1,88 @@
+import io
+import threading
+import time
+
+from PIL import Image
+
+from src.modules.emu import Emu
+from src.modules.imaging.camera import CameraProvider
+
+
+class SharedFrameCamera(CameraProvider):
+ """
+ Wraps a CameraProvider and shares the latest captured frame across
+ multiple consumers (e.g. video streamer + analysis pipeline) without
+ both threads calling camera.capture() simultaneously.
+ """
+
+ def __init__(self, camera: CameraProvider, fps: int = 15):
+ self._camera = camera
+ self._fps = fps
+ self._latest: Image.Image | None = None
+ self._lock = threading.Lock()
+ self._running = False
+ self._thread: threading.Thread | None = None
+
+ def start(self):
+ self._running = True
+ self._thread = threading.Thread(target=self._capture_loop, daemon=True)
+ self._thread.start()
+
+ def stop(self):
+ self._running = False
+ if self._thread:
+ self._thread.join()
+
+ def get_latest(self) -> Image.Image | None:
+ with self._lock:
+ return self._latest
+
+ def capture(self) -> Image.Image:
+ while True:
+ with self._lock:
+ if self._latest is not None:
+ return self._latest
+ time.sleep(0.01)
+
+ def _capture_loop(self):
+ interval = 1 / self._fps
+ while self._running:
+ frame = self._camera.capture()
+ with self._lock:
+ self._latest = frame
+ time.sleep(interval)
+
+
+class VideoEmuStreamer:
+ """
+ Continuously grabs frames from a SharedFrameCamera and pushes them
+ to EMU's MJPEG /video endpoint at the given fps.
+ """
+
+ def __init__(self, emu: Emu, shared_cam: SharedFrameCamera, fps: int = 15, quality: int = 70):
+ self.emu = emu
+ self.shared_cam = shared_cam
+ self.fps = fps
+ self.quality = quality
+ self._running = False
+ self._thread: threading.Thread | None = None
+
+ def start(self):
+ self._running = True
+ self._thread = threading.Thread(target=self._stream_loop, daemon=True)
+ self._thread.start()
+
+ def stop(self):
+ self._running = False
+ if self._thread:
+ self._thread.join()
+
+ def _stream_loop(self):
+ interval = 1 / self.fps
+ while self._running:
+ frame = self.shared_cam.get_latest()
+ if frame is not None:
+ buf = io.BytesIO()
+ frame.save(buf, format="JPEG", quality=self.quality)
+ self.emu.send_video_frame(buf.getvalue())
+ time.sleep(interval)
diff --git a/test/test_analysis.py b/test/test_analysis.py
index 080f1b8..42245dd 100644
--- a/test/test_analysis.py
+++ b/test/test_analysis.py
@@ -37,9 +37,12 @@ def test_analysis_subscriber():
global detected
detected = None
- def _callback(_image, lon, lat):
+ def _callback(_image, bounding_box, pos):
global detected
- detected = Vec2(lon, lat)
+ if pos is not None:
+ detected = Vec2(pos[0], pos[1])
+ else:
+ detected = None
analysis.subscribe(_callback)
@@ -87,7 +90,7 @@ def test_analysis_debugger():
location_provider = DebugLocationProvider()
location_provider.set_altitude(1.0)
analysis = ImageAnalysisDelegate(detector, camera, location_provider,
- debug)
+ debugger=debug)
def run_analysis():
detector.bounding_box = BoundingBox(Vec2(0, 0), Vec2(100, 100))