diff --git a/processing/segmenter/planktoscope/segmenter/__init__.py b/processing/segmenter/planktoscope/segmenter/__init__.py index fb38a8cb..b5085011 100644 --- a/processing/segmenter/planktoscope/segmenter/__init__.py +++ b/processing/segmenter/planktoscope/segmenter/__init__.py @@ -130,6 +130,11 @@ def __init__(self, event, data_path): if not os.path.exists(path): # create the path! os.makedirs(path) + + # Initialize job queue and status tracking + self.job_queue = queue.Queue() + self.job_status = {} # Dictionary to track status of each job + self.segmenter_client = self.initialize_mqtt_client() logger.success("planktoscope.segmenter is initialised and ready to go!") @@ -740,6 +745,65 @@ def _pipe(self, ecotaxa_export): # we're done free some mem self.__flat = None + def initialize_mqtt_client(self): + # Initialize the MQTT client (placeholder) + return MQTT_Client("segmenter/#", "segmenter_client") + + def add_job(self, job_id, job_data): + # Add a new job to the queue and set initial status + self.job_queue.put((job_id, job_data)) + self.job_status[job_id] = {"status": "queued"} + self.segmenter_client.client.publish("status/segmenter", json.dumps({ + "job_id": job_id, + "status": "queued" + })) + + def update_job_status(self, job_id, status): + # Update the status of a job + if job_id in self.job_status: + self.job_status[job_id]["status"] = status + self.segmenter_client.client.publish("status/segmenter", json.dumps({ + "job_id": job_id, + "status": status + })) + + def process_jobs(self): + # Process jobs from the queue + while True: + if not self.job_queue.empty(): + job_id, job_data = self.job_queue.get() + try: + self.update_job_status(job_id, "processing") + self.process_job(job_data) # Actual job processing logic + self.update_job_status(job_id, "completed") + except Exception as e: + self.update_job_status(job_id, "failed") + self.job_status[job_id]["error"] = str(e) + time.sleep(0.5) + + def process_job(self, job_data): + # Placeholder for job processing logic + time.sleep(2) # Simulate processing time + + def run(self): + # Start the job processing thread + threading.Thread(target=self.process_jobs, daemon=True).start() + + def handle_mqtt_message(self, topic, payload): + # Handle incoming MQTT messages for job status queries + try: + request = json.loads(payload) + if "job_id" in request: + job_id = request["job_id"] + if job_id in self.job_status: + response = self.job_status[job_id] + else: + response = {"status": "not_found"} + self.segmenter_client.client.publish(f"status/segmenter/{job_id}", json.dumps(response)) + except Exception as e: + print(f"Failed to handle message: {e}") + + def segment_all(self, paths: list, force=False, ecotaxa_export=True): """Starts the segmentation in all the folders given recursively @@ -998,6 +1062,19 @@ def treat_message(self): f"We did not understand the received request {last_message}" ) +class MQTT_Client: + def __init__(self, topic, name): + # Placeholder for MQTT client initialization + self.client = self.initialize_mqtt_client() + + def initialize_mqtt_client(self): + # Placeholder for setting up the MQTT client + return self + + def publish(self, topic, message): + # Placeholder for MQTT publish functionality + print(f"Published to {topic}: {message}") + ################################################################################ # While loop for capturing commands from Node-RED ################################################################################ @@ -1053,5 +1130,14 @@ def run(self): # This is called if this script is launched directly if __name__ == "__main__": + segmenter = Segmenter() + segmenter.run() + + # Simulate adding jobs + segmenter.add_job("job1", {"data": "example_data1"}) + segmenter.add_job("job2", {"data": "example_data2"}) + + # Simulate MQTT message for job status query + segmenter.handle_mqtt_message("status/segmenter/request", json.dumps({"job_id": "job1"})) # TODO This should be a test suite for this library pass