From d6644e33eeb5eea428f79aff5a06c8bc55070fb7 Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Wed, 7 Jan 2026 14:23:11 -0800 Subject: [PATCH 1/8] use aiohttp in remote rollout processor to fix the open files issue --- .../pytest/remote_rollout_processor.py | 67 +++++++------------ 1 file changed, 26 insertions(+), 41 deletions(-) diff --git a/eval_protocol/pytest/remote_rollout_processor.py b/eval_protocol/pytest/remote_rollout_processor.py index aa1c5d44..cdee40d7 100644 --- a/eval_protocol/pytest/remote_rollout_processor.py +++ b/eval_protocol/pytest/remote_rollout_processor.py @@ -1,14 +1,10 @@ import asyncio import time -from typing import Any, Dict, List, Optional +from typing import List, Optional -import requests +import aiohttp from eval_protocol.models import EvaluationRow, Status -from eval_protocol.data_loader.dynamic_data_loader import DynamicDataLoader -from eval_protocol.types.remote_rollout_processor import ( - DataLoaderConfig, -) from eval_protocol.adapters.fireworks_tracing import FireworksTracingAdapter from eval_protocol.exceptions import exception_for_status_code @@ -88,48 +84,24 @@ async def _process_row(row: EvaluationRow) -> EvaluationRow: init_payload = build_init_request(row, config, model_base_url) # Fire-and-poll - def _post_init() -> None: - url = f"{remote_base_url}/init" + init_url = f"{remote_base_url}/init" + + timeout_init = aiohttp.ClientTimeout(total=300) + + async with aiohttp.ClientSession() as session: try: - r = requests.post(url, json=init_payload.model_dump(), timeout=300) - r.raise_for_status() - except requests.exceptions.Timeout: + async with session.post(init_url, json=init_payload.model_dump(), timeout=timeout_init) as resp: + if resp.status >= 400: + body = await resp.text() + raise RuntimeError(f"Remote /init failed (HTTP {resp.status}): {body}") + except asyncio.TimeoutError: raise TimeoutError( - f"The /init endpoint tried {url} with {init_payload.model_dump()} but timed out after 300 seconds." + f"The /init endpoint tried {init_url} with {init_payload.model_dump()} but timed out after 300 seconds." ) - await asyncio.to_thread(_post_init) - - terminated = False deadline = time.time() + timeout_seconds - def _get_status() -> Dict[str, Any]: - url = f"{remote_base_url}/status" - r = requests.get(url, params={"rollout_id": row.execution_metadata.rollout_id}, timeout=15) - r.raise_for_status() - return r.json() - - continue_polling_status = True while time.time() < deadline: - try: - if continue_polling_status: - status = await asyncio.to_thread(_get_status) - terminated = bool(status.get("terminated", False)) - if terminated: - break - except requests.exceptions.HTTPError as e: - if e.response is not None and e.response.status_code == 404: - # 404 means server doesn't implement /status endpoint, stop polling - logger.debug( - f"Server doesn't implement /status endpoint (404), stopping status polling for rollout {row.execution_metadata.rollout_id}" - ) - continue_polling_status = False - else: - raise - except Exception: - # For all other exceptions, raise them - raise - # Search Fireworks tracing logs for completion (run in thread to avoid blocking event loop) completed_logs = await asyncio.to_thread( self._tracing_adapter.search_logs, tags=[f"rollout_id:{row.execution_metadata.rollout_id}"] @@ -142,6 +114,17 @@ def _get_status() -> Dict[str, Any]: status_logs.append(log) if status_logs: + # finished_logs = [] + # for log in status_logs: + # sd = log.get("status") or {} + # if isinstance(sd, dict) and sd.get("code") == Status.Code.FINISHED: + # finished_logs.append(log) + # if len(finished_logs) > 1: + # logger.warning( + # "Found %s FINISHED status logs for rollout %s; expected at most 1. Using the first one.", + # len(finished_logs), + # row.execution_metadata.rollout_id, + # ) # Use the first log with status information status_log = status_logs[0] status_dict = status_log.get("status") @@ -169,6 +152,8 @@ def _get_status() -> Dict[str, Any]: details=status_details, ) + # then add the log extras to be stuffed into row.artifacts or something + logger.info("Stopping polling for rollout %s", row.execution_metadata.rollout_id) break From 93bdef9029361cbaa40bde8e0d44d63cdafea498 Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Wed, 7 Jan 2026 15:00:28 -0800 Subject: [PATCH 2/8] update --- eval_protocol/pytest/evaluation_test_utils.py | 10 ++-- .../pytest/remote_rollout_processor.py | 48 ++++++++++++++----- eval_protocol/training/gepa_trainer.py | 6 ++- 3 files changed, 49 insertions(+), 15 deletions(-) diff --git a/eval_protocol/pytest/evaluation_test_utils.py b/eval_protocol/pytest/evaluation_test_utils.py index 48f8a015..628a6100 100644 --- a/eval_protocol/pytest/evaluation_test_utils.py +++ b/eval_protocol/pytest/evaluation_test_utils.py @@ -371,7 +371,7 @@ async def execute_row_with_backoff_retry(row: EvaluationRow) -> EvaluationRow: retry_config = replace(config, kwargs={**(config.kwargs or {}), "start_server": False}) retry_tasks = rollout_processor([row], retry_config) result = await retry_tasks[0] - + # Apply post-processing quality checks if configured # This must be inside the retry function so ResponseQualityError can trigger retries if config.post_processor is not None: @@ -380,7 +380,7 @@ async def execute_row_with_backoff_retry(row: EvaluationRow) -> EvaluationRow: except ResponseQualityError as quality_error: # Re-raise ResponseQualityError to trigger retry logic raise quality_error - + return result async def execute_row_with_backoff(task: asyncio.Task[EvaluationRow], row: EvaluationRow) -> EvaluationRow: @@ -464,7 +464,11 @@ async def execute_row_with_backoff_and_log( yield result finally: - rollout_processor.cleanup() + # Prefer async cleanup if available, fall back to sync + if hasattr(rollout_processor, "aclose"): + await getattr(rollout_processor, "aclose")() + else: + rollout_processor.cleanup() def sanitize_filename(text: str) -> str: diff --git a/eval_protocol/pytest/remote_rollout_processor.py b/eval_protocol/pytest/remote_rollout_processor.py index cdee40d7..c27707cd 100644 --- a/eval_protocol/pytest/remote_rollout_processor.py +++ b/eval_protocol/pytest/remote_rollout_processor.py @@ -47,6 +47,14 @@ def __init__( self._poll_interval = poll_interval self._timeout_seconds = timeout_seconds self._tracing_adapter = FireworksTracingAdapter(base_url=self._model_base_url) + self._session: Optional[aiohttp.ClientSession] = None + self._session_lock = asyncio.Lock() + + async def _get_session(self) -> aiohttp.ClientSession: + async with self._session_lock: + if self._session is None or self._session.closed: + self._session = aiohttp.ClientSession() + return self._session def __call__(self, rows: List[EvaluationRow], config: RolloutProcessorConfig) -> List[asyncio.Task[EvaluationRow]]: tasks: List[asyncio.Task[EvaluationRow]] = [] @@ -88,16 +96,18 @@ async def _process_row(row: EvaluationRow) -> EvaluationRow: timeout_init = aiohttp.ClientTimeout(total=300) - async with aiohttp.ClientSession() as session: - try: - async with session.post(init_url, json=init_payload.model_dump(), timeout=timeout_init) as resp: - if resp.status >= 400: - body = await resp.text() - raise RuntimeError(f"Remote /init failed (HTTP {resp.status}): {body}") - except asyncio.TimeoutError: - raise TimeoutError( - f"The /init endpoint tried {init_url} with {init_payload.model_dump()} but timed out after 300 seconds." - ) + try: + session = await self._get_session() + async with session.post(init_url, json=init_payload.model_dump(), timeout=timeout_init) as resp: + if resp.status >= 400: + body = await resp.text() + raise RuntimeError(f"Remote /init failed (HTTP {resp.status}): {body}") + resp.raise_for_status() + await resp.read() # Drain the response body and release the connection back to the pool + except asyncio.TimeoutError: + raise TimeoutError( + f"The /init endpoint tried {init_url} with {init_payload.model_dump()} but timed out after 300 seconds." + ) deadline = time.time() + timeout_seconds @@ -185,5 +195,21 @@ async def _sem_wrapper(r: EvaluationRow) -> EvaluationRow: tasks = [asyncio.create_task(_sem_wrapper(row)) for row in rows] return tasks + async def aclose(self) -> None: + """Async cleanup - preferred when you can await.""" + if self._session and not self._session.closed: + await self._session.close() + def cleanup(self) -> None: - return None + """Sync cleanup - best-effort, schedules close if event loop is running.""" + if self._session and not self._session.closed: + try: + loop = asyncio.get_running_loop() + loop.create_task(self._session.close()) + except RuntimeError: + # No running event loop - can't safely close the session. + # The session will be garbage collected eventually, but warn about it. + logger.warning( + "RemoteRolloutProcessor.cleanup() called outside of async context. " + "Session may not be properly closed. Use `await processor.aclose()` when possible." + ) diff --git a/eval_protocol/training/gepa_trainer.py b/eval_protocol/training/gepa_trainer.py index d91efe67..9409586d 100644 --- a/eval_protocol/training/gepa_trainer.py +++ b/eval_protocol/training/gepa_trainer.py @@ -503,7 +503,11 @@ async def evaluate_with_ep( } finally: - rollout_processor.cleanup() + # Prefer async cleanup if available, fall back to sync + if hasattr(rollout_processor, "aclose"): + await getattr(rollout_processor, "aclose")() + else: + rollout_processor.cleanup() def run_ep_evaluation( self, From 4a27a12db6b740648c41dba57cd85bda22adc0b4 Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Wed, 7 Jan 2026 15:06:05 -0800 Subject: [PATCH 3/8] remove comments --- eval_protocol/pytest/remote_rollout_processor.py | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/eval_protocol/pytest/remote_rollout_processor.py b/eval_protocol/pytest/remote_rollout_processor.py index c27707cd..2ebfa4a6 100644 --- a/eval_protocol/pytest/remote_rollout_processor.py +++ b/eval_protocol/pytest/remote_rollout_processor.py @@ -124,17 +124,6 @@ async def _process_row(row: EvaluationRow) -> EvaluationRow: status_logs.append(log) if status_logs: - # finished_logs = [] - # for log in status_logs: - # sd = log.get("status") or {} - # if isinstance(sd, dict) and sd.get("code") == Status.Code.FINISHED: - # finished_logs.append(log) - # if len(finished_logs) > 1: - # logger.warning( - # "Found %s FINISHED status logs for rollout %s; expected at most 1. Using the first one.", - # len(finished_logs), - # row.execution_metadata.rollout_id, - # ) # Use the first log with status information status_log = status_logs[0] status_dict = status_log.get("status") @@ -162,8 +151,6 @@ async def _process_row(row: EvaluationRow) -> EvaluationRow: details=status_details, ) - # then add the log extras to be stuffed into row.artifacts or something - logger.info("Stopping polling for rollout %s", row.execution_metadata.rollout_id) break From d424168d5eab377c15a8e5f24aac7224247ebc5b Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Wed, 7 Jan 2026 15:17:12 -0800 Subject: [PATCH 4/8] fix test --- tests/pytest/test_utils.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/tests/pytest/test_utils.py b/tests/pytest/test_utils.py index 0176c279..b6e0ca05 100644 --- a/tests/pytest/test_utils.py +++ b/tests/pytest/test_utils.py @@ -1,5 +1,5 @@ import asyncio -from unittest.mock import MagicMock +from unittest.mock import AsyncMock, MagicMock import pytest from eval_protocol.pytest.evaluation_test_utils import rollout_processor_with_retry @@ -16,6 +16,7 @@ def mock_rollout_processor(self): """Create a mock rollout processor that returns async tasks.""" processor = MagicMock() processor.cleanup = MagicMock() + processor.aclose = AsyncMock() # async cleanup method return processor @pytest.fixture @@ -71,8 +72,8 @@ async def mock_task(): assert mock_config.logger.log.call_count == 1 mock_config.logger.log.assert_called_once_with(results[0]) - # Verify cleanup was called - mock_rollout_processor.cleanup.assert_called_once() + # Verify async cleanup was called (aclose is preferred over cleanup) + mock_rollout_processor.aclose.assert_awaited_once() @pytest.mark.asyncio async def test_logger_called_on_failed_execution(self, mock_rollout_processor, mock_config, sample_dataset): @@ -97,8 +98,8 @@ async def failing_task(): assert results[0].rollout_status.code == 13 # INTERNAL error code assert "Test error" in results[0].rollout_status.message - # Verify cleanup was called - mock_rollout_processor.cleanup.assert_called_once() + # Verify async cleanup was called (aclose is preferred over cleanup) + mock_rollout_processor.aclose.assert_awaited_once() @pytest.mark.asyncio async def test_logger_called_on_retry_execution(self, mock_rollout_processor, mock_config, sample_dataset): @@ -134,8 +135,8 @@ async def flaky_task(): assert mock_config.logger.log.call_count == 1 mock_config.logger.log.assert_called_once_with(results[0]) - # Verify cleanup was called - mock_rollout_processor.cleanup.assert_called_once() + # Verify async cleanup was called (aclose is preferred over cleanup) + mock_rollout_processor.aclose.assert_awaited_once() @pytest.mark.asyncio async def test_logger_called_for_multiple_rows(self, mock_rollout_processor, mock_config): @@ -182,8 +183,8 @@ async def mock_task(): assert mock_config.logger.log.call_count == 2 assert len(results) == 2 - # Verify cleanup was called - mock_rollout_processor.cleanup.assert_called_once() + # Verify async cleanup was called (aclose is preferred over cleanup) + mock_rollout_processor.aclose.assert_awaited_once() @pytest.mark.asyncio async def test_logger_called_even_when_processor_fails_to_initialize( From 81680341807a4babfa97cc8b3de92886787ca02e Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Wed, 7 Jan 2026 15:24:40 -0800 Subject: [PATCH 5/8] fixed --- tests/pytest/test_utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/pytest/test_utils.py b/tests/pytest/test_utils.py index b6e0ca05..cd7af469 100644 --- a/tests/pytest/test_utils.py +++ b/tests/pytest/test_utils.py @@ -199,5 +199,5 @@ async def test_logger_called_even_when_processor_fails_to_initialize( async for result in rollout_processor_with_retry(mock_rollout_processor, sample_dataset, mock_config): pass - # Verify cleanup was called even though the function failed - mock_rollout_processor.cleanup.assert_called_once() + # Verify async cleanup was called even though the function failed + mock_rollout_processor.aclose.assert_awaited_once() From be3107b7bdf96ed8f309be2310ce14df973cbd96 Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Wed, 7 Jan 2026 15:30:35 -0800 Subject: [PATCH 6/8] update --- eval_protocol/pytest/remote_rollout_processor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/eval_protocol/pytest/remote_rollout_processor.py b/eval_protocol/pytest/remote_rollout_processor.py index 2ebfa4a6..eaaddf74 100644 --- a/eval_protocol/pytest/remote_rollout_processor.py +++ b/eval_protocol/pytest/remote_rollout_processor.py @@ -50,7 +50,7 @@ def __init__( self._session: Optional[aiohttp.ClientSession] = None self._session_lock = asyncio.Lock() - async def _get_session(self) -> aiohttp.ClientSession: + async def _get_or_create_session(self) -> aiohttp.ClientSession: async with self._session_lock: if self._session is None or self._session.closed: self._session = aiohttp.ClientSession() @@ -97,7 +97,7 @@ async def _process_row(row: EvaluationRow) -> EvaluationRow: timeout_init = aiohttp.ClientTimeout(total=300) try: - session = await self._get_session() + session = await self._get_or_create_session() async with session.post(init_url, json=init_payload.model_dump(), timeout=timeout_init) as resp: if resp.status >= 400: body = await resp.text() From c099cc916addbfd9bbf07fa2da77f55f7c832546 Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Wed, 7 Jan 2026 15:33:05 -0800 Subject: [PATCH 7/8] rename --- eval_protocol/pytest/evaluation_test_utils.py | 4 ++-- eval_protocol/pytest/remote_rollout_processor.py | 4 ++-- eval_protocol/training/gepa_trainer.py | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/eval_protocol/pytest/evaluation_test_utils.py b/eval_protocol/pytest/evaluation_test_utils.py index 628a6100..24325aa5 100644 --- a/eval_protocol/pytest/evaluation_test_utils.py +++ b/eval_protocol/pytest/evaluation_test_utils.py @@ -465,8 +465,8 @@ async def execute_row_with_backoff_and_log( finally: # Prefer async cleanup if available, fall back to sync - if hasattr(rollout_processor, "aclose"): - await getattr(rollout_processor, "aclose")() + if hasattr(rollout_processor, "acleanup"): + await getattr(rollout_processor, "acleanup")() else: rollout_processor.cleanup() diff --git a/eval_protocol/pytest/remote_rollout_processor.py b/eval_protocol/pytest/remote_rollout_processor.py index eaaddf74..6ff4d802 100644 --- a/eval_protocol/pytest/remote_rollout_processor.py +++ b/eval_protocol/pytest/remote_rollout_processor.py @@ -182,7 +182,7 @@ async def _sem_wrapper(r: EvaluationRow) -> EvaluationRow: tasks = [asyncio.create_task(_sem_wrapper(row)) for row in rows] return tasks - async def aclose(self) -> None: + async def acleanup(self) -> None: """Async cleanup - preferred when you can await.""" if self._session and not self._session.closed: await self._session.close() @@ -198,5 +198,5 @@ def cleanup(self) -> None: # The session will be garbage collected eventually, but warn about it. logger.warning( "RemoteRolloutProcessor.cleanup() called outside of async context. " - "Session may not be properly closed. Use `await processor.aclose()` when possible." + "Session may not be properly closed. Use `await processor.acleanup()` when possible." ) diff --git a/eval_protocol/training/gepa_trainer.py b/eval_protocol/training/gepa_trainer.py index 9409586d..74ba70cd 100644 --- a/eval_protocol/training/gepa_trainer.py +++ b/eval_protocol/training/gepa_trainer.py @@ -504,8 +504,8 @@ async def evaluate_with_ep( finally: # Prefer async cleanup if available, fall back to sync - if hasattr(rollout_processor, "aclose"): - await getattr(rollout_processor, "aclose")() + if hasattr(rollout_processor, "acleanup"): + await getattr(rollout_processor, "acleanup")() else: rollout_processor.cleanup() From e913ca1808583cdce1c5094705879af4a118d196 Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Wed, 7 Jan 2026 15:50:55 -0800 Subject: [PATCH 8/8] update --- eval_protocol/pytest/evaluation_test_utils.py | 7 ++----- eval_protocol/pytest/remote_rollout_processor.py | 12 +++++------- eval_protocol/pytest/rollout_processor.py | 4 ++++ eval_protocol/training/gepa_trainer.py | 7 ++----- tests/pytest/test_utils.py | 12 ++++++------ 5 files changed, 19 insertions(+), 23 deletions(-) diff --git a/eval_protocol/pytest/evaluation_test_utils.py b/eval_protocol/pytest/evaluation_test_utils.py index 24325aa5..64f0c8b3 100644 --- a/eval_protocol/pytest/evaluation_test_utils.py +++ b/eval_protocol/pytest/evaluation_test_utils.py @@ -464,11 +464,8 @@ async def execute_row_with_backoff_and_log( yield result finally: - # Prefer async cleanup if available, fall back to sync - if hasattr(rollout_processor, "acleanup"): - await getattr(rollout_processor, "acleanup")() - else: - rollout_processor.cleanup() + await rollout_processor.acleanup() + rollout_processor.cleanup() def sanitize_filename(text: str) -> str: diff --git a/eval_protocol/pytest/remote_rollout_processor.py b/eval_protocol/pytest/remote_rollout_processor.py index 6ff4d802..374978e1 100644 --- a/eval_protocol/pytest/remote_rollout_processor.py +++ b/eval_protocol/pytest/remote_rollout_processor.py @@ -48,13 +48,11 @@ def __init__( self._timeout_seconds = timeout_seconds self._tracing_adapter = FireworksTracingAdapter(base_url=self._model_base_url) self._session: Optional[aiohttp.ClientSession] = None - self._session_lock = asyncio.Lock() - async def _get_or_create_session(self) -> aiohttp.ClientSession: - async with self._session_lock: - if self._session is None or self._session.closed: - self._session = aiohttp.ClientSession() - return self._session + def _get_or_create_session(self) -> aiohttp.ClientSession: + if self._session is None or self._session.closed: + self._session = aiohttp.ClientSession() + return self._session def __call__(self, rows: List[EvaluationRow], config: RolloutProcessorConfig) -> List[asyncio.Task[EvaluationRow]]: tasks: List[asyncio.Task[EvaluationRow]] = [] @@ -97,7 +95,7 @@ async def _process_row(row: EvaluationRow) -> EvaluationRow: timeout_init = aiohttp.ClientTimeout(total=300) try: - session = await self._get_or_create_session() + session = self._get_or_create_session() async with session.post(init_url, json=init_payload.model_dump(), timeout=timeout_init) as resp: if resp.status >= 400: body = await resp.text() diff --git a/eval_protocol/pytest/rollout_processor.py b/eval_protocol/pytest/rollout_processor.py index 95fbfa1b..c15413d1 100644 --- a/eval_protocol/pytest/rollout_processor.py +++ b/eval_protocol/pytest/rollout_processor.py @@ -19,6 +19,10 @@ def __call__(self, rows: list[EvaluationRow], config: RolloutProcessorConfig) -> """Process evaluation rows and return async tasks. Must be implemented by subclasses.""" pass + async def acleanup(self) -> None: + """Async cleanup - preferred when you can await.""" + pass + def cleanup(self) -> None: """Cleanup resources. Override in subclasses if cleanup is needed.""" pass diff --git a/eval_protocol/training/gepa_trainer.py b/eval_protocol/training/gepa_trainer.py index 74ba70cd..d8625bf2 100644 --- a/eval_protocol/training/gepa_trainer.py +++ b/eval_protocol/training/gepa_trainer.py @@ -503,11 +503,8 @@ async def evaluate_with_ep( } finally: - # Prefer async cleanup if available, fall back to sync - if hasattr(rollout_processor, "acleanup"): - await getattr(rollout_processor, "acleanup")() - else: - rollout_processor.cleanup() + await rollout_processor.acleanup() + rollout_processor.cleanup() def run_ep_evaluation( self, diff --git a/tests/pytest/test_utils.py b/tests/pytest/test_utils.py index cd7af469..09378fb7 100644 --- a/tests/pytest/test_utils.py +++ b/tests/pytest/test_utils.py @@ -16,7 +16,7 @@ def mock_rollout_processor(self): """Create a mock rollout processor that returns async tasks.""" processor = MagicMock() processor.cleanup = MagicMock() - processor.aclose = AsyncMock() # async cleanup method + processor.acleanup = AsyncMock() # async cleanup method return processor @pytest.fixture @@ -73,7 +73,7 @@ async def mock_task(): mock_config.logger.log.assert_called_once_with(results[0]) # Verify async cleanup was called (aclose is preferred over cleanup) - mock_rollout_processor.aclose.assert_awaited_once() + mock_rollout_processor.acleanup.assert_awaited_once() @pytest.mark.asyncio async def test_logger_called_on_failed_execution(self, mock_rollout_processor, mock_config, sample_dataset): @@ -99,7 +99,7 @@ async def failing_task(): assert "Test error" in results[0].rollout_status.message # Verify async cleanup was called (aclose is preferred over cleanup) - mock_rollout_processor.aclose.assert_awaited_once() + mock_rollout_processor.acleanup.assert_awaited_once() @pytest.mark.asyncio async def test_logger_called_on_retry_execution(self, mock_rollout_processor, mock_config, sample_dataset): @@ -136,7 +136,7 @@ async def flaky_task(): mock_config.logger.log.assert_called_once_with(results[0]) # Verify async cleanup was called (aclose is preferred over cleanup) - mock_rollout_processor.aclose.assert_awaited_once() + mock_rollout_processor.acleanup.assert_awaited_once() @pytest.mark.asyncio async def test_logger_called_for_multiple_rows(self, mock_rollout_processor, mock_config): @@ -184,7 +184,7 @@ async def mock_task(): assert len(results) == 2 # Verify async cleanup was called (aclose is preferred over cleanup) - mock_rollout_processor.aclose.assert_awaited_once() + mock_rollout_processor.acleanup.assert_awaited_once() @pytest.mark.asyncio async def test_logger_called_even_when_processor_fails_to_initialize( @@ -200,4 +200,4 @@ async def test_logger_called_even_when_processor_fails_to_initialize( pass # Verify async cleanup was called even though the function failed - mock_rollout_processor.aclose.assert_awaited_once() + mock_rollout_processor.acleanup.assert_awaited_once()