From 1565634fb1209759bf96d8aeacf87fa664a6c872 Mon Sep 17 00:00:00 2001 From: alienx5499 Date: Sat, 4 Oct 2025 21:19:11 +0530 Subject: [PATCH 1/8] feat: implement Kademlia periodic replication and republishing --- include/libp2p/protocol/kademlia/config.hpp | 48 +++++ .../protocol/kademlia/impl/kademlia_impl.hpp | 36 ++++ .../libp2p/protocol/kademlia/impl/storage.hpp | 4 + .../protocol/kademlia/impl/storage_impl.hpp | 2 + include/libp2p/protocol/kademlia/kademlia.hpp | 16 ++ src/protocol/kademlia/impl/kademlia_impl.cpp | 169 ++++++++++++++++++ src/protocol/kademlia/impl/storage_impl.cpp | 18 ++ 7 files changed, 293 insertions(+) diff --git a/include/libp2p/protocol/kademlia/config.hpp b/include/libp2p/protocol/kademlia/config.hpp index 34b22ade7..70104aa3e 100644 --- a/include/libp2p/protocol/kademlia/config.hpp +++ b/include/libp2p/protocol/kademlia/config.hpp @@ -47,6 +47,44 @@ namespace libp2p::protocol::kademlia { */ std::chrono::seconds delay = 10s; }; + + struct PeriodicReplication { + /** + * True if periodic replication is enabled + */ + bool enabled = true; + + /** + * Interval for periodic replication + * @note Default: 1h + */ + std::chrono::seconds interval = 1h; + + /** + * Number of peers to replicate to per cycle + * @note Default: 3 (subset of K_VALUE) + */ + size_t peers_per_cycle = 3; + }; + + struct PeriodicRepublishing { + /** + * True if periodic republishing is enabled + */ + bool enabled = true; + + /** + * Interval for periodic republishing + * @note Default: 24h + */ + std::chrono::seconds interval = 24h; + + /** + * Number of peers to republish to per cycle + * @note Default: 6 (subset of K_VALUE) + */ + size_t peers_per_cycle = 6; + }; } // namespace class Config { @@ -146,6 +184,16 @@ namespace libp2p::protocol::kademlia { */ RandomWalk randomWalk{}; + /** + * Periodic replication config + */ + PeriodicReplication periodicReplication{}; + + /** + * Periodic republishing config + */ + PeriodicRepublishing periodicRepublishing{}; + // https://github.com/libp2p/rust-libp2p/blob/c6cf7fec6913aa590622aeea16709fce6e9c99a5/protocols/kad/src/query/peers/closest.rs#L110-L120 size_t query_initial_peers = K_VALUE; diff --git a/include/libp2p/protocol/kademlia/impl/kademlia_impl.hpp b/include/libp2p/protocol/kademlia/impl/kademlia_impl.hpp index d1c0d24d7..3dfa200ab 100644 --- a/include/libp2p/protocol/kademlia/impl/kademlia_impl.hpp +++ b/include/libp2p/protocol/kademlia/impl/kademlia_impl.hpp @@ -84,6 +84,22 @@ namespace libp2p::protocol::kademlia { std::shared_ptr openSession( std::shared_ptr stream) override; + /// Set replication interval + /// @param interval - replication interval + void setReplicationInterval(std::chrono::seconds interval) override; + + /// Set republishing interval + /// @param interval - republishing interval + void setRepublishingInterval(std::chrono::seconds interval) override; + + /// Enable/disable periodic replication + /// @param enabled - whether to enable replication + void setReplicationEnabled(bool enabled) override; + + /// Enable/disable periodic republishing + /// @param enabled - whether to enable republishing + void setRepublishingEnabled(bool enabled) override; + private: void onPutValue(const std::shared_ptr &session, Message &&msg); void onGetValue(const std::shared_ptr &session, Message &&msg); @@ -145,6 +161,26 @@ namespace libp2p::protocol::kademlia { basic::Scheduler::Handle handle{}; } random_walking_; + // Periodic replication and republishing + basic::Scheduler::Handle replication_timer_; + basic::Scheduler::Handle republishing_timer_; + + // Mutable configuration for runtime changes + mutable std::chrono::seconds replication_interval_; + mutable std::chrono::seconds republishing_interval_; + mutable bool replication_enabled_; + mutable bool republishing_enabled_; + + // Periodic operation callbacks + void onReplicationTimer(); + void onRepublishingTimer(); + void performReplication(); + void performRepublishing(); + + // Helper methods for periodic operations + std::vector getClosestPeers(const Key& key, size_t count); + void replicateRecord(const Key& key, const Value& value, bool extend_expiration); + log::SubLogger log_; }; diff --git a/include/libp2p/protocol/kademlia/impl/storage.hpp b/include/libp2p/protocol/kademlia/impl/storage.hpp index 1416ac400..a007f128d 100644 --- a/include/libp2p/protocol/kademlia/impl/storage.hpp +++ b/include/libp2p/protocol/kademlia/impl/storage.hpp @@ -25,6 +25,10 @@ namespace libp2p::protocol::kademlia { /// @returns true if it has value corresponding to given @param key. virtual bool hasValue(const Key &key) const = 0; + + /// Get all stored records for periodic operations + /// @return vector of stored records + virtual std::vector> getAllRecords() const = 0; }; } // namespace libp2p::protocol::kademlia diff --git a/include/libp2p/protocol/kademlia/impl/storage_impl.hpp b/include/libp2p/protocol/kademlia/impl/storage_impl.hpp index 2332b549d..7dec2666b 100644 --- a/include/libp2p/protocol/kademlia/impl/storage_impl.hpp +++ b/include/libp2p/protocol/kademlia/impl/storage_impl.hpp @@ -65,6 +65,8 @@ namespace libp2p::protocol::kademlia { bool hasValue(const Key &key) const override; + std::vector> getAllRecords() const override; + private: void onRefreshTimer(); void setTimerRefresh(); diff --git a/include/libp2p/protocol/kademlia/kademlia.hpp b/include/libp2p/protocol/kademlia/kademlia.hpp index 773ed633d..5aab9acb7 100644 --- a/include/libp2p/protocol/kademlia/kademlia.hpp +++ b/include/libp2p/protocol/kademlia/kademlia.hpp @@ -15,6 +15,22 @@ namespace libp2p::protocol::kademlia { virtual ~Kademlia() = default; virtual void start() = 0; + + /// Set replication interval + /// @param interval - replication interval + virtual void setReplicationInterval(std::chrono::seconds interval) = 0; + + /// Set republishing interval + /// @param interval - republishing interval + virtual void setRepublishingInterval(std::chrono::seconds interval) = 0; + + /// Enable/disable periodic replication + /// @param enabled - whether to enable replication + virtual void setReplicationEnabled(bool enabled) = 0; + + /// Enable/disable periodic republishing + /// @param enabled - whether to enable republishing + virtual void setRepublishingEnabled(bool enabled) = 0; }; } // namespace libp2p::protocol::kademlia diff --git a/src/protocol/kademlia/impl/kademlia_impl.cpp b/src/protocol/kademlia/impl/kademlia_impl.cpp index cfabbeb69..4a00b1713 100644 --- a/src/protocol/kademlia/impl/kademlia_impl.cpp +++ b/src/protocol/kademlia/impl/kademlia_impl.cpp @@ -120,6 +120,33 @@ namespace libp2p::protocol::kademlia { if (config_.randomWalk.enabled) { randomWalk(); } + + // Initialize mutable configuration + replication_interval_ = config_.periodicReplication.interval; + republishing_interval_ = config_.periodicRepublishing.interval; + replication_enabled_ = config_.periodicReplication.enabled; + republishing_enabled_ = config_.periodicRepublishing.enabled; + + // start periodic replication and republishing + if (replication_enabled_) { + replication_timer_ = scheduler_->scheduleWithHandle( + [weak_self{weak_from_this()}] { + auto self = weak_self.lock(); + if (self) { + self->onReplicationTimer(); + } + }, replication_interval_); + } + + if (republishing_enabled_) { + republishing_timer_ = scheduler_->scheduleWithHandle( + [weak_self{weak_from_this()}] { + auto self = weak_self.lock(); + if (self) { + self->onRepublishingTimer(); + } + }, republishing_interval_); + } } outcome::result KademliaImpl::bootstrap() { @@ -648,4 +675,146 @@ namespace libp2p::protocol::kademlia { std::move(handler)); } + void KademliaImpl::setReplicationInterval(std::chrono::seconds interval) { + replication_interval_ = interval; + if (replication_timer_) { + replication_timer_.reset(); + } + if (replication_enabled_) { + replication_timer_ = scheduler_->scheduleWithHandle( + [weak_self{weak_from_this()}] { + auto self = weak_self.lock(); + if (self) { + self->onReplicationTimer(); + } + }, interval); + } + } + + void KademliaImpl::setRepublishingInterval(std::chrono::seconds interval) { + republishing_interval_ = interval; + if (republishing_timer_) { + republishing_timer_.reset(); + } + if (republishing_enabled_) { + republishing_timer_ = scheduler_->scheduleWithHandle( + [weak_self{weak_from_this()}] { + auto self = weak_self.lock(); + if (self) { + self->onRepublishingTimer(); + } + }, interval); + } + } + + void KademliaImpl::setReplicationEnabled(bool enabled) { + replication_enabled_ = enabled; + if (enabled) { + setReplicationInterval(replication_interval_); + } else if (replication_timer_) { + replication_timer_.reset(); + } + } + + void KademliaImpl::setRepublishingEnabled(bool enabled) { + republishing_enabled_ = enabled; + if (enabled) { + setRepublishingInterval(republishing_interval_); + } else if (republishing_timer_) { + republishing_timer_.reset(); + } + } + + void KademliaImpl::onReplicationTimer() { + performReplication(); + // Schedule next replication + if (replication_enabled_) { + replication_timer_ = scheduler_->scheduleWithHandle( + [weak_self{weak_from_this()}] { + auto self = weak_self.lock(); + if (self) { + self->onReplicationTimer(); + } + }, replication_interval_); + } + } + + void KademliaImpl::onRepublishingTimer() { + performRepublishing(); + // Schedule next republishing + if (republishing_enabled_) { + republishing_timer_ = scheduler_->scheduleWithHandle( + [weak_self{weak_from_this()}] { + auto self = weak_self.lock(); + if (self) { + self->onRepublishingTimer(); + } + }, republishing_interval_); + } + } + + void KademliaImpl::performReplication() { + log_.debug("Performing periodic replication"); + + try { + auto records = storage_->getAllRecords(); + for (const auto& [key, value] : records) { + replicateRecord(key, value, false); // false = don't extend expiration + } + } catch (const std::exception& e) { + log_.error("Error during replication: {}", e.what()); + } + } + + void KademliaImpl::performRepublishing() { + log_.debug("Performing periodic republishing"); + + try { + auto records = storage_->getAllRecords(); + for (const auto& [key, value] : records) { + replicateRecord(key, value, true); // true = extend expiration + } + } catch (const std::exception& e) { + log_.error("Error during republishing: {}", e.what()); + } + } + + std::vector KademliaImpl::getClosestPeers(const Key& key, size_t count) { + std::vector closest_peers; + + // Get peers from peer routing table + HashedKey hashed_key(key); + auto peers = peer_routing_table_->getNearestPeers(hashed_key.hash, count); + + for (const auto& peer : peers) { + if (peer != self_id_) { // Don't include self + closest_peers.push_back(peer); + } + } + + return closest_peers; + } + + void KademliaImpl::replicateRecord(const Key& key, const Value& value, bool extend_expiration) { + auto closest_peers = getClosestPeers(key, + extend_expiration ? config_.periodicRepublishing.peers_per_cycle + : config_.periodicReplication.peers_per_cycle); + + if (closest_peers.empty()) { + log_.debug("No peers available for replication/republishing of key: {}", + multi::detail::encodeBase58(key)); + return; + } + + // Create and start put value executor + auto executor = createPutValueExecutor(key, value, closest_peers); + if (executor) { + std::ignore = executor->start(); + log_.debug("Started {} for key: {} to {} peers", + extend_expiration ? "republishing" : "replication", + multi::detail::encodeBase58(key), + closest_peers.size()); + } + } + } // namespace libp2p::protocol::kademlia diff --git a/src/protocol/kademlia/impl/storage_impl.cpp b/src/protocol/kademlia/impl/storage_impl.cpp index b8a5fd0be..40183eae3 100644 --- a/src/protocol/kademlia/impl/storage_impl.cpp +++ b/src/protocol/kademlia/impl/storage_impl.cpp @@ -117,4 +117,22 @@ namespace libp2p::protocol::kademlia { }, config_.storageRefreshInterval); } + + std::vector> StorageImpl::getAllRecords() const { + std::vector> records; + auto now = scheduler_->now(); + + // Iterate through all records and get their values from backend + for (const auto& record : *table_) { + // Only include non-expired records + if (record.expire_time > now) { + auto value_result = backend_->getValue(record.key); + if (value_result) { + records.emplace_back(record.key, value_result.value()); + } + } + } + + return records; + } } // namespace libp2p::protocol::kademlia From ce24de7efa2dfaafe60b73cf698a553f8edd6bc8 Mon Sep 17 00:00:00 2001 From: alienx5499 Date: Wed, 8 Oct 2025 13:36:22 +0530 Subject: [PATCH 2/8] refactor(kademlia): drop duplicated mutable flags; read from config_ directly per review --- .../protocol/kademlia/impl/kademlia_impl.hpp | 6 ---- src/protocol/kademlia/impl/kademlia_impl.cpp | 34 +++++++------------ 2 files changed, 12 insertions(+), 28 deletions(-) diff --git a/include/libp2p/protocol/kademlia/impl/kademlia_impl.hpp b/include/libp2p/protocol/kademlia/impl/kademlia_impl.hpp index 3dfa200ab..e58c98146 100644 --- a/include/libp2p/protocol/kademlia/impl/kademlia_impl.hpp +++ b/include/libp2p/protocol/kademlia/impl/kademlia_impl.hpp @@ -164,12 +164,6 @@ namespace libp2p::protocol::kademlia { // Periodic replication and republishing basic::Scheduler::Handle replication_timer_; basic::Scheduler::Handle republishing_timer_; - - // Mutable configuration for runtime changes - mutable std::chrono::seconds replication_interval_; - mutable std::chrono::seconds republishing_interval_; - mutable bool replication_enabled_; - mutable bool republishing_enabled_; // Periodic operation callbacks void onReplicationTimer(); diff --git a/src/protocol/kademlia/impl/kademlia_impl.cpp b/src/protocol/kademlia/impl/kademlia_impl.cpp index 4a00b1713..3fa93d80e 100644 --- a/src/protocol/kademlia/impl/kademlia_impl.cpp +++ b/src/protocol/kademlia/impl/kademlia_impl.cpp @@ -121,31 +121,25 @@ namespace libp2p::protocol::kademlia { randomWalk(); } - // Initialize mutable configuration - replication_interval_ = config_.periodicReplication.interval; - republishing_interval_ = config_.periodicRepublishing.interval; - replication_enabled_ = config_.periodicReplication.enabled; - republishing_enabled_ = config_.periodicRepublishing.enabled; - // start periodic replication and republishing - if (replication_enabled_) { + if (config_.periodicReplication.enabled) { replication_timer_ = scheduler_->scheduleWithHandle( [weak_self{weak_from_this()}] { auto self = weak_self.lock(); if (self) { self->onReplicationTimer(); } - }, replication_interval_); + }, config_.periodicReplication.interval); } - if (republishing_enabled_) { + if (config_.periodicRepublishing.enabled) { republishing_timer_ = scheduler_->scheduleWithHandle( [weak_self{weak_from_this()}] { auto self = weak_self.lock(); if (self) { self->onRepublishingTimer(); } - }, republishing_interval_); + }, config_.periodicRepublishing.interval); } } @@ -676,11 +670,10 @@ namespace libp2p::protocol::kademlia { } void KademliaImpl::setReplicationInterval(std::chrono::seconds interval) { - replication_interval_ = interval; if (replication_timer_) { replication_timer_.reset(); } - if (replication_enabled_) { + if (config_.periodicReplication.enabled) { replication_timer_ = scheduler_->scheduleWithHandle( [weak_self{weak_from_this()}] { auto self = weak_self.lock(); @@ -692,11 +685,10 @@ namespace libp2p::protocol::kademlia { } void KademliaImpl::setRepublishingInterval(std::chrono::seconds interval) { - republishing_interval_ = interval; if (republishing_timer_) { republishing_timer_.reset(); } - if (republishing_enabled_) { + if (config_.periodicRepublishing.enabled) { republishing_timer_ = scheduler_->scheduleWithHandle( [weak_self{weak_from_this()}] { auto self = weak_self.lock(); @@ -708,18 +700,16 @@ namespace libp2p::protocol::kademlia { } void KademliaImpl::setReplicationEnabled(bool enabled) { - replication_enabled_ = enabled; if (enabled) { - setReplicationInterval(replication_interval_); + setReplicationInterval(config_.periodicReplication.interval); } else if (replication_timer_) { replication_timer_.reset(); } } void KademliaImpl::setRepublishingEnabled(bool enabled) { - republishing_enabled_ = enabled; if (enabled) { - setRepublishingInterval(republishing_interval_); + setRepublishingInterval(config_.periodicRepublishing.interval); } else if (republishing_timer_) { republishing_timer_.reset(); } @@ -728,28 +718,28 @@ namespace libp2p::protocol::kademlia { void KademliaImpl::onReplicationTimer() { performReplication(); // Schedule next replication - if (replication_enabled_) { + if (config_.periodicReplication.enabled) { replication_timer_ = scheduler_->scheduleWithHandle( [weak_self{weak_from_this()}] { auto self = weak_self.lock(); if (self) { self->onReplicationTimer(); } - }, replication_interval_); + }, config_.periodicReplication.interval); } } void KademliaImpl::onRepublishingTimer() { performRepublishing(); // Schedule next republishing - if (republishing_enabled_) { + if (config_.periodicRepublishing.enabled) { republishing_timer_ = scheduler_->scheduleWithHandle( [weak_self{weak_from_this()}] { auto self = weak_self.lock(); if (self) { self->onRepublishingTimer(); } - }, republishing_interval_); + }, config_.periodicRepublishing.interval); } } From 52862fe104f6e8801d615c22de851d30498bf156 Mon Sep 17 00:00:00 2001 From: alienx5499 Date: Wed, 8 Oct 2025 13:41:39 +0530 Subject: [PATCH 3/8] refactor(kademlia): remove runtime setters from interface; rely on Config as requested --- .../protocol/kademlia/impl/kademlia_impl.hpp | 16 +------ include/libp2p/protocol/kademlia/kademlia.hpp | 16 ------- src/protocol/kademlia/impl/kademlia_impl.cpp | 46 +------------------ 3 files changed, 2 insertions(+), 76 deletions(-) diff --git a/include/libp2p/protocol/kademlia/impl/kademlia_impl.hpp b/include/libp2p/protocol/kademlia/impl/kademlia_impl.hpp index e58c98146..b2b0b3ef9 100644 --- a/include/libp2p/protocol/kademlia/impl/kademlia_impl.hpp +++ b/include/libp2p/protocol/kademlia/impl/kademlia_impl.hpp @@ -84,21 +84,7 @@ namespace libp2p::protocol::kademlia { std::shared_ptr openSession( std::shared_ptr stream) override; - /// Set replication interval - /// @param interval - replication interval - void setReplicationInterval(std::chrono::seconds interval) override; - - /// Set republishing interval - /// @param interval - republishing interval - void setRepublishingInterval(std::chrono::seconds interval) override; - - /// Enable/disable periodic replication - /// @param enabled - whether to enable replication - void setReplicationEnabled(bool enabled) override; - - /// Enable/disable periodic republishing - /// @param enabled - whether to enable republishing - void setRepublishingEnabled(bool enabled) override; + // Periodic behavior is driven by configuration only private: void onPutValue(const std::shared_ptr &session, Message &&msg); diff --git a/include/libp2p/protocol/kademlia/kademlia.hpp b/include/libp2p/protocol/kademlia/kademlia.hpp index 5aab9acb7..773ed633d 100644 --- a/include/libp2p/protocol/kademlia/kademlia.hpp +++ b/include/libp2p/protocol/kademlia/kademlia.hpp @@ -15,22 +15,6 @@ namespace libp2p::protocol::kademlia { virtual ~Kademlia() = default; virtual void start() = 0; - - /// Set replication interval - /// @param interval - replication interval - virtual void setReplicationInterval(std::chrono::seconds interval) = 0; - - /// Set republishing interval - /// @param interval - republishing interval - virtual void setRepublishingInterval(std::chrono::seconds interval) = 0; - - /// Enable/disable periodic replication - /// @param enabled - whether to enable replication - virtual void setReplicationEnabled(bool enabled) = 0; - - /// Enable/disable periodic republishing - /// @param enabled - whether to enable republishing - virtual void setRepublishingEnabled(bool enabled) = 0; }; } // namespace libp2p::protocol::kademlia diff --git a/src/protocol/kademlia/impl/kademlia_impl.cpp b/src/protocol/kademlia/impl/kademlia_impl.cpp index 3fa93d80e..65d08d705 100644 --- a/src/protocol/kademlia/impl/kademlia_impl.cpp +++ b/src/protocol/kademlia/impl/kademlia_impl.cpp @@ -669,51 +669,7 @@ namespace libp2p::protocol::kademlia { std::move(handler)); } - void KademliaImpl::setReplicationInterval(std::chrono::seconds interval) { - if (replication_timer_) { - replication_timer_.reset(); - } - if (config_.periodicReplication.enabled) { - replication_timer_ = scheduler_->scheduleWithHandle( - [weak_self{weak_from_this()}] { - auto self = weak_self.lock(); - if (self) { - self->onReplicationTimer(); - } - }, interval); - } - } - - void KademliaImpl::setRepublishingInterval(std::chrono::seconds interval) { - if (republishing_timer_) { - republishing_timer_.reset(); - } - if (config_.periodicRepublishing.enabled) { - republishing_timer_ = scheduler_->scheduleWithHandle( - [weak_self{weak_from_this()}] { - auto self = weak_self.lock(); - if (self) { - self->onRepublishingTimer(); - } - }, interval); - } - } - - void KademliaImpl::setReplicationEnabled(bool enabled) { - if (enabled) { - setReplicationInterval(config_.periodicReplication.interval); - } else if (replication_timer_) { - replication_timer_.reset(); - } - } - - void KademliaImpl::setRepublishingEnabled(bool enabled) { - if (enabled) { - setRepublishingInterval(config_.periodicRepublishing.interval); - } else if (republishing_timer_) { - republishing_timer_.reset(); - } - } + // Periodic behavior is driven by configuration only; no runtime setters void KademliaImpl::onReplicationTimer() { performReplication(); From f919e261d9a6589a3ea1f1d1d76a9dd0601084b0 Mon Sep 17 00:00:00 2001 From: alienx5499 Date: Wed, 8 Oct 2025 13:47:25 +0530 Subject: [PATCH 4/8] refactor(kademlia): drop try/catch in periodic replication/republishing per review --- src/protocol/kademlia/impl/kademlia_impl.cpp | 24 +++++++------------- 1 file changed, 8 insertions(+), 16 deletions(-) diff --git a/src/protocol/kademlia/impl/kademlia_impl.cpp b/src/protocol/kademlia/impl/kademlia_impl.cpp index 65d08d705..109078360 100644 --- a/src/protocol/kademlia/impl/kademlia_impl.cpp +++ b/src/protocol/kademlia/impl/kademlia_impl.cpp @@ -701,27 +701,19 @@ namespace libp2p::protocol::kademlia { void KademliaImpl::performReplication() { log_.debug("Performing periodic replication"); - - try { - auto records = storage_->getAllRecords(); - for (const auto& [key, value] : records) { - replicateRecord(key, value, false); // false = don't extend expiration - } - } catch (const std::exception& e) { - log_.error("Error during replication: {}", e.what()); + + auto records = storage_->getAllRecords(); + for (const auto& [key, value]: records) { + replicateRecord(key, value, false); // false = don't extend expiration } } void KademliaImpl::performRepublishing() { log_.debug("Performing periodic republishing"); - - try { - auto records = storage_->getAllRecords(); - for (const auto& [key, value] : records) { - replicateRecord(key, value, true); // true = extend expiration - } - } catch (const std::exception& e) { - log_.error("Error during republishing: {}", e.what()); + + auto records = storage_->getAllRecords(); + for (const auto& [key, value] : records) { + replicateRecord(key, value, true); // true = extend expiration } } From 37a603a2cbb5b785608b46c0f07438eb5c2ff618 Mon Sep 17 00:00:00 2001 From: alienx5499 Date: Wed, 8 Oct 2025 14:08:07 +0530 Subject: [PATCH 5/8] refactor(kademlia): deduplicate timer logic into setReplicationTimer/setRepublishingTimer per review --- .../protocol/kademlia/impl/kademlia_impl.hpp | 2 + src/protocol/kademlia/impl/kademlia_impl.cpp | 52 ++++++++++++------- 2 files changed, 35 insertions(+), 19 deletions(-) diff --git a/include/libp2p/protocol/kademlia/impl/kademlia_impl.hpp b/include/libp2p/protocol/kademlia/impl/kademlia_impl.hpp index b2b0b3ef9..6bfd0f18a 100644 --- a/include/libp2p/protocol/kademlia/impl/kademlia_impl.hpp +++ b/include/libp2p/protocol/kademlia/impl/kademlia_impl.hpp @@ -152,6 +152,8 @@ namespace libp2p::protocol::kademlia { basic::Scheduler::Handle republishing_timer_; // Periodic operation callbacks + void setReplicationTimer(); + void setRepublishingTimer(); void onReplicationTimer(); void onRepublishingTimer(); void performReplication(); diff --git a/src/protocol/kademlia/impl/kademlia_impl.cpp b/src/protocol/kademlia/impl/kademlia_impl.cpp index 109078360..1b96a1ece 100644 --- a/src/protocol/kademlia/impl/kademlia_impl.cpp +++ b/src/protocol/kademlia/impl/kademlia_impl.cpp @@ -1,3 +1,34 @@ + void KademliaImpl::setReplicationTimer() { + if (not config_.periodicReplication.enabled) { + return; + } + replication_timer_ = scheduler_->scheduleWithHandle( + [weak_self{weak_from_this()}] { + auto self = weak_self.lock(); + if (not self) { + return; + } + self->setReplicationTimer(); + self->onReplicationTimer(); + }, + config_.periodicReplication.interval); + } + + void KademliaImpl::setRepublishingTimer() { + if (not config_.periodicRepublishing.enabled) { + return; + } + republishing_timer_ = scheduler_->scheduleWithHandle( + [weak_self{weak_from_this()}] { + auto self = weak_self.lock(); + if (not self) { + return; + } + self->setRepublishingTimer(); + self->onRepublishingTimer(); + }, + config_.periodicRepublishing.interval); + } /** * Copyright Quadrivium LLC * All Rights Reserved @@ -122,25 +153,8 @@ namespace libp2p::protocol::kademlia { } // start periodic replication and republishing - if (config_.periodicReplication.enabled) { - replication_timer_ = scheduler_->scheduleWithHandle( - [weak_self{weak_from_this()}] { - auto self = weak_self.lock(); - if (self) { - self->onReplicationTimer(); - } - }, config_.periodicReplication.interval); - } - - if (config_.periodicRepublishing.enabled) { - republishing_timer_ = scheduler_->scheduleWithHandle( - [weak_self{weak_from_this()}] { - auto self = weak_self.lock(); - if (self) { - self->onRepublishingTimer(); - } - }, config_.periodicRepublishing.interval); - } + setReplicationTimer(); + setRepublishingTimer(); } outcome::result KademliaImpl::bootstrap() { From 90158137e739f95cbc295de0d14fecd3d082ca79 Mon Sep 17 00:00:00 2001 From: alienx5499 Date: Wed, 8 Oct 2025 19:49:01 +0530 Subject: [PATCH 6/8] feat(kademlia): on republish extend local expiration by putValue; log on failure --- src/protocol/kademlia/impl/kademlia_impl.cpp | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/protocol/kademlia/impl/kademlia_impl.cpp b/src/protocol/kademlia/impl/kademlia_impl.cpp index 1b96a1ece..117bf923e 100644 --- a/src/protocol/kademlia/impl/kademlia_impl.cpp +++ b/src/protocol/kademlia/impl/kademlia_impl.cpp @@ -748,6 +748,15 @@ namespace libp2p::protocol::kademlia { } void KademliaImpl::replicateRecord(const Key& key, const Value& value, bool extend_expiration) { + // If republishing, extend local expiration by putting the value back to storage + if (extend_expiration) { + auto put_res = storage_->putValue(key, value); + if (!put_res) { + log_.warn("Republish: failed to extend expiration for key: {}: {}", + multi::detail::encodeBase58(key), put_res.error()); + } + } + auto closest_peers = getClosestPeers(key, extend_expiration ? config_.periodicRepublishing.peers_per_cycle : config_.periodicReplication.peers_per_cycle); From 0cd8188231e1a3d82a7021802a1dd7c1e1c89759 Mon Sep 17 00:00:00 2001 From: alienx5499 Date: Wed, 8 Oct 2025 20:00:51 +0530 Subject: [PATCH 7/8] perf(connection_manager): reserve only for open connections as suggested in review --- src/network/impl/connection_manager_impl.cpp | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/network/impl/connection_manager_impl.cpp b/src/network/impl/connection_manager_impl.cpp index 1b9d7d540..d70fb2f17 100644 --- a/src/network/impl/connection_manager_impl.cpp +++ b/src/network/impl/connection_manager_impl.cpp @@ -68,9 +68,18 @@ namespace libp2p::network { std::vector ConnectionManagerImpl::getConnections() const { std::vector out; - out.reserve(connections_.size()); + // Pre-allocate space for better performance (only open connections) + size_t total_connections = 0; + for (const auto &entry : connections_) { + for (const auto &conn : entry.second) { + if (not conn->isClosed()) { + ++total_connections; + } + } + } + out.reserve(total_connections); - for (auto &&entry : connections_) { + for (const auto &entry : connections_) { for (const auto &conn : entry.second) { if (not conn->isClosed()) { out.emplace_back(conn); From 549ed16e7373cade81b6a284d71c7cd455fc6725 Mon Sep 17 00:00:00 2001 From: Ruslan Tushov Date: Wed, 8 Oct 2025 20:11:32 +0500 Subject: [PATCH 8/8] Update kademlia_impl.cpp --- src/protocol/kademlia/impl/kademlia_impl.cpp | 83 ++++++++------------ 1 file changed, 32 insertions(+), 51 deletions(-) diff --git a/src/protocol/kademlia/impl/kademlia_impl.cpp b/src/protocol/kademlia/impl/kademlia_impl.cpp index 117bf923e..0e42de3e7 100644 --- a/src/protocol/kademlia/impl/kademlia_impl.cpp +++ b/src/protocol/kademlia/impl/kademlia_impl.cpp @@ -1,34 +1,3 @@ - void KademliaImpl::setReplicationTimer() { - if (not config_.periodicReplication.enabled) { - return; - } - replication_timer_ = scheduler_->scheduleWithHandle( - [weak_self{weak_from_this()}] { - auto self = weak_self.lock(); - if (not self) { - return; - } - self->setReplicationTimer(); - self->onReplicationTimer(); - }, - config_.periodicReplication.interval); - } - - void KademliaImpl::setRepublishingTimer() { - if (not config_.periodicRepublishing.enabled) { - return; - } - republishing_timer_ = scheduler_->scheduleWithHandle( - [weak_self{weak_from_this()}] { - auto self = weak_self.lock(); - if (not self) { - return; - } - self->setRepublishingTimer(); - self->onRepublishingTimer(); - }, - config_.periodicRepublishing.interval); - } /** * Copyright Quadrivium LLC * All Rights Reserved @@ -685,32 +654,44 @@ namespace libp2p::protocol::kademlia { // Periodic behavior is driven by configuration only; no runtime setters + void KademliaImpl::setReplicationTimer() { + if (not config_.periodicReplication.enabled) { + return; + } + replication_timer_ = scheduler_->scheduleWithHandle( + [weak_self{weak_from_this()}] { + auto self = weak_self.lock(); + if (not self) { + return; + } + self->setReplicationTimer(); + self->onReplicationTimer(); + }, + config_.periodicReplication.interval); + } + + void KademliaImpl::setRepublishingTimer() { + if (not config_.periodicRepublishing.enabled) { + return; + } + republishing_timer_ = scheduler_->scheduleWithHandle( + [weak_self{weak_from_this()}] { + auto self = weak_self.lock(); + if (not self) { + return; + } + self->setRepublishingTimer(); + self->onRepublishingTimer(); + }, + config_.periodicRepublishing.interval); + } + void KademliaImpl::onReplicationTimer() { performReplication(); - // Schedule next replication - if (config_.periodicReplication.enabled) { - replication_timer_ = scheduler_->scheduleWithHandle( - [weak_self{weak_from_this()}] { - auto self = weak_self.lock(); - if (self) { - self->onReplicationTimer(); - } - }, config_.periodicReplication.interval); - } } void KademliaImpl::onRepublishingTimer() { performRepublishing(); - // Schedule next republishing - if (config_.periodicRepublishing.enabled) { - republishing_timer_ = scheduler_->scheduleWithHandle( - [weak_self{weak_from_this()}] { - auto self = weak_self.lock(); - if (self) { - self->onRepublishingTimer(); - } - }, config_.periodicRepublishing.interval); - } } void KademliaImpl::performReplication() {