From 3162b0ca683b4781f15e84c765f9841194ab47de Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Mon, 27 Apr 2026 16:30:22 -0400 Subject: [PATCH] Fix LineagePluginTest on runners other than Direct runner --- ...PostCommit_Java_ValidatesRunner_Samza.json | 3 +- .../beam/sdk/lineage/LineagePluginTest.java | 58 +++++-------------- 2 files changed, 18 insertions(+), 43 deletions(-) diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Samza.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Samza.json index db03186ab405..6acc30e8280f 100644 --- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Samza.json +++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Samza.json @@ -4,5 +4,6 @@ "comment": "Modify this file in a trivial way to cause this test suite to run", "https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test", "https://github.com/apache/beam/pull/31270": "re-add specialized Samza translation of Redistribute", - "https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface" + "https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface", + "modification": 1 } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/LineagePluginTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/LineagePluginTest.java index ff50dc1ffbe2..fd78dd200bf9 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/LineagePluginTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/LineagePluginTest.java @@ -26,7 +26,6 @@ import java.util.Arrays; import java.util.List; -import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.metrics.Lineage; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.NeedsRunner; @@ -52,6 +51,8 @@ public class LineagePluginTest { private static final Logger LOG = LoggerFactory.getLogger(LineagePluginTest.class); + @Rule public TestPipeline testPipeline = createTestPipelineWithLineage(); + /** * TestWatcher that logs detailed lineage diagnostics only when tests fail. This keeps successful * test output clean while providing deep debugging for failures. @@ -89,13 +90,11 @@ public void setUp() { } /** Helper to create a TestPipeline with test lineage configured. */ - private TestPipeline createTestPipelineWithLineage() { - LineageOptions options = PipelineOptionsFactory.create().as(LineageOptions.class); + private static TestPipeline createTestPipelineWithLineage() { + TestPipeline testPipeline = TestPipeline.create(); + LineageOptions options = testPipeline.getOptions().as(LineageOptions.class); options.setLineageType(TestLineage.class); - TestPipeline pipeline = TestPipeline.fromOptions(options); - // Disable enforcement since we're not using @Rule - pipeline.enableAbandonedNodeEnforcement(false); - return pipeline; + return testPipeline; } @Test @@ -120,16 +119,11 @@ public void testExplicitLineageSelection() { @Test @Category(NeedsRunner.class) public void testLineageIntegrationWithSimpleFQN() { - // Create pipeline with test lineage enabled - Lineage will be initialized during pipeline.run() - TestPipeline pipeline = createTestPipelineWithLineage(); - // Run pipeline that records lineage - pipeline + testPipeline .apply(Create.of("a", "b", "c")) .apply(ParDo.of(new RecordSourceLineageDoFn("testsystem", Arrays.asList("db", "table")))); - - PipelineResult result = pipeline.run(); - result.waitUntilFinish(); + testPipeline.run(); // Verify lineage was recorded List sources = TestLineage.getRecordedSources(); @@ -139,11 +133,8 @@ public void testLineageIntegrationWithSimpleFQN() { @Test @Category(NeedsRunner.class) public void testLineageIntegrationWithSubtype() { - // Create pipeline with test lineage enabled - Lineage will be initialized during pipeline.run() - TestPipeline pipeline = createTestPipelineWithLineage(); - // Run pipeline that records lineage with subtype - pipeline + testPipeline .apply(Create.of(1, 2, 3)) .apply( ParDo.of( @@ -151,9 +142,7 @@ public void testLineageIntegrationWithSubtype() { "spanner", "table", Arrays.asList("project", "instance", "database", "table")))); - - PipelineResult result = pipeline.run(); - result.waitUntilFinish(); + testPipeline.run(); // Verify lineage was recorded with subtype List sources = TestLineage.getRecordedSources(); @@ -163,19 +152,14 @@ public void testLineageIntegrationWithSubtype() { @Test @Category(NeedsRunner.class) public void testLineageIntegrationWithLastSegmentSeparator() { - // Create pipeline with test lineage enabled - Lineage will be initialized during pipeline.run() - TestPipeline pipeline = createTestPipelineWithLineage(); - // Run pipeline that records lineage with custom separator - pipeline + testPipeline .apply(Create.of("x", "y", "z")) .apply( ParDo.of( new RecordSourceLineageWithSeparatorDoFn( "gcs", Arrays.asList("bucket", "path/to/file.txt"), "/"))); - - PipelineResult result = pipeline.run(); - result.waitUntilFinish(); + testPipeline.run(); // Verify lineage was recorded with separator List sources = TestLineage.getRecordedSources(); @@ -185,16 +169,11 @@ public void testLineageIntegrationWithLastSegmentSeparator() { @Test @Category(NeedsRunner.class) public void testLineageIntegrationWithBothSourcesAndSinks() { - // Create pipeline with test lineage enabled - Lineage will be initialized during pipeline.run() - TestPipeline pipeline = createTestPipelineWithLineage(); - // Run pipeline that records both source and sink lineage - pipeline + testPipeline .apply(Create.of("data1", "data2")) .apply(ParDo.of(new RecordBothSourceAndSinkLineageDoFn())); - - PipelineResult result = pipeline.run(); - result.waitUntilFinish(); + testPipeline.run(); // Verify both source and sink lineage were recorded List sources = TestLineage.getRecordedSources(); @@ -207,16 +186,11 @@ public void testLineageIntegrationWithBothSourcesAndSinks() { @Test @Category(NeedsRunner.class) public void testLineageIntegrationWithMultipleElements() { - // Create pipeline with test lineage enabled - Lineage will be initialized during pipeline.run() - TestPipeline pipeline = createTestPipelineWithLineage(); - // Run pipeline with multiple elements to test thread safety - pipeline + testPipeline .apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) .apply(ParDo.of(new RecordSourceLineageDoFn("system", Arrays.asList("resource")))); - - PipelineResult result = pipeline.run(); - result.waitUntilFinish(); + testPipeline.run(); // Verify lineage was recorded for all elements (may have duplicates) List sources = TestLineage.getRecordedSources();