Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@
compatible. Both coders can decode encoded bytes from the other coder
([#38139](https://github.com/apache/beam/issues/38139)).
* (Python) Added type alias for with_exception_handling to be used for typehints. ([#38173](https://github.com/apache/beam/issues/38173)).
* Added plugin mechanism to support different Lineage implementations (Java) ([#36790](https://github.com/apache/beam/issues/36790)).

## Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,6 @@ public static void setDefaultPipelineOptions(PipelineOptions options) {

// entry to set other PipelineOption determined flags
Metrics.setDefaultPipelineOptions(options);
Lineage.setDefaultPipelineOptions(options);

while (true) {
KV<Long, Integer> revision = FILESYSTEM_REVISION.get();
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

120 changes: 21 additions & 99 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,130 +17,51 @@
*/
package org.apache.beam.sdk.metrics;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.lineage.LineageBase;
import org.apache.beam.sdk.lineage.LineageOptions;
import org.apache.beam.sdk.metrics.Metrics.MetricsFlag;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Standard collection of metrics used to record source and sink information for lineage tracking.
*/
public class Lineage {
public static final String LINEAGE_NAMESPACE = "lineage";
private static final Logger LOG = LoggerFactory.getLogger(Lineage.class);

private static volatile @Nullable Lineage sources;
private static volatile @Nullable Lineage sinks;
private static volatile @Nullable Class<? extends LineageBase> currentLineageType;

private static final Object INIT_LOCK = new Object();

public static final String LINEAGE_NAMESPACE = "lineage";
private static final Lineage SOURCES = new Lineage(Type.SOURCE);
private static final Lineage SINKS = new Lineage(Type.SINK);
// Reserved characters are backtick, colon, whitespace (space, \t, \n) and dot.
private static final Pattern RESERVED_CHARS = Pattern.compile("[:\\s.`]");

private final LineageBase delegate;

public enum LineageDirection {
SOURCE,
SINK
}

private Lineage(LineageBase delegate) {
this.delegate = checkNotNull(delegate, "delegate cannot be null");
}

@Internal
public static void setDefaultPipelineOptions(PipelineOptions options) {
checkNotNull(options, "options cannot be null");
Class<? extends LineageBase> requestedType = options.as(LineageOptions.class).getLineageType();

if (canSkipInit(requestedType)) {
return;
}
synchronized (INIT_LOCK) {
if (canSkipInit(requestedType)) {
return;
}
sources = createLineage(options, LineageDirection.SOURCE);
sinks = createLineage(options, LineageDirection.SINK);
currentLineageType = requestedType;
LOG.debug("Lineage initialized with type {}", requestedType);
}
}

private static boolean canSkipInit(@Nullable Class<? extends LineageBase> requestedType) {
if (sources == null) {
return false;
}
// When no type is requested, preserve whatever is already initialized.
// When a type is requested, only re-init if it differs from the active type.
return requestedType == null || requestedType.equals(currentLineageType);
}

private static Lineage createLineage(PipelineOptions options, LineageDirection direction) {
Class<? extends LineageBase> lineageClass = options.as(LineageOptions.class).getLineageType();
private final Metric metric;

if (lineageClass != null) {
try {
LineageBase lineage =
lineageClass
.getDeclaredConstructor(PipelineOptions.class, LineageDirection.class)
.newInstance(options, direction);
LOG.info("Using {} for lineage direction {}", lineageClass.getName(), direction);
return new Lineage(lineage);
} catch (ReflectiveOperationException e) {
throw new IllegalArgumentException(
"Failed to instantiate lineage implementation: "
+ lineageClass.getName()
+ ". The class must have a public constructor accepting "
+ "(PipelineOptions, Lineage.LineageDirection).",
e);
}
private Lineage(Type type) {
if (MetricsFlag.lineageRollupEnabled()) {
this.metric =
Metrics.boundedTrie(
LINEAGE_NAMESPACE,
type == Type.SOURCE ? Type.SOURCEV2.toString() : Type.SINKV2.toString());
} else {
this.metric = Metrics.stringSet(LINEAGE_NAMESPACE, type.toString());
}

LOG.debug("Using default Metrics-based lineage for direction {}", direction);
LineageBase defaultLineage =
MetricsFlag.lineageRollupEnabled()
? new BoundedTrieMetricsLineage(options, direction)
: new StringSetMetricsLineage(options, direction);
return new Lineage(defaultLineage);
}

/** {@link Lineage} representing sources and optionally side inputs. */
public static Lineage getSources() {
Lineage localSources = sources;
if (localSources == null) {
return createDefaultLineage(LineageDirection.SOURCE);
}
return localSources;
return SOURCES;
}

/** {@link Lineage} representing sinks. */
public static Lineage getSinks() {
Lineage localSinks = sinks;
if (localSinks == null) {
return createDefaultLineage(LineageDirection.SINK);
}
return localSinks;
}

private static Lineage createDefaultLineage(LineageDirection direction) {
return createLineage(PipelineOptionsFactory.create(), direction);
return SINKS;
}

@VisibleForTesting
Expand Down Expand Up @@ -219,7 +140,12 @@ public void add(String system, Iterable<String> segments) {
* <p>In particular, this means they will often have trailing delimiters.
*/
public void add(Iterable<String> rollupSegments) {
delegate.add(rollupSegments);
ImmutableList<String> segments = ImmutableList.copyOf(rollupSegments);
if (MetricsFlag.lineageRollupEnabled()) {
((BoundedTrie) this.metric).add(segments);
} else {
((StringSet) this.metric).add(String.join("", segments));
}
}

/**
Expand All @@ -230,8 +156,6 @@ public void add(Iterable<String> rollupSegments) {
* @param truncatedMarker the marker to use to represent truncated FQNs.
* @return A flat representation of all FQNs. If the FQN was truncated then it has a trailing
* truncatedMarker.
* <p>NOTE: When using a custom Lineage plugin, this method will return empty results since
* lineage is not stored in Metrics.
*/
public static Set<String> query(MetricResults results, Type type, String truncatedMarker) {
MetricQueryResults lineageQueryResults = getLineageQueryResults(results, type);
Expand Down Expand Up @@ -260,8 +184,6 @@ public static Set<String> query(MetricResults results, Type type, String truncat
* @param results FQNs from the result
* @param type sources or sinks
* @return A flat representation of all FQNs. If the FQN was truncated then it has a trailing '*'.
* <p>NOTE: When using a custom Lineage plugin, this method will return empty results since
* lineage is not stored in Metrics.
*/
public static Set<String> query(MetricResults results, Type type) {
if (MetricsFlag.lineageRollupEnabled()) {
Expand Down
Loading
Loading