diff --git a/arrow-array/Cargo.toml b/arrow-array/Cargo.toml index 8ab0bb290e96..e70da909323b 100644 --- a/arrow-array/Cargo.toml +++ b/arrow-array/Cargo.toml @@ -56,6 +56,8 @@ all-features = true [features] ffi = ["arrow-schema/ffi", "arrow-data/ffi"] force_validate = [] +# Enable memory tracking support +pool = ["arrow-buffer/pool", "arrow-data/pool"] [dev-dependencies] rand = { version = "0.9", default-features = false, features = ["std", "std_rng", "thread_rng"] } diff --git a/arrow-array/src/array/mod.rs b/arrow-array/src/array/mod.rs index 5ece2457c649..835b8499ea10 100644 --- a/arrow-array/src/array/mod.rs +++ b/arrow-array/src/array/mod.rs @@ -336,6 +336,75 @@ pub trait Array: std::fmt::Debug + Send + Sync { /// This value will always be greater than returned by `get_buffer_memory_size()` and /// includes the overhead of the data structures that contain the pointers to the various buffers. fn get_array_memory_size(&self) -> usize; + + /// Claim memory used by this array in the provided memory pool. + /// + /// This recursively claims memory for: + /// - All data buffers in this array + /// - All child arrays (for nested types like List, Struct, etc.) + /// - The null bitmap buffer if present + /// + /// This method guarantees that the memory pool will only compute occupied memory + /// exactly once. For example, if this array is derived from operations like `slice`, + /// calling `claim` on it would not change the memory pool's usage if the underlying buffers + /// are already counted before. + /// + /// # Example + /// ``` + /// # use arrow_array::{Int32Array, Array}; + /// # use arrow_buffer::TrackingMemoryPool; + /// # use arrow_buffer::MemoryPool; + /// + /// let pool = TrackingMemoryPool::default(); + /// + /// let small_array = Int32Array::from(vec![1, 2, 3, 4, 5]); + /// let small_array_size = small_array.get_buffer_memory_size(); + /// + /// // Claim the array's memory in the pool + /// small_array.claim(&pool); + /// + /// // Create and claim slices of `small_array`; should not increase memory usage + /// let slice1 = small_array.slice(0, 2); + /// let slice2 = small_array.slice(2, 2); + /// slice1.claim(&pool); + /// slice2.claim(&pool); + /// + /// assert_eq!(pool.used(), small_array_size); + /// + /// // Create a `large_array` which does not derive from the original `small_array` + /// + /// let large_array = Int32Array::from((0..1000).collect::>()); + /// let large_array_size = large_array.get_buffer_memory_size(); + /// + /// large_array.claim(&pool); + /// + /// // Trying to claim more than once is a no-op + /// large_array.claim(&pool); + /// large_array.claim(&pool); + /// + /// assert_eq!(pool.used(), small_array_size + large_array_size); + /// + /// let sum_of_all_sizes = small_array_size + large_array_size + slice1.get_buffer_memory_size() + slice2.get_buffer_memory_size(); + /// + /// // `get_buffer_memory_size` works independently of the memory pool, so a sum of all the + /// // arrays in scope will always be >= the memory used reported by the memory pool. + /// assert_ne!(pool.used(), sum_of_all_sizes); + /// + /// // Until the final claim is dropped the buffer size remains accounted for + /// drop(small_array); + /// drop(slice1); + /// + /// assert_eq!(pool.used(), small_array_size + large_array_size); + /// + /// // Dropping this finally releases the buffer that was backing `small_array` + /// drop(slice2); + /// + /// assert_eq!(pool.used(), large_array_size); + /// ``` + #[cfg(feature = "pool")] + fn claim(&self, pool: &dyn arrow_buffer::MemoryPool) { + self.to_data().claim(pool) + } } /// A reference-counted reference to a generic `Array` diff --git a/arrow-buffer/Cargo.toml b/arrow-buffer/Cargo.toml index 02ea49c37c46..1400c1986361 100644 --- a/arrow-buffer/Cargo.toml +++ b/arrow-buffer/Cargo.toml @@ -36,6 +36,7 @@ bench = false all-features = true [features] +# Enable memory tracking support pool = [] [dependencies] diff --git a/arrow-buffer/src/buffer/null.rs b/arrow-buffer/src/buffer/null.rs index e5e3a610ead2..ce5f4bfdbca3 100644 --- a/arrow-buffer/src/buffer/null.rs +++ b/arrow-buffer/src/buffer/null.rs @@ -221,6 +221,13 @@ impl NullBuffer { pub fn buffer(&self) -> &Buffer { self.buffer.inner() } + + /// Claim memory used by this null buffer in the provided memory pool. + #[cfg(feature = "pool")] + pub fn claim(&self, pool: &dyn crate::MemoryPool) { + // NullBuffer wraps a BooleanBuffer which wraps a Buffer + self.buffer.inner().claim(pool); + } } impl<'a> IntoIterator for &'a NullBuffer { diff --git a/arrow-data/Cargo.toml b/arrow-data/Cargo.toml index 9c7a5206b2f4..9f1b50ed14d9 100644 --- a/arrow-data/Cargo.toml +++ b/arrow-data/Cargo.toml @@ -39,6 +39,8 @@ bench = false force_validate = [] # Enable ffi support ffi = ["arrow-schema/ffi"] +# Enable memory tracking support +pool = ["arrow-buffer/pool"] [package.metadata.docs.rs] all-features = true diff --git a/arrow-data/src/data.rs b/arrow-data/src/data.rs index 329c12bbf425..df2c85db3678 100644 --- a/arrow-data/src/data.rs +++ b/arrow-data/src/data.rs @@ -1600,6 +1600,30 @@ impl ArrayData { pub fn into_builder(self) -> ArrayDataBuilder { self.into() } + + /// Claim memory used by this ArrayData in the provided memory pool. + /// + /// This claims memory for: + /// - All buffers in self.buffers + /// - All child ArrayData recursively + /// - The null buffer if present + #[cfg(feature = "pool")] + pub fn claim(&self, pool: &dyn arrow_buffer::MemoryPool) { + // Claim all data buffers + for buffer in &self.buffers { + buffer.claim(pool); + } + + // Claim null buffer if present + if let Some(nulls) = &self.nulls { + nulls.claim(pool); + } + + // Recursively claim child data + for child in &self.child_data { + child.claim(pool); + } + } } /// Return the expected [`DataTypeLayout`] Arrays of this data diff --git a/arrow/Cargo.toml b/arrow/Cargo.toml index 4200cd7a6c78..fa125a068ef9 100644 --- a/arrow/Cargo.toml +++ b/arrow/Cargo.toml @@ -81,6 +81,8 @@ force_validate = ["arrow-array/force_validate", "arrow-data/force_validate"] ffi = ["arrow-schema/ffi", "arrow-data/ffi", "arrow-array/ffi"] chrono-tz = ["arrow-array/chrono-tz"] canonical_extension_types = ["arrow-schema/canonical_extension_types"] +# Enable memory tracking support +pool = ["arrow-array/pool"] [dev-dependencies] chrono = { workspace = true }