Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 106 additions & 3 deletions crates/store/re_chunk/src/iter.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::sync::Arc;
use std::{
borrow::{Borrow as _, Cow},
sync::Arc,
};

use arrow::{
array::{
Expand Down Expand Up @@ -743,6 +746,106 @@ impl ChunkComponentSlicer for bool {
}
}

/// Generic component slicer that casts to a primitive type.
///
/// In the happy path (when the array is already the target type), this performs zero-copy slicing.
/// When casting is required, it allocates and owns the casted array.
///
/// This is a type-level tag: it carries no data and is only meant to be used as
/// the type parameter selecting a slicing strategy (see its
/// `ChunkComponentSlicer` implementation). `P` is the arrow primitive type to
/// cast to, `T` its native Rust representation.
pub struct CastToPrimitive<P, T>
where
    P: arrow::array::ArrowPrimitiveType<Native = T>,
    T: arrow::datatypes::ArrowNativeType,
{
    // Carries the `P`/`T` type parameters without storing any runtime data;
    // the private field also keeps the type from being constructed externally.
    _phantom: std::marker::PhantomData<(P, T)>,
}

/// Iterator that owns the array values and component spans.
///
/// This is necessary when we need to cast the array, as the casted array
/// must be owned by the iterator rather than borrowed from the caller.
struct OwnedSliceIterator<'a, T, I>
where
    T: arrow::datatypes::ArrowNativeType,
    I: Iterator<Item = Span<usize>>,
{
    // The underlying scalar values: `Cow::Borrowed` on the zero-copy happy
    // path, `Cow::Owned` when a cast had to allocate a new buffer.
    values: Cow<'a, arrow::buffer::ScalarBuffer<T>>,
    // Spans delimiting each component instance's values within `values`.
    component_spans: I,
}

impl<'a, T, I> Iterator for OwnedSliceIterator<'a, T, I>
where
    T: arrow::datatypes::ArrowNativeType + Clone,
    I: Iterator<Item = Span<usize>>,
{
    type Item = Cow<'a, [T]>;

    /// Yields one slice of values per component span.
    ///
    /// Borrowed buffers hand out zero-copy sub-slices; owned (casted) buffers
    /// must copy each span out, because the iterator cannot lend `'a`-lived
    /// references into data it owns itself.
    fn next(&mut self) -> Option<Self::Item> {
        let range = self.component_spans.next()?.range();
        let slice = match &self.values {
            Cow::Borrowed(buffer) => Cow::Borrowed(&buffer[range]),
            // TODO(grtlr): This copy here makes me sad, but I don't see a way around it.
            Cow::Owned(buffer) => Cow::Owned(buffer[range].to_vec()),
        };
        Some(slice)
    }
}

/// Reports a failed arrow cast for `component`.
///
/// In debug builds this panics so the failure is surfaced during development;
/// in release builds it logs the error once and the data is silently dropped
/// by the caller.
fn error_on_cast_failure(
    component: ComponentIdentifier,
    target: &arrow::datatypes::DataType,
    actual: &arrow::datatypes::DataType,
    error: &arrow::error::ArrowError,
) {
    if !cfg!(debug_assertions) {
        re_log::error_once!(
            "cast from {actual:?} to {target:?} failed for {component}: {error}. Data discarded"
        );
    } else {
        panic!(
            "[DEBUG ASSERT] cast from {actual:?} to {target:?} failed for {component}: {error}. Data discarded"
        );
    }
}

impl<P, T> ChunkComponentSlicer for CastToPrimitive<P, T>
where
    P: arrow::array::ArrowPrimitiveType<Native = T>,
    T: arrow::datatypes::ArrowNativeType + Clone,
{
    type Item<'a> = Cow<'a, [T]>;

    /// Slices `array` into per-component runs of `T`.
    ///
    /// Happy path: if `array` is already a `PrimitiveArray<P>`, the returned
    /// slices borrow directly from it (zero-copy). Otherwise the array is
    /// casted via `arrow::compute::cast`, which allocates; the casted buffer is
    /// then owned by the returned iterator and each span is copied out of it.
    ///
    /// On cast or downcast failure the error is reported (panic in debug
    /// builds, logged once in release) and an empty iterator is returned.
    fn slice<'a>(
        component: ComponentIdentifier,
        array: &'a dyn arrow::array::Array,
        component_spans: impl Iterator<Item = Span<usize>> + 'a,
    ) -> impl Iterator<Item = Self::Item<'a>> + 'a {
        // We first try to down cast (happy path - zero copy).
        if let Some(values) = array.downcast_array_ref::<arrow::array::PrimitiveArray<P>>() {
            return Either::Right(OwnedSliceIterator {
                values: Cow::Borrowed(values.values()),
                component_spans,
            });
        }

        // Then we try to perform a primitive cast (requires ownership).
        let casted = match arrow::compute::cast(array, &P::DATA_TYPE) {
            Ok(casted) => casted,
            Err(err) => {
                error_on_cast_failure(component, &P::DATA_TYPE, array.data_type(), &err);
                return Either::Left(std::iter::empty());
            }
        };

        // A successful cast to `P::DATA_TYPE` should always downcast to
        // `PrimitiveArray<P>` — this branch is defensive. Report the *actual*
        // target type name instead of a hardcoded (and wrong) placeholder.
        let Some(values) = casted.downcast_array_ref::<arrow::array::PrimitiveArray<P>>() else {
            error_on_downcast_failure(
                component,
                std::any::type_name::<arrow::array::PrimitiveArray<P>>(),
                array.data_type(),
            );
            return Either::Left(std::iter::empty());
        };

        // `ScalarBuffer::clone` is cheap (the underlying buffer is ref-counted)
        // and keeps the casted values alive after `casted` is dropped here.
        Either::Right(OwnedSliceIterator {
            values: Cow::Owned(values.values().clone()),
            component_spans,
        })
    }
}

