Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 9 additions & 10 deletions rust/lance-encoding/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,6 @@ use crate::compression::{DecompressionStrategy, DefaultDecompressionStrategy};
use crate::data::DataBlock;
use crate::encoder::EncodedBatch;
use crate::encodings::logical::list::StructuralListScheduler;
use crate::encodings::logical::map::StructuralMapScheduler;
use crate::encodings::logical::primitive::StructuralPrimitiveFieldScheduler;
use crate::encodings::logical::r#struct::{StructuralStructDecoder, StructuralStructScheduler};
use crate::format::pb::{self, column_encoding};
Expand Down Expand Up @@ -774,7 +773,7 @@ impl CoreFieldDecoderStrategy {
Ok(Box::new(StructuralListScheduler::new(child_scheduler))
as Box<dyn StructuralFieldScheduler>)
}
DataType::Map(_, keys_sorted) => {
DataType::Map(entries_field, keys_sorted) => {
// TODO: We only support keys_sorted=false for now,
// because converting a Rust Arrow map field to a Python Arrow field
// loses the keys_sorted property.
Expand All @@ -784,14 +783,14 @@ impl CoreFieldDecoderStrategy {
location: location!(),
});
}
let entries_child = field
.children
.first()
.expect("Map field must have an entries child");
let child_scheduler =
self.create_structural_field_scheduler(entries_child, column_infos)?;
Ok(Box::new(StructuralMapScheduler::new(child_scheduler))
as Box<dyn StructuralFieldScheduler>)

let list_field = Field::try_from(ArrowField::new(
field.name.clone(),
Copy link
Contributor

@xloya xloya Dec 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we use the name of entries_field here?

DataType::List(entries_field.clone()),
field.nullable,
))?;

self.create_structural_field_scheduler(&list_field, column_infos)
}
_ => todo!("create_structural_field_scheduler for {}", data_type),
}
Expand Down
182 changes: 62 additions & 120 deletions rust/lance-encoding/src/encodings/logical/map.rs
Original file line number Diff line number Diff line change
@@ -1,40 +1,32 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::{ops::Range, sync::Arc};
use std::sync::Arc;

use arrow_array::{Array, ArrayRef, MapArray};
use arrow_array::{Array, ArrayRef, ListArray, MapArray};
use arrow_schema::DataType;
use futures::future::BoxFuture;
use lance_arrow::deepcopy::deep_copy_nulls;
use lance_core::{Error, Result};
use snafu::location;

use crate::{
decoder::{
DecodedArray, FilterExpression, ScheduledScanLine, SchedulerContext,
StructuralDecodeArrayTask, StructuralFieldDecoder, StructuralFieldScheduler,
StructuralSchedulingJob,
},
decoder::{DecodedArray, StructuralDecodeArrayTask, StructuralFieldDecoder},
encoder::{EncodeTask, FieldEncoder, OutOfLineBuffers},
encodings::logical::list::ListStructuralEncoder,
repdef::RepDefBuilder,
};

/// A structural encoder for map fields
///
/// Map in Arrow is represented as List<Struct<key, value>>
/// The map's offsets are added to the rep/def builder
/// and the map's entries (struct array) are passed to the child encoder
/// This encoder uses the [`ListStructuralEncoder`] to encode the data
pub struct MapStructuralEncoder {
keep_original_array: bool,
child: Box<dyn FieldEncoder>,
list_encoder: ListStructuralEncoder,
}

