Skip to content

[da-vinci][server] Improve heartbeat-lag ingestion info log readability#2765

Open
haoxu07 wants to merge 6 commits intolinkedin:mainfrom
haoxu07:xhao/improve-ingestion-info-log-format
Open

[da-vinci][server] Improve heartbeat-lag ingestion info log readability#2765
haoxu07 wants to merge 6 commits intolinkedin:mainfrom
haoxu07:xhao/improve-ingestion-info-log-format

Conversation

@haoxu07
Copy link
Copy Markdown
Contributor

@haoxu07 haoxu07 commented Apr 29, 2026

Problem

When a heartbeat lag exceeds the threshold, KafkaStoreIngestionService.attemptToPrintIngestionInfoFor emits an INGESTION_DEBUGGER_LOGGER.warn containing the ingestion info for every partition assigned to the same shared consumer, not just the triggering one. In production this produced log lines that were hard to read:

  • The header claimed the dump was for one partition (e.g. storeC_v89-40) but the body listed 16+ siblings across multiple stores sharing the consumer.
  • Consumer-level fields (consumerIdStr, elapsedTimeSinceLastConsumerPollInMs) were repeated identically on every row.
  • Rows were emitted in HashMap iteration order — partitions actually exhibiting lag were scattered among healthy siblings with offsetLag:0.
  • The triggering partition was indistinguishable from its siblings in the dump.

Sample of the original log (excerpt, with synthetic store names):

WARN  TopicPartitionIngestionInfo
Ingestion info for topic partition: storeC_v89-40, isCurrentVersion: false
storeA_v3-7: {latestOffset:4000961, offsetLag:3971738, msgRate:0.0, byteRate:0.0, consumerIdStr:venice-shared-consumer-..., elapsedTimeSinceLastConsumerPollInMs:53694, elapsedTimeSinceLastRecordForPartitionInMs:385294, versionTopicName:storeA_v3}
storeB_v2-84: {latestOffset:5346109, offsetLag:0, msgRate:0.0, byteRate:0.0, consumerIdStr:venice-shared-consumer-..., elapsedTimeSinceLastConsumerPollInMs:53694, elapsedTimeSinceLastRecordForPartitionInMs:385301, versionTopicName:storeB_v2}
... 14 more rows in random order ...

Change

Rewrites KafkaConsumerService.convertTopicPartitionIngestionInfoMapToStr to produce a fixed-width, sorted table:

  • Hoists consumer-level fields into a header line (printed once, not per row).
  • Sorts rows by offsetLag descending with a deterministic tie-breaker by partition name ascending (so equal-lag rows — common at lag=0 — render in stable order across runs).
  • Keeps versionTopic as a column: for hybrid leaders past EOP, the partition is on a real-time topic (e.g. store_rt-40) but the consumer is hydrating a versioned topic (e.g. store_v3). The mapping is not derivable from the partition name alone.
  • Accepts a triggeringPartition parameter; when non-null and matching a row, that row's 4-character left margin is replaced with " * " and the row is suffixed with "(triggered)" so operators can immediately see which partition prompted the dump. A backward-compatible single-argument overload is preserved so the change is source/binary-compatible.
  • Column widths adapt to actual data so values stay aligned regardless of partition-name length or value magnitude.
  • Locale-safe numeric formatting (Locale.ROOT) and consistent \n line endings across platforms.

AggKafkaConsumerService.getIngestionInfoFor(...,regionName) now passes the queried partition through as the trigger. The slow-shared-consumer alarm site in KafkaConsumerService.getMaxElapsedTimeMSSinceLastPollInConsumerPool passes null since that signal is not tied to a specific partition.

