Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion crates/core/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::{

use crate::{
client_events::{ClientId, HostResult},
node::PeerId,
node::{proximity_cache::ProximityCacheMessage, PeerId},
operations::{
connect::ConnectMsg, get::GetMsg, put::PutMsg, subscribe::SubscribeMsg, update::UpdateMsg,
},
Expand Down Expand Up @@ -255,6 +255,10 @@ pub(crate) enum NetMessageV1 {
},
Update(UpdateMsg),
Aborted(Transaction),
ProximityCache {
from: PeerId,
message: ProximityCacheMessage,
},
}

trait Versioned {
Expand All @@ -279,6 +283,7 @@ impl Versioned for NetMessageV1 {
NetMessageV1::Unsubscribed { .. } => semver::Version::new(1, 0, 0),
NetMessageV1::Update(_) => semver::Version::new(1, 0, 0),
NetMessageV1::Aborted(_) => semver::Version::new(1, 0, 0),
NetMessageV1::ProximityCache { .. } => semver::Version::new(1, 0, 0),
}
}
}
Expand Down Expand Up @@ -339,6 +344,11 @@ pub(crate) enum NodeEvent {
target: PeerId,
msg: Box<NetMessage>,
},
#[allow(dead_code)] // Reserved for future proximity cache broadcasting
BroadcastProximityCache {
from: PeerId,
message: crate::node::proximity_cache::ProximityCacheMessage,
},
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -418,6 +428,12 @@ impl Display for NodeEvent {
NodeEvent::SendMessage { target, msg } => {
write!(f, "SendMessage (to {target}, tx: {})", msg.id())
}
NodeEvent::BroadcastProximityCache { from, message } => {
write!(
f,
"BroadcastProximityCache (from {from}, message: {message:?})"
)
}
}
}
}
Expand Down Expand Up @@ -452,6 +468,7 @@ impl MessageStats for NetMessageV1 {
NetMessageV1::Update(op) => op.id(),
NetMessageV1::Aborted(tx) => tx,
NetMessageV1::Unsubscribed { transaction, .. } => transaction,
NetMessageV1::ProximityCache { .. } => Transaction::NULL,
}
}

Expand All @@ -464,6 +481,7 @@ impl MessageStats for NetMessageV1 {
NetMessageV1::Update(op) => op.target().as_ref().map(|b| b.borrow().clone()),
NetMessageV1::Aborted(_) => None,
NetMessageV1::Unsubscribed { .. } => None,
NetMessageV1::ProximityCache { .. } => None,
}
}

Expand All @@ -476,6 +494,7 @@ impl MessageStats for NetMessageV1 {
NetMessageV1::Update(op) => op.requested_location(),
NetMessageV1::Aborted(_) => None,
NetMessageV1::Unsubscribed { .. } => None,
NetMessageV1::ProximityCache { .. } => None,
}
}
}
Expand All @@ -495,6 +514,9 @@ impl Display for NetMessage {
Unsubscribed { key, from, .. } => {
write!(f, "Unsubscribed {{ key: {key}, from: {from} }}")?;
}
ProximityCache { from, message } => {
write!(f, "ProximityCache {{ from: {from}, message: {message:?} }}")?;
}
},
};
write!(f, "}}")
Expand Down
41 changes: 41 additions & 0 deletions crates/core/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ mod message_processor;
mod network_bridge;
mod op_state_manager;
mod p2p_impl;
pub(crate) mod proximity_cache;
mod request_router;
pub(crate) mod testing_impl;

