fix: cleanup
This commit is contained in:
@@ -1,88 +0,0 @@
|
|||||||
use crate::{AppState, error::Error, storage::StorageImpl};
|
|
||||||
use actix_web::{HttpRequest, HttpResponse, rt, web};
|
|
||||||
use actix_ws::AggregatedMessage;
|
|
||||||
use futures_util::StreamExt;
|
|
||||||
+use tokio::sync::broadcast::error::RecvError;
|
|
||||||
|
|
||||||
pub async fn peers(
|
|
||||||
req: HttpRequest,
|
|
||||||
stream: web::Payload,
|
|
||||||
app_state: web::Data<AppState>,
|
|
||||||
) -> Result<HttpResponse, Error> {
|
|
||||||
let (res, mut session, msg_stream) =
|
|
||||||
actix_ws::handle(&req, stream).map_err(|e| Error::ws(e))?;
|
|
||||||
|
|
||||||
let mut msg_stream = msg_stream.aggregate_continuations();
|
|
||||||
|
|
||||||
let mut peer_rx = app_state.peer_updates.subscribe();
|
|
||||||
|
|
||||||
match app_state.storage.get_peers().await {
|
|
||||||
Ok(initial_peers) => {
|
|
||||||
let json = serde_json::to_string(&initial_peers).unwrap_or_else(|_| "[]".to_string());
|
|
||||||
if session.text(json).await.is_err() {
|
|
||||||
return Ok(res);
|
|
||||||
}
|
|
||||||
tracing::info!(
|
|
||||||
+ "sent initial peer list ({} peers) to new client",
|
|
||||||
initial_peers.len()
|
|
||||||
);
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
tracing::warn!("failed to fetch initial peers: {:?}", e);
|
|
||||||
+ session.close(None).await.ok();
|
|
||||||
return Ok(res);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
rt::spawn(async move {
|
|
||||||
loop {
|
|
||||||
tokio::select! {
|
|
||||||
msg = msg_stream.next() => {
|
|
||||||
match msg {
|
|
||||||
Some(Ok(AggregatedMessage::Ping(data))) => {
|
|
||||||
if session.pong(&data).await.is_err() {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Some(Ok(AggregatedMessage::Pong(_))) => {}
|
|
||||||
Some(Ok(AggregatedMessage::Close(_))) => {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
Some(Ok(AggregatedMessage::Text(_))) => {
|
|
||||||
}
|
|
||||||
Some(Ok(AggregatedMessage::Binary(_))) => {
|
|
||||||
}
|
|
||||||
Some(Err(_)) => {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
None => {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
update = peer_rx.recv() => {
|
|
||||||
match update {
|
|
||||||
Ok(peer_update) => {
|
|
||||||
let json = serde_json::to_string(&peer_update.peer)
|
|
||||||
.unwrap_or_else(|_| "{}".to_string());
|
|
||||||
if session.text(json).await.is_err() {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
+ tracing::info!("sent peer update to client: {}", peer_update.peer.public_key);
|
|
||||||
}
|
|
||||||
+ Err(RecvError::Lagged(n)) => {
|
|
||||||
+ tracing::warn!("client lagged, missed {} updates", n);
|
|
||||||
}
|
|
||||||
+ Err(RecvError::Closed) => {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
+ session.close(None).await.ok();
|
|
||||||
+ tracing::info!("client disconnected");
|
|
||||||
});
|
|
||||||
|
|
||||||
Ok(res)
|
|
||||||
}
|
|
||||||
@@ -27,7 +27,7 @@ pub async fn peers(
|
|||||||
return Ok(res);
|
return Ok(res);
|
||||||
}
|
}
|
||||||
tracing::info!(
|
tracing::info!(
|
||||||
"sent initial peer list ({} peers) to new WebSocket client",
|
"sent initial peer list ({} peers) to new client",
|
||||||
initial_peers.len()
|
initial_peers.len()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
@@ -72,10 +72,10 @@ pub async fn peers(
|
|||||||
if session.text(json).await.is_err() {
|
if session.text(json).await.is_err() {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
tracing::info!("sent peer update to WebSocket client: {}", peer_update.peer.public_key);
|
tracing::info!("sent peer update to client: {}", peer_update.peer.public_key);
|
||||||
}
|
}
|
||||||
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
|
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
|
||||||
tracing::warn!("WebSocket client lagged, missed {} updates", n);
|
tracing::warn!("client lagged, missed {} updates", n);
|
||||||
}
|
}
|
||||||
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
|
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
|
||||||
break;
|
break;
|
||||||
@@ -85,7 +85,7 @@ pub async fn peers(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
session.close(None).await.ok();
|
session.close(None).await.ok();
|
||||||
tracing::info!("WebSocket client disconnected");
|
tracing::info!("client disconnected");
|
||||||
});
|
});
|
||||||
|
|
||||||
Ok(res)
|
Ok(res)
|
||||||
|
|||||||
Reference in New Issue
Block a user