Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.weibo.rill.flow.interfaces.model.strategy.Timeline;
import com.weibo.rill.flow.interfaces.model.task.BaseTask;
import com.weibo.rill.flow.interfaces.model.task.TaskInfo;
import com.weibo.rill.flow.interfaces.model.task.TaskInvokeMsg;
import com.weibo.rill.flow.interfaces.model.task.TaskStatus;
Expand All @@ -31,11 +33,11 @@
import com.weibo.rill.flow.olympicene.core.runtime.DAGInfoStorage;
import com.weibo.rill.flow.olympicene.core.runtime.DAGStorageProcedure;
import com.weibo.rill.flow.olympicene.core.switcher.SwitcherManager;
import com.weibo.rill.flow.olympicene.traversal.utils.ConditionsUtil;
import com.weibo.rill.flow.olympicene.traversal.constant.TraversalErrorCode;
import com.weibo.rill.flow.olympicene.traversal.exception.DAGTraversalException;
import com.weibo.rill.flow.olympicene.traversal.helper.ContextHelper;
import com.weibo.rill.flow.olympicene.traversal.mappings.InputOutputMapping;
import com.weibo.rill.flow.olympicene.traversal.utils.ConditionsUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
Expand Down Expand Up @@ -117,7 +119,25 @@ public ExecutionResult finish(String executionId, NotifyInfo notifyInfo, Map<Str
if (notifyInfo.getTaskStatus() != null && notifyInfo.getTaskStatus().isCompleted()) {
taskInfo.updateInvokeMsg(notifyInfo.getTaskInvokeMsg());
updateTaskInvokeEndTime(taskInfo);
taskInfo.setTaskStatus(notifyInfo.getTaskStatus());
TaskStatus finalStatus = notifyInfo.getTaskStatus();
// 仅超时(msg="timeout")时,若 skipOnTimeout=true 且 tolerance=true,则将状态置为 SKIPPED 而非 FAILED
boolean isTimeout = java.util.Optional.ofNullable(notifyInfo.getTaskInvokeMsg())
.map(TaskInvokeMsg::getMsg)
.map("timeout"::equals)
.orElse(false);
if (finalStatus == TaskStatus.FAILED && isTimeout) {
boolean tolerance = java.util.Optional.ofNullable(taskInfo.getTask())
.map(BaseTask::isTolerance).orElse(false);
boolean skipOnTimeout = java.util.Optional.ofNullable(taskInfo.getTask())
.map(BaseTask::getTimeline)
.map(Timeline::getSkipOnTimeout)
.map(Boolean::parseBoolean)
.orElse(false);
if (tolerance && skipOnTimeout) {
finalStatus = TaskStatus.SKIPPED;
}
}
taskInfo.setTaskStatus(finalStatus);
dagInfoStorage.saveTaskInfos(executionId, ImmutableSet.of(taskInfo));
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import com.weibo.rill.flow.olympicene.core.model.NotifyInfo
import com.weibo.rill.flow.olympicene.core.model.dag.DAG
import com.weibo.rill.flow.olympicene.core.model.dag.DAGStatus
import com.weibo.rill.flow.interfaces.model.task.TaskInfo
import com.weibo.rill.flow.interfaces.model.task.TaskInvokeMsg
import com.weibo.rill.flow.interfaces.model.task.TaskStatus
import com.weibo.rill.flow.olympicene.core.runtime.DAGParser
import com.weibo.rill.flow.olympicene.core.runtime.DAGStorageProcedure
Expand Down Expand Up @@ -219,4 +220,156 @@ class SuspenseTaskTraversalTest extends Specification {
["url": "bbb"] | DAGEvent.TASK_FINISH
["text": "aaa"] | DAGEvent.TASK_FAILED
}
def "suspense task should be SKIPPED on timeout when skipOnTimeout=true and tolerance=true"() {
given:
// skipOnTimeout=true 且 tolerance=true,超时后节点应变为 SKIPPED
String text = "version: 0.0.1\n" +
"namespace: olympicene\n" +
"service: mca\n" +
"name: test\n" +
"type: flow\n" +
"tasks: \n" +
"- category: suspense\n" +
" name: A\n" +
" tolerance: true\n" +
" timeline:\n" +
" timeoutInSeconds: \"120\"\n" +
" skipOnTimeout: \"true\"\n" +
" conditions:\n" +
" - \$.input.[?(@.url == \"bbb\")]\n" +
" next: B\n" +
"- category: function\n" +
" name: B\n" +
" resourceName: \"olympicene::test::funtion1::prod\" \n" +
" pattern: task_scheduler\n" +
" inputMappings:\n" +
" - target: \$.input.gopUrls\n" +
" source: \$.context.gopUrls\n" +
" outputMappings:\n" +
" - target: \$.context.url\n" +
" source: \$.output.url"
DAG dag = dagParser.parse(text)

when:
// 提交 DAG,suspense 节点进入 RUNNING 状态等待唤醒
olympicene.submit('timeout_skip_1', dag, [:])
// 模拟超时:TimeCheckRunner 会以 taskStatus=FAILED, msg="timeout" 调用 finishTaskSync
olympicene.wakeup('timeout_skip_1', [:],
NotifyInfo.builder()
.taskInfoName('A')
.taskStatus(TaskStatus.FAILED)
.taskInvokeMsg(TaskInvokeMsg.builder().msg("timeout").build())
.build())
TaskInfo taskInfo = dagStorage.getBasicTaskInfo('timeout_skip_1', 'A')

then:
// skipOnTimeout=true && tolerance=true && timeout → 应为 SKIPPED
taskInfo.getTaskStatus() == TaskStatus.SKIPPED
}

def "suspense task should be FAILED on timeout when skipOnTimeout=false"() {
given:
// skipOnTimeout=false,超时后节点应触发 TASK_FAILED 事件(不被跳过)
String text = "version: 0.0.1\n" +
"namespace: olympicene\n" +
"service: mca\n" +
"name: test\n" +
"type: flow\n" +
"tasks: \n" +
"- category: suspense\n" +
" name: A\n" +
" tolerance: true\n" +
" timeline:\n" +
" timeoutInSeconds: \"120\"\n" +
" skipOnTimeout: \"false\"\n" +
" conditions:\n" +
" - \$.input.[?(@.url == \"bbb\")]\n"
DAG dag = dagParser.parse(text)

when:
olympicene.submit('timeout_noskip_1', dag, [:])
olympicene.wakeup('timeout_noskip_1', [:],
NotifyInfo.builder()
.taskInfoName('A')
.taskStatus(TaskStatus.FAILED)
.taskInvokeMsg(TaskInvokeMsg.builder().msg("timeout").build())
.build())

then:
// skipOnTimeout=false → 触发 TASK_FAILED 回调事件,而不是 TASK_SKIPPED
1 * callback.onEvent({
Event event -> event.eventCode == DAGEvent.TASK_FAILED.getCode()
})
}

def "suspense task should be FAILED on timeout when tolerance=false even if skipOnTimeout=true"() {
given:
// tolerance=false,即使 skipOnTimeout=true,超时后也应触发 TASK_FAILED 事件
String text = "version: 0.0.1\n" +
"namespace: olympicene\n" +
"service: mca\n" +
"name: test\n" +
"type: flow\n" +
"tasks: \n" +
"- category: suspense\n" +
" name: A\n" +
" tolerance: false\n" +
" timeline:\n" +
" timeoutInSeconds: \"120\"\n" +
" skipOnTimeout: \"true\"\n" +
" conditions:\n" +
" - \$.input.[?(@.url == \"bbb\")]\n"
DAG dag = dagParser.parse(text)

when:
olympicene.submit('timeout_tol_false_1', dag, [:])
olympicene.wakeup('timeout_tol_false_1', [:],
NotifyInfo.builder()
.taskInfoName('A')
.taskStatus(TaskStatus.FAILED)
.taskInvokeMsg(TaskInvokeMsg.builder().msg("timeout").build())
.build())

then:
// tolerance=false → 触发 TASK_FAILED 回调事件,而不是 TASK_SKIPPED
1 * callback.onEvent({
Event event -> event.eventCode == DAGEvent.TASK_FAILED.getCode()
})
}
def "suspense task should remain FAILED on interruption even if skipOnTimeout=true and tolerance=true"() {
given:
// tolerance=true 且 skipOnTimeout=true,但触发的是打断而非超时,应保持 FAILED
String text = "version: 0.0.1\n" +
"namespace: olympicene\n" +
"service: mca\n" +
"name: test\n" +
"type: flow\n" +
"tasks: \n" +
"- category: suspense\n" +
" name: A\n" +
" tolerance: true\n" +
" timeline:\n" +
" timeoutInSeconds: \"120\"\n" +
" skipOnTimeout: \"true\"\n" +
" inputMappings:\n" +
" - target: \$.input.url\n" +
" source: \$.context.url\n" +
" - target: \$.input.text\n" +
" source: \$.context.text\n" +
" conditions:\n" +
" - \$.input.[?(@.url == \"bbb\")]\n" +
" interruptions:\n" +
" - \$.input.[?(@.text == \"aaa\")]\n"
DAG dag = dagParser.parse(text)

when:
// 提交 DAG,初始上下文触发 interruption 条件(text=aaa)
olympicene.submit('interrupt_with_skip_flag_1', dag, ["text": "aaa"])

then:
// 虽然 tolerance=true && skipOnTimeout=true,但触发的是打断而非超时,应为 TASK_FAILED 而非 TASK_SKIPPED
1 * callback.onEvent({
Event event -> event.eventCode == DAGEvent.TASK_FAILED.getCode()
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,19 @@
@Getter
public class Timeline {
private String timeoutInSeconds;
private String skipOnTimeout;
private String suspenseIntervalSeconds;
private String suspenseTimestamp;

@JsonCreator
public Timeline(@JsonProperty("timeoutInSeconds") String timeoutInSeconds,
@JsonProperty("skipOnTimeout") String skipOnTimeout,
@JsonProperty("suspenseIntervalSeconds") String suspenseIntervalSeconds,
@JsonProperty("suspenseTimestamp") String suspenseTimestamp) {
this.timeoutInSeconds = timeoutInSeconds;
this.skipOnTimeout = skipOnTimeout;
this.suspenseIntervalSeconds = suspenseIntervalSeconds;
this.suspenseTimestamp = suspenseTimestamp;
}
}

Loading