diff --git a/DESCRIPTION b/DESCRIPTION index c110b1e48..dfbd52636 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -30,7 +30,7 @@ Imports: jsonlite, later (>= 1.4.0), lifecycle, - promises (>= 1.3.1), + promises (>= 1.5.0), R6, rlang (>= 1.1.0), S7 (>= 0.2.0), @@ -45,6 +45,8 @@ Suggests: knitr, magick, openssl, + otel (>= 0.2.0), + otelsdk (>= 0.2.0), paws.common, png, rmarkdown, @@ -82,11 +84,13 @@ Collate: 'content-pdf.R' 'content-replay.R' 'httr2.R' + 'import-standalone-defer.R' 'import-standalone-obj-type.R' 'import-standalone-purrr.R' 'import-standalone-types-check.R' 'interpolate.R' 'live.R' + 'otel.R' 'parallel-chat.R' 'params.R' 'provider-any.R' diff --git a/R/chat-tools.R b/R/chat-tools.R index dbe64a65e..1b763dcbd 100644 --- a/R/chat-tools.R +++ b/R/chat-tools.R @@ -33,7 +33,8 @@ on_load({ echo = "none", on_tool_request = function(request) invisible(), on_tool_result = function(result) invisible(), - yield_request = FALSE + yield_request = FALSE, + otel_span = NULL ) { tool_requests <- extract_tool_requests(turn) @@ -51,7 +52,7 @@ on_load({ next } - result <- invoke_tool(request) + result <- invoke_tool(request, otel_span = otel_span) if (promises::is.promise(result@value)) { cli::cli_abort( @@ -78,7 +79,8 @@ on_load({ echo = "none", on_tool_request = function(request) invisible(), on_tool_result = function(result) invisible(), - yield_request = FALSE + yield_request = FALSE, + otel_span = NULL ) { tool_requests <- extract_tool_requests(turn) @@ -94,7 +96,7 @@ on_load({ return(rejected) } - result <- coro::await(invoke_tool_async(request)) + result <- coro::await(invoke_tool_async(request, otel_span = otel_span)) maybe_echo_tool(result, echo = echo) on_tool_result(result) @@ -144,7 +146,7 @@ new_tool_result <- function(request, result = NULL, error = NULL) { } # Also need to handle edge cases: https://platform.openai.com/docs/guides/function-calling/edge-cases -invoke_tool <- function(request) { +invoke_tool <- function(request, otel_span = NULL) { if 
(is.null(request@tool)) { return(new_tool_result(request, error = "Unknown tool")) } @@ -155,19 +157,25 @@ invoke_tool <- function(request) { return(args) } + tool_span <- local_tool_otel_span( + request, + parent = otel_span + ) + tryCatch( { result <- do.call(request@tool, args) new_tool_result(request, result) }, error = function(e) { + record_tool_otel_span_error(tool_span, e) new_tool_result(request, error = e) } ) } on_load( - invoke_tool_async <- coro::async(function(request) { + invoke_tool_async <- coro::async(function(request, otel_span = NULL) { if (is.null(request@tool)) { return(new_tool_result(request, error = "Unknown tool")) } @@ -178,12 +186,18 @@ on_load( return(args) } + tool_span <- local_tool_otel_span( + request, + parent = otel_span + ) + tryCatch( { result <- await(do.call(request@tool, args)) new_tool_result(request, result) }, error = function(e) { + record_tool_otel_span_error(tool_span, e) new_tool_result(request, error = e) } ) diff --git a/R/chat.R b/R/chat.R index 18797cce5..2bafc0bb6 100644 --- a/R/chat.R +++ b/R/chat.R @@ -456,14 +456,17 @@ Chat <- R6::R6Class( yield_as_content = FALSE ) { tool_errors <- list() - withr::defer(warn_tool_errors(tool_errors)) + defer(warn_tool_errors(tool_errors)) + + agent_span <- local_agent_otel_span(private$provider, activate = FALSE) while (!is.null(user_turn)) { assistant_chunks <- private$submit_turns( user_turn, stream = stream, echo = echo, - yield_as_content = yield_as_content + yield_as_content = yield_as_content, + otel_span = agent_span ) for (chunk in assistant_chunks) { yield(chunk) @@ -478,7 +481,8 @@ Chat <- R6::R6Class( echo = echo, on_tool_request = private$callback_on_tool_request$invoke, on_tool_result = private$callback_on_tool_result$invoke, - yield_request = yield_as_content + yield_request = yield_as_content, + otel_span = agent_span ) tool_results <- list() @@ -515,14 +519,17 @@ Chat <- R6::R6Class( yield_as_content = FALSE ) { tool_errors <- list() - 
withr::defer(warn_tool_errors(tool_errors)) + defer(warn_tool_errors(tool_errors)) + + agent_span <- local_agent_otel_span(private$provider, activate = FALSE) while (!is.null(user_turn)) { assistant_chunks <- private$submit_turns_async( user_turn, stream = stream, echo = echo, - yield_as_content = yield_as_content + yield_as_content = yield_as_content, + otel_span = agent_span ) for (chunk in await_each(assistant_chunks)) { yield(chunk) @@ -537,11 +544,12 @@ Chat <- R6::R6Class( echo = echo, on_tool_request = private$callback_on_tool_request$invoke_async, on_tool_result = private$callback_on_tool_result$invoke_async, - yield_request = yield_as_content + yield_request = yield_as_content, + otel_span = agent_span ) if (tool_mode == "sequential") { tool_results <- list() - for (tool_step in coro::await_each(tool_calls)) { + for (tool_step in await_each(tool_calls)) { if (yield_as_content) { yield(tool_step) } @@ -587,19 +595,27 @@ Chat <- R6::R6Class( stream, echo, type = NULL, - yield_as_content = FALSE + yield_as_content = FALSE, + otel_span = NULL ) { if (echo == "all") { cat_line(format(user_turn), prefix = "> ") } + chat_span <- local_chat_otel_span( + private$provider, + parent = otel_span + ) + response <- chat_perform( provider = private$provider, mode = if (stream) "stream" else "value", turns = c(private$.turns, list(user_turn)), tools = if (is.null(type)) private$tools, - type = type + type = type, + otel_span = chat_span ) + emit <- emitter(echo) any_text <- FALSE @@ -619,9 +635,15 @@ Chat <- R6::R6Class( result <- stream_merge_chunks(private$provider, result, chunk) } - turn <- value_turn(private$provider, result, has_type = !is.null(type)) + record_chat_otel_span_status(chat_span, result) + turn <- value_turn( + private$provider, + result, + has_type = !is.null(type) + ) turn <- match_tools(turn, private$tools) } else { + record_chat_otel_span_status(chat_span, response) turn <- value_turn( private$provider, resp_body_json(response), @@ -673,15 +695,23 
@@ Chat <- R6::R6Class( stream, echo, type = NULL, - yield_as_content = FALSE + yield_as_content = FALSE, + otel_span = NULL ) { + chat_span <- local_chat_otel_span( + private$provider, + parent = otel_span + ) + response <- chat_perform( provider = private$provider, mode = if (stream) "async-stream" else "async-value", turns = c(private$.turns, list(user_turn)), tools = if (is.null(type)) private$tools, - type = type + type = type, + otel_span = chat_span ) + emit <- emitter(echo) any_text <- FALSE @@ -701,10 +731,16 @@ Chat <- R6::R6Class( result <- stream_merge_chunks(private$provider, result, chunk) } - turn <- value_turn(private$provider, result, has_type = !is.null(type)) + record_chat_otel_span_status(chat_span, result) + turn <- value_turn( + private$provider, + result, + has_type = !is.null(type) + ) } else { result <- await(response) + record_chat_otel_span_status(chat_span, result) turn <- value_turn( private$provider, resp_body_json(result), diff --git a/R/httr2.R b/R/httr2.R index 7a20e2839..be5ca7cbb 100644 --- a/R/httr2.R +++ b/R/httr2.R @@ -6,12 +6,15 @@ chat_perform <- function( mode = c("value", "stream", "async-stream", "async-value"), turns, tools = NULL, - type = NULL + type = NULL, + otel_span = NULL ) { mode <- arg_match(mode) stream <- mode %in% c("stream", "async-stream") tools <- tools %||% list() + setup_active_promise_otel_span(otel_span) + req <- chat_request( provider = provider, turns = turns, @@ -23,14 +26,28 @@ chat_perform <- function( switch( mode, "value" = req_perform(req), - "stream" = chat_perform_stream(provider, req), + "stream" = chat_perform_stream( + provider, + req, + otel_span = otel_span + ), "async-value" = req_perform_promise(req), - "async-stream" = chat_perform_async_stream(provider, req) + "async-stream" = chat_perform_async_stream( + provider, + req, + otel_span = otel_span + ) ) } on_load( - chat_perform_stream <- coro::generator(function(provider, req) { + chat_perform_stream <- coro::generator(function( + 
provider, + req, + otel_span = NULL + ) { + setup_active_promise_otel_span(otel_span) + resp <- req_perform_connection(req) on.exit(close(resp)) @@ -47,7 +64,13 @@ on_load( ) on_load( - chat_perform_async_stream <- coro::async_generator(function(provider, req) { + chat_perform_async_stream <- coro::async_generator(function( + provider, + req, + otel_span = NULL + ) { + setup_active_promise_otel_span(otel_span) + resp <- req_perform_connection(req, blocking = FALSE) on.exit(close(resp)) diff --git a/R/import-standalone-defer.R b/R/import-standalone-defer.R new file mode 100644 index 000000000..e689e5092 --- /dev/null +++ b/R/import-standalone-defer.R @@ -0,0 +1,35 @@ +# Standalone file: do not edit by hand +# Source: https://github.com/r-lib/withr/blob/HEAD/R/standalone-defer.R +# Generated by: usethis::use_standalone("r-lib/withr", "defer") +# ---------------------------------------------------------------------- +# +# --- +# repo: r-lib/withr +# file: standalone-defer.R +# last-updated: 2024-01-15 +# license: https://unlicense.org +# --- +# +# `defer()` is similar to `on.exit()` but with a better default for +# `add` (hardcoded to `TRUE`) and `after` (`FALSE` by default). +# It also supports adding handlers to other frames which is useful +# to implement `local_` functions. +# +# +# ## Changelog +# +# 2024-01-15: +# * Rewritten to be pure base R. 
+# +# nocov start + +defer <- function(expr, envir = parent.frame(), after = FALSE) { + thunk <- as.call(list(function() expr)) + do.call( + on.exit, + list(thunk, add = TRUE, after = after), + envir = envir + ) +} + +# nocov end diff --git a/R/otel.R b/R/otel.R new file mode 100644 index 000000000..0849de7b7 --- /dev/null +++ b/R/otel.R @@ -0,0 +1,193 @@ +otel_tracer_name <- "co.posit.r-package.ellmer" + +otel_cache_tracer <- NULL +local_chat_otel_span <- NULL +local_tool_otel_span <- NULL +local_agent_otel_span <- NULL + +local({ + otel_is_tracing <- FALSE + otel_tracer <- NULL + + otel_cache_tracer <<- function() { + if (!requireNamespace("otel", quietly = TRUE)) { + return() + } + otel_tracer <<- otel::get_tracer(otel_tracer_name) + otel_is_tracing <<- tracer_enabled(otel_tracer) + } + + local_chat_otel_span <<- function( + provider, + parent = NULL, + local_envir = parent.frame() + ) { + if (!otel_is_tracing) { + return() + } + chat_span <- + otel::start_span( + sprintf("chat %s", provider@model), + options = list( + parent = parent, + kind = "client" + ), + attributes = list( + "gen_ai.operation.name" = "chat", + "gen_ai.provider.name" = tolower(provider@name), + "gen_ai.request.model" = provider@model + ), + tracer = otel_tracer + ) + + defer(otel::end_span(chat_span), envir = local_envir) + + chat_span + } + + # Starts an Open Telemetry span that abides by the semantic conventions for + # Generative AI tool calls. + # + # Must be activated for the calling scope. 
+ # + # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-spans/#execute-tool-span + local_tool_otel_span <<- function( + request, + parent = NULL, + local_envir = parent.frame() + ) { + if (!otel_is_tracing) { + return() + } + tool_span <- + otel::start_span( + sprintf("execute_tool %s", request@tool@name), + options = list(parent = parent), + attributes = compact(list( + "gen_ai.operation.name" = "execute_tool", + "gen_ai.tool.name" = request@tool@name, + "gen_ai.tool.description" = request@tool@description, + "gen_ai.tool.call.id" = request@id + )), + tracer = otel_tracer + ) + + setup_active_promise_otel_span(tool_span, local_envir) + + defer(otel::end_span(tool_span), envir = local_envir) + + tool_span + } + + # Starts an Open Telemetry span that abides by the semantic conventions for + # Generative AI "agents". + # + # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-spans/#inference + # local_otel_span_agent + local_agent_otel_span <<- function( + provider, + activate = TRUE, + local_envir = parent.frame() + ) { + if (!otel_is_tracing) { + return() + } + if (activate) { + abort(c( + "Activating the agent span is not supported at this time.", + "*" = "Activating the span here would set it as the active span globally (via otel::local_active_span() until the calling function ends (a long time).", + "*" = "`coro::setup()` would address this and be appropriate", + "i" = "Work around: Activate only where necessary or over a single yield in the calling scope." + )) + } + agent_span <- + otel::start_span( + "invoke_agent", + options = list(kind = "client"), + attributes = list( + "gen_ai.operation.name" = "chat", + "gen_ai.provider.name" = tolower(provider@name) + ), + tracer = otel_tracer + ) + + ## Do not activate! + ## The current usage of `local_agent_otel_span()` is in a multi-step coroutine. + ## This would require deactivating only after the coroutine is done, + ## but not between yields, that is too long and unpredictable. 
+ ## The span should only be activated in the specific steps where it is needed. + # setup_active_promise_otel_span(agent_span, local_envir) + + defer(otel::end_span(agent_span), envir = local_envir) + + agent_span + } +}) + +tracer_enabled <- function(tracer) { + .subset2(tracer, "is_enabled")() +} + +span_recording <- function(span) { + .subset2(span, "is_recording")() +} + +with_otel_record <- function(expr) { + on.exit(otel_cache_tracer()) + otelsdk::with_otel_record({ + otel_cache_tracer() + expr + }) +} + +record_chat_otel_span_status <- function(span, result) { + if (is.null(span) || !span_recording(span)) { + return() + } + if (!is.null(result$model)) { + span$set_attribute("gen_ai.response.model", result$model) + } + if (!is.null(result$id)) { + span$set_attribute("gen_ai.response.id", result$id) + } + # TODO: Fixme @atheriel! + # if (!is.null(result$usage)) { + # span$set_attribute("gen_ai.usage.input_tokens", result$usage$prompt_tokens) + # span$set_attribute( + # "gen_ai.usage.output_tokens", + # result$usage$completion_tokens + # ) + # } + # TODO: Consider setting gen_ai.response.finish_reasons. + span$set_status("ok") +} + +record_tool_otel_span_error <- function(span, error) { + if (is.null(span) || !span_recording(span)) { + return() + } + span$record_exception(error) + span$set_status("error") + span$set_attribute("error.type", class(error)[1L]) +} + +# Only activate the span if it is non-NULL and recording. The otel promise +# domain is installed alongside it so that the active span is reactivated upon promise domain restoration. +#' Activate and use handoff promise domain for Open Telemetry span +#' +#' @param span An Open Telemetry span object, or `NULL` to do nothing. +#' @param activation_scope The scope in which to activate the span. 
+#' @noRd +setup_active_promise_otel_span <- function( + span, + activation_scope = parent.frame() +) { + if (is.null(span) || !span_recording(span)) { + return() + } + + promises::local_otel_promise_domain(activation_scope) + otel::local_active_span(span, activation_scope = activation_scope) + + invisible() +} diff --git a/R/provider-claude.R b/R/provider-claude.R index 10212ade4..146c0a637 100644 --- a/R/provider-claude.R +++ b/R/provider-claude.R @@ -634,7 +634,7 @@ method(batch_retrieve, ProviderAnthropic) <- function(provider, batch) { req <- req_url(req, batch$results_url) req <- req_progress(req, "down") - path <- withr::local_tempfile() + path <- local_tempfile() req <- req_perform(req, path = path) lines <- readLines(path, warn = FALSE) diff --git a/R/provider-openai.R b/R/provider-openai.R index 3069ce64a..79bec29eb 100644 --- a/R/provider-openai.R +++ b/R/provider-openai.R @@ -463,7 +463,7 @@ method(batch_submit, ProviderOpenAI) <- function( conversations, type = NULL ) { - path <- withr::local_tempfile() + path <- local_tempfile() # First put the requests in a file # https://platform.openai.com/docs/api-reference/batch/request-input diff --git a/R/tokens.R b/R/tokens.R index 7232a5d44..16be827f3 100644 --- a/R/tokens.R +++ b/R/tokens.R @@ -85,7 +85,7 @@ local_tokens <- function(frame = parent.frame()) { old <- the$tokens the$tokens <- tokens_row() - defer(the$tokens <- old, env = frame) + defer(the$tokens <- old, envir = frame) } #' Report on token usage in the current session diff --git a/R/utils.R b/R/utils.R index 0828ac64d..1838bdb39 100644 --- a/R/utils.R +++ b/R/utils.R @@ -30,11 +30,6 @@ key_exists <- function(name) { !identical(Sys.getenv(name), "") } -defer <- function(expr, env = caller_env(), after = FALSE) { - thunk <- as.call(list(function() expr)) - do.call(on.exit, list(thunk, TRUE, after), envir = env) -} - set_default <- function(value, default, arg = caller_arg(value)) { if (is.null(value)) { if (!is_testing() || is_snapshot()) { diff 
--git a/R/zzz.R b/R/zzz.R index ef0a14dae..6c3f385f4 100644 --- a/R/zzz.R +++ b/R/zzz.R @@ -1,6 +1,7 @@ .onLoad <- function(libname, pkgname) { run_on_load() S7::methods_register() + otel_cache_tracer() } # Work around S7 bug diff --git a/tests/testthat/test-otel.R b/tests/testthat/test-otel.R new file mode 100644 index 000000000..aa0faf5fc --- /dev/null +++ b/tests/testthat/test-otel.R @@ -0,0 +1,237 @@ +test_that("tracing works as expected for synchronous chats", { + skip_if_not_installed("otelsdk") + + # Capture spans from a typical synchronous chat with tool calls. + spans <- with_otel_record({ + test_tools_simple(chat_openai_test) + })[["traces"]] + + # Check we have two top-level "invoke_agent" spans (one for each chat() + # invocation) that start their respective traces. + agent_spans <- Filter(function(x) x$name == "invoke_agent", spans) + expect_length(agent_spans, 2L) + expect_equal(agent_spans[[1L]]$parent, agent_spans[[2L]]$parent) + expect_equal(agent_spans[[1L]]$kind, "client") + agent_span_ids <- sapply(agent_spans, function(x) x$span_id) + + # We should have (at least) two "execute_tool" spans + # (one for each tool invocation) that are children of the agent spans. + # Note 2025/11: Some models may call tools early and cache the results, so we cannot check for the existence of multiple parent ids (newer openai models) + # Note 2025/11: Some models call more tools than necessary + # Ex: anthropic calls the date tool twice. So we check for at least 2. + tool_spans <- Filter(function(x) startsWith(x$name, "execute_tool"), spans) + expect_gte(length(tool_spans), 2L) + expect_true(all(vapply( + tool_spans, + function(x) x$parent %in% agent_span_ids, + logical(1) + ))) + expect_equal(tool_spans[[1L]]$kind, "internal") + + # And "chat" spans that correspond to model calls before and after each + # tool call -- these are also children of the agent spans and siblings of one + # another. + # Note 2025/09: Not all models are calling tools the same. 
There must be AT LEAST 2 chat spans. Ex: anthropic has 4, openai has 3. :shrug: + chat_spans <- Filter(function(x) startsWith(x$name, "chat"), spans) + expect_gte(length(chat_spans), 2L) + expect_true(all(vapply( + chat_spans, + function(x) x$parent %in% agent_span_ids, + logical(1) + ))) + expect_equal(chat_spans[[1L]]$kind, "client") + + # Ensure we record the result (and therefore set the status) of chat spans. + expect_true(all(vapply(chat_spans, function(x) x$status == "ok", logical(1)))) + + # We should also get some underlying HTTP spans that correspond to the model + # calls. + chat_span_ids <- sapply(chat_spans, function(x) x$span_id) + http_spans <- Filter(function(x) startsWith(x$name, "POST"), spans) + expect_true(all(vapply( + http_spans, + function(x) x$parent %in% chat_span_ids, + logical(1) + ))) +}) + +test_that("tracing works as expected for synchronous streams", { + skip_if_not_installed("otelsdk") + + # Capture spans when a stream is suspended. + spans <- with_otel_record({ + chat <- chat_openai_test(system_prompt = "Be terse.") + stream <- chat$stream("What's your favourite Apache Spark feature?") + local(otel::start_local_active_span("simultaneous")) + coro::collect(stream) + })[["traces"]] + + # Check we have one top-level "invoke_agent" span. + expect_length(spans[names(spans) == "invoke_agent"], 1L) + + # And one child span for the stream (confirming that it ends when collected). + chat_spans <- Filter(function(x) startsWith(x$name, "chat"), spans) + expect_length(chat_spans, 1L) + expect_equal(chat_spans[[1L]]$parent, spans[["invoke_agent"]]$span_id) + + # Verify that the span started when the stream was suspended is not part of + # the agent trace. + expect_equal(spans[["simultaneous"]]$parent, spans[["invoke_agent"]]$parent) + + # But that HTTP spans are. 
+ chat_span_ids <- sapply(chat_spans, function(x) x$span_id) + http_spans <- Filter(function(x) startsWith(x$name, "POST"), spans) + expect_true(all(vapply( + http_spans, + function(x) x$parent %in% chat_span_ids, + logical(1) + ))) +}) + +test_that("tracing works as expected for asynchronous chats", { + skip_if_not_installed("otelsdk") + + # chat <- chat_openai_test( + chat <- chat_anthropic_test( + system_prompt = "Always use a tool to answer. Reply with 'It is ____.'." + ) + chat$register_tool(tool( + coro::async(function() "2024-01-01"), + "Return the current date", + name = "current_date" + )) + + # Capture spans for an async chat with async tool calls interleaved with + # other synchronous and asynchronous spans. + spans <- with_otel_record({ + p1 <- chat$chat_async("What's the current date in Y-M-D format?") |> + promises::then(function(result) { + chat$chat_async("What date will it be 47 days from now?") + }) + p2 <- promises::promise(function(resolve, reject) { + span <- otel::start_span("concurrent") + otel::local_active_span(span) # just to try to mess with things + later::later( + function() { + otel::local_active_span(span) # just to try to mess with things + resolve(NULL) + span$end() + }, + 0.1 + ) + }) + + local({ + # Typical external usage + local(otel::start_local_active_span("simultaneous")) + }) + + sync(p2) + sync(p1) + })[["traces"]] + + ## Debug span output; name, id, parent + # ignore <- Map( + # spans, + # format(names(spans), justify = "right"), + # f = function(span, name) { + # message(name, " - ", span$span_id, " - ", span$parent) + # } + # ) + + # Check we have two top-level extra spans + expect_length(spans[names(spans) == "concurrent"], 1L) + expect_length(spans[names(spans) == "simultaneous"], 1L) + expect_equal(spans[["concurrent"]]$parent, spans[["simultaneous"]]$parent) + + # Check we have two top-level "invoke_agent" spans (one for each chat() + # invocation) that start their respective traces. 
+ agent_spans <- Filter(function(x) x$name == "invoke_agent", spans) + expect_length(agent_spans, 2L) + expect_equal(agent_spans[[1]]$parent, spans[["concurrent"]]$parent) + expect_equal(agent_spans[[2]]$parent, spans[["concurrent"]]$parent) + + agent_span_ids <- sapply(agent_spans, function(x) x$span_id) + + # We should have at least one "execute_tool" span (one for each tool invocation); + # all of them must be children of the agent spans. + tool_spans <- Filter(function(x) startsWith(x$name, "execute_tool"), spans) + expect_gte(length(tool_spans), 1L) + expect_true(all(vapply( + tool_spans, + function(x) x$parent %in% agent_span_ids, + logical(1) + ))) + + # And at least two "chat" spans per tool span (model calls before and after each + # tool call) -- these are also children of the agent spans and siblings of one + # another. + chat_spans <- Filter(function(x) startsWith(x$name, "chat"), spans) + expect_gte(length(chat_spans), 2 * length(tool_spans)) + expect_true(all(vapply( + chat_spans, + function(x) x$parent %in% agent_span_ids, + logical(1) + ))) + + # Ensure we record the result (and therefore set the status) of chat spans. + expect_true(all(vapply(chat_spans, function(x) x$status == "ok", logical(1)))) + + # We should also get some underlying HTTP spans that correspond to the model + # calls. + chat_span_ids <- sapply(chat_spans, function(x) x$span_id) + http_spans <- Filter(function(x) startsWith(x$name, "POST"), spans) + expect_true(all(vapply( + http_spans, + function(x) x$parent %in% chat_span_ids, + logical(1) + ))) +}) + +test_that("tracing works as expected for asynchronous streams", { + skip_if_not_installed("otelsdk") + + # Capture spans when an async stream is used in concert with other + # synchronous and asynchronous spans. 
+ spans <- with_otel_record({ + chat <- chat_openai_test(system_prompt = "Be terse.") + stream <- chat$stream_async("What's your favourite Apache Spark feature?") + p <- promises::promise(function(resolve, reject) { + span <- local(otel::start_span("concurrent")) + later::later( + function() { + on.exit(span$end()) + otel::with_active_span(span, { + resolve(NULL) + }) + }, + 0.1 + ) + }) + local(otel::start_local_active_span("simultaneous")) + sync(p) + sync(coro::async_collect(stream)) + })[["traces"]] + + # Check we have one top-level "invoke_agent" span. + expect_length(spans[names(spans) == "invoke_agent"], 1L) + + # And one child span for the stream (confirming that it ends when collected). + chat_spans <- Filter(function(x) startsWith(x$name, "chat"), spans) + expect_length(chat_spans, 1L) + expect_equal(chat_spans[[1L]]$parent, spans[["invoke_agent"]]$span_id) + + # Verify that the spans started when the stream was suspended are not part of + # the agent trace. + expect_equal(spans[["concurrent"]]$parent, spans[["invoke_agent"]]$parent) + expect_equal(spans[["simultaneous"]]$parent, spans[["invoke_agent"]]$parent) + + # But that HTTP spans are. + chat_span_ids <- sapply(chat_spans, function(x) x$span_id) + http_spans <- Filter(function(x) startsWith(x$name, "POST"), spans) + expect_true(all(vapply( + http_spans, + function(x) x$parent %in% chat_span_ids, + logical(1) + ))) +})