Implement pluggable Lineage in Java SDK #36781

Merged
Abacn merged 28 commits into apache:master from shnapz:pluggable-lineage on Apr 23, 2026

@shnapz

shnapz commented Nov 10, 2025

Contributor

Addresses #36790: "[Feature Request]: Make lineage tracking pluggable"

Changes

  • Created org.apache.beam.sdk.lineage.LineageBase - Plugin interface with single add() method
  • Created org.apache.beam.sdk.lineage.LineageOptions - Pipeline option (--lineageType) for explicit plugin selection
  • Refactored org.apache.beam.sdk.metrics.Lineage - Hardcoded metrics → Delegation to LineageBase plugins
  • Created org.apache.beam.sdk.metrics.BoundedTrieMetricsLineage - Default BoundedTrie-based implementation
  • Created org.apache.beam.sdk.metrics.StringSetMetricsLineage - Default StringSet-based implementation
  • Added Plugin initialization in FileSystems.setDefaultPipelineOptions()

Architecture

Before (master): Lineage was a concrete class hardcoded to use Beam metrics:

public class Lineage {
  private static final Lineage SOURCES = new Lineage(Type.SOURCE);
  private static final Lineage SINKS = new Lineage(Type.SINK);
  private final Metric metric;  // Hardcoded to Beam metrics

  private Lineage(Type type) {
    this.metric = Metrics.stringSet(LINEAGE_NAMESPACE, type.toString());
  }

  public void add(Iterable<String> segments) {
    ((StringSet) metric).add(String.join("", segments));  // Always metrics
  }
}

After (this PR): Clean separation via composition pattern:

// Plugin contract (simple interface)
public interface LineageBase {
  void add(Iterable<String> rollupSegments);
}

// Public API (final facade delegating to plugin)
public final class Lineage {
  private final LineageBase delegate;  // Plugin implementation

  private Lineage(LineageBase delegate) {
    this.delegate = delegate;
  }

  // Delegates to plugin
  public void add(Iterable<String> segments) {
    delegate.add(segments);
  }

  // Convenience overloads
  public void add(String system, Iterable<String> segments) { ... }
  public void add(String system, String subtype, ...) { ... }

  // Static utilities (unchanged)
  public static Lineage getSources() { ... }
  public static Lineage getSinks() { ... }
  public static String wrapSegment(String value) { ... }
  public static Set<String> query(MetricResults results, Type type) { ... }
}

// Default implementations (type-safe, no casts)
class BoundedTrieMetricsLineage implements LineageBase {
  private final BoundedTrie metric;
  @Override
  public void add(Iterable<String> segments) {
    metric.add(ImmutableList.copyOf(segments));
  }
}

class StringSetMetricsLineage implements LineageBase {
  private final StringSet metric;
  @Override
  public void add(Iterable<String> segments) {
    metric.add(String.join("", segments));
  }
}

Plugin Selection: Explicit via --lineageType pipeline option. The specified class must implement LineageBase and have a public (PipelineOptions, Lineage.LineageDirection) constructor. When not set, defaults to metrics-based lineage.
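To illustrate the constructor contract described above, here is a minimal, self-contained sketch of reflective plugin instantiation. It is not the code from this PR: PipelineOptionsStandIn, DirectionStandIn, LineageBaseStandIn, RecordingLineage and PluginLoaderSketch are hypothetical stand-ins, chosen so the example compiles without Beam on the classpath.

```java
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for PipelineOptions, Lineage.LineageDirection and LineageBase.
// They are NOT the real Beam types.
interface PipelineOptionsStandIn {}

enum DirectionStandIn { SOURCE, SINK }

interface LineageBaseStandIn {
  void add(Iterable<String> rollupSegments);
}

/** Example plugin exposing the required public (options, direction) constructor. */
class RecordingLineage implements LineageBaseStandIn {
  final DirectionStandIn direction;
  final List<String> recorded = new ArrayList<>();

  public RecordingLineage(PipelineOptionsStandIn options, DirectionStandIn direction) {
    this.direction = direction;
  }

  @Override
  public void add(Iterable<String> rollupSegments) {
    // Segments already carry their delimiters, so they are joined without a separator.
    recorded.add(String.join("", rollupSegments));
  }
}

final class PluginLoaderSketch {
  private PluginLoaderSketch() {}

  /** Looks up the public two-argument constructor and instantiates the plugin with it. */
  static LineageBaseStandIn load(
      Class<? extends LineageBaseStandIn> type,
      PipelineOptionsStandIn options,
      DirectionStandIn direction) {
    try {
      Constructor<? extends LineageBaseStandIn> ctor =
          type.getConstructor(PipelineOptionsStandIn.class, DirectionStandIn.class);
      return ctor.newInstance(options, direction);
    } catch (ReflectiveOperationException e) {
      // Fail fast with a readable message when the contract is not met.
      throw new IllegalArgumentException(
          type.getName() + " must declare a public (options, direction) constructor", e);
    }
  }
}
```

A class without the expected constructor fails immediately with an IllegalArgumentException in this sketch, rather than silently falling back to a default.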

Backward Compatibility: ✅ All existing code works unchanged (24+ call sites, static utilities, enums).

Why Pluggable Lineage?

1. Runner Fragmentation

Metrics-based lineage is scattered across runners with inconsistent support:

  • Dataflow: Real-time metrics export to Cloud Monitoring
  • Flink: Batch-only aggregation (no streaming support yet)
  • Spark/Direct: Varying levels of support

Impact: Multi-runner organizations must consolidate lineage from different metrics backends, each with different APIs and formats.

Plugin Solution: Single implementation works consistently across all runners.

2. Enterprise Integration

Organizations with existing lineage infrastructure need:

  • Direct API integration (Atlan, Collibra, Marquez, DataHub, OpenLineage)
  • Custom metadata enrichment not in metrics subsystem

Example: Flyte workflow executing a Beam pipeline needs to tag lineage with Flyte execution ID and cost allocation. This context exists in the orchestrator, not in Beam workers' metrics.

3. Standard Formats

OpenLineage is a widely adopted open standard for lineage metadata. A plugin can emit lineage events directly, instead of going through the export metrics → parse → transform → send chain.

Initialization

Lineage.setDefaultPipelineOptions(options) is called from FileSystems.setDefaultPipelineOptions() (same pattern as Metrics).

Rationale: FileSystems.setDefaultPipelineOptions() is called at 48+ locations covering all execution scenarios (pipeline construction, worker startup, deserialization).

Known Limitation: Follows existing FileSystems pattern despite known issues (#18430). Architectural improvements would address all subsystems together.

Thread Safety

Initialization uses a synchronized block for thread safety. setDefaultPipelineOptions always re-initializes under the lock, so plugins pick up any options changes.

Example: OpenLineage Plugin

For demonstration only (OpenLineage integration out of scope)

// 1. Implement LineageBase with required constructor
class OpenLineageReporter implements LineageBase {
  private final String endpoint;
  private final Lineage.LineageDirection direction;

  // Required constructor for --lineageType instantiation
  public OpenLineageReporter(PipelineOptions options, Lineage.LineageDirection direction) {
    OpenLineageOptions opts = options.as(OpenLineageOptions.class);
    this.endpoint = opts.getOpenLineageUrl();
    this.direction = direction;
  }

  @Override
  public void add(Iterable<String> rollupSegments) {
    String fqn = String.join("", rollupSegments);
    // POST to OpenLineage API with workflow context
    sendToOpenLineage(endpoint, direction, fqn);
  }
}

// 2. Plugin options
public interface OpenLineageOptions extends PipelineOptions {
  @Description("OpenLineage endpoint URL")
  String getOpenLineageUrl();
  void setOpenLineageUrl(String url);
}

// 3. Usage — select plugin via pipeline option
PipelineOptions options = PipelineOptionsFactory.create();
options.as(OpenLineageOptions.class).setOpenLineageUrl("https://lineage-api.example.com");
options.as(LineageOptions.class).setLineageType(OpenLineageReporter.class);
Pipeline p = Pipeline.create(options);

// Or from command line:
// --lineageType=com.example.OpenLineageReporter --openLineageUrl=https://lineage-api.example.com

@github-actions github-actions Bot added the java label Nov 10, 2025
@shnapz shnapz changed the title Add pluggable LineageReporter interface Make Lineage pluggable Nov 11, 2025
@shnapz shnapz changed the title Make Lineage pluggable Implement pluggable Lineage in Java SDK Nov 11, 2025
? (JmsTextMessage message) -> {
if (message == null) {
return null;
}

Contributor Author

Unrelated change to fix flaky tests.

assertTrue(
String.format("Too many unacknowledged messages: %d", unackRecords),
- unackRecords < OPTIONS.getNumberOfRecords() * 0.003);
+ unackRecords < OPTIONS.getNumberOfRecords() * 0.005);

Contributor Author

Unrelated change to fix flaky tests.

@shnapz shnapz marked this pull request as ready for review December 31, 2025 17:51
@github-actions

Contributor

Assigning reviewers:

R: @kennknowles for label java.

Note: If you would like to opt out of this review, comment assign to next reviewer.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@codecov

codecov Bot commented Dec 31, 2025

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 56.86%. Comparing base (298f309) to head (7bcab65).
⚠️ Report is 12 commits behind head on master.

Additional details and impacted files
@@             Coverage Diff              @@
##             master   #36781      +/-   ##
============================================
- Coverage     56.87%   56.86%   -0.01%     
+ Complexity     3417     3414       -3     
============================================
  Files          1178     1178              
  Lines        187492   187495       +3     
  Branches       3590     3590              
============================================
- Hits         106628   106620       -8     
- Misses        77472    77480       +8     
- Partials       3392     3395       +3     
Flag Coverage Δ
java 71.92% <ø> (-0.04%) ⬇️

@github-actions

github-actions Bot commented Jan 8, 2026

Contributor

Reminder, please take a look at this pr: @kennknowles

@shnapz

shnapz commented Jan 12, 2026

Contributor Author

[PreCommit Java / beam_PreCommit_Java](https://github.com/apache/beam/actions/runs/20927101528/job/60127906908?pr=36781) has an apparently unrelated gRPC client failure. It was green before I rebased on the latest master.

@github-actions

Contributor

Assigning a new set of reviewers because the PR has gone too long without review. If you would like to opt out of this review, comment assign to next reviewer:

R: @Abacn for label java.

@shnapz

shnapz commented Jan 20, 2026

Contributor Author

@Abacn or @kennknowles would you mind taking a look? The change is low risk and introduces some important flexibility

@github-actions

Contributor

Reminder, please take a look at this pr: @Abacn

@github-actions

Contributor

Assigning a new set of reviewers because the PR has gone too long without review. If you would like to opt out of this review, comment assign to next reviewer:

R: @kennknowles for label java.

@github-actions

github-actions Bot commented Feb 6, 2026

Contributor

Reminder, please take a look at this pr: @kennknowles

@shnapz shnapz force-pushed the pluggable-lineage branch from f0fa053 to bf61f18 Compare April 21, 2026 22:29
@shnapz

shnapz commented Apr 21, 2026

Contributor Author

@Abacn thanks, pushed a rewrite along these lines:

  • Double-checked lock with canSkipInit(requestedType): skip when SOURCES != null && (requestedType == null || requestedType.equals(CURRENT_LINEAGE_TYPE)).
  • Tracking the active type in a separate volatile Class<? extends LineageBase> CURRENT_LINEAGE_TYPE. Functionally equivalent to your sketch.
  • Switched SOURCES/SINKS to volatile.
  • Also dropped the lazy fallback in getSources()/getSinks() - now checkNotNull with a clear message, so we never silently swap plugin lineage for defaults. WDYT?
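The scheme in the bullets above can be sketched roughly as follows. This is a simplified illustration, not the merged code: LineageInitSketch, MetricsLineage, CustomLineage, setDefaults and initCount are hypothetical names, the nested LineageBase is a stand-in for the real interface, and Objects.requireNonNull stands in for checkNotNull so that only the JDK is needed.

```java
import java.util.Objects;

final class LineageInitSketch {
  // Hypothetical stand-ins for the plugin interface and two implementations.
  interface LineageBase {}
  static final class MetricsLineage implements LineageBase {}
  static final class CustomLineage implements LineageBase {}

  private static final Object LOCK = new Object();
  private static volatile LineageBase SOURCES;
  private static volatile LineageBase SINKS;
  private static volatile Class<? extends LineageBase> CURRENT_LINEAGE_TYPE;
  static int initCount = 0; // demonstration only: counts real initializations

  private LineageInitSketch() {}

  /** True when lineage is already set up and no different type is requested. */
  private static boolean canSkipInit(Class<? extends LineageBase> requestedType) {
    return SOURCES != null
        && (requestedType == null || requestedType.equals(CURRENT_LINEAGE_TYPE));
  }

  static void setDefaults(Class<? extends LineageBase> requestedType) {
    if (canSkipInit(requestedType)) {
      return; // fast path, no lock taken
    }
    synchronized (LOCK) {
      if (canSkipInit(requestedType)) {
        return; // another thread finished the same initialization first
      }
      // Assumption of this sketch: a null request selects the metrics-based default.
      Class<? extends LineageBase> type =
          requestedType == null ? MetricsLineage.class : requestedType;
      SOURCES = create(type);
      SINKS = create(type);
      // Published last, so the fast path never reports the new type
      // while SOURCES still holds an instance of the old one.
      CURRENT_LINEAGE_TYPE = type;
      initCount++;
    }
  }

  /** No lazy fallback: fails loudly if initialization never happened. */
  static LineageBase getSources() {
    return Objects.requireNonNull(SOURCES, "Lineage not initialized; call setDefaults first");
  }

  static LineageBase getSinks() {
    return Objects.requireNonNull(SINKS, "Lineage not initialized; call setDefaults first");
  }

  private static LineageBase create(Class<? extends LineageBase> type) {
    try {
      return type.getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Cannot instantiate " + type.getName(), e);
    }
  }
}
```

In this sketch, repeated calls with the same (or no) requested type return on the unlocked fast path, while a call with a different type re-initializes both fields under the lock.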

PTAL

FYI: two tests in StreamingDataflowWorkerTest failed; they are flaky and unrelated.

@Abacn Abacn left a comment

Contributor

Thanks, I see the multiple-initialization issue is resolved.

Comment thread CHANGES.md Outdated

## New Features / Improvements

* Added plugin mechanism to support different Lineage implementations (Java) ([#36790](https://github.com/apache/beam/issues/36790)).

Contributor

This goes to Beam 2.74.0 for now.

Contributor Author

Done, moved to 2.74 section


private final BoundedTrie metric;

BoundedTrieMetricsLineage(Lineage.LineageDirection direction) {

Contributor

The constructors of both BoundedTrieMetricsLineage and StringSetMetricsLineage don't satisfy the LineageBase interface's own documentation:

<p>Implementations must provide a public constructor accepting ({@link
 * org.apache.beam.sdk.options.PipelineOptions}, {@link
 * org.apache.beam.sdk.metrics.Lineage.LineageDirection}).

please fix

Contributor Author

Fixed: added this parameter to the built-in plugins as well, so all of them are now aligned with LineageBase.

@shnapz shnapz requested a review from Abacn April 22, 2026 19:54

@Abacn Abacn left a comment

Contributor

Thanks! Will merge once tests pass.

@shnapz

shnapz commented Apr 23, 2026

Contributor Author

@Abacn all checks are green 👍

@Abacn Abacn merged commit 5a5f402 into apache:master Apr 23, 2026
24 of 28 checks passed
@derrickaw derrickaw mentioned this pull request Apr 24, 2026
3 tasks
}

/** Helper to create a TestPipeline with test lineage configured. */
private TestPipeline createTestPipelineWithLineage() {

Contributor

This creates the TestPipeline in an unconventional way that ignores beamTestPipelineOptions, which breaks the ValidatesRunner tests.

At this stage I suggest we revert this change for now, since it breaks multiple tests and leaves others misconfigured, and then split this PR into smaller ones.

Contributor

I fully understand this is a difficult task: it involves refactoring and decoupling a core component, so it is very easy to break something. Thanks again for your persistence.

Contributor

Alternatively, a forward fix: #38296

Contributor Author

@Abacn your fix looks great! I hope it gets merged!

@shnapz

shnapz commented Apr 28, 2026

Contributor Author

@Abacn now that #38296 is merged, are there any issues left?
