Skip to content
10 changes: 9 additions & 1 deletion src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,15 @@ configure_file(nuclear.in ${PROJECT_BINARY_DIR}/nuclear)

# Build the library
find_package(Threads REQUIRED)
file(GLOB_RECURSE src "*.c" "*.cpp" "*.hpp" "*.ipp")
file(
GLOB_RECURSE
src
CONFIGURE_DEPENDS
"*.c"
"*.cpp"
"*.hpp"
"*.ipp"
)
add_library(nuclear STATIC ${src})
add_library(NUClear::nuclear ALIAS nuclear)

Expand Down
52 changes: 18 additions & 34 deletions src/extension/network/NUClearNetwork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

#include <algorithm>
#include <array>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <cstring>
Expand Down Expand Up @@ -501,7 +500,7 @@ namespace extension {
if (ptr) {

auto now = std::chrono::steady_clock::now();
auto timeout = it->last_send + ptr->round_trip_time;
auto timeout = it->last_send + ptr->rtt.timeout();

// Check if we should have expected an ack by now for some packets
if (timeout < now) {
Expand All @@ -510,7 +509,7 @@ namespace extension {
it->last_send = now;

// The next time we should check for a timeout
auto next_timeout = now + ptr->round_trip_time;
auto next_timeout = now + ptr->rtt.timeout();
if (next_timeout < next_event) {
next_event = next_timeout;
next_event_callback(next_event);
Expand Down Expand Up @@ -673,18 +672,11 @@ namespace extension {
remote->last_update = std::chrono::steady_clock::now();

// Check if this packet is a retransmission of data
if (header.type == DATA_RETRANSMISSION) {
if (header.type == DATA_RETRANSMISSION
&& remote->deduplicator.is_duplicate(packet.packet_id)) {

// See if we recently processed this packet
// NOLINTNEXTLINE(readability-qualified-auto) MSVC disagrees
auto it = std::find(remote->recent_packets.begin(),
remote->recent_packets.end(),
packet.packet_id);

// We recently processed this packet, this is just a failed ack
// Send the ack again if it was reliable
if (it != remote->recent_packets.end() && packet.reliable) {

if (packet.reliable) {
// Allocate room for the whole ack packet
std::vector<uint8_t> r(sizeof(ACKPacket) + (packet.packet_count / 8), 0);
ACKPacket& response = *reinterpret_cast<ACKPacket*>(r.data());
Expand All @@ -708,10 +700,10 @@ namespace extension {
0,
&to.sock,
to.size());

// We don't need to process this packet we already did
return;
}

// We don't need to process this packet we already did
return;
}

// If this is a solo packet (in a single chunk)
Expand Down Expand Up @@ -739,13 +731,11 @@ namespace extension {
0,
&to.sock,
to.size());

// Set this packet to have been recently received
remote->recent_packets[remote->recent_packets_index
.fetch_add(1, std::memory_order_relaxed)] =
packet.packet_id;
}

// Add the packet to our deduplicator
remote->deduplicator.add_packet(packet.packet_id);

packet_callback(*remote, packet.hash, packet.reliable, std::move(out));
}
else {
Expand Down Expand Up @@ -851,25 +841,20 @@ namespace extension {
&part.data + p.second.size() - sizeof(DataPacket) + 1);
}

// Add the packet to our deduplicator
remote->deduplicator.add_packet(packet.packet_id);

// Send our assembled data packet
packet_callback(*remote, packet.hash, packet.reliable, std::move(out));

// If the packet was reliable add that it was recently received
if (packet.reliable) {
// Set this packet to have been recently received
remote->recent_packets[remote->recent_packets_index
.fetch_add(1, std::memory_order_relaxed)] =
packet.packet_id;
}

// We have completed this packet, discard the data
assemblers.erase(assemblers.find(packet.packet_id));
}

// Check for and delete any timed out packets
for (auto it = assemblers.begin(); it != assemblers.end();) {
const auto now = std::chrono::steady_clock::now();
const auto timeout = remote->round_trip_time * 10.0;
const auto timeout = remote->rtt.timeout() * 10.0;
const auto& last_chunk_time = it->second.first;

it = now > last_chunk_time + timeout ? assemblers.erase(it) : std::next(it);
Expand Down Expand Up @@ -919,8 +904,7 @@ namespace extension {

// Approximate how long the round trip is to this remote so we can work out how
// long before retransmitting
// We use a baby kalman filter to help smooth out jitter
remote->measure_round_trip(round_trip);
remote->rtt.measure(round_trip);

// Update our acks
bool all_acked = true;
Expand Down Expand Up @@ -987,7 +971,7 @@ namespace extension {
s->last_send = std::chrono::steady_clock::now();

// The next time we should check for a timeout
auto next_timeout = s->last_send + remote->round_trip_time;
auto next_timeout = s->last_send + remote->rtt.timeout();
if (next_timeout < next_event) {
next_event = next_timeout;
next_event_callback(next_event);
Expand Down Expand Up @@ -1108,7 +1092,7 @@ namespace extension {
queue.targets.emplace_back(it->second, acks);

// The next time we should check for a timeout
auto next_timeout = std::chrono::steady_clock::now() + it->second->round_trip_time;
auto next_timeout = std::chrono::steady_clock::now() + it->second->rtt.timeout();
if (next_timeout < next_event) {
next_event = next_timeout;
next_event_callback(next_event);
Expand Down
50 changes: 7 additions & 43 deletions src/extension/network/NUClearNetwork.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@

#include "../../util/network/sock_t.hpp"
#include "../../util/platform.hpp"
#include "PacketDeduplicator.hpp"
#include "RTTEstimator.hpp"
#include "wire_protocol.hpp"

namespace NUClear {
Expand All @@ -56,64 +58,26 @@ namespace extension {
std::string name,
const sock_t& target,
const std::chrono::steady_clock::time_point& last_update = std::chrono::steady_clock::now())
: name(std::move(name)), target(target), last_update(last_update) {

// Set our recent packets to an invalid value
recent_packets.fill(-1);
}
: name(std::move(name)), target(target), last_update(last_update) {}

/// The name of the remote target
std::string name;
/// The socket address for the remote target
sock_t target{};
/// When we last received data from the remote target
std::chrono::steady_clock::time_point last_update;
/// A list of the last n packet groups to be received
std::array<int, std::numeric_limits<uint8_t>::max()> recent_packets{};
/// An index for the recent_packets (circular buffer)
std::atomic<uint8_t> recent_packets_index{0};
/// Mutex to protect the fragmented packet storage
std::mutex assemblers_mutex;
/// Storage for fragmented packets while we build them
std::map<uint16_t,
std::pair<std::chrono::steady_clock::time_point, std::map<uint16_t, std::vector<uint8_t>>>>
assemblers;

/// Struct storing the kalman filter for round trip time
struct RoundTripKF {
float process_noise = 1e-6f;
float measurement_noise = 1e-1f;
float variance = 1.0f;
float mean = 1.0f;
};
/// A little kalman filter for estimating round trip time
RoundTripKF round_trip_kf{};

std::chrono::steady_clock::duration round_trip_time{std::chrono::seconds(1)};

void measure_round_trip(std::chrono::steady_clock::duration time) {

// Make our measurement into a float seconds type
const std::chrono::duration<float> m =
std::chrono::duration_cast<std::chrono::duration<float>>(time);

// Alias variables
const auto& Q = round_trip_kf.process_noise;
const auto& R = round_trip_kf.measurement_noise;
auto& P = round_trip_kf.variance;
auto& X = round_trip_kf.mean;

// Calculate our kalman gain
const float K = (P + Q) / (P + Q + R);

// Do filter
P = R * (P + Q) / (R + P + Q);
X = X + (m.count() - X) * K;
/// RTT estimator for this network target
RTTEstimator rtt;

// Put result into our variable
round_trip_time = std::chrono::duration_cast<std::chrono::steady_clock::duration>(
std::chrono::duration<float>(X));
}
/// Packet deduplicator for this network target
PacketDeduplicator deduplicator;
};

NUClearNetwork() = default;
Expand Down
76 changes: 76 additions & 0 deletions src/extension/network/PacketDeduplicator.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* MIT License
*
* Copyright (c) 2025 NUClear Contributors
*
* This file is part of the NUClear codebase.
* See https://github.com/Fastcode/NUClear for further info.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of the
* Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
* WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
* OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include "PacketDeduplicator.hpp"

#include <cstdint>

namespace NUClear {
namespace extension {
namespace network {


bool PacketDeduplicator::is_duplicate(uint16_t packet_id) const {
// If we haven't seen any packets yet, nothing is a duplicate
if (!initialized) {
return false;
}

// Calculate relative position in window using unsigned subtraction
const uint16_t relative_id = newest_seen - packet_id;

// If the packet is too old or too new, it's not a duplicate
if (relative_id >= 256) {
return false;
}

return window[relative_id];
}

void PacketDeduplicator::add_packet(uint16_t packet_id) {
// If this is our first packet, just set it as newest_seen
if (!initialized) {
newest_seen = packet_id;
window[0] = true;
initialized = true;
return;
}

// Calculate relative position in window using unsigned subtraction
const uint16_t relative_id = newest_seen - packet_id;

// If the distance is more than half the range, the packet is newer than our newest_seen
if (relative_id > 32768) {
// Calculate how far to shift to make this packet our newest
const uint16_t shift_amount = packet_id - newest_seen;
window <<= shift_amount;
newest_seen = packet_id;
window[0] = true;
}
// Packet is recent enough to be counted
else if (relative_id < 256) {
window[relative_id] = true;
}
}

} // namespace network
} // namespace extension
} // namespace NUClear
67 changes: 67 additions & 0 deletions src/extension/network/PacketDeduplicator.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* MIT License
*
* Copyright (c) 2025 NUClear Contributors
*
* This file is part of the NUClear codebase.
* See https://github.com/Fastcode/NUClear for further info.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of the
* Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
* WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
* OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef NUCLEAR_EXTENSION_NETWORK_PACKET_DEDUPLICATOR_HPP
#define NUCLEAR_EXTENSION_NETWORK_PACKET_DEDUPLICATOR_HPP

#include <bitset>
#include <cstdint>

namespace NUClear {
namespace extension {
namespace network {

/**
* A class that implements a sliding window bitset for packet deduplication.
* Maintains a 256-bit window of recently seen packet IDs, sliding forward as new packets are added.
*/
class PacketDeduplicator {
public:
/**
* Check if a packet ID has been seen recently
*
* @param packet_id The packet ID to check
*
* @return true if the packet has been seen recently, false otherwise
*/
bool is_duplicate(uint16_t packet_id) const;

/**
* Add a packet ID to the window
*
* @param packet_id The packet ID to add
*/
void add_packet(uint16_t packet_id);

private:
/// Whether we've seen any packets yet
bool initialized{false};
/// The newest packet ID we've seen
uint16_t newest_seen{0};
/// The 256-bit window of seen packets (newest at 0, older at higher indices)
std::bitset<256> window;
};

} // namespace network
} // namespace extension
} // namespace NUClear

#endif // NUCLEAR_EXTENSION_NETWORK_PACKET_DEDUPLICATOR_HPP
Loading
Loading