feat(consumer): Add memory-based batching limit to Rust consumer #7854
Merged
Conversation
Add a new CLI option to limit batch size by total encoded bytes instead of message count. This prevents Kubernetes OOMs caused by large messages accumulating during backlogs, where the existing message-count-based limit is insufficient because message sizes vary widely. When --max-batch-size-bytes is set, the Reduce step uses byte-based flushing. When unset, existing message-count behavior is preserved. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Use a type alias for the batch size function pointer in the RowBinary Reduce path to satisfy clippy's type_complexity lint. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
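As a rough illustration of the clippy workaround described above (all names here are simplified stand-ins, not the actual Snuba types), a type alias names the function-pointer shape once so the `Reduce` wiring doesn't trip `clippy::type_complexity`:

```rust
// Hypothetical, simplified stand-in for BytesInsertBatch (the real type is
// generic and lives in rust_snuba).
pub struct BytesInsertBatch {
    pub rows: Vec<Vec<u8>>,
    pub num_bytes: usize,
}

// Without the alias, clippy::type_complexity fires on nested signatures like
// `fn(fn(&BytesInsertBatch) -> usize, ...)`. The alias names the shape once.
pub type BatchSizeFn = fn(&BytesInsertBatch) -> usize;

// Row-based sizing: one unit per message.
pub fn size_by_rows(batch: &BytesInsertBatch) -> usize {
    batch.rows.len()
}

// Byte-based sizing: total encoded payload size.
pub fn size_by_bytes(batch: &BytesInsertBatch) -> usize {
    batch.num_bytes
}
```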
…lation enum
Instead of a separate --max-batch-size-bytes parameter, use a
--max-batch-size-calculation flag ("rows" or "bytes") that controls how
--max-batch-size is interpreted. This avoids ambiguity when both
parameters could be specified.
On the Rust side, max_batch_size_calculation is a proper serde enum
(BatchSizeCalculation) so invalid values fail at config deserialization
time.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
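A std-only sketch of what such an enum might look like. The real code derives serde's `Deserialize` with `#[serde(rename_all = "lowercase")]` so invalid values fail at config deserialization time; this dependency-free version models the same behavior with a hand-rolled `FromStr`, and the `Default` impl mirrors the later backwards-compatibility commit:

```rust
use std::str::FromStr;

// Sketch of the config enum. The real implementation derives serde's
// Deserialize with #[serde(rename_all = "lowercase")], so "rows"/"bytes" map
// to these variants and anything else is rejected when the config is parsed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BatchSizeCalculation {
    Rows,
    Bytes,
}

// Default to Rows so configs written before this flag existed keep working.
impl Default for BatchSizeCalculation {
    fn default() -> Self {
        BatchSizeCalculation::Rows
    }
}

impl FromStr for BatchSizeCalculation {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "rows" => Ok(BatchSizeCalculation::Rows),
            "bytes" => Ok(BatchSizeCalculation::Bytes),
            other => Err(format!("invalid batch size calculation: {other}")),
        }
    }
}
```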
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Autofix Details
Bugbot Autofix prepared a fix for the issue found in the latest run.
- ✅ Fixed: RowBinary path never sets `num_bytes`, breaking byte-based batching
- In `make_rust_processor_row_binary`, the per-message `BytesInsertBatch<Vec<T>>` now calls `.with_num_bytes(...)` using the summed `item_type_metrics.bytes_processed`, so byte-based Reduce batching can flush correctly.
Or push these changes by commenting:
@cursor push 810e8ee99a
Preview (810e8ee99a)
diff --git a/rust_snuba/src/strategies/processor.rs b/rust_snuba/src/strategies/processor.rs
--- a/rust_snuba/src/strategies/processor.rs
+++ b/rust_snuba/src/strategies/processor.rs
@@ -213,7 +213,14 @@
}
}
+ let num_bytes = transformed
+ .item_type_metrics
+ .as_ref()
+ .map(|metrics| metrics.bytes_processed.values().sum())
+ .unwrap_or(0);
+
let mut payload = BytesInsertBatch::from_rows(transformed.rows)
+ .with_num_bytes(num_bytes)
.with_message_timestamp(timestamp)
.with_commit_log_offsets(CommitLogOffsets(BTreeMap::from([(
partition.index,
yeah, it's not ready yet because we really should add it to the RowBinary writer (as the bot comment[s] said)
Add EstimatedSize trait and implement it for EAPItemRow so that byte-based batch size calculation works for the RowBinary pipeline, not just JSONEachRow. The estimate accounts for heap allocations in all attribute Vec fields. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
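A minimal sketch of the idea behind such a trait (the field names and arithmetic are assumptions for illustration; the real `EAPItemRow` has many more attribute vectors): estimate a row's in-memory footprint as its stack size plus the heap capacity behind each `Vec` field.

```rust
use std::mem;

// Hypothetical trait mirroring the one described in the commit: a cheap,
// approximate in-memory size used for byte-based batch accounting.
pub trait EstimatedSize {
    fn estimated_size(&self) -> usize;
}

// Stand-in for EAPItemRow. The real struct has more attribute Vec fields,
// but the accounting pattern is the same for each of them.
pub struct ItemRow {
    pub attr_keys: Vec<String>,
    pub attr_values: Vec<f64>,
}

impl EstimatedSize for ItemRow {
    fn estimated_size(&self) -> usize {
        // Heap behind attr_keys: the Vec's String slots plus each String's
        // own byte buffer.
        let key_heap: usize = self.attr_keys.capacity() * mem::size_of::<String>()
            + self.attr_keys.iter().map(|s| s.capacity()).sum::<usize>();
        // Heap behind attr_values: capacity, not len, since that memory is
        // allocated either way.
        let value_heap = self.attr_values.capacity() * mem::size_of::<f64>();
        mem::size_of::<Self>() + key_heap + value_heap
    }
}
```

Using `capacity()` rather than `len()` keeps the estimate honest about what is actually allocated, at the cost of slightly overcounting for rows with spare capacity.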
Cursor Bugbot has reviewed your changes and found 1 potential issue.
…ulation
Four tests that exercise the Reduce step with both BatchSizeCalculation modes, verifying that byte-based batching flushes earlier than row-based when messages are large, and that row-based batching ignores byte size. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
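The property those tests check can be distilled into a std-only model (the real tests drive Arroyo's Reduce step; here the flush decision is modeled directly, and every name is illustrative):

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BatchSizeCalculation {
    Rows,
    Bytes,
}

// Model of the accumulator state the Reduce step carries between messages.
pub struct BatchState {
    pub rows: usize,
    pub bytes: usize,
}

// Check the accumulated state against max_batch_size under the configured
// interpretation of that limit.
pub fn should_flush(state: &BatchState, mode: BatchSizeCalculation, max_batch_size: usize) -> bool {
    match mode {
        BatchSizeCalculation::Rows => state.rows >= max_batch_size,
        BatchSizeCalculation::Bytes => state.bytes >= max_batch_size,
    }
}

// Feed uniform messages of `msg_bytes` each and return how many accumulate
// before the first flush. (Assumes msg_bytes > 0 in Bytes mode.)
pub fn messages_until_flush(mode: BatchSizeCalculation, max_batch_size: usize, msg_bytes: usize) -> usize {
    let mut state = BatchState { rows: 0, bytes: 0 };
    loop {
        state.rows += 1;
        state.bytes += msg_bytes;
        if should_flush(&state, mode, max_batch_size) {
            return state.rows;
        }
    }
}
```

With 1 MB messages, a 10 MB byte limit flushes after 10 messages while a 100-row limit waits for 100, which is the "byte-based flushes earlier when messages are large" behavior the tests assert.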
Existing config JSON without the new field would fail deserialization. Default to Rows to preserve backwards compatibility. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>


Add a `--max-batch-size-calculation` CLI flag to the Rust consumer that controls how `--max-batch-size` is interpreted: either as a message count (`rows`, the default) or as total encoded byte size (`bytes`).

The existing `--max-batch-size` parameter limits batches by number of messages, but this is insufficient to prevent Kubernetes OOMs during backlogs because message sizes can be unpredictable (particularly in eap_items across item types).

With `--max-batch-size-calculation=bytes`, operators can set `--max-batch-size` to a byte threshold (e.g. 100000000 for ~100MB) and the `Reduce` accumulator will flush when the total encoded byte size reaches that limit. Using a single flag instead of a separate `--max-batch-size-bytes` parameter avoids ambiguity about which limit applies.

Changes:
- `--max-batch-size-calculation` click option (rows/bytes) on `rust_consumer`, wired through `ConsumerConfig`
- `BatchSizeCalculation` enum with `#[serde(rename_all = "lowercase")]`, deserialized from the JSON config
- `BytesInsertBatch` tracks `num_bytes` (set from `encoded_rows.len()` at creation, summed on merge)
- `Reduce` steps use byte-based `compute_batch_size` when `Bytes` is configured

Validation:
Next steps:
- `--max-batch-size-calculation` on a few s4s2 and de consumers to a reasonable value of that metric

https://linear.app/getsentry/issue/EAP-460/add-max-batch-size-bytes-to-consumer
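The "set at creation, summed on merge" bookkeeping from the Changes list can be sketched std-only (this `BytesInsertBatch` is a toy stand-in; the real generic type and its builder methods live in rust_snuba):

```rust
// Toy stand-in for BytesInsertBatch: num_bytes is derived from the encoded
// row sizes at creation and summed when batches are merged in Reduce.
pub struct BytesInsertBatch {
    pub encoded_rows: Vec<Vec<u8>>,
    pub num_bytes: usize,
}

impl BytesInsertBatch {
    pub fn from_rows(encoded_rows: Vec<Vec<u8>>) -> Self {
        let num_bytes = encoded_rows.iter().map(|r| r.len()).sum();
        BytesInsertBatch { encoded_rows, num_bytes }
    }

    // Merge two accumulated batches, summing their byte counts.
    pub fn merge(mut self, other: BytesInsertBatch) -> Self {
        self.encoded_rows.extend(other.encoded_rows);
        self.num_bytes += other.num_bytes;
        self
    }
}

// With --max-batch-size-calculation=bytes, the accumulator flushes once the
// summed num_bytes reaches --max-batch-size.
pub fn reached_limit(batch: &BytesInsertBatch, max_batch_size: usize) -> bool {
    batch.num_bytes >= max_batch_size
}
```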