feat(berlinmod): full BerlinMOD-9 × 3-form parity matrix on Kafka Streams (27/27 cells) #1
Consolidated into #13 (consolidate/kafka-streaming-implementation).
Maybe it would be better to update slf4j, jackson and junit to newer versions.
The kafka-streams-test-utils dependency should be in test scope so that it does not pollute the JAR: <scope>test</scope>
Done. Bumped slf4j 1.7.30 → 2.0.16, jackson 2.14.3 → 2.18.2, junit 5.8.2 → 5.11.4, and moved kafka-streams-test-utils to test scope (verified absent from the shaded JAR).
This class seems poorly named.
The name suggests that it only tests Q1, when it actually tests every query.
Maybe a better name would be something like BerlinMODFullMatrixTest.
Agreed. Renamed to BerlinMODFullMatrixTest and moved it into src/test/java as a real JUnit 5 test that asserts the output-record count for all 27 cells (every continuous / windowed / snapshot topic), run via mvn test. It is no longer the shaded-JAR main class.
Ar-mie
left a comment
I made some comments on the pom.xml and the BerlinMODQ1LocalTest.java files
…eams (27/27)

All 27 cells of the BerlinMOD-9 × 3-form parity matrix on MobilityKafka, matching MobilityFlink MobilityDB#3's coverage on the Kafka-Streams runtime.

Continuous form (9): per-event emission via record-by-record dispatch.
Windowed form (9): STREAM_TIME punctuator at WINDOW_SIZE_MILLIS (10_000 ms) emitting closed windows.
Snapshot form (9): STREAM_TIME punctuator at SNAPSHOT_TICK_MILLIS (5_000 ms) emitting per-tick state.

State patterns:
- Stateless filter (Q2-c, Q3-c, Q8-c)
- Single keyed flag (Q1-c, Q1-s, Q4-c)
- Single-key state (Q2-s, Q9-s)
- Per-vehicle accumulator (Q6-c, Q6-s)
- Per-(vehicle, POI) keyed state (Q7-c, Q7-s)
- Cross-vehicle shared via selectKey(0) (Q5-c, Q3-s, Q5-s, Q7-s, Q8-s)
- Paired shared via selectKey(0) (Q9-c, Q4-s, Q9-s)
- winStart-keyed comma-separated vehicleId set (Q1-w, Q3-w, Q8-w)
- winStart-keyed encoded last-known per vehicle (Q2-w, Q5-w, Q9-w)
- winStart-keyed per-vehicle accumulator (Q6-w)
- winStart-keyed per-vehicle entries log (Q4-w, Q7-w)

Output counts (TopologyTestDriver, 21-event sorted-by-event-time corpus + 2 sentinel records at t=15001 and t=20001):

Q | continuous | windowed | snapshot
---|------------|----------|---------
Q1 | 3 | 2 | 13
Q2 | 7 | 2 | 4
Q3 | 21 | 2 | 9
Q4 | 1 | 2 | 5
Q5 | 19 | 2 | 4
Q6 | 21 | 6 | 13
Q7 | 3 | 6 | 13
Q8 | 21 | 2 | 9
Q9 | 13 | 2 | 4

All 9 windowed cells emit 2 lines per Q (Q1/Q2/Q3/Q4/Q5/Q8/Q9) or 6 lines per Q (Q6/Q7: one per vehicle × 2 windows for per-vehicle outputs).

Continuous and snapshot count differences vs MobilityFlink reflect Kafka Streams' STREAM_TIME punctuator semantics: fires on stream-time advance with state-at-fire-moment, multi-interval jumps coalesced; vs Flink's bounded source flushing all keyed timers with final state at +infty. Dual-sentinel pattern in LocalTest steps the punctuator through the desired tick boundaries.

Bump-isolation: zero JMEOS calls, zero MEOS-C calls, zero PyMEOS dependency. Pure Kafka Streams + Jackson + Java.
Spatial predicates use Haversine / SegmentDistance / point-in-box, marked TODO(meos) for JMEOS-bridge migration.
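For readers unfamiliar with the three pure-Java spatial predicates named in the commit message, here is a minimal sketch of what such helpers could look like. It is not the PR's code: the class name GeoSketch, the method names, the 6 371 000 m Earth radius, and the equirectangular projection used for the point-to-segment distance are all assumptions made for illustration.

```java
// Hypothetical stand-in for the PR's Haversine / SegmentDistance / point-in-box helpers.
final class GeoSketch {
    // Mean Earth radius in metres (assumed value; the PR may use a different constant).
    static final double EARTH_RADIUS_M = 6_371_000.0;

    /** Great-circle distance in metres between two (lat, lon) points given in degrees. */
    static double haversineMeters(double lat1, double lon1, double lat2, double lon2) {
        double dLat = Math.toRadians(lat2 - lat1);
        double dLon = Math.toRadians(lon2 - lon1);
        double a = Math.sin(dLat / 2) * Math.sin(dLat / 2)
                + Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2))
                * Math.sin(dLon / 2) * Math.sin(dLon / 2);
        // Clamp guards against rounding pushing sqrt(a) slightly above 1.
        return 2 * EARTH_RADIUS_M * Math.asin(Math.min(1.0, Math.sqrt(a)));
    }

    /** Inclusive axis-aligned bounding-box test in degrees. */
    static boolean inBox(double lat, double lon,
                         double minLat, double minLon, double maxLat, double maxLon) {
        return lat >= minLat && lat <= maxLat && lon >= minLon && lon <= maxLon;
    }

    /**
     * Planar point-to-segment distance in metres. The segment endpoints a and b are
     * projected onto a local plane centred on the query point p (equirectangular
     * approximation, an assumption of this sketch), then the usual clamped
     * projection onto the segment is applied.
     */
    static double pointToSegmentMeters(double pLat, double pLon,
                                       double aLat, double aLon,
                                       double bLat, double bLon) {
        double cos = Math.cos(Math.toRadians(pLat));
        double ax = EARTH_RADIUS_M * Math.toRadians(aLon - pLon) * cos;
        double ay = EARTH_RADIUS_M * Math.toRadians(aLat - pLat);
        double bx = EARTH_RADIUS_M * Math.toRadians(bLon - pLon) * cos;
        double by = EARTH_RADIUS_M * Math.toRadians(bLat - pLat);
        double dx = bx - ax;
        double dy = by - ay;
        double len2 = dx * dx + dy * dy;
        // p is the origin of the local plane, so the projection parameter is -(a . d) / |d|^2.
        double t = len2 == 0.0 ? 0.0 : Math.max(0.0, Math.min(1.0, -(ax * dx + ay * dy) / len2));
        return Math.hypot(ax + t * dx, ay + t * dy);
    }

    public static void main(String[] args) {
        System.out.println(haversineMeters(52.0, 13.0, 53.0, 13.0));
        System.out.println(inBox(52.52, 13.40, 52.3, 13.0, 52.7, 13.8));
        System.out.println(pointToSegmentMeters(53.0, 13.05, 52.0, 13.0, 52.0, 13.1));
    }
}
```

The planar projection is only reasonable over short distances, which is presumably why the call sites are marked TODO(meos) for later migration.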
Draft. BerlinMOD-9 × 3-form parity-matrix work on MobilityKafka (Kafka-Streams runtime). 27 of 27 cells = 100 % of the MobilityKafka parity-matrix row, matching MobilityFlink #3.
Coverage
Module contents
kafka-streams-app/: Maven project (Java 21, Kafka Streams 3.6.0).
Per-form processors (27): Q<N>ContinuousProcessor, Q<N>WindowedProcessor, Q<N>SnapshotProcessor for N ∈ {1..9}. Each Q-Windowed/Q-Snapshot processor schedules a PunctuationType.STREAM_TIME callback at the form-appropriate interval (10 000 ms for windowed, 5 000 ms for snapshot).
Shared infrastructure: BerlinMODTrip (data class), BerlinMODTripSerde (JSON Serde), Haversine (great-circle distance, m), SegmentDistance (planar projection point-to-segment, m), PointOfInterest (Q7 record), BerlinMODTopology (unified topology fanning to per-Q-form output topics), BerlinMODQ1LocalTest (TopologyTestDriver-based local driver).
Output counts (TopologyTestDriver, sorted-event-time corpus + dual sentinels)
Two windows close in the test ([T0+0, T0+10000) and [T0+10000, T0+20000)); Q1/Q2/Q3/Q4/Q5/Q8/Q9 emit 1 line per closed window; Q6/Q7 emit 1 line per vehicle per closed window.
Snapshot semantics vs MobilityFlink #3
Kafka Streams' STREAM_TIME punctuator fires on stream-time advance with state-at-fire-moment, and coalesces multi-interval stream-time jumps into a single fire at the current stream-time. Flink's bounded-source watermark jumps to +∞ at source-close and flushes all keyed timers with final state.
The LocalTest pipes two sentinel records (at t=T0+15001 and t=T0+20001) to step the punctuator through the desired tick boundaries one at a time. The first event's initial-catchup fire also produces a tick at T=T0, a Kafka Streams runtime feature with no Flink-bounded-source analogue.
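The coalescing behaviour described above can be illustrated with a small pure-Java model. This is a sketch, not Kafka Streams code: StreamTimePunctuationModel and fireTimes are hypothetical names, T0 is taken as 0 for readability, and the scheduling rule (first schedule aligned to 0, next due time skipping any missed intervals) is my reading of the semantics this description states rather than something taken from the PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a STREAM_TIME punctuator; it does not use any Kafka API.
final class StreamTimePunctuationModel {

    /**
     * Returns the stream-time values at which the modelled punctuator fires
     * when the given record timestamps are piped in order.
     */
    static List<Long> fireTimes(long intervalMs, long[] recordTimestamps) {
        List<Long> fires = new ArrayList<>();
        long nextDue = 0L;                  // assumed: the first schedule is aligned to 0
        long streamTime = Long.MIN_VALUE;   // stream time only moves forward
        for (long ts : recordTimestamps) {
            streamTime = Math.max(streamTime, ts);
            if (nextDue <= streamTime) {
                // One fire at the current stream time, however many intervals were skipped.
                fires.add(streamTime);
                long missed = (streamTime - nextDue) / intervalMs;
                nextDue += (missed + 1) * intervalMs;
            }
        }
        return fires;
    }

    public static void main(String[] args) {
        // A jump from 4000 straight to 15001 produces a single coalesced fire.
        System.out.println(fireTimes(5_000L, new long[] {0, 1_000, 4_000, 15_001, 20_001}));
        // Stepping just past each boundary produces one fire per tick.
        System.out.println(fireTimes(5_000L, new long[] {0, 4_000, 5_001, 10_001, 15_001, 20_001}));
    }
}
```

In this model the first record always triggers a fire, which corresponds to the initial-catchup tick at T=T0, and a record that skips several intervals yields one fire instead of one per interval. That is why the sentinels have to land just past each boundary the test wants to observe.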
State patterns covered
Cross-vehicle shared state and paired shared state both go via selectKey(0).
Bump-isolation
Zero MEOS surface anywhere: no libmeos.so linkage.
Spatial predicates use pure-Java Haversine / planar SegmentDistance / point-in-box, marked TODO(meos) at each call site for JMEOS-bridge migration after the bump settles.
Build verification
The full 27-cell matrix runs as a JUnit 5 regression test, berlinmod.BerlinMODFullMatrixTest (in src/test/java), which asserts the per-Q<N>-form output-record count for every continuous / windowed / snapshot topic. kafka-streams-test-utils is test-scoped, so it never enters the shaded JAR.
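As an illustration of what a per-cell count assertion involves, the sketch below encodes the 27 expected counts from the commit message as a table and reports any cell that differs. It is not the actual BerlinMODFullMatrixTest: ExpectedCounts and mismatches are hypothetical names, and the sketch deliberately avoids JUnit and TopologyTestDriver so that it stays self-contained.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; the real test reads counts from the per-Q-form output topics.
final class ExpectedCounts {
    static final String[] FORMS = {"continuous", "windowed", "snapshot"};

    // Rows are Q1..Q9, columns follow FORMS; values copied from the commit-message table.
    static final int[][] EXPECTED = {
        {3, 2, 13},
        {7, 2, 4},
        {21, 2, 9},
        {1, 2, 5},
        {19, 2, 4},
        {21, 6, 13},
        {3, 6, 13},
        {21, 2, 9},
        {13, 2, 4},
    };

    /** Returns one message per cell whose observed count differs from EXPECTED. */
    static List<String> mismatches(int[][] actual) {
        List<String> out = new ArrayList<>();
        for (int q = 0; q < EXPECTED.length; q++) {
            for (int f = 0; f < FORMS.length; f++) {
                if (actual[q][f] != EXPECTED[q][f]) {
                    out.add("Q" + (q + 1) + "-" + FORMS[f]
                            + ": expected " + EXPECTED[q][f] + ", got " + actual[q][f]);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Comparing the table with itself reports no mismatches.
        System.out.println(mismatches(EXPECTED));
    }
}
```

Collecting every mismatch before failing, rather than stopping at the first one, makes it easier to see whether a regression touches a single cell or a whole form.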
Single squashed commit, no AI attribution, target/ ignored by .gitignore.
Companion artefacts