Skip to content

Commit 56706ac

Browse files
committed
refactor(sdk) Introduce RoomEventCacheStateLock and read/write guards.
This patch extracts fields from `RoomEventCacheState` and move them into `RoomEventCacheStateLock`. This lock provides 2 methods: `read` and `write`, respectively to acquire a read-only lock, and a write-only lock, represented by the `RoomEventCacheStateLockReadGuard` and the `RoomEventCacheStateLockWriteGuard` types. All “public” methods on `RoomEventCacheState` now are facade to the read and write guards. This refactoring makes the code to compile with the last change in `EventCacheStore::lock`, which now returns a `EventCacheStoreLockState`. The next step is to re-load `RoomEventCacheStateLock` when the lock is dirty! But before doing that, we need this new mechanism to centralise the management of the store lock.
1 parent 8b612cc commit 56706ac

File tree

25 files changed

+1205
-893
lines changed

25 files changed

+1205
-893
lines changed

benchmarks/benches/event_cache.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,7 @@ fn find_event_relations(c: &mut Criterion) {
317317
let (target, relations) = room_event_cache
318318
.find_event_with_relations(target_event_id, filter)
319319
.await
320+
.unwrap()
320321
.unwrap();
321322
assert_eq!(target.event_id().as_deref().unwrap(), target_event_id);
322323
assert_eq!(relations.len(), num_related_events as usize);

benchmarks/benches/room_bench.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,8 @@ pub fn load_pinned_events_benchmark(c: &mut Criterion) {
181181
.lock()
182182
.await
183183
.unwrap()
184+
.as_clean()
185+
.unwrap()
184186
.clear_all_linked_chunks()
185187
.await
186188
.unwrap();

crates/matrix-sdk-ui/src/timeline/builder.rs

Lines changed: 30 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ impl TimelineBuilder {
164164
room.client().event_cache().subscribe()?;
165165

166166
let (room_event_cache, event_cache_drop) = room.event_cache().await?;
167-
let (_, event_subscriber) = room_event_cache.subscribe().await;
167+
let (_, event_subscriber) = room_event_cache.subscribe().await.unwrap();
168168

169169
let is_room_encrypted = room
170170
.latest_encryption_state()
@@ -209,36 +209,36 @@ impl TimelineBuilder {
209209
.instrument(span)
210210
});
211211

212-
let thread_update_join_handle = if let TimelineFocus::Thread { root_event_id: root } =
213-
&focus
214-
{
215-
Some({
216-
let span = info_span!(
217-
parent: Span::none(),
218-
"thread_live_update_handler",
219-
room_id = ?room.room_id(),
220-
focus = focus.debug_string(),
221-
prefix = internal_id_prefix
222-
);
223-
span.follows_from(Span::current());
224-
225-
// Note: must be done here *before* spawning the task, to avoid race conditions
226-
// with event cache updates happening in the background.
227-
let (_events, receiver) = room_event_cache.subscribe_to_thread(root.clone()).await;
228-
229-
spawn(
230-
thread_updates_task(
231-
receiver,
232-
room_event_cache.clone(),
233-
controller.clone(),
234-
root.clone(),
212+
let thread_update_join_handle =
213+
if let TimelineFocus::Thread { root_event_id: root } = &focus {
214+
Some({
215+
let span = info_span!(
216+
parent: Span::none(),
217+
"thread_live_update_handler",
218+
room_id = ?room.room_id(),
219+
focus = focus.debug_string(),
220+
prefix = internal_id_prefix
221+
);
222+
span.follows_from(Span::current());
223+
224+
// Note: must be done here *before* spawning the task, to avoid race conditions
225+
// with event cache updates happening in the background.
226+
let (_events, receiver) =
227+
room_event_cache.subscribe_to_thread(root.clone()).await.unwrap();
228+
229+
spawn(
230+
thread_updates_task(
231+
receiver,
232+
room_event_cache.clone(),
233+
controller.clone(),
234+
root.clone(),
235+
)
236+
.instrument(span),
235237
)
236-
.instrument(span),
237-
)
238-
})
239-
} else {
240-
None
241-
};
238+
})
239+
} else {
240+
None
241+
};
242242

243243
let local_echo_listener_handle = {
244244
let timeline_controller = controller.clone();

crates/matrix-sdk-ui/src/timeline/controller/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -446,7 +446,7 @@ impl<P: RoomDataProvider> TimelineController<P> {
446446
match focus {
447447
TimelineFocus::Live { .. } => {
448448
// Retrieve the cached events, and add them to the timeline.
449-
let events = room_event_cache.events().await;
449+
let events = room_event_cache.events().await?;
450450

451451
let has_events = !events.is_empty();
452452

@@ -556,7 +556,8 @@ impl<P: RoomDataProvider> TimelineController<P> {
556556
}
557557

558558
TimelineFocus::Thread { root_event_id, .. } => {
559-
let (events, _) = room_event_cache.subscribe_to_thread(root_event_id.clone()).await;
559+
let (events, _) =
560+
room_event_cache.subscribe_to_thread(root_event_id.clone()).await?;
560561
let has_events = !events.is_empty();
561562

562563
// For each event, we also need to find the related events, as they don't
@@ -565,7 +566,7 @@ impl<P: RoomDataProvider> TimelineController<P> {
565566
let mut related_events = Vector::new();
566567
for event_id in events.iter().filter_map(|event| event.event_id()) {
567568
if let Some((_original, related)) =
568-
room_event_cache.find_event_with_relations(&event_id, None).await
569+
room_event_cache.find_event_with_relations(&event_id, None).await?
569570
{
570571
related_events.extend(related);
571572
}

crates/matrix-sdk-ui/src/timeline/pinned_events_loader.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ impl PinnedEventsRoom for Room {
173173
Box::pin(async move {
174174
if let Ok((cache, _handles)) = self.event_cache().await
175175
&& let Some(ret) =
176-
cache.find_event_with_relations(event_id, related_event_filters).await
176+
cache.find_event_with_relations(event_id, related_event_filters).await?
177177
{
178178
debug!("Loaded pinned event {event_id} and related events from cache");
179179
return Ok(ret);

crates/matrix-sdk-ui/src/timeline/tasks.rs

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use matrix_sdk::{
2828
use ruma::OwnedEventId;
2929
use tokio::sync::broadcast::{Receiver, error::RecvError};
3030
use tokio_stream::StreamExt as _;
31-
use tracing::{instrument, trace, warn};
31+
use tracing::{error, instrument, trace, warn};
3232

3333
use crate::timeline::{TimelineController, TimelineFocus, event_item::RemoteEventOrigin};
3434

@@ -93,7 +93,14 @@ pub(in crate::timeline) async fn thread_updates_task(
9393
// The updates might have lagged, but the room event cache might
9494
// have events, so retrieve them and add them back again to the
9595
// timeline, after clearing it.
96-
let (initial_events, _) = room_event_cache.subscribe_to_thread(root.clone()).await;
96+
let (initial_events, _) =
97+
match room_event_cache.subscribe_to_thread(root.clone()).await {
98+
Ok(values) => values,
99+
Err(err) => {
100+
error!(?err, "Subscribing to thread failed");
101+
break;
102+
}
103+
};
97104

98105
timeline_controller
99106
.replace_with_initial_remote_events(initial_events, RemoteEventOrigin::Cache)
@@ -145,7 +152,16 @@ pub(in crate::timeline) async fn room_event_cache_updates_task(
145152
// The updates might have lagged, but the room event cache might have
146153
// events, so retrieve them and add them back again to the timeline,
147154
// after clearing it.
148-
let initial_events = room_event_cache.events().await;
155+
let initial_events = match room_event_cache.events().await {
156+
Ok(initial_events) => initial_events,
157+
Err(err) => {
158+
error!(
159+
?err,
160+
"Failed to replace the initial remote events in the event cache"
161+
);
162+
break;
163+
}
164+
};
149165

150166
timeline_controller
151167
.replace_with_initial_remote_events(initial_events, RemoteEventOrigin::Cache)

crates/matrix-sdk-ui/tests/integration/timeline/decryption.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ async fn test_an_utd_from_the_event_cache_as_an_initial_item_is_decrypted() {
7373
// The item is an encrypted event! It has been stored before having a chance to
7474
// be decrypted. Damn. We want to see if decryption will trigger automatically.
7575
event_cache_store
76+
.as_clean()
77+
.unwrap()
7678
.handle_linked_chunk_updates(
7779
LinkedChunkId::Room(room_id),
7880
vec![
@@ -212,6 +214,8 @@ async fn test_an_utd_from_the_event_cache_as_a_paginated_item_is_decrypted() {
212214
// chance to be decrypted. Damn. We want to see if decryption will trigger
213215
// automatically.
214216
event_cache_store
217+
.as_clean()
218+
.unwrap()
215219
.handle_linked_chunk_updates(
216220
LinkedChunkId::Room(room_id),
217221
vec![

crates/matrix-sdk-ui/tests/integration/timeline/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -827,6 +827,8 @@ async fn test_timeline_receives_a_limited_number_of_events_when_subscribing() {
827827

828828
// The event cache contains 30 events.
829829
event_cache_store
830+
.as_clean()
831+
.unwrap()
830832
.handle_linked_chunk_updates(
831833
LinkedChunkId::Room(room_id),
832834
vec![

crates/matrix-sdk-ui/tests/integration/timeline/pinned_event.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ async fn test_cached_events_are_kept_for_different_room_instances() {
256256
assert!(!items.is_empty()); // We just loaded some events
257257
assert_pending!(timeline_stream);
258258

259-
assert!(room_cache.find_event(event_id!("$1")).await.is_some());
259+
assert!(room_cache.find_event(event_id!("$1")).await.unwrap().is_some());
260260

261261
// Drop the existing room and timeline instances
262262
drop(timeline_stream);
@@ -277,7 +277,7 @@ async fn test_cached_events_are_kept_for_different_room_instances() {
277277

278278
let (items, _) = timeline.subscribe().await;
279279
assert!(!items.is_empty()); // These events came from the cache
280-
assert!(room_cache.find_event(event_id!("$1")).await.is_some());
280+
assert!(room_cache.find_event(event_id!("$1")).await.unwrap().is_some());
281281

282282
// Drop the existing room and timeline instances
283283
server.server().reset().await;
@@ -402,7 +402,7 @@ async fn test_pinned_timeline_with_no_pinned_events_on_pagination_is_just_empty(
402402
.expect("Pagination of events should successful");
403403

404404
// Assert the event is loaded and added to the cache
405-
assert!(event_cache.find_event(event_id).await.is_some());
405+
assert!(event_cache.find_event(event_id).await.unwrap().is_some());
406406

407407
// And it won't cause an update in the pinned events timeline since it's not
408408
// pinned

crates/matrix-sdk/src/event_cache/deduplicator.rs

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
use std::collections::BTreeSet;
1919

2020
use matrix_sdk_base::{
21-
event_cache::store::EventCacheStoreLock,
21+
event_cache::store::EventCacheStoreLockGuard,
2222
linked_chunk::{LinkedChunkId, Position},
2323
};
2424
use ruma::OwnedEventId;
@@ -32,7 +32,7 @@ use super::{
3232
/// information about the duplicates found in the new events, including the
3333
/// events that are not loaded in memory.
3434
pub async fn filter_duplicate_events(
35-
store: &EventCacheStoreLock,
35+
store_guard: &EventCacheStoreLockGuard,
3636
linked_chunk_id: LinkedChunkId<'_>,
3737
linked_chunk: &EventLinkedChunk,
3838
mut new_events: Vec<Event>,
@@ -50,10 +50,8 @@ pub async fn filter_duplicate_events(
5050
});
5151
}
5252

53-
let store = store.lock().await?;
54-
5553
// Let the store do its magic ✨
56-
let duplicated_event_ids = store
54+
let duplicated_event_ids = store_guard
5755
.filter_duplicated_events(
5856
linked_chunk_id,
5957
new_events.iter().filter_map(|event| event.event_id()).collect(),
@@ -148,7 +146,10 @@ pub(super) struct DeduplicationOutcome {
148146
mod tests {
149147
use std::ops::Not as _;
150148

151-
use matrix_sdk_base::{deserialized_responses::TimelineEvent, linked_chunk::ChunkIdentifier};
149+
use matrix_sdk_base::{
150+
deserialized_responses::TimelineEvent, event_cache::store::EventCacheStoreLock,
151+
linked_chunk::ChunkIdentifier,
152+
};
152153
use matrix_sdk_test::{async_test, event_factory::EventFactory};
153154
use ruma::{EventId, owned_event_id, serde::Raw, user_id};
154155

@@ -222,6 +223,8 @@ mod tests {
222223
.unwrap();
223224

224225
let event_cache_store = EventCacheStoreLock::new(event_cache_store, "hodor".to_owned());
226+
let event_cache_store = event_cache_store.lock().await.unwrap();
227+
let event_cache_store_guard = event_cache_store.as_clean().unwrap();
225228

226229
{
227230
// When presenting with only duplicate events, some of them in the in-memory
@@ -232,7 +235,7 @@ mod tests {
232235
linked_chunk.push_events([event_1.clone(), event_2.clone(), event_3.clone()]);
233236

234237
let outcome = filter_duplicate_events(
235-
&event_cache_store,
238+
event_cache_store_guard,
236239
LinkedChunkId::Room(room_id),
237240
&linked_chunk,
238241
vec![event_0.clone(), event_1.clone(), event_2.clone(), event_3.clone()],
@@ -247,7 +250,7 @@ mod tests {
247250
linked_chunk.push_events([event_2.clone(), event_3.clone()]);
248251

249252
let outcome = filter_duplicate_events(
250-
&event_cache_store,
253+
event_cache_store_guard,
251254
LinkedChunkId::Room(room_id),
252255
&linked_chunk,
253256
vec![event_0, event_1, event_2, event_3, event_4],
@@ -351,6 +354,8 @@ mod tests {
351354

352355
// Wrap the store into its lock.
353356
let event_cache_store = EventCacheStoreLock::new(event_cache_store, "hodor".to_owned());
357+
let event_cache_store = event_cache_store.lock().await.unwrap();
358+
let event_cache_store_guard = event_cache_store.as_clean().unwrap();
354359

355360
let linked_chunk = EventLinkedChunk::new();
356361

@@ -360,7 +365,7 @@ mod tests {
360365
in_store_duplicated_event_ids,
361366
non_empty_all_duplicates,
362367
} = filter_duplicate_events(
363-
&event_cache_store,
368+
event_cache_store_guard,
364369
LinkedChunkId::Room(room_id),
365370
&linked_chunk,
366371
vec![ev1, ev2, ev3, ev4],

0 commit comments

Comments
 (0)