diff --git a/custom_plot_item.py b/custom_plot_item.py index dabd1b3..c65f7be 100644 --- a/custom_plot_item.py +++ b/custom_plot_item.py @@ -14,12 +14,19 @@ import numpy as np import graph_utils +from plot_spec import PlotSpec +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from var_list_widget import VarListWidget + from maths_widget import MathsWidget + # PlotSpec is already imported above, no need to re-import under TYPE_CHECKING unless it was causing issues class CustomPlotItem(QLabel): PEN_WIDTH = 2 - def __init__(self, parent, plot_data_item, source, current_tick): + def __init__(self, parent, plot_data_item, source, current_tick, plot_spec_from_source: 'PlotSpec | None' = None): # Added plot_spec_from_source QLabel.__init__(self, plot_data_item.name(), parent=parent) ''' This item should handle the following things: @@ -64,6 +71,31 @@ def __init__(self, parent, plot_data_item, source, current_tick): self.setContextMenuPolicy(Qt.CustomContextMenu) self.customContextMenuRequested.connect(self.show_menu) + # Initialize self.plot_spec based on plot_spec_from_source + if plot_spec_from_source: + self.plot_spec = plot_spec_from_source + else: + # Create a default "file" PlotSpec if none is provided + name = plot_data_item.name() # Use the name from the plot_data_item + file_id = "unknown_source_widget" # Default file identifier + + # Attempt to get a more specific file_id from the source + # source is typically VarListWidget or MathsWidget (or their models) + if hasattr(source, 'filename') and source.filename: # Check for direct filename (e.g. VarListWidget) + file_id = source.filename + elif hasattr(source, 'idx') and source.idx is not None: # Check for direct idx (e.g. VarListWidget) + file_id = str(source.idx) + # If source is MathsWidget, it's more complex as it doesn't have a single 'filename' or 'idx' + # However, if plot_spec_from_source is None, it implies the data isn't from a known math operation. + # This logic assumes that if data comes from MathsWidget, plot_spec_from_source should be set. + + self.plot_spec = PlotSpec( + name=name, + source_type="file", # Default to "file" if not otherwise specified + original_name=name, # Assume original_name is same as current name if not specified + file_source_identifier=file_id + ) + def update_color(self, color_str): pen = pg.mkPen(color=color_str, width=CustomPlotItem.PEN_WIDTH) self.trace.setPen(pen) @@ -121,7 +153,8 @@ def name(self): def get_plot_spec(self): # For now, we'll just get the name of the trace, but this will become more complex in the # future when we start supporting derived signals. - return self.trace.name() + # This method should now return the full PlotSpec object. + return self.plot_spec @pyqtSlot(float) def on_time_changed(self, time): diff --git a/data_file_widget.py b/data_file_widget.py index d717754..a36f499 100644 --- a/data_file_widget.py +++ b/data_file_widget.py @@ -43,6 +43,9 @@ def __init__(self, parent): self.tabs.currentChanged.connect(lambda x: self.tabChanged.emit()) layout.addWidget(self.tabs) + # Make var_lists easily accessible for get_data_item_by_file_id_and_name + self.var_lists = [] # List to store VarListWidget instances + self.filter_box = FilterBoxWidget(self.tabs) layout.addWidget(self.filter_box) @@ -68,8 +71,12 @@ def open_file(self, filepath): tab_name = os.path.basename(filepath) # Create a new tab and add the varListWidget to it. self.latest_data_file_name = filepath - self.tabs.addTab(var_list, tab_name) - self.sources[filepath] = self.tabs.widget(self.tabs.count() - 1) + tab_idx = self.tabs.addTab(var_list, tab_name) # addTab returns the index + # self.sources[filepath] = self.tabs.widget(self.tabs.count() - 1) # Old way + self.sources[filepath] = var_list # Store the instance directly + var_list.idx = tab_idx # Assign tab index as an ad-hoc idx if needed for PlotSpec compatibility + self.var_lists.append(var_list) # Add to our list + self.tabs.setCurrentWidget(var_list) self._update_range_slider() @@ -81,11 +88,22 @@ def open_file(self, filepath): def close_file(self, index): # Add function for closing the tab here. - filename = self.tabs.widget(index).filename - self.tabs.widget(index).close() - self.tabs.removeTab(index) + widget_to_close = self.tabs.widget(index) + filename = widget_to_close.filename + + if widget_to_close in self.var_lists: + self.var_lists.remove(widget_to_close) + if filename in self.sources: + del self.sources[filename] + + widget_to_close.close() # Close the widget first + self.tabs.removeTab(index) # Then remove tab + if self.tabs.count() > 0: self._update_range_slider() + else: # Reset slider if no files are open + self.controller.plot_manager.update_slider_limits(0, 1.e9) + self.countChanged.emit() self.fileClosed[str].emit(filename) @@ -99,8 +117,44 @@ def get_latest_data_file_name(self): def get_data_file(self, idx): return self.tabs.widget(idx) - def get_data_file_by_name(self, name): - return self.sources[name] + def get_data_file_by_name(self, name): # name is filepath + return self.sources.get(name) # Use .get for safer access + + def get_data_item_by_file_id_and_name(self, file_identifier: str, signal_name: str) -> 'DataItem | None': + """ + Retrieves a DataItem from one of the loaded files. + file_identifier can be the file's full path or its stringified tab index at time of PlotSpec creation. + signal_name is the original_name of the signal. + """ + # Try matching by full filepath first (most robust) + var_list_widget = self.sources.get(file_identifier) + if var_list_widget: + model = var_list_widget.model() + if model: + data_item = model.get_data_by_name(signal_name) + if data_item: + return data_item + + # Fallback: iterate through all var_lists if not found by path (e.g., if file_identifier was an index string) + # This is less robust if tab order changed or files were closed/reopened. + for vlw in self.var_lists: + # Check against filename (if file_identifier was a filename but not full path) + if os.path.basename(vlw.filename) == file_identifier: + model = vlw.model() + if model: + data_item = model.get_data_by_name(signal_name) + if data_item: return data_item + + # Check against ad-hoc idx (tab index at time of creation) + # This relies on VarListWidget having an 'idx' attribute that was set to its tab index. + if hasattr(vlw, 'idx') and str(vlw.idx) == file_identifier: + model = vlw.model() + if model: + data_item = model.get_data_by_name(signal_name) + if data_item: return data_item + + print(f"Warning: Could not find DataItem for signal '{signal_name}' from file ID '{file_identifier}'") + return None def get_sources(self): return self.sources @@ -108,8 +162,14 @@ def get_sources(self): def get_time(self, idx=0): if self.tabs.count() == 0: return None + # Ensure idx is valid before trying to access widget + if idx < 0 or idx >= self.tabs.count(): + if self.tabs.count() > 0: # Default to the current tab if idx is bad but tabs exist + return self.tabs.currentWidget().time + return None # No tabs, no time return self.get_data_file(idx).time + @pyqtSlot(QPoint) def on_context_menu_request(self, pos): # We only want to bring up the context menu when an actual tab is right-clicked. Check that diff --git a/data_model.py b/data_model.py index 2619cd3..2f0ce2f 100644 --- a/data_model.py +++ b/data_model.py @@ -4,15 +4,18 @@ import numpy as np +from plot_spec import PlotSpec # Added import + class DataItem(object): """ Data structure for storing data items in the list widget """ - def __init__(self, var_name, data): + def __init__(self, var_name, data, plot_spec: PlotSpec | None = None): # Added plot_spec self._var_name = var_name self._data = data + self._plot_spec = plot_spec # Added plot_spec self._time = None @property @@ -27,6 +30,14 @@ def data(self): def time(self): return self._time + @property + def plot_spec(self): # Added plot_spec property + return self._plot_spec + + @plot_spec.setter + def plot_spec(self, value: PlotSpec | None): # Added plot_spec setter + self._plot_spec = value + def __repr__(self): return self._var_name @@ -99,10 +110,29 @@ def data(self, index, role): def has_key(self, name): return name in self._raw_data.index - def get_data_by_name(self, name): - data = None - try: - data = self._raw_data[name] - except KeyError: - print(f"Unknown key: {name}") - return data + def get_data_by_name(self, name) -> DataItem | None: + # Iterates self._data (which is list[DataItem]) and finds the DataItem with the matching var_name. + for item in self._data: + if item.var_name == name: + # Ensure the PlotSpec is created if it's missing for a file-loaded item + if item.plot_spec is None: + # Attempt to get a file_source_identifier + # This DataModel instance itself doesn't store the filename directly in a way + # that's easily accessible per item here. We'll assume a generic one or improve later if needed. + # For now, we know it's from this model, which is usually file-based. + file_id = "unknown_data_model_source" + # A better approach would be if data_loader.source (filename) was stored in DataModel + # and accessible here, or if DataItem was initialized with it. + # Let's assume self.filename could exist if DataModel was enhanced. + # if hasattr(self, 'filename') and self.filename: + # file_id = self.filename + + item.plot_spec = PlotSpec( + name=item.var_name, + source_type="file", + original_name=item.var_name, + file_source_identifier=file_id # Placeholder, actual file ID needs better handling + ) + return item + print(f"Unknown key: {name} in DataModel") + return None diff --git a/maths/diff_int.py b/maths/diff_int.py index 6651096..2de9404 100644 --- a/maths/diff_int.py +++ b/maths/diff_int.py @@ -19,6 +19,13 @@ def do_math(self, data, dt): def default_var_name(self, vname): return f"Diff({vname})" + def get_operation_details(self): + # Differentiation is straightforward, parameters could be added if different methods are implemented + return {'method': 'finite_difference', 'order': 1} + + def get_source_type(self): + return "math_diff" + class IntegrateSpec(MathSpecBase): def __init__(self, parent): @@ -35,3 +42,10 @@ def do_math(self, data, dt): def default_var_name(self, vname): return f"Int({vname})" + + def get_operation_details(self): + # Integration is also straightforward for this implementation + return {'method': 'cumulative_sum'} + + def get_source_type(self): + return "math_integrate" diff --git a/maths/filter.py b/maths/filter.py index 9ad08b4..6609f0c 100644 --- a/maths/filter.py +++ b/maths/filter.py @@ -83,3 +83,16 @@ def do_math(self, data, dt): def default_var_name(self, vname): return f"Filter({vname},{self._params.order},{self._params.type},{self._params.cutoff})" + + def get_operation_details(self): + if hasattr(self, '_params') and self._params: + return { + 'order': self._params.order, + 'type': self._params.type, + 'cutoff': self._params.cutoff, + 'filtfilt': self._params.filtfilt + } + return None # Or raise an error if params are expected to always be there + + def get_source_type(self): + return "math_filter" diff --git a/maths/maths_base.py b/maths/maths_base.py index 4f59971..b531105 100644 --- a/maths/maths_base.py +++ b/maths/maths_base.py @@ -7,6 +7,7 @@ import abc from data_model import DataItem +from plot_spec import PlotSpec # Added import from var_list_widget import VarListWidget @@ -77,9 +78,38 @@ def eventFilter(self, obj, event): vname, accept = QInputDialog.getText(self.parent(), "Enter variable name", "Variable name:", text=self.default_var_name(selected.var_name)) if accept: - data_item = DataItem(vname, val) - data_item._time = selected._time - self.parent().add_new_var(data_item, vlist) + # PlotSpec creation for math operations + input_plot_spec = selected.plot_spec + if not input_plot_spec: + print(f"Warning: Input variable {selected.var_name} for {self.name} operation is missing PlotSpec. Creating a fallback.") + fallback_file_id = "unknown_input_source" + # vlist is the VarListWidget, vlist.model() is the DataModel + source_model = vlist.model() + if hasattr(source_model, 'filename') and source_model.filename: # Check if source_model has filename + fallback_file_id = source_model.filename + elif hasattr(source_model, 'idx') and source_model.idx is not None: # Check if source_model has idx + fallback_file_id = str(source_model.idx) + + input_plot_spec = PlotSpec( + name=selected.var_name, + source_type="file_fallback", + original_name=selected.var_name, + file_source_identifier=fallback_file_id + ) + + operation_details = self.get_operation_details() + source_type = self.get_source_type() + + output_plot_spec = PlotSpec( + name=vname, + source_type=source_type, + input_plot_specs=[input_plot_spec], + operation_details=operation_details + ) + + data_item = DataItem(vname, val, plot_spec=output_plot_spec) + data_item._time = selected._time # selected is a DataItem, its _time should be set + self.parent().add_new_var(data_item, vlist, output_plot_spec) # Pass output_plot_spec else: print("User cancelled operation!") else: diff --git a/maths/running_minmax.py b/maths/running_minmax.py index 7581102..1e2b32a 100644 --- a/maths/running_minmax.py +++ b/maths/running_minmax.py @@ -109,3 +109,15 @@ def do_math(self, data, dt): def default_var_name(self, vname): return f"RunningMinMax({vname},{self._params.type.name.lower()},{self._params.window_sz},{int(self._params.is_ticks)}) " + + def get_operation_details(self): + if hasattr(self, '_params') and self._params: + return { + 'type': self._params.type.name.lower(), + 'window_sz': self._params.window_sz, + 'is_ticks': self._params.is_ticks + } + return None + + def get_source_type(self): + return "math_running_minmax" diff --git a/maths/running_window.py b/maths/running_window.py index 150c1e4..eced38f 100644 --- a/maths/running_window.py +++ b/maths/running_window.py @@ -110,3 +110,15 @@ def do_math(self, data, dt): def default_var_name(self, vname): return f"RunningWindow({vname},{self._params.type.name.lower()},{self._params.window_sz},{int(self._params.is_ticks)})" + + def get_operation_details(self): + if hasattr(self, '_params') and self._params: + return { + 'type': self._params.type.name.lower(), # e.g. "mean", "median" + 'window_sz': self._params.window_sz, + 'is_ticks': self._params.is_ticks + } + return None + + def get_source_type(self): + return "math_running_window" diff --git a/maths_widget.py b/maths_widget.py index 6a6f902..e422234 100644 --- a/maths_widget.py +++ b/maths_widget.py @@ -16,11 +16,12 @@ from maths.filter import FilterSpec from maths.diff_int import DifferentiateSpec, IntegrateSpec -from maths.running_window import RunningWindowSpec -from maths.running_minmax import RunningMinMaxSpec +from maths.running_window import RunningWindowSpec, WindowTypes # Added WindowTypes import +from maths.running_minmax import RunningMinMaxSpec, MinMaxType # Added MinMaxType import from data_model import DataItem from docked_widget import DockedWidget +from plot_spec import PlotSpec # Added import try: from py_expression_eval import Parser @@ -59,6 +60,7 @@ class VarInfo: var_name: str data: np.ndarray source: int + plot_spec: PlotSpec | None = None # Added plot_spec class MathsWidget(QWidget): @@ -180,15 +182,27 @@ def dragEnterEvent(self, e): def dropEvent(self, e): data = e.mimeData() bstream = data.retrieveData("application/x-DataItem", QVariant.ByteArray) - selected = pickle.loads(bstream) + selected = pickle.loads(bstream) # selected is a DataItem var_name = selected.var_name + # Determine file_source_identifier + file_source_identifier = "unknown_file" + if hasattr(e.source(), 'filename') and e.source().filename: + file_source_identifier = e.source().filename + elif hasattr(e.source(), 'idx') and e.source().idx is not None: + file_source_identifier = str(e.source().idx) + + # Create PlotSpec for the incoming variable + ps = PlotSpec(name=var_name, + source_type="file", + original_name=var_name, + file_source_identifier=file_source_identifier) + vidx = f"x{self.var_in.count()}" list_name = f"{vidx}: {var_name}" new_item = QListWidgetItem(list_name) new_item.setData(Qt.ToolTipRole, vidx) - # self._vars[f"x{self.var_in.count()}"] = e.source().model().get_data_by_name(var_name) - var_info = VarInfo(var_name, selected.data, e.source()) + var_info = VarInfo(var_name, selected.data, e.source(), plot_spec=ps) # Store PlotSpec self._vars[vidx] = var_info # Add this to the list of input variables. self.var_in.addItem(new_item) @@ -241,18 +255,54 @@ def evaluate_math(self): data_item = DataItem(vname, val) # NOTE(rose@): If all of the variables are from the same file, then picking the time for # the first file is sufficient. - data_item._time = self._vars[e_vars[0]].source.time - - self.add_new_var(data_item, self._vars[e_vars[0]].source) - - def add_new_var(self, data_item, source): + data_item._time = self._vars[e_vars[0]].source.time # This time is of the source model (DataModel) + + # Create PlotSpec for the newly evaluated variable + input_plot_specs = [] + for v_key in e_vars: # e_vars are like 'x0', 'x1' + var_info = self._vars[v_key] # This is a VarInfo object + if var_info.plot_spec: + input_plot_specs.append(var_info.plot_spec) + else: + # Fallback for missing input PlotSpec + print(f"Warning: Input variable {var_info.var_name} (key: {v_key}) is missing PlotSpec. Creating a fallback.") + # Try to determine file_source_identifier for fallback + fallback_file_id = "unknown_source" + if hasattr(var_info.source, 'filename') and var_info.source.filename: + fallback_file_id = var_info.source.filename + elif hasattr(var_info.source, 'idx') and var_info.source.idx is not None: + fallback_file_id = str(var_info.source.idx) + + fallback_ps = PlotSpec( + name=var_info.var_name, + source_type="file_fallback", # Indicates it was likely a file var but spec was missing + original_name=var_info.var_name, + file_source_identifier=fallback_file_id + ) + input_plot_specs.append(fallback_ps) + + expression_str = self.math_entry.text() + output_plot_spec = PlotSpec( + name=vname, # This is the user-defined name for the new variable + source_type="math_expr", # Changed source_type + expression=expression_str, + input_plot_specs=input_plot_specs, + operation_details={'expression': expression_str} # Added operation_details + ) + # Assign the created PlotSpec to the DataItem that will be stored + data_item.plot_spec = output_plot_spec + + self.add_new_var(data_item, self._vars[e_vars[0]].source, output_plot_spec) + + def add_new_var(self, data_item: DataItem, source, plot_spec: PlotSpec): # plot_spec is now required list_name = f"y{self.var_out.count()}" new_item = QListWidgetItem(f"{list_name}: {data_item.var_name}") new_item.setData(Qt.UserRole, data_item) new_item.setData(Qt.ToolTipRole, list_name) self.var_out.addItem(new_item) # User can use output vars as inputs also. - self._vars[list_name] = VarInfo(data_item.var_name, data_item.data, source) + # Store the provided plot_spec in VarInfo + self._vars[list_name] = VarInfo(data_item.var_name, data_item.data, source, plot_spec=plot_spec) # TODO(rose@) - Remove this variable from self._vars also. remove_row = lambda: self.var_out.takeItem(self.var_out.row(new_item)) @@ -261,13 +311,27 @@ def add_new_var(self, data_item, source): def start_drag(self, e): index = self.var_out.currentRow() - selected = self.var_out.item(index).data(Qt.UserRole) + selected = self.var_out.item(index).data(Qt.UserRole) # This is a DataItem + vid = self.var_out.item(index).data(Qt.ToolTipRole) + + # Ensure the DataItem (selected) has its PlotSpec correctly set before pickling. + # The plot_spec should have been set on the DataItem either when it was created + # (if it's a result of evaluate_math) or when it was dropped (if it came from var_in). + # The VarInfo's plot_spec is the authoritative one for var_out items. + if vid in self._vars and self._vars[vid].plot_spec: + selected.plot_spec = self._vars[vid].plot_spec + elif selected.plot_spec is None: # Fallback if DataItem itself doesn't have one + # This case should ideally not be hit if logic is correct elsewhere. + # It implies a var_out item whose DataItem didn't get a PlotSpec. + print(f"Warning: PlotSpec missing for {selected.var_name} in start_drag. Creating a basic one.") + selected.plot_spec = PlotSpec(name=selected.var_name, source_type="unknown_derived_in_drag") + bstream = pickle.dumps(selected) mime_data = QMimeData() mime_data.setData("application/x-DataItem", bstream) - vid = self.var_out.item(index).data(Qt.ToolTipRole) + # vid = self.var_out.item(index).data(Qt.ToolTipRole) # Already got vid # NOTE(rose@) These feel like dirty little hacks, but they do work (for now). setattr(self, 'onClose', self._vars[vid].source.onClose) setattr(self, 'time', self._vars[vid].source.time) @@ -282,15 +346,165 @@ def model(self): # TODO(rose@): Fix this hack! For now, redirect the model to our mocked model. return self._silly_model - def get_data_by_name(self, name): + def get_data_by_name(self, name) -> DataItem | None: # TODO(rose@): This is a bit of a hack in order to make drag/drop plotting work. # If we decide to keep this, the model should be formalized. - data = None - # Make a lookup of the variable names... oi vey - v_lookup = {v.var_name: k for k, v in self._vars.items()} - try: - vid = v_lookup[name] - data = self._vars[vid].data - except KeyError: - print(f"Unknown key: {name}") - return data + # This method should return a DataItem, similar to DataModel.get_data_by_name + + # self.parent is MathsWidget, self.parent._vars stores VarInfo objects + for var_info in self.parent._vars.values(): + if var_info.var_name == name: + # Construct a DataItem on the fly + # VarInfo contains: var_name, data, source (VarListWidget), plot_spec + data_item = DataItem( + var_name=var_info.var_name, + data=var_info.data, + plot_spec=var_info.plot_spec + ) + # Try to set the time attribute for the DataItem + # The source in VarInfo is typically the VarListWidget from which the data originated + # or was derived in the context of MathsWidget. + if hasattr(var_info.source, 'time'): + data_item._time = var_info.source.time + elif hasattr(var_info.source, 'model') and hasattr(var_info.source.model(), 'time'): + # If source is a widget, its model might have time + data_item._time = var_info.source.model().time + else: + # Fallback or if time is not critical for this DataItem's usage via ThinModelMock + # print(f"Warning: Time data not found for {var_info.var_name} in ThinModelMock") + pass # _time will remain None + return data_item + + print(f"Unknown key: {name} in ThinModelMock") + return None + + def execute_operation_from_spec(self, output_spec: PlotSpec, input_data_items: list[DataItem]) -> DataItem | None: + """ + Executes a mathematical operation defined by output_spec using input_data_items. + This is used for reproducing signals when loading plots. + """ + if not input_data_items: + print(f"Error: No input data items provided for operation: {output_spec.name}") + return None + + result_array = None + op_details = output_spec.operation_details if output_spec.operation_details else {} + + # Common setup for DataItem + new_data_item_name = output_spec.name + # Time array should be consistent with inputs; use the first input's time. + # This assumes all inputs to an operation share a compatible time basis. + time_array = input_data_items[0].time + # avg_dt might be needed for some operations + avg_dt = np.mean(np.diff(time_array)).item() if time_array is not None and len(time_array) > 1 else 0.0 + + + if output_spec.source_type == "math_expr": + expression_str = output_spec.expression + if not expression_str: + print(f"Error: No expression string found in PlotSpec for: {output_spec.name}") + return None + + e_data = {} + # The expression uses 'x0', 'x1', ... which correspond to input_data_items + # Their PlotSpecs are output_spec.input_plot_specs + # The actual variable names used in the expression ('x0', 'x1') are implicitly mapped by order. + for i, item in enumerate(input_data_items): + e_data[f'x{i}'] = item.data + + try: + expr_parsed = self.parser.parse(expression_str) + result_array = expr_parsed.evaluate(e_data) + except Exception as e: + print(f"Error evaluating expression '{expression_str}' for '{output_spec.name}': {e}") + return None + + # Implementation for specific math operations + elif output_spec.source_type == "math_filter": + from scipy import signal as scipy_signal # Avoid conflict with PyQt signal + input_data = input_data_items[0].data + if not all(k in op_details for k in ['order', 'type', 'cutoff', 'filtfilt']): + print(f"Error: Missing parameters in operation_details for filter: {output_spec.name}") + return None + fs = 1 / avg_dt if avg_dt > 0 else 1.0 # Avoid division by zero + Wn = op_details['cutoff'] / (0.5 * fs) + b, a = scipy_signal.butter(op_details['order'], Wn, btype=op_details['type']) + if op_details['filtfilt']: + result_array = scipy_signal.filtfilt(b, a, input_data, method='gust') + else: + result_array = scipy_signal.lfilter(b, a, input_data) + + elif output_spec.source_type == "math_diff": + input_data = input_data_items[0].data + if time_array is not None and len(time_array) == len(input_data) and len(time_array) > 1: + result_array = np.concatenate(([0], np.diff(input_data) / np.diff(time_array))) + else: + print(f"Error: Invalid time_array for differentiation: {output_spec.name}") + return None + + elif output_spec.source_type == "math_integrate": + input_data = input_data_items[0].data + if time_array is not None and len(time_array) == len(input_data) and len(time_array) > 1: + result_array = np.cumsum(input_data * np.concatenate(([0], np.diff(time_array)))) + else: + print(f"Error: Invalid time_array for integration: {output_spec.name}") + return None + + elif output_spec.source_type == "math_running_minmax": + from scipy.ndimage.filters import maximum_filter1d, minimum_filter1d + input_data = input_data_items[0].data + if not all(k in op_details for k in ['type', 'window_sz', 'is_ticks']): + print(f"Error: Missing parameters for running_minmax: {output_spec.name}") + return None + + window_sz_ticks = op_details['window_sz'] + if not op_details['is_ticks']: # Convert time window to ticks + if avg_dt <= 0: + print(f"Error: avg_dt is <=0, cannot convert time window to ticks for {output_spec.name}") + return None + window_sz_ticks = round(op_details['window_sz'] / avg_dt) + window_sz_ticks = int(max(1, window_sz_ticks)) # Ensure positive integer + + func = minimum_filter1d if op_details['type'] == MinMaxType.MIN.name.lower() else maximum_filter1d # Requires MinMaxType enum or string comparison + offset = math.ceil(0.5 * window_sz_ticks) - 1 + result_array = func(input_data, size=window_sz_ticks, mode='nearest', origin=int(offset)) + + + elif output_spec.source_type == "math_running_window": + from scipy import signal as scipy_signal # Avoid conflict + input_data = input_data_items[0].data + if not all(k in op_details for k in ['type', 'window_sz', 'is_ticks']): + print(f"Error: Missing parameters for running_window: {output_spec.name}") + return None + + window_sz_ticks = op_details['window_sz'] + if not op_details['is_ticks']: + if avg_dt <= 0: + print(f"Error: avg_dt is <=0, cannot convert time window to ticks for {output_spec.name}") + return None + window_sz_ticks = round(op_details['window_sz'] / avg_dt) + window_sz_ticks = int(max(1, window_sz_ticks)) + + if op_details['type'] == WindowTypes.MEAN.name.lower(): # Requires WindowTypes enum or string comparison + result_array = np.convolve(input_data, np.ones(window_sz_ticks) / float(window_sz_ticks), mode='same') + else: # MEDIAN + if window_sz_ticks % 2 == 0: window_sz_ticks += 1 # Median filter window must be odd + result_array = scipy_signal.medfilt(input_data, kernel_size=window_sz_ticks) + + else: + print(f"Error: Unknown math source_type '{output_spec.source_type}' for '{output_spec.name}'") + return None + + if result_array is None: + print(f"Error: Math operation did not produce a result_array for '{output_spec.name}'") + return None + + # Create the DataItem + new_data_item = DataItem(name=new_data_item_name, data=result_array, plot_spec=output_spec) + new_data_item._time = time_array + + # Add to MathsWidget's internal tracking (_vars and var_out list) + # The 'source' for VarInfo when reproducing is self (MathsWidget), as it's the reproducer. + self.add_new_var(new_data_item, self, output_spec) + + return new_data_item diff --git a/plot_manager.py b/plot_manager.py index 5fefedc..596b10d 100644 --- a/plot_manager.py +++ b/plot_manager.py @@ -6,6 +6,13 @@ from QRangeSlider import QRangeSlider from sub_plot_widget import SubPlotWidget +from plot_spec import PlotSpec +from data_model import DataItem # Added import + +from typing import TYPE_CHECKING, Dict, Any # Added imports +if TYPE_CHECKING: + from maths_widget import MathsWidget + from data_file_widget import DataFileWidget import math import graph_utils @@ -299,6 +306,113 @@ def _get_index(self, subplot): def _get_plot(self, idx): return self.plot_area.itemAt(idx).widget() + def _reproduce_signal(self, plot_spec_dict: Dict[str, Any], + data_file_widget_ref: 'DataFileWidget', + maths_widget_ref: 'MathsWidget', + loaded_signals_cache: Dict[str, DataItem]) -> DataItem | None: + """ + Recursively reproduces a signal based on its PlotSpec dictionary. + Uses a cache to avoid reprocessing the same signal. + """ + if not plot_spec_dict: + print("Error: Received empty plot_spec_dict.") + return None + + try: + plot_spec = PlotSpec.from_dict(plot_spec_dict) + except Exception as e: + print(f"Error converting dict to PlotSpec: {e}. Dict: {plot_spec_dict}") + return None + + if plot_spec.unique_id in loaded_signals_cache: + return loaded_signals_cache[plot_spec.unique_id] + + data_item: DataItem | None = None + + if plot_spec.source_type == "file": + if plot_spec.file_source_identifier and plot_spec.original_name: + # Assume DataFileWidget has a method to get a DataItem by some identifier and original name + # This method would need to find the correct VarListWidget/DataModel and get the DataItem + # For now, this is a conceptual call. + # data_item = data_file_widget_ref.get_data_item_by_file_id_and_name( + # plot_spec.file_source_identifier, + # plot_spec.original_name + # ) + + # Placeholder for DataFileWidget interaction: + # We need a way to map file_source_identifier to a specific DataModel/VarListWidget + # and then retrieve the DataItem. + # Let's assume data_file_widget_ref can provide access to its models/var_lists + found_model = None + if hasattr(data_file_widget_ref, 'var_lists'): # Assuming var_lists is a list of VarListWidget + for vl in data_file_widget_ref.var_lists: + # This matching logic for file_source_identifier needs to be robust. + # It could be a filename, an index, or a unique ID assigned to the source. + current_model_identifier = vl.filename if hasattr(vl, 'filename') else str(vl.idx) + if current_model_identifier == plot_spec.file_source_identifier: + found_model = vl.model() # DataModel + break + + if found_model: + data_item = found_model.get_data_by_name(plot_spec.original_name) # This returns DataItem + if data_item and data_item.plot_spec is None: + # If the original DataItem didn't have a PlotSpec (e.g. from older save), + # assign the one we just loaded/recreated. + data_item.plot_spec = plot_spec + elif data_item and data_item.plot_spec and data_item.plot_spec.unique_id != plot_spec.unique_id: + # This might happen if the file was reloaded and IDs changed. + # The loaded plot_spec should be preferred for consistency of the loaded plot. + print(f"Info: Replacing PlotSpec on DataItem {data_item.var_name} with loaded PlotSpec.") + data_item.plot_spec = plot_spec + + if data_item is None: + print(f"Error: Could not find 'file' signal '{plot_spec.original_name}' from source '{plot_spec.file_source_identifier}'.") + + else: + print(f"Error: 'file' source_type missing file_source_identifier or original_name for PlotSpec: {plot_spec.name}") + + elif plot_spec.source_type.startswith("math_"): + input_data_items: list[DataItem] = [] + valid_inputs = True + for input_spec_dict in plot_spec_dict.get('input_plot_specs', []): # Use dict for recursion + input_data_item = self._reproduce_signal( + input_spec_dict, + data_file_widget_ref, + maths_widget_ref, + loaded_signals_cache + ) + if input_data_item: + input_data_items.append(input_data_item) + else: + print(f"Error: Could not reproduce input signal for {plot_spec.name}. Input spec dict: {input_spec_dict}") + valid_inputs = False + break # Stop if any input fails + + if valid_inputs: + # Pass the PlotSpec object, not the dict + data_item = maths_widget_ref.execute_operation_from_spec(plot_spec, input_data_items) + if data_item and data_item.plot_spec is None: + # Ensure the reproduced DataItem has its plot_spec set (execute_operation_from_spec should ideally do this) + print(f"Warning: PlotSpec was not set by execute_operation_from_spec for {data_item.var_name}. Setting it now.") + data_item.plot_spec = plot_spec + elif data_item and data_item.plot_spec and data_item.plot_spec.unique_id != plot_spec.unique_id: + # If execute_operation_from_spec set a different PlotSpec (e.g. a new one with a new ID), + # prefer the one that guided the reproduction to maintain consistency with the saved plot. + print(f"Info: Overwriting PlotSpec on DataItem {data_item.var_name} from math operation with the loaded PlotSpec for consistency.") + data_item.plot_spec = plot_spec + + else: + print(f"Error: Failed to reproduce one or more input signals for math operation: {plot_spec.name}") + + else: + print(f"Error: Unknown or unsupported source_type: {plot_spec.source_type} for PlotSpec: {plot_spec.name}") + + + if data_item: + loaded_signals_cache[plot_spec.unique_id] = data_item + + return data_item + def get_plot_info(self): n_plots = self.plot_area.count() plotlist = dict() @@ -323,19 +437,66 @@ def generate_plots(self, plot_info, data_source, clear_existing=True): self.remove_subplot(self._get_plot(self.plot_area.count() - 1)) # Walk the list of traces and produce the plots. - for i in range(requested_count): - plot = plot_info["plots"][i] + loaded_signals_cache: Dict[str, DataItem] = {} + # Assuming controller and its widgets are accessible. Adjust paths if necessary. + # These refs might be better passed in or accessed via a more stable interface. + maths_widget_ref = self.plot_manager()._controller.maths_widget if hasattr(self.plot_manager()._controller, 'maths_widget') else None + data_file_widget_ref = self.plot_manager()._controller.data_file_widget if hasattr(self.plot_manager()._controller, 'data_file_widget') else None + + if not maths_widget_ref or not data_file_widget_ref: + print("Error: MathsWidget or DataFileWidget reference not found. Cannot reproduce signals.") + return + for i in range(requested_count): + plot_dict = plot_info["plots"][i] # Renamed 'plot' to 'plot_dict' to avoid confusion subplot = self._get_plot(i) if clear_existing: subplot.clear_plot() - for trace in plot["traces"]: - subplot.plot_data_from_source(trace, data_source) + for trace_spec_dict in plot_dict["traces"]: # trace_spec_dict is a PlotSpec dictionary + # The 'data_source' argument in the original plot_data_from_source was a single + # VarListWidget. This is not suitable for reproduced signals which can come from + # various files or be math-generated. + # We now use _reproduce_signal to get the DataItem. + + data_item_to_plot = self._reproduce_signal( + trace_spec_dict, # This is the plot_spec_dict for the trace + data_file_widget_ref, + maths_widget_ref, + loaded_signals_cache + ) + + if data_item_to_plot: + # The SubPlotWidget.plot_data_from_source method will need to be adapted + # to accept a DataItem directly, or a new method like plot_reproduced_data_item + # needs to be created. For now, we assume plot_data_from_source can handle this. + # The 'source' argument here is tricky. For file items, it was the VarListWidget. + # For math items, it was MathsWidget. + # We pass a reference that CustomPlotItem might use (e.g. for onClose). + # This will be refined when SubPlotWidget is modified. + + # Tentative call, assuming plot_data_from_source is modified: + # The first argument to plot_data_from_source was 'name'. Now we pass DataItem. + # The second argument was 'source'. + # Let's assume a modification to plot_data_from_source like: + # plot_data_from_source(self, name_or_data_item, source_for_connections_if_any=None, is_reproduced_item=False) + + # Determine a 'source' for connection purposes (e.g. onClose). + # If it's a file item, its original VarListWidget source is complex to get here. + # If it's a math item, MathsWidget is the source. + # For now, pass maths_widget_ref as a general source for potential connections. + # This part (source_for_connections) needs careful handling in SubPlotWidget. + subplot.plot_data_from_source( + name_or_data_item=data_item_to_plot, + source_or_none=maths_widget_ref, # Placeholder for source context + is_reproduced_item=True # New flag + ) + else: + print(f"Warning: Could not reproduce signal for spec: {trace_spec_dict.get('name', 'Unknown name')}") # Handle the case where the "yrange" key is missing. - if clear_existing and "yrange" in plot.keys(): + if clear_existing and "yrange" in plot_dict.keys(): # Use plot_dict # Don't mess up the y-range if plots are being appended. subplot.set_y_range(*plot["yrange"]) diff --git a/plot_spec.py b/plot_spec.py new file mode 100644 index 0000000..74d7f5e --- /dev/null +++ b/plot_spec.py @@ -0,0 +1,46 @@ +import uuid +from typing import List, Dict, Optional, Any + +class PlotSpec: + def __init__(self, + name: str, + source_type: str, + unique_id: Optional[str] = None, # Allow providing one, else generate + file_source_identifier: Optional[str] = None, + original_name: Optional[str] = None, + expression: Optional[str] = None, + operation_details: Optional[Dict[str, Any]] = None, + input_plot_specs: Optional[List['PlotSpec']] = None): + self.name = name + self.source_type = source_type + self.unique_id = unique_id if unique_id is not None else str(uuid.uuid4()) + self.file_source_identifier = file_source_identifier + self.original_name = original_name + self.expression = expression + self.operation_details = operation_details + self.input_plot_specs: List['PlotSpec'] = [] if input_plot_specs is None else input_plot_specs + + def to_dict(self) -> Dict[str, Any]: + return { + 'name': self.name, + 'unique_id': self.unique_id, + 'source_type': self.source_type, + 'file_source_identifier': self.file_source_identifier, + 'original_name': self.original_name, + 'expression': self.expression, + 'operation_details': self.operation_details, + 'input_plot_specs': [spec.to_dict() for spec in self.input_plot_specs] if self.input_plot_specs else [] + } + + @staticmethod + def from_dict(data: Dict[str, Any]) -> 'PlotSpec': + return PlotSpec( + name=data['name'], + source_type=data['source_type'], + unique_id=data['unique_id'], + file_source_identifier=data.get('file_source_identifier'), + original_name=data.get('original_name'), + expression=data.get('expression'), + operation_details=data.get('operation_details'), + input_plot_specs=[PlotSpec.from_dict(spec_data) for spec_data in data.get('input_plot_specs', [])] + ) diff --git a/sub_plot_widget.py b/sub_plot_widget.py index 7cce715..15f3e72 100644 --- a/sub_plot_widget.py +++ b/sub_plot_widget.py @@ -122,29 +122,92 @@ def _on_scene_mouse_click_event(self, event): self.parent().plot_manager().set_tick_from_time(t_click) event.accept() - def plot_data_from_source(self, name, source): - y_data = source.model().get_data_by_name(name) - - if y_data is None: + def plot_data_from_source(self, name_or_data_item, source_or_none, is_reproduced_item=False): + data_item: DataItem | None = None + name_to_plot: str = "" + y_data: np.ndarray | None = None + time_data: np.ndarray | None = None + plot_spec_for_custom_item: PlotSpec | None = None + source_for_custom_item = source_or_none # This will be the source for CustomPlotItem + + if is_reproduced_item: + if not isinstance(name_or_data_item, DataItem): + print("Error: Expected DataItem when is_reproduced_item is True.") + return + data_item = name_or_data_item + # Ensure plot_spec exists on the reproduced item, critical for CustomPlotItem + if not data_item.plot_spec: + print(f"Error: Reproduced DataItem '{data_item.var_name}' is missing its PlotSpec.") + return # Cannot proceed without a PlotSpec for the CustomPlotItem + + plot_spec_for_custom_item = data_item.plot_spec + name_to_plot = plot_spec_for_custom_item.name # Use name from PlotSpec for consistency + y_data = data_item.data + time_data = data_item.time + # source_for_custom_item is already set to source_or_none (e.g. maths_widget_ref) + else: + if not isinstance(name_or_data_item, str): + print("Error: Expected signal name string when is_reproduced_item is False.") + return + if source_or_none is None: + print("Error: Source widget cannot be None when is_reproduced_item is False.") + return + + name_to_plot = name_or_data_item + current_source_widget = source_or_none + + data_item = current_source_widget.model().get_data_by_name(name_to_plot) + + if data_item is None: + print(f"Error: Could not retrieve DataItem for '{name_to_plot}' from source.") + return + + plot_spec_for_custom_item = data_item.plot_spec # This should exist due to prior subtasks + y_data = data_item.data + time_data = data_item.time if data_item.time is not None else current_source_widget.time + # source_for_custom_item is already set (it's current_source_widget) + + if y_data is None: # Should be caught by data_item is None earlier, but as a safeguard + print(f"Error: No y_data available for '{name_to_plot}'.") + return + if not isinstance(y_data, np.ndarray): + print(f"Warning: Data for '{name_to_plot}' is not a numpy array. Attempting conversion.") + try: + y_data = np.array(y_data) + except Exception as e: + print(f"Error converting data for '{name_to_plot}' to numpy array: {e}") + return + + if time_data is None: + print(f"Error: No time_data available for '{name_to_plot}'. Cannot plot.") return - item = self.pw.getPlotItem().plot(x=source.time, - y=y_data, - pen=pg.mkPen(color=self._get_color(self.cidx), - width=CustomPlotItem.PEN_WIDTH), - name=name, - # clipToView=True, - autoDownsample=True, - downsampleMethod='peak') - - label = CustomPlotItem(self, item, source, self.parent().plot_manager()._tick) + # Actual plotting + pg_plot_item = self.pw.getPlotItem().plot(x=time_data, + y=y_data, + pen=pg.mkPen(color=self._get_color(self.cidx), + width=CustomPlotItem.PEN_WIDTH), + name=name_to_plot, + autoDownsample=True, + downsampleMethod='peak') + + label = CustomPlotItem(self, pg_plot_item, source_for_custom_item, + self.parent().plot_manager()._tick, + plot_spec_from_source=plot_spec_for_custom_item) self._traces.append(label) self._labels.addWidget(label) self.parent().plot_manager().timeValueChanged.connect(label.on_time_changed) - source.onClose.connect(lambda: self.remove_item(item, label)) - self.cidx += 1 + if hasattr(source_for_custom_item, 'onClose') and source_for_custom_item.onClose is not None: + try: + source_for_custom_item.onClose.connect(lambda: self.remove_item(pg_plot_item, label)) + except Exception as e: # Broad exception to catch issues with signal connection + print(f"Warning: Could not connect onClose for source of '{name_to_plot}'. Error: {e}") + elif not is_reproduced_item: # Only warn if not reproduced, as reproduced might not have traditional source + print(f"Warning: Source for '{name_to_plot}' does not have 'onClose' signal or it's None.") + + self.cidx += 1 self.update_plot_yrange() def remove_item(self, trace, label): @@ -190,7 +253,8 @@ def get_plot_info(self): # isn't documented in the public API. y_range = self.pw.getPlotItem().getAxis('left').range plot_info['yrange'] = y_range - plot_info['traces'] = [trace.get_plot_spec() for trace in self._traces if trace.isVisible()] + # Ensure PlotSpec objects are serialized to dictionaries + plot_info['traces'] = [trace.get_plot_spec().to_dict() for trace in self._traces if trace.isVisible()] return plot_info