Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions arrow-array/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ all-features = true
[features]
ffi = ["arrow-schema/ffi", "arrow-data/ffi"]
force_validate = []
# Enable memory tracking support
pool = ["arrow-buffer/pool", "arrow-data/pool"]

[dev-dependencies]
rand = { version = "0.9", default-features = false, features = ["std", "std_rng", "thread_rng"] }
Expand Down
69 changes: 69 additions & 0 deletions arrow-array/src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,75 @@ pub trait Array: std::fmt::Debug + Send + Sync {
/// This value will always be greater than returned by `get_buffer_memory_size()` and
/// includes the overhead of the data structures that contain the pointers to the various buffers.
fn get_array_memory_size(&self) -> usize;

/// Claim memory used by this array in the provided memory pool.
///
/// This recursively claims memory for:
/// - All data buffers in this array
/// - All child arrays (for nested types like List, Struct, etc.)
/// - The null bitmap buffer if present
///
/// This method guarantees that the memory pool will only compute occupied memory
/// exactly once. For example, if this array is derived from operations like `slice`,
/// calling `claim` on it would not change the memory pool's usage if the underlying buffers
/// are already counted before.
///
/// # Example
/// ```
/// # use arrow_array::{Int32Array, Array};
/// # use arrow_buffer::TrackingMemoryPool;
/// # use arrow_buffer::MemoryPool;
///
/// let pool = TrackingMemoryPool::default();
///
/// let small_array = Int32Array::from(vec![1, 2, 3, 4, 5]);
/// let small_array_size = small_array.get_buffer_memory_size();
///
/// // Claim the array's memory in the pool
/// small_array.claim(&pool);
///
/// // Create and claim slices of `small_array`; should not increase memory usage
/// let slice1 = small_array.slice(0, 2);
/// let slice2 = small_array.slice(2, 2);
/// slice1.claim(&pool);
/// slice2.claim(&pool);
///
/// assert_eq!(pool.used(), small_array_size);
///
/// // Create a `large_array` which does not derive from the original `small_array`
///
/// let large_array = Int32Array::from((0..1000).collect::<Vec<i32>>());
/// let large_array_size = large_array.get_buffer_memory_size();
///
/// large_array.claim(&pool);
///
/// // Trying to claim more than once is a no-op
/// large_array.claim(&pool);
/// large_array.claim(&pool);
///
/// assert_eq!(pool.used(), small_array_size + large_array_size);
///
/// let sum_of_all_sizes = small_array_size + large_array_size + slice1.get_buffer_memory_size() + slice2.get_buffer_memory_size();
///
/// // `get_buffer_memory_size` works independently of the memory pool, so a sum of all the
/// // arrays in scope will always be >= the memory used reported by the memory pool.
/// assert_ne!(pool.used(), sum_of_all_sizes);
///
/// // Until the final claim is dropped the buffer size remains accounted for
/// drop(small_array);
/// drop(slice1);
///
/// assert_eq!(pool.used(), small_array_size + large_array_size);
///
/// // Dropping this finally releases the buffer that was backing `small_array`
/// drop(slice2);
///
/// assert_eq!(pool.used(), large_array_size);
/// ```
#[cfg(feature = "pool")]
fn claim(&self, pool: &dyn arrow_buffer::MemoryPool) {
self.to_data().claim(pool)
}
}

/// A reference-counted reference to a generic `Array`
Expand Down
1 change: 1 addition & 0 deletions arrow-buffer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ bench = false
all-features = true

[features]
# Enable memory tracking support
pool = []

[dependencies]
Expand Down
7 changes: 7 additions & 0 deletions arrow-buffer/src/buffer/null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,13 @@ impl NullBuffer {
pub fn buffer(&self) -> &Buffer {
self.buffer.inner()
}

/// Claim memory used by this null buffer in the provided memory pool.
#[cfg(feature = "pool")]
pub fn claim(&self, pool: &dyn crate::MemoryPool) {
// NullBuffer wraps a BooleanBuffer which wraps a Buffer
self.buffer.inner().claim(pool);
}
}

impl<'a> IntoIterator for &'a NullBuffer {
Expand Down
2 changes: 2 additions & 0 deletions arrow-data/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ bench = false
force_validate = []
# Enable ffi support
ffi = ["arrow-schema/ffi"]
# Enable memory tracking support
pool = ["arrow-buffer/pool"]

[package.metadata.docs.rs]
all-features = true
Expand Down
24 changes: 24 additions & 0 deletions arrow-data/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1600,6 +1600,30 @@ impl ArrayData {
pub fn into_builder(self) -> ArrayDataBuilder {
self.into()
}

/// Claim memory used by this ArrayData in the provided memory pool.
///
/// This claims memory for:
/// - All buffers in self.buffers
/// - All child ArrayData recursively
/// - The null buffer if present
#[cfg(feature = "pool")]
pub fn claim(&self, pool: &dyn arrow_buffer::MemoryPool) {
// Claim all data buffers
for buffer in &self.buffers {
buffer.claim(pool);
}

// Claim null buffer if present
if let Some(nulls) = &self.nulls {
nulls.claim(pool);
}

// Recursively claim child data
for child in &self.child_data {
child.claim(pool);
}
}
}

/// Return the expected [`DataTypeLayout`] Arrays of this data
Expand Down
2 changes: 2 additions & 0 deletions arrow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ force_validate = ["arrow-array/force_validate", "arrow-data/force_validate"]
ffi = ["arrow-schema/ffi", "arrow-data/ffi", "arrow-array/ffi"]
chrono-tz = ["arrow-array/chrono-tz"]
canonical_extension_types = ["arrow-schema/canonical_extension_types"]
# Enable memory tracking support
pool = ["arrow-array/pool"]

[dev-dependencies]
chrono = { workspace = true }
Expand Down
Loading