diff --git a/src/workflow/CommandExecutor.py b/src/workflow/CommandExecutor.py
index 6cc493014..7df0fdcbf 100644
--- a/src/workflow/CommandExecutor.py
+++ b/src/workflow/CommandExecutor.py
@@ -9,6 +9,7 @@
 import sys
 import importlib.util
 import json
+from random import randint
 
 class CommandExecutor:
     """
@@ -22,8 +23,12 @@ class CommandExecutor:
     # Methods for running commands and logging
     def __init__(self, workflow_dir: Path, logger: Logger, parameter_manager: ParameterManager):
         self.pid_dir = Path(workflow_dir, "pids")
+        self.execution_pids = []
         self.logger = logger
         self.parameter_manager = parameter_manager
+        self.states = {
+            "is_executing_error_occurred": False,
+        }
 
     def run_multiple_commands(
         self, commands: list[str]
@@ -42,6 +47,9 @@ def run_multiple_commands(
         # Log the start of command execution
         self.logger.log(f"Running {len(commands)} commands in parallel...", 1)
         start_time = time.time()
+
+        # Make sure is_executing_error_occurred is reset to False
+        self.states["is_executing_error_occurred"] = False
 
         # Initialize a list to keep track of threads
         threads = []
@@ -55,6 +63,11 @@ def run_multiple_commands(
         # Wait for all threads to complete
         for thread in threads:
             thread.join()
+
+        # Check if any error occurred during execution and raise so that no further commands are executed
+        if self.states["is_executing_error_occurred"]:
+            self.logger.log("❌ Stopping execution because an error occurred.")
+            raise Exception("Execution error occurred in one of the commands.")
 
         # Calculate and log the total execution time
         end_time = time.time()
@@ -73,29 +86,37 @@ def run_command(self, command: list[str]) -> None:
         # Ensure all command parts are strings
         command = [str(c) for c in command]
 
-        # Log the execution start
-        self.logger.log(f"Running command:\n"+' '.join(command)+"\nWaiting for command to finish...", 1)
-        start_time = time.time()
-
         # Execute the command
         process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
         child_pid = process.pid
 
+        # Log the execution start
+        self.logger.log(f"Running command:{child_pid} "+' '.join(command)+"\nWaiting for command to finish...", 1)
+        start_time = time.time()
+
         # Record the PID to keep track of running processes associated with this workspace/workflow
         # User can close the Streamlit app and return to a running workflow later
         pid_file_path = self.pid_dir / str(child_pid)
         pid_file_path.touch()
+        self.execution_pids.append(str(child_pid))
 
         # Wait for command completion and capture output
         stdout, stderr = process.communicate()
 
         # Cleanup PID file
         pid_file_path.unlink()
+        if str(child_pid) in self.execution_pids:
+            self.execution_pids.remove(str(child_pid))
+
+        # Stop early if an error occurred in another thread
+        if self.states["is_executing_error_occurred"]:
+            self.logger.log(f"❌ Process {child_pid}: stopped because an error occurred in another thread")
+            return
+
         end_time = time.time()
         execution_time = end_time - start_time
 
         # Format the logging prefix
-        self.logger.log(f"Process finished:\n"+' '.join(command)+f"\nTotal time to run command: {execution_time:.2f} seconds", 1)
+        # self.logger.log(f"Process finished:{child_pid} "+' '.join(command)+f"\nTotal time to run command: {execution_time:.2f} seconds", 1)
 
         # Log stdout if present
         if stdout:
@@ -103,8 +124,12 @@ def run_command(self, command: list[str]) -> None:
 
         # Log stderr and raise an exception if errors occurred
         if stderr or process.returncode != 0:
+            self.states["is_executing_error_occurred"] = True
             error_message = stderr.decode().strip()
-            self.logger.log(f"ERRORS OCCURRED:\n{error_message}", 2)
+            self.logger.log(f"❌ ERRORS OCCURRED:{child_pid} | {error_message}", 2)
+            self.stop()
+        else:
+            self.logger.log(f"✅ Command finished successfully: {child_pid}")
 
     def run_topp(self, tool: str, input_output: dict, custom_params: dict = {}) -> None:
         """
@@ -190,6 +215,12 @@ def run_topp(self, tool: str, input_output: dict, custom_params: dict = {}) -> N
         if len(commands) == 1:
             self.run_command(commands[0])
         elif len(commands) > 1:
+            # commands = [
+            #     ["ping", "127.0.0.1", "-n", "10"],
+            #     ["ping", "127.0.0.1", "-n", "10"],
+            #     ["ping", "127.0.0.1", "-n", "15"],
+            #     ["python", "-c", "import time; time.sleep(3); raise Exception('Simulated Error')"]
+            # ]
             self.run_multiple_commands(commands)
         else:
             raise Exception("No commands to execute.")
@@ -198,14 +229,19 @@ def stop(self) -> None:
         """
         Terminates all processes initiated by this executor by killing them based on stored PIDs.
         """
-        self.logger.log("Stopping all running processes...")
-        pids = [Path(f).stem for f in self.pid_dir.iterdir()]
+        stop_id = randint(0, 1000000)
+        # self.logger.log(f"Stopping-{stop_id} all running processes...")
+        # pids = [Path(f).stem for f in self.pid_dir.iterdir()]
+        pids = self.execution_pids[:]
+        self.logger.log(f"🛑 Stopping-{stop_id} all running processes: {pids}")
         for pid in pids:
+            self.logger.log(f"Stopping-{stop_id} process {pid}")
             try:
                 os.kill(int(pid), 9)
+                self.logger.log(f"Stopping-{stop_id} successfully killed process {pid}")
             except OSError as e:
-                self.logger.log(f"Failed to kill process {pid}: {e}")
+                self.logger.log(f"Failed-{stop_id} to kill process {pid}: {e}")
 
         shutil.rmtree(self.pid_dir, ignore_errors=True)
         self.logger.log("Workflow stopped.")