Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 100 additions & 1 deletion src/hooks/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,108 @@

# Presidio
DEFAULT_LANGUAGE_CODE = "en"

# Names of the Presidio configuration files.
ENGINE_CONFIG_FILE = "engine_config.yaml"
NLP_CONFIG_FILE = "nlp_config.yaml"
RECOGNIZER_CONFIG_FILE = "recognizer_config.yaml"

# File extensions listed as the default file types.
DEFAULT_FILE_TYPES = [
    ".txt",
    ".yml",
    ".yaml",
    ".csv",
]
PRESIDIO_EXCLUSIONS_FILE_PATH = "personal-data-exclusions.txt"

# File verification
FILE_VERIFICATION_EXCLUSIONS_FILE_PATH = "file-verifications-exclusions.txt"
# Size limit in bytes: 500 KiB (512,000 bytes).
MAX_FILE_SIZE_BYTES = 500 * 1024
# Regular expressions describing files that must not be committed.
# A path is considered blocked as soon as any one of these patterns is found
# in it, so the order of the entries does not matter.
BLOCKED_FILE_EXTENSION_REGEX = [
    # Databases
    r"\.backup$",
    r"\.bak$",
    # Word legacy
    r"\.doc$",
    r"\.dot$",
    r"\.wbk$",
    # Word Office Open XML (OOXML) format
    r"\.docx$",
    r"\.docm$",
    r"\.dotx$",
    r"\.dotm$",
    r"\.docb$",
    # Excel legacy
    r"\.xls$",
    r"\.xlt$",
    r"\.xlm$",
    # Excel OOXML
    r"\.xlsx$",
    r"\.xlsm$",
    r"\.xltx$",
    r"\.xltm$",
    # Other Excel formats
    r"\.xlsb$",
    r"\.xla$",
    r"\.xlam$",
    r"\.xll$",
    r"\.xlw$",
    # PowerPoint legacy
    r"\.ppt$",
    r"\.pot$",
    r"\.pps$",
    # PowerPoint OOXML
    r"\.pptx$",
    r"\.pptm$",
    r"\.potx$",
    r"\.potm$",
    r"\.ppam$",
    r"\.ppsx$",
    r"\.ppsm$",
    r"\.sldx$",
    r"\.sldm$",
    # Access
    r"\.accdb$",
    r"\.accde$",
    r"\.accdt$",
    r"\.accdr$",
    # OneNote
    r"\.one$",
    # Publisher
    r"\.pub$",
    # XPS Document
    r"\.xps$",
    # Adobe
    r"\.pdf$",
    r"\.ps$",
    # The comma after this entry matters: without it Python concatenates the
    # two adjacent literals into one pattern that can never match.
    r"\.eps$",
    r"\.prn$",
    # Secret files
    r"\.p12$",
    r"\.pfx$",
    r"\.pkcs12$",
    r"\.pem$",
    r"_rsa$",
    r"_dsa$",
    r"_ed25519$",
    r"_ecdsa$",
    r"\.jks$",
    # bash/zsh rc file:
    r"^\.?(bash|zsh)?rc$",
    # bash/zsh profile:
    r"^\.?(bash|zsh)_profile$",
    # bash/zsh aliases file:
    r"^\.?(bash|zsh)_aliases$",
    # credential(s) file:
    r"^\.credential(s)?$",
    # Github Enterprise file:
    r"^\.githubenterprise$",
    # Apple Keychain file:
    # NOTE(review): `\.*` matches literal dots only, so this matches
    # "keychain"/".keychain" but not "login.keychain" - confirm the intent.
    r"^\.*keychain$",
    # Keystore/Keyring file:
    r"^key(store|ring)$",
    # Keepass secret file
    # NOTE(review): not anchored at the end, so any name starting with "kdb"
    # matches - presumably a ".kdb"/".kdbx" extension was intended; confirm.
    r"^\.*kdb",
    # Archive files:
    r"\.zip$",
    r"\.rar$",
    r"\.7z$",
    r"\.tar$",
    r"\.gz$",
    r"\.bz2$",
]
Empty file.
111 changes: 111 additions & 0 deletions src/hooks/file_verification/scanner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import asyncio
import re


from anyio import Path, open_file
from io import StringIO
from typing import List

from src.hooks.config import (
BLOCKED_FILE_EXTENSION_REGEX,
FILE_VERIFICATION_EXCLUSIONS_FILE_PATH,
LOGGER,
MAX_FILE_SIZE_BYTES,
)

logger = LOGGER


class FileVerificationScanResult:
def __init__(self, forbidden: List[str] | None = None, exceeds_file_size: List[str] | None = None) -> None:
self.forbidden = forbidden if forbidden else []
self.exceeds_file_size = exceeds_file_size if exceeds_file_size else []

def __str__(self) -> str:
with StringIO() as output_buffer:
output_buffer.write("--------FILE VERIFICATION SCAN SUMMARY--------")

if not self.forbidden and not self.exceeds_file_size:
output_buffer.write("No file verification issues detected")

if self.forbidden:
output_buffer.write("\n\nFILES WITH A FORBIDDEN FILE EXTENSION\n")
for forbidden in self.forbidden:
output_buffer.write(forbidden)
output_buffer.write("\n")

if self.exceeds_file_size:
output_buffer.write("\n\nFILES THAT EXCEED THE MAXIMUM FILE SIZE\n")
for exceeds in self.exceeds_file_size:
output_buffer.write(exceeds)
output_buffer.write("\n")

return output_buffer.getvalue()


class FileVerificationScanner:
def __init__(
self,
verbose: bool = False,
paths: List[str] | None = None,
) -> None:
self.verbose = verbose
self.paths = paths if paths else []

