feat: implement client

This commit is contained in:
2026-02-12 13:29:15 -08:00
parent 0d0a548a46
commit 5aa6b98742
23 changed files with 645 additions and 164 deletions

27
registry/Cargo.toml Normal file
View File

@@ -0,0 +1,27 @@
[package]
name = "registry"
version = "0.1.0"
edition = "2024"
[dependencies]
actix-web = "4.12.1"
actix-ws = "0.3.1"
base64 = "0.22.1"
ipnetwork = { version = "0.21.1", features = ["serde"] }
redis = { version = "=1.0.2", features = ["connection-manager", "tokio-comp"] }
serde = { version = "1.0.228", features = ["derive"] }
serde_json = "1.0.149"
console = "0.16.2"
futures = "0.3.31"
futures-util = "0.3.31"
rustls = { version = "0.23.36", features = ["aws-lc-rs"] }
tracing = "0.1.44"
tracing-subscriber = { version = "0.3.22", features = ["env-filter"] }
tracing-actix-web = "0.7.21"
tokio = { version = "1.49.0", features = ["macros", "rt-multi-thread", "sync"] }
thiserror = "2.0.18"
thiserror-ext = "0.3.0"
[lib]
name = "registry"
path = "src/lib.rs"

View File

@@ -1,88 +1,88 @@
use crate::{AppState, error::Error, storage::StorageImpl};
use actix_web::{HttpRequest, HttpResponse, rt, web};
use actix_ws::AggregatedMessage;
use futures_util::StreamExt;
use tokio::sync::broadcast::error::RecvError;
pub async fn peers(
req: HttpRequest,
stream: web::Payload,
app_state: web::Data<AppState>,
) -> Result<HttpResponse, Error> {
let (res, mut session, msg_stream) =
actix_ws::handle(&req, stream).map_err(|e| Error::ws(e))?;
let mut msg_stream = msg_stream.aggregate_continuations();
let mut peer_rx = app_state.peer_updates.subscribe();
match app_state.storage.get_peers().await {
Ok(initial_peers) => {
let json = serde_json::to_string(&initial_peers).unwrap_or_else(|_| "[]".to_string());
if session.text(json).await.is_err() {
return Ok(res);
}
tracing::info!(
"sent initial peer list ({} peers) to new client",
initial_peers.len()
);
}
Err(e) => {
tracing::warn!("failed to fetch initial peers: {:?}", e);
session.close(None).await.ok();
return Ok(res);
}
}
rt::spawn(async move {
loop {
tokio::select! {
msg = msg_stream.next() => {
match msg {
Some(Ok(AggregatedMessage::Ping(data))) => {
if session.pong(&data).await.is_err() {
break;
}
}
Some(Ok(AggregatedMessage::Pong(_))) => {}
Some(Ok(AggregatedMessage::Close(_))) => {
break;
}
Some(Ok(AggregatedMessage::Text(_))) => {
}
Some(Ok(AggregatedMessage::Binary(_))) => {
}
Some(Err(_)) => {
break;
}
None => {
break;
}
}
}
update = peer_rx.recv() => {
match update {
Ok(peer_update) => {
let json = serde_json::to_string(&peer_update.peer)
.unwrap_or_else(|_| "{}".to_string());
if session.text(json).await.is_err() {
break;
}
tracing::info!("sent peer update to client: {}", peer_update.peer.public_key);
}
Err(RecvError::Lagged(n)) => {
tracing::warn!("client lagged, missed {} updates", n);
}
Err(RecvError::Closed) => {
break;
}
}
}
}
}
session.close(None).await.ok();
tracing::info!("client disconnected");
});
Ok(res)
}
use crate::{AppState, error::Error, storage::StorageImpl};
use actix_web::{HttpRequest, HttpResponse, rt, web};
use actix_ws::AggregatedMessage;
use futures_util::StreamExt;
+use tokio::sync::broadcast::error::RecvError;
pub async fn peers(
req: HttpRequest,
stream: web::Payload,
app_state: web::Data<AppState>,
) -> Result<HttpResponse, Error> {
let (res, mut session, msg_stream) =
actix_ws::handle(&req, stream).map_err(|e| Error::ws(e))?;
let mut msg_stream = msg_stream.aggregate_continuations();
let mut peer_rx = app_state.peer_updates.subscribe();
match app_state.storage.get_peers().await {
Ok(initial_peers) => {
let json = serde_json::to_string(&initial_peers).unwrap_or_else(|_| "[]".to_string());
if session.text(json).await.is_err() {
return Ok(res);
}
tracing::info!(
+ "sent initial peer list ({} peers) to new client",
initial_peers.len()
);
}
Err(e) => {
tracing::warn!("failed to fetch initial peers: {:?}", e);
+ session.close(None).await.ok();
return Ok(res);
}
}
rt::spawn(async move {
loop {
tokio::select! {
msg = msg_stream.next() => {
match msg {
Some(Ok(AggregatedMessage::Ping(data))) => {
if session.pong(&data).await.is_err() {
break;
}
}
Some(Ok(AggregatedMessage::Pong(_))) => {}
Some(Ok(AggregatedMessage::Close(_))) => {
break;
}
Some(Ok(AggregatedMessage::Text(_))) => {
}
Some(Ok(AggregatedMessage::Binary(_))) => {
}
Some(Err(_)) => {
break;
}
None => {
break;
}
}
}
update = peer_rx.recv() => {
match update {
Ok(peer_update) => {
let json = serde_json::to_string(&peer_update.peer)
.unwrap_or_else(|_| "{}".to_string());
if session.text(json).await.is_err() {
break;
}
+ tracing::info!("sent peer update to client: {}", peer_update.peer.public_key);
}
+ Err(RecvError::Lagged(n)) => {
+ tracing::warn!("client lagged, missed {} updates", n);
}
+ Err(RecvError::Closed) => {
break;
}
}
}
}
}
+ session.close(None).await.ok();
+ tracing::info!("client disconnected");
});
Ok(res)
}

