# Reconstructed from a whitespace-collapsed git diff that adds the ``reflector``
# package.  The diff adds reflector/__init__.py and reflector/orchestration.py
# as empty files; the three non-empty modules follow, one banner per file.
# In this merged form the cross-module imports of instructions.py
# (``from reflector.etl import RtTemplateVariable`` and
# ``from reflector.events import DataEventMessageBoard, DataEvent, DataEventType``)
# are unnecessary because the names are defined below.

from abc import ABC, abstractmethod
from collections import deque, defaultdict
from enum import Enum, auto
from functools import wraps
from multiprocessing import Lock
from queue import Queue
from typing import List, Callable, Any, Dict
from typing import Optional
from typing import Union
import threading

try:
    # Only referenced by the commented-out RtTupleCompletionInstruction draft
    # below, so a missing rt_core_v2 must not make this module unimportable.
    from rt_core_v2.rttuple import RtTuple
    from rt_core_v2.ids_codes.rui import Rui
except ImportError:
    RtTuple = Rui = None


# ---------------------------------------------------------------------------
# reflector/etl.py
# ---------------------------------------------------------------------------

class RtTemplateVariable:
    """A named template variable; ``value`` is ``None`` until assigned."""

    def __init__(self, var_name: str):
        self.var_name = var_name
        self.value = None

# class RtTemplateInstructionListPseudoCompiler:

# class RtTemplateInstructionList:

# class RtTemplateInstructionListExecutor:


# ---------------------------------------------------------------------------
# reflector/events.py
# ---------------------------------------------------------------------------

#TODO Check if a description is ever used.
class DataEventType(Enum):
    """Kinds of data event; each member's value is its description string."""

    IMPLICIT = "implicit"
    CODED_VALUE = "coded value"
    LITERAL_VALUE = "literal value"
    JUSTIFIED_ABSENCE = "justified absence"
    UNJUSTIFIED_ABSENCE = "unjustified absence"
    REDUNDANT_PRESENCE = "redundant presence"
    UNJUSTIFIED_PRESENCE = "unjustified presence"
    DISALLOWED_VALUE = "disallowed value"


def synchronized(member):
    """
    @synchronized decorator.

    Serialise calls to ``member``: each decorated callable gets its own
    ``threading.Lock`` and the wrapper holds it for the duration of the call.
    The lock is exposed as ``wrapper._synchronized_lock``.

    The lock is released through a ``with`` block, so it is freed on every
    exit path, including ``BaseException`` subclasses such as
    ``KeyboardInterrupt`` that ``except Exception`` does not catch.

    Note that two *different* decorated callables do not exclude each other,
    and the lock is not re-entrant (a decorated function must not call itself).
    """
    lock = threading.Lock()

    @wraps(member)
    def wrapper(*args, **kwargs):
        with lock:
            return member(*args, **kwargs)

    wrapper._synchronized_lock = lock
    return wrapper


class DataEvent:  # Event structure
    """One observation about a field of a record."""

    #TODO Have event_type_val be either a string or DataEventType and parse depending on which
    def __init__(self, field_name: str, field_value: str,
                 event_type_val: Union[str, DataEventType], record_number: int):
        """
        ``event_type_val`` may be a ``DataEventType`` member or the value
        string of one (e.g. ``"coded value"``); ``DataEventType(...)`` accepts
        both and raises ``ValueError`` for anything else.
        """
        self.field_name = field_name
        self.field_value = field_value
        self.event_type = DataEventType(event_type_val)
        self.record_number = record_number

    #TODO Figure out what layout for __str__

    def __repr__(self):
        return (f"{type(self).__name__}(field_name={self.field_name!r}, "
                f"field_value={self.field_value!r}, event_type={self.event_type!r}, "
                f"record_number={self.record_number!r})")


class ADataEventFilter(ABC):
    """
    Abstract predicate over ``DataEvent``.

    Filters are used as dictionary keys by ``DataEventMessageBoard``, so they
    compare and hash by value: two filters of the same class with the same
    criteria are the same key.  Do not mutate a filter while it is registered.
    """

    def __init__(self):
        self.field_name = None
        self.field_value = None
        self.event_type = None
        # Present in the original base class; nothing in this module reads it.
        self.parse_event = None

    @abstractmethod
    def passes(self, e: DataEvent):
        raise NotImplementedError

    def _key(self):
        # getattr: tolerate subclasses that skip the base initialiser.
        return (
            type(self),
            getattr(self, "field_name", None),
            getattr(self, "field_value", None),
            getattr(self, "event_type", None),
            getattr(self, "parse_event", None),
        )

    def __eq__(self, obj):
        if not isinstance(obj, ADataEventFilter):
            return NotImplemented
        return self._key() == obj._key()

    def __hash__(self):
        return hash(self._key())

    def __equals__(self, obj):
        """Backward-compatible alias for ``self == obj``."""
        return self == obj


