Skip to content

Commit 7c1f7f5

Browse files
[OPIK-2497] [BE] Add batch tag update endpoints for traces, spans and threads
1 parent 1fe1337 commit 7c1f7f5

File tree

23 files changed

+1995
-75
lines changed

23 files changed

+1995
-75
lines changed
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package com.comet.opik.api;
2+
3+
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
4+
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
5+
import com.fasterxml.jackson.databind.annotation.JsonNaming;
6+
import io.swagger.v3.oas.annotations.media.Schema;
7+
import jakarta.validation.Valid;
8+
import jakarta.validation.constraints.NotEmpty;
9+
import jakarta.validation.constraints.NotNull;
10+
import jakarta.validation.constraints.Size;
11+
import lombok.Builder;
12+
13+
import java.util.Set;
14+
import java.util.UUID;
15+
16+
@Builder(toBuilder = true)
17+
@JsonIgnoreProperties(ignoreUnknown = true)
18+
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
19+
@Schema(description = "Request to batch update multiple spans")
20+
public record SpanBatchUpdate(
21+
@NotNull @NotEmpty @Size(min = 1, max = 1000) @Schema(description = "List of span IDs to update (max 1000)") Set<UUID> ids,
22+
@NotNull @Valid @Schema(description = "Update to apply to all spans") SpanUpdate update,
23+
@Schema(description = "If true, merge tags with existing tags instead of replacing them. Default: false") Boolean mergeTags) {
24+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package com.comet.opik.api;
2+
3+
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
4+
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
5+
import com.fasterxml.jackson.databind.annotation.JsonNaming;
6+
import io.swagger.v3.oas.annotations.media.Schema;
7+
import jakarta.validation.Valid;
8+
import jakarta.validation.constraints.NotEmpty;
9+
import jakarta.validation.constraints.NotNull;
10+
import jakarta.validation.constraints.Size;
11+
import lombok.Builder;
12+
13+
import java.util.Set;
14+
import java.util.UUID;
15+
16+
@Builder(toBuilder = true)
17+
@JsonIgnoreProperties(ignoreUnknown = true)
18+
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
19+
@Schema(description = "Request to batch update multiple traces")
20+
public record TraceBatchUpdate(
21+
@NotNull @NotEmpty @Size(min = 1, max = 1000) @Schema(description = "List of trace IDs to update (max 1000)") Set<UUID> ids,
22+
@NotNull @Valid @Schema(description = "Update to apply to all traces") TraceUpdate update,
23+
@Schema(description = "If true, merge tags with existing tags instead of replacing them. Default: false") Boolean mergeTags) {
24+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package com.comet.opik.api;
2+
3+
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
4+
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
5+
import com.fasterxml.jackson.databind.annotation.JsonNaming;
6+
import io.swagger.v3.oas.annotations.media.Schema;
7+
import jakarta.validation.Valid;
8+
import jakarta.validation.constraints.NotEmpty;
9+
import jakarta.validation.constraints.NotNull;
10+
import jakarta.validation.constraints.Size;
11+
import lombok.Builder;
12+
13+
import java.util.Set;
14+
import java.util.UUID;
15+
16+
@Builder(toBuilder = true)
17+
@JsonIgnoreProperties(ignoreUnknown = true)
18+
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
19+
@Schema(description = "Request to batch update multiple trace threads")
20+
public record TraceThreadBatchUpdate(
21+
@NotNull @NotEmpty @Size(min = 1, max = 1000) @Schema(description = "List of thread model IDs to update (max 1000)") Set<UUID> ids,
22+
@NotNull @Valid @Schema(description = "Update to apply to all threads") TraceThreadUpdate update,
23+
@Schema(description = "If true, merge tags with existing tags instead of replacing them. Default: false") Boolean mergeTags) {
24+
}

apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/priv/SpansResource.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import com.comet.opik.api.ProjectStats;
1313
import com.comet.opik.api.Span;
1414
import com.comet.opik.api.SpanBatch;
15+
import com.comet.opik.api.SpanBatchUpdate;
1516
import com.comet.opik.api.SpanSearchStreamRequest;
1617
import com.comet.opik.api.SpanUpdate;
1718
import com.comet.opik.api.filter.FiltersFactory;
@@ -232,6 +233,28 @@ public Response createSpans(
232233
return Response.noContent().build();
233234
}
234235

236+
@PATCH
237+
@Path("/batch")
238+
@Operation(operationId = "batchUpdateSpans", summary = "Batch update spans", description = "Update multiple spans", responses = {
239+
@ApiResponse(responseCode = "204", description = "No Content"),
240+
@ApiResponse(responseCode = "400", description = "Bad Request", content = @Content(schema = @Schema(implementation = ErrorMessage.class)))})
241+
@RateLimited
242+
public Response batchUpdate(
243+
@RequestBody(content = @Content(schema = @Schema(implementation = SpanBatchUpdate.class))) @Valid @NotNull SpanBatchUpdate batchUpdate) {
244+
245+
String workspaceId = requestContext.get().getWorkspaceId();
246+
247+
log.info("Batch updating '{}' spans on workspaceId '{}'", batchUpdate.ids().size(), workspaceId);
248+
249+
spanService.batchUpdate(batchUpdate)
250+
.contextWrite(ctx -> setRequestContext(ctx, requestContext))
251+
.block();
252+
253+
log.info("Batch updated '{}' spans on workspaceId '{}'", batchUpdate.ids().size(), workspaceId);
254+
255+
return Response.noContent().build();
256+
}
257+
235258
@PATCH
236259
@Path("{id}")
237260
@Operation(operationId = "updateSpan", summary = "Update span by id", description = "Update span by id", responses = {

apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/priv/TracesResource.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@
1616
import com.comet.opik.api.Trace;
1717
import com.comet.opik.api.Trace.TracePage;
1818
import com.comet.opik.api.TraceBatch;
19+
import com.comet.opik.api.TraceBatchUpdate;
1920
import com.comet.opik.api.TraceSearchStreamRequest;
2021
import com.comet.opik.api.TraceThread;
2122
import com.comet.opik.api.TraceThreadBatchIdentifier;
23+
import com.comet.opik.api.TraceThreadBatchUpdate;
2224
import com.comet.opik.api.TraceThreadIdentifier;
2325
import com.comet.opik.api.TraceThreadSearchStreamRequest;
2426
import com.comet.opik.api.TraceThreadUpdate;
@@ -305,6 +307,28 @@ public Response createTraces(
305307
return Response.noContent().build();
306308
}
307309

310+
@PATCH
311+
@Path("/batch")
312+
@Operation(operationId = "batchUpdateTraces", summary = "Batch update traces", description = "Update multiple traces", responses = {
313+
@ApiResponse(responseCode = "204", description = "No Content"),
314+
@ApiResponse(responseCode = "400", description = "Bad Request", content = @Content(schema = @Schema(implementation = ErrorMessage.class)))})
315+
@RateLimited
316+
public Response batchUpdate(
317+
@RequestBody(content = @Content(schema = @Schema(implementation = TraceBatchUpdate.class))) @Valid @NotNull TraceBatchUpdate batchUpdate) {
318+
319+
String workspaceId = requestContext.get().getWorkspaceId();
320+
321+
log.info("Batch updating '{}' traces on workspaceId '{}'", batchUpdate.ids().size(), workspaceId);
322+
323+
service.batchUpdate(batchUpdate)
324+
.contextWrite(ctx -> setRequestContext(ctx, requestContext))
325+
.block();
326+
327+
log.info("Batch updated '{}' traces on workspaceId '{}'", batchUpdate.ids().size(), workspaceId);
328+
329+
return Response.noContent().build();
330+
}
331+
308332
@PATCH
309333
@Path("{id}")
310334
@Operation(operationId = "updateTrace", summary = "Update trace by id", description = "Update trace by id", responses = {
@@ -787,6 +811,28 @@ public Response closeTraceThread(
787811
return Response.noContent().build();
788812
}
789813

814+
@PATCH
815+
@Path("/threads/batch")
816+
@Operation(operationId = "batchUpdateThreads", summary = "Batch update threads", description = "Update multiple threads", responses = {
817+
@ApiResponse(responseCode = "204", description = "No Content"),
818+
@ApiResponse(responseCode = "400", description = "Bad Request", content = @Content(schema = @Schema(implementation = ErrorMessage.class)))})
819+
@RateLimited
820+
public Response batchUpdateThreads(
821+
@RequestBody(content = @Content(schema = @Schema(implementation = TraceThreadBatchUpdate.class))) @Valid @NotNull TraceThreadBatchUpdate batchUpdate) {
822+
823+
String workspaceId = requestContext.get().getWorkspaceId();
824+
825+
log.info("Batch updating '{}' threads on workspaceId '{}'", batchUpdate.ids().size(), workspaceId);
826+
827+
traceThreadService.batchUpdate(batchUpdate)
828+
.contextWrite(ctx -> setRequestContext(ctx, requestContext))
829+
.block();
830+
831+
log.info("Batch updated '{}' threads on workspaceId '{}'", batchUpdate.ids().size(), workspaceId);
832+
833+
return Response.noContent().build();
834+
}
835+
790836
@PATCH
791837
@Path("/threads/{threadModelId}")
792838
@Operation(operationId = "updateThread", summary = "Update thread", description = "Update thread", responses = {

apps/opik-backend/src/main/java/com/comet/opik/domain/SpanDAO.java

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2253,6 +2253,168 @@ private void bindCost(Span span, Statement statement, String index) {
22532253
}
22542254
}
22552255

2256+
private static final String BULK_UPDATE = """
2257+
INSERT INTO spans (
2258+
id,
2259+
project_id,
2260+
workspace_id,
2261+
trace_id,
2262+
parent_span_id,
2263+
name,
2264+
type,
2265+
start_time,
2266+
end_time,
2267+
input,
2268+
output,
2269+
metadata,
2270+
model,
2271+
provider,
2272+
total_estimated_cost,
2273+
total_estimated_cost_version,
2274+
tags,
2275+
usage,
2276+
error_info,
2277+
created_at,
2278+
created_by,
2279+
last_updated_by,
2280+
truncation_threshold
2281+
)
2282+
SELECT
2283+
s.id,
2284+
s.project_id,
2285+
s.workspace_id,
2286+
s.trace_id,
2287+
s.parent_span_id,
2288+
<if(name)> :name <else> s.name <endif> as name,
2289+
<if(type)> :type <else> s.type <endif> as type,
2290+
s.start_time,
2291+
<if(end_time)> parseDateTime64BestEffort(:end_time, 9) <else> s.end_time <endif> as end_time,
2292+
<if(input)> :input <else> s.input <endif> as input,
2293+
<if(output)> :output <else> s.output <endif> as output,
2294+
<if(metadata)> :metadata <else> s.metadata <endif> as metadata,
2295+
<if(model)> :model <else> s.model <endif> as model,
2296+
<if(provider)> :provider <else> s.provider <endif> as provider,
2297+
<if(total_estimated_cost)> toDecimal128(:total_estimated_cost, 12) <else> s.total_estimated_cost <endif> as total_estimated_cost,
2298+
<if(total_estimated_cost_version)> :total_estimated_cost_version <else> s.total_estimated_cost_version <endif> as total_estimated_cost_version,
2299+
<if(tags)><if(merge_tags)>arrayConcat(s.tags, :tags)<else>:tags<endif><else>s.tags<endif> as tags,
2300+
<if(usage)> CAST((:usageKeys, :usageValues), 'Map(String, Int64)') <else> s.usage <endif> as usage,
2301+
<if(error_info)> :error_info <else> s.error_info <endif> as error_info,
2302+
s.created_at,
2303+
s.created_by,
2304+
:user_name as last_updated_by,
2305+
:truncation_threshold
2306+
FROM spans s
2307+
WHERE s.id IN :ids AND s.workspace_id = :workspace_id
2308+
ORDER BY (s.workspace_id, s.project_id, s.trace_id, s.parent_span_id, s.id) DESC, s.last_updated_at DESC
2309+
LIMIT 1 BY s.id;
2310+
""";
2311+
2312+
@WithSpan
2313+
public Mono<Void> bulkUpdate(@NonNull Set<UUID> ids, @NonNull SpanUpdate update, boolean mergeTags) {
2314+
Preconditions.checkArgument(!ids.isEmpty(), "ids must not be empty");
2315+
log.info("Bulk updating '{}' spans", ids.size());
2316+
2317+
var template = newBulkUpdateTemplate(update, BULK_UPDATE, mergeTags);
2318+
var query = template.render();
2319+
2320+
return Mono.from(connectionFactory.create())
2321+
.flatMapMany(connection -> {
2322+
var statement = connection.createStatement(query)
2323+
.bind("ids", ids);
2324+
2325+
bindBulkUpdateParams(update, statement);
2326+
TruncationUtils.bindTruncationThreshold(statement, "truncation_threshold", configuration);
2327+
2328+
Segment segment = startSegment("spans", "Clickhouse", "bulk_update");
2329+
2330+
return makeFluxContextAware(bindUserNameAndWorkspaceContextToStream(statement))
2331+
.doFinally(signalType -> endSegment(segment));
2332+
})
2333+
.then()
2334+
.doOnSuccess(__ -> log.info("Completed bulk update for '{}' spans", ids.size()));
2335+
}
2336+
2337+
private ST newBulkUpdateTemplate(SpanUpdate spanUpdate, String sql, boolean mergeTags) {
2338+
var template = new ST(sql);
2339+
2340+
if (StringUtils.isNotBlank(spanUpdate.name())) {
2341+
template.add("name", spanUpdate.name());
2342+
}
2343+
Optional.ofNullable(spanUpdate.type())
2344+
.ifPresent(type -> template.add("type", type.toString()));
2345+
Optional.ofNullable(spanUpdate.input())
2346+
.ifPresent(input -> template.add("input", input.toString()));
2347+
Optional.ofNullable(spanUpdate.output())
2348+
.ifPresent(output -> template.add("output", output.toString()));
2349+
Optional.ofNullable(spanUpdate.tags())
2350+
.ifPresent(tags -> {
2351+
template.add("tags", tags.toString());
2352+
template.add("merge_tags", mergeTags);
2353+
});
2354+
Optional.ofNullable(spanUpdate.metadata())
2355+
.ifPresent(metadata -> template.add("metadata", metadata.toString()));
2356+
if (StringUtils.isNotBlank(spanUpdate.model())) {
2357+
template.add("model", spanUpdate.model());
2358+
}
2359+
if (StringUtils.isNotBlank(spanUpdate.provider())) {
2360+
template.add("provider", spanUpdate.provider());
2361+
}
2362+
Optional.ofNullable(spanUpdate.endTime())
2363+
.ifPresent(endTime -> template.add("end_time", endTime.toString()));
2364+
Optional.ofNullable(spanUpdate.usage())
2365+
.ifPresent(usage -> template.add("usage", usage.toString()));
2366+
Optional.ofNullable(spanUpdate.errorInfo())
2367+
.ifPresent(errorInfo -> template.add("error_info", JsonUtils.readTree(errorInfo).toString()));
2368+
2369+
if (spanUpdate.totalEstimatedCost() != null) {
2370+
template.add("total_estimated_cost", "total_estimated_cost");
2371+
template.add("total_estimated_cost_version", "total_estimated_cost_version");
2372+
}
2373+
return template;
2374+
}
2375+
2376+
private void bindBulkUpdateParams(SpanUpdate spanUpdate, Statement statement) {
2377+
if (StringUtils.isNotBlank(spanUpdate.name())) {
2378+
statement.bind("name", spanUpdate.name());
2379+
}
2380+
Optional.ofNullable(spanUpdate.type())
2381+
.ifPresent(type -> statement.bind("type", type.toString()));
2382+
Optional.ofNullable(spanUpdate.input())
2383+
.ifPresent(input -> statement.bind("input", input.toString()));
2384+
Optional.ofNullable(spanUpdate.output())
2385+
.ifPresent(output -> statement.bind("output", output.toString()));
2386+
Optional.ofNullable(spanUpdate.tags())
2387+
.ifPresent(tags -> statement.bind("tags", tags.toArray(String[]::new)));
2388+
Optional.ofNullable(spanUpdate.usage())
2389+
.ifPresent(usage -> {
2390+
var usageKeys = new ArrayList<String>();
2391+
var usageValues = new ArrayList<Integer>();
2392+
for (var entry : usage.entrySet()) {
2393+
usageKeys.add(entry.getKey());
2394+
usageValues.add(entry.getValue());
2395+
}
2396+
statement.bind("usageKeys", usageKeys.toArray(String[]::new));
2397+
statement.bind("usageValues", usageValues.toArray(Integer[]::new));
2398+
});
2399+
Optional.ofNullable(spanUpdate.endTime())
2400+
.ifPresent(endTime -> statement.bind("end_time", endTime.toString()));
2401+
Optional.ofNullable(spanUpdate.metadata())
2402+
.ifPresent(metadata -> statement.bind("metadata", metadata.toString()));
2403+
if (StringUtils.isNotBlank(spanUpdate.model())) {
2404+
statement.bind("model", spanUpdate.model());
2405+
}
2406+
if (StringUtils.isNotBlank(spanUpdate.provider())) {
2407+
statement.bind("provider", spanUpdate.provider());
2408+
}
2409+
Optional.ofNullable(spanUpdate.errorInfo())
2410+
.ifPresent(errorInfo -> statement.bind("error_info", JsonUtils.readTree(errorInfo).toString()));
2411+
2412+
if (spanUpdate.totalEstimatedCost() != null) {
2413+
statement.bind("total_estimated_cost", spanUpdate.totalEstimatedCost().toString());
2414+
statement.bind("total_estimated_cost_version", "");
2415+
}
2416+
}
2417+
22562418
private JsonNode getMetadataWithProvider(Row row, Set<SpanField> exclude, String provider) {
22572419
// Parse base metadata from database
22582420
JsonNode baseMetadata = Optional

apps/opik-backend/src/main/java/com/comet/opik/domain/SpanService.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import com.comet.opik.api.ProjectStats;
77
import com.comet.opik.api.Span;
88
import com.comet.opik.api.SpanBatch;
9+
import com.comet.opik.api.SpanBatchUpdate;
910
import com.comet.opik.api.SpanUpdate;
1011
import com.comet.opik.api.SpansCountResponse;
1112
import com.comet.opik.api.attachment.AttachmentInfo;
@@ -204,6 +205,15 @@ public Mono<Void> update(@NonNull UUID id, @NonNull SpanUpdate spanUpdate) {
204205
.then()))));
205206
}
206207

208+
@WithSpan
209+
public Mono<Void> batchUpdate(@NonNull SpanBatchUpdate batchUpdate) {
210+
log.info("Batch updating '{}' spans", batchUpdate.ids().size());
211+
212+
boolean mergeTags = Boolean.TRUE.equals(batchUpdate.mergeTags());
213+
return spanDAO.bulkUpdate(batchUpdate.ids(), batchUpdate.update(), mergeTags)
214+
.doOnSuccess(__ -> log.info("Completed batch update for '{}' spans", batchUpdate.ids().size()));
215+
}
216+
207217
private Mono<Long> insertUpdate(Project project, SpanUpdate spanUpdate, UUID id) {
208218
return IdGenerator
209219
.validateVersionAsync(id, SPAN_KEY)

0 commit comments

Comments
 (0)