Skip to content

Commit cb877cd

Browse files
committed
chore: integration test add realtime update
1 parent 164dff8 commit cb877cd

File tree

8 files changed

+103
-53
lines changed

8 files changed

+103
-53
lines changed

Cargo.lock

Lines changed: 19 additions & 17 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,4 +70,5 @@ criterion = "0.3"
7070
rusty-hook = "^0.11.2"
7171
tokio = { version = "1", features = ["full"] }
7272
tracing-subscriber = "0.3"
73-
feature-probe-server = "1.3.3"
73+
feature-probe-server = { version = "1.3.5", features = ["realtime"] }
74+

Makefile

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,6 @@ release-test:
1212
cargo test --release --verbose --features async --no-default-features
1313
test:
1414
cargo test --verbose && \
15-
cargo test --verbose --features use_tokio --features internal --features event_tokio --no-default-features
15+
cargo test --verbose --features use_tokio --features internal --features event_tokio --no-default-features && \
16+
cargo test --verbose --features use_tokio --features internal --features event_tokio --features realtime --no-default-features
1617

src/config.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ impl FPConfig {
7575
};
7676

7777
#[cfg(all(feature = "use_tokio", feature = "realtime"))]
78-
let realtime_url = match self.realtime_url {
78+
let realtime_url = match &self.realtime_url {
7979
None => {
8080
Url::parse(&(remote_url.clone() + "api/realtime")).expect("invalid realtime url")
8181
}

src/feature_probe.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,7 @@ impl FeatureProbe {
282282
#[cfg(all(feature = "use_tokio", feature = "realtime"))]
283283
fn socket_on_connect(socket: socketio_rs::Socket, server_sdk_key: String) -> SocketCallback {
284284
let sdk_key = server_sdk_key;
285-
trace!("on conect: {:?}", sdk_key);
285+
trace!("socket_on_connect: {:?}", sdk_key);
286286
async move {
287287
if let Err(e) = socket
288288
.emit("register", serde_json::json!({ "sdk_key": sdk_key }))
@@ -296,10 +296,12 @@ impl FeatureProbe {
296296

297297
#[cfg(all(feature = "use_tokio", feature = "realtime"))]
298298
fn socket_on_update(slf: Self, payload: Option<socketio_rs::Payload>) -> SocketCallback {
299-
trace!("on update: {:?}", payload);
299+
use crate::sync::SyncType;
300+
301+
trace!("socket_on_update: {:?}", payload);
300302
async move {
301303
if let Some(syncer) = &slf.syncer {
302-
let _ = syncer.sync_now().await;
304+
let _ = syncer.sync_now(SyncType::Realtime).await;
303305
} else {
304306
tracing::warn!("socket receive update event, but no synchronizer");
305307
}

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ mod user;
77
pub use crate::config::FPConfig;
88
pub use crate::evaluate::{load_json, EvalDetail, Repository, Segment, Toggle};
99
pub use crate::feature_probe::FeatureProbe;
10+
pub use crate::sync::SyncType;
1011
pub use crate::user::FPUser;
1112
use headers::{Error, Header, HeaderName, HeaderValue};
1213
use http::header::AUTHORIZATION;

src/sync.rs

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,19 @@ use tracing::trace;
1010
use tracing::{debug, error};
1111
use url::Url;
1212

13-
pub type UpdateCallback = Box<dyn Fn(Repository, Repository) + Send>;
13+
pub type UpdateCallback = Box<dyn Fn(Repository, Repository, SyncType) + Send>;
1414

1515
#[derive(Debug, Clone)]
1616
pub struct Synchronizer {
1717
inner: Arc<Inner>,
1818
}
1919

20+
#[derive(Debug)]
21+
pub enum SyncType {
22+
Realtime,
23+
Polling,
24+
}
25+
2026
struct Inner {
2127
toggles_url: Url,
2228
refresh_interval: Duration,
@@ -77,7 +83,9 @@ impl Synchronizer {
7783

7884
let is_timeout = Self::init_timeout_fn(start_wait, interval_duration, start);
7985
std::thread::spawn(move || loop {
80-
if let Some(r) = Self::should_send(inner.sync_now(), &is_timeout, is_send) {
86+
if let Some(r) =
87+
Self::should_send(inner.sync_now(SyncType::Polling), &is_timeout, is_send)
88+
{
8189
is_send = true;
8290
let _ = tx.try_send(r);
8391
}
@@ -105,7 +113,7 @@ impl Synchronizer {
105113
tokio::spawn(async move {
106114
let mut interval = tokio::time::interval(inner.refresh_interval);
107115
loop {
108-
let result = inner.sync_now().await;
116+
let result = inner.sync_now(SyncType::Polling).await;
109117

110118
if let Some(r) = Self::should_send(result, &is_timeout, is_send) {
111119
is_send = true;
@@ -135,8 +143,8 @@ impl Synchronizer {
135143
}
136144

137145
#[cfg(test)]
138-
fn notify_update(&self, old_repo: Repository, new_repo: Repository) {
139-
self.inner.notify_update(old_repo, new_repo)
146+
fn notify_update(&self, old_repo: Repository, new_repo: Repository, t: SyncType) {
147+
self.inner.notify_update(old_repo, new_repo, t)
140148
}
141149

142150
fn init_timeout_fn(
@@ -172,17 +180,17 @@ impl Synchronizer {
172180
}
173181

174182
#[cfg(all(feature = "use_tokio", feature = "realtime"))]
175-
pub async fn sync_now(&self) -> Result<(), FPError> {
176-
self.inner.sync_now().await
183+
pub async fn sync_now(&self, t: SyncType) -> Result<(), FPError> {
184+
self.inner.sync_now(t).await
177185
}
178186
}
179187

180188
impl Inner {
181189
#[cfg(feature = "use_tokio")]
182-
pub async fn sync_now(&self) -> Result<(), FPError> {
190+
pub async fn sync_now(&self, t: SyncType) -> Result<(), FPError> {
183191
use http::header::USER_AGENT;
184192

185-
trace!("sync now {:?}", self.auth);
193+
trace!("sync_now {:?} {:?}", self.auth, t);
186194
let mut request = self
187195
.client
188196
.request(Method::GET, self.toggles_url.clone())
@@ -213,7 +221,7 @@ impl Inner {
213221
let old = (*repo).clone();
214222
let new = r.clone();
215223
*repo = r;
216-
self.notify_update(old, new);
224+
self.notify_update(old, new, t);
217225
}
218226
let mut is_init = self.is_init.write();
219227
*is_init = true;
@@ -225,8 +233,8 @@ impl Inner {
225233
}
226234

227235
#[cfg(feature = "use_std")]
228-
pub fn sync_now(&self) -> Result<(), FPError> {
229-
trace!("sync_now {:?}", self.auth);
236+
pub fn sync_now(&self, t: SyncType) -> Result<(), FPError> {
237+
trace!("sync_now {:?}, {:?}", self.auth, t);
230238
//TODO: report failure
231239
let mut request = ureq::get(self.toggles_url.as_str())
232240
.set(
@@ -258,7 +266,7 @@ impl Inner {
258266
let old = (*repo).clone();
259267
let new = r.clone();
260268
*repo = r;
261-
self.notify_update(old, new);
269+
self.notify_update(old, new, t);
262270
}
263271
let mut is_init = self.is_init.write();
264272
*is_init = true;
@@ -270,10 +278,10 @@ impl Inner {
270278
}
271279
}
272280

273-
fn notify_update(&self, old_repo: Repository, new_repo: Repository) {
281+
fn notify_update(&self, old_repo: Repository, new_repo: Repository, t: SyncType) {
274282
let lock = self.update_callback.lock();
275283
if let Some(cb) = &*lock {
276-
cb(old_repo, new_repo)
284+
cb(old_repo, new_repo, t)
277285
}
278286
}
279287
}
@@ -291,10 +299,10 @@ mod tests {
291299
let mut syncer = build_synchronizer(9000);
292300
let (tx, rx) = channel();
293301

294-
syncer.set_update_callback(Box::new(move |_old, _new| tx.send(()).unwrap()));
302+
syncer.set_update_callback(Box::new(move |_old, _new, _| tx.send(()).unwrap()));
295303
let old = Repository::default();
296304
let new = Repository::default();
297-
syncer.notify_update(old, new);
305+
syncer.notify_update(old, new, SyncType::Polling);
298306

299307
assert!(rx.try_recv().is_ok())
300308
}

tests/integration_test.rs

Lines changed: 48 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,53 @@
11
use std::{sync::Arc, time::Duration};
22

33
use feature_probe_server::{
4-
http::{serve_http, FpHttpHandler, LocalFileHttpHandler},
4+
http::{serve_http, FpHttpHandler, LocalFileHttpHandlerForTest},
5+
realtime::RealtimeSocket,
56
repo::SdkRepository,
67
ServerConfig,
78
};
8-
use feature_probe_server_sdk::{FPConfig, FPUser, FeatureProbe, Url};
9+
use feature_probe_server_sdk::{FPConfig, FPUser, FeatureProbe, SyncType, Url};
10+
use parking_lot::Mutex;
11+
use tracing::info;
912

1013
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1114
async fn integration_test() {
12-
// let _ = tracing_subscriber::fmt().init();
15+
// let _ = tracing_subscriber::fmt()
16+
// .with_env_filter("feature_probe_server_sdk=trace,integration=trace")
17+
// .pretty()
18+
// .init();
1319

1420
let api_port = 19980;
1521
let server_port = 19990;
16-
setup_server(api_port, server_port).await;
22+
let realtime_port = 19999;
23+
setup_server(api_port, server_port, realtime_port).await;
1724

1825
let config = FPConfig {
1926
remote_url: Url::parse(&format!("http://127.0.0.1:{}", server_port)).unwrap(),
2027
server_sdk_key: "server-sdk-key1".to_owned(),
21-
refresh_interval: Duration::from_secs(1),
28+
refresh_interval: Duration::from_secs(2),
2229
start_wait: Some(Duration::from_secs(5)),
30+
#[cfg(feature = "realtime")]
31+
realtime_url: Some(Url::parse(&format!("http://127.0.0.1:{}", realtime_port)).unwrap()),
2332
..Default::default()
2433
};
2534

26-
let fp = FeatureProbe::new(config);
35+
let mut fp = FeatureProbe::new(config);
36+
37+
let did_update = {
38+
let did_update = Arc::new(Mutex::new((false, false)));
39+
let did_update_clone = did_update.clone();
40+
41+
fp.set_update_callback(Box::new(move |_old, _new, t| {
42+
let mut lock = did_update_clone.lock();
43+
match t {
44+
SyncType::Realtime => lock.0 = true,
45+
SyncType::Polling => lock.1 = true,
46+
};
47+
}));
48+
49+
did_update
50+
};
2751

2852
let user = FPUser::new();
2953

@@ -32,14 +56,22 @@ async fn integration_test() {
3256
assert!(fp.initialized());
3357

3458
let b = fp.bool_detail("bool_toggle", &user, false);
35-
assert_eq!(b.value, true);
59+
assert!(b.value);
60+
61+
tokio::time::sleep(Duration::from_millis(3000)).await;
62+
let lock = did_update.lock();
63+
#[cfg(feature = "realtime")]
64+
assert!(lock.0);
65+
#[cfg(not(feature = "realtime"))]
66+
assert!(lock.1);
3667
}
3768

38-
async fn setup_server(api_port: u16, server_port: u16) {
69+
async fn setup_server(api_port: u16, server_port: u16, realtime_port: u16) {
70+
let mut mock_api = LocalFileHttpHandlerForTest::default();
71+
mock_api.version_update = true;
3972
// mock fp api
40-
tokio::spawn(serve_http::<LocalFileHttpHandler>(
41-
api_port,
42-
LocalFileHttpHandler {},
73+
tokio::spawn(serve_http::<LocalFileHttpHandlerForTest>(
74+
api_port, mock_api,
4375
));
4476

4577
let server_sdk_key = "server-sdk-key1".to_owned();
@@ -55,15 +87,18 @@ async fn setup_server(api_port: u16, server_port: u16) {
5587
.parse()
5688
.unwrap();
5789
let refresh_interval = Duration::from_secs(1);
58-
let repo = SdkRepository::new(ServerConfig {
90+
let config = ServerConfig {
5991
toggles_url,
6092
server_port,
93+
realtime_port,
6194
refresh_interval,
6295
keys_url: None,
6396
events_url: events_url.clone(),
6497
client_sdk_key: Some(client_sdk_key.clone()),
6598
server_sdk_key: Some(server_sdk_key.clone()),
66-
});
99+
};
100+
let realtime_socket = RealtimeSocket::serve(config.realtime_port);
101+
let repo = SdkRepository::new(config, realtime_socket);
67102
repo.sync(client_sdk_key, server_sdk_key, 1);
68103
let repo = Arc::new(repo);
69104
let feature_probe_server = FpHttpHandler {

0 commit comments

Comments
 (0)