class NullDataEventFilter(ADataEventFilter):
    """Filter that lets every event through."""

    def passes(self, e: DataEvent):
        assert e is not None
        return True


class DataEventFilter(ADataEventFilter):  # Base filter class
    """Filter on any combination of field name, field value and event type."""

    def __init__(self, field_name: str = None, field_value: str = None,
                 event_type: DataEventType = None):
        super().__init__()
        self.field_name = field_name
        self.field_value = field_value
        self.event_type = event_type
        #TODO Ensure at least one is not None

    def passes(self, e: DataEvent):
        """Return True when every criterion that is set matches ``e``.

        A criterion that is falsy (``None`` or an empty string) is treated as
        "don't care".
        """
        name_ok = not self.field_name or self.field_name == e.field_name
        value_ok = not self.field_value or self.field_value == e.field_value
        type_ok = not self.event_type or self.event_type == e.event_type
        return name_ok and value_ok and type_ok


class ADataEventSubscriber(ABC):
    """Abstract DataEventSubscriber Template"""

    def __init__(self):
        super().__init__()
        self.filter = NullDataEventFilter()
        #TODO Figure out what type for writer
        self.writer = None
        self.count = 0
        self.field_name = ""
        self.event_type = None

    @abstractmethod
    def notify(self, event: DataEvent):
        pass


class DataEventTypeByFieldCounterSubscriber(ADataEventSubscriber):
    """Counts events of one type on one field in ``self.count``."""

    def __init__(self, de: DataEventType, field_name: str):
        super().__init__()
        self.event_type = de
        self.field_name = field_name
        # Same convention as the by-type subscribers: carry a matching filter.
        self.filter = DataEventFilter(field_name=field_name, event_type=de)

    def notify(self, event: DataEvent):
        if self.event_type == event.event_type and self.field_name == event.field_name:
            self.count += 1


class DataEventTypeCounterSubscriber(ADataEventSubscriber):
    """Counts events of one type in ``self.count``."""

    def __init__(self, de: DataEventType):
        super().__init__()
        self.event_type = de
        self.filter = DataEventFilter(event_type=de)

    def notify(self, e: DataEvent):
        if e.event_type == self.event_type:
            self.count += 1


class AllDataEventStreamWriterSubscriber(ADataEventSubscriber):
    """Writes every event it is notified of as one delimited line."""

    def __init__(self, writer, delim='\t'):
        super().__init__()
        self.writer = writer
        self.delim = delim

    def notify(self, data_event: DataEvent):
        try:
            # Retrieve and write each field with delimiter
            self.writer.write(f"{data_event.event_type or 'None'}{self.delim}")
            self.writer.write(f"{data_event.field_name or 'None'}{self.delim}")
            self.writer.write(f"{data_event.field_value or 'None'}{self.delim}")
            self.writer.write(f"{data_event.record_number}\n")

            # Flush after each write to ensure data is immediately written
            self.writer.flush()
        except IOError as e:
            print(f"IO error occurred when writing AllDataEventStreamWriterSubscriber: {e}")


class DataEventTypeByFieldStreamWriterSubscriber(ADataEventSubscriber):
    """Writes events of one type on one field as delimited lines."""

    def __init__(self, de: DataEventType, field_name: str, writer, delim: str = '\t'):
        super().__init__()
        self.writer = writer
        self.delim = delim
        self.event_type = de
        self.field_name = field_name

    def notify(self, data_event: DataEvent):
        event_type = data_event.event_type
        event_field_name = data_event.field_name

        # Check conditions before writing
        if event_type is None or event_type != self.event_type:
            return
        if event_field_name is None or event_field_name != self.field_name:
            return

        try:
            # Write the event data fields, with fallback to 'None' if a field is None
            self.writer.write(f"{event_type}{self.delim}")
            self.writer.write(f"{event_field_name}{self.delim}")
            self.writer.write(f"{data_event.field_value or 'None'}{self.delim}")
            self.writer.write(f"{data_event.record_number}\n")

            # Ensure data is written immediately
            self.writer.flush()
        except IOError as e:
            print(f"IO error occurred when writing DataEventTypeByFieldStreamWriterSubscriber: {e}")


