Skip to content

Commit 9c6acb5

Browse files
Revision 2: Make batch update generic with String Templates and add comprehensive tests
1 parent f5ca0cd commit 9c6acb5

File tree

8 files changed

+1065
-322
lines changed

8 files changed

+1065
-322
lines changed

apps/opik-backend/src/main/java/com/comet/opik/domain/SpanDAO.java

Lines changed: 141 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -2252,84 +2252,168 @@ private void bindCost(Span span, Statement statement, String index) {
22522252
}
22532253
}
22542254

2255-
private static final String BULK_UPDATE_TAGS = """
2256-
INSERT INTO spans (
2257-
id,
2258-
project_id,
2259-
workspace_id,
2260-
trace_id,
2261-
parent_span_id,
2262-
name,
2263-
type,
2264-
start_time,
2265-
end_time,
2266-
input,
2267-
output,
2268-
metadata,
2269-
model,
2270-
provider,
2271-
total_estimated_cost,
2272-
total_estimated_cost_version,
2273-
tags,
2274-
usage,
2275-
error_info,
2276-
created_at,
2277-
created_by,
2278-
last_updated_by,
2279-
truncation_threshold
2280-
)
2281-
SELECT
2282-
s.id,
2283-
s.project_id,
2284-
s.workspace_id,
2285-
s.trace_id,
2286-
s.parent_span_id,
2287-
s.name,
2288-
s.type,
2289-
s.start_time,
2290-
s.end_time,
2291-
s.input,
2292-
s.output,
2293-
s.metadata,
2294-
s.model,
2295-
s.provider,
2296-
s.total_estimated_cost,
2297-
s.total_estimated_cost_version,
2298-
{TAGS_EXPRESSION} as tags,
2299-
s.usage, s.error_info, s.created_at, s.created_by, s.last_updated_by,
2300-
s.truncation_threshold
2255+
private static final String BULK_UPDATE = """
2256+
INSERT INTO spans (
2257+
id,
2258+
project_id,
2259+
workspace_id,
2260+
trace_id,
2261+
parent_span_id,
2262+
name,
2263+
type,
2264+
start_time,
2265+
end_time,
2266+
input,
2267+
output,
2268+
metadata,
2269+
model,
2270+
provider,
2271+
total_estimated_cost,
2272+
total_estimated_cost_version,
2273+
tags,
2274+
usage,
2275+
error_info,
2276+
created_at,
2277+
created_by,
2278+
last_updated_by,
2279+
truncation_threshold
2280+
)
2281+
SELECT
2282+
s.id,
2283+
s.project_id,
2284+
s.workspace_id,
2285+
s.trace_id,
2286+
s.parent_span_id,
2287+
<if(name)> :name <else> s.name <endif> as name,
2288+
<if(type)> :type <else> s.type <endif> as type,
2289+
s.start_time,
2290+
<if(end_time)> parseDateTime64BestEffort(:end_time, 9) <else> s.end_time <endif> as end_time,
2291+
<if(input)> :input <else> s.input <endif> as input,
2292+
<if(output)> :output <else> s.output <endif> as output,
2293+
<if(metadata)> :metadata <else> s.metadata <endif> as metadata,
2294+
<if(model)> :model <else> s.model <endif> as model,
2295+
<if(provider)> :provider <else> s.provider <endif> as provider,
2296+
<if(total_estimated_cost)> toDecimal128(:total_estimated_cost, 12) <else> s.total_estimated_cost <endif> as total_estimated_cost,
2297+
<if(total_estimated_cost_version)> :total_estimated_cost_version <else> s.total_estimated_cost_version <endif> as total_estimated_cost_version,
2298+
<if(tags)><if(merge_tags)>arrayConcat(s.tags, :tags)<else>:tags<endif><else>s.tags<endif> as tags,
2299+
<if(usage)> CAST((:usageKeys, :usageValues), 'Map(String, Int64)') <else> s.usage <endif> as usage,
2300+
<if(error_info)> :error_info <else> s.error_info <endif> as error_info,
2301+
s.created_at,
2302+
s.created_by,
2303+
:user_name as last_updated_by,
2304+
:truncation_threshold
23012305
FROM spans s
23022306
WHERE s.id IN :ids AND s.workspace_id = :workspace_id
23032307
ORDER BY (s.workspace_id, s.project_id, s.trace_id, s.parent_span_id, s.id) DESC, s.last_updated_at DESC
23042308
LIMIT 1 BY s.id;
2305-
""";
2309+
""";
23062310

