Skip to content

Commit 58fcd33

Browse files
emmaling27 authored and Convex, Inc. committed
Add client and server ts information to websocket messages to surface websocket latency issues (#40827)
This PR adds `clientTs` to the initial websocket connect message. The sync worker uses it to calculate `client_clock_skew`, which is the difference between the client clock and the server clock, plus whatever latency elapsed between the client sending the message and the server receiving it. The server sets `serverTs` at the latest possible moment before the `Transition` message is sent down to the client. On the client, we can then work out how long the `Transition` message took to download by comparing the current client time with `serverTs` and subtracting `clientClockSkew`.
GitOrigin-RevId: 9b8d2693a40196a78ee08aa3e2f49c695d0172d0
1 parent c3a91b2 commit 58fcd33

File tree

12 files changed

+78
-2
lines changed

12 files changed

+78
-2
lines changed

crates/convex/src/base_client/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,8 @@ impl RemoteQuerySet {
277277
start_version,
278278
end_version,
279279
modifications,
280+
client_clock_skew: _,
281+
server_ts: _,
280282
} = transition
281283
else {
282284
panic!("not transition");

crates/convex/src/client/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -565,6 +565,8 @@ pub mod tests {
565565
log_lines: LogLinesMessage(vec![]),
566566
})
567567
.collect(),
568+
client_clock_skew: None,
569+
server_ts: None,
568570
},
569571
end_version,
570572
)
@@ -719,6 +721,7 @@ pub mod tests {
719721
connection_count: 0,
720722
last_close_reason: "InitialConnect".to_string(),
721723
max_observed_timestamp: None,
724+
client_ts: None,
722725
},
723726
ClientMessage::ModifyQuerySet {
724727
base_version: 0,
@@ -852,6 +855,7 @@ pub mod tests {
852855
connection_count: 0,
853856
last_close_reason: "InitialConnect".to_string(),
854857
max_observed_timestamp: None,
858+
client_ts: None,
855859
},
856860
ClientMessage::ModifyQuerySet {
857861
base_version: 0,
@@ -912,6 +916,7 @@ pub mod tests {
912916
connection_count: 0,
913917
last_close_reason: "InitialConnect".to_string(),
914918
max_observed_timestamp: None,
919+
client_ts: None,
915920
},
916921
ClientMessage::ModifyQuerySet {
917922
base_version: 0,

crates/convex/src/sync/testing.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ impl SyncProtocol for TestProtocolManager {
8383
connection_count,
8484
last_close_reason: "InitialConnect".to_string(),
8585
max_observed_timestamp: None,
86+
client_ts: None,
8687
})
8788
.await?;
8889

crates/convex/src/sync/web_socket_manager.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -341,6 +341,7 @@ impl WebSocketInternal {
341341
connection_count,
342342
last_close_reason,
343343
max_observed_timestamp,
344+
client_ts: Some(0),
344345
};
345346
let msg = Message::Text(
346347
serde_json::Value::try_from(message)

crates/convex/sync_types/src/json.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,10 @@ enum ClientMessageJson {
173173
#[serde(default)]
174174
#[serde(skip_serializing_if = "Option::is_none")]
175175
max_observed_timestamp: Option<String>,
176+
177+
#[serde(default)]
178+
#[serde(skip_serializing_if = "Option::is_none")]
179+
client_ts: Option<i64>,
176180
},
177181
#[serde(rename_all = "camelCase")]
178182
ModifyQuerySet {
@@ -219,11 +223,13 @@ impl TryFrom<ClientMessage> for JsonValue {
219223
connection_count,
220224
last_close_reason,
221225
max_observed_timestamp,
226+
client_ts,
222227
} => ClientMessageJson::Connect {
223228
session_id: format!("{}", session_id.as_hyphenated()),
224229
connection_count,
225230
last_close_reason: Some(last_close_reason),
226231
max_observed_timestamp: max_observed_timestamp.map(|ts| u64_to_string(ts.into())),
232+
client_ts: client_ts.map(|ts| ts as i64),
227233
},
228234
ClientMessage::ModifyQuerySet {
229235
base_version,
@@ -303,6 +309,7 @@ impl TryFrom<JsonValue> for ClientMessage {
303309
connection_count,
304310
last_close_reason,
305311
max_observed_timestamp,
312+
client_ts,
306313
} => ClientMessage::Connect {
307314
session_id: session_id.parse()?,
308315
connection_count,
@@ -312,6 +319,7 @@ impl TryFrom<JsonValue> for ClientMessage {
312319
.transpose()?
313320
.map(Timestamp::try_from)
314321
.transpose()?,
322+
client_ts: client_ts.map(|ts| ts as u64),
315323
},
316324
ClientMessageJson::ModifyQuerySet {
317325
base_version,
@@ -523,11 +531,15 @@ impl<V: Into<JsonValue>> From<ServerMessage<V>> for JsonValue {
523531
start_version,
524532
end_version,
525533
modifications,
534+
client_clock_skew,
535+
server_ts,
526536
} => json!({
527537
"type": "Transition",
528538
"startVersion": JsonValue::from(start_version),
529539
"endVersion": JsonValue::from(end_version),
530540
"modifications": modifications.into_iter().map(JsonValue::from).collect::<Vec<JsonValue>>(),
541+
"clientClockSkew": JsonValue::from(client_clock_skew),
542+
"serverTs": JsonValue::from(server_ts),
531543
}),
532544
ServerMessage::MutationResponse {
533545
request_id,
@@ -634,6 +646,8 @@ impl<V: TryFrom<JsonValue, Error = anyhow::Error>> TryFrom<JsonValue> for Server
634646
start_version: JsonValue,
635647
end_version: JsonValue,
636648
modifications: Vec<JsonValue>,
649+
client_clock_skew: Option<i64>,
650+
server_ts: Option<i64>,
637651
},
638652
#[serde(rename_all = "camelCase")]
639653
MutationResponse {
@@ -671,13 +685,17 @@ impl<V: TryFrom<JsonValue, Error = anyhow::Error>> TryFrom<JsonValue> for Server
671685
start_version,
672686
end_version,
673687
modifications,
688+
client_clock_skew,
689+
server_ts,
674690
} => ServerMessage::Transition {
675691
start_version: start_version.try_into()?,
676692
end_version: end_version.try_into()?,
677693
modifications: modifications
678694
.into_iter()
679695
.map(|sm: JsonValue| sm.try_into())
680696
.collect::<anyhow::Result<Vec<StateModification<V>>>>()?,
697+
client_clock_skew,
698+
server_ts: server_ts.map(Timestamp::try_from).transpose()?,
681699
},
682700
ServerMessageJson::MutationResponse {
683701
request_id,
@@ -1054,6 +1072,8 @@ mod tests {
10541072
journal: None,
10551073
error_data: Some(TestValue(JsonValue::Null)),
10561074
}],
1075+
client_clock_skew: Some(1),
1076+
server_ts: Some(Timestamp::must(1)),
10571077
});
10581078
}
10591079
}

crates/convex/sync_types/src/types.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ pub enum ClientMessage {
108108
connection_count: u32,
109109
last_close_reason: String,
110110
max_observed_timestamp: Option<Timestamp>,
111+
client_ts: Option<u64>,
111112
},
112113
ModifyQuerySet {
113114
base_version: QuerySetVersion,
@@ -308,6 +309,12 @@ pub enum ServerMessage<V: 'static> {
308309
proptest(strategy = "prop::collection::vec(any::<StateModification<V>>(), 0..8)")
309310
)]
310311
modifications: Vec<StateModification<V>>,
312+
/// The difference between the timestamp in `ClientMessage::Connect` and
313+
/// the timestamp when the client message was received by the server.
314+
client_clock_skew: Option<i64>,
315+
/// The timestamp right before this message was sent back to the
316+
/// client.
317+
server_ts: Option<Timestamp>,
311318
},
312319
MutationResponse {
313320
request_id: SessionRequestSeqNumber,
@@ -333,6 +340,17 @@ pub enum ServerMessage<V: 'static> {
333340
Ping,
334341
}
335342

343+
impl<V: 'static> ServerMessage<V> {
344+
pub fn inject_server_ts(&mut self, ts: Timestamp) {
345+
match self {
346+
Self::Transition {
347+
ref mut server_ts, ..
348+
} => *server_ts = Some(ts),
349+
_ => {},
350+
}
351+
}
352+
}
353+
336354
#[derive(Clone, Debug, Eq, PartialEq)]
337355
#[cfg_attr(any(test, feature = "testing"), derive(proptest_derive::Arbitrary))]
338356
pub enum ErrorPayload<V: 'static> {

