Skip to content

Commit 5a4c1eb

Browse files
Backpressure fixes and tests.
1 parent d88fc26 commit 5a4c1eb

File tree

4 files changed

+423
-104
lines changed

4 files changed

+423
-104
lines changed

Gemfile

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ gem "minitest", "~> 5.0"
1414

1515
gem "httpx", "~> 1.2"
1616

17+
# Testing dependencies
18+
gem "concurrent-ruby", "~> 1.2"
19+
1720
# gRPC dependencies
1821
gem "grpc", "~> 1.62"
1922
gem "grpc-tools", "~> 1.62"

Gemfile.lock

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ PATH
66
GEM
77
remote: https://rubygems.org/
88
specs:
9+
concurrent-ruby (1.3.4)
910
google-protobuf (3.25.6)
1011
google-protobuf (3.25.6-arm64-darwin)
1112
googleapis-common-protos-types (1.18.0)
@@ -33,6 +34,7 @@ PLATFORMS
3334
ruby
3435

3536
DEPENDENCIES
37+
concurrent-ruby (~> 1.2)
3638
google-protobuf (~> 3.25)
3739
grpc (~> 1.62)
3840
grpc-tools (~> 1.62)

ext/hyper_ruby/src/lib.rs

Lines changed: 119 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -114,12 +114,11 @@ struct Server {
114114
impl Server {
115115
pub fn new() -> Self {
116116
let config = ServerConfig::new();
117-
let (work_tx, work_rx) = crossbeam_channel::bounded(config.channel_capacity);
118117
Self {
119118
server_handle: Arc::new(Mutex::new(None)),
120119
config: RefCell::new(config),
121-
work_rx: RefCell::new(Some(work_rx)),
122-
work_tx: RefCell::new(Some(Arc::new(work_tx))),
120+
work_rx: RefCell::new(None),
121+
work_tx: RefCell::new(None),
123122
runtime: RefCell::new(None),
124123
shutdown: RefCell::new(None),
125124
}
@@ -173,87 +172,96 @@ impl Server {
173172
// Method that Ruby worker threads will call with a block
174173
pub fn run_worker(&self) -> Result<(), MagnusError> {
175174
let block = block_proc().unwrap();
176-
if let Some(work_rx) = self.work_rx.borrow().as_ref() {
177-
178-
loop {
179-
// try getting the next request without yielding the GVL, if there's nothing, wait for one
180-
let work_request = match work_rx.try_recv() {
181-
Ok(work_request) => Ok(work_request),
182-
Err(crossbeam_channel::TryRecvError::Empty) => {
183-
nogvl(|| work_rx.recv())
184-
},
185-
Err(crossbeam_channel::TryRecvError::Disconnected) => {
186-
break;
187-
}
188-
};
189-
190-
match work_request {
191-
Ok(work_request) => {
192-
let hyper_request = work_request.request;
193-
194-
debug!("Processing request:");
195-
debug!(" Method: {}", hyper_request.method());
196-
debug!(" Path: {}", hyper_request.uri().path());
197-
debug!(" Headers: {:?}", hyper_request.headers());
198-
199-
// Convert to appropriate request type
200-
let value = if grpc::is_grpc_request(&hyper_request) {
201-
debug!("Request identified as gRPC");
202-
if let Some(grpc_request) = GrpcRequest::new(hyper_request) {
203-
grpc_request.into_value()
204-
} else {
205-
error!("Failed to create GrpcRequest due to invalid path - returning gRPC error");
206-
// Invalid gRPC request path
207-
let response = GrpcResponse::error(3_u32.into_value(), RString::new("Invalid gRPC request path")).unwrap()
208-
.into_hyper_response();
209-
work_request.response_tx.send(response).unwrap_or_else(|e| error!("Failed to send response: {:?}", e));
210-
continue;
211-
}
175+
176+
// Check if we have a work_rx channel, error out if not
177+
let work_rx = self.work_rx.borrow().as_ref().ok_or_else(|| {
178+
MagnusError::new(magnus::exception::runtime_error(), "Server must be started before running workers")
179+
})?.clone();
180+
181+
loop {
182+
// try getting the next request without yielding the GVL, if there's nothing, wait for one
183+
let work_request = match work_rx.try_recv() {
184+
Ok(work_request) => Ok(work_request),
185+
Err(crossbeam_channel::TryRecvError::Empty) => {
186+
nogvl(|| work_rx.recv())
187+
},
188+
Err(crossbeam_channel::TryRecvError::Disconnected) => {
189+
break;
190+
}
191+
};
192+
193+
match work_request {
194+
Ok(work_request) => {
195+
let hyper_request = work_request.request;
196+
197+
debug!("Processing request:");
198+
debug!(" Method: {}", hyper_request.method());
199+
debug!(" Path: {}", hyper_request.uri().path());
200+
debug!(" Headers: {:?}", hyper_request.headers());
201+
202+
// Convert to appropriate request type
203+
let value = if grpc::is_grpc_request(&hyper_request) {
204+
debug!("Request identified as gRPC");
205+
if let Some(grpc_request) = GrpcRequest::new(hyper_request) {
206+
grpc_request.into_value()
212207
} else {
213-
debug!("Request identified as HTTP");
214-
Request::new(hyper_request).into_value()
215-
};
216-
217-
let hyper_response = match block.call::<_, Value>([value]) {
218-
Ok(result) => {
219-
// Try to convert to either Response or GrpcResponse
220-
if let Ok(grpc_response) = Obj::<GrpcResponse>::try_convert(result) {
221-
(*grpc_response).clone().into_hyper_response()
222-
} else if let Ok(http_response) = Obj::<Response>::try_convert(result) {
223-
(*http_response).clone().into_hyper_response()
224-
} else {
225-
error!("Block returned invalid response type - returning 500 Internal Server Error");
226-
create_error_response("Internal server error")
227-
}
228-
},
229-
Err(e) => {
230-
error!("Block call failed with error: {:?} - returning 500 Internal Server Error", e);
208+
error!("Failed to create GrpcRequest due to invalid path - returning gRPC error");
209+
// Invalid gRPC request path
210+
let response = GrpcResponse::error(3_u32.into_value(), RString::new("Invalid gRPC request path")).unwrap()
211+
.into_hyper_response();
212+
work_request.response_tx.send(response).unwrap_or_else(|e| error!("Failed to send response: {:?}", e));
213+
continue;
214+
}
215+
} else {
216+
debug!("Request identified as HTTP");
217+
Request::new(hyper_request).into_value()
218+
};
219+
220+
let hyper_response = match block.call::<_, Value>([value]) {
221+
Ok(result) => {
222+
// Try to convert to either Response or GrpcResponse
223+
if let Ok(grpc_response) = Obj::<GrpcResponse>::try_convert(result) {
224+
(*grpc_response).clone().into_hyper_response()
225+
} else if let Ok(http_response) = Obj::<Response>::try_convert(result) {
226+
(*http_response).clone().into_hyper_response()
227+
} else {
228+
error!("Block returned invalid response type - returning 500 Internal Server Error");
231229
create_error_response("Internal server error")
232230
}
233-
};
234-
235-
match work_request.response_tx.send(hyper_response) {
236-
Ok(_) => (),
237-
Err(e) => error!("Failed to send response back to client: {:?} - response dropped", e),
231+
},
232+
Err(e) => {
233+
error!("Block call failed with error: {:?} - returning 500 Internal Server Error", e);
234+
create_error_response("Internal server error")
238235
}
236+
};
237+
238+
match work_request.response_tx.send(hyper_response) {
239+
Ok(_) => (),
240+
Err(e) => error!("Failed to send response back to client: {:?} - response dropped", e),
239241
}
240-
Err(_) => {
241-
// Channel closed, exit thread
242-
break;
243-
}
242+
}
243+
Err(_) => {
244+
// Channel closed, exit thread
245+
break;
244246
}
245247
}
246248
}
249+
247250
Ok(())
248251
}
249252

250253
pub fn start(&self) -> Result<(), MagnusError> {
251254
let config = self.config.borrow().clone();
252-
let work_tx = self.work_tx.borrow()
253-
.as_ref()
254-
.ok_or_else(|| MagnusError::new(magnus::exception::runtime_error(), "Work channel not initialized"))?
255-
.clone();
256-
255+
256+
// Create the channel with the current configuration
257+
let (work_tx, work_rx) = crossbeam_channel::bounded(config.channel_capacity);
258+
debug!("Created channel with capacity: {}", config.channel_capacity);
259+
260+
// Store the channel
261+
*self.work_rx.borrow_mut() = Some(work_rx);
262+
let work_tx = Arc::new(work_tx);
263+
*self.work_tx.borrow_mut() = Some(work_tx.clone());
264+
257265
let (shutdown_tx, shutdown_rx) = broadcast::channel(1);
258266
*self.shutdown.borrow_mut() = Some(shutdown_tx);
259267

@@ -462,51 +470,58 @@ async fn handle_request(
462470
response_tx,
463471
};
464472

473+
debug!("work_tx capacity: {:?}/{:?}", work_tx.len(), work_tx.capacity());
474+
465475
// First try non-blocking send
466476
match work_tx.try_send(with_completion) {
467477
Ok(()) => {
468478
debug!("Successfully queued request (fast path)");
469479
},
470-
Err(crossbeam_channel::TrySendError::Full(completion)) => {
471-
// Channel is full, fall back to blocking send with timeout
472-
debug!("Channel full, attempting send with timeout");
480+
Err(crossbeam_channel::TrySendError::Full(mut completion)) => {
481+
// Channel is full, implement polling with short delays
482+
debug!("Channel full, attempting to send with polling");
473483

474-
// Use send_timeout to implement backpressure
475-
let send_result = tokio::task::spawn_blocking(move || {
476-
work_tx.send_timeout(
477-
completion,
478-
std::time::Duration::from_millis(send_timeout),
479-
)
480-
}).await;
481-
482-
match send_result {
483-
Ok(Ok(())) => {
484-
debug!("Successfully queued request after timeout wait");
485-
},
486-
Ok(Err(crossbeam_channel::SendTimeoutError::Timeout(_))) => {
487-
// Timeout waiting for channel space
484+
// Use polling with sleep to implement backpressure
485+
let start = std::time::Instant::now();
486+
let max_wait = std::time::Duration::from_millis(send_timeout);
487+
let delay_ms = 10; // 10ms delay between attempts
488+
489+
// Create a new oneshot channel for each attempt to avoid cloning
490+
let mut attempts = 0;
491+
loop {
492+
// Check if we've reached the timeout
493+
if start.elapsed() >= max_wait {
488494
warn!("Channel full after timeout - returning 429 Too Many Requests");
489495
return Ok(if is_grpc {
490496
grpc::create_grpc_error_response(429, 8, "Server too busy, try again later") // RESOURCE_EXHAUSTED = 8
491497
} else {
492498
create_too_many_requests_response("Server too busy, try again later")
493499
});
494-
},
495-
Ok(Err(crossbeam_channel::SendTimeoutError::Disconnected(_))) => {
496-
error!("Worker channel disconnected - server is shutting down, returning 500");
497-
return Ok(if is_grpc {
498-
grpc::create_grpc_error_response(500, 13, "Server shutting down")
499-
} else {
500-
create_error_response("Server shutting down")
501-
});
502-
},
503-
Err(_) => {
504-
error!("Task to send request failed - returning 500 Internal Server Error");
505-
return Ok(if is_grpc {
506-
grpc::create_grpc_error_response(500, 13, "Internal server error")
507-
} else {
508-
create_error_response("Internal server error")
509-
});
500+
}
501+
502+
// Sleep for a short delay before trying again
503+
tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
504+
505+
// Try to send again
506+
attempts += 1;
507+
debug!("Retry attempt {} after {}ms", attempts, delay_ms);
508+
match work_tx.try_send(completion) {
509+
Ok(()) => {
510+
debug!("Successfully queued request after {} polling attempts", attempts);
511+
break;
512+
},
513+
Err(crossbeam_channel::TrySendError::Full(returned_completion)) => {
514+
// Get our completion request back and try again
515+
completion = returned_completion;
516+
},
517+
Err(crossbeam_channel::TrySendError::Disconnected(_)) => {
518+
error!("Worker channel disconnected - server is shutting down, returning 500");
519+
return Ok(if is_grpc {
520+
grpc::create_grpc_error_response(500, 13, "Server shutting down")
521+
} else {
522+
create_error_response("Server shutting down")
523+
});
524+
}
510525
}
511526
}
512527
},

0 commit comments

Comments
 (0)