Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@
"comment": "Modify this file in a trivial way to cause this test suite to run",
"https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test",
"https://github.com/apache/beam/pull/31270": "re-add specialized Samza translation of Redistribute",
"https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface"
"https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface",
"modification": 1
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.NeedsRunner;
Expand All @@ -52,6 +51,8 @@ public class LineagePluginTest {

private static final Logger LOG = LoggerFactory.getLogger(LineagePluginTest.class);

@Rule public TestPipeline testPipeline = createTestPipelineWithLineage();

/**
* TestWatcher that logs detailed lineage diagnostics only when tests fail. This keeps successful
* test output clean while providing deep debugging for failures.
Expand Down Expand Up @@ -89,13 +90,11 @@ public void setUp() {
}

/** Helper to create a TestPipeline with test lineage configured. */
private TestPipeline createTestPipelineWithLineage() {
LineageOptions options = PipelineOptionsFactory.create().as(LineageOptions.class);
private static TestPipeline createTestPipelineWithLineage() {
TestPipeline testPipeline = TestPipeline.create();
LineageOptions options = testPipeline.getOptions().as(LineageOptions.class);
options.setLineageType(TestLineage.class);
TestPipeline pipeline = TestPipeline.fromOptions(options);
// Disable enforcement since we're not using @Rule
pipeline.enableAbandonedNodeEnforcement(false);
return pipeline;
return testPipeline;
}

@Test
Expand All @@ -120,16 +119,11 @@ public void testExplicitLineageSelection() {
@Test
@Category(NeedsRunner.class)
public void testLineageIntegrationWithSimpleFQN() {
// Create pipeline with test lineage enabled - Lineage will be initialized during pipeline.run()
TestPipeline pipeline = createTestPipelineWithLineage();

// Run pipeline that records lineage
pipeline
testPipeline
.apply(Create.of("a", "b", "c"))
.apply(ParDo.of(new RecordSourceLineageDoFn("testsystem", Arrays.asList("db", "table"))));

PipelineResult result = pipeline.run();
result.waitUntilFinish();
testPipeline.run();

// Verify lineage was recorded
List<String> sources = TestLineage.getRecordedSources();
Expand All @@ -139,21 +133,16 @@ public void testLineageIntegrationWithSimpleFQN() {
@Test
@Category(NeedsRunner.class)
public void testLineageIntegrationWithSubtype() {
// Create pipeline with test lineage enabled - Lineage will be initialized during pipeline.run()
TestPipeline pipeline = createTestPipelineWithLineage();

// Run pipeline that records lineage with subtype
pipeline
testPipeline
.apply(Create.of(1, 2, 3))
.apply(
ParDo.of(
new RecordSourceLineageWithSubtypeDoFn(
"spanner",
"table",
Arrays.asList("project", "instance", "database", "table"))));

PipelineResult result = pipeline.run();
result.waitUntilFinish();
testPipeline.run();

// Verify lineage was recorded with subtype
List<String> sources = TestLineage.getRecordedSources();
Expand All @@ -163,19 +152,14 @@ public void testLineageIntegrationWithSubtype() {
@Test
@Category(NeedsRunner.class)
public void testLineageIntegrationWithLastSegmentSeparator() {
// Create pipeline with test lineage enabled - Lineage will be initialized during pipeline.run()
TestPipeline pipeline = createTestPipelineWithLineage();

// Run pipeline that records lineage with custom separator
pipeline
testPipeline
.apply(Create.of("x", "y", "z"))
.apply(
ParDo.of(
new RecordSourceLineageWithSeparatorDoFn(
"gcs", Arrays.asList("bucket", "path/to/file.txt"), "/")));

PipelineResult result = pipeline.run();
result.waitUntilFinish();
testPipeline.run();

// Verify lineage was recorded with separator
List<String> sources = TestLineage.getRecordedSources();
Expand All @@ -185,16 +169,11 @@ public void testLineageIntegrationWithLastSegmentSeparator() {
@Test
@Category(NeedsRunner.class)
public void testLineageIntegrationWithBothSourcesAndSinks() {
// Create pipeline with test lineage enabled - Lineage will be initialized during pipeline.run()
TestPipeline pipeline = createTestPipelineWithLineage();

// Run pipeline that records both source and sink lineage
pipeline
testPipeline
.apply(Create.of("data1", "data2"))
.apply(ParDo.of(new RecordBothSourceAndSinkLineageDoFn()));

PipelineResult result = pipeline.run();
result.waitUntilFinish();
testPipeline.run();

// Verify both source and sink lineage were recorded
List<String> sources = TestLineage.getRecordedSources();
Expand All @@ -207,16 +186,11 @@ public void testLineageIntegrationWithBothSourcesAndSinks() {
@Test
@Category(NeedsRunner.class)
public void testLineageIntegrationWithMultipleElements() {
// Create pipeline with test lineage enabled - Lineage will be initialized during pipeline.run()
TestPipeline pipeline = createTestPipelineWithLineage();

// Run pipeline with multiple elements to test thread safety
pipeline
testPipeline
.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
.apply(ParDo.of(new RecordSourceLineageDoFn("system", Arrays.asList("resource"))));

PipelineResult result = pipeline.run();
result.waitUntilFinish();
testPipeline.run();

// Verify lineage was recorded for all elements (may have duplicates)
List<String> sources = TestLineage.getRecordedSources();
Expand Down
Loading