From b455e8ffbff0a42049ab93221a38ce6db47090de Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Mon, 27 Apr 2026 14:40:40 -0400 Subject: [PATCH 1/2] Revert "FileBasedSinkTest failure (#38284)" This reverts commit a375d6c608db87f6d8cb0305aab4f759489f2679. --- .../org/apache/beam/sdk/metrics/Lineage.java | 21 ++++++------------- 1 file changed, 6 insertions(+), 15 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java index dc30d82adcf4..1a193ec006e0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java @@ -30,7 +30,6 @@ import org.apache.beam.sdk.lineage.LineageOptions; import org.apache.beam.sdk.metrics.Metrics.MetricsFlag; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter; import org.checkerframework.checker.nullness.qual.Nullable; @@ -123,24 +122,16 @@ private static Lineage createLineage(PipelineOptions options, LineageDirection d /** {@link Lineage} representing sources and optionally side inputs. */ public static Lineage getSources() { - Lineage localSources = sources; - if (localSources == null) { - return createDefaultLineage(LineageDirection.SOURCE); - } - return localSources; + return checkNotNull( + sources, + "Lineage not initialized. FileSystems.setDefaultPipelineOptions must be called first."); } /** {@link Lineage} representing sinks. */ public static Lineage getSinks() { - Lineage localSinks = sinks; - if (localSinks == null) { - return createDefaultLineage(LineageDirection.SINK); - } - return localSinks; - } - - private static Lineage createDefaultLineage(LineageDirection direction) { - return createLineage(PipelineOptionsFactory.create(), direction); + return checkNotNull( + sinks, + "Lineage not initialized. FileSystems.setDefaultPipelineOptions must be called first."); } @VisibleForTesting From e2c4a94e2e1c2116d8bed9d12722e0991ea35d20 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Mon, 27 Apr 2026 14:40:50 -0400 Subject: [PATCH 2/2] Revert "Implement pluggable Lineage in Java SDK (#36781)" This reverts commit 5a5f402d6a200ac515b2c3779496346416ea36fc. --- CHANGES.md | 1 - .../org/apache/beam/sdk/io/FileSystems.java | 1 - .../apache/beam/sdk/lineage/LineageBase.java | 46 --- .../beam/sdk/lineage/LineageOptions.java | 41 --- .../apache/beam/sdk/lineage/package-info.java | 28 -- .../metrics/BoundedTrieMetricsLineage.java | 46 --- .../org/apache/beam/sdk/metrics/Lineage.java | 111 ++----- .../sdk/metrics/StringSetMetricsLineage.java | 43 --- .../DefaultPipelineOptionsRegistrar.java | 2 - .../beam/sdk/lineage/LineagePluginTest.java | 298 ------------------ .../apache/beam/sdk/lineage/TestLineage.java | 77 ----- 11 files changed, 21 insertions(+), 673 deletions(-) delete mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/LineageBase.java delete mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/LineageOptions.java delete mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/package-info.java delete mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/BoundedTrieMetricsLineage.java delete mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/StringSetMetricsLineage.java delete mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/LineagePluginTest.java delete mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/TestLineage.java diff --git a/CHANGES.md b/CHANGES.md index a1e0abee3796..26612883b7ad 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -74,7 +74,6 @@ compatible. Both coders can decode encoded bytes from the other coder ([#38139](https://github.com/apache/beam/issues/38139)). * (Python) Added type alias for with_exception_handling to be used for typehints. ([#38173](https://github.com/apache/beam/issues/38173)). -* Added plugin mechanism to support different Lineage implementations (Java) ([#36790](https://github.com/apache/beam/issues/36790)). ## Breaking Changes diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java index 6133ca9fdb39..155df53c6c2e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java @@ -578,7 +578,6 @@ public static void setDefaultPipelineOptions(PipelineOptions options) { // entry to set other PipelineOption determined flags Metrics.setDefaultPipelineOptions(options); - Lineage.setDefaultPipelineOptions(options); while (true) { KV revision = FILESYSTEM_REVISION.get(); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/LineageBase.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/LineageBase.java deleted file mode 100644 index c11ff9411a9a..000000000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/LineageBase.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.lineage; - -import org.apache.beam.sdk.annotations.Internal; - -/** - * Plugin interface for lineage implementations. - * - *

This is the core contract that lineage plugins must implement. Custom implementations are - * selected via the {@code --lineageType} pipeline option (see {@link LineageOptions}). - * - *

Implementations must provide a public constructor accepting ({@link - * org.apache.beam.sdk.options.PipelineOptions}, {@link - * org.apache.beam.sdk.metrics.Lineage.LineageDirection}). - * - *

End users should use the {@link org.apache.beam.sdk.metrics.Lineage} facade class instead of - * implementing this interface directly. - */ -@Internal -public interface LineageBase { - /** - * Adds the given FQN as lineage. - * - * @param rollupSegments should be an iterable of strings whose concatenation is a valid Dataplex FQN - * which is already escaped. - *

In particular, this means they will often have trailing delimiters. - */ - void add(Iterable rollupSegments); -} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/LineageOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/LineageOptions.java deleted file mode 100644 index 274874ac0c5f..000000000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/LineageOptions.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.lineage; - -import org.apache.beam.sdk.options.Description; -import org.apache.beam.sdk.options.PipelineOptions; -import org.checkerframework.checker.nullness.qual.Nullable; - -/** - * Pipeline options for selecting a custom {@link LineageBase} implementation. - * - *

When not set, the default Metrics-based lineage is used. Can be set from the command line: - * {@code --lineageType=com.example.MyLineage} - */ -public interface LineageOptions extends PipelineOptions { - - @Description( - "The fully qualified class name of the LineageBase implementation to use for recording " - + "lineage. The class must implement LineageBase and have a public constructor accepting " - + "(PipelineOptions, Lineage.LineageDirection). " - + "If not specified, the default Metrics-based lineage is used.") - @Nullable - Class getLineageType(); - - void setLineageType(@Nullable Class lineageClass); -} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/package-info.java deleted file mode 100644 index 4fa2dcbb6b71..000000000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/package-info.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Lineage tracking support for Apache Beam pipelines. - * - *

This package provides a plugin mechanism to support different lineage implementations through - * the {@link org.apache.beam.sdk.lineage.LineageBase} interface. Lineage implementations can be - * selected via the {@code --lineageType} pipeline option to track data lineage information during - * pipeline execution. - * - *

For lineage capabilities, see {@link org.apache.beam.sdk.metrics.Lineage}. - */ -package org.apache.beam.sdk.lineage; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/BoundedTrieMetricsLineage.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/BoundedTrieMetricsLineage.java deleted file mode 100644 index a2f321ba3df3..000000000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/BoundedTrieMetricsLineage.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.metrics; - -import org.apache.beam.sdk.lineage.LineageBase; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; - -/** - * Lineage implementation that stores lineage information in {@link BoundedTrie} metrics. - * - *

Used when {@link Metrics.MetricsFlag#lineageRollupEnabled()} is true. - */ -class BoundedTrieMetricsLineage implements LineageBase { - - private final BoundedTrie metric; - - @SuppressWarnings("unused") - public BoundedTrieMetricsLineage(PipelineOptions options, Lineage.LineageDirection direction) { - Lineage.Type type = - (direction == Lineage.LineageDirection.SOURCE) - ? Lineage.Type.SOURCEV2 - : Lineage.Type.SINKV2; - this.metric = Metrics.boundedTrie(Lineage.LINEAGE_NAMESPACE, type.toString()); - } - - @Override - public void add(Iterable rollupSegments) { - metric.add(ImmutableList.copyOf(rollupSegments)); - } -} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java index 1a193ec006e0..1e0124fc518b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java @@ -17,8 +17,6 @@ */ package org.apache.beam.sdk.metrics; -import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; - import java.util.ArrayList; import java.util.HashSet; import java.util.Iterator; @@ -26,112 +24,44 @@ import java.util.Set; import java.util.regex.Pattern; import org.apache.beam.sdk.annotations.Internal; -import org.apache.beam.sdk.lineage.LineageBase; -import org.apache.beam.sdk.lineage.LineageOptions; import org.apache.beam.sdk.metrics.Metrics.MetricsFlag; -import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.checkerframework.checker.nullness.qual.Nullable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Standard collection of metrics used to record source and sinks information for lineage tracking. */ public class Lineage { - public static final String LINEAGE_NAMESPACE = "lineage"; - private static final Logger LOG = LoggerFactory.getLogger(Lineage.class); - - private static volatile @Nullable Lineage sources; - private static volatile @Nullable Lineage sinks; - private static volatile @Nullable Class currentLineageType; - - private static final Object INIT_LOCK = new Object(); + public static final String LINEAGE_NAMESPACE = "lineage"; + private static final Lineage SOURCES = new Lineage(Type.SOURCE); + private static final Lineage SINKS = new Lineage(Type.SINK); // Reserved characters are backtick, colon, whitespace (space, \t, \n) and dot. private static final Pattern RESERVED_CHARS = Pattern.compile("[:\\s.`]"); - private final LineageBase delegate; - - public enum LineageDirection { - SOURCE, - SINK - } - - private Lineage(LineageBase delegate) { - this.delegate = checkNotNull(delegate, "delegate cannot be null"); - } - - @Internal - public static void setDefaultPipelineOptions(PipelineOptions options) { - checkNotNull(options, "options cannot be null"); - Class requestedType = options.as(LineageOptions.class).getLineageType(); - - if (canSkipInit(requestedType)) { - return; - } - synchronized (INIT_LOCK) { - if (canSkipInit(requestedType)) { - return; - } - sources = createLineage(options, LineageDirection.SOURCE); - sinks = createLineage(options, LineageDirection.SINK); - currentLineageType = requestedType; - LOG.debug("Lineage initialized with type {}", requestedType); - } - } - - private static boolean canSkipInit(@Nullable Class requestedType) { - if (sources == null) { - return false; - } - // When no type is requested, preserve whatever is already initialized. - // When a type is requested, only re-init if it differs from the active type. - return requestedType == null || requestedType.equals(currentLineageType); - } - - private static Lineage createLineage(PipelineOptions options, LineageDirection direction) { - Class lineageClass = options.as(LineageOptions.class).getLineageType(); + private final Metric metric; - if (lineageClass != null) { - try { - LineageBase lineage = - lineageClass - .getDeclaredConstructor(PipelineOptions.class, LineageDirection.class) - .newInstance(options, direction); - LOG.info("Using {} for lineage direction {}", lineageClass.getName(), direction); - return new Lineage(lineage); - } catch (ReflectiveOperationException e) { - throw new IllegalArgumentException( - "Failed to instantiate lineage implementation: " - + lineageClass.getName() - + ". The class must have a public constructor accepting " - + "(PipelineOptions, Lineage.LineageDirection).", - e); - } + private Lineage(Type type) { + if (MetricsFlag.lineageRollupEnabled()) { + this.metric = + Metrics.boundedTrie( + LINEAGE_NAMESPACE, + type == Type.SOURCE ? Type.SOURCEV2.toString() : Type.SINKV2.toString()); + } else { + this.metric = Metrics.stringSet(LINEAGE_NAMESPACE, type.toString()); } - - LOG.debug("Using default Metrics-based lineage for direction {}", direction); - LineageBase defaultLineage = - MetricsFlag.lineageRollupEnabled() - ? new BoundedTrieMetricsLineage(options, direction) - : new StringSetMetricsLineage(options, direction); - return new Lineage(defaultLineage); } /** {@link Lineage} representing sources and optionally side inputs. */ public static Lineage getSources() { - return checkNotNull( - sources, - "Lineage not initialized. FileSystems.setDefaultPipelineOptions must be called first."); + return SOURCES; } /** {@link Lineage} representing sinks. */ public static Lineage getSinks() { - return checkNotNull( - sinks, - "Lineage not initialized. FileSystems.setDefaultPipelineOptions must be called first."); + return SINKS; } @VisibleForTesting @@ -210,7 +140,12 @@ public void add(String system, Iterable segments) { *

In particular, this means they will often have trailing delimiters. */ public void add(Iterable rollupSegments) { - delegate.add(rollupSegments); + ImmutableList segments = ImmutableList.copyOf(rollupSegments); + if (MetricsFlag.lineageRollupEnabled()) { + ((BoundedTrie) this.metric).add(segments); + } else { + ((StringSet) this.metric).add(String.join("", segments)); + } } /** @@ -221,8 +156,6 @@ public void add(Iterable rollupSegments) { * @param truncatedMarker the marker to use to represent truncated FQNs. * @return A flat representation of all FQNs. If the FQN was truncated then it has a trailing * truncatedMarker. - *

NOTE: When using a custom Lineage plugin, this method will return empty results since - * lineage is not stored in Metrics. */ public static Set query(MetricResults results, Type type, String truncatedMarker) { MetricQueryResults lineageQueryResults = getLineageQueryResults(results, type); @@ -251,8 +184,6 @@ public static Set query(MetricResults results, Type type, String truncat * @param results FQNs from the result * @param type sources or sinks * @return A flat representation of all FQNs. If the FQN was truncated then it has a trailing '*'. - *

NOTE: When using a custom Lineage plugin, this method will return empty results since - * lineage is not stored in Metrics. */ public static Set query(MetricResults results, Type type) { if (MetricsFlag.lineageRollupEnabled()) { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/StringSetMetricsLineage.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/StringSetMetricsLineage.java deleted file mode 100644 index a12bffb1bfec..000000000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/StringSetMetricsLineage.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.metrics; - -import org.apache.beam.sdk.lineage.LineageBase; -import org.apache.beam.sdk.options.PipelineOptions; - -/** - * Lineage implementation that stores lineage information in {@link StringSet} metrics. - * - *

Used when {@link Metrics.MetricsFlag#lineageRollupEnabled()} is false. - */ -class StringSetMetricsLineage implements LineageBase { - - private final StringSet metric; - - @SuppressWarnings("unused") - public StringSetMetricsLineage(PipelineOptions options, Lineage.LineageDirection direction) { - Lineage.Type type = - (direction == Lineage.LineageDirection.SOURCE) ? Lineage.Type.SOURCE : Lineage.Type.SINK; - this.metric = Metrics.stringSet(Lineage.LINEAGE_NAMESPACE, type.toString()); - } - - @Override - public void add(Iterable rollupSegments) { - metric.add(String.join("", rollupSegments)); - } -} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DefaultPipelineOptionsRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DefaultPipelineOptionsRegistrar.java index 5689e7f0bf5b..7fc8a829482d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DefaultPipelineOptionsRegistrar.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DefaultPipelineOptionsRegistrar.java @@ -18,7 +18,6 @@ package org.apache.beam.sdk.options; import com.google.auto.service.AutoService; -import org.apache.beam.sdk.lineage.LineageOptions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; /** @@ -36,7 +35,6 @@ public Iterable> getPipelineOptions() { .add(ExperimentalOptions.class) .add(SdkHarnessOptions.class) .add(PortablePipelineOptions.class) - .add(LineageOptions.class) .build(); } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/LineagePluginTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/LineagePluginTest.java deleted file mode 100644 index ff50dc1ffbe2..000000000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/LineagePluginTest.java +++ /dev/null @@ -1,298 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.lineage; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.hasItem; -import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.notNullValue; -import static org.junit.Assert.assertEquals; - -import java.util.Arrays; -import java.util.List; -import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.sdk.metrics.Lineage; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.testing.NeedsRunner; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.rules.TestWatcher; -import org.junit.runner.Description; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** Tests for {@link LineageBase} pipeline option selection and DirectRunner integration. */ -@RunWith(JUnit4.class) -public class LineagePluginTest { - - private static final Logger LOG = LoggerFactory.getLogger(LineagePluginTest.class); - - /** - * TestWatcher that logs detailed lineage diagnostics only when tests fail. This keeps successful - * test output clean while providing deep debugging for failures. - */ - @Rule - public TestWatcher lineageDebugLogger = - new TestWatcher() { - @Override - protected void failed(Throwable e, Description description) { - LOG.error("=== Lineage Test Failure Diagnostics ==="); - LOG.error("Test: {}", description.getMethodName()); - LOG.error("Error:", e); - - List sources = TestLineage.getRecordedSources(); - List sinks = TestLineage.getRecordedSinks(); - - LOG.error("Recorded Sources ({}):", sources.size()); - for (int i = 0; i < sources.size(); i++) { - LOG.error(" [{}] \"{}\"", i, sources.get(i)); - } - - LOG.error("Recorded Sinks ({}):", sinks.size()); - for (int i = 0; i < sinks.size(); i++) { - LOG.error(" [{}] \"{}\"", i, sinks.get(i)); - } - - LOG.error("========================================"); - } - }; - - @Before - public void setUp() { - // Clear any recorded lineage from previous tests - TestLineage.clearRecorded(); - } - - /** Helper to create a TestPipeline with test lineage configured. */ - private TestPipeline createTestPipelineWithLineage() { - LineageOptions options = PipelineOptionsFactory.create().as(LineageOptions.class); - options.setLineageType(TestLineage.class); - TestPipeline pipeline = TestPipeline.fromOptions(options); - // Disable enforcement since we're not using @Rule - pipeline.enableAbandonedNodeEnforcement(false); - return pipeline; - } - - @Test - public void testExplicitLineageSelection() { - // Instantiate TestLineage directly and verify behavior - LineageOptions options = PipelineOptionsFactory.create().as(LineageOptions.class); - options.setLineageType(TestLineage.class); - - // Test with SOURCE direction - TestLineage sourceLineage = new TestLineage(options, Lineage.LineageDirection.SOURCE); - assertThat(sourceLineage, notNullValue()); - assertThat(sourceLineage, instanceOf(LineageBase.class)); - assertEquals(Lineage.LineageDirection.SOURCE, sourceLineage.getDirection()); - - // Test with SINK direction - TestLineage sinkLineage = new TestLineage(options, Lineage.LineageDirection.SINK); - assertThat(sinkLineage, notNullValue()); - assertThat(sinkLineage, instanceOf(LineageBase.class)); - assertEquals(Lineage.LineageDirection.SINK, sinkLineage.getDirection()); - } - - @Test - @Category(NeedsRunner.class) - public void testLineageIntegrationWithSimpleFQN() { - // Create pipeline with test lineage enabled - Lineage will be initialized during pipeline.run() - TestPipeline pipeline = createTestPipelineWithLineage(); - - // Run pipeline that records lineage - pipeline - .apply(Create.of("a", "b", "c")) - .apply(ParDo.of(new RecordSourceLineageDoFn("testsystem", Arrays.asList("db", "table")))); - - PipelineResult result = pipeline.run(); - result.waitUntilFinish(); - - // Verify lineage was recorded - List sources = TestLineage.getRecordedSources(); - assertThat(sources, hasItem("testsystem:db.table")); - } - - @Test - @Category(NeedsRunner.class) - public void testLineageIntegrationWithSubtype() { - // Create pipeline with test lineage enabled - Lineage will be initialized during pipeline.run() - TestPipeline pipeline = createTestPipelineWithLineage(); - - // Run pipeline that records lineage with subtype - pipeline - .apply(Create.of(1, 2, 3)) - .apply( - ParDo.of( - new RecordSourceLineageWithSubtypeDoFn( - "spanner", - "table", - Arrays.asList("project", "instance", "database", "table")))); - - PipelineResult result = pipeline.run(); - result.waitUntilFinish(); - - // Verify lineage was recorded with subtype - List sources = TestLineage.getRecordedSources(); - assertThat(sources, hasItem("spanner:table:project.instance.database.table")); - } - - @Test - @Category(NeedsRunner.class) - public void testLineageIntegrationWithLastSegmentSeparator() { - // Create pipeline with test lineage enabled - Lineage will be initialized during pipeline.run() - TestPipeline pipeline = createTestPipelineWithLineage(); - - // Run pipeline that records lineage with custom separator - pipeline - .apply(Create.of("x", "y", "z")) - .apply( - ParDo.of( - new RecordSourceLineageWithSeparatorDoFn( - "gcs", Arrays.asList("bucket", "path/to/file.txt"), "/"))); - - PipelineResult result = pipeline.run(); - result.waitUntilFinish(); - - // Verify lineage was recorded with separator - List sources = TestLineage.getRecordedSources(); - assertThat(sources, hasItem("gcs:bucket.`path/to/file.txt`")); - } - - @Test - @Category(NeedsRunner.class) - public void testLineageIntegrationWithBothSourcesAndSinks() { - // Create pipeline with test lineage enabled - Lineage will be initialized during pipeline.run() - TestPipeline pipeline = createTestPipelineWithLineage(); - - // Run pipeline that records both source and sink lineage - pipeline - .apply(Create.of("data1", "data2")) - .apply(ParDo.of(new RecordBothSourceAndSinkLineageDoFn())); - - PipelineResult result = pipeline.run(); - result.waitUntilFinish(); - - // Verify both source and sink lineage were recorded - List sources = TestLineage.getRecordedSources(); - List sinks = TestLineage.getRecordedSinks(); - - assertThat(sources, hasItem("input-system:input-db.input-table")); - assertThat(sinks, hasItem("output-system:output-db.output-table")); - } - - @Test - @Category(NeedsRunner.class) - public void testLineageIntegrationWithMultipleElements() { - // Create pipeline with test lineage enabled - Lineage will be initialized during pipeline.run() - TestPipeline pipeline = createTestPipelineWithLineage(); - - // Run pipeline with multiple elements to test thread safety - pipeline - .apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) - .apply(ParDo.of(new RecordSourceLineageDoFn("system", Arrays.asList("resource")))); - - PipelineResult result = pipeline.run(); - result.waitUntilFinish(); - - // Verify lineage was recorded for all elements (may have duplicates) - List sources = TestLineage.getRecordedSources(); - assertThat(sources, hasSize(10)); // One per element - assertThat(sources, hasItem("system:resource")); - } - - // Helper DoFn classes for recording lineage - - /** DoFn that records source lineage with simple FQN. */ - private static class RecordSourceLineageDoFn extends DoFn { - private final String system; - private final List segments; - - RecordSourceLineageDoFn(String system, List segments) { - this.system = system; - this.segments = segments; - } - - @ProcessElement - public void processElement(ProcessContext c) { - // !!! Lineage Caller !!! - Lineage.getSources().add(system, segments); - c.output(c.element()); - } - } - - /** DoFn that records source lineage with subtype. */ - private static class RecordSourceLineageWithSubtypeDoFn extends DoFn { - private final String system; - private final String subtype; - private final List segments; - - RecordSourceLineageWithSubtypeDoFn(String system, String subtype, List segments) { - this.system = system; - this.subtype = subtype; - this.segments = segments; - } - - @ProcessElement - public void processElement(ProcessContext c) { - // !!! Lineage Caller !!! - Lineage.getSources().add(system, subtype, segments, null); - c.output(c.element()); - } - } - - /** DoFn that records source lineage with custom last segment separator. */ - private static class RecordSourceLineageWithSeparatorDoFn extends DoFn { - private final String system; - private final List segments; - private final String separator; - - RecordSourceLineageWithSeparatorDoFn(String system, List segments, String separator) { - this.system = system; - this.segments = segments; - this.separator = separator; - } - - @ProcessElement - public void processElement(ProcessContext c) { - // !!! Lineage Caller !!! - Lineage.getSources().add(system, segments, separator); - c.output(c.element()); - } - } - - /** DoFn that records both source and sink lineage. */ - private static class RecordBothSourceAndSinkLineageDoFn extends DoFn { - @ProcessElement - public void processElement(ProcessContext c) { - // !!! Lineage Caller !!! - Lineage.getSources().add("input-system", ImmutableList.of("input-db", "input-table")); - // !!! Lineage Caller !!! - Lineage.getSinks().add("output-system", ImmutableList.of("output-db", "output-table")); - c.output(c.element()); - } - } -} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/TestLineage.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/TestLineage.java deleted file mode 100644 index 5344c190fdab..000000000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/TestLineage.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.lineage; - -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; -import org.apache.beam.sdk.metrics.Lineage; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; - -/** - * A test implementation of {@link LineageBase} for integration testing with DirectRunner. - * - *

This implementation records all lineage FQNs in thread-safe static storage for test - * assertions. - */ -public class TestLineage implements LineageBase { - - // Thread-safe storage for recorded lineage, keyed by direction - private static final ConcurrentHashMap> RECORDED_LINEAGE = - new ConcurrentHashMap<>(); - - private final Lineage.LineageDirection direction; - - /** Constructor used by reflection via {@code --lineageType} pipeline option. */ - public TestLineage(PipelineOptions options, Lineage.LineageDirection direction) { - this(direction); - } - - public TestLineage(Lineage.LineageDirection direction) { - this.direction = direction; - } - - @Override - public void add(Iterable rollupSegments) { - // Record the FQN for test assertions - String fqn = String.join("", rollupSegments); - RECORDED_LINEAGE.computeIfAbsent(direction, k -> new CopyOnWriteArrayList<>()).add(fqn); - } - - public Lineage.LineageDirection getDirection() { - return direction; - } - - /** Returns all recorded source lineage FQNs. */ - public static List getRecordedSources() { - return ImmutableList.copyOf( - RECORDED_LINEAGE.getOrDefault(Lineage.LineageDirection.SOURCE, ImmutableList.of())); - } - - /** Returns all recorded sink lineage FQNs. */ - public static List getRecordedSinks() { - return ImmutableList.copyOf( - RECORDED_LINEAGE.getOrDefault(Lineage.LineageDirection.SINK, ImmutableList.of())); - } - - /** Clears all recorded lineage. Should be called in @Before to ensure test isolation. */ - public static void clearRecorded() { - RECORDED_LINEAGE.clear(); - } -}