-
Notifications
You must be signed in to change notification settings - Fork 3k
fix(policies): Fix pi05 incorrect behaviour when threading #2441
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR fixes a critical threading bug in the pi05 policy where the action queue was shared across all threads, causing random action cross-contamination when using multi-threaded evaluation (e.g., env.max_parallel_tasks > 1).
Key Changes
- Introduced thread-local storage using
threading.local()to isolate action queues per thread - Converted
_action_queueto a property with getter/setter that accesses thread-local storage - Added helper methods
_new_action_queue()and_get_thread_action_queue()to manage thread-local queues
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| self._queues = { | ||
| ACTION: deque(maxlen=self.config.n_action_steps), | ||
| } |
Copilot
AI
Nov 14, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The _queues attribute is not thread-safe. Similar to _action_queue, this dictionary is shared across all threads and could cause the same cross-contamination issues when env.max_parallel_tasks > 1.
Consider applying the same thread-local pattern to _queues or clarifying its intended usage. If it's not actively used elsewhere in the codebase, it might be safe to leave it as-is, but if it's used in multi-threaded contexts, it should also be made thread-local.
| def _new_action_queue(self) -> deque: | ||
| """Create a fresh action queue honoring n_action_steps.""" | ||
| return deque(maxlen=self.config.n_action_steps) | ||
|
|
||
| def _get_thread_action_queue(self) -> deque: | ||
| """Return the action queue scoped to the current thread.""" | ||
| if not hasattr(self, "_thread_local"): | ||
| self._thread_local = threading.local() | ||
| action_queue = getattr(self._thread_local, "action_queue", None) | ||
| if action_queue is None: | ||
| action_queue = self._new_action_queue() | ||
| self._thread_local.action_queue = action_queue | ||
| return action_queue | ||
|
|
||
| @property | ||
| def _action_queue(self) -> deque: | ||
| """Expose the thread-local action queue (backwards compatible attribute).""" | ||
| return self._get_thread_action_queue() | ||
|
|
||
| @_action_queue.setter | ||
| def _action_queue(self, queue: deque) -> None: | ||
| if not hasattr(self, "_thread_local"): | ||
| self._thread_local = threading.local() | ||
|
|
||
| self._thread_local.action_queue = queue | ||
|
|
Copilot
AI
Nov 14, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] Consider adding a module-level or class-level docstring explaining the thread-safety approach. This would help future maintainers understand why _action_queue is implemented as a property with thread-local storage. For example:
"""
Thread-Safety:
This policy uses thread-local storage for the action queue to support
multi-threaded evaluation scenarios (e.g., env.max_parallel_tasks > 1).
Each thread maintains its own action queue to prevent cross-contamination
of actions between parallel environments.
"""| def reset(self): | ||
| """Reset internal state - called when environment resets.""" | ||
| self._action_queue = deque(maxlen=self.config.n_action_steps) | ||
| self._thread_local = threading.local() |
Copilot
AI
Nov 14, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The reset() method unconditionally creates a new threading.local() object, which defeats the purpose of thread-local storage. This will clear the action queue for ALL threads, not just the current thread.
Instead, only the current thread's action queue should be reset. Consider this approach:
def reset(self):
"""Reset internal state - called when environment resets."""
if not hasattr(self, "_thread_local"):
self._thread_local = threading.local()
self._action_queue = self._new_action_queue()
self._queues = {
ACTION: deque(maxlen=self.config.n_action_steps),
}This ensures that _thread_local is initialized only once per policy instance, while still resetting the current thread's action queue.
| self._thread_local = threading.local() | |
| if not hasattr(self, "_thread_local"): | |
| self._thread_local = threading.local() |
What this does
pi05 policy has action queue as an attribute.
Many lerobot scripts like
lerobot-eval(env.max_parallel_tasks > 1) orasync-inferenceuse python threading.Because of that the queue is shared across all the threads.
This leads to each thread getting random action, from another thread.
How it was tested
You will get close to 0 success rate and random shaky videos.
I added a feature, so that action queue is unique for each thread.
How to checkout & try? (for the reviewer)
Provide a simple way for the reviewer to try out your changes.
You'll get decent performance.