class DataEventTypeStreamWriterSubscriber(ADataEventSubscriber):
    """Writes events of one type as delimited lines."""

    def __init__(self, de: DataEventType, writer, delim: str = '\t'):
        super().__init__()
        self.writer = writer
        self.delim = delim
        self.event_type = de
        self.filter = DataEventFilter(event_type=de)

    def notify(self, data_event):
        # Like the other typed subscribers, ignore events of a different type
        # even if the subscriber was registered with a broader filter.
        if data_event.event_type != self.event_type:
            return
        try:
            # DataEvent exposes plain attributes (it has no get_*() accessors).
            # Write data event type or 'null' if None
            self.writer.write(f"{data_event.event_type or 'null'}{self.delim}")

            # Write field name or 'null' if None
            self.writer.write(f"{data_event.field_name or 'null'}{self.delim}")

            # Write field value or 'null' if None
            self.writer.write(f"{data_event.field_value or 'null'}{self.delim}")

            # Write record number and a new line
            self.writer.write(f"{data_event.record_number}\n")

            # Flush to ensure data is written immediately
            self.writer.flush()
        except IOError as e:
            print(f"IO error occurred when writing DataEventTypeStreamWriterSubscriber: {e}")


#TODO Decide if I want to stay with static or go to class methods
class DataEventMessageBoard:
    """
    Message board that manages events and their subscribers, supporting subscriptions
    and broadcasting of events to matching subscribers.

    State is class-level: ``queue`` keeps the most recent 50 events and
    ``filter_subscriber_map`` maps each filter to its list of subscribers.
    Both are created by ``start()``; the other entry points call it on demand.
    """

    queue = None
    filter_subscriber_map = None

    @staticmethod
    @synchronized
    def start():
        """Create the event queue and subscriber map if they do not exist yet."""
        if DataEventMessageBoard.queue is None:
            DataEventMessageBoard.queue = deque(maxlen=50)  # deque to hold up to 50 events
        if DataEventMessageBoard.filter_subscriber_map is None:
            DataEventMessageBoard.filter_subscriber_map = defaultdict(list)

    @staticmethod
    def _ensure_started():
        # Cheap unlocked check first; start() re-checks under its lock.
        if (DataEventMessageBoard.queue is None
                or DataEventMessageBoard.filter_subscriber_map is None):
            DataEventMessageBoard.start()

    @staticmethod
    def publish(event: DataEvent):
        """Record ``event`` and deliver it to every subscriber whose filter passes it."""
        DataEventMessageBoard.add_event_to_queue(event)
        DataEventMessageBoard.broadcast_event_to_subscribers(event)

    @staticmethod
    @synchronized
    def add_event_to_queue(event: DataEvent):
        DataEventMessageBoard._ensure_started()
        # Enqueue event; the bounded deque drops the oldest one when full
        DataEventMessageBoard.queue.append(event)

    @staticmethod
    def broadcast_event_to_subscribers(event: DataEvent):
        DataEventMessageBoard._ensure_started()
        # Iterate over snapshots: a subscriber (or another thread) may
        # subscribe/unsubscribe while the event is being delivered.
        registrations = list(DataEventMessageBoard.filter_subscriber_map.items())
        for event_filter, subscribers in registrations:
            if event_filter.passes(event):
                for subscriber in list(subscribers):
                    subscriber.notify(event)

    @staticmethod
    def subscribe_all(subscriber: ADataEventSubscriber):
        """Subscribe to every event (registered under a ``NullDataEventFilter``)."""
        DataEventMessageBoard.subscribe(subscriber, NullDataEventFilter())

    @staticmethod
    @synchronized
    def subscribe(subscriber: ADataEventSubscriber, event_filter: ADataEventFilter):
        DataEventMessageBoard._ensure_started()
        # Add subscriber to filter's list
        DataEventMessageBoard.filter_subscriber_map[event_filter].append(subscriber)

        # Notify subscriber of all past events that pass the filter.  Snapshot
        # the deque: publish() uses a different lock and may append meanwhile.
        for past_event in list(DataEventMessageBoard.queue):
            if event_filter.passes(past_event):
                subscriber.notify(past_event)

    @staticmethod
    def unsubscribe(subscriber: ADataEventSubscriber, event_filter: ADataEventFilter):
        """Remove ``subscriber`` from ``event_filter``'s list; unknown pairs are ignored."""
        subscriber_map = DataEventMessageBoard.filter_subscriber_map
        if not subscriber_map or event_filter not in subscriber_map:
            return
        subscribers = subscriber_map[event_filter]
        if subscriber in subscribers:
            subscribers.remove(subscriber)


# ---------------------------------------------------------------------------
# reflector/instructions.py
# ---------------------------------------------------------------------------

class RtAbstractInstruction(ABC):
    """Base class of template instructions."""

    @abstractmethod
    def execute(self, field_sys_vars: list[str], variables: dict[str, RtTemplateVariable]) -> bool:
        pass


