Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.slf4j.LoggerFactory;

import java.sql.SQLException;
import java.util.List;
import java.util.Objects;

@Unremovable
Expand All @@ -32,7 +33,7 @@ public class TaskExecutionProcessor implements EventProcessor<TaskExecution> {

private static final Logger log = LoggerFactory.getLogger(TaskExecutionProcessor.class);

final TaskPersistence taskPersistence;
private final TaskPersistence taskPersistence;

@Inject
public TaskExecutionProcessor(TaskPersistence taskPersistence) {
Expand All @@ -41,22 +42,36 @@ public TaskExecutionProcessor(TaskPersistence taskPersistence) {

@Override
public void process(TaskExecution event) {
Objects.requireNonNull(event, "event cannot be null");
log.debug("Processing task: {}", event);
event.setId(generateTaskExecutionId(event));
processBatch(List.of(event));
}

public void processBatch(List<TaskExecution> events) {
Objects.requireNonNull(events, "events cannot be null");

if (events.isEmpty()) {
return;
}

log.debug("Processing task batch size: {}", events.size());

try {
this.taskPersistence.persist(event);
log.debug("Successfully processed the task event with ID: {}", event.getInstanceId());
for (TaskExecution event : events) {
Objects.requireNonNull(event, "event cannot be null");
event.setId(generateTaskExecutionId(event));
}

this.taskPersistence.persistBatch(events);

log.debug("Successfully processed task batch size: {}", events.size());
} catch (SQLException e) {
log.error("Error while processing the task event: {}", event, e);
throw new ProcessEventFailedException("Failed to process the task event with instance ID: " + event.getInstanceId(), e);
log.error("Error while processing task batch size: {}", events.size(), e);
throw new ProcessEventFailedException("Failed to process task event batch", e);
}
}

private String generateTaskExecutionId(TaskExecution taskExecutionEvent) {
// Generate deterministic ID based on instance's ID + task position
return taskExecutionEvent.getInstanceId() +
":" + taskExecutionEvent.getTaskPosition();
return taskExecutionEvent.getInstanceId()
+ ":"
+ taskExecutionEvent.getTaskPosition();
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.slf4j.LoggerFactory;

import java.sql.SQLException;
import java.util.List;
import java.util.Objects;

@Unremovable
Expand All @@ -40,13 +41,19 @@ public WorkflowEventProcessor(WorkflowPersistence workflowPersistence) {
}

public void process(final WorkflowInstance event) {
processBatch(List.of(event));
}

public void processBatch(final List<WorkflowInstance> events) {
log.debug("Processing workflow batch size: {}", events.size());

try {
this.workflowPersistence.persist(Objects.requireNonNull(event, "event cannot be null"));
log.debug("Successfully processed the workflow event with ID: {}", event.getId());
workflowPersistence.persistBatch(events);
log.debug("Successfully processed {} workflow events", events.size());
} catch (SQLException e) {
log.error("Error while processing the workflow event: {}", event, e);
throw new ProcessEventFailedException("Failed to process the workflow event with instance ID: " + event.getId(), e);
log.error("Error while processing workflow event batch", e);
throw new ProcessEventFailedException("Failed to process workflow event batch", e);
}
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.sql.SQLException;
import java.sql.Savepoint;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Optional;

@Unremovable
Expand Down Expand Up @@ -66,6 +67,59 @@ public void persist(TaskExecution event) throws SQLException {
}
}

public void persistBatch(List<TaskExecution> events) throws SQLException {
if (events == null || events.isEmpty()) {
return;
}

log.debug("Persisting task DB batch size: {}", events.size());

try (Connection conn = dataSource.getConnection();
PreparedStatement stmt = conn.prepareStatement(insertTaskUpsert)) {

conn.setAutoCommit(false);

try {
for (TaskExecution event : events) {
setTaskParameters(stmt, event);
stmt.addBatch();
}

int[] result = stmt.executeBatch();
conn.commit();

log.debug("Committed task DB batch size: {}, executeBatch result length: {}",
events.size(), result.length);
} catch (SQLException e) {
conn.rollback();
// A task may arrive before its workflow. The batch upsert cannot create the
// missing placeholder workflow, so fall back to per-record persistence which
// recovers from foreign key violations individually.
if (isForeignKeyViolation(e)) {
log.debug("Task batch hit foreign key violation; falling back to per-record persistence");
persistEachIndividually(events);
} else {
throw e;
}
}
}
}

private void persistEachIndividually(List<TaskExecution> events) throws SQLException {
for (TaskExecution event : events) {
persist(event);
}
}

private boolean isForeignKeyViolation(SQLException e) {
for (SQLException current = e; current != null; current = current.getNextException()) {
if (INVALID_FOREIGN_KEY.equals(current.getSQLState())) {
return true;
}
}
return false;
}

private void tryInsertTask(TaskExecution event, Connection conn) throws SQLException {
try (PreparedStatement stmt = conn.prepareStatement(insertTaskUpsert)) {
setTaskParameters(stmt, event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.sql.SQLException;
import java.sql.Types;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Optional;

@Unremovable
Expand Down Expand Up @@ -58,6 +59,31 @@ public void persist(WorkflowInstance event) throws SQLException {
}
}

public void persistBatch(List<WorkflowInstance> events) throws SQLException {
if (events == null || events.isEmpty()) {
return;
}

try (Connection conn = dataSource.getConnection();
PreparedStatement stmt = conn.prepareStatement(insertWorkflowUpsert)) {

conn.setAutoCommit(false);

try {
for (WorkflowInstance event : events) {
setWorkflowParameters(stmt, event);
stmt.addBatch();
}

stmt.executeBatch();
conn.commit();
} catch (SQLException e) {
conn.rollback();
throw e;
}
}
}

private void setWorkflowParameters(PreparedStatement stmt, WorkflowInstance event) throws SQLException {
stmt.setString(1, event.getId());
stmt.setString(2, event.getNamespace());
Expand Down Expand Up @@ -105,4 +131,4 @@ private String toJsonString(JsonNode node) {
}
}

}
}
Loading
Loading