impl MapStructuralEncoder {
pub fn new(keep_original_array: bool, child: Box<dyn FieldEncoder>) -> Self {
Self {
keep_original_array,
child,
list_encoder: ListStructuralEncoder::new(keep_original_array, child),
}
}
}
Expand All @@ -44,7 +36,7 @@ impl FieldEncoder for MapStructuralEncoder {
&mut self,
array: ArrayRef,
external_buffers: &mut OutOfLineBuffers,
mut repdef: RepDefBuilder,
repdef: RepDefBuilder,
row_number: u64,
num_rows: u64,
) -> Result<Vec<EncodeTask>> {
Expand All @@ -53,115 +45,63 @@ impl FieldEncoder for MapStructuralEncoder {
.downcast_ref::<MapArray>()
.expect("MapEncoder used for non-map data");

// Map internally has offsets and entries (struct array)
let entries = map_array.entries();
let offsets = map_array.offsets();
let (entries_field, offsets, entries, validity, _keys_sorted) =
map_array.clone().into_parts();

// Add offsets to RepDefBuilder to handle nullability and list structure
if self.keep_original_array {
repdef.add_offsets(offsets.clone(), array.nulls().cloned())
} else {
repdef.add_offsets(offsets.clone(), deep_copy_nulls(array.nulls()))
};
let list_array = ListArray::new(entries_field, offsets, Arc::new(entries), validity);

// Pass the entries (struct array) to the child encoder
// Convert to Arc<dyn Array>
let entries_arc: ArrayRef = Arc::new(entries.clone());
self.child
.maybe_encode(entries_arc, external_buffers, repdef, row_number, num_rows)
self.list_encoder.maybe_encode(
Arc::new(list_array),
external_buffers,
repdef,
row_number,
num_rows,
)
}

fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
self.child.flush(external_buffers)
self.list_encoder.flush(external_buffers)
}

fn num_columns(&self) -> u32 {
self.child.num_columns()
self.list_encoder.num_columns()
}

fn finish(
&mut self,
external_buffers: &mut OutOfLineBuffers,
) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
self.child.finish(external_buffers)
}
}

/// Structural scheduler for map fields.
///
/// Holds a single child scheduler and owns no scheduling state of its own.
#[derive(Debug)]
pub struct StructuralMapScheduler {
// Scheduler that all scheduling requests are forwarded to.
// NOTE(review): presumably the scheduler of the map's entries (struct) child — confirm at the call site.
child: Box<dyn StructuralFieldScheduler>,
}

impl StructuralMapScheduler {
pub fn new(child: Box<dyn StructuralFieldScheduler>) -> Self {
Self { child }
self.list_encoder.finish(external_buffers)
}
}

// Scheduling for maps is pure delegation: both methods forward to the child scheduler.
impl StructuralFieldScheduler for StructuralMapScheduler {
/// Schedules `ranges` on the child scheduler and wraps the job it returns in a
/// [`StructuralMapSchedulingJob`].
///
/// # Errors
///
/// Propagates any error returned by the child's `schedule_ranges`.
fn schedule_ranges<'a>(
&'a self,
ranges: &[Range<u64>],
filter: &FilterExpression,
) -> Result<Box<dyn StructuralSchedulingJob + 'a>> {
let child = self.child.schedule_ranges(ranges, filter)?;

Ok(Box::new(StructuralMapSchedulingJob::new(child)))
}

/// Initializes the child scheduler with the same `filter` and `context`,
/// returning the child's future unchanged.
fn initialize<'a>(
&'a mut self,
filter: &'a FilterExpression,
context: &'a SchedulerContext,
) -> BoxFuture<'a, Result<()>> {
self.child.initialize(filter, context)
}
}

/// Scheduling job for map data
/// A structural decoder for map fields
///
/// Scheduling is handled by the child encoder (struct) and nothing special
/// happens here, similar to list.
#[derive(Debug)]
// Thin wrapper around the scheduling job produced by the child scheduler.
struct StructuralMapSchedulingJob<'a> {
// The wrapped child job; every call on this type is forwarded to it.
child: Box<dyn StructuralSchedulingJob + 'a>,
}

impl<'a> StructuralMapSchedulingJob<'a> {
/// Creates a job that wraps `child`.
fn new(child: Box<dyn StructuralSchedulingJob + 'a>) -> Self {
Self { child }
}
}

impl StructuralSchedulingJob for StructuralMapSchedulingJob<'_> {
/// Forwards to the child job's `schedule_next` and returns its result unchanged.
fn schedule_next(&mut self, context: &mut SchedulerContext) -> Result<Vec<ScheduledScanLine>> {
self.child.schedule_next(context)
}
}

/// This decoder uses the [`StructuralListDecoder`] to decode the data as a list.
/// It then simply casts the list array to a map array.
#[derive(Debug)]
pub struct StructuralMapDecoder {
child: Box<dyn StructuralFieldDecoder>,
list_decoder: Box<dyn StructuralFieldDecoder>,
data_type: DataType,
}

