Skip to content

Commit 9d1d835

Browse files
committed
Add tests for ip and date types
1 parent 1634d55 commit 9d1d835

File tree

2 files changed

+134
-0
lines changed

2 files changed

+134
-0
lines changed

src/aggregation/bucket/composite.rs

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,7 @@ impl SegmentCompositeCollector {
353353
pub(crate) fn from_req_and_validate(
354354
req: &CompositeAggregation,
355355
sub_aggregations: &mut AggregationsWithAccessor,
356+
col_types: &[Vec<ColumnType>], // for validation
356357
accessor_idx: usize,
357358
) -> crate::Result<Self> {
358359
if req.sources.is_empty() {
@@ -366,6 +367,30 @@ impl SegmentCompositeCollector {
366367
));
367368
}
368369

370+
for source_columns in col_types {
371+
if source_columns.is_empty() {
372+
return Err(TantivyError::InvalidArgument(
373+
"composite aggregation source must have at least one accessor".to_string(),
374+
));
375+
}
376+
if source_columns.contains(&ColumnType::Bytes) {
377+
return Err(TantivyError::InvalidArgument(
378+
"composite aggregation does not support 'bytes' field type".to_string(),
379+
));
380+
}
381+
if source_columns.contains(&ColumnType::DateTime) && source_columns.len() > 1 {
382+
return Err(TantivyError::InvalidArgument(
383+
"composite aggregation expects 'date' fields to have a single column"
384+
.to_string(),
385+
));
386+
}
387+
if source_columns.contains(&ColumnType::IpAddr) && source_columns.len() > 1 {
388+
return Err(TantivyError::InvalidArgument(
389+
"composite aggregation expects 'ip' fields to have a single column".to_string(),
390+
));
391+
}
392+
}
393+
369394
let blueprint = if !sub_aggregations.is_empty() {
370395
let sub_aggregation = build_segment_agg_collector(sub_aggregations)?;
371396
Some(sub_aggregation)
@@ -630,6 +655,9 @@ fn recursive_key_visitor(
630655

631656
#[cfg(test)]
632657
mod tests {
658+
use std::net::{Ipv4Addr, Ipv6Addr};
659+
660+
use common::DateTime;
633661
use serde_json::json;
634662

635663
use crate::aggregation::agg_req::Aggregations;
@@ -1270,6 +1298,104 @@ mod tests {
12701298
Ok(())
12711299
}
12721300

1301+
#[test]
1302+
fn composite_aggregation_test_date_fields() -> crate::Result<()> {
1303+
let mut schema_builder = Schema::builder();
1304+
let date_field = schema_builder.add_date_field("timestamp", FAST);
1305+
let index = Index::create_in_ram(schema_builder.build());
1306+
{
1307+
let mut index_writer = index.writer_with_num_threads(1, 20_000_000)?;
1308+
// Add documents with different dates (string timestamps)
1309+
index_writer
1310+
.add_document(doc!(date_field => DateTime::from_timestamp_secs(1609459200)))?; // 2021-01-01
1311+
index_writer
1312+
.add_document(doc!(date_field => DateTime::from_timestamp_secs(1640995200)))?; // 2022-01-01
1313+
index_writer
1314+
.add_document(doc!(date_field => DateTime::from_timestamp_secs(1609459200)))?; // 2021 duplicate
1315+
index_writer
1316+
.add_document(doc!(date_field => DateTime::from_timestamp_secs(1672531200)))?; // 2023-01-01
1317+
index_writer.commit()?;
1318+
}
1319+
1320+
// Test composite aggregation on date field
1321+
let agg_req: Aggregations = serde_json::from_value(json!({
1322+
"my_composite": {
1323+
"composite": {
1324+
"sources": [
1325+
{"timestamp": {"terms": {"field": "timestamp"}}}
1326+
],
1327+
"size": 10
1328+
}
1329+
}
1330+
}))
1331+
.unwrap();
1332+
1333+
let res = exec_request(agg_req, &index)?;
1334+
let buckets = &res["my_composite"]["buckets"];
1335+
1336+
// Should be ordered by date value (as formatted strings)
1337+
assert_eq!(
1338+
buckets,
1339+
&json!([
1340+
{"key": {"timestamp": "2021-01-01T00:00:00Z"}, "doc_count": 2},
1341+
{"key": {"timestamp": "2022-01-01T00:00:00Z"}, "doc_count": 1},
1342+
{"key": {"timestamp": "2023-01-01T00:00:00Z"}, "doc_count": 1}
1343+
])
1344+
);
1345+
1346+
Ok(())
1347+
}
1348+
1349+
#[test]
1350+
fn composite_aggregation_test_ip_fields() -> crate::Result<()> {
1351+
let mut schema_builder = Schema::builder();
1352+
let ip_field = schema_builder.add_ip_addr_field("ip_addr", FAST);
1353+
let index = Index::create_in_ram(schema_builder.build());
1354+
{
1355+
let ipv4 = |ip: &str| ip.parse::<Ipv4Addr>().unwrap().to_ipv6_mapped();
1356+
let ipv6 = |ip: &str| ip.parse::<Ipv6Addr>().unwrap();
1357+
let mut index_writer = index.writer_with_num_threads(1, 20_000_000)?;
1358+
index_writer.add_document(doc!(ip_field => ipv4("192.168.1.1")))?;
1359+
index_writer.add_document(doc!(ip_field => ipv4("10.0.0.1")))?;
1360+
index_writer.add_document(doc!(ip_field => ipv4("192.168.1.1")))?; // duplicate
1361+
index_writer.add_document(doc!(ip_field => ipv4("172.16.0.1")))?;
1362+
index_writer.add_document(doc!(ip_field => ipv6("2001:db8::1")))?;
1363+
index_writer.add_document(doc!(ip_field => ipv6("::1")))?; // localhost
1364+
index_writer.add_document(doc!(ip_field => ipv6("2001:db8::1")))?; // duplicate
1365+
index_writer.commit()?;
1366+
}
1367+
1368+
// Test composite aggregation on IP field
1369+
let agg_req: Aggregations = serde_json::from_value(json!({
1370+
"my_composite": {
1371+
"composite": {
1372+
"sources": [
1373+
{"ip_addr": {"terms": {"field": "ip_addr"}}}
1374+
],
1375+
"size": 10
1376+
}
1377+
}
1378+
}))
1379+
.unwrap();
1380+
1381+
let res = exec_request(agg_req, &index)?;
1382+
let buckets = &res["my_composite"]["buckets"];
1383+
1384+
// Should be ordered by IP address
1385+
assert_eq!(
1386+
buckets,
1387+
&json!([
1388+
{"key": {"ip_addr": "::1"}, "doc_count": 1},
1389+
{"key": {"ip_addr": "10.0.0.1"}, "doc_count": 1},
1390+
{"key": {"ip_addr": "172.16.0.1"}, "doc_count": 1},
1391+
{"key": {"ip_addr": "192.168.1.1"}, "doc_count": 2},
1392+
{"key": {"ip_addr": "2001:db8::1"}, "doc_count": 2}
1393+
])
1394+
);
1395+
1396+
Ok(())
1397+
}
1398+
12731399
#[test]
12741400
fn composite_aggregation_test_multiple_column_types() -> crate::Result<()> {
12751401
let mut schema_builder = Schema::builder();

src/aggregation/segment_agg_result.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
66
use std::fmt::Debug;
77

8+
use columnar::ColumnType;
9+
810
pub(crate) use super::agg_limits::AggregationLimitsGuard;
911
use super::agg_req::AggregationVariants;
1012
use super::agg_req_with_accessor::{AggregationWithAccessor, AggregationsWithAccessor};
@@ -176,9 +178,15 @@ pub(crate) fn build_single_agg_segment_collector(
176178
SegmentCardinalityCollector::from_req(req.field_type, accessor_idx, missing),
177179
)),
178180
Composite(composite_aggregation) => {
181+
let col_types: Vec<Vec<ColumnType>> = req
182+
.composite_accessors
183+
.iter()
184+
.map(|a| a.iter().map(|ca| ca.column_type).collect())
185+
.collect::<Vec<Vec<ColumnType>>>();
179186
Ok(Box::new(SegmentCompositeCollector::from_req_and_validate(
180187
composite_aggregation,
181188
&mut req.sub_aggregation,
189+
&col_types,
182190
accessor_idx,
183191
)?))
184192
}

0 commit comments

Comments
 (0)