diff --git a/.gitignore b/.gitignore index 691e813..69cbd43 100644 --- a/.gitignore +++ b/.gitignore @@ -130,3 +130,18 @@ dmypy.json # Pyre type checker .pyre/ + +# Sublime Text +*.sublime-workspace +*.sublime-project + +# IDE +.vscode/ +.idea/ +*.swp +*.swo +*~ +.DS_Store + +# Ruff +.ruff_cache/ diff --git a/libs/coverage-7.2.3-cp38-cp38-macosx_10_9_x86_64.whl b/libs/coverage-7.2.3-cp38-cp38-macosx_10_9_x86_64.whl deleted file mode 100644 index 4dad176..0000000 Binary files a/libs/coverage-7.2.3-cp38-cp38-macosx_10_9_x86_64.whl and /dev/null differ diff --git a/libs/coverage-7.2.3-cp38-cp38-macosx_11_0_arm64.whl b/libs/coverage-7.2.3-cp38-cp38-macosx_11_0_arm64.whl deleted file mode 100644 index 534aaf5..0000000 Binary files a/libs/coverage-7.2.3-cp38-cp38-macosx_11_0_arm64.whl and /dev/null differ diff --git a/libs/coverage-7.2.3-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl b/libs/coverage-7.2.3-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl deleted file mode 100644 index 1766d5c..0000000 Binary files a/libs/coverage-7.2.3-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl and /dev/null differ diff --git a/libs/coverage-7.2.3-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl b/libs/coverage-7.2.3-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl deleted file mode 100644 index 0a252e1..0000000 Binary files a/libs/coverage-7.2.3-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl and /dev/null differ diff --git a/libs/coverage-7.2.3-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl b/libs/coverage-7.2.3-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl deleted file mode 100644 index 64b7607..0000000 Binary files a/libs/coverage-7.2.3-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl and /dev/null differ diff --git a/libs/coverage-7.2.3-cp38-cp38-musllinux_1_1_aarch64.whl b/libs/coverage-7.2.3-cp38-cp38-musllinux_1_1_aarch64.whl deleted file mode 100644 index c8ee94f..0000000 Binary files a/libs/coverage-7.2.3-cp38-cp38-musllinux_1_1_aarch64.whl and /dev/null differ diff --git a/libs/coverage-7.2.3-cp38-cp38-musllinux_1_1_i686.whl b/libs/coverage-7.2.3-cp38-cp38-musllinux_1_1_i686.whl deleted file mode 100644 index 32f4b05..0000000 Binary files a/libs/coverage-7.2.3-cp38-cp38-musllinux_1_1_i686.whl and /dev/null differ diff --git a/libs/coverage-7.2.3-cp38-cp38-musllinux_1_1_x86_64.whl b/libs/coverage-7.2.3-cp38-cp38-musllinux_1_1_x86_64.whl deleted file mode 100644 index b5b09b8..0000000 Binary files a/libs/coverage-7.2.3-cp38-cp38-musllinux_1_1_x86_64.whl and /dev/null differ diff --git a/libs/coverage-7.2.3-cp38-cp38-win32.whl b/libs/coverage-7.2.3-cp38-cp38-win32.whl deleted file mode 100644 index ce6108c..0000000 Binary files a/libs/coverage-7.2.3-cp38-cp38-win32.whl and /dev/null differ diff --git a/libs/coverage-7.2.3-cp38-cp38-win_amd64.whl b/libs/coverage-7.2.3-cp38-cp38-win_amd64.whl deleted file mode 100644 index 05d0a45..0000000 Binary files a/libs/coverage-7.2.3-cp38-cp38-win_amd64.whl and /dev/null differ diff --git a/libs/coverage-7.5.4-cp38-cp38-macosx_10_9_x86_64.whl b/libs/coverage-7.5.4-cp38-cp38-macosx_10_9_x86_64.whl new file mode 100644 index 0000000..475d1c4 Binary files /dev/null and b/libs/coverage-7.5.4-cp38-cp38-macosx_10_9_x86_64.whl differ diff --git a/libs/coverage-7.5.4-cp38-cp38-macosx_11_0_arm64.whl b/libs/coverage-7.5.4-cp38-cp38-macosx_11_0_arm64.whl new file mode 100644 index 0000000..d591ec5 Binary files /dev/null and b/libs/coverage-7.5.4-cp38-cp38-macosx_11_0_arm64.whl differ diff --git a/libs/coverage-7.5.4-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl b/libs/coverage-7.5.4-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl new file mode 100644 index 0000000..8ba49ba Binary files /dev/null and b/libs/coverage-7.5.4-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl differ diff --git a/libs/coverage-7.5.4-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl b/libs/coverage-7.5.4-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl new file mode 100644 index 0000000..6ca341e Binary files /dev/null and b/libs/coverage-7.5.4-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl differ diff --git a/libs/coverage-7.5.4-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl b/libs/coverage-7.5.4-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl new file mode 100644 index 0000000..c07af06 Binary files /dev/null and b/libs/coverage-7.5.4-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl differ diff --git a/libs/coverage-7.5.4-cp38-cp38-win32.whl b/libs/coverage-7.5.4-cp38-cp38-win32.whl new file mode 100644 index 0000000..5754c9e Binary files /dev/null and b/libs/coverage-7.5.4-cp38-cp38-win32.whl differ diff --git a/libs/coverage-7.5.4-cp38-cp38-win_amd64.whl b/libs/coverage-7.5.4-cp38-cp38-win_amd64.whl new file mode 100644 index 0000000..ec9eca2 Binary files /dev/null and b/libs/coverage-7.5.4-cp38-cp38-win_amd64.whl differ diff --git a/libs/packaging-23.1-py3-none-any.whl b/libs/packaging-23.1-py3-none-any.whl deleted file mode 100644 index 46b2f2f..0000000 Binary files a/libs/packaging-23.1-py3-none-any.whl and /dev/null differ diff --git a/libs/watchdog-3.0.0-cp38-cp38-macosx_10_9_x86_64.whl b/libs/watchdog-3.0.0-cp38-cp38-macosx_10_9_x86_64.whl deleted file mode 100644 index ca036ff..0000000 Binary files a/libs/watchdog-3.0.0-cp38-cp38-macosx_10_9_x86_64.whl and /dev/null differ diff --git a/libs/watchdog-3.0.0-cp38-cp38-macosx_11_0_arm64.whl b/libs/watchdog-3.0.0-cp38-cp38-macosx_11_0_arm64.whl deleted file mode 100644 index 1809321..0000000 Binary files a/libs/watchdog-3.0.0-cp38-cp38-macosx_11_0_arm64.whl and /dev/null differ diff --git a/libs/watchdog-3.0.0-py3-none-manylinux2014_aarch64.whl b/libs/watchdog-3.0.0-py3-none-manylinux2014_aarch64.whl deleted file mode 100644 index 2d0b19c..0000000 Binary files a/libs/watchdog-3.0.0-py3-none-manylinux2014_aarch64.whl and /dev/null differ diff --git a/libs/watchdog-3.0.0-py3-none-manylinux2014_i686.whl b/libs/watchdog-3.0.0-py3-none-manylinux2014_i686.whl deleted file mode 100644 index cd4028e..0000000 Binary files a/libs/watchdog-3.0.0-py3-none-manylinux2014_i686.whl and /dev/null differ diff --git a/libs/watchdog-3.0.0-py3-none-manylinux2014_x86_64.whl b/libs/watchdog-3.0.0-py3-none-manylinux2014_x86_64.whl deleted file mode 100644 index 276d8b6..0000000 Binary files a/libs/watchdog-3.0.0-py3-none-manylinux2014_x86_64.whl and /dev/null differ diff --git a/libs/watchdog-3.0.0-py3-none-win32.whl b/libs/watchdog-3.0.0-py3-none-win32.whl deleted file mode 100644 index 651ffcd..0000000 Binary files a/libs/watchdog-3.0.0-py3-none-win32.whl and /dev/null differ diff --git a/libs/watchdog-3.0.0-py3-none-win_amd64.whl b/libs/watchdog-3.0.0-py3-none-win_amd64.whl deleted file mode 100644 index 6482122..0000000 Binary files a/libs/watchdog-3.0.0-py3-none-win_amd64.whl and /dev/null differ diff --git a/libs/watchdog-4.0.2-cp38-cp38-macosx_10_9_x86_64.whl b/libs/watchdog-4.0.2-cp38-cp38-macosx_10_9_x86_64.whl new file mode 100644 index 0000000..ab6be75 Binary files /dev/null and b/libs/watchdog-4.0.2-cp38-cp38-macosx_10_9_x86_64.whl differ diff --git a/libs/watchdog-4.0.2-cp38-cp38-macosx_11_0_arm64.whl b/libs/watchdog-4.0.2-cp38-cp38-macosx_11_0_arm64.whl new file mode 100644 index 0000000..6cb44d0 Binary files /dev/null and b/libs/watchdog-4.0.2-cp38-cp38-macosx_11_0_arm64.whl differ diff --git a/libs/watchdog-4.0.2-py3-none-manylinux2014_aarch64.whl b/libs/watchdog-4.0.2-py3-none-manylinux2014_aarch64.whl new file mode 100644 index 0000000..7107fab Binary files /dev/null and b/libs/watchdog-4.0.2-py3-none-manylinux2014_aarch64.whl differ diff --git a/libs/watchdog-4.0.2-py3-none-manylinux2014_i686.whl b/libs/watchdog-4.0.2-py3-none-manylinux2014_i686.whl new file mode 100644 index 0000000..6cd4db8 Binary files /dev/null and b/libs/watchdog-4.0.2-py3-none-manylinux2014_i686.whl differ diff --git a/libs/watchdog-4.0.2-py3-none-manylinux2014_x86_64.whl b/libs/watchdog-4.0.2-py3-none-manylinux2014_x86_64.whl new file mode 100644 index 0000000..fe9ee28 Binary files /dev/null and b/libs/watchdog-4.0.2-py3-none-manylinux2014_x86_64.whl differ diff --git a/libs/watchdog-4.0.2-py3-none-win32.whl b/libs/watchdog-4.0.2-py3-none-win32.whl new file mode 100644 index 0000000..1da23b5 Binary files /dev/null and b/libs/watchdog-4.0.2-py3-none-win32.whl differ diff --git a/libs/watchdog-4.0.2-py3-none-win_amd64.whl b/libs/watchdog-4.0.2-py3-none-win_amd64.whl new file mode 100644 index 0000000..b70572d Binary files /dev/null and b/libs/watchdog-4.0.2-py3-none-win_amd64.whl differ diff --git a/pyproject.toml b/pyproject.toml index 61b7fc6..f3705b3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,12 +14,68 @@ dev = [ "pre-commit", "ruff", "pytest>=7.0.0", + "pytest-cov>=4.0.0", + "pytest-mock>=3.0.0", + # Runtime dependencies needed for testing + # (bundled as wheels in libs/ for Sublime Text runtime) + "coverage>=7.2.0", + "watchdog>=3.0.0", ] -[build-system] -requires = ["hatchling"] -build-backend = "hatchling.build" - [tool.ruff] -# Enable pycodestyle (`E`) and Pyflakes (`F`) codes by default. -select = ["E", "F", "I", "B", "N", "A", "PTH"] +# Target Python 3.8 for Sublime Text compatibility +target-version = "py38" + +# Exclude common directories +exclude = [ + ".git", + ".pytest_cache", + ".ruff_cache", + "__pycache__", + "libs", + "htmlcov", +] + +[tool.ruff.lint] +# Extend default rules with additional checks +extend-select = ["E", "F", "I", "B", "N", "A", "PTH", "RUF", "ARG"] + +[tool.ruff.lint.per-file-ignores] +# Test files can have unused arguments (fixtures) and unused imports (for mocking) +"tests/**/*.py" = ["ARG"] +"tests/mocks/*.py" = ["ARG"] + +[tool.ruff.format] +# Format code to be compatible with Python 3.8 +quote-style = "double" +indent-style = "space" + +[tool.pytest.ini_options] +testpaths = ["tests"] +python_files = ["test_*.py"] +python_classes = ["Test*"] +python_functions = ["test_*"] +addopts = [ + "--verbose", + "--cov=.", + "--cov-report=term-missing", + "--cov-report=html", +] + +[tool.coverage.run] +source = ["."] +omit = [ + "tests/*", + "libs/*", + "*/__pycache__/*", +] + +[tool.coverage.report] +exclude_lines = [ + "pragma: no cover", + "def __repr__", + "raise AssertionError", + "raise NotImplementedError", + "if __name__ == .__main__.:", + "if TYPE_CHECKING:", +] diff --git a/python-coverage.py b/python-coverage.py index c6bda8e..b0b6298 100644 --- a/python-coverage.py +++ b/python-coverage.py @@ -1,5 +1,8 @@ +import logging import sys +import threading from pathlib import Path +from typing import Dict, Optional import sublime import sublime_plugin @@ -12,123 +15,678 @@ # https://github.com/berendkleinhaneveld/sublime-doorstop/blob/main/doorstop_plugin.py # https://python-watchdog.readthedocs.io/en/stable/ -COVERAGE_FILES = {} -FILE_OBSERVER = None - -FileWatcher = None -LAST_ACTIVE_VIEW = None +# Global state - will be managed by CoverageManager singleton +COVERAGE_MANAGER: Optional["CoverageManager"] = None +ACTIVE_VIEWS = {} # Map view_id -> PythonCoverageEventListener instance SETTINGS_FILE = "python-coverage.sublime-settings" +# Set up logging +logger = logging.getLogger("sublime-python-coverage") +logger.setLevel(logging.INFO) +# Log to Sublime console +handler = logging.StreamHandler() +handler.setFormatter(logging.Formatter("Python Coverage [%(levelname)s]: %(message)s")) +logger.addHandler(handler) -def plugin_loaded(): - """ - Hook that is called by Sublime when plugin is loaded. + +def get_setting(key: str, default=None): """ - packaging_wheel = HERE / "libs" / "packaging-23.1-py3-none-any.whl" - if str(packaging_wheel) not in sys.path: - sys.path.append(str(packaging_wheel)) + Get a setting value from the plugin settings file. - from packaging.tags import sys_tags + Uses a try-except to handle mocked settings objects in tests. + """ + try: + settings = sublime.load_settings(SETTINGS_FILE) + # Check if this is the actual settings object + if hasattr(settings, "get"): + value = settings.get(key) + # Return default if key not found (value is None) + return value if value is not None else default + return default + except Exception: + return default - tags = [str(tag) for tag in sys_tags()] - for prefix in {"coverage*", "watchdog*"}: - # Figure out the right whl for the platform - for wheel in (HERE / "libs").glob(prefix): - wheel_tag = "-".join(wheel.stem.split("-")[2:]) - if wheel_tag in tags: - break - else: - print(f"Could not find compatible {prefix} wheel for your platform") - return +class CoverageManager: + """ + Manages coverage files and file watching. + Centralizes resource management to prevent leaks. + Includes debouncing to handle rapid file system events gracefully. + """ - if str(wheel) not in sys.path: - sys.path.append(str(wheel)) + def __init__(self): + self.coverage_files: Dict[Path, "CoverageFile"] = {} + self.file_observer = None + self.FileWatcher = None + self._initialized = False + # Debounce timers for each coverage file + self._update_timers: Dict[Path, threading.Timer] = {} + self._timer_lock = threading.Lock() - from watchdog.observers import Observer + def initialize(self, start_observer=True): + """ + Initialize the file observer and watcher class. - # TODO: only start watching when plugin is showing missing lines - global FILE_OBSERVER - FILE_OBSERVER = Observer() - FILE_OBSERVER.start() + Args: + start_observer: Whether to start the file observer immediately. + If False, it will be started when coverage files are added. + """ + if self._initialized: + logger.warning("CoverageManager already initialized") + return - from watchdog.events import FileSystemEventHandler + try: + from watchdog.events import FileSystemEventHandler + from watchdog.observers import Observer + + # Create observer but only start if requested + self.file_observer = Observer() + if start_observer: + self.file_observer.start() + logger.debug("File observer started") + + class _FileWatcher(FileSystemEventHandler): + def __init__(self, manager, file): + super().__init__() + self.manager = manager + self.file = file + + def _schedule_update(self, event_type="modified"): + """ + Schedule a debounced update for the coverage file. + + This handles the common pattern where coverage.py: + 1. Deletes .coverage file + 2. Creates new .coverage file + 3. Writes data to it + + By debouncing, we wait for the file system events to settle + before attempting to reload coverage data. + """ + logger.debug(f"Coverage file {event_type}: {self.file}") + self.manager._schedule_debounced_update(self.file) + + def on_modified(self, event): + is_coverage = event.src_path.endswith(".coverage") + is_our_file = str(event.src_path) == str(self.file) + if is_coverage and is_our_file: + self._schedule_update("modified") + + def on_created(self, event): + is_coverage = event.src_path.endswith(".coverage") + is_our_file = str(event.src_path) == str(self.file) + if is_coverage and is_our_file: + self._schedule_update("created") + + def on_deleted(self, event): + is_coverage = event.src_path.endswith(".coverage") + is_our_file = str(event.src_path) == str(self.file) + if is_coverage and is_our_file: + logger.debug(f"Coverage file deleted: {self.file}") + # File might be recreated soon, schedule update to check + self._schedule_update("deleted") + + self.FileWatcher = _FileWatcher + self._initialized = True + logger.info("CoverageManager initialized successfully") + + except Exception as e: + logger.error(f"Failed to initialize CoverageManager: {e}", exc_info=True) + raise + + def _schedule_debounced_update(self, coverage_file_path: Path): + """ + Schedule a debounced update for a coverage file. - class _FileWatcher(FileSystemEventHandler): - def __init__(self, file): - super().__init__() - self.file = file + If an update is already scheduled, cancel it and schedule a new one. + This ensures we only update once after rapid file system events settle. + """ + with self._timer_lock: + # Cancel existing timer if any + if coverage_file_path in self._update_timers: + self._update_timers[coverage_file_path].cancel() + logger.debug(f"Cancelled pending update for {coverage_file_path}") + + # Schedule new update + debounce_delay = get_setting("update_debounce_delay", 0.5) + timer = threading.Timer( + debounce_delay, + self._perform_debounced_update, + args=(coverage_file_path,), + ) + timer.daemon = True + self._update_timers[coverage_file_path] = timer + timer.start() + logger.debug(f"Scheduled debounced update for {coverage_file_path}") + + def _perform_debounced_update(self, coverage_file_path: Path): + """ + Perform the actual coverage file update after debounce delay. - def _update(self, event): - if not event.src_path.endswith(".coverage"): + Handles cases where file might have been deleted and not recreated, + or is in the process of being written. + """ + try: + with self._timer_lock: + # Remove timer from tracking + if coverage_file_path in self._update_timers: + del self._update_timers[coverage_file_path] + + # Check if file still exists + if not coverage_file_path.exists(): + logger.info( + f"Coverage file no longer exists, removing: {coverage_file_path}" + ) + self.remove_coverage_file(coverage_file_path) return - if str(event.src_path) != str(self.file): + # Check if we're still tracking this file + if coverage_file_path not in self.coverage_files: + logger.debug(f"Coverage file no longer tracked: {coverage_file_path}") return - COVERAGE_FILES[self.file].update() + # Update the coverage data + cov_file = self.coverage_files[coverage_file_path] + cov_file.update() + logger.debug(f"Coverage data updated for {coverage_file_path}") + + # Update all active views + for view_listener in ACTIVE_VIEWS.values(): + try: + view_listener._update_regions() + except Exception as e: + logger.error(f"Error updating view regions: {e}", exc_info=True) + + except Exception as e: + logger.error( + f"Error in debounced update for {coverage_file_path}: {e}", + exc_info=True, + ) + + def add_coverage_file(self, coverage_file_path: Path) -> bool: + """ + Add a coverage file to track. + + If this is the first coverage file and observer isn't running, start it. + + Args: + coverage_file_path: Path to the .coverage file + + Returns: + True if successfully added, False otherwise + """ + if coverage_file_path in self.coverage_files: + logger.debug(f"Coverage file already tracked: {coverage_file_path}") + return False - if LAST_ACTIVE_VIEW: - LAST_ACTIVE_VIEW._update_regions() + try: + if not coverage_file_path.exists(): + logger.warning(f"Coverage file does not exist: {coverage_file_path}") + return False + + # Start observer if this is the first file and observer isn't running + if not self.coverage_files and self.file_observer: + if not self.file_observer.is_alive(): + # Threads can only be started once, + # so create a new observer if stopped + from watchdog.observers import Observer + + self.file_observer = Observer() + self.file_observer.start() + logger.info( + "Created and started new file observer " + "(first coverage file added)" + ) + + cov_file = CoverageFile(self, coverage_file_path) + self.coverage_files[coverage_file_path] = cov_file + logger.info(f"Added coverage file: {coverage_file_path}") + return True + + except Exception as e: + logger.error( + f"Failed to add coverage file {coverage_file_path}: {e}", exc_info=True + ) + return False + + def remove_coverage_file(self, coverage_file_path: Path) -> bool: + """ + Remove a coverage file and cleanup its resources. - def on_modified(self, event): - self._update(event) + If this is the last coverage file, stop the observer to save resources. - def on_created(self, event): - self._update(event) + Args: + coverage_file_path: Path to the .coverage file - global FileWatcher - FileWatcher = _FileWatcher + Returns: + True if successfully removed, False otherwise + """ + if coverage_file_path not in self.coverage_files: + logger.debug(f"Coverage file not tracked: {coverage_file_path}") + return False + + try: + cov_file = self.coverage_files[coverage_file_path] + cov_file.cleanup() + del self.coverage_files[coverage_file_path] + logger.info(f"Removed coverage file: {coverage_file_path}") + + # Stop observer if no more files being tracked + if ( + not self.coverage_files + and self.file_observer + and self.file_observer.is_alive() + ): + self.file_observer.stop() + logger.info("Stopped file observer (no coverage files remaining)") + + return True + + except Exception as e: + logger.error( + f"Failed to remove coverage file {coverage_file_path}: {e}", + exc_info=True, + ) + return False + + def get_coverage_file(self, coverage_file_path: Path) -> Optional["CoverageFile"]: + """Get a coverage file if it exists.""" + return self.coverage_files.get(coverage_file_path) + + def get_coverage_for_file(self, file_path: str) -> Optional["CoverageFile"]: + """ + Find the appropriate coverage file for a given source file. + + Args: + file_path: Path to the source file + + Returns: + CoverageFile if found, None otherwise + """ + file_path_obj = Path(file_path).resolve() + + for coverage_file_path, cov_file in self.coverage_files.items(): + # Check if the file is within the same directory tree as the coverage file + try: + file_path_obj.relative_to(coverage_file_path.parent) + if cov_file.in_coverage_data(file_path): + return cov_file + except ValueError: + # Not in the same tree + continue + + return None + + def cleanup_stale_files(self): + """Remove coverage files that no longer exist.""" + stale_files = [path for path in self.coverage_files.keys() if not path.exists()] + + for path in stale_files: + logger.info(f"Cleaning up stale coverage file: {path}") + self.remove_coverage_file(path) + + def shutdown(self): + """Shutdown the coverage manager and cleanup all resources.""" + logger.info("Shutting down CoverageManager") + + # Cancel all pending update timers + with self._timer_lock: + for coverage_file_path, timer in list(self._update_timers.items()): + try: + timer.cancel() + logger.debug(f"Cancelled pending timer for {coverage_file_path}") + except Exception as e: + logger.error(f"Error cancelling timer: {e}") + self._update_timers.clear() + + # Remove all coverage files (which will cleanup watchers) + for coverage_file_path in list(self.coverage_files.keys()): + self.remove_coverage_file(coverage_file_path) + + # Stop the file observer + if self.file_observer: + try: + self.file_observer.stop() + self.file_observer.join(timeout=5) + except Exception as e: + logger.error(f"Error stopping file observer: {e}", exc_info=True) + finally: + self.file_observer = None + + self._initialized = False + logger.info("CoverageManager shutdown complete") + + +def plugin_loaded(): + """ + Hook that is called by Sublime when plugin is loaded. + """ + global COVERAGE_MANAGER + + try: + # Manually detect platform for loading the correct wheels + # We don't use the packaging library because it doesn't work in Sublime Text's + # embedded Python environment (missing _sysconfigdata modules) + import platform + + py_version = sys.version_info + py_ver = f"{py_version.major}{py_version.minor}" + + tags = [] + # Construct platform-specific tags for wheel matching + if sys.platform == "darwin": + machine = platform.machine() + if machine == "arm64": + tags.extend( + [ + f"cp{py_ver}-cp{py_ver}-macosx_11_0_arm64", + f"cp{py_ver}-cp{py_ver}-macosx_10_9_universal2", + ] + ) + else: # x86_64 + tags.extend( + [ + f"cp{py_ver}-cp{py_ver}-macosx_10_9_x86_64", + f"cp{py_ver}-cp{py_ver}-macosx_10_9_universal2", + ] + ) + elif sys.platform == "win32": + machine = platform.machine() + if "64" in machine: + tags.append(f"cp{py_ver}-cp{py_ver}-win_amd64") + else: + tags.append(f"cp{py_ver}-cp{py_ver}-win32") + elif sys.platform.startswith("linux"): + machine = platform.machine() + if machine == "x86_64": + tags.extend( + [ + f"cp{py_ver}-cp{py_ver}-manylinux_2_5_x86_64", + f"cp{py_ver}-cp{py_ver}-manylinux2014_x86_64", + ] + ) + elif machine == "aarch64": + tags.extend( + [ + f"cp{py_ver}-cp{py_ver}-manylinux_2_17_aarch64", + f"cp{py_ver}-cp{py_ver}-manylinux2014_aarch64", + ] + ) + elif machine in ("i686", "i386"): + tags.extend( + [ + f"cp{py_ver}-cp{py_ver}-manylinux_2_5_i686", + f"cp{py_ver}-cp{py_ver}-manylinux2014_i686", + ] + ) + + # Add generic py3 tags as fallback + tags.extend( + [ + f"py{py_ver}-none-any", + f"py{py_version.major}-none-any", + ] + ) + + logger.info(f"Detected platform tags: {tags[:3]}...") + + for prefix in {"coverage*", "watchdog*"}: + # Figure out the right whl for the platform + wheel_found = False + for wheel in (HERE / "libs").glob(prefix): + wheel_tag = "-".join(wheel.stem.split("-")[2:]) + if wheel_tag in tags: + wheel_found = True + break + + if not wheel_found: + lib_name = prefix.replace("*", "") + sublime.error_message( + f"Python Coverage: Could not find compatible {lib_name} " + f"library for your platform.\n\n" + f"Platform tags: {tags[:3]}...\n" + f"Please report this issue on GitHub." + ) + return + + if str(wheel) not in sys.path: + sys.path.append(str(wheel)) + + # Initialize the coverage manager + COVERAGE_MANAGER = CoverageManager() + COVERAGE_MANAGER.initialize() + + logger.info("Python Coverage plugin loaded successfully") + + except Exception as e: + sublime.error_message( + f"Python Coverage: Failed to load plugin.\n\n" + f"Error: {e}\n\n" + f"Please report this issue on GitHub." + ) + logger.error(f"Plugin load error: {e}", exc_info=True) def plugin_unloaded(): """ Hook that is called by Sublime when plugin is unloaded. """ - COVERAGE_FILES.clear() - global FILE_OBSERVER - FILE_OBSERVER.stop() - FILE_OBSERVER.join() - FILE_OBSERVER = None - global LAST_ACTIVE_VIEW - LAST_ACTIVE_VIEW = None + global COVERAGE_MANAGER + + # Clear active views + ACTIVE_VIEWS.clear() + + # Shutdown coverage manager + if COVERAGE_MANAGER: + COVERAGE_MANAGER.shutdown() + COVERAGE_MANAGER = None + + logger.info("Python Coverage plugin unloaded") class CoverageFile: - def __init__(self, coverage_file): + """ + Represents a .coverage file and manages its data and file watcher. + + Uses lazy loading and caching to handle rapid file updates gracefully. + """ + + def __init__(self, manager: CoverageManager, coverage_file: Path): + """ + Initialize a coverage file. + + Args: + manager: The CoverageManager instance + coverage_file: Path to the .coverage file + """ import coverage + self.manager = manager self.coverage_file = coverage_file - self.data = coverage.Coverage(data_file=coverage_file).get_data() - self.data.read() + self.data = None + self.handler = None + self.watcher = None + + # Cache for parsed Python statements: {file_path: (mtime, statements)} + self._statement_cache: Dict[str, tuple] = {} + + try: + # Lazy load coverage data - load on first use + # This is safer when file might be in process of being created + self.data = coverage.Coverage(data_file=str(coverage_file)).get_data() + self._load_data_with_retry() + + # Set up file watcher + if manager.FileWatcher and manager.file_observer: + self.handler = manager.FileWatcher(manager, coverage_file) + self.watcher = manager.file_observer.schedule( + self.handler, str(coverage_file.parent), recursive=False + ) + logger.debug(f"File watcher scheduled for {coverage_file}") + + except Exception as e: + logger.error(f"Error initializing CoverageFile for {coverage_file}: {e}") + raise + + def _load_data_with_retry(self, max_retries=3): + """ + Load coverage data with retry logic. - self.handler = FileWatcher(coverage_file) - self.watcher = FILE_OBSERVER.schedule(self.handler, str(coverage_file.parent)) + Coverage files might be in the process of being written, + so we retry a few times with a small delay. + """ + import time + + for attempt in range(max_retries): + try: + if not self.coverage_file.exists(): + if attempt < max_retries - 1: + logger.debug( + "Coverage file not ready, " + f"retry {attempt + 1}/{max_retries}" + ) + time.sleep(0.1 * (attempt + 1)) # Exponential backoff + continue + else: + logger.warning( + "Coverage file does not exist after retries: " + f"{self.coverage_file}" + ) + return False + + self.data.read() + logger.debug( + f"Successfully loaded coverage data for {self.coverage_file}" + ) + return True + + except Exception as e: + if attempt < max_retries - 1: + logger.debug( + f"Error loading coverage data (attempt {attempt + 1}): {e}" + ) + time.sleep(0.1 * (attempt + 1)) + else: + logger.error( + "Failed to load coverage data after {max_retries} attempts: " + f"{e}" + ) + raise + + return False def update(self): - self.data.read() + """ + Re-read coverage data from disk. - def in_coverage_data(self, file): - return str(file) in self.data.measured_files() + Uses retry logic to handle cases where file is being rewritten. + Invalidates statement cache since coverage data changed. + """ + try: + if self.data: + success = self._load_data_with_retry() + if success: + # Invalidate statement cache when coverage data changes + self._statement_cache.clear() + logger.debug(f"Updated coverage data for {self.coverage_file}") + except Exception as e: + logger.error(f"Error updating coverage data: {e}", exc_info=True) + + def in_coverage_data(self, file: str) -> bool: + """ + Check if a file is in the coverage data. + + Args: + file: Path to the source file + + Returns: + True if file is in coverage data, False otherwise + """ + try: + if not self.data: + return False + return str(file) in self.data.measured_files() + except Exception as e: + logger.error(f"Error checking coverage data: {e}", exc_info=True) + return False + + def missing_lines(self, file: str, text: str): + """ + Calculate missing lines for a given file. + + Uses cached parsed statements when file hasn't changed to avoid + reparsing on every view activation. + + Args: + file: Path to the source file + text: Source code text + + Returns: + List of missing line numbers (descending order), or None if error + """ + import hashlib - def missing_lines(self, file, text): from coverage.exceptions import DataError from coverage.parser import PythonParser try: + if not self.data: + return None + lines = self.data.lines(file) - except DataError: + except DataError as e: + logger.debug(f"DataError for {file}: {e}") + return None + except Exception as e: + logger.error(f"Error getting lines for {file}: {e}", exc_info=True) return None + if lines is None: return None - # TODO: Maybe this could be cached? And use file watcher to invalidate? - python_parser = PythonParser(text=text) - python_parser.parse_source() - statements = python_parser.statements + try: + # Check cache for parsed statements + # Use hash of text content as cache key since we get text from view + text_hash = hashlib.md5(text.encode()).hexdigest() + cache_key = f"{file}:{text_hash}" + + if cache_key in self._statement_cache: + statements = self._statement_cache[cache_key] + logger.debug(f"Using cached statements for {file}") + else: + # Parse the file to find all executable statements + python_parser = PythonParser(text=text) + python_parser.parse_source() + statements = python_parser.statements + + # Cache the parsed statements + self._statement_cache[cache_key] = statements + logger.debug(f"Cached statements for {file}") + + # Calculate missing lines (statements not executed) + missing = sorted(list(statements - set(lines)), reverse=True) + return missing + + except Exception as e: + logger.error(f"Error parsing file {file}: {e}", exc_info=True) + return None + + def cleanup(self): + """Cleanup resources associated with this coverage file.""" + try: + # Unschedule the file watcher + if self.watcher and self.manager.file_observer: + self.manager.file_observer.unschedule(self.watcher) + logger.debug(f"Unscheduled watcher for {self.coverage_file}") + self.watcher = None + + self.handler = None + self.data = None + self._statement_cache.clear() - return sorted(list(statements - set(lines)), reverse=True) + except Exception as e: + logger.error(f"Error cleaning up CoverageFile: {e}", exc_info=True) class ToggleMissingLinesCommand(sublime_plugin.ApplicationCommand): @@ -145,7 +703,7 @@ def run(self): class PythonCoverageDataFileListener(sublime_plugin.EventListener): @classmethod - def is_applicable(cls, settings): + def is_applicable(cls, _settings): """ Returns: Whether this listener should apply to a view with the given Settings. @@ -176,21 +734,46 @@ def on_post_save_project_async(self, window): def on_pre_close_project(self, window): """ Called right before a project is closed, passed the Window object. + Cleanup coverage files for closing project. """ - self.update_available_coverage_files(window) + if not COVERAGE_MANAGER: + return + + # Remove coverage files for folders in this project + coverage_file_name = get_setting("coverage_file_name", ".coverage") + for folder in window.folders(): + folder = Path(folder) + coverage_file = folder / coverage_file_name + if coverage_file in COVERAGE_MANAGER.coverage_files: + COVERAGE_MANAGER.remove_coverage_file(coverage_file) + + # Cleanup stale files + COVERAGE_MANAGER.cleanup_stale_files() def on_activated_async(self, view): self.update_available_coverage_files(view.window()) def update_available_coverage_files(self, window): + """Scan for and add coverage files in project folders.""" + if not COVERAGE_MANAGER: + return + settings = sublime.load_settings(SETTINGS_FILE) - if not settings["show_missing_lines"]: + if not settings.get("show_missing_lines", False): return - for folder in window.folders(): - folder = Path(folder) - coverage_file = folder / ".coverage" - if coverage_file.is_file() and coverage_file not in COVERAGE_FILES: - COVERAGE_FILES[coverage_file] = CoverageFile(coverage_file) + + try: + coverage_file_name = get_setting("coverage_file_name", ".coverage") + for folder in window.folders(): + folder = Path(folder) + coverage_file = folder / coverage_file_name + + # Add coverage file if it exists and not already tracked + if coverage_file.is_file(): + COVERAGE_MANAGER.add_coverage_file(coverage_file) + + except Exception as e: + logger.error(f"Error updating coverage files: {e}", exc_info=True) class PythonCoverageEventListener(sublime_plugin.ViewEventListener): @@ -207,61 +790,133 @@ def on_modified_async(self): Called after changes have been made to the view. Runs in a separate thread, and does not block the application. """ - pass - # TODO: clear the modified region(s), if any + try: + # Clear coverage markers when file is modified + # They may no longer be accurate since the code has changed + settings = sublime.load_settings(SETTINGS_FILE) + if settings.get("show_missing_lines", False): + self.view.erase_regions(key="python-coverage") + except Exception as e: + logger.error(f"Error in on_modified_async: {e}", exc_info=True) def on_activated_async(self): """ Called when a view gains input focus. Runs in a separate thread, and does not block the application. """ - settings = sublime.load_settings(SETTINGS_FILE) - if not settings["show_missing_lines"]: - self.view.erase_regions(key="python-coverage") - return + try: + settings = sublime.load_settings(SETTINGS_FILE) + if not settings.get("show_missing_lines", False): + self.view.erase_regions(key="python-coverage") + # Remove from active views if present + view_id = self.view.id() + ACTIVE_VIEWS.pop(view_id, None) + return + + # Add this view to active views + view_id = self.view.id() + ACTIVE_VIEWS[view_id] = self - global LAST_ACTIVE_VIEW - LAST_ACTIVE_VIEW = self + self._update_regions() - self._update_regions() + except Exception as e: + logger.error(f"Error in on_activated_async: {e}", exc_info=True) + + def on_close(self): + """ + Called when a view is closed. Runs in the main thread. + """ + # Remove from active views when closed + view_id = self.view.id() + ACTIVE_VIEWS.pop(view_id, None) def _update_regions(self): + """Update coverage regions for this view.""" file_name = self.view.file_name() if not file_name: + self._clear_status_bar() return - for coverage_file in COVERAGE_FILES: - # Assume that the file is somewhere within the - # same (sub)folder as the coverage file - if str(coverage_file.parent) in file_name: - break - else: + if not COVERAGE_MANAGER: self.view.erase_regions(key="python-coverage") + self._clear_status_bar() return - cov = COVERAGE_FILES[coverage_file] - if not cov.in_coverage_data(file_name): - self.view.erase_regions(key="python-coverage") - return + try: + # Use the manager's improved path matching + cov = COVERAGE_MANAGER.get_coverage_for_file(file_name) + if not cov: + self.view.erase_regions(key="python-coverage") + self._clear_status_bar() + return + + # Get file content + full_file_region = sublime.Region(0, self.view.size()) + text = self.view.substr(full_file_region) - full_file_region = sublime.Region(0, self.view.size()) - text = self.view.substr(full_file_region) + # Calculate missing lines + missing = cov.missing_lines(file_name, text) - missing = cov.missing_lines(file_name, text) - if not missing: + # Calculate total executable lines for coverage percentage + from coverage.parser import PythonParser + + python_parser = PythonParser(text=text) + python_parser.parse_source() + total_lines = len(python_parser.statements) + + if not missing: + self.view.erase_regions(key="python-coverage") + self._update_status_bar(0, total_lines) + return + + # Convert line numbers to regions + all_lines_regions = self.view.lines(full_file_region) + missing_regions = [all_lines_regions[line - 1] for line in missing] + + # Add visual indicators with configurable settings + gutter_icon = get_setting("gutter_icon", "triangle") + highlight_scope = get_setting("highlight_scope", "region.orangish") + icon_path = f"Packages/sublime-python-coverage/images/{gutter_icon}.png" + + self.view.add_regions( + key="python-coverage", + regions=missing_regions, + scope=highlight_scope, + icon=icon_path, + flags=sublime.RegionFlags.HIDDEN, + ) + logger.debug( + f"Updated regions for {file_name}: {len(missing)} missing lines" + ) + + # Update status bar with coverage info + self._update_status_bar(len(missing), total_lines) + + except Exception as e: + logger.error(f"Error updating regions for {file_name}: {e}", exc_info=True) self.view.erase_regions(key="python-coverage") + self._clear_status_bar() + + def _update_status_bar(self, missing_count: int, total_lines: int): + """Update the status bar with coverage information.""" + if not get_setting("show_coverage_on_status_bar", True): + return + + if total_lines == 0: + self._clear_status_bar() return - all_lines_regions = self.view.lines(full_file_region) - missing_regions = [all_lines_regions[line - 1] for line in missing] + covered_lines = total_lines - missing_count + coverage_percent = (covered_lines / total_lines) * 100 - self.view.add_regions( - key="python-coverage", - regions=missing_regions, - scope="region.orangish", - icon="Packages/sublime-python-coverage/images/triangle.png", - flags=sublime.RegionFlags.HIDDEN, + status_text = ( + f"Coverage: {coverage_percent:.0f}% ({covered_lines}/{total_lines} lines)" ) + self.view.set_status("python_coverage", status_text) + + def _clear_status_bar(self): + """Clear coverage information from the status bar.""" + self.view.erase_status("python_coverage") def on_hover(self, point, hover_zone): """ diff --git a/python-coverage.sublime-settings b/python-coverage.sublime-settings index db17ab3..5e9ad9d 100644 --- a/python-coverage.sublime-settings +++ b/python-coverage.sublime-settings @@ -1,3 +1,22 @@ { - "show_missing_lines": false + // Enable/disable coverage visualization + "show_missing_lines": false, + + // Show coverage percentage in status bar for the current file + "show_coverage_on_status_bar": true, + + // Name of the coverage file to look for (default: ".coverage") + "coverage_file_name": ".coverage", + + // Debounce delay in seconds for coverage file updates (default: 0.5) + // Increase if you experience issues with rapid file system events + "update_debounce_delay": 0.5, + + // Gutter icon to use for marking uncovered lines + // Options: "triangle", "diamond", "line" + "gutter_icon": "triangle", + + // Color scope for highlighting uncovered regions + // Common options: "region.orangish", "region.redish", "region.yellowish" + "highlight_scope": "region.orangish" } diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..ccea0e5 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,134 @@ +"""Pytest configuration and fixtures.""" + +import importlib.util +import sys +from pathlib import Path + +import pytest + +# Add mocks to sys.modules before importing the plugin +TEST_DIR = Path(__file__).parent +MOCK_DIR = TEST_DIR / "mocks" + +sys.path.insert(0, str(MOCK_DIR)) + +# Mock sublime and sublime_plugin modules +from tests.mocks import sublime, sublime_plugin # noqa: E402 + +sys.modules["sublime"] = sublime +sys.modules["sublime_plugin"] = sublime_plugin + +# Load the main plugin module (python-coverage.py) and add it to sys.modules +PLUGIN_PATH = TEST_DIR.parent / "python-coverage.py" +spec = importlib.util.spec_from_file_location("python_coverage", PLUGIN_PATH) +python_coverage = importlib.util.module_from_spec(spec) +sys.modules["python_coverage"] = python_coverage + +# Try to execute the module (may fail due to missing deps, that's ok) +try: + spec.loader.exec_module(python_coverage) +except Exception as e: + # Some imports may fail in test environment, that's expected + print(f"Note: Plugin module partially loaded (expected): {e}") + + +@pytest.fixture +def temp_coverage_file(tmp_path): + """Create a temporary coverage file.""" + coverage_file = tmp_path / ".coverage" + coverage_file.touch() + return coverage_file + + +@pytest.fixture +def sample_python_code(): + """Sample Python code for testing.""" + return """def hello(name): + if name: + print(f"Hello, {name}!") + else: + print("Hello, World!") + return True +""" + + +@pytest.fixture +def mock_coverage_data(mocker): + """Mock coverage.Coverage and CoverageData.""" + mock_data = mocker.MagicMock() + mock_data.measured_files.return_value = ["/path/to/file.py"] + mock_data.lines.return_value = [1, 2, 3, 5] # Line 4 is missing + mock_data.read.return_value = None + + mock_coverage = mocker.MagicMock() + mock_coverage.get_data.return_value = mock_data + + mock_coverage_class = mocker.patch("coverage.Coverage") + mock_coverage_class.return_value = mock_coverage + + return mock_data + + +@pytest.fixture +def mock_python_parser(mocker): + """Mock coverage.parser.PythonParser.""" + mock_parser = mocker.MagicMock() + mock_parser.statements = {1, 2, 3, 4, 5} # All executable lines + + mock_parser_class = mocker.patch("coverage.parser.PythonParser") + mock_parser_class.return_value = mock_parser + + return mock_parser + + +@pytest.fixture +def mock_file_observer(mocker): + """Mock watchdog Observer.""" + mock_observer = mocker.MagicMock() + mock_observer_class = mocker.patch("watchdog.observers.Observer") + mock_observer_class.return_value = mock_observer + return mock_observer + + +@pytest.fixture +def sublime_view(): + """Create a mock Sublime View.""" + return sublime.View(file_name="/path/to/file.py") + + +@pytest.fixture +def sublime_window(tmp_path): + """Create a mock Sublime Window.""" + return sublime.Window(folders=[str(tmp_path)]) + + +@pytest.fixture(autouse=True) +def reset_globals(): + """Reset global state before each test.""" + import python_coverage as pc + + # Clear global state if attributes exist + if hasattr(pc, "ACTIVE_VIEWS") and pc.ACTIVE_VIEWS: + pc.ACTIVE_VIEWS.clear() + + # Reset coverage manager if it exists + if hasattr(pc, "COVERAGE_MANAGER") and pc.COVERAGE_MANAGER: + # Shutdown existing manager + try: + pc.COVERAGE_MANAGER.shutdown() + except Exception: + pass + pc.COVERAGE_MANAGER = None + + yield + + # Cleanup after test + if hasattr(pc, "ACTIVE_VIEWS") and pc.ACTIVE_VIEWS: + pc.ACTIVE_VIEWS.clear() + + if hasattr(pc, "COVERAGE_MANAGER") and pc.COVERAGE_MANAGER: + try: + pc.COVERAGE_MANAGER.shutdown() + except Exception: + pass + pc.COVERAGE_MANAGER = None diff --git a/tests/mocks/__init__.py b/tests/mocks/__init__.py new file mode 100644 index 0000000..b686b4d --- /dev/null +++ b/tests/mocks/__init__.py @@ -0,0 +1 @@ +"""Mock modules for testing.""" diff --git a/tests/mocks/sublime.py b/tests/mocks/sublime.py new file mode 100644 index 0000000..a5d31f6 --- /dev/null +++ b/tests/mocks/sublime.py @@ -0,0 +1,182 @@ +"""Mock sublime module for testing.""" + +from typing import Any, Callable, Optional + + +class RegionFlags: + """Mock RegionFlags.""" + + HIDDEN = 1 + PERSISTENT = 16 + + +class HoverZone: + """Mock HoverZone.""" + + TEXT = 1 + GUTTER = 2 + MARGIN = 3 + + +HIDE_ON_MOUSE_MOVE_AWAY = 1 + + +class Region: + """Mock Region class.""" + + def __init__(self, a: int, b: Optional[int] = None): + self.a = a + self.b = b if b is not None else a + + def contains(self, point: int) -> bool: + """Check if point is within region.""" + return self.a <= point <= self.b + + def __repr__(self): + return f"Region({self.a}, {self.b})" + + +class View: + """Mock View class.""" + + _id_counter = 1 + + def __init__(self, file_name: Optional[str] = None): + self._file_name = file_name + self._regions = {} + self._status = {} + self._content = "" + self._size = 0 + self._lines = [] + self._id = View._id_counter + View._id_counter += 1 + + def id(self) -> int: + """Return the unique ID of this view.""" + return self._id + + def file_name(self) -> Optional[str]: + """Return the file name.""" + return self._file_name + + def size(self) -> int: + """Return the size of the view.""" + return self._size + + def substr(self, region: Region) -> str: + """Return the string within the region.""" + if hasattr(region, "a") and hasattr(region, "b"): + return self._content[region.a : region.b] + return self._content + + def lines(self, region: Region) -> list: + """Return lines within the region.""" + # Split content into lines and create regions for each + lines = self._content.split("\n") + regions = [] + pos = 0 + for line in lines: + line_end = pos + len(line) + regions.append(Region(pos, line_end)) + pos = line_end + 1 # +1 for the newline + return regions + + def add_regions( + self, key: str, regions: list, scope: str = "", icon: str = "", flags: int = 0 + ): + """Add regions to the view.""" + self._regions[key] = { + "regions": regions, + "scope": scope, + "icon": icon, + "flags": flags, + } + + def erase_regions(self, key: str): + """Erase regions from the view.""" + if key in self._regions: + del self._regions[key] + + def get_regions(self, key: str) -> list: + """Get regions by key.""" + if key in self._regions: + return self._regions[key]["regions"] + return [] + + def set_status(self, key: str, value: str): + """Set status bar text.""" + self._status[key] = value + + def erase_status(self, key: str): + """Erase status bar text.""" + if key in self._status: + del self._status[key] + + def get_status(self, key: str) -> str: + """Get status bar text.""" + return self._status.get(key, "") + + def show_popup( + self, + content: str, + flags: int = 0, + location: int = -1, + max_width: int = 320, + max_height: int = 240, + on_navigate: Optional[Callable] = None, + ): + """Show a popup.""" + pass + + def window(self): + """Return the window containing this view.""" + return Window() + + +class Window: + """Mock Window class.""" + + def __init__(self, folders: Optional[list] = None): + self._folders = folders or [] + + def folders(self) -> list: + """Return the list of folders in the window.""" + return self._folders + + +class Settings: + """Mock Settings class.""" + + def __init__(self): + self._settings = {} + + def get(self, key: str, default: Any = None) -> Any: + """Get a setting value.""" + return self._settings.get(key, default) + + def set(self, key: str, value: Any): + """Set a setting value.""" + self._settings[key] = value + + def __getitem__(self, key: str) -> Any: + """Get a setting using bracket notation.""" + return self._settings[key] + + def __setitem__(self, key: str, value: Any): + """Set a setting using bracket notation.""" + self._settings[key] = value + + +_settings = {} + + +def load_settings(base_name: str) -> Settings: + """Load settings file.""" + if base_name not in _settings: + _settings[base_name] = Settings() + return _settings[base_name] + + +def save_settings(base_name: str): + """Save settings file.""" + pass diff --git a/tests/mocks/sublime_plugin.py b/tests/mocks/sublime_plugin.py new file mode 100644 index 0000000..cd9c6c0 --- /dev/null +++ b/tests/mocks/sublime_plugin.py @@ -0,0 +1,30 @@ +"""Mock sublime_plugin module for testing.""" + + +class ApplicationCommand: + """Mock ApplicationCommand class.""" + + def run(self): + """Run the command.""" + pass + + +class EventListener: + """Mock EventListener class.""" + + @classmethod + def is_applicable(cls, settings): + """Check if listener is applicable.""" + return True + + +class ViewEventListener: + """Mock ViewEventListener class.""" + + def __init__(self, view): + self.view = view + + @classmethod + def is_applicable(cls, settings): + """Check if listener is applicable.""" + return True diff --git a/tests/test_commands.py b/tests/test_commands.py new file mode 100644 index 0000000..df7d10f --- /dev/null +++ b/tests/test_commands.py @@ -0,0 +1,47 @@ +"""Tests for plugin commands.""" + +from unittest.mock import patch + + +def test_toggle_missing_lines_command_enables(mocker): + """Test ToggleMissingLinesCommand toggles from False to True.""" + from tests.mocks.sublime import Settings + + mock_settings = Settings() + mock_settings["show_missing_lines"] = False # Currently disabled + + with patch("sublime.load_settings", return_value=mock_settings), patch( + "sublime.save_settings" + ) as mock_save, patch("builtins.print") as mock_print: + from python_coverage import ToggleMissingLinesCommand + + cmd = ToggleMissingLinesCommand() + cmd.run() + + # Should set to True + assert mock_settings["show_missing_lines"] is True + mock_save.assert_called_once() + mock_print.assert_called_once() + assert "Enabled" in str(mock_print.call_args) + + +def test_toggle_missing_lines_command_disables(mocker): + """Test ToggleMissingLinesCommand toggles from True to False.""" + from tests.mocks.sublime import Settings + + mock_settings = Settings() + mock_settings["show_missing_lines"] = True # Currently enabled + + with patch("sublime.load_settings", return_value=mock_settings), patch( + "sublime.save_settings" + ) as mock_save, patch("builtins.print") as mock_print: + from python_coverage import ToggleMissingLinesCommand + + cmd = ToggleMissingLinesCommand() + cmd.run() + + # Should set to False + assert mock_settings["show_missing_lines"] is False + mock_save.assert_called_once() + mock_print.assert_called_once() + assert "Disabled" in str(mock_print.call_args) diff --git a/tests/test_coverage_file.py b/tests/test_coverage_file.py new file mode 100644 index 0000000..033ac4f --- /dev/null +++ b/tests/test_coverage_file.py @@ -0,0 +1,161 @@ +"""Tests for CoverageFile class.""" + + +def test_coverage_file_initialization( + temp_coverage_file, mock_coverage_data, mock_file_observer +): + """Test CoverageFile initializes correctly.""" + from python_coverage import CoverageFile, CoverageManager + + # Create a manager + manager = CoverageManager() + manager.initialize() + + # Create a coverage file + cov_file = CoverageFile(manager, temp_coverage_file) + + assert cov_file.coverage_file == temp_coverage_file + assert cov_file.data is not None + assert cov_file.manager is manager + + +def test_coverage_file_update( + temp_coverage_file, mock_coverage_data, mock_file_observer +): + """Test CoverageFile.update() re-reads data.""" + from python_coverage import CoverageFile, CoverageManager + + manager = CoverageManager() + manager.initialize() + + cov_file = CoverageFile(manager, temp_coverage_file) + initial_read_count = mock_coverage_data.read.call_count + + cov_file.update() + + assert mock_coverage_data.read.call_count > initial_read_count + + +def test_coverage_file_in_coverage_data( + temp_coverage_file, mock_coverage_data, mock_file_observer +): + """Test CoverageFile.in_coverage_data() checks file presence.""" + from python_coverage import CoverageFile, CoverageManager + + manager = CoverageManager() + manager.initialize() + + cov_file = CoverageFile(manager, temp_coverage_file) + + assert cov_file.in_coverage_data("/path/to/file.py") is True + assert cov_file.in_coverage_data("/other/file.py") is False + + +def test_coverage_file_missing_lines_success( + temp_coverage_file, + mock_coverage_data, + mock_python_parser, + mock_file_observer, + sample_python_code, +): + """Test CoverageFile.missing_lines() calculates missing lines correctly.""" + from python_coverage import CoverageFile, CoverageManager + + manager = CoverageManager() + manager.initialize() + + cov_file = CoverageFile(manager, temp_coverage_file) + + # mock_coverage_data.lines returns [1, 2, 3, 5] + # mock_python_parser.statements is {1, 2, 3, 4, 5} + # Missing line should be [4] + missing = cov_file.missing_lines("/path/to/file.py", sample_python_code) + + assert missing == [4] + + +def test_coverage_file_missing_lines_data_error( + temp_coverage_file, mock_coverage_data, mock_file_observer +): + """Test CoverageFile.missing_lines() handles DataError.""" + from coverage.exceptions import DataError + from python_coverage import CoverageFile, CoverageManager + + manager = CoverageManager() + manager.initialize() + + cov_file = CoverageFile(manager, temp_coverage_file) + + # Make lines() raise DataError + mock_coverage_data.lines.side_effect = DataError("Test error") + + missing = cov_file.missing_lines("/path/to/file.py", "code") + + assert missing is None + + +def test_coverage_file_missing_lines_no_data( + temp_coverage_file, mock_coverage_data, mock_file_observer +): + """Test CoverageFile.missing_lines() handles no coverage data.""" + from python_coverage import CoverageFile, CoverageManager + + manager = CoverageManager() + manager.initialize() + + cov_file = CoverageFile(manager, temp_coverage_file) + + # Make lines() return None + mock_coverage_data.lines.return_value = None + + missing = cov_file.missing_lines("/path/to/file.py", "code") + + assert missing is None + + +def test_coverage_file_missing_lines_all_covered( + temp_coverage_file, + mock_coverage_data, + mock_python_parser, + mock_file_observer, + sample_python_code, +): + """Test CoverageFile.missing_lines() when all lines are covered.""" + from python_coverage import CoverageFile, CoverageManager + + manager = CoverageManager() + manager.initialize() + + cov_file = CoverageFile(manager, temp_coverage_file) + + # All lines covered + mock_coverage_data.lines.return_value = [1, 2, 3, 4, 5] + mock_python_parser.statements = {1, 2, 3, 4, 5} + + missing = cov_file.missing_lines("/path/to/file.py", sample_python_code) + + assert missing == [] + + +def test_coverage_file_missing_lines_sorted_descending( + temp_coverage_file, + mock_coverage_data, + mock_python_parser, + mock_file_observer, + sample_python_code, +): + """Test CoverageFile.missing_lines() returns lines in descending order.""" + from python_coverage import CoverageFile, CoverageManager + + manager = CoverageManager() + manager.initialize() + + cov_file = CoverageFile(manager, temp_coverage_file) + + # Multiple missing lines: 2, 4, 7 + mock_coverage_data.lines.return_value = [1, 3, 5, 6, 8] + mock_python_parser.statements = {1, 2, 3, 4, 5, 6, 7, 8} + + missing = cov_file.missing_lines("/path/to/file.py", sample_python_code) + + assert missing == [7, 4, 2] # Descending order diff --git a/tests/test_coverage_manager.py b/tests/test_coverage_manager.py new file mode 100644 index 0000000..5c951fc --- /dev/null +++ b/tests/test_coverage_manager.py @@ -0,0 +1,448 @@ +"""Tests for CoverageManager class.""" + +from pathlib import Path + + +def test_coverage_manager_initialization(mock_file_observer): + """Test CoverageManager initializes correctly.""" + from python_coverage import CoverageManager + + manager = CoverageManager() + manager.initialize() + + assert manager._initialized is True + assert manager.file_observer is not None + assert manager.FileWatcher is not None + assert len(manager.coverage_files) == 0 + + +def test_coverage_manager_add_coverage_file( + temp_coverage_file, mock_coverage_data, mock_file_observer +): + """Test adding a coverage file to the manager.""" + from python_coverage import CoverageManager + + manager = CoverageManager() + manager.initialize() + + success = manager.add_coverage_file(temp_coverage_file) + + assert success is True + assert temp_coverage_file in manager.coverage_files + + +def test_coverage_manager_add_duplicate_coverage_file( + temp_coverage_file, mock_coverage_data, mock_file_observer +): + """Test adding a duplicate coverage file returns False.""" + from python_coverage import CoverageManager + + manager = CoverageManager() + manager.initialize() + + manager.add_coverage_file(temp_coverage_file) + success = manager.add_coverage_file(temp_coverage_file) + + assert success is False + + +def test_coverage_manager_add_nonexistent_file(mock_file_observer): + """Test adding a nonexistent coverage file returns False.""" + from python_coverage import CoverageManager + + manager = CoverageManager() + manager.initialize() + + nonexistent = Path("/nonexistent/.coverage") + success = manager.add_coverage_file(nonexistent) + + assert success is False + + +def test_coverage_manager_remove_coverage_file( + temp_coverage_file, mock_coverage_data, mock_file_observer +): + """Test removing a coverage file from the manager.""" + from python_coverage import CoverageManager + + manager = CoverageManager() + manager.initialize() + + manager.add_coverage_file(temp_coverage_file) + success = manager.remove_coverage_file(temp_coverage_file) + + assert success is True + assert temp_coverage_file not in manager.coverage_files + + +def test_coverage_manager_get_coverage_for_file( + temp_coverage_file, mock_coverage_data, mock_file_observer +): + """Test getting coverage for a file.""" + from python_coverage import CoverageManager + + manager = CoverageManager() + manager.initialize() + manager.add_coverage_file(temp_coverage_file) + + # File within the coverage directory + file_path = str(temp_coverage_file.parent / "test.py") + + # Mock measured_files to include our test file + mock_coverage_data.measured_files.return_value = [file_path] + + cov = manager.get_coverage_for_file(file_path) + + # Should find the coverage file if in_coverage_data returns True + assert cov is not None + + +def test_coverage_manager_cleanup_stale_files( + tmp_path, mock_coverage_data, mock_file_observer +): + """Test cleanup of stale coverage files.""" + from python_coverage import CoverageManager + + manager = CoverageManager() + manager.initialize() + + # Create a temporary coverage file + coverage_file = tmp_path / ".coverage" + coverage_file.touch() + manager.add_coverage_file(coverage_file) + + # Delete the file to make it stale + coverage_file.unlink() + + # Cleanup stale files + manager.cleanup_stale_files() + + # Should be removed + assert coverage_file not in manager.coverage_files + + +def test_coverage_manager_shutdown( + temp_coverage_file, mock_coverage_data, mock_file_observer +): + """Test shutting down the coverage manager.""" + from python_coverage import CoverageManager + + manager = CoverageManager() + manager.initialize() + manager.add_coverage_file(temp_coverage_file) + + manager.shutdown() + + assert len(manager.coverage_files) == 0 + assert manager.file_observer is None + assert manager._initialized is False + + +def test_debounced_update_schedules_timer( + temp_coverage_file, mock_coverage_data, mock_file_observer +): + """Test that _schedule_debounced_update schedules a timer.""" + from python_coverage import CoverageManager + + manager = CoverageManager() + manager.initialize() + manager.add_coverage_file(temp_coverage_file) + + # Schedule an update + manager._schedule_debounced_update(temp_coverage_file) + + # Verify timer was created + assert temp_coverage_file in manager._update_timers + timer = manager._update_timers[temp_coverage_file] + assert timer.is_alive() + + # Cleanup + timer.cancel() + + +def test_debounced_update_cancels_existing_timer( + temp_coverage_file, mock_coverage_data, mock_file_observer +): + """Test that scheduling a new update cancels the existing timer.""" + + from python_coverage import CoverageManager + + manager = CoverageManager() + manager.initialize() + manager.add_coverage_file(temp_coverage_file) + + # Schedule first update + manager._schedule_debounced_update(temp_coverage_file) + first_timer = manager._update_timers[temp_coverage_file] + + # Schedule second update (should cancel first timer) + manager._schedule_debounced_update(temp_coverage_file) + second_timer = manager._update_timers[temp_coverage_file] + + # Verify the timers are different objects (old one was replaced) + assert first_timer is not second_timer + + # Wait briefly for the cancelled timer's thread to finish + # Timer.cancel() prevents execution but thread may still be alive briefly + first_timer.finished.wait(timeout=0.1) + + # Now verify the first timer finished (cancelled or executed) + assert first_timer.finished.is_set() + # Second timer should still be active + assert second_timer.is_alive() + + # Cleanup + second_timer.cancel() + + +def test_perform_debounced_update_updates_coverage( + temp_coverage_file, mock_coverage_data, mock_file_observer, mocker +): + """Test that _perform_debounced_update updates the coverage file.""" + from python_coverage import CoverageManager + + manager = CoverageManager() + manager.initialize() + manager.add_coverage_file(temp_coverage_file) + + cov_file = manager.coverage_files[temp_coverage_file] + spy = mocker.spy(cov_file, "update") + + # Perform the update + manager._perform_debounced_update(temp_coverage_file) + + # Verify update was called + spy.assert_called_once() + + +def test_perform_debounced_update_removes_nonexistent_file( + temp_coverage_file, mock_coverage_data, mock_file_observer +): + """Test that _perform_debounced_update removes file if it no longer exists.""" + from python_coverage import CoverageManager + + manager = CoverageManager() + manager.initialize() + manager.add_coverage_file(temp_coverage_file) + + # Delete the file + temp_coverage_file.unlink() + + # Perform the update + manager._perform_debounced_update(temp_coverage_file) + + # File should be removed from manager + assert temp_coverage_file not in manager.coverage_files + + +def test_perform_debounced_update_handles_untracked_file( + temp_coverage_file, mock_coverage_data, mock_file_observer, mocker +): + """Test that _perform_debounced_update handles untracked files gracefully.""" + from python_coverage import CoverageManager + + manager = CoverageManager() + manager.initialize() + + # Don't add the file to manager, but try to update it + # This should not raise an exception + manager._perform_debounced_update(temp_coverage_file) + + # No assertions needed - just verifying no exception is raised + + +def test_perform_debounced_update_updates_active_views( + temp_coverage_file, mock_coverage_data, mock_file_observer, mocker, sublime_view +): + """Test that _perform_debounced_update updates all active views.""" + import python_coverage as pc + from python_coverage import CoverageManager + + # Create a mock view listener + mock_listener = mocker.MagicMock() + mock_listener._update_regions = mocker.MagicMock() + + # Add to active views + pc.ACTIVE_VIEWS = {1: mock_listener} + + manager = CoverageManager() + manager.initialize() + manager.add_coverage_file(temp_coverage_file) + + # Perform the update + manager._perform_debounced_update(temp_coverage_file) + + # Verify _update_regions was called + mock_listener._update_regions.assert_called_once() + + # Cleanup + pc.ACTIVE_VIEWS = {} + + +def test_file_watcher_on_modified( + temp_coverage_file, mock_coverage_data, mock_file_observer, mocker +): + """Test FileWatcher.on_modified triggers debounced update.""" + from python_coverage import CoverageManager + + manager = CoverageManager() + manager.initialize() + manager.add_coverage_file(temp_coverage_file) + + # Create a FileWatcher instance + watcher = manager.FileWatcher(manager, temp_coverage_file) + + # Create a mock event + mock_event = mocker.MagicMock() + mock_event.src_path = str(temp_coverage_file) + + # Spy on _schedule_debounced_update + spy = mocker.spy(manager, "_schedule_debounced_update") + + # Trigger the event + watcher.on_modified(mock_event) + + # Verify debounced update was scheduled + spy.assert_called_once_with(temp_coverage_file) + + +def test_file_watcher_on_created( + temp_coverage_file, mock_coverage_data, mock_file_observer, mocker +): + """Test FileWatcher.on_created triggers debounced update.""" + from python_coverage import CoverageManager + + manager = CoverageManager() + manager.initialize() + manager.add_coverage_file(temp_coverage_file) + + watcher = manager.FileWatcher(manager, temp_coverage_file) + + mock_event = mocker.MagicMock() + mock_event.src_path = str(temp_coverage_file) + + spy = mocker.spy(manager, "_schedule_debounced_update") + + watcher.on_created(mock_event) + + spy.assert_called_once_with(temp_coverage_file) + + +def test_file_watcher_on_deleted( + temp_coverage_file, mock_coverage_data, mock_file_observer, mocker +): + """Test FileWatcher.on_deleted triggers debounced update.""" + from python_coverage import CoverageManager + + manager = CoverageManager() + manager.initialize() + manager.add_coverage_file(temp_coverage_file) + + watcher = manager.FileWatcher(manager, temp_coverage_file) + + mock_event = mocker.MagicMock() + mock_event.src_path = str(temp_coverage_file) + + spy = mocker.spy(manager, "_schedule_debounced_update") + + watcher.on_deleted(mock_event) + + spy.assert_called_once_with(temp_coverage_file) + + +def test_file_watcher_ignores_wrong_file( + temp_coverage_file, mock_coverage_data, mock_file_observer, mocker +): + """Test FileWatcher ignores events for different files.""" + from python_coverage import CoverageManager + + manager = CoverageManager() + manager.initialize() + manager.add_coverage_file(temp_coverage_file) + + watcher = manager.FileWatcher(manager, temp_coverage_file) + + # Create event for a different file + mock_event = mocker.MagicMock() + mock_event.src_path = str(temp_coverage_file.parent / "other_file.txt") + + spy = mocker.spy(manager, "_schedule_debounced_update") + + # Trigger events + watcher.on_modified(mock_event) + watcher.on_created(mock_event) + watcher.on_deleted(mock_event) + + # Verify debounced update was NOT called + spy.assert_not_called() + + +def test_file_watcher_ignores_non_coverage_files( + temp_coverage_file, mock_coverage_data, mock_file_observer, mocker +): + """Test FileWatcher ignores non-.coverage files.""" + from python_coverage import CoverageManager + + manager = CoverageManager() + manager.initialize() + manager.add_coverage_file(temp_coverage_file) + + watcher = manager.FileWatcher(manager, temp_coverage_file) + + # Create event for a .py file + mock_event = mocker.MagicMock() + mock_event.src_path = str(temp_coverage_file.parent / "test.py") + + spy = mocker.spy(manager, "_schedule_debounced_update") + + # Trigger events + watcher.on_modified(mock_event) + watcher.on_created(mock_event) + watcher.on_deleted(mock_event) + + # Verify debounced update was NOT called + spy.assert_not_called() + + +def test_coverage_manager_recreates_observer_after_stop( + tmp_path, mock_coverage_data, mocker +): + """ + Test that the observer is recreated when adding files after all files are removed. + """ + from python_coverage import CoverageManager + + # Mock the Observer class + mock_observer_instance = mocker.MagicMock() + # Initially alive, then becomes not alive after stop + mock_observer_instance.is_alive.side_effect = [True, True, False] + mock_observer_class = mocker.patch("watchdog.observers.Observer") + mock_observer_class.return_value = mock_observer_instance + + # Create first coverage file + coverage_file_1 = tmp_path / ".coverage" + coverage_file_1.touch() + + manager = CoverageManager() + manager.initialize() + + # Add first file - observer should start + manager.add_coverage_file(coverage_file_1) + assert mock_observer_instance.start.call_count == 1 + + # Remove the file - observer should stop (is_alive returns True, so stop is called) + manager.remove_coverage_file(coverage_file_1) + mock_observer_instance.stop.assert_called_once() + + # Create second coverage file + coverage_file_2 = tmp_path / "project2" / ".coverage" + coverage_file_2.parent.mkdir(parents=True) + coverage_file_2.touch() + + # Add second file - should create NEW observer instance (is_alive returns False now) + manager.add_coverage_file(coverage_file_2) + + # Verify a new Observer was created (second call to constructor) + assert mock_observer_class.call_count == 2 + # Verify start was called on the new instance + assert mock_observer_instance.start.call_count == 2 diff --git a/tests/test_event_listeners.py b/tests/test_event_listeners.py new file mode 100644 index 0000000..0e3722a --- /dev/null +++ b/tests/test_event_listeners.py @@ -0,0 +1,433 @@ +"""Tests for event listeners.""" + +from unittest.mock import patch + + +class TestPythonCoverageDataFileListener: + """Tests for PythonCoverageDataFileListener.""" + + def test_is_applicable(self): + """Test is_applicable returns True.""" + from python_coverage import PythonCoverageDataFileListener + + assert PythonCoverageDataFileListener.is_applicable({}) is True + + def test_update_available_coverage_files_disabled( + self, mocker, sublime_window, temp_coverage_file + ): + """Test update_available_coverage_files when feature is disabled.""" + import python_coverage as pc + from python_coverage import CoverageManager, PythonCoverageDataFileListener + + # Initialize manager + pc.COVERAGE_MANAGER = CoverageManager() + pc.COVERAGE_MANAGER.initialize() + + mock_settings = mocker.MagicMock() + mock_settings.get.return_value = False # show_missing_lines = False + + with patch("sublime.load_settings", return_value=mock_settings): + listener = PythonCoverageDataFileListener() + listener.update_available_coverage_files(sublime_window) + + # Should not add any coverage files + assert len(pc.COVERAGE_MANAGER.coverage_files) == 0 + + def test_update_available_coverage_files_enabled( + self, mocker, sublime_window, tmp_path, mock_file_observer, mock_coverage_data + ): + """Test update_available_coverage_files when feature is enabled.""" + import python_coverage as pc + from python_coverage import CoverageManager, PythonCoverageDataFileListener + + # Create a .coverage file in the temp directory + coverage_file = tmp_path / ".coverage" + coverage_file.touch() + + # Update window to point to temp directory + sublime_window._folders = [str(tmp_path)] + + # Initialize coverage manager + pc.COVERAGE_MANAGER = CoverageManager() + pc.COVERAGE_MANAGER.initialize() + + mock_settings = mocker.MagicMock() + + # Configure get() to return appropriate values based on key + def settings_get(key, default=None): + if key == "show_missing_lines": + return True + if key == "coverage_file_name": + return ".coverage" + return default + + mock_settings.get.side_effect = settings_get + + with patch("sublime.load_settings", return_value=mock_settings): + listener = PythonCoverageDataFileListener() + listener.update_available_coverage_files(sublime_window) + + # Should add the coverage file + assert len(pc.COVERAGE_MANAGER.coverage_files) == 1 + assert coverage_file in pc.COVERAGE_MANAGER.coverage_files + + def test_on_new_project_async( + self, mocker, sublime_window, tmp_path, mock_file_observer, mock_coverage_data + ): + """Test on_new_project_async calls update_available_coverage_files.""" + import python_coverage as pc + from python_coverage import CoverageManager, PythonCoverageDataFileListener + + # Create a .coverage file in the temp directory + coverage_file = tmp_path / ".coverage" + coverage_file.touch() + + # Update window to point to temp directory + sublime_window._folders = [str(tmp_path)] + + # Initialize coverage manager + pc.COVERAGE_MANAGER = CoverageManager() + pc.COVERAGE_MANAGER.initialize() + + mock_settings = mocker.MagicMock() + + # Configure get() to return appropriate values based on key + def settings_get(key, default=None): + if key == "show_missing_lines": + return True + if key == "coverage_file_name": + return ".coverage" + return default + + mock_settings.get.side_effect = settings_get + + with patch("sublime.load_settings", return_value=mock_settings): + listener = PythonCoverageDataFileListener() + listener.on_new_project_async(sublime_window) + + # Should add the coverage file + assert coverage_file in pc.COVERAGE_MANAGER.coverage_files + + def test_on_load_project_async( + self, mocker, sublime_window, tmp_path, mock_file_observer, mock_coverage_data + ): + """Test on_load_project_async calls update_available_coverage_files.""" + import python_coverage as pc + from python_coverage import CoverageManager, PythonCoverageDataFileListener + + # Create a .coverage file in the temp directory + coverage_file = tmp_path / ".coverage" + coverage_file.touch() + + # Update window to point to temp directory + sublime_window._folders = [str(tmp_path)] + + # Initialize coverage manager + pc.COVERAGE_MANAGER = CoverageManager() + pc.COVERAGE_MANAGER.initialize() + + mock_settings = mocker.MagicMock() + + # Configure get() to return appropriate values based on key + def settings_get(key, default=None): + if key == "show_missing_lines": + return True + if key == "coverage_file_name": + return ".coverage" + return default + + mock_settings.get.side_effect = settings_get + + with patch("sublime.load_settings", return_value=mock_settings): + listener = PythonCoverageDataFileListener() + listener.on_load_project_async(sublime_window) + + # Should add the coverage file + assert coverage_file in pc.COVERAGE_MANAGER.coverage_files + + def test_on_pre_close_project( + self, mocker, sublime_window, tmp_path, mock_file_observer, mock_coverage_data + ): + """Test on_pre_close_project removes coverage files.""" + import python_coverage as pc + from python_coverage import CoverageManager, PythonCoverageDataFileListener + + # Create a .coverage file in the temp directory + coverage_file = tmp_path / ".coverage" + coverage_file.touch() + + # Update window to point to temp directory + sublime_window._folders = [str(tmp_path)] + + # Initialize coverage manager and add the file + pc.COVERAGE_MANAGER = CoverageManager() + pc.COVERAGE_MANAGER.initialize() + pc.COVERAGE_MANAGER.add_coverage_file(coverage_file) + + assert coverage_file in pc.COVERAGE_MANAGER.coverage_files + + listener = PythonCoverageDataFileListener() + listener.on_pre_close_project(sublime_window) + + # Should remove the coverage file + assert coverage_file not in pc.COVERAGE_MANAGER.coverage_files + + def test_on_activated_async_calls_update( + self, mocker, sublime_window, tmp_path, mock_file_observer, mock_coverage_data + ): + """Test on_activated_async calls update_available_coverage_files.""" + import python_coverage as pc + from python_coverage import CoverageManager, PythonCoverageDataFileListener + + # Create a .coverage file in the temp directory + coverage_file = tmp_path / ".coverage" + coverage_file.touch() + + # Mock the view + mock_view = mocker.MagicMock() + mock_view.window.return_value = sublime_window + + # Update window to point to temp directory + sublime_window._folders = [str(tmp_path)] + + # Initialize coverage manager + pc.COVERAGE_MANAGER = CoverageManager() + pc.COVERAGE_MANAGER.initialize() + + mock_settings = mocker.MagicMock() + + # Configure get() to return appropriate values based on key + def settings_get(key, default=None): + if key == "show_missing_lines": + return True + if key == "coverage_file_name": + return ".coverage" + return default + + mock_settings.get.side_effect = settings_get + + with patch("sublime.load_settings", return_value=mock_settings): + listener = PythonCoverageDataFileListener() + listener.on_activated_async(mock_view) + + # Should add the coverage file + assert coverage_file in pc.COVERAGE_MANAGER.coverage_files + + +class TestPythonCoverageEventListener: + """Tests for PythonCoverageEventListener.""" + + def test_is_applicable_python_file(self): + """Test is_applicable returns True for Python files.""" + from python_coverage import PythonCoverageEventListener + + settings = {"syntax": "Packages/Python/Python.sublime-syntax"} + assert PythonCoverageEventListener.is_applicable(settings) is True + + def test_is_applicable_non_python_file(self): + """Test is_applicable returns False for non-Python files.""" + from python_coverage import PythonCoverageEventListener + + settings = {"syntax": "Packages/JavaScript/JavaScript.sublime-syntax"} + assert PythonCoverageEventListener.is_applicable(settings) is False + + def test_on_activated_async_feature_disabled(self, mocker, sublime_view): + """Test on_activated_async when feature is disabled.""" + from python_coverage import PythonCoverageEventListener + + mock_settings = mocker.MagicMock() + mock_settings.__getitem__.return_value = False # show_missing_lines = False + + with patch("sublime.load_settings", return_value=mock_settings): + listener = PythonCoverageEventListener(sublime_view) + listener.on_activated_async() + + # Should erase regions + assert "python-coverage" not in sublime_view._regions + + def test_on_activated_async_no_filename(self, mocker, sublime_view): + """Test on_activated_async when view has no filename.""" + from python_coverage import PythonCoverageEventListener + + sublime_view._file_name = None + + mock_settings = mocker.MagicMock() + mock_settings.__getitem__.return_value = True # show_missing_lines = True + + with patch("sublime.load_settings", return_value=mock_settings): + listener = PythonCoverageEventListener(sublime_view) + listener.on_activated_async() + + # Should not add any regions + assert "python-coverage" not in sublime_view._regions + + def test_update_regions_no_coverage_file(self, mocker, sublime_view): + """Test _update_regions when no coverage file is found.""" + from python_coverage import PythonCoverageEventListener + + listener = PythonCoverageEventListener(sublime_view) + listener._update_regions() + + # Should erase regions when no coverage file found + assert "python-coverage" not in sublime_view._regions + + def test_update_regions_with_missing_lines( + self, + mocker, + sublime_view, + temp_coverage_file, + mock_coverage_data, + mock_file_observer, + ): + """Test _update_regions when coverage file has missing lines.""" + import python_coverage as pc + from python_coverage import CoverageManager, PythonCoverageEventListener + + # Initialize manager and add coverage file + pc.COVERAGE_MANAGER = CoverageManager() + pc.COVERAGE_MANAGER.initialize() + pc.COVERAGE_MANAGER.add_coverage_file(temp_coverage_file) + + # Set up the view with a file in the coverage directory + test_file = str(temp_coverage_file.parent / "test.py") + sublime_view._file_name = test_file + sublime_view._content = "def foo():\n pass\n" + sublime_view._size = len(sublime_view._content) + + # Mock measured_files to include our test file + mock_coverage_data.measured_files.return_value = [test_file] + + # Mock missing_lines to return some missing lines + cov_file = pc.COVERAGE_MANAGER.coverage_files[temp_coverage_file] + mocker.patch.object(cov_file, "missing_lines", return_value=[1, 2]) + + listener = PythonCoverageEventListener(sublime_view) + listener._update_regions() + + # Should add regions for missing lines + assert "python-coverage" in sublime_view._regions + regions = sublime_view._regions["python-coverage"]["regions"] + assert len(regions) == 2 + + # Cleanup + pc.COVERAGE_MANAGER.shutdown() + pc.COVERAGE_MANAGER = None + + def test_update_regions_all_lines_covered( + self, + mocker, + sublime_view, + temp_coverage_file, + mock_coverage_data, + mock_file_observer, + ): + """Test _update_regions when all lines are covered.""" + import python_coverage as pc + from python_coverage import CoverageManager, PythonCoverageEventListener + + # Initialize manager and add coverage file + pc.COVERAGE_MANAGER = CoverageManager() + pc.COVERAGE_MANAGER.initialize() + pc.COVERAGE_MANAGER.add_coverage_file(temp_coverage_file) + + # Set up the view + test_file = str(temp_coverage_file.parent / "test.py") + sublime_view._file_name = test_file + sublime_view._content = "def foo():\n pass\n" + sublime_view._size = len(sublime_view._content) + + # Mock measured_files + mock_coverage_data.measured_files.return_value = [test_file] + + # Mock missing_lines to return no missing lines + cov_file = pc.COVERAGE_MANAGER.coverage_files[temp_coverage_file] + mocker.patch.object(cov_file, "missing_lines", return_value=[]) + + listener = PythonCoverageEventListener(sublime_view) + listener._update_regions() + + # Should erase regions when no missing lines + assert "python-coverage" not in sublime_view._regions + + # Cleanup + pc.COVERAGE_MANAGER.shutdown() + pc.COVERAGE_MANAGER = None + + def test_update_regions_handles_errors( + self, + mocker, + sublime_view, + temp_coverage_file, + mock_coverage_data, + mock_file_observer, + ): + """Test _update_regions handles errors gracefully.""" + import python_coverage as pc + from python_coverage import CoverageManager, PythonCoverageEventListener + + # Initialize manager and add coverage file + pc.COVERAGE_MANAGER = CoverageManager() + pc.COVERAGE_MANAGER.initialize() + pc.COVERAGE_MANAGER.add_coverage_file(temp_coverage_file) + + # Set up the view + test_file = str(temp_coverage_file.parent / "test.py") + sublime_view._file_name = test_file + + # Mock measured_files + mock_coverage_data.measured_files.return_value = [test_file] + + # Mock missing_lines to raise an exception + cov_file = pc.COVERAGE_MANAGER.coverage_files[temp_coverage_file] + mocker.patch.object( + cov_file, "missing_lines", side_effect=Exception("Test error") + ) + + listener = PythonCoverageEventListener(sublime_view) + listener._update_regions() + + # Should erase regions on error + assert "python-coverage" not in sublime_view._regions + + # Cleanup + pc.COVERAGE_MANAGER.shutdown() + pc.COVERAGE_MANAGER = None + + def test_on_modified_async_clears_regions(self, mocker, sublime_view): + """Test on_modified_async clears regions.""" + from python_coverage import PythonCoverageEventListener + + # Add some regions first + from tests.mocks.sublime import Region + + sublime_view._regions["python-coverage"] = [Region(0, 10)] + + # Mock settings to enable feature + mock_settings = mocker.MagicMock() + mock_settings.get.return_value = True + + with patch("sublime.load_settings", return_value=mock_settings): + listener = PythonCoverageEventListener(sublime_view) + listener.on_modified_async() + + # Should erase regions on modification + assert "python-coverage" not in sublime_view._regions + + def test_on_close_removes_from_active_views(self, mocker, sublime_view): + """Test on_close removes view from active views.""" + import python_coverage as pc + from python_coverage import PythonCoverageEventListener + + listener = PythonCoverageEventListener(sublime_view) + + # Add to active views + view_id = sublime_view.id() + pc.ACTIVE_VIEWS[view_id] = listener + + listener.on_close() + + # Should be removed from active views + assert view_id not in pc.ACTIVE_VIEWS + + # Cleanup + pc.ACTIVE_VIEWS = {}