Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
compatible. Both coders can decode encoded bytes from the other coder
([#38139](https://github.com/apache/beam/issues/38139)).
* (Python) Added type alias for with_exception_handling to be used for typehints. ([#38173](https://github.com/apache/beam/issues/38173)).
* Added plugin mechanism to support different Lineage implementations (Java) ([#36790](https://github.com/apache/beam/issues/36790)).

## Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,7 @@ public static void setDefaultPipelineOptions(PipelineOptions options) {

// entry to set other PipelineOption determined flags
Metrics.setDefaultPipelineOptions(options);
Lineage.setDefaultPipelineOptions(options);

while (true) {
KV<Long, Integer> revision = FILESYSTEM_REVISION.get();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.lineage;

import org.apache.beam.sdk.annotations.Internal;

/**
* Plugin interface for lineage implementations.
*
* <p>This is the core contract that lineage plugins must implement. Custom implementations are
* selected via the {@code --lineageType} pipeline option (see {@link LineageOptions}).
*
* <p>Implementations must provide a public constructor accepting ({@link
* org.apache.beam.sdk.options.PipelineOptions}, {@link
* org.apache.beam.sdk.metrics.Lineage.LineageDirection}). The {@link
* org.apache.beam.sdk.metrics.Lineage} facade looks that constructor up reflectively and creates
* one instance per direction (one for sources, one for sinks).
*
* <p>End users should use the {@link org.apache.beam.sdk.metrics.Lineage} facade class instead of
* implementing this interface directly.
*/
@Internal
public interface LineageBase {
/**
* Adds the given FQN as lineage.
*
* <p>The segments are expected to be already escaped; in particular, this means they will often
* have trailing delimiters.
*
* @param rollupSegments an iterable of strings whose concatenation is a valid <a
* href="https://cloud.google.com/data-catalog/docs/fully-qualified-names">Dataplex FQN</a>
*/
void add(Iterable<String> rollupSegments);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.lineage;

import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
* Pipeline options for selecting a custom {@link LineageBase} implementation.
*
* <p>When the lineage type is not set, the default Metrics-based lineage is used. The option can be
* set from the command line, for example {@code --lineageType=com.example.MyLineage}.
*/
public interface LineageOptions extends PipelineOptions {

/**
* Returns the {@link LineageBase} implementation class to use for recording lineage.
*
* @return the configured implementation class, or {@code null} when none was set, in which case
* the default Metrics-based lineage is used
*/
@Description(
"The fully qualified class name of the LineageBase implementation to use for recording "
+ "lineage. The class must implement LineageBase and have a public constructor accepting "
+ "(PipelineOptions, Lineage.LineageDirection). "
+ "If not specified, the default Metrics-based lineage is used.")
@Nullable
Class<? extends LineageBase> getLineageType();

/**
* Sets the {@link LineageBase} implementation class to use for recording lineage.
*
* @param lineageClass the implementation class, or {@code null} to fall back to the default
* Metrics-based lineage
*/
void setLineageType(@Nullable Class<? extends LineageBase> lineageClass);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Lineage tracking support for Apache Beam pipelines.
*
* <p>This package provides a plugin mechanism to support different lineage implementations through
* the {@link org.apache.beam.sdk.lineage.LineageBase} interface. Lineage implementations can be
* selected via the {@code --lineageType} pipeline option to track data lineage information during
* pipeline execution.
*
* <p>For lineage capabilities, see {@link org.apache.beam.sdk.metrics.Lineage}.
*/
package org.apache.beam.sdk.lineage;
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.metrics;

import org.apache.beam.sdk.lineage.LineageBase;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;

/**
* Lineage implementation that stores lineage information in {@link BoundedTrie} metrics.
*
* <p>Used when {@link Metrics.MetricsFlag#lineageRollupEnabled()} is true.
*/
class BoundedTrieMetricsLineage implements LineageBase {

private final BoundedTrie metric;

@SuppressWarnings("unused")
public BoundedTrieMetricsLineage(PipelineOptions options, Lineage.LineageDirection direction) {
Comment thread
Abacn marked this conversation as resolved.
Lineage.Type type =
(direction == Lineage.LineageDirection.SOURCE)
? Lineage.Type.SOURCEV2
: Lineage.Type.SINKV2;
this.metric = Metrics.boundedTrie(Lineage.LINEAGE_NAMESPACE, type.toString());
}

@Override
public void add(Iterable<String> rollupSegments) {
metric.add(ImmutableList.copyOf(rollupSegments));
}
}
111 changes: 90 additions & 21 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,51 +17,121 @@
*/
package org.apache.beam.sdk.metrics;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.lineage.LineageBase;
import org.apache.beam.sdk.lineage.LineageOptions;
import org.apache.beam.sdk.metrics.Metrics.MetricsFlag;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Standard collection of metrics used to record source and sinks information for lineage tracking.
*/
public class Lineage {

public static final String LINEAGE_NAMESPACE = "lineage";
private static final Lineage SOURCES = new Lineage(Type.SOURCE);
private static final Lineage SINKS = new Lineage(Type.SINK);
private static final Logger LOG = LoggerFactory.getLogger(Lineage.class);

// Process-wide facade instances. They are assigned in setDefaultPipelineOptions while holding
// INIT_LOCK and read without the lock by canSkipInit/getSources/getSinks, hence volatile.
private static volatile @Nullable Lineage sources;
private static volatile @Nullable Lineage sinks;
// The lineage type that was requested when the instances above were last created; null when no
// custom type was requested at that time.
private static volatile @Nullable Class<? extends LineageBase> currentLineageType;

// Monitor guarding (re-)initialization of the three fields above.
private static final Object INIT_LOCK = new Object();

// Reserved characters are backtick, colon, whitespace (space, \t, \n) and dot.
private static final Pattern RESERVED_CHARS = Pattern.compile("[:\\s.`]");

private final Metric metric;
private final LineageBase delegate;

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI
We are using Lineage as a facade around LineageBase, so we don't expose the latter and we avoid any (breaking) changes in other parts of Beam


private Lineage(Type type) {
if (MetricsFlag.lineageRollupEnabled()) {
this.metric =
Metrics.boundedTrie(
LINEAGE_NAMESPACE,
type == Type.SOURCE ? Type.SOURCEV2.toString() : Type.SINKV2.toString());
} else {
this.metric = Metrics.stringSet(LINEAGE_NAMESPACE, type.toString());
/**
* Direction of a lineage record. One {@link LineageBase} instance is created per direction and
* receives the direction as its second constructor argument.
*/
public enum LineageDirection {
/** Lineage of data sources; backs {@link Lineage#getSources()}. */
SOURCE,
/** Lineage of data sinks; backs {@link Lineage#getSinks()}. */
SINK
}

/**
 * Wraps a lineage implementation behind this facade.
 *
 * @param delegate the implementation that receives every {@code add} call; must not be null
 */
private Lineage(LineageBase delegate) {
  checkNotNull(delegate, "delegate cannot be null");
  this.delegate = delegate;
}

/**
 * Initializes, or re-initializes, the process-wide source and sink {@link Lineage} instances
 * from the given options.
 *
 * <p>Initialization is skipped when instances already exist and either no lineage type is
 * requested or the requested type equals the one currently active. Otherwise both instances are
 * rebuilt under {@code INIT_LOCK}.
 *
 * @param options the pipeline options; viewed as {@link LineageOptions} to read the requested
 *     lineage type
 * @throws NullPointerException if {@code options} is null
 * @throws IllegalArgumentException if the requested lineage implementation cannot be instantiated
 */
@Internal
public static void setDefaultPipelineOptions(PipelineOptions options) {
  checkNotNull(options, "options cannot be null");
  Class<? extends LineageBase> requestedType = options.as(LineageOptions.class).getLineageType();

  if (canSkipInit(requestedType)) {
    return;
  }
  synchronized (INIT_LOCK) {
    // Re-check under the lock: another thread may have finished initializing in the meantime.
    if (canSkipInit(requestedType)) {
      return;
    }
    // Build both instances before publishing either one, so a failure while creating the sink
    // instance cannot leave a source instance published without a matching sink.
    Lineage newSources = createLineage(options, LineageDirection.SOURCE);
    Lineage newSinks = createLineage(options, LineageDirection.SINK);

    // Publication order matters because canSkipInit() runs without the lock:
    //  - it treats a non-null `sources` as "initialized", so `sinks` must be visible first;
    //  - it treats a matching `currentLineageType` as "up to date", so the type goes last.
    sinks = newSinks;
    sources = newSources;
    currentLineageType = requestedType;
    LOG.debug("Lineage initialized with type {}", requestedType);
  }
}

/**
 * Decides whether the existing lineage instances can be kept as they are.
 *
 * @param requestedType the lineage type asked for by the caller's options, or null if none
 * @return false when nothing has been initialized yet; otherwise true if no type is requested
 *     or the requested type equals the currently active one
 */
private static boolean canSkipInit(@Nullable Class<? extends LineageBase> requestedType) {
  boolean initialized = sources != null;
  if (!initialized) {
    return false;
  }
  // Without an explicit request, whatever is already in place stays.
  if (requestedType == null) {
    return true;
  }
  // With an explicit request, re-initialization is needed only when the type actually changes.
  return requestedType.equals(currentLineageType);
}

/**
 * Builds the {@link Lineage} facade for one direction.
 *
 * <p>When a lineage type is configured in {@link LineageOptions}, it is instantiated reflectively
 * through its (PipelineOptions, LineageDirection) constructor. Otherwise one of the built-in
 * Metrics-based implementations is used, chosen by {@code MetricsFlag.lineageRollupEnabled()}.
 *
 * @param options the pipeline options, passed on to the implementation's constructor
 * @param direction whether the instance records sources or sinks
 * @return a facade wrapping the selected implementation
 * @throws IllegalArgumentException if the configured class lacks a usable constructor, cannot be
 *     instantiated, or its constructor throws
 */
private static Lineage createLineage(PipelineOptions options, LineageDirection direction) {
  Class<? extends LineageBase> lineageClass = options.as(LineageOptions.class).getLineageType();

  if (lineageClass == null) {
    LOG.debug("Using default Metrics-based lineage for direction {}", direction);
    LineageBase defaultLineage =
        MetricsFlag.lineageRollupEnabled()
            ? new BoundedTrieMetricsLineage(options, direction)
            : new StringSetMetricsLineage(options, direction);
    return new Lineage(defaultLineage);
  }

  LineageBase lineage;
  try {
    lineage =
        lineageClass
            .getDeclaredConstructor(PipelineOptions.class, LineageDirection.class)
            .newInstance(options, direction);
  } catch (java.lang.reflect.InvocationTargetException e) {
    // The constructor was found and invoked but threw. Report that, with the real failure as
    // the cause, instead of blaming the constructor signature.
    Throwable cause = e.getCause() != null ? e.getCause() : e;
    throw new IllegalArgumentException(
        "Failed to instantiate lineage implementation: "
            + lineageClass.getName()
            + ". Its constructor threw an exception for direction "
            + direction
            + ".",
        cause);
  } catch (ReflectiveOperationException e) {
    // Missing or inaccessible constructor, or a class that cannot be instantiated.
    throw new IllegalArgumentException(
        "Failed to instantiate lineage implementation: "
            + lineageClass.getName()
            + ". The class must have a public constructor accepting "
            + "(PipelineOptions, Lineage.LineageDirection).",
        e);
  }
  LOG.info("Using {} for lineage direction {}", lineageClass.getName(), direction);
  return new Lineage(lineage);
}

/**
 * {@link Lineage} representing sources and optionally side inputs.
 *
 * @return the source lineage instance created by {@link #setDefaultPipelineOptions}
 * @throws NullPointerException if lineage has not been initialized yet
 */
public static Lineage getSources() {
  return checkNotNull(
      sources,
      "Lineage not initialized. FileSystems.setDefaultPipelineOptions must be called first.");
}

/**
 * {@link Lineage} representing sinks.
 *
 * @return the sink lineage instance created by {@link #setDefaultPipelineOptions}
 * @throws NullPointerException if lineage has not been initialized yet
 */
public static Lineage getSinks() {
  return checkNotNull(
      sinks,
      "Lineage not initialized. FileSystems.setDefaultPipelineOptions must be called first.");
}

@VisibleForTesting
Expand Down Expand Up @@ -140,12 +210,7 @@ public void add(String system, Iterable<String> segments) {
* <p>In particular, this means they will often have trailing delimiters.
*/
public void add(Iterable<String> rollupSegments) {
  // How the segments are stored is up to the configured implementation, so the facade only
  // forwards the call.
  delegate.add(rollupSegments);
}

/**
Expand All @@ -156,6 +221,8 @@ public void add(Iterable<String> rollupSegments) {
* @param truncatedMarker the marker to use to represent truncated FQNs.
* @return A flat representation of all FQNs. If the FQN was truncated then it has a trailing
* truncatedMarker.
* <p>NOTE: When using a custom Lineage plugin, this method will return empty results since
* lineage is not stored in Metrics.
*/
public static Set<String> query(MetricResults results, Type type, String truncatedMarker) {
MetricQueryResults lineageQueryResults = getLineageQueryResults(results, type);
Expand Down Expand Up @@ -184,6 +251,8 @@ public static Set<String> query(MetricResults results, Type type, String truncat
* @param results FQNs from the result
* @param type sources or sinks
* @return A flat representation of all FQNs. If the FQN was truncated then it has a trailing '*'.
* <p>NOTE: When using a custom Lineage plugin, this method will return empty results since
* lineage is not stored in Metrics.
*/
public static Set<String> query(MetricResults results, Type type) {
if (MetricsFlag.lineageRollupEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.metrics;

import org.apache.beam.sdk.lineage.LineageBase;
import org.apache.beam.sdk.options.PipelineOptions;

/**
 * {@link LineageBase} implementation backed by a {@link StringSet} metric.
 *
 * <p>Used when {@link Metrics.MetricsFlag#lineageRollupEnabled()} is false.
 */
class StringSetMetricsLineage implements LineageBase {

  private final StringSet metric;

  /**
   * Creates the string-set-backed lineage for one direction.
   *
   * @param options not read by this implementation
   * @param direction {@code SOURCE} selects the {@code SOURCE} metric, anything else {@code SINK}
   */
  @SuppressWarnings("unused")
  public StringSetMetricsLineage(PipelineOptions options, Lineage.LineageDirection direction) {
    Lineage.Type metricType;
    if (direction == Lineage.LineageDirection.SOURCE) {
      metricType = Lineage.Type.SOURCE;
    } else {
      metricType = Lineage.Type.SINK;
    }
    this.metric = Metrics.stringSet(Lineage.LINEAGE_NAMESPACE, metricType.toString());
  }

  /**
   * Concatenates the segments, with no separator, into a single FQN string and adds it to the
   * metric.
   *
   * @param rollupSegments the already-escaped FQN segments
   */
  @Override
  public void add(Iterable<String> rollupSegments) {
    StringBuilder fqn = new StringBuilder();
    for (String segment : rollupSegments) {
      fqn.append(segment);
    }
    metric.add(fqn.toString());
  }
}
Loading
Loading