Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 25 additions & 13 deletions crates/core/src/operations/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,6 @@ impl Operation for SubscribeOp {

let ring_max_htl = op_manager.ring.max_hops_to_live.max(1);
let htl = (*htl).min(ring_max_htl);
let this_peer = op_manager.ring.connection_manager.own_location();
let return_not_subbed = || -> OperationResult {
let return_msg = SubscribeMsg::ReturnSub {
key: *key,
Expand Down Expand Up @@ -732,7 +731,10 @@ impl Operation for SubscribeOp {
skip = ?new_skip_list,
"Forwarding seek to next candidate"
);
// Retry seek node when the contract to subscribe has not been found in this node
// Retry seek node when the contract to subscribe has not been found in this node.
// IMPORTANT: Preserve the original subscriber's identity (pub_key) so
// the final contract holder registers the correct peer as the subscriber.
// The recipient will fill in our address from the packet source.
return build_op_result(
*id,
Some(SubscribeState::AwaitingResponse {
Expand All @@ -741,14 +743,15 @@ impl Operation for SubscribeOp {
current_hop: new_htl,
upstream_subscriber: Some(subscriber.clone()),
}),
// Use PeerAddr::Unknown - the subscriber doesn't know their own
// external address (especially behind NAT). The recipient will
// fill this in from the packet source address.
(SubscribeMsg::SeekNode {
id: *id,
key: *key,
// Keep the original subscriber's pub_key but reset the address
// to Unknown - the next hop will fill it from source_addr.
// This ensures the chain of intermediate peers each forward
// the original subscriber's identity toward the contract holder.
subscriber: PeerKeyLocation::with_unknown_addr(
this_peer.pub_key().clone(),
subscriber.pub_key().clone(),
),
target: new_target,
skip_list: new_skip_list,
Expand Down Expand Up @@ -851,13 +854,22 @@ impl Operation for SubscribeOp {
.ring
.k_closest_potentially_caching(key, &skip_list, 3);
if let Some(target) = candidates.first() {
// Use PeerAddr::Unknown - the subscriber doesn't know their own
// external address (especially behind NAT). The recipient will
// fill this in from the packet source address.
let own_loc = op_manager.ring.connection_manager.own_location();
let subscriber = PeerKeyLocation::with_unknown_addr(
own_loc.pub_key().clone(),
);
// Preserve the original subscriber's identity when retrying.
// If we have an upstream_subscriber (we're an intermediate node),
// use their pub_key. Otherwise, we're the originating node.
let subscriber_pub_key =
if let Some(ref upstream) = upstream_subscriber {
upstream.pub_key().clone()
} else {
op_manager
.ring
.connection_manager
.own_location()
.pub_key()
.clone()
};
let subscriber =
PeerKeyLocation::with_unknown_addr(subscriber_pub_key);
return_msg = Some(SubscribeMsg::SeekNode {
id: *id,
key: *key,
Expand Down
15 changes: 7 additions & 8 deletions crates/core/tests/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -636,14 +636,13 @@ async fn test_put_merge_persists_state(ctx: &mut TestContext) -> TestResult {
Ok(())
}

// This test is disabled due to race conditions in subscription propagation logic.
// The test expects multiple clients across different nodes to receive subscription updates,
// but the PUT caching refactor (commits 2cd337b5-0d432347) changed the subscription semantics.
// Re-enabled after recent fixes to subscription logic - previously exhibited race conditions.
// If this test becomes flaky again, see issue #1798 for historical context.
// Ignored again due to recurring flakiness - fails intermittently with timeout waiting for
// cross-node subscription notifications (Client 3 timeout). See issue #1798.
#[ignore]
// This test validates cross-node subscription propagation: when a client on node-b subscribes
// to a contract PUT on node-a, UPDATE notifications should reach node-b's clients.
// Issue #2220 fixed the root cause where intermediate nodes replaced the original subscriber's
// pub_key with their own when forwarding SeekNode messages.
// Note: This test exhibits timing-related flakiness in CI due to multi-node startup and connection
// timing. When the test infrastructure succeeds in establishing connections, cross-node
// subscriptions work correctly. Consider using --test-threads=1 if flaky in CI.
#[freenet_test(
nodes = ["gateway", "node-a", "node-b"],
auto_connect_peers = true,
Expand Down
Loading