Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 75 additions & 64 deletions ardupilot_methodic_configurator/backend_filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,6 @@
from logging import exception as logging_exception
from logging import info as logging_info
from logging import warning as logging_warning
from os import getcwd as os_getcwd
from os import listdir as os_listdir
from os import path as os_path
from os import remove as os_remove
from os import rename as os_rename
from os import rmdir as os_rmdir
from pathlib import Path
from platform import system as platform_system
from re import compile as re_compile
Expand Down Expand Up @@ -129,7 +123,7 @@ def re_init(self, vehicle_dir: str, vehicle_type: str, blank_component_data: boo
# Extend parameter documentation metadata if <parameter_file>.pdef.xml exists
for filename in self.file_parameters:
pdef_xml_file = filename.replace(".param", ".pdef.xml")
if os_path.exists(os_path.join(xml_dir, pdef_xml_file)):
if (Path(xml_dir) / pdef_xml_file).exists():
doc_dict = parse_parameter_metadata("", xml_dir, pdef_xml_file, vehicle_type, TOOLTIP_MAX_LENGTH)
self.doc_dict.update(doc_dict)

Expand Down Expand Up @@ -168,9 +162,9 @@ def rename_parameter_files(self) -> None:
new_filename,
)
continue
new_filename_path = os_path.join(self.vehicle_dir, new_filename)
old_filename_path = os_path.join(self.vehicle_dir, old_filename)
os_rename(old_filename_path, new_filename_path)
new_filename_path = Path(self.vehicle_dir) / new_filename
old_filename_path = Path(self.vehicle_dir) / old_filename
old_filename_path.rename(new_filename_path)
logging_info("Renamed %s to %s", old_filename, new_filename)

