diff --git a/CHANGES.md b/CHANGES.md index bdcbd3451c7b..cc1ec48ba188 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -74,6 +74,7 @@ compatible. Both coders can decode encoded bytes from the other coder ([#38139](https://github.com/apache/beam/issues/38139)). * (Python) Added type alias for with_exception_handling to be used for typehints. ([#38173](https://github.com/apache/beam/issues/38173)). +* Added plugin mechanism to support different Lineage implementations (Java) ([#36790](https://github.com/apache/beam/issues/36790)). ## Breaking Changes diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java index 155df53c6c2e..6133ca9fdb39 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java @@ -578,6 +578,7 @@ public static void setDefaultPipelineOptions(PipelineOptions options) { // entry to set other PipelineOption determined flags Metrics.setDefaultPipelineOptions(options); + Lineage.setDefaultPipelineOptions(options); while (true) { KV revision = FILESYSTEM_REVISION.get(); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/LineageBase.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/LineageBase.java new file mode 100644 index 000000000000..c11ff9411a9a --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/LineageBase.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.lineage; + +import org.apache.beam.sdk.annotations.Internal; + +/** + * Plugin interface for lineage implementations. + * + *

This is the core contract that lineage plugins must implement. Custom implementations are + * selected via the {@code --lineageType} pipeline option (see {@link LineageOptions}). + * + *

Implementations must provide a public constructor accepting ({@link + * org.apache.beam.sdk.options.PipelineOptions}, {@link + * org.apache.beam.sdk.metrics.Lineage.LineageDirection}). + * + *

End users should use the {@link org.apache.beam.sdk.metrics.Lineage} facade class instead of + * implementing this interface directly. + */ +@Internal +public interface LineageBase { + /** + * Adds the given FQN as lineage. + * + * @param rollupSegments should be an iterable of strings whose concatenation is a valid Dataplex FQN + * which is already escaped. + *

In particular, this means they will often have trailing delimiters. + */ + void add(Iterable rollupSegments); +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/LineageOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/LineageOptions.java new file mode 100644 index 000000000000..274874ac0c5f --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/LineageOptions.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.lineage; + +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * Pipeline options for selecting a custom {@link LineageBase} implementation. + * + *

When not set, the default Metrics-based lineage is used. Can be set from the command line: + * {@code --lineageType=com.example.MyLineage} + */ +public interface LineageOptions extends PipelineOptions { + + @Description( + "The fully qualified class name of the LineageBase implementation to use for recording " + + "lineage. The class must implement LineageBase and have a public constructor accepting " + + "(PipelineOptions, Lineage.LineageDirection). " + + "If not specified, the default Metrics-based lineage is used.") + @Nullable + Class getLineageType(); + + void setLineageType(@Nullable Class lineageClass); +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/package-info.java new file mode 100644 index 000000000000..4fa2dcbb6b71 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/package-info.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Lineage tracking support for Apache Beam pipelines. + * + *

This package provides a plugin mechanism to support different lineage implementations through + * the {@link org.apache.beam.sdk.lineage.LineageBase} interface. Lineage implementations can be + * selected via the {@code --lineageType} pipeline option to track data lineage information during + * pipeline execution. + * + *

For lineage capabilities, see {@link org.apache.beam.sdk.metrics.Lineage}. + */ +package org.apache.beam.sdk.lineage; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/BoundedTrieMetricsLineage.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/BoundedTrieMetricsLineage.java new file mode 100644 index 000000000000..a2f321ba3df3 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/BoundedTrieMetricsLineage.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +import org.apache.beam.sdk.lineage.LineageBase; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; + +/** + * Lineage implementation that stores lineage information in {@link BoundedTrie} metrics. + * + *

Used when {@link Metrics.MetricsFlag#lineageRollupEnabled()} is true. + */ +class BoundedTrieMetricsLineage implements LineageBase { + + private final BoundedTrie metric; + + @SuppressWarnings("unused") + public BoundedTrieMetricsLineage(PipelineOptions options, Lineage.LineageDirection direction) { + Lineage.Type type = + (direction == Lineage.LineageDirection.SOURCE) + ? Lineage.Type.SOURCEV2 + : Lineage.Type.SINKV2; + this.metric = Metrics.boundedTrie(Lineage.LINEAGE_NAMESPACE, type.toString()); + } + + @Override + public void add(Iterable rollupSegments) { + metric.add(ImmutableList.copyOf(rollupSegments)); + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java index 1e0124fc518b..1a193ec006e0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.metrics; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; + import java.util.ArrayList; import java.util.HashSet; import java.util.Iterator; @@ -24,44 +26,112 @@ import java.util.Set; import java.util.regex.Pattern; import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.lineage.LineageBase; +import org.apache.beam.sdk.lineage.LineageOptions; import org.apache.beam.sdk.metrics.Metrics.MetricsFlag; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Standard collection of metrics used to record source and sinks information for lineage tracking. */ public class Lineage { - public static final String LINEAGE_NAMESPACE = "lineage"; - private static final Lineage SOURCES = new Lineage(Type.SOURCE); - private static final Lineage SINKS = new Lineage(Type.SINK); + private static final Logger LOG = LoggerFactory.getLogger(Lineage.class); + + private static volatile @Nullable Lineage sources; + private static volatile @Nullable Lineage sinks; + private static volatile @Nullable Class currentLineageType; + + private static final Object INIT_LOCK = new Object(); + // Reserved characters are backtick, colon, whitespace (space, \t, \n) and dot. private static final Pattern RESERVED_CHARS = Pattern.compile("[:\\s.`]"); - private final Metric metric; + private final LineageBase delegate; - private Lineage(Type type) { - if (MetricsFlag.lineageRollupEnabled()) { - this.metric = - Metrics.boundedTrie( - LINEAGE_NAMESPACE, - type == Type.SOURCE ? Type.SOURCEV2.toString() : Type.SINKV2.toString()); - } else { - this.metric = Metrics.stringSet(LINEAGE_NAMESPACE, type.toString()); + public enum LineageDirection { + SOURCE, + SINK + } + + private Lineage(LineageBase delegate) { + this.delegate = checkNotNull(delegate, "delegate cannot be null"); + } + + @Internal + public static void setDefaultPipelineOptions(PipelineOptions options) { + checkNotNull(options, "options cannot be null"); + Class requestedType = options.as(LineageOptions.class).getLineageType(); + + if (canSkipInit(requestedType)) { + return; + } + synchronized (INIT_LOCK) { + if (canSkipInit(requestedType)) { + return; + } + sources = createLineage(options, LineageDirection.SOURCE); + sinks = createLineage(options, LineageDirection.SINK); + currentLineageType = requestedType; + LOG.debug("Lineage initialized with type {}", requestedType); + } + } + + private static boolean canSkipInit(@Nullable Class requestedType) { + if (sources == null) { + return false; + } + // When no type is requested, preserve whatever is already initialized. + // When a type is requested, only re-init if it differs from the active type. + return requestedType == null || requestedType.equals(currentLineageType); + } + + private static Lineage createLineage(PipelineOptions options, LineageDirection direction) { + Class lineageClass = options.as(LineageOptions.class).getLineageType(); + + if (lineageClass != null) { + try { + LineageBase lineage = + lineageClass + .getDeclaredConstructor(PipelineOptions.class, LineageDirection.class) + .newInstance(options, direction); + LOG.info("Using {} for lineage direction {}", lineageClass.getName(), direction); + return new Lineage(lineage); + } catch (ReflectiveOperationException e) { + throw new IllegalArgumentException( + "Failed to instantiate lineage implementation: " + + lineageClass.getName() + + ". The class must have a public constructor accepting " + + "(PipelineOptions, Lineage.LineageDirection).", + e); + } } + + LOG.debug("Using default Metrics-based lineage for direction {}", direction); + LineageBase defaultLineage = + MetricsFlag.lineageRollupEnabled() + ? new BoundedTrieMetricsLineage(options, direction) + : new StringSetMetricsLineage(options, direction); + return new Lineage(defaultLineage); } /** {@link Lineage} representing sources and optionally side inputs. */ public static Lineage getSources() { - return SOURCES; + return checkNotNull( + sources, + "Lineage not initialized. FileSystems.setDefaultPipelineOptions must be called first."); } /** {@link Lineage} representing sinks. */ public static Lineage getSinks() { - return SINKS; + return checkNotNull( + sinks, + "Lineage not initialized. FileSystems.setDefaultPipelineOptions must be called first."); } @VisibleForTesting @@ -140,12 +210,7 @@ public void add(String system, Iterable segments) { *

In particular, this means they will often have trailing delimiters. */ public void add(Iterable rollupSegments) { - ImmutableList segments = ImmutableList.copyOf(rollupSegments); - if (MetricsFlag.lineageRollupEnabled()) { - ((BoundedTrie) this.metric).add(segments); - } else { - ((StringSet) this.metric).add(String.join("", segments)); - } + delegate.add(rollupSegments); } /** @@ -156,6 +221,8 @@ public void add(Iterable rollupSegments) { * @param truncatedMarker the marker to use to represent truncated FQNs. * @return A flat representation of all FQNs. If the FQN was truncated then it has a trailing * truncatedMarker. + *

NOTE: When using a custom Lineage plugin, this method will return empty results since + * lineage is not stored in Metrics. */ public static Set query(MetricResults results, Type type, String truncatedMarker) { MetricQueryResults lineageQueryResults = getLineageQueryResults(results, type); @@ -184,6 +251,8 @@ public static Set query(MetricResults results, Type type, String truncat * @param results FQNs from the result * @param type sources or sinks * @return A flat representation of all FQNs. If the FQN was truncated then it has a trailing '*'. + *

NOTE: When using a custom Lineage plugin, this method will return empty results since + * lineage is not stored in Metrics. */ public static Set query(MetricResults results, Type type) { if (MetricsFlag.lineageRollupEnabled()) { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/StringSetMetricsLineage.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/StringSetMetricsLineage.java new file mode 100644 index 000000000000..a12bffb1bfec --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/StringSetMetricsLineage.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +import org.apache.beam.sdk.lineage.LineageBase; +import org.apache.beam.sdk.options.PipelineOptions; + +/** + * Lineage implementation that stores lineage information in {@link StringSet} metrics. + * + *

Used when {@link Metrics.MetricsFlag#lineageRollupEnabled()} is false. + */ +class StringSetMetricsLineage implements LineageBase { + + private final StringSet metric; + + @SuppressWarnings("unused") + public StringSetMetricsLineage(PipelineOptions options, Lineage.LineageDirection direction) { + Lineage.Type type = + (direction == Lineage.LineageDirection.SOURCE) ? Lineage.Type.SOURCE : Lineage.Type.SINK; + this.metric = Metrics.stringSet(Lineage.LINEAGE_NAMESPACE, type.toString()); + } + + @Override + public void add(Iterable rollupSegments) { + metric.add(String.join("", rollupSegments)); + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DefaultPipelineOptionsRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DefaultPipelineOptionsRegistrar.java index 7fc8a829482d..5689e7f0bf5b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DefaultPipelineOptionsRegistrar.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DefaultPipelineOptionsRegistrar.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.options; import com.google.auto.service.AutoService; +import org.apache.beam.sdk.lineage.LineageOptions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; /** @@ -35,6 +36,7 @@ public Iterable> getPipelineOptions() { .add(ExperimentalOptions.class) .add(SdkHarnessOptions.class) .add(PortablePipelineOptions.class) + .add(LineageOptions.class) .build(); } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/LineagePluginTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/LineagePluginTest.java new file mode 100644 index 000000000000..ff50dc1ffbe2 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/LineagePluginTest.java @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.lineage; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.Assert.assertEquals; + +import java.util.Arrays; +import java.util.List; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.metrics.Lineage; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.NeedsRunner; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Tests for {@link LineageBase} pipeline option selection and DirectRunner integration. */ +@RunWith(JUnit4.class) +public class LineagePluginTest { + + private static final Logger LOG = LoggerFactory.getLogger(LineagePluginTest.class); + + /** + * TestWatcher that logs detailed lineage diagnostics only when tests fail. This keeps successful + * test output clean while providing deep debugging for failures. + */ + @Rule + public TestWatcher lineageDebugLogger = + new TestWatcher() { + @Override + protected void failed(Throwable e, Description description) { + LOG.error("=== Lineage Test Failure Diagnostics ==="); + LOG.error("Test: {}", description.getMethodName()); + LOG.error("Error:", e); + + List sources = TestLineage.getRecordedSources(); + List sinks = TestLineage.getRecordedSinks(); + + LOG.error("Recorded Sources ({}):", sources.size()); + for (int i = 0; i < sources.size(); i++) { + LOG.error(" [{}] \"{}\"", i, sources.get(i)); + } + + LOG.error("Recorded Sinks ({}):", sinks.size()); + for (int i = 0; i < sinks.size(); i++) { + LOG.error(" [{}] \"{}\"", i, sinks.get(i)); + } + + LOG.error("========================================"); + } + }; + + @Before + public void setUp() { + // Clear any recorded lineage from previous tests + TestLineage.clearRecorded(); + } + + /** Helper to create a TestPipeline with test lineage configured. */ + private TestPipeline createTestPipelineWithLineage() { + LineageOptions options = PipelineOptionsFactory.create().as(LineageOptions.class); + options.setLineageType(TestLineage.class); + TestPipeline pipeline = TestPipeline.fromOptions(options); + // Disable enforcement since we're not using @Rule + pipeline.enableAbandonedNodeEnforcement(false); + return pipeline; + } + + @Test + public void testExplicitLineageSelection() { + // Instantiate TestLineage directly and verify behavior + LineageOptions options = PipelineOptionsFactory.create().as(LineageOptions.class); + options.setLineageType(TestLineage.class); + + // Test with SOURCE direction + TestLineage sourceLineage = new TestLineage(options, Lineage.LineageDirection.SOURCE); + assertThat(sourceLineage, notNullValue()); + assertThat(sourceLineage, instanceOf(LineageBase.class)); + assertEquals(Lineage.LineageDirection.SOURCE, sourceLineage.getDirection()); + + // Test with SINK direction + TestLineage sinkLineage = new TestLineage(options, Lineage.LineageDirection.SINK); + assertThat(sinkLineage, notNullValue()); + assertThat(sinkLineage, instanceOf(LineageBase.class)); + assertEquals(Lineage.LineageDirection.SINK, sinkLineage.getDirection()); + } + + @Test + @Category(NeedsRunner.class) + public void testLineageIntegrationWithSimpleFQN() { + // Create pipeline with test lineage enabled - Lineage will be initialized during pipeline.run() + TestPipeline pipeline = createTestPipelineWithLineage(); + + // Run pipeline that records lineage + pipeline + .apply(Create.of("a", "b", "c")) + .apply(ParDo.of(new RecordSourceLineageDoFn("testsystem", Arrays.asList("db", "table")))); + + PipelineResult result = pipeline.run(); + result.waitUntilFinish(); + + // Verify lineage was recorded + List sources = TestLineage.getRecordedSources(); + assertThat(sources, hasItem("testsystem:db.table")); + } + + @Test + @Category(NeedsRunner.class) + public void testLineageIntegrationWithSubtype() { + // Create pipeline with test lineage enabled - Lineage will be initialized during pipeline.run() + TestPipeline pipeline = createTestPipelineWithLineage(); + + // Run pipeline that records lineage with subtype + pipeline + .apply(Create.of(1, 2, 3)) + .apply( + ParDo.of( + new RecordSourceLineageWithSubtypeDoFn( + "spanner", + "table", + Arrays.asList("project", "instance", "database", "table")))); + + PipelineResult result = pipeline.run(); + result.waitUntilFinish(); + + // Verify lineage was recorded with subtype + List sources = TestLineage.getRecordedSources(); + assertThat(sources, hasItem("spanner:table:project.instance.database.table")); + } + + @Test + @Category(NeedsRunner.class) + public void testLineageIntegrationWithLastSegmentSeparator() { + // Create pipeline with test lineage enabled - Lineage will be initialized during pipeline.run() + TestPipeline pipeline = createTestPipelineWithLineage(); + + // Run pipeline that records lineage with custom separator + pipeline + .apply(Create.of("x", "y", "z")) + .apply( + ParDo.of( + new RecordSourceLineageWithSeparatorDoFn( + "gcs", Arrays.asList("bucket", "path/to/file.txt"), "/"))); + + PipelineResult result = pipeline.run(); + result.waitUntilFinish(); + + // Verify lineage was recorded with separator + List sources = TestLineage.getRecordedSources(); + assertThat(sources, hasItem("gcs:bucket.`path/to/file.txt`")); + } + + @Test + @Category(NeedsRunner.class) + public void testLineageIntegrationWithBothSourcesAndSinks() { + // Create pipeline with test lineage enabled - Lineage will be initialized during pipeline.run() + TestPipeline pipeline = createTestPipelineWithLineage(); + + // Run pipeline that records both source and sink lineage + pipeline + .apply(Create.of("data1", "data2")) + .apply(ParDo.of(new RecordBothSourceAndSinkLineageDoFn())); + + PipelineResult result = pipeline.run(); + result.waitUntilFinish(); + + // Verify both source and sink lineage were recorded + List sources = TestLineage.getRecordedSources(); + List sinks = TestLineage.getRecordedSinks(); + + assertThat(sources, hasItem("input-system:input-db.input-table")); + assertThat(sinks, hasItem("output-system:output-db.output-table")); + } + + @Test + @Category(NeedsRunner.class) + public void testLineageIntegrationWithMultipleElements() { + // Create pipeline with test lineage enabled - Lineage will be initialized during pipeline.run() + TestPipeline pipeline = createTestPipelineWithLineage(); + + // Run pipeline with multiple elements to test thread safety + pipeline + .apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) + .apply(ParDo.of(new RecordSourceLineageDoFn("system", Arrays.asList("resource")))); + + PipelineResult result = pipeline.run(); + result.waitUntilFinish(); + + // Verify lineage was recorded for all elements (may have duplicates) + List sources = TestLineage.getRecordedSources(); + assertThat(sources, hasSize(10)); // One per element + assertThat(sources, hasItem("system:resource")); + } + + // Helper DoFn classes for recording lineage + + /** DoFn that records source lineage with simple FQN. */ + private static class RecordSourceLineageDoFn extends DoFn { + private final String system; + private final List segments; + + RecordSourceLineageDoFn(String system, List segments) { + this.system = system; + this.segments = segments; + } + + @ProcessElement + public void processElement(ProcessContext c) { + // !!! Lineage Caller !!! + Lineage.getSources().add(system, segments); + c.output(c.element()); + } + } + + /** DoFn that records source lineage with subtype. */ + private static class RecordSourceLineageWithSubtypeDoFn extends DoFn { + private final String system; + private final String subtype; + private final List segments; + + RecordSourceLineageWithSubtypeDoFn(String system, String subtype, List segments) { + this.system = system; + this.subtype = subtype; + this.segments = segments; + } + + @ProcessElement + public void processElement(ProcessContext c) { + // !!! Lineage Caller !!! + Lineage.getSources().add(system, subtype, segments, null); + c.output(c.element()); + } + } + + /** DoFn that records source lineage with custom last segment separator. */ + private static class RecordSourceLineageWithSeparatorDoFn extends DoFn { + private final String system; + private final List segments; + private final String separator; + + RecordSourceLineageWithSeparatorDoFn(String system, List segments, String separator) { + this.system = system; + this.segments = segments; + this.separator = separator; + } + + @ProcessElement + public void processElement(ProcessContext c) { + // !!! Lineage Caller !!! + Lineage.getSources().add(system, segments, separator); + c.output(c.element()); + } + } + + /** DoFn that records both source and sink lineage. */ + private static class RecordBothSourceAndSinkLineageDoFn extends DoFn { + @ProcessElement + public void processElement(ProcessContext c) { + // !!! Lineage Caller !!! + Lineage.getSources().add("input-system", ImmutableList.of("input-db", "input-table")); + // !!! Lineage Caller !!! + Lineage.getSinks().add("output-system", ImmutableList.of("output-db", "output-table")); + c.output(c.element()); + } + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/TestLineage.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/TestLineage.java new file mode 100644 index 000000000000..5344c190fdab --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/TestLineage.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.lineage; + +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import org.apache.beam.sdk.metrics.Lineage; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; + +/** + * A test implementation of {@link LineageBase} for integration testing with DirectRunner. + * + *

This implementation records all lineage FQNs in thread-safe static storage for test + * assertions. + */ +public class TestLineage implements LineageBase { + + // Thread-safe storage for recorded lineage, keyed by direction + private static final ConcurrentHashMap> RECORDED_LINEAGE = + new ConcurrentHashMap<>(); + + private final Lineage.LineageDirection direction; + + /** Constructor used by reflection via {@code --lineageType} pipeline option. */ + public TestLineage(PipelineOptions options, Lineage.LineageDirection direction) { + this(direction); + } + + public TestLineage(Lineage.LineageDirection direction) { + this.direction = direction; + } + + @Override + public void add(Iterable rollupSegments) { + // Record the FQN for test assertions + String fqn = String.join("", rollupSegments); + RECORDED_LINEAGE.computeIfAbsent(direction, k -> new CopyOnWriteArrayList<>()).add(fqn); + } + + public Lineage.LineageDirection getDirection() { + return direction; + } + + /** Returns all recorded source lineage FQNs. */ + public static List getRecordedSources() { + return ImmutableList.copyOf( + RECORDED_LINEAGE.getOrDefault(Lineage.LineageDirection.SOURCE, ImmutableList.of())); + } + + /** Returns all recorded sink lineage FQNs. */ + public static List getRecordedSinks() { + return ImmutableList.copyOf( + RECORDED_LINEAGE.getOrDefault(Lineage.LineageDirection.SINK, ImmutableList.of())); + } + + /** Clears all recorded lineage. Should be called in @Before to ensure test isolation. */ + public static void clearRecorded() { + RECORDED_LINEAGE.clear(); + } +}