diff --git a/arrow-array/Cargo.toml b/arrow-array/Cargo.toml index 8ebe21c70772..06dda5c63cdd 100644 --- a/arrow-array/Cargo.toml +++ b/arrow-array/Cargo.toml @@ -54,6 +54,7 @@ all-features = true [features] ffi = ["arrow-schema/ffi", "arrow-data/ffi"] force_validate = [] +pool = ["arrow-buffer/pool", "arrow-data/pool"] [dev-dependencies] rand = { version = "0.9", default-features = false, features = ["std", "std_rng", "thread_rng"] } diff --git a/arrow-array/src/array/mod.rs b/arrow-array/src/array/mod.rs index 5fdfb9fb2244..deec5a96684c 100644 --- a/arrow-array/src/array/mod.rs +++ b/arrow-array/src/array/mod.rs @@ -336,6 +336,34 @@ pub trait Array: std::fmt::Debug + Send + Sync { /// This value will always be greater than returned by `get_buffer_memory_size()` and /// includes the overhead of the data structures that contain the pointers to the various buffers. fn get_array_memory_size(&self) -> usize; + + /// Claim memory used by this array in the provided memory pool. + /// + /// This recursively claims memory for: + /// - All data buffers in this array + /// - All child arrays (for nested types like List, Struct, etc.) + /// - The null bitmap buffer if present + /// + /// This method guarantees that the memory pool accounts for occupied memory + /// exactly once. For example, if this array is derived from operations like `slice`, + /// calling `claim` on it does not change the memory pool's usage if the underlying buffers + /// have already been counted. 
+ /// + /// # Example + /// ``` + /// # use arrow_array::{Int32Array, Array}; + /// # use arrow_buffer::TrackingMemoryPool; + /// + /// let array = Int32Array::from(vec![1, 2, 3, 4, 5]); + /// let pool = TrackingMemoryPool::default(); + /// + /// // Claim the array's memory in the pool + /// array.claim(&pool); + /// ``` + #[cfg(feature = "pool")] + fn claim(&self, pool: &dyn arrow_buffer::MemoryPool) { + self.to_data().claim(pool) + } } /// A reference-counted reference to a generic `Array` @@ -419,6 +447,11 @@ impl Array for ArrayRef { fn get_array_memory_size(&self) -> usize { self.as_ref().get_array_memory_size() } + + #[cfg(feature = "pool")] + fn claim(&self, pool: &dyn arrow_buffer::MemoryPool) { + self.as_ref().claim(pool) + } } impl<T: Array> Array for &T { @@ -489,6 +522,11 @@ impl<T: Array> Array for &T { fn get_array_memory_size(&self) -> usize { T::get_array_memory_size(self) } + + #[cfg(feature = "pool")] + fn claim(&self, pool: &dyn arrow_buffer::MemoryPool) { + T::claim(self, pool) + } } /// A generic trait for accessing the values of an [`Array`] diff --git a/arrow-buffer/src/buffer/null.rs b/arrow-buffer/src/buffer/null.rs index e5e3a610ead2..ce5f4bfdbca3 100644 --- a/arrow-buffer/src/buffer/null.rs +++ b/arrow-buffer/src/buffer/null.rs @@ -221,6 +221,13 @@ impl NullBuffer { pub fn buffer(&self) -> &Buffer { self.buffer.inner() } + + /// Claim memory used by this null buffer in the provided memory pool. 
+ #[cfg(feature = "pool")] + pub fn claim(&self, pool: &dyn crate::MemoryPool) { + // Delegate to the innermost Buffer: NullBuffer wraps a BooleanBuffer, which wraps a Buffer + self.buffer.inner().claim(pool); + } } impl<'a> IntoIterator for &'a NullBuffer { diff --git a/arrow-data/Cargo.toml b/arrow-data/Cargo.toml index fbed24fea1fa..ec5f6b1aa032 100644 --- a/arrow-data/Cargo.toml +++ b/arrow-data/Cargo.toml @@ -39,6 +39,8 @@ bench = false force_validate = [] # Enable ffi support ffi = ["arrow-schema/ffi"] +# Enable memory pool support +pool = ["arrow-buffer/pool"] [package.metadata.docs.rs] all-features = true diff --git a/arrow-data/src/data.rs b/arrow-data/src/data.rs index fca19bc3aafe..0776168a9d06 100644 --- a/arrow-data/src/data.rs +++ b/arrow-data/src/data.rs @@ -1572,6 +1572,30 @@ impl ArrayData { pub fn into_builder(self) -> ArrayDataBuilder { self.into() } + + /// Claim memory used by this [`ArrayData`] in the provided memory pool. + /// + /// This claims memory for: + /// - All buffers in `self.buffers` + /// - All child [`ArrayData`] in `self.child_data`, recursively + /// - The null buffer in `self.nulls`, if present + #[cfg(feature = "pool")] + pub fn claim(&self, pool: &dyn arrow_buffer::MemoryPool) { + // Claim all data buffers + for buffer in &self.buffers { + buffer.claim(pool); + } + + // Claim null buffer if present + if let Some(nulls) = &self.nulls { + nulls.claim(pool); + } + + // Recursively claim child data + for child in &self.child_data { + child.claim(pool); + } + } } /// Return the expected [`DataTypeLayout`] Arrays of this data diff --git a/arrow/Cargo.toml b/arrow/Cargo.toml index 0be22561a50c..a88eba7678f6 100644 --- a/arrow/Cargo.toml +++ b/arrow/Cargo.toml @@ -81,6 +81,8 @@ force_validate = ["arrow-array/force_validate", "arrow-data/force_validate"] ffi = ["arrow-schema/ffi", "arrow-data/ffi", "arrow-array/ffi"] chrono-tz = ["arrow-array/chrono-tz"] canonical_extension_types = ["arrow-schema/canonical_extension_types"] +# Enable memory tracking support +pool = ["arrow-array/pool"] [dev-dependencies] chrono = { 
workspace = true } @@ -114,6 +116,11 @@ name = "zero_copy_ipc" required-features = ["prettyprint"] path = "examples/zero_copy_ipc.rs" +[[example]] +name = "memory_tracking" +required-features = ["pool"] +path = "examples/memory_tracking.rs" + [[bench]] name = "aggregate_kernels" harness = false diff --git a/arrow/examples/memory_tracking.rs b/arrow/examples/memory_tracking.rs new file mode 100644 index 000000000000..70e8a7a96272 --- /dev/null +++ b/arrow/examples/memory_tracking.rs @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
Example demonstrating the Array memory tracking functionality + +use arrow_array::{Array, Int32Array, ListArray}; +use arrow_buffer::{MemoryPool, TrackingMemoryPool}; +use arrow_schema::{DataType, Field}; +use std::sync::Arc; + +fn main() { + let pool = TrackingMemoryPool::default(); + + println!("Arrow Array Memory Tracking Example"); + println!("==================================="); + + // Basic array memory tracking + let array = Int32Array::from(vec![1, 2, 3, 4, 5]); + array.claim(&pool); + println!("Int32Array (5 elements): {} bytes", pool.used()); + + // Nested array (recursive tracking) + let offsets = arrow_buffer::OffsetBuffer::new(vec![0, 2, 4].into()); + let field = Arc::new(Field::new("item", DataType::Int32, false)); + let list_array = ListArray::new(field, offsets, Arc::new(array), None); + + let before_list = pool.used(); + list_array.claim(&pool); + let after_list = pool.used(); + println!("ListArray (nested): +{} bytes", after_list - before_list); + + // No double-counting for derived arrays + let large_array = Int32Array::from((0..1000).collect::<Vec<i32>>()); + large_array.claim(&pool); + let original_usage = pool.used(); + println!("Original array (1000 elements): {original_usage} bytes"); + + // Create and claim slices - should not increase memory usage + let slice1 = large_array.slice(0, 100); + let slice2 = large_array.slice(500, 200); + + slice1.claim(&pool); + slice2.claim(&pool); + let final_usage = pool.used(); + + println!("After claiming 2 slices: {final_usage} bytes"); + println!( + "Increase: {} bytes (slices share the same buffer!)", + final_usage - original_usage + ); +}