Expand Down Expand Up @@ -826,6 +827,26 @@ async fn process_message_v1<CB>(
op_manager.ring.remove_subscriber(key, from);
break;
}
NetMessageV1::ProximityCache { from, ref message } => {
tracing::debug!(?from, "Processing proximity cache message");

// Handle the proximity cache message
if let Some(response) = op_manager
.proximity_cache
.handle_message(from.clone(), message.clone())
.await
{
// Send response directly back to the sender
let response_msg = NetMessage::V1(NetMessageV1::ProximityCache {
from: op_manager.ring.connection_manager.get_peer_key().unwrap(),
message: response,
});
if let Err(err) = conn_manager.send(&from, response_msg).await {
tracing::error!(%err, ?from, "Failed to send proximity cache response");
}
}
break;
}
_ => break, // Exit the loop if no applicable message type is found
}
}
Expand Down Expand Up @@ -1047,6 +1068,26 @@ where
op_manager.ring.remove_subscriber(key, from);
break;
}
NetMessageV1::ProximityCache { from, ref message } => {
tracing::debug!(?from, "Processing proximity cache message (pure network)");

// Handle the proximity cache message
if let Some(response) = op_manager
.proximity_cache
.handle_message(from.clone(), message.clone())
.await
{
// Send response directly back to the sender
let response_msg = NetMessage::V1(NetMessageV1::ProximityCache {
from: op_manager.ring.connection_manager.get_peer_key().unwrap(),
message: response,
});
if let Err(err) = conn_manager.send(&from, response_msg).await {
tracing::error!(%err, ?from, "Failed to send proximity cache response");
}
}
break;
}
_ => break, // Exit the loop if no applicable message type is found
}
}
Expand Down
115 changes: 89 additions & 26 deletions crates/core/src/node/network_bridge/p2p_protoc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,39 @@ impl P2pConnManager {
peer = %ctx.bridge.op_manager.ring.connection_manager.get_peer_key().unwrap(),
"Received inbound message from peer - processing"
);

// Handle ProximityCache messages directly before normal processing
if let crate::message::NetMessage::V1(
crate::message::NetMessageV1::ProximityCache { from, message },
) = &msg
{
tracing::info!(?from, ?message, "Processing ProximityCache message directly in InboundMessage handler");

// Handle the proximity cache message
if let Some(response) = op_manager
.proximity_cache
.handle_message(from.clone(), message.clone())
.await
{
// Send response directly back to the sender
let response_msg = crate::message::NetMessage::V1(
crate::message::NetMessageV1::ProximityCache {
from: op_manager
.ring
.connection_manager
.get_peer_key()
.unwrap(),
message: response,
},
);
if let Err(err) = ctx.bridge.send(from, response_msg).await {
tracing::error!(%err, ?from, "Failed to send ProximityCache response");
}
}
// ProximityCache processed, skip normal message handling
continue;
}

