Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package org.mobilitydb.flink.meos.wirings;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.configuration.Configuration;

import java.io.Serializable;

/**
* DataStream wiring for the {@code stateless} streaming tier of the
* generated {@code org.mobilitydb.flink.meos.MeosOps*} facades — the
* predicate-shaped sibling of {@link MeosStatelessMap}.
*
* <p>Wraps any {@code MeosOps*.f(...)} call that returns {@code boolean}
* (or {@code int} interpreted as a 0/1 flag, common in JMEOS' int-coded
* predicates) and whose streaming tier is {@code stateless} (per
* {@code tools/codegen/meos-ops-manifest.json}) as a Flink
* {@link FilterFunction}. No per-key state; each event filtered
* independently.
*
* <p><b>Typical usage</b>: scalar-predicate filter against the
* generated {@code MeosOpsTBox.overlaps_tbox_tbox} (tier =
* {@code stateless}):
*
* <pre>{@code
* DataStream<TbiePair> in = ...;
* DataStream<TbiePair> overlapping = in.filter(
* new MeosStatelessFilter<>(
* pair -> MeosOpsTBox.overlaps_tbox_tbox(pair.a, pair.b)));
* }</pre>
*
* <p>For int-coded predicates (JMEOS returns {@code int} for some MEOS
* predicates rather than {@code boolean}), use
* {@link #fromIntPredicate}:
*
* <pre>{@code
* DataStream<StboxPair> in = ...;
* DataStream<StboxPair> adj = in.filter(
* MeosStatelessFilter.fromIntPredicate(
* pair -> MeosOpsFreeGeo.adjacent_stbox_stbox(pair.a, pair.b)));
* }</pre>
*
* @param <IN> the record type being filtered
*/
public final class MeosStatelessFilter<IN> extends RichFilterFunction<IN> {

/** Serializable boolean-returning per-event MEOS predicate. */
@FunctionalInterface
public interface MeosPredicate<IN> extends Serializable {
boolean test(IN event) throws Exception;
}

/** Serializable int-returning per-event MEOS predicate (0/1 flag). */
@FunctionalInterface
public interface MeosIntPredicate<IN> extends Serializable {
int test(IN event) throws Exception;
}

private final MeosPredicate<IN> predicate;

public MeosStatelessFilter(MeosPredicate<IN> predicate) {
this.predicate = predicate;
}

/**
* Adapt an {@code int}-returning generated MEOS predicate (treating
* non-zero as {@code true}) into a Flink {@code FilterFunction}.
*/
public static <IN> MeosStatelessFilter<IN> fromIntPredicate(MeosIntPredicate<IN> p) {
return new MeosStatelessFilter<>(event -> p.test(event) != 0);
}

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
MeosWiringRuntime.ensureInitializedOnThread();
}

