diff --git a/crates/matrix-sdk-ui/src/timeline/tests/encryption.rs b/crates/matrix-sdk-ui/src/timeline/tests/encryption.rs index 6bca21cc42f..6b9dfd7f14c 100644 --- a/crates/matrix-sdk-ui/src/timeline/tests/encryption.rs +++ b/crates/matrix-sdk-ui/src/timeline/tests/encryption.rs @@ -424,6 +424,17 @@ async fn test_retry_edit_decryption() { assert!(msg.is_edited()); assert_eq!(msg.body(), "This is Error"); + let item = + assert_next_matches_with_timeout!(stream, VectorDiff::Set { index: 0, value } => value); + + // TODO: We receive this update twice, since the event cache decrypts things as + // well as the timeline. + assert_matches!(item.encryption_info(), Some(_)); + assert_matches!(item.latest_edit_json(), Some(_)); + assert_let!(Some(msg) = item.content().as_message()); + assert!(msg.is_edited()); + assert_eq!(msg.body(), "This is Error"); + // (There are no more items) assert_pending!(stream); } diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index 810e07fa553..50c67c1b7a5 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -74,9 +74,13 @@ use crate::{ mod deduplicator; mod pagination; +#[cfg(feature = "e2e-encryption")] +mod redecryptor; mod room; pub use pagination::{RoomPagination, RoomPaginationStatus}; +#[cfg(feature = "e2e-encryption")] +pub use redecryptor::{DecryptionRetryRequest, RedecryptorReport}; pub use room::{RoomEventCache, RoomEventCacheSubscriber, ThreadEventCacheUpdate}; /// An error observed in the [`EventCache`]. @@ -148,6 +152,10 @@ pub struct EventCacheDropHandles { /// The task used to automatically shrink the linked chunks. auto_shrink_linked_chunk_task: JoinHandle<()>, + + /// The task used to automatically redecrypt UTDs. + #[cfg(feature = "e2e-encryption")] + _redecryptor: redecryptor::Redecryptor, } impl fmt::Debug for EventCacheDropHandles { @@ -200,6 +208,9 @@ impl EventCache { linked_chunk_update_sender.clone(), ))); + #[cfg(feature = "e2e-encryption")] + let redecryption_channels = redecryptor::RedecryptorChannels::new(); + Self { inner: Arc::new(EventCacheInner { client, @@ -213,6 +224,8 @@ impl EventCache { _thread_subscriber_task: thread_subscriber_task, #[cfg(feature = "experimental-search")] _search_indexing_task: search_indexing_task, + #[cfg(feature = "e2e-encryption")] + redecryption_channels, thread_subscriber_receiver, }), } @@ -257,10 +270,26 @@ impl EventCache { auto_shrink_receiver, )); + #[cfg(feature = "e2e-encryption")] + let redecryptor = { + let receiver = self + .inner + .redecryption_channels + .decryption_request_receiver + .lock() + .take() + .expect("We should have initialized the channel an subscribing should happen only once"); + + redecryptor::Redecryptor::new(Arc::downgrade(&self.inner), receiver) + }; + + Arc::new(EventCacheDropHandles { listen_updates_task, ignore_user_list_update_task, auto_shrink_linked_chunk_task, + #[cfg(feature = "e2e-encryption")] + _redecryptor: redecryptor, }) }); @@ -840,6 +869,9 @@ struct EventCacheInner { /// This is helpful for tests to coordinate that a new thread subscription /// has been sent or not. thread_subscriber_receiver: Receiver<()>, + + #[cfg(feature = "e2e-encryption")] + redecryption_channels: redecryptor::RedecryptorChannels, } type AutoShrinkChannelPayload = OwnedRoomId; diff --git a/crates/matrix-sdk/src/event_cache/redecryptor.rs b/crates/matrix-sdk/src/event_cache/redecryptor.rs new file mode 100644 index 00000000000..c5b2323994f --- /dev/null +++ b/crates/matrix-sdk/src/event_cache/redecryptor.rs @@ -0,0 +1,1144 @@ +// Copyright 2025 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! The Redecryptor (affectionately known as R2D2) is a layer and long-running +//! background task which handles redecryption of events in case we couldn't +//! decrypt them imediatelly. +//! +//! There are various reasons why a room key might not be available imediatelly +//! when the event becomes available: +//! - The to-device message containing the room key just arrives late, i.e. +//! after the room event. +//! - The event is a historic event and we need to first download the room +//! key from the backup. +//! - The event is a historic event in a previously unjoined room, we need +//! to receive historic room keys as defined in [MSC3061](https://github.com/matrix-org/matrix-spec/pull/1655#issuecomment-2213152255). +//! +//! R2D2 listens to the OlmMachine for received room keys and new +//! m.room_key.withheld events. +//! +//! If a new room key has been received it attempts to find any UTDs in the +//! [`EventCache`]. If R2D2 decrypts any UTDs from the event cache it will +//! replace the events in the cache and send out new [`RoomEventCacheUpdates`] +//! to any of its listeners. +//! +//! If a new withheld info has been received it attempts to find any relevant +//! events and updates the [`EncryptionInfo`] of an event. +//! +//! There's an additional gotcha, the [`OlmMachine`] might get recreated by +//! calls to [`BaseClient::regenerate_olm()`]. When this happens we will receive +//! a `None` on the room keys stream and we need to re-listen to it. +//! +//! Another gotcha is that room keys might be received on another process if the +//! Client is operating on a Apple device. A separate process is used in this +//! case to receive push notifications. In this case the room key will be +//! received and R2D2 won't get notified about it. To work around this +//! decryption requests can be explicitly sent to R2D2. +//! +//! In the graph bellow the Timeline block is meant to be the `Timeline` from +//! the `matrix-sdk-ui` crate, but it could be any other listener that +//! subscribes to [`RedecryptorReport`] stream. +//! +//! ```markdown +//! +//! .----------------------. +//! | | +//! | Beeb, boop! | +//! | . +//! ----------------------._ \ +//! -; _____ +//! .`/L|__`. +//! / =[_]O|` \ +//! |"+_____":| +//! __:='|____`-:__ +//! ||[] ||====|| []|| +//! ||[] ||====|| []|| +//! |:== ||====|| ==:| +//! ||[] ||====|| []|| +//! ||[] ||====|| []|| +//! _||_ ||====|| _||_ +//! (====) |:====:| (====) +//! }--{ | | | | }--{ +//! (____) |_| |_| (____) +//! +//! ┌─────────────┐ +//! │ │ +//! ┌───────────┤ Timeline │◄────────────┐ +//! │ │ │ │ +//! │ └──────▲──────┘ │ +//! │ │ │ +//! │ │ │ +//! │ │ │ +//! Decryption │ Redecryptor +//! request │ report +//! │ RoomEventCacheUpdates │ +//! │ │ │ +//! │ │ │ +//! │ ┌───────────┴───────────┐ │ +//! │ │ │ │ +//! └──────► R2D2 │────────┘ +//! │ │ +//! └───────────▲───────────┘ +//! │ +//! │ +//! │ +//! Received room keys stream +//! │ +//! │ +//! │ +//! ┌───────┴──────┐ +//! │ │ +//! │ OlmMachine │ +//! │ │ +//! └──────────────┘ +//! ``` + +use std::{collections::BTreeSet, pin::Pin, sync::Weak}; + +use as_variant::as_variant; +use futures_core::Stream; +use futures_util::{StreamExt, pin_mut}; +#[cfg(doc)] +use matrix_sdk_base::{BaseClient, crypto::OlmMachine}; +use matrix_sdk_base::{ + crypto::{ + store::types::{RoomKeyInfo, RoomKeyWithheldInfo}, + types::events::room::encrypted::EncryptedEvent, + }, + deserialized_responses::{DecryptedRoomEvent, TimelineEvent, TimelineEventKind}, + locks::Mutex, + timer, +}; +#[cfg(doc)] +use matrix_sdk_common::deserialized_responses::EncryptionInfo; +use matrix_sdk_common::executor::{AbortOnDrop, JoinHandleExt, spawn}; +use ruma::{ + OwnedEventId, OwnedRoomId, RoomId, + events::{AnySyncTimelineEvent, room::encrypted::OriginalSyncRoomEncryptedEvent}, + push::Action, + serde::Raw, +}; +use tokio::sync::{ + broadcast, + mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel}, +}; +use tokio_stream::wrappers::{ + BroadcastStream, UnboundedReceiverStream, errors::BroadcastStreamRecvError, +}; +use tracing::{info, instrument, trace, warn}; + +#[cfg(doc)] +use super::RoomEventCache; +use super::{EventCache, EventCacheError, EventCacheInner, EventsOrigin, RoomEventCacheUpdate}; +use crate::{Room, room::PushContext}; + +type SessionId<'a> = &'a str; +type OwnedSessionId = String; + +type EventIdAndUtd = (OwnedEventId, Raw); +type EventIdAndEvent = (OwnedEventId, DecryptedRoomEvent); +type ResolvedUtd = (OwnedEventId, DecryptedRoomEvent, Option>); + +/// The information sent across the channel to the long-running task requesting +/// that the supplied set of sessions be retried. +#[derive(Debug, Clone)] +pub struct DecryptionRetryRequest { + /// The room ID of the room the events belong to. + pub room_id: OwnedRoomId, + /// Events that are not decrypted. + pub utd_session_ids: BTreeSet, + /// Events that are decrypted but might need to have their + /// [`EncryptionInfo`] refreshed. + pub refresh_info_session_ids: BTreeSet, +} + +/// A report coming from the redecryptor. +#[derive(Debug, Clone)] +pub enum RedecryptorReport { + /// Events which we were able to decrypt. + ResolvedUtds { + /// The room ID of the room the events belong to. + room_id: OwnedRoomId, + /// The list of event IDs of the decrypted events. + events: BTreeSet, + }, + /// The redecryptor might have missed some room keys so it might not have + /// re-decrypted events that are now decryptable. + Lagging, +} + +pub(super) struct RedecryptorChannels { + utd_reporter: broadcast::Sender, + pub(super) decryption_request_sender: UnboundedSender, + pub(super) decryption_request_receiver: + Mutex>>, +} + +impl RedecryptorChannels { + pub(super) fn new() -> Self { + let (utd_reporter, _) = broadcast::channel(100); + let (decryption_request_sender, decryption_request_receiver) = unbounded_channel(); + + Self { + utd_reporter, + decryption_request_sender, + decryption_request_receiver: Mutex::new(Some(decryption_request_receiver)), + } + } +} + +impl EventCache { + /// Retrieve a set of events that we weren't able to decrypt. + /// + /// # Arguments + /// + /// * `room_id` - The ID of the room where the events were sent to. + /// * `session_id` - The unique ID of the room key that was used to encrypt + /// the event. + async fn get_utds( + &self, + room_id: &RoomId, + session_id: SessionId<'_>, + ) -> Result, EventCacheError> { + let filter = |event: TimelineEvent| { + let event_id = event.event_id(); + + // Only pick out events that are UTDs, get just the Raw event as this is what + // the OlmMachine needs. + let event = + as_variant!(event.kind, TimelineEventKind::UnableToDecrypt { event, .. } => event); + // Zip the event ID and event together so we don't have to pick out the event ID + // again. We need the event ID to replace the event in the cache. + event_id.zip(event) + }; + + let events = { + let store = self.inner.store.lock().await?; + store.get_room_events(room_id, Some("m.room.encrypted"), Some(session_id)).await? + }; + + Ok(events.into_iter().filter_map(filter).collect()) + } + + async fn get_decrypted_events( + &self, + room_id: &RoomId, + session_id: SessionId<'_>, + ) -> Result, EventCacheError> { + let filter = |event: TimelineEvent| { + let event_id = event.event_id(); + + let event = as_variant!(event.kind, TimelineEventKind::Decrypted(event) => event); + // Zip the event ID and event together so we don't have to pick out the event ID + // again. We need the event ID to replace the event in the cache. + event_id.zip(event) + }; + + let events = { + let store = self.inner.store.lock().await?; + store.get_room_events(room_id, None, Some(session_id)).await? + }; + + Ok(events.into_iter().filter_map(filter).collect()) + } + + /// Handle a chunk of events that we were previously unable to decrypt but + /// have now successfully decrypted. + /// + /// This function will replace the existing UTD events in memory and the + /// store and send out a [`RoomEventCacheUpdate`] for the newly + /// decrypted events. + /// + /// # Arguments + /// + /// * `room_id` - The ID of the room where the events were sent to. + /// * `events` - A chunk of events that were successfully decrypted. + #[instrument(skip_all, fields(room_id))] + async fn on_resolved_utds( + &self, + room_id: &RoomId, + events: Vec, + ) -> Result<(), EventCacheError> { + if events.is_empty() { + trace!("No events were redecrypted or updated, nothing to replace"); + return Ok(()); + } + + timer!("Resolving UTDs"); + + // Get the cache for this particular room and lock the state for the duration of + // the decryption. + let (room_cache, _drop_handles) = self.for_room(room_id).await?; + let mut state = room_cache.inner.state.write().await; + + let event_ids: BTreeSet<_> = + events.iter().cloned().map(|(event_id, _, _)| event_id).collect(); + let mut new_events = Vec::with_capacity(events.len()); + + for (event_id, decrypted, actions) in events { + // The event isn't in the cache, nothing to replace. Realistically this can't + // happen since we retrieved the list of events from the cache itself and + // `find_event()` will look into the store as well. + if let Some((location, mut target_event)) = state.find_event(&event_id).await? { + target_event.kind = TimelineEventKind::Decrypted(decrypted); + + if let Some(actions) = actions { + target_event.set_push_actions(actions); + } + + // TODO: `replace_event_at()` propagates changes to the store for every event, + // we should probably have a bulk version of this? + state.replace_event_at(location, target_event.clone()).await?; + new_events.push(target_event); + } + } + + state.post_process_new_events(new_events, false).await?; + + // We replaced a bunch of events, reactive updates for those replacements have + // been queued up. We need to send them out to our subscribers now. + let diffs = state.room_linked_chunk_mut().updates_as_vector_diffs(); + + let _ = room_cache.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents { + diffs, + origin: EventsOrigin::Cache, + }); + + // We report that we resolved some UTDs, this is mainly for listeners that don't + // care about the actual events, just about the fact that UTDs got + // resolved. Not sure if we'll have more such listeners but the UTD hook + // is one such thing. + let report = + RedecryptorReport::ResolvedUtds { room_id: room_id.to_owned(), events: event_ids }; + let _ = self.inner.redecryption_channels.utd_reporter.send(report); + + Ok(()) + } + + /// Attempt to decrypt a single event. + async fn decrypt_event( + &self, + room_id: &RoomId, + room: Option<&Room>, + push_context: Option<&PushContext>, + event: &Raw, + ) -> Option<(DecryptedRoomEvent, Option>)> { + if let Some(room) = room { + match room + .decrypt_event( + event.cast_ref_unchecked::(), + push_context, + ) + .await + { + Ok(maybe_decrypted) => { + let actions = maybe_decrypted.push_actions().map(|a| a.to_vec()); + + if let TimelineEventKind::Decrypted(decrypted) = maybe_decrypted.kind { + Some((decrypted, actions)) + } else { + warn!( + "Failed to redecrypt an event despite receiving a room key or request to redecrypt" + ); + None + } + } + Err(e) => { + warn!( + "Failed to redecrypt an event despite receiving a room key or request to redecrypt {e:?}" + ); + None + } + } + } else { + let client = self.inner.client().ok()?; + let machine = client.olm_machine().await; + let machine = machine.as_ref()?; + + match machine.decrypt_room_event(event, room_id, client.decryption_settings()).await { + Ok(decrypted) => Some((decrypted, None)), + Err(e) => { + warn!( + "Failed to redecrypt an event despite receiving a room key or a request to redecrypt {e:?}" + ); + None + } + } + } + } + + /// Attempt to redecrypt events after a room key with the given session ID + /// has been received. + #[instrument(skip_all, fields(room_id, session_id))] + async fn retry_decryption( + &self, + room_id: &RoomId, + session_id: SessionId<'_>, + ) -> Result<(), EventCacheError> { + trace!("Retrying to decrypt"); + + // Get all the relevant UTDs. + let events = self.get_utds(room_id, session_id).await?; + + if events.is_empty() { + trace!("No relevant events found."); + return Ok(()); + } + + let room = self.inner.client().ok().and_then(|client| client.get_room(room_id)); + let push_context = + if let Some(room) = &room { room.push_context().await.ok().flatten() } else { None }; + + // Let's attempt to decrypt them them. + let mut decrypted_events = Vec::with_capacity(events.len()); + + for (event_id, event) in events { + // If we managed to decrypt the event, and we should have to since we received + // the room key for this specific event, then replace the event. + if let Some((decrypted, actions)) = self + .decrypt_event( + room_id, + room.as_ref(), + push_context.as_ref(), + event.cast_ref_unchecked(), + ) + .await + { + decrypted_events.push((event_id, decrypted, actions)); + } + } + + let event_ids: BTreeSet<_> = + decrypted_events.iter().map(|(event_id, _, _)| event_id).collect(); + + if !event_ids.is_empty() { + trace!(?event_ids, "Successfully redecrypted events"); + } + + // Replace the events and notify listeners that UTDs have been replaced with + // decrypted events. + self.on_resolved_utds(room_id, decrypted_events).await?; + + Ok(()) + } + + #[instrument(skip_all, fields(room_id, session_id))] + async fn update_encryption_info( + &self, + room_id: &RoomId, + session_id: SessionId<'_>, + ) -> Result<(), EventCacheError> { + trace!("Updating encryption info"); + + let Ok(client) = self.inner.client() else { + return Ok(()); + }; + + let Some(room) = client.get_room(room_id) else { + return Ok(()); + }; + + // Get all the relevant events. + let events = self.get_decrypted_events(room_id, session_id).await?; + + if events.is_empty() { + trace!("No relevant events found."); + return Ok(()); + } + + // Let's attempt to update their encryption info. + let mut updated_events = Vec::with_capacity(events.len()); + + for (event_id, mut event) in events { + let new_encryption_info = + room.get_encryption_info(session_id, &event.encryption_info.sender).await; + + // Only create a replacement if the encryption info actually changed. + if let Some(new_encryption_info) = new_encryption_info + && event.encryption_info != new_encryption_info + { + event.encryption_info = new_encryption_info; + updated_events.push((event_id, event, None)); + } + } + + let event_ids: BTreeSet<_> = + updated_events.iter().map(|(event_id, _, _)| event_id).collect(); + + if !event_ids.is_empty() { + trace!(?event_ids, "Replacing the encryption info of some events"); + } + + self.on_resolved_utds(room_id, updated_events).await?; + + Ok(()) + } + + /// Explicitly request the redecryption of a set of events. + /// + /// The redecryption logic in the event cache might sometimes miss that a + /// room key has become available and that a certain set of events has + /// become decryptable. + /// + /// This might happen because some room keys might arrive in a separate + /// process handling push notifications or if a room key arrives but the + /// process shuts down before we could have decrypted the events. + /// + /// For this reason it is useful to tell the event cache explicitly that + /// some events should be retried to be redecrypted. + /// + /// This method allows you to do so. The events that get decrypted, if any, + /// will be advertised over the usual event cache subscription mechanism + /// which can be accessed using the [`RoomEventCache::subscribe()`] + /// method. + /// + /// # Examples + /// + /// ```no_run + /// # use matrix_sdk::{Client, event_cache::DecryptionRetryRequest}; + /// # use url::Url; + /// # use ruma::owned_room_id; + /// # use std::collections::BTreeSet; + /// # async { + /// # let homeserver = Url::parse("http://localhost:8080")?; + /// # let client = Client::new(homeserver).await?; + /// let event_cache = client.event_cache(); + /// let room_id = owned_room_id!("!my_room:localhost"); + /// + /// let request = DecryptionRetryRequest { + /// room_id, + /// utd_session_ids: BTreeSet::from(["session_id".into()]), + /// refresh_info_session_ids: BTreeSet::new(), + /// }; + /// + /// event_cache.request_decryption(request); + /// # anyhow::Ok(()) }; + /// ``` + pub fn request_decryption(&self, request: DecryptionRetryRequest) { + let _ = + self.inner.redecryption_channels.decryption_request_sender.send(request).inspect_err( + |_| warn!("Requesting a decryption while the redecryption task has been shut down"), + ); + } + + /// Subscribe to reports that the redecryptor generates. + /// + /// The redecryption logic in the event cache might sometimes miss that a + /// room key has become available and that a certain set of events has + /// become decryptable. + /// + /// This might happen because some room keys might arrive in a separate + /// process handling push notifications or if room keys arrive faster than + /// we can handle them. + /// + /// This stream can be used to get notified about such situations as well as + /// a general channel where the event cache reports which events got + /// successfully redecrypted. + /// + /// # Examples + /// + /// ```no_run + /// # use matrix_sdk::{Client, event_cache::RedecryptorReport}; + /// # use url::Url; + /// # use tokio_stream::StreamExt; + /// # async { + /// # let homeserver = Url::parse("http://localhost:8080")?; + /// # let client = Client::new(homeserver).await?; + /// let event_cache = client.event_cache(); + /// + /// let mut stream = event_cache.subscribe_to_decryption_reports(); + /// + /// while let Some(Ok(report)) = stream.next().await { + /// match report { + /// RedecryptorReport::Lagging => { + /// // The event cache might have missed to redecrypt some events. We should tell + /// // it which events we care about, i.e. which events we're displaying to the + /// // user, and let it redecrypt things with an explicit request. + /// } + /// RedecryptorReport::ResolvedUtds { .. } => { + /// // This may be interesting for statistical reasons or in case we'd like to + /// // fetch and inspect these events in some manner. + /// } + /// } + /// } + /// # anyhow::Ok(()) }; + /// ``` + pub fn subscribe_to_decryption_reports( + &self, + ) -> impl Stream> { + BroadcastStream::new(self.inner.redecryption_channels.utd_reporter.subscribe()) + } +} + +/// Struct holding on to the redecryption task. +/// +/// This struct implements the bulk of the redecryption task. It listens to the +/// various streams that should trigger redecryption attempts. +/// +/// For more info see the [module level docs](self). +pub(crate) struct Redecryptor { + _task: AbortOnDrop<()>, +} + +impl Redecryptor { + /// Create a new [`Redecryptor`]. + /// + /// This creates a task that listens to various streams and attempts to + /// redecrypt UTDs that can be found inside the [`EventCache`]. + pub(super) fn new( + cache: Weak, + receiver: UnboundedReceiver, + ) -> Self { + let task = spawn(async { + let request_redecryption_stream = UnboundedReceiverStream::new(receiver); + + Self::listen_for_room_keys_task(cache, request_redecryption_stream).await; + }) + .abort_on_drop(); + + Self { _task: task } + } + + /// (Re)-subscribe to the room key stream from the [`OlmMachine`]. + /// + /// This needs to happen any time this stream returns a `None` meaning that + /// the sending part of the stream has been dropped. + async fn subscribe_to_room_key_stream( + cache: &Weak, + ) -> Option<( + impl Stream, BroadcastStreamRecvError>>, + impl Stream>, + )> { + let event_cache = cache.upgrade()?; + let client = event_cache.client().ok()?; + let machine = client.olm_machine().await; + + machine.as_ref().map(|m| { + (m.store().room_keys_received_stream(), m.store().room_keys_withheld_received_stream()) + }) + } + + #[inline(always)] + fn upgrade_event_cache(cache: &Weak) -> Option { + cache.upgrade().map(|inner| EventCache { inner }) + } + + async fn redecryption_loop( + cache: &Weak, + decryption_request_stream: &mut Pin<&mut impl Stream>, + ) -> bool { + let Some((room_key_stream, withheld_stream)) = + Self::subscribe_to_room_key_stream(cache).await + else { + return false; + }; + + pin_mut!(room_key_stream); + pin_mut!(withheld_stream); + + loop { + tokio::select! { + // An explicit request, presumably from the timeline, has been received to decrypt + // events that were encrypted with a certain room key. + Some(request) = decryption_request_stream.next() => { + let Some(cache) = Self::upgrade_event_cache(cache) else { + break false; + }; + + trace!(?request, "Received a redecryption request"); + + for session_id in request.utd_session_ids { + let _ = cache + .retry_decryption(&request.room_id, &session_id) + .await + .inspect_err(|e| warn!("Error redecrypting after an explicit request was received {e:?}")); + } + + for session_id in request.refresh_info_session_ids { + let _ = cache.update_encryption_info(&request.room_id, &session_id).await.inspect_err(|e| + warn!( + room_id = %request.room_id, + session_id = session_id, + "Unable to update the encryption info {e:?}", + )); + } + } + // The room key stream from the OlmMachine. Needs to be recreated every time we + // receive a `None` from the stream. + room_keys = room_key_stream.next() => { + match room_keys { + Some(Ok(room_keys)) => { + // Alright, some room keys were received and persisted in our store, + // let's attempt to redecrypt events that were encrypted using these + // room keys. + let Some(cache) = Self::upgrade_event_cache(cache) else { + break false; + }; + + trace!(?room_keys, "Received new room keys"); + + for key in &room_keys { + let _ = cache + .retry_decryption(&key.room_id, &key.session_id) + .await + .inspect_err(|e| warn!("Error redecrypting {e:?}")); + } + + for key in room_keys { + let _ = cache.update_encryption_info(&key.room_id, &key.session_id).await.inspect_err(|e| + warn!( + room_id = %key.room_id, + session_id = key.session_id, + "Unable to update the encryption info {e:?}", + )); + } + }, + Some(Err(_)) => { + // We missed some room keys, we need to report this in case a listener + // has and idea which UTDs we should attempt to redecrypt. + // + // This would most likely be the timeline from the UI crate. The + // timeline might attempt to redecrypt all UTDs it is showing to the + // user. + let Some(cache) = Self::upgrade_event_cache(cache) else { + break false; + }; + + let message = RedecryptorReport::Lagging; + let _ = cache.inner.redecryption_channels.utd_reporter.send(message); + }, + // The stream got closed, this could mean that our OlmMachine got + // regenerated, let's return true and try to recreate the stream. + None => { + break true + } + } + } + withheld_info = withheld_stream.next() => { + match withheld_info { + Some(infos) => { + let Some(cache) = Self::upgrade_event_cache(cache) else { + break false; + }; + + trace!(?infos, "Received new withheld infos"); + + for RoomKeyWithheldInfo { room_id, session_id, .. } in &infos { + let _ = cache.update_encryption_info(room_id, session_id).await.inspect_err(|e| + warn!( + room_id = %room_id, + session_id = session_id, + "Unable to update the encryption info {e:?}", + )); + } + } + // The stream got closed, same as for the room key stream, we'll try to + // recreate the streams. + None => break true + } + } + else => break false, + } + } + } + + async fn listen_for_room_keys_task( + cache: Weak, + decryption_request_stream: UnboundedReceiverStream, + ) { + // We pin the decryption request stream here since that one doesn't need to be + // recreated and we don't want to miss messages coming from the stream + // while recreating it unnecessarily. + pin_mut!(decryption_request_stream); + + while Self::redecryption_loop(&cache, &mut decryption_request_stream).await { + info!("Regenerating the re-decryption streams"); + + let Some(cache) = Self::upgrade_event_cache(&cache) else { + break; + }; + + // Report that the stream got recreated so listeners can attempt to redecrypt + // any UTDs they might be seeing. + let message = RedecryptorReport::Lagging; + let _ = cache.inner.redecryption_channels.utd_reporter.send(message); + } + + info!("Shutting down the event cache redecryptor"); + } +} + +#[cfg(not(target_family = "wasm"))] +#[cfg(test)] +mod tests { + use std::{collections::BTreeSet, time::Duration}; + + use assert_matches2::assert_matches; + use eyeball_im::VectorDiff; + use matrix_sdk_base::deserialized_responses::{TimelineEventKind, VerificationState}; + use matrix_sdk_test::{ + JoinedRoomBuilder, StateTestEvent, async_test, event_factory::EventFactory, + }; + use ruma::{RoomId, device_id, event_id, room_id, user_id}; + use serde_json::json; + use tracing::Instrument; + + use crate::{ + Client, assert_let_timeout, + encryption::EncryptionSettings, + event_cache::{DecryptionRetryRequest, RoomEventCacheUpdate}, + test_utils::mocks::MatrixMockServer, + }; + + async fn set_up_clients( + room_id: &RoomId, + alice_enables_cross_signing: bool, + ) -> (Client, Client, MatrixMockServer) { + let alice_span = tracing::info_span!("alice"); + let bob_span = tracing::info_span!("bob"); + + let alice_user_id = user_id!("@alice:localhost"); + let alice_device_id = device_id!("ALICEDEVICE"); + let bob_user_id = user_id!("@bob:localhost"); + let bob_device_id = device_id!("BOBDEVICE"); + + let matrix_mock_server = MatrixMockServer::new().await; + matrix_mock_server.mock_crypto_endpoints_preset().await; + + let encryption_settings = EncryptionSettings { + auto_enable_cross_signing: alice_enables_cross_signing, + ..Default::default() + }; + + // Create some clients for Alice and Bob. + + let alice = matrix_mock_server + .client_builder_for_crypto_end_to_end(alice_user_id, alice_device_id) + .on_builder(|builder| { + builder + .with_enable_share_history_on_invite(true) + .with_encryption_settings(encryption_settings) + }) + .build() + .instrument(alice_span.clone()) + .await; + + let encryption_settings = + EncryptionSettings { auto_enable_cross_signing: true, ..Default::default() }; + + let bob = matrix_mock_server + .client_builder_for_crypto_end_to_end(bob_user_id, bob_device_id) + .on_builder(|builder| { + builder + .with_enable_share_history_on_invite(true) + .with_encryption_settings(encryption_settings) + }) + .build() + .instrument(bob_span.clone()) + .await; + + bob.event_cache().subscribe().expect("Bob should be able to enable the event cache"); + + // Ensure that Alice and Bob are aware of their devices and identities. + matrix_mock_server.exchange_e2ee_identities(&alice, &bob).await; + + // Let us now create a room for them. + let room_builder = JoinedRoomBuilder::new(room_id) + .add_state_event(StateTestEvent::Create) + .add_state_event(StateTestEvent::Encryption); + + matrix_mock_server + .mock_sync() + .ok_and_run(&alice, |builder| { + builder.add_joined_room(room_builder.clone()); + }) + .instrument(alice_span) + .await; + + matrix_mock_server + .mock_sync() + .ok_and_run(&bob, |builder| { + builder.add_joined_room(room_builder); + }) + .instrument(bob_span) + .await; + + (alice, bob, matrix_mock_server) + } + + #[async_test] + async fn test_redecryptor() { + let room_id = room_id!("!test:localhost"); + + let event_factory = EventFactory::new().room(room_id); + let (alice, bob, matrix_mock_server) = set_up_clients(room_id, true).await; + + let alice_user_id = alice.user_id().unwrap(); + let bob_user_id = bob.user_id().unwrap(); + + let alice_member_event = event_factory.member(alice_user_id).into_raw(); + let bob_member_event = event_factory.member(bob_user_id).into_raw(); + + let room = alice + .get_room(room_id) + .expect("Alice should have access to the room now that we synced"); + + // Alice will send a single event to the room, but this will trigger a to-device + // message containing the room key to be sent as well. We capture both the event + // and the to-device message. + + let event_type = "m.room.message"; + let content = json!({"body": "It's a secret to everybody", "msgtype": "m.text"}); + + let event_id = event_id!("$some_id"); + let (event_receiver, mock) = + matrix_mock_server.mock_room_send().ok_with_capture(event_id, alice_user_id); + let (_guard, room_key) = matrix_mock_server.mock_capture_put_to_device(alice_user_id).await; + + { + let _guard = mock.mock_once().mount_as_scoped().await; + + matrix_mock_server + .mock_get_members() + .ok(vec![alice_member_event.clone(), bob_member_event.clone()]) + .mock_once() + .mount() + .await; + + room.send_raw(event_type, content) + .await + .expect("We should be able to send an initial message"); + }; + + // Let's now see what Bob's event cache does. + + let (room_cache, _) = bob + .event_cache() + .for_room(room_id) + .await + .expect("We should be able to get to the event cache for a specific room"); + + let (_, mut subscriber) = room_cache.subscribe().await; + + // Let us retrieve the captured event and to-device message. + let event = event_receiver.await.expect("Alice should have sent the event by now"); + let room_key = room_key.await; + + // We regenerate the Olm machine to check if the room key stream is recreated to + // correctly. + bob.inner + .base_client + .regenerate_olm(None) + .await + .expect("We should be able to regenerate the Olm machine"); + + // Let us forward the event to Bob. + matrix_mock_server + .mock_sync() + .ok_and_run(&bob, |builder| { + builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_timeline_event(event)); + }) + .await; + + // Alright, Bob has received an update from the cache. + + assert_let_timeout!( + Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = subscriber.recv() + ); + + // There should be a single new event, and it should be a UTD as we did not + // receive the room key yet. + assert_eq!(diffs.len(), 1); + assert_matches!(&diffs[0], VectorDiff::Append { values }); + assert_matches!(&values[0].kind, TimelineEventKind::UnableToDecrypt { .. }); + + // Now we send the room key to Bob. + matrix_mock_server + .mock_sync() + .ok_and_run(&bob, |builder| { + builder.add_to_device_event( + room_key + .deserialize_as() + .expect("We should be able to deserialize the room key"), + ); + }) + .await; + + // Bob should receive a new update from the cache. + assert_let_timeout!( + Duration::from_secs(1), + Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = subscriber.recv() + ); + + // It should replace the UTD with a decrypted event. + assert_eq!(diffs.len(), 1); + assert_matches!(&diffs[0], VectorDiff::Set { index, value }); + assert_eq!(*index, 0); + assert_matches!(&value.kind, TimelineEventKind::Decrypted { .. }); + } + + #[async_test] + async fn test_redecryptor_updating_encryption_info() { + let alice_span = tracing::info_span!("alice"); + let bob_span = tracing::info_span!("bob"); + + let room_id = room_id!("!test:localhost"); + + let event_factory = EventFactory::new().room(room_id); + let (alice, bob, matrix_mock_server) = set_up_clients(room_id, false).await; + + let alice_user_id = alice.user_id().unwrap(); + let bob_user_id = bob.user_id().unwrap(); + + let alice_member_event = event_factory.member(alice_user_id).into_raw(); + let bob_member_event = event_factory.member(bob_user_id).into_raw(); + + let room = alice + .get_room(room_id) + .expect("Alice should have access to the room now that we synced"); + + // Alice will send a single event to the room, but this will trigger a to-device + // message containing the room key to be sent as well. We capture both the event + // and the to-device message. + + let event_type = "m.room.message"; + let content = json!({"body": "It's a secret to everybody", "msgtype": "m.text"}); + + let event_id = event_id!("$some_id"); + let (event_receiver, mock) = + matrix_mock_server.mock_room_send().ok_with_capture(event_id, alice_user_id); + let (_guard, room_key) = matrix_mock_server.mock_capture_put_to_device(alice_user_id).await; + + { + let _guard = mock.mock_once().mount_as_scoped().await; + + matrix_mock_server + .mock_get_members() + .ok(vec![alice_member_event.clone(), bob_member_event.clone()]) + .mock_once() + .mount() + .await; + + room.send_raw(event_type, content) + .into_future() + .instrument(alice_span.clone()) + .await + .expect("We should be able to send an initial message"); + }; + + // Let's now see what Bob's event cache does. + + let (room_cache, _) = bob + .event_cache() + .for_room(room_id) + .instrument(bob_span.clone()) + .await + .expect("We should be able to get to the event cache for a specific room"); + + let (_, mut subscriber) = room_cache.subscribe().await; + + // Let us retrieve the captured event and to-device message. + let event = event_receiver.await.expect("Alice should have sent the event by now"); + let room_key = room_key.await; + + // Let us forward the event to Bob. + matrix_mock_server + .mock_sync() + .ok_and_run(&bob, |builder| { + builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_timeline_event(event)); + }) + .instrument(bob_span.clone()) + .await; + + // Alright, Bob has received an update from the cache. + + assert_let_timeout!( + Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = subscriber.recv() + ); + + // There should be a single new event, and it should be a UTD as we did not + // receive the room key yet. + assert_eq!(diffs.len(), 1); + assert_matches!(&diffs[0], VectorDiff::Append { values }); + assert_matches!(&values[0].kind, TimelineEventKind::UnableToDecrypt { .. }); + + // Now we send the room key to Bob. + matrix_mock_server + .mock_sync() + .ok_and_run(&bob, |builder| { + builder.add_to_device_event( + room_key + .deserialize_as() + .expect("We should be able to deserialize the room key"), + ); + }) + .instrument(bob_span.clone()) + .await; + + // Bob should receive a new update from the cache. + assert_let_timeout!( + Duration::from_secs(1), + Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = subscriber.recv() + ); + + // It should replace the UTD with a decrypted event. + assert_eq!(diffs.len(), 1); + assert_matches!(&diffs[0], VectorDiff::Set { index: 0, value }); + assert_matches!(&value.kind, TimelineEventKind::Decrypted { .. }); + + let encryption_info = value.encryption_info().unwrap(); + assert_matches!(&encryption_info.verification_state, VerificationState::Unverified(_)); + let session_id = encryption_info.session_id().unwrap().to_owned(); + + // Alice now creates the identity. + alice + .encryption() + .bootstrap_cross_signing(None) + .await + .expect("Alice should be able to create the cross-signing keys"); + + bob.update_tracked_users_for_testing([alice_user_id]).instrument(bob_span.clone()).await; + matrix_mock_server + .mock_sync() + .ok_and_run(&bob, |builder| { + builder.add_change_device(alice_user_id); + }) + .instrument(bob_span.clone()) + .await; + + bob.event_cache().request_decryption(DecryptionRetryRequest { + room_id: room_id.into(), + utd_session_ids: BTreeSet::new(), + refresh_info_session_ids: BTreeSet::from([session_id]), + }); + + // Bob should again receive a new update from the cache, this time updating the + // encryption info. + assert_let_timeout!( + Duration::from_secs(1), + Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = subscriber.recv() + ); + + assert_eq!(diffs.len(), 1); + assert_matches!(&diffs[0], VectorDiff::Set { index: 0, value }); + assert_matches!(&value.kind, TimelineEventKind::Decrypted { .. }); + let encryption_info = value.encryption_info().unwrap(); + + assert_matches!( + &encryption_info.verification_state, + VerificationState::Unverified(_), + "The event should now know about the identity but still be unverified" + ); + } +} diff --git a/crates/matrix-sdk/src/event_cache/room/mod.rs b/crates/matrix-sdk/src/event_cache/room/mod.rs index 78a14c1b16d..800df158413 100644 --- a/crates/matrix-sdk/src/event_cache/room/mod.rs +++ b/crates/matrix-sdk/src/event_cache/room/mod.rs @@ -1301,6 +1301,12 @@ mod private { &self.room_linked_chunk } + /// Returns a mutable reference to the underlying room linked chunk. + #[cfg(feature = "e2e-encryption")] + pub(in super::super) fn room_linked_chunk_mut(&mut self) -> &mut EventLinkedChunk { + &mut self.room_linked_chunk + } + //// Find a single event in this room, starting from the most recent event. /// /// **Warning**! It looks into the loaded events from the in-memory @@ -1433,7 +1439,7 @@ mod private { /// linked chunk. /// /// Flushes updates to disk first. - async fn post_process_new_events( + pub(in super::super) async fn post_process_new_events( &mut self, events: Vec, is_sync: bool, @@ -1591,7 +1597,7 @@ mod private { /// observers that a single item has been replaced. Otherwise, /// such a notification is not emitted, because observers are /// unlikely to observe the store updates directly. - async fn replace_event_at( + pub(crate) async fn replace_event_at( &mut self, location: EventLocation, event: Event,