diff --git a/src/hooks/config.py b/src/hooks/config.py index 1f32f258..472dab83 100644 --- a/src/hooks/config.py +++ b/src/hooks/config.py @@ -27,9 +27,108 @@ # Presidio DEFAULT_LANGUAGE_CODE = "en" - ENGINE_CONFIG_FILE = "engine_config.yaml" NLP_CONFIG_FILE = "nlp_config.yaml" RECOGNIZER_CONFIG_FILE = "recognizer_config.yaml" DEFAULT_FILE_TYPES = [".txt", ".yml", ".yaml", ".csv"] PRESIDIO_EXCLUSIONS_FILE_PATH = "personal-data-exclusions.txt" + +# File verification: exclusions file, maximum file size and blocked path patterns +FILE_VERIFICATION_EXCLUSIONS_FILE_PATH = "file-verifications-exclusions.txt" +MAX_FILE_SIZE_BYTES = 1024 * 500 +BLOCKED_FILE_EXTENSION_REGEX = [ + # Databases + r"\.backup$", + r"\.bak$", + # Worksheets + r"\.xlsx$", + r"\.xls$", + # Word Legacy + r"\.doc$", + r"\.dot$", + r"\.wbk$", + # Word Office Open XML (OOXML) format + r"\.docx$", + r"\.docm$", + r"\.dotx$", + r"\.dotm$", + r"\.docb$", + # Excel + r"\.xls$", + r"\.xlt$", + r"\.xlm$", + # Excel OOXML + r"\.xlsx$", + r"\.xlsm$", + r"\.xltx$", + r"\.xltm$", + # Other formats + r"\.xlsb$", + r"\.xla$", + r"\.xlam$", + r"\.xll$", + r"\.xlw$", + # PowerPoint legacy + r"\.ppt$", + r"\.pot$", + r"\.pps$", + # OOXML + r"\.pptx$", + r"\.pptm$", + r"\.potx$", + r"\.potm$", + r"\.ppam$", + r"\.ppsx$", + r"\.ppsm$", + r"\.sldx$", + r"\.sldm$", + # Access + r"\.accdb$", + r"\.accde$", + r"\.accdt$", + r"\.accdr$", + # OneNote + r"\.one$", + # Publisher + r"\.pub$", + # XPS Document + r"\.xps$", + # Adobe + r"\.pdf$", + r"\.ps$", + r"\.eps$", + r"\.prn$", + # Secret files + r"\.p12$", + r"\.pfx$", + r"\.pkcs12$", + r"\.pem$", + r"_rsa$", + r"_dsa$", + r"_ed25519$", + r"_ecdsa$", + r"\.jks$", + # bash/zsh rc file: + r"^\.?(bash|zsh)?rc$", + # bash/zsh profile: + r"^\.?(bash|zsh)_profile$", + # bash/zsh aliases file: + r"^\.?(bash|zsh)_aliases$", + # credential(s) file: + r"^\.credential(s)?$", + # Github Enterprise file: + r"^\.githubenterprise$", + # Apple Keychain file: + r"^\.*keychain$", + # Keystore/Keyring file: + r"^key(store|ring)$", + # Keepass secret 
file + r"^\.*kdb", + # Archive files: + r"\.zip$", + r"\.rar$", + r"\.7z$", + r"\.tar$", + r"\.gz$", + r"\.bz2$", +] diff --git a/src/hooks/file_verification/__init__.py b/src/hooks/file_verification/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/hooks/file_verification/scanner.py b/src/hooks/file_verification/scanner.py new file mode 100644 index 00000000..290ad6c6 --- /dev/null +++ b/src/hooks/file_verification/scanner.py @@ -0,0 +1,114 @@ +import asyncio +import re + + +from anyio import Path, open_file +from io import StringIO +from typing import List + +from src.hooks.config import ( + BLOCKED_FILE_EXTENSION_REGEX, + FILE_VERIFICATION_EXCLUSIONS_FILE_PATH, + LOGGER, + MAX_FILE_SIZE_BYTES, +) + +logger = LOGGER + + +class FileVerificationScanResult: + """Paths flagged by a file verification scan, grouped by the check they failed.""" + def __init__(self, forbidden: List[str] | None = None, exceeds_file_size: List[str] | None = None) -> None: + self.forbidden = forbidden if forbidden else [] + self.exceeds_file_size = exceeds_file_size if exceeds_file_size else [] + + def __str__(self) -> str: + """Return the plain-text summary: a banner followed by one section per failed check.""" + with StringIO() as output_buffer: + output_buffer.write("--------FILE VERIFICATION SCAN SUMMARY--------") + + if not self.forbidden and not self.exceeds_file_size: + output_buffer.write("\n\nNo file verification issues detected") + + if self.forbidden: + output_buffer.write("\n\nFILES WITH A FORBIDDEN FILE EXTENSION\n") + for forbidden in self.forbidden: + output_buffer.write(forbidden) + output_buffer.write("\n") + + if self.exceeds_file_size: + output_buffer.write("\n\nFILES THAT EXCEED THE MAXIMUM FILE SIZE\n") + for exceeds in self.exceeds_file_size: + output_buffer.write(exceeds) + output_buffer.write("\n") + + return output_buffer.getvalue() + + +class FileVerificationScanner: + """Checks paths against the blocked-path regexes and the maximum file size.""" + def __init__( + self, + verbose: bool = False, + paths: List[str] | None = None, + ) -> None: + self.verbose = verbose + self.paths = paths if paths else [] + + def _is_path_blocked(self, path: str, file_extension_regex: list[str]) -> bool: + return 
any(re.search(regex, Path(path).name) for regex in file_extension_regex) + + async def _check_file_size_exceeds_maximum(self, path: str, results: list[str]): + stat_result = await Path(path).stat() + if stat_result.st_size > MAX_FILE_SIZE_BYTES: + logger.debug( + "Path %s has a file size of %s which is above the maximum of %s", + path, + stat_result.st_size, + MAX_FILE_SIZE_BYTES, + ) + results.append(path) + + async def _get_exclusions(self, exclusions_file: str) -> list[str]: + exclusions = [] + + if not await Path(exclusions_file).exists(): + logger.debug("The file verification exclusions file %s is not present", exclusions_file) + return exclusions + + async with await open_file(exclusions_file) as f: + async for exclusion in f: + exclusions.append(exclusion.rstrip()) + + logger.debug("Loaded exclusions from file %s", exclusions_file) + return exclusions + + async def _get_paths_to_scan(self, paths) -> list[str]: + exclusions = await self._get_exclusions(exclusions_file=FILE_VERIFICATION_EXCLUSIONS_FILE_PATH) + + if not exclusions: + return paths + + paths_to_scan = [] + for path in paths: + if path in exclusions: + logger.debug("Path %s is excluded from file verification scan", path) + continue + paths_to_scan.append(path) + return paths_to_scan + + async def scan(self) -> FileVerificationScanResult: + blocked_file_extension_paths: list[str] = [] + exceeds_file_size_paths: list[str] = [] + # The task group awaits every created task on exit, so no task handles are collected here. + + async with asyncio.TaskGroup() as tg: + for path in await self._get_paths_to_scan(self.paths): + match = self._is_path_blocked(path, BLOCKED_FILE_EXTENSION_REGEX) + if match: + logger.debug("Path %s has a forbidden file extension", path) + blocked_file_extension_paths.append(path) + + tg.create_task(self._check_file_size_exceeds_maximum(path, exceeds_file_size_paths)) + + return FileVerificationScanResult(blocked_file_extension_paths, exceeds_file_size_paths) diff --git a/src/hooks/run_security_scan.py 
b/src/hooks/run_security_scan.py index 893f6b3b..2ff0ea2b 100644 --- a/src/hooks/run_security_scan.py +++ b/src/hooks/run_security_scan.py @@ -13,6 +13,7 @@ RELEASE_CHECK_URL, SECURITY_SCAN, ) +from src.hooks.file_verification.scanner import FileVerificationScanResult, FileVerificationScanner from src.hooks.hooks_base import Hook, HookRunResult from src.hooks.presidio.scanner import PresidioScanResult, PresidioScanner from src.hooks.trufflehog.scanner import TrufflehogScanResult, TrufflehogScanner @@ -24,11 +25,13 @@ class RunSecurityScanResult(HookRunResult): def __init__( self, - trufflehog_scan_result: TrufflehogScanResult, - presidio_scan_result: PresidioScanResult, + trufflehog_scan_result: TrufflehogScanResult | None, + presidio_scan_result: PresidioScanResult | None, + file_verification_task_result: FileVerificationScanResult | None, ): self.trufflehog_scan_result = trufflehog_scan_result self.presidio_scan_result = presidio_scan_result + self.file_verification_task_result = file_verification_task_result def run_success(self) -> bool: is_success = True @@ -41,18 +44,29 @@ def run_success(self) -> bool: and len(self.presidio_scan_result.paths_containing_personal_data) > 0 ): is_success = False + if self.file_verification_task_result: + if ( + self.file_verification_task_result.exceeds_file_size + and len(self.file_verification_task_result.exceeds_file_size) > 0 + ) or (self.file_verification_task_result.forbidden and len(self.file_verification_task_result.forbidden) > 0): + is_success = False return is_success def run_summary(self) -> str | None: trufflehog_summary = "" presidio_summary = "" + file_verification_summary = "" + if self.trufflehog_scan_result: trufflehog_summary = str(self.trufflehog_scan_result) if self.presidio_scan_result: presidio_summary = str(self.presidio_scan_result) - return "".join(["\n", trufflehog_summary, "\n", "\n", presidio_summary]) + if self.file_verification_task_result: + file_verification_summary = 
str(self.file_verification_task_result) + + return "".join(["\n", trufflehog_summary, "\n", "\n", presidio_summary, "\n", "\n", file_verification_summary]) class RunSecurityScan(Hook): @@ -151,9 +165,16 @@ async def run_personal_scan(self) -> PresidioScanResult: paths_to_scan, ).scan() + async def run_file_verification_scan(self): + if self.github_action: # only scan new files + return None + + return await FileVerificationScanner(self.verbose, self.paths).scan() + async def run(self) -> RunSecurityScanResult: security_scan_task = None personal_data_scan_task = None + file_verification_task = None async with asyncio.TaskGroup() as tg: if SECURITY_SCAN not in self.excluded_scans: @@ -168,10 +189,14 @@ async def run(self) -> RunSecurityScanResult: else: logger.debug("Personal data scan is excluded") + file_verification_task = tg.create_task(self.run_file_verification_scan()) + security_scan_result = security_scan_task.result() if security_scan_task else None personal_data_scan_result = personal_data_scan_task.result() if personal_data_scan_task else None + file_verification_task_result = file_verification_task.result() if file_verification_task else None return RunSecurityScanResult( trufflehog_scan_result=security_scan_result, presidio_scan_result=personal_data_scan_result, + file_verification_task_result=file_verification_task_result, ) diff --git a/tests/integration/hooks/test_cli.py b/tests/integration/hooks/test_cli.py index 56f79ea8..2d56a281 100644 --- a/tests/integration/hooks/test_cli.py +++ b/tests/integration/hooks/test_cli.py @@ -1,10 +1,13 @@ +import os +from random import randint import tempfile +from typing import List from anyio import NamedTemporaryFile, TemporaryDirectory from unittest.mock import patch from src.hooks.cli import main_async, main -from src.hooks.config import TRUFFLEHOG_ERROR_CODE +from src.hooks.config import MAX_FILE_SIZE_BYTES, PERSONAL_DATA_SCAN, TRUFFLEHOG_ERROR_CODE class TestCLI: @@ -47,8 +50,9 @@ async def 
test_run_scan_with_secret_data(self): mock_run_process.return_value.returncode = TRUFFLEHOG_ERROR_CODE result = await main_async(["run_scan", "-v", root_td]) - assert mock_run_process.was_called() + assert result == 1 + mock_run_process.assert_called() async def test_run_scan_with_personal_data(self): async with ( @@ -71,7 +75,69 @@ async def test_run_scan_with_personal_data(self): mock_run_process.return_value.returncode = 0 result = await main_async(["run_scan", "-v", root_file.name]) - assert mock_run_process.was_called() + + assert result == 1 + mock_run_process.assert_called() + + async def test_run_scan_with_large_files(self): + async with ( + TemporaryDirectory() as root_td, + ): + with patch("src.hooks.trufflehog.scanner.run_process") as mock_run_process: + large_files: List[str] = [] + for _ in range(0, randint(3, 10)): + async with NamedTemporaryFile( + dir=root_td, + mode="wb", + prefix="large_file_", + suffix=".txt", + delete=False, # Delete handled by the directory being deleted + ) as ntf: + await ntf.write(os.urandom(randint(MAX_FILE_SIZE_BYTES, MAX_FILE_SIZE_BYTES * 2))) + large_files.append(ntf.name) + + small_files: List[str] = [] + for _ in range(0, randint(8, 15)): + async with NamedTemporaryFile( + dir=root_td, + mode="wb", + prefix="small_file_", + suffix=".txt", + delete=False, # Delete handled by the directory being deleted + ) as ntf: + await ntf.write(os.urandom(randint(10, MAX_FILE_SIZE_BYTES - 1))) + small_files.append(ntf.name) + + # trufflehog needs to be installed, mock the subprocess.run call to avoid calling directly + mock_run_process.return_value.stdout = "".encode() + mock_run_process.return_value.returncode = 0 + + result = await main_async(["run_scan", "-v", "-x", PERSONAL_DATA_SCAN] + large_files + small_files) + + mock_run_process.assert_called() + assert result == 1 + + async def test_run_scan_with_blocked_files(self): + async with ( + TemporaryDirectory() as root_td, + ): + with 
patch("src.hooks.trufflehog.scanner.run_process") as mock_run_process: + blocked_files: List[str] = [] + for file_extension in [".pdf", ".xlsx", ".bak", ".pem"]: + async with NamedTemporaryFile( + dir=root_td, + mode="wb", + suffix=file_extension, + delete=False, # Delete handled by the directory being deleted + ) as ntf: + blocked_files.append(ntf.name) + + mock_run_process.return_value.stdout = "".encode() + mock_run_process.return_value.returncode = 0 + + result = await main_async(["run_scan", "-v"] + blocked_files) + + mock_run_process.assert_called() assert result == 1 async def test_run_scan_with_no_failures(self): @@ -93,5 +159,6 @@ async def test_run_scan_with_no_failures(self): mock_run_process.return_value.returncode = 0 result = await main_async(["run_scan", "-v", root_file.name, dir_file2.name]) + + mock_run_process.assert_called() assert result == 0 - assert mock_run_process.was_called() diff --git a/tests/unit/hooks/file_verification/__init__.py b/tests/unit/hooks/file_verification/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/unit/hooks/file_verification/test_scanner.py b/tests/unit/hooks/file_verification/test_scanner.py new file mode 100644 index 00000000..0c70eed2 --- /dev/null +++ b/tests/unit/hooks/file_verification/test_scanner.py @@ -0,0 +1,75 @@ +import os +from unittest import mock +from anyio import NamedTemporaryFile +import pytest +from src.hooks.config import BLOCKED_FILE_EXTENSION_REGEX, MAX_FILE_SIZE_BYTES +from src.hooks.file_verification.scanner import FileVerificationScanner + + +class TestFileVerificationScanner: + @pytest.mark.parametrize("file_extension", [".zip", ".pfx", ".xps", ".bak", ".xlsx"]) + def test_is_path_blocked_returns_true_for_blocked_path(self, file_extension): + assert FileVerificationScanner()._is_path_blocked(f"file{file_extension}", BLOCKED_FILE_EXTENSION_REGEX) is True + + @pytest.mark.parametrize("file_extension", [".py", ".js", ".ts"]) + def 
test_is_path_blocked_returns_false_for_allowed_path(self, file_extension): + assert FileVerificationScanner()._is_path_blocked(f"file{file_extension}", BLOCKED_FILE_EXTENSION_REGEX) is False + + async def test_check_file_size_exceeds_maximum_adds_file_to_list_when_file_size_is_above_maximum(self): + files = [] + async with NamedTemporaryFile( + mode="wb", + ) as ntf: + await ntf.write(os.urandom(MAX_FILE_SIZE_BYTES + 1)) + await FileVerificationScanner()._check_file_size_exceeds_maximum(ntf.name, files) + assert ntf.name in files + + async def test_check_file_size_exceeds_maximum_does_not_add_file_to_list_when_file_size_is_below_maximum(self): + files = [] + async with NamedTemporaryFile( + mode="wb", + ) as ntf: + await ntf.write(os.urandom(100)) + await FileVerificationScanner()._check_file_size_exceeds_maximum(ntf.name, files) + assert ntf.name not in files + + async def test_get_exclusions_returns_empty_list_when_exclusions_file_is_missing(self): + assert await FileVerificationScanner()._get_exclusions("not_present_file.txt") == [] + + async def test_get_exclusions_returns_all_exclusions_in_exclusions_file(self): + async with NamedTemporaryFile("w+t") as exclusions_file: + await exclusions_file.writelines(["file1.txt", os.linesep, "file2.csv"]) + await exclusions_file.seek(0) + + assert await FileVerificationScanner()._get_exclusions(exclusions_file.name) == ["file1.txt", "file2.csv"] + + async def test_get_paths_to_scan_returns_same_paths_if_no_exclusions_exist(self): + paths = ["file1.pdf", "file2.py", "file3.yml"] + with mock.patch.object(FileVerificationScanner, "_get_exclusions", return_value=[]): + assert await FileVerificationScanner()._get_paths_to_scan(paths) == paths + + async def test_get_paths_to_scan_returns_only_paths_not_in_the_exclusions_list(self): + paths = ["file1.pdf", "file2.py", "file3.yml"] + with mock.patch.object(FileVerificationScanner, "_get_exclusions", return_value=["file1.pdf"]): + assert await 
FileVerificationScanner()._get_paths_to_scan(paths) == ["file2.py", "file3.yml"] + + async def test_scan_returns_result_with_blocked(self): + def check_file_size(path, files): + if path == "file1.txt": + files.append(path) + + mock_is_path_blocked = mock.MagicMock() + mock_is_path_blocked.side_effect = [False, False, True] # block file3.xlsx + + mock_check_file_size_exceeds_maximum = mock.AsyncMock() + mock_check_file_size_exceeds_maximum.side_effect = check_file_size + + with mock.patch.multiple( + FileVerificationScanner, + _is_path_blocked=mock_is_path_blocked, + _check_file_size_exceeds_maximum=mock_check_file_size_exceeds_maximum, + ): + scan_result = await FileVerificationScanner(paths=["file1.txt", "file2.csv", "file3.xlsx"]).scan() + + assert scan_result.forbidden == ["file3.xlsx"] + assert scan_result.exceeds_file_size == ["file1.txt"] diff --git a/tests/unit/hooks/test_run_security_scan.py b/tests/unit/hooks/test_run_security_scan.py index ca8d03f1..d3787450 100644 --- a/tests/unit/hooks/test_run_security_scan.py +++ b/tests/unit/hooks/test_run_security_scan.py @@ -17,6 +17,7 @@ RELEASE_CHECK_URL, SECURITY_SCAN, ) +from src.hooks.file_verification.scanner import FileVerificationScanResult from src.hooks.presidio.path_filter import PathScanStatus from src.hooks.presidio.scanner import PersonalDataDetection, PresidioScanResult, PathScanResult from src.hooks.run_security_scan import RunSecurityScan @@ -201,22 +202,39 @@ async def test_run_personal_scan_without_data_detected_returns_expected_results( assert len(result.paths_containing_personal_data) == 0 assert len(result.paths_without_personal_data) == 1 + async def test_run_file_verification_with_github_action_true_returns_none(self): + scan_result = FileVerificationScanResult() + + mock_scan_result = AsyncMock() + mock_scan_result.return_value = scan_result + + with patch("src.hooks.run_security_scan.FileVerificationScanner") as mock_scanner: + mock_scanner().scan = mock_scan_result + + scan = 
RunSecurityScan(github_action=True) + + assert await scan.run_file_verification_scan() is None + async def test_run_with_run_security_scan_true_and_run_personal_scan_true_returns_result_for_both( self, ): with ( patch.object(RunSecurityScan, "run_security_scan") as mock_run_security_scan, patch.object(RunSecurityScan, "run_personal_scan") as mock_run_personal_scan, + patch.object(RunSecurityScan, "run_file_verification_scan") as mock_run_file_verification_scan, ): mock_run_security_scan.return_value = TrufflehogScanResult() mock_run_personal_scan.return_value = PresidioScanResult() + mock_run_file_verification_scan.return_value = FileVerificationScanResult() result = await RunSecurityScan().run() assert result.trufflehog_scan_result is not None assert result.presidio_scan_result is not None + assert result.file_verification_task_result is not None mock_run_personal_scan.assert_called_once() mock_run_security_scan.assert_called_once() + mock_run_file_verification_scan.assert_called_once() async def test_run_with_run_security_scan_excluded_does_not_run_a_security_scan( self,