Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 65 additions & 22 deletions rust/worker/src/execution/functions/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ enum StatisticsValue {
/// String metadata value associated with a record.
Str(String),
/// Sparse vector index observed in metadata.
SparseVector(u32),
SparseVector(u32, Option<String>),
}

impl StatisticsValue {
Expand All @@ -114,7 +114,7 @@ impl StatisticsValue {
Self::Int(_) => "int",
Self::Float(_) => "float",
Self::Str(_) => "str",
Self::SparseVector(_) => "sparse",
Self::SparseVector(_, _) => "sparse",
}
}

Expand All @@ -125,12 +125,12 @@ impl StatisticsValue {
Self::Int(_) => "i",
Self::Float(_) => "f",
Self::Str(_) => "s",
Self::SparseVector(_) => "sv",
Self::SparseVector(_, _) => "sv",
}
}

/// A stable representation of the statistics's value.
fn stable_value(&self) -> String {
fn stable_value_index(&self) -> String {
match self {
Self::Bool(b) => {
format!("{b}")
Expand All @@ -140,16 +140,27 @@ impl StatisticsValue {
}
Self::Str(s) => s.clone(),
Self::Float(f) => format!("{f:.16e}"),
Self::SparseVector(index) => {
Self::SparseVector(index, _) => {
format!("{index}")
}
}
}

/// A stable representation of the statistics's value.
fn stable_value_label(&self) -> Option<String> {
match self {
Self::Bool(_) => None,
Self::Int(_) => None,
Self::Str(_) => None,
Self::Float(_) => None,
Self::SparseVector(_, label) => label.clone(),
}
}

/// A stable string representation of a statistics value with type tag.
/// Separate so display repr can change.
fn stable_string(&self) -> String {
format!("{}:{}", self.type_prefix(), self.stable_value())
fn stable_value_string(&self) -> String {
format!("{}:{}", self.type_prefix(), self.stable_value_index())
}

/// Convert MetadataValue to a vector of StatisticsValue.
Expand All @@ -160,18 +171,31 @@ impl StatisticsValue {
MetadataValue::Int(i) => vec![StatisticsValue::Int(*i)],
MetadataValue::Float(f) => vec![StatisticsValue::Float(*f)],
MetadataValue::Str(s) => vec![StatisticsValue::Str(s.clone())],
MetadataValue::SparseVector(sparse) => sparse
.indices
.iter()
.map(|index| StatisticsValue::SparseVector(*index))
.collect(),
MetadataValue::SparseVector(sparse) => {
if let Some(tokens) = sparse.tokens.as_ref() {
sparse
.indices
.iter()
.zip(tokens.iter())
.map(|(index, token)| {
StatisticsValue::SparseVector(*index, Some(token.clone()))
})
.collect()
} else {
sparse
.indices
.iter()
.map(|index| StatisticsValue::SparseVector(*index, None))
.collect()
}
}
}
}
}

impl std::fmt::Display for StatisticsValue {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.stable_string())
write!(f, "{}", self.stable_value_string())
}
}

Expand All @@ -182,7 +206,9 @@ impl PartialEq for StatisticsValue {
(Self::Int(lhs), Self::Int(rhs)) => lhs == rhs,
(Self::Float(lhs), Self::Float(rhs)) => lhs.to_bits() == rhs.to_bits(),
(Self::Str(lhs), Self::Str(rhs)) => lhs == rhs,
(Self::SparseVector(lhs), Self::SparseVector(rhs)) => lhs == rhs,
(Self::SparseVector(lhs1, lhs2), Self::SparseVector(rhs1, rhs2)) => {
lhs1 == rhs1 && lhs2 == rhs2
}
Comment on lines +209 to +211
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[BestPractice]

For consistency with the other match arms in this PartialEq implementation, you can remove the braces and use a single expression with a trailing comma.

Context for Agents
For consistency with the other match arms in this `PartialEq` implementation, you can remove the braces and use a single expression with a trailing comma.

File: rust/worker/src/execution/functions/statistics.rs
Line: 175

_ => false,
}
}
Expand All @@ -198,7 +224,10 @@ impl Hash for StatisticsValue {
StatisticsValue::Int(value) => value.hash(state),
StatisticsValue::Float(value) => value.to_bits().hash(state),
StatisticsValue::Str(value) => value.hash(state),
StatisticsValue::SparseVector(value) => value.hash(state),
StatisticsValue::SparseVector(value, label) => {
value.hash(state);
label.hash(state);
}
}
}
}
Expand Down Expand Up @@ -280,7 +309,13 @@ impl StatisticsFunctionExecutor {
},
"str" => StatisticsValue::Str(value_str.to_string()),
"sparse" => match value_str.parse::<u32>() {
Ok(index) => StatisticsValue::SparseVector(index),
Ok(index) => {
let label = match metadata.get("value_label") {
Some(MetadataValue::Str(v)) => Some(v.clone()),
_ => None,
};
StatisticsValue::SparseVector(index, label)
}
_ => continue,
},
_ => continue,
Expand Down Expand Up @@ -392,9 +427,10 @@ impl AttachedFunctionExecutor for StatisticsFunctionExecutor {
if !count.is_changed() {
continue;
}
let stable_value = stats_value.stable_value();
let stable_string = stats_value.stable_string();
let record_id = format!("{key}::{stable_string}");
let stable_value_index = stats_value.stable_value_index();
let stable_value_string = stats_value.stable_value_string();
let record_id = format!("{key}::{stable_value_string}");
let document = format!("statistics about {key} for {stable_value_string}");

if key != SUMMARY_KEY && count.is_empty() {
records.push(LogRecord {
Expand All @@ -411,16 +447,23 @@ impl AttachedFunctionExecutor for StatisticsFunctionExecutor {
continue;
}

let document = format!("statistics about {key} for {stable_string}");

let mut metadata = HashMap::with_capacity(4);
metadata.insert("count".to_string(), count.output());
metadata.insert("key".to_string(), UpdateMetadataValue::Str(key.clone()));
metadata.insert(
"type".to_string(),
UpdateMetadataValue::Str(stats_value.stable_type().to_string()),
);
metadata.insert("value".to_string(), UpdateMetadataValue::Str(stable_value));
metadata.insert(
"value".to_string(),
UpdateMetadataValue::Str(stable_value_index),
);
if let Some(stable_value_label) = stats_value.stable_value_label() {
metadata.insert(
"value_label".to_string(),
UpdateMetadataValue::Str(stable_value_label),
);
}

records.push(LogRecord {
log_offset: 0,
Expand Down
Loading