Surface reader-subprocess failures to the parent process and resolve the
shared-memory view on each access.

* getm/concurrent/buffers.py: `_view` becomes a property returning
  `self._shared_memory.buf` instead of an attribute assigned in `__init__`.
* getm/reader.py: `run()` now wraps the download loop (moved to `_run()`),
  logs the traceback and exits with status 1 when an `Exception` escapes.
  `_check_buf()` raises `RuntimeError` once `exitcode` is non-zero, `read()`
  converts that error into an empty `memoryview`, and `close()` raises
  `RuntimeError` after cleanup when the exit code is non-zero.

NOTE(review): leading whitespace was rebuilt from a flattened copy of this
patch; confirm it against the target files before applying. The `index`
post-image hashes are those of the original commit and were not recomputed.

diff --git a/getm/concurrent/buffers.py b/getm/concurrent/buffers.py
index f674b2a..81c5102 100644
--- a/getm/concurrent/buffers.py
+++ b/getm/concurrent/buffers.py
@@ -22,7 +22,10 @@ def __init__(self, name: Optional[str]=None, size: int=0, create=False):
             self._did_create = True
         else:
             self._shared_memory = SharedMemory(name)
-        self._view = self._shared_memory.buf
+
+    @property
+    def _view(self):
+        return self._shared_memory.buf
 
     @property
     def size(self):
diff --git a/getm/reader.py b/getm/reader.py
index f3b1000..5efe08c 100644
--- a/getm/reader.py
+++ b/getm/reader.py
@@ -1,6 +1,9 @@
 import io
+import sys
 import time
+import logging
 import warnings
+import traceback
 from math import ceil
 from collections import deque
 from multiprocessing import Process
@@ -12,6 +15,9 @@
 from getm.concurrent import ConcurrentQueue, ConcurrentPool, SharedCircularBuffer, SharedBufferArray
 
 
+logger = logging.getLogger(__name__)
+
+
 class BaseURLReader(io.IOBase):
     def readable(self):
         return True
@@ -176,6 +182,13 @@ def compute_buffer_size(concurrent_downloads: int, chunk_size: int) -> int:
         return buffer_size
 
     def run(self):
+        try:
+            self._run()
+        except Exception:
+            logger.error(f"reader subprocesses exiting due to '{traceback.format_exc()}'")
+            sys.exit(1)
+
+    def _run(self):
         with http_session().get(self.url, stream=True) as resp:
             handle = resp.raw
             start = stop = 0
@@ -193,21 +206,32 @@ def run(self):
                     stop += bytes_read
                     buf.stop = stop
 
-    def read(self, sz: int=-1):
+    def _check_buf(self) -> SharedCircularBuffer:
+        if self.exitcode:
+            raise RuntimeError(f"subprocesses quit with non-zero exitcode {self.exitcode}")
+        return self._buf
+
+    def _read(self, sz: int=-1):
         if -1 == sz:
             sz = self.max_read
-        self._buf.start = self._start
+        self._check_buf().start = self._start
         sz = min(sz, self.max_read)
         while sz > self._stop - self._start and self._stop < self.size:
-            self._stop = self._buf.stop
+            self._stop = self._check_buf().stop
         sz = min(sz, self._stop - self._start)
         if sz:
-            res = self._buf[self._start: self._start + sz]
+            res = self._check_buf()[self._start: self._start + sz]
             self._start += len(res)
             return res
         else:
             return memoryview(bytes())
 
+    def read(self, sz: int=-1):
+        try:
+            return self._read(sz)
+        except RuntimeError:
+            return memoryview(bytes())
+
     def readinto(self, buff: bytearray) -> int:
         d = self.read(len(buff))
         bytes_read = len(d)
@@ -220,6 +244,8 @@ def close(self):
         self.join(timeout=5)
         self._buf.close()
         super().close()
+        if self.exitcode:
+            raise RuntimeError(f"subprocesses quit with non-zero exitcode {self.exitcode}")
 
     def __enter__(self):
         self.start()