Skip to content

Commit 7de5e7a

Browse files
committed
chore: print relay and p2p status
1 parent dbde974 commit 7de5e7a

File tree

6 files changed

+171
-22
lines changed

6 files changed

+171
-22
lines changed

src/client/main.rs

Lines changed: 67 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
use clap::Parser;
22
use std::sync::Arc;
3-
3+
use std::time::Duration;
4+
use tokio::time::interval;
45
use crate::client::{Args, DEFAULT_MTU, P2P_UDP_PORT};
5-
use crate::client::relay::{ClientHandler, new_relay_handler};
6+
use crate::client::relay::{RelayHandler, new_relay_handler};
67
use crate::client::peer::{PeerHandler};
78
use crate::client::prettylog::{log_startup_banner};
89
use crate::codec::frame::{DataFrame, Frame, HandshakeReplyFrame};
@@ -93,10 +94,11 @@ fn init_device(device_config: &HandshakeReplyFrame) -> crate::Result<DeviceHandl
9394
}
9495

9596
async fn run_event_loop(
96-
client_handler: &mut ClientHandler,
97+
client_handler: &mut RelayHandler,
9798
peer_handler: &mut Option<PeerHandler>,
9899
dev: &mut DeviceHandler,
99100
) {
101+
let mut exporter_ticker = interval(Duration::from_secs(30));
100102
loop {
101103
// Build select branches based on whether P2P is enabled
102104
if let Some(peer_handler) = peer_handler {
@@ -163,6 +165,9 @@ async fn run_event_loop(
163165
}
164166
}
165167
}
168+
_ = exporter_ticker.tick() => {
169+
get_status(client_handler, Some(peer_handler)).await;
170+
}
166171
}
167172
} else {
168173
// P2P disabled: relay only
@@ -186,7 +191,66 @@ async fn run_event_loop(
186191
}
187192
}
188193
}
194+
_ = exporter_ticker.tick() => {
195+
get_status(client_handler, None).await;
196+
}
189197
}
190198
}
191199
}
192200
}
201+
202+
async fn get_status(relay: &RelayHandler, peer: Option<&PeerHandler>) {
203+
println!("\n╔══════════════════════════════════════════════════════════════════════╗");
204+
println!("║ CONNECTION STATUS ║");
205+
println!("╚══════════════════════════════════════════════════════════════════════╝");
206+
207+
// Relay Status
208+
let relay_status = relay.get_status();
209+
println!("\n📡 Relay Connection (TCP)");
210+
println!(" ├─ RX Frames: {} (Errors: {})", relay_status.rx_frame, relay_status.rx_error);
211+
println!(" └─ TX Frames: {} (Errors: {})", relay_status.tx_frame, relay_status.tx_error);
212+
213+
// P2P Status
214+
if let Some(peer_handler) = peer {
215+
let peer_status = peer_handler.get_status().await;
216+
217+
if peer_status.is_empty() {
218+
println!("\n🔗 P2P Connections (UDP)");
219+
println!(" └─ No peers configured");
220+
} else {
221+
println!("\n🔗 P2P Connections (UDP): {} peers", peer_status.len());
222+
223+
for (idx, status) in peer_status.iter().enumerate() {
224+
let is_last = idx == peer_status.len() - 1;
225+
let prefix = if is_last { "└─" } else { "├─" };
226+
227+
// Format connection state
228+
let state = match (&status.addr, &status.last_active) {
229+
(None, _) => "❌ Unknown Address".to_string(),
230+
(Some(_), None) => "⏳ Connecting...".to_string(),
231+
(Some(_), Some(last)) => {
232+
let elapsed = last.elapsed().as_secs();
233+
if elapsed < 15 {
234+
format!("✅ Active ({}s ago)", elapsed)
235+
} else {
236+
format!("⚠️ Inactive ({}s ago)", elapsed)
237+
}
238+
}
239+
};
240+
241+
// Format address
242+
let addr_str = status.addr
243+
.map(|a| format!("{}", a))
244+
.unwrap_or_else(|| "N/A".to_string());
245+
246+
println!(" {} Peer: {}", prefix, status.identity);
247+
println!(" {} Address: {}", if is_last { " " } else { "│" }, addr_str);
248+
println!(" {} Status: {}", if is_last { " " } else { "│" }, state);
249+
}
250+
}
251+
} else {
252+
println!("\n🔗 P2P Mode: Disabled");
253+
}
254+
255+
println!();
256+
}

src/client/peer.rs

Lines changed: 53 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,40 @@ struct PeerMeta {
165165
last_active: Option<Instant>,
166166
}
167167

