Skip to content

Commit 618b472

Browse files
Make tests event-based by injecting time into Discovery, Reliability, and Fragmentation
Add an optional 'now' parameter (defaulting to steady_clock::now()) to all time-dependent methods in Discovery, Reliability, and Fragmentation. This allows tests to advance time deterministically without sleeping, making them faster, more reliable, and immune to CI timing variability. Tests converted from time-based (sleep_for) to event-based: - Discovery: check_timeouts and touch_peer tests - Reliability: all retransmission/timeout tests (7 sleeps removed) - Fragmentation: cleanup_expired test The only remaining sleep_for is in Integration.cpp's polling loop for genuine async UDP networking, which is inherently time-dependent.
1 parent 1df030f commit 618b472

9 files changed

Lines changed: 108 additions & 77 deletions

File tree

src/nuclearnet/Discovery.cpp

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,10 @@
8686
return packet;
8787
}
8888

89-
void Discovery::process_announce(const sock_t& source, const uint8_t* data, std::size_t length) {
89+
void Discovery::process_announce(const sock_t& source,
90+
const uint8_t* data,
91+
std::size_t length,
92+
std::chrono::steady_clock::time_point now) {
9093
// Minimum size: header(5) + name_length(2) + num_subscriptions(2) = 9
9194
if (length < sizeof(PacketHeader) + sizeof(uint16_t) + sizeof(uint16_t)) {
9295
return;
@@ -148,14 +151,14 @@
148151
PeerInfo info;
149152
info.name = name;
150153
info.address = source;
151-
info.last_seen = std::chrono::steady_clock::now();
154+
info.last_seen = now;
152155
info.subscriptions = std::move(subscriptions);
153156
peers.emplace(source, std::move(info));
154157
is_new = true;
155158
}
156159
else {
157160
// Existing peer — update last_seen and check for subscription changes
158-
it->second.last_seen = std::chrono::steady_clock::now();
161+
it->second.last_seen = now;
159162
if (it->second.subscriptions != subscriptions) {
160163
it->second.subscriptions = std::move(subscriptions);
161164
subs_changed = true;
@@ -198,16 +201,15 @@
198201
}
199202
}
200203

201-
void Discovery::touch_peer(const sock_t& source) {
204+
void Discovery::touch_peer(const sock_t& source, std::chrono::steady_clock::time_point now) {
202205
const std::lock_guard<std::mutex> lock(peers_mutex);
203206
auto it = peers.find(source);
204207
if (it != peers.end()) {
205-
it->second.last_seen = std::chrono::steady_clock::now();
208+
it->second.last_seen = now;
206209
}
207210
}
208211

209-
std::vector<PeerInfo> Discovery::check_timeouts() {
210-
auto now = std::chrono::steady_clock::now();
212+
std::vector<PeerInfo> Discovery::check_timeouts(std::chrono::steady_clock::time_point now) {
211213
std::vector<PeerInfo> removed;
212214

213215
{

src/nuclearnet/Discovery.hpp

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -119,8 +119,12 @@ namespace network {
119119
* @param source The UDP source address (IP + port) of the packet
120120
* @param data The raw packet data
121121
* @param length The length of the packet data
122+
* @param now The current time (defaults to steady_clock::now())
122123
*/
123-
void process_announce(const sock_t& source, const uint8_t* data, std::size_t length);
124+
void process_announce(const sock_t& source,
125+
const uint8_t* data,
126+
std::size_t length,
127+
std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now());
124128

125129
/**
126130
* Process a received leave packet from a peer.
@@ -133,15 +137,20 @@ namespace network {
133137
* Update the last_seen timestamp for a peer (called on any received packet).
134138
*
135139
* @param source The UDP source address
140+
* @param now The current time (defaults to steady_clock::now())
136141
*/
137-
void touch_peer(const sock_t& source);
142+
void touch_peer(const sock_t& source,
143+
std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now());
138144

139145
/**
140146
* Check for peers that have timed out and remove them.
141147
*
148+
* @param now The current time (defaults to steady_clock::now())
149+
*
142150
* @return List of peers that were removed due to timeout
143151
*/
144-
std::vector<PeerInfo> check_timeouts();
152+
std::vector<PeerInfo> check_timeouts(
153+
std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now());
145154

146155
/**
147156
* Get the current list of known peers.

src/nuclearnet/Fragmentation.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,8 @@ namespace network {
7171
uint8_t flags,
7272
const uint8_t* data,
7373
std::size_t data_length,
74-
AssembledPacket& out_packet) {
74+
AssembledPacket& out_packet,
75+
std::chrono::steady_clock::time_point now) {
7576
if (packet_count == 0 || packet_no >= packet_count) {
7677
return false;
7778
}
@@ -92,7 +93,7 @@ namespace network {
9293
assembly.hash = hash;
9394
assembly.flags = flags;
9495
assembly.packet_count = packet_count;
95-
assembly.last_update = std::chrono::steady_clock::now();
96+
assembly.last_update = now;
9697

9798
// Store this fragment
9899
assembly.fragments[packet_no].assign(data, data + data_length);
@@ -128,8 +129,7 @@ namespace network {
128129
return false;
129130
}
130131

131-
std::size_t Fragmentation::cleanup_expired() {
132-
auto now = std::chrono::steady_clock::now();
132+
std::size_t Fragmentation::cleanup_expired(std::chrono::steady_clock::time_point now) {
133133
std::size_t removed = 0;
134134

135135
const std::lock_guard<std::mutex> lock(assembly_mutex);

src/nuclearnet/Fragmentation.hpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ namespace network {
100100
* @param data The fragment payload
101101
*
102102
* @param out_packet Filled with the assembled packet when reassembly completes
103+
* @param now The current time (defaults to steady_clock::now())
103104
*
104105
* @return true if all fragments are now received and @p out_packet is valid, false otherwise
105106
*/
@@ -111,14 +112,18 @@ namespace network {
111112
uint8_t flags,
112113
const uint8_t* data,
113114
std::size_t data_length,
114-
AssembledPacket& out_packet);
115+
AssembledPacket& out_packet,
116+
std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now());
115117

116118
/**
117119
* Clean up assemblies that have timed out.
118120
*
121+
* @param now The current time (defaults to steady_clock::now())
122+
*
119123
* @return Number of assemblies that were discarded
120124
*/
121-
std::size_t cleanup_expired();
125+
std::size_t cleanup_expired(
126+
std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now());
122127

123128
/**
124129
* Get the packet MTU (max payload per fragment).

src/nuclearnet/Reliability.cpp

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@
4040
uint64_t hash,
4141
uint8_t flags,
4242
const uint8_t* payload,
43-
std::size_t payload_len) {
43+
std::size_t payload_len,
44+
std::chrono::steady_clock::time_point now) {
4445
TrackedPacket tp;
4546
tp.target = target;
4647
tp.packet_id = packet_id;
@@ -49,7 +50,7 @@
4950
tp.flags = flags;
5051
tp.payload.assign(payload, payload + payload_len);
5152
tp.acked.resize(packet_count, false);
52-
tp.last_send = std::chrono::steady_clock::now();
53+
tp.last_send = now;
5354

5455
TrackingKey key{target, packet_id};
5556

@@ -61,7 +62,8 @@
6162
uint16_t packet_id,
6263
uint16_t packet_count,
6364
const uint8_t* ack_bitset,
64-
std::size_t bitset_size) {
65+
std::size_t bitset_size,
66+
std::chrono::steady_clock::time_point now) {
6567
TrackingKey key{source, packet_id};
6668

6769
const std::lock_guard<std::mutex> lock(tracking_mutex);
@@ -73,7 +75,6 @@
7375
auto& tp = it->second;
7476

7577
// Update RTT estimate based on time since last send
76-
auto now = std::chrono::steady_clock::now();
7778
auto rtt = now - tp.last_send;
7879
{
7980
const std::lock_guard<std::mutex> rtt_lock(rtt_mutex);
@@ -165,8 +166,9 @@
165166
return packet;
166167
}
167168

168-
std::vector<Reliability::RetransmitRequest> Reliability::check_retransmissions(uint16_t packet_mtu) {
169-
auto now = std::chrono::steady_clock::now();
169+
std::vector<Reliability::RetransmitRequest> Reliability::check_retransmissions(
170+
uint16_t packet_mtu,
171+
std::chrono::steady_clock::time_point now) {
170172
std::vector<RetransmitRequest> retransmissions;
171173
std::vector<TrackingKey> expired;
172174

src/nuclearnet/Reliability.hpp

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,14 +78,16 @@ namespace network {
7878
* @param flags Packet flags
7979
* @param payload Pointer to the full original payload (copied internally for retransmission)
8080
* @param payload_len Length of the payload in bytes
81+
* @param now The current time (defaults to steady_clock::now())
8182
*/
8283
void track_packet(const sock_t& target,
8384
uint16_t packet_id,
8485
uint16_t packet_count,
8586
uint64_t hash,
8687
uint8_t flags,
8788
const uint8_t* payload,
88-
std::size_t payload_len);
89+
std::size_t payload_len,
90+
std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now());
8991

9092
/**
9193
* Process a received ACK packet.
@@ -95,12 +97,14 @@ namespace network {
9597
* @param packet_count Total fragments in the group
9698
* @param ack_bitset Bitset of received fragments (1 bit per fragment, LSB first)
9799
* @param bitset_size Size of the ack_bitset in bytes
100+
* @param now The current time (defaults to steady_clock::now())
98101
*/
99102
void process_ack(const sock_t& source,
100103
uint16_t packet_id,
101104
uint16_t packet_count,
102105
const uint8_t* ack_bitset,
103-
std::size_t bitset_size);
106+
std::size_t bitset_size,
107+
std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now());
104108

105109
/**
106110
* Process a received NACK packet.
@@ -147,10 +151,13 @@ namespace network {
147151
* Check for packets that need retransmission and return them.
148152
*
149153
* @param packet_mtu The MTU to use for fragmenting retransmissions
154+
* @param now The current time (defaults to steady_clock::now())
150155
*
151156
* @return List of fragments that need to be retransmitted
152157
*/
153-
std::vector<RetransmitRequest> check_retransmissions(uint16_t packet_mtu);
158+
std::vector<RetransmitRequest> check_retransmissions(
159+
uint16_t packet_mtu,
160+
std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now());
154161

155162
/**
156163
* Remove all tracking for a given peer (e.g., on disconnect).

tests/tests/nuclearnet/Discovery.cpp

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424

2525
#include <catch2/catch_test_macros.hpp>
2626
#include <chrono>
27-
#include <thread>
2827
#include <vector>
2928

3029
#include "util/platform.hpp"
@@ -142,13 +141,18 @@ SCENARIO("Discovery check_timeouts removes stale peers", "[nuclearnet][discovery
142141

143142
disc.set_leave_callback([&](const PeerInfo&) { leave_called = true; });
144143

144+
// Add peer at time T
145+
auto t = std::chrono::steady_clock::now();
145146
auto announce = Discovery::build_announce_packet("peer_a", {});
146-
disc.process_announce(peer_addr, announce.data(), announce.size());
147+
disc.process_announce(peer_addr, announce.data(), announce.size(), t);
147148

148-
// Wait for timeout
149-
std::this_thread::sleep_for(std::chrono::milliseconds(30));
149+
// Check at T+10ms (before timeout) — peer should still be there
150+
auto removed = disc.check_timeouts(t + std::chrono::milliseconds(10));
151+
REQUIRE(removed.empty());
152+
REQUIRE(disc.has_peer(peer_addr));
150153

151-
auto removed = disc.check_timeouts();
154+
// Check at T+25ms (after 20ms timeout) — peer should be removed
155+
removed = disc.check_timeouts(t + std::chrono::milliseconds(25));
152156
REQUIRE(removed.size() == 1);
153157
REQUIRE(removed[0].name == "peer_a");
154158
REQUIRE(leave_called);
@@ -160,19 +164,24 @@ SCENARIO("Discovery touch_peer resets timeout", "[nuclearnet][discovery]") {
160164

161165
sock_t peer_addr = make_addr(0x0A000001, 5000);
162166

167+
// Add peer at time T
168+
auto t = std::chrono::steady_clock::now();
163169
auto announce = Discovery::build_announce_packet("peer_a", {});
164-
disc.process_announce(peer_addr, announce.data(), announce.size());
170+
disc.process_announce(peer_addr, announce.data(), announce.size(), t);
165171

166-
// Wait part of the timeout, then touch
167-
std::this_thread::sleep_for(std::chrono::milliseconds(120));
168-
disc.touch_peer(peer_addr);
172+
// Touch at T+120ms (before 200ms timeout expires)
173+
disc.touch_peer(peer_addr, t + std::chrono::milliseconds(120));
169174

170-
// Wait another partial timeout (total would have expired without touch)
171-
std::this_thread::sleep_for(std::chrono::milliseconds(120));
172-
173-
auto removed = disc.check_timeouts();
175+
// Check at T+240ms — 240ms since announce, but only 120ms since touch
176+
// Since timeout is 200ms from last_seen, peer should still be alive
177+
auto removed = disc.check_timeouts(t + std::chrono::milliseconds(240));
174178
REQUIRE(removed.empty());
175179
REQUIRE(disc.has_peer(peer_addr));
180+
181+
// Check at T+325ms — 205ms since touch, should now be timed out
182+
removed = disc.check_timeouts(t + std::chrono::milliseconds(325));
183+
REQUIRE(removed.size() == 1);
184+
REQUIRE(removed[0].name == "peer_a");
176185
}
177186

178187
SCENARIO("Discovery get_peers returns all known peers", "[nuclearnet][discovery]") {

tests/tests/nuclearnet/Fragmentation.cpp

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
#include <chrono>
2727
#include <cstdint>
2828
#include <numeric>
29-
#include <thread>
3029
#include <vector>
3130

3231
using NUClear::network::Fragmentation;
@@ -191,20 +190,22 @@ SCENARIO("Fragmentation cleanup_expired removes stale assemblies", "[nuclearnet]
191190
// Use a very short timeout for testing
192191
Fragmentation frag(100, 64 * 1024 * 1024, std::chrono::milliseconds(1));
193192

194-
// Submit a partial assembly
193+
// Submit a partial assembly at time T
194+
auto t = std::chrono::steady_clock::now();
195195
uint8_t data[50] = {};
196196
Fragmentation::AssembledPacket result;
197-
frag.submit_fragment(1, 1, 0, 3, 0x1234, 0, data, 50, result);
197+
frag.submit_fragment(1, 1, 0, 3, 0x1234, 0, data, 50, result, t);
198198

199-
// Wait for the timeout
200-
std::this_thread::sleep_for(std::chrono::milliseconds(10));
199+
// Cleanup at T (not expired yet) — nothing removed
200+
std::size_t removed = frag.cleanup_expired(t);
201+
REQUIRE(removed == 0);
201202

202-
// Cleanup should remove it
203-
std::size_t removed = frag.cleanup_expired();
203+
// Cleanup at T+10ms (past 1ms timeout) — should remove it
204+
removed = frag.cleanup_expired(t + std::chrono::milliseconds(10));
204205
REQUIRE(removed == 1);
205206

206207
// Second cleanup should find nothing
207-
removed = frag.cleanup_expired();
208+
removed = frag.cleanup_expired(t + std::chrono::milliseconds(20));
208209
REQUIRE(removed == 0);
209210
}
210211

0 commit comments

Comments
 (0)