Skip to content
75 changes: 69 additions & 6 deletions rsync_time_machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
import re
import signal
import sys
import tempfile
import time
from contextlib import suppress
from datetime import datetime
from typing import TYPE_CHECKING, Callable, NamedTuple

Expand Down Expand Up @@ -60,6 +62,33 @@ def sanitize(s: str) -> str:
return s.encode("utf-8", "surrogateescape").decode("utf-8", "replace")


def prepare_exclusion_file(exclusion_file: str) -> tuple[str, Callable[[], None]]:
"""Ensure rsync exclusion file ends with a newline to match rsync parsing."""
try:
with open(exclusion_file, "rb") as f:
contents = f.read()
except FileNotFoundError:
log_error(f"Exclusion file '{exclusion_file}' does not exist - aborting.")
sys.exit(1)

if not contents or contents.endswith(b"\n"):

def cleanup_noop() -> None:
return None

return exclusion_file, cleanup_noop

fd, temp_path = tempfile.mkstemp(prefix="rsync-exclude-", suffix=".txt")
with os.fdopen(fd, "wb") as tmp:
tmp.write(contents + b"\n")

def cleanup() -> None:
with suppress(FileNotFoundError):
os.remove(temp_path)

return temp_path, cleanup


def log(message: str, level: str = "info") -> None:
"""Log a message with the specified log level."""
levels = {"info": "", "warning": "[WARNING] ", "error": "[ERROR] "}
Expand Down Expand Up @@ -638,10 +667,34 @@ def get_rsync_flags(
return rsync_flags


def normalize_pid(running_pid: str, ssh: SSH | None = None) -> int | None:
"""Return a valid PID or None when the stored value should be ignored."""
pid_str = running_pid.strip()
try:
pid = int(pid_str)
except ValueError:
return None

if pid <= 0:
return None

if ssh is None and pid == os.getpid():
# Allow re-entrancy within the same local process (e.g. during tests)
return None

return pid


def exit_if_pid_running(running_pid: str, ssh: SSH | None = None) -> None:
"""Exit if another instance of this script is already running."""
pid = normalize_pid(running_pid, ssh)
if pid is None:
return

pid_str = str(pid)

if sys.platform == "cygwin":
cmd = f"procps -wwfo cmd -p {running_pid} --no-headers | grep '{APPNAME}'"
cmd = f"procps -wwfo cmd -p {pid_str} --no-headers | grep '{APPNAME}'"
running_cmd = run_cmd(cmd, ssh)
if running_cmd.returncode == 0:
log_error(
Expand All @@ -650,7 +703,7 @@ def exit_if_pid_running(running_pid: str, ssh: SSH | None = None) -> None:
sys.exit(1)
else:
ps_flags = "-axp" if sys.platform.startswith("netbsd") else "-p"
cmd = f"ps {ps_flags} {running_pid} -o 'command' | grep '{APPNAME}'"
cmd = f"ps {ps_flags} {pid_str} -o 'command' | grep '{APPNAME}'"
if run_cmd(cmd).stdout:
log_error("Previous backup task is still active - aborting.")
sys.exit(1)
Expand Down Expand Up @@ -694,7 +747,7 @@ def deal_with_no_space_left(
auto_expire: bool,
) -> bool:
"""Deal with no space left on device."""
with open(log_file) as f:
with open(log_file, encoding="utf-8", errors="surrogateescape") as f:
log_data = f.read()

no_space_left = re.search(
Expand Down Expand Up @@ -727,7 +780,7 @@ def check_rsync_errors(
auto_delete_log: bool, # noqa: FBT001
) -> None:
"""Check rsync errors."""
with open(log_file) as f:
with open(log_file, encoding="utf-8", errors="surrogateescape") as f:
log_data = f.read()
if "rsync error:" in log_data:
log_error(
Expand Down Expand Up @@ -765,6 +818,10 @@ def start_backup(
log_dir,
f"{now}.log",
)

def cleanup_exclusion() -> None:
return None

if ssh is not None:
src_folder = f"{ssh.src_folder_prefix}{src_folder}"
dest = f"{ssh.dest_folder_prefix}{dest}"
Expand All @@ -780,7 +837,10 @@ def start_backup(
cmd = f"{cmd} {' '.join(rsync_flags)}"
cmd = f"{cmd} --log-file '{log_file}'"
if exclusion_file:
cmd = f"{cmd} --exclude-from '{exclusion_file}'"
prepared_exclusion_file, cleanup_exclusion = prepare_exclusion_file(
exclusion_file,
)
cmd = f"{cmd} --exclude-from '{prepared_exclusion_file}'"

cmd = f"{cmd} {link_dest_option}"
cmd = f"{cmd} -- '{src_folder}/' '{dest}/'"
Expand All @@ -790,7 +850,10 @@ def start_backup(

run_cmd(f"echo {mypid} > {inprogress_file}", ssh)

run_cmd(cmd)
try:
run_cmd(cmd)
finally:
cleanup_exclusion()
return log_file


Expand Down
241 changes: 241 additions & 0 deletions tests/test_diff_coverage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
"""Additional tests ensuring full coverage of the PR diff."""

from __future__ import annotations

import importlib
import os
from pathlib import Path
from typing import Any

import pytest


@pytest.fixture
def rtm() -> Any:
"""Reload the module so coverage tracks executed lines precisely."""
module = importlib.import_module("rsync_time_machine")
return importlib.reload(module)


def test_prepare_exclusion_file_missing_file(tmp_path: Path, rtm: Any) -> None:
"""Missing exclusion files should trigger a fatal error."""
module = rtm
missing = tmp_path / "does-not-exist.txt"
with pytest.raises(SystemExit):
module.prepare_exclusion_file(str(missing))


def test_prepare_exclusion_file_noop_cleanup(tmp_path: Path, rtm: Any) -> None:
"""A newline-terminated exclusion file is returned untouched."""
module = rtm
exclusion = tmp_path / "exclude.txt"
exclusion.write_bytes(b"pattern\n")

prepared, cleanup = module.prepare_exclusion_file(str(exclusion))

assert prepared == str(exclusion)
cleanup() # ensure the no-op path is executed
assert exclusion.exists()


def test_prepare_exclusion_file_appends_newline_and_cleans(
tmp_path: Path, rtm: Any, monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Files without a trailing newline are copied and cleaned up after use."""
module = rtm
exclusion = tmp_path / "exclude.txt"
exclusion.write_bytes(b"pattern")

created_paths: list[Path] = []

original_mkstemp = module.tempfile.mkstemp

def fake_mkstemp(prefix: str, suffix: str) -> tuple[int, str]:
fd, path = original_mkstemp(prefix=prefix, suffix=suffix, dir=tmp_path)
created_paths.append(Path(path))
return fd, path

monkeypatch.setattr(module.tempfile, "mkstemp", fake_mkstemp)

prepared, cleanup = module.prepare_exclusion_file(str(exclusion))

prepared_path = Path(prepared)
assert prepared_path in created_paths
assert prepared_path.read_bytes() == b"pattern\n"

cleanup()
for path in created_paths:
assert not path.exists()


def test_normalize_pid_variants(rtm: Any) -> None:
"""Exercise the PID normalisation helper across its branches."""
module = rtm
assert module.normalize_pid("", None) is None
assert module.normalize_pid("0", None) is None

current_pid = os.getpid()
assert module.normalize_pid(str(current_pid), None) is None

fake_ssh = module.SSH("", "", "", "", "", "22", None)
assert module.normalize_pid(str(current_pid), fake_ssh) == current_pid

assert module.normalize_pid("1234", None) == 1234


def test_exit_if_pid_running_invokes_ps(
rtm: Any, monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Valid PIDs should lead to a ps invocation with the stripped PID string."""
module = rtm
commands: list[str] = []

def fake_run_cmd(cmd: str, ssh: Any | None = None) -> Any:
commands.append(cmd)
return module.CmdResult("", "", 0)

monkeypatch.setattr(module, "run_cmd", fake_run_cmd)

module.exit_if_pid_running(" 42 ")

expected = f"ps -p 42 -o 'command' | grep '{module.APPNAME}'"
assert expected in commands


def test_exit_if_pid_running_detects_active_process(
rtm: Any, monkeypatch: pytest.MonkeyPatch,
) -> None:
"""A matching process triggers an early exit."""
module = rtm

def fake_run_cmd(cmd: str, ssh: Any | None = None) -> Any:
if cmd.startswith("ps "):
return module.CmdResult("pytest", "", 0)
return module.CmdResult("", "", 0)

monkeypatch.setattr(module, "run_cmd", fake_run_cmd)

with pytest.raises(SystemExit):
module.exit_if_pid_running("4242")


def test_exit_if_pid_running_cygwin_branch(
rtm: Any, monkeypatch: pytest.MonkeyPatch,
) -> None:
"""The cygwin branch should use the normalised PID and exit when active."""
module = rtm
commands: list[str] = []

monkeypatch.setattr(module.sys, "platform", "cygwin")

def fake_run_cmd(cmd: str, ssh: Any | None = None) -> Any:
commands.append(cmd)
return module.CmdResult("", "", 0)

monkeypatch.setattr(module, "run_cmd", fake_run_cmd)

with pytest.raises(SystemExit):
module.exit_if_pid_running("0100")

expected = f"procps -wwfo cmd -p 100 --no-headers | grep '{module.APPNAME}'"
assert expected in commands


def test_start_backup_cleans_temp_exclusion_on_failure(
tmp_path: Path, rtm: Any, monkeypatch: pytest.MonkeyPatch,
) -> None:
"""start_backup removes temporary exclude files even when rsync fails."""
module = rtm
src = tmp_path / "src"
dest = tmp_path / "dest"
log_dir = tmp_path / "logs"
src.mkdir()
dest.mkdir()
log_dir.mkdir()

exclusion = tmp_path / "exclude.txt"
exclusion.write_text("pattern")

created_paths: list[Path] = []

original_mkstemp = module.tempfile.mkstemp

def fake_mkstemp(prefix: str, suffix: str) -> tuple[int, str]:
fd, path = original_mkstemp(prefix=prefix, suffix=suffix, dir=tmp_path)
created_paths.append(Path(path))
return fd, path

monkeypatch.setattr(module.tempfile, "mkstemp", fake_mkstemp)

def fake_run_cmd(cmd: str, ssh: Any | None = None) -> Any:
if cmd.startswith("echo"):
_, path = cmd.split(">", maxsplit=1)
Path(path.strip().strip("'\"")).write_text(str(os.getpid()))
return module.CmdResult("", "", 0)
raise RuntimeError("rsync failed")

monkeypatch.setattr(module, "run_cmd", fake_run_cmd)

with pytest.raises(RuntimeError, match="rsync failed"):
module.start_backup(
src_folder=str(src),
dest=str(dest),
exclusion_file=str(exclusion),
inprogress_file=str(dest / "backup.inprogress"),
link_dest_option="",
rsync_flags=["-a"],
log_dir=str(log_dir),
mypid=os.getpid(),
ssh=None,
now="2025-10-13-211815",
)

for path in created_paths:
assert not path.exists()


def test_deal_with_no_space_left_handles_non_utf8(
tmp_path: Path, rtm: Any, monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Non-UTF8 log content is parsed safely when space runs out."""
module = rtm
log_file = tmp_path / "backup.log"
log_file.write_bytes(b"\xffNo space left on device (28)")

monkeypatch.setattr(module, "find_backups", lambda dest, ssh: ["a", "b"])

expired: list[str] = []

def fake_expire(folder: str, ssh: Any) -> None:
expired.append(folder)

monkeypatch.setattr(module, "expire_backup", fake_expire)
monkeypatch.setattr(module, "log_warn", lambda message: None)
monkeypatch.setattr(module, "log_error", lambda message: None)

result = module.deal_with_no_space_left(
log_file=str(log_file),
dest_folder=str(tmp_path),
ssh=None,
auto_expire=True,
)

assert result is True
assert expired


def test_check_rsync_errors_handles_non_utf8_error(
tmp_path: Path, rtm: Any, monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Log parsing tolerates undecodable bytes when reporting rsync errors."""
module = rtm
log_file = tmp_path / "rsync.log"
log_file.write_bytes(b"\xffrsync error: something broke")

messages: list[str] = []

monkeypatch.setattr(module, "log_error", lambda message: messages.append(message))

module.check_rsync_errors(str(log_file), auto_delete_log=False)

assert messages and "rsync error" in messages[0]
Loading