From 1b18a8e24f781257abb912ffce9e262db26d016e Mon Sep 17 00:00:00 2001 From: Esteban Zimanyi Date: Thu, 21 May 2026 13:14:06 +0200 Subject: [PATCH] feat(wirings): stateless tier DataStream wirings for the generated MEOS facades MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds the org.mobilitydb.flink.meos.wirings package — thin, generic Flink-DataStream wrappers around the generated MeosOps* facades from PR #5, organized per streaming tier. This PR ships the stateless tier: - MeosStatelessMap: generic MapFunction wrapping any stateless MeosOps* method (804 of the 2,097 generated methods qualify per the v4 baseline — 92 OO-classified + 712 free-fn) - MeosStatelessFilter: generic FilterFunction wrapping any stateless boolean-returning MeosOps* method, plus a .fromIntPredicate(...) adapter for JMEOS' int-coded predicates - demo/MeosWiringsDemoJob: runnable end-to-end DataStream pipeline parsing TBox WKT → filtering by overlap with a query box → serializing surviving boxes to hex-WKB, all through the generated facades wired via this package - README documenting tier vocabulary, the wrap-once-use-everywhere pattern, the DataStream-API-only design choice (Table API as future follow-up), and coexistence with berlinmod.MEOSBridge Future follow-ups (one PR per tier, mirroring this one's shape): - MeosBoundedStateMap (generic KeyedProcessFunction with ValueState for MEOS handle per key — covers 797 of the generated methods) - MeosWindowedAggregate (generic ProcessWindowFunction — 161 methods) - MeosCrossStreamJoin (generic KeyedCoProcessFunction or interval-join — 140 methods) - Optional: Table API sibling (MeosScalarUDF + MeosCatalogRegistrar) if the repo adopts Table API for other reasons Stacks on codegen/flink-meos-ops (PR #5). Additive-only; touches no existing file. Locally compile-verified: 129 .class files total (123 from the parent PR + 6 new from this package's classes + demo + their nested lambdas). (cherry picked from commit 457987c935015ab373116414f513919d1812c366) --- .../meos/wirings/MeosStatelessFilter.java | 87 +++++++++++++++ .../flink/meos/wirings/MeosStatelessMap.java | 93 ++++++++++++++++ .../flink/meos/wirings/MeosWiringRuntime.java | 37 +++++++ .../mobilitydb/flink/meos/wirings/README.md | 89 +++++++++++++++ .../meos/wirings/demo/MeosWiringsDemoJob.java | 101 ++++++++++++++++++ 5 files changed, 407 insertions(+) create mode 100644 flink-processor/src/main/java/org/mobilitydb/flink/meos/wirings/MeosStatelessFilter.java create mode 100644 flink-processor/src/main/java/org/mobilitydb/flink/meos/wirings/MeosStatelessMap.java create mode 100644 flink-processor/src/main/java/org/mobilitydb/flink/meos/wirings/MeosWiringRuntime.java create mode 100644 flink-processor/src/main/java/org/mobilitydb/flink/meos/wirings/README.md create mode 100644 flink-processor/src/main/java/org/mobilitydb/flink/meos/wirings/demo/MeosWiringsDemoJob.java diff --git a/flink-processor/src/main/java/org/mobilitydb/flink/meos/wirings/MeosStatelessFilter.java b/flink-processor/src/main/java/org/mobilitydb/flink/meos/wirings/MeosStatelessFilter.java new file mode 100644 index 0000000..080d2a6 --- /dev/null +++ b/flink-processor/src/main/java/org/mobilitydb/flink/meos/wirings/MeosStatelessFilter.java @@ -0,0 +1,87 @@ +package org.mobilitydb.flink.meos.wirings; + +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.RichFilterFunction; +import org.apache.flink.configuration.Configuration; + +import java.io.Serializable; + +/** + * DataStream wiring for the {@code stateless} streaming tier of the + * generated {@code org.mobilitydb.flink.meos.MeosOps*} facades — the + * predicate-shaped sibling of {@link MeosStatelessMap}. + * + *