View File

@@ -2,9 +2,9 @@ use crate::{
AppState, PeerUpdate,
error::Result,
storage::{RegisterRequest, StorageImpl},
utils::Peer,
};
use actix_web::{HttpResponse, web};
use registry::Peer;
pub async fn register_peer(
app_state: web::Data<AppState>,

View File

@@ -0,0 +1,92 @@
use crate::{AppState, error::Error, storage::StorageImpl};
use actix_web::{HttpRequest, HttpResponse, rt, web};
use actix_ws::AggregatedMessage;
use futures_util::StreamExt;
use registry::PeerMessage;
pub async fn peers(
req: HttpRequest,
stream: web::Payload,
app_state: web::Data<AppState>,
) -> Result<HttpResponse, Error> {
let (res, mut session, msg_stream) =
actix_ws::handle(&req, stream).map_err(|e| Error::ws(e))?;
let mut msg_stream = msg_stream.aggregate_continuations();
let mut peer_rx = app_state.peer_updates.subscribe();
match app_state.storage.get_peers().await {
Ok(initial_peers) => {
let msg = PeerMessage::HydratePeers {
peers: initial_peers.clone(),
};
let json = serde_json::to_string(&msg)
.unwrap_or_else(|_| r#"{"type":"HydratePeers","peers":[]}"#.to_string());
if session.text(json).await.is_err() {
return Ok(res);
}
tracing::info!(
"sent initial peer list ({} peers) to new WebSocket client",
initial_peers.len()
);
}
Err(e) => {
tracing::warn!("failed to fetch initial peers: {:?}", e);
session.close(None).await.ok();
return Ok(res);
}
}
rt::spawn(async move {
loop {
tokio::select! {
msg = msg_stream.next() => {
match msg {
Some(Ok(AggregatedMessage::Ping(data))) => {
if session.pong(&data).await.is_err() {
break;
}
}
Some(Ok(AggregatedMessage::Pong(_))) => {}
Some(Ok(AggregatedMessage::Close(_))) => {
break;
}
Some(Ok(AggregatedMessage::Text(_))) => {
}
Some(Ok(AggregatedMessage::Binary(_))) => {
}
Some(Err(_)) => {
break;
}
None => {
break;
}
}
}
update = peer_rx.recv() => {
match update {
Ok(peer_update) => {
let json = serde_json::to_string(&peer_update.peer)
.unwrap_or_else(|_| "{}".to_string());
if session.text(json).await.is_err() {
break;
}
tracing::info!("sent peer update to WebSocket client: {}", peer_update.peer.public_key);
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
tracing::warn!("WebSocket client lagged, missed {} updates", n);
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
break;
}
}
}
}
}
session.close(None).await.ok();
tracing::info!("WebSocket client disconnected");
});
Ok(res)
}

View File

@@ -37,6 +37,7 @@ pub enum ErrorKind {
impl ResponseError for Error {
fn error_response(&self) -> actix_web::HttpResponse<actix_web::body::BoxBody> {
match self.inner() {
ErrorKind::Ws(e) => e.error_response(),
_ => HttpResponse::InternalServerError().finish(),
}
}

4
registry/src/lib.rs Normal file
View File

@@ -0,0 +1,4 @@
mod types;
mod utils;
pub use types::peer_message::*;

View File

@@ -1,10 +1,12 @@
mod endpoints;
mod error;
mod storage;
mod types;
mod utils;
use actix_web::{App, HttpServer, web};
use console::style;
use registry::Peer;
use thiserror_ext::AsReport;
use tokio::sync::broadcast;
use tracing::level_filters::LevelFilter;
@@ -13,7 +15,6 @@ use tracing_subscriber::{
};
use crate::storage::{Storage, get_storage_from_env};
use crate::utils::Peer;
#[derive(Clone, Debug)]
pub struct PeerUpdate {

View File

@@ -1,10 +1,8 @@
use crate::{
error::{Error, Result},
utils::Peer,
};
use crate::error::{Error, Result};
mod valkey;
use registry::Peer;
pub use valkey::RegisterRequest;
pub enum Storage {

View File

@@ -3,10 +3,11 @@ use std::net::IpAddr;
use ipnetwork::IpNetwork;
use redis::AsyncTypedCommands;
use registry::Peer;
use serde::Deserialize;
use crate::error::Result;
use crate::utils::{Peer, WireguardPublicKey};
use crate::utils::WireguardPublicKey;
use crate::{error::Error, storage::StorageImpl};
pub struct ValkeyStorage {

View File

@@ -0,0 +1 @@
pub mod peer_message;

View File

@@ -1,6 +1,7 @@
use ipnetwork::IpNetwork;
use serde::{Deserialize, Serialize};
use ipnetwork::IpNetwork;
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct Peer {
pub public_key: String,
@@ -8,3 +9,10 @@ pub struct Peer {
pub port: String,
pub allowed_ips: Vec<IpNetwork>,
}
#[derive(Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum PeerMessage {
HydratePeers { peers: Vec<Peer> },
PeerUpdate { peer: Peer },
}

View File

@@ -1,5 +1,3 @@
mod peer;
mod wg;
pub use peer::Peer;
pub use wg::WireguardPublicKey;