Skip to content

Commit 41a9b67

Browse files
[FLINK-36746][core] Fix the deadlock bug in SerializedThrowable
Co-authored-by: raoraoxiong <[email protected]>
1 parent fd273a5 commit 41a9b67

File tree

2 files changed

+81
-9
lines changed

2 files changed

+81
-9
lines changed

flink-core/src/main/java/org/apache/flink/util/SerializedThrowable.java

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ private SerializedThrowable(Throwable exception, Set<Throwable> alreadySeen) {
9494
}
9595
}
9696
// mimic suppressed exceptions
97-
addAllSuppressed(exception.getSuppressed());
97+
this.addAllSuppressed(exception.getSuppressed(), alreadySeen);
9898
} else {
9999
// copy from that serialized throwable
100100
SerializedThrowable other = (SerializedThrowable) exception;
@@ -104,7 +104,7 @@ private SerializedThrowable(Throwable exception, Set<Throwable> alreadySeen) {
104104
this.cachedException = other.cachedException;
105105
this.setStackTrace(other.getStackTrace());
106106
this.initCause(other.getCause());
107-
this.addAllSuppressed(other.getSuppressed());
107+
this.addAllSuppressed(other.getSuppressed(), alreadySeen);
108108
}
109109
}
110110

@@ -141,15 +141,23 @@ public String getFullStringifiedStackTrace() {
141141
return fullStringifiedStackTrace;
142142
}
143143

144-
private void addAllSuppressed(Throwable[] suppressed) {
144+
/**
145+
* Adds all suppressed exceptions to this exception, skipping any that are already in {@code alreadySeen}.
146+
*
147+
* @param suppressed The suppressed exceptions to add.
148+
* @param alreadySeen The set of exceptions that have already been seen; each suppressed exception that is added is also recorded in this set.
149+
*/
150+
private void addAllSuppressed(Throwable[] suppressed, Set<Throwable> alreadySeen) {
145151
for (Throwable s : suppressed) {
146-
SerializedThrowable serializedThrowable;
147-
if (s instanceof SerializedThrowable) {
148-
serializedThrowable = (SerializedThrowable) s;
149-
} else {
150-
serializedThrowable = new SerializedThrowable(s);
152+
if (alreadySeen.add(s)) {
153+
SerializedThrowable serializedThrowable;
154+
if (s instanceof SerializedThrowable) {
155+
serializedThrowable = (SerializedThrowable) s;
156+
} else {
157+
serializedThrowable = new SerializedThrowable(s);
158+
}
159+
this.addSuppressed(serializedThrowable);
151160
}
152-
this.addSuppressed(serializedThrowable);
153161
}
154162
}
155163

flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedThrowableTest.java

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,21 @@
1919
package org.apache.flink.runtime.util;
2020

2121
import org.apache.flink.core.testutils.CommonTestUtils;
22+
import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
2223
import org.apache.flink.testutils.ClassLoaderUtils;
2324
import org.apache.flink.util.ExceptionUtils;
2425
import org.apache.flink.util.InstantiationUtil;
2526
import org.apache.flink.util.SerializedThrowable;
2627

2728
import org.junit.jupiter.api.Test;
2829

30+
import java.io.IOException;
31+
import java.net.InetSocketAddress;
32+
import java.net.SocketAddress;
33+
import java.util.ArrayList;
34+
import java.util.List;
35+
import java.util.concurrent.CountDownLatch;
36+
2937
import static org.assertj.core.api.Assertions.assertThat;
3038
import static org.assertj.core.api.Assertions.assertThatThrownBy;
3139
import static org.assertj.core.api.Assertions.fail;
@@ -179,4 +187,60 @@ void testCopySuppressed() {
179187
.isInstanceOf(SerializedThrowable.class)
180188
.hasMessage("java.lang.Exception: suppressed");
181189
}
190+
191+
@Test
192+
void testCyclicSuppressedThrowableSerialized() {
193+
SerializedThrowable serializedThrowable = new SerializedThrowable(mockThrowable());
194+
assertThat(serializedThrowable).isNotNull();
195+
}
196+
197+
@Test
198+
void testCyclicSuppressedThrowableConcurrentSerialized() throws InterruptedException {
199+
Throwable throwable = mockThrowable();
200+
CountDownLatch countDownLatch = new CountDownLatch(2);
201+
List<Thread> list = new ArrayList<>();
202+
for (int i = 0; i < 2; i++) {
203+
String threadName = "t" + i;
204+
Thread t = createThread(countDownLatch, throwable, threadName);
205+
t.start();
206+
countDownLatch.countDown();
207+
list.add(t);
208+
}
209+
for (Thread thread : list) {
210+
thread.join();
211+
}
212+
}
213+
214+
private static Thread createThread(
215+
CountDownLatch countDownLatch, Throwable throwable, String threadName) {
216+
Thread t =
217+
new Thread(
218+
() -> {
219+
try {
220+
countDownLatch.await();
221+
SerializedThrowable serializedThrowable =
222+
new SerializedThrowable(throwable);
223+
assertThat(serializedThrowable).isNotNull();
224+
} catch (Exception e) {
225+
throw new RuntimeException(e);
226+
}
227+
});
228+
t.setName(threadName);
229+
return t;
230+
}
231+
232+
private static Throwable mockThrowable() {
233+
SocketAddress remoteAddr = new InetSocketAddress(80);
234+
RemoteTransportException remoteTransportException =
235+
new RemoteTransportException(
236+
"Connection unexpectedly closed by remote task manager '"
237+
+ remoteAddr
238+
+ "'. "
239+
+ "This might indicate that the remote task manager was lost.",
240+
remoteAddr,
241+
new IOException("connection reset by peer."));
242+
RuntimeException runtimeException = new RuntimeException(remoteTransportException);
243+
remoteTransportException.addSuppressed(runtimeException);
244+
return remoteTransportException;
245+
}
182246
}

0 commit comments

Comments
 (0)