Implement pluggable Lineage in Java SDK#36781
? (JmsTextMessage message) -> {
  if (message == null) {
    return null;
  }
Unrelated change to fix flaky tests.
assertTrue(
  String.format("Too many unacknowledged messages: %d", unackRecords),
  unackRecords < OPTIONS.getNumberOfRecords() * 0.003);  // before
  unackRecords < OPTIONS.getNumberOfRecords() * 0.005);  // after
Unrelated change to fix flaky tests.
Force-pushed from e8b6a7e to 661f5c7.
Force-pushed from 23b45a2 to 4065277.
Assigning reviewers:

R: @kennknowles for label java.

Note: If you would like to opt out of this review, comment …

Available commands: …

The PR bot will only process comments in the main thread (not review comments).
Codecov Report
✅ All modified and coverable lines are covered by tests.

| Coverage Diff | master | #36781 | +/- |
|---|---|---|---|
| Coverage | 56.87% | 56.86% | -0.01% |
| Complexity | 3417 | 3414 | -3 |
| Files | 1178 | 1178 | |
| Lines | 187492 | 187495 | +3 |
| Branches | 3590 | 3590 | |
| Hits | 106628 | 106620 | -8 |
| Misses | 77472 | 77480 | +8 |
| Partials | 3392 | 3395 | +3 |

Flags with carried forward coverage won't be shown. ☔ View full report in Codecov by Sentry.
Reminder, please take a look at this PR: @kennknowles
Force-pushed from 032292b to 6d7a434.
Assigning a new set of reviewers because the PR has gone too long without review. If you would like to opt out of this review, comment …

R: @Abacn for label java.

Available commands: …
@Abacn or @kennknowles, would you mind taking a look? The change is low risk and introduces some important flexibility.
Reminder, please take a look at this PR: @Abacn
Assigning a new set of reviewers because the PR has gone too long without review. If you would like to opt out of this review, comment …

R: @kennknowles for label java.

Available commands: …
Reminder, please take a look at this PR: @kennknowles
Force-pushed from f0fa053 to bf61f18.
@Abacn thanks, pushed a rewrite along these lines: …

PTAL. FYI: two tests in StreamingDataflowWorkerTest failed; they are flaky and unrelated.
Abacn left a comment
Thanks, I see the multiple-initialization issue is resolved.
## New Features / Improvements
* Added plugin mechanism to support different Lineage implementations (Java) ([#36790](https://github.com/apache/beam/issues/36790)).
This goes to Beam 2.74.0 for now.
Done, moved to the 2.74 section.
private final BoundedTrie metric;
BoundedTrieMetricsLineage(Lineage.LineageDirection direction) {
The constructors of both BoundedTrieMetricsLineage and StringSetMetricsLineage don't satisfy the documentation of the LineageBase interface they implement:
<p>Implementations must provide a public constructor accepting ({@link
* org.apache.beam.sdk.options.PipelineOptions}, {@link
* org.apache.beam.sdk.metrics.Lineage.LineageDirection}).
Please fix.
Fixed: added this parameter to the built-in plugins as well, so all of them now align with LineageBase.
Abacn left a comment
Thanks! Will merge once tests pass.
@Abacn all checks are green 👍
}
/** Helper to create a TestPipeline with test lineage configured. */
private TestPipeline createTestPipelineWithLineage() {
This creates the TestPipeline in an unconventional way that ignores beamTestPipelineOptions, which breaks the ValidatesRunner tests.
At this stage I would suggest we revert this change, since it breaks multiple tests and leaves others misconfigured, and then split this PR into smaller ones.
I fully understand this is a difficult task: it involves refactoring and decoupling a core component, so it is very easy to break. Thanks again for your persistence.
@Abacn your fix looks great! I hope it gets merged!
Addresses #36790: "[Feature Request]: Make lineage tracking pluggable"
Changes
- `org.apache.beam.sdk.lineage.LineageBase`: Plugin interface with a single `add()` method
- `org.apache.beam.sdk.lineage.LineageOptions`: Pipeline option (`--lineageType`) for explicit plugin selection
- `org.apache.beam.sdk.metrics.Lineage`: Hardcoded metrics → delegation to `LineageBase` plugins
- `org.apache.beam.sdk.metrics.BoundedTrieMetricsLineage`: Default BoundedTrie-based implementation
- `org.apache.beam.sdk.metrics.StringSetMetricsLineage`: Default StringSet-based implementation
- `FileSystems.setDefaultPipelineOptions()`

Architecture
Before (master):
`Lineage` was a concrete class hardcoded to use Beam metrics:

After (this PR): Clean separation via composition pattern:
Plugin Selection: Explicit, via the `--lineageType` pipeline option. The specified class must implement `LineageBase` and have a public `(PipelineOptions, Lineage.LineageDirection)` constructor. When the option is not set, lineage defaults to the metrics-based implementation.

Backward Compatibility: ✅ All existing code works unchanged (24+ call sites, static utilities, enums).
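The selection contract above (implement the plugin interface, expose a public `(options, direction)` constructor, fall back to the metrics-based default when `--lineageType` is unset) can be illustrated with a self-contained sketch. This is not the PR's code: `Options`, `Direction`, `Plugin`, `DefaultPlugin`, `CustomPlugin` and `LineageFacade` are hypothetical stand-ins for `PipelineOptions`, `Lineage.LineageDirection`, `LineageBase`, the metrics-based default, a user plugin and `Lineage`, and the `add(Iterable<String>)` signature is an assumption. The facade rebuilds both delegates under a lock on every call, mirroring the re-initialization behavior described for `setDefaultPipelineOptions`.

```java
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of pluggable lineage selection; not Beam's actual classes. */
public class LineagePluginSketch {

  /** Stand-in for PipelineOptions; returns null when --lineageType is not set. */
  public interface Options {
    String getLineageType();
  }

  /** Stand-in for Lineage.LineageDirection. */
  public enum Direction { SOURCE, SINK }

  /** Stand-in for LineageBase; this add() signature is an assumption. */
  public interface Plugin {
    void add(Iterable<String> segments);
  }

  /** Stand-in for the metrics-based default: keeps joined names in memory. */
  public static class DefaultPlugin implements Plugin {
    public final List<String> recorded = new ArrayList<>();

    public DefaultPlugin(Options options, Direction direction) {}

    @Override
    public void add(Iterable<String> segments) {
      recorded.add(String.join(":", segments));
    }
  }

  /** Hypothetical user plugin honoring the (options, direction) constructor contract. */
  public static class CustomPlugin implements Plugin {
    public final Direction direction;

    public CustomPlugin(Options options, Direction direction) {
      this.direction = direction;
    }

    @Override
    public void add(Iterable<String> segments) {
      // A real plugin would forward the segments to an external lineage backend.
    }
  }

  /** Instantiates the class named by --lineageType, or the default when it is unset. */
  public static Plugin create(Options options, Direction direction) {
    String type = options.getLineageType();
    if (type == null || type.isEmpty()) {
      return new DefaultPlugin(options, direction);
    }
    try {
      Class<?> clazz = Class.forName(type);
      if (!Plugin.class.isAssignableFrom(clazz)) {
        throw new IllegalArgumentException(type + " does not implement Plugin");
      }
      // Throws NoSuchMethodException unless a public (Options, Direction) constructor exists.
      Constructor<?> ctor = clazz.getConstructor(Options.class, Direction.class);
      return (Plugin) ctor.newInstance(options, direction);
    } catch (ReflectiveOperationException e) {
      throw new IllegalArgumentException("Cannot instantiate lineage plugin " + type, e);
    }
  }

  /** Stand-in for the Lineage facade: static entry points delegate to the selected plugins. */
  public static final class LineageFacade {
    private static volatile Plugin sources;
    private static volatile Plugin sinks;

    private LineageFacade() {}

    /** Always rebuilds both delegates under the class lock so option changes take effect. */
    public static synchronized void setDefaultPipelineOptions(Options options) {
      sources = create(options, Direction.SOURCE);
      sinks = create(options, Direction.SINK);
    }

    public static Plugin getSources() {
      return sources;
    }

    public static Plugin getSinks() {
      return sinks;
    }
  }
}
```

In this sketch a misconfigured `--lineageType` (unknown class, wrong interface, missing constructor) fails fast with an `IllegalArgumentException` rather than silently using the default; that is one possible design choice, and only an absent option triggers the fallback.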
Why Pluggable Lineage?
1. Runner Fragmentation
Metrics-based lineage is scattered across runners with inconsistent support:
Impact: Multi-runner organizations must consolidate lineage from different metrics backends, each with different APIs and formats.
Plugin Solution: A single implementation works consistently across all runners.
2. Enterprise Integration
Organizations with existing lineage infrastructure need:
Example: A Flyte workflow executing a Beam pipeline needs to tag lineage with the Flyte execution ID and cost allocation. This context exists in the orchestrator, not in the Beam workers' metrics.
3. Standard Formats
OpenLineage is the industry standard. A plugin enables direct emission instead of the export metrics → parse → transform → send chain.
Initialization
`Lineage.setDefaultPipelineOptions(options)` is called from `FileSystems.setDefaultPipelineOptions()` (same pattern as `Metrics`).

Rationale: `FileSystems.setDefaultPipelineOptions()` is called at 48+ locations covering all execution scenarios (pipeline construction, worker startup, deserialization).

Known Limitation: Follows the existing `FileSystems` pattern despite its known issues (#18430). Architectural improvements would address all subsystems together.

Thread Safety
Uses a synchronized block for thread-safe initialization. `setDefaultPipelineOptions` always re-initializes under a lock, ensuring plugins pick up any options changes.

Example: OpenLineage Plugin
For demonstration only (OpenLineage integration is out of scope).