diff --git a/registry/endpoints/ws/peers.rs b/registry/endpoints/ws/peers.rs deleted file mode 100644 index 758b6aa..0000000 --- a/registry/endpoints/ws/peers.rs +++ /dev/null @@ -1,88 +0,0 @@ - use crate::{AppState, error::Error, storage::StorageImpl}; - use actix_web::{HttpRequest, HttpResponse, rt, web}; - use actix_ws::AggregatedMessage; - use futures_util::StreamExt; -+use tokio::sync::broadcast::error::RecvError; - - pub async fn peers( - req: HttpRequest, - stream: web::Payload, - app_state: web::Data, - ) -> Result { - let (res, mut session, msg_stream) = - actix_ws::handle(&req, stream).map_err(|e| Error::ws(e))?; - - let mut msg_stream = msg_stream.aggregate_continuations(); - - let mut peer_rx = app_state.peer_updates.subscribe(); - - match app_state.storage.get_peers().await { - Ok(initial_peers) => { - let json = serde_json::to_string(&initial_peers).unwrap_or_else(|_| "[]".to_string()); - if session.text(json).await.is_err() { - return Ok(res); - } - tracing::info!( -+ "sent initial peer list ({} peers) to new client", - initial_peers.len() - ); - } - Err(e) => { - tracing::warn!("failed to fetch initial peers: {:?}", e); -+ session.close(None).await.ok(); - return Ok(res); - } - } - - rt::spawn(async move { - loop { - tokio::select! { - msg = msg_stream.next() => { - match msg { - Some(Ok(AggregatedMessage::Ping(data))) => { - if session.pong(&data).await.is_err() { - break; - } - } - Some(Ok(AggregatedMessage::Pong(_))) => {} - Some(Ok(AggregatedMessage::Close(_))) => { - break; - } - Some(Ok(AggregatedMessage::Text(_))) => { - } - Some(Ok(AggregatedMessage::Binary(_))) => { - } - Some(Err(_)) => { - break; - } - None => { - break; - } - } - } - update = peer_rx.recv() => { - match update { - Ok(peer_update) => { - let json = serde_json::to_string(&peer_update.peer) - .unwrap_or_else(|_| "{}".to_string()); - if session.text(json).await.is_err() { - break; - } -+ tracing::info!("sent peer update to client: {}", peer_update.peer.public_key); - } -+ Err(RecvError::Lagged(n)) => { -+ tracing::warn!("client lagged, missed {} updates", n); - } -+ Err(RecvError::Closed) => { - break; - } - } - } - } - } -+ session.close(None).await.ok(); -+ tracing::info!("client disconnected"); - }); - - Ok(res) - } diff --git a/registry/src/endpoints/ws/peers.rs b/registry/src/endpoints/ws/peers.rs index 597c8e7..7327b34 100644 --- a/registry/src/endpoints/ws/peers.rs +++ b/registry/src/endpoints/ws/peers.rs @@ -27,7 +27,7 @@ pub async fn peers( return Ok(res); } tracing::info!( - "sent initial peer list ({} peers) to new WebSocket client", + "sent initial peer list ({} peers) to new client", initial_peers.len() ); } @@ -72,10 +72,10 @@ pub async fn peers( if session.text(json).await.is_err() { break; } - tracing::info!("sent peer update to WebSocket client: {}", peer_update.peer.public_key); + tracing::info!("sent peer update to client: {}", peer_update.peer.public_key); } Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { - tracing::warn!("WebSocket client lagged, missed {} updates", n); + tracing::warn!("client lagged, missed {} updates", n); } Err(tokio::sync::broadcast::error::RecvError::Closed) => { break; @@ -85,7 +85,7 @@ pub async fn peers( } } session.close(None).await.ok(); - tracing::info!("WebSocket client disconnected"); + tracing::info!("client disconnected"); }); Ok(res)