ctx.handle_inbound_message(
msg,
&outbound_message,
Expand All @@ -301,17 +334,16 @@ impl P2pConnManager {
)
.await?;
}
ConnEvent::OutboundMessage(NetMessage::V1(NetMessageV1::Aborted(tx))) => {
ConnEvent::OutboundMessage {
target,
msg: NetMessage::V1(NetMessageV1::Aborted(tx)),
} => {
// TODO: handle aborted transaction as internal message
tracing::error!(%tx, "Aborted transaction");
tracing::error!(%tx, target_peer = %target, "Aborted transaction");
}
ConnEvent::OutboundMessage(msg) => {
let Some(target_peer) = msg.target() else {
let id = *msg.id();
tracing::error!(%id, %msg, "Target peer not set, must be set for connection outbound message");
ctx.bridge.op_manager.completed(id);
continue;
};
ConnEvent::OutboundMessage { target, msg } => {
// target is the PeerId from the event - this is the authoritative target
// msg.target() may be None (for ProximityCache) or Some (for other messages)

// Check if message targets self - if so, process locally instead of sending over network
let self_peer_id = ctx
Expand All @@ -321,11 +353,11 @@ impl P2pConnManager {
.connection_manager
.get_peer_key()
.unwrap();
if target_peer.peer == self_peer_id {
if target == self_peer_id {
tracing::error!(
tx = %msg.id(),
msg_type = %msg,
target_peer = %target_peer,
target_peer = %target,
self_peer = %self_peer_id,
"BUG: OutboundMessage targets self! This indicates a routing logic error - messages should not reach OutboundMessage handler if they target self"
);
Expand All @@ -343,17 +375,18 @@ impl P2pConnManager {
tracing::info!(
tx = %msg.id(),
msg_type = %msg,
target_peer = %target_peer,
target_peer = %target,
msg_target = ?msg.target(),
"Sending outbound message to peer"
);
// IMPORTANT: Use a single get() call to avoid TOCTOU race
// between contains_key() and get(). The connection can be
// removed by another task between those two calls.
let peer_connection = ctx.connections.get(&target_peer.peer);
let peer_connection = ctx.connections.get(&target);
tracing::debug!(
tx = %msg.id(),
self_peer = %ctx.bridge.op_manager.ring.connection_manager.pub_key,
target = %target_peer.peer,
target = %target,
conn_map_size = ctx.connections.len(),
has_connection = peer_connection.is_some(),
"[CONN_TRACK] LOOKUP: Checking for existing connection in HashMap"
Expand All @@ -368,15 +401,15 @@ impl P2pConnManager {
} else {
tracing::info!(
tx = %msg.id(),
target_peer = %target_peer,
target_peer = %target,
"Message successfully sent to peer connection"
);
}
}
None => {
tracing::warn!(
id = %msg.id(),
target = %target_peer.peer,
target = %target,
"No existing outbound connection, establishing connection first"
);

Expand All @@ -388,7 +421,7 @@ impl P2pConnManager {
ctx.bridge
.ev_listener_tx
.send(Right(NodeEvent::ConnectPeer {
peer: target_peer.peer.clone(),
peer: target.clone(),
tx,
callback,
is_gw: false,
Expand All @@ -401,11 +434,11 @@ impl P2pConnManager {
// Connection established, try sending again
// IMPORTANT: Use single get() call to avoid TOCTOU race
let peer_connection_retry =
ctx.connections.get(&target_peer.peer);
ctx.connections.get(&target);
tracing::debug!(
tx = %msg.id(),
self_peer = %ctx.bridge.op_manager.ring.connection_manager.pub_key,
target = %target_peer.peer,
target = %target,
conn_map_size = ctx.connections.len(),
has_connection = peer_connection_retry.is_some(),
"[CONN_TRACK] LOOKUP: Retry after connection established - checking for connection in HashMap"
Expand All @@ -419,22 +452,22 @@ impl P2pConnManager {
} else {
tracing::error!(
tx = %tx,
target = %target_peer.peer,
target = %target,
"Connection established successfully but not found in HashMap - possible race condition"
);
}
}
Ok(Some(Err(e))) => {
tracing::error!(
"Failed to establish connection to {}: {:?}",
target_peer.peer,
target,
e
);
}
Ok(None) | Err(_) => {
tracing::error!(
"Timeout or error establishing connection to {}",
target_peer.peer
target
);
}
}
Expand Down Expand Up @@ -815,6 +848,30 @@ impl P2pConnManager {
Err(e) => tracing::error!("Failed to send local subscribe response to result router: {}", e),
}
}
NodeEvent::BroadcastProximityCache { from, message } => {
// Broadcast ProximityCache message to all connected peers
tracing::debug!(
%from,
?message,
peer_count = ctx.connections.len(),
"Broadcasting ProximityCache message to connected peers"
);

use crate::message::{NetMessage, NetMessageV1};
let msg = NetMessage::V1(NetMessageV1::ProximityCache {
from: from.clone(),
message: message.clone(),
});

for peer in ctx.connections.keys() {
if peer != &from {
tracing::debug!(%peer, "Sending ProximityCache to peer");
if let Err(e) = ctx.bridge.send(peer, msg.clone()).await {
tracing::warn!(%peer, "Failed to send ProximityCache: {}", e);
}
}
}
}
NodeEvent::Disconnect { cause } => {
tracing::info!(
"Disconnecting from network{}",
Expand Down Expand Up @@ -1368,7 +1425,13 @@ impl P2pConnManager {
target_peer = %target,
"handle_notification_msg: Message has target peer, routing as OutboundMessage"
);
return EventResult::Event(ConnEvent::OutboundMessage(msg).into());
return EventResult::Event(
ConnEvent::OutboundMessage {
target: target.peer,
msg,
}
.into(),
);
}
}

Expand Down Expand Up @@ -1408,8 +1471,8 @@ impl P2pConnManager {

fn handle_bridge_msg(&self, msg: Option<P2pBridgeEvent>) -> EventResult {
match msg {
Some(Left((_target, msg))) => {
EventResult::Event(ConnEvent::OutboundMessage(*msg).into())
Some(Left((target, msg))) => {
EventResult::Event(ConnEvent::OutboundMessage { target, msg: *msg }.into())
}
Some(Right(action)) => EventResult::Event(ConnEvent::NodeAction(action).into()),
None => EventResult::Event(ConnEvent::ClosedChannel(ChannelCloseReason::Bridge).into()),
Expand Down Expand Up @@ -1551,7 +1614,7 @@ enum EventResult {
#[derive(Debug)]
pub(super) enum ConnEvent {
InboundMessage(NetMessage),
OutboundMessage(NetMessage),
OutboundMessage { target: PeerId, msg: NetMessage },
NodeAction(NodeEvent),
ClosedChannel(ChannelCloseReason),
}
Expand Down
Loading
Loading