From 6429125b225fff6a323798d4aa8bf7ff42d24386 Mon Sep 17 00:00:00 2001 From: Aaron Jacobs Date: Thu, 22 May 2025 15:12:07 -0400 Subject: [PATCH 01/33] Add Open Telemetry instrumentation. This commit instruments various operations with Open Telemetry spans that abide by the (still nascent) semantic conventions for Generative AI clients [0]. These conventions classify `ellmer` chatbots as "agents" due to their ability to run tool calls, so in fact there are three types of span: (1) a top-level `invoke_agent` span for each chat interaction; (2) `chat` spans that wrap model API calls; and (3) `execute_tool` spans that wrap tool calls on our end. There's currently no community concensus for how to attach turns to spans, so I've left that out for now. Example code: library(otelsdk) Sys.setenv(OTEL_TRACES_EXPORTER = "stderr") chat <- ellmer::chat_databricks(model = "databricks-claude-3-7-sonnet") chat$chat("Tell me a joke in the form of an SQL query.") Unit tests are included. [0]: https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-spans/ Signed-off-by: Aaron Jacobs --- DESCRIPTION | 6 + R/chat.R | 11 +- R/content-tools.R | 5 + R/otel.R | 142 ++++++++++++++++++++++++ tests/testthat/test-otel.R | 220 +++++++++++++++++++++++++++++++++++++ 5 files changed, 383 insertions(+), 1 deletion(-) create mode 100644 R/otel.R create mode 100644 tests/testthat/test-otel.R diff --git a/DESCRIPTION b/DESCRIPTION index 28ca9b3b4..3af052c2a 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -40,6 +40,8 @@ Suggests: knitr, magick, openssl, + otel (>= 0.0.0.9000), + otelsdk (>= 0.0.0.9000), paws.common, rmarkdown, shiny, @@ -78,6 +80,7 @@ Collate: 'import-standalone-purrr.R' 'import-standalone-types-check.R' 'interpolate.R' + 'otel.R' 'params.R' 'provider-openai.R' 'provider-azure.R' @@ -108,3 +111,6 @@ Collate: 'utils-prettytime.R' 'utils.R' 'zzz.R' +Remotes: + r-lib/otel, + r-lib/otelsdk diff --git a/R/chat.R b/R/chat.R index 2fdb4c3c7..27ecd8c03 100644 --- a/R/chat.R +++ b/R/chat.R @@ -492,6 +492,7 @@ Chat <- R6::R6Class( ) { tool_errors <- list() withr::defer(warn_tool_errors(tool_errors)) + start_agent_span(private$provider) while (!is.null(user_turn)) { assistant_chunks <- private$submit_turns( @@ -551,6 +552,8 @@ Chat <- R6::R6Class( ) { tool_errors <- list() withr::defer(warn_tool_errors(tool_errors)) + span <- start_agent_span(private$provider, active = FALSE) + withr::defer(span$end()) while (!is.null(user_turn)) { assistant_chunks <- private$submit_turns_async( @@ -627,7 +630,7 @@ Chat <- R6::R6Class( if (echo == "all") { cat_line(format(user_turn), prefix = "> ") } - + span <- start_chat_span(private$provider) response <- chat_perform( provider = private$provider, mode = if (stream) "stream" else "value", @@ -654,9 +657,11 @@ Chat <- R6::R6Class( result <- stream_merge_chunks(private$provider, result, chunk) } + record_chat_span_status(span, result) turn <- value_turn(private$provider, result, has_type = !is.null(type)) turn <- match_tools(turn, private$tools) } else { + record_chat_span_status(span, response) turn <- value_turn( private$provider, response, @@ -709,6 +714,8 @@ Chat <- R6::R6Class( type = NULL, yield_as_content = FALSE ) { + span <- start_chat_span(private$provider, active = FALSE) + withr::defer(span$end()) response <- chat_perform( provider = private$provider, mode = if (stream) "async-stream" else "async-value", @@ -735,10 +742,12 @@ Chat <- R6::R6Class( result <- stream_merge_chunks(private$provider, result, chunk) } + record_chat_span_status(span, result) turn <- value_turn(private$provider, result, has_type = !is.null(type)) } else { result <- await(response) + record_chat_span_status(span, result) turn <- value_turn(private$provider, result, has_type = !is.null(type)) text <- turn@text if (!is.null(text)) { diff --git a/R/content-tools.R b/R/content-tools.R index a35b47da2..8501e40e7 100644 --- a/R/content-tools.R +++ b/R/content-tools.R @@ -145,12 +145,14 @@ invoke_tool <- function(request) { return(args) } + span <- start_tool_span(request) tryCatch( { result <- do.call(request@tool@fun, args) new_tool_result(request, result) }, error = function(e) { + record_tool_error(span, e) new_tool_result(request, error = e) } ) @@ -168,12 +170,15 @@ on_load( return(args) } + span <- start_tool_span(request, active = FALSE) + withr::defer(span$end()) tryCatch( { result <- await(do.call(request@tool@fun, args)) new_tool_result(request, result) }, error = function(e) { + record_tool_error(span, e) new_tool_result(request, error = e) } ) diff --git a/R/otel.R b/R/otel.R new file mode 100644 index 000000000..9072383dc --- /dev/null +++ b/R/otel.R @@ -0,0 +1,142 @@ +# Starts an Open Telemetry span that abides by the semantic conventions for +# Generative AI completions. +# +# See: https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-spans/#inference +start_chat_span <- function( + provider, + tracer = default_tracer(), + scope = parent.frame(), + active = TRUE +) { + if (is.null(tracer) || !tracer$is_enabled()) { + return(NULL) + } + # Ensure we set attributes relevant to sampling at span creation time. + attributes <- list( + "gen_ai.operation.name" = "chat", + "gen_ai.system" = tolower(provider@name), + "gen_ai.request.model" = provider@model + ) + if (active) { + tracer$start_span( + name = sprintf("chat %s", provider@model), + options = list(kind = "CLIENT"), + attributes = attributes, + scope = scope + ) + } else { + tracer$start_session( + name = sprintf("chat %s", provider@model), + options = list(kind = "CLIENT"), + attributes = attributes, + session_scope = scope + ) + } +} + +record_chat_span_status <- function(span, result) { + if (is.null(span) || !span$is_recording()) { + return(invisible(span)) + } + if (!is.null(result$model)) { + span$set_attribute("gen_ai.response.model", result$model) + } + if (!is.null(result$id)) { + span$set_attribute("gen_ai.response.id", result$id) + } + if (!is.null(result$usage)) { + span$set_attribute("gen_ai.usage.input_tokens", result$usage$prompt_tokens) + span$set_attribute( + "gen_ai.usage.output_tokens", + result$usage$completion_tokens + ) + } + # TODO: Consider setting gen_ai.response.finish_reasons. + span$set_status("ok") +} + +# Starts an Open Telemetry span that abides by the semantic conventions for +# Generative AI tool calls. +# +# See: https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-spans/#execute-tool-span +start_tool_span <- function( + request, + tracer = default_tracer(), + scope = parent.frame(), + active = TRUE +) { + if (is.null(tracer) || !tracer$is_enabled()) { + return(NULL) + } + attributes <- compact(list( + "gen_ai.operation.name" = "execute_tool", + "gen_ai.tool.name" = request@tool@name, + "gen_ai.tool.description" = request@tool@description, + "gen_ai.tool.call.id" = request@id + )) + if (active) { + tracer$start_span( + name = sprintf("execute_tool %s", request@tool@name), + options = list(kind = "INTERNAL"), + attributes = attributes, + scope = scope + ) + } else { + tracer$start_session( + name = sprintf("execute_tool %s", request@tool@name), + options = list(kind = "INTERNAL"), + attributes = attributes, + session_scope = scope + ) + } +} + +record_tool_error <- function(span, error) { + if (is.null(span) || !span$is_recording()) { + return() + } + span$record_exception(error) + span$set_status("error") + span$set_attribute("error.type", class(error)[1]) +} + +# Starts an Open Telemetry span that abides by the semantic conventions for +# Generative AI "agents". +# +# See: https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-spans/#inference +start_agent_span <- function( + provider, + tracer = default_tracer(), + scope = parent.frame(), + active = TRUE +) { + if (is.null(tracer) || !tracer$is_enabled()) { + return(NULL) + } + attributes <- list( + "gen_ai.operation.name" = "chat", + "gen_ai.system" = tolower(provider@name) + ) + if (active) { + tracer$start_span( + name = "invoke_agent", + options = list(kind = "CLIENT"), + attributes = attributes, + scope = scope + ) + } else { + tracer$start_session( + name = "invoke_agent", + options = list(kind = "CLIENT"), + attributes = attributes, + session_scope = scope + ) + } +} + +default_tracer <- function() { + if (!is_installed("otel")) { + return(NULL) + } + otel::get_tracer("ellmer") +} diff --git a/tests/testthat/test-otel.R b/tests/testthat/test-otel.R new file mode 100644 index 000000000..c8b28d41b --- /dev/null +++ b/tests/testthat/test-otel.R @@ -0,0 +1,220 @@ +test_that("tracing works as expected for synchronous chats", { + skip_if_not_installed("otelsdk") + + # Capture spans from a typical synchronous chat with tool calls. + spans <- otelsdk::with_otel_record({ + test_tools_simple(chat_openai_test) + })[["traces"]] + + # Check we have two top-level "invoke_agent" spans (one for each chat() + # invocation) that start their respective traces. + agent_spans <- Filter(function(x) x$name == "invoke_agent", spans) + expect_length(agent_spans, 2L) + expect_true(all(vapply( + agent_spans, + function(x) identical(x$parent, "0000000000000000"), + logical(1) + ))) + root_spans <- sapply(agent_spans, function(x) x$span_id) + + # We should have two "execute_tool" spans (one for each tool invocation) + # that are children of the agent spans. + tool_spans <- Filter(function(x) startsWith(x$name, "execute_tool"), spans) + expect_length(tool_spans, 2L) + expect_true(all(vapply( + tool_spans, + function(x) x$parent %in% root_spans, + logical(1) + ))) + + # And four "chat" spans that correspond to model calls before and after each + # tool call -- these are also children of the agent spans and siblings of one + # another. + chat_spans <- Filter(function(x) startsWith(x$name, "chat"), spans) + expect_length(chat_spans, 4L) + expect_true(all(vapply( + chat_spans, + function(x) x$parent %in% root_spans, + logical(1) + ))) + + # Ensure we record the result (and therefore set the status) of chat spans. + expect_true(all(vapply(chat_spans, function(x) x$status == "ok", logical(1)))) + + # We should also get some underlying HTTP spans that correspond to the model + # calls. + chat_span_ids <- sapply(chat_spans, function(x) x$span_id) + http_spans <- Filter(function(x) startsWith(x$name, "POST"), spans) + expect_true(all(vapply( + http_spans, + function(x) x$parent %in% chat_span_ids, + logical(1) + ))) +}) + +test_that("tracing works as expected for synchronous streams", { + skip_if_not_installed("otelsdk") + + # Capture spans when a stream is suspended. + spans <- otelsdk::with_otel_record({ + chat <- chat_openai_test(system_prompt = "Be terse.") + stream <- chat$stream("What's your favourite Apache Spark feature?") + local(otel::start_span("similtaneous")) + coro::collect(stream) + })[["traces"]] + + # Check we have one top-level "invoke_agent" span. + expect_equal(spans[["invoke_agent"]]$parent, "0000000000000000") + + # And one child span for the stream (confirming that it ends when collected). + chat_spans <- Filter(function(x) startsWith(x$name, "chat"), spans) + expect_length(chat_spans, 1L) + expect_true(all(vapply( + chat_spans, + function(x) identical(x$parent, spans[["invoke_agent"]]$span_id), + logical(1) + ))) + + # Verify that the span started when the stream was suspended is not part of + # the agent trace. + expect_equal(spans[["similtaneous"]]$parent, "0000000000000000") + + # But that HTTP spans are. + chat_span_ids <- sapply(chat_spans, function(x) x$span_id) + http_spans <- Filter(function(x) startsWith(x$name, "POST"), spans) + expect_true(all(vapply( + http_spans, + function(x) x$parent %in% chat_span_ids, + logical(1) + ))) +}) + +test_that("tracing works as expected for asynchronous chats", { + skip_if_not_installed("otelsdk") + + chat <- chat_openai_test( + system_prompt = "Always use a tool to answer. Reply with 'It is ____.'." + ) + chat$register_tool(tool( + coro::async(function() "2024-01-01"), + "Return the current date", + .name = "current_date" + )) + + # Capture spans for an async chat with async tool calls interleaved with + # other synchronous and asynchronous spans. + spans <- otelsdk::with_otel_record({ + p1 <- chat$chat_async("What's the current date in Y-M-D format?") |> + promises::then(function(result) { + chat$chat_async("What date will it be 47 days from now?") + }) + p2 <- promises::promise(function(resolve, reject) { + span <- otel::start_session("concurrent") + later::later( + function() { + on.exit(span$end()) + otel::with_session(resolve(NULL), span) + }, + 0.1 + ) + }) + local(otel::start_span("similtaneous")) + sync(p2) + sync(p1) + })[["traces"]] + + # Check we have two top-level "invoke_agent" spans (one for each chat() + # invocation) that start their respective traces. + agent_spans <- Filter(function(x) x$name == "invoke_agent", spans) + expect_length(agent_spans, 2L) + expect_true(all(vapply( + agent_spans, + function(x) identical(x$parent, "0000000000000000"), + logical(1) + ))) + root_spans <- sapply(agent_spans, function(x) x$span_id) + + # We should have two "execute_tool" spans (one for each tool invocation) + # that are children of the agent spans. + tool_spans <- Filter(function(x) startsWith(x$name, "execute_tool"), spans) + expect_length(tool_spans, 2L) + expect_true(all(vapply( + tool_spans, + function(x) x$parent %in% root_spans, + logical(1) + ))) + + # And four "chat" spans that correspond to model calls before and after each + # tool call -- these are also children of the agent spans and siblings of one + # another. + chat_spans <- Filter(function(x) startsWith(x$name, "chat"), spans) + expect_length(chat_spans, 4L) + expect_true(all(vapply( + chat_spans, + function(x) x$parent %in% root_spans, + logical(1) + ))) + + # Ensure we record the result (and therefore set the status) of chat spans. + expect_true(all(vapply(chat_spans, function(x) x$status == "ok", logical(1)))) + + # We should also get some underlying HTTP spans that correspond to the model + # calls. + chat_span_ids <- sapply(chat_spans, function(x) x$span_id) + http_spans <- Filter(function(x) startsWith(x$name, "POST"), spans) + expect_true(all(vapply( + http_spans, + function(x) x$parent %in% chat_span_ids, + logical(1) + ))) +}) + +test_that("tracing works as expected for asynchronous streams", { + skip_if_not_installed("otelsdk") + + # Capture spans when an async stream is used in concert with other + # synchronous and asynchronous spans. + spans <- otelsdk::with_otel_record({ + chat <- chat_openai_test(system_prompt = "Be terse.") + stream <- chat$stream_async("What's your favourite Apache Spark feature?") + p <- promises::promise(function(resolve, reject) { + span <- local(otel::start_session("concurrent")) + later::later( + function() { + on.exit(span$end()) + otel::with_session(resolve(NULL), span) + }, + 0.1 + ) + }) + local(otel::start_span("similtaneous")) + sync(p) + sync(coro::async_collect(stream)) + })[["traces"]] + + # Check we have one top-level "invoke_agent" span. + expect_equal(spans[["invoke_agent"]]$parent, "0000000000000000") + + # And one child span for the stream (confirming that it ends when collected). + chat_spans <- Filter(function(x) startsWith(x$name, "chat"), spans) + expect_length(chat_spans, 1L) + expect_true(all(vapply( + chat_spans, + function(x) identical(x$parent, spans[["invoke_agent"]]$span_id), + logical(1) + ))) + + # Verify that the spans started when the stream was suspended are not part of + # the agent trace. + expect_equal(spans[["concurrent"]]$parent, "0000000000000000") + expect_equal(spans[["similtaneous"]]$parent, "0000000000000000") + + # But that HTTP spans are. + chat_span_ids <- sapply(chat_spans, function(x) x$span_id) + http_spans <- Filter(function(x) startsWith(x$name, "POST"), spans) + expect_true(all(vapply( + http_spans, + function(x) x$parent %in% chat_span_ids, + logical(1) + ))) +}) From a710ce26f19e4f17a7b38c6db802471ae6e980f0 Mon Sep 17 00:00:00 2001 From: Barret Schloerke Date: Mon, 8 Sep 2025 09:51:28 -0400 Subject: [PATCH 02/33] `usethis::use_tidy_description()` --- DESCRIPTION | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/DESCRIPTION b/DESCRIPTION index cae4196de..217c7ba9a 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -54,6 +54,9 @@ Suggests: withr VignetteBuilder: knitr +Remotes: + r-lib/otel, + r-lib/otelsdk Config/Needs/website: tidyverse/tidytemplate, rmarkdown Config/testthat/edition: 3 Config/testthat/parallel: true @@ -117,6 +120,3 @@ Collate: 'utils-prettytime.R' 'utils.R' 'zzz.R' -Remotes: - r-lib/otel, - r-lib/otelsdk From bf51c6774abd89ddb3021cbdc8923deae4008492 Mon Sep 17 00:00:00 2001 From: Barret Schloerke Date: Tue, 9 Sep 2025 14:10:29 -0400 Subject: [PATCH 03/33] Updates to the latest promises ospans Co-Authored-By: Aaron Jacobs --- DESCRIPTION | 7 +- R/chat-tools.R | 29 ++-- R/chat.R | 92 +++++++++---- R/otel.R | 273 ++++++++++++++++++++++++------------- man/batch_chat.Rd | 2 +- man/chat_azure_openai.Rd | 2 +- man/chat_snowflake.Rd | 2 +- man/ellmer-package.Rd | 2 +- tests/testthat/test-otel.R | 50 ++++--- 9 files changed, 298 insertions(+), 161 deletions(-) diff --git a/DESCRIPTION b/DESCRIPTION index 217c7ba9a..e278183bd 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -30,7 +30,7 @@ Imports: jsonlite, later (>= 1.4.0), lifecycle, - promises (>= 1.3.1), + promises (>= 1.3.3.9001), R6, rlang (>= 1.1.0), S7 (>= 0.2.0) @@ -55,6 +55,7 @@ Suggests: VignetteBuilder: knitr Remotes: + rstudio/promises, r-lib/otel, r-lib/otelsdk Config/Needs/website: tidyverse/tidytemplate, rmarkdown @@ -63,7 +64,7 @@ Config/testthat/parallel: true Config/testthat/start-first: chat, provider* Encoding: UTF-8 Roxygen: list(markdown = TRUE) -RoxygenNote: 7.3.2 +RoxygenNote: 7.3.3 Collate: 'utils-S7.R' 'types.R' @@ -87,8 +88,8 @@ Collate: 'import-standalone-purrr.R' 'import-standalone-types-check.R' 'interpolate.R' - 'otel.R' 'live.R' + 'otel.R' 'parallel-chat.R' 'params.R' 'provider-anthropic.R' diff --git a/R/chat-tools.R b/R/chat-tools.R index 52feb048a..b7f8d133c 100644 --- a/R/chat-tools.R +++ b/R/chat-tools.R @@ -33,7 +33,8 @@ on_load({ echo = "none", on_tool_request = function(request) invisible(), on_tool_result = function(result) invisible(), - yield_request = FALSE + yield_request = FALSE, + parent_ospan = NULL ) { tool_requests <- extract_tool_requests(turn) @@ -51,7 +52,7 @@ on_load({ next } - result <- invoke_tool(request) + result <- invoke_tool(request, parent_ospan = parent_ospan) if (promises::is.promise(result@value)) { cli::cli_abort( @@ -78,7 +79,8 @@ on_load({ echo = "none", on_tool_request = function(request) invisible(), on_tool_result = function(result) invisible(), - yield_request = FALSE + yield_request = FALSE, + parent_ospan = NULL ) { tool_requests <- extract_tool_requests(turn) @@ -94,7 +96,7 @@ on_load({ return(rejected) } - result <- coro::await(invoke_tool_async(request)) + result <- coro::await(invoke_tool_async(request, parent_ospan)) maybe_echo_tool(result, echo = echo) on_tool_result(result) @@ -144,7 +146,7 @@ new_tool_result <- function(request, result = NULL, error = NULL) { } # Also need to handle edge cases: https://platform.openai.com/docs/guides/function-calling/edge-cases -invoke_tool <- function(request) { +invoke_tool <- function(request, parent_ospan = NULL) { if (is.null(request@tool)) { return(new_tool_result(request, error = "Unknown tool")) } @@ -155,21 +157,23 @@ invoke_tool <- function(request) { return(args) } - span <- start_tool_span(request) + tool_ospan <- create_tool_ospan(request, parent_ospan = parent_ospan) + activate_and_cleanup_ospan(tool_ospan, ospan_promise_domain = FALSE) + tryCatch( { result <- do.call(request@tool, args) new_tool_result(request, result) }, error = function(e) { - record_tool_error(span, e) + record_tool_ospan_error(tool_ospan, e) new_tool_result(request, error = e) } ) } on_load( - invoke_tool_async <- coro::async(function(request) { + invoke_tool_async <- coro::async(function(request, parent_ospan = NULL) { if (is.null(request@tool)) { return(new_tool_result(request, error = "Unknown tool")) } @@ -180,15 +184,18 @@ on_load( return(args) } - span <- start_tool_span(request, active = FALSE) - withr::defer(span$end()) + tool_ospan <- create_tool_ospan(request, parent_ospan = parent_ospan) + # Must activate the span in a promise domain so that it propagates to + # async calls made by the tool function. + activate_and_cleanup_ospan(tool_ospan, ospan_promise_domain = TRUE) + tryCatch( { result <- await(do.call(request@tool, args)) new_tool_result(request, result) }, error = function(e) { - record_tool_error(span, e) + record_tool_ospan_error(tool_ospan, e) new_tool_result(request, error = e) } ) diff --git a/R/chat.R b/R/chat.R index e50a8dbee..5d4cc7802 100644 --- a/R/chat.R +++ b/R/chat.R @@ -490,14 +490,17 @@ Chat <- R6::R6Class( ) { tool_errors <- list() withr::defer(warn_tool_errors(tool_errors)) - start_agent_span(private$provider) + + agent_ospan <- create_agent_ospan(private$provider) + withr::defer(promises::end_ospan(agent_ospan)) while (!is.null(user_turn)) { assistant_chunks <- private$submit_turns( user_turn, stream = stream, echo = echo, - yield_as_content = yield_as_content + yield_as_content = yield_as_content, + parent_ospan = agent_ospan ) for (chunk in assistant_chunks) { yield(chunk) @@ -512,7 +515,8 @@ Chat <- R6::R6Class( echo = echo, on_tool_request = private$callback_on_tool_request$invoke, on_tool_result = private$callback_on_tool_result$invoke, - yield_request = yield_as_content + yield_request = yield_as_content, + parent_ospan = agent_ospan ) tool_results <- list() @@ -550,15 +554,18 @@ Chat <- R6::R6Class( ) { tool_errors <- list() withr::defer(warn_tool_errors(tool_errors)) - span <- start_agent_span(private$provider, active = FALSE) - withr::defer(span$end()) + + agent_ospan <- create_agent_ospan(private$provider) + withr::defer(promises::end_ospan(agent_ospan)) + # local_ospan_promise_domain() while (!is.null(user_turn)) { assistant_chunks <- private$submit_turns_async( user_turn, stream = stream, echo = echo, - yield_as_content = yield_as_content + yield_as_content = yield_as_content, + parent_ospan = agent_ospan ) for (chunk in await_each(assistant_chunks)) { yield(chunk) @@ -573,11 +580,12 @@ Chat <- R6::R6Class( echo = echo, on_tool_request = private$callback_on_tool_request$invoke_async, on_tool_result = private$callback_on_tool_result$invoke_async, - yield_request = yield_as_content + yield_request = yield_as_content, + parent_ospan = agent_ospan ) if (tool_mode == "sequential") { tool_results <- list() - for (tool_step in coro::await_each(tool_calls)) { + for (tool_step in await_each(tool_calls)) { if (yield_as_content) { yield(tool_step) } @@ -586,7 +594,9 @@ Chat <- R6::R6Class( } } } else { + # otel::with_active_span(agent_ospan, { tool_results <- coro::collect(tool_calls) + # }) if (yield_as_content) { # Filter out and yield tool requests before awaiting tool results is_request <- map_lgl(tool_results, is_tool_request) @@ -623,12 +633,18 @@ Chat <- R6::R6Class( stream, echo, type = NULL, - yield_as_content = FALSE + yield_as_content = FALSE, + parent_ospan = NULL ) { if (echo == "all") { cat_line(format(user_turn), prefix = "> ") } - span <- start_chat_span(private$provider) + chat_ospan <- create_chat_ospan( + private$provider, + parent_ospan = parent_ospan + ) + activate_and_cleanup_ospan(chat_ospan) + response <- chat_perform( provider = private$provider, mode = if (stream) "stream" else "value", @@ -655,11 +671,15 @@ Chat <- R6::R6Class( result <- stream_merge_chunks(private$provider, result, chunk) } - record_chat_span_status(span, result) - turn <- value_turn(private$provider, result, has_type = !is.null(type)) + record_chat_ospan_status(chat_ospan, result) + turn <- value_turn( + private$provider, + result, + has_type = !is.null(type) + ) turn <- match_tools(turn, private$tools) } else { - record_chat_span_status(span, response) + record_chat_ospan_status(chat_ospan, response) turn <- value_turn( private$provider, response, @@ -710,23 +730,33 @@ Chat <- R6::R6Class( stream, echo, type = NULL, - yield_as_content = FALSE + yield_as_content = FALSE, + parent_ospan = NULL ) { - span <- start_chat_span(private$provider, active = FALSE) - withr::defer(span$end()) - response <- chat_perform( - provider = private$provider, - mode = if (stream) "async-stream" else "async-value", - turns = c(private$.turns, list(user_turn)), - tools = if (is.null(type)) private$tools, - type = type + chat_ospan <- create_chat_ospan( + private$provider, + parent_ospan = parent_ospan ) + withr::defer(promises::end_ospan(chat_ospan)) + # activate_and_cleanup_ospan(chat_ospan) + + promises::with_ospan_promise_domain({ + otel::with_active_span(chat_ospan, { + response <- chat_perform( + provider = private$provider, + mode = if (stream) "async-stream" else "async-value", + turns = c(private$.turns, list(user_turn)), + tools = if (is.null(type)) private$tools, + type = type + ) + }) + }) emit <- emitter(echo) any_text <- FALSE if (stream) { result <- NULL - for (chunk in await_each(response)) { + for (chunk in coro::await_each(response)) { text <- stream_text(private$provider, chunk) if (!is.null(text)) { emit(text) @@ -740,13 +770,21 @@ Chat <- R6::R6Class( result <- stream_merge_chunks(private$provider, result, chunk) } - record_chat_span_status(span, result) - turn <- value_turn(private$provider, result, has_type = !is.null(type)) + record_chat_ospan_status(chat_ospan, result) + turn <- value_turn( + private$provider, + result, + has_type = !is.null(type) + ) } else { result <- await(response) - record_chat_span_status(span, result) - turn <- value_turn(private$provider, result, has_type = !is.null(type)) + record_chat_ospan_status(chat_ospan, result) + turn <- value_turn( + private$provider, + result, + has_type = !is.null(type) + ) text <- turn@text if (!is.null(text)) { emit(text) diff --git a/R/otel.R b/R/otel.R index 9072383dc..cc4c23221 100644 --- a/R/otel.R +++ b/R/otel.R @@ -1,40 +1,97 @@ -# Starts an Open Telemetry span that abides by the semantic conventions for -# Generative AI completions. -# -# See: https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-spans/#inference -start_chat_span <- function( - provider, - tracer = default_tracer(), - scope = parent.frame(), - active = TRUE -) { - if (is.null(tracer) || !tracer$is_enabled()) { +default_tracer <- function() { + if (!is_installed("otel")) { return(NULL) } - # Ensure we set attributes relevant to sampling at span creation time. - attributes <- list( - "gen_ai.operation.name" = "chat", - "gen_ai.system" = tolower(provider@name), - "gen_ai.request.model" = provider@model - ) - if (active) { - tracer$start_span( - name = sprintf("chat %s", provider@model), - options = list(kind = "CLIENT"), - attributes = attributes, - scope = scope + otel::get_tracer("ellmer") +} + +activate_and_cleanup_ospan <- function( + ospan, + activation_scope = parent.frame(), + ospan_promise_domain = TRUE +) { + if (!is.null(ospan)) { + if (ospan_promise_domain) { + local_ospan_promise_domain() + } + otel::local_active_span( + ospan, + end_on_exit = TRUE, + activation_scope = activation_scope ) + } +} + +local_ospan_promise_domain <- function(.local_envir = parent.frame()) { + local_promise_domain( + promises:::create_otel_ospan_handoff_promise_domain(), + .local_envir = .local_envir + ) +} + +local_promise_domain <- function( + domain, + .local_envir = parent.frame(), + replace = FALSE +) { + oldval <- promises:::current_promise_domain() + globals <- promises:::globals + if (replace) { + globals$domain <- domain } else { - tracer$start_session( - name = sprintf("chat %s", provider@model), - options = list(kind = "CLIENT"), - attributes = attributes, - session_scope = scope - ) + globals$domain <- promises:::compose_domains(oldval, domain) } + withr::defer( + { + globals$domain <- oldval + }, + envir = .local_envir + ) + + invisible() +} + +create_chat_ospan <- function( + provider, + parent_ospan = NULL, + tracer = default_tracer() +) { + promises::create_ospan( + sprintf("chat %s", provider@model), + tracer = tracer, + options = list( + parent = parent_ospan, + kind = "CLIENT" + ), + attributes = list( + "gen_ai.operation.name" = "chat", + "gen_ai.system" = tolower(provider@name), + "gen_ai.request.model" = provider@model + ) + ) } -record_chat_span_status <- function(span, result) { +# # Starts an Open Telemetry span that abides by the semantic conventions for +# # Generative AI completions. +# # +# # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-spans/#inference +# with_chat_ospan_async <- function(provider, expr, tracer = default_tracer()) { +# promises::with_ospan_promise_domain({ +# promises::with_ospan_async( +# sprintf("chat %s", provider@model), +# expr, +# tracer = tracer, +# options = list(kind = "CLIENT"), +# attributes = list( +# "gen_ai.operation.name" = "chat", +# "gen_ai.system" = tolower(provider@name), +# "gen_ai.request.model" = provider@model +# ) +# ) +# }) +# } + +record_chat_ospan_status <- function(span, result) { if (is.null(span) || !span$is_recording()) { return(invisible(span)) } @@ -44,54 +101,61 @@ record_chat_span_status <- function(span, result) { if (!is.null(result$id)) { span$set_attribute("gen_ai.response.id", result$id) } - if (!is.null(result$usage)) { - span$set_attribute("gen_ai.usage.input_tokens", result$usage$prompt_tokens) - span$set_attribute( - "gen_ai.usage.output_tokens", - result$usage$completion_tokens - ) - } + # if (!is.null(result$usage)) { + # span$set_attribute("gen_ai.usage.input_tokens", result$usage$prompt_tokens) + # span$set_attribute( + # "gen_ai.usage.output_tokens", + # result$usage$completion_tokens + # ) + # } # TODO: Consider setting gen_ai.response.finish_reasons. span$set_status("ok") } + # Starts an Open Telemetry span that abides by the semantic conventions for # Generative AI tool calls. # # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-spans/#execute-tool-span -start_tool_span <- function( +create_tool_ospan <- function( request, - tracer = default_tracer(), - scope = parent.frame(), - active = TRUE + parent_ospan = NULL, + tracer = default_tracer() ) { - if (is.null(tracer) || !tracer$is_enabled()) { - return(NULL) - } - attributes <- compact(list( - "gen_ai.operation.name" = "execute_tool", - "gen_ai.tool.name" = request@tool@name, - "gen_ai.tool.description" = request@tool@description, - "gen_ai.tool.call.id" = request@id - )) - if (active) { - tracer$start_span( - name = sprintf("execute_tool %s", request@tool@name), - options = list(kind = "INTERNAL"), - attributes = attributes, - scope = scope - ) - } else { - tracer$start_session( - name = sprintf("execute_tool %s", request@tool@name), - options = list(kind = "INTERNAL"), - attributes = attributes, - session_scope = scope - ) - } + promises::create_ospan( + sprintf("execute_tool %s", request@tool@name), + tracer = tracer, + options = list( + parent = parent_ospan, + kind = "INTERNAL" + ), + attributes = compact(list( + "gen_ai.operation.name" = "execute_tool", + "gen_ai.tool.name" = request@tool@name, + "gen_ai.tool.description" = request@tool@description, + "gen_ai.tool.call.id" = request@id + )) + ) } -record_tool_error <- function(span, error) { +# with_tool_ospan_async <- function(request, expr, tracer = default_tracer()) { +# promises::with_ospan_promise_domain({ +# promises::with_ospan_async( +# sprintf("execute_tool %s", request@tool@name), +# expr, +# tracer = tracer, +# options = list(kind = "INTERNAL"), +# attributes = compact(list( +# "gen_ai.operation.name" = "execute_tool", +# "gen_ai.tool.name" = request@tool@name, +# "gen_ai.tool.description" = request@tool@description, +# "gen_ai.tool.call.id" = request@id +# )) +# ) +# }) +# } + +record_tool_ospan_error <- function(span, error) { if (is.null(span) || !span$is_recording()) { return() } @@ -100,43 +164,62 @@ record_tool_error <- function(span, error) { span$set_attribute("error.type", class(error)[1]) } + # Starts an Open Telemetry span that abides by the semantic conventions for # Generative AI "agents". # # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-spans/#inference -start_agent_span <- function( +create_agent_ospan <- function( provider, - tracer = default_tracer(), - scope = parent.frame(), - active = TRUE + tracer = default_tracer() ) { - if (is.null(tracer) || !tracer$is_enabled()) { - return(NULL) - } - attributes <- list( - "gen_ai.operation.name" = "chat", - "gen_ai.system" = tolower(provider@name) - ) - if (active) { - tracer$start_span( - name = "invoke_agent", - options = list(kind = "CLIENT"), - attributes = attributes, - scope = scope - ) - } else { - tracer$start_session( - name = "invoke_agent", - options = list(kind = "CLIENT"), - attributes = attributes, - session_scope = scope + promises::create_ospan( + "invoke_agent", + tracer = tracer, + options = list(kind = "CLIENT"), + attributes = list( + "gen_ai.operation.name" = "chat", + "gen_ai.system" = tolower(provider@name) ) - } + ) } -default_tracer <- function() { - if (!is_installed("otel")) { - return(NULL) +# with_agent_ospan_async <- function(provider, expr, tracer = default_tracer()) { +# promises::with_ospan_promise_domain({ +# promises::with_ospan_async( +# "invoke_agent", +# expr, +# tracer = tracer, +# options = list(kind = "CLIENT"), +# attributes = list( +# "gen_ai.operation.name" = "chat", +# "gen_ai.system" = tolower(provider@name) +# ) +# ) +# }) +# } + +# ------------------------ + +gen_adapt_map <- coro::generator(function(.i, .fn, ...) { + for (x in .i) { + yield(.fn(x, ...)) } - otel::get_tracer("ellmer") +}) + + +# prev_gen %>% gen_adapt_ospan(chat_ospan) +gen_adapt_ospan <- function(x, ospan) { + gen_adapt_map(x, function(xi) { + local_ospan_promise_domain() + otel::local_active_span(ospan) + message("Activated ospan. : ", ospan$name, " - ", ospan$span_id) + withr::defer(message( + "deactivating ospan : ", + ospan$name, + " - ", + ospan$span_id + )) + force(xi) + }) } diff --git a/man/batch_chat.Rd b/man/batch_chat.Rd index eb77547e5..e57ee6db7 100644 --- a/man/batch_chat.Rd +++ b/man/batch_chat.Rd @@ -95,7 +95,7 @@ errors in the most helpful way. Fortunately they don't seem to be common, but if you have ideas, please let me know! } \examples{ -\dontshow{if (has_credentials("openai")) (if (getRversion() >= "3.4") withAutoprint else force)(\{ # examplesIf} +\dontshow{if (has_credentials("openai")) withAutoprint(\{ # examplesIf} chat <- chat_openai(model = "gpt-4.1-nano") # Chat ---------------------------------------------------------------------- diff --git a/man/chat_azure_openai.Rd b/man/chat_azure_openai.Rd index fa7e6d617..01a981cf6 100644 --- a/man/chat_azure_openai.Rd +++ b/man/chat_azure_openai.Rd @@ -90,7 +90,7 @@ package. } \examples{ \dontrun{ -chat <- chat_azure_openai(deployment_id = "gpt-4o-mini") +chat <- chat_azure_openai(model = "gpt-4o-mini") chat$chat("Tell me three jokes about statisticians") } } diff --git a/man/chat_snowflake.Rd b/man/chat_snowflake.Rd index 5d971bec9..2b1980973 100644 --- a/man/chat_snowflake.Rd +++ b/man/chat_snowflake.Rd @@ -79,7 +79,7 @@ than a general-purpose model. } } \examples{ -\dontshow{if (has_credentials("snowflake")) (if (getRversion() >= "3.4") withAutoprint else force)(\{ # examplesIf} +\dontshow{if (has_credentials("snowflake")) withAutoprint(\{ # examplesIf} chat <- chat_snowflake() chat$chat("Tell me a joke in the form of a SQL query.") \dontshow{\}) # examplesIf} diff --git a/man/ellmer-package.Rd b/man/ellmer-package.Rd index f609548d1..88f0c4d6e 100644 --- a/man/ellmer-package.Rd +++ b/man/ellmer-package.Rd @@ -32,7 +32,7 @@ Authors: Other contributors: \itemize{ - \item Posit Software, PBC (03wc8by49) [copyright holder, funder] + \item Posit Software, PBC (\href{https://ror.org/03wc8by49}{ROR}) [copyright holder, funder] } } diff --git a/tests/testthat/test-otel.R b/tests/testthat/test-otel.R index c8b28d41b..5d943bab3 100644 --- a/tests/testthat/test-otel.R +++ b/tests/testthat/test-otel.R @@ -15,7 +15,7 @@ test_that("tracing works as expected for synchronous chats", { function(x) identical(x$parent, "0000000000000000"), logical(1) ))) - root_spans <- sapply(agent_spans, function(x) x$span_id) + agent_span_ids <- sapply(agent_spans, function(x) x$span_id) # We should have two "execute_tool" spans (one for each tool invocation) # that are children of the agent spans. @@ -23,18 +23,19 @@ test_that("tracing works as expected for synchronous chats", { expect_length(tool_spans, 2L) expect_true(all(vapply( tool_spans, - function(x) x$parent %in% root_spans, + function(x) x$parent %in% agent_span_ids, logical(1) ))) - # And four "chat" spans that correspond to model calls before and after each + # And "chat" spans that correspond to model calls before and after each # tool call -- these are also children of the agent spans and siblings of one # another. + # Note 2025/09: Not all models are calling tools the same. There must be AT LEAST 2 chat spans. Ex: anthropic has 4, openai has 3. :shrug: chat_spans <- Filter(function(x) startsWith(x$name, "chat"), spans) - expect_length(chat_spans, 4L) + expect_gte(length(chat_spans), 2L) expect_true(all(vapply( chat_spans, - function(x) x$parent %in% root_spans, + function(x) x$parent %in% agent_span_ids, logical(1) ))) @@ -59,7 +60,7 @@ test_that("tracing works as expected for synchronous streams", { spans <- otelsdk::with_otel_record({ chat <- chat_openai_test(system_prompt = "Be terse.") stream <- chat$stream("What's your favourite Apache Spark feature?") - local(otel::start_span("similtaneous")) + local(otel::start_local_active_span("simultaneous")) coro::collect(stream) })[["traces"]] @@ -77,7 +78,7 @@ test_that("tracing works as expected for synchronous streams", { # Verify that the span started when the stream was suspended is not part of # the agent trace. - expect_equal(spans[["similtaneous"]]$parent, "0000000000000000") + expect_equal(spans[["simultaneous"]]$parent, "0000000000000000") # But that HTTP spans are. chat_span_ids <- sapply(chat_spans, function(x) x$span_id) @@ -92,13 +93,14 @@ test_that("tracing works as expected for synchronous streams", { test_that("tracing works as expected for asynchronous chats", { skip_if_not_installed("otelsdk") - chat <- chat_openai_test( + # chat <- chat_openai_test( + chat <- chat_anthropic_test( system_prompt = "Always use a tool to answer. Reply with 'It is ____.'." ) chat$register_tool(tool( coro::async(function() "2024-01-01"), "Return the current date", - .name = "current_date" + name = "current_date" )) # Capture spans for an async chat with async tool calls interleaved with @@ -109,16 +111,19 @@ test_that("tracing works as expected for asynchronous chats", { chat$chat_async("What date will it be 47 days from now?") }) p2 <- promises::promise(function(resolve, reject) { - span <- otel::start_session("concurrent") + span <- otel::start_span("concurrent") + otel::local_active_span(span) later::later( function() { on.exit(span$end()) - otel::with_session(resolve(NULL), span) + otel::with_active_span(span, { + resolve(NULL) + }) }, 0.1 ) }) - local(otel::start_span("similtaneous")) + local(otel::start_local_active_span("simultaneous")) sync(p2) sync(p1) })[["traces"]] @@ -127,20 +132,21 @@ test_that("tracing works as expected for asynchronous chats", { # invocation) that start their respective traces. agent_spans <- Filter(function(x) x$name == "invoke_agent", spans) expect_length(agent_spans, 2L) + expect_true(all(vapply( agent_spans, function(x) identical(x$parent, "0000000000000000"), logical(1) ))) - root_spans <- sapply(agent_spans, function(x) x$span_id) + agent_span_ids <- sapply(agent_spans, function(x) x$span_id) # We should have two "execute_tool" spans (one for each tool invocation) # that are children of the agent spans. tool_spans <- Filter(function(x) startsWith(x$name, "execute_tool"), spans) - expect_length(tool_spans, 2L) + expect_gte(length(tool_spans), 1L) expect_true(all(vapply( tool_spans, - function(x) x$parent %in% root_spans, + function(x) x$parent %in% agent_span_ids, logical(1) ))) @@ -148,10 +154,10 @@ test_that("tracing works as expected for asynchronous chats", { # tool call -- these are also children of the agent spans and siblings of one # another. chat_spans <- Filter(function(x) startsWith(x$name, "chat"), spans) - expect_length(chat_spans, 4L) + expect_gte(length(chat_spans), 2L) expect_true(all(vapply( chat_spans, - function(x) x$parent %in% root_spans, + function(x) x$parent %in% agent_span_ids, logical(1) ))) @@ -178,16 +184,18 @@ test_that("tracing works as expected for asynchronous streams", { chat <- chat_openai_test(system_prompt = "Be terse.") stream <- chat$stream_async("What's your favourite Apache Spark feature?") p <- promises::promise(function(resolve, reject) { - span <- local(otel::start_session("concurrent")) + span <- local(otel::start_span("concurrent")) later::later( function() { on.exit(span$end()) - otel::with_session(resolve(NULL), span) + otel::with_active_span(span, { + resolve(NULL) + }) }, 0.1 ) }) - local(otel::start_span("similtaneous")) + local(otel::start_local_active_span("simultaneous")) sync(p) sync(coro::async_collect(stream)) })[["traces"]] @@ -207,7 +215,7 @@ test_that("tracing works as expected for asynchronous streams", { # Verify that the spans started when the stream was suspended are not part of # the agent trace. expect_equal(spans[["concurrent"]]$parent, "0000000000000000") - expect_equal(spans[["similtaneous"]]$parent, "0000000000000000") + expect_equal(spans[["simultaneous"]]$parent, "0000000000000000") # But that HTTP spans are. chat_span_ids <- sapply(chat_spans, function(x) x$span_id) From 2d5245af2bf040c1d6b784a249a10f7606888236 Mon Sep 17 00:00:00 2001 From: Barret Schloerke Date: Tue, 9 Sep 2025 14:46:56 -0400 Subject: [PATCH 04/33] Simplify internal api --- R/chat-tools.R | 11 ++- R/chat.R | 15 ++-- R/otel.R | 195 ++++++++++++++++++------------------------------- 3 files changed, 85 insertions(+), 136 deletions(-) diff --git a/R/chat-tools.R b/R/chat-tools.R index b7f8d133c..bdf115d78 100644 --- a/R/chat-tools.R +++ b/R/chat-tools.R @@ -157,8 +157,10 @@ invoke_tool <- function(request, parent_ospan = NULL) { return(args) } - tool_ospan <- create_tool_ospan(request, parent_ospan = parent_ospan) - activate_and_cleanup_ospan(tool_ospan, ospan_promise_domain = FALSE) + tool_ospan <- start_local_active_tool_ospan( + request, + parent_ospan = parent_ospan + ) tryCatch( { @@ -184,7 +186,10 @@ on_load( return(args) } - tool_ospan <- create_tool_ospan(request, parent_ospan = parent_ospan) + tool_ospan <- start_local_active_tool_ospan( + request, + parent_ospan = parent_ospan + ) # Must activate the span in a promise domain so that it propagates to # async calls made by the tool function. activate_and_cleanup_ospan(tool_ospan, ospan_promise_domain = TRUE) diff --git a/R/chat.R b/R/chat.R index 5d4cc7802..31a5ed4f9 100644 --- a/R/chat.R +++ b/R/chat.R @@ -491,8 +491,7 @@ Chat <- R6::R6Class( tool_errors <- list() withr::defer(warn_tool_errors(tool_errors)) - agent_ospan <- create_agent_ospan(private$provider) - withr::defer(promises::end_ospan(agent_ospan)) + agent_ospan <- local_agent_ospan(private$provider) while (!is.null(user_turn)) { assistant_chunks <- private$submit_turns( @@ -555,9 +554,7 @@ Chat <- R6::R6Class( tool_errors <- list() withr::defer(warn_tool_errors(tool_errors)) - agent_ospan <- create_agent_ospan(private$provider) - withr::defer(promises::end_ospan(agent_ospan)) - # local_ospan_promise_domain() + agent_ospan <- local_agent_ospan(private$provider) while (!is.null(user_turn)) { assistant_chunks <- private$submit_turns_async( @@ -639,11 +636,11 @@ Chat <- R6::R6Class( if (echo == "all") { cat_line(format(user_turn), prefix = "> ") } - chat_ospan <- create_chat_ospan( + + chat_ospan <- local_chat_ospan( private$provider, parent_ospan = parent_ospan ) - activate_and_cleanup_ospan(chat_ospan) response <- chat_perform( provider = private$provider, @@ -733,12 +730,10 @@ Chat <- R6::R6Class( yield_as_content = FALSE, parent_ospan = NULL ) { - chat_ospan <- create_chat_ospan( + chat_ospan <- local_chat_ospan( private$provider, parent_ospan = parent_ospan ) - withr::defer(promises::end_ospan(chat_ospan)) - # activate_and_cleanup_ospan(chat_ospan) promises::with_ospan_promise_domain({ otel::with_active_span(chat_ospan, { diff --git a/R/otel.R b/R/otel.R index cc4c23221..4886045b3 100644 --- a/R/otel.R +++ b/R/otel.R @@ -5,6 +5,9 @@ default_tracer <- function() { otel::get_tracer("ellmer") } +# Only activate the span if it is non-NULL. If activated, ensure it is +# automatically ended when the activation scope exits. If +# ospan_promise_domain is TRUE, also ensure that the active span is reactivated upon promise domain restoration. activate_and_cleanup_ospan <- function( ospan, activation_scope = parent.frame(), @@ -12,7 +15,7 @@ activate_and_cleanup_ospan <- function( ) { if (!is.null(ospan)) { if (ospan_promise_domain) { - local_ospan_promise_domain() + local_ospan_promise_domain(activation_scope) } otel::local_active_span( ospan, @@ -22,13 +25,16 @@ activate_and_cleanup_ospan <- function( } } -local_ospan_promise_domain <- function(.local_envir = parent.frame()) { +# If any otel spans are activated within the current promise domain, they will +# be automatically restored during promise restoration. +local_ospan_promise_domain <- function(activation_scope = parent.frame()) { local_promise_domain( promises:::create_otel_ospan_handoff_promise_domain(), - .local_envir = .local_envir + .local_envir = activation_scope ) } +# Modifies the current promise domain to include `domain` for the local scope. local_promise_domain <- function( domain, .local_envir = parent.frame(), @@ -51,45 +57,32 @@ local_promise_domain <- function( invisible() } -create_chat_ospan <- function( + +local_chat_ospan <- function( provider, parent_ospan = NULL, + local_envir = parent.frame(), tracer = default_tracer() ) { - promises::create_ospan( - sprintf("chat %s", provider@model), - tracer = tracer, - options = list( - parent = parent_ospan, - kind = "CLIENT" - ), - attributes = list( - "gen_ai.operation.name" = "chat", - "gen_ai.system" = tolower(provider@name), - "gen_ai.request.model" = provider@model + chat_ospan <- + promises::create_ospan( + sprintf("chat %s", provider@model), + tracer = tracer, + options = list( + parent = parent_ospan, + kind = "CLIENT" + ), + attributes = list( + "gen_ai.operation.name" = "chat", + "gen_ai.system" = tolower(provider@name), + "gen_ai.request.model" = provider@model + ) ) - ) -} -# # Starts an Open Telemetry span that abides by the semantic conventions for -# # Generative AI completions. -# # -# # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-spans/#inference -# with_chat_ospan_async <- function(provider, expr, tracer = default_tracer()) { -# promises::with_ospan_promise_domain({ -# promises::with_ospan_async( -# sprintf("chat %s", provider@model), -# expr, -# tracer = tracer, -# options = list(kind = "CLIENT"), -# attributes = list( -# "gen_ai.operation.name" = "chat", -# "gen_ai.system" = tolower(provider@name), -# "gen_ai.request.model" = provider@model -# ) -# ) -# }) -# } + withr::defer(promises::end_ospan(chat_ospan), envir = local_envir) + + chat_ospan +} record_chat_ospan_status <- function(span, result) { if (is.null(span) || !span$is_recording()) { @@ -101,6 +94,7 @@ record_chat_ospan_status <- function(span, result) { if (!is.null(result$id)) { span$set_attribute("gen_ai.response.id", result$id) } + # TODO: Fixme @atheriel! # if (!is.null(result$usage)) { # span$set_attribute("gen_ai.usage.input_tokens", result$usage$prompt_tokens) # span$set_attribute( @@ -116,52 +110,43 @@ record_chat_ospan_status <- function(span, result) { # Starts an Open Telemetry span that abides by the semantic conventions for # Generative AI tool calls. # +# Must be activated for the calling scope. +# # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-spans/#execute-tool-span -create_tool_ospan <- function( +start_local_active_tool_ospan <- function( request, parent_ospan = NULL, + local_envir = parent.frame(), tracer = default_tracer() ) { - promises::create_ospan( - sprintf("execute_tool %s", request@tool@name), - tracer = tracer, - options = list( - parent = parent_ospan, - kind = "INTERNAL" - ), - attributes = compact(list( - "gen_ai.operation.name" = "execute_tool", - "gen_ai.tool.name" = request@tool@name, - "gen_ai.tool.description" = request@tool@description, - "gen_ai.tool.call.id" = request@id - )) - ) + tool_ospan <- + promises::create_ospan( + sprintf("execute_tool %s", request@tool@name), + tracer = tracer, + options = list( + parent = parent_ospan, + kind = "INTERNAL" + ), + attributes = compact(list( + "gen_ai.operation.name" = "execute_tool", + "gen_ai.tool.name" = request@tool@name, + "gen_ai.tool.description" = request@tool@description, + "gen_ai.tool.call.id" = request@id + )) + ) + + activate_and_cleanup_ospan(tool_ospan, local_envir) + + tool_ospan } -# with_tool_ospan_async <- function(request, expr, tracer = default_tracer()) { -# promises::with_ospan_promise_domain({ -# promises::with_ospan_async( -# sprintf("execute_tool %s", request@tool@name), -# expr, -# tracer = tracer, -# options = list(kind = "INTERNAL"), -# attributes = compact(list( -# "gen_ai.operation.name" = "execute_tool", -# "gen_ai.tool.name" = request@tool@name, -# "gen_ai.tool.description" = request@tool@description, -# "gen_ai.tool.call.id" = request@id -# )) -# ) -# }) -# } - -record_tool_ospan_error <- function(span, error) { - if (is.null(span) || !span$is_recording()) { +record_tool_ospan_error <- function(ospan, error) { + if (is.null(ospan) || !ospan$is_recording()) { return() } - span$record_exception(error) - span$set_status("error") - span$set_attribute("error.type", class(error)[1]) + ospan$record_exception(error) + ospan$set_status("error") + ospan$set_attribute("error.type", class(error)[1]) } @@ -169,57 +154,21 @@ record_tool_ospan_error <- function(span, error) { # Generative AI "agents". # # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-spans/#inference -create_agent_ospan <- function( +local_agent_ospan <- function( provider, - tracer = default_tracer() + tracer = default_tracer(), + local_envir = parent.frame() ) { - promises::create_ospan( - "invoke_agent", - tracer = tracer, - options = list(kind = "CLIENT"), - attributes = list( - "gen_ai.operation.name" = "chat", - "gen_ai.system" = tolower(provider@name) + agent_ospan <- + promises::create_ospan( + "invoke_agent", + tracer = tracer, + options = list(kind = "CLIENT"), + attributes = list( + "gen_ai.operation.name" = "chat", + "gen_ai.system" = tolower(provider@name) + ) ) - ) -} - -# with_agent_ospan_async <- function(provider, expr, tracer = default_tracer()) { -# promises::with_ospan_promise_domain({ -# promises::with_ospan_async( -# "invoke_agent", -# expr, -# tracer = tracer, -# options = list(kind = "CLIENT"), -# attributes = list( -# "gen_ai.operation.name" = "chat", -# "gen_ai.system" = tolower(provider@name) -# ) -# ) -# }) -# } - -# ------------------------ - -gen_adapt_map <- coro::generator(function(.i, .fn, ...) { - for (x in .i) { - yield(.fn(x, ...)) - } -}) - - -# prev_gen %>% gen_adapt_ospan(chat_ospan) -gen_adapt_ospan <- function(x, ospan) { - gen_adapt_map(x, function(xi) { - local_ospan_promise_domain() - otel::local_active_span(ospan) - message("Activated ospan. : ", ospan$name, " - ", ospan$span_id) - withr::defer(message( - "deactivating ospan : ", - ospan$name, - " - ", - ospan$span_id - )) - force(xi) - }) + withr::defer(promises::end_ospan(agent_ospan), envir = local_envir) + agent_ospan } From 397052e89ac00e18364f922e60e074a091157f60 Mon Sep 17 00:00:00 2001 From: Barret Schloerke Date: Tue, 9 Sep 2025 16:27:28 -0400 Subject: [PATCH 05/33] Use `promises::local_ospan_promise_domain()` --- DESCRIPTION | 2 +- R/otel.R | 34 +--------------------------------- 2 files changed, 2 insertions(+), 34 deletions(-) diff --git a/DESCRIPTION b/DESCRIPTION index e278183bd..7442c7953 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -55,9 +55,9 @@ Suggests: VignetteBuilder: knitr Remotes: - rstudio/promises, r-lib/otel, r-lib/otelsdk + rstudio/promises#179 Config/Needs/website: tidyverse/tidytemplate, rmarkdown Config/testthat/edition: 3 Config/testthat/parallel: true diff --git a/R/otel.R b/R/otel.R index 4886045b3..4a2980699 100644 --- a/R/otel.R +++ b/R/otel.R @@ -15,7 +15,7 @@ activate_and_cleanup_ospan <- function( ) { if (!is.null(ospan)) { if (ospan_promise_domain) { - local_ospan_promise_domain(activation_scope) + promises::local_ospan_promise_domain(activation_scope) } otel::local_active_span( ospan, @@ -25,38 +25,6 @@ activate_and_cleanup_ospan <- function( } } -# If any otel spans are activated within the current promise domain, they will -# be automatically restored during promise restoration. -local_ospan_promise_domain <- function(activation_scope = parent.frame()) { - local_promise_domain( - promises:::create_otel_ospan_handoff_promise_domain(), - .local_envir = activation_scope - ) -} - -# Modifies the current promise domain to include `domain` for the local scope. -local_promise_domain <- function( - domain, - .local_envir = parent.frame(), - replace = FALSE -) { - oldval <- promises:::current_promise_domain() - globals <- promises:::globals - if (replace) { - globals$domain <- domain - } else { - globals$domain <- promises:::compose_domains(oldval, domain) - } - withr::defer( - { - globals$domain <- oldval - }, - envir = .local_envir - ) - - invisible() -} - local_chat_ospan <- function( provider, From 833c308b55d3dee9b026356c5c6ca2f4f087dd0b Mon Sep 17 00:00:00 2001 From: Barret Schloerke Date: Tue, 9 Sep 2025 16:29:33 -0400 Subject: [PATCH 06/33] Import otel as promises does. Remove suggestions on otelsdk and add to check remotes --- DESCRIPTION | 6 ++---- R/otel.R | 3 --- 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/DESCRIPTION b/DESCRIPTION index 7442c7953..15695ef22 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -30,6 +30,7 @@ Imports: jsonlite, later (>= 1.4.0), lifecycle, + otel (>= 0.2.0), promises (>= 1.3.3.9001), R6, rlang (>= 1.1.0), @@ -43,8 +44,6 @@ Suggests: knitr, magick, openssl, - otel (>= 0.0.0.9000), - otelsdk (>= 0.0.0.9000), paws.common, rmarkdown, shiny, @@ -55,9 +54,8 @@ Suggests: VignetteBuilder: knitr Remotes: - r-lib/otel, - r-lib/otelsdk rstudio/promises#179 +Config/Needs/check: r-lib/otelsdk Config/Needs/website: tidyverse/tidytemplate, rmarkdown Config/testthat/edition: 3 Config/testthat/parallel: true diff --git a/R/otel.R b/R/otel.R index 4a2980699..8f097e353 100644 --- a/R/otel.R +++ b/R/otel.R @@ -1,7 +1,4 @@ default_tracer <- function() { - if (!is_installed("otel")) { - return(NULL) - } otel::get_tracer("ellmer") } From 324c955d103ea01f3ef47f3ebd772d45c6a3838d Mon Sep 17 00:00:00 2001 From: Barret Schloerke Date: Tue, 9 Sep 2025 16:31:29 -0400 Subject: [PATCH 07/33] Use new promises main branch (PR was merged) --- DESCRIPTION | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/DESCRIPTION b/DESCRIPTION index 15695ef22..b79f8a3fd 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -31,7 +31,7 @@ Imports: later (>= 1.4.0), lifecycle, otel (>= 0.2.0), - promises (>= 1.3.3.9001), + promises (>= 1.3.3.9004), R6, rlang (>= 1.1.0), S7 (>= 0.2.0) @@ -54,7 +54,7 @@ Suggests: VignetteBuilder: knitr Remotes: - rstudio/promises#179 + rstudio/promises Config/Needs/check: r-lib/otelsdk Config/Needs/website: tidyverse/tidytemplate, rmarkdown Config/testthat/edition: 3 From a923a1aaa017112a308981d6cff6c4f68dc35085 Mon Sep 17 00:00:00 2001 From: Barret Schloerke Date: Tue, 9 Sep 2025 17:04:46 -0400 Subject: [PATCH 08/33] Copy in tracer retrieval from httr2 Co-Authored-By: Aaron Jacobs --- R/otel.R | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/R/otel.R b/R/otel.R index 8f097e353..8af9decb3 100644 --- a/R/otel.R +++ b/R/otel.R @@ -1,6 +1,23 @@ -default_tracer <- function() { - otel::get_tracer("ellmer") -} +otel_tracer_name <- "co.posit.r-package.ellmer" + +# Inspired by httr2:::get_tracer() / shiny:::get_tracer() +# Using local scope avoids an environment object lookup on each call. +default_tracer <- local({ + tracer <- NULL + function() { + if (!is.null(tracer)) { + return(tracer) + } + if (testthat::is_testing()) { + # Don't cache the tracer in unit tests. It interferes with tracer provider + # injection in otelsdk::with_otel_record(). + return(otel::get_tracer()) + } + tracer <<- otel::get_tracer() + tracer + } +}) + # Only activate the span if it is non-NULL. If activated, ensure it is # automatically ended when the activation scope exits. If From bc00cc248cbc3d6faf59ecf94ae2332d519772f9 Mon Sep 17 00:00:00 2001 From: Barret Schloerke Date: Wed, 10 Sep 2025 01:24:37 -0400 Subject: [PATCH 09/33] Fix runtime error with otelsdk where `spn$end(status="auto")` would fail --- R/otel.R | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/R/otel.R b/R/otel.R index 8af9decb3..d60cd024f 100644 --- a/R/otel.R +++ b/R/otel.R @@ -33,9 +33,12 @@ activate_and_cleanup_ospan <- function( } otel::local_active_span( ospan, - end_on_exit = TRUE, + end_on_exit = FALSE, activation_scope = activation_scope ) + # For some reason, when using `end_on_exit = TRUE` above, an error would occur during `spn$end(status = "auto")`. When using `withr::defer()` here, it works fine. + # TODO: Set status? + withr::defer(promises::end_ospan(ospan), envir = activation_scope) } } From e59614b8fb2bf34ccdfbd6e71e8a1381f669b878 Mon Sep 17 00:00:00 2001 From: Barret Schloerke Date: Wed, 10 Sep 2025 01:25:24 -0400 Subject: [PATCH 10/33] Pass through the parent chat ospan to the generator methods. Add ospan activation for chat --- R/chat.R | 23 +++++++++++++++-------- R/httr2.R | 30 +++++++++++++++++++++++++----- 2 files changed, 40 insertions(+), 13 deletions(-) diff --git a/R/chat.R b/R/chat.R index 31a5ed4f9..fdb5aa1cc 100644 --- a/R/chat.R +++ b/R/chat.R @@ -642,13 +642,19 @@ Chat <- R6::R6Class( parent_ospan = parent_ospan ) - response <- chat_perform( - provider = private$provider, - mode = if (stream) "stream" else "value", - turns = c(private$.turns, list(user_turn)), - tools = if (is.null(type)) private$tools, - type = type - ) + promises::with_ospan_promise_domain({ + otel::with_active_span(chat_ospan, { + response <- chat_perform( + provider = private$provider, + mode = if (stream) "stream" else "value", + turns = c(private$.turns, list(user_turn)), + tools = if (is.null(type)) private$tools, + type = type, + parent_ospan = chat_ospan + ) + }) + }) + emit <- emitter(echo) any_text <- FALSE @@ -742,7 +748,8 @@ Chat <- R6::R6Class( mode = if (stream) "async-stream" else "async-value", turns = c(private$.turns, list(user_turn)), tools = if (is.null(type)) private$tools, - type = type + type = type, + parent_ospan = chat_ospan ) }) }) diff --git a/R/httr2.R b/R/httr2.R index a15234625..548be8281 100644 --- a/R/httr2.R +++ b/R/httr2.R @@ -6,7 +6,8 @@ chat_perform <- function( mode = c("value", "stream", "async-stream", "async-value"), turns, tools = NULL, - type = NULL + type = NULL, + parent_ospan = NULL ) { mode <- arg_match(mode) stream <- mode %in% c("stream", "async-stream") @@ -23,9 +24,13 @@ chat_perform <- function( switch( mode, "value" = chat_perform_value(provider, req), - "stream" = chat_perform_stream(provider, req), + "stream" = chat_perform_stream(provider, req, parent_ospan = parent_ospan), "async-value" = chat_perform_async_value(provider, req), - "async-stream" = chat_perform_async_stream(provider, req) + "async-stream" = chat_perform_async_stream( + provider, + req, + parent_ospan = parent_ospan + ) ) } @@ -34,7 +39,14 @@ chat_perform_value <- function(provider, req) { } on_load( - chat_perform_stream <- coro::generator(function(provider, req) { + chat_perform_stream <- coro::generator(function( + provider, + req, + parent_ospan = NULL + ) { + if (!is.null(parent_ospan)) { + otel::local_active_span(parent_ospan) + } resp <- req_perform_connection(req) on.exit(close(resp)) @@ -55,7 +67,15 @@ chat_perform_async_value <- function(provider, req) { } on_load( - chat_perform_async_stream <- coro::async_generator(function(provider, req) { + chat_perform_async_stream <- coro::async_generator(function( + provider, + req, + parent_ospan = NULL + ) { + if (!is.null(parent_ospan)) { + promises::local_ospan_promise_domain() + otel::local_active_span(parent_ospan) + } resp <- req_perform_connection(req, blocking = FALSE) on.exit(close(resp)) From 5e48b665a92d0df9bdb9efd288a5e4730f1660f9 Mon Sep 17 00:00:00 2001 From: Barret Schloerke Date: Thu, 6 Nov 2025 12:07:19 -0500 Subject: [PATCH 11/33] Apply suggestions from code review Co-authored-by: Charlie Gao <53399081+shikokuchuo@users.noreply.github.com> --- R/otel.R | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/R/otel.R b/R/otel.R index d60cd024f..12e980062 100644 --- a/R/otel.R +++ b/R/otel.R @@ -59,7 +59,7 @@ local_chat_ospan <- function( ), attributes = list( "gen_ai.operation.name" = "chat", - "gen_ai.system" = tolower(provider@name), + "gen_ai.provider.name" = tolower(provider@name), "gen_ai.request.model" = provider@model ) ) @@ -151,7 +151,7 @@ local_agent_ospan <- function( options = list(kind = "CLIENT"), attributes = list( "gen_ai.operation.name" = "chat", - "gen_ai.system" = tolower(provider@name) + "gen_ai.provider.name" = tolower(provider@name) ) ) withr::defer(promises::end_ospan(agent_ospan), envir = local_envir) From 5735bc69cbe65cc1b46e1dbe1fb86bd5b361811a Mon Sep 17 00:00:00 2001 From: Barret Schloerke Date: Thu, 6 Nov 2025 12:33:00 -0500 Subject: [PATCH 12/33] Use existing `is_testing()` method --- R/otel.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/R/otel.R b/R/otel.R index 12e980062..4f7d671a4 100644 --- a/R/otel.R +++ b/R/otel.R @@ -8,7 +8,7 @@ default_tracer <- local({ if (!is.null(tracer)) { return(tracer) } - if (testthat::is_testing()) { + if (is_testing()) { # Don't cache the tracer in unit tests. It interferes with tracer provider # injection in otelsdk::with_otel_record(). return(otel::get_tracer()) From 30ee7942ec41db09c27e91bcf4198a2a62fe5e26 Mon Sep 17 00:00:00 2001 From: Barret Schloerke Date: Thu, 6 Nov 2025 12:33:44 -0500 Subject: [PATCH 13/33] Use more descriptive otel tracer function for ellmer Inspiration from shiny / promises code reviews --- R/otel.R | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/R/otel.R b/R/otel.R index 4f7d671a4..fd20a544c 100644 --- a/R/otel.R +++ b/R/otel.R @@ -2,7 +2,7 @@ otel_tracer_name <- "co.posit.r-package.ellmer" # Inspired by httr2:::get_tracer() / shiny:::get_tracer() # Using local scope avoids an environment object lookup on each call. -default_tracer <- local({ +ellmer_otel_tracer <- local({ tracer <- NULL function() { if (!is.null(tracer)) { @@ -47,7 +47,7 @@ local_chat_ospan <- function( provider, parent_ospan = NULL, local_envir = parent.frame(), - tracer = default_tracer() + tracer = ellmer_otel_tracer() ) { chat_ospan <- promises::create_ospan( @@ -102,7 +102,7 @@ start_local_active_tool_ospan <- function( request, parent_ospan = NULL, local_envir = parent.frame(), - tracer = default_tracer() + tracer = ellmer_otel_tracer() ) { tool_ospan <- promises::create_ospan( @@ -141,7 +141,7 @@ record_tool_ospan_error <- function(ospan, error) { # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-spans/#inference local_agent_ospan <- function( provider, - tracer = default_tracer(), + tracer = ellmer_otel_tracer(), local_envir = parent.frame() ) { agent_ospan <- From 443439ebb00992b2920a4ce476baf549edcb785c Mon Sep 17 00:00:00 2001 From: Barret Schloerke Date: Thu, 6 Nov 2025 12:41:47 -0500 Subject: [PATCH 14/33] Use `defer` standalone --- DESCRIPTION | 1 + R/chat.R | 4 ++-- R/httr2.R | 8 ++++++-- R/import-standalone-defer.R | 35 +++++++++++++++++++++++++++++++++++ R/otel.R | 8 +++++--- 5 files changed, 49 insertions(+), 7 deletions(-) create mode 100644 R/import-standalone-defer.R diff --git a/DESCRIPTION b/DESCRIPTION index 839a208e1..583ca6b5d 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -85,6 +85,7 @@ Collate: 'content-pdf.R' 'content-replay.R' 'httr2.R' + 'import-standalone-defer.R' 'import-standalone-obj-type.R' 'import-standalone-purrr.R' 'import-standalone-types-check.R' diff --git a/R/chat.R b/R/chat.R index 502ab95ca..e5542e763 100644 --- a/R/chat.R +++ b/R/chat.R @@ -440,7 +440,7 @@ Chat <- R6::R6Class( yield_as_content = FALSE ) { tool_errors <- list() - withr::defer(warn_tool_errors(tool_errors)) + defer(warn_tool_errors(tool_errors)) agent_ospan <- local_agent_ospan(private$provider) @@ -503,7 +503,7 @@ Chat <- R6::R6Class( yield_as_content = FALSE ) { tool_errors <- list() - withr::defer(warn_tool_errors(tool_errors)) + defer(warn_tool_errors(tool_errors)) agent_ospan <- local_agent_ospan(private$provider) diff --git a/R/httr2.R b/R/httr2.R index 06e4e6be4..f242c9ac8 100644 --- a/R/httr2.R +++ b/R/httr2.R @@ -30,11 +30,15 @@ chat_perform <- function( provider, req, parent_ospan = parent_ospan - ) + ), "value" = req_perform(req), "stream" = chat_perform_stream(provider, req, parent_ospan = parent_ospan), "async-value" = req_perform_promise(req), - "async-stream" = chat_perform_async_stream(provider, req, parent_ospan = parent_ospan) + "async-stream" = chat_perform_async_stream( + provider, + req, + parent_ospan = parent_ospan + ) ) } diff --git a/R/import-standalone-defer.R b/R/import-standalone-defer.R new file mode 100644 index 000000000..e689e5092 --- /dev/null +++ b/R/import-standalone-defer.R @@ -0,0 +1,35 @@ +# Standalone file: do not edit by hand +# Source: https://github.com/r-lib/withr/blob/HEAD/R/standalone-defer.R +# Generated by: usethis::use_standalone("r-lib/withr", "defer") +# ---------------------------------------------------------------------- +# +# --- +# repo: r-lib/withr +# file: standalone-defer.R +# last-updated: 2024-01-15 +# license: https://unlicense.org +# --- +# +# `defer()` is similar to `on.exit()` but with a better default for +# `add` (hardcoded to `TRUE`) and `after` (`FALSE` by default). +# It also supports adding handlers to other frames which is useful +# to implement `local_` functions. +# +# +# ## Changelog +# +# 2024-01-15: +# * Rewritten to be pure base R. +# +# nocov start + +defer <- function(expr, envir = parent.frame(), after = FALSE) { + thunk <- as.call(list(function() expr)) + do.call( + on.exit, + list(thunk, add = TRUE, after = after), + envir = envir + ) +} + +# nocov end diff --git a/R/otel.R b/R/otel.R index fd20a544c..60dac2a74 100644 --- a/R/otel.R +++ b/R/otel.R @@ -38,7 +38,7 @@ activate_and_cleanup_ospan <- function( ) # For some reason, when using `end_on_exit = TRUE` above, an error would occur during `spn$end(status = "auto")`. When using `withr::defer()` here, it works fine. # TODO: Set status? - withr::defer(promises::end_ospan(ospan), envir = activation_scope) + defer(promises::end_ospan(ospan), envir = activation_scope) } } @@ -64,7 +64,7 @@ local_chat_ospan <- function( ) ) - withr::defer(promises::end_ospan(chat_ospan), envir = local_envir) + defer(promises::end_ospan(chat_ospan), envir = local_envir) chat_ospan } @@ -154,6 +154,8 @@ local_agent_ospan <- function( "gen_ai.provider.name" = tolower(provider@name) ) ) - withr::defer(promises::end_ospan(agent_ospan), envir = local_envir) + + defer(promises::end_ospan(agent_ospan), envir = local_envir) + agent_ospan } From 74fa875790017eae3e1d1be57f0435f6bc4703dc Mon Sep 17 00:00:00 2001 From: Barret Schloerke Date: Thu, 6 Nov 2025 12:47:00 -0500 Subject: [PATCH 15/33] Apply similar logic to remove withr for `local_tempfile()` for main code --- R/provider-anthropic.R | 2 +- R/provider-openai-responses.R | 2 +- R/provider-openai.R | 4 ++-- R/utils.R | 23 +++++++++++++++++++++++ 4 files changed, 27 insertions(+), 4 deletions(-) diff --git a/R/provider-anthropic.R b/R/provider-anthropic.R index ebd09e9bd..2337c8e83 100644 --- a/R/provider-anthropic.R +++ b/R/provider-anthropic.R @@ -571,7 +571,7 @@ method(batch_retrieve, ProviderAnthropic) <- function(provider, batch) { req <- req_url(req, batch$results_url) req <- req_progress(req, "down") - path <- withr::local_tempfile() + path <- local_tempfile() req <- req_perform(req, path = path) lines <- readLines(path, warn = FALSE) diff --git a/R/provider-openai-responses.R b/R/provider-openai-responses.R index d082e7ec3..e47557b02 100644 --- a/R/provider-openai-responses.R +++ b/R/provider-openai-responses.R @@ -390,7 +390,7 @@ method(batch_submit, ProviderOpenAIResponses) <- function( conversations, type = NULL ) { - path <- withr::local_tempfile() + path <- local_tempfile() # First put the requests in a file # https://platform.openai.com/docs/api-reference/batch/request-input diff --git a/R/provider-openai.R b/R/provider-openai.R index 959799298..b69e52dbd 100644 --- a/R/provider-openai.R +++ b/R/provider-openai.R @@ -433,7 +433,7 @@ method(batch_submit, ProviderOpenAI) <- function( conversations, type = NULL ) { - path <- withr::local_tempfile() + path <- local_tempfile() # First put the requests in a file # https://platform.openai.com/docs/api-reference/batch/request-input @@ -508,7 +508,7 @@ method(batch_status, ProviderOpenAI) <- function(provider, batch) { # https://docs.anthropic.com/en/api/retrieving-message-batch-results method(batch_retrieve, ProviderOpenAI) <- function(provider, batch) { - path <- withr::local_tempfile() + path <- local_tempfile() req <- base_request(provider) req <- req_url_path_append(req, "/files/", batch$output_file_id, "/content") diff --git a/R/utils.R b/R/utils.R index 940b29bbe..c7d426498 100644 --- a/R/utils.R +++ b/R/utils.R @@ -294,3 +294,26 @@ request_summary <- function(req) { str_trunc <- function(x, n) { ifelse(nchar(x) > n, paste0(substr(x, 1, n - 3), "..."), x) } + + +# Trimmed version of withr::local_tempfile +# https://github.com/r-lib/withr/blob/1497d45f30c98eff80085d3c1dd45403511be878/R/tempfile.R#L49C1-L81C2 +local_tempfile <- function( + lines = NULL, + .local_envir = parent.frame(), + pattern = "file", + tmpdir = tempdir(), + fileext = "" +) { + path <- tempfile(pattern = pattern, tmpdir = tmpdir, fileext = fileext) + if (!is.null(lines)) { + con <- file(path, open = "wb", encoding = "native.enc") + defer(close(con)) + + writeLines(enc2utf8(lines), con, useBytes = TRUE) + } + + defer(unlink(path, recursive = TRUE), envir = .local_envir) + + path +} From 688edd6eea435132db7df6a6f3dd2857c0815607 Mon Sep 17 00:00:00 2001 From: Barret Schloerke Date: Thu, 6 Nov 2025 12:47:40 -0500 Subject: [PATCH 16/33] `activate_and_cleanup_ospan()` -> `setup_otel_span()` --- R/chat-tools.R | 2 +- R/otel.R | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/R/chat-tools.R b/R/chat-tools.R index b2b4241d7..02add8fe2 100644 --- a/R/chat-tools.R +++ b/R/chat-tools.R @@ -192,7 +192,7 @@ on_load( ) # Must activate the span in a promise domain so that it propagates to # async calls made by the tool function. - activate_and_cleanup_ospan(tool_ospan, ospan_promise_domain = TRUE) + setup_otel_span(tool_ospan, ospan_promise_domain = TRUE) tryCatch( { diff --git a/R/otel.R b/R/otel.R index 60dac2a74..53c74540f 100644 --- a/R/otel.R +++ b/R/otel.R @@ -22,7 +22,7 @@ ellmer_otel_tracer <- local({ # Only activate the span if it is non-NULL. If activated, ensure it is # automatically ended when the activation scope exits. If # ospan_promise_domain is TRUE, also ensure that the active span is reactivated upon promise domain restoration. -activate_and_cleanup_ospan <- function( +setup_otel_span <- function( ospan, activation_scope = parent.frame(), ospan_promise_domain = TRUE @@ -36,10 +36,11 @@ activate_and_cleanup_ospan <- function( end_on_exit = FALSE, activation_scope = activation_scope ) - # For some reason, when using `end_on_exit = TRUE` above, an error would occur during `spn$end(status = "auto")`. When using `withr::defer()` here, it works fine. # TODO: Set status? defer(promises::end_ospan(ospan), envir = activation_scope) } + + invisible(ospan) } @@ -120,7 +121,7 @@ start_local_active_tool_ospan <- function( )) ) - activate_and_cleanup_ospan(tool_ospan, local_envir) + setup_otel_span(tool_ospan, local_envir) tool_ospan } From eeb0b8408c3be85def91a528bbfebbbfc1c0e0fd Mon Sep 17 00:00:00 2001 From: Barret Schloerke Date: Thu, 6 Nov 2025 15:41:05 -0500 Subject: [PATCH 17/33] Update fn names. Move some setup code inside method --- DESCRIPTION | 2 -- R/chat-tools.R | 27 +++++++------- R/chat.R | 75 +++++++++++++++++--------------------- R/httr2.R | 37 +++++++++---------- R/otel.R | 97 +++++++++++++++++++++++++++----------------------- R/utils.R | 5 --- 6 files changed, 117 insertions(+), 126 deletions(-) diff --git a/DESCRIPTION b/DESCRIPTION index 583ca6b5d..cd3e7ab9d 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -56,8 +56,6 @@ Suggests: withr VignetteBuilder: knitr -Remotes: - rstudio/promises Config/Needs/check: r-lib/otelsdk Config/Needs/website: tidyverse/tidytemplate, rmarkdown Config/testthat/edition: 3 diff --git a/R/chat-tools.R b/R/chat-tools.R index 02add8fe2..2f08eb4a3 100644 --- a/R/chat-tools.R +++ b/R/chat-tools.R @@ -34,7 +34,7 @@ on_load({ on_tool_request = function(request) invisible(), on_tool_result = function(result) invisible(), yield_request = FALSE, - parent_ospan = NULL + parent_otel_span = NULL ) { tool_requests <- extract_tool_requests(turn) @@ -52,7 +52,7 @@ on_load({ next } - result <- invoke_tool(request, parent_ospan = parent_ospan) + result <- invoke_tool(request, parent_otel_span = parent_otel_span) if (promises::is.promise(result@value)) { cli::cli_abort( @@ -80,7 +80,7 @@ on_load({ on_tool_request = function(request) invisible(), on_tool_result = function(result) invisible(), yield_request = FALSE, - parent_ospan = NULL + parent_otel_span = NULL ) { tool_requests <- extract_tool_requests(turn) @@ -96,7 +96,7 @@ on_load({ return(rejected) } - result <- coro::await(invoke_tool_async(request, parent_ospan)) + result <- coro::await(invoke_tool_async(request, parent_otel_span)) maybe_echo_tool(result, echo = echo) on_tool_result(result) @@ -146,7 +146,7 @@ new_tool_result <- function(request, result = NULL, error = NULL) { } # Also need to handle edge cases: https://platform.openai.com/docs/guides/function-calling/edge-cases -invoke_tool <- function(request, parent_ospan = NULL) { +invoke_tool <- function(request, parent_otel_span = NULL) { if (is.null(request@tool)) { return(new_tool_result(request, error = "Unknown tool")) } @@ -157,9 +157,9 @@ invoke_tool <- function(request, parent_ospan = NULL) { return(args) } - tool_ospan <- start_local_active_tool_ospan( + tool_span <- local_tool_otel_span( request, - parent_ospan = parent_ospan + parent_otel_span = parent_otel_span ) tryCatch( @@ -168,14 +168,14 @@ invoke_tool <- function(request, parent_ospan = NULL) { new_tool_result(request, result) }, error = function(e) { - record_tool_ospan_error(tool_ospan, e) + record_tool_otel_span_error(tool_span, e) new_tool_result(request, error = e) } ) } on_load( - invoke_tool_async <- coro::async(function(request, parent_ospan = NULL) { + invoke_tool_async <- coro::async(function(request, parent_otel_span = NULL) { if (is.null(request@tool)) { return(new_tool_result(request, error = "Unknown tool")) } @@ -186,13 +186,10 @@ on_load( return(args) } - tool_ospan <- start_local_active_tool_ospan( + tool_span <- local_tool_otel_span( request, - parent_ospan = parent_ospan + parent_otel_span = parent_otel_span ) - # Must activate the span in a promise domain so that it propagates to - # async calls made by the tool function. - setup_otel_span(tool_ospan, ospan_promise_domain = TRUE) tryCatch( { @@ -200,7 +197,7 @@ on_load( new_tool_result(request, result) }, error = function(e) { - record_tool_ospan_error(tool_ospan, e) + record_tool_otel_span_error(tool_span, e) new_tool_result(request, error = e) } ) diff --git a/R/chat.R b/R/chat.R index e5542e763..e9bf53865 100644 --- a/R/chat.R +++ b/R/chat.R @@ -442,7 +442,7 @@ Chat <- R6::R6Class( tool_errors <- list() defer(warn_tool_errors(tool_errors)) - agent_ospan <- local_agent_ospan(private$provider) + agent_span <- local_agent_otel_span(private$provider) while (!is.null(user_turn)) { assistant_chunks <- private$submit_turns( @@ -450,7 +450,7 @@ Chat <- R6::R6Class( stream = stream, echo = echo, yield_as_content = yield_as_content, - parent_ospan = agent_ospan + parent_otel_span = agent_span ) for (chunk in assistant_chunks) { yield(chunk) @@ -466,7 +466,7 @@ Chat <- R6::R6Class( on_tool_request = private$callback_on_tool_request$invoke, on_tool_result = private$callback_on_tool_result$invoke, yield_request = yield_as_content, - parent_ospan = agent_ospan + parent_otel_span = agent_span ) tool_results <- list() @@ -505,7 +505,7 @@ Chat <- R6::R6Class( tool_errors <- list() defer(warn_tool_errors(tool_errors)) - agent_ospan <- local_agent_ospan(private$provider) + agent_span <- local_agent_otel_span(private$provider) while (!is.null(user_turn)) { assistant_chunks <- private$submit_turns_async( @@ -513,7 +513,7 @@ Chat <- R6::R6Class( stream = stream, echo = echo, yield_as_content = yield_as_content, - parent_ospan = agent_ospan + parent_otel_span = agent_span ) for (chunk in await_each(assistant_chunks)) { yield(chunk) @@ -529,7 +529,7 @@ Chat <- R6::R6Class( on_tool_request = private$callback_on_tool_request$invoke_async, on_tool_result = private$callback_on_tool_result$invoke_async, yield_request = yield_as_content, - parent_ospan = agent_ospan + parent_otel_span = agent_span ) if (tool_mode == "sequential") { tool_results <- list() @@ -542,9 +542,7 @@ Chat <- R6::R6Class( } } } else { - # otel::with_active_span(agent_ospan, { tool_results <- coro::collect(tool_calls) - # }) if (yield_as_content) { # Filter out and yield tool requests before awaiting tool results is_request <- map_lgl(tool_results, is_tool_request) @@ -582,29 +580,25 @@ Chat <- R6::R6Class( echo, type = NULL, yield_as_content = FALSE, - parent_ospan = NULL + parent_otel_span = NULL ) { if (echo == "all") { cat_line(format(user_turn), prefix = "> ") } - chat_ospan <- local_chat_ospan( + chat_span <- local_chat_otel_span( private$provider, - parent_ospan = parent_ospan + parent_otel_span = parent_otel_span ) - promises::with_ospan_promise_domain({ - otel::with_active_span(chat_ospan, { - response <- chat_perform( - provider = private$provider, - mode = if (stream) "stream" else "value", - turns = c(private$.turns, list(user_turn)), - tools = if (is.null(type)) private$tools, - type = type, - parent_ospan = chat_ospan - ) - }) - }) + response <- chat_perform( + provider = private$provider, + mode = if (stream) "stream" else "value", + turns = c(private$.turns, list(user_turn)), + tools = if (is.null(type)) private$tools, + type = type, + parent_otel_span = chat_span + ) emit <- emitter(echo) any_text <- FALSE @@ -625,7 +619,7 @@ Chat <- R6::R6Class( result <- stream_merge_chunks(private$provider, result, chunk) } - record_chat_ospan_status(chat_ospan, result) + record_chat_otel_span_status(chat_span, result) turn <- value_turn( private$provider, result, @@ -633,7 +627,7 @@ Chat <- R6::R6Class( ) turn <- match_tools(turn, private$tools) } else { - record_chat_ospan_status(chat_ospan, response) + record_chat_otel_span_status(chat_span, response) turn <- value_turn( private$provider, resp_body_json(response), @@ -686,25 +680,22 @@ Chat <- R6::R6Class( echo, type = NULL, yield_as_content = FALSE, - parent_ospan = NULL + parent_otel_span = NULL ) { - chat_ospan <- local_chat_ospan( + chat_span <- local_chat_otel_span( private$provider, - parent_ospan = parent_ospan + parent_otel_span = parent_otel_span + ) + + response <- chat_perform( + provider = private$provider, + mode = if (stream) "async-stream" else "async-value", + turns = c(private$.turns, list(user_turn)), + tools = if (is.null(type)) private$tools, + type = type, + parent_otel_span = chat_span ) - promises::with_ospan_promise_domain({ - otel::with_active_span(chat_ospan, { - response <- chat_perform( - provider = private$provider, - mode = if (stream) "async-stream" else "async-value", - turns = c(private$.turns, list(user_turn)), - tools = if (is.null(type)) private$tools, - type = type, - parent_ospan = chat_ospan - ) - }) - }) emit <- emitter(echo) any_text <- FALSE @@ -724,7 +715,7 @@ Chat <- R6::R6Class( result <- stream_merge_chunks(private$provider, result, chunk) } - record_chat_ospan_status(chat_ospan, result) + record_chat_otel_span_status(chat_span, result) turn <- value_turn( private$provider, result, @@ -733,7 +724,7 @@ Chat <- R6::R6Class( } else { result <- await(response) - record_chat_ospan_status(chat_ospan, result) + record_chat_otel_span_status(chat_span, result) turn <- value_turn( private$provider, resp_body_json(result), diff --git a/R/httr2.R b/R/httr2.R index f242c9ac8..098137254 100644 --- a/R/httr2.R +++ b/R/httr2.R @@ -7,12 +7,14 @@ chat_perform <- function( turns, tools = NULL, type = NULL, - parent_ospan = NULL + parent_otel_span = NULL ) { mode <- arg_match(mode) stream <- mode %in% c("stream", "async-stream") tools <- tools %||% list() + setup_active_promise_otel_span(parent_otel_span) + req <- chat_request( provider = provider, turns = turns, @@ -23,21 +25,17 @@ chat_perform <- function( switch( mode, - "value" = chat_perform_value(provider, req), - "stream" = chat_perform_stream(provider, req, parent_ospan = parent_ospan), - "async-value" = chat_perform_async_value(provider, req), - "async-stream" = chat_perform_async_stream( + "value" = req_perform(req), + "stream" = chat_perform_stream( provider, req, - parent_ospan = parent_ospan + parent_otel_span = parent_otel_span ), - "value" = req_perform(req), - "stream" = chat_perform_stream(provider, req, parent_ospan = parent_ospan), "async-value" = req_perform_promise(req), "async-stream" = chat_perform_async_stream( provider, req, - parent_ospan = parent_ospan + parent_otel_span = parent_otel_span ) ) } @@ -46,15 +44,17 @@ on_load( chat_perform_stream <- coro::generator(function( provider, req, - parent_ospan = NULL + parent_otel_span = NULL ) { - if (!is.null(parent_ospan)) { - otel::local_active_span(parent_ospan) - } + setup_active_promise_otel_span(parent_otel_span) + resp <- req_perform_connection(req) on.exit(close(resp)) repeat { + # Ensure the span is active for each await/yield point + setup_active_promise_otel_span(parent_otel_span) + event <- chat_resp_stream(provider, resp) data <- stream_parse(provider, event) if (is.null(data)) { @@ -70,16 +70,17 @@ on_load( chat_perform_async_stream <- coro::async_generator(function( provider, req, - parent_ospan = NULL + parent_otel_span = NULL ) { - if (!is.null(parent_ospan)) { - promises::local_ospan_promise_domain() - otel::local_active_span(parent_ospan) - } + setup_active_promise_otel_span(parent_otel_span) + resp <- req_perform_connection(req, blocking = FALSE) on.exit(close(resp)) repeat { + # Ensure the span is active for each await/yield point + setup_active_promise_otel_span(parent_otel_span) + event <- chat_resp_stream(provider, resp) if (is.null(event) && !resp_stream_is_complete(resp)) { fds <- resp$body$get_fdset() diff --git a/R/otel.R b/R/otel.R index 53c74540f..d60facdd0 100644 --- a/R/otel.R +++ b/R/otel.R @@ -18,44 +18,48 @@ ellmer_otel_tracer <- local({ } }) +otel_is_enabled <- function(tracer = ellmer_otel_tracer()) { + .subset2(tracer, "is_enabled")() +} + -# Only activate the span if it is non-NULL. If activated, ensure it is -# automatically ended when the activation scope exits. If -# ospan_promise_domain is TRUE, also ensure that the active span is reactivated upon promise domain restoration. -setup_otel_span <- function( - ospan, - activation_scope = parent.frame(), - ospan_promise_domain = TRUE +# Only activate the span if it is non-NULL. If +# otel_promise_domain is TRUE, also ensure that the active span is reactivated upon promise domain restoration. +#' Activate and use handoff promise domain for Open Telemetry span +#' +#' @param otel_span An Open Telemetry span object. +#' @param activation_scope The scope in which to activate the span. +#' @noRd +setup_active_promise_otel_span <- function( + otel_span, + activation_scope = parent.frame() ) { - if (!is.null(ospan)) { - if (ospan_promise_domain) { - promises::local_ospan_promise_domain(activation_scope) - } - otel::local_active_span( - ospan, - end_on_exit = FALSE, - activation_scope = activation_scope - ) - # TODO: Set status? - defer(promises::end_ospan(ospan), envir = activation_scope) + if (is.null(otel_span) || !otel_is_enabled()) { + return() } - invisible(ospan) + promises::local_otel_promise_domain(activation_scope) + otel::local_active_span( + otel_span, + activation_scope = activation_scope + ) + + invisible() } -local_chat_ospan <- function( +local_chat_otel_span <- function( provider, - parent_ospan = NULL, + parent_otel_span = NULL, local_envir = parent.frame(), tracer = ellmer_otel_tracer() ) { - chat_ospan <- - promises::create_ospan( + chat_span <- + otel::start_span( sprintf("chat %s", provider@model), tracer = tracer, options = list( - parent = parent_ospan, + parent = parent_otel_span, kind = "CLIENT" ), attributes = list( @@ -65,12 +69,12 @@ local_chat_ospan <- function( ) ) - defer(promises::end_ospan(chat_ospan), envir = local_envir) + defer(otel::end_span(chat_span), envir = local_envir) - chat_ospan + chat_span } -record_chat_ospan_status <- function(span, result) { +record_chat_otel_span_status <- function(span, result) { if (is.null(span) || !span$is_recording()) { return(invisible(span)) } @@ -99,18 +103,18 @@ record_chat_ospan_status <- function(span, result) { # Must be activated for the calling scope. # # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-spans/#execute-tool-span -start_local_active_tool_ospan <- function( +local_tool_otel_span <- function( request, - parent_ospan = NULL, + parent_otel_span = NULL, local_envir = parent.frame(), tracer = ellmer_otel_tracer() ) { - tool_ospan <- - promises::create_ospan( + tool_span <- + otel::start_span( sprintf("execute_tool %s", request@tool@name), tracer = tracer, options = list( - parent = parent_ospan, + parent = parent_otel_span, kind = "INTERNAL" ), attributes = compact(list( @@ -121,18 +125,20 @@ start_local_active_tool_ospan <- function( )) ) - setup_otel_span(tool_ospan, local_envir) + setup_active_promise_otel_span(tool_span, local_envir) - tool_ospan + defer(otel::end_span(tool_span), envir = local_envir) + + tool_span } -record_tool_ospan_error <- function(ospan, error) { - if (is.null(ospan) || !ospan$is_recording()) { +record_tool_otel_span_error <- function(otel_span, error) { + if (is.null(otel_span) || !otel_span$is_recording()) { return() } - ospan$record_exception(error) - ospan$set_status("error") - ospan$set_attribute("error.type", class(error)[1]) + otel_span$record_exception(error) + otel_span$set_status("error") + otel_span$set_attribute("error.type", class(error)[1]) } @@ -140,13 +146,14 @@ record_tool_ospan_error <- function(ospan, error) { # Generative AI "agents". # # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-spans/#inference -local_agent_ospan <- function( +# local_otel_span_agent +local_agent_otel_span <- function( provider, tracer = ellmer_otel_tracer(), local_envir = parent.frame() ) { - agent_ospan <- - promises::create_ospan( + agent_span <- + otel::start_span( "invoke_agent", tracer = tracer, options = list(kind = "CLIENT"), @@ -156,7 +163,9 @@ local_agent_ospan <- function( ) ) - defer(promises::end_ospan(agent_ospan), envir = local_envir) + setup_active_promise_otel_span(agent_span, local_envir) + + defer(otel::end_span(agent_span), envir = local_envir) - agent_ospan + agent_span } diff --git a/R/utils.R b/R/utils.R index c7d426498..d04b9ef96 100644 --- a/R/utils.R +++ b/R/utils.R @@ -30,11 +30,6 @@ key_exists <- function(name) { !identical(Sys.getenv(name), "") } -defer <- function(expr, env = caller_env(), after = FALSE) { - thunk <- as.call(list(function() expr)) - do.call(on.exit, list(thunk, TRUE, after), envir = env) -} - set_default <- function(value, default, arg = caller_arg(value)) { if (is.null(value)) { if (!is_testing() || is_snapshot()) { From e8cb35ce25e88aff088c68a0889a1dcd4876be84 Mon Sep 17 00:00:00 2001 From: Barret Schloerke Date: Thu, 6 Nov 2025 15:45:40 -0500 Subject: [PATCH 18/33] Bump dev version to 0.3.2.9001 --- DESCRIPTION | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/DESCRIPTION b/DESCRIPTION index cd3e7ab9d..b641549cb 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,6 +1,6 @@ Package: ellmer Title: Chat with Large Language Models -Version: 0.3.2.9000 +Version: 0.3.2.9001 Authors@R: c( person("Hadley", "Wickham", , "hadley@posit.co", role = c("aut", "cre"), comment = c(ORCID = "0000-0003-4757-117X")), From 0e19f88862c39e4fef53334bee4fe9f35059683d Mon Sep 17 00:00:00 2001 From: Barret Schloerke Date: Fri, 7 Nov 2025 11:19:38 -0500 Subject: [PATCH 19/33] Apply suggestion from @shikokuchuo Co-authored-by: Charlie Gao <53399081+shikokuchuo@users.noreply.github.com> --- DESCRIPTION | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/DESCRIPTION b/DESCRIPTION index b641549cb..fc25a78ce 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -31,7 +31,7 @@ Imports: later (>= 1.4.0), lifecycle, otel (>= 0.2.0), - promises (>= 1.3.3.9004), + promises (>= 1.5.0), R6, rlang (>= 1.1.0), S7 (>= 0.2.0), From 98e3ff67af7dcaa089a68712a5b9d4920a919102 Mon Sep 17 00:00:00 2001 From: Barret Schloerke Date: Fri, 7 Nov 2025 11:21:07 -0500 Subject: [PATCH 20/33] Remove `coro::` namespace for `await_each` --- R/chat.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/R/chat.R b/R/chat.R index e9bf53865..daf86c3b8 100644 --- a/R/chat.R +++ b/R/chat.R @@ -701,7 +701,7 @@ Chat <- R6::R6Class( if (stream) { result <- NULL - for (chunk in coro::await_each(response)) { + for (chunk in await_each(response)) { text <- stream_text(private$provider, chunk) if (!is.null(text)) { emit(text) From e92322e7aa9be8a526731d8a43fe63c1e541cdd8 Mon Sep 17 00:00:00 2001 From: Barret Schloerke Date: Fri, 7 Nov 2025 11:33:24 -0500 Subject: [PATCH 21/33] Use local_tempfile() helper method --- R/provider-openai.R | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/R/provider-openai.R b/R/provider-openai.R index 60ddcb5a1..2ff4f113a 100644 --- a/R/provider-openai.R +++ b/R/provider-openai.R @@ -452,7 +452,7 @@ method(batch_submit, ProviderOpenAI) <- function( conversations, type = NULL ) { - path <- withr::local_tempfile() + path <- local_tempfile() # First put the requests in a file # https://platform.openai.com/docs/api-reference/batch/request-input @@ -528,7 +528,7 @@ method(batch_status, ProviderOpenAI) <- function(provider, batch) { # https://platform.openai.com/docs/api-reference/batch/retrieve method(batch_retrieve, ProviderOpenAI) <- function(provider, batch) { # output file - path_output <- withr::local_tempfile() + path_output <- local_tempfile() req <- base_request(provider) req <- req_url_path_append(req, "/files/", batch$output_file_id, "/content") req <- req_progress(req, "down") @@ -537,7 +537,7 @@ method(batch_retrieve, ProviderOpenAI) <- function(provider, batch) { # error file if (length(batch$error_file_id) == 1) { - path_error <- withr::local_tempfile() + path_error <- local_tempfile() req <- base_request(provider) req <- req_url_path_append(req, "/files/", batch$error_file_id, "/content") req <- req_progress(req, "down") From 0c91f8ff4683cadba35d365fa33fd9d2a241a067 Mon Sep 17 00:00:00 2001 From: Barret Schloerke Date: Fri, 7 Nov 2025 11:37:33 -0500 Subject: [PATCH 22/33] Remove otelsdk remote and make a Suggests --- DESCRIPTION | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/DESCRIPTION b/DESCRIPTION index 18af0ee4f..23933927c 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -46,6 +46,7 @@ Suggests: knitr, magick, openssl, + otelsdk, paws.common, png, rmarkdown, @@ -56,7 +57,6 @@ Suggests: withr VignetteBuilder: knitr -Config/Needs/check: r-lib/otelsdk Config/Needs/website: tidyverse/tidytemplate, rmarkdown Config/testthat/edition: 3 Config/testthat/parallel: true From c377dd66bf72faa7765fa547a7652d4ee228db5b Mon Sep 17 00:00:00 2001 From: Barret Schloerke Date: Fri, 7 Nov 2025 11:43:48 -0500 Subject: [PATCH 23/33] Refactor otel span argument naming for clarity params/args: `parent_otel_span` -> `otel_span` `local_*_otel_span(parent_otel_span=)` -> `local_*_otel_span(parent=)` --- R/chat-tools.R | 16 ++++++++-------- R/chat.R | 20 ++++++++++---------- R/httr2.R | 20 ++++++++++---------- R/otel.R | 8 ++++---- 4 files changed, 32 insertions(+), 32 deletions(-) diff --git a/R/chat-tools.R b/R/chat-tools.R index 2f08eb4a3..ee94f38ff 100644 --- a/R/chat-tools.R +++ b/R/chat-tools.R @@ -34,7 +34,7 @@ on_load({ on_tool_request = function(request) invisible(), on_tool_result = function(result) invisible(), yield_request = FALSE, - parent_otel_span = NULL + otel_span = NULL ) { tool_requests <- extract_tool_requests(turn) @@ -52,7 +52,7 @@ on_load({ next } - result <- invoke_tool(request, parent_otel_span = parent_otel_span) + result <- invoke_tool(request, parent = otel_span) if (promises::is.promise(result@value)) { cli::cli_abort( @@ -80,7 +80,7 @@ on_load({ on_tool_request = function(request) invisible(), on_tool_result = function(result) invisible(), yield_request = FALSE, - parent_otel_span = NULL + otel_span = NULL ) { tool_requests <- extract_tool_requests(turn) @@ -96,7 +96,7 @@ on_load({ return(rejected) } - result <- coro::await(invoke_tool_async(request, parent_otel_span)) + result <- coro::await(invoke_tool_async(request, otel_span = otel_span)) maybe_echo_tool(result, echo = echo) on_tool_result(result) @@ -146,7 +146,7 @@ new_tool_result <- function(request, result = NULL, error = NULL) { } # Also need to handle edge cases: https://platform.openai.com/docs/guides/function-calling/edge-cases -invoke_tool <- function(request, parent_otel_span = NULL) { +invoke_tool <- function(request, otel_span = NULL) { if (is.null(request@tool)) { return(new_tool_result(request, error = "Unknown tool")) } @@ -159,7 +159,7 @@ invoke_tool <- function(request, parent_otel_span = NULL) { tool_span <- local_tool_otel_span( request, - parent_otel_span = parent_otel_span + parent = otel_span ) tryCatch( @@ -175,7 +175,7 @@ invoke_tool <- function(request, parent_otel_span = NULL) { } on_load( - invoke_tool_async <- coro::async(function(request, parent_otel_span = NULL) { + invoke_tool_async <- coro::async(function(request, otel_span = NULL) { if (is.null(request@tool)) { return(new_tool_result(request, error = "Unknown tool")) } @@ -188,7 +188,7 @@ on_load( tool_span <- local_tool_otel_span( request, - parent_otel_span = parent_otel_span + parent = otel_span ) tryCatch( diff --git a/R/chat.R b/R/chat.R index daf86c3b8..a6f68095a 100644 --- a/R/chat.R +++ b/R/chat.R @@ -450,7 +450,7 @@ Chat <- R6::R6Class( stream = stream, echo = echo, yield_as_content = yield_as_content, - parent_otel_span = agent_span + otel_span = agent_span ) for (chunk in assistant_chunks) { yield(chunk) @@ -466,7 +466,7 @@ Chat <- R6::R6Class( on_tool_request = private$callback_on_tool_request$invoke, on_tool_result = private$callback_on_tool_result$invoke, yield_request = yield_as_content, - parent_otel_span = agent_span + otel_span = agent_span ) tool_results <- list() @@ -513,7 +513,7 @@ Chat <- R6::R6Class( stream = stream, echo = echo, yield_as_content = yield_as_content, - parent_otel_span = agent_span + otel_span = agent_span ) for (chunk in await_each(assistant_chunks)) { yield(chunk) @@ -529,7 +529,7 @@ Chat <- R6::R6Class( on_tool_request = private$callback_on_tool_request$invoke_async, on_tool_result = private$callback_on_tool_result$invoke_async, yield_request = yield_as_content, - parent_otel_span = agent_span + otel_span = agent_span ) if (tool_mode == "sequential") { tool_results <- list() @@ -580,7 +580,7 @@ Chat <- R6::R6Class( echo, type = NULL, yield_as_content = FALSE, - parent_otel_span = NULL + otel_span = NULL ) { if (echo == "all") { cat_line(format(user_turn), prefix = "> ") @@ -588,7 +588,7 @@ Chat <- R6::R6Class( chat_span <- local_chat_otel_span( private$provider, - parent_otel_span = parent_otel_span + parent = otel_span ) response <- chat_perform( @@ -597,7 +597,7 @@ Chat <- R6::R6Class( turns = c(private$.turns, list(user_turn)), tools = if (is.null(type)) private$tools, type = type, - parent_otel_span = chat_span + otel_span = chat_span ) emit <- emitter(echo) @@ -680,11 +680,11 @@ Chat <- R6::R6Class( echo, type = NULL, yield_as_content = FALSE, - parent_otel_span = NULL + otel_span = NULL ) { chat_span <- local_chat_otel_span( private$provider, - parent_otel_span = parent_otel_span + parent = otel_span ) response <- chat_perform( @@ -693,7 +693,7 @@ Chat <- R6::R6Class( turns = c(private$.turns, list(user_turn)), tools = if (is.null(type)) private$tools, type = type, - parent_otel_span = chat_span + otel_span = chat_span ) emit <- emitter(echo) diff --git a/R/httr2.R b/R/httr2.R index 098137254..4e9bde720 100644 --- a/R/httr2.R +++ b/R/httr2.R @@ -7,13 +7,13 @@ chat_perform <- function( turns, tools = NULL, type = NULL, - parent_otel_span = NULL + otel_span = NULL ) { mode <- arg_match(mode) stream <- mode %in% c("stream", "async-stream") tools <- tools %||% list() - setup_active_promise_otel_span(parent_otel_span) + setup_active_promise_otel_span(otel_span) req <- chat_request( provider = provider, @@ -29,13 +29,13 @@ chat_perform <- function( "stream" = chat_perform_stream( provider, req, - parent_otel_span = parent_otel_span + otel_span = otel_span ), "async-value" = req_perform_promise(req), "async-stream" = chat_perform_async_stream( provider, req, - parent_otel_span = parent_otel_span + otel_span = otel_span ) ) } @@ -44,16 +44,16 @@ on_load( chat_perform_stream <- coro::generator(function( provider, req, - parent_otel_span = NULL + otel_span = NULL ) { - setup_active_promise_otel_span(parent_otel_span) + setup_active_promise_otel_span(otel_span) resp <- req_perform_connection(req) on.exit(close(resp)) repeat { # Ensure the span is active for each await/yield point - setup_active_promise_otel_span(parent_otel_span) + setup_active_promise_otel_span(otel_span) event <- chat_resp_stream(provider, resp) data <- stream_parse(provider, event) @@ -70,16 +70,16 @@ on_load( chat_perform_async_stream <- coro::async_generator(function( provider, req, - parent_otel_span = NULL + otel_span = NULL ) { - setup_active_promise_otel_span(parent_otel_span) + setup_active_promise_otel_span(otel_span) resp <- req_perform_connection(req, blocking = FALSE) on.exit(close(resp)) repeat { # Ensure the span is active for each await/yield point - setup_active_promise_otel_span(parent_otel_span) + setup_active_promise_otel_span(otel_span) event <- chat_resp_stream(provider, resp) if (is.null(event) && !resp_stream_is_complete(resp)) { diff --git a/R/otel.R b/R/otel.R index d60facdd0..58e6bf24e 100644 --- a/R/otel.R +++ b/R/otel.R @@ -50,7 +50,7 @@ setup_active_promise_otel_span <- function( local_chat_otel_span <- function( provider, - parent_otel_span = NULL, + parent = NULL, local_envir = parent.frame(), tracer = ellmer_otel_tracer() ) { @@ -59,7 +59,7 @@ local_chat_otel_span <- function( sprintf("chat %s", provider@model), tracer = tracer, options = list( - parent = parent_otel_span, + parent = parent, kind = "CLIENT" ), attributes = list( @@ -105,7 +105,7 @@ record_chat_otel_span_status <- function(span, result) { # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-spans/#execute-tool-span local_tool_otel_span <- function( request, - parent_otel_span = NULL, + parent = NULL, local_envir = parent.frame(), tracer = ellmer_otel_tracer() ) { @@ -114,7 +114,7 @@ local_tool_otel_span <- function( sprintf("execute_tool %s", request@tool@name), tracer = tracer, options = list( - parent = parent_otel_span, + parent = parent, kind = "INTERNAL" ), attributes = compact(list( From a7852bc79897f3fdc8f1c17e43b7514f37659d7a Mon Sep 17 00:00:00 2001 From: Barret Schloerke Date: Fri, 7 Nov 2025 11:49:30 -0500 Subject: [PATCH 24/33] Update chat-tools.R --- R/chat-tools.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/R/chat-tools.R b/R/chat-tools.R index ee94f38ff..037d01697 100644 --- a/R/chat-tools.R +++ b/R/chat-tools.R @@ -52,7 +52,7 @@ on_load({ next } - result <- invoke_tool(request, parent = otel_span) + result <- invoke_tool(request, otel_span = otel_span) if (promises::is.promise(result@value)) { cli::cli_abort( From 8080dc6617a1e8f183f8b70931f836547deb2247 Mon Sep 17 00:00:00 2001 From: shikokuchuo <53399081+shikokuchuo@users.noreply.github.com> Date: Sat, 8 Nov 2025 15:22:18 +0000 Subject: [PATCH 25/33] Update otel tracer caching implementation after #848; add early returns to span creation functions --- R/otel.R | 250 +++++++++++++++++++------------------ R/zzz.R | 1 + tests/testthat/test-otel.R | 8 +- 3 files changed, 132 insertions(+), 127 deletions(-) diff --git a/R/otel.R b/R/otel.R index 58e6bf24e..f1cec9ce9 100644 --- a/R/otel.R +++ b/R/otel.R @@ -1,82 +1,130 @@ otel_tracer_name <- "co.posit.r-package.ellmer" -# Inspired by httr2:::get_tracer() / shiny:::get_tracer() -# Using local scope avoids an environment object lookup on each call. -ellmer_otel_tracer <- local({ - tracer <- NULL - function() { - if (!is.null(tracer)) { - return(tracer) - } - if (is_testing()) { - # Don't cache the tracer in unit tests. It interferes with tracer provider - # injection in otelsdk::with_otel_record(). - return(otel::get_tracer()) - } - tracer <<- otel::get_tracer() - tracer +otel_cache_tracer <- NULL +local_chat_otel_span <- NULL +local_tool_otel_span <- NULL +local_agent_otel_span <- NULL + +local({ + otel_is_tracing <- FALSE + otel_tracer <- NULL + + otel_cache_tracer <<- function() { + otel_tracer <<- otel::get_tracer(otel_tracer_name) + otel_is_tracing <<- tracer_enabled(otel_tracer) } -}) -otel_is_enabled <- function(tracer = ellmer_otel_tracer()) { - .subset2(tracer, "is_enabled")() -} + local_chat_otel_span <<- function( + provider, + parent = NULL, + local_envir = parent.frame() + ) { + if (!otel_is_tracing) { + return() + } + chat_span <- + otel::start_span( + sprintf("chat %s", provider@model), + options = list( + parent = parent, + kind = "CLIENT" + ), + attributes = list( + "gen_ai.operation.name" = "chat", + "gen_ai.provider.name" = tolower(provider@name), + "gen_ai.request.model" = provider@model + ), + tracer = otel_tracer + ) + defer(otel::end_span(chat_span), envir = local_envir) -# Only activate the span if it is non-NULL. If -# otel_promise_domain is TRUE, also ensure that the active span is reactivated upon promise domain restoration. -#' Activate and use handoff promise domain for Open Telemetry span -#' -#' @param otel_span An Open Telemetry span object. -#' @param activation_scope The scope in which to activate the span. -#' @noRd -setup_active_promise_otel_span <- function( - otel_span, - activation_scope = parent.frame() -) { - if (is.null(otel_span) || !otel_is_enabled()) { - return() + chat_span } - promises::local_otel_promise_domain(activation_scope) - otel::local_active_span( - otel_span, - activation_scope = activation_scope - ) + # Starts an Open Telemetry span that abides by the semantic conventions for + # Generative AI tool calls. + # + # Must be activated for the calling scope. + # + # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-spans/#execute-tool-span + local_tool_otel_span <<- function( + request, + parent = NULL, + local_envir = parent.frame() + ) { + if (!otel_is_tracing) { + return() + } + tool_span <- + otel::start_span( + sprintf("execute_tool %s", request@tool@name), + options = list( + parent = parent, + kind = "INTERNAL" + ), + attributes = compact(list( + "gen_ai.operation.name" = "execute_tool", + "gen_ai.tool.name" = request@tool@name, + "gen_ai.tool.description" = request@tool@description, + "gen_ai.tool.call.id" = request@id + )), + tracer = otel_tracer + ) - invisible() -} + setup_active_promise_otel_span(tool_span, local_envir) + defer(otel::end_span(tool_span), envir = local_envir) -local_chat_otel_span <- function( - provider, - parent = NULL, - local_envir = parent.frame(), - tracer = ellmer_otel_tracer() -) { - chat_span <- - otel::start_span( - sprintf("chat %s", provider@model), - tracer = tracer, - options = list( - parent = parent, - kind = "CLIENT" - ), - attributes = list( - "gen_ai.operation.name" = "chat", - "gen_ai.provider.name" = tolower(provider@name), - "gen_ai.request.model" = provider@model + tool_span + } + + # Starts an Open Telemetry span that abides by the semantic conventions for + # Generative AI "agents". + # + # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-spans/#inference + # local_otel_span_agent + local_agent_otel_span <<- function( + provider, + local_envir = parent.frame() + ) { + if (!otel_is_tracing) { + return() + } + agent_span <- + otel::start_span( + "invoke_agent", + options = list(kind = "CLIENT"), + attributes = list( + "gen_ai.operation.name" = "chat", + "gen_ai.provider.name" = tolower(provider@name) + ), + tracer = otel_tracer ) - ) - defer(otel::end_span(chat_span), envir = local_envir) + setup_active_promise_otel_span(agent_span, local_envir) + + defer(otel::end_span(agent_span), envir = local_envir) - chat_span + agent_span + } +}) + +tracer_enabled <- function(tracer) { + .subset2(tracer, "is_enabled")() +} + +with_otel_record <- function(expr) { + on.exit(otel_cache_tracer()) + otelsdk::with_otel_record({ + otel_cache_tracer() + expr + }) } record_chat_otel_span_status <- function(span, result) { if (is.null(span) || !span$is_recording()) { - return(invisible(span)) + return() } if (!is.null(result$model)) { span$set_attribute("gen_ai.response.model", result$model) @@ -96,44 +144,8 @@ record_chat_otel_span_status <- function(span, result) { span$set_status("ok") } - -# Starts an Open Telemetry span that abides by the semantic conventions for -# Generative AI tool calls. -# -# Must be activated for the calling scope. -# -# See: https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-spans/#execute-tool-span -local_tool_otel_span <- function( - request, - parent = NULL, - local_envir = parent.frame(), - tracer = ellmer_otel_tracer() -) { - tool_span <- - otel::start_span( - sprintf("execute_tool %s", request@tool@name), - tracer = tracer, - options = list( - parent = parent, - kind = "INTERNAL" - ), - attributes = compact(list( - "gen_ai.operation.name" = "execute_tool", - "gen_ai.tool.name" = request@tool@name, - "gen_ai.tool.description" = request@tool@description, - "gen_ai.tool.call.id" = request@id - )) - ) - - setup_active_promise_otel_span(tool_span, local_envir) - - defer(otel::end_span(tool_span), envir = local_envir) - - tool_span -} - -record_tool_otel_span_error <- function(otel_span, error) { - if (is.null(otel_span) || !otel_span$is_recording()) { +record_tool_otel_span_error <- function(span, error) { + if (is.null(span) || !span$is_recording()) { return() } otel_span$record_exception(error) @@ -141,31 +153,23 @@ record_tool_otel_span_error <- function(otel_span, error) { otel_span$set_attribute("error.type", class(error)[1]) } - -# Starts an Open Telemetry span that abides by the semantic conventions for -# Generative AI "agents". -# -# See: https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-spans/#inference -# local_otel_span_agent -local_agent_otel_span <- function( - provider, - tracer = ellmer_otel_tracer(), - local_envir = parent.frame() +# Only activate the span if it is non-NULL. If +# otel_promise_domain is TRUE, also ensure that the active span is reactivated upon promise domain restoration. +#' Activate and use handoff promise domain for Open Telemetry span +#' +#' @param otel_span An Open Telemetry span object. +#' @param activation_scope The scope in which to activate the span. +#' @noRd +setup_active_promise_otel_span <- function( + span, + activation_scope = parent.frame() ) { - agent_span <- - otel::start_span( - "invoke_agent", - tracer = tracer, - options = list(kind = "CLIENT"), - attributes = list( - "gen_ai.operation.name" = "chat", - "gen_ai.provider.name" = tolower(provider@name) - ) - ) - - setup_active_promise_otel_span(agent_span, local_envir) + if (is.null(span) || !span$is_recording()) { + return() + } - defer(otel::end_span(agent_span), envir = local_envir) + promises::local_otel_promise_domain(activation_scope) + otel::local_active_span(span, activation_scope = activation_scope) - agent_span + invisible() } diff --git a/R/zzz.R b/R/zzz.R index ef0a14dae..6c3f385f4 100644 --- a/R/zzz.R +++ b/R/zzz.R @@ -1,6 +1,7 @@ .onLoad <- function(libname, pkgname) { run_on_load() S7::methods_register() + otel_cache_tracer() } # Work around S7 bug diff --git a/tests/testthat/test-otel.R b/tests/testthat/test-otel.R index 5d943bab3..bb4e088ae 100644 --- a/tests/testthat/test-otel.R +++ b/tests/testthat/test-otel.R @@ -2,7 +2,7 @@ test_that("tracing works as expected for synchronous chats", { skip_if_not_installed("otelsdk") # Capture spans from a typical synchronous chat with tool calls. - spans <- otelsdk::with_otel_record({ + spans <- with_otel_record({ test_tools_simple(chat_openai_test) })[["traces"]] @@ -57,7 +57,7 @@ test_that("tracing works as expected for synchronous streams", { skip_if_not_installed("otelsdk") # Capture spans when a stream is suspended. - spans <- otelsdk::with_otel_record({ + spans <- with_otel_record({ chat <- chat_openai_test(system_prompt = "Be terse.") stream <- chat$stream("What's your favourite Apache Spark feature?") local(otel::start_local_active_span("simultaneous")) @@ -105,7 +105,7 @@ test_that("tracing works as expected for asynchronous chats", { # Capture spans for an async chat with async tool calls interleaved with # other synchronous and asynchronous spans. - spans <- otelsdk::with_otel_record({ + spans <- with_otel_record({ p1 <- chat$chat_async("What's the current date in Y-M-D format?") |> promises::then(function(result) { chat$chat_async("What date will it be 47 days from now?") @@ -180,7 +180,7 @@ test_that("tracing works as expected for asynchronous streams", { # Capture spans when an async stream is used in concert with other # synchronous and asynchronous spans. - spans <- otelsdk::with_otel_record({ + spans <- with_otel_record({ chat <- chat_openai_test(system_prompt = "Be terse.") stream <- chat$stream_async("What's your favourite Apache Spark feature?") p <- promises::promise(function(resolve, reject) { From 37e9ca2b15e6c85b97880e5abcbc56d23010dd04 Mon Sep 17 00:00:00 2001 From: shikokuchuo <53399081+shikokuchuo@users.noreply.github.com> Date: Sat, 8 Nov 2025 17:05:20 +0000 Subject: [PATCH 26/33] Corrections for 8080dc6 --- R/otel.R | 6 +++--- R/tokens.R | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/R/otel.R b/R/otel.R index f1cec9ce9..491d34d75 100644 --- a/R/otel.R +++ b/R/otel.R @@ -148,9 +148,9 @@ record_tool_otel_span_error <- function(span, error) { if (is.null(span) || !span$is_recording()) { return() } - otel_span$record_exception(error) - otel_span$set_status("error") - otel_span$set_attribute("error.type", class(error)[1]) + span$record_exception(error) + span$set_status("error") + span$set_attribute("error.type", class(error)[1L]) } # Only activate the span if it is non-NULL. If diff --git a/R/tokens.R b/R/tokens.R index 7232a5d44..16be827f3 100644 --- a/R/tokens.R +++ b/R/tokens.R @@ -85,7 +85,7 @@ local_tokens <- function(frame = parent.frame()) { old <- the$tokens the$tokens <- tokens_row() - defer(the$tokens <- old, env = frame) + defer(the$tokens <- old, envir = frame) } #' Report on token usage in the current session From 01f20e6108aad0e110e8200f89db7f14f250378f Mon Sep 17 00:00:00 2001 From: shikokuchuo <53399081+shikokuchuo@users.noreply.github.com> Date: Sat, 8 Nov 2025 19:23:43 +0000 Subject: [PATCH 27/33] Remove superfluous promise domain setups --- R/httr2.R | 6 ------ 1 file changed, 6 deletions(-) diff --git a/R/httr2.R b/R/httr2.R index 4e9bde720..be5ca7cbb 100644 --- a/R/httr2.R +++ b/R/httr2.R @@ -52,9 +52,6 @@ on_load( on.exit(close(resp)) repeat { - # Ensure the span is active for each await/yield point - setup_active_promise_otel_span(otel_span) - event <- chat_resp_stream(provider, resp) data <- stream_parse(provider, event) if (is.null(data)) { @@ -78,9 +75,6 @@ on_load( on.exit(close(resp)) repeat { - # Ensure the span is active for each await/yield point - setup_active_promise_otel_span(otel_span) - event <- chat_resp_stream(provider, resp) if (is.null(event) && !resp_stream_is_complete(resp)) { fds <- resp$body$get_fdset() From 913270db168a79e5c456683b3b8205b9773579c9 Mon Sep 17 00:00:00 2001 From: Barret Schloerke Date: Mon, 10 Nov 2025 11:36:51 -0500 Subject: [PATCH 28/33] Refactor agent span activation in OpenTelemetry tracing Updated local_agent_otel_span to accept an 'activate' argument. `activate` MUST be `FALSE` for `local_agent_otel_span()` to prevent issues when switching too many coroutine contexts. Improved related tests to check span hierarchy and clarify span activation behavior. --- R/chat.R | 4 ++-- R/otel.R | 16 ++++++++++++++- tests/testthat/test-otel.R | 42 +++++++++++++++++++++++++++----------- 3 files changed, 47 insertions(+), 15 deletions(-) diff --git a/R/chat.R b/R/chat.R index a6f68095a..e1cfe51b8 100644 --- a/R/chat.R +++ b/R/chat.R @@ -442,7 +442,7 @@ Chat <- R6::R6Class( tool_errors <- list() defer(warn_tool_errors(tool_errors)) - agent_span <- local_agent_otel_span(private$provider) + agent_span <- local_agent_otel_span(private$provider, activate = FALSE) while (!is.null(user_turn)) { assistant_chunks <- private$submit_turns( @@ -505,7 +505,7 @@ Chat <- R6::R6Class( tool_errors <- list() defer(warn_tool_errors(tool_errors)) - agent_span <- local_agent_otel_span(private$provider) + agent_span <- local_agent_otel_span(private$provider, activate = FALSE) while (!is.null(user_turn)) { assistant_chunks <- private$submit_turns_async( diff --git a/R/otel.R b/R/otel.R index 491d34d75..fe1da7769 100644 --- a/R/otel.R +++ b/R/otel.R @@ -86,11 +86,20 @@ local({ # local_otel_span_agent local_agent_otel_span <<- function( provider, + activate = TRUE, local_envir = parent.frame() ) { if (!otel_is_tracing) { return() } + if (activate) { + abort(c( + "Activating the agent span is not supported at this time.", + "*" = "Activating the span here would set it as the active span globally (via otel::local_active_span() until the calling function ends (a long time).", + "*" = "`coro::setup()` would address this and be appropriate", + "i" = "Work around: Activate only where necessary or over a single yield in the calling scope." + )) + } agent_span <- otel::start_span( "invoke_agent", @@ -102,7 +111,12 @@ local({ tracer = otel_tracer ) - setup_active_promise_otel_span(agent_span, local_envir) + ## Do not activate! + ## The current usage of `local_agent_otel_span()` is in a multi-step coroutine. + ## This would require deactivating only after the coroutine is done, + ## but not between yields, that is too long and unpredictable. + ## The span should only be activated in the specific steps where it is needed. + # setup_active_promise_otel_span(agent_span, local_envir) defer(otel::end_span(agent_span), envir = local_envir) diff --git a/tests/testthat/test-otel.R b/tests/testthat/test-otel.R index bb4e088ae..1626182d5 100644 --- a/tests/testthat/test-otel.R +++ b/tests/testthat/test-otel.R @@ -112,32 +112,50 @@ test_that("tracing works as expected for asynchronous chats", { }) p2 <- promises::promise(function(resolve, reject) { span <- otel::start_span("concurrent") - otel::local_active_span(span) + otel::local_active_span(span) # just to try to mess with things later::later( function() { - on.exit(span$end()) - otel::with_active_span(span, { - resolve(NULL) - }) + otel::local_active_span(span) # just to try to mess with things + resolve(NULL) + span$end() }, 0.1 ) }) - local(otel::start_local_active_span("simultaneous")) + + local({ + # Typical external usage + local(otel::start_local_active_span("simultaneous")) + }) + sync(p2) sync(p1) })[["traces"]] + ## Debug span output; name, id, parent + # ignore <- Map( + # spans, + # format(names(spans), justify = "right"), + # f = function(span, name) { + # message(name, " - ", span$span_id, " - ", span$parent) + # } + # ) + + # Check we have two top-level extra spans + expect_top_level_span <- function(span) { + expect_true(!is.null(span)) + expect_equal(span$parent, "0000000000000000") + } + expect_top_level_span(spans[["concurrent"]]) + expect_top_level_span(spans[["simultaneous"]]) + # Check we have two top-level "invoke_agent" spans (one for each chat() # invocation) that start their respective traces. agent_spans <- Filter(function(x) x$name == "invoke_agent", spans) expect_length(agent_spans, 2L) + expect_top_level_span(agent_spans[[1]]) + expect_top_level_span(agent_spans[[2]]) - expect_true(all(vapply( - agent_spans, - function(x) identical(x$parent, "0000000000000000"), - logical(1) - ))) agent_span_ids <- sapply(agent_spans, function(x) x$span_id) # We should have two "execute_tool" spans (one for each tool invocation) @@ -154,7 +172,7 @@ test_that("tracing works as expected for asynchronous chats", { # tool call -- these are also children of the agent spans and siblings of one # another. chat_spans <- Filter(function(x) startsWith(x$name, "chat"), spans) - expect_gte(length(chat_spans), 2L) + expect_equal(length(chat_spans), 2 * length(tool_spans)) expect_true(all(vapply( chat_spans, function(x) x$parent %in% agent_span_ids, From 2dc46ba2db39dbc949fab434a6d4dd470c067a72 Mon Sep 17 00:00:00 2001 From: Barret Schloerke Date: Mon, 10 Nov 2025 11:56:28 -0500 Subject: [PATCH 29/33] Relax span count assertions in otel tracing tests Updated tests to use expect_gte instead of strict length checks for tool and chat spans. This accounts for model variations where tools may be called more than expected or results are cached, improving test robustness across different model behaviors. --- tests/testthat/test-otel.R | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/tests/testthat/test-otel.R b/tests/testthat/test-otel.R index 1626182d5..fe86c0cd5 100644 --- a/tests/testthat/test-otel.R +++ b/tests/testthat/test-otel.R @@ -17,10 +17,13 @@ test_that("tracing works as expected for synchronous chats", { ))) agent_span_ids <- sapply(agent_spans, function(x) x$span_id) - # We should have two "execute_tool" spans (one for each tool invocation) - # that are children of the agent spans. + # We should have (at least) two "execute_tool" spans + # (one for each tool invocation) that are children of the agent spans. + # Note 2025/11: Some models may call tools early and cache the results, so we can not check for existance of multiple parent ids (newer openai models) + # Note 2025/11: Some models call more tools than necessary + # Ex: anthropic calls the date tool twice. So we check for at least 2. tool_spans <- Filter(function(x) startsWith(x$name, "execute_tool"), spans) - expect_length(tool_spans, 2L) + expect_gte(length(tool_spans), 2L) expect_true(all(vapply( tool_spans, function(x) x$parent %in% agent_span_ids, @@ -172,7 +175,7 @@ test_that("tracing works as expected for asynchronous chats", { # tool call -- these are also children of the agent spans and siblings of one # another. chat_spans <- Filter(function(x) startsWith(x$name, "chat"), spans) - expect_equal(length(chat_spans), 2 * length(tool_spans)) + expect_gte(length(chat_spans), 2 * length(tool_spans)) expect_true(all(vapply( chat_spans, function(x) x$parent %in% agent_span_ids, From 2d78a24d3e30d0a7c3c910ba44a81d6536e4cab3 Mon Sep 17 00:00:00 2001 From: shikokuchuo <53399081+shikokuchuo@users.noreply.github.com> Date: Mon, 10 Nov 2025 18:12:49 +0000 Subject: [PATCH 30/33] Refactor otel spans tests to be relative instead of comparing against absolute values --- tests/testthat/test-otel.R | 41 ++++++++++++-------------------------- 1 file changed, 13 insertions(+), 28 deletions(-) diff --git a/tests/testthat/test-otel.R b/tests/testthat/test-otel.R index fe86c0cd5..19a9942ea 100644 --- a/tests/testthat/test-otel.R +++ b/tests/testthat/test-otel.R @@ -10,11 +10,7 @@ test_that("tracing works as expected for synchronous chats", { # invocation) that start their respective traces. agent_spans <- Filter(function(x) x$name == "invoke_agent", spans) expect_length(agent_spans, 2L) - expect_true(all(vapply( - agent_spans, - function(x) identical(x$parent, "0000000000000000"), - logical(1) - ))) + expect_equal(agent_spans[[1L]]$parent, agent_spans[[2L]]$parent) agent_span_ids <- sapply(agent_spans, function(x) x$span_id) # We should have (at least) two "execute_tool" spans @@ -68,20 +64,16 @@ test_that("tracing works as expected for synchronous streams", { })[["traces"]] # Check we have one top-level "invoke_agent" span. - expect_equal(spans[["invoke_agent"]]$parent, "0000000000000000") + expect_length(spans[names(spans) == "invoke_agent"], 1L) # And one child span for the stream (confirming that it ends when collected). chat_spans <- Filter(function(x) startsWith(x$name, "chat"), spans) expect_length(chat_spans, 1L) - expect_true(all(vapply( - chat_spans, - function(x) identical(x$parent, spans[["invoke_agent"]]$span_id), - logical(1) - ))) + expect_equal(chat_spans[[1L]]$parent, spans[["invoke_agent"]]$span_id) # Verify that the span started when the stream was suspended is not part of # the agent trace. - expect_equal(spans[["simultaneous"]]$parent, "0000000000000000") + expect_equal(spans[["simultaneous"]]$parent, spans[["invoke_agent"]]$parent) # But that HTTP spans are. chat_span_ids <- sapply(chat_spans, function(x) x$span_id) @@ -145,19 +137,16 @@ test_that("tracing works as expected for asynchronous chats", { # ) # Check we have two top-level extra spans - expect_top_level_span <- function(span) { - expect_true(!is.null(span)) - expect_equal(span$parent, "0000000000000000") - } - expect_top_level_span(spans[["concurrent"]]) - expect_top_level_span(spans[["simultaneous"]]) + expect_length(spans[names(spans) == "concurrent"], 1L) + expect_length(spans[names(spans) == "simultaneous"], 1L) + expect_equal(spans[["concurrent"]]$parent, spans[["simultaneous"]]$parent) # Check we have two top-level "invoke_agent" spans (one for each chat() # invocation) that start their respective traces. agent_spans <- Filter(function(x) x$name == "invoke_agent", spans) expect_length(agent_spans, 2L) - expect_top_level_span(agent_spans[[1]]) - expect_top_level_span(agent_spans[[2]]) + expect_equal(agent_spans[[1]]$parent, spans[["concurrent"]]$parent) + expect_equal(agent_spans[[2]]$parent, spans[["concurrent"]]$parent) agent_span_ids <- sapply(agent_spans, function(x) x$span_id) @@ -222,21 +211,17 @@ test_that("tracing works as expected for asynchronous streams", { })[["traces"]] # Check we have one top-level "invoke_agent" span. - expect_equal(spans[["invoke_agent"]]$parent, "0000000000000000") + expect_length(spans[names(spans) == "invoke_agent"], 1L) # And one child span for the stream (confirming that it ends when collected). chat_spans <- Filter(function(x) startsWith(x$name, "chat"), spans) expect_length(chat_spans, 1L) - expect_true(all(vapply( - chat_spans, - function(x) identical(x$parent, spans[["invoke_agent"]]$span_id), - logical(1) - ))) + expect_equal(chat_spans[[1L]]$parent, spans[["invoke_agent"]]$span_id) # Verify that the spans started when the stream was suspended are not part of # the agent trace. - expect_equal(spans[["concurrent"]]$parent, "0000000000000000") - expect_equal(spans[["simultaneous"]]$parent, "0000000000000000") + expect_equal(spans[["concurrent"]]$parent, spans[["invoke_agent"]]$parent) + expect_equal(spans[["simultaneous"]]$parent, spans[["invoke_agent"]]$parent) # But that HTTP spans are. chat_span_ids <- sapply(chat_spans, function(x) x$span_id) From 7f653bbe9c08762a6776d7e8486179071db1e2bc Mon Sep 17 00:00:00 2001 From: shikokuchuo <53399081+shikokuchuo@users.noreply.github.com> Date: Mon, 10 Nov 2025 19:27:57 +0000 Subject: [PATCH 31/33] Simplify span kinds and test --- R/otel.R | 9 +++------ tests/testthat/test-otel.R | 3 +++ 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/R/otel.R b/R/otel.R index fe1da7769..36e8bf436 100644 --- a/R/otel.R +++ b/R/otel.R @@ -27,7 +27,7 @@ local({ sprintf("chat %s", provider@model), options = list( parent = parent, - kind = "CLIENT" + kind = "client" ), attributes = list( "gen_ai.operation.name" = "chat", @@ -59,10 +59,7 @@ local({ tool_span <- otel::start_span( sprintf("execute_tool %s", request@tool@name), - options = list( - parent = parent, - kind = "INTERNAL" - ), + options = list(parent = parent), attributes = compact(list( "gen_ai.operation.name" = "execute_tool", "gen_ai.tool.name" = request@tool@name, @@ -103,7 +100,7 @@ local({ agent_span <- otel::start_span( "invoke_agent", - options = list(kind = "CLIENT"), + options = list(kind = "client"), attributes = list( "gen_ai.operation.name" = "chat", "gen_ai.provider.name" = tolower(provider@name) diff --git a/tests/testthat/test-otel.R b/tests/testthat/test-otel.R index 19a9942ea..aa0faf5fc 100644 --- a/tests/testthat/test-otel.R +++ b/tests/testthat/test-otel.R @@ -11,6 +11,7 @@ test_that("tracing works as expected for synchronous chats", { agent_spans <- Filter(function(x) x$name == "invoke_agent", spans) expect_length(agent_spans, 2L) expect_equal(agent_spans[[1L]]$parent, agent_spans[[2L]]$parent) + expect_equal(agent_spans[[1L]]$kind, "client") agent_span_ids <- sapply(agent_spans, function(x) x$span_id) # We should have (at least) two "execute_tool" spans @@ -25,6 +26,7 @@ test_that("tracing works as expected for synchronous chats", { function(x) x$parent %in% agent_span_ids, logical(1) ))) + expect_equal(tool_spans[[1L]]$kind, "internal") # And "chat" spans that correspond to model calls before and after each # tool call -- these are also children of the agent spans and siblings of one @@ -37,6 +39,7 @@ test_that("tracing works as expected for synchronous chats", { function(x) x$parent %in% agent_span_ids, logical(1) ))) + expect_equal(chat_spans[[1L]]$kind, "client") # Ensure we record the result (and therefore set the status) of chat spans. expect_true(all(vapply(chat_spans, function(x) x$status == "ok", logical(1)))) From c03dfde87c3d5ea1cc9eabac2cd6e3e997f0a991 Mon Sep 17 00:00:00 2001 From: shikokuchuo <53399081+shikokuchuo@users.noreply.github.com> Date: Mon, 10 Nov 2025 19:39:31 +0000 Subject: [PATCH 32/33] Move otel to suggests --- DESCRIPTION | 4 ++-- R/otel.R | 3 +++ 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/DESCRIPTION b/DESCRIPTION index 840d44e73..332acba9f 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -30,7 +30,6 @@ Imports: jsonlite, later (>= 1.4.0), lifecycle, - otel (>= 0.2.0), promises (>= 1.5.0), R6, rlang (>= 1.1.0), @@ -46,7 +45,8 @@ Suggests: knitr, magick, openssl, - otelsdk, + otel (>= 0.2.0), + otelsdk (>= 0.2.0), paws.common, png, rmarkdown, diff --git a/R/otel.R b/R/otel.R index 36e8bf436..d3676dbe8 100644 --- a/R/otel.R +++ b/R/otel.R @@ -10,6 +10,9 @@ local({ otel_tracer <- NULL otel_cache_tracer <<- function() { + if (!requireNamespace("otel", quietly = TRUE)) { + return() + } otel_tracer <<- otel::get_tracer(otel_tracer_name) otel_is_tracing <<- tracer_enabled(otel_tracer) } From ecf85fd8bbfb90c6e2777157a9c2a5d3a9021cbc Mon Sep 17 00:00:00 2001 From: shikokuchuo <53399081+shikokuchuo@users.noreply.github.com> Date: Mon, 10 Nov 2025 19:42:38 +0000 Subject: [PATCH 33/33] Add `span_recording()` helper --- R/otel.R | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/R/otel.R b/R/otel.R index d3676dbe8..0849de7b7 100644 --- a/R/otel.R +++ b/R/otel.R @@ -128,6 +128,10 @@ tracer_enabled <- function(tracer) { .subset2(tracer, "is_enabled")() } +span_recording <- function(span) { + .subset2(span, "is_recording")() +} + with_otel_record <- function(expr) { on.exit(otel_cache_tracer()) otelsdk::with_otel_record({ @@ -137,7 +141,7 @@ with_otel_record <- function(expr) { } record_chat_otel_span_status <- function(span, result) { - if (is.null(span) || !span$is_recording()) { + if (is.null(span) || !span_recording(span)) { return() } if (!is.null(result$model)) { @@ -159,7 +163,7 @@ record_chat_otel_span_status <- function(span, result) { } record_tool_otel_span_error <- function(span, error) { - if (is.null(span) || !span$is_recording()) { + if (is.null(span) || !span_recording(span)) { return() } span$record_exception(error) @@ -178,7 +182,7 @@ setup_active_promise_otel_span <- function( span, activation_scope = parent.frame() ) { - if (is.null(span) || !span$is_recording()) { + if (is.null(span) || !span_recording(span)) { return() }