
feat(consumer): Add memory-based batching limit to Rust consumer#7854

Merged
onewland merged 7 commits into master from feat/consumer-max-batch-size-bytes on Apr 1, 2026

Conversation

Contributor

@onewland onewland commented Mar 30, 2026

Add a --max-batch-size-calculation CLI flag to the Rust consumer that controls how --max-batch-size is interpreted: either as a message count (rows, the default) or as total encoded byte size (bytes).

The existing --max-batch-size parameter limits batches by number of messages, but this is insufficient to prevent Kubernetes OOMs during backlogs because message sizes can be unpredictable (particularly in eap_items across item types).

With --max-batch-size-calculation=bytes, operators can set --max-batch-size to a byte threshold (e.g. 100000000 for ~100MB) and the Reduce accumulator will flush when the total encoded byte size reaches that limit. Using a single flag instead of a separate --max-batch-size-bytes parameter avoids ambiguity about which limit applies.

Changes:

  • Python CLI/config: New --max-batch-size-calculation click option (rows/bytes) on rust_consumer, wired through ConsumerConfig
  • Rust config: BatchSizeCalculation enum with #[serde(rename_all = "lowercase")], deserialized from the JSON config
  • Rust types: BytesInsertBatch tracks num_bytes (set from encoded_rows.len() at creation, summed on merge)
  • Rust factory: Both JSONEachRow and RowBinary Reduce steps use byte-based compute_batch_size when Bytes is configured
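The flush decision described above can be sketched as follows. This is a simplified, illustrative sketch: the real code wires the counters through BytesInsertBatch and the Reduce step, and the Accumulator type here is invented for the example.

```rust
// Simplified sketch of the rows-vs-bytes flush decision; types here are
// illustrative, not the actual rust_snuba implementation.
#[derive(Clone, Copy, PartialEq)]
enum BatchSizeCalculation {
    Rows,  // interpret --max-batch-size as a message count (the default)
    Bytes, // interpret --max-batch-size as total encoded byte size
}

#[derive(Default)]
struct Accumulator {
    num_rows: usize,
    num_bytes: usize,
}

impl Accumulator {
    // Mirrors "set from encoded_rows.len() at creation, summed on merge".
    fn merge(&mut self, encoded_row_len: usize) {
        self.num_rows += 1;
        self.num_bytes += encoded_row_len;
    }

    // A single shared limit; the calculation mode picks which counter it bounds.
    fn should_flush(&self, max_batch_size: usize, calc: BatchSizeCalculation) -> bool {
        match calc {
            BatchSizeCalculation::Rows => self.num_rows >= max_batch_size,
            BatchSizeCalculation::Bytes => self.num_bytes >= max_batch_size,
        }
    }
}
```

Using one mode flag on the existing --max-batch-size value (instead of a second independent limit) means exactly one counter is ever compared against the limit, which is the ambiguity the description calls out.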

Validation:

  • tested locally that ~10KB messages batch correctly with either a 50KB byte limit or a 10-row limit.

Next steps:

  • emit a metric for the batch sizes we currently accumulate
  • try setting --max-batch-size-calculation=bytes on a few s4s2 and de consumers, with --max-batch-size set to a reasonable value of that metric
  • roll this out for all consumers in all regions? it seems safer, but it's also a tedious process; maybe eap + outcomes is good enough

https://linear.app/getsentry/issue/EAP-460/add-max-batch-size-bytes-to-consumer

onewland and others added 4 commits March 30, 2026 15:19
Add a new CLI option to limit batch size by total encoded bytes instead
of message count. This prevents Kubernetes OOMs caused by large messages
accumulating during backlogs, where the existing message-count-based
limit is insufficient because message sizes vary widely.

When --max-batch-size-bytes is set, the Reduce step uses byte-based
flushing. When unset, existing message-count behavior is preserved.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Use a type alias for the batch size function pointer in the RowBinary
Reduce path to satisfy clippy's type_complexity lint.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
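The pattern this commit describes, shown as a standalone sketch with hypothetical names (the real alias in the RowBinary Reduce path involves the project's own generic types):

```rust
// Illustrative example of the clippy fix: naming a function-pointer type
// instead of spelling it out inline. Names here are hypothetical.
struct Batch {
    num_rows: usize,
    num_bytes: usize,
}

// Spelling the fn-pointer type inline at every use site is what trips
// clippy::type_complexity once the real types become generic; a type
// alias keeps the signature readable and the lint quiet.
type BatchSizeFn = fn(&Batch) -> usize;

fn by_rows(b: &Batch) -> usize {
    b.num_rows
}

fn by_bytes(b: &Batch) -> usize {
    b.num_bytes
}

fn pick_batch_size_fn(use_bytes: bool) -> BatchSizeFn {
    if use_bytes { by_bytes } else { by_rows }
}
```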
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…lation enum

Instead of a separate --max-batch-size-bytes parameter, use a
--max-batch-size-calculation flag ("rows" or "bytes") that controls how
--max-batch-size is interpreted. This avoids ambiguity when both
parameters could be specified.

On the Rust side, max_batch_size_calculation is a proper serde enum
(BatchSizeCalculation) so invalid values fail at config deserialization
time.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@onewland onewland marked this pull request as ready for review March 31, 2026 14:35
@onewland onewland requested a review from a team as a code owner March 31, 2026 14:35
@onewland onewland changed the title from "feat(consumer): Add --max-batch-size-bytes parameter to Rust consumer" to "feat(consumer): Add memory-based batching limit to Rust consumer" Mar 31, 2026

@cursor cursor bot left a comment


Cursor Bugbot has reviewed your changes and found 1 potential issue.

Autofix Details

Bugbot Autofix prepared a fix for the issue found in the latest run.

  • ✅ Fixed: RowBinary path never sets num_bytes, breaking byte-based batching
    • In make_rust_processor_row_binary, the per-message BytesInsertBatch<Vec<T>> now calls .with_num_bytes(...) using summed item_type_metrics.bytes_processed, so byte-based Reduce batching can flush correctly.

Preview (810e8ee99a)
diff --git a/rust_snuba/src/strategies/processor.rs b/rust_snuba/src/strategies/processor.rs
--- a/rust_snuba/src/strategies/processor.rs
+++ b/rust_snuba/src/strategies/processor.rs
@@ -213,7 +213,14 @@
             }
         }
 
+        let num_bytes = transformed
+            .item_type_metrics
+            .as_ref()
+            .map(|metrics| metrics.bytes_processed.values().sum())
+            .unwrap_or(0);
+
         let mut payload = BytesInsertBatch::from_rows(transformed.rows)
+            .with_num_bytes(num_bytes)
             .with_message_timestamp(timestamp)
             .with_commit_log_offsets(CommitLogOffsets(BTreeMap::from([(
                 partition.index,


Member

@volokluev volokluev left a comment


can you add a test?

@onewland
Contributor Author

can you add a test?

yeah, it's not ready yet because we really should add it to the RowBinary writer (as the bot comment[s] said)

Add EstimatedSize trait and implement it for EAPItemRow so that
byte-based batch size calculation works for the RowBinary pipeline,
not just JSONEachRow. The estimate accounts for heap allocations in
all attribute Vec fields.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
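The idea behind this commit can be sketched as below. This is a hedged approximation: the trait name comes from the commit message, but the EAPItemRow fields and the exact accounting are invented for illustration and the real rust_snuba estimate may differ.

```rust
use std::mem::size_of;

// Sketch of an EstimatedSize trait: a size estimate that counts the struct
// itself plus the heap allocations behind its Vec (and String) fields.
trait EstimatedSize {
    fn estimated_size(&self) -> usize;
}

// Hypothetical fields, for illustration only.
struct EAPItemRow {
    attr_keys: Vec<String>,
    attr_values: Vec<f64>,
}

impl EstimatedSize for EAPItemRow {
    fn estimated_size(&self) -> usize {
        size_of::<Self>()
            // Heap behind the attr_keys Vec, plus each String's own buffer.
            + self.attr_keys.capacity() * size_of::<String>()
            + self.attr_keys.iter().map(|k| k.capacity()).sum::<usize>()
            // Heap behind the attr_values Vec.
            + self.attr_values.capacity() * size_of::<f64>()
    }
}
```

RowBinary rows are plain structs rather than encoded byte buffers, so there is no `encoded_rows.len()` to read; an estimate like this is what lets the byte-based limit apply to that pipeline too.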

@cursor cursor bot left a comment


Cursor Bugbot has reviewed your changes and found 1 potential issue.


onewland and others added 2 commits March 31, 2026 13:29
…ulation

Four tests that exercise the Reduce step with both BatchSizeCalculation
modes, verifying that byte-based batching flushes earlier than row-based
when messages are large, and that row-based batching ignores byte size.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Existing config JSON without the new field would fail deserialization.
Default to Rows to preserve backwards compatibility.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Member

@kylemumma kylemumma left a comment


LGTM

@onewland onewland merged commit f39941c into master Apr 1, 2026
46 of 47 checks passed
@onewland onewland deleted the feat/consumer-max-batch-size-bytes branch April 1, 2026 23:03
3 participants