crates/local_backend/src/subs/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,12 +214,13 @@ async fn run_sync_socket(
214214
}
215215
},
216216
maybe_message = server_rx.next().fuse() => {
217-
let (message, send_time) = match maybe_message {
217+
let (mut message, send_time) = match maybe_message {
218218
Some(m) => m,
219219
None => break 'top,
220220
};
221221
let delay = st.runtime.monotonic_now() - send_time;
222222
log_websocket_message_out(&message, delay);
223+
message.inject_server_ts(st.runtime.generate_timestamp()?);
223224
let serialized = serde_json::to_string(&JsonValue::from(message))?;
224225
if tx.send(Message::Text(serialized.into())).await.is_err() {
225226
break 'top;

crates/sync/src/tests.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ impl<RT: Runtime> SyncTest<RT> {
174174
connection_count: 0,
175175
last_close_reason: "InitialConnect".to_string(),
176176
max_observed_timestamp,
177+
client_ts: None,
177178
},
178179
self.rt.monotonic_now(),
179180
))?;
@@ -303,6 +304,7 @@ async fn test_basic_account(rt: TestRuntime) -> anyhow::Result<()> {
303304
start_version: start,
304305
end_version: end,
305306
modifications,
307+
..
306308
} = sync_worker.receive().await?);
307309
assert_eq!(start.query_set, 0);
308310
assert_eq!(end.query_set, 1);
@@ -328,6 +330,7 @@ async fn test_basic_account(rt: TestRuntime) -> anyhow::Result<()> {
328330
start_version: start,
329331
end_version: end,
330332
modifications,
333+
..
331334
} = sync_worker.receive().await?);
332335
assert_eq!(start.query_set, 1);
333336
assert_eq!(end.query_set, 1);
@@ -355,7 +358,8 @@ async fn test_basic_account(rt: TestRuntime) -> anyhow::Result<()> {
355358
must_let!(let ServerMessage::Transition {
356359
start_version: start,
357360
end_version: end,
358-
modifications
361+
modifications,
362+
..
359363
} = sync_worker.receive().await?);
360364
assert_eq!(start.query_set, 1);
361365
assert_eq!(end.query_set, 2);
@@ -380,6 +384,7 @@ async fn test_basic_account(rt: TestRuntime) -> anyhow::Result<()> {
380384
start_version: start,
381385
end_version: end,
382386
modifications,
387+
..
383388
} = sync_worker.receive().await?);
384389
assert_eq!(start.query_set, 2);
385390
assert_eq!(end.query_set, 2);
@@ -406,6 +411,7 @@ async fn test_basic_account(rt: TestRuntime) -> anyhow::Result<()> {
406411
start_version: start,
407412
end_version: end,
408413
modifications,
414+
..
409415
} = sync_worker.receive().await?);
410416
assert_eq!(start.query_set, 2);
411417
assert_eq!(end.query_set, 3);
@@ -450,6 +456,8 @@ async fn test_remove_in_progress_query(rt: TestRuntime) -> anyhow::Result<()> {
450456
start_version: start,
451457
end_version: end,
452458
modifications,
459+
client_clock_skew: _,
460+
server_ts: _,
453461
} = sync_worker.receive().await?);
454462
assert_eq!(start.query_set, 0);
455463
assert_eq!(end.query_set, 1);