// ---

pub struct ChunkIndicesIter {
Expand Down Expand Up @@ -945,11 +1048,11 @@ fn error_on_downcast_failure(
) {
if cfg!(debug_assertions) {
panic!(
"[DEBUG ASSERT] downcast failed to {target} failed for {component}. Array data type was {actual:?}. Data discarded"
"[DEBUG ASSERT] downcast to {target} failed for {component}. Array data type was {actual:?}. Data discarded"
);
} else {
re_log::error_once!(
"downcast failed to {target} for {component}. Array data type was {actual:?}. data discarded"
"downcast to {target} failed for {component}. Array data type was {actual:?}. Data discarded"
);
}
}
Expand Down
7 changes: 6 additions & 1 deletion crates/store/re_chunk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,12 @@ pub use self::chunk::{
};
pub use self::helpers::{ChunkShared, UnitChunkShared};
pub use self::iter::{
ChunkComponentIter, ChunkComponentIterItem, ChunkComponentSlicer, ChunkIndicesIter,
// TODO: Right place? Maybe `re_view`?
CastToPrimitive,
ChunkComponentIter,
ChunkComponentIterItem,
ChunkComponentSlicer,
ChunkIndicesIter,
};
pub use self::latest_at::LatestAtQuery;
pub use self::range::{RangeQuery, RangeQueryOptions};
Expand Down
30 changes: 30 additions & 0 deletions crates/store/re_types/src/dynamic_archetype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,36 @@ impl DynamicArchetype {
self
}

// TODO: hackity — find this a proper home.
/// Adds a field of arbitrary data to this archetype, tagged with an explicit component type.
///
/// In many cases, it might be more convenient to use [`Self::with_component`] to log an existing Rerun component instead.
#[inline]
pub fn with_component_from_data_with_type(
    mut self,
    field: impl AsRef<str>,
    component_type: ComponentType,
    array: arrow::array::ArrayRef,
) -> Self {
    let field = field.as_ref();
    let component = field.into();

    // Build the descriptor up front: a partial descriptor for the field,
    // optionally scoped to this archetype, tagged with the given type.
    let descriptor = {
        let partial = ComponentDescriptor::partial(component);
        let scoped = match self.archetype_name {
            Some(archetype_name) => partial.with_builtin_archetype(archetype_name),
            None => partial,
        };
        scoped.with_component_type(component_type)
    };

    self.batches
        .insert(component, SerializedComponentBatch { array, descriptor });
    self
}

/// Adds an existing Rerun [`Component`] to this archetype.
#[inline]
pub fn with_component<C: Component>(
Expand Down
23 changes: 17 additions & 6 deletions crates/viewer/re_view_time_series/src/series_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
use itertools::Itertools as _;

use re_chunk_store::RangeQuery;
use re_chunk_store::external::re_chunk::CastToPrimitive;
use re_log_types::{EntityPath, TimeInt};
use re_types::external::arrow;
use re_types::external::arrow::datatypes::DataType as ArrowDatatype;
use re_types::{ComponentDescriptor, ComponentIdentifier, Loggable as _, RowId, components};
use re_view::{ChunksWithComponent, HybridRangeResults, RangeResultsExt as _, clamped_or_nothing};
Expand All @@ -22,8 +24,10 @@ pub fn determine_num_series(all_scalar_chunks: &ChunksWithComponent<'_>) -> usiz
.iter()
.find_map(|chunk| {
chunk
.iter_slices::<f64>()
.find_map(|slice| (!slice.is_empty()).then_some(slice.len()))
// TODO(grtlr): The comment above is even more important when we do casting (allocations) here.
// TODO(grtlr): Unless we're on the happy path this allocates and happens everytime the visualizer runs!
.iter_slices::<CastToPrimitive<arrow::datatypes::Float64Type, f64>>()
.find_map(|values| (!values.is_empty()).then_some(values.len()))
})
.unwrap_or(1)
}
Expand Down Expand Up @@ -95,7 +99,10 @@ pub fn collect_scalars(
let points = &mut *points_per_series[0];
all_scalar_chunks
.iter()
.flat_map(|chunk| chunk.iter_slices::<f64>())
.flat_map(|chunk| {
// TODO(grtlr): Unless we're on the happy path this allocates and happens everytime the visualizer runs!
chunk.iter_slices::<CastToPrimitive<arrow::datatypes::Float64Type, f64>>()
})
.enumerate()
.for_each(|(i, values)| {
if let Some(value) = values.first() {
Expand All @@ -107,13 +114,17 @@ pub fn collect_scalars(
} else {
all_scalar_chunks
.iter()
.flat_map(|chunk| chunk.iter_slices::<f64>())
.flat_map(|chunk| {
// TODO(grtlr): Unless we're on the happy path this allocates and happens everytime the visualizer runs!
chunk.iter_slices::<CastToPrimitive<arrow::datatypes::Float64Type, f64>>()
})
.enumerate()
.for_each(|(i, values)| {
for (points, value) in points_per_series.iter_mut().zip(values) {
let values_slice = values.as_ref();
for (points, value) in points_per_series.iter_mut().zip(values_slice) {
points[i].value = *value;
}
for points in points_per_series.iter_mut().skip(values.len()) {
for points in points_per_series.iter_mut().skip(values_slice.len()) {
points[i].attrs.kind = PlotSeriesKind::Clear;
}
});
Expand Down
68 changes: 68 additions & 0 deletions crates/viewer/re_view_time_series/tests/blueprint.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
use std::sync::Arc;

use re_chunk_store::RowId;
use re_log_types::{EntityPath, TimePoint};
use re_test_context::TestContext;
use re_test_viewport::TestContextExt as _;
use re_types::{
Archetype as _, DynamicArchetype,
archetypes::{self, Scalars},
blueprint, components,
external::arrow::array::{Float32Array, Int64Array},
};
use re_view_time_series::TimeSeriesView;
use re_viewer_context::{BlueprintContext as _, TimeControlCommand, ViewClass as _, ViewId};
Expand Down Expand Up @@ -70,3 +74,67 @@ fn setup_blueprint(test_context: &mut TestContext) -> ViewId {
blueprint.add_view_at_root(view)
})
}

// TODO: Move this test to a better place.
/// Plots three entities on a time-series view: a tagged `f64` scalar, an
/// untagged `f32` component, and an untagged `i64` component — exercising the
/// cast-to-primitive path of the scalar slicer.
#[test]
pub fn test_blueprint_f64_with_time_series() {
    let mut test_context = TestContext::new_with_view_class::<TimeSeriesView>();

    let timeline = re_log_types::Timeline::log_tick();

    for i in 0..32 {
        let timepoint = TimePoint::from([(timeline, i)]);
        let t = i as f64 / 8.0;
        test_context.log_entity("plots/sin", |builder| {
            builder.with_archetype(RowId::new(), timepoint.clone(), &Scalars::single(t.sin()))
        });
        test_context.log_entity("plots/cos", |builder| {
            builder.with_archetype(
                RowId::new(),
                timepoint.clone(),
                // an untagged component
                &DynamicArchetype::new(Scalars::name()).with_component_from_data(
                    "scalars",
                    Arc::new(Float32Array::from(vec![t.cos() as f32])),
                ),
            )
        });
        test_context.log_entity("plots/line", |builder| {
            builder.with_archetype(
                RowId::new(),
                timepoint,
                // an untagged component
                &DynamicArchetype::new(Scalars::name()).with_component_from_data(
                    "scalars",
                    // Something that stays in the same domain as a sine wave.
                    Arc::new(Int64Array::from(vec![(i % 2) * 2 - 1])),
                ),
            )
        });
    }

    test_context.send_time_commands(
        test_context.active_store_id(),
        [TimeControlCommand::SetActiveTimeline(*timeline.name())],
    );

    let view_id = setup_descriptor_override_blueprint(&mut test_context);
    test_context.run_view_ui_and_save_snapshot(
        view_id,
        "blueprint_f64_with_time_series",
        egui::vec2(300.0, 300.0),
        None,
    );
}

/// Sets up a viewport blueprint containing a single time-series view rooted
/// at the wildcard path, and returns that view's id.
fn setup_descriptor_override_blueprint(test_context: &mut TestContext) -> ViewId {
    test_context.setup_viewport_blueprint(|_ctx, blueprint| {
        blueprint.add_view_at_root(ViewBlueprint::new_with_root_wildcard(
            TimeSeriesView::identifier(),
        ))
    })
}
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading