Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified flink-processor/jar/JMEOS.jar
Binary file not shown.
Binary file added flink-processor/lib/libmeos.so
Binary file not shown.
107 changes: 22 additions & 85 deletions flink-processor/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@
<kafka.version>3.2.0</kafka.version>
<log4j.version>2.17.2</log4j.version>
<target.java.version>21</target.java.version>
<!-- MEOS smoke tests run when libmeos is present; the runner resolves it
through the repo lib via LD_LIBRARY_PATH (avoids a stale system
/usr/local/lib/libmeos.so shadowing the pinned one). -->
<meos.enabled>true</meos.enabled>
<meos.lib.dir>${project.basedir}/lib</meos.lib.dir>
</properties>

<dependencies>
Expand Down Expand Up @@ -214,6 +219,23 @@
</execution>
</executions>
</plugin>
<!-- Run the MEOS smoke tests against the pinned libmeos: propagate
meos.enabled to the forked JVM and resolve libmeos from the repo
lib dir (LD_LIBRARY_PATH takes precedence over a stale system lib). -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.2.5</version>
<configuration>
<systemPropertyVariables>
<meos.enabled>${meos.enabled}</meos.enabled>
</systemPropertyVariables>
<environmentVariables>
<LD_LIBRARY_PATH>${meos.lib.dir}</LD_LIBRARY_PATH>
</environmentVariables>
<argLine>--add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED</argLine>
</configuration>
</plugin>
</plugins>
</build>

Expand All @@ -226,29 +248,6 @@
both). When a family is excluded, its generated MeosOps* facade sources
and its smoke test are dropped from the build. -->
<profiles>
<profile>
<id>cbuffer-exclude-unset</id>
<activation>
<property><name>!CBUFFER</name></property>
</activation>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<excludes combine.children="append">
<exclude>**/meos/MeosOpsTCbuffer.java</exclude>
<exclude>**/meos/MeosOpsFreeCbuffer.java</exclude>
<exclude>**/meos/MeosOpsCbufferSet.java</exclude>
</excludes>
<testExcludes combine.children="append">
<testExclude>**/MeosCbufferSmokeTest.java</testExclude>
</testExcludes>
</configuration>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>cbuffer-exclude-off</id>
<activation>
Expand Down Expand Up @@ -295,29 +294,6 @@
</plugins>
</build>
</profile>
<profile>
<id>pose-exclude-unset</id>
<activation>
<property><name>!POSE</name></property>
</activation>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<excludes combine.children="append">
<exclude>**/meos/MeosOpsTPose.java</exclude>
<exclude>**/meos/MeosOpsFreePose.java</exclude>
<exclude>**/meos/MeosOpsPoseSet.java</exclude>
</excludes>
<testExcludes combine.children="append">
<testExclude>**/MeosPoseSmokeTest.java</testExclude>
</testExcludes>
</configuration>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>pose-exclude-off</id>
<activation>
Expand Down Expand Up @@ -364,26 +340,6 @@
</plugins>
</build>
</profile>
<profile>
<id>rgeo-exclude-unset</id>
<activation>
<property><name>!RGEO</name></property>
</activation>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<excludes combine.children="append">
<exclude>**/meos/MeosOpsTRGeometry.java</exclude>
<exclude>**/meos/MeosOpsTRGeometryInst.java</exclude>
<exclude>**/meos/MeosOpsFreeRgeo.java</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>rgeo-exclude-off</id>
<activation>
Expand Down Expand Up @@ -424,25 +380,6 @@
</plugins>
</build>
</profile>
<profile>
<id>h3-exclude-unset</id>
<activation>
<property><name>!H3</name></property>
</activation>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<excludes combine.children="append">
<exclude>**/meos/MeosOpsTh3index.java</exclude>
<exclude>**/meos/MeosOpsFreeH3.java</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>h3-exclude-off</id>
<activation>
Expand Down
120 changes: 120 additions & 0 deletions flink-processor/src/test/java/berlinmod/BerlinMODSetSetJoinTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*****************************************************************************
*
* This MobilityDB code is provided under The PostgreSQL License.
* Copyright (c) 2020-2026, Université libre de Bruxelles and MobilityDB
* contributors
*
* Permission to use, copy, modify, and distribute this software and its
* documentation for any purpose, without fee, and without a written
* agreement is hereby granted, provided that the above copyright notice and
* this paragraph and the following two paragraphs appear in all copies.
*
* IN NO EVENT SHALL UNIVERSITE LIBRE DE BRUXELLES BE LIABLE TO ANY PARTY FOR
* DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
* LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION,
* EVEN IF UNIVERSITE LIBRE DE BRUXELLES HAS BEEN ADVISED OF THE POSSIBILITY
* OF SUCH DAMAGE.
*
* UNIVERSITE LIBRE DE BRUXELLES SPECIFICALLY DISCLAIMS ANY WARRANTIES,
* INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
* AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS ON
* AN "AS IS" BASIS, AND UNIVERSITE LIBRE DE BRUXELLES HAS NO OBLIGATIONS TO
* PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
*
*****************************************************************************/
package berlinmod;

