Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions examples/rust/cycle-benchmark/src/bin/cycle_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,24 @@ fn run_as_primary(params: Params, app_config: ApplicationConfig) {
match signalling {
SignallingType::DirectMpsc => {
let config = direct_mpsc::make_primary_config(params, app_config);
direct_mpsc::Primary::new(config).run().unwrap();
direct_mpsc::Primary::new(config)
.expect("failed to create mpsc primary")
.run()
.unwrap();
}
signalling @ SignallingType::DirectTcp | signalling @ SignallingType::DirectUnix => {
let config = direct_sockets::make_primary_config(params, app_config, signalling);
direct_sockets::Primary::new(config).run().unwrap();
direct_sockets::Primary::new(config)
.expect("failed to create direct socket primary")
.run()
.unwrap();
}
signalling @ SignallingType::RelayedTcp | signalling @ SignallingType::RelayedUnix => {
let config = relayed_sockets::make_primary_config(params, app_config, signalling);
relayed_sockets::Primary::new(config).run().unwrap();
relayed_sockets::Primary::new(config)
.expect("failed to create relayed socket primary")
.run()
.unwrap();
}
}
}
Expand Down Expand Up @@ -233,6 +242,7 @@ mod direct_sockets {
recorder_ids: app_config.recorders(),
worker_assignments: app_config.worker_assignments().remove(&agent_id).unwrap(),
timeout: Duration::from_secs(10),
connection_timeout: Duration::from_secs(10),
endpoint: endpoint(&app_config, signalling),
}
}
Expand Down Expand Up @@ -314,6 +324,7 @@ mod relayed_sockets {
recorder_ids: app_config.recorders(),
worker_assignments: app_config.worker_assignments().remove(&agent_id).unwrap(),
timeout: Duration::from_secs(10),
connection_timeout: Duration::from_secs(10),
bind_address_senders: endpoints.0,
bind_address_receivers: endpoints.1,
id: agent_id,
Expand Down
18 changes: 13 additions & 5 deletions examples/rust/mini-adas/src/bin/adas_primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,14 @@ fn main() {
MAX_ADDITIONAL_SUBSCRIBERS,
);

// Setup primary
let mut primary = cfg::Primary::new(config);

// Run primary
primary.run().unwrap()
// Setup and run primary
cfg::Primary::new(config)
.unwrap_or_else(|err| {
feo_log::error!("Failed to initialize primary agent: {err:?}");
std::process::exit(1);
})
.run()
.unwrap();
}

