[da-vinci][server] Improve heartbeat-lag ingestion info log readability#2765
haoxu07 wants to merge 6 commits into linkedin:main
Pull request overview
This PR improves operational log readability for heartbeat-lag ingestion dumps in Da Vinci by switching to a sorted, fixed-width table format with a clear “triggering partition” marker. It also includes a controller-side fix + regression test to ensure loadAllPushes() evaluates the refreshed ZK push status object rather than a stale loop variable.
Changes:
- Rework `KafkaConsumerService.convertTopicPartitionIngestionInfoMapToStr(...)` to output a fixed-width, lag-sorted table with consumer-level fields hoisted into a header and an optional triggering-partition marker.
- Update ingestion-info call sites and the heartbeat-lag WARN log to use structured placeholders and pass through the triggering partition where applicable.
- Fix controller `AbstractPushMonitor.loadAllPushes()` to use the refreshed `OfflinePushStatus` returned by `updateOfflinePush()`, and add a regression test.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java | Reassign loop variable to refreshed push status returned from ZK so subsequent status checks use up-to-date partition status. |
| services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/PartitionStatusBasedPushMonitorTest.java | Adds a regression test reproducing the stale-loop-variable issue during controller restart bulk-load. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerService.java | Implements the new ingestion-info table formatter (sorted, fixed-width, optional trigger marker). |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/AggKafkaConsumerService.java | Passes the queried partition through as the triggering partition for marking. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java | Reformats the heartbeat-lag dump WARN to use SLF4J placeholders and include region/current-version fields. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerServiceFormatterTest.java | New unit tests covering the formatter’s sorting, alignment, header hoisting, and trigger marking behavior. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/AggKafkaConsumerServiceTest.java | Updates assertions to match the new formatter output and trigger marking. |
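The `loadAllPushes()` fix listed above follows a common pattern: a refresh call returns a new object, and every later check has to read that returned object instead of the snapshot held by the loop variable. The sketch below only illustrates that pattern. `PushStatus`, `refresh`, and both `completedTopics*` methods are hypothetical stand-ins, not the Venice types or the actual controller code.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a push status snapshot; not the real OfflinePushStatus. */
final class PushStatus {
  final String topic;
  final boolean completed;

  PushStatus(String topic, boolean completed) {
    this.topic = topic;
    this.completed = completed;
  }
}

final class StaleLoopVariableDemo {
  /** Hypothetical refresh: pretends the backing store now reports the push as completed. */
  static PushStatus refresh(PushStatus cached) {
    return new PushStatus(cached.topic, true);
  }

  /** Buggy shape: the refreshed object is discarded, so the check reads the stale snapshot. */
  static List<String> completedTopicsStale(List<PushStatus> cached) {
    List<String> done = new ArrayList<>();
    for (PushStatus status: cached) {
      refresh(status); // return value ignored
      if (status.completed) {
        done.add(status.topic);
      }
    }
    return done;
  }

  /** Fixed shape: the loop variable is reassigned to the refreshed object before any check. */
  static List<String> completedTopicsFresh(List<PushStatus> cached) {
    List<String> done = new ArrayList<>();
    for (PushStatus status: cached) {
      status = refresh(status);
      if (status.completed) {
        done.add(status.topic);
      }
    }
    return done;
  }
}
```

With one cached entry whose `completed` flag is `false`, the stale variant reports nothing while the fixed variant reports the topic, which is the kind of difference a regression test for this bug would assert on.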
The per-partition ingestion info log emitted by `attemptToPrintIngestionInfoFor` when a heartbeat lag triggers a dump was hard to read in production:
- Header line claimed the dump was for one partition but the body listed every sibling partition assigned to the same shared consumer (16+ entries common).
- Consumer-level fields (`consumerIdStr`, `elapsedTimeSinceLastConsumerPollInMs`) were repeated identically on every row.
- `versionTopicName` was redundant with the partition name itself.
- Rows were emitted in `HashMap` iteration order, hiding the worst-lag partitions among healthy siblings.
- The triggering partition was not distinguishable from its siblings.
This change rewrites `KafkaConsumerService.convertTopicPartitionIngestionInfoMapToStr` to produce a fixed-width, sorted table:
- Hoists consumer-level fields into a header line (printed once).
- Sorts rows by `offsetLag` descending so the worst offenders surface at the top.
- Drops `versionTopicName` and the per-row consumer fields from each row.
- Accepts a `triggeringPartition` parameter; when non-null and matching a row, that row is prefixed with `* ` and suffixed with `(triggered)` so operators can immediately see which partition prompted the dump.
- Column widths adapt to actual data so values stay aligned.
`AggKafkaConsumerService.getIngestionInfoFor(...,regionName)` now passes the queried partition through as the trigger. The slow-shared-consumer alarm site in `KafkaConsumerService.getMaxElapsedTimeMSSinceLastPollInConsumerPool` passes `null` since that signal is not tied to a specific partition.
`KafkaStoreIngestionService.attemptToPrintIngestionInfoFor` reformats the WARN message ("Heartbeat-lag dump for replica X (isCurrentVersion=Y, region=Z)") so the structured fields are SLF4J placeholders rather than concatenated into the message body.
Sample output (16-partition shared consumer dump):
consumer=venice-shared-consumer-... lastPoll=53694ms partitions=16
  partition                            lag      msgRate  byteRate  lastRecord(ms)  latestOffset
  cert-basic-dataset-northguard_v3-32  3979827  0.00     0.00      1120585         4005186
  cert-basic-dataset-northguard_v3-25  3972689  0.00     0.00      268715          4002268
  ...
* cert-basic-dataset_v89-40            0        0.00     0.00      1226927         8232319  (triggered)
Tests:
- 16 new unit tests in `KafkaConsumerServiceFormatterTest` covering null/empty input, sort order, trigger marker placement, column-width adaptation, consumer-field hoisting, byteRate formatting, and a realistic 16-partition reproduction of the production log that motivated the change.
- Updated `AggKafkaConsumerServiceTest.testGetIngestionInfoForSuccess` for the new format (key:value form is gone).
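The table layout this commit describes (one consumer header line, rows sorted by lag descending, data-driven column widths, and a marker on the triggering row) can be sketched roughly as follows. This is a simplified illustration under stated assumptions, not the code in `KafkaConsumerService`: `IngestionTableSketch`, its `format` method, and the `Map<String, Long>` input are hypothetical, and only the partition and lag columns are modeled.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import java.util.Map;

final class IngestionTableSketch {
  /**
   * Hypothetical simplified formatter: one header line, one column-title line, then rows sorted
   * by lag descending (ties broken by partition name). The real formatter has more columns.
   */
  static String format(String consumerId, Map<String, Long> lagByPartition, String triggeringPartition) {
    if (lagByPartition == null || lagByPartition.isEmpty()) {
      return "";
    }
    List<Map.Entry<String, Long>> rows = new ArrayList<>(lagByPartition.entrySet());
    rows.sort(
        Comparator.<Map.Entry<String, Long>>comparingLong(e -> e.getValue())
            .reversed()
            .thenComparing(e -> e.getKey()));

    // Column widths come from the data (and the titles) so every row lines up.
    int nameWidth = "partition".length();
    int lagWidth = "lag".length();
    for (Map.Entry<String, Long> e: rows) {
      nameWidth = Math.max(nameWidth, e.getKey().length());
      lagWidth = Math.max(lagWidth, String.valueOf(e.getValue()).length());
    }

    // Left-align names, right-align lags; a 2-character margin carries the trigger marker.
    String titleFmt = "  %-" + nameWidth + "s  %" + lagWidth + "s\n";
    String rowFmt = "%s%-" + nameWidth + "s  %" + lagWidth + "d%s\n";

    StringBuilder sb = new StringBuilder();
    sb.append("consumer=").append(consumerId).append(" partitions=").append(rows.size()).append('\n');
    sb.append(String.format(Locale.ROOT, titleFmt, "partition", "lag"));
    for (Map.Entry<String, Long> e: rows) {
      boolean triggered = e.getKey().equals(triggeringPartition);
      sb.append(
          String.format(
              Locale.ROOT,
              rowFmt,
              triggered ? "* " : "  ",
              e.getKey(),
              e.getValue(),
              triggered ? " (triggered)" : ""));
    }
    return sb.toString();
  }
}
```

Passing `null` as the trigger marks no row, which mirrors how the slow-shared-consumer alarm site is described as calling the formatter.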
For hybrid leaders past EOP, the partition is on a real-time topic (e.g. `store_rt-40`) but the consumer is hydrating a versioned topic (e.g. `store_v3`). The mapping from realtime partition to consuming version is not derivable from the partition name alone, so the previous change removed important diagnostic information when it dropped this field.
Restore `versionTopic` as a left-aligned column at the end of the row so the realtime → version mapping is visible. For batch partitions the column duplicates the partition's topic (slight redundancy), but consistency wins over conditional emission and the column never alters its position.
Two new unit tests:
- `versionTopicColumnDistinguishesHybridLeaderFromBatch`: confirms an rt-partition row prints the consuming version topic.
- `emptyVersionTopicIsRenderedAsBlankNotNull`: defensive coverage for the legacy null-versionTopic path.
Updated existing tests:
- Renamed `columnHeaderListsSixColumnsInOrder` → `...AllColumnsInOrder` and verifies all seven titles appear in order.
- `widthsAdaptToHeaderWhenDataIsSmall` now compares row vs header total width as the alignment invariant (more robust than per-column math).
- `realisticSharedConsumerScenarioFromIncident` now uses realistic per-partition versionTopic values matching each store's topic.
- `AggKafkaConsumerServiceTest.testGetIngestionInfoForSuccess` asserts on the `versionTopic` column header and per-row value.
92df117 to b86a4b1
…ed tests
Two correctness fixes from Copilot review:
1. `String.format` calls now pass `Locale.ROOT` so the decimal separator stays consistent across hosts regardless of the JVM default locale (avoids `,` instead of `.` on non-US locales).
2. Javadoc updated to reflect that triggering rows use a `" * "` prefix (4-char left margin) and non-trigger rows use `" "`. The previous wording said `"* "` which didn't match the actual output.
Test condensation (453 → 350 lines, 18 → 14 tests, no coverage loss):
- Merged `nullMapReturnsEmptyString` + `emptyMapReturnsEmptyString` into one test.
- Folded `headerCarriesConsumerLevelFields`, `columnHeaderListsAllColumnsInOrder`, and `singleEntryProducesHeaderColumnHeaderAndOneRow` into a single test driven by the same single-entry map (each was checking a different facet of the same 3-line output).
- Merged `nullTriggeringPartitionMarksNoRow` + `triggerNotInMapMarksNoRow` into `noMarkerWhenTriggerIsNullOrAbsentFromMap` parameterized over the two trigger values (both verify the same "no marker" invariant).
- Tightened `columnsAlignAcrossRowsRegardlessOfDataWidth` and `realisticSharedConsumerScenarioFromIncident` by removing redundant intermediate computations and inlined error messages.
- Added small `fmt(...)` and `rowIndexContaining(...)` helpers to remove ceremony from individual tests.
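The locale point in this commit is easy to reproduce with the standard library alone. A minimal sketch (the class and method names `LocaleRootFormatDemo` and `formatRate` are illustrative, not from the PR) showing how the same `%.2f` format yields a different decimal separator depending on the locale passed to `String.format`:

```java
import java.util.Locale;

final class LocaleRootFormatDemo {
  /** Formats a rate with two decimals using an explicit locale instead of the JVM default. */
  static String formatRate(Locale locale, double rate) {
    return String.format(locale, "%.2f", rate);
  }

  public static void main(String[] args) {
    System.out.println(formatRate(Locale.ROOT, 1234.5)); // 1234.50
    System.out.println(formatRate(Locale.GERMANY, 1234.5)); // 1234,50
  }
}
```

Calling `String.format` without a locale argument uses the JVM's default format locale, so a host configured for a comma-decimal locale would produce the second form in the log.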
Pull request overview
Improves operational readability of heartbeat-lag ingestion dumps by switching from per-partition toString() logging to a structured, fixed-width table, and by making the WARN message in KafkaStoreIngestionService more structured.
Changes:
- Reworked `KafkaConsumerService.convertTopicPartitionIngestionInfoMapToStr` to emit a fixed-width table with a hoisted consumer header, lag-sorted rows, and optional “triggering partition” marking.
- Updated ingestion-info call sites to pass the triggering partition where applicable (and `null` where not applicable).
- Added/updated unit tests to validate the new formatting and trigger marking behavior.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerService.java | Implements the new ingestion-info table formatter and updates slow-consumer alarm logging to use the new signature. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/AggKafkaConsumerService.java | Passes the queried partition as the triggering partition to the formatter. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java | Rewrites the heartbeat-lag WARN message to use structured placeholders and include the formatted dump. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerServiceFormatterTest.java | Adds comprehensive unit tests for the formatter output, sorting, alignment, and trigger marking. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/AggKafkaConsumerServiceTest.java | Updates assertions to match the new header/column-based output format and trigger marking. |
…noisy errors
Three fixes from PR feedback:
1. **Deterministic sort tie-breaker.** The previous sort considered only
`offsetLag`, so equal-lag rows (very common at lag=0) fell back to
HashMap iteration order — log output for the same input would differ
across JVMs and runs. Chained `.thenComparing(toString)` so equal-lag
rows are ordered by partition name ascending. New unit test
`equalLagRowsAreOrderedDeterministicallyByPartitionName` runs the
formatter twice on the same input and asserts identical output.
2. **Consistent line endings.** The header line uses literal `\n` but
the format strings used `%n` (platform line separator). On Linux
both produce `\n`, but on Windows `%n` would emit `\r\n`, mixing
line endings within the same dump and breaking `out.split("\n")`
in tests. Replaced `%n` with `\n` in `headerFmt` and `rowFmt` so
the formatter emits LF deterministically across platforms.
3. **Downgrade SIT-null and PCS-null ERROR to WARN in
`attemptToPrintIngestionInfoFor`.** Both fire on benign cleanup
races when a heartbeat-lag callback arrives after the version's
ingestion task or partition consumption state has been torn down.
The dump itself is purely diagnostic; the underlying ingestion
problem (if any) surfaces through other logs. Logging at ERROR
tipped ERROR-rate dashboards red without driving any action — same
noise pattern this PR addresses for the WARN dump itself.
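The first fix in this commit can be illustrated in isolation. In the sketch below, `Row`, `LAG_ONLY`, `LAG_THEN_NAME`, and `sortedNames` are hypothetical names, and a plain list stands in for the map entries the real formatter sorts. With a lag-only comparator the order of equal-lag rows depends on the input order, while chaining a name comparator makes the result independent of it.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class TieBreakerDemo {
  /** Hypothetical row: partition name plus offset lag. */
  static final class Row {
    final String partition;
    final long lag;

    Row(String partition, long lag) {
      this.partition = partition;
      this.lag = lag;
    }
  }

  /** Sorts by lag descending only; equal-lag rows keep whatever order the input had. */
  static final Comparator<Row> LAG_ONLY = Comparator.<Row>comparingLong(r -> r.lag).reversed();

  /** Same primary key, plus partition name ascending as a deterministic tie-breaker. */
  static final Comparator<Row> LAG_THEN_NAME = LAG_ONLY.thenComparing(r -> r.partition);

  /** Returns partition names in the order produced by the given comparator. */
  static List<String> sortedNames(List<Row> rows, Comparator<Row> order) {
    List<Row> copy = new ArrayList<>(rows);
    copy.sort(order);
    List<String> names = new ArrayList<>();
    for (Row r: copy) {
      names.add(r.partition);
    }
    return names;
  }
}
```

Feeding the same three rows in two different input orders gives two different results under `LAG_ONLY` and one identical result under `LAG_THEN_NAME`, which is the property the new unit test is described as checking.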
The realistic-scenario test used real production store names (`cert-basic-dataset_v89`, `cert-basic-dataset-northguard_v3`, `aa-partial-update-benchmark-medium_v2`) borrowed from the original incident log. Replaced with synthetic names (`storeA_v3`, `storeB_v2`, `storeC_v89`) and renamed the test from `realisticSharedConsumerScenarioFromIncident` to `realisticSharedConsumerScenario`. Test logic and assertions are unchanged — same 16 partitions, same lag values, same trigger.
Pull request overview
Improves readability of heartbeat-lag ingestion info logs by formatting per-consumer partition ingestion info as a sorted, fixed-width table and by making the heartbeat-lag WARN log message more structured.
Changes:
- Reworked `KafkaConsumerService.convertTopicPartitionIngestionInfoMapToStr` to output a fixed-width table sorted by lag, with an optional “triggering partition” marker.
- Updated ingestion-info callers to pass the triggering partition where applicable and updated log formatting in `KafkaStoreIngestionService`.
- Added/updated unit tests to validate the new formatter output and trigger marking behavior.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerService.java | Implements new sorted table formatter and adds triggering-partition marking. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/AggKafkaConsumerService.java | Passes queried partition as the triggering partition into the formatter. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java | Refactors heartbeat-lag dump WARN message to use placeholders and updates some log levels. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerServiceFormatterTest.java | New unit tests covering sorting, alignment, header hoisting, and trigger marker behavior. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/AggKafkaConsumerServiceTest.java | Updates assertions to match the new formatted output. |
Two follow-up fixes for the latest review round:
1. **Rate-limit the SIT-null and PCS-null WARNs** (sixpluszero confirmed "it will keep firing and very noisy"). Added a static `RedundantExceptionFilter` to `KafkaStoreIngestionService` keyed by `"sit-null:<versionTopic>"` and `"pcs-null:<replicaId>"`, sized 8 MiB, 10-minute reset window. The heartbeat-lag callback can fire repeatedly during version cleanup or partition rebalance; one WARN per replica per cleanup window is enough for diagnosis, and operators stop seeing the same line spamming the dashboard.
2. **Restore the previous single-arg formatter signature** (Copilot: changing the `public static` signature is a binary-breaking change for any external code compiled against earlier da-vinci-client artifacts). Added a one-line overload `convertTopicPartitionIngestionInfoMapToStr(Map)` that delegates to the new two-arg form with `null` trigger. Source/binary compatibility restored; new callers can use either signature.
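The rate-limiting idea in the first fix can be sketched with a keyed time window. Note the assumptions: `WindowedLogFilter` and `isRedundant` are hypothetical and use a per-key timestamp map, which is not how Venice's `RedundantExceptionFilter` is implemented. The sketch only shows the observable behavior the commit describes, namely one log line per key per window.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical time-window filter; the real RedundantExceptionFilter is implemented differently. */
final class WindowedLogFilter {
  private final long windowMs;
  private final Map<String, Long> lastLoggedAtMs = new ConcurrentHashMap<>();

  WindowedLogFilter(long windowMs) {
    this.windowMs = windowMs;
  }

  /**
   * Returns false the first time a key is seen in a window (the caller should log),
   * and true for repeats of the same key until the window has elapsed.
   */
  boolean isRedundant(String key, long nowMs) {
    boolean[] redundant = { true };
    // compute() runs atomically per key, so concurrent callers cannot both win the same window.
    lastLoggedAtMs.compute(key, (k, last) -> {
      if (last == null || nowMs - last >= windowMs) {
        redundant[0] = false;
        return nowMs;
      }
      return last;
    });
    return redundant[0];
  }
}
```

A call site would build the key the way the commit describes (for example `"sit-null:" + versionTopic`) and emit the WARN only when `isRedundant` returns `false`. Taking the clock as a parameter keeps the sketch testable without sleeping.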
sixpluszero left a comment
Thank you for the sit check fix!
    Comparator.<Map.Entry<PubSubTopicPartition, TopicPartitionIngestionInfo>>comparingLong(
        e -> e.getValue().getOffsetLag()).reversed().thenComparing(e -> e.getKey().toString()));

    // Pre-format float fields and compute column widths from actual data. We keep the
nit: consider moving this to a util class. It also looks heavy, so it should be used only for point lookups, not on a frequently called path.
Problem
When a heartbeat lag exceeds the threshold, `KafkaStoreIngestionService.attemptToPrintIngestionInfoFor` emits an `INGESTION_DEBUGGER_LOGGER.warn` containing the ingestion info for every partition assigned to the same shared consumer, not just the triggering one. In production this produced log lines that were hard to read:
- The header named a single partition (e.g. `storeC_v89-40`) but the body listed 16+ siblings across multiple stores sharing the consumer.
- Consumer-level fields (`consumerIdStr`, `elapsedTimeSinceLastConsumerPollInMs`) were repeated identically on every row.
- Rows were emitted in `HashMap` iteration order, so partitions actually exhibiting lag were scattered among healthy siblings with `offsetLag:0`.
Sample of the original log (excerpt, with synthetic store names):
Change
Rewrites `KafkaConsumerService.convertTopicPartitionIngestionInfoMapToStr` to produce a fixed-width, sorted table:
- Sorts rows by `offsetLag` descending with a deterministic tie-breaker by partition name ascending (so equal-lag rows, common at lag=0, render in stable order across runs).
- Keeps `versionTopic` as a column: for hybrid leaders past EOP, the partition is on a real-time topic (e.g. `store_rt-40`) but the consumer is hydrating a versioned topic (e.g. `store_v3`). The mapping is not derivable from the partition name alone.
- Accepts a `triggeringPartition` parameter; when non-null and matching a row, that row's 4-character left margin is replaced with `" * "` and the row is suffixed with `"(triggered)"` so operators can immediately see which partition prompted the dump. A backward-compatible single-argument overload is preserved so the change is source/binary-compatible.
- Uses locale-independent number formatting (`Locale.ROOT`) and consistent `\n` line endings across platforms.
`AggKafkaConsumerService.getIngestionInfoFor(...,regionName)` now passes the queried partition through as the trigger. The slow-shared-consumer alarm site in `KafkaConsumerService.getMaxElapsedTimeMSSinceLastPollInConsumerPool` passes `null` since that signal is not tied to a specific partition.
`KafkaStoreIngestionService.attemptToPrintIngestionInfoFor`:
- The structured fields of the WARN message (`replicaId`, `isCurrentVersion`, `region`) are SLF4J placeholders rather than concatenated into the message body.
- The SIT-null and PCS-null logs are downgraded from ERROR to WARN and rate-limited through a `RedundantExceptionFilter` (10-minute window, keyed by `"sit-null:<versionTopic>"` / `"pcs-null:<replicaId>"`) so a partition being torn down produces one WARN per cleanup window instead of one per heartbeat-lag callback.
Sample new output
Same 16-partition dump as above, after the change (output captured directly from the unit test, with synthetic store names):
The 7 `storeA_v3-*` partitions now cluster at the top in lag-descending order, which is the actual signal. Equal-lag rows fall back to partition-name ascending. The triggering partition is unambiguously marked even when its `offsetLag` matches several siblings in the lag=0 cluster.
Compatibility
- `convertTopicPartitionIngestionInfoMapToStr` retains its previous single-argument signature as a backward-compatible overload, plus a new two-argument form that accepts `triggeringPartition`. External callers compiled against earlier da-vinci-client artifacts continue to link.
- `TopicPartitionIngestionInfo.toString()` and JSON serialization are untouched, so any downstream JSON consumers continue to work.
- The WARN message prefix in `attemptToPrintIngestionInfoFor` changed from `"Ingestion info for topic partition: ..."` to `"Heartbeat-lag dump for replica ..."`. Any log-based alert or dashboard pattern-matching on the old prefix needs updating.
Tests
15 unit tests in `KafkaConsumerServiceFormatterTest` covering: null/empty input, single-row layout, sort order including the deterministic tie-breaker, trigger marker placement, column-width adaptation, consumer-field hoisting, byteRate/msgRate formatting (incl. zero/negative), versionTopic column for hybrid leader, defensive equals-based partition matching, and a synthetic 16-partition reconstruction of the production scenario that motivated the change. Plus updated `AggKafkaConsumerServiceTest.testGetIngestionInfoForSuccess` for the new format.
All 37 tests in the affected classes pass locally; CI green on previous push.
🤖 Generated with Claude Code