diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/LogElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/LogElements.java new file mode 100644 index 000000000000..d73f31a0bf2e --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/LogElements.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms; + +import java.util.Objects; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.values.PCollection; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.event.Level; + +/** + * {@link PTransform} for logging elements of a {@link PCollection}. + * + *

Each element is logged and then emitted unchanged. + * + *

{@code
+ * PCollection words = ...;
+ * PCollection loggedWords = words.apply(LogElements.info().withPrefix("word: "));
+ * }
+ * + * @param the element type of the input {@link PCollection} + */ +public class LogElements extends PTransform, PCollection> { + private static final Logger LOG = LoggerFactory.getLogger(LogElements.class); + + private final Level level; + private final String prefix; + private final boolean withTimestamp; + private final boolean withWindow; + private final boolean withPaneInfo; + + /** Returns a {@link LogElements} transform that logs elements at {@link Level#TRACE}. */ + public static LogElements trace() { + return of(Level.TRACE); + } + + /** Returns a {@link LogElements} transform that logs elements at {@link Level#DEBUG}. */ + public static LogElements debug() { + return of(Level.DEBUG); + } + + /** Returns a {@link LogElements} transform that logs elements at {@link Level#INFO}. */ + public static LogElements info() { + return of(Level.INFO); + } + + /** Returns a {@link LogElements} transform that logs elements at {@link Level#WARN}. */ + public static LogElements warn() { + return of(Level.WARN); + } + + /** Returns a {@link LogElements} transform that logs elements at {@link Level#ERROR}. */ + public static LogElements error() { + return of(Level.ERROR); + } + + /** Returns a {@link LogElements} transform that logs elements at the given level. */ + public static LogElements of(Level level) { + return new LogElements<>(level, "", false, false, false); + } + + private LogElements( + Level level, String prefix, boolean withTimestamp, boolean withWindow, boolean withPaneInfo) { + this.level = Objects.requireNonNull(level, "level"); + this.prefix = Objects.requireNonNull(prefix, "prefix"); + this.withTimestamp = withTimestamp; + this.withWindow = withWindow; + this.withPaneInfo = withPaneInfo; + } + + /** Returns a new {@link LogElements} transform with the given prefix before each element. */ + public LogElements withPrefix(String prefix) { + return new LogElements<>(level, prefix, withTimestamp, withWindow, withPaneInfo); + } + + /** Returns a new {@link LogElements} transform that logs each element's timestamp. */ + public LogElements withTimestamp() { + return new LogElements<>(level, prefix, true, withWindow, withPaneInfo); + } + + /** Returns a new {@link LogElements} transform that logs each element's window. */ + public LogElements withWindow() { + return new LogElements<>(level, prefix, withTimestamp, true, withPaneInfo); + } + + /** Returns a new {@link LogElements} transform that logs each element's pane info. */ + public LogElements withPaneInfo() { + return new LogElements<>(level, prefix, withTimestamp, withWindow, true); + } + + @Override + public PCollection expand(PCollection input) { + return input.apply("Log", ParDo.of(new LoggingFn<>(this))); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder + .add(DisplayData.item("level", level.name()).withLabel("Log Level")) + .addIfNotDefault(DisplayData.item("prefix", prefix).withLabel("Prefix"), "") + .addIfNotDefault( + DisplayData.item("withTimestamp", withTimestamp).withLabel("Log Timestamp"), false) + .addIfNotDefault(DisplayData.item("withWindow", withWindow).withLabel("Log Window"), false) + .addIfNotDefault( + DisplayData.item("withPaneInfo", withPaneInfo).withLabel("Log Pane Info"), false); + } + + static String formatForLogging( + @Nullable Object element, + String prefix, + boolean withTimestamp, + boolean withWindow, + boolean withPaneInfo, + Instant timestamp, + BoundedWindow window, + PaneInfo paneInfo) { + StringBuilder builder = new StringBuilder(prefix).append(element); + if (withTimestamp) { + builder.append(", timestamp=").append(timestamp); + } + if (withWindow) { + builder.append(", window=").append(window); + } + if (withPaneInfo) { + builder.append(", paneInfo=").append(paneInfo); + } + return builder.toString(); + } + + private static void log(Level level, String message) { + switch (level) { + case TRACE: + LOG.trace("{}", message); + break; + case DEBUG: + LOG.debug("{}", message); + break; + case INFO: + LOG.info("{}", message); + break; + case WARN: + LOG.warn("{}", message); + break; + case ERROR: + LOG.error("{}", message); + break; + default: + throw unsupportedLogLevel(level); + } + } + + private static boolean isLoggingEnabled(Level level) { + switch (level) { + case TRACE: + return LOG.isTraceEnabled(); + case DEBUG: + return LOG.isDebugEnabled(); + case INFO: + return LOG.isInfoEnabled(); + case WARN: + return LOG.isWarnEnabled(); + case ERROR: + return LOG.isErrorEnabled(); + default: + throw unsupportedLogLevel(level); + } + } + + private static IllegalArgumentException unsupportedLogLevel(Level level) { + return new IllegalArgumentException("Unsupported log level: " + level); + } + + private static class LoggingFn extends DoFn { + private final Level level; + private final String prefix; + private final boolean withTimestamp; + private final boolean withWindow; + private final boolean withPaneInfo; + + private LoggingFn(LogElements transform) { + this.level = transform.level; + this.prefix = transform.prefix; + this.withTimestamp = transform.withTimestamp; + this.withWindow = transform.withWindow; + this.withPaneInfo = transform.withPaneInfo; + } + + @ProcessElement + public void processElement( + @Element T element, + @DoFn.Timestamp Instant timestamp, + BoundedWindow window, + PaneInfo paneInfo, + OutputReceiver receiver) { + if (isLoggingEnabled(level)) { + log( + level, + formatForLogging( + element, + prefix, + withTimestamp, + withWindow, + withPaneInfo, + timestamp, + window, + paneInfo)); + } + receiver.output(element); + } + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LogElementsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LogElementsTest.java new file mode 100644 index 000000000000..aaba169018c1 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LogElementsTest.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms; + +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; + +import java.util.Arrays; +import java.util.List; +import org.apache.beam.sdk.testing.ExpectedLogs; +import org.apache.beam.sdk.testing.NeedsRunner; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.values.PCollection; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.slf4j.event.Level; + +/** Tests for {@link LogElements}. */ +@RunWith(JUnit4.class) +public class LogElementsTest { + @Rule public final transient TestPipeline pipeline = TestPipeline.create(); + @Rule public ExpectedLogs expectedLogs = ExpectedLogs.none(LogElements.class); + + @Test + @Category(NeedsRunner.class) + public void testLogElementsPreservesElements() { + List elements = Arrays.asList("a", "b", "c"); + + PCollection output = + pipeline.apply(Create.of(elements)).apply(LogElements.info()); + + PAssert.that(output).containsInAnyOrder(elements); + pipeline.run(); + } + + @Test + public void testFormatForLoggingIncludesConfiguredMetadata() { + Instant timestamp = new Instant(0); + IntervalWindow window = new IntervalWindow(timestamp, Duration.standardMinutes(1)); + + String message = + LogElements.formatForLogging( + "a", "row: ", true, true, true, timestamp, window, PaneInfo.NO_FIRING); + + assertThat(message, containsString("row: a")); + assertThat(message, containsString("timestamp=1970-01-01T00:00:00.000Z")); + assertThat( + message, containsString("window=[1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z)")); + assertThat(message, containsString("paneInfo=PaneInfo.NO_FIRING")); + } + + @Test + @Category(NeedsRunner.class) + public void testLogElementsLogsAtConfiguredLevels() { + applyLogElements("Trace", LogElements.trace().withPrefix("trace: "), "trace-element"); + applyLogElements("Debug", LogElements.debug().withPrefix("debug: "), "debug-element"); + applyLogElements("Info", LogElements.info().withPrefix("info: "), "info-element"); + applyLogElements("Warn", LogElements.warn().withPrefix("warn: "), "warn-element"); + applyLogElements("Error", LogElements.error().withPrefix("error: "), "error-element"); + + pipeline.run(); + + expectedLogs.verifyTrace("trace: trace-element"); + expectedLogs.verifyDebug("debug: debug-element"); + expectedLogs.verifyInfo("info: info-element"); + expectedLogs.verifyWarn("warn: warn-element"); + expectedLogs.verifyError("error: error-element"); + } + + private void applyLogElements(String name, LogElements transform, String element) { + pipeline.apply("Create" + name, Create.of(element)).apply("Log" + name, transform); + } + + @Test + public void testDisplayData() { + DisplayData displayData = + DisplayData.from( + LogElements.of(Level.WARN) + .withPrefix("row: ") + .withTimestamp() + .withWindow() + .withPaneInfo()); + + assertThat(displayData, hasDisplayItem("level", "WARN")); + assertThat(displayData, hasDisplayItem("prefix", "row: ")); + assertThat(displayData, hasDisplayItem("withTimestamp", true)); + assertThat(displayData, hasDisplayItem("withWindow", true)); + assertThat(displayData, hasDisplayItem("withPaneInfo", true)); + } +}