/// Parameters of the primary
Expand Down Expand Up @@ -102,6 +105,7 @@ mod cfg {
recorder_ids: vec![],
worker_assignments: agent_assignments().remove(&AGENT_ID).unwrap(),
timeout: Duration::from_secs(10),
connection_timeout: Duration::from_secs(10),
}
}
}
Expand All @@ -126,6 +130,7 @@ mod cfg {
recorder_ids: params.recorder_ids,
worker_assignments: agent_assignments().remove(&AGENT_ID).unwrap(),
timeout: Duration::from_secs(10),
connection_timeout: Duration::from_secs(10),
endpoint: NodeAddress::Tcp(BIND_ADDR),
}
}
Expand All @@ -151,6 +156,7 @@ mod cfg {
recorder_ids: params.recorder_ids,
worker_assignments: agent_assignments().remove(&AGENT_ID).unwrap(),
timeout: Duration::from_secs(10),
connection_timeout: Duration::from_secs(10),
endpoint: NodeAddress::UnixSocket(socket_paths().0),
}
}
Expand Down Expand Up @@ -186,6 +192,7 @@ mod cfg {
recorder_ids: params.recorder_ids,
worker_assignments: agent_assignments().remove(&AGENT_ID).unwrap(),
timeout: Duration::from_secs(10),
connection_timeout: Duration::from_secs(10),
bind_address_senders: NodeAddress::Tcp(BIND_ADDR),
bind_address_receivers: NodeAddress::Tcp(BIND_ADDR2),
id: AGENT_ID,
Expand Down Expand Up @@ -225,6 +232,7 @@ mod cfg {
recorder_ids: params.recorder_ids,
worker_assignments: agent_assignments().remove(&AGENT_ID).unwrap(),
timeout: Duration::from_secs(10),
connection_timeout: Duration::from_secs(10),
bind_address_senders: NodeAddress::UnixSocket(socket_paths().0),
bind_address_receivers: NodeAddress::UnixSocket(socket_paths().1),
id: AGENT_ID,
Expand Down
13 changes: 9 additions & 4 deletions feo/src/agent/direct/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ pub struct PrimaryConfig {
pub worker_assignments: Vec<(WorkerId, Vec<ActivityIdAndBuilder>)>,
/// Receive timeout of the scheduler's connector
pub timeout: Duration,
/// Timeout for waiting on initial connections from workers/recorders.
pub connection_timeout: Duration,
/// Endpoint on which the connector of the scheduler waits for connections
pub endpoint: NodeAddress,
}
Expand All @@ -55,14 +57,15 @@ pub struct Primary {

impl Primary {
/// Create a new instance
pub fn new(config: PrimaryConfig) -> Self {
pub fn new(config: PrimaryConfig) -> Result<Self, Error> {
let PrimaryConfig {
cycle_time,
activity_dependencies,
recorder_ids,
endpoint,
worker_assignments,
timeout,
connection_timeout,
} = config;

// Create worker threads first so that the connector of the scheduler can connect
Expand Down Expand Up @@ -100,14 +103,16 @@ impl Primary {
addr,
activity_dependencies.keys().cloned(),
recorder_ids.iter().cloned(),
connection_timeout,
)) as Box<dyn ConnectScheduler>,
NodeAddress::UnixSocket(path) => Box::new(UnixSchedulerConnector::new(
&path,
activity_dependencies.keys().cloned(),
recorder_ids.iter().cloned(),
connection_timeout,
)) as Box<dyn ConnectScheduler>,
};
connector.connect_remotes().expect("failed to connect");
connector.connect_remotes()?;

let scheduler = Scheduler::new(
cycle_time,
Expand All @@ -117,10 +122,10 @@ impl Primary {
recorder_ids,
);

Self {
Ok(Self {
scheduler,
_worker_threads,
}
})
}

/// Run the agent
Expand Down
8 changes: 4 additions & 4 deletions feo/src/agent/direct/primary_mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pub struct Primary {

impl Primary {
/// Create a new instance
pub fn new(config: PrimaryConfig) -> Self {
pub fn new(config: PrimaryConfig) -> Result<Self, Error> {
let PrimaryConfig {
cycle_time,
activity_dependencies,
Expand Down Expand Up @@ -89,7 +89,7 @@ impl Primary {
})
.collect();

connector.connect_remotes().expect("failed to connect");
connector.connect_remotes()?;

let scheduler = Scheduler::new(
cycle_time,
Expand All @@ -99,10 +99,10 @@ impl Primary {
recorder_ids,
);

Self {
Ok(Self {
scheduler,
_worker_threads,
}
})
}

/// Run the agent
Expand Down
15 changes: 9 additions & 6 deletions feo/src/agent/relayed/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ pub struct PrimaryConfig {
pub worker_assignments: Vec<(WorkerId, Vec<ActivityIdAndBuilder>)>,
/// Receive timeout of the scheduler's connector
pub timeout: Duration,
/// Timeout for waiting on initial connections from workers/recorders.
pub connection_timeout: Duration,
/// The socket address to which secondary agents' senders shall connect
pub bind_address_senders: NodeAddress,
/// The socket address to which secondary agents' receivers shall connect
Expand All @@ -62,7 +64,7 @@ pub struct Primary {

impl Primary {
/// Create a new instance
pub fn new(config: PrimaryConfig) -> Self {
pub fn new(config: PrimaryConfig) -> Result<Self, Error> {
let PrimaryConfig {
id,
cycle_time,
Expand All @@ -72,6 +74,7 @@ impl Primary {
bind_address_receivers,
worker_assignments,
timeout,
connection_timeout,
worker_agent_map,
activity_worker_map,
} = config;
Expand All @@ -84,7 +87,7 @@ impl Primary {
id,
bind_senders,
bind_receivers,
timeout,
connection_timeout,
worker_agent_map,
activity_worker_map,
recorder_ids.clone(),
Expand All @@ -97,7 +100,7 @@ impl Primary {
id,
bind_senders,
bind_receivers,
timeout,
connection_timeout,
worker_agent_map,
activity_worker_map,
recorder_ids.clone(),
Expand Down Expand Up @@ -126,7 +129,7 @@ impl Primary {
})
.collect();

connector.connect_remotes().expect("failed to connect");
connector.connect_remotes()?;

let scheduler = Scheduler::new(
cycle_time,
Expand All @@ -136,10 +139,10 @@ impl Primary {
recorder_ids,
);

Self {
Ok(Self {
scheduler,
_worker_threads,
}
})
}

/// Run the agent
Expand Down
24 changes: 19 additions & 5 deletions feo/src/signalling/direct/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use alloc::vec::Vec;
use core::net::SocketAddr;
use core::time::Duration;
use feo_log::warn;
use feo_time::Instant;
use mio::net::{TcpListener, UnixListener};
use mio::{Events, Token};
use std::collections::{HashMap, HashSet};
Expand All @@ -46,6 +47,7 @@ where

all_activities: Vec<ActivityId>,
all_recorders: Vec<AgentId>,
connection_timeout: Duration,
}

impl<L> SchedulerConnector<L>
Expand All @@ -57,6 +59,7 @@ where
server: SocketServer<L>,
activity_ids: impl IntoIterator<Item = ActivityId>,
recorder_ids: impl IntoIterator<Item = AgentId>,
connection_timeout: Duration,
) -> Self {
let events = Events::with_capacity(32);

Expand All @@ -73,6 +76,7 @@ where
recorder_id_token_map,
all_activities,
all_recorders,
connection_timeout,
}
}
}
Expand All @@ -83,9 +87,10 @@ impl TcpSchedulerConnector {
bind_address: SocketAddr,
activity_ids: impl IntoIterator<Item = ActivityId>,
recorder_ids: impl IntoIterator<Item = AgentId>,
connection_timeout: Duration,
) -> Self {
let tcp_server = TcpServer::new(bind_address);
Self::new_with_server(tcp_server, activity_ids, recorder_ids)
Self::new_with_server(tcp_server, activity_ids, recorder_ids, connection_timeout)
}
}

Expand All @@ -95,9 +100,10 @@ impl UnixSchedulerConnector {
path: &Path,
activity_ids: impl IntoIterator<Item = ActivityId>,
recorder_ids: impl IntoIterator<Item = AgentId>,
connection_timeout: Duration,
) -> Self {
let unix_server = UnixServer::new(path);
Self::new_with_server(unix_server, activity_ids, recorder_ids)
Self::new_with_server(unix_server, activity_ids, recorder_ids, connection_timeout)
}
}

Expand All @@ -109,11 +115,19 @@ where
let mut missing_activities: HashSet<ActivityId> =
self.all_activities.iter().cloned().collect();
let mut missing_recorders: HashSet<AgentId> = self.all_recorders.iter().cloned().collect();
let start_time = Instant::now();

while !missing_activities.is_empty() || !missing_recorders.is_empty() {
if let Some((token, signal)) = self
.server
.receive(&mut self.events, Duration::from_secs(1))
let elapsed = start_time.elapsed();
if elapsed >= self.connection_timeout {
return Err(Error::Io((
std::io::ErrorKind::TimedOut.into(),
"CONNECTION_TIMEOUT",
)));
}
let remaining_timeout = self.connection_timeout.saturating_sub(elapsed);
// Wait for a new connection, but no longer than the remaining overall timeout.
if let Some((token, signal)) = self.server.receive(&mut self.events, remaining_timeout)
{
match signal {
ProtocolSignal::ActivityHello(activity_id) => {
Expand Down
Loading
Loading