-
Notifications
You must be signed in to change notification settings - Fork 10
Connection timeout #11
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
fd411d7
0bccf71
14f95ec
9dc193f
bf8d314
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -46,6 +46,7 @@ where | |
|
|
||
| all_activities: Vec<ActivityId>, | ||
| all_recorders: Vec<AgentId>, | ||
| connection_timeout: Duration, | ||
| } | ||
|
|
||
| impl<L> SchedulerConnector<L> | ||
|
|
@@ -57,6 +58,7 @@ where | |
| server: SocketServer<L>, | ||
| activity_ids: impl IntoIterator<Item = ActivityId>, | ||
| recorder_ids: impl IntoIterator<Item = AgentId>, | ||
| connection_timeout: Duration, | ||
| ) -> Self { | ||
| let events = Events::with_capacity(32); | ||
|
|
||
|
|
@@ -73,6 +75,7 @@ where | |
| recorder_id_token_map, | ||
| all_activities, | ||
| all_recorders, | ||
| connection_timeout, | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -83,9 +86,10 @@ impl TcpSchedulerConnector { | |
| bind_address: SocketAddr, | ||
| activity_ids: impl IntoIterator<Item = ActivityId>, | ||
| recorder_ids: impl IntoIterator<Item = AgentId>, | ||
| connection_timeout: Duration, | ||
| ) -> Self { | ||
| let tcp_server = TcpServer::new(bind_address); | ||
| Self::new_with_server(tcp_server, activity_ids, recorder_ids) | ||
| Self::new_with_server(tcp_server, activity_ids, recorder_ids, connection_timeout) | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -95,9 +99,10 @@ impl UnixSchedulerConnector { | |
| path: &Path, | ||
| activity_ids: impl IntoIterator<Item = ActivityId>, | ||
| recorder_ids: impl IntoIterator<Item = AgentId>, | ||
| connection_timeout: Duration, | ||
| ) -> Self { | ||
| let unix_server = UnixServer::new(path); | ||
| Self::new_with_server(unix_server, activity_ids, recorder_ids) | ||
| Self::new_with_server(unix_server, activity_ids, recorder_ids, connection_timeout) | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -109,11 +114,18 @@ where | |
| let mut missing_activities: HashSet<ActivityId> = | ||
| self.all_activities.iter().cloned().collect(); | ||
| let mut missing_recorders: HashSet<AgentId> = self.all_recorders.iter().cloned().collect(); | ||
| let start_time = std::time::Instant::now(); | ||
|
||
|
|
||
| while !missing_activities.is_empty() || !missing_recorders.is_empty() { | ||
| let elapsed = start_time.elapsed(); | ||
| if elapsed >= self.connection_timeout { | ||
| return Err(Error::Io((std::io::ErrorKind::TimedOut.into(), "CONNECTION_TIMEOUT"))); | ||
| } | ||
| let remaining_timeout = self.connection_timeout.saturating_sub(elapsed); | ||
| // Wait for a new connection, but no longer than the remaining overall timeout. | ||
| if let Some((token, signal)) = self | ||
| .server | ||
| .receive(&mut self.events, Duration::from_secs(1)) | ||
| .receive(&mut self.events, remaining_timeout) | ||
| { | ||
| match signal { | ||
| ProtocolSignal::ActivityHello(activity_id) => { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In our original implementation we avoided making senders and receivers "Send", because this is a limitation that not all communication implementations do actually fulfill. As an example, we found iceoryx2. For that reason, we used builders that are passed to the target thread, and the target thread uses them to build it senders and receivers directly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right, thank you for the detailed clarification. I wasn't familiar with using the builder pattern to avoid Send bounds, and your explanation was very helpful.
I'll remove the unnecessary Send trait bounds, as the builder pattern already ensures thread safety by creating the channel components within the target thread. I'll ensure this pattern is followed consistently.