Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions .github/workflows/per-pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ jobs:
- os: macos-arm
runsOn: macos-14
- os: macos-intel
runsOn: macos-15-intel
timeoutMinutes: 30
runsOn: macos-26-intel
timeoutMinutes: 40
runs-on: ${{ matrix.runsOn || matrix.os }}
steps:
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6
Expand Down Expand Up @@ -168,7 +168,8 @@ jobs:
- os: macos-arm
runsOn: macos-14
- os: macos-intel
runsOn: macos-15-intel
runsOn: macos-26-intel
timeoutMinutes: 40
runs-on: ${{ matrix.runsOn || matrix.os }}
steps:
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6
Expand Down
2 changes: 1 addition & 1 deletion crates/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ tokio = { version = "1.47", default-features = false, features = [
"sync",
"time",
] }
tonic = { workspace = true, default-features = false, features = ["tls-native-roots", "channel"] }
tonic = { workspace = true, default-features = false, features = ["tls-native-roots", "channel", "gzip"] }
tokio-rustls = { version = "0.26", default-features = false }
tower = { version = "0.5", features = ["util"] }
tracing = "0.1"
Expand Down
48 changes: 29 additions & 19 deletions crates/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ use tonic::{
Code, IntoRequest,
body::Body,
client::GrpcService,
codec::CompressionEncoding,
codegen::InterceptedService,
metadata::{
AsciiMetadataKey, AsciiMetadataValue, BinaryMetadataKey, BinaryMetadataValue, MetadataMap,
Expand Down Expand Up @@ -162,6 +163,13 @@ impl Connection {
/// Connect to a Temporal service.
pub async fn connect(options: ConnectionOptions) -> Result<Self, ClientConnectError> {
let dns_lb_opts = dns::validate_and_get_dns_lb(&options)?.cloned();
// The callback-based override transport cannot decode compressed request bodies, so
// compression is forced off whenever a service override is in use.
let compression = if options.service_override.is_some() {
GrpcCompression::None
} else {
options.grpc_compression
};
let (service, dns_task) = if let Some(service_override) = options.service_override {
(
GrpcMetricSvc {
Expand Down Expand Up @@ -238,7 +246,7 @@ impl Connection {
headers: headers.clone(),
};
let svc = InterceptedService::new(service, interceptor);
let mut svc_client = TemporalServiceClient::new(svc);
let mut svc_client = TemporalServiceClient::new(svc, compression);

let capabilities = if !options.skip_get_system_info {
match svc_client
Expand Down Expand Up @@ -574,31 +582,33 @@ fn get_decode_max_size() -> usize {
}

impl TemporalServiceClient {
fn new<T>(svc: T) -> Self
fn new<T>(svc: T, compression: GrpcCompression) -> Self
where
T: GrpcService<Body> + Send + Sync + Clone + 'static,
T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static,
T::Error: Into<tonic::codegen::StdError>,
<T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send,
<T as GrpcService<Body>>::Future: Send,
{
let workflow_svc_client = Box::new(
WorkflowServiceClient::new(svc.clone())
.max_decoding_message_size(get_decode_max_size()),
);
let operator_svc_client = Box::new(
OperatorServiceClient::new(svc.clone())
.max_decoding_message_size(get_decode_max_size()),
);
let cloud_svc_client = Box::new(
CloudServiceClient::new(svc.clone()).max_decoding_message_size(get_decode_max_size()),
);
let test_svc_client = Box::new(
TestServiceClient::new(svc.clone()).max_decoding_message_size(get_decode_max_size()),
);
let health_svc_client = Box::new(
HealthClient::new(svc.clone()).max_decoding_message_size(get_decode_max_size()),
);
// The generated service clients don't share a trait exposing the compression setters, so
// a macro applies the same configuration to each concrete client type.
macro_rules! configure {
($client:expr) => {{
let client = $client.max_decoding_message_size(get_decode_max_size());
match compression {
GrpcCompression::Gzip => client
.send_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Gzip),
GrpcCompression::None => client,
}
}};
}

let workflow_svc_client = Box::new(configure!(WorkflowServiceClient::new(svc.clone())));
let operator_svc_client = Box::new(configure!(OperatorServiceClient::new(svc.clone())));
let cloud_svc_client = Box::new(configure!(CloudServiceClient::new(svc.clone())));
let test_svc_client = Box::new(configure!(TestServiceClient::new(svc.clone())));
let health_svc_client = Box::new(configure!(HealthClient::new(svc.clone())));
Comment on lines +593 to +611

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a trait would be much clearer here, though the macro isn't horrible as macros go.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It ends up being uglier since the trait would need to be defined down in the proto crate


Self {
workflow_svc_client,
Expand Down
18 changes: 18 additions & 0 deletions crates/client/src/options_structs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ pub struct ConnectionOptions {
pub disable_error_code_metric_tags: bool,
/// If set, all gRPC calls will be routed through the provided service.
pub service_override: Option<callback_based::CallbackBasedGrpcService>,
/// Controls transport-level gRPC compression for the client. Defaults to
/// [GrpcCompression::Gzip], which compresses outbound request bodies and accepts
/// compressed responses. Set to [GrpcCompression::None] to opt out.
/// If service_override is specified, is forced to `None`.
#[builder(default)]
pub grpc_compression: GrpcCompression,

// Internal / Core-based SDK only options below =============================================
/// If set true, get_system_info will not be called upon connection.
Expand Down Expand Up @@ -132,6 +138,18 @@ pub struct ClientOptions {
pub data_converter: DataConverter,
}

/// Selects the transport-level compression used for gRPC calls. See
/// [ConnectionOptions::grpc_compression].
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
#[non_exhaustive]
pub enum GrpcCompression {
/// Do not compress requests or advertise acceptance of compressed responses.
None,
/// Gzip-compress outbound requests and accept gzip-compressed responses.
#[default]
Gzip,
}

/// Configuration options for TLS
#[derive(Clone, Default)]
pub struct TlsOptions {
Expand Down
11 changes: 11 additions & 0 deletions crates/sdk-core-c-bridge/include/temporal-sdk-core-c-bridge.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@
#include <stdint.h>
#include <stdlib.h>

typedef enum TemporalCoreClientGrpcCompression {
TemporalCoreClientGrpcCompression_Gzip = 0,
TemporalCoreClientGrpcCompression_None = 1,
} TemporalCoreClientGrpcCompression;

typedef enum TemporalCoreRpcService {
Workflow = 1,
Operator,
Expand Down Expand Up @@ -186,6 +191,12 @@ typedef struct TemporalCoreConnectionOptions {
* http_connect_proxy_options is also set.
*/
const struct TemporalCoreClientDnsLoadBalancingOptions *dns_load_balancing_options;
/**
* Selects transport-level gRPC compression. The zero value enables gzip, which is the
* default. Ignored when grpc_override_callback is set, since that transport cannot decode
* compressed request bodies.
*/
enum TemporalCoreClientGrpcCompression grpc_compression;
} TemporalCoreConnectionOptions;

typedef struct TemporalCoreByteArray {
Expand Down
40 changes: 40 additions & 0 deletions crates/sdk-core-c-bridge/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,18 @@ pub struct ConnectionOptions {
/// refreshed. If null, DNS load balancing is disabled. Ignored (forced off) when
/// http_connect_proxy_options is also set.
pub dns_load_balancing_options: *const ClientDnsLoadBalancingOptions,
/// Selects transport-level gRPC compression. The zero value enables gzip, which is the
/// default. Ignored when grpc_override_callback is set, since that transport cannot decode
/// compressed request bodies.
pub grpc_compression: ClientGrpcCompression,
}

/// cbindgen:prefix-with-name=true
Comment thread
jmaeagle99 marked this conversation as resolved.
#[derive(Clone, Copy)]
#[repr(C)]
pub enum ClientGrpcCompression {
Gzip = 0,
Comment thread
jmaeagle99 marked this conversation as resolved.
Comment thread
jmaeagle99 marked this conversation as resolved.
None = 1,
}

#[repr(C)]
Expand Down Expand Up @@ -1439,6 +1451,10 @@ impl TryFrom<&ConnectionOptions> for temporalio_client::ConnectionOptions {
.maybe_http_connect_proxy(http_connect_proxy)
.dns_load_balancing(dns_load_balancing)
.maybe_tls_options(tls_cfg)
.grpc_compression(match opts.grpc_compression {
ClientGrpcCompression::Gzip => temporalio_client::GrpcCompression::Gzip,
ClientGrpcCompression::None => temporalio_client::GrpcCompression::None,
})
.build(),
)
}
Expand Down Expand Up @@ -1550,6 +1566,7 @@ mod tests {
grpc_override_callback: None,
grpc_override_callback_user_data: std::ptr::null_mut(),
dns_load_balancing_options: std::ptr::null(),
grpc_compression: ClientGrpcCompression::Gzip,
}
}

Expand All @@ -1576,6 +1593,29 @@ mod tests {
assert_eq!(dns_opts.resolution_interval, Duration::from_millis(5_000));
}

#[test]
fn grpc_compression_defaults_to_gzip() {
Comment thread
jmaeagle99 marked this conversation as resolved.
let opts = base_connection_options();
let converted: temporalio_client::ConnectionOptions = (&opts).try_into().unwrap();
assert_eq!(
converted.grpc_compression,
temporalio_client::GrpcCompression::Gzip
);
}

#[test]
fn grpc_compression_none_passes_through() {
let opts = ConnectionOptions {
grpc_compression: ClientGrpcCompression::None,
..base_connection_options()
};
let converted: temporalio_client::ConnectionOptions = (&opts).try_into().unwrap();
assert_eq!(
converted.grpc_compression,
temporalio_client::GrpcCompression::None
);
}

#[test]
fn dns_load_balancing_silently_disabled_when_http_proxy_set() {
let dns = ClientDnsLoadBalancingOptions {
Expand Down
6 changes: 6 additions & 0 deletions crates/sdk-core-c-bridge/src/tests/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,12 @@ impl Context {
dns_load_balancing_options: pointer_or_null(dns_load_balancing_options.as_deref()),
grpc_override_callback,
grpc_override_callback_user_data,
grpc_compression: match options.grpc_compression {
temporalio_client::GrpcCompression::None => {
crate::client::ClientGrpcCompression::None
}
_ => crate::client::ClientGrpcCompression::Gzip,
},
});

let client_options_ptr = &*client_options as *const _;
Expand Down
1 change: 1 addition & 0 deletions crates/sdk-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ bytes = "1.10"
clap = { version = "4.5", features = ["derive"] }
criterion = { version = "0.8", features = ["async", "async_tokio"] }
crossbeam-queue = "0.3"
flate2 = "1.1"
http-body-util = { version = "0.1" }
hyper = { version = "1.7" }
hyper-util = { version = "0.1", features = [
Expand Down
5 changes: 5 additions & 0 deletions crates/sdk-core/tests/cloud_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ async fn grpc_message_too_large_test() {
shared_tests::grpc_message_too_large().await
}

#[tokio::test]
async fn grpc_compression() {
shared_tests::grpc_compression().await
}

#[tokio::test]
async fn priority_values_sent_to_server() {
shared_tests::priority::priority_values_sent_to_server().await
Expand Down
25 changes: 22 additions & 3 deletions crates/sdk-core/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ use std::{
time::{Duration, Instant},
};
use temporalio_client::{
Client, ClientTlsOptions, Connection, ConnectionOptions, NamespacedClient, TlsOptions,
UntypedWorkflow, UntypedWorkflowHandle, WorkflowExecutionInfo, WorkflowGetResultOptions,
WorkflowHandle, WorkflowStartOptions,
Client, ClientTlsOptions, Connection, ConnectionOptions, GrpcCompression, NamespacedClient,
TlsOptions, UntypedWorkflow, UntypedWorkflowHandle, WorkflowExecutionInfo,
WorkflowGetResultOptions, WorkflowHandle, WorkflowStartOptions,
errors::{WorkflowGetResultError, WorkflowStartError},
grpc::WorkflowService,
};
Expand Down Expand Up @@ -209,6 +209,10 @@ pub(crate) fn init_integ_telem() -> Option<&'static CoreRuntime> {
}

pub(crate) async fn get_cloud_client() -> Client {
get_cloud_client_with_compression(GrpcCompression::default()).await
}

async fn get_cloud_client_with_compression(compression: GrpcCompression) -> Client {
let cloud_addr = env::var("TEMPORAL_CLOUD_ADDRESS").unwrap();
let cloud_key = env::var("TEMPORAL_CLIENT_KEY").unwrap();

Expand All @@ -228,13 +232,28 @@ pub(crate) async fn get_cloud_client() -> Client {
}),
..Default::default()
})
.grpc_compression(compression)
.build();
let connection = Connection::connect(connection_opts).await.unwrap();
let namespace = env::var("TEMPORAL_NAMESPACE").expect("TEMPORAL_NAMESPACE must be set");
let client_opts = temporalio_client::ClientOptions::new(namespace).build();
Client::new(connection, client_opts).unwrap()
}

/// Gets a namespaced client targeting cloud if the cloud env vars are present, otherwise the local
/// dev server, configured with the given transport-level gRPC compression.
pub(crate) async fn get_cloud_or_local_client(compression: GrpcCompression) -> Client {
if env::var("TEMPORAL_CLOUD_ADDRESS").is_ok() {
get_cloud_client_with_compression(compression).await
} else {
let mut opts = get_integ_server_options();
opts.grpc_compression = compression;
let connection = Connection::connect(opts).await.expect("Must connect");
let client_opts = temporalio_client::ClientOptions::new(integ_namespace()).build();
Client::new(connection, client_opts).unwrap()
}
}

/// Implements a builder pattern to help integ tests initialize core and create workflows
pub(crate) struct CoreWfStarter {
/// Used for both the task queue and workflow id
Expand Down
5 changes: 5 additions & 0 deletions crates/sdk-core/tests/integ_tests/client_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,11 @@ async fn namespace_header_attached_to_relevant_calls() {
server_handle.await.unwrap();
}

#[tokio::test]
async fn grpc_compression() {
crate::shared_tests::grpc_compression().await
}

#[tokio::test]
async fn cloud_ops_test() {
let api_key = match env::var("TEMPORAL_CLIENT_CLOUD_API_KEY") {
Expand Down
Loading
Loading