54 changes: 45 additions & 9 deletions src/workflow/CommandExecutor.py
@@ -9,6 +9,7 @@
import sys
import importlib.util
import json
from random import randint
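# randint is used in stop() to tag its log messages with a per-call identifier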

class CommandExecutor:
"""
@@ -22,8 +23,12 @@ class CommandExecutor:
# Methods for running commands and logging
def __init__(self, workflow_dir: Path, logger: Logger, parameter_manager: ParameterManager):
self.pid_dir = Path(workflow_dir, "pids")
self.execution_pids = []
self.logger = logger
self.parameter_manager = parameter_manager
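# Shared flag set to True by run_command when a command fails, so parallel commands and run_multiple_commands can stop early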
self.states = {
"is_executing_error_occurred": False,
}

def run_multiple_commands(
self, commands: list[str]
@@ -42,6 +47,9 @@ def run_multiple_commands(
# Log the start of command execution
self.logger.log(f"Running {len(commands)} commands in parallel...", 1)
start_time = time.time()

# Make sure is_executing_error_occurred is reset to False before starting a new batch of commands
self.states["is_executing_error_occurred"] = False

# Initialize a list to keep track of threads
threads = []
@@ -55,6 +63,11 @@
# Wait for all threads to complete
for thread in threads:
thread.join()

# Check whether any error occurred during execution and raise so that no further commands are executed
if self.states["is_executing_error_occurred"]:
self.logger.log("❌ Stopping execution because an error occurred.")
raise Exception("Execution error occurred in one of the commands.")

# Calculate and log the total execution time
end_time = time.time()
@@ -73,38 +86,50 @@ def run_command(self, command: list[str]) -> None:
# Ensure all command parts are strings
command = [str(c) for c in command]

# Log the execution start
self.logger.log(f"Running command:\n"+' '.join(command)+"\nWaiting for command to finish...", 1)
start_time = time.time()

# Execute the command
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
child_pid = process.pid

# Log the execution start
self.logger.log(f"Running command:{child_pid} "+' '.join(command)+"\nWaiting for command to finish...", 1)
start_time = time.time()

# Record the PID to keep track of running processes associated with this workspace/workflow
# User can close the Streamlit app and return to a running workflow later
pid_file_path = self.pid_dir / str(child_pid)
pid_file_path.touch()
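# Also track the PID in memory so stop() only targets processes started by this executor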
self.execution_pids.append(str(child_pid))

# Wait for command completion and capture output
stdout, stderr = process.communicate()

# Cleanup PID file
pid_file_path.unlink()
if str(child_pid) in self.execution_pids:
self.execution_pids.remove(str(child_pid))

# Check whether an error occurred in another thread (this process may already have been killed)
if self.states["is_executing_error_occurred"]:
self.logger.log(f"❌ Process {child_pid}: stopping execution because an error occurred in another thread")
return

end_time = time.time()
execution_time = end_time - start_time
# Format the logging prefix
self.logger.log(f"Process finished:\n"+' '.join(command)+f"\nTotal time to run command: {execution_time:.2f} seconds", 1)
# self.logger.log(f"Process finished:{child_pid} "+' '.join(command)+f"\nTotal time to run command: {execution_time:.2f} seconds", 1)

# Log stdout if present
if stdout:
self.logger.log(stdout.decode(), 2)

# Log stderr and raise an exception if errors occurred
if stderr or process.returncode != 0:
self.states["is_executing_error_occurred"] = True
error_message = stderr.decode().strip()
self.logger.log(f"ERRORS OCCURRED:\n{error_message}", 2)
self.logger.log(f"❌ ERRORS OCCURRED:{child_pid} | {error_message}", 2)
self.stop()
else:
self.logger.log(f"✅ Command finished successfully: {child_pid} ")

def run_topp(self, tool: str, input_output: dict, custom_params: dict = {}) -> None:
"""
@@ -190,6 +215,12 @@ def run_topp(self, tool: str, input_output: dict, custom_params: dict = {}) -> None:
if len(commands) == 1:
self.run_command(commands[0])
elif len(commands) > 1:
# commands = [
# ["ping", "127.0.0.1", "-n", "10"],
# ["ping", "127.0.0.1", "-n", "10"],
# ["ping", "127.0.0.1", "-n", "15"],
# ["python", "-c", "import time; time.sleep(3); raise Exception('Simulated Error')"]
# ]
self.run_multiple_commands(commands)
else:
raise Exception("No commands to execute.")
@@ -198,14 +229,19 @@ def stop(self) -> None:
"""
Terminates all processes initiated by this executor by killing them based on stored PIDs.
"""
self.logger.log("Stopping all running processes...")
pids = [Path(f).stem for f in self.pid_dir.iterdir()]
stop_id = randint(0, 1000000)
# self.logger.log(f"Stopping-{stop_id} all running processes...")
# pids = [Path(f).stem for f in self.pid_dir.iterdir()]
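# Work on a copy of the in-memory PID list, since run_command threads may modify it during iteration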
pids = self.execution_pids[:]
self.logger.log(f"🛑 Stopping-{stop_id} all running processes: {pids}")

for pid in pids:
self.logger.log(f"Stopping-{stop_id} process {pid}")
try:
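# Force-kill the process with signal 9 (SIGKILL on POSIX; unconditional termination on Windows)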
os.kill(int(pid), 9)
self.logger.log(f"Stopping-{stop_id} successfully killed process {pid}")
except OSError as e:
self.logger.log(f"Failed to kill process {pid}: {e}")
self.logger.log(f"Failed-{stop_id} to kill process {pid}: {e}")

shutil.rmtree(self.pid_dir, ignore_errors=True)
self.logger.log("Workflow stopped.")