def _format_columns_sorted_numerically( # pylint: disable=too-many-locals
Expand Down Expand Up @@ -338,15 +332,17 @@ def read_params_from_files(self) -> dict[str, ParDict]:

"""
parameters: dict[str, ParDict] = {}
if os_path.isdir(self.vehicle_dir):
vehicle_path = Path(self.vehicle_dir)
if vehicle_path.is_dir():
# Regular expression pattern for filenames starting with two digits followed by an underscore and ending in .param
pattern = re_compile(r"^\d{2}_.*\.param$")

for filename in sorted(os_listdir(self.vehicle_dir)):
for file_path in sorted(vehicle_path.iterdir()):
filename = file_path.name
if pattern.match(filename):
if filename in {"00_default.param", "01_ignore_readonly.param"}:
continue
parameters[filename] = ParDict.from_file(os_path.join(self.vehicle_dir, filename))
parameters[filename] = ParDict.from_file(str(file_path))
else:
logging_error(_("Error: %s is not a directory."), self.vehicle_dir)
return parameters
Expand Down Expand Up @@ -387,11 +383,10 @@ def export_to_param(self, params: ParDict, filename_out: str, annotate_doc: bool
annotate_doc (bool, optional): Whether to update the parameter documentation. Default is True.

"""
params.export_to_param(os_path.join(self.vehicle_dir, filename_out))
output_path = Path(self.vehicle_dir) / filename_out
params.export_to_param(str(output_path))
if annotate_doc:
update_parameter_documentation(
self.doc_dict, os_path.join(self.vehicle_dir, filename_out), "missionplanner", self.param_default_dict
)
update_parameter_documentation(self.doc_dict, str(output_path), "missionplanner", self.param_default_dict)

def vehicle_configuration_file_exists(self, filename: str) -> bool:
"""
Expand All @@ -404,8 +399,8 @@ def vehicle_configuration_file_exists(self, filename: str) -> bool:
bool: True if the file exists and is a file (not a directory) and is not empty, False otherwise.

"""
file_path = os_path.join(self.vehicle_dir, filename)
return os_path.exists(file_path) and os_path.isfile(file_path) and os_path.getsize(file_path) > 0
file_path = Path(self.vehicle_dir) / filename
return file_path.exists() and file_path.is_file() and file_path.stat().st_size > 0

def __all_intermediate_parameter_file_comments(self) -> dict[str, str]:
"""
Expand Down Expand Up @@ -463,27 +458,25 @@ def categorize_parameters(self, param: ParDict) -> tuple[ParDict, ParDict, ParDi

@staticmethod
def get_directory_name_from_full_path(full_path: str) -> str:
# Normalize the path to ensure it's in a standard format
normalized_path = os_path.normpath(full_path)

# Split the path into head and tail, then get the basename of the tail
return os_path.basename(os_path.split(normalized_path)[1])
# Use pathlib for cleaner path operations
return Path(full_path).name
Comment on lines +461 to +462
Copy link

Copilot AI Sep 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function name get_directory_name_from_full_path suggests it should return a directory name, but Path.name returns the final component of the path (which could be a file). The original code used os.path.basename(os.path.split(normalized_path)[1]) which had different behavior. This change may break functionality if callers expect directory-specific behavior.

Suggested change
# Use pathlib for cleaner path operations
return Path(full_path).name
# Return the name of the parent directory, even if the path ends with a file
return Path(full_path).parent.name

Copilot uses AI. Check for mistakes.

# Extract the vehicle name from the directory path
def get_vehicle_directory_name(self) -> str:
return self.get_directory_name_from_full_path(self.vehicle_dir)

def zip_file_path(self) -> str:
vehicle_name = self.get_vehicle_directory_name()
return os_path.join(self.vehicle_dir, f"{vehicle_name}.zip")
return str(Path(self.vehicle_dir) / f"{vehicle_name}.zip")

def zip_file_exists(self) -> bool:
zip_file_path = self.zip_file_path()
return os_path.exists(zip_file_path) and os_path.isfile(zip_file_path)
zip_file_path = Path(self.zip_file_path())
return zip_file_path.exists() and zip_file_path.is_file()

def add_configuration_file_to_zip(self, zipf: ZipFile, filename: str) -> None:
if self.vehicle_configuration_file_exists(filename):
zipf.write(os_path.join(self.vehicle_dir, filename), arcname=filename)
file_path = Path(self.vehicle_dir) / filename
zipf.write(str(file_path), arcname=filename)

def zip_files(self, files_to_zip: list[tuple[bool, str]]) -> None:
"""
Expand Down Expand Up @@ -529,31 +522,36 @@ def zip_files(self, files_to_zip: list[tuple[bool, str]]) -> None:
logging_info(_("Intermediate parameter files and summary files zipped to %s"), zip_file_path)

def vehicle_image_filepath(self) -> str:
return os_path.join(self.vehicle_dir, "vehicle.jpg")
return str(Path(self.vehicle_dir) / "vehicle.jpg")

def vehicle_image_exists(self) -> bool:
return os_path.exists(self.vehicle_image_filepath()) and os_path.isfile(self.vehicle_image_filepath())
image_path = Path(self.vehicle_image_filepath())
return image_path.exists() and image_path.is_file()

@staticmethod
def new_vehicle_dir(base_dir: str, new_dir: str) -> str:
return os_path.join(base_dir, new_dir)
return str(Path(base_dir) / new_dir)

@staticmethod
def directory_exists(directory: str) -> bool:
return os_path.exists(directory) and os_path.isdir(directory)
dir_path = Path(directory)
return dir_path.exists() and dir_path.is_dir()

def copy_template_files_to_new_vehicle_dir(
self, template_dir: str, new_vehicle_dir: str, blank_change_reason: bool, copy_vehicle_image: bool
) -> str:
# Copy the template files to the new vehicle directory
try:
if not os_path.exists(template_dir):
template_path = Path(template_dir)
new_vehicle_path = Path(new_vehicle_dir)

if not template_path.exists():
error_msg = _("Template directory does not exist: {template_dir}")
error_msg = error_msg.format(**locals())
logging_error(error_msg)
return error_msg

if not os_path.exists(new_vehicle_dir):
if not new_vehicle_path.exists():
error_msg = _("New vehicle directory does not exist: {new_vehicle_dir}")
error_msg = error_msg.format(**locals())
logging_error(error_msg)
Expand All @@ -568,21 +566,21 @@ def copy_template_files_to_new_vehicle_dir(
if not copy_vehicle_image:
skip_files.add("vehicle.jpg")

for item in os_listdir(template_dir):
if item in skip_files:
for item_path in template_path.iterdir():
item_name = item_path.name
if item_name in skip_files:
continue
source = os_path.join(template_dir, item)
dest = os_path.join(new_vehicle_dir, item)
if blank_change_reason and item.endswith(".param"):
dest = new_vehicle_path / item_name
if blank_change_reason and item_name.endswith(".param"):
# Blank the change reason in the template files, strip the comments that start with #
with open(source, encoding="utf-8") as file:
with open(item_path, encoding="utf-8") as file:
lines = file.readlines()
with open(dest, "w", encoding="utf-8") as file:
file.writelines(line.split("#")[0].strip() + "\n" for line in lines)
elif os_path.isdir(source):
shutil_copytree(source, dest)
elif item_path.is_dir():
shutil_copytree(str(item_path), str(dest))
else:
shutil_copy2(source, dest)
shutil_copy2(str(item_path), str(dest))
except (OSError, shutil_Error) as _e:
error_msg = _("Error copying template files to new vehicle directory: {_e}")
return error_msg.format(**locals())
Expand All @@ -591,7 +589,8 @@ def copy_template_files_to_new_vehicle_dir(
def remove_created_files_and_vehicle_dir(self) -> str:
# Remove the created files in the new vehicle directory
try:
if not os_path.exists(self.vehicle_dir):
vehicle_path = Path(self.vehicle_dir)
if not vehicle_path.exists():
# Use the actual vehicle_dir value to avoid KeyError from missing format keys
error_msg = _("New vehicle directory does not exist: {vehicle_dir}")
error_msg = error_msg.format(vehicle_dir=self.vehicle_dir)
Expand All @@ -600,23 +599,31 @@ def remove_created_files_and_vehicle_dir(self) -> str:

# delete all files in the vehicle directory and delete the vehicle directory
errors: list[str] = []
for item in os_listdir(self.vehicle_dir):
item_path = os_path.join(self.vehicle_dir, item)

def safe_remove_item(item_path: Path) -> Optional[str]:
"""Safely remove a file or directory, returning error message if failed."""
try:
# If the entry is a symlink, remove the link instead of recursing into the target
if os_path.islink(item_path):
os_remove(item_path)
elif os_path.isdir(item_path):
shutil_rmtree(item_path)
if item_path.is_symlink():
item_path.unlink()
elif item_path.is_dir():
shutil_rmtree(str(item_path))
else:
os_remove(item_path)
item_path.unlink()
return None
except OSError as e:
logging_exception(_("Error removing %s"), item_path, e)
errors.append(str(e))
return str(e)

# Remove all items and collect errors
for item_path in vehicle_path.iterdir():
error = safe_remove_item(item_path)
if error:
errors.append(error)

# Try to remove the now-empty vehicle directory
try:
os_rmdir(self.vehicle_dir)
vehicle_path.rmdir()
except OSError as e:
logging_exception(_("Error removing directory %s"), self.vehicle_dir, e)
errors.append(str(e))
Expand All @@ -633,11 +640,11 @@ def remove_created_files_and_vehicle_dir(self) -> str:

@staticmethod
def getcwd() -> str:
return os_getcwd()
return str(Path.cwd())

def tempcal_imu_result_param_tuple(self) -> tuple[str, str]:
tempcal_imu_result_param_filename = "03_imu_temperature_calibration_results.param"
return tempcal_imu_result_param_filename, os_path.join(self.vehicle_dir, tempcal_imu_result_param_filename)
return tempcal_imu_result_param_filename, str(Path(self.vehicle_dir) / tempcal_imu_result_param_filename)

def copy_fc_values_to_file(self, selected_file: str, params: dict[str, float]) -> int:
ret = 0
Expand All @@ -652,14 +659,16 @@ def copy_fc_values_to_file(self, selected_file: str, params: dict[str, float]) -

def write_last_uploaded_filename(self, current_file: str) -> None:
try:
with open(os_path.join(self.vehicle_dir, "last_uploaded_filename.txt"), "w", encoding="utf-8") as file:
last_uploaded_path = Path(self.vehicle_dir) / "last_uploaded_filename.txt"
with open(last_uploaded_path, "w", encoding="utf-8") as file:
file.write(current_file)
except Exception as e: # pylint: disable=broad-except
logging_error(_("Error writing last uploaded filename: %s"), e)

def __read_last_uploaded_filename(self) -> str:
try:
with open(os_path.join(self.vehicle_dir, "last_uploaded_filename.txt"), encoding="utf-8") as file:
last_uploaded_path = Path(self.vehicle_dir) / "last_uploaded_filename.txt"
with open(last_uploaded_path, encoding="utf-8") as file:
return file.read().strip()
except FileNotFoundError as e:
logging_debug(_("last_uploaded_filename.txt not found: %s"), e)
Expand Down Expand Up @@ -726,7 +735,8 @@ def backup_fc_parameters_to_file(
overwrite_existing_file or not self.vehicle_configuration_file_exists(filename)
):
param_dict_as_par = ParDict({param: Par(float(value), "") for param, value in param_dict.items()})
param_dict_as_par.export_to_param(os_path.join(self.vehicle_dir, filename))
param_file_path = Path(self.vehicle_dir) / filename
param_dict_as_par.export_to_param(str(param_file_path))

def get_eval_variables(self) -> dict[str, dict[str, Any]]:
variables = {}
Expand Down Expand Up @@ -824,7 +834,8 @@ def write_param_default_values(self, param_default_values: ParDict) -> bool:
def write_param_default_values_to_file(self, param_default_values: ParDict, filename: str = "00_default.param") -> None:
if self.write_param_default_values(param_default_values):
self.file_parameters[filename] = param_default_values
self.param_default_dict.export_to_param(os_path.join(self.vehicle_dir, filename))
param_file_path = Path(self.vehicle_dir) / filename
self.param_default_dict.export_to_param(str(param_file_path))

def get_download_url_and_local_filename(self, selected_file: str) -> tuple[str, str]:
if (
Expand All @@ -835,7 +846,7 @@ def get_download_url_and_local_filename(self, selected_file: str) -> tuple[str,
src = self.configuration_steps[selected_file]["download_file"].get("source_url", "")
dst = self.configuration_steps[selected_file]["download_file"].get("dest_local", "")
if self.vehicle_dir and src and dst:
return src, os_path.join(self.vehicle_dir, dst)
return src, str(Path(self.vehicle_dir) / dst)
return "", ""

def get_upload_local_and_remote_filenames(self, selected_file: str) -> tuple[str, str]:
Expand All @@ -847,7 +858,7 @@ def get_upload_local_and_remote_filenames(self, selected_file: str) -> tuple[str
src = self.configuration_steps[selected_file]["upload_file"].get("source_local", "")
dst = self.configuration_steps[selected_file]["upload_file"].get("dest_on_fc", "")
if self.vehicle_dir and src and dst:
return os_path.join(self.vehicle_dir, src), dst
return str(Path(self.vehicle_dir) / src), dst
return "", ""

@staticmethod
Expand All @@ -858,7 +869,7 @@ def get_git_commit_hash() -> str:
["git", "rev-parse", "HEAD"], # noqa: S607
capture_output=True,
text=True,
cwd=os_path.dirname(__file__),
cwd=Path(__file__).parent,
Copy link

Copilot AI Sep 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cwd parameter expects a string path, but Path(__file__).parent returns a Path object. This should be converted to string: cwd=str(Path(__file__).parent)

Suggested change
cwd=Path(__file__).parent,
cwd=str(Path(__file__).parent),

Copilot uses AI. Check for mistakes.
check=True,
)
if result.returncode == 0:
Expand All @@ -867,7 +878,7 @@ def get_git_commit_hash() -> str:
# Log the specific error and fallback to reading the git_hash.txt file
msg = f"Failed to get git hash via command: {e}"
logging_debug(msg)
git_hash_file = os_path.join(os_path.dirname(__file__), "git_hash.txt")
git_hash_file = Path(__file__).parent / "git_hash.txt"
try:
with open(git_hash_file, encoding="utf-8") as file:
Copy link

Copilot AI Sep 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The open() function expects a string path, but git_hash_file is a Path object. Convert to string: with open(str(git_hash_file), encoding=\"utf-8\") as file:

Suggested change
with open(git_hash_file, encoding="utf-8") as file:
with open(str(git_hash_file), encoding="utf-8") as file:

Copilot uses AI. Check for mistakes.
return file.read().strip()
Expand Down Expand Up @@ -904,7 +915,7 @@ def add_argparse_arguments(parser: ArgumentParser) -> ArgumentParser:
parser.add_argument( # type: ignore[attr-defined]
"--vehicle-dir",
type=str,
default=os_getcwd(),
default=str(Path.cwd()),
help=_(
"Directory containing vehicle-specific intermediate parameter files. Default is the current working directory"
),
Expand Down
Loading