KafkaStoreIngestionService.attemptToPrintIngestionInfoFor:

  • Reformats the WARN message so the structured fields (replicaId, isCurrentVersion, region) are SLF4J placeholders rather than concatenated into the message body.
  • Downgrades two paired ERROR logs to WARN for benign cleanup races (SIT-null and PCS-null when the heartbeat-lag callback arrives after the version's ingestion task or partition consumption state has been torn down). Both WARNs are now rate-limited via a static RedundantExceptionFilter (10-minute window, keyed by "sit-null:<versionTopic>" / "pcs-null:<replicaId>") so a partition being torn down produces one WARN per cleanup window instead of one per heartbeat-lag callback.

Sample new output

Same 16-partition dump as above, after the change (output captured directly from the unit test, with synthetic store names):

Heartbeat-lag dump for replica storeC_v89-40 (isCurrentVersion=false, region=ei-ltx1):
consumer=venice-shared-consumer-...-for_non_current_non_aa_wc_leader-0  lastPoll=53694ms  partitions=16
    partition          lag  msgRate  byteRate  lastRecord(ms)  latestOffset  versionTopic
    storeA_v3-32   3979827     0.00      0.00         1120585       4005186  storeA_v3
    storeA_v3-25   3972689     0.00      0.00          268715       4002268  storeA_v3
    storeA_v3-7    3971738     0.00      0.00          385294       4000961  storeA_v3
    storeA_v3-48   3971503     0.00      0.00         1527710       3999731  storeA_v3
    storeA_v3-58   3971168     0.00      0.00          936733       3999572  storeA_v3
    storeA_v3-0    3970739     0.00      0.00         1226920       3997774  storeA_v3
    storeA_v3-52   3968092     0.00      0.00          576128       3997262  storeA_v3
    storeB_v2-33       353     5.14    735.75           53694       5361438  storeB_v2
    storeB_v2-45       168     0.00      0.00          385309       5391638  storeB_v2
    storeB_v2-44       128     0.00      0.00          969889       5351890  storeB_v2
    storeC_v89-82       44     0.00      0.00          969896       8243155  storeC_v89
    storeB_v2-84         0     0.00      0.00          385301       5346109  storeB_v2
    storeC_v89-10        0     0.00      0.00          268715       8235941  storeC_v89
    storeC_v89-21        0     0.00      0.00          385313       8227759  storeC_v89
  * storeC_v89-40        0     0.00      0.00         1226927       8232319  storeC_v89  (triggered)
    storeC_v89-90        0     0.00      0.00         1120579       8220505  storeC_v89

The 7 storeA_v3-* partitions now cluster at the top in lag-descending order — the actual signal. Equal-lag rows fall back to partition-name ascending. The triggering partition is unambiguously marked even when its offsetLag matches several siblings in the lag=0 cluster.

Compatibility

  • Pure logging-format change. No protocol, schema, or persistent-state changes.
  • convertTopicPartitionIngestionInfoMapToStr retains its previous single-argument signature as a backward-compatible overload, plus a new two-argument form that accepts triggeringPartition. External callers compiled against earlier da-vinci-client artifacts continue to link.
  • TopicPartitionIngestionInfo.toString() and JSON serialization are untouched, so any downstream JSON consumers continue to work.
  • The WARN log message text in attemptToPrintIngestionInfoFor changed from "Ingestion info for topic partition: ..." to "Heartbeat-lag dump for replica ...". Any log-based alert or dashboard pattern-matching on the old prefix needs updating.

Tests

15 unit tests in KafkaConsumerServiceFormatterTest covering: null/empty input, single-row layout, sort order including the deterministic tie-breaker, trigger marker placement, column-width adaptation, consumer-field hoisting, byteRate/msgRate formatting (incl. zero/negative), versionTopic column for hybrid leader, defensive equals-based partition matching, and a synthetic 16-partition reconstruction of the production scenario that motivated the change. Plus updated AggKafkaConsumerServiceTest.testGetIngestionInfoForSuccess for the new format.

All 37 tests in the affected classes pass locally; CI green on previous push.

🤖 Generated with Claude Code

Copilot AI review requested due to automatic review settings April 29, 2026 01:00
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR improves operational log readability for heartbeat-lag ingestion dumps in Da Vinci by switching to a sorted, fixed-width table format with a clear “triggering partition” marker. It also includes a controller-side fix + regression test to ensure loadAllPushes() evaluates the refreshed ZK push status object rather than a stale loop variable.

Changes:

  • Rework KafkaConsumerService.convertTopicPartitionIngestionInfoMapToStr(...) to output a fixed-width, lag-sorted table with consumer-level fields hoisted into a header and an optional triggering-partition marker.
  • Update ingestion-info call sites and the heartbeat-lag WARN log to use structured placeholders and pass through the triggering partition where applicable.
  • Fix controller AbstractPushMonitor.loadAllPushes() to use the refreshed OfflinePushStatus returned by updateOfflinePush(), and add a regression test.

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java Reassign loop variable to refreshed push status returned from ZK so subsequent status checks use up-to-date partition status.
services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/PartitionStatusBasedPushMonitorTest.java Adds a regression test reproducing the stale-loop-variable issue during controller restart bulk-load.
clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerService.java Implements the new ingestion-info table formatter (sorted, fixed-width, optional trigger marker).
clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/AggKafkaConsumerService.java Passes the queried partition through as the triggering partition for marking.
clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java Reformats the heartbeat-lag dump WARN to use SLF4J placeholders and include region/current-version fields.
clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerServiceFormatterTest.java New unit tests covering the formatter’s sorting, alignment, header hoisting, and trigger marking behavior.
clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/AggKafkaConsumerServiceTest.java Updates assertions to match the new formatter output and trigger marking.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

haoxu07 added 2 commits April 28, 2026 21:50
The per-partition ingestion info log emitted by
`attemptToPrintIngestionInfoFor`
when a heartbeat lag triggers a dump was hard to read in production:

- Header line claimed the dump was for one partition but the body listed every
  sibling partition assigned to the same shared consumer (16+ entries common).
- Consumer-level fields (`consumerIdStr`,
`elapsedTimeSinceLastConsumerPollInMs`)
  were repeated identically on every row.
- `versionTopicName` was redundant with the partition name itself.
- Rows were emitted in `HashMap` iteration order, hiding the worst-lag
partitions
  among healthy siblings.
- The triggering partition was not distinguishable from its siblings.

This change rewrites
`KafkaConsumerService.convertTopicPartitionIngestionInfoMapToStr`
to produce a fixed-width, sorted table:

- Hoists consumer-level fields into a header line (printed once).
- Sorts rows by `offsetLag` descending so the worst offenders surface at the
top.
- Drops `versionTopicName` and the per-row consumer fields from each row.
- Accepts a `triggeringPartition` parameter; when non-null and matching a row,
  that row is prefixed with `* ` and suffixed with `(triggered)` so operators
  can immediately see which partition prompted the dump.
- Column widths adapt to actual data so values stay aligned.

`AggKafkaConsumerService.getIngestionInfoFor(...,regionName)` now passes the
queried partition through as the trigger. The slow-shared-consumer alarm site
in `KafkaConsumerService.getMaxElapsedTimeMSSinceLastPollInConsumerPool`
passes
`null` since that signal is not tied to a specific partition.

`KafkaStoreIngestionService.attemptToPrintIngestionInfoFor` reformats the WARN
message ("Heartbeat-lag dump for replica X (isCurrentVersion=Y, region=Z)") so
the structured fields are SLF4J placeholders rather than concatenated into the
message body.

Sample output (16-partition shared consumer dump):

  consumer=venice-shared-consumer-...  lastPoll=53694ms  partitions=16
      partition                                  lag  msgRate  byteRate
lastRecord(ms)  latestOffset
      cert-basic-dataset-northguard_v3-32   3979827     0.00      0.00
1120585       4005186
      cert-basic-dataset-northguard_v3-25   3972689     0.00      0.00
268715       4002268
      ...
    * cert-basic-dataset_v89-40                   0     0.00      0.00
1226927       8232319  (triggered)

Tests:
- 16 new unit tests in `KafkaConsumerServiceFormatterTest` covering null/empty
  input, sort order, trigger marker placement, column-width adaptation,
  consumer-field hoisting, byteRate formatting, and a realistic 16-partition
  reproduction of the production log that motivated the change.
- Updated `AggKafkaConsumerServiceTest.testGetIngestionInfoForSuccess` for the
  new format (key:value form is gone).
For hybrid leaders past EOP, the partition is on a real-time topic (e.g.
`store_rt-40`) but the consumer is hydrating a versioned topic (e.g.
`store_v3`). The mapping from realtime partition to consuming version is
not derivable from the partition name alone, so the previous change
removed important diagnostic information when it dropped this field.

Restore `versionTopic` as a left-aligned column at the end of the row so
the realtime → version mapping is visible. For batch partitions the
column duplicates the partition's topic (slight redundancy), but
consistency wins over conditional emission and the column never alters
its position.

Two new unit tests:
- `versionTopicColumnDistinguishesHybridLeaderFromBatch` — confirms an
  rt-partition row prints the consuming version topic.
- `emptyVersionTopicIsRenderedAsBlankNotNull` — defensive coverage for
  the legacy null-versionTopic path.

Updated existing tests:
- Renamed `columnHeaderListsSixColumnsInOrder` → `...AllColumnsInOrder`
  and verifies all seven titles appear in order.
- `widthsAdaptToHeaderWhenDataIsSmall` now compares row vs header total
  width as the alignment invariant (more robust than per-column math).
- `realisticSharedConsumerScenarioFromIncident` now uses realistic
  per-partition versionTopic values matching each store's topic.
- `AggKafkaConsumerServiceTest.testGetIngestionInfoForSuccess` asserts
  on the `versionTopic` column header and per-row value.
@haoxu07 haoxu07 force-pushed the xhao/improve-ingestion-info-log-format branch from 92df117 to b86a4b1 Compare April 29, 2026 04:51
…ed tests

Two correctness fixes from Copilot review:

1. `String.format` calls now pass `Locale.ROOT` so the decimal separator
   stays consistent across hosts regardless of the JVM default locale
   (avoids `,` instead of `.` on non-US locales).
2. Javadoc updated to reflect that triggering rows use a `"  * "` prefix
   (4-char left margin) and non-trigger rows use `"    "`. The previous
   wording said `"* "` which didn't match the actual output.

Test condensation (453 → 350 lines, 18 → 14 tests, no coverage loss):
- Merged `nullMapReturnsEmptyString` + `emptyMapReturnsEmptyString` into
  one test.
- Folded `headerCarriesConsumerLevelFields`,
`columnHeaderListsAllColumnsInOrder`,
  and `singleEntryProducesHeaderColumnHeaderAndOneRow` into a single test
  driven by the same single-entry map (each was checking a different
  facet of the same 3-line output).
- Merged `nullTriggeringPartitionMarksNoRow` + `triggerNotInMapMarksNoRow`
  into `noMarkerWhenTriggerIsNullOrAbsentFromMap` parameterized over the
  two trigger values (both verify the same "no marker" invariant).
- Tightened `columnsAlignAcrossRowsRegardlessOfDataWidth` and
  `realisticSharedConsumerScenarioFromIncident` by removing redundant
  intermediate computations and inlined error messages.
- Added small `fmt(...)` and `rowIndexContaining(...)` helpers to remove
  ceremony from individual tests.
Copilot AI review requested due to automatic review settings April 29, 2026 17:14
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Improves operational readability of heartbeat-lag ingestion dumps by switching from per-partition toString() logging to a structured, fixed-width table, and by making the WARN message in KafkaStoreIngestionService more structured.

Changes:

  • Reworked KafkaConsumerService.convertTopicPartitionIngestionInfoMapToStr to emit a fixed-width table with a hoisted consumer header, lag-sorted rows, and optional “triggering partition” marking.
  • Updated ingestion-info call sites to pass the triggering partition where applicable (and null where not applicable).
  • Added/updated unit tests to validate the new formatting and trigger marking behavior.

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerService.java Implements the new ingestion-info table formatter and updates slow-consumer alarm logging to use the new signature.
clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/AggKafkaConsumerService.java Passes the queried partition as the triggering partition to the formatter.
clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java Rewrites the heartbeat-lag WARN message to use structured placeholders and include the formatted dump.
clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerServiceFormatterTest.java Adds comprehensive unit tests for the formatter output, sorting, alignment, and trigger marking.
clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/AggKafkaConsumerServiceTest.java Updates assertions to match the new header/column-based output format and trigger marking.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

haoxu07 added 2 commits April 29, 2026 11:55
…noisy

errors

Three fixes from PR feedback:

1. **Deterministic sort tie-breaker.** The previous sort considered only
   `offsetLag`, so equal-lag rows (very common at lag=0) fell back to
   HashMap iteration order — log output for the same input would differ
   across JVMs and runs. Chained `.thenComparing(toString)` so equal-lag
   rows are ordered by partition name ascending. New unit test
   `equalLagRowsAreOrderedDeterministicallyByPartitionName` runs the
   formatter twice on the same input and asserts identical output.

2. **Consistent line endings.** The header line uses literal `\n` but
   the format strings used `%n` (platform line separator). On Linux
   both produce `\n`, but on Windows `%n` would emit `\r\n`, mixing
   line endings within the same dump and breaking `out.split("\n")`
   in tests. Replaced `%n` with `\n` in `headerFmt` and `rowFmt` so
   the formatter emits LF deterministically across platforms.

3. **Downgrade SIT-null and PCS-null ERROR to WARN in
   `attemptToPrintIngestionInfoFor`.** Both fire on benign cleanup
   races when a heartbeat-lag callback arrives after the version's
   ingestion task or partition consumption state has been torn down.
   The dump itself is purely diagnostic; the underlying ingestion
   problem (if any) surfaces through other logs. Logging at ERROR
   tipped ERROR-rate dashboards red without driving any action — same
   noise pattern this PR addresses for the WARN dump itself.
The realistic-scenario test used real production store names
(`cert-basic-dataset_v89`, `cert-basic-dataset-northguard_v3`,
`aa-partial-update-benchmark-medium_v2`) borrowed from the original
incident log. Replaced with synthetic names (`storeA_v3`, `storeB_v2`,
`storeC_v89`) and renamed the test from
`realisticSharedConsumerScenarioFromIncident` to
`realisticSharedConsumerScenario`. Test logic and assertions are
unchanged — same 16 partitions, same lag values, same trigger.
Copilot AI review requested due to automatic review settings April 30, 2026 06:19
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Improves readability of heartbeat-lag ingestion info logs by formatting per-consumer partition ingestion info as a sorted, fixed-width table and by making the heartbeat-lag WARN log message more structured.

Changes:

  • Reworked KafkaConsumerService.convertTopicPartitionIngestionInfoMapToStr to output a fixed-width table sorted by lag, with an optional “triggering partition” marker.
  • Updated ingestion-info callers to pass the triggering partition where applicable and updated log formatting in KafkaStoreIngestionService.
  • Added/updated unit tests to validate the new formatter output and trigger marking behavior.

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 2 comments.

Show a summary per file
File Description
clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerService.java Implements new sorted table formatter and adds triggering-partition marking.
clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/AggKafkaConsumerService.java Passes queried partition as the triggering partition into the formatter.
clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java Refactors heartbeat-lag dump WARN message to use placeholders and updates some log levels.
clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerServiceFormatterTest.java New unit tests covering sorting, alignment, header hoisting, and trigger marker behavior.
clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/AggKafkaConsumerServiceTest.java Updates assertions to match the new formatted output.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Two follow-up fixes for the latest review round:

1. **Rate-limit the SIT-null and PCS-null WARNs**
   (sixpluszero confirmed "it will keep firing and very noisy"). Added
   a static `RedundantExceptionFilter` to `KafkaStoreIngestionService`
   keyed by `"sit-null:<versionTopic>"` and `"pcs-null:<replicaId>"`,
   sized 8 MiB, 10-minute reset window. The heartbeat-lag callback can
   fire repeatedly during version cleanup or partition rebalance —
   one WARN per replica per cleanup window is enough for diagnosis,
   and operators stop seeing the same line spamming the dashboard.

2. **Restore the previous single-arg formatter signature**
   (Copilot: changing the `public static` signature is a binary-breaking
   change for any external code compiled against earlier
   da-vinci-client artifacts). Added a one-line overload
   `convertTopicPartitionIngestionInfoMapToStr(Map)` that delegates to
   the new two-arg form with `null` trigger. Source/binary compatibility
   restored; new callers can use either signature.
Copy link
Copy Markdown
Contributor

@sixpluszero sixpluszero left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the sit check fix!

Comparator.<Map.Entry<PubSubTopicPartition, TopicPartitionIngestionInfo>>comparingLong(
e -> e.getValue().getOffsetLag()).reversed().thenComparing(e -> e.getKey().toString()));

// Pre-format float fields and compute column widths from actual data. We keep the
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can consider moving this to util. and this looks heavy so it should not be used in a frequent call fashion but only for a point look up

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants