src/lerobot/datasets/lerobot_dataset.py (29 changes: 26 additions & 3 deletions)
@@ -1160,11 +1160,21 @@ def save_episode(self, episode_data: dict | None = None) -> None:
             start_ep = self.num_episodes - self.batch_encoding_size
             end_ep = self.num_episodes
             self._batch_save_episode_video(start_ep, end_ep)
+
+            # Clean up temporary images after successful batch encoding
+            for ep_idx in range(start_ep, end_ep):
+                for cam_key in self.meta.camera_keys:
+                    img_dir = self._get_image_file_dir(ep_idx, cam_key)
+                    if img_dir.is_dir():
+                        shutil.rmtree(img_dir)
+
             self.episodes_since_last_encoding = 0

         if not episode_data:
             # Reset episode buffer and clean up temporary images (if not already deleted during video encoding)
-            self.clear_episode_buffer(delete_images=len(self.meta.image_keys) > 0)
+            # For batched encoding, keep images until batch encoding is complete
+            delete_images = len(self.meta.image_keys) > 0 and not (has_video_keys and use_batched_encoding)
+            self.clear_episode_buffer(delete_images=delete_images)

     def _batch_save_episode_video(self, start_episode: int, end_episode: int | None = None) -> None:
         """
@@ -1181,6 +1191,14 @@ def _batch_save_episode_video(self, start_episode: int, end_episode: int | None = None) -> None:
             f"Batch encoding {self.batch_encoding_size} videos for episodes {start_episode} to {end_episode - 1}"
         )

+        # Close the writer to ensure all episode metadata is flushed and fully written to disk.
+        # This is necessary because ParquetWriter buffers data and only writes complete files on close.
+        self.meta._close_writer()
+
+        # Reload episodes to ensure we have the latest metadata for all episodes,
+        # especially when resuming recording with batch encoding enabled.
+        self.meta.episodes = load_episodes(self.root)
+
         chunk_idx = self.meta.episodes[start_episode]["data/chunk_index"]
         file_idx = self.meta.episodes[start_episode]["data/file_index"]
         episode_df_path = self.root / DEFAULT_EPISODES_PATH.format(chunk_index=chunk_idx, file_index=file_idx)
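The `ParquetWriter` comment is the crux of this hunk: the Parquet footer is only written when the writer is closed, so episode rows appended through a still-open writer are invisible to `load_episodes`. A minimal, self-contained demonstration of that behavior in plain pyarrow (not LeRobot code):

```python
import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({"episode_index": [0, 1], "length": [250, 312]})
writer = pq.ParquetWriter("episodes.parquet", table.schema)
writer.write_table(table)

try:
    pq.read_table("episodes.parquet")  # footer not written yet
except pa.ArrowInvalid as exc:
    print(f"Unreadable before close: {exc}")

writer.close()  # writes the footer, making the file complete
print(pq.read_table("episodes.parquet").num_rows)  # -> 2
```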
@@ -1326,8 +1344,13 @@ def _save_episode_video(self, video_key: str, episode_index: int) -> dict:
         ):
             # Initialize indices for a new dataset made of the first episode data
             chunk_idx, file_idx = 0, 0
-        if self.meta.episodes is not None and len(self.meta.episodes) > 0:
-            # It means we are resuming recording, so we need to load the latest episode
+        if (
+            self.meta.episodes is not None
+            and len(self.meta.episodes) > 0
+            and f"videos/{video_key}/chunk_index" in self.meta.episodes[-1]
+            and self.meta.episodes[-1][f"videos/{video_key}/chunk_index"] is not None
+        ):
+            # It means we are resuming recording with existing video metadata
             # Update the indices to avoid overwriting the latest episode
             old_chunk_idx = self.meta.episodes[-1][f"videos/{video_key}/chunk_index"]
             old_file_idx = self.meta.episodes[-1][f"videos/{video_key}/file_index"]
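The widened condition matters when recording resumes while a batch encode is still pending: the last episode row can exist without its `videos/*` entries, because those are only written after encoding. A hedged sketch of the decision as a hypothetical standalone helper (the real method goes on to bump these indices before writing the next file):

```python
def resolve_video_indices(episodes, video_key: str) -> tuple[int, int]:
    """Return the chunk/file indices to continue from, or (0, 0) for a
    fresh dataset or when the last episode has no video metadata yet.

    Assumes episode rows behave like dicts, as the `in` check above implies.
    """
    key = f"videos/{video_key}/chunk_index"
    if episodes is not None and len(episodes) > 0 and episodes[-1].get(key) is not None:
        # Resuming with existing video metadata: continue from the latest file
        return episodes[-1][key], episodes[-1][f"videos/{video_key}/file_index"]
    return 0, 0
```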
src/lerobot/datasets/video_utils.py (9 changes: 9 additions & 0 deletions)
@@ -643,6 +643,15 @@ def __exit__(self, exc_type, exc_val, exc_tb):
             )
             self.dataset._batch_save_episode_video(start_ep, end_ep)
+
+            # Clean up temporary images after successful batch encoding
+            for ep_idx in range(start_ep, end_ep):
+                for cam_key in self.dataset.meta.camera_keys:
+                    img_dir = self.dataset._get_image_file_dir(ep_idx, cam_key)
+                    if img_dir.is_dir():
+                        shutil.rmtree(img_dir)
+
+            self.dataset.episodes_since_last_encoding = 0

         # Finalize the dataset to properly close all writers
         self.dataset.finalize()

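This mirrors the cleanup added to `save_episode`, but for the final partial batch: on context exit, any episodes recorded since the last batch encode are flushed to video, their temporary frames deleted, and the dataset finalized. A condensed sketch of the context-manager pattern this hunk modifies, with the class shape assumed from the visible lines rather than copied from `video_utils.py`:

```python
import shutil

class EncodingManagerSketch:
    """Hypothetical stand-in for the context manager patched above."""

    def __init__(self, dataset):
        self.dataset = dataset

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        pending = self.dataset.episodes_since_last_encoding
        if pending > 0:
            end_ep = self.dataset.num_episodes
            start_ep = end_ep - pending
            self.dataset._batch_save_episode_video(start_ep, end_ep)
            # Same cleanup as the hunk above: drop frames once encoded
            for ep_idx in range(start_ep, end_ep):
                for cam_key in self.dataset.meta.camera_keys:
                    img_dir = self.dataset._get_image_file_dir(ep_idx, cam_key)
                    if img_dir.is_dir():
                        shutil.rmtree(img_dir)
            self.dataset.episodes_since_last_encoding = 0
        # Finalize the dataset to properly close all writers
        self.dataset.finalize()
        return False  # never suppress exceptions
```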