Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions samples/aruco_stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import time

from src.modules.emu import Emu
from src.modules.imaging.detector import ArucoDetector
from src.modules.imaging.camera import DebugCamera
from src.modules.imaging.location import DebugLocationProvider
from src.modules.imaging.analysis import ImageAnalysisDelegate
from src.modules.imaging.aruco_stream import ArucoEmuStreamer
from src.modules.imaging.video_emu_stream import SharedFrameCamera, VideoEmuStreamer


def main():
emu = Emu("tmp")
emu.start_comms()
time.sleep(1)

# Base camera — swap DebugCamera for RPiCamera/OakdCamera on real hardware
base_camera = DebugCamera("res/test-image.jpeg")

# SharedFrameCamera captures at 15fps; both video stream and analysis read from it
shared_cam = SharedFrameCamera(base_camera, fps=15)
shared_cam.start()

# Video stream → EMU /video endpoint (browser: <img src="http://HOST:8080/video">)
video_streamer = VideoEmuStreamer(emu, shared_cam, fps=15, quality=70)
video_streamer.start()

# ArUco detection pipeline reads latest frame from shared camera
detector = ArucoDetector()
location_provider = DebugLocationProvider()

analysis = ImageAnalysisDelegate(detector, shared_cam, location_provider)
aruco_streamer = ArucoEmuStreamer(emu, "tmp")
analysis.subscribe(aruco_streamer.on_detection)
analysis.start()

try:
while True:
time.sleep(1)
except KeyboardInterrupt:
pass
finally:
analysis.stop()
video_streamer.stop()
shared_cam.stop()


if __name__ == "__main__":
main()
53 changes: 47 additions & 6 deletions src/modules/emu/emu.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@
import queue

import asyncio
from typing import Callable, List
from typing import Callable, List, Optional

from aiohttp import web
from aiohttp import web
import aiohttp

Expand All @@ -28,6 +27,9 @@ def __init__(self, img_dir: str):
self._on_connect = lambda: None
self._is_connected = False

self._latest_video_frame: Optional[bytes] = None
self._video_lock = threading.Lock()

def start_comms(self):
self._comms_thread = threading.Thread(target=self._start_comms_loop, daemon=True)
self._comms_thread.start()
Expand All @@ -38,9 +40,7 @@ def send_image(self, path: str):
the path sent should be accessable from within self.img_dir so it can be accessed through
/images/{filename}
"""
print(path)
img_url = "/images/" + path
print(img_url)
content = {
"type": "img",
"value": img_url
Expand All @@ -66,6 +66,14 @@ def send_msg(self, message: str):
"""
self._send_queue.put(message)

def send_video_frame(self, jpeg_bytes: bytes):
"""
Update the latest video frame served at /video (MJPEG stream).
Call this from a background thread with raw JPEG bytes.
"""
with self._video_lock:
self._latest_video_frame = jpeg_bytes

def set_on_connect(self, func: Callable):
self._on_connect = func

