-
Notifications
You must be signed in to change notification settings - Fork 43
Pathlib and perf #823
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Pathlib and perf #823
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -15,12 +15,6 @@ | |||||
| from logging import exception as logging_exception | ||||||
| from logging import info as logging_info | ||||||
| from logging import warning as logging_warning | ||||||
| from os import getcwd as os_getcwd | ||||||
| from os import listdir as os_listdir | ||||||
| from os import path as os_path | ||||||
| from os import remove as os_remove | ||||||
| from os import rename as os_rename | ||||||
| from os import rmdir as os_rmdir | ||||||
| from pathlib import Path | ||||||
| from platform import system as platform_system | ||||||
| from re import compile as re_compile | ||||||
|
|
@@ -129,7 +123,7 @@ def re_init(self, vehicle_dir: str, vehicle_type: str, blank_component_data: boo | |||||
| # Extend parameter documentation metadata if <parameter_file>.pdef.xml exists | ||||||
| for filename in self.file_parameters: | ||||||
| pdef_xml_file = filename.replace(".param", ".pdef.xml") | ||||||
| if os_path.exists(os_path.join(xml_dir, pdef_xml_file)): | ||||||
| if (Path(xml_dir) / pdef_xml_file).exists(): | ||||||
| doc_dict = parse_parameter_metadata("", xml_dir, pdef_xml_file, vehicle_type, TOOLTIP_MAX_LENGTH) | ||||||
| self.doc_dict.update(doc_dict) | ||||||
|
|
||||||
|
|
@@ -168,9 +162,9 @@ def rename_parameter_files(self) -> None: | |||||
| new_filename, | ||||||
| ) | ||||||
| continue | ||||||
| new_filename_path = os_path.join(self.vehicle_dir, new_filename) | ||||||
| old_filename_path = os_path.join(self.vehicle_dir, old_filename) | ||||||
| os_rename(old_filename_path, new_filename_path) | ||||||
| new_filename_path = Path(self.vehicle_dir) / new_filename | ||||||
| old_filename_path = Path(self.vehicle_dir) / old_filename | ||||||
| old_filename_path.rename(new_filename_path) | ||||||
| logging_info("Renamed %s to %s", old_filename, new_filename) | ||||||
|
|
||||||
| def _format_columns_sorted_numerically( # pylint: disable=too-many-locals | ||||||
|
|
@@ -338,15 +332,17 @@ def read_params_from_files(self) -> dict[str, ParDict]: | |||||
|
|
||||||
| """ | ||||||
| parameters: dict[str, ParDict] = {} | ||||||
| if os_path.isdir(self.vehicle_dir): | ||||||
| vehicle_path = Path(self.vehicle_dir) | ||||||
| if vehicle_path.is_dir(): | ||||||
| # Regular expression pattern for filenames starting with two digits followed by an underscore and ending in .param | ||||||
| pattern = re_compile(r"^\d{2}_.*\.param$") | ||||||
|
|
||||||
| for filename in sorted(os_listdir(self.vehicle_dir)): | ||||||
| for file_path in sorted(vehicle_path.iterdir()): | ||||||
| filename = file_path.name | ||||||
| if pattern.match(filename): | ||||||
| if filename in {"00_default.param", "01_ignore_readonly.param"}: | ||||||
| continue | ||||||
| parameters[filename] = ParDict.from_file(os_path.join(self.vehicle_dir, filename)) | ||||||
| parameters[filename] = ParDict.from_file(str(file_path)) | ||||||
| else: | ||||||
| logging_error(_("Error: %s is not a directory."), self.vehicle_dir) | ||||||
| return parameters | ||||||
|
|
@@ -387,11 +383,10 @@ def export_to_param(self, params: ParDict, filename_out: str, annotate_doc: bool | |||||
| annotate_doc (bool, optional): Whether to update the parameter documentation. Default is True. | ||||||
|
|
||||||
| """ | ||||||
| params.export_to_param(os_path.join(self.vehicle_dir, filename_out)) | ||||||
| output_path = Path(self.vehicle_dir) / filename_out | ||||||
| params.export_to_param(str(output_path)) | ||||||
| if annotate_doc: | ||||||
| update_parameter_documentation( | ||||||
| self.doc_dict, os_path.join(self.vehicle_dir, filename_out), "missionplanner", self.param_default_dict | ||||||
| ) | ||||||
| update_parameter_documentation(self.doc_dict, str(output_path), "missionplanner", self.param_default_dict) | ||||||
|
|
||||||
| def vehicle_configuration_file_exists(self, filename: str) -> bool: | ||||||
| """ | ||||||
|
|
@@ -404,8 +399,8 @@ def vehicle_configuration_file_exists(self, filename: str) -> bool: | |||||
| bool: True if the file exists and is a file (not a directory) and is not empty, False otherwise. | ||||||
|
|
||||||
| """ | ||||||
| file_path = os_path.join(self.vehicle_dir, filename) | ||||||
| return os_path.exists(file_path) and os_path.isfile(file_path) and os_path.getsize(file_path) > 0 | ||||||
| file_path = Path(self.vehicle_dir) / filename | ||||||
| return file_path.exists() and file_path.is_file() and file_path.stat().st_size > 0 | ||||||
|
|
||||||
| def __all_intermediate_parameter_file_comments(self) -> dict[str, str]: | ||||||
| """ | ||||||
|
|
@@ -463,27 +458,25 @@ def categorize_parameters(self, param: ParDict) -> tuple[ParDict, ParDict, ParDi | |||||
|
|
||||||
| @staticmethod | ||||||
| def get_directory_name_from_full_path(full_path: str) -> str: | ||||||
| # Normalize the path to ensure it's in a standard format | ||||||
| normalized_path = os_path.normpath(full_path) | ||||||
|
|
||||||
| # Split the path into head and tail, then get the basename of the tail | ||||||
| return os_path.basename(os_path.split(normalized_path)[1]) | ||||||
| # Use pathlib for cleaner path operations | ||||||
| return Path(full_path).name | ||||||
|
|
||||||
| # Extract the vehicle name from the directory path | ||||||
| def get_vehicle_directory_name(self) -> str: | ||||||
| return self.get_directory_name_from_full_path(self.vehicle_dir) | ||||||
|
|
||||||
| def zip_file_path(self) -> str: | ||||||
| vehicle_name = self.get_vehicle_directory_name() | ||||||
| return os_path.join(self.vehicle_dir, f"{vehicle_name}.zip") | ||||||
| return str(Path(self.vehicle_dir) / f"{vehicle_name}.zip") | ||||||
|
|
||||||
| def zip_file_exists(self) -> bool: | ||||||
| zip_file_path = self.zip_file_path() | ||||||
| return os_path.exists(zip_file_path) and os_path.isfile(zip_file_path) | ||||||
| zip_file_path = Path(self.zip_file_path()) | ||||||
| return zip_file_path.exists() and zip_file_path.is_file() | ||||||
|
|
||||||
| def add_configuration_file_to_zip(self, zipf: ZipFile, filename: str) -> None: | ||||||
| if self.vehicle_configuration_file_exists(filename): | ||||||
| zipf.write(os_path.join(self.vehicle_dir, filename), arcname=filename) | ||||||
| file_path = Path(self.vehicle_dir) / filename | ||||||
| zipf.write(str(file_path), arcname=filename) | ||||||
|
|
||||||
| def zip_files(self, files_to_zip: list[tuple[bool, str]]) -> None: | ||||||
| """ | ||||||
|
|
@@ -529,31 +522,36 @@ def zip_files(self, files_to_zip: list[tuple[bool, str]]) -> None: | |||||
| logging_info(_("Intermediate parameter files and summary files zipped to %s"), zip_file_path) | ||||||
|
|
||||||
| def vehicle_image_filepath(self) -> str: | ||||||
| return os_path.join(self.vehicle_dir, "vehicle.jpg") | ||||||
| return str(Path(self.vehicle_dir) / "vehicle.jpg") | ||||||
|
|
||||||
| def vehicle_image_exists(self) -> bool: | ||||||
| return os_path.exists(self.vehicle_image_filepath()) and os_path.isfile(self.vehicle_image_filepath()) | ||||||
| image_path = Path(self.vehicle_image_filepath()) | ||||||
| return image_path.exists() and image_path.is_file() | ||||||
|
|
||||||
| @staticmethod | ||||||
| def new_vehicle_dir(base_dir: str, new_dir: str) -> str: | ||||||
| return os_path.join(base_dir, new_dir) | ||||||
| return str(Path(base_dir) / new_dir) | ||||||
|
|
||||||
| @staticmethod | ||||||
| def directory_exists(directory: str) -> bool: | ||||||
| return os_path.exists(directory) and os_path.isdir(directory) | ||||||
| dir_path = Path(directory) | ||||||
| return dir_path.exists() and dir_path.is_dir() | ||||||
|
|
||||||
| def copy_template_files_to_new_vehicle_dir( | ||||||
| self, template_dir: str, new_vehicle_dir: str, blank_change_reason: bool, copy_vehicle_image: bool | ||||||
| ) -> str: | ||||||
| # Copy the template files to the new vehicle directory | ||||||
| try: | ||||||
| if not os_path.exists(template_dir): | ||||||
| template_path = Path(template_dir) | ||||||
| new_vehicle_path = Path(new_vehicle_dir) | ||||||
|
|
||||||
| if not template_path.exists(): | ||||||
| error_msg = _("Template directory does not exist: {template_dir}") | ||||||
| error_msg = error_msg.format(**locals()) | ||||||
| logging_error(error_msg) | ||||||
| return error_msg | ||||||
|
|
||||||
| if not os_path.exists(new_vehicle_dir): | ||||||
| if not new_vehicle_path.exists(): | ||||||
| error_msg = _("New vehicle directory does not exist: {new_vehicle_dir}") | ||||||
| error_msg = error_msg.format(**locals()) | ||||||
| logging_error(error_msg) | ||||||
|
|
@@ -568,21 +566,21 @@ def copy_template_files_to_new_vehicle_dir( | |||||
| if not copy_vehicle_image: | ||||||
| skip_files.add("vehicle.jpg") | ||||||
|
|
||||||
| for item in os_listdir(template_dir): | ||||||
| if item in skip_files: | ||||||
| for item_path in template_path.iterdir(): | ||||||
| item_name = item_path.name | ||||||
| if item_name in skip_files: | ||||||
| continue | ||||||
| source = os_path.join(template_dir, item) | ||||||
| dest = os_path.join(new_vehicle_dir, item) | ||||||
| if blank_change_reason and item.endswith(".param"): | ||||||
| dest = new_vehicle_path / item_name | ||||||
| if blank_change_reason and item_name.endswith(".param"): | ||||||
| # Blank the change reason in the template files, strip the comments that start with # | ||||||
| with open(source, encoding="utf-8") as file: | ||||||
| with open(item_path, encoding="utf-8") as file: | ||||||
| lines = file.readlines() | ||||||
| with open(dest, "w", encoding="utf-8") as file: | ||||||
| file.writelines(line.split("#")[0].strip() + "\n" for line in lines) | ||||||
| elif os_path.isdir(source): | ||||||
| shutil_copytree(source, dest) | ||||||
| elif item_path.is_dir(): | ||||||
| shutil_copytree(str(item_path), str(dest)) | ||||||
| else: | ||||||
| shutil_copy2(source, dest) | ||||||
| shutil_copy2(str(item_path), str(dest)) | ||||||
| except (OSError, shutil_Error) as _e: | ||||||
| error_msg = _("Error copying template files to new vehicle directory: {_e}") | ||||||
| return error_msg.format(**locals()) | ||||||
|
|
@@ -591,7 +589,8 @@ def copy_template_files_to_new_vehicle_dir( | |||||
| def remove_created_files_and_vehicle_dir(self) -> str: | ||||||
| # Remove the created files in the new vehicle directory | ||||||
| try: | ||||||
| if not os_path.exists(self.vehicle_dir): | ||||||
| vehicle_path = Path(self.vehicle_dir) | ||||||
| if not vehicle_path.exists(): | ||||||
| # Use the actual vehicle_dir value to avoid KeyError from missing format keys | ||||||
| error_msg = _("New vehicle directory does not exist: {vehicle_dir}") | ||||||
| error_msg = error_msg.format(vehicle_dir=self.vehicle_dir) | ||||||
|
|
@@ -600,23 +599,31 @@ def remove_created_files_and_vehicle_dir(self) -> str: | |||||
|
|
||||||
| # delete all files in the vehicle directory and delete the vehicle directory | ||||||
| errors: list[str] = [] | ||||||
| for item in os_listdir(self.vehicle_dir): | ||||||
| item_path = os_path.join(self.vehicle_dir, item) | ||||||
|
|
||||||
| def safe_remove_item(item_path: Path) -> Optional[str]: | ||||||
| """Safely remove a file or directory, returning error message if failed.""" | ||||||
| try: | ||||||
| # If the entry is a symlink, remove the link instead of recursing into the target | ||||||
| if os_path.islink(item_path): | ||||||
| os_remove(item_path) | ||||||
| elif os_path.isdir(item_path): | ||||||
| shutil_rmtree(item_path) | ||||||
| if item_path.is_symlink(): | ||||||
| item_path.unlink() | ||||||
| elif item_path.is_dir(): | ||||||
| shutil_rmtree(str(item_path)) | ||||||
| else: | ||||||
| os_remove(item_path) | ||||||
| item_path.unlink() | ||||||
| return None | ||||||
| except OSError as e: | ||||||
| logging_exception(_("Error removing %s"), item_path, e) | ||||||
| errors.append(str(e)) | ||||||
| return str(e) | ||||||
|
|
||||||
| # Remove all items and collect errors | ||||||
| for item_path in vehicle_path.iterdir(): | ||||||
| error = safe_remove_item(item_path) | ||||||
| if error: | ||||||
| errors.append(error) | ||||||
|
|
||||||
| # Try to remove the now-empty vehicle directory | ||||||
| try: | ||||||
| os_rmdir(self.vehicle_dir) | ||||||
| vehicle_path.rmdir() | ||||||
| except OSError as e: | ||||||
| logging_exception(_("Error removing directory %s"), self.vehicle_dir, e) | ||||||
| errors.append(str(e)) | ||||||
|
|
@@ -633,11 +640,11 @@ def remove_created_files_and_vehicle_dir(self) -> str: | |||||
|
|
||||||
| @staticmethod | ||||||
| def getcwd() -> str: | ||||||
| return os_getcwd() | ||||||
| return str(Path.cwd()) | ||||||
|
|
||||||
| def tempcal_imu_result_param_tuple(self) -> tuple[str, str]: | ||||||
| tempcal_imu_result_param_filename = "03_imu_temperature_calibration_results.param" | ||||||
| return tempcal_imu_result_param_filename, os_path.join(self.vehicle_dir, tempcal_imu_result_param_filename) | ||||||
| return tempcal_imu_result_param_filename, str(Path(self.vehicle_dir) / tempcal_imu_result_param_filename) | ||||||
|
|
||||||
| def copy_fc_values_to_file(self, selected_file: str, params: dict[str, float]) -> int: | ||||||
| ret = 0 | ||||||
|
|
@@ -652,14 +659,16 @@ def copy_fc_values_to_file(self, selected_file: str, params: dict[str, float]) - | |||||
|
|
||||||
| def write_last_uploaded_filename(self, current_file: str) -> None: | ||||||
| try: | ||||||
| with open(os_path.join(self.vehicle_dir, "last_uploaded_filename.txt"), "w", encoding="utf-8") as file: | ||||||
| last_uploaded_path = Path(self.vehicle_dir) / "last_uploaded_filename.txt" | ||||||
| with open(last_uploaded_path, "w", encoding="utf-8") as file: | ||||||
| file.write(current_file) | ||||||
| except Exception as e: # pylint: disable=broad-except | ||||||
| logging_error(_("Error writing last uploaded filename: %s"), e) | ||||||
|
|
||||||
| def __read_last_uploaded_filename(self) -> str: | ||||||
| try: | ||||||
| with open(os_path.join(self.vehicle_dir, "last_uploaded_filename.txt"), encoding="utf-8") as file: | ||||||
| last_uploaded_path = Path(self.vehicle_dir) / "last_uploaded_filename.txt" | ||||||
| with open(last_uploaded_path, encoding="utf-8") as file: | ||||||
| return file.read().strip() | ||||||
| except FileNotFoundError as e: | ||||||
| logging_debug(_("last_uploaded_filename.txt not found: %s"), e) | ||||||
|
|
@@ -726,7 +735,8 @@ def backup_fc_parameters_to_file( | |||||
| overwrite_existing_file or not self.vehicle_configuration_file_exists(filename) | ||||||
| ): | ||||||
| param_dict_as_par = ParDict({param: Par(float(value), "") for param, value in param_dict.items()}) | ||||||
| param_dict_as_par.export_to_param(os_path.join(self.vehicle_dir, filename)) | ||||||
| param_file_path = Path(self.vehicle_dir) / filename | ||||||
| param_dict_as_par.export_to_param(str(param_file_path)) | ||||||
|
|
||||||
| def get_eval_variables(self) -> dict[str, dict[str, Any]]: | ||||||
| variables = {} | ||||||
|
|
@@ -824,7 +834,8 @@ def write_param_default_values(self, param_default_values: ParDict) -> bool: | |||||
| def write_param_default_values_to_file(self, param_default_values: ParDict, filename: str = "00_default.param") -> None: | ||||||
| if self.write_param_default_values(param_default_values): | ||||||
| self.file_parameters[filename] = param_default_values | ||||||
| self.param_default_dict.export_to_param(os_path.join(self.vehicle_dir, filename)) | ||||||
| param_file_path = Path(self.vehicle_dir) / filename | ||||||
| self.param_default_dict.export_to_param(str(param_file_path)) | ||||||
|
|
||||||
| def get_download_url_and_local_filename(self, selected_file: str) -> tuple[str, str]: | ||||||
| if ( | ||||||
|
|
@@ -835,7 +846,7 @@ def get_download_url_and_local_filename(self, selected_file: str) -> tuple[str, | |||||
| src = self.configuration_steps[selected_file]["download_file"].get("source_url", "") | ||||||
| dst = self.configuration_steps[selected_file]["download_file"].get("dest_local", "") | ||||||
| if self.vehicle_dir and src and dst: | ||||||
| return src, os_path.join(self.vehicle_dir, dst) | ||||||
| return src, str(Path(self.vehicle_dir) / dst) | ||||||
| return "", "" | ||||||
|
|
||||||
| def get_upload_local_and_remote_filenames(self, selected_file: str) -> tuple[str, str]: | ||||||
|
|
@@ -847,7 +858,7 @@ def get_upload_local_and_remote_filenames(self, selected_file: str) -> tuple[str | |||||
| src = self.configuration_steps[selected_file]["upload_file"].get("source_local", "") | ||||||
| dst = self.configuration_steps[selected_file]["upload_file"].get("dest_on_fc", "") | ||||||
| if self.vehicle_dir and src and dst: | ||||||
| return os_path.join(self.vehicle_dir, src), dst | ||||||
| return str(Path(self.vehicle_dir) / src), dst | ||||||
| return "", "" | ||||||
|
|
||||||
| @staticmethod | ||||||
|
|
@@ -858,7 +869,7 @@ def get_git_commit_hash() -> str: | |||||
| ["git", "rev-parse", "HEAD"], # noqa: S607 | ||||||
| capture_output=True, | ||||||
| text=True, | ||||||
| cwd=os_path.dirname(__file__), | ||||||
| cwd=Path(__file__).parent, | ||||||
|
||||||
| cwd=Path(__file__).parent, | |
| cwd=str(Path(__file__).parent), |
Copilot
AI
Sep 18, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The open() function expects a string path, but git_hash_file is a Path object. Convert to string: with open(str(git_hash_file), encoding=\"utf-8\") as file:
| with open(git_hash_file, encoding="utf-8") as file: | |
| with open(str(git_hash_file), encoding="utf-8") as file: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The function name
get_directory_name_from_full_pathsuggests it should return a directory name, butPath.namereturns the final component of the path (which could be a file). The original code usedos.path.basename(os.path.split(normalized_path)[1])which had different behavior. This change may break functionality if callers expect directory-specific behavior.