fix(policies): Fix pi05 incorrect behaviour when threading #2441
base: main
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
|
|
@@ -17,6 +17,7 @@ | |||||||
| import builtins | ||||||||
| import logging | ||||||||
| import math | ||||||||
| import threading | ||||||||
| from collections import deque | ||||||||
| from pathlib import Path | ||||||||
| from typing import TYPE_CHECKING, Literal | ||||||||
|
|
@@ -1028,9 +1029,36 @@ def _fix_pytorch_state_dict_keys( | |||||||
| def get_optim_params(self) -> dict: | ||||||||
| return self.parameters() | ||||||||
|
|
||||||||
| def _new_action_queue(self) -> deque: | ||||||||
| """Create a fresh action queue honoring n_action_steps.""" | ||||||||
| return deque(maxlen=self.config.n_action_steps) | ||||||||
|
|
||||||||
| def _get_thread_action_queue(self) -> deque: | ||||||||
| """Return the action queue scoped to the current thread.""" | ||||||||
| if not hasattr(self, "_thread_local"): | ||||||||
| self._thread_local = threading.local() | ||||||||
| action_queue = getattr(self._thread_local, "action_queue", None) | ||||||||
| if action_queue is None: | ||||||||
| action_queue = self._new_action_queue() | ||||||||
| self._thread_local.action_queue = action_queue | ||||||||
| return action_queue | ||||||||
|
|
||||||||
| @property | ||||||||
| def _action_queue(self) -> deque: | ||||||||
| """Expose the thread-local action queue (backwards compatible attribute).""" | ||||||||
| return self._get_thread_action_queue() | ||||||||
|
|
||||||||
| @_action_queue.setter | ||||||||
| def _action_queue(self, queue: deque) -> None: | ||||||||
| if not hasattr(self, "_thread_local"): | ||||||||
| self._thread_local = threading.local() | ||||||||
|
|
||||||||
| self._thread_local.action_queue = queue | ||||||||
|
|
||||||||
| def reset(self): | ||||||||
| """Reset internal state - called when environment resets.""" | ||||||||
| self._action_queue = deque(maxlen=self.config.n_action_steps) | ||||||||
| self._thread_local = threading.local() | ||||||||
|
||||||||
| self._thread_local = threading.local() | |
| if not hasattr(self, "_thread_local"): | |
| self._thread_local = threading.local() |
Copilot AI, Nov 14, 2025
The _queues attribute is not thread-safe. Similar to _action_queue, this dictionary is shared across all threads and could cause the same cross-contamination issues when env.max_parallel_tasks > 1.
Consider applying the same thread-local pattern to _queues or clarifying its intended usage. If it's not actively used elsewhere in the codebase, it might be safe to leave it as-is, but if it's used in multi-threaded contexts, it should also be made thread-local.
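A minimal sketch of how the same thread-local pattern could be applied to a dictionary of queues. The class name `ThreadScopedQueues`, the `"action"` key, and the helper functions are hypothetical stand-ins for illustration, not taken from the policy code; how `_queues` is actually populated in the codebase is not shown in this diff.

```python
import threading
from collections import deque


class ThreadScopedQueues:
    """Hypothetical stand-in for a policy object that owns `_queues`."""

    def __init__(self, n_action_steps: int) -> None:
        self.n_action_steps = n_action_steps
        self._thread_local = threading.local()

    @property
    def _queues(self) -> dict:
        # Lazily create one dict per thread; the "action" key is an assumption.
        queues = getattr(self._thread_local, "queues", None)
        if queues is None:
            queues = {"action": deque(maxlen=self.n_action_steps)}
            self._thread_local.queues = queues
        return queues


def fill_and_snapshot(holder, value, results, barrier):
    """Append a value from this thread, then record what this thread sees."""
    holder._queues["action"].append(value)
    barrier.wait()  # ensure both threads have appended before either reads
    results[value] = list(holder._queues["action"])


def run_demo() -> dict:
    holder = ThreadScopedQueues(n_action_steps=4)
    results = {}
    barrier = threading.Barrier(2)
    threads = [
        threading.Thread(target=fill_and_snapshot, args=(holder, v, results, barrier))
        for v in (1, 2)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


if __name__ == "__main__":
    print(run_demo())
```

With a plain shared dictionary, each thread would see both appended values; with the thread-local property, each thread only sees its own.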
[nitpick] Consider adding a module-level or class-level docstring explaining the thread-safety approach. This would help future maintainers understand why _action_queue is implemented as a property with thread-local storage. For example:
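One possible shape for such a docstring, sketched on a hypothetical `ExamplePolicy` class rather than the real policy. The wording of the docstring and the simplified constructor are assumptions; only the property and `reset` behaviour mirror what the diff above appears to do.

```python
import threading
from collections import deque


class ExamplePolicy:
    """Hypothetical policy used only to illustrate the docstring.

    Thread safety:
        `_action_queue` is a property backed by `threading.local()`, so every
        thread that calls into the policy gets its own deque. This avoids
        actions from one environment leaking into another when several
        evaluation threads share a single policy instance.

        `reset()` replaces the thread-local container, which discards the
        queues of all threads, not only the caller's.
    """

    def __init__(self, n_action_steps: int) -> None:
        # Simplified constructor; the real policy reads this from its config.
        self.n_action_steps = n_action_steps
        self._thread_local = threading.local()

    @property
    def _action_queue(self) -> deque:
        # Create the calling thread's queue on first access.
        queue = getattr(self._thread_local, "action_queue", None)
        if queue is None:
            queue = deque(maxlen=self.n_action_steps)
            self._thread_local.action_queue = queue
        return queue

    def reset(self) -> None:
        # A fresh container means every thread starts with an empty queue.
        self._thread_local = threading.local()
```

Keeping the note at class level puts the explanation next to the property it describes, so it is visible through `help(ExamplePolicy)`.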