def _is_path_blocked(self, path: str, file_extension_regex: list[str]):
return any(re.search(regex, path) for regex in file_extension_regex)

async def _check_file_size_exceeds_maximum(self, path: str, results: list[str]):
stat_result = await Path(path).stat()
if stat_result.st_size > MAX_FILE_SIZE_BYTES:
logger.debug(
"Path %s has a file size of %s which is above the maximum of %s",
path,
stat_result.st_size,
MAX_FILE_SIZE_BYTES,
)
results.append(path)

async def _get_exclusions(self, exclusions_file: str) -> list[str]:
exclusions = []

if not await Path(exclusions_file).exists():
logger.debug("The file verification exclusions file %s is not present", exclusions_file)
return exclusions

async with await open_file(exclusions_file) as f:
async for exclusion in f:
exclusions.append(exclusion.rstrip())

logger.debug("Loaded exclusions from file %s", FILE_VERIFICATION_EXCLUSIONS_FILE_PATH)
return exclusions

async def _get_paths_to_scan(self, paths) -> list[str]:
exclusions = await self._get_exclusions(exclusions_file=FILE_VERIFICATION_EXCLUSIONS_FILE_PATH)

if not exclusions:
return paths

paths_to_scan = []
for path in paths:
if path in exclusions:
logger.debug("Path %s is excluded from file verification scan", path)
continue
paths_to_scan.append(path)
return paths_to_scan

async def scan(self) -> FileVerificationScanResult:
blocked_file_extension_paths: list[str] = []
exceeds_file_size_paths: list[str] = []
tasks: list[asyncio.Task] = []

async with asyncio.TaskGroup() as tg:
for path in await self._get_paths_to_scan(self.paths):
match = self._is_path_blocked(path, BLOCKED_FILE_EXTENSION_REGEX)
if match:
logger.debug("Path %s has a forbidden file extension", path)
blocked_file_extension_paths.append(path)

tasks.append(tg.create_task(self._check_file_size_exceeds_maximum(path, exceeds_file_size_paths)))

return FileVerificationScanResult(blocked_file_extension_paths, exceeds_file_size_paths)
31 changes: 28 additions & 3 deletions src/hooks/run_security_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
RELEASE_CHECK_URL,
SECURITY_SCAN,
)
from src.hooks.file_verification.scanner import FileVerificationScanResult, FileVerificationScanner
from src.hooks.hooks_base import Hook, HookRunResult
from src.hooks.presidio.scanner import PresidioScanResult, PresidioScanner
from src.hooks.trufflehog.scanner import TrufflehogScanResult, TrufflehogScanner
Expand All @@ -24,11 +25,13 @@
class RunSecurityScanResult(HookRunResult):
def __init__(
    self,
    trufflehog_scan_result: TrufflehogScanResult | None,
    presidio_scan_result: PresidioScanResult | None,
    file_verification_task_result: FileVerificationScanResult | None,
):
    """Store the result of each scan; a value of None is accepted for any of them."""
    # Each parameter is declared exactly once: repeating a parameter name in
    # a signature is a SyntaxError.
    self.trufflehog_scan_result = trufflehog_scan_result
    self.presidio_scan_result = presidio_scan_result
    self.file_verification_task_result = file_verification_task_result

def run_success(self) -> bool:
is_success = True
Expand All @@ -41,18 +44,29 @@ def run_success(self) -> bool:
and len(self.presidio_scan_result.paths_containing_personal_data) > 0
):
is_success = False
if self.file_verification_task_result:
if (
self.file_verification_task_result.exceeds_file_size
and len(self.file_verification_task_result.exceeds_file_size) > 0
) or (self.file_verification_task_result.forbidden and len(self.file_verification_task_result.forbidden) > 0):
is_success = False
return is_success

def run_summary(self) -> str | None:
trufflehog_summary = ""
presidio_summary = ""
file_verification_summary = ""

if self.trufflehog_scan_result:
trufflehog_summary = str(self.trufflehog_scan_result)

if self.presidio_scan_result:
presidio_summary = str(self.presidio_scan_result)

return "".join(["\n", trufflehog_summary, "\n", "\n", presidio_summary])
if self.file_verification_task_result:
file_verification_summary = str(self.file_verification_task_result)

return "".join(["\n", trufflehog_summary, "\n", "\n", presidio_summary, "\n", "\n", file_verification_summary])


class RunSecurityScan(Hook):
Expand Down Expand Up @@ -151,9 +165,16 @@ async def run_personal_scan(self) -> PresidioScanResult:
paths_to_scan,
).scan()

async def run_file_verification_scan(self):
    """Run the file verification scan over ``self.paths``.

    Returns None without scanning when ``self.github_action`` is truthy;
    otherwise returns the scanner's result.
    """
    if not self.github_action:
        scanner = FileVerificationScanner(self.verbose, self.paths)
        return await scanner.scan()

    # only scan new files
    return None

async def run(self) -> RunSecurityScanResult:
security_scan_task = None
personal_data_scan_task = None
file_verification_task = None

async with asyncio.TaskGroup() as tg:
if SECURITY_SCAN not in self.excluded_scans:
Expand All @@ -168,10 +189,14 @@ async def run(self) -> RunSecurityScanResult:
else:
logger.debug("Personal data scan is excluded")

file_verification_task = tg.create_task(self.run_file_verification_scan())

security_scan_result = security_scan_task.result() if security_scan_task else None
personal_data_scan_result = personal_data_scan_task.result() if personal_data_scan_task else None
file_verification_task_result = file_verification_task.result() if file_verification_task else None

return RunSecurityScanResult(
trufflehog_scan_result=security_scan_result,
presidio_scan_result=personal_data_scan_result,
file_verification_task_result=file_verification_task_result,
)
Loading
Loading