diff --git a/parquet-variant-compute/src/arrow_to_variant.rs b/parquet-variant-compute/src/arrow_to_variant.rs index 5e01aba3c1a1..a56e6484bcfc 100644 --- a/parquet-variant-compute/src/arrow_to_variant.rs +++ b/parquet-variant-compute/src/arrow_to_variant.rs @@ -17,8 +17,8 @@ use crate::type_conversion::CastOptions; use arrow::array::{ - Array, AsArray, FixedSizeListArray, GenericBinaryArray, GenericListArray, GenericListViewArray, - GenericStringArray, OffsetSizeTrait, PrimitiveArray, + Array, ArrayRef, AsArray, FixedSizeListArray, GenericBinaryArray, GenericListArray, + GenericListViewArray, GenericStringArray, OffsetSizeTrait, PrimitiveArray, }; use arrow::compute::kernels::cast; use arrow::datatypes::{ @@ -553,20 +553,42 @@ impl<'a, L: ListLikeArray> ListArrowToVariantBuilder<'a, L> { } } -/// Trait for list-like arrays that can provide element ranges +/// Trait for list-like arrays that can provide common helpers pub(crate) trait ListLikeArray: Array { + type OffsetSize: OffsetSizeTrait; + /// Get the values array - fn values(&self) -> &dyn Array; + fn values(&self) -> &ArrayRef; + + /// Get the offsets backing the list values + #[cfg(test)] + fn value_offsets(&self) -> Option<&[Self::OffsetSize]>; + + /// Size (number of values) for the element at `index` + #[cfg(test)] + fn value_size(&self, index: usize) -> Self::OffsetSize; /// Get the start and end indices for a list element fn element_range(&self, index: usize) -> Range; } impl ListLikeArray for GenericListArray { - fn values(&self) -> &dyn Array { + type OffsetSize = O; + + fn values(&self) -> &ArrayRef { self.values() } + #[cfg(test)] + fn value_offsets(&self) -> Option<&[Self::OffsetSize]> { + Some(GenericListArray::value_offsets(self)) + } + + #[cfg(test)] + fn value_size(&self, index: usize) -> Self::OffsetSize { + GenericListArray::value_length(self, index) + } + fn element_range(&self, index: usize) -> Range { let offsets = self.offsets(); let start = offsets[index].as_usize(); @@ -576,10 +598,22 @@ impl ListLikeArray for GenericListArray { } impl ListLikeArray for GenericListViewArray { - fn values(&self) -> &dyn Array { + type OffsetSize = O; + + fn values(&self) -> &ArrayRef { self.values() } + #[cfg(test)] + fn value_offsets(&self) -> Option<&[Self::OffsetSize]> { + Some(GenericListViewArray::value_offsets(self)) + } + + #[cfg(test)] + fn value_size(&self, index: usize) -> Self::OffsetSize { + GenericListViewArray::value_size(self, index) + } + fn element_range(&self, index: usize) -> Range { let offsets = self.value_offsets(); let sizes = self.value_sizes(); @@ -590,10 +624,22 @@ impl ListLikeArray for GenericListViewArray { } impl ListLikeArray for FixedSizeListArray { - fn values(&self) -> &dyn Array { + type OffsetSize = i32; + + fn values(&self) -> &ArrayRef { self.values() } + #[cfg(test)] + fn value_offsets(&self) -> Option<&[Self::OffsetSize]> { + None + } + + #[cfg(test)] + fn value_size(&self, _index: usize) -> Self::OffsetSize { + self.value_length() + } + fn element_range(&self, index: usize) -> Range { let value_length = self.value_length().as_usize(); let offset = index * value_length; diff --git a/parquet-variant-compute/src/shred_variant.rs b/parquet-variant-compute/src/shred_variant.rs index 51306ebd1697..fa00bd07d64a 100644 --- a/parquet-variant-compute/src/shred_variant.rs +++ b/parquet-variant-compute/src/shred_variant.rs @@ -22,14 +22,16 @@ use crate::variant_to_arrow::{ PrimitiveVariantToArrowRowBuilder, make_primitive_variant_to_arrow_row_builder, }; use crate::{VariantArray, VariantValueArrayBuilder}; -use arrow::array::{ArrayRef, BinaryViewArray, NullBufferBuilder}; -use arrow::buffer::NullBuffer; +use arrow::array::{ + ArrayRef, BinaryViewArray, GenericListArray, GenericListViewArray, NullBufferBuilder, + OffsetSizeTrait, +}; +use arrow::buffer::{NullBuffer, OffsetBuffer, ScalarBuffer}; use arrow::compute::CastOptions; -use arrow::datatypes::{DataType, Fields, TimeUnit}; +use arrow::datatypes::{ArrowNativeTypeOp, DataType, FieldRef, Fields, TimeUnit}; use arrow::error::{ArrowError, Result}; -use parquet_variant::{Variant, VariantBuilderExt}; - use indexmap::IndexMap; +use parquet_variant::{Variant, VariantBuilderExt, VariantList}; use std::sync::Arc; /// Shreds the input binary variant using a target shredding schema derived from the requested data type. @@ -117,10 +119,17 @@ pub(crate) fn make_variant_to_shredded_variant_arrow_row_builder<'a>( DataType::List(_) | DataType::LargeList(_) | DataType::ListView(_) - | DataType::LargeListView(_) - | DataType::FixedSizeList(..) => { + | DataType::LargeListView(_) => { + let typed_value_builder = VariantToShreddedArrayVariantRowBuilder::try_new( + data_type, + cast_options, + capacity, + )?; + VariantToShreddedVariantRowBuilder::Array(typed_value_builder) + } + DataType::FixedSizeList(..) => { return Err(ArrowError::NotYetImplemented( - "Shredding variant array values as arrow lists".to_string(), + "Shredding variant array values as fixed-size lists".to_string(), )); } // Supported shredded primitive types, see Variant shredding spec: @@ -162,13 +171,16 @@ pub(crate) fn make_variant_to_shredded_variant_arrow_row_builder<'a>( pub(crate) enum VariantToShreddedVariantRowBuilder<'a> { Primitive(VariantToShreddedPrimitiveVariantRowBuilder<'a>), + Array(VariantToShreddedArrayVariantRowBuilder<'a>), Object(VariantToShreddedObjectVariantRowBuilder<'a>), } + impl<'a> VariantToShreddedVariantRowBuilder<'a> { pub fn append_null(&mut self) -> Result<()> { use VariantToShreddedVariantRowBuilder::*; match self { Primitive(b) => b.append_null(), + Array(b) => b.append_null(), Object(b) => b.append_null(), } } @@ -177,6 +189,7 @@ impl<'a> VariantToShreddedVariantRowBuilder<'a> { use VariantToShreddedVariantRowBuilder::*; match self { Primitive(b) => b.append_value(value), + Array(b) => b.append_value(value), Object(b) => b.append_value(value), } } @@ -185,6 +198,7 @@ impl<'a> VariantToShreddedVariantRowBuilder<'a> { use VariantToShreddedVariantRowBuilder::*; match self { Primitive(b) => b.finish(), + Array(b) => b.finish(), Object(b) => b.finish(), } } @@ -211,6 +225,7 @@ impl<'a> VariantToShreddedPrimitiveVariantRowBuilder<'a> { top_level, } } + fn append_null(&mut self) -> Result<()> { // Only the top-level struct that represents the variant can be nullable; object fields and // array elements are non-nullable. @@ -218,6 +233,7 @@ impl<'a> VariantToShreddedPrimitiveVariantRowBuilder<'a> { self.value_builder.append_null(); self.typed_value_builder.append_null() } + fn append_value(&mut self, value: Variant<'_, '_>) -> Result { self.nulls.append_non_null(); if self.typed_value_builder.append_value(&value)? { @@ -227,6 +243,7 @@ impl<'a> VariantToShreddedPrimitiveVariantRowBuilder<'a> { } Ok(true) } + fn finish(mut self) -> Result<(BinaryViewArray, ArrayRef, Option)> { Ok(( self.value_builder.build()?, @@ -236,6 +253,285 @@ impl<'a> VariantToShreddedPrimitiveVariantRowBuilder<'a> { } } +pub(crate) struct VariantToShreddedArrayVariantRowBuilder<'a> { + value_builder: VariantValueArrayBuilder, + typed_value_builder: ArrayVariantToArrowRowBuilder<'a>, +} + +impl<'a> VariantToShreddedArrayVariantRowBuilder<'a> { + fn try_new( + data_type: &'a DataType, + cast_options: &'a CastOptions, + capacity: usize, + ) -> Result { + Ok(Self { + value_builder: VariantValueArrayBuilder::new(capacity), + typed_value_builder: ArrayVariantToArrowRowBuilder::try_new( + data_type, + cast_options, + capacity, + )?, + }) + } + + fn append_null(&mut self) -> Result<()> { + self.value_builder.append_value(Variant::Null); + self.typed_value_builder.append_null(); + Ok(()) + } + + fn append_value(&mut self, variant: Variant<'_, '_>) -> Result { + // If the variant is not an array, typed_value must be null. + // If the variant is an array, value must be null. + match variant { + Variant::List(list) => { + self.value_builder.append_null(); + self.typed_value_builder.append_value(list)?; + Ok(true) + } + other => { + self.value_builder.append_value(other); + self.typed_value_builder.append_null(); + Ok(false) + } + } + } + + fn finish(self) -> Result<(BinaryViewArray, ArrayRef, Option)> { + Ok(( + self.value_builder.build()?, + self.typed_value_builder.finish()?, + // All elements of an array must be present (not missing) because + // the array Variant encoding does not allow missing elements + None, + )) + } +} + +enum ArrayVariantToArrowRowBuilder<'a> { + List(VariantToListArrowRowBuilder<'a, i32>), + LargeList(VariantToListArrowRowBuilder<'a, i64>), + ListView(VariantToListViewArrowRowBuilder<'a, i32>), + LargeListView(VariantToListViewArrowRowBuilder<'a, i64>), +} + +impl<'a> ArrayVariantToArrowRowBuilder<'a> { + fn try_new( + data_type: &'a DataType, + cast_options: &'a CastOptions, + capacity: usize, + ) -> Result { + use ArrayVariantToArrowRowBuilder::*; + + let builder = match data_type { + DataType::List(field) => List(VariantToListArrowRowBuilder::try_new( + field.clone(), + field.data_type(), + cast_options, + capacity, + )?), + DataType::LargeList(field) => LargeList(VariantToListArrowRowBuilder::try_new( + field.clone(), + field.data_type(), + cast_options, + capacity, + )?), + DataType::ListView(field) => ListView(VariantToListViewArrowRowBuilder::try_new( + field.clone(), + field.data_type(), + cast_options, + capacity, + )?), + DataType::LargeListView(field) => { + LargeListView(VariantToListViewArrowRowBuilder::try_new( + field.clone(), + field.data_type(), + cast_options, + capacity, + )?) + } + other => { + return Err(ArrowError::InvalidArgumentError(format!( + "Casting to {other:?} is not applicable for array Variant types" + ))); + } + }; + Ok(builder) + } + + fn append_null(&mut self) { + match self { + Self::List(builder) => builder.append_null(), + Self::LargeList(builder) => builder.append_null(), + Self::ListView(builder) => builder.append_null(), + Self::LargeListView(builder) => builder.append_null(), + } + } + + fn append_value(&mut self, list: VariantList<'_, '_>) -> Result<()> { + match self { + Self::List(builder) => builder.append_value(list), + Self::LargeList(builder) => builder.append_value(list), + Self::ListView(builder) => builder.append_value(list), + Self::LargeListView(builder) => builder.append_value(list), + } + } + + fn finish(self) -> Result { + match self { + Self::List(builder) => builder.finish(), + Self::LargeList(builder) => builder.finish(), + Self::ListView(builder) => builder.finish(), + Self::LargeListView(builder) => builder.finish(), + } + } +} + +struct VariantToListArrowRowBuilder<'a, O: OffsetSizeTrait + ArrowNativeTypeOp> { + field: FieldRef, + offsets: Vec, + element_builder: Box>, + nulls: NullBufferBuilder, + current_offset: O, +} + +impl<'a, O: OffsetSizeTrait + ArrowNativeTypeOp> VariantToListArrowRowBuilder<'a, O> { + fn try_new( + field: FieldRef, + element_data_type: &'a DataType, + cast_options: &'a CastOptions, + capacity: usize, + ) -> Result { + let mut offsets = Vec::with_capacity(capacity.checked_add(1).ok_or_else(|| { + ArrowError::ComputeError( + "Capacity exceeded usize::MAX when reserving list offsets".to_string(), + ) + })?); + offsets.push(O::ZERO); + let element_builder = make_variant_to_shredded_variant_arrow_row_builder( + element_data_type, + cast_options, + capacity, + false, + )?; + Ok(Self { + field, + offsets, + element_builder: Box::new(element_builder), + nulls: NullBufferBuilder::new(capacity), + current_offset: O::ZERO, + }) + } + + fn append_null(&mut self) { + self.offsets.push(self.current_offset); + self.nulls.append_null(); + } + + fn append_value(&mut self, list: VariantList<'_, '_>) -> Result<()> { + for element in list.iter() { + self.element_builder.append_value(element)?; + self.current_offset = self.current_offset.add_checked(O::ONE)?; + } + self.offsets.push(self.current_offset); + self.nulls.append_non_null(); + Ok(()) + } + + fn finish(mut self) -> Result { + let (value, typed_value, nulls) = self.element_builder.finish()?; + let element_array = + ShreddedVariantFieldArray::from_parts(Some(value), Some(typed_value), nulls); + let field = Arc::new( + self.field + .as_ref() + .clone() + .with_data_type(element_array.data_type().clone()), + ); + let offsets = OffsetBuffer::::new(ScalarBuffer::from(self.offsets)); + let list_array = GenericListArray::::new( + field, + offsets, + ArrayRef::from(element_array), + self.nulls.finish(), + ); + Ok(Arc::new(list_array)) + } +} + +struct VariantToListViewArrowRowBuilder<'a, O: OffsetSizeTrait + ArrowNativeTypeOp> { + field: FieldRef, + offsets: Vec, + sizes: Vec, + element_builder: Box>, + nulls: NullBufferBuilder, + current_offset: O, +} + +impl<'a, O: OffsetSizeTrait + ArrowNativeTypeOp> VariantToListViewArrowRowBuilder<'a, O> { + fn try_new( + field: FieldRef, + element_data_type: &'a DataType, + cast_options: &'a CastOptions, + capacity: usize, + ) -> Result { + let element_builder = make_variant_to_shredded_variant_arrow_row_builder( + element_data_type, + cast_options, + capacity, + false, + )?; + Ok(Self { + field, + offsets: Vec::with_capacity(capacity), + sizes: Vec::with_capacity(capacity), + element_builder: Box::new(element_builder), + nulls: NullBufferBuilder::new(capacity), + current_offset: O::ZERO, + }) + } + + fn append_null(&mut self) { + self.offsets.push(self.current_offset); + self.sizes.push(O::ZERO); + self.nulls.append_null(); + } + + fn append_value(&mut self, list: VariantList<'_, '_>) -> Result<()> { + let start_offset = self.current_offset; + for element in list.iter() { + self.element_builder.append_value(element)?; + self.current_offset = self.current_offset.add_checked(O::ONE)?; + } + self.offsets.push(start_offset); + self.sizes.push(self.current_offset - start_offset); + self.nulls.append_non_null(); + Ok(()) + } + + fn finish(mut self) -> Result { + let (value, typed_value, nulls) = self.element_builder.finish()?; + let element_array = + ShreddedVariantFieldArray::from_parts(Some(value), Some(typed_value), nulls); + let field = Arc::new( + self.field + .as_ref() + .clone() + .with_data_type(element_array.data_type().clone()), + ); + let offsets = ScalarBuffer::from(self.offsets); + let sizes = ScalarBuffer::from(self.sizes); + let list_view_array = GenericListViewArray::::new( + field, + offsets, + sizes, + ArrayRef::from(element_array), + self.nulls.finish(), + ); + Ok(Arc::new(list_view_array)) + } +} + pub(crate) struct VariantToShreddedObjectVariantRowBuilder<'a> { value_builder: VariantValueArrayBuilder, typed_value_builders: IndexMap<&'a str, VariantToShreddedVariantRowBuilder<'a>>, @@ -280,6 +576,7 @@ impl<'a> VariantToShreddedObjectVariantRowBuilder<'a> { } Ok(()) } + fn append_value(&mut self, value: Variant<'_, '_>) -> Result { let Variant::Object(ref obj) = value else { // Not an object => fall back @@ -329,6 +626,7 @@ impl<'a> VariantToShreddedObjectVariantRowBuilder<'a> { self.nulls.append_non_null(); Ok(true) } + fn finish(mut self) -> Result<(BinaryViewArray, ArrayRef, Option)> { let mut builder = StructArrayBuilder::new(); for (field_name, typed_value_builder) in self.typed_value_builders { @@ -352,12 +650,271 @@ impl<'a> VariantToShreddedObjectVariantRowBuilder<'a> { mod tests { use super::*; use crate::VariantArrayBuilder; - use arrow::array::{Array, FixedSizeBinaryArray, Float64Array, Int64Array}; - use arrow::datatypes::{DataType, Field, Fields, TimeUnit, UnionFields, UnionMode}; - use parquet_variant::{ObjectBuilder, ReadOnlyMetadataBuilder, Variant, VariantBuilder}; + use crate::arrow_to_variant::ListLikeArray; + use arrow::array::{ + Array, BinaryViewArray, FixedSizeBinaryArray, Float64Array, GenericListArray, + GenericListViewArray, Int64Array, ListArray, OffsetSizeTrait, PrimitiveArray, StringArray, + }; + use arrow::datatypes::{ + ArrowPrimitiveType, DataType, Field, Fields, Int64Type, TimeUnit, UnionFields, UnionMode, + }; + use parquet_variant::{ + BuilderSpecificState, EMPTY_VARIANT_METADATA_BYTES, ObjectBuilder, ReadOnlyMetadataBuilder, + Variant, VariantBuilder, + }; use std::sync::Arc; use uuid::Uuid; + #[derive(Clone)] + enum VariantValue<'a> { + Value(Variant<'a, 'a>), + List(Vec>), + Object(Vec<(&'a str, VariantValue<'a>)>), + Null, + } + + impl<'a, T> From for VariantValue<'a> + where + T: Into>, + { + fn from(value: T) -> Self { + Self::Value(value.into()) + } + } + + #[derive(Clone)] + enum VariantRow<'a> { + Value(VariantValue<'a>), + List(Vec>), + Object(Vec<(&'a str, VariantValue<'a>)>), + Null, + } + + fn build_variant_array(rows: Vec>) -> VariantArray { + let mut builder = VariantArrayBuilder::new(rows.len()); + + fn append_variant_value(builder: &mut B, value: VariantValue) { + match value { + VariantValue::Value(v) => builder.append_value(v), + VariantValue::List(values) => { + let mut list = builder.new_list(); + for v in values { + append_variant_value(&mut list, v); + } + list.finish(); + } + VariantValue::Object(fields) => { + let mut object = builder.new_object(); + for (name, value) in fields { + append_variant_field(&mut object, name, value); + } + object.finish(); + } + VariantValue::Null => builder.append_null(), + } + } + + fn append_variant_field<'a, S: BuilderSpecificState>( + object: &mut ObjectBuilder<'_, S>, + name: &'a str, + value: VariantValue<'a>, + ) { + match value { + VariantValue::Value(v) => { + object.insert(name, v); + } + VariantValue::List(values) => { + let mut list = object.new_list(name); + for v in values { + append_variant_value(&mut list, v); + } + list.finish(); + } + VariantValue::Object(fields) => { + let mut nested = object.new_object(name); + for (field_name, v) in fields { + append_variant_field(&mut nested, field_name, v); + } + nested.finish(); + } + VariantValue::Null => { + object.insert(name, Variant::Null); + } + } + } + + rows.into_iter().for_each(|row| match row { + VariantRow::Value(value) => append_variant_value(&mut builder, value), + VariantRow::List(values) => { + let mut list = builder.new_list(); + for value in values { + append_variant_value(&mut list, value); + } + list.finish(); + } + VariantRow::Object(fields) => { + let mut object = builder.new_object(); + for (name, value) in fields { + append_variant_field(&mut object, name, value); + } + object.finish(); + } + VariantRow::Null => builder.append_null(), + }); + builder.build() + } + + fn downcast_list_like_array( + array: &VariantArray, + ) -> &dyn ListLikeArray { + let typed_value = array.typed_value_field().unwrap(); + if let Some(list) = typed_value.as_any().downcast_ref::>() { + list + } else if let Some(list_view) = typed_value + .as_any() + .downcast_ref::>() + { + list_view + } else { + panic!( + "Expected list-like typed_value with matching offset type, got {}", + typed_value.data_type() + ); + } + } + + fn assert_list_structure( + array: &VariantArray, + expected_len: usize, + expected_offsets: &[O], + expected_sizes: &[Option], + expected_fallbacks: &[Option>], + ) { + assert_eq!(array.len(), expected_len); + + let fallbacks = (array.value_field().unwrap(), Some(array.metadata_field())); + let array = downcast_list_like_array::(array); + + assert_eq!( + array.value_offsets().unwrap(), + expected_offsets, + "list offsets mismatch" + ); + assert_eq!( + array.len(), + expected_sizes.len(), + "expected_sizes should match array length" + ); + assert_eq!( + array.len(), + expected_fallbacks.len(), + "expected_fallbacks should match array length" + ); + assert_eq!( + array.len(), + fallbacks.0.len(), + "fallbacks value field should match array length" + ); + + // Validate per-row shredding outcomes for the list array + for (idx, (expected_size, expected_fallback)) in expected_sizes + .iter() + .zip(expected_fallbacks.iter()) + .enumerate() + { + match expected_size { + Some(len) => { + // Successfully shredded: typed list value present, no fallback value + assert!(array.is_valid(idx)); + assert_eq!(array.value_size(idx), *len); + assert!(fallbacks.0.is_null(idx)); + } + None => { + // Unable to shred: typed list value absent, fallback should carry the variant + assert!(array.is_null(idx)); + assert_eq!(array.value_size(idx), O::zero()); + match expected_fallback { + Some(expected_variant) => { + assert!(fallbacks.0.is_valid(idx)); + let metadata_bytes = fallbacks + .1 + .filter(|m| m.is_valid(idx)) + .map(|m| m.value(idx)) + .filter(|bytes| !bytes.is_empty()) + .unwrap_or(EMPTY_VARIANT_METADATA_BYTES); + assert_eq!( + Variant::new(metadata_bytes, fallbacks.0.value(idx)), + expected_variant.clone() + ); + } + None => unreachable!(), + } + } + } + } + } + + fn assert_list_structure_and_elements( + array: &VariantArray, + expected_len: usize, + expected_offsets: &[O], + expected_sizes: &[Option], + expected_fallbacks: &[Option>], + expected_shredded_elements: (&[Option], &[Option>]), + ) { + assert_list_structure( + array, + expected_len, + expected_offsets, + expected_sizes, + expected_fallbacks, + ); + let array = downcast_list_like_array::(array); + + // Validate the shredded state of list elements (typed values and fallbacks) + let (expected_values, expected_fallbacks) = expected_shredded_elements; + assert_eq!( + expected_values.len(), + expected_fallbacks.len(), + "expected_values and expected_fallbacks should be aligned" + ); + + // Validate the shredded primitive values for list elements + let element_array = ShreddedVariantFieldArray::try_new(array.values().as_ref()).unwrap(); + let element_values = element_array + .typed_value_field() + .unwrap() + .as_any() + .downcast_ref::>() + .unwrap(); + assert_eq!(element_values.len(), expected_values.len()); + for (idx, expected_value) in expected_values.iter().enumerate() { + match expected_value { + Some(value) => { + assert!(element_values.is_valid(idx)); + assert_eq!(element_values.value(idx), *value); + } + None => assert!(element_values.is_null(idx)), + } + } + + // Validate fallback variants for list elements that could not be shredded + let element_fallbacks = element_array.value_field().unwrap(); + assert_eq!(element_fallbacks.len(), expected_fallbacks.len()); + for (idx, expected_fallback) in expected_fallbacks.iter().enumerate() { + match expected_fallback { + Some(expected_variant) => { + assert!(element_fallbacks.is_valid(idx)); + assert_eq!( + Variant::new(EMPTY_VARIANT_METADATA_BYTES, element_fallbacks.value(idx)), + expected_variant.clone() + ); + } + None => assert!(element_fallbacks.is_null(idx)), + } + } + } + #[test] fn test_already_shredded_input_error() { // Create a VariantArray that already has typed_value_field @@ -389,13 +946,6 @@ mod tests { assert!(result.typed_value_field().is_none()); } - #[test] - fn test_unsupported_list_schema() { - let input = VariantArray::from_iter([Variant::from(42)]); - let list_schema = DataType::List(Arc::new(Field::new("item", DataType::Int64, true))); - shred_variant(&input, &list_schema).expect_err("unsupported"); - } - #[test] fn test_invalid_fixed_size_binary_shredding() { let mock_uuid_1 = Uuid::new_v4(); @@ -617,54 +1167,391 @@ mod tests { } #[test] - fn test_object_shredding_comprehensive() { - let mut builder = VariantArrayBuilder::new(7); + fn test_array_shredding_as_list() { + let input = build_variant_array(vec![ + // Row 0: List of ints should shred entirely into typed_value + VariantRow::List(vec![ + VariantValue::from(1i64), + VariantValue::from(2i64), + VariantValue::from(3i64), + ]), + // Row 1: Contains incompatible types so values fall back + VariantRow::List(vec![ + VariantValue::from(1i64), + VariantValue::from("two"), + VariantValue::from(Variant::Null), + ]), + // Row 2: Not a list -> entire row falls back + VariantRow::Value(VariantValue::from("not a list")), + // Row 3: Array-level null propagates + VariantRow::Null, + // Row 4: Empty list exercises zero-length offsets + VariantRow::List(vec![]), + ]); + let list_schema = DataType::List(Arc::new(Field::new("item", DataType::Int64, true))); + let result = shred_variant(&input, &list_schema).unwrap(); + assert_eq!(result.len(), 5); - // Row 0: Fully shredded object - builder - .new_object() - .with_field("score", 95.5f64) - .with_field("age", 30i64) - .finish(); + assert_list_structure_and_elements::( + &result, + 5, + &[0, 3, 6, 6, 6, 6], + &[Some(3), Some(3), None, None, Some(0)], + &[ + None, + None, + Some(Variant::from("not a list")), + Some(Variant::Null), + None, + ], + ( + &[Some(1), Some(2), Some(3), Some(1), None, None], + &[ + None, + None, + None, + None, + Some(Variant::from("two")), + Some(Variant::Null), + ], + ), + ); + } - // Row 1: Partially shredded object (extra email field) - builder - .new_object() - .with_field("score", 87.2f64) - .with_field("age", 25i64) - .with_field("email", "bob@example.com") - .finish(); + #[test] + fn test_array_shredding_as_large_list() { + let input = build_variant_array(vec![ + // Row 0: List of ints shreds to typed_value + VariantRow::List(vec![VariantValue::from(1i64), VariantValue::from(2i64)]), + // Row 1: Not a list -> entire row falls back + VariantRow::Value(VariantValue::from("not a list")), + // Row 2: Empty list + VariantRow::List(vec![]), + ]); + let list_schema = DataType::LargeList(Arc::new(Field::new("item", DataType::Int64, true))); + let result = shred_variant(&input, &list_schema).unwrap(); + assert_eq!(result.len(), 3); - // Row 2: Missing field (no score) - builder.new_object().with_field("age", 35i64).finish(); + assert_list_structure_and_elements::( + &result, + 3, + &[0, 2, 2, 2], + &[Some(2), None, Some(0)], + &[None, Some(Variant::from("not a list")), None], + (&[Some(1), Some(2)], &[None, None]), + ); + } - // Row 3: Type mismatch (score is string, age is string) - builder - .new_object() - .with_field("score", "ninety-five") - .with_field("age", "thirty") - .finish(); + #[test] + fn test_array_shredding_as_list_view() { + let input = build_variant_array(vec![ + // Row 0: Standard list + VariantRow::List(vec![ + VariantValue::from(1i64), + VariantValue::from(2i64), + VariantValue::from(3i64), + ]), + // Row 1: List with incompatible types -> element fallback + VariantRow::List(vec![ + VariantValue::from(1i64), + VariantValue::from("two"), + VariantValue::from(Variant::Null), + ]), + // Row 2: Not a list -> top-level fallback + VariantRow::Value(VariantValue::from("not a list")), + // Row 3: Top-level Null + VariantRow::Null, + // Row 4: Empty list + VariantRow::List(vec![]), + ]); + let list_schema = DataType::ListView(Arc::new(Field::new("item", DataType::Int64, true))); + let result = shred_variant(&input, &list_schema).unwrap(); + assert_eq!(result.len(), 5); - // Row 4: Non-object - builder.append_variant(Variant::from("not an object")); + assert_list_structure_and_elements::( + &result, + 5, + &[0, 3, 6, 6, 6], + &[Some(3), Some(3), None, None, Some(0)], + &[ + None, + None, + Some(Variant::from("not a list")), + Some(Variant::Null), + None, + ], + ( + &[Some(1), Some(2), Some(3), Some(1), None, None], + &[ + None, + None, + None, + None, + Some(Variant::from("two")), + Some(Variant::Null), + ], + ), + ); + } - // Row 5: Empty object - builder.new_object().finish(); + #[test] + fn test_array_shredding_as_large_list_view() { + let input = build_variant_array(vec![ + // Row 0: List of ints shreds to typed_value + VariantRow::List(vec![VariantValue::from(1i64), VariantValue::from(2i64)]), + // Row 1: Not a list -> entire row falls back + VariantRow::Value(VariantValue::from("fallback")), + // Row 2: Empty list + VariantRow::List(vec![]), + ]); + let list_schema = + DataType::LargeListView(Arc::new(Field::new("item", DataType::Int64, true))); + let result = shred_variant(&input, &list_schema).unwrap(); + assert_eq!(result.len(), 3); - // Row 6: Null - builder.append_null(); + assert_list_structure_and_elements::( + &result, + 3, + &[0, 2, 2], + &[Some(2), None, Some(0)], + &[None, Some(Variant::from("fallback")), None], + (&[Some(1), Some(2)], &[None, None]), + ); + } - // Row 7: Object with only "wrong" fields - builder.new_object().with_field("foo", 10).finish(); + #[test] + fn test_array_shredding_with_array_elements() { + let input = build_variant_array(vec![ + // Row 0: [[1, 2], [3, 4], []] - clean nested lists + VariantRow::List(vec![ + VariantValue::List(vec![VariantValue::from(1i64), VariantValue::from(2i64)]), + VariantValue::List(vec![VariantValue::from(3i64), VariantValue::from(4i64)]), + VariantValue::List(vec![]), + ]), + // Row 1: [[5, "bad", null], "not a list inner", null] - inner fallbacks + VariantRow::List(vec![ + VariantValue::List(vec![ + VariantValue::from(5i64), + VariantValue::from("bad"), + VariantValue::from(Variant::Null), + ]), + VariantValue::from("not a list inner"), + VariantValue::Null, + ]), + // Row 2: "not a list" - top-level fallback + VariantRow::Value(VariantValue::from("not a list")), + // Row 3: null row + VariantRow::Null, + ]); + let inner_field = Arc::new(Field::new("item", DataType::Int64, true)); + let inner_list_schema = DataType::List(inner_field); + let list_schema = DataType::List(Arc::new(Field::new( + "item", + inner_list_schema.clone(), + true, + ))); + let result = shred_variant(&input, &list_schema).unwrap(); + assert_eq!(result.len(), 4); - // Row 8: Object with one "right" and one "wrong" field - builder - .new_object() - .with_field("score", 66.67f64) - .with_field("foo", 10) - .finish(); + let typed_value = result + .typed_value_field() + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); - let input = builder.build(); + assert_list_structure::( + &result, + 4, + &[0, 3, 6, 6, 6], + &[Some(3), Some(3), None, None], + &[ + None, + None, + Some(Variant::from("not a list")), + Some(Variant::Null), + ], + ); + + let outer_elements = + ShreddedVariantFieldArray::try_new(typed_value.values().as_ref()).unwrap(); + assert_eq!(outer_elements.len(), 6); + let outer_values = outer_elements + .typed_value_field() + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + let outer_fallbacks = outer_elements.value_field().unwrap(); + + let outer_metadata = BinaryViewArray::from_iter_values(std::iter::repeat_n( + EMPTY_VARIANT_METADATA_BYTES, + outer_elements.len(), + )); + let outer_variant = VariantArray::from_parts( + outer_metadata, + Some(outer_fallbacks.clone()), + Some(Arc::new(outer_values.clone())), + None, + ); + + assert_list_structure_and_elements::( + &outer_variant, + outer_elements.len(), + &[0, 2, 4, 4, 7, 7, 7], + &[Some(2), Some(2), Some(0), Some(3), None, None], + &[ + None, + None, + None, + None, + Some(Variant::from("not a list inner")), + Some(Variant::Null), + ], + ( + &[Some(1), Some(2), Some(3), Some(4), Some(5), None, None], + &[ + None, + None, + None, + None, + None, + Some(Variant::from("bad")), + Some(Variant::Null), + ], + ), + ); + } + + #[test] + fn test_array_shredding_with_object_elements() { + let input = build_variant_array(vec![ + // Row 0: [{"id": 1, "name": "Alice"}, {"id": null}] fully shards + VariantRow::List(vec![ + VariantValue::Object(vec![ + ("id", VariantValue::from(1i64)), + ("name", VariantValue::from("Alice")), + ]), + VariantValue::Object(vec![("id", VariantValue::from(Variant::Null))]), + ]), + // Row 1: "not a list" -> fallback + VariantRow::Value(VariantValue::from("not a list")), + // Row 2: Null row + VariantRow::Null, + ]); + + // Target schema is List> + let object_fields = Fields::from(vec![ + Field::new("id", DataType::Int64, true), + Field::new("name", DataType::Utf8, true), + ]); + let list_schema = DataType::List(Arc::new(Field::new( + "item", + DataType::Struct(object_fields), + true, + ))); + let result = shred_variant(&input, &list_schema).unwrap(); + assert_eq!(result.len(), 3); + + assert_list_structure::( + &result, + 3, + &[0, 2, 2, 2], + &[Some(2), None, None], + &[None, Some(Variant::from("not a list")), Some(Variant::Null)], + ); + + // Validate nested struct fields for each element + let typed_value = result + .typed_value_field() + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + let element_array = + ShreddedVariantFieldArray::try_new(typed_value.values().as_ref()).unwrap(); + assert_eq!(element_array.len(), 2); + let element_objects = element_array + .typed_value_field() + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + + // Id field [1, Variant::Null] + let id_field = + ShreddedVariantFieldArray::try_new(element_objects.column_by_name("id").unwrap()) + .unwrap(); + let id_values = id_field.value_field().unwrap(); + let id_typed_values = id_field + .typed_value_field() + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + assert!(id_values.is_null(0)); + assert_eq!(id_typed_values.value(0), 1); + // null is stored as Variant::Null in values + assert!(id_values.is_valid(1)); + assert_eq!( + Variant::new(EMPTY_VARIANT_METADATA_BYTES, id_values.value(1)), + Variant::Null + ); + assert!(id_typed_values.is_null(1)); + + // Name field ["Alice", null] + let name_field = + ShreddedVariantFieldArray::try_new(element_objects.column_by_name("name").unwrap()) + .unwrap(); + let name_values = name_field.value_field().unwrap(); + let name_typed_values = name_field + .typed_value_field() + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + assert!(name_values.is_null(0)); + assert_eq!(name_typed_values.value(0), "Alice"); + // No value provided, both value and typed_value are null + assert!(name_values.is_null(1)); + assert!(name_typed_values.is_null(1)); + } + + #[test] + fn test_object_shredding_comprehensive() { + let input = build_variant_array(vec![ + // Row 0: Fully shredded object + VariantRow::Object(vec![ + ("score", VariantValue::from(95.5f64)), + ("age", VariantValue::from(30i64)), + ]), + // Row 1: Partially shredded object (extra email field) + VariantRow::Object(vec![ + ("score", VariantValue::from(87.2f64)), + ("age", VariantValue::from(25i64)), + ("email", VariantValue::from("bob@example.com")), + ]), + // Row 2: Missing field (no score) + VariantRow::Object(vec![("age", VariantValue::from(35i64))]), + // Row 3: Type mismatch (score is string, age is string) + VariantRow::Object(vec![ + ("score", VariantValue::from("ninety-five")), + ("age", VariantValue::from("thirty")), + ]), + // Row 4: Non-object + VariantRow::Value(VariantValue::from("not an object")), + // Row 5: Empty object + VariantRow::Object(vec![]), + // Row 6: Null + VariantRow::Null, + // Row 7: Object with only "wrong" fields + VariantRow::Object(vec![("foo", VariantValue::from(10))]), + // Row 8: Object with one "right" and one "wrong" field + VariantRow::Object(vec![ + ("score", VariantValue::from(66.67f64)), + ("foo", VariantValue::from(10)), + ]), + ]); // Create target schema: struct // Both types are supported for shredding @@ -979,17 +1866,106 @@ mod tests { ); } + #[test] + fn test_object_shredding_with_array_field() { + let input = build_variant_array(vec![ + // Row 0: Object with well-typed scores list + VariantRow::Object(vec![( + "scores", + VariantValue::List(vec![VariantValue::from(10i64), VariantValue::from(20i64)]), + )]), + // Row 1: Object whose scores list contains incompatible type + VariantRow::Object(vec![( + "scores", + VariantValue::List(vec![ + VariantValue::from("oops"), + VariantValue::from(Variant::Null), + ]), + )]), + // Row 2: Object missing the scores field entirely + VariantRow::Object(vec![]), + // Row 3: Non-object fallback + VariantRow::Value(VariantValue::from("not an object")), + // Row 4: Top-level Null + VariantRow::Null, + ]); + let list_field = Arc::new(Field::new("item", DataType::Int64, true)); + let inner_list_schema = DataType::List(list_field); + let schema = DataType::Struct(Fields::from(vec![Field::new( + "scores", + inner_list_schema.clone(), + true, + )])); + + let result = shred_variant(&input, &schema).unwrap(); + assert_eq!(result.len(), 5); + + // Access base value/typed_value columns + let value_field = result.value_field().unwrap(); + let typed_struct = result + .typed_value_field() + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + + // Validate base value fallbacks for non-object rows + assert!(value_field.is_null(0)); + assert!(value_field.is_null(1)); + assert!(value_field.is_null(2)); + assert!(value_field.is_valid(3)); + assert_eq!( + Variant::new(result.metadata_field().value(3), value_field.value(3)), + Variant::from("not an object") + ); + assert!(value_field.is_null(4)); + + // Typed struct should only be null for the fallback row + assert!(typed_struct.is_valid(0)); + assert!(typed_struct.is_valid(1)); + assert!(typed_struct.is_valid(2)); + assert!(typed_struct.is_null(3)); + assert!(typed_struct.is_null(4)); + + // Drill into the scores field on the typed struct + let scores_field = + ShreddedVariantFieldArray::try_new(typed_struct.column_by_name("scores").unwrap()) + .unwrap(); + assert_list_structure_and_elements::( + &VariantArray::from_parts( + BinaryViewArray::from_iter_values(std::iter::repeat_n( + EMPTY_VARIANT_METADATA_BYTES, + scores_field.len(), + )), + Some(scores_field.value_field().unwrap().clone()), + Some(scores_field.typed_value_field().unwrap().clone()), + None, + ), + scores_field.len(), + &[0i32, 2, 4, 4, 4, 4], + &[Some(2), Some(2), None, None, None], + &[ + None, + None, + Some(Variant::Null), + Some(Variant::Null), + Some(Variant::Null), + ], + ( + &[Some(10), Some(20), None, None], + &[None, None, Some(Variant::from("oops")), Some(Variant::Null)], + ), + ); + } + #[test] fn test_object_different_schemas() { // Create object with multiple fields - let mut builder = VariantArrayBuilder::new(1); - builder - .new_object() - .with_field("id", 123i32) - .with_field("age", 25i64) - .with_field("score", 95.5f64) - .finish(); - let input = builder.build(); + let input = build_variant_array(vec![VariantRow::Object(vec![ + ("id", VariantValue::from(123i32)), + ("age", VariantValue::from(25i64)), + ("score", VariantValue::from(95.5f64)), + ])]); // Test with schema containing only id field let schema1 = DataType::Struct(Fields::from(vec![Field::new("id", DataType::Int32, true)])); @@ -1023,44 +1999,33 @@ mod tests { let mock_uuid_2 = Uuid::new_v4(); let mock_uuid_3 = Uuid::new_v4(); - let mut builder = VariantArrayBuilder::new(6); - - // Row 0: Fully shredded object with both UUID fields - builder - .new_object() - .with_field("id", mock_uuid_1) - .with_field("session_id", mock_uuid_2) - .finish(); - - // Row 1: Partially shredded object - UUID fields plus extra field - builder - .new_object() - .with_field("id", mock_uuid_2) - .with_field("session_id", mock_uuid_3) - .with_field("name", "test_user") - .finish(); - - // Row 2: Missing UUID field (no session_id) - builder.new_object().with_field("id", mock_uuid_1).finish(); - - // Row 3: Type mismatch - id is UUID but session_id is a string - builder - .new_object() - .with_field("id", mock_uuid_3) - .with_field("session_id", "not-a-uuid") - .finish(); - - // Row 4: Object with non-UUID value in id field - builder - .new_object() - .with_field("id", 12345i64) - .with_field("session_id", mock_uuid_1) - .finish(); - - // Row 5: Null - builder.append_null(); - - let input = builder.build(); + let input = build_variant_array(vec![ + // Row 0: Fully shredded object with both UUID fields + VariantRow::Object(vec![ + ("id", VariantValue::from(mock_uuid_1)), + ("session_id", VariantValue::from(mock_uuid_2)), + ]), + // Row 1: Partially shredded object - UUID fields plus extra field + VariantRow::Object(vec![ + ("id", VariantValue::from(mock_uuid_2)), + ("session_id", VariantValue::from(mock_uuid_3)), + ("name", VariantValue::from("test_user")), + ]), + // Row 2: Missing UUID field (no session_id) + VariantRow::Object(vec![("id", VariantValue::from(mock_uuid_1))]), + // Row 3: Type mismatch - id is UUID but session_id is a string + VariantRow::Object(vec![ + ("id", VariantValue::from(mock_uuid_3)), + ("session_id", VariantValue::from("not-a-uuid")), + ]), + // Row 4: Object with non-UUID value in id field + VariantRow::Object(vec![ + ("id", VariantValue::from(12345i64)), + ("session_id", VariantValue::from(mock_uuid_1)), + ]), + // Row 5: Null + VariantRow::Null, + ]); let fields = Fields::from(vec![ Field::new("id", DataType::FixedSizeBinary(16), true),