Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions examples/rust/cycle-benchmark/src/bin/cycle_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,24 @@ fn run_as_primary(params: Params, app_config: ApplicationConfig) {
match signalling {
SignallingType::DirectMpsc => {
let config = direct_mpsc::make_primary_config(params, app_config);
direct_mpsc::Primary::new(config).run().unwrap();
direct_mpsc::Primary::new(config)
.expect("failed to create mpsc primary")
.run()
.unwrap();
}
signalling @ SignallingType::DirectTcp | signalling @ SignallingType::DirectUnix => {
let config = direct_sockets::make_primary_config(params, app_config, signalling);
direct_sockets::Primary::new(config).run().unwrap();
direct_sockets::Primary::new(config)
.expect("failed to create direct socket primary")
.run()
.unwrap();
}
signalling @ SignallingType::RelayedTcp | signalling @ SignallingType::RelayedUnix => {
let config = relayed_sockets::make_primary_config(params, app_config, signalling);
relayed_sockets::Primary::new(config).run().unwrap();
relayed_sockets::Primary::new(config)
.expect("failed to create relayed socket primary")
.run()
.unwrap();
}
}
}
Expand Down Expand Up @@ -185,6 +194,7 @@ mod direct_mpsc {
recorder_ids: vec![],
worker_assignments: app_config.worker_assignments().remove(&agent_id).unwrap(),
timeout: Duration::from_secs(10),
connection_timeout: Duration::from_secs(10),
}
}

Expand Down Expand Up @@ -233,6 +243,7 @@ mod direct_sockets {
recorder_ids: app_config.recorders(),
worker_assignments: app_config.worker_assignments().remove(&agent_id).unwrap(),
timeout: Duration::from_secs(10),
connection_timeout: Duration::from_secs(10),
endpoint: endpoint(&app_config, signalling),
}
}
Expand Down Expand Up @@ -314,6 +325,7 @@ mod relayed_sockets {
recorder_ids: app_config.recorders(),
worker_assignments: app_config.worker_assignments().remove(&agent_id).unwrap(),
timeout: Duration::from_secs(10),
connection_timeout: Duration::from_secs(10),
bind_address_senders: endpoints.0,
bind_address_receivers: endpoints.1,
id: agent_id,
Expand Down
18 changes: 13 additions & 5 deletions examples/rust/mini-adas/src/bin/adas_primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,14 @@ fn main() {
MAX_ADDITIONAL_SUBSCRIBERS,
);

// Setup primary
let mut primary = cfg::Primary::new(config);

// Run primary
primary.run().unwrap()
// Setup and run primary
cfg::Primary::new(config)
.unwrap_or_else(|err| {
feo_log::error!("Failed to initialize primary agent: {err:?}");
std::process::exit(1);
})
.run()
.unwrap();
}

