Skip to content

feat(berlinmod): full BerlinMOD-9 × 3-form parity matrix on Kafka Streams (27/27 cells)#1

Open
estebanzimanyi wants to merge 1 commit into
MobilityDB:mainfrom
estebanzimanyi:feat/berlinmod-q1-scaffold
Open

feat(berlinmod): full BerlinMOD-9 × 3-form parity matrix on Kafka Streams (27/27 cells)#1
estebanzimanyi wants to merge 1 commit into
MobilityDB:mainfrom
estebanzimanyi:feat/berlinmod-q1-scaffold

Conversation

@estebanzimanyi

@estebanzimanyi estebanzimanyi commented May 20, 2026

Copy link
Copy Markdown
Member

Draft. BerlinMOD-9 × 3-form parity-matrix work on MobilityKafka (Kafka-Streams runtime). 27 of 27 cells = 100 % of the MobilityKafka parity-matrix row, matching MobilityFlink #3.

Coverage

Q Topic Continuous Windowed Snapshot
Q1 "which vehicles have appeared?"
Q2 "where is vehicle X at time T?"
Q3 "vehicles within d of P at time T?"
Q4 "vehicles entered region R?"
Q5 "pairs of vehicles meeting near P"
Q6 "cumulative distance per vehicle"
Q7 "first passage through POIs"
Q8 "vehicles close to road segment"
Q9 "X-Y distance at time T"

Module contents

kafka-streams-app/ — Maven project (Java 21, Kafka Streams 3.6.0).

Per-form processors (27): Q<N>ContinuousProcessor, Q<N>WindowedProcessor, Q<N>SnapshotProcessor for N ∈ {1..9}. Each Q-Windowed/Q-Snapshot processor schedules a PunctuationType.STREAM_TIME callback at the form-appropriate interval (10 000 ms for windowed, 5 000 ms for snapshot).

Shared infrastructure: BerlinMODTrip (data class), BerlinMODTripSerde (JSON Serde), Haversine (great-circle distance, m), SegmentDistance (planar projection point-to-segment, m), PointOfInterest (Q7 record), BerlinMODTopology (unified topology fanning to per-Q-form output topics), BerlinMODQ1LocalTest (TopologyTestDriver-based local driver).

Output counts (TopologyTestDriver, sorted-event-time corpus + dual sentinels)

Q Continuous Windowed Snapshot
Q1 3 2 13
Q2 7 2 4
Q3 21 2 9
Q4 1 2 5
Q5 19 2 4
Q6 21 6 13
Q7 3 6 13
Q8 21 2 9
Q9 13 2 4

Two windows close in the test ([T0+0, T0+10000) and [T0+10000, T0+20000)); Q1/Q2/Q3/Q4/Q5/Q8/Q9 emit 1 line per closed window; Q6/Q7 emit 1 line per vehicle per closed window.

Snapshot semantics vs MobilityFlink #3

Kafka Streams' STREAM_TIME punctuator fires on stream-time advance with state-at-fire-moment, and coalesces multi-interval stream-time jumps into a single fire at the current stream-time. Flink's bounded-source watermark jumps to +∞ at source-close and flushes all keyed timers with final state.

The LocalTest pipes two sentinel records (at t=T0+15001 and t=T0+20001) to step the punctuator through the desired tick boundaries one at a time. The first event's initial-catchup fire also produces a tick at T=T0 — a Kafka Streams runtime feature with no Flink-bounded-source analogue.

State patterns covered

Pattern Cells
Stateless filter / predicate Q2-c, Q3-c, Q8-c
Single keyed flag Q1-c, Q1-s, Q4-c
Single-key state Q2-s, Q9-s
Per-vehicle accumulator Q6-c, Q6-s
Per-(vehicle, POI) keyed state Q7-c, Q7-s
Cross-vehicle shared via selectKey(0) Q5-c, Q3-s, Q5-s, Q7-s, Q8-s
Paired shared via selectKey(0) Q9-c, Q4-s, Q9-s
winStart-keyed CSV-vehicleId set Q1-w, Q3-w, Q8-w
winStart-keyed encoded last-known per vehicle Q2-w, Q5-w, Q9-w
winStart-keyed per-vehicle accumulator Q6-w
winStart-keyed per-vehicle entries log Q4-w, Q7-w

