Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
# Maven build artefacts
target/

# JMEOS native dependency — generated by build-jmeos.sh, never committed
.build-jmeos/
kafka-streams-app/jar/
kafka-streams-app/lib/
*.jar
*.so

# IDE
.idea/
.vscode/
Expand Down
18 changes: 18 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,24 @@ Spatial predicates today use pure-Java great-circle (`Haversine`) and planar seg

## Build and run

### MEOS native dependency

The spatial predicates route through MEOS via the [JMEOS](https://github.com/MobilityDB/JMEOS)
bridge, so the build needs the JMEOS jar and the native `libmeos.so`. Neither is
committed to this repository — generate them from source with the helper script,
which clones MobilityDB and JMEOS at pinned, immutable refs, builds `libmeos.so`,
builds the jar, installs the jar into the local Maven repository, and stages
`libmeos.so` for the runtime:

```
./build-jmeos.sh
```

Run it once (re-run it only to bump the pinned MobilityDB/JMEOS refs at the top of
the script). After it succeeds, JMEOS resolves as an ordinary Maven dependency.

### Build the app

```
cd kafka-streams-app
mvn -q clean package -DskipTests
Expand Down
167 changes: 167 additions & 0 deletions build-jmeos.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
#!/usr/bin/env bash
#
# build-jmeos.sh — build the JMEOS jar and the native libmeos.so from source and
# install them locally, so the repository never has to carry the binaries.
#
# This is the downstream generation chain for the JVM streaming tools:
#
# MobilityDB/MEOS (deliverable PRs) -> JMEOS (FFI facade + jar) -> MobilityKafka
#
# (The MEOS-API meos-idl.json step is pre-materialized in the JMEOS branch's
# committed codegen/input/meos-idl.json, so this script only has to build the
# two endpoints.)
#
# What it does:
# 1. Clones MobilityDB at the pinned ref and builds libmeos.so (cmake -DMEOS=ON).
# 2. Clones JMEOS at the pinned ref, drops libmeos.so in, and builds JMEOS.jar.
# 3. Registers the jar in the local Maven repository via
# `mvn install:install-file` under the coordinates the kafka-streams-app
# pom depends on (com.mobilitydb:jmeos:1.4.0 by default).
# 4. Copies libmeos.so into kafka-streams-app/lib/ for the test/runtime
# LD_LIBRARY_PATH.
#
# After running this once, `cd kafka-streams-app && mvn test` resolves JMEOS as
# an ordinary dependency — no committed jar/so required.
#
# The refs below point at the DELIVERABLE pull requests that provide the surface
# this project consumes — NOT at an ecosystem-pin tag. A pin is the ecosystem's
# benchmark / evidence vehicle, not a source of truth, so a binding must never
# depend on one. The dependency is expressed as the PRs' immutable head commit
# SHAs (overridable env vars):
# * MobilityDB PR #1148 — the *_tgeoarr_tgeoarr set-set spatial-join MEOS symbols.
# * JMEOS PR #25 — the org.mobilitydb.meos facade incl. MeosSetSetJoin.
# A head SHA is immutable: a later rebase/force-push of either PR creates a NEW
# SHA and leaves the pinned one unchanged. Once both PRs merge upstream, repoint
# MOBILITYDB_REF -> MobilityDB/MobilityDB master and JMEOS_REF -> MobilityDB/JMEOS
# main (or a release tag) — that is the only change needed.
#
set -euo pipefail

# ---------------------------------------------------------------------------
# Pinned sources (override any of these via the environment).
# ---------------------------------------------------------------------------
# MobilityDB PR #1148 (set-set spatial join, rebased onto #1162 clean/geo so the
# set-set join inherits the geodetic-disjoint surface) — immutable head SHA.
MOBILITYDB_REPO="${MOBILITYDB_REPO:-https://github.com/estebanzimanyi/MobilityDB.git}"
MOBILITYDB_REF="${MOBILITYDB_REF:-fab7025b24f9b2b45db26d6bd0a3118058d5b55c}" # PR #1148 head

# JMEOS PR #25 (org.mobilitydb.meos facade + MeosSetSetJoin) — immutable head SHA.
JMEOS_REPO="${JMEOS_REPO:-https://github.com/estebanzimanyi/JMEOS.git}"
JMEOS_REF="${JMEOS_REF:-f921d8608f18574a5a824b837af1a6b82c985fc2}" # PR #25 head

# Maven coordinates the jar is installed under (must match kafka-streams-app/pom.xml).
JMEOS_GROUP_ID="${JMEOS_GROUP_ID:-com.mobilitydb}"
JMEOS_ARTIFACT_ID="${JMEOS_ARTIFACT_ID:-jmeos}"
JMEOS_VERSION="${JMEOS_VERSION:-1.4.0}"

# ---------------------------------------------------------------------------
# Layout.
# ---------------------------------------------------------------------------
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
APP_DIR="${SCRIPT_DIR}/kafka-streams-app"
WORK_DIR="${WORK_DIR:-${SCRIPT_DIR}/.build-jmeos}"
JOBS="${JOBS:-$(nproc 2>/dev/null || echo 4)}"

log() { printf '\n\033[1;34m==>\033[0m %s\n' "$*"; }

# ---------------------------------------------------------------------------
# Preconditions.
# ---------------------------------------------------------------------------
for tool in git cmake make mvn; do
command -v "$tool" >/dev/null 2>&1 || { echo "error: '$tool' is required but not on PATH" >&2; exit 1; }
done

mkdir -p "${WORK_DIR}"

# clone_at <repo-url> <ref> <dest>
# Clones (or reuses) <dest> and checks out the exact <ref>. <ref> may be a tag,
# branch or commit SHA; PR-head SHAs are fetched from the pull ref namespace if
# they are not reachable from the default branches.
clone_at() {
local repo="$1" ref="$2" dest="$3"
if [ ! -d "${dest}/.git" ]; then
log "Cloning ${repo}"
git clone "${repo}" "${dest}"
fi
git -C "${dest}" fetch --quiet --tags origin
if ! git -C "${dest}" cat-file -e "${ref}^{commit}" 2>/dev/null; then
# SHA only reachable through an open PR — fetch every PR head, then retry.
git -C "${dest}" fetch --quiet origin '+refs/pull/*/head:refs/remotes/origin/pr/*' || true
fi
log "Checking out ${ref}"
git -C "${dest}" -c advice.detachedHead=false checkout --quiet "${ref}"
}

# ---------------------------------------------------------------------------
# 1. Build libmeos.so from MobilityDB.
# ---------------------------------------------------------------------------
MDB_DIR="${WORK_DIR}/MobilityDB"
clone_at "${MOBILITYDB_REPO}" "${MOBILITYDB_REF}" "${MDB_DIR}"

# Enable the MEOS type families the streaming app exercises: circular buffers,
# network points (default ON) and geoposes (which auto-enables rigid geometries).
# The facade smoke tests link these symbols, so they must be in libmeos.so.
# H3 and POINTCLOUD are left OFF — the app does not use them and they require
# extra system libraries (libh3, libpointcloud); enable them via MEOS_CMAKE_ARGS
# if a downstream consumer ever needs them.
MEOS_CMAKE_ARGS="${MEOS_CMAKE_ARGS:--DCBUFFER=ON -DNPOINT=ON -DPOSE=ON}"

log "Building libmeos.so (MEOS=ON ${MEOS_CMAKE_ARGS})"
rm -rf "${MDB_DIR}/build"
cmake -S "${MDB_DIR}" -B "${MDB_DIR}/build" -DMEOS=ON ${MEOS_CMAKE_ARGS} >/dev/null
cmake --build "${MDB_DIR}/build" --target meos -j "${JOBS}"

LIBMEOS_SO="$(find "${MDB_DIR}/build" -name 'libmeos.so' -print -quit)"
[ -n "${LIBMEOS_SO}" ] || { echo "error: libmeos.so not produced by the MEOS build" >&2; exit 1; }
log "Built ${LIBMEOS_SO}"

# ---------------------------------------------------------------------------
# 2. Build JMEOS.jar against that libmeos.so.
# ---------------------------------------------------------------------------
JMEOS_DIR="${WORK_DIR}/JMEOS"
clone_at "${JMEOS_REPO}" "${JMEOS_REF}" "${JMEOS_DIR}"

# JMEOS' build bundles src/libmeos.so into the jar and JarLibraryLoader extracts it.
cp -f "${LIBMEOS_SO}" "${JMEOS_DIR}/jmeos-core/src/libmeos.so"

log "Building JMEOS.jar"
# FunctionsGenerator lives in the codegen module, which jmeos-core does not
# declare as a Maven dependency — so '-am' will not build it. Compile it first
# so jmeos-core's build-time facade generation can find it. Use
# 'maven.test.skip' (not 'skipTests'): the jmeos-core pom hardcodes
# <skipTests>false</skipTests>, which overrides -DskipTests but not this.
mvn -f "${JMEOS_DIR}/pom.xml" -q -pl codegen compile
mvn -f "${JMEOS_DIR}/pom.xml" -q -pl jmeos-core -am -Dmaven.test.skip=true package

JMEOS_JAR="${JMEOS_DIR}/jar/JMEOS.jar"
[ -f "${JMEOS_JAR}" ] || { echo "error: ${JMEOS_JAR} was not produced" >&2; exit 1; }

# ---------------------------------------------------------------------------
# 3. Install the jar into the local Maven repository.
# ---------------------------------------------------------------------------
log "Installing ${JMEOS_GROUP_ID}:${JMEOS_ARTIFACT_ID}:${JMEOS_VERSION} into the local Maven repo"
mvn -q install:install-file \
-Dfile="${JMEOS_JAR}" \
-DgroupId="${JMEOS_GROUP_ID}" \
-DartifactId="${JMEOS_ARTIFACT_ID}" \
-Dversion="${JMEOS_VERSION}" \
-Dpackaging=jar

# ---------------------------------------------------------------------------
# 4. Stage libmeos.so for the kafka-streams-app runtime (LD_LIBRARY_PATH).
# ---------------------------------------------------------------------------
mkdir -p "${APP_DIR}/lib"
cp -f "${LIBMEOS_SO}" "${APP_DIR}/lib/libmeos.so"

log "Done."
cat <<EOF

Installed jar : ${JMEOS_GROUP_ID}:${JMEOS_ARTIFACT_ID}:${JMEOS_VERSION}
Native library: ${APP_DIR}/lib/libmeos.so

Build and test the app with:

cd ${APP_DIR}
mvn test

EOF
100 changes: 100 additions & 0 deletions kafka-streams-app/docs/benchmark.md

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file references jar/JMEOS.jar directly but should either use ~/.m2 or reconstruct the classpath via mvn

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. docs/benchmark.md no longer references jar/JMEOS.jar by path. The run command reconstructs the classpath via mvn dependency:build-classpath — which now resolves JMEOS from the local Maven repo (~/.m2/.../com/mobilitydb/jmeos/1.4.0/jmeos-1.4.0.jar, installed by build-jmeos.sh) — so the -cp is just target/classes:target/test-classes:$CP, and LD_LIBRARY_PATH=lib points at the libmeos.so the script stages. Verified the documented command loads JMEOS and starts the benchmark (no committed jar).

Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
# BerlinMOD streaming benchmark (Kafka Streams)

The Kafka-Streams counterpart of the MobilityFlink BerlinMOD benchmark covers
the 27 BerlinMOD-9 × 3-form cells (Q1–Q9 × continuous / windowed / snapshot)
with two harnesses, both over the same corpus and corpus-derived parameters, and
with the spatial predicates evaluating through MEOS (see
[`MEOSBridge`](../src/main/java/berlinmod/MEOSBridge.java)):

- [`BerlinMODBenchmark`](../src/main/java/berlinmod/BerlinMODBenchmark.java) runs
each cell in isolation through a [`TopologyTestDriver`](../src/main/java/berlinmod/BerlinMODTopology.java)
and reads its output cardinality — an in-process **correctness** harness.
- [`EmbeddedBrokerBenchmark`](../src/test/java/berlinmod/EmbeddedBrokerBenchmark.java)
runs each cell as a real [`KafkaStreams`](../src/main/java/berlinmod/BerlinMODTopology.java)
application against an in-process `EmbeddedKafkaCluster` (a genuine
`KafkaServer` over the loopback network) — the **throughput** harness, the
Flink-comparable analog of MobilityFlink's per-cell jobs.

## Corpus and parameters (regular with MobilityFlink)

The corpus is the real BerlinMOD instants (`--csv`, reprojected EPSG:3857→4326
through MEOS by [`BerlinMODCorpus`](../src/main/java/berlinmod/BerlinMODCorpus.java))
or a synthetic corpus. The per-query parameters (point `P`, region box, road
segment, points of interest, target vehicle ids) **and** the window/tick
granularity are derived from the corpus via `BerlinMODCorpus.derive` — the same
mechanism MobilityFlink uses — and threaded through `BerlinMODTopology.build(Params)`,
so the topology auto-scales to the corpus span instead of carrying a fixed
window/tick. The two stream bindings share one mechanism.

## Throughput harness

Each cell runs against its own fresh `EmbeddedKafkaCluster`, the true analog of
MobilityFlink's independent per-cell jobs: the corpus is produced once into the
input topic, the cell runs as a single-threaded `KafkaStreams` application, and
throughput is the events consumed divided by the wall-clock from streams start
until the application has read the whole input topic (its own
`records-consumed-total` metric reaches the input end offset) and its output has
gone idle. The trailing settle time is excluded from the wall; each consumed
record runs the cell's MEOS predicate, so this is the steady-state per-event
processing rate, directly comparable to the MobilityFlink figures.

Run from `kafka-streams-app/` after `../build-jmeos.sh` (which installs JMEOS into
the local Maven repository and stages `libmeos.so` under `lib/`). The test-scope
classpath that `mvn` reconstructs already carries JMEOS and the embedded broker, so
no jar is referenced by path:

```
CP=$(mvn -q dependency:build-classpath -DincludeScope=test -Dmdep.outputFile=/dev/stdout | tail -1)
LD_LIBRARY_PATH=lib java -Dorg.slf4j.simpleLogger.defaultLogLevel=warn \
-cp target/classes:target/test-classes:$CP \
berlinmod.EmbeddedBrokerBenchmark --csv <berlinmod_instants.csv> [--max N] [--only Q3-continuous]
```

## Figures

Real BerlinMOD corpus (216,075 instants, 5 vehicles, ~11 days, EPSG:3857),
single-broker `EmbeddedKafkaCluster`, one stream thread per cell, Java 21,
16-core host; libmeos built with `-DMEOS/CBUFFER/NPOINT/POSE/RGEO=ON`.

| Cell | Events in | Output rows | Wall (ms) | Throughput (ev/s) |
|---|---:|---:|---:|---:|
| Q1-continuous | 216075 | 5 | 1899 | 113,785 |
| Q1-snapshot | 216075 | 703 | 1701 | 127,029 |
| Q1-windowed | 216075 | 86 | 1650 | 130,956 |
| Q2-continuous | 216075 | 61170 | 1289 | 167,631 |
| Q2-snapshot | 216075 | 140 | 1683 | 128,388 |
| Q2-windowed | 216075 | 50 | 1704 | 126,806 |
| Q3-continuous | 216075 | 216075 | 4618 | 46,790 |
| Q3-snapshot | 216075 | 97 | 2607 | 82,883 |
| Q3-windowed | 216075 | 50 | 4188 | 51,594 |
| Q4-continuous | 216075 | 62 | 9356 | 23,095 |
| Q4-snapshot | 216075 | 4685 | 10499 | 20,581 |
| Q4-windowed | 216075 | 98 | 9741 | 22,182 |
| Q5-continuous | 216075 | 60577 | 22889 | 9,440 |
| Q5-snapshot | 216075 | 34 | 2006 | 107,715 |
| Q5-windowed | 216075 | 6 | 2896 | 74,612 |
| Q6-continuous | 216075 | 216075 | 8708 | 24,814 |
| Q6-snapshot | 216075 | 698 | 7258 | 29,771 |
| Q6-windowed | 216075 | 203 | 6268 | 34,473 |
| Q7-continuous | 216075 | 5 | 4321 | 50,006 |
| Q7-snapshot | 216075 | 632 | 7487 | 28,860 |
| Q7-windowed | 216075 | 53 | 11625 | 18,587 |
| Q8-continuous | 216075 | 216075 | 4807 | 44,950 |
| Q8-snapshot | 216075 | 281 | 2825 | 76,487 |
| Q8-windowed | 216075 | 77 | 5946 | 36,340 |
| Q9-continuous | 216075 | 107870 | 4561 | 47,375 |
| Q9-snapshot | 216075 | 140 | 2281 | 94,729 |
| Q9-windowed | 216075 | 22 | 2384 | 90,636 |

The per-event MEOS-predicate cells that emit one row per input event
(Q3/Q8/Q9-continuous, 216,075 output rows through `edwithin_tgeo_geo` /
`eintersects_tgeo_geo`) sustain 45,000–47,000 ev/s. In the cross-platform
streaming catalog these cells run below MobilityFlink's in-JVM mini-cluster on
the same corpus, because Kafka Streams routes every record through the broker;
the per-cell shape matches across both engines. Q5-continuous is the O(V²)
all-pairs-meeting outlier (9,440 ev/s). Non-spatial cells (Q1/Q2) reach
110,000–168,000 ev/s. The `TopologyTestDriver` correctness harness
([`BerlinMODBenchmark`](../src/main/java/berlinmod/BerlinMODBenchmark.java))
reports output cardinality and correctness, not throughput: its per-event
bookkeeping dominates wall-clock, so a no-predicate cell already runs far below a
real broker's rate.
65 changes: 65 additions & 0 deletions kafka-streams-app/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,26 @@
<jackson.version>2.18.2</jackson.version>
<slf4j.version>2.0.16</slf4j.version>
<junit.version>5.11.4</junit.version>
<meos.enabled>true</meos.enabled>
<meos.lib.dir>${project.basedir}/lib</meos.lib.dir>
</properties>

<dependencies>
<!--
JMEOS is consumed as an ordinary local-repository dependency. It is
not on Maven Central; install it (and the native libmeos.so) from
source by running ../build-jmeos.sh once before building this module.
-->
<dependency>
<groupId>com.mobilitydb</groupId>
<artifactId>jmeos</artifactId>
<version>1.4.0</version>
</dependency>
<dependency>
<groupId>com.github.jnr</groupId>
<artifactId>jnr-ffi</artifactId>
<version>2.1.10</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
Expand Down Expand Up @@ -61,6 +78,44 @@
<version>${junit.version}</version>
<scope>test</scope>
</dependency>

<!-- Embedded Kafka broker for the runtime throughput benchmark. -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.13</artifactId>
<version>${kafka.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.13</artifactId>
<version>${kafka.version}</version>
<type>test-jar</type><classifier>test</classifier><scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${kafka.version}</version>
<type>test-jar</type><classifier>test</classifier><scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
<type>test-jar</type><classifier>test</classifier><scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-server-common</artifactId>
<version>${kafka.version}</version>
<type>test-jar</type><classifier>test</classifier><scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest</artifactId>
<version>2.2</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand All @@ -76,6 +131,16 @@
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.2.5</version>
<configuration>
<reuseForks>false</reuseForks>
<systemPropertyVariables>
<meos.enabled>${meos.enabled}</meos.enabled>
</systemPropertyVariables>
<environmentVariables>
<LD_LIBRARY_PATH>${meos.lib.dir}</LD_LIBRARY_PATH>
</environmentVariables>
<argLine>--add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED</argLine>
</configuration>
</plugin>
<plugin>
<artifactId>maven-shade-plugin</artifactId>
Expand Down
Loading