Wraps any {@code MeosOps*.f(...)} call that returns {@code boolean} + * (or {@code int} interpreted as a 0/1 flag, common in JMEOS' int-coded + * predicates) and whose streaming tier is {@code stateless} (per + * {@code tools/codegen/meos-ops-manifest.json}) as a Flink + * {@link FilterFunction}. No per-key state; each event filtered + * independently. + * + *

Typical usage: scalar-predicate filter against the + * generated {@code MeosOpsTBox.overlaps_tbox_tbox} (tier = + * {@code stateless}): + * + *

{@code
+ * DataStream in = ...;
+ * DataStream overlapping = in.filter(
+ *     new MeosStatelessFilter<>(
+ *         pair -> MeosOpsTBox.overlaps_tbox_tbox(pair.a, pair.b)));
+ * }
+ * + *

For int-coded predicates (JMEOS returns {@code int} for some MEOS + * predicates rather than {@code boolean}), use + * {@link #fromIntPredicate}: + * + *

{@code
+ * DataStream in = ...;
+ * DataStream adj = in.filter(
+ *     MeosStatelessFilter.fromIntPredicate(
+ *         pair -> MeosOpsFreeGeo.adjacent_stbox_stbox(pair.a, pair.b)));
+ * }
+ * + * @param the record type being filtered + */ +public final class MeosStatelessFilter extends RichFilterFunction { + + /** Serializable boolean-returning per-event MEOS predicate. */ + @FunctionalInterface + public interface MeosPredicate extends Serializable { + boolean test(IN event) throws Exception; + } + + /** Serializable int-returning per-event MEOS predicate (0/1 flag). */ + @FunctionalInterface + public interface MeosIntPredicate extends Serializable { + int test(IN event) throws Exception; + } + + private final MeosPredicate predicate; + + public MeosStatelessFilter(MeosPredicate predicate) { + this.predicate = predicate; + } + + /** + * Adapt an {@code int}-returning generated MEOS predicate (treating + * non-zero as {@code true}) into a Flink {@code FilterFunction}. + */ + public static MeosStatelessFilter fromIntPredicate(MeosIntPredicate p) { + return new MeosStatelessFilter<>(event -> p.test(event) != 0); + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + MeosWiringRuntime.ensureInitializedOnThread(); + } + + @Override + public boolean filter(IN event) throws Exception { + // When chained to a legacy source, records are processed on the source's + // emitter thread rather than the thread open() ran on; the ThreadLocal + // guard makes this a cheap no-op after the first call per thread. + MeosWiringRuntime.ensureInitializedOnThread(); + return predicate.test(event); + } +} diff --git a/flink-processor/src/main/java/org/mobilitydb/flink/meos/wirings/MeosStatelessMap.java b/flink-processor/src/main/java/org/mobilitydb/flink/meos/wirings/MeosStatelessMap.java new file mode 100644 index 0000000..aadb4ac --- /dev/null +++ b/flink-processor/src/main/java/org/mobilitydb/flink/meos/wirings/MeosStatelessMap.java @@ -0,0 +1,93 @@ +package org.mobilitydb.flink.meos.wirings; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.configuration.Configuration; + +import java.io.Serializable; + +/** + * DataStream wiring for the {@code stateless} streaming tier of the + * generated {@code org.mobilitydb.flink.meos.MeosOps*} facades. + * + *

Wraps any {@code MeosOps*.f(...)} call whose streaming tier is + * {@code stateless} (per {@code tools/codegen/meos-ops-manifest.json} + * or {@code meos-ops-free-manifest.json}) as a Flink + * {@link MapFunction}. No per-key state is allocated; each event is + * mapped independently. The wrapped call: + * + *

    + *
  • does no MEOS-handle state across events (per the + * {@code stateless} tier contract);
  • + *
  • does not touch the time domain (no window required);
  • + *
  • may delegate to MEOS via the bundled JMEOS jar when + * {@code MeosOpsRuntime.MEOS_AVAILABLE} — otherwise the + * generated facade throws {@code UnsupportedOperationException}.
  • + *
+ * + *

Typical usage: register a stateless MEOS predicate / arithmetic + * call as a per-event map step in a DataStream pipeline. Example with + * the generated {@code MeosOpsTBox.overlaps_tbox_tbox} (tier = + * {@code stateless}, per the codegen manifest): + * + *

{@code
+ * DataStream in = ...;            // (tboxA, tboxB)
+ * DataStream overlap = in.map(
+ *     new MeosStatelessMap<>(
+ *         pair -> MeosOpsTBox.overlaps_tbox_tbox(pair.a, pair.b)));
+ * }
+ * + *

Tier coverage: as of the codegen state on the parent PR, + * 804 of the 2,097 generated methods are {@code stateless} (92 OO- + * classified + 712 free-fn). Any of those can be wrapped through this + * single class without per-method boilerplate. + * + *

Coexistence with {@code berlinmod.MEOSBridge}: this is the + * low-level catalog-shaped wiring; {@code MEOSBridge} stays as + * the high-level query-shaped wiring for the BerlinMOD-9 suite. + * Both share the same {@code MeosOpsRuntime.MEOS_AVAILABLE} discipline. + * + * @param the input record type + * @param the output type returned by the wrapped MEOS call + */ +public final class MeosStatelessMap extends RichMapFunction { + + /** + * Serializable per-event MEOS call. Implementations forward to a + * generated {@code MeosOps*.f(...)} static method, returning the + * Java type that the generated facade exposes. + */ + @FunctionalInterface + public interface MeosCall extends Serializable { + OUT apply(IN event) throws Exception; + } + + private final MeosCall call; + + /** + * @param call serializable lambda forwarding to a stateless + * generated MEOS facade method. The lambda must be + * serializable (Java 8+ lambdas implementing a + * {@link Serializable} functional interface are). + */ + public MeosStatelessMap(MeosCall call) { + this.call = call; + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + // No per-key state in the stateless tier; the only per-operator + // concern is MEOS' per-thread session, initialized on this task thread. + MeosWiringRuntime.ensureInitializedOnThread(); + } + + @Override + public OUT map(IN event) throws Exception { + // When chained to a legacy source, records are processed on the source's + // emitter thread rather than the thread open() ran on; the ThreadLocal + // guard makes this a cheap no-op after the first call per thread. + MeosWiringRuntime.ensureInitializedOnThread(); + return call.apply(event); + } +} diff --git a/flink-processor/src/main/java/org/mobilitydb/flink/meos/wirings/MeosWiringRuntime.java b/flink-processor/src/main/java/org/mobilitydb/flink/meos/wirings/MeosWiringRuntime.java new file mode 100644 index 0000000..51840ae --- /dev/null +++ b/flink-processor/src/main/java/org/mobilitydb/flink/meos/wirings/MeosWiringRuntime.java @@ -0,0 +1,37 @@ +package org.mobilitydb.flink.meos.wirings; + +import functions.GeneratedFunctions; + +/** + * Per-thread MEOS initialization for the {@code org.mobilitydb.flink.meos.wirings} + * operators. + * + *

MEOS keeps its timezone / session state per OS thread. Each Flink + * subtask runs on its own task thread, so every wiring operator must + * initialize MEOS on that thread from its {@code open()} — the JVM-wide + * probe in {@code MeosOpsRuntime} only covers the thread that first + * touches a facade class (typically the job's main thread), not the task + * threads where the operators actually run. + * + *

{@link #ensureInitializedOnThread()} is idempotent per thread (guarded + * by a {@link ThreadLocal}), so it is safe to call from every operator's + * {@code open()} even when operators are chained onto the same thread. It + * installs a no-op error handler so a MEOS-side error surfaces as a thrown + * exception rather than terminating the JVM. + */ +public final class MeosWiringRuntime { + + private static final ThreadLocal INITIALIZED = + ThreadLocal.withInitial(() -> Boolean.FALSE); + + private MeosWiringRuntime() { /* utility */ } + + /** Initialize MEOS on the calling thread exactly once. */ + public static void ensureInitializedOnThread() { + if (!INITIALIZED.get()) { + GeneratedFunctions.meos_initialize_error_handler((level, code, message) -> { }); + GeneratedFunctions.meos_initialize(); + INITIALIZED.set(Boolean.TRUE); + } + } +} diff --git a/flink-processor/src/main/java/org/mobilitydb/flink/meos/wirings/README.md b/flink-processor/src/main/java/org/mobilitydb/flink/meos/wirings/README.md new file mode 100644 index 0000000..ffcb437 --- /dev/null +++ b/flink-processor/src/main/java/org/mobilitydb/flink/meos/wirings/README.md @@ -0,0 +1,89 @@ +# DataStream wirings for the generated MEOS facades + +This package supplies thin, generic Flink-DataStream wrappers around +the generated `org.mobilitydb.flink.meos.MeosOps*` facades, organized +per **streaming tier** (per +`tools/codegen/meos-ops-manifest.json` + `tools/codegen/meos-ops-free-manifest.json`): + +| Tier | Wiring class(es) here | Status in this package | +|---|---|---| +| `stateless` | [`MeosStatelessMap`](MeosStatelessMap.java) (generic `MapFunction`) · [`MeosStatelessFilter`](MeosStatelessFilter.java) (generic `FilterFunction`) | ✅ shipped | +| `bounded-state` | `MeosBoundedStateMap` (generic `KeyedProcessFunction` with `ValueState` per key) | next follow-up | +| `windowed` | `MeosWindowedAggregate` (generic `ProcessWindowFunction`) | next follow-up | +| `cross-stream` | `MeosCrossStreamJoin` (generic `KeyedCoProcessFunction` or interval-join) | next follow-up | +| `io-meta` | covered transitively by the stateless wirings (no state, no window) | n/a | +| `sequence-only` | inherently non-streamable — no wiring | n/a | + +The wirings are **generic**: each takes a serializable lambda +forwarding to whichever generated `MeosOps*.f(...)` method the adopter +needs. No per-method boilerplate, no per-method registration — +adopters wire the entire ~800-method `stateless` slice through +`MeosStatelessMap` / `MeosStatelessFilter` without touching this +package. + +## Why DataStream rather than Table API + +The repo's existing pipeline (`berlinmod/`, `aisdata/`) is +DataStream-API only. Sticking to DataStream avoids adding the +~50 MB `flink-table-planner` runtime dependency to the build matrix. +A Table-API-shaped sibling +(`MeosOpsTableCatalogRegistrar` / `MeosScalarUDF` / `MeosAggregateFunction`) +is a clean follow-up if/when the repo adopts Table API for other +reasons. + +## How a generated MEOS call becomes a Flink operator + +The pattern is the same across all four tiers: + +```java +// 1. Pick the generated MeosOps method +// (Javadoc tier marker tells you which wiring to use) +boolean overlap = MeosOpsTBox.overlaps_tbox_tbox(boxA, boxB); // tier = stateless + +// 2. Wrap with the matching wiring +MeosStatelessFilter filter = MeosStatelessFilter.fromIntPredicate( + pair -> MeosOpsTBox.overlaps_tbox_tbox(pair.a, pair.b)); + +// 3. Apply to the DataStream +DataStream overlapping = stream.filter(filter); +``` + +`MEOS_AVAILABLE` is probed once per JVM by `MeosOpsRuntime`'s static +initializer (shared across all `MeosOps*` and `MeosOpsFree*` +facades). When unavailable, every generated method throws +`UnsupportedOperationException` with a clear message — the wiring +layer doesn't have to handle that itself. + +## End-to-end runnable demo + +[`demo/MeosWiringsDemoJob.java`](demo/MeosWiringsDemoJob.java) walks +through a 3-stage DataStream pipeline using two of the generated +facades wired through `MeosStatelessMap` + `MeosStatelessFilter`: + +1. Parse a stream of TBox WKT strings via + `MeosOpsFreeCore.tbox_in` (io-meta, no state). +2. Filter to those overlapping a fixed query box via + `MeosOpsTBox.overlaps_tbox_tbox` (stateless predicate). +3. Serialize each survivor to hex-WKB via + `MeosOpsTBox.tbox_as_hexwkb` (io-meta, no state). + +Run with: + +```bash +mvn -q exec:java \ + -Dexec.mainClass=org.mobilitydb.flink.meos.wirings.demo.MeosWiringsDemoJob \ + -Dmobilityflink.meos.enabled=true +``` + +Output (expected): two `overlapping-tbox-hex` lines (the two input +boxes that overlap the query box), one disjoint box dropped, one +`MeosWirings stateless tier demo` job completion line. + +## Coexistence with `berlinmod.MEOSBridge` + +`MEOSBridge.java` is the BerlinMOD-specific, hand-written bridge for +the 9-query streaming-form parity matrix — high-level and +query-shaped. The wirings here are low-level and catalog-shaped — +applicable to any of the ~800 stateless or 800 bounded-state +generated facade methods, not just the BerlinMOD-9 subset. Both +share the same `MEOS_AVAILABLE` discipline (`MeosOpsRuntime`). diff --git a/flink-processor/src/main/java/org/mobilitydb/flink/meos/wirings/demo/MeosWiringsDemoJob.java b/flink-processor/src/main/java/org/mobilitydb/flink/meos/wirings/demo/MeosWiringsDemoJob.java new file mode 100644 index 0000000..0597a86 --- /dev/null +++ b/flink-processor/src/main/java/org/mobilitydb/flink/meos/wirings/demo/MeosWiringsDemoJob.java @@ -0,0 +1,101 @@ +package org.mobilitydb.flink.meos.wirings.demo; + +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.mobilitydb.flink.meos.MeosOpsFreeCore; +import org.mobilitydb.flink.meos.MeosOpsTBox; +import org.mobilitydb.flink.meos.wirings.MeosStatelessFilter; +import org.mobilitydb.flink.meos.wirings.MeosStatelessMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; + +/** + * End-to-end runnable demo showing how the generated + * {@code org.mobilitydb.flink.meos.MeosOps*} facades wire into a Flink + * {@code DataStream} pipeline through the + * {@code org.mobilitydb.flink.meos.wirings} helpers. + * + *

The demo: + *

    + *
  1. Builds a small in-memory stream of TBox-WKT strings.
  2. + *
  3. Parses each into a JMEOS {@code Pointer} via + * {@code MeosOpsTBox.tbox_in} (tier = {@code io-meta}).
  4. + *
  5. Filters to those that overlap with a fixed query TBox via + * {@code MeosOpsTBox.overlaps_tbox_tbox} wrapped as a + * {@link MeosStatelessFilter} (tier = {@code stateless}).
  6. + *
  7. Maps each surviving TBox to its serialized WKB hex via + * {@code MeosOpsTBox.tbox_as_hexwkb} wrapped as a + * {@link MeosStatelessMap} (tier = {@code io-meta} but no per-key + * state, so the {@code stateless} wiring works for it too).
  8. + *
+ * + *

Run with: + * + *

{@code
+ * mvn -q exec:java \
+ *     -Dexec.mainClass=org.mobilitydb.flink.meos.wirings.demo.MeosWiringsDemoJob \
+ *     -Dmobilityflink.meos.enabled=true   # require libmeos loadable
+ * }
+ * + *

If libmeos is not loadable on the runtime (or + * {@code -Dmobilityflink.meos.enabled=false}), every wrapped MeosOps + * call throws {@code UnsupportedOperationException} with a clear + * message — the demo prints the throw shape and exits non-zero. + */ +public final class MeosWiringsDemoJob { + + private static final Logger LOG = LoggerFactory.getLogger(MeosWiringsDemoJob.class); + + /** A small box covering (xmin=0, ymin=0, xmax=10, ymax=10). */ + private static final String QUERY_TBOX_WKT = "TBOX XT([0,10],[2026-01-01,2026-01-02])"; + + /** Three input boxes — two overlap the query box, one doesn't. */ + private static final String[] INPUT_TBOX_WKTS = { + "TBOX XT([5,15],[2026-01-01,2026-01-02])", // overlaps + "TBOX XT([20,30],[2026-01-01,2026-01-02])", // disjoint + "TBOX XT([3,8],[2026-01-01,2026-01-02])", // overlaps + }; + + public static void main(String[] args) throws Exception { + // Probe MEOS availability (the static initializer in MeosOpsRuntime + // fires the first time any MeosOps class is touched). + if (!MeosOpsTBox.MEOS_AVAILABLE) { + LOG.error("MEOS not available — the demo requires libmeos. " + + "Set -Dmobilityflink.meos.enabled=true and ensure libmeos is loadable."); + System.exit(1); + } + + StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + + DataStream tboxWkts = env.fromCollection(Arrays.asList(INPUT_TBOX_WKTS)); + + // The records crossing operator boundaries are serialized MEOS values + // (WKT text) — never raw native pointers, which are process-local and + // not serializable across Flink tasks. Each operator parses to a + // transient MEOS handle, calls MEOS, and re-serializes. + + // Stage 1: parse each WKT and re-serialize via tbox_out (stateless io-meta). + DataStream normalized = tboxWkts.map( + new MeosStatelessMap( + wkt -> MeosOpsTBox.tbox_out(MeosOpsTBox.tbox_in(wkt), 6))) + .returns(Types.STRING); + + // Stage 2: filter to those overlapping the query box (stateless). + // The query box is the constant WKT operand, parsed inside the predicate; + // overlaps_tbox_tbox lives on MeosOpsFreeCore (free fn, not OO-classified). + DataStream overlapping = normalized.filter( + MeosStatelessFilter.fromIntPredicate( + wkt -> MeosOpsFreeCore.overlaps_tbox_tbox( + MeosOpsTBox.tbox_in(wkt), + MeosOpsTBox.tbox_in(QUERY_TBOX_WKT)))); + + overlapping.print("overlapping-tbox"); + + env.execute("MeosWirings stateless tier demo"); + } +}