23072311
@WithSpan
23082312
public Mono<Void> bulkUpdate(@NonNull Set<UUID> ids, @NonNull SpanUpdate update, boolean mergeTags) {
23092313
Preconditions.checkArgument(!ids.isEmpty(), "ids must not be empty");
2310-
Preconditions.checkArgument(update.tags() != null && !update.tags().isEmpty(), "tags must not be empty");
2311-
log.info("Bulk updating tags for '{}' spans", ids.size());
2314+
log.info("Bulk updating '{}' spans", ids.size());
23122315

2313-
String tagsExpression = mergeTags
2314-
? "arrayConcat(s.tags, :new_tags)"
2315-
: ":new_tags";
2316-
String query = BULK_UPDATE_TAGS.replace("{TAGS_EXPRESSION}", tagsExpression);
2316+
var template = newBulkUpdateTemplate(update, BULK_UPDATE, mergeTags);
2317+
var query = template.render();
23172318

23182319
return Mono.from(connectionFactory.create())
23192320
.flatMapMany(connection -> {
23202321
var statement = connection.createStatement(query)
2321-
.bind("ids", ids)
2322-
.bind("new_tags", update.tags().toArray(new String[0]));
2322+
.bind("ids", ids);
23232323

2324-
Segment segment = startSegment("spans", "Clickhouse", "bulk_update_tags");
2324+
bindBulkUpdateParams(update, statement);
2325+
TruncationUtils.bindTruncationThreshold(statement, "truncation_threshold", configuration);
23252326

2326-
return makeFluxContextAware(bindWorkspaceIdToFlux(statement))
2327+
Segment segment = startSegment("spans", "Clickhouse", "bulk_update");
2328+
2329+
return makeFluxContextAware(bindUserNameAndWorkspaceContextToStream(statement))
23272330
.doFinally(signalType -> endSegment(segment));
23282331
})
23292332
.then()
23302333
.doOnSuccess(__ -> log.info("Completed bulk update for '{}' spans", ids.size()));
23312334
}
23322335

