1212// See the License for the specific language governing permissions and
1313// limitations under the License.
1414
15- //! The REDECRYPTOR is a layer that handles redecryption of events in case we
16- //! couldn't decrypt them imediatelly
15+ //! The Redecryptor (Rd) is a layer and long-running background task which
16+ //! handles redecryption of events in case we couldn't decrypt them imediatelly.
17+ //!
18+ //! Rd listens to the OlmMachine for received room keys. If a new room key has
19+ //! been received it attempts to find any UTDs in the [`EventCache`]. If Rd
20+ //! decrypts any UTDs from the event cache it will replace the events in the
21+ //! cache and send out new [`RoomEventCacheUpdates`] to any of its listeners.
22+ //!
23+ //! There's an additional gotcha, the [`OlmMachine`] might get recreated by
24+ //! calls to [`BaseClient::regenerate_olm()`]. When this happens we will receive
25+ //! a `None` on the room keys stream and we need to re-listen to it.
26+ //!
27+ //! Another gotcha is that room keys might be received on another process if the
28+ //! Client is operating on a iOS device. A separate process is used in this case
29+ //! to receive push notifications. In this case the room key will be received
30+ //! and Rd won't get notified about it. To work around this decryption requests
31+ //! can be explicitly sent to Rd.
32+ //!
33+ //! ┌─────────────┐
34+ //! │ │
35+ //! ┌───────────┤ Timeline │
36+ //! │ │ │
37+ //! │ └─────▲───────┘
38+ //! │ │
39+ //! │ │
40+ //! │ │
41+ //! Decryption │
42+ //! request │
43+ //! │ RoomEventCacheUpdates
44+ //! │ │
45+ //! │ │
46+ //! │ ┌──────────┴────────────┐
47+ //! │ │ │
48+ //! └──────► Redecryptor │
49+ //! │ │
50+ //! └───────────▲───────────┘
51+ //! │
52+ //! │
53+ //! │
54+ //! Received room keys stream
55+ //! │
56+ //! │
57+ //! │
58+ //! ┌───────┴──────┐
59+ //! │ │
60+ //! │ OlmMachine │
61+ //! │ │
62+ //! └──────────────┘
1763
1864use std:: { collections:: BTreeSet , pin:: Pin , sync:: Weak } ;
1965
2066use as_variant:: as_variant;
2167use futures_core:: Stream ;
2268use futures_util:: { StreamExt , pin_mut} ;
69+ #[ cfg( doc) ]
70+ use matrix_sdk_base:: { BaseClient , crypto:: OlmMachine } ;
2371use matrix_sdk_base:: {
2472 crypto:: { store:: types:: RoomKeyInfo , types:: events:: room:: encrypted:: EncryptedEvent } ,
2573 deserialized_responses:: { DecryptedRoomEvent , TimelineEvent , TimelineEventKind } ,
@@ -44,6 +92,13 @@ pub struct DecryptionRetryRequest {
4492type SessionId < ' a > = & ' a str ;
4593
4694impl EventCache {
95+ /// Retrieve a set of events that we weren't able to decrypt.
96+ ///
97+ /// # Arguments
98+ ///
99+ /// * `room_id` - The ID of the room where the events were sent to.
100+ /// * `session_id` - The unique ID of the room key that was used to encrypt
101+ /// the event.
47102 async fn get_utds (
48103 & self ,
49104 room_id : & RoomId ,
@@ -76,6 +131,17 @@ impl EventCache {
76131 Ok ( events. into_iter ( ) . filter_map ( filter_non_utds) . collect ( ) )
77132 }
78133
134+ /// Handle a chunk of events that we were previously unable to decrypt but
135+ /// have now successfully decrypted.
136+ ///
137+ /// This function will replace the existing UTD events in memory and the
138+ /// store and send out a [`RoomEventCacheUpdate`] for the newly
139+ /// decrypted events.
140+ ///
141+ /// # Arguments
142+ ///
143+ /// * `room_id` - The ID of the room where the events were sent to.
144+ /// * `events` - A chunk of events that were successfully decrypted.
79145 #[ instrument( skip_all, fields( room_id) ) ]
80146 async fn on_resolved_utds (
81147 & self ,
@@ -113,12 +179,17 @@ impl EventCache {
113179 Ok ( ( ) )
114180 }
115181
182+ /// Attempt to decrypt a single event.
116183 async fn decrypt_event (
117184 & self ,
118185 room_id : & RoomId ,
119186 event : & Raw < EncryptedEvent > ,
120187 ) -> Option < DecryptedRoomEvent > {
121188 let client = self . inner . client ( ) . ok ( ) ?;
189+ // TODO: Do we need to use the `Room` object to decrypt these events so we can
190+ // calculate if the event should count as a notification, i.e. get the push
191+ // actions. I thing we do, what happens if the room can't be found? We fallback
192+ // to this?
122193 let machine = client. olm_machine ( ) . await ;
123194 let machine = machine. as_ref ( ) ?;
124195
@@ -141,7 +212,10 @@ impl EventCache {
141212 ) -> Result < ( ) , EventCacheError > {
142213 trace ! ( "Retrying to decrypt" ) ;
143214
215+ // Get all the relevant UTDs.
144216 let events = self . get_utds ( room_id, session_id) . await ?;
217+
218+ // Let's attempt to decrypt them them.
145219 let mut decrypted_events = Vec :: with_capacity ( events. len ( ) ) ;
146220
147221 for ( event_id, event) in events {
@@ -152,6 +226,8 @@ impl EventCache {
152226 }
153227 }
154228
229+ // Replace the events and notify listeners that UTDs have been replaced with
230+ // decrypted events.
155231 self . on_resolved_utds ( room_id, decrypted_events) . await ?;
156232
157233 Ok ( ( ) )
@@ -170,6 +246,10 @@ impl Drop for Redecryptor {
170246}
171247
172248impl Redecryptor {
249+ /// Create a new [`Redecryptor`].
250+ ///
251+ /// This creates a task that listens to various streams and attempts to
252+ /// redecrypt UTDs that can be found inside the [`EventCache`].
173253 pub ( super ) fn new ( cache : Weak < EventCacheInner > ) -> Self {
174254 let ( request_decryption_sender, receiver) = tokio:: sync:: mpsc:: unbounded_channel ( ) ;
175255
@@ -189,6 +269,10 @@ impl Redecryptor {
189269 } ) ;
190270 }
191271
272+ /// (Re)-subscribe to the room key stream from the [`OlmMachine`].
273+ ///
274+ /// This needs to happen any time this stream returns a `None` meaning that
275+ /// the sending part of the stream has been dropped.
192276 async fn subscribe_to_room_key_stream (
193277 cache : & Weak < EventCacheInner > ,
194278 ) -> Option < impl Stream < Item = Result < Vec < RoomKeyInfo > , BroadcastStreamRecvError > > > {
@@ -215,6 +299,8 @@ impl Redecryptor {
215299
216300 loop {
217301 tokio:: select! {
302+ // An explicit request, presumably from the timeline, has been received to decrypt
303+ // events that were encrypted with a certain room key.
218304 Some ( request) = decryption_request_stream. next( ) => {
219305 let Some ( cache) = Self :: upgrade_event_cache( cache) else {
220306 break false ;
@@ -227,6 +313,8 @@ impl Redecryptor {
227313 . inspect_err( |e| warn!( "Error redecrypting {e:?}" ) ) ;
228314 }
229315 }
316+ // The room key stream from the OlmMachine. Needs to be recreated every time we
317+ // receive a `None` from the stream.
230318 room_keys = room_key_stream. next( ) => {
231319 match room_keys {
232320 Some ( Ok ( room_keys) ) => {
@@ -244,6 +332,8 @@ impl Redecryptor {
244332 Some ( Err ( _) ) => {
245333 todo!( "Handle lagging here, how?" )
246334 } ,
335+ // The stream got closed, this could mean that our OlmMachine got
336+ // regenerated, let's return true and try to recreate the stream.
247337 None => {
248338 break true
249339 }
@@ -258,6 +348,9 @@ impl Redecryptor {
258348 cache : Weak < EventCacheInner > ,
259349 decryption_request_stream : UnboundedReceiverStream < DecryptionRetryRequest > ,
260350 ) {
351+ // We pin the decryption request stream here since that one doesn't need to be
352+ // recreated and we don't want to miss messages coming from the stream
353+ // while recreating it unnecessarily.
261354 pin_mut ! ( decryption_request_stream) ;
262355
263356 while Self :: redecryption_loop ( & cache, & mut decryption_request_stream) . await {
0 commit comments