import functions.GeneratedFunctions;
import jnr.ffi.Pointer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
import org.mobilitydb.meos.MeosSetSetJoin;

import java.util.HashSet;
import java.util.Set;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

/**
* Verifies the BerlinMOD trip-level NxN spatial join (the kernel-pruned
* {@link MeosSetSetJoin} set-set family) against an independent per-pair scalar
* baseline ({@code edwithin_tgeo_tgeo} / {@code eintersects_tgeo_tgeo}). The two
* code paths must agree exactly on which trip pairs ever meet / are always
* disjoint. Runs only with {@code -Dmeos.enabled=true} and an extended libmeos
* on the library path.
*/
@EnabledIfSystemProperty(named = "meos.enabled", matches = "true")
class BerlinMODSetSetJoinTest {

// Four trajectory trips: T1 crosses T0's path mid-window; T3 coincides with
// T0; T2 is far from everything.
private static final String[] TRIPS = {
"[POINT(0 0)@2000-01-01, POINT(10 0)@2000-01-02]",
"[POINT(5 -100)@2000-01-01, POINT(5 100)@2000-01-02]",
"[POINT(100 100)@2000-01-01, POINT(110 100)@2000-01-02]",
"[POINT(0 0)@2000-01-01, POINT(10 0)@2000-01-02]",
};
private static final double MEET_DIST = 1.0;

private static Pointer[] trips;

@BeforeAll
static void init() {
GeneratedFunctions.meos_initialize_error_handler((level, code, message) -> { });
GeneratedFunctions.meos_initialize();
trips = new Pointer[TRIPS.length];
for (int i = 0; i < TRIPS.length; i++) trips[i] = GeneratedFunctions.tgeompoint_in(TRIPS[i]);
}

@AfterAll
static void fini() {
GeneratedFunctions.meos_finalize();
}

private static Set<Long> pairSet(int[][] pairs) {
Set<Long> s = new HashSet<>();
for (int[] p : pairs) s.add(((long) p[0] << 32) | (p[1] & 0xffffffffL));
return s;
}

@Test
void eDwithinPairsMatchesScalarBaseline() {
Set<Long> kernel = pairSet(MeosSetSetJoin.eDwithinPairs(trips, trips, MEET_DIST));
Set<Long> baseline = new HashSet<>();
for (int i = 0; i < trips.length; i++)
for (int j = 0; j < trips.length; j++)
if (GeneratedFunctions.edwithin_tgeo_tgeo(trips[i], trips[j], MEET_DIST) == 1)
baseline.add(((long) i << 32) | j);
assertEquals(baseline, kernel, "set-set eDwithinPairs must equal the per-pair edwithin scalar");
// T0/T3 coincide and T1 crosses T0 — the join is non-empty.
org.junit.jupiter.api.Assertions.assertFalse(kernel.isEmpty());
}

@Test
void aDisjointPairsMatchesScalarBaseline() {
Set<Long> kernel = pairSet(MeosSetSetJoin.aDisjointPairs(trips, trips));
Set<Long> baseline = new HashSet<>();
for (int i = 0; i < trips.length; i++)
for (int j = 0; j < trips.length; j++)
if (GeneratedFunctions.eintersects_tgeo_tgeo(trips[i], trips[j]) == 0)
baseline.add(((long) i << 32) | j);
assertEquals(baseline, kernel, "set-set aDisjointPairs must equal the never-intersecting scalar baseline");
}

@Test
void tDwithinPairsSupersetOfEverWithinWithPeriods() {
MeosSetSetJoin.TDwithin t = MeosSetSetJoin.tDwithinPairs(trips, trips, MEET_DIST);
Set<Long> tdw = pairSet(t.pairs);
Set<Long> ever = pairSet(MeosSetSetJoin.eDwithinPairs(trips, trips, MEET_DIST));
// Continuous tDwithin also reports transient trajectory crossings (e.g. T0/T1
// coincide at the mid-window crossing) that the ever-within predicate misses,
// so the within-interval pairs are a superset of the ever-within pairs.
org.junit.jupiter.api.Assertions.assertTrue(tdw.containsAll(ever),
"every ever-within pair has a within-interval");
for (int k = 0; k < t.pairs.length; k++)
assertNotNull(t.periodsHexwkb[k], "every within pair carries its in-range period spanset");
}
}