2336+
private ST newBulkUpdateTemplate(SpanUpdate spanUpdate, String sql, boolean mergeTags) {
2337+
var template = new ST(sql);
2338+
2339+
if (StringUtils.isNotBlank(spanUpdate.name())) {
2340+
template.add("name", spanUpdate.name());
2341+
}
2342+
Optional.ofNullable(spanUpdate.type())
2343+
.ifPresent(type -> template.add("type", type.toString()));
2344+
Optional.ofNullable(spanUpdate.input())
2345+
.ifPresent(input -> template.add("input", input.toString()));
2346+
Optional.ofNullable(spanUpdate.output())
2347+
.ifPresent(output -> template.add("output", output.toString()));
2348+
Optional.ofNullable(spanUpdate.tags())
2349+
.ifPresent(tags -> {
2350+
template.add("tags", tags.toString());
2351+
template.add("merge_tags", mergeTags);
2352+
});
2353+
Optional.ofNullable(spanUpdate.metadata())
2354+
.ifPresent(metadata -> template.add("metadata", metadata.toString()));
2355+
if (StringUtils.isNotBlank(spanUpdate.model())) {
2356+
template.add("model", spanUpdate.model());
2357+
}
2358+
if (StringUtils.isNotBlank(spanUpdate.provider())) {
2359+
template.add("provider", spanUpdate.provider());
2360+
}
2361+
Optional.ofNullable(spanUpdate.endTime())
2362+
.ifPresent(endTime -> template.add("end_time", endTime.toString()));
2363+
Optional.ofNullable(spanUpdate.usage())
2364+
.ifPresent(usage -> template.add("usage", usage.toString()));
2365+
Optional.ofNullable(spanUpdate.errorInfo())
2366+
.ifPresent(errorInfo -> template.add("error_info", JsonUtils.readTree(errorInfo).toString()));
2367+
2368+
if (spanUpdate.totalEstimatedCost() != null) {
2369+
template.add("total_estimated_cost", "total_estimated_cost");
2370+
template.add("total_estimated_cost_version", "total_estimated_cost_version");
2371+
}
2372+
return template;
2373+
}
2374+
2375+
private void bindBulkUpdateParams(SpanUpdate spanUpdate, Statement statement) {
2376+
if (StringUtils.isNotBlank(spanUpdate.name())) {
2377+
statement.bind("name", spanUpdate.name());
2378+
}
2379+
Optional.ofNullable(spanUpdate.type())
2380+
.ifPresent(type -> statement.bind("type", type.toString()));
2381+
Optional.ofNullable(spanUpdate.input())
2382+
.ifPresent(input -> statement.bind("input", input.toString()));
2383+
Optional.ofNullable(spanUpdate.output())
2384+
.ifPresent(output -> statement.bind("output", output.toString()));
2385+
Optional.ofNullable(spanUpdate.tags())
2386+
.ifPresent(tags -> statement.bind("tags", tags.toArray(String[]::new)));
2387+
Optional.ofNullable(spanUpdate.usage())
2388+
.ifPresent(usage -> {
2389+
var usageKeys = new ArrayList<String>();
2390+
var usageValues = new ArrayList<Integer>();
2391+
for (var entry : usage.entrySet()) {
2392+
usageKeys.add(entry.getKey());
2393+
usageValues.add(entry.getValue());
2394+
}
2395+
statement.bind("usageKeys", usageKeys.toArray(String[]::new));
2396+
statement.bind("usageValues", usageValues.toArray(Integer[]::new));
2397+
});
2398+
Optional.ofNullable(spanUpdate.endTime())
2399+
.ifPresent(endTime -> statement.bind("end_time", endTime.toString()));
2400+
Optional.ofNullable(spanUpdate.metadata())
2401+
.ifPresent(metadata -> statement.bind("metadata", metadata.toString()));
2402+
if (StringUtils.isNotBlank(spanUpdate.model())) {
2403+
statement.bind("model", spanUpdate.model());
2404+
}
2405+
if (StringUtils.isNotBlank(spanUpdate.provider())) {
2406+
statement.bind("provider", spanUpdate.provider());
2407+
}
2408+
Optional.ofNullable(spanUpdate.errorInfo())
2409+
.ifPresent(errorInfo -> statement.bind("error_info", JsonUtils.readTree(errorInfo).toString()));
2410+
2411+
if (spanUpdate.totalEstimatedCost() != null) {
2412+
statement.bind("total_estimated_cost", spanUpdate.totalEstimatedCost().toString());
2413+
statement.bind("total_estimated_cost_version", "");
2414+
}
2415+
}
2416+
23332417
private JsonNode getMetadataWithProvider(Row row, Set<SpanField> exclude, String provider) {
23342418
// Parse base metadata from database
23352419
JsonNode baseMetadata = Optional

apps/opik-backend/src/main/java/com/comet/opik/domain/SpanService.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -209,16 +209,9 @@ public Mono<Void> update(@NonNull UUID id, @NonNull SpanUpdate spanUpdate) {
209209
public Mono<Void> batchUpdate(@NonNull SpanBatchUpdate batchUpdate) {
210210
log.info("Batch updating '{}' spans", batchUpdate.ids().size());
211211

212-
Set<String> tags = batchUpdate.update().tags();
213-
if (tags == null || tags.isEmpty()) {
214-
log.info("No tags to update for '{}' spans", batchUpdate.ids().size());
215-
return Mono.empty();
216-
}
217-
218212
boolean mergeTags = Boolean.TRUE.equals(batchUpdate.mergeTags());
219213
return spanDAO.bulkUpdate(batchUpdate.ids(), batchUpdate.update(), mergeTags)
220-
.doOnSuccess(__ -> log.info("Bulk updated '{}' spans with {} tags",
221-
batchUpdate.ids().size(), mergeTags ? "merged" : "replaced"));
214+
.doOnSuccess(__ -> log.info("Completed batch update for '{}' spans", batchUpdate.ids().size()));
222215
}
223216

224217
private Mono<Long> insertUpdate(Project project, SpanUpdate spanUpdate, UUID id) {

0 commit comments

Comments
 (0)