Expand All @@ -75,8 +83,11 @@ def _start_comms_loop(self):
"""
print("start_comms loop")
self.app = web.Application()
self.app.add_routes([web.static('/images', self.img_dir),
web.get('/ws', self.handle_websocket)])
self.app.add_routes([
web.static('/images', self.img_dir),
web.get('/ws', self.handle_websocket),
web.get('/video', self.handle_video_stream),
])

web.run_app(self.app, handle_signals=False)

Expand All @@ -102,6 +113,36 @@ async def consumer_handler(self, ws):
elif msg.type == aiohttp.WSMsgType.ERROR:
print("WebSocket error:", ws.exception())

async def handle_video_stream(self, request):
"""
MJPEG stream endpoint. Connect with: <img src="http://HOST:PORT/video">
No frontend JS needed — the browser handles multipart natively.
"""
response = web.StreamResponse(headers={
'Content-Type': 'multipart/x-mixed-replace; boundary=frame',
'Cache-Control': 'no-cache',
})
await response.prepare(request)

try:
while True:
with self._video_lock:
frame = self._latest_video_frame

if frame is not None:
await response.write(
b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' +
frame +
b'\r\n'
)

await asyncio.sleep(1 / 15)
except (ConnectionResetError, asyncio.CancelledError):
pass

return response

async def handle_websocket(self, request):
ws = web.WebSocketResponse()
await ws.prepare(request)
Expand Down
Binary file added src/modules/imaging/.aruco_stream.py.swp
Binary file not shown.
8 changes: 4 additions & 4 deletions src/modules/imaging/analysis.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Callable, Optional, List, Callable, Any
from typing import Callable, Optional, List, Any, Tuple

import threading
# from multiprocessing import Process
Expand Down Expand Up @@ -59,7 +59,7 @@ def __init__(self,
self.location_provider = location_provider
self.navigation_provider = navigation_provider

self.subscribers: List[Callable[[Image.Image, float, float], Any]] = []
self.subscribers: List[Callable[[Image.Image, Optional[BoundingBox], Optional[Tuple[float, float]]], Any]] = []
self.camera_attributes = CameraAttributes()
self.thread = None
self.loop = True
Expand Down Expand Up @@ -110,9 +110,9 @@ def _analyze_image(self):
if inference:
x, y = get_object_location(self.camera_attributes,
inference)
subscriber(im, (x, y))
subscriber(im, bounding_box, (x, y))
else:
subscriber(im, None)
subscriber(im, None, None)

def _analysis_loop(self):
"""
Expand Down
33 changes: 33 additions & 0 deletions src/modules/imaging/aruco_stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from PIL import Image, ImageDraw
import os
from typing import Optional, Tuple

from src.modules.emu import Emu
from src.modules.imaging.detector import BoundingBox


class ArucoEmuStreamer:
def __init__(self, emu: Emu, output_dir: str = "tmp"):
self.emu = emu
self.output_dir = output_dir
self.counter = 0

if not os.path.exists(self.output_dir):
os.makedirs(self.output_dir)

def on_detection(self, image: Image.Image, bounding_box: Optional[BoundingBox], position: Optional[Tuple[float, float]]):
im = image.copy()

if bounding_box is not None:
draw = ImageDraw.Draw(im)
bb = (bounding_box.position.x, bounding_box.position.y,
bounding_box.position.x + bounding_box.size.x,
bounding_box.position.y + bounding_box.size.y)
draw.rectangle(bb, outline="red")

filename = f"{self.counter}.jpeg"
filepath = os.path.join(self.output_dir, filename)
im.save(filepath)

self.emu.send_image(filename)
self.counter += 1
57 changes: 30 additions & 27 deletions src/modules/imaging/detector.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from functools import lru_cache
from functools import lru_cache, cached_property
from typing import Optional
from dataclasses import dataclass
import math

from PIL import Image
import numpy as np
Expand Down Expand Up @@ -56,35 +58,35 @@ def max(v1: 'Vec2', v2: 'Vec2') -> 'Vec2':

class BoundingBox:

def __init__(self, position: Vec2, size: Vec2):
self.position = position
self.size = size
def __init__(self, position: Vec2, size: Vec2):
self.position = position
self.size = size

@lru_cache(maxsize=2)
def intersection(self, other: 'BoundingBox') -> float:
top_left = Vec2.max(self.position, other.position)
bottom_right = Vec2.min(self.position + self.size,
other.position + other.size)
@lru_cache(maxsize=2)
def intersection(self, other: 'BoundingBox') -> float:
top_left = Vec2.max(self.position, other.position)
bottom_right = Vec2.min(self.position + self.size,
other.position + other.size)

size = bottom_right - top_left
size = bottom_right - top_left

intersection = size.x * size.y
return max(intersection, 0)
intersection = size.x * size.y
return max(intersection, 0)