crates/sync/src/worker.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,10 @@ pub struct SyncWorker<RT: Runtime> {
247247

248248
on_connect: Option<(StatusTimer, Box<dyn FnOnce(SessionId) + Send>)>,
249249
partition_id: u64,
250+
251+
/// The difference between the client's clock and the server's clock, in
252+
/// milliseconds. Includes latency between the client and server.
253+
client_clock_skew: Option<i64>,
250254
}
251255

252256
enum QueryResult {
@@ -300,6 +304,7 @@ impl<RT: Runtime> SyncWorker<RT> {
300304
modify_query_to_transition_timers: BTreeMap::new(),
301305
on_connect: Some((connect_timer(partition_id), on_connect)),
302306
partition_id,
307+
client_clock_skew: None,
303308
}
304309
}
305310

@@ -470,11 +475,18 @@ impl<RT: Runtime> SyncWorker<RT> {
470475
last_close_reason,
471476
max_observed_timestamp,
472477
connection_count,
478+
client_ts,
473479
} => {
474480
if let Some((timer, on_connect)) = self.on_connect.take() {
475481
timer.finish();
476482
on_connect(session_id);
477483
}
484+
485+
if let Some(ts) = client_ts {
486+
self.client_clock_skew =
487+
Some(ts as i64 - self.rt.unix_timestamp().as_ms_since_epoch()? as i64);
488+
}
489+
478490
self.state.set_session_id(session_id);
479491
if let Some(max_observed_timestamp) = max_observed_timestamp {
480492
let latest_timestamp = *self
@@ -1001,6 +1013,8 @@ impl<RT: Runtime> SyncWorker<RT> {
10011013
start_version: current_version,
10021014
end_version: new_version,
10031015
modifications: state_modifications.into_values().collect(),
1016+
client_clock_skew: self.client_clock_skew,
1017+
server_ts: None,
10041018
};
10051019
timer.finish();
10061020
metrics::log_query_set_size(self.partition_id, self.state.num_queries());

crates/value/src/heap_size.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -928,6 +928,8 @@ impl<V: HeapSize> HeapSize for ServerMessage<V> {
928928
start_version,
929929
end_version,
930930
modifications,
931+
client_clock_skew: _,
932+
server_ts: _,
931933
} => {
932934
start_version.heap_size()
933935
+ end_version.heap_size()

0 commit comments

Comments
 (0)