/// Parameters of the primary
Expand Down Expand Up @@ -102,6 +105,7 @@ mod cfg {
recorder_ids: vec![],
worker_assignments: agent_assignments().remove(&AGENT_ID).unwrap(),
timeout: Duration::from_secs(10),
connection_timeout: Duration::from_secs(10),
}
}
}
Expand All @@ -126,6 +130,7 @@ mod cfg {
recorder_ids: params.recorder_ids,
worker_assignments: agent_assignments().remove(&AGENT_ID).unwrap(),
timeout: Duration::from_secs(10),
connection_timeout: Duration::from_secs(10),
endpoint: NodeAddress::Tcp(BIND_ADDR),
}
}
Expand All @@ -151,6 +156,7 @@ mod cfg {
recorder_ids: params.recorder_ids,
worker_assignments: agent_assignments().remove(&AGENT_ID).unwrap(),
timeout: Duration::from_secs(10),
connection_timeout: Duration::from_secs(10),
endpoint: NodeAddress::UnixSocket(socket_paths().0),
}
}
Expand Down Expand Up @@ -186,6 +192,7 @@ mod cfg {
recorder_ids: params.recorder_ids,
worker_assignments: agent_assignments().remove(&AGENT_ID).unwrap(),
timeout: Duration::from_secs(10),
connection_timeout: Duration::from_secs(10),
bind_address_senders: NodeAddress::Tcp(BIND_ADDR),
bind_address_receivers: NodeAddress::Tcp(BIND_ADDR2),
id: AGENT_ID,
Expand Down Expand Up @@ -225,6 +232,7 @@ mod cfg {
recorder_ids: params.recorder_ids,
worker_assignments: agent_assignments().remove(&AGENT_ID).unwrap(),
timeout: Duration::from_secs(10),
connection_timeout: Duration::from_secs(10),
bind_address_senders: NodeAddress::UnixSocket(socket_paths().0),
bind_address_receivers: NodeAddress::UnixSocket(socket_paths().1),
id: AGENT_ID,
Expand Down
13 changes: 9 additions & 4 deletions feo/src/agent/direct/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ pub struct PrimaryConfig {
pub worker_assignments: Vec<(WorkerId, Vec<ActivityIdAndBuilder>)>,
/// Receive timeout of the scheduler's connector
pub timeout: Duration,
/// Timeout for waiting on initial connections from workers/recorders.
pub connection_timeout: Duration,
/// Endpoint on which the connector of the scheduler waits for connections
pub endpoint: NodeAddress,
}
Expand All @@ -55,14 +57,15 @@ pub struct Primary {

impl Primary {
/// Create a new instance
pub fn new(config: PrimaryConfig) -> Self {
pub fn new(config: PrimaryConfig) -> Result<Self, Error>{
let PrimaryConfig {
cycle_time,
activity_dependencies,
recorder_ids,
endpoint,
worker_assignments,
timeout,
connection_timeout,
} = config;

// Create worker threads first so that the connector of the scheduler can connect
Expand Down Expand Up @@ -100,14 +103,16 @@ impl Primary {
addr,
activity_dependencies.keys().cloned(),
recorder_ids.iter().cloned(),
connection_timeout,
)) as Box<dyn ConnectScheduler>,
NodeAddress::UnixSocket(path) => Box::new(UnixSchedulerConnector::new(
&path,
activity_dependencies.keys().cloned(),
recorder_ids.iter().cloned(),
connection_timeout,
)) as Box<dyn ConnectScheduler>,
};
connector.connect_remotes().expect("failed to connect");
connector.connect_remotes()?;

let scheduler = Scheduler::new(
cycle_time,
Expand All @@ -117,10 +122,10 @@ impl Primary {
recorder_ids,
);

Self {
Ok(Self {
scheduler,
_worker_threads,
}
})
}

/// Run the agent
Expand Down
11 changes: 7 additions & 4 deletions feo/src/agent/direct/primary_mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ pub struct PrimaryConfig {
pub worker_assignments: Vec<(WorkerId, Vec<ActivityIdAndBuilder>)>,
/// Receive timeout of the scheduler's connector
pub timeout: Duration,
/// Timeout for waiting on initial connections from workers/recorders.
pub connection_timeout: Duration,
}

/// Primary agent
Expand All @@ -51,13 +53,14 @@ pub struct Primary {

impl Primary {
/// Create a new instance
pub fn new(config: PrimaryConfig) -> Self {
pub fn new(config: PrimaryConfig) -> Result<Self, Error>{
let PrimaryConfig {
cycle_time,
activity_dependencies,
recorder_ids,
worker_assignments,
timeout,
connection_timeout,
} = config;

let activity_worker_map: HashMap<ActivityId, WorkerId> = worker_assignments
Expand Down Expand Up @@ -89,7 +92,7 @@ impl Primary {
})
.collect();

connector.connect_remotes().expect("failed to connect");
connector.connect_remotes()?;

let scheduler = Scheduler::new(
cycle_time,
Expand All @@ -99,10 +102,10 @@ impl Primary {
recorder_ids,
);

Self {
Ok(Self {
scheduler,
_worker_threads,
}
})
}

/// Run the agent
Expand Down
25 changes: 18 additions & 7 deletions feo/src/agent/relayed/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ use crate::error::Error;
use crate::ids::{ActivityId, AgentId, WorkerId};
use crate::scheduler::Scheduler;
use crate::signalling::common::interface::{ConnectScheduler, ConnectWorker};
use crate::signalling::relayed::sockets_mpsc::{SchedulerConnectorTcp, SchedulerConnectorUnix};
use crate::signalling::relayed::interface::IsChannel;
use crate::signalling::relayed::sockets_mpsc::{
InterChannelTcp, InterChannelUnix, IntraChannel, SchedulerConnectorTcp, SchedulerConnectorUnix,
};
use crate::timestamp;
use crate::worker::Worker;
use alloc::boxed::Box;
Expand All @@ -42,6 +45,8 @@ pub struct PrimaryConfig {
pub worker_assignments: Vec<(WorkerId, Vec<ActivityIdAndBuilder>)>,
/// Receive timeout of the scheduler's connector
pub timeout: Duration,
/// Timeout for waiting on initial connections from workers/recorders.
pub connection_timeout: Duration,
/// The socket address to which secondary agents' senders shall connect
pub bind_address_senders: NodeAddress,
/// The socket address to which secondary agents' receivers shall connect
Expand All @@ -62,7 +67,12 @@ pub struct Primary {

impl Primary {
/// Create a new instance
pub fn new(config: PrimaryConfig) -> Self {
pub fn new(config: PrimaryConfig) -> Result<Self, Error>
where
<InterChannelTcp as IsChannel>::MultiReceiver: Send,
<InterChannelUnix as IsChannel>::MultiReceiver: Send,
<IntraChannel as IsChannel>::Sender: Send,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In our original implementation we avoided making senders and receivers "Send", because this is a limitation that not all communication implementations do actually fulfill. As an example, we found iceoryx2. For that reason, we used builders that are passed to the target thread, and the target thread uses them to build it senders and receivers directly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, thank you for the detailed clarification. I wasn't familiar with using the builder pattern to avoid Send bounds, and your explanation was very helpful.

I'll remove the unnecessary Send trait bounds, as the builder pattern already ensures thread safety by creating the channel components within the target thread. I'll ensure this pattern is followed consistently.

{
let PrimaryConfig {
id,
cycle_time,
Expand All @@ -72,6 +82,7 @@ impl Primary {
bind_address_receivers,
worker_assignments,
timeout,
connection_timeout,
worker_agent_map,
activity_worker_map,
} = config;
Expand All @@ -84,7 +95,7 @@ impl Primary {
id,
bind_senders,
bind_receivers,
timeout,
connection_timeout,
worker_agent_map,
activity_worker_map,
recorder_ids.clone(),
Expand All @@ -97,7 +108,7 @@ impl Primary {
id,
bind_senders,
bind_receivers,
timeout,
connection_timeout,
worker_agent_map,
activity_worker_map,
recorder_ids.clone(),
Expand Down Expand Up @@ -126,7 +137,7 @@ impl Primary {
})
.collect();

connector.connect_remotes().expect("failed to connect");
connector.connect_remotes()?;

let scheduler = Scheduler::new(
cycle_time,
Expand All @@ -136,10 +147,10 @@ impl Primary {
recorder_ids,
);

Self {
Ok(Self {
scheduler,
_worker_threads,
}
})
}

/// Run the agent
Expand Down
18 changes: 15 additions & 3 deletions feo/src/signalling/direct/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ where

all_activities: Vec<ActivityId>,
all_recorders: Vec<AgentId>,
connection_timeout: Duration,
}

impl<L> SchedulerConnector<L>
Expand All @@ -57,6 +58,7 @@ where
server: SocketServer<L>,
activity_ids: impl IntoIterator<Item = ActivityId>,
recorder_ids: impl IntoIterator<Item = AgentId>,
connection_timeout: Duration,
) -> Self {
let events = Events::with_capacity(32);

Expand All @@ -73,6 +75,7 @@ where
recorder_id_token_map,
all_activities,
all_recorders,
connection_timeout,
}
}
}
Expand All @@ -83,9 +86,10 @@ impl TcpSchedulerConnector {
bind_address: SocketAddr,
activity_ids: impl IntoIterator<Item = ActivityId>,
recorder_ids: impl IntoIterator<Item = AgentId>,
connection_timeout: Duration,
) -> Self {
let tcp_server = TcpServer::new(bind_address);
Self::new_with_server(tcp_server, activity_ids, recorder_ids)
Self::new_with_server(tcp_server, activity_ids, recorder_ids, connection_timeout)
}
}

Expand All @@ -95,9 +99,10 @@ impl UnixSchedulerConnector {
path: &Path,
activity_ids: impl IntoIterator<Item = ActivityId>,
recorder_ids: impl IntoIterator<Item = AgentId>,
connection_timeout: Duration,
) -> Self {
let unix_server = UnixServer::new(path);
Self::new_with_server(unix_server, activity_ids, recorder_ids)
Self::new_with_server(unix_server, activity_ids, recorder_ids, connection_timeout)
}
}

Expand All @@ -109,11 +114,18 @@ where
let mut missing_activities: HashSet<ActivityId> =
self.all_activities.iter().cloned().collect();
let mut missing_recorders: HashSet<AgentId> = self.all_recorders.iter().cloned().collect();
let start_time = std::time::Instant::now();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a use-statement for Instant from feo_time and just use Instant::now() here. (As you already did in relayed/connectors/relays.rs.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! Updated this to match the implementation in relayed/connectors/relays.rs using Instant::now().


while !missing_activities.is_empty() || !missing_recorders.is_empty() {
let elapsed = start_time.elapsed();
if elapsed >= self.connection_timeout {
return Err(Error::Io((std::io::ErrorKind::TimedOut.into(), "CONNECTION_TIMEOUT")));
}
let remaining_timeout = self.connection_timeout.saturating_sub(elapsed);
// Wait for a new connection, but no longer than the remaining overall timeout.
if let Some((token, signal)) = self
.server
.receive(&mut self.events, Duration::from_secs(1))
.receive(&mut self.events, remaining_timeout)
{
match signal {
ProtocolSignal::ActivityHello(activity_id) => {
Expand Down
Loading
Loading