Skip to content

Conversation

@grach0v
Copy link

@grach0v grach0v commented Nov 14, 2025

What this does

pi05 policy has action queue as an attribute.
Many lerobot scripts like lerobot-eval (env.max_parallel_tasks > 1) or async-inference use python threading.
Because of that the queue is shared across all the threads.
This leads to each thread getting random action, from another thread.

How it was tested

uv sync 
uv pip install -e ".[pi, libero]"
uv run lerobot-eval   --output_dir=/logs/   --env.type=libero   --env.task=libero_spatial,libero_object   --eval.batch_size=1   --eval.n_episodes=10   --policy.path=lerobot/pi05_libero_finetuned   --policy.n_action_steps=10   --output_dir=./eval_logs/   --env.max_parallel_tasks=4 --policy.compile_model=false

You will get close to 0 success rate and random shaky videos.

I added a feature, so that action queue is unique for each thread.

How to checkout & try? (for the reviewer)

Provide a simple way for the reviewer to try out your changes.

uv sync 
uv pip install -e ".[pi, libero]"
uv run lerobot-eval   --output_dir=/logs/   --env.type=libero   --env.task=libero_spatial,libero_object   --eval.batch_size=1   --eval.n_episodes=10   --policy.path=lerobot/pi05_libero_finetuned   --policy.n_action_steps=10   --output_dir=./eval_logs/   --env.max_parallel_tasks=4 --policy.compile_model=false

You'll get decent performance.

Copilot AI review requested due to automatic review settings November 14, 2025 13:42
Copilot finished reviewing on behalf of grach0v November 14, 2025 13:45
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR fixes a critical threading bug in the pi05 policy where the action queue was shared across all threads, causing random action cross-contamination when using multi-threaded evaluation (e.g., env.max_parallel_tasks > 1).

Key Changes

  • Introduced thread-local storage using threading.local() to isolate action queues per thread
  • Converted _action_queue to a property with getter/setter that accesses thread-local storage
  • Added helper methods _new_action_queue() and _get_thread_action_queue() to manage thread-local queues

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines 1062 to 1064
self._queues = {
ACTION: deque(maxlen=self.config.n_action_steps),
}
Copy link

Copilot AI Nov 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The _queues attribute is not thread-safe. Similar to _action_queue, this dictionary is shared across all threads and could cause the same cross-contamination issues when env.max_parallel_tasks > 1.

Consider applying the same thread-local pattern to _queues or clarifying its intended usage. If it's not actively used elsewhere in the codebase, it might be safe to leave it as-is, but if it's used in multi-threaded contexts, it should also be made thread-local.

Copilot uses AI. Check for mistakes.
Comment on lines +1032 to +1057
def _new_action_queue(self) -> deque:
"""Create a fresh action queue honoring n_action_steps."""
return deque(maxlen=self.config.n_action_steps)

def _get_thread_action_queue(self) -> deque:
"""Return the action queue scoped to the current thread."""
if not hasattr(self, "_thread_local"):
self._thread_local = threading.local()
action_queue = getattr(self._thread_local, "action_queue", None)
if action_queue is None:
action_queue = self._new_action_queue()
self._thread_local.action_queue = action_queue
return action_queue

@property
def _action_queue(self) -> deque:
"""Expose the thread-local action queue (backwards compatible attribute)."""
return self._get_thread_action_queue()

@_action_queue.setter
def _action_queue(self, queue: deque) -> None:
if not hasattr(self, "_thread_local"):
self._thread_local = threading.local()

self._thread_local.action_queue = queue

Copy link

Copilot AI Nov 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Consider adding a module-level or class-level docstring explaining the thread-safety approach. This would help future maintainers understand why _action_queue is implemented as a property with thread-local storage. For example:

"""
Thread-Safety:
    This policy uses thread-local storage for the action queue to support
    multi-threaded evaluation scenarios (e.g., env.max_parallel_tasks > 1).
    Each thread maintains its own action queue to prevent cross-contamination
    of actions between parallel environments.
"""

Copilot uses AI. Check for mistakes.
def reset(self):
"""Reset internal state - called when environment resets."""
self._action_queue = deque(maxlen=self.config.n_action_steps)
self._thread_local = threading.local()
Copy link

Copilot AI Nov 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reset() method unconditionally creates a new threading.local() object, which defeats the purpose of thread-local storage. This will clear the action queue for ALL threads, not just the current thread.

Instead, only the current thread's action queue should be reset. Consider this approach:

def reset(self):
    """Reset internal state - called when environment resets."""
    if not hasattr(self, "_thread_local"):
        self._thread_local = threading.local()
    self._action_queue = self._new_action_queue()
    self._queues = {
        ACTION: deque(maxlen=self.config.n_action_steps),
    }

This ensures that _thread_local is initialized only once per policy instance, while still resetting the current thread's action queue.

Suggested change
self._thread_local = threading.local()
if not hasattr(self, "_thread_local"):
self._thread_local = threading.local()

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants