From ddec27652da8b1d6930c27ee661f76fda3e9afd4 Mon Sep 17 00:00:00 2001 From: zsquare12 <36557466+zsquare12@users.noreply.github.com> Date: Tue, 1 Apr 2025 18:23:07 +0530 Subject: [PATCH 1/2] TOPP Execution: No more execution needed if Error occur in any. when Exrro occur in any thread, of the TOPP execution, then don't start the next execution. --- src/workflow/CommandExecutor.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/src/workflow/CommandExecutor.py b/src/workflow/CommandExecutor.py index 6cc493014..75ee6a086 100644 --- a/src/workflow/CommandExecutor.py +++ b/src/workflow/CommandExecutor.py @@ -24,6 +24,9 @@ def __init__(self, workflow_dir: Path, logger: Logger, parameter_manager: Parame self.pid_dir = Path(workflow_dir, "pids") self.logger = logger self.parameter_manager = parameter_manager + self.states = { + "is_executing_error_occurred": False, + } def run_multiple_commands( self, commands: list[str] @@ -42,6 +45,9 @@ def run_multiple_commands( # Log the start of command execution self.logger.log(f"Running {len(commands)} commands in parallel...", 1) start_time = time.time() + + # make sure is_executing_error_occurred is reset to False + self.states["is_executing_error_occurred"] = False # Initialize a list to keep track of threads threads = [] @@ -55,6 +61,11 @@ def run_multiple_commands( # Wait for all threads to complete for thread in threads: thread.join() + + # Check if any error occurred during execution and raise error so that no more commands are executed + if self.states["is_executing_error_occurred"]: + self.logger.log("❌ Stopping the execution as error occurred.") + raise Exception("Execution error occurred in one of the commands.") # Calculate and log the total execution time end_time = time.time() @@ -89,6 +100,11 @@ def run_command(self, command: list[str]) -> None: # Wait for command completion and capture output stdout, stderr = process.communicate() + # Check if the process is killed + if self.states["is_executing_error_occurred"]: + self.logger.log(f"Process {child_pid}: stopped the executing as in other thread error occurred") + return + # Cleanup PID file pid_file_path.unlink() @@ -103,6 +119,7 @@ def run_command(self, command: list[str]) -> None: # Log stderr and raise an exception if errors occurred if stderr or process.returncode != 0: + self.states["is_executing_error_occurred"] = True error_message = stderr.decode().strip() self.logger.log(f"ERRORS OCCURRED:\n{error_message}", 2) From 5a7691feb4e6e08f2b3c751e191ea1466520dbe7 Mon Sep 17 00:00:00 2001 From: zsquare12 <36557466+zsquare12@users.noreply.github.com> Date: Tue, 1 Apr 2025 18:35:29 +0530 Subject: [PATCH 2/2] TOPP Execution: Terminate all the threads if error occurs in any. it will terminated all the threads by terminating the it process (as direct thread termination is not possible in python). --- src/workflow/CommandExecutor.py | 45 +++++++++++++++++++++++---------- 1 file changed, 32 insertions(+), 13 deletions(-) diff --git a/src/workflow/CommandExecutor.py b/src/workflow/CommandExecutor.py index 75ee6a086..7df0fdcbf 100644 --- a/src/workflow/CommandExecutor.py +++ b/src/workflow/CommandExecutor.py @@ -9,6 +9,7 @@ import sys import importlib.util import json +from random import randint class CommandExecutor: """ @@ -22,6 +23,7 @@ class CommandExecutor: # Methods for running commands and logging def __init__(self, workflow_dir: Path, logger: Logger, parameter_manager: ParameterManager): self.pid_dir = Path(workflow_dir, "pids") + self.execution_pids = [] self.logger = logger self.parameter_manager = parameter_manager self.states = { @@ -84,34 +86,37 @@ def run_command(self, command: list[str]) -> None: # Ensure all command parts are strings command = [str(c) for c in command] - # Log the execution start - self.logger.log(f"Running command:\n"+' '.join(command)+"\nWaiting for command to finish...", 1) - start_time = time.time() - # Execute the command process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) child_pid = process.pid + # Log the execution start + self.logger.log(f"Running command:{child_pid} "+' '.join(command)+"\nWaiting for command to finish...", 1) + start_time = time.time() + # Record the PID to keep track of running processes associated with this workspace/workflow # User can close the Streamlit app and return to a running workflow later pid_file_path = self.pid_dir / str(child_pid) pid_file_path.touch() + self.execution_pids.append(str(child_pid)) # Wait for command completion and capture output stdout, stderr = process.communicate() + # Cleanup PID file + pid_file_path.unlink() + if str(child_pid) in self.execution_pids: + self.execution_pids.remove(str(child_pid)) + # Check if the process is killed if self.states["is_executing_error_occurred"]: - self.logger.log(f"Process {child_pid}: stopped the executing as in other thread error occurred") + self.logger.log(f"❌ Process {child_pid}: stopped the executing as in other thread error occurred") return - # Cleanup PID file - pid_file_path.unlink() - end_time = time.time() execution_time = end_time - start_time # Format the logging prefix - self.logger.log(f"Process finished:\n"+' '.join(command)+f"\nTotal time to run command: {execution_time:.2f} seconds", 1) + # self.logger.log(f"Process finished:{child_pid} "+' '.join(command)+f"\nTotal time to run command: {execution_time:.2f} seconds", 1) # Log stdout if present if stdout: @@ -121,7 +126,10 @@ def run_command(self, command: list[str]) -> None: if stderr or process.returncode != 0: self.states["is_executing_error_occurred"] = True error_message = stderr.decode().strip() - self.logger.log(f"ERRORS OCCURRED:\n{error_message}", 2) + self.logger.log(f"❌ ERRORS OCCURRED:{child_pid} | {error_message}", 2) + self.stop() + else: + self.logger.log(f"✅ Command finished successfully: {child_pid} ") def run_topp(self, tool: str, input_output: dict, custom_params: dict = {}) -> None: """ @@ -207,6 +215,12 @@ def run_topp(self, tool: str, input_output: dict, custom_params: dict = {}) -> N if len(commands) == 1: self.run_command(commands[0]) elif len(commands) > 1: + # commands = [ + # ["ping", "127.0.0.1", "-n", "10"], + # ["ping", "127.0.0.1", "-n", "10"], + # ["ping", "127.0.0.1", "-n", "15"], + # ["python", "-c", "import time; time.sleep(3); raise Exception('Simulated Error')"] + # ] self.run_multiple_commands(commands) else: raise Exception("No commands to execute.") @@ -215,14 +229,19 @@ def stop(self) -> None: """ Terminates all processes initiated by this executor by killing them based on stored PIDs. """ - self.logger.log("Stopping all running processes...") - pids = [Path(f).stem for f in self.pid_dir.iterdir()] + stop_id = randint(0, 1000000) + # self.logger.log(f"Stopping-{stop_id} all running processes...") + # pids = [Path(f).stem for f in self.pid_dir.iterdir()] + pids = self.execution_pids[:] + self.logger.log(f"🛑 Stopping-{stop_id} all running processes: {pids}") for pid in pids: + self.logger.log(f"Stopping-{stop_id} process {pid}") try: os.kill(int(pid), 9) + self.logger.log(f"Stopping-{stop_id} successfully killed process {pid}") except OSError as e: - self.logger.log(f"Failed to kill process {pid}: {e}") + self.logger.log(f"Failed-{stop_id} to kill process {pid}: {e}") shutil.rmtree(self.pid_dir, ignore_errors=True) self.logger.log("Workflow stopped.")