168+
/// Peer connection status information
169+
///
170+
/// Provides a snapshot of a peer's current connection state including
171+
/// its identity, resolved address, and last activity timestamp.
172+
///
173+
/// # Usage
174+
/// Used by `PeerHandler::get_status()` to report the health and connectivity
175+
/// state of all configured P2P peers. Useful for monitoring, debugging,
176+
/// and displaying connection status to users.
177+
///
178+
/// # Fields
179+
/// * `identity` - Unique identifier of the peer
180+
/// * `addr` - Resolved socket address for P2P communication (None if unknown)
181+
/// * `last_active` - Timestamp of last received packet (None if never connected)
182+
#[derive(Debug)]
183+
pub struct PeerStatus {
184+
/// Unique identifier of the peer
185+
pub identity: String,
186+
187+
/// Resolved socket address ([ipv6]:port or [ipv4]:port)
188+
///
189+
/// - `None`: Address not yet known (peer behind NAT, waiting for first packet)
190+
/// - `Some(addr)`: Known address, either from configuration or learned dynamically
191+
pub addr: Option<SocketAddr>,
192+
193+
/// Timestamp of last received packet from this peer
194+
///
195+
/// - `None`: No packets received yet (connection not established)
196+
/// - `Some(instant)`: Last successful communication time
197+
///
198+
/// Connection is considered active if `Instant::now() - last_active < 15s`
199+
pub last_active: Option<Instant>,
200+
}
201+
168202
/// Peer connection manager for P2P communication
169203
///
170204
/// Manages a collection of peer connections and handles:
@@ -483,12 +517,11 @@ impl PeerHandler {
483517
.await
484518
.ok_or("recv from peers channel closed")?;
485519

486-
self.update_peer_active(remote).await;
487520
let (frame, _) = Parser::unmarshal(&buf, self.block.as_ref())?;
488521

489522
match frame {
490523
Frame::KeepAlive(ka) => {
491-
tracing::debug!("Received keepalive from peer {} at {}", ka.identity, remote);
524+
tracing::info!("Received keepalive from peer {} at {}", ka.identity, remote);
492525

493526
// Update peer address and last_active based on keepalive identity
494527
// This enables address learning for peers behind NAT
@@ -505,10 +538,12 @@ impl PeerHandler {
505538
peer.remote_addr = Some(remote);
506539
peer.last_active = Some(Instant::now());
507540
}
508-
509-
continue; // Skip keepalive, receive next frame
541+
542+
// Dont need to reply
543+
continue;
510544
}
511545
_ => {
546+
self.update_peer_active(remote).await;
512547
return Ok(frame);
513548
}
514549
}
@@ -540,7 +575,6 @@ impl PeerHandler {
540575
let peers = self.peers.read().await;
541576
let peer = self.find_peer_by_ip_locked(&peers, dest_ip)
542577
.ok_or("No peer found for destination")?;
543-
544578
(peer.identity.clone(), peer.remote_addr, peer.last_active)
545579
};
546580

@@ -662,4 +696,18 @@ impl PeerHandler {
662696
}
663697
}
664698
}
699+
700+
pub async fn get_status(&self) -> Vec<PeerStatus> {
701+
let guard = self.peers.read().await;
702+
let mut result: Vec<PeerStatus> = Vec::new();
703+
for peer in guard.values() {
704+
let status = PeerStatus {
705+
identity: peer.identity.clone(),
706+
addr: peer.remote_addr,
707+
last_active: peer.last_active,
708+
};
709+
result.push(status);
710+
}
711+
result
712+
}
665713
}

src/client/prettylog.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,11 @@ pub fn log_handshake_success(config: &HandshakeReplyFrame) {
2121
println!("Gateway: {}", config.gateway);
2222
println!("IPv6: {}", config.ipv6);
2323
println!("Peer nodes: {}", config.others.len());
24-
2524
if !config.others.is_empty() {
2625
for (idx, peer) in config.others.iter().enumerate() {
2726
println!(" [{}] Identity: {}", idx + 1, peer.identity);
2827
println!(" Private IP: {}", peer.private_ip);
28+
println!(" IPv6: {}", peer.ipv6);
2929
println!(" CIDR ranges: {}", peer.ciders.join(", "));
3030
}
3131
}

src/client/relay.rs

Lines changed: 48 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -170,21 +170,43 @@ impl RelayClient {
170170
}
171171
}
172172

