diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/LogElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/LogElements.java
new file mode 100644
index 000000000000..d73f31a0bf2e
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/LogElements.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import java.util.Objects;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.values.PCollection;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
+
+/**
+ * {@link PTransform} for logging elements of a {@link PCollection}.
+ *
+ *
Each element is logged and then emitted unchanged.
+ *
+ *
{@code
+ * PCollection words = ...;
+ * PCollection loggedWords = words.apply(LogElements.info().withPrefix("word: "));
+ * }
+ *
+ * @param the element type of the input {@link PCollection}
+ */
+public class LogElements extends PTransform, PCollection> {
+ private static final Logger LOG = LoggerFactory.getLogger(LogElements.class);
+
+ private final Level level;
+ private final String prefix;
+ private final boolean withTimestamp;
+ private final boolean withWindow;
+ private final boolean withPaneInfo;
+
+ /** Returns a {@link LogElements} transform that logs elements at {@link Level#TRACE}. */
+ public static LogElements trace() {
+ return of(Level.TRACE);
+ }
+
+ /** Returns a {@link LogElements} transform that logs elements at {@link Level#DEBUG}. */
+ public static LogElements debug() {
+ return of(Level.DEBUG);
+ }
+
+ /** Returns a {@link LogElements} transform that logs elements at {@link Level#INFO}. */
+ public static LogElements info() {
+ return of(Level.INFO);
+ }
+
+ /** Returns a {@link LogElements} transform that logs elements at {@link Level#WARN}. */
+ public static LogElements warn() {
+ return of(Level.WARN);
+ }
+
+ /** Returns a {@link LogElements} transform that logs elements at {@link Level#ERROR}. */
+ public static LogElements error() {
+ return of(Level.ERROR);
+ }
+
+ /** Returns a {@link LogElements} transform that logs elements at the given level. */
+ public static LogElements of(Level level) {
+ return new LogElements<>(level, "", false, false, false);
+ }
+
+ private LogElements(
+ Level level, String prefix, boolean withTimestamp, boolean withWindow, boolean withPaneInfo) {
+ this.level = Objects.requireNonNull(level, "level");
+ this.prefix = Objects.requireNonNull(prefix, "prefix");
+ this.withTimestamp = withTimestamp;
+ this.withWindow = withWindow;
+ this.withPaneInfo = withPaneInfo;
+ }
+
+ /** Returns a new {@link LogElements} transform with the given prefix before each element. */
+ public LogElements withPrefix(String prefix) {
+ return new LogElements<>(level, prefix, withTimestamp, withWindow, withPaneInfo);
+ }
+
+ /** Returns a new {@link LogElements} transform that logs each element's timestamp. */
+ public LogElements withTimestamp() {
+ return new LogElements<>(level, prefix, true, withWindow, withPaneInfo);
+ }
+
+ /** Returns a new {@link LogElements} transform that logs each element's window. */
+ public LogElements withWindow() {
+ return new LogElements<>(level, prefix, withTimestamp, true, withPaneInfo);
+ }
+
+ /** Returns a new {@link LogElements} transform that logs each element's pane info. */
+ public LogElements withPaneInfo() {
+ return new LogElements<>(level, prefix, withTimestamp, withWindow, true);
+ }
+
+ @Override
+ public PCollection expand(PCollection input) {
+ return input.apply("Log", ParDo.of(new LoggingFn<>(this)));
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ builder
+ .add(DisplayData.item("level", level.name()).withLabel("Log Level"))
+ .addIfNotDefault(DisplayData.item("prefix", prefix).withLabel("Prefix"), "")
+ .addIfNotDefault(
+ DisplayData.item("withTimestamp", withTimestamp).withLabel("Log Timestamp"), false)
+ .addIfNotDefault(DisplayData.item("withWindow", withWindow).withLabel("Log Window"), false)
+ .addIfNotDefault(
+ DisplayData.item("withPaneInfo", withPaneInfo).withLabel("Log Pane Info"), false);
+ }
+
+ static String formatForLogging(
+ @Nullable Object element,
+ String prefix,
+ boolean withTimestamp,
+ boolean withWindow,
+ boolean withPaneInfo,
+ Instant timestamp,
+ BoundedWindow window,
+ PaneInfo paneInfo) {
+ StringBuilder builder = new StringBuilder(prefix).append(element);
+ if (withTimestamp) {
+ builder.append(", timestamp=").append(timestamp);
+ }
+ if (withWindow) {
+ builder.append(", window=").append(window);
+ }
+ if (withPaneInfo) {
+ builder.append(", paneInfo=").append(paneInfo);
+ }
+ return builder.toString();
+ }
+
+ private static void log(Level level, String message) {
+ switch (level) {
+ case TRACE:
+ LOG.trace("{}", message);
+ break;
+ case DEBUG:
+ LOG.debug("{}", message);
+ break;
+ case INFO:
+ LOG.info("{}", message);
+ break;
+ case WARN:
+ LOG.warn("{}", message);
+ break;
+ case ERROR:
+ LOG.error("{}", message);
+ break;
+ default:
+ throw unsupportedLogLevel(level);
+ }
+ }
+
+ private static boolean isLoggingEnabled(Level level) {
+ switch (level) {
+ case TRACE:
+ return LOG.isTraceEnabled();
+ case DEBUG:
+ return LOG.isDebugEnabled();
+ case INFO:
+ return LOG.isInfoEnabled();
+ case WARN:
+ return LOG.isWarnEnabled();
+ case ERROR:
+ return LOG.isErrorEnabled();
+ default:
+ throw unsupportedLogLevel(level);
+ }
+ }
+
+ private static IllegalArgumentException unsupportedLogLevel(Level level) {
+ return new IllegalArgumentException("Unsupported log level: " + level);
+ }
+
+ private static class LoggingFn extends DoFn {
+ private final Level level;
+ private final String prefix;
+ private final boolean withTimestamp;
+ private final boolean withWindow;
+ private final boolean withPaneInfo;
+
+ private LoggingFn(LogElements transform) {
+ this.level = transform.level;
+ this.prefix = transform.prefix;
+ this.withTimestamp = transform.withTimestamp;
+ this.withWindow = transform.withWindow;
+ this.withPaneInfo = transform.withPaneInfo;
+ }
+
+ @ProcessElement
+ public void processElement(
+ @Element T element,
+ @DoFn.Timestamp Instant timestamp,
+ BoundedWindow window,
+ PaneInfo paneInfo,
+ OutputReceiver receiver) {
+ if (isLoggingEnabled(level)) {
+ log(
+ level,
+ formatForLogging(
+ element,
+ prefix,
+ withTimestamp,
+ withWindow,
+ withPaneInfo,
+ timestamp,
+ window,
+ paneInfo));
+ }
+ receiver.output(element);
+ }
+ }
+}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LogElementsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LogElementsTest.java
new file mode 100644
index 000000000000..aaba169018c1
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LogElementsTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.testing.ExpectedLogs;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.event.Level;
+
+/** Tests for {@link LogElements}. */
+@RunWith(JUnit4.class)
+public class LogElementsTest {
+ @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+ @Rule public ExpectedLogs expectedLogs = ExpectedLogs.none(LogElements.class);
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testLogElementsPreservesElements() {
+ List elements = Arrays.asList("a", "b", "c");
+
+ PCollection output =
+ pipeline.apply(Create.of(elements)).apply(LogElements.info());
+
+ PAssert.that(output).containsInAnyOrder(elements);
+ pipeline.run();
+ }
+
+ @Test
+ public void testFormatForLoggingIncludesConfiguredMetadata() {
+ Instant timestamp = new Instant(0);
+ IntervalWindow window = new IntervalWindow(timestamp, Duration.standardMinutes(1));
+
+ String message =
+ LogElements.formatForLogging(
+ "a", "row: ", true, true, true, timestamp, window, PaneInfo.NO_FIRING);
+
+ assertThat(message, containsString("row: a"));
+ assertThat(message, containsString("timestamp=1970-01-01T00:00:00.000Z"));
+ assertThat(
+ message, containsString("window=[1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z)"));
+ assertThat(message, containsString("paneInfo=PaneInfo.NO_FIRING"));
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testLogElementsLogsAtConfiguredLevels() {
+ applyLogElements("Trace", LogElements.trace().withPrefix("trace: "), "trace-element");
+ applyLogElements("Debug", LogElements.debug().withPrefix("debug: "), "debug-element");
+ applyLogElements("Info", LogElements.info().withPrefix("info: "), "info-element");
+ applyLogElements("Warn", LogElements.warn().withPrefix("warn: "), "warn-element");
+ applyLogElements("Error", LogElements.error().withPrefix("error: "), "error-element");
+
+ pipeline.run();
+
+ expectedLogs.verifyTrace("trace: trace-element");
+ expectedLogs.verifyDebug("debug: debug-element");
+ expectedLogs.verifyInfo("info: info-element");
+ expectedLogs.verifyWarn("warn: warn-element");
+ expectedLogs.verifyError("error: error-element");
+ }
+
+ private void applyLogElements(String name, LogElements transform, String element) {
+ pipeline.apply("Create" + name, Create.of(element)).apply("Log" + name, transform);
+ }
+
+ @Test
+ public void testDisplayData() {
+ DisplayData displayData =
+ DisplayData.from(
+ LogElements.of(Level.WARN)
+ .withPrefix("row: ")
+ .withTimestamp()
+ .withWindow()
+ .withPaneInfo());
+
+ assertThat(displayData, hasDisplayItem("level", "WARN"));
+ assertThat(displayData, hasDisplayItem("prefix", "row: "));
+ assertThat(displayData, hasDisplayItem("withTimestamp", true));
+ assertThat(displayData, hasDisplayItem("withWindow", true));
+ assertThat(displayData, hasDisplayItem("withPaneInfo", true));
+ }
+}