@Override
public boolean filter(IN event) throws Exception {
// When chained to a legacy source, records are processed on the source's
// emitter thread rather than the thread open() ran on; the ThreadLocal
// guard makes this a cheap no-op after the first call per thread.
MeosWiringRuntime.ensureInitializedOnThread();
return predicate.test(event);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package org.mobilitydb.flink.meos.wirings;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

import java.io.Serializable;

/**
* DataStream wiring for the {@code stateless} streaming tier of the
* generated {@code org.mobilitydb.flink.meos.MeosOps*} facades.
*
* <p>Wraps any {@code MeosOps*.f(...)} call whose streaming tier is
* {@code stateless} (per {@code tools/codegen/meos-ops-manifest.json}
* or {@code meos-ops-free-manifest.json}) as a Flink
* {@link MapFunction}. No per-key state is allocated; each event is
* mapped independently. The wrapped call:
*
* <ul>
* <li>does no MEOS-handle state across events (per the
* {@code stateless} tier contract);</li>
* <li>does not touch the time domain (no window required);</li>
* <li>may delegate to MEOS via the bundled JMEOS jar when
* {@code MeosOpsRuntime.MEOS_AVAILABLE} — otherwise the
* generated facade throws {@code UnsupportedOperationException}.</li>
* </ul>
*
* <p><b>Typical usage</b>: register a stateless MEOS predicate / arithmetic
* call as a per-event map step in a DataStream pipeline. Example with
* the generated {@code MeosOpsTBox.overlaps_tbox_tbox} (tier =
* {@code stateless}, per the codegen manifest):
*
* <pre>{@code
* DataStream<TbiePair> in = ...; // (tboxA, tboxB)
* DataStream<Boolean> overlap = in.map(
* new MeosStatelessMap<>(
* pair -> MeosOpsTBox.overlaps_tbox_tbox(pair.a, pair.b)));
* }</pre>
*
* <p><b>Tier coverage</b>: as of the codegen state on the parent PR,
* 804 of the 2,097 generated methods are {@code stateless} (92 OO-
* classified + 712 free-fn). Any of those can be wrapped through this
* single class without per-method boilerplate.
*
* <p><b>Coexistence with {@code berlinmod.MEOSBridge}</b>: this is the
* <i>low-level catalog-shaped</i> wiring; {@code MEOSBridge} stays as
* the <i>high-level query-shaped</i> wiring for the BerlinMOD-9 suite.
* Both share the same {@code MeosOpsRuntime.MEOS_AVAILABLE} discipline.
*
* @param <IN> the input record type
* @param <OUT> the output type returned by the wrapped MEOS call
*/
public final class MeosStatelessMap<IN, OUT> extends RichMapFunction<IN, OUT> {

/**
* Serializable per-event MEOS call. Implementations forward to a
* generated {@code MeosOps*.f(...)} static method, returning the
* Java type that the generated facade exposes.
*/
@FunctionalInterface
public interface MeosCall<IN, OUT> extends Serializable {
OUT apply(IN event) throws Exception;
}

private final MeosCall<IN, OUT> call;

/**
* @param call serializable lambda forwarding to a stateless
* generated MEOS facade method. The lambda must be
* serializable (Java 8+ lambdas implementing a
* {@link Serializable} functional interface are).
*/
public MeosStatelessMap(MeosCall<IN, OUT> call) {
this.call = call;
}

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// No per-key state in the stateless tier; the only per-operator
// concern is MEOS' per-thread session, initialized on this task thread.
MeosWiringRuntime.ensureInitializedOnThread();
}

@Override
public OUT map(IN event) throws Exception {
// When chained to a legacy source, records are processed on the source's
// emitter thread rather than the thread open() ran on; the ThreadLocal
// guard makes this a cheap no-op after the first call per thread.
MeosWiringRuntime.ensureInitializedOnThread();
return call.apply(event);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package org.mobilitydb.flink.meos.wirings;

import functions.GeneratedFunctions;

/**
* Per-thread MEOS initialization for the {@code org.mobilitydb.flink.meos.wirings}
* operators.
*
* <p>MEOS keeps its timezone / session state per OS thread. Each Flink
* subtask runs on its own task thread, so every wiring operator must
* initialize MEOS on that thread from its {@code open()} — the JVM-wide
* probe in {@code MeosOpsRuntime} only covers the thread that first
* touches a facade class (typically the job's main thread), not the task
* threads where the operators actually run.
*
* <p>{@link #ensureInitializedOnThread()} is idempotent per thread (guarded
* by a {@link ThreadLocal}), so it is safe to call from every operator's
* {@code open()} even when operators are chained onto the same thread. It
* installs a no-op error handler so a MEOS-side error surfaces as a thrown
* exception rather than terminating the JVM.
*/
public final class MeosWiringRuntime {

private static final ThreadLocal<Boolean> INITIALIZED =
ThreadLocal.withInitial(() -> Boolean.FALSE);

private MeosWiringRuntime() { /* utility */ }

/** Initialize MEOS on the calling thread exactly once. */
public static void ensureInitializedOnThread() {
if (!INITIALIZED.get()) {
GeneratedFunctions.meos_initialize_error_handler((level, code, message) -> { });
GeneratedFunctions.meos_initialize();
INITIALIZED.set(Boolean.TRUE);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# DataStream wirings for the generated MEOS facades

This package supplies thin, generic Flink-DataStream wrappers around
the generated `org.mobilitydb.flink.meos.MeosOps*` facades, organized
per **streaming tier** (per
`tools/codegen/meos-ops-manifest.json` + `tools/codegen/meos-ops-free-manifest.json`):

| Tier | Wiring class(es) here | Status in this package |
|---|---|---|
| `stateless` | [`MeosStatelessMap`](MeosStatelessMap.java) (generic `MapFunction`) · [`MeosStatelessFilter`](MeosStatelessFilter.java) (generic `FilterFunction`) | ✅ shipped |
| `bounded-state` | `MeosBoundedStateMap` (generic `KeyedProcessFunction` with `ValueState<Pointer>` per key) | next follow-up |
| `windowed` | `MeosWindowedAggregate` (generic `ProcessWindowFunction`) | next follow-up |
| `cross-stream` | `MeosCrossStreamJoin` (generic `KeyedCoProcessFunction` or interval-join) | next follow-up |
| `io-meta` | covered transitively by the stateless wirings (no state, no window) | n/a |
| `sequence-only` | inherently non-streamable — no wiring | n/a |

The wirings are **generic**: each takes a serializable lambda
forwarding to whichever generated `MeosOps*.f(...)` method the adopter
needs. No per-method boilerplate, no per-method registration —
adopters wire the entire ~800-method `stateless` slice through
`MeosStatelessMap` / `MeosStatelessFilter` without touching this
package.

## Why DataStream rather than Table API

The repo's existing pipeline (`berlinmod/`, `aisdata/`) is
DataStream-API only. Sticking to DataStream avoids adding the
~50 MB `flink-table-planner` runtime dependency to the build matrix.
A Table-API-shaped sibling
(`MeosOpsTableCatalogRegistrar` / `MeosScalarUDF` / `MeosAggregateFunction`)
is a clean follow-up if/when the repo adopts Table API for other
reasons.

## How a generated MEOS call becomes a Flink operator

The pattern is the same across all four tiers:

```java
// 1. Pick the generated MeosOps method
// (Javadoc tier marker tells you which wiring to use)
boolean overlap = MeosOpsTBox.overlaps_tbox_tbox(boxA, boxB); // tier = stateless

// 2. Wrap with the matching wiring
MeosStatelessFilter<TboxPair> filter = MeosStatelessFilter.fromIntPredicate(
pair -> MeosOpsTBox.overlaps_tbox_tbox(pair.a, pair.b));

// 3. Apply to the DataStream
DataStream<TboxPair> overlapping = stream.filter(filter);
```

`MEOS_AVAILABLE` is probed once per JVM by `MeosOpsRuntime`'s static
initializer (shared across all `MeosOps*` and `MeosOpsFree*`
facades). When unavailable, every generated method throws
`UnsupportedOperationException` with a clear message — the wiring
layer doesn't have to handle that itself.

## End-to-end runnable demo

[`demo/MeosWiringsDemoJob.java`](demo/MeosWiringsDemoJob.java) walks
through a 3-stage DataStream pipeline using two of the generated
facades wired through `MeosStatelessMap` + `MeosStatelessFilter`:

1. Parse a stream of TBox WKT strings via
`MeosOpsFreeCore.tbox_in` (io-meta, no state).
2. Filter to those overlapping a fixed query box via
`MeosOpsTBox.overlaps_tbox_tbox` (stateless predicate).
3. Serialize each survivor to hex-WKB via
`MeosOpsTBox.tbox_as_hexwkb` (io-meta, no state).

Run with:

```bash
mvn -q exec:java \
-Dexec.mainClass=org.mobilitydb.flink.meos.wirings.demo.MeosWiringsDemoJob \
-Dmobilityflink.meos.enabled=true
```

Output (expected): two `overlapping-tbox-hex` lines (the two input
boxes that overlap the query box), one disjoint box dropped, one
`MeosWirings stateless tier demo` job completion line.

## Coexistence with `berlinmod.MEOSBridge`

`MEOSBridge.java` is the BerlinMOD-specific, hand-written bridge for
the 9-query streaming-form parity matrix — high-level and
query-shaped. The wirings here are low-level and catalog-shaped —
applicable to any of the ~800 stateless or 800 bounded-state
generated facade methods, not just the BerlinMOD-9 subset. Both
share the same `MEOS_AVAILABLE` discipline (`MeosOpsRuntime`).
Loading