def union(self, other: 'BoundingBox') -> float:
intersection = self.intersection(other)
if intersection == 0:
return 0
def union(self, other: 'BoundingBox') -> float:
intersection = self.intersection(other)
if intersection == 0:
return 0

union = self.size.x * self.size.y + other.size.x * other.size.y - intersection
return union
union = self.size.x * self.size.y + other.size.x * other.size.y - intersection
return union

def intersection_over_union(self, pred: 'BoundingBox') -> Optional[float]:
intersection = self.intersection(pred)
if intersection == 0:
return 0
iou = intersection / self.union(pred)
return iou
def intersection_over_union(self, pred: 'BoundingBox') -> Optional[float]:
intersection = self.intersection(pred)
if intersection == 0:
return 0
iou = intersection / self.union(pred)
return iou


class BaseDetector:
Expand Down Expand Up @@ -115,16 +117,17 @@ def predict(self, image: Image.Image) -> Optional[BoundingBox]:
return BoundingBox(Vec2(x, y), Vec2(w, h))


class ArucoDetector():
class ArucoDetector(BaseDetector):

def predict(self, image: Image.Image) -> Optional[BoundingBox]:
img = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)

aruco_dict = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_4X4_50)

params = cv2.aruco.DetectorParemeters()
params = cv2.aruco.DetectorParameters()

corners, ids, rejected = cv2.aruco.detectMarkers(img, aruco_dict, parameters=params)
detector = cv2.aruco.ArucoDetector(aruco_dict, params)
corners, ids, rejected = detector.detectMarkers(img)

if ids:
for c in zip(corners, ids):
Expand Down
88 changes: 88 additions & 0 deletions src/modules/imaging/video_emu_stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import io
import threading
import time

from PIL import Image

from src.modules.emu import Emu
from src.modules.imaging.camera import CameraProvider


class SharedFrameCamera(CameraProvider):
"""
Wraps a CameraProvider and shares the latest captured frame across
multiple consumers (e.g. video streamer + analysis pipeline) without
both threads calling camera.capture() simultaneously.
"""

def __init__(self, camera: CameraProvider, fps: int = 15):
self._camera = camera
self._fps = fps
self._latest: Image.Image | None = None
self._lock = threading.Lock()
self._running = False
self._thread: threading.Thread | None = None

def start(self):
self._running = True
self._thread = threading.Thread(target=self._capture_loop, daemon=True)
self._thread.start()

def stop(self):
self._running = False
if self._thread:
self._thread.join()

def get_latest(self) -> Image.Image | None:
with self._lock:
return self._latest

def capture(self) -> Image.Image:
while True:
with self._lock:
if self._latest is not None:
return self._latest
time.sleep(0.01)

def _capture_loop(self):
interval = 1 / self._fps
while self._running:
frame = self._camera.capture()
with self._lock:
self._latest = frame
time.sleep(interval)


class VideoEmuStreamer:
"""
Continuously grabs frames from a SharedFrameCamera and pushes them
to EMU's MJPEG /video endpoint at the given fps.
"""

def __init__(self, emu: Emu, shared_cam: SharedFrameCamera, fps: int = 15, quality: int = 70):
self.emu = emu
self.shared_cam = shared_cam
self.fps = fps
self.quality = quality
self._running = False
self._thread: threading.Thread | None = None

def start(self):
self._running = True
self._thread = threading.Thread(target=self._stream_loop, daemon=True)
self._thread.start()

def stop(self):
self._running = False
if self._thread:
self._thread.join()

def _stream_loop(self):
interval = 1 / self.fps
while self._running:
frame = self.shared_cam.get_latest()
if frame is not None:
buf = io.BytesIO()
frame.save(buf, format="JPEG", quality=self.quality)
self.emu.send_video_frame(buf.getvalue())
time.sleep(interval)
Loading
Loading