173-
pub struct ClientHandler {
173+
#[derive(Clone)]
174+
#[derive(Debug)]
175+
pub struct RelayStatus {
176+
pub rx_error: u64,
177+
pub rx_frame: u64,
178+
pub tx_frame: u64,
179+
pub tx_error: u64,
180+
}
181+
182+
impl Default for RelayStatus {
183+
fn default() -> Self {
184+
Self{
185+
rx_error: 0,
186+
tx_error: 0,
187+
rx_frame: 0,
188+
tx_frame: 0,
189+
}
190+
}
191+
}
192+
193+
pub struct RelayHandler {
174194
outbound_tx: Option<mpsc::Sender<Frame>>,
175195
inbound_rx: mpsc::Receiver<Frame>,
176196
inbound_tx: mpsc::Sender<Frame>,
177197
block: Arc<Box<dyn Block>>,
198+
metrics: RelayStatus,
178199
}
179200

180-
impl ClientHandler {
181-
pub fn new(block: Arc<Box<dyn Block>>) -> ClientHandler {
201+
impl RelayHandler {
202+
pub fn new(block: Arc<Box<dyn Block>>) -> RelayHandler {
182203
let (inbound_tx, inbound_rx) = mpsc::channel(10);
183-
ClientHandler {
204+
RelayHandler {
184205
outbound_tx: None,
185206
inbound_rx,
186207
inbound_tx,
187208
block,
209+
metrics: Default::default(),
188210
}
189211
}
190212

@@ -231,28 +253,44 @@ impl ClientHandler {
231253
}
232254

233255
pub async fn send_frame(&mut self, frame: Frame) -> crate::Result<()> {
256+
self.metrics.tx_frame += 1;
234257
let outbound_tx = match self.outbound_tx.clone() {
235258
Some(tx) => tx,
236-
None => {return Err("relay connection disconnect".into())}
259+
None => {
260+
self.metrics.tx_error += 1;
261+
return Err("relay connection disconnect".into())}
237262
};
238263

239264
let result = outbound_tx.send(frame).await;
240265
match result {
241266
Ok(()) => Ok(()),
242-
Err(e) => Err(format!("device=> server fail {:?}", e).into()),
267+
Err(e) => {
268+
self.metrics.tx_error += 1;
269+
Err(format!("device=> server fail {:?}", e).into())
270+
},
243271
}
244272
}
245273

246274
pub async fn recv_frame(&mut self) -> crate::Result<Frame> {
247275
let result = self.inbound_rx.recv().await;
248276
match result {
249-
Some(frame) => Ok(frame),
250-
None => Err("server => device fail for closed channel".into()),
277+
Some(frame) => {
278+
self.metrics.rx_frame += 1;
279+
Ok(frame)
280+
},
281+
None => {
282+
self.metrics.rx_error += 1;
283+
Err("server => device fail for closed channel".into())
284+
},
251285
}
252286
}
287+
288+
pub fn get_status(&self) -> RelayStatus {
289+
self.metrics.clone()
290+
}
253291
}
254292

255-
pub async fn new_relay_handler(args: &Args, block: Arc<Box<dyn Block>>)->crate::Result<(ClientHandler, HandshakeReplyFrame)> {
293+
pub async fn new_relay_handler(args: &Args, block: Arc<Box<dyn Block>>)->crate::Result<(RelayHandler, HandshakeReplyFrame)> {
256294
let ipv6 = utils::get_ipv6().unwrap_or("".to_string());
257295
let client_config = RelayClientConfig {
258296
server_addr: args.server.clone(),
@@ -264,7 +302,7 @@ pub async fn new_relay_handler(args: &Args, block: Arc<Box<dyn Block>>)->crate::
264302
port: P2P_UDP_PORT,
265303
};
266304

267-
let mut handler = ClientHandler::new(block);
305+
let mut handler = RelayHandler::new(block);
268306
let (config_ready_tx, mut config_ready_rx) = mpsc::channel(CONFIG_CHANNEL_SIZE);
269307
handler.run_client(client_config, config_ready_tx);
270308

src/utils/device.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ impl Device {
6767
}
6868
packet = self.outbound_rx.recv() => {
6969
if let Some(packet) = packet {
70-
tracing::info!("server => device {} bytes", packet.len());
70+
tracing::debug!("server => device {} bytes", packet.len());
7171
let result = dev.write(packet.as_slice()).await;
7272
if let Err(e) = result {
7373
tracing::error!("write device fail: {:?}", e);
@@ -129,7 +129,7 @@ impl DeviceHandler {
129129
return Err("device handler send none".into());
130130
}
131131
};
132-
tracing::info!("device => server outbound tx len: {}", packet.len());
132+
tracing::debug!("device => server outbound tx len: {}", packet.len());
133133
let result = outbound_tx.send(packet).await;
134134
match result {
135135
Ok(_) => Ok(()),

src/utils/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ pub fn get_ipv6() -> Option<String> {
3030

3131
for api in &apis {
3232
if let Ok(ipv6) = fetch_ipv6_from_url(api) {
33-
tracing::info!("Retrieved public IPv6 from {}: {}", api, ipv6);
3433
return Some(ipv6);
3534
}
3635
}

0 commit comments

Comments
 (0)