Commit 1eb38ea

Enhance batch operations configuration by adding analytics settings for bulk update chunk size and stream buffer size. Update the DAO implementations to use these new configuration parameters for better bulk-operation performance.
1 parent 9999973 commit 1eb38ea

5 files changed: +45 −17 lines

apps/opik-backend/config.yml

Lines changed: 7 additions & 0 deletions
@@ -204,6 +204,13 @@ batchOperations:
   # Description: The maximal number of ids to be used for IN clause. Find requests with a larger number of ids will
   # involve the use of temp tables for querying
   maxExperimentInClauseSize: ${BATCH_OPERATIONS_MAX_EXPERIMENT_IN_CLAUSE_SIZE:-5000}
+  analytics:
+    # Default: 100
+    # Description: Chunk size for bulk update operations (traces, spans, threads). ClickHouse performs best with batch sizes of 100-1000 records per query.
+    bulkUpdateChunkSize: ${BATCH_OPERATIONS_BULK_UPDATE_CHUNK_SIZE:-100}
+    # Default: 1000
+    # Description: Buffer size for streaming operations to batch records for efficient processing
+    streamBufferSize: ${BATCH_OPERATIONS_STREAM_BUFFER_SIZE:-1000}
 
 # Configuration for rate limit. This is not enabled by default for open source installations.
 # If enabled, rate limit is applied to creation and update of various entities including traces, spans, projects,
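
The ${VAR:-default} syntax above is Dropwizard-style environment substitution: the variable's value is used when it is set, and the fallback after :- applies otherwise. For illustration, with neither variable set, the new block resolves to:

analytics:
  bulkUpdateChunkSize: 100
  streamBufferSize: 1000

Setting BATCH_OPERATIONS_BULK_UPDATE_CHUNK_SIZE or BATCH_OPERATIONS_STREAM_BUFFER_SIZE in the backend's environment overrides the corresponding value without editing the file.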

apps/opik-backend/src/main/java/com/comet/opik/domain/SpanDAO.java

Lines changed: 4 additions & 5 deletions
@@ -2365,12 +2365,11 @@ public Mono<Void> bulkUpdateTags(@NonNull Map<UUID, SpanUpdate> idToUpdateMap) {
                 .map(id -> idToUpdateMap.get(id).tags().toArray(new String[0]))
                 .toList();
 
-        int CHUNK_SIZE = 100;
-
-        return Flux.range(0, (ids.size() + CHUNK_SIZE - 1) / CHUNK_SIZE)
+        int chunkSize = configuration.getBatchOperations().getAnalytics().getBulkUpdateChunkSize();
+        return Flux.range(0, (ids.size() + chunkSize - 1) / chunkSize)
                 .flatMap(chunkIndex -> {
-                    int start = chunkIndex * CHUNK_SIZE;
-                    int end = Math.min(start + CHUNK_SIZE, ids.size());
+                    int start = chunkIndex * chunkSize;
+                    int end = Math.min(start + chunkSize, ids.size());
 
                     List<UUID> chunkIds = ids.subList(start, end);
                     List<String[]> chunkTags = tagsArrays.subList(start, end);
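
The chunk count here is ceiling division: (ids.size() + chunkSize - 1) / chunkSize is the smallest number of chunkSize-sized slices that covers every id. The same pattern recurs in TraceDAO and TraceThreadDAO below. A minimal, self-contained sketch of the idea, with processChunk as a hypothetical stand-in for the real per-chunk ClickHouse update:

import java.util.List;
import java.util.UUID;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ChunkingSketch {

    // Splits ids into chunkSize-sized sublists and runs one update per chunk.
    static Mono<Void> bulkUpdate(List<UUID> ids, int chunkSize) {
        // Ceiling division: enough chunk indexes to cover every id.
        return Flux.range(0, (ids.size() + chunkSize - 1) / chunkSize)
                .flatMap(chunkIndex -> {
                    int start = chunkIndex * chunkSize;
                    int end = Math.min(start + chunkSize, ids.size());
                    return processChunk(ids.subList(start, end));
                })
                .then(); // completes once every chunk has been processed
    }

    // Hypothetical stand-in for the per-chunk update query.
    static Mono<Void> processChunk(List<UUID> chunk) {
        System.out.println("updating " + chunk.size() + " rows");
        return Mono.empty();
    }
}

Note that flatMap subscribes to the chunks concurrently, so they may complete out of order; concatMap would process them sequentially if ordering mattered.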

apps/opik-backend/src/main/java/com/comet/opik/domain/TraceDAO.java

Lines changed: 4 additions & 5 deletions
@@ -3614,12 +3614,11 @@ public Mono<Void> bulkUpdateTags(@NonNull Map<UUID, TraceUpdate> idToUpdateMap)
                 .map(id -> idToUpdateMap.get(id).tags().toArray(new String[0]))
                 .toList();
 
-        int CHUNK_SIZE = 100;
-
-        return Flux.range(0, (ids.size() + CHUNK_SIZE - 1) / CHUNK_SIZE)
+        int chunkSize = configuration.getBatchOperations().getAnalytics().getBulkUpdateChunkSize();
+        return Flux.range(0, (ids.size() + chunkSize - 1) / chunkSize)
                 .flatMap(chunkIndex -> {
-                    int start = chunkIndex * CHUNK_SIZE;
-                    int end = Math.min(start + CHUNK_SIZE, ids.size());
+                    int start = chunkIndex * chunkSize;
+                    int end = Math.min(start + chunkSize, ids.size());
 
                     List<UUID> chunkIds = ids.subList(start, end);
                     List<String[]> chunkTags = tagsArrays.subList(start, end);

apps/opik-backend/src/main/java/com/comet/opik/domain/threads/TraceThreadDAO.java

Lines changed: 11 additions & 7 deletions
@@ -4,6 +4,7 @@
 import com.comet.opik.api.TraceThreadStatus;
 import com.comet.opik.api.TraceThreadUpdate;
 import com.comet.opik.api.events.ProjectWithPendingClosureTraceThreads;
+import com.comet.opik.infrastructure.OpikConfiguration;
 import com.comet.opik.infrastructure.db.TransactionTemplateAsync;
 import com.comet.opik.infrastructure.instrumentation.InstrumentAsyncUtils;
 import com.comet.opik.utils.TemplateUtils;
@@ -284,6 +285,7 @@ AND last_updated_at < parseDateTime64BestEffort(:last_updated_at, 6)
 
     private final @NonNull TransactionTemplateAsync asyncTemplate;
     private final @NonNull ConnectionFactory connectionFactory;
+    private final @NonNull OpikConfiguration configuration;
 
     @Override
     public Mono<Long> save(@NonNull List<TraceThreadModel> traceThreads) {
@@ -569,7 +571,7 @@ public Flux<List<TraceThreadModel>> streamPendingClosureThreads(@NonNull UUID pr
 
             return makeFluxContextAware(bindWorkspaceIdToFlux(statement))
                     .flatMap(result -> result.map((row, rowMetadata) -> TraceThreadMapper.INSTANCE.mapFromRow(row)));
-        }).buffer(1000);
+        }).buffer(configuration.getBatchOperations().getAnalytics().getStreamBufferSize());
     }
 
     private void bindTemplateParam(TraceThreadCriteria criteria, ST template) {
@@ -620,7 +622,10 @@ public Flux<ThreadIdWithTagsAndMetadata> getIdsTagsAndMetadata(@NonNull List<UUI
             var statement = connection.createStatement(SELECT_IDS_TAGS_AND_METADATA)
                     .bind("ids", ids);
 
-            return makeFluxContextAware(bindWorkspaceIdToFlux(statement));
+            Segment segment = startSegment("trace_threads", "Clickhouse", "get_ids_tags_and_metadata");
+
+            return makeFluxContextAware(bindWorkspaceIdToFlux(statement))
+                    .doFinally(signalType -> endSegment(segment));
         })
                 .flatMap(result -> result.map((row, metadata) -> {
                     var tags = row.get("tags", String[].class);
@@ -679,12 +684,11 @@ public Mono<Void> bulkUpdateTags(@NonNull Map<ThreadIdWithTagsAndMetadata, Trace
                 .toList();
         UUID projectId = metas.get(0).projectId(); // All should have same projectId
 
-        int CHUNK_SIZE = 100;
-
-        return Flux.range(0, (ids.size() + CHUNK_SIZE - 1) / CHUNK_SIZE)
+        int chunkSize = configuration.getBatchOperations().getAnalytics().getBulkUpdateChunkSize();
+        return Flux.range(0, (ids.size() + chunkSize - 1) / chunkSize)
                 .flatMap(chunkIndex -> {
-                    int start = chunkIndex * CHUNK_SIZE;
-                    int end = Math.min(start + CHUNK_SIZE, ids.size());
+                    int start = chunkIndex * chunkSize;
+                    int end = Math.min(start + chunkSize, ids.size());
 
                     List<UUID> chunkIds = ids.subList(start, end);
                     List<String[]> chunkTags = tagsArrays.subList(start, end);
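
Beyond the chunking change, this file makes two further adjustments: streamPendingClosureThreads now batches rows with the configured buffer size instead of a hard-coded 1000, and getIdsTagsAndMetadata gains a Segment that doFinally ends on completion, error, or cancellation alike. On the first point, Reactor's buffer(n) turns a stream of single rows into a stream of lists of up to n rows; a minimal sketch of that behavior:

import reactor.core.publisher.Flux;

public class BufferSketch {
    public static void main(String[] args) {
        int streamBufferSize = 3; // the diff's default is 1000

        // buffer(n) groups individual emissions into List batches of up to n,
        // so downstream code handles one batch per signal instead of one row.
        Flux.range(1, 8) // stands in for rows streamed from ClickHouse
                .buffer(streamBufferSize)
                .subscribe(batch -> System.out.println("batch: " + batch));
        // prints: batch: [1, 2, 3], batch: [4, 5, 6], batch: [7, 8]
    }
}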

apps/opik-backend/src/main/java/com/comet/opik/infrastructure/BatchOperationsConfig.java

Lines changed: 19 additions & 0 deletions
@@ -14,7 +14,26 @@ public static class DatasetsConfig {
         private @Valid @JsonProperty @Positive int maxExperimentInClauseSize;
     }
 
+    @Data
+    public static class AnalyticsConfig {
+        /**
+         * Chunk size for bulk update operations (traces, spans, threads).
+         * ClickHouse performs best with batch sizes of 100-1000 records per query.
+         * Default: 100
+         */
+        private @Valid @JsonProperty @Positive int bulkUpdateChunkSize = 100;
+
+        /**
+         * Buffer size for streaming operations to batch records for efficient processing.
+         * Default: 1000
+         */
+        private @Valid @JsonProperty @Positive int streamBufferSize = 1000;
+    }
+
     @Valid @JsonProperty
     @NotNull private DatasetsConfig datasets;
 
+    @Valid @JsonProperty
+    @NotNull private AnalyticsConfig analytics = new AnalyticsConfig();
+
 }
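
Since analytics is initialized to new AnalyticsConfig() and both settings carry field initializers, a config.yml that predates the analytics block still yields working defaults: values are only overwritten when they appear in the YAML. A small sketch of that fallback, assuming the getters generated by Lombok's @Data:

public class AnalyticsDefaultsSketch {
    public static void main(String[] args) {
        // With no analytics block in config.yml, the field initializers apply.
        var analytics = new BatchOperationsConfig.AnalyticsConfig();
        System.out.println(analytics.getBulkUpdateChunkSize()); // 100
        System.out.println(analytics.getStreamBufferSize());    // 1000
    }
}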