Bump-isolation

Zero MEOS surface anywhere:

  • No JMEOS jar dependency
  • No libmeos.so linkage
  • No PyMEOS-side code
  • No file collision with the in-flight MEOS 1.4 bump's scope

Spatial predicates use pure-Java Haversine / planar SegmentDistance / point-in-box, marked TODO(meos) at each call site for JMEOS-bridge migration after the bump settles.

Build verification

mvn clean test
... Tests run: 1, Failures: 0, Errors: 0, Skipped: 0
... BUILD SUCCESS

The full 27-cell matrix runs as a JUnit 5 regression test,
berlinmod.BerlinMODFullMatrixTest (in src/test/java), which asserts the
per-Q<N>-form output-record count for every continuous / windowed / snapshot
topic. kafka-streams-test-utils is test-scoped, so it never enters the
shaded JAR.

Single squashed commit, no AI attribution, target/ ignored by .gitignore.

Companion artefacts

@estebanzimanyi estebanzimanyi force-pushed the feat/berlinmod-q1-scaffold branch 3 times, most recently from 44a2fb2 to 3ec6d58 Compare May 20, 2026 21:32
@estebanzimanyi estebanzimanyi changed the title feat(berlinmod-q1): scaffold the Q1 continuous-form cell on Kafka Streams feat(berlinmod): scaffold Q1 + Q2 + Q3 continuous-form cells on Kafka Streams (3/27 cells) May 20, 2026
@estebanzimanyi estebanzimanyi force-pushed the feat/berlinmod-q1-scaffold branch from 3ec6d58 to 8e702c4 Compare May 20, 2026 21:40
@estebanzimanyi estebanzimanyi changed the title feat(berlinmod): scaffold Q1 + Q2 + Q3 continuous-form cells on Kafka Streams (3/27 cells) feat(berlinmod): Q1..Q9 continuous-form cells on Kafka Streams (9/27 cells) May 20, 2026
@estebanzimanyi estebanzimanyi force-pushed the feat/berlinmod-q1-scaffold branch 2 times, most recently from 9369347 to 85d8de6 Compare May 20, 2026 22:03
@estebanzimanyi estebanzimanyi changed the title feat(berlinmod): Q1..Q9 continuous-form cells on Kafka Streams (9/27 cells) feat(berlinmod): Q1..Q9 continuous + Q1..Q9 snapshot cells on Kafka Streams (18/27 cells) May 20, 2026
@estebanzimanyi estebanzimanyi force-pushed the feat/berlinmod-q1-scaffold branch from 85d8de6 to 97a8a66 Compare May 20, 2026 22:10
@estebanzimanyi estebanzimanyi changed the title feat(berlinmod): Q1..Q9 continuous + Q1..Q9 snapshot cells on Kafka Streams (18/27 cells) feat(berlinmod): Q1..Q9 continuous + Q1/Q3/Q8 windowed + Q1..Q9 snapshot on Kafka Streams (21/27 cells) May 20, 2026
@estebanzimanyi estebanzimanyi force-pushed the feat/berlinmod-q1-scaffold branch from 97a8a66 to 4ce7999 Compare May 21, 2026 05:42
@estebanzimanyi estebanzimanyi changed the title feat(berlinmod): Q1..Q9 continuous + Q1/Q3/Q8 windowed + Q1..Q9 snapshot on Kafka Streams (21/27 cells) feat(berlinmod): full BerlinMOD-9 × 3-form parity matrix on Kafka Streams (27/27 cells) May 21, 2026
@estebanzimanyi estebanzimanyi force-pushed the feat/berlinmod-q1-scaffold branch from 4ce7999 to 9fa2096 Compare May 21, 2026 06:38
@estebanzimanyi

Copy link
Copy Markdown
Member Author

Consolidated into #13 (consolidate/kafka-streaming-implementation).

@estebanzimanyi estebanzimanyi marked this pull request as ready for review June 10, 2026 15:22
Comment thread kafka-streams-app/pom.xml

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it would be better to update the versions of slf4j, jackson and junit to newer versions

