From d3c839a2d0137b81b43b7c9a838e659feb6bda18 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Fri, 26 Sep 2025 14:21:28 +0200 Subject: [PATCH 01/25] feat(event cache): Add a method to access the linked chunk mutably --- crates/matrix-sdk/src/event_cache/room/mod.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/crates/matrix-sdk/src/event_cache/room/mod.rs b/crates/matrix-sdk/src/event_cache/room/mod.rs index 78a14c1b16d..3cc1cfb0a87 100644 --- a/crates/matrix-sdk/src/event_cache/room/mod.rs +++ b/crates/matrix-sdk/src/event_cache/room/mod.rs @@ -1301,6 +1301,12 @@ mod private { &self.room_linked_chunk } + /// Returns a mutable reference to the underlying room linked chunk. + #[cfg(feature = "e2e-encryption")] + pub(in crate::event_cache) fn room_linked_chunk_mut(&mut self) -> &mut EventLinkedChunk { + &mut self.room_linked_chunk + } + //// Find a single event in this room, starting from the most recent event. /// /// **Warning**! It looks into the loaded events from the in-memory From 8103b9cc23d737a069c67c9a54dded90458e8ee5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Wed, 24 Sep 2025 10:11:34 +0200 Subject: [PATCH 02/25] feat(event cache): Create the redecryptor --- crates/matrix-sdk/src/event_cache/mod.rs | 1 + .../matrix-sdk/src/event_cache/redecryptor.rs | 208 ++++++++++++++++++ crates/matrix-sdk/src/event_cache/room/mod.rs | 2 +- 3 files changed, 210 insertions(+), 1 deletion(-) create mode 100644 crates/matrix-sdk/src/event_cache/redecryptor.rs diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index 810e07fa553..68fae9200e3 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -74,6 +74,7 @@ use crate::{ mod deduplicator; mod pagination; +mod redecryptor; mod room; pub use pagination::{RoomPagination, RoomPaginationStatus}; diff --git a/crates/matrix-sdk/src/event_cache/redecryptor.rs b/crates/matrix-sdk/src/event_cache/redecryptor.rs new file mode 100644 index 00000000000..dc94450c9fb --- /dev/null +++ b/crates/matrix-sdk/src/event_cache/redecryptor.rs @@ -0,0 +1,208 @@ +// Copyright 2025 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! The REDECRYPTOR is a layer that handles redecryption of events in case we +//! couldn't decrypt them imediatelly + +use std::sync::Weak; + +use as_variant::as_variant; +use futures_core::Stream; +use futures_util::{StreamExt, pin_mut}; +use matrix_sdk_base::{ + crypto::{store::types::RoomKeyInfo, types::events::room::encrypted::EncryptedEvent}, + deserialized_responses::{DecryptedRoomEvent, TimelineEvent, TimelineEventKind}, +}; +use matrix_sdk_common::executor::{JoinHandle, spawn}; +use ruma::{OwnedEventId, RoomId, events::AnySyncTimelineEvent, serde::Raw}; +use tokio_stream::wrappers::errors::BroadcastStreamRecvError; +use tracing::{info, instrument, trace, warn}; + +use crate::{ + Client, + event_cache::{ + EventCache, EventCacheError, EventCacheInner, EventsOrigin, RoomEventCacheUpdate, + }, +}; + +impl EventCache { + async fn get_utds( + &self, + room_key_info: &RoomKeyInfo, + ) -> Result)>, EventCacheError> { + let map_timeline_event = |event: TimelineEvent| { + let event_id = event.event_id(); + + // Only pick out events that are UTDs, get just the Raw event as this is what + // the OlmMachine needs. + let event = + as_variant!(event.kind, TimelineEventKind::UnableToDecrypt { event, .. } => event); + // Zip the event ID and event together so we don't have to pick out the event ID + // again. We need the event ID to replace the event in the cache. + event_id.zip(event) + }; + + // Load the relevant events from the event cache store and attempt to redecrypt + // things. + let store = self.inner.store.lock().await?; + let events = store + .get_room_events( + &room_key_info.room_id, + Some("m.room.encrypted"), + Some(&room_key_info.session_id), + ) + .await?; + + Ok(events.into_iter().filter_map(map_timeline_event).collect()) + } + + #[instrument(skip_all, fields(room_id))] + async fn on_resolved_utds( + &self, + room_id: &RoomId, + events: Vec<(OwnedEventId, DecryptedRoomEvent)>, + ) -> Result<(), EventCacheError> { + // Get the cache for this particular room and lock the state for the duration of + // the decryption. + let (room_cache, _drop_handles) = self.for_room(room_id).await?; + let mut state = room_cache.inner.state.write().await; + + let event_ids: Vec<_> = events.iter().map(|(event_id, _)| event_id).collect(); + + trace!(?event_ids, "Replacing successfully re-decrypted events"); + + for (event_id, decrypted) in events { + // The event isn't in the cache, nothing to replace. Realistically this can't + // happen since we retrieved the list of events from the cache itself and + // `find_event()` will look into the store as well. + if let Some((location, mut target_event)) = state.find_event(&event_id).await? { + target_event.kind = TimelineEventKind::Decrypted(decrypted); + + // TODO: `replace_event_at()` propagates changes to the store for every event, + // we should probably have a bulk version of this? + state.replace_event_at(location, target_event).await? + } + } + + // We replaced a bunch of events, reactive updates for those replacements have + // been queued up. We need to send them out to our subscribers now. + let diffs = state.room_linked_chunk_mut().updates_as_vector_diffs(); + + let _ = room_cache.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents { + diffs, + origin: EventsOrigin::Cache, + }); + + Ok(()) + } + + async fn decrypt_event( + &self, + room_id: &RoomId, + event: &Raw, + ) -> Option { + let client = self.inner.client().ok()?; + let machine = client.olm_machine().await; + let machine = machine.as_ref()?; + + match machine.decrypt_room_event(event, room_id, client.decryption_settings()).await { + Ok(decrypted) => Some(decrypted), + Err(e) => { + warn!("Failed to redecrypt an event despite receiving a room key for it {e:?}"); + None + } + } + } + + /// Attempt to redecrypt events after a room key with the given session ID + /// has been received. + #[instrument(skip_all, fields(room_key_info))] + async fn retry_decryption(&self, room_key_info: RoomKeyInfo) -> Result<(), EventCacheError> { + trace!("Retrying to decrypt"); + + let events = self.get_utds(&room_key_info).await?; + let mut decrypted_events = Vec::with_capacity(events.len()); + + for (event_id, event) in events { + // If we managed to decrypt the event, and we should have to since we received + // the room key for this specific event, then replace the event. + if let Some(decrypted) = + self.decrypt_event(&room_key_info.room_id, event.cast_ref_unchecked()).await + { + decrypted_events.push((event_id, decrypted)); + } + } + + self.on_resolved_utds(&room_key_info.room_id, decrypted_events).await?; + + Ok(()) + } +} + +pub(crate) struct Redecryptor { + task: JoinHandle<()>, +} + +impl Drop for Redecryptor { + fn drop(&mut self) { + self.task.abort(); + } +} + +impl Redecryptor { + pub fn new(client: Client, cache: Weak) -> Self { + let task = spawn(async { + let stream = { + let machine = client.olm_machine().await; + machine.as_ref().unwrap().store().room_keys_received_stream() + }; + + drop(client); + + Self::listen_for_room_keys_task(cache, stream).await; + }); + + Self { task } + } + + async fn listen_for_room_keys_task( + cache: Weak, + received_stream: impl Stream, BroadcastStreamRecvError>>, + ) { + pin_mut!(received_stream); + + // TODO: We need to relisten to this stream if it dies due to the cross-process + // lock reloading the Olm machine. + while let Some(update) = received_stream.next().await { + if let Ok(room_keys) = update { + let Some(event_cache) = cache.upgrade() else { + break; + }; + + let cache = EventCache { inner: event_cache }; + + for key in room_keys { + let _ = cache + .retry_decryption(key) + .await + .inspect_err(|e| warn!("Error redecrypting {e:?}")); + } + } else { + todo!("Redecrypt all visible events?") + } + } + + info!("Shutting down the event cache redecryptor"); + } +} diff --git a/crates/matrix-sdk/src/event_cache/room/mod.rs b/crates/matrix-sdk/src/event_cache/room/mod.rs index 3cc1cfb0a87..21f66bf690a 100644 --- a/crates/matrix-sdk/src/event_cache/room/mod.rs +++ b/crates/matrix-sdk/src/event_cache/room/mod.rs @@ -1597,7 +1597,7 @@ mod private { /// observers that a single item has been replaced. Otherwise, /// such a notification is not emitted, because observers are /// unlikely to observe the store updates directly. - async fn replace_event_at( + pub(crate) async fn replace_event_at( &mut self, location: EventLocation, event: Event, From f2cc6c650add4bd55994b6c0e3c10940d37d8f06 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Fri, 26 Sep 2025 14:21:28 +0200 Subject: [PATCH 03/25] test(redecryptor): Add a test to show that the redecryptor works --- .../matrix-sdk/src/event_cache/redecryptor.rs | 175 ++++++++++++++++++ 1 file changed, 175 insertions(+) diff --git a/crates/matrix-sdk/src/event_cache/redecryptor.rs b/crates/matrix-sdk/src/event_cache/redecryptor.rs index dc94450c9fb..35197619557 100644 --- a/crates/matrix-sdk/src/event_cache/redecryptor.rs +++ b/crates/matrix-sdk/src/event_cache/redecryptor.rs @@ -206,3 +206,178 @@ impl Redecryptor { info!("Shutting down the event cache redecryptor"); } } + +#[cfg(not(target_family = "wasm"))] +#[cfg(test)] +mod tests { + use std::time::Duration; + + use assert_matches2::assert_matches; + use eyeball_im::VectorDiff; + use matrix_sdk_base::deserialized_responses::TimelineEventKind; + use matrix_sdk_test::{ + JoinedRoomBuilder, StateTestEvent, async_test, event_factory::EventFactory, + }; + use ruma::{device_id, event_id, room_id, user_id}; + use serde_json::json; + + use crate::{ + assert_let_timeout, encryption::EncryptionSettings, event_cache::RoomEventCacheUpdate, + test_utils::mocks::MatrixMockServer, + }; + + #[async_test] + async fn test_redecryptor() { + let room_id = room_id!("!test:localhost"); + + let alice_user_id = user_id!("@alice:localhost"); + let alice_device_id = device_id!("ALICEDEVICE"); + let bob_user_id = user_id!("@bob:localhost"); + let bob_device_id = device_id!("BOBDEVICE"); + + let matrix_mock_server = MatrixMockServer::new().await; + matrix_mock_server.mock_crypto_endpoints_preset().await; + + let encryption_settings = + EncryptionSettings { auto_enable_cross_signing: true, ..Default::default() }; + + // Create some clients for Alice and Bob. + + let alice = matrix_mock_server + .client_builder_for_crypto_end_to_end(alice_user_id, alice_device_id) + .on_builder(|builder| { + builder + .with_enable_share_history_on_invite(true) + .with_encryption_settings(encryption_settings) + }) + .build() + .await; + + let bob = matrix_mock_server + .client_builder_for_crypto_end_to_end(bob_user_id, bob_device_id) + .on_builder(|builder| { + builder + .with_enable_share_history_on_invite(true) + .with_encryption_settings(encryption_settings) + }) + .build() + .await; + + bob.event_cache().subscribe().expect("Bob should be able to enable the event cache"); + + // Ensure that Alice and Bob are aware of their devices and identities. + matrix_mock_server.exchange_e2ee_identities(&alice, &bob).await; + + let event_factory = EventFactory::new().room(room_id); + let alice_member_event = event_factory.member(alice_user_id).into_raw(); + let bob_member_event = event_factory.member(bob_user_id).into_raw(); + + // Let us now create a room for them. + let room_builder = JoinedRoomBuilder::new(room_id) + .add_state_event(StateTestEvent::Create) + .add_state_event(StateTestEvent::Encryption); + + matrix_mock_server + .mock_sync() + .ok_and_run(&alice, |builder| { + builder.add_joined_room(room_builder.clone()); + }) + .await; + + matrix_mock_server + .mock_sync() + .ok_and_run(&bob, |builder| { + builder.add_joined_room(room_builder); + }) + .await; + + let room = alice + .get_room(room_id) + .expect("Alice should have access to the room now that we synced"); + + // Alice will send a single event to the room, but this will trigger a to-device + // message containing the room key to be sent as well. We capture both the event + // and the to-device message. + + let event_type = "m.room.message"; + let content = json!({"body": "It's a secret to everybody", "msgtype": "m.text"}); + + let event_id = event_id!("$some_id"); + let (event_receiver, mock) = + matrix_mock_server.mock_room_send().ok_with_capture(event_id, alice_user_id); + let (_guard, room_key) = matrix_mock_server.mock_capture_put_to_device(alice_user_id).await; + + { + let _guard = mock.mock_once().mount_as_scoped().await; + + matrix_mock_server + .mock_get_members() + .ok(vec![alice_member_event.clone(), bob_member_event.clone()]) + .mock_once() + .mount() + .await; + + room.send_raw(event_type, content) + .await + .expect("We should be able to send an initial message"); + }; + + // Let's now see what Bob's event cache does. + + let (room_cache, _) = bob + .event_cache() + .for_room(room_id) + .await + .expect("We should be able to get to the event cache for a specific room"); + + let (_, mut subscriber) = room_cache.subscribe().await; + + // Let us retrieve the captured event and to-device message. + let event = event_receiver.await.expect("Alice should have sent the event by now"); + let room_key = room_key.await; + + // Let us forward the event to Bob. + matrix_mock_server + .mock_sync() + .ok_and_run(&bob, |builder| { + builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_timeline_event(event)); + }) + .await; + + // Alright, Bob has received an update from the cache. + + assert_let_timeout!( + Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = subscriber.recv() + ); + + // There should be a single new event, and it should be a UTD as we did not + // receive the room key yet. + assert_eq!(diffs.len(), 1); + assert_matches!(&diffs[0], VectorDiff::Append { values }); + assert_matches!(&values[0].kind, TimelineEventKind::UnableToDecrypt { .. }); + + // Now we send the room key to Bob. + matrix_mock_server + .mock_sync() + .ok_and_run(&bob, |builder| { + builder.add_to_device_event( + room_key + .deserialize_as() + .expect("We should be able to deserialize the room key"), + ); + }) + .await; + + // Bob should receive a new update from the cache. + assert_let_timeout!( + Duration::from_secs(1), + Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = subscriber.recv() + ); + + // It should replace the UTD with a decrypted event. + assert_eq!(diffs.len(), 1); + assert_matches!(&diffs[0], VectorDiff::Set { index, value }); + assert_eq!(*index, 0); + assert_matches!(&value.kind, TimelineEventKind::Decrypted { .. }); + } +} From e93423504569bf280f4dd6b3405647056ff55832 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Thu, 2 Oct 2025 13:53:37 +0200 Subject: [PATCH 04/25] feat(redecryptor): Rejigger things so we can relisten to the room key stream --- .../matrix-sdk/src/event_cache/redecryptor.rs | 170 ++++++++++++------ 1 file changed, 118 insertions(+), 52 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/redecryptor.rs b/crates/matrix-sdk/src/event_cache/redecryptor.rs index 35197619557..f3e81c554f3 100644 --- a/crates/matrix-sdk/src/event_cache/redecryptor.rs +++ b/crates/matrix-sdk/src/event_cache/redecryptor.rs @@ -15,7 +15,7 @@ //! The REDECRYPTOR is a layer that handles redecryption of events in case we //! couldn't decrypt them imediatelly -use std::sync::Weak; +use std::{collections::BTreeSet, pin::Pin, sync::Weak}; use as_variant::as_variant; use futures_core::Stream; @@ -25,21 +25,29 @@ use matrix_sdk_base::{ deserialized_responses::{DecryptedRoomEvent, TimelineEvent, TimelineEventKind}, }; use matrix_sdk_common::executor::{JoinHandle, spawn}; -use ruma::{OwnedEventId, RoomId, events::AnySyncTimelineEvent, serde::Raw}; -use tokio_stream::wrappers::errors::BroadcastStreamRecvError; +use ruma::{OwnedEventId, OwnedRoomId, RoomId, events::AnySyncTimelineEvent, serde::Raw}; +use tokio::sync::mpsc::UnboundedSender; +use tokio_stream::wrappers::{UnboundedReceiverStream, errors::BroadcastStreamRecvError}; use tracing::{info, instrument, trace, warn}; -use crate::{ - Client, - event_cache::{ - EventCache, EventCacheError, EventCacheInner, EventsOrigin, RoomEventCacheUpdate, - }, +use crate::event_cache::{ + EventCache, EventCacheError, EventCacheInner, EventsOrigin, RoomEventCacheUpdate, }; +/// The information sent across the channel to the long-running task requesting +/// that the supplied set of sessions be retried. +pub struct DecryptionRetryRequest { + room_id: OwnedRoomId, + session_ids: BTreeSet, +} + +type SessionId<'a> = &'a str; + impl EventCache { async fn get_utds( &self, - room_key_info: &RoomKeyInfo, + room_id: &RoomId, + session_id: SessionId<'_>, ) -> Result)>, EventCacheError> { let map_timeline_event = |event: TimelineEvent| { let event_id = event.event_id(); @@ -56,13 +64,8 @@ impl EventCache { // Load the relevant events from the event cache store and attempt to redecrypt // things. let store = self.inner.store.lock().await?; - let events = store - .get_room_events( - &room_key_info.room_id, - Some("m.room.encrypted"), - Some(&room_key_info.session_id), - ) - .await?; + let events = + store.get_room_events(room_id, Some("m.room.encrypted"), Some(session_id)).await?; Ok(events.into_iter().filter_map(map_timeline_event).collect()) } @@ -128,29 +131,32 @@ impl EventCache { /// Attempt to redecrypt events after a room key with the given session ID /// has been received. #[instrument(skip_all, fields(room_key_info))] - async fn retry_decryption(&self, room_key_info: RoomKeyInfo) -> Result<(), EventCacheError> { + async fn retry_decryption( + &self, + room_id: &RoomId, + session_id: SessionId<'_>, + ) -> Result<(), EventCacheError> { trace!("Retrying to decrypt"); - let events = self.get_utds(&room_key_info).await?; + let events = self.get_utds(room_id, session_id).await?; let mut decrypted_events = Vec::with_capacity(events.len()); for (event_id, event) in events { // If we managed to decrypt the event, and we should have to since we received // the room key for this specific event, then replace the event. - if let Some(decrypted) = - self.decrypt_event(&room_key_info.room_id, event.cast_ref_unchecked()).await - { + if let Some(decrypted) = self.decrypt_event(room_id, event.cast_ref_unchecked()).await { decrypted_events.push((event_id, decrypted)); } } - self.on_resolved_utds(&room_key_info.room_id, decrypted_events).await?; + self.on_resolved_utds(room_id, decrypted_events).await?; Ok(()) } } pub(crate) struct Redecryptor { + request_decryption_sender: UnboundedSender, task: JoinHandle<()>, } @@ -161,46 +167,98 @@ impl Drop for Redecryptor { } impl Redecryptor { - pub fn new(client: Client, cache: Weak) -> Self { + pub(super) fn new(cache: Weak) -> Self { + let (request_decryption_sender, receiver) = tokio::sync::mpsc::unbounded_channel(); + let task = spawn(async { - let stream = { - let machine = client.olm_machine().await; - machine.as_ref().unwrap().store().room_keys_received_stream() - }; + let request_redecryption_stream = UnboundedReceiverStream::new(receiver); - drop(client); + Self::listen_for_room_keys_task(cache, request_redecryption_stream).await; + }); + + Self { task, request_decryption_sender } + } - Self::listen_for_room_keys_task(cache, stream).await; + #[allow(dead_code)] + pub(super) fn request_decryption(&self, request: DecryptionRetryRequest) { + let _ = self.request_decryption_sender.send(request).inspect_err(|_| { + warn!("Requesting a decryption while the redecryption task has been shut down") }); + } - Self { task } + async fn subscribe_to_room_key_stream( + cache: &Weak, + ) -> Option, BroadcastStreamRecvError>>> { + let event_cache = cache.upgrade()?; + let client = event_cache.client().ok()?; + let machine = client.olm_machine().await; + + machine.as_ref().map(|m| m.store().room_keys_received_stream()) + } + + fn upgrade_event_cache(cache: &Weak) -> Option { + cache.upgrade().map(|inner| EventCache { inner }) + } + + async fn redecryption_loop( + cache: &Weak, + decryption_request_stream: &mut Pin<&mut impl Stream>, + ) -> bool { + let Some(room_key_stream) = Self::subscribe_to_room_key_stream(cache).await else { + return false; + }; + + pin_mut!(room_key_stream); + + loop { + tokio::select! { + Some(request) = decryption_request_stream.next() => { + let Some(cache) = Self::upgrade_event_cache(cache) else { + break false; + }; + + for session_id in request.session_ids { + let _ = cache + .retry_decryption(&request.room_id, &session_id) + .await + .inspect_err(|e| warn!("Error redecrypting after an explicit request was received {e:?}")); + } + } + room_keys = room_key_stream.next() => { + match room_keys { + Some(Ok(room_keys)) => { + let Some(cache) = Self::upgrade_event_cache(cache) else { + break false; + }; + + for key in room_keys { + let _ = cache + .retry_decryption(&key.room_id, &key.session_id) + .await + .inspect_err(|e| warn!("Error redecrypting {e:?}")); + } + }, + Some(Err(_)) => { + todo!("Handle lagging here, how?") + }, + None => { + break true + } + } + } + else => break false, + } + } } async fn listen_for_room_keys_task( cache: Weak, - received_stream: impl Stream, BroadcastStreamRecvError>>, + decryption_request_stream: UnboundedReceiverStream, ) { - pin_mut!(received_stream); - - // TODO: We need to relisten to this stream if it dies due to the cross-process - // lock reloading the Olm machine. - while let Some(update) = received_stream.next().await { - if let Ok(room_keys) = update { - let Some(event_cache) = cache.upgrade() else { - break; - }; - - let cache = EventCache { inner: event_cache }; - - for key in room_keys { - let _ = cache - .retry_decryption(key) - .await - .inspect_err(|e| warn!("Error redecrypting {e:?}")); - } - } else { - todo!("Redecrypt all visible events?") - } + pin_mut!(decryption_request_stream); + + while Self::redecryption_loop(&cache, &mut decryption_request_stream).await { + info!("Regenerating the re-decryption streams") } info!("Shutting down the event cache redecryptor"); @@ -336,6 +394,14 @@ mod tests { let event = event_receiver.await.expect("Alice should have sent the event by now"); let room_key = room_key.await; + // We regenerate the Olm machine to check if the room key stream is recreated to + // correctly. + bob.inner + .base_client + .regenerate_olm(None) + .await + .expect("We should be able to regenerate the Olm machine"); + // Let us forward the event to Bob. matrix_mock_server .mock_sync() From 5c3bca86a438c8eecc253cab1eb80ec583d5be4f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Thu, 2 Oct 2025 14:10:48 +0200 Subject: [PATCH 05/25] doc(event cache): Document the redecryptor --- .../matrix-sdk/src/event_cache/redecryptor.rs | 98 ++++++++++++++++++- 1 file changed, 96 insertions(+), 2 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/redecryptor.rs b/crates/matrix-sdk/src/event_cache/redecryptor.rs index f3e81c554f3..9c02c0bff0b 100644 --- a/crates/matrix-sdk/src/event_cache/redecryptor.rs +++ b/crates/matrix-sdk/src/event_cache/redecryptor.rs @@ -12,14 +12,63 @@ // See the License for the specific language governing permissions and // limitations under the License. -//! The REDECRYPTOR is a layer that handles redecryption of events in case we -//! couldn't decrypt them imediatelly +//! The Redecryptor (Rd) is a layer and long-running background task which +//! handles redecryption of events in case we couldn't decrypt them imediatelly. +//! +//! Rd listens to the OlmMachine for received room keys. If a new room key has +//! been received it attempts to find any UTDs in the [`EventCache`]. If Rd +//! decrypts any UTDs from the event cache it will replace the events in the +//! cache and send out new [`RoomEventCacheUpdates`] to any of its listeners. +//! +//! There's an additional gotcha, the [`OlmMachine`] might get recreated by +//! calls to [`BaseClient::regenerate_olm()`]. When this happens we will receive +//! a `None` on the room keys stream and we need to re-listen to it. +//! +//! Another gotcha is that room keys might be received on another process if the +//! Client is operating on a iOS device. A separate process is used in this case +//! to receive push notifications. In this case the room key will be received +//! and Rd won't get notified about it. To work around this decryption requests +//! can be explicitly sent to Rd. +//! +//! +//! ┌─────────────┐ +//! │ │ +//! ┌───────────┤ Timeline │◄────────────┐ +//! │ │ │ │ +//! │ └─────▲───────┘ │ +//! │ │ │ +//! │ │ │ +//! │ │ │ +//! Decryption │ Redecryptor +//! request │ report +//! │ RoomEventCacheUpdates │ +//! │ │ │ +//! │ │ │ +//! │ ┌──────────┴────────────┐ │ +//! │ │ │ │ +//! └──────► Redecryptor │────────┘ +//! │ │ +//! └───────────▲───────────┘ +//! │ +//! │ +//! │ +//! Received room keys stream +//! │ +//! │ +//! │ +//! ┌───────┴──────┐ +//! │ │ +//! │ OlmMachine │ +//! │ │ +//! └──────────────┘ use std::{collections::BTreeSet, pin::Pin, sync::Weak}; use as_variant::as_variant; use futures_core::Stream; use futures_util::{StreamExt, pin_mut}; +#[cfg(doc)] +use matrix_sdk_base::{BaseClient, crypto::OlmMachine}; use matrix_sdk_base::{ crypto::{store::types::RoomKeyInfo, types::events::room::encrypted::EncryptedEvent}, deserialized_responses::{DecryptedRoomEvent, TimelineEvent, TimelineEventKind}, @@ -44,6 +93,13 @@ pub struct DecryptionRetryRequest { type SessionId<'a> = &'a str; impl EventCache { + /// Retrieve a set of events that we weren't able to decrypt. + /// + /// # Arguments + /// + /// * `room_id` - The ID of the room where the events were sent to. + /// * `session_id` - The unique ID of the room key that was used to encrypt + /// the event. async fn get_utds( &self, room_id: &RoomId, @@ -70,6 +126,17 @@ impl EventCache { Ok(events.into_iter().filter_map(map_timeline_event).collect()) } + /// Handle a chunk of events that we were previously unable to decrypt but + /// have now successfully decrypted. + /// + /// This function will replace the existing UTD events in memory and the + /// store and send out a [`RoomEventCacheUpdate`] for the newly + /// decrypted events. + /// + /// # Arguments + /// + /// * `room_id` - The ID of the room where the events were sent to. + /// * `events` - A chunk of events that were successfully decrypted. #[instrument(skip_all, fields(room_id))] async fn on_resolved_utds( &self, @@ -110,12 +177,17 @@ impl EventCache { Ok(()) } + /// Attempt to decrypt a single event. async fn decrypt_event( &self, room_id: &RoomId, event: &Raw, ) -> Option { let client = self.inner.client().ok()?; + // TODO: Do we need to use the `Room` object to decrypt these events so we can + // calculate if the event should count as a notification, i.e. get the push + // actions. I thing we do, what happens if the room can't be found? We fallback + // to this? let machine = client.olm_machine().await; let machine = machine.as_ref()?; @@ -138,7 +210,10 @@ impl EventCache { ) -> Result<(), EventCacheError> { trace!("Retrying to decrypt"); + // Get all the relevant UTDs. let events = self.get_utds(room_id, session_id).await?; + + // Let's attempt to decrypt them them. let mut decrypted_events = Vec::with_capacity(events.len()); for (event_id, event) in events { @@ -149,6 +224,8 @@ impl EventCache { } } + // Replace the events and notify listeners that UTDs have been replaced with + // decrypted events. self.on_resolved_utds(room_id, decrypted_events).await?; Ok(()) @@ -167,6 +244,10 @@ impl Drop for Redecryptor { } impl Redecryptor { + /// Create a new [`Redecryptor`]. + /// + /// This creates a task that listens to various streams and attempts to + /// redecrypt UTDs that can be found inside the [`EventCache`]. pub(super) fn new(cache: Weak) -> Self { let (request_decryption_sender, receiver) = tokio::sync::mpsc::unbounded_channel(); @@ -186,6 +267,10 @@ impl Redecryptor { }); } + /// (Re)-subscribe to the room key stream from the [`OlmMachine`]. + /// + /// This needs to happen any time this stream returns a `None` meaning that + /// the sending part of the stream has been dropped. async fn subscribe_to_room_key_stream( cache: &Weak, ) -> Option, BroadcastStreamRecvError>>> { @@ -212,6 +297,8 @@ impl Redecryptor { loop { tokio::select! { + // An explicit request, presumably from the timeline, has been received to decrypt + // events that were encrypted with a certain room key. Some(request) = decryption_request_stream.next() => { let Some(cache) = Self::upgrade_event_cache(cache) else { break false; @@ -224,6 +311,8 @@ impl Redecryptor { .inspect_err(|e| warn!("Error redecrypting after an explicit request was received {e:?}")); } } + // The room key stream from the OlmMachine. Needs to be recreated every time we + // receive a `None` from the stream. room_keys = room_key_stream.next() => { match room_keys { Some(Ok(room_keys)) => { @@ -241,6 +330,8 @@ impl Redecryptor { Some(Err(_)) => { todo!("Handle lagging here, how?") }, + // The stream got closed, this could mean that our OlmMachine got + // regenerated, let's return true and try to recreate the stream. None => { break true } @@ -255,6 +346,9 @@ impl Redecryptor { cache: Weak, decryption_request_stream: UnboundedReceiverStream, ) { + // We pin the decryption request stream here since that one doesn't need to be + // recreated and we don't want to miss messages coming from the stream + // while recreating it unnecessarily. pin_mut!(decryption_request_stream); while Self::redecryption_loop(&cache, &mut decryption_request_stream).await { From 4ed239351a742432f5019661a87d8899011485e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Thu, 25 Sep 2025 13:47:01 +0200 Subject: [PATCH 06/25] feat(event cache): Enable the redecryptor in the event cache --- crates/matrix-sdk/src/event_cache/mod.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index 68fae9200e3..217033b8866 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -74,10 +74,13 @@ use crate::{ mod deduplicator; mod pagination; +#[cfg(feature = "e2e-encryption")] mod redecryptor; mod room; pub use pagination::{RoomPagination, RoomPaginationStatus}; +#[cfg(feature = "e2e-encryption")] +pub use redecryptor::{DecryptionRetryRequest, RedecryptorReport}; pub use room::{RoomEventCache, RoomEventCacheSubscriber, ThreadEventCacheUpdate}; /// An error observed in the [`EventCache`]. @@ -149,6 +152,10 @@ pub struct EventCacheDropHandles { /// The task used to automatically shrink the linked chunks. auto_shrink_linked_chunk_task: JoinHandle<()>, + + /// The task used to automatically redecrypt UTDs. + #[cfg(feature = "e2e-encryption")] + _redecryptor: redecryptor::Redecryptor, } impl fmt::Debug for EventCacheDropHandles { @@ -258,10 +265,15 @@ impl EventCache { auto_shrink_receiver, )); + #[cfg(feature = "e2e-encryption")] + let redecryptor = redecryptor::Redecryptor::new(Arc::downgrade(&self.inner)); + Arc::new(EventCacheDropHandles { listen_updates_task, ignore_user_list_update_task, auto_shrink_linked_chunk_task, + #[cfg(feature = "e2e-encryption")] + _redecryptor: redecryptor, }) }); From a2f89e85b967175f8283cefcdb7ea7a3f574c321 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Tue, 7 Oct 2025 16:08:16 +0200 Subject: [PATCH 07/25] feat: Redecryptor start to send out redecryptor reports --- crates/matrix-sdk/src/event_cache/mod.rs | 21 ++- .../matrix-sdk/src/event_cache/redecryptor.rs | 138 +++++++++++++++--- 2 files changed, 137 insertions(+), 22 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index 217033b8866..50c67c1b7a5 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -208,6 +208,9 @@ impl EventCache { linked_chunk_update_sender.clone(), ))); + #[cfg(feature = "e2e-encryption")] + let redecryption_channels = redecryptor::RedecryptorChannels::new(); + Self { inner: Arc::new(EventCacheInner { client, @@ -221,6 +224,8 @@ impl EventCache { _thread_subscriber_task: thread_subscriber_task, #[cfg(feature = "experimental-search")] _search_indexing_task: search_indexing_task, + #[cfg(feature = "e2e-encryption")] + redecryption_channels, thread_subscriber_receiver, }), } @@ -266,7 +271,18 @@ impl EventCache { )); #[cfg(feature = "e2e-encryption")] - let redecryptor = redecryptor::Redecryptor::new(Arc::downgrade(&self.inner)); + let redecryptor = { + let receiver = self + .inner + .redecryption_channels + .decryption_request_receiver + .lock() + .take() + .expect("We should have initialized the channel an subscribing should happen only once"); + + redecryptor::Redecryptor::new(Arc::downgrade(&self.inner), receiver) + }; + Arc::new(EventCacheDropHandles { listen_updates_task, @@ -853,6 +869,9 @@ struct EventCacheInner { /// This is helpful for tests to coordinate that a new thread subscription /// has been sent or not. thread_subscriber_receiver: Receiver<()>, + + #[cfg(feature = "e2e-encryption")] + redecryption_channels: redecryptor::RedecryptorChannels, } type AutoShrinkChannelPayload = OwnedRoomId; diff --git a/crates/matrix-sdk/src/event_cache/redecryptor.rs b/crates/matrix-sdk/src/event_cache/redecryptor.rs index 9c02c0bff0b..26bff9be284 100644 --- a/crates/matrix-sdk/src/event_cache/redecryptor.rs +++ b/crates/matrix-sdk/src/event_cache/redecryptor.rs @@ -72,25 +72,75 @@ use matrix_sdk_base::{BaseClient, crypto::OlmMachine}; use matrix_sdk_base::{ crypto::{store::types::RoomKeyInfo, types::events::room::encrypted::EncryptedEvent}, deserialized_responses::{DecryptedRoomEvent, TimelineEvent, TimelineEventKind}, + locks::Mutex, }; +#[cfg(doc)] +use matrix_sdk_common::deserialized_responses::EncryptionInfo; use matrix_sdk_common::executor::{JoinHandle, spawn}; use ruma::{OwnedEventId, OwnedRoomId, RoomId, events::AnySyncTimelineEvent, serde::Raw}; -use tokio::sync::mpsc::UnboundedSender; -use tokio_stream::wrappers::{UnboundedReceiverStream, errors::BroadcastStreamRecvError}; +use tokio::sync::{ + broadcast, + mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel}, +}; +use tokio_stream::wrappers::{ + BroadcastStream, UnboundedReceiverStream, errors::BroadcastStreamRecvError, +}; use tracing::{info, instrument, trace, warn}; use crate::event_cache::{ EventCache, EventCacheError, EventCacheInner, EventsOrigin, RoomEventCacheUpdate, }; +type SessionId<'a> = &'a str; +type OwnedSessionId = String; + /// The information sent across the channel to the long-running task requesting /// that the supplied set of sessions be retried. +#[derive(Debug, Clone)] pub struct DecryptionRetryRequest { - room_id: OwnedRoomId, - session_ids: BTreeSet, + /// The room ID of the room the events belong to. + pub room_id: OwnedRoomId, + /// Events that are not decrypted. + pub utd_session_ids: BTreeSet, + /// Events that are decrypted but might need to have their + /// [`EncryptionInfo`] refreshed. + pub refresh_info_session_ids: BTreeSet, } -type SessionId<'a> = &'a str; +/// A report coming from the redecryptor. +#[derive(Debug, Clone)] +pub enum RedecryptorReport { + /// Events which we were able to decrypt. + ResolvedUtds { + /// The room ID of the room the events belong to. + room_id: OwnedRoomId, + /// The list of event IDs of the decrypted events. + events: BTreeSet, + }, + /// The redecryptor might have missed some room keys so it might not have + /// re-decrypted events that are now decryptable. + Lagging, +} + +pub(super) struct RedecryptorChannels { + utd_reporter: broadcast::Sender, + pub(super) decryption_request_sender: UnboundedSender, + pub(super) decryption_request_receiver: + Mutex>>, +} + +impl RedecryptorChannels { + pub(super) fn new() -> Self { + let (utd_reporter, _) = broadcast::channel(100); + let (decryption_request_sender, decryption_request_receiver) = unbounded_channel(); + + Self { + utd_reporter, + decryption_request_sender, + decryption_request_receiver: Mutex::new(Some(decryption_request_receiver)), + } + } +} impl EventCache { /// Retrieve a set of events that we weren't able to decrypt. @@ -148,7 +198,7 @@ impl EventCache { let (room_cache, _drop_handles) = self.for_room(room_id).await?; let mut state = room_cache.inner.state.write().await; - let event_ids: Vec<_> = events.iter().map(|(event_id, _)| event_id).collect(); + let event_ids: BTreeSet<_> = events.iter().cloned().map(|(event_id, _)| event_id).collect(); trace!(?event_ids, "Replacing successfully re-decrypted events"); @@ -174,6 +224,14 @@ impl EventCache { origin: EventsOrigin::Cache, }); + // We report that we resolved some UTDs, this is mainly for listeners that don't + // care about the actual events, just about the fact that UTDs got + // resolved. Not sure if we'll have more such listeners but the UTD hook + // is one such thing. + let report = + RedecryptorReport::ResolvedUtds { room_id: room_id.to_owned(), events: event_ids }; + let _ = self.inner.redecryption_channels.utd_reporter.send(report); + Ok(()) } @@ -230,10 +288,28 @@ impl EventCache { Ok(()) } + + /// Explicitly request the redecryption of a set of events. + /// + /// TODO: Explain when and why this might be useful. + pub fn request_decryption(&self, request: DecryptionRetryRequest) { + let _ = + self.inner.redecryption_channels.decryption_request_sender.send(request).inspect_err( + |_| warn!("Requesting a decryption while the redecryption task has been shut down"), + ); + } + + /// Subscribe to reports that the redecryptor generates. + /// + /// TODO: Explain when the redecryptor might send such reports. + pub fn subscrube_to_decryption_reports( + &self, + ) -> impl Stream> { + BroadcastStream::new(self.inner.redecryption_channels.utd_reporter.subscribe()) + } } pub(crate) struct Redecryptor { - request_decryption_sender: UnboundedSender, task: JoinHandle<()>, } @@ -248,23 +324,17 @@ impl Redecryptor { /// /// This creates a task that listens to various streams and attempts to /// redecrypt UTDs that can be found inside the [`EventCache`]. - pub(super) fn new(cache: Weak) -> Self { - let (request_decryption_sender, receiver) = tokio::sync::mpsc::unbounded_channel(); - + pub(super) fn new( + cache: Weak, + receiver: UnboundedReceiver, + ) -> Self { let task = spawn(async { let request_redecryption_stream = UnboundedReceiverStream::new(receiver); Self::listen_for_room_keys_task(cache, request_redecryption_stream).await; }); - Self { task, request_decryption_sender } - } - - #[allow(dead_code)] - pub(super) fn request_decryption(&self, request: DecryptionRetryRequest) { - let _ = self.request_decryption_sender.send(request).inspect_err(|_| { - warn!("Requesting a decryption while the redecryption task has been shut down") - }); + Self { task } } /// (Re)-subscribe to the room key stream from the [`OlmMachine`]. @@ -304,18 +374,23 @@ impl Redecryptor { break false; }; - for session_id in request.session_ids { + for session_id in request.utd_session_ids { let _ = cache .retry_decryption(&request.room_id, &session_id) .await .inspect_err(|e| warn!("Error redecrypting after an explicit request was received {e:?}")); } + + // TODO: Deal with encryption info updating as well. } // The room key stream from the OlmMachine. Needs to be recreated every time we // receive a `None` from the stream. room_keys = room_key_stream.next() => { match room_keys { Some(Ok(room_keys)) => { + // Alright, some room keys were received and persisted in our store, + // let's attempt to redecrypt events that were encrypted using these + // room keys. let Some(cache) = Self::upgrade_event_cache(cache) else { break false; }; @@ -326,9 +401,21 @@ impl Redecryptor { .await .inspect_err(|e| warn!("Error redecrypting {e:?}")); } + + // TODO: Deal with encryption info updating as well. }, Some(Err(_)) => { - todo!("Handle lagging here, how?") + // We missed some room keys, we need to report this in case a listener + // has and idea which UTDs we should attempt to redecrypt. + // + // This would most likely be the timeline. The timeline might attempt + // to redecrypt all UTDs it is showing to the user. + let Some(cache) = Self::upgrade_event_cache(cache) else { + break false; + }; + + let message = RedecryptorReport::Lagging; + let _ = cache.inner.redecryption_channels.utd_reporter.send(message); }, // The stream got closed, this could mean that our OlmMachine got // regenerated, let's return true and try to recreate the stream. @@ -352,7 +439,16 @@ impl Redecryptor { pin_mut!(decryption_request_stream); while Self::redecryption_loop(&cache, &mut decryption_request_stream).await { - info!("Regenerating the re-decryption streams") + info!("Regenerating the re-decryption streams"); + + let Some(cache) = Self::upgrade_event_cache(&cache) else { + break; + }; + + // Report that the stream got recreated so listeners can attempt to redecrypt + // any UTDs they might be seeing. + let message = RedecryptorReport::Lagging; + let _ = cache.inner.redecryption_channels.utd_reporter.send(message); } info!("Shutting down the event cache redecryptor"); From 621d936b4c6e9a256a0d461b778b8c7c753e1fd6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Fri, 10 Oct 2025 12:19:46 +0200 Subject: [PATCH 08/25] feat(redecryptor): Let the redecryptor listen to room key withheld updates --- .../matrix-sdk/src/event_cache/redecryptor.rs | 138 ++++++++++++++++-- 1 file changed, 123 insertions(+), 15 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/redecryptor.rs b/crates/matrix-sdk/src/event_cache/redecryptor.rs index 26bff9be284..623346f4443 100644 --- a/crates/matrix-sdk/src/event_cache/redecryptor.rs +++ b/crates/matrix-sdk/src/event_cache/redecryptor.rs @@ -15,10 +15,16 @@ //! The Redecryptor (Rd) is a layer and long-running background task which //! handles redecryption of events in case we couldn't decrypt them imediatelly. //! -//! Rd listens to the OlmMachine for received room keys. If a new room key has -//! been received it attempts to find any UTDs in the [`EventCache`]. If Rd -//! decrypts any UTDs from the event cache it will replace the events in the -//! cache and send out new [`RoomEventCacheUpdates`] to any of its listeners. +//! Rd listens to the OlmMachine for received room keys and new +//! m.room_key.withheld events. +//! +//! If a new room key has been received it attempts to find any UTDs in the +//! [`EventCache`]. If Rd decrypts any UTDs from the event cache it will replace +//! the events in the cache and send out new [`RoomEventCacheUpdates`] to any of +//! its listeners. +//! +//! If a new withheld info has been received it attempts to find any relevant +//! events and updates the [`EncryptionInfo`] of an event. //! //! There's an additional gotcha, the [`OlmMachine`] might get recreated by //! calls to [`BaseClient::regenerate_olm()`]. When this happens we will receive @@ -70,7 +76,10 @@ use futures_util::{StreamExt, pin_mut}; #[cfg(doc)] use matrix_sdk_base::{BaseClient, crypto::OlmMachine}; use matrix_sdk_base::{ - crypto::{store::types::RoomKeyInfo, types::events::room::encrypted::EncryptedEvent}, + crypto::{ + store::types::{RoomKeyInfo, RoomKeyWithheldInfo}, + types::events::room::encrypted::EncryptedEvent, + }, deserialized_responses::{DecryptedRoomEvent, TimelineEvent, TimelineEventKind}, locks::Mutex, }; @@ -155,7 +164,7 @@ impl EventCache { room_id: &RoomId, session_id: SessionId<'_>, ) -> Result)>, EventCacheError> { - let map_timeline_event = |event: TimelineEvent| { + let filter = |event: TimelineEvent| { let event_id = event.event_id(); // Only pick out events that are UTDs, get just the Raw event as this is what @@ -167,13 +176,31 @@ impl EventCache { event_id.zip(event) }; - // Load the relevant events from the event cache store and attempt to redecrypt - // things. let store = self.inner.store.lock().await?; let events = store.get_room_events(room_id, Some("m.room.encrypted"), Some(session_id)).await?; - Ok(events.into_iter().filter_map(map_timeline_event).collect()) + Ok(events.into_iter().filter_map(filter).collect()) + } + + async fn get_decrypted_events( + &self, + room_id: &RoomId, + session_id: SessionId<'_>, + ) -> Result, EventCacheError> { + let filter = |event: TimelineEvent| { + let event_id = event.event_id(); + + let event = as_variant!(event.kind, TimelineEventKind::Decrypted(event) => event); + // Zip the event ID and event together so we don't have to pick out the event ID + // again. We need the event ID to replace the event in the cache. + event_id.zip(event) + }; + + let store = self.inner.store.lock().await?; + let events = store.get_room_events(room_id, None, Some(session_id)).await?; + + Ok(events.into_iter().filter_map(filter).collect()) } /// Handle a chunk of events that we were previously unable to decrypt but @@ -289,6 +316,38 @@ impl EventCache { Ok(()) } + async fn update_encryption_info( + &self, + room_id: &RoomId, + session_id: SessionId<'_>, + ) -> Result<(), EventCacheError> { + let client = self.inner.client().ok().unwrap(); + let room = client.get_room(room_id).unwrap(); + + // Get all the relevant events. + let events = self.get_decrypted_events(room_id, session_id).await?; + + // Let's attempt to update their encryption info. + let mut updated_events = Vec::with_capacity(events.len()); + + for (event_id, mut event) in events { + let new_encryption_info = + room.get_encryption_info(session_id, &event.encryption_info.sender).await; + + // Only create a replacement if the encryption info actually changed. + if let Some(new_encryption_info) = new_encryption_info + && event.encryption_info != new_encryption_info + { + event.encryption_info = new_encryption_info; + updated_events.push((event_id, event)); + } + } + + self.on_resolved_utds(room_id, updated_events).await?; + + Ok(()) + } + /// Explicitly request the redecryption of a set of events. /// /// TODO: Explain when and why this might be useful. @@ -343,12 +402,17 @@ impl Redecryptor { /// the sending part of the stream has been dropped. async fn subscribe_to_room_key_stream( cache: &Weak, - ) -> Option, BroadcastStreamRecvError>>> { + ) -> Option<( + impl Stream, BroadcastStreamRecvError>>, + impl Stream>, + )> { let event_cache = cache.upgrade()?; let client = event_cache.client().ok()?; let machine = client.olm_machine().await; - machine.as_ref().map(|m| m.store().room_keys_received_stream()) + machine.as_ref().map(|m| { + (m.store().room_keys_received_stream(), m.store().room_keys_withheld_received_stream()) + }) } fn upgrade_event_cache(cache: &Weak) -> Option { @@ -359,11 +423,14 @@ impl Redecryptor { cache: &Weak, decryption_request_stream: &mut Pin<&mut impl Stream>, ) -> bool { - let Some(room_key_stream) = Self::subscribe_to_room_key_stream(cache).await else { + let Some((room_key_stream, withheld_stream)) = + Self::subscribe_to_room_key_stream(cache).await + else { return false; }; pin_mut!(room_key_stream); + pin_mut!(withheld_stream); loop { tokio::select! { @@ -374,6 +441,8 @@ impl Redecryptor { break false; }; + trace!(?request, "Received a redecryption request"); + for session_id in request.utd_session_ids { let _ = cache .retry_decryption(&request.room_id, &session_id) @@ -381,7 +450,14 @@ impl Redecryptor { .inspect_err(|e| warn!("Error redecrypting after an explicit request was received {e:?}")); } - // TODO: Deal with encryption info updating as well. + for session_id in request.refresh_info_session_ids { + let _ = cache.update_encryption_info(&request.room_id, &session_id).await.inspect_err(|e| + warn!( + room_id = %request.room_id, + session_id = session_id, + "Unable to update the encryption info {e:?}", + )); + } } // The room key stream from the OlmMachine. Needs to be recreated every time we // receive a `None` from the stream. @@ -395,14 +471,23 @@ impl Redecryptor { break false; }; - for key in room_keys { + trace!(?room_keys, "Received new room keys"); + + for key in &room_keys { let _ = cache .retry_decryption(&key.room_id, &key.session_id) .await .inspect_err(|e| warn!("Error redecrypting {e:?}")); } - // TODO: Deal with encryption info updating as well. + for key in room_keys { + let _ = cache.update_encryption_info(&key.room_id, &key.session_id).await.inspect_err(|e| + warn!( + room_id = %key.room_id, + session_id = key.session_id, + "Unable to update the encryption info {e:?}", + )); + } }, Some(Err(_)) => { // We missed some room keys, we need to report this in case a listener @@ -424,6 +509,29 @@ impl Redecryptor { } } } + withheld_info = withheld_stream.next() => { + match withheld_info { + Some(infos) => { + let Some(cache) = Self::upgrade_event_cache(cache) else { + break false; + }; + + trace!(?infos, "Received new withheld infos"); + + for RoomKeyWithheldInfo { room_id, session_id, .. } in &infos { + let _ = cache.update_encryption_info(room_id, session_id).await.inspect_err(|e| + warn!( + room_id = %room_id, + session_id = session_id, + "Unable to update the encryption info {e:?}", + )); + } + } + // The stream got closed, same as for the room key stream, we'll try to + // recreate the streams. + None => break true + } + } else => break false, } } From 3a0a5b98883b5499d6e295932c34921f4213d29e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Fri, 10 Oct 2025 15:48:21 +0200 Subject: [PATCH 09/25] feat(redecryptor): Use the room to redecrypt events This allows us to properly calculate the push actions. --- .../matrix-sdk/src/event_cache/redecryptor.rs | 110 ++++++++++++++---- 1 file changed, 85 insertions(+), 25 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/redecryptor.rs b/crates/matrix-sdk/src/event_cache/redecryptor.rs index 623346f4443..250c7f8c3e4 100644 --- a/crates/matrix-sdk/src/event_cache/redecryptor.rs +++ b/crates/matrix-sdk/src/event_cache/redecryptor.rs @@ -86,7 +86,12 @@ use matrix_sdk_base::{ #[cfg(doc)] use matrix_sdk_common::deserialized_responses::EncryptionInfo; use matrix_sdk_common::executor::{JoinHandle, spawn}; -use ruma::{OwnedEventId, OwnedRoomId, RoomId, events::AnySyncTimelineEvent, serde::Raw}; +use ruma::{ + OwnedEventId, OwnedRoomId, RoomId, + events::{AnySyncTimelineEvent, room::encrypted::OriginalSyncRoomEncryptedEvent}, + push::Action, + serde::Raw, +}; use tokio::sync::{ broadcast, mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel}, @@ -96,8 +101,12 @@ use tokio_stream::wrappers::{ }; use tracing::{info, instrument, trace, warn}; -use crate::event_cache::{ - EventCache, EventCacheError, EventCacheInner, EventsOrigin, RoomEventCacheUpdate, +use crate::{ + Room, + event_cache::{ + EventCache, EventCacheError, EventCacheInner, EventsOrigin, RoomEventCacheUpdate, + }, + room::PushContext, }; type SessionId<'a> = &'a str; @@ -218,24 +227,29 @@ impl EventCache { async fn on_resolved_utds( &self, room_id: &RoomId, - events: Vec<(OwnedEventId, DecryptedRoomEvent)>, + events: Vec<(OwnedEventId, DecryptedRoomEvent, Option>)>, ) -> Result<(), EventCacheError> { // Get the cache for this particular room and lock the state for the duration of // the decryption. let (room_cache, _drop_handles) = self.for_room(room_id).await?; let mut state = room_cache.inner.state.write().await; - let event_ids: BTreeSet<_> = events.iter().cloned().map(|(event_id, _)| event_id).collect(); + let event_ids: BTreeSet<_> = + events.iter().cloned().map(|(event_id, _, _)| event_id).collect(); trace!(?event_ids, "Replacing successfully re-decrypted events"); - for (event_id, decrypted) in events { + for (event_id, decrypted, actions) in events { // The event isn't in the cache, nothing to replace. Realistically this can't // happen since we retrieved the list of events from the cache itself and // `find_event()` will look into the store as well. if let Some((location, mut target_event)) = state.find_event(&event_id).await? { target_event.kind = TimelineEventKind::Decrypted(decrypted); + if let Some(actions) = actions { + target_event.set_push_actions(actions); + } + // TODO: `replace_event_at()` propagates changes to the store for every event, // we should probably have a bulk version of this? state.replace_event_at(location, target_event).await? @@ -266,21 +280,50 @@ impl EventCache { async fn decrypt_event( &self, room_id: &RoomId, + room: Option<&Room>, + push_context: Option<&PushContext>, event: &Raw, - ) -> Option { - let client = self.inner.client().ok()?; - // TODO: Do we need to use the `Room` object to decrypt these events so we can - // calculate if the event should count as a notification, i.e. get the push - // actions. I thing we do, what happens if the room can't be found? We fallback - // to this? - let machine = client.olm_machine().await; - let machine = machine.as_ref()?; - - match machine.decrypt_room_event(event, room_id, client.decryption_settings()).await { - Ok(decrypted) => Some(decrypted), - Err(e) => { - warn!("Failed to redecrypt an event despite receiving a room key for it {e:?}"); - None + ) -> Option<(DecryptedRoomEvent, Option>)> { + if let Some(room) = room { + match room + .decrypt_event( + event.cast_ref_unchecked::(), + push_context, + ) + .await + { + Ok(maybe_decrypted) => { + let actions = maybe_decrypted.push_actions().map(|a| a.to_vec()); + + if let TimelineEventKind::Decrypted(decrypted) = maybe_decrypted.kind { + Some((decrypted, actions)) + } else { + warn!( + "Failed to redecrypt an event despite receiving a room key or request to redecrypt" + ); + None + } + } + Err(e) => { + warn!( + "Failed to redecrypt an event despite receiving a room key or request to redecrypt {e:?}" + ); + None + } + } + } else { + let client = self.inner.client().ok()?; + let machine = client.olm_machine().await; + let machine = machine.as_ref()?; + + match machine.decrypt_room_event(event, room_id, client.decryption_settings()).await { + Ok(decrypted) => Some((decrypted, None)), + Err(e) => { + warn!( + "Failed to redecrypt an event despite receiving a room key or a request to redecrypt {e:?}" + ); + None + } } } } @@ -298,14 +341,26 @@ impl EventCache { // Get all the relevant UTDs. let events = self.get_utds(room_id, session_id).await?; + let room = self.inner.client().ok().and_then(|client| client.get_room(room_id)); + let push_context = + if let Some(room) = &room { room.push_context().await.ok().flatten() } else { None }; + // Let's attempt to decrypt them them. let mut decrypted_events = Vec::with_capacity(events.len()); for (event_id, event) in events { // If we managed to decrypt the event, and we should have to since we received // the room key for this specific event, then replace the event. - if let Some(decrypted) = self.decrypt_event(room_id, event.cast_ref_unchecked()).await { - decrypted_events.push((event_id, decrypted)); + if let Some((decrypted, actions)) = self + .decrypt_event( + room_id, + room.as_ref(), + push_context.as_ref(), + event.cast_ref_unchecked(), + ) + .await + { + decrypted_events.push((event_id, decrypted, actions)); } } @@ -321,8 +376,13 @@ impl EventCache { room_id: &RoomId, session_id: SessionId<'_>, ) -> Result<(), EventCacheError> { - let client = self.inner.client().ok().unwrap(); - let room = client.get_room(room_id).unwrap(); + let Ok(client) = self.inner.client() else { + return Ok(()); + }; + + let Some(room) = client.get_room(room_id) else { + return Ok(()); + }; // Get all the relevant events. let events = self.get_decrypted_events(room_id, session_id).await?; @@ -339,7 +399,7 @@ impl EventCache { && event.encryption_info != new_encryption_info { event.encryption_info = new_encryption_info; - updated_events.push((event_id, event)); + updated_events.push((event_id, event, None)); } } From 7e9885881517f27a92383550acbbe12105288a13 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Thu, 16 Oct 2025 12:33:43 +0200 Subject: [PATCH 10/25] feat(redecryptor): More precise logs for the redecryption attempts --- .../matrix-sdk/src/event_cache/redecryptor.rs | 23 +++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/redecryptor.rs b/crates/matrix-sdk/src/event_cache/redecryptor.rs index 250c7f8c3e4..f0ec55dbcdb 100644 --- a/crates/matrix-sdk/src/event_cache/redecryptor.rs +++ b/crates/matrix-sdk/src/event_cache/redecryptor.rs @@ -229,6 +229,11 @@ impl EventCache { room_id: &RoomId, events: Vec<(OwnedEventId, DecryptedRoomEvent, Option>)>, ) -> Result<(), EventCacheError> { + if events.is_empty() { + trace!("No events were redecrypted or updated, nothing to replace"); + return Ok(()); + } + // Get the cache for this particular room and lock the state for the duration of // the decryption. let (room_cache, _drop_handles) = self.for_room(room_id).await?; @@ -237,7 +242,6 @@ impl EventCache { let event_ids: BTreeSet<_> = events.iter().cloned().map(|(event_id, _, _)| event_id).collect(); - trace!(?event_ids, "Replacing successfully re-decrypted events"); for (event_id, decrypted, actions) in events { // The event isn't in the cache, nothing to replace. Realistically this can't @@ -330,7 +334,7 @@ impl EventCache { /// Attempt to redecrypt events after a room key with the given session ID /// has been received. - #[instrument(skip_all, fields(room_key_info))] + #[instrument(skip_all, fields(room_id, session_id))] async fn retry_decryption( &self, room_id: &RoomId, @@ -364,6 +368,13 @@ impl EventCache { } } + let event_ids: BTreeSet<_> = + decrypted_events.iter().map(|(event_id, _, _)| event_id).collect(); + + if !event_ids.is_empty() { + trace!(?event_ids, "Successfully redecrypted events"); + } + // Replace the events and notify listeners that UTDs have been replaced with // decrypted events. self.on_resolved_utds(room_id, decrypted_events).await?; @@ -371,6 +382,7 @@ impl EventCache { Ok(()) } + #[instrument(skip_all, fields(room_id, session_id))] async fn update_encryption_info( &self, room_id: &RoomId, @@ -403,6 +415,13 @@ impl EventCache { } } + let event_ids: BTreeSet<_> = + updated_events.iter().map(|(event_id, _, _)| event_id).collect(); + + if !event_ids.is_empty() { + trace!(?event_ids, "Replacing the encryption info of some events"); + } + self.on_resolved_utds(room_id, updated_events).await?; Ok(()) From 4109fddc97935434888c0f7aa419b7664c251824 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Thu, 16 Oct 2025 12:33:43 +0200 Subject: [PATCH 11/25] feat(redecryptor): Post-process the events once they are replaced --- crates/matrix-sdk/src/event_cache/redecryptor.rs | 9 +++++++-- crates/matrix-sdk/src/event_cache/room/mod.rs | 2 +- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/redecryptor.rs b/crates/matrix-sdk/src/event_cache/redecryptor.rs index f0ec55dbcdb..7d99d277dbf 100644 --- a/crates/matrix-sdk/src/event_cache/redecryptor.rs +++ b/crates/matrix-sdk/src/event_cache/redecryptor.rs @@ -241,7 +241,7 @@ impl EventCache { let event_ids: BTreeSet<_> = events.iter().cloned().map(|(event_id, _, _)| event_id).collect(); - + let mut new_events = Vec::with_capacity(events.len()); for (event_id, decrypted, actions) in events { // The event isn't in the cache, nothing to replace. Realistically this can't @@ -256,10 +256,15 @@ impl EventCache { // TODO: `replace_event_at()` propagates changes to the store for every event, // we should probably have a bulk version of this? - state.replace_event_at(location, target_event).await? + state.replace_event_at(location, target_event.clone()).await?; + new_events.push(target_event); } } + // TODO: Is this useful? Do I need to call this once per event as well because + // replace event at works on a single event? + state.post_process_new_events(new_events, false).await?; + // We replaced a bunch of events, reactive updates for those replacements have // been queued up. We need to send them out to our subscribers now. let diffs = state.room_linked_chunk_mut().updates_as_vector_diffs(); diff --git a/crates/matrix-sdk/src/event_cache/room/mod.rs b/crates/matrix-sdk/src/event_cache/room/mod.rs index 21f66bf690a..4ea5dcfaba9 100644 --- a/crates/matrix-sdk/src/event_cache/room/mod.rs +++ b/crates/matrix-sdk/src/event_cache/room/mod.rs @@ -1439,7 +1439,7 @@ mod private { /// linked chunk. /// /// Flushes updates to disk first. - async fn post_process_new_events( + pub(crate) async fn post_process_new_events( &mut self, events: Vec, is_sync: bool, From 4a519bd5474325ba8db12b6227529dd1ca087f63 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Thu, 16 Oct 2025 12:33:43 +0200 Subject: [PATCH 12/25] test(redecryptor): More tests for the redecryptor --- .../matrix-sdk/src/event_cache/redecryptor.rs | 207 ++++++++++++++++-- 1 file changed, 194 insertions(+), 13 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/redecryptor.rs b/crates/matrix-sdk/src/event_cache/redecryptor.rs index 7d99d277dbf..0a49d1a463a 100644 --- a/crates/matrix-sdk/src/event_cache/redecryptor.rs +++ b/crates/matrix-sdk/src/event_cache/redecryptor.rs @@ -650,25 +650,31 @@ impl Redecryptor { #[cfg(not(target_family = "wasm"))] #[cfg(test)] mod tests { - use std::time::Duration; + use std::{collections::BTreeSet, time::Duration}; use assert_matches2::assert_matches; use eyeball_im::VectorDiff; - use matrix_sdk_base::deserialized_responses::TimelineEventKind; + use matrix_sdk_base::deserialized_responses::{TimelineEventKind, VerificationState}; use matrix_sdk_test::{ JoinedRoomBuilder, StateTestEvent, async_test, event_factory::EventFactory, }; - use ruma::{device_id, event_id, room_id, user_id}; + use ruma::{RoomId, device_id, event_id, room_id, user_id}; use serde_json::json; + use tracing::Instrument; use crate::{ - assert_let_timeout, encryption::EncryptionSettings, event_cache::RoomEventCacheUpdate, + Client, assert_let_timeout, + encryption::EncryptionSettings, + event_cache::{DecryptionRetryRequest, RoomEventCacheUpdate}, test_utils::mocks::MatrixMockServer, }; - #[async_test] - async fn test_redecryptor() { - let room_id = room_id!("!test:localhost"); + async fn set_up_clients( + room_id: &RoomId, + alice_enables_cross_signing: bool, + ) -> (Client, Client, MatrixMockServer) { + let alice_span = tracing::info_span!("alice"); + let bob_span = tracing::info_span!("bob"); let alice_user_id = user_id!("@alice:localhost"); let alice_device_id = device_id!("ALICEDEVICE"); @@ -678,8 +684,10 @@ mod tests { let matrix_mock_server = MatrixMockServer::new().await; matrix_mock_server.mock_crypto_endpoints_preset().await; - let encryption_settings = - EncryptionSettings { auto_enable_cross_signing: true, ..Default::default() }; + let encryption_settings = EncryptionSettings { + auto_enable_cross_signing: alice_enables_cross_signing, + ..Default::default() + }; // Create some clients for Alice and Bob. @@ -691,8 +699,12 @@ mod tests { .with_encryption_settings(encryption_settings) }) .build() + .instrument(alice_span.clone()) .await; + let encryption_settings = + EncryptionSettings { auto_enable_cross_signing: true, ..Default::default() }; + let bob = matrix_mock_server .client_builder_for_crypto_end_to_end(bob_user_id, bob_device_id) .on_builder(|builder| { @@ -701,6 +713,7 @@ mod tests { .with_encryption_settings(encryption_settings) }) .build() + .instrument(bob_span.clone()) .await; bob.event_cache().subscribe().expect("Bob should be able to enable the event cache"); @@ -708,10 +721,6 @@ mod tests { // Ensure that Alice and Bob are aware of their devices and identities. matrix_mock_server.exchange_e2ee_identities(&alice, &bob).await; - let event_factory = EventFactory::new().room(room_id); - let alice_member_event = event_factory.member(alice_user_id).into_raw(); - let bob_member_event = event_factory.member(bob_user_id).into_raw(); - // Let us now create a room for them. let room_builder = JoinedRoomBuilder::new(room_id) .add_state_event(StateTestEvent::Create) @@ -722,6 +731,7 @@ mod tests { .ok_and_run(&alice, |builder| { builder.add_joined_room(room_builder.clone()); }) + .instrument(alice_span) .await; matrix_mock_server @@ -729,8 +739,25 @@ mod tests { .ok_and_run(&bob, |builder| { builder.add_joined_room(room_builder); }) + .instrument(bob_span) .await; + (alice, bob, matrix_mock_server) + } + + #[async_test] + async fn test_redecryptor() { + let room_id = room_id!("!test:localhost"); + + let event_factory = EventFactory::new().room(room_id); + let (alice, bob, matrix_mock_server) = set_up_clients(room_id, true).await; + + let alice_user_id = alice.user_id().unwrap(); + let bob_user_id = bob.user_id().unwrap(); + + let alice_member_event = event_factory.member(alice_user_id).into_raw(); + let bob_member_event = event_factory.member(bob_user_id).into_raw(); + let room = alice .get_room(room_id) .expect("Alice should have access to the room now that we synced"); @@ -828,4 +855,158 @@ mod tests { assert_eq!(*index, 0); assert_matches!(&value.kind, TimelineEventKind::Decrypted { .. }); } + + #[async_test] + async fn test_redecryptor_updating_encryption_info() { + let alice_span = tracing::info_span!("alice"); + let bob_span = tracing::info_span!("bob"); + + let room_id = room_id!("!test:localhost"); + + let event_factory = EventFactory::new().room(room_id); + let (alice, bob, matrix_mock_server) = set_up_clients(room_id, false).await; + + let alice_user_id = alice.user_id().unwrap(); + let bob_user_id = bob.user_id().unwrap(); + + let alice_member_event = event_factory.member(alice_user_id).into_raw(); + let bob_member_event = event_factory.member(bob_user_id).into_raw(); + + let room = alice + .get_room(room_id) + .expect("Alice should have access to the room now that we synced"); + + // Alice will send a single event to the room, but this will trigger a to-device + // message containing the room key to be sent as well. We capture both the event + // and the to-device message. + + let event_type = "m.room.message"; + let content = json!({"body": "It's a secret to everybody", "msgtype": "m.text"}); + + let event_id = event_id!("$some_id"); + let (event_receiver, mock) = + matrix_mock_server.mock_room_send().ok_with_capture(event_id, alice_user_id); + let (_guard, room_key) = matrix_mock_server.mock_capture_put_to_device(alice_user_id).await; + + { + let _guard = mock.mock_once().mount_as_scoped().await; + + matrix_mock_server + .mock_get_members() + .ok(vec![alice_member_event.clone(), bob_member_event.clone()]) + .mock_once() + .mount() + .await; + + room.send_raw(event_type, content) + .into_future() + .instrument(alice_span.clone()) + .await + .expect("We should be able to send an initial message"); + }; + + // Let's now see what Bob's event cache does. + + let (room_cache, _) = bob + .event_cache() + .for_room(room_id) + .instrument(bob_span.clone()) + .await + .expect("We should be able to get to the event cache for a specific room"); + + let (_, mut subscriber) = room_cache.subscribe().await; + + // Let us retrieve the captured event and to-device message. + let event = event_receiver.await.expect("Alice should have sent the event by now"); + let room_key = room_key.await; + + // Let us forward the event to Bob. + matrix_mock_server + .mock_sync() + .ok_and_run(&bob, |builder| { + builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_timeline_event(event)); + }) + .instrument(bob_span.clone()) + .await; + + // Alright, Bob has received an update from the cache. + + assert_let_timeout!( + Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = subscriber.recv() + ); + + // There should be a single new event, and it should be a UTD as we did not + // receive the room key yet. + assert_eq!(diffs.len(), 1); + assert_matches!(&diffs[0], VectorDiff::Append { values }); + assert_matches!(&values[0].kind, TimelineEventKind::UnableToDecrypt { .. }); + + // Now we send the room key to Bob. + matrix_mock_server + .mock_sync() + .ok_and_run(&bob, |builder| { + builder.add_to_device_event( + room_key + .deserialize_as() + .expect("We should be able to deserialize the room key"), + ); + }) + .instrument(bob_span.clone()) + .await; + + // Bob should receive a new update from the cache. + assert_let_timeout!( + Duration::from_secs(1), + Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = subscriber.recv() + ); + + // It should replace the UTD with a decrypted event. + assert_eq!(diffs.len(), 1); + assert_matches!(&diffs[0], VectorDiff::Set { index: 0, value }); + assert_matches!(&value.kind, TimelineEventKind::Decrypted { .. }); + + let encryption_info = value.encryption_info().unwrap(); + assert_matches!(&encryption_info.verification_state, VerificationState::Unverified(_)); + let session_id = encryption_info.session_id().unwrap().to_owned(); + + // Alice now creates the identity. + alice + .encryption() + .bootstrap_cross_signing(None) + .await + .expect("Alice should be able to create the cross-signing keys"); + + bob.update_tracked_users_for_testing([alice_user_id]).instrument(bob_span.clone()).await; + matrix_mock_server + .mock_sync() + .ok_and_run(&bob, |builder| { + builder.add_change_device(alice_user_id); + }) + .instrument(bob_span.clone()) + .await; + + bob.event_cache().request_decryption(DecryptionRetryRequest { + room_id: room_id.into(), + utd_session_ids: BTreeSet::new(), + refresh_info_session_ids: BTreeSet::from([session_id]), + }); + + // Bob should again receive a new update from the cache, this time updating the + // encryption info. + assert_let_timeout!( + Duration::from_secs(1), + Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = subscriber.recv() + ); + + assert_eq!(diffs.len(), 1); + assert_matches!(&diffs[0], VectorDiff::Set { index: 0, value }); + assert_matches!(&value.kind, TimelineEventKind::Decrypted { .. }); + let encryption_info = value.encryption_info().unwrap(); + + assert_matches!( + &encryption_info.verification_state, + VerificationState::Unverified(_), + "The event should now know about the identity but still be unverified" + ); + } } From 4c4cd41457c26c20002c58b0a77b0defda56a8f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Wed, 15 Oct 2025 12:00:59 +0200 Subject: [PATCH 13/25] test(timeline): Workarounds to get the timeline tests passing This is necessary because both the timeline and the event cache attempt to redecrypt events currently. This will change once only the event cache handles this task. --- crates/matrix-sdk-ui/src/timeline/tests/encryption.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/crates/matrix-sdk-ui/src/timeline/tests/encryption.rs b/crates/matrix-sdk-ui/src/timeline/tests/encryption.rs index 6bca21cc42f..6b9dfd7f14c 100644 --- a/crates/matrix-sdk-ui/src/timeline/tests/encryption.rs +++ b/crates/matrix-sdk-ui/src/timeline/tests/encryption.rs @@ -424,6 +424,17 @@ async fn test_retry_edit_decryption() { assert!(msg.is_edited()); assert_eq!(msg.body(), "This is Error"); + let item = + assert_next_matches_with_timeout!(stream, VectorDiff::Set { index: 0, value } => value); + + // TODO: We receive this update twice, since the event cache decrypts things as + // well as the timeline. + assert_matches!(item.encryption_info(), Some(_)); + assert_matches!(item.latest_edit_json(), Some(_)); + assert_let!(Some(msg) = item.content().as_message()); + assert!(msg.is_edited()); + assert_eq!(msg.body(), "This is Error"); + // (There are no more items) assert_pending!(stream); } From d7d4730b21cb15a75f5ef28260b8a764b76d4a26 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Wed, 15 Oct 2025 12:00:59 +0200 Subject: [PATCH 14/25] docs(redecryptor): Document the redecryptor a bit more --- .../matrix-sdk/src/event_cache/redecryptor.rs | 160 +++++++++++++++--- 1 file changed, 135 insertions(+), 25 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/redecryptor.rs b/crates/matrix-sdk/src/event_cache/redecryptor.rs index 0a49d1a463a..fa0cfccfc28 100644 --- a/crates/matrix-sdk/src/event_cache/redecryptor.rs +++ b/crates/matrix-sdk/src/event_cache/redecryptor.rs @@ -12,16 +12,26 @@ // See the License for the specific language governing permissions and // limitations under the License. -//! The Redecryptor (Rd) is a layer and long-running background task which -//! handles redecryption of events in case we couldn't decrypt them imediatelly. +//! The Redecryptor (affectionately known as R2D2) is a layer and long-running +//! background task which handles redecryption of events in case we couldn't +//! decrypt them imediatelly. //! -//! Rd listens to the OlmMachine for received room keys and new +//! There are various reasons why a room key might not be available imediatelly +//! when the event becomes available: +//! - The to-device message containing the room key just arrives late, i.e. +//! after the room event. +//! - The event is a historic event and we need to first download the room +//! key from the backup. +//! - The event is a historic event in a previously unjoined room, we need +//! to receive historic room keys as defined in [MSC3061](https://github.com/matrix-org/matrix-spec/pull/1655#issuecomment-2213152255). +//! +//! R2D2 listens to the OlmMachine for received room keys and new //! m.room_key.withheld events. //! //! If a new room key has been received it attempts to find any UTDs in the -//! [`EventCache`]. If Rd decrypts any UTDs from the event cache it will replace -//! the events in the cache and send out new [`RoomEventCacheUpdates`] to any of -//! its listeners. +//! [`EventCache`]. If R2D2 decrypts any UTDs from the event cache it will +//! replace the events in the cache and send out new [`RoomEventCacheUpdates`] +//! to any of its listeners. //! //! If a new withheld info has been received it attempts to find any relevant //! events and updates the [`EncryptionInfo`] of an event. @@ -31,28 +41,49 @@ //! a `None` on the room keys stream and we need to re-listen to it. //! //! Another gotcha is that room keys might be received on another process if the -//! Client is operating on a iOS device. A separate process is used in this case -//! to receive push notifications. In this case the room key will be received -//! and Rd won't get notified about it. To work around this decryption requests -//! can be explicitly sent to Rd. +//! Client is operating on a Apple device. A separate process is used in this +//! case to receive push notifications. In this case the room key will be +//! received and R2D2 won't get notified about it. To work around this +//! decryption requests can be explicitly sent to R2D2. //! +//! ```markdown +//! +//! .----------------------. +//! | | +//! | Beeb, boop! | +//! | . +//! ----------------------._ \ +//! -; _____ +//! .`/L|__`. +//! / =[_]O|` \ +//! |"+_____":| +//! __:='|____`-:__ +//! ||[] ||====|| []|| +//! ||[] ||====|| []|| +//! |:== ||====|| ==:| +//! ||[] ||====|| []|| +//! ||[] ||====|| []|| +//! _||_ ||====|| _||_ +//! (====) |:====:| (====) +//! }--{ | | | | }--{ +//! (____) |_| |_| (____) //! //! ┌─────────────┐ //! │ │ -//! ┌───────────┤ Timeline │◄────────────┐ +//! ┌───────────┤ Timeline │◄────────────┐ //! │ │ │ │ -//! │ └─────▲───────┘ │ -//! │ │ │ -//! │ │ │ -//! │ │ │ -//! Decryption │ Redecryptor -//! request │ report -//! │ RoomEventCacheUpdates │ -//! │ │ │ -//! │ │ │ -//! │ ┌──────────┴────────────┐ │ +//! │ └──────▲──────┘ │ +//! │ │ │ +//! │ │ │ +//! │ │ │ +//! Decryption │ Redecryptor +//! request │ report +//! │ RoomEventCacheUpdates │ +//! │ │ │ +//! │ │ │ +//! │ ┌───────────┴───────────┐ │ //! │ │ │ │ -//! └──────► Redecryptor │────────┘ +//! └──────► R2D2 │────────┘ //! │ │ //! └───────────▲───────────┘ //! │ @@ -67,6 +98,7 @@ //! │ OlmMachine │ //! │ │ //! └──────────────┘ +//! ``` use std::{collections::BTreeSet, pin::Pin, sync::Weak}; @@ -101,6 +133,8 @@ use tokio_stream::wrappers::{ }; use tracing::{info, instrument, trace, warn}; +#[cfg(doc)] +use super::RoomEventCache; use crate::{ Room, event_cache::{ @@ -434,7 +468,44 @@ impl EventCache { /// Explicitly request the redecryption of a set of events. /// - /// TODO: Explain when and why this might be useful. + /// The redecryption logic in the event cache might sometimes miss that a + /// room key has become available and that a certain set of events has + /// become decryptable. + /// + /// This might happen because some room keys might arrive in a separate + /// process handling push notifications or if a room key arrives but the + /// process shuts down before we could have decrypted the events. + /// + /// For this reason it is useful to tell the event cache explicitly that + /// some events should be retried to be redecrypted. + /// + /// This method allows you to do so. The events that get decrypted, if any, + /// will be advertised over the usual event cache subscription mechanism + /// which can be accessed using the [`RoomEventCache::subscribe()`] + /// method. + /// + /// # Examples + /// + /// ```no_run + /// # use matrix_sdk::{Client, event_cache::DecryptionRetryRequest}; + /// # use url::Url; + /// # use ruma::owned_room_id; + /// # use std::collections::BTreeSet; + /// # async { + /// # let homeserver = Url::parse("http://localhost:8080")?; + /// # let client = Client::new(homeserver).await?; + /// let event_cache = client.event_cache(); + /// let room_id = owned_room_id!("!my_room:localhost"); + /// + /// let request = DecryptionRetryRequest { + /// room_id, + /// utd_session_ids: BTreeSet::from(["session_id".into()]), + /// refresh_info_session_ids: BTreeSet::new(), + /// }; + /// + /// event_cache.request_decryption(request); + /// # anyhow::Ok(()) }; + /// ``` pub fn request_decryption(&self, request: DecryptionRetryRequest) { let _ = self.inner.redecryption_channels.decryption_request_sender.send(request).inspect_err( @@ -444,8 +515,47 @@ impl EventCache { /// Subscribe to reports that the redecryptor generates. /// - /// TODO: Explain when the redecryptor might send such reports. - pub fn subscrube_to_decryption_reports( + /// The redecryption logic in the event cache might sometimes miss that a + /// room key has become available and that a certain set of events has + /// become decryptable. + /// + /// This might happen because some room keys might arrive in a separate + /// process handling push notifications or if room keys arrive faster than + /// we can handle them. + /// + /// This stream can be used to get notified about such situations as well as + /// a general channel where the event cache reports which events got + /// successfully redecrypted. + /// + /// # Examples + /// + /// ```no_run + /// # use matrix_sdk::{Client, event_cache::RedecryptorReport}; + /// # use url::Url; + /// # use tokio_stream::StreamExt; + /// # async { + /// # let homeserver = Url::parse("http://localhost:8080")?; + /// # let client = Client::new(homeserver).await?; + /// let event_cache = client.event_cache(); + /// + /// let mut stream = event_cache.subscribe_to_decryption_reports(); + /// + /// while let Some(Ok(report)) = stream.next().await { + /// match report { + /// RedecryptorReport::Lagging => { + /// // The event cache might have missed to redecrypt some events. We should tell + /// // it which events we care about, i.e. which events we're displaying to the + /// // user, and let it redecrypt things with an explicit request. + /// } + /// RedecryptorReport::ResolvedUtds { .. } => { + /// // This may be interesting for statistical reasons or in case we'd like to + /// // fetch and inspect these events in some manner. + /// } + /// } + /// } + /// # anyhow::Ok(()) }; + /// ``` + pub fn subscribe_to_decryption_reports( &self, ) -> impl Stream> { BroadcastStream::new(self.inner.redecryption_channels.utd_reporter.subscribe()) From d2eab603c1d34a4f0d310183064a2f39071dfd2d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Wed, 12 Nov 2025 10:12:15 +0100 Subject: [PATCH 15/25] fix(event-cache): Limit the visibility of room_linked_chunk_mut a bit better --- crates/matrix-sdk/src/event_cache/room/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/matrix-sdk/src/event_cache/room/mod.rs b/crates/matrix-sdk/src/event_cache/room/mod.rs index 4ea5dcfaba9..e51bc95fd43 100644 --- a/crates/matrix-sdk/src/event_cache/room/mod.rs +++ b/crates/matrix-sdk/src/event_cache/room/mod.rs @@ -1303,7 +1303,7 @@ mod private { /// Returns a mutable reference to the underlying room linked chunk. #[cfg(feature = "e2e-encryption")] - pub(in crate::event_cache) fn room_linked_chunk_mut(&mut self) -> &mut EventLinkedChunk { + pub(in super::super) fn room_linked_chunk_mut(&mut self) -> &mut EventLinkedChunk { &mut self.room_linked_chunk } From 84a21a42d0a54aa73708eb070f97dcb5ce839a9c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Wed, 12 Nov 2025 10:12:15 +0100 Subject: [PATCH 16/25] fix(event-cache): Don't hold on to the event cache locks as long when fetching events --- crates/matrix-sdk/src/event_cache/redecryptor.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/redecryptor.rs b/crates/matrix-sdk/src/event_cache/redecryptor.rs index fa0cfccfc28..6c936995e13 100644 --- a/crates/matrix-sdk/src/event_cache/redecryptor.rs +++ b/crates/matrix-sdk/src/event_cache/redecryptor.rs @@ -219,9 +219,10 @@ impl EventCache { event_id.zip(event) }; - let store = self.inner.store.lock().await?; - let events = - store.get_room_events(room_id, Some("m.room.encrypted"), Some(session_id)).await?; + let events = { + let store = self.inner.store.lock().await?; + store.get_room_events(room_id, Some("m.room.encrypted"), Some(session_id)).await? + }; Ok(events.into_iter().filter_map(filter).collect()) } @@ -240,8 +241,10 @@ impl EventCache { event_id.zip(event) }; - let store = self.inner.store.lock().await?; - let events = store.get_room_events(room_id, None, Some(session_id)).await?; + let events = { + let store = self.inner.store.lock().await?; + store.get_room_events(room_id, None, Some(session_id)).await? + }; Ok(events.into_iter().filter_map(filter).collect()) } From 3ad70623bbc5e26cdc50abed79af206a1445d335 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Wed, 12 Nov 2025 10:12:15 +0100 Subject: [PATCH 17/25] chore(redecryptor): Use relative imports more often --- crates/matrix-sdk/src/event_cache/redecryptor.rs | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/redecryptor.rs b/crates/matrix-sdk/src/event_cache/redecryptor.rs index 6c936995e13..9b8fd06d3b2 100644 --- a/crates/matrix-sdk/src/event_cache/redecryptor.rs +++ b/crates/matrix-sdk/src/event_cache/redecryptor.rs @@ -135,13 +135,8 @@ use tracing::{info, instrument, trace, warn}; #[cfg(doc)] use super::RoomEventCache; -use crate::{ - Room, - event_cache::{ - EventCache, EventCacheError, EventCacheInner, EventsOrigin, RoomEventCacheUpdate, - }, - room::PushContext, -}; +use super::{EventCache, EventCacheError, EventCacheInner, EventsOrigin, RoomEventCacheUpdate}; +use crate::{Room, room::PushContext}; type SessionId<'a> = &'a str; type OwnedSessionId = String; From 717f016f21b867da5465a962c0ef334fe2f21df1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Wed, 12 Nov 2025 10:12:15 +0100 Subject: [PATCH 18/25] docs(redecryptor): Add some docs to the Redecryptor struct itself --- crates/matrix-sdk/src/event_cache/redecryptor.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/redecryptor.rs b/crates/matrix-sdk/src/event_cache/redecryptor.rs index 9b8fd06d3b2..205e3290fde 100644 --- a/crates/matrix-sdk/src/event_cache/redecryptor.rs +++ b/crates/matrix-sdk/src/event_cache/redecryptor.rs @@ -293,8 +293,6 @@ impl EventCache { } } - // TODO: Is this useful? Do I need to call this once per event as well because - // replace event at works on a single event? state.post_process_new_events(new_events, false).await?; // We replaced a bunch of events, reactive updates for those replacements have @@ -560,6 +558,12 @@ impl EventCache { } } +/// Struct holding on to the redecryption task. +/// +/// This struct implements the bulk of the redecryption task. It listens to the +/// various streams that should trigger redecryption attempts. +/// +/// For more info see the [module level docs](self). pub(crate) struct Redecryptor { task: JoinHandle<()>, } From 952c5af07c44f07d8b13a646d31c21b58c69b6c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Wed, 12 Nov 2025 10:12:15 +0100 Subject: [PATCH 19/25] chore(redecryptor): Time how long it takes to replace UTDs --- crates/matrix-sdk/src/event_cache/redecryptor.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/crates/matrix-sdk/src/event_cache/redecryptor.rs b/crates/matrix-sdk/src/event_cache/redecryptor.rs index 205e3290fde..a2ab22d0cb9 100644 --- a/crates/matrix-sdk/src/event_cache/redecryptor.rs +++ b/crates/matrix-sdk/src/event_cache/redecryptor.rs @@ -114,6 +114,7 @@ use matrix_sdk_base::{ }, deserialized_responses::{DecryptedRoomEvent, TimelineEvent, TimelineEventKind}, locks::Mutex, + timer, }; #[cfg(doc)] use matrix_sdk_common::deserialized_responses::EncryptionInfo; @@ -266,6 +267,8 @@ impl EventCache { return Ok(()); } + timer!("Resolving UTDs"); + // Get the cache for this particular room and lock the state for the duration of // the decryption. let (room_cache, _drop_handles) = self.for_room(room_id).await?; From f9c23b361256b84cabc336c65da24cc061d952b8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Wed, 12 Nov 2025 11:25:24 +0100 Subject: [PATCH 20/25] refactor(redecryptor): Use an abort handle to manage the redecryption task --- crates/matrix-sdk/src/event_cache/redecryptor.rs | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/redecryptor.rs b/crates/matrix-sdk/src/event_cache/redecryptor.rs index a2ab22d0cb9..4346787abb1 100644 --- a/crates/matrix-sdk/src/event_cache/redecryptor.rs +++ b/crates/matrix-sdk/src/event_cache/redecryptor.rs @@ -118,7 +118,7 @@ use matrix_sdk_base::{ }; #[cfg(doc)] use matrix_sdk_common::deserialized_responses::EncryptionInfo; -use matrix_sdk_common::executor::{JoinHandle, spawn}; +use matrix_sdk_common::executor::{AbortOnDrop, JoinHandleExt, spawn}; use ruma::{ OwnedEventId, OwnedRoomId, RoomId, events::{AnySyncTimelineEvent, room::encrypted::OriginalSyncRoomEncryptedEvent}, @@ -568,13 +568,7 @@ impl EventCache { /// /// For more info see the [module level docs](self). pub(crate) struct Redecryptor { - task: JoinHandle<()>, -} - -impl Drop for Redecryptor { - fn drop(&mut self) { - self.task.abort(); - } + _task: AbortOnDrop<()>, } impl Redecryptor { @@ -590,9 +584,10 @@ impl Redecryptor { let request_redecryption_stream = UnboundedReceiverStream::new(receiver); Self::listen_for_room_keys_task(cache, request_redecryption_stream).await; - }); + }) + .abort_on_drop(); - Self { task } + Self { _task: task } } /// (Re)-subscribe to the room key stream from the [`OlmMachine`]. From 38df621b8a594ba348781f91ccd4453f3442df55 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Wed, 12 Nov 2025 11:25:24 +0100 Subject: [PATCH 21/25] chore(event-cache): Limit the visibility of post_process_new_events --- crates/matrix-sdk/src/event_cache/room/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/matrix-sdk/src/event_cache/room/mod.rs b/crates/matrix-sdk/src/event_cache/room/mod.rs index e51bc95fd43..800df158413 100644 --- a/crates/matrix-sdk/src/event_cache/room/mod.rs +++ b/crates/matrix-sdk/src/event_cache/room/mod.rs @@ -1439,7 +1439,7 @@ mod private { /// linked chunk. /// /// Flushes updates to disk first. - pub(crate) async fn post_process_new_events( + pub(in super::super) async fn post_process_new_events( &mut self, events: Vec, is_sync: bool, From 2e9e9aedd77b4b3019277c30ab6161909ae33822 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Wed, 12 Nov 2025 11:25:24 +0100 Subject: [PATCH 22/25] chore(redecryptor): Ensure the upgrade_event_cache method is inlined --- crates/matrix-sdk/src/event_cache/redecryptor.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/matrix-sdk/src/event_cache/redecryptor.rs b/crates/matrix-sdk/src/event_cache/redecryptor.rs index 4346787abb1..5fcf4b0c334 100644 --- a/crates/matrix-sdk/src/event_cache/redecryptor.rs +++ b/crates/matrix-sdk/src/event_cache/redecryptor.rs @@ -609,6 +609,7 @@ impl Redecryptor { }) } + #[inline(always)] fn upgrade_event_cache(cache: &Weak) -> Option { cache.upgrade().map(|inner| EventCache { inner }) } From 913ebe9fa972d5ce0470e9f938c21054a2eef7a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Wed, 12 Nov 2025 11:25:24 +0100 Subject: [PATCH 23/25] docs(redecryptor): Clarify that we're talking about the UI timeline in the r2d2 docs --- crates/matrix-sdk/src/event_cache/redecryptor.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/redecryptor.rs b/crates/matrix-sdk/src/event_cache/redecryptor.rs index 5fcf4b0c334..fc121760da0 100644 --- a/crates/matrix-sdk/src/event_cache/redecryptor.rs +++ b/crates/matrix-sdk/src/event_cache/redecryptor.rs @@ -46,6 +46,10 @@ //! received and R2D2 won't get notified about it. To work around this //! decryption requests can be explicitly sent to R2D2. //! +//! In the graph bellow the Timeline block is meant to be the `Timeline` from +//! the `matrix-sdk-ui` crate, but it could be any other listener that +//! subscribes to [`RedecryptorReport`] stream. +//! //! ```markdown //! //! .----------------------. @@ -688,8 +692,9 @@ impl Redecryptor { // We missed some room keys, we need to report this in case a listener // has and idea which UTDs we should attempt to redecrypt. // - // This would most likely be the timeline. The timeline might attempt - // to redecrypt all UTDs it is showing to the user. + // This would most likely be the timeline from the UI crate. The + // timeline might attempt to redecrypt all UTDs it is showing to the + // user. let Some(cache) = Self::upgrade_event_cache(cache) else { break false; }; From 0d08ed0758057fd46cf6ecbbde2a7c279e9253c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Wed, 12 Nov 2025 11:25:24 +0100 Subject: [PATCH 24/25] refactor(redecryptor): Add some type aliases for the event ID/event tuples --- crates/matrix-sdk/src/event_cache/redecryptor.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/redecryptor.rs b/crates/matrix-sdk/src/event_cache/redecryptor.rs index fc121760da0..9debfa6de7a 100644 --- a/crates/matrix-sdk/src/event_cache/redecryptor.rs +++ b/crates/matrix-sdk/src/event_cache/redecryptor.rs @@ -146,6 +146,10 @@ use crate::{Room, room::PushContext}; type SessionId<'a> = &'a str; type OwnedSessionId = String; +type EventIdAndUtd = (OwnedEventId, Raw); +type EventIdAndEvent = (OwnedEventId, DecryptedRoomEvent); +type ResolvedUtd = (OwnedEventId, DecryptedRoomEvent, Option>); + /// The information sent across the channel to the long-running task requesting /// that the supplied set of sessions be retried. #[derive(Debug, Clone)] @@ -206,7 +210,7 @@ impl EventCache { &self, room_id: &RoomId, session_id: SessionId<'_>, - ) -> Result)>, EventCacheError> { + ) -> Result, EventCacheError> { let filter = |event: TimelineEvent| { let event_id = event.event_id(); @@ -231,7 +235,7 @@ impl EventCache { &self, room_id: &RoomId, session_id: SessionId<'_>, - ) -> Result, EventCacheError> { + ) -> Result, EventCacheError> { let filter = |event: TimelineEvent| { let event_id = event.event_id(); @@ -264,7 +268,7 @@ impl EventCache { async fn on_resolved_utds( &self, room_id: &RoomId, - events: Vec<(OwnedEventId, DecryptedRoomEvent, Option>)>, + events: Vec, ) -> Result<(), EventCacheError> { if events.is_empty() { trace!("No events were redecrypted or updated, nothing to replace"); From 9508675acaf489474d56fcc55fb21d5d4b91ad26 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Wed, 12 Nov 2025 11:25:24 +0100 Subject: [PATCH 25/25] fix(redecryptor): Early return if we don't have any events to process --- crates/matrix-sdk/src/event_cache/redecryptor.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/crates/matrix-sdk/src/event_cache/redecryptor.rs b/crates/matrix-sdk/src/event_cache/redecryptor.rs index 9debfa6de7a..c5b2323994f 100644 --- a/crates/matrix-sdk/src/event_cache/redecryptor.rs +++ b/crates/matrix-sdk/src/event_cache/redecryptor.rs @@ -391,6 +391,11 @@ impl EventCache { // Get all the relevant UTDs. let events = self.get_utds(room_id, session_id).await?; + if events.is_empty() { + trace!("No relevant events found."); + return Ok(()); + } + let room = self.inner.client().ok().and_then(|client| client.get_room(room_id)); let push_context = if let Some(room) = &room { room.push_context().await.ok().flatten() } else { None }; @@ -434,6 +439,8 @@ impl EventCache { room_id: &RoomId, session_id: SessionId<'_>, ) -> Result<(), EventCacheError> { + trace!("Updating encryption info"); + let Ok(client) = self.inner.client() else { return Ok(()); }; @@ -445,6 +452,11 @@ impl EventCache { // Get all the relevant events. let events = self.get_decrypted_events(room_id, session_id).await?; + if events.is_empty() { + trace!("No relevant events found."); + return Ok(()); + } + // Let's attempt to update their encryption info. let mut updated_events = Vec::with_capacity(events.len());