diff --git a/Cargo.lock b/Cargo.lock index 8df1bb0..40872f9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -40,7 +40,7 @@ dependencies = [ "foldhash", "futures-core", "h2", - "http", + "http 0.2.12", "httparse", "httpdate", "itoa", @@ -76,7 +76,7 @@ checksum = "13d324164c51f63867b57e73ba5936ea151b8a41a1d23d1031eeb9f70d0236f8" dependencies = [ "bytestring", "cfg-if", - "http", + "http 0.2.12", "regex", "regex-lite", "serde", @@ -250,6 +250,28 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" +[[package]] +name = "aws-lc-rs" +version = "1.15.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b7b6141e96a8c160799cc2d5adecd5cbbe5054cb8c7c4af53da0f83bb7ad256" +dependencies = [ + "aws-lc-sys", + "zeroize", +] + +[[package]] +name = "aws-lc-sys" +version = "0.37.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c34dda4df7017c8db52132f0f8a2e0f8161649d15723ed63fc00c82d0f2081a" +dependencies = [ + "cc", + "cmake", + "dunce", + "fs_extra", +] + [[package]] name = "backon" version = "1.6.0" @@ -340,6 +362,33 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" +[[package]] +name = "client" +version = "0.1.0" +dependencies = [ + "console", + "futures", + "futures-util", + "registry", + "serde", + "serde_json", + "thiserror", + "thiserror-ext", + "tokio", + "tokio-tungstenite", + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "cmake" +version = "0.1.57" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75443c44cd6b379beb8c5b45d85d0773baf31cce901fe7bb252f4eff3008ef7d" +dependencies = [ + "cc", +] + [[package]] name = "combine" version = "4.6.7" @@ -387,6 +436,22 @@ dependencies = [ "version_check", ] +[[package]] +name = "core-foundation" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2a6cd9ae233e7f62ba4e9353e81a88df7fc8a5987b8d445b4d90c879bd156f6" +dependencies = [ + "core-foundation-sys", + "libc", +] + +[[package]] +name = "core-foundation-sys" +version = "0.8.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" + [[package]] name = "cpufeatures" version = "0.2.17" @@ -415,6 +480,12 @@ dependencies = [ "typenum", ] +[[package]] +name = "data-encoding" +version = "2.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7a1e2f27636f116493b8b860f5546edb47c8d8f8ea73e1d2a20be88e28d1fea" + [[package]] name = "deranged" version = "0.5.5" @@ -468,6 +539,12 @@ dependencies = [ "syn", ] +[[package]] +name = "dunce" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92773504d58c093f6de2459af4af33faa518c13451eb8f2b5698ed3d36e7c813" + [[package]] name = "either" version = "1.15.0" @@ -548,6 +625,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fs_extra" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" + [[package]] name = "futures" version = "0.3.31" @@ -647,6 +730,17 @@ dependencies = [ "version_check", ] +[[package]] +name = "getrandom" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff2abc00be7fca6ebc474524697ae276ad847ad0a6b3faa4bcb027e9a4614ad0" +dependencies = [ + "cfg-if", + "libc", + "wasi", +] + [[package]] name = "getrandom" version = "0.3.4" @@ -670,7 +764,7 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http", + "http 0.2.12", "indexmap", "slab", "tokio", @@ -695,6 +789,16 @@ dependencies = [ "itoa", ] +[[package]] +name = "http" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3ba2a386d7f85a81f119ad7498ebe444d2e22c2af0b86b069416ace48b3311a" +dependencies = [ + "bytes", + "itoa", +] + [[package]] name = "httparse" version = "1.10.1" @@ -846,7 +950,7 @@ version = "0.1.34" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9afb3de4395d6b3e67a780b6de64b51c978ecf11cb9a462c66be7d4ca9039d33" dependencies = [ - "getrandom", + "getrandom 0.3.4", "libc", ] @@ -874,9 +978,9 @@ checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" [[package]] name = "libc" -version = "0.2.180" +version = "0.2.181" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bcc35a38544a891a5f7c865aca548a982ccb3b8650a5b06d0fd33a10283c56fc" +checksum = "459427e2af2b9c839b132acb702a1c654d95e10f8c326bfc2ad11310e458b1c5" [[package]] name = "litemap" @@ -1014,6 +1118,12 @@ version = "1.21.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" +[[package]] +name = "openssl-probe" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c87def4c32ab89d880effc9e097653c8da5d6ef28e6b539d313baaacfbafcbe" + [[package]] name = "parking_lot" version = "0.12.5" @@ -1155,7 +1265,7 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "76afc826de14238e6e8c374ddcc1fa19e374fd8dd986b0d2af0d02377261d83c" dependencies = [ - "getrandom", + "getrandom 0.3.4", ] [[package]] @@ -1229,6 +1339,43 @@ version = "0.8.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a96887878f22d7bad8a3b6dc5b7440e0ada9a245242924394987b21cf2210a4c" +[[package]] +name = "registry" +version = "0.1.0" +dependencies = [ + "actix-web", + "actix-ws", + "base64", + "console", + "futures", + "futures-util", + "ipnetwork", + "redis", + "rustls", + "serde", + "serde_json", + "thiserror", + "thiserror-ext", + "tokio", + "tracing", + "tracing-actix-web", + "tracing-subscriber", +] + +[[package]] +name = "ring" +version = "0.17.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4689e6c2294d81e88dc6261c768b63bc4fcdb852be6d1352498b114f61383b7" +dependencies = [ + "cc", + "cfg-if", + "getrandom 0.2.17", + "libc", + "untrusted", + "windows-sys 0.52.0", +] + [[package]] name = "rustc_version" version = "0.4.1" @@ -1238,6 +1385,54 @@ dependencies = [ "semver", ] +[[package]] +name = "rustls" +version = "0.23.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c665f33d38cea657d9614f766881e4d510e0eda4239891eea56b4cadcf01801b" +dependencies = [ + "aws-lc-rs", + "log", + "once_cell", + "rustls-pki-types", + "rustls-webpki", + "subtle", + "zeroize", +] + +[[package]] +name = "rustls-native-certs" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "612460d5f7bea540c490b2b6395d8e34a953e52b491accd6c86c8164c5932a63" +dependencies = [ + "openssl-probe", + "rustls-pki-types", + "schannel", + "security-framework", +] + +[[package]] +name = "rustls-pki-types" +version = "1.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be040f8b0a225e40375822a563fa9524378b9d63112f53e19ffff34df5d33fdd" +dependencies = [ + "zeroize", +] + +[[package]] +name = "rustls-webpki" +version = "0.103.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7df23109aa6c1567d1c575b9952556388da57401e4ace1d15f79eedad0d8f53" +dependencies = [ + "aws-lc-rs", + "ring", + "rustls-pki-types", + "untrusted", +] + [[package]] name = "rustversion" version = "1.0.22" @@ -1246,9 +1441,18 @@ checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" [[package]] name = "ryu" -version = "1.0.22" +version = "1.0.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a50f4cf475b65d88e057964e0e9bb1f0aa9bbb2036dc65c64596b42932536984" +checksum = "9774ba4a74de5f7b1c1451ed6cd5285a32eddb5cccb8cc655a4e50009e06477f" + +[[package]] +name = "schannel" +version = "0.1.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "891d81b926048e76efe18581bf793546b4c0eaf8448d72be8de2bbee5fd166e1" +dependencies = [ + "windows-sys 0.61.2", +] [[package]] name = "scopeguard" @@ -1256,6 +1460,29 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "security-framework" +version = "3.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3297343eaf830f66ede390ea39da1d462b6b0c1b000f420d0a83f898bbbe6ef" +dependencies = [ + "bitflags", + "core-foundation", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "2.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc1f0cbffaac4852523ce30d8bd3c5cdc873501d96ff467ca09b6767bb8cd5c0" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "semver" version = "1.0.27" @@ -1403,6 +1630,12 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6ce2be8dc25455e1f91df71bfa12ad37d7af1092ae736f3a6cd0e37bc7810596" +[[package]] +name = "subtle" +version = "2.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" + [[package]] name = "syn" version = "2.0.114" @@ -1545,6 +1778,32 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-rustls" +version = "0.26.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1729aa945f29d91ba541258c8df89027d5792d85a8841fb65e8bf0f4ede4ef61" +dependencies = [ + "rustls", + "tokio", +] + +[[package]] +name = "tokio-tungstenite" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d25a406cddcc431a75d3d9afc6a7c0f7428d4891dd973e4d54c56b46127bf857" +dependencies = [ + "futures-util", + "log", + "rustls", + "rustls-native-certs", + "rustls-pki-types", + "tokio", + "tokio-rustls", + "tungstenite", +] + [[package]] name = "tokio-util" version = "0.7.18" @@ -1633,6 +1892,25 @@ dependencies = [ "tracing-log", ] +[[package]] +name = "tungstenite" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8628dcc84e5a09eb3d8423d6cb682965dea9133204e8fb3efee74c2a0c259442" +dependencies = [ + "bytes", + "data-encoding", + "http 1.4.0", + "httparse", + "log", + "rand", + "rustls", + "rustls-pki-types", + "sha1", + "thiserror", + "utf-8", +] + [[package]] name = "typenum" version = "1.19.0" @@ -1641,9 +1919,9 @@ checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb" [[package]] name = "unicode-ident" -version = "1.0.22" +version = "1.0.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9312f7c4f6ff9069b165498234ce8be658059c6728633667c526e27dc2cf1df5" +checksum = "537dd038a89878be9b64dd4bd1b260315c1bb94f4d784956b81e27a088d9a09e" [[package]] name = "unicode-segmentation" @@ -1663,6 +1941,12 @@ version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" +[[package]] +name = "untrusted" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" + [[package]] name = "url" version = "2.5.8" @@ -1675,6 +1959,12 @@ dependencies = [ "serde", ] +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "utf8_iter" version = "1.0.4" @@ -1687,7 +1977,7 @@ version = "1.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ee48d38b119b0cd71fe4141b30f5ba9c7c5d9f4e7a3a8b4a674e4b6ef789976f" dependencies = [ - "getrandom", + "getrandom 0.3.4", "js-sys", "wasm-bindgen", ] @@ -1764,28 +2054,6 @@ dependencies = [ "unicode-ident", ] -[[package]] -name = "wg-mesh" -version = "0.1.0" -dependencies = [ - "actix-web", - "actix-ws", - "base64", - "console", - "futures", - "futures-util", - "ipnetwork", - "redis", - "serde", - "serde_json", - "thiserror", - "thiserror-ext", - "tokio", - "tracing", - "tracing-actix-web", - "tracing-subscriber", -] - [[package]] name = "windows-link" version = "0.2.1" @@ -2030,6 +2298,12 @@ dependencies = [ "synstructure", ] +[[package]] +name = "zeroize" +version = "1.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b97154e67e32c85465826e8bcc1c59429aaaf107c1e4a9e53c8d8ccd5eff88d0" + [[package]] name = "zerotrie" version = "0.2.3" @@ -2065,9 +2339,9 @@ dependencies = [ [[package]] name = "zmij" -version = "1.0.19" +version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3ff05f8caa9038894637571ae6b9e29466c1f4f829d26c9b28f869a29cbe3445" +checksum = "4de98dfa5d5b7fef4ee834d0073d560c9ca7b6c46a71d058c48db7960f8cfaf7" [[package]] name = "zstd" diff --git a/Cargo.toml b/Cargo.toml index 7f532ef..5b0476e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,30 +1,5 @@ -[package] -name = "wg-mesh" -version = "0.1.0" -edition = "2024" +[workspace] +members = ["registry", "client"] +resolver = "3" -[dependencies] -actix-web = "4.12.1" -actix-ws = "0.3.1" -base64 = "0.22.1" -console = "0.16.2" -futures = "0.3.31" -futures-util = "0.3.31" -ipnetwork = { version = "0.21.1", features = ["serde"] } -redis = { version = "=1.0.2", features = ["connection-manager", "tokio-comp"] } -serde = { version = "1.0.228", features = ["derive"] } -serde_json = "1.0.149" -thiserror = "2.0.18" -thiserror-ext = "0.3.0" -tokio = { version = "1.49.0", features = ["macros", "sync"] } -tracing = "0.1.44" -tracing-actix-web = "0.7.21" -tracing-subscriber = { version = "0.3.22", features = ["env-filter"] } - -[[bin]] -name = "registry" -path = "registry/main.rs" - -[[bin]] -name = "client" -path = "client/main.rs" +[workspace.dependencies] diff --git a/bacon.toml b/bacon.toml index 8d5e596..192963a 100644 --- a/bacon.toml +++ b/bacon.toml @@ -4,3 +4,4 @@ need_stdout = true background = false on_change_strategy = "kill_then_restart" kill = ["kill", "-s", "INT"] +watch = ["registry/", "client/", "Cargo.toml"] diff --git a/client/Cargo.toml b/client/Cargo.toml new file mode 100644 index 0000000..6b2adf6 --- /dev/null +++ b/client/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "client" +version = "0.1.0" +edition = "2024" + +[dependencies] +console = "0.16.2" +futures = "0.3.31" +futures-util = "0.3.31" +serde = { version = "1.0.228", features = ["derive"] } +serde_json = "1.0.149" +thiserror = "2.0.18" +thiserror-ext = "0.3.0" +tokio = { version = "1.49.0", features = ["macros", "rt-multi-thread"] } +tokio-tungstenite = { version = "0.28.0", features = ["rustls-tls-native-roots"] } +tracing = "0.1.44" +tracing-subscriber = { version = "0.3.22", features = ["env-filter"] } +registry = { path = "../registry" } diff --git a/client/main.rs b/client/main.rs deleted file mode 100644 index f328e4d..0000000 --- a/client/main.rs +++ /dev/null @@ -1 +0,0 @@ -fn main() {} diff --git a/client/src/error.rs b/client/src/error.rs new file mode 100644 index 0000000..1d68dc7 --- /dev/null +++ b/client/src/error.rs @@ -0,0 +1,20 @@ +use thiserror::Error; +use thiserror_ext::{Box, Construct}; +use tokio_tungstenite::tungstenite; + +#[derive(Error, Debug, Box, Construct)] +#[thiserror_ext(newtype(name = Error))] +pub enum ErrorKind { + #[error("error connecting to websocket")] + WsConnect { + url: String, + #[source] + source: tungstenite::Error, + }, + #[error("error reading websocket msg")] + WsRead(#[source] tungstenite::Error), + #[error("error deserializing json")] + DeserializeJson(#[source] serde_json::Error), +} + +pub type Result = core::result::Result; diff --git a/client/src/main.rs b/client/src/main.rs new file mode 100644 index 0000000..cfe3e69 --- /dev/null +++ b/client/src/main.rs @@ -0,0 +1,63 @@ +use console::style; +use futures::StreamExt; +use registry::PeerMessage; +use thiserror_ext::AsReport; +use tokio_tungstenite::tungstenite::Message; +use tracing::level_filters::LevelFilter; +use tracing_subscriber::{ + EnvFilter, fmt::format::FmtSpan, layer::SubscriberExt, util::SubscriberInitExt, +}; + +use crate::error::Error; + +mod error; + +async fn run() -> crate::error::Result<()> { + let url = get_api_from_env(); + let (ws_stream, response) = tokio_tungstenite::connect_async(&url) + .await + .map_err(|e| Error::ws_connect(e, &url))?; + let (_, mut read) = ws_stream.split(); + tracing::info!("connected, response: {:?}", response.status()); + + while let Some(msg) = read.next().await { + match msg.map_err(|e| Error::ws_read(e))? { + Message::Text(text) => { + let server_msg: PeerMessage = + serde_json::from_str(&text).map_err(|e| Error::deserialize_json(e))?; + match server_msg { + PeerMessage::HydratePeers { peers } => {} + PeerMessage::PeerUpdate { peer } => {} + } + } + _ => {} + } + } + + Ok(()) +} + +#[tokio::main] +async fn main() { + let tracing_env_filter = EnvFilter::builder() + .with_default_directive(LevelFilter::INFO.into()) + .from_env_lossy(); + + tracing_subscriber::registry() + .with(tracing_env_filter) + .with( + tracing_subscriber::fmt::layer() + .compact() + .with_span_events(FmtSpan::CLOSE), + ) + .init(); + + if let Err(e) = run().await { + eprintln!("{}: {}", style("error").red(), e.as_report()); + std::process::exit(1) + } +} + +fn get_api_from_env() -> String { + return "ws://localhost:8080/ws/peers".to_string(); +} diff --git a/registry/Cargo.toml b/registry/Cargo.toml new file mode 100644 index 0000000..7b917f4 --- /dev/null +++ b/registry/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "registry" +version = "0.1.0" +edition = "2024" + +[dependencies] +actix-web = "4.12.1" +actix-ws = "0.3.1" +base64 = "0.22.1" +ipnetwork = { version = "0.21.1", features = ["serde"] } +redis = { version = "=1.0.2", features = ["connection-manager", "tokio-comp"] } +serde = { version = "1.0.228", features = ["derive"] } +serde_json = "1.0.149" +console = "0.16.2" +futures = "0.3.31" +futures-util = "0.3.31" +rustls = { version = "0.23.36", features = ["aws-lc-rs"] } +tracing = "0.1.44" +tracing-subscriber = { version = "0.3.22", features = ["env-filter"] } +tracing-actix-web = "0.7.21" +tokio = { version = "1.49.0", features = ["macros", "rt-multi-thread", "sync"] } +thiserror = "2.0.18" +thiserror-ext = "0.3.0" + +[lib] +name = "registry" +path = "src/lib.rs" diff --git a/registry/endpoints/ws/peers.rs b/registry/endpoints/ws/peers.rs index 3bf3b42..758b6aa 100644 --- a/registry/endpoints/ws/peers.rs +++ b/registry/endpoints/ws/peers.rs @@ -1,88 +1,88 @@ -use crate::{AppState, error::Error, storage::StorageImpl}; -use actix_web::{HttpRequest, HttpResponse, rt, web}; -use actix_ws::AggregatedMessage; -use futures_util::StreamExt; -use tokio::sync::broadcast::error::RecvError; - -pub async fn peers( - req: HttpRequest, - stream: web::Payload, - app_state: web::Data, -) -> Result { - let (res, mut session, msg_stream) = - actix_ws::handle(&req, stream).map_err(|e| Error::ws(e))?; - - let mut msg_stream = msg_stream.aggregate_continuations(); - - let mut peer_rx = app_state.peer_updates.subscribe(); - - match app_state.storage.get_peers().await { - Ok(initial_peers) => { - let json = serde_json::to_string(&initial_peers).unwrap_or_else(|_| "[]".to_string()); - if session.text(json).await.is_err() { - return Ok(res); - } - tracing::info!( - "sent initial peer list ({} peers) to new client", - initial_peers.len() - ); - } - Err(e) => { - tracing::warn!("failed to fetch initial peers: {:?}", e); - session.close(None).await.ok(); - return Ok(res); - } - } - - rt::spawn(async move { - loop { - tokio::select! { - msg = msg_stream.next() => { - match msg { - Some(Ok(AggregatedMessage::Ping(data))) => { - if session.pong(&data).await.is_err() { - break; - } - } - Some(Ok(AggregatedMessage::Pong(_))) => {} - Some(Ok(AggregatedMessage::Close(_))) => { - break; - } - Some(Ok(AggregatedMessage::Text(_))) => { - } - Some(Ok(AggregatedMessage::Binary(_))) => { - } - Some(Err(_)) => { - break; - } - None => { - break; - } - } - } - update = peer_rx.recv() => { - match update { - Ok(peer_update) => { - let json = serde_json::to_string(&peer_update.peer) - .unwrap_or_else(|_| "{}".to_string()); - if session.text(json).await.is_err() { - break; - } - tracing::info!("sent peer update to client: {}", peer_update.peer.public_key); - } - Err(RecvError::Lagged(n)) => { - tracing::warn!("client lagged, missed {} updates", n); - } - Err(RecvError::Closed) => { - break; - } - } - } - } - } - session.close(None).await.ok(); - tracing::info!("client disconnected"); - }); - - Ok(res) -} + use crate::{AppState, error::Error, storage::StorageImpl}; + use actix_web::{HttpRequest, HttpResponse, rt, web}; + use actix_ws::AggregatedMessage; + use futures_util::StreamExt; ++use tokio::sync::broadcast::error::RecvError; + + pub async fn peers( + req: HttpRequest, + stream: web::Payload, + app_state: web::Data, + ) -> Result { + let (res, mut session, msg_stream) = + actix_ws::handle(&req, stream).map_err(|e| Error::ws(e))?; + + let mut msg_stream = msg_stream.aggregate_continuations(); + + let mut peer_rx = app_state.peer_updates.subscribe(); + + match app_state.storage.get_peers().await { + Ok(initial_peers) => { + let json = serde_json::to_string(&initial_peers).unwrap_or_else(|_| "[]".to_string()); + if session.text(json).await.is_err() { + return Ok(res); + } + tracing::info!( ++ "sent initial peer list ({} peers) to new client", + initial_peers.len() + ); + } + Err(e) => { + tracing::warn!("failed to fetch initial peers: {:?}", e); ++ session.close(None).await.ok(); + return Ok(res); + } + } + + rt::spawn(async move { + loop { + tokio::select! { + msg = msg_stream.next() => { + match msg { + Some(Ok(AggregatedMessage::Ping(data))) => { + if session.pong(&data).await.is_err() { + break; + } + } + Some(Ok(AggregatedMessage::Pong(_))) => {} + Some(Ok(AggregatedMessage::Close(_))) => { + break; + } + Some(Ok(AggregatedMessage::Text(_))) => { + } + Some(Ok(AggregatedMessage::Binary(_))) => { + } + Some(Err(_)) => { + break; + } + None => { + break; + } + } + } + update = peer_rx.recv() => { + match update { + Ok(peer_update) => { + let json = serde_json::to_string(&peer_update.peer) + .unwrap_or_else(|_| "{}".to_string()); + if session.text(json).await.is_err() { + break; + } ++ tracing::info!("sent peer update to client: {}", peer_update.peer.public_key); + } ++ Err(RecvError::Lagged(n)) => { ++ tracing::warn!("client lagged, missed {} updates", n); + } ++ Err(RecvError::Closed) => { + break; + } + } + } + } + } ++ session.close(None).await.ok(); ++ tracing::info!("client disconnected"); + }); + + Ok(res) + } diff --git a/registry/endpoints/mod.rs b/registry/src/endpoints/mod.rs similarity index 100% rename from registry/endpoints/mod.rs rename to registry/src/endpoints/mod.rs diff --git a/registry/endpoints/peers.rs b/registry/src/endpoints/peers.rs similarity index 100% rename from registry/endpoints/peers.rs rename to registry/src/endpoints/peers.rs diff --git a/registry/endpoints/register.rs b/registry/src/endpoints/register.rs similarity index 97% rename from registry/endpoints/register.rs rename to registry/src/endpoints/register.rs index a77ad09..a9b5dd0 100644 --- a/registry/endpoints/register.rs +++ b/registry/src/endpoints/register.rs @@ -2,9 +2,9 @@ use crate::{ AppState, PeerUpdate, error::Result, storage::{RegisterRequest, StorageImpl}, - utils::Peer, }; use actix_web::{HttpResponse, web}; +use registry::Peer; pub async fn register_peer( app_state: web::Data, diff --git a/registry/endpoints/ws/mod.rs b/registry/src/endpoints/ws/mod.rs similarity index 100% rename from registry/endpoints/ws/mod.rs rename to registry/src/endpoints/ws/mod.rs diff --git a/registry/src/endpoints/ws/peers.rs b/registry/src/endpoints/ws/peers.rs new file mode 100644 index 0000000..597c8e7 --- /dev/null +++ b/registry/src/endpoints/ws/peers.rs @@ -0,0 +1,92 @@ +use crate::{AppState, error::Error, storage::StorageImpl}; +use actix_web::{HttpRequest, HttpResponse, rt, web}; +use actix_ws::AggregatedMessage; +use futures_util::StreamExt; +use registry::PeerMessage; + +pub async fn peers( + req: HttpRequest, + stream: web::Payload, + app_state: web::Data, +) -> Result { + let (res, mut session, msg_stream) = + actix_ws::handle(&req, stream).map_err(|e| Error::ws(e))?; + + let mut msg_stream = msg_stream.aggregate_continuations(); + + let mut peer_rx = app_state.peer_updates.subscribe(); + + match app_state.storage.get_peers().await { + Ok(initial_peers) => { + let msg = PeerMessage::HydratePeers { + peers: initial_peers.clone(), + }; + let json = serde_json::to_string(&msg) + .unwrap_or_else(|_| r#"{"type":"HydratePeers","peers":[]}"#.to_string()); + if session.text(json).await.is_err() { + return Ok(res); + } + tracing::info!( + "sent initial peer list ({} peers) to new WebSocket client", + initial_peers.len() + ); + } + Err(e) => { + tracing::warn!("failed to fetch initial peers: {:?}", e); + session.close(None).await.ok(); + return Ok(res); + } + } + + rt::spawn(async move { + loop { + tokio::select! { + msg = msg_stream.next() => { + match msg { + Some(Ok(AggregatedMessage::Ping(data))) => { + if session.pong(&data).await.is_err() { + break; + } + } + Some(Ok(AggregatedMessage::Pong(_))) => {} + Some(Ok(AggregatedMessage::Close(_))) => { + break; + } + Some(Ok(AggregatedMessage::Text(_))) => { + } + Some(Ok(AggregatedMessage::Binary(_))) => { + } + Some(Err(_)) => { + break; + } + None => { + break; + } + } + } + update = peer_rx.recv() => { + match update { + Ok(peer_update) => { + let json = serde_json::to_string(&peer_update.peer) + .unwrap_or_else(|_| "{}".to_string()); + if session.text(json).await.is_err() { + break; + } + tracing::info!("sent peer update to WebSocket client: {}", peer_update.peer.public_key); + } + Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { + tracing::warn!("WebSocket client lagged, missed {} updates", n); + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => { + break; + } + } + } + } + } + session.close(None).await.ok(); + tracing::info!("WebSocket client disconnected"); + }); + + Ok(res) +} diff --git a/registry/error.rs b/registry/src/error.rs similarity index 96% rename from registry/error.rs rename to registry/src/error.rs index 6d277a7..4162477 100644 --- a/registry/error.rs +++ b/registry/src/error.rs @@ -37,6 +37,7 @@ pub enum ErrorKind { impl ResponseError for Error { fn error_response(&self) -> actix_web::HttpResponse { match self.inner() { + ErrorKind::Ws(e) => e.error_response(), _ => HttpResponse::InternalServerError().finish(), } } diff --git a/registry/src/lib.rs b/registry/src/lib.rs new file mode 100644 index 0000000..0aeb8a0 --- /dev/null +++ b/registry/src/lib.rs @@ -0,0 +1,4 @@ +mod types; +mod utils; + +pub use types::peer_message::*; diff --git a/registry/main.rs b/registry/src/main.rs similarity index 98% rename from registry/main.rs rename to registry/src/main.rs index ecd5491..c722630 100644 --- a/registry/main.rs +++ b/registry/src/main.rs @@ -1,10 +1,12 @@ mod endpoints; mod error; mod storage; +mod types; mod utils; use actix_web::{App, HttpServer, web}; use console::style; +use registry::Peer; use thiserror_ext::AsReport; use tokio::sync::broadcast; use tracing::level_filters::LevelFilter; @@ -13,7 +15,6 @@ use tracing_subscriber::{ }; use crate::storage::{Storage, get_storage_from_env}; -use crate::utils::Peer; #[derive(Clone, Debug)] pub struct PeerUpdate { diff --git a/registry/storage/mod.rs b/registry/src/storage/mod.rs similarity index 93% rename from registry/storage/mod.rs rename to registry/src/storage/mod.rs index 8c8cc33..340a478 100644 --- a/registry/storage/mod.rs +++ b/registry/src/storage/mod.rs @@ -1,10 +1,8 @@ -use crate::{ - error::{Error, Result}, - utils::Peer, -}; +use crate::error::{Error, Result}; mod valkey; +use registry::Peer; pub use valkey::RegisterRequest; pub enum Storage { diff --git a/registry/storage/valkey.rs b/registry/src/storage/valkey.rs similarity index 98% rename from registry/storage/valkey.rs rename to registry/src/storage/valkey.rs index ba81920..2cdcadb 100644 --- a/registry/storage/valkey.rs +++ b/registry/src/storage/valkey.rs @@ -3,10 +3,11 @@ use std::net::IpAddr; use ipnetwork::IpNetwork; use redis::AsyncTypedCommands; +use registry::Peer; use serde::Deserialize; use crate::error::Result; -use crate::utils::{Peer, WireguardPublicKey}; +use crate::utils::WireguardPublicKey; use crate::{error::Error, storage::StorageImpl}; pub struct ValkeyStorage { diff --git a/registry/src/types/mod.rs b/registry/src/types/mod.rs new file mode 100644 index 0000000..6d71daa --- /dev/null +++ b/registry/src/types/mod.rs @@ -0,0 +1 @@ +pub mod peer_message; diff --git a/registry/utils/peer.rs b/registry/src/types/peer_message.rs similarity index 61% rename from registry/utils/peer.rs rename to registry/src/types/peer_message.rs index 059b1c9..abd9f86 100644 --- a/registry/utils/peer.rs +++ b/registry/src/types/peer_message.rs @@ -1,6 +1,7 @@ -use ipnetwork::IpNetwork; use serde::{Deserialize, Serialize}; +use ipnetwork::IpNetwork; + #[derive(Serialize, Deserialize, Clone, Debug)] pub struct Peer { pub public_key: String, @@ -8,3 +9,10 @@ pub struct Peer { pub port: String, pub allowed_ips: Vec, } + +#[derive(Serialize, Deserialize)] +#[serde(tag = "type")] +pub enum PeerMessage { + HydratePeers { peers: Vec }, + PeerUpdate { peer: Peer }, +} diff --git a/registry/utils/mod.rs b/registry/src/utils/mod.rs similarity index 57% rename from registry/utils/mod.rs rename to registry/src/utils/mod.rs index 00a4845..c0cdc11 100644 --- a/registry/utils/mod.rs +++ b/registry/src/utils/mod.rs @@ -1,5 +1,3 @@ -mod peer; mod wg; -pub use peer::Peer; pub use wg::WireguardPublicKey; diff --git a/registry/utils/wg.rs b/registry/src/utils/wg.rs similarity index 100% rename from registry/utils/wg.rs rename to registry/src/utils/wg.rs