diff --git a/Cargo.lock b/Cargo.lock index 0a4e8eb5..fbd4e6be 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -338,10 +338,10 @@ dependencies = [ "axum-core", "bytes", "futures-util", - "http", - "http-body", + "http 1.3.1", + "http-body 1.0.1", "http-body-util", - "hyper", + "hyper 1.6.0", "hyper-util", "itoa", "matchit", @@ -354,7 +354,7 @@ dependencies = [ "serde_json", "serde_path_to_error", "serde_urlencoded", - "sync_wrapper", + "sync_wrapper 1.0.2", "tokio", "tower", "tower-layer", @@ -371,13 +371,13 @@ dependencies = [ "async-trait", "bytes", "futures-util", - "http", - "http-body", + "http 1.3.1", + "http-body 1.0.1", "http-body-util", "mime", "pin-project-lite", "rustversion", - "sync_wrapper", + "sync_wrapper 1.0.2", "tower-layer", "tower-service", "tracing", @@ -418,6 +418,12 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "base64" +version = "0.21.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" + [[package]] name = "base64" version = "0.22.1" @@ -444,6 +450,12 @@ dependencies = [ "serde", ] +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + [[package]] name = "bitflags" version = "2.9.1" @@ -641,6 +653,16 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75" +[[package]] +name = "colored" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "117725a109d387c937a1533ce01b450cbde6b88abceea8473c4d7a85853cda3c" +dependencies = [ + "lazy_static", + "windows-sys 0.59.0", +] + [[package]] name = "colored" version = "3.0.0" @@ -665,6 +687,17 @@ version = "0.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" +[[package]] +name = "cookie" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7efb37c3e1ccb1ff97164ad95ac1606e8ccd35b3fa0a7d99a304c7f4a428cc24" +dependencies = [ + "percent-encoding", + "time", + "version_check", +] + [[package]] name = "cookie" version = "0.18.1" @@ -676,15 +709,32 @@ dependencies = [ "version_check", ] +[[package]] +name = "cookie_store" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "387461abbc748185c3a6e1673d826918b450b87ff22639429c694619a83b6cf6" +dependencies = [ + "cookie 0.17.0", + "idna 0.3.0", + "log", + "publicsuffix", + "serde", + "serde_derive", + "serde_json", + "time", + "url", +] + [[package]] name = "cookie_store" version = "0.21.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2eac901828f88a5241ee0600950ab981148a18f2f756900ffba1b125ca6a3ef9" dependencies = [ - "cookie", + "cookie 0.18.1", "document-features", - "idna", + "idna 1.0.3", "log", "publicsuffix", "serde", @@ -793,6 +843,20 @@ dependencies = [ "syn 2.0.101", ] +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "der" version = "0.7.10" @@ -856,7 +920,7 @@ dependencies = [ "jsonwebtoken", "log", "mockito", - "reqwest", + "reqwest 0.12.19", "sea-orm", "serde", "serde_json", @@ -931,6 +995,29 @@ dependencies = [ "utoipa", ] +[[package]] +name = "env_filter" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bf3c259d255ca70051b30e2e95b5446cdb8949ac4cd22c0d7fd634d89f568e2" +dependencies = [ + "log", + "regex", +] + +[[package]] +name = "env_logger" +version = "0.11.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13c863f0904021b108aa8b2f55046443e6b1ebde8fd4a15c399893aae4fa069f" +dependencies = [ + "anstream", + "anstyle", + "env_filter", + "jiff", + "log", +] + [[package]] name = "equivalent" version = "1.0.2" @@ -985,6 +1072,22 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "eventsource-client" +version = "0.12.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c80c6714d1a380314fcb11a22eeff022e1e1c9642f0bb54e15dc9cb29f37b29" +dependencies = [ + "futures", + "hyper 0.14.32", + "hyper-rustls 0.24.2", + "hyper-timeout", + "log", + "pin-project", + "rand 0.8.5", + "tokio", +] + [[package]] name = "fastrand" version = "2.3.0" @@ -1218,6 +1321,25 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "h2" +version = "0.3.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0beca50380b1fc32983fc1cb4587bfa4bb9e78fc259aad4a0032d2080309222d" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http 0.2.12", + "indexmap", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "h2" version = "0.4.10" @@ -1229,7 +1351,7 @@ dependencies = [ "fnv", "futures-core", "futures-sink", - "http", + "http 1.3.1", "indexmap", "slab", "tokio", @@ -1246,6 +1368,12 @@ dependencies = [ "ahash", ] +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" + [[package]] name = "hashbrown" version = "0.15.3" @@ -1317,6 +1445,17 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "http" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "601cbb57e577e2f5ef5be8e7b83f0f63994f25aa94d673e54a92d5c516d101f1" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + [[package]] name = "http" version = "1.3.1" @@ -1328,6 +1467,17 @@ dependencies = [ "itoa", ] +[[package]] +name = "http-body" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" +dependencies = [ + "bytes", + "http 0.2.12", + "pin-project-lite", +] + [[package]] name = "http-body" version = "1.0.1" @@ -1335,7 +1485,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" dependencies = [ "bytes", - "http", + "http 1.3.1", ] [[package]] @@ -1346,8 +1496,8 @@ checksum = "b021d93e26becf5dc7e1b75b1bed1fd93124b374ceb73f43d4d4eafec896a64a" dependencies = [ "bytes", "futures-core", - "http", - "http-body", + "http 1.3.1", + "http-body 1.0.1", "pin-project-lite", ] @@ -1369,6 +1519,30 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" +[[package]] +name = "hyper" +version = "0.14.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41dfc780fdec9373c01bae43289ea34c972e40ee3c9f6b3c8801a35f35586ce7" +dependencies = [ + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "h2 0.3.27", + "http 0.2.12", + "http-body 0.4.6", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "socket2", + "tokio", + "tower-service", + "tracing", + "want", +] + [[package]] name = "hyper" version = "1.6.0" @@ -1378,9 +1552,9 @@ dependencies = [ "bytes", "futures-channel", "futures-util", - "h2", - "http", - "http-body", + "h2 0.4.10", + "http 1.3.1", + "http-body 1.0.1", "httparse", "httpdate", "itoa", @@ -1390,23 +1564,64 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-rustls" +version = "0.24.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" +dependencies = [ + "futures-util", + "http 0.2.12", + "hyper 0.14.32", + "log", + "rustls 0.21.12", + "rustls-native-certs", + "tokio", + "tokio-rustls 0.24.1", +] + [[package]] name = "hyper-rustls" version = "0.27.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3c93eb611681b207e1fe55d5a71ecf91572ec8a6705cdb6857f7d8d5242cf58" dependencies = [ - "http", - "hyper", + "http 1.3.1", + "hyper 1.6.0", "hyper-util", - "rustls", + "rustls 0.23.27", "rustls-pki-types", "tokio", - "tokio-rustls", + "tokio-rustls 0.26.2", "tower-service", "webpki-roots 1.0.0", ] +[[package]] +name = "hyper-timeout" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" +dependencies = [ + "hyper 0.14.32", + "pin-project-lite", + "tokio", + "tokio-io-timeout", +] + +[[package]] +name = "hyper-tls" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" +dependencies = [ + "bytes", + "hyper 0.14.32", + "native-tls", + "tokio", + "tokio-native-tls", +] + [[package]] name = "hyper-tls" version = "0.6.0" @@ -1415,7 +1630,7 @@ checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" dependencies = [ "bytes", "http-body-util", - "hyper", + "hyper 1.6.0", "hyper-util", "native-tls", "tokio", @@ -1429,20 +1644,20 @@ version = "0.1.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc2fdfdbff08affe55bb779f33b053aa1fe5dd5b54c257343c17edfa55711bdb" dependencies = [ - "base64", + "base64 0.22.1", "bytes", "futures-channel", "futures-core", "futures-util", - "http", - "http-body", - "hyper", + "http 1.3.1", + "http-body 1.0.1", + "hyper 1.6.0", "ipnet", "libc", "percent-encoding", "pin-project-lite", "socket2", - "system-configuration", + "system-configuration 0.6.1", "tokio", "tower-service", "tracing", @@ -1565,6 +1780,16 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" +[[package]] +name = "idna" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e14ddfc70884202db2244c223200c204c2bda1bc6e0998d11b5e024d657209e6" +dependencies = [ + "unicode-bidi", + "unicode-normalization", +] + [[package]] name = "idna" version = "1.0.3" @@ -1636,6 +1861,30 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" +[[package]] +name = "jiff" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49cce2b81f2098e7e3efc35bc2e0a6b7abec9d34128283d7a26fa8f32a6dbb35" +dependencies = [ + "jiff-static", + "log", + "portable-atomic", + "portable-atomic-util", + "serde_core", +] + +[[package]] +name = "jiff-static" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "980af8b43c3ad5d8d349ace167ec8170839f753a42d233ba19e08afe1850fa69" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.101", +] + [[package]] name = "js-sys" version = "0.3.77" @@ -1652,7 +1901,7 @@ version = "9.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a87cc7a48537badeae96744432de36f4be2b4a34a05a5ef32e9dd8a1c169dde" dependencies = [ - "base64", + "base64 0.22.1", "js-sys", "pem", "ring", @@ -1835,12 +2084,12 @@ checksum = "7760e0e418d9b7e5777c0374009ca4c93861b9066f18cb334a20ce50ab63aa48" dependencies = [ "assert-json-diff", "bytes", - "colored", + "colored 3.0.0", "futures-util", - "http", - "http-body", + "http 1.3.1", + "http-body 1.0.1", "http-body-util", - "hyper", + "hyper 1.6.0", "hyper-util", "log", "rand 0.9.1", @@ -1967,7 +2216,7 @@ version = "0.10.73" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8505734d46c8ab1e19a1dce3aef597ad87dcb4c37e7188231769bd6bd51cebf8" dependencies = [ - "bitflags", + "bitflags 2.9.1", "cfg-if", "foreign-types", "libc", @@ -2118,7 +2367,7 @@ version = "3.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "38af38e8470ac9dee3ce1bae1af9c1671fffc44ddfd8bd1d0a3445bf349a8ef3" dependencies = [ - "base64", + "base64 0.22.1", "serde", ] @@ -2146,6 +2395,26 @@ dependencies = [ "serde", ] +[[package]] +name = "pin-project" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.101", +] + [[package]] name = "pin-project-lite" version = "0.2.16" @@ -2211,6 +2480,21 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "portable-atomic" +version = "1.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f84267b20a16ea918e43c6a88433c2d54fa145c92a811b5b047ccbe153674483" + +[[package]] +name = "portable-atomic-util" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8a2f0d8d040d7848a709caf78912debcc3f33ee4b3cac47d73d1e1069e83507" +dependencies = [ + "portable-atomic", +] + [[package]] name = "potential_utf" version = "0.1.2" @@ -2344,7 +2628,7 @@ version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6f42ea446cab60335f76979ec15e12619a2165b5ae2c12166bef27d283a9fadf" dependencies = [ - "idna", + "idna 1.0.3", "psl-types", ] @@ -2360,7 +2644,7 @@ dependencies = [ "quinn-proto", "quinn-udp", "rustc-hash", - "rustls", + "rustls 0.23.27", "socket2", "thiserror 2.0.12", "tokio", @@ -2380,7 +2664,7 @@ dependencies = [ "rand 0.9.1", "ring", "rustc-hash", - "rustls", + "rustls 0.23.27", "rustls-pki-types", "slab", "thiserror 2.0.12", @@ -2489,7 +2773,7 @@ version = "0.5.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "928fca9cf2aa042393a8325b9ead81d2f0df4cb12e1e24cef072922ccd99c5af" dependencies = [ - "bitflags", + "bitflags 2.9.1", ] [[package]] @@ -2502,6 +2786,7 @@ dependencies = [ "openssl-sys", "service", "simplelog", + "sse", "tokio", "web", ] @@ -2559,25 +2844,67 @@ dependencies = [ "bytecheck", ] +[[package]] +name = "reqwest" +version = "0.11.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd67538700a17451e7cba03ac727fb961abb7607553461627b97de0b89cf4a62" +dependencies = [ + "base64 0.21.7", + "bytes", + "cookie 0.17.0", + "cookie_store 0.20.0", + "encoding_rs", + "futures-core", + "futures-util", + "h2 0.3.27", + "http 0.2.12", + "http-body 0.4.6", + "hyper 0.14.32", + "hyper-tls 0.5.0", + "ipnet", + "js-sys", + "log", + "mime", + "native-tls", + "once_cell", + "percent-encoding", + "pin-project-lite", + "rustls-pemfile", + "serde", + "serde_json", + "serde_urlencoded", + "sync_wrapper 0.1.2", + "system-configuration 0.5.1", + "tokio", + "tokio-native-tls", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "winreg", +] + [[package]] name = "reqwest" version = "0.12.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2f8e5513d63f2e5b386eb5106dc67eaf3f84e95258e210489136b8b92ad6119" dependencies = [ - "base64", + "base64 0.22.1", "bytes", - "cookie", - "cookie_store", + "cookie 0.18.1", + "cookie_store 0.21.1", "encoding_rs", "futures-core", - "h2", - "http", - "http-body", + "h2 0.4.10", + "http 1.3.1", + "http-body 1.0.1", "http-body-util", - "hyper", - "hyper-rustls", - "hyper-tls", + "hyper 1.6.0", + "hyper-rustls 0.27.7", + "hyper-tls 0.6.0", "hyper-util", "ipnet", "js-sys", @@ -2588,15 +2915,15 @@ dependencies = [ "percent-encoding", "pin-project-lite", "quinn", - "rustls", + "rustls 0.23.27", "rustls-pki-types", "serde", "serde_json", "serde_urlencoded", - "sync_wrapper", + "sync_wrapper 1.0.2", "tokio", "tokio-native-tls", - "tokio-rustls", + "tokio-rustls 0.26.2", "tower", "tower-http", "tower-service", @@ -2726,13 +3053,25 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c71e83d6afe7ff64890ec6b71d6a69bb8a610ab78ce364b3352876bb4c801266" dependencies = [ - "bitflags", + "bitflags 2.9.1", "errno", "libc", "linux-raw-sys", "windows-sys 0.59.0", ] +[[package]] +name = "rustls" +version = "0.21.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e" +dependencies = [ + "log", + "ring", + "rustls-webpki 0.101.7", + "sct", +] + [[package]] name = "rustls" version = "0.23.27" @@ -2742,11 +3081,32 @@ dependencies = [ "once_cell", "ring", "rustls-pki-types", - "rustls-webpki", + "rustls-webpki 0.103.3", "subtle", "zeroize", ] +[[package]] +name = "rustls-native-certs" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00" +dependencies = [ + "openssl-probe", + "rustls-pemfile", + "schannel", + "security-framework", +] + +[[package]] +name = "rustls-pemfile" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c74cae0a4cf6ccbbf5f359f08efdf8ee7e1dc532573bf0db71968cb56b1448c" +dependencies = [ + "base64 0.21.7", +] + [[package]] name = "rustls-pki-types" version = "1.12.0" @@ -2757,6 +3117,16 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls-webpki" +version = "0.101.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "rustls-webpki" version = "0.103.3" @@ -2804,6 +3174,16 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "sct" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "sdd" version = "3.0.10" @@ -2984,7 +3364,7 @@ version = "2.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" dependencies = [ - "bitflags", + "bitflags 2.9.1", "core-foundation", "core-foundation-sys", "libc", @@ -3012,18 +3392,28 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.219" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f0e2c6ed6606019b4e29e69dbaba95b11854410e5347d525002456dbbb786b6" +checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e" +dependencies = [ + "serde_core", + "serde_derive", +] + +[[package]] +name = "serde_core" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.219" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00" +checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" dependencies = [ "proc-macro2", "quote", @@ -3102,6 +3492,7 @@ dependencies = [ "serde_json", "simplelog", "sqlx", + "sse", "tokio", "tower", "utoipa", @@ -3274,7 +3665,7 @@ version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ee6798b1838b6a0f69c007c133b8df5866302197e404e8b6ee8ed3e3a5e68dc6" dependencies = [ - "base64", + "base64 0.22.1", "bigdecimal", "bytes", "chrono", @@ -3295,7 +3686,7 @@ dependencies = [ "once_cell", "percent-encoding", "rust_decimal", - "rustls", + "rustls 0.23.27", "serde", "serde_json", "sha2", @@ -3355,9 +3746,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "aa003f0038df784eb8fecbbac13affe3da23b45194bd57dba231c8f48199c526" dependencies = [ "atoi", - "base64", + "base64 0.22.1", "bigdecimal", - "bitflags", + "bitflags 2.9.1", "byteorder", "bytes", "chrono", @@ -3402,9 +3793,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "db58fcd5a53cf07c184b154801ff91347e4c30d17a3562a635ff028ad5deda46" dependencies = [ "atoi", - "base64", + "base64 0.22.1", "bigdecimal", - "bitflags", + "bitflags 2.9.1", "byteorder", "chrono", "crc", @@ -3465,6 +3856,38 @@ dependencies = [ "uuid", ] +[[package]] +name = "sse" +version = "1.0.0-beta2" +dependencies = [ + "async-stream", + "axum", + "dashmap", + "log", + "serde", + "serde_json", + "tokio", + "uuid", +] + +[[package]] +name = "sse-test-client" +version = "0.1.0" +dependencies = [ + "anyhow", + "clap", + "colored 2.2.0", + "env_logger", + "eventsource-client", + "futures-util", + "log", + "reqwest 0.11.27", + "serde", + "serde_json", + "tokio", + "uuid", +] + [[package]] name = "stable_deref_trait" version = "1.2.0" @@ -3528,6 +3951,12 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" + [[package]] name = "sync_wrapper" version = "1.0.2" @@ -3548,15 +3977,36 @@ dependencies = [ "syn 2.0.101", ] +[[package]] +name = "system-configuration" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7" +dependencies = [ + "bitflags 1.3.2", + "core-foundation", + "system-configuration-sys 0.5.0", +] + [[package]] name = "system-configuration" version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c879d448e9d986b661742763247d3693ed13609438cf3d006f51f5368a5ba6b" dependencies = [ - "bitflags", + "bitflags 2.9.1", "core-foundation", - "system-configuration-sys", + "system-configuration-sys 0.6.0", +] + +[[package]] +name = "system-configuration-sys" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75fb188eb626b924683e3b95e3a48e63551fcfb51949de2f06a9d91dbee93c9" +dependencies = [ + "core-foundation-sys", + "libc", ] [[package]] @@ -3723,6 +4173,16 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "tokio-io-timeout" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bd86198d9ee903fedd2f9a2e72014287c0d9167e4ae43b5853007205dda1b76" +dependencies = [ + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-macros" version = "2.5.0" @@ -3744,13 +4204,23 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-rustls" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" +dependencies = [ + "rustls 0.21.12", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.26.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e727b36a1a0e8b74c376ac2211e40c2c8af09fb4013c60d910495810f008e9b" dependencies = [ - "rustls", + "rustls 0.23.27", "tokio", ] @@ -3804,7 +4274,7 @@ dependencies = [ "futures-core", "futures-util", "pin-project-lite", - "sync_wrapper", + "sync_wrapper 1.0.2", "tokio", "tower-layer", "tower-service", @@ -3819,9 +4289,9 @@ checksum = "4fd0118512cf0b3768f7fcccf0bef1ae41d68f2b45edc1e77432b36c97c56c6d" dependencies = [ "async-trait", "axum-core", - "cookie", + "cookie 0.18.1", "futures-util", - "http", + "http 1.3.1", "parking_lot", "pin-project-lite", "tower-layer", @@ -3834,12 +4304,12 @@ version = "0.6.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "adc82fd73de2a9722ac5da747f12383d2bfdb93591ee6c58486e0097890f05f2" dependencies = [ - "bitflags", + "bitflags 2.9.1", "bytes", "futures-core", "futures-util", - "http", - "http-body", + "http 1.3.1", + "http-body 1.0.1", "http-body-util", "http-range-header", "httpdate", @@ -3875,7 +4345,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "65856c81ee244e0f8a55ab0f7b769b72fbde387c235f0a73cd97c579818d05eb" dependencies = [ "async-trait", - "http", + "http 1.3.1", "time", "tokio", "tower-cookies", @@ -3894,9 +4364,9 @@ checksum = "fb6abbfcaf6436ec5a772cd9f965401da12db793e404ae6134eac066fa5a04f3" dependencies = [ "async-trait", "axum-core", - "base64", + "base64 0.22.1", "futures", - "http", + "http 1.3.1", "parking_lot", "rand 0.8.5", "serde", @@ -4044,7 +4514,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32f8b686cadd1473f4bd0117a5d28d36b1ade384ea9b5069a1c40aefed7fda60" dependencies = [ "form_urlencoded", - "idna", + "idna 1.0.3", "percent-encoding", ] @@ -4240,6 +4710,7 @@ name = "web" version = "1.0.0-beta2" dependencies = [ "anyhow", + "async-stream", "async-trait", "axum", "axum-login", @@ -4248,12 +4719,13 @@ dependencies = [ "futures", "log", "password-auth", - "reqwest", + "reqwest 0.12.19", "sea-orm", "serde", "serde_json", "service", "sqlx", + "sse", "time", "tokio", "tower", @@ -4548,13 +5020,23 @@ dependencies = [ "memchr", ] +[[package]] +name = "winreg" +version = "0.50.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1" +dependencies = [ + "cfg-if", + "windows-sys 0.48.0", +] + [[package]] name = "wit-bindgen-rt" version = "0.39.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6f42320e61fe2cfd34354ecb597f86f413484a798ba44a8ca1165c58d42da6c1" dependencies = [ - "bitflags", + "bitflags 2.9.1", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index dbc61997..b2822316 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,12 +6,16 @@ edition = "2021" default-run = "refactor_platform_rs" [workspace] -members = [".", "entity_api", "entity", "migration", "service", "web", "domain"] +members = [".", "entity_api", "entity", "migration", "service", "web", "domain", "sse", "sse-test-client"] +# Exclude sse-test-client from default builds - it's a development/testing tool only +# and should not be built or deployed in production environments +default-members = [".", "entity_api", "entity", "migration", "service", "web", "domain", "sse"] [dependencies] service = { path = "service" } entity_api = { path = "entity_api" } web = { path = "web" } +sse = { path = "sse" } clap = { version = "4.5.20", features = ["cargo", "derive", "env"] } log = "0.4.22" diff --git a/README.md b/README.md index f95e0184..7d5966c5 100644 --- a/README.md +++ b/README.md @@ -178,6 +178,8 @@ _For additional commands, database utilities, and debugging tips, check the [Con `service` - CLI flags, environment variables, config handling and backend daemon setup +`sse` - Server-Sent Events infrastructure for real-time notifications. In-memory connection management (single-instance only) + `src` - contains a main function that initializes logging and calls all sub-services `web` - API endpoint definition, routing, handling of request/responses, controllers diff --git a/docker-compose.yaml b/docker-compose.yaml index 448309a2..73131a4c 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -54,6 +54,23 @@ services: networks: - backend_network + ###################################################### + # CRITICAL: SSE Connection Management Limitation + # + # The rust-app service MUST run as a single instance (replicas: 1) + # because SSE connections are tracked in-memory using DashMap. + # + # ⚠️ DO NOT SCALE HORIZONTALLY WITHOUT REDIS PUB/SUB ⚠️ + # + # If you need to scale beyond 1 replica: + # 1. Add Redis service to docker-compose.yaml + # 2. Update SseManager to use Redis Pub/Sub + # 3. See docs/implementation-plans/sse-communication.md + # "Multi-Instance Architecture" section + # + # Symptom if misconfigured: SSE events randomly fail + # (~50% with 2 replicas, ~67% with 3 replicas, etc.) + ###################################################### rust-app: image: ${BACKEND_IMAGE_NAME} build: diff --git a/docs/architecture/crate_dependency_graph.md b/docs/architecture/crate_dependency_graph.md index 59384b53..7efc3c61 100644 --- a/docs/architecture/crate_dependency_graph.md +++ b/docs/architecture/crate_dependency_graph.md @@ -1,10 +1,14 @@ This diagram represents the dependency structure of the crates in this project. Each arrow indicates a dependency relationship between the crates. For example, the `web` crate depends on both the `domain` and `service` crates, while the `entity_api` crate depends on the `entity` and `service` crates. +The `sse` crate is standalone with no domain dependencies, using generic types to avoid circular dependencies. + ```mermaid graph TD; web-->domain; web-->service; + web-->sse; + service-->sse; domain-->entity_api; entity_api-->entity; entity_api-->service; diff --git a/docs/architecture/network_flow_diagram.md b/docs/architecture/network_flow_diagram.md index 8480aafd..5ee1c46f 100644 --- a/docs/architecture/network_flow_diagram.md +++ b/docs/architecture/network_flow_diagram.md @@ -38,6 +38,7 @@ flowchart TB 4. **API Forwarding**: NextJS forwards API calls to Axum backend on port 4000 5. **Backend Processing**: Axum backend handles API endpoints like `/api/login` with secure caching 6. **Database Operations**: Backend connects to Digital Ocean Managed PostgreSQL +7. **SSE Connections**: Long-lived `/api/sse` connections for real-time events (24h timeout, no buffering) ## Infrastructure Notes @@ -48,4 +49,6 @@ flowchart TB - Axum: Internal port 4000 - **Managed PostgreSQL**: Separate Digital Ocean managed database service, accessed over the internet with SSL - **SSL/TLS**: HTTPS encryption from client to Nginx using Let's Encrypt certificates managed by `certbot`, then unencrypted internal traffic within the container network -- **Database Connection**: Axum connects to managed PostgreSQL over SSL outside the container network \ No newline at end of file +- **Database Connection**: Axum connects to managed PostgreSQL over SSL outside the container network +- **SSE Configuration**: Nginx configured for long-lived connections (24h timeout, proxy buffering disabled) at `/api/sse` endpoint +- **Scaling Limitation**: SSE uses in-memory connection tracking - **single backend instance only** until Redis pub/sub is implemented \ No newline at end of file diff --git a/docs/architecture/system_architecture_diagram.md b/docs/architecture/system_architecture_diagram.md index d583e132..db267272 100644 --- a/docs/architecture/system_architecture_diagram.md +++ b/docs/architecture/system_architecture_diagram.md @@ -32,6 +32,10 @@ The Refactor Platform is a coaching management system built with Rust (Axum back - **Migration**: Database schema versioning and migrations - **Database**: PostgreSQL with `refactor_platform` schema +### Real-Time Communication +- **SSE (Server-Sent Events)**: Unidirectional push notifications from server to client +- **Connection Management**: In-memory registry for active user connections (single-instance only) + ### External Integrations - **TipTap**: Collaborative document editing service - **JWT**: Token generation and validation service @@ -71,11 +75,13 @@ graph TB Router[Router
Route Definitions & Middleware] Controllers[Controllers
HTTP Request Handlers] Auth[Authentication Layer
Session Management] - + SSE[SSE Handler
Real-Time Events] + %% Business Logic Layer Domain[Domain Layer
Business Logic & Models] EntityAPI[Entity API
Database Operations] Service[Service Layer
Configuration & Utilities] + SSEManager[SSE Manager
Connection Registry] %% Data Layer Entity[Entity Layer
Database Models] @@ -96,6 +102,7 @@ graph TB %% Router to Controllers Router --> Controllers Router --> Auth + Router --> SSE %% Controllers breakdown Controllers --> ActionCtrl[Action Controller] @@ -123,6 +130,11 @@ graph TB %% Domain to Data Access Domain --> EntityAPI Domain --> Service + + %% SSE Integration + SSE --> SSEManager + Service --> SSEManager + Domain -.->|send events| SSEManager %% Data Access Layer EntityAPI --> Entity @@ -147,8 +159,8 @@ graph TB classDef database fill:#ffebee class Client,Nginx external - class Web,Router,Controllers,Auth,ActionCtrl,AgreementCtrl,CoachingCtrl,NoteCtrl,OrgCtrl,UserCtrl,GoalCtrl,SessionCtrl,JWTCtrl,HealthCtrl web - class Domain,EntityAPI,Service business + class Web,Router,Controllers,Auth,SSE,ActionCtrl,AgreementCtrl,CoachingCtrl,NoteCtrl,OrgCtrl,UserCtrl,GoalCtrl,SessionCtrl,JWTCtrl,HealthCtrl web + class Domain,EntityAPI,Service,SSEManager business class Entity,Migration data class DB database ``` \ No newline at end of file diff --git a/docs/implementation-plans/sse-communication.md b/docs/implementation-plans/sse-communication.md index e6593690..2ac27da3 100644 --- a/docs/implementation-plans/sse-communication.md +++ b/docs/implementation-plans/sse-communication.md @@ -45,7 +45,7 @@ graph TB subgraph Backend["Backend (Single Instance)"] Handler["SSE Handler
(handler.rs)
• Extract AuthenticatedUser
• Create channel
• Register connection"] - Manager["SSE Manager
(manager.rs)
• DashMap connections
• Filter by scope
• Route messages"] + Manager["SSE Manager
(manager.rs)
• ConnectionRegistry
• O(1) user lookup
• Route messages"] Controller["Action Controller
(action_controller.rs)
• Create resource in DB
• Determine recipient
• Send SSE message"] @@ -104,7 +104,7 @@ sequenceDiagram DB-->>Controller: Action saved Controller->>Controller: Determine recipient
(Coachee) Controller->>Manager: send_message(SseMessage)
scope: User{coachee_id} - Manager->>Manager: Filter connections
by user_id + Manager->>Manager: O(1) lookup in user_index
for coachee_id Manager-->>Handler: Send to Coachee's channel Handler-->>Nginx: SSE event Nginx-->>Coachee: event: action_created
data: {action} @@ -116,33 +116,37 @@ sequenceDiagram ### SSE Manager Internal Structure ```mermaid -graph LR - subgraph "SseManager (In-Memory)" - DashMap["DashMap<ConnectionId, Metadata>"] - - subgraph Connections["Active Connections"] - C1["conn_uuid_1
• user_id: coach_id
• sender: Channel"] - C2["conn_uuid_2
• user_id: coachee_id
• sender: Channel"] - C3["conn_uuid_3
• user_id: coach_id
• sender: Channel"] - end +graph TB + subgraph "ConnectionRegistry (Dual-Index Architecture)" + Primary["Primary Index
DashMap<ConnectionId, ConnectionInfo>
• O(1) registration/cleanup"] + Secondary["Secondary Index
DashMap<UserId, HashSet<ConnectionId>>
• O(1) user lookup"] + end + + subgraph Connections["Active Connections"] + C1["conn_uuid_1
• user_id: coach_id
• sender: Channel"] + C2["conn_uuid_2
• user_id: coachee_id
• sender: Channel"] + C3["conn_uuid_3
• user_id: coach_id
• sender: Channel"] end - subgraph "Message Routing" + subgraph "Message Routing (O(1) lookup)" Msg["SseMessage
• event: ActionCreated
• scope: User{coachee_id}"] - Filter{"Filter by
scope"} + Lookup["O(1) Lookup
user_index[coachee_id]"] end - Msg --> Filter - Filter -->|"user_id == coachee_id"| C2 - Filter -.->|"Skip"| C1 - Filter -.->|"Skip"| C3 + Primary --> Connections + Secondary -->|"coach_id → {uuid_1, uuid_3}"| C1 + Secondary -->|"coach_id → {uuid_1, uuid_3}"| C3 + Secondary -->|"coachee_id → {uuid_2}"| C2 - DashMap --- Connections + Msg --> Lookup + Lookup -->|"Direct lookup"| C2 style C2 fill:#81c784,stroke:#2e7d32,stroke-width:2px,color:#000 - style C1 fill:#ef9a9a,stroke:#c62828,stroke-width:2px,color:#000 - style C3 fill:#ef9a9a,stroke:#c62828,stroke-width:2px,color:#000 - style Filter fill:#ffb74d,stroke:#e65100,stroke-width:2px,color:#000 + style C1 fill:#e0e0e0,stroke:#616161,stroke-width:1px,color:#000 + style C3 fill:#e0e0e0,stroke:#616161,stroke-width:1px,color:#000 + style Lookup fill:#81c784,stroke:#2e7d32,stroke-width:2px,color:#000 + style Primary fill:#b3e5fc,stroke:#01579b,stroke-width:2px,color:#000 + style Secondary fill:#fff9c4,stroke:#f57f17,stroke-width:2px,color:#000 ``` ### Event Types and Scopes @@ -214,9 +218,1108 @@ stateDiagram-v2 --- -## Phase 0: Docker Compose Documentation +## Phase 0: SSE Integration Testing Tool + +### Overview +A standalone Rust binary for testing SSE functionality without requiring a frontend client. The tool authenticates as two users, establishes SSE connections, triggers events via API calls, and validates that events are received correctly. + +**Tool Location:** `sse-test-client/` (new cargo workspace member) + +### 0.1 Create Workspace Structure + +**Add to root `Cargo.toml`:** +```toml +[workspace] +members = [ + # ... existing members + "sse-test-client", +] +``` + +**Create `sse-test-client/Cargo.toml`:** +```toml +[package] +name = "sse-test-client" +version = "0.1.0" +edition = "2021" + +[dependencies] +# HTTP client +reqwest = { version = "0.11", features = ["json", "cookies"] } + +# SSE parsing +eventsource-client = "0.12" + +# CLI +clap = { version = "4.5", features = ["derive"] } + +# Async runtime +tokio = { version = "1", features = ["full"] } + +# Serialization +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" + +# Output formatting +colored = "2.1" +anyhow = "1.0" + +# Logging +log = "0.4" +env_logger = "0.11" + +# Utilities +uuid = { version = "1.6", features = ["v4", "serde"] } +``` + +--- + +### 0.2 Tool Architecture + +**File structure:** +``` +sse-test-client/ +├── Cargo.toml +├── src/ +│ ├── main.rs # CLI entry point, scenario orchestration +│ ├── auth.rs # Login and session management +│ ├── sse_client.rs # SSE connection handling +│ ├── api_client.rs # API calls to trigger events +│ ├── scenarios.rs # Test scenario definitions +│ ├── output.rs # Color-coded formatting +│ └── types.rs # Shared types (events, test data) +``` + +--- + +### 0.3 Implement Authentication Module + +**File:** `sse-test-client/src/auth.rs` + +```rust +use anyhow::{Context, Result}; +use reqwest::Client; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone)] +pub struct UserCredentials { + pub email: String, + pub password: String, +} + +impl UserCredentials { + pub fn parse(input: &str) -> Result { + let parts: Vec<&str> = input.split(':').collect(); + if parts.len() != 2 { + anyhow::bail!("Invalid credentials format. Expected email:password"); + } + Ok(Self { + email: parts[0].to_string(), + password: parts[1].to_string(), + }) + } +} + +#[derive(Debug, Clone)] +pub struct AuthenticatedUser { + pub user_id: String, + pub session_cookie: String, + pub credentials: UserCredentials, +} + +#[derive(Debug, Serialize)] +struct LoginRequest { + email: String, + password: String, +} + +#[derive(Debug, Deserialize)] +struct LoginResponse { + user_id: String, +} + +pub async fn login( + client: &Client, + base_url: &str, + credentials: &UserCredentials, +) -> Result { + let url = format!("{}/user_sessions", base_url); + + let response = client + .post(&url) + .json(&LoginRequest { + email: credentials.email.clone(), + password: credentials.password.clone(), + }) + .send() + .await + .context("Failed to send login request")?; + + if !response.status().is_success() { + anyhow::bail!("Login failed: {}", response.status()); + } + + // Extract session cookie + let session_cookie = response + .cookies() + .find(|cookie| cookie.name() == "session_id") + .context("No session cookie in response")? + .value() + .to_string(); + + let login_response: LoginResponse = response + .json() + .await + .context("Failed to parse login response")?; + + Ok(AuthenticatedUser { + user_id: login_response.user_id, + session_cookie, + credentials: credentials.clone(), + }) +} +``` + +--- + +### 0.4 Implement SSE Client Module + +**File:** `sse-test-client/src/sse_client.rs` + +```rust +use anyhow::{Context, Result}; +use eventsource_client as es; +use log::*; +use serde_json::Value; +use std::time::{Duration, Instant}; +use tokio::sync::mpsc; + +#[derive(Debug, Clone)] +pub struct SseEvent { + pub event_type: String, + pub data: Value, + pub timestamp: Instant, +} + +pub struct SseConnection { + pub user_label: String, + event_rx: mpsc::UnboundedReceiver, + _handle: tokio::task::JoinHandle<()>, +} + +impl SseConnection { + pub async fn establish( + base_url: &str, + session_cookie: &str, + user_label: String, + ) -> Result { + let url = format!("{}/sse", base_url); + let (tx, rx) = mpsc::unbounded_channel(); + + let client = es::ClientBuilder::for_url(&url)? + .header("Cookie", &format!("session_id={}", session_cookie))? + .build(); + + let label = user_label.clone(); + let handle = tokio::spawn(async move { + let mut stream = client.stream(); + + while let Some(event_result) = stream.next().await { + match event_result { + Ok(es::SSE::Event(event)) => { + if let Ok(data) = serde_json::from_str(&event.data) { + let sse_event = SseEvent { + event_type: event.event_type, + data, + timestamp: Instant::now(), + }; + + if tx.send(sse_event).is_err() { + debug!("SSE receiver dropped for {}", label); + break; + } + } + } + Ok(es::SSE::Comment(_)) => { + // Ignore comments (keep-alive) + } + Err(e) => { + warn!("SSE error for {}: {}", label, e); + } + } + } + }); + + Ok(Self { + user_label, + event_rx: rx, + _handle: handle, + }) + } + + pub async fn wait_for_event( + &mut self, + event_type: &str, + timeout: Duration, + ) -> Result { + let deadline = Instant::now() + timeout; + + loop { + let remaining = deadline.saturating_duration_since(Instant::now()); + if remaining.is_zero() { + anyhow::bail!("Timeout waiting for event: {}", event_type); + } + + match tokio::time::timeout(remaining, self.event_rx.recv()).await { + Ok(Some(event)) if event.event_type == event_type => { + return Ok(event); + } + Ok(Some(_)) => { + // Wrong event type, keep waiting + continue; + } + Ok(None) => { + anyhow::bail!("SSE connection closed"); + } + Err(_) => { + anyhow::bail!("Timeout waiting for event: {}", event_type); + } + } + } + } +} +``` + +--- + +### 0.5 Implement API Client Module + +**File:** `sse-test-client/src/api_client.rs` + +```rust +use anyhow::{Context, Result}; +use reqwest::Client; +use serde_json::{json, Value}; + +pub struct ApiClient { + client: Client, + base_url: String, +} + +#[derive(Debug, Clone)] +pub struct TestEnvironment { + pub relationship_id: String, + pub session_id: String, +} + +impl ApiClient { + pub fn new(client: Client, base_url: String) -> Self { + Self { client, base_url } + } + + pub async fn setup_test_environment( + &self, + coach_session: &str, + coachee_session: &str, + coach_id: &str, + coachee_id: &str, + ) -> Result { + // Create coaching relationship + let relationship = self + .create_coaching_relationship(coach_session, coach_id, coachee_id) + .await?; + + let relationship_id = relationship["id"] + .as_str() + .context("No relationship ID in response")? + .to_string(); + + // Create coaching session + let session = self + .create_coaching_session(coach_session, &relationship_id) + .await?; + + let session_id = session["id"] + .as_str() + .context("No session ID in response")? + .to_string(); + + Ok(TestEnvironment { + relationship_id, + session_id, + }) + } + + async fn create_coaching_relationship( + &self, + session_cookie: &str, + coach_id: &str, + coachee_id: &str, + ) -> Result { + let url = format!("{}/coaching_relationships", self.base_url); + + let response = self + .client + .post(&url) + .header("Cookie", format!("session_id={}", session_cookie)) + .json(&json!({ + "coach_id": coach_id, + "coachee_id": coachee_id, + })) + .send() + .await + .context("Failed to create coaching relationship")?; + + if !response.status().is_success() { + anyhow::bail!("Failed to create relationship: {}", response.status()); + } + + response.json().await.context("Failed to parse response") + } + + async fn create_coaching_session( + &self, + session_cookie: &str, + relationship_id: &str, + ) -> Result { + let url = format!("{}/coaching_sessions", self.base_url); + + let response = self + .client + .post(&url) + .header("Cookie", format!("session_id={}", session_cookie)) + .json(&json!({ + "coaching_relationship_id": relationship_id, + "date": "2024-01-01", + })) + .send() + .await + .context("Failed to create coaching session")?; + + if !response.status().is_success() { + anyhow::bail!("Failed to create session: {}", response.status()); + } + + response.json().await.context("Failed to parse response") + } + + pub async fn create_action( + &self, + session_cookie: &str, + coaching_session_id: &str, + title: &str, + ) -> Result { + let url = format!("{}/actions", self.base_url); + + let response = self + .client + .post(&url) + .header("Cookie", format!("session_id={}", session_cookie)) + .json(&json!({ + "coaching_session_id": coaching_session_id, + "title": title, + "description": "Created by SSE test tool", + "status": "not_started", + })) + .send() + .await + .context("Failed to create action")?; + + if !response.status().is_success() { + anyhow::bail!("Failed to create action: {}", response.status()); + } + + response.json().await.context("Failed to parse response") + } + + pub async fn update_action( + &self, + session_cookie: &str, + action_id: &str, + title: &str, + ) -> Result { + let url = format!("{}/actions/{}", self.base_url, action_id); + + let response = self + .client + .put(&url) + .header("Cookie", format!("session_id={}", session_cookie)) + .json(&json!({ + "title": title, + })) + .send() + .await + .context("Failed to update action")?; + + if !response.status().is_success() { + anyhow::bail!("Failed to update action: {}", response.status()); + } + + response.json().await.context("Failed to parse response") + } + + pub async fn delete_action( + &self, + session_cookie: &str, + action_id: &str, + ) -> Result<()> { + let url = format!("{}/actions/{}", self.base_url, action_id); + + let response = self + .client + .delete(&url) + .header("Cookie", format!("session_id={}", session_cookie)) + .send() + .await + .context("Failed to delete action")?; + + if !response.status().is_success() { + anyhow::bail!("Failed to delete action: {}", response.status()); + } + + Ok(()) + } + + pub async fn force_logout( + &self, + admin_session_cookie: &str, + user_id: &str, + ) -> Result<()> { + let url = format!("{}/admin/force_logout/{}", self.base_url, user_id); + + let response = self + .client + .post(&url) + .header("Cookie", format!("session_id={}", admin_session_cookie)) + .send() + .await + .context("Failed to force logout")?; + + if !response.status().is_success() { + anyhow::bail!("Failed to force logout: {}", response.status()); + } + + Ok(()) + } +} +``` + +--- + +### 0.6 Implement Output Formatting Module + +**File:** `sse-test-client/src/output.rs` + +```rust +use colored::*; +use serde_json::Value; +use std::time::Duration; + +use crate::sse_client::SseEvent; + +#[derive(Debug)] +pub struct TestResult { + pub scenario: String, + pub passed: bool, + pub message: Option, + pub duration: Duration, +} + +pub fn print_sse_event(user_label: &str, event: &SseEvent) { + let label_colored = if user_label.contains("User 1") { + user_label.bright_blue() + } else { + user_label.bright_magenta() + }; + + println!( + "\n[{}] {} event received", + label_colored.bold(), + event.event_type.yellow() + ); + + if let Ok(pretty) = serde_json::to_string_pretty(&event.data) { + println!(" {}", pretty.dimmed()); + } +} + +pub fn print_test_summary(results: &[TestResult]) { + println!("\n{}", "=== TEST SUMMARY ===".bright_white().bold()); + + let total = results.len(); + let passed = results.iter().filter(|r| r.passed).count(); + let failed = total - passed; + + for result in results { + let status = if result.passed { + "PASS".green().bold() + } else { + "FAIL".red().bold() + }; + + println!( + "[{}] {} ({:?})", + status, result.scenario, result.duration + ); + + if let Some(msg) = &result.message { + println!(" {}", msg.dimmed()); + } + } + + println!( + "\n{}: {} passed, {} failed", + "Results".bold(), + passed.to_string().green(), + failed.to_string().red() + ); +} +``` + +--- + +### 0.7 Implement Test Scenarios Module -### 0.1 Add SSE Scaling Warning to docker-compose.yaml +**File:** `sse-test-client/src/scenarios.rs` + +```rust +use anyhow::Result; +use colored::*; +use std::time::{Duration, Instant}; + +use crate::api_client::{ApiClient, TestEnvironment}; +use crate::auth::AuthenticatedUser; +use crate::output::{print_sse_event, TestResult}; +use crate::sse_client::SseConnection; + +pub async fn test_action_create( + user1: &AuthenticatedUser, + user2: &AuthenticatedUser, + test_env: &TestEnvironment, + api_client: &ApiClient, + sse1: &mut SseConnection, + sse2: &mut SseConnection, +) -> Result { + let start = Instant::now(); + + println!("\n{}", "=== TEST: Action Create ===".bright_cyan().bold()); + + println!("{} User 1 creating action...", "→".blue()); + + let action = api_client + .create_action( + &user1.session_cookie, + &test_env.session_id, + "Test Action - Create", + ) + .await?; + + let action_id = action["id"].as_str().unwrap(); + println!("{} Action created (ID: {})", "✓".green(), action_id); + + println!( + "{} Waiting for User 2 to receive action_created event...", + "→".blue() + ); + + match sse2 + .wait_for_event("action_created", Duration::from_secs(5)) + .await + { + Ok(event) => { + print_sse_event(&sse2.user_label, &event); + + let received_action_id = event.data["data"]["action"]["id"].as_str().unwrap(); + let received_session_id = event.data["data"]["coaching_session_id"] + .as_str() + .unwrap(); + + if received_action_id == action_id + && received_session_id == test_env.session_id + { + println!("{} Event data verified correctly", "✓".green()); + Ok(TestResult { + scenario: "action_create".to_string(), + passed: true, + message: None, + duration: start.elapsed(), + }) + } else { + println!("{} Event data mismatch!", "✗".red()); + Ok(TestResult { + scenario: "action_create".to_string(), + passed: false, + message: Some(format!( + "Expected action_id={}, session_id={}, got action_id={}, session_id={}", + action_id, test_env.session_id, received_action_id, received_session_id + )), + duration: start.elapsed(), + }) + } + } + Err(e) => { + println!("{} Timeout waiting for event: {}", "✗".red(), e); + Ok(TestResult { + scenario: "action_create".to_string(), + passed: false, + message: Some(format!("Timeout: {}", e)), + duration: start.elapsed(), + }) + } + } +} + +pub async fn test_action_update( + user1: &AuthenticatedUser, + user2: &AuthenticatedUser, + test_env: &TestEnvironment, + api_client: &ApiClient, + sse1: &mut SseConnection, + sse2: &mut SseConnection, +) -> Result { + let start = Instant::now(); + + println!("\n{}", "=== TEST: Action Update ===".bright_cyan().bold()); + + // First create an action + println!("{} User 1 creating action...", "→".blue()); + let action = api_client + .create_action( + &user1.session_cookie, + &test_env.session_id, + "Test Action - Update", + ) + .await?; + + let action_id = action["id"].as_str().unwrap(); + + // Wait for and discard the create event + let _ = sse2 + .wait_for_event("action_created", Duration::from_secs(5)) + .await?; + + // Now update the action + println!("{} User 1 updating action...", "→".blue()); + api_client + .update_action(&user1.session_cookie, action_id, "Updated Title") + .await?; + + println!( + "{} Waiting for User 2 to receive action_updated event...", + "→".blue() + ); + + match sse2 + .wait_for_event("action_updated", Duration::from_secs(5)) + .await + { + Ok(event) => { + print_sse_event(&sse2.user_label, &event); + + let received_title = event.data["data"]["action"]["title"].as_str().unwrap(); + + if received_title == "Updated Title" { + println!("{} Event data verified correctly", "✓".green()); + Ok(TestResult { + scenario: "action_update".to_string(), + passed: true, + message: None, + duration: start.elapsed(), + }) + } else { + Ok(TestResult { + scenario: "action_update".to_string(), + passed: false, + message: Some(format!("Title mismatch: {}", received_title)), + duration: start.elapsed(), + }) + } + } + Err(e) => Ok(TestResult { + scenario: "action_update".to_string(), + passed: false, + message: Some(format!("Timeout: {}", e)), + duration: start.elapsed(), + }), + } +} + +pub async fn test_action_delete( + user1: &AuthenticatedUser, + user2: &AuthenticatedUser, + test_env: &TestEnvironment, + api_client: &ApiClient, + sse1: &mut SseConnection, + sse2: &mut SseConnection, +) -> Result { + let start = Instant::now(); + + println!("\n{}", "=== TEST: Action Delete ===".bright_cyan().bold()); + + // Create action + let action = api_client + .create_action( + &user1.session_cookie, + &test_env.session_id, + "Test Action - Delete", + ) + .await?; + + let action_id = action["id"].as_str().unwrap(); + + // Discard create event + let _ = sse2 + .wait_for_event("action_created", Duration::from_secs(5)) + .await?; + + // Delete action + println!("{} User 1 deleting action...", "→".blue()); + api_client + .delete_action(&user1.session_cookie, action_id) + .await?; + + println!( + "{} Waiting for User 2 to receive action_deleted event...", + "→".blue() + ); + + match sse2 + .wait_for_event("action_deleted", Duration::from_secs(5)) + .await + { + Ok(event) => { + print_sse_event(&sse2.user_label, &event); + + let received_action_id = event.data["data"]["action_id"].as_str().unwrap(); + + if received_action_id == action_id { + println!("{} Event data verified correctly", "✓".green()); + Ok(TestResult { + scenario: "action_delete".to_string(), + passed: true, + message: None, + duration: start.elapsed(), + }) + } else { + Ok(TestResult { + scenario: "action_delete".to_string(), + passed: false, + message: Some(format!("Action ID mismatch: {}", received_action_id)), + duration: start.elapsed(), + }) + } + } + Err(e) => Ok(TestResult { + scenario: "action_delete".to_string(), + passed: false, + message: Some(format!("Timeout: {}", e)), + duration: start.elapsed(), + }), + } +} + +pub async fn test_force_logout( + user1: &AuthenticatedUser, + user2: &AuthenticatedUser, + test_env: &TestEnvironment, + api_client: &ApiClient, + sse1: &mut SseConnection, + sse2: &mut SseConnection, +) -> Result { + let start = Instant::now(); + + println!("\n{}", "=== TEST: Force Logout ===".bright_cyan().bold()); + + println!("{} User 1 forcing logout of User 2...", "→".blue()); + + api_client + .force_logout(&user1.session_cookie, &user2.user_id) + .await?; + + println!( + "{} Waiting for User 2 to receive force_logout event...", + "→".blue() + ); + + match sse2 + .wait_for_event("force_logout", Duration::from_secs(5)) + .await + { + Ok(event) => { + print_sse_event(&sse2.user_label, &event); + println!("{} Event received correctly", "✓".green()); + Ok(TestResult { + scenario: "force_logout".to_string(), + passed: true, + message: None, + duration: start.elapsed(), + }) + } + Err(e) => Ok(TestResult { + scenario: "force_logout".to_string(), + passed: false, + message: Some(format!("Timeout: {}", e)), + duration: start.elapsed(), + }), + } +} +``` + +--- + +### 0.8 Implement Main CLI Entry Point + +**File:** `sse-test-client/src/main.rs` + +```rust +use anyhow::Result; +use clap::Parser; +use colored::*; + +mod api_client; +mod auth; +mod output; +mod scenarios; +mod sse_client; + +use api_client::ApiClient; +use auth::{login, UserCredentials}; +use output::{print_test_summary, TestResult}; +use sse_client::SseConnection; + +#[derive(Parser)] +#[command(name = "sse-test-client")] +#[command(about = "SSE Integration Testing Tool")] +struct Cli { + /// Base URL of the backend (e.g., http://localhost:4747) + #[arg(long)] + base_url: String, + + /// User 1 credentials (format: email:password) + #[arg(long)] + user1: String, + + /// User 2 credentials (format: email:password) + #[arg(long)] + user2: String, + + /// Test scenario to run + #[arg(long, value_enum)] + scenario: ScenarioChoice, + + /// Enable verbose output + #[arg(long, short)] + verbose: bool, +} + +#[derive(clap::ValueEnum, Clone)] +enum ScenarioChoice { + ActionCreate, + ActionUpdate, + ActionDelete, + ForceLogout, + All, +} + +#[tokio::main] +async fn main() -> Result<()> { + let cli = Cli::parse(); + + if cli.verbose { + env_logger::Builder::from_default_env() + .filter_level(log::LevelFilter::Debug) + .init(); + } + + println!("{}", "=== SETUP PHASE ===".bright_white().bold()); + + // Parse credentials + let user1_creds = UserCredentials::parse(&cli.user1)?; + let user2_creds = UserCredentials::parse(&cli.user2)?; + + // Authenticate users + println!("{} Authenticating users...", "→".blue()); + let client = reqwest::Client::new(); + let user1 = login(&client, &cli.base_url, &user1_creds).await?; + let user2 = login(&client, &cli.base_url, &user2_creds).await?; + + println!("{} User 1 authenticated (ID: {})", "✓".green(), user1.user_id); + println!("{} User 2 authenticated (ID: {})", "✓".green(), user2.user_id); + + // Set up test environment + println!("\n{} Creating test coaching relationship and session...", "→".blue()); + let api_client = ApiClient::new(client.clone(), cli.base_url.clone()); + let test_env = api_client + .setup_test_environment( + &user1.session_cookie, + &user2.session_cookie, + &user1.user_id, + &user2.user_id, + ) + .await?; + + println!( + "{} Coaching relationship created (ID: {})", + "✓".green(), + test_env.relationship_id + ); + println!( + "{} Coaching session created (ID: {})", + "✓".green(), + test_env.session_id + ); + + // Establish SSE connections + println!("\n{} Establishing SSE connections...", "→".blue()); + let mut sse1 = SseConnection::establish( + &cli.base_url, + &user1.session_cookie, + "User 1 (Coach)".to_string(), + ) + .await?; + + let mut sse2 = SseConnection::establish( + &cli.base_url, + &user2.session_cookie, + "User 2 (Coachee)".to_string(), + ) + .await?; + + println!("{} User 1 SSE connection established", "✓".green()); + println!("{} User 2 SSE connection established", "✓".green()); + + // Run test scenarios + println!("\n{}", "=== TEST PHASE ===".bright_white().bold()); + + let mut results = Vec::new(); + + match cli.scenario { + ScenarioChoice::ActionCreate => { + results.push( + scenarios::test_action_create( + &user1, &user2, &test_env, &api_client, &mut sse1, &mut sse2, + ) + .await?, + ); + } + ScenarioChoice::ActionUpdate => { + results.push( + scenarios::test_action_update( + &user1, &user2, &test_env, &api_client, &mut sse1, &mut sse2, + ) + .await?, + ); + } + ScenarioChoice::ActionDelete => { + results.push( + scenarios::test_action_delete( + &user1, &user2, &test_env, &api_client, &mut sse1, &mut sse2, + ) + .await?, + ); + } + ScenarioChoice::ForceLogout => { + results.push( + scenarios::test_force_logout( + &user1, &user2, &test_env, &api_client, &mut sse1, &mut sse2, + ) + .await?, + ); + } + ScenarioChoice::All => { + results.push( + scenarios::test_action_create( + &user1, &user2, &test_env, &api_client, &mut sse1, &mut sse2, + ) + .await?, + ); + results.push( + scenarios::test_action_update( + &user1, &user2, &test_env, &api_client, &mut sse1, &mut sse2, + ) + .await?, + ); + results.push( + scenarios::test_action_delete( + &user1, &user2, &test_env, &api_client, &mut sse1, &mut sse2, + ) + .await?, + ); + results.push( + scenarios::test_force_logout( + &user1, &user2, &test_env, &api_client, &mut sse1, &mut sse2, + ) + .await?, + ); + } + } + + // Print summary + println!("\n{}", "=== RESULTS ===".bright_white().bold()); + print_test_summary(&results); + + let all_passed = results.iter().all(|r| r.passed); + + if all_passed { + println!("\n{}", "All tests passed! ✓".bright_green().bold()); + } else { + println!("\n{}", "Some tests failed! ✗".bright_red().bold()); + } + + std::process::exit(if all_passed { 0 } else { 1 }); +} +``` + +--- + +### 0.9 Usage Examples + +**Run individual test scenarios:** +```bash +# Test action creation +cargo run --bin sse-test-client -- \ + --base-url http://localhost:4747 \ + --user1 "coach@example.com:password123" \ + --user2 "coachee@example.com:password456" \ + --scenario action-create + +# Test action update +cargo run --bin sse-test-client -- \ + --base-url http://localhost:4747 \ + --user1 "coach@example.com:password123" \ + --user2 "coachee@example.com:password456" \ + --scenario action-update + +# Test force logout +cargo run --bin sse-test-client -- \ + --base-url http://localhost:4747 \ + --user1 "admin@example.com:adminpass" \ + --user2 "user@example.com:userpass" \ + --scenario force-logout + +# Run all tests +cargo run --bin sse-test-client -- \ + --base-url http://localhost:4747 \ + --user1 "coach@example.com:password123" \ + --user2 "coachee@example.com:password456" \ + --scenario all +``` + +**With verbose logging:** +```bash +cargo run --bin sse-test-client -- \ + --base-url http://localhost:4747 \ + --user1 "coach@example.com:password123" \ + --user2 "coachee@example.com:password456" \ + --scenario all \ + --verbose +``` + +--- + +## Phase 1: Docker Compose Documentation + +### 1.1 Add SSE Scaling Warning to docker-compose.yaml **File:** `docker-compose.yaml` **Add a prominent comment above the rust-app service definition (before line 57):** @@ -246,9 +1349,9 @@ stateDiagram-v2 --- -## Phase 1: Nginx Configuration +## Phase 2: Nginx Configuration -### 1.1 Update Nginx Configuration +### 2.1 Update Nginx Configuration **File:** `nginx/conf.d/refactor-platform.conf` **Why:** SSE connections are long-lived (hours) and require special nginx configuration to prevent buffering events or timing out connections. Without these settings, SSE events would be delayed and connections would close after 60 seconds. The 15-second keep-alive from Axum ensures the connection stays healthy within the 24-hour timeout window. @@ -293,9 +1396,9 @@ location /api/sse { --- -## Phase 2: Backend Infrastructure Setup +## Phase 3: Backend Infrastructure Setup -### 2.1 Add Required Dependencies +### 3.1 Add Required Dependencies **File:** `web/Cargo.toml` Add these dependencies: @@ -312,7 +1415,7 @@ dashmap = "6.1" --- -### 2.2 Create SSE Module Structure +### 3.2 Create SSE Module Structure **Files to create:** - `web/src/sse/mod.rs` - `web/src/sse/manager.rs` @@ -322,7 +1425,7 @@ dashmap = "6.1" --- -### 2.3 Define Message Types +### 3.3 Define Message Types **File:** `web/src/sse/message.rs` **Purpose:** Define strongly-typed event messages that can be sent over SSE @@ -332,11 +1435,17 @@ dashmap = "6.1" - All events include context (coaching_session_id or coaching_relationship_id) for client-side filtering - All events are ephemeral (no persistence) - Two message scopes: User (specific user) and Broadcast (all users) +- Trait-based event type extraction (no string manipulation or unwrap!) ```rust use domain::{actions, agreements, notes, overarching_goals, Id}; use serde::Serialize; +/// Trait for getting the SSE event type name +pub trait EventType { + fn event_type(&self) -> &'static str; +} + #[derive(Debug, Clone, Serialize)] #[serde(tag = "type", content = "data")] pub enum Event { @@ -396,9 +1505,26 @@ pub enum Event { ForceLogout { reason: String }, } +impl SseEventType for Event { + fn event_type(&self) -> &'static str { + match self { + Event::ActionCreated { .. } => "action_created", + Event::ActionUpdated { .. } => "action_updated", + Event::ActionDeleted { .. } => "action_deleted", + Event::AgreementCreated { .. } => "agreement_created", + Event::AgreementUpdated { .. } => "agreement_updated", + Event::AgreementDeleted { .. } => "agreement_deleted", + Event::GoalCreated { .. } => "goal_created", + Event::GoalUpdated { .. } => "goal_updated", + Event::GoalDeleted { .. } => "goal_deleted", + Event::ForceLogout { .. } => "force_logout", + } + } +} + #[derive(Debug, Clone)] pub struct Message { - pub event: SseEvent, + pub event: Event, pub scope: MessageScope, } @@ -411,131 +1537,258 @@ pub enum MessageScope { } ``` +**Why trait-based approach:** +- No string manipulation or `unwrap()` calls +- Compile-time enforcement: adding a new event variant will cause a compile error until `event_type()` is updated +- Event type names match serde renames exactly (single source of truth) +- Zero runtime overhead (returns `&'static str`) + --- -### 2.4 Implement Connection Metadata +### 3.4 Implement Connection Types and Registry **File:** `web/src/sse/connection.rs` -**Purpose:** Track metadata for each SSE connection to enable message filtering +**Purpose:** High-performance connection registry with dual indices for O(1) lookups -**Key struct:** +**Key design decisions:** +- Dual-index architecture: O(1) lookup by both connection_id and user_id +- Type-safe `ConnectionId` newtype prevents string confusion +- Eliminated redundant `connection_id` from info struct +- Automatic cleanup of empty user indices + +**Implementation:** ```rust use domain::Id; +use std::collections::HashSet; use std::convert::Infallible; use tokio::sync::mpsc::UnboundedSender; use axum::response::sse::Event; +use dashmap::DashMap; +use log::*; -#[derive(Debug)] -pub struct Metadata { - /// Unique identifier for this connection (generated server-side) - pub connection_id: String, - /// The authenticated user for this connection +/// Unique identifier for a connection (server-generated) +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct ConnectionId(String); + +impl ConnectionId { + pub fn new() -> Self { + Self(Id::new_v4().to_string()) + } + + pub fn as_str(&self) -> &str { + &self.0 + } +} + +impl Default for ConnectionId { + fn default() -> Self { + Self::new() + } +} + +/// Connection information (no redundant connection_id) +#[derive(Debug, Clone)] +pub struct ConnectionInfo { pub user_id: Id, - /// Channel sender for this connection pub sender: UnboundedSender>, } -impl Metadata { - pub fn new(user_id: Id, sender: UnboundedSender>) -> Self { +/// High-performance connection registry with dual indices for O(1) lookups +pub struct ConnectionRegistry { + /// Primary storage: lookup by connection_id for registration/cleanup - O(1) + connections: DashMap, + + /// Secondary index: fast lookup by user_id for message routing - O(1) + user_index: DashMap>, +} + +impl ConnectionRegistry { + pub fn new() -> Self { Self { - connection_id: domain::Id::new_v4().to_string(), - user_id, - sender, + connections: DashMap::new(), + user_index: DashMap::new(), } } + + /// Register a new connection - O(1) + pub fn register(&self, user_id: Id, sender: UnboundedSender>) -> ConnectionId { + let connection_id = ConnectionId::new(); + + // Insert into primary storage + self.connections.insert( + connection_id.clone(), + ConnectionInfo { user_id, sender }, + ); + + // Update secondary index + self.user_index + .entry(user_id) + .or_insert_with(HashSet::new) + .insert(connection_id.clone()); + + connection_id + } + + /// Unregister a connection - O(1) + pub fn unregister(&self, connection_id: &ConnectionId) { + // Remove from primary storage + if let Some((_, info)) = self.connections.remove(connection_id) { + let user_id = info.user_id; + + // Update secondary index + if let Some(mut entry) = self.user_index.get_mut(&user_id) { + entry.remove(connection_id); + + // Clean up empty user entries + if entry.is_empty() { + drop(entry); // Release lock before removal + self.user_index.remove(&user_id); + } + } + } + } + + /// Send message to specific user - O(1) lookup + O(k) send where k = user's connections + pub fn send_to_user(&self, user_id: &Id, event: Event) { + if let Some(connection_ids) = self.user_index.get(user_id) { + for conn_id in connection_ids.iter() { + if let Some(info) = self.connections.get(conn_id) { + if let Err(e) = info.sender.send(Ok(event.clone())) { + warn!( + "Failed to send event to connection {}: {}. Connection will be cleaned up.", + conn_id.as_str(), e + ); + } + } + } + } + } + + /// Broadcast message to all connections - O(n) (unavoidable, but explicit) + pub fn broadcast(&self, event: Event) { + for entry in self.connections.iter() { + if let Err(e) = entry.value().sender.send(Ok(event.clone())) { + warn!( + "Failed to send broadcast to connection {}: {}", + entry.key().as_str(), e + ); + } + } + } + + /// Get total connection count - O(1) + pub fn connection_count(&self) -> usize { + self.connections.len() + } + + /// Get active user count - O(1) + pub fn active_user_count(&self) -> usize { + self.user_index.len() + } + + /// Get connections per user (for monitoring/debugging) - O(1) + pub fn connections_per_user(&self, user_id: &Id) -> usize { + self.user_index + .get(user_id) + .map(|set| set.len()) + .unwrap_or(0) + } +} + +impl Default for ConnectionRegistry { + fn default() -> Self { + Self::new() + } } ``` -**Why these fields:** -- `connection_id`: Server-generated UUID for internal tracking in DashMap -- `user_id`: From authenticated session (via AuthenticatedUser extractor) -- `sender`: Channel to send events to this specific connection +**Performance characteristics:** +- Registration: O(1) +- Unregistration: O(1) +- Send to specific user: O(1) + O(k) where k = user's connections (typically 1-3) +- Broadcast: O(n) where n = total connections +- Get active users: O(1) --- -### 2.5 Implement SSE Manager +### 3.5 Implement SSE Manager **File:** `web/src/sse/manager.rs` -**Purpose:** Central registry for managing all SSE connections and routing messages +**Purpose:** Central manager for routing messages to connections via the registry **Key struct:** ```rust -use crate::sse::connection::Metadata as ConnectionMetadata; +use crate::sse::connection::{ConnectionRegistry, ConnectionId}; use crate::sse::message::{MessageScope, Event as SseEvent, Message as SseMessage}; use axum::response::sse::Event; -use dashmap::DashMap; use domain::Id; use log::*; use std::sync::Arc; pub struct Manager { - connections: Arc>, + registry: Arc, } impl Manager { pub fn new() -> Self { Self { - connections: Arc::new(DashMap::new()), + registry: Arc::new(ConnectionRegistry::new()), } } - pub fn register_connection(&self, metadata: ConnectionMetadata) { - let connection_id = metadata.connection_id.clone(); - debug!( - "Registering SSE connection {} for user {}", - connection_id, metadata.user_id - ); - self.connections.insert(connection_id, metadata); + /// Register a new connection and return its unique ID + pub fn register_connection( + &self, + user_id: Id, + sender: tokio::sync::mpsc::UnboundedSender>, + ) -> ConnectionId { + let connection_id = self.registry.register(user_id, sender); + debug!("Registered SSE connection {} for user {}", connection_id.as_str(), user_id); + connection_id } - pub fn unregister_connection(&self, connection_id: &str) { - debug!("Unregistering SSE connection {}", connection_id); - let connection = self.connections.remove(connection_id); - - if connection.is_none() { - warn!("Attempted to remove SSE Connection {} but connection did not exist", connection_id); - } + /// Unregister a connection by ID + pub fn unregister_connection(&self, connection_id: &ConnectionId) { + debug!("Unregistering SSE connection {}", connection_id.as_str()); + self.registry.unregister(connection_id); } + /// Send a message based on its scope pub fn send_message(&self, message: SseMessage) { - let event_type = format!("{:?}", message.event).split('(').next().unwrap().to_lowercase(); + use crate::sse::message::EventType; - for entry in self.connections.iter() { - let metadata = entry.value(); + let event_type = message.event.event_type(); - if Self::should_receive_message(metadata, &message.scope) { - let event_data = match serde_json::to_string(&message.event) { - Ok(json) => json, - Err(e) => { - error!("Failed to serialize SSE event: {}", e); - continue; - } - }; - - let event = Event::default() - .event(&event_type) - .data(event_data); - - if let Err(e) = metadata.sender.send(Ok(event)) { - warn!( - "Failed to send SSE event to connection {}: {}", - metadata.connection_id, e - ); - // Connection is closed, will be cleaned up on next unregister - } + let event_data = match serde_json::to_string(&message.event) { + Ok(json) => json, + Err(e) => { + error!("Failed to serialize SSE event: {}", e); + return; } - } - } + }; - fn should_receive_message(metadata: &ConnectionMetadata, scope: &MessageScope) -> bool { - match scope { - MessageScope::User { user_id } => metadata.user_id == *user_id, - MessageScope::Broadcast => true, + let event = Event::default() + .event(event_type) + .data(event_data); + + match message.scope { + MessageScope::User { user_id } => { + self.registry.send_to_user(&user_id, event); + } + MessageScope::Broadcast => { + self.registry.broadcast(event); + } } } + /// Get total connection count pub fn connection_count(&self) -> usize { - self.connections.len() + self.registry.connection_count() + } + + /// Get active user count + pub fn active_user_count(&self) -> usize { + self.registry.active_user_count() } } @@ -547,13 +1800,13 @@ impl Default for Manager { ``` **Message routing logic:** -- User scope: Send to all connections where `metadata.user_id == target_user_id` -- Broadcast: Send to all connections +- User scope: O(1) lookup to user's connections, send to all (typically 1-3) +- Broadcast: O(n) iteration through all connections - Backend determines recipients based on business logic (not client-controlled) --- -### 2.6 Implement SSE Handler +### 3.6 Implement SSE Handler **File:** `web/src/sse/handler.rs` **Purpose:** Axum HTTP handler for SSE endpoint @@ -561,13 +1814,11 @@ impl Default for Manager { **Handler signature:** ```rust use crate::extractors::authenticated_user::AuthenticatedUser; -use crate::sse::connection::ConnectionMetadata; use crate::AppState; use async_stream::try_stream; use axum::extract::State; use axum::response::sse::{Event, KeepAlive, Sse}; use axum::response::IntoResponse; -use futures_util::stream::Stream; use log::*; use std::convert::Infallible; use tokio::sync::mpsc; @@ -582,10 +1833,8 @@ pub async fn sse_handler( let (tx, mut rx) = mpsc::unbounded_channel(); - let metadata = ConnectionMetadata::new(user.id, tx); - let connection_id = metadata.connection_id.clone(); - - app_state.sse_manager.register_connection(metadata); + // Register returns the connection_id + let connection_id = app_state.sse_manager.register_connection(user.id, tx); let manager = app_state.sse_manager.clone(); @@ -605,14 +1854,14 @@ pub async fn sse_handler( **Implementation approach:** 1. Extract user from authenticated session (via cookie) 2. Create channel for this connection -3. Register connection with SseManager +3. Register connection with Manager (returns ConnectionId) 4. Create async stream that yields events from channel -5. On stream drop, unregister connection +5. On stream drop, unregister connection using ConnectionId 6. Keep-alive every 15 seconds (default) prevents nginx timeout --- -### 2.7 Add Module Documentation +### 3.7 Add Module Documentation **File:** `web/src/sse/mod.rs` ```rust @@ -625,6 +1874,8 @@ pub async fn sse_handler( //! //! - **Single connection per user**: Each authenticated user establishes one //! SSE connection that stays open across page navigation. +//! - **Dual-index registry**: O(1) lookups for both connection management and +//! user-scoped message routing via separate DashMap indices. //! - **User and Broadcast scopes**: Messages can be sent to specific users or //! broadcast to all connected users. //! - **Ephemeral messages**: All events are ephemeral - if a user is offline, @@ -636,11 +1887,12 @@ pub async fn sse_handler( //! //! 1. Frontend establishes SSE connection via `/sse` endpoint //! 2. Backend extracts user from session cookie (AuthenticatedUser) -//! 3. Connection registered in Manager with user_id +//! 3. Connection registered in ConnectionRegistry with dual indices //! 4. When a resource changes (e.g., action created): //! - Controller determines recipient (e.g., other user in relationship) //! - Controller sends message via `app_state.sse_manager.send_message()` -//! - SseManager filters connections by scope and forwards event +//! - Manager performs O(1) lookup in user_index to find connections +//! - Events sent only to matching connections //! 5. Frontend receives event and updates UI based on context //! //! # Example: Sending an event @@ -680,9 +1932,9 @@ pub async fn sse_handler( //! //! # Modules //! -//! - `connection`: Connection metadata and tracking +//! - `connection`: ConnectionRegistry with dual-index architecture and type-safe ConnectionId //! - `handler`: Axum SSE endpoint handler -//! - `manager`: Central connection registry and message routing +//! - `manager`: High-level message routing (delegates to ConnectionRegistry) //! - `message`: Type-safe event and scope definitions pub mod connection; @@ -695,7 +1947,7 @@ pub use manager::Manager; --- -### 2.8 Update AppState +### 3.8 Update AppState **File:** `service/src/lib.rs` **Add SseManager to AppState:** @@ -713,7 +1965,7 @@ pub struct AppState { --- -### 2.9 Add SSE Route +### 3.9 Add SSE Route **File:** `web/src/router.rs` **Add SSE endpoint:** @@ -739,7 +1991,7 @@ pub fn define_routes(app_state: AppState) -> Router { --- -### 2.10 Initialize SSE Manager +### 3.10 Initialize SSE Manager **File:** `src/main.rs` ```rust @@ -753,9 +2005,9 @@ let app_state = AppState { --- -## Phase 3: Integration with Controllers +## Phase 4: Integration with Controllers -### 3.1 Update Action Controller +### 4.1 Update Action Controller **File:** `web/src/controller/action_controller.rs` **After creating an action, send SSE event to the other user in the coaching relationship:** @@ -830,7 +2082,7 @@ async fn determine_other_user_in_coaching_session( --- -### 3.2 Handle Auth Changes (Security) +### 4.2 Handle Auth Changes (Security) **File:** `web/src/controller/user_session_controller.rs` **On logout, send ForceLogout event:** @@ -864,9 +2116,9 @@ pub async fn delete( --- -## Phase 4: Frontend Integration +## Phase 5: Frontend Integration -### 4.1 Create SSE Client Hook +### 5.1 Create SSE Client Hook **File:** `~/Desktop/refactor/refactor-platform-fe/src/hooks/useSSE.ts` **Purpose:** React hook to establish and manage app-wide SSE connection @@ -907,7 +2159,7 @@ export function useSSE() { --- -### 4.2 Create Typed Event Handler Hook +### 5.2 Create Typed Event Handler Hook **File:** `~/Desktop/refactor/refactor-platform-fe/src/hooks/useSSEEventHandler.ts` **Purpose:** Type-safe event handler registration @@ -945,7 +2197,7 @@ export function useSSEEventHandler( --- -### 4.3 Establish SSE in App Root +### 5.3 Establish SSE in App Root **File:** App root component or layout ```typescript @@ -970,7 +2222,7 @@ function AppLayout({ children }: Props) { --- -### 4.4 Use SSE in Coaching Session Page +### 5.4 Use SSE in Coaching Session Page **File:** Coaching session page component ```typescript @@ -1017,127 +2269,6 @@ function CoachingSessionPage({ sessionId }: Props) { --- -## Phase 5: Testing - -### 5.1 Backend Unit Tests -**File:** `web/src/sse/manager.rs` (tests module) - -**Test cases:** -- Connection registration/unregistration -- User-scoped message routing (only target user receives) -- Broadcast message routing (all users receive) -- Connection count tracking -- Concurrent connection management - -```rust -#[cfg(test)] -mod tests { - use super::*; - use crate::sse::messages::{MessageScope, Event as SseEvent, Message as SseMessage}; - use tokio::sync::mpsc; - - #[test] - fn connection_registration_adds_connection_to_manager() { - let manager = SseManager::new(); - let (tx, _rx) = mpsc::unbounded_channel(); - let user_id = domain::Id::new_v4(); - - let metadata = ConnectionMetadata::new(user_id, tx); - let connection_id = metadata.connection_id.clone(); - - manager.register_connection(metadata); - assert_eq!(manager.connection_count(), 1); - - manager.unregister_connection(&connection_id); - assert_eq!(manager.connection_count(), 0); - } - - #[tokio::test] - async fn user_scoped_message_is_received_by_correct_user() { - let manager = SseManager::new(); - - let user1_id = domain::Id::new_v4(); - let user2_id = domain::Id::new_v4(); - - let (tx1, mut rx1) = mpsc::unbounded_channel(); - let (tx2, mut rx2) = mpsc::unbounded_channel(); - - manager.register_connection(ConnectionMetadata::new(user1_id, tx1)); - manager.register_connection(ConnectionMetadata::new(user2_id, tx2)); - - // Send message to user1 only - manager.send_message(SseMessage { - event: SseEvent::ForceLogout { - reason: "test".to_string(), - }, - scope: MessageScope::User { user_id: user1_id }, - }); - - // User1 receives message - assert!(rx1.try_recv().is_ok()); - // User2 does not - assert!(rx2.try_recv().is_err()); - } - - #[tokio::test] - async fn broadcast_message_is_received_by_all_users() { - let manager = SseManager::new(); - - let (tx1, mut rx1) = mpsc::unbounded_channel(); - let (tx2, mut rx2) = mpsc::unbounded_channel(); - - manager.register_connection(ConnectionMetadata::new( - domain::Id::new_v4(), - tx1, - )); - manager.register_connection(ConnectionMetadata::new( - domain::Id::new_v4(), - tx2, - )); - - manager.send_message(SseMessage { - event: SseEvent::ForceLogout { - reason: "maintenance".to_string(), - }, - scope: MessageScope::Broadcast, - }); - - // Both users receive message - assert!(rx1.try_recv().is_ok()); - assert!(rx2.try_recv().is_ok()); - } -} -``` - ---- - -### 5.2 Backend Integration Tests -**File:** `web/tests/sse_integration_test.rs` - -**Test cases:** -- Unauthenticated requests return 401 -- SSE connection established with valid session -- Connection metadata extracted correctly -- Events flow correctly through the stream -- Connection cleanup on disconnect -- Keep-alive messages sent at correct interval - ---- - -### 5.3 End-to-End Test -**Manual testing scenario:** -1. Open two browser windows -2. Log in as Coach in window 1, Coachee in window 2 -3. Navigate both to same coaching session -4. Create action in window 1 -5. Verify action appears in window 2 without refresh -6. Verify action appears immediately (not delayed) -7. Test with Notes, Agreements, Goals -8. Test force logout (admin forces logout in one window, other windows redirect) -9. Test connection reconnection (kill backend, restart, verify SSE reconnects) - ---- - ## Architecture Diagram ``` @@ -1165,13 +2296,13 @@ mod tests { │ ┌────────────────────────────────────────────────┐ │ │ │ Manager (manager.rs) │ │ │ │ ┌──────────────────────────────────────────┐ │ │ -│ │ │ DashMap │ │ │ -│ │ │ - connection_1 → {user_id, sender} │ │ │ -│ │ │ - connection_2 → {user_id, sender} │ │ │ +│ │ │ ConnectionRegistry │ │ │ +│ │ │ • Primary: DashMap │ │ │ +│ │ │ • Secondary: DashMap │ │ │ │ │ └──────────────────────────────────────────┘ │ │ │ │ │ │ │ │ send_message(Message) │ │ -│ │ → Filter connections by scope │ │ +│ │ → O(1) lookup in user_index │ │ │ │ → Send to matching channels │ │ │ └──────────────────▲───────────────────────────┘ │ │ │ │ diff --git a/nginx/conf.d/refactor-platform.conf b/nginx/conf.d/refactor-platform.conf index 4877d8ce..8dcf2858 100644 --- a/nginx/conf.d/refactor-platform.conf +++ b/nginx/conf.d/refactor-platform.conf @@ -136,6 +136,40 @@ server { add_header Content-Type text/plain; } + # SSE endpoint requires special configuration to prevent nginx from + # buffering events or timing out long-lived connections. Without these + # settings, SSE events would be delayed and connections would close after + # 60 seconds. The 15-second keep-alive from Axum ensures the connection + # stays healthy within the 24-hour timeout window. + location /api/sse { + rewrite ^/api(.*)$ $1 break; + proxy_pass http://backend; + + # SSE-specific settings + proxy_buffering off; # Enable immediate event streaming + proxy_cache off; # No caching for real-time streams + proxy_read_timeout 24h; # Allow long-lived connections + proxy_connect_timeout 60s; + proxy_send_timeout 60s; + + # Standard proxy headers + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + proxy_set_header X-Forwarded-Host $host; + proxy_set_header X-Forwarded-Port $server_port; + proxy_set_header X-Request-ID $http_x_request_id$request_id; + proxy_set_header Connection ''; # Clear connection header for streaming + + # Enable chunked transfer encoding + chunked_transfer_encoding on; + + # CORS headers (same as other API routes) + add_header 'Access-Control-Allow-Origin' 'https://myrefactor.com' always; + add_header 'Access-Control-Allow-Credentials' 'true' always; + } + # Frontend routing - everything else goes to Next.js location / { # Proxy to the frontend container using upstream diff --git a/service/Cargo.toml b/service/Cargo.toml index f95995d3..3f1f6967 100644 --- a/service/Cargo.toml +++ b/service/Cargo.toml @@ -15,6 +15,8 @@ features = [ ] [dependencies] +sse = { path = "../sse" } + clap = { version = "4.5.20", features = ["cargo", "derive", "env"] } dotenvy = "0.15" log = "0.4.22" diff --git a/service/src/lib.rs b/service/src/lib.rs index 7daefe3d..9e22746b 100644 --- a/service/src/lib.rs +++ b/service/src/lib.rs @@ -28,13 +28,19 @@ pub async fn init_database(database_url: &str) -> Result, pub config: Config, + pub sse_manager: Arc, } impl AppState { - pub fn new(app_config: Config, db: &Arc) -> Self { + pub fn new( + app_config: Config, + db: &Arc, + sse_manager: Arc, + ) -> Self { Self { database_connection: Arc::clone(db), config: app_config, + sse_manager, } } diff --git a/src/bin/seed_db.rs b/src/bin/seed_db.rs index bc6c0bae..c98d2ca3 100644 --- a/src/bin/seed_db.rs +++ b/src/bin/seed_db.rs @@ -10,8 +10,9 @@ async fn main() { info!("Seeding database [{}]...", config.database_url()); let db = Arc::new(service::init_database(config.database_url()).await.unwrap()); + let sse_manager = Arc::new(sse::Manager::new()); - let app_state = AppState::new(config, &db); + let app_state = AppState::new(config, &db, sse_manager); entity_api::seed_database(app_state.db_conn_ref()).await; } diff --git a/src/main.rs b/src/main.rs index b7630eb2..c00f1ecc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -45,7 +45,9 @@ async fn main() { panic!("Failed to establish a useable DBConnection and ping the DB successfully."); } - let app_state = AppState::new(config, &db_conn); + let sse_manager = Arc::new(sse::Manager::new()); + + let app_state = AppState::new(config, &db_conn, sse_manager); web::init_server(app_state).await.unwrap(); } diff --git a/sse-test-client/Cargo.toml b/sse-test-client/Cargo.toml new file mode 100644 index 00000000..dfe8e39e --- /dev/null +++ b/sse-test-client/Cargo.toml @@ -0,0 +1,37 @@ +[package] +name = "sse-test-client" +version = "0.1.0" +edition = "2021" + +[[bin]] +name = "sse-test-client" +path = "src/main.rs" + +[dependencies] +# HTTP client +reqwest = { version = "0.11", features = ["json", "cookies"] } + +# SSE parsing +eventsource-client = "0.12" +futures-util = "0.3" + +# CLI +clap = { version = "4.5", features = ["derive"] } + +# Async runtime +tokio = { version = "1", features = ["full"] } + +# Serialization +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" + +# Output formatting +colored = "2.1" +anyhow = "1.0" + +# Logging +log = "0.4" +env_logger = "0.11" + +# Utilities +uuid = { version = "1.6", features = ["v4", "serde"] } diff --git a/sse-test-client/README.md b/sse-test-client/README.md new file mode 100644 index 00000000..6b15e021 --- /dev/null +++ b/sse-test-client/README.md @@ -0,0 +1,204 @@ +# SSE Test Client + +A standalone Rust binary for testing Server-Sent Events (SSE) functionality without requiring a frontend client. The tool authenticates as two users, establishes SSE connections, triggers events via API calls, and validates that events are received correctly. + +## Overview + +This tool validates the SSE infrastructure by: +1. Authenticating two users (typically a coach and coachee) +2. Establishing SSE connections for both users +3. Optionally creating a test coaching relationship and session (for action tests) +4. Triggering events (create/update/delete actions, force logout) +5. Verifying that the correct SSE events are received by the appropriate users + +## Prerequisites + +- Backend server running (default: `http://localhost:4000`) +- Two valid user accounts with credentials +- **For action tests only**: Users must have admin permission to create coaching relationships +- **For connection test**: No special permissions required + +## Usage + +### Run Individual Test Scenarios + +```bash +# Test basic SSE connection (no admin permissions required) +cargo run -p sse-test-client -- \ + --base-url http://localhost:4000 \ + --user1 "user1@example.com:password123" \ + --user2 "user2@example.com:password456" \ + --scenario connection-test + +# Test action creation (requires admin permissions) +cargo run -p sse-test-client -- \ + --base-url http://localhost:4000 \ + --user1 "coach@example.com:password123" \ + --user2 "coachee@example.com:password456" \ + --scenario action-create + +# Test action update (requires admin permissions) +cargo run -p sse-test-client -- \ + --base-url http://localhost:4000 \ + --user1 "coach@example.com:password123" \ + --user2 "coachee@example.com:password456" \ + --scenario action-update + +# Test action delete (requires admin permissions) +cargo run -p sse-test-client -- \ + --base-url http://localhost:4000 \ + --user1 "coach@example.com:password123" \ + --user2 "coachee@example.com:password456" \ + --scenario action-delete + +# Test force logout (requires admin permissions - NOT YET IMPLEMENTED) +cargo run -p sse-test-client -- \ + --base-url http://localhost:4000 \ + --user1 "admin@example.com:adminpass" \ + --user2 "user@example.com:userpass" \ + --scenario force-logout-test +``` + +### Run All Tests + +```bash +cargo run -p sse-test-client -- \ + --base-url http://localhost:4000 \ + --user1 "coach@example.com:password123" \ + --user2 "coachee@example.com:password456" \ + --scenario all +``` + +### Enable Verbose Logging + +```bash +cargo run -p sse-test-client -- \ + --base-url http://localhost:4000 \ + --user1 "coach@example.com:password123" \ + --user2 "coachee@example.com:password456" \ + --scenario all \ + --verbose +``` + +## Available Scenarios + +- `connection-test` - Tests basic SSE connectivity without creating any data (no admin permissions required) +- `action-create` - Tests SSE events for action creation (requires admin permissions) +- `action-update` - Tests SSE events for action updates (requires admin permissions) +- `action-delete` - Tests SSE events for action deletion (requires admin permissions) +- `force-logout-test` - Tests SSE events for force logout (requires admin permissions, NOT YET IMPLEMENTED) +- `all` - Runs all test scenarios sequentially (requires admin permissions for action tests) + +## Command-Line Arguments + +| Argument | Required | Description | +|----------|----------|-------------| +| `--base-url` | Yes | Base URL of the backend (e.g., `http://localhost:000`) | +| `--user1` | Yes | User 1 credentials in format `email:password` | +| `--user2` | Yes | User 2 credentials in format `email:password` | +| `--scenario` | Yes | Test scenario to run (see Available Scenarios) | +| `--verbose` or `-v` | No | Enable verbose output with debug logging | + +## How It Works + +### Setup Phase +1. Authenticates both users and obtains session cookies +2. For action tests: Creates a coaching relationship and session between the users +3. For connection test: Skips coaching data setup +4. Establishes SSE connections for both users + +### Test Phase +For each scenario: +1. **Connection Test**: Verifies SSE connections are established and remain stable +2. **Action Tests**: User 1 triggers an action (e.g., creates an action), the tool waits for User 2 to receive the corresponding SSE event, and validates event data +3. Records the test result (pass/fail) and duration + +### Results Phase +- Displays a summary of all test results +- Shows pass/fail status with durations +- Exits with code 0 if all tests pass, 1 if any fail + +## Example Output + +### Connection Test (No Admin Required) +``` +=== SETUP PHASE === +→ Authenticating users... +✓ User 1 authenticated (ID: 123e4567-e89b-12d3-a456-426614174000) +✓ User 2 authenticated (ID: 234e5678-e89b-12d3-a456-426614174001) + +→ Skipping test environment setup (not needed for this test) + +→ Establishing SSE connections... +✓ User 1 SSE connection established +✓ User 2 SSE connection established + +=== TEST PHASE === + +=== TEST: Connection Test === +Testing basic SSE connectivity without creating any data +✓ User 1 (123e4567-e89b-12d3-a456-426614174000) SSE connection: established +✓ User 2 (234e5678-e89b-12d3-a456-426614174001) SSE connection: established +→ Waiting 2 seconds to verify connections stay alive... +✓ Connections remain stable +✓ SSE infrastructure is working correctly + +=== RESULTS === +=== TEST SUMMARY === +[PASS] connection_test (2.002086s) + SSE connections established and maintained successfully + +Results: 1 passed, 0 failed + +All tests passed! ✓ +``` + +### Action Test (Requires Admin) +``` +=== SETUP PHASE === +→ Authenticating users... +✓ User 1 authenticated (ID: 123e4567-e89b-12d3-a456-426614174000) +✓ User 2 authenticated (ID: 234e5678-e89b-12d3-a456-426614174001) + +→ Creating test coaching relationship and session... +✓ Coaching relationship created (ID: 345e6789-e89b-12d3-a456-426614174002) +✓ Coaching session created (ID: 456e789a-e89b-12d3-a456-426614174003) + +→ Establishing SSE connections... +✓ User 1 SSE connection established +✓ User 2 SSE connection established + +=== TEST PHASE === + +=== TEST: Action Create === +→ User 1 creating action... +✓ Action created (ID: 567e89ab-e89b-12d3-a456-426614174004) +→ Waiting for User 2 to receive action_created event... + +[User 2 (Coachee)] action_created event received + { + "type": "action_created", + "data": { + "coaching_session_id": "456e789a-e89b-12d3-a456-426614174003", + "action": { ... } + } + } +✓ Event data verified correctly + +=== RESULTS === +=== TEST SUMMARY === +[PASS] action_create (234ms) + +Results: 1 passed, 0 failed + +All tests passed! ✓ +``` + +## Module Structure + +- `main.rs` - CLI entry point and scenario orchestration +- `auth.rs` - User authentication and session management +- `sse_client.rs` - SSE connection handling and event listening +- `api_client.rs` - API calls to create test data and trigger events +- `scenarios.rs` - Test scenario implementations +- `output.rs` - Color-coded console output formatting diff --git a/sse-test-client/src/api_client.rs b/sse-test-client/src/api_client.rs new file mode 100644 index 00000000..78682f62 --- /dev/null +++ b/sse-test-client/src/api_client.rs @@ -0,0 +1,270 @@ +use anyhow::{Context, Result}; +use reqwest::Client; +use serde_json::{json, Value}; + +pub struct ApiClient { + client: Client, + base_url: String, +} + +#[derive(Debug, Clone)] +pub struct TestEnvironment { + pub relationship_id: String, + pub session_id: String, +} + +impl ApiClient { + pub fn new(client: Client, base_url: String) -> Self { + Self { client, base_url } + } + + pub async fn setup_test_environment( + &self, + coach_session: &str, + _coachee_session: &str, + coach_id: &str, + coachee_id: &str, + ) -> Result { + // Get user's organizations to find organization_id + let organizations = self.get_user_organizations(coach_session, coach_id).await?; + let organization_id = organizations[0]["id"] + .as_str() + .context("No organization ID found")?; + + // Create coaching relationship + let relationship = self + .create_coaching_relationship(coach_session, organization_id, coach_id, coachee_id) + .await?; + + let relationship_id = relationship["id"] + .as_str() + .context("No relationship ID in response")? + .to_string(); + + // Create coaching session + let session = self + .create_coaching_session(coach_session, &relationship_id) + .await?; + + let session_id = session["id"] + .as_str() + .context("No session ID in response")? + .to_string(); + + Ok(TestEnvironment { + relationship_id, + session_id, + }) + } + + async fn get_user_organizations(&self, session_cookie: &str, user_id: &str) -> Result { + let url = format!("{}/users/{}/organizations", self.base_url, user_id); + + let response = self + .client + .get(&url) + .header("Cookie", format!("id={}", session_cookie)) + .header("x-version", "1.0.0-beta1") + .send() + .await + .context("Failed to get user organizations")?; + + if !response.status().is_success() { + let status = response.status(); + let body = response + .text() + .await + .unwrap_or_else(|_| "Unable to read response body".to_string()); + anyhow::bail!( + "Failed to get organizations: {} - Response: {}", + status, + body + ); + } + + let api_response: Value = response.json().await.context("Failed to parse response")?; + + // Extract the data array from ApiResponse wrapper + api_response["data"] + .as_array() + .context("No data array in response") + .map(|arr| Value::Array(arr.clone())) + } + + async fn create_coaching_relationship( + &self, + session_cookie: &str, + organization_id: &str, + coach_id: &str, + coachee_id: &str, + ) -> Result { + let url = format!( + "{}/organizations/{}/coaching_relationships", + self.base_url, organization_id + ); + + let response = self + .client + .post(&url) + .header("Cookie", format!("id={}", session_cookie)) + .header("x-version", "1.0.0-beta1") + .json(&json!({ + "coach_id": coach_id, + "coachee_id": coachee_id, + })) + .send() + .await + .context("Failed to create coaching relationship")?; + + if !response.status().is_success() { + anyhow::bail!("Failed to create relationship: {}", response.status()); + } + + let api_response: Value = response.json().await.context("Failed to parse response")?; + + // Extract the data from ApiResponse wrapper + api_response["data"] + .as_object() + .context("No data object in response") + .map(|obj| Value::Object(obj.clone())) + } + + async fn create_coaching_session( + &self, + session_cookie: &str, + relationship_id: &str, + ) -> Result { + let url = format!("{}/coaching_sessions", self.base_url); + + let response = self + .client + .post(&url) + .header("Cookie", format!("id={}", session_cookie)) + .header("x-version", "1.0.0-beta1") + .json(&json!({ + "coaching_relationship_id": relationship_id, + "date": "2024-01-01", + })) + .send() + .await + .context("Failed to create coaching session")?; + + if !response.status().is_success() { + anyhow::bail!("Failed to create session: {}", response.status()); + } + + let api_response: Value = response.json().await.context("Failed to parse response")?; + + // Extract the data from ApiResponse wrapper + api_response["data"] + .as_object() + .context("No data object in response") + .map(|obj| Value::Object(obj.clone())) + } + + pub async fn create_action( + &self, + session_cookie: &str, + coaching_session_id: &str, + title: &str, + ) -> Result { + let url = format!("{}/actions", self.base_url); + + let response = self + .client + .post(&url) + .header("Cookie", format!("id={}", session_cookie)) + .header("x-version", "1.0.0-beta1") + .json(&json!({ + "coaching_session_id": coaching_session_id, + "title": title, + "description": "Created by SSE test tool", + "status": "not_started", + })) + .send() + .await + .context("Failed to create action")?; + + if !response.status().is_success() { + anyhow::bail!("Failed to create action: {}", response.status()); + } + + let api_response: Value = response.json().await.context("Failed to parse response")?; + + // Extract the data from ApiResponse wrapper + api_response["data"] + .as_object() + .context("No data object in response") + .map(|obj| Value::Object(obj.clone())) + } + + pub async fn update_action( + &self, + session_cookie: &str, + action_id: &str, + title: &str, + ) -> Result { + let url = format!("{}/actions/{}", self.base_url, action_id); + + let response = self + .client + .put(&url) + .header("Cookie", format!("id={}", session_cookie)) + .header("x-version", "1.0.0-beta1") + .json(&json!({ + "title": title, + })) + .send() + .await + .context("Failed to update action")?; + + if !response.status().is_success() { + anyhow::bail!("Failed to update action: {}", response.status()); + } + + let api_response: Value = response.json().await.context("Failed to parse response")?; + + // Extract the data from ApiResponse wrapper + api_response["data"] + .as_object() + .context("No data object in response") + .map(|obj| Value::Object(obj.clone())) + } + + pub async fn delete_action(&self, session_cookie: &str, action_id: &str) -> Result<()> { + let url = format!("{}/actions/{}", self.base_url, action_id); + + let response = self + .client + .delete(&url) + .header("Cookie", format!("id={}", session_cookie)) + .header("x-version", "1.0.0-beta1") + .send() + .await + .context("Failed to delete action")?; + + if !response.status().is_success() { + anyhow::bail!("Failed to delete action: {}", response.status()); + } + + Ok(()) + } + + pub async fn force_logout(&self, admin_session_cookie: &str, user_id: &str) -> Result<()> { + let url = format!("{}/admin/force_logout/{}", self.base_url, user_id); + + let response = self + .client + .post(&url) + .header("Cookie", format!("id={}", admin_session_cookie)) + .send() + .await + .context("Failed to force logout")?; + + if !response.status().is_success() { + anyhow::bail!("Failed to force logout: {}", response.status()); + } + + Ok(()) + } +} diff --git a/sse-test-client/src/auth.rs b/sse-test-client/src/auth.rs new file mode 100644 index 00000000..b7135fca --- /dev/null +++ b/sse-test-client/src/auth.rs @@ -0,0 +1,91 @@ +use anyhow::{Context, Result}; +use reqwest::Client; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone)] +pub struct UserCredentials { + pub email: String, + pub password: String, +} + +impl UserCredentials { + pub fn parse(input: &str) -> Result { + let parts: Vec<&str> = input.split(':').collect(); + if parts.len() != 2 { + anyhow::bail!("Invalid credentials format. Expected email:password"); + } + Ok(Self { + email: parts[0].to_string(), + password: parts[1].to_string(), + }) + } +} + +#[derive(Debug, Clone)] +pub struct AuthenticatedUser { + pub user_id: String, + pub session_cookie: String, + pub credentials: UserCredentials, +} + +#[derive(Debug, Serialize)] +struct LoginRequest { + email: String, + password: String, +} + +#[derive(Debug, Deserialize)] +struct UserData { + id: String, +} + +#[derive(Debug, Deserialize)] +struct ApiResponse { + data: UserData, +} + +pub async fn login( + client: &Client, + base_url: &str, + credentials: &UserCredentials, +) -> Result { + let url = format!("{}/login", base_url); + + let response = client + .post(&url) + .form(&LoginRequest { + email: credentials.email.clone(), + password: credentials.password.clone(), + }) + .send() + .await + .context("Failed to send login request")?; + + if !response.status().is_success() { + let status = response.status(); + let body = response + .text() + .await + .unwrap_or_else(|_| "Unable to read response body".to_string()); + anyhow::bail!("Login failed: {} - Response: {}", status, body); + } + + // Extract session cookie + let session_cookie = response + .cookies() + .find(|cookie| cookie.name() == "id") + .context("No session cookie in response")? + .value() + .to_string(); + + let api_response: ApiResponse = response + .json() + .await + .context("Failed to parse login response")?; + + Ok(AuthenticatedUser { + user_id: api_response.data.id, + session_cookie, + credentials: credentials.clone(), + }) +} diff --git a/sse-test-client/src/main.rs b/sse-test-client/src/main.rs new file mode 100644 index 00000000..f4ffd3e4 --- /dev/null +++ b/sse-test-client/src/main.rs @@ -0,0 +1,268 @@ +use anyhow::Result; +use clap::Parser; +use colored::*; + +mod api_client; +mod auth; +mod output; +mod scenarios; +mod sse_client; + +use api_client::ApiClient; +use auth::{login, UserCredentials}; +use output::print_test_summary; +use sse_client::Connection; + +#[derive(Parser)] +#[command(name = "sse-test-client")] +#[command(about = "SSE Integration Testing Tool")] +struct Cli { + /// Base URL of the backend (e.g., http://localhost:4747) + #[arg(long)] + base_url: String, + + /// User 1 credentials (format: email:password) + #[arg(long)] + user1: String, + + /// User 2 credentials (format: email:password) + #[arg(long)] + user2: String, + + /// Test scenario to run + #[arg(long, value_enum)] + scenario: ScenarioChoice, + + /// Enable verbose output + #[arg(long, short)] + verbose: bool, +} + +#[derive(clap::ValueEnum, Clone)] +enum ScenarioChoice { + /// Test basic SSE connection without creating any data + ConnectionTest, + /// Test force logout event (no coaching data needed) + ForceLogoutTest, + /// Test action create event (requires coaching session) + ActionCreate, + /// Test action update event (requires coaching session) + ActionUpdate, + /// Test action delete event (requires coaching session) + ActionDelete, + /// Run all tests including those requiring coaching data + All, +} + +#[tokio::main] +async fn main() -> Result<()> { + let cli = Cli::parse(); + + if cli.verbose { + env_logger::Builder::from_default_env() + .filter_level(log::LevelFilter::Debug) + .init(); + } + + println!("{}", "=== SETUP PHASE ===".bright_white().bold()); + + // Parse credentials + let user1_creds = UserCredentials::parse(&cli.user1)?; + let user2_creds = UserCredentials::parse(&cli.user2)?; + + // Authenticate users + println!("{} Authenticating users...", "→".blue()); + let client = reqwest::Client::new(); + let user1 = login(&client, &cli.base_url, &user1_creds).await?; + let user2 = login(&client, &cli.base_url, &user2_creds).await?; + + println!( + "{} User 1 authenticated (ID: {})", + "✓".green(), + user1.user_id + ); + println!( + "{} User 2 authenticated (ID: {})", + "✓".green(), + user2.user_id + ); + + // Set up test environment only for scenarios that need coaching data + let api_client = ApiClient::new(client.clone(), cli.base_url.clone()); + let test_env = match cli.scenario { + ScenarioChoice::ConnectionTest | ScenarioChoice::ForceLogoutTest => { + println!( + "\n{} Skipping test environment setup (not needed for this test)", + "→".blue() + ); + None + } + _ => { + println!( + "\n{} Creating test coaching relationship and session...", + "→".blue() + ); + let env = api_client + .setup_test_environment( + &user1.session_cookie, + &user2.session_cookie, + &user1.user_id, + &user2.user_id, + ) + .await?; + + println!( + "{} Coaching relationship created (ID: {})", + "✓".green(), + env.relationship_id + ); + println!( + "{} Coaching session created (ID: {})", + "✓".green(), + env.session_id + ); + Some(env) + } + }; + + // Establish SSE connections + println!("\n{} Establishing SSE connections...", "→".blue()); + let mut sse1 = Connection::establish( + &cli.base_url, + &user1.session_cookie, + "User 1 (Coach)".to_string(), + ) + .await?; + + let mut sse2 = Connection::establish( + &cli.base_url, + &user2.session_cookie, + "User 2 (Coachee)".to_string(), + ) + .await?; + + println!("{} User 1 SSE connection established", "✓".green()); + println!("{} User 2 SSE connection established", "✓".green()); + + // Run test scenarios + println!("\n{}", "=== TEST PHASE ===".bright_white().bold()); + + let mut results = Vec::new(); + + match cli.scenario { + ScenarioChoice::ConnectionTest => { + results.push(scenarios::test_connection(&user1, &user2, &mut sse1, &mut sse2).await?); + } + ScenarioChoice::ForceLogoutTest => { + results.push( + scenarios::test_force_logout(&user1, &user2, &api_client, &mut sse1, &mut sse2) + .await?, + ); + } + ScenarioChoice::ActionCreate => { + let env = test_env + .as_ref() + .expect("Test environment required for ActionCreate"); + results.push( + scenarios::test_action_create( + &user1, + &user2, + env, + &api_client, + &mut sse1, + &mut sse2, + ) + .await?, + ); + } + ScenarioChoice::ActionUpdate => { + let env = test_env + .as_ref() + .expect("Test environment required for ActionUpdate"); + results.push( + scenarios::test_action_update( + &user1, + &user2, + env, + &api_client, + &mut sse1, + &mut sse2, + ) + .await?, + ); + } + ScenarioChoice::ActionDelete => { + let env = test_env + .as_ref() + .expect("Test environment required for ActionDelete"); + results.push( + scenarios::test_action_delete( + &user1, + &user2, + env, + &api_client, + &mut sse1, + &mut sse2, + ) + .await?, + ); + } + ScenarioChoice::All => { + results.push(scenarios::test_connection(&user1, &user2, &mut sse1, &mut sse2).await?); + results.push( + scenarios::test_force_logout(&user1, &user2, &api_client, &mut sse1, &mut sse2) + .await?, + ); + let env = test_env + .as_ref() + .expect("Test environment required for All scenarios"); + results.push( + scenarios::test_action_create( + &user1, + &user2, + env, + &api_client, + &mut sse1, + &mut sse2, + ) + .await?, + ); + results.push( + scenarios::test_action_update( + &user1, + &user2, + env, + &api_client, + &mut sse1, + &mut sse2, + ) + .await?, + ); + results.push( + scenarios::test_action_delete( + &user1, + &user2, + env, + &api_client, + &mut sse1, + &mut sse2, + ) + .await?, + ); + } + } + + // Print summary + println!("\n{}", "=== RESULTS ===".bright_white().bold()); + print_test_summary(&results); + + let all_passed = results.iter().all(|r| r.passed); + + if all_passed { + println!("\n{}", "All tests passed! ✓".bright_green().bold()); + } else { + println!("\n{}", "Some tests failed! ✗".bright_red().bold()); + } + + std::process::exit(if all_passed { 0 } else { 1 }); +} diff --git a/sse-test-client/src/output.rs b/sse-test-client/src/output.rs new file mode 100644 index 00000000..ba5c89c3 --- /dev/null +++ b/sse-test-client/src/output.rs @@ -0,0 +1,59 @@ +use colored::*; +use std::time::Duration; + +use crate::sse_client::Event; + +#[derive(Debug)] +pub struct TestResult { + pub scenario: String, + pub passed: bool, + pub message: Option, + pub duration: Duration, +} + +pub fn print_event(user_label: &str, event: &Event) { + let label_colored = if user_label.contains("User 1") { + user_label.bright_blue() + } else { + user_label.bright_magenta() + }; + + println!( + "\n[{}] {} event received", + label_colored.bold(), + event.event_type.yellow() + ); + + if let Ok(pretty) = serde_json::to_string_pretty(&event.data) { + println!(" {}", pretty.dimmed()); + } +} + +pub fn print_test_summary(results: &[TestResult]) { + println!("\n{}", "=== TEST SUMMARY ===".bright_white().bold()); + + let total = results.len(); + let passed = results.iter().filter(|r| r.passed).count(); + let failed = total - passed; + + for result in results { + let status = if result.passed { + "PASS".green().bold() + } else { + "FAIL".red().bold() + }; + + println!("[{}] {} ({:?})", status, result.scenario, result.duration); + + if let Some(msg) = &result.message { + println!(" {}", msg.dimmed()); + } + } + + println!( + "\n{}: {} passed, {} failed", + "Results".bold(), + passed.to_string().green(), + failed.to_string().red() + ); +} diff --git a/sse-test-client/src/scenarios.rs b/sse-test-client/src/scenarios.rs new file mode 100644 index 00000000..bac62e72 --- /dev/null +++ b/sse-test-client/src/scenarios.rs @@ -0,0 +1,318 @@ +use anyhow::Result; +use colored::*; +use std::time::{Duration, Instant}; + +use crate::api_client::{ApiClient, TestEnvironment}; +use crate::auth::AuthenticatedUser; +use crate::output::{print_event, TestResult}; +use crate::sse_client::Connection; + +pub async fn test_action_create( + user1: &AuthenticatedUser, + _user2: &AuthenticatedUser, + test_env: &TestEnvironment, + api_client: &ApiClient, + _sse1: &mut Connection, + sse2: &mut Connection, +) -> Result { + let start = Instant::now(); + + println!("\n{}", "=== TEST: Action Create ===".bright_cyan().bold()); + + println!("{} User 1 creating action...", "→".blue()); + + let action = api_client + .create_action( + &user1.session_cookie, + &test_env.session_id, + "Test Action - Create", + ) + .await?; + + let action_id = action["id"].as_str().unwrap(); + println!("{} Action created (ID: {})", "✓".green(), action_id); + + println!( + "{} Waiting for User 2 to receive action_created event...", + "→".blue() + ); + + match sse2 + .wait_for_event("action_created", Duration::from_secs(5)) + .await + { + Ok(event) => { + print_event(&sse2.user_label, &event); + + let received_action_id = event.data["data"]["action"]["id"].as_str().unwrap(); + let received_session_id = event.data["data"]["coaching_session_id"].as_str().unwrap(); + + if received_action_id == action_id && received_session_id == test_env.session_id { + println!("{} Event data verified correctly", "✓".green()); + Ok(TestResult { + scenario: "action_create".to_string(), + passed: true, + message: None, + duration: start.elapsed(), + }) + } else { + println!("{} Event data mismatch!", "✗".red()); + Ok(TestResult { + scenario: "action_create".to_string(), + passed: false, + message: Some(format!( + "Expected action_id={}, session_id={}, got action_id={}, session_id={}", + action_id, test_env.session_id, received_action_id, received_session_id + )), + duration: start.elapsed(), + }) + } + } + Err(e) => { + println!("{} Timeout waiting for event: {}", "✗".red(), e); + Ok(TestResult { + scenario: "action_create".to_string(), + passed: false, + message: Some(format!("Timeout: {}", e)), + duration: start.elapsed(), + }) + } + } +} + +pub async fn test_action_update( + user1: &AuthenticatedUser, + _user2: &AuthenticatedUser, + test_env: &TestEnvironment, + api_client: &ApiClient, + _sse1: &mut Connection, + sse2: &mut Connection, +) -> Result { + let start = Instant::now(); + + println!("\n{}", "=== TEST: Action Update ===".bright_cyan().bold()); + + // First create an action + println!("{} User 1 creating action...", "→".blue()); + let action = api_client + .create_action( + &user1.session_cookie, + &test_env.session_id, + "Test Action - Update", + ) + .await?; + + let action_id = action["id"].as_str().unwrap(); + + // Wait for and discard the create event + let _ = sse2 + .wait_for_event("action_created", Duration::from_secs(5)) + .await?; + + // Now update the action + println!("{} User 1 updating action...", "→".blue()); + api_client + .update_action(&user1.session_cookie, action_id, "Updated Title") + .await?; + + println!( + "{} Waiting for User 2 to receive action_updated event...", + "→".blue() + ); + + match sse2 + .wait_for_event("action_updated", Duration::from_secs(5)) + .await + { + Ok(event) => { + print_event(&sse2.user_label, &event); + + let received_title = event.data["data"]["action"]["title"].as_str().unwrap(); + + if received_title == "Updated Title" { + println!("{} Event data verified correctly", "✓".green()); + Ok(TestResult { + scenario: "action_update".to_string(), + passed: true, + message: None, + duration: start.elapsed(), + }) + } else { + Ok(TestResult { + scenario: "action_update".to_string(), + passed: false, + message: Some(format!("Title mismatch: {}", received_title)), + duration: start.elapsed(), + }) + } + } + Err(e) => Ok(TestResult { + scenario: "action_update".to_string(), + passed: false, + message: Some(format!("Timeout: {}", e)), + duration: start.elapsed(), + }), + } +} + +pub async fn test_action_delete( + user1: &AuthenticatedUser, + _user2: &AuthenticatedUser, + test_env: &TestEnvironment, + api_client: &ApiClient, + _sse1: &mut Connection, + sse2: &mut Connection, +) -> Result { + let start = Instant::now(); + + println!("\n{}", "=== TEST: Action Delete ===".bright_cyan().bold()); + + // Create action + let action = api_client + .create_action( + &user1.session_cookie, + &test_env.session_id, + "Test Action - Delete", + ) + .await?; + + let action_id = action["id"].as_str().unwrap(); + + // Discard create event + let _ = sse2 + .wait_for_event("action_created", Duration::from_secs(5)) + .await?; + + // Delete action + println!("{} User 1 deleting action...", "→".blue()); + api_client + .delete_action(&user1.session_cookie, action_id) + .await?; + + println!( + "{} Waiting for User 2 to receive action_deleted event...", + "→".blue() + ); + + match sse2 + .wait_for_event("action_deleted", Duration::from_secs(5)) + .await + { + Ok(event) => { + print_event(&sse2.user_label, &event); + + let received_action_id = event.data["data"]["action_id"].as_str().unwrap(); + + if received_action_id == action_id { + println!("{} Event data verified correctly", "✓".green()); + Ok(TestResult { + scenario: "action_delete".to_string(), + passed: true, + message: None, + duration: start.elapsed(), + }) + } else { + Ok(TestResult { + scenario: "action_delete".to_string(), + passed: false, + message: Some(format!("Action ID mismatch: {}", received_action_id)), + duration: start.elapsed(), + }) + } + } + Err(e) => Ok(TestResult { + scenario: "action_delete".to_string(), + passed: false, + message: Some(format!("Timeout: {}", e)), + duration: start.elapsed(), + }), + } +} + +pub async fn test_force_logout( + user1: &AuthenticatedUser, + user2: &AuthenticatedUser, + api_client: &ApiClient, + _sse1: &mut Connection, + sse2: &mut Connection, +) -> Result { + let start = Instant::now(); + + println!("\n{}", "=== TEST: Force Logout ===".bright_cyan().bold()); + + println!("{} User 1 forcing logout of User 2...", "→".blue()); + + api_client + .force_logout(&user1.session_cookie, &user2.user_id) + .await?; + + println!( + "{} Waiting for User 2 to receive force_logout event...", + "→".blue() + ); + + match sse2 + .wait_for_event("force_logout", Duration::from_secs(5)) + .await + { + Ok(event) => { + print_event(&sse2.user_label, &event); + println!("{} Event received correctly", "✓".green()); + Ok(TestResult { + scenario: "force_logout".to_string(), + passed: true, + message: None, + duration: start.elapsed(), + }) + } + Err(e) => Ok(TestResult { + scenario: "force_logout".to_string(), + passed: false, + message: Some(format!("Timeout: {}", e)), + duration: start.elapsed(), + }), + } +} + +pub async fn test_connection( + user1: &AuthenticatedUser, + user2: &AuthenticatedUser, + sse1: &mut Connection, + sse2: &mut Connection, +) -> Result { + let start = Instant::now(); + + println!("\n{}", "=== TEST: Connection Test ===".bright_cyan().bold()); + println!( + "{}", + "Testing basic SSE connectivity without creating any data".bright_white() + ); + + println!( + "{} User 1 ({}) SSE connection: established", + "✓".green(), + user1.user_id + ); + println!( + "{} User 2 ({}) SSE connection: established", + "✓".green(), + user2.user_id + ); + + // Wait a bit to ensure connections are stable + println!( + "{} Waiting 2 seconds to verify connections stay alive...", + "→".blue() + ); + tokio::time::sleep(Duration::from_secs(2)).await; + + println!("{} Connections remain stable", "✓".green()); + println!("{} SSE infrastructure is working correctly", "✓".green()); + + Ok(TestResult { + scenario: "connection_test".to_string(), + passed: true, + message: Some("SSE connections established and maintained successfully".to_string()), + duration: start.elapsed(), + }) +} diff --git a/sse-test-client/src/sse_client.rs b/sse-test-client/src/sse_client.rs new file mode 100644 index 00000000..cea4c684 --- /dev/null +++ b/sse-test-client/src/sse_client.rs @@ -0,0 +1,102 @@ +use anyhow::Result; +use eventsource_client::{self as es, Client}; +use futures_util::stream::StreamExt; +use log::*; +use serde_json::Value; +use std::time::{Duration, Instant}; +use tokio::sync::mpsc; + +#[derive(Debug, Clone)] +pub struct Event { + pub event_type: String, + pub data: Value, + pub timestamp: Instant, +} + +pub struct Connection { + pub user_label: String, + event_rx: mpsc::UnboundedReceiver, + _handle: tokio::task::JoinHandle<()>, +} + +impl Connection { + pub async fn establish( + base_url: &str, + session_cookie: &str, + user_label: String, + ) -> Result { + let url = format!("{}/sse", base_url); + let (tx, rx) = mpsc::unbounded_channel(); + + let client = es::ClientBuilder::for_url(&url)? + .header("Cookie", &format!("id={}", session_cookie))? + .build(); + + let label = user_label.clone(); + let handle = tokio::spawn(async move { + let mut stream = client.stream(); + + loop { + match stream.next().await { + Some(Ok(es::SSE::Event(event))) => { + if let Ok(data) = serde_json::from_str(&event.data) { + let sse_event = Event { + event_type: event.event_type, + data, + timestamp: Instant::now(), + }; + + if tx.send(sse_event).is_err() { + debug!("SSE receiver dropped for {}", label); + break; + } + } + } + Some(Ok(es::SSE::Comment(_))) => { + // Ignore comments (keep-alive) + } + Some(Err(e)) => { + warn!("SSE error for {}: {}", label, e); + } + None => { + debug!("SSE stream ended for {}", label); + break; + } + } + } + }); + + Ok(Self { + user_label, + event_rx: rx, + _handle: handle, + }) + } + + pub async fn wait_for_event(&mut self, event_type: &str, timeout: Duration) -> Result { + let deadline = Instant::now() + timeout; + + loop { + let remaining = deadline.saturating_duration_since(Instant::now()); + if remaining.is_zero() { + anyhow::bail!("Timeout waiting for event: {}", event_type); + } + + match tokio::time::timeout(remaining, self.event_rx.recv()).await { + Ok(Some(event)) if event.event_type == event_type => { + return Ok(event); + } + Ok(Some(_)) => { + // Wrong event type, keep waiting + continue; + } + Ok(None) => { + anyhow::bail!("SSE connection closed"); + } + Err(_) => { + anyhow::bail!("Timeout waiting for event: {}", event_type); + } + } + } + } +} diff --git a/sse/Cargo.toml b/sse/Cargo.toml new file mode 100644 index 00000000..0e2cd3fc --- /dev/null +++ b/sse/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "sse" +version = "1.0.0-beta2" +edition = "2021" + +[dependencies] +# SSE and async +axum = "0.7.7" +tokio = { version = "1.44.2", features = ["sync"] } +async-stream = "0.3" + +# Concurrency +dashmap = "6.1" + +# Serialization +serde = { version = "1.0.210", features = ["derive"] } +serde_json = "1.0.128" + +# Logging +log = "0.4.22" + +# UUID generation +uuid = { version = "1.6", features = ["v4"] } diff --git a/sse/src/connection.rs b/sse/src/connection.rs new file mode 100644 index 00000000..c01ee8a0 --- /dev/null +++ b/sse/src/connection.rs @@ -0,0 +1,153 @@ +use axum::response::sse::Event; +use dashmap::DashMap; +use log::*; +use std::collections::HashSet; +use std::convert::Infallible; +use tokio::sync::mpsc::UnboundedSender; + +// Type alias for user IDs (web layer converts domain::Id to String) +pub type UserId = String; + +/// Unique identifier for a connection (server-generated) +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct ConnectionId(String); + +impl ConnectionId { + pub fn new() -> Self { + Self(uuid::Uuid::new_v4().to_string()) + } + + pub fn as_str(&self) -> &str { + &self.0 + } +} + +impl Default for ConnectionId { + fn default() -> Self { + Self::new() + } +} + +/// Connection information (no redundant connection_id) +#[derive(Debug, Clone)] +pub struct ConnectionInfo { + pub user_id: UserId, + pub sender: UnboundedSender>, +} + +/// High-performance connection registry with dual indices for O(1) lookups +pub struct ConnectionRegistry { + /// Primary storage: lookup by connection_id for registration/cleanup - O(1) + connections: DashMap, + + /// Secondary index: fast lookup by user_id for message routing - O(1) + user_index: DashMap>, +} + +impl ConnectionRegistry { + pub fn new() -> Self { + Self { + connections: DashMap::new(), + user_index: DashMap::new(), + } + } + + /// Register a new connection - O(1) + pub fn register( + &self, + user_id: UserId, + sender: UnboundedSender>, + ) -> ConnectionId { + let connection_id = ConnectionId::new(); + + // Insert into primary storage + self.connections.insert( + connection_id.clone(), + ConnectionInfo { + user_id: user_id.clone(), + sender, + }, + ); + + // Update secondary index + self.user_index + .entry(user_id) + .or_default() + .insert(connection_id.clone()); + + connection_id + } + + /// Unregister a connection - O(1) + pub fn unregister(&self, connection_id: &ConnectionId) { + // Remove from primary storage + if let Some((_, info)) = self.connections.remove(connection_id) { + let user_id = info.user_id; + + // Update secondary index + if let Some(mut entry) = self.user_index.get_mut(&user_id) { + entry.remove(connection_id); + + // Clean up empty user entries + if entry.is_empty() { + drop(entry); // Release lock before removal + self.user_index.remove(&user_id); + } + } + } + } + + /// Send message to specific user - O(1) lookup + O(k) send where k = user's connections + pub fn send_to_user(&self, user_id: &UserId, event: Event) { + if let Some(connection_ids) = self.user_index.get(user_id) { + for conn_id in connection_ids.iter() { + if let Some(info) = self.connections.get(conn_id) { + if let Err(e) = info.sender.send(Ok(event.clone())) { + warn!( + "Failed to send event to connection {}: {}. Connection will be cleaned up.", + conn_id.as_str(), + e + ); + } + } + } + } + } + + /// Broadcast message to all connections - O(n) (unavoidable, but explicit) + pub fn broadcast(&self, event: Event) { + for entry in self.connections.iter() { + if let Err(e) = entry.value().sender.send(Ok(event.clone())) { + warn!( + "Failed to send broadcast to connection {}: {}", + entry.key().as_str(), + e + ); + } + } + } + + /// Get total connection count - O(1) + pub fn connection_count(&self) -> usize { + self.connections.len() + } + + /// Get active user count - O(1) + pub fn active_user_count(&self) -> usize { + self.user_index.len() + } + + /// Get connections per user (for monitoring/debugging) - O(1) + pub fn connections_per_user(&self, user_id: &UserId) -> usize { + self.user_index + .get(user_id) + .map(|set| set.len()) + .unwrap_or(0) + } +} + +impl Default for ConnectionRegistry { + fn default() -> Self { + Self::new() + } +} diff --git a/sse/src/lib.rs b/sse/src/lib.rs new file mode 100644 index 00000000..e4675c39 --- /dev/null +++ b/sse/src/lib.rs @@ -0,0 +1,64 @@ +//! Server-Sent Events (SSE) infrastructure for real-time updates. +//! +//! This crate provides a type-safe, app-wide SSE implementation for pushing +//! real-time updates from the backend to authenticated users. +//! +//! # Architecture +//! +//! - **Single connection per user**: Each authenticated user establishes one +//! SSE connection that stays open across page navigation. +//! - **Dual-index registry**: O(1) lookups for both connection management and +//! user-scoped message routing via separate DashMap indices. +//! - **User and Broadcast scopes**: Messages can be sent to specific users or +//! broadcast to all connected users. +//! - **Ephemeral messages**: All events are ephemeral - if a user is offline, +//! they miss the event and see fresh data on next page load. +//! - **Type-safe events**: All event types are strongly typed for compile-time +//! safety and better frontend TypeScript integration. +//! +//! # Message Flow +//! +//! 1. Frontend establishes SSE connection via `/sse` endpoint +//! 2. Backend extracts user from session cookie (AuthenticatedUser) +//! 3. Connection registered in ConnectionRegistry with dual indices +//! 4. When a resource changes (e.g., action created): +//! - Controller determines recipient (e.g., other user in relationship) +//! - Controller sends message via `app_state.sse_manager.send_message()` +//! - Manager performs O(1) lookup in user_index to find connections +//! - Events sent only to matching connections +//! 5. Frontend receives event and updates UI based on context +//! +//! # Example: Sending an event +//! +//! ```rust,ignore +//! use sse::message::{Event as SseEvent, Message as SseMessage, MessageScope}; +//! +//! // In a controller after creating an action +//! app_state.sse_manager.send_message(SseMessage { +//! event: SseEvent::ActionCreated { +//! coaching_session_id, +//! action: action.clone(), +//! }, +//! scope: MessageScope::User { user_id: recipient_id }, +//! }); +//! ``` +//! +//! # Security Considerations +//! +//! - Authentication required (AuthenticatedUser extractor) +//! - Session cookie must be valid +//! - Backend determines recipients (not client-controlled) +//! - nginx configured for long-lived connections (24h timeout) +//! - Keep-alive messages prevent idle timeout +//! +//! # Modules +//! +//! - `connection`: ConnectionRegistry with dual-index architecture and type-safe ConnectionId +//! - `manager`: High-level message routing (delegates to ConnectionRegistry) +//! - `message`: Type-safe event and scope definitions + +pub mod connection; +pub mod manager; +pub mod message; + +pub use manager::Manager; diff --git a/sse/src/manager.rs b/sse/src/manager.rs new file mode 100644 index 00000000..c5603676 --- /dev/null +++ b/sse/src/manager.rs @@ -0,0 +1,78 @@ +use crate::connection::{ConnectionId, ConnectionRegistry, UserId}; +use crate::message::{EventType, Message as SseMessage, MessageScope}; +use axum::response::sse::Event; +use log::*; +use std::sync::Arc; + +pub struct Manager { + registry: Arc, +} + +impl Manager { + pub fn new() -> Self { + Self { + registry: Arc::new(ConnectionRegistry::new()), + } + } + + /// Register a new connection and return its unique ID + pub fn register_connection( + &self, + user_id: UserId, + sender: tokio::sync::mpsc::UnboundedSender>, + ) -> ConnectionId { + let connection_id = self.registry.register(user_id.clone(), sender); + debug!( + "Registered SSE connection {} for user {}", + connection_id.as_str(), + user_id + ); + connection_id + } + + /// Unregister a connection by ID + pub fn unregister_connection(&self, connection_id: &ConnectionId) { + debug!("Unregistering SSE connection {}", connection_id.as_str()); + self.registry.unregister(connection_id); + } + + /// Send a message based on its scope + pub fn send_message(&self, message: SseMessage) { + let event_type = message.event.event_type(); + + let event_data = match serde_json::to_string(&message.event) { + Ok(json) => json, + Err(e) => { + error!("Failed to serialize SSE event: {}", e); + return; + } + }; + + let event = Event::default().event(event_type).data(event_data); + + match message.scope { + MessageScope::User { user_id } => { + self.registry.send_to_user(&user_id, event); + } + MessageScope::Broadcast => { + self.registry.broadcast(event); + } + } + } + + /// Get total connection count + pub fn connection_count(&self) -> usize { + self.registry.connection_count() + } + + /// Get active user count + pub fn active_user_count(&self) -> usize { + self.registry.active_user_count() + } +} + +impl Default for Manager { + fn default() -> Self { + Self::new() + } +} diff --git a/sse/src/message.rs b/sse/src/message.rs new file mode 100644 index 00000000..7dc7a56f --- /dev/null +++ b/sse/src/message.rs @@ -0,0 +1,97 @@ +use serde::Serialize; +use serde_json::Value; + +/// Trait for getting the SSE event type name +pub trait EventType { + fn event_type(&self) -> &'static str; +} + +#[derive(Debug, Clone, Serialize)] +#[serde(tag = "type", content = "data")] +pub enum Event { + // Actions (session-scoped) + #[serde(rename = "action_created")] + ActionCreated { + coaching_session_id: String, + action: Value, + }, + #[serde(rename = "action_updated")] + ActionUpdated { + coaching_session_id: String, + action: Value, + }, + #[serde(rename = "action_deleted")] + ActionDeleted { + coaching_session_id: String, + action_id: String, + }, + + // Agreements (relationship-scoped) + #[serde(rename = "agreement_created")] + AgreementCreated { + coaching_relationship_id: String, + agreement: Value, + }, + #[serde(rename = "agreement_updated")] + AgreementUpdated { + coaching_relationship_id: String, + agreement: Value, + }, + #[serde(rename = "agreement_deleted")] + AgreementDeleted { + coaching_relationship_id: String, + agreement_id: String, + }, + + // Overarching Goals (relationship-scoped) + #[serde(rename = "goal_created")] + GoalCreated { + coaching_relationship_id: String, + goal: Value, + }, + #[serde(rename = "goal_updated")] + GoalUpdated { + coaching_relationship_id: String, + goal: Value, + }, + #[serde(rename = "goal_deleted")] + GoalDeleted { + coaching_relationship_id: String, + goal_id: String, + }, + + // System events + #[serde(rename = "force_logout")] + ForceLogout { reason: String }, +} + +impl EventType for Event { + fn event_type(&self) -> &'static str { + match self { + Event::ActionCreated { .. } => "action_created", + Event::ActionUpdated { .. } => "action_updated", + Event::ActionDeleted { .. } => "action_deleted", + Event::AgreementCreated { .. } => "agreement_created", + Event::AgreementUpdated { .. } => "agreement_updated", + Event::AgreementDeleted { .. } => "agreement_deleted", + Event::GoalCreated { .. } => "goal_created", + Event::GoalUpdated { .. } => "goal_updated", + Event::GoalDeleted { .. } => "goal_deleted", + Event::ForceLogout { .. } => "force_logout", + } + } +} + +#[derive(Debug, Clone)] +pub struct Message { + pub event: Event, + pub scope: MessageScope, +} + +#[derive(Debug, Clone)] +pub enum MessageScope { + /// Send to all connections for a specific user + User { user_id: String }, + /// Send to all connected users + Broadcast, +} diff --git a/web/Cargo.toml b/web/Cargo.toml index ae6d05d4..23a7a6c7 100644 --- a/web/Cargo.toml +++ b/web/Cargo.toml @@ -8,6 +8,7 @@ edition = "2021" [dependencies] domain = { path = "../domain" } service = { path = "../service" } +sse = { path = "../sse" } axum = "0.7.7" axum-login = "0.16.0" @@ -25,6 +26,7 @@ time = "0.3.36" utoipa = { version = "4.2.0", features = ["axum_extras", "uuid"] } utoipa-rapidoc = { version = "3.0.0", features = ["axum"] } async-trait = "0.1.88" +async-stream = "0.3" futures = "0.3.31" [dependencies.sea-orm] diff --git a/web/src/lib.rs b/web/src/lib.rs index 4698166e..42371305 100644 --- a/web/src/lib.rs +++ b/web/src/lib.rs @@ -26,6 +26,7 @@ pub(crate) mod middleware; pub(crate) mod params; pub(crate) mod protect; mod router; +pub mod sse; pub async fn init_server(app_state: AppState) -> Result<()> { // Session layer diff --git a/web/src/middleware/auth.rs b/web/src/middleware/auth.rs index 7a4c255a..ac26527a 100644 --- a/web/src/middleware/auth.rs +++ b/web/src/middleware/auth.rs @@ -59,7 +59,8 @@ mod tests { let db = Arc::new( sea_orm::MockDatabase::new(sea_orm::DatabaseBackend::Postgres).into_connection(), ); - let app_state = crate::AppState::new(config, &db); + let sse_manager = Arc::new(sse::Manager::new()); + let app_state = crate::AppState::new(config, &db, sse_manager); // Set up session layer let session_store = MemoryStore::default(); @@ -88,7 +89,8 @@ mod tests { let db = Arc::new( sea_orm::MockDatabase::new(sea_orm::DatabaseBackend::Postgres).into_connection(), ); - let app_state = crate::AppState::new(config, &db); + let sse_manager = Arc::new(sse::Manager::new()); + let app_state = crate::AppState::new(config, &db, sse_manager); // Set up session layer let session_store = MemoryStore::default(); @@ -162,7 +164,8 @@ mod tests { .append_query_results([vec![(test_user.clone(), test_role.clone())]]) // For session user lookup .into_connection(), ); - let app_state = crate::AppState::new(config, &db); + let sse_manager = Arc::new(sse::Manager::new()); + let app_state = crate::AppState::new(config, &db, sse_manager); // Set up session layer let session_store = MemoryStore::default(); diff --git a/web/src/router.rs b/web/src/router.rs index 5550c30e..86307df6 100644 --- a/web/src/router.rs +++ b/web/src/router.rs @@ -13,6 +13,7 @@ use crate::controller::{ note_controller, organization, organization_controller, overarching_goal_controller, user, user_controller, user_session_controller, }; +use crate::sse; use utoipa::{ openapi::security::{ApiKey, ApiKeyValue, SecurityScheme}, @@ -118,6 +119,7 @@ impl Modify for SecurityAddon { pub fn define_routes(app_state: AppState) -> Router { Router::new() + .merge(sse_routes(app_state.clone())) .merge(action_routes(app_state.clone())) .merge(agreement_routes(app_state.clone())) .merge(health_routes()) @@ -508,6 +510,13 @@ fn user_overarching_goals_routes(app_state: AppState) -> Router { .with_state(app_state) } +fn sse_routes(app_state: AppState) -> Router { + Router::new() + .route("/sse", get(sse::handler::sse_handler)) + .route_layer(from_fn(require_auth)) + .with_state(app_state) +} + // This will serve static files that we can use as a "fallback" for when the server panics pub fn static_routes() -> Router { Router::new().nest_service("/", ServeDir::new("./")) diff --git a/web/src/sse/handler.rs b/web/src/sse/handler.rs new file mode 100644 index 00000000..b4c1607d --- /dev/null +++ b/web/src/sse/handler.rs @@ -0,0 +1,42 @@ +use crate::extractors::authenticated_user::AuthenticatedUser; +use async_stream::stream; +use axum::extract::State; +use axum::response::sse::{Event, KeepAlive, Sse}; +use futures::Stream; +use log::*; +use service::AppState; +use std::convert::Infallible; +use tokio::sync::mpsc; + +/// SSE handler that establishes a long-lived connection for real-time updates. +/// One connection per authenticated user, stays open across page navigation. +pub(crate) async fn sse_handler( + AuthenticatedUser(user): AuthenticatedUser, + State(app_state): State, +) -> Sse>> { + debug!("Establishing SSE connection for user {}", user.id); + + let (tx, mut rx) = mpsc::unbounded_channel(); + + // Register returns the connection_id (convert domain::Id to String) + let connection_id = app_state + .sse_manager + .register_connection(user.id.to_string(), tx); + + let manager = app_state.sse_manager.clone(); + let user_id = user.id; + + // Create the stream - events arrive from the channel + // The channel sends Result, so we just pass them through + let stream = stream! { + while let Some(event) = rx.recv().await { + yield event; + } + + // Connection closed, clean up + debug!("SSE connection closed for user {}, cleaning up", user_id); + manager.unregister_connection(&connection_id); + }; + + Sse::new(stream).keep_alive(KeepAlive::default()) +} diff --git a/web/src/sse/mod.rs b/web/src/sse/mod.rs new file mode 100644 index 00000000..42359930 --- /dev/null +++ b/web/src/sse/mod.rs @@ -0,0 +1,7 @@ +//! SSE HTTP handler for the web layer. +//! +//! This module contains only the Axum handler for SSE endpoints. +//! The core SSE infrastructure (Manager, ConnectionRegistry, Message types) +//! lives in the `sse` crate to avoid circular dependencies. + +pub mod handler;