diff --git a/README.md b/README.md index e0fffa3..0743014 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,9 @@ # Description -FrameTransforms is a lightweight Python pacakge to simplify frame transformations. It supports: +FrameTransforms is a lightweight, native Python pacakge to simplify frame transformations. It supports: 1. Registration and update of relative coordinate frames. 2. Automatic computation of transitive transformations. +3. Multithreaded access. ## Application Consider a simple robot consisting of a mobile base and a camera mounted on a gimbal. diff --git a/frame_transforms/registry.py b/frame_transforms/registry.py index e924a23..645551c 100644 --- a/frame_transforms/registry.py +++ b/frame_transforms/registry.py @@ -1,10 +1,12 @@ -from typing import Generic, Hashable, TypeVar +from typing import Any, Callable, Generic, Hashable, TypeVar +from threading import Lock import numpy as np # Key to identify coordinate frames in the registry. FrameID_T = TypeVar("FrameID_T", bound=Hashable) +Ret_T = TypeVar("Ret_T", bound=Any) class InvaidTransformationError(Exception): @@ -19,8 +21,6 @@ class Registry(Generic[FrameID_T]): maintaining a directed acyclic graph (DAG) of relationships. Made for use with 4x4 3D transformation matrices. - - TODO: Support concurrency """ def __init__(self, world_frame: FrameID_T): @@ -39,6 +39,14 @@ def __init__(self, world_frame: FrameID_T): world_frame: {world_frame: [world_frame]} } + # For thread safety, implement as third readers-writers problem (no starvation). + # Reference: https://en.wikipedia.org/wiki/Readers%E2%80%93writers_problem#Third_readers%E2%80%93writers_problem + self._read_count = 0 + + self._resource_lock = Lock() + self._counts_lock = Lock() + self._service_queue = Lock() + def get_transform(self, from_frame: FrameID_T, to_frame: FrameID_T) -> np.ndarray: """ Gets the transformation matrix from one frame to another. @@ -50,6 +58,13 @@ def get_transform(self, from_frame: FrameID_T, to_frame: FrameID_T) -> np.ndarra Returns: The transformation matrix from `from_frame` to `to_frame`. """ + return self._concurrent_read( + lambda: self._get_transform_unsafe(from_frame, to_frame) + ) + + def _get_transform_unsafe( + self, from_frame: FrameID_T, to_frame: FrameID_T + ) -> np.ndarray: path = self._get_path(from_frame, to_frame) transformation = np.eye(4) @@ -75,6 +90,13 @@ def add_transform( to_frame: The destination frame. transform: The transformation matrix from `from_frame` to `to_frame`. """ + self._concurrent_write( + lambda: self._add_transform_unsafe(from_frame, to_frame, transform) + ) + + def _add_transform_unsafe( + self, from_frame: FrameID_T, to_frame: FrameID_T, transform: np.ndarray + ): if from_frame in self._adjacencies and to_frame in self._adjacencies: raise InvaidTransformationError( "Both frames already exist in the registry." @@ -102,6 +124,22 @@ def update(self, from_frame: FrameID_T, to_frame: FrameID_T, transform: np.ndarr Note that `from_frame` and `to_frame` must have been added together in the registry, i.e., they are attached to each other. However, they can be in any order. + Args: + from_frame: The source frame whose transformation is being updated. + to_frame: The destination frame (should be the parent of `from_frame`). + transform: The new transformation matrix from `from_frame` to `to_frame`. + """ + self._concurrent_write( + lambda: self._update_unsafe(from_frame, to_frame, transform) + ) + + def _update_unsafe( + self, from_frame: FrameID_T, to_frame: FrameID_T, transform: np.ndarray + ): + """ + Internal method to update the transformation between two frames. + This is used by the `update` method to ensure that the frames are already connected. + Args: from_frame: The source frame whose transformation is being updated. to_frame: The destination frame (should be the parent of `from_frame`). @@ -125,6 +163,40 @@ def update(self, from_frame: FrameID_T, to_frame: FrameID_T, transform: np.ndarr self._adjacencies[from_frame][to_frame] = transform self._adjacencies[to_frame][from_frame] = np.linalg.inv(transform) + def _concurrent_read(self, func: Callable[[], Ret_T]) -> Ret_T: + """ + Wrapper to execute a synchrnous, thread-unsafe function that reads from the registry. + """ + self._service_queue.acquire() + self._counts_lock.acquire() + + self._read_count += 1 + if self._read_count == 1: + self._resource_lock.acquire() + + self._service_queue.release() + self._counts_lock.release() + + try: + return func() + finally: + with self._counts_lock: + self._read_count -= 1 + if self._read_count == 0: + self._resource_lock.release() + + def _concurrent_write(self, func: Callable[[], Ret_T]) -> Ret_T: + """ + Wrapper to execute a synchronous, thread-unsafe function that writes to the registry. + """ + with self._service_queue: + self._resource_lock.acquire() + + try: + return func() + finally: + self._resource_lock.release() + def _update_paths(self, new_frame: FrameID_T): """ Updates the paths in the registry after adding a new frame. @@ -164,14 +236,9 @@ def _get_path(self, from_frame: FrameID_T, to_frame: FrameID_T) -> list[FrameID_ Returns: A list of frames representing the path from `from_frame` to `to_frame`. """ - if from_frame not in self._adjacencies: - raise InvaidTransformationError( - f"Frame {from_frame} does not exist in the registry." - ) - - if to_frame not in self._adjacencies: + try: + return self._paths[from_frame][to_frame] + except KeyError: raise InvaidTransformationError( - f"Frame {to_frame} does not exist in the registry." + f"Either {from_frame} or {to_frame} does not exist in the registry." ) - - return self._paths[from_frame][to_frame] diff --git a/pyproject.toml b/pyproject.toml index ab026f1..d4efaf5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "frame-transforms" -version = "0.1.2" +version = "0.2.0" readme = "README.md" description = "Automatically compute and apply coordinate frame transformations"