-
Notifications
You must be signed in to change notification settings - Fork 2
feat(berlinmod): streaming throughput benchmark on the canonical BerlinMOD corpus (stacks on #14) #15
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Ar-mie
merged 2 commits into
MobilityDB:consolidate/kafka-meos-integration
from
estebanzimanyi:consolidate/kafka-benchmark
Jun 19, 2026
+3,389
−194
Merged
feat(berlinmod): streaming throughput benchmark on the canonical BerlinMOD corpus (stacks on #14) #15
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,167 @@ | ||
| #!/usr/bin/env bash | ||
| # | ||
| # build-jmeos.sh — build the JMEOS jar and the native libmeos.so from source and | ||
| # install them locally, so the repository never has to carry the binaries. | ||
| # | ||
| # This is the downstream generation chain for the JVM streaming tools: | ||
| # | ||
| # MobilityDB/MEOS (deliverable PRs) -> JMEOS (FFI facade + jar) -> MobilityKafka | ||
| # | ||
| # (The MEOS-API meos-idl.json step is pre-materialized in the JMEOS branch's | ||
| # committed codegen/input/meos-idl.json, so this script only has to build the | ||
| # two endpoints.) | ||
| # | ||
| # What it does: | ||
| # 1. Clones MobilityDB at the pinned ref and builds libmeos.so (cmake -DMEOS=ON). | ||
| # 2. Clones JMEOS at the pinned ref, drops libmeos.so in, and builds JMEOS.jar. | ||
| # 3. Registers the jar in the local Maven repository via | ||
| # `mvn install:install-file` under the coordinates the kafka-streams-app | ||
| # pom depends on (com.mobilitydb:jmeos:1.4.0 by default). | ||
| # 4. Copies libmeos.so into kafka-streams-app/lib/ for the test/runtime | ||
| # LD_LIBRARY_PATH. | ||
| # | ||
| # After running this once, `cd kafka-streams-app && mvn test` resolves JMEOS as | ||
| # an ordinary dependency — no committed jar/so required. | ||
| # | ||
| # The refs below point at the DELIVERABLE pull requests that provide the surface | ||
| # this project consumes — NOT at an ecosystem-pin tag. A pin is the ecosystem's | ||
| # benchmark / evidence vehicle, not a source of truth, so a binding must never | ||
| # depend on one. The dependency is expressed as the PRs' immutable head commit | ||
| # SHAs (overridable env vars): | ||
| # * MobilityDB PR #1148 — the *_tgeoarr_tgeoarr set-set spatial-join MEOS symbols. | ||
| # * JMEOS PR #25 — the org.mobilitydb.meos facade incl. MeosSetSetJoin. | ||
| # A head SHA is immutable: a later rebase/force-push of either PR creates a NEW | ||
| # SHA and leaves the pinned one unchanged. Once both PRs merge upstream, repoint | ||
| # MOBILITYDB_REF -> MobilityDB/MobilityDB master and JMEOS_REF -> MobilityDB/JMEOS | ||
| # main (or a release tag) — that is the only change needed. | ||
| # | ||
| set -euo pipefail | ||
|
|
||
| # --------------------------------------------------------------------------- | ||
| # Pinned sources (override any of these via the environment). | ||
| # --------------------------------------------------------------------------- | ||
| # MobilityDB PR #1148 (set-set spatial join, rebased onto #1162 clean/geo so the | ||
| # set-set join inherits the geodetic-disjoint surface) — immutable head SHA. | ||
| MOBILITYDB_REPO="${MOBILITYDB_REPO:-https://github.com/estebanzimanyi/MobilityDB.git}" | ||
| MOBILITYDB_REF="${MOBILITYDB_REF:-fab7025b24f9b2b45db26d6bd0a3118058d5b55c}" # PR #1148 head | ||
|
|
||
| # JMEOS PR #25 (org.mobilitydb.meos facade + MeosSetSetJoin) — immutable head SHA. | ||
| JMEOS_REPO="${JMEOS_REPO:-https://github.com/estebanzimanyi/JMEOS.git}" | ||
| JMEOS_REF="${JMEOS_REF:-f921d8608f18574a5a824b837af1a6b82c985fc2}" # PR #25 head | ||
|
|
||
| # Maven coordinates the jar is installed under (must match kafka-streams-app/pom.xml). | ||
| JMEOS_GROUP_ID="${JMEOS_GROUP_ID:-com.mobilitydb}" | ||
| JMEOS_ARTIFACT_ID="${JMEOS_ARTIFACT_ID:-jmeos}" | ||
| JMEOS_VERSION="${JMEOS_VERSION:-1.4.0}" | ||
|
|
||
| # --------------------------------------------------------------------------- | ||
| # Layout. | ||
| # --------------------------------------------------------------------------- | ||
| SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" | ||
| APP_DIR="${SCRIPT_DIR}/kafka-streams-app" | ||
| WORK_DIR="${WORK_DIR:-${SCRIPT_DIR}/.build-jmeos}" | ||
| JOBS="${JOBS:-$(nproc 2>/dev/null || echo 4)}" | ||
|
|
||
| log() { printf '\n\033[1;34m==>\033[0m %s\n' "$*"; } | ||
|
|
||
| # --------------------------------------------------------------------------- | ||
| # Preconditions. | ||
| # --------------------------------------------------------------------------- | ||
| for tool in git cmake make mvn; do | ||
| command -v "$tool" >/dev/null 2>&1 || { echo "error: '$tool' is required but not on PATH" >&2; exit 1; } | ||
| done | ||
|
|
||
| mkdir -p "${WORK_DIR}" | ||
|
|
||
| # clone_at <repo-url> <ref> <dest> | ||
| # Clones (or reuses) <dest> and checks out the exact <ref>. <ref> may be a tag, | ||
| # branch or commit SHA; PR-head SHAs are fetched from the pull ref namespace if | ||
| # they are not reachable from the default branches. | ||
| clone_at() { | ||
| local repo="$1" ref="$2" dest="$3" | ||
| if [ ! -d "${dest}/.git" ]; then | ||
| log "Cloning ${repo}" | ||
| git clone "${repo}" "${dest}" | ||
| fi | ||
| git -C "${dest}" fetch --quiet --tags origin | ||
| if ! git -C "${dest}" cat-file -e "${ref}^{commit}" 2>/dev/null; then | ||
| # SHA only reachable through an open PR — fetch every PR head, then retry. | ||
| git -C "${dest}" fetch --quiet origin '+refs/pull/*/head:refs/remotes/origin/pr/*' || true | ||
| fi | ||
| log "Checking out ${ref}" | ||
| git -C "${dest}" -c advice.detachedHead=false checkout --quiet "${ref}" | ||
| } | ||
|
|
||
| # --------------------------------------------------------------------------- | ||
| # 1. Build libmeos.so from MobilityDB. | ||
| # --------------------------------------------------------------------------- | ||
| MDB_DIR="${WORK_DIR}/MobilityDB" | ||
| clone_at "${MOBILITYDB_REPO}" "${MOBILITYDB_REF}" "${MDB_DIR}" | ||
|
|
||
| # Enable the MEOS type families the streaming app exercises: circular buffers, | ||
| # network points (default ON) and geoposes (which auto-enables rigid geometries). | ||
| # The facade smoke tests link these symbols, so they must be in libmeos.so. | ||
| # H3 and POINTCLOUD are left OFF — the app does not use them and they require | ||
| # extra system libraries (libh3, libpointcloud); enable them via MEOS_CMAKE_ARGS | ||
| # if a downstream consumer ever needs them. | ||
| MEOS_CMAKE_ARGS="${MEOS_CMAKE_ARGS:--DCBUFFER=ON -DNPOINT=ON -DPOSE=ON}" | ||
|
|
||
| log "Building libmeos.so (MEOS=ON ${MEOS_CMAKE_ARGS})" | ||
| rm -rf "${MDB_DIR}/build" | ||
| cmake -S "${MDB_DIR}" -B "${MDB_DIR}/build" -DMEOS=ON ${MEOS_CMAKE_ARGS} >/dev/null | ||
| cmake --build "${MDB_DIR}/build" --target meos -j "${JOBS}" | ||
|
|
||
| LIBMEOS_SO="$(find "${MDB_DIR}/build" -name 'libmeos.so' -print -quit)" | ||
| [ -n "${LIBMEOS_SO}" ] || { echo "error: libmeos.so not produced by the MEOS build" >&2; exit 1; } | ||
| log "Built ${LIBMEOS_SO}" | ||
|
|
||
| # --------------------------------------------------------------------------- | ||
| # 2. Build JMEOS.jar against that libmeos.so. | ||
| # --------------------------------------------------------------------------- | ||
| JMEOS_DIR="${WORK_DIR}/JMEOS" | ||
| clone_at "${JMEOS_REPO}" "${JMEOS_REF}" "${JMEOS_DIR}" | ||
|
|
||
| # JMEOS' build bundles src/libmeos.so into the jar and JarLibraryLoader extracts it. | ||
| cp -f "${LIBMEOS_SO}" "${JMEOS_DIR}/jmeos-core/src/libmeos.so" | ||
|
|
||
| log "Building JMEOS.jar" | ||
| # FunctionsGenerator lives in the codegen module, which jmeos-core does not | ||
| # declare as a Maven dependency — so '-am' will not build it. Compile it first | ||
| # so jmeos-core's build-time facade generation can find it. Use | ||
| # 'maven.test.skip' (not 'skipTests'): the jmeos-core pom hardcodes | ||
| # <skipTests>false</skipTests>, which overrides -DskipTests but not this. | ||
| mvn -f "${JMEOS_DIR}/pom.xml" -q -pl codegen compile | ||
| mvn -f "${JMEOS_DIR}/pom.xml" -q -pl jmeos-core -am -Dmaven.test.skip=true package | ||
|
|
||
| JMEOS_JAR="${JMEOS_DIR}/jar/JMEOS.jar" | ||
| [ -f "${JMEOS_JAR}" ] || { echo "error: ${JMEOS_JAR} was not produced" >&2; exit 1; } | ||
|
|
||
| # --------------------------------------------------------------------------- | ||
| # 3. Install the jar into the local Maven repository. | ||
| # --------------------------------------------------------------------------- | ||
| log "Installing ${JMEOS_GROUP_ID}:${JMEOS_ARTIFACT_ID}:${JMEOS_VERSION} into the local Maven repo" | ||
| mvn -q install:install-file \ | ||
| -Dfile="${JMEOS_JAR}" \ | ||
| -DgroupId="${JMEOS_GROUP_ID}" \ | ||
| -DartifactId="${JMEOS_ARTIFACT_ID}" \ | ||
| -Dversion="${JMEOS_VERSION}" \ | ||
| -Dpackaging=jar | ||
|
|
||
| # --------------------------------------------------------------------------- | ||
| # 4. Stage libmeos.so for the kafka-streams-app runtime (LD_LIBRARY_PATH). | ||
| # --------------------------------------------------------------------------- | ||
| mkdir -p "${APP_DIR}/lib" | ||
| cp -f "${LIBMEOS_SO}" "${APP_DIR}/lib/libmeos.so" | ||
|
|
||
| log "Done." | ||
| cat <<EOF | ||
|
|
||
| Installed jar : ${JMEOS_GROUP_ID}:${JMEOS_ARTIFACT_ID}:${JMEOS_VERSION} | ||
| Native library: ${APP_DIR}/lib/libmeos.so | ||
|
|
||
| Build and test the app with: | ||
|
|
||
| cd ${APP_DIR} | ||
| mvn test | ||
|
|
||
| EOF |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,100 @@ | ||
| # BerlinMOD streaming benchmark (Kafka Streams) | ||
|
|
||
| The Kafka-Streams counterpart of the MobilityFlink BerlinMOD benchmark covers | ||
| the 27 BerlinMOD-9 × 3-form cells (Q1–Q9 × continuous / windowed / snapshot) | ||
| with two harnesses, both over the same corpus and corpus-derived parameters, and | ||
| with the spatial predicates evaluating through MEOS (see | ||
| [`MEOSBridge`](../src/main/java/berlinmod/MEOSBridge.java)): | ||
|
|
||
| - [`BerlinMODBenchmark`](../src/main/java/berlinmod/BerlinMODBenchmark.java) runs | ||
| each cell in isolation through a [`TopologyTestDriver`](../src/main/java/berlinmod/BerlinMODTopology.java) | ||
| and reads its output cardinality — an in-process **correctness** harness. | ||
| - [`EmbeddedBrokerBenchmark`](../src/test/java/berlinmod/EmbeddedBrokerBenchmark.java) | ||
| runs each cell as a real [`KafkaStreams`](../src/main/java/berlinmod/BerlinMODTopology.java) | ||
| application against an in-process `EmbeddedKafkaCluster` (a genuine | ||
| `KafkaServer` over the loopback network) — the **throughput** harness, the | ||
| Flink-comparable analog of MobilityFlink's per-cell jobs. | ||
|
|
||
| ## Corpus and parameters (regular with MobilityFlink) | ||
|
|
||
| The corpus is the real BerlinMOD instants (`--csv`, reprojected EPSG:3857→4326 | ||
| through MEOS by [`BerlinMODCorpus`](../src/main/java/berlinmod/BerlinMODCorpus.java)) | ||
| or a synthetic corpus. The per-query parameters (point `P`, region box, road | ||
| segment, points of interest, target vehicle ids) **and** the window/tick | ||
| granularity are derived from the corpus via `BerlinMODCorpus.derive` — the same | ||
| mechanism MobilityFlink uses — and threaded through `BerlinMODTopology.build(Params)`, | ||
| so the topology auto-scales to the corpus span instead of carrying a fixed | ||
| window/tick. The two stream bindings share one mechanism. | ||
|
|
||
| ## Throughput harness | ||
|
|
||
| Each cell runs against its own fresh `EmbeddedKafkaCluster`, the true analog of | ||
| MobilityFlink's independent per-cell jobs: the corpus is produced once into the | ||
| input topic, the cell runs as a single-threaded `KafkaStreams` application, and | ||
| throughput is the events consumed divided by the wall-clock from streams start | ||
| until the application has read the whole input topic (its own | ||
| `records-consumed-total` metric reaches the input end offset) and its output has | ||
| gone idle. The trailing settle time is excluded from the wall; each consumed | ||
| record runs the cell's MEOS predicate, so this is the steady-state per-event | ||
| processing rate, directly comparable to the MobilityFlink figures. | ||
|
|
||
| Run from `kafka-streams-app/` after `../build-jmeos.sh` (which installs JMEOS into | ||
| the local Maven repository and stages `libmeos.so` under `lib/`). The test-scope | ||
| classpath that `mvn` reconstructs already carries JMEOS and the embedded broker, so | ||
| no jar is referenced by path: | ||
|
|
||
| ``` | ||
| CP=$(mvn -q dependency:build-classpath -DincludeScope=test -Dmdep.outputFile=/dev/stdout | tail -1) | ||
| LD_LIBRARY_PATH=lib java -Dorg.slf4j.simpleLogger.defaultLogLevel=warn \ | ||
| -cp target/classes:target/test-classes:$CP \ | ||
| berlinmod.EmbeddedBrokerBenchmark --csv <berlinmod_instants.csv> [--max N] [--only Q3-continuous] | ||
| ``` | ||
|
|
||
| ## Figures | ||
|
|
||
| Real BerlinMOD corpus (216,075 instants, 5 vehicles, ~11 days, EPSG:3857), | ||
| single-broker `EmbeddedKafkaCluster`, one stream thread per cell, Java 21, | ||
| 16-core host; libmeos built with `-DMEOS/CBUFFER/NPOINT/POSE/RGEO=ON`. | ||
|
|
||
| | Cell | Events in | Output rows | Wall (ms) | Throughput (ev/s) | | ||
| |---|---:|---:|---:|---:| | ||
| | Q1-continuous | 216075 | 5 | 1899 | 113,785 | | ||
| | Q1-snapshot | 216075 | 703 | 1701 | 127,029 | | ||
| | Q1-windowed | 216075 | 86 | 1650 | 130,956 | | ||
| | Q2-continuous | 216075 | 61170 | 1289 | 167,631 | | ||
| | Q2-snapshot | 216075 | 140 | 1683 | 128,388 | | ||
| | Q2-windowed | 216075 | 50 | 1704 | 126,806 | | ||
| | Q3-continuous | 216075 | 216075 | 4618 | 46,790 | | ||
| | Q3-snapshot | 216075 | 97 | 2607 | 82,883 | | ||
| | Q3-windowed | 216075 | 50 | 4188 | 51,594 | | ||
| | Q4-continuous | 216075 | 62 | 9356 | 23,095 | | ||
| | Q4-snapshot | 216075 | 4685 | 10499 | 20,581 | | ||
| | Q4-windowed | 216075 | 98 | 9741 | 22,182 | | ||
| | Q5-continuous | 216075 | 60577 | 22889 | 9,440 | | ||
| | Q5-snapshot | 216075 | 34 | 2006 | 107,715 | | ||
| | Q5-windowed | 216075 | 6 | 2896 | 74,612 | | ||
| | Q6-continuous | 216075 | 216075 | 8708 | 24,814 | | ||
| | Q6-snapshot | 216075 | 698 | 7258 | 29,771 | | ||
| | Q6-windowed | 216075 | 203 | 6268 | 34,473 | | ||
| | Q7-continuous | 216075 | 5 | 4321 | 50,006 | | ||
| | Q7-snapshot | 216075 | 632 | 7487 | 28,860 | | ||
| | Q7-windowed | 216075 | 53 | 11625 | 18,587 | | ||
| | Q8-continuous | 216075 | 216075 | 4807 | 44,950 | | ||
| | Q8-snapshot | 216075 | 281 | 2825 | 76,487 | | ||
| | Q8-windowed | 216075 | 77 | 5946 | 36,340 | | ||
| | Q9-continuous | 216075 | 107870 | 4561 | 47,375 | | ||
| | Q9-snapshot | 216075 | 140 | 2281 | 94,729 | | ||
| | Q9-windowed | 216075 | 22 | 2384 | 90,636 | | ||
|
|
||
| The per-event MEOS-predicate cells that emit one row per input event | ||
| (Q3/Q8/Q9-continuous, 216,075 output rows through `edwithin_tgeo_geo` / | ||
| `eintersects_tgeo_geo`) sustain 45,000–47,000 ev/s. In the cross-platform | ||
| streaming catalog these cells run below MobilityFlink's in-JVM mini-cluster on | ||
| the same corpus, because Kafka Streams routes every record through the broker; | ||
| the per-cell shape matches across both engines. Q5-continuous is the O(V²) | ||
| all-pairs-meeting outlier (9,440 ev/s). Non-spatial cells (Q1/Q2) reach | ||
| 110,000–168,000 ev/s. The `TopologyTestDriver` correctness harness | ||
| ([`BerlinMODBenchmark`](../src/main/java/berlinmod/BerlinMODBenchmark.java)) | ||
| reports output cardinality and correctness, not throughput: its per-event | ||
| bookkeeping dominates wall-clock, so a no-predicate cell already runs far below a | ||
| real broker's rate. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This file references
jar/JMEOS.jardirectly but should either use~/.m2or reconstruct the classpath viamvnThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
docs/benchmark.mdno longer referencesjar/JMEOS.jarby path. The run command reconstructs the classpath viamvn dependency:build-classpath— which now resolves JMEOS from the local Maven repo (~/.m2/.../com/mobilitydb/jmeos/1.4.0/jmeos-1.4.0.jar, installed bybuild-jmeos.sh) — so the-cpis justtarget/classes:target/test-classes:$CP, andLD_LIBRARY_PATH=libpoints at thelibmeos.sothe script stages. Verified the documented command loads JMEOS and starts the benchmark (no committed jar).