class RtVariableAssignmentInstruction(RtAbstractInstruction):
    """Base class of instructions that assign the variable named ``var_name``."""

    def __init__(self, var_name: str):
        self.var_name = var_name

    @abstractmethod
    def getVariable(self) -> RtTemplateVariable:
        pass


class RtAnnotationInstruction(RtAbstractInstruction):
    """Publishes a ``DataEvent`` describing one field of the current record."""

    def __init__(self, event_type: DataEventType, field_name: str, field_order_in_table: int):
        self.event_type = event_type
        self.field_name = field_name
        self.field_order_in_table = field_order_in_table

    def execute(self, args: list[str], variables: dict[str, RtTemplateVariable]) -> bool:
        """
        Publish an event for ``args[self.field_order_in_table]`` and return True.

        The record number is the value of the ``"RECORD_NUMBER"`` variable, or
        0 when that variable is absent.  Raises ``IndexError`` if ``args`` is
        shorter than ``field_order_in_table + 1``.
        """
        rec_num_var = variables.get("RECORD_NUMBER")
        record_number = rec_num_var.value if rec_num_var else 0

        field_value = args[self.field_order_in_table]
        data_event = DataEvent(self.field_name, field_value, self.event_type, record_number)  #TODO - need to send through the field number and record number somehow (copied TODO from java code)
        DataEventMessageBoard.publish(data_event)

        return True

#TODO Fix this class, as it is close to a direct translation. Class is 80% close to being done
# class RtTupleCompletionInstruction(RtAbstractInstruction):
#     def __init__(self, tuple_block_fields: list[str], content_block_fields: list[str], subfield_delim: str, quote_open: str, quote_close: str):
#         self.tuple_block_fields = tuple_block_fields.copy()
#         self.content_block_fields = content_block_fields.copy()
#         #TODO Figure out how to implement the factory
#         self.tFactory = RtTupleFactory()
#         self.subfield_delim = subfield_delim
#         self.quote_open = quote_open
#         self.quote_close = quote_close
#         self.tuple = None

#     def execute(self, args: list[str], variables: dict[str, RtTemplateVariable]) -> bool:
#         tuple_block = self._process_fields(self.tuple_block_fields, variables)
#         content_block = self._process_fields(self.content_block_fields, variables, args)

#         # Build the tuple
#         #TODO Same as previous factory TODO
#         self.tuple = self.tFactory.build_rt_tuple_or_temporal_region(tuple_block, content_block)
#         return self.tuple is not None

#     def _process_fields(self, fields: list[str], variables: dict[str, RtTemplateVariable], args: list[str] = None) -> list[str]:
#         processed_fields = []
#         for s in fields:
#             if s.startswith("[") and s.endswith("]"):
#                 command = s[1:-1]
#                 if command == "new-rui":
#                     processed_fields.append(str(Rui()))
#                 elif command in variables:
#                     processed_fields.append(variables[command].value)
#                 else:
#                     print(f"Unknown command or variable: {command}")
#             elif self.subfield_delim in s:
#                 processed_fields.append(self._handle_subfields(s, variables))
#             elif s.startswith("%") and args:
#                 field_num = int(s[1:])
#                 if field_num < len(args):
#                     processed_fields.append(args[field_num])
#             elif "=" in s:
#                 processed_fields.append(self._handle_assignment(s, variables))
#             else:
#                 processed_fields.append(s)
#         return processed_fields

#     def _handle_subfields(self, s: str, variables: dict[str, RtTemplateVariable]) -> str:
#         substitution = []
#         subfields = s.split(self.subfield_delim)
#         for sub in subfields:
#             ref_info = sub.split("=")
#             if len(ref_info) > 1:
#                 command = ref_info[1].strip("[] ")
#                 var_value = variables[command].value if command in variables else "null"
#                 substitution.append(f"{ref_info[0].strip()}={var_value}")
#         return self.subfield_delim.join(substitution)

#     def _handle_assignment(self, s: str, variables: dict[str, RtTemplateVariable]) -> str:
#         ref_info = s.split("=")
#         if len(ref_info) == 2:
#             command = ref_info[1].strip("[] ")
#             var_value = variables.get(command).value if command in variables else "null"
#             return f"{ref_info[0].strip()}={var_value}"
#         return s

#     def get_tuple(self) -> RtTuple:
#         if self.tuple is None:
#             raise ValueError("Must execute this instruction before getting the RtTuple.")
#         return self.tuple

#     def get_particular_reference(self) -> str:
#         return self.content_block_fields[0]

# class RtAssignFieldValueInstruction(RtAbstractInstruction):

# class RtAssignRuiInstruction(RtAbstractInstruction):