Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
67 commits
Select commit Hold shift + click to select a range
6b54911
fix(ci): make the Nix build work with NES_ENABLE_MEOS=OFF and stock p…
estebanzimanyi Jun 11, 2026
bfdc695
feat(berlinmod): scaffold the full BerlinMOD-9 × 3-form parity matrix…
estebanzimanyi May 21, 2026
4dd3302
feat(meos): TEMPORAL_LENGTH aggregation closes BerlinMOD-Q6 streaming…
estebanzimanyi May 21, 2026
78cf5a6
feat(meos): PAIR_MEETING + CROSS_DISTANCE aggregations close Q5 + Q9 …
estebanzimanyi May 21, 2026
afe4e2e
docs(berlinmod): streaming-semantics tier overlay + remove stale 'Not…
estebanzimanyi May 21, 2026
267fe99
feat(meos): parameterize PAIR_MEETING dMeet via SQL constant fifth arg
estebanzimanyi May 21, 2026
9171dbe
feat(meos): parameterize CROSS_DISTANCE (vidA, vidB) via SQL constant…
estebanzimanyi May 21, 2026
4f60e2e
tools(codegen): MEOS-operator generator + design proposal for Nebula …
estebanzimanyi May 21, 2026
d70f88e
fix(meos): proto extra_fields + Werror unused-param in aggregations
estebanzimanyi May 21, 2026
fa4fc7d
feat(meos): W1 codegen output — 5 spatial-relation operators (tgeo × …
estebanzimanyi May 21, 2026
d3e0845
feat(meos): W2 codegen output — close the _tgeo_geo spatial-rel row
estebanzimanyi May 21, 2026
b5aab48
feat(meos): W3 codegen output — closes the _tgeo_tgeo spatial-rel row…
estebanzimanyi May 21, 2026
2e302c5
feat(meos): W4 codegen — distance family (nad + dwithin, 5 ops + 2 te…
estebanzimanyi May 21, 2026
20bd61e
feat(codegen): auto-inject parser glue — closes SQL loop for W1–W4 (2…
estebanzimanyi May 21, 2026
2fe981e
feat(meos): W5a codegen — tnumber NAD ops (4 ops + 2 templates + 2 sy…
estebanzimanyi May 21, 2026
66f15cc
feat(meos): W6 codegen — tgeo restriction at/minus geom (2 ops + 1 te…
estebanzimanyi May 21, 2026
f064d40
feat(meos): W7 codegen — windowed aggregations (12 ops + tools/codege…
estebanzimanyi May 21, 2026
8005822
feat(meos): W8 codegen — tnumber avg/twavg aggregations (3 ops, mecha…
estebanzimanyi May 21, 2026
861ad2a
feat(meos): W9 codegen — tgeo scalar accessors w/ new return types (5…
estebanzimanyi May 21, 2026
8321211
feat(meos): W10 codegen — tcbuffer × geo spatial-rels (10 ops + 1 tem…
estebanzimanyi May 21, 2026
d868f94
feat(meos): W11 codegen — tcbuffer × cbuffer spatial-rels (10 ops + n…
estebanzimanyi May 21, 2026
17fc31a
feat(meos): W12 codegen — tcbuffer × tcbuffer 2-arg spatial-rels (6 o…
estebanzimanyi May 21, 2026
2b1c4cf
feat(meos): W13 codegen — tcbuffer dwithin family (6 ops + 3 with-dis…
estebanzimanyi May 21, 2026
cb74c64
feat(meos): W14 codegen — tpose × geo spatial-rels via composition (9…
estebanzimanyi May 21, 2026
898f145
feat(meos): W15 codegen — tpose × tpose spatial-rels via composition …
estebanzimanyi May 22, 2026
d5b14ad
feat(meos): W18 codegen — tnpoint spatial-rels via composition (18 op…
estebanzimanyi May 22, 2026
53d8854
feat(meos): W19 codegen — tpose + tnpoint nad (nearest-approach dista…
estebanzimanyi May 22, 2026
0f57e10
feat(meos): W20 codegen — tpose + tnpoint dwithin (8 ops + 4 template…
estebanzimanyi May 22, 2026
80800a7
feat(meos): W21 codegen — tcbuffer nad (nearest-approach distance) (3…
estebanzimanyi May 22, 2026
4aab018
methodology(streaming): PROVEN MEOS-parity harness + type-aware calla…
estebanzimanyi May 22, 2026
213dc18
feat(nebula): comparison family (60 operators) + signature-driven des…
estebanzimanyi May 23, 2026
2626e04
feat(nebula): W23 — 18 spatial-relation + comparison operators (exist…
estebanzimanyi May 23, 2026
2d95b93
feat(nebula): W24 — generalized per-event assembler + scalar families…
estebanzimanyi May 23, 2026
601a7a4
tools(nebula): build_local.sh — dev-image build with auto MQTT toggle
estebanzimanyi May 23, 2026
69ea698
feat(nebula): W25 — extract marshaler for unary transforms (214->224)
estebanzimanyi May 23, 2026
024006b
feat(nebula): W26 — box/span family via per-event box-literals (224->…
estebanzimanyi May 23, 2026
63fc742
feat(nebula): W27 — windowed extent->box aggregates (VARSIZED) (243->…
estebanzimanyi May 23, 2026
ba4994c
feat(nebula): W28 — value/time Span extent aggregates (scalar fold) (…
estebanzimanyi May 23, 2026
01797ea
feat(nebula): W29 — 55 box/temporal position predicates (per-event) (…
estebanzimanyi May 23, 2026
4367c77
feat(nebula): W30 — windowed value-union set-collect aggregates (304-…
estebanzimanyi May 23, 2026
59e67af
feat(nebula): durable query-literal round-trip for parameterized aggr…
estebanzimanyi May 23, 2026
dc126d8
feat(nebula): MEOS function library composes over VARSIZED WKB values
estebanzimanyi May 23, 2026
185d950
feat(nebula): TRAJECTORY_WKB — windowed mini-trip trajectory as a hex…
estebanzimanyi May 23, 2026
145b416
feat(nebula): expandable-Temporal* streaming aggregate substrate (309…
estebanzimanyi May 23, 2026
2072c82
feat(nebula): value-output windowed aggregates (Temporal->hex-WKB) (3…
estebanzimanyi May 23, 2026
a468247
feat(nebula): tnumber value-output windowed aggregates (316->324)
estebanzimanyi May 24, 2026
20def2f
fix(nebula): make windowed-aggregate query plans deserialize and run
estebanzimanyi May 24, 2026
2468bf7
fix(nebula): unqualify computed sink field in tpoint_length_wkb systest
estebanzimanyi May 24, 2026
00b96fa
feat(nebula): tnpoint network-constrained windowed aggregates (324->327)
estebanzimanyi May 24, 2026
7e82713
fix(harness): resolve systest tokens for every operator family in pro…
estebanzimanyi May 24, 2026
0fdf374
docs(nebula): measured NebulaStream surface — 327 wired, 38 callable
estebanzimanyi May 24, 2026
44c3791
feat(nebula): unary temporal-transform value-output aggregates (327->…
estebanzimanyi May 24, 2026
7f67f8e
docs(nebula): NebulaStream surface 332 wired, 43 callable
estebanzimanyi May 24, 2026
d820ed5
fix(nebula): initialize MEOS per worker thread (thread-local session …
estebanzimanyi May 24, 2026
13e87d2
feat(nebula): geometry value-output windowed aggregates (332->336)
estebanzimanyi May 24, 2026
01a8b2c
docs(nebula): NebulaStream surface 336 wired, 47 callable
estebanzimanyi May 24, 2026
f1b83fc
docs(nebula): document the cross-stream (pairwise) tier and its boundary
estebanzimanyi May 24, 2026
62704d9
docs(nebula): cross-stream tier follows per-vehicle grouping
estebanzimanyi May 24, 2026
a03cf39
feat(nebula): cross-vehicle STBox predicates as per-event functions (…
estebanzimanyi May 24, 2026
3005237
docs(nebula): NebulaStream 365 wired, 49 callable; STBox cross-vehicl…
estebanzimanyi May 24, 2026
d02d1d6
perf(nebula): TSPATIAL_EXTENT folds an incremental STBox slot, not a …
estebanzimanyi May 24, 2026
3254709
perf(nebula): TNUMBER_EXTENT folds an incremental TBox slot, not a Pa…
estebanzimanyi May 24, 2026
311c750
perf(nebula): span-extent aggregates fold an incremental Span slot, n…
estebanzimanyi May 24, 2026
c2b17c5
perf(nebula): set-union aggregates fold an incremental Set slot, not …
estebanzimanyi May 24, 2026
339f1b2
docs(nebula): aggregates use one incremental MEOS-accumulator slot
estebanzimanyi May 24, 2026
dfb4a60
feat(nebula): cross-vehicle tnumber-vs-tnumber predicates (per-event,…
estebanzimanyi May 24, 2026
c99ffef
docs(nebula): NebulaStream 384 wired, 57 callable; tnumber cross-vehi…
estebanzimanyi May 24, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
28 changes: 28 additions & 0 deletions .github/workflows/streaming_parity_gate.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Streaming-parity gate — the measured-not-guessed guard (Path-to-100 step 6).
# Pure-Python, path-filtered to the parity tooling: it does NOT build NebulaStream
# and never runs on a normal C++ PR. It checks the committed feed for an L3
# callability regression and an over-claim (a "100%" callability statement while
# the gap list is non-empty). Re-measuring the full surface needs the JMEOS jar +
# libmeos and is run out-of-band (see tools/streaming_parity/feeds/README.md).
name: streaming-parity gate

on:
pull_request:
paths:
- 'tools/streaming_parity/**'
- 'doc/methodology/streaming_parity*'
push:
paths:
- 'tools/streaming_parity/**'
- 'doc/methodology/streaming_parity*'

jobs:
gate:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: '3.x'
- name: streaming-parity gate (no L3 regression, no over-claim)
run: python3 tools/streaming_parity/ci_gate.py
7 changes: 7 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,13 @@ endif ()

add_custom_target(build_all_plugins)

# Declared here so add_compile_definitions reaches all sibling nes-* targets.
# nes-plugins/CMakeLists.txt re-declares the same option (no-op when cached).
option(NES_ENABLE_MEOS "Enable MEOS plugin (requires libmeos installed on the system)" ON)
if(NES_ENABLE_MEOS)
add_compile_definitions(NES_ENABLE_MEOS)
endif()

# Add target for common lib, which contains a minimal set
# of shared functionality used by all components of nes
file(GLOB NES_DIRECTORIES RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} "nes-*")
Expand Down
21 changes: 21 additions & 0 deletions Input/input_berlinmod.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
1735711200,100,4.3517,50.8503
1735711200,300,4.2000,50.7500
1735711201,200,4.3060,50.8270
1735711202,100,4.3517,50.8503
1735711202,300,4.2000,50.7500
1735711203,200,4.3060,50.8270
1735711204,100,4.3517,50.8503
1735711204,300,4.2000,50.7500
1735711205,200,4.3060,50.8270
1735711206,100,4.3517,50.8503
1735711206,300,4.2000,50.7500
1735711207,200,4.3060,50.8270
1735711208,100,4.3517,50.8503
1735711208,300,4.2000,50.7500
1735711209,200,4.3060,50.8270
1735711210,100,4.3517,50.8503
1735711210,300,4.2000,50.7500
1735711211,200,4.3060,50.8270
1735711212,100,4.3517,50.8503
1735711212,300,4.2000,50.7500
1735711213,200,4.3060,50.8270
47 changes: 47 additions & 0 deletions Queries/berlinmod/q1_continuous.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# BerlinMOD-Q1 — continuous form
# "Which vehicles have appeared in the stream?"
# Per 1-second sliding bucket: emit (start, end, vehicle_id, event-count-in-bucket).
# Reading N rows over consecutive buckets enumerates the distinct-vehicles-seen set.

query: |
SELECT start,
end,
vehicle_id,
COUNT(time_utc) AS events
FROM berlinmod_stream
GROUP BY vehicle_id
WINDOW SLIDING(time_utc, SIZE 1 SEC, ADVANCE BY 1 SEC)
INTO file_sink;

sinks:
- name: FILE_SINK
type: File
schema:
- { name: BERLINMOD_STREAM$START, type: UINT64 }
- { name: BERLINMOD_STREAM$END, type: UINT64 }
- { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 }
- { name: BERLINMOD_STREAM$EVENTS, type: UINT64 }
config:
file_path: "/workspace/Output/output_berlinmod_q1_continuous.csv"
input_format: CSV

logical:
- name: BERLINMOD_STREAM
schema:
- { name: TIME_UTC, type: UINT64 }
- { name: VEHICLE_ID, type: UINT64 }
- { name: GPS_LON, type: FLOAT64 }
- { name: GPS_LAT, type: FLOAT64 }

physical:
- logical: BERLINMOD_STREAM
type: TCP
parser_config:
type: CSV
field_delimiter: ","
tuple_delimiter: "\n"
source_config:
socket_host: "host.docker.internal"
socket_port: "32325"
socket_type: "SOCK_STREAM"
socket_domain: "AF_INET"
46 changes: 46 additions & 0 deletions Queries/berlinmod/q1_snapshot.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# BerlinMOD-Q1 — snapshot form
# "At each 5-second tick, list of distinct vehicles seen in the tick window."
# Streaming approximation of the batch BerlinMOD-Q1 snapshot at time T.

query: |
SELECT start,
end,
vehicle_id,
COUNT(time_utc) AS events
FROM berlinmod_stream
GROUP BY vehicle_id
WINDOW TUMBLING(time_utc, SIZE 5 SEC)
INTO file_sink;

sinks:
- name: FILE_SINK
type: File
schema:
- { name: BERLINMOD_STREAM$START, type: UINT64 }
- { name: BERLINMOD_STREAM$END, type: UINT64 }
- { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 }
- { name: BERLINMOD_STREAM$EVENTS, type: UINT64 }
config:
file_path: "/workspace/Output/output_berlinmod_q1_snapshot.csv"
input_format: CSV

logical:
- name: BERLINMOD_STREAM
schema:
- { name: TIME_UTC, type: UINT64 }
- { name: VEHICLE_ID, type: UINT64 }
- { name: GPS_LON, type: FLOAT64 }
- { name: GPS_LAT, type: FLOAT64 }

physical:
- logical: BERLINMOD_STREAM
type: TCP
parser_config:
type: CSV
field_delimiter: ","
tuple_delimiter: "\n"
source_config:
socket_host: "host.docker.internal"
socket_port: "32325"
socket_type: "SOCK_STREAM"
socket_domain: "AF_INET"
46 changes: 46 additions & 0 deletions Queries/berlinmod/q1_windowed.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# BerlinMOD-Q1 — windowed form
# "Per 10-second tumbling window, distinct vehicles seen."
# Emits one row per (window, vehicle) seen; reading N rows per window = distinctCount.

query: |
SELECT start,
end,
vehicle_id,
COUNT(time_utc) AS events
FROM berlinmod_stream
GROUP BY vehicle_id
WINDOW TUMBLING(time_utc, SIZE 10 SEC)
INTO file_sink;

sinks:
- name: FILE_SINK
type: File
schema:
- { name: BERLINMOD_STREAM$START, type: UINT64 }
- { name: BERLINMOD_STREAM$END, type: UINT64 }
- { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 }
- { name: BERLINMOD_STREAM$EVENTS, type: UINT64 }
config:
file_path: "/workspace/Output/output_berlinmod_q1_windowed.csv"
input_format: CSV

logical:
- name: BERLINMOD_STREAM
schema:
- { name: TIME_UTC, type: UINT64 }
- { name: VEHICLE_ID, type: UINT64 }
- { name: GPS_LON, type: FLOAT64 }
- { name: GPS_LAT, type: FLOAT64 }

physical:
- logical: BERLINMOD_STREAM
type: TCP
parser_config:
type: CSV
field_delimiter: ","
tuple_delimiter: "\n"
source_config:
socket_host: "host.docker.internal"
socket_port: "32325"
socket_type: "SOCK_STREAM"
socket_domain: "AF_INET"
44 changes: 44 additions & 0 deletions Queries/berlinmod/q2_continuous.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# BerlinMOD-Q2 — continuous form
# "Where is vehicle X (= 200) right now?"
# Per 1-second sliding bucket, emit a trajectory snippet for vehicle X.

query: |
SELECT start,
end,
TEMPORAL_SEQUENCE(gps_lon, gps_lat, time_utc) AS trajectory
FROM berlinmod_stream
WHERE vehicle_id = UINT64(200)
WINDOW SLIDING(time_utc, SIZE 1 SEC, ADVANCE BY 1 SEC)
INTO file_sink;

sinks:
- name: FILE_SINK
type: File
schema:
- { name: BERLINMOD_STREAM$START, type: UINT64 }
- { name: BERLINMOD_STREAM$END, type: UINT64 }
- { name: BERLINMOD_STREAM$TRAJECTORY, type: VARSIZED }
config:
file_path: "/workspace/Output/output_berlinmod_q2_continuous.csv"
input_format: CSV

logical:
- name: BERLINMOD_STREAM
schema:
- { name: TIME_UTC, type: UINT64 }
- { name: VEHICLE_ID, type: UINT64 }
- { name: GPS_LON, type: FLOAT64 }
- { name: GPS_LAT, type: FLOAT64 }

physical:
- logical: BERLINMOD_STREAM
type: TCP
parser_config:
type: CSV
field_delimiter: ","
tuple_delimiter: "\n"
source_config:
socket_host: "host.docker.internal"
socket_port: "32325"
socket_type: "SOCK_STREAM"
socket_domain: "AF_INET"
43 changes: 43 additions & 0 deletions Queries/berlinmod/q2_snapshot.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# BerlinMOD-Q2 — snapshot form
# "At each 5-second tick, snapshot of vehicle X's (= 200) trajectory in the tick."

query: |
SELECT start,
end,
TEMPORAL_SEQUENCE(gps_lon, gps_lat, time_utc) AS trajectory
FROM berlinmod_stream
WHERE vehicle_id = UINT64(200)
WINDOW TUMBLING(time_utc, SIZE 5 SEC)
INTO file_sink;

sinks:
- name: FILE_SINK
type: File
schema:
- { name: BERLINMOD_STREAM$START, type: UINT64 }
- { name: BERLINMOD_STREAM$END, type: UINT64 }
- { name: BERLINMOD_STREAM$TRAJECTORY, type: VARSIZED }
config:
file_path: "/workspace/Output/output_berlinmod_q2_snapshot.csv"
input_format: CSV

logical:
- name: BERLINMOD_STREAM
schema:
- { name: TIME_UTC, type: UINT64 }
- { name: VEHICLE_ID, type: UINT64 }
- { name: GPS_LON, type: FLOAT64 }
- { name: GPS_LAT, type: FLOAT64 }

physical:
- logical: BERLINMOD_STREAM
type: TCP
parser_config:
type: CSV
field_delimiter: ","
tuple_delimiter: "\n"
source_config:
socket_host: "host.docker.internal"
socket_port: "32325"
socket_type: "SOCK_STREAM"
socket_domain: "AF_INET"
43 changes: 43 additions & 0 deletions Queries/berlinmod/q2_windowed.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# BerlinMOD-Q2 — windowed form
# "Per 10-second tumbling window, trajectory of vehicle X (= 200)."

query: |
SELECT start,
end,
TEMPORAL_SEQUENCE(gps_lon, gps_lat, time_utc) AS trajectory
FROM berlinmod_stream
WHERE vehicle_id = UINT64(200)
WINDOW TUMBLING(time_utc, SIZE 10 SEC)
INTO file_sink;

sinks:
- name: FILE_SINK
type: File
schema:
- { name: BERLINMOD_STREAM$START, type: UINT64 }
- { name: BERLINMOD_STREAM$END, type: UINT64 }
- { name: BERLINMOD_STREAM$TRAJECTORY, type: VARSIZED }
config:
file_path: "/workspace/Output/output_berlinmod_q2_windowed.csv"
input_format: CSV

logical:
- name: BERLINMOD_STREAM
schema:
- { name: TIME_UTC, type: UINT64 }
- { name: VEHICLE_ID, type: UINT64 }
- { name: GPS_LON, type: FLOAT64 }
- { name: GPS_LAT, type: FLOAT64 }

physical:
- logical: BERLINMOD_STREAM
type: TCP
parser_config:
type: CSV
field_delimiter: ","
tuple_delimiter: "\n"
source_config:
socket_host: "host.docker.internal"
socket_port: "32325"
socket_type: "SOCK_STREAM"
socket_domain: "AF_INET"
49 changes: 49 additions & 0 deletions Queries/berlinmod/q3_continuous.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# BerlinMOD-Q3 — continuous form
# "Vehicles within 5 km of Brussels city centre, right now."
# Per 1-second sliding bucket, emit (start, end, vehicle_id) for events near P.

query: |
SELECT start,
end,
vehicle_id
FROM berlinmod_stream
WHERE edwithin_tgeo_geo(gps_lon,
gps_lat,
time_utc,
'SRID=4326;POINT(4.3517 50.8503)',
FLOAT64(5000.0)) = INT32(1)
GROUP BY vehicle_id
WINDOW SLIDING(time_utc, SIZE 1 SEC, ADVANCE BY 1 SEC)
INTO file_sink;

sinks:
- name: FILE_SINK
type: File
schema:
- { name: BERLINMOD_STREAM$START, type: UINT64 }
- { name: BERLINMOD_STREAM$END, type: UINT64 }
- { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 }
config:
file_path: "/workspace/Output/output_berlinmod_q3_continuous.csv"
input_format: CSV

logical:
- name: BERLINMOD_STREAM
schema:
- { name: TIME_UTC, type: UINT64 }
- { name: VEHICLE_ID, type: UINT64 }
- { name: GPS_LON, type: FLOAT64 }
- { name: GPS_LAT, type: FLOAT64 }

physical:
- logical: BERLINMOD_STREAM
type: TCP
parser_config:
type: CSV
field_delimiter: ","
tuple_delimiter: "\n"
source_config:
socket_host: "host.docker.internal"
socket_port: "32325"
socket_type: "SOCK_STREAM"
socket_domain: "AF_INET"
Loading
Loading