impl StructuralMapDecoder {
pub fn new(child: Box<dyn StructuralFieldDecoder>, data_type: DataType) -> Self {
Self { child, data_type }
pub fn new(list_decoder: Box<dyn StructuralFieldDecoder>, data_type: DataType) -> Self {
Self {
list_decoder,
data_type,
}
}
}

impl StructuralFieldDecoder for StructuralMapDecoder {
fn accept_page(&mut self, child: crate::decoder::LoadedPageShard) -> Result<()> {
self.child.accept_page(child)
fn accept_page(&mut self, list_decoder: crate::decoder::LoadedPageShard) -> Result<()> {
self.list_decoder.accept_page(list_decoder)
}

fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>> {
let child_task = self.child.drain(num_rows)?;
Ok(Box::new(StructuralMapDecodeTask::new(
child_task,
self.data_type.clone(),
)))
let child_task = self.list_decoder.drain(num_rows)?;
Ok(Box::new(StructuralMapDecodeTask::new(child_task)))
}

fn data_type(&self) -> &DataType {
Expand All @@ -171,49 +111,45 @@ impl StructuralFieldDecoder for StructuralMapDecoder {

#[derive(Debug)]
struct StructuralMapDecodeTask {
child_task: Box<dyn StructuralDecodeArrayTask>,
data_type: DataType,
list_decode_task: Box<dyn StructuralDecodeArrayTask>,
}

impl StructuralMapDecodeTask {
fn new(child_task: Box<dyn StructuralDecodeArrayTask>, data_type: DataType) -> Self {
Self {
child_task,
data_type,
}
fn new(list_decode_task: Box<dyn StructuralDecodeArrayTask>) -> Self {
Self { list_decode_task }
}
}

impl StructuralDecodeArrayTask for StructuralMapDecodeTask {
fn decode(self: Box<Self>) -> Result<DecodedArray> {
let DecodedArray { array, mut repdef } = self.child_task.decode()?;

// Decode the offsets from RepDef
let (offsets, validity) = repdef.unravel_offsets::<i32>()?;
let DecodedArray { array, repdef } = self.list_decode_task.decode()?;

let list_array =
array
.as_any()
.downcast_ref::<ListArray>()
.ok_or_else(|| Error::Schema {
message: format!(
"Expected list array from map's inner decoder, got: {:?}",
array.data_type()
),
location: location!(),
})?;

// Extract the entries field and keys_sorted from the map data type
let (entries_field, keys_sorted) = match &self.data_type {
DataType::Map(field, keys_sorted) => {
if *keys_sorted {
return Err(Error::NotSupported {
source: "Map type decoder does not support keys_sorted=true now"
.to_string()
.into(),
location: location!(),
});
}
(field.clone(), *keys_sorted)
}
let entries_field = match list_array.data_type() {
DataType::List(field) => field.clone(),
_ => {
return Err(Error::Schema {
message: "Map decoder did not have a map field".to_string(),
message: "List array did not have list data type".to_string(),
location: location!(),
});
}
};

// Convert the decoded array to StructArray
let entries = array
let entries = list_array
.values()
.as_any()
.downcast_ref::<arrow_array::StructArray>()
.ok_or_else(|| Error::Schema {
Expand All @@ -223,7 +159,13 @@ impl StructuralDecodeArrayTask for StructuralMapDecodeTask {
.clone();

// Build the MapArray from offsets, entries, validity, and keys_sorted
let map_array = MapArray::new(entries_field, offsets, entries, validity, keys_sorted);
let map_array = MapArray::new(
entries_field,
list_array.offsets().clone(),
entries,
list_array.nulls().cloned(),
false, // keys_sorted is always false at the moment
);

Ok(DecodedArray {
array: Arc::new(map_array),
Expand Down
9 changes: 7 additions & 2 deletions rust/lance-encoding/src/encodings/logical/struct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,9 +284,14 @@ impl StructuralStructDecoder {
location: location!(),
});
}
let child_decoder = Self::field_to_decoder(entries_field, should_validate)?;
let list_field = Arc::new(arrow_schema::Field::new(
field.name().clone(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question about name

DataType::List(entries_field.clone()),
field.is_nullable(),
));
let list_decoder = Self::field_to_decoder(&list_field, should_validate)?;
Ok(Box::new(StructuralMapDecoder::new(
child_decoder,
list_decoder,
field.data_type().clone(),
)))
}
Expand Down
Loading