kafka-streams-test-utils dependency should be in a test scope in order not to pollute the JAR : <scope>test</scope>

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Bumped slf4j 1.7.30 → 2.0.16, jackson 2.14.3 → 2.18.2, junit 5.8.2 → 5.11.4, and moved kafka-streams-test-utils to test scope (verified absent from the shaded JAR).

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class seems poorly named
The name suggests that it only tests the Q1 when it actually tests every query
Maybe a better name would be something like BerlinMODFullMatrixTest

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. Renamed to BerlinMODFullMatrixTest and moved it into src/test/java as a real JUnit 5 test that asserts the output-record count for all 27 cells (every continuous / windowed / snapshot topic), run via mvn test. It is no longer the shaded-JAR main class.

@Ar-mie Ar-mie left a comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made some comments on the pom.xml and the BerlinMODQ1LocalTest.java files

…eams (27/27)

All 27 cells of the BerlinMOD-9 × 3-form parity matrix on MobilityKafka,
matching MobilityFlink MobilityDB#3's coverage on the Kafka-Streams runtime.

Continuous form (9): per-event emission via record-by-record dispatch.
Windowed form (9):   STREAM_TIME punctuator at WINDOW_SIZE_MILLIS
                     (10_000 ms) emitting closed windows.
Snapshot form (9):   STREAM_TIME punctuator at SNAPSHOT_TICK_MILLIS
                     (5_000 ms) emitting per-tick state.

State patterns:
  - Stateless filter (Q2-c, Q3-c, Q8-c)
  - Single keyed flag (Q1-c, Q1-s, Q4-c)
  - Single-key state (Q2-s, Q9-s)
  - Per-vehicle accumulator (Q6-c, Q6-s)
  - Per-(vehicle, POI) keyed state (Q7-c, Q7-s)
  - Cross-vehicle shared via selectKey(0) (Q5-c, Q3-s, Q5-s, Q7-s, Q8-s)
  - Paired shared via selectKey(0) (Q9-c, Q4-s, Q9-s)
  - winStart-keyed comma-separated vehicleId set (Q1-w, Q3-w, Q8-w)
  - winStart-keyed encoded last-known per vehicle (Q2-w, Q5-w, Q9-w)
  - winStart-keyed per-vehicle accumulator (Q6-w)
  - winStart-keyed per-vehicle entries log (Q4-w, Q7-w)

Output counts (TopologyTestDriver, 21-event sorted-by-event-time
corpus + 2 sentinel records at t=15001 and t=20001):

  Q  | continuous | windowed | snapshot
  ---|------------|----------|---------
  Q1 |          3 |        2 |       13
  Q2 |          7 |        2 |        4
  Q3 |         21 |        2 |        9
  Q4 |          1 |        2 |        5
  Q5 |         19 |        2 |        4
  Q6 |         21 |        6 |       13
  Q7 |          3 |        6 |       13
  Q8 |         21 |        2 |        9
  Q9 |         13 |        2 |        4

All 9 windowed cells emit 2 lines per Q (Q1/Q2/Q3/Q4/Q5/Q8/Q9) or 6
lines per Q (Q6/Q7 — one per vehicle × 2 windows for per-vehicle
outputs). Continuous and snapshot count differences vs MobilityFlink
reflect Kafka Streams' STREAM_TIME punctuator semantics: fires on
stream-time advance with state-at-fire-moment, multi-interval jumps
coalesced; vs Flink's bounded source flushing all keyed timers with
final state at +infty. Dual-sentinel pattern in LocalTest steps the
punctuator through the desired tick boundaries.

Bump-isolation: zero JMEOS calls, zero MEOS-C calls, zero PyMEOS
dependency. Pure Kafka Streams + Jackson + Java. Spatial predicates
use Haversine / SegmentDistance / point-in-box, marked TODO(meos)
for JMEOS-bridge migration.
@estebanzimanyi estebanzimanyi force-pushed the feat/berlinmod-q1-scaffold branch from 346999a to 5e64f44 Compare June 11, 2026 17:55
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants