Skip to content

Commit f9f33d3

Browse files
sanityclaude
andcommitted
feat: implement proximity-based update forwarding
Peers that cache contracts via PUT/GET may not establish proper upstream subscriptions for updates, creating "disconnected subtrees" that never receive updates. This PR fixes that by introducing a proximity cache protocol. ## Problem When a peer calls `seed_contract()` to cache a contract locally, it does NOT automatically establish an upstream subscription in the broadcast tree. Result: a peer can cache a contract but have no path to receive UPDATEs for it. ## Solution Introduce `ProximityCacheManager` that: 1. Tracks which contracts each neighbor has cached 2. Announces cache state changes to neighbors via new `ProximityCache` messages 3. Enhances UPDATE forwarding to include proximity neighbors alongside explicit subscribers The subscription tree (explicit downstream interest) and proximity cache (nearby seeders) remain independent mechanisms, combined only at the broadcast targeting point using HashSet deduplication. ## Changes - Add `ProximityCacheMessage` protocol (CacheAnnounce, CacheStateRequest, CacheStateResponse) - Add `ProximityCacheManager` to track neighbor contract caches - Integrate into OpManager and message routing - Call `announce_contract_cached()` after `seed_contract()` in PUT/GET - Enhance `get_broadcast_targets_update()` to include proximity neighbors Supersedes #2002 (fresh implementation due to 109-commit divergence and PeerId→PeerKeyLocation routing refactor). 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
1 parent 42bfc46 commit f9f33d3

File tree

10 files changed

+528
-12
lines changed

10 files changed

+528
-12
lines changed

crates/core/src/message.rs

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@ use crate::{
1616
},
1717
ring::{Location, PeerKeyLocation},
1818
};
19-
use freenet_stdlib::prelude::{ContractContainer, ContractKey, DelegateKey, WrappedState};
19+
use freenet_stdlib::prelude::{
20+
ContractContainer, ContractInstanceId, ContractKey, DelegateKey, WrappedState,
21+
};
2022
pub(crate) use sealed_msg_type::{TransactionType, TransactionTypeId};
2123
use serde::{Deserialize, Serialize};
2224
use ulid::Ulid;
@@ -333,6 +335,30 @@ pub(crate) enum NetMessageV1 {
333335
},
334336
Update(UpdateMsg),
335337
Aborted(Transaction),
338+
/// Proximity cache protocol message for tracking which neighbors cache which contracts.
339+
ProximityCache {
340+
message: ProximityCacheMessage,
341+
},
342+
}
343+
344+
/// Messages for the proximity cache protocol.
345+
///
346+
/// This protocol allows neighbors to inform each other which contracts they have cached,
347+
/// enabling UPDATE forwarding to seeders who may not be explicitly subscribed.
348+
#[derive(Debug, Clone, Serialize, Deserialize)]
349+
#[allow(clippy::enum_variant_names)]
350+
pub enum ProximityCacheMessage {
351+
/// Announce changes to our cached contracts.
352+
CacheAnnounce {
353+
/// Contracts we've started caching.
354+
added: Vec<ContractInstanceId>,
355+
/// Contracts we've stopped caching.
356+
removed: Vec<ContractInstanceId>,
357+
},
358+
/// Request a neighbor's full cache state (used on new connections).
359+
CacheStateRequest,
360+
/// Response with the neighbor's full cache state.
361+
CacheStateResponse { contracts: Vec<ContractInstanceId> },
336362
}
337363

338364
trait Versioned {
@@ -357,6 +383,7 @@ impl Versioned for NetMessageV1 {
357383
NetMessageV1::Unsubscribed { .. } => semver::Version::new(1, 0, 0),
358384
NetMessageV1::Update(_) => semver::Version::new(1, 0, 0),
359385
NetMessageV1::Aborted(_) => semver::Version::new(1, 0, 0),
386+
NetMessageV1::ProximityCache { .. } => semver::Version::new(1, 0, 0),
360387
}
361388
}
362389
}
@@ -416,6 +443,10 @@ pub(crate) enum NodeEvent {
416443
ExpectPeerConnection {
417444
addr: SocketAddr,
418445
},
446+
/// Broadcast a proximity cache message to all connected peers.
447+
BroadcastProximityCache {
448+
message: ProximityCacheMessage,
449+
},
419450
}
420451

421452
#[derive(Debug, Clone)]
@@ -495,6 +526,9 @@ impl Display for NodeEvent {
495526
NodeEvent::ExpectPeerConnection { addr } => {
496527
write!(f, "ExpectPeerConnection (from {addr})")
497528
}
529+
NodeEvent::BroadcastProximityCache { message } => {
530+
write!(f, "BroadcastProximityCache ({message:?})")
531+
}
498532
}
499533
}
500534
}
@@ -529,6 +563,7 @@ impl MessageStats for NetMessageV1 {
529563
NetMessageV1::Update(op) => op.id(),
530564
NetMessageV1::Aborted(tx) => tx,
531565
NetMessageV1::Unsubscribed { transaction, .. } => transaction,
566+
NetMessageV1::ProximityCache { .. } => Transaction::NULL,
532567
}
533568
}
534569

@@ -541,6 +576,7 @@ impl MessageStats for NetMessageV1 {
541576
NetMessageV1::Update(op) => op.target().as_ref().map(|b| b.borrow().clone()),
542577
NetMessageV1::Aborted(_) => None,
543578
NetMessageV1::Unsubscribed { .. } => None,
579+
NetMessageV1::ProximityCache { .. } => None,
544580
}
545581
}
546582

@@ -553,6 +589,7 @@ impl MessageStats for NetMessageV1 {
553589
NetMessageV1::Update(op) => op.requested_location(),
554590
NetMessageV1::Aborted(_) => None,
555591
NetMessageV1::Unsubscribed { .. } => None,
592+
NetMessageV1::ProximityCache { .. } => None,
556593
}
557594
}
558595
}
@@ -572,6 +609,9 @@ impl Display for NetMessage {
572609
Unsubscribed { key, from, .. } => {
573610
write!(f, "Unsubscribed {{ key: {key}, from: {from} }}")?;
574611
}
612+
ProximityCache { message } => {
613+
write!(f, "ProximityCache {{ {message:?} }}")?;
614+
}
575615
},
576616
};
577617
write!(f, "}}")

crates/core/src/node/mod.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ mod message_processor;
6868
mod network_bridge;
6969
mod op_state_manager;
7070
mod p2p_impl;
71+
pub(crate) mod proximity_cache;
7172
mod request_router;
7273
pub(crate) mod testing_impl;
7374

@@ -864,6 +865,13 @@ async fn process_message_v1<CB>(
864865
op_manager.ring.remove_subscriber(key, &peer_id);
865866
break;
866867
}
868+
NetMessageV1::ProximityCache { .. } => {
869+
// Legacy path doesn't have source_addr - ProximityCache requires connection-based routing
870+
tracing::warn!(
871+
"ProximityCache message received via legacy path (no source address)"
872+
);
873+
break;
874+
}
867875
_ => break, // Exit the loop if no applicable message type is found
868876
}
869877
}
@@ -1105,6 +1113,30 @@ where
11051113
op_manager.ring.remove_subscriber(key, &peer_id);
11061114
break;
11071115
}
1116+
NetMessageV1::ProximityCache { ref message } => {
1117+
let Some(source) = source_addr else {
1118+
tracing::warn!(
1119+
"Received ProximityCache message without source address (pure network)"
1120+
);
1121+
break;
1122+
};
1123+
tracing::debug!(
1124+
from = %source,
1125+
"Processing ProximityCache message (pure network)"
1126+
);
1127+
if let Some(response) = op_manager
1128+
.proximity_cache
1129+
.handle_message(source, message.clone())
1130+
{
1131+
// Send response back to sender
1132+
let response_msg =
1133+
NetMessage::V1(NetMessageV1::ProximityCache { message: response });
1134+
if let Err(err) = conn_manager.send(source, response_msg).await {
1135+
tracing::error!(%err, %source, "Failed to send ProximityCache response");
1136+
}
1137+
}
1138+
break;
1139+
}
11081140
_ => break, // Exit the loop if no applicable message type is found
11091141
}
11101142
}

crates/core/src/node/network_bridge/p2p_protoc.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1271,6 +1271,27 @@ impl P2pConnManager {
12711271
}
12721272
}
12731273
}
1274+
NodeEvent::BroadcastProximityCache { message } => {
1275+
// Broadcast ProximityCache message to all connected peers
1276+
tracing::debug!(
1277+
?message,
1278+
peer_count = ctx.connections.len(),
1279+
"Broadcasting ProximityCache message to connected peers"
1280+
);
1281+
1282+
let msg = crate::message::NetMessage::V1(
1283+
crate::message::NetMessageV1::ProximityCache {
1284+
message: message.clone(),
1285+
},
1286+
);
1287+
1288+
for peer_addr in ctx.connections.keys() {
1289+
tracing::debug!(%peer_addr, "Sending ProximityCache to peer");
1290+
if let Err(e) = ctx.bridge.send(*peer_addr, msg.clone()).await {
1291+
tracing::warn!(%peer_addr, "Failed to send ProximityCache: {e}");
1292+
}
1293+
}
1294+
}
12741295
NodeEvent::Disconnect { cause } => {
12751296
tracing::info!(
12761297
"Disconnecting from network{}",

crates/core/src/node/op_state_manager.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,10 @@ use crate::{
3232
ring::{ConnectionManager, LiveTransactionTracker, PeerKeyLocation, Ring},
3333
};
3434

35-
use super::{network_bridge::EventLoopNotificationsSender, NetEventRegister, NodeConfig};
35+
use super::{
36+
network_bridge::EventLoopNotificationsSender, proximity_cache::ProximityCacheManager,
37+
NetEventRegister, NodeConfig,
38+
};
3639

3740
#[cfg(debug_assertions)]
3841
macro_rules! check_id_op {
@@ -213,6 +216,8 @@ pub(crate) struct OpManager {
213216
pub is_gateway: bool,
214217
/// Sub-operation tracking for atomic operation execution
215218
sub_op_tracker: SubOperationTracker,
219+
/// Proximity cache manager for tracking neighbor contract caches
220+
pub proximity_cache: Arc<ProximityCacheManager>,
216221
}
217222

218223
impl Clone for OpManager {
@@ -228,6 +233,7 @@ impl Clone for OpManager {
228233
peer_ready: self.peer_ready.clone(),
229234
is_gateway: self.is_gateway,
230235
sub_op_tracker: self.sub_op_tracker.clone(),
236+
proximity_cache: self.proximity_cache.clone(),
231237
}
232238
}
233239
}
@@ -284,6 +290,8 @@ impl OpManager {
284290
tracing::debug!("Regular peer node: peer_ready will be set after first handshake");
285291
}
286292

293+
let proximity_cache = Arc::new(ProximityCacheManager::new());
294+
287295
Ok(Self {
288296
ring,
289297
ops,
@@ -295,6 +303,7 @@ impl OpManager {
295303
peer_ready,
296304
is_gateway,
297305
sub_op_tracker,
306+
proximity_cache,
298307
})
299308
}
300309

0 commit comments

Comments
 (0)