Skip to content

Commit ebc728c

Browse files
Support compressed frames.
1 parent 940f928 commit ebc728c

File tree

6 files changed

+82
-24
lines changed

6 files changed

+82
-24
lines changed

ext/hyper_ruby/src/grpc.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -65,25 +65,20 @@ pub fn is_grpc_request(request: &HyperRequest<Bytes>) -> bool {
6565
true
6666
}
6767

68-
pub fn decode_grpc_frame(bytes: &[u8]) -> Option<Bytes> {
68+
pub fn decode_grpc_frame(bytes: &Bytes) -> Option<(bool, Bytes)> {
6969
if bytes.len() < GRPC_HEADER_SIZE {
7070
return None;
7171
}
7272

7373
// GRPC frame format:
7474
// Compressed-Flag (1 byte) | Message-Length (4 bytes) | Message
7575
let compressed = bytes[0] != 0;
76-
if compressed {
77-
// We don't support compression yet
78-
return None;
79-
}
80-
8176
let message_len = u32::from_be_bytes([bytes[1], bytes[2], bytes[3], bytes[4]]) as usize;
8277
if bytes.len() < GRPC_HEADER_SIZE + message_len {
8378
return None;
8479
}
8580

86-
Some(Bytes::copy_from_slice(&bytes[GRPC_HEADER_SIZE..GRPC_HEADER_SIZE + message_len]))
81+
Some((compressed, bytes.slice(GRPC_HEADER_SIZE..GRPC_HEADER_SIZE + message_len)))
8782
}
8883

8984
pub fn encode_grpc_frame(message: &[u8]) -> Bytes {

ext/hyper_ruby/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -452,6 +452,7 @@ fn init(ruby: &Ruby) -> Result<(), MagnusError> {
452452
grpc_request_class.define_method("body", method!(GrpcRequest::body, 0))?;
453453
grpc_request_class.define_method("fill_body", method!(GrpcRequest::fill_body, 1))?;
454454
grpc_request_class.define_method("body_size", method!(GrpcRequest::body_size, 0))?;
455+
grpc_request_class.define_method("compressed?", method!(GrpcRequest::is_compressed, 0))?;
455456
grpc_request_class.define_method("inspect", method!(GrpcRequest::inspect, 0))?;
456457

457458
Ok(())

ext/hyper_ruby/src/request.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,11 +75,15 @@ impl FillBuffer for Request {
7575

7676
impl FillBuffer for GrpcRequest {
7777
fn get_body_bytes(&self) -> Bytes {
78-
grpc::decode_grpc_frame(self.request.body()).unwrap_or_else(|| Bytes::new())
78+
if let Some((_, message)) = grpc::decode_grpc_frame(self.request.body()) {
79+
message
80+
} else {
81+
Bytes::new()
82+
}
7983
}
8084

8185
fn get_body_size(&self) -> usize {
82-
if let Some(message) = grpc::decode_grpc_frame(self.request.body()) {
86+
if let Some((_, message)) = grpc::decode_grpc_frame(self.request.body()) {
8387
message.len()
8488
} else {
8589
0
@@ -217,6 +221,14 @@ impl GrpcRequest {
217221
self.fill_buffer(buffer)
218222
}
219223

224+
pub fn is_compressed(&self) -> bool {
225+
if let Some((compressed, _)) = grpc::decode_grpc_frame(self.request.body()) {
226+
compressed
227+
} else {
228+
false
229+
}
230+
}
231+
220232
pub fn inspect(&self) -> RString {
221233
let body_size = self.body_size();
222234
RString::new(&format!("#<HyperRuby::GrpcRequest service={} method={} body_size={}>", self.service, self.method, body_size))

ext/hyper_ruby/src/response.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ impl GrpcResponse {
194194
pub fn body(&self) -> RString {
195195
// For gRPC responses, decode the frame
196196
let body = self.response.body().get_data();
197-
if let Some(message) = grpc::decode_grpc_frame(body) {
197+
if let Some((_, message)) = grpc::decode_grpc_frame(body) {
198198
RString::from_slice(message.as_ref())
199199
} else {
200200
RString::new("")

test/test_bad_http_requests.rb

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -104,20 +104,24 @@ def test_header_timeout
104104

105105
# Sleep longer than the timeout
106106
sleep 1.5
107-
108-
# Try to send the rest of the headers, but the connection should be closed
109-
socket.write("Content-Length: 0\r\n")
110-
socket.write("Connection: close\r\n")
111-
socket.write("\r\n")
112-
113-
# Attempt to read response - should be a timeout or connection closed
114-
response = read_http_response(socket)
115-
socket.close
116-
117-
# The server might respond with a 408 timeout, or might just close the connection
118-
# Both behaviors are acceptable according to HTTP/1.1 spec
119-
if response[:status]
120-
assert_equal 408, response[:status].split(" ")[1].to_i # Request Timeout if we got a response
107+
begin
108+
# Try to send the rest of the headers, but the connection should be closed
109+
socket.write("Content-Length: 0\r\n")
110+
socket.write("Connection: close\r\n")
111+
socket.write("\r\n")
112+
113+
# Attempt to read response - should be a timeout or connection closed
114+
response = read_http_response(socket)
115+
socket.close
116+
117+
# The server might respond with a 408 timeout, or might just close the connection
118+
# Both behaviors are acceptable according to HTTP/1.1 spec
119+
if response[:status]
120+
assert_equal 408, response[:status].split(" ")[1].to_i # Request Timeout if we got a response
121+
end
122+
rescue Errno::EPIPE
123+
# This is expected if the server closes the connection due to timeout/error
124+
# This is not an error, so we don't need to assert anything
121125
end
122126
end
123127
end

test/test_grpc.rb

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
# frozen_string_literal: true
22

33
require "test_helper"
4+
require 'zlib'
45
require_relative "echo_pb"
56
require_relative "echo_services_pb"
67

8+
79
class TestGrpc < HyperRubyTest
810
def test_grpc_request
911
buffer = String.new(capacity: 1024)
@@ -129,6 +131,28 @@ def test_grpc_over_unix_socket
129131
end
130132
end
131133

134+
def test_grpc_compression
135+
buffer = String.new(capacity: 1024)
136+
compression_options = GRPC::Core::CompressionOptions.new(default_algorithm: :gzip)
137+
compression_channel_args = compression_options.to_channel_arg_hash
138+
139+
with_server(-> (request) { handler_grpc_compressed(request, buffer) }) do |_client|
140+
stub = Echo::Echo::Stub.new(
141+
"127.0.0.1:3010",
142+
:this_channel_is_insecure,
143+
channel_args: {
144+
'grpc.enable_http_proxy' => 0,
145+
}.merge(compression_channel_args)
146+
)
147+
148+
request = Echo::EchoRequest.new(message: "Hello Compressed GRPC " + ("a" * 10000))
149+
response = stub.echo(request)
150+
151+
assert_instance_of Echo::EchoResponse, response
152+
assert_equal "Decompressed: Hello Compressed GRPC " + ("a" * 10000), response.message
153+
end
154+
end
155+
132156
private
133157

134158
def handler_grpc(request, buffer)
@@ -185,4 +209,26 @@ def handler_grpc_status(request)
185209
HyperRuby::GrpcResponse.error(2, "unknown error") # UNKNOWN = 2
186210
end
187211
end
212+
213+
def handler_grpc_compressed(request, buffer)
214+
assert_equal "application/grpc", request.header("content-type")
215+
assert_equal "echo.Echo", request.service
216+
assert_equal "Echo", request.method
217+
# Check if the message is compressed
218+
assert request.compressed?, "Expected request to be compressed"
219+
220+
# Get the compressed message
221+
request.fill_body(buffer)
222+
223+
decompressed = Zlib.gunzip(buffer)
224+
echo_request = Echo::EchoRequest.decode(decompressed)
225+
226+
echo_response = Echo::EchoResponse.new(message: "Decompressed: #{echo_request.message}")
227+
response_data = Echo::EchoResponse.encode(echo_response)
228+
229+
HyperRuby::GrpcResponse.new(0, response_data)
230+
rescue => e
231+
pp e
232+
raise e
233+
end
188234
end

0 commit comments

Comments
 (0)