1919import org .springframework .stereotype .Component ;
2020
2121import com .oceanbase .odc .common .event .AbstractEventListener ;
22+ import com .oceanbase .odc .core .shared .constant .TaskStatus ;
2223import com .oceanbase .odc .metadb .task .JobEntity ;
24+ import com .oceanbase .odc .service .dlm .DLMService ;
2325import com .oceanbase .odc .service .notification .Broker ;
2426import com .oceanbase .odc .service .notification .NotificationProperties ;
2527import com .oceanbase .odc .service .notification .helper .EventBuilder ;
2628import com .oceanbase .odc .service .schedule .ScheduleService ;
2729import com .oceanbase .odc .service .schedule .ScheduleTaskService ;
28- import com .oceanbase .odc .service .task .enums .JobStatus ;
2930import com .oceanbase .odc .service .task .service .TaskFrameworkService ;
3031
3132import lombok .extern .slf4j .Slf4j ;
@@ -50,6 +51,8 @@ public class JobTerminateNotifyListener extends AbstractEventListener<JobTermina
5051 private ScheduleTaskService scheduleTaskService ;
5152 @ Autowired
5253 private ScheduleService scheduleService ;
54+ @ Autowired
55+ private DLMService dlmService ;
5356
5457 @ Override
5558 public void onEvent (JobTerminateEvent event ) {
@@ -60,7 +63,11 @@ public void onEvent(JobTerminateEvent event) {
6063 JobEntity jobEntity = taskFrameworkService .find (event .getJi ().getId ());
6164 scheduleTaskService .findByJobId (jobEntity .getId ())
6265 .ifPresent (task -> {
63- broker .enqueueEvent (event .getStatus () == JobStatus .DONE ? eventBuilder .ofSucceededTask (task )
66+ TaskStatus status =
67+ "DLM" .equalsIgnoreCase (jobEntity .getJobType ())
68+ ? dlmService .getFinalTaskStatus (task .getId ())
69+ : jobEntity .getStatus ().convertTaskStatus ();
70+ broker .enqueueEvent (status == TaskStatus .DONE ? eventBuilder .ofSucceededTask (task )
6471 : eventBuilder .ofFailedTask (task ));
6572 